package sfu import ( "fmt" "g365sfu/logger" "io" "strings" "sync" "time" "github.com/pion/rtcp" "github.com/pion/webrtc/v4" ) var ( pendingMu sync.Mutex pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID renegMu sync.Map // key -> *sync.Mutex renegPending sync.Map // key -> bool ) const disconnectGracePeriod = 12 * time.Second func peerKey(roomID, peerID string) string { return roomID + "|" + peerID } func getRenegLock(roomID, peerID string) *sync.Mutex { k := peerKey(roomID, peerID) v, _ := renegMu.LoadOrStore(k, &sync.Mutex{}) return v.(*sync.Mutex) } func setRenegPending(roomID, peerID string, v bool) { renegPending.Store(peerKey(roomID, peerID), v) } func popRenegPending(roomID, peerID string) bool { k := peerKey(roomID, peerID) v, ok := renegPending.Load(k) if ok { renegPending.Delete(k) } if !ok { return false } b, _ := v.(bool) return b } func extractICEUfrag(sdp string) string { for _, line := range strings.Split(sdp, "\n") { line = strings.TrimSpace(line) if strings.HasPrefix(line, "a=ice-ufrag:") { return strings.TrimPrefix(line, "a=ice-ufrag:") } } return "" } func removeRoomTrack(roomID, trackID string) { room, ok := GetRoom(roomID) if !ok { return } room.mu.Lock() defer room.mu.Unlock() out := room.Tracks[:0] for _, t := range room.Tracks { if t.TrackID != trackID { out = append(out, t) } } room.Tracks = out } func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { state := pc.ConnectionState() return state != webrtc.PeerConnectionStateClosed && state != webrtc.PeerConnectionStateFailed && state != webrtc.PeerConnectionStateDisconnected } // Вешается на каждый PeerConnection при JoinWithOffer. // Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции. func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { var ( mu sync.Mutex disconnecting bool timer *time.Timer ) room, exists := GetRoom(roomID) if !exists { logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID) return } server := room.Server startTimer := func() { mu.Lock() defer mu.Unlock() if disconnecting { return } disconnecting = true timer = time.AfterFunc(disconnectGracePeriod, func() { state := pc.ConnectionState() if state == webrtc.PeerConnectionStateConnected { mu.Lock() disconnecting = false mu.Unlock() return } _ = LeaveRoom(roomID, peerID) if OnPeerDisconnected != nil { OnPeerDisconnected(roomID, peerID, server) } }) } cancelTimer := func() { mu.Lock() defer mu.Unlock() if timer != nil { timer.Stop() timer = nil } disconnecting = false } pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { switch state { case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted: cancelTimer() case webrtc.ICEConnectionStateDisconnected: startTimer() case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed: cancelTimer() _ = LeaveRoom(roomID, peerID) if OnPeerDisconnected != nil { OnPeerDisconnected(roomID, peerID, server) } } }) pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { switch state { case webrtc.PeerConnectionStateConnected: cancelTimer() case webrtc.PeerConnectionStateDisconnected: startTimer() case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed: cancelTimer() _ = LeaveRoom(roomID, peerID) if OnPeerDisconnected != nil { OnPeerDisconnected(roomID, peerID, server) } } }) } // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { localTrackID := fmt.Sprintf("%s:%s:%s:%d", publisherPeerID, remote.Kind().String(), remote.ID(), remote.SSRC(), ) localTrack, err := webrtc.NewTrackLocalStaticRTP( remote.Codec().RTPCodecCapability, localTrackID, remote.StreamID(), ) if err != nil { logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error") return } defer removeRoomTrack(roomID, localTrack.ID()) room, ok := GetRoom(roomID) if !ok { return } room.mu.Lock() room.Tracks = append(room.Tracks, RoomTrack{ TrackID: localTrack.ID(), OwnerPeer: publisherPeerID, Local: localTrack, }) peers := make([]Peer, len(room.Peers)) copy(peers, room.Peers) room.mu.Unlock() for _, sub := range peers { if sub.PeerID == publisherPeerID { continue } // Не трогаем закрытые/failed соединения if !isPeerConnectionAlive(sub.PeerConnection) { fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID, sub.PeerConnection.ConnectionState().String()) continue } sender, err := sub.PeerConnection.AddTrack(localTrack) if err != nil { fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err) continue } // RTCP drain go func() { buf := make([]byte, 1500) for { if _, _, e := sender.Read(buf); e != nil { return } } }() if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil { fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err) } } // Для video просим keyframe if remote.Kind() == webrtc.RTPCodecTypeVideo { _ = publisherPC.WriteRTCP([]rtcp.Packet{ &rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())}, }) } // Publisher RTP -> localTrack -> subscribers for { pkt, _, err := remote.ReadRTP() if err != nil { if err == io.EOF { return } fmt.Println("SetupForwardingForPeer: ReadRTP error:", err) return } if err = localTrack.WriteRTP(pkt); err != nil { fmt.Println("SetupForwardingForPeer: WriteRTP error:", err) return } } }) } func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { lock := getRenegLock(roomID, peerID) lock.Lock() defer lock.Unlock() if !isPeerConnectionAlive(pc) { return nil } // Не начинаем новую negotiation поверх текущей. if pc.SignalingState() != webrtc.SignalingStateStable { setRenegPending(roomID, peerID, true) return nil } offer, err := pc.CreateOffer(nil) if err != nil { return err } gatherDone := webrtc.GatheringCompletePromise(pc) if err = pc.SetLocalDescription(offer); err != nil { return err } <-gatherDone if OnServerOffer != nil && pc.LocalDescription() != nil { OnServerOffer(roomID, peerID, *pc.LocalDescription()) } return nil } // Добавляет ICE-кандидата к пиру func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error { room, exists := GetRoom(roomID) if !exists { return ErrRoomNotFound } room.mu.RLock() var pc *webrtc.PeerConnection for _, p := range room.Peers { if p.PeerID == peerID { pc = p.PeerConnection break } } room.mu.RUnlock() if pc == nil { return ErrPeerNotFound } rd := pc.RemoteDescription() if rd == nil { // offer/answer еще не применен — буферизуем pendingMu.Lock() k := peerKey(roomID, peerID) pendingCandidates[k] = append(pendingCandidates[k], candidate) pendingMu.Unlock() return nil } // Отбрасываем stale candidate по ufrag if candidate.UsernameFragment != nil { current := extractICEUfrag(rd.SDP) if current != "" && *candidate.UsernameFragment != current { return nil } } err := pc.AddICECandidate(candidate) if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") { return nil } return err } // Обрабатывает SDP answer от клиента при renegotiation func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { if answer.Type != webrtc.SDPTypeAnswer { return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) } room, exists := GetRoom(roomID) if !exists { return ErrRoomNotFound } room.mu.RLock() var pc *webrtc.PeerConnection for _, p := range room.Peers { if p.PeerID == peerID { pc = p.PeerConnection break } } room.mu.RUnlock() if pc == nil { return ErrPeerNotFound } if err := pc.SetRemoteDescription(answer); err != nil { return err } // После применения answer — применяем отложенные кандидаты. k := peerKey(roomID, peerID) pendingMu.Lock() queue := pendingCandidates[k] delete(pendingCandidates, k) pendingMu.Unlock() for _, c := range queue { _ = AddICECandidate(roomID, peerID, c) } // Если во время negotiation накопился новый запрос — запускаем еще цикл. if popRenegPending(roomID, peerID) { _ = renegotiatePeer(roomID, peerID, pc) } return nil } func cleanupForwardingState(roomID, peerID string) { k := peerKey(roomID, peerID) pendingMu.Lock() delete(pendingCandidates, k) pendingMu.Unlock() renegPending.Delete(k) renegMu.Delete(k) }