From 1c245bd453976e9f175a20c33078a20ea7dc4ab5 Mon Sep 17 00:00:00 2001 From: set Date: Wed, 11 Mar 2026 19:32:00 +0200 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=B5=D1=80=D0=B5=D1=81=D1=8B=D0=BB?= =?UTF-8?q?=D0=BA=D0=B0=20RTP=20=D0=BF=D0=B0=D0=BA=D0=B5=D1=82=D0=BE=D0=B2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- sfu/forwarding.go | 116 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 sfu/forwarding.go diff --git a/sfu/forwarding.go b/sfu/forwarding.go new file mode 100644 index 0000000..2256317 --- /dev/null +++ b/sfu/forwarding.go @@ -0,0 +1,116 @@ +package sfu + +import ( + "fmt" + "io" + + "github.com/pion/webrtc/v4" +) + +// SFU -> Java signaling: отправить server-offer для конкретного peer +var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescription) + +// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты +func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { + publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + localTrack, err := webrtc.NewTrackLocalStaticRTP( + remote.Codec().RTPCodecCapability, + fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()), + remote.StreamID(), + ) + if err != nil { + return + } + + // Добавляем этот track всем, кроме publisher + roomsMu.RLock() + room, ok := rooms[roomID] + if !ok { + roomsMu.RUnlock() + return + } + peers := make([]Peer, len(room.Peers)) + copy(peers, room.Peers) + roomsMu.RUnlock() + + for _, sub := range peers { + if sub.PeerID == publisherPeerID { + continue + } + + sender, err := sub.PeerConnection.AddTrack(localTrack) + if err != nil { + continue + } + + // RTCP drain обязателен + go func() { + buf := make([]byte, 1500) + for { + if _, _, e := sender.Read(buf); e != nil { + return + } + } + }() + + // После AddTrack нужна renegotiation для подписчика + _ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection) + } + + // Пересылаем RTP пакеты от издателя всем подписчикам + for { + pkt, _, err := remote.ReadRTP() + if err != nil { + if err == io.EOF { + return + } + return + } + if err = localTrack.WriteRTP(pkt); err != nil { + return + } + } + }) +} + +func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + + gatherDone := webrtc.GatheringCompletePromise(pc) + if err = pc.SetLocalDescription(offer); err != nil { + return err + } + <-gatherDone + + if OnServerOffer != nil && pc.LocalDescription() != nil { + OnServerOffer(roomID, peerID, *pc.LocalDescription()) + } + return nil +} + +// Java -> SFU: answer клиента на server-offer +func HandleClientAnswer(roomID, peerID string, answer webrtc.SessionDescription) error { + roomsMu.RLock() + room, ok := rooms[roomID] + if !ok { + roomsMu.RUnlock() + return ErrRoomNotFound + } + + var pc *webrtc.PeerConnection + for _, p := range room.Peers { + if p.PeerID == peerID { + pc = p.PeerConnection + break + } + } + roomsMu.RUnlock() + + if pc == nil { + return ErrPeerNotFound + } + return pc.SetRemoteDescription(answer) +}