Compare commits

..

6 Commits

6 changed files with 343 additions and 12 deletions

View File

@@ -2,6 +2,7 @@ package main
import (
"g365sfu/logger"
"g365sfu/sfu"
"g365sfu/socket"
"net/http"
"os"
@@ -11,6 +12,7 @@ import (
func main() {
godotenv.Load()
sfu.InitWebRTCEngines()
if os.Getenv("SECRET") == "" {
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
return

129
sfu/forwarding.go Normal file
View File

@@ -0,0 +1,129 @@
package sfu
import (
"fmt"
"io"
"github.com/pion/webrtc/v4"
)
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
localTrack, err := webrtc.NewTrackLocalStaticRTP(
remote.Codec().RTPCodecCapability,
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
remote.StreamID(),
)
if err != nil {
return
}
// Добавляем этот track всем, кроме publisher
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return
}
peers := make([]Peer, len(room.Peers))
copy(peers, room.Peers)
roomsMu.RUnlock()
for _, sub := range peers {
if sub.PeerID == publisherPeerID {
continue
}
sender, err := sub.PeerConnection.AddTrack(localTrack)
if err != nil {
continue
}
// RTCP drain обязателен
go func() {
buf := make([]byte, 1500)
for {
if _, _, e := sender.Read(buf); e != nil {
return
}
}
}()
// После AddTrack нужна renegotiation для подписчика
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
}
// Пересылаем RTP пакеты от издателя всем подписчикам
for {
pkt, _, err := remote.ReadRTP()
if err != nil {
if err == io.EOF {
return
}
return
}
if err = localTrack.WriteRTP(pkt); err != nil {
return
}
}
})
}
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
offer, err := pc.CreateOffer(nil)
if err != nil {
return err
}
gatherDone := webrtc.GatheringCompletePromise(pc)
if err = pc.SetLocalDescription(offer); err != nil {
return err
}
<-gatherDone
if OnServerOffer != nil && pc.LocalDescription() != nil {
OnServerOffer(roomID, peerID, *pc.LocalDescription())
}
return nil
}
// Добавляет ICE-кандидата к пиру
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
for _, peer := range room.Peers {
if peer.PeerID == peerID {
return peer.PeerConnection.AddICECandidate(candidate)
}
}
return ErrPeerNotFound
}
// Обрабатывает SDP ответ от клиента при renegotiation
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
roomsMu.RLock()
room, ok := rooms[roomID]
if !ok {
roomsMu.RUnlock()
return ErrRoomNotFound
}
var pc *webrtc.PeerConnection
for _, p := range room.Peers {
if p.PeerID == peerID {
pc = p.PeerConnection
break
}
}
roomsMu.RUnlock()
if pc == nil {
return ErrPeerNotFound
}
return pc.SetRemoteDescription(answer)
}

129
sfu/rooms.go Normal file
View File

@@ -0,0 +1,129 @@
package sfu
import (
"g365sfu/socket"
"g365sfu/utils"
"sync"
"github.com/pion/webrtc/v4"
)
// Общие переменные
var (
rooms = make(map[string]*Room)
roomsMu sync.RWMutex
)
type Peer struct {
//Идентификатор пира, который будет использоваться для связи с ним
PeerID string
//Подсоединенный пир
PeerConnection *webrtc.PeerConnection
}
type Room struct {
//Уникальный идентификатор комнаты
RoomID string
//Сервер который создал комнату
Server []*socket.Connection
//Пиры которые подключились к комнате
Peers []Peer
}
// CreateRoom создает комнату
func CreateRoom() (*Room, error) {
roomID := "room_" + utils.RandomString(64)
roomsMu.Lock()
defer roomsMu.Unlock()
room := &Room{
RoomID: roomID,
Server: []*socket.Connection{},
Peers: []Peer{},
}
rooms[roomID] = room
return room, nil
}
// GetRoom получает комнату по идентификатору
func GetRoom(roomID string) (*Room, bool) {
roomsMu.RLock()
defer roomsMu.RUnlock()
room, exists := rooms[roomID]
return room, exists
}
// DeleteRoom удаляет комнату по идентификатору
func DeleteRoom(roomID string) {
roomsMu.Lock()
defer roomsMu.Unlock()
delete(rooms, roomID)
}
// JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера
func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
room, exists := GetRoom(roomID)
if !exists {
return nil, ErrRoomNotFound
}
peerConnection, err := api.NewPeerConnection(pcConfig)
if err != nil {
return nil, err
}
// SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их
// пересылает клиенту для установления соединения
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return // gathering finished
}
if OnLocalICECandidate != nil {
OnLocalICECandidate(roomID, peerID, c.ToJSON())
}
})
err = peerConnection.SetRemoteDescription(offer)
if err != nil {
return nil, err
}
answer, err := peerConnection.CreateAnswer(nil)
if err != nil {
return nil, err
}
err = peerConnection.SetLocalDescription(answer)
if err != nil {
return nil, err
}
// Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты
SetupForwardingForPeer(roomID, peerID, peerConnection)
room.Peers = append(room.Peers, Peer{
PeerID: peerID,
PeerConnection: peerConnection,
})
return peerConnection.LocalDescription(), nil
}
// LeaveRoom позволяет пиру покинуть комнату
func LeaveRoom(roomID string, peerID string) error {
room, exists := GetRoom(roomID)
if !exists {
return ErrRoomNotFound
}
for i, peer := range room.Peers {
if peer.PeerID == peerID {
peer.PeerConnection.Close()
room.Peers = append(room.Peers[:i], room.Peers[i+1:]...)
break
}
}
return nil
}

67
sfu/sfu.go Normal file
View File

@@ -0,0 +1,67 @@
package sfu
import (
"errors"
"g365sfu/logger"
"os"
"github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
)
var (
api *webrtc.API
pcConfig webrtc.Configuration
)
// Коллбек для обработки новых SDP офферов от сервера для конкретного пира (при renegotiation)
var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescription)
// Коллбек для обработки новых ICE кандидатов
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
// Ошибки
var (
ErrRoomNotFound = errors.New("room not found")
ErrPeerNotFound = errors.New("peer not found")
)
func InitWebRTCEngines() {
publicIP := os.Getenv("PUBLIC_IP")
fromPort := os.Getenv("PORT_RANGE_FROM")
toPort := os.Getenv("PORT_RANGE_TO")
if publicIP == "" || fromPort == "" || toPort == "" {
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set")
os.Exit(-1)
return
}
m := &webrtc.MediaEngine{}
_ = m.RegisterDefaultCodecs()
i := &interceptor.Registry{}
_ = webrtc.RegisterDefaultInterceptors(m, i)
se := webrtc.SettingEngine{}
_ = se.SetEphemeralUDPPortRange(40000, 50000)
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" {
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
External: []string{publicIP},
AsCandidateType: webrtc.ICECandidateTypeHost,
Mode: webrtc.ICEAddressRewriteReplace,
})
}
api = webrtc.NewAPI(
webrtc.WithMediaEngine(m),
webrtc.WithInterceptorRegistry(i),
webrtc.WithSettingEngine(se),
)
pcConfig = webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{URLs: []string{"stun:stun.l.google.com:19302"}},
},
}
}

View File

@@ -2,7 +2,7 @@ package socket
import (
"g365sfu/logger"
"math/rand"
"g365sfu/utils"
"net/http"
"os"
@@ -18,6 +18,7 @@ var upgrader = websocket.Upgrader{
},
}
// Получение секретного ключа из переменных окружения
func getSecret() string {
return os.Getenv("SECRET")
}
@@ -54,17 +55,7 @@ func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
// Генерация случайного идентификатора для сокета
func randomSocketIdentifier() string {
// Генерация случайного идентификатора для сокета
return "sock_" + randomString(10)
}
// Генерация случайной строки заданной длины
func randomString(n int) string {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
return "sock_" + utils.RandomString(10)
}
func processData(data <-chan []byte, connection *Connection) {

13
utils/utils.go Normal file
View File

@@ -0,0 +1,13 @@
package utils
import "math/rand"
// Генерация случайной строки заданной длины
func RandomString(n int) string {
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
b := make([]byte, n)
for i := range b {
b[i] = letters[rand.Intn(len(letters))]
}
return string(b)
}