Обработка остановки сервера, фикс EventManager

This commit is contained in:
RoyceDa
2026-02-03 06:53:28 +02:00
parent 9b715df09d
commit 695ec9c520
5 changed files with 93 additions and 8 deletions

View File

@@ -1,6 +1,7 @@
package com.rosetta.im;
import com.rosetta.im.event.EventManager;
import com.rosetta.im.listeners.ServerStopListener;
public class ContextBuilder {
@@ -13,6 +14,7 @@ public class ContextBuilder {
* Создание глобальных объектов приложения
*/
EventManager eventManager = new EventManager();
eventManager.registerListener(new ServerStopListener());
this.appContext = new AppContext(eventManager);
return this.appContext;

View File

@@ -66,7 +66,7 @@ public class EventManager {
* @throws EventException Ошибка при обработке события
* @throws Exception Общая ошибка при вызове метода
*/
private boolean fireEvent(Event event) throws EventException, Exception {
private boolean fireEvent(Event event) throws EventException {
for(Listener listener : this.listeners) {
/**
* Получаем все методы в Listener с аннотацией @EventHandler
@@ -81,8 +81,12 @@ public class EventManager {
/**
* Если параметры совпадают и они одного типа - вызываем событие
*/
method.setAccessible(true);
method.invoke(listener, event);
try{
method.setAccessible(true);
method.invoke(listener, event);
} catch (Exception e) {
throw new EventException("Error while invoking event handler method: " + method.getName(), e);
}
/**
* Если событие отменяемое - проверяем его статус
*/
@@ -94,11 +98,6 @@ public class EventManager {
}
continue;
}
/**
* Если метод обрабатывающий событие
* реализован неправильно - выбрасываем исключение
*/
throw new EventException("Invalid event params");
}
}
return false;

View File

@@ -0,0 +1,41 @@
package com.rosetta.im.listeners;
import com.rosetta.im.client.tags.ECIAuthentificate;
import com.rosetta.im.client.tags.ECIDevice;
import com.rosetta.im.database.repository.DeviceRepository;
import com.rosetta.im.event.EventHandler;
import com.rosetta.im.event.Listener;
import com.rosetta.im.event.events.ServerStopEvent;
import com.rosetta.im.service.services.DeviceService;
import io.orprotocol.Server;
import io.orprotocol.client.Client;
/**
* При остановке сервера нам нужно обновить всем клиентам время последней активности устройства
* чтобы корректно потом отработать загрузку сообщений для пользователя
*/
public class ServerStopListener implements Listener {
private static final DeviceRepository deviceRepository = new DeviceRepository();
private static final DeviceService deviceService = new DeviceService(deviceRepository);
@EventHandler
public void onServerStop(ServerStopEvent event) {
Server server = event.getServer();
System.out.println("Server is stopping. Please wait...");
for(Client client : server.getClients()){
ECIAuthentificate eciAuth = client.getTag(ECIAuthentificate.class);
if(eciAuth == null || !eciAuth.hasAuthorized()){
continue;
}
ECIDevice eciDevice = client.getTag(ECIDevice.class);
if(eciDevice == null){
continue;
}
deviceService.updateDeviceLeaveTime(eciDevice.getDeviceId());
}
System.out.println("Server stopped successfully.");
}
}

View File

@@ -45,4 +45,17 @@ public class DeviceService extends Service<DeviceRepository> {
return this.getRepository().countByField("publicKey", user.getPublicKey());
}
/**
* Обновляет время последней активности устройства
* @param deviceId ID устройства
*/
public void updateDeviceLeaveTime(String deviceId) {
Device device = this.getRepository().findByField("deviceId", deviceId);
if(device == null) {
return;
}
device.setLeaveTime(System.currentTimeMillis());
this.getRepository().update(device);
}
}

View File

@@ -2,6 +2,8 @@ package io.orprotocol;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -194,6 +196,7 @@ public class Server extends WebSocketServer {
* Настраиваем планировщик для проверки активности клиентов.
*/
this.inactivityShedulerTweaker();
this.wakeupShutdownHook();
int port = this.getPort();
System.out.println("\u001B[32mServer started at x.x.x.x:" + port + "\u001B[0m");
@@ -203,6 +206,18 @@ public class Server extends WebSocketServer {
this.listener.onServerStart(this);
}
/**
* Получить список подключенных клиентов
* @return множество клиентов
*/
public Set<Client> getClients() {
Set<Client> clients = new HashSet<>();
for(WebSocket socket : this.getConnections()) {
clients.add(socket.getAttachment());
}
return clients;
}
/**
* Планировщик для проверки активности клиентов.
* Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки.
@@ -218,4 +233,19 @@ public class Server extends WebSocketServer {
}, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.MILLISECONDS);
}
/**
* Пробуждает хук завершения для корректной остановки сервера.
*/
private void wakeupShutdownHook() {
if(this.listener == null){
return;
}
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
/**
* Останавливаем сервер при завершении работы и вызываем слушатели остановки сервера.
*/
System.out.println("JVM Shutdown detected, stopping server...");
this.listener.onServerStop(this);
}));
}
}