403 lines
9.6 KiB
Go
403 lines
9.6 KiB
Go
package sfu
|
||
|
||
import (
	"errors"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"g365sfu/logger"

	"github.com/pion/rtcp"
	"github.com/pion/webrtc/v4"
)
|
||
|
||
var (
	// pendingMu guards pendingCandidates.
	pendingMu sync.Mutex
	// pendingCandidates buffers ICE candidates that arrive before the peer's
	// remote description has been applied (flushed in HandleClientAnswer).
	pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID

	// renegMu holds one *sync.Mutex per peer to serialize renegotiation
	// cycles (see getRenegLock).
	renegMu sync.Map // key -> *sync.Mutex
	// renegPending flags peers that requested renegotiation while another
	// negotiation was still in flight (see setRenegPending/popRenegPending).
	renegPending sync.Map // key -> bool
)

// disconnectGracePeriod is how long a peer may remain in the "disconnected"
// state before it is removed from its room (see BindPeerLifecycle).
const disconnectGracePeriod = 12 * time.Second
||
// peerKey builds the composite key used to index per-peer state across all
// package-level maps: "<roomID>|<peerID>".
func peerKey(roomID, peerID string) string {
	const sep = "|"
	return strings.Join([]string{roomID, peerID}, sep)
}
func getRenegLock(roomID, peerID string) *sync.Mutex {
|
||
k := peerKey(roomID, peerID)
|
||
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
|
||
return v.(*sync.Mutex)
|
||
}
|
||
|
||
func setRenegPending(roomID, peerID string, v bool) {
|
||
renegPending.Store(peerKey(roomID, peerID), v)
|
||
}
|
||
|
||
func popRenegPending(roomID, peerID string) bool {
|
||
k := peerKey(roomID, peerID)
|
||
v, ok := renegPending.Load(k)
|
||
if ok {
|
||
renegPending.Delete(k)
|
||
}
|
||
if !ok {
|
||
return false
|
||
}
|
||
b, _ := v.(bool)
|
||
return b
|
||
}
|
||
|
||
// extractICEUfrag scans an SDP blob for the first "a=ice-ufrag:" attribute
// and returns its value, or "" when the attribute is absent.
func extractICEUfrag(sdp string) string {
	const prefix = "a=ice-ufrag:"
	for _, raw := range strings.Split(sdp, "\n") {
		attr := strings.TrimSpace(raw)
		if !strings.HasPrefix(attr, prefix) {
			continue
		}
		return attr[len(prefix):]
	}
	return ""
}
func removeRoomTrack(roomID, trackID string) {
|
||
room, ok := GetRoom(roomID)
|
||
if !ok {
|
||
return
|
||
}
|
||
room.mu.Lock()
|
||
defer room.mu.Unlock()
|
||
|
||
out := room.Tracks[:0]
|
||
for _, t := range room.Tracks {
|
||
if t.TrackID != trackID {
|
||
out = append(out, t)
|
||
}
|
||
}
|
||
room.Tracks = out
|
||
}
|
||
|
||
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
|
||
state := pc.ConnectionState()
|
||
return state != webrtc.PeerConnectionStateClosed &&
|
||
state != webrtc.PeerConnectionStateFailed &&
|
||
state != webrtc.PeerConnectionStateDisconnected
|
||
}
|
||
|
||
// BindPeerLifecycle attaches connection-state watchers to a PeerConnection
// created in JoinWithOffer. It automatically calls LeaveRoom when the
// connection drops for good and fires the OnPeerDisconnected hook so relay
// state can be cleaned up.
//
// A transient "disconnected" state only arms a grace timer
// (disconnectGracePeriod); if the connection recovers to "connected" before
// the timer fires, the pending leave is cancelled.
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
	var (
		mu            sync.Mutex // guards disconnecting and timer
		disconnecting bool       // true while the grace timer is armed
		timer         *time.Timer
		leaveOnce     sync.Once // LeaveRoom must run at most once per peer
	)

	room, exists := GetRoom(roomID)
	if !exists {
		logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID)
		return
	}
	server := room.Server

	// cancelTimer stops a pending grace timer (connection recovered).
	cancelTimer := func() {
		mu.Lock()
		defer mu.Unlock()
		if timer != nil {
			timer.Stop()
			timer = nil
		}
		disconnecting = false
	}

	// leaveAndNotify removes the peer from the room exactly once and, only
	// when LeaveRoom succeeded, notifies listeners with the reason.
	leaveAndNotify := func(reason DisconnectReason) {
		leaveOnce.Do(func() {
			cancelTimer()
			err := LeaveRoom(roomID, peerID)
			if OnPeerDisconnected != nil && err == nil {
				OnPeerDisconnected(roomID, peerID, server, reason)
			}
		})
	}

	// startTimer arms the disconnect grace timer; idempotent while armed.
	startTimer := func() {
		mu.Lock()
		defer mu.Unlock()
		if disconnecting {
			return
		}
		disconnecting = true
		timer = time.AfterFunc(disconnectGracePeriod, func() {
			state := pc.ConnectionState()
			if state == webrtc.PeerConnectionStateConnected {
				// Recovered while the timer was pending — keep the peer.
				mu.Lock()
				disconnecting = false
				mu.Unlock()
				return
			}
			leaveAndNotify(DisconnectReasonFailed)
		})
	}

	pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
		switch state {
		case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
			cancelTimer()
		case webrtc.ICEConnectionStateDisconnected:
			startTimer()
		case webrtc.ICEConnectionStateClosed:
			leaveAndNotify(DisconnectReasonClosed)
		case webrtc.ICEConnectionStateFailed:
			leaveAndNotify(DisconnectReasonFailed)
		}
	})

	pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
		switch state {
		case webrtc.PeerConnectionStateConnected:
			cancelTimer()
		case webrtc.PeerConnectionStateDisconnected:
			startTimer()
		case webrtc.PeerConnectionStateClosed:
			leaveAndNotify(DisconnectReasonClosed)
		case webrtc.PeerConnectionStateFailed:
			leaveAndNotify(DisconnectReasonFailed)
		}
	})
}
||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
|
||
publisherPeerID,
|
||
remote.Kind().String(),
|
||
remote.ID(),
|
||
remote.SSRC(),
|
||
)
|
||
|
||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||
remote.Codec().RTPCodecCapability,
|
||
localTrackID,
|
||
remote.StreamID(),
|
||
)
|
||
if err != nil {
|
||
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
|
||
return
|
||
}
|
||
defer removeRoomTrack(roomID, localTrack.ID())
|
||
|
||
room, ok := GetRoom(roomID)
|
||
if !ok {
|
||
return
|
||
}
|
||
|
||
room.mu.Lock()
|
||
room.Tracks = append(room.Tracks, RoomTrack{
|
||
TrackID: localTrack.ID(),
|
||
OwnerPeer: publisherPeerID,
|
||
Local: localTrack,
|
||
})
|
||
peers := make([]Peer, len(room.Peers))
|
||
copy(peers, room.Peers)
|
||
room.mu.Unlock()
|
||
|
||
for _, sub := range peers {
|
||
if sub.PeerID == publisherPeerID {
|
||
continue
|
||
}
|
||
|
||
if !isPeerConnectionAlive(sub.PeerConnection) {
|
||
continue
|
||
}
|
||
|
||
sender, err := sub.PeerConnection.AddTrack(localTrack)
|
||
if err != nil {
|
||
logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
|
||
continue
|
||
}
|
||
|
||
senderCopy := sender
|
||
go func() {
|
||
buf := make([]byte, 1500)
|
||
for {
|
||
if _, _, e := senderCopy.Read(buf); e != nil {
|
||
return
|
||
}
|
||
}
|
||
}()
|
||
|
||
subID := sub.PeerID
|
||
subPC := sub.PeerConnection
|
||
|
||
go func() {
|
||
logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID)
|
||
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
|
||
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
|
||
}
|
||
}()
|
||
}
|
||
|
||
// Для video просим keyframe
|
||
if remote.Kind() == webrtc.RTPCodecTypeVideo {
|
||
_ = publisherPC.WriteRTCP([]rtcp.Packet{
|
||
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
|
||
})
|
||
}
|
||
|
||
// Publisher RTP -> localTrack -> subscribers
|
||
for {
|
||
pkt, _, err := remote.ReadRTP()
|
||
if err != nil {
|
||
if err == io.EOF {
|
||
return
|
||
}
|
||
logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error())
|
||
return
|
||
}
|
||
if err = localTrack.WriteRTP(pkt); err != nil {
|
||
logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error())
|
||
return
|
||
}
|
||
}
|
||
})
|
||
}
|
||
|
||
// renegotiatePeer runs one offer/answer cycle towards the client after the
// server-side track set changed. Cycles are serialized per peer via
// getRenegLock, and the resulting offer is delivered through the
// OnServerOffer callback.
//
// NOTE(review): this waits for full ICE gathering (non-trickle offer) while
// holding the per-peer lock, so a cycle can block for the duration of
// gathering.
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
	lock := getRenegLock(roomID, peerID)
	lock.Lock()
	defer lock.Unlock()

	if !isPeerConnectionAlive(pc) {
		return nil
	}

	// Don't start a new negotiation on top of one already in flight; record
	// it as pending instead — HandleClientAnswer runs another cycle later.
	if pc.SignalingState() != webrtc.SignalingStateStable {
		setRenegPending(roomID, peerID, true)
		return nil
	}

	offer, err := pc.CreateOffer(nil)
	if err != nil {
		return err
	}

	// The promise must be created before SetLocalDescription, which is what
	// starts ICE gathering.
	gatherDone := webrtc.GatheringCompletePromise(pc)
	if err = pc.SetLocalDescription(offer); err != nil {
		return err
	}
	<-gatherDone

	if OnServerOffer != nil && pc.LocalDescription() != nil {
		OnServerOffer(roomID, peerID, *pc.LocalDescription())
	}
	return nil
}
||
// Добавляет ICE-кандидата к пиру
|
||
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||
room, exists := GetRoom(roomID)
|
||
if !exists {
|
||
return ErrRoomNotFound
|
||
}
|
||
|
||
room.mu.RLock()
|
||
var pc *webrtc.PeerConnection
|
||
for _, p := range room.Peers {
|
||
if p.PeerID == peerID {
|
||
pc = p.PeerConnection
|
||
break
|
||
}
|
||
}
|
||
room.mu.RUnlock()
|
||
|
||
if pc == nil {
|
||
return ErrPeerNotFound
|
||
}
|
||
|
||
rd := pc.RemoteDescription()
|
||
if rd == nil {
|
||
// offer/answer еще не применен — буферизуем
|
||
pendingMu.Lock()
|
||
k := peerKey(roomID, peerID)
|
||
pendingCandidates[k] = append(pendingCandidates[k], candidate)
|
||
pendingMu.Unlock()
|
||
return nil
|
||
}
|
||
|
||
// Отбрасываем stale candidate по ufrag
|
||
if candidate.UsernameFragment != nil {
|
||
current := extractICEUfrag(rd.SDP)
|
||
if current != "" && *candidate.UsernameFragment != current {
|
||
return nil
|
||
}
|
||
}
|
||
|
||
err := pc.AddICECandidate(candidate)
|
||
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
|
||
return nil
|
||
}
|
||
return err
|
||
}
|
||
|
||
// Обрабатывает SDP answer от клиента при renegotiation
|
||
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||
if answer.Type != webrtc.SDPTypeAnswer {
|
||
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
|
||
}
|
||
|
||
room, exists := GetRoom(roomID)
|
||
if !exists {
|
||
return ErrRoomNotFound
|
||
}
|
||
|
||
room.mu.RLock()
|
||
var pc *webrtc.PeerConnection
|
||
for _, p := range room.Peers {
|
||
if p.PeerID == peerID {
|
||
pc = p.PeerConnection
|
||
break
|
||
}
|
||
}
|
||
room.mu.RUnlock()
|
||
|
||
if pc == nil {
|
||
return ErrPeerNotFound
|
||
}
|
||
|
||
if err := pc.SetRemoteDescription(answer); err != nil {
|
||
return err
|
||
}
|
||
|
||
// После применения answer — применяем отложенные кандидаты.
|
||
k := peerKey(roomID, peerID)
|
||
pendingMu.Lock()
|
||
queue := pendingCandidates[k]
|
||
delete(pendingCandidates, k)
|
||
pendingMu.Unlock()
|
||
|
||
for _, c := range queue {
|
||
_ = AddICECandidate(roomID, peerID, c)
|
||
}
|
||
|
||
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
|
||
if popRenegPending(roomID, peerID) {
|
||
_ = renegotiatePeer(roomID, peerID, pc)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func cleanupForwardingState(roomID, peerID string) {
|
||
k := peerKey(roomID, peerID)
|
||
|
||
pendingMu.Lock()
|
||
delete(pendingCandidates, k)
|
||
pendingMu.Unlock()
|
||
|
||
renegPending.Delete(k)
|
||
renegMu.Delete(k)
|
||
}
|