From e67bd3d82404862589a941fc091b6500a13949fc Mon Sep 17 00:00:00 2001 From: set Date: Mon, 16 Mar 2026 17:08:56 +0200 Subject: [PATCH] =?UTF-8?q?=D0=9D=D0=BE=D0=B2=D1=8B=D0=B5=20=D1=81=D0=BE?= =?UTF-8?q?=D0=B1=D1=8B=D1=82=D0=B8=D1=8F=20=D1=81=20=D0=BF=D0=B8=D1=80?= =?UTF-8?q?=D0=B0=D0=BC=D0=B8=20=D0=B8=20=D0=BA=D0=BE=D0=BC=D0=BD=D0=B0?= =?UTF-8?q?=D1=82=D0=B0=D0=BC=D0=B8,=20=D0=B1=D0=B0=D0=B7=D0=BE=D0=B2?= =?UTF-8?q?=D1=8B=D0=B9=20=D1=84=D0=BE=D1=80=D0=B2=D0=B0=D1=80=D0=B4=D0=B8?= =?UTF-8?q?=D0=BD=D0=B3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- boot/boot.go | 32 +++++++ sfu/forwarding.go | 229 +++++++++++++++++++++++++++++++++++++++++----- sfu/rooms.go | 147 ++++++++++++++++++++++++----- sfu/sfu.go | 6 ++ socket/socket.go | 4 +- 5 files changed, 370 insertions(+), 48 deletions(-) diff --git a/boot/boot.go b/boot/boot.go index 2db79b2..884eb6d 100644 --- a/boot/boot.go +++ b/boot/boot.go @@ -28,6 +28,8 @@ func Bootstrap() { } sfu.OnLocalICECandidate = OnLocalICECandidate sfu.OnServerOffer = OnServerOffer + sfu.OnRoomDelete = OnRoomDelete + sfu.OnPeerDisconnected = OnPeerDisconnected turnServer, err := turn.Start(turn.Config{ ListenAddr: "0.0.0.0:3478", PublicIP: os.Getenv("TURN_PUBLIC_IP"), @@ -88,3 +90,33 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription buffer.Flip() room.Server.WriteBinary(buffer.Bytes()) } + +func OnRoomDelete(roomID string) { + room, exists := sfu.GetRoom(roomID) + if !exists { + logger.LogWarnMessage("tried to send room delete event to non existing room " + roomID) + return + } + buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID))) + buffer.Put(0x10) + buffer.PutUint32(uint32(len([]byte(roomID)))) + buffer.PutBytes([]byte(roomID)) + buffer.Flip() + room.Server.WriteBinary(buffer.Bytes()) +} + +func OnPeerDisconnected(roomID string, peerID string) { + room, exists := sfu.GetRoom(roomID) + if !exists { + logger.LogWarnMessage("tried to send peer disconnected event to non existing room " + roomID) + return + } + buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID))) + buffer.Put(0x11) + buffer.PutUint32(uint32(len([]byte(roomID)))) + buffer.PutBytes([]byte(roomID)) + buffer.PutUint32(uint32(len([]byte(peerID)))) + buffer.PutBytes([]byte(peerID)) + buffer.Flip() + room.Server.WriteBinary(buffer.Bytes()) +} diff --git a/sfu/forwarding.go b/sfu/forwarding.go index 7880571..aaa7a70 100644 --- a/sfu/forwarding.go +++ b/sfu/forwarding.go @@ -2,27 +2,51 @@ package sfu import ( "fmt" + "g365sfu/logger" "io" "strings" "sync" + "time" + "github.com/pion/rtcp" "github.com/pion/webrtc/v4" ) var ( pendingMu sync.Mutex pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID - renegMu sync.Map + + renegMu sync.Map // key -> *sync.Mutex + renegPending sync.Map // key -> bool ) +const disconnectGracePeriod = 12 * time.Second + +func peerKey(roomID, peerID string) string { + return roomID + "|" + peerID +} + func getRenegLock(roomID, peerID string) *sync.Mutex { k := peerKey(roomID, peerID) v, _ := renegMu.LoadOrStore(k, &sync.Mutex{}) return v.(*sync.Mutex) } -func peerKey(roomID, peerID string) string { - return roomID + "|" + peerID +func setRenegPending(roomID, peerID string, v bool) { + renegPending.Store(peerKey(roomID, peerID), v) +} + +func popRenegPending(roomID, peerID string) bool { + k := peerKey(roomID, peerID) + v, ok := renegPending.Load(k) + if ok { + renegPending.Delete(k) + } + if !ok { + return false + } + b, _ := v.(bool) + return b } func extractICEUfrag(sdp string) string { @@ -35,40 +59,157 @@ func extractICEUfrag(sdp string) string { return "" } +func removeRoomTrack(roomID, trackID string) { + room, ok := GetRoom(roomID) + if !ok { + return + } + room.mu.Lock() + defer room.mu.Unlock() + + out := room.Tracks[:0] + for _, t := range room.Tracks { + if t.TrackID != trackID { + out = append(out, t) + } + } + room.Tracks = out +} + +func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { + state := pc.ConnectionState() + return state != webrtc.PeerConnectionStateClosed && + state != webrtc.PeerConnectionStateFailed && + state != webrtc.PeerConnectionStateDisconnected +} + +// Вешается на каждый PeerConnection при JoinWithOffer. +// Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции. +func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) { + var ( + mu sync.Mutex + disconnecting bool + timer *time.Timer + ) + + startTimer := func() { + mu.Lock() + defer mu.Unlock() + if disconnecting { + return + } + disconnecting = true + timer = time.AfterFunc(disconnectGracePeriod, func() { + state := pc.ConnectionState() + if state == webrtc.PeerConnectionStateConnected { + mu.Lock() + disconnecting = false + mu.Unlock() + return + } + _ = LeaveRoom(roomID, peerID) + if OnPeerDisconnected != nil { + OnPeerDisconnected(roomID, peerID) + } + }) + } + + cancelTimer := func() { + mu.Lock() + defer mu.Unlock() + if timer != nil { + timer.Stop() + timer = nil + } + disconnecting = false + } + + pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) { + switch state { + case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted: + cancelTimer() + case webrtc.ICEConnectionStateDisconnected: + startTimer() + case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed: + cancelTimer() + _ = LeaveRoom(roomID, peerID) + if OnPeerDisconnected != nil { + OnPeerDisconnected(roomID, peerID) + } + } + }) + + pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { + switch state { + case webrtc.PeerConnectionStateConnected: + cancelTimer() + case webrtc.PeerConnectionStateDisconnected: + startTimer() + case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed: + cancelTimer() + _ = LeaveRoom(roomID, peerID) + if OnPeerDisconnected != nil { + OnPeerDisconnected(roomID, peerID) + } + } + }) +} + // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + localTrackID := fmt.Sprintf("%s:%s:%s:%d", + publisherPeerID, + remote.Kind().String(), + remote.ID(), + remote.SSRC(), + ) + localTrack, err := webrtc.NewTrackLocalStaticRTP( remote.Codec().RTPCodecCapability, - fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()), + localTrackID, remote.StreamID(), ) if err != nil { + logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error") + return + } + defer removeRoomTrack(roomID, localTrack.ID()) + + room, ok := GetRoom(roomID) + if !ok { return } - // Добавляем этот track всем, кроме publisher - roomsMu.RLock() - room, ok := rooms[roomID] - if !ok { - roomsMu.RUnlock() - return - } + room.mu.Lock() + room.Tracks = append(room.Tracks, RoomTrack{ + TrackID: localTrack.ID(), + OwnerPeer: publisherPeerID, + Local: localTrack, + }) peers := make([]Peer, len(room.Peers)) copy(peers, room.Peers) - roomsMu.RUnlock() + room.mu.Unlock() for _, sub := range peers { if sub.PeerID == publisherPeerID { continue } - sender, err := sub.PeerConnection.AddTrack(localTrack) - if err != nil { + // Не трогаем закрытые/failed соединения + if !isPeerConnectionAlive(sub.PeerConnection) { + fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID, + sub.PeerConnection.ConnectionState().String()) continue } - // RTCP drain обязателен + sender, err := sub.PeerConnection.AddTrack(localTrack) + if err != nil { + fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err) + continue + } + + // RTCP drain go func() { buf := make([]byte, 1500) for { @@ -78,20 +219,30 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * } }() - // После AddTrack нужна renegotiation для подписчика - _ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection) + if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil { + fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err) + } } - // Пересылаем RTP пакеты от издателя всем подписчикам + // Для video просим keyframe + if remote.Kind() == webrtc.RTPCodecTypeVideo { + _ = publisherPC.WriteRTCP([]rtcp.Packet{ + &rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())}, + }) + } + + // Publisher RTP -> localTrack -> subscribers for { pkt, _, err := remote.ReadRTP() if err != nil { if err == io.EOF { return } + fmt.Println("SetupForwardingForPeer: ReadRTP error:", err) return } if err = localTrack.WriteRTP(pkt); err != nil { + fmt.Println("SetupForwardingForPeer: WriteRTP error:", err) return } } @@ -103,8 +254,13 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { lock.Lock() defer lock.Unlock() - // Не начинаем новую negotiation поверх текущей + if !isPeerConnectionAlive(pc) { + return nil + } + + // Не начинаем новую negotiation поверх текущей. if pc.SignalingState() != webrtc.SignalingStateStable { + setRenegPending(roomID, peerID, true) return nil } @@ -132,6 +288,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate return ErrRoomNotFound } + room.mu.RLock() var pc *webrtc.PeerConnection for _, p := range room.Peers { if p.PeerID == peerID { @@ -139,13 +296,15 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate break } } + room.mu.RUnlock() + if pc == nil { return ErrPeerNotFound } rd := pc.RemoteDescription() if rd == nil { - // answer/offer еще не применен — буферизуем + // offer/answer еще не применен — буферизуем pendingMu.Lock() k := peerKey(roomID, peerID) pendingCandidates[k] = append(pendingCandidates[k], candidate) @@ -153,7 +312,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate return nil } - // отбрасываем stale candidate по ufrag + // Отбрасываем stale candidate по ufrag if candidate.UsernameFragment != nil { current := extractICEUfrag(rd.SDP) if current != "" && *candidate.UsernameFragment != current { @@ -163,19 +322,23 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate err := pc.AddICECandidate(candidate) if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") { - // поздний старый кандидат — игнорируем return nil } return err } -// Обрабатывает SDP ответ от клиента при renegotiation +// Обрабатывает SDP answer от клиента при renegotiation func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { + if answer.Type != webrtc.SDPTypeAnswer { + return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) + } + room, exists := GetRoom(roomID) if !exists { return ErrRoomNotFound } + room.mu.RLock() var pc *webrtc.PeerConnection for _, p := range room.Peers { if p.PeerID == peerID { @@ -183,6 +346,8 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr break } } + room.mu.RUnlock() + if pc == nil { return ErrPeerNotFound } @@ -191,7 +356,7 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr return err } - // после применения answer — применяем отложенные кандидаты + // После применения answer — применяем отложенные кандидаты. k := peerKey(roomID, peerID) pendingMu.Lock() queue := pendingCandidates[k] @@ -202,5 +367,21 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr _ = AddICECandidate(roomID, peerID, c) } + // Если во время negotiation накопился новый запрос — запускаем еще цикл. + if popRenegPending(roomID, peerID) { + _ = renegotiatePeer(roomID, peerID, pc) + } + return nil } + +func cleanupForwardingState(roomID, peerID string) { + k := peerKey(roomID, peerID) + + pendingMu.Lock() + delete(pendingCandidates, k) + pendingMu.Unlock() + + renegPending.Delete(k) + renegMu.Delete(k) +} diff --git a/sfu/rooms.go b/sfu/rooms.go index 9c0f0c2..572dcfc 100644 --- a/sfu/rooms.go +++ b/sfu/rooms.go @@ -16,6 +16,12 @@ type Peer struct { PeerConnection *webrtc.PeerConnection } +type RoomTrack struct { + TrackID string + OwnerPeer string + Local *webrtc.TrackLocalStaticRTP +} + type Room struct { //Уникальный идентификатор комнаты RoomID string @@ -23,6 +29,10 @@ type Room struct { Server *connection.Connection //Пиры которые подключились к комнате Peers []Peer + + Tracks []RoomTrack + + mu sync.RWMutex } // Общие переменные @@ -54,13 +64,6 @@ func GetRoom(roomID string) (*Room, bool) { return room, exists } -// DeleteRoom удаляет комнату по идентификатору -func DeleteRoom(roomID string) { - roomsMu.Lock() - defer roomsMu.Unlock() - delete(rooms, roomID) -} - // JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) { room, exists := GetRoom(roomID) @@ -73,43 +76,96 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription return nil, err } - // SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их - // пересылает клиенту для установления соединения peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { - return // gathering finished + return } if OnLocalICECandidate != nil { OnLocalICECandidate(roomID, peerID, c.ToJSON()) } }) - err = peerConnection.SetRemoteDescription(offer) - if err != nil { + BindPeerLifecycle(roomID, peerID, peerConnection) + SetupForwardingForPeer(roomID, peerID, peerConnection) + + room.mu.RLock() + existingTracks := make([]RoomTrack, len(room.Tracks)) + copy(existingTracks, room.Tracks) + room.mu.RUnlock() + + for _, t := range existingTracks { + if t.OwnerPeer == peerID { + continue + } + + sender, err := peerConnection.AddTrack(t.Local) + if err != nil { + continue + } + + go func() { + buf := make([]byte, 1500) + for { + if _, _, e := sender.Read(buf); e != nil { + return + } + } + }() + } + + if err = peerConnection.SetRemoteDescription(offer); err != nil { + _ = peerConnection.Close() return nil, err } answer, err := peerConnection.CreateAnswer(nil) if err != nil { + _ = peerConnection.Close() return nil, err } - err = peerConnection.SetLocalDescription(answer) - if err != nil { + gatherDone := webrtc.GatheringCompletePromise(peerConnection) + if err = peerConnection.SetLocalDescription(answer); err != nil { + _ = peerConnection.Close() return nil, err } + <-gatherDone - // Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты - SetupForwardingForPeer(roomID, peerID, peerConnection) - + room.mu.Lock() room.Peers = append(room.Peers, Peer{ PeerID: peerID, PeerConnection: peerConnection, }) + room.mu.Unlock() return peerConnection.LocalDescription(), nil } +func DeleteRoom(roomID string) error { + roomsMu.Lock() + room, exists := rooms[roomID] + if !exists { + roomsMu.Unlock() + return ErrRoomNotFound + } + delete(rooms, roomID) + roomsMu.Unlock() + + room.mu.Lock() + peers := make([]Peer, len(room.Peers)) + copy(peers, room.Peers) + room.Peers = nil + room.Tracks = nil + room.mu.Unlock() + + for _, p := range peers { + _ = p.PeerConnection.Close() + cleanupForwardingState(roomID, p.PeerID) + } + + return nil +} + // LeaveRoom позволяет пиру покинуть комнату func LeaveRoom(roomID string, peerID string) error { room, exists := GetRoom(roomID) @@ -117,12 +173,59 @@ func LeaveRoom(roomID string, peerID string) error { return ErrRoomNotFound } - for i, peer := range room.Peers { - if peer.PeerID == peerID { - peer.PeerConnection.Close() - room.Peers = append(room.Peers[:i], room.Peers[i+1:]...) - break + var ( + removedPC *webrtc.PeerConnection + removed bool + shouldDrop bool + ) + + room.mu.Lock() + // удаляем peer + nextPeers := make([]Peer, 0, len(room.Peers)) + for _, p := range room.Peers { + if p.PeerID == peerID { + removedPC = p.PeerConnection + removed = true + continue } + nextPeers = append(nextPeers, p) + } + room.Peers = nextPeers + + // удаляем треки этого publisher + nextTracks := room.Tracks[:0] + for _, t := range room.Tracks { + if t.OwnerPeer != peerID { + nextTracks = append(nextTracks, t) + } + } + room.Tracks = nextTracks + + shouldDrop = len(room.Peers) == 0 + room.mu.Unlock() + + if !removed { + return ErrPeerNotFound + } + + if removedPC != nil { + _ = removedPC.Close() + } + cleanupForwardingState(roomID, peerID) + + // Комната пустая -> удаляем + if shouldDrop { + return DeleteRoom(roomID) + } + + // Опционально: renegotiation оставшимся peer после удаления треков/peer + room.mu.RLock() + rest := make([]Peer, len(room.Peers)) + copy(rest, room.Peers) + room.mu.RUnlock() + + for _, p := range rest { + _ = renegotiatePeer(roomID, p.PeerID, p.PeerConnection) } return nil diff --git a/sfu/sfu.go b/sfu/sfu.go index 15e1785..c46da3c 100644 --- a/sfu/sfu.go +++ b/sfu/sfu.go @@ -20,6 +20,12 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip // Коллбек для обработки новых ICE кандидатов var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit) +// Коллбек для обработки отключения пира (обрыв связи) +var OnPeerDisconnected func(roomID, peerID string) + +// Коллбек для обработки удаления комнаты +var OnRoomDelete func(roomID string) + // Ошибки var ( ErrRoomNotFound = errors.New("room not found") diff --git a/socket/socket.go b/socket/socket.go index 4363ac5..1d52cf6 100644 --- a/socket/socket.go +++ b/socket/socket.go @@ -204,10 +204,10 @@ func processData(data <-chan []byte, connection *connection.Connection) { } } //Check life для проверки соединения с сервером SFU - if packetId == 0x08 { + if packetId == 0xAE { // Подготовка ответа для клиента о том, что соединение живо response := bytebuffer.Allocate(1) - response.Put(0x08) + response.Put(0xAE) response.Flip() // Отправляем ответ клиенту connection.WriteBinary(response.Bytes())