Compare commits
5 Commits
625a7acfb7
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a33235c7e | ||
|
|
feb4a51ab0 | ||
|
|
8c386eb42a | ||
|
|
027626eb2c | ||
|
|
1cbe48d327 |
4
.env
4
.env
@@ -9,7 +9,7 @@ PORT=1001
|
||||
# SFU SECTION #
|
||||
###############
|
||||
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
|
||||
SFU_PUBLIC_IP=192.168.6.82
|
||||
SFU_PUBLIC_IP=10.211.55.2
|
||||
#Диапазон портов для ICE кандидатов
|
||||
SFU_PORT_RANGE_FROM=30000
|
||||
SFU_PORT_RANGE_TO=39999
|
||||
@@ -19,7 +19,7 @@ SFU_PORT_RANGE_TO=39999
|
||||
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
|
||||
TURN_ALLOW=true
|
||||
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
|
||||
TURN_PUBLIC_IP=192.168.6.82
|
||||
TURN_PUBLIC_IP=10.211.55.2
|
||||
TURN_USER=user
|
||||
TURN_PASS=pass
|
||||
#Диапазон занимемых TURN сервером портов (tcp/udp)
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
name: Build G365SFU
|
||||
run-name: Build and Deploy G365SFU
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
@@ -8,19 +9,12 @@ jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Install Node.js
|
||||
run: |
|
||||
if command -v apt-get &> /dev/null; then
|
||||
sudo apt-get update && sudo apt-get install -y nodejs npm
|
||||
elif command -v brew &> /dev/null; then
|
||||
brew install node
|
||||
fi
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v6
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: 1.20
|
||||
go-version: 1.24
|
||||
- name: Build G365SFU
|
||||
run: |
|
||||
make build
|
||||
@@ -36,4 +30,15 @@ jobs:
|
||||
strip_components: 1
|
||||
rm: false
|
||||
debug: true
|
||||
flatten: true
|
||||
flatten: true
|
||||
- name: Restart Docker containers
|
||||
uses: appleboy/ssh-action@v1.2.5
|
||||
with:
|
||||
host: ${{ secrets.SFU_SSH_HOST }}
|
||||
username: ${{ secrets.SFU_SSH_USER }}
|
||||
password: ${{ secrets.SFU_SSH_PASSWORD }}
|
||||
port: ${{ secrets.SFU_SSH_PORT }}
|
||||
script: |
|
||||
cd ${{ secrets.SFU_DEPLOY_PATH }}
|
||||
docker-compose down
|
||||
docker compose up -d --build
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -20,7 +21,13 @@ var (
|
||||
renegPending sync.Map // key -> bool
|
||||
)
|
||||
|
||||
const disconnectGracePeriod = 12 * time.Second
|
||||
const (
|
||||
disconnectGracePeriod = 30 * time.Second
|
||||
outboundPacketBuffer = 256
|
||||
maxConsecutiveReadErrs = 25
|
||||
maxConsecutiveWriteErrs = 50
|
||||
packetDropLogEvery = 100
|
||||
)
|
||||
|
||||
func peerKey(roomID, peerID string) string {
|
||||
return roomID + "|" + peerID
|
||||
@@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
room.mu.Lock()
|
||||
defer room.mu.Unlock()
|
||||
|
||||
@@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) {
|
||||
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
|
||||
state := pc.ConnectionState()
|
||||
return state != webrtc.PeerConnectionStateClosed &&
|
||||
state != webrtc.PeerConnectionStateFailed &&
|
||||
state != webrtc.PeerConnectionStateDisconnected
|
||||
state != webrtc.PeerConnectionStateFailed
|
||||
}
|
||||
|
||||
// Вешается на каждый PeerConnection при JoinWithOffer.
|
||||
@@ -103,6 +110,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
cancelTimer := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if timer != nil {
|
||||
timer.Stop()
|
||||
timer = nil
|
||||
@@ -123,9 +131,11 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
startTimer := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if disconnecting {
|
||||
return
|
||||
}
|
||||
|
||||
disconnecting = true
|
||||
timer = time.AfterFunc(disconnectGracePeriod, func() {
|
||||
state := pc.ConnectionState()
|
||||
@@ -166,8 +176,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
})
|
||||
}
|
||||
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
|
||||
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||||
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
|
||||
@@ -232,33 +241,100 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
|
||||
subPC := sub.PeerConnection
|
||||
|
||||
go func() {
|
||||
logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID)
|
||||
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
|
||||
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Для video просим keyframe
|
||||
if remote.Kind() == webrtc.RTPCodecTypeVideo {
|
||||
_ = publisherPC.WriteRTCP([]rtcp.Packet{
|
||||
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
|
||||
})
|
||||
}
|
||||
|
||||
// Publisher RTP -> localTrack -> subscribers
|
||||
// Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
|
||||
// Если downstream начинает тормозить, пакеты дропаются из очереди,
|
||||
// а не накапливают бесконечную задержку.
|
||||
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
|
||||
writerDone := make(chan struct{})
|
||||
defer close(packetQueue)
|
||||
|
||||
go func() {
|
||||
defer close(writerDone)
|
||||
|
||||
consecutiveWriteErrs := 0
|
||||
for pkt := range packetQueue {
|
||||
if err := localTrack.WriteRTP(pkt); err != nil {
|
||||
consecutiveWriteErrs++
|
||||
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
|
||||
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
|
||||
))
|
||||
}
|
||||
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
|
||||
publisherPeerID, localTrack.ID(),
|
||||
))
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
consecutiveWriteErrs = 0
|
||||
}
|
||||
}()
|
||||
|
||||
consecutiveReadErrs := 0
|
||||
droppedPackets := 0
|
||||
|
||||
for {
|
||||
pkt, _, err := remote.ReadRTP()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error())
|
||||
return
|
||||
|
||||
consecutiveReadErrs++
|
||||
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
|
||||
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
|
||||
))
|
||||
}
|
||||
if consecutiveReadErrs >= maxConsecutiveReadErrs {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
|
||||
publisherPeerID, localTrack.ID(),
|
||||
))
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
if err = localTrack.WriteRTP(pkt); err != nil {
|
||||
logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error())
|
||||
|
||||
consecutiveReadErrs = 0
|
||||
|
||||
select {
|
||||
case <-writerDone:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-writerDone:
|
||||
return
|
||||
case packetQueue <- pkt:
|
||||
default:
|
||||
droppedPackets++
|
||||
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
|
||||
publisherPeerID, localTrack.ID(), droppedPackets,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -273,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Не начинаем новую negotiation поверх текущей.
|
||||
if pc.SignalingState() != webrtc.SignalingStateStable {
|
||||
setRenegPending(roomID, peerID, true)
|
||||
return nil
|
||||
@@ -296,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Добавляет ICE-кандидата к пиру
|
||||
// Добавляет ICE-кандидата к пиру.
|
||||
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||||
room, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
@@ -319,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
|
||||
rd := pc.RemoteDescription()
|
||||
if rd == nil {
|
||||
// offer/answer еще не применен — буферизуем
|
||||
pendingMu.Lock()
|
||||
k := peerKey(roomID, peerID)
|
||||
pendingCandidates[k] = append(pendingCandidates[k], candidate)
|
||||
@@ -327,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
return nil
|
||||
}
|
||||
|
||||
// Отбрасываем stale candidate по ufrag
|
||||
if candidate.UsernameFragment != nil {
|
||||
current := extractICEUfrag(rd.SDP)
|
||||
if current != "" && *candidate.UsernameFragment != current {
|
||||
@@ -342,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
return err
|
||||
}
|
||||
|
||||
// Обрабатывает SDP answer от клиента при renegotiation
|
||||
// Обрабатывает SDP answer от клиента при renegotiation.
|
||||
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||||
if answer.Type != webrtc.SDPTypeAnswer {
|
||||
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
|
||||
@@ -371,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
|
||||
return err
|
||||
}
|
||||
|
||||
// После применения answer — применяем отложенные кандидаты.
|
||||
k := peerKey(roomID, peerID)
|
||||
pendingMu.Lock()
|
||||
queue := pendingCandidates[k]
|
||||
@@ -382,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
|
||||
_ = AddICECandidate(roomID, peerID, c)
|
||||
}
|
||||
|
||||
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
|
||||
if popRenegPending(roomID, peerID) {
|
||||
_ = renegotiatePeer(roomID, peerID, pc)
|
||||
}
|
||||
|
||||
57
sfu/rooms.go
57
sfu/rooms.go
@@ -4,6 +4,7 @@ import (
|
||||
"g365sfu/logger"
|
||||
connection "g365sfu/socket/struct"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
@@ -34,6 +35,8 @@ type Room struct {
|
||||
Tracks []RoomTrack
|
||||
|
||||
mu sync.RWMutex
|
||||
|
||||
emptyTimer *time.Timer
|
||||
}
|
||||
|
||||
// Общие переменные
|
||||
@@ -42,6 +45,46 @@ var (
|
||||
roomsMu sync.RWMutex
|
||||
)
|
||||
|
||||
const emptyRoomTTL = 30 * time.Second
|
||||
|
||||
func scheduleEmptyRoomDeletion(room *Room) {
|
||||
room.mu.Lock()
|
||||
if len(room.Peers) > 0 {
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
room.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if room.emptyTimer != nil {
|
||||
room.mu.Unlock()
|
||||
return
|
||||
}
|
||||
roomID := room.RoomID
|
||||
var timer *time.Timer
|
||||
timer = time.AfterFunc(emptyRoomTTL, func() {
|
||||
currentRoom, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
currentRoom.mu.Lock()
|
||||
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
|
||||
currentRoom.mu.Unlock()
|
||||
return
|
||||
}
|
||||
currentRoom.emptyTimer = nil
|
||||
currentRoom.mu.Unlock()
|
||||
|
||||
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
|
||||
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
|
||||
}
|
||||
})
|
||||
room.emptyTimer = timer
|
||||
room.mu.Unlock()
|
||||
}
|
||||
|
||||
// CreateRoom создает комнату
|
||||
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
|
||||
roomsMu.Lock()
|
||||
@@ -53,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
|
||||
Peers: []Peer{},
|
||||
}
|
||||
rooms[roomID] = room
|
||||
scheduleEmptyRoomDeletion(room)
|
||||
|
||||
return room, nil
|
||||
}
|
||||
@@ -110,6 +154,10 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
|
||||
// Добавляем peer в комнату и сразу снимаем snapshot существующих треков
|
||||
// в одном локе — чтобы не было race с OnTrack
|
||||
room.mu.Lock()
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
room.Peers = append(room.Peers, Peer{
|
||||
PeerID: peerID,
|
||||
PeerConnection: peerConnection,
|
||||
@@ -164,6 +212,10 @@ func DeleteRoom(roomID string) error {
|
||||
roomsMu.Unlock()
|
||||
|
||||
room.mu.Lock()
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
peers := make([]Peer, len(room.Peers))
|
||||
copy(peers, room.Peers)
|
||||
room.Peers = nil
|
||||
@@ -227,9 +279,10 @@ func LeaveRoom(roomID string, peerID string) error {
|
||||
}
|
||||
cleanupForwardingState(roomID, peerID)
|
||||
|
||||
// Комната пустая -> удаляем
|
||||
// Комната пустая -> планируем удаление через TTL
|
||||
if shouldDrop {
|
||||
return DeleteRoom(roomID)
|
||||
scheduleEmptyRoomDeletion(room)
|
||||
return nil
|
||||
}
|
||||
|
||||
// renegotiation оставшимся peer после удаления треков/peer
|
||||
|
||||
Reference in New Issue
Block a user