@@ -3,9 +3,18 @@ package im.rosetta.service.services;
import java.util.HashSet ;
import java.util.Set ;
import im.rosetta.client.ClientManager ;
import im.rosetta.logger.Logger ;
import im.rosetta.logger.enums.Color ;
import im.rosetta.packet.Packet27WebRTC ;
import im.rosetta.packet.runtime.NetworkWebRTCType ;
import io.g365sfu.Room ;
import io.g365sfu.SFU ;
import io.g365sfu.webrtc.ICECandidate ;
import io.g365sfu.webrtc.RTCIceServer ;
import io.g365sfu.webrtc.SDPAnswer ;
import io.g365sfu.webrtc.SDPOffer ;
import io.orprotocol.ProtocolException ;
/**
* Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями.
@@ -14,9 +23,44 @@ public class ForwardUnitService {
private Logger logger ;
private Set < SFU > sfuConnections = new HashSet < > ( ) ;
private ClientManager clientManager ;
private Set < RTCIceServer > turnServers = new HashSet < > ( ) ;
public ForwardUnitService ( Logger logger ) {
public ForwardUnitService ( Logger logger , ClientManager clientManager ) {
this . logger = logger ;
this . clientManager = clientManager ;
this . readAllTurnServers ( ) ;
}
/**
* Читаем все TURN сервера из переменной окружения и сохраняем их для дальнейшего
* использования при организации звонков через SFU серверы.
*/
private void readAllTurnServers ( ) {
String turnServersEnv = System . getenv ( " TURN_SERVERS " ) ;
if ( turnServersEnv = = null | | turnServersEnv . isEmpty ( ) ) {
this . logger . info ( Color . YELLOW + " No TURN servers configured, skipping TURN servers boot " ) ;
return ;
}
String [ ] turnServers = turnServersEnv . split ( " , " ) ;
for ( String turnServer : turnServers ) {
String [ ] parts = turnServer . split ( " @ " ) ;
if ( parts . length ! = 2 ) {
this . logger . error ( Color . RED + " Invalid TURN server configuration: " + turnServer ) ;
continue ;
}
String address = parts [ 0 ] ;
String credentialsPart = parts [ 1 ] ;
String [ ] credentialsParts = credentialsPart . split ( " : " ) ;
if ( credentialsParts . length ! = 2 ) {
this . logger . error ( Color . RED + " Invalid TURN server credentials configuration: " + credentialsPart ) ;
continue ;
}
String username = credentialsParts [ 0 ] ;
String credential = credentialsParts [ 1 ] ;
RTCIceServer iceServer = new RTCIceServer ( address , username , credential ) ;
this . turnServers . add ( iceServer ) ;
}
}
/**
@@ -45,6 +89,22 @@ public class ForwardUnitService {
try {
SFU connection = new SFU ( address , secretKey ) ;
connection . connect ( ) ;
connection . setIceConsumer ( arg0 - > {
try {
onIceCandidate ( arg0 ) ;
} catch ( ProtocolException e ) {
this . logger . error ( Color . RED + " Failed to retranslate ICE-candidate from SFU server: " + address + " , error: " + e . getMessage ( ) ) ;
e . printStackTrace ( ) ;
}
} ) ;
connection . setAnswerConsumer ( arg0 - > {
try {
onSdpAnswer ( arg0 ) ;
} catch ( ProtocolException e ) {
this . logger . error ( Color . RED + " Failed to retranslate SDP answer from SFU server: " + address + " , error: " + e . getMessage ( ) ) ;
e . printStackTrace ( ) ;
}
} ) ;
this . sfuConnections . add ( connection ) ;
this . logger . info ( Color . GREEN + " Successfully connected to SFU server: " + address ) ;
} catch ( Exception e ) {
@@ -53,4 +113,93 @@ public class ForwardUnitService {
}
}
public void onSdpAnswer ( SDPAnswer sdpAnswer ) throws ProtocolException {
String participantId = sdpAnswer . getParticipantId ( ) ;
Packet27WebRTC packet = new Packet27WebRTC ( ) ;
packet . setSdpOrCandidate ( sdpAnswer . getSdp ( ) ) ;
packet . setType ( NetworkWebRTCType . ANSWER ) ;
this . clientManager . sendPacketToAuthorizedPK ( participantId , packet ) ;
}
/**
* Выполняется когда сервер SFU отправляет ICE-кандидата для одного из участников комнаты.
* @param iceCandidate объект ICECandidate,
* который содержит информацию о комнате, участнике и самом кандидате,
* которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU.
* @throws ProtocolException
*/
public void onIceCandidate ( ICECandidate iceCandidate ) throws ProtocolException {
String publicKey = iceCandidate . getParticipantId ( ) ;
Packet27WebRTC packet = new Packet27WebRTC ( ) ;
packet . setSdpOrCandidate ( iceCandidate . getCandidate ( ) ) ;
packet . setType ( NetworkWebRTCType . ICE_CANDIDATE ) ;
this . clientManager . sendPacketToAuthorizedPK ( publicKey , packet ) ;
}
/**
* Выполняется когда сервер SFU отправляет SDP оффер для одного из участников комнаты.
* @param sdpOffer объект SDPOffer,
* который содержит информацию о комнате, участнике и самом оффере,
* которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU.
* @throws ProtocolException
*/
public void onSdpOffer ( SDPOffer sdpOffer ) throws ProtocolException {
String participantId = sdpOffer . getParticipantId ( ) ;
Packet27WebRTC packet = new Packet27WebRTC ( ) ;
packet . setSdpOrCandidate ( sdpOffer . getSdp ( ) ) ;
packet . setType ( NetworkWebRTCType . OFFER ) ;
this . clientManager . sendPacketToAuthorizedPK ( participantId , packet ) ;
}
/**
* Получает комнату в которой сейчас находится пользователь
* @param participantId идентификатор пользователя на сервере SFU
* @return комната Room если найдена, иначе null
*/
public Room getRoomByParticipantId ( String participantId ) {
for ( SFU sfu : this . sfuConnections ) {
Room room = sfu . getRoomByParticipantId ( participantId ) ;
if ( room ! = null ) {
return room ;
}
}
return null ;
}
/**
* Автоматически выбирает сервер для создания комнаты, и создает в нем комнату
* @return комната
*/
public Room createRoom ( ) {
if ( this . sfuConnections . isEmpty ( ) ) {
return null ;
}
SFU selectedSfu = null ;
int minRooms = Integer . MAX_VALUE ;
for ( SFU sfu : this . sfuConnections ) {
int roomsCount = sfu . getRoomsCount ( ) ;
if ( roomsCount < minRooms ) {
minRooms = roomsCount ;
selectedSfu = sfu ;
}
}
if ( selectedSfu ! = null ) {
try {
return selectedSfu . createRoom ( ) ;
} catch ( Exception e ) {
this . logger . error ( Color . RED + " Failed to create room on SFU server: " + selectedSfu . getServerAddress ( ) + " , error: " + e . getMessage ( ) ) ;
}
}
return null ;
}
/**
* Получить список TURN серверов, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
* @return список серверов для RTC
*/
public Set < RTCIceServer > getTurnServers ( ) {
return turnServers ;
}
}