Индикация чтения, базовые пакеты, новый диспатчер

This commit is contained in:
RoyceDa
2026-02-09 00:53:16 +02:00
parent 1c834cc4de
commit 7232fcb903
9 changed files with 295 additions and 123 deletions

View File

@@ -8,6 +8,7 @@ import com.rosetta.im.executors.Executor1UserInfo;
import com.rosetta.im.executors.Executor3Search;
import com.rosetta.im.executors.Executor4OnlineState;
import com.rosetta.im.executors.Executor6Message;
import com.rosetta.im.executors.Executor7Read;
import com.rosetta.im.listeners.HandshakeCompleteListener;
import com.rosetta.im.listeners.OnlineStatusDisconnectListener;
import com.rosetta.im.listeners.OnlineStatusHandshakeCompleteListener;
@@ -22,6 +23,7 @@ import com.rosetta.im.packet.Packet3Search;
import com.rosetta.im.packet.Packet4OnlineSubscribe;
import com.rosetta.im.packet.Packet5OnlineState;
import com.rosetta.im.packet.Packet6Message;
import com.rosetta.im.packet.Packet7Read;
import com.rosetta.im.packet.Packet8Delivery;
import io.orprotocol.Server;
@@ -118,6 +120,7 @@ public class Boot {
this.packetManager.registerPacket(4, Packet4OnlineSubscribe.class);
this.packetManager.registerPacket(5, Packet5OnlineState.class);
this.packetManager.registerPacket(6, Packet6Message.class);
this.packetManager.registerPacket(7, Packet7Read.class);
this.packetManager.registerPacket(8, Packet8Delivery.class);
}
@@ -127,6 +130,7 @@ public class Boot {
this.packetManager.registerExecutor(3, new Executor3Search(this.clientManager));
this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager, this.clientManager));
this.packetManager.registerExecutor(6, new Executor6Message(this.clientManager, this.packetManager));
this.packetManager.registerExecutor(7, new Executor7Read(this.clientManager, this.packetManager));
}
private void printBootMessage() {

View File

@@ -30,16 +30,22 @@ public class ClientManager {
public boolean isClientConnected(String publicKey) {
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
if(clients.size() > 0){
if(clients == null){
/**
* Есть клиенты с таким публичным ключом
* Нет клиентов с таким публичным ключом
*/
return true;
return false;
}
if(clients.size() <= 0){
/**
* Нет клиентов с таким публичным ключом
*/
return false;
}
/**
* Нет клиентов с таким ключом
* Есть клиенты с таким публичным ключом
*/
return false;
return true;
}
/**
@@ -50,6 +56,12 @@ public class ClientManager {
*/
public void sendPacketToPK(String publicKey, Packet packet) throws ProtocolException {
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
if(clients == null){
/**
* Нет клиентов с таким публичным ключом, значит отправлять некому
*/
return;
}
for(Client client : clients){
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()){
@@ -61,7 +73,7 @@ public class ClientManager {
continue;
}
/**
* Отправляем пакет каждому клиенту с таким публичным ключом
* Отправляем пакет каждому клиенту с таким публичным ключом (то есть всем его авторизованным сессиям/устройствам)
*/
client.send(packet);
}

View File

@@ -10,6 +10,7 @@ import com.rosetta.im.database.repository.GroupRepository;
import com.rosetta.im.packet.Packet6Message;
import com.rosetta.im.packet.Packet8Delivery;
import com.rosetta.im.packet.runtime.Attachment;
import com.rosetta.im.service.dispatch.MessageDispatcher;
import com.rosetta.im.service.services.BufferService;
import io.orprotocol.ProtocolException;
@@ -22,14 +23,10 @@ import io.orprotocol.packet.PacketManager;
*/
public class Executor6Message extends PacketExecutor<Packet6Message> {
private final GroupRepository groupRepository = new GroupRepository();
private final ClientManager clientManager;
private final BufferRepository bufferRepository = new BufferRepository();
private final BufferService bufferService;
private final MessageDispatcher messageDispatcher;
public Executor6Message(ClientManager clientManager, PacketManager packetManager) {
this.clientManager = clientManager;
this.bufferService = new BufferService(bufferRepository, packetManager);
this.messageDispatcher = new MessageDispatcher(clientManager, packetManager);
}
@Override
@@ -111,12 +108,12 @@ public class Executor6Message extends PacketExecutor<Packet6Message> {
/**
* Это групповое сообщение, отправляем его всем участникам группы, кроме отправителя
*/
this.sendMessageToGroup(packet, client, eciAuthentificate);
this.messageDispatcher.sendGroup(packet, client, eciAuthentificate);
}else{
/**
* Это личное сообщение, отправляем его получателю
*/
this.sendIMMessage(packet, client);
this.messageDispatcher.sendPeer(packet, client);
}
/**
@@ -127,56 +124,5 @@ public class Executor6Message extends PacketExecutor<Packet6Message> {
deliveryPacket.setToPublicKey(toPublicKey);
client.send(deliveryPacket);
}
/**
* Отправляет групповое сообщение всем участникам группы, кроме отправителя
* @param packet пакет с групповым сообщением
*/
private void sendMessageToGroup(Packet6Message packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException {
String toPublicKey = packet.getToPublicKey();
List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", ""));
if(groupMembersPublicKeys.isEmpty()){
/**
* Если группа не найдена или в группе нет участников, то в такую отправить
* сообщение нельзя
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
if(!groupMembersPublicKeys.contains(eciAuthentificate.getPublicKey())){
/**
* Если отправитель не является участником группы, то он не может отправлять
* сообщения в эту группу
*/
client.disconnect(Failures.USER_NOT_IN_GROUP);
return;
}
/**
* Отправляем всем участникам группы, кроме отправителя этот пакет, попутно не забывая проверить, а не один ли он в группе
*/
groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey());
if(groupMembersPublicKeys.isEmpty()){
/**
* Если отправитель был единственным участником группы, то отправлять сообщение некуда,
* не кикаем пользователя
*/
return;
}
this.clientManager.sendPacketToPK(groupMembersPublicKeys, packet);
}
/**
* Отправляет личное сообщение получателю
* @param packet пакет с личным сообщением
*/
private void sendIMMessage(Packet6Message packet, Client client) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
this.clientManager.sendPacketToPK(toPublicKey, packet);
/**
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
*/
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
}
}

View File

@@ -0,0 +1,56 @@
package com.rosetta.im.executors;
import com.rosetta.im.Failures;
import com.rosetta.im.client.ClientManager;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.packet.Packet7Read;
import com.rosetta.im.service.dispatch.MessageDispatcher;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketExecutor;
import io.orprotocol.packet.PacketManager;
public class Executor7Read extends PacketExecutor<Packet7Read> {
private final MessageDispatcher messageDispatcher;
public Executor7Read(ClientManager clientManager, PacketManager packetManager) {
this.messageDispatcher = new MessageDispatcher(clientManager, packetManager);
}
@Override
public void onPacketReceived(Packet7Read packet, Client client) throws Exception, ProtocolException {
ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
if(fromPublicKey.equals(toPublicKey)){
/**
* Ничего не делаем если назначение пакета такое же как и отправитель,
* такое поведение может быть при заходе в Saved Messages и должно быть правильно обработано на клиенте
*/
return;
}
if (eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
/**
* Если клиент не прошел аутентификацию, то он не может читать сообщения
*/
client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
return;
}
packet.setPrivateKey("");
if(toPublicKey.startsWith("#group:")){
/**
* Это групповое чтение, отправляем его всем участникам группы, кроме отправителя
*/
this.messageDispatcher.sendGroup(packet, client, eciAuthentificate);
}else{
/**
* Это личное сообщение, отправляем его получателю
*/
this.messageDispatcher.sendPeer(packet, client);
}
}
}

View File

@@ -0,0 +1,5 @@
package com.rosetta.im.executors.base;
public class ExecutorBaseDialog {
}

View File

@@ -2,11 +2,11 @@ package com.rosetta.im.packet;
import java.util.List;
import com.rosetta.im.packet.base.PacketBaseDialog;
import com.rosetta.im.packet.runtime.Attachment;
import com.rosetta.im.packet.runtime.AttachmentType;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Пакет для отправки сообщений между пользователями. Содержит зашифрованное текстовое содержимое и массив вложений с
@@ -15,15 +15,7 @@ import io.orprotocol.packet.Packet;
* а aesChachaKey нужен для последующей синхронизации своих же сообщений на других устройствах.
* Сам ключ ChaCha20 во избежания обемена ключами зашифрован ECC.
*/
public class Packet6Message extends Packet {
/**
* Публичный ключ отправителя
*/
private String fromPublicKey;
/**
* Публичный ключ получателя, может начинаться с #group: для групповых сообщений
*/
private String toPublicKey;
public class Packet6Message extends PacketBaseDialog {
/**
* Текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC
*/
@@ -36,10 +28,6 @@ public class Packet6Message extends Packet {
* Временная метка сообщения в миллисекундах
*/
private long timestamp;
/**
* Приватный ключ отправителя
*/
private String privateKey;
/**
* Идентификатор сообщения, нужен для редактирования и удаления сообщений
*/
@@ -96,23 +84,6 @@ public class Packet6Message extends Packet {
stream.writeString(this.aesChachaKey);
return stream;
}
/**
* Получить публичный ключ отправителя
* @return публичный ключ отправителя
*/
public String getFromPublicKey() {
return fromPublicKey;
}
/**
* Получить публичный ключ получателя
* @return публичный ключ получателя
*/
public String getToPublicKey() {
return toPublicKey;
}
/**
* Получить текстовое содержимое сообщения, зашифровано ChaCha20, зашифровано ECC
* @return текстовое содержимое сообщения
@@ -136,17 +107,6 @@ public class Packet6Message extends Packet {
return timestamp;
}
/**
* Получает приватный ключ пользователя
* @return приватный ключ
* @deprecated с версии сервера 1.1 использование приватных ключей
* в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
*/
@Deprecated(since = "1.1", forRemoval = true)
public String getPrivateKey() {
return this.privateKey;
}
/**
* Получить идентификатор сообщения
* @return идентификатор сообщения
@@ -171,22 +131,6 @@ public class Packet6Message extends Packet {
return aesChachaKey;
}
/**
* Устанавливает публичный ключ отправителя
* @param fromPublicKey публичный ключ отправителя
*/
public void setFromPublicKey(String fromPublicKey) {
this.fromPublicKey = fromPublicKey;
}
/**
* Устанавливает публичный ключ получателя
* @param toPublicKey публичный ключ получателя
*/
public void setToPublicKey(String toPublicKey) {
this.toPublicKey = toPublicKey;
}
/**
* Устанавливает текстовое содержимое сообщения
* @param content текстовое содержимое сообщения

View File

@@ -0,0 +1,28 @@
package com.rosetta.im.packet;
import com.rosetta.im.packet.base.PacketBaseDialog;
import io.orprotocol.Stream;
/**
* Пакет для отметки сообщения как прочитанного
*/
public class Packet7Read extends PacketBaseDialog {
@Override
public void read(Stream stream) {
this.privateKey = stream.readString();
this.fromPublicKey = stream.readString();
this.toPublicKey = stream.readString();
}
@Override
public Stream write() {
Stream stream = new Stream();
stream.writeInt16(this.packetId);
stream.writeString(this.privateKey);
stream.writeString(this.fromPublicKey);
stream.writeString(this.toPublicKey);
return stream;
}
}

View File

@@ -0,0 +1,88 @@
package com.rosetta.im.packet.base;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Базовый пакет для диалогов между пользователями
*/
public class PacketBaseDialog extends Packet {
/**
* Публичный ключ отправителя
*/
public String fromPublicKey;
/**
* Публичный ключ получателя, может начинаться с #group: для групповых сообщений
*/
public String toPublicKey;
/**
* Приватный ключ отправителя
*/
public String privateKey;
/**
* Заглушка
*/
@Override
public void read(Stream stream) {}
/**
* Заглушка
*/
@Override
public Stream write() {return null;}
/**
* Получить публичный ключ отправителя
* @return публичный ключ отправителя
*/
public String getFromPublicKey() {
return fromPublicKey;
}
/**
* Получить публичный ключ получателя
* @return публичный ключ получателя
*/
public String getToPublicKey() {
return toPublicKey;
}
/**
* Получает приватный ключ пользователя
* @return приватный ключ
* @deprecated с версии сервера 1.1 использование приватных ключей
* в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
*/
@Deprecated(since = "1.1", forRemoval = true)
public String getPrivateKey() {
return this.privateKey;
}
/**
* Устанавливает приватный ключ пользователя
* @param privateKey приватный ключ
* @deprecated с версии сервера 1.1 использование приватных ключей
* в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
*/
@Deprecated(since = "1.1", forRemoval = true)
public void setPrivateKey(String privateKey) {
this.privateKey = privateKey;
}
/**
* Устанавливает публичный ключ отправителя
* @param fromPublicKey публичный ключ отправителя
*/
public void setFromPublicKey(String fromPublicKey) {
this.fromPublicKey = fromPublicKey;
}
/**
* Устанавливает публичный ключ получателя
* @param toPublicKey публичный ключ получателя
*/
public void setToPublicKey(String toPublicKey) {
this.toPublicKey = toPublicKey;
}
}

View File

@@ -0,0 +1,89 @@
package com.rosetta.im.service.dispatch;
import java.util.List;
import com.rosetta.im.Failures;
import com.rosetta.im.client.ClientManager;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.database.repository.BufferRepository;
import com.rosetta.im.database.repository.GroupRepository;
import com.rosetta.im.packet.base.PacketBaseDialog;
import com.rosetta.im.service.services.BufferService;
import io.orprotocol.ProtocolException;
import io.orprotocol.client.Client;
import io.orprotocol.packet.PacketManager;
/**
* Диспетчер сообщений, который отвечает за отправку сообщений получателям, а так же за сохранение сообщений в буфер для офлайн получателей и для синхронизации
* Такой диспетчер нужен для того, чтобы не загромождать логику обработчиков сообщений, а так же для того, чтобы
* централизовать логику отправки сообщений и сохранения их в буфер
* Например, при отправке группового сообщения, диспетчер сам достает участников группы и
* отправляет сообщение каждому участнику, а так же сохраняет сообщение в буфер для каждого участника, который офлайн
*/
public class MessageDispatcher {
private final GroupRepository groupRepository = new GroupRepository();
private final ClientManager clientManager;
private final BufferRepository bufferRepository = new BufferRepository();
private final BufferService bufferService;
public MessageDispatcher(ClientManager clientManager, PacketManager packetManager) {
this.clientManager = clientManager;
this.bufferService = new BufferService(bufferRepository, packetManager);
}
/**
* Отправляет групповое сообщение всем участникам группы, кроме отправителя
* @param packet пакет с групповым сообщением
*/
public void sendGroup(PacketBaseDialog packet, Client client, ECIAuthentificate eciAuthentificate) throws ProtocolException {
String toPublicKey = packet.getToPublicKey();
List<String> groupMembersPublicKeys = this.groupRepository.findGroupMembers(toPublicKey.replace("#group:", ""));
if(groupMembersPublicKeys.isEmpty()){
/**
* Если группа не найдена или в группе нет участников, то в такую отправить
* сообщение нельзя
*/
client.disconnect(Failures.DATA_MISSMATCH);
return;
}
if(!groupMembersPublicKeys.contains(eciAuthentificate.getPublicKey())){
/**
* Если отправитель не является участником группы, то он не может отправлять
* сообщения в эту группу
*/
client.disconnect(Failures.USER_NOT_IN_GROUP);
return;
}
/**
* Отправляем всем участникам группы, кроме отправителя этот пакет, попутно не забывая проверить, а не один ли он в группе
*/
groupMembersPublicKeys.remove(eciAuthentificate.getPublicKey());
if(groupMembersPublicKeys.isEmpty()){
/**
* Если отправитель был единственным участником группы, то отправлять сообщение некуда,
* не кикаем пользователя
*/
return;
}
this.clientManager.sendPacketToPK(groupMembersPublicKeys, packet);
//TODO: Сохранить сообщение в буфер для группы, чтобы группы тоже синхронизировались
}
/**
* Отправляет личное сообщение получателю
* @param packet пакет с личным сообщением
*/
public void sendPeer(PacketBaseDialog packet, Client client) throws ProtocolException {
String fromPublicKey = packet.getFromPublicKey();
String toPublicKey = packet.getToPublicKey();
this.clientManager.sendPacketToPK(toPublicKey, packet);
/**
* Сохраняем сообщение в буфер на случай если получатель офлайн, или нам нужна будет синхронизация сообщений для получателя
*/
this.bufferService.pushPacketToBuffer(fromPublicKey, toPublicKey, packet);
}
}