Базовый SFU SDK для g365sfu
This commit is contained in:
3
.env
3
.env
@@ -18,3 +18,6 @@ FIREBASE_CREDENTIALS_PATH=serviceAccount.json
|
|||||||
#Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений)
|
#Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений)
|
||||||
BUFFER_CLEANUP_DAYS=7
|
BUFFER_CLEANUP_DAYS=7
|
||||||
|
|
||||||
|
#SFU Сервера
|
||||||
|
SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET
|
||||||
|
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ import im.rosetta.executors.Executor21GroupLeave;
|
|||||||
import im.rosetta.executors.Executor22GroupBan;
|
import im.rosetta.executors.Executor22GroupBan;
|
||||||
import im.rosetta.executors.Executor24DeviceResolve;
|
import im.rosetta.executors.Executor24DeviceResolve;
|
||||||
import im.rosetta.executors.Executor25Sync;
|
import im.rosetta.executors.Executor25Sync;
|
||||||
|
import im.rosetta.executors.Executor26Signal;
|
||||||
import im.rosetta.executors.Executor3Search;
|
import im.rosetta.executors.Executor3Search;
|
||||||
import im.rosetta.executors.Executor4OnlineState;
|
import im.rosetta.executors.Executor4OnlineState;
|
||||||
import im.rosetta.executors.Executor6Message;
|
import im.rosetta.executors.Executor6Message;
|
||||||
@@ -54,6 +55,8 @@ import im.rosetta.packet.Packet7Read;
|
|||||||
import im.rosetta.packet.Packet8Delivery;
|
import im.rosetta.packet.Packet8Delivery;
|
||||||
import im.rosetta.packet.Packet9DeviceNew;
|
import im.rosetta.packet.Packet9DeviceNew;
|
||||||
import im.rosetta.service.services.BufferCleanupService;
|
import im.rosetta.service.services.BufferCleanupService;
|
||||||
|
import im.rosetta.service.services.ForwardUnitService;
|
||||||
|
import io.g365sfu.SFU;
|
||||||
import io.orprotocol.Server;
|
import io.orprotocol.Server;
|
||||||
import io.orprotocol.Settings;
|
import io.orprotocol.Settings;
|
||||||
import io.orprotocol.packet.PacketManager;
|
import io.orprotocol.packet.PacketManager;
|
||||||
@@ -75,6 +78,7 @@ public class Boot {
|
|||||||
private ClientManager clientManager;
|
private ClientManager clientManager;
|
||||||
private OnlineManager onlineManager;
|
private OnlineManager onlineManager;
|
||||||
private BufferCleanupService bufferCleanupService;
|
private BufferCleanupService bufferCleanupService;
|
||||||
|
private ForwardUnitService forwardUnitService;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Конструктор по умолчанию, использует порт 3000 для сервера
|
* Конструктор по умолчанию, использует порт 3000 для сервера
|
||||||
@@ -106,6 +110,7 @@ public class Boot {
|
|||||||
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
|
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
|
||||||
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
|
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
|
||||||
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
|
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
|
||||||
|
this.forwardUnitService = new ForwardUnitService(this.logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -152,7 +157,8 @@ public class Boot {
|
|||||||
this.registerAllEvents();
|
this.registerAllEvents();
|
||||||
this.printBootMessage();
|
this.printBootMessage();
|
||||||
this.bufferCleanupService.start();
|
this.bufferCleanupService.start();
|
||||||
return this;
|
this.forwardUnitService.connectToAllSFUServers();
|
||||||
|
return this;
|
||||||
}catch(Exception e){
|
}catch(Exception e){
|
||||||
this.logger.error(Color.RED + "Booting error, stack trace:");
|
this.logger.error(Color.RED + "Booting error, stack trace:");
|
||||||
e.printStackTrace();
|
e.printStackTrace();
|
||||||
@@ -217,6 +223,7 @@ public class Boot {
|
|||||||
this.packetManager.registerExecutor(22, new Executor22GroupBan());
|
this.packetManager.registerExecutor(22, new Executor22GroupBan());
|
||||||
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager));
|
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager));
|
||||||
this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager));
|
this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager));
|
||||||
|
this.packetManager.registerExecutor(26, new Executor26Signal(this.clientManager));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printBootMessage() {
|
private void printBootMessage() {
|
||||||
|
|||||||
39
src/main/java/im/rosetta/executors/Executor26Signal.java
Normal file
39
src/main/java/im/rosetta/executors/Executor26Signal.java
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
package im.rosetta.executors;
|
||||||
|
|
||||||
|
import im.rosetta.client.ClientManager;
|
||||||
|
import im.rosetta.client.tags.ECIAuthentificate;
|
||||||
|
import im.rosetta.packet.Packet26Signal;
|
||||||
|
import io.orprotocol.ProtocolException;
|
||||||
|
import io.orprotocol.client.Client;
|
||||||
|
import io.orprotocol.packet.PacketExecutor;
|
||||||
|
|
||||||
|
public class Executor26Signal extends PacketExecutor<Packet26Signal> {
|
||||||
|
|
||||||
|
public ClientManager clientManager;
|
||||||
|
|
||||||
|
public Executor26Signal(ClientManager clientManager) {
|
||||||
|
this.clientManager = clientManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPacketReceived(Packet26Signal packet, Client client) throws Exception, ProtocolException {
|
||||||
|
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
||||||
|
if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
|
||||||
|
/**
|
||||||
|
* Если клиент не авторизован, то мы не будем обрабатывать его сигналы на анициализацию звонка
|
||||||
|
* и просто отключим его от сервера.
|
||||||
|
*/
|
||||||
|
client.disconnect();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
/**
|
||||||
|
* TODO: Проверка на существование получателя
|
||||||
|
*/
|
||||||
|
this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet);
|
||||||
|
/**
|
||||||
|
* TODO: Высокоприоритетный пуш для сигналов звонков, чтобы мобильные устройства могли показать
|
||||||
|
* интерфейс входящего звонка, даже если приложение находится в фоне
|
||||||
|
*/
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -5,25 +5,21 @@ package im.rosetta.packet.runtime;
|
|||||||
*/
|
*/
|
||||||
public enum NetworkSignalType {
|
public enum NetworkSignalType {
|
||||||
/**
|
/**
|
||||||
* CALL - сигнал для совершения звонка, инициирует процесс звонка
|
* Сигнал для совершения звонка, инициирует процесс звонка
|
||||||
*/
|
*/
|
||||||
CALL(0),
|
CALL(0),
|
||||||
/**
|
/**
|
||||||
* KEY_EXCHANGE - сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка
|
* Сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка
|
||||||
*/
|
*/
|
||||||
KEY_EXCHANGE(1),
|
KEY_EXCHANGE(1),
|
||||||
/**
|
/**
|
||||||
* ICE_CANDIDATE - сигнал для обмена ICE кандидатами, используется для обмена сетевыми кандидатами между участниками звонка
|
* Сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными
|
||||||
*/
|
*/
|
||||||
ICE_CONFIG(2),
|
ACTIVE_CALL(2),
|
||||||
/**
|
/**
|
||||||
* ACTIVE_CALL - сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными
|
* Сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными
|
||||||
*/
|
*/
|
||||||
ACTIVE_CALL(3),
|
END_CALL(3);
|
||||||
/**
|
|
||||||
* END_CALL - сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными
|
|
||||||
*/
|
|
||||||
END_CALL(4);
|
|
||||||
|
|
||||||
private final int code;
|
private final int code;
|
||||||
|
|
||||||
|
|||||||
@@ -0,0 +1,56 @@
|
|||||||
|
package im.rosetta.service.services;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import im.rosetta.logger.Logger;
|
||||||
|
import im.rosetta.logger.enums.Color;
|
||||||
|
import io.g365sfu.SFU;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями.
|
||||||
|
*/
|
||||||
|
public class ForwardUnitService {
|
||||||
|
|
||||||
|
private Logger logger;
|
||||||
|
private Set<SFU> sfuConnections = new HashSet<>();
|
||||||
|
|
||||||
|
public ForwardUnitService(Logger logger) {
|
||||||
|
this.logger = logger;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Инициализирует соединения к SFU серверам для звонков.
|
||||||
|
* Ожидается, что адреса SFU серверов и секретные ключи для них будут переданы через переменную окружения SFU_SERVERS в формате "address1@secretKey1,address2@secretKey2,...".
|
||||||
|
* Для каждого сервера будет предпринята попытка установить соединение и выполнить рукопожатие.
|
||||||
|
* Если соединение не может быть установлено или рукопожатие не удается,
|
||||||
|
* будет выведено сообщение об ошибке в лог, но процесс продолжится для остальных серверов.
|
||||||
|
* Успешные подключения будут сохранены в наборе sfuConnections для дальнейшего использования
|
||||||
|
*/
|
||||||
|
public void connectToAllSFUServers() {
|
||||||
|
String sfuServersEnv = System.getenv("SFU_SERVERS");
|
||||||
|
if(sfuServersEnv == null || sfuServersEnv.isEmpty()) {
|
||||||
|
this.logger.info(Color.YELLOW + "No SFU servers configured, skipping SFU connections boot");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String[] sfuServers = sfuServersEnv.split(",");
|
||||||
|
for(String sfuServer : sfuServers) {
|
||||||
|
String[] parts = sfuServer.split("@");
|
||||||
|
if(parts.length != 2) {
|
||||||
|
this.logger.error(Color.RED + "Invalid SFU server configuration: " + sfuServer);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
String address = parts[0];
|
||||||
|
String secretKey = parts[1];
|
||||||
|
try {
|
||||||
|
SFU connection = new SFU(address, secretKey);
|
||||||
|
connection.connect();
|
||||||
|
this.sfuConnections.add(connection);
|
||||||
|
this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address);
|
||||||
|
} catch (Exception e) {
|
||||||
|
this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
77
src/main/java/io/g365sfu/SFU.java
Normal file
77
src/main/java/io/g365sfu/SFU.java
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
package io.g365sfu;
|
||||||
|
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
|
import io.g365sfu.exception.SFUException;
|
||||||
|
import io.g365sfu.exception.SFUHandshakeException;
|
||||||
|
import io.g365sfu.net.SfuSock;
|
||||||
|
|
||||||
|
|
||||||
|
public class SFU {
|
||||||
|
|
||||||
|
private String serverAddress;
|
||||||
|
private String secretKey;
|
||||||
|
private SfuSock socket;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером.
|
||||||
|
* @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080"
|
||||||
|
* @param secretKey секретный ключ для аутентификации с SFU сервером, который должен быть согласован с настройками сервера.
|
||||||
|
*/
|
||||||
|
public SFU(String serverAddress, String secretKey) {
|
||||||
|
this.serverAddress = serverAddress;
|
||||||
|
this.secretKey = secretKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Установить соединение с SFU сервером и начать обмен рукопожатиями для аутентификации и установления безопасного канала связи.
|
||||||
|
* @throws URISyntaxException если адрес сервера имеет неправильный формат
|
||||||
|
* @throws InterruptedException если соединение было прервано во время попытки подключения
|
||||||
|
* @throws SFUException если не удалось установить соединение с SFU сервером или если соединение было установлено,
|
||||||
|
* но не открыто после подключения
|
||||||
|
* @throws TimeoutException не удалось обменяться рукопожатиями с SFU сервером в течение 30 секунд
|
||||||
|
* @throws ExecutionException если во время обмена рукопожатиями произошла ошибка выполнения
|
||||||
|
* @throws SFUHandshakeException если обмен рукопожатиями с SFU завершился неудачно (например, плохой ключ)
|
||||||
|
*/
|
||||||
|
public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException {
|
||||||
|
this.socket = new SfuSock(this.serverAddress);
|
||||||
|
boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS);
|
||||||
|
if(!connected){
|
||||||
|
throw new SFUException("Failed to connect to SFU server, timeout after 30 seconds: " + this.serverAddress);
|
||||||
|
}
|
||||||
|
if(!this.socket.isOpen()) {
|
||||||
|
throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open");
|
||||||
|
}
|
||||||
|
boolean estabilished = this.socket.handshakeExchange(this.secretKey).get(30, TimeUnit.SECONDS);
|
||||||
|
if(!estabilished) {
|
||||||
|
throw new SFUHandshakeException("Failed to establish handshake with SFU server at " + this.serverAddress);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Получить адрес SFU сервера, к которому установлено соединение
|
||||||
|
* @return адрес SFU сервера
|
||||||
|
*/
|
||||||
|
public String getServerAddress() {
|
||||||
|
return this.serverAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Получить соединение к SFU серверу, если оно было установлено
|
||||||
|
* @return объект SfuSock, представляющий соединение к SFU серверу
|
||||||
|
*/
|
||||||
|
public SfuSock getConnection() {
|
||||||
|
return this.socket;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Проверить, установлено ли соединение с SFU сервером и открыто ли оно
|
||||||
|
* @return true, если соединение установлено и открыто, false в противном случае
|
||||||
|
*/
|
||||||
|
public boolean isOpen() {
|
||||||
|
return this.socket != null && this.socket.isOpen();
|
||||||
|
}
|
||||||
|
}
|
||||||
13
src/main/java/io/g365sfu/exception/SFUException.java
Normal file
13
src/main/java/io/g365sfu/exception/SFUException.java
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
package io.g365sfu.exception;
|
||||||
|
|
||||||
|
public class SFUException extends Exception {
|
||||||
|
|
||||||
|
public SFUException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SFUException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,13 @@
|
|||||||
|
package io.g365sfu.exception;
|
||||||
|
|
||||||
|
public class SFUHandshakeException extends Exception {
|
||||||
|
|
||||||
|
public SFUHandshakeException(String message) {
|
||||||
|
super(message);
|
||||||
|
}
|
||||||
|
|
||||||
|
public SFUHandshakeException(String message, Throwable cause) {
|
||||||
|
super(message, cause);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
90
src/main/java/io/g365sfu/net/SfuSock.java
Normal file
90
src/main/java/io/g365sfu/net/SfuSock.java
Normal file
@@ -0,0 +1,90 @@
|
|||||||
|
package io.g365sfu.net;
|
||||||
|
|
||||||
|
import java.net.URI;
|
||||||
|
import java.net.URISyntaxException;
|
||||||
|
import java.nio.ByteBuffer;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
|
||||||
|
import org.java_websocket.client.WebSocketClient;
|
||||||
|
import org.java_websocket.handshake.ServerHandshake;
|
||||||
|
|
||||||
|
public class SfuSock extends WebSocketClient {
|
||||||
|
|
||||||
|
private CompletableFuture<Boolean> handshakeFuture = new CompletableFuture<>();
|
||||||
|
|
||||||
|
public SfuSock(String serverAddress) throws URISyntaxException {
|
||||||
|
super(new URI("ws://" + serverAddress));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onOpen(ServerHandshake handshakedata) {
|
||||||
|
System.out.println("Connected to SFU server");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(ByteBuffer bytes) {
|
||||||
|
if(bytes.remaining() < 1) {
|
||||||
|
System.err.println("Received empty message from SFU server");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
byte messageType = bytes.get();
|
||||||
|
if(messageType == 0x01) {
|
||||||
|
/**
|
||||||
|
* Сервер ответил на рукопожатие, и мы можем считать его успешным
|
||||||
|
*/
|
||||||
|
this.handshakeFuture.complete(true);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if(messageType == 0xFF) {
|
||||||
|
/**
|
||||||
|
* Сервер отклонил рукопожатие, и мы должны считать его неудачным
|
||||||
|
*/
|
||||||
|
this.handshakeFuture.complete(false);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onMessage(String message) {
|
||||||
|
System.err.println("Received unexpected text message from SFU server: " + message);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onClose(int code, String reason, boolean remote) {
|
||||||
|
if(!this.handshakeFuture.isDone()) {
|
||||||
|
/**
|
||||||
|
* Если соединение было закрыто до завершения рукопожатия, то мы считаем его неудачным
|
||||||
|
*/
|
||||||
|
this.handshakeFuture.complete(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Exception ex){
|
||||||
|
System.err.println("Error: " + ex.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Запускает обмен рукопожатиями с сервером SFU
|
||||||
|
* @param secretKey секретный ключ для аутентификации с сервером SFU
|
||||||
|
* @return CompletableFuture, который будет завершен с результатом true, если
|
||||||
|
* рукопожатие прошло успешно, или false, если рукопожатие не удалось или было отклонено сервером SFU
|
||||||
|
* @internal Этот метод отправляет пакет рукопожатия, который состоит из одного байта 0x01, за которым следует секретный ключ в виде строки байтов.
|
||||||
|
* Сервер SFU должен ответить одним байтом 0x01 для успешного рукопожатия или 0xFF для отклонения рукопожатия.
|
||||||
|
*/
|
||||||
|
public CompletableFuture<Boolean> handshakeExchange(String secretKey) {
|
||||||
|
ByteBuffer buffer = ByteBuffer.allocate(secretKey.length() + 1);
|
||||||
|
/**
|
||||||
|
* 0x01 - код рукопожатия в соотвествии с протоколом g365sfu, за которым следует секретный ключ в виде строки байтов
|
||||||
|
*/
|
||||||
|
buffer.put((byte)0x01);
|
||||||
|
buffer.put(secretKey.getBytes());
|
||||||
|
buffer.flip();
|
||||||
|
/**
|
||||||
|
* Отправляем сформированный пакет, и возвращаем CompletableFuture
|
||||||
|
*/
|
||||||
|
this.send(buffer);
|
||||||
|
return this.handshakeFuture;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user