Compare commits

...

23 Commits

Author SHA1 Message Date
set
9a33235c7e Автоматическое удаление комнат при неактивности
All checks were successful
Build G365SFU / build (push) Successful in 10m16s
2026-04-02 18:02:41 +02:00
set
feb4a51ab0 Буферы RTP пакетов для сокращения задержек если downstream медленный
All checks were successful
Build G365SFU / build (push) Successful in 12m1s
2026-03-20 23:19:22 +02:00
set
8c386eb42a Фикс Go версии
All checks were successful
Build G365SFU / build (push) Successful in 13m19s
2026-03-20 20:25:35 +02:00
set
027626eb2c Фикс CI/CD
Some checks failed
Build G365SFU / build (push) Failing after 6m47s
2026-03-20 20:13:32 +02:00
set
1cbe48d327 Добавлена система сборки и исправлены ошибки
Some checks failed
Build G365SFU / build (push) Failing after 46s
2026-03-20 19:26:13 +02:00
set
625a7acfb7 Система CI/CD
Some checks failed
Build G365SFU / build (push) Has been cancelled
2026-03-20 19:24:03 +02:00
set
6364253c6f Дополнительная информация по установке и дисклеймер 2026-03-20 18:54:06 +02:00
set
f6e99cf32b Исправлена ошибка несуществуюзего сокета 2026-03-20 18:39:10 +02:00
set
6dadec6b64 Исправление Race при renegotiation 2026-03-17 19:17:02 +02:00
set
96df1e52f9 Поддержка динамического запроса TURN (ICE) серверов для связи. 2026-03-17 15:27:14 +02:00
set
c8141f00bc Добавление по readme 2026-03-17 14:33:51 +02:00
set
3c810407db Базовый Readme, понятные переменные окружения 2026-03-17 14:28:15 +02:00
set
e703ac22e6 Добавление и обработка событий Peer_disconnected и Room_Delete 2026-03-16 19:25:55 +02:00
set
380299295d Исправление форвардинга RTP 2026-03-16 19:25:28 +02:00
set
3fb5f83f38 Структура и номера пакетов 2026-03-16 19:25:09 +02:00
set
e67bd3d824 Новые события с пирами и комнатами, базовый форвардинг 2026-03-16 17:08:56 +02:00
set
a9a1dc5895 Life-Check для проверки соединения с SFU 2026-03-15 17:55:57 +02:00
set
b6d5780584 TURN сервер для выхода за сложные NAT 2026-03-14 23:08:18 +02:00
set
ef591072f3 Исправлена гонка записи в сокет 2026-03-14 21:16:47 +02:00
set
56a85a436b Обработка Offers от сервера 2026-03-14 20:55:15 +02:00
set
9112764d6a SDP Answer согласно протоколу g365sfu 2026-03-14 19:42:16 +02:00
set
4c2d382b34 Фикс перевода в JSON 2026-03-14 19:29:36 +02:00
set
c7919310b8 ICE Candidate перевод в JSON 2026-03-14 19:26:50 +02:00
20 changed files with 1193 additions and 108 deletions

27
.env Normal file
View File

@@ -0,0 +1,27 @@
###########
# GATEWAY #
###########
#Секретный ключ по которому авторизуются бекенды при подключении к SFU шлюзу
SECRET=SFU_TEST_SECRET
#PORT SFU шлюза (для приема соединений от бекендов)
PORT=1001
###############
# SFU SECTION #
###############
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
SFU_PUBLIC_IP=10.211.55.2
#Диапазон портов для ICE кандидатов
SFU_PORT_RANGE_FROM=30000
SFU_PORT_RANGE_TO=39999
################
# TURN SECTION #
################
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
TURN_ALLOW=true
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
TURN_PUBLIC_IP=10.211.55.2
TURN_USER=user
TURN_PASS=pass
#Диапазон занимемых TURN сервером портов (tcp/udp)
TURN_PORT_RANGE_FROM=40000
TURN_PORT_RANGE_TO=50000

View File

@@ -0,0 +1,44 @@
name: Build G365SFU
run-name: Build and Deploy G365SFU
on:
push:
branches: [ main ]
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.24
- name: Build G365SFU
run: |
make build
- name: Deploy to server via SSH
uses: appleboy/scp-action@v1
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
source: "build/*"
target: ${{ secrets.SFU_DEPLOY_PATH }}
strip_components: 1
rm: false
debug: true
flatten: true
- name: Restart Docker containers
uses: appleboy/ssh-action@v1.2.5
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
script: |
cd ${{ secrets.SFU_DEPLOY_PATH }}
docker-compose down
docker compose up -d --build

6
.gitignore vendored
View File

@@ -1,2 +1,4 @@
.env .vscode
.vscode __debug*
build/.env
build/g365sfu

5
Makefile Normal file
View File

@@ -0,0 +1,5 @@
.PHONY: build
build:
mkdir -p build
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/g365sfu .

View File

@@ -1,2 +1,29 @@
### SFU server # G365SFU
Сервер для организации видеоконференций на основе WebRTC. Написан на Go и использует библиотеку Pion WebRTC. G365SFU - это медиасервер, который используется для организации звонков. Он работает по протоколам WebSocket (шлюз/gateway) и WebRTC (для медиа-трафика) и состоит из трех основных компонентов: шлюза (gateway), SFU сервера, TURN сервера. SFU занимается пересылкой медиа-потоков между участниками звонка, а TURN сервер обеспечивает возможность связи между участниками, находящимися за NAT или брандмауэром. **ВАЖНО!** G365SFU не занимается сигнализацией, а только пересылает медиа-трафик, поэтому для организации звонков необходимо использовать отдельный сервер сигнализации, который будет обмениваться данными о подключении между участниками и SFU. **ВАЖНО!** Пользователи в звонках не соединяются друг с другом напрямую, они "созваниваются" с сервером SFU, используя его в качестве посредника для передачи медиа-трафика. Сервер SFU при этом не может расшифровать звонок, так как ему приходят уже зашифрованные RTP пакеты, которые он просто пересылает между участниками.
### Disclaimer
В Docker работает только на Linux (Ubuntu 22.04 и выше). На других платформах нужно запускать бинарный файл напрямую ./build/g365sfu, предварительно собрав его с помощью команды `make build`. Связано это с публикацией огромного диапазона портов для SFU и TURN сервера, что не поддерживается в Docker на других платформах. Сейчас Docker использует host-network, который поддерживает только ядра Linux, что позволяет обойти эту проблему.
### Gatway
Gateway - это компонент, который обеспечивает связь между Вашим сервером сигналинга (в Rosetta он написан на Java) и SFU сервером. Он принимает WebSocket соединения от сервера сигнализации и обрабатывает сообщения, связанные с управлением звонками, такими как создание комнаты, присоединение к комнате, отключение от комнаты и т.д. Gateway также отвечает за аутентификацию пользователей и управление их сессиями. В G365SFU используется встроенный шлюз, который можно настроить через переменные окружения. Он будет слушать на порту 1001 (по умолчанию) для входящих WebSocket соединений от сервера сигнализации. Gateway обеспечивает надежную связь между сервером сигнализации и SFU сервером, позволяя эффективно управлять звонками и участниками.
### SFU сервер
SFU - Selective Forwarding Unit - это тип медиасервера, который принимает медиа-потоки от участников видеоконференции и пересылает их другим участникам без декодирования и повторного кодирования. Это позволяет снизить нагрузку на сервер и улучшить качество видео для всех участников. Сейчас G365SFU просто пересылает RTP пакеты между участниками, не обрабатывая их содержимое (потому, что оно зашифровано). В будущем планируется добавить возможность обработки слоев улучшения.
### TURN сервер
TURN - Traversal Using Relays around NAT - это протокол, который позволяет устройствам за NAT (Network Address Translation) или брандмауэром устанавливать связь с другими устройствами в интернете. TURN серверы используются для ретрансляции медиа-трафика между участниками видеоконференции, когда прямое соединение между ними невозможно из-за ограничений сети. В G365SFU используется встроенный TURN сервер, который можно включить с помощью переменной окружения `TURN_ALLOW=true`. Он будет слушать на порту 3478 и использовать диапазон портов от 40000 до 50000 для ретрансляции трафика. Параметры сервера, такие как публичный IP, имя пользователя и пароль, также настраиваются через переменные окружения. TURN сервер обеспечивает надежную связь между участниками звонка, даже если они находятся за NAT.
# Установка (Ubuntu 22.04 и выше)
Для начала, нам необходимо открыть порты 30000-39999 для SFU и 40000-50000 для TURN сервера (по умолчанию, если перенастраивается .env то нужно указать другие). Это можно сделать с помощью команды `ufw`:
```bash
sudo ufw allow 30000:39999/udp
sudo ufw allow 40000:50000/udp
sudo ufw allow 30000:39999/tcp
sudo ufw allow 40000:50000/tcp
sudo ufw allow 3478/tcp
```
Запускаем докер-контейнер с G365SFU, указав необходимые переменные окружения в файле `build/.env` (пример можно взять из корневой папки проекта .env). Для этого нужно выполнить следующую команду в терминале, находясь в папке build:
```bash
docker compose up -d
```

View File

@@ -1,9 +1,15 @@
package boot package boot
import ( import (
"encoding/json"
"g365sfu/bytebuffer"
"g365sfu/logger" "g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu" "g365sfu/sfu"
"g365sfu/socket" "g365sfu/socket"
connection "g365sfu/socket/struct"
"g365sfu/turn"
"g365sfu/utils"
"net/http" "net/http"
"os" "os"
@@ -24,14 +30,100 @@ func Bootstrap() {
port = "1001" port = "1001"
} }
sfu.OnLocalICECandidate = OnLocalICECandidate sfu.OnLocalICECandidate = OnLocalICECandidate
logger.LogInfoMessage("server started at x.x.x.x:" + port) sfu.OnServerOffer = OnServerOffer
sfu.OnRoomDelete = OnRoomDelete
sfu.OnPeerDisconnected = OnPeerDisconnected
if os.Getenv("TURN_ALLOW") == "true" {
turnServer, err := turn.Start(turn.Config{
ListenAddr: "0.0.0.0:3478",
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
Realm: "g365sfu",
Username: os.Getenv("TURN_USER"),
Password: os.Getenv("TURN_PASS"),
MinRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_FROM"), 40000)),
MaxRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_TO"), 50000)),
})
if err != nil {
logger.LogWarnMessage("error while starting TURN server: " + err.Error())
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
} else {
logger.LogInfoMessage("server TURN started at 0.0.0.0:3478")
// Заполняем глобальные переменные для TURN провайдера, чтобы их могли использовать другие части приложения
// Обратите внимание, заполняем их только в случе успешного старта Turn сервера
turn.TURN_PASS = os.Getenv("TURN_PASS")
turn.TURN_USER = os.Getenv("TURN_USER")
turn.TURN_PUBLIC_IP = os.Getenv("TURN_PUBLIC_IP")
defer turnServer.Close()
}
} else {
// TURN сервер выключен в конфиге, что может влиять на соединение некоторых пользователей
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
}
logger.LogInfoMessage("server SFU started at x.x.x.x:" + port)
http.ListenAndServe(":"+port, nil) http.ListenAndServe(":"+port, nil)
} }
func OnLocalICECandidate(roomID, peerID string, candidate webrtc.ICECandidateInit) { // Коллбек для обработки новых ICE кандидатов от сервера к пиру
logger.LogInfoMessage("new local ICE candidate for peer " + peerID + " in room " + roomID) func OnLocalICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) {
room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send local ICE candidate to non existing room " + roomID)
return
}
jsonCandidate, _ := json.Marshal(candidate)
buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonCandidate),
)
buffer.Put(byte(network.ON_LOCAL_ICE_CANDIDATE))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID))
buffer.PutUint32(uint32(len([]byte(jsonCandidate))))
buffer.PutBytes([]byte(jsonCandidate))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
} }
// Обработка нового оффера от сервера для конкретного пира (при renegotiation)
func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription) { func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription) {
logger.LogInfoMessage("new server offer for peer " + peerID + " in room " + roomID) room, exists := sfu.GetRoom(roomID)
if !exists {
logger.LogWarnMessage("tried to send server offer to non existing room " + roomID)
return
}
jsonOffer, _ := json.Marshal(offer)
buffer := bytebuffer.Allocate(
1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(jsonOffer),
)
buffer.Put(byte(network.ON_SERVER_OFFER))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID))
buffer.PutUint32(uint32(len([]byte(jsonOffer))))
buffer.PutBytes([]byte(jsonOffer))
buffer.Flip()
room.Server.WriteBinary(buffer.Bytes())
}
func OnRoomDelete(roomID string, server *connection.Connection) {
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)))
buffer.Put(byte(network.ON_ROOM_DELETE))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.Flip()
server.WriteBinary(buffer.Bytes())
}
func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection, reason sfu.DisconnectReason) {
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4)
buffer.Put(byte(network.ON_PEER_DISCONNECTED))
buffer.PutUint32(uint32(len([]byte(roomID))))
buffer.PutBytes([]byte(roomID))
buffer.PutUint32(uint32(len([]byte(peerID))))
buffer.PutBytes([]byte(peerID))
buffer.PutUint32(uint32(reason))
buffer.Flip()
server.WriteBinary(buffer.Bytes())
} }

18
build/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM alpine:3.20
RUN apk add --no-cache ca-certificates tzdata
WORKDIR /app
# Бинарь должен быть собран заранее в ./build/g365sfu
COPY g365sfu /app/g365sfu
COPY .env /app/.env
RUN chmod +x /app/g365sfu
EXPOSE 1001
EXPOSE 3478/tcp
EXPOSE 3478/udp
ENTRYPOINT ["/app/g365sfu"]

13
build/docker-compose.yml Normal file
View File

@@ -0,0 +1,13 @@
services:
g365sfu:
container_name: g365sfu
build:
context: .
dockerfile: Dockerfile
image: g365sfu:latest
restart: unless-stopped
env_file:
- .env
#Поддерживается только в Linux (на Windows и MacOS нужно использовать bridge сеть и проброс портов, но нельзя пробрасывать большие
#диапазоны портов, это может вызывать проблемы с работой Docker)
network_mode: host

View File

@@ -20,7 +20,7 @@ func LogInfoMessage(message string) {
fmt.Printf("%s[g365sfu] %s[%s]%s %s[INFO]%s %s\n", fmt.Printf("%s[g365sfu] %s[%s]%s %s[INFO]%s %s\n",
colorBlue, colorBlue,
colorGray, timestamp, colorReset, colorGray, timestamp, colorReset,
colorGreen, colorReset, colorCyan, colorReset,
message, message,
) )
} }
@@ -60,7 +60,7 @@ func LogSuccessMessage(message string) {
fmt.Printf("%s[g365sfu] %s[%s]%s %s[SUCCESS]%s %s\n", fmt.Printf("%s[g365sfu] %s[%s]%s %s[SUCCESS]%s %s\n",
colorBlue, colorBlue,
colorGray, timestamp, colorReset, colorGray, timestamp, colorReset,
colorCyan, colorReset, colorGreen, colorReset,
message, message,
) )
} }

20
network/incoming.go Normal file
View File

@@ -0,0 +1,20 @@
package network
// Входящие пакеты от бекендов для SFU
var (
// Рукопожатие от бекенда при подключении
HANDSHAKE = 0x01
// Запрос на создание комнаты
ROOM_CREATE = 0x02
// SDP OFFER от бекенда для подключения к комнате
SDP_OFFER = 0x03
// ICE кандидат от бекенда от конкретного peer
ICE_CANDIDATE = 0x06
// SDP ANSWER от клиента при renegotiation
SDP_ANSWER_RENEGOTIATION = 0x07
// Check life для проверки соединения с сервером SFU
CHECK_LIFE = 0xAE
// Запрос от бекенда на получение данных для подключения к TURN серверу
TURN_ASK = 0x19
)

25
network/outgoing.go Normal file
View File

@@ -0,0 +1,25 @@
package network
// События для отправки подключенным бекендам
var (
// Успешное рукопожатие
HANDSHAKE_SUCCESS = 0x01
// Неудачное рукопожатие
HANDSHAKE_FAILURE = 0xFF
// Локальный ICE-кандидат для конкретного peer
ON_LOCAL_ICE_CANDIDATE = 0x04
// Новый оффер от сервера для конкретного peer (при renegotiation)
ON_SERVER_OFFER = 0x08
// Удаление комнаты
ON_ROOM_DELETE = 0x10
// При отключении пира (обрыве связи)
ON_PEER_DISCONNECTED = 0x11
// Успешное создание комнаты
ROOM_CREATE_SUCCESS = 0x02
// SDP Answer от локального сервера при подключении к комнате
SDP_ANSWER = 0x05
// Сервер SFU отправит этот пакет бекенду при получении Check life для подтверждения, что соединение живо
CHECK_LIFE_SUCCESS = 0xAE
// Ответ от сервера при вопросе о TURN
TURN_SERVER = 0x19
)

View File

@@ -2,75 +2,358 @@ package sfu
import ( import (
"fmt" "fmt"
"g365sfu/logger"
"io" "io"
"strings"
"sync"
"time"
"github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты var (
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { pendingMu sync.Mutex
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability, renegMu sync.Map // key -> *sync.Mutex
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()), renegPending sync.Map // key -> bool
remote.StreamID(), )
)
if err != nil { const (
disconnectGracePeriod = 30 * time.Second
outboundPacketBuffer = 256
maxConsecutiveReadErrs = 25
maxConsecutiveWriteErrs = 50
packetDropLogEvery = 100
)
func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID
}
func getRenegLock(roomID, peerID string) *sync.Mutex {
k := peerKey(roomID, peerID)
v, _ := renegMu.LoadOrStore(k, &sync.Mutex{})
return v.(*sync.Mutex)
}
func setRenegPending(roomID, peerID string, v bool) {
renegPending.Store(peerKey(roomID, peerID), v)
}
func popRenegPending(roomID, peerID string) bool {
k := peerKey(roomID, peerID)
v, ok := renegPending.Load(k)
if ok {
renegPending.Delete(k)
}
if !ok {
return false
}
b, _ := v.(bool)
return b
}
func extractICEUfrag(sdp string) string {
for _, line := range strings.Split(sdp, "\n") {
line = strings.TrimSpace(line)
if strings.HasPrefix(line, "a=ice-ufrag:") {
return strings.TrimPrefix(line, "a=ice-ufrag:")
}
}
return ""
}
func removeRoomTrack(roomID, trackID string) {
room, ok := GetRoom(roomID)
if !ok {
return
}
room.mu.Lock()
defer room.mu.Unlock()
out := room.Tracks[:0]
for _, t := range room.Tracks {
if t.TrackID != trackID {
out = append(out, t)
}
}
room.Tracks = out
}
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed
}
// Вешается на каждый PeerConnection при JoinWithOffer.
// Автоматически вызывает LeaveRoom при обрыве соединения и очищает состояние ретрансляции.
func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
var (
mu sync.Mutex
disconnecting bool
timer *time.Timer
leaveOnce sync.Once
)
room, exists := GetRoom(roomID)
if !exists {
logger.LogWarnMessage("BindPeerLifecycle: tried to bind non existing room " + roomID)
return
}
server := room.Server
cancelTimer := func() {
mu.Lock()
defer mu.Unlock()
if timer != nil {
timer.Stop()
timer = nil
}
disconnecting = false
}
leaveAndNotify := func(reason DisconnectReason) {
leaveOnce.Do(func() {
cancelTimer()
err := LeaveRoom(roomID, peerID)
if OnPeerDisconnected != nil && err == nil {
OnPeerDisconnected(roomID, peerID, server, reason)
}
})
}
startTimer := func() {
mu.Lock()
defer mu.Unlock()
if disconnecting {
return return
} }
// Добавляем этот track всем, кроме publisher disconnecting = true
roomsMu.RLock() timer = time.AfterFunc(disconnectGracePeriod, func() {
room, ok := rooms[roomID] state := pc.ConnectionState()
if !ok { if state == webrtc.PeerConnectionStateConnected {
roomsMu.RUnlock() mu.Lock()
disconnecting = false
mu.Unlock()
return
}
leaveAndNotify(DisconnectReasonFailed)
})
}
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
switch state {
case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
cancelTimer()
case webrtc.ICEConnectionStateDisconnected:
startTimer()
case webrtc.ICEConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.ICEConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
pc.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
cancelTimer()
case webrtc.PeerConnectionStateDisconnected:
startTimer()
case webrtc.PeerConnectionStateClosed:
leaveAndNotify(DisconnectReasonClosed)
case webrtc.PeerConnectionStateFailed:
leaveAndNotify(DisconnectReasonFailed)
}
})
}
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
publisherPeerID,
remote.Kind().String(),
remote.ID(),
remote.SSRC(),
)
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
localTrackID,
remote.StreamID(),
)
if err != nil {
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
return return
} }
defer removeRoomTrack(roomID, localTrack.ID())
room, ok := GetRoom(roomID)
if !ok {
return
}
room.mu.Lock()
room.Tracks = append(room.Tracks, RoomTrack{
TrackID: localTrack.ID(),
OwnerPeer: publisherPeerID,
Local: localTrack,
})
peers := make([]Peer, len(room.Peers)) peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers) copy(peers, room.Peers)
roomsMu.RUnlock() room.mu.Unlock()
for _, sub := range peers { for _, sub := range peers {
if sub.PeerID == publisherPeerID { if sub.PeerID == publisherPeerID {
continue continue
} }
sender, err := sub.PeerConnection.AddTrack(localTrack) if !isPeerConnectionAlive(sub.PeerConnection) {
if err != nil {
continue continue
} }
// RTCP drain обязателен sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
continue
}
senderCopy := sender
go func() { go func() {
buf := make([]byte, 1500) buf := make([]byte, 1500)
for { for {
if _, _, e := sender.Read(buf); e != nil { if _, _, e := senderCopy.Read(buf); e != nil {
return return
} }
} }
}() }()
// После AddTrack нужна renegotiation для подписчика subID := sub.PeerID
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection) subPC := sub.PeerConnection
go func() {
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
}
}()
} }
// Пересылаем RTP пакеты от издателя всем подписчикам if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
})
}
// Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
// Если downstream начинает тормозить, пакеты дропаются из очереди,
// а не накапливают бесконечную задержку.
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
writerDone := make(chan struct{})
defer close(packetQueue)
go func() {
defer close(writerDone)
consecutiveWriteErrs := 0
for pkt := range packetQueue {
if err := localTrack.WriteRTP(pkt); err != nil {
consecutiveWriteErrs++
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
))
}
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
continue
}
consecutiveWriteErrs = 0
}
}()
consecutiveReadErrs := 0
droppedPackets := 0
for { for {
pkt, _, err := remote.ReadRTP() pkt, _, err := remote.ReadRTP()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
return return
} }
return
consecutiveReadErrs++
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
))
}
if consecutiveReadErrs >= maxConsecutiveReadErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
time.Sleep(10 * time.Millisecond)
continue
} }
if err = localTrack.WriteRTP(pkt); err != nil {
consecutiveReadErrs = 0
select {
case <-writerDone:
return return
default:
}
select {
case <-writerDone:
return
case packetQueue <- pkt:
default:
droppedPackets++
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
publisherPeerID, localTrack.ID(), droppedPackets,
))
}
} }
} }
}) })
} }
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
lock := getRenegLock(roomID, peerID)
lock.Lock()
defer lock.Unlock()
if !isPeerConnectionAlive(pc) {
return nil
}
if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true)
return nil
}
offer, err := pc.CreateOffer(nil) offer, err := pc.CreateOffer(nil)
if err != nil { if err != nil {
return err return err
@@ -88,31 +371,14 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil return nil
} }
// Добавляет ICE-кандидата к пиру // Добавляет ICE-кандидата к пиру.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error { func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID) room, exists := GetRoom(roomID)
if !exists { if !exists {
return ErrRoomNotFound return ErrRoomNotFound
} }
for _, peer := range room.Peers { room.mu.RLock()
if peer.PeerID == peerID {
return peer.PeerConnection.AddICECandidate(candidate)
}
}
return ErrPeerNotFound
}
// Обрабатывает SDP ответ от клиента при renegotiation
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection var pc *webrtc.PeerConnection
for _, p := range room.Peers { for _, p := range room.Peers {
if p.PeerID == peerID { if p.PeerID == peerID {
@@ -120,10 +386,88 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
break break
} }
} }
roomsMu.RUnlock() room.mu.RUnlock()
if pc == nil { if pc == nil {
return ErrPeerNotFound return ErrPeerNotFound
} }
return pc.SetRemoteDescription(answer)
rd := pc.RemoteDescription()
if rd == nil {
pendingMu.Lock()
k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate)
pendingMu.Unlock()
return nil
}
if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current {
return nil
}
}
err := pc.AddICECandidate(candidate)
if err != nil && strings.Contains(err.Error(), "doesn't match the current ufrags") {
return nil
}
return err
}
// Обрабатывает SDP answer от клиента при renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
}
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
room.mu.RLock()
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
room.mu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
if err := pc.SetRemoteDescription(answer); err != nil {
return err
}
k := peerKey(roomID, peerID)
pendingMu.Lock()
queue := pendingCandidates[k]
delete(pendingCandidates, k)
pendingMu.Unlock()
for _, c := range queue {
_ = AddICECandidate(roomID, peerID, c)
}
if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc)
}
return nil
}
func cleanupForwardingState(roomID, peerID string) {
k := peerKey(roomID, peerID)
pendingMu.Lock()
delete(pendingCandidates, k)
pendingMu.Unlock()
renegPending.Delete(k)
renegMu.Delete(k)
} }

11
sfu/reason.go Normal file
View File

@@ -0,0 +1,11 @@
package sfu
// Причины отключения пира от комнаты, которые могут быть использованы для логирования или уведомлений
type DisconnectReason int
const (
// Пир отключился из-за ошибки соединения или другой проблемы
DisconnectReasonFailed DisconnectReason = 0
// Пир отключился по своей инициативе (например, закрыл приложение)
DisconnectReasonClosed = 1
)

View File

@@ -1,8 +1,10 @@
package sfu package sfu
import ( import (
"g365sfu/logger"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"sync" "sync"
"time"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@@ -16,6 +18,12 @@ type Peer struct {
PeerConnection *webrtc.PeerConnection PeerConnection *webrtc.PeerConnection
} }
type RoomTrack struct {
TrackID string
OwnerPeer string
Local *webrtc.TrackLocalStaticRTP
}
type Room struct { type Room struct {
//Уникальный идентификатор комнаты //Уникальный идентификатор комнаты
RoomID string RoomID string
@@ -23,6 +31,12 @@ type Room struct {
Server *connection.Connection Server *connection.Connection
//Пиры которые подключились к комнате //Пиры которые подключились к комнате
Peers []Peer Peers []Peer
Tracks []RoomTrack
mu sync.RWMutex
emptyTimer *time.Timer
} }
// Общие переменные // Общие переменные
@@ -31,6 +45,46 @@ var (
roomsMu sync.RWMutex roomsMu sync.RWMutex
) )
const emptyRoomTTL = 30 * time.Second
func scheduleEmptyRoomDeletion(room *Room) {
room.mu.Lock()
if len(room.Peers) > 0 {
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.mu.Unlock()
return
}
if room.emptyTimer != nil {
room.mu.Unlock()
return
}
roomID := room.RoomID
var timer *time.Timer
timer = time.AfterFunc(emptyRoomTTL, func() {
currentRoom, exists := GetRoom(roomID)
if !exists {
return
}
currentRoom.mu.Lock()
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
currentRoom.mu.Unlock()
return
}
currentRoom.emptyTimer = nil
currentRoom.mu.Unlock()
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
}
})
room.emptyTimer = timer
room.mu.Unlock()
}
// CreateRoom создает комнату // CreateRoom создает комнату
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) { func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
roomsMu.Lock() roomsMu.Lock()
@@ -42,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
Peers: []Peer{}, Peers: []Peer{},
} }
rooms[roomID] = room rooms[roomID] = room
scheduleEmptyRoomDeletion(room)
return room, nil return room, nil
} }
@@ -54,13 +109,6 @@ func GetRoom(roomID string) (*Room, bool) {
return room, exists return room, exists
} }
// DeleteRoom удаляет комнату по идентификатору
func DeleteRoom(roomID string) {
roomsMu.Lock()
defer roomsMu.Unlock()
delete(rooms, roomID)
}
// JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера // JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера
func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) { func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
room, exists := GetRoom(roomID) room, exists := GetRoom(roomID)
@@ -73,43 +121,117 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
return nil, err return nil, err
} }
// SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их BindPeerLifecycle(roomID, peerID, peerConnection)
// пересылает клиенту для установления соединения SetupForwardingForPeer(roomID, peerID, peerConnection)
if err = peerConnection.SetRemoteDescription(offer); err != nil {
_ = peerConnection.Close()
return nil, err
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
_ = peerConnection.Close()
return nil, err
}
gatherDone := webrtc.GatheringCompletePromise(peerConnection)
if err = peerConnection.SetLocalDescription(answer); err != nil {
_ = peerConnection.Close()
return nil, err
}
<-gatherDone
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) { peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil { if c == nil {
return // gathering finished return
} }
if OnLocalICECandidate != nil { if OnLocalICECandidate != nil {
OnLocalICECandidate(roomID, peerID, c.ToJSON()) OnLocalICECandidate(roomID, peerID, c.ToJSON())
} }
}) })
err = peerConnection.SetRemoteDescription(offer) // Добавляем peer в комнату и сразу снимаем snapshot существующих треков
if err != nil { // в одном локе — чтобы не было race с OnTrack
return nil, err room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
} }
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return nil, err
}
err = peerConnection.SetLocalDescription(answer)
if err != nil {
return nil, err
}
// Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты
SetupForwardingForPeer(roomID, peerID, peerConnection)
room.Peers = append(room.Peers, Peer{ room.Peers = append(room.Peers, Peer{
PeerID: peerID, PeerID: peerID,
PeerConnection: peerConnection, PeerConnection: peerConnection,
}) })
existingTracks := make([]RoomTrack, len(room.Tracks))
copy(existingTracks, room.Tracks)
room.mu.Unlock()
// Подписываем нового peer на уже существующие треки ПОСЛЕ добавления в комнату
for _, t := range existingTracks {
if t.OwnerPeer == peerID {
continue
}
sender, err := peerConnection.AddTrack(t.Local)
if err != nil {
continue
}
senderCopy := sender
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := senderCopy.Read(buf); e != nil {
return
}
}
}()
}
// Если были добавлены треки — нужна renegotiation
if len(existingTracks) > 0 {
go func() {
if err := renegotiatePeer(roomID, peerID, peerConnection); err != nil {
logger.LogWarnMessage("JoinWithOffer: renegotiatePeer error: " + err.Error())
}
}()
}
return peerConnection.LocalDescription(), nil return peerConnection.LocalDescription(), nil
} }
func DeleteRoom(roomID string) error {
roomsMu.Lock()
room, exists := rooms[roomID]
server := room.Server
if !exists {
roomsMu.Unlock()
return ErrRoomNotFound
}
delete(rooms, roomID)
roomsMu.Unlock()
room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
room.Peers = nil
room.Tracks = nil
room.mu.Unlock()
for _, p := range peers {
_ = p.PeerConnection.Close()
cleanupForwardingState(roomID, p.PeerID)
}
OnRoomDelete(roomID, server)
return nil
}
// LeaveRoom позволяет пиру покинуть комнату // LeaveRoom позволяет пиру покинуть комнату
func LeaveRoom(roomID string, peerID string) error { func LeaveRoom(roomID string, peerID string) error {
room, exists := GetRoom(roomID) room, exists := GetRoom(roomID)
@@ -117,12 +239,60 @@ func LeaveRoom(roomID string, peerID string) error {
return ErrRoomNotFound return ErrRoomNotFound
} }
for i, peer := range room.Peers { var (
if peer.PeerID == peerID { removedPC *webrtc.PeerConnection
peer.PeerConnection.Close() removed bool
room.Peers = append(room.Peers[:i], room.Peers[i+1:]...) shouldDrop bool
break )
room.mu.Lock()
// удаляем peer
nextPeers := make([]Peer, 0, len(room.Peers))
for _, p := range room.Peers {
if p.PeerID == peerID {
removedPC = p.PeerConnection
removed = true
continue
} }
nextPeers = append(nextPeers, p)
}
room.Peers = nextPeers
// удаляем треки этого publisher
nextTracks := room.Tracks[:0]
for _, t := range room.Tracks {
if t.OwnerPeer != peerID {
nextTracks = append(nextTracks, t)
}
}
room.Tracks = nextTracks
shouldDrop = len(room.Peers) == 0
room.mu.Unlock()
if !removed {
return ErrPeerNotFound
}
if removedPC != nil {
_ = removedPC.Close()
}
cleanupForwardingState(roomID, peerID)
// Комната пустая -> планируем удаление через TTL
if shouldDrop {
scheduleEmptyRoomDeletion(room)
return nil
}
// renegotiation оставшимся peer после удаления треков/peer
room.mu.RLock()
rest := make([]Peer, len(room.Peers))
copy(rest, room.Peers)
room.mu.RUnlock()
for _, p := range rest {
_ = renegotiatePeer(roomID, p.PeerID, p.PeerConnection)
} }
return nil return nil

View File

@@ -3,6 +3,8 @@ package sfu
import ( import (
"errors" "errors"
"g365sfu/logger" "g365sfu/logger"
connection "g365sfu/socket/struct"
"g365sfu/utils"
"os" "os"
"github.com/pion/interceptor" "github.com/pion/interceptor"
@@ -20,6 +22,12 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip
// Коллбек для обработки новых ICE кандидатов // Коллбек для обработки новых ICE кандидатов
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit) var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
// Коллбек для обработки отключения пира (обрыв связи)
var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection, reason DisconnectReason)
// Коллбек для обработки удаления комнаты
var OnRoomDelete func(roomID string, server *connection.Connection)
// Ошибки // Ошибки
var ( var (
ErrRoomNotFound = errors.New("room not found") ErrRoomNotFound = errors.New("room not found")
@@ -27,12 +35,12 @@ var (
) )
func InitWebRTCEngines() { func InitWebRTCEngines() {
publicIP := os.Getenv("PUBLIC_IP") publicIP := os.Getenv("SFU_PUBLIC_IP")
fromPort := os.Getenv("PORT_RANGE_FROM") fromPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_FROM"), 30000)
toPort := os.Getenv("PORT_RANGE_TO") toPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_TO"), 39999)
if publicIP == "" || fromPort == "" || toPort == "" { if publicIP == "" || fromPort == 0 || toPort == 0 {
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера // Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set") logger.LogErrorMessage("SFU_PUBLIC_IP, SFU_PORT_RANGE_FROM and SFU_PORT_RANGE_TO environment variables must be set")
os.Exit(-1) os.Exit(-1)
return return
} }
@@ -43,9 +51,9 @@ func InitWebRTCEngines() {
_ = webrtc.RegisterDefaultInterceptors(m, i) _ = webrtc.RegisterDefaultInterceptors(m, i)
se := webrtc.SettingEngine{} se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(40000, 50000) _ = se.SetEphemeralUDPPortRange(uint16(fromPort), uint16(toPort))
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" { if publicIP := os.Getenv("SFU_PUBLIC_IP"); publicIP != "" {
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{ se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
External: []string{publicIP}, External: []string{publicIP},
AsCandidateType: webrtc.ICECandidateTypeHost, AsCandidateType: webrtc.ICECandidateTypeHost,

View File

@@ -4,8 +4,10 @@ import (
"encoding/json" "encoding/json"
"g365sfu/bytebuffer" "g365sfu/bytebuffer"
"g365sfu/logger" "g365sfu/logger"
"g365sfu/network"
"g365sfu/sfu" "g365sfu/sfu"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"g365sfu/turn"
"g365sfu/utils" "g365sfu/utils"
"net/http" "net/http"
"os" "os"
@@ -30,7 +32,15 @@ func getSecret() string {
// Обработчик WebSocket соединений // Обработчик WebSocket соединений
func HandleWebSocket(w http.ResponseWriter, r *http.Request) { func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.LogWarnMessage("failed to upgrade to websocket: " + err.Error())
return
}
if conn == nil {
logger.LogWarnMessage("failed to upgrade to websocket: connection is nil")
return
}
defer conn.Close() defer conn.Close()
// Канал для передачи байтов // Канал для передачи байтов
@@ -68,7 +78,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
// Логика обработки байтов // Логика обработки байтов
buffer := bytebuffer.Wrap(bytes) buffer := bytebuffer.Wrap(bytes)
packetId, _ := buffer.Get() packetId, _ := buffer.Get()
if packetId == 0x01 { if packetId == byte(network.HANDSHAKE) {
// Это рукопожатие, дальше сравниваем секретные ключи // Это рукопожатие, дальше сравниваем секретные ключи
secretLength, _ := buffer.GetUint32() secretLength, _ := buffer.GetUint32()
@@ -78,15 +88,19 @@ func processData(data <-chan []byte, connection *connection.Connection) {
AddSocket(connection) AddSocket(connection)
// Подготовка ответа для клиента о успешном рукопожатии // Подготовка ответа для клиента о успешном рукопожатии
response := bytebuffer.Allocate(1) response := bytebuffer.Allocate(1)
response.Put(0x01) response.Put(byte(network.HANDSHAKE_SUCCESS))
response.Flip() response.Flip()
// Отправляем ответ клиенту // Отправляем ответ клиенту
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes()) connection.WriteBinary(response.Bytes())
continue continue
} }
connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0xFF}) response := bytebuffer.Allocate(1)
response.Put(byte(network.HANDSHAKE_FAILURE))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
connection.Socket.Close() connection.Close()
return return
} }
@@ -98,7 +112,7 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
// Создание комнаты // Создание комнаты
if packetId == 0x02 { if packetId == byte(network.ROOM_CREATE) {
roomLength, _ := buffer.GetUint32() roomLength, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomLength)) roomIDBytes, _ := buffer.GetBytes(int(roomLength))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -106,17 +120,17 @@ func processData(data <-chan []byte, connection *connection.Connection) {
logger.LogSuccessMessage("room initializated " + room.RoomID) logger.LogSuccessMessage("room initializated " + room.RoomID)
// Подготовка ответа для клиента с ID комнаты // Подготовка ответа для клиента с ID комнаты
response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID))) response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
response.Put(0x02) response.Put(byte(network.ROOM_CREATE_SUCCESS))
response.PutUint32(uint32(len([]byte(room.RoomID)))) response.PutUint32(uint32(len([]byte(room.RoomID))))
response.PutBytes([]byte(room.RoomID)) response.PutBytes([]byte(room.RoomID))
response.Flip() response.Flip()
// Отправляем ответ клиенту // Отправляем ответ клиенту
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes()) connection.WriteBinary(response.Bytes())
continue continue
} }
//SDP OFFER для подключения к комнате //SDP OFFER для подключения к комнате
if packetId == 0x03 { if packetId == byte(network.SDP_OFFER) {
roomIdLen, _ := buffer.GetUint32() roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen)) roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes) roomID := string(roomIDBytes)
@@ -137,12 +151,117 @@ func processData(data <-chan []byte, connection *connection.Connection) {
logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error()) logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue continue
} }
_, err = sfu.JoinWithOffer(roomID, peerID, offer) answer, err := sfu.JoinWithOffer(roomID, peerID, offer)
if err != nil { if err != nil {
logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error()) logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error())
continue continue
} }
answerBytes, _ := json.Marshal(answer)
// Подготовка ответа для клиента с SDP answer
response := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4 + len(answerBytes))
response.Put(byte(network.SDP_ANSWER))
response.PutUint32(uint32(len([]byte(roomID))))
response.PutBytes([]byte(roomID))
response.PutUint32(uint32(len([]byte(peerID))))
response.PutBytes([]byte(peerID))
response.PutUint32(uint32(len(answerBytes)))
response.PutBytes(answerBytes)
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue continue
} }
//ICE-candidate пришел от участника комнаты
if packetId == byte(network.ICE_CANDIDATE) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
peerIdLen, _ := buffer.GetUint32()
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
peerID := string(peerIDBytes)
candidateLength, _ := buffer.GetUint32()
candidateBytes, _ := buffer.GetBytes(int(candidateLength))
var candidate webrtc.ICECandidateInit
err := json.Unmarshal(candidateBytes, &candidate)
if err != nil {
logger.LogWarnMessage("failed to unmarshal ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
err = sfu.AddICECandidate(roomID, peerID, candidate)
if err != nil {
logger.LogWarnMessage("failed to add ICE candidate from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
}
// SDP ANSWER от клиента при renegotiation
if packetId == byte(network.SDP_ANSWER_RENEGOTIATION) {
roomIdLen, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
peerIdLen, _ := buffer.GetUint32()
peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
peerID := string(peerIDBytes)
answerLength, _ := buffer.GetUint32()
answerBytes, _ := buffer.GetBytes(int(answerLength))
var answer webrtc.SessionDescription
err := json.Unmarshal(answerBytes, &answer)
if err != nil {
logger.LogWarnMessage("failed to unmarshal answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
err = sfu.HandleClientAnswer(roomID, peerID, answer)
if err != nil {
logger.LogWarnMessage("failed to handle client answer from peer " + peerID + " in room " + roomID + ": " + err.Error())
continue
}
}
//Check life для проверки соединения с сервером SFU
if packetId == byte(network.CHECK_LIFE) {
// Подготовка ответа для клиента о том, что соединение живо
response := bytebuffer.Allocate(1)
response.Put(byte(network.CHECK_LIFE_SUCCESS))
response.Flip()
// Отправляем ответ клиенту
connection.WriteBinary(response.Bytes())
continue
}
// Запрос от бекенда на получение данных для подключения к TURN серверу
if packetId == byte(network.TURN_ASK) && turn.TURN_PUBLIC_IP != "" {
// Отвечаем только в том случае, если TURN сервер был успешно запущен и данные для подключения к нему были заполнены
// Отправляем два пакета один на tcp сервер другой на udp сервер, так как некоторые клиенты могут поддерживать только один из этих протоколов
//tcp
response := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("tcp")))
response.Put(byte(network.TURN_SERVER))
response.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
response.PutBytes([]byte(turn.TURN_PUBLIC_IP))
response.PutUint32(uint32(len([]byte(turn.TURN_USER))))
response.PutBytes([]byte(turn.TURN_USER))
response.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
response.PutBytes([]byte(turn.TURN_PASS))
response.PutUint32(uint32(len([]byte("tcp"))))
response.PutBytes([]byte("tcp"))
response.Flip()
connection.WriteBinary(response.Bytes())
//udp
responseUDP := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("udp")))
responseUDP.Put(byte(network.TURN_SERVER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
responseUDP.PutBytes([]byte(turn.TURN_PUBLIC_IP))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_USER))))
responseUDP.PutBytes([]byte(turn.TURN_USER))
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
responseUDP.PutBytes([]byte(turn.TURN_PASS))
responseUDP.PutUint32(uint32(len([]byte("udp"))))
responseUDP.PutBytes([]byte("udp"))
responseUDP.Flip()
connection.WriteBinary(responseUDP.Bytes())
continue
}
} }
} }

View File

@@ -1,11 +1,38 @@
package connection package connection
import ( import (
"sync"
"github.com/gorilla/websocket" "github.com/gorilla/websocket"
) )
type Connection struct { type Connection struct {
Identificator string Identificator string
//Подсоединенный сокет Socket *websocket.Conn
Socket *websocket.Conn
writeMu sync.Mutex
closeMu sync.Mutex
closed bool
}
func (c *Connection) WriteBinary(payload []byte) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
return c.Socket.WriteMessage(websocket.BinaryMessage, payload)
}
func (c *Connection) WriteJSON(v any) error {
c.writeMu.Lock()
defer c.writeMu.Unlock()
return c.Socket.WriteJSON(v)
}
func (c *Connection) Close() error {
c.closeMu.Lock()
defer c.closeMu.Unlock()
if c.closed {
return nil
}
c.closed = true
return c.Socket.Close()
} }

8
turn/provider.go Normal file
View File

@@ -0,0 +1,8 @@
package turn
// Заполнится если сервер запустится успешно
var (
TURN_PUBLIC_IP = ""
TURN_USER = ""
TURN_PASS = ""
)

111
turn/turn.go Normal file
View File

@@ -0,0 +1,111 @@
package turn
import (
"fmt"
"net"
pionturn "github.com/pion/turn/v4"
)
type Server struct {
udpConn net.PacketConn
tcpListener net.Listener
srv *pionturn.Server
}
type Config struct {
ListenAddr string // "0.0.0.0:3478"
PublicIP string
Realm string
Username string
Password string
MinRelayPort uint16
MaxRelayPort uint16
}
func relayGen(ip net.IP, minPort, maxPort uint16) pionturn.RelayAddressGenerator {
if minPort != 0 && maxPort != 0 && minPort <= maxPort {
return &pionturn.RelayAddressGeneratorPortRange{
RelayAddress: ip,
Address: "0.0.0.0",
MinPort: minPort,
MaxPort: maxPort,
}
}
return &pionturn.RelayAddressGeneratorStatic{
RelayAddress: ip,
Address: "0.0.0.0",
}
}
func Start(cfg Config) (*Server, error) {
ip := net.ParseIP(cfg.PublicIP)
if ip == nil {
return nil, fmt.Errorf("turn: invalid PublicIP: %s", cfg.PublicIP)
}
udpConn, err := net.ListenPacket("udp4", cfg.ListenAddr)
if err != nil {
return nil, err
}
tcpListener, err := net.Listen("tcp4", cfg.ListenAddr)
if err != nil {
_ = udpConn.Close()
return nil, err
}
rg := relayGen(ip, cfg.MinRelayPort, cfg.MaxRelayPort)
srv, err := pionturn.NewServer(pionturn.ServerConfig{
Realm: cfg.Realm,
AuthHandler: func(username, realm string, srcAddr net.Addr) ([]byte, bool) {
if username != cfg.Username {
return nil, false
}
return pionturn.GenerateAuthKey(username, realm, cfg.Password), true
},
PacketConnConfigs: []pionturn.PacketConnConfig{
{
PacketConn: udpConn,
RelayAddressGenerator: rg,
},
},
ListenerConfigs: []pionturn.ListenerConfig{
{
Listener: tcpListener,
RelayAddressGenerator: rg,
},
},
})
if err != nil {
_ = tcpListener.Close()
_ = udpConn.Close()
return nil, err
}
return &Server{
udpConn: udpConn,
tcpListener: tcpListener,
srv: srv,
}, nil
}
func (s *Server) Close() error {
if s == nil {
return nil
}
if s.srv != nil {
_ = s.srv.Close()
}
if s.tcpListener != nil {
_ = s.tcpListener.Close()
}
if s.udpConn != nil {
_ = s.udpConn.Close()
}
return nil
}

View File

@@ -1,6 +1,9 @@
package utils package utils
import "math/rand" import (
"math/rand"
"strconv"
)
// Генерация случайной строки заданной длины // Генерация случайной строки заданной длины
func RandomString(n int) string { func RandomString(n int) string {
@@ -11,3 +14,14 @@ func RandomString(n int) string {
} }
return string(b) return string(b)
} }
func AtoiOrDefault(s string, defaultValue int) int {
if s == "" {
return defaultValue
}
n, err := strconv.Atoi(s)
if err != nil {
return defaultValue
}
return n
}