Проверка жизнеспособности SFU
This commit is contained in:
6
.env
6
.env
@@ -8,9 +8,9 @@ PORT=3000 # Порт, на котором будет работать серве
|
||||
|
||||
# Список серверов CDN и SDU. Разделяются запятой если их несколько
|
||||
# Без пробелов
|
||||
CDN_SERVERS=http://192.168.6.82:7789
|
||||
CDN_SERVERS=http://10.211.55.2:7789
|
||||
#SDU - Server Delivery Updates
|
||||
SDU_SERVERS=http://192.168.6.82:7777
|
||||
SDU_SERVERS=http://10.211.55.2:7777
|
||||
|
||||
#Firebase Credentials
|
||||
FIREBASE_CREDENTIALS_PATH=serviceAccount.json
|
||||
@@ -23,5 +23,5 @@ SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET
|
||||
|
||||
#TURN Сервера (должны поддерживать TCP и UDP протоколы)
|
||||
# Формат: host:port@username:password через запятую если их несколько, без пробелов
|
||||
TURN_SERVERS=192.168.6.82:3478@user:pass
|
||||
TURN_SERVERS=10.211.55.2:3478@user:pass
|
||||
|
||||
|
||||
@@ -2,6 +2,10 @@ package im.rosetta.service.services;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import im.rosetta.client.ClientManager;
|
||||
import im.rosetta.logger.Logger;
|
||||
@@ -22,14 +26,43 @@ import io.orprotocol.ProtocolException;
|
||||
public class ForwardUnitService {
|
||||
|
||||
private Logger logger;
|
||||
private Set<SFU> sfuConnections = new HashSet<>();
|
||||
private Set<SFU> sfuConnections = ConcurrentHashMap.newKeySet();
|
||||
private ClientManager clientManager;
|
||||
private Set<RTCIceServer> turnServers = new HashSet<>();
|
||||
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
|
||||
|
||||
public ForwardUnitService(Logger logger, ClientManager clientManager) {
|
||||
this.logger = logger;
|
||||
this.clientManager = clientManager;
|
||||
this.readAllTurnServers();
|
||||
this.sfuConnectionsSheduler();
|
||||
}
|
||||
|
||||
/**
|
||||
* Каждые 10 секунд проверяет доступность всех SFU серверов, и если сервер недоступен, то удаляет его из
|
||||
* пула доступных серверов для организации звонков.
|
||||
* Проверка доступности сервера осуществляется через отправку специального сообщения проверки соединения,
|
||||
* и ожидание ответа от сервера. Если ответ не приходит в течение 10 секунд, то сервер считается недоступным.
|
||||
*
|
||||
* Так же, если sfuConnections пустой, то мы стараемся установить соединение со всеми серверами еще раз.
|
||||
*/
|
||||
private void sfuConnectionsSheduler() {
|
||||
this.scheduler.scheduleAtFixedRate(() -> {
|
||||
for(SFU sfu : this.sfuConnections) {
|
||||
try{
|
||||
if(!sfu.getConnection().checkConnection()){
|
||||
this.logger.error("Server " + sfu.getServerAddress() + " not responding");
|
||||
this.sfuConnections.remove(sfu);
|
||||
}
|
||||
}catch(Exception e) {
|
||||
this.logger.error("Failed to check connection to SFU server: " + sfu.getServerAddress() + ", error: " + e.getMessage());
|
||||
this.sfuConnections.remove(sfu);
|
||||
}
|
||||
}
|
||||
if(this.sfuConnections.isEmpty()) {
|
||||
this.connectToAllSFUServers();
|
||||
}
|
||||
}, 10, 10, TimeUnit.SECONDS);
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -108,7 +141,7 @@ public class ForwardUnitService {
|
||||
this.sfuConnections.add(connection);
|
||||
this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address);
|
||||
} catch (Exception e) {
|
||||
this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage());
|
||||
//this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,7 +72,7 @@ public class SFU {
|
||||
this.socket.setMessageConsumer(this::onMessage);
|
||||
boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS);
|
||||
if(!connected){
|
||||
throw new SFUException("Failed to connect to SFU server, timeout after 30 seconds: " + this.serverAddress);
|
||||
throw new SFUException("Failed to connect to SFU server, read time out: " + this.serverAddress);
|
||||
}
|
||||
if(!this.socket.isOpen()) {
|
||||
throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open");
|
||||
@@ -160,9 +160,9 @@ public class SFU {
|
||||
message.get(peerIdBytes);
|
||||
String peerId = new String(peerIdBytes).trim();
|
||||
int sdpOfferLength = message.getInt();
|
||||
byte[] sdpAnswerBytes = new byte[sdpOfferLength];
|
||||
message.get(sdpAnswerBytes);
|
||||
String sdpOffer = new String(sdpAnswerBytes).trim();
|
||||
byte[] sdpOfferBytes = new byte[sdpOfferLength];
|
||||
message.get(sdpOfferBytes);
|
||||
String sdpOffer = new String(sdpOfferBytes).trim();
|
||||
SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer);
|
||||
if(this.onSdpOffer != null) {
|
||||
this.onSdpOffer.accept(offer);
|
||||
|
||||
@@ -3,7 +3,9 @@ package io.g365sfu.net;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.ByteOrder;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.java_websocket.client.WebSocketClient;
|
||||
@@ -12,6 +14,7 @@ import org.java_websocket.handshake.ServerHandshake;
|
||||
public class SfuSock extends WebSocketClient {
|
||||
|
||||
private CompletableFuture<Boolean> handshakeFuture = new CompletableFuture<>();
|
||||
private CompletableFuture<Boolean> connectionFuture = new CompletableFuture<>();
|
||||
private Consumer<ByteBuffer> onMessage;
|
||||
|
||||
public SfuSock(String serverAddress) throws URISyntaxException {
|
||||
@@ -20,24 +23,25 @@ public class SfuSock extends WebSocketClient {
|
||||
|
||||
@Override
|
||||
public void onOpen(ServerHandshake handshakedata) {
|
||||
System.out.println("Connected to SFU server");
|
||||
//System.out.println("Connected to SFU server");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onMessage(ByteBuffer bytes) {
|
||||
bytes.order(ByteOrder.BIG_ENDIAN);
|
||||
if(bytes.remaining() < 1) {
|
||||
System.err.println("Received empty message from SFU server");
|
||||
return;
|
||||
}
|
||||
byte messageType = bytes.get();
|
||||
if(messageType == 0x01) {
|
||||
if(messageType == (byte)0x01) {
|
||||
/**
|
||||
* Сервер ответил на рукопожатие, и мы можем считать его успешным
|
||||
*/
|
||||
this.handshakeFuture.complete(true);
|
||||
return;
|
||||
}
|
||||
if(messageType == 0xFF) {
|
||||
|
||||
if(messageType == (byte)0xFF) {
|
||||
/**
|
||||
* Сервер отклонил рукопожатие, и мы должны считать его неудачным
|
||||
*/
|
||||
@@ -45,6 +49,14 @@ public class SfuSock extends WebSocketClient {
|
||||
return;
|
||||
}
|
||||
|
||||
if(messageType == (byte)0xAE) {
|
||||
/**
|
||||
* Сервер отправил сообщение о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным
|
||||
*/
|
||||
this.connectionFuture.complete(true);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Если это не сообщение рукопожатия, то мы передаем его в установленного потребителя
|
||||
*/
|
||||
@@ -55,7 +67,7 @@ public class SfuSock extends WebSocketClient {
|
||||
|
||||
@Override
|
||||
public void onMessage(String message) {
|
||||
System.err.println("Received unexpected text message from SFU server: " + message);
|
||||
//System.err.println("Received unexpected text message from SFU server: " + message);
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -70,7 +82,7 @@ public class SfuSock extends WebSocketClient {
|
||||
|
||||
@Override
|
||||
public void onError(Exception ex){
|
||||
System.err.println("Error: " + ex.getMessage());
|
||||
//System.err.println("Error: " + ex.getMessage());
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -110,4 +122,32 @@ public class SfuSock extends WebSocketClient {
|
||||
public void setMessageConsumer(Consumer<ByteBuffer> onMessage) {
|
||||
this.onMessage = onMessage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Проверяет состояние соединения, если соединение активно и готово к обмену данными, то возвращает false
|
||||
* @return
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public boolean checkConnection() {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(1);
|
||||
/**
|
||||
* 0x08 - код проверки соединения в соотвествии с протоколом g365sfu
|
||||
*/
|
||||
buffer.put((byte)0xAE);
|
||||
buffer.flip();
|
||||
this.send(buffer);
|
||||
try {
|
||||
boolean result = this.connectionFuture.get(10, TimeUnit.SECONDS);
|
||||
this.connectionFuture = new CompletableFuture<>();
|
||||
return result;
|
||||
}catch(Exception e){
|
||||
/**
|
||||
* Сбрасываем handshakeFuture, так как соединение не активно,
|
||||
* и нам нужно будет пройти рукопожатие заново при следующей проверке соединения
|
||||
*/
|
||||
this.handshakeFuture = new CompletableFuture<>();
|
||||
this.connectionFuture = new CompletableFuture<>();
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user