Files
desktop/app/providers/DialogProvider/useSynchronize.ts

260 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { useEffect } from "react";
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
import { useDatabase } from "../DatabaseProvider/useDatabase";
import { usePublicKey } from "../AccountProvider/usePublicKey";
import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync";
import { useSender } from "../ProtocolProvider/useSender";
import { usePacket } from "../ProtocolProvider/usePacket";
import { runTaskInQueue, whenFinish } from "./dialogQueue";
import { useProtocol } from "../ProtocolProvider/useProtocol";
import { PacketGroupJoin } from "../ProtocolProvider/protocol/packets/packet.group.join";
import { useGroups } from "./useGroups";
import { chacha20Decrypt, decodeWithPassword, encodeWithPassword, generateMd5 } from "@/app/workers/crypto/crypto";
import { usePrivatePlain } from "../AccountProvider/usePrivatePlain";
import { GroupStatus } from "../ProtocolProvider/protocol/packets/packet.group.invite.info";
import { useConsoleLogger } from "@/app/hooks/useConsoleLogger";
import { useDialogsList } from "../DialogListProvider/useDialogsList";
import { useUpdateGroupInformation } from "../InformationProvider/useUpdateGroupInformation";
import { useGroupInviteStatus } from "./useGroupInviteStatus";
import { Attachment, AttachmentType, PacketMessage } from "../ProtocolProvider/protocol/packets/packet.message";
import { useUpdateSyncTime } from "./useUpdateSyncTime";
import { useFileStorage } from "@/app/hooks/useFileStorage";
import { DeliveredMessageState, Message } from "./DialogProvider";
import { MESSAGE_MAX_LOADED } from "@/app/constants";
import { useMemory } from "../MemoryProvider/useMemory";
import { useDialogsCache } from "./useDialogsCache";
import { PacketRead } from "../ProtocolProvider/protocol/packets/packet.read";
/**
* Хук отвечает за синхронизацию сообщений, запрос синхронизации
* при подключении
*/
export function useSynchronize() {
const [_, setProtocolState] = useProtocolState();
const {getQuery, runQuery} = useDatabase();
const publicKey = usePublicKey();
const send = useSender();
const {protocol} = useProtocol();
const {parseGroupString, hasGroup} = useGroups();
const privatePlain = usePrivatePlain();
const {error, info} = useConsoleLogger('useSynchronize');
const {setInviteStatusByGroupId} = useGroupInviteStatus('');
const updateGroupInformation = useUpdateGroupInformation();
const {updateDialog} = useDialogsList();
const updateSyncTime = useUpdateSyncTime();
const {writeFile} = useFileStorage();
const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache();
const [currentDialogPublicKeyView, __] = useMemory("current-dialog-public-key-view", "", true);
useEffect(() => {
if(protocol.handshakeExchangeComplete){
trySync();
}
}, [protocol.handshakeExchangeComplete]);
const trySync = async () => {
const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]);
sendSynchronize(lastSyncTime?.last_sync ?? 0);
}
const sendSynchronize = (timestamp: number) => {
const packet = new PacketSync();
packet.setStatus(0);
packet.setTimestamp(timestamp);
send(packet);
}
/**
* Пакет приходит либо при входе в группу (но там используется слушатель once), либо при
* синхронизации. В данном случае этот пакет прийдет только при синхронизации
*/
usePacket(20, async (packet: PacketGroupJoin) => {
const decryptedGroupString = await decodeWithPassword(privatePlain, packet.getGroupString());
const parsed = await parseGroupString(decryptedGroupString);
if(!parsed){
error("Received invalid group string, skipping");
return;
}
const groupStatus = packet.getGroupStatus();
if(groupStatus != GroupStatus.JOINED){
error("Cannot sync group that is not joined, skipping");
return;
}
const secureKey = await encodeWithPassword(privatePlain, parsed.encryptKey);
await runQuery(`
INSERT INTO groups (account, group_id, title, description, key) VALUES (?, ?, ?, ?, ?)
`, [publicKey, parsed.groupId, parsed.title, parsed.description, secureKey]);
updateDialog("#group:" + parsed.groupId);
setInviteStatusByGroupId(parsed.groupId, GroupStatus.JOINED);
updateGroupInformation({
groupId: parsed.groupId,
title: parsed.title,
description: parsed.description
});
info("Group synchronized " + parsed.groupId);
}, [publicKey]);
usePacket(25, async (packet: PacketSync) => {
const status = packet.getStatus();
if(status == SyncStatus.BATCH_START){
setProtocolState(ProtocolState.SYNCHRONIZATION);
}
if(status == SyncStatus.BATCH_END){
/**
* Этот Promise ждет пока все сообщения синхронизируются и обработаются, только
* после этого
*/
await whenFinish();
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
[publicKey, packet.getTimestamp(), packet.getTimestamp(), publicKey]
);
console.info("Batch complete", publicKey, packet.getTimestamp());
trySync();
}
if(status == SyncStatus.NOT_NEEDED){
/**
* Синхронизация не нужна, все данные актуальны
*/
setProtocolState(ProtocolState.CONNECTED);
}
}, [publicKey]);
/**
* Нам приходят сообщения от себя самих же при синхронизации
* нужно обрабатывать их особым образом соотвественно
*
* Метод нужен для синхронизации своих сообщений
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
const aesChachaKey = packet.getAesChachaKey();
const content = packet.getContent();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не сообщение от нас
*/
return;
}
const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey);
const chachaDecryptedKey = Buffer.from(chachaKey, "binary");
const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32);
const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex'));
await updateSyncTime(timestamp);
let attachmentsMeta: any[] = [];
let messageAttachments: Attachment[] = [];
for (let i = 0; i < packet.getAttachments().length; i++) {
const attachment = packet.getAttachments()[i];
let nextLength = messageAttachments.push({
...attachment,
blob: ""
});
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
*/
const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob);
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
messageAttachments[nextLength - 1].blob = decryptedBlob;
}
attachmentsMeta.push({
id: attachment.id,
type: attachment.type,
preview: attachment.preview
});
}
const newMessage: Message = {
from_public_key: fromPublicKey,
to_public_key: toPublicKey,
content: content,
timestamp: timestamp,
readed: 1, //сообщение прочитано
chacha_key: chachaDecryptedKey.toString('utf-8'),
from_me: 1, //сообщение от нас
plain_message: (decryptedContent as string),
delivered: DeliveredMessageState.DELIVERED,
message_id: messageId,
attachments: messageAttachments
};
await runQuery(`
INSERT INTO messages
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [fromPublicKey,
toPublicKey,
content,
timestamp,
0, //по умолчанию не прочитаны
"sync:" + aesChachaKey,
1, //Свои же сообщения всегда от нас
await encodeWithPassword(privatePlain, decryptedContent),
publicKey,
messageId,
DeliveredMessageState.DELIVERED,
JSON.stringify(attachmentsMeta)]);
updateDialog(toPublicKey);
let dialogCache = getDialogCache(toPublicKey);
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [privatePlain, currentDialogPublicKeyView]);
/**
* Обработчик синхронизации прочтения личных сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
runTaskInQueue(async () => {
if (hasGroup(packet.getToPublicKey())) {
/**
* Если это относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket ниже
*/
return;
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не синхронизация нашего прочтения
*/
return;
}
console.info("PACKED_READ_SYNC");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`,
[toPublicKey, fromPublicKey, publicKey]);
console.info("updating with params ", [fromPublicKey, toPublicKey, publicKey]);
updateDialog(toPublicKey);
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
if (message.from_public_key == toPublicKey && !message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
}
}
return message;
}));
});
}, [updateDialog, publicKey]);
}