package sfu import ( "fmt" "g365sfu/logger" "io" "strings" "sync" "time" "github.com/pion/rtcp" "github.com/pion/rtp" "github.com/pion/webrtc/v4" ) var ( pendingMu sync.Mutex pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID renegMu sync.Map // key -> *sync.Mutex renegPending sync.Map // key -> bool ) const ( disconnectGracePeriod = 30 * time.Second outboundPacketBuffer = 256 maxConsecutiveReadErrs = 25 maxConsecutiveWriteErrs = 50 packetDropLogEvery = 100 ) func peerKey(roomID, peerID string) string { return roomID + "|" + peerID } func getRenegLock(roomID, peerID string) *sync.Mutex { k := peerKey(roomID, peerID) v, _ := renegMu.LoadOrStore(k, &sync.Mutex{}) return v.(*sync.Mutex) } func setRenegPending(roomID, peerID string, v bool) { renegPending.Store(peerKey(roomID, peerID), v) } func popRenegPending(roomID, peerID string) bool { k := peerKey(roomID, peerID) v, ok := renegPending.Load(k) if ok { renegPending.Delete(k) } if !ok { return false } b, _ := v.(bool) return b } func extractICEUfrag(sdp string) string { for _, line := range strings.Split(sdp, "\n") { line = strings.TrimSpace(line) if strings.HasPrefix(line, "a=ice-ufrag:") { return strings.TrimPrefix(line, "a=ice-ufrag:") } } return "" } func removeRoomTrack(roomID, trackID string) { room, ok := GetRoom(roomID) if !ok { return } room.mu.Lock() defer room.mu.Unlock() out := room.Tracks[:0] for _, t := range room.Tracks { if t.TrackID != trackID { out = append(out, t) } } room.Tracks = out } func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { state := pc.ConnectionState() return state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed } // Вешается на каждый PeerConnection при JoinWithOffer. // Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции. 
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { var ( mu sync.Mutex disconnecting bool timer *time.Timer leaveOnce sync.Once ) room, exists := GetRoom(roomID) if !exists { logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID) return } server := room.Server cancelTimer := func() { mu.Lock() defer mu.Unlock() if timer != nil { timer.Stop() timer = nil } disconnecting = false } leaveAndNotify := func(reason DisconnectReason) { leaveOnce.Do(func() { cancelTimer() err := LeaveRoom(roomID, peerID) if OnPeerDisconnected != nil && err == nil { OnPeerDisconnected(roomID, peerID, server, reason) } }) } startTimer := func() { mu.Lock() defer mu.Unlock() if disconnecting { return } disconnecting = true timer = time.AfterFunc(disconnectGracePeriod, func() { state := pc.ConnectionState() if state == webrtc.PeerConnectionStateConnected { mu.Lock() disconnecting = false mu.Unlock() return } leaveAndNotify(DisconnectReasonFailed) }) } pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { switch state { case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted: cancelTimer() case webrtc.ICEConnectionStateDisconnected: startTimer() case webrtc.ICEConnectionStateClosed: leaveAndNotify(DisconnectReasonClosed) case webrtc.ICEConnectionStateFailed: leaveAndNotify(DisconnectReasonFailed) } }) pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { switch state { case webrtc.PeerConnectionStateConnected: cancelTimer() case webrtc.PeerConnectionStateDisconnected: startTimer() case webrtc.PeerConnectionStateClosed: leaveAndNotify(DisconnectReasonClosed) case webrtc.PeerConnectionStateFailed: leaveAndNotify(DisconnectReasonFailed) } }) } // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты. 
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
	publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		// The local track ID encodes publisher, kind, remote track ID and SSRC
		// so it is unique per incoming track.
		localTrackID := fmt.Sprintf("%s:%s:%s:%d",
			publisherPeerID, remote.Kind().String(), remote.ID(), remote.SSRC(),
		)
		localTrack, err := webrtc.NewTrackLocalStaticRTP(
			remote.Codec().RTPCodecCapability, localTrackID, remote.StreamID(),
		)
		if err != nil {
			logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
			return
		}
		// Deregister the track from the room when this handler returns
		// (i.e. when the relay loop below ends for any reason).
		defer removeRoomTrack(roomID, localTrack.ID())
		room, ok := GetRoom(roomID)
		if !ok {
			return
		}
		// Register the new track, then snapshot the peer list so AddTrack and
		// renegotiation run without holding the room lock.
		room.mu.Lock()
		room.Tracks = append(room.Tracks, RoomTrack{
			TrackID:   localTrack.ID(),
			OwnerPeer: publisherPeerID,
			Local:     localTrack,
		})
		peers := make([]Peer, len(room.Peers))
		copy(peers, room.Peers)
		room.mu.Unlock()
		for _, sub := range peers {
			// Never loop the publisher's media back to itself.
			if sub.PeerID == publisherPeerID {
				continue
			}
			if !isPeerConnectionAlive(sub.PeerConnection) {
				continue
			}
			sender, err := sub.PeerConnection.AddTrack(localTrack)
			if err != nil {
				logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
				continue
			}
			// Drain incoming RTCP on the sender; the goroutine exits when the
			// sender is closed and Read starts returning errors.
			senderCopy := sender
			go func() {
				buf := make([]byte, 1500)
				for {
					if _, _, e := senderCopy.Read(buf); e != nil {
						return
					}
				}
			}()
			// Renegotiate each subscriber asynchronously so a slow peer does
			// not delay the others. Capture loop values explicitly.
			subID := sub.PeerID
			subPC := sub.PeerConnection
			go func() {
				if err := renegotiatePeer(roomID, subID, subPC); err != nil {
					logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
				}
			}()
		}
		// Ask the publisher for a keyframe so new subscribers can decode
		// video immediately.
		if remote.Kind() == webrtc.RTPCodecTypeVideo {
			_ = publisherPC.WriteRTCP([]rtcp.Packet{
				&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
			})
		}
		// Decouple reading incoming RTP from writing to the TrackLocal with a
		// bounded queue. If downstream starts lagging, packets are dropped
		// from the queue instead of accumulating unbounded latency.
		packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
		writerDone := make(chan struct{})
		// Closing the queue lets the writer goroutine drain and exit when the
		// read loop below returns.
		defer close(packetQueue)
		go func() {
			defer close(writerDone)
			consecutiveWriteErrs := 0
			for pkt := range packetQueue {
				if err := localTrack.WriteRTP(pkt); err != nil {
					consecutiveWriteErrs++
					// Log the first error and then every 10th to avoid spam.
					if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
						logger.LogWarnMessage(fmt.Sprintf(
							"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
							publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
						))
					}
					if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
						logger.LogWarnMessage(fmt.Sprintf(
							"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
							publisherPeerID, localTrack.ID(),
						))
						return
					}
					continue
				}
				consecutiveWriteErrs = 0
			}
		}()
		consecutiveReadErrs := 0
		droppedPackets := 0
		for {
			pkt, _, err := remote.ReadRTP()
			if err != nil {
				// EOF means the remote track ended normally.
				if err == io.EOF {
					return
				}
				consecutiveReadErrs++
				if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
					logger.LogWarnMessage(fmt.Sprintf(
						"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
						publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
					))
				}
				if consecutiveReadErrs >= maxConsecutiveReadErrs {
					logger.LogWarnMessage(fmt.Sprintf(
						"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
						publisherPeerID, localTrack.ID(),
					))
					return
				}
				// Brief backoff before retrying a transient read error.
				time.Sleep(10 * time.Millisecond)
				continue
			}
			consecutiveReadErrs = 0
			// Fast-path check: stop reading as soon as the writer has quit.
			select {
			case <-writerDone:
				return
			default:
			}
			// Non-blocking enqueue; a full queue drops the packet rather than
			// stalling the read loop.
			select {
			case <-writerDone:
				return
			case packetQueue <- pkt:
			default:
				droppedPackets++
				if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
					logger.LogWarnMessage(fmt.Sprintf(
						"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
						publisherPeerID, localTrack.ID(), droppedPackets,
					))
				}
			}
		}
	})
}

// renegotiatePeer creates and applies a fresh offer for one peer, then hands
// it to OnServerOffer. Calls for the same peer are serialized by a per-peer
// lock. When signaling is not stable, the request is recorded as pending and
// replayed after the next client answer (see HandleClientAnswer).
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
	lock := getRenegLock(roomID, peerID)
	lock.Lock()
	defer lock.Unlock()
	if !isPeerConnectionAlive(pc) {
		return nil
	}
	if pc.SignalingState() != webrtc.SignalingStateStable {
		// Mid-negotiation: defer until the current exchange completes.
		setRenegPending(roomID, peerID, true)
		return nil
	}
	offer, err := pc.CreateOffer(nil)
	if err != nil {
		return err
	}
	// Wait for full ICE gathering so the offer carries all candidates
	// (non-trickle signaling). NOTE(review): this blocks while holding the
	// per-peer lock for up to the gathering timeout.
	gatherDone := webrtc.GatheringCompletePromise(pc)
	if err = pc.SetLocalDescription(offer); err != nil {
		return err
	}
	<-gatherDone
	if OnServerOffer != nil && pc.LocalDescription() != nil {
		OnServerOffer(roomID, peerID, *pc.LocalDescription())
	}
	return nil
}

// Adds an ICE candidate to a peer. Candidates arriving before the remote
// description is set are buffered and replayed from HandleClientAnswer.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
	room, exists := GetRoom(roomID)
	if !exists {
		return ErrRoomNotFound
	}
	room.mu.RLock()
	var pc *webrtc.PeerConnection
	for _, p := range room.Peers {
		if p.PeerID == peerID {
			pc = p.PeerConnection
			break
		}
	}
	room.mu.RUnlock()
	if pc == nil {
		return ErrPeerNotFound
	}
	rd := pc.RemoteDescription()
	if rd == nil {
		// Too early: queue until SetRemoteDescription has run.
		pendingMu.Lock()
		k := peerKey(roomID, peerID)
		pendingCandidates[k] = append(pendingCandidates[k], candidate)
		pendingMu.Unlock()
		return nil
	}
	// Drop candidates from a previous ICE generation: their ufrag no longer
	// matches the current remote description.
	if candidate.UsernameFragment != nil {
		current := extractICEUfrag(rd.SDP)
		if current != "" && *candidate.UsernameFragment != current {
			return nil
		}
	}
	err := pc.AddICECandidate(candidate)
	// Same stale-generation condition reported by the ICE agent itself;
	// treat it as a benign no-op rather than an error.
	if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
		return nil
	}
	return err
}

// Handles the SDP answer from a client during renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { if answer.Type != webrtc.SDPTypeAnswer { return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) } room, exists := GetRoom(roomID) if !exists { return ErrRoomNotFound } room.mu.RLock() var pc *webrtc.PeerConnection for _, p := range room.Peers { if p.PeerID == peerID { pc = p.PeerConnection break } } room.mu.RUnlock() if pc == nil { return ErrPeerNotFound } if err := pc.SetRemoteDescription(answer); err != nil { return err } k := peerKey(roomID, peerID) pendingMu.Lock() queue := pendingCandidates[k] delete(pendingCandidates, k) pendingMu.Unlock() for _, c := range queue { _ = AddICECandidate(roomID, peerID, c) } if popRenegPending(roomID, peerID) { _ = renegotiatePeer(roomID, peerID, pc) } return nil } func cleanupForwardingState(roomID, peerID string) { k := peerKey(roomID, peerID) pendingMu.Lock() delete(pendingCandidates, k) pendingMu.Unlock() renegPending.Delete(k) renegMu.Delete(k) }