diff --git a/app/providers/DialogProvider/useDialogFiber.ts b/app/providers/DialogProvider/useDialogFiber.ts index 8373d2d..112bb5c 100644 --- a/app/providers/DialogProvider/useDialogFiber.ts +++ b/app/providers/DialogProvider/useDialogFiber.ts @@ -27,6 +27,7 @@ import { useMentions } from "../DialogStateProvider.tsx/useMentions"; import { runTaskInQueue } from "./dialogQueue"; import { useProtocolState } from "../ProtocolProvider/useProtocolState"; import { ProtocolState } from "../ProtocolProvider/ProtocolProvider"; +import { useUpdateSyncTime } from "./useUpdateSyncTime"; /** * При вызове будет запущен "фоновый" обработчик @@ -53,27 +54,7 @@ export function useDialogFiber() { const [userInfo] = useUserInformation(publicKey); const { pushMention } = useMentions(); const [protocolState] = useProtocolState(); - - /** - * Обновляет время последней синхронизации для аккаунта - * @param timestamp время - */ - const updateSyncTime = async (timestamp: number) => { - if(protocolState == ProtocolState.SYNCHRONIZATION){ - /** - * Если сейчас идет синхронизация то чтобы при синхронизации - * не создавать нагрузку на базу данных - * по постоянному обновлению, обновляем базу один раз - когда - * приходит пакет о том что синхронизация закончилась - */ - return; - } - await runQuery( - "INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " + - "ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?", - [publicKey, timestamp, timestamp, publicKey] - ); - }; + const updateSyncTime = useUpdateSyncTime(); /** * Лог @@ -82,101 +63,6 @@ export function useDialogFiber() { info("Starting passive fiber for dialog packets"); }, []); - /** - * Нам приходят сообщения от себя самих же при синхронизации - * нужно обрабатывать их особым образом соотвественно - * - * Метод нужен для синхронизации своих сообщений - */ - usePacket(0x06, async (packet: PacketMessage) => { - runTaskInQueue(async () => { - const fromPublicKey = packet.getFromPublicKey(); - const toPublicKey = packet.getToPublicKey(); - const aesChachaKey = packet.getAesChachaKey(); - const content = packet.getContent(); - const timestamp = packet.getTimestamp(); - const messageId = packet.getMessageId(); - if (fromPublicKey != publicKey) { - /** - * Игнорируем если это не сообщение от нас - */ - return; - } - - const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey); - const chachaDecryptedKey = Buffer.from(chachaKey, "binary"); - const key = chachaDecryptedKey.slice(0, 32); - const nonce = chachaDecryptedKey.slice(32); - const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex')); - await updateSyncTime(timestamp); - let attachmentsMeta: any[] = []; - let messageAttachments: Attachment[] = []; - for (let i = 0; i < packet.getAttachments().length; i++) { - const attachment = packet.getAttachments()[i]; - log("Attachment received id " + attachment.id + " type " + attachment.type); - - let nextLength = messageAttachments.push({ - ...attachment, - blob: "" - }); - - if (attachment.type == AttachmentType.MESSAGES) { - /** - * Этот тип вложения приходит сразу в blob и не нуждается - * в последующем скачивании - */ - const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob); - writeFile(`m/${await generateMd5(attachment.id + publicKey)}`, - Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary')); - messageAttachments[nextLength - 1].blob = decryptedBlob; - } - - attachmentsMeta.push({ - id: attachment.id, - type: attachment.type, - preview: attachment.preview - }); - } - - const newMessage: Message = { - from_public_key: fromPublicKey, - to_public_key: toPublicKey, - content: content, - timestamp: timestamp, - readed: 1, //сообщение прочитано - chacha_key: chachaDecryptedKey.toString('utf-8'), - from_me: 1, //сообщение от нас - plain_message: (decryptedContent as string), - delivered: DeliveredMessageState.DELIVERED, - message_id: messageId, - attachments: messageAttachments - }; - - await runQuery(` - INSERT INTO messages - (from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) - `, [fromPublicKey, - toPublicKey, - content, - timestamp, - 0, //по умолчанию не прочитаны - "sync:" + aesChachaKey, - 1, //Свои же сообщения всегда от нас - await encodeWithPassword(privatePlain, decryptedContent), - publicKey, - messageId, - DeliveredMessageState.DELIVERED, - JSON.stringify(attachmentsMeta)]); - - updateDialog(toPublicKey); - - let dialogCache = getDialogCache(toPublicKey); - if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) { - addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED)); - } - }); - }, [privatePlain, currentDialogPublicKeyView]); - /** * Обработчик сообщений для группы */ @@ -459,47 +345,6 @@ export function useDialogFiber() { }); }, [blocked, muted, updateDialog, focused, currentDialogPublicKeyView, viewState, idle, protocolState]); - /** - * Обработчик синхронизации прочтения личных сообщений - */ - usePacket(0x07, async (packet: PacketRead) => { - runTaskInQueue(async () => { - if (hasGroup(packet.getToPublicKey())) { - /** - * Если это относится к группам, то игнорируем здесь, - * для этого есть отдельный слушатель usePacket ниже - */ - return; - } - const fromPublicKey = packet.getFromPublicKey(); - const toPublicKey = packet.getToPublicKey(); - if (fromPublicKey != publicKey) { - /** - * Игнорируем если это не синхронизация нашего прочтения - */ - return; - } - console.info("PACKED_READ_SYNC"); - await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, - [toPublicKey, fromPublicKey, publicKey]); - - console.info("updating with params ", [fromPublicKey, toPublicKey, publicKey]); - updateDialog(toPublicKey); - log("Read sync packet from other device"); - addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => { - if (message.from_public_key == toPublicKey && !message.readed) { - console.info("Marking message as read in cache for dialog with " + fromPublicKey); - console.info({ fromPublicKey, toPublicKey }); - return { - ...message, - readed: 1 - } - } - return message; - })); - }); - }, [updateDialog, publicKey]); - /** * Обработчик прочтения личных сообщений */ @@ -540,6 +385,7 @@ export function useDialogFiber() { })); }); }, [updateDialog, publicKey]); + /** * Обработчик прочтения групповых сообщений */ diff --git a/app/providers/DialogProvider/useSynchronize.ts b/app/providers/DialogProvider/useSynchronize.ts index 332ab79..b7248d3 100644 --- a/app/providers/DialogProvider/useSynchronize.ts +++ b/app/providers/DialogProvider/useSynchronize.ts @@ -6,18 +6,25 @@ import { usePublicKey } from "../AccountProvider/usePublicKey"; import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync"; import { useSender } from "../ProtocolProvider/useSender"; import { usePacket } from "../ProtocolProvider/usePacket"; -import { whenFinish } from "./dialogQueue"; +import { runTaskInQueue, whenFinish } from "./dialogQueue"; import { useProtocol } from "../ProtocolProvider/useProtocol"; import { PacketGroupJoin } from "../ProtocolProvider/protocol/packets/packet.group.join"; import { useGroups } from "./useGroups"; -import { decodeWithPassword, encodeWithPassword } from "@/app/workers/crypto/crypto"; +import { chacha20Decrypt, decodeWithPassword, encodeWithPassword, generateMd5 } from "@/app/workers/crypto/crypto"; import { usePrivatePlain } from "../AccountProvider/usePrivatePlain"; import { GroupStatus } from "../ProtocolProvider/protocol/packets/packet.group.invite.info"; import { useConsoleLogger } from "@/app/hooks/useConsoleLogger"; -import { useNavigate } from "react-router-dom"; import { useDialogsList } from "../DialogListProvider/useDialogsList"; import { useUpdateGroupInformation } from "../InformationProvider/useUpdateGroupInformation"; import { useGroupInviteStatus } from "./useGroupInviteStatus"; +import { Attachment, AttachmentType, PacketMessage } from "../ProtocolProvider/protocol/packets/packet.message"; +import { useUpdateSyncTime } from "./useUpdateSyncTime"; +import { useFileStorage } from "@/app/hooks/useFileStorage"; +import { DeliveredMessageState, Message } from "./DialogProvider"; +import { MESSAGE_MAX_LOADED } from "@/app/constants"; +import { useMemory } from "../MemoryProvider/useMemory"; +import { useDialogsCache } from "./useDialogsCache"; +import { PacketRead } from "../ProtocolProvider/protocol/packets/packet.read"; /** * Хук отвечает за синхронизацию сообщений, запрос синхронизации @@ -29,12 +36,16 @@ export function useSynchronize() { const publicKey = usePublicKey(); const send = useSender(); const {protocol} = useProtocol(); - const {parseGroupString} = useGroups(); + const {parseGroupString, hasGroup} = useGroups(); const privatePlain = usePrivatePlain(); const {error, info} = useConsoleLogger('useSynchronize'); const {setInviteStatusByGroupId} = useGroupInviteStatus(''); const updateGroupInformation = useUpdateGroupInformation(); const {updateDialog} = useDialogsList(); + const updateSyncTime = useUpdateSyncTime(); + const {writeFile} = useFileStorage(); + const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache(); + const [currentDialogPublicKeyView, __] = useMemory("current-dialog-public-key-view", "", true); useEffect(() => { @@ -111,4 +122,139 @@ export function useSynchronize() { setProtocolState(ProtocolState.CONNECTED); } }, [publicKey]); + + /** + * Нам приходят сообщения от себя самих же при синхронизации + * нужно обрабатывать их особым образом соотвественно + * + * Метод нужен для синхронизации своих сообщений + */ + usePacket(0x06, async (packet: PacketMessage) => { + runTaskInQueue(async () => { + const fromPublicKey = packet.getFromPublicKey(); + const toPublicKey = packet.getToPublicKey(); + const aesChachaKey = packet.getAesChachaKey(); + const content = packet.getContent(); + const timestamp = packet.getTimestamp(); + const messageId = packet.getMessageId(); + if (fromPublicKey != publicKey) { + /** + * Игнорируем если это не сообщение от нас + */ + return; + } + + const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey); + const chachaDecryptedKey = Buffer.from(chachaKey, "binary"); + const key = chachaDecryptedKey.slice(0, 32); + const nonce = chachaDecryptedKey.slice(32); + const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex')); + await updateSyncTime(timestamp); + let attachmentsMeta: any[] = []; + let messageAttachments: Attachment[] = []; + for (let i = 0; i < packet.getAttachments().length; i++) { + const attachment = packet.getAttachments()[i]; + + + let nextLength = messageAttachments.push({ + ...attachment, + blob: "" + }); + + if (attachment.type == AttachmentType.MESSAGES) { + /** + * Этот тип вложения приходит сразу в blob и не нуждается + * в последующем скачивании + */ + const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob); + writeFile(`m/${await generateMd5(attachment.id + publicKey)}`, + Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary')); + messageAttachments[nextLength - 1].blob = decryptedBlob; + } + + attachmentsMeta.push({ + id: attachment.id, + type: attachment.type, + preview: attachment.preview + }); + } + + const newMessage: Message = { + from_public_key: fromPublicKey, + to_public_key: toPublicKey, + content: content, + timestamp: timestamp, + readed: 1, //сообщение прочитано + chacha_key: chachaDecryptedKey.toString('utf-8'), + from_me: 1, //сообщение от нас + plain_message: (decryptedContent as string), + delivered: DeliveredMessageState.DELIVERED, + message_id: messageId, + attachments: messageAttachments + }; + + await runQuery(` + INSERT INTO messages + (from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `, [fromPublicKey, + toPublicKey, + content, + timestamp, + 0, //по умолчанию не прочитаны + "sync:" + aesChachaKey, + 1, //Свои же сообщения всегда от нас + await encodeWithPassword(privatePlain, decryptedContent), + publicKey, + messageId, + DeliveredMessageState.DELIVERED, + JSON.stringify(attachmentsMeta)]); + + updateDialog(toPublicKey); + + let dialogCache = getDialogCache(toPublicKey); + if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) { + addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED)); + } + }); + }, [privatePlain, currentDialogPublicKeyView]); + + /** + * Обработчик синхронизации прочтения личных сообщений + */ + usePacket(0x07, async (packet: PacketRead) => { + runTaskInQueue(async () => { + if (hasGroup(packet.getToPublicKey())) { + /** + * Если это относится к группам, то игнорируем здесь, + * для этого есть отдельный слушатель usePacket ниже + */ + return; + } + const fromPublicKey = packet.getFromPublicKey(); + const toPublicKey = packet.getToPublicKey(); + if (fromPublicKey != publicKey) { + /** + * Игнорируем если это не синхронизация нашего прочтения + */ + return; + } + console.info("PACKED_READ_SYNC"); + await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, + [toPublicKey, fromPublicKey, publicKey]); + + console.info("updating with params ", [fromPublicKey, toPublicKey, publicKey]); + updateDialog(toPublicKey); + addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => { + if (message.from_public_key == toPublicKey && !message.readed) { + console.info("Marking message as read in cache for dialog with " + fromPublicKey); + console.info({ fromPublicKey, toPublicKey }); + return { + ...message, + readed: 1 + } + } + return message; + })); + }); + }, [updateDialog, publicKey]); } \ No newline at end of file diff --git a/app/providers/DialogProvider/useUpdateSyncTime.ts b/app/providers/DialogProvider/useUpdateSyncTime.ts new file mode 100644 index 0000000..45ad845 --- /dev/null +++ b/app/providers/DialogProvider/useUpdateSyncTime.ts @@ -0,0 +1,33 @@ +import { usePublicKey } from "../AccountProvider/usePublicKey"; +import { useDatabase } from "../DatabaseProvider/useDatabase"; +import { ProtocolState } from "../ProtocolProvider/ProtocolProvider"; +import { useProtocolState } from "../ProtocolProvider/useProtocolState"; + +export function useUpdateSyncTime() : (timestamp: number) => Promise { + const [protocolState] = useProtocolState(); + const {runQuery} = useDatabase(); + const publicKey = usePublicKey(); + + /** + * Обновляет время последней синхронизации для аккаунта + * @param timestamp время + */ + const updateSyncTime = async (timestamp: number) => { + if(protocolState == ProtocolState.SYNCHRONIZATION){ + /** + * Если сейчас идет синхронизация то чтобы при синхронизации + * не создавать нагрузку на базу данных + * по постоянному обновлению, обновляем базу один раз - когда + * приходит пакет о том что синхронизация закончилась + */ + return; + } + await runQuery( + "INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " + + "ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?", + [publicKey, timestamp, timestamp, publicKey] + ); + }; + + return updateSyncTime; +} \ No newline at end of file