Files
g365sfu/sfu/forwarding.go
2026-03-20 23:19:22 +02:00

474 lines
11 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sfu
import (
"fmt"
"g365sfu/logger"
"io"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
var (
pendingMu sync.Mutex
pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
renegMu sync.Map // key -> *sync.Mutex
renegPending sync.Map // key -> bool
)
const (
disconnectGracePeriod = 30 * time.Second
outboundPacketBuffer = 256
maxConsecutiveReadErrs = 25
maxConsecutiveWriteErrs = 50
packetDropLogEvery = 100
)
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
}
func getRenegLock(roomID, peerID string) *sync.Mutex {
k := peerKey(roomID, peerID)
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
return v.(*sync.Mutex)
}
func setRenegPending(roomID, peerID string, v bool) {
renegPending.Store(peerKey(roomID, peerID), v)
}
func popRenegPending(roomID, peerID string) bool {
k := peerKey(roomID, peerID)
v, ok := renegPending.Load(k)
if ok {
renegPending.Delete(k)
}
if !ok {
return false
}
b, _ := v.(bool)
return b
}
func extractICEUfrag(sdp string) string {
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "a=ice-ufrag:") {
return strings.TrimPrefix(line, "a=ice-ufrag:")
}
}
return ""
}
func removeRoomTrack(roomID, trackID string) {
room, ok := GetRoom(roomID)
if !ok {
return
}
room.mu.Lock()
defer room.mu.Unlock()
out := room.Tracks[:0]
for _, t := range room.Tracks {
if t.TrackID != trackID {
out = append(out, t)
}
}
room.Tracks = out
}
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed
}
// Вешается на каждый PeerConnection при JoinWithOffer.
// Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции.
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
var (
mu sync.Mutex
disconnecting bool
timer *time.Timer
leaveOnce sync.Once
)
room, exists := GetRoom(roomID)
if !exists {
logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID)
return
}
server := room.Server
cancelTimer := func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
timer = nil
}
disconnecting = false
}
leaveAndNotify := func(reason DisconnectReason) {
leaveOnce.Do(func() {
cancelTimer()
err := LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil && err == nil {
OnPeerDisconnected(roomID, peerID, server, reason)
}
})
}
startTimer := func() {
mu.Lock()
defer mu.Unlock()
if disconnecting {
return
}
disconnecting = true
timer = time.AfterFunc(disconnectGracePeriod, func() {
state := pc.ConnectionState()
if state == webrtc.PeerConnectionStateConnected {
mu.Lock()
disconnecting = false
mu.Unlock()
return
}
leaveAndNotify(DisconnectReasonFailed)
})
}
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
switch state {
case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
cancelTimer()
case webrtc.ICEConnectionStateDisconnected:
startTimer()
case webrtc.ICEConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.ICEConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
cancelTimer()
case webrtc.PeerConnectionStateDisconnected:
startTimer()
case webrtc.PeerConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.PeerConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
publisherPeerID,
remote.Kind().String(),
remote.ID(),
remote.SSRC(),
)
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
localTrackID,
remote.StreamID(),
)
if err != nil {
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
return
}
defer removeRoomTrack(roomID, localTrack.ID())
room, ok := GetRoom(roomID)
if !ok {
return
}
room.mu.Lock()
room.Tracks = append(room.Tracks, RoomTrack{
TrackID: localTrack.ID(),
OwnerPeer: publisherPeerID,
Local: localTrack,
})
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
room.mu.Unlock()
for _, sub := range peers {
if sub.PeerID == publisherPeerID {
continue
}
if !isPeerConnectionAlive(sub.PeerConnection) {
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
continue
}
senderCopy := sender
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := senderCopy.Read(buf); e != nil {
return
}
}
}()
subID := sub.PeerID
subPC := sub.PeerConnection
go func() {
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
}
}()
}
if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
})
}
// Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
// Если downstream начинает тормозить, пакеты дропаются из очереди,
// а не накапливают бесконечную задержку.
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
writerDone := make(chan struct{})
defer close(packetQueue)
go func() {
defer close(writerDone)
consecutiveWriteErrs := 0
for pkt := range packetQueue {
if err := localTrack.WriteRTP(pkt); err != nil {
consecutiveWriteErrs++
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
))
}
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
continue
}
consecutiveWriteErrs = 0
}
}()
consecutiveReadErrs := 0
droppedPackets := 0
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
consecutiveReadErrs++
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
))
}
if consecutiveReadErrs >= maxConsecutiveReadErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
time.Sleep(10 * time.Millisecond)
continue
}
consecutiveReadErrs = 0
select {
case <-writerDone:
return
default:
}
select {
case <-writerDone:
return
case packetQueue <- pkt:
default:
droppedPackets++
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
publisherPeerID, localTrack.ID(), droppedPackets,
))
}
}
}
})
}
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
lock := getRenegLock(roomID, peerID)
lock.Lock()
defer lock.Unlock()
if !isPeerConnectionAlive(pc) {
return nil
}
if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true)
return nil
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gatherDone := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(offer); err != nil {
return err
}
<-gatherDone
if OnServerOffer != nil && pc.LocalDescription() != nil {
OnServerOffer(roomID, peerID, *pc.LocalDescription())
}
return nil
}
// Добавляет ICE-кандидата к пиру.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
room.mu.RLock()
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
room.mu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
rd := pc.RemoteDescription()
if rd == nil {
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
pendingMu.Unlock()
return nil
}
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
return nil
}
}
err := pc.AddICECandidate(candidate)
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
return nil
}
return err
}
// Обрабатывает SDP answer от клиента при renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
}
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
room.mu.RLock()
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
room.mu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
if err := pc.SetRemoteDescription(answer); err != nil {
return err
}
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
delete(pendingCandidates, k)
pendingMu.Unlock()
for _, c := range queue {
_ = AddICECandidate(roomID, peerID, c)
}
if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc)
}
return nil
}
func cleanupForwardingState(roomID, peerID string) {
k := peerKey(roomID, peerID)
pendingMu.Lock()
delete(pendingCandidates, k)
pendingMu.Unlock()
renegPending.Delete(k)
renegMu.Delete(k)
}