diff --git a/src/main/java/im/rosetta/Boot.java b/src/main/java/im/rosetta/Boot.java index eef6f93..a5f411e 100644 --- a/src/main/java/im/rosetta/Boot.java +++ b/src/main/java/im/rosetta/Boot.java @@ -17,7 +17,8 @@ import im.rosetta.executors.Executor21GroupLeave; import im.rosetta.executors.Executor22GroupBan; import im.rosetta.executors.Executor24DeviceResolve; import im.rosetta.executors.Executor25Sync; -import im.rosetta.executors.Executor26Signal; +import im.rosetta.executors.Executor26SignalPeer; +import im.rosetta.executors.Executor27WebRTC; import im.rosetta.executors.Executor3Search; import im.rosetta.executors.Executor4OnlineState; import im.rosetta.executors.Executor6Message; @@ -45,7 +46,8 @@ import im.rosetta.packet.Packet22GroupBan; import im.rosetta.packet.Packet23DeviceList; import im.rosetta.packet.Packet24DeviceResolve; import im.rosetta.packet.Packet25Sync; -import im.rosetta.packet.Packet26Signal; +import im.rosetta.packet.Packet26SignalPeer; +import im.rosetta.packet.Packet27WebRTC; import im.rosetta.packet.Packet2Result; import im.rosetta.packet.Packet3Search; import im.rosetta.packet.Packet4OnlineSubscribe; @@ -56,7 +58,6 @@ import im.rosetta.packet.Packet8Delivery; import im.rosetta.packet.Packet9DeviceNew; import im.rosetta.service.services.BufferCleanupService; import im.rosetta.service.services.ForwardUnitService; -import io.g365sfu.SFU; import io.orprotocol.Server; import io.orprotocol.Settings; import io.orprotocol.packet.PacketManager; @@ -110,7 +111,7 @@ public class Boot { int cleanupEveryDays = System.getenv("BUFFER_CLEANUP_DAYS") != null ? Integer.parseInt(System.getenv("BUFFER_CLEANUP_DAYS")) : 7; this.bufferCleanupService = new BufferCleanupService(cleanupEveryDays, this.logger); - this.forwardUnitService = new ForwardUnitService(this.logger); + this.forwardUnitService = new ForwardUnitService(this.logger, this.clientManager); } /** @@ -201,7 +202,8 @@ public class Boot { this.packetManager.registerPacket(23, Packet23DeviceList.class); this.packetManager.registerPacket(24, Packet24DeviceResolve.class); this.packetManager.registerPacket(25, Packet25Sync.class); - this.packetManager.registerPacket(26, Packet26Signal.class); + this.packetManager.registerPacket(26, Packet26SignalPeer.class); + this.packetManager.registerPacket(27, Packet27WebRTC.class); } private void registerAllExecutors() { @@ -223,7 +225,8 @@ public class Boot { this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); - this.packetManager.registerExecutor(26, new Executor26Signal(this.clientManager)); + this.packetManager.registerExecutor(26, new Executor26SignalPeer(this.clientManager, this.forwardUnitService)); + this.packetManager.registerExecutor(27, new Executor27WebRTC(this.forwardUnitService)); } private void printBootMessage() { diff --git a/src/main/java/im/rosetta/executors/Executor27WebRTC.java b/src/main/java/im/rosetta/executors/Executor27WebRTC.java new file mode 100644 index 0000000..4d9aa19 --- /dev/null +++ b/src/main/java/im/rosetta/executors/Executor27WebRTC.java @@ -0,0 +1,73 @@ +package im.rosetta.executors; + +import im.rosetta.Failures; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.packet.Packet27WebRTC; +import im.rosetta.packet.runtime.NetworkWebRTCType; +import im.rosetta.service.services.ForwardUnitService; +import io.g365sfu.Room; +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +public class Executor27WebRTC extends PacketExecutor { + + private ForwardUnitService fus; + + public Executor27WebRTC(ForwardUnitService fus) { + this.fus = fus; + } + + @Override + public void onPacketReceived(Packet27WebRTC packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не авторизован, то мы не будем обрабатывать его сигналы на инициализацию звонка + * и просто отключим его от сервера. + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + + String publicKey = eciAuthentificate.getPublicKey(); + + /** + * Так как в комнатах Participants это публичные ключи пользователей, то мы можем + * найти комнату, в которой находится пользователь, по его публичному ключу + */ + Room room = this.fus.getRoomByParticipantId(publicKey); + if(room == null) { + /** + * Если комната не найдена, то мы не будем обрабатывать сигналы для звонка + * и просто отключим клиента от сервера. + */ + client.disconnect(Failures.DATA_MISSMATCH); + return; + } + + NetworkWebRTCType type = packet.getType(); + if(type == NetworkWebRTCType.OFFER) { + /** + * Если это OFFER, то отправляем OFFER на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.sdpOffer(publicKey, packet.getSdpOrCandidate()); + } + if(type == NetworkWebRTCType.ICE_CANDIDATE) { + /** + * Если это ICE кандидат, то отправляем его на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.iceCandidate(publicKey, packet.getSdpOrCandidate()); + } + if(type == NetworkWebRTCType.ANSWER) { + /** + * Если это ANSWER, то отправляем его на сервер SFU, + * который отвечает за эту комнату, чтобы он транслировал его всем участникам комнаты, кроме отправителя + */ + room.sdpAnswer(publicKey, packet.getSdpOrCandidate()); + } + } + +} diff --git a/src/main/java/im/rosetta/packet/Packet27WebRTC.java b/src/main/java/im/rosetta/packet/Packet27WebRTC.java new file mode 100644 index 0000000..611c9a2 --- /dev/null +++ b/src/main/java/im/rosetta/packet/Packet27WebRTC.java @@ -0,0 +1,65 @@ +package im.rosetta.packet; + +import im.rosetta.packet.runtime.NetworkWebRTCType; +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +public class Packet27WebRTC extends Packet { + /** + * SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + */ + private String sdpOrCandidate; + /** + * Тип сообщения WebRTC + */ + private NetworkWebRTCType type; + + @Override + public void read(Stream stream) { + this.type = NetworkWebRTCType.fromCode(stream.readInt8()); + this.sdpOrCandidate = stream.readString(); + } + + @Override + public Stream write() { + Stream steram = new Stream(); + steram.writeInt16(this.packetId); + steram.writeInt8(this.type.getCode()); + steram.writeString(this.sdpOrCandidate); + return steram; + } + + /** + * Получить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + * @return SDP оффер/answer или ICE кандидат + */ + public String getSdpOrCandidate() { + return sdpOrCandidate; + } + + /** + * Получить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом + * @return тип сообщения WebRTC + */ + public NetworkWebRTCType getType() { + return type; + } + + /** + * Установить SDP оффер/answer или ICE кандидат, в зависимости от типа сообщения + * @param sdpOrCandidate SDP оффер/answer или ICE кандидат + */ + public void setSdpOrCandidate(String sdpOrCandidate) { + this.sdpOrCandidate = sdpOrCandidate; + } + + /** + * Установить тип сообщения WebRTC, который указывает на то, является ли это оффером, ответом на оффер или ICE кандидатом + * @param type тип сообщения WebRTC + */ + public void setType(NetworkWebRTCType type) { + this.type = type; + } + + +}