package io.orprotocol; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.util.HashSet; import java.util.Set; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.java_websocket.WebSocket; import org.java_websocket.handshake.ClientHandshake; import org.java_websocket.server.WebSocketServer; import io.orprotocol.client.Client; import io.orprotocol.index.ClientIndexer; import io.orprotocol.lock.ThreadLocker; import io.orprotocol.packet.Packet; import io.orprotocol.packet.PacketExecutor; import io.orprotocol.packet.PacketFactory; import io.orprotocol.packet.PacketManager; public class Server extends WebSocketServer { private PacketManager packetManager; private Settings settings; private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); private ServerListener listener; private ThreadLocker threadLocker = new ThreadLocker(); private ClientIndexer clientIndexer = new ClientIndexer(); /** * Конструктор сервера * @param settings базовые настройки серверера * @param packetManager менеджер пакетов (обработчиков и зарегистрированных пакетов) */ public Server(Settings settings, PacketManager packetManager) { super(new InetSocketAddress(settings.port)); this.settings = settings; this.packetManager = packetManager; } /** * Конструктор сервера с объектом прикрепления и слушателем событий сервера * @param settings базовые настройки серверера * @param packetManager менеджер пакетов (обработчиков и зарегистрированных пакетов) * @param listener слушатель событий сервера * может быть использовано для передачи контекста приложения */ public Server(Settings settings, PacketManager packetManager, ServerListener listener) { super(new InetSocketAddress(settings.port)); this.settings = settings; this.packetManager = packetManager; this.listener = listener; } @Override public void onClose(WebSocket socket, int reasonCode, String arg2, boolean arg3) { if(this.listener == null){ return; } Client client = socket.getAttachment(); /** * Удаляем клиента из индексации (потому что он вышел) */ this.clientIndexer.removeClientFromIndex(client); /** * Вызываем событие отключения клиента */ this.listener.onClientDisconnect(this, client); } @Override public void onError(WebSocket arg0, Exception arg1) { arg1.printStackTrace(); if(this.listener == null){ return; } this.listener.onError(this, arg1); } @Override public void onMessage(WebSocket socket, String message) { /** * Обновляем время последнего полученного heartbeat. * Так как клиент отпраивл нам пакет, он живой. */ Client client = socket.getAttachment(); client.updateHeartbeat(); if(this.listener != null) { /** * Сообщение от клиента, но пакет не сформирован, передаем null */ this.listener.onPacketReceived(this, client, null); } } @SuppressWarnings("unchecked") @Override public void onMessage(WebSocket socket, ByteBuffer byteBuffer) { Client client = socket.getAttachment(); byte[] bytes = byteBuffer.array(); try { /** * Создаем пакет из полученных байтов. */ PacketFactory packetFactory = new PacketFactory(bytes, this.packetManager); Packet packet = packetFactory.createPacket(); int packetId = packetFactory.getPacketId(); /** * Получаем обработчик пакета и вызываем его метод обработки. * * @SuppressWarnings("rawtypes") используется потому что пакет это generic класс * и ему нельзя без указания. Но так как тип нам точно известен просто * убираем ошибку компилятора */ @SuppressWarnings("rawtypes") PacketExecutor executor = this.packetManager.getExecutors().get(packetId); if(executor == null){ /** * Нет назначенного обработчика для этого packetId */ client.disconnect(ServerFailures.UNSUPPORTED_PACKET); return; } executor.settings = this.settings; if(listener != null && !listener.onPacketReceived(this, client, packet)) { /** * Если слушатель сервера вернул false, пакет не обрабатываем. */ return; } /** * Проверяем наличие блокировки для данного пакета и ключа в аннотации @Lock. */ if(!threadLocker.acquireLock(packet, executor.getClass())) { /** * Если блокировка уже существует, значит другой поток обрабатывает пакет * с таким же значением lockFor, отклоняем текущий пакет. */ return; } try { executor.onPacketReceived(packet, client); } finally { /** * Снимаем блокировку после обработки пакета. */ threadLocker.releaseLock(packet, executor.getClass()); } } catch (Exception e) { client.disconnect(ServerFailures.BAD_PACKET); e.printStackTrace(); } } @Override public void onOpen(WebSocket socket, ClientHandshake handshake) { /** * Создаем нового клиента при открытии соединения. * Передаем интервал heartbeat из настроек сервера. * Если клиент не отправляет heartbeat в указанный интервал, его можно отключить. */ Client client = new Client(socket, this.settings.heartbeatInterval, this); String ipAddress = handshake.getFieldValue("X-Forwarded-For"); if (ipAddress == null || ipAddress.isEmpty()) { ipAddress = socket.getRemoteSocketAddress().getAddress().getHostAddress(); } else { /** * Берем первый IP адрес из списка разделенного запятой */ ipAddress = ipAddress.split(",")[0].trim(); } client.setIpAddress(ipAddress); socket.setAttachment(client); if(this.listener == null){ return; } if(!this.listener.onClientConnect(this, client)) { client.disconnect(ServerFailures.SERVER_NOT_ACCEPT_CLIENT); return; } } @Override public void onStart() { /** * Настраиваем планировщик для проверки активности клиентов. */ this.inactivityShedulerTweaker(); this.wakeupShutdownHook(); int port = this.getPort(); System.out.println("\u001B[32mServer started at x.x.x.x:" + port + "\u001B[0m"); if(this.listener == null){ return; } this.listener.onServerStart(this); } /** * Получить список подключенных клиентов * @return множество клиентов */ public Set getClients() { Set clients = new HashSet<>(); for(WebSocket socket : this.getConnections()) { clients.add(socket.getAttachment()); } return clients; } /** * Получает менеджер пакетов сервера (где хранятся зарегистрированные пакеты и обработчики) * @return PacketManager */ public PacketManager getPacketManager() { return this.packetManager; } /** * Получить индексатор сервера, нужен для быстрого поиска клиентов по индексам * @return ClientIndexer */ public ClientIndexer getClientIndexer() { return this.clientIndexer; } /** * Планировщик для проверки активности клиентов. * Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки. */ public void inactivityShedulerTweaker() { this.scheduler.scheduleAtFixedRate(() -> { for(WebSocket socket : this.getConnections()) { Client client = socket.getAttachment(); if(!client.isAlive()) { client.disconnect(ServerFailures.INACTIVITY_TIMEOUT); } } }, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.SECONDS); } /** * Пробуждает хук завершения для корректной остановки сервера. */ private void wakeupShutdownHook() { if(this.listener == null){ return; } Runtime.getRuntime().addShutdownHook(new Thread(() -> { /** * Останавливаем сервер при завершении работы и вызываем слушатели остановки сервера. */ this.listener.onServerStop(this); try { this.stop(); } catch (InterruptedException e) { e.printStackTrace(); } })); } }