Compare commits

..

3 Commits

6 changed files with 310 additions and 49 deletions

1
.gitignore vendored
View File

@@ -1 +1,2 @@
.env .env
.vscode

37
boot/boot.go Normal file
View File

@@ -0,0 +1,37 @@
package boot
import (
"g365sfu/logger"
"g365sfu/sfu"
"g365sfu/socket"
"net/http"
"os"
"github.com/joho/godotenv"
"github.com/pion/webrtc/v4"
)
func Bootstrap() {
godotenv.Load()
sfu.InitWebRTCEngines()
if os.Getenv("SECRET") == "" {
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
return
}
http.HandleFunc("/", socket.HandleWebSocket)
port := os.Getenv("PORT")
if port == "" {
port = "1001"
}
sfu.OnLocalICECandidate = OnLocalICECandidate
logger.LogInfoMessage("server started at x.x.x.x:" + port)
http.ListenAndServe(":"+port, nil)
}
func OnLocalICECandidate(roomID, peerID string, candidate webrtc.ICECandidateInit) {
logger.LogInfoMessage("new local ICE candidate for peer " + peerID + " in room " + roomID)
}
func OnServerOffer(roomID string, peerID string, offer webrtc.SessionDescription) {
logger.LogInfoMessage("new server offer for peer " + peerID + " in room " + roomID)
}

218
bytebuffer/bytebuffer.go Normal file
View File

@@ -0,0 +1,218 @@
package bytebuffer
import (
"encoding/binary"
"errors"
)
var (
ErrBufferUnderflow = errors.New("bytebuffer: buffer underflow")
ErrBufferOverflow = errors.New("bytebuffer: buffer overflow")
ErrInvalidPosition = errors.New("bytebuffer: invalid position")
ErrInvalidLimit = errors.New("bytebuffer: invalid limit")
)
type ByteBuffer struct {
buf []byte
pos int
limit int
order binary.ByteOrder
}
func Allocate(capacity int) *ByteBuffer {
if capacity < 0 {
capacity = 0
}
b := make([]byte, capacity)
return &ByteBuffer{
buf: b,
pos: 0,
limit: capacity,
order: binary.BigEndian,
}
}
func Wrap(data []byte) *ByteBuffer {
return &ByteBuffer{
buf: data,
pos: 0,
limit: len(data),
order: binary.BigEndian,
}
}
func (b *ByteBuffer) Order(order binary.ByteOrder) *ByteBuffer {
if order != nil {
b.order = order
}
return b
}
func (b *ByteBuffer) Capacity() int { return len(b.buf) }
func (b *ByteBuffer) Position() int { return b.pos }
func (b *ByteBuffer) Limit() int { return b.limit }
func (b *ByteBuffer) SetPosition(pos int) error {
if pos < 0 || pos > b.limit {
return ErrInvalidPosition
}
b.pos = pos
return nil
}
func (b *ByteBuffer) SetLimit(limit int) error {
if limit < 0 || limit > len(b.buf) {
return ErrInvalidLimit
}
b.limit = limit
if b.pos > b.limit {
b.pos = b.limit
}
return nil
}
func (b *ByteBuffer) Remaining() int {
return b.limit - b.pos
}
func (b *ByteBuffer) HasRemaining() bool {
return b.Remaining() > 0
}
// Clear: position=0, limit=capacity
func (b *ByteBuffer) Clear() *ByteBuffer {
b.pos = 0
b.limit = len(b.buf)
return b
}
// Flip: limit=position, position=0
func (b *ByteBuffer) Flip() *ByteBuffer {
b.limit = b.pos
b.pos = 0
return b
}
// Rewind: position=0
func (b *ByteBuffer) Rewind() *ByteBuffer {
b.pos = 0
return b
}
// Array возвращает весь underlying массив.
func (b *ByteBuffer) Array() []byte {
return b.buf
}
// Bytes возвращает readable срез [position:limit].
func (b *ByteBuffer) Bytes() []byte {
return b.buf[b.pos:b.limit]
}
func (b *ByteBuffer) ensureWrite(n int) error {
if b.pos+n > b.limit {
return ErrBufferOverflow
}
return nil
}
func (b *ByteBuffer) ensureRead(n int) error {
if b.pos+n > b.limit {
return ErrBufferUnderflow
}
return nil
}
func (b *ByteBuffer) Put(v byte) error {
if err := b.ensureWrite(1); err != nil {
return err
}
b.buf[b.pos] = v
b.pos++
return nil
}
func (b *ByteBuffer) PutBytes(p []byte) error {
if err := b.ensureWrite(len(p)); err != nil {
return err
}
copy(b.buf[b.pos:], p)
b.pos += len(p)
return nil
}
func (b *ByteBuffer) PutUint16(v uint16) error {
if err := b.ensureWrite(2); err != nil {
return err
}
b.order.PutUint16(b.buf[b.pos:b.pos+2], v)
b.pos += 2
return nil
}
func (b *ByteBuffer) PutUint32(v uint32) error {
if err := b.ensureWrite(4); err != nil {
return err
}
b.order.PutUint32(b.buf[b.pos:b.pos+4], v)
b.pos += 4
return nil
}
func (b *ByteBuffer) PutUint64(v uint64) error {
if err := b.ensureWrite(8); err != nil {
return err
}
b.order.PutUint64(b.buf[b.pos:b.pos+8], v)
b.pos += 8
return nil
}
func (b *ByteBuffer) Get() (byte, error) {
if err := b.ensureRead(1); err != nil {
return 0, err
}
v := b.buf[b.pos]
b.pos++
return v, nil
}
func (b *ByteBuffer) GetBytes(n int) ([]byte, error) {
if n < 0 {
return nil, ErrBufferUnderflow
}
if err := b.ensureRead(n); err != nil {
return nil, err
}
out := make([]byte, n)
copy(out, b.buf[b.pos:b.pos+n])
b.pos += n
return out, nil
}
func (b *ByteBuffer) GetUint16() (uint16, error) {
if err := b.ensureRead(2); err != nil {
return 0, err
}
v := b.order.Uint16(b.buf[b.pos : b.pos+2])
b.pos += 2
return v, nil
}
func (b *ByteBuffer) GetUint32() (uint32, error) {
if err := b.ensureRead(4); err != nil {
return 0, err
}
v := b.order.Uint32(b.buf[b.pos : b.pos+4])
b.pos += 4
return v, nil
}
func (b *ByteBuffer) GetUint64() (uint64, error) {
if err := b.ensureRead(8); err != nil {
return 0, err
}
v := b.order.Uint64(b.buf[b.pos : b.pos+8])
b.pos += 8
return v, nil
}

25
main.go
View File

@@ -1,27 +1,8 @@
package main package main
import ( import "g365sfu/boot"
"g365sfu/logger"
"g365sfu/sfu"
"g365sfu/socket"
"net/http"
"os"
"github.com/joho/godotenv"
)
func main() { func main() {
godotenv.Load() // Инициализация и запуск SFU сервера
sfu.InitWebRTCEngines() boot.Bootstrap()
if os.Getenv("SECRET") == "" {
logger.LogErrorMessage("server failed to start because not set secret key in .env variables")
return
}
http.HandleFunc("/", socket.HandleWebSocket)
port := os.Getenv("PORT")
if port == "" {
port = "1001"
}
logger.LogInfoMessage("server started at x.x.x.x:" + port)
http.ListenAndServe(":"+port, nil)
} }

View File

@@ -2,7 +2,6 @@ package sfu
import ( import (
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
"g365sfu/utils"
"sync" "sync"
"github.com/pion/webrtc/v4" "github.com/pion/webrtc/v4"
@@ -33,8 +32,7 @@ var (
) )
// CreateRoom создает комнату // CreateRoom создает комнату
func CreateRoom(server *connection.Connection) (*Room, error) { func CreateRoom(server *connection.Connection, roomID string) (*Room, error) {
roomID := "room_" + utils.RandomString(64)
roomsMu.Lock() roomsMu.Lock()
defer roomsMu.Unlock() defer roomsMu.Unlock()

View File

@@ -2,6 +2,7 @@ package socket
import ( import (
"encoding/json" "encoding/json"
"g365sfu/bytebuffer"
"g365sfu/logger" "g365sfu/logger"
"g365sfu/sfu" "g365sfu/sfu"
connection "g365sfu/socket/struct" connection "g365sfu/socket/struct"
@@ -65,15 +66,23 @@ func randomSocketIdentifier() string {
func processData(data <-chan []byte, connection *connection.Connection) { func processData(data <-chan []byte, connection *connection.Connection) {
for bytes := range data { for bytes := range data {
// Логика обработки байтов // Логика обработки байтов
if bytes[0] == 0x01 { buffer := bytebuffer.Wrap(bytes)
packetId, _ := buffer.Get()
if packetId == 0x01 {
// Это рукопожатие, дальше сравниваем секретные ключи // Это рукопожатие, дальше сравниваем секретные ключи
// Секретный ключ идет сразу после первого байта и до конца сообщения secretLength, _ := buffer.GetUint32()
receivedSecret := string(bytes[1:])
if receivedSecret == getSecret() { receivedSecret, _ := buffer.GetBytes((int(secretLength)))
if string(receivedSecret) == getSecret() {
logger.LogSuccessMessage("success handshake from " + connection.Socket.RemoteAddr().String()) logger.LogSuccessMessage("success handshake from " + connection.Socket.RemoteAddr().String())
AddSocket(connection) AddSocket(connection)
connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0x01}) // Подготовка ответа для клиента о успешном рукопожатии
return response := bytebuffer.Allocate(1)
response.Put(0x01)
response.Flip()
// Отправляем ответ клиенту
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes())
continue
} }
connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0xFF}) connection.Socket.WriteMessage(websocket.BinaryMessage, []byte{0xFF})
logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key") logger.LogWarnMessage("failed handshake from " + connection.Socket.RemoteAddr().String() + " because of invalid secret key")
@@ -89,34 +98,51 @@ func processData(data <-chan []byte, connection *connection.Connection) {
} }
// Создание комнаты // Создание комнаты
if bytes[0] == 0x02 { if packetId == 0x02 {
room, _ := sfu.CreateRoom(connection) roomLength, _ := buffer.GetUint32()
roomIDBytes, _ := buffer.GetBytes(int(roomLength))
roomID := string(roomIDBytes)
room, _ := sfu.CreateRoom(connection, roomID)
logger.LogSuccessMessage("room initializated " + room.RoomID) logger.LogSuccessMessage("room initializated " + room.RoomID)
bytes = append([]byte{0x02}, []byte(room.RoomID)...) // Подготовка ответа для клиента с ID комнаты
connection.Socket.WriteMessage(websocket.BinaryMessage, bytes) response := bytebuffer.Allocate(1 + 4 + len([]byte(room.RoomID)))
return response.Put(0x02)
response.PutUint32(uint32(len([]byte(room.RoomID))))
response.PutBytes([]byte(room.RoomID))
response.Flip()
// Отправляем ответ клиенту
connection.Socket.WriteMessage(websocket.BinaryMessage, response.Bytes())
continue
} }
//SDP OFFER для подключения к комнате //SDP OFFER для подключения к комнате
if bytes[0] == 0x03 { if packetId == 0x03 {
roomIdLen := int(bytes[1]) roomIdLen, _ := buffer.GetUint32()
roomID := string(bytes[2 : 2+roomIdLen]) roomIDBytes, _ := buffer.GetBytes(int(roomIdLen))
roomID := string(roomIDBytes)
_, exists := sfu.GetRoom(roomID) _, exists := sfu.GetRoom(roomID)
if !exists { if !exists {
logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID) logger.LogWarnMessage("peer " + connection.Socket.RemoteAddr().String() + " tried to join non existing room " + roomID)
return continue
} }
peerIdLen := int(bytes[2+roomIdLen]) peerIdLen, _ := buffer.GetUint32()
peerID := string(bytes[3+roomIdLen : 3+roomIdLen+peerIdLen]) peerIDBytes, _ := buffer.GetBytes(int(peerIdLen))
logger.LogSuccessMessage("peer " + connection.Socket.RemoteAddr().String() + " joined to room " + roomID) peerID := string(peerIDBytes)
logger.LogSuccessMessage("peer " + peerID + " joined to room " + roomID)
offerLength, _ := buffer.GetUint32()
offerBytes, _ := buffer.GetBytes(int(offerLength))
var offer webrtc.SessionDescription var offer webrtc.SessionDescription
err := json.Unmarshal(bytes[3+roomIdLen+peerIdLen:], &offer) err := json.Unmarshal(offerBytes, &offer)
if err != nil { if err != nil {
logger.LogWarnMessage("failed to unmarshal offer from peer " + connection.Socket.RemoteAddr().String() + ": " + err.Error()) logger.LogWarnMessage("failed to unmarshal offer from peer " + peerID + " in room " + roomID + ": " + err.Error())
return continue
} }
sfu.JoinWithOffer(roomID, peerID, offer) _, err = sfu.JoinWithOffer(roomID, peerID, offer)
return if err != nil {
logger.LogWarnMessage("failed to join peer " + peerID + " to room " + roomID + ": " + err.Error())
continue
}
continue
} }
} }
} }