diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java index 9ab8677..d062cd0 100644 --- a/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java +++ b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java @@ -19,7 +19,11 @@ public enum NetworkSignalType { /** * Сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными */ - END_CALL(3); + END_CALL(3), + /** + * Создание комнаты + */ + CREATE_ROOM(4); private final int code; diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java b/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java new file mode 100644 index 0000000..964477a --- /dev/null +++ b/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java @@ -0,0 +1,36 @@ +package im.rosetta.packet.runtime; + +public enum NetworkWebRTCType { + /** + * Оффер + */ + OFFER(0), + /** + * Ответ на оффер + */ + ANSWER(1), + /** + * ICE кандидат + */ + ICE_CANDIDATE(2); + + private final int code; + + NetworkWebRTCType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static NetworkWebRTCType fromCode(int code) { + for (NetworkWebRTCType type : NetworkWebRTCType.values()) { + if (type.code == code) { + return type; + } + } + throw new IllegalArgumentException("Unknown NetworkWebRTCType code: " + code); + } + +} diff --git a/src/main/java/im/rosetta/service/services/ForwardUnitService.java b/src/main/java/im/rosetta/service/services/ForwardUnitService.java index 2528efa..fa49f3b 100644 --- a/src/main/java/im/rosetta/service/services/ForwardUnitService.java +++ b/src/main/java/im/rosetta/service/services/ForwardUnitService.java @@ -3,9 +3,18 @@ package im.rosetta.service.services; import java.util.HashSet; import java.util.Set; +import im.rosetta.client.ClientManager; import im.rosetta.logger.Logger; import im.rosetta.logger.enums.Color; +import im.rosetta.packet.Packet27WebRTC; +import im.rosetta.packet.runtime.NetworkWebRTCType; +import io.g365sfu.Room; import io.g365sfu.SFU; +import io.g365sfu.webrtc.ICECandidate; +import io.g365sfu.webrtc.RTCIceServer; +import io.g365sfu.webrtc.SDPAnswer; +import io.g365sfu.webrtc.SDPOffer; +import io.orprotocol.ProtocolException; /** * Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями. @@ -14,9 +23,44 @@ public class ForwardUnitService { private Logger logger; private Set sfuConnections = new HashSet<>(); + private ClientManager clientManager; + private Set turnServers = new HashSet<>(); - public ForwardUnitService(Logger logger) { + public ForwardUnitService(Logger logger, ClientManager clientManager) { this.logger = logger; + this.clientManager = clientManager; + this.readAllTurnServers(); + } + + /** + * Читаем все TURN сервера из переменной окружения и сохраняем их для дальнейшего + * использования при организации звонков через SFU серверы. + */ + private void readAllTurnServers() { + String turnServersEnv = System.getenv("TURN_SERVERS"); + if(turnServersEnv == null || turnServersEnv.isEmpty()) { + this.logger.info(Color.YELLOW + "No TURN servers configured, skipping TURN servers boot"); + return; + } + String[] turnServers = turnServersEnv.split(","); + for(String turnServer : turnServers) { + String[] parts = turnServer.split("@"); + if(parts.length != 2) { + this.logger.error(Color.RED + "Invalid TURN server configuration: " + turnServer); + continue; + } + String address = parts[0]; + String credentialsPart = parts[1]; + String[] credentialsParts = credentialsPart.split(":"); + if(credentialsParts.length != 2) { + this.logger.error(Color.RED + "Invalid TURN server credentials configuration: " + credentialsPart); + continue; + } + String username = credentialsParts[0]; + String credential = credentialsParts[1]; + RTCIceServer iceServer = new RTCIceServer(address, username, credential); + this.turnServers.add(iceServer); + } } /** @@ -45,6 +89,22 @@ public class ForwardUnitService { try { SFU connection = new SFU(address, secretKey); connection.connect(); + connection.setIceConsumer(arg0 -> { + try { + onIceCandidate(arg0); + } catch (ProtocolException e) { + this.logger.error(Color.RED + "Failed to retranslate ICE-candidate from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); + connection.setAnswerConsumer(arg0 -> { + try{ + onSdpAnswer(arg0); + }catch(ProtocolException e){ + this.logger.error(Color.RED + "Failed to retranslate SDP answer from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); this.sfuConnections.add(connection); this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address); } catch (Exception e) { @@ -52,5 +112,94 @@ public class ForwardUnitService { } } } + + public void onSdpAnswer(SDPAnswer sdpAnswer) throws ProtocolException { + String participantId = sdpAnswer.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(sdpAnswer.getSdp()); + packet.setType(NetworkWebRTCType.ANSWER); + this.clientManager.sendPacketToAuthorizedPK(participantId, packet); + } + + /** + * Выполняется когда сервер SFU отправляет ICE-кандидата для одного из участников комнаты. + * @param iceCandidate объект ICECandidate, + * который содержит информацию о комнате, участнике и самом кандидате, + * которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU. + * @throws ProtocolException + */ + public void onIceCandidate(ICECandidate iceCandidate) throws ProtocolException { + String publicKey = iceCandidate.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(iceCandidate.getCandidate()); + packet.setType(NetworkWebRTCType.ICE_CANDIDATE); + this.clientManager.sendPacketToAuthorizedPK(publicKey, packet); + } + + /** + * Выполняется когда сервер SFU отправляет SDP оффер для одного из участников комнаты. + * @param sdpOffer объект SDPOffer, + * который содержит информацию о комнате, участнике и самом оффере, + * которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU. + * @throws ProtocolException + */ + public void onSdpOffer(SDPOffer sdpOffer) throws ProtocolException { + String participantId = sdpOffer.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(sdpOffer.getSdp()); + packet.setType(NetworkWebRTCType.OFFER); + this.clientManager.sendPacketToAuthorizedPK(participantId, packet); + } + + /** + * Получает комнату в которой сейчас находится пользователь + * @param participantId идентификатор пользователя на сервере SFU + * @return комната Room если найдена, иначе null + */ + public Room getRoomByParticipantId(String participantId) { + for(SFU sfu : this.sfuConnections){ + Room room = sfu.getRoomByParticipantId(participantId); + if(room != null){ + return room; + } + } + return null; + } + + /** + * Автоматически выбирает сервер для создания комнаты, и создает в нем комнату + * @return комната + */ + public Room createRoom() { + if(this.sfuConnections.isEmpty()) { + return null; + } + SFU selectedSfu = null; + int minRooms = Integer.MAX_VALUE; + for(SFU sfu : this.sfuConnections) { + int roomsCount = sfu.getRoomsCount(); + if(roomsCount < minRooms) { + minRooms = roomsCount; + selectedSfu = sfu; + } + } + if(selectedSfu != null) { + try{ + return selectedSfu.createRoom(); + }catch(Exception e){ + this.logger.error(Color.RED + "Failed to create room on SFU server: " + selectedSfu.getServerAddress() + ", error: " + e.getMessage()); + } + } + return null; + } + + /** + * Получить список TURN серверов, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + * @return список серверов для RTC + */ + public Set getTurnServers() { + return turnServers; + } + }