Синхронизация, фикс ошибок

This commit is contained in:
RoyceDa
2026-02-15 18:15:03 +02:00
parent 8b906169ce
commit 4a505ab974
6 changed files with 73 additions and 17 deletions

View File

@@ -17,6 +17,7 @@ export function DatabaseProvider(props: DatabaseProviderProps) {
(async () => { (async () => {
await createAllTables(); await createAllTables();
setInitialized(true); setInitialized(true);
//await runQuery("DROP TABLE IF EXISTS accounts_sync_times");
})(); })();
}, []); }, []);

View File

@@ -73,5 +73,15 @@ export const TABLES = [
last_timestamp INTEGER NOT NULL, last_timestamp INTEGER NOT NULL,
is_request INTEGER NOT NULL DEFAULT 0, is_request INTEGER NOT NULL DEFAULT 0,
UNIQUE (id) UNIQUE (id)
)`,
/**
* Таблица для хранения времени последней синхронизации сообщений
* last_sync время отключения клиента от сервера
*/
`CREATE TABLE IF NOT EXISTS accounts_sync_times (
id INTEGER PRIMARY KEY,
account TEXT NOT NULL,
last_sync INTEGER NOT NULL,
UNIQUE (account)
)` )`
] ]

View File

@@ -25,6 +25,8 @@ import { useDialogState } from "../DialogStateProvider.tsx/useDialogState";
import { useUserInformation } from "../InformationProvider/useUserInformation"; import { useUserInformation } from "../InformationProvider/useUserInformation";
import { useMentions } from "../DialogStateProvider.tsx/useMentions"; import { useMentions } from "../DialogStateProvider.tsx/useMentions";
import { runTaskInQueue } from "./dialogQueue"; import { runTaskInQueue } from "./dialogQueue";
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
/** /**
* При вызове будет запущен "фоновый" обработчик * При вызове будет запущен "фоновый" обработчик
@@ -50,6 +52,30 @@ export function useDialogFiber() {
const { muted } = useDialogState(); const { muted } = useDialogState();
const [userInfo] = useUserInformation(publicKey); const [userInfo] = useUserInformation(publicKey);
const { pushMention } = useMentions(); const { pushMention } = useMentions();
const [protocolState] = useProtocolState();
/**
* Обновляет время последней синхронизации для аккаунта
* @param timestamp время
*/
const updateSyncTime = async (timestamp: number) => {
if(protocolState == ProtocolState.SYNCHRONIZATION){
/**
* Если сейчас идет синхронизация то чтобы при синхронизации
* не создавать нагрузку на базу данных
* по постоянному обновлению, обновляем базу один раз - когда
* приходит пакет о том что синхронизация закончилась
*/
return;
}
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = CASE " +
"WHEN excluded.last_sync > last_sync THEN excluded.last_sync " +
"ELSE last_sync END",
[publicKey, timestamp]
);
};
/** /**
* Лог * Лог
@@ -177,6 +203,7 @@ export function useDialogFiber() {
*/ */
return; return;
} }
await updateSyncTime(timestamp);
const groupKey = await getGroupKey(toPublicKey); const groupKey = await getGroupKey(toPublicKey);
if (!groupKey) { if (!groupKey) {
log("Group key not found for group " + toPublicKey); log("Group key not found for group " + toPublicKey);
@@ -343,6 +370,8 @@ export function useDialogFiber() {
return; return;
} }
await updateSyncTime(timestamp);
const chachaDecryptedKey = Buffer.from(await decrypt(chachaKey, privatePlain), "binary"); const chachaDecryptedKey = Buffer.from(await decrypt(chachaKey, privatePlain), "binary");
const key = chachaDecryptedKey.slice(0, 32); const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32); const nonce = chachaDecryptedKey.slice(32);
@@ -494,6 +523,7 @@ export function useDialogFiber() {
*/ */
return; return;
} }
await updateSyncTime(Date.now());
console.info("PACKED_READ_IM"); console.info("PACKED_READ_IM");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]); await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]);
console.info("read im with params ", [fromPublicKey, toPublicKey, publicKey]); console.info("read im with params ", [fromPublicKey, toPublicKey, publicKey]);
@@ -527,6 +557,7 @@ export function useDialogFiber() {
const fromPublicKey = packet.getFromPublicKey(); const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey(); const toPublicKey = packet.getToPublicKey();
await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key = ? AND account = ?`, [toPublicKey, publicKey, publicKey]); await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key = ? AND account = ?`, [toPublicKey, publicKey, publicKey]);
await updateSyncTime(Date.now());
updateDialog(toPublicKey); updateDialog(toPublicKey);
addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => { addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => {
if (!message.readed) { if (!message.readed) {

View File

@@ -7,31 +7,33 @@ import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/pac
import { useSender } from "../ProtocolProvider/useSender"; import { useSender } from "../ProtocolProvider/useSender";
import { usePacket } from "../ProtocolProvider/usePacket"; import { usePacket } from "../ProtocolProvider/usePacket";
import { whenFinish } from "./dialogQueue"; import { whenFinish } from "./dialogQueue";
import { useProtocol } from "../ProtocolProvider/useProtocol";
/** /**
* Хук отвечает за синхронизацию сообщений, запрос синхронизации * Хук отвечает за синхронизацию сообщений, запрос синхронизации
* при подключении * при подключении
*/ */
export function useSynchronize() { export function useSynchronize() {
const [protocolState, setProtocolState] = useProtocolState(); const [_, setProtocolState] = useProtocolState();
const {getQuery} = useDatabase(); const {getQuery, runQuery} = useDatabase();
const publicKey = usePublicKey(); const publicKey = usePublicKey();
const send = useSender(); const send = useSender();
const {protocol} = useProtocol();
useEffect(() => { useEffect(() => {
if(protocolState == ProtocolState.CONNECTED){ const handshake_complete = () => {
trySync(); trySync();
setProtocolState(ProtocolState.SYNCHRONIZATION);
} }
}, [protocolState]); protocol.on('handshake_complete', handshake_complete);
return () => {
protocol.off('handshake_complete', handshake_complete);
}
}, [protocol]);
const trySync = async () => { const trySync = async () => {
const lastMessage = await getQuery("SELECT timestamp FROM messages WHERE account = ? ORDER BY timestamp DESC LIMIT 1", [publicKey]); const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]);
if(!lastMessage){ sendSynchronize(lastSyncTime?.last_sync ?? 0);
sendSynchronize(0);
return;
}
sendSynchronize(lastMessage.timestamp);
} }
const sendSynchronize = (timestamp: number) => { const sendSynchronize = (timestamp: number) => {
@@ -43,9 +45,20 @@ export function useSynchronize() {
usePacket(25, async (packet: PacketSync) => { usePacket(25, async (packet: PacketSync) => {
const status = packet.getStatus(); const status = packet.getStatus();
if(status == SyncStatus.BATCH_START){
setProtocolState(ProtocolState.SYNCHRONIZATION);
}
if(status == SyncStatus.BATCH_END){ if(status == SyncStatus.BATCH_END){
console.info("Batch start");
await whenFinish(); await whenFinish();
trySync(); console.info("Batch finished");
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = ?",
[publicKey, packet.getTimestamp(), packet.getTimestamp()]
);
console.info("Batch complete", publicKey, packet.getTimestamp());
trySync();
} }
if(status == SyncStatus.NOT_NEEDED){ if(status == SyncStatus.NOT_NEEDED){
/** /**
@@ -53,5 +66,5 @@ export function useSynchronize() {
*/ */
setProtocolState(ProtocolState.CONNECTED); setProtocolState(ProtocolState.CONNECTED);
} }
}); }, [publicKey]);
} }

View File

@@ -16,7 +16,8 @@ export function runQuery(query: string, params: any[] = []) : Promise<void> {
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
db.run(query, params, function (err) { db.run(query, params, function (err) {
if (err) { if (err) {
reject(); console.info("Query error: ", err, query, params);
reject(err);
} else { } else {
resolve(); resolve();
} }

View File

@@ -2,13 +2,13 @@ import { ipcMain } from 'electron';
import { runQuery, getQuery, allQuery } from '../database'; import { runQuery, getQuery, allQuery } from '../database';
ipcMain.handle('db:run', async (_, query: string, params: any[]) => { ipcMain.handle('db:run', async (_, query: string, params: any[]) => {
return await runQuery(query, params); await runQuery(query, params);
}); });
ipcMain.handle('db:get', async (_, query: string, params: any[]) => { ipcMain.handle('db:get', async (_, query: string, params: any[]) => {
return await getQuery(query, params); return await getQuery(query, params);
}); });
ipcMain.handle('db:all', async (_, query: string, params: any[]) => { ipcMain.handle('db:all', async (_, query: string, params: any[]) => {
return await allQuery(query, params); return await allQuery(query, params);
}); });