Полная реализация синхронизации
This commit is contained in:
@@ -16,6 +16,7 @@ import im.rosetta.executors.Executor20GroupJoin;
|
|||||||
import im.rosetta.executors.Executor21GroupLeave;
|
import im.rosetta.executors.Executor21GroupLeave;
|
||||||
import im.rosetta.executors.Executor22GroupBan;
|
import im.rosetta.executors.Executor22GroupBan;
|
||||||
import im.rosetta.executors.Executor24DeviceResolve;
|
import im.rosetta.executors.Executor24DeviceResolve;
|
||||||
|
import im.rosetta.executors.Executor25Sync;
|
||||||
import im.rosetta.executors.Executor3Search;
|
import im.rosetta.executors.Executor3Search;
|
||||||
import im.rosetta.executors.Executor4OnlineState;
|
import im.rosetta.executors.Executor4OnlineState;
|
||||||
import im.rosetta.executors.Executor6Message;
|
import im.rosetta.executors.Executor6Message;
|
||||||
@@ -42,6 +43,7 @@ import im.rosetta.packet.Packet21GroupLeave;
|
|||||||
import im.rosetta.packet.Packet22GroupBan;
|
import im.rosetta.packet.Packet22GroupBan;
|
||||||
import im.rosetta.packet.Packet23DeviceList;
|
import im.rosetta.packet.Packet23DeviceList;
|
||||||
import im.rosetta.packet.Packet24DeviceResolve;
|
import im.rosetta.packet.Packet24DeviceResolve;
|
||||||
|
import im.rosetta.packet.Packet25Sync;
|
||||||
import im.rosetta.packet.Packet2Result;
|
import im.rosetta.packet.Packet2Result;
|
||||||
import im.rosetta.packet.Packet3Search;
|
import im.rosetta.packet.Packet3Search;
|
||||||
import im.rosetta.packet.Packet4OnlineSubscribe;
|
import im.rosetta.packet.Packet4OnlineSubscribe;
|
||||||
@@ -181,6 +183,7 @@ public class Boot {
|
|||||||
this.packetManager.registerPacket(22, Packet22GroupBan.class);
|
this.packetManager.registerPacket(22, Packet22GroupBan.class);
|
||||||
this.packetManager.registerPacket(23, Packet23DeviceList.class);
|
this.packetManager.registerPacket(23, Packet23DeviceList.class);
|
||||||
this.packetManager.registerPacket(24, Packet24DeviceResolve.class);
|
this.packetManager.registerPacket(24, Packet24DeviceResolve.class);
|
||||||
|
this.packetManager.registerPacket(25, Packet25Sync.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void registerAllExecutors() {
|
private void registerAllExecutors() {
|
||||||
@@ -201,6 +204,7 @@ public class Boot {
|
|||||||
this.packetManager.registerExecutor(21, new Executor21GroupLeave());
|
this.packetManager.registerExecutor(21, new Executor21GroupLeave());
|
||||||
this.packetManager.registerExecutor(22, new Executor22GroupBan());
|
this.packetManager.registerExecutor(22, new Executor22GroupBan());
|
||||||
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager));
|
this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager));
|
||||||
|
this.packetManager.registerExecutor(25, new Executor25Sync(this.packetManager));
|
||||||
}
|
}
|
||||||
|
|
||||||
private void printBootMessage() {
|
private void printBootMessage() {
|
||||||
|
|||||||
@@ -28,6 +28,10 @@ public class ClientManager {
|
|||||||
return this.server;
|
return this.server;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public ClientIndexer getClientIndexer() {
|
||||||
|
return this.clientIndexer;
|
||||||
|
}
|
||||||
|
|
||||||
public boolean isClientConnected(String publicKey) {
|
public boolean isClientConnected(String publicKey) {
|
||||||
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
|
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
|
||||||
if(clients == null){
|
if(clients == null){
|
||||||
|
|||||||
@@ -32,8 +32,8 @@ public class Device extends CreateUpdateEntity {
|
|||||||
/**
|
/**
|
||||||
* Время завершения сессии устройства
|
* Время завершения сессии устройства
|
||||||
*/
|
*/
|
||||||
@Column(name = "leaveTime", nullable = true, columnDefinition = "bigint default 0")
|
@Column(name = "syncTime", nullable = true, columnDefinition = "bigint default 0")
|
||||||
private Long leaveTime;
|
private Long syncTime;
|
||||||
|
|
||||||
public Long getId() {
|
public Long getId() {
|
||||||
return id;
|
return id;
|
||||||
@@ -55,12 +55,12 @@ public class Device extends CreateUpdateEntity {
|
|||||||
return deviceOs;
|
return deviceOs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Long getLeaveTime() {
|
public Long getSyncTime() {
|
||||||
return leaveTime;
|
return syncTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setLeaveTime(Long leaveTime) {
|
public void setSyncTime(Long syncTime) {
|
||||||
this.leaveTime = leaveTime;
|
this.syncTime = syncTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setPublicKey(String publicKey) {
|
public void setPublicKey(String publicKey) {
|
||||||
|
|||||||
@@ -1,5 +1,6 @@
|
|||||||
package im.rosetta.database.repository;
|
package im.rosetta.database.repository;
|
||||||
|
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import im.rosetta.database.Repository;
|
import im.rosetta.database.Repository;
|
||||||
@@ -31,15 +32,19 @@ public class DeviceRepository extends Repository<Device> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Обновляет время последней активности устройства
|
* Обновляет время последней активности устройства в аккаунте
|
||||||
* @param deviceId ID устройства
|
* @param deviceId ID устройства
|
||||||
|
* @param publicKey публичный ключ аккаунта в котором нужно обновить последнюю активность
|
||||||
*/
|
*/
|
||||||
public void updateDeviceLeaveTime(String deviceId) {
|
public void updateDeviceLeaveTime(String deviceId, String publicKey) {
|
||||||
Device device = this.findByField("deviceId", deviceId);
|
Device device = this.findByField(new HashMap<String, Object>(){{
|
||||||
|
put("deviceId", deviceId);
|
||||||
|
put("publicKey", publicKey);
|
||||||
|
}});
|
||||||
if(device == null) {
|
if(device == null) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
device.setLeaveTime(System.currentTimeMillis());
|
device.setSyncTime(System.currentTimeMillis());
|
||||||
this.update(device);
|
this.update(device);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -96,12 +96,25 @@ public class Executor0Handshake extends PacketExecutor<Packet0Handshake> {
|
|||||||
|
|
||||||
userRepository.save(user);
|
userRepository.save(user);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Это первое устройство пользователя, сохраняем его
|
||||||
|
* как верифицированное
|
||||||
|
*/
|
||||||
|
Device newDevice = new Device();
|
||||||
|
newDevice.setDeviceId(deviceId);
|
||||||
|
newDevice.setDeviceName(deviceName);
|
||||||
|
newDevice.setDeviceOs(deviceOs);
|
||||||
|
newDevice.setPublicKey(publicKey);
|
||||||
|
newDevice.setSyncTime(System.currentTimeMillis());
|
||||||
|
deviceRepository.save(newDevice);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ставим метку аутентификации на клиента
|
* Ставим метку аутентификации на клиента
|
||||||
*/
|
*/
|
||||||
ECIAuthentificate eciTag = new ECIAuthentificate
|
ECIAuthentificate eciTag = new ECIAuthentificate
|
||||||
(publicKey, privateKey, HandshakeStage.COMPLETED);
|
(publicKey, privateKey, HandshakeStage.COMPLETED);
|
||||||
client.addTag(ECIAuthentificate.class, eciTag);
|
client.addTag(ECIAuthentificate.class, eciTag);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Вызываем событие завершения хэндшейка
|
* Вызываем событие завершения хэндшейка
|
||||||
*/
|
*/
|
||||||
@@ -187,20 +200,6 @@ public class Executor0Handshake extends PacketExecutor<Packet0Handshake> {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(userDevicesCount == 0) {
|
|
||||||
/**
|
|
||||||
* Это первое устройство пользователя, сохраняем его
|
|
||||||
* как верифицированное
|
|
||||||
*/
|
|
||||||
Device newDevice = new Device();
|
|
||||||
newDevice.setDeviceId(deviceId);
|
|
||||||
newDevice.setDeviceName(deviceName);
|
|
||||||
newDevice.setDeviceOs(deviceOs);
|
|
||||||
newDevice.setPublicKey(publicKey);
|
|
||||||
newDevice.setLeaveTime(System.currentTimeMillis());
|
|
||||||
deviceRepository.save(newDevice);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ставим метку аутентификации на клиента
|
* Ставим метку аутентификации на клиента
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -74,7 +74,7 @@ public class Executor24DeviceResolve extends PacketExecutor<Packet24DeviceResolv
|
|||||||
* можно устанавливать leaveTime как текущее время, тогда сообщения новому устройству не загрузятся.
|
* можно устанавливать leaveTime как текущее время, тогда сообщения новому устройству не загрузятся.
|
||||||
* Если установить leaveTime в 0, то синхронизируются все сообщения которые есть на сервере
|
* Если установить leaveTime в 0, то синхронизируются все сообщения которые есть на сервере
|
||||||
*/
|
*/
|
||||||
device.setLeaveTime(0L);
|
device.setSyncTime(0L);
|
||||||
this.deviceRepository.save(device);
|
this.deviceRepository.save(device);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
66
src/main/java/im/rosetta/executors/Executor25Sync.java
Normal file
66
src/main/java/im/rosetta/executors/Executor25Sync.java
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package im.rosetta.executors;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import im.rosetta.database.repository.BufferRepository;
|
||||||
|
import im.rosetta.packet.Packet25Sync;
|
||||||
|
import im.rosetta.packet.runtime.NetworkSyncStatus;
|
||||||
|
import im.rosetta.service.services.BufferService;
|
||||||
|
import im.rosetta.service.services.runtime.PacketBuffer;
|
||||||
|
import io.orprotocol.ProtocolException;
|
||||||
|
import io.orprotocol.client.Client;
|
||||||
|
import io.orprotocol.packet.Packet;
|
||||||
|
import io.orprotocol.packet.PacketExecutor;
|
||||||
|
import io.orprotocol.packet.PacketManager;
|
||||||
|
|
||||||
|
public class Executor25Sync extends PacketExecutor<Packet25Sync> {
|
||||||
|
|
||||||
|
private final BufferRepository bufferRepository = new BufferRepository();
|
||||||
|
private final BufferService bufferService;
|
||||||
|
|
||||||
|
public Executor25Sync(PacketManager packetManager) {
|
||||||
|
this.bufferService = new BufferService(bufferRepository, packetManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onPacketReceived(Packet25Sync packet, Client client) throws Exception, ProtocolException {
|
||||||
|
/**
|
||||||
|
* Начиная с какого времени клиент желает получить синхронизацию
|
||||||
|
*/
|
||||||
|
long fromTimestampMs = packet.getTimestamp();
|
||||||
|
|
||||||
|
PacketBuffer packetBuffer = this.bufferService.getPacketsFromTime(client, fromTimestampMs, 50);
|
||||||
|
List<Packet> packets = packetBuffer.getPackets();
|
||||||
|
|
||||||
|
if(packets.isEmpty()){
|
||||||
|
/**
|
||||||
|
* Нет пакетов для синхронизации, сообщаем клиенту что он синхронизирован
|
||||||
|
*/
|
||||||
|
packet.setSyncStatus(NetworkSyncStatus.NOT_NEEDED);
|
||||||
|
client.send(packet);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Отправляем клиенту информацию о том, что синхронизация началась
|
||||||
|
*/
|
||||||
|
packet.setSyncStatus(NetworkSyncStatus.BATCH_START);
|
||||||
|
client.send(packet);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Отправляем клиенту пакеты для синхронизации
|
||||||
|
*/
|
||||||
|
|
||||||
|
for(Packet syncPacket : packets){
|
||||||
|
client.send(syncPacket);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Сообщаем клиенту, что синхронизация завершена
|
||||||
|
*/
|
||||||
|
packet.setSyncStatus(NetworkSyncStatus.BATCH_END);
|
||||||
|
packet.setTimestamp(packetBuffer.getLastPacketTimestamp());
|
||||||
|
client.send(packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -2,19 +2,21 @@ package im.rosetta.listeners;
|
|||||||
|
|
||||||
import im.rosetta.client.ClientManager;
|
import im.rosetta.client.ClientManager;
|
||||||
import im.rosetta.client.tags.ECIAuthentificate;
|
import im.rosetta.client.tags.ECIAuthentificate;
|
||||||
|
import im.rosetta.client.tags.ECIDevice;
|
||||||
|
import im.rosetta.database.repository.DeviceRepository;
|
||||||
import im.rosetta.event.EventHandler;
|
import im.rosetta.event.EventHandler;
|
||||||
import im.rosetta.event.Listener;
|
import im.rosetta.event.Listener;
|
||||||
import im.rosetta.event.events.DisconnectEvent;
|
import im.rosetta.event.events.DisconnectEvent;
|
||||||
import im.rosetta.event.events.handshake.HandshakeCompletedEvent;
|
import im.rosetta.event.events.handshake.HandshakeCompletedEvent;
|
||||||
import im.rosetta.event.events.handshake.HandshakeDeviceConfirmEvent;
|
import im.rosetta.event.events.handshake.HandshakeDeviceConfirmEvent;
|
||||||
import im.rosetta.service.dispatch.DeviceDispatcher;
|
import im.rosetta.service.dispatch.DeviceDispatcher;
|
||||||
|
|
||||||
import io.orprotocol.ProtocolException;
|
import io.orprotocol.ProtocolException;
|
||||||
import io.orprotocol.client.Client;
|
import io.orprotocol.client.Client;
|
||||||
|
|
||||||
public class DeviceListListener implements Listener {
|
public class DeviceListListener implements Listener {
|
||||||
|
|
||||||
private final DeviceDispatcher deviceDispatcher;
|
private final DeviceDispatcher deviceDispatcher;
|
||||||
|
private final DeviceRepository deviceRepository = new DeviceRepository();
|
||||||
|
|
||||||
public DeviceListListener(ClientManager clientManager) {
|
public DeviceListListener(ClientManager clientManager) {
|
||||||
this.deviceDispatcher = new DeviceDispatcher(clientManager);
|
this.deviceDispatcher = new DeviceDispatcher(clientManager);
|
||||||
@@ -48,11 +50,23 @@ public class DeviceListListener implements Listener {
|
|||||||
public void onDisconnect(DisconnectEvent event) throws ProtocolException {
|
public void onDisconnect(DisconnectEvent event) throws ProtocolException {
|
||||||
Client client = event.getClient();
|
Client client = event.getClient();
|
||||||
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
||||||
|
ECIDevice eciDevice = client.getTag(ECIDevice.class);
|
||||||
|
if(eciDevice == null){
|
||||||
|
/**
|
||||||
|
* Если у клиента нет тега устройства, пропускаем его
|
||||||
|
* такого быть не должно, но на всякий случай
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
if(eciAuthentificate != null){
|
if(eciAuthentificate != null){
|
||||||
/**
|
/**
|
||||||
* Когда устройство отключается от аккаунта, отправляем всем клиентам с этим публичным ключом обновленный список устройств
|
* Когда устройство отключается от аккаунта, отправляем всем клиентам с этим публичным ключом обновленный список устройств
|
||||||
*/
|
*/
|
||||||
this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey());
|
this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey());
|
||||||
|
/**
|
||||||
|
* Обновляем время последнего онлайна устройства, которое отключилось, для корректной работы синхронизации сообщений
|
||||||
|
*/
|
||||||
|
this.deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId(), eciAuthentificate.getPublicKey());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ public class ServerStopListener implements Listener {
|
|||||||
*/
|
*/
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId());
|
deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId(), eciAuth.getPublicKey());
|
||||||
}
|
}
|
||||||
this.logger.info(Color.RED + "Время последней активности устройств клиентов обновлено.");
|
this.logger.info(Color.RED + "Время последней активности устройств клиентов обновлено.");
|
||||||
}
|
}
|
||||||
|
|||||||
66
src/main/java/im/rosetta/packet/Packet25Sync.java
Normal file
66
src/main/java/im/rosetta/packet/Packet25Sync.java
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
package im.rosetta.packet;
|
||||||
|
|
||||||
|
import im.rosetta.packet.runtime.NetworkSyncStatus;
|
||||||
|
import io.orprotocol.Stream;
|
||||||
|
import io.orprotocol.packet.Packet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Пакет отправляет клиент устанавливая timestamp последнего синхронизированного пакета,
|
||||||
|
* а так же статус синхронизации, который может быть:
|
||||||
|
* NOT_NEEDED - синхронизация не требуется, так как устройство уже синхронизировано или не требует синхронизации
|
||||||
|
* BATCH_START - начало синхронизации, сервер начинает отправлять клиенту пакеты для синхронизации, клиент должен подготовиться к приему пакетов для синхронизации
|
||||||
|
* BATCH_END - конец синхронизации, сервер завершил отправку пакетов для синхронизации, клиент может завершить процесс синхронизации
|
||||||
|
*/
|
||||||
|
public class Packet25Sync extends Packet {
|
||||||
|
|
||||||
|
private NetworkSyncStatus syncStatus;
|
||||||
|
private long timestamp;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void read(Stream stream) {
|
||||||
|
this.syncStatus = NetworkSyncStatus.fromValue(stream.readInt8());
|
||||||
|
this.timestamp = stream.readInt64();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Stream write() {
|
||||||
|
Stream stream = new Stream();
|
||||||
|
stream.writeInt16(this.packetId);
|
||||||
|
stream.writeInt8(this.syncStatus.getValue());
|
||||||
|
stream.writeInt64(this.timestamp);
|
||||||
|
return stream;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Получить статус синхронизации
|
||||||
|
* @return статус синхронизации
|
||||||
|
*/
|
||||||
|
public NetworkSyncStatus getSyncStatus() {
|
||||||
|
return syncStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Установить статус синхронизации
|
||||||
|
* @param syncStatus статус синхронизации
|
||||||
|
*/
|
||||||
|
public void setSyncStatus(NetworkSyncStatus syncStatus) {
|
||||||
|
this.syncStatus = syncStatus;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Получить timestamp последнего синхронизированного пакета
|
||||||
|
* @return timestamp последнего синхронизированного пакета
|
||||||
|
*/
|
||||||
|
public long getTimestamp() {
|
||||||
|
return timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Установить timestamp последнего синхронизированного пакета
|
||||||
|
* @param timestamp timestamp последнего синхронизированного пакета
|
||||||
|
*/
|
||||||
|
public void setTimestamp(long timestamp) {
|
||||||
|
this.timestamp = timestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
@@ -0,0 +1,35 @@
|
|||||||
|
package im.rosetta.packet.runtime;
|
||||||
|
|
||||||
|
public enum NetworkSyncStatus {
|
||||||
|
/**
|
||||||
|
* Синхронизация не требуется, так как устройство уже синхронизировано или не требует синхронизации
|
||||||
|
*/
|
||||||
|
NOT_NEEDED(0),
|
||||||
|
/**
|
||||||
|
* Начало синхронизации, сервер начинает отправлять клиенту пакеты для синхронизации, клиент должен подготовиться к приему пакетов для синхронизации
|
||||||
|
*/
|
||||||
|
BATCH_START(1),
|
||||||
|
/**
|
||||||
|
* Конец синхронизации, сервер завершил отправку пакетов для синхронизации, клиент может завершить процесс синхронизации
|
||||||
|
*/
|
||||||
|
BATCH_END(2);
|
||||||
|
|
||||||
|
private final int value;
|
||||||
|
|
||||||
|
NetworkSyncStatus(int value) {
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getValue() {
|
||||||
|
return value;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static NetworkSyncStatus fromValue(int value) {
|
||||||
|
for (NetworkSyncStatus status : NetworkSyncStatus.values()) {
|
||||||
|
if (status.getValue() == value) {
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
throw new IllegalArgumentException("Unknown NetworkSyncStatus value: " + value);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
package im.rosetta.service.dispatch;
|
package im.rosetta.service.dispatch;
|
||||||
|
|
||||||
|
import java.util.HashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import im.rosetta.Failures;
|
import im.rosetta.Failures;
|
||||||
@@ -96,6 +97,11 @@ public class MessageDispatcher {
|
|||||||
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
|
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
|
||||||
*/
|
*/
|
||||||
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
|
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Ретранслируем сообщение всем авторизованным сессиям отправителя, чтобы синхронизировать отправленные сообщения
|
||||||
|
*/
|
||||||
|
this.retranslate(packet, client);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -111,5 +117,34 @@ public class MessageDispatcher {
|
|||||||
this.sendPeer(packet, client, true);
|
this.sendPeer(packet, client, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Сообщает всем авторизованным сессиям отправителя о том, что он отправил сообщения,
|
||||||
|
* для того чтобы синхронизировать отправленные сообщения на всех устройствах отправителя
|
||||||
|
* @param packet пакет сообщения
|
||||||
|
* @param client клиент отправляющий пакет
|
||||||
|
* @throws ProtocolException
|
||||||
|
*/
|
||||||
|
public void retranslate(PacketBaseDialog packet, Client client) throws ProtocolException {
|
||||||
|
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
||||||
|
HashSet<Client> clients = this.clientManager.getClientIndexer()
|
||||||
|
.getClients(ECIAuthentificate.class, "publicKey", eciAuthentificate.getPublicKey());
|
||||||
|
if(clients == null){
|
||||||
|
/**
|
||||||
|
* Нет авторизованных сессий с таким публичным ключом
|
||||||
|
*/
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
for(Client c : clients){
|
||||||
|
/**
|
||||||
|
* Проходим по всем устройствам с таким публичным ключом и ретранслируем им пакет, кроме того устройства что
|
||||||
|
* отправило пакет
|
||||||
|
*/
|
||||||
|
if(c.equals(client)){
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
c.send(packet);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ import im.rosetta.database.entity.Buffer;
|
|||||||
import im.rosetta.database.repository.BufferRepository;
|
import im.rosetta.database.repository.BufferRepository;
|
||||||
import im.rosetta.exception.UnauthorizedExeception;
|
import im.rosetta.exception.UnauthorizedExeception;
|
||||||
import im.rosetta.service.Service;
|
import im.rosetta.service.Service;
|
||||||
|
import im.rosetta.service.services.runtime.PacketBuffer;
|
||||||
import io.orprotocol.ProtocolException;
|
import io.orprotocol.ProtocolException;
|
||||||
import io.orprotocol.client.Client;
|
import io.orprotocol.client.Client;
|
||||||
import io.orprotocol.packet.Packet;
|
import io.orprotocol.packet.Packet;
|
||||||
@@ -32,7 +32,7 @@ public class BufferService extends Service<BufferRepository> {
|
|||||||
* @return
|
* @return
|
||||||
* @throws ProtocolException
|
* @throws ProtocolException
|
||||||
*/
|
*/
|
||||||
public List<Packet> getPacketsFromTime(Client client, long fromTimestampMs) throws ProtocolException, UnauthorizedExeception {
|
public PacketBuffer getPacketsFromTime(Client client, long fromTimestampMs, int take) throws ProtocolException, UnauthorizedExeception {
|
||||||
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
||||||
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
|
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
|
||||||
/**
|
/**
|
||||||
@@ -41,20 +41,25 @@ public class BufferService extends Service<BufferRepository> {
|
|||||||
throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer");
|
throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer");
|
||||||
}
|
}
|
||||||
String toPublicKey = eciAuthentificate.getPublicKey();
|
String toPublicKey = eciAuthentificate.getPublicKey();
|
||||||
String hql = "FROM Buffer WHERE to = :to AND timestamp > :timestamp ORDER BY timestamp ASC";
|
String hql = "FROM Buffer WHERE (to = :to OR from = :from) AND timestamp > :timestamp ORDER BY timestamp ASC";
|
||||||
HashMap<String, Object> parameters = new HashMap<>();
|
HashMap<String, Object> parameters = new HashMap<>();
|
||||||
parameters.put("to", toPublicKey);
|
parameters.put("to", toPublicKey);
|
||||||
|
parameters.put("from", toPublicKey);
|
||||||
parameters.put("timestamp", fromTimestampMs);
|
parameters.put("timestamp", fromTimestampMs);
|
||||||
List<Packet> packets = new ArrayList<>();
|
List<Packet> packets = new ArrayList<>();
|
||||||
|
long lastTimestamp = fromTimestampMs;
|
||||||
try(QuerySession<Buffer> querySession = this.getRepository().buildQuery(hql, parameters)){
|
try(QuerySession<Buffer> querySession = this.getRepository().buildQuery(hql, parameters)){
|
||||||
List<Buffer> buffers = querySession.getQuery().list();
|
List<Buffer> buffers = querySession.getQuery().setMaxResults(take).list();
|
||||||
for(Buffer buffer : buffers) {
|
for(Buffer buffer : buffers) {
|
||||||
byte[] packetBytes = buffer.getPacket();
|
byte[] packetBytes = buffer.getPacket();
|
||||||
Packet packet = this.packetManager.createPacket(packetBytes);
|
Packet packet = this.packetManager.createPacket(packetBytes);
|
||||||
packets.add(packet);
|
packets.add(packet);
|
||||||
}
|
}
|
||||||
|
if(!buffers.isEmpty()){
|
||||||
|
lastTimestamp = buffers.get(buffers.size() - 1).getTimestamp();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
return packets;
|
return new PacketBuffer(packets, lastTimestamp);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|||||||
@@ -2,10 +2,13 @@ package im.rosetta.service.services;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import im.rosetta.client.tags.ECIAuthentificate;
|
||||||
|
import im.rosetta.client.tags.ECIDevice;
|
||||||
import im.rosetta.database.entity.Device;
|
import im.rosetta.database.entity.Device;
|
||||||
import im.rosetta.database.entity.User;
|
import im.rosetta.database.entity.User;
|
||||||
import im.rosetta.database.repository.DeviceRepository;
|
import im.rosetta.database.repository.DeviceRepository;
|
||||||
import im.rosetta.service.Service;
|
import im.rosetta.service.Service;
|
||||||
|
import io.orprotocol.client.Client;
|
||||||
|
|
||||||
public class DeviceService extends Service<DeviceRepository> {
|
public class DeviceService extends Service<DeviceRepository> {
|
||||||
|
|
||||||
@@ -45,4 +48,38 @@ public class DeviceService extends Service<DeviceRepository> {
|
|||||||
return this.getRepository().findAllByField("publicKey", publicKey);
|
return this.getRepository().findAllByField("publicKey", publicKey);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Получает время последней синхронизации устройства, для корректной работы синхронизации сообщений
|
||||||
|
* @param client клиент для которого нужно получить время последней синхронизации устройства
|
||||||
|
* @return время последней синхронизации устройства, или 0 если устройство не найдено,
|
||||||
|
* или клиент не авторизован, таким образом вызывающий код синхронизирует все сообщения
|
||||||
|
*/
|
||||||
|
public long getLastSyncTime(Client client){
|
||||||
|
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
|
||||||
|
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
|
||||||
|
/**
|
||||||
|
* Если клиент не авторизован, возвращаем 0, такого быть не должно
|
||||||
|
*/
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
ECIDevice eciDevice = client.getTag(ECIDevice.class);
|
||||||
|
if(eciDevice == null){
|
||||||
|
/**
|
||||||
|
* Если у клиента нет тега устройства, возвращаем 0, такого быть не должно, но на всякий случай
|
||||||
|
*/
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
Device device = this.getRepository().findByField(new java.util.HashMap<String, Object>(){{
|
||||||
|
put("deviceId", eciDevice.getDeviceId());
|
||||||
|
put("publicKey", eciAuthentificate.getPublicKey());
|
||||||
|
}});
|
||||||
|
if(device == null){
|
||||||
|
/**
|
||||||
|
* Если устройство не найдено, возвращаем 0, значит это устройство новое
|
||||||
|
*/
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return device.getSyncTime();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,31 @@
|
|||||||
|
package im.rosetta.service.services.runtime;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import io.orprotocol.packet.Packet;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Класс для хранения пакетов для синхронизации и времени последнего пакета для корректной работы синхронизации сообщений
|
||||||
|
* Когда клиент запрашивает синхронизацию сообщений, мы возвращаем ему список пакетов для
|
||||||
|
* синхронизации и время последнего пакета, чтобы клиент мог корректно обновить время последней
|
||||||
|
* синхронизации и не запрашивать одни и те же пакеты при следующей синхронизации
|
||||||
|
*/
|
||||||
|
public class PacketBuffer {
|
||||||
|
|
||||||
|
private List<Packet> packets;
|
||||||
|
private long lastPacketTimestamp;
|
||||||
|
|
||||||
|
public PacketBuffer(List<Packet> packets, long lastPacketTimestamp) {
|
||||||
|
this.packets = packets;
|
||||||
|
this.lastPacketTimestamp = lastPacketTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<Packet> getPackets() {
|
||||||
|
return packets;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastPacketTimestamp() {
|
||||||
|
return lastPacketTimestamp;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user