Compare commits

..

10 Commits

Author SHA1 Message Date
RoyceDa
29af00403c Дедубликаия сообщений в диалогах 2026-02-16 23:29:39 +02:00
RoyceDa
441c29fe6b Поднятие до 1.0.0 2026-02-16 20:26:54 +02:00
RoyceDa
2045bf284e Фикс синхронизации после подтверждения девайса 2026-02-16 20:16:12 +02:00
RoyceDa
432b270b34 Фикс бага с уникальными индексами сообщений 2026-02-16 19:28:36 +02:00
RoyceDa
d02cb63ffe gitignore 2026-02-16 16:05:18 +02:00
RoyceDa
e551ef51c7 Поднятие версии 2026-02-16 15:05:18 +02:00
RoyceDa
8c627f1c48 Дедубликация сообщений 2026-02-16 14:53:18 +02:00
RoyceDa
6df79ee317 Фикс репликации сообщений 2026-02-16 14:13:02 +02:00
RoyceDa
1efac74e27 Финальная версия синхронизации 2026-02-16 13:25:38 +02:00
RoyceDa
4a505ab974 Синхронизация, фикс ошибок 2026-02-15 18:15:03 +02:00
19 changed files with 121 additions and 28 deletions

2
.gitignore vendored
View File

@@ -4,6 +4,7 @@ out
packs
package-lock.json
LICENSE
CREDS
*.code-workspace
.DS_Store
.vscode
@@ -13,3 +14,4 @@ LICENSE
.env.test.local
.env.production.local
app/servers.ts
packs

View File

@@ -61,5 +61,6 @@ export const ALLOWED_DOMAINS_ZONES = [
'chat',
'gg',
'fm',
'tv'
'tv',
'im'
];

View File

@@ -17,6 +17,17 @@ export function DatabaseProvider(props: DatabaseProviderProps) {
(async () => {
await createAllTables();
setInitialized(true);
//await runQuery("DROP TABLE IF EXISTS accounts_sync_times");
/**
* Удаляем старый индекс только по message_id
*/
await runQuery("DROP INDEX IF EXISTS idx_messages_message_id");
/**
* Добавляем уникальный индекс на столбцы message_id и public_key
* в таблице messages чтобы избежать дубликации сообщений
*/
await runQuery("CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_message_id_public_key ON messages(message_id, account)");
})();
}, []);

View File

@@ -43,6 +43,11 @@ export const TABLES = [
UNIQUE (id)
)`,
/**
* Создаем индекс на столбцы message_id и public_key чтобы избежать дубликации сообщений при синхронизации
*/
`CREATE UNIQUE INDEX IF NOT EXISTS idx_messages_message_id_public_key ON messages(message_id, account)`,
`CREATE TABLE IF NOT EXISTS cached_users (
public_key TEXT PRIMARY KEY,
title TEXT NOT NULL,
@@ -73,5 +78,15 @@ export const TABLES = [
last_timestamp INTEGER NOT NULL,
is_request INTEGER NOT NULL DEFAULT 0,
UNIQUE (id)
)`,
/**
* Таблица для хранения времени последней синхронизации сообщений
* last_sync время отключения клиента от сервера
*/
`CREATE TABLE IF NOT EXISTS accounts_sync_times (
id INTEGER PRIMARY KEY,
account TEXT NOT NULL,
last_sync INTEGER NOT NULL,
UNIQUE (account)
)`
]

View File

@@ -917,10 +917,28 @@ export function DialogProvider(props: DialogProviderProps) {
}
}
/**
* Дедубликация сообщений по message_id, так как может возникать ситуация, что одно и то же сообщение
* может загрузиться несколько раз при накладках сети, отставании часов, при синхронизации
* @param messages массив сообщений
* @returns массив уникальных сообщений
*/
const deduplicate = (messages: Message[]) => {
const map = new Map<string, Message>();
for(let i = 0; i < messages.length; i++){
const message = messages[i];
if(map.has(message.message_id)){
continue;
}
map.set(message.message_id, message);
}
return Array.from(map.values());
}
return (
<DialogContext.Provider value={{
loading,
messages,
messages: deduplicate(messages),
setMessages,
clearDialogCache: () => {
setDialogsCache(dialogsCache.filter((cache) => cache.publicKey != props.dialog));

View File

@@ -1,5 +1,12 @@
let tail: Promise<void> = Promise.resolve();
/**
* Ставит функцию в очередь на выполнение.
* Все функции выполняются последовательно, одна за другой.
* Если функция выбрасывает ошибку, она логируется,
* а выполнение очереди продолжается
* @param fn функция
*/
export const runTaskInQueue = (fn: () => Promise<void>) => {
tail = tail.then(fn).catch((e) => {
console.error("Dialog queue error", e);

View File

@@ -25,6 +25,8 @@ import { useDialogState } from "../DialogStateProvider.tsx/useDialogState";
import { useUserInformation } from "../InformationProvider/useUserInformation";
import { useMentions } from "../DialogStateProvider.tsx/useMentions";
import { runTaskInQueue } from "./dialogQueue";
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
/**
* При вызове будет запущен "фоновый" обработчик
@@ -50,6 +52,28 @@ export function useDialogFiber() {
const { muted } = useDialogState();
const [userInfo] = useUserInformation(publicKey);
const { pushMention } = useMentions();
const [protocolState] = useProtocolState();
/**
* Обновляет время последней синхронизации для аккаунта
* @param timestamp время
*/
const updateSyncTime = async (timestamp: number) => {
if(protocolState == ProtocolState.SYNCHRONIZATION){
/**
* Если сейчас идет синхронизация то чтобы при синхронизации
* не создавать нагрузку на базу данных
* по постоянному обновлению, обновляем базу один раз - когда
* приходит пакет о том что синхронизация закончилась
*/
return;
}
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
[publicKey, timestamp, timestamp, publicKey]
);
};
/**
* Лог
@@ -84,7 +108,7 @@ export function useDialogFiber() {
const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32);
const decryptedContent = await chacha20Decrypt(content, nonce.toString('hex'), key.toString('hex'));
await updateSyncTime(timestamp);
let attachmentsMeta: any[] = [];
let messageAttachments: Attachment[] = [];
for (let i = 0; i < packet.getAttachments().length; i++) {
@@ -177,6 +201,7 @@ export function useDialogFiber() {
*/
return;
}
await updateSyncTime(timestamp);
const groupKey = await getGroupKey(toPublicKey);
if (!groupKey) {
log("Group key not found for group " + toPublicKey);
@@ -343,6 +368,8 @@ export function useDialogFiber() {
return;
}
await updateSyncTime(timestamp);
const chachaDecryptedKey = Buffer.from(await decrypt(chachaKey, privatePlain), "binary");
const key = chachaDecryptedKey.slice(0, 32);
const nonce = chachaDecryptedKey.slice(32);
@@ -494,6 +521,7 @@ export function useDialogFiber() {
*/
return;
}
await updateSyncTime(Date.now());
console.info("PACKED_READ_IM");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]);
console.info("read im with params ", [fromPublicKey, toPublicKey, publicKey]);
@@ -527,6 +555,7 @@ export function useDialogFiber() {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
await runQuery(`UPDATE messages SET read = 1 WHERE to_public_key = ? AND from_public_key = ? AND account = ?`, [toPublicKey, publicKey, publicKey]);
await updateSyncTime(Date.now());
updateDialog(toPublicKey);
addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => {
if (!message.readed) {
@@ -549,6 +578,7 @@ export function useDialogFiber() {
const messageId = packet.getMessageId();
await runQuery(`UPDATE messages SET delivered = ?, timestamp = ? WHERE message_id = ? AND account = ?`, [DeliveredMessageState.DELIVERED, Date.now(), messageId, publicKey]);
updateDialog(packet.getToPublicKey());
await updateSyncTime(Date.now());
log("Delivery packet received msg id " + messageId);
addOrUpdateDialogCache(packet.getToPublicKey(), getDialogCache(packet.getToPublicKey()).map((message) => {
if (message.message_id == messageId) {

View File

@@ -7,31 +7,28 @@ import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/pac
import { useSender } from "../ProtocolProvider/useSender";
import { usePacket } from "../ProtocolProvider/usePacket";
import { whenFinish } from "./dialogQueue";
import { useProtocol } from "../ProtocolProvider/useProtocol";
/**
* Хук отвечает за синхронизацию сообщений, запрос синхронизации
* при подключении
*/
export function useSynchronize() {
const [protocolState, setProtocolState] = useProtocolState();
const {getQuery} = useDatabase();
const [_, setProtocolState] = useProtocolState();
const {getQuery, runQuery} = useDatabase();
const publicKey = usePublicKey();
const send = useSender();
const {protocol} = useProtocol();
useEffect(() => {
if(protocolState == ProtocolState.CONNECTED){
if(protocol.handshakeExchangeComplete){
trySync();
setProtocolState(ProtocolState.SYNCHRONIZATION);
}
}, [protocolState]);
}, [protocol.handshakeExchangeComplete]);
const trySync = async () => {
const lastMessage = await getQuery("SELECT timestamp FROM messages WHERE account = ? ORDER BY timestamp DESC LIMIT 1", [publicKey]);
if(!lastMessage){
sendSynchronize(0);
return;
}
sendSynchronize(lastMessage.timestamp);
const lastSyncTime = await getQuery(`SELECT last_sync FROM accounts_sync_times WHERE account = ?`, [publicKey]);
sendSynchronize(lastSyncTime?.last_sync ?? 0);
}
const sendSynchronize = (timestamp: number) => {
@@ -43,8 +40,19 @@ export function useSynchronize() {
usePacket(25, async (packet: PacketSync) => {
const status = packet.getStatus();
if(status == SyncStatus.BATCH_START){
setProtocolState(ProtocolState.SYNCHRONIZATION);
}
if(status == SyncStatus.BATCH_END){
console.info("Batch start");
await whenFinish();
console.info("Batch finished");
await runQuery(
"INSERT INTO accounts_sync_times (account, last_sync) VALUES (?, ?) " +
"ON CONFLICT(account) DO UPDATE SET last_sync = ? WHERE account = ?",
[publicKey, packet.getTimestamp(), packet.getTimestamp(), publicKey]
);
console.info("Batch complete", publicKey, packet.getTimestamp());
trySync();
}
if(status == SyncStatus.NOT_NEEDED){
@@ -53,5 +61,5 @@ export function useSynchronize() {
*/
setProtocolState(ProtocolState.CONNECTED);
}
});
}, [publicKey]);
}

View File

@@ -34,7 +34,7 @@ export default class Protocol extends EventEmitter {
private _supportedPackets: Map<number, Packet> = new Map();
private _packetWaiters: Map<number, ((packet: any) => void)[]> = new Map();
private _packetQueue: Packet[] = []; // Очередь для пакетов
private handshakeExchangeComplete : boolean = false;
public handshakeExchangeComplete : boolean = false;
private heartbeatIntervalTimer : NodeJS.Timeout | null = null;
constructor(serverAddress: string) {

View File

@@ -1,8 +1,8 @@
export const APP_VERSION = "0.6.0";
export const CORE_MIN_REQUIRED_VERSION = "1.4.6";
export const APP_VERSION = "1.0.0";
export const CORE_MIN_REQUIRED_VERSION = "1.4.8";
export const RELEASE_NOTICE = `
**Update v0.6.1** :emoji_1f631:
- Fix login select account
**Update v1.0.0** :emoji_1f631:
- Full sync support with message history and attachments
- Fix sync issues for device confirming
`;

View File

@@ -16,7 +16,8 @@ export function runQuery(query: string, params: any[] = []) : Promise<void> {
return new Promise((resolve, reject) => {
db.run(query, params, function (err) {
if (err) {
reject();
console.info("Query error: ", err, query, params);
reject(err);
} else {
resolve();
}

View File

@@ -2,13 +2,13 @@ import { ipcMain } from 'electron';
import { runQuery, getQuery, allQuery } from '../database';
ipcMain.handle('db:run', async (_, query: string, params: any[]) => {
return await runQuery(query, params);
await runQuery(query, params);
});
ipcMain.handle('db:get', async (_, query: string, params: any[]) => {
return await getQuery(query, params);
return await getQuery(query, params);
});
ipcMain.handle('db:all', async (_, query: string, params: any[]) => {
return await allQuery(query, params);
return await allQuery(query, params);
});

View File

@@ -1,6 +1,6 @@
{
"name": "Rosetta",
"version": "1.4.7",
"version": "1.4.8",
"description": "Rosetta Messenger",
"main": "./out/main/main.js",
"license": "MIT",

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.