From ce404188d4e3f153b77a07f17a5a9548ec0f1abc Mon Sep 17 00:00:00 2001 From: RoyceDa Date: Sun, 15 Mar 2026 18:57:03 +0200 Subject: [PATCH] =?UTF-8?q?=D0=9F=D1=80=D0=BE=D0=B2=D0=B5=D1=80=D0=BA?= =?UTF-8?q?=D0=B0=20=D0=B6=D0=B8=D0=B7=D0=BD=D0=B5=D1=81=D0=BF=D0=BE=D1=81?= =?UTF-8?q?=D0=BE=D0=B1=D0=BD=D0=BE=D1=81=D1=82=D0=B8=20SFU?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .env | 6 +-- .../service/services/ForwardUnitService.java | 37 ++++++++++++- src/main/java/io/g365sfu/SFU.java | 8 +-- src/main/java/io/g365sfu/net/SfuSock.java | 52 ++++++++++++++++--- 4 files changed, 88 insertions(+), 15 deletions(-) diff --git a/.env b/.env index 251f355..db4a9c6 100644 --- a/.env +++ b/.env @@ -8,9 +8,9 @@ PORT=3000 # Порт, на котором будет работать серве # Список серверов CDN и SDU. Разделяются запятой если их несколько # Без пробелов -CDN_SERVERS=http://192.168.6.82:7789 +CDN_SERVERS=http://10.211.55.2:7789 #SDU - Server Delivery Updates -SDU_SERVERS=http://192.168.6.82:7777 +SDU_SERVERS=http://10.211.55.2:7777 #Firebase Credentials FIREBASE_CREDENTIALS_PATH=serviceAccount.json @@ -23,5 +23,5 @@ SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET #TURN Сервера (должны поддерживать TCP и UDP протоколы) # Формат: host:port@username:password через запятую если их несколько, без пробелов -TURN_SERVERS=192.168.6.82:3478@user:pass +TURN_SERVERS=10.211.55.2:3478@user:pass diff --git a/src/main/java/im/rosetta/service/services/ForwardUnitService.java b/src/main/java/im/rosetta/service/services/ForwardUnitService.java index fa49f3b..d3b349a 100644 --- a/src/main/java/im/rosetta/service/services/ForwardUnitService.java +++ b/src/main/java/im/rosetta/service/services/ForwardUnitService.java @@ -2,6 +2,10 @@ package im.rosetta.service.services; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import im.rosetta.client.ClientManager; import im.rosetta.logger.Logger; @@ -22,14 +26,43 @@ import io.orprotocol.ProtocolException; public class ForwardUnitService { private Logger logger; - private Set sfuConnections = new HashSet<>(); + private Set sfuConnections = ConcurrentHashMap.newKeySet(); private ClientManager clientManager; private Set turnServers = new HashSet<>(); + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); public ForwardUnitService(Logger logger, ClientManager clientManager) { this.logger = logger; this.clientManager = clientManager; this.readAllTurnServers(); + this.sfuConnectionsSheduler(); + } + + /** + * Каждые 10 секунд проверяет доступность всех SFU серверов, и если сервер недоступен, то удаляет его из + * пула доступных серверов для организации звонков. + * Проверка доступности сервера осуществляется через отправку специального сообщения проверки соединения, + * и ожидание ответа от сервера. Если ответ не приходит в течение 10 секунд, то сервер считается недоступным. + * + * Так же, если sfuConnections пустой, то мы стараемся установить соединение со всеми серверами еще раз. + */ + private void sfuConnectionsSheduler() { + this.scheduler.scheduleAtFixedRate(() -> { + for(SFU sfu : this.sfuConnections) { + try{ + if(!sfu.getConnection().checkConnection()){ + this.logger.error("Server " + sfu.getServerAddress() + " not responding"); + this.sfuConnections.remove(sfu); + } + }catch(Exception e) { + this.logger.error("Failed to check connection to SFU server: " + sfu.getServerAddress() + ", error: " + e.getMessage()); + this.sfuConnections.remove(sfu); + } + } + if(this.sfuConnections.isEmpty()) { + this.connectToAllSFUServers(); + } + }, 10, 10, TimeUnit.SECONDS); } /** @@ -108,7 +141,7 @@ public class ForwardUnitService { this.sfuConnections.add(connection); this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address); } catch (Exception e) { - this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage()); + //this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage()); } } } diff --git a/src/main/java/io/g365sfu/SFU.java b/src/main/java/io/g365sfu/SFU.java index ce089bb..7d26d5b 100644 --- a/src/main/java/io/g365sfu/SFU.java +++ b/src/main/java/io/g365sfu/SFU.java @@ -72,7 +72,7 @@ public class SFU { this.socket.setMessageConsumer(this::onMessage); boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS); if(!connected){ - throw new SFUException("Failed to connect to SFU server, timeout after 30 seconds: " + this.serverAddress); + throw new SFUException("Failed to connect to SFU server, read time out: " + this.serverAddress); } if(!this.socket.isOpen()) { throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open"); @@ -160,9 +160,9 @@ public class SFU { message.get(peerIdBytes); String peerId = new String(peerIdBytes).trim(); int sdpOfferLength = message.getInt(); - byte[] sdpAnswerBytes = new byte[sdpOfferLength]; - message.get(sdpAnswerBytes); - String sdpOffer = new String(sdpAnswerBytes).trim(); + byte[] sdpOfferBytes = new byte[sdpOfferLength]; + message.get(sdpOfferBytes); + String sdpOffer = new String(sdpOfferBytes).trim(); SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer); if(this.onSdpOffer != null) { this.onSdpOffer.accept(offer); diff --git a/src/main/java/io/g365sfu/net/SfuSock.java b/src/main/java/io/g365sfu/net/SfuSock.java index ac207b1..a993c4c 100644 --- a/src/main/java/io/g365sfu/net/SfuSock.java +++ b/src/main/java/io/g365sfu/net/SfuSock.java @@ -3,7 +3,9 @@ package io.g365sfu.net; import java.net.URI; import java.net.URISyntaxException; import java.nio.ByteBuffer; +import java.nio.ByteOrder; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import org.java_websocket.client.WebSocketClient; @@ -12,6 +14,7 @@ import org.java_websocket.handshake.ServerHandshake; public class SfuSock extends WebSocketClient { private CompletableFuture handshakeFuture = new CompletableFuture<>(); + private CompletableFuture connectionFuture = new CompletableFuture<>(); private Consumer onMessage; public SfuSock(String serverAddress) throws URISyntaxException { @@ -20,24 +23,25 @@ public class SfuSock extends WebSocketClient { @Override public void onOpen(ServerHandshake handshakedata) { - System.out.println("Connected to SFU server"); + //System.out.println("Connected to SFU server"); } @Override public void onMessage(ByteBuffer bytes) { + bytes.order(ByteOrder.BIG_ENDIAN); if(bytes.remaining() < 1) { - System.err.println("Received empty message from SFU server"); return; } byte messageType = bytes.get(); - if(messageType == 0x01) { + if(messageType == (byte)0x01) { /** * Сервер ответил на рукопожатие, и мы можем считать его успешным */ this.handshakeFuture.complete(true); return; } - if(messageType == 0xFF) { + + if(messageType == (byte)0xFF) { /** * Сервер отклонил рукопожатие, и мы должны считать его неудачным */ @@ -45,6 +49,14 @@ public class SfuSock extends WebSocketClient { return; } + if(messageType == (byte)0xAE) { + /** + * Сервер отправил сообщение о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным + */ + this.connectionFuture.complete(true); + return; + } + /** * Если это не сообщение рукопожатия, то мы передаем его в установленного потребителя */ @@ -55,7 +67,7 @@ public class SfuSock extends WebSocketClient { @Override public void onMessage(String message) { - System.err.println("Received unexpected text message from SFU server: " + message); + //System.err.println("Received unexpected text message from SFU server: " + message); } @Override @@ -70,7 +82,7 @@ public class SfuSock extends WebSocketClient { @Override public void onError(Exception ex){ - System.err.println("Error: " + ex.getMessage()); + //System.err.println("Error: " + ex.getMessage()); } /** @@ -110,4 +122,32 @@ public class SfuSock extends WebSocketClient { public void setMessageConsumer(Consumer onMessage) { this.onMessage = onMessage; } + + /** + * Проверяет состояние соединения, если соединение активно и готово к обмену данными, то возвращает false + * @return + * @throws InterruptedException + */ + public boolean checkConnection() { + ByteBuffer buffer = ByteBuffer.allocate(1); + /** + * 0x08 - код проверки соединения в соотвествии с протоколом g365sfu + */ + buffer.put((byte)0xAE); + buffer.flip(); + this.send(buffer); + try { + boolean result = this.connectionFuture.get(10, TimeUnit.SECONDS); + this.connectionFuture = new CompletableFuture<>(); + return result; + }catch(Exception e){ + /** + * Сбрасываем handshakeFuture, так как соединение не активно, + * и нам нужно будет пройти рукопожатие заново при следующей проверке соединения + */ + this.handshakeFuture = new CompletableFuture<>(); + this.connectionFuture = new CompletableFuture<>(); + return false; + } + } }