package socket import ( "encoding/json" "g365sfu/bytebuffer" "g365sfu/logger" "g365sfu/network" "g365sfu/sfu" connection "g365sfu/socket/struct" "g365sfu/turn" "g365sfu/utils" "net/http" "os" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" ) var upgrader = websocket.Upgrader{ ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: func(r *http.Request) bool { // Разрешаем со всех origin return true }, } // Получение секретного ключа из переменных окружения func getSecret() string { return os.Getenv("SECRET") } // Обработчик WebSocket соединений func HandleWebSocket(w http.ResponseWriter, r *http.Request) { conn, _ := upgrader.Upgrade(w, r, nil) defer conn.Close() // Канал для передачи байтов dataChan := make(chan []byte) var connection = &connection.Connection{ Identificator: randomSocketIdentifier(), Socket: conn, } AddSocket(connection) // Удаление сокета из хранилища при закрытии соединения defer RemoveSocket(connection.Identificator) // Запуск обработчика в горутине go processData(dataChan, connection) for { messageType, p, err := conn.ReadMessage() if err != nil || messageType != websocket.BinaryMessage { break } // Передача байтов для обработки dataChan <- p } } // Генерация случайного идентификатора для сокета func randomSocketIdentifier() string { // Генерация случайного идентификатора для сокета return "sock_" + utils.RandomString(10) } func processData(data <-chan []byte, connection *connection.Connection) { for bytes := range data { // Логика обработки байтов buffer := bytebuffer.Wrap(bytes) packetId, _ := buffer.Get() if packetId == byte(network.HANDSHAKE) { // Это рукопожатие, дальше сравниваем секретные ключи secretLength, _ := buffer.GetUint32() receivedSecret, _ := buffer.GetBytes((int(secretLength))) if string(receivedSecret) == getSecret() { logger.LogSuccessMessage("success handshake from " + connection.Socket.RemoteAddr().String()) AddSocket(connection) // Подготовка ответа для клиента о успешном рукопожатии response := bytebuffer.Allocate(1) response.Put(byte(network.HANDSHAKE_SUCCESS)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) continue } response := bytebuffer.Allocate(1) response.Put(byte(network.HANDSHAKE_FAILURE)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") connection.Close() return } // Так как следующие пакеты требуют обязательного завершения рукопожатия, нужно проверять, что сокет уже прошел рукопожатие if _, exists := GetSocket(connection.Identificator); !exists { logger.LogWarnMessage("received data from " + connection.Socket.RemoteAddr().String() + " but it has not completed handshake") connection.Socket.Close() return } // Создание комнаты if packetId == byte(network.ROOM_CREATE) { roomLength, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomLength)) roomID := string(roomIDBytes) room, _ := sfu.CreateRoom(connection, roomID) logger.LogSuccessMessage("room initializated " + room.RoomID) // Подготовка ответа для клиента с ID комнаты response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID))) response.Put(byte(network.ROOM_CREATE_SUCCESS)) response.PutUint32(uint32(len([]byte(room.RoomID)))) response.PutBytes([]byte(room.RoomID)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) continue } //SDP OFFER для подключения к комнате if packetId == byte(network.SDP_OFFER) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) _, exists := sfu.GetRoom(roomID) if !exists { logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID) continue } peerIdLen, _ := buffer.GetUint32() peerIDBytes, _ := buffer.GetBytes(int(peerIdLen)) peerID := string(peerIDBytes) logger.LogSuccessMessage("peer " + peerID + " joined to room " + roomID) offerLength, _ := buffer.GetUint32() offerBytes, _ := buffer.GetBytes(int(offerLength)) var offer webrtc.SessionDescription err := json.Unmarshal(offerBytes, &offer) if err != nil { logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error()) continue } answer, err := sfu.JoinWithOffer(roomID, peerID, offer) if err != nil { logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error()) continue } answerBytes, _ := json.Marshal(answer) // Подготовка ответа для клиента с SDP answer response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes)) response.Put(byte(network.SDP_ANSWER)) response.PutUint32(uint32(len([]byte(roomID)))) response.PutBytes([]byte(roomID)) response.PutUint32(uint32(len([]byte(peerID)))) response.PutBytes([]byte(peerID)) response.PutUint32(uint32(len(answerBytes))) response.PutBytes(answerBytes) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) continue } //ICE-candidate пришел от участника комнаты if packetId == byte(network.ICE_CANDIDATE) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) peerIdLen, _ := buffer.GetUint32() peerIDBytes, _ := buffer.GetBytes(int(peerIdLen)) peerID := string(peerIDBytes) candidateLength, _ := buffer.GetUint32() candidateBytes, _ := buffer.GetBytes(int(candidateLength)) var candidate webrtc.ICECandidateInit err := json.Unmarshal(candidateBytes, &candidate) if err != nil { logger.LogWarnMessage("failed to unmarshal ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error()) continue } err = sfu.AddICECandidate(roomID, peerID, candidate) if err != nil { logger.LogWarnMessage("failed to add ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error()) continue } } // SDP ANSWER от клиента при renegotiation if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) { roomIdLen, _ := buffer.GetUint32() roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomID := string(roomIDBytes) peerIdLen, _ := buffer.GetUint32() peerIDBytes, _ := buffer.GetBytes(int(peerIdLen)) peerID := string(peerIDBytes) answerLength, _ := buffer.GetUint32() answerBytes, _ := buffer.GetBytes(int(answerLength)) var answer webrtc.SessionDescription err := json.Unmarshal(answerBytes, &answer) if err != nil { logger.LogWarnMessage("failed to unmarshal answer from peer " + peerID + " in room " + roomID + ": " + err.Error()) continue } err = sfu.HandleClientAnswer(roomID, peerID, answer) if err != nil { logger.LogWarnMessage("failed to handle client answer from peer " + peerID + " in room " + roomID + ": " + err.Error()) continue } } //Check life для проверки соединения с сервером SFU if packetId == byte(network.CHECK_LIFE) { // Подготовка ответа для клиента о том, что соединение живо response := bytebuffer.Allocate(1) response.Put(byte(network.CHECK_LIFE_SUCCESS)) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes()) continue } // Запрос от бекенда на получение данных для подключения к TURN серверу if packetId == byte(network.TURN_ASK) && turn.TURN_PUBLIC_IP != "" { // Отвечаем только в том случае, если TURN сервер был успешно запущен и данные для подключения к нему были заполнены // Отправляем два пакета один на tcp сервер другой на udp сервер, так как некоторые клиенты могут поддерживать только один из этих протоколов //tcp response := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("tcp"))) response.Put(byte(network.TURN_SERVER)) response.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP)))) response.PutBytes([]byte(turn.TURN_PUBLIC_IP)) response.PutUint32(uint32(len([]byte(turn.TURN_USER)))) response.PutBytes([]byte(turn.TURN_USER)) response.PutUint32(uint32(len([]byte(turn.TURN_PASS)))) response.PutBytes([]byte(turn.TURN_PASS)) response.PutUint32(uint32(len([]byte("tcp")))) response.PutBytes([]byte("tcp")) response.Flip() connection.WriteBinary(response.Bytes()) //udp responseUDP := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("udp"))) responseUDP.Put(byte(network.TURN_SERVER)) responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP)))) responseUDP.PutBytes([]byte(turn.TURN_PUBLIC_IP)) responseUDP.PutUint32(uint32(len([]byte(turn.TURN_USER)))) responseUDP.PutBytes([]byte(turn.TURN_USER)) responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PASS)))) responseUDP.PutBytes([]byte(turn.TURN_PASS)) responseUDP.PutUint32(uint32(len([]byte("udp")))) responseUDP.PutBytes([]byte("udp")) responseUDP.Flip() connection.WriteBinary(responseUDP.Bytes()) continue } } }