Compare commits

..

5 Commits

Author SHA1 Message Date
set
9a33235c7e Автоматическое удаление комнат при неактивности
All checks were successful
Build G365SFU / build (push) Successful in 10m16s
2026-04-02 18:02:41 +02:00
set
feb4a51ab0 Буферы RTP пакетов для сокращения задержек, если downstream медленный
All checks were successful
Build G365SFU / build (push) Successful in 12m1s
2026-03-20 23:19:22 +02:00
set
8c386eb42a Фикс Go версии
All checks were successful
Build G365SFU / build (push) Successful in 13m19s
2026-03-20 20:25:35 +02:00
set
027626eb2c Фикс CI/CD
Some checks failed
Build G365SFU / build (push) Failing after 6m47s
2026-03-20 20:13:32 +02:00
set
1cbe48d327 Добавлена система сборки и исправлены ошибки
Some checks failed
Build G365SFU / build (push) Failing after 46s
2026-03-20 19:26:13 +02:00
4 changed files with 161 additions and 32 deletions

4
.env
View File

@@ -9,7 +9,7 @@ PORT=1001
# SFU SECTION # # SFU SECTION #
############### ###############
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT) #Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
SFU_PUBLIC_IP=192.168.6.82 SFU_PUBLIC_IP=10.211.55.2
#Диапазон портов для ICE кандидатов #Диапазон портов для ICE кандидатов
SFU_PORT_RANGE_FROM=30000 SFU_PORT_RANGE_FROM=30000
SFU_PORT_RANGE_TO=39999 SFU_PORT_RANGE_TO=39999
@@ -19,7 +19,7 @@ SFU_PORT_RANGE_TO=39999
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU) #Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
TURN_ALLOW=true TURN_ALLOW=true
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется) #TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
TURN_PUBLIC_IP=192.168.6.82 TURN_PUBLIC_IP=10.211.55.2
TURN_USER=user TURN_USER=user
TURN_PASS=pass TURN_PASS=pass
#Диапазон занимаемых TURN сервером портов (tcp/udp) #Диапазон занимаемых TURN сервером портов (tcp/udp)

View File

@@ -1,4 +1,5 @@
name: Build G365SFU name: Build G365SFU
run-name: Build and Deploy G365SFU
on: on:
push: push:
branches: [ main ] branches: [ main ]
@@ -8,19 +9,12 @@ jobs:
build: build:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Install Node.js
run: |
if command -v apt-get &> /dev/null; then
sudo apt-get update && sudo apt-get install -y nodejs npm
elif command -v brew &> /dev/null; then
brew install node
fi
- name: Checkout code - name: Checkout code
uses: actions/checkout@v6 uses: actions/checkout@v6
- name: Set up Go - name: Set up Go
uses: actions/setup-go@v4 uses: actions/setup-go@v4
with: with:
go-version: 1.20 go-version: 1.24
- name: Build G365SFU - name: Build G365SFU
run: | run: |
make build make build
@@ -37,3 +31,14 @@ jobs:
rm: false rm: false
debug: true debug: true
flatten: true flatten: true
- name: Restart Docker containers
uses: appleboy/ssh-action@v1.2.5
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
script: |
cd ${{ secrets.SFU_DEPLOY_PATH }}
docker-compose down
docker compose up -d --build

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@@ -20,7 +21,13 @@ var (
renegPending sync.Map // key -> bool renegPending sync.Map // key -> bool
) )
const disconnectGracePeriod = 12 * time.Second const (
disconnectGracePeriod = 30 * time.Second
outboundPacketBuffer = 256
maxConsecutiveReadErrs = 25
maxConsecutiveWriteErrs = 50
packetDropLogEvery = 100
)
func peerKey(roomID, peerID string) string { func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID return roomID + "|" + peerID
@@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) {
if !ok { if !ok {
return return
} }
room.mu.Lock() room.mu.Lock()
defer room.mu.Unlock() defer room.mu.Unlock()
@@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) {
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState() state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed && return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed && state != webrtc.PeerConnectionStateFailed
state != webrtc.PeerConnectionStateDisconnected
} }
// Вешается на каждый PeerConnection при JoinWithOffer. // Вешается на каждый PeerConnection при JoinWithOffer.
@@ -103,6 +110,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
cancelTimer := func() { cancelTimer := func() {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if timer != nil { if timer != nil {
timer.Stop() timer.Stop()
timer = nil timer = nil
@@ -123,9 +131,11 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
startTimer := func() { startTimer := func() {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if disconnecting { if disconnecting {
return return
} }
disconnecting = true disconnecting = true
timer = time.AfterFunc(disconnectGracePeriod, func() { timer = time.AfterFunc(disconnectGracePeriod, func() {
state := pc.ConnectionState() state := pc.ConnectionState()
@@ -166,8 +176,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
}) })
} }
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d", localTrackID := fmt.Sprintf("%s:%s:%s:%d",
@@ -232,33 +241,100 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
subPC := sub.PeerConnection subPC := sub.PeerConnection
go func() { go func() {
logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID)
if err := renegotiatePeer(roomID, subID, subPC); err != nil { if err := renegotiatePeer(roomID, subID, subPC); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error()) logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
} }
}() }()
} }
// Для video просим keyframe
if remote.Kind() == webrtc.RTPCodecTypeVideo { if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{ _ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())}, &rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
}) })
} }
// Publisher RTP -> localTrack -> subscribers // Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
// Если downstream начинает тормозить, пакеты дропаются из очереди,
// а не накапливают бесконечную задержку.
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
writerDone := make(chan struct{})
defer close(packetQueue)
go func() {
defer close(writerDone)
consecutiveWriteErrs := 0
for pkt := range packetQueue {
if err := localTrack.WriteRTP(pkt); err != nil {
consecutiveWriteErrs++
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
))
}
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
continue
}
consecutiveWriteErrs = 0
}
}()
consecutiveReadErrs := 0
droppedPackets := 0
for { for {
pkt, _, err := remote.ReadRTP() pkt, _, err := remote.ReadRTP()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
return return
} }
logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error())
consecutiveReadErrs++
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
))
}
if consecutiveReadErrs >= maxConsecutiveReadErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return return
} }
if err = localTrack.WriteRTP(pkt); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error()) time.Sleep(10 * time.Millisecond)
continue
}
consecutiveReadErrs = 0
select {
case <-writerDone:
return return
default:
}
select {
case <-writerDone:
return
case packetQueue <- pkt:
default:
droppedPackets++
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
publisherPeerID, localTrack.ID(), droppedPackets,
))
}
} }
} }
}) })
@@ -273,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil return nil
} }
// Не начинаем новую negotiation поверх текущей.
if pc.SignalingState() != webrtc.SignalingStateStable { if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true) setRenegPending(roomID, peerID, true)
return nil return nil
@@ -296,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil return nil
} }
// Добавляет ICE-кандидата к пиру // Добавляет ICE-кандидата к пиру.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error { func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID) room, exists := GetRoom(roomID)
if !exists { if !exists {
@@ -319,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
rd := pc.RemoteDescription() rd := pc.RemoteDescription()
if rd == nil { if rd == nil {
// offer/answer еще не применен — буферизуем
pendingMu.Lock() pendingMu.Lock()
k := peerKey(roomID, peerID) k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate) pendingCandidates[k] = append(pendingCandidates[k], candidate)
@@ -327,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return nil return nil
} }
// Отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil { if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP) current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current { if current != "" && *candidate.UsernameFragment != current {
@@ -342,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return err return err
} }
// Обрабатывает SDP answer от клиента при renegotiation // Обрабатывает SDP answer от клиента при renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer { if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
@@ -371,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
return err return err
} }
// После применения answer — применяем отложенные кандидаты.
k := peerKey(roomID, peerID) k := peerKey(roomID, peerID)
pendingMu.Lock() pendingMu.Lock()
queue := pendingCandidates[k] queue := pendingCandidates[k]
@@ -382,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
_ = AddICECandidate(roomID, peerID, c) _ = AddICECandidate(roomID, peerID, c)
} }
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
if popRenegPending(roomID, peerID) { if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc) _ = renegotiatePeer(roomID, peerID, pc)
} }

View File

@@ -4,6 +4,7 @@ import (
"g365sfu/logger" "g365sfu/logger"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"sync" "sync"
"time"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@@ -34,6 +35,8 @@ type Room struct {
Tracks []RoomTrack Tracks []RoomTrack
mu sync.RWMutex mu sync.RWMutex
emptyTimer *time.Timer
} }
// Общие переменные // Общие переменные
@@ -42,6 +45,46 @@ var (
roomsMu sync.RWMutex roomsMu sync.RWMutex
) )
const emptyRoomTTL = 30 * time.Second
func scheduleEmptyRoomDeletion(room *Room) {
room.mu.Lock()
if len(room.Peers) > 0 {
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.mu.Unlock()
return
}
if room.emptyTimer != nil {
room.mu.Unlock()
return
}
roomID := room.RoomID
var timer *time.Timer
timer = time.AfterFunc(emptyRoomTTL, func() {
currentRoom, exists := GetRoom(roomID)
if !exists {
return
}
currentRoom.mu.Lock()
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
currentRoom.mu.Unlock()
return
}
currentRoom.emptyTimer = nil
currentRoom.mu.Unlock()
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
}
})
room.emptyTimer = timer
room.mu.Unlock()
}
// CreateRoom создает комнату // CreateRoom создает комнату
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) { func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
roomsMu.Lock() roomsMu.Lock()
@@ -53,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
Peers: []Peer{}, Peers: []Peer{},
} }
rooms[roomID] = room rooms[roomID] = room
scheduleEmptyRoomDeletion(room)
return room, nil return room, nil
} }
@@ -110,6 +154,10 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
// Добавляем peer в комнату и сразу снимаем snapshot существующих треков // Добавляем peer в комнату и сразу снимаем snapshot существующих треков
// в одном локе — чтобы не было race с OnTrack // в одном локе — чтобы не было race с OnTrack
room.mu.Lock() room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.Peers = append(room.Peers, Peer{ room.Peers = append(room.Peers, Peer{
PeerID: peerID, PeerID: peerID,
PeerConnection: peerConnection, PeerConnection: peerConnection,
@@ -164,6 +212,10 @@ func DeleteRoom(roomID string) error {
roomsMu.Unlock() roomsMu.Unlock()
room.mu.Lock() room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
peers := make([]Peer, len(room.Peers)) peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers) copy(peers, room.Peers)
room.Peers = nil room.Peers = nil
@@ -227,9 +279,10 @@ func LeaveRoom(roomID string, peerID string) error {
} }
cleanupForwardingState(roomID, peerID) cleanupForwardingState(roomID, peerID)
// Комната пустая -> удаляем // Комната пустая -> планируем удаление через TTL
if shouldDrop { if shouldDrop {
return DeleteRoom(roomID) scheduleEmptyRoomDeletion(room)
return nil
} }
// renegotiation оставшимся peer после удаления треков/peer // renegotiation оставшимся peer после удаления треков/peer