Files
g365sfu/sfu/forwarding.go

388 lines
9.1 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package sfu
import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"g365sfu/logger"

	"github.com/pion/rtcp"
	"github.com/pion/webrtc/v4"
)
// Package-level signaling state. Every map below is keyed by
// peerKey(roomID, peerID), i.e. "roomID|peerID".
var (
	// pendingMu guards pendingCandidates.
	pendingMu sync.Mutex
	// pendingCandidates buffers ICE candidates received while the peer has no
	// remote description yet.
	pendingCandidates = make(map[string][]webrtc.ICECandidateInit)

	// renegMu maps a peer key to the *sync.Mutex serializing its renegotiation.
	renegMu sync.Map
	// renegPending maps a peer key to a bool marking a deferred renegotiation.
	renegPending sync.Map
)

// disconnectGracePeriod is how long a disconnected peer is given to recover
// before it is removed from the room.
const disconnectGracePeriod = 12 * time.Second
// peerKey builds the composite key identifying a peer within a room, in the
// form "roomID|peerID".
func peerKey(roomID, peerID string) string {
	const sep = "|"
	return roomID + sep + peerID
}
// getRenegLock returns the mutex that serializes renegotiation for the given
// peer, creating it on first use. Calls for the same room/peer pair receive
// the same mutex for as long as its entry stays in renegMu.
func getRenegLock(roomID, peerID string) *sync.Mutex {
	k := peerKey(roomID, peerID)

	// Fast path: once the lock exists, return it without allocating a
	// throwaway mutex just to hand it to LoadOrStore.
	if v, ok := renegMu.Load(k); ok {
		return v.(*sync.Mutex)
	}

	// Slow path: LoadOrStore keeps whichever mutex was stored first, so
	// concurrent first-time callers still agree on a single instance.
	v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
	return v.(*sync.Mutex)
}
func setRenegPending(roomID, peerID string, v bool) {
renegPending.Store(peerKey(roomID, peerID), v)
}
func popRenegPending(roomID, peerID string) bool {
k := peerKey(roomID, peerID)
v, ok := renegPending.Load(k)
if ok {
renegPending.Delete(k)
}
if !ok {
return false
}
b, _ := v.(bool)
return b
}
// extractICEUfrag returns the value of the first "a=ice-ufrag:" attribute in
// sdp, or "" when the SDP contains none. Lines are split on "\n" and trimmed
// of surrounding whitespace before matching, so "\r\n" endings are handled.
func extractICEUfrag(sdp string) string {
	const attr = "a=ice-ufrag:"

	lines := strings.Split(sdp, "\n")
	for i := range lines {
		trimmed := strings.TrimSpace(lines[i])
		if !strings.HasPrefix(trimmed, attr) {
			continue
		}
		return trimmed[len(attr):]
	}
	return ""
}
// removeRoomTrack drops every track whose TrackID equals trackID from the
// room's track list. It does nothing when the room does not exist.
func removeRoomTrack(roomID, trackID string) {
	room, ok := GetRoom(roomID)
	if !ok {
		return
	}

	room.mu.Lock()
	defer room.mu.Unlock()

	// The zero-capacity slice forces append to build a fresh backing array.
	// Filtering in place would leave the removed entries in the old array's
	// tail (keeping their local tracks reachable) and would rewrite the array
	// underneath any slice header that was copied out earlier.
	kept := room.Tracks[:0:0]
	for _, t := range room.Tracks {
		if t.TrackID != trackID {
			kept = append(kept, t)
		}
	}
	room.Tracks = kept
}
// isPeerConnectionAlive reports whether pc is in any connection state other
// than closed, failed or disconnected.
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
	switch pc.ConnectionState() {
	case webrtc.PeerConnectionStateClosed,
		webrtc.PeerConnectionStateFailed,
		webrtc.PeerConnectionStateDisconnected:
		return false
	default:
		return true
	}
}
// BindPeerLifecycle attaches connection-state handlers to pc. It is meant to
// be called for every PeerConnection during JoinWithOffer.
//
// Handling of both ICE and PeerConnection state changes:
//   - connected (or ICE completed): a pending grace timer is cancelled;
//   - disconnected: a disconnectGracePeriod timer is armed, and the peer is
//     removed when it fires unless the connection is connected again by then;
//   - failed or closed: the peer is removed immediately.
//
// Removing a peer means calling LeaveRoom and then OnPeerDisconnected (when
// set). The ICE handler, the PeerConnection handler and the grace timer can all
// reach that point for the same connection, so removal is guarded to run at
// most once per BindPeerLifecycle call.
//
// NOTE(review): forwarding-state cleanup is presumably done inside LeaveRoom;
// it is not visible from this function — confirm.
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
	var (
		mu            sync.Mutex
		disconnecting bool // a grace timer is currently armed
		left          bool // removal has already been performed
		timer         *time.Timer
	)

	// leave removes the peer at most once. The flag is flipped under mu, but
	// LeaveRoom and the callback run outside the lock, so a state change
	// triggered by the removal itself re-enters here and simply returns.
	leave := func() {
		mu.Lock()
		if left {
			mu.Unlock()
			return
		}
		left = true
		mu.Unlock()

		// Best effort: the LeaveRoom error is deliberately ignored.
		_ = LeaveRoom(roomID, peerID)
		if OnPeerDisconnected != nil {
			OnPeerDisconnected(roomID, peerID)
		}
	}

	// startTimer arms the grace timer unless one is already armed or the peer
	// has already been removed.
	startTimer := func() {
		mu.Lock()
		defer mu.Unlock()
		if disconnecting || left {
			return
		}
		disconnecting = true
		timer = time.AfterFunc(disconnectGracePeriod, func() {
			if pc.ConnectionState() == webrtc.PeerConnectionStateConnected {
				// The connection recovered while the timer was in flight.
				mu.Lock()
				disconnecting = false
				mu.Unlock()
				return
			}
			leave()
		})
	}

	// cancelTimer stops a pending grace timer, if any, and clears the flag.
	cancelTimer := func() {
		mu.Lock()
		defer mu.Unlock()
		if timer != nil {
			timer.Stop()
			timer = nil
		}
		disconnecting = false
	}

	pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		switch state {
		case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
			cancelTimer()
		case webrtc.ICEConnectionStateDisconnected:
			startTimer()
		case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed:
			cancelTimer()
			leave()
		}
	})

	pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
		switch state {
		case webrtc.PeerConnectionStateConnected:
			cancelTimer()
		case webrtc.PeerConnectionStateDisconnected:
			startTimer()
		case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
			cancelTimer()
			leave()
		}
	})
}
// SetupForwardingForPeer registers an OnTrack handler on the publisher's
// connection that relays each incoming RTP track to the other peers in the
// room. It is intended to be called during JoinWithOffer.
//
// For every remote track the handler:
//  1. creates a local static RTP track and records it in room.Tracks;
//  2. adds that track to every other live peer and renegotiates with them;
//  3. asks the publisher for a keyframe when the track is video;
//  4. copies RTP packets from the remote track to the local one until the
//     remote track ends or fails, then removes the track from the room.
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
	publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
		localTrackID := fmt.Sprintf("%s:%s:%s:%d",
			publisherPeerID,
			remote.Kind().String(),
			remote.ID(),
			remote.SSRC(),
		)
		localTrack, err := webrtc.NewTrackLocalStaticRTP(
			remote.Codec().RTPCodecCapability,
			localTrackID,
			remote.StreamID(),
		)
		if err != nil {
			// Keep the underlying cause in the log instead of dropping it.
			logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
			return
		}
		// Whatever ends this handler, the track must leave the room list.
		defer removeRoomTrack(roomID, localTrack.ID())

		room, ok := GetRoom(roomID)
		if !ok {
			return
		}

		// Register the track and snapshot the peers under the lock; the
		// AddTrack and renegotiation calls below run without it.
		room.mu.Lock()
		room.Tracks = append(room.Tracks, RoomTrack{
			TrackID:   localTrack.ID(),
			OwnerPeer: publisherPeerID,
			Local:     localTrack,
		})
		peers := make([]Peer, len(room.Peers))
		copy(peers, room.Peers)
		room.mu.Unlock()

		for _, sub := range peers {
			if sub.PeerID == publisherPeerID {
				continue
			}
			// Leave closed/failed/disconnected connections alone.
			if !isPeerConnectionAlive(sub.PeerConnection) {
				fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID,
					sub.PeerConnection.ConnectionState().String())
				continue
			}
			sender, err := sub.PeerConnection.AddTrack(localTrack)
			if err != nil {
				fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err)
				continue
			}
			// RTCP drain: read and discard incoming RTCP until the sender
			// reports an error.
			go func() {
				buf := make([]byte, 1500)
				for {
					if _, _, e := sender.Read(buf); e != nil {
						return
					}
				}
			}()
			if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil {
				fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err)
			}
		}

		// Ask for a keyframe on video tracks (best effort, error ignored).
		if remote.Kind() == webrtc.RTPCodecTypeVideo {
			_ = publisherPC.WriteRTCP([]rtcp.Packet{
				&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
			})
		}

		// Publisher RTP -> localTrack -> subscribers.
		for {
			pkt, _, err := remote.ReadRTP()
			if err != nil {
				// io.EOF is the normal end of the remote track; only other
				// errors are worth logging.
				if !errors.Is(err, io.EOF) {
					fmt.Println("SetupForwardingForPeer: ReadRTP error:", err)
				}
				return
			}
			if err = localTrack.WriteRTP(pkt); err != nil {
				// A subscriber whose transport has just closed surfaces as
				// io.ErrClosedPipe. That must not stop forwarding to the
				// remaining subscribers, so skip it and keep going.
				if errors.Is(err, io.ErrClosedPipe) {
					continue
				}
				fmt.Println("SetupForwardingForPeer: WriteRTP error:", err)
				return
			}
		}
	})
}
// renegotiatePeer creates a new server-side offer for pc, sets it as the local
// description, waits for ICE gathering to complete and then passes the local
// description to OnServerOffer (when set).
//
// Calls for the same peer are serialized through getRenegLock. The function
// returns nil without doing anything when the connection is not alive. When a
// negotiation is already in flight (signaling state is not stable) it only
// marks a renegotiation as pending and returns nil.
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
	mtx := getRenegLock(roomID, peerID)
	mtx.Lock()
	defer mtx.Unlock()

	switch {
	case !isPeerConnectionAlive(pc):
		return nil
	case pc.SignalingState() != webrtc.SignalingStateStable:
		// Never start a new negotiation on top of the current one; remember
		// the request so it can be replayed later.
		setRenegPending(roomID, peerID, true)
		return nil
	}

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		return err
	}

	// The promise has to be obtained before SetLocalDescription starts gathering.
	gathered := webrtc.GatheringCompletePromise(pc)
	if err = pc.SetLocalDescription(offer); err != nil {
		return err
	}
	<-gathered

	if OnServerOffer == nil {
		return nil
	}
	if local := pc.LocalDescription(); local != nil {
		OnServerOffer(roomID, peerID, *local)
	}
	return nil
}
// AddICECandidate applies a remote ICE candidate to the given peer.
//
// It returns ErrRoomNotFound or ErrPeerNotFound when the room or peer cannot
// be located. While the peer has no remote description the candidate is
// buffered in pendingCandidates and nil is returned. Candidates whose username
// fragment differs from the one in the current remote description are dropped
// silently, as is the "doesn't match the current ufrags" error from the
// connection. Any other error is returned unchanged.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
	room, found := GetRoom(roomID)
	if !found {
		return ErrRoomNotFound
	}

	var target *webrtc.PeerConnection
	room.mu.RLock()
	for i := range room.Peers {
		if room.Peers[i].PeerID == peerID {
			target = room.Peers[i].PeerConnection
			break
		}
	}
	room.mu.RUnlock()
	if target == nil {
		return ErrPeerNotFound
	}

	remote := target.RemoteDescription()
	if remote == nil {
		// No offer/answer applied yet: buffer the candidate.
		key := peerKey(roomID, peerID)
		pendingMu.Lock()
		pendingCandidates[key] = append(pendingCandidates[key], candidate)
		pendingMu.Unlock()
		return nil
	}

	// Drop stale candidates, identified by a mismatching ufrag.
	if frag := candidate.UsernameFragment; frag != nil {
		if active := extractICEUfrag(remote.SDP); active != "" && active != *frag {
			return nil
		}
	}

	err := target.AddICECandidate(candidate)
	if err == nil {
		return nil
	}
	if strings.Contains(err.Error(), "doesn't match the current ufrags") {
		return nil
	}
	return err
}
// HandleClientAnswer applies the client's SDP answer received during
// renegotiation.
//
// It rejects descriptions that are not of type answer, and returns
// ErrRoomNotFound or ErrPeerNotFound when the room or peer cannot be located.
// After the answer is set as the remote description it replays the ICE
// candidates buffered for the peer, and starts another renegotiation round if
// one was requested meanwhile. Errors from those two follow-up steps are
// ignored.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
	if answer.Type != webrtc.SDPTypeAnswer {
		return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
	}

	room, found := GetRoom(roomID)
	if !found {
		return ErrRoomNotFound
	}

	var target *webrtc.PeerConnection
	room.mu.RLock()
	for i := range room.Peers {
		if room.Peers[i].PeerID == peerID {
			target = room.Peers[i].PeerConnection
			break
		}
	}
	room.mu.RUnlock()
	if target == nil {
		return ErrPeerNotFound
	}

	if err := target.SetRemoteDescription(answer); err != nil {
		return err
	}

	// With the answer applied, take the buffered candidates out of the map
	// under the lock and replay them outside it.
	key := peerKey(roomID, peerID)
	pendingMu.Lock()
	buffered := pendingCandidates[key]
	delete(pendingCandidates, key)
	pendingMu.Unlock()

	for i := range buffered {
		_ = AddICECandidate(roomID, peerID, buffered[i])
	}

	// A request that arrived during this negotiation triggers one more cycle.
	if !popRenegPending(roomID, peerID) {
		return nil
	}
	_ = renegotiatePeer(roomID, peerID, target)
	return nil
}
// cleanupForwardingState discards the per-peer signaling state kept in this
// file: buffered ICE candidates, the deferred-renegotiation flag and the
// renegotiation lock entry.
func cleanupForwardingState(roomID, peerID string) {
	key := peerKey(roomID, peerID)

	// Scope the mutex to the map deletion only.
	func() {
		pendingMu.Lock()
		defer pendingMu.Unlock()
		delete(pendingCandidates, key)
	}()

	renegPending.Delete(key)
	renegMu.Delete(key)
}