Новые события с пирами и комнатами, базовый форвардинг

This commit is contained in:
set
2026-03-16 17:08:56 +02:00
parent a9a1dc5895
commit e67bd3d824
5 changed files with 370 additions and 48 deletions

View File

@@ -28,6 +28,8 @@ func Bootstrap() {
}
sfu.OnLocalICECandidate = OnLocalICECandidate
sfu.OnServerOffer = OnServerOffer
sfu.OnRoomDelete = OnRoomDelete
sfu.OnPeerDisconnected = OnPeerDisconnected
turnServer, err := turn.Start(turn.Config{
ListenAddr: "0.0.0.0:3478",
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
@@ -88,3 +90,33 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
}
func OnRoomDelete(roomID string) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send room delete event to non existing room " + roomID)
return
}
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)))
buffer.Put(0x10)
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
}
func OnPeerDisconnected(roomID string, peerID string) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send peer disconnected event to non existing room " + roomID)
return
}
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)))
buffer.Put(0x11)
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
}

View File

@@ -2,27 +2,51 @@ package sfu
import (
"fmt"
"g365sfu/logger"
"io"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/webrtc/v4"
)
var (
pendingMu sync.Mutex
pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
renegMu sync.Map
renegMu sync.Map // key -> *sync.Mutex
renegPending sync.Map // key -> bool
)
const disconnectGracePeriod = 12 * time.Second
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
}
func getRenegLock(roomID, peerID string) *sync.Mutex {
k := peerKey(roomID, peerID)
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
return v.(*sync.Mutex)
}
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
func setRenegPending(roomID, peerID string, v bool) {
renegPending.Store(peerKey(roomID, peerID), v)
}
func popRenegPending(roomID, peerID string) bool {
k := peerKey(roomID, peerID)
v, ok := renegPending.Load(k)
if ok {
renegPending.Delete(k)
}
if !ok {
return false
}
b, _ := v.(bool)
return b
}
func extractICEUfrag(sdp string) string {
@@ -35,40 +59,157 @@ func extractICEUfrag(sdp string) string {
return ""
}
func removeRoomTrack(roomID, trackID string) {
room, ok := GetRoom(roomID)
if !ok {
return
}
room.mu.Lock()
defer room.mu.Unlock()
out := room.Tracks[:0]
for _, t := range room.Tracks {
if t.TrackID != trackID {
out = append(out, t)
}
}
room.Tracks = out
}
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed &&
state != webrtc.PeerConnectionStateDisconnected
}
// Вешается на каждый PeerConnection при JoinWithOffer.
// Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции.
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
var (
mu sync.Mutex
disconnecting bool
timer *time.Timer
)
startTimer := func() {
mu.Lock()
defer mu.Unlock()
if disconnecting {
return
}
disconnecting = true
timer = time.AfterFunc(disconnectGracePeriod, func() {
state := pc.ConnectionState()
if state == webrtc.PeerConnectionStateConnected {
mu.Lock()
disconnecting = false
mu.Unlock()
return
}
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
})
}
cancelTimer := func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
timer = nil
}
disconnecting = false
}
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
switch state {
case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
cancelTimer()
case webrtc.ICEConnectionStateDisconnected:
startTimer()
case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed:
cancelTimer()
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
}
})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
cancelTimer()
case webrtc.PeerConnectionStateDisconnected:
startTimer()
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
cancelTimer()
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
}
})
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
publisherPeerID,
remote.Kind().String(),
remote.ID(),
remote.SSRC(),
)
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
localTrackID,
remote.StreamID(),
)
if err != nil {
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error")
return
}
defer removeRoomTrack(roomID, localTrack.ID())
room, ok := GetRoom(roomID)
if !ok {
return
}
// Добавляем этот track всем, кроме publisher
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return
}
room.mu.Lock()
room.Tracks = append(room.Tracks, RoomTrack{
TrackID: localTrack.ID(),
OwnerPeer: publisherPeerID,
Local: localTrack,
})
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
roomsMu.RUnlock()
room.mu.Unlock()
for _, sub := range peers {
if sub.PeerID == publisherPeerID {
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
// Не трогаем закрытые/failed соединения
if !isPeerConnectionAlive(sub.PeerConnection) {
fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID,
sub.PeerConnection.ConnectionState().String())
continue
}
// RTCP drain обязателен
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err)
continue
}
// RTCP drain
go func() {
buf := make([]byte, 1500)
for {
@@ -78,20 +219,30 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
}
}()
// После AddTrack нужна renegotiation для подписчика
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil {
fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err)
}
}
// Пересылаем RTP пакеты от издателя всем подписчикам
// Для video просим keyframe
if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
})
}
// Publisher RTP -> localTrack -> subscribers
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
fmt.Println("SetupForwardingForPeer: ReadRTP error:", err)
return
}
if err = localTrack.WriteRTP(pkt); err != nil {
fmt.Println("SetupForwardingForPeer: WriteRTP error:", err)
return
}
}
@@ -103,8 +254,13 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
lock.Lock()
defer lock.Unlock()
// Не начинаем новую negotiation поверх текущей
if !isPeerConnectionAlive(pc) {
return nil
}
// Не начинаем новую negotiation поверх текущей.
if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true)
return nil
}
@@ -132,6 +288,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return ErrRoomNotFound
}
room.mu.RLock()
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
@@ -139,13 +296,15 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
break
}
}
room.mu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
rd := pc.RemoteDescription()
if rd == nil {
// answer/offer еще не применен — буферизуем
// offer/answer еще не применен — буферизуем
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
@@ -153,7 +312,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return nil
}
// отбрасываем stale candidate по ufrag
// Отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
@@ -163,19 +322,23 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
err := pc.AddICECandidate(candidate)
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
// поздний старый кандидат — игнорируем
return nil
}
return err
}
// Обрабатывает SDP ответ от клиента при renegotiation
// Обрабатывает SDP answer от клиента при renegotiation
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
}
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
room.mu.RLock()
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
@@ -183,6 +346,8 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
break
}
}
room.mu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
@@ -191,7 +356,7 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
return err
}
// после применения answer — применяем отложенные кандидаты
// После применения answer — применяем отложенные кандидаты.
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
@@ -202,5 +367,21 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
_ = AddICECandidate(roomID, peerID, c)
}
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc)
}
return nil
}
func cleanupForwardingState(roomID, peerID string) {
k := peerKey(roomID, peerID)
pendingMu.Lock()
delete(pendingCandidates, k)
pendingMu.Unlock()
renegPending.Delete(k)
renegMu.Delete(k)
}

View File

@@ -16,6 +16,12 @@ type Peer struct {
PeerConnection *webrtc.PeerConnection
}
type RoomTrack struct {
TrackID string
OwnerPeer string
Local *webrtc.TrackLocalStaticRTP
}
type Room struct {
//Уникальный идентификатор комнаты
RoomID string
@@ -23,6 +29,10 @@ type Room struct {
Server *connection.Connection
//Пиры которые подключились к комнате
Peers []Peer
Tracks []RoomTrack
mu sync.RWMutex
}
// Общие переменные
@@ -54,13 +64,6 @@ func GetRoom(roomID string) (*Room, bool) {
return room, exists
}
// DeleteRoom удаляет комнату по идентификатору
func DeleteRoom(roomID string) {
roomsMu.Lock()
defer roomsMu.Unlock()
delete(rooms, roomID)
}
// JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера
func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
room, exists := GetRoom(roomID)
@@ -73,43 +76,96 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
return nil, err
}
// SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их
// пересылает клиенту для установления соединения
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return // gathering finished
return
}
if OnLocalICECandidate != nil {
OnLocalICECandidate(roomID, peerID, c.ToJSON())
}
})
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
BindPeerLifecycle(roomID, peerID, peerConnection)
SetupForwardingForPeer(roomID, peerID, peerConnection)
room.mu.RLock()
existingTracks := make([]RoomTrack, len(room.Tracks))
copy(existingTracks, room.Tracks)
room.mu.RUnlock()
for _, t := range existingTracks {
if t.OwnerPeer == peerID {
continue
}
sender, err := peerConnection.AddTrack(t.Local)
if err != nil {
continue
}
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
return
}
}
}()
}
if err = peerConnection.SetRemoteDescription(offer); err != nil {
_ = peerConnection.Close()
return nil, err
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
_ = peerConnection.Close()
return nil, err
}
err = peerConnection.SetLocalDescription(answer)
if err != nil {
gatherDone := webrtc.GatheringCompletePromise(peerConnection)
if err = peerConnection.SetLocalDescription(answer); err != nil {
_ = peerConnection.Close()
return nil, err
}
<-gatherDone
// Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты
SetupForwardingForPeer(roomID, peerID, peerConnection)
room.mu.Lock()
room.Peers = append(room.Peers, Peer{
PeerID: peerID,
PeerConnection: peerConnection,
})
room.mu.Unlock()
return peerConnection.LocalDescription(), nil
}
func DeleteRoom(roomID string) error {
roomsMu.Lock()
room, exists := rooms[roomID]
if !exists {
roomsMu.Unlock()
return ErrRoomNotFound
}
delete(rooms, roomID)
roomsMu.Unlock()
room.mu.Lock()
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
room.Peers = nil
room.Tracks = nil
room.mu.Unlock()
for _, p := range peers {
_ = p.PeerConnection.Close()
cleanupForwardingState(roomID, p.PeerID)
}
return nil
}
// LeaveRoom позволяет пиру покинуть комнату
func LeaveRoom(roomID string, peerID string) error {
room, exists := GetRoom(roomID)
@@ -117,12 +173,59 @@ func LeaveRoom(roomID string, peerID string) error {
return ErrRoomNotFound
}
for i, peer := range room.Peers {
if peer.PeerID == peerID {
peer.PeerConnection.Close()
room.Peers = append(room.Peers[:i], room.Peers[i+1:]...)
break
var (
removedPC *webrtc.PeerConnection
removed bool
shouldDrop bool
)
room.mu.Lock()
// удаляем peer
nextPeers := make([]Peer, 0, len(room.Peers))
for _, p := range room.Peers {
if p.PeerID == peerID {
removedPC = p.PeerConnection
removed = true
continue
}
nextPeers = append(nextPeers, p)
}
room.Peers = nextPeers
// удаляем треки этого publisher
nextTracks := room.Tracks[:0]
for _, t := range room.Tracks {
if t.OwnerPeer != peerID {
nextTracks = append(nextTracks, t)
}
}
room.Tracks = nextTracks
shouldDrop = len(room.Peers) == 0
room.mu.Unlock()
if !removed {
return ErrPeerNotFound
}
if removedPC != nil {
_ = removedPC.Close()
}
cleanupForwardingState(roomID, peerID)
// Комната пустая -> удаляем
if shouldDrop {
return DeleteRoom(roomID)
}
// Опционально: renegotiation оставшимся peer после удаления треков/peer
room.mu.RLock()
rest := make([]Peer, len(room.Peers))
copy(rest, room.Peers)
room.mu.RUnlock()
for _, p := range rest {
_ = renegotiatePeer(roomID, p.PeerID, p.PeerConnection)
}
return nil

View File

@@ -20,6 +20,12 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip
// Коллбек для обработки новых ICE кандидатов
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
// Коллбек для обработки отключения пира (обрыв связи)
var OnPeerDisconnected func(roomID, peerID string)
// Коллбек для обработки удаления комнаты
var OnRoomDelete func(roomID string)
// Ошибки
var (
ErrRoomNotFound = errors.New("room not found")

View File

@@ -204,10 +204,10 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
}
//Check life для проверки соединения с сервером SFU
if packetId == 0x08 {
if packetId == 0xAE {
// Подготовка ответа для клиента о том, что соединение живо
response := bytebuffer.Allocate(1)
response.Put(0x08)
response.Put(0xAE)
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())