Files
desktop/app/providers/ProtocolProvider/protocol/protocol.ts

280 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { EventEmitter } from "events";
import Packet from "./packet";
import PacketHandshake, { Device, HandshakeState } from "./packets/packet.handshake";
import { PacketMessage } from "./packets/packet.message";
import { PacketOnlineState } from "./packets/packet.onlinestate";
import { PacketOnlineSubscribe } from "./packets/packet.onlinesubscribe";
import { PacketRead } from "./packets/packet.read";
import { PacketResult } from "./packets/packet.result";
import { PacketSearch } from "./packets/packet.search";
import { PacketUserInfo } from "./packets/packet.userinfo";
import Stream from "./stream";
import { PacketDelivery } from "./packets/packet.delivery";
import { PacketRequestUpdate } from "./packets/packet.requestupdate";
import { RECONNECTING_INTERVAL } from "@/app/constants";
import { PacketTyping } from "./packets/packet.typeing";
import { PacketAvatar } from "./packets/packet.avatar";
import { PacketKernelUpdate } from "./packets/packet.kernelupdate";
import { PacketAppUpdate } from "./packets/packet.appupdate";
import { PacketRequestTransport } from "./packets/packet.requesttransport";
import { PacketPushNotification } from "./packets/packet.push.notification";
import { PacketCreateGroup } from "./packets/packet.create.group";
import { PacketGroupInfo } from "./packets/packet.group.info";
import { PacketGroupInviteInfo } from "./packets/packet.group.invite.info";
import { PacketGroupJoin } from "./packets/packet.group.join";
import { PacketGroupLeave } from "./packets/packet.group.leave";
import { PacketGroupBan } from "./packets/packet.group.ban";
import { PacketDeviceNew } from "./packets/packet.device.new";
import { PacketDeviceList } from "./packets/packet.device.list";
import { PacketDeviceResolve } from "./packets/packet.device.resolve";
/**
 * WebSocket client for the binary packet protocol.
 *
 * Responsibilities visible in this class:
 *  - opens a WebSocket to `serverAddress` and reconnects after
 *    `RECONNECTING_INTERVAL` seconds unless the connection was closed via `close()`;
 *  - decodes incoming binary frames into packets (the packet ID is read from the
 *    frame with `Stream.readInt16()`) and dispatches them to callbacks
 *    registered with `waitPacket` / `waitPacketOnce`;
 *  - queues outgoing packets while the socket is not open or the handshake
 *    (packet ID 0x00) has not completed, and flushes them afterwards;
 *  - sends a periodic `'heartbeat'` text frame once the server has answered the handshake.
 *
 * Emitted events: `connect`, `reconnect`, `handshake_start`,
 * `handshake_complete`, `handshake_need_device_verification`.
 */
export default class Protocol extends EventEmitter {
    private serverAddress: string;
    private socket: WebSocket | null = null;
    /** Delay before a reconnect attempt, in milliseconds. */
    private reconnectInterval: number = RECONNECTING_INTERVAL * 1000;
    /** Set by `close()`; suppresses automatic reconnection. */
    private isManuallyClosed: boolean = false;
    /** Prototype instance per packet ID; cloned for every incoming frame. */
    private _supportedPackets: Map<number, Packet> = new Map();
    // Callbacks are stored with an `any` parameter because each waiter is typed
    // by its caller (`waitPacket<T>`) and the map mixes all packet types.
    private _packetWaiters: Map<number, ((packet: any) => void)[]> = new Map();
    /** Outgoing packets that could not be sent yet, oldest first. */
    private _packetQueue: Packet[] = [];
    private handshakeExchangeComplete: boolean = false;
    private heartbeatIntervalTimer: ReturnType<typeof setInterval> | null = null;
    /** Pending reconnect timer, kept so `close()` / `connect()` can cancel it. */
    private reconnectTimer: ReturnType<typeof setTimeout> | null = null;

    /**
     * Creates the client, registers all known packet types, starts connecting
     * immediately and installs the permanent handshake (0x00) response handler.
     *
     * @param serverAddress WebSocket URL passed to the `WebSocket` constructor.
     */
    constructor(serverAddress: string) {
        super();
        this.serverAddress = serverAddress;
        this.loadAllSupportedPackets();
        this.connect();
        this.waitPacket(0x00, (packet: PacketHandshake) => {
            const state = packet.getHandshakeState();
            if (state === HandshakeState.COMPLETED) {
                console.info('[protocol] %chandshake exchange complete', 'color: #12c456;');
                // The event is emitted before the flag is set, so packets sent by
                // listeners are queued and go out with the flush below.
                this.emit('handshake_complete');
                this.handshakeExchangeComplete = true;
                this._flushPacketQueue();
                this.startHeartbeat(packet.getHeartbeatInterval());
            } else if (state === HandshakeState.NEED_DEVICE_VERIFICATION) {
                console.info('[protocol] %chandshake exchange need device verification', 'color: orange;');
                this.emit('handshake_need_device_verification');
                // Everything queued so far is discarded in this state.
                this._packetQueue = [];
                this.startHeartbeat(packet.getHeartbeatInterval());
            }
        });
    }

    /**
     * (Re)starts the heartbeat: sends `'heartbeat'` immediately and then every
     * half of `intervalS` seconds, but only while the socket is open.
     * Any previously running heartbeat is stopped first.
     *
     * @param intervalS heartbeat interval in seconds, as reported by the handshake packet.
     *                  Non-finite or non-positive values stop the heartbeat instead of
     *                  scheduling a zero-delay timer.
     */
    public startHeartbeat(intervalS: number) {
        this.stopHeartbeat();
        if (!Number.isFinite(intervalS) || intervalS <= 0) {
            console.warn(`[protocol] ignoring invalid heartbeat interval: ${intervalS}`);
            return;
        }
        const heartbeat = () => {
            if (this.socket && this.socket.readyState === WebSocket.OPEN) {
                this.socket.send('heartbeat');
            }
        };
        console.info(`[protocol] %cstarting heartbeat with interval: %c${intervalS} seconds`, 'color: #12c456;', 'color: orange;');
        heartbeat();
        this.heartbeatIntervalTimer = setInterval(heartbeat, (intervalS * 1000) / 2);
    }

    /** Stops the heartbeat timer if one is running. */
    private stopHeartbeat() {
        if (this.heartbeatIntervalTimer !== null) {
            clearInterval(this.heartbeatIntervalTimer);
            this.heartbeatIntervalTimer = null;
        }
    }

    /** Cancels a scheduled reconnect attempt, if any. */
    private clearReconnectTimer() {
        if (this.reconnectTimer !== null) {
            clearTimeout(this.reconnectTimer);
            this.reconnectTimer = null;
        }
    }

    /**
     * Builds and sends the handshake packet (ID 0x00) and emits `handshake_start`.
     * If the socket is not open yet the packet is queued by `sendPacket`.
     *
     * @param publicKey  value stored in the handshake via `setPublicKey`.
     * @param privateKey value stored in the handshake via `setPrivateKey`.
     * @param device     device descriptor stored via `setDevice`.
     */
    public startHandshakeExchange(publicKey: string, privateKey: string,
        device: Device
    ) {
        // The private key value is deliberately kept out of the console output.
        console.info(
            `[protocol] %cstarting handshake exchange with server, public key: %c${publicKey}`,
            'color: #deadcd;',
            'color: #12c456;'
        );
        const handshake = new PacketHandshake();
        handshake.setPublicKey(publicKey);
        handshake.setPrivateKey(privateKey);
        handshake.setDevice(device);
        this.sendPacket(handshake).catch((error: unknown) => {
            console.error('[protocol] failed to send handshake packet', error);
        });
        this.emit('handshake_start');
    }

    /** Registers one prototype instance for every packet ID this client understands. */
    private loadAllSupportedPackets() {
        this._supportedPackets.set(0x00, new PacketHandshake());
        this._supportedPackets.set(0x01, new PacketUserInfo());
        this._supportedPackets.set(0x02, new PacketResult());
        this._supportedPackets.set(0x03, new PacketSearch());
        this._supportedPackets.set(0x04, new PacketOnlineSubscribe());
        this._supportedPackets.set(0x05, new PacketOnlineState());
        this._supportedPackets.set(0x06, new PacketMessage());
        this._supportedPackets.set(0x07, new PacketRead());
        this._supportedPackets.set(0x08, new PacketDelivery());
        this._supportedPackets.set(0x09, new PacketDeviceNew());
        this._supportedPackets.set(0x0A, new PacketRequestUpdate());
        this._supportedPackets.set(0x0B, new PacketTyping());
        this._supportedPackets.set(0x0C, new PacketAvatar());
        this._supportedPackets.set(0x0D, new PacketKernelUpdate());
        this._supportedPackets.set(0x0E, new PacketAppUpdate());
        this._supportedPackets.set(0x0F, new PacketRequestTransport());
        this._supportedPackets.set(0x10, new PacketPushNotification());
        this._supportedPackets.set(0x11, new PacketCreateGroup());
        this._supportedPackets.set(0x12, new PacketGroupInfo());
        this._supportedPackets.set(0x13, new PacketGroupInviteInfo());
        this._supportedPackets.set(0x14, new PacketGroupJoin());
        this._supportedPackets.set(0x15, new PacketGroupLeave());
        this._supportedPackets.set(0x16, new PacketGroupBan());
        this._supportedPackets.set(0x17, new PacketDeviceList());
        this._supportedPackets.set(0x18, new PacketDeviceResolve());
    }

    /** Returns the callbacks registered for `packetId`, or an empty array. */
    private _findWaiters(packetId: number): ((packet: Packet) => void)[] {
        return this._packetWaiters.get(packetId) ?? [];
    }

    /**
     * Opens a new WebSocket to the configured server address and wires up the
     * `open`, `message` and `close` handlers. Calling it also clears the
     * "manually closed" flag and cancels any reconnect attempt still pending.
     */
    public connect() {
        // A reconnect that is still scheduled would otherwise open a second socket.
        this.clearReconnectTimer();
        const socket = new WebSocket(this.serverAddress);
        this.socket = socket;
        // Reset the manual-close flag whenever a connection is (re)established.
        this.isManuallyClosed = false;

        socket.addEventListener('open', () => {
            this.emit('connect');
            // Send whatever was queued while the socket was unavailable.
            this._flushPacketQueue();
        });

        socket.addEventListener('message', (event: MessageEvent) => {
            void this._handleMessage(event);
        });

        socket.addEventListener('close', (e: CloseEvent) => {
            console.log(`Connection reset by peer`, e.code);
            this.handshakeExchangeComplete = false;
            // The heartbeat is restarted by the handshake handler after reconnecting.
            this.stopHeartbeat();
            if (this.isManuallyClosed) {
                return;
            }
            console.log(`Attempting to reconnect...`);
            this.emit('reconnect');
            this.clearReconnectTimer();
            this.reconnectTimer = setTimeout(() => {
                this.reconnectTimer = null;
                this.connect();
            }, this.reconnectInterval);
        });
    }

    /**
     * Decodes one incoming frame and dispatches the resulting packet to its waiters.
     * Errors are logged instead of surfacing as unhandled promise rejections.
     */
    private async _handleMessage(event: MessageEvent): Promise<void> {
        try {
            const bytes = await this._readBinaryPayload(event.data);
            if (bytes === null) {
                console.error('[protocol] ignoring non-binary message');
                return;
            }
            const stream = new Stream();
            stream.setStream(Array.from(bytes));
            const packetId = stream.readInt16();
            const prototype = this._supportedPackets.get(packetId);
            if (!prototype) {
                console.error(`Unsupported packet ID: ${packetId}`);
                return;
            }
            // NOTE(review): the original code carried a "security check" marker here;
            // no validation is performed before the packet is decoded.
            const packet = prototype.clone();
            packet._receive(stream);
            const waiters = this._findWaiters(packetId);
            if (waiters.length === 0) {
                console.error(`No waiters found for packet ID: ${packetId}`);
                return;
            }
            // Iterate over a snapshot: waiters registered through waitPacketOnce remove
            // themselves from the live array while running, which would otherwise
            // cause the following waiter to be skipped.
            for (const waiter of [...waiters]) {
                try {
                    waiter(packet);
                } catch (error: unknown) {
                    // One failing callback must not prevent the others from running.
                    console.error(`[protocol] waiter for packet ID ${packetId} threw`, error);
                }
            }
        } catch (error: unknown) {
            console.error('[protocol] failed to process incoming message', error);
        }
    }

    /**
     * Converts a WebSocket message payload to bytes.
     * Accepts an `ArrayBuffer`, a typed array / `DataView`, or any object exposing
     * an `arrayBuffer()` method (e.g. `Blob`). Returns `null` for anything else.
     */
    private async _readBinaryPayload(data: unknown): Promise<Uint8Array | null> {
        if (data instanceof ArrayBuffer) {
            return new Uint8Array(data);
        }
        if (ArrayBuffer.isView(data)) {
            return new Uint8Array(data.buffer, data.byteOffset, data.byteLength);
        }
        if (typeof data === 'object' && data !== null) {
            const blobLike = data as { arrayBuffer?: () => Promise<ArrayBuffer> };
            if (typeof blobLike.arrayBuffer === 'function') {
                return new Uint8Array(await blobLike.arrayBuffer());
            }
        }
        return null;
    }

    /** True when `packet` may be written to the socket right now. */
    private _canSendNow(packet: Packet): boolean {
        if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
            return false;
        }
        // Until the handshake completes only the handshake packet itself may pass.
        return this.handshakeExchangeComplete || packet.getPacketId() === 0x00;
    }

    /**
     * Sends every queued packet that is currently allowed to go out, oldest first.
     * Packets that must still wait for the handshake stay queued in their original
     * order. Sending is started in FIFO order; because `sendPacket` awaits
     * `packet._send()`, the order on the wire still depends on how long each
     * packet takes to serialize.
     */
    private _flushPacketQueue() {
        if (!this.socket || this.socket.readyState !== WebSocket.OPEN) {
            return;
        }
        const pending = this._packetQueue;
        this._packetQueue = [];
        for (const packet of pending) {
            if (!this.handshakeExchangeComplete && packet.getPacketId() !== 0x00) {
                // Handshake not done and this is not the handshake packet:
                // keep it for the next flush.
                this._packetQueue.push(packet);
                continue;
            }
            this.sendPacket(packet).catch((error: unknown) => {
                console.error('[protocol] failed to send queued packet', error);
            });
        }
    }

    /**
     * Serializes and sends `packet`, or queues it when the socket is missing,
     * not open, or the handshake has not completed (the handshake packet, ID 0x00,
     * is exempt from the last rule).
     *
     * @param packet packet to send.
     * @returns a promise that settles once the packet was written to the socket or queued;
     *          it rejects if `packet._send()` rejects.
     */
    public async sendPacket(packet: Packet) {
        if (!this._canSendNow(packet)) {
            this.addPacketToQueue(packet);
            return;
        }
        const stream = await packet._send();
        // The connection may have closed or been replaced while serializing;
        // queue the packet instead of writing to a socket that cannot take it.
        const socket = this.socket;
        if (!socket || !this._canSendNow(packet)) {
            this.addPacketToQueue(packet);
            return;
        }
        const packetName = packet.constructor.name;
        const pIdHex = packet.getPacketId().toString(16).toLocaleUpperCase();
        const pIdHexPadded = pIdHex.length === 1 ? '0' + pIdHex : pIdHex;
        console.info(`[protocol] %csending packet: %c${packetName} (ID: 0x${pIdHexPadded})`, 'color: #deadcd;', 'color: orange;');
        // NOTE: the payload is written as a single frame; no chunking of
        // oversized packets is implemented here.
        socket.send(Buffer.from(stream.getStream()));
    }

    /** Appends `packet` to the outgoing queue without attempting to send it. */
    public addPacketToQueue(packet: Packet) {
        this._packetQueue.push(packet);
    }

    /**
     * Closes the connection on purpose: disables automatic reconnection,
     * cancels a reconnect that is already scheduled and stops the heartbeat.
     */
    public close() {
        this.isManuallyClosed = true;
        this.clearReconnectTimer();
        this.stopHeartbeat();
        if (this.socket) {
            this.socket.close();
        }
    }

    /**
     * Removes a callback previously registered with `waitPacket`.
     * Does nothing if the callback is not registered for that packet ID.
     *
     * @param packet   packet ID the callback was registered for.
     * @param callback the exact function instance passed to `waitPacket`.
     */
    public unwaitPacket<T>(packet: number, callback: (packet: T) => void) {
        const waiters = this._packetWaiters.get(packet);
        if (!waiters) {
            return;
        }
        const index = waiters.indexOf(callback);
        if (index !== -1) {
            waiters.splice(index, 1);
        }
        if (waiters.length === 0) {
            this._packetWaiters.delete(packet);
        }
    }

    /**
     * Registers a callback invoked for every received packet with the given ID.
     *
     * @param packet   packet ID to listen for.
     * @param callback function called with the decoded packet.
     * @returns the number of callbacks now registered for that ID
     *          (the result of `Array.prototype.push`).
     */
    public waitPacket<T>(packet: number, callback: (packet: T) => void): number {
        let waiters = this._packetWaiters.get(packet);
        if (!waiters) {
            waiters = [];
            this._packetWaiters.set(packet, waiters);
        }
        return waiters.push(callback);
    }

    /**
     * Wait for a single packet to be received.
     * The callback is unregistered after its first invocation, even if it throws.
     *
     * @param packet packet number to wait
     * @param callback callback to execute once the packet is received
     */
    public waitPacketOnce<T>(packet: number, callback: (packet: T) => void) {
        const wrapper = (receivedPacket: T) => {
            try {
                callback(receivedPacket);
            } finally {
                this.unwaitPacket(packet, wrapper);
            }
        };
        this.waitPacket(packet, wrapper);
    }
}