diff --git a/src/main/java/com/rosetta/im/Boot.java b/src/main/java/com/rosetta/im/Boot.java index 39d0c52..5ecc8ca 100644 --- a/src/main/java/com/rosetta/im/Boot.java +++ b/src/main/java/com/rosetta/im/Boot.java @@ -7,6 +7,7 @@ import com.rosetta.im.executors.Executor0Handshake; import com.rosetta.im.executors.Executor1UserInfo; import com.rosetta.im.executors.Executor3Search; import com.rosetta.im.executors.Executor4OnlineState; +import com.rosetta.im.executors.Executor6Message; import com.rosetta.im.listeners.HandshakeCompleteListener; import com.rosetta.im.listeners.OnlineStatusDisconnectListener; import com.rosetta.im.listeners.OnlineStatusHandshakeCompleteListener; @@ -20,6 +21,8 @@ import com.rosetta.im.packet.Packet2Result; import com.rosetta.im.packet.Packet3Search; import com.rosetta.im.packet.Packet4OnlineSubscribe; import com.rosetta.im.packet.Packet5OnlineState; +import com.rosetta.im.packet.Packet6Message; +import com.rosetta.im.packet.Packet8Delivery; import io.orprotocol.Server; import io.orprotocol.Settings; @@ -28,6 +31,9 @@ import io.orprotocol.packet.PacketManager; /** * Boot отвечает за инициализацию всех пакетов и их обработчиков, * а так же событий приложения. Этот Boot отвечает за приложение, а не за протокол. + * + * Нужен он для того, чтобы все части приложения получали одинаковые ссылки на глобальные обьекты приложения, такие как менеджер пакетов, + * менеджер событий, менеджер клиентов и так далее */ public class Boot { @@ -111,13 +117,16 @@ public class Boot { this.packetManager.registerPacket(3, Packet3Search.class); this.packetManager.registerPacket(4, Packet4OnlineSubscribe.class); this.packetManager.registerPacket(5, Packet5OnlineState.class); + this.packetManager.registerPacket(6, Packet6Message.class); + this.packetManager.registerPacket(8, Packet8Delivery.class); } private void registerAllExecutors() { this.packetManager.registerExecutor(0, new Executor0Handshake(this.eventManager)); this.packetManager.registerExecutor(1, new Executor1UserInfo()); this.packetManager.registerExecutor(3, new Executor3Search(this.clientManager)); - this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager)); + this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager, this.clientManager)); + this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager)); } private void printBootMessage() { diff --git a/src/main/java/com/rosetta/im/Failures.java b/src/main/java/com/rosetta/im/Failures.java index f867872..d57737d 100644 --- a/src/main/java/com/rosetta/im/Failures.java +++ b/src/main/java/com/rosetta/im/Failures.java @@ -12,10 +12,18 @@ public enum Failures implements BaseFailures { * Handshake не завершен */ HANDSHAKE_NOT_COMPLETED(3002), + /** + * Пользователь не состоит в группе, в которую пытается отправить сообщение + */ + USER_NOT_IN_GROUP(3005), /** * Неподдерживаемый протокол */ UNSUPPORTED_PROTOCOL(3008), + /** + * Слишком много вложений отправлено в сообщении + */ + TOO_MANY_ATTACHMENTS(3009), /** * Слишком много подписок на онлайн статусы */ diff --git a/src/main/java/com/rosetta/im/client/ClientManager.java b/src/main/java/com/rosetta/im/client/ClientManager.java index 20672fa..85c0dc0 100644 --- a/src/main/java/com/rosetta/im/client/ClientManager.java +++ b/src/main/java/com/rosetta/im/client/ClientManager.java @@ -1,12 +1,15 @@ package com.rosetta.im.client; import java.util.HashSet; +import java.util.List; import com.rosetta.im.client.tags.ECIAuthentificate; +import io.orprotocol.ProtocolException; import io.orprotocol.Server; import io.orprotocol.client.Client; import io.orprotocol.index.ClientIndexer; +import io.orprotocol.packet.Packet; /** * Менеджер клиентов @@ -38,6 +41,43 @@ public class ClientManager { */ return false; } + + /** + * Отправить пакет всем клиентам с публичным ключом publicKey + * @param publicKey публичный ключ получателя + * @param packet пакет для отправки + * @throws ProtocolException если произошла ошибка при отправке пакета клиенту + */ + public void sendPacketToPK(String publicKey, Packet packet) throws ProtocolException { + HashSet clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey); + for(Client client : clients){ + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){ + /** + * Если клиент не авторизован, пропускаем его, он не должен получать пакеты, + * если нужно отправить пакет неавторизованному клиенту, нужно отправить его напрямую посредством client.send(packet), + * а не через этот метод + */ + continue; + } + /** + * Отправляем пакет каждому клиенту с таким публичным ключом + */ + client.send(packet); + } + } + + /** + * Отправить пакет всем клиентам с публичными ключами из списка publicKeys + * @param publicKeys список публичных ключей получателей + * @param packet пакет для отправки + * @throws ProtocolException если произошла ошибка при отправке пакета клиенту + */ + public void sendPacketToPK(List publicKeys, Packet packet) throws ProtocolException { + for(String publicKey : publicKeys){ + this.sendPacketToPK(publicKey, packet); + } + } diff --git a/src/main/java/com/rosetta/im/database/entity/Buffer.java b/src/main/java/com/rosetta/im/database/entity/Buffer.java new file mode 100644 index 0000000..dd99599 --- /dev/null +++ b/src/main/java/com/rosetta/im/database/entity/Buffer.java @@ -0,0 +1,74 @@ +package com.rosetta.im.database.entity; + +import com.rosetta.im.database.CreateUpdateEntity; + +import jakarta.persistence.Column; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +/** + * Сущность для буфера сообщений, которые не были доставлены получателю, например, + * из-за того, что он был оффлайн, а так же для синхронизации сообщений + * между устройствами одного пользователя. + * Сообщения в буфере хранятся в виде сериализованных пакетов. + * Когда получатель становится онлайн, сервер пытается доставить ему все сообщения из буфера. + */ +@Entity +@Table(name = "packet_buffer") +public class Buffer extends CreateUpdateEntity { + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "source") + private String from; + + @Column(name = "destination") + private String to; + + @Column(name = "packet", columnDefinition = "bytea") + private byte[] packet; + + @Column(name = "timestamp") + private Long timestamp; + + + public Long getId() { + return id; + } + + public String getFrom() { + return from; + } + + public String getTo() { + return to; + } + + public byte[] getPacket() { + return packet; + } + + public Long getTimestamp() { + return timestamp; + } + + public void setFrom(String from) { + this.from = from; + } + + public void setTo(String to) { + this.to = to; + } + + public void setPacket(byte[] packet) { + this.packet = packet; + } + + public void setTimestamp(Long timestamp) { + this.timestamp = timestamp; + } +} diff --git a/src/main/java/com/rosetta/im/database/entity/Group.java b/src/main/java/com/rosetta/im/database/entity/Group.java new file mode 100644 index 0000000..77f9e51 --- /dev/null +++ b/src/main/java/com/rosetta/im/database/entity/Group.java @@ -0,0 +1,67 @@ +package com.rosetta.im.database.entity; + +import java.util.ArrayList; +import java.util.List; + +import com.rosetta.im.database.CreateUpdateEntity; +import com.rosetta.im.database.converters.StringListConverter; + +import jakarta.persistence.Column; +import jakarta.persistence.Convert; +import jakarta.persistence.Entity; +import jakarta.persistence.GeneratedValue; +import jakarta.persistence.GenerationType; +import jakarta.persistence.Id; +import jakarta.persistence.Table; + +/** + * Сущность для групповых чатов. + */ +@Entity +@Table(name = "groups") +public class Group extends CreateUpdateEntity { + + @Id + @GeneratedValue(strategy = GenerationType.IDENTITY) + private Long id; + + @Column(name = "groupId") + private String groupId; + + @Convert(converter = StringListConverter.class) + @Column(name = "membersPublicKeys", nullable = false) + private List membersPublicKeys = new ArrayList<>(); + + @Convert(converter = StringListConverter.class) + @Column(name = "bannedPublicKeys", nullable = false) + private List bannedPublicKeys = new ArrayList<>(); + + public Long getId() { + return id; + } + + public String getGroupId() { + return groupId; + } + + public void setGroupId(String groupId) { + this.groupId = groupId; + } + + public List getMembersPublicKeys() { + return membersPublicKeys; + } + + public void setMembersPublicKeys(List membersPublicKeys) { + this.membersPublicKeys = membersPublicKeys; + } + + public List getBannedPublicKeys() { + return bannedPublicKeys; + } + + public void setBannedPublicKeys(List bannedPublicKeys) { + this.bannedPublicKeys = bannedPublicKeys; + } + +} diff --git a/src/main/java/com/rosetta/im/database/repository/BufferRepository.java b/src/main/java/com/rosetta/im/database/repository/BufferRepository.java new file mode 100644 index 0000000..1e68351 --- /dev/null +++ b/src/main/java/com/rosetta/im/database/repository/BufferRepository.java @@ -0,0 +1,12 @@ +package com.rosetta.im.database.repository; + +import com.rosetta.im.database.Repository; +import com.rosetta.im.database.entity.Buffer; + +public class BufferRepository extends Repository { + + public BufferRepository() { + super(Buffer.class); + } + +} diff --git a/src/main/java/com/rosetta/im/database/repository/DeviceRepository.java b/src/main/java/com/rosetta/im/database/repository/DeviceRepository.java index 6989f66..35a71f7 100644 --- a/src/main/java/com/rosetta/im/database/repository/DeviceRepository.java +++ b/src/main/java/com/rosetta/im/database/repository/DeviceRepository.java @@ -21,5 +21,27 @@ public class DeviceRepository extends Repository { return this.findAllByField("publicKey", user.getPublicKey()); } + /** + * Считает количество устройств пользователя + * @param user пользователь + * @return количество устройств + */ + public long countUserDevices(User user) { + return this.countByField("publicKey", user.getPublicKey()); + } + + /** + * Обновляет время последней активности устройства + * @param deviceId ID устройства + */ + public void updateDeviceLeaveTime(String deviceId) { + Device device = this.findByField("deviceId", deviceId); + if(device == null) { + return; + } + device.setLeaveTime(System.currentTimeMillis()); + this.update(device); + } + } diff --git a/src/main/java/com/rosetta/im/database/repository/GroupRepository.java b/src/main/java/com/rosetta/im/database/repository/GroupRepository.java new file mode 100644 index 0000000..4b73131 --- /dev/null +++ b/src/main/java/com/rosetta/im/database/repository/GroupRepository.java @@ -0,0 +1,28 @@ +package com.rosetta.im.database.repository; + +import java.util.ArrayList; +import java.util.List; + +import com.rosetta.im.database.Repository; +import com.rosetta.im.database.entity.Group; + +public class GroupRepository extends Repository { + + public GroupRepository() { + super(Group.class); + } + + /** + * Найти участников группы по groupId + * @param groupId ID группы + * @return список публичных ключей участников группы + */ + public List findGroupMembers(String groupId) { + Group group = this.findByField("groupId", groupId); + if(group == null) { + return new ArrayList<>(); + } + return group.getMembersPublicKeys(); + } + +} diff --git a/src/main/java/com/rosetta/im/exception/UnauthorizedExeception.java b/src/main/java/com/rosetta/im/exception/UnauthorizedExeception.java new file mode 100644 index 0000000..9766fcb --- /dev/null +++ b/src/main/java/com/rosetta/im/exception/UnauthorizedExeception.java @@ -0,0 +1,9 @@ +package com.rosetta.im.exception; + +public class UnauthorizedExeception extends Exception { + + public UnauthorizedExeception(String message) { + super(message); + } + +} diff --git a/src/main/java/com/rosetta/im/executors/Executor0Handshake.java b/src/main/java/com/rosetta/im/executors/Executor0Handshake.java index 3dfbff7..acae560 100644 --- a/src/main/java/com/rosetta/im/executors/Executor0Handshake.java +++ b/src/main/java/com/rosetta/im/executors/Executor0Handshake.java @@ -126,7 +126,7 @@ public class Executor0Handshake extends PacketExecutor { client.disconnect(Failures.AUTHENTIFICATION_ERROR); return; } - long userDevicesCount = deviceService.countUserDevices(user); + long userDevicesCount = deviceRepository.countUserDevices(user); /** * Проверяем верифицировано ли устройство diff --git a/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java b/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java index 0d126ef..f508ed6 100644 --- a/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java +++ b/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java @@ -1,11 +1,16 @@ package com.rosetta.im.executors; +import java.util.ArrayList; import java.util.List; import com.rosetta.im.Failures; +import com.rosetta.im.client.ClientManager; import com.rosetta.im.client.OnlineManager; import com.rosetta.im.client.tags.ECIAuthentificate; import com.rosetta.im.packet.Packet4OnlineSubscribe; +import com.rosetta.im.packet.Packet5OnlineState; +import com.rosetta.im.packet.runtime.NetworkStatus; +import com.rosetta.im.packet.runtime.PKNetworkStatus; import io.orprotocol.ProtocolException; import io.orprotocol.client.Client; @@ -14,9 +19,11 @@ import io.orprotocol.packet.PacketExecutor; public class Executor4OnlineState extends PacketExecutor { private final OnlineManager onlineManager; + private final ClientManager clientManager; - public Executor4OnlineState(OnlineManager onlineManager) { + public Executor4OnlineState(OnlineManager onlineManager, ClientManager clientManager) { this.onlineManager = onlineManager; + this.clientManager = clientManager; } @Override @@ -49,6 +56,19 @@ public class Executor4OnlineState extends PacketExecutor for (String targetPublicKey : publicKeys) { this.onlineManager.subscribe(client, targetPublicKey); } + /** + * Сразу же формируем и отправляем клиенту онлайн статус для указанных публичных ключей, чтобы клиент не ждал обновления статуса, + * а получил его сразу после подписки + */ + Packet5OnlineState onlineStates = new Packet5OnlineState(); + List onlineStatuses = new ArrayList<>(); + for (String targetPublicKey : publicKeys) { + boolean isOnline = this.clientManager.isClientConnected(targetPublicKey); + PKNetworkStatus networkStatus = new PKNetworkStatus(targetPublicKey, NetworkStatus.fromBoolean(isOnline)); + onlineStatuses.add(networkStatus); + } + onlineStates.setPkNetworkStatuses(onlineStatuses); + client.send(onlineStates); } } diff --git a/src/main/java/com/rosetta/im/executors/Executor6Message.java b/src/main/java/com/rosetta/im/executors/Executor6Message.java new file mode 100644 index 0000000..80d44d4 --- /dev/null +++ b/src/main/java/com/rosetta/im/executors/Executor6Message.java @@ -0,0 +1,182 @@ +package com.rosetta.im.executors; + +import java.util.List; + +import com.rosetta.im.Failures; +import com.rosetta.im.client.ClientManager; +import com.rosetta.im.client.tags.ECIAuthentificate; +import com.rosetta.im.database.repository.BufferRepository; +import com.rosetta.im.database.repository.GroupRepository; +import com.rosetta.im.packet.Packet6Message; +import com.rosetta.im.packet.Packet8Delivery; +import com.rosetta.im.packet.runtime.Attachment; +import com.rosetta.im.service.services.BufferService; + +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.PacketExecutor; +import io.orprotocol.packet.PacketManager; + +/** + * Обработчик пакета сообщений + */ +public class Executor6Message extends PacketExecutor { + + private final GroupRepository groupRepository = new GroupRepository(); + private final ClientManager clientManager; + private final BufferRepository bufferRepository = new BufferRepository(); + private final BufferService bufferService; + + public Executor6Message(ClientManager clientManager, PacketManager packetManager) { + this.clientManager = clientManager; + this.bufferService = new BufferService(bufferRepository, packetManager); + } + + @Override + public void onPacketReceived(Packet6Message packet, Client client) throws Exception, ProtocolException { + String fromPublicKey = packet.getFromPublicKey(); + String toPublicKey = packet.getToPublicKey(); + String messageId = packet.getMessageId(); + List attachments = packet.getAttachments(); + int attachmentsCount = attachments.size(); + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){ + /** + * Если пользователь не авторизован + */ + client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED); + return; + } + + if(!eciAuthentificate.getPublicKey().equals(fromPublicKey)){ + /** + * Если клиент пытается отправить сообщение от отправителя, + * которым он не является + */ + client.disconnect(Failures.DATA_MISSMATCH); + return; + } + + if(fromPublicKey.equals(toPublicKey)){ + /** + * Самому себе отправить сообщение нельзя, но это более-менее + * нормальное поведение, хотя на клиентах должно быть отработано, + * не кикаем пользователя + */ + return; + } + + long currentTimestampSec = (System.currentTimeMillis() / 1000); + long messageTimestampSec = (packet.getTimestamp() / 1000); + /** + * Максимальный возраст сообщения в секундах, который сервер примет, чтобы + * клиент не мог подделать дату отправки и отправлять + * сообщения из "прошлого" + */ + long maxPaddingSec = 30; + if(attachmentsCount > 0){ + /** + * Так как у нас есть вложения, то клиенту нужно какое-то время на их загрузку, + * разрешаем клиенту превысить maxPaddingSec и даем ему 30 секунд + * на отправку одного вложения (этого более чем достаточно, так как клиент + * вообще не должен отправлять сообщение пока все вложения не будут загружены + * на сервер) + */ + maxPaddingSec = maxPaddingSec * attachmentsCount; + } + if(currentTimestampSec - messageTimestampSec > maxPaddingSec){ + /** + * Если сообщение было отправлено из "прошлого", то есть на момент + * прихода на сервер сообщению уже больше секунд чем допускает + * maxPaddingSec, то отклоняем его + */ + return; + } + if(attachmentsCount > 10){ + /** + * Слишком много отправляемых вложений, так нельзя + */ + client.disconnect(Failures.TOO_MANY_ATTACHMENTS); + return; + } + /** + * Обновляем системную метку времени в соотвествии с сервером, так как у клиентов могут быть например неправильно настроены часы + * или разные часовые пояса + */ + packet.setTimestamp(System.currentTimeMillis()); + packet.setPrivateKey(""); + + if(toPublicKey.startsWith("#group:")){ + /** + * Это групповое сообщение, отправляем его всем участникам группы, кроме отправителя + */ + this.sendMessageToGroup(packet, client, eciAuthentificate); + }else{ + /** + * Это личное сообщение, отправляем его получателю + */ + this.sendIMMessage(packet, client); + } + + /** + * Сообщение успешно отправлено, отправялем клиенту пакет успешной доставки + */ + Packet8Delivery deliveryPacket = new Packet8Delivery(); + deliveryPacket.setMessageId(messageId); + deliveryPacket.setToPublicKey(toPublicKey); + client.send(deliveryPacket); + } + + /** + * Отправляет групповое сообщение всем участникам группы, кроме отправителя + * @param packet пакет с групповым сообщением + */ + private void sendMessageToGroup(Packet6Message packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException { + String toPublicKey = packet.getToPublicKey(); + List groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", "")); + if(groupMembersPublicKeys.isEmpty()){ + /** + * Если группа не найдена или в группе нет участников, то в такую отправить + * сообщение нельзя + */ + client.disconnect(Failures.DATA_MISSMATCH); + return; + } + if(!groupMembersPublicKeys.contains(eciAuthentificate.getPublicKey())){ + /** + * Если отправитель не является участником группы, то он не может отправлять + * сообщения в эту группу + */ + client.disconnect(Failures.USER_NOT_IN_GROUP); + return; + } + /** + * Отправляем всем участникам группы, кроме отправителя этот пакет, попутно не забывая проверить, а не один ли он в группе + */ + groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey()); + if(groupMembersPublicKeys.isEmpty()){ + /** + * Если отправитель был единственным участником группы, то отправлять сообщение некуда, + * не кикаем пользователя + */ + return; + } + this.clientManager.sendPacketToPK(groupMembersPublicKeys, packet); + } + + /** + * Отправляет личное сообщение получателю + * @param packet пакет с личным сообщением + */ + private void sendIMMessage(Packet6Message packet, Client client) throws ProtocolException { + String fromPublicKey = packet.getFromPublicKey(); + String toPublicKey = packet.getToPublicKey(); + this.clientManager.sendPacketToPK(toPublicKey, packet); + /** + * Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя + */ + this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet); + } + +} diff --git a/src/main/java/com/rosetta/im/listeners/ServerStopListener.java b/src/main/java/com/rosetta/im/listeners/ServerStopListener.java index e0d459b..74411a6 100644 --- a/src/main/java/com/rosetta/im/listeners/ServerStopListener.java +++ b/src/main/java/com/rosetta/im/listeners/ServerStopListener.java @@ -20,7 +20,6 @@ import io.orprotocol.client.Client; public class ServerStopListener implements Listener { private final DeviceRepository deviceRepository = new DeviceRepository(); - private final DeviceService deviceService = new DeviceService(deviceRepository); private Logger logger; public ServerStopListener(Logger logger) { @@ -49,7 +48,7 @@ public class ServerStopListener implements Listener { */ continue; } - deviceService.updateDeviceLeaveTime(eciDevice.getDeviceId()); + deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId()); } this.logger.info(Color.RED + "Время последней активности устройств клиентов обновлено."); } diff --git a/src/main/java/com/rosetta/im/packet/Packet6Message.java b/src/main/java/com/rosetta/im/packet/Packet6Message.java new file mode 100644 index 0000000..37c7dac --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/Packet6Message.java @@ -0,0 +1,249 @@ +package com.rosetta.im.packet; + +import java.util.List; + +import com.rosetta.im.packet.runtime.Attachment; +import com.rosetta.im.packet.runtime.AttachmentType; + +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет для отправки сообщений между пользователями. Содержит зашифрованное текстовое содержимое и массив вложений с + * их метаданными. Временная метка используется для сортировки сообщений и отображения времени отправки. Идентификатор + * сообщения нужен для редактирования и удаления сообщений. Ключ chacha используется для шифрования текста сообщения, + * а aesChachaKey нужен для последующей синхронизации своих же сообщений на других устройствах. + * Сам ключ ChaCha20 во избежания обемена ключами зашифрован ECC. + */ +public class Packet6Message extends Packet { + /** + * Публичный ключ отправителя + */ + private String fromPublicKey; + /** + * Публичный ключ получателя, может начинаться с #group: для групповых сообщений + */ + private String toPublicKey; + /** + * Текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC + */ + private String content; + /** + * Ключ chacha для шифрования сообщения, зашифрован ECC + */ + private String chachaKey; + /** + * Временная метка сообщения в миллисекундах + */ + private long timestamp; + /** + * Приватный ключ отправителя + */ + private String privateKey; + /** + * Идентификатор сообщения, нужен для редактирования и удаления сообщений + */ + private String messageId; + /** + * Массив вложений в сообщении + */ + private List attachments; + /** + * Закодированный с помощью AES ключ chacha, нужен + * для последующей синхронизации своих же сообщений + */ + private String aesChachaKey; + + @Override + public void read(Stream stream) { + this.fromPublicKey = stream.readString(); + this.toPublicKey = stream.readString(); + this.content = stream.readString(); + this.chachaKey = stream.readString(); + this.timestamp = stream.readInt64(); + this.privateKey = stream.readString(); + this.messageId = stream.readString(); + int attachmentsCount = stream.readInt8(); + this.attachments = new java.util.ArrayList<>(); + for (int i = 0; i < attachmentsCount; i++) { + String id = stream.readString(); + String preview = stream.readString(); + String blob = stream.readString(); + AttachmentType type = AttachmentType.fromCode(stream.readInt8()); + this.attachments.add(new Attachment(id, blob, type, preview)); + } + this.aesChachaKey = stream.readString(); + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeString(this.fromPublicKey); + stream.writeString(this.toPublicKey); + stream.writeString(this.content); + stream.writeString(this.chachaKey); + stream.writeInt64(this.timestamp); + stream.writeString(this.privateKey); + stream.writeString(this.messageId); + stream.writeInt8(this.attachments.size()); + for (Attachment attachment : this.attachments) { + stream.writeString(attachment.getId()); + stream.writeString(attachment.getPreview()); + stream.writeString(attachment.getBlob()); + stream.writeInt8((byte) attachment.getType().getCode()); + } + stream.writeString(this.aesChachaKey); + return stream; + } + + /** + * Получить публичный ключ отправителя + * @return публичный ключ отправителя + */ + public String getFromPublicKey() { + return fromPublicKey; + } + + /** + * Получить публичный ключ получателя + * @return публичный ключ получателя + */ + public String getToPublicKey() { + return toPublicKey; + } + + /** + * Получить текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC + * @return текстовое содержимое сообщения + */ + public String getContent() { + return content; + } + /** + * Получить ключ chacha для шифрования сообщения, зашифрован ECC + * @return ключ chacha + */ + public String getChachaKey() { + return chachaKey; + } + + /** + * Получить временную метку сообщения в миллисекундах + * @return временная метка сообщения в мсиллисекундах + */ + public long getTimestamp() { + return timestamp; + } + + /** + * Получает приватный ключ пользователя + * @return приватный ключ + * @deprecated с версии сервера 1.1 использование приватных ключей + * в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей. + */ + @Deprecated(since = "1.1", forRemoval = true) + public String getPrivateKey() { + return this.privateKey; + } + + /** + * Получить идентификатор сообщения + * @return идентификатор сообщения + */ + public String getMessageId() { + return messageId; + } + + /** + * Получить массив вложений в сообщении + * @return массив вложений в сообщении + */ + public List getAttachments() { + return attachments; + } + + /** + * Получить закодированный с помощью AES ключ chacha + * @return ключ chacha + */ + public String getAesChachaKey() { + return aesChachaKey; + } + + /** + * Устанавливает публичный ключ отправителя + * @param fromPublicKey публичный ключ отправителя + */ + public void setFromPublicKey(String fromPublicKey) { + this.fromPublicKey = fromPublicKey; + } + + /** + * Устанавливает публичный ключ получателя + * @param toPublicKey публичный ключ получателя + */ + public void setToPublicKey(String toPublicKey) { + this.toPublicKey = toPublicKey; + } + + /** + * Устанавливает текстовое содержимое сообщения + * @param content текстовое содержимое сообщения + */ + public void setContent(String content) { + this.content = content; + } + + /** + * Устанавливает ключ chacha для шифрования сообщения + * @param chachaKey + */ + public void setChachaKey(String chachaKey) { + this.chachaKey = chachaKey; + } + + /** + * Устанавливает временную метку сообщения в миллисекундах + * @param timestamp временная метка сообщения в миллисекундах + */ + public void setTimestamp(long timestamp) { + this.timestamp = timestamp; + } + + /** + * Устанавливает приватный ключ пользователя + * @param privateKey приватный ключ + * @deprecated с версии сервера 1.1 использование приватных ключей + * в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей. + */ + @Deprecated(since = "1.1", forRemoval = true) + public void setPrivateKey(String privateKey) { + this.privateKey = privateKey; + } + + /** + * Устанавливает идентификатор сообщения + * @param messageId идентификатор сообщения + */ + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + /** + * Устанавливает массив вложений в сообщении + * @param attachments массив вложений в сообщении + */ + public void setAttachments(List attachments) { + this.attachments = attachments; + } + + /** + * Устанавливает закодированный с помощью AES ключ chacha + * @param aesChachaKey ключ chacha + */ + public void setAesChachaKey(String aesChachaKey) { + this.aesChachaKey = aesChachaKey; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/Packet8Delivery.java b/src/main/java/com/rosetta/im/packet/Packet8Delivery.java new file mode 100644 index 0000000..48965d2 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/Packet8Delivery.java @@ -0,0 +1,61 @@ +package com.rosetta.im.packet; + +import io.orprotocol.Stream; +import io.orprotocol.packet.Packet; + +/** + * Пакет обозначающий доставку сообщения получателю. Отправляется после успешной доставки сообщения получателю + */ +public class Packet8Delivery extends Packet { + + private String messageId; + private String toPublicKey; + + @Override + public void read(Stream stream) { + this.toPublicKey = stream.readString(); + this.messageId = stream.readString(); + } + + @Override + public Stream write() { + Stream stream = new Stream(); + stream.writeInt16(this.packetId); + stream.writeString(this.toPublicKey); + stream.writeString(this.messageId); + return stream; + } + + /** + * Получить идентификатор доставленного сообщения + * @return идентификатор доставленного сообщения + */ + public String getMessageId() { + return messageId; + } + + /** + * Получить публичный ключ получателя доставленного сообщения + * @return публичный ключ получателя доставленного сообщения + */ + public String getToPublicKey() { + return toPublicKey; + } + + /** + * Установить идентификатор доставленного сообщения + * @param messageId идентификатор доставленного сообщения + */ + public void setMessageId(String messageId) { + this.messageId = messageId; + } + + /** + * Установить публичный ключ получателя доставленного сообщения + * @param toPublicKey публичный ключ получателя доставленного сообщения + */ + public void setToPublicKey(String toPublicKey) { + this.toPublicKey = toPublicKey; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/runtime/Attachment.java b/src/main/java/com/rosetta/im/packet/runtime/Attachment.java new file mode 100644 index 0000000..67ecfe4 --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/runtime/Attachment.java @@ -0,0 +1,52 @@ +package com.rosetta.im.packet.runtime; + +/** + * Вложение в сообщении + */ +public class Attachment { + + private String id; + private String blob; + private AttachmentType type; + private String preview; + + public Attachment(String id, String blob, AttachmentType type, String preview) { + this.id = id; + this.blob = blob; + this.type = type; + this.preview = preview; + } + + /** + * Получить идентификатор вложения + * @return + */ + public String getId() { + return id; + } + + /** + * Получить данные вложения в виде строки + * @return + */ + public String getBlob() { + return blob; + } + + /** + * Получить тип вложения + * @return + */ + public AttachmentType getType() { + return type; + } + + /** + * Получить превью вложения (например, для изображений) + * @return + */ + public String getPreview() { + return preview; + } + +} diff --git a/src/main/java/com/rosetta/im/packet/runtime/AttachmentType.java b/src/main/java/com/rosetta/im/packet/runtime/AttachmentType.java new file mode 100644 index 0000000..dfbe12c --- /dev/null +++ b/src/main/java/com/rosetta/im/packet/runtime/AttachmentType.java @@ -0,0 +1,31 @@ +package com.rosetta.im.packet.runtime; + +/** + * Тип вложения в сообщении + */ +public enum AttachmentType { + IMAGE(0), + MESSAGES(1), + FILE(2), + AVATAR(3); + + + private final int code; + + AttachmentType(int code) { + this.code = code; + } + + public int getCode() { + return code; + } + + public static AttachmentType fromCode(int code) { + for (AttachmentType type : AttachmentType.values()) { + if (type.getCode() == code) { + return type; + } + } + throw new IllegalArgumentException("Invalid AttachmentType code: " + code); + } +} diff --git a/src/main/java/com/rosetta/im/service/services/BufferService.java b/src/main/java/com/rosetta/im/service/services/BufferService.java new file mode 100644 index 0000000..800bf73 --- /dev/null +++ b/src/main/java/com/rosetta/im/service/services/BufferService.java @@ -0,0 +1,76 @@ +package com.rosetta.im.service.services; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; + +import com.rosetta.im.client.tags.ECIAuthentificate; +import com.rosetta.im.database.QuerySession; +import com.rosetta.im.database.entity.Buffer; +import com.rosetta.im.database.repository.BufferRepository; +import com.rosetta.im.exception.UnauthorizedExeception; +import com.rosetta.im.service.Service; + +import io.orprotocol.ProtocolException; +import io.orprotocol.client.Client; +import io.orprotocol.packet.Packet; +import io.orprotocol.packet.PacketManager; + +public class BufferService extends Service { + + private PacketManager packetManager; + + public BufferService(BufferRepository repository, PacketManager packetManager) { + super(repository); + this.packetManager = packetManager; + } + + /** + * Получить пакеты из буфера для клиента, которые были добавлены в буфер после fromTimestampMs. Если клиент не авторизован, возвращает пустой список. + * @param client + * @param fromTimestampMs + * @return + * @throws ProtocolException + */ + public List getPacketsFromTime(Client client, long fromTimestampMs) throws ProtocolException, UnauthorizedExeception { + ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class); + if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){ + /** + * Если клиент не авторизован, то он не может получать пакеты из буфера, возвращаем пустой список + */ + throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer"); + } + String toPublicKey = eciAuthentificate.getPublicKey(); + String hql = "FROM Buffer WHERE to = :to AND timestamp > :timestamp ORDER BY timestamp ASC"; + HashMap parameters = new HashMap<>(); + parameters.put("to", toPublicKey); + parameters.put("timestamp", fromTimestampMs); + List packets = new ArrayList<>(); + try(QuerySession querySession = this.getRepository().buildQuery(hql, parameters)){ + List buffers = querySession.getQuery().list(); + for(Buffer buffer : buffers) { + byte[] packetBytes = buffer.getPacket(); + Packet packet = this.packetManager.createPacket(packetBytes); + packets.add(packet); + } + } + return packets; + } + + /** + * Добавить пакет в буфер для клиента с публичным ключом to. Если клиент не авторизован, выбрасывает UnauthorizedExeception + * @param from публичный ключ отправителя пакета + * @param to публичный ключ получателя пакета + * @param packet пакет для добавления в буфер + */ + public void pushPacketToBuffer(String from, String to, Packet packet) { + byte[] packetBytes = packet.write().getBuffer(); + Buffer buffer = new Buffer(); + buffer.setFrom(from); + buffer.setTo(to); + buffer.setTimestamp(System.currentTimeMillis()); + buffer.setPacket(packetBytes); + this.getRepository().save(buffer); + } + +} diff --git a/src/main/java/com/rosetta/im/service/services/DeviceService.java b/src/main/java/com/rosetta/im/service/services/DeviceService.java index 8645651..6a2d34d 100644 --- a/src/main/java/com/rosetta/im/service/services/DeviceService.java +++ b/src/main/java/com/rosetta/im/service/services/DeviceService.java @@ -36,26 +36,4 @@ public class DeviceService extends Service { return false; } - /** - * Считает количество устройств пользователя - * @param user пользователь - * @return количество устройств - */ - public long countUserDevices(User user) { - return this.getRepository().countByField("publicKey", user.getPublicKey()); - } - - /** - * Обновляет время последней активности устройства - * @param deviceId ID устройства - */ - public void updateDeviceLeaveTime(String deviceId) { - Device device = this.getRepository().findByField("deviceId", deviceId); - if(device == null) { - return; - } - device.setLeaveTime(System.currentTimeMillis()); - this.getRepository().update(device); - } - } diff --git a/src/main/java/io/orprotocol/Server.java b/src/main/java/io/orprotocol/Server.java index de4a261..9001221 100644 --- a/src/main/java/io/orprotocol/Server.java +++ b/src/main/java/io/orprotocol/Server.java @@ -17,6 +17,7 @@ import io.orprotocol.index.ClientIndexer; import io.orprotocol.lock.ThreadLocker; import io.orprotocol.packet.Packet; import io.orprotocol.packet.PacketExecutor; +import io.orprotocol.packet.PacketFactory; import io.orprotocol.packet.PacketManager; public class Server extends WebSocketServer { @@ -95,37 +96,13 @@ public class Server extends WebSocketServer { public void onMessage(WebSocket socket, ByteBuffer byteBuffer) { Client client = socket.getAttachment(); byte[] bytes = byteBuffer.array(); - Stream stream = new Stream(bytes); - int packetId = stream.readInt16(); - /** - * Обновляем время последнего полученного heartbeat. - * Так как клиент отпраивл нам пакет, он живой. - */ - client.updateHeartbeat(); - - if(!this.packetManager.hasPacketSupported(packetId)){ - /** - * Если пакет не поддерживается, отключаем клиента с соответствующим кодом ошибки. - */ - client.disconnect(ServerFailures.UNSUPPORTED_PACKET); - return; - } - if(!this.packetManager.hasExecutorDelegated(packetId)){ - /** - * Если для пакета не назначен обработчик, отключаем клиента с соответствующим кодом ошибки. - */ - client.disconnect(ServerFailures.UNSUPPORTED_PACKET); - return; - } - - Class packetClass = this.packetManager.getPacketClass(packetId); try { - Packet packet = packetClass.getConstructor().newInstance(); - packet.packetId = packetId; /** - * Читаем данные пакета из потока. + * Создаем пакет из полученных байтов. */ - packet.read(stream); + PacketFactory packetFactory = new PacketFactory(bytes, this.packetManager); + Packet packet = packetFactory.createPacket(); + int packetId = packetFactory.getPacketId(); /** * Получаем обработчик пакета и вызываем его метод обработки. * @@ -161,8 +138,8 @@ public class Server extends WebSocketServer { threadLocker.releaseLock(packet, executor.getClass()); } } catch (Exception e) { - System.out.println("Error while processing packet " + packetClass.getName()); e.printStackTrace(); + //client.disconnect(ServerFailures.UNSUPPORTED_PACKET); } } diff --git a/src/main/java/io/orprotocol/packet/PacketFactory.java b/src/main/java/io/orprotocol/packet/PacketFactory.java new file mode 100644 index 0000000..4218f61 --- /dev/null +++ b/src/main/java/io/orprotocol/packet/PacketFactory.java @@ -0,0 +1,52 @@ +package io.orprotocol.packet; + +import io.orprotocol.ProtocolException; +import io.orprotocol.Stream; + +/** + * Фабрика для создания пакетов из байтового массива. Используется для создания пакетов при получении данных от клиента, + * а так же может быть использована приложением + */ +public class PacketFactory { + + private byte[] bytes; + private PacketManager packetManager; + + /** + * Создать фабрику для создания пакетов из байтового массива + * @param bytes байтовый массив для создания пакета + * @param packetManager менеджер пакетов для получения класса пакета по его id + */ + public PacketFactory(byte[] bytes, PacketManager packetManager) { + this.bytes = bytes; + this.packetManager = packetManager; + } + + /** + * Создает пакет из массива байт, сериализует и возвращает его. Если пакет с таким id не поддерживается, выбрасывает ProtocolException + * @return созданный пакет + * @throws ProtocolException + */ + public Packet createPacket() throws ProtocolException { + Stream stream = new Stream(this.bytes); + int packetId = stream.readInt16(); + if(!this.packetManager.hasPacketSupported(packetId)){ + throw new ProtocolException("Unsupported packet with id " + packetId); + } + Class packetClass = this.packetManager.getPacketClass(packetId); + try { + Packet packet = packetClass.getConstructor().newInstance(); + packet.packetId = packetId; + packet.read(stream); + return packet; + } catch (Exception e) { + throw new ProtocolException("Failed to create packet with id " + packetId); + } + } + + public int getPacketId() { + Stream stream = new Stream(this.bytes); + return stream.readInt16(); + } + +} diff --git a/src/main/java/io/orprotocol/packet/PacketManager.java b/src/main/java/io/orprotocol/packet/PacketManager.java index 3e11a29..2862c03 100644 --- a/src/main/java/io/orprotocol/packet/PacketManager.java +++ b/src/main/java/io/orprotocol/packet/PacketManager.java @@ -2,6 +2,8 @@ package io.orprotocol.packet; import java.util.HashMap; +import io.orprotocol.ProtocolException; + /** * Менеджер сетевых пакетов и их обработчиков. */ @@ -96,5 +98,24 @@ public class PacketManager { return this.packets.size(); } + /** + * Создает пакет из массива байт, сериализует и возвращает его. Если пакет с таким id не поддерживается, выбрасывает ProtocolException + * @param bytes байтовый массив для создания пакета + * @return созданный пакет + * @throws ProtocolException если пакет с таким id не поддерживается или произошла ошибка при создании пакета + */ + public Packet createPacket(byte[] bytes) throws ProtocolException { + PacketFactory packetFactory = new PacketFactory(bytes, this); + return packetFactory.createPacket(); + } + + /** + * Создает фабрику для создания пакетов из байтового массива + * @param bytes байтовый массив для создания пакета + * @return фабрика для создания пакетов из байтового массива + */ + public PacketFactory getPacketFactory(byte[] bytes) { + return new PacketFactory(bytes, this); + } } diff --git a/src/main/resources/hibernate.cfg.xml b/src/main/resources/hibernate.cfg.xml index 1a1dafc..edad6db 100644 --- a/src/main/resources/hibernate.cfg.xml +++ b/src/main/resources/hibernate.cfg.xml @@ -11,6 +11,8 @@ + +