208 lines
7.6 KiB
Go
208 lines
7.6 KiB
Go
package socket
|
||
|
||
import (
|
||
"encoding/json"
|
||
"g365sfu/bytebuffer"
|
||
"g365sfu/logger"
|
||
"g365sfu/sfu"
|
||
connection "g365sfu/socket/struct"
|
||
"g365sfu/utils"
|
||
"net/http"
|
||
"os"
|
||
|
||
"github.com/gorilla/websocket"
|
||
"github.com/pion/webrtc/v4"
|
||
)
|
||
|
||
var upgrader = websocket.Upgrader{
|
||
ReadBufferSize: 1024,
|
||
WriteBufferSize: 1024,
|
||
CheckOrigin: func(r *http.Request) bool {
|
||
// Разрешаем со всех origin
|
||
return true
|
||
},
|
||
}
|
||
|
||
// Получение секретного ключа из переменных окружения
|
||
func getSecret() string {
|
||
return os.Getenv("SECRET")
|
||
}
|
||
|
||
// Обработчик WebSocket соединений
|
||
func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||
conn, _ := upgrader.Upgrade(w, r, nil)
|
||
defer conn.Close()
|
||
|
||
// Канал для передачи байтов
|
||
dataChan := make(chan []byte)
|
||
|
||
var connection = &connection.Connection{
|
||
Identificator: randomSocketIdentifier(),
|
||
Socket: conn,
|
||
}
|
||
AddSocket(connection)
|
||
// Удаление сокета из хранилища при закрытии соединения
|
||
defer RemoveSocket(connection.Identificator)
|
||
|
||
// Запуск обработчика в горутине
|
||
go processData(dataChan, connection)
|
||
|
||
for {
|
||
messageType, p, err := conn.ReadMessage()
|
||
if err != nil || messageType != websocket.BinaryMessage {
|
||
break
|
||
}
|
||
// Передача байтов для обработки
|
||
dataChan <- p
|
||
}
|
||
}
|
||
|
||
// Генерация случайного идентификатора для сокета
|
||
func randomSocketIdentifier() string {
|
||
// Генерация случайного идентификатора для сокета
|
||
return "sock_" + utils.RandomString(10)
|
||
}
|
||
|
||
func processData(data <-chan []byte, connection *connection.Connection) {
|
||
for bytes := range data {
|
||
// Логика обработки байтов
|
||
buffer := bytebuffer.Wrap(bytes)
|
||
packetId, _ := buffer.Get()
|
||
if packetId == 0x01 {
|
||
// Это рукопожатие, дальше сравниваем секретные ключи
|
||
secretLength, _ := buffer.GetUint32()
|
||
|
||
receivedSecret, _ := buffer.GetBytes((int(secretLength)))
|
||
if string(receivedSecret) == getSecret() {
|
||
logger.LogSuccessMessage("success handshake from " + connection.Socket.RemoteAddr().String())
|
||
AddSocket(connection)
|
||
// Подготовка ответа для клиента о успешном рукопожатии
|
||
response := bytebuffer.Allocate(1)
|
||
response.Put(0x01)
|
||
response.Flip()
|
||
// Отправляем ответ клиенту
|
||
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes())
|
||
continue
|
||
}
|
||
connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0xFF})
|
||
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
|
||
connection.Socket.Close()
|
||
return
|
||
}
|
||
|
||
// Так как следующие пакеты требуют обязательного завершения рукопожатия, нужно проверять, что сокет уже прошел рукопожатие
|
||
if _, exists := GetSocket(connection.Identificator); !exists {
|
||
logger.LogWarnMessage("received data from " + connection.Socket.RemoteAddr().String() + " but it has not completed handshake")
|
||
connection.Socket.Close()
|
||
return
|
||
}
|
||
|
||
// Создание комнаты
|
||
if packetId == 0x02 {
|
||
roomLength, _ := buffer.GetUint32()
|
||
roomIDBytes, _ := buffer.GetBytes(int(roomLength))
|
||
roomID := string(roomIDBytes)
|
||
room, _ := sfu.CreateRoom(connection, roomID)
|
||
logger.LogSuccessMessage("room initializated " + room.RoomID)
|
||
// Подготовка ответа для клиента с ID комнаты
|
||
response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
|
||
response.Put(0x02)
|
||
response.PutUint32(uint32(len([]byte(room.RoomID))))
|
||
response.PutBytes([]byte(room.RoomID))
|
||
response.Flip()
|
||
// Отправляем ответ клиенту
|
||
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes())
|
||
continue
|
||
}
|
||
|
||
//SDP OFFER для подключения к комнате
|
||
if packetId == 0x03 {
|
||
roomIdLen, _ := buffer.GetUint32()
|
||
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
|
||
roomID := string(roomIDBytes)
|
||
_, exists := sfu.GetRoom(roomID)
|
||
if !exists {
|
||
logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID)
|
||
continue
|
||
}
|
||
peerIdLen, _ := buffer.GetUint32()
|
||
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
|
||
peerID := string(peerIDBytes)
|
||
logger.LogSuccessMessage("peer " + peerID + " joined to room " + roomID)
|
||
offerLength, _ := buffer.GetUint32()
|
||
offerBytes, _ := buffer.GetBytes(int(offerLength))
|
||
var offer webrtc.SessionDescription
|
||
err := json.Unmarshal(offerBytes, &offer)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
answer, err := sfu.JoinWithOffer(roomID, peerID, offer)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
answerBytes, _ := json.Marshal(answer)
|
||
// Подготовка ответа для клиента с SDP answer
|
||
response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes))
|
||
response.Put(0x05)
|
||
response.PutUint32(uint32(len([]byte(roomID))))
|
||
response.PutBytes([]byte(roomID))
|
||
response.PutUint32(uint32(len([]byte(peerID))))
|
||
response.PutBytes([]byte(peerID))
|
||
response.PutUint32(uint32(len(answerBytes)))
|
||
response.PutBytes(answerBytes)
|
||
response.Flip()
|
||
// Отправляем ответ клиенту
|
||
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes())
|
||
continue
|
||
}
|
||
|
||
//ICE-candidate пришел от участника комнаты
|
||
if packetId == 0x06 {
|
||
roomIdLen, _ := buffer.GetUint32()
|
||
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
|
||
roomID := string(roomIDBytes)
|
||
peerIdLen, _ := buffer.GetUint32()
|
||
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
|
||
peerID := string(peerIDBytes)
|
||
candidateLength, _ := buffer.GetUint32()
|
||
candidateBytes, _ := buffer.GetBytes(int(candidateLength))
|
||
var candidate webrtc.ICECandidateInit
|
||
err := json.Unmarshal(candidateBytes, &candidate)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to unmarshal ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
err = sfu.AddICECandidate(roomID, peerID, candidate)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to add ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
}
|
||
|
||
// SDP ANSWER от клиента при renegotiation
|
||
if packetId == 0x07 {
|
||
roomIdLen, _ := buffer.GetUint32()
|
||
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
|
||
roomID := string(roomIDBytes)
|
||
peerIdLen, _ := buffer.GetUint32()
|
||
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
|
||
peerID := string(peerIDBytes)
|
||
answerLength, _ := buffer.GetUint32()
|
||
answerBytes, _ := buffer.GetBytes(int(answerLength))
|
||
var answer webrtc.SessionDescription
|
||
err := json.Unmarshal(answerBytes, &answer)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to unmarshal answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
err = sfu.HandleClientAnswer(roomID, peerID, answer)
|
||
if err != nil {
|
||
logger.LogWarnMessage("failed to handle client answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
|
||
continue
|
||
}
|
||
}
|
||
}
|
||
}
|