diff --git a/boot/boot.go b/boot/boot.go index 884eb6d..259a487 100644 --- a/boot/boot.go +++ b/boot/boot.go @@ -4,8 +4,10 @@ import ( "encoding/json" "g365sfu/bytebuffer" "g365sfu/logger" + "g365sfu/network" "g365sfu/sfu" "g365sfu/socket" + connection "g365sfu/socket/struct" "g365sfu/turn" "net/http" "os" @@ -58,7 +60,7 @@ func OnLocalICECandidate(roomID string, peerID string, candidate webrtc.ICECandi buffer := bytebuffer.Allocate( 1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonCandidate), ) - buffer.Put(0x04) + buffer.Put(byte(network.ON_LOCAL_ICE_CANDIDATE)) buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutBytes([]byte(roomID)) buffer.PutUint32(uint32(len([]byte(peerID)))) @@ -80,7 +82,7 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription buffer := bytebuffer.Allocate( 1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonOffer), ) - buffer.Put(0x08) + buffer.Put(byte(network.ON_SERVER_OFFER)) buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutBytes([]byte(roomID)) buffer.PutUint32(uint32(len([]byte(peerID)))) @@ -91,32 +93,22 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription room.Server.WriteBinary(buffer.Bytes()) } -func OnRoomDelete(roomID string) { - room, exists := sfu.GetRoom(roomID) - if !exists { - logger.LogWarnMessage("tried to send room delete event to non existing room " + roomID) - return - } +func OnRoomDelete(roomID string, server *connection.Connection) { buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID))) - buffer.Put(0x10) + buffer.Put(byte(network.ON_ROOM_DELETE)) buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutBytes([]byte(roomID)) buffer.Flip() - room.Server.WriteBinary(buffer.Bytes()) + server.WriteBinary(buffer.Bytes()) } -func OnPeerDisconnected(roomID string, peerID string) { - room, exists := sfu.GetRoom(roomID) - if !exists { - logger.LogWarnMessage("tried to send peer disconnected event to non existing room " + roomID) - return - } +func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection) { buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID))) - buffer.Put(0x11) + buffer.Put(byte(network.ON_PEER_DISCONNECTED)) buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutBytes([]byte(roomID)) buffer.PutUint32(uint32(len([]byte(peerID)))) buffer.PutBytes([]byte(peerID)) buffer.Flip() - room.Server.WriteBinary(buffer.Bytes()) + server.WriteBinary(buffer.Bytes()) } diff --git a/sfu/rooms.go b/sfu/rooms.go index 572dcfc..0dab757 100644 --- a/sfu/rooms.go +++ b/sfu/rooms.go @@ -144,6 +144,7 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription func DeleteRoom(roomID string) error { roomsMu.Lock() room, exists := rooms[roomID] + server := room.Server if !exists { roomsMu.Unlock() return ErrRoomNotFound @@ -163,6 +164,8 @@ func DeleteRoom(roomID string) error { cleanupForwardingState(roomID, p.PeerID) } + OnRoomDelete(roomID, server) + return nil } @@ -218,7 +221,7 @@ func LeaveRoom(roomID string, peerID string) error { return DeleteRoom(roomID) } - // Опционально: renegotiation оставшимся peer после удаления треков/peer + // renegotiation оставшимся peer после удаления треков/peer room.mu.RLock() rest := make([]Peer, len(room.Peers)) copy(rest, room.Peers) diff --git a/sfu/sfu.go b/sfu/sfu.go index c46da3c..d44617c 100644 --- a/sfu/sfu.go +++ b/sfu/sfu.go @@ -3,6 +3,7 @@ package sfu import ( "errors" "g365sfu/logger" + connection "g365sfu/socket/struct" "os" "github.com/pion/interceptor" @@ -21,10 +22,10 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit) // Коллбек для обработки отключения пира (обрыв связи) -var OnPeerDisconnected func(roomID, peerID string) +var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection) // Коллбек для обработки удаления комнаты -var OnRoomDelete func(roomID string) +var OnRoomDelete func(roomID string, server *connection.Connection) // Ошибки var ( diff --git a/socket/socket.go b/socket/socket.go index 1d52cf6..d40e4a0 100644 --- a/socket/socket.go +++ b/socket/socket.go @@ -4,6 +4,7 @@ import ( "encoding/json" "g365sfu/bytebuffer" "g365sfu/logger" + "g365sfu/network" "g365sfu/sfu" connection "g365sfu/socket/struct" "g365sfu/utils" @@ -68,7 +69,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { // Логика обработки байтов buffer := bytebuffer.Wrap(bytes) packetId, _ := buffer.Get() - if packetId == 0x01 { + if packetId == byte(network.HANDSHAKE) { // Это рукопожатие, дальше сравниваем секретные ключи secretLength, _ := buffer.GetUint32() @@ -78,13 +79,17 @@ func processData(data <-chan []byte, connection *connection.Connection) { AddSocket(connection) // Подготовка ответа для клиента о успешном рукопожатии response := bytebuffer.Allocate(1) - response.Put(0x01) + response.Put(byte(network.HANDSHAKE_SUCCESS)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) continue } - connection.WriteBinary([]byte{0xFF}) + response := bytebuffer.Allocate(1) + response.Put(byte(network.HANDSHAKE_FAILURE)) + response.Flip() + // Отправляем ответ клиенту + connection.WriteBinary(response.Bytes()) logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") connection.Close() return @@ -98,7 +103,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { } // Создание комнаты - if packetId == 0x02 { + if packetId == byte(network.ROOM_CREATE) { roomLength, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomLength)) roomID := string(roomIDBytes) @@ -106,7 +111,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { logger.LogSuccessMessage("room initializated " + room.RoomID) // Подготовка ответа для клиента с ID комнаты response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID))) - response.Put(0x02) + response.Put(byte(network.ROOM_CREATE_SUCCESS)) response.PutUint32(uint32(len([]byte(room.RoomID)))) response.PutBytes([]byte(room.RoomID)) response.Flip() @@ -116,7 +121,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { } //SDP OFFER для подключения к комнате - if packetId == 0x03 { + if packetId == byte(network.SDP_OFFER) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) @@ -145,7 +150,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { answerBytes, _ := json.Marshal(answer) // Подготовка ответа для клиента с SDP answer response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes)) - response.Put(0x05) + response.Put(byte(network.SDP_ANSWER)) response.PutUint32(uint32(len([]byte(roomID)))) response.PutBytes([]byte(roomID)) response.PutUint32(uint32(len([]byte(peerID)))) @@ -159,7 +164,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { } //ICE-candidate пришел от участника комнаты - if packetId == 0x06 { + if packetId == byte(network.ICE_CANDIDATE) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) @@ -182,7 +187,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { } // SDP ANSWER от клиента при renegotiation - if packetId == 0x07 { + if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) @@ -204,10 +209,10 @@ func processData(data <-chan []byte, connection *connection.Connection) { } } //Check life для проверки соединения с сервером SFU - if packetId == 0xAE { + if packetId == byte(network.CHECK_LIFE) { // Подготовка ответа для клиента о том, что соединение живо response := bytebuffer.Allocate(1) - response.Put(0xAE) + response.Put(byte(network.CHECK_LIFE_SUCCESS)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes())