Files
desktop/app/providers/DialogProvider/useDialogFiber.ts

595 lines
29 KiB
TypeScript
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import { useContext, useEffect } from "react";
import { Attachment, AttachmentType, PacketMessage } from "../ProtocolProvider/protocol/packets/packet.message";
import { usePacket } from "../ProtocolProvider/usePacket";
import { BlacklistContext } from "../BlacklistProvider/BlacklistProvider";
import { useLogger } from "@/app/hooks/useLogger";
import { useMemory } from "../MemoryProvider/useMemory";
import { useIdle } from "@mantine/hooks";
import { useNotification } from "@/app/hooks/useNotification";
import { useWindowFocus } from "@/app/hooks/useWindowFocus";
import { MESSAGE_MAX_LOADED, TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD } from "@/app/constants";
import { useDialogsCache } from "./useDialogsCache";
import { useDatabase } from "@/app/providers/DatabaseProvider/useDatabase";
import { usePrivatePlain } from "../AccountProvider/usePrivatePlain";
import { usePublicKey } from "../AccountProvider/usePublicKey";
import { chacha20Decrypt, decodeWithPassword, decrypt, encodeWithPassword, generateMd5 } from "@/app/crypto/crypto";
import { DeliveredMessageState, Message } from "./DialogProvider";
import { PacketRead } from "../ProtocolProvider/protocol/packets/packet.read";
import { PacketDelivery } from "../ProtocolProvider/protocol/packets/packet.delivery";
import { useConsoleLogger } from "@/app/hooks/useConsoleLogger";
import { useViewPanelsState, ViewPanelsState } from "@/app/hooks/useViewPanelsState";
import { useFileStorage } from "@/app/hooks/useFileStorage";
import { useDialogsList } from "../DialogListProvider/useDialogsList";
import { useGroups } from "./useGroups";
import { useDialogState } from "../DialogStateProvider.tsx/useDialogState";
import { useUserInformation } from "../InformationProvider/useUserInformation";
import { useMentions } from "../DialogStateProvider.tsx/useMentions";
import { runTaskInQueue } from "./dialogQueue";
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
/**
* При вызове будет запущен "фоновый" обработчик
* входящих пакетов сообщений, который будет обрабатывать их и сохранять
* в базу данных в кэше или в базе данных
*/
export function useDialogFiber() {
const { blocked } = useContext(BlacklistContext);
const { runQuery } = useDatabase();
const privatePlain = usePrivatePlain();
const publicKey = usePublicKey();
const log = useLogger('useDialogFiber');
const [currentDialogPublicKeyView, _] = useMemory("current-dialog-public-key-view", "", true);
const idle = useIdle(TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD * 1000);
const notify = useNotification();
const focused = useWindowFocus();
const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache();
const { info, error } = useConsoleLogger('useDialogFiber');
const [viewState] = useViewPanelsState();
const { writeFile } = useFileStorage();
const { updateDialog } = useDialogsList();
const { hasGroup, getGroupKey, normalize } = useGroups();
const { muted } = useDialogState();
const [userInfo] = useUserInformation(publicKey);
const { pushMention } = useMentions();
const [protocolState] = useProtocolState();
/**
* Обновляет время последней синхронизации для аккаунта
* @param timestamp время
*/
const updateSyncTime = async (timestamp: number) => {
if(protocolState == ProtocolState.SYNCHRONIZATION){
/**
* Если сейчас идет синхронизация то чтобы при синхронизации
* не создавать нагрузку на базу данных
* по постоянному обновлению, обновляем базу один раз - когда
* приходит пакет о том что синхронизация закончилась
*/
return;
}
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
[publicKey, timestamp, timestamp, publicKey]
);
};
/**
* Лог
*/
useEffect(() => {
info("Starting passive fiber for dialog packets");
}, []);
/**
* Нам приходят сообщения от себя самих же при синхронизации
* нужно обрабатывать их особым образом соотвественно
*
* Метод нужен для синхронизации своих сообщений
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
const aesChachaKey = packet.getAesChachaKey();
const content = packet.getContent();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не сообщение от нас
*/
return;
}
const chachaKey = await decodeWithPassword(privatePlain, aesChachaKey);
const chachaDecryptedKey = Buffer.from(chachaKey, "binary");
const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32);
const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex'));
await updateSyncTime(timestamp);
let attachmentsMeta: any[] = [];
let messageAttachments: Attachment[] = [];
for (let i = 0; i < packet.getAttachments().length; i++) {
const attachment = packet.getAttachments()[i];
log("Attachment received id " + attachment.id + " type " + attachment.type);
let nextLength = messageAttachments.push({
...attachment,
blob: ""
});
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
*/
const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob);
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
messageAttachments[nextLength - 1].blob = decryptedBlob;
}
attachmentsMeta.push({
id: attachment.id,
type: attachment.type,
preview: attachment.preview
});
}
const newMessage: Message = {
from_public_key: fromPublicKey,
to_public_key: toPublicKey,
content: content,
timestamp: timestamp,
readed: 1, //сообщение прочитано
chacha_key: chachaDecryptedKey.toString('utf-8'),
from_me: 1, //сообщение от нас
plain_message: (decryptedContent as string),
delivered: DeliveredMessageState.DELIVERED,
message_id: messageId,
attachments: messageAttachments
};
await runQuery(`
INSERT INTO messages
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [fromPublicKey,
toPublicKey,
content,
timestamp,
0, //по умолчанию не прочитаны
"sync:" + aesChachaKey,
1, //Свои же сообщения всегда от нас
await encodeWithPassword(privatePlain, decryptedContent),
publicKey,
messageId,
DeliveredMessageState.DELIVERED,
JSON.stringify(attachmentsMeta)]);
updateDialog(toPublicKey);
let dialogCache = getDialogCache(toPublicKey);
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [privatePlain, currentDialogPublicKeyView]);
/**
* Обработчик сообщений для группы
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
const content = packet.getContent();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if (!hasGroup(toPublicKey)) {
/**
* Если это личное сообщение, то игнорируем его здесь
* для него есть отдельный слушатель usePacket (снизу)
*/
return;
}
if (fromPublicKey == publicKey) {
/**
* Игнорируем свои же сообщения,
* такое получается при пакете синхронизации
*/
return;
}
await updateSyncTime(timestamp);
const groupKey = await getGroupKey(toPublicKey);
if (!groupKey) {
log("Group key not found for group " + toPublicKey);
error("Message dropped because group key not found for group " + toPublicKey);
return;
}
info("New group message packet received from " + fromPublicKey);
let decryptedContent = '';
try {
decryptedContent = await decodeWithPassword(groupKey, content);
} catch (e) {
decryptedContent = '';
}
let attachmentsMeta: any[] = [];
let messageAttachments: Attachment[] = [];
for (let i = 0; i < packet.getAttachments().length; i++) {
const attachment = packet.getAttachments()[i];
log("Attachment received id " + attachment.id + " type " + attachment.type);
let nextLength = messageAttachments.push({
...attachment,
blob: ""
});
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
*/
const decryptedBlob = await decodeWithPassword(groupKey, attachment.blob);
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
messageAttachments[nextLength - 1].blob = decryptedBlob;
}
attachmentsMeta.push({
id: attachment.id,
type: attachment.type,
preview: attachment.preview
});
}
const newMessage: Message = {
from_public_key: fromPublicKey,
to_public_key: toPublicKey,
content: content,
timestamp: timestamp,
readed: idle ? 0 : 1,
chacha_key: groupKey,
from_me: fromPublicKey == publicKey ? 1 : 0,
plain_message: decryptedContent,
delivered: DeliveredMessageState.DELIVERED,
message_id: messageId,
attachments: messageAttachments
};
await runQuery(`
INSERT INTO messages
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [fromPublicKey,
toPublicKey,
content,
timestamp,
/**если текущий открытый диалог == беседе (которая приходит в toPublicKey) */
(currentDialogPublicKeyView == toPublicKey && !idle && viewState != ViewPanelsState.DIALOGS_PANEL_ONLY) ? 1 : 0,
'',
0,
await encodeWithPassword(privatePlain, decryptedContent),
publicKey,
messageId,
DeliveredMessageState.DELIVERED,
JSON.stringify(attachmentsMeta)]);
/**
* Так как у нас в toPublicKey приходит ID группы,
* то обновляем диалог по этому ID, а не по fromPublicKey
* как это сделано в личных сообщениях
*/
updateDialog(toPublicKey);
if (((normalize(currentDialogPublicKeyView) !== normalize(toPublicKey) || viewState == ViewPanelsState.DIALOGS_PANEL_ONLY) &&
(timestamp + TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD) > (Date.now() / 1000)) || !focused) {
/**
* Условие со временем нужно для того,
* чтобы когда приходит пачка сообщений с сервера в момент того как
* пользователь был неактивен, не слать уведомления по всем этим сообщениям
*/
let mentionFlag = false;
if ((newMessage.from_public_key != publicKey) && (decryptedContent.includes(`@${userInfo.username}`) || decryptedContent.includes(`@all`))) {
/**
* Если в сообщении есть упоминание текущего пользователя или @all,
* при этом сообщение отправляли не мы,
* то добавляем упоминание в состояние диалога.
*
* TODO: сделать чтобы all работал только для админов группы
*/
mentionFlag = true;
}
if ((!muted.includes(toPublicKey) || mentionFlag) && protocolState != ProtocolState.SYNCHRONIZATION) {
/**
* Если группа не в мутие или есть упоминание - отправляем уведомление
*/
notify("New message", "You have a new message");
}
if (mentionFlag) {
/**
* Если в сообщении есть упоминание текущего пользователя или @all,
* то добавляем упоминание в состояние диалога
*
* TODO: сделать чтобы all работал только для админов группы
*/
pushMention({
dialog_id: toPublicKey,
message_id: messageId
});
}
}
let dialogCache = getDialogCache(toPublicKey);
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [blocked, muted, updateDialog, focused, currentDialogPublicKeyView, viewState, idle]);
/**
* Обработчик личных сообщений
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
if (fromPublicKey == publicKey) {
/**
* Игнорируем свои же сообщения,
* такое получается при пакете синхронизации
*/
return;
}
const toPublicKey = packet.getToPublicKey();
const content = packet.getContent();
const chachaKey = packet.getChachaKey();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if (hasGroup(toPublicKey)) {
/**
* Если это групповое сообщение, то игнорируем его здесь
* для него есть отдельный слушатель usePacket
*/
return;
}
info("New message packet received from " + fromPublicKey);
if (blocked.includes(fromPublicKey)) {
/**
* Если пользователь заблокирован и это не групповое сообщение,
* то игнорируем сообщение
*/
log("Message from blocked user, ignore " + fromPublicKey);
return;
}
if (privatePlain == "") {
return;
}
await updateSyncTime(timestamp);
const chachaDecryptedKey = Buffer.from(await decrypt(chachaKey, privatePlain), "binary");
const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32);
const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex'));
let attachmentsMeta: any[] = [];
let messageAttachments: Attachment[] = [];
for (let i = 0; i < packet.getAttachments().length; i++) {
const attachment = packet.getAttachments()[i];
log("Attachment received id " + attachment.id + " type " + attachment.type);
let nextLength = messageAttachments.push({
...attachment,
blob: ""
});
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
*/
const decryptedBlob = await decodeWithPassword(chachaDecryptedKey.toString('utf-8'), attachment.blob);
writeFile(`m/${await generateMd5(attachment.id + publicKey)}`,
Buffer.from(await encodeWithPassword(privatePlain, decryptedBlob)).toString('binary'));
messageAttachments[nextLength - 1].blob = decryptedBlob;
}
attachmentsMeta.push({
id: attachment.id,
type: attachment.type,
preview: attachment.preview
});
}
const newMessage: Message = {
from_public_key: fromPublicKey,
to_public_key: toPublicKey,
content: content,
timestamp: timestamp,
readed: idle ? 0 : 1,
chacha_key: chachaDecryptedKey.toString('utf-8'),
from_me: fromPublicKey == publicKey ? 1 : 0,
plain_message: (decryptedContent as string),
delivered: DeliveredMessageState.DELIVERED,
message_id: messageId,
attachments: messageAttachments
};
await runQuery(`
INSERT INTO messages
(from_public_key, to_public_key, content, timestamp, read, chacha_key, from_me, plain_message, account, message_id, delivered, attachments) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
`, [fromPublicKey,
toPublicKey,
content,
timestamp,
(currentDialogPublicKeyView == fromPublicKey && !idle && viewState != ViewPanelsState.DIALOGS_PANEL_ONLY) ? 1 : 0,
chachaKey,
0,
await encodeWithPassword(privatePlain, decryptedContent),
publicKey,
messageId,
DeliveredMessageState.DELIVERED,
JSON.stringify(attachmentsMeta)]);
log("New message received from " + fromPublicKey);
updateDialog(fromPublicKey);
if (((currentDialogPublicKeyView !== fromPublicKey || viewState == ViewPanelsState.DIALOGS_PANEL_ONLY) &&
(timestamp + TIME_TO_INACTIVE_FOR_MESSAGES_UNREAD) > (Date.now() / 1000)) || !focused) {
/**
* Условие со временем нужно для того,
* чтобы когда приходит пачка сообщений с сервера в момент того как
* пользователь был неактивен, не слать уведомления по всем этим сообщениям
*/
if (!muted.includes(fromPublicKey) || protocolState == ProtocolState.SYNCHRONIZATION) {
/**
* Если пользователь в муте или сейчас идет синхронизация - не отправляем уведомление
*/
notify("New message", "You have a new message");
}
}
let dialogCache = getDialogCache(fromPublicKey);
if (currentDialogPublicKeyView !== fromPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(fromPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [blocked, muted, updateDialog, focused, currentDialogPublicKeyView, viewState, idle]);
/**
* Обработчик синхронизации прочтения личных сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
runTaskInQueue(async () => {
if (hasGroup(packet.getToPublicKey())) {
/**
* Если это относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket ниже
*/
return;
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не синхронизация нашего прочтения
*/
return;
}
console.info("PACKED_READ_SYNC");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`,
[toPublicKey, fromPublicKey, publicKey]);
console.info("updating with params ", [fromPublicKey, toPublicKey, publicKey]);
updateDialog(toPublicKey);
log("Read sync packet from other device");
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
if (message.from_public_key == toPublicKey && !message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
}
}
return message;
}));
});
}, [updateDialog, publicKey]);
/**
* Обработчик прочтения личных сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
runTaskInQueue(async () => {
if (hasGroup(packet.getToPublicKey())) {
/**
* Если это относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket ниже
*/
return;
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
if (fromPublicKey == publicKey) {
/**
* Игнорируем если это наше прочтение
* которое получается при синхронизации
*/
return;
}
await updateSyncTime(Date.now());
console.info("PACKED_READ_IM");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]);
console.info("read im with params ", [fromPublicKey, toPublicKey, publicKey]);
updateDialog(fromPublicKey);
log("Read packet received from " + fromPublicKey + " for " + toPublicKey);
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
if (message.from_public_key == toPublicKey && !message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
}
}
return message;
}));
});
}, [updateDialog, publicKey]);
/**
* Обработчик прочтения групповых сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
runTaskInQueue(async () => {
if (!hasGroup(packet.getToPublicKey())) {
/**
* Если это не относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket выше
*/
return;
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key = ? AND account = ?`, [toPublicKey, publicKey, publicKey]);
await updateSyncTime(Date.now());
updateDialog(toPublicKey);
addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => {
if (!message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
}
}
return message;
}));
});
}, [updateDialog]);
/**
* Обработчик доставки сообщений
*/
usePacket(0x08, async (packet: PacketDelivery) => {
runTaskInQueue(async () => {
const messageId = packet.getMessageId();
await runQuery(`UPDATE messages SET delivered = ?, timestamp = ? WHERE message_id = ? AND account = ?`, [DeliveredMessageState.DELIVERED, Date.now(), messageId, publicKey]);
updateDialog(packet.getToPublicKey());
await updateSyncTime(Date.now());
log("Delivery packet received msg id " + messageId);
addOrUpdateDialogCache(packet.getToPublicKey(), getDialogCache(packet.getToPublicKey()).map((message) => {
if (message.message_id == messageId) {
return {
...message,
delivered: DeliveredMessageState.DELIVERED,
timestamp: Date.now()
}
}
return message;
}));
});
}, [updateDialog]);
}