From feb4a51ab002610e195a40d6efebafc3e760598e Mon Sep 17 00:00:00 2001 From: set Date: Fri, 20 Mar 2026 23:19:22 +0200 Subject: [PATCH] =?UTF-8?q?=D0=91=D1=83=D1=84=D0=B5=D1=80=D1=8B=20RTP=20?= =?UTF-8?q?=D0=BF=D0=B0=D0=BA=D0=B5=D1=82=D0=BE=D0=B2=20=D0=B4=D0=BB=D1=8F?= =?UTF-8?q?=20=D1=81=D0=BE=D0=BA=D1=80=D0=B0=D1=89=D0=B5=D0=BD=D0=B8=D1=8F?= =?UTF-8?q?=20=D0=B7=D0=B0=D0=B4=D0=B5=D1=80=D0=B6=D0=B5=D0=BA=20=D0=B5?= =?UTF-8?q?=D1=81=D0=BB=D0=B8=20downstream=20=D0=BC=D0=B5=D0=B4=D0=BB?= =?UTF-8?q?=D0=B5=D0=BD=D0=BD=D1=8B=D0=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sfu/forwarding.go | 109 ++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 90 insertions(+), 19 deletions(-) diff --git a/sfu/forwarding.go b/sfu/forwarding.go index 72f31be..9289c75 100644 --- a/sfu/forwarding.go +++ b/sfu/forwarding.go @@ -9,6 +9,7 @@ import ( "time" "github.com/pion/rtcp" + "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) @@ -20,7 +21,13 @@ var ( renegPending sync.Map // key -> bool ) -const disconnectGracePeriod = 12 * time.Second +const ( + disconnectGracePeriod = 30 * time.Second + outboundPacketBuffer = 256 + maxConsecutiveReadErrs = 25 + maxConsecutiveWriteErrs = 50 + packetDropLogEvery = 100 +) func peerKey(roomID, peerID string) string { return roomID + "|" + peerID @@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) { if !ok { return } + room.mu.Lock() defer room.mu.Unlock() @@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) { func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { state := pc.ConnectionState() return state != webrtc.PeerConnectionStateClosed && - state != webrtc.PeerConnectionStateFailed && - state != webrtc.PeerConnectionStateDisconnected + state != webrtc.PeerConnectionStateFailed } // Вешается на каждый PeerConnection при JoinWithOffer. @@ -103,6 +110,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { cancelTimer := func() { mu.Lock() defer mu.Unlock() + if timer != nil { timer.Stop() timer = nil @@ -123,9 +131,11 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { startTimer := func() { mu.Lock() defer mu.Unlock() + if disconnecting { return } + disconnecting = true timer = time.AfterFunc(disconnectGracePeriod, func() { state := pc.ConnectionState() @@ -166,8 +176,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { }) } -// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты -// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты +// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты. func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { localTrackID := fmt.Sprintf("%s:%s:%s:%d", @@ -232,33 +241,100 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * subPC := sub.PeerConnection go func() { - logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID) if err := renegotiatePeer(roomID, subID, subPC); err != nil { logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error()) } }() } - // Для video просим keyframe if remote.Kind() == webrtc.RTPCodecTypeVideo { _ = publisherPC.WriteRTCP([]rtcp.Packet{ &rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())}, }) } - // Publisher RTP -> localTrack -> subscribers + // Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью. + // Если downstream начинает тормозить, пакеты дропаются из очереди, + // а не накапливают бесконечную задержку. + packetQueue := make(chan *rtp.Packet, outboundPacketBuffer) + writerDone := make(chan struct{}) + defer close(packetQueue) + + go func() { + defer close(writerDone) + + consecutiveWriteErrs := 0 + for pkt := range packetQueue { + if err := localTrack.WriteRTP(pkt); err != nil { + consecutiveWriteErrs++ + if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 { + logger.LogWarnMessage(fmt.Sprintf( + "SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v", + publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err, + )) + } + if consecutiveWriteErrs >= maxConsecutiveWriteErrs { + logger.LogWarnMessage(fmt.Sprintf( + "SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s", + publisherPeerID, localTrack.ID(), + )) + return + } + continue + } + consecutiveWriteErrs = 0 + } + }() + + consecutiveReadErrs := 0 + droppedPackets := 0 + for { pkt, _, err := remote.ReadRTP() if err != nil { if err == io.EOF { return } - logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error()) - return + + consecutiveReadErrs++ + if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 { + logger.LogWarnMessage(fmt.Sprintf( + "SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v", + publisherPeerID, localTrack.ID(), consecutiveReadErrs, err, + )) + } + if consecutiveReadErrs >= maxConsecutiveReadErrs { + logger.LogWarnMessage(fmt.Sprintf( + "SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s", + publisherPeerID, localTrack.ID(), + )) + return + } + + time.Sleep(10 * time.Millisecond) + continue } - if err = localTrack.WriteRTP(pkt); err != nil { - logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error()) + + consecutiveReadErrs = 0 + + select { + case <-writerDone: return + default: + } + + select { + case <-writerDone: + return + case packetQueue <- pkt: + default: + droppedPackets++ + if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 { + logger.LogWarnMessage(fmt.Sprintf( + "SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d", + publisherPeerID, localTrack.ID(), droppedPackets, + )) + } } } }) @@ -273,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { return nil } - // Не начинаем новую negotiation поверх текущей. if pc.SignalingState() != webrtc.SignalingStateStable { setRenegPending(roomID, peerID, true) return nil @@ -296,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { return nil } -// Добавляет ICE-кандидата к пиру +// Добавляет ICE-кандидата к пиру. func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error { room, exists := GetRoom(roomID) if !exists { @@ -319,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate rd := pc.RemoteDescription() if rd == nil { - // offer/answer еще не применен — буферизуем pendingMu.Lock() k := peerKey(roomID, peerID) pendingCandidates[k] = append(pendingCandidates[k], candidate) @@ -327,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate return nil } - // Отбрасываем stale candidate по ufrag if candidate.UsernameFragment != nil { current := extractICEUfrag(rd.SDP) if current != "" && *candidate.UsernameFragment != current { @@ -342,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate return err } -// Обрабатывает SDP answer от клиента при renegotiation +// Обрабатывает SDP answer от клиента при renegotiation. func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { if answer.Type != webrtc.SDPTypeAnswer { return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) @@ -371,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr return err } - // После применения answer — применяем отложенные кандидаты. k := peerKey(roomID, peerID) pendingMu.Lock() queue := pendingCandidates[k] @@ -382,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr _ = AddICECandidate(roomID, peerID, c) } - // Если во время negotiation накопился новый запрос — запускаем еще цикл. if popRenegPending(roomID, peerID) { _ = renegotiatePeer(roomID, peerID, pc) }