428 lines
20 KiB
TypeScript
428 lines
20 KiB
TypeScript
import { useEffect } from "react";
|
||
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
|
||
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
|
||
import { useDatabase } from "../DatabaseProvider/useDatabase";
|
||
import { usePublicKey } from "../AccountProvider/usePublicKey";
|
||
import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync";
|
||
import { useSender } from "../ProtocolProvider/useSender";
|
||
import { usePacket } from "../ProtocolProvider/usePacket";
|
||
import { runTaskInQueue, whenFinish } from "./dialogQueue";
|
||
import { useProtocol } from "../ProtocolProvider/useProtocol";
|
||
import { PacketGroupJoin } from "../ProtocolProvider/protocol/packets/packet.group.join";
|
||
import { useGroups } from "./useGroups";
|
||
import { chacha20Decrypt, decodeWithPassword, encodeWithPassword, generateMd5 } from "@/app/workers/crypto/crypto";
|
||
import { usePrivatePlain } from "../AccountProvider/usePrivatePlain";
|
||
import { GroupStatus } from "../ProtocolProvider/protocol/packets/packet.group.invite.info";
|
||
import { useConsoleLogger } from "@/app/hooks/useConsoleLogger";
|
||
import { useDialogsList } from "../DialogListProvider/useDialogsList";
|
||
import { useUpdateGroupInformation } from "../InformationProvider/useUpdateGroupInformation";
|
||
import { useGroupInviteStatus } from "./useGroupInviteStatus";
|
||
import { Attachment, AttachmentType, PacketMessage } from "../ProtocolProvider/protocol/packets/packet.message";
|
||
import { useUpdateSyncTime } from "./useUpdateSyncTime";
|
||
import { useFileStorage } from "@/app/hooks/useFileStorage";
|
||
import { AttachmentMeta, DeliveredMessageState, Message } from "./DialogProvider";
|
||
import { MESSAGE_MAX_LOADED, TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD } from "@/app/constants";
|
||
import { useMemory } from "../MemoryProvider/useMemory";
|
||
import { useDialogsCache } from "./useDialogsCache";
|
||
import { PacketRead } from "../ProtocolProvider/protocol/packets/packet.read";
|
||
import { useLogger } from "@/app/hooks/useLogger";
|
||
import { useIdle } from "@mantine/hooks";
|
||
import { useViewPanelsState } from "@/app/hooks/useViewPanelsState";
|
||
import { useWindowFocus } from "@/app/hooks/useWindowFocus";
|
||
|
||
/**
|
||
* Хук отвечает за синхронизацию сообщений, запрос синхронизации
|
||
* при подключении
|
||
*/
|
||
export function useSynchronize() {
|
||
const [protocolState, setProtocolState] = useProtocolState();
|
||
const {getQuery, runQuery} = useDatabase();
|
||
const publicKey = usePublicKey();
|
||
const send = useSender();
|
||
const {protocol} = useProtocol();
|
||
const {parseGroupString, hasGroup, getGroupKey} = useGroups();
|
||
const privatePlain = usePrivatePlain();
|
||
const {error, info} = useConsoleLogger('useSynchronize');
|
||
const log = useLogger('useSynchronize');
|
||
const {setInviteStatusByGroupId} = useGroupInviteStatus('');
|
||
const updateGroupInformation = useUpdateGroupInformation();
|
||
const {updateDialog} = useDialogsList();
|
||
const idle = useIdle(TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD * 1000);
|
||
const updateSyncTime = useUpdateSyncTime();
|
||
const {writeFile} = useFileStorage();
|
||
const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache();
|
||
const [currentDialogPublicKeyView, __] = useMemory("current-dialog-public-key-view", "", true);
|
||
const [viewState] = useViewPanelsState();
|
||
const focused = useWindowFocus();
|
||
|
||
|
||
useEffect(() => {
|
||
if(protocol.handshakeExchangeComplete){
|
||
trySync();
|
||
}
|
||
}, [protocol.handshakeExchangeComplete]);
|
||
|
||
const trySync = async () => {
|
||
const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]);
|
||
sendSynchronize(lastSyncTime?.last_sync ?? 0);
|
||
}
|
||
|
||
const sendSynchronize = (timestamp: number) => {
|
||
const packet = new PacketSync();
|
||
packet.setStatus(0);
|
||
packet.setTimestamp(timestamp);
|
||
send(packet);
|
||
}
|
||
|
||
/**
|
||
* Пакет приходит либо при входе в группу (но там используется слушатель once), либо при
|
||
* синхронизации. В данном случае этот пакет прийдет только при синхронизации
|
||
*/
|
||
usePacket(20, async (packet: PacketGroupJoin) => {
|
||
const decryptedGroupString = await decodeWithPassword(privatePlain, packet.getGroupString());
|
||
const parsed = await parseGroupString(decryptedGroupString);
|
||
if(!parsed){
|
||
error("Received invalid group string, skipping");
|
||
return;
|
||
}
|
||
const groupStatus = packet.getGroupStatus();
|
||
if(groupStatus != GroupStatus.JOINED){
|
||
error("Cannot sync group that is not joined, skipping");
|
||
return;
|
||
}
|
||
const secureKey = await encodeWithPassword(privatePlain, parsed.encryptKey);
|
||
await runQuery(`
|
||
INSERT INTO groups (account, group_id, title, description, key) VALUES (?, ?, ?, ?, ?)
|
||
`, [publicKey, parsed.groupId, parsed.title, parsed.description, secureKey]);
|
||
updateDialog("#group:" + parsed.groupId);
|
||
setInviteStatusByGroupId(parsed.groupId, GroupStatus.JOINED);
|
||
updateGroupInformation({
|
||
groupId: parsed.groupId,
|
||
title: parsed.title,
|
||
description: parsed.description
|
||
});
|
||
info("Group synchronized " + parsed.groupId);
|
||
}, [publicKey]);
|
||
|
||
usePacket(25, async (packet: PacketSync) => {
|
||
const status = packet.getStatus();
|
||
if(status == SyncStatus.BATCH_START){
|
||
setProtocolState(ProtocolState.SYNCHRONIZATION);
|
||
}
|
||
if(status == SyncStatus.BATCH_END){
|
||
/**
|
||
* Этот Promise ждет пока все сообщения синхронизируются и обработаются, только
|
||
* после этого
|
||
*/
|
||
await whenFinish();
|
||
await runQuery(
|
||
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
|
||
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
|
||
[publicKey, packet.getTimestamp(), packet.getTimestamp(), publicKey]
|
||
);
|
||
console.info("Batch complete", publicKey, packet.getTimestamp());
|
||
trySync();
|
||
}
|
||
if(status == SyncStatus.NOT_NEEDED){
|
||
/**
|
||
* Синхронизация не нужна, все данные актуальны
|
||
*/
|
||
setProtocolState(ProtocolState.CONNECTED);
|
||
}
|
||
}, [publicKey]);
|
||
|
||
/**
|
||
* Нам приходят сообщения от себя самих же при синхронизации
|
||
* нужно обрабатывать их особым образом соотвественно
|
||
*
|
||
* Метод нужен для синхронизации своих сообщений
|
||
*/
|
||
usePacket(0x06, async (packet: PacketMessage) => {
|
||
runTaskInQueue(async () => {
|
||
const fromPublicKey = packet.getFromPublicKey();
|
||
const toPublicKey = packet.getToPublicKey();
|
||
const aesChachaKey = packet.getAesChachaKey();
|
||
const content = packet.getContent();
|
||
const timestamp = packet.getTimestamp();
|
||
const messageId = packet.getMessageId();
|
||
if(hasGroup(toPublicKey)){
|
||
/**
|
||
* Игнорируем если это сообщение для группы, для них есть отдельный слушатель usePacket ниже
|
||
*/
|
||
return;
|
||
}
|
||
|
||
if (fromPublicKey != publicKey) {
|
||
/**
|
||
* Игнорируем если это не сообщение от нас
|
||
*/
|
||
return;
|
||
}
|
||
|
||
const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey);
|
||
const chachaDecryptedKey = Buffer.from(chachaKey, "binary");
|
||
const key = chachaDecryptedKey.slice(0, 32);
|
||
const nonce = chachaDecryptedKey.slice(32);
|
||
const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex'));
|
||
await updateSyncTime(timestamp);
|
||
let attachmentsMeta: AttachmentMeta[] = [];
|
||
let messageAttachments: Attachment[] = [];
|
||
for (let i = 0; i < packet.getAttachments().length; i++) {
|
||
const attachment = packet.getAttachments()[i];
|
||
|
||
|
||
let nextLength = messageAttachments.push({
|
||
...attachment,
|
||
blob: ""
|
||
});
|
||
|
||
if (attachment.type == AttachmentType.MESSAGES) {
|
||
/**
|
||
* Этот тип вложения приходит сразу в blob и не нуждается
|
||
* в последующем скачивании
|
||
*/
|
||
const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob);
|
||
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
|
||
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
|
||
messageAttachments[nextLength - 1].blob = decryptedBlob;
|
||
}
|
||
|
||
attachmentsMeta.push({
|
||
id: attachment.id,
|
||
type: attachment.type,
|
||
preview: attachment.preview,
|
||
encoding: attachment.encoding,
|
||
transport_server: attachment.transport_server,
|
||
transport_tag: attachment.transport_tag
|
||
});
|
||
}
|
||
|
||
const newMessage: Message = {
|
||
from_public_key: fromPublicKey,
|
||
to_public_key: toPublicKey,
|
||
content: content,
|
||
timestamp: timestamp,
|
||
readed: 1, //сообщение прочитано
|
||
chacha_key: chachaDecryptedKey.toString('utf-8'),
|
||
from_me: 1, //сообщение от нас
|
||
plain_message: (decryptedContent as string),
|
||
delivered: DeliveredMessageState.DELIVERED,
|
||
message_id: messageId,
|
||
attachments: messageAttachments
|
||
};
|
||
|
||
await runQuery(`
|
||
INSERT INTO messages
|
||
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
`, [fromPublicKey,
|
||
toPublicKey,
|
||
content,
|
||
timestamp,
|
||
0, //по умолчанию не прочитаны
|
||
"sync:" + aesChachaKey,
|
||
1, //Свои же сообщения всегда от нас
|
||
await encodeWithPassword(privatePlain, decryptedContent),
|
||
publicKey,
|
||
messageId,
|
||
DeliveredMessageState.DELIVERED,
|
||
JSON.stringify(attachmentsMeta)]);
|
||
|
||
updateDialog(toPublicKey);
|
||
|
||
let dialogCache = getDialogCache(toPublicKey);
|
||
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
|
||
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
|
||
}
|
||
});
|
||
}, [privatePlain, currentDialogPublicKeyView]);
|
||
|
||
/**
|
||
* Обработчик синхронизации прочтения личных сообщений
|
||
*/
|
||
usePacket(0x07, async (packet: PacketRead) => {
|
||
runTaskInQueue(async () => {
|
||
if (hasGroup(packet.getToPublicKey())) {
|
||
/**
|
||
* Если это относится к группам, то игнорируем здесь,
|
||
* для этого есть отдельный слушатель usePacket ниже
|
||
*/
|
||
return;
|
||
}
|
||
const fromPublicKey = packet.getFromPublicKey();
|
||
const toPublicKey = packet.getToPublicKey();
|
||
if (fromPublicKey != publicKey) {
|
||
/**
|
||
* Игнорируем если это не синхронизация нашего прочтения
|
||
*/
|
||
return;
|
||
}
|
||
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`,
|
||
[toPublicKey, fromPublicKey, publicKey]);
|
||
|
||
updateDialog(toPublicKey);
|
||
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
|
||
if (message.from_public_key == toPublicKey && !message.readed) {
|
||
console.info({ fromPublicKey, toPublicKey });
|
||
return {
|
||
...message,
|
||
readed: 1
|
||
}
|
||
}
|
||
return message;
|
||
}));
|
||
});
|
||
}, [updateDialog, publicKey]);
|
||
|
||
/**
|
||
* Обработчик синхронизации прочтения групповых сообщений
|
||
*/
|
||
usePacket(0x07, async (packet: PacketRead) => {
|
||
runTaskInQueue(async () => {
|
||
const fromPublicKey = packet.getFromPublicKey();
|
||
const toPublicKey = packet.getToPublicKey();
|
||
if (!hasGroup(toPublicKey)) {
|
||
/**
|
||
* Если это не относится к группам, то игнорируем здесь,
|
||
* для этого есть отдельный слушатель usePacket выше
|
||
*/
|
||
return;
|
||
}
|
||
if(fromPublicKey != publicKey){
|
||
/**
|
||
* Игнорируем если это наше прочтение
|
||
* которое получается при синхронизации
|
||
*/
|
||
return;
|
||
}
|
||
await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key != ? AND account = ?`,
|
||
[toPublicKey, publicKey, publicKey]);
|
||
await updateSyncTime(Date.now());
|
||
updateDialog(toPublicKey);
|
||
addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => {
|
||
if (!message.readed && message.from_public_key != publicKey) {
|
||
return {
|
||
...message,
|
||
readed: 1
|
||
}
|
||
}
|
||
return message;
|
||
}));
|
||
});
|
||
}, [updateDialog]);
|
||
|
||
/**
|
||
* Обработчик сообщений для синхронизации своих же сообщений в группе
|
||
*/
|
||
usePacket(0x06, async (packet: PacketMessage) => {
|
||
runTaskInQueue(async () => {
|
||
const fromPublicKey = packet.getFromPublicKey();
|
||
const toPublicKey = packet.getToPublicKey();
|
||
const content = packet.getContent();
|
||
const timestamp = packet.getTimestamp();
|
||
const messageId = packet.getMessageId();
|
||
if (!hasGroup(toPublicKey)) {
|
||
/**
|
||
* Если это личное сообщение, то игнорируем его здесь
|
||
* для него есть отдельный слушатель usePacket (снизу)
|
||
*/
|
||
return;
|
||
}
|
||
if (fromPublicKey != publicKey) {
|
||
/**
|
||
* Игнорируем если это сообщения не от нас
|
||
*/
|
||
return;
|
||
}
|
||
await updateSyncTime(timestamp);
|
||
const groupKey = await getGroupKey(toPublicKey);
|
||
if (!groupKey) {
|
||
log("Group key not found for group " + toPublicKey);
|
||
error("Message dropped because group key not found for group " + toPublicKey);
|
||
return;
|
||
}
|
||
info("New group message packet received from " + fromPublicKey);
|
||
|
||
let decryptedContent = '';
|
||
|
||
try {
|
||
decryptedContent = await decodeWithPassword(groupKey, content);
|
||
} catch (e) {
|
||
decryptedContent = '';
|
||
}
|
||
|
||
let attachmentsMeta: AttachmentMeta[] = [];
|
||
let messageAttachments: Attachment[] = [];
|
||
for (let i = 0; i < packet.getAttachments().length; i++) {
|
||
const attachment = packet.getAttachments()[i];
|
||
log("Attachment received id " + attachment.id + " type " + attachment.type);
|
||
|
||
let nextLength = messageAttachments.push({
|
||
...attachment,
|
||
blob: ""
|
||
});
|
||
|
||
if (attachment.type == AttachmentType.MESSAGES) {
|
||
/**
|
||
* Этот тип вложения приходит сразу в blob и не нуждается
|
||
* в последующем скачивании
|
||
*/
|
||
const decryptedBlob = await decodeWithPassword(groupKey, attachment.blob);
|
||
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
|
||
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
|
||
messageAttachments[nextLength - 1].blob = decryptedBlob;
|
||
}
|
||
|
||
attachmentsMeta.push({
|
||
id: attachment.id,
|
||
type: attachment.type,
|
||
preview: attachment.preview,
|
||
encoding: attachment.encoding,
|
||
transport_server: attachment.transport_server,
|
||
transport_tag: attachment.transport_tag
|
||
});
|
||
}
|
||
|
||
const newMessage: Message = {
|
||
from_public_key: fromPublicKey,
|
||
to_public_key: toPublicKey,
|
||
content: content,
|
||
timestamp: timestamp,
|
||
readed: 0,
|
||
chacha_key: groupKey,
|
||
from_me: 1,
|
||
plain_message: decryptedContent,
|
||
delivered: DeliveredMessageState.DELIVERED,
|
||
message_id: messageId,
|
||
attachments: messageAttachments
|
||
};
|
||
|
||
await runQuery(`
|
||
INSERT INTO messages
|
||
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
|
||
`, [fromPublicKey,
|
||
toPublicKey,
|
||
content,
|
||
timestamp,
|
||
0, //по умолчанию не прочитаны
|
||
"",
|
||
1, //Свои же сообщения всегда от нас
|
||
await encodeWithPassword(privatePlain, decryptedContent),
|
||
publicKey,
|
||
messageId,
|
||
DeliveredMessageState.DELIVERED,
|
||
JSON.stringify(attachmentsMeta)]);
|
||
|
||
/**
|
||
* Так как у нас в toPublicKey приходит ID группы,
|
||
* то обновляем диалог по этому ID, а не по fromPublicKey
|
||
* как это сделано в личных сообщениях
|
||
*/
|
||
updateDialog(toPublicKey);
|
||
|
||
let dialogCache = getDialogCache(toPublicKey);
|
||
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
|
||
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
|
||
}
|
||
});
|
||
}, [updateDialog, focused, currentDialogPublicKeyView, viewState, idle, protocolState]);
|
||
} |