Миграция протокола

This commit is contained in:
RoyceDa
2026-02-02 05:30:38 +02:00
parent a05501fb80
commit bbc83c7d39
17 changed files with 36 additions and 32 deletions

View File

@@ -2,7 +2,8 @@ package com.rosetta.im;
import com.rosetta.im.executors.Executor0Handshake;
import com.rosetta.im.packet.Packet0Handshake;
import com.rosetta.im.protocol.packet.PacketManager;
import io.orprotocol.packet.PacketManager;
public class Boot {

View File

@@ -1,6 +1,6 @@
package com.rosetta.im;
import com.rosetta.im.protocol.BaseFailures;
import io.orprotocol.BaseFailures;
public enum Failures implements BaseFailures {
/**

View File

@@ -1,8 +1,8 @@
package com.rosetta.im;
import com.rosetta.im.protocol.Server;
import com.rosetta.im.protocol.Settings;
import com.rosetta.im.protocol.packet.PacketManager;
import io.orprotocol.Server;
import io.orprotocol.Settings;
import io.orprotocol.packet.PacketManager;
public class Main {
public static void main(String[] args) {

View File

@@ -1,7 +1,8 @@
package com.rosetta.im.client.tags;
import com.rosetta.im.packet.enums.HandshakeStage;
import com.rosetta.im.protocol.client.ECITag;
import io.orprotocol.client.ECITag;
/**
* Это вложенный обьект для клиента, содержащий информацию об аутентификации.

View File

@@ -10,9 +10,10 @@ import com.rosetta.im.database.DatabaseManager;
import com.rosetta.im.database.entity.User;
import com.rosetta.im.packet.Packet0Handshake;
import com.rosetta.im.packet.enums.HandshakeStage;
import com.rosetta.im.protocol.client.Client;
import com.rosetta.im.protocol.packet.Packet;
import com.rosetta.im.protocol.packet.PacketExecutor;
import io.orprotocol.client.Client;
import io.orprotocol.packet.Packet;
import io.orprotocol.packet.PacketExecutor;
public class Executor0Handshake extends PacketExecutor {

View File

@@ -2,8 +2,9 @@ package com.rosetta.im.packet;
import com.rosetta.im.client.tags.Device;
import com.rosetta.im.packet.enums.HandshakeStage;
import com.rosetta.im.protocol.Stream;
import com.rosetta.im.protocol.packet.Packet;
import io.orprotocol.Stream;
import io.orprotocol.packet.Packet;
/**
* Пакет хэндшейка между клиентом и сервером.

View File

@@ -1,9 +0,0 @@
package com.rosetta.im.protocol;
public interface BaseFailures {
/**
* Получает код ошибки.
* @return
*/
int getCode();
}

View File

@@ -1,138 +0,0 @@
package com.rosetta.im.protocol;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.java_websocket.WebSocket;
import org.java_websocket.handshake.ClientHandshake;
import org.java_websocket.server.WebSocketServer;
import com.rosetta.im.protocol.client.Client;
import com.rosetta.im.protocol.packet.Packet;
import com.rosetta.im.protocol.packet.PacketExecutor;
import com.rosetta.im.protocol.packet.PacketManager;
public class Server extends WebSocketServer {
private PacketManager packetManager;
private Settings settings;
private ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
private Object attachment;
public Server(Settings settings, PacketManager packetManager, Object attachment) {
super(new InetSocketAddress(settings.port));
this.settings = settings;
this.packetManager = packetManager;
this.attachment = attachment;
}
@Override
public void onClose(WebSocket arg0, int arg1, String arg2, boolean arg3) {
}
@Override
public void onError(WebSocket arg0, Exception arg1) {
}
@Override
public void onMessage(WebSocket socket, String message) {
/**
* Обновляем время последнего полученного heartbeat.
* Так как клиент отпраивл нам пакет, он живой.
*/
Client client = socket.getAttachment();
client.updateHeartbeat();
}
@Override
public void onMessage(WebSocket socket, ByteBuffer byteBuffer) {
Client client = socket.getAttachment();
byte[] bytes = byteBuffer.array();
Stream stream = new Stream(bytes);
int packetId = stream.readInt16();
/**
* Обновляем время последнего полученного heartbeat.
* Так как клиент отпраивл нам пакет, он живой.
*/
client.updateHeartbeat();
if(!this.packetManager.hasPacketSupported(packetId)){
/**
* Если пакет не поддерживается, отключаем клиента с соответствующим кодом ошибки.
*/
client.disconnect(ServerFailures.UNSUPPORTED_PACKET);
return;
}
if(!this.packetManager.hasExecutorDelegated(packetId)){
/**
* Если для пакета не назначен обработчик, отключаем клиента с соответствующим кодом ошибки.
*/
client.disconnect(ServerFailures.UNSUPPORTED_PACKET);
return;
}
Class<? extends Packet> packetClass = this.packetManager.getPacketClass(packetId);
try {
Packet packet = packetClass.getConstructor().newInstance();
packet.packetId = packetId;
/**
* Читаем данные пакета из потока.
*/
packet.read(stream);
/**
* Получаем обработчик пакета и вызываем его метод обработки.
*/
Class<? extends PacketExecutor> executorClass = this.packetManager.getExecutors().get(packetId);
PacketExecutor executor = executorClass.getConstructor().newInstance();
executor.settings = this.settings;
executor.attachment = this.attachment;
executor.onPacketReceived(packet, client);
} catch (Exception e) {
System.out.println("Error while processing packet " + packetClass.getName());
System.out.println(e.getStackTrace());
}
}
@Override
public void onOpen(WebSocket socket, ClientHandshake arg1) {
/**
* Создаем нового клиента при открытии соединения.
* Передаем интервал heartbeat из настроек сервера.
* Если клиент не отправляет heartbeat в указанный интервал, его можно отключить.
*/
Client client = new Client(socket, this.settings.heartbeatInterval);
socket.setAttachment(client);
}
@Override
public void onStart() {
/**
* Настраиваем планировщик для проверки активности клиентов.
*/
this.inactivityShedulerTweaker();
int port = this.getPort();
System.out.println("\u001B[32mServer started at x.x.x.x:" + port + "\u001B[0m");
}
/**
* Планировщик для проверки активности клиентов.
* Если планировщик обнаруживает неактивного клиента, он отключает его с соответствующим кодом ошибки.
*/
public void inactivityShedulerTweaker() {
this.scheduler.scheduleAtFixedRate(() -> {
for(WebSocket socket : this.getConnections()) {
Client client = socket.getAttachment();
if(!client.isAlive()) {
client.disconnect(ServerFailures.INACTIVITY_TIMEOUT);
}
}
}, this.settings.heartbeatInterval, this.settings.heartbeatInterval, TimeUnit.MILLISECONDS);
}
}

View File

@@ -1,55 +0,0 @@
package com.rosetta.im.protocol;
/**
* Перечисление кодов ошибок, используемых в протоколе.
*/
public enum ServerFailures implements BaseFailures {
/**
* Код ошибки, указывающий на несоответствие данных.
*/
DATA_MISSMATCH(3001),
/**
* Код ошибки, указывающий на незавершенное рукопожатие.
*/
HANDSHAKE_NOT_COMPLETED(3002),
/**
* Код ошибки, указывающий на некорректный пакет.
*/
BAD_PACKET(3003),
/**
* Код ошибки, указывающий на некорректный пакет.
*/
INVALID_PACKET(3003),
/**
* Код ошибки, указывающий на тайм-аут бездействия.
*/
INACTIVITY_TIMEOUT(3004),
/**
* Код ошибки, указывающий на неизвестный тип пакета.
*/
PACKET_ID_FAILURE(3998),
/**
* Код ошибки, указывающий на неизвестный тип пакета.
*/
UNSUPPORTED_PACKET(3998),
/**
* Код ошибки, указывающий на неизвестную ошибку.
*/
UNKNOWN_FAILURE(3999);
private final int code;
ServerFailures(int code) {
this.code = code;
}
/**
* Получает код ошибки.
* @return Код ошибки.
*/
public int getCode() {
return code;
}
}

View File

@@ -1,18 +0,0 @@
package com.rosetta.im.protocol;
public class Settings {
/**
* Порт сервера
*/
public int port = 8881;
/**
* Интервал отправки heartbeat пакетов в секундах.
* Если клиент не отправляет heartbeat пакеты в течение этого времени, сервер может считать его отключенным.
*/
public long heartbeatInterval = 30;
public Settings(int port, long heartbeatInterval) {
this.port = port;
this.heartbeatInterval = heartbeatInterval;
}
}

View File

@@ -1,179 +0,0 @@
package com.rosetta.im.protocol;
public class Stream {
private byte[] stream;
private int readPointer = 0;
private int writePointer = 0;
public Stream() {
this.stream = new byte[0];
}
public Stream(byte[] stream) {
this.stream = stream;
}
public byte[] getStream() {
return this.stream;
}
public void setStream(byte[] stream) {
this.stream = stream;
}
public void writeInt8(int value) {
int negationBit = value < 0 ? 1 : 0;
int int8Value = Math.abs(value) & 0xFF;
ensureCapacity(writePointer >> 3);
stream[writePointer >> 3] |= (byte)(negationBit << (7 - (writePointer & 7)));
writePointer++;
for (int i = 0; i < 8; i++) {
int bit = (int8Value >> (7 - i)) & 1;
ensureCapacity(writePointer >> 3);
stream[writePointer >> 3] |= (byte)(bit << (7 - (writePointer & 7)));
writePointer++;
}
}
public int readInt8() {
int value = 0;
int negationBit = (stream[readPointer >> 3] >> (7 - (readPointer & 7))) & 1;
readPointer++;
for (int i = 0; i < 8; i++) {
int bit = (stream[readPointer >> 3] >> (7 - (readPointer & 7))) & 1;
value |= bit << (7 - i);
readPointer++;
}
return negationBit == 1 ? -value : value;
}
public void writeBit(int value) {
int bit = value & 1;
ensureCapacity(writePointer >> 3);
stream[writePointer >> 3] |= (byte)(bit << (7 - (writePointer & 7)));
writePointer++;
}
public int readBit() {
int bit = (stream[readPointer >> 3] >> (7 - (readPointer & 7))) & 1;
readPointer++;
return bit;
}
public void writeBoolean(boolean value) {
writeBit(value ? 1 : 0);
}
public boolean readBoolean() {
return readBit() == 1;
}
public void writeInt16(int value) {
writeInt8(value >> 8);
writeInt8(value & 0xFF);
}
public int readInt16() {
int value = readInt8() << 8;
return value | readInt8();
}
public void writeInt32(int value) {
writeInt16(value >> 16);
writeInt16(value & 0xFFFF);
}
public int readInt32() {
int value = readInt16() << 16;
return value | readInt16();
}
public void writeInt64(long value) {
int high = (int)(value >> 32);
int low = (int)value;
writeInt32(high);
writeInt32(low);
}
public long readInt64() {
long high = readInt32();
long low = readInt32() & 0xFFFFFFFFL;
return (high << 32) | low;
}
public void writeFloat32(float value) {
int floatValue = Float.floatToIntBits(value);
writeInt32(floatValue);
}
public float readFloat32() {
int floatValue = readInt32();
return Float.intBitsToFloat(floatValue);
}
public void writeString(String value) {
int length = value.length();
writeInt32(length);
for (int i = 0; i < value.length(); i++) {
writeInt16(value.charAt(i));
}
}
public String readString() {
int length = readInt32();
// Security fix for string length exceeding stream capacity
if (length < 0 || length > (stream.length - (readPointer >> 3))) {
System.out.println("Stream readString length invalid: " + length + ", stream length: " + stream.length + ", readPointer: " + readPointer);
return "";
}
StringBuilder value = new StringBuilder();
for (int i = 0; i < length; i++) {
value.append((char)readInt16());
}
return value.toString();
}
public void writeBytes(byte[] value) {
writeInt32(value.length);
for (int i = 0; i < value.length; i++) {
writeInt8(value[i]);
}
}
public byte[] readBytes() {
int length = readInt32();
byte[] value = new byte[length];
for (int i = 0; i < length; i++) {
value[i] = (byte)readInt8();
}
return value;
}
public boolean isEmpty() {
return this.stream.length == 0;
}
public int length() {
return this.stream.length;
}
public byte[] getBuffer() {
return this.stream;
}
private void ensureCapacity(int byteIndex) {
if (byteIndex >= stream.length) {
byte[] newStream = new byte[byteIndex + 1];
System.arraycopy(stream, 0, newStream, 0, stream.length);
stream = newStream;
}
}
}

View File

@@ -1,163 +0,0 @@
package com.rosetta.im.protocol.client;
import java.util.HashSet;
import java.util.Set;
import org.java_websocket.WebSocket;
import com.rosetta.im.protocol.BaseFailures;
import com.rosetta.im.protocol.ServerFailures;
import com.rosetta.im.protocol.util.StringUtil;
/**
* Клиент, подключенный к серверу.
*/
public class Client {
public WebSocket socket;
public String clientId;
/**
* Любые данные, связанные с клиентом.
*/
public Set<ECITag> eciTags;
/**
* Интервал отправки heartbeat пакетов в миллисекундах.
*/
public long heartbeatInterval = 0;
/**
* Время последнего полученного heartbeat в миллисекундах.
*/
private volatile long lastHeartbeatTime;
/**
* Создает нового клиента с указанным сокетом.
* Этот метод используется внутри протокола для управления подключениями клиентов.
* @param socket Веб-сокет клиента.
*
*/
public Client(WebSocket socket, long heartbeatInterval) {
this.socket = socket;
this.clientId = StringUtil.randomString(32);
this.eciTags = new HashSet<ECITag>();
this.heartbeatInterval = heartbeatInterval;
this.lastHeartbeatTime = System.currentTimeMillis();
}
/**
* Проверяет жив ли клиент на основе времени последнего heartbeat.
* Если с момента последнего heartbeat прошло больше, чем указанный интервал, клиент считается неактивным.
*
* Для того чтобы исключить сетевые задержки, проверка умножает интервал на 2.
* @return
*/
public boolean isAlive() {
return (System.currentTimeMillis() - this.lastHeartbeatTime) * 2 <= this.heartbeatInterval * 1000;
}
/**
* Обновляет время последнего полученного heartbeat на текущее время.
*/
public void updateHeartbeat() {
this.lastHeartbeatTime = System.currentTimeMillis();
}
/**
* Получает уникальный идентификатор клиента.
* @return Идентификатор клиента.
*/
public String getClientId() {
return clientId;
}
/**
* Получает данные, связанные с клиентом.
* @return Данные клиента.
*/
public Set<ECITag> getEciTags() {
return this.eciTags;
}
/**
* Устанавливает уникальный идентификатор клиента.
* @param clientId Идентификатор клиента.
*/
public void setClientId(String clientId) {
this.clientId = clientId;
}
/**
* Устанавливает данные для клиента по указанному ключу.
* @param key Ключ данных.
* @param value Значение данных.
*/
public <T extends ECITag> void addTag(T eciTag) {
this.eciTags.add(eciTag);
}
/**
* Устанавливает данные клиента.
* @param data Данные клиента.
*/
public void setTags(Set<ECITag> eciTags) {
this.eciTags = eciTags;
}
/**
* Получает данные клиента по указанному ключу.
* @param key Ключ данных.
* @return Значение данных.
*/
public <T extends ECITag> T getTag(ECITag eciTag) {
for (ECITag tag : this.eciTags) {
if (tag.getKey().equals(eciTag.getKey())) {
return (T) tag;
}
}
return null;
}
/**
* Проверяет наличие данных клиента по указанному ключу.
* @param key Ключ данных.
* @return true если данные существуют, иначе false.
*/
public boolean hasTag(ECITag eciTag) {
for (ECITag tag : this.eciTags) {
if (tag.getKey().equals(eciTag.getKey())) {
return true;
}
}
return false;
}
/**
* Получает веб-сокет клиента.
* @return Веб-сокет.
*/
public WebSocket getSocket() {
return socket;
}
/**
* Отключает клиента с указанным кодом.
* @param code Код отключения.
*/
public void disconnect(int code) {
this.socket.close(code);
}
/**
* Отключает клиента с указанным кодом отказа.
* @param code Код отказа.
*/
public void disconnect(BaseFailures code) {
this.disconnect(code.getCode());
}
/**
* Отключает клиента с неизвестной причиной.
*/
public void disconnect() {
this.disconnect(ServerFailures.UNKNOWN_FAILURE);
}
}

View File

@@ -1,31 +0,0 @@
package com.rosetta.im.protocol.client;
/**
* Embedded Client Information Tag.
*
* Используется для хранения дополнительной информации о клиенте.
*/
public abstract class ECITag {
/**
* Ключ тега.
*/
public String key;
/**
* Создает новый тег с указанным ключом и значением.
* @param key Ключ тега.
*/
public ECITag(String key) {
this.key = key;
}
/**
* Получает ключ тега.
* @return Ключ тега.
*/
public String getKey() {
return this.key;
}
}

View File

@@ -1,31 +0,0 @@
package com.rosetta.im.protocol.packet;
import com.rosetta.im.protocol.Stream;
/**
* Представляет собой абстрактный класс для всех пакетов, используемых в протоколе.
*/
public abstract class Packet {
public int packetId;
public PacketManager packetManager;
public Packet() {
}
/**
* Записывает данные пакета в поток. Исползуется при отправке
*
* @return Поток с записанными данными пакета.
*/
public abstract Stream write();
/**
* Читает данные пакета из потока. Используется при получении
*
* @param stream Поток с данными пакета.
*/
public abstract void read(Stream stream);
}

View File

@@ -1,35 +0,0 @@
package com.rosetta.im.protocol.packet;
import com.rosetta.im.protocol.Settings;
import com.rosetta.im.protocol.client.Client;
/**
* Базовый класс для обработчиков пакетов.
*/
public abstract class PacketExecutor {
public Settings settings;
public Object attachment;
/**
* Настройки сервера.
* @return
*/
public Settings getSettings() {
return settings;
}
/**
* Вложенный обьект, который был передан при создании сервера.
* @return вложенный обьект
*/
public Object getAttachment() {
return attachment;
}
/**
* Вызывается при получении пакета от клиента.
* @param packet Пакет, полученный от клиента.
* @param client Клиент, отправивший пакет.
*/
public abstract void onPacketReceived(Packet packet, Client client);
}

View File

@@ -1,91 +0,0 @@
package com.rosetta.im.protocol.packet;
import java.util.HashMap;
/**
* Менеджер сетевых пакетов и их обработчиков.
*/
public class PacketManager {
private HashMap<Integer, Class<? extends Packet>> packets;
private HashMap<Integer, Class<? extends PacketExecutor>> executors;
public PacketManager() {
this.packets = new HashMap<>();
this.executors = new HashMap<>();
}
/**
* Регистрирует пакет с указанным ID.
* @param packetId ID пакета
* @param packet Класс пакета
*/
public void registerPacket(int packetId, Class<? extends Packet> packet) {
this.packets.put(packetId, packet);
}
/**
* Проверяет, зарегистрирован ли обработчик для пакета с указанным ID.
* @param packetId ID пакета
* @return true, если обработчик зарегистрирован, иначе false.
*/
public boolean hasExecutorDelegated(int packetId) {
return this.executors.containsKey(packetId);
}
/**
* Проверяет, поддерживается ли пакет с указанным ID.
* @param packetId ID пакета
* @return true, если пакет поддерживается, иначе false.
*/
public boolean hasPacketSupported(int packetId) {
return this.packets.containsKey(packetId);
}
public HashMap<Integer, Class<? extends PacketExecutor>> getExecutors() {
return this.executors;
}
public Integer getPacketIdByClass(Class<? extends Packet> packetClass) {
for (var entry : this.packets.entrySet()) {
if (entry.getValue().equals(packetClass)) {
return entry.getKey();
}
}
return null;
}
public Class<? extends Packet> getPacketClass(int packetId) {
return this.packets.get(packetId);
}
/**
* Регистрирует обработчик пакета с указанным ID.
* @param packetId ID пакета
* @param executor Обработчик пакета
*/
public void registerExecutor(int packetId, Class <? extends PacketExecutor> executor) {
if (this.executors == null) {
this.executors = new HashMap<>();
}
this.executors.put(packetId, executor);
}
/**
* Возвращает общее количество зарегистрированных обработчиков пакетов.
* @return Количество обработчиков пакетов.
*/
public Integer totalExecutors() {
return this.executors.size();
}
/**
* Возвращает общее количество зарегистрированных пакетов.
* @return Количество пакетов.
*/
public Integer totalPackets() {
return this.packets.size();
}
}

View File

@@ -1,15 +0,0 @@
package com.rosetta.im.protocol.util;
public class StringUtil {
public static String randomString(int length) {
StringBuilder sb = new StringBuilder();
String characters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
for (int i = 0; i < length; i++) {
int index = (int) (Math.random() * characters.length());
sb.append(characters.charAt(index));
}
return sb.toString();
}
}