Files
g365sfu/sfu/forwarding.go

207 lines
4.8 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sfu
import (
"fmt"
"io"
"strings"
"sync"
"github.com/pion/webrtc/v4"
)
var (
pendingMu sync.Mutex
pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
renegMu sync.Map
)
func getRenegLock(roomID, peerID string) *sync.Mutex {
k := peerKey(roomID, peerID)
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
return v.(*sync.Mutex)
}
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
}
func extractICEUfrag(sdp string) string {
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "a=ice-ufrag:") {
return strings.TrimPrefix(line, "a=ice-ufrag:")
}
}
return ""
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
remote.StreamID(),
)
if err != nil {
return
}
// Добавляем этот track всем, кроме publisher
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return
}
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
roomsMu.RUnlock()
for _, sub := range peers {
if sub.PeerID == publisherPeerID {
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
continue
}
// RTCP drain обязателен
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
return
}
}
}()
// После AddTrack нужна renegotiation для подписчика
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
}
// Пересылаем RTP пакеты от издателя всем подписчикам
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
return
}
if err = localTrack.WriteRTP(pkt); err != nil {
return
}
}
})
}
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
lock := getRenegLock(roomID, peerID)
lock.Lock()
defer lock.Unlock()
// Не начинаем новую negotiation поверх текущей
if pc.SignalingState() != webrtc.SignalingStateStable {
return nil
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gatherDone := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(offer); err != nil {
return err
}
<-gatherDone
if OnServerOffer != nil && pc.LocalDescription() != nil {
OnServerOffer(roomID, peerID, *pc.LocalDescription())
}
return nil
}
// Добавляет ICE-кандидата к пиру
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
if pc == nil {
return ErrPeerNotFound
}
rd := pc.RemoteDescription()
if rd == nil {
// answer/offer еще не применен — буферизуем
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
pendingMu.Unlock()
return nil
}
// отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
return nil
}
}
err := pc.AddICECandidate(candidate)
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
// поздний старый кандидат — игнорируем
return nil
}
return err
}
// Обрабатывает SDP ответ от клиента при renegotiation
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
if pc == nil {
return ErrPeerNotFound
}
if err := pc.SetRemoteDescription(answer); err != nil {
return err
}
// после применения answer — применяем отложенные кандидаты
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
delete(pendingCandidates, k)
pendingMu.Unlock()
for _, c := range queue {
_ = AddICECandidate(roomID, peerID, c)
}
return nil
}