Реализация звонков на сервере #15

Merged
Royce59 merged 19 commits from dev into main 2026-03-18 17:36:43 +00:00
27 changed files with 1929 additions and 3 deletions

3
.env
View File

@@ -18,3 +18,6 @@ FIREBASE_CREDENTIALS_PATH=serviceAccount.json
#Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений) #Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений)
BUFFER_CLEANUP_DAYS=7 BUFFER_CLEANUP_DAYS=7
#SFU Сервера
SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET

View File

@@ -17,6 +17,9 @@ import im.rosetta.executors.Executor21GroupLeave;
import im.rosetta.executors.Executor22GroupBan; import im.rosetta.executors.Executor22GroupBan;
import im.rosetta.executors.Executor24DeviceResolve; import im.rosetta.executors.Executor24DeviceResolve;
import im.rosetta.executors.Executor25Sync; import im.rosetta.executors.Executor25Sync;
import im.rosetta.executors.Executor26SignalPeer;
import im.rosetta.executors.Executor27WebRTC;
import im.rosetta.executors.Executor28IceServers;
import im.rosetta.executors.Executor3Search; import im.rosetta.executors.Executor3Search;
import im.rosetta.executors.Executor4OnlineState; import im.rosetta.executors.Executor4OnlineState;
import im.rosetta.executors.Executor6Message; import im.rosetta.executors.Executor6Message;
@@ -44,6 +47,9 @@ import im.rosetta.packet.Packet22GroupBan;
import im.rosetta.packet.Packet23DeviceList; import im.rosetta.packet.Packet23DeviceList;
import im.rosetta.packet.Packet24DeviceResolve; import im.rosetta.packet.Packet24DeviceResolve;
import im.rosetta.packet.Packet25Sync; import im.rosetta.packet.Packet25Sync;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.Packet27WebRTC;
import im.rosetta.packet.Packet28IceServers;
import im.rosetta.packet.Packet2Result; import im.rosetta.packet.Packet2Result;
import im.rosetta.packet.Packet3Search; import im.rosetta.packet.Packet3Search;
import im.rosetta.packet.Packet4OnlineSubscribe; import im.rosetta.packet.Packet4OnlineSubscribe;
@@ -53,6 +59,7 @@ import im.rosetta.packet.Packet7Read;
import im.rosetta.packet.Packet8Delivery; import im.rosetta.packet.Packet8Delivery;
import im.rosetta.packet.Packet9DeviceNew; import im.rosetta.packet.Packet9DeviceNew;
import im.rosetta.service.services.BufferCleanupService; import im.rosetta.service.services.BufferCleanupService;
import im.rosetta.service.services.ForwardUnitService;
import io.orprotocol.Server; import io.orprotocol.Server;
import io.orprotocol.Settings; import io.orprotocol.Settings;
import io.orprotocol.packet.PacketManager; import io.orprotocol.packet.PacketManager;
@@ -74,6 +81,7 @@ public class Boot {
private ClientManager clientManager; private ClientManager clientManager;
private OnlineManager onlineManager; private OnlineManager onlineManager;
private BufferCleanupService bufferCleanupService; private BufferCleanupService bufferCleanupService;
private ForwardUnitService forwardUnitService;
/** /**
* Конструктор по умолчанию, использует порт 3000 для сервера * Конструктор по умолчанию, использует порт 3000 для сервера
@@ -105,6 +113,7 @@ public class Boot {
int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ? int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ?
Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7; Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7;
this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger); this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger);
this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager);
} }
/** /**
@@ -151,6 +160,7 @@ public class Boot {
this.registerAllEvents(); this.registerAllEvents();
this.printBootMessage(); this.printBootMessage();
this.bufferCleanupService.start(); this.bufferCleanupService.start();
this.forwardUnitService.connectToAllSFUServers();
return this; return this;
}catch(Exception e){ }catch(Exception e){
this.logger.error(Color.RED + "Booting error, stack trace:"); this.logger.error(Color.RED + "Booting error, stack trace:");
@@ -194,6 +204,9 @@ public class Boot {
this.packetManager.registerPacket(23, Packet23DeviceList.class); this.packetManager.registerPacket(23, Packet23DeviceList.class);
this.packetManager.registerPacket(24, Packet24DeviceResolve.class); this.packetManager.registerPacket(24, Packet24DeviceResolve.class);
this.packetManager.registerPacket(25, Packet25Sync.class); this.packetManager.registerPacket(25, Packet25Sync.class);
this.packetManager.registerPacket(26, Packet26SignalPeer.class);
this.packetManager.registerPacket(27, Packet27WebRTC.class);
this.packetManager.registerPacket(28, Packet28IceServers.class);
} }
private void registerAllExecutors() { private void registerAllExecutors() {
@@ -215,6 +228,9 @@ public class Boot {
this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(22, new Executor22GroupBan());
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager));
this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager));
this.packetManager.registerExecutor(26, new Executor26SignalPeer(this.clientManager, this.forwardUnitService));
this.packetManager.registerExecutor(27, new Executor27WebRTC(this.forwardUnitService));
this.packetManager.registerExecutor(28, new Executor28IceServers(this.forwardUnitService));
} }
private void printBootMessage() { private void printBootMessage() {

View File

@@ -236,7 +236,7 @@ public abstract class Repository<T> {
* @param noResultType если true, то не указывать тип результата в запросе, используется для запросов типа UPDATE и DELETE * @param noResultType если true, то не указывать тип результата в запросе, используется для запросов типа UPDATE и DELETE
* @return список сущностей * @return список сущностей
*/ */
@SuppressWarnings("deprecation") @SuppressWarnings({"unchecked", "deprecation"})
public QuerySession<T> buildQuery(String queryString, HashMap<String, Object> parameters, boolean noResultType) { public QuerySession<T> buildQuery(String queryString, HashMap<String, Object> parameters, boolean noResultType) {
Session session = HibernateUtil.openSession(); Session session = HibernateUtil.openSession();
try { try {

View File

@@ -0,0 +1,79 @@
package im.rosetta.executors;
import im.rosetta.Failures;
import im.rosetta.client.ClientManager;
import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.runtime.NetworkSignalType;
import im.rosetta.service.services.ForwardUnitService;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
/**
* Используется в Peer To Peer звонках, в групповых звонках другой сигналинг
*/
public class Executor26SignalPeer extends PacketExecutor<Packet26SignalPeer> {
private ClientManager clientManager;
private ForwardUnitService fus;
public Executor26SignalPeer(ClientManager clientManager, ForwardUnitService fus) {
this.clientManager = clientManager;
this.fus = fus;
}
@Override
public void onPacketReceived(Packet26SignalPeer packet, Client client) throws Exception, ProtocolException {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
/**
* Если клиент не авторизован, то мы не будем обрабатывать его сигналы на анициализацию звонка
* и просто отключим его от сервера.
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
NetworkSignalType type = packet.getSignalType();
if(type == NetworkSignalType.CALL) {
/**
* Инициируется звонок от src к dst, проверяем, что dst не занят другим звонком, если занят, то отправляем сигнал END_CALL_BECAUSE_BUSY обратно src
*/
Room room = this.fus.getRoomByParticipantId(packet.getDst());
if(room != null) {
/**
* Получатель сигнала уже находится в другой комнате, значит он занят другим звонком, отправляем сигнал END_CALL_BECAUSE_BUSY обратно src
*/
Packet26SignalPeer responsePacket = new Packet26SignalPeer();
responsePacket.setSignalType(NetworkSignalType.END_CALL_BECAUSE_BUSY);
this.clientManager.sendPacketToAuthorizedPK(packet.getSrc(), responsePacket);
return;
}
}
if(type == NetworkSignalType.CREATE_ROOM){
/**
* Создается комната для звонка
*/
Room room = this.fus.createRoom();
room.addParticipant(packet.getSrc());
room.addParticipant(packet.getDst());
packet.setRoomId(room.getRoomId());
/**
* Результат создания комнаты транслируем обоим участникам, чтобы они могли начать обмен WebRTC SDP, и тд
*/
this.clientManager.sendPacketToAuthorizedPK(packet.getSrc(), packet);
this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet);
return;
}
/**
* TODO: Проверка на существование получателя
*/
this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet);
/**
* TODO: Высокоприоритетный пуш для сигналов звонков, чтобы мобильные устройства могли показать
* интерфейс входящего звонка, даже если приложение находится в фоне
*/
}
}

View File

@@ -0,0 +1,73 @@
package im.rosetta.executors;
import im.rosetta.Failures;
import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.packet.Packet27WebRTC;
import im.rosetta.packet.runtime.NetworkWebRTCType;
import im.rosetta.service.services.ForwardUnitService;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
public class Executor27WebRTC extends PacketExecutor<Packet27WebRTC> {
private ForwardUnitService fus;
public Executor27WebRTC(ForwardUnitService fus) {
this.fus = fus;
}
@Override
public void onPacketReceived(Packet27WebRTC packet, Client client) throws Exception, ProtocolException {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
/**
* Если клиент не авторизован, то мы не будем обрабатывать его сигналы на инициализацию звонка
* и просто отключим его от сервера.
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
String publicKey = eciAuthentificate.getPublicKey();
/**
* Так как в комнатах Participants это публичные ключи пользователей, то мы можем
* найти комнату, в которой находится пользователь, по его публичному ключу
*/
Room room = this.fus.getRoomByParticipantId(publicKey);
if(room == null) {
/**
* Если комната не найдена, то мы не будем обрабатывать сигналы для звонка
* и просто отключим клиента от сервера.
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
NetworkWebRTCType type = packet.getType();
if(type == NetworkWebRTCType.OFFER) {
/**
* Если это OFFER, то отправляем OFFER на сервер SFU,
* который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя
*/
room.sdpOffer(publicKey, packet.getSdpOrCandidate());
}
if(type == NetworkWebRTCType.ICE_CANDIDATE) {
/**
* Если это ICE кандидат, то отправляем его на сервер SFU,
* который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя
*/
room.iceCandidate(publicKey, packet.getSdpOrCandidate());
}
if(type == NetworkWebRTCType.ANSWER) {
/**
* Если это ANSWER, то отправляем его на сервер SFU,
* который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя
*/
room.sdpAnswer(publicKey, packet.getSdpOrCandidate());
}
}
}

View File

@@ -0,0 +1,39 @@
package im.rosetta.executors;
import java.util.ArrayList;
import im.rosetta.Failures;
import im.rosetta.client.tags.ECIAuthentificate;
import im.rosetta.packet.Packet28IceServers;
import im.rosetta.service.services.ForwardUnitService;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
public class Executor28IceServers extends PacketExecutor<Packet28IceServers> {
private ForwardUnitService fus;
public Executor28IceServers(ForwardUnitService fus) {
this.fus = fus;
}
@Override
public void onPacketReceived(Packet28IceServers packet, Client client) throws Exception, ProtocolException {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
/**
* Если клиент не авторизован, то мы не будем обрабатывать его запрос на получение ICE серверов
* и просто отключим его от сервера.
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
/**
* Берем TURN сервера и отправляем их клиенту
*/
packet.setIceServers(new ArrayList<>(this.fus.getTurnServers()));
client.send(packet);
}
}

View File

@@ -0,0 +1,153 @@
package im.rosetta.packet;
import im.rosetta.packet.runtime.NetworkSignalType;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Пакет cигналинга для совершения звонка. Учавствует в обмене ключами,
* иницилизации звонка.
*/
public class Packet26SignalPeer extends Packet {
/**
* Идентификатор отправителя сигнала, обычно это PK пользователя, который отправляет пакет
*/
private String src;
/**
* Идентификатор получателя сигнала, обычно это PK пользователя, который должен принять пакет
*/
private String dst;
/**
* Если сигнал предназначен для обмена ключами, то в это поле
* будет помещаться sharedPublic публичная часть ключа DH алгоритма
*/
private String sharedPublic;
/**
* Тип сигнала
*/
private NetworkSignalType signalType;
/**
* Идентификатор комнаты, в которой происходит звонок, заполняется если тип сигнала CREATE_ROOM, иначе null
*/
private String roomId;
@Override
public void read(Stream stream) {
this.signalType = NetworkSignalType.fromCode(stream.readInt8());
if(this.signalType == NetworkSignalType.END_CALL_BECAUSE_BUSY || this.signalType == NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED) {
return;
}
this.src = stream.readString();
this.dst = stream.readString();
if (signalType == NetworkSignalType.KEY_EXCHANGE) {
this.sharedPublic = stream.readString();
}
if(signalType == NetworkSignalType.CREATE_ROOM) {
this.roomId = stream.readString();
}
}
@Override
public Stream write() {
Stream stream = new Stream();
stream.writeInt16(this.packetId);
stream.writeInt8(this.signalType.getCode());
if(this.signalType == NetworkSignalType.END_CALL_BECAUSE_BUSY || this.signalType == NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED) {
return stream;
}
stream.writeString(this.src);
stream.writeString(this.dst);
if (signalType == NetworkSignalType.KEY_EXCHANGE) {
stream.writeString(this.sharedPublic);
}
if(signalType == NetworkSignalType.CREATE_ROOM) {
stream.writeString(this.roomId);
}
return stream;
}
/**
* Получить идентификатор отправителя сигнала
* @return идентификатор отправителя сигнала
*/
public String getSrc() {
return src;
}
/**
* Установить идентификатор отправителя сигнала
* @param src идентификатор отправителя сигнала
*/
public void setSrc(String src) {
this.src = src;
}
/**
* Получить идентификатор получателя сигнала
* @return идентификатор получателя сигнала
*/
public String getDst() {
return dst;
}
/**
* Установить идентификатор получателя сигнала
* @param dst идентификатор получателя сигнала
*/
public void setDst(String dst) {
this.dst = dst;
}
/**
* Получить публичную часть ключа DH алгоритма, если сигнал предназначен для обмена ключами
* @return публичная часть ключа DH алгоритма или null, если сигнал не предназначен для обмена ключами
*/
public String getSharedPublic() {
return sharedPublic;
}
/**
* Установить публичную часть ключа DH алгоритма, если сигнал предназначен для обмена ключами
* @param sharedPublic публичная часть ключа DH алгоритма
*/
public void setSharedPublic(String sharedPublic) {
this.sharedPublic = sharedPublic;
}
/**
* Получить тип сигнала
* @return тип сигнала
*/
public NetworkSignalType getSignalType() {
return signalType;
}
/**
* Установить тип сигнала
* @param signalType тип сигнала
*/
public void setSignalType(NetworkSignalType signalType) {
this.signalType = signalType;
}
/**
* Получить идентификатор созданной комнаты, если тип сигнала CREATE_ROOM
* @return идентификатор комнаты, если тип сигнала CREATE_ROOM, иначе null
*/
public String getRoomId() {
return roomId;
}
/**
* Установить идентификатор комнаты, в которой происходит звонок, если тип сигнала CREATE_ROOM
* @param roomId идентификатор комнаты, если тип сигнала CREATE_ROOM
*/
public void setRoomId(String roomId) {
this.roomId = roomId;
}
}

View File

@@ -0,0 +1,65 @@
package im.rosetta.packet;
import im.rosetta.packet.runtime.NetworkWebRTCType;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
public class Packet27WebRTC extends Packet {
/**
* SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения
*/
private String sdpOrCandidate;
/**
* Тип сообщения WebRTC
*/
private NetworkWebRTCType type;
@Override
public void read(Stream stream) {
this.type = NetworkWebRTCType.fromCode(stream.readInt8());
this.sdpOrCandidate = stream.readString();
}
@Override
public Stream write() {
Stream steram = new Stream();
steram.writeInt16(this.packetId);
steram.writeInt8(this.type.getCode());
steram.writeString(this.sdpOrCandidate);
return steram;
}
/**
* Получить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения
* @return SDP оффер/answer или ICE кандидат
*/
public String getSdpOrCandidate() {
return sdpOrCandidate;
}
/**
* Получить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом
* @return тип сообщения WebRTC
*/
public NetworkWebRTCType getType() {
return type;
}
/**
* Установить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения
* @param sdpOrCandidate SDP оффер/answer или ICE кандидат
*/
public void setSdpOrCandidate(String sdpOrCandidate) {
this.sdpOrCandidate = sdpOrCandidate;
}
/**
* Установить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом
* @param type тип сообщения WebRTC
*/
public void setType(NetworkWebRTCType type) {
this.type = type;
}
}

View File

@@ -0,0 +1,58 @@
package im.rosetta.packet;
import java.util.ArrayList;
import java.util.List;
import io.g365sfu.webrtc.RTCIceServer;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
public class Packet28IceServers extends Packet {
private List<RTCIceServer> iceServers;
@Override
public void read(Stream stream) {
int count = stream.readInt16();
this.iceServers = new ArrayList<>();
for (int i = 0; i < count; i++) {
String url = stream.readString();
String username = stream.readString();
String credential = stream.readString();
String transport = stream.readString();
RTCIceServer iceServer = new RTCIceServer(url, username, credential, transport);
iceServers.add(iceServer);
}
}
@Override
public Stream write() {
Stream stream = new Stream();
stream.writeInt16(this.packetId);
stream.writeInt16(iceServers.size());
for (RTCIceServer iceServer : iceServers) {
stream.writeString(iceServer.getUrl());
stream.writeString(iceServer.getUsername());
stream.writeString(iceServer.getCredential());
stream.writeString(iceServer.getTransport());
}
return stream;
}
/**
* Получить список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
* @return список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
*/
public List<RTCIceServer> getIceServers() {
return iceServers;
}
/**
* Установить список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
* @param iceServers список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
*/
public void setIceServers(List<RTCIceServer> iceServers) {
this.iceServers = iceServers;
}
}

View File

@@ -0,0 +1,54 @@
package im.rosetta.packet.runtime;
/**
* Типы сигналов, используемых в сетевом взаимодействии при звонках
*/
public enum NetworkSignalType {
/**
* Сигнал для совершения звонка, инициирует процесс звонка
*/
CALL(0),
/**
* Сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка
*/
KEY_EXCHANGE(1),
/**
* Сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными
*/
ACTIVE_CALL(2),
/**
* Сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными
*/
END_CALL(3),
/**
* Создание комнаты
*/
CREATE_ROOM(4),
/**
* Обрыв связи с пиром
*/
END_CALL_BECAUSE_PEER_DISCONNECTED(5),
/**
* Не удалось дозвониться - пользователь занят другим звонком
*/
END_CALL_BECAUSE_BUSY(6);
private final int code;
NetworkSignalType(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static NetworkSignalType fromCode(int code) {
for (NetworkSignalType type : NetworkSignalType.values()) {
if (type.code == code) {
return type;
}
}
throw new IllegalArgumentException("Unknown NetworkSignalType code: " + code);
}
}

View File

@@ -0,0 +1,36 @@
package im.rosetta.packet.runtime;
public enum NetworkWebRTCType {
/**
* Оффер
*/
OFFER(0),
/**
* Ответ на оффер
*/
ANSWER(1),
/**
* ICE кандидат
*/
ICE_CANDIDATE(2);
private final int code;
NetworkWebRTCType(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static NetworkWebRTCType fromCode(int code) {
for (NetworkWebRTCType type : NetworkWebRTCType.values()) {
if (type.code == code) {
return type;
}
}
throw new IllegalArgumentException("Unknown NetworkWebRTCType code: " + code);
}
}

View File

@@ -0,0 +1,259 @@
package im.rosetta.service.services;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import im.rosetta.client.ClientManager;
import im.rosetta.logger.Logger;
import im.rosetta.logger.enums.Color;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.Packet27WebRTC;
import im.rosetta.packet.runtime.NetworkSignalType;
import im.rosetta.packet.runtime.NetworkWebRTCType;
import io.g365sfu.Room;
import io.g365sfu.SFU;
import io.g365sfu.net.DisconnectReason;
import io.g365sfu.net.DisconnectedPeer;
import io.g365sfu.webrtc.ICECandidate;
import io.g365sfu.webrtc.RTCIceServer;
import io.g365sfu.webrtc.SDPAnswer;
import io.g365sfu.webrtc.SDPOffer;
import io.orprotocol.ProtocolException;
/**
* Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями.
*/
public class ForwardUnitService {
private Logger logger;
private Set<SFU> sfuConnections = ConcurrentHashMap.newKeySet();
private ClientManager clientManager;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
public ForwardUnitService(Logger logger, ClientManager clientManager) {
this.logger = logger;
this.clientManager = clientManager;
this.sfuConnectionsSheduler();
}
/**
* Каждые 10 секунд проверяет доступность всех SFU серверов, и если сервер недоступен, то удаляет его из
* пула доступных серверов для организации звонков.
* Проверка доступности сервера осуществляется через отправку специального сообщения проверки соединения,
* и ожидание ответа от сервера. Если ответ не приходит в течение 10 секунд, то сервер считается недоступным.
*
* Так же, если sfuConnections пустой, то мы стараемся установить соединение со всеми серверами еще раз.
*/
private void sfuConnectionsSheduler() {
this.scheduler.scheduleAtFixedRate(() -> {
for(SFU sfu : this.sfuConnections) {
try{
if(!sfu.getConnection().checkConnection()){
this.logger.error("Server " + sfu.getServerAddress() + " not responding");
this.sfuConnections.remove(sfu);
}
}catch(Exception e) {
this.logger.error("Failed to check connection to SFU server: " + sfu.getServerAddress() + ", error: " + e.getMessage());
this.sfuConnections.remove(sfu);
}
}
if(this.sfuConnections.isEmpty()) {
this.connectToAllSFUServers();
}
}, 10, 10, TimeUnit.SECONDS);
}
/**
* Инициализирует соединения к SFU серверам для звонков.
* Ожидается, что адреса SFU серверов и секретные ключи для них будут переданы через переменную окружения SFU_SERVERS в формате "address1@secretKey1,address2@secretKey2,...".
* Для каждого сервера будет предпринята попытка установить соединение и выполнить рукопожатие.
* Если соединение не может быть установлено или рукопожатие не удается,
* будет выведено сообщение об ошибке в лог, но процесс продолжится для остальных серверов.
* Успешные подключения будут сохранены в наборе sfuConnections для дальнейшего использования
*/
public void connectToAllSFUServers() {
String sfuServersEnv = System.getenv("SFU_SERVERS");
if(sfuServersEnv == null || sfuServersEnv.isEmpty()) {
this.logger.info(Color.YELLOW + "No SFU servers configured, skipping SFU connections boot");
return;
}
String[] sfuServers = sfuServersEnv.split(",");
for(String sfuServer : sfuServers) {
String[] parts = sfuServer.split("@");
if(parts.length != 2) {
this.logger.error(Color.RED + "Invalid SFU server configuration: " + sfuServer);
continue;
}
String address = parts[0];
String secretKey = parts[1];
try {
SFU connection = new SFU(address, secretKey);
connection.connect();
connection.setIceConsumer(arg0 -> {
try {
onIceCandidate(arg0);
} catch (ProtocolException e) {
this.logger.error(Color.RED + "Failed to retranslate ICE-candidate from SFU server: " + address + ", error: " + e.getMessage());
e.printStackTrace();
}
});
connection.setAnswerConsumer(arg0 -> {
try{
onSdpAnswer(arg0);
}catch(ProtocolException e){
this.logger.error(Color.RED + "Failed to retranslate SDP answer from SFU server: " + address + ", error: " + e.getMessage());
e.printStackTrace();
}
});
connection.setOfferConsumer(arg0 -> {
try{
onSdpOffer(arg0);
}catch(ProtocolException e){
this.logger.error(Color.RED + "Failed to retranslate SDP offer from SFU server: " + address + ", error: " + e.getMessage());
e.printStackTrace();
}
});
connection.setPeerDisconnectedConsumer(arg0 -> {
try{
onPeerDisconnected(arg0);
}catch(ProtocolException e){
this.logger.error(Color.RED + "Failed to handle peer disconnection from SFU server: " + address + ", error: " + e.getMessage());
e.printStackTrace();
}
});
this.sfuConnections.add(connection);
this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address);
} catch (Exception e) {
//this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage());
}
}
}
public void onPeerDisconnected(DisconnectedPeer disconnectedPeer) throws ProtocolException {
Room room = disconnectedPeer.getRoom();
if(disconnectedPeer.getReason() != DisconnectReason.FAILED){
/**
* Если у нас произошло штатное отключение, а не в результате обрыва связи - то не нужно отправлять
* оппонентам пакеты о том, что участник отключился в результате обрыва связи.
*/
return;
}
for(String peerId : room.getParticipants()) {
/**
* Уведомляем все пиры, что соединение с пиром было потеряно
*/
if(room.getParticipants().size() == 1) {
/**
* Звонок был завершен, так как в комнате остался только один участник, который не может продолжать звонок в одиночку.
*/
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED);
this.clientManager.sendPacketToAuthorizedPK(peerId, packet);
}
}
}
/**
* Выполняется когда сервер SFU отправляет SDP answer для одного из участников комнаты.
* @param sdpAnswer объект SDPAnswer, который содержит информацию о комнате, участнике и самом answer,
* @throws ProtocolException
*/
public void onSdpAnswer(SDPAnswer sdpAnswer) throws ProtocolException {
String participantId = sdpAnswer.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
packet.setSdpOrCandidate(sdpAnswer.getSdp());
packet.setType(NetworkWebRTCType.ANSWER);
this.clientManager.sendPacketToAuthorizedPK(participantId, packet);
}
/**
* Выполняется когда сервер SFU отправляет ICE-кандидата для одного из участников комнаты.
* @param iceCandidate объект ICECandidate,
* который содержит информацию о комнате, участнике и самом кандидате,
* которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU.
* @throws ProtocolException
*/
public void onIceCandidate(ICECandidate iceCandidate) throws ProtocolException {
String publicKey = iceCandidate.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
packet.setSdpOrCandidate(iceCandidate.getCandidate());
packet.setType(NetworkWebRTCType.ICE_CANDIDATE);
this.clientManager.sendPacketToAuthorizedPK(publicKey, packet);
}
/**
* Выполняется когда сервер SFU отправляет SDP оффер для одного из участников комнаты.
* @param sdpOffer объект SDPOffer,
* который содержит информацию о комнате, участнике и самом оффере,
* которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU.
* @throws ProtocolException
*/
public void onSdpOffer(SDPOffer sdpOffer) throws ProtocolException {
String participantId = sdpOffer.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
packet.setSdpOrCandidate(sdpOffer.getSdp());
packet.setType(NetworkWebRTCType.OFFER);
this.clientManager.sendPacketToAuthorizedPK(participantId, packet);
}
/**
* Получает комнату в которой сейчас находится пользователь
* @param participantId идентификатор пользователя на сервере SFU
* @return комната Room если найдена, иначе null
*/
public Room getRoomByParticipantId(String participantId) {
for(SFU sfu : this.sfuConnections){
Room room = sfu.getRoomByParticipantId(participantId);
if(room != null){
return room;
}
}
return null;
}
/**
* Автоматически выбирает сервер для создания комнаты, и создает в нем комнату
* @return комната
*/
public Room createRoom() {
if(this.sfuConnections.isEmpty()) {
return null;
}
SFU selectedSfu = null;
int minRooms = Integer.MAX_VALUE;
for(SFU sfu : this.sfuConnections) {
int roomsCount = sfu.getRoomsCount();
if(roomsCount < minRooms) {
minRooms = roomsCount;
selectedSfu = sfu;
}
}
if(selectedSfu != null) {
try{
return selectedSfu.createRoom();
}catch(Exception e){
this.logger.error(Color.RED + "Failed to create room on SFU server: " + selectedSfu.getServerAddress() + ", error: " + e.getMessage());
}
}
return null;
}
/**
* Получить список TURN серверов, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU.
* @return список серверов для RTC
*/
public Set<RTCIceServer> getTurnServers() {
Set<RTCIceServer> iceServers = new HashSet<>();
for(SFU sfu : this.sfuConnections) {
Set<RTCIceServer> iceServersSupporetd = sfu.getIceServers();
iceServers.addAll(iceServersSupporetd);
}
return iceServers;
}
}

View File

@@ -0,0 +1,160 @@
package io.g365sfu;
import java.nio.ByteBuffer;
import java.util.HashSet;
import io.g365sfu.net.Outgoing;
/**
* Это комната для звонков, она может быть как для двоих участников, так и для групповых звонков.
* Комната содержит в себе информацию о том, на каком она SFU сервере, какой у нее ID, кто в ней участвует.
*/
public class Room {
private String roomId;
private SFU sfu;
private HashSet<String> participants;
/**
* Создать комнату с заданным ID, SFU сервером и участниками
* @param roomId уникальный идентификатор комнаты, который должен быть согласован с SFU сервером и использоваться для всех операций с этой комнатой
* @param sfu SFU сервер, на котором будет создана комната и через который будет происходить обмен данными между участниками
* @param participants массив идентификаторов участников, которые будут добавлены в комнату. Идентификаторы должны быть согласованы с SFU сервером и использоваться для маршрутизации данных между участниками.
*/
public Room(String roomId, SFU sfu, HashSet<String> participants) {
this.roomId = roomId;
this.sfu = sfu;
this.participants = participants;
}
public String getRoomId() {
return roomId;
}
public SFU getSfu() {
return sfu;
}
public HashSet<String> getParticipants() {
return participants;
}
public void addParticipant(String participantId) {
this.participants.add(participantId);
}
public void removeParticipant(String participantId) {
this.participants.remove(participantId);
}
public boolean containsParticipant(String participantId) {
return this.participants.contains(participantId);
}
/**
* Отправляет SDP offer в SFU сервер для организации звонка между пользователями.
* Этот метод используется для отправки предложения о соединении от одного участника
* к другому через сервер SFU, который будет пересылать это предложение целевому участнику.
* Параметры roomId и peerId используются для идентификации комнаты и целевого участника,
* а sdpOffer содержит описание медиа-сессии, которую участник хочет установить.
* @param roomId идентификатор комнаты, в которой участник хочет организовать звонок
* @param peerId идентификатор целевого участника, которому отправляется предложение о соединении
* @param sdpOffer строка, содержащая SDP offer, которая описывает медиа-сессию, которую участник хочет установить с целевым участником.
* @internal Этот метод формирует пакет с кодом 0x04, за которым следует идентификатор комнаты, идентификатор целевого участника и строка SDP offer.
*/
public void sdpOffer(String participantId, String sdpOffer) {
/**
* 1 байт номер пакета,
* 4 байта длина ID комнаты,
* N байт ID комнаты,
* 4 байта длина ID участника,
* M байт ID участника,
* 4 байта длина SDP offer,
* K байт SDP offer
*/
ByteBuffer buffer = ByteBuffer.allocate(
1 + 4 +
this.roomId.getBytes().length + 4 +
participantId.getBytes().length + 4 +
sdpOffer.getBytes().length);
/**
* 0x03 - SDP offer
*/
buffer.put(Outgoing.SDP_OFFER_RETRANSLATE);
buffer.putInt(this.roomId.getBytes().length);
buffer.put(this.roomId.getBytes());
buffer.putInt(participantId.getBytes().length);
buffer.put(participantId.getBytes());
buffer.putInt(sdpOffer.getBytes().length);
buffer.put(sdpOffer.getBytes());
buffer.flip();
this.sfu.getConnection().send(buffer);
}
/**
* Отправляет ICE-кандидата в SFU сервер для одного из участников комнаты.
* @param participantId участник комнаты
* @param iceCandidate кандидат
*/
public void iceCandidate(String participantId, String iceCandidate) {
/**
* 1 байт номер пакета,
* 4 байта длина ID комнаты,
* N байт ID комнаты,
* 4 байта длина ID участника,
* M байт ID участника,
* 4 байта длина ICE кандидата,
* K байт ICE кандидата
*/
ByteBuffer buffer = ByteBuffer.allocate(
1 + 4 +
this.roomId.getBytes().length + 4 +
participantId.getBytes().length + 4 +
iceCandidate.getBytes().length);
/**
* 0x06 - ICE кандидат
*/
buffer.put(Outgoing.ICE_CANDIDATE_RETRANSLATE);
buffer.putInt(this.roomId.getBytes().length);
buffer.put(this.roomId.getBytes());
buffer.putInt(participantId.getBytes().length);
buffer.put(participantId.getBytes());
buffer.putInt(iceCandidate.getBytes().length);
buffer.put(iceCandidate.getBytes());
buffer.flip();
this.sfu.getConnection().send(buffer);
}
/**
* Отправляет SDP answer в SFU сервер для одного из участников комнаты.
* @param participantId участник комнаты
* @param sdpAnswer SDP answer
*/
public void sdpAnswer(String participantId, String sdpAnswer) {
/**
* 1 байт номер пакета,
* 4 байта длина ID комнаты,
* N байт ID комнаты,
* 4 байта длина ID участника,
* M байт ID участника,
* 4 байта длина SDP answer,
* K байт SDP answer
*/
ByteBuffer buffer = ByteBuffer.allocate(
1 + 4 +
this.roomId.getBytes().length + 4 +
participantId.getBytes().length + 4 +
sdpAnswer.getBytes().length);
/**
* 0x07 - SDP answer
*/
buffer.put(Outgoing.SDP_ANSWER_RETRANSLATE);
buffer.putInt(this.roomId.getBytes().length);
buffer.put(this.roomId.getBytes());
buffer.putInt(participantId.getBytes().length);
buffer.put(participantId.getBytes());
buffer.putInt(sdpAnswer.getBytes().length);
buffer.put(sdpAnswer.getBytes());
buffer.flip();
this.sfu.getConnection().send(buffer);
}
}

View File

@@ -0,0 +1,415 @@
package io.g365sfu;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;
import io.g365sfu.exception.SFUException;
import io.g365sfu.exception.SFUHandshakeException;
import io.g365sfu.net.DisconnectReason;
import io.g365sfu.net.DisconnectedPeer;
import io.g365sfu.net.Incoming;
import io.g365sfu.net.Outgoing;
import io.g365sfu.net.SfuSock;
import io.g365sfu.util.StrUtils;
import io.g365sfu.webrtc.ICECandidate;
import io.g365sfu.webrtc.RTCIceServer;
import io.g365sfu.webrtc.SDPAnswer;
import io.g365sfu.webrtc.SDPOffer;
public class SFU {
private String serverAddress;
private String secretKey;
private SfuSock socket;
private HashMap<String, CompletableFuture<String>> pendingRoomCreations = new HashMap<>();
/**
* Комнаты которые принадлежат этому серверу SFU. Ключом является ID комнаты,
* а значением объект Room, который содержит информацию о комнате, ее участниках и связанном с ней сервере SFU.
*/
private HashMap<String, Room> rooms = new HashMap<>();
/**
* Потребитель для обработки входящих ICE-кандидатов от сервера SFU.
* Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x04,
* содержащим информацию об ICE-кандидате для одного из участников комнаты.
*/
private Consumer<ICECandidate> onIceCandidate;
/**
* Потребитель для обработки входящих SDP Answer от сервера SFU.
* Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x05,
* содержащим информацию об SDP Answer для одного из участников комнаты.
*/
private Consumer<SDPAnswer> onSdpAnswer;
private Consumer<SDPOffer> onSdpOffer;
/**
* Потребитель для обработки сообщений от сервера SFU о том, что комната была удалена.
* Передает ID удаленной комнаты, чтобы клиент мог удалить ее из своего списка комнат и прекратить попытки взаимодействия с ней.
*/
private Consumer<String> onDeleteRoom;
/**
* Потребитель для обработки сообщений от сервера SFU о том, что участник отключился от комнаты.
* Передает объект DisconnectedPeer, который содержит информацию об отключившемся участнике и комнате, от которой он отключился
*/
private Consumer<DisconnectedPeer> onPeerDisconnected;
/**
* TURN сервер предоставляемый SFU (если включен), может поддерживать udp,tcp протоколы (несколько ice)
*/
private Set<RTCIceServer> iceServers = new HashSet<>();
/**
* Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером.
* @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080"
* @param secretKey секретный ключ для аутентификации с SFU сервером, который должен быть согласован с настройками сервера.
*/
public SFU(String serverAddress, String secretKey) {
this.serverAddress = serverAddress;
this.secretKey = secretKey;
}
/**
* Установить соединение с SFU сервером и начать обмен рукопожатиями для аутентификации и установления безопасного канала связи.
* @throws URISyntaxException если адрес сервера имеет неправильный формат
* @throws InterruptedException если соединение было прервано во время попытки подключения
* @throws SFUException если не удалось установить соединение с SFU сервером или если соединение было установлено,
* но не открыто после подключения
* @throws TimeoutException не удалось обменяться рукопожатиями с SFU сервером в течение 30 секунд
* @throws ExecutionException если во время обмена рукопожатиями произошла ошибка выполнения
* @throws SFUHandshakeException если обмен рукопожатиями с SFU завершился неудачно (например, плохой ключ)
*/
public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException {
this.socket = new SfuSock(this.serverAddress);
this.socket.setMessageConsumer(this::onMessage);
boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS);
if(!connected){
throw new SFUException("Failed to connect to SFU server, read time out: " + this.serverAddress);
}
if(!this.socket.isOpen()) {
throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open");
}
boolean estabilished = this.socket.handshakeExchange(this.secretKey).get(30, TimeUnit.SECONDS);
if(!estabilished) {
throw new SFUHandshakeException("Failed to establish handshake with SFU server at " + this.serverAddress);
}
/**
* Спрашиваем про TURN
*/
this.askTurnServer();
}
private void onMessage(ByteBuffer message) {
if(message.remaining() < 1) {
System.err.println("Received empty message from SFU server");
return;
}
byte packetId = message.get(0);
if(packetId == Incoming.ROOM_CREATE) {
/**
* Ответ на создание комнаты, который содержит ID созданной комнаты
*/
int roomIdLength = message.getInt();
byte[] roomIdBytes = new byte[roomIdLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
CompletableFuture<String> future = this.pendingRoomCreations.remove(roomId);
if(future != null) {
future.complete(roomId);
}
return;
}
if(packetId == Incoming.ICE_CANDIDATE) {
/**
* ICE-candidate от сервера SFU для одного из участников комнаты
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int candidateLength = message.getInt();
byte[] candidateBytes = new byte[candidateLength];
message.get(candidateBytes);
String candidate = new String(candidateBytes).trim();
ICECandidate iceCandidate = new ICECandidate(roomId, peerId, candidate);
if(this.onIceCandidate != null) {
this.onIceCandidate.accept(iceCandidate);
}
return;
}
if(packetId == Incoming.SDP_ANSWER) {
/**
* Ответ на Offer от сервера SFU, который содержит SDP Answer
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int sdpAnswerLength = message.getInt();
byte[] sdpAnswerBytes = new byte[sdpAnswerLength];
message.get(sdpAnswerBytes);
String sdpAnswer = new String(sdpAnswerBytes).trim();
SDPAnswer answer = new SDPAnswer(roomId, peerId, sdpAnswer);
if(this.onSdpAnswer != null) {
this.onSdpAnswer.accept(answer);
}
return;
}
if(packetId == Incoming.SDP_OFFER) {
/**
* Offer от сервера SFU для одного из участников комнаты при renegotiation
*/
int roomidLength = message.getInt();
byte[] roomIdBytes = new byte[roomidLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
int sdpOfferLength = message.getInt();
byte[] sdpOfferBytes = new byte[sdpOfferLength];
message.get(sdpOfferBytes);
String sdpOffer = new String(sdpOfferBytes).trim();
SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer);
if(this.onSdpOffer != null) {
this.onSdpOffer.accept(offer);
}
return;
}
if(packetId == Incoming.ROOM_DELETE) {
int roomIdLength = message.getInt();
byte[] roomIdBytes = new byte[roomIdLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
this.rooms.remove(roomId);
if(this.onDeleteRoom != null) {
this.onDeleteRoom.accept(roomId);
}
}
if(packetId == Incoming.PEER_DISCONNECTED) {
int roomIdLength = message.getInt();
byte[] roomIdBytes = new byte[roomIdLength];
message.get(roomIdBytes);
String roomId = new String(roomIdBytes).trim();
int peerIdLength = message.getInt();
byte[] peerIdBytes = new byte[peerIdLength];
message.get(peerIdBytes);
String peerId = new String(peerIdBytes).trim();
Room room = this.rooms.get(roomId);
DisconnectReason reason = io.g365sfu.net.DisconnectReason.fromCode(message.getInt());
if(room != null) {
/**
* Если такая комната существует то удаляем оттуда участника
*/
room.removeParticipant(peerId);
}
DisconnectedPeer disconnectedPeer = new DisconnectedPeer(peerId, roomId, room, reason);
if(this.onPeerDisconnected != null) {
this.onPeerDisconnected.accept(disconnectedPeer);
}
}
if(packetId == Incoming.TURN_SERVER) {
int urlLength = message.getInt();
byte[] urlBytes = new byte[urlLength];
message.get(urlBytes);
String url = new String(urlBytes).trim();
int usernameLength = message.getInt();
byte[] usernameBytes = new byte[usernameLength];
message.get(usernameBytes);
String username = new String(usernameBytes).trim();
int credentialLength = message.getInt();
byte[] credentialBytes = new byte[credentialLength];
message.get(credentialBytes);
String credential = new String(credentialBytes).trim();
int transportLength = message.getInt();
byte[] transportBytes = new byte[transportLength];
message.get(transportBytes);
String transport = new String(transportBytes).trim();
this.iceServers.add(new RTCIceServer(url, username, credential, transport));
}
}
/**
* Получить адрес SFU сервера, к которому установлено соединение
* @return адрес SFU сервера
*/
public String getServerAddress() {
return this.serverAddress;
}
/**
* Получить соединение к SFU серверу, если оно было установлено
* @return объект SfuSock, представляющий соединение к SFU серверу
*/
public SfuSock getConnection() {
return this.socket;
}
/**
* Проверить, установлено ли соединение с SFU сервером и открыто ли оно
* @return true, если соединение установлено и открыто, false в противном случае
*/
public boolean isOpen() {
return this.socket != null && this.socket.isOpen();
}
/**
* Создает комнату на сервере SFU для организации звонков между пользователями. Комната автоматически удаляется
* при выходе последнего участника из нее. Внутри комнаты пользователи могут обмениваться аудио и видео потоками, а сервер SFU
* будет эффективно их пересылать между участниками, минимизируя задержки и оптимизируя использование пропускной способности.
* @throws TimeoutException
* @throws ExecutionException
* @throws InterruptedException
* @internal Этот метод формирует пакет с кодом 0x02,
* за которым следует случайно сгенерированный идентификатор комнаты,
* и отправляет его на сервер SFU.
*/
public Room createRoom() throws InterruptedException, ExecutionException, TimeoutException {
String roomId = StrUtils.randomString(64);
/**
* 1 байт номер пакета, 4 байта длина ID комнаты, N байт ID комнаты
*/
ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + roomId.getBytes().length);
CompletableFuture<String> future = new CompletableFuture<>();
this.pendingRoomCreations.put(roomId, future);
/**
* 0x02 - создание комнаты
*/
buffer.put(Outgoing.ROOM_CREATE);
buffer.putInt(roomId.getBytes().length);
buffer.put(roomId.getBytes());
buffer.flip();
this.socket.send(buffer);
String createdRoomId = future.get(30, TimeUnit.SECONDS);
Room room = new Room(createdRoomId, this, new HashSet<>());
this.rooms.put(createdRoomId, room);
return room;
}
/**
* После успешного установления соединения и обменом handshake нужно узнать, поддерживает ли наш SFU встроенный TURN
*/
public void askTurnServer() {
ByteBuffer buffer = ByteBuffer.allocate(1);
buffer.put(Outgoing.ASK_TURN);
buffer.flip();
this.socket.send(buffer);
}
/**
* Получить все комнаты на сервере
* @return комнаты на этом сервере
*/
public HashSet<Room> getRooms() {
return new HashSet<>(this.rooms.values());
}
/**
* Получить комнату по ее идентификатору
* @param roomId идентификатор комнаты
* @return объект Room, представляющий комнату с данным идентификатором, или null, если комната не найдена
*/
public Room getRoom(String roomId) {
return this.rooms.get(roomId);
}
/**
* Получить комнату, в которой участвует пользователь с данным идентификатором
* @param participantId идентификатор пользователя, который является участником комнаты
* @return объект Room, представляющий комнату, в которой участвует пользователь с данным идентификатором, или null, если такой комнаты не найдено
*/
public Room getRoomByParticipantId(String participantId) {
for(Room room : this.rooms.values()) {
if(room.containsParticipant(participantId)) {
return room;
}
}
return null;
}
/**
* Получает количество комнат на этом сервере
* @return возвращает количество комнат на сервере
*/
public int getRoomsCount() {
return this.rooms.size();
}
/**
* Устанавливает потребителя для обработки входящих ICE-кандидатов от сервера SFU.
* @param onIceCandidate потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x04,
* содержащим информацию об ICE-кандидате для одного из участников комнаты.
* Параметром будет объект ICECandidate, который содержит информацию о комнате, участнике и самом
* кандидате, необходимую для правильной маршрутизации данных между участниками звонка через сервер SFU.
*/
public void setIceConsumer(Consumer<ICECandidate> onIceCandidate) {
this.onIceCandidate = onIceCandidate;
}
/**
* Устанавливает потребителя для обработки входящих SDP Answer от сервера SFU.
* @param onSdpAnswer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x05,
* содержащим информацию об SDP Answer для одного из участников комнаты.
* Параметром будет объект SDPAnswer, который содержит информацию о комнате, участнике и самом SDP Answer,
* необходимую для установления медиа-сессии между участником и сервером SFU.
*/
public void setAnswerConsumer(Consumer<SDPAnswer> onSdpAnswer) {
this.onSdpAnswer = onSdpAnswer;
}
/**
* Устанавливает потребителя для обработки входящих SDP Offer от сервера SFU при renegotiation.
* @param onSdpOffer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x08,
* содержащим информацию об SDP Offer для одного из участников комнаты при renegotiation.
* Параметром будет объект SDPOffer, который содержит информацию о комнате, участнике и самом SDP Offer,
* необходимую для установления медиа-сессии между участником и сервером SFU.
*/
public void setOfferConsumer(Consumer<SDPOffer> onSdpOffer) {
this.onSdpOffer = onSdpOffer;
}
/**
* Устанавливает потребителя для обработки сообщений от сервера SFU о том, что комната была удалена.
* @param onDeleteRoom потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x10,
* содержащим информацию о том, что комната была удалена. Параметром будет строка, содержащая ID удаленной комнаты,
* чтобы клиент мог удалить ее из своего списка комнат и прекратить попытки взаимодействия с ней.
*/
public void setDeleteRoomConsumer(Consumer<String> onDeleteRoom) {
this.onDeleteRoom = onDeleteRoom;
}
/**
* Устанавливает потребителя для обработки сообщений от сервера SFU о том, что участник отключился от комнаты.
* @param onPeerDisconnected потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x11,
*/
public void setPeerDisconnectedConsumer(Consumer<DisconnectedPeer> onPeerDisconnected) {
this.onPeerDisconnected = onPeerDisconnected;
}
/**
* Возвращает TURN сервер на этом SFU
*/
public Set<RTCIceServer> getIceServers() {
return this.iceServers;
}
}

View File

@@ -0,0 +1,13 @@
package io.g365sfu.exception;
public class SFUException extends Exception {
public SFUException(String message) {
super(message);
}
public SFUException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,13 @@
package io.g365sfu.exception;
public class SFUHandshakeException extends Exception {
public SFUHandshakeException(String message) {
super(message);
}
public SFUHandshakeException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@@ -0,0 +1,25 @@
package io.g365sfu.net;
public enum DisconnectReason {
FAILED(0),
CLOSED(1);
private final int code;
DisconnectReason(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static DisconnectReason fromCode(int code) {
for (DisconnectReason reason : DisconnectReason.values()) {
if (reason.code == code) {
return reason;
}
}
throw new IllegalArgumentException("Unknown DisconnectReason code: " + code);
}
}

View File

@@ -0,0 +1,34 @@
package io.g365sfu.net;
import io.g365sfu.Room;
public class DisconnectedPeer {
private String peerId;
private Room room;
private String roomId;
private DisconnectReason reason;
public DisconnectedPeer(String peerId, String roomId, Room room, DisconnectReason reason) {
this.peerId = peerId;
this.room = room;
this.roomId = roomId;
this.reason = reason;
}
public String getPeerId() {
return peerId;
}
public Room getRoom() {
return room;
}
public String getRoomId() {
return roomId;
}
public DisconnectReason getReason() {
return reason;
}
}

View File

@@ -0,0 +1,61 @@
package io.g365sfu.net;
/**
* Входящие пакеты от SFU сервера, могут быть инициированы запросом клиента, или отправляться SFU при
* некоторых событиях
*/
public class Incoming {
/**
* Означает, что сервер ответил на рукопожатие, и мы можем считать его успешным
*/
public static final byte HANDSHAKE_OK = (byte) 0x01;
/**
* Сервер ответил на рукопожатие, но по какой-то из причин, отклонил его, например, неправильный
* секретный ключ
*/
public static final byte HANDSHAKE_FAILURE = (byte) 0xFF;
/**
* Ответ от сервера о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным
*/
public static final byte CONNECTION_ALIVE = (byte) 0xAE;
/**
* ICE-Кандидат с сервера SFU, который нужно переслать целевому участнику
*/
public static final byte ICE_CANDIDATE = (byte) 0x04;
/**
* SDP offer от сервера SFU, который нужно переслать целевому участнику
*/
public static final byte SDP_OFFER = (byte) 0x08;
/**
* SDP Answer от SFU в ответ на отправленный целевым участником SDP Offer, который нужно переслать целевому участнику
*/
public static final byte SDP_ANSWER = (byte) 0x05;
/**
* Сообщение от сервера SFU о том, тто комната успешно создана
*/
public static final byte ROOM_CREATE = (byte) 0x02;
/**
* Сообщение от сервера SFU о том, что комната была удалена
*/
public static final byte ROOM_DELETE = (byte) 0x10;
/**
* Сообщение об отсоединении участника от сервера SFU, например при обрыве связи
*/
public static final byte PEER_DISCONNECTED = (byte) 0x11;
/**
* Вызывается когда сервер SFU отправляет TURN сервер (если он поддерживается), который может быть использован
* для обмена кандидатами между участниками звонка через NAT.
*/
public static final byte TURN_SERVER = (byte) 0x19;
}

View File

@@ -0,0 +1,53 @@
package io.g365sfu.net;
/**
* Исходящие пакеты к SFU серверу, могут быть отправлены бекендом как в ответ на входящие сообщения от сервера,
* так и по инициативе бекенда
*/
public class Outgoing {
/**
* Рукопожатие с сервером SFU, которое необходимо выполнить перед любыми другими операциями.
* Этот пакет используется для установления начального соединения с сервером SFU, и
* должен быть отправлен первым при подключении к серверу. Он может содержать информацию,
* необходимую для аутентификации (секретный ключ)
*/
public static final byte HANDSHAKE_EXCHANGE = (byte) 0x01;
/**
* Проверка соединения с сервером SFU, которая может быть отправлена бекендом по инициативе бекенда для проверки,
* что соединение с сервером все еще активно.
*/
public static final byte CONNECTION_ALIVE = (byte) 0xAE;
/**
* Ретрансляция ICE-кандидата от одного участника на сервер SFU, чтобы участник мог установить
* соединение с SFU
*/
public static final byte ICE_CANDIDATE_RETRANSLATE = (byte) 0x06;
/**
* Ретрансляция SDP answer от одного участника на сервер SFU, чтобы участник мог установить
* соединение с SFU
*/
public static final byte SDP_ANSWER_RETRANSLATE = (byte) 0x07;
/**
* Ретрансляция SDP offer от одного участника на сервер SFU, чтобы участник мог установить
* соединение с SFU
*/
public static final byte SDP_OFFER_RETRANSLATE = (byte) 0x03;
/**
* Вызывается когда бекенд хочет создать комнату на сервере SFU, и сообщает об этом серверу,
* чтобы сервер создал комнату и был готов к приему участников
*/
public static final byte ROOM_CREATE = (byte) 0x02;
/**
* Вызывается когда бекенд хочет спросить есть ли TURN сервер предоставляемый SFU, сервер ничего не ответит если
* TURN сервер не поддерживается. По умолчанию в G365SFU .env TURN сервер включен.
*/
public static final byte ASK_TURN = (byte) 0x19;
}

View File

@@ -0,0 +1,153 @@
package io.g365sfu.net;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
public class SfuSock extends WebSocketClient {
private CompletableFuture<Boolean> handshakeFuture = new CompletableFuture<>();
private CompletableFuture<Boolean> connectionFuture = new CompletableFuture<>();
private Consumer<ByteBuffer> onMessage;
public SfuSock(String serverAddress) throws URISyntaxException {
super(new URI("ws://" + serverAddress));
}
@Override
public void onOpen(ServerHandshake handshakedata) {
//System.out.println("Connected to SFU server");
}
@Override
public void onMessage(ByteBuffer bytes) {
bytes.order(ByteOrder.BIG_ENDIAN);
if(bytes.remaining() < 1) {
return;
}
byte messageType = bytes.get();
if(messageType == Incoming.HANDSHAKE_OK) {
/**
* Сервер ответил на рукопожатие, и мы можем считать его успешным
*/
this.handshakeFuture.complete(true);
return;
}
if(messageType == Incoming.HANDSHAKE_FAILURE) {
/**
* Сервер отклонил рукопожатие, и мы должны считать его неудачным
*/
this.handshakeFuture.complete(false);
return;
}
if(messageType == Incoming.CONNECTION_ALIVE) {
/**
* Сервер отправил сообщение о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным
*/
this.connectionFuture.complete(true);
return;
}
/**
* Если это не сообщение рукопожатия, то мы передаем его в установленного потребителя
*/
if(this.onMessage != null) {
this.onMessage.accept(bytes);
}
}
@Override
public void onMessage(String message) {
//System.err.println("Received unexpected text message from SFU server: " + message);
}
@Override
public void onClose(int code, String reason, boolean remote) {
if(!this.handshakeFuture.isDone()) {
/**
* Если соединение было закрыто до завершения рукопожатия, то мы считаем его неудачным
*/
this.handshakeFuture.complete(false);
}
}
@Override
public void onError(Exception ex){
//System.err.println("Error: " + ex.getMessage());
}
/**
* Запускает обмен рукопожатиями с сервером SFU
* @param secretKey секретный ключ для аутентификации с сервером SFU
* @return CompletableFuture, который будет завершен с результатом true, если
* рукопожатие прошло успешно, или false, если рукопожатие не удалось или было отклонено сервером SFU
* @internal Этот метод отправляет пакет рукопожатия, который состоит из одного байта 0x01, за которым следует секретный ключ в виде строки байтов.
* Сервер SFU должен ответить одним байтом 0x01 для успешного рукопожатия или 0xFF для отклонения рукопожатия.
*/
public CompletableFuture<Boolean> handshakeExchange(String secretKey) {
/**
* 1 байт номер пакета, 4 байта длина секретного ключа, N байт секретный ключ
*/
ByteBuffer buffer = ByteBuffer.allocate(
1 + 4 + secretKey.getBytes().length
);
/**
* 0x01 - код рукопожатия в соотвествии с протоколом g365sfu, за которым следует секретный ключ в виде строки байтов
*/
buffer.put(Outgoing.HANDSHAKE_EXCHANGE);
buffer.putInt(secretKey.getBytes().length);
buffer.put(secretKey.getBytes());
buffer.flip();
/**
* Отправляем сформированный пакет, и возвращаем CompletableFuture
*/
this.send(buffer);
return this.handshakeFuture;
}
/**
* Устанавливает потребителя для обработки входящих текстовых сообщений от сервера SFU.
* @param onMessage потребитель, который будет вызван с текстом сообщения при получении текстового сообщения от сервера SFU.
* @internal Этот метод позволяет установить обработчик для текстовых сообщений от сервера SFU
*/
public void setMessageConsumer(Consumer<ByteBuffer> onMessage) {
this.onMessage = onMessage;
}
/**
* Проверяет состояние соединения, если соединение активно и готово к обмену данными, то возвращает false
* @return
* @throws InterruptedException
*/
public boolean checkConnection() {
ByteBuffer buffer = ByteBuffer.allocate(1);
/**
* 0x08 - код проверки соединения в соотвествии с протоколом g365sfu
*/
buffer.put(Outgoing.CONNECTION_ALIVE);
buffer.flip();
this.send(buffer);
try {
boolean result = this.connectionFuture.get(10, TimeUnit.SECONDS);
this.connectionFuture = new CompletableFuture<>();
return result;
}catch(Exception e){
/**
* Сбрасываем handshakeFuture, так как соединение не активно,
* и нам нужно будет пройти рукопожатие заново при следующей проверке соединения
*/
this.handshakeFuture = new CompletableFuture<>();
this.connectionFuture = new CompletableFuture<>();
return false;
}
}
}

View File

@@ -0,0 +1,15 @@
package io.g365sfu.util;
public class StrUtils {
public static String randomString(int length) {
String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
StringBuilder sb = new StringBuilder();
for (int i = 0; i < length; i++) {
int index = (int) (Math.random() * chars.length());
sb.append(chars.charAt(index));
}
return sb.toString();
}
}

View File

@@ -0,0 +1,34 @@
package io.g365sfu.webrtc;
/**
* Этот класс представляет собой ICE-кандидат,
* который используется для обмена информацией о сетевых маршрутах между участниками звонка через сервер SFU.
*
* Содержит информацию о комнате, участнике и самом кандидате, которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU.
*/
public class ICECandidate {
private String roomId;
private String participantId;
private String candidate;
public ICECandidate(String roomId, String participantId, String candidate) {
this.roomId = roomId;
this.participantId = participantId;
this.candidate = candidate;
}
public String getRoomId() {
return roomId;
}
public String getParticipantId() {
return participantId;
}
public String getCandidate() {
return candidate;
}
}

View File

@@ -0,0 +1,54 @@
package io.g365sfu.webrtc;
/**
* Представляет собой объект RTCIceServer который содержит информацию о сервере ICE,
* например TURN
*/
public class RTCIceServer {
private String url;
private String username;
private String credential;
private String transport;
public RTCIceServer(String url, String username, String credential, String transport) {
this.url = url;
this.username = username;
this.credential = credential;
this.transport = transport;
}
/**
* URL сервера ICE, который используется для обмена кандидатами между участниками звонка через сервер SFU.
* @return строка, содержащая URL сервера ICE, который используется для обмена кандидатами между участниками звонка через сервер SFU.
*/
public String getUrl() {
return url;
}
/**
* Имя пользователя для аутентификации на сервере ICE.
* @return строка, содержащая имя пользователя для аутентификации на сервере ICE.
*/
public String getUsername() {
return username;
}
/**
* Учетные данные для аутентификации на сервере ICE.
* @return строка, содержащая учетные данные для аутентификации на сервере ICE.
*/
public String getCredential() {
return credential;
}
/**
* Транспортный протокол, используемый для связи с сервером ICE (например, "udp" или "tcp").
* @return строка, содержащая транспортный протокол, используемый для связи с сервером ICE (например, "udp" или "tcp").
*/
public String getTransport() {
return transport;
}
}

View File

@@ -0,0 +1,31 @@
package io.g365sfu.webrtc;
/**
* Приходит с сервера SFU в ответ на отправленный SDP Offer от участника комнаты.
* Содержит информацию о комнате, участнике и самом SDP Answer, который необходим для установления медиа-сессии
* между участником и сервером SFU
*/
public class SDPAnswer {
private String roomId;
private String participantId;
private String sdp;
public SDPAnswer(String roomId, String participantId, String sdp) {
this.roomId = roomId;
this.participantId = participantId;
this.sdp = sdp;
}
public String getRoomId() {
return roomId;
}
public String getParticipantId() {
return participantId;
}
public String getSdp() {
return sdp;
}
}

View File

@@ -0,0 +1,30 @@
package io.g365sfu.webrtc;
/**
* Приходит от SFU сервера для конкретного участника комнаты. Обычно приходит при renegotiation,
* когда участник комнаты отправляет новый SDP Offer на сервер SFU, а сервер SFU отвечает ему новым SDP Answer,
* который содержит обновленную информацию о медиа-сессии для этого участника.
*/
public class SDPOffer {
private String roomId;
private String participantId;
private String sdp;
public SDPOffer(String roomId, String participantId, String sdp) {
this.roomId = roomId;
this.participantId = participantId;
this.sdp = sdp;
}
public String getRoomId() {
return roomId;
}
public String getParticipantId() {
return participantId;
}
public String getSdp() {
return sdp;
}
}

View File

@@ -237,7 +237,7 @@ public class Server extends WebSocketServer {
client.disconnect(ServerFailures.INACTIVITY_TIMEOUT); client.disconnect(ServerFailures.INACTIVITY_TIMEOUT);
} }
} }
}, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.MILLISECONDS); }, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.SECONDS);
} }
/** /**