From 07b4e9daa2f60ce3d30ffd6bad4abb52b61f555c Mon Sep 17 00:00:00 2001 From: Royce59 Date: Sun, 19 Apr 2026 12:59:16 +0200 Subject: [PATCH] =?UTF-8?q?=D0=97=D0=B0=D0=B2=D0=B5=D1=80=D1=88=D0=B5?= =?UTF-8?q?=D0=BD=D0=B8=D0=B5=20=D0=BF=D0=B5=D1=80=D0=B5=D1=85=D0=BE=D0=B4?= =?UTF-8?q?=D0=B0=20=D0=BD=D0=B0=20=D0=BD=D0=BE=D0=B2=D1=83=D1=8E=20=D0=B2?= =?UTF-8?q?=D0=B5=D1=80=D1=81=D0=B8=D1=8E=20=D0=BF=D1=80=D0=BE=D1=82=D0=BE?= =?UTF-8?q?=D0=BA=D0=BE=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/im/rosetta/Boot.java | 68 +++++++++---------- .../java/im/rosetta/calls/CallManager.java | 12 ++-- .../im/rosetta/database/entity/Buffer.java | 11 +++ .../network/RccGeneratedPacketRegistry.java | 3 + .../service/dispatch/DeviceDispatcher.java | 10 +-- .../dispatch/push/PushNotifyDispatcher.java | 2 +- .../service/services/BufferService.java | 32 +++++++-- .../service/services/ForwardUnitService.java | 28 ++++---- .../io/orprotocol/frame/FrameDecoder.java | 16 +++-- .../io/orprotocol/frame/FrameEncoder.java | 13 ++-- 10 files changed, 121 insertions(+), 74 deletions(-) diff --git a/src/main/java/im/rosetta/Boot.java b/src/main/java/im/rosetta/Boot.java index 62a5361..54e5ba0 100644 --- a/src/main/java/im/rosetta/Boot.java +++ b/src/main/java/im/rosetta/Boot.java @@ -4,27 +4,27 @@ import im.rosetta.calls.CallManager; import im.rosetta.client.ClientManager; import im.rosetta.client.OnlineManager; import im.rosetta.event.EventManager; +import im.rosetta.executors.ExecutorDeviceResolve; +import im.rosetta.executors.ExecutorGroupBan; +import im.rosetta.executors.ExecutorGroupCreate; +import im.rosetta.executors.ExecutorGroupInfo; +import im.rosetta.executors.ExecutorGroupInviteInfo; +import im.rosetta.executors.ExecutorGroupJoin; +import im.rosetta.executors.ExecutorGroupLeave; import im.rosetta.executors.ExecutorHandshake; -import im.rosetta.executors.Executor10RequestUpdate; -import im.rosetta.executors.Executor11Typeing; -import im.rosetta.executors.Executor15RequestTransport; -import im.rosetta.executors.Executor16PushNotification; -import im.rosetta.executors.Executor17GroupCreate; -import im.rosetta.executors.Executor18GroupInfo; -import im.rosetta.executors.Executor19GroupInviteInfo; +import im.rosetta.executors.ExecutorIceServers; +import im.rosetta.executors.ExecutorMessage; import im.rosetta.executors.ExecutorUserInfo; -import im.rosetta.executors.Executor20GroupJoin; -import im.rosetta.executors.Executor21GroupLeave; -import im.rosetta.executors.Executor22GroupBan; -import im.rosetta.executors.Executor24DeviceResolve; -import im.rosetta.executors.Executor25Sync; -import im.rosetta.executors.Executor26SignalPeer; -import im.rosetta.executors.Executor27WebRTC; -import im.rosetta.executors.Executor28IceServers; +import im.rosetta.executors.ExecutorWebRTC; import im.rosetta.executors.ExecutorSearch; +import im.rosetta.executors.ExecutorSignalPeer; +import im.rosetta.executors.ExecutorSync; +import im.rosetta.executors.ExecutorTypeing; import im.rosetta.executors.ExecutorOnlineState; -import im.rosetta.executors.Executor6Message; -import im.rosetta.executors.Executor7Read; +import im.rosetta.executors.ExecutorPushNotification; +import im.rosetta.executors.ExecutorRead; +import im.rosetta.executors.ExecutorRequestTransport; +import im.rosetta.executors.ExecutorRequestUpdate; import im.rosetta.listeners.DeviceListListener; import im.rosetta.listeners.HandshakeCompleteListener; import im.rosetta.listeners.OnlineStatusDisconnectListener; @@ -160,23 +160,23 @@ public class Boot { this.packetManager.registerExecutor(1, new ExecutorUserInfo()); this.packetManager.registerExecutor(3, new ExecutorSearch(this.clientManager)); this.packetManager.registerExecutor(4, new ExecutorOnlineState(this.onlineManager, this.clientManager)); - this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager)); - this.packetManager.registerExecutor(7, new Executor7Read(this.clientManager, this.packetManager)); - this.packetManager.registerExecutor(10, new Executor10RequestUpdate()); - this.packetManager.registerExecutor(11, new Executor11Typeing(this.clientManager, this.packetManager)); - this.packetManager.registerExecutor(15, new Executor15RequestTransport()); - this.packetManager.registerExecutor(16, new Executor16PushNotification()); - this.packetManager.registerExecutor(17, new Executor17GroupCreate()); - this.packetManager.registerExecutor(18, new Executor18GroupInfo()); - this.packetManager.registerExecutor(19, new Executor19GroupInviteInfo()); - this.packetManager.registerExecutor(20, new Executor20GroupJoin(this.packetManager)); - this.packetManager.registerExecutor(21, new Executor21GroupLeave()); - this.packetManager.registerExecutor(22, new Executor22GroupBan()); - this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager)); - this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); - this.packetManager.registerExecutor(26, new Executor26SignalPeer(this.clientManager, this.forwardUnitService, this.callManager)); - this.packetManager.registerExecutor(27, new Executor27WebRTC(this.callManager)); - this.packetManager.registerExecutor(28, new Executor28IceServers(this.forwardUnitService)); + this.packetManager.registerExecutor(6, new ExecutorMessage(this.clientManager, this.packetManager)); + this.packetManager.registerExecutor(7, new ExecutorRead(this.clientManager, this.packetManager)); + this.packetManager.registerExecutor(10, new ExecutorRequestUpdate()); + this.packetManager.registerExecutor(11, new ExecutorTypeing(this.clientManager, this.packetManager)); + this.packetManager.registerExecutor(15, new ExecutorRequestTransport()); + this.packetManager.registerExecutor(16, new ExecutorPushNotification()); + this.packetManager.registerExecutor(17, new ExecutorGroupCreate()); + this.packetManager.registerExecutor(18, new ExecutorGroupInfo()); + this.packetManager.registerExecutor(19, new ExecutorGroupInviteInfo()); + this.packetManager.registerExecutor(20, new ExecutorGroupJoin(this.packetManager)); + this.packetManager.registerExecutor(21, new ExecutorGroupLeave()); + this.packetManager.registerExecutor(22, new ExecutorGroupBan()); + this.packetManager.registerExecutor(24, new ExecutorDeviceResolve(this.clientManager, this.eventManager, this.packetManager)); + this.packetManager.registerExecutor(25, new ExecutorSync(this.packetManager)); + this.packetManager.registerExecutor(26, new ExecutorSignalPeer(this.clientManager, this.forwardUnitService, this.callManager)); + this.packetManager.registerExecutor(27, new ExecutorWebRTC(this.callManager)); + this.packetManager.registerExecutor(28, new ExecutorIceServers(this.forwardUnitService)); } private void printBootMessage() { diff --git a/src/main/java/im/rosetta/calls/CallManager.java b/src/main/java/im/rosetta/calls/CallManager.java index c99a8e0..d5706a9 100644 --- a/src/main/java/im/rosetta/calls/CallManager.java +++ b/src/main/java/im/rosetta/calls/CallManager.java @@ -7,8 +7,8 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import im.rosetta.client.ClientManager; -import im.rosetta.packet.Packet26SignalPeer; -import im.rosetta.packet.runtime.NetworkSignalType; +import im.rosetta.network.enums.SignalType; +import im.rosetta.network.packet.PacketSignalPeer; import io.g365sfu.Room; import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; @@ -40,10 +40,10 @@ public class CallManager { /** * Отправляем всем в сессии что звонок завершился, так как он устарел, и удаляем сессию из списка активных сессий */ - Packet26SignalPeer rtout = new Packet26SignalPeer(); - rtout.setSignalType(NetworkSignalType.RINGING_TIMEOUT); - Packet26SignalPeer endCallPacket = new Packet26SignalPeer(); - endCallPacket.setSignalType(NetworkSignalType.END_CALL); + PacketSignalPeer rtout = new PacketSignalPeer(); + rtout.setSignalType(SignalType.RINGING_TIMEOUT); + PacketSignalPeer endCallPacket = new PacketSignalPeer(); + endCallPacket.setSignalType(SignalType.END_CALL); endCallPacket.setJoinToken(session.getJoinToken()); endCallPacket.setCallId(session.getCallId()); diff --git a/src/main/java/im/rosetta/database/entity/Buffer.java b/src/main/java/im/rosetta/database/entity/Buffer.java index 8b824c1..1f62395 100644 --- a/src/main/java/im/rosetta/database/entity/Buffer.java +++ b/src/main/java/im/rosetta/database/entity/Buffer.java @@ -32,6 +32,9 @@ public class Buffer extends CreateUpdateEntity { @Column(name = "packetId") private int packetId; + @Column(name = "version") + private int version; + @Column(name = "packet", columnDefinition = "bytea") private byte[] packet; @@ -79,6 +82,14 @@ public class Buffer extends CreateUpdateEntity { return packetId; } + public int getVersion() { + return version; + } + + public void setVersion(int version) { + this.version = version; + } + public void setPacketId(int packetId) { this.packetId = packetId; } diff --git a/src/main/java/im/rosetta/network/RccGeneratedPacketRegistry.java b/src/main/java/im/rosetta/network/RccGeneratedPacketRegistry.java index ffb40d1..7d5b8e8 100644 --- a/src/main/java/im/rosetta/network/RccGeneratedPacketRegistry.java +++ b/src/main/java/im/rosetta/network/RccGeneratedPacketRegistry.java @@ -57,6 +57,9 @@ public final class RccGeneratedPacketRegistry { public static final int PACKET_WEB_RTC_ID = 27; public static final int PACKET_ICE_SERVERS_ID = 28; + public static final int MAX_VERSION = 1; + public static final int MIN_VERSION = 1; + private RccGeneratedPacketRegistry() {} public static void registerAll(PacketManager manager) { diff --git a/src/main/java/im/rosetta/service/dispatch/DeviceDispatcher.java b/src/main/java/im/rosetta/service/dispatch/DeviceDispatcher.java index f03c079..2b58106 100644 --- a/src/main/java/im/rosetta/service/dispatch/DeviceDispatcher.java +++ b/src/main/java/im/rosetta/service/dispatch/DeviceDispatcher.java @@ -9,10 +9,10 @@ import im.rosetta.client.ClientManager; import im.rosetta.client.tags.ECIDevice; import im.rosetta.database.entity.Device; import im.rosetta.database.repository.DeviceRepository; -import im.rosetta.packet.Packet23DeviceList; -import im.rosetta.packet.runtime.DeviceSolution; -import im.rosetta.packet.runtime.NetworkDevice; -import im.rosetta.packet.runtime.NetworkStatus; +import im.rosetta.network.enums.DeviceSolution; +import im.rosetta.network.enums.NetworkStatus; +import im.rosetta.network.packet.PacketDeviceList; +import im.rosetta.network.types.NetworkDevice; import im.rosetta.service.services.DeviceService; import io.orprotocol.ProtocolException; @@ -77,7 +77,7 @@ public class DeviceDispatcher { } List networkDevices = new ArrayList<>(byId.values()); - Packet23DeviceList packet = new Packet23DeviceList(); + PacketDeviceList packet = new PacketDeviceList(); packet.setDevices(networkDevices); this.clientManager.sendPacketToAuthorizedPK(publicKey, packet); } diff --git a/src/main/java/im/rosetta/service/dispatch/push/PushNotifyDispatcher.java b/src/main/java/im/rosetta/service/dispatch/push/PushNotifyDispatcher.java index 805137a..7aa6e59 100644 --- a/src/main/java/im/rosetta/service/dispatch/push/PushNotifyDispatcher.java +++ b/src/main/java/im/rosetta/service/dispatch/push/PushNotifyDispatcher.java @@ -7,7 +7,7 @@ import java.util.concurrent.Executors; import im.rosetta.database.entity.PushToken; import im.rosetta.database.repository.PushTokenRepository; -import im.rosetta.packet.runtime.TokenType; +import im.rosetta.network.enums.TokenType; import im.rosetta.service.dispatch.push.dispatchers.FCM; import im.rosetta.service.dispatch.push.dispatchers.VoIPApns; diff --git a/src/main/java/im/rosetta/service/services/BufferService.java b/src/main/java/im/rosetta/service/services/BufferService.java index 2d04fca..80fa417 100644 --- a/src/main/java/im/rosetta/service/services/BufferService.java +++ b/src/main/java/im/rosetta/service/services/BufferService.java @@ -10,11 +10,14 @@ import im.rosetta.database.entity.Buffer; import im.rosetta.database.repository.BufferRepository; import im.rosetta.database.repository.GroupRepository; import im.rosetta.exception.UnauthorizedExeception; -import im.rosetta.packet.Packet7Read; +import im.rosetta.network.RccGeneratedPacketRegistry; +import im.rosetta.network.packet.PacketRead; import im.rosetta.service.Service; import im.rosetta.service.services.runtime.PacketBuffer; import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; +import io.orprotocol.frame.FrameDecoder; +import io.orprotocol.frame.FrameEncoder; import io.orprotocol.packet.Packet; import io.orprotocol.packet.PacketManager; @@ -68,8 +71,13 @@ public class BufferService extends Service { try(QuerySession querySession = this.getRepository().buildQuery(hql, parameters)){ List buffers = querySession.getQuery().setMaxResults(take).list(); for(Buffer buffer : buffers) { + /** + * Декодируем каждый пакет используя ту версию с которой он к нам пришел, + * на выходе получаем последнюю версию пакета + */ byte[] packetBytes = buffer.getPacket(); - Packet packet = this.packetManager.createPacket(packetBytes); + FrameDecoder decoder = new FrameDecoder(packetBytes, buffer.getVersion(), this.packetManager); + Packet packet = decoder.decode(); packets.add(packet); } if(!buffers.isEmpty()){ @@ -84,11 +92,12 @@ public class BufferService extends Service { * @param from публичный ключ отправителя пакета * @param to публичный ключ получателя пакета * @param packet пакет для добавления в буфер + * @throws ProtocolException */ - public void pushPacketToBuffer(String from, String to, Packet packet) { + public void pushPacketToBuffer(String from, String to, Packet packet) throws ProtocolException { int packetId = this.packetManager.getPacketIdByClass(packet.getClass()); packet.packetId = packetId; - if(packet instanceof Packet7Read){ + if(packet instanceof PacketRead){ /** * Если это пакет чтения, то получаем последний пакет из буфера, если это уже чтение - то чтение больше не нужно * записывать, так как эффект от нескольких пакетов чтения один и тот же, как и от одного, однако @@ -114,9 +123,22 @@ public class BufferService extends Service { } } } - byte[] packetBytes = packet.write().getBuffer(); + /** + * Так как пакет у нас последней версии на сервере, то получаем максимальную версию протокола + * (Packet закодирован всегда в нее, потому что сервер всегда работает с последними версиями) + * и пишем в буфер байты максимальной версии пакета и саму + * версию которой принадлежат байты + */ + FrameEncoder encoder = new FrameEncoder(packet, RccGeneratedPacketRegistry.MAX_VERSION, packetManager); + byte[] packetBytes = encoder.encode(); Buffer buffer = new Buffer(); buffer.setFrom(from); + /** + * Пишем текующую максимальную версию в буфер потому что максимальная + * версия может и смениться, а при чтении нам нужно точно знать + * с какой версии читать пакет + */ + buffer.setVersion(RccGeneratedPacketRegistry.MAX_VERSION); buffer.setTo(to); buffer.setTimestamp(System.currentTimeMillis()); buffer.setPacketId(packetId); diff --git a/src/main/java/im/rosetta/service/services/ForwardUnitService.java b/src/main/java/im/rosetta/service/services/ForwardUnitService.java index 72a64d0..e2f7a4f 100644 --- a/src/main/java/im/rosetta/service/services/ForwardUnitService.java +++ b/src/main/java/im/rosetta/service/services/ForwardUnitService.java @@ -12,10 +12,10 @@ import im.rosetta.calls.CallSession; import im.rosetta.client.ClientManager; import im.rosetta.logger.Logger; import im.rosetta.logger.enums.Color; -import im.rosetta.packet.Packet26SignalPeer; -import im.rosetta.packet.Packet27WebRTC; -import im.rosetta.packet.runtime.NetworkSignalType; -import im.rosetta.packet.runtime.NetworkWebRTCType; +import im.rosetta.network.enums.SignalType; +import im.rosetta.network.enums.WebRTCType; +import im.rosetta.network.packet.PacketSignalPeer; +import im.rosetta.network.packet.PacketWebRTC; import io.g365sfu.Room; import io.g365sfu.SFU; import io.g365sfu.net.DisconnectReason; @@ -147,8 +147,8 @@ public class ForwardUnitService { * Произошло нештатное отключение клиента от сервера SFU, например, из-за сбоя сети */ if(callSession.shouldRemove()){ - Packet26SignalPeer packet = new Packet26SignalPeer(); - packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED); + PacketSignalPeer packet = new PacketSignalPeer(); + packet.setSignalType(SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED); callSession.sendPacket(packet, null); this.callManager.removeSession(callSession); } @@ -160,8 +160,8 @@ public class ForwardUnitService { * (например если клиент отрабатывает выход из звонка по кнопке END не правильно) */ if(callSession.shouldRemove()){ - Packet26SignalPeer packet = new Packet26SignalPeer(); - packet.setSignalType(NetworkSignalType.END_CALL); + PacketSignalPeer packet = new PacketSignalPeer(); + packet.setSignalType(SignalType.END_CALL); callSession.sendPacket(packet, null); this.callManager.removeSession(callSession); } @@ -176,9 +176,9 @@ public class ForwardUnitService { */ public void onSdpAnswer(SDPAnswer sdpAnswer) throws ProtocolException { String participantId = sdpAnswer.getParticipantId(); - Packet27WebRTC packet = new Packet27WebRTC(); + PacketWebRTC packet = new PacketWebRTC(); packet.setSdpOrCandidate(sdpAnswer.getSdp()); - packet.setType(NetworkWebRTCType.ANSWER); + packet.setType(WebRTCType.ANSWER); this.clientManager.sendPacketToAuthorizedPK(participantId, packet); } @@ -191,9 +191,9 @@ public class ForwardUnitService { */ public void onIceCandidate(ICECandidate iceCandidate) throws ProtocolException { String publicKey = iceCandidate.getParticipantId(); - Packet27WebRTC packet = new Packet27WebRTC(); + PacketWebRTC packet = new PacketWebRTC(); packet.setSdpOrCandidate(iceCandidate.getCandidate()); - packet.setType(NetworkWebRTCType.ICE_CANDIDATE); + packet.setType(WebRTCType.ICE_CANDIDATE); this.clientManager.sendPacketToAuthorizedPK(publicKey, packet); } @@ -206,9 +206,9 @@ public class ForwardUnitService { */ public void onSdpOffer(SDPOffer sdpOffer) throws ProtocolException { String participantId = sdpOffer.getParticipantId(); - Packet27WebRTC packet = new Packet27WebRTC(); + PacketWebRTC packet = new PacketWebRTC(); packet.setSdpOrCandidate(sdpOffer.getSdp()); - packet.setType(NetworkWebRTCType.OFFER); + packet.setType(WebRTCType.OFFER); this.clientManager.sendPacketToAuthorizedPK(participantId, packet); } diff --git a/src/main/java/io/orprotocol/frame/FrameDecoder.java b/src/main/java/io/orprotocol/frame/FrameDecoder.java index 81c8903..f958051 100644 --- a/src/main/java/io/orprotocol/frame/FrameDecoder.java +++ b/src/main/java/io/orprotocol/frame/FrameDecoder.java @@ -14,26 +14,32 @@ import io.orprotocol.packet.PacketManager; public class FrameDecoder { private final byte[] frame; - private final Client sender; private final PacketManager packetManager; + private final int version; public FrameDecoder(byte[] frame, Client sender, PacketManager packetManager) { this.frame = frame; - this.sender = sender; + this.version = sender.getVersion(); this.packetManager = packetManager; } + public FrameDecoder(byte[] frame, int version, PacketManager packetManager) { + this.frame = frame; + this.version = version; + this.packetManager = packetManager; + } + + public Packet decode() throws ProtocolException { - int senderVersion = this.sender.getVersion(); Buffer buf = Buffer.wrap(this.frame); int packetId = buf.readUInt16(); Codec codec = this.packetManager.getCodec(packetId); - Packet packet = codec.decode(buf, senderVersion); + Packet packet = codec.decode(buf, this.version); packet.packetId = packetId; /** * Буфер уже смещен на 2 байта, так что декодирование в кодеке начинается с данных пакета, а не с ID */ - return codec.decode(buf, senderVersion); + return packet; } diff --git a/src/main/java/io/orprotocol/frame/FrameEncoder.java b/src/main/java/io/orprotocol/frame/FrameEncoder.java index 7a9a5e3..3a171f4 100644 --- a/src/main/java/io/orprotocol/frame/FrameEncoder.java +++ b/src/main/java/io/orprotocol/frame/FrameEncoder.java @@ -13,20 +13,25 @@ import io.orprotocol.packet.PacketManager; public class FrameEncoder { private final Packet packet; - private final Client recipient; + private final int version; private final PacketManager packetManager; public FrameEncoder(Packet packet, Client recipient, PacketManager packetManager) { this.packet = packet; - this.recipient = recipient; + this.version = recipient.getVersion(); + this.packetManager = packetManager; + } + + public FrameEncoder(Packet packet, int version, PacketManager packetManager) { + this.packet = packet; + this.version = version; this.packetManager = packetManager; } public byte[] encode() throws ProtocolException { - int recipientVersion = this.recipient.getVersion(); @SuppressWarnings("unchecked") Codec codec = (Codec) this.packetManager.getCodec(this.packet.packetId); - return codec.encode(this.packet, recipientVersion); + return codec.encode(this.packet, this.version); } }