diff --git a/boot/boot.go b/boot/boot.go index d1a24f1..2db79b2 100644 --- a/boot/boot.go +++ b/boot/boot.go @@ -6,6 +6,7 @@ import ( "g365sfu/logger" "g365sfu/sfu" "g365sfu/socket" + "g365sfu/turn" "net/http" "os" @@ -27,6 +28,19 @@ func Bootstrap() { } sfu.OnLocalICECandidate = OnLocalICECandidate sfu.OnServerOffer = OnServerOffer + turnServer, err := turn.Start(turn.Config{ + ListenAddr: "0.0.0.0:3478", + PublicIP: os.Getenv("TURN_PUBLIC_IP"), + Realm: "g365sfu", + Username: os.Getenv("TURN_USER"), + Password: os.Getenv("TURN_PASS"), + }) + if err != nil { + logger.LogWarnMessage("TURN start failed: " + err.Error()) + } else { + logger.LogInfoMessage("TURN started on 0.0.0.0:3478") + defer turnServer.Close() + } logger.LogInfoMessage("server started at x.x.x.x:" + port) http.ListenAndServe(":"+port, nil) } diff --git a/sfu/forwarding.go b/sfu/forwarding.go index 18b2d52..7880571 100644 --- a/sfu/forwarding.go +++ b/sfu/forwarding.go @@ -12,8 +12,15 @@ import ( var ( pendingMu sync.Mutex pendingCandidates = map[string][]webrtc.ICECandidateInit{} // key: roomID|peerID + renegMu sync.Map ) +func getRenegLock(roomID, peerID string) *sync.Mutex { + k := peerKey(roomID, peerID) + v, _ := renegMu.LoadOrStore(k, &sync.Mutex{}) + return v.(*sync.Mutex) +} + func peerKey(roomID, peerID string) string { return roomID + "|" + peerID } @@ -92,6 +99,15 @@ func SetupForwardingForPeer(roomID string, publisherPeerID string, publisherPC * } func renegotiatePeer(roomID, peerID string, pc *webrtc.PeerConnection) error { + lock := getRenegLock(roomID, peerID) + lock.Lock() + defer lock.Unlock() + + // Не начинаем новую negotiation поверх текущей + if pc.SignalingState() != webrtc.SignalingStateStable { + return nil + } + offer, err := pc.CreateOffer(nil) if err != nil { return err diff --git a/turn/turn.go b/turn/turn.go new file mode 100644 index 0000000..3ad39f8 --- /dev/null +++ b/turn/turn.go @@ -0,0 +1,95 @@ +package turn + +import ( + "fmt" + "net" + + pionturn "github.com/pion/turn/v4" +) + +type Server struct { + udpConn net.PacketConn + tcpListener net.Listener + srv *pionturn.Server +} + +type Config struct { + ListenAddr string // "0.0.0.0:3478" + PublicIP string + Realm string + Username string + Password string +} + +func Start(cfg Config) (*Server, error) { + ip := net.ParseIP(cfg.PublicIP) + if ip == nil { + return nil, fmt.Errorf("turn: invalid PublicIP: %s", cfg.PublicIP) + } + + udpConn, err := net.ListenPacket("udp4", cfg.ListenAddr) + if err != nil { + return nil, err + } + + tcpListener, err := net.Listen("tcp4", cfg.ListenAddr) + if err != nil { + _ = udpConn.Close() + return nil, err + } + + srv, err := pionturn.NewServer(pionturn.ServerConfig{ + Realm: cfg.Realm, + AuthHandler: func(username, realm string, srcAddr net.Addr) ([]byte, bool) { + if username != cfg.Username { + return nil, false + } + return pionturn.GenerateAuthKey(username, realm, cfg.Password), true + }, + PacketConnConfigs: []pionturn.PacketConnConfig{ + { + PacketConn: udpConn, + RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{ + RelayAddress: ip, + Address: "0.0.0.0", + }, + }, + }, + ListenerConfigs: []pionturn.ListenerConfig{ + { + Listener: tcpListener, + RelayAddressGenerator: &pionturn.RelayAddressGeneratorStatic{ + RelayAddress: ip, + Address: "0.0.0.0", + }, + }, + }, + }) + if err != nil { + _ = tcpListener.Close() + _ = udpConn.Close() + return nil, err + } + + return &Server{ + udpConn: udpConn, + tcpListener: tcpListener, + srv: srv, + }, nil +} + +func (s *Server) Close() error { + if s == nil { + return nil + } + if s.srv != nil { + _ = s.srv.Close() + } + if s.tcpListener != nil { + _ = s.tcpListener.Close() + } + if s.udpConn != nil { + _ = s.udpConn.Close() + } + return nil +}