Compare commits
12 Commits
e703ac22e6
...
main
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a33235c7e | ||
|
|
feb4a51ab0 | ||
|
|
8c386eb42a | ||
|
|
027626eb2c | ||
|
|
1cbe48d327 | ||
|
|
625a7acfb7 | ||
|
|
6364253c6f | ||
|
|
f6e99cf32b | ||
|
|
6dadec6b64 | ||
|
|
96df1e52f9 | ||
|
|
c8141f00bc | ||
|
|
3c810407db |
27
.env
Normal file
27
.env
Normal file
@@ -0,0 +1,27 @@
|
||||
###########
|
||||
# GATEWAY #
|
||||
###########
|
||||
#Секретный ключ по которому авторизуются бекенды при подключении к SFU шлюзу
|
||||
SECRET=SFU_TEST_SECRET
|
||||
#PORT SFU шлюза (для приема соединений от бекендов)
|
||||
PORT=1001
|
||||
###############
|
||||
# SFU SECTION #
|
||||
###############
|
||||
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
|
||||
SFU_PUBLIC_IP=10.211.55.2
|
||||
#Диапазон портов для ICE кандидатов
|
||||
SFU_PORT_RANGE_FROM=30000
|
||||
SFU_PORT_RANGE_TO=39999
|
||||
################
|
||||
# TURN SECTION #
|
||||
################
|
||||
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
|
||||
TURN_ALLOW=true
|
||||
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
|
||||
TURN_PUBLIC_IP=10.211.55.2
|
||||
TURN_USER=user
|
||||
TURN_PASS=pass
|
||||
#Диапазон занимемых TURN сервером портов (tcp/udp)
|
||||
TURN_PORT_RANGE_FROM=40000
|
||||
TURN_PORT_RANGE_TO=50000
|
||||
44
.gitea/workflows/build.yaml
Normal file
44
.gitea/workflows/build.yaml
Normal file
@@ -0,0 +1,44 @@
|
||||
name: Build G365SFU
|
||||
run-name: Build and Deploy G365SFU
|
||||
on:
|
||||
push:
|
||||
branches: [ main ]
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
build:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v6
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v4
|
||||
with:
|
||||
go-version: 1.24
|
||||
- name: Build G365SFU
|
||||
run: |
|
||||
make build
|
||||
- name: Deploy to server via SSH
|
||||
uses: appleboy/scp-action@v1
|
||||
with:
|
||||
host: ${{ secrets.SFU_SSH_HOST }}
|
||||
username: ${{ secrets.SFU_SSH_USER }}
|
||||
password: ${{ secrets.SFU_SSH_PASSWORD }}
|
||||
port: ${{ secrets.SFU_SSH_PORT }}
|
||||
source: "build/*"
|
||||
target: ${{ secrets.SFU_DEPLOY_PATH }}
|
||||
strip_components: 1
|
||||
rm: false
|
||||
debug: true
|
||||
flatten: true
|
||||
- name: Restart Docker containers
|
||||
uses: appleboy/ssh-action@v1.2.5
|
||||
with:
|
||||
host: ${{ secrets.SFU_SSH_HOST }}
|
||||
username: ${{ secrets.SFU_SSH_USER }}
|
||||
password: ${{ secrets.SFU_SSH_PASSWORD }}
|
||||
port: ${{ secrets.SFU_SSH_PORT }}
|
||||
script: |
|
||||
cd ${{ secrets.SFU_DEPLOY_PATH }}
|
||||
docker-compose down
|
||||
docker compose up -d --build
|
||||
5
.gitignore
vendored
5
.gitignore
vendored
@@ -1,3 +1,4 @@
|
||||
.env
|
||||
.vscode
|
||||
__debug*
|
||||
__debug*
|
||||
build/.env
|
||||
build/g365sfu
|
||||
5
Makefile
Normal file
5
Makefile
Normal file
@@ -0,0 +1,5 @@
|
||||
.PHONY: build
|
||||
|
||||
build:
|
||||
mkdir -p build
|
||||
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/g365sfu .
|
||||
31
README.md
31
README.md
@@ -1,2 +1,29 @@
|
||||
### SFU server
|
||||
Сервер для организации видеоконференций на основе WebRTC. Написан на Go и использует библиотеку Pion WebRTC.
|
||||
# G365SFU
|
||||
G365SFU - это медиасервер, который используется для организации звонков. Он работает по протоколам WebSocket (шлюз/gateway) и WebRTC (для медиа-трафика) и состоит из трех основных компонентов: шлюза (gateway), SFU сервера, TURN сервера. SFU занимается пересылкой медиа-потоков между участниками звонка, а TURN сервер обеспечивает возможность связи между участниками, находящимися за NAT или брандмауэром. **ВАЖНО!** G365SFU не занимается сигнализацией, а только пересылает медиа-трафик, поэтому для организации звонков необходимо использовать отдельный сервер сигнализации, который будет обмениваться данными о подключении между участниками и SFU. **ВАЖНО!** Пользователи в звонках не соединяются друг с другом напрямую, они "созваниваются" с сервером SFU, используя его в качестве посредника для передачи медиа-трафика. Сервер SFU при этом не может расшифровать звонок, так как ему приходят уже зашифрованные RTP пакеты, которые он просто пересылает между участниками.
|
||||
|
||||
### Disclaimer
|
||||
В Docker работает только на Linux (Ubuntu 22.04 и выше). На других платформах нужно запускать бинарный файл напрямую ./build/g365sfu, предварительно собрав его с помощью команды `make build`. Связано это с публикацией огромного диапазона портов для SFU и TURN сервера, что не поддерживается в Docker на других платформах. Сейчас Docker использует host-network, который поддерживает только ядра Linux, что позволяет обойти эту проблему.
|
||||
|
||||
### Gatway
|
||||
Gateway - это компонент, который обеспечивает связь между Вашим сервером сигналинга (в Rosetta он написан на Java) и SFU сервером. Он принимает WebSocket соединения от сервера сигнализации и обрабатывает сообщения, связанные с управлением звонками, такими как создание комнаты, присоединение к комнате, отключение от комнаты и т.д. Gateway также отвечает за аутентификацию пользователей и управление их сессиями. В G365SFU используется встроенный шлюз, который можно настроить через переменные окружения. Он будет слушать на порту 1001 (по умолчанию) для входящих WebSocket соединений от сервера сигнализации. Gateway обеспечивает надежную связь между сервером сигнализации и SFU сервером, позволяя эффективно управлять звонками и участниками.
|
||||
|
||||
### SFU сервер
|
||||
SFU - Selective Forwarding Unit - это тип медиасервера, который принимает медиа-потоки от участников видеоконференции и пересылает их другим участникам без декодирования и повторного кодирования. Это позволяет снизить нагрузку на сервер и улучшить качество видео для всех участников. Сейчас G365SFU просто пересылает RTP пакеты между участниками, не обрабатывая их содержимое (потому, что оно зашифровано). В будущем планируется добавить возможность обработки слоев улучшения.
|
||||
|
||||
### TURN сервер
|
||||
TURN - Traversal Using Relays around NAT - это протокол, который позволяет устройствам за NAT (Network Address Translation) или брандмауэром устанавливать связь с другими устройствами в интернете. TURN серверы используются для ретрансляции медиа-трафика между участниками видеоконференции, когда прямое соединение между ними невозможно из-за ограничений сети. В G365SFU используется встроенный TURN сервер, который можно включить с помощью переменной окружения `TURN_ALLOW=true`. Он будет слушать на порту 3478 и использовать диапазон портов от 40000 до 50000 для ретрансляции трафика. Параметры сервера, такие как публичный IP, имя пользователя и пароль, также настраиваются через переменные окружения. TURN сервер обеспечивает надежную связь между участниками звонка, даже если они находятся за NAT.
|
||||
|
||||
# Установка (Ubuntu 22.04 и выше)
|
||||
Для начала, нам необходимо открыть порты 30000-39999 для SFU и 40000-50000 для TURN сервера (по умолчанию, если перенастраивается .env то нужно указать другие). Это можно сделать с помощью команды `ufw`:
|
||||
```bash
|
||||
sudo ufw allow 30000:39999/udp
|
||||
sudo ufw allow 40000:50000/udp
|
||||
sudo ufw allow 30000:39999/tcp
|
||||
sudo ufw allow 40000:50000/tcp
|
||||
sudo ufw allow 3478/tcp
|
||||
```
|
||||
|
||||
Запускаем докер-контейнер с G365SFU, указав необходимые переменные окружения в файле `build/.env` (пример можно взять из корневой папки проекта .env). Для этого нужно выполнить следующую команду в терминале, находясь в папке build:
|
||||
```bash
|
||||
docker compose up -d
|
||||
```
|
||||
43
boot/boot.go
43
boot/boot.go
@@ -9,6 +9,7 @@ import (
|
||||
"g365sfu/socket"
|
||||
connection "g365sfu/socket/struct"
|
||||
"g365sfu/turn"
|
||||
"g365sfu/utils"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
@@ -32,20 +33,33 @@ func Bootstrap() {
|
||||
sfu.OnServerOffer = OnServerOffer
|
||||
sfu.OnRoomDelete = OnRoomDelete
|
||||
sfu.OnPeerDisconnected = OnPeerDisconnected
|
||||
turnServer, err := turn.Start(turn.Config{
|
||||
ListenAddr: "0.0.0.0:3478",
|
||||
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
|
||||
Realm: "g365sfu",
|
||||
Username: os.Getenv("TURN_USER"),
|
||||
Password: os.Getenv("TURN_PASS"),
|
||||
})
|
||||
if err != nil {
|
||||
logger.LogWarnMessage("TURN start failed: " + err.Error())
|
||||
if os.Getenv("TURN_ALLOW") == "true" {
|
||||
turnServer, err := turn.Start(turn.Config{
|
||||
ListenAddr: "0.0.0.0:3478",
|
||||
PublicIP: os.Getenv("TURN_PUBLIC_IP"),
|
||||
Realm: "g365sfu",
|
||||
Username: os.Getenv("TURN_USER"),
|
||||
Password: os.Getenv("TURN_PASS"),
|
||||
MinRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_FROM"), 40000)),
|
||||
MaxRelayPort: uint16(utils.AtoiOrDefault(os.Getenv("TURN_PORT_RANGE_TO"), 50000)),
|
||||
})
|
||||
if err != nil {
|
||||
logger.LogWarnMessage("error while starting TURN server: " + err.Error())
|
||||
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
|
||||
} else {
|
||||
logger.LogInfoMessage("server TURN started at 0.0.0.0:3478")
|
||||
// Заполняем глобальные переменные для TURN провайдера, чтобы их могли использовать другие части приложения
|
||||
// Обратите внимание, заполняем их только в случе успешного старта Turn сервера
|
||||
turn.TURN_PASS = os.Getenv("TURN_PASS")
|
||||
turn.TURN_USER = os.Getenv("TURN_USER")
|
||||
turn.TURN_PUBLIC_IP = os.Getenv("TURN_PUBLIC_IP")
|
||||
defer turnServer.Close()
|
||||
}
|
||||
} else {
|
||||
logger.LogInfoMessage("TURN started on 0.0.0.0:3478")
|
||||
defer turnServer.Close()
|
||||
// TURN сервер выключен в конфиге, что может влиять на соединение некоторых пользователей
|
||||
logger.LogInfoMessage("starting without TURN server, peer connections may fail if clients are behind symmetric NATs")
|
||||
}
|
||||
logger.LogInfoMessage("server started at x.x.x.x:" + port)
|
||||
logger.LogInfoMessage("server SFU started at x.x.x.x:" + port)
|
||||
http.ListenAndServe(":"+port, nil)
|
||||
}
|
||||
|
||||
@@ -102,13 +116,14 @@ func OnRoomDelete(roomID string, server *connection.Connection) {
|
||||
server.WriteBinary(buffer.Bytes())
|
||||
}
|
||||
|
||||
func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection) {
|
||||
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)))
|
||||
func OnPeerDisconnected(roomID string, peerID string, server *connection.Connection, reason sfu.DisconnectReason) {
|
||||
buffer := bytebuffer.Allocate(1 + 4 + len([]byte(roomID)) + 4 + len([]byte(peerID)) + 4)
|
||||
buffer.Put(byte(network.ON_PEER_DISCONNECTED))
|
||||
buffer.PutUint32(uint32(len([]byte(roomID))))
|
||||
buffer.PutBytes([]byte(roomID))
|
||||
buffer.PutUint32(uint32(len([]byte(peerID))))
|
||||
buffer.PutBytes([]byte(peerID))
|
||||
buffer.PutUint32(uint32(reason))
|
||||
buffer.Flip()
|
||||
server.WriteBinary(buffer.Bytes())
|
||||
}
|
||||
|
||||
18
build/Dockerfile
Normal file
18
build/Dockerfile
Normal file
@@ -0,0 +1,18 @@
|
||||
FROM alpine:3.20
|
||||
|
||||
RUN apk add --no-cache ca-certificates tzdata
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Бинарь должен быть собран заранее в ./build/g365sfu
|
||||
COPY g365sfu /app/g365sfu
|
||||
|
||||
COPY .env /app/.env
|
||||
|
||||
RUN chmod +x /app/g365sfu
|
||||
|
||||
EXPOSE 1001
|
||||
EXPOSE 3478/tcp
|
||||
EXPOSE 3478/udp
|
||||
|
||||
ENTRYPOINT ["/app/g365sfu"]
|
||||
13
build/docker-compose.yml
Normal file
13
build/docker-compose.yml
Normal file
@@ -0,0 +1,13 @@
|
||||
services:
|
||||
g365sfu:
|
||||
container_name: g365sfu
|
||||
build:
|
||||
context: .
|
||||
dockerfile: Dockerfile
|
||||
image: g365sfu:latest
|
||||
restart: unless-stopped
|
||||
env_file:
|
||||
- .env
|
||||
#Поддерживается только в Linux (на Windows и MacOS нужно использовать bridge сеть и проброс портов, но нельзя пробрасывать большие
|
||||
#диапазоны портов, это может вызывать проблемы с работой Docker)
|
||||
network_mode: host
|
||||
@@ -20,7 +20,7 @@ func LogInfoMessage(message string) {
|
||||
fmt.Printf("%s[g365sfu] %s[%s]%s %s[INFO]%s %s\n",
|
||||
colorBlue,
|
||||
colorGray, timestamp, colorReset,
|
||||
colorGreen, colorReset,
|
||||
colorCyan, colorReset,
|
||||
message,
|
||||
)
|
||||
}
|
||||
@@ -60,7 +60,7 @@ func LogSuccessMessage(message string) {
|
||||
fmt.Printf("%s[g365sfu] %s[%s]%s %s[SUCCESS]%s %s\n",
|
||||
colorBlue,
|
||||
colorGray, timestamp, colorReset,
|
||||
colorCyan, colorReset,
|
||||
colorGreen, colorReset,
|
||||
message,
|
||||
)
|
||||
}
|
||||
|
||||
@@ -15,4 +15,6 @@ var (
|
||||
SDP_ANSWER_RENEGOTIATION = 0x07
|
||||
// Check life для проверки соединения с сервером SFU
|
||||
CHECK_LIFE = 0xAE
|
||||
// Запрос от бекенда на получение данных для подключения к TURN серверу
|
||||
TURN_ASK = 0x19
|
||||
)
|
||||
|
||||
@@ -20,4 +20,6 @@ var (
|
||||
SDP_ANSWER = 0x05
|
||||
// Сервер SFU отправит этот пакет бекенду при получении Check life для подтверждения, что соединение живо
|
||||
CHECK_LIFE_SUCCESS = 0xAE
|
||||
// Ответ от сервера при вопросе о TURN
|
||||
TURN_SERVER = 0x19
|
||||
)
|
||||
|
||||
@@ -9,6 +9,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtcp"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -20,7 +21,13 @@ var (
|
||||
renegPending sync.Map // key -> bool
|
||||
)
|
||||
|
||||
const disconnectGracePeriod = 12 * time.Second
|
||||
const (
|
||||
disconnectGracePeriod = 30 * time.Second
|
||||
outboundPacketBuffer = 256
|
||||
maxConsecutiveReadErrs = 25
|
||||
maxConsecutiveWriteErrs = 50
|
||||
packetDropLogEvery = 100
|
||||
)
|
||||
|
||||
func peerKey(roomID, peerID string) string {
|
||||
return roomID + "|" + peerID
|
||||
@@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
room.mu.Lock()
|
||||
defer room.mu.Unlock()
|
||||
|
||||
@@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) {
|
||||
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
|
||||
state := pc.ConnectionState()
|
||||
return state != webrtc.PeerConnectionStateClosed &&
|
||||
state != webrtc.PeerConnectionStateFailed &&
|
||||
state != webrtc.PeerConnectionStateDisconnected
|
||||
state != webrtc.PeerConnectionStateFailed
|
||||
}
|
||||
|
||||
// Вешается на каждый PeerConnection при JoinWithOffer.
|
||||
@@ -90,6 +97,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
mu sync.Mutex
|
||||
disconnecting bool
|
||||
timer *time.Timer
|
||||
leaveOnce sync.Once
|
||||
)
|
||||
|
||||
room, exists := GetRoom(roomID)
|
||||
@@ -99,12 +107,35 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
}
|
||||
server := room.Server
|
||||
|
||||
cancelTimer := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if timer != nil {
|
||||
timer.Stop()
|
||||
timer = nil
|
||||
}
|
||||
disconnecting = false
|
||||
}
|
||||
|
||||
leaveAndNotify := func(reason DisconnectReason) {
|
||||
leaveOnce.Do(func() {
|
||||
cancelTimer()
|
||||
err := LeaveRoom(roomID, peerID)
|
||||
if OnPeerDisconnected != nil && err == nil {
|
||||
OnPeerDisconnected(roomID, peerID, server, reason)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
startTimer := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
|
||||
if disconnecting {
|
||||
return
|
||||
}
|
||||
|
||||
disconnecting = true
|
||||
timer = time.AfterFunc(disconnectGracePeriod, func() {
|
||||
state := pc.ConnectionState()
|
||||
@@ -114,35 +145,20 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
mu.Unlock()
|
||||
return
|
||||
}
|
||||
_ = LeaveRoom(roomID, peerID)
|
||||
if OnPeerDisconnected != nil {
|
||||
OnPeerDisconnected(roomID, peerID, server)
|
||||
}
|
||||
leaveAndNotify(DisconnectReasonFailed)
|
||||
})
|
||||
}
|
||||
|
||||
cancelTimer := func() {
|
||||
mu.Lock()
|
||||
defer mu.Unlock()
|
||||
if timer != nil {
|
||||
timer.Stop()
|
||||
timer = nil
|
||||
}
|
||||
disconnecting = false
|
||||
}
|
||||
|
||||
pc.OnICEConnectionStateChange(func(state webrtc.ICEConnectionState) {
|
||||
switch state {
|
||||
case webrtc.ICEConnectionStateConnected, webrtc.ICEConnectionStateCompleted:
|
||||
cancelTimer()
|
||||
case webrtc.ICEConnectionStateDisconnected:
|
||||
startTimer()
|
||||
case webrtc.ICEConnectionStateFailed, webrtc.ICEConnectionStateClosed:
|
||||
cancelTimer()
|
||||
_ = LeaveRoom(roomID, peerID)
|
||||
if OnPeerDisconnected != nil {
|
||||
OnPeerDisconnected(roomID, peerID, server)
|
||||
}
|
||||
case webrtc.ICEConnectionStateClosed:
|
||||
leaveAndNotify(DisconnectReasonClosed)
|
||||
case webrtc.ICEConnectionStateFailed:
|
||||
leaveAndNotify(DisconnectReasonFailed)
|
||||
}
|
||||
})
|
||||
|
||||
@@ -152,17 +168,15 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
|
||||
cancelTimer()
|
||||
case webrtc.PeerConnectionStateDisconnected:
|
||||
startTimer()
|
||||
case webrtc.PeerConnectionStateFailed, webrtc.PeerConnectionStateClosed:
|
||||
cancelTimer()
|
||||
_ = LeaveRoom(roomID, peerID)
|
||||
if OnPeerDisconnected != nil {
|
||||
OnPeerDisconnected(roomID, peerID, server)
|
||||
}
|
||||
case webrtc.PeerConnectionStateClosed:
|
||||
leaveAndNotify(DisconnectReasonClosed)
|
||||
case webrtc.PeerConnectionStateFailed:
|
||||
leaveAndNotify(DisconnectReasonFailed)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
|
||||
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||||
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||
localTrackID := fmt.Sprintf("%s:%s:%s:%d",
|
||||
@@ -178,7 +192,7 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
|
||||
remote.StreamID(),
|
||||
)
|
||||
if err != nil {
|
||||
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error")
|
||||
logger.LogErrorMessage("SetupForwardingForPeer: NewTrackLocalStaticRTP error: " + err.Error())
|
||||
return
|
||||
}
|
||||
defer removeRoomTrack(roomID, localTrack.ID())
|
||||
@@ -203,54 +217,124 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
|
||||
continue
|
||||
}
|
||||
|
||||
// Не трогаем закрытые/failed соединения
|
||||
if !isPeerConnectionAlive(sub.PeerConnection) {
|
||||
fmt.Println("SetupForwardingForPeer: skipping dead peer:", sub.PeerID,
|
||||
sub.PeerConnection.ConnectionState().String())
|
||||
continue
|
||||
}
|
||||
|
||||
sender, err := sub.PeerConnection.AddTrack(localTrack)
|
||||
if err != nil {
|
||||
fmt.Println("SetupForwardingForPeer: AddTrack error:", roomID, sub.PeerID, err)
|
||||
logger.LogWarnMessage("SetupForwardingForPeer: AddTrack error: " + sub.PeerID + " " + err.Error())
|
||||
continue
|
||||
}
|
||||
|
||||
// RTCP drain
|
||||
senderCopy := sender
|
||||
go func() {
|
||||
buf := make([]byte, 1500)
|
||||
for {
|
||||
if _, _, e := sender.Read(buf); e != nil {
|
||||
if _, _, e := senderCopy.Read(buf); e != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
if err = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection); err != nil {
|
||||
fmt.Println("SetupForwardingForPeer: renegotiatePeer error:", roomID, sub.PeerID, err)
|
||||
}
|
||||
subID := sub.PeerID
|
||||
subPC := sub.PeerConnection
|
||||
|
||||
go func() {
|
||||
if err := renegotiatePeer(roomID, subID, subPC); err != nil {
|
||||
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Для video просим keyframe
|
||||
if remote.Kind() == webrtc.RTPCodecTypeVideo {
|
||||
_ = publisherPC.WriteRTCP([]rtcp.Packet{
|
||||
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
|
||||
})
|
||||
}
|
||||
|
||||
// Publisher RTP -> localTrack -> subscribers
|
||||
// Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
|
||||
// Если downstream начинает тормозить, пакеты дропаются из очереди,
|
||||
// а не накапливают бесконечную задержку.
|
||||
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
|
||||
writerDone := make(chan struct{})
|
||||
defer close(packetQueue)
|
||||
|
||||
go func() {
|
||||
defer close(writerDone)
|
||||
|
||||
consecutiveWriteErrs := 0
|
||||
for pkt := range packetQueue {
|
||||
if err := localTrack.WriteRTP(pkt); err != nil {
|
||||
consecutiveWriteErrs++
|
||||
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
|
||||
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
|
||||
))
|
||||
}
|
||||
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
|
||||
publisherPeerID, localTrack.ID(),
|
||||
))
|
||||
return
|
||||
}
|
||||
continue
|
||||
}
|
||||
consecutiveWriteErrs = 0
|
||||
}
|
||||
}()
|
||||
|
||||
consecutiveReadErrs := 0
|
||||
droppedPackets := 0
|
||||
|
||||
for {
|
||||
pkt, _, err := remote.ReadRTP()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
fmt.Println("SetupForwardingForPeer: ReadRTP error:", err)
|
||||
return
|
||||
|
||||
consecutiveReadErrs++
|
||||
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
|
||||
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
|
||||
))
|
||||
}
|
||||
if consecutiveReadErrs >= maxConsecutiveReadErrs {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
|
||||
publisherPeerID, localTrack.ID(),
|
||||
))
|
||||
return
|
||||
}
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
continue
|
||||
}
|
||||
if err = localTrack.WriteRTP(pkt); err != nil {
|
||||
fmt.Println("SetupForwardingForPeer: WriteRTP error:", err)
|
||||
|
||||
consecutiveReadErrs = 0
|
||||
|
||||
select {
|
||||
case <-writerDone:
|
||||
return
|
||||
default:
|
||||
}
|
||||
|
||||
select {
|
||||
case <-writerDone:
|
||||
return
|
||||
case packetQueue <- pkt:
|
||||
default:
|
||||
droppedPackets++
|
||||
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
|
||||
logger.LogWarnMessage(fmt.Sprintf(
|
||||
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
|
||||
publisherPeerID, localTrack.ID(), droppedPackets,
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
@@ -265,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Не начинаем новую negotiation поверх текущей.
|
||||
if pc.SignalingState() != webrtc.SignalingStateStable {
|
||||
setRenegPending(roomID, peerID, true)
|
||||
return nil
|
||||
@@ -288,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Добавляет ICE-кандидата к пиру
|
||||
// Добавляет ICE-кандидата к пиру.
|
||||
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||||
room, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
@@ -311,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
|
||||
rd := pc.RemoteDescription()
|
||||
if rd == nil {
|
||||
// offer/answer еще не применен — буферизуем
|
||||
pendingMu.Lock()
|
||||
k := peerKey(roomID, peerID)
|
||||
pendingCandidates[k] = append(pendingCandidates[k], candidate)
|
||||
@@ -319,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
return nil
|
||||
}
|
||||
|
||||
// Отбрасываем stale candidate по ufrag
|
||||
if candidate.UsernameFragment != nil {
|
||||
current := extractICEUfrag(rd.SDP)
|
||||
if current != "" && *candidate.UsernameFragment != current {
|
||||
@@ -334,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
|
||||
return err
|
||||
}
|
||||
|
||||
// Обрабатывает SDP answer от клиента при renegotiation
|
||||
// Обрабатывает SDP answer от клиента при renegotiation.
|
||||
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||||
if answer.Type != webrtc.SDPTypeAnswer {
|
||||
return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
|
||||
@@ -363,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
|
||||
return err
|
||||
}
|
||||
|
||||
// После применения answer — применяем отложенные кандидаты.
|
||||
k := peerKey(roomID, peerID)
|
||||
pendingMu.Lock()
|
||||
queue := pendingCandidates[k]
|
||||
@@ -374,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
|
||||
_ = AddICECandidate(roomID, peerID, c)
|
||||
}
|
||||
|
||||
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
|
||||
if popRenegPending(roomID, peerID) {
|
||||
_ = renegotiatePeer(roomID, peerID, pc)
|
||||
}
|
||||
|
||||
11
sfu/reason.go
Normal file
11
sfu/reason.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package sfu
|
||||
|
||||
// Причины отключения пира от комнаты, которые могут быть использованы для логирования или уведомлений
|
||||
type DisconnectReason int
|
||||
|
||||
const (
|
||||
// Пир отключился из-за ошибки соединения или другой проблемы
|
||||
DisconnectReasonFailed DisconnectReason = 0
|
||||
// Пир отключился по своей инициативе (например, закрыл приложение)
|
||||
DisconnectReasonClosed = 1
|
||||
)
|
||||
136
sfu/rooms.go
136
sfu/rooms.go
@@ -1,8 +1,10 @@
|
||||
package sfu
|
||||
|
||||
import (
|
||||
"g365sfu/logger"
|
||||
connection "g365sfu/socket/struct"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
@@ -33,6 +35,8 @@ type Room struct {
|
||||
Tracks []RoomTrack
|
||||
|
||||
mu sync.RWMutex
|
||||
|
||||
emptyTimer *time.Timer
|
||||
}
|
||||
|
||||
// Общие переменные
|
||||
@@ -41,6 +45,46 @@ var (
|
||||
roomsMu sync.RWMutex
|
||||
)
|
||||
|
||||
const emptyRoomTTL = 30 * time.Second
|
||||
|
||||
func scheduleEmptyRoomDeletion(room *Room) {
|
||||
room.mu.Lock()
|
||||
if len(room.Peers) > 0 {
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
room.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if room.emptyTimer != nil {
|
||||
room.mu.Unlock()
|
||||
return
|
||||
}
|
||||
roomID := room.RoomID
|
||||
var timer *time.Timer
|
||||
timer = time.AfterFunc(emptyRoomTTL, func() {
|
||||
currentRoom, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
return
|
||||
}
|
||||
|
||||
currentRoom.mu.Lock()
|
||||
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
|
||||
currentRoom.mu.Unlock()
|
||||
return
|
||||
}
|
||||
currentRoom.emptyTimer = nil
|
||||
currentRoom.mu.Unlock()
|
||||
|
||||
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
|
||||
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
|
||||
}
|
||||
})
|
||||
room.emptyTimer = timer
|
||||
room.mu.Unlock()
|
||||
}
|
||||
|
||||
// CreateRoom создает комнату
|
||||
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
|
||||
roomsMu.Lock()
|
||||
@@ -52,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
|
||||
Peers: []Peer{},
|
||||
}
|
||||
rooms[roomID] = room
|
||||
scheduleEmptyRoomDeletion(room)
|
||||
|
||||
return room, nil
|
||||
}
|
||||
@@ -76,43 +121,9 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
|
||||
return nil, err
|
||||
}
|
||||
|
||||
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if OnLocalICECandidate != nil {
|
||||
OnLocalICECandidate(roomID, peerID, c.ToJSON())
|
||||
}
|
||||
})
|
||||
|
||||
BindPeerLifecycle(roomID, peerID, peerConnection)
|
||||
SetupForwardingForPeer(roomID, peerID, peerConnection)
|
||||
|
||||
room.mu.RLock()
|
||||
existingTracks := make([]RoomTrack, len(room.Tracks))
|
||||
copy(existingTracks, room.Tracks)
|
||||
room.mu.RUnlock()
|
||||
|
||||
for _, t := range existingTracks {
|
||||
if t.OwnerPeer == peerID {
|
||||
continue
|
||||
}
|
||||
|
||||
sender, err := peerConnection.AddTrack(t.Local)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
go func() {
|
||||
buf := make([]byte, 1500)
|
||||
for {
|
||||
if _, _, e := sender.Read(buf); e != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
if err = peerConnection.SetRemoteDescription(offer); err != nil {
|
||||
_ = peerConnection.Close()
|
||||
return nil, err
|
||||
@@ -131,13 +142,61 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
|
||||
}
|
||||
<-gatherDone
|
||||
|
||||
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
|
||||
if c == nil {
|
||||
return
|
||||
}
|
||||
if OnLocalICECandidate != nil {
|
||||
OnLocalICECandidate(roomID, peerID, c.ToJSON())
|
||||
}
|
||||
})
|
||||
|
||||
// Добавляем peer в комнату и сразу снимаем snapshot существующих треков
|
||||
// в одном локе — чтобы не было race с OnTrack
|
||||
room.mu.Lock()
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
room.Peers = append(room.Peers, Peer{
|
||||
PeerID: peerID,
|
||||
PeerConnection: peerConnection,
|
||||
})
|
||||
existingTracks := make([]RoomTrack, len(room.Tracks))
|
||||
copy(existingTracks, room.Tracks)
|
||||
room.mu.Unlock()
|
||||
|
||||
// Подписываем нового peer на уже существующие треки ПОСЛЕ добавления в комнату
|
||||
for _, t := range existingTracks {
|
||||
if t.OwnerPeer == peerID {
|
||||
continue
|
||||
}
|
||||
|
||||
sender, err := peerConnection.AddTrack(t.Local)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
senderCopy := sender
|
||||
go func() {
|
||||
buf := make([]byte, 1500)
|
||||
for {
|
||||
if _, _, e := senderCopy.Read(buf); e != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
// Если были добавлены треки — нужна renegotiation
|
||||
if len(existingTracks) > 0 {
|
||||
go func() {
|
||||
if err := renegotiatePeer(roomID, peerID, peerConnection); err != nil {
|
||||
logger.LogWarnMessage("JoinWithOffer: renegotiatePeer error: " + err.Error())
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return peerConnection.LocalDescription(), nil
|
||||
}
|
||||
|
||||
@@ -153,6 +212,10 @@ func DeleteRoom(roomID string) error {
|
||||
roomsMu.Unlock()
|
||||
|
||||
room.mu.Lock()
|
||||
if room.emptyTimer != nil {
|
||||
room.emptyTimer.Stop()
|
||||
room.emptyTimer = nil
|
||||
}
|
||||
peers := make([]Peer, len(room.Peers))
|
||||
copy(peers, room.Peers)
|
||||
room.Peers = nil
|
||||
@@ -216,9 +279,10 @@ func LeaveRoom(roomID string, peerID string) error {
|
||||
}
|
||||
cleanupForwardingState(roomID, peerID)
|
||||
|
||||
// Комната пустая -> удаляем
|
||||
// Комната пустая -> планируем удаление через TTL
|
||||
if shouldDrop {
|
||||
return DeleteRoom(roomID)
|
||||
scheduleEmptyRoomDeletion(room)
|
||||
return nil
|
||||
}
|
||||
|
||||
// renegotiation оставшимся peer после удаления треков/peer
|
||||
|
||||
17
sfu/sfu.go
17
sfu/sfu.go
@@ -4,6 +4,7 @@ import (
|
||||
"errors"
|
||||
"g365sfu/logger"
|
||||
connection "g365sfu/socket/struct"
|
||||
"g365sfu/utils"
|
||||
"os"
|
||||
|
||||
"github.com/pion/interceptor"
|
||||
@@ -22,7 +23,7 @@ var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescrip
|
||||
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
|
||||
|
||||
// Коллбек для обработки отключения пира (обрыв связи)
|
||||
var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection)
|
||||
var OnPeerDisconnected func(roomID, peerID string, server *connection.Connection, reason DisconnectReason)
|
||||
|
||||
// Коллбек для обработки удаления комнаты
|
||||
var OnRoomDelete func(roomID string, server *connection.Connection)
|
||||
@@ -34,12 +35,12 @@ var (
|
||||
)
|
||||
|
||||
func InitWebRTCEngines() {
|
||||
publicIP := os.Getenv("PUBLIC_IP")
|
||||
fromPort := os.Getenv("PORT_RANGE_FROM")
|
||||
toPort := os.Getenv("PORT_RANGE_TO")
|
||||
if publicIP == "" || fromPort == "" || toPort == "" {
|
||||
publicIP := os.Getenv("SFU_PUBLIC_IP")
|
||||
fromPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_FROM"), 30000)
|
||||
toPort := utils.AtoiOrDefault(os.Getenv("SFU_PORT_RANGE_TO"), 39999)
|
||||
if publicIP == "" || fromPort == 0 || toPort == 0 {
|
||||
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
|
||||
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set")
|
||||
logger.LogErrorMessage("SFU_PUBLIC_IP, SFU_PORT_RANGE_FROM and SFU_PORT_RANGE_TO environment variables must be set")
|
||||
os.Exit(-1)
|
||||
return
|
||||
}
|
||||
@@ -50,9 +51,9 @@ func InitWebRTCEngines() {
|
||||
_ = webrtc.RegisterDefaultInterceptors(m, i)
|
||||
|
||||
se := webrtc.SettingEngine{}
|
||||
_ = se.SetEphemeralUDPPortRange(40000, 50000)
|
||||
_ = se.SetEphemeralUDPPortRange(uint16(fromPort), uint16(toPort))
|
||||
|
||||
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" {
|
||||
if publicIP := os.Getenv("SFU_PUBLIC_IP"); publicIP != "" {
|
||||
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
|
||||
External: []string{publicIP},
|
||||
AsCandidateType: webrtc.ICECandidateTypeHost,
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
"g365sfu/network"
|
||||
"g365sfu/sfu"
|
||||
connection "g365sfu/socket/struct"
|
||||
"g365sfu/turn"
|
||||
"g365sfu/utils"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -31,7 +32,15 @@ func getSecret() string {
|
||||
|
||||
// Обработчик WebSocket соединений
|
||||
func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
conn, _ := upgrader.Upgrade(w, r, nil)
|
||||
conn, err := upgrader.Upgrade(w, r, nil)
|
||||
if err != nil {
|
||||
logger.LogWarnMessage("failed to upgrade to websocket: " + err.Error())
|
||||
return
|
||||
}
|
||||
if conn == nil {
|
||||
logger.LogWarnMessage("failed to upgrade to websocket: connection is nil")
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
// Канал для передачи байтов
|
||||
@@ -218,5 +227,41 @@ func processData(data <-chan []byte, connection *connection.Connection) {
|
||||
connection.WriteBinary(response.Bytes())
|
||||
continue
|
||||
}
|
||||
|
||||
// Запрос от бекенда на получение данных для подключения к TURN серверу
|
||||
if packetId == byte(network.TURN_ASK) && turn.TURN_PUBLIC_IP != "" {
|
||||
// Отвечаем только в том случае, если TURN сервер был успешно запущен и данные для подключения к нему были заполнены
|
||||
// Отправляем два пакета один на tcp сервер другой на udp сервер, так как некоторые клиенты могут поддерживать только один из этих протоколов
|
||||
|
||||
//tcp
|
||||
response := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("tcp")))
|
||||
response.Put(byte(network.TURN_SERVER))
|
||||
response.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
|
||||
response.PutBytes([]byte(turn.TURN_PUBLIC_IP))
|
||||
response.PutUint32(uint32(len([]byte(turn.TURN_USER))))
|
||||
response.PutBytes([]byte(turn.TURN_USER))
|
||||
response.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
|
||||
response.PutBytes([]byte(turn.TURN_PASS))
|
||||
response.PutUint32(uint32(len([]byte("tcp"))))
|
||||
response.PutBytes([]byte("tcp"))
|
||||
response.Flip()
|
||||
connection.WriteBinary(response.Bytes())
|
||||
|
||||
//udp
|
||||
responseUDP := bytebuffer.Allocate(1 + 4 + len([]byte(turn.TURN_PUBLIC_IP)) + 4 + len([]byte(turn.TURN_USER)) + 4 + len([]byte(turn.TURN_PASS)) + 4 + len([]byte("udp")))
|
||||
responseUDP.Put(byte(network.TURN_SERVER))
|
||||
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PUBLIC_IP))))
|
||||
responseUDP.PutBytes([]byte(turn.TURN_PUBLIC_IP))
|
||||
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_USER))))
|
||||
responseUDP.PutBytes([]byte(turn.TURN_USER))
|
||||
responseUDP.PutUint32(uint32(len([]byte(turn.TURN_PASS))))
|
||||
responseUDP.PutBytes([]byte(turn.TURN_PASS))
|
||||
responseUDP.PutUint32(uint32(len([]byte("udp"))))
|
||||
responseUDP.PutBytes([]byte("udp"))
|
||||
responseUDP.Flip()
|
||||
connection.WriteBinary(responseUDP.Bytes())
|
||||
continue
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
8
turn/provider.go
Normal file
8
turn/provider.go
Normal file
@@ -0,0 +1,8 @@
|
||||
package turn
|
||||
|
||||
// Заполнится если сервер запустится успешно
|
||||
var (
|
||||
TURN_PUBLIC_IP = ""
|
||||
TURN_USER = ""
|
||||
TURN_PASS = ""
|
||||
)
|
||||
36
turn/turn.go
36
turn/turn.go
@@ -19,6 +19,25 @@ type Config struct {
|
||||
Realm string
|
||||
Username string
|
||||
Password string
|
||||
|
||||
MinRelayPort uint16
|
||||
MaxRelayPort uint16
|
||||
}
|
||||
|
||||
func relayGen(ip net.IP, minPort, maxPort uint16) pionturn.RelayAddressGenerator {
|
||||
if minPort != 0 && maxPort != 0 && minPort <= maxPort {
|
||||
return &pionturn.RelayAddressGeneratorPortRange{
|
||||
RelayAddress: ip,
|
||||
Address: "0.0.0.0",
|
||||
MinPort: minPort,
|
||||
MaxPort: maxPort,
|
||||
}
|
||||
}
|
||||
|
||||
return &pionturn.RelayAddressGeneratorStatic{
|
||||
RelayAddress: ip,
|
||||
Address: "0.0.0.0",
|
||||
}
|
||||
}
|
||||
|
||||
func Start(cfg Config) (*Server, error) {
|
||||
@@ -38,6 +57,8 @@ func Start(cfg Config) (*Server, error) {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rg := relayGen(ip, cfg.MinRelayPort, cfg.MaxRelayPort)
|
||||
|
||||
srv, err := pionturn.NewServer(pionturn.ServerConfig{
|
||||
Realm: cfg.Realm,
|
||||
AuthHandler: func(username, realm string, srcAddr net.Addr) ([]byte, bool) {
|
||||
@@ -48,23 +69,18 @@ func Start(cfg Config) (*Server, error) {
|
||||
},
|
||||
PacketConnConfigs: []pionturn.PacketConnConfig{
|
||||
{
|
||||
PacketConn: udpConn,
|
||||
RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{
|
||||
RelayAddress: ip,
|
||||
Address: "0.0.0.0",
|
||||
},
|
||||
PacketConn: udpConn,
|
||||
RelayAddressGenerator: rg,
|
||||
},
|
||||
},
|
||||
ListenerConfigs: []pionturn.ListenerConfig{
|
||||
{
|
||||
Listener: tcpListener,
|
||||
RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{
|
||||
RelayAddress: ip,
|
||||
Address: "0.0.0.0",
|
||||
},
|
||||
Listener: tcpListener,
|
||||
RelayAddressGenerator: rg,
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
_ = tcpListener.Close()
|
||||
_ = udpConn.Close()
|
||||
|
||||
@@ -1,6 +1,9 @@
|
||||
package utils
|
||||
|
||||
import "math/rand"
|
||||
import (
|
||||
"math/rand"
|
||||
"strconv"
|
||||
)
|
||||
|
||||
// Генерация случайной строки заданной длины
|
||||
func RandomString(n int) string {
|
||||
@@ -11,3 +14,14 @@ func RandomString(n int) string {
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
|
||||
func AtoiOrDefault(s string, defaultValue int) int {
|
||||
if s == "" {
|
||||
return defaultValue
|
||||
}
|
||||
n, err := strconv.Atoi(s)
|
||||
if err != nil {
|
||||
return defaultValue
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user