Высокопроизводительная индексация клиентов внутри протокола

This commit is contained in:
RoyceDa
2026-02-04 06:01:12 +02:00
parent 6a5c01bf25
commit cd1e6e6b14
5 changed files with 301 additions and 16 deletions

View File

@@ -0,0 +1,44 @@
package com.rosetta.im.client;
import java.util.HashSet;
import com.rosetta.im.client.tags.ECIAuthentificate;
import io.orprotocol.Server;
import io.orprotocol.client.Client;
import io.orprotocol.index.ClientIndexer;
/**
* Менеджер клиентов
*/
public class ClientManager {
private Server server;
private ClientIndexer clientIndexer;
public ClientManager(Server server) {
this.server = server;
this.clientIndexer = server.getClientIndexer();
}
public Server getServer() {
return this.server;
}
public boolean isClientConnected(String publicKey) {
HashSet<Client> clients = this.clientIndexer.getClients(ECIAuthentificate.class, "publicKey", publicKey);
if(clients.size() > 0){
/**
* Есть клиенты с таким публичным ключом
*/
return true;
}
/**
* Нет клиентов с таким ключом
*/
return false;
}
}

View File

@@ -13,6 +13,7 @@ import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import io.orprotocol.client.Client;
import io.orprotocol.index.ClientIndexer;
import io.orprotocol.lock.ThreadLocker;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketExecutor;
@@ -25,6 +26,7 @@ public class Server extends WebSocketServer {
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private ServerListener listener;
private ThreadLocker threadLocker = new ThreadLocker();
private ClientIndexer clientIndexer = new ClientIndexer();
/**
* Конструктор сервера
@@ -52,11 +54,16 @@ public class Server extends WebSocketServer {
}
@Override
public void onClose(WebSocket socket, int arg1, String arg2, boolean arg3) {
public void onClose(WebSocket socket, int reasonCode, String arg2, boolean arg3) {
if(this.listener == null){
return;
}
this.listener.onClientDisconnect(this, socket.getAttachment());
Client client = socket.getAttachment();
this.listener.onClientDisconnect(this, client);
/**
* Удаляем клиента из индексации (потому что он вышел)
*/
this.clientIndexer.removeClientFromIndex(client);
}
@Override
@@ -120,7 +127,12 @@ public class Server extends WebSocketServer {
packet.read(stream);
/**
* Получаем обработчик пакета и вызываем его метод обработки.
*
* @SuppressWarnings("rawtypes") используется потому что пакет это generic класс
* и ему нельзя без указания. Но так как тип нам точно известен просто
* убираем ошибку компилятора
*/
@SuppressWarnings("rawtypes")
PacketExecutor executor = this.packetManager.getExecutors().get(packetId);
executor.settings = this.settings;
if(listener != null && !listener.onPacketReceived(this, client, packet)) {
@@ -160,7 +172,7 @@ public class Server extends WebSocketServer {
* Передаем интервал heartbeat из настроек сервера.
* Если клиент не отправляет heartbeat в указанный интервал, его можно отключить.
*/
Client client = new Client(socket, this.settings.heartbeatInterval, this.packetManager);
Client client = new Client(socket, this.settings.heartbeatInterval, this);
socket.setAttachment(client);
if(this.listener == null){
return;
@@ -199,6 +211,22 @@ public class Server extends WebSocketServer {
return clients;
}
/**
* Получает менеджер пакетов сервера (где хранятся зарегистрированные пакеты и обработчики)
* @return PacketManager
*/
public PacketManager getPacketManager() {
return this.packetManager;
}
/**
* Получить индексатор сервера, нужен для быстрого поиска клиентов по индексам
* @return ClientIndexer
*/
public ClientIndexer getClientIndexer() {
return this.clientIndexer;
}
/**
* Планировщик для проверки активности клиентов.
* Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки.

View File

@@ -6,8 +6,10 @@ import org.java_websocket.WebSocket;
import io.orprotocol.BaseFailures;
import io.orprotocol.ProtocolException;
import io.orprotocol.Server;
import io.orprotocol.ServerFailures;
import io.orprotocol.Stream;
import io.orprotocol.index.ClientIndexer;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketManager;
import io.orprotocol.util.StringUtil;
@@ -32,6 +34,7 @@ public class Client {
*/
private volatile long lastHeartbeatTime;
private ClientIndexer clientIndexer;
private PacketManager packetManager;
/**
@@ -40,13 +43,14 @@ public class Client {
* @param socket Веб-сокет клиента.
*
*/
public Client(WebSocket socket, long heartbeatInterval, PacketManager packetManager) {
public Client(WebSocket socket, long heartbeatInterval, Server server) {
this.socket = socket;
this.clientId = StringUtil.randomString(32);
this.eciTags = new HashMap<Class<? extends ECITag>, ECITag>();
this.heartbeatInterval = heartbeatInterval;
this.lastHeartbeatTime = System.currentTimeMillis();
this.packetManager = packetManager;
this.clientIndexer = server.getClientIndexer();
this.packetManager = server.getPacketManager();
}
/**
@@ -96,16 +100,18 @@ public class Client {
* @param key Ключ данных.
* @param value Значение данных.
*/
public <T extends ECITag> void addTag(T eciTag) {
this.eciTags.put(eciTag.getClass(), eciTag);
}
/**
* Устанавливает данные клиента.
* @param data Данные клиента.
*/
public void setTags(HashMap<Class<? extends ECITag>, ECITag> eciTags) {
this.eciTags = eciTags;
public <T extends ECITag> void addTag(Class<T> tagClass, T eciTag) {
if (eciTag == null) {
this.eciTags.remove(tagClass);
if (this.clientIndexer != null) {
this.clientIndexer.removeTagIndex(this, tagClass);
}
} else {
this.eciTags.put(tagClass, eciTag);
if (this.clientIndexer != null) {
this.clientIndexer.indexTag(this, tagClass, eciTag);
}
}
}
/**
@@ -174,4 +180,22 @@ public class Client {
this.socket.send(stream.getBuffer());
}
/**
* Проверяем схожесть двух Client
* @param client клиент
* @return true если это один и тот же клиент, false если нет
*/
public boolean equals(Client client) {
if(client == null){
return false;
}
if(!(client instanceof Client)){
return false;
}
if(!client.getClientId().equals(this.clientId)){
return false;
}
return true;
}
}

View File

@@ -1,8 +1,16 @@
package io.orprotocol.client;
import java.util.Map;
/**
* Embedded Client Information Tag.
*
* Используется для хранения дополнительной информации о клиенте.
*/
public abstract class ECITag {}
public interface ECITag {
default Map<String, Object> getIndex() {
return null;
}
}

View File

@@ -0,0 +1,181 @@
package io.orprotocol.index;
import java.util.HashSet;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import io.orprotocol.client.Client;
import io.orprotocol.client.ECITag;
/**
* Представляет собой производительный индекс для клиентов по ECI тегам
*/
public class ClientIndexer {
/**
* Индексы для быстрого поиска клиентов по различным полям.
* Ключ первого уровня - класс тега (ECITag).
* Ключ второго уровня - имя поля для индексации.
* Ключ третьего уровня - значение поля.
* Значение - клиент.
* Структура: tagClass -> indexName -> indexValue -> Client
*
*
* В качестве клиента используем HashSet потому что он очень хорош для операции contains,
* в отличие от ArrayList, который при поиске перебирает элементы один за другим,
* HashSet использует хэш таблицу и находит нужный объект почти
* мгновенно, независимо от того, 10 там элементов или миллион.
*/
private final Map<Class<?>, Map<String, Map<Object, HashSet<Client>>>> indices
= new ConcurrentHashMap<>();
public <T extends ECITag> void indexTag(Client client, Class<T> tagClass, T tag) {
if(client == null || tagClass == null) {
return;
}
Map<String, Object> needToIndex = tag.getIndex();
if(needToIndex == null || needToIndex.isEmpty()){
/**
* Индексировать нечего, getIndex не реализован в ECI теге
*/
return;
}
/**
* Сначала удаляем этот тег у клиента
* (может быть только один набор ключей по тегу у одного клиента)
*/
this.removeTagIndex(client, tagClass);
/**
* Инициализируем индексы для этого класса тега
* ВАЖНО! computeIfAbsent используеьтся потому, что нам нужно либо
* положить значение в indices либо вернуть актуальное значение оттуда.
* Если использовать putIfAbsent, то он вернет null если значение там уже есть,
* что не подходит
*/
Map<String, Map<Object, HashSet<Client>>> tagIndices = this.indices.computeIfAbsent(
tagClass,
k -> new ConcurrentHashMap<>());
/**
* Проходим по всем значениям которые нужно проиндексировать из тега
* (возвращает getIndex у конкретного ECI тега)
*/
for (Map.Entry<String, Object> entry : needToIndex.entrySet()) {
String indexName = entry.getKey();
Object indexValue = entry.getValue();
if (indexValue == null || indexName == null) {
/**
* Если значение или ключ не определен пропускаем,
* иначе в индекс попадает мусор
*/
continue;
}
/**
* Инициализируем имя индекса, почему compute, а не put - написано выше
*/
Map<Object, HashSet<Client>> index = tagIndices.computeIfAbsent(
indexName,
k -> new ConcurrentHashMap<>()
);
/**
* Инициализируем значение индекса, потому что может быть несколько клиентов
* с одинковыми индексами, хотя такого лучше избегать, желательно чтобы индексы
* были уникальными, тогда обработка будет быстрее всего
*/
HashSet<Client> clients = index.computeIfAbsent(indexValue, k -> new HashSet<>());
/**
* Добавляем клиента в инициализиованный индекс
*/
clients.add(client);
}
}
/**
* Удаляет клиента из индекса тега
* @internal
*/
public <T extends ECITag> void removeTagIndex(Client client, Class<T> tagClass) {
Map<String, Map<Object, HashSet<Client>>> tagIndices = indices.get(tagClass);
if (tagIndices == null) {
/**
* Индекса и так не было, удалять нечего
*/
return;
}
/**
* Удаляем все ключи indexName по tagClass если Client == client
*/
for (Map<Object, HashSet<Client>> index : tagIndices.values()) {
/**
* contains всегда использует переопределенный equals, по этому
* обьекты клиентов сравниваются нормально
*/
for(HashSet<Client> clients : index.values()){
if(!clients.contains(client)){
continue;
}
clients.removeIf(c -> c.equals(client));
}
}
}
/**
* Удаляет весь индекс для клтента, вызывается сервером при отключении клиента
* @internal
*/
public void removeClientFromIndex(Client client) {
for(Map<String, Map<Object, HashSet<Client>>> tagIndices : this.indices.values()){
for(Map<Object, HashSet<Client>> index : tagIndices.values()){
for(HashSet<Client> clients : index.values()){
/**
* Этот тройной цикл не такой страшный, так как мы всего лишь
* проходим по всем тегам (их немного), дальше идем по всем значениям в тегах
* (их тоже немного) и дальше используем быстрый contains у HashSet,
* для этого он и был нужен
*/
if(!clients.contains(client)){
continue;
}
clients.removeIf(c -> c.equals(client));
}
}
}
}
/**
* Получить список клиентов по тегу, полю тега и его значению
* @param <T> тип тега
* @param tagClass класс тега
* @param indexName поле в теге
* @param indexValue значение в теге
* @return список клиентов с заданными значениями
*/
public <T extends ECITag> HashSet<Client> getClients(Class<T> tagClass, String indexName, Object indexValue) {
if(indexName == null || indexValue == null){
return null;
}
/**
* Получение по индексу простое, так как каждое из заданных значений и есть ключ
*/
Map<String, Map<Object, HashSet<Client>>> tagIndices = indices.get(tagClass);
if (tagIndices == null) {
return null;
}
Map<Object, HashSet<Client>> index = tagIndices.get(indexName);
if (index == null) {
return null;
}
return index.get(indexValue);
}
}