diff --git a/.env b/.env index bacac01..c21b809 100644 --- a/.env +++ b/.env @@ -18,3 +18,6 @@ FIREBASE_CREDENTIALS_PATH=serviceAccount.json #Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений) BUFFER_CLEANUP_DAYS=7 +#SFU Сервера +SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET + diff --git a/src/main/java/im/rosetta/Boot.java b/src/main/java/im/rosetta/Boot.java index 795f4e0..eef6f93 100644 --- a/src/main/java/im/rosetta/Boot.java +++ b/src/main/java/im/rosetta/Boot.java @@ -17,6 +17,7 @@ import im.rosetta.executors.Executor21GroupLeave; import im.rosetta.executors.Executor22GroupBan; import im.rosetta.executors.Executor24DeviceResolve; import im.rosetta.executors.Executor25Sync; +import im.rosetta.executors.Executor26Signal; import im.rosetta.executors.Executor3Search; import im.rosetta.executors.Executor4OnlineState; import im.rosetta.executors.Executor6Message; @@ -54,6 +55,8 @@ import im.rosetta.packet.Packet7Read; import im.rosetta.packet.Packet8Delivery; import im.rosetta.packet.Packet9DeviceNew; import im.rosetta.service.services.BufferCleanupService; +import im.rosetta.service.services.ForwardUnitService; +import io.g365sfu.SFU; import io.orprotocol.Server; import io.orprotocol.Settings; import io.orprotocol.packet.PacketManager; @@ -75,6 +78,7 @@ public class Boot { private ClientManager clientManager; private OnlineManager onlineManager; private BufferCleanupService bufferCleanupService; + private ForwardUnitService forwardUnitService; /** * Конструктор по умолчанию, использует порт 3000 для сервера @@ -106,6 +110,7 @@ public class Boot { int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ? Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7; this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger); + this.forwardUnitService = new ForwardUnitService(this.logger); } /** @@ -152,7 +157,8 @@ public class Boot { this.registerAllEvents(); this.printBootMessage(); this.bufferCleanupService.start(); - return this; + this.forwardUnitService.connectToAllSFUServers(); + return this; }catch(Exception e){ this.logger.error(Color.RED + "Booting error, stack trace:"); e.printStackTrace(); @@ -217,6 +223,7 @@ public class Boot { this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); + this.packetManager.registerExecutor(26, new Executor26Signal(this.clientManager)); } private void printBootMessage() { diff --git a/src/main/java/im/rosetta/executors/Executor26Signal.java b/src/main/java/im/rosetta/executors/Executor26Signal.java new file mode 100644 index 0000000..be0108d --- /dev/null +++ b/src/main/java/im/rosetta/executors/Executor26Signal.java @@ -0,0 +1,39 @@ +package im.rosetta.executors; + +import im.rosetta.client.ClientManager; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.packet.Packet26Signal; +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +public class Executor26Signal extends PacketExecutor { + + public ClientManager clientManager; + + public Executor26Signal(ClientManager clientManager) { + this.clientManager = clientManager; + } + + @Override + public void onPacketReceived(Packet26Signal packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не авторизован, то мы не будем обрабатывать его сигналы на анициализацию звонка + * и просто отключим его от сервера. + */ + client.disconnect(); + return; + } + /** + * TODO: Проверка на существование получателя + */ + this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet); + /** + * TODO: Высокоприоритетный пуш для сигналов звонков, чтобы мобильные устройства могли показать + * интерфейс входящего звонка, даже если приложение находится в фоне + */ + } + +} diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java index ccaa4f1..9ab8677 100644 --- a/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java +++ b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java @@ -5,25 +5,21 @@ package im.rosetta.packet.runtime; */ public enum NetworkSignalType { /** - * CALL - сигнал для совершения звонка, инициирует процесс звонка + * Сигнал для совершения звонка, инициирует процесс звонка */ CALL(0), /** - * KEY_EXCHANGE - сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка + * Сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка */ KEY_EXCHANGE(1), /** - * ICE_CANDIDATE - сигнал для обмена ICE кандидатами, используется для обмена сетевыми кандидатами между участниками звонка + * Сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными */ - ICE_CONFIG(2), + ACTIVE_CALL(2), /** - * ACTIVE_CALL - сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными + * Сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными */ - ACTIVE_CALL(3), - /** - * END_CALL - сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными - */ - END_CALL(4); + END_CALL(3); private final int code; diff --git a/src/main/java/im/rosetta/service/services/ForwardUnitService.java b/src/main/java/im/rosetta/service/services/ForwardUnitService.java new file mode 100644 index 0000000..2528efa --- /dev/null +++ b/src/main/java/im/rosetta/service/services/ForwardUnitService.java @@ -0,0 +1,56 @@ +package im.rosetta.service.services; + +import java.util.HashSet; +import java.util.Set; + +import im.rosetta.logger.Logger; +import im.rosetta.logger.enums.Color; +import io.g365sfu.SFU; + +/** + * Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями. + */ +public class ForwardUnitService { + + private Logger logger; + private Set sfuConnections = new HashSet<>(); + + public ForwardUnitService(Logger logger) { + this.logger = logger; + } + + /** + * Инициализирует соединения к SFU серверам для звонков. + * Ожидается, что адреса SFU серверов и секретные ключи для них будут переданы через переменную окружения SFU_SERVERS в формате "address1@secretKey1,address2@secretKey2,...". + * Для каждого сервера будет предпринята попытка установить соединение и выполнить рукопожатие. + * Если соединение не может быть установлено или рукопожатие не удается, + * будет выведено сообщение об ошибке в лог, но процесс продолжится для остальных серверов. + * Успешные подключения будут сохранены в наборе sfuConnections для дальнейшего использования + */ + public void connectToAllSFUServers() { + String sfuServersEnv = System.getenv("SFU_SERVERS"); + if(sfuServersEnv == null || sfuServersEnv.isEmpty()) { + this.logger.info(Color.YELLOW + "No SFU servers configured, skipping SFU connections boot"); + return; + } + String[] sfuServers = sfuServersEnv.split(","); + for(String sfuServer : sfuServers) { + String[] parts = sfuServer.split("@"); + if(parts.length != 2) { + this.logger.error(Color.RED + "Invalid SFU server configuration: " + sfuServer); + continue; + } + String address = parts[0]; + String secretKey = parts[1]; + try { + SFU connection = new SFU(address, secretKey); + connection.connect(); + this.sfuConnections.add(connection); + this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address); + } catch (Exception e) { + this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage()); + } + } + } + +} diff --git a/src/main/java/io/g365sfu/SFU.java b/src/main/java/io/g365sfu/SFU.java new file mode 100644 index 0000000..235544d --- /dev/null +++ b/src/main/java/io/g365sfu/SFU.java @@ -0,0 +1,77 @@ +package io.g365sfu; + +import java.net.URISyntaxException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import io.g365sfu.exception.SFUException; +import io.g365sfu.exception.SFUHandshakeException; +import io.g365sfu.net.SfuSock; + + +public class SFU { + + private String serverAddress; + private String secretKey; + private SfuSock socket; + + /** + * Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером. + * @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080" + * @param secretKey секретный ключ для аутентификации с SFU сервером, который должен быть согласован с настройками сервера. + */ + public SFU(String serverAddress, String secretKey) { + this.serverAddress = serverAddress; + this.secretKey = secretKey; + } + + /** + * Установить соединение с SFU сервером и начать обмен рукопожатиями для аутентификации и установления безопасного канала связи. + * @throws URISyntaxException если адрес сервера имеет неправильный формат + * @throws InterruptedException если соединение было прервано во время попытки подключения + * @throws SFUException если не удалось установить соединение с SFU сервером или если соединение было установлено, + * но не открыто после подключения + * @throws TimeoutException не удалось обменяться рукопожатиями с SFU сервером в течение 30 секунд + * @throws ExecutionException если во время обмена рукопожатиями произошла ошибка выполнения + * @throws SFUHandshakeException если обмен рукопожатиями с SFU завершился неудачно (например, плохой ключ) + */ + public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException { + this.socket = new SfuSock(this.serverAddress); + boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS); + if(!connected){ + throw new SFUException("Failed to connect to SFU server, timeout after 30 seconds: " + this.serverAddress); + } + if(!this.socket.isOpen()) { + throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open"); + } + boolean estabilished = this.socket.handshakeExchange(this.secretKey).get(30, TimeUnit.SECONDS); + if(!estabilished) { + throw new SFUHandshakeException("Failed to establish handshake with SFU server at " + this.serverAddress); + } + } + + /** + * Получить адрес SFU сервера, к которому установлено соединение + * @return адрес SFU сервера + */ + public String getServerAddress() { + return this.serverAddress; + } + + /** + * Получить соединение к SFU серверу, если оно было установлено + * @return объект SfuSock, представляющий соединение к SFU серверу + */ + public SfuSock getConnection() { + return this.socket; + } + + /** + * Проверить, установлено ли соединение с SFU сервером и открыто ли оно + * @return true, если соединение установлено и открыто, false в противном случае + */ + public boolean isOpen() { + return this.socket != null && this.socket.isOpen(); + } +} diff --git a/src/main/java/io/g365sfu/exception/SFUException.java b/src/main/java/io/g365sfu/exception/SFUException.java new file mode 100644 index 0000000..e3c6239 --- /dev/null +++ b/src/main/java/io/g365sfu/exception/SFUException.java @@ -0,0 +1,13 @@ +package io.g365sfu.exception; + +public class SFUException extends Exception { + + public SFUException(String message) { + super(message); + } + + public SFUException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/io/g365sfu/exception/SFUHandshakeException.java b/src/main/java/io/g365sfu/exception/SFUHandshakeException.java new file mode 100644 index 0000000..9103da1 --- /dev/null +++ b/src/main/java/io/g365sfu/exception/SFUHandshakeException.java @@ -0,0 +1,13 @@ +package io.g365sfu.exception; + +public class SFUHandshakeException extends Exception { + + public SFUHandshakeException(String message) { + super(message); + } + + public SFUHandshakeException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/io/g365sfu/net/SfuSock.java b/src/main/java/io/g365sfu/net/SfuSock.java new file mode 100644 index 0000000..6330736 --- /dev/null +++ b/src/main/java/io/g365sfu/net/SfuSock.java @@ -0,0 +1,90 @@ +package io.g365sfu.net; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + +public class SfuSock extends WebSocketClient { + + private CompletableFuture handshakeFuture = new CompletableFuture<>(); + + public SfuSock(String serverAddress) throws URISyntaxException { + super(new URI("ws://" + serverAddress)); + } + + @Override + public void onOpen(ServerHandshake handshakedata) { + System.out.println("Connected to SFU server"); + } + + @Override + public void onMessage(ByteBuffer bytes) { + if(bytes.remaining() < 1) { + System.err.println("Received empty message from SFU server"); + return; + } + byte messageType = bytes.get(); + if(messageType == 0x01) { + /** + * Сервер ответил на рукопожатие, и мы можем считать его успешным + */ + this.handshakeFuture.complete(true); + return; + } + if(messageType == 0xFF) { + /** + * Сервер отклонил рукопожатие, и мы должны считать его неудачным + */ + this.handshakeFuture.complete(false); + return; + } + } + + @Override + public void onMessage(String message) { + System.err.println("Received unexpected text message from SFU server: " + message); + } + + @Override + public void onClose(int code, String reason, boolean remote) { + if(!this.handshakeFuture.isDone()) { + /** + * Если соединение было закрыто до завершения рукопожатия, то мы считаем его неудачным + */ + this.handshakeFuture.complete(false); + } + } + + @Override + public void onError(Exception ex){ + System.err.println("Error: " + ex.getMessage()); + } + + /** + * Запускает обмен рукопожатиями с сервером SFU + * @param secretKey секретный ключ для аутентификации с сервером SFU + * @return CompletableFuture, который будет завершен с результатом true, если + * рукопожатие прошло успешно, или false, если рукопожатие не удалось или было отклонено сервером SFU + * @internal Этот метод отправляет пакет рукопожатия, который состоит из одного байта 0x01, за которым следует секретный ключ в виде строки байтов. + * Сервер SFU должен ответить одним байтом 0x01 для успешного рукопожатия или 0xFF для отклонения рукопожатия. + */ + public CompletableFuture handshakeExchange(String secretKey) { + ByteBuffer buffer = ByteBuffer.allocate(secretKey.length() + 1); + /** + * 0x01 - код рукопожатия в соотвествии с протоколом g365sfu, за которым следует секретный ключ в виде строки байтов + */ + buffer.put((byte)0x01); + buffer.put(secretKey.getBytes()); + buffer.flip(); + /** + * Отправляем сформированный пакет, и возвращаем CompletableFuture + */ + this.send(buffer); + return this.handshakeFuture; + } + +}