From cd1e6e6b145e0bcbaaca6b4a7c6e80e8c0d37c9e Mon Sep 17 00:00:00 2001 From: RoyceDa Date: Wed, 4 Feb 2026 06:01:12 +0200 Subject: [PATCH] =?UTF-8?q?=D0=92=D1=8B=D1=81=D0=BE=D0=BA=D0=BE=D0=BF?= =?UTF-8?q?=D1=80=D0=BE=D0=B8=D0=B7=D0=B2=D0=BE=D0=B4=D0=B8=D1=82=D0=B5?= =?UTF-8?q?=D0=BB=D1=8C=D0=BD=D0=B0=D1=8F=20=D0=B8=D0=BD=D0=B4=D0=B5=D0=BA?= =?UTF-8?q?=D1=81=D0=B0=D1=86=D0=B8=D1=8F=20=D0=BA=D0=BB=D0=B8=D0=B5=D0=BD?= =?UTF-8?q?=D1=82=D0=BE=D0=B2=20=D0=B2=D0=BD=D1=83=D1=82=D1=80=D0=B8=20?= =?UTF-8?q?=D0=BF=D1=80=D0=BE=D1=82=D0=BE=D0=BA=D0=BE=D0=BB=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../com/rosetta/im/client/ClientManager.java | 44 +++++ src/main/java/io/orprotocol/Server.java | 34 +++- .../java/io/orprotocol/client/Client.java | 48 +++-- .../java/io/orprotocol/client/ECITag.java | 10 +- .../io/orprotocol/index/ClientIndexer.java | 181 ++++++++++++++++++ 5 files changed, 301 insertions(+), 16 deletions(-) create mode 100644 src/main/java/com/rosetta/im/client/ClientManager.java create mode 100644 src/main/java/io/orprotocol/index/ClientIndexer.java diff --git a/src/main/java/com/rosetta/im/client/ClientManager.java b/src/main/java/com/rosetta/im/client/ClientManager.java new file mode 100644 index 0000000..20672fa --- /dev/null +++ b/src/main/java/com/rosetta/im/client/ClientManager.java @@ -0,0 +1,44 @@ +package com.rosetta.im.client; + +import java.util.HashSet; + +import com.rosetta.im.client.tags.ECIAuthentificate; + +import io.orprotocol.Server; +import io.orprotocol.client.Client; +import io.orprotocol.index.ClientIndexer; + +/** + * Менеджер клиентов + */ +public class ClientManager { + + private Server server; + private ClientIndexer clientIndexer; + + public ClientManager(Server server) { + this.server = server; + this.clientIndexer = server.getClientIndexer(); + } + + public Server getServer() { + return this.server; + } + + public boolean isClientConnected(String publicKey) { + HashSet clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey); + if(clients.size() > 0){ + /** + * Есть клиенты с таким публичным ключом + */ + return true; + } + /** + * Нет клиентов с таким ключом + */ + return false; + } + + + +} diff --git a/src/main/java/io/orprotocol/Server.java b/src/main/java/io/orprotocol/Server.java index ed2e4b3..c8e995f 100644 --- a/src/main/java/io/orprotocol/Server.java +++ b/src/main/java/io/orprotocol/Server.java @@ -13,6 +13,7 @@ import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; import io.orprotocol.client.Client; +import io.orprotocol.index.ClientIndexer; import io.orprotocol.lock.ThreadLocker; import io.orprotocol.packet.Packet; import io.orprotocol.packet.PacketExecutor; @@ -25,6 +26,7 @@ public class Server extends WebSocketServer { private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ServerListener listener; private ThreadLocker threadLocker = new ThreadLocker(); + private ClientIndexer clientIndexer = new ClientIndexer(); /** * Конструктор сервера @@ -52,11 +54,16 @@ public class Server extends WebSocketServer { } @Override - public void onClose(WebSocket socket, int arg1, String arg2, boolean arg3) { + public void onClose(WebSocket socket, int reasonCode, String arg2, boolean arg3) { if(this.listener == null){ return; } - this.listener.onClientDisconnect(this, socket.getAttachment()); + Client client = socket.getAttachment(); + this.listener.onClientDisconnect(this, client); + /** + * Удаляем клиента из индексации (потому что он вышел) + */ + this.clientIndexer.removeClientFromIndex(client); } @Override @@ -120,7 +127,12 @@ public class Server extends WebSocketServer { packet.read(stream); /** * Получаем обработчик пакета и вызываем его метод обработки. + * + * @SuppressWarnings("rawtypes") используется потому что пакет это generic класс + * и ему нельзя без указания. Но так как тип нам точно известен просто + * убираем ошибку компилятора */ + @SuppressWarnings("rawtypes") PacketExecutor executor = this.packetManager.getExecutors().get(packetId); executor.settings = this.settings; if(listener != null && !listener.onPacketReceived(this, client, packet)) { @@ -160,7 +172,7 @@ public class Server extends WebSocketServer { * Передаем интервал heartbeat из настроек сервера. * Если клиент не отправляет heartbeat в указанный интервал, его можно отключить. */ - Client client = new Client(socket, this.settings.heartbeatInterval, this.packetManager); + Client client = new Client(socket, this.settings.heartbeatInterval, this); socket.setAttachment(client); if(this.listener == null){ return; @@ -199,6 +211,22 @@ public class Server extends WebSocketServer { return clients; } + /** + * Получает менеджер пакетов сервера (где хранятся зарегистрированные пакеты и обработчики) + * @return PacketManager + */ + public PacketManager getPacketManager() { + return this.packetManager; + } + + /** + * Получить индексатор сервера, нужен для быстрого поиска клиентов по индексам + * @return ClientIndexer + */ + public ClientIndexer getClientIndexer() { + return this.clientIndexer; + } + /** * Планировщик для проверки активности клиентов. * Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки. diff --git a/src/main/java/io/orprotocol/client/Client.java b/src/main/java/io/orprotocol/client/Client.java index 05f6323..a1df794 100644 --- a/src/main/java/io/orprotocol/client/Client.java +++ b/src/main/java/io/orprotocol/client/Client.java @@ -6,8 +6,10 @@ import org.java_websocket.WebSocket; import io.orprotocol.BaseFailures; import io.orprotocol.ProtocolException; +import io.orprotocol.Server; import io.orprotocol.ServerFailures; import io.orprotocol.Stream; +import io.orprotocol.index.ClientIndexer; import io.orprotocol.packet.Packet; import io.orprotocol.packet.PacketManager; import io.orprotocol.util.StringUtil; @@ -32,6 +34,7 @@ public class Client { */ private volatile long lastHeartbeatTime; + private ClientIndexer clientIndexer; private PacketManager packetManager; /** @@ -40,13 +43,14 @@ public class Client { * @param socket Веб-сокет клиента. * */ - public Client(WebSocket socket, long heartbeatInterval, PacketManager packetManager) { + public Client(WebSocket socket, long heartbeatInterval, Server server) { this.socket = socket; this.clientId = StringUtil.randomString(32); this.eciTags = new HashMap, ECITag>(); this.heartbeatInterval = heartbeatInterval; this.lastHeartbeatTime = System.currentTimeMillis(); - this.packetManager = packetManager; + this.clientIndexer = server.getClientIndexer(); + this.packetManager = server.getPacketManager(); } /** @@ -96,16 +100,18 @@ public class Client { * @param key Ключ данных. * @param value Значение данных. */ - public void addTag(T eciTag) { - this.eciTags.put(eciTag.getClass(), eciTag); - } - - /** - * Устанавливает данные клиента. - * @param data Данные клиента. - */ - public void setTags(HashMap, ECITag> eciTags) { - this.eciTags = eciTags; + public void addTag(Class tagClass, T eciTag) { + if (eciTag == null) { + this.eciTags.remove(tagClass); + if (this.clientIndexer != null) { + this.clientIndexer.removeTagIndex(this, tagClass); + } + } else { + this.eciTags.put(tagClass, eciTag); + if (this.clientIndexer != null) { + this.clientIndexer.indexTag(this, tagClass, eciTag); + } + } } /** @@ -174,4 +180,22 @@ public class Client { this.socket.send(stream.getBuffer()); } + /** + * Проверяем схожесть двух Client + * @param client клиент + * @return true если это один и тот же клиент, false если нет + */ + public boolean equals(Client client) { + if(client == null){ + return false; + } + if(!(client instanceof Client)){ + return false; + } + if(!client.getClientId().equals(this.clientId)){ + return false; + } + return true; + } + } diff --git a/src/main/java/io/orprotocol/client/ECITag.java b/src/main/java/io/orprotocol/client/ECITag.java index b8365c1..7afa8dd 100644 --- a/src/main/java/io/orprotocol/client/ECITag.java +++ b/src/main/java/io/orprotocol/client/ECITag.java @@ -1,8 +1,16 @@ package io.orprotocol.client; +import java.util.Map; + /** * Embedded Client Information Tag. * * Используется для хранения дополнительной информации о клиенте. */ -public abstract class ECITag {} +public interface ECITag { + + default Map getIndex() { + return null; + } + +} diff --git a/src/main/java/io/orprotocol/index/ClientIndexer.java b/src/main/java/io/orprotocol/index/ClientIndexer.java new file mode 100644 index 0000000..2e3a7df --- /dev/null +++ b/src/main/java/io/orprotocol/index/ClientIndexer.java @@ -0,0 +1,181 @@ +package io.orprotocol.index; + +import java.util.HashSet; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import io.orprotocol.client.Client; +import io.orprotocol.client.ECITag; + +/** + * Представляет собой производительный индекс для клиентов по ECI тегам + */ +public class ClientIndexer { + + /** + * Индексы для быстрого поиска клиентов по различным полям. + * Ключ первого уровня - класс тега (ECITag). + * Ключ второго уровня - имя поля для индексации. + * Ключ третьего уровня - значение поля. + * Значение - клиент. + * Структура: tagClass -> indexName -> indexValue -> Client + * + * + * В качестве клиента используем HashSet потому что он очень хорош для операции contains, + * в отличие от ArrayList, который при поиске перебирает элементы один за другим, + * HashSet использует хэш таблицу и находит нужный объект почти + * мгновенно, независимо от того, 10 там элементов или миллион. + */ + private final Map, Map>>> indices + = new ConcurrentHashMap<>(); + + public void indexTag(Client client, Class tagClass, T tag) { + if(client == null || tagClass == null) { + return; + } + Map needToIndex = tag.getIndex(); + if(needToIndex == null || needToIndex.isEmpty()){ + /** + * Индексировать нечего, getIndex не реализован в ECI теге + */ + return; + } + /** + * Сначала удаляем этот тег у клиента + * (может быть только один набор ключей по тегу у одного клиента) + */ + this.removeTagIndex(client, tagClass); + + /** + * Инициализируем индексы для этого класса тега + * ВАЖНО! computeIfAbsent используеьтся потому, что нам нужно либо + * положить значение в indices либо вернуть актуальное значение оттуда. + * Если использовать putIfAbsent, то он вернет null если значение там уже есть, + * что не подходит + */ + Map>> tagIndices = this.indices.computeIfAbsent( + tagClass, + k -> new ConcurrentHashMap<>()); + + /** + * Проходим по всем значениям которые нужно проиндексировать из тега + * (возвращает getIndex у конкретного ECI тега) + */ + for (Map.Entry entry : needToIndex.entrySet()) { + String indexName = entry.getKey(); + Object indexValue = entry.getValue(); + + if (indexValue == null || indexName == null) { + /** + * Если значение или ключ не определен пропускаем, + * иначе в индекс попадает мусор + */ + continue; + } + + /** + * Инициализируем имя индекса, почему compute, а не put - написано выше + */ + Map> index = tagIndices.computeIfAbsent( + indexName, + k -> new ConcurrentHashMap<>() + ); + + /** + * Инициализируем значение индекса, потому что может быть несколько клиентов + * с одинковыми индексами, хотя такого лучше избегать, желательно чтобы индексы + * были уникальными, тогда обработка будет быстрее всего + */ + HashSet clients = index.computeIfAbsent(indexValue, k -> new HashSet<>()); + /** + * Добавляем клиента в инициализиованный индекс + */ + clients.add(client); + } + } + + + /** + * Удаляет клиента из индекса тега + * @internal + */ + public void removeTagIndex(Client client, Class tagClass) { + Map>> tagIndices = indices.get(tagClass); + if (tagIndices == null) { + /** + * Индекса и так не было, удалять нечего + */ + return; + } + + /** + * Удаляем все ключи indexName по tagClass если Client == client + */ + for (Map> index : tagIndices.values()) { + /** + * contains всегда использует переопределенный equals, по этому + * обьекты клиентов сравниваются нормально + */ + for(HashSet clients : index.values()){ + if(!clients.contains(client)){ + continue; + } + clients.removeIf(c -> c.equals(client)); + } + } + } + + /** + * Удаляет весь индекс для клтента, вызывается сервером при отключении клиента + * @internal + */ + public void removeClientFromIndex(Client client) { + for(Map>> tagIndices : this.indices.values()){ + for(Map> index : tagIndices.values()){ + for(HashSet clients : index.values()){ + /** + * Этот тройной цикл не такой страшный, так как мы всего лишь + * проходим по всем тегам (их немного), дальше идем по всем значениям в тегах + * (их тоже немного) и дальше используем быстрый contains у HashSet, + * для этого он и был нужен + */ + if(!clients.contains(client)){ + continue; + } + clients.removeIf(c -> c.equals(client)); + } + } + } + } + + /** + * Получить список клиентов по тегу, полю тега и его значению + * @param тип тега + * @param tagClass класс тега + * @param indexName поле в теге + * @param indexValue значение в теге + * @return список клиентов с заданными значениями + */ + public HashSet getClients(Class tagClass, String indexName, Object indexValue) { + if(indexName == null || indexValue == null){ + return null; + } + + /** + * Получение по индексу простое, так как каждое из заданных значений и есть ключ + */ + Map>> tagIndices = indices.get(tagClass); + if (tagIndices == null) { + return null; + } + + Map> index = tagIndices.get(indexName); + if (index == null) { + return null; + } + + return index.get(indexValue); + } + + +}