diff --git a/sfu/rooms.go b/sfu/rooms.go index 5efd5fe..3c59889 100644 --- a/sfu/rooms.go +++ b/sfu/rooms.go @@ -1,18 +1,14 @@ package sfu import ( - "g365sfu/socket" + connection "g365sfu/socket/struct" "g365sfu/utils" "sync" "github.com/pion/webrtc/v4" ) -// Общие переменные -var ( - rooms = make(map[string]*Room) - roomsMu sync.RWMutex -) +// Структуры для управления комнатами и пирами в SFU type Peer struct { //Идентификатор пира, который будет использоваться для связи с ним @@ -25,20 +21,26 @@ type Room struct { //Уникальный идентификатор комнаты RoomID string //Сервер который создал комнату - Server []*socket.Connection + Server *connection.Connection //Пиры которые подключились к комнате Peers []Peer } +// Общие переменные +var ( + rooms = make(map[string]*Room) + roomsMu sync.RWMutex +) + // CreateRoom создает комнату -func CreateRoom() (*Room, error) { +func CreateRoom(server *connection.Connection) (*Room, error) { roomID := "room_" + utils.RandomString(64) roomsMu.Lock() defer roomsMu.Unlock() room := &Room{ RoomID: roomID, - Server: []*socket.Connection{}, + Server: server, Peers: []Peer{}, } rooms[roomID] = room diff --git a/socket/connections.go b/socket/connections.go index f5777cd..bf2d079 100644 --- a/socket/connections.go +++ b/socket/connections.go @@ -1,9 +1,8 @@ package socket import ( + connection "g365sfu/socket/struct" "sync" - - "github.com/gorilla/websocket" ) // Это хранилище для всех подключённых сокетов. @@ -13,20 +12,14 @@ import ( // Здесь содержатся только соединения прошедшие рукопожатие -type Connection struct { - Identificator string - //Подсоединенный сокет - Socket *websocket.Conn -} - // Потокобезопасное хранилище подключённых сокетов var ( - handshakeCompletedSockets = make(map[string]*Connection) + handshakeCompletedSockets = make(map[string]*connection.Connection) socketsMutex = sync.RWMutex{} ) // Добавление сокета в хранилище -func AddSocket(conn *Connection) { +func AddSocket(conn *connection.Connection) { socketsMutex.Lock() defer socketsMutex.Unlock() handshakeCompletedSockets[conn.Identificator] = conn @@ -40,7 +33,7 @@ func RemoveSocket(identificator string) { } // Получение сокета по идентификатору -func GetSocket(identificator string) (*Connection, bool) { +func GetSocket(identificator string) (*connection.Connection, bool) { socketsMutex.RLock() defer socketsMutex.RUnlock() conn, exists := handshakeCompletedSockets[identificator] @@ -48,10 +41,10 @@ func GetSocket(identificator string) (*Connection, bool) { } // Получение всех сокетов -func GetAllSockets() []*Connection { +func GetAllSockets() []*connection.Connection { socketsMutex.RLock() defer socketsMutex.RUnlock() - connections := make([]*Connection, 0, len(handshakeCompletedSockets)) + connections := make([]*connection.Connection, 0, len(handshakeCompletedSockets)) for _, conn := range handshakeCompletedSockets { connections = append(connections, conn) } diff --git a/socket/socket.go b/socket/socket.go index 09cdbad..cffa1af 100644 --- a/socket/socket.go +++ b/socket/socket.go @@ -1,12 +1,16 @@ package socket import ( + "encoding/json" "g365sfu/logger" + "g365sfu/sfu" + connection "g365sfu/socket/struct" "g365sfu/utils" "net/http" "os" "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" ) var upgrader = websocket.Upgrader{ @@ -31,7 +35,7 @@ func HandleWebSocket(w http.ResponseWriter, r *http.Request) { // Канал для передачи байтов dataChan := make(chan []byte) - var connection = &Connection{ + var connection = &connection.Connection{ Identificator: randomSocketIdentifier(), Socket: conn, } @@ -58,7 +62,7 @@ func randomSocketIdentifier() string { return "sock_" + utils.RandomString(10) } -func processData(data <-chan []byte, connection *Connection) { +func processData(data <-chan []byte, connection *connection.Connection) { for bytes := range data { // Логика обработки байтов if bytes[0] == 0x01 { @@ -84,6 +88,35 @@ func processData(data <-chan []byte, connection *Connection) { return } - // Следующие типы сообщений + // Создание комнаты + if bytes[0] == 0x02 { + room, _ := sfu.CreateRoom(connection) + logger.LogSuccessMessage("room initializated " + room.RoomID) + bytes = append([]byte{0x02}, []byte(room.RoomID)...) + connection.Socket.WriteMessage(websocket.BinaryMessage, bytes) + return + } + + //SDP OFFER для подключения к комнате + if bytes[0] == 0x03 { + roomIdLen := int(bytes[1]) + roomID := string(bytes[2 : 2+roomIdLen]) + _, exists := sfu.GetRoom(roomID) + if !exists { + logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID) + return + } + peerIdLen := int(bytes[2+roomIdLen]) + peerID := string(bytes[3+roomIdLen : 3+roomIdLen+peerIdLen]) + logger.LogSuccessMessage("peer " + connection.Socket.RemoteAddr().String() + " joined to room " + roomID) + var offer webrtc.SessionDescription + err := json.Unmarshal(bytes[3+roomIdLen+peerIdLen:], &offer) + if err != nil { + logger.LogWarnMessage("failed to unmarshal offer from peer " + connection.Socket.RemoteAddr().String() + ": " + err.Error()) + return + } + sfu.JoinWithOffer(roomID, peerID, offer) + return + } } } diff --git a/socket/struct/connection.go b/socket/struct/connection.go new file mode 100644 index 0000000..bda4331 --- /dev/null +++ b/socket/struct/connection.go @@ -0,0 +1,11 @@ +package connection + +import ( + "github.com/gorilla/websocket" +) + +type Connection struct { + Identificator string + //Подсоединенный сокет + Socket *websocket.Conn +}