Compare commits

...

8 Commits

Author SHA1 Message Date
set
9a33235c7e Автоматическое удаление комнат при неактивности
All checks were successful
Build G365SFU / build (push) Successful in 10m16s
2026-04-02 18:02:41 +02:00
set
feb4a51ab0 Буферы RTP пакетов для сокращения задержек если downstream медленный
All checks were successful
Build G365SFU / build (push) Successful in 12m1s
2026-03-20 23:19:22 +02:00
set
8c386eb42a Фикс Go версии
All checks were successful
Build G365SFU / build (push) Successful in 13m19s
2026-03-20 20:25:35 +02:00
set
027626eb2c Фикс CI/CD
Some checks failed
Build G365SFU / build (push) Failing after 6m47s
2026-03-20 20:13:32 +02:00
set
1cbe48d327 Добавлена система сборки и исправлены ошибки
Some checks failed
Build G365SFU / build (push) Failing after 46s
2026-03-20 19:26:13 +02:00
set
625a7acfb7 Система CI/CD
Some checks failed
Build G365SFU / build (push) Has been cancelled
2026-03-20 19:24:03 +02:00
set
6364253c6f Дополнительная информация по установке и дисклеймер 2026-03-20 18:54:06 +02:00
set
f6e99cf32b Исправлена ошибка несуществуюзего сокета 2026-03-20 18:39:10 +02:00
10 changed files with 273 additions and 25 deletions

27
.env Normal file
View File

@@ -0,0 +1,27 @@
###########
# GATEWAY #
###########
#Секретный ключ по которому авторизуются бекенды при подключении к SFU шлюзу
SECRET=SFU_TEST_SECRET
#PORT SFU шлюза (для приема соединений от бекендов)
PORT=1001
###############
# SFU SECTION #
###############
#Публичный IP адрес, который будет использоваться для ICE кандидатов (если SFU работает за NAT)
SFU_PUBLIC_IP=10.211.55.2
#Диапазон портов для ICE кандидатов
SFU_PORT_RANGE_FROM=30000
SFU_PORT_RANGE_TO=39999
################
# TURN SECTION #
################
#Разрешить использовать этот SFU как TURN сервер тоже (для ретрансляции медиа трафика на сам SFU)
TURN_ALLOW=true
#TURN имя пользователя и пароль для аутентификации на TURN сервере (если используется)
TURN_PUBLIC_IP=10.211.55.2
TURN_USER=user
TURN_PASS=pass
#Диапазон занимемых TURN сервером портов (tcp/udp)
TURN_PORT_RANGE_FROM=40000
TURN_PORT_RANGE_TO=50000

View File

@@ -0,0 +1,44 @@
name: Build G365SFU
run-name: Build and Deploy G365SFU
on:
push:
branches: [ main ]
workflow_dispatch:
jobs:
build:
runs-on: ubuntu-latest
steps:
- name: Checkout code
uses: actions/checkout@v6
- name: Set up Go
uses: actions/setup-go@v4
with:
go-version: 1.24
- name: Build G365SFU
run: |
make build
- name: Deploy to server via SSH
uses: appleboy/scp-action@v1
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
source: "build/*"
target: ${{ secrets.SFU_DEPLOY_PATH }}
strip_components: 1
rm: false
debug: true
flatten: true
- name: Restart Docker containers
uses: appleboy/ssh-action@v1.2.5
with:
host: ${{ secrets.SFU_SSH_HOST }}
username: ${{ secrets.SFU_SSH_USER }}
password: ${{ secrets.SFU_SSH_PASSWORD }}
port: ${{ secrets.SFU_SSH_PORT }}
script: |
cd ${{ secrets.SFU_DEPLOY_PATH }}
docker-compose down
docker compose up -d --build

3
.gitignore vendored
View File

@@ -1,3 +1,4 @@
.env
.vscode .vscode
__debug* __debug*
build/.env
build/g365sfu

5
Makefile Normal file
View File

@@ -0,0 +1,5 @@
.PHONY: build
build:
mkdir -p build
CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -o build/g365sfu .

View File

@@ -1,6 +1,9 @@
# G365SFU # G365SFU
G365SFU - это медиасервер, который используется для организации звонков. Он работает по протоколам WebSocket (шлюз/gateway) и WebRTC (для медиа-трафика) и состоит из трех основных компонентов: шлюза (gateway), SFU сервера, TURN сервера. SFU занимается пересылкой медиа-потоков между участниками звонка, а TURN сервер обеспечивает возможность связи между участниками, находящимися за NAT или брандмауэром. **ВАЖНО!** G365SFU не занимается сигнализацией, а только пересылает медиа-трафик, поэтому для организации звонков необходимо использовать отдельный сервер сигнализации, который будет обмениваться данными о подключении между участниками и SFU. **ВАЖНО!** Пользователи в звонках не соединяются друг с другом напрямую, они "созваниваются" с сервером SFU, используя его в качестве посредника для передачи медиа-трафика. Сервер SFU при этом не может расшифровать звонок, так как ему приходят уже зашифрованные RTP пакеты, которые он просто пересылает между участниками. G365SFU - это медиасервер, который используется для организации звонков. Он работает по протоколам WebSocket (шлюз/gateway) и WebRTC (для медиа-трафика) и состоит из трех основных компонентов: шлюза (gateway), SFU сервера, TURN сервера. SFU занимается пересылкой медиа-потоков между участниками звонка, а TURN сервер обеспечивает возможность связи между участниками, находящимися за NAT или брандмауэром. **ВАЖНО!** G365SFU не занимается сигнализацией, а только пересылает медиа-трафик, поэтому для организации звонков необходимо использовать отдельный сервер сигнализации, который будет обмениваться данными о подключении между участниками и SFU. **ВАЖНО!** Пользователи в звонках не соединяются друг с другом напрямую, они "созваниваются" с сервером SFU, используя его в качестве посредника для передачи медиа-трафика. Сервер SFU при этом не может расшифровать звонок, так как ему приходят уже зашифрованные RTP пакеты, которые он просто пересылает между участниками.
### Disclaimer
В Docker работает только на Linux (Ubuntu 22.04 и выше). На других платформах нужно запускать бинарный файл напрямую ./build/g365sfu, предварительно собрав его с помощью команды `make build`. Связано это с публикацией огромного диапазона портов для SFU и TURN сервера, что не поддерживается в Docker на других платформах. Сейчас Docker использует host-network, который поддерживает только ядра Linux, что позволяет обойти эту проблему.
### Gatway ### Gatway
Gateway - это компонент, который обеспечивает связь между Вашим сервером сигналинга (в Rosetta он написан на Java) и SFU сервером. Он принимает WebSocket соединения от сервера сигнализации и обрабатывает сообщения, связанные с управлением звонками, такими как создание комнаты, присоединение к комнате, отключение от комнаты и т.д. Gateway также отвечает за аутентификацию пользователей и управление их сессиями. В G365SFU используется встроенный шлюз, который можно настроить через переменные окружения. Он будет слушать на порту 1001 (по умолчанию) для входящих WebSocket соединений от сервера сигнализации. Gateway обеспечивает надежную связь между сервером сигнализации и SFU сервером, позволяя эффективно управлять звонками и участниками. Gateway - это компонент, который обеспечивает связь между Вашим сервером сигналинга (в Rosetta он написан на Java) и SFU сервером. Он принимает WebSocket соединения от сервера сигнализации и обрабатывает сообщения, связанные с управлением звонками, такими как создание комнаты, присоединение к комнате, отключение от комнаты и т.д. Gateway также отвечает за аутентификацию пользователей и управление их сессиями. В G365SFU используется встроенный шлюз, который можно настроить через переменные окружения. Он будет слушать на порту 1001 (по умолчанию) для входящих WebSocket соединений от сервера сигнализации. Gateway обеспечивает надежную связь между сервером сигнализации и SFU сервером, позволяя эффективно управлять звонками и участниками.
@@ -10,7 +13,7 @@ SFU - Selective Forwarding Unit - это тип медиасервера, кот
### TURN сервер ### TURN сервер
TURN - Traversal Using Relays around NAT - это протокол, который позволяет устройствам за NAT (Network Address Translation) или брандмауэром устанавливать связь с другими устройствами в интернете. TURN серверы используются для ретрансляции медиа-трафика между участниками видеоконференции, когда прямое соединение между ними невозможно из-за ограничений сети. В G365SFU используется встроенный TURN сервер, который можно включить с помощью переменной окружения `TURN_ALLOW=true`. Он будет слушать на порту 3478 и использовать диапазон портов от 40000 до 50000 для ретрансляции трафика. Параметры сервера, такие как публичный IP, имя пользователя и пароль, также настраиваются через переменные окружения. TURN сервер обеспечивает надежную связь между участниками звонка, даже если они находятся за NAT. TURN - Traversal Using Relays around NAT - это протокол, который позволяет устройствам за NAT (Network Address Translation) или брандмауэром устанавливать связь с другими устройствами в интернете. TURN серверы используются для ретрансляции медиа-трафика между участниками видеоконференции, когда прямое соединение между ними невозможно из-за ограничений сети. В G365SFU используется встроенный TURN сервер, который можно включить с помощью переменной окружения `TURN_ALLOW=true`. Он будет слушать на порту 3478 и использовать диапазон портов от 40000 до 50000 для ретрансляции трафика. Параметры сервера, такие как публичный IP, имя пользователя и пароль, также настраиваются через переменные окружения. TURN сервер обеспечивает надежную связь между участниками звонка, даже если они находятся за NAT.
# Установка # Установка (Ubuntu 22.04 и выше)
Для начала, нам необходимо открыть порты 30000-39999 для SFU и 40000-50000 для TURN сервера (по умолчанию, если перенастраивается .env то нужно указать другие). Это можно сделать с помощью команды `ufw`: Для начала, нам необходимо открыть порты 30000-39999 для SFU и 40000-50000 для TURN сервера (по умолчанию, если перенастраивается .env то нужно указать другие). Это можно сделать с помощью команды `ufw`:
```bash ```bash
sudo ufw allow 30000:39999/udp sudo ufw allow 30000:39999/udp
@@ -19,3 +22,8 @@ sudo ufw allow 30000:39999/tcp
sudo ufw allow 40000:50000/tcp sudo ufw allow 40000:50000/tcp
sudo ufw allow 3478/tcp sudo ufw allow 3478/tcp
``` ```
Запускаем докер-контейнер с G365SFU, указав необходимые переменные окружения в файле `build/.env` (пример можно взять из корневой папки проекта .env). Для этого нужно выполнить следующую команду в терминале, находясь в папке build:
```bash
docker compose up -d
```

18
build/Dockerfile Normal file
View File

@@ -0,0 +1,18 @@
FROM alpine:3.20
RUN apk add --no-cache ca-certificates tzdata
WORKDIR /app
# Бинарь должен быть собран заранее в ./build/g365sfu
COPY g365sfu /app/g365sfu
COPY .env /app/.env
RUN chmod +x /app/g365sfu
EXPOSE 1001
EXPOSE 3478/tcp
EXPOSE 3478/udp
ENTRYPOINT ["/app/g365sfu"]

13
build/docker-compose.yml Normal file
View File

@@ -0,0 +1,13 @@
services:
g365sfu:
container_name: g365sfu
build:
context: .
dockerfile: Dockerfile
image: g365sfu:latest
restart: unless-stopped
env_file:
- .env
#Поддерживается только в Linux (на Windows и MacOS нужно использовать bridge сеть и проброс портов, но нельзя пробрасывать большие
#диапазоны портов, это может вызывать проблемы с работой Docker)
network_mode: host

View File

@@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/pion/rtcp" "github.com/pion/rtcp"
"github.com/pion/rtp"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@@ -20,7 +21,13 @@ var (
renegPending sync.Map // key -> bool renegPending sync.Map // key -> bool
) )
const disconnectGracePeriod = 12 * time.Second const (
disconnectGracePeriod = 30 * time.Second
outboundPacketBuffer = 256
maxConsecutiveReadErrs = 25
maxConsecutiveWriteErrs = 50
packetDropLogEvery = 100
)
func peerKey(roomID, peerID string) string { func peerKey(roomID, peerID string) string {
return roomID + "|" + peerID return roomID + "|" + peerID
@@ -64,6 +71,7 @@ func removeRoomTrack(roomID, trackID string) {
if !ok { if !ok {
return return
} }
room.mu.Lock() room.mu.Lock()
defer room.mu.Unlock() defer room.mu.Unlock()
@@ -79,8 +87,7 @@ func removeRoomTrack(roomID, trackID string) {
func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool { func isPeerConnectionAlive(pc *webrtc.PeerConnection) bool {
state := pc.ConnectionState() state := pc.ConnectionState()
return state != webrtc.PeerConnectionStateClosed && return state != webrtc.PeerConnectionStateClosed &&
state != webrtc.PeerConnectionStateFailed && state != webrtc.PeerConnectionStateFailed
state != webrtc.PeerConnectionStateDisconnected
} }
// Вешается на каждый PeerConnection при JoinWithOffer. // Вешается на каждый PeerConnection при JoinWithOffer.
@@ -103,6 +110,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
cancelTimer := func() { cancelTimer := func() {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if timer != nil { if timer != nil {
timer.Stop() timer.Stop()
timer = nil timer = nil
@@ -123,9 +131,11 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
startTimer := func() { startTimer := func() {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if disconnecting { if disconnecting {
return return
} }
disconnecting = true disconnecting = true
timer = time.AfterFunc(disconnectGracePeriod, func() { timer = time.AfterFunc(disconnectGracePeriod, func() {
state := pc.ConnectionState() state := pc.ConnectionState()
@@ -166,8 +176,7 @@ func BindPeerLifecycle(roomID, peerID string, pc *webrtc.PeerConnection) {
}) })
} }
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты // Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты.
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) { func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrackID := fmt.Sprintf("%s:%s:%s:%d", localTrackID := fmt.Sprintf("%s:%s:%s:%d",
@@ -232,33 +241,100 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *
subPC := sub.PeerConnection subPC := sub.PeerConnection
go func() { go func() {
logger.LogInfoMessage("SetupForwardingForPeer: starting renegotiation for peer=" + subID)
if err := renegotiatePeer(roomID, subID, subPC); err != nil { if err := renegotiatePeer(roomID, subID, subPC); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error()) logger.LogWarnMessage("SetupForwardingForPeer: renegotiatePeer error: " + subID + " " + err.Error())
} }
}() }()
} }
// Для video просим keyframe
if remote.Kind() == webrtc.RTPCodecTypeVideo { if remote.Kind() == webrtc.RTPCodecTypeVideo {
_ = publisherPC.WriteRTCP([]rtcp.Packet{ _ = publisherPC.WriteRTCP([]rtcp.Packet{
&rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())}, &rtcp.PictureLossIndication{MediaSSRC: uint32(remote.SSRC())},
}) })
} }
// Publisher RTP -> localTrack -> subscribers // Отделяем чтение входящего RTP от записи в TrackLocal bounded-очередью.
// Если downstream начинает тормозить, пакеты дропаются из очереди,
// а не накапливают бесконечную задержку.
packetQueue := make(chan *rtp.Packet, outboundPacketBuffer)
writerDone := make(chan struct{})
defer close(packetQueue)
go func() {
defer close(writerDone)
consecutiveWriteErrs := 0
for pkt := range packetQueue {
if err := localTrack.WriteRTP(pkt); err != nil {
consecutiveWriteErrs++
if consecutiveWriteErrs == 1 || consecutiveWriteErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: WriteRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveWriteErrs, err,
))
}
if consecutiveWriteErrs >= maxConsecutiveWriteErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many WriteRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return
}
continue
}
consecutiveWriteErrs = 0
}
}()
consecutiveReadErrs := 0
droppedPackets := 0
for { for {
pkt, _, err := remote.ReadRTP() pkt, _, err := remote.ReadRTP()
if err != nil { if err != nil {
if err == io.EOF { if err == io.EOF {
return return
} }
logger.LogWarnMessage("SetupForwardingForPeer: ReadRTP error: " + err.Error())
consecutiveReadErrs++
if consecutiveReadErrs == 1 || consecutiveReadErrs%10 == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: ReadRTP error publisher=%s track=%s count=%d err=%v",
publisherPeerID, localTrack.ID(), consecutiveReadErrs, err,
))
}
if consecutiveReadErrs >= maxConsecutiveReadErrs {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: too many ReadRTP errors publisher=%s track=%s",
publisherPeerID, localTrack.ID(),
))
return return
} }
if err = localTrack.WriteRTP(pkt); err != nil {
logger.LogWarnMessage("SetupForwardingForPeer: WriteRTP error: " + err.Error()) time.Sleep(10 * time.Millisecond)
continue
}
consecutiveReadErrs = 0
select {
case <-writerDone:
return return
default:
}
select {
case <-writerDone:
return
case packetQueue <- pkt:
default:
droppedPackets++
if droppedPackets == 1 || droppedPackets%packetDropLogEvery == 0 {
logger.LogWarnMessage(fmt.Sprintf(
"SetupForwardingForPeer: packet queue overflow publisher=%s track=%s dropped=%d",
publisherPeerID, localTrack.ID(), droppedPackets,
))
}
} }
} }
}) })
@@ -273,7 +349,6 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil return nil
} }
// Не начинаем новую negotiation поверх текущей.
if pc.SignalingState() != webrtc.SignalingStateStable { if pc.SignalingState() != webrtc.SignalingStateStable {
setRenegPending(roomID, peerID, true) setRenegPending(roomID, peerID, true)
return nil return nil
@@ -296,7 +371,7 @@ func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
return nil return nil
} }
// Добавляет ICE-кандидата к пиру // Добавляет ICE-кандидата к пиру.
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error { func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID) room, exists := GetRoom(roomID)
if !exists { if !exists {
@@ -319,7 +394,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
rd := pc.RemoteDescription() rd := pc.RemoteDescription()
if rd == nil { if rd == nil {
// offer/answer еще не применен — буферизуем
pendingMu.Lock() pendingMu.Lock()
k := peerKey(roomID, peerID) k := peerKey(roomID, peerID)
pendingCandidates[k] = append(pendingCandidates[k], candidate) pendingCandidates[k] = append(pendingCandidates[k], candidate)
@@ -327,7 +401,6 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return nil return nil
} }
// Отбрасываем stale candidate по ufrag
if candidate.UsernameFragment != nil { if candidate.UsernameFragment != nil {
current := extractICEUfrag(rd.SDP) current := extractICEUfrag(rd.SDP)
if current != "" && *candidate.UsernameFragment != current { if current != "" && *candidate.UsernameFragment != current {
@@ -342,7 +415,7 @@ func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidate
return err return err
} }
// Обрабатывает SDP answer от клиента при renegotiation // Обрабатывает SDP answer от клиента при renegotiation.
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error { func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
if answer.Type != webrtc.SDPTypeAnswer { if answer.Type != webrtc.SDPTypeAnswer {
return fmt.Errorf("invalid sdp type: %s", answer.Type.String()) return fmt.Errorf("invalid sdp type: %s", answer.Type.String())
@@ -371,7 +444,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
return err return err
} }
// После применения answer — применяем отложенные кандидаты.
k := peerKey(roomID, peerID) k := peerKey(roomID, peerID)
pendingMu.Lock() pendingMu.Lock()
queue := pendingCandidates[k] queue := pendingCandidates[k]
@@ -382,7 +454,6 @@ func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescr
_ = AddICECandidate(roomID, peerID, c) _ = AddICECandidate(roomID, peerID, c)
} }
// Если во время negotiation накопился новый запрос — запускаем еще цикл.
if popRenegPending(roomID, peerID) { if popRenegPending(roomID, peerID) {
_ = renegotiatePeer(roomID, peerID, pc) _ = renegotiatePeer(roomID, peerID, pc)
} }

View File

@@ -4,6 +4,7 @@ import (
"g365sfu/logger" "g365sfu/logger"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"sync" "sync"
"time"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
) )
@@ -34,6 +35,8 @@ type Room struct {
Tracks []RoomTrack Tracks []RoomTrack
mu sync.RWMutex mu sync.RWMutex
emptyTimer *time.Timer
} }
// Общие переменные // Общие переменные
@@ -42,6 +45,46 @@ var (
roomsMu sync.RWMutex roomsMu sync.RWMutex
) )
const emptyRoomTTL = 30 * time.Second
func scheduleEmptyRoomDeletion(room *Room) {
room.mu.Lock()
if len(room.Peers) > 0 {
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.mu.Unlock()
return
}
if room.emptyTimer != nil {
room.mu.Unlock()
return
}
roomID := room.RoomID
var timer *time.Timer
timer = time.AfterFunc(emptyRoomTTL, func() {
currentRoom, exists := GetRoom(roomID)
if !exists {
return
}
currentRoom.mu.Lock()
if currentRoom.emptyTimer != timer || len(currentRoom.Peers) > 0 {
currentRoom.mu.Unlock()
return
}
currentRoom.emptyTimer = nil
currentRoom.mu.Unlock()
if err := DeleteRoom(roomID); err != nil && err != ErrRoomNotFound {
logger.LogWarnMessage("scheduleEmptyRoomDeletion: failed to delete room " + roomID + ": " + err.Error())
}
})
room.emptyTimer = timer
room.mu.Unlock()
}
// CreateRoom создает комнату // CreateRoom создает комнату
func CreateRoom(server *connection.Connection, roomID string) (*Room, error) { func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
roomsMu.Lock() roomsMu.Lock()
@@ -53,6 +96,7 @@ func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
Peers: []Peer{}, Peers: []Peer{},
} }
rooms[roomID] = room rooms[roomID] = room
scheduleEmptyRoomDeletion(room)
return room, nil return room, nil
} }
@@ -110,6 +154,10 @@ func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription
// Добавляем peer в комнату и сразу снимаем snapshot существующих треков // Добавляем peer в комнату и сразу снимаем snapshot существующих треков
// в одном локе — чтобы не было race с OnTrack // в одном локе — чтобы не было race с OnTrack
room.mu.Lock() room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
room.Peers = append(room.Peers, Peer{ room.Peers = append(room.Peers, Peer{
PeerID: peerID, PeerID: peerID,
PeerConnection: peerConnection, PeerConnection: peerConnection,
@@ -164,6 +212,10 @@ func DeleteRoom(roomID string) error {
roomsMu.Unlock() roomsMu.Unlock()
room.mu.Lock() room.mu.Lock()
if room.emptyTimer != nil {
room.emptyTimer.Stop()
room.emptyTimer = nil
}
peers := make([]Peer, len(room.Peers)) peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers) copy(peers, room.Peers)
room.Peers = nil room.Peers = nil
@@ -227,9 +279,10 @@ func LeaveRoom(roomID string, peerID string) error {
} }
cleanupForwardingState(roomID, peerID) cleanupForwardingState(roomID, peerID)
// Комната пустая -> удаляем // Комната пустая -> планируем удаление через TTL
if shouldDrop { if shouldDrop {
return DeleteRoom(roomID) scheduleEmptyRoomDeletion(room)
return nil
} }
// renegotiation оставшимся peer после удаления треков/peer // renegotiation оставшимся peer после удаления треков/peer

View File

@@ -32,7 +32,15 @@ func getSecret() string {
// Обработчик WebSocket соединений // Обработчик WebSocket соединений
func HandleWebSocket(w http.ResponseWriter, r *http.Request) { func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
conn, _ := upgrader.Upgrade(w, r, nil) conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
logger.LogWarnMessage("failed to upgrade to websocket: " + err.Error())
return
}
if conn == nil {
logger.LogWarnMessage("failed to upgrade to websocket: connection is nil")
return
}
defer conn.Close() defer conn.Close()
// Канал для передачи байтов // Канал для передачи байтов