diff --git a/.env b/.env index bacac01..c21b809 100644 --- a/.env +++ b/.env @@ -18,3 +18,6 @@ FIREBASE_CREDENTIALS_PATH=serviceAccount.json #Каждые сколько дней будет очищаться буфер (максимальная дистанция синхронизации сообщений) BUFFER_CLEANUP_DAYS=7 +#SFU Сервера +SFU_SERVERS=127.0.0.1:1001@SFU_TEST_SECRET + diff --git a/src/main/java/im/rosetta/Boot.java b/src/main/java/im/rosetta/Boot.java index 1a45a6a..7be744e 100644 --- a/src/main/java/im/rosetta/Boot.java +++ b/src/main/java/im/rosetta/Boot.java @@ -17,6 +17,9 @@ import im.rosetta.executors.Executor21GroupLeave; import im.rosetta.executors.Executor22GroupBan; import im.rosetta.executors.Executor24DeviceResolve; import im.rosetta.executors.Executor25Sync; +import im.rosetta.executors.Executor26SignalPeer; +import im.rosetta.executors.Executor27WebRTC; +import im.rosetta.executors.Executor28IceServers; import im.rosetta.executors.Executor3Search; import im.rosetta.executors.Executor4OnlineState; import im.rosetta.executors.Executor6Message; @@ -44,6 +47,9 @@ import im.rosetta.packet.Packet22GroupBan; import im.rosetta.packet.Packet23DeviceList; import im.rosetta.packet.Packet24DeviceResolve; import im.rosetta.packet.Packet25Sync; +import im.rosetta.packet.Packet26SignalPeer; +import im.rosetta.packet.Packet27WebRTC; +import im.rosetta.packet.Packet28IceServers; import im.rosetta.packet.Packet2Result; import im.rosetta.packet.Packet3Search; import im.rosetta.packet.Packet4OnlineSubscribe; @@ -53,6 +59,7 @@ import im.rosetta.packet.Packet7Read; import im.rosetta.packet.Packet8Delivery; import im.rosetta.packet.Packet9DeviceNew; import im.rosetta.service.services.BufferCleanupService; +import im.rosetta.service.services.ForwardUnitService; import io.orprotocol.Server; import io.orprotocol.Settings; import io.orprotocol.packet.PacketManager; @@ -74,6 +81,7 @@ public class Boot { private ClientManager clientManager; private OnlineManager onlineManager; private BufferCleanupService bufferCleanupService; + private ForwardUnitService forwardUnitService; /** * Конструктор по умолчанию, использует порт 3000 для сервера @@ -105,6 +113,7 @@ public class Boot { int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ? Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7; this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger); + this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager); } /** @@ -151,7 +160,8 @@ public class Boot { this.registerAllEvents(); this.printBootMessage(); this.bufferCleanupService.start(); - return this; + this.forwardUnitService.connectToAllSFUServers(); + return this; }catch(Exception e){ this.logger.error(Color.RED + "Booting error, stack trace:"); e.printStackTrace(); @@ -194,6 +204,9 @@ public class Boot { this.packetManager.registerPacket(23, Packet23DeviceList.class); this.packetManager.registerPacket(24, Packet24DeviceResolve.class); this.packetManager.registerPacket(25, Packet25Sync.class); + this.packetManager.registerPacket(26, Packet26SignalPeer.class); + this.packetManager.registerPacket(27, Packet27WebRTC.class); + this.packetManager.registerPacket(28, Packet28IceServers.class); } private void registerAllExecutors() { @@ -215,6 +228,9 @@ public class Boot { this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); + this.packetManager.registerExecutor(26, new Executor26SignalPeer(this.clientManager, this.forwardUnitService)); + this.packetManager.registerExecutor(27, new Executor27WebRTC(this.forwardUnitService)); + this.packetManager.registerExecutor(28, new Executor28IceServers(this.forwardUnitService)); } private void printBootMessage() { diff --git a/src/main/java/im/rosetta/database/Repository.java b/src/main/java/im/rosetta/database/Repository.java index 2d63286..eded74f 100644 --- a/src/main/java/im/rosetta/database/Repository.java +++ b/src/main/java/im/rosetta/database/Repository.java @@ -236,7 +236,7 @@ public abstract class Repository { * @param noResultType если true, то не указывать тип результата в запросе, используется для запросов типа UPDATE и DELETE * @return список сущностей */ - @SuppressWarnings("deprecation") + @SuppressWarnings({"unchecked", "deprecation"}) public QuerySession buildQuery(String queryString, HashMap parameters, boolean noResultType) { Session session = HibernateUtil.openSession(); try { diff --git a/src/main/java/im/rosetta/executors/Executor26SignalPeer.java b/src/main/java/im/rosetta/executors/Executor26SignalPeer.java new file mode 100644 index 0000000..9ea8147 --- /dev/null +++ b/src/main/java/im/rosetta/executors/Executor26SignalPeer.java @@ -0,0 +1,79 @@ +package im.rosetta.executors; + +import im.rosetta.Failures; +import im.rosetta.client.ClientManager; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.packet.Packet26SignalPeer; +import im.rosetta.packet.runtime.NetworkSignalType; +import im.rosetta.service.services.ForwardUnitService; +import io.g365sfu.Room; +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +/** + * Используется в Peer To Peer звонках, в групповых звонках другой сигналинг + */ +public class Executor26SignalPeer extends PacketExecutor { + + private ClientManager clientManager; + private ForwardUnitService fus; + + public Executor26SignalPeer(ClientManager clientManager, ForwardUnitService fus) { + this.clientManager = clientManager; + this.fus = fus; + } + + @Override + public void onPacketReceived(Packet26SignalPeer packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не авторизован, то мы не будем обрабатывать его сигналы на анициализацию звонка + * и просто отключим его от сервера. + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + NetworkSignalType type = packet.getSignalType(); + if(type == NetworkSignalType.CALL) { + /** + * Инициируется звонок от src к dst, проверяем, что dst не занят другим звонком, если занят, то отправляем сигнал END_CALL_BECAUSE_BUSY обратно src + */ + Room room = this.fus.getRoomByParticipantId(packet.getDst()); + if(room != null) { + /** + * Получатель сигнала уже находится в другой комнате, значит он занят другим звонком, отправляем сигнал END_CALL_BECAUSE_BUSY обратно src + */ + Packet26SignalPeer responsePacket = new Packet26SignalPeer(); + responsePacket.setSignalType(NetworkSignalType.END_CALL_BECAUSE_BUSY); + this.clientManager.sendPacketToAuthorizedPK(packet.getSrc(), responsePacket); + return; + } + } + if(type == NetworkSignalType.CREATE_ROOM){ + /** + * Создается комната для звонка + */ + Room room = this.fus.createRoom(); + room.addParticipant(packet.getSrc()); + room.addParticipant(packet.getDst()); + packet.setRoomId(room.getRoomId()); + /** + * Результат создания комнаты транслируем обоим участникам, чтобы они могли начать обмен WebRTC SDP, и тд + */ + this.clientManager.sendPacketToAuthorizedPK(packet.getSrc(), packet); + this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet); + return; + } + /** + * TODO: Проверка на существование получателя + */ + this.clientManager.sendPacketToAuthorizedPK(packet.getDst(), packet); + /** + * TODO: Высокоприоритетный пуш для сигналов звонков, чтобы мобильные устройства могли показать + * интерфейс входящего звонка, даже если приложение находится в фоне + */ + } + +} diff --git a/src/main/java/im/rosetta/executors/Executor27WebRTC.java b/src/main/java/im/rosetta/executors/Executor27WebRTC.java new file mode 100644 index 0000000..4d9aa19 --- /dev/null +++ b/src/main/java/im/rosetta/executors/Executor27WebRTC.java @@ -0,0 +1,73 @@ +package im.rosetta.executors; + +import im.rosetta.Failures; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.packet.Packet27WebRTC; +import im.rosetta.packet.runtime.NetworkWebRTCType; +import im.rosetta.service.services.ForwardUnitService; +import io.g365sfu.Room; +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +public class Executor27WebRTC extends PacketExecutor { + + private ForwardUnitService fus; + + public Executor27WebRTC(ForwardUnitService fus) { + this.fus = fus; + } + + @Override + public void onPacketReceived(Packet27WebRTC packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не авторизован, то мы не будем обрабатывать его сигналы на инициализацию звонка + * и просто отключим его от сервера. + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + + String publicKey = eciAuthentificate.getPublicKey(); + + /** + * Так как в комнатах Participants это публичные ключи пользователей, то мы можем + * найти комнату, в которой находится пользователь, по его публичному ключу + */ + Room room = this.fus.getRoomByParticipantId(publicKey); + if(room == null) { + /** + * Если комната не найдена, то мы не будем обрабатывать сигналы для звонка + * и просто отключим клиента от сервера. + */ + client.disconnect(Failures.DATA_MISSMATCH); + return; + } + + NetworkWebRTCType type = packet.getType(); + if(type == NetworkWebRTCType.OFFER) { + /** + * Если это OFFER, то отправляем OFFER на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.sdpOffer(publicKey, packet.getSdpOrCandidate()); + } + if(type == NetworkWebRTCType.ICE_CANDIDATE) { + /** + * Если это ICE кандидат, то отправляем его на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.iceCandidate(publicKey, packet.getSdpOrCandidate()); + } + if(type == NetworkWebRTCType.ANSWER) { + /** + * Если это ANSWER, то отправляем его на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.sdpAnswer(publicKey, packet.getSdpOrCandidate()); + } + } + +} diff --git a/src/main/java/im/rosetta/executors/Executor28IceServers.java b/src/main/java/im/rosetta/executors/Executor28IceServers.java new file mode 100644 index 0000000..e9007b7 --- /dev/null +++ b/src/main/java/im/rosetta/executors/Executor28IceServers.java @@ -0,0 +1,39 @@ +package im.rosetta.executors; + +import java.util.ArrayList; + +import im.rosetta.Failures; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.packet.Packet28IceServers; +import im.rosetta.service.services.ForwardUnitService; +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +public class Executor28IceServers extends PacketExecutor { + + private ForwardUnitService fus; + + public Executor28IceServers(ForwardUnitService fus) { + this.fus = fus; + } + + @Override + public void onPacketReceived(Packet28IceServers packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не авторизован, то мы не будем обрабатывать его запрос на получение ICE серверов + * и просто отключим его от сервера. + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + /** + * Берем TURN сервера и отправляем их клиенту + */ + packet.setIceServers(new ArrayList<>(this.fus.getTurnServers())); + client.send(packet); + } + +} diff --git a/src/main/java/im/rosetta/packet/Packet26SignalPeer.java b/src/main/java/im/rosetta/packet/Packet26SignalPeer.java new file mode 100644 index 0000000..ffb37a1 --- /dev/null +++ b/src/main/java/im/rosetta/packet/Packet26SignalPeer.java @@ -0,0 +1,153 @@ +package im.rosetta.packet; + +import im.rosetta.packet.runtime.NetworkSignalType; +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет cигналинга для совершения звонка. Учавствует в обмене ключами, + * иницилизации звонка. + */ +public class Packet26SignalPeer extends Packet { + + /** + * Идентификатор отправителя сигнала, обычно это PK пользователя, который отправляет пакет + */ + private String src; + /** + * Идентификатор получателя сигнала, обычно это PK пользователя, который должен принять пакет + */ + private String dst; + + /** + * Если сигнал предназначен для обмена ключами, то в это поле + * будет помещаться sharedPublic публичная часть ключа DH алгоритма + */ + private String sharedPublic; + + /** + * Тип сигнала + */ + private NetworkSignalType signalType; + + /** + * Идентификатор комнаты, в которой происходит звонок, заполняется если тип сигнала CREATE_ROOM, иначе null + */ + private String roomId; + + @Override + public void read(Stream stream) { + this.signalType = NetworkSignalType.fromCode(stream.readInt8()); + if(this.signalType == NetworkSignalType.END_CALL_BECAUSE_BUSY || this.signalType == NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED) { + return; + } + this.src = stream.readString(); + this.dst = stream.readString(); + if (signalType == NetworkSignalType.KEY_EXCHANGE) { + this.sharedPublic = stream.readString(); + } + if(signalType == NetworkSignalType.CREATE_ROOM) { + this.roomId = stream.readString(); + } + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeInt8(this.signalType.getCode()); + if(this.signalType == NetworkSignalType.END_CALL_BECAUSE_BUSY || this.signalType == NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED) { + return stream; + } + stream.writeString(this.src); + stream.writeString(this.dst); + if (signalType == NetworkSignalType.KEY_EXCHANGE) { + stream.writeString(this.sharedPublic); + } + if(signalType == NetworkSignalType.CREATE_ROOM) { + stream.writeString(this.roomId); + } + return stream; + } + + /** + * Получить идентификатор отправителя сигнала + * @return идентификатор отправителя сигнала + */ + public String getSrc() { + return src; + } + + /** + * Установить идентификатор отправителя сигнала + * @param src идентификатор отправителя сигнала + */ + public void setSrc(String src) { + this.src = src; + } + + /** + * Получить идентификатор получателя сигнала + * @return идентификатор получателя сигнала + */ + public String getDst() { + return dst; + } + + /** + * Установить идентификатор получателя сигнала + * @param dst идентификатор получателя сигнала + */ + public void setDst(String dst) { + this.dst = dst; + } + + /** + * Получить публичную часть ключа DH алгоритма, если сигнал предназначен для обмена ключами + * @return публичная часть ключа DH алгоритма или null, если сигнал не предназначен для обмена ключами + */ + public String getSharedPublic() { + return sharedPublic; + } + + /** + * Установить публичную часть ключа DH алгоритма, если сигнал предназначен для обмена ключами + * @param sharedPublic публичная часть ключа DH алгоритма + */ + public void setSharedPublic(String sharedPublic) { + this.sharedPublic = sharedPublic; + } + + /** + * Получить тип сигнала + * @return тип сигнала + */ + + public NetworkSignalType getSignalType() { + return signalType; + } + + /** + * Установить тип сигнала + * @param signalType тип сигнала + */ + public void setSignalType(NetworkSignalType signalType) { + this.signalType = signalType; + } + + /** + * Получить идентификатор созданной комнаты, если тип сигнала CREATE_ROOM + * @return идентификатор комнаты, если тип сигнала CREATE_ROOM, иначе null + */ + public String getRoomId() { + return roomId; + } + + /** + * Установить идентификатор комнаты, в которой происходит звонок, если тип сигнала CREATE_ROOM + * @param roomId идентификатор комнаты, если тип сигнала CREATE_ROOM + */ + public void setRoomId(String roomId) { + this.roomId = roomId; + } +} diff --git a/src/main/java/im/rosetta/packet/Packet27WebRTC.java b/src/main/java/im/rosetta/packet/Packet27WebRTC.java new file mode 100644 index 0000000..611c9a2 --- /dev/null +++ b/src/main/java/im/rosetta/packet/Packet27WebRTC.java @@ -0,0 +1,65 @@ +package im.rosetta.packet; + +import im.rosetta.packet.runtime.NetworkWebRTCType; +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +public class Packet27WebRTC extends Packet { + /** + * SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + */ + private String sdpOrCandidate; + /** + * Тип сообщения WebRTC + */ + private NetworkWebRTCType type; + + @Override + public void read(Stream stream) { + this.type = NetworkWebRTCType.fromCode(stream.readInt8()); + this.sdpOrCandidate = stream.readString(); + } + + @Override + public Stream write() { + Stream steram = new Stream(); + steram.writeInt16(this.packetId); + steram.writeInt8(this.type.getCode()); + steram.writeString(this.sdpOrCandidate); + return steram; + } + + /** + * Получить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + * @return SDP оффер/answer или ICE кандидат + */ + public String getSdpOrCandidate() { + return sdpOrCandidate; + } + + /** + * Получить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом + * @return тип сообщения WebRTC + */ + public NetworkWebRTCType getType() { + return type; + } + + /** + * Установить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + * @param sdpOrCandidate SDP оффер/answer или ICE кандидат + */ + public void setSdpOrCandidate(String sdpOrCandidate) { + this.sdpOrCandidate = sdpOrCandidate; + } + + /** + * Установить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом + * @param type тип сообщения WebRTC + */ + public void setType(NetworkWebRTCType type) { + this.type = type; + } + + +} diff --git a/src/main/java/im/rosetta/packet/Packet28IceServers.java b/src/main/java/im/rosetta/packet/Packet28IceServers.java new file mode 100644 index 0000000..ef4f26a --- /dev/null +++ b/src/main/java/im/rosetta/packet/Packet28IceServers.java @@ -0,0 +1,58 @@ +package im.rosetta.packet; + +import java.util.ArrayList; +import java.util.List; + +import io.g365sfu.webrtc.RTCIceServer; +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +public class Packet28IceServers extends Packet { + + private List iceServers; + + @Override + public void read(Stream stream) { + int count = stream.readInt16(); + this.iceServers = new ArrayList<>(); + for (int i = 0; i < count; i++) { + String url = stream.readString(); + String username = stream.readString(); + String credential = stream.readString(); + String transport = stream.readString(); + RTCIceServer iceServer = new RTCIceServer(url, username, credential, transport); + iceServers.add(iceServer); + } + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeInt16(iceServers.size()); + for (RTCIceServer iceServer : iceServers) { + stream.writeString(iceServer.getUrl()); + stream.writeString(iceServer.getUsername()); + stream.writeString(iceServer.getCredential()); + stream.writeString(iceServer.getTransport()); + } + return stream; + } + + /** + * Получить список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + * @return список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + */ + public List getIceServers() { + return iceServers; + } + + /** + * Установить список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + * @param iceServers список серверов ICE, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + */ + public void setIceServers(List iceServers) { + this.iceServers = iceServers; + } + +} diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java new file mode 100644 index 0000000..7ee79bd --- /dev/null +++ b/src/main/java/im/rosetta/packet/runtime/NetworkSignalType.java @@ -0,0 +1,54 @@ +package im.rosetta.packet.runtime; + +/** + * Типы сигналов, используемых в сетевом взаимодействии при звонках + */ +public enum NetworkSignalType { + /** + * Сигнал для совершения звонка, инициирует процесс звонка + */ + CALL(0), + /** + * Сигнал для обмена ключами, используется для обмена DH ключами между участниками звонка + */ + KEY_EXCHANGE(1), + /** + * Сигнал для активного звонка, указывает на то, что звонок активен и участники могут обмениваться данными + */ + ACTIVE_CALL(2), + /** + * Сигнал для завершения звонка, указывает на то, что звонок завершен и участники должны прекратить обмен данными + */ + END_CALL(3), + /** + * Создание комнаты + */ + CREATE_ROOM(4), + /** + * Обрыв связи с пиром + */ + END_CALL_BECAUSE_PEER_DISCONNECTED(5), + /** + * Не удалось дозвониться - пользователь занят другим звонком + */ + END_CALL_BECAUSE_BUSY(6); + + private final int code; + + NetworkSignalType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static NetworkSignalType fromCode(int code) { + for (NetworkSignalType type : NetworkSignalType.values()) { + if (type.code == code) { + return type; + } + } + throw new IllegalArgumentException("Unknown NetworkSignalType code: " + code); + } +} diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java b/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java new file mode 100644 index 0000000..964477a --- /dev/null +++ b/src/main/java/im/rosetta/packet/runtime/NetworkWebRTCType.java @@ -0,0 +1,36 @@ +package im.rosetta.packet.runtime; + +public enum NetworkWebRTCType { + /** + * Оффер + */ + OFFER(0), + /** + * Ответ на оффер + */ + ANSWER(1), + /** + * ICE кандидат + */ + ICE_CANDIDATE(2); + + private final int code; + + NetworkWebRTCType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static NetworkWebRTCType fromCode(int code) { + for (NetworkWebRTCType type : NetworkWebRTCType.values()) { + if (type.code == code) { + return type; + } + } + throw new IllegalArgumentException("Unknown NetworkWebRTCType code: " + code); + } + +} diff --git a/src/main/java/im/rosetta/service/services/ForwardUnitService.java b/src/main/java/im/rosetta/service/services/ForwardUnitService.java new file mode 100644 index 0000000..b07a9ab --- /dev/null +++ b/src/main/java/im/rosetta/service/services/ForwardUnitService.java @@ -0,0 +1,259 @@ +package im.rosetta.service.services; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import im.rosetta.client.ClientManager; +import im.rosetta.logger.Logger; +import im.rosetta.logger.enums.Color; +import im.rosetta.packet.Packet26SignalPeer; +import im.rosetta.packet.Packet27WebRTC; +import im.rosetta.packet.runtime.NetworkSignalType; +import im.rosetta.packet.runtime.NetworkWebRTCType; +import io.g365sfu.Room; +import io.g365sfu.SFU; +import io.g365sfu.net.DisconnectReason; +import io.g365sfu.net.DisconnectedPeer; +import io.g365sfu.webrtc.ICECandidate; +import io.g365sfu.webrtc.RTCIceServer; +import io.g365sfu.webrtc.SDPAnswer; +import io.g365sfu.webrtc.SDPOffer; +import io.orprotocol.ProtocolException; + +/** + * Это сервис который взаимодействуют с SFU серверами для организации звонков между пользователями. + */ +public class ForwardUnitService { + + private Logger logger; + private Set sfuConnections = ConcurrentHashMap.newKeySet(); + private ClientManager clientManager; + private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); + + public ForwardUnitService(Logger logger, ClientManager clientManager) { + this.logger = logger; + this.clientManager = clientManager; + this.sfuConnectionsSheduler(); + } + + /** + * Каждые 10 секунд проверяет доступность всех SFU серверов, и если сервер недоступен, то удаляет его из + * пула доступных серверов для организации звонков. + * Проверка доступности сервера осуществляется через отправку специального сообщения проверки соединения, + * и ожидание ответа от сервера. Если ответ не приходит в течение 10 секунд, то сервер считается недоступным. + * + * Так же, если sfuConnections пустой, то мы стараемся установить соединение со всеми серверами еще раз. + */ + private void sfuConnectionsSheduler() { + this.scheduler.scheduleAtFixedRate(() -> { + for(SFU sfu : this.sfuConnections) { + try{ + if(!sfu.getConnection().checkConnection()){ + this.logger.error("Server " + sfu.getServerAddress() + " not responding"); + this.sfuConnections.remove(sfu); + } + }catch(Exception e) { + this.logger.error("Failed to check connection to SFU server: " + sfu.getServerAddress() + ", error: " + e.getMessage()); + this.sfuConnections.remove(sfu); + } + } + if(this.sfuConnections.isEmpty()) { + this.connectToAllSFUServers(); + } + }, 10, 10, TimeUnit.SECONDS); + } + + /** + * Инициализирует соединения к SFU серверам для звонков. + * Ожидается, что адреса SFU серверов и секретные ключи для них будут переданы через переменную окружения SFU_SERVERS в формате "address1@secretKey1,address2@secretKey2,...". + * Для каждого сервера будет предпринята попытка установить соединение и выполнить рукопожатие. + * Если соединение не может быть установлено или рукопожатие не удается, + * будет выведено сообщение об ошибке в лог, но процесс продолжится для остальных серверов. + * Успешные подключения будут сохранены в наборе sfuConnections для дальнейшего использования + */ + public void connectToAllSFUServers() { + String sfuServersEnv = System.getenv("SFU_SERVERS"); + if(sfuServersEnv == null || sfuServersEnv.isEmpty()) { + this.logger.info(Color.YELLOW + "No SFU servers configured, skipping SFU connections boot"); + return; + } + String[] sfuServers = sfuServersEnv.split(","); + for(String sfuServer : sfuServers) { + String[] parts = sfuServer.split("@"); + if(parts.length != 2) { + this.logger.error(Color.RED + "Invalid SFU server configuration: " + sfuServer); + continue; + } + String address = parts[0]; + String secretKey = parts[1]; + try { + SFU connection = new SFU(address, secretKey); + connection.connect(); + connection.setIceConsumer(arg0 -> { + try { + onIceCandidate(arg0); + } catch (ProtocolException e) { + this.logger.error(Color.RED + "Failed to retranslate ICE-candidate from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); + connection.setAnswerConsumer(arg0 -> { + try{ + onSdpAnswer(arg0); + }catch(ProtocolException e){ + this.logger.error(Color.RED + "Failed to retranslate SDP answer from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); + connection.setOfferConsumer(arg0 -> { + try{ + onSdpOffer(arg0); + }catch(ProtocolException e){ + this.logger.error(Color.RED + "Failed to retranslate SDP offer from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); + connection.setPeerDisconnectedConsumer(arg0 -> { + try{ + onPeerDisconnected(arg0); + }catch(ProtocolException e){ + this.logger.error(Color.RED + "Failed to handle peer disconnection from SFU server: " + address + ", error: " + e.getMessage()); + e.printStackTrace(); + } + }); + this.sfuConnections.add(connection); + this.logger.info(Color.GREEN + "Successfully connected to SFU server: " + address); + } catch (Exception e) { + //this.logger.error(Color.RED + "Failed to connect to SFU server: " + address + ", error: " + e.getMessage()); + } + } + } + + public void onPeerDisconnected(DisconnectedPeer disconnectedPeer) throws ProtocolException { + Room room = disconnectedPeer.getRoom(); + if(disconnectedPeer.getReason() != DisconnectReason.FAILED){ + /** + * Если у нас произошло штатное отключение, а не в результате обрыва связи - то не нужно отправлять + * оппонентам пакеты о том, что участник отключился в результате обрыва связи. + */ + return; + } + for(String peerId : room.getParticipants()) { + /** + * Уведомляем все пиры, что соединение с пиром было потеряно + */ + if(room.getParticipants().size() == 1) { + /** + * Звонок был завершен, так как в комнате остался только один участник, который не может продолжать звонок в одиночку. + */ + Packet26SignalPeer packet = new Packet26SignalPeer(); + packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED); + this.clientManager.sendPacketToAuthorizedPK(peerId, packet); + } + } + } + + /** + * Выполняется когда сервер SFU отправляет SDP answer для одного из участников комнаты. + * @param sdpAnswer объект SDPAnswer, который содержит информацию о комнате, участнике и самом answer, + * @throws ProtocolException + */ + public void onSdpAnswer(SDPAnswer sdpAnswer) throws ProtocolException { + String participantId = sdpAnswer.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(sdpAnswer.getSdp()); + packet.setType(NetworkWebRTCType.ANSWER); + this.clientManager.sendPacketToAuthorizedPK(participantId, packet); + } + + /** + * Выполняется когда сервер SFU отправляет ICE-кандидата для одного из участников комнаты. + * @param iceCandidate объект ICECandidate, + * который содержит информацию о комнате, участнике и самом кандидате, + * которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU. + * @throws ProtocolException + */ + public void onIceCandidate(ICECandidate iceCandidate) throws ProtocolException { + String publicKey = iceCandidate.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(iceCandidate.getCandidate()); + packet.setType(NetworkWebRTCType.ICE_CANDIDATE); + this.clientManager.sendPacketToAuthorizedPK(publicKey, packet); + } + + /** + * Выполняется когда сервер SFU отправляет SDP оффер для одного из участников комнаты. + * @param sdpOffer объект SDPOffer, + * который содержит информацию о комнате, участнике и самом оффере, + * которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU. + * @throws ProtocolException + */ + public void onSdpOffer(SDPOffer sdpOffer) throws ProtocolException { + String participantId = sdpOffer.getParticipantId(); + Packet27WebRTC packet = new Packet27WebRTC(); + packet.setSdpOrCandidate(sdpOffer.getSdp()); + packet.setType(NetworkWebRTCType.OFFER); + this.clientManager.sendPacketToAuthorizedPK(participantId, packet); + } + + /** + * Получает комнату в которой сейчас находится пользователь + * @param participantId идентификатор пользователя на сервере SFU + * @return комната Room если найдена, иначе null + */ + public Room getRoomByParticipantId(String participantId) { + for(SFU sfu : this.sfuConnections){ + Room room = sfu.getRoomByParticipantId(participantId); + if(room != null){ + return room; + } + } + return null; + } + + /** + * Автоматически выбирает сервер для создания комнаты, и создает в нем комнату + * @return комната + */ + public Room createRoom() { + if(this.sfuConnections.isEmpty()) { + return null; + } + SFU selectedSfu = null; + int minRooms = Integer.MAX_VALUE; + for(SFU sfu : this.sfuConnections) { + int roomsCount = sfu.getRoomsCount(); + if(roomsCount < minRooms) { + minRooms = roomsCount; + selectedSfu = sfu; + } + } + if(selectedSfu != null) { + try{ + return selectedSfu.createRoom(); + }catch(Exception e){ + this.logger.error(Color.RED + "Failed to create room on SFU server: " + selectedSfu.getServerAddress() + ", error: " + e.getMessage()); + } + } + return null; + } + + /** + * Получить список TURN серверов, которые могут быть использованы для обмена кандидатами между участниками звонка через сервер SFU. + * @return список серверов для RTC + */ + public Set getTurnServers() { + Set iceServers = new HashSet<>(); + for(SFU sfu : this.sfuConnections) { + Set iceServersSupporetd = sfu.getIceServers(); + iceServers.addAll(iceServersSupporetd); + } + return iceServers; + } + + +} diff --git a/src/main/java/io/g365sfu/Room.java b/src/main/java/io/g365sfu/Room.java new file mode 100644 index 0000000..9784c33 --- /dev/null +++ b/src/main/java/io/g365sfu/Room.java @@ -0,0 +1,160 @@ +package io.g365sfu; + +import java.nio.ByteBuffer; +import java.util.HashSet; + +import io.g365sfu.net.Outgoing; + +/** + * Это комната для звонков, она может быть как для двоих участников, так и для групповых звонков. + * Комната содержит в себе информацию о том, на каком она SFU сервере, какой у нее ID, кто в ней участвует. + */ +public class Room { + private String roomId; + private SFU sfu; + private HashSet participants; + + /** + * Создать комнату с заданным ID, SFU сервером и участниками + * @param roomId уникальный идентификатор комнаты, который должен быть согласован с SFU сервером и использоваться для всех операций с этой комнатой + * @param sfu SFU сервер, на котором будет создана комната и через который будет происходить обмен данными между участниками + * @param participants массив идентификаторов участников, которые будут добавлены в комнату. Идентификаторы должны быть согласованы с SFU сервером и использоваться для маршрутизации данных между участниками. + */ + public Room(String roomId, SFU sfu, HashSet participants) { + this.roomId = roomId; + this.sfu = sfu; + this.participants = participants; + } + + public String getRoomId() { + return roomId; + } + + public SFU getSfu() { + return sfu; + } + + public HashSet getParticipants() { + return participants; + } + + public void addParticipant(String participantId) { + this.participants.add(participantId); + } + + public void removeParticipant(String participantId) { + this.participants.remove(participantId); + } + + public boolean containsParticipant(String participantId) { + return this.participants.contains(participantId); + } + + /** + * Отправляет SDP offer в SFU сервер для организации звонка между пользователями. + * Этот метод используется для отправки предложения о соединении от одного участника + * к другому через сервер SFU, который будет пересылать это предложение целевому участнику. + * Параметры roomId и peerId используются для идентификации комнаты и целевого участника, + * а sdpOffer содержит описание медиа-сессии, которую участник хочет установить. + * @param roomId идентификатор комнаты, в которой участник хочет организовать звонок + * @param peerId идентификатор целевого участника, которому отправляется предложение о соединении + * @param sdpOffer строка, содержащая SDP offer, которая описывает медиа-сессию, которую участник хочет установить с целевым участником. + * @internal Этот метод формирует пакет с кодом 0x04, за которым следует идентификатор комнаты, идентификатор целевого участника и строка SDP offer. + */ + public void sdpOffer(String participantId, String sdpOffer) { + /** + * 1 байт номер пакета, + * 4 байта длина ID комнаты, + * N байт ID комнаты, + * 4 байта длина ID участника, + * M байт ID участника, + * 4 байта длина SDP offer, + * K байт SDP offer + */ + ByteBuffer buffer = ByteBuffer.allocate( + 1 + 4 + + this.roomId.getBytes().length + 4 + + participantId.getBytes().length + 4 + + sdpOffer.getBytes().length); + /** + * 0x03 - SDP offer + */ + buffer.put(Outgoing.SDP_OFFER_RETRANSLATE); + buffer.putInt(this.roomId.getBytes().length); + buffer.put(this.roomId.getBytes()); + buffer.putInt(participantId.getBytes().length); + buffer.put(participantId.getBytes()); + buffer.putInt(sdpOffer.getBytes().length); + buffer.put(sdpOffer.getBytes()); + buffer.flip(); + this.sfu.getConnection().send(buffer); + } + + /** + * Отправляет ICE-кандидата в SFU сервер для одного из участников комнаты. + * @param participantId участник комнаты + * @param iceCandidate кандидат + */ + public void iceCandidate(String participantId, String iceCandidate) { + /** + * 1 байт номер пакета, + * 4 байта длина ID комнаты, + * N байт ID комнаты, + * 4 байта длина ID участника, + * M байт ID участника, + * 4 байта длина ICE кандидата, + * K байт ICE кандидата + */ + ByteBuffer buffer = ByteBuffer.allocate( + 1 + 4 + + this.roomId.getBytes().length + 4 + + participantId.getBytes().length + 4 + + iceCandidate.getBytes().length); + /** + * 0x06 - ICE кандидат + */ + buffer.put(Outgoing.ICE_CANDIDATE_RETRANSLATE); + buffer.putInt(this.roomId.getBytes().length); + buffer.put(this.roomId.getBytes()); + buffer.putInt(participantId.getBytes().length); + buffer.put(participantId.getBytes()); + buffer.putInt(iceCandidate.getBytes().length); + buffer.put(iceCandidate.getBytes()); + buffer.flip(); + this.sfu.getConnection().send(buffer); + } + + /** + * Отправляет SDP answer в SFU сервер для одного из участников комнаты. + * @param participantId участник комнаты + * @param sdpAnswer SDP answer + */ + public void sdpAnswer(String participantId, String sdpAnswer) { + /** + * 1 байт номер пакета, + * 4 байта длина ID комнаты, + * N байт ID комнаты, + * 4 байта длина ID участника, + * M байт ID участника, + * 4 байта длина SDP answer, + * K байт SDP answer + */ + ByteBuffer buffer = ByteBuffer.allocate( + 1 + 4 + + this.roomId.getBytes().length + 4 + + participantId.getBytes().length + 4 + + sdpAnswer.getBytes().length); + /** + * 0x07 - SDP answer + */ + buffer.put(Outgoing.SDP_ANSWER_RETRANSLATE); + buffer.putInt(this.roomId.getBytes().length); + buffer.put(this.roomId.getBytes()); + buffer.putInt(participantId.getBytes().length); + buffer.put(participantId.getBytes()); + buffer.putInt(sdpAnswer.getBytes().length); + buffer.put(sdpAnswer.getBytes()); + buffer.flip(); + this.sfu.getConnection().send(buffer); + } +} \ No newline at end of file diff --git a/src/main/java/io/g365sfu/SFU.java b/src/main/java/io/g365sfu/SFU.java new file mode 100644 index 0000000..370922c --- /dev/null +++ b/src/main/java/io/g365sfu/SFU.java @@ -0,0 +1,415 @@ +package io.g365sfu; + +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Consumer; + +import io.g365sfu.exception.SFUException; +import io.g365sfu.exception.SFUHandshakeException; +import io.g365sfu.net.DisconnectReason; +import io.g365sfu.net.DisconnectedPeer; +import io.g365sfu.net.Incoming; +import io.g365sfu.net.Outgoing; +import io.g365sfu.net.SfuSock; +import io.g365sfu.util.StrUtils; +import io.g365sfu.webrtc.ICECandidate; +import io.g365sfu.webrtc.RTCIceServer; +import io.g365sfu.webrtc.SDPAnswer; +import io.g365sfu.webrtc.SDPOffer; + + +public class SFU { + + private String serverAddress; + private String secretKey; + private SfuSock socket; + + private HashMap> pendingRoomCreations = new HashMap<>(); + /** + * Комнаты которые принадлежат этому серверу SFU. Ключом является ID комнаты, + * а значением объект Room, который содержит информацию о комнате, ее участниках и связанном с ней сервере SFU. + */ + private HashMap rooms = new HashMap<>(); + + /** + * Потребитель для обработки входящих ICE-кандидатов от сервера SFU. + * Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x04, + * содержащим информацию об ICE-кандидате для одного из участников комнаты. + */ + private Consumer onIceCandidate; + /** + * Потребитель для обработки входящих SDP Answer от сервера SFU. + * Этот потребитель будет вызываться при получении сообщения от сервера SFU с кодом 0x05, + * содержащим информацию об SDP Answer для одного из участников комнаты. + */ + private Consumer onSdpAnswer; + + private Consumer onSdpOffer; + + /** + * Потребитель для обработки сообщений от сервера SFU о том, что комната была удалена. + * Передает ID удаленной комнаты, чтобы клиент мог удалить ее из своего списка комнат и прекратить попытки взаимодействия с ней. + */ + private Consumer onDeleteRoom; + + /** + * Потребитель для обработки сообщений от сервера SFU о том, что участник отключился от комнаты. + * Передает объект DisconnectedPeer, который содержит информацию об отключившемся участнике и комнате, от которой он отключился + */ + private Consumer onPeerDisconnected; + + /** + * TURN сервер предоставляемый SFU (если включен), может поддерживать udp,tcp протоколы (несколько ice) + */ + private Set iceServers = new HashSet<>(); + + /** + * Конструктор для создания объекта SFU, который будет использоваться для установления соединения с SFU сервером. + * @param serverAddress адрес SFU сервера в формате "host:port", например "sfu.example.com:8080" + * @param secretKey секретный ключ для аутентификации с SFU сервером, который должен быть согласован с настройками сервера. + */ + public SFU(String serverAddress, String secretKey) { + this.serverAddress = serverAddress; + this.secretKey = secretKey; + } + + /** + * Установить соединение с SFU сервером и начать обмен рукопожатиями для аутентификации и установления безопасного канала связи. + * @throws URISyntaxException если адрес сервера имеет неправильный формат + * @throws InterruptedException если соединение было прервано во время попытки подключения + * @throws SFUException если не удалось установить соединение с SFU сервером или если соединение было установлено, + * но не открыто после подключения + * @throws TimeoutException не удалось обменяться рукопожатиями с SFU сервером в течение 30 секунд + * @throws ExecutionException если во время обмена рукопожатиями произошла ошибка выполнения + * @throws SFUHandshakeException если обмен рукопожатиями с SFU завершился неудачно (например, плохой ключ) + */ + public void connect() throws URISyntaxException, InterruptedException, SFUException, ExecutionException, TimeoutException, SFUHandshakeException { + this.socket = new SfuSock(this.serverAddress); + this.socket.setMessageConsumer(this::onMessage); + boolean connected = this.socket.connectBlocking(30, TimeUnit.SECONDS); + if(!connected){ + throw new SFUException("Failed to connect to SFU server, read time out: " + this.serverAddress); + } + if(!this.socket.isOpen()) { + throw new SFUException("Connection to SFU server at " + this.serverAddress + " is not open"); + } + boolean estabilished = this.socket.handshakeExchange(this.secretKey).get(30, TimeUnit.SECONDS); + if(!estabilished) { + throw new SFUHandshakeException("Failed to establish handshake with SFU server at " + this.serverAddress); + } + /** + * Спрашиваем про TURN + */ + this.askTurnServer(); + } + + private void onMessage(ByteBuffer message) { + if(message.remaining() < 1) { + System.err.println("Received empty message from SFU server"); + return; + } + byte packetId = message.get(0); + if(packetId == Incoming.ROOM_CREATE) { + /** + * Ответ на создание комнаты, который содержит ID созданной комнаты + */ + int roomIdLength = message.getInt(); + byte[] roomIdBytes = new byte[roomIdLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + CompletableFuture future = this.pendingRoomCreations.remove(roomId); + if(future != null) { + future.complete(roomId); + } + return; + } + if(packetId == Incoming.ICE_CANDIDATE) { + /** + * ICE-candidate от сервера SFU для одного из участников комнаты + */ + int roomidLength = message.getInt(); + byte[] roomIdBytes = new byte[roomidLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + int peerIdLength = message.getInt(); + byte[] peerIdBytes = new byte[peerIdLength]; + message.get(peerIdBytes); + String peerId = new String(peerIdBytes).trim(); + int candidateLength = message.getInt(); + byte[] candidateBytes = new byte[candidateLength]; + message.get(candidateBytes); + String candidate = new String(candidateBytes).trim(); + ICECandidate iceCandidate = new ICECandidate(roomId, peerId, candidate); + if(this.onIceCandidate != null) { + this.onIceCandidate.accept(iceCandidate); + } + return; + } + if(packetId == Incoming.SDP_ANSWER) { + /** + * Ответ на Offer от сервера SFU, который содержит SDP Answer + */ + int roomidLength = message.getInt(); + byte[] roomIdBytes = new byte[roomidLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + int peerIdLength = message.getInt(); + byte[] peerIdBytes = new byte[peerIdLength]; + message.get(peerIdBytes); + String peerId = new String(peerIdBytes).trim(); + int sdpAnswerLength = message.getInt(); + byte[] sdpAnswerBytes = new byte[sdpAnswerLength]; + message.get(sdpAnswerBytes); + String sdpAnswer = new String(sdpAnswerBytes).trim(); + SDPAnswer answer = new SDPAnswer(roomId, peerId, sdpAnswer); + if(this.onSdpAnswer != null) { + this.onSdpAnswer.accept(answer); + } + return; + } + if(packetId == Incoming.SDP_OFFER) { + /** + * Offer от сервера SFU для одного из участников комнаты при renegotiation + */ + int roomidLength = message.getInt(); + byte[] roomIdBytes = new byte[roomidLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + int peerIdLength = message.getInt(); + byte[] peerIdBytes = new byte[peerIdLength]; + message.get(peerIdBytes); + String peerId = new String(peerIdBytes).trim(); + int sdpOfferLength = message.getInt(); + byte[] sdpOfferBytes = new byte[sdpOfferLength]; + message.get(sdpOfferBytes); + String sdpOffer = new String(sdpOfferBytes).trim(); + SDPOffer offer = new SDPOffer(roomId, peerId, sdpOffer); + if(this.onSdpOffer != null) { + this.onSdpOffer.accept(offer); + } + return; + } + if(packetId == Incoming.ROOM_DELETE) { + int roomIdLength = message.getInt(); + byte[] roomIdBytes = new byte[roomIdLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + this.rooms.remove(roomId); + if(this.onDeleteRoom != null) { + this.onDeleteRoom.accept(roomId); + } + } + if(packetId == Incoming.PEER_DISCONNECTED) { + int roomIdLength = message.getInt(); + byte[] roomIdBytes = new byte[roomIdLength]; + message.get(roomIdBytes); + String roomId = new String(roomIdBytes).trim(); + int peerIdLength = message.getInt(); + byte[] peerIdBytes = new byte[peerIdLength]; + message.get(peerIdBytes); + String peerId = new String(peerIdBytes).trim(); + Room room = this.rooms.get(roomId); + DisconnectReason reason = io.g365sfu.net.DisconnectReason.fromCode(message.getInt()); + if(room != null) { + /** + * Если такая комната существует то удаляем оттуда участника + */ + room.removeParticipant(peerId); + } + DisconnectedPeer disconnectedPeer = new DisconnectedPeer(peerId, roomId, room, reason); + if(this.onPeerDisconnected != null) { + this.onPeerDisconnected.accept(disconnectedPeer); + } + } + if(packetId == Incoming.TURN_SERVER) { + int urlLength = message.getInt(); + byte[] urlBytes = new byte[urlLength]; + message.get(urlBytes); + String url = new String(urlBytes).trim(); + int usernameLength = message.getInt(); + byte[] usernameBytes = new byte[usernameLength]; + message.get(usernameBytes); + String username = new String(usernameBytes).trim(); + int credentialLength = message.getInt(); + byte[] credentialBytes = new byte[credentialLength]; + message.get(credentialBytes); + String credential = new String(credentialBytes).trim(); + int transportLength = message.getInt(); + byte[] transportBytes = new byte[transportLength]; + message.get(transportBytes); + String transport = new String(transportBytes).trim(); + this.iceServers.add(new RTCIceServer(url, username, credential, transport)); + } + } + + /** + * Получить адрес SFU сервера, к которому установлено соединение + * @return адрес SFU сервера + */ + public String getServerAddress() { + return this.serverAddress; + } + + /** + * Получить соединение к SFU серверу, если оно было установлено + * @return объект SfuSock, представляющий соединение к SFU серверу + */ + public SfuSock getConnection() { + return this.socket; + } + + /** + * Проверить, установлено ли соединение с SFU сервером и открыто ли оно + * @return true, если соединение установлено и открыто, false в противном случае + */ + public boolean isOpen() { + return this.socket != null && this.socket.isOpen(); + } + + /** + * Создает комнату на сервере SFU для организации звонков между пользователями. Комната автоматически удаляется + * при выходе последнего участника из нее. Внутри комнаты пользователи могут обмениваться аудио и видео потоками, а сервер SFU + * будет эффективно их пересылать между участниками, минимизируя задержки и оптимизируя использование пропускной способности. + * @throws TimeoutException + * @throws ExecutionException + * @throws InterruptedException + * @internal Этот метод формирует пакет с кодом 0x02, + * за которым следует случайно сгенерированный идентификатор комнаты, + * и отправляет его на сервер SFU. + */ + public Room createRoom() throws InterruptedException, ExecutionException, TimeoutException { + String roomId = StrUtils.randomString(64); + /** + * 1 байт номер пакета, 4 байта длина ID комнаты, N байт ID комнаты + */ + ByteBuffer buffer = ByteBuffer.allocate(1 + 4 + roomId.getBytes().length); + CompletableFuture future = new CompletableFuture<>(); + this.pendingRoomCreations.put(roomId, future); + /** + * 0x02 - создание комнаты + */ + buffer.put(Outgoing.ROOM_CREATE); + buffer.putInt(roomId.getBytes().length); + buffer.put(roomId.getBytes()); + buffer.flip(); + this.socket.send(buffer); + String createdRoomId = future.get(30, TimeUnit.SECONDS); + Room room = new Room(createdRoomId, this, new HashSet<>()); + this.rooms.put(createdRoomId, room); + return room; + } + + /** + * После успешного установления соединения и обменом handshake нужно узнать, поддерживает ли наш SFU встроенный TURN + */ + public void askTurnServer() { + ByteBuffer buffer = ByteBuffer.allocate(1); + buffer.put(Outgoing.ASK_TURN); + buffer.flip(); + this.socket.send(buffer); + } + + /** + * Получить все комнаты на сервере + * @return комнаты на этом сервере + */ + public HashSet getRooms() { + return new HashSet<>(this.rooms.values()); + } + + /** + * Получить комнату по ее идентификатору + * @param roomId идентификатор комнаты + * @return объект Room, представляющий комнату с данным идентификатором, или null, если комната не найдена + */ + public Room getRoom(String roomId) { + return this.rooms.get(roomId); + } + + /** + * Получить комнату, в которой участвует пользователь с данным идентификатором + * @param participantId идентификатор пользователя, который является участником комнаты + * @return объект Room, представляющий комнату, в которой участвует пользователь с данным идентификатором, или null, если такой комнаты не найдено + */ + public Room getRoomByParticipantId(String participantId) { + for(Room room : this.rooms.values()) { + if(room.containsParticipant(participantId)) { + return room; + } + } + return null; + } + + /** + * Получает количество комнат на этом сервере + * @return возвращает количество комнат на сервере + */ + public int getRoomsCount() { + return this.rooms.size(); + } + + /** + * Устанавливает потребителя для обработки входящих ICE-кандидатов от сервера SFU. + * @param onIceCandidate потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x04, + * содержащим информацию об ICE-кандидате для одного из участников комнаты. + * Параметром будет объект ICECandidate, который содержит информацию о комнате, участнике и самом + * кандидате, необходимую для правильной маршрутизации данных между участниками звонка через сервер SFU. + */ + public void setIceConsumer(Consumer onIceCandidate) { + this.onIceCandidate = onIceCandidate; + } + + /** + * Устанавливает потребителя для обработки входящих SDP Answer от сервера SFU. + * @param onSdpAnswer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x05, + * содержащим информацию об SDP Answer для одного из участников комнаты. + * Параметром будет объект SDPAnswer, который содержит информацию о комнате, участнике и самом SDP Answer, + * необходимую для установления медиа-сессии между участником и сервером SFU. + */ + public void setAnswerConsumer(Consumer onSdpAnswer) { + this.onSdpAnswer = onSdpAnswer; + } + + /** + * Устанавливает потребителя для обработки входящих SDP Offer от сервера SFU при renegotiation. + * @param onSdpOffer потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x08, + * содержащим информацию об SDP Offer для одного из участников комнаты при renegotiation. + * Параметром будет объект SDPOffer, который содержит информацию о комнате, участнике и самом SDP Offer, + * необходимую для установления медиа-сессии между участником и сервером SFU. + */ + public void setOfferConsumer(Consumer onSdpOffer) { + this.onSdpOffer = onSdpOffer; + } + + /** + * Устанавливает потребителя для обработки сообщений от сервера SFU о том, что комната была удалена. + * @param onDeleteRoom потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x10, + * содержащим информацию о том, что комната была удалена. Параметром будет строка, содержащая ID удаленной комнаты, + * чтобы клиент мог удалить ее из своего списка комнат и прекратить попытки взаимодействия с ней. + */ + public void setDeleteRoomConsumer(Consumer onDeleteRoom) { + this.onDeleteRoom = onDeleteRoom; + } + + /** + * Устанавливает потребителя для обработки сообщений от сервера SFU о том, что участник отключился от комнаты. + * @param onPeerDisconnected потребитель, который будет вызываться при получении сообщения от сервера SFU с кодом 0x11, + */ + public void setPeerDisconnectedConsumer(Consumer onPeerDisconnected) { + this.onPeerDisconnected = onPeerDisconnected; + } + + /** + * Возвращает TURN сервер на этом SFU + */ + public Set getIceServers() { + return this.iceServers; + } +} diff --git a/src/main/java/io/g365sfu/exception/SFUException.java b/src/main/java/io/g365sfu/exception/SFUException.java new file mode 100644 index 0000000..e3c6239 --- /dev/null +++ b/src/main/java/io/g365sfu/exception/SFUException.java @@ -0,0 +1,13 @@ +package io.g365sfu.exception; + +public class SFUException extends Exception { + + public SFUException(String message) { + super(message); + } + + public SFUException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/io/g365sfu/exception/SFUHandshakeException.java b/src/main/java/io/g365sfu/exception/SFUHandshakeException.java new file mode 100644 index 0000000..9103da1 --- /dev/null +++ b/src/main/java/io/g365sfu/exception/SFUHandshakeException.java @@ -0,0 +1,13 @@ +package io.g365sfu.exception; + +public class SFUHandshakeException extends Exception { + + public SFUHandshakeException(String message) { + super(message); + } + + public SFUHandshakeException(String message, Throwable cause) { + super(message, cause); + } + +} diff --git a/src/main/java/io/g365sfu/net/DisconnectReason.java b/src/main/java/io/g365sfu/net/DisconnectReason.java new file mode 100644 index 0000000..21f8ccf --- /dev/null +++ b/src/main/java/io/g365sfu/net/DisconnectReason.java @@ -0,0 +1,25 @@ +package io.g365sfu.net; + +public enum DisconnectReason { + FAILED(0), + CLOSED(1); + + private final int code; + + DisconnectReason(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static DisconnectReason fromCode(int code) { + for (DisconnectReason reason : DisconnectReason.values()) { + if (reason.code == code) { + return reason; + } + } + throw new IllegalArgumentException("Unknown DisconnectReason code: " + code); + } +} diff --git a/src/main/java/io/g365sfu/net/DisconnectedPeer.java b/src/main/java/io/g365sfu/net/DisconnectedPeer.java new file mode 100644 index 0000000..36d83fe --- /dev/null +++ b/src/main/java/io/g365sfu/net/DisconnectedPeer.java @@ -0,0 +1,34 @@ +package io.g365sfu.net; + +import io.g365sfu.Room; + +public class DisconnectedPeer { + + private String peerId; + private Room room; + private String roomId; + private DisconnectReason reason; + + public DisconnectedPeer(String peerId, String roomId, Room room, DisconnectReason reason) { + this.peerId = peerId; + this.room = room; + this.roomId = roomId; + this.reason = reason; + } + + public String getPeerId() { + return peerId; + } + + public Room getRoom() { + return room; + } + + public String getRoomId() { + return roomId; + } + + public DisconnectReason getReason() { + return reason; + } +} diff --git a/src/main/java/io/g365sfu/net/Incoming.java b/src/main/java/io/g365sfu/net/Incoming.java new file mode 100644 index 0000000..f018e4b --- /dev/null +++ b/src/main/java/io/g365sfu/net/Incoming.java @@ -0,0 +1,61 @@ +package io.g365sfu.net; + +/** + * Входящие пакеты от SFU сервера, могут быть инициированы запросом клиента, или отправляться SFU при + * некоторых событиях + */ +public class Incoming { + + /** + * Означает, что сервер ответил на рукопожатие, и мы можем считать его успешным + */ + public static final byte HANDSHAKE_OK = (byte) 0x01; + + /** + * Сервер ответил на рукопожатие, но по какой-то из причин, отклонил его, например, неправильный + * секретный ключ + */ + public static final byte HANDSHAKE_FAILURE = (byte) 0xFF; + + /** + * Ответ от сервера о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным + */ + public static final byte CONNECTION_ALIVE = (byte) 0xAE; + + /** + * ICE-Кандидат с сервера SFU, который нужно переслать целевому участнику + */ + public static final byte ICE_CANDIDATE = (byte) 0x04; + + /** + * SDP offer от сервера SFU, который нужно переслать целевому участнику + */ + public static final byte SDP_OFFER = (byte) 0x08; + + /** + * SDP Answer от SFU в ответ на отправленный целевым участником SDP Offer, который нужно переслать целевому участнику + */ + public static final byte SDP_ANSWER = (byte) 0x05; + + /** + * Сообщение от сервера SFU о том, тто комната успешно создана + */ + public static final byte ROOM_CREATE = (byte) 0x02; + + /** + * Сообщение от сервера SFU о том, что комната была удалена + */ + public static final byte ROOM_DELETE = (byte) 0x10; + + /** + * Сообщение об отсоединении участника от сервера SFU, например при обрыве связи + */ + public static final byte PEER_DISCONNECTED = (byte) 0x11; + + /** + * Вызывается когда сервер SFU отправляет TURN сервер (если он поддерживается), который может быть использован + * для обмена кандидатами между участниками звонка через NAT. + */ + public static final byte TURN_SERVER = (byte) 0x19; + +} diff --git a/src/main/java/io/g365sfu/net/Outgoing.java b/src/main/java/io/g365sfu/net/Outgoing.java new file mode 100644 index 0000000..752b8fe --- /dev/null +++ b/src/main/java/io/g365sfu/net/Outgoing.java @@ -0,0 +1,53 @@ +package io.g365sfu.net; + +/** + * Исходящие пакеты к SFU серверу, могут быть отправлены бекендом как в ответ на входящие сообщения от сервера, + * так и по инициативе бекенда + */ +public class Outgoing { + + /** + * Рукопожатие с сервером SFU, которое необходимо выполнить перед любыми другими операциями. + * Этот пакет используется для установления начального соединения с сервером SFU, и + * должен быть отправлен первым при подключении к серверу. Он может содержать информацию, + * необходимую для аутентификации (секретный ключ) + */ + public static final byte HANDSHAKE_EXCHANGE = (byte) 0x01; + + /** + * Проверка соединения с сервером SFU, которая может быть отправлена бекендом по инициативе бекенда для проверки, + * что соединение с сервером все еще активно. + */ + public static final byte CONNECTION_ALIVE = (byte) 0xAE; + + /** + * Ретрансляция ICE-кандидата от одного участника на сервер SFU, чтобы участник мог установить + * соединение с SFU + */ + public static final byte ICE_CANDIDATE_RETRANSLATE = (byte) 0x06; + + /** + * Ретрансляция SDP answer от одного участника на сервер SFU, чтобы участник мог установить + * соединение с SFU + */ + public static final byte SDP_ANSWER_RETRANSLATE = (byte) 0x07; + + /** + * Ретрансляция SDP offer от одного участника на сервер SFU, чтобы участник мог установить + * соединение с SFU + */ + public static final byte SDP_OFFER_RETRANSLATE = (byte) 0x03; + + /** + * Вызывается когда бекенд хочет создать комнату на сервере SFU, и сообщает об этом серверу, + * чтобы сервер создал комнату и был готов к приему участников + */ + public static final byte ROOM_CREATE = (byte) 0x02; + + /** + * Вызывается когда бекенд хочет спросить есть ли TURN сервер предоставляемый SFU, сервер ничего не ответит если + * TURN сервер не поддерживается. По умолчанию в G365SFU .env TURN сервер включен. + */ + public static final byte ASK_TURN = (byte) 0x19; + +} diff --git a/src/main/java/io/g365sfu/net/SfuSock.java b/src/main/java/io/g365sfu/net/SfuSock.java new file mode 100644 index 0000000..e396057 --- /dev/null +++ b/src/main/java/io/g365sfu/net/SfuSock.java @@ -0,0 +1,153 @@ +package io.g365sfu.net; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.nio.ByteOrder; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.java_websocket.client.WebSocketClient; +import org.java_websocket.handshake.ServerHandshake; + +public class SfuSock extends WebSocketClient { + + private CompletableFuture handshakeFuture = new CompletableFuture<>(); + private CompletableFuture connectionFuture = new CompletableFuture<>(); + private Consumer onMessage; + + public SfuSock(String serverAddress) throws URISyntaxException { + super(new URI("ws://" + serverAddress)); + } + + @Override + public void onOpen(ServerHandshake handshakedata) { + //System.out.println("Connected to SFU server"); + } + + @Override + public void onMessage(ByteBuffer bytes) { + bytes.order(ByteOrder.BIG_ENDIAN); + if(bytes.remaining() < 1) { + return; + } + byte messageType = bytes.get(); + if(messageType == Incoming.HANDSHAKE_OK) { + /** + * Сервер ответил на рукопожатие, и мы можем считать его успешным + */ + this.handshakeFuture.complete(true); + return; + } + + if(messageType == Incoming.HANDSHAKE_FAILURE) { + /** + * Сервер отклонил рукопожатие, и мы должны считать его неудачным + */ + this.handshakeFuture.complete(false); + return; + } + + if(messageType == Incoming.CONNECTION_ALIVE) { + /** + * Сервер отправил сообщение о том, что соединение живое (ответ на проверку соединения), и мы можем считать его успешным + */ + this.connectionFuture.complete(true); + return; + } + + /** + * Если это не сообщение рукопожатия, то мы передаем его в установленного потребителя + */ + if(this.onMessage != null) { + this.onMessage.accept(bytes); + } + } + + @Override + public void onMessage(String message) { + //System.err.println("Received unexpected text message from SFU server: " + message); + } + + @Override + public void onClose(int code, String reason, boolean remote) { + if(!this.handshakeFuture.isDone()) { + /** + * Если соединение было закрыто до завершения рукопожатия, то мы считаем его неудачным + */ + this.handshakeFuture.complete(false); + } + } + + @Override + public void onError(Exception ex){ + //System.err.println("Error: " + ex.getMessage()); + } + + /** + * Запускает обмен рукопожатиями с сервером SFU + * @param secretKey секретный ключ для аутентификации с сервером SFU + * @return CompletableFuture, который будет завершен с результатом true, если + * рукопожатие прошло успешно, или false, если рукопожатие не удалось или было отклонено сервером SFU + * @internal Этот метод отправляет пакет рукопожатия, который состоит из одного байта 0x01, за которым следует секретный ключ в виде строки байтов. + * Сервер SFU должен ответить одним байтом 0x01 для успешного рукопожатия или 0xFF для отклонения рукопожатия. + */ + public CompletableFuture handshakeExchange(String secretKey) { + /** + * 1 байт номер пакета, 4 байта длина секретного ключа, N байт секретный ключ + */ + ByteBuffer buffer = ByteBuffer.allocate( + 1 + 4 + secretKey.getBytes().length + ); + /** + * 0x01 - код рукопожатия в соотвествии с протоколом g365sfu, за которым следует секретный ключ в виде строки байтов + */ + buffer.put(Outgoing.HANDSHAKE_EXCHANGE); + buffer.putInt(secretKey.getBytes().length); + buffer.put(secretKey.getBytes()); + buffer.flip(); + /** + * Отправляем сформированный пакет, и возвращаем CompletableFuture + */ + this.send(buffer); + return this.handshakeFuture; + } + + /** + * Устанавливает потребителя для обработки входящих текстовых сообщений от сервера SFU. + * @param onMessage потребитель, который будет вызван с текстом сообщения при получении текстового сообщения от сервера SFU. + * @internal Этот метод позволяет установить обработчик для текстовых сообщений от сервера SFU + */ + public void setMessageConsumer(Consumer onMessage) { + this.onMessage = onMessage; + } + + /** + * Проверяет состояние соединения, если соединение активно и готово к обмену данными, то возвращает false + * @return + * @throws InterruptedException + */ + public boolean checkConnection() { + ByteBuffer buffer = ByteBuffer.allocate(1); + /** + * 0x08 - код проверки соединения в соотвествии с протоколом g365sfu + */ + buffer.put(Outgoing.CONNECTION_ALIVE); + buffer.flip(); + this.send(buffer); + try { + boolean result = this.connectionFuture.get(10, TimeUnit.SECONDS); + this.connectionFuture = new CompletableFuture<>(); + return result; + }catch(Exception e){ + /** + * Сбрасываем handshakeFuture, так как соединение не активно, + * и нам нужно будет пройти рукопожатие заново при следующей проверке соединения + */ + this.handshakeFuture = new CompletableFuture<>(); + this.connectionFuture = new CompletableFuture<>(); + return false; + } + } +} diff --git a/src/main/java/io/g365sfu/util/StrUtils.java b/src/main/java/io/g365sfu/util/StrUtils.java new file mode 100644 index 0000000..deccec8 --- /dev/null +++ b/src/main/java/io/g365sfu/util/StrUtils.java @@ -0,0 +1,15 @@ +package io.g365sfu.util; + +public class StrUtils { + + public static String randomString(int length) { + String chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789"; + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < length; i++) { + int index = (int) (Math.random() * chars.length()); + sb.append(chars.charAt(index)); + } + return sb.toString(); + } + +} diff --git a/src/main/java/io/g365sfu/webrtc/ICECandidate.java b/src/main/java/io/g365sfu/webrtc/ICECandidate.java new file mode 100644 index 0000000..63ac1ee --- /dev/null +++ b/src/main/java/io/g365sfu/webrtc/ICECandidate.java @@ -0,0 +1,34 @@ +package io.g365sfu.webrtc; + +/** + * Этот класс представляет собой ICE-кандидат, + * который используется для обмена информацией о сетевых маршрутах между участниками звонка через сервер SFU. + * + * Содержит информацию о комнате, участнике и самом кандидате, которая необходима для правильной маршрутизации данных между участниками звонка через сервер SFU. + */ +public class ICECandidate { + + private String roomId; + private String participantId; + private String candidate; + + public ICECandidate(String roomId, String participantId, String candidate) { + this.roomId = roomId; + this.participantId = participantId; + this.candidate = candidate; + } + + public String getRoomId() { + return roomId; + } + + public String getParticipantId() { + return participantId; + } + + public String getCandidate() { + return candidate; + } + + +} diff --git a/src/main/java/io/g365sfu/webrtc/RTCIceServer.java b/src/main/java/io/g365sfu/webrtc/RTCIceServer.java new file mode 100644 index 0000000..15cc7cd --- /dev/null +++ b/src/main/java/io/g365sfu/webrtc/RTCIceServer.java @@ -0,0 +1,54 @@ +package io.g365sfu.webrtc; + + +/** + * Представляет собой объект RTCIceServer который содержит информацию о сервере ICE, + * например TURN + */ +public class RTCIceServer { + + private String url; + private String username; + private String credential; + private String transport; + + public RTCIceServer(String url, String username, String credential, String transport) { + this.url = url; + this.username = username; + this.credential = credential; + this.transport = transport; + } + + /** + * URL сервера ICE, который используется для обмена кандидатами между участниками звонка через сервер SFU. + * @return строка, содержащая URL сервера ICE, который используется для обмена кандидатами между участниками звонка через сервер SFU. + */ + public String getUrl() { + return url; + } + + /** + * Имя пользователя для аутентификации на сервере ICE. + * @return строка, содержащая имя пользователя для аутентификации на сервере ICE. + */ + public String getUsername() { + return username; + } + + /** + * Учетные данные для аутентификации на сервере ICE. + * @return строка, содержащая учетные данные для аутентификации на сервере ICE. + */ + public String getCredential() { + return credential; + } + + /** + * Транспортный протокол, используемый для связи с сервером ICE (например, "udp" или "tcp"). + * @return строка, содержащая транспортный протокол, используемый для связи с сервером ICE (например, "udp" или "tcp"). + */ + public String getTransport() { + return transport; + } + +} diff --git a/src/main/java/io/g365sfu/webrtc/SDPAnswer.java b/src/main/java/io/g365sfu/webrtc/SDPAnswer.java new file mode 100644 index 0000000..333d13a --- /dev/null +++ b/src/main/java/io/g365sfu/webrtc/SDPAnswer.java @@ -0,0 +1,31 @@ +package io.g365sfu.webrtc; + +/** + * Приходит с сервера SFU в ответ на отправленный SDP Offer от участника комнаты. + * Содержит информацию о комнате, участнике и самом SDP Answer, который необходим для установления медиа-сессии + * между участником и сервером SFU + */ +public class SDPAnswer { + + private String roomId; + private String participantId; + private String sdp; + + public SDPAnswer(String roomId, String participantId, String sdp) { + this.roomId = roomId; + this.participantId = participantId; + this.sdp = sdp; + } + + public String getRoomId() { + return roomId; + } + + public String getParticipantId() { + return participantId; + } + + public String getSdp() { + return sdp; + } +} diff --git a/src/main/java/io/g365sfu/webrtc/SDPOffer.java b/src/main/java/io/g365sfu/webrtc/SDPOffer.java new file mode 100644 index 0000000..4814007 --- /dev/null +++ b/src/main/java/io/g365sfu/webrtc/SDPOffer.java @@ -0,0 +1,30 @@ +package io.g365sfu.webrtc; + +/** + * Приходит от SFU сервера для конкретного участника комнаты. Обычно приходит при renegotiation, + * когда участник комнаты отправляет новый SDP Offer на сервер SFU, а сервер SFU отвечает ему новым SDP Answer, + * который содержит обновленную информацию о медиа-сессии для этого участника. + */ +public class SDPOffer { + private String roomId; + private String participantId; + private String sdp; + + public SDPOffer(String roomId, String participantId, String sdp) { + this.roomId = roomId; + this.participantId = participantId; + this.sdp = sdp; + } + + public String getRoomId() { + return roomId; + } + + public String getParticipantId() { + return participantId; + } + + public String getSdp() { + return sdp; + } +} diff --git a/src/main/java/io/orprotocol/Server.java b/src/main/java/io/orprotocol/Server.java index fbf29e4..dfa705a 100644 --- a/src/main/java/io/orprotocol/Server.java +++ b/src/main/java/io/orprotocol/Server.java @@ -237,7 +237,7 @@ public class Server extends WebSocketServer { client.disconnect(ServerFailures.INACTIVITY_TIMEOUT); } } - }, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.MILLISECONDS); + }, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.SECONDS); } /**