diff --git a/boot/boot.go b/boot/boot.go index aac3aba..d1a24f1 100644 --- a/boot/boot.go +++ b/boot/boot.go @@ -9,7 +9,6 @@ import ( "net/http" "os" - "github.com/gorilla/websocket" "github.com/joho/godotenv" "github.com/pion/webrtc/v4" ) @@ -51,7 +50,7 @@ func OnLocalICECandidate(roomID string, peerID string, candidate webrtc.ICECandi buffer.PutUint32(uint32(len([]byte(jsonCandidate)))) buffer.PutBytes([]byte(jsonCandidate)) buffer.Flip() - room.Server.Socket.WriteMessage(websocket.BinaryMessage, buffer.Bytes()) + room.Server.WriteBinary(buffer.Bytes()) } // Обработка нового оффера от сервера для конкретного пира (при renegotiation) @@ -73,5 +72,5 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription buffer.PutUint32(uint32(len([]byte(jsonOffer)))) buffer.PutBytes([]byte(jsonOffer)) buffer.Flip() - room.Server.Socket.WriteMessage(websocket.BinaryMessage, buffer.Bytes()) + room.Server.WriteBinary(buffer.Bytes()) } diff --git a/sfu/forwarding.go b/sfu/forwarding.go index 8a0679f..18b2d52 100644 --- a/sfu/forwarding.go +++ b/sfu/forwarding.go @@ -3,10 +3,31 @@ package sfu import ( "fmt" "io" + "strings" + "sync" "github.com/pion/webrtc/v4" ) +var ( + pendingMu sync.Mutex + pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID +) + +func peerKey(roomID, peerID string) string { + return roomID + "|" + peerID +} + +func extractICEUfrag(sdp string) string { + for _, line := range strings.Split(sdp, "\n") { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "a=ice-ufrag:") { + return strings.TrimPrefix(line, "a=ice-ufrag:") + } + } + return "" +} + // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { @@ -95,21 +116,47 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate return ErrRoomNotFound } - for _, peer := range room.Peers { - if peer.PeerID == peerID { - return peer.PeerConnection.AddICECandidate(candidate) + var pc *webrtc.PeerConnection + for _, p := range room.Peers { + if p.PeerID == peerID { + pc = p.PeerConnection + break + } + } + if pc == nil { + return ErrPeerNotFound + } + + rd := pc.RemoteDescription() + if rd == nil { + // answer/offer еще не применен — буферизуем + pendingMu.Lock() + k := peerKey(roomID, peerID) + pendingCandidates[k] = append(pendingCandidates[k], candidate) + pendingMu.Unlock() + return nil + } + + // отбрасываем stale candidate по ufrag + if candidate.UsernameFragment != nil { + current := extractICEUfrag(rd.SDP) + if current != "" && *candidate.UsernameFragment != current { + return nil } } - return ErrPeerNotFound + err := pc.AddICECandidate(candidate) + if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") { + // поздний старый кандидат — игнорируем + return nil + } + return err } // Обрабатывает SDP ответ от клиента при renegotiation func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { - roomsMu.RLock() - room, ok := rooms[roomID] - if !ok { - roomsMu.RUnlock() + room, exists := GetRoom(roomID) + if !exists { return ErrRoomNotFound } @@ -120,10 +167,24 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr break } } - roomsMu.RUnlock() - if pc == nil { return ErrPeerNotFound } - return pc.SetRemoteDescription(answer) + + if err := pc.SetRemoteDescription(answer); err != nil { + return err + } + + // после применения answer — применяем отложенные кандидаты + k := peerKey(roomID, peerID) + pendingMu.Lock() + queue := pendingCandidates[k] + delete(pendingCandidates, k) + pendingMu.Unlock() + + for _, c := range queue { + _ = AddICECandidate(roomID, peerID, c) + } + + return nil } diff --git a/socket/socket.go b/socket/socket.go index a132348..68be3f1 100644 --- a/socket/socket.go +++ b/socket/socket.go @@ -81,12 +81,12 @@ func processData(data <-chan []byte, connection *connection.Connection) { response.Put(0x01) response.Flip() // Отправляем ответ клиенту - connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes()) + connection.WriteBinary(response.Bytes()) continue } - connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0xFF}) + connection.WriteBinary([]byte{0xFF}) logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") - connection.Socket.Close() + connection.Close() return } @@ -111,7 +111,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { response.PutBytes([]byte(room.RoomID)) response.Flip() // Отправляем ответ клиенту - connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes()) + connection.WriteBinary(response.Bytes()) continue } @@ -154,7 +154,7 @@ func processData(data <-chan []byte, connection *connection.Connection) { response.PutBytes(answerBytes) response.Flip() // Отправляем ответ клиенту - connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes()) + connection.WriteBinary(response.Bytes()) continue } diff --git a/socket/struct/connection.go b/socket/struct/connection.go index bda4331..31795bc 100644 --- a/socket/struct/connection.go +++ b/socket/struct/connection.go @@ -1,11 +1,38 @@ package connection import ( + "sync" + "github.com/gorilla/websocket" ) type Connection struct { Identificator string - //Подсоединенный сокет - Socket *websocket.Conn + Socket *websocket.Conn + + writeMu sync.Mutex + closeMu sync.Mutex + closed bool +} + +func (c *Connection) WriteBinary(payload []byte) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.Socket.WriteMessage(websocket.BinaryMessage, payload) +} + +func (c *Connection) WriteJSON(v any) error { + c.writeMu.Lock() + defer c.writeMu.Unlock() + return c.Socket.WriteJSON(v) +} + +func (c *Connection) Close() error { + c.closeMu.Lock() + defer c.closeMu.Unlock() + if c.closed { + return nil + } + c.closed = true + return c.Socket.Close() }