From 7dc94678bab32b00a799ef88b51dc36c94a26b19 Mon Sep 17 00:00:00 2001 From: RoyceDa Date: Sun, 15 Feb 2026 18:14:42 +0200 Subject: [PATCH] =?UTF-8?q?=D0=9F=D0=BE=D0=BB=D0=BD=D0=B0=D1=8F=20=D1=80?= =?UTF-8?q?=D0=B5=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0=D1=86=D0=B8=D1=8F=20=D1=81?= =?UTF-8?q?=D0=B8=D0=BD=D1=85=D1=80=D0=BE=D0=BD=D0=B8=D0=B7=D0=B0=D1=86?= =?UTF-8?q?=D0=B8=D0=B8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/main/java/im/rosetta/Boot.java | 4 ++ .../java/im/rosetta/client/ClientManager.java | 4 ++ .../im/rosetta/database/entity/Device.java | 12 ++-- .../database/repository/DeviceRepository.java | 13 ++-- .../rosetta/executors/Executor0Handshake.java | 27 ++++---- .../executors/Executor24DeviceResolve.java | 2 +- .../im/rosetta/executors/Executor25Sync.java | 66 +++++++++++++++++++ .../rosetta/listeners/DeviceListListener.java | 16 ++++- .../rosetta/listeners/ServerStopListener.java | 2 +- .../java/im/rosetta/packet/Packet25Sync.java | 66 +++++++++++++++++++ .../packet/runtime/NetworkSyncStatus.java | 35 ++++++++++ .../service/dispatch/MessageDispatcher.java | 35 ++++++++++ .../service/services/BufferService.java | 15 +++-- .../service/services/DeviceService.java | 37 +++++++++++ .../services/runtime/PacketBuffer.java | 31 +++++++++ 15 files changed, 333 insertions(+), 32 deletions(-) create mode 100644 src/main/java/im/rosetta/executors/Executor25Sync.java create mode 100644 src/main/java/im/rosetta/packet/Packet25Sync.java create mode 100644 src/main/java/im/rosetta/packet/runtime/NetworkSyncStatus.java create mode 100644 src/main/java/im/rosetta/service/services/runtime/PacketBuffer.java diff --git a/src/main/java/im/rosetta/Boot.java b/src/main/java/im/rosetta/Boot.java index 9235654..2e11c8f 100644 --- a/src/main/java/im/rosetta/Boot.java +++ b/src/main/java/im/rosetta/Boot.java @@ -16,6 +16,7 @@ import im.rosetta.executors.Executor20GroupJoin; import im.rosetta.executors.Executor21GroupLeave; import im.rosetta.executors.Executor22GroupBan; import im.rosetta.executors.Executor24DeviceResolve; +import im.rosetta.executors.Executor25Sync; import im.rosetta.executors.Executor3Search; import im.rosetta.executors.Executor4OnlineState; import im.rosetta.executors.Executor6Message; @@ -42,6 +43,7 @@ import im.rosetta.packet.Packet21GroupLeave; import im.rosetta.packet.Packet22GroupBan; import im.rosetta.packet.Packet23DeviceList; import im.rosetta.packet.Packet24DeviceResolve; +import im.rosetta.packet.Packet25Sync; import im.rosetta.packet.Packet2Result; import im.rosetta.packet.Packet3Search; import im.rosetta.packet.Packet4OnlineSubscribe; @@ -181,6 +183,7 @@ public class Boot { this.packetManager.registerPacket(22, Packet22GroupBan.class); this.packetManager.registerPacket(23, Packet23DeviceList.class); this.packetManager.registerPacket(24, Packet24DeviceResolve.class); + this.packetManager.registerPacket(25, Packet25Sync.class); } private void registerAllExecutors() { @@ -201,6 +204,7 @@ public class Boot { this.packetManager.registerExecutor(21, new Executor21GroupLeave()); this.packetManager.registerExecutor(22, new Executor22GroupBan()); this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager)); + this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager)); } private void printBootMessage() { diff --git a/src/main/java/im/rosetta/client/ClientManager.java b/src/main/java/im/rosetta/client/ClientManager.java index 95198c8..b4c8112 100644 --- a/src/main/java/im/rosetta/client/ClientManager.java +++ b/src/main/java/im/rosetta/client/ClientManager.java @@ -28,6 +28,10 @@ public class ClientManager { return this.server; } + public ClientIndexer getClientIndexer() { + return this.clientIndexer; + } + public boolean isClientConnected(String publicKey) { HashSet clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey); if(clients == null){ diff --git a/src/main/java/im/rosetta/database/entity/Device.java b/src/main/java/im/rosetta/database/entity/Device.java index 52700a6..b3aa270 100644 --- a/src/main/java/im/rosetta/database/entity/Device.java +++ b/src/main/java/im/rosetta/database/entity/Device.java @@ -32,8 +32,8 @@ public class Device extends CreateUpdateEntity { /** * Время завершения сессии устройства */ - @Column(name = "leaveTime", nullable = true, columnDefinition = "bigint default 0") - private Long leaveTime; + @Column(name = "syncTime", nullable = true, columnDefinition = "bigint default 0") + private Long syncTime; public Long getId() { return id; @@ -55,12 +55,12 @@ public class Device extends CreateUpdateEntity { return deviceOs; } - public Long getLeaveTime() { - return leaveTime; + public Long getSyncTime() { + return syncTime; } - public void setLeaveTime(Long leaveTime) { - this.leaveTime = leaveTime; + public void setSyncTime(Long syncTime) { + this.syncTime = syncTime; } public void setPublicKey(String publicKey) { diff --git a/src/main/java/im/rosetta/database/repository/DeviceRepository.java b/src/main/java/im/rosetta/database/repository/DeviceRepository.java index 128b869..5fc5f22 100644 --- a/src/main/java/im/rosetta/database/repository/DeviceRepository.java +++ b/src/main/java/im/rosetta/database/repository/DeviceRepository.java @@ -1,5 +1,6 @@ package im.rosetta.database.repository; +import java.util.HashMap; import java.util.List; import im.rosetta.database.Repository; @@ -31,15 +32,19 @@ public class DeviceRepository extends Repository { } /** - * Обновляет время последней активности устройства + * Обновляет время последней активности устройства в аккаунте * @param deviceId ID устройства + * @param publicKey публичный ключ аккаунта в котором нужно обновить последнюю активность */ - public void updateDeviceLeaveTime(String deviceId) { - Device device = this.findByField("deviceId", deviceId); + public void updateDeviceLeaveTime(String deviceId, String publicKey) { + Device device = this.findByField(new HashMap(){{ + put("deviceId", deviceId); + put("publicKey", publicKey); + }}); if(device == null) { return; } - device.setLeaveTime(System.currentTimeMillis()); + device.setSyncTime(System.currentTimeMillis()); this.update(device); } diff --git a/src/main/java/im/rosetta/executors/Executor0Handshake.java b/src/main/java/im/rosetta/executors/Executor0Handshake.java index da59bcd..9c3e27d 100644 --- a/src/main/java/im/rosetta/executors/Executor0Handshake.java +++ b/src/main/java/im/rosetta/executors/Executor0Handshake.java @@ -96,12 +96,25 @@ public class Executor0Handshake extends PacketExecutor { userRepository.save(user); + /** + * Это первое устройство пользователя, сохраняем его + * как верифицированное + */ + Device newDevice = new Device(); + newDevice.setDeviceId(deviceId); + newDevice.setDeviceName(deviceName); + newDevice.setDeviceOs(deviceOs); + newDevice.setPublicKey(publicKey); + newDevice.setSyncTime(System.currentTimeMillis()); + deviceRepository.save(newDevice); + /** * Ставим метку аутентификации на клиента */ ECIAuthentificate eciTag = new ECIAuthentificate (publicKey, privateKey, HandshakeStage.COMPLETED); client.addTag(ECIAuthentificate.class, eciTag); + /** * Вызываем событие завершения хэндшейка */ @@ -186,20 +199,6 @@ public class Executor0Handshake extends PacketExecutor { this.bufferService.pushPacketToBuffer("server", publicKey, newDevicePacket); return; } - - if(userDevicesCount == 0) { - /** - * Это первое устройство пользователя, сохраняем его - * как верифицированное - */ - Device newDevice = new Device(); - newDevice.setDeviceId(deviceId); - newDevice.setDeviceName(deviceName); - newDevice.setDeviceOs(deviceOs); - newDevice.setPublicKey(publicKey); - newDevice.setLeaveTime(System.currentTimeMillis()); - deviceRepository.save(newDevice); - } /** * Ставим метку аутентификации на клиента diff --git a/src/main/java/im/rosetta/executors/Executor24DeviceResolve.java b/src/main/java/im/rosetta/executors/Executor24DeviceResolve.java index 85a1511..3554ba4 100644 --- a/src/main/java/im/rosetta/executors/Executor24DeviceResolve.java +++ b/src/main/java/im/rosetta/executors/Executor24DeviceResolve.java @@ -74,7 +74,7 @@ public class Executor24DeviceResolve extends PacketExecutor { + + private final BufferRepository bufferRepository = new BufferRepository(); + private final BufferService bufferService; + + public Executor25Sync(PacketManager packetManager) { + this.bufferService = new BufferService(bufferRepository, packetManager); + } + + @Override + public void onPacketReceived(Packet25Sync packet, Client client) throws Exception, ProtocolException { + /** + * Начиная с какого времени клиент желает получить синхронизацию + */ + long fromTimestampMs = packet.getTimestamp(); + + PacketBuffer packetBuffer = this.bufferService.getPacketsFromTime(client, fromTimestampMs, 50); + List packets = packetBuffer.getPackets(); + + if(packets.isEmpty()){ + /** + * Нет пакетов для синхронизации, сообщаем клиенту что он синхронизирован + */ + packet.setSyncStatus(NetworkSyncStatus.NOT_NEEDED); + client.send(packet); + return; + } + + /** + * Отправляем клиенту информацию о том, что синхронизация началась + */ + packet.setSyncStatus(NetworkSyncStatus.BATCH_START); + client.send(packet); + + /** + * Отправляем клиенту пакеты для синхронизации + */ + + for(Packet syncPacket : packets){ + client.send(syncPacket); + } + + /** + * Сообщаем клиенту, что синхронизация завершена + */ + packet.setSyncStatus(NetworkSyncStatus.BATCH_END); + packet.setTimestamp(packetBuffer.getLastPacketTimestamp()); + client.send(packet); + } + +} diff --git a/src/main/java/im/rosetta/listeners/DeviceListListener.java b/src/main/java/im/rosetta/listeners/DeviceListListener.java index 5a3243f..a589664 100644 --- a/src/main/java/im/rosetta/listeners/DeviceListListener.java +++ b/src/main/java/im/rosetta/listeners/DeviceListListener.java @@ -2,19 +2,21 @@ package im.rosetta.listeners; import im.rosetta.client.ClientManager; import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.client.tags.ECIDevice; +import im.rosetta.database.repository.DeviceRepository; import im.rosetta.event.EventHandler; import im.rosetta.event.Listener; import im.rosetta.event.events.DisconnectEvent; import im.rosetta.event.events.handshake.HandshakeCompletedEvent; import im.rosetta.event.events.handshake.HandshakeDeviceConfirmEvent; import im.rosetta.service.dispatch.DeviceDispatcher; - import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; public class DeviceListListener implements Listener { private final DeviceDispatcher deviceDispatcher; + private final DeviceRepository deviceRepository = new DeviceRepository(); public DeviceListListener(ClientManager clientManager) { this.deviceDispatcher = new DeviceDispatcher(clientManager); @@ -48,11 +50,23 @@ public class DeviceListListener implements Listener { public void onDisconnect(DisconnectEvent event) throws ProtocolException { Client client = event.getClient(); ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + ECIDevice eciDevice = client.getTag(ECIDevice.class); + if(eciDevice == null){ + /** + * Если у клиента нет тега устройства, пропускаем его + * такого быть не должно, но на всякий случай + */ + return; + } if(eciAuthentificate != null){ /** * Когда устройство отключается от аккаунта, отправляем всем клиентам с этим публичным ключом обновленный список устройств */ this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey()); + /** + * Обновляем время последнего онлайна устройства, которое отключилось, для корректной работы синхронизации сообщений + */ + this.deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId(), eciAuthentificate.getPublicKey()); } } diff --git a/src/main/java/im/rosetta/listeners/ServerStopListener.java b/src/main/java/im/rosetta/listeners/ServerStopListener.java index b911fba..84e1073 100644 --- a/src/main/java/im/rosetta/listeners/ServerStopListener.java +++ b/src/main/java/im/rosetta/listeners/ServerStopListener.java @@ -47,7 +47,7 @@ public class ServerStopListener implements Listener { */ continue; } - deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId()); + deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId(), eciAuth.getPublicKey()); } this.logger.info(Color.RED + "Время последней активности устройств клиентов обновлено."); } diff --git a/src/main/java/im/rosetta/packet/Packet25Sync.java b/src/main/java/im/rosetta/packet/Packet25Sync.java new file mode 100644 index 0000000..15437dd --- /dev/null +++ b/src/main/java/im/rosetta/packet/Packet25Sync.java @@ -0,0 +1,66 @@ +package im.rosetta.packet; + +import im.rosetta.packet.runtime.NetworkSyncStatus; +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет отправляет клиент устанавливая timestamp последнего синхронизированного пакета, + * а так же статус синхронизации, который может быть: + * NOT_NEEDED - синхронизация не требуется, так как устройство уже синхронизировано или не требует синхронизации + * BATCH_START - начало синхронизации, сервер начинает отправлять клиенту пакеты для синхронизации, клиент должен подготовиться к приему пакетов для синхронизации + * BATCH_END - конец синхронизации, сервер завершил отправку пакетов для синхронизации, клиент может завершить процесс синхронизации + */ +public class Packet25Sync extends Packet { + + private NetworkSyncStatus syncStatus; + private long timestamp; + + @Override + public void read(Stream stream) { + this.syncStatus = NetworkSyncStatus.fromValue(stream.readInt8()); + this.timestamp = stream.readInt64(); + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeInt8(this.syncStatus.getValue()); + stream.writeInt64(this.timestamp); + return stream; + } + + /** + * Получить статус синхронизации + * @return статус синхронизации + */ + public NetworkSyncStatus getSyncStatus() { + return syncStatus; + } + + /** + * Установить статус синхронизации + * @param syncStatus статус синхронизации + */ + public void setSyncStatus(NetworkSyncStatus syncStatus) { + this.syncStatus = syncStatus; + } + + /** + * Получить timestamp последнего синхронизированного пакета + * @return timestamp последнего синхронизированного пакета + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Установить timestamp последнего синхронизированного пакета + * @param timestamp timestamp последнего синхронизированного пакета + */ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + +} diff --git a/src/main/java/im/rosetta/packet/runtime/NetworkSyncStatus.java b/src/main/java/im/rosetta/packet/runtime/NetworkSyncStatus.java new file mode 100644 index 0000000..933b6ea --- /dev/null +++ b/src/main/java/im/rosetta/packet/runtime/NetworkSyncStatus.java @@ -0,0 +1,35 @@ +package im.rosetta.packet.runtime; + +public enum NetworkSyncStatus { + /** + * Синхронизация не требуется, так как устройство уже синхронизировано или не требует синхронизации + */ + NOT_NEEDED(0), + /** + * Начало синхронизации, сервер начинает отправлять клиенту пакеты для синхронизации, клиент должен подготовиться к приему пакетов для синхронизации + */ + BATCH_START(1), + /** + * Конец синхронизации, сервер завершил отправку пакетов для синхронизации, клиент может завершить процесс синхронизации + */ + BATCH_END(2); + + private final int value; + + NetworkSyncStatus(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + + public static NetworkSyncStatus fromValue(int value) { + for (NetworkSyncStatus status : NetworkSyncStatus.values()) { + if (status.getValue() == value) { + return status; + } + } + throw new IllegalArgumentException("Unknown NetworkSyncStatus value: " + value); + } +} \ No newline at end of file diff --git a/src/main/java/im/rosetta/service/dispatch/MessageDispatcher.java b/src/main/java/im/rosetta/service/dispatch/MessageDispatcher.java index 1dd5934..691e29c 100644 --- a/src/main/java/im/rosetta/service/dispatch/MessageDispatcher.java +++ b/src/main/java/im/rosetta/service/dispatch/MessageDispatcher.java @@ -1,5 +1,6 @@ package im.rosetta.service.dispatch; +import java.util.HashSet; import java.util.List; import im.rosetta.Failures; @@ -96,6 +97,11 @@ public class MessageDispatcher { * Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя */ this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet); + + /** + * Ретранслируем сообщение всем авторизованным сессиям отправителя, чтобы синхронизировать отправленные сообщения + */ + this.retranslate(packet, client); } /** @@ -111,5 +117,34 @@ public class MessageDispatcher { this.sendPeer(packet, client, true); } + /** + * Сообщает всем авторизованным сессиям отправителя о том, что он отправил сообщения, + * для того чтобы синхронизировать отправленные сообщения на всех устройствах отправителя + * @param packet пакет сообщения + * @param client клиент отправляющий пакет + * @throws ProtocolException + */ + public void retranslate(PacketBaseDialog packet, Client client) throws ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + HashSet clients = this.clientManager.getClientIndexer() + .getClients(ECIAuthentificate.class, "publicKey", eciAuthentificate.getPublicKey()); + if(clients == null){ + /** + * Нет авторизованных сессий с таким публичным ключом + */ + return; + } + for(Client c : clients){ + /** + * Проходим по всем устройствам с таким публичным ключом и ретранслируем им пакет, кроме того устройства что + * отправило пакет + */ + if(c.equals(client)){ + continue; + } + c.send(packet); + } + } + } diff --git a/src/main/java/im/rosetta/service/services/BufferService.java b/src/main/java/im/rosetta/service/services/BufferService.java index 6ff5a2a..f7fbaa3 100644 --- a/src/main/java/im/rosetta/service/services/BufferService.java +++ b/src/main/java/im/rosetta/service/services/BufferService.java @@ -10,7 +10,7 @@ import im.rosetta.database.entity.Buffer; import im.rosetta.database.repository.BufferRepository; import im.rosetta.exception.UnauthorizedExeception; import im.rosetta.service.Service; - +import im.rosetta.service.services.runtime.PacketBuffer; import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; import io.orprotocol.packet.Packet; @@ -32,7 +32,7 @@ public class BufferService extends Service { * @return * @throws ProtocolException */ - public List getPacketsFromTime(Client client, long fromTimestampMs) throws ProtocolException, UnauthorizedExeception { + public PacketBuffer getPacketsFromTime(Client client, long fromTimestampMs, int take) throws ProtocolException, UnauthorizedExeception { ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){ /** @@ -41,20 +41,25 @@ public class BufferService extends Service { throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer"); } String toPublicKey = eciAuthentificate.getPublicKey(); - String hql = "FROM Buffer WHERE to = :to AND timestamp > :timestamp ORDER BY timestamp ASC"; + String hql = "FROM Buffer WHERE (to = :to OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC"; HashMap parameters = new HashMap<>(); parameters.put("to", toPublicKey); + parameters.put("from", toPublicKey); parameters.put("timestamp", fromTimestampMs); List packets = new ArrayList<>(); + long lastTimestamp = fromTimestampMs; try(QuerySession querySession = this.getRepository().buildQuery(hql, parameters)){ - List buffers = querySession.getQuery().list(); + List buffers = querySession.getQuery().setMaxResults(take).list(); for(Buffer buffer : buffers) { byte[] packetBytes = buffer.getPacket(); Packet packet = this.packetManager.createPacket(packetBytes); packets.add(packet); } + if(!buffers.isEmpty()){ + lastTimestamp = buffers.get(buffers.size() - 1).getTimestamp(); + } } - return packets; + return new PacketBuffer(packets, lastTimestamp); } /** diff --git a/src/main/java/im/rosetta/service/services/DeviceService.java b/src/main/java/im/rosetta/service/services/DeviceService.java index 9b7ab41..b8a0b22 100644 --- a/src/main/java/im/rosetta/service/services/DeviceService.java +++ b/src/main/java/im/rosetta/service/services/DeviceService.java @@ -2,10 +2,13 @@ package im.rosetta.service.services; import java.util.List; +import im.rosetta.client.tags.ECIAuthentificate; +import im.rosetta.client.tags.ECIDevice; import im.rosetta.database.entity.Device; import im.rosetta.database.entity.User; import im.rosetta.database.repository.DeviceRepository; import im.rosetta.service.Service; +import io.orprotocol.client.Client; public class DeviceService extends Service { @@ -45,4 +48,38 @@ public class DeviceService extends Service { return this.getRepository().findAllByField("publicKey", publicKey); } + /** + * Получает время последней синхронизации устройства, для корректной работы синхронизации сообщений + * @param client клиент для которого нужно получить время последней синхронизации устройства + * @return время последней синхронизации устройства, или 0 если устройство не найдено, + * или клиент не авторизован, таким образом вызывающий код синхронизирует все сообщения + */ + public long getLastSyncTime(Client client){ + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){ + /** + * Если клиент не авторизован, возвращаем 0, такого быть не должно + */ + return 0; + } + ECIDevice eciDevice = client.getTag(ECIDevice.class); + if(eciDevice == null){ + /** + * Если у клиента нет тега устройства, возвращаем 0, такого быть не должно, но на всякий случай + */ + return 0; + } + Device device = this.getRepository().findByField(new java.util.HashMap(){{ + put("deviceId", eciDevice.getDeviceId()); + put("publicKey", eciAuthentificate.getPublicKey()); + }}); + if(device == null){ + /** + * Если устройство не найдено, возвращаем 0, значит это устройство новое + */ + return 0; + } + return device.getSyncTime(); + } + } diff --git a/src/main/java/im/rosetta/service/services/runtime/PacketBuffer.java b/src/main/java/im/rosetta/service/services/runtime/PacketBuffer.java new file mode 100644 index 0000000..1e5b18e --- /dev/null +++ b/src/main/java/im/rosetta/service/services/runtime/PacketBuffer.java @@ -0,0 +1,31 @@ +package im.rosetta.service.services.runtime; + +import java.util.List; + +import io.orprotocol.packet.Packet; + +/** + * Класс для хранения пакетов для синхронизации и времени последнего пакета для корректной работы синхронизации сообщений + * Когда клиент запрашивает синхронизацию сообщений, мы возвращаем ему список пакетов для + * синхронизации и время последнего пакета, чтобы клиент мог корректно обновить время последней + * синхронизации и не запрашивать одни и те же пакеты при следующей синхронизации + */ +public class PacketBuffer { + + private List packets; + private long lastPacketTimestamp; + + public PacketBuffer(List packets, long lastPacketTimestamp) { + this.packets = packets; + this.lastPacketTimestamp = lastPacketTimestamp; + } + + public List getPackets() { + return packets; + } + + public long getLastPacketTimestamp() { + return lastPacketTimestamp; + } + +}