Сообщения, доставка сообщений, фикс хэндшейков, буферная зона (для синхронмзации)

This commit is contained in:
RoyceDa
2026-02-08 18:40:03 +02:00
parent 7766afa984
commit c036275ed9
23 changed files with 1025 additions and 56 deletions

View File

@@ -7,6 +7,7 @@ import com.rosetta.im.executors.Executor0Handshake;
import com.rosetta.im.executors.Executor1UserInfo;
import com.rosetta.im.executors.Executor3Search;
import com.rosetta.im.executors.Executor4OnlineState;
import com.rosetta.im.executors.Executor6Message;
import com.rosetta.im.listeners.HandshakeCompleteListener;
import com.rosetta.im.listeners.OnlineStatusDisconnectListener;
import com.rosetta.im.listeners.OnlineStatusHandshakeCompleteListener;
@@ -20,6 +21,8 @@ import com.rosetta.im.packet.Packet2Result;
import com.rosetta.im.packet.Packet3Search;
import com.rosetta.im.packet.Packet4OnlineSubscribe;
import com.rosetta.im.packet.Packet5OnlineState;
import com.rosetta.im.packet.Packet6Message;
import com.rosetta.im.packet.Packet8Delivery;
import io.orprotocol.Server;
import io.orprotocol.Settings;
@@ -28,6 +31,9 @@ import io.orprotocol.packet.PacketManager;
/**
* Boot отвечает за инициализацию всех пакетов и их обработчиков,
* а так же событий приложения. Этот Boot отвечает за приложение, а не за протокол.
*
* Нужен он для того, чтобы все части приложения получали одинаковые ссылки на глобальные обьекты приложения, такие как менеджер пакетов,
* менеджер событий, менеджер клиентов и так далее
*/
public class Boot {
@@ -111,13 +117,16 @@ public class Boot {
this.packetManager.registerPacket(3, Packet3Search.class);
this.packetManager.registerPacket(4, Packet4OnlineSubscribe.class);
this.packetManager.registerPacket(5, Packet5OnlineState.class);
this.packetManager.registerPacket(6, Packet6Message.class);
this.packetManager.registerPacket(8, Packet8Delivery.class);
}
private void registerAllExecutors() {
this.packetManager.registerExecutor(0, new Executor0Handshake(this.eventManager));
this.packetManager.registerExecutor(1, new Executor1UserInfo());
this.packetManager.registerExecutor(3, new Executor3Search(this.clientManager));
this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager));
this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager, this.clientManager));
this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager));
}
private void printBootMessage() {

View File

@@ -12,10 +12,18 @@ public enum Failures implements BaseFailures {
* Handshake не завершен
*/
HANDSHAKE_NOT_COMPLETED(3002),
/**
* Пользователь не состоит в группе, в которую пытается отправить сообщение
*/
USER_NOT_IN_GROUP(3005),
/**
* Неподдерживаемый протокол
*/
UNSUPPORTED_PROTOCOL(3008),
/**
* Слишком много вложений отправлено в сообщении
*/
TOO_MANY_ATTACHMENTS(3009),
/**
* Слишком много подписок на онлайн статусы
*/

View File

@@ -1,12 +1,15 @@
package com.rosetta.im.client;
import java.util.HashSet;
import java.util.List;
import com.rosetta.im.client.tags.ECIAuthentificate;
import io.orprotocol.ProtocolException;
import io.orprotocol.Server;
import io.orprotocol.client.Client;
import io.orprotocol.index.ClientIndexer;
import io.orprotocol.packet.Packet;
/**
* Менеджер клиентов
@@ -38,6 +41,43 @@ public class ClientManager {
*/
return false;
}
/**
* Отправить пакет всем клиентам с публичным ключом publicKey
* @param publicKey публичный ключ получателя
* @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToPK(String publicKey, Packet packet) throws ProtocolException {
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
for(Client client : clients){
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
/**
* Если клиент не авторизован, пропускаем его, он не должен получать пакеты,
* если нужно отправить пакет неавторизованному клиенту, нужно отправить его напрямую посредством client.send(packet),
* а не через этот метод
*/
continue;
}
/**
* Отправляем пакет каждому клиенту с таким публичным ключом
*/
client.send(packet);
}
}
/**
* Отправить пакет всем клиентам с публичными ключами из списка publicKeys
* @param publicKeys список публичных ключей получателей
* @param packet пакет для отправки
* @throws ProtocolException если произошла ошибка при отправке пакета клиенту
*/
public void sendPacketToPK(List<String> publicKeys, Packet packet) throws ProtocolException {
for(String publicKey : publicKeys){
this.sendPacketToPK(publicKey, packet);
}
}

View File

@@ -0,0 +1,74 @@
package com.rosetta.im.database.entity;
import com.rosetta.im.database.CreateUpdateEntity;
import jakarta.persistence.Column;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
/**
* Сущность для буфера сообщений, которые не были доставлены получателю, например,
* из-за того, что он был оффлайн, а так же для синхронизации сообщений
* между устройствами одного пользователя.
* Сообщения в буфере хранятся в виде сериализованных пакетов.
* Когда получатель становится онлайн, сервер пытается доставить ему все сообщения из буфера.
*/
@Entity
@Table(name = "packet_buffer")
public class Buffer extends CreateUpdateEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "source")
private String from;
@Column(name = "destination")
private String to;
@Column(name = "packet", columnDefinition = "bytea")
private byte[] packet;
@Column(name = "timestamp")
private Long timestamp;
public Long getId() {
return id;
}
public String getFrom() {
return from;
}
public String getTo() {
return to;
}
public byte[] getPacket() {
return packet;
}
public Long getTimestamp() {
return timestamp;
}
public void setFrom(String from) {
this.from = from;
}
public void setTo(String to) {
this.to = to;
}
public void setPacket(byte[] packet) {
this.packet = packet;
}
public void setTimestamp(Long timestamp) {
this.timestamp = timestamp;
}
}

View File

@@ -0,0 +1,67 @@
package com.rosetta.im.database.entity;
import java.util.ArrayList;
import java.util.List;
import com.rosetta.im.database.CreateUpdateEntity;
import com.rosetta.im.database.converters.StringListConverter;
import jakarta.persistence.Column;
import jakarta.persistence.Convert;
import jakarta.persistence.Entity;
import jakarta.persistence.GeneratedValue;
import jakarta.persistence.GenerationType;
import jakarta.persistence.Id;
import jakarta.persistence.Table;
/**
* Сущность для групповых чатов.
*/
@Entity
@Table(name = "groups")
public class Group extends CreateUpdateEntity {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
@Column(name = "groupId")
private String groupId;
@Convert(converter = StringListConverter.class)
@Column(name = "membersPublicKeys", nullable = false)
private List<String> membersPublicKeys = new ArrayList<>();
@Convert(converter = StringListConverter.class)
@Column(name = "bannedPublicKeys", nullable = false)
private List<String> bannedPublicKeys = new ArrayList<>();
public Long getId() {
return id;
}
public String getGroupId() {
return groupId;
}
public void setGroupId(String groupId) {
this.groupId = groupId;
}
public List<String> getMembersPublicKeys() {
return membersPublicKeys;
}
public void setMembersPublicKeys(List<String> membersPublicKeys) {
this.membersPublicKeys = membersPublicKeys;
}
public List<String> getBannedPublicKeys() {
return bannedPublicKeys;
}
public void setBannedPublicKeys(List<String> bannedPublicKeys) {
this.bannedPublicKeys = bannedPublicKeys;
}
}

View File

@@ -0,0 +1,12 @@
package com.rosetta.im.database.repository;
import com.rosetta.im.database.Repository;
import com.rosetta.im.database.entity.Buffer;
public class BufferRepository extends Repository<Buffer> {
public BufferRepository() {
super(Buffer.class);
}
}

View File

@@ -21,5 +21,27 @@ public class DeviceRepository extends Repository<Device> {
return this.findAllByField("publicKey", user.getPublicKey());
}
/**
* Считает количество устройств пользователя
* @param user пользователь
* @return количество устройств
*/
public long countUserDevices(User user) {
return this.countByField("publicKey", user.getPublicKey());
}
/**
* Обновляет время последней активности устройства
* @param deviceId ID устройства
*/
public void updateDeviceLeaveTime(String deviceId) {
Device device = this.findByField("deviceId", deviceId);
if(device == null) {
return;
}
device.setLeaveTime(System.currentTimeMillis());
this.update(device);
}
}

View File

@@ -0,0 +1,28 @@
package com.rosetta.im.database.repository;
import java.util.ArrayList;
import java.util.List;
import com.rosetta.im.database.Repository;
import com.rosetta.im.database.entity.Group;
public class GroupRepository extends Repository<Group> {
public GroupRepository() {
super(Group.class);
}
/**
* Найти участников группы по groupId
* @param groupId ID группы
* @return список публичных ключей участников группы
*/
public List<String> findGroupMembers(String groupId) {
Group group = this.findByField("groupId", groupId);
if(group == null) {
return new ArrayList<>();
}
return group.getMembersPublicKeys();
}
}

View File

@@ -0,0 +1,9 @@
package com.rosetta.im.exception;
public class UnauthorizedExeception extends Exception {
public UnauthorizedExeception(String message) {
super(message);
}
}

View File

@@ -126,7 +126,7 @@ public class Executor0Handshake extends PacketExecutor<Packet0Handshake> {
client.disconnect(Failures.AUTHENTIFICATION_ERROR);
return;
}
long userDevicesCount = deviceService.countUserDevices(user);
long userDevicesCount = deviceRepository.countUserDevices(user);
/**
* Проверяем верифицировано ли устройство

View File

@@ -1,11 +1,16 @@
package com.rosetta.im.executors;
import java.util.ArrayList;
import java.util.List;
import com.rosetta.im.Failures;
import com.rosetta.im.client.ClientManager;
import com.rosetta.im.client.OnlineManager;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.packet.Packet4OnlineSubscribe;
import com.rosetta.im.packet.Packet5OnlineState;
import com.rosetta.im.packet.runtime.NetworkStatus;
import com.rosetta.im.packet.runtime.PKNetworkStatus;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
@@ -14,9 +19,11 @@ import io.orprotocol.packet.PacketExecutor;
public class Executor4OnlineState extends PacketExecutor<Packet4OnlineSubscribe> {
private final OnlineManager onlineManager;
private final ClientManager clientManager;
public Executor4OnlineState(OnlineManager onlineManager) {
public Executor4OnlineState(OnlineManager onlineManager, ClientManager clientManager) {
this.onlineManager = onlineManager;
this.clientManager = clientManager;
}
@Override
@@ -49,6 +56,19 @@ public class Executor4OnlineState extends PacketExecutor<Packet4OnlineSubscribe>
for (String targetPublicKey : publicKeys) {
this.onlineManager.subscribe(client, targetPublicKey);
}
/**
* Сразу же формируем и отправляем клиенту онлайн статус для указанных публичных ключей, чтобы клиент не ждал обновления статуса,
* а получил его сразу после подписки
*/
Packet5OnlineState onlineStates = new Packet5OnlineState();
List<PKNetworkStatus> onlineStatuses = new ArrayList<>();
for (String targetPublicKey : publicKeys) {
boolean isOnline = this.clientManager.isClientConnected(targetPublicKey);
PKNetworkStatus networkStatus = new PKNetworkStatus(targetPublicKey, NetworkStatus.fromBoolean(isOnline));
onlineStatuses.add(networkStatus);
}
onlineStates.setPkNetworkStatuses(onlineStatuses);
client.send(onlineStates);
}
}

View File

@@ -0,0 +1,182 @@
package com.rosetta.im.executors;
import java.util.List;
import com.rosetta.im.Failures;
import com.rosetta.im.client.ClientManager;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.database.repository.BufferRepository;
import com.rosetta.im.database.repository.GroupRepository;
import com.rosetta.im.packet.Packet6Message;
import com.rosetta.im.packet.Packet8Delivery;
import com.rosetta.im.packet.runtime.Attachment;
import com.rosetta.im.service.services.BufferService;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
import io.orprotocol.packet.PacketManager;
/**
* Обработчик пакета сообщений
*/
public class Executor6Message extends PacketExecutor<Packet6Message> {
private final GroupRepository groupRepository = new GroupRepository();
private final ClientManager clientManager;
private final BufferRepository bufferRepository = new BufferRepository();
private final BufferService bufferService;
public Executor6Message(ClientManager clientManager, PacketManager packetManager) {
this.clientManager = clientManager;
this.bufferService = new BufferService(bufferRepository, packetManager);
}
@Override
public void onPacketReceived(Packet6Message packet, Client client) throws Exception, ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
String messageId = packet.getMessageId();
List<Attachment> attachments = packet.getAttachments();
int attachmentsCount = attachments.size();
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
/**
* Если пользователь не авторизован
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
if(!eciAuthentificate.getPublicKey().equals(fromPublicKey)){
/**
* Если клиент пытается отправить сообщение от отправителя,
* которым он не является
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
if(fromPublicKey.equals(toPublicKey)){
/**
* Самому себе отправить сообщение нельзя, но это более-менее
* нормальное поведение, хотя на клиентах должно быть отработано,
* не кикаем пользователя
*/
return;
}
long currentTimestampSec = (System.currentTimeMillis() / 1000);
long messageTimestampSec = (packet.getTimestamp() / 1000);
/**
* Максимальный возраст сообщения в секундах, который сервер примет, чтобы
* клиент не мог подделать дату отправки и отправлять
* сообщения из "прошлого"
*/
long maxPaddingSec = 30;
if(attachmentsCount > 0){
/**
* Так как у нас есть вложения, то клиенту нужно какое-то время на их загрузку,
* разрешаем клиенту превысить maxPaddingSec и даем ему 30 секунд
* на отправку одного вложения (этого более чем достаточно, так как клиент
* вообще не должен отправлять сообщение пока все вложения не будут загружены
* на сервер)
*/
maxPaddingSec = maxPaddingSec * attachmentsCount;
}
if(currentTimestampSec - messageTimestampSec > maxPaddingSec){
/**
* Если сообщение было отправлено из "прошлого", то есть на момент
* прихода на сервер сообщению уже больше секунд чем допускает
* maxPaddingSec, то отклоняем его
*/
return;
}
if(attachmentsCount > 10){
/**
* Слишком много отправляемых вложений, так нельзя
*/
client.disconnect(Failures.TOO_MANY_ATTACHMENTS);
return;
}
/**
* Обновляем системную метку времени в соотвествии с сервером, так как у клиентов могут быть например неправильно настроены часы
* или разные часовые пояса
*/
packet.setTimestamp(System.currentTimeMillis());
packet.setPrivateKey("");
if(toPublicKey.startsWith("#group:")){
/**
* Это групповое сообщение, отправляем его всем участникам группы, кроме отправителя
*/
this.sendMessageToGroup(packet, client, eciAuthentificate);
}else{
/**
* Это личное сообщение, отправляем его получателю
*/
this.sendIMMessage(packet, client);
}
/**
* Сообщение успешно отправлено, отправялем клиенту пакет успешной доставки
*/
Packet8Delivery deliveryPacket = new Packet8Delivery();
deliveryPacket.setMessageId(messageId);
deliveryPacket.setToPublicKey(toPublicKey);
client.send(deliveryPacket);
}
/**
* Отправляет групповое сообщение всем участникам группы, кроме отправителя
* @param packet пакет с групповым сообщением
*/
private void sendMessageToGroup(Packet6Message packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException {
String toPublicKey = packet.getToPublicKey();
List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", ""));
if(groupMembersPublicKeys.isEmpty()){
/**
* Если группа не найдена или в группе нет участников, то в такую отправить
* сообщение нельзя
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
if(!groupMembersPublicKeys.contains(eciAuthentificate.getPublicKey())){
/**
* Если отправитель не является участником группы, то он не может отправлять
* сообщения в эту группу
*/
client.disconnect(Failures.USER_NOT_IN_GROUP);
return;
}
/**
* Отправляем всем участникам группы, кроме отправителя этот пакет, попутно не забывая проверить, а не один ли он в группе
*/
groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey());
if(groupMembersPublicKeys.isEmpty()){
/**
* Если отправитель был единственным участником группы, то отправлять сообщение некуда,
* не кикаем пользователя
*/
return;
}
this.clientManager.sendPacketToPK(groupMembersPublicKeys, packet);
}
/**
* Отправляет личное сообщение получателю
* @param packet пакет с личным сообщением
*/
private void sendIMMessage(Packet6Message packet, Client client) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
this.clientManager.sendPacketToPK(toPublicKey, packet);
/**
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
*/
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
}
}

View File

@@ -20,7 +20,6 @@ import io.orprotocol.client.Client;
public class ServerStopListener implements Listener {
private final DeviceRepository deviceRepository = new DeviceRepository();
private final DeviceService deviceService = new DeviceService(deviceRepository);
private Logger logger;
public ServerStopListener(Logger logger) {
@@ -49,7 +48,7 @@ public class ServerStopListener implements Listener {
*/
continue;
}
deviceService.updateDeviceLeaveTime(eciDevice.getDeviceId());
deviceRepository.updateDeviceLeaveTime(eciDevice.getDeviceId());
}
this.logger.info(Color.RED + "Время последней активности устройств клиентов обновлено.");
}

View File

@@ -0,0 +1,249 @@
package com.rosetta.im.packet;
import java.util.List;
import com.rosetta.im.packet.runtime.Attachment;
import com.rosetta.im.packet.runtime.AttachmentType;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Пакет для отправки сообщений между пользователями. Содержит зашифрованное текстовое содержимое и массив вложений с
* их метаданными. Временная метка используется для сортировки сообщений и отображения времени отправки. Идентификатор
* сообщения нужен для редактирования и удаления сообщений. Ключ chacha используется для шифрования текста сообщения,
* а aesChachaKey нужен для последующей синхронизации своих же сообщений на других устройствах.
* Сам ключ ChaCha20 во избежания обемена ключами зашифрован ECC.
*/
public class Packet6Message extends Packet {
/**
* Публичный ключ отправителя
*/
private String fromPublicKey;
/**
* Публичный ключ получателя, может начинаться с #group: для групповых сообщений
*/
private String toPublicKey;
/**
* Текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC
*/
private String content;
/**
* Ключ chacha для шифрования сообщения, зашифрован ECC
*/
private String chachaKey;
/**
* Временная метка сообщения в миллисекундах
*/
private long timestamp;
/**
* Приватный ключ отправителя
*/
private String privateKey;
/**
* Идентификатор сообщения, нужен для редактирования и удаления сообщений
*/
private String messageId;
/**
* Массив вложений в сообщении
*/
private List<Attachment> attachments;
/**
* Закодированный с помощью AES ключ chacha, нужен
* для последующей синхронизации своих же сообщений
*/
private String aesChachaKey;
@Override
public void read(Stream stream) {
this.fromPublicKey = stream.readString();
this.toPublicKey = stream.readString();
this.content = stream.readString();
this.chachaKey = stream.readString();
this.timestamp = stream.readInt64();
this.privateKey = stream.readString();
this.messageId = stream.readString();
int attachmentsCount = stream.readInt8();
this.attachments = new java.util.ArrayList<>();
for (int i = 0; i < attachmentsCount; i++) {
String id = stream.readString();
String preview = stream.readString();
String blob = stream.readString();
AttachmentType type = AttachmentType.fromCode(stream.readInt8());
this.attachments.add(new Attachment(id, blob, type, preview));
}
this.aesChachaKey = stream.readString();
}
@Override
public Stream write() {
Stream stream = new Stream();
stream.writeInt16(this.packetId);
stream.writeString(this.fromPublicKey);
stream.writeString(this.toPublicKey);
stream.writeString(this.content);
stream.writeString(this.chachaKey);
stream.writeInt64(this.timestamp);
stream.writeString(this.privateKey);
stream.writeString(this.messageId);
stream.writeInt8(this.attachments.size());
for (Attachment attachment : this.attachments) {
stream.writeString(attachment.getId());
stream.writeString(attachment.getPreview());
stream.writeString(attachment.getBlob());
stream.writeInt8((byte) attachment.getType().getCode());
}
stream.writeString(this.aesChachaKey);
return stream;
}
/**
* Получить публичный ключ отправителя
* @return публичный ключ отправителя
*/
public String getFromPublicKey() {
return fromPublicKey;
}
/**
* Получить публичный ключ получателя
* @return публичный ключ получателя
*/
public String getToPublicKey() {
return toPublicKey;
}
/**
* Получить текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC
* @return текстовое содержимое сообщения
*/
public String getContent() {
return content;
}
/**
* Получить ключ chacha для шифрования сообщения, зашифрован ECC
* @return ключ chacha
*/
public String getChachaKey() {
return chachaKey;
}
/**
* Получить временную метку сообщения в миллисекундах
* @return временная метка сообщения в мсиллисекундах
*/
public long getTimestamp() {
return timestamp;
}
/**
* Получает приватный ключ пользователя
* @return приватный ключ
* @deprecated с версии сервера 1.1 использование приватных ключей
* в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
*/
@Deprecated(since = "1.1", forRemoval = true)
public String getPrivateKey() {
return this.privateKey;
}
/**
* Получить идентификатор сообщения
* @return идентификатор сообщения
*/
public String getMessageId() {
return messageId;
}
/**
* Получить массив вложений в сообщении
* @return массив вложений в сообщении
*/
public List<Attachment> getAttachments() {
return attachments;
}
/**
* Получить закодированный с помощью AES ключ chacha
* @return ключ chacha
*/
public String getAesChachaKey() {
return aesChachaKey;
}
/**
* Устанавливает публичный ключ отправителя
* @param fromPublicKey публичный ключ отправителя
*/
public void setFromPublicKey(String fromPublicKey) {
this.fromPublicKey = fromPublicKey;
}
/**
* Устанавливает публичный ключ получателя
* @param toPublicKey публичный ключ получателя
*/
public void setToPublicKey(String toPublicKey) {
this.toPublicKey = toPublicKey;
}
/**
* Устанавливает текстовое содержимое сообщения
* @param content текстовое содержимое сообщения
*/
public void setContent(String content) {
this.content = content;
}
/**
* Устанавливает ключ chacha для шифрования сообщения
* @param chachaKey
*/
public void setChachaKey(String chachaKey) {
this.chachaKey = chachaKey;
}
/**
* Устанавливает временную метку сообщения в миллисекундах
* @param timestamp временная метка сообщения в миллисекундах
*/
public void setTimestamp(long timestamp) {
this.timestamp = timestamp;
}
/**
* Устанавливает приватный ключ пользователя
* @param privateKey приватный ключ
* @deprecated с версии сервера 1.1 использование приватных ключей
* в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
*/
@Deprecated(since = "1.1", forRemoval = true)
public void setPrivateKey(String privateKey) {
this.privateKey = privateKey;
}
/**
* Устанавливает идентификатор сообщения
* @param messageId идентификатор сообщения
*/
public void setMessageId(String messageId) {
this.messageId = messageId;
}
/**
* Устанавливает массив вложений в сообщении
* @param attachments массив вложений в сообщении
*/
public void setAttachments(List<Attachment> attachments) {
this.attachments = attachments;
}
/**
* Устанавливает закодированный с помощью AES ключ chacha
* @param aesChachaKey ключ chacha
*/
public void setAesChachaKey(String aesChachaKey) {
this.aesChachaKey = aesChachaKey;
}
}

View File

@@ -0,0 +1,61 @@
package com.rosetta.im.packet;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Пакет обозначающий доставку сообщения получателю. Отправляется после успешной доставки сообщения получателю
*/
public class Packet8Delivery extends Packet {
private String messageId;
private String toPublicKey;
@Override
public void read(Stream stream) {
this.toPublicKey = stream.readString();
this.messageId = stream.readString();
}
@Override
public Stream write() {
Stream stream = new Stream();
stream.writeInt16(this.packetId);
stream.writeString(this.toPublicKey);
stream.writeString(this.messageId);
return stream;
}
/**
* Получить идентификатор доставленного сообщения
* @return идентификатор доставленного сообщения
*/
public String getMessageId() {
return messageId;
}
/**
* Получить публичный ключ получателя доставленного сообщения
* @return публичный ключ получателя доставленного сообщения
*/
public String getToPublicKey() {
return toPublicKey;
}
/**
* Установить идентификатор доставленного сообщения
* @param messageId идентификатор доставленного сообщения
*/
public void setMessageId(String messageId) {
this.messageId = messageId;
}
/**
* Установить публичный ключ получателя доставленного сообщения
* @param toPublicKey публичный ключ получателя доставленного сообщения
*/
public void setToPublicKey(String toPublicKey) {
this.toPublicKey = toPublicKey;
}
}

View File

@@ -0,0 +1,52 @@
package com.rosetta.im.packet.runtime;
/**
* Вложение в сообщении
*/
public class Attachment {
private String id;
private String blob;
private AttachmentType type;
private String preview;
public Attachment(String id, String blob, AttachmentType type, String preview) {
this.id = id;
this.blob = blob;
this.type = type;
this.preview = preview;
}
/**
* Получить идентификатор вложения
* @return
*/
public String getId() {
return id;
}
/**
* Получить данные вложения в виде строки
* @return
*/
public String getBlob() {
return blob;
}
/**
* Получить тип вложения
* @return
*/
public AttachmentType getType() {
return type;
}
/**
* Получить превью вложения (например, для изображений)
* @return
*/
public String getPreview() {
return preview;
}
}

View File

@@ -0,0 +1,31 @@
package com.rosetta.im.packet.runtime;
/**
* Тип вложения в сообщении
*/
public enum AttachmentType {
IMAGE(0),
MESSAGES(1),
FILE(2),
AVATAR(3);
private final int code;
AttachmentType(int code) {
this.code = code;
}
public int getCode() {
return code;
}
public static AttachmentType fromCode(int code) {
for (AttachmentType type : AttachmentType.values()) {
if (type.getCode() == code) {
return type;
}
}
throw new IllegalArgumentException("Invalid AttachmentType code: " + code);
}
}

View File

@@ -0,0 +1,76 @@
package com.rosetta.im.service.services;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.database.QuerySession;
import com.rosetta.im.database.entity.Buffer;
import com.rosetta.im.database.repository.BufferRepository;
import com.rosetta.im.exception.UnauthorizedExeception;
import com.rosetta.im.service.Service;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketManager;
public class BufferService extends Service<BufferRepository> {
private PacketManager packetManager;
public BufferService(BufferRepository repository, PacketManager packetManager) {
super(repository);
this.packetManager = packetManager;
}
/**
* Получить пакеты из буфера для клиента, которые были добавлены в буфер после fromTimestampMs. Если клиент не авторизован, возвращает пустой список.
* @param client
* @param fromTimestampMs
* @return
* @throws ProtocolException
*/
public List<Packet> getPacketsFromTime(Client client, long fromTimestampMs) throws ProtocolException, UnauthorizedExeception {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
/**
* Если клиент не авторизован, то он не может получать пакеты из буфера, возвращаем пустой список
*/
throw new UnauthorizedExeception("Unauthorized client cannot get packets from buffer");
}
String toPublicKey = eciAuthentificate.getPublicKey();
String hql = "FROM Buffer WHERE to = :to AND timestamp > :timestamp ORDER BY timestamp ASC";
HashMap<String, Object> parameters = new HashMap<>();
parameters.put("to", toPublicKey);
parameters.put("timestamp", fromTimestampMs);
List<Packet> packets = new ArrayList<>();
try(QuerySession<Buffer> querySession = this.getRepository().buildQuery(hql, parameters)){
List<Buffer> buffers = querySession.getQuery().list();
for(Buffer buffer : buffers) {
byte[] packetBytes = buffer.getPacket();
Packet packet = this.packetManager.createPacket(packetBytes);
packets.add(packet);
}
}
return packets;
}
/**
* Добавить пакет в буфер для клиента с публичным ключом to. Если клиент не авторизован, выбрасывает UnauthorizedExeception
* @param from публичный ключ отправителя пакета
* @param to публичный ключ получателя пакета
* @param packet пакет для добавления в буфер
*/
public void pushPacketToBuffer(String from, String to, Packet packet) {
byte[] packetBytes = packet.write().getBuffer();
Buffer buffer = new Buffer();
buffer.setFrom(from);
buffer.setTo(to);
buffer.setTimestamp(System.currentTimeMillis());
buffer.setPacket(packetBytes);
this.getRepository().save(buffer);
}
}

View File

@@ -36,26 +36,4 @@ public class DeviceService extends Service<DeviceRepository> {
return false;
}
/**
* Считает количество устройств пользователя
* @param user пользователь
* @return количество устройств
*/
public long countUserDevices(User user) {
return this.getRepository().countByField("publicKey", user.getPublicKey());
}
/**
* Обновляет время последней активности устройства
* @param deviceId ID устройства
*/
public void updateDeviceLeaveTime(String deviceId) {
Device device = this.getRepository().findByField("deviceId", deviceId);
if(device == null) {
return;
}
device.setLeaveTime(System.currentTimeMillis());
this.getRepository().update(device);
}
}

View File

@@ -17,6 +17,7 @@ import io.orprotocol.index.ClientIndexer;
import io.orprotocol.lock.ThreadLocker;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketExecutor;
import io.orprotocol.packet.PacketFactory;
import io.orprotocol.packet.PacketManager;
public class Server extends WebSocketServer {
@@ -95,37 +96,13 @@ public class Server extends WebSocketServer {
public void onMessage(WebSocket socket, ByteBuffer byteBuffer) {
Client client = socket.getAttachment();
byte[] bytes = byteBuffer.array();
Stream stream = new Stream(bytes);
int packetId = stream.readInt16();
/**
* Обновляем время последнего полученного heartbeat.
* Так как клиент отпраивл нам пакет, он живой.
*/
client.updateHeartbeat();
if(!this.packetManager.hasPacketSupported(packetId)){
/**
* Если пакет не поддерживается, отключаем клиента с соответствующим кодом ошибки.
*/
client.disconnect(ServerFailures.UNSUPPORTED_PACKET);
return;
}
if(!this.packetManager.hasExecutorDelegated(packetId)){
/**
* Если для пакета не назначен обработчик, отключаем клиента с соответствующим кодом ошибки.
*/
client.disconnect(ServerFailures.UNSUPPORTED_PACKET);
return;
}
Class<? extends Packet> packetClass = this.packetManager.getPacketClass(packetId);
try {
Packet packet = packetClass.getConstructor().newInstance();
packet.packetId = packetId;
/**
* Читаем данные пакета из потока.
* Создаем пакет из полученных байтов.
*/
packet.read(stream);
PacketFactory packetFactory = new PacketFactory(bytes, this.packetManager);
Packet packet = packetFactory.createPacket();
int packetId = packetFactory.getPacketId();
/**
* Получаем обработчик пакета и вызываем его метод обработки.
*
@@ -161,8 +138,8 @@ public class Server extends WebSocketServer {
threadLocker.releaseLock(packet, executor.getClass());
}
} catch (Exception e) {
System.out.println("Error while processing packet " + packetClass.getName());
e.printStackTrace();
//client.disconnect(ServerFailures.UNSUPPORTED_PACKET);
}
}

View File

@@ -0,0 +1,52 @@
package io.orprotocol.packet;
import io.orprotocol.ProtocolException;
import io.orprotocol.Stream;
/**
* Фабрика для создания пакетов из байтового массива. Используется для создания пакетов при получении данных от клиента,
* а так же может быть использована приложением
*/
public class PacketFactory {
private byte[] bytes;
private PacketManager packetManager;
/**
* Создать фабрику для создания пакетов из байтового массива
* @param bytes байтовый массив для создания пакета
* @param packetManager менеджер пакетов для получения класса пакета по его id
*/
public PacketFactory(byte[] bytes, PacketManager packetManager) {
this.bytes = bytes;
this.packetManager = packetManager;
}
/**
* Создает пакет из массива байт, сериализует и возвращает его. Если пакет с таким id не поддерживается, выбрасывает ProtocolException
* @return созданный пакет
* @throws ProtocolException
*/
public Packet createPacket() throws ProtocolException {
Stream stream = new Stream(this.bytes);
int packetId = stream.readInt16();
if(!this.packetManager.hasPacketSupported(packetId)){
throw new ProtocolException("Unsupported packet with id " + packetId);
}
Class<? extends Packet> packetClass = this.packetManager.getPacketClass(packetId);
try {
Packet packet = packetClass.getConstructor().newInstance();
packet.packetId = packetId;
packet.read(stream);
return packet;
} catch (Exception e) {
throw new ProtocolException("Failed to create packet with id " + packetId);
}
}
public int getPacketId() {
Stream stream = new Stream(this.bytes);
return stream.readInt16();
}
}

View File

@@ -2,6 +2,8 @@ package io.orprotocol.packet;
import java.util.HashMap;
import io.orprotocol.ProtocolException;
/**
* Менеджер сетевых пакетов и их обработчиков.
*/
@@ -96,5 +98,24 @@ public class PacketManager {
return this.packets.size();
}
/**
* Создает пакет из массива байт, сериализует и возвращает его. Если пакет с таким id не поддерживается, выбрасывает ProtocolException
* @param bytes байтовый массив для создания пакета
* @return созданный пакет
* @throws ProtocolException если пакет с таким id не поддерживается или произошла ошибка при создании пакета
*/
public Packet createPacket(byte[] bytes) throws ProtocolException {
PacketFactory packetFactory = new PacketFactory(bytes, this);
return packetFactory.createPacket();
}
/**
* Создает фабрику для создания пакетов из байтового массива
* @param bytes байтовый массив для создания пакета
* @return фабрика для создания пакетов из байтового массива
*/
public PacketFactory getPacketFactory(byte[] bytes) {
return new PacketFactory(bytes, this);
}
}

View File

@@ -11,6 +11,8 @@
<!--Зарегистрированные таблицы-->
<mapping class="com.rosetta.im.database.entity.User"/>
<mapping class="com.rosetta.im.database.entity.Device"/>
<mapping class="com.rosetta.im.database.entity.Group"/>
<mapping class="com.rosetta.im.database.entity.Buffer"/>
</session-factory>
</hibernate-configuration>