Compare commits
6 Commits
28b1b7c819
...
b81ae8a3c0
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b81ae8a3c0 | ||
|
|
242456bb3c | ||
|
|
ebff80f5f1 | ||
|
|
8d5d5df8b9 | ||
|
|
b63867287b | ||
|
|
1c245bd453 |
2
main.go
2
main.go
@@ -2,6 +2,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"g365sfu/logger"
|
"g365sfu/logger"
|
||||||
|
"g365sfu/sfu"
|
||||||
"g365sfu/socket"
|
"g365sfu/socket"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
@@ -11,6 +12,7 @@ import (
|
|||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
godotenv.Load()
|
godotenv.Load()
|
||||||
|
sfu.InitWebRTCEngines()
|
||||||
if os.Getenv("SECRET") == "" {
|
if os.Getenv("SECRET") == "" {
|
||||||
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
|
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
|
||||||
return
|
return
|
||||||
|
|||||||
129
sfu/forwarding.go
Normal file
129
sfu/forwarding.go
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
package sfu
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
|
||||||
|
"github.com/pion/webrtc/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Вызывается при JoinWithOffer для ретрансляции RTP пакетов от издателя к другим участникам комнаты
|
||||||
|
func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC *webrtc.PeerConnection) {
|
||||||
|
publisherPC.OnTrack(func(remote *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
|
||||||
|
localTrack, err := webrtc.NewTrackLocalStaticRTP(
|
||||||
|
remote.Codec().RTPCodecCapability,
|
||||||
|
fmt.Sprintf("%s:%s", publisherPeerID, remote.ID()),
|
||||||
|
remote.StreamID(),
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Добавляем этот track всем, кроме publisher
|
||||||
|
roomsMu.RLock()
|
||||||
|
room, ok := rooms[roomID]
|
||||||
|
if !ok {
|
||||||
|
roomsMu.RUnlock()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
peers := make([]Peer, len(room.Peers))
|
||||||
|
copy(peers, room.Peers)
|
||||||
|
roomsMu.RUnlock()
|
||||||
|
|
||||||
|
for _, sub := range peers {
|
||||||
|
if sub.PeerID == publisherPeerID {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
sender, err := sub.PeerConnection.AddTrack(localTrack)
|
||||||
|
if err != nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// RTCP drain обязателен
|
||||||
|
go func() {
|
||||||
|
buf := make([]byte, 1500)
|
||||||
|
for {
|
||||||
|
if _, _, e := sender.Read(buf); e != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
// После AddTrack нужна renegotiation для подписчика
|
||||||
|
_ = renegotiatePeer(roomID, sub.PeerID, sub.PeerConnection)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Пересылаем RTP пакеты от издателя всем подписчикам
|
||||||
|
for {
|
||||||
|
pkt, _, err := remote.ReadRTP()
|
||||||
|
if err != nil {
|
||||||
|
if err == io.EOF {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if err = localTrack.WriteRTP(pkt); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error {
|
||||||
|
offer, err := pc.CreateOffer(nil)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
gatherDone := webrtc.GatheringCompletePromise(pc)
|
||||||
|
if err = pc.SetLocalDescription(offer); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
<-gatherDone
|
||||||
|
|
||||||
|
if OnServerOffer != nil && pc.LocalDescription() != nil {
|
||||||
|
OnServerOffer(roomID, peerID, *pc.LocalDescription())
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Добавляет ICE-кандидата к пиру
|
||||||
|
func AddICECandidate(roomID string, peerID string, candidate webrtc.ICECandidateInit) error {
|
||||||
|
room, exists := GetRoom(roomID)
|
||||||
|
if !exists {
|
||||||
|
return ErrRoomNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, peer := range room.Peers {
|
||||||
|
if peer.PeerID == peerID {
|
||||||
|
return peer.PeerConnection.AddICECandidate(candidate)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return ErrPeerNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
// Обрабатывает SDP ответ от клиента при renegotiation
|
||||||
|
func HandleClientAnswer(roomID string, peerID string, answer webrtc.SessionDescription) error {
|
||||||
|
roomsMu.RLock()
|
||||||
|
room, ok := rooms[roomID]
|
||||||
|
if !ok {
|
||||||
|
roomsMu.RUnlock()
|
||||||
|
return ErrRoomNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
var pc *webrtc.PeerConnection
|
||||||
|
for _, p := range room.Peers {
|
||||||
|
if p.PeerID == peerID {
|
||||||
|
pc = p.PeerConnection
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
roomsMu.RUnlock()
|
||||||
|
|
||||||
|
if pc == nil {
|
||||||
|
return ErrPeerNotFound
|
||||||
|
}
|
||||||
|
return pc.SetRemoteDescription(answer)
|
||||||
|
}
|
||||||
129
sfu/rooms.go
Normal file
129
sfu/rooms.go
Normal file
@@ -0,0 +1,129 @@
|
|||||||
|
package sfu
|
||||||
|
|
||||||
|
import (
|
||||||
|
"g365sfu/socket"
|
||||||
|
"g365sfu/utils"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/pion/webrtc/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Общие переменные
|
||||||
|
var (
|
||||||
|
rooms = make(map[string]*Room)
|
||||||
|
roomsMu sync.RWMutex
|
||||||
|
)
|
||||||
|
|
||||||
|
type Peer struct {
|
||||||
|
//Идентификатор пира, который будет использоваться для связи с ним
|
||||||
|
PeerID string
|
||||||
|
//Подсоединенный пир
|
||||||
|
PeerConnection *webrtc.PeerConnection
|
||||||
|
}
|
||||||
|
|
||||||
|
type Room struct {
|
||||||
|
//Уникальный идентификатор комнаты
|
||||||
|
RoomID string
|
||||||
|
//Сервер который создал комнату
|
||||||
|
Server []*socket.Connection
|
||||||
|
//Пиры которые подключились к комнате
|
||||||
|
Peers []Peer
|
||||||
|
}
|
||||||
|
|
||||||
|
// CreateRoom создает комнату
|
||||||
|
func CreateRoom() (*Room, error) {
|
||||||
|
roomID := "room_" + utils.RandomString(64)
|
||||||
|
roomsMu.Lock()
|
||||||
|
defer roomsMu.Unlock()
|
||||||
|
|
||||||
|
room := &Room{
|
||||||
|
RoomID: roomID,
|
||||||
|
Server: []*socket.Connection{},
|
||||||
|
Peers: []Peer{},
|
||||||
|
}
|
||||||
|
rooms[roomID] = room
|
||||||
|
|
||||||
|
return room, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetRoom получает комнату по идентификатору
|
||||||
|
func GetRoom(roomID string) (*Room, bool) {
|
||||||
|
roomsMu.RLock()
|
||||||
|
defer roomsMu.RUnlock()
|
||||||
|
room, exists := rooms[roomID]
|
||||||
|
return room, exists
|
||||||
|
}
|
||||||
|
|
||||||
|
// DeleteRoom удаляет комнату по идентификатору
|
||||||
|
func DeleteRoom(roomID string) {
|
||||||
|
roomsMu.Lock()
|
||||||
|
defer roomsMu.Unlock()
|
||||||
|
delete(rooms, roomID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// JoinWithOffer позволяет пиру присоединиться к комнате с помощью SDP оффера
|
||||||
|
func JoinWithOffer(roomID string, peerID string, offer webrtc.SessionDescription) (*webrtc.SessionDescription, error) {
|
||||||
|
room, exists := GetRoom(roomID)
|
||||||
|
if !exists {
|
||||||
|
return nil, ErrRoomNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
peerConnection, err := api.NewPeerConnection(pcConfig)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// SFU локальные ICE-кандидаты отправляем сначала бекенду затем тот их
|
||||||
|
// пересылает клиенту для установления соединения
|
||||||
|
peerConnection.OnICECandidate(func(c *webrtc.ICECandidate) {
|
||||||
|
if c == nil {
|
||||||
|
return // gathering finished
|
||||||
|
}
|
||||||
|
if OnLocalICECandidate != nil {
|
||||||
|
OnLocalICECandidate(roomID, peerID, c.ToJSON())
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
err = peerConnection.SetRemoteDescription(offer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
answer, err := peerConnection.CreateAnswer(nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = peerConnection.SetLocalDescription(answer)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Настраиваем пересылку RTP пакетов от издателя к другим участникам комнаты
|
||||||
|
SetupForwardingForPeer(roomID, peerID, peerConnection)
|
||||||
|
|
||||||
|
room.Peers = append(room.Peers, Peer{
|
||||||
|
PeerID: peerID,
|
||||||
|
PeerConnection: peerConnection,
|
||||||
|
})
|
||||||
|
|
||||||
|
return peerConnection.LocalDescription(), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// LeaveRoom позволяет пиру покинуть комнату
|
||||||
|
func LeaveRoom(roomID string, peerID string) error {
|
||||||
|
room, exists := GetRoom(roomID)
|
||||||
|
if !exists {
|
||||||
|
return ErrRoomNotFound
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, peer := range room.Peers {
|
||||||
|
if peer.PeerID == peerID {
|
||||||
|
peer.PeerConnection.Close()
|
||||||
|
room.Peers = append(room.Peers[:i], room.Peers[i+1:]...)
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
67
sfu/sfu.go
Normal file
67
sfu/sfu.go
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
package sfu
|
||||||
|
|
||||||
|
import (
|
||||||
|
"errors"
|
||||||
|
"g365sfu/logger"
|
||||||
|
"os"
|
||||||
|
|
||||||
|
"github.com/pion/interceptor"
|
||||||
|
"github.com/pion/webrtc/v4"
|
||||||
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
api *webrtc.API
|
||||||
|
pcConfig webrtc.Configuration
|
||||||
|
)
|
||||||
|
|
||||||
|
// Коллбек для обработки новых SDP офферов от сервера для конкретного пира (при renegotiation)
|
||||||
|
var OnServerOffer func(roomID string, peerID string, offer webrtc.SessionDescription)
|
||||||
|
|
||||||
|
// Коллбек для обработки новых ICE кандидатов
|
||||||
|
var OnLocalICECandidate func(roomID, peerID string, candidate webrtc.ICECandidateInit)
|
||||||
|
|
||||||
|
// Ошибки
|
||||||
|
var (
|
||||||
|
ErrRoomNotFound = errors.New("room not found")
|
||||||
|
ErrPeerNotFound = errors.New("peer not found")
|
||||||
|
)
|
||||||
|
|
||||||
|
func InitWebRTCEngines() {
|
||||||
|
publicIP := os.Getenv("PUBLIC_IP")
|
||||||
|
fromPort := os.Getenv("PORT_RANGE_FROM")
|
||||||
|
toPort := os.Getenv("PORT_RANGE_TO")
|
||||||
|
if publicIP == "" || fromPort == "" || toPort == "" {
|
||||||
|
// Если не указаны необходимые переменные окружения, логируем ошибку и завершаем процесс сервера
|
||||||
|
logger.LogErrorMessage("PUBLIC_IP, PORT_RANGE_FROM and PORT_RANGE_TO environment variables must be set")
|
||||||
|
os.Exit(-1)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m := &webrtc.MediaEngine{}
|
||||||
|
_ = m.RegisterDefaultCodecs()
|
||||||
|
|
||||||
|
i := &interceptor.Registry{}
|
||||||
|
_ = webrtc.RegisterDefaultInterceptors(m, i)
|
||||||
|
|
||||||
|
se := webrtc.SettingEngine{}
|
||||||
|
_ = se.SetEphemeralUDPPortRange(40000, 50000)
|
||||||
|
|
||||||
|
if publicIP := os.Getenv("PUBLIC_IP"); publicIP != "" {
|
||||||
|
se.SetICEAddressRewriteRules(webrtc.ICEAddressRewriteRule{
|
||||||
|
External: []string{publicIP},
|
||||||
|
AsCandidateType: webrtc.ICECandidateTypeHost,
|
||||||
|
Mode: webrtc.ICEAddressRewriteReplace,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
api = webrtc.NewAPI(
|
||||||
|
webrtc.WithMediaEngine(m),
|
||||||
|
webrtc.WithInterceptorRegistry(i),
|
||||||
|
webrtc.WithSettingEngine(se),
|
||||||
|
)
|
||||||
|
|
||||||
|
pcConfig = webrtc.Configuration{
|
||||||
|
ICEServers: []webrtc.ICEServer{
|
||||||
|
{URLs: []string{"stun:stun.l.google.com:19302"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -2,7 +2,7 @@ package socket
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"g365sfu/logger"
|
"g365sfu/logger"
|
||||||
"math/rand"
|
"g365sfu/utils"
|
||||||
"net/http"
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
|
||||||
@@ -18,6 +18,7 @@ var upgrader = websocket.Upgrader{
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Получение секретного ключа из переменных окружения
|
||||||
func getSecret() string {
|
func getSecret() string {
|
||||||
return os.Getenv("SECRET")
|
return os.Getenv("SECRET")
|
||||||
}
|
}
|
||||||
@@ -54,17 +55,7 @@ func HandleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
// Генерация случайного идентификатора для сокета
|
// Генерация случайного идентификатора для сокета
|
||||||
func randomSocketIdentifier() string {
|
func randomSocketIdentifier() string {
|
||||||
// Генерация случайного идентификатора для сокета
|
// Генерация случайного идентификатора для сокета
|
||||||
return "sock_" + randomString(10)
|
return "sock_" + utils.RandomString(10)
|
||||||
}
|
|
||||||
|
|
||||||
// Генерация случайной строки заданной длины
|
|
||||||
func randomString(n int) string {
|
|
||||||
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
|
||||||
b := make([]byte, n)
|
|
||||||
for i := range b {
|
|
||||||
b[i] = letters[rand.Intn(len(letters))]
|
|
||||||
}
|
|
||||||
return string(b)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func processData(data <-chan []byte, connection *Connection) {
|
func processData(data <-chan []byte, connection *Connection) {
|
||||||
|
|||||||
13
utils/utils.go
Normal file
13
utils/utils.go
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
package utils
|
||||||
|
|
||||||
|
import "math/rand"
|
||||||
|
|
||||||
|
// Генерация случайной строки заданной длины
|
||||||
|
func RandomString(n int) string {
|
||||||
|
const letters = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
|
||||||
|
b := make([]byte, n)
|
||||||
|
for i := range b {
|
||||||
|
b[i] = letters[rand.Intn(len(letters))]
|
||||||
|
}
|
||||||
|
return string(b)
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user