From 6dadec6b646925f03124c31a9f37984362eff663 Mon Sep 17 00:00:00 2001 From: set Date: Tue, 17 Mar 2026 19:17:02 +0200 Subject: [PATCH] =?UTF-8?q?=D0=98=D1=81=D0=BF=D1=80=D0=B0=D0=B2=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=B8=D0=B5=20Race=20=D0=BF=D1=80=D0=B8=20renegoti?= =?UTF-8?q?ation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- boot/boot.go | 7 ++-- logger/logger.go | 4 +-- sfu/forwarding.go | 84 ++++++++++++++++++++++++++--------------------- sfu/reason.go | 11 +++++++ sfu/rooms.go | 79 +++++++++++++++++++++++++------------------- sfu/sfu.go | 2 +- 6 files changed, 109 insertions(+), 78 deletions(-) create mode 100644 sfu/reason.go diff --git a/boot/boot.go b/boot/boot.go index 4009b22..933e974 100644 --- a/boot/boot.go +++ b/boot/boot.go @@ -59,7 +59,7 @@ func Bootstrap() { // TURN сервер выключен в конфиге, что может влиять на соединение некоторых пользователей logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs") } - logger.LogInfoMessage("server started at x.x.x.x:" + port) + logger.LogInfoMessage("server SFU started at x.x.x.x:" + port) http.ListenAndServe(":"+port, nil) } @@ -116,13 +116,14 @@ func OnRoomDelete(roomID string, server *connection.Connection) { server.WriteBinary(buffer.Bytes()) } -func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection) { - buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID))) +func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection, reason sfu.DisconnectReason) { + buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4) buffer.Put(byte(network.ON_PEER_DISCONNECTED)) buffer.PutUint32(uint32(len([]byte(roomID)))) buffer.PutBytes([]byte(roomID)) buffer.PutUint32(uint32(len([]byte(peerID)))) buffer.PutBytes([]byte(peerID)) + buffer.PutUint32(uint32(reason)) buffer.Flip() server.WriteBinary(buffer.Bytes()) } diff --git a/logger/logger.go b/logger/logger.go index 61cbadc..0fe8bb0 100644 --- a/logger/logger.go +++ b/logger/logger.go @@ -20,7 +20,7 @@ func LogInfoMessage(message string) { fmt.Printf("%s[g365sfu] %s[%s]%s %s[INFO]%s %s\n", colorBlue, colorGray, timestamp, colorReset, - colorGreen, colorReset, + colorCyan, colorReset, message, ) } @@ -60,7 +60,7 @@ func LogSuccessMessage(message string) { fmt.Printf("%s[g365sfu] %s[%s]%s %s[SUCCESS]%s %s\n", colorBlue, colorGray, timestamp, colorReset, - colorCyan, colorReset, + colorGreen, colorReset, message, ) } diff --git a/sfu/forwarding.go b/sfu/forwarding.go index f1e985b..72f31be 100644 --- a/sfu/forwarding.go +++ b/sfu/forwarding.go @@ -90,6 +90,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { mu sync.Mutex disconnecting bool timer *time.Timer + leaveOnce sync.Once ) room, exists := GetRoom(roomID) @@ -99,6 +100,26 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { } server := room.Server + cancelTimer := func() { + mu.Lock() + defer mu.Unlock() + if timer != nil { + timer.Stop() + timer = nil + } + disconnecting = false + } + + leaveAndNotify := func(reason DisconnectReason) { + leaveOnce.Do(func() { + cancelTimer() + err := LeaveRoom(roomID, peerID) + if OnPeerDisconnected != nil && err == nil { + OnPeerDisconnected(roomID, peerID, server, reason) + } + }) + } + startTimer := func() { mu.Lock() defer mu.Unlock() @@ -114,35 +135,20 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { mu.Unlock() return } - _ = LeaveRoom(roomID, peerID) - if OnPeerDisconnected != nil { - OnPeerDisconnected(roomID, peerID, server) - } + leaveAndNotify(DisconnectReasonFailed) }) } - cancelTimer := func() { - mu.Lock() - defer mu.Unlock() - if timer != nil { - timer.Stop() - timer = nil - } - disconnecting = false - } - pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { switch state { case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted: cancelTimer() case webrtc.ICEConnectionStateDisconnected: startTimer() - case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed: - cancelTimer() - _ = LeaveRoom(roomID, peerID) - if OnPeerDisconnected != nil { - OnPeerDisconnected(roomID, peerID, server) - } + case webrtc.ICEConnectionStateClosed: + leaveAndNotify(DisconnectReasonClosed) + case webrtc.ICEConnectionStateFailed: + leaveAndNotify(DisconnectReasonFailed) } }) @@ -152,16 +158,15 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { cancelTimer() case webrtc.PeerConnectionStateDisconnected: startTimer() - case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed: - cancelTimer() - _ = LeaveRoom(roomID, peerID) - if OnPeerDisconnected != nil { - OnPeerDisconnected(roomID, peerID, server) - } + case webrtc.PeerConnectionStateClosed: + leaveAndNotify(DisconnectReasonClosed) + case webrtc.PeerConnectionStateFailed: + leaveAndNotify(DisconnectReasonFailed) } }) } +// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { @@ -178,7 +183,7 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * remote.StreamID(), ) if err != nil { - logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error") + logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error()) return } defer removeRoomTrack(roomID, localTrack.ID()) @@ -203,32 +208,35 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * continue } - // Не трогаем закрытые/failed соединения if !isPeerConnectionAlive(sub.PeerConnection) { - fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID, - sub.PeerConnection.ConnectionState().String()) continue } sender, err := sub.PeerConnection.AddTrack(localTrack) if err != nil { - fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err) + logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error()) continue } - // RTCP drain + senderCopy := sender go func() { buf := make([]byte, 1500) for { - if _, _, e := sender.Read(buf); e != nil { + if _, _, e := senderCopy.Read(buf); e != nil { return } } }() - if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil { - fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err) - } + subID := sub.PeerID + subPC := sub.PeerConnection + + go func() { + logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID) + if err := renegotiatePeer(roomID, subID, subPC); err != nil { + logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error()) + } + }() } // Для video просим keyframe @@ -245,11 +253,11 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * if err == io.EOF { return } - fmt.Println("SetupForwardingForPeer: ReadRTP error:", err) + logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error()) return } if err = localTrack.WriteRTP(pkt); err != nil { - fmt.Println("SetupForwardingForPeer: WriteRTP error:", err) + logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error()) return } } diff --git a/sfu/reason.go b/sfu/reason.go new file mode 100644 index 0000000..41e514c --- /dev/null +++ b/sfu/reason.go @@ -0,0 +1,11 @@ +package sfu + +// Причины отключения пира от комнаты, которые могут быть использованы для логирования или уведомлений +type DisconnectReason int + +const ( + // Пир отключился из-за ошибки соединения или другой проблемы + DisconnectReasonFailed DisconnectReason = 0 + // Пир отключился по своей инициативе (например, закрыл приложение) + DisconnectReasonClosed = 1 +) diff --git a/sfu/rooms.go b/sfu/rooms.go index 0dab757..06bfa8e 100644 --- a/sfu/rooms.go +++ b/sfu/rooms.go @@ -1,6 +1,7 @@ package sfu import ( + "g365sfu/logger" connection "g365sfu/socket/struct" "sync" @@ -76,43 +77,9 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription return nil, err } - peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { - if c == nil { - return - } - if OnLocalICECandidate != nil { - OnLocalICECandidate(roomID, peerID, c.ToJSON()) - } - }) - BindPeerLifecycle(roomID, peerID, peerConnection) SetupForwardingForPeer(roomID, peerID, peerConnection) - room.mu.RLock() - existingTracks := make([]RoomTrack, len(room.Tracks)) - copy(existingTracks, room.Tracks) - room.mu.RUnlock() - - for _, t := range existingTracks { - if t.OwnerPeer == peerID { - continue - } - - sender, err := peerConnection.AddTrack(t.Local) - if err != nil { - continue - } - - go func() { - buf := make([]byte, 1500) - for { - if _, _, e := sender.Read(buf); e != nil { - return - } - } - }() - } - if err = peerConnection.SetRemoteDescription(offer); err != nil { _ = peerConnection.Close() return nil, err @@ -131,13 +98,57 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription } <-gatherDone + peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + if OnLocalICECandidate != nil { + OnLocalICECandidate(roomID, peerID, c.ToJSON()) + } + }) + + // Добавляем peer в комнату и сразу снимаем snapshot существующих треков + // в одном локе — чтобы не было race с OnTrack room.mu.Lock() room.Peers = append(room.Peers, Peer{ PeerID: peerID, PeerConnection: peerConnection, }) + existingTracks := make([]RoomTrack, len(room.Tracks)) + copy(existingTracks, room.Tracks) room.mu.Unlock() + // Подписываем нового peer на уже существующие треки ПОСЛЕ добавления в комнату + for _, t := range existingTracks { + if t.OwnerPeer == peerID { + continue + } + + sender, err := peerConnection.AddTrack(t.Local) + if err != nil { + continue + } + + senderCopy := sender + go func() { + buf := make([]byte, 1500) + for { + if _, _, e := senderCopy.Read(buf); e != nil { + return + } + } + }() + } + + // Если были добавлены треки — нужна renegotiation + if len(existingTracks) > 0 { + go func() { + if err := renegotiatePeer(roomID, peerID, peerConnection); err != nil { + logger.LogWarnMessage("JoinWithOffer: renegotiatePeer error: " + err.Error()) + } + }() + } + return peerConnection.LocalDescription(), nil } diff --git a/sfu/sfu.go b/sfu/sfu.go index f50cfec..e94f040 100644 --- a/sfu/sfu.go +++ b/sfu/sfu.go @@ -23,7 +23,7 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit) // Коллбек для обработки отключения пира (обрыв связи) -var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection) +var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection, reason DisconnectReason) // Коллбек для обработки удаления комнаты var OnRoomDelete func(roomID string, server *connection.Connection)