Добавление и обработка событий Peer_disconnected и Room_Delete

This commit is contained in:
set
2026-03-16 19:25:55 +02:00
parent 380299295d
commit e703ac22e6
4 changed files with 33 additions and 32 deletions

View File

@@ -4,8 +4,10 @@ import (
"encoding/json" "encoding/json"
"g365sfu/bytebuffer" "g365sfu/bytebuffer"
"g365sfu/logger" "g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu" "g365sfu/sfu"
"g365sfu/socket" "g365sfu/socket"
connection "g365sfu/socket/struct"
"g365sfu/turn" "g365sfu/turn"
"net/http" "net/http"
"os" "os"
@@ -58,7 +60,7 @@ func OnLocalICECandidate(roomID string, peerID string, candidate webrtc.ICECandi
buffer := bytebuffer.Allocate( buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonCandidate), 1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonCandidate),
) )
buffer.Put(0x04) buffer.Put(byte(network.ON_LOCAL_ICE_CANDIDATE))
buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID)) buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID)))) buffer.PutUint32(uint32(len([]byte(peerID))))
@@ -80,7 +82,7 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription
buffer := bytebuffer.Allocate( buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonOffer), 1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonOffer),
) )
buffer.Put(0x08) buffer.Put(byte(network.ON_SERVER_OFFER))
buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID)) buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID)))) buffer.PutUint32(uint32(len([]byte(peerID))))
@@ -91,32 +93,22 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription
room.Server.WriteBinary(buffer.Bytes()) room.Server.WriteBinary(buffer.Bytes())
} }
func OnRoomDelete(roomID string) { func OnRoomDelete(roomID string, server *connection.Connection) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send room delete event to non existing room " + roomID)
return
}
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID))) buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)))
buffer.Put(0x10) buffer.Put(byte(network.ON_ROOM_DELETE))
buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID)) buffer.PutBytes([]byte(roomID))
buffer.Flip() buffer.Flip()
room.Server.WriteBinary(buffer.Bytes()) server.WriteBinary(buffer.Bytes())
} }
func OnPeerDisconnected(roomID string, peerID string) { func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send peer disconnected event to non existing room " + roomID)
return
}
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID))) buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)))
buffer.Put(0x11) buffer.Put(byte(network.ON_PEER_DISCONNECTED))
buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID)) buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID)))) buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID)) buffer.PutBytes([]byte(peerID))
buffer.Flip() buffer.Flip()
room.Server.WriteBinary(buffer.Bytes()) server.WriteBinary(buffer.Bytes())
} }

View File

@@ -144,6 +144,7 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
func DeleteRoom(roomID string) error { func DeleteRoom(roomID string) error {
roomsMu.Lock() roomsMu.Lock()
room, exists := rooms[roomID] room, exists := rooms[roomID]
server := room.Server
if !exists { if !exists {
roomsMu.Unlock() roomsMu.Unlock()
return ErrRoomNotFound return ErrRoomNotFound
@@ -163,6 +164,8 @@ func DeleteRoom(roomID string) error {
cleanupForwardingState(roomID, p.PeerID) cleanupForwardingState(roomID, p.PeerID)
} }
OnRoomDelete(roomID, server)
return nil return nil
} }
@@ -218,7 +221,7 @@ func LeaveRoom(roomID string, peerID string) error {
return DeleteRoom(roomID) return DeleteRoom(roomID)
} }
// Опционально: renegotiation оставшимся peer после удаления треков/peer // renegotiation оставшимся peer после удаления треков/peer
room.mu.RLock() room.mu.RLock()
rest := make([]Peer, len(room.Peers)) rest := make([]Peer, len(room.Peers))
copy(rest, room.Peers) copy(rest, room.Peers)

View File

@@ -3,6 +3,7 @@ package sfu
import ( import (
"errors" "errors"
"g365sfu/logger" "g365sfu/logger"
connection "g365sfu/socket/struct"
"os" "os"
"github.com/pion/interceptor" "github.com/pion/interceptor"
@@ -21,10 +22,10 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit) var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
// Коллбек для обработки отключения пира (обрыв связи) // Коллбек для обработки отключения пира (обрыв связи)
var OnPeerDisconnected func(roomID, peerID string) var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection)
// Коллбек для обработки удаления комнаты // Коллбек для обработки удаления комнаты
var OnRoomDelete func(roomID string) var OnRoomDelete func(roomID string, server *connection.Connection)
// Ошибки // Ошибки
var ( var (

View File

@@ -4,6 +4,7 @@ import (
"encoding/json" "encoding/json"
"g365sfu/bytebuffer" "g365sfu/bytebuffer"
"g365sfu/logger" "g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu" "g365sfu/sfu"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"g365sfu/utils" "g365sfu/utils"
@@ -68,7 +69,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
// Логика обработки байтов // Логика обработки байтов
buffer := bytebuffer.Wrap(bytes) buffer := bytebuffer.Wrap(bytes)
packetId, _ := buffer.Get() packetId, _ := buffer.Get()
if packetId == 0x01 { if packetId == byte(network.HANDSHAKE) {
// Это рукопожатие, дальше сравниваем секретные ключи // Это рукопожатие, дальше сравниваем секретные ключи
secretLength, _ := buffer.GetUint32() secretLength, _ := buffer.GetUint32()
@@ -78,13 +79,17 @@ func processData(data <-chan []byte, connection *connection.Connection) {
AddSocket(connection) AddSocket(connection)
// Подготовка ответа для клиента о успешном рукопожатии // Подготовка ответа для клиента о успешном рукопожатии
response := bytebuffer.Allocate(1) response := bytebuffer.Allocate(1)
response.Put(0x01) response.Put(byte(network.HANDSHAKE_SUCCESS))
response.Flip() response.Flip()
// Отправляем ответ клиенту // Отправляем ответ клиенту
connection.WriteBinary(response.Bytes()) connection.WriteBinary(response.Bytes())
continue continue
} }
connection.WriteBinary([]byte{0xFF}) response := bytebuffer.Allocate(1)
response.Put(byte(network.HANDSHAKE_FAILURE))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
connection.Close() connection.Close()
return return
@@ -98,7 +103,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
// Создание комнаты // Создание комнаты
if packetId == 0x02 { if packetId == byte(network.ROOM_CREATE) {
roomLength, _ := buffer.GetUint32() roomLength, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomLength)) roomIDBytes, _ := buffer.GetBytes(int(roomLength))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -106,7 +111,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
logger.LogSuccessMessage("room initializated " + room.RoomID) logger.LogSuccessMessage("room initializated " + room.RoomID)
// Подготовка ответа для клиента с ID комнаты // Подготовка ответа для клиента с ID комнаты
response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID))) response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
response.Put(0x02) response.Put(byte(network.ROOM_CREATE_SUCCESS))
response.PutUint32(uint32(len([]byte(room.RoomID)))) response.PutUint32(uint32(len([]byte(room.RoomID))))
response.PutBytes([]byte(room.RoomID)) response.PutBytes([]byte(room.RoomID))
response.Flip() response.Flip()
@@ -116,7 +121,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
//SDP OFFER для подключения к комнате //SDP OFFER для подключения к комнате
if packetId == 0x03 { if packetId == byte(network.SDP_OFFER) {
roomIdLen, _ := buffer.GetUint32() roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -145,7 +150,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
answerBytes, _ := json.Marshal(answer) answerBytes, _ := json.Marshal(answer)
// Подготовка ответа для клиента с SDP answer // Подготовка ответа для клиента с SDP answer
response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes)) response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes))
response.Put(0x05) response.Put(byte(network.SDP_ANSWER))
response.PutUint32(uint32(len([]byte(roomID)))) response.PutUint32(uint32(len([]byte(roomID))))
response.PutBytes([]byte(roomID)) response.PutBytes([]byte(roomID))
response.PutUint32(uint32(len([]byte(peerID)))) response.PutUint32(uint32(len([]byte(peerID))))
@@ -159,7 +164,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
//ICE-candidate пришел от участника комнаты //ICE-candidate пришел от участника комнаты
if packetId == 0x06 { if packetId == byte(network.ICE_CANDIDATE) {
roomIdLen, _ := buffer.GetUint32() roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -182,7 +187,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
// SDP ANSWER от клиента при renegotiation // SDP ANSWER от клиента при renegotiation
if packetId == 0x07 { if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) {
roomIdLen, _ := buffer.GetUint32() roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -204,10 +209,10 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
} }
//Check life для проверки соединения с сервером SFU //Check life для проверки соединения с сервером SFU
if packetId == 0xAE { if packetId == byte(network.CHECK_LIFE) {
// Подготовка ответа для клиента о том, что соединение живо // Подготовка ответа для клиента о том, что соединение живо
response := bytebuffer.Allocate(1) response := bytebuffer.Allocate(1)
response.Put(0xAE) response.Put(byte(network.CHECK_LIFE_SUCCESS))
response.Flip() response.Flip()
// Отправляем ответ клиенту // Отправляем ответ клиенту
connection.WriteBinary(response.Bytes()) connection.WriteBinary(response.Bytes())