import { useEffect } from "react"; import { useProtocolState } from "../ProtocolProvider/useProtocolState"; import { ProtocolState } from "../ProtocolProvider/ProtocolProvider"; import { useDatabase } from "../DatabaseProvider/useDatabase"; import { usePublicKey } from "../AccountProvider/usePublicKey"; import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync"; import { useSender } from "../ProtocolProvider/useSender"; import { usePacket } from "../ProtocolProvider/usePacket"; import { runTaskInQueue, whenFinish } from "./dialogQueue"; import { useProtocol } from "../ProtocolProvider/useProtocol"; import { PacketGroupJoin } from "../ProtocolProvider/protocol/packets/packet.group.join"; import { useGroups } from "./useGroups"; import { chacha20Decrypt, decodeWithPassword, encodeWithPassword, generateMd5 } from "@/app/workers/crypto/crypto"; import { usePrivatePlain } from "../AccountProvider/usePrivatePlain"; import { GroupStatus } from "../ProtocolProvider/protocol/packets/packet.group.invite.info"; import { useConsoleLogger } from "@/app/hooks/useConsoleLogger"; import { useDialogsList } from "../DialogListProvider/useDialogsList"; import { useUpdateGroupInformation } from "../InformationProvider/useUpdateGroupInformation"; import { useGroupInviteStatus } from "./useGroupInviteStatus"; import { Attachment, AttachmentType, PacketMessage } from "../ProtocolProvider/protocol/packets/packet.message"; import { useUpdateSyncTime } from "./useUpdateSyncTime"; import { useFileStorage } from "@/app/hooks/useFileStorage"; import { AttachmentMeta, DeliveredMessageState, Message } from "./DialogProvider"; import { MESSAGE_MAX_LOADED, TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD } from "@/app/constants"; import { useMemory } from "../MemoryProvider/useMemory"; import { useDialogsCache } from "./useDialogsCache"; import { PacketRead } from "../ProtocolProvider/protocol/packets/packet.read"; import { useLogger } from "@/app/hooks/useLogger"; import { useIdle } from "@mantine/hooks"; import { useViewPanelsState } from "@/app/hooks/useViewPanelsState"; import { useWindowFocus } from "@/app/hooks/useWindowFocus"; /** * Хук отвечает за синхронизацию сообщений, запрос синхронизации * при подключении */ export function useSynchronize() { const [protocolState, setProtocolState] = useProtocolState(); const {getQuery, runQuery} = useDatabase(); const publicKey = usePublicKey(); const send = useSender(); const {protocol} = useProtocol(); const {parseGroupString, hasGroup, getGroupKey} = useGroups(); const privatePlain = usePrivatePlain(); const {error, info} = useConsoleLogger('useSynchronize'); const log = useLogger('useSynchronize'); const {setInviteStatusByGroupId} = useGroupInviteStatus(''); const updateGroupInformation = useUpdateGroupInformation(); const {updateDialog} = useDialogsList(); const idle = useIdle(TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD * 1000); const updateSyncTime = useUpdateSyncTime(); const {writeFile} = useFileStorage(); const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache(); const [currentDialogPublicKeyView, __] = useMemory("current-dialog-public-key-view", "", true); const [viewState] = useViewPanelsState(); const focused = useWindowFocus(); useEffect(() => { if(protocol.handshakeExchangeComplete){ trySync(); } }, [protocol.handshakeExchangeComplete]); const trySync = async () => { const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]); sendSynchronize(lastSyncTime?.last_sync ?? 0); } const sendSynchronize = (timestamp: number) => { const packet = new PacketSync(); packet.setStatus(0); packet.setTimestamp(timestamp); send(packet); } /** * Пакет приходит либо при входе в группу (но там используется слушатель once), либо при * синхронизации. В данном случае этот пакет прийдет только при синхронизации */ usePacket(20, async (packet: PacketGroupJoin) => { const decryptedGroupString = await decodeWithPassword(privatePlain, packet.getGroupString()); const parsed = await parseGroupString(decryptedGroupString); if(!parsed){ error("Received invalid group string, skipping"); return; } const groupStatus = packet.getGroupStatus(); if(groupStatus != GroupStatus.JOINED){ error("Cannot sync group that is not joined, skipping"); return; } const secureKey = await encodeWithPassword(privatePlain, parsed.encryptKey); await runQuery(` INSERT INTO groups (account, group_id, title, description, key) VALUES (?, ?, ?, ?, ?) `, [publicKey, parsed.groupId, parsed.title, parsed.description, secureKey]); updateDialog("#group:" + parsed.groupId); setInviteStatusByGroupId(parsed.groupId, GroupStatus.JOINED); updateGroupInformation({ groupId: parsed.groupId, title: parsed.title, description: parsed.description }); info("Group synchronized " + parsed.groupId); }, [publicKey]); usePacket(25, async (packet: PacketSync) => { const status = packet.getStatus(); if(status == SyncStatus.BATCH_START){ setProtocolState(ProtocolState.SYNCHRONIZATION); } if(status == SyncStatus.BATCH_END){ /** * Этот Promise ждет пока все сообщения синхронизируются и обработаются, только * после этого */ await whenFinish(); await runQuery( "INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " + "ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?", [publicKey, packet.getTimestamp(), packet.getTimestamp(), publicKey] ); console.info("Batch complete", publicKey, packet.getTimestamp()); trySync(); } if(status == SyncStatus.NOT_NEEDED){ /** * Синхронизация не нужна, все данные актуальны */ setProtocolState(ProtocolState.CONNECTED); } }, [publicKey]); /** * Нам приходят сообщения от себя самих же при синхронизации * нужно обрабатывать их особым образом соотвественно * * Метод нужен для синхронизации своих сообщений */ usePacket(0x06, async (packet: PacketMessage) => { runTaskInQueue(async () => { const fromPublicKey = packet.getFromPublicKey(); const toPublicKey = packet.getToPublicKey(); const aesChachaKey = packet.getAesChachaKey(); const content = packet.getContent(); const timestamp = packet.getTimestamp(); const messageId = packet.getMessageId(); if(hasGroup(toPublicKey)){ /** * Игнорируем если это сообщение для группы, для них есть отдельный слушатель usePacket ниже */ return; } if (fromPublicKey != publicKey) { /** * Игнорируем если это не сообщение от нас */ return; } const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey); const chachaDecryptedKey = Buffer.from(chachaKey, "binary"); const key = chachaDecryptedKey.slice(0, 32); const nonce = chachaDecryptedKey.slice(32); const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex')); await updateSyncTime(timestamp); let attachmentsMeta: AttachmentMeta[] = []; let messageAttachments: Attachment[] = []; for (let i = 0; i < packet.getAttachments().length; i++) { const attachment = packet.getAttachments()[i]; let nextLength = messageAttachments.push({ ...attachment, blob: "" }); if (attachment.type == AttachmentType.MESSAGES) { /** * Этот тип вложения приходит сразу в blob и не нуждается * в последующем скачивании */ const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob); writeFile(`m/${await generateMd5(attachment.id + publicKey)}`, Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary')); messageAttachments[nextLength - 1].blob = decryptedBlob; } attachmentsMeta.push({ id: attachment.id, type: attachment.type, preview: attachment.preview, encoding: attachment.encoding, transport: attachment.transport }); } const newMessage: Message = { from_public_key: fromPublicKey, to_public_key: toPublicKey, content: content, timestamp: timestamp, readed: 1, //сообщение прочитано chacha_key: chachaDecryptedKey.toString('utf-8'), from_me: 1, //сообщение от нас plain_message: (decryptedContent as string), delivered: DeliveredMessageState.DELIVERED, message_id: messageId, attachments: messageAttachments }; await runQuery(` INSERT INTO messages (from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, [fromPublicKey, toPublicKey, content, timestamp, 0, //по умолчанию не прочитаны "sync:" + aesChachaKey, 1, //Свои же сообщения всегда от нас await encodeWithPassword(privatePlain, decryptedContent), publicKey, messageId, DeliveredMessageState.DELIVERED, JSON.stringify(attachmentsMeta)]); updateDialog(toPublicKey); let dialogCache = getDialogCache(toPublicKey); if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) { addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED)); } }); }, [privatePlain, currentDialogPublicKeyView]); /** * Обработчик синхронизации прочтения личных сообщений */ usePacket(0x07, async (packet: PacketRead) => { runTaskInQueue(async () => { if (hasGroup(packet.getToPublicKey())) { /** * Если это относится к группам, то игнорируем здесь, * для этого есть отдельный слушатель usePacket ниже */ return; } const fromPublicKey = packet.getFromPublicKey(); const toPublicKey = packet.getToPublicKey(); if (fromPublicKey != publicKey) { /** * Игнорируем если это не синхронизация нашего прочтения */ return; } await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]); updateDialog(toPublicKey); addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => { if (message.from_public_key == toPublicKey && !message.readed) { console.info({ fromPublicKey, toPublicKey }); return { ...message, readed: 1 } } return message; })); }); }, [updateDialog, publicKey]); /** * Обработчик синхронизации прочтения групповых сообщений */ usePacket(0x07, async (packet: PacketRead) => { runTaskInQueue(async () => { const fromPublicKey = packet.getFromPublicKey(); const toPublicKey = packet.getToPublicKey(); if (!hasGroup(toPublicKey)) { /** * Если это не относится к группам, то игнорируем здесь, * для этого есть отдельный слушатель usePacket выше */ return; } if(fromPublicKey != publicKey){ /** * Игнорируем если это наше прочтение * которое получается при синхронизации */ return; } await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key != ? AND account = ?`, [toPublicKey, publicKey, publicKey]); await updateSyncTime(Date.now()); updateDialog(toPublicKey); addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => { if (!message.readed && message.from_public_key != publicKey) { return { ...message, readed: 1 } } return message; })); }); }, [updateDialog]); /** * Обработчик сообщений для синхронизации своих же сообщений в группе */ usePacket(0x06, async (packet: PacketMessage) => { runTaskInQueue(async () => { const fromPublicKey = packet.getFromPublicKey(); const toPublicKey = packet.getToPublicKey(); const content = packet.getContent(); const timestamp = packet.getTimestamp(); const messageId = packet.getMessageId(); if (!hasGroup(toPublicKey)) { /** * Если это личное сообщение, то игнорируем его здесь * для него есть отдельный слушатель usePacket (снизу) */ return; } if (fromPublicKey != publicKey) { /** * Игнорируем если это сообщения не от нас */ return; } await updateSyncTime(timestamp); const groupKey = await getGroupKey(toPublicKey); if (!groupKey) { log("Group key not found for group " + toPublicKey); error("Message dropped because group key not found for group " + toPublicKey); return; } info("New group message packet received from " + fromPublicKey); let decryptedContent = ''; try { decryptedContent = await decodeWithPassword(groupKey, content); } catch (e) { decryptedContent = ''; } let attachmentsMeta: AttachmentMeta[] = []; let messageAttachments: Attachment[] = []; for (let i = 0; i < packet.getAttachments().length; i++) { const attachment = packet.getAttachments()[i]; log("Attachment received id " + attachment.id + " type " + attachment.type); let nextLength = messageAttachments.push({ ...attachment, blob: "" }); if (attachment.type == AttachmentType.MESSAGES) { /** * Этот тип вложения приходит сразу в blob и не нуждается * в последующем скачивании */ const decryptedBlob = await decodeWithPassword(groupKey, attachment.blob); writeFile(`m/${await generateMd5(attachment.id + publicKey)}`, Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary')); messageAttachments[nextLength - 1].blob = decryptedBlob; } attachmentsMeta.push({ id: attachment.id, type: attachment.type, preview: attachment.preview, encoding: attachment.encoding, transport: attachment.transport }); } const newMessage: Message = { from_public_key: fromPublicKey, to_public_key: toPublicKey, content: content, timestamp: timestamp, readed: 0, chacha_key: groupKey, from_me: 1, plain_message: decryptedContent, delivered: DeliveredMessageState.DELIVERED, message_id: messageId, attachments: messageAttachments }; await runQuery(` INSERT INTO messages (from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) `, [fromPublicKey, toPublicKey, content, timestamp, 0, //по умолчанию не прочитаны "", 1, //Свои же сообщения всегда от нас await encodeWithPassword(privatePlain, decryptedContent), publicKey, messageId, DeliveredMessageState.DELIVERED, JSON.stringify(attachmentsMeta)]); /** * Так как у нас в toPublicKey приходит ID группы, * то обновляем диалог по этому ID, а не по fromPublicKey * как это сделано в личных сообщениях */ updateDialog(toPublicKey); let dialogCache = getDialogCache(toPublicKey); if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) { addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED)); } }); }, [updateDialog, focused, currentDialogPublicKeyView, viewState, idle, protocolState]); }