Compare commits
6 Commits
28b1b7c819
...
b81ae8a3c0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b81ae8a3c0 | ||
|
|
242456bb3c | ||
|
|
ebff80f5f1 | ||
|
|
8d5d5df8b9 | ||
|
|
b63867287b | ||
|
|
1c245bd453 |
2
main.go
2
main.go
@@ -2,6 +2,7 @@ package main
|
||||
|
||||
import (
|
||||
"g365sfu/logger"
|
||||
"g365sfu/sfu"
|
||||
"g365sfu/socket"
|
||||
"net/http"
|
||||
"os"
|
||||
@@ -11,6 +12,7 @@ import (
|
||||
|
||||
func main() {
|
||||
godotenv.Load()
|
||||
sfu.InitWebRTCEngines()
|
||||
if os.Getenv("SECRET") == "" {
|
||||
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
|
||||
return
|
||||
|
||||
129
sfu/forwarding.go
Normal file
129
sfu/forwarding.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package sfu
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||||
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||||
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||
remote.Codec().RTPCodecCapability,
|
||||
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
|
||||
remote.StreamID(),
|
||||
)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Добавляем этот track всем, кроме publisher
|
||||
roomsMu.RLock()
|
||||
room, ok := rooms[roomID]
|
||||
if !ok {
|
||||
roomsMu.RUnlock()
|
||||
return
|
||||
}
|
||||
peers := make([]Peer, len(room.Peers))
|
||||
copy(peers, room.Peers)
|
||||
roomsMu.RUnlock()
|
||||
|
||||
for _, sub := range peers {
|
||||
if sub.PeerID == publisherPeerID {
|
||||
continue
|
||||
}
|
||||
|
||||
sender, err := sub.PeerConnection.AddTrack(localTrack)
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
// RTCP drain обязателен
|
||||
go func() {
|
||||
buf := make([]byte, 1500)
|
||||
for {
|
||||
if _, _, e := sender.Read(buf); e != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// После AddTrack нужна renegotiation для подписчика
|
||||
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
|
||||
}
|
||||
|
||||
// Пересылаем RTP пакеты от издателя всем подписчикам
|
||||
for {
|
||||
pkt, _, err := remote.ReadRTP()
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
return
|
||||
}
|
||||
return
|
||||
}
|
||||
if err = localTrack.WriteRTP(pkt); err != nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||
offer, err := pc.CreateOffer(nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
gatherDone := webrtc.GatheringCompletePromise(pc)
|
||||
if err = pc.SetLocalDescription(offer); err != nil {
|
||||
return err
|
||||
}
|
||||
<-gatherDone
|
||||
|
||||
if OnServerOffer != nil && pc.LocalDescription() != nil {
|
||||
OnServerOffer(roomID, peerID, *pc.LocalDescription())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Добавляет ICE-кандидата к пиру
|
||||
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||||
room, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
return ErrRoomNotFound
|
||||
}
|
||||
|
||||
for _, peer := range room.Peers {
|
||||
if peer.PeerID == peerID {
|
||||
return peer.PeerConnection.AddICECandidate(candidate)
|
||||
}
|
||||
}
|
||||
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
|
||||
// Обрабатывает SDP ответ от клиента при renegotiation
|
||||
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||||
roomsMu.RLock()
|
||||
room, ok := rooms[roomID]
|
||||
if !ok {
|
||||
roomsMu.RUnlock()
|
||||
return ErrRoomNotFound
|
||||
}
|
||||
|
||||
var pc *webrtc.PeerConnection
|
||||
for _, p := range room.Peers {
|
||||
if p.PeerID == peerID {
|
||||
pc = p.PeerConnection
|
||||
break
|
||||
}
|
||||
}
|
||||
roomsMu.RUnlock()
|
||||
|
||||
if pc == nil {
|
||||
return ErrPeerNotFound
|
||||
}
|
||||
return pc.SetRemoteDescription(answer)
|
||||
}
|
||||
129
sfu/rooms.go
Normal file
129
sfu/rooms.go
Normal file
@@ -0,0 +1,129 @@
|
||||
package sfu
|
||||
|
||||
import (
|
||||
"g365sfu/socket"
|
||||
"g365sfu/utils"
|
||||
"sync"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// Общие переменные
|
||||
var (
|
||||
rooms = make(map[string]*Room)
|
||||
roomsMu sync.RWMutex
|
||||
)
|
||||
|
||||
type Peer struct {
|
||||
//Идентификатор пира, который будет использоваться для связи с ним
|
||||
PeerID string
|
||||
//Подсоединенный пир
|
||||
PeerConnection *webrtc.PeerConnection
|
||||
}
|
||||
|
||||
type Room struct {
|
||||
//Уникальный идентификатор комнаты
|
||||
RoomID string
|
||||
//Сервер который создал комнату
|
||||
Server []*socket.Connection
|
||||
//Пиры которые подключились к комнате
|
||||
Peers []Peer
|
||||
}
|
||||
|
||||
// CreateRoom создает комнату
|
||||
func CreateRoom() (*Room, error) {
|
||||
roomID := "room_" + utils.RandomString(64)
|
||||
roomsMu.Lock()
|
||||
defer roomsMu.Unlock()
|
||||
|
||||
room := &Room{
|
||||
RoomID: roomID,
|
||||
Server: []*socket.Connection{},
|
||||
Peers: []Peer{},
|
||||
}
|
||||
rooms[roomID] = room
|
||||
|
||||
return room, nil
|
||||
}
|
||||
|
||||
// GetRoom получает комнату по идентификатору
|
||||
func GetRoom(roomID string) (*Room, bool) {
|
||||
roomsMu.RLock()
|
||||
defer roomsMu.RUnlock()
|
||||
room, exists := rooms[roomID]
|
||||
return room, exists
|
||||
}
|
||||
|
||||
// DeleteRoom удаляет комнату по идентификатору
|
||||
func DeleteRoom(roomID string) {
|
||||
roomsMu.Lock()
|
||||
defer roomsMu.Unlock()
|
||||
delete(rooms, roomID)
|
||||
}
|
||||
|
||||
// JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера
|
||||
func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
||||
room, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
return nil, ErrRoomNotFound
|
||||
}
|
||||
|
||||
peerConnection, err := api.NewPeerConnection(pcConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их
|
||||
// пересылает клиенту для установления соединения
|
||||
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
|
||||
if c == nil {
|
||||
return // gathering finished
|
||||
}
|
||||
if OnLocalICECandidate != nil {
|
||||
OnLocalICECandidate(roomID, peerID, c.ToJSON())
|
||||
}
|
||||
})
|
||||
|
||||
err = peerConnection.SetRemoteDescription(offer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
answer, err := peerConnection.CreateAnswer(nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
err = peerConnection.SetLocalDescription(answer)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты
|
||||
SetupForwardingForPeer(roomID, peerID, peerConnection)
|
||||
|
||||
room.Peers = append(room.Peers, Peer{
|
||||
PeerID: peerID,
|
||||
PeerConnection: peerConnection,
|
||||
})
|
||||
|
||||
return peerConnection.LocalDescription(), nil
|
||||
}
|
||||
|
||||
// LeaveRoom позволяет пиру покинуть комнату
|
||||
func LeaveRoom(roomID string, peerID string) error {
|
||||
room, exists := GetRoom(roomID)
|
||||
if !exists {
|
||||
return ErrRoomNotFound
|
||||
}
|
||||
|
||||
for i, peer := range room.Peers {
|
||||
if peer.PeerID == peerID {
|
||||
peer.PeerConnection.Close()
|
||||
room.Peers = append(room.Peers[:i], room.Peers[i+1:]...)
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
67
sfu/sfu.go
Normal file
67
sfu/sfu.go
Normal file
@@ -0,0 +1,67 @@
|
||||
package sfu
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"g365sfu/logger"
|
||||
"os"
|
||||
|
||||
"github.com/pion/interceptor"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
var (
|
||||
api *webrtc.API
|
||||
pcConfig webrtc.Configuration
|
||||
)
|
||||
|
||||
// Коллбек для обработки новых SDP офферов от сервера для конкретного пира (при renegotiation)
|
||||
var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescription)
|
||||
|
||||
// Коллбек для обработки новых ICE кандидатов
|
||||
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
|
||||
|
||||
// Ошибки
|
||||
var (
|
||||
ErrRoomNotFound = errors.New("room not found")
|
||||
ErrPeerNotFound = errors.New("peer not found")
|
||||
)
|
||||
|
||||
func InitWebRTCEngines() {
|
||||
publicIP := os.Getenv("PUBLIC_IP")
|
||||
fromPort := os.Getenv("PORT_RANGE_FROM")
|
||||
toPort := os.Getenv("PORT_RANGE_TO")
|
||||
if publicIP == "" || fromPort == "" || toPort == "" {
|
||||
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
|
||||
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set")
|
||||
os.Exit(-1)
|
||||
return
|
||||
}
|
||||
m := &webrtc.MediaEngine{}
|
||||
_ = m.RegisterDefaultCodecs()
|
||||
|
||||
i := &interceptor.Registry{}
|
||||
_ = webrtc.RegisterDefaultInterceptors(m, i)
|
||||
|
||||
se := webrtc.SettingEngine{}
|
||||
_ = se.SetEphemeralUDPPortRange(40000, 50000)
|
||||
|
||||
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" {
|
||||
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
|
||||
External: []string{publicIP},
|
||||
AsCandidateType: webrtc.ICECandidateTypeHost,
|
||||
Mode: webrtc.ICEAddressRewriteReplace,
|
||||
})
|
||||
}
|
||||
|
||||
api = webrtc.NewAPI(
|
||||
webrtc.WithMediaEngine(m),
|
||||
webrtc.WithInterceptorRegistry(i),
|
||||
webrtc.WithSettingEngine(se),
|
||||
)
|
||||
|
||||
pcConfig = webrtc.Configuration{
|
||||
ICEServers: []webrtc.ICEServer{
|
||||
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
||||
},
|
||||
}
|
||||
}
|
||||
@@ -2,7 +2,7 @@ package socket
|
||||
|
||||
import (
|
||||
"g365sfu/logger"
|
||||
"math/rand"
|
||||
"g365sfu/utils"
|
||||
"net/http"
|
||||
"os"
|
||||
|
||||
@@ -18,6 +18,7 @@ var upgrader = websocket.Upgrader{
|
||||
},
|
||||
}
|
||||
|
||||
// Получение секретного ключа из переменных окружения
|
||||
func getSecret() string {
|
||||
return os.Getenv("SECRET")
|
||||
}
|
||||
@@ -54,17 +55,7 @@ func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
// Генерация случайного идентификатора для сокета
|
||||
func randomSocketIdentifier() string {
|
||||
// Генерация случайного идентификатора для сокета
|
||||
return "sock_" + randomString(10)
|
||||
}
|
||||
|
||||
// Генерация случайной строки заданной длины
|
||||
func randomString(n int) string {
|
||||
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
b := make([]byte, n)
|
||||
for i := range b {
|
||||
b[i] = letters[rand.Intn(len(letters))]
|
||||
}
|
||||
return string(b)
|
||||
return "sock_" + utils.RandomString(10)
|
||||
}
|
||||
|
||||
func processData(data <-chan []byte, connection *Connection) {
|
||||
|
||||
13
utils/utils.go
Normal file
13
utils/utils.go
Normal file
@@ -0,0 +1,13 @@
|
||||
package utils
|
||||
|
||||
import "math/rand"
|
||||
|
||||
// Генерация случайной строки заданной длины
|
||||
func RandomString(n int) string {
|
||||
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||
b := make([]byte, n)
|
||||
for i := range b {
|
||||
b[i] = letters[rand.Intn(len(letters))]
|
||||
}
|
||||
return string(b)
|
||||
}
|
||||
Reference in New Issue
Block a user