diff --git a/sfu/forwarding.go b/sfu/forwarding.go new file mode 100644 index 0000000..2256317 --- /dev/null +++ b/sfu/forwarding.go @@ -0,0 +1,116 @@ +package sfu + +import ( + "fmt" + "io" + + "github.com/pion/webrtc/v4" +) + +// SFU -> Java signaling: отправить server-offer для конкретного peer +var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescription) + +// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты +func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { + publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + localTrack, err := webrtc.NewTrackLocalStaticRTP( + remote.Codec().RTPCodecCapability, + fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()), + remote.StreamID(), + ) + if err != nil { + return + } + + // Добавляем этот track всем, кроме publisher + roomsMu.RLock() + room, ok := rooms[roomID] + if !ok { + roomsMu.RUnlock() + return + } + peers := make([]Peer, len(room.Peers)) + copy(peers, room.Peers) + roomsMu.RUnlock() + + for _, sub := range peers { + if sub.PeerID == publisherPeerID { + continue + } + + sender, err := sub.PeerConnection.AddTrack(localTrack) + if err != nil { + continue + } + + // RTCP drain обязателен + go func() { + buf := make([]byte, 1500) + for { + if _, _, e := sender.Read(buf); e != nil { + return + } + } + }() + + // После AddTrack нужна renegotiation для подписчика + _ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection) + } + + // Пересылаем RTP пакеты от издателя всем подписчикам + for { + pkt, _, err := remote.ReadRTP() + if err != nil { + if err == io.EOF { + return + } + return + } + if err = localTrack.WriteRTP(pkt); err != nil { + return + } + } + }) +} + +func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { + offer, err := pc.CreateOffer(nil) + if err != nil { + return err + } + + gatherDone := webrtc.GatheringCompletePromise(pc) + if err = pc.SetLocalDescription(offer); err != nil { + return err + } + <-gatherDone + + if OnServerOffer != nil && pc.LocalDescription() != nil { + OnServerOffer(roomID, peerID, *pc.LocalDescription()) + } + return nil +} + +// Java -> SFU: answer клиента на server-offer +func HandleClientAnswer(roomID, peerID string, answer webrtc.SessionDescription) error { + roomsMu.RLock() + room, ok := rooms[roomID] + if !ok { + roomsMu.RUnlock() + return ErrRoomNotFound + } + + var pc *webrtc.PeerConnection + for _, p := range room.Peers { + if p.PeerID == peerID { + pc = p.PeerConnection + break + } + } + roomsMu.RUnlock() + + if pc == nil { + return ErrPeerNotFound + } + return pc.SetRemoteDescription(answer) +}