Завершение перехода на новую версию протокола

This commit is contained in:
2026-04-19 12:59:16 +02:00
parent 00ca218c06
commit 07b4e9daa2
10 changed files with 121 additions and 74 deletions

View File

@@ -4,27 +4,27 @@ import im.rosetta.calls.CallManager;
import im.rosetta.client.ClientManager;
import im.rosetta.client.OnlineManager;
import im.rosetta.event.EventManager;
import im.rosetta.executors.ExecutorDeviceResolve;
import im.rosetta.executors.ExecutorGroupBan;
import im.rosetta.executors.ExecutorGroupCreate;
import im.rosetta.executors.ExecutorGroupInfo;
import im.rosetta.executors.ExecutorGroupInviteInfo;
import im.rosetta.executors.ExecutorGroupJoin;
import im.rosetta.executors.ExecutorGroupLeave;
import im.rosetta.executors.ExecutorHandshake;
import im.rosetta.executors.Executor10RequestUpdate;
import im.rosetta.executors.Executor11Typeing;
import im.rosetta.executors.Executor15RequestTransport;
import im.rosetta.executors.Executor16PushNotification;
import im.rosetta.executors.Executor17GroupCreate;
import im.rosetta.executors.Executor18GroupInfo;
import im.rosetta.executors.Executor19GroupInviteInfo;
import im.rosetta.executors.ExecutorIceServers;
import im.rosetta.executors.ExecutorMessage;
import im.rosetta.executors.ExecutorUserInfo;
import im.rosetta.executors.Executor20GroupJoin;
import im.rosetta.executors.Executor21GroupLeave;
import im.rosetta.executors.Executor22GroupBan;
import im.rosetta.executors.Executor24DeviceResolve;
import im.rosetta.executors.Executor25Sync;
import im.rosetta.executors.Executor26SignalPeer;
import im.rosetta.executors.Executor27WebRTC;
import im.rosetta.executors.Executor28IceServers;
import im.rosetta.executors.ExecutorWebRTC;
import im.rosetta.executors.ExecutorSearch;
import im.rosetta.executors.ExecutorSignalPeer;
import im.rosetta.executors.ExecutorSync;
import im.rosetta.executors.ExecutorTypeing;
import im.rosetta.executors.ExecutorOnlineState;
import im.rosetta.executors.Executor6Message;
import im.rosetta.executors.Executor7Read;
import im.rosetta.executors.ExecutorPushNotification;
import im.rosetta.executors.ExecutorRead;
import im.rosetta.executors.ExecutorRequestTransport;
import im.rosetta.executors.ExecutorRequestUpdate;
import im.rosetta.listeners.DeviceListListener;
import im.rosetta.listeners.HandshakeCompleteListener;
import im.rosetta.listeners.OnlineStatusDisconnectListener;
@@ -160,23 +160,23 @@ public class Boot {
this.packetManager.registerExecutor(1, new ExecutorUserInfo());
this.packetManager.registerExecutor(3, new ExecutorSearch(this.clientManager));
this.packetManager.registerExecutor(4, new ExecutorOnlineState(this.onlineManager, this.clientManager));
this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(7, new Executor7Read(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(10, new Executor10RequestUpdate());
this.packetManager.registerExecutor(11, new Executor11Typeing(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(15, new Executor15RequestTransport());
this.packetManager.registerExecutor(16, new Executor16PushNotification());
this.packetManager.registerExecutor(17, new Executor17GroupCreate());
this.packetManager.registerExecutor(18, new Executor18GroupInfo());
this.packetManager.registerExecutor(19, new Executor19GroupInviteInfo());
this.packetManager.registerExecutor(20, new Executor20GroupJoin(this.packetManager));
this.packetManager.registerExecutor(21, new Executor21GroupLeave());
this.packetManager.registerExecutor(22, new Executor22GroupBan());
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager, this.packetManager));
this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager));
this.packetManager.registerExecutor(26, new Executor26SignalPeer(this.clientManager, this.forwardUnitService, this.callManager));
this.packetManager.registerExecutor(27, new Executor27WebRTC(this.callManager));
this.packetManager.registerExecutor(28, new Executor28IceServers(this.forwardUnitService));
this.packetManager.registerExecutor(6, new ExecutorMessage(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(7, new ExecutorRead(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(10, new ExecutorRequestUpdate());
this.packetManager.registerExecutor(11, new ExecutorTypeing(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(15, new ExecutorRequestTransport());
this.packetManager.registerExecutor(16, new ExecutorPushNotification());
this.packetManager.registerExecutor(17, new ExecutorGroupCreate());
this.packetManager.registerExecutor(18, new ExecutorGroupInfo());
this.packetManager.registerExecutor(19, new ExecutorGroupInviteInfo());
this.packetManager.registerExecutor(20, new ExecutorGroupJoin(this.packetManager));
this.packetManager.registerExecutor(21, new ExecutorGroupLeave());
this.packetManager.registerExecutor(22, new ExecutorGroupBan());
this.packetManager.registerExecutor(24, new ExecutorDeviceResolve(this.clientManager, this.eventManager, this.packetManager));
this.packetManager.registerExecutor(25, new ExecutorSync(this.packetManager));
this.packetManager.registerExecutor(26, new ExecutorSignalPeer(this.clientManager, this.forwardUnitService, this.callManager));
this.packetManager.registerExecutor(27, new ExecutorWebRTC(this.callManager));
this.packetManager.registerExecutor(28, new ExecutorIceServers(this.forwardUnitService));
}
private void printBootMessage() {

View File

@@ -7,8 +7,8 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import im.rosetta.client.ClientManager;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.runtime.NetworkSignalType;
import im.rosetta.network.enums.SignalType;
import im.rosetta.network.packet.PacketSignalPeer;
import io.g365sfu.Room;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
@@ -40,10 +40,10 @@ public class CallManager {
/**
* Отправляем всем в сессии что звонок завершился, так как он устарел, и удаляем сессию из списка активных сессий
*/
Packet26SignalPeer rtout = new Packet26SignalPeer();
rtout.setSignalType(NetworkSignalType.RINGING_TIMEOUT);
Packet26SignalPeer endCallPacket = new Packet26SignalPeer();
endCallPacket.setSignalType(NetworkSignalType.END_CALL);
PacketSignalPeer rtout = new PacketSignalPeer();
rtout.setSignalType(SignalType.RINGING_TIMEOUT);
PacketSignalPeer endCallPacket = new PacketSignalPeer();
endCallPacket.setSignalType(SignalType.END_CALL);
endCallPacket.setJoinToken(session.getJoinToken());
endCallPacket.setCallId(session.getCallId());

View File

@@ -32,6 +32,9 @@ public class Buffer extends CreateUpdateEntity {
@Column(name = "packetId")
private int packetId;
@Column(name = "version")
private int version;
@Column(name = "packet", columnDefinition = "bytea")
private byte[] packet;
@@ -79,6 +82,14 @@ public class Buffer extends CreateUpdateEntity {
return packetId;
}
public int getVersion() {
return version;
}
public void setVersion(int version) {
this.version = version;
}
public void setPacketId(int packetId) {
this.packetId = packetId;
}

View File

@@ -57,6 +57,9 @@ public final class RccGeneratedPacketRegistry {
public static final int PACKET_WEB_RTC_ID = 27;
public static final int PACKET_ICE_SERVERS_ID = 28;
public static final int MAX_VERSION = 1;
public static final int MIN_VERSION = 1;
private RccGeneratedPacketRegistry() {}
public static void registerAll(PacketManager manager) {

View File

@@ -9,10 +9,10 @@ import im.rosetta.client.ClientManager;
import im.rosetta.client.tags.ECIDevice;
import im.rosetta.database.entity.Device;
import im.rosetta.database.repository.DeviceRepository;
import im.rosetta.packet.Packet23DeviceList;
import im.rosetta.packet.runtime.DeviceSolution;
import im.rosetta.packet.runtime.NetworkDevice;
import im.rosetta.packet.runtime.NetworkStatus;
import im.rosetta.network.enums.DeviceSolution;
import im.rosetta.network.enums.NetworkStatus;
import im.rosetta.network.packet.PacketDeviceList;
import im.rosetta.network.types.NetworkDevice;
import im.rosetta.service.services.DeviceService;
import io.orprotocol.ProtocolException;
@@ -77,7 +77,7 @@ public class DeviceDispatcher {
}
List<NetworkDevice> networkDevices = new ArrayList<>(byId.values());
Packet23DeviceList packet = new Packet23DeviceList();
PacketDeviceList packet = new PacketDeviceList();
packet.setDevices(networkDevices);
this.clientManager.sendPacketToAuthorizedPK(publicKey, packet);
}

View File

@@ -7,7 +7,7 @@ import java.util.concurrent.Executors;
import im.rosetta.database.entity.PushToken;
import im.rosetta.database.repository.PushTokenRepository;
import im.rosetta.packet.runtime.TokenType;
import im.rosetta.network.enums.TokenType;
import im.rosetta.service.dispatch.push.dispatchers.FCM;
import im.rosetta.service.dispatch.push.dispatchers.VoIPApns;

View File

@@ -10,11 +10,14 @@ import im.rosetta.database.entity.Buffer;
import im.rosetta.database.repository.BufferRepository;
import im.rosetta.database.repository.GroupRepository;
import im.rosetta.exception.UnauthorizedExeception;
import im.rosetta.packet.Packet7Read;
import im.rosetta.network.RccGeneratedPacketRegistry;
import im.rosetta.network.packet.PacketRead;
import im.rosetta.service.Service;
import im.rosetta.service.services.runtime.PacketBuffer;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.frame.FrameDecoder;
import io.orprotocol.frame.FrameEncoder;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketManager;
@@ -68,8 +71,13 @@ public class BufferService extends Service<BufferRepository> {
try(QuerySession<Buffer> querySession = this.getRepository().buildQuery(hql, parameters)){
List<Buffer> buffers = querySession.getQuery().setMaxResults(take).list();
for(Buffer buffer : buffers) {
/**
* Декодируем каждый пакет используя ту версию с которой он к нам пришел,
* на выходе получаем последнюю версию пакета
*/
byte[] packetBytes = buffer.getPacket();
Packet packet = this.packetManager.createPacket(packetBytes);
FrameDecoder decoder = new FrameDecoder(packetBytes, buffer.getVersion(), this.packetManager);
Packet packet = decoder.decode();
packets.add(packet);
}
if(!buffers.isEmpty()){
@@ -84,11 +92,12 @@ public class BufferService extends Service<BufferRepository> {
* @param from публичный ключ отправителя пакета
* @param to публичный ключ получателя пакета
* @param packet пакет для добавления в буфер
* @throws ProtocolException
*/
public void pushPacketToBuffer(String from, String to, Packet packet) {
public void pushPacketToBuffer(String from, String to, Packet packet) throws ProtocolException {
int packetId = this.packetManager.getPacketIdByClass(packet.getClass());
packet.packetId = packetId;
if(packet instanceof Packet7Read){
if(packet instanceof PacketRead){
/**
* Если это пакет чтения, то получаем последний пакет из буфера, если это уже чтение - то чтение больше не нужно
* записывать, так как эффект от нескольких пакетов чтения один и тот же, как и от одного, однако
@@ -114,9 +123,22 @@ public class BufferService extends Service<BufferRepository> {
}
}
}
byte[] packetBytes = packet.write().getBuffer();
/**
* Так как пакет у нас последней версии на сервере, то получаем максимальную версию протокола
* (Packet закодирован всегда в нее, потому что сервер всегда работает с последними версиями)
* и пишем в буфер байты максимальной версии пакета и саму
* версию которой принадлежат байты
*/
FrameEncoder encoder = new FrameEncoder(packet, RccGeneratedPacketRegistry.MAX_VERSION, packetManager);
byte[] packetBytes = encoder.encode();
Buffer buffer = new Buffer();
buffer.setFrom(from);
/**
* Пишем текующую максимальную версию в буфер потому что максимальная
* версия может и смениться, а при чтении нам нужно точно знать
* с какой версии читать пакет
*/
buffer.setVersion(RccGeneratedPacketRegistry.MAX_VERSION);
buffer.setTo(to);
buffer.setTimestamp(System.currentTimeMillis());
buffer.setPacketId(packetId);

View File

@@ -12,10 +12,10 @@ import im.rosetta.calls.CallSession;
import im.rosetta.client.ClientManager;
import im.rosetta.logger.Logger;
import im.rosetta.logger.enums.Color;
import im.rosetta.packet.Packet26SignalPeer;
import im.rosetta.packet.Packet27WebRTC;
import im.rosetta.packet.runtime.NetworkSignalType;
import im.rosetta.packet.runtime.NetworkWebRTCType;
import im.rosetta.network.enums.SignalType;
import im.rosetta.network.enums.WebRTCType;
import im.rosetta.network.packet.PacketSignalPeer;
import im.rosetta.network.packet.PacketWebRTC;
import io.g365sfu.Room;
import io.g365sfu.SFU;
import io.g365sfu.net.DisconnectReason;
@@ -147,8 +147,8 @@ public class ForwardUnitService {
* Произошло нештатное отключение клиента от сервера SFU, например, из-за сбоя сети
*/
if(callSession.shouldRemove()){
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL_BECAUSE_PEER_DISCONNECTED);
PacketSignalPeer packet = new PacketSignalPeer();
packet.setSignalType(SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED);
callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
}
@@ -160,8 +160,8 @@ public class ForwardUnitService {
* (например если клиент отрабатывает выход из звонка по кнопке END не правильно)
*/
if(callSession.shouldRemove()){
Packet26SignalPeer packet = new Packet26SignalPeer();
packet.setSignalType(NetworkSignalType.END_CALL);
PacketSignalPeer packet = new PacketSignalPeer();
packet.setSignalType(SignalType.END_CALL);
callSession.sendPacket(packet, null);
this.callManager.removeSession(callSession);
}
@@ -176,9 +176,9 @@ public class ForwardUnitService {
*/
public void onSdpAnswer(SDPAnswer sdpAnswer) throws ProtocolException {
String participantId = sdpAnswer.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
PacketWebRTC packet = new PacketWebRTC();
packet.setSdpOrCandidate(sdpAnswer.getSdp());
packet.setType(NetworkWebRTCType.ANSWER);
packet.setType(WebRTCType.ANSWER);
this.clientManager.sendPacketToAuthorizedPK(participantId, packet);
}
@@ -191,9 +191,9 @@ public class ForwardUnitService {
*/
public void onIceCandidate(ICECandidate iceCandidate) throws ProtocolException {
String publicKey = iceCandidate.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
PacketWebRTC packet = new PacketWebRTC();
packet.setSdpOrCandidate(iceCandidate.getCandidate());
packet.setType(NetworkWebRTCType.ICE_CANDIDATE);
packet.setType(WebRTCType.ICE_CANDIDATE);
this.clientManager.sendPacketToAuthorizedPK(publicKey, packet);
}
@@ -206,9 +206,9 @@ public class ForwardUnitService {
*/
public void onSdpOffer(SDPOffer sdpOffer) throws ProtocolException {
String participantId = sdpOffer.getParticipantId();
Packet27WebRTC packet = new Packet27WebRTC();
PacketWebRTC packet = new PacketWebRTC();
packet.setSdpOrCandidate(sdpOffer.getSdp());
packet.setType(NetworkWebRTCType.OFFER);
packet.setType(WebRTCType.OFFER);
this.clientManager.sendPacketToAuthorizedPK(participantId, packet);
}

View File

@@ -14,26 +14,32 @@ import io.orprotocol.packet.PacketManager;
public class FrameDecoder {
private final byte[] frame;
private final Client sender;
private final PacketManager packetManager;
private final int version;
public FrameDecoder(byte[] frame, Client sender, PacketManager packetManager) {
this.frame = frame;
this.sender = sender;
this.version = sender.getVersion();
this.packetManager = packetManager;
}
public FrameDecoder(byte[] frame, int version, PacketManager packetManager) {
this.frame = frame;
this.version = version;
this.packetManager = packetManager;
}
public Packet decode() throws ProtocolException {
int senderVersion = this.sender.getVersion();
Buffer buf = Buffer.wrap(this.frame);
int packetId = buf.readUInt16();
Codec<? extends Packet> codec = this.packetManager.getCodec(packetId);
Packet packet = codec.decode(buf, senderVersion);
Packet packet = codec.decode(buf, this.version);
packet.packetId = packetId;
/**
* Буфер уже смещен на 2 байта, так что декодирование в кодеке начинается с данных пакета, а не с ID
*/
return codec.decode(buf, senderVersion);
return packet;
}

View File

@@ -13,20 +13,25 @@ import io.orprotocol.packet.PacketManager;
public class FrameEncoder {
private final Packet packet;
private final Client recipient;
private final int version;
private final PacketManager packetManager;
public FrameEncoder(Packet packet, Client recipient, PacketManager packetManager) {
this.packet = packet;
this.recipient = recipient;
this.version = recipient.getVersion();
this.packetManager = packetManager;
}
public FrameEncoder(Packet packet, int version, PacketManager packetManager) {
this.packet = packet;
this.version = version;
this.packetManager = packetManager;
}
public byte[] encode() throws ProtocolException {
int recipientVersion = this.recipient.getVersion();
@SuppressWarnings("unchecked")
Codec<Packet> codec = (Codec<Packet>) this.packetManager.getCodec(this.packet.packetId);
return codec.encode(this.packet, recipientVersion);
return codec.encode(this.packet, this.version);
}
}