Промежуточный этап синхронизации

This commit is contained in:
RoyceDa
2026-02-15 14:56:44 +02:00
parent 40ff99e66d
commit 8b906169ce
15 changed files with 609 additions and 427 deletions

View File

@@ -31,7 +31,7 @@ export function ChatHeader() {
const theme = useMantineTheme();
const [blocked, blockUser, unblockUser] = useBlacklist(dialog);
const [opponent, ___, forceUpdateUserInformation] = useUserInformation(dialog);
const protocolState = useProtocolState();
const [protocolState] = useProtocolState();
const [userTypeing, setUserTypeing] = useState(false);
const timeoutRef = useRef<NodeJS.Timeout>(undefined);
const avatars = useAvatars(dialog);

View File

@@ -0,0 +1,39 @@
import { useRosettaColors } from "@/app/hooks/useRosettaColors";
import { ProtocolState } from "@/app/providers/ProtocolProvider/ProtocolProvider";
import { useProtocolState } from "@/app/providers/ProtocolProvider/useProtocolState";
import { Flex, Loader, Text } from "@mantine/core";
export function DialogHeaderText() {
const [protocolState] = useProtocolState();
const colors = useRosettaColors();
const headerType = () => {
switch(protocolState){
case ProtocolState.SYNCHRONIZATION:
return (<>
<Loader size={12} color={colors.chevrons.active}></Loader>
<Text fw={500} style={{
userSelect: 'none'
}} size={'sm'}>Updating...</Text>
</>);
case ProtocolState.CONNECTED:
return (<>
<Text fw={500} style={{
userSelect: 'none'
}} size={'sm'}>Chats</Text>
</>);
default:
return (<>
<Text fw={500} style={{
userSelect: 'none'
}} size={'sm'}>Chats</Text>
</>);
}
}
return (
<Flex direction={'row'} align={'center'} gap={'xs'}>
{headerType()}
</Flex>
)
}

View File

@@ -6,6 +6,7 @@ import { useLogout } from "@/app/providers/AccountProvider/useLogout";
import { useHotkeys } from "@mantine/hooks";
import { useNavigate } from "react-router-dom";
import { usePublicKey } from "@/app/providers/AccountProvider/usePublicKey";
import { DialogHeaderText } from "../DialogHeaderText/DialogHeaderText";
export function DialogsPanelHeader() {
const colors = useRosettaColors();
@@ -66,9 +67,7 @@ export function DialogsPanelHeader() {
</Menu.Item>
</Menu.Dropdown>
</Menu>
<Text fw={500} style={{
userSelect: 'none'
}} size={'sm'}>Chats</Text>
<DialogHeaderText></DialogHeaderText>
<Menu withArrow width={150} shadow="md">
<Menu.Target>
<IconEdit style={{

View File

@@ -25,7 +25,7 @@ export function GroupHeader() {
const {deleteMessages, dialog} = useDialog();
const theme = useMantineTheme();
const {groupInfo} = useGroupInformation(dialog);
const protocolState = useProtocolState();
const [protocolState] = useProtocolState();
const [usersTypeing, setUsersTypeing] = useState<{
timeout: NodeJS.Timeout | null,
fromPublicKey: string

View File

@@ -8,7 +8,7 @@ import { MacFrameButtons } from "../MacFrameButtons/MacFrameButtons";
export function Topbar() {
const colors = useRosettaColors();
const protocolState = useProtocolState();
const [protocolState] = useProtocolState();
return (
@@ -16,14 +16,14 @@ export function Topbar() {
{window.platform == 'win32' && <WindowsFrameButtons></WindowsFrameButtons>}
{window.platform == 'darwin' && <MacFrameButtons></MacFrameButtons>}
{window.platform == 'linux' && <WindowsFrameButtons></WindowsFrameButtons>}
{(protocolState == ProtocolState.CONNECTED || !window.location.hash.includes("main")) &&
{(protocolState == ProtocolState.CONNECTED || protocolState == ProtocolState.SYNCHRONIZATION || !window.location.hash.includes("main")) &&
<Flex align={'center'} justify={'center'}>
<Text fw={'bolder'} fz={13} c={'gray'}>
Rosetta Messenger
</Text>
</Flex>
}
{(protocolState != ProtocolState.CONNECTED && protocolState != ProtocolState.DEVICE_VERIFICATION_REQUIRED && window.location.hash.includes("main")) &&
{(protocolState != ProtocolState.CONNECTED && protocolState != ProtocolState.SYNCHRONIZATION && protocolState != ProtocolState.DEVICE_VERIFICATION_REQUIRED && window.location.hash.includes("main")) &&
<Flex align={'center'} gap={5} justify={'center'}>
<Loader size={12} color={colors.chevrons.active}></Loader>
<Text fw={'bolder'} fz={13} c={'gray'}>

View File

@@ -0,0 +1,12 @@
let tail: Promise<void> = Promise.resolve();
export const runTaskInQueue = (fn: () => Promise<void>) => {
tail = tail.then(fn).catch((e) => {
console.error("Dialog queue error", e);
});
};
/**
* Ждет, пока все пакеты попадающие в очередь не будут обработаны
*/
export const whenFinish = () => tail;

View File

@@ -44,7 +44,7 @@ export function useDialog() : {
const privateKey = usePrivateKeyHash();
const privatePlain = usePrivatePlain();
const {writeFile} = useFileStorage();
const protocolState = useProtocolState();
const [protocolState] = useProtocolState();
const {hasGroup, getGroupKey} = useGroups();
const {warn} = useConsoleLogger('useDialog');

View File

@@ -24,6 +24,7 @@ import { useGroups } from "./useGroups";
import { useDialogState } from "../DialogStateProvider.tsx/useDialogState";
import { useUserInformation } from "../InformationProvider/useUserInformation";
import { useMentions } from "../DialogStateProvider.tsx/useMentions";
import { runTaskInQueue } from "./dialogQueue";
/**
* При вызове будет запущен "фоновый" обработчик
@@ -41,14 +42,14 @@ export function useDialogFiber() {
const notify = useNotification();
const focused = useWindowFocus();
const { getDialogCache, addOrUpdateDialogCache } = useDialogsCache();
const {info, error} = useConsoleLogger('useDialogFiber');
const { info, error } = useConsoleLogger('useDialogFiber');
const [viewState] = useViewPanelsState();
const {writeFile} = useFileStorage();
const {updateDialog} = useDialogsList();
const {hasGroup, getGroupKey, normalize} = useGroups();
const {muted} = useDialogState();
const { writeFile } = useFileStorage();
const { updateDialog } = useDialogsList();
const { hasGroup, getGroupKey, normalize } = useGroups();
const { muted } = useDialogState();
const [userInfo] = useUserInformation(publicKey);
const {pushMention} = useMentions();
const { pushMention } = useMentions();
/**
* Лог
@@ -64,6 +65,7 @@ export function useDialogFiber() {
* Метод нужен для синхронизации своих сообщений
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
const aesChachaKey = packet.getAesChachaKey();
@@ -72,7 +74,7 @@ export function useDialogFiber() {
const messageId = packet.getMessageId();
if(fromPublicKey != publicKey){
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не сообщение от нас
*/
@@ -94,7 +96,7 @@ export function useDialogFiber() {
blob: ""
});
if(attachment.type == AttachmentType.MESSAGES){
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
@@ -148,25 +150,27 @@ export function useDialogFiber() {
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [privatePlain, currentDialogPublicKeyView]);
/**
* Обработчик сообщений для группы
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
const content = packet.getContent();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if(!hasGroup(toPublicKey)){
if (!hasGroup(toPublicKey)) {
/**
* Если это личное сообщение, то игнорируем его здесь
* для него есть отдельный слушатель usePacket (снизу)
*/
return;
}
if(fromPublicKey == publicKey){
if (fromPublicKey == publicKey) {
/**
* Игнорируем свои же сообщения,
* такое получается при пакете синхронизации
@@ -174,7 +178,7 @@ export function useDialogFiber() {
return;
}
const groupKey = await getGroupKey(toPublicKey);
if(!groupKey){
if (!groupKey) {
log("Group key not found for group " + toPublicKey);
error("Message dropped because group key not found for group " + toPublicKey);
return;
@@ -183,9 +187,9 @@ export function useDialogFiber() {
let decryptedContent = '';
try{
try {
decryptedContent = await decodeWithPassword(groupKey, content);
}catch(e) {
} catch (e) {
decryptedContent = '';
}
@@ -200,7 +204,7 @@ export function useDialogFiber() {
blob: ""
});
if(attachment.type == AttachmentType.MESSAGES){
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
@@ -264,7 +268,7 @@ export function useDialogFiber() {
* пользователь был неактивен, не слать уведомления по всем этим сообщениям
*/
let mentionFlag = false;
if((newMessage.from_public_key != publicKey) && (decryptedContent.includes(`@${userInfo.username}`) || decryptedContent.includes(`@all`))){
if ((newMessage.from_public_key != publicKey) && (decryptedContent.includes(`@${userInfo.username}`) || decryptedContent.includes(`@all`))) {
/**
* Если в сообщении есть упоминание текущего пользователя или @all,
* при этом сообщение отправляли не мы,
@@ -275,13 +279,13 @@ export function useDialogFiber() {
mentionFlag = true;
}
if(!muted.includes(toPublicKey) || mentionFlag){
if (!muted.includes(toPublicKey) || mentionFlag) {
/**
* Если группа не в мутие или есть упоминание - отправляем уведомление
*/
notify("New message", "You have a new message");
}
if(mentionFlag){
if (mentionFlag) {
/**
* Если в сообщении есть упоминание текущего пользователя или @all,
* то добавляем упоминание в состояние диалога
@@ -298,13 +302,15 @@ export function useDialogFiber() {
if (currentDialogPublicKeyView !== toPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(toPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [blocked, muted, updateDialog, focused, currentDialogPublicKeyView, viewState, idle]);
/**
* Обработчик личных сообщений
*/
usePacket(0x06, async (packet: PacketMessage) => {
runTaskInQueue(async () => {
const fromPublicKey = packet.getFromPublicKey();
if(fromPublicKey == publicKey){
if (fromPublicKey == publicKey) {
/**
* Игнорируем свои же сообщения,
* такое получается при пакете синхронизации
@@ -316,7 +322,7 @@ export function useDialogFiber() {
const chachaKey = packet.getChachaKey();
const timestamp = packet.getTimestamp();
const messageId = packet.getMessageId();
if(hasGroup(toPublicKey)){
if (hasGroup(toPublicKey)) {
/**
* Если это групповое сообщение, то игнорируем его здесь
* для него есть отдельный слушатель usePacket
@@ -353,7 +359,7 @@ export function useDialogFiber() {
blob: ""
});
if(attachment.type == AttachmentType.MESSAGES){
if (attachment.type == AttachmentType.MESSAGES) {
/**
* Этот тип вложения приходит сразу в blob и не нуждается
* в последующем скачивании
@@ -412,7 +418,7 @@ export function useDialogFiber() {
* чтобы когда приходит пачка сообщений с сервера в момент того как
* пользователь был неактивен, не слать уведомления по всем этим сообщениям
*/
if(!muted.includes(fromPublicKey)){
if (!muted.includes(fromPublicKey)) {
/**
* Если пользователь в муте - не отправляем уведомление
*/
@@ -423,13 +429,15 @@ export function useDialogFiber() {
if (currentDialogPublicKeyView !== fromPublicKey && dialogCache.length > 0) {
addOrUpdateDialogCache(fromPublicKey, [...dialogCache, newMessage].slice(-MESSAGE_MAX_LOADED));
}
});
}, [blocked, muted, updateDialog, focused, currentDialogPublicKeyView, viewState, idle]);
/**
* Обработчик синхронизации прочтения личных сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
if(hasGroup(packet.getToPublicKey())){
runTaskInQueue(async () => {
if (hasGroup(packet.getToPublicKey())) {
/**
* Если это относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket ниже
@@ -438,7 +446,7 @@ export function useDialogFiber() {
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
if(fromPublicKey != publicKey){
if (fromPublicKey != publicKey) {
/**
* Игнорируем если это не синхронизация нашего прочтения
*/
@@ -454,7 +462,7 @@ export function useDialogFiber() {
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
if (message.from_public_key == toPublicKey && !message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({fromPublicKey, toPublicKey});
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
@@ -462,13 +470,15 @@ export function useDialogFiber() {
}
return message;
}));
});
}, [updateDialog, publicKey]);
/**
* Обработчик прочтения личных сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
if(hasGroup(packet.getToPublicKey())){
runTaskInQueue(async () => {
if (hasGroup(packet.getToPublicKey())) {
/**
* Если это относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket ниже
@@ -477,7 +487,7 @@ export function useDialogFiber() {
}
const fromPublicKey = packet.getFromPublicKey();
const toPublicKey = packet.getToPublicKey();
if(fromPublicKey == publicKey){
if (fromPublicKey == publicKey) {
/**
* Игнорируем если это наше прочтение
* которое получается при синхронизации
@@ -486,12 +496,13 @@ export function useDialogFiber() {
}
console.info("PACKED_READ_IM");
await runQuery(`UPDATE messages SET read = 1 WHERE from_public_key = ? AND to_public_key = ? AND account = ?`, [toPublicKey, fromPublicKey, publicKey]);
console.info("read im with params ", [fromPublicKey, toPublicKey, publicKey]);
updateDialog(fromPublicKey);
log("Read packet received from " + fromPublicKey + " for " + toPublicKey);
addOrUpdateDialogCache(fromPublicKey, getDialogCache(fromPublicKey).map((message) => {
if (message.from_public_key == toPublicKey && !message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({fromPublicKey, toPublicKey});
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
@@ -499,12 +510,14 @@ export function useDialogFiber() {
}
return message;
}));
});
}, [updateDialog, publicKey]);
/**
* Обработчик прочтения групповых сообщений
*/
usePacket(0x07, async (packet: PacketRead) => {
if(!hasGroup(packet.getToPublicKey())){
runTaskInQueue(async () => {
if (!hasGroup(packet.getToPublicKey())) {
/**
* Если это не относится к группам, то игнорируем здесь,
* для этого есть отдельный слушатель usePacket выше
@@ -518,7 +531,7 @@ export function useDialogFiber() {
addOrUpdateDialogCache(toPublicKey, getDialogCache(toPublicKey).map((message) => {
if (!message.readed) {
console.info("Marking message as read in cache for dialog with " + fromPublicKey);
console.info({fromPublicKey, toPublicKey});
console.info({ fromPublicKey, toPublicKey });
return {
...message,
readed: 1
@@ -526,11 +539,13 @@ export function useDialogFiber() {
}
return message;
}));
});
}, [updateDialog]);
/**
* Обработчик доставки сообщений
*/
usePacket(0x08, async (packet: PacketDelivery) => {
runTaskInQueue(async () => {
const messageId = packet.getMessageId();
await runQuery(`UPDATE messages SET delivered = ?, timestamp = ? WHERE message_id = ? AND account = ?`, [DeliveredMessageState.DELIVERED, Date.now(), messageId, publicKey]);
updateDialog(packet.getToPublicKey());
@@ -545,5 +560,6 @@ export function useDialogFiber() {
}
return message;
}));
});
}, [updateDialog]);
}

View File

@@ -0,0 +1,57 @@
import { useEffect } from "react";
import { useProtocolState } from "../ProtocolProvider/useProtocolState";
import { ProtocolState } from "../ProtocolProvider/ProtocolProvider";
import { useDatabase } from "../DatabaseProvider/useDatabase";
import { usePublicKey } from "../AccountProvider/usePublicKey";
import { PacketSync, SyncStatus } from "../ProtocolProvider/protocol/packets/packet.sync";
import { useSender } from "../ProtocolProvider/useSender";
import { usePacket } from "../ProtocolProvider/usePacket";
import { whenFinish } from "./dialogQueue";
/**
* Хук отвечает за синхронизацию сообщений, запрос синхронизации
* при подключении
*/
export function useSynchronize() {
const [protocolState, setProtocolState] = useProtocolState();
const {getQuery} = useDatabase();
const publicKey = usePublicKey();
const send = useSender();
useEffect(() => {
if(protocolState == ProtocolState.CONNECTED){
trySync();
setProtocolState(ProtocolState.SYNCHRONIZATION);
}
}, [protocolState]);
const trySync = async () => {
const lastMessage = await getQuery("SELECT timestamp FROM messages WHERE account = ? ORDER BY timestamp DESC LIMIT 1", [publicKey]);
if(!lastMessage){
sendSynchronize(0);
return;
}
sendSynchronize(lastMessage.timestamp);
}
const sendSynchronize = (timestamp: number) => {
const packet = new PacketSync();
packet.setStatus(0);
packet.setTimestamp(timestamp);
send(packet);
}
usePacket(25, async (packet: PacketSync) => {
const status = packet.getStatus();
if(status == SyncStatus.BATCH_END){
await whenFinish();
trySync();
}
if(status == SyncStatus.NOT_NEEDED){
/**
* Синхронизация не нужна, все данные актуальны
*/
setProtocolState(ProtocolState.CONNECTED);
}
});
}

View File

@@ -12,10 +12,13 @@ export enum ProtocolState {
HANDSHAKE_EXCHANGE,
DISCONNECTED,
RECONNECTING,
DEVICE_VERIFICATION_REQUIRED
DEVICE_VERIFICATION_REQUIRED,
SYNCHRONIZATION
}
export const ProtocolContext = createContext<[Protocol|null, ProtocolState]>([null, ProtocolState.DISCONNECTED]);
export type ProtocolContextType = [Protocol|null, ProtocolState, (state: ProtocolState) => void];
export const ProtocolContext = createContext<ProtocolContextType>([null, ProtocolState.DISCONNECTED, () => {}]);
interface ProtocolProviderProps {
children: React.ReactNode;
@@ -91,7 +94,7 @@ export function ProtocolProvider(props : ProtocolProviderProps) {
}, [publicKey, privateKey, systemInfo.id]);
return (
<ProtocolContext.Provider value={[protocol, connect]}>
<ProtocolContext.Provider value={[protocol, connect, setConnect]}>
{props.children}
</ProtocolContext.Provider>
);

View File

@@ -0,0 +1,48 @@
import Packet from "../packet";
import Stream from "../stream";
export enum SyncStatus {
NOT_NEEDED,
BATCH_START,
BATCH_END
}
export class PacketSync extends Packet {
private status : SyncStatus = SyncStatus.NOT_NEEDED;
private timestamp : number = 0;
public getPacketId(): number {
return 25; //0x19
}
public _receive(stream: Stream): void {
this.status = stream.readInt8() as SyncStatus;
this.timestamp = stream.readInt64();
}
public _send(): Promise<Stream> | Stream {
let stream = new Stream();
stream.writeInt16(this.getPacketId());
stream.writeInt8(this.status);
stream.writeInt64(this.timestamp);
return stream;
}
public getStatus() : SyncStatus {
return this.status;
}
public setStatus(status: SyncStatus) {
this.status = status;
}
public getTimestamp() : number {
return this.timestamp;
}
public setTimestamp(timestamp: number) {
this.timestamp = timestamp;
}
}

View File

@@ -24,6 +24,7 @@ import { PacketGroupBan } from "./packets/packet.group.ban";
import { PacketDeviceNew } from "./packets/packet.device.new";
import { PacketDeviceList } from "./packets/packet.device.list";
import { PacketDeviceResolve } from "./packets/packet.device.resolve";
import { PacketSync } from "./packets/packet.sync";
export default class Protocol extends EventEmitter {
private serverAddress: string;
@@ -123,6 +124,7 @@ export default class Protocol extends EventEmitter {
this._supportedPackets.set(0x16, new PacketGroupBan());
this._supportedPackets.set(0x17, new PacketDeviceList());
this._supportedPackets.set(0x18, new PacketDeviceResolve());
this._supportedPackets.set(25, new PacketSync());
}
private _findWaiters(packetId: number): ((packet: Packet) => void)[] {

View File

@@ -1,12 +1,12 @@
import { useContext } from "react";
import { ProtocolContext } from "./ProtocolProvider";
import { ProtocolContext, ProtocolContextType, ProtocolState } from "./ProtocolProvider";
export const useProtocolState = () => {
const [context, connect] = useContext(ProtocolContext);
const context : ProtocolContextType = useContext(ProtocolContext);
if(!context){
throw new Error("useProtocol must be used within a ProtocolProvider");
}
return connect;
return [context[1], context[2]] as [ProtocolState, (state: ProtocolState) => void];
};

View File

@@ -12,7 +12,7 @@ import { usePacket } from "@/app/providers/ProtocolProvider/usePacket";
import { PacketDeviceResolve, Solution } from "@/app/providers/ProtocolProvider/protocol/packets/packet.device.resolve";
export function DeviceConfirm() {
const protocolState = useProtocolState();
const [protocolState] = useProtocolState();
const navigate = useNavigate();
const logout = useLogout();

View File

@@ -30,6 +30,7 @@ import { useLogout } from "@/app/providers/AccountProvider/useLogout";
import { useUpdateMessage } from "@/app/hooks/useUpdateMessage";
import { useDeviceMessage } from "@/app/hooks/useDeviceMessage";
import { UpdateProvider } from "@/app/providers/UpdateProvider/UpdateProvider";
import { useSynchronize } from "@/app/providers/DialogProvider/useSynchronize";
export function Main() {
const { mainColor, borderColor } = useRosettaColors();
@@ -54,6 +55,11 @@ export function Main() {
*/
useDeviceMessage();
/**
* Синхронизируем сообщения при подключении
*/
useSynchronize();
const { setSize, setResizeble } = useWindow();
/**