diff --git a/src/main/java/com/rosetta/im/Boot.java b/src/main/java/com/rosetta/im/Boot.java index a301e44..57255b0 100644 --- a/src/main/java/com/rosetta/im/Boot.java +++ b/src/main/java/com/rosetta/im/Boot.java @@ -5,10 +5,12 @@ import com.rosetta.im.client.OnlineManager; import com.rosetta.im.event.EventManager; import com.rosetta.im.executors.Executor0Handshake; import com.rosetta.im.executors.Executor1UserInfo; +import com.rosetta.im.executors.Executor24DeviceResolve; import com.rosetta.im.executors.Executor3Search; import com.rosetta.im.executors.Executor4OnlineState; import com.rosetta.im.executors.Executor6Message; import com.rosetta.im.executors.Executor7Read; +import com.rosetta.im.listeners.DeviceListListener; import com.rosetta.im.listeners.HandshakeCompleteListener; import com.rosetta.im.listeners.OnlineStatusDisconnectListener; import com.rosetta.im.listeners.OnlineStatusHandshakeCompleteListener; @@ -18,6 +20,8 @@ import com.rosetta.im.logger.enums.Color; import com.rosetta.im.logger.enums.LogLevel; import com.rosetta.im.packet.Packet0Handshake; import com.rosetta.im.packet.Packet1UserInfo; +import com.rosetta.im.packet.Packet23DeviceList; +import com.rosetta.im.packet.Packet24DeviceResolve; import com.rosetta.im.packet.Packet2Result; import com.rosetta.im.packet.Packet3Search; import com.rosetta.im.packet.Packet4OnlineSubscribe; @@ -25,6 +29,7 @@ import com.rosetta.im.packet.Packet5OnlineState; import com.rosetta.im.packet.Packet6Message; import com.rosetta.im.packet.Packet7Read; import com.rosetta.im.packet.Packet8Delivery; +import com.rosetta.im.packet.Packet9DeviceNew; import io.orprotocol.Server; import io.orprotocol.Settings; @@ -110,6 +115,7 @@ public class Boot { this.eventManager.registerListener(new HandshakeCompleteListener()); this.eventManager.registerListener(new OnlineStatusHandshakeCompleteListener(this.onlineManager)); this.eventManager.registerListener(new OnlineStatusDisconnectListener(this.onlineManager)); + this.eventManager.registerListener(new DeviceListListener(this.clientManager)); } private void registerAllPackets() { @@ -122,15 +128,20 @@ public class Boot { this.packetManager.registerPacket(6, Packet6Message.class); this.packetManager.registerPacket(7, Packet7Read.class); this.packetManager.registerPacket(8, Packet8Delivery.class); + this.packetManager.registerPacket(9, Packet9DeviceNew.class); + this.packetManager.registerPacket(23, Packet23DeviceList.class); + this.packetManager.registerPacket(24, Packet24DeviceResolve.class); + } private void registerAllExecutors() { - this.packetManager.registerExecutor(0, new Executor0Handshake(this.eventManager)); + this.packetManager.registerExecutor(0, new Executor0Handshake(this.eventManager, this.clientManager, this.packetManager)); this.packetManager.registerExecutor(1, new Executor1UserInfo()); this.packetManager.registerExecutor(3, new Executor3Search(this.clientManager)); this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager, this.clientManager)); this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager)); this.packetManager.registerExecutor(7, new Executor7Read(this.clientManager, this.packetManager)); + this.packetManager.registerExecutor(24, new Executor24DeviceResolve(this.clientManager, this.eventManager)); } private void printBootMessage() { diff --git a/src/main/java/com/rosetta/im/client/ClientManager.java b/src/main/java/com/rosetta/im/client/ClientManager.java index 6798795..9fd1259 100644 --- a/src/main/java/com/rosetta/im/client/ClientManager.java +++ b/src/main/java/com/rosetta/im/client/ClientManager.java @@ -49,12 +49,12 @@ public class ClientManager { } /** - * Отправить пакет всем клиентам с публичным ключом publicKey + * Отправить пакет всем АВТОРИЗОВАННЫМ клиентам с публичным ключом publicKey * @param publicKey публичный ключ получателя * @param packet пакет для отправки * @throws ProtocolException если произошла ошибка при отправке пакета клиенту */ - public void sendPacketToPK(String publicKey, Packet packet) throws ProtocolException { + public void sendPacketToAuthorizedPK(String publicKey, Packet packet) throws ProtocolException { HashSet clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey); if(clients == null){ /** @@ -85,11 +85,27 @@ public class ClientManager { * @param packet пакет для отправки * @throws ProtocolException если произошла ошибка при отправке пакета клиенту */ - public void sendPacketToPK(List publicKeys, Packet packet) throws ProtocolException { + public void sendPacketToAuthorizedPK(List publicKeys, Packet packet) throws ProtocolException { for(String publicKey : publicKeys){ - this.sendPacketToPK(publicKey, packet); + this.sendPacketToAuthorizedPK(publicKey, packet); } } + + /** + * Получить список клиентов по публичному ключу (get PublicKey clients), могут быть неавторизованные клиенты + * @param publicKey публичный ключ клиента + * @return список клиентов с таким публичным ключом, может быть пустым, если клиентов с таким публичным ключом нет + */ + public List getPKClients(String publicKey) { + HashSet clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey); + if(clients == null){ + /** + * Нет клиентов с таким публичным ключом + */ + return List.of(); + } + return List.copyOf(clients); + } diff --git a/src/main/java/com/rosetta/im/client/tags/ECIAuthentificate.java b/src/main/java/com/rosetta/im/client/tags/ECIAuthentificate.java index fd15864..e238b39 100644 --- a/src/main/java/com/rosetta/im/client/tags/ECIAuthentificate.java +++ b/src/main/java/com/rosetta/im/client/tags/ECIAuthentificate.java @@ -60,13 +60,7 @@ public class ECIAuthentificate implements ECITag { @Override public Map getIndex() { Map indexes = new HashMap<>(); - if(this.hasAuthorized()){ - /** - * Индексируем пользователя только если он авторизован, - * иначе не нужно их индексировать, чтобы не забивать память - */ - indexes.put("publicKey", publicKey); - } + indexes.put("publicKey", publicKey); return indexes; } diff --git a/src/main/java/com/rosetta/im/database/QuerySession.java b/src/main/java/com/rosetta/im/database/QuerySession.java index 9b64f0f..917298e 100644 --- a/src/main/java/com/rosetta/im/database/QuerySession.java +++ b/src/main/java/com/rosetta/im/database/QuerySession.java @@ -1,26 +1,41 @@ package com.rosetta.im.database; import org.hibernate.Session; +import org.hibernate.Transaction; import org.hibernate.query.Query; public class QuerySession implements AutoCloseable { - private Session session; - private Query query; + private final Session session; + private final Query query; + private final Transaction tx; public QuerySession(Session session, Query query) { this.session = session; this.query = query; + this.tx = session.beginTransaction(); } public Query getQuery() { return query; } - @Override - public void close() { - if (session != null && session.isOpen()) { - session.close(); + public void commit() { + if (tx != null && tx.isActive()) { + tx.commit(); } } -} + + @Override + public void close() { + try { + if (tx != null && tx.isActive()) { + tx.rollback(); + } + } finally { + if (session != null && session.isOpen()) { + session.close(); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/com/rosetta/im/database/Repository.java b/src/main/java/com/rosetta/im/database/Repository.java index 3ef5d7c..4aad181 100644 --- a/src/main/java/com/rosetta/im/database/Repository.java +++ b/src/main/java/com/rosetta/im/database/Repository.java @@ -233,12 +233,18 @@ public abstract class Repository { * Выполняет запрос с параметрами и возвращает список сущностей * @param queryString SQL запрос * @param parameters параметры запроса + * @param noResultType если true, то не указывать тип результата в запросе, используется для запросов типа UPDATE и DELETE * @return список сущностей */ - public QuerySession buildQuery(String queryString, HashMap parameters) { + public QuerySession buildQuery(String queryString, HashMap parameters, boolean noResultType) { Session session = HibernateUtil.openSession(); try { - Query query = session.createQuery(queryString, entityClass); + Query query; + if(noResultType) { + query = session.createQuery(queryString); + } else { + query = session.createQuery(queryString, entityClass); + } for (var entry : parameters.entrySet()) { query.setParameter(entry.getKey(), entry.getValue()); } @@ -248,6 +254,16 @@ public abstract class Repository { throw e; } } + + /** + * Выполняет запрос с параметрами и возвращает список сущностей, тип результата указывается автоматически, используется для запросов типа SELECT + * @param queryString SQL запрос + * @param parameters параметры запроса + * @return список сущностей + */ + public QuerySession buildQuery(String queryString, HashMap parameters) { + return buildQuery(queryString, parameters, false); + } /** * Подсчет сущностей по набору полей diff --git a/src/main/java/com/rosetta/im/database/entity/Buffer.java b/src/main/java/com/rosetta/im/database/entity/Buffer.java index dd99599..0d45c53 100644 --- a/src/main/java/com/rosetta/im/database/entity/Buffer.java +++ b/src/main/java/com/rosetta/im/database/entity/Buffer.java @@ -28,6 +28,9 @@ public class Buffer extends CreateUpdateEntity { @Column(name = "destination") private String to; + + @Column(name = "packetId") + private int packetId; @Column(name = "packet", columnDefinition = "bytea") private byte[] packet; @@ -71,4 +74,12 @@ public class Buffer extends CreateUpdateEntity { public void setTimestamp(Long timestamp) { this.timestamp = timestamp; } + + public int getPacketId() { + return packetId; + } + + public void setPacketId(int packetId) { + this.packetId = packetId; + } } diff --git a/src/main/java/com/rosetta/im/executors/Executor0Handshake.java b/src/main/java/com/rosetta/im/executors/Executor0Handshake.java index acae560..a87ebcb 100644 --- a/src/main/java/com/rosetta/im/executors/Executor0Handshake.java +++ b/src/main/java/com/rosetta/im/executors/Executor0Handshake.java @@ -2,10 +2,12 @@ package com.rosetta.im.executors; import com.rosetta.im.Configuration; import com.rosetta.im.Failures; +import com.rosetta.im.client.ClientManager; import com.rosetta.im.client.tags.ECIAuthentificate; import com.rosetta.im.client.tags.ECIDevice; import com.rosetta.im.database.entity.Device; import com.rosetta.im.database.entity.User; +import com.rosetta.im.database.repository.BufferRepository; import com.rosetta.im.database.repository.DeviceRepository; import com.rosetta.im.database.repository.UserRepository; import com.rosetta.im.event.EventManager; @@ -13,13 +15,16 @@ import com.rosetta.im.event.events.handshake.HandshakeCompletedEvent; import com.rosetta.im.event.events.handshake.HandshakeDeviceConfirmEvent; import com.rosetta.im.event.events.handshake.HandshakeFailedEvent; import com.rosetta.im.packet.Packet0Handshake; +import com.rosetta.im.packet.Packet9DeviceNew; import com.rosetta.im.packet.runtime.HandshakeStage; +import com.rosetta.im.service.services.BufferService; import com.rosetta.im.service.services.DeviceService; import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; import io.orprotocol.lock.Lock; import io.orprotocol.packet.PacketExecutor; +import io.orprotocol.packet.PacketManager; public class Executor0Handshake extends PacketExecutor { @@ -27,12 +32,16 @@ public class Executor0Handshake extends PacketExecutor { private final DeviceRepository deviceRepository = new DeviceRepository(); private final DeviceService deviceService = new DeviceService(deviceRepository); private final EventManager eventManager; + private final ClientManager clientManager; + private final BufferRepository bufferRepository = new BufferRepository(); + private final BufferService bufferService; - public Executor0Handshake(EventManager eventManager) { + public Executor0Handshake(EventManager eventManager, ClientManager clientManager, PacketManager packetManager) { this.eventManager = eventManager; + this.clientManager = clientManager; + this.bufferService = new BufferService(bufferRepository, packetManager); } - @Override @Lock(lockFor = "publicKey") public void onPacketReceived(Packet0Handshake handshake, Client client) throws ProtocolException { @@ -138,23 +147,44 @@ public class Executor0Handshake extends PacketExecutor { */ handshake.setHandshakeStage(HandshakeStage.NEED_DEVICE_VERIFICATION); handshake.setHeartbeatInterval(this.settings.heartbeatInterval); - /** - * Вызываем событие подтверждения устройства - */ - this.eventManager.callEvent( - new HandshakeDeviceConfirmEvent(publicKey, privateKey, device, authentificate, client) - ); /** * Ставим метку аутентификации на клиента */ ECIAuthentificate eciTag = new ECIAuthentificate (publicKey, privateKey, HandshakeStage.NEED_DEVICE_VERIFICATION); client.addTag(ECIAuthentificate.class, eciTag); + /** + * Вызываем событие подтверждения устройства + */ + this.eventManager.callEvent( + new HandshakeDeviceConfirmEvent(publicKey, privateKey, device, authentificate, client) + ); /** * Отправляем клиенту информацию о необходимости * подтверждения устройства */ client.send(handshake); + + /** + * Уведомляем все авторизованные устройства пользователя о том, что нужно подтвердить новое устройство + */ + Packet9DeviceNew newDevicePacket = new Packet9DeviceNew(); + newDevicePacket.setDeviceId(deviceId); + newDevicePacket.setDeviceName(deviceName); + newDevicePacket.setDeviceOs(deviceOs); + newDevicePacket.setIpAddress(client.getSocket().getRemoteSocketAddress().toString()); + clientManager.sendPacketToAuthorizedPK(publicKey, newDevicePacket); + /** + * Сбрасываем клиенту все старые подтверждения устройств, чтобы исключить спам запросами + */ + this.bufferService.deletePacketsFromBuffer(publicKey, newDevicePacket, 0); + /** + * Кладем пакет в очередь на все устройства пользователя, + * чтобы если в момент отправки этого пакета какое-то устройство было не онлайн, + * то когда оно зайдет в сеть, то получит этот пакет и сможет отреагировать на него, + * показав пользователю уведомление о том, что нужно подтвердить новое устройство + */ + this.bufferService.pushPacketToBuffer("server", publicKey, newDevicePacket); return; } diff --git a/src/main/java/com/rosetta/im/executors/Executor24DeviceResolve.java b/src/main/java/com/rosetta/im/executors/Executor24DeviceResolve.java new file mode 100644 index 0000000..11827a3 --- /dev/null +++ b/src/main/java/com/rosetta/im/executors/Executor24DeviceResolve.java @@ -0,0 +1,115 @@ +package com.rosetta.im.executors; + +import java.util.List; + +import com.rosetta.im.Failures; +import com.rosetta.im.client.ClientManager; +import com.rosetta.im.client.tags.ECIAuthentificate; +import com.rosetta.im.client.tags.ECIDevice; +import com.rosetta.im.database.entity.Device; +import com.rosetta.im.database.repository.DeviceRepository; +import com.rosetta.im.event.EventManager; +import com.rosetta.im.event.events.handshake.HandshakeCompletedEvent; +import com.rosetta.im.packet.Packet0Handshake; +import com.rosetta.im.packet.Packet24DeviceResolve; +import com.rosetta.im.packet.runtime.DeviceSolution; +import com.rosetta.im.packet.runtime.HandshakeStage; + +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; + +public class Executor24DeviceResolve extends PacketExecutor { + + private final ClientManager clientManager; + private final EventManager eventManager; + private final DeviceRepository deviceRepository = new DeviceRepository(); + + public Executor24DeviceResolve(ClientManager clientManager, EventManager eventManager) { + this.clientManager = clientManager; + this.eventManager = eventManager; + } + + @Override + public void onPacketReceived(Packet24DeviceResolve packet, Client client) throws Exception, ProtocolException { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) { + /** + * Если клиент не прошел аутентификацию, то он не может разрешать устройства + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + String deviceId = packet.getDeviceId(); + DeviceSolution solution = packet.getSolution(); + + /** + * Получаем всех клиентов с таким publicKey, и сравниваем внутри них deviceId, + * если находим совпадение - разрешаем это устройство + */ + List clients = this.clientManager.getPKClients(eciAuthentificate.getPublicKey()); + for(Client c : clients){ + ECIDevice deviceTag = c.getTag(ECIDevice.class); + if(deviceTag != null && deviceTag.getDeviceId().equals(deviceId)){ + /** + * Нашли клиента с таким deviceId, разрешаем или отклоняем его в зависимости от решения которое + * пришло в пакете + */ + if(solution == DeviceSolution.ACCEPT){ + /** + * Разрешено, запоминаем устройство, инициируем событие успешного хэндшейка, и отправляем успешный хэндшейк этому устройству, + * чтобы клиент понял, что устройство разрешено и мог продолжать работу + */ + Device device = new Device(); + device.setDeviceId(deviceId); + device.setPublicKey(eciAuthentificate.getPublicKey()); + device.setDeviceOs(deviceTag.getDeviceOs()); + device.setDeviceName(deviceTag.getDeviceName()); + /** + * TODO: Здесь можно реализовать отключение синхронизации, + * например если у пользователя отключена синхронизация, то при разрешении нового устройства + * можно устанавливать leaveTime как текущее время, тогда сообщения новому устройству не загрузятся. + * Если установить leaveTime в 0, то синхронизируются все сообщения которые есть на сервере + */ + device.setLeaveTime(0L); + this.deviceRepository.save(device); + + /** + * Устанавливаем пользователю успешный хэндшейк + */ + ECIAuthentificate authTag = c.getTag(ECIAuthentificate.class); + authTag.setHandshakeStage(HandshakeStage.COMPLETED); + c.reindexTag(ECIAuthentificate.class, authTag); + /** + * Отправляем этому устройству пакет с успешным хэндшейком, чтобы клиент понял, + * что устройство разрешено и мог продолжать работу + */ + Packet0Handshake handshake = new Packet0Handshake(); + handshake.setHandshakeStage(HandshakeStage.COMPLETED); + handshake.setDeviceId(""); + handshake.setDeviceName(""); + handshake.setDeviceOs(""); + handshake.setHeartbeatInterval(this.getSettings().heartbeatInterval); + handshake.setPrivateKey(""); + handshake.setPublicKey(""); + c.send(handshake); + /** + * Инициируем событие успешного хэндшейка, чтобы другие части сервера могли отреагировать на это, + * например отправить синхронизацию сообщений этому устройству + */ + this.eventManager.callEvent(new HandshakeCompletedEvent(deviceId, deviceId, deviceTag, eciAuthentificate, client)); + break; + } + if(solution == DeviceSolution.DECLINE){ + /** + * Отклонено, отправляем отклонение + */ + c.send(packet); + c.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + break; + } + } + } + } + +} diff --git a/src/main/java/com/rosetta/im/listeners/DeviceListListener.java b/src/main/java/com/rosetta/im/listeners/DeviceListListener.java new file mode 100644 index 0000000..231414e --- /dev/null +++ b/src/main/java/com/rosetta/im/listeners/DeviceListListener.java @@ -0,0 +1,59 @@ +package com.rosetta.im.listeners; + +import com.rosetta.im.client.ClientManager; +import com.rosetta.im.client.tags.ECIAuthentificate; +import com.rosetta.im.event.EventHandler; +import com.rosetta.im.event.Listener; +import com.rosetta.im.event.events.DisconnectEvent; +import com.rosetta.im.event.events.handshake.HandshakeCompletedEvent; +import com.rosetta.im.event.events.handshake.HandshakeDeviceConfirmEvent; +import com.rosetta.im.service.dispatch.DeviceDispatcher; + +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; + +public class DeviceListListener implements Listener { + + private final DeviceDispatcher deviceDispatcher; + + public DeviceListListener(ClientManager clientManager) { + this.deviceDispatcher = new DeviceDispatcher(clientManager); + } + + @EventHandler + public void onHandshakeComplete(HandshakeCompletedEvent event) throws ProtocolException { + Client client = event.getClient(); + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate != null){ + /** + * Когда клиент прошел аутентификацию, отправляем ему список устройств + */ + this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey()); + } + } + + @EventHandler + public void onDeviceConfirm(HandshakeDeviceConfirmEvent event) throws ProtocolException { + Client client = event.getClient(); + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate != null){ + /** + * Когда к аккаунту присоединяется новое устройство отправляем всем клиентам с этим публичным ключом обновленный список устройств + */ + this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey()); + } + } + + @EventHandler + public void onDisconnect(DisconnectEvent event) throws ProtocolException { + Client client = event.getClient(); + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate != null){ + /** + * Когда устройство отключается от аккаунта, отправляем всем клиентам с этим публичным ключом обновленный список устройств + */ + this.deviceDispatcher.sendDevices(eciAuthentificate.getPublicKey()); + } + } + +} diff --git a/src/main/java/com/rosetta/im/packet/Packet23DeviceList.java b/src/main/java/com/rosetta/im/packet/Packet23DeviceList.java new file mode 100644 index 0000000..1478c11 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/Packet23DeviceList.java @@ -0,0 +1,62 @@ +package com.rosetta.im.packet; + +import java.util.List; + +import com.rosetta.im.packet.runtime.DeviceSolution; +import com.rosetta.im.packet.runtime.NetworkDevice; +import com.rosetta.im.packet.runtime.NetworkStatus; + +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет, который содержит список устройств, с которых был произведен вход в систему. + * Этот пакет может быть отправлен сервером в ответ на запрос клиента о получении списка устройств, + * или может быть отправлен сервером при обнаружении нового входа в систему с нового устройства, чтобы уведомить клиента о новом устройстве. + */ +public class Packet23DeviceList extends Packet { + + private List devices; + + @Override + public void read(Stream stream) { + int deviceCount = stream.readInt16(); + this.devices = new java.util.ArrayList<>(); + for(int i = 0; i < deviceCount; i++) { + NetworkDevice netDevice = new NetworkDevice(); + netDevice.setDeviceId(stream.readString()); + netDevice.setDeviceName(stream.readString()); + netDevice.setDeviceOs(stream.readString()); + /** + * TODO: Использовать boolean для обозначения статуса сети, а не int8. + */ + netDevice.setNetworkStatus(NetworkStatus.fromCode(stream.readInt8())); + netDevice.setDeviceSolution(DeviceSolution.fromCode(stream.readInt8())); + this.devices.add(netDevice); + } + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeInt16(this.devices.size()); + for(NetworkDevice device : this.devices) { + stream.writeString(device.getDeviceId()); + stream.writeString(device.getDeviceName()); + stream.writeString(device.getDeviceOs()); + stream.writeInt8(device.getNetworkStatus().getCode()); + stream.writeInt8(device.getDeviceSolution().getCode()); + } + return stream; + } + + public List getDevices() { + return devices; + } + + public void setDevices(List devices) { + this.devices = devices; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/Packet24DeviceResolve.java b/src/main/java/com/rosetta/im/packet/Packet24DeviceResolve.java new file mode 100644 index 0000000..6dbcc84 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/Packet24DeviceResolve.java @@ -0,0 +1,48 @@ +package com.rosetta.im.packet; + +import com.rosetta.im.packet.runtime.DeviceSolution; + +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет для решения по запросу на добавление устройства + * Принимается от клиента, который получил запрос на добавление устройства, и отправляется серверу для обработки решения + */ +public class Packet24DeviceResolve extends Packet { + + private String deviceId; + private DeviceSolution solution; + + @Override + public void read(Stream stream) { + this.deviceId = stream.readString(); + this.solution = DeviceSolution.fromCode(stream.readInt8()); + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeString(this.deviceId); + stream.writeInt8(this.solution.getCode()); + return stream; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public DeviceSolution getSolution() { + return solution; + } + + public void setSolution(DeviceSolution solution) { + this.solution = solution; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/Packet9DeviceNew.java b/src/main/java/com/rosetta/im/packet/Packet9DeviceNew.java new file mode 100644 index 0000000..1d8f466 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/Packet9DeviceNew.java @@ -0,0 +1,72 @@ +package com.rosetta.im.packet; + +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет для уведомления о новом устройстве, авторизовавшемся с учетной записью пользователя + * Этот пакет может быть отправлен сервером всем авторизованным устройствам пользователя, + * чтобы уведомить их о том, что с учетной записью было авторизовано новое устройство, и предоставить информацию об этом устройстве (например, IP-адрес, тип устройства, операционная система и т.д.) + * Клиенты могут использовать эту информацию для отображения уведомления пользователю, + * а также для обеспечения безопасности учетной записи (например, если пользователь не узнает устройство, он может предпринять меры + * для защиты своей учетной записи, например, заблокировать вход для нового устройства) + */ +public class Packet9DeviceNew extends Packet { + + private String ipAddress; + private String deviceId; + private String deviceName; + private String deviceOs; + + @Override + public void read(Stream stream) { + this.ipAddress = stream.readString(); + this.deviceId = stream.readString(); + this.deviceName = stream.readString(); + this.deviceOs = stream.readString(); + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeString(this.ipAddress); + stream.writeString(this.deviceId); + stream.writeString(this.deviceName); + stream.writeString(this.deviceOs); + return stream; + } + + public String getIpAddress() { + return ipAddress; + } + + public void setIpAddress(String ipAddress) { + this.ipAddress = ipAddress; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public String getDeviceName() { + return deviceName; + } + + public void setDeviceName(String deviceName) { + this.deviceName = deviceName; + } + + public String getDeviceOs() { + return deviceOs; + } + + public void setDeviceOs(String deviceOs) { + this.deviceOs = deviceOs; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/runtime/DeviceSolution.java b/src/main/java/com/rosetta/im/packet/runtime/DeviceSolution.java new file mode 100644 index 0000000..d74e511 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/runtime/DeviceSolution.java @@ -0,0 +1,33 @@ +package com.rosetta.im.packet.runtime; + +/** + * Решение по запросу на добавление устройства + */ +public enum DeviceSolution { + /** + * Принять запрос на добавление устройства + */ + ACCEPT(0), + /** + * Отклонить запрос на добавление устройства + */ + DECLINE(1); + + private int code; + private DeviceSolution(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static DeviceSolution fromCode(int code) { + for (DeviceSolution solution : DeviceSolution.values()) { + if (solution.getCode() == code) { + return solution; + } + } + throw new IllegalArgumentException("Unknown DeviceSolution value: " + code); + } +} diff --git a/src/main/java/com/rosetta/im/packet/runtime/NetworkDevice.java b/src/main/java/com/rosetta/im/packet/runtime/NetworkDevice.java new file mode 100644 index 0000000..c8e8b5c --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/runtime/NetworkDevice.java @@ -0,0 +1,68 @@ +package com.rosetta.im.packet.runtime; + +/** + * Обозначает подключенное к аккаунту устройство, с которого + * был произведен вход в систему. + */ +public class NetworkDevice { + + private NetworkStatus networkStatus; + private String deviceName; + private String deviceOs; + private String deviceId; + private DeviceSolution deviceSolution; + + public NetworkDevice() { + } + + public NetworkDevice(NetworkStatus networkStatus, String deviceName, String deviceOs, String deviceId, + DeviceSolution deviceSolution) { + this.networkStatus = networkStatus; + this.deviceName = deviceName; + this.deviceOs = deviceOs; + this.deviceId = deviceId; + this.deviceSolution = deviceSolution; + } + + + public NetworkStatus getNetworkStatus() { + return networkStatus; + } + + public void setNetworkStatus(NetworkStatus networkStatus) { + this.networkStatus = networkStatus; + } + + public String getDeviceName() { + return deviceName; + } + + public void setDeviceName(String deviceName) { + this.deviceName = deviceName; + } + + public String getDeviceOs() { + return deviceOs; + } + + public void setDeviceOs(String deviceOs) { + this.deviceOs = deviceOs; + } + + public String getDeviceId() { + return deviceId; + } + + public void setDeviceId(String deviceId) { + this.deviceId = deviceId; + } + + public DeviceSolution getDeviceSolution() { + return deviceSolution; + } + + public void setDeviceSolution(DeviceSolution deviceSolution) { + this.deviceSolution = deviceSolution; + } + +} diff --git a/src/main/java/com/rosetta/im/service/dispatch/DeviceDispatcher.java b/src/main/java/com/rosetta/im/service/dispatch/DeviceDispatcher.java new file mode 100644 index 0000000..81b42a0 --- /dev/null +++ b/src/main/java/com/rosetta/im/service/dispatch/DeviceDispatcher.java @@ -0,0 +1,102 @@ +package com.rosetta.im.service.dispatch; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import com.rosetta.im.client.ClientManager; +import com.rosetta.im.client.tags.ECIDevice; +import com.rosetta.im.database.entity.Device; +import com.rosetta.im.database.repository.DeviceRepository; +import com.rosetta.im.packet.Packet23DeviceList; +import com.rosetta.im.packet.runtime.DeviceSolution; +import com.rosetta.im.packet.runtime.NetworkDevice; +import com.rosetta.im.packet.runtime.NetworkStatus; +import com.rosetta.im.service.services.DeviceService; + +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; + +/** + * Диспетчер устройств, который отвечает за списки устройств в аккаунте + */ +public class DeviceDispatcher { + + private ClientManager clientManager; + private DeviceRepository deviceRepository = new DeviceRepository(); + private DeviceService deviceService = new DeviceService(deviceRepository); + + public DeviceDispatcher(ClientManager clientManager) { + this.clientManager = clientManager; + } + + /** + * Отправит список подключенных устройств всем авторизованным устройствам с publicKey + * @param publicKey публичный ключ аккаунта, для которого нужно отправить список устройств + */ + public void sendDevices(String publicKey) throws ProtocolException { + /** + * Получаем список авторизованных устройств, а так же список устройств которые сейчас в сети + */ + List verifiedDevices = deviceService.getDevicesByPK(publicKey); + List onlineDevices = this.getOnlineDevices(publicKey); + + Map byId = new HashMap<>(); + + /** + * Верифицированные устройства, по умолчанию оффлайн, но верифицированные + */ + for (Device d : verifiedDevices) { + String id = d.getDeviceId(); + NetworkDevice nd = new NetworkDevice(); + nd.setDeviceId(id); + nd.setDeviceSolution(DeviceSolution.ACCEPT); + nd.setNetworkStatus(NetworkStatus.OFFLINE); + nd.setDeviceName(d.getDeviceName()); + nd.setDeviceOs(d.getDeviceOs()); + byId.put(id, nd); + } + + /** + * Подгоняем онлайн статус, если усотройство верифицированно, то оно найдется + * в Map, если устройства там нет соотвественно оно не верифицированно + */ + for (ECIDevice od : onlineDevices) { + String id = od.getDeviceId(); + NetworkDevice nd = byId.get(id); + if (nd == null) { + nd = new NetworkDevice(); + nd.setDeviceId(id); + nd.setDeviceSolution(DeviceSolution.DECLINE); + nd.setDeviceName(od.getDeviceName()); + nd.setDeviceOs(od.getDeviceOs()); + byId.put(id, nd); + } + nd.setNetworkStatus(NetworkStatus.ONLINE); + } + + List networkDevices = new ArrayList<>(byId.values()); + Packet23DeviceList packet = new Packet23DeviceList(); + packet.setDevices(networkDevices); + this.clientManager.sendPacketToAuthorizedPK(publicKey, packet); + } + + /** + * Получить список устройств которые сейчас в сети для публичного ключа (берутся и не авторизованные устройства, так как они тоже в сети) + * @param publicKey публичный ключ аккаунта, для которого нужно получить список устройств которые сейчас в сети + * @return список устройств которые сейчас в сети для публичного ключа + */ + private List getOnlineDevices(String publicKey) { + List onlineDevices = new java.util.ArrayList<>(); + List clients = clientManager.getPKClients(publicKey); + for(Client client : clients){ + ECIDevice deviceTag = client.getTag(ECIDevice.class); + if(deviceTag != null){ + onlineDevices.add(deviceTag); + } + } + return onlineDevices; + } + +} diff --git a/src/main/java/com/rosetta/im/service/dispatch/MessageDispatcher.java b/src/main/java/com/rosetta/im/service/dispatch/MessageDispatcher.java index 520771c..092702a 100644 --- a/src/main/java/com/rosetta/im/service/dispatch/MessageDispatcher.java +++ b/src/main/java/com/rosetta/im/service/dispatch/MessageDispatcher.java @@ -67,7 +67,7 @@ public class MessageDispatcher { */ return; } - this.clientManager.sendPacketToPK(groupMembersPublicKeys, packet); + this.clientManager.sendPacketToAuthorizedPK(groupMembersPublicKeys, packet); //TODO: Сохранить сообщение в буфер для группы, чтобы группы тоже синхронизировались } @@ -79,7 +79,7 @@ public class MessageDispatcher { public void sendPeer(PacketBaseDialog packet, Client client) throws ProtocolException { String fromPublicKey = packet.getFromPublicKey(); String toPublicKey = packet.getToPublicKey(); - this.clientManager.sendPacketToPK(toPublicKey, packet); + this.clientManager.sendPacketToAuthorizedPK(toPublicKey, packet); /** * Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя */ diff --git a/src/main/java/com/rosetta/im/service/services/BufferService.java b/src/main/java/com/rosetta/im/service/services/BufferService.java index 800bf73..3a3642d 100644 --- a/src/main/java/com/rosetta/im/service/services/BufferService.java +++ b/src/main/java/com/rosetta/im/service/services/BufferService.java @@ -64,13 +64,36 @@ public class BufferService extends Service { * @param packet пакет для добавления в буфер */ public void pushPacketToBuffer(String from, String to, Packet packet) { + int packetId = this.packetManager.getPacketIdByClass(packet.getClass()); + packet.packetId = packetId; byte[] packetBytes = packet.write().getBuffer(); Buffer buffer = new Buffer(); buffer.setFrom(from); buffer.setTo(to); buffer.setTimestamp(System.currentTimeMillis()); + buffer.setPacketId(packetId); buffer.setPacket(packetBytes); this.getRepository().save(buffer); } + /** + * Удаляет из буфера все пакеты для определенного клиента с публичным ключом to, которые были добавлены + * в буфер после fromTimestampMs и имееют такой же тип пакета как и переданный packet + * @param to публичный ключ получателя пакета + * @param packet пакет, по типу которого будет происходить удаление из буфера + * @param fromTimestampMs метка времени в миллисекундах, после которой были добавлены пакеты, которые нужно удалить + */ + public void deletePacketsFromBuffer(String to, Packet packet, long fromTimestampMs) { + int packetId = this.packetManager.getPacketIdByClass(packet.getClass()); + String hql = "DELETE FROM Buffer WHERE to = :to AND packetId = :packetId AND timestamp > :timestamp"; + HashMap parameters = new HashMap<>(); + parameters.put("to", to); + parameters.put("packetId", packetId); + parameters.put("timestamp", fromTimestampMs); + try(QuerySession querySession = this.getRepository().buildQuery(hql, parameters, true)){ + querySession.getQuery().executeUpdate(); + querySession.commit(); + } + } + } diff --git a/src/main/java/com/rosetta/im/service/services/DeviceService.java b/src/main/java/com/rosetta/im/service/services/DeviceService.java index 6a2d34d..8394c27 100644 --- a/src/main/java/com/rosetta/im/service/services/DeviceService.java +++ b/src/main/java/com/rosetta/im/service/services/DeviceService.java @@ -36,4 +36,13 @@ public class DeviceService extends Service { return false; } + /** + * Получить список устройств для публичного ключа + * @param publicKey публичный ключ пользователя, для которого нужно получить список устройств + * @return список устройств для публичного ключа + */ + public List getDevicesByPK(String publicKey) { + return this.getRepository().findAllByField("publicKey", publicKey); + } + } diff --git a/src/main/java/io/orprotocol/Server.java b/src/main/java/io/orprotocol/Server.java index 9001221..9adfe95 100644 --- a/src/main/java/io/orprotocol/Server.java +++ b/src/main/java/io/orprotocol/Server.java @@ -60,11 +60,14 @@ public class Server extends WebSocketServer { return; } Client client = socket.getAttachment(); - this.listener.onClientDisconnect(this, client); /** * Удаляем клиента из индексации (потому что он вышел) */ this.clientIndexer.removeClientFromIndex(client); + /** + * Вызываем событие отключения клиента + */ + this.listener.onClientDisconnect(this, client); } @Override @@ -232,6 +235,11 @@ public class Server extends WebSocketServer { * Останавливаем сервер при завершении работы и вызываем слушатели остановки сервера. */ this.listener.onServerStop(this); + try { + this.stop(); + } catch (InterruptedException e) { + e.printStackTrace(); + } })); } } diff --git a/src/main/java/io/orprotocol/client/Client.java b/src/main/java/io/orprotocol/client/Client.java index a1df794..541d254 100644 --- a/src/main/java/io/orprotocol/client/Client.java +++ b/src/main/java/io/orprotocol/client/Client.java @@ -114,6 +114,15 @@ public class Client { } } + /** + * Переиндексирует тег клиента в индексе клиентов. + * @param tagClass + * @param eciTag + */ + public void reindexTag(Class tagClass, T eciTag) { + this.clientIndexer.reindexTag(this, tagClass, eciTag); + } + /** * Получает данные клиента по указанному ключу. * @param key Ключ данных. @@ -168,6 +177,9 @@ public class Client { * @param packet Пакет для отправки. */ public void send(Packet packet) throws ProtocolException { + if(!this.socket.isOpen()){ + return; + } Integer packetId = this.packetManager.getPacketIdByClass(packet.getClass()); if(packetId == null) { throw new ProtocolException("Unknown packet class: " + packet.getClass().getName()); diff --git a/src/main/java/io/orprotocol/index/ClientIndexer.java b/src/main/java/io/orprotocol/index/ClientIndexer.java index 2a4f70d..2c29859 100644 --- a/src/main/java/io/orprotocol/index/ClientIndexer.java +++ b/src/main/java/io/orprotocol/index/ClientIndexer.java @@ -94,6 +94,23 @@ public class ClientIndexer { } } + /** + * Переиндексирует тег клиента в индексе клиентов. + * @param тип тега + * @param client клиент для которого нужно переиндексировать тег + * @param tagClass класс тега + * @param tag тег с новыми данными для переиндексации + */ + public void reindexTag(Client client, Class tagClass, T tag) { + /** + * Ведет на тот же метод что и индексация, так как индексация по умолчанию + * удаляет старые индексы и практически всегда делает реиндекс. Нужно для + * удобного и читаемого API, чтобы вызывающий код не выглядел так, как будто постоянно + * индексирует новые данные + */ + this.indexTag(client, tagClass, tag); + } + /** * Удаляет клиента из индекса тега