114 lines
5.2 KiB
TypeScript
114 lines
5.2 KiB
TypeScript
import { useEffect } from "react";
|
||
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
|
||
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
|
||
import { useDatabase } from "../DatabaseProvider/useDatabase";
|
||
import { usePublicKey } from "../AccountProvider/usePublicKey";
|
||
import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync";
|
||
import { useSender } from "../ProtocolProvider/useSender";
|
||
import { usePacket } from "../ProtocolProvider/usePacket";
|
||
import { whenFinish } from "./dialogQueue";
|
||
import { useProtocol } from "../ProtocolProvider/useProtocol";
|
||
import { PacketGroupJoin } from "../ProtocolProvider/protocol/packets/packet.group.join";
|
||
import { useGroups } from "./useGroups";
|
||
import { decodeWithPassword, encodeWithPassword } from "@/app/workers/crypto/crypto";
|
||
import { usePrivatePlain } from "../AccountProvider/usePrivatePlain";
|
||
import { GroupStatus } from "../ProtocolProvider/protocol/packets/packet.group.invite.info";
|
||
import { useConsoleLogger } from "@/app/hooks/useConsoleLogger";
|
||
import { useNavigate } from "react-router-dom";
|
||
import { useDialogsList } from "../DialogListProvider/useDialogsList";
|
||
import { useUpdateGroupInformation } from "../InformationProvider/useUpdateGroupInformation";
|
||
import { useGroupInviteStatus } from "./useGroupInviteStatus";
|
||
|
||
/**
|
||
* Хук отвечает за синхронизацию сообщений, запрос синхронизации
|
||
* при подключении
|
||
*/
|
||
export function useSynchronize() {
|
||
const [_, setProtocolState] = useProtocolState();
|
||
const {getQuery, runQuery} = useDatabase();
|
||
const publicKey = usePublicKey();
|
||
const send = useSender();
|
||
const {protocol} = useProtocol();
|
||
const {parseGroupString} = useGroups();
|
||
const privatePlain = usePrivatePlain();
|
||
const {error, info} = useConsoleLogger('useSynchronize');
|
||
const {setInviteStatusByGroupId} = useGroupInviteStatus('');
|
||
const updateGroupInformation = useUpdateGroupInformation();
|
||
const {updateDialog} = useDialogsList();
|
||
|
||
|
||
useEffect(() => {
|
||
if(protocol.handshakeExchangeComplete){
|
||
trySync();
|
||
}
|
||
}, [protocol.handshakeExchangeComplete]);
|
||
|
||
const trySync = async () => {
|
||
const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]);
|
||
sendSynchronize(lastSyncTime?.last_sync ?? 0);
|
||
}
|
||
|
||
const sendSynchronize = (timestamp: number) => {
|
||
const packet = new PacketSync();
|
||
packet.setStatus(0);
|
||
packet.setTimestamp(timestamp);
|
||
send(packet);
|
||
}
|
||
|
||
/**
|
||
* Пакет приходит либо при входе в группу (но там используется слушатель once), либо при
|
||
* синхронизации. В данном случае этот пакет прийдет только при синхронизации
|
||
*/
|
||
usePacket(20, async (packet: PacketGroupJoin) => {
|
||
const decryptedGroupString = await decodeWithPassword(privatePlain, packet.getGroupString());
|
||
const parsed = await parseGroupString(decryptedGroupString);
|
||
if(!parsed){
|
||
error("Received invalid group string, skipping");
|
||
return;
|
||
}
|
||
const groupStatus = packet.getGroupStatus();
|
||
if(groupStatus != GroupStatus.JOINED){
|
||
error("Cannot sync group that is not joined, skipping");
|
||
return;
|
||
}
|
||
const secureKey = await encodeWithPassword(privatePlain, parsed.encryptKey);
|
||
await runQuery(`
|
||
INSERT INTO groups (account, group_id, title, description, key) VALUES (?, ?, ?, ?, ?)
|
||
`, [publicKey, parsed.groupId, parsed.title, parsed.description, secureKey]);
|
||
updateDialog("#group:" + parsed.groupId);
|
||
setInviteStatusByGroupId(parsed.groupId, GroupStatus.JOINED);
|
||
updateGroupInformation({
|
||
groupId: parsed.groupId,
|
||
title: parsed.title,
|
||
description: parsed.description
|
||
});
|
||
info("Group synchronized " + parsed.groupId);
|
||
}, [publicKey]);
|
||
|
||
usePacket(25, async (packet: PacketSync) => {
|
||
const status = packet.getStatus();
|
||
if(status == SyncStatus.BATCH_START){
|
||
setProtocolState(ProtocolState.SYNCHRONIZATION);
|
||
}
|
||
if(status == SyncStatus.BATCH_END){
|
||
/**
|
||
* Этот Promise ждет пока все сообщения синхронизируются и обработаются, только
|
||
* после этого
|
||
*/
|
||
await whenFinish();
|
||
await runQuery(
|
||
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
|
||
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
|
||
[publicKey, packet.getTimestamp(), packet.getTimestamp(), publicKey]
|
||
);
|
||
console.info("Batch complete", publicKey, packet.getTimestamp());
|
||
trySync();
|
||
}
|
||
if(status == SyncStatus.NOT_NEEDED){
|
||
/**
|
||
* Синхронизация не нужна, все данные актуальны
|
||
*/
|
||
setProtocolState(ProtocolState.CONNECTED);
|
||
}
|
||
}, [publicKey]);
|
||
} |