Files
g365sfu/socket/socket.go

268 lines
10 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package socket
import (
"encoding/json"
"g365sfu/bytebuffer"
"g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu"
connection "g365sfu/socket/struct"
"g365sfu/turn"
"g365sfu/utils"
"net/http"
"os"
"github.com/gorilla/websocket"
"github.com/pion/webrtc/v4"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
// Разрешаем со всех origin
return true
},
}
// Получение секретного ключа из переменных окружения
func getSecret() string {
return os.Getenv("SECRET")
}
// Обработчик WebSocket соединений
func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.LogWarnMessage("failed to upgrade to websocket: " + err.Error())
return
}
if conn == nil {
logger.LogWarnMessage("failed to upgrade to websocket: connection is nil")
return
}
defer conn.Close()
// Канал для передачи байтов
dataChan := make(chan []byte)
var connection = &connection.Connection{
Identificator: randomSocketIdentifier(),
Socket: conn,
}
AddSocket(connection)
// Удаление сокета из хранилища при закрытии соединения
defer RemoveSocket(connection.Identificator)
// Запуск обработчика в горутине
go processData(dataChan, connection)
for {
messageType, p, err := conn.ReadMessage()
if err != nil || messageType != websocket.BinaryMessage {
break
}
// Передача байтов для обработки
dataChan <- p
}
}
// Генерация случайного идентификатора для сокета
func randomSocketIdentifier() string {
// Генерация случайного идентификатора для сокета
return "sock_" + utils.RandomString(10)
}
func processData(data <-chan []byte, connection *connection.Connection) {
for bytes := range data {
// Логика обработки байтов
buffer := bytebuffer.Wrap(bytes)
packetId, _ := buffer.Get()
if packetId == byte(network.HANDSHAKE) {
// Это рукопожатие, дальше сравниваем секретные ключи
secretLength, _ := buffer.GetUint32()
receivedSecret, _ := buffer.GetBytes((int(secretLength)))
if string(receivedSecret) == getSecret() {
logger.LogSuccessMessage("success handshake from " + connection.Socket.RemoteAddr().String())
AddSocket(connection)
// Подготовка ответа для клиента о успешном рукопожатии
response := bytebuffer.Allocate(1)
response.Put(byte(network.HANDSHAKE_SUCCESS))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
response := bytebuffer.Allocate(1)
response.Put(byte(network.HANDSHAKE_FAILURE))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
connection.Close()
return
}
// Так как следующие пакеты требуют обязательного завершения рукопожатия, нужно проверять, что сокет уже прошел рукопожатие
if _, exists := GetSocket(connection.Identificator); !exists {
logger.LogWarnMessage("received data from " + connection.Socket.RemoteAddr().String() + " but it has not completed handshake")
connection.Socket.Close()
return
}
// Создание комнаты
if packetId == byte(network.ROOM_CREATE) {
roomLength, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomLength))
roomID := string(roomIDBytes)
room, _ := sfu.CreateRoom(connection, roomID)
logger.LogSuccessMessage("room initializated " + room.RoomID)
// Подготовка ответа для клиента с ID комнаты
response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
response.Put(byte(network.ROOM_CREATE_SUCCESS))
response.PutUint32(uint32(len([]byte(room.RoomID))))
response.PutBytes([]byte(room.RoomID))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
//SDP OFFER для подключения к комнате
if packetId == byte(network.SDP_OFFER) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
_, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID)
continue
}
peerIdLen, _ := buffer.GetUint32()
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
peerID := string(peerIDBytes)
logger.LogSuccessMessage("peer " + peerID + " joined to room " + roomID)
offerLength, _ := buffer.GetUint32()
offerBytes, _ := buffer.GetBytes(int(offerLength))
var offer webrtc.SessionDescription
err := json.Unmarshal(offerBytes, &offer)
if err != nil {
logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
answer, err := sfu.JoinWithOffer(roomID, peerID, offer)
if err != nil {
logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error())
continue
}
answerBytes, _ := json.Marshal(answer)
// Подготовка ответа для клиента с SDP answer
response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes))
response.Put(byte(network.SDP_ANSWER))
response.PutUint32(uint32(len([]byte(roomID))))
response.PutBytes([]byte(roomID))
response.PutUint32(uint32(len([]byte(peerID))))
response.PutBytes([]byte(peerID))
response.PutUint32(uint32(len(answerBytes)))
response.PutBytes(answerBytes)
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
//ICE-candidate пришел от участника комнаты
if packetId == byte(network.ICE_CANDIDATE) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
peerIdLen, _ := buffer.GetUint32()
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
peerID := string(peerIDBytes)
candidateLength, _ := buffer.GetUint32()
candidateBytes, _ := buffer.GetBytes(int(candidateLength))
var candidate webrtc.ICECandidateInit
err := json.Unmarshal(candidateBytes, &candidate)
if err != nil {
logger.LogWarnMessage("failed to unmarshal ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
err = sfu.AddICECandidate(roomID, peerID, candidate)
if err != nil {
logger.LogWarnMessage("failed to add ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
}
// SDP ANSWER от клиента при renegotiation
if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
peerIdLen, _ := buffer.GetUint32()
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
peerID := string(peerIDBytes)
answerLength, _ := buffer.GetUint32()
answerBytes, _ := buffer.GetBytes(int(answerLength))
var answer webrtc.SessionDescription
err := json.Unmarshal(answerBytes, &answer)
if err != nil {
logger.LogWarnMessage("failed to unmarshal answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
err = sfu.HandleClientAnswer(roomID, peerID, answer)
if err != nil {
logger.LogWarnMessage("failed to handle client answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
}
//Check life для проверки соединения с сервером SFU
if packetId == byte(network.CHECK_LIFE) {
// Подготовка ответа для клиента о том, что соединение живо
response := bytebuffer.Allocate(1)
response.Put(byte(network.CHECK_LIFE_SUCCESS))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
// Запрос от бекенда на получение данных для подключения к TURN серверу
if packetId == byte(network.TURN_ASK) && turn.TURN_PUBLIC_IP != "" {
// Отвечаем только в том случае, если TURN сервер был успешно запущен и данные для подключения к нему были заполнены
// Отправляем два пакета один на tcp сервер другой на udp сервер, так как некоторые клиенты могут поддерживать только один из этих протоколов
//tcp
response := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("tcp")))
response.Put(byte(network.TURN_SERVER))
response.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
response.PutBytes([]byte(turn.TURN_PUBLIC_IP))
response.PutUint32(uint32(len([]byte(turn.TURN_USER))))
response.PutBytes([]byte(turn.TURN_USER))
response.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
response.PutBytes([]byte(turn.TURN_PASS))
response.PutUint32(uint32(len([]byte("tcp"))))
response.PutBytes([]byte("tcp"))
response.Flip()
connection.WriteBinary(response.Bytes())
//udp
responseUDP := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("udp")))
responseUDP.Put(byte(network.TURN_SERVER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
responseUDP.PutBytes([]byte(turn.TURN_PUBLIC_IP))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_USER))))
responseUDP.PutBytes([]byte(turn.TURN_USER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
responseUDP.PutBytes([]byte(turn.TURN_PASS))
responseUDP.PutUint32(uint32(len([]byte("udp"))))
responseUDP.PutBytes([]byte("udp"))
responseUDP.Flip()
connection.WriteBinary(responseUDP.Bytes())
continue
}
}
}