Files
g365sfu/sfu/forwarding.go

191 lines
4.4 KiB
Go

package sfu
import (
"fmt"
"io"
"strings"
"sync"
"github.com/pion/webrtc/v4"
)
var (
pendingMu sync.Mutex
pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
)
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
}
func extractICEUfrag(sdp string) string {
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "a=ice-ufrag:") {
return strings.TrimPrefix(line, "a=ice-ufrag:")
}
}
return ""
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
remote.StreamID(),
)
if err != nil {
return
}
// Добавляем этот track всем, кроме publisher
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return
}
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
roomsMu.RUnlock()
for _, sub := range peers {
if sub.PeerID == publisherPeerID {
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
continue
}
// RTCP drain обязателен
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
return
}
}
}()
// После AddTrack нужна renegotiation для подписчика
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
}
// Пересылаем RTP пакеты от издателя всем подписчикам
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
return
}
if err = localTrack.WriteRTP(pkt); err != nil {
return
}
}
})
}
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gatherDone := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(offer); err != nil {
return err
}
<-gatherDone
if OnServerOffer != nil && pc.LocalDescription() != nil {
OnServerOffer(roomID, peerID, *pc.LocalDescription())
}
return nil
}
// Добавляет ICE-кандидата к пиру
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
if pc == nil {
return ErrPeerNotFound
}
rd := pc.RemoteDescription()
if rd == nil {
// answer/offer еще не применен — буферизуем
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
pendingMu.Unlock()
return nil
}
// отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
return nil
}
}
err := pc.AddICECandidate(candidate)
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
// поздний старый кандидат — игнорируем
return nil
}
return err
}
// Обрабатывает SDP ответ от клиента при renegotiation
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
if pc == nil {
return ErrPeerNotFound
}
if err := pc.SetRemoteDescription(answer); err != nil {
return err
}
// после применения answer — применяем отложенные кандидаты
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
delete(pendingCandidates, k)
pendingMu.Unlock()
for _, c := range queue {
_ = AddICECandidate(roomID, peerID, c)
}
return nil
}