207 lines
4.8 KiB
Go
207 lines
4.8 KiB
Go
package sfu
|
||
|
||
import (
|
||
"fmt"
|
||
"io"
|
||
"strings"
|
||
"sync"
|
||
|
||
"github.com/pion/webrtc/v4"
|
||
)
|
||
|
||
var (
|
||
pendingMu sync.Mutex
|
||
pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
|
||
renegMu sync.Map
|
||
)
|
||
|
||
func getRenegLock(roomID, peerID string) *sync.Mutex {
|
||
k := peerKey(roomID, peerID)
|
||
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
|
||
return v.(*sync.Mutex)
|
||
}
|
||
|
||
func peerKey(roomID, peerID string) string {
|
||
return roomID + "|" + peerID
|
||
}
|
||
|
||
func extractICEUfrag(sdp string) string {
|
||
for _, line := range strings.Split(sdp, "\n") {
|
||
line = strings.TrimSpace(line)
|
||
if strings.HasPrefix(line, "a=ice-ufrag:") {
|
||
return strings.TrimPrefix(line, "a=ice-ufrag:")
|
||
}
|
||
}
|
||
return ""
|
||
}
|
||
|
||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||
remote.Codec().RTPCodecCapability,
|
||
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
|
||
remote.StreamID(),
|
||
)
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
// Добавляем этот track всем, кроме publisher
|
||
roomsMu.RLock()
|
||
room, ok := rooms[roomID]
|
||
if !ok {
|
||
roomsMu.RUnlock()
|
||
return
|
||
}
|
||
peers := make([]Peer, len(room.Peers))
|
||
copy(peers, room.Peers)
|
||
roomsMu.RUnlock()
|
||
|
||
for _, sub := range peers {
|
||
if sub.PeerID == publisherPeerID {
|
||
continue
|
||
}
|
||
|
||
sender, err := sub.PeerConnection.AddTrack(localTrack)
|
||
if err != nil {
|
||
continue
|
||
}
|
||
|
||
// RTCP drain обязателен
|
||
go func() {
|
||
buf := make([]byte, 1500)
|
||
for {
|
||
if _, _, e := sender.Read(buf); e != nil {
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
// После AddTrack нужна renegotiation для подписчика
|
||
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
|
||
}
|
||
|
||
// Пересылаем RTP пакеты от издателя всем подписчикам
|
||
for {
|
||
pkt, _, err := remote.ReadRTP()
|
||
if err != nil {
|
||
if err == io.EOF {
|
||
return
|
||
}
|
||
return
|
||
}
|
||
if err = localTrack.WriteRTP(pkt); err != nil {
|
||
return
|
||
}
|
||
}
|
||
})
|
||
}
|
||
|
||
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||
lock := getRenegLock(roomID, peerID)
|
||
lock.Lock()
|
||
defer lock.Unlock()
|
||
|
||
// Не начинаем новую negotiation поверх текущей
|
||
if pc.SignalingState() != webrtc.SignalingStateStable {
|
||
return nil
|
||
}
|
||
|
||
offer, err := pc.CreateOffer(nil)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
gatherDone := webrtc.GatheringCompletePromise(pc)
|
||
if err = pc.SetLocalDescription(offer); err != nil {
|
||
return err
|
||
}
|
||
<-gatherDone
|
||
|
||
if OnServerOffer != nil && pc.LocalDescription() != nil {
|
||
OnServerOffer(roomID, peerID, *pc.LocalDescription())
|
||
}
|
||
return nil
|
||
}
|
||
|
||
// Добавляет ICE-кандидата к пиру
|
||
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||
room, exists := GetRoom(roomID)
|
||
if !exists {
|
||
return ErrRoomNotFound
|
||
}
|
||
|
||
var pc *webrtc.PeerConnection
|
||
for _, p := range room.Peers {
|
||
if p.PeerID == peerID {
|
||
pc = p.PeerConnection
|
||
break
|
||
}
|
||
}
|
||
if pc == nil {
|
||
return ErrPeerNotFound
|
||
}
|
||
|
||
rd := pc.RemoteDescription()
|
||
if rd == nil {
|
||
// answer/offer еще не применен — буферизуем
|
||
pendingMu.Lock()
|
||
k := peerKey(roomID, peerID)
|
||
pendingCandidates[k] = append(pendingCandidates[k], candidate)
|
||
pendingMu.Unlock()
|
||
return nil
|
||
}
|
||
|
||
// отбрасываем stale candidate по ufrag
|
||
if candidate.UsernameFragment != nil {
|
||
current := extractICEUfrag(rd.SDP)
|
||
if current != "" && *candidate.UsernameFragment != current {
|
||
return nil
|
||
}
|
||
}
|
||
|
||
err := pc.AddICECandidate(candidate)
|
||
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
|
||
// поздний старый кандидат — игнорируем
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// Обрабатывает SDP ответ от клиента при renegotiation
|
||
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||
room, exists := GetRoom(roomID)
|
||
if !exists {
|
||
return ErrRoomNotFound
|
||
}
|
||
|
||
var pc *webrtc.PeerConnection
|
||
for _, p := range room.Peers {
|
||
if p.PeerID == peerID {
|
||
pc = p.PeerConnection
|
||
break
|
||
}
|
||
}
|
||
if pc == nil {
|
||
return ErrPeerNotFound
|
||
}
|
||
|
||
if err := pc.SetRemoteDescription(answer); err != nil {
|
||
return err
|
||
}
|
||
|
||
// после применения answer — применяем отложенные кандидаты
|
||
k := peerKey(roomID, peerID)
|
||
pendingMu.Lock()
|
||
queue := pendingCandidates[k]
|
||
delete(pendingCandidates, k)
|
||
pendingMu.Unlock()
|
||
|
||
for _, c := range queue {
|
||
_ = AddICECandidate(roomID, peerID, c)
|
||
}
|
||
|
||
return nil
|
||
}
|