From 695ec9c5206f555a62e7716ff18a28e692001da5 Mon Sep 17 00:00:00 2001 From: RoyceDa Date: Tue, 3 Feb 2026 06:53:28 +0200 Subject: [PATCH] =?UTF-8?q?=D0=9E=D0=B1=D1=80=D0=B0=D0=B1=D0=BE=D1=82?= =?UTF-8?q?=D0=BA=D0=B0=20=D0=BE=D1=81=D1=82=D0=B0=D0=BD=D0=BE=D0=B2=D0=BA?= =?UTF-8?q?=D0=B8=20=D1=81=D0=B5=D1=80=D0=B2=D0=B5=D1=80=D0=B0,=20=D1=84?= =?UTF-8?q?=D0=B8=D0=BA=D1=81=20EventManager?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../java/com/rosetta/im/ContextBuilder.java | 2 + .../com/rosetta/im/event/EventManager.java | 15 ++++--- .../im/listeners/ServerStopListener.java | 41 +++++++++++++++++++ .../im/service/services/DeviceService.java | 13 ++++++ src/main/java/io/orprotocol/Server.java | 30 ++++++++++++++ 5 files changed, 93 insertions(+), 8 deletions(-) create mode 100644 src/main/java/com/rosetta/im/listeners/ServerStopListener.java diff --git a/src/main/java/com/rosetta/im/ContextBuilder.java b/src/main/java/com/rosetta/im/ContextBuilder.java index a687823..471690a 100644 --- a/src/main/java/com/rosetta/im/ContextBuilder.java +++ b/src/main/java/com/rosetta/im/ContextBuilder.java @@ -1,6 +1,7 @@ package com.rosetta.im; import com.rosetta.im.event.EventManager; +import com.rosetta.im.listeners.ServerStopListener; public class ContextBuilder { @@ -13,6 +14,7 @@ public class ContextBuilder { * Создание глобальных объектов приложения */ EventManager eventManager = new EventManager(); + eventManager.registerListener(new ServerStopListener()); this.appContext = new AppContext(eventManager); return this.appContext; diff --git a/src/main/java/com/rosetta/im/event/EventManager.java b/src/main/java/com/rosetta/im/event/EventManager.java index f92517c..1d46fc7 100644 --- a/src/main/java/com/rosetta/im/event/EventManager.java +++ b/src/main/java/com/rosetta/im/event/EventManager.java @@ -66,7 +66,7 @@ public class EventManager { * @throws EventException Ошибка при обработке события * @throws Exception Общая ошибка при вызове метода */ - private boolean fireEvent(Event event) throws EventException, Exception { + private boolean fireEvent(Event event) throws EventException { for(Listener listener : this.listeners) { /** * Получаем все методы в Listener с аннотацией @EventHandler @@ -81,8 +81,12 @@ public class EventManager { /** * Если параметры совпадают и они одного типа - вызываем событие */ - method.setAccessible(true); - method.invoke(listener, event); + try{ + method.setAccessible(true); + method.invoke(listener, event); + } catch (Exception e) { + throw new EventException("Error while invoking event handler method: " + method.getName(), e); + } /** * Если событие отменяемое - проверяем его статус */ @@ -94,11 +98,6 @@ public class EventManager { } continue; } - /** - * Если метод обрабатывающий событие - * реализован неправильно - выбрасываем исключение - */ - throw new EventException("Invalid event params"); } } return false; diff --git a/src/main/java/com/rosetta/im/listeners/ServerStopListener.java b/src/main/java/com/rosetta/im/listeners/ServerStopListener.java new file mode 100644 index 0000000..e56f16a --- /dev/null +++ b/src/main/java/com/rosetta/im/listeners/ServerStopListener.java @@ -0,0 +1,41 @@ +package com.rosetta.im.listeners; + +import com.rosetta.im.client.tags.ECIAuthentificate; +import com.rosetta.im.client.tags.ECIDevice; +import com.rosetta.im.database.repository.DeviceRepository; +import com.rosetta.im.event.EventHandler; +import com.rosetta.im.event.Listener; +import com.rosetta.im.event.events.ServerStopEvent; +import com.rosetta.im.service.services.DeviceService; + +import io.orprotocol.Server; +import io.orprotocol.client.Client; + +/** + * При остановке сервера нам нужно обновить всем клиентам время последней активности устройства + * чтобы корректно потом отработать загрузку сообщений для пользователя + */ +public class ServerStopListener implements Listener { + + private static final DeviceRepository deviceRepository = new DeviceRepository(); + private static final DeviceService deviceService = new DeviceService(deviceRepository); + + @EventHandler + public void onServerStop(ServerStopEvent event) { + Server server = event.getServer(); + System.out.println("Server is stopping. Please wait..."); + for(Client client : server.getClients()){ + ECIAuthentificate eciAuth = client.getTag(ECIAuthentificate.class); + if(eciAuth == null || !eciAuth.hasAuthorized()){ + continue; + } + ECIDevice eciDevice = client.getTag(ECIDevice.class); + if(eciDevice == null){ + continue; + } + deviceService.updateDeviceLeaveTime(eciDevice.getDeviceId()); + } + System.out.println("Server stopped successfully."); + } + +} diff --git a/src/main/java/com/rosetta/im/service/services/DeviceService.java b/src/main/java/com/rosetta/im/service/services/DeviceService.java index 124c340..8645651 100644 --- a/src/main/java/com/rosetta/im/service/services/DeviceService.java +++ b/src/main/java/com/rosetta/im/service/services/DeviceService.java @@ -45,4 +45,17 @@ public class DeviceService extends Service { return this.getRepository().countByField("publicKey", user.getPublicKey()); } + /** + * Обновляет время последней активности устройства + * @param deviceId ID устройства + */ + public void updateDeviceLeaveTime(String deviceId) { + Device device = this.getRepository().findByField("deviceId", deviceId); + if(device == null) { + return; + } + device.setLeaveTime(System.currentTimeMillis()); + this.getRepository().update(device); + } + } diff --git a/src/main/java/io/orprotocol/Server.java b/src/main/java/io/orprotocol/Server.java index 27a5848..a14416c 100644 --- a/src/main/java/io/orprotocol/Server.java +++ b/src/main/java/io/orprotocol/Server.java @@ -2,6 +2,8 @@ package io.orprotocol; import java.net.InetSocketAddress; import java.nio.ByteBuffer; +import java.util.HashSet; +import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -194,6 +196,7 @@ public class Server extends WebSocketServer { * Настраиваем планировщик для проверки активности клиентов. */ this.inactivityShedulerTweaker(); + this.wakeupShutdownHook(); int port = this.getPort(); System.out.println("\u001B[32mServer started at x.x.x.x:" + port + "\u001B[0m"); @@ -203,6 +206,18 @@ public class Server extends WebSocketServer { this.listener.onServerStart(this); } + /** + * Получить список подключенных клиентов + * @return множество клиентов + */ + public Set getClients() { + Set clients = new HashSet<>(); + for(WebSocket socket : this.getConnections()) { + clients.add(socket.getAttachment()); + } + return clients; + } + /** * Планировщик для проверки активности клиентов. * Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки. @@ -218,4 +233,19 @@ public class Server extends WebSocketServer { }, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.MILLISECONDS); } + /** + * Пробуждает хук завершения для корректной остановки сервера. + */ + private void wakeupShutdownHook() { + if(this.listener == null){ + return; + } + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + /** + * Останавливаем сервер при завершении работы и вызываем слушатели остановки сервера. + */ + System.out.println("JVM Shutdown detected, stopping server..."); + this.listener.onServerStop(this); + })); + } }