Compare commits

...

15 Commits

Author SHA1 Message Date
set
9a33235c7e Автоматическое удаление комнат при неактивности
All checks were successful
Build G365SFU / build (push) Successful in 10m16s
2026-04-02 18:02:41 +02:00
set
feb4a51ab0 Буферы RTP пакетов для сокращения задержек если downstream медленный
All checks were successful
Build G365SFU / build (push) Successful in 12m1s
2026-03-20 23:19:22 +02:00
set
8c386eb42a Фикс Go версии
All checks were successful
Build G365SFU / build (push) Successful in 13m19s
2026-03-20 20:25:35 +02:00
set
027626eb2c Фикс CI/CD
Some checks failed
Build G365SFU / build (push) Failing after 6m47s
2026-03-20 20:13:32 +02:00
set
1cbe48d327 Добавлена система сборки и исправлены ошибки
Some checks failed
Build G365SFU / build (push) Failing after 46s
2026-03-20 19:26:13 +02:00
set
625a7acfb7 Система CI/CD
Some checks failed
Build G365SFU / build (push) Has been cancelled
2026-03-20 19:24:03 +02:00
set
6364253c6f Дополнительная информация по установке и дисклеймер 2026-03-20 18:54:06 +02:00
set
f6e99cf32b Исправлена ошибка несуществуюзего сокета 2026-03-20 18:39:10 +02:00
set
6dadec6b64 Исправление Race при renegotiation 2026-03-17 19:17:02 +02:00
set
96df1e52f9 Поддержка динамического запроса TURN (ICE) серверов для связи. 2026-03-17 15:27:14 +02:00
set
c8141f00bc Добавление по readme 2026-03-17 14:33:51 +02:00
set
3c810407db Базовый Readme, понятные переменные окружения 2026-03-17 14:28:15 +02:00
set
e703ac22e6 Добавление и обработка событий Peer_disconnected и Room_Delete 2026-03-16 19:25:55 +02:00
set
380299295d Исправление форвардинга RTP 2026-03-16 19:25:28 +02:00
set
3fb5f83f38 Структура и номера пакетов 2026-03-16 19:25:09 +02:00
19 changed files with 600 additions and 159 deletions

27
.env Normal file
View File

@@ -0,0 +1,27 @@
###########
# GATEWAY #
###########
#Секретный ключ по которому авторизуются бекенды при подключении к SFU шлюзу
SECRET=SFU_TEST_SECRET
#PORT SFU шлюза (для приема соединений от бекендов)
PORT=1001
###############
# SFU SECTION #
###############
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
SFU_PUBLIC_IP=10.211.55.2
#Диапазон портов для ICE кандидатов
SFU_PORT_RANGE_FROM=30000
SFU_PORT_RANGE_TO=39999
################
# TURN SECTION #
################
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
TURN_ALLOW=true
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
TURN_PUBLIC_IP=10.211.55.2
TURN_USER=user
TURN_PASS=pass
#Диапазон занимемых TURN сервером портов (tcp/udp)
TURN_PORT_RANGE_FROM=40000
TURN_PORT_RANGE_TO=50000

View File

@@ -0,0 +1,44 @@
name: Build G365SFU
run-name: Build and Deploy G365SFU
on:
push:
branches: [ main ]
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.24
- name: Build G365SFU
run: |
make build
- name: Deploy to server via SSH
uses: appleboy/scp-action@v1
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
source: "build/*"
target: ${{ secrets.SFU_DEPLOY_PATH }}
strip_components: 1
rm: false
debug: true
flatten: true
- name: Restart Docker containers
uses: appleboy/ssh-action@v1.2.5
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
script: |
cd ${{ secrets.SFU_DEPLOY_PATH }}
docker-compose down
docker compose up -d --build

5
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.env
.vscode
__debug*
__debug*
build/.env
build/g365sfu

5
Makefile Normal file
View File

@@ -0,0 +1,5 @@
.PHONY: build
build:
mkdir -p build
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/g365sfu .

View File

@@ -1,2 +1,29 @@
### SFU server
Сервер для организации видеоконференций на основе WebRTC. Написан на Go и использует библиотеку Pion WebRTC.
# G365SFU
G365SFU - это медиасервер, который используется для организации звонков. Он работает по протоколам WebSocket (шлюз/gateway) и WebRTC (для медиа-трафика) и состоит из трех основных компонентов: шлюза (gateway), SFU сервера, TURN сервера. SFU занимается пересылкой медиа-потоков между участниками звонка, а TURN сервер обеспечивает возможность связи между участниками, находящимися за NAT или брандмауэром. **ВАЖНО!** G365SFU не занимается сигнализацией, а только пересылает медиа-трафик, поэтому для организации звонков необходимо использовать отдельный сервер сигнализации, который будет обмениваться данными о подключении между участниками и SFU. **ВАЖНО!** Пользователи в звонках не соединяются друг с другом напрямую, они "созваниваются" с сервером SFU, используя его в качестве посредника для передачи медиа-трафика. Сервер SFU при этом не может расшифровать звонок, так как ему приходят уже зашифрованные RTP пакеты, которые он просто пересылает между участниками.
### Disclaimer
В Docker работает только на Linux (Ubuntu 22.04 и выше). На других платформах нужно запускать бинарный файл напрямую ./build/g365sfu, предварительно собрав его с помощью команды `make build`. Связано это с публикацией огромного диапазона портов для SFU и TURN сервера, что не поддерживается в Docker на других платформах. Сейчас Docker использует host-network, который поддерживает только ядра Linux, что позволяет обойти эту проблему.
### Gatway
Gateway - это компонент, который обеспечивает связь между Вашим сервером сигналинга (в Rosetta он написан на Java) и SFU сервером. Он принимает WebSocket соединения от сервера сигнализации и обрабатывает сообщения, связанные с управлением звонками, такими как создание комнаты, присоединение к комнате, отключение от комнаты и т.д. Gateway также отвечает за аутентификацию пользователей и управление их сессиями. В G365SFU используется встроенный шлюз, который можно настроить через переменные окружения. Он будет слушать на порту 1001 (по умолчанию) для входящих WebSocket соединений от сервера сигнализации. Gateway обеспечивает надежную связь между сервером сигнализации и SFU сервером, позволяя эффективно управлять звонками и участниками.
### SFU сервер
SFU - Selective Forwarding Unit - это тип медиасервера, который принимает медиа-потоки от участников видеоконференции и пересылает их другим участникам без декодирования и повторного кодирования. Это позволяет снизить нагрузку на сервер и улучшить качество видео для всех участников. Сейчас G365SFU просто пересылает RTP пакеты между участниками, не обрабатывая их содержимое (потому, что оно зашифровано). В будущем планируется добавить возможность обработки слоев улучшения.
### TURN сервер
TURN - Traversal Using Relays around NAT - это протокол, который позволяет устройствам за NAT (Network Address Translation) или брандмауэром устанавливать связь с другими устройствами в интернете. TURN серверы используются для ретрансляции медиа-трафика между участниками видеоконференции, когда прямое соединение между ними невозможно из-за ограничений сети. В G365SFU используется встроенный TURN сервер, который можно включить с помощью переменной окружения `TURN_ALLOW=true`. Он будет слушать на порту 3478 и использовать диапазон портов от 40000 до 50000 для ретрансляции трафика. Параметры сервера, такие как публичный IP, имя пользователя и пароль, также настраиваются через переменные окружения. TURN сервер обеспечивает надежную связь между участниками звонка, даже если они находятся за NAT.
# Установка (Ubuntu 22.04 и выше)
Для начала, нам необходимо открыть порты 30000-39999 для SFU и 40000-50000 для TURN сервера (по умолчанию, если перенастраивается .env то нужно указать другие). Это можно сделать с помощью команды `ufw`:
```bash
sudo ufw allow 30000:39999/udp
sudo ufw allow 40000:50000/udp
sudo ufw allow 30000:39999/tcp
sudo ufw allow 40000:50000/tcp
sudo ufw allow 3478/tcp
```
Запускаем докер-контейнер с G365SFU, указав необходимые переменные окружения в файле `build/.env` (пример можно взять из корневой папки проекта .env). Для этого нужно выполнить следующую команду в терминале, находясь в папке build:
```bash
docker compose up -d
```

View File

@@ -4,9 +4,12 @@ import (
"encoding/json"
"g365sfu/bytebuffer"
"g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu"
"g365sfu/socket"
connection "g365sfu/socket/struct"
"g365sfu/turn"
"g365sfu/utils"
"net/http"
"os"
@@ -30,20 +33,33 @@ func Bootstrap() {
sfu.OnServerOffer = OnServerOffer
sfu.OnRoomDelete = OnRoomDelete
sfu.OnPeerDisconnected = OnPeerDisconnected
turnServer, err := turn.Start(turn.Config{
ListenAddr: "0.0.0.0:3478",
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
Realm: "g365sfu",
Username: os.Getenv("TURN_USER"),
Password: os.Getenv("TURN_PASS"),
})
if err != nil {
logger.LogWarnMessage("TURN start failed: " + err.Error())
if os.Getenv("TURN_ALLOW") == "true" {
turnServer, err := turn.Start(turn.Config{
ListenAddr: "0.0.0.0:3478",
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
Realm: "g365sfu",
Username: os.Getenv("TURN_USER"),
Password: os.Getenv("TURN_PASS"),
MinRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_FROM"), 40000)),
MaxRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_TO"), 50000)),
})
if err != nil {
logger.LogWarnMessage("error while starting TURN server: " + err.Error())
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
} else {
logger.LogInfoMessage("server TURN started at 0.0.0.0:3478")
// Заполняем глобальные переменные для TURN провайдера, чтобы их могли использовать другие части приложения
// Обратите внимание, заполняем их только в случе успешного старта Turn сервера
turn.TURN_PASS = os.Getenv("TURN_PASS")
turn.TURN_USER = os.Getenv("TURN_USER")
turn.TURN_PUBLIC_IP = os.Getenv("TURN_PUBLIC_IP")
defer turnServer.Close()
}
} else {
logger.LogInfoMessage("TURN started on 0.0.0.0:3478")
defer turnServer.Close()
// TURN сервер выключен в конфиге, что может влиять на соединение некоторых пользователей
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
}
logger.LogInfoMessage("server started at x.x.x.x:" + port)
logger.LogInfoMessage("server SFU started at x.x.x.x:" + port)
http.ListenAndServe(":"+port, nil)
}
@@ -58,7 +74,7 @@ func OnLocalICECandidate(roomID string, peerID string, candidate webrtc.ICECandi
buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonCandidate),
)
buffer.Put(0x04)
buffer.Put(byte(network.ON_LOCAL_ICE_CANDIDATE))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
@@ -80,7 +96,7 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription
buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonOffer),
)
buffer.Put(0x08)
buffer.Put(byte(network.ON_SERVER_OFFER))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
@@ -91,32 +107,23 @@ func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription
room.Server.WriteBinary(buffer.Bytes())
}
func OnRoomDelete(roomID string) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send room delete event to non existing room " + roomID)
return
}
func OnRoomDelete(roomID string, server *connection.Connection) {
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)))
buffer.Put(0x10)
buffer.Put(byte(network.ON_ROOM_DELETE))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
server.WriteBinary(buffer.Bytes())
}
func OnPeerDisconnected(roomID string, peerID string) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send peer disconnected event to non existing room " + roomID)
return
}
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)))
buffer.Put(0x11)
func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection, reason sfu.DisconnectReason) {
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4)
buffer.Put(byte(network.ON_PEER_DISCONNECTED))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID))
buffer.PutUint32(uint32(reason))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
server.WriteBinary(buffer.Bytes())
}

18
build/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM alpine:3.20
RUN apk add --no-cache ca-certificates tzdata
WORKDIR /app
# Бинарь должен быть собран заранее в ./build/g365sfu
COPY g365sfu /app/g365sfu
COPY .env /app/.env
RUN chmod +x /app/g365sfu
EXPOSE 1001
EXPOSE 3478/tcp
EXPOSE 3478/udp
ENTRYPOINT ["/app/g365sfu"]

13
build/docker-compose.yml Normal file
View File

@@ -0,0 +1,13 @@
services:
g365sfu:
container_name: g365sfu
build:
context: .
dockerfile: Dockerfile
image: g365sfu:latest
restart: unless-stopped
env_file:
- .env
#Поддерживается только в Linux (на Windows и MacOS нужно использовать bridge сеть и проброс портов, но нельзя пробрасывать большие
#диапазоны портов, это может вызывать проблемы с работой Docker)
network_mode: host

View File

@@ -20,7 +20,7 @@ func LogInfoMessage(message string) {
fmt.Printf("%s[g365sfu] %s[%s]%s %s[INFO]%s %s\n",
colorBlue,
colorGray, timestamp, colorReset,
colorGreen, colorReset,
colorCyan, colorReset,
message,
)
}
@@ -60,7 +60,7 @@ func LogSuccessMessage(message string) {
fmt.Printf("%s[g365sfu] %s[%s]%s %s[SUCCESS]%s %s\n",
colorBlue,
colorGray, timestamp, colorReset,
colorCyan, colorReset,
colorGreen, colorReset,
message,
)
}

20
network/incoming.go Normal file
View File

@@ -0,0 +1,20 @@
package network
// Входящие пакеты от бекендов для SFU
var (
// Рукопожатие от бекенда при подключении
HANDSHAKE = 0x01
// Запрос на создание комнаты
ROOM_CREATE = 0x02
// SDP OFFER от бекенда для подключения к комнате
SDP_OFFER = 0x03
// ICE кандидат от бекенда от конкретного peer
ICE_CANDIDATE = 0x06
// SDP ANSWER от клиента при renegotiation
SDP_ANSWER_RENEGOTIATION = 0x07
// Check life для проверки соединения с сервером SFU
CHECK_LIFE = 0xAE
// Запрос от бекенда на получение данных для подключения к TURN серверу
TURN_ASK = 0x19
)

25
network/outgoing.go Normal file
View File

@@ -0,0 +1,25 @@
package network
// События для отправки подключенным бекендам
var (
// Успешное рукопожатие
HANDSHAKE_SUCCESS = 0x01
// Неудачное рукопожатие
HANDSHAKE_FAILURE = 0xFF
// Локальный ICE-кандидат для конкретного peer
ON_LOCAL_ICE_CANDIDATE = 0x04
// Новый оффер от сервера для конкретного peer (при renegotiation)
ON_SERVER_OFFER = 0x08
// Удаление комнаты
ON_ROOM_DELETE = 0x10
// При отключении пира (обрыве связи)
ON_PEER_DISCONNECTED = 0x11
// Успешное создание комнаты
ROOM_CREATE_SUCCESS = 0x02
// SDP Answer от локального сервера при подключении к комнате
SDP_ANSWER = 0x05
// Сервер SFU отправит этот пакет бекенду при получении Check life для подтверждения, что соединение живо
CHECK_LIFE_SUCCESS = 0xAE
// Ответ от сервера при вопросе о TURN
TURN_SERVER = 0x19
)

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4"
)
@@ -20,7 +21,13 @@ var (
renegPending sync.Map // key -> bool
)
const disconnectGracePeriod = 12 * time.Second
const (
disconnectGracePeriod = 30 * time.Second
outboundPacketBuffer = 256
maxConsecutiveReadErrs = 25
maxConsecutiveWriteErrs = 50
packetDropLogEvery = 100
)
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
@@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) {
if !ok {
return
}
room.mu.Lock()
defer room.mu.Unlock()
@@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) {
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed &&
state != webrtc.PeerConnectionStateDisconnected
state != webrtc.PeerConnectionStateFailed
}
// Вешается на каждый PeerConnection при JoinWithOffer.
@@ -90,14 +97,45 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
mu sync.Mutex
disconnecting bool
timer *time.Timer
leaveOnce sync.Once
)
room, exists := GetRoom(roomID)
if !exists {
logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID)
return
}
server := room.Server
cancelTimer := func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
timer = nil
}
disconnecting = false
}
leaveAndNotify := func(reason DisconnectReason) {
leaveOnce.Do(func() {
cancelTimer()
err := LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil && err == nil {
OnPeerDisconnected(roomID, peerID, server, reason)
}
})
}
startTimer := func() {
mu.Lock()
defer mu.Unlock()
if disconnecting {
return
}
disconnecting = true
timer = time.AfterFunc(disconnectGracePeriod, func() {
state := pc.ConnectionState()
@@ -107,35 +145,20 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
mu.Unlock()
return
}
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
leaveAndNotify(DisconnectReasonFailed)
})
}
cancelTimer := func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
timer = nil
}
disconnecting = false
}
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
switch state {
case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
cancelTimer()
case webrtc.ICEConnectionStateDisconnected:
startTimer()
case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed:
cancelTimer()
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
case webrtc.ICEConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.ICEConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
@@ -145,17 +168,15 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
cancelTimer()
case webrtc.PeerConnectionStateDisconnected:
startTimer()
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
cancelTimer()
_ = LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil {
OnPeerDisconnected(roomID, peerID)
}
case webrtc.PeerConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.PeerConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
@@ -171,7 +192,7 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
remote.StreamID(),
)
if err != nil {
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error")
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
return
}
defer removeRoomTrack(roomID, localTrack.ID())
@@ -196,54 +217,124 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
continue
}
// Не трогаем закрытые/failed соединения
if !isPeerConnectionAlive(sub.PeerConnection) {
fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID,
sub.PeerConnection.ConnectionState().String())
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err)
logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
continue
}
// RTCP drain
senderCopy := sender
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
if _, _, e := senderCopy.Read(buf); e != nil {
return
}
}
}()
if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil {
fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err)
}
subID := sub.PeerID
subPC := sub.PeerConnection
go func() {
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
}
}()
}
// Для video просим keyframe
if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
})
}
// Publisher RTP -> localTrack -> subscribers
// Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
// Если downstream начинает тормозить, пакеты дропаются из очереди,
// а не накапливают бесконечную задержку.
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
writerDone := make(chan struct{})
defer close(packetQueue)
go func() {
defer close(writerDone)
consecutiveWriteErrs := 0
for pkt := range packetQueue {
if err := localTrack.WriteRTP(pkt); err != nil {
consecutiveWriteErrs++
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
))
}
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
continue
}
consecutiveWriteErrs = 0
}
}()
consecutiveReadErrs := 0
droppedPackets := 0
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
fmt.Println("SetupForwardingForPeer: ReadRTP error:", err)
return
consecutiveReadErrs++
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
))
}
if consecutiveReadErrs >= maxConsecutiveReadErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
time.Sleep(10 * time.Millisecond)
continue
}
if err = localTrack.WriteRTP(pkt); err != nil {
fmt.Println("SetupForwardingForPeer: WriteRTP error:", err)
consecutiveReadErrs = 0
select {
case <-writerDone:
return
default:
}
select {
case <-writerDone:
return
case packetQueue <- pkt:
default:
droppedPackets++
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
publisherPeerID, localTrack.ID(), droppedPackets,
))
}
}
}
})
@@ -258,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil
}
// Не начинаем новую negotiation поверх текущей.
if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true)
return nil
@@ -281,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil
}
// Добавляет ICE-кандидата к пиру
// Добавляет ICE-кандидата к пиру.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID)
if !exists {
@@ -304,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
rd := pc.RemoteDescription()
if rd == nil {
// offer/answer еще не применен — буферизуем
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
@@ -312,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return nil
}
// Отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
@@ -327,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return err
}
// Обрабатывает SDP answer от клиента при renegotiation
// Обрабатывает SDP answer от клиента при renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
@@ -356,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
return err
}
// После применения answer — применяем отложенные кандидаты.
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
@@ -367,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
_ = AddICECandidate(roomID, peerID, c)
}
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc)
}

11
sfu/reason.go Normal file
View File

@@ -0,0 +1,11 @@
package sfu
// Причины отключения пира от комнаты, которые могут быть использованы для логирования или уведомлений
type DisconnectReason int
const (
// Пир отключился из-за ошибки соединения или другой проблемы
DisconnectReasonFailed DisconnectReason = 0
// Пир отключился по своей инициативе (например, закрыл приложение)
DisconnectReasonClosed = 1
)

View File

@@ -1,8 +1,10 @@
package sfu
import (
"g365sfu/logger"
connection "g365sfu/socket/struct"
"sync"
"time"
"github.com/pion/webrtc/v4"
)
@@ -33,6 +35,8 @@ type Room struct {
Tracks []RoomTrack
mu sync.RWMutex
emptyTimer *time.Timer
}
// Общие переменные
@@ -41,6 +45,46 @@ var (
roomsMu sync.RWMutex
)
const emptyRoomTTL = 30 * time.Second
func scheduleEmptyRoomDeletion(room *Room) {
room.mu.Lock()
if len(room.Peers) > 0 {
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.mu.Unlock()
return
}
if room.emptyTimer != nil {
room.mu.Unlock()
return
}
roomID := room.RoomID
var timer *time.Timer
timer = time.AfterFunc(emptyRoomTTL, func() {
currentRoom, exists := GetRoom(roomID)
if !exists {
return
}
currentRoom.mu.Lock()
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
currentRoom.mu.Unlock()
return
}
currentRoom.emptyTimer = nil
currentRoom.mu.Unlock()
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
}
})
room.emptyTimer = timer
room.mu.Unlock()
}
// CreateRoom создает комнату
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
roomsMu.Lock()
@@ -52,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
Peers: []Peer{},
}
rooms[roomID] = room
scheduleEmptyRoomDeletion(room)
return room, nil
}
@@ -76,43 +121,9 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
return nil, err
}
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
if OnLocalICECandidate != nil {
OnLocalICECandidate(roomID, peerID, c.ToJSON())
}
})
BindPeerLifecycle(roomID, peerID, peerConnection)
SetupForwardingForPeer(roomID, peerID, peerConnection)
room.mu.RLock()
existingTracks := make([]RoomTrack, len(room.Tracks))
copy(existingTracks, room.Tracks)
room.mu.RUnlock()
for _, t := range existingTracks {
if t.OwnerPeer == peerID {
continue
}
sender, err := peerConnection.AddTrack(t.Local)
if err != nil {
continue
}
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
return
}
}
}()
}
if err = peerConnection.SetRemoteDescription(offer); err != nil {
_ = peerConnection.Close()
return nil, err
@@ -131,19 +142,68 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
}
<-gatherDone
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
if OnLocalICECandidate != nil {
OnLocalICECandidate(roomID, peerID, c.ToJSON())
}
})
// Добавляем peer в комнату и сразу снимаем snapshot существующих треков
// в одном локе — чтобы не было race с OnTrack
room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.Peers = append(room.Peers, Peer{
PeerID: peerID,
PeerConnection: peerConnection,
})
existingTracks := make([]RoomTrack, len(room.Tracks))
copy(existingTracks, room.Tracks)
room.mu.Unlock()
// Подписываем нового peer на уже существующие треки ПОСЛЕ добавления в комнату
for _, t := range existingTracks {
if t.OwnerPeer == peerID {
continue
}
sender, err := peerConnection.AddTrack(t.Local)
if err != nil {
continue
}
senderCopy := sender
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := senderCopy.Read(buf); e != nil {
return
}
}
}()
}
// Если были добавлены треки — нужна renegotiation
if len(existingTracks) > 0 {
go func() {
if err := renegotiatePeer(roomID, peerID, peerConnection); err != nil {
logger.LogWarnMessage("JoinWithOffer: renegotiatePeer error: " + err.Error())
}
}()
}
return peerConnection.LocalDescription(), nil
}
func DeleteRoom(roomID string) error {
roomsMu.Lock()
room, exists := rooms[roomID]
server := room.Server
if !exists {
roomsMu.Unlock()
return ErrRoomNotFound
@@ -152,6 +212,10 @@ func DeleteRoom(roomID string) error {
roomsMu.Unlock()
room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
room.Peers = nil
@@ -163,6 +227,8 @@ func DeleteRoom(roomID string) error {
cleanupForwardingState(roomID, p.PeerID)
}
OnRoomDelete(roomID, server)
return nil
}
@@ -213,12 +279,13 @@ func LeaveRoom(roomID string, peerID string) error {
}
cleanupForwardingState(roomID, peerID)
// Комната пустая -> удаляем
// Комната пустая -> планируем удаление через TTL
if shouldDrop {
return DeleteRoom(roomID)
scheduleEmptyRoomDeletion(room)
return nil
}
// Опционально: renegotiation оставшимся peer после удаления треков/peer
// renegotiation оставшимся peer после удаления треков/peer
room.mu.RLock()
rest := make([]Peer, len(room.Peers))
copy(rest, room.Peers)

View File

@@ -3,6 +3,8 @@ package sfu
import (
"errors"
"g365sfu/logger"
connection "g365sfu/socket/struct"
"g365sfu/utils"
"os"
"github.com/pion/interceptor"
@@ -21,10 +23,10 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
// Коллбек для обработки отключения пира (обрыв связи)
var OnPeerDisconnected func(roomID, peerID string)
var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection, reason DisconnectReason)
// Коллбек для обработки удаления комнаты
var OnRoomDelete func(roomID string)
var OnRoomDelete func(roomID string, server *connection.Connection)
// Ошибки
var (
@@ -33,12 +35,12 @@ var (
)
func InitWebRTCEngines() {
publicIP := os.Getenv("PUBLIC_IP")
fromPort := os.Getenv("PORT_RANGE_FROM")
toPort := os.Getenv("PORT_RANGE_TO")
if publicIP == "" || fromPort == "" || toPort == "" {
publicIP := os.Getenv("SFU_PUBLIC_IP")
fromPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_FROM"), 30000)
toPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_TO"), 39999)
if publicIP == "" || fromPort == 0 || toPort == 0 {
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set")
logger.LogErrorMessage("SFU_PUBLIC_IP, SFU_PORT_RANGE_FROM and SFU_PORT_RANGE_TO environment variables must be set")
os.Exit(-1)
return
}
@@ -49,9 +51,9 @@ func InitWebRTCEngines() {
_ = webrtc.RegisterDefaultInterceptors(m, i)
se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(40000, 50000)
_ = se.SetEphemeralUDPPortRange(uint16(fromPort), uint16(toPort))
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" {
if publicIP := os.Getenv("SFU_PUBLIC_IP"); publicIP != "" {
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
External: []string{publicIP},
AsCandidateType: webrtc.ICECandidateTypeHost,

View File

@@ -4,8 +4,10 @@ import (
"encoding/json"
"g365sfu/bytebuffer"
"g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu"
connection "g365sfu/socket/struct"
"g365sfu/turn"
"g365sfu/utils"
"net/http"
"os"
@@ -30,7 +32,15 @@ func getSecret() string {
// Обработчик WebSocket соединений
func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil)
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.LogWarnMessage("failed to upgrade to websocket: " + err.Error())
return
}
if conn == nil {
logger.LogWarnMessage("failed to upgrade to websocket: connection is nil")
return
}
defer conn.Close()
// Канал для передачи байтов
@@ -68,7 +78,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
// Логика обработки байтов
buffer := bytebuffer.Wrap(bytes)
packetId, _ := buffer.Get()
if packetId == 0x01 {
if packetId == byte(network.HANDSHAKE) {
// Это рукопожатие, дальше сравниваем секретные ключи
secretLength, _ := buffer.GetUint32()
@@ -78,13 +88,17 @@ func processData(data <-chan []byte, connection *connection.Connection) {
AddSocket(connection)
// Подготовка ответа для клиента о успешном рукопожатии
response := bytebuffer.Allocate(1)
response.Put(0x01)
response.Put(byte(network.HANDSHAKE_SUCCESS))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
connection.WriteBinary([]byte{0xFF})
response := bytebuffer.Allocate(1)
response.Put(byte(network.HANDSHAKE_FAILURE))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
connection.Close()
return
@@ -98,7 +112,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
// Создание комнаты
if packetId == 0x02 {
if packetId == byte(network.ROOM_CREATE) {
roomLength, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomLength))
roomID := string(roomIDBytes)
@@ -106,7 +120,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
logger.LogSuccessMessage("room initializated " + room.RoomID)
// Подготовка ответа для клиента с ID комнаты
response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
response.Put(0x02)
response.Put(byte(network.ROOM_CREATE_SUCCESS))
response.PutUint32(uint32(len([]byte(room.RoomID))))
response.PutBytes([]byte(room.RoomID))
response.Flip()
@@ -116,7 +130,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
//SDP OFFER для подключения к комнате
if packetId == 0x03 {
if packetId == byte(network.SDP_OFFER) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
@@ -145,7 +159,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
answerBytes, _ := json.Marshal(answer)
// Подготовка ответа для клиента с SDP answer
response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes))
response.Put(0x05)
response.Put(byte(network.SDP_ANSWER))
response.PutUint32(uint32(len([]byte(roomID))))
response.PutBytes([]byte(roomID))
response.PutUint32(uint32(len([]byte(peerID))))
@@ -159,7 +173,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
//ICE-candidate пришел от участника комнаты
if packetId == 0x06 {
if packetId == byte(network.ICE_CANDIDATE) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
@@ -182,7 +196,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
// SDP ANSWER от клиента при renegotiation
if packetId == 0x07 {
if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
@@ -204,14 +218,50 @@ func processData(data <-chan []byte, connection *connection.Connection) {
}
}
//Check life для проверки соединения с сервером SFU
if packetId == 0xAE {
if packetId == byte(network.CHECK_LIFE) {
// Подготовка ответа для клиента о том, что соединение живо
response := bytebuffer.Allocate(1)
response.Put(0xAE)
response.Put(byte(network.CHECK_LIFE_SUCCESS))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
// Запрос от бекенда на получение данных для подключения к TURN серверу
if packetId == byte(network.TURN_ASK) && turn.TURN_PUBLIC_IP != "" {
// Отвечаем только в том случае, если TURN сервер был успешно запущен и данные для подключения к нему были заполнены
// Отправляем два пакета один на tcp сервер другой на udp сервер, так как некоторые клиенты могут поддерживать только один из этих протоколов
//tcp
response := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("tcp")))
response.Put(byte(network.TURN_SERVER))
response.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
response.PutBytes([]byte(turn.TURN_PUBLIC_IP))
response.PutUint32(uint32(len([]byte(turn.TURN_USER))))
response.PutBytes([]byte(turn.TURN_USER))
response.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
response.PutBytes([]byte(turn.TURN_PASS))
response.PutUint32(uint32(len([]byte("tcp"))))
response.PutBytes([]byte("tcp"))
response.Flip()
connection.WriteBinary(response.Bytes())
//udp
responseUDP := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("udp")))
responseUDP.Put(byte(network.TURN_SERVER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
responseUDP.PutBytes([]byte(turn.TURN_PUBLIC_IP))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_USER))))
responseUDP.PutBytes([]byte(turn.TURN_USER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
responseUDP.PutBytes([]byte(turn.TURN_PASS))
responseUDP.PutUint32(uint32(len([]byte("udp"))))
responseUDP.PutBytes([]byte("udp"))
responseUDP.Flip()
connection.WriteBinary(responseUDP.Bytes())
continue
}
}
}

8
turn/provider.go Normal file
View File

@@ -0,0 +1,8 @@
package turn
// Заполнится если сервер запустится успешно
var (
TURN_PUBLIC_IP = ""
TURN_USER = ""
TURN_PASS = ""
)

View File

@@ -19,6 +19,25 @@ type Config struct {
Realm string
Username string
Password string
MinRelayPort uint16
MaxRelayPort uint16
}
func relayGen(ip net.IP, minPort, maxPort uint16) pionturn.RelayAddressGenerator {
if minPort != 0 && maxPort != 0 && minPort <= maxPort {
return &pionturn.RelayAddressGeneratorPortRange{
RelayAddress: ip,
Address: "0.0.0.0",
MinPort: minPort,
MaxPort: maxPort,
}
}
return &pionturn.RelayAddressGeneratorStatic{
RelayAddress: ip,
Address: "0.0.0.0",
}
}
func Start(cfg Config) (*Server, error) {
@@ -38,6 +57,8 @@ func Start(cfg Config) (*Server, error) {
return nil, err
}
rg := relayGen(ip, cfg.MinRelayPort, cfg.MaxRelayPort)
srv, err := pionturn.NewServer(pionturn.ServerConfig{
Realm: cfg.Realm,
AuthHandler: func(username, realm string, srcAddr net.Addr) ([]byte, bool) {
@@ -48,23 +69,18 @@ func Start(cfg Config) (*Server, error) {
},
PacketConnConfigs: []pionturn.PacketConnConfig{
{
PacketConn: udpConn,
RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{
RelayAddress: ip,
Address: "0.0.0.0",
},
PacketConn: udpConn,
RelayAddressGenerator: rg,
},
},
ListenerConfigs: []pionturn.ListenerConfig{
{
Listener: tcpListener,
RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{
RelayAddress: ip,
Address: "0.0.0.0",
},
Listener: tcpListener,
RelayAddressGenerator: rg,
},
},
})
if err != nil {
_ = tcpListener.Close()
_ = udpConn.Close()

View File

@@ -1,6 +1,9 @@
package utils
import "math/rand"
import (
"math/rand"
"strconv"
)
// Генерация случайной строки заданной длины
func RandomString(n int) string {
@@ -11,3 +14,14 @@ func RandomString(n int) string {
}
return string(b)
}
func AtoiOrDefault(s string, defaultValue int) int {
if s == "" {
return defaultValue
}
n, err := strconv.Atoi(s)
if err != nil {
return defaultValue
}
return n
}