diff --git a/pom.xml b/pom.xml
index c84b6a0..90693f9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -13,8 +13,6 @@
17
-
-
org.java-websocket
diff --git a/src/main/java/com/rosetta/im/Boot.java b/src/main/java/com/rosetta/im/Boot.java
index 1bf8ca6..39d0c52 100644
--- a/src/main/java/com/rosetta/im/Boot.java
+++ b/src/main/java/com/rosetta/im/Boot.java
@@ -1,11 +1,15 @@
package com.rosetta.im;
import com.rosetta.im.client.ClientManager;
+import com.rosetta.im.client.OnlineManager;
import com.rosetta.im.event.EventManager;
import com.rosetta.im.executors.Executor0Handshake;
import com.rosetta.im.executors.Executor1UserInfo;
import com.rosetta.im.executors.Executor3Search;
+import com.rosetta.im.executors.Executor4OnlineState;
import com.rosetta.im.listeners.HandshakeCompleteListener;
+import com.rosetta.im.listeners.OnlineStatusDisconnectListener;
+import com.rosetta.im.listeners.OnlineStatusHandshakeCompleteListener;
import com.rosetta.im.listeners.ServerStopListener;
import com.rosetta.im.logger.Logger;
import com.rosetta.im.logger.enums.Color;
@@ -14,6 +18,8 @@ import com.rosetta.im.packet.Packet0Handshake;
import com.rosetta.im.packet.Packet1UserInfo;
import com.rosetta.im.packet.Packet2Result;
import com.rosetta.im.packet.Packet3Search;
+import com.rosetta.im.packet.Packet4OnlineSubscribe;
+import com.rosetta.im.packet.Packet5OnlineState;
import io.orprotocol.Server;
import io.orprotocol.Settings;
@@ -31,10 +37,12 @@ public class Boot {
private Server server;
private ServerAdapter serverAdapter;
private ClientManager clientManager;
+ private OnlineManager onlineManager;
public Boot() {
this.packetManager = new PacketManager();
this.eventManager = new EventManager();
+ this.onlineManager = new OnlineManager();
this.logger = new Logger(LogLevel.INFO);
this.serverAdapter = new ServerAdapter(this.eventManager);
this.server = new Server(new Settings(
@@ -92,6 +100,8 @@ public class Boot {
private void registerAllEvents() {
this.eventManager.registerListener(new ServerStopListener(this.logger));
this.eventManager.registerListener(new HandshakeCompleteListener());
+ this.eventManager.registerListener(new OnlineStatusHandshakeCompleteListener(this.onlineManager));
+ this.eventManager.registerListener(new OnlineStatusDisconnectListener(this.onlineManager));
}
private void registerAllPackets() {
@@ -99,12 +109,15 @@ public class Boot {
this.packetManager.registerPacket(1, Packet1UserInfo.class);
this.packetManager.registerPacket(2, Packet2Result.class);
this.packetManager.registerPacket(3, Packet3Search.class);
+ this.packetManager.registerPacket(4, Packet4OnlineSubscribe.class);
+ this.packetManager.registerPacket(5, Packet5OnlineState.class);
}
private void registerAllExecutors() {
this.packetManager.registerExecutor(0, new Executor0Handshake(this.eventManager));
this.packetManager.registerExecutor(1, new Executor1UserInfo());
this.packetManager.registerExecutor(3, new Executor3Search(this.clientManager));
+ this.packetManager.registerExecutor(4, new Executor4OnlineState(this.onlineManager));
}
private void printBootMessage() {
diff --git a/src/main/java/com/rosetta/im/Failures.java b/src/main/java/com/rosetta/im/Failures.java
index 7569b5a..f867872 100644
--- a/src/main/java/com/rosetta/im/Failures.java
+++ b/src/main/java/com/rosetta/im/Failures.java
@@ -15,7 +15,11 @@ public enum Failures implements BaseFailures {
/**
* Неподдерживаемый протокол
*/
- UNSUPPORTED_PROTOCOL(3008);
+ UNSUPPORTED_PROTOCOL(3008),
+ /**
+ * Слишком много подписок на онлайн статусы
+ */
+ TOO_MANY_ONLINE_SUBSCRIPTIONS(3010);
private final int code;
diff --git a/src/main/java/com/rosetta/im/client/OnlineManager.java b/src/main/java/com/rosetta/im/client/OnlineManager.java
new file mode 100644
index 0000000..f45e08a
--- /dev/null
+++ b/src/main/java/com/rosetta/im/client/OnlineManager.java
@@ -0,0 +1,73 @@
+package com.rosetta.im.client;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+
+import com.rosetta.im.client.tags.ECIAuthentificate;
+
+import io.orprotocol.client.Client;
+
+/**
+ * Отвечает за подписки на онлайн статус пользователей
+ * Каждый пользователь может подписаться на онлайн статус других пользователей
+ * и получать обновления об их статусе в реальном времени
+ */
+public class OnlineManager {
+
+ private HashMap> onlineSubscriptions;
+
+ public OnlineManager() {
+ this.onlineSubscriptions = new HashMap<>();
+ }
+
+ /**
+ * Подписывает клиента на онлайн статус другого пользователя по его публичному ключу
+ * @param client клиент, который подписывается
+ * @param targetPublicKey публичный ключ пользователя, на которого подписываются
+ */
+ public void subscribe(Client client, String targetPublicKey) {
+ this.onlineSubscriptions.computeIfAbsent(client, k -> new HashSet<>())
+ .add(targetPublicKey);
+ }
+
+ /**
+ * Отписывает клиента от онлайн статуса другого пользователя по его публичному ключу, например при отключении клиента
+ * @param client клиент, который отписывается от всех (отключается)
+ */
+ public void unsubscribeAll(Client client) {
+ this.onlineSubscriptions.remove(client);
+ }
+
+ /**
+ * Получает список клиентов, которые подписаны на онлайн статус пользователя с указанным публичным ключом
+ * @param targetPublicKey публичный ключ пользователя, чью онлайн статус интересует
+ * @return список клиентов, подписанных на этот публичный ключ
+ */
+ public List getSubscribers(String targetPublicKey) {
+ List subscribers = new java.util.ArrayList<>();
+ for (var entry : this.onlineSubscriptions.entrySet()) {
+ Client client = entry.getKey();
+ HashSet subscribedKeys = entry.getValue();
+ if (subscribedKeys.contains(targetPublicKey)) {
+ subscribers.add(client);
+ }
+ }
+ return subscribers;
+ }
+
+ /**
+ * Получает список клиентов, которые подписаны на онлайн статус пользователя, представленного данным клиентом
+ * @param client клиент, представляющий пользователя, чью онлайн статус интересует
+ * @return список клиентов, подписанных на этого пользователя
+ */
+ public List getSubscribers(Client client) {
+ ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
+ if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
+ return new ArrayList<>();
+ }
+ String publicKey = eciAuthentificate.getPublicKey();
+ return this.getSubscribers(publicKey);
+ }
+}
diff --git a/src/main/java/com/rosetta/im/executors/Executor3Search.java b/src/main/java/com/rosetta/im/executors/Executor3Search.java
index 20e1533..9242ae0 100644
--- a/src/main/java/com/rosetta/im/executors/Executor3Search.java
+++ b/src/main/java/com/rosetta/im/executors/Executor3Search.java
@@ -50,7 +50,7 @@ public class Executor3Search extends PacketExecutor {
List usersFindedList = userService.searchUsers(search, 7);
Packet3Search response = new Packet3Search();
response.setSearch("");
-
+
response.setPrivateKey("");
List searchInfos = new ArrayList<>();
diff --git a/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java b/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java
new file mode 100644
index 0000000..0d126ef
--- /dev/null
+++ b/src/main/java/com/rosetta/im/executors/Executor4OnlineState.java
@@ -0,0 +1,54 @@
+package com.rosetta.im.executors;
+
+import java.util.List;
+
+import com.rosetta.im.Failures;
+import com.rosetta.im.client.OnlineManager;
+import com.rosetta.im.client.tags.ECIAuthentificate;
+import com.rosetta.im.packet.Packet4OnlineSubscribe;
+
+import io.orprotocol.ProtocolException;
+import io.orprotocol.client.Client;
+import io.orprotocol.packet.PacketExecutor;
+
+public class Executor4OnlineState extends PacketExecutor {
+
+ private final OnlineManager onlineManager;
+
+ public Executor4OnlineState(OnlineManager onlineManager) {
+ this.onlineManager = onlineManager;
+ }
+
+ @Override
+ public void onPacketReceived(Packet4OnlineSubscribe packet, Client client) throws Exception, ProtocolException {
+ ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
+ if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
+ /**
+ * Клиент не авторизован
+ */
+ client.disconnect(Failures.HANDSHAKE_NOT_COMPLETED);
+ return;
+ }
+ /**
+ * Устанавливаем подписку на онлайн статус указанных публичных ключей
+ */
+ List publicKeys = packet.getPublicKeys();
+ if(publicKeys == null || publicKeys.isEmpty()) {
+ /**
+ * Пустой список, ничего не делаем
+ */
+ return;
+ }
+ if(publicKeys.size() > 20) {
+ /**
+ * Слишком много подписок за один раз
+ */
+ client.disconnect(Failures.TOO_MANY_ONLINE_SUBSCRIPTIONS);
+ return;
+ }
+ for (String targetPublicKey : publicKeys) {
+ this.onlineManager.subscribe(client, targetPublicKey);
+ }
+ }
+
+}
diff --git a/src/main/java/com/rosetta/im/listeners/OnlineStatusDisconnectListener.java b/src/main/java/com/rosetta/im/listeners/OnlineStatusDisconnectListener.java
new file mode 100644
index 0000000..e47e42d
--- /dev/null
+++ b/src/main/java/com/rosetta/im/listeners/OnlineStatusDisconnectListener.java
@@ -0,0 +1,61 @@
+package com.rosetta.im.listeners;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.rosetta.im.client.OnlineManager;
+import com.rosetta.im.client.tags.ECIAuthentificate;
+import com.rosetta.im.event.EventHandler;
+import com.rosetta.im.event.Listener;
+import com.rosetta.im.event.events.DisconnectEvent;
+import com.rosetta.im.packet.Packet5OnlineState;
+import com.rosetta.im.packet.runtime.NetworkStatus;
+import com.rosetta.im.packet.runtime.PKNetworkStatus;
+
+import io.orprotocol.ProtocolException;
+import io.orprotocol.client.Client;
+
+/**
+ * Слушатель отключения клиента
+ * Нужен для того чтобы обновлять онлайн статус пользоватеей, и уведомлять всех
+ * подписчиков об изменении статуса
+ */
+public class OnlineStatusDisconnectListener implements Listener {
+
+ private OnlineManager onlineManager;
+
+ public OnlineStatusDisconnectListener(OnlineManager onlineManager) {
+ this.onlineManager = onlineManager;
+ }
+
+ @EventHandler
+ public void onClientDisconnect(DisconnectEvent event) throws ProtocolException {
+ Client client = event.getClient();
+ ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
+ if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
+ /**
+ * Клиент не авторизован, ничего не делаем
+ */
+ return;
+ }
+ List subscribers = this.onlineManager.getSubscribers(client);
+ /**
+ * Уведомляем всех подписчиков на его онлайн статус, что он отключился (ушел в оффлайн)
+ */
+ for (Client subscriber : subscribers) {
+ Packet5OnlineState packet = new Packet5OnlineState();
+ List statuses = new ArrayList<>();
+ statuses.add(new PKNetworkStatus(
+ eciAuthentificate.getPublicKey(),
+ NetworkStatus.OFFLINE
+ ));
+ packet.setPkNetworkStatuses(statuses);
+ subscriber.send(packet);
+ }
+ /**
+ * Удаляем все подписки этого клиента, так как он отключился
+ */
+ this.onlineManager.unsubscribeAll(client);
+ }
+
+}
diff --git a/src/main/java/com/rosetta/im/listeners/OnlineStatusHandshakeCompleteListener.java b/src/main/java/com/rosetta/im/listeners/OnlineStatusHandshakeCompleteListener.java
new file mode 100644
index 0000000..139cdd3
--- /dev/null
+++ b/src/main/java/com/rosetta/im/listeners/OnlineStatusHandshakeCompleteListener.java
@@ -0,0 +1,59 @@
+package com.rosetta.im.listeners;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import com.rosetta.im.client.OnlineManager;
+import com.rosetta.im.client.tags.ECIAuthentificate;
+import com.rosetta.im.event.EventHandler;
+import com.rosetta.im.event.Listener;
+import com.rosetta.im.event.events.handshake.HandshakeCompletedEvent;
+import com.rosetta.im.packet.Packet5OnlineState;
+import com.rosetta.im.packet.runtime.NetworkStatus;
+import com.rosetta.im.packet.runtime.PKNetworkStatus;
+
+import io.orprotocol.ProtocolException;
+import io.orprotocol.client.Client;
+
+/**
+ * Слушатель завершения рукопожатия (хэндшейкапа) клиента
+ * Нужен для того чтобы обновлять онлайн статус пользоватеей, и уведомлять всех
+ * подписчиков об изменении статуса
+ */
+public class OnlineStatusHandshakeCompleteListener implements Listener {
+
+ private final OnlineManager onlineManager;
+
+ public OnlineStatusHandshakeCompleteListener(OnlineManager onlineManager) {
+ this.onlineManager = onlineManager;
+ }
+
+ @EventHandler
+ public void onHandshakeComplete(HandshakeCompletedEvent event) throws ProtocolException {
+ Client client = event.getClient();
+ ECIAuthentificate eciAuthentificate = client.getTag(ECIAuthentificate.class);
+ if(eciAuthentificate == null || !eciAuthentificate.hasAuthorized()) {
+ /**
+ * Клиент не авторизован, ничего не делаем, однако такое
+ * не должно происходить, так как событие хэндшейкапа
+ * должно означать что клиент авторизован
+ */
+ return;
+ }
+ List subscribers = this.onlineManager.getSubscribers(client);
+ /**
+ * Уведомляем всех подписчиков на его онлайн статус, что он подключился (стал онлайн)
+ */
+ for (Client subscriber : subscribers) {
+ Packet5OnlineState packet = new Packet5OnlineState();
+ List statuses = new ArrayList<>();
+ statuses.add(new PKNetworkStatus(
+ eciAuthentificate.getPublicKey(),
+ NetworkStatus.ONLINE
+ ));
+ packet.setPkNetworkStatuses(statuses);
+ subscriber.send(packet);
+ }
+ }
+
+}
diff --git a/src/main/java/com/rosetta/im/packet/Packet4OnlineSubscribe.java b/src/main/java/com/rosetta/im/packet/Packet4OnlineSubscribe.java
new file mode 100644
index 0000000..8d639af
--- /dev/null
+++ b/src/main/java/com/rosetta/im/packet/Packet4OnlineSubscribe.java
@@ -0,0 +1,73 @@
+package com.rosetta.im.packet;
+
+import java.util.List;
+
+import io.orprotocol.Stream;
+import io.orprotocol.packet.Packet;
+
+public class Packet4OnlineSubscribe extends Packet {
+
+ private String privateKey;
+ private List publicKeys;
+
+ @Override
+ public void read(Stream stream) {
+ this.privateKey = stream.readString();
+ int publicKeysCount = stream.readInt16();
+ this.publicKeys = new java.util.ArrayList<>();
+ for (int i = 0; i < publicKeysCount; i++) {
+ this.publicKeys.add(stream.readString());
+ }
+ }
+
+ @Override
+ public Stream write() {
+ Stream stream = new Stream();
+ stream.writeInt16(this.packetId);
+ stream.writeString(this.privateKey);
+ stream.writeInt16(this.publicKeys.size());
+ for (String publicKey : this.publicKeys) {
+ stream.writeString(publicKey);
+ }
+ return stream;
+ }
+
+ /**
+ * Получает приватный ключ пользователя
+ * @return приватный ключ
+ * @deprecated с версии сервера 1.1 использование приватных ключей
+ * в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
+ */
+ @Deprecated(since = "1.1", forRemoval = true)
+ public String getPrivateKey() {
+ return this.privateKey;
+ }
+
+ /**
+ * Устанавливает приватный ключ пользователя
+ * @param privateKey приватный ключ
+ * @deprecated с версии сервера 1.1 использование приватных ключей
+ * в протоколе устарело, так как теперь сервер использует Handshake для аутентификации пользователей.
+ */
+ @Deprecated(since = "1.1", forRemoval = true)
+ public void setPrivateKey(String privateKey) {
+ this.privateKey = privateKey;
+ }
+
+ /**
+ * Получает список публичных ключей для подписки на онлайн статус
+ * @return список публичных ключей
+ */
+ public List getPublicKeys() {
+ return this.publicKeys;
+ }
+
+ /**
+ * Устанавливает список публичных ключей для подписки на онлайн статус
+ * @param publicKeys список публичных ключей
+ */
+ public void setPublicKeys(List publicKeys) {
+ this.publicKeys = publicKeys;
+ }
+
+}
diff --git a/src/main/java/com/rosetta/im/packet/Packet5OnlineState.java b/src/main/java/com/rosetta/im/packet/Packet5OnlineState.java
new file mode 100644
index 0000000..9857683
--- /dev/null
+++ b/src/main/java/com/rosetta/im/packet/Packet5OnlineState.java
@@ -0,0 +1,47 @@
+package com.rosetta.im.packet;
+
+import java.util.List;
+
+import com.rosetta.im.packet.runtime.NetworkStatus;
+import com.rosetta.im.packet.runtime.PKNetworkStatus;
+
+import io.orprotocol.Stream;
+import io.orprotocol.packet.Packet;
+
+public class Packet5OnlineState extends Packet {
+
+ private List pkNetworkStatuses;
+
+ @Override
+ public void read(Stream stream) {
+ int count = stream.readInt8();
+ this.pkNetworkStatuses = new java.util.ArrayList<>();
+ for (int i = 0; i < count; i++) {
+ String publicKey = stream.readString();
+ boolean online = stream.readBoolean();
+ PKNetworkStatus status = new PKNetworkStatus(publicKey, NetworkStatus.fromBoolean(online));
+ this.pkNetworkStatuses.add(status);
+ }
+ }
+
+ @Override
+ public Stream write() {
+ Stream stream = new Stream();
+ stream.writeInt16(this.packetId);
+ stream.writeInt8(this.pkNetworkStatuses.size());
+ for (PKNetworkStatus status : this.pkNetworkStatuses) {
+ stream.writeString(status.getPublicKey());
+ stream.writeBoolean(status.getNetworkStatus().toBoolean());
+ }
+ return stream;
+ }
+
+ public List getPkNetworkStatuses() {
+ return pkNetworkStatuses;
+ }
+
+ public void setPkNetworkStatuses(List pkNetworkStatuses) {
+ this.pkNetworkStatuses = pkNetworkStatuses;
+ }
+
+}
diff --git a/src/main/java/com/rosetta/im/packet/runtime/NetworkStatus.java b/src/main/java/com/rosetta/im/packet/runtime/NetworkStatus.java
index 602dfd2..11a4852 100644
--- a/src/main/java/com/rosetta/im/packet/runtime/NetworkStatus.java
+++ b/src/main/java/com/rosetta/im/packet/runtime/NetworkStatus.java
@@ -32,4 +32,8 @@ public enum NetworkStatus {
}
return NetworkStatus.OFFLINE;
}
+
+ public boolean toBoolean() {
+ return this.code == 0;
+ }
}
diff --git a/src/main/java/com/rosetta/im/packet/runtime/PKNetworkStatus.java b/src/main/java/com/rosetta/im/packet/runtime/PKNetworkStatus.java
new file mode 100644
index 0000000..a1c2815
--- /dev/null
+++ b/src/main/java/com/rosetta/im/packet/runtime/PKNetworkStatus.java
@@ -0,0 +1,35 @@
+package com.rosetta.im.packet.runtime;
+
+/**
+ * Сущность для обозначения статуса сети
+ * пользователя по публичному ключу
+ */
+public class PKNetworkStatus {
+
+ public String publicKey;
+ public NetworkStatus networkStatus;
+
+ public PKNetworkStatus() {}
+
+ public PKNetworkStatus(String publicKey, NetworkStatus networkStatus) {
+ this.publicKey = publicKey;
+ this.networkStatus = networkStatus;
+ }
+
+ public String getPublicKey() {
+ return publicKey;
+ }
+
+ public void setPublicKey(String publicKey) {
+ this.publicKey = publicKey;
+ }
+
+ public NetworkStatus getNetworkStatus() {
+ return networkStatus;
+ }
+
+ public void setNetworkStatus(NetworkStatus networkStatus) {
+ this.networkStatus = networkStatus;
+ }
+
+}
diff --git a/src/main/java/io/orprotocol/index/ClientIndexer.java b/src/main/java/io/orprotocol/index/ClientIndexer.java
index 2e3a7df..2a4f70d 100644
--- a/src/main/java/io/orprotocol/index/ClientIndexer.java
+++ b/src/main/java/io/orprotocol/index/ClientIndexer.java
@@ -48,7 +48,7 @@ public class ClientIndexer {
/**
* Инициализируем индексы для этого класса тега
- * ВАЖНО! computeIfAbsent используеьтся потому, что нам нужно либо
+ * ВАЖНО! computeIfAbsent используется потому, что нам нужно либо
* положить значение в indices либо вернуть актуальное значение оттуда.
* Если использовать putIfAbsent, то он вернет null если значение там уже есть,
* что не подходит