From 7f4684082e89aea0c54777540f9a23e45f7f8694 Mon Sep 17 00:00:00 2001 From: k1ngsterr1 Date: Fri, 17 Apr 2026 23:45:52 +0500 Subject: [PATCH] =?UTF-8?q?fix:=20=D0=A4=D0=B8=D0=BA=D1=81=20=D0=B1=D0=B0?= =?UTF-8?q?=D0=B3=D0=B0=20=D1=81=20=D0=BF=D0=BE=D0=B4=D0=BA=D0=BB=D1=8E?= =?UTF-8?q?=D1=87=D0=B5=D0=BD=D0=B8=D0=B5=D0=BC=20=D0=BF=D1=80=D0=B8=20?= =?UTF-8?q?=D0=BF=D0=B5=D1=80=D0=B2=D0=B8=D1=87=D0=BD=D0=BE=D0=B9=20=D1=80?= =?UTF-8?q?=D0=B5=D0=B3=D0=B8=D1=81=D1=82=D1=80=D0=B0=D1=86=D0=B8=D0=B8=20?= =?UTF-8?q?=D1=8E=D0=B7=D0=B5=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- Architecture.md | 382 ++++++++++++++ .../com/rosetta/messenger/MainActivity.kt | 7 + .../network/ProtocolConnectionModels.kt | 33 ++ .../network/ProtocolConnectionSupervisor.kt | 48 ++ .../messenger/network/ProtocolManager.kt | 481 ++++++++++++++---- .../messenger/network/ReadyPacketGate.kt | 88 ++++ 6 files changed, 936 insertions(+), 103 deletions(-) create mode 100644 Architecture.md create mode 100644 app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt create mode 100644 app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt create mode 100644 app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt diff --git a/Architecture.md b/Architecture.md new file mode 100644 index 0000000..ddc5f27 --- /dev/null +++ b/Architecture.md @@ -0,0 +1,382 @@ +# Rosetta Android — Architecture + +> Документ описывает **текущую** архитектуру `rosetta-android` (ветка `dev`) по коду, без идеализаций. + +## 1. Архитектурный стиль + +Приложение построено как **layered + feature-oriented** архитектура: +- UI на Jetpack Compose (`MainActivity` + `ui/*`). +- Бизнес-оркестрация в singleton-сервисах (`ProtocolManager`, `CallManager`, `TransportManager`, `UpdateManager`). +- Data слой через репозитории (`MessageRepository`, `GroupRepository`, `AvatarRepository`, `AccountManager`). +- Persistence через Room (`RosettaDatabase`). +- Crypto изолирован в `crypto/*`. + +DI-контейнера (Hilt/Koin) сейчас нет: зависимости поднимаются через `object`, `getInstance(...)`, singleton-инициализацию. + +--- + +## 2. Слои и границы + +```mermaid +flowchart TB + subgraph UI["UI Layer (Compose + ViewModel)"] + A1["MainActivity"] + A2["ui/chats/*"] + A3["ui/auth/*"] + A4["ui/settings/*"] + end + + subgraph SVC["Service Layer (Singleton orchestrators)"] + B1["ProtocolManager"] + B2["CallManager"] + B3["TransportManager"] + B4["UpdateManager"] + B5["RosettaFirebaseMessagingService"] + end + + subgraph DATA["Data Layer"] + C1["MessageRepository"] + C2["GroupRepository"] + C3["AvatarRepository"] + C4["AccountManager / PreferencesManager"] + C5["DraftManager / ForwardManager"] + end + + subgraph DB["Persistence Layer"] + D1["Room: RosettaDatabase"] + D2["DAO: message/dialog/group/etc"] + end + + subgraph NET["Network Layer"] + E1["Protocol (WebSocket)"] + E2["Packet* codec"] + E3["OkHttp HTTP (transport/update)"] + E4["WebRTC"] + end + + subgraph CRYPTO["Crypto Layer"] + F1["CryptoManager"] + F2["MessageCrypto"] + F3["XChaCha20E2EE (calls)"] + end + + UI --> SVC + UI --> DATA + SVC --> DATA + SVC --> NET + DATA --> DB + DATA --> CRYPTO + SVC --> CRYPTO +``` + +--- + +## 3. Главные модули и ответственность + +### 3.1 `MainActivity` (composition root) + +`MainActivity` — главный orchestration entrypoint приложения: +- поднимает `ProtocolManager.initialize(...)`, `CallManager.initialize(...)`; +- управляет auth-гейтингом, онбордингом и основным nav-stack (`Screen`); +- привязывает текущий аккаунт к runtime-сервисам; +- триггерит fast reconnect на `onResume`; +- следит за разрешениями (уведомления, fullscreen intent и т.д.). + +### 3.2 `RosettaApplication` + +На старте процесса инициализирует глобальные подсистемы: +- crash reporting, +- draft manager, +- transport manager, +- update manager. + +### 3.3 Репозитории + +#### `MessageRepository` +Ключевой data-центр для чатов: +- инициализация аккаунт-контекста (`publicKey/privateKey`); +- отправка сообщений (optimistic insert + сетевой send); +- обработка входящих `PacketMessage`/`PacketDelivery`/`PacketRead`; +- обновление `dialogs`, `messages`, `message_search_index`; +- синк timestamp (`accounts_sync_times`); +- шина событий для UI (`newMessageEvents`, `deliveryStatusEvents`). + +#### `GroupRepository` +- хранение и операции по группам, +- интеграция с `PacketGroup*`. + +#### `AvatarRepository` +- кэш/история аватаров (Room + file storage), +- реактивная выдача аватаров через `Flow`. + +#### `AccountManager` +- хранение аккаунтов в DataStore, +- last logged public/private hash в SharedPreferences для быстрых синхронных чтений. + +--- + +## 4. Сетевой стек + +## 4.1 `Protocol` (низкий уровень) + +`Protocol` отвечает за: +- WebSocket lifecycle (`DISCONNECTED/CONNECTING/CONNECTED/HANDSHAKING/DEVICE_VERIFICATION_REQUIRED/AUTHENTICATED`), +- heartbeat, +- reconnect/backoff, +- packet encode/decode, +- `waitPacket/unwaitPacket` обработчики, +- queue pre-handshake пакетов. + +Ключевые механизмы устойчивости: +- `lifecycleMutex` для serialized lifecycle операций, +- `connectionGeneration` для игнора stale callbacks старых сокетов, +- guard `isConnecting` от параллельного `connect()`. + +## 4.2 `ProtocolManager` (верхний уровень) + +`ProtocolManager` — orchestration-слой над `Protocol`: +- единая точка для UI/Data слоёв (`send`, `authenticate`, `reconnect`, `waitPacket` и т.д.); +- bootstrap после auth (sync, own-profile resolve, push subscribe); +- маршрутизация входящих packet-ов в репозитории/подсистемы; +- call signaling bridge для `CallManager`; +- управление typed caches (`SearchUser`, user info cache). + +Новый runtime-дизайн connection orchestration: +- `ProtocolConnectionModels.kt` — `ConnectionLifecycleState`, `ConnectionEvent`, bootstrap context; +- `ProtocolConnectionSupervisor.kt` — actor/event-loop для serialized событий; +- `ReadyPacketGate.kt` — очередь пакетов до состояния `READY` (TTL + max size). + +--- + +## 5. Lifecycle состояния соединения (верхний уровень) + +`ProtocolManager.connectionLifecycleState`: +- `DISCONNECTED` +- `CONNECTING` +- `HANDSHAKING` +- `AUTHENTICATED` +- `BOOTSTRAPPING` +- `READY` +- `DEVICE_VERIFICATION_REQUIRED` + +Переход в `READY` происходит только когда одновременно выполнено: +- аккаунт инициализирован, +- протокол аутентифицирован, +- sync завершён, +- own-profile резолвнут (или истёк fallback timeout). + +```mermaid +stateDiagram-v2 + [*] --> DISCONNECTED + DISCONNECTED --> CONNECTING + CONNECTING --> HANDSHAKING + HANDSHAKING --> DEVICE_VERIFICATION_REQUIRED + HANDSHAKING --> AUTHENTICATED + AUTHENTICATED --> BOOTSTRAPPING + BOOTSTRAPPING --> READY + READY --> DISCONNECTED + DEVICE_VERIFICATION_REQUIRED --> CONNECTING +``` + +--- + +## 6. Поток auth / session bootstrap + +```mermaid +sequenceDiagram + participant UI as AuthFlow/MainActivity + participant PM as ProtocolManager + participant P as Protocol + participant MR as MessageRepository + + UI->>PM: initializeAccount(public, private) + UI->>PM: connect() + UI->>PM: authenticate(public, privateHash) + PM->>P: connect + startHandshake + P-->>PM: AUTHENTICATED + PM->>PM: onAuthenticated() + PM->>PM: subscribePushTokenIfAvailable() + PM->>PM: requestSynchronize() + PM->>MR: initialize(...) + PM-->>PM: SyncCompleted + OwnProfileResolved + PM-->>UI: connectionLifecycleState = READY +``` + +Критично: отправка пользовательских пакетов до `READY` не теряется, а попадает в `ReadyPacketGate`. + +--- + +## 7. Поток сообщений (send/receive/delivery) + +## 7.1 Отправка +1. UI (`ChatViewModel`) вызывает отправку через `MessageRepository.sendMessage(...)`. +2. Репозиторий делает optimistic insert в `messages` (`WAITING`). +3. Формируется `PacketMessage`. +4. Отправка идёт в `ProtocolManager.send(...)`. +5. Если состояние не `READY`, пакет ставится в `ReadyPacketGate`. +6. После `READY` очередь сбрасывается в `Protocol.sendPacket(...)`. + +## 7.2 Подтверждение доставки +1. Сервер присылает `PacketDelivery (0x08)`. +2. `ProtocolManager` маршрутизирует в `MessageRepository.handleDelivery(...)`. +3. Репозиторий обновляет статус в Room. +4. UI получает апдейт через `Flow`/события. + +## 7.3 Входящие +1. `PacketMessage (0x06)` приходит в `Protocol`. +2. `ProtocolManager` dispatch → `MessageRepository.handleIncomingMessage(...)`. +3. Сообщение сохраняется в Room, обновляется `dialogs` и индексы поиска. +4. UI реактивно перерисовывается. + +--- + +## 8. Sync-пайплайн + +Используется пакет `PacketSync (0x19)` и режимы: +- `BATCH_START` +- поток синк-пакетов (`MESSAGE/READ/DELIVERY/...`) +- `BATCH_END` +- `NOT_NEEDED` + +Особенности: +- есть последовательная очередь inbound задач для сохранения порядка обработки; +- after-sync hooks: retry waiting messages, request missing user info; +- sync timestamp хранится в `accounts_sync_times`. + +--- + +## 9. Calls (WebRTC + signaling) + +`CallManager` использует: +- signaling пакеты: `PacketSignalPeer (0x1A)`, `PacketWebRTC (0x1B)`; +- ICE: `PacketIceServers (0x1C)`; +- WebRTC stack (`PeerConnectionFactory`, `PeerConnection`, audio track); +- E2EE голоса через `XChaCha20E2EE` обвязки sender/receiver. + +Основные фазы звонка: +- `IDLE` → `INCOMING`/`OUTGOING` → `CONNECTING` → `ACTIVE` → `IDLE`. + +```mermaid +stateDiagram-v2 + [*] --> IDLE + IDLE --> OUTGOING + IDLE --> INCOMING + OUTGOING --> CONNECTING + INCOMING --> CONNECTING + CONNECTING --> ACTIVE + ACTIVE --> IDLE + OUTGOING --> IDLE + INCOMING --> IDLE +``` + +--- + +## 10. Transport (вложения) + +`TransportManager`: +- получает transport server через `PacketRequestTransport (0x0F)` (desktop parity); +- upload/download через OkHttp; +- resumable download (HTTP Range); +- трекает состояния прогресса через `StateFlow` (`uploading`, `downloading`); +- поддерживает cancel/pause/resume через `FileDownloadManager`. + +--- + +## 11. Push Notifications + +`RosettaFirebaseMessagingService`: +- обрабатывает `onNewToken` и подписку токена через `ProtocolManager`; +- dedup пушей; +- маршрутизация типов (`personal_message`, `group_message`, `call`, `read`); +- очистка уведомлений по read events; +- wake-up reconnect при silent push. + +--- + +## 12. Обновления (SDU) + +`UpdateManager`: +1. запрашивает update server через `PacketRequestUpdate (0x0A)`; +2. ходит на SDU HTTP endpoint; +3. скачивает APK через `DownloadManager`; +4. ведёт update state machine (`Idle/Checking/UpdateAvailable/Downloading/ReadyToInstall/Error`). + +--- + +## 13. Persistence (Room) + +`RosettaDatabase` (version `17`) включает: +- `messages` — сообщения, +- `dialogs` — диалоги + денормализованные поля для быстрых списков, +- `message_search_index` — локальный индекс поиска, +- `groups` — группы, +- `pinned_messages` — закрепы, +- `avatar_cache` — аватары, +- `blacklist` — blacklist, +- `accounts_sync_times` — sync cursor, +- `encrypted_accounts` — аккаунты (legacy Room account storage). + +Есть длинная цепочка миграций (4→17) с оптимизациями под производительность и денормализацию. + +--- + +## 14. Crypto + +- `CryptoManager`: +- seed phrase / keypair, +- PBKDF2-derived key caching, +- encrypt/decrypt для локального хранения. + +- `MessageCrypto`: +- message-level XChaCha20-Poly1305, +- ECDH/AES-обмен ключом для payload, +- attachment decrypt logic. + +Crypto и network связаны через `MessageRepository`/`ProtocolManager`. + +--- + +## 15. Наблюдаемость и диагностика + +- wire/protocol логи через `ProtocolManager.addLog(...)` + trace file; +- debug logs доступны в UI; +- отдельные диагностические логи для звонков (`CallManager` breadcrumbs). + +--- + +## 16. Текущие архитектурные сильные стороны + +- Реактивная модель состояния (`StateFlow`) на большинстве критических путей. +- Сильная декомпозиция packet протокола (`Packet*`). +- Наличие ready-gate и serialized supervisor снижает race-condition в соединении. +- Room + денормализация ускоряют списки чатов/поиск. + +--- + +## 17. Текущие архитектурные риски + +- `MainActivity` остаётся очень крупным composition root. +- `ProtocolManager` и `MessageRepository` всё ещё крупные “god objects”. +- Отсутствие DI усложняет управляемость зависимостей/тестируемость. +- Часть жизненного цикла связана через runtime singleton state, что повышает риск регрессий при эволюции. + +--- + +## 18. Карта ключевых файлов + +- `app/src/main/java/com/rosetta/messenger/MainActivity.kt` +- `app/src/main/java/com/rosetta/messenger/RosettaApplication.kt` +- `app/src/main/java/com/rosetta/messenger/network/Protocol.kt` +- `app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt` +- `app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt` +- `app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt` +- `app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt` +- `app/src/main/java/com/rosetta/messenger/network/CallManager.kt` +- `app/src/main/java/com/rosetta/messenger/network/TransportManager.kt` +- `app/src/main/java/com/rosetta/messenger/push/RosettaFirebaseMessagingService.kt` +- `app/src/main/java/com/rosetta/messenger/update/UpdateManager.kt` +- `app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt` +- `app/src/main/java/com/rosetta/messenger/database/RosettaDatabase.kt` +- `app/src/main/java/com/rosetta/messenger/crypto/CryptoManager.kt` +- `app/src/main/java/com/rosetta/messenger/crypto/MessageCrypto.kt` + diff --git a/app/src/main/java/com/rosetta/messenger/MainActivity.kt b/app/src/main/java/com/rosetta/messenger/MainActivity.kt index f40769a..8c0963e 100644 --- a/app/src/main/java/com/rosetta/messenger/MainActivity.kt +++ b/app/src/main/java/com/rosetta/messenger/MainActivity.kt @@ -353,6 +353,13 @@ class MainActivity : FragmentActivity() { // При открытии по звонку с lock screen — пропускаем auth openedForCall && hasExistingAccount == true -> "main" + // First-registration race: DataStore may flip isLoggedIn=true + // before AuthFlow returns DecryptedAccount to currentAccount. + // Do not enter MainScreen with null account (it leads to + // empty keys/UI placeholders until relog). + isLoggedIn == true && currentAccount == null && + hasExistingAccount == false -> + "auth_new" isLoggedIn != true && hasExistingAccount == false -> "auth_new" isLoggedIn != true && hasExistingAccount == true -> diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt new file mode 100644 index 0000000..98d6aa0 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt @@ -0,0 +1,33 @@ +package com.rosetta.messenger.network + +enum class ConnectionLifecycleState { + DISCONNECTED, + CONNECTING, + HANDSHAKING, + AUTHENTICATED, + BOOTSTRAPPING, + READY, + DEVICE_VERIFICATION_REQUIRED +} + +sealed interface ConnectionEvent { + data class InitializeAccount(val publicKey: String, val privateKey: String) : ConnectionEvent + data class Connect(val reason: String) : ConnectionEvent + data class FastReconnect(val reason: String) : ConnectionEvent + data class Disconnect(val reason: String, val clearCredentials: Boolean) : ConnectionEvent + data class Authenticate(val publicKey: String, val privateHash: String) : ConnectionEvent + data class ProtocolStateChanged(val state: ProtocolState) : ConnectionEvent + data class SendPacket(val packet: Packet) : ConnectionEvent + data class SyncCompleted(val reason: String) : ConnectionEvent + data class OwnProfileResolved(val publicKey: String) : ConnectionEvent + data class OwnProfileFallbackTimeout(val sessionGeneration: Long) : ConnectionEvent +} + +data class ConnectionBootstrapContext( + val accountPublicKey: String = "", + val accountInitialized: Boolean = false, + val protocolState: ProtocolState = ProtocolState.DISCONNECTED, + val authenticated: Boolean = false, + val syncCompleted: Boolean = false, + val ownProfileResolved: Boolean = false +) diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt new file mode 100644 index 0000000..12f0bcb --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt @@ -0,0 +1,48 @@ +package com.rosetta.messenger.network + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.launch + +class ProtocolConnectionSupervisor( + private val scope: CoroutineScope, + private val onEvent: suspend (ConnectionEvent) -> Unit, + private val onError: (Throwable) -> Unit, + private val addLog: (String) -> Unit +) { + private val eventChannel = Channel(Channel.UNLIMITED) + private val lock = Any() + + @Volatile private var job: Job? = null + + fun start() { + if (job?.isActive == true) return + synchronized(lock) { + if (job?.isActive == true) return + job = + scope.launch { + for (event in eventChannel) { + try { + onEvent(event) + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + addLog("❌ ConnectionSupervisor event failed: ${e.message}") + onError(e) + } + } + } + addLog("🧠 ConnectionSupervisor started") + } + } + + fun post(event: ConnectionEvent) { + start() + val result = eventChannel.trySend(event) + if (result.isFailure) { + scope.launch { eventChannel.send(event) } + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 1b11f73..28aeb69 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -46,6 +46,9 @@ object ProtocolManager { private const val PACKET_WEB_RTC = 0x1B private const val PACKET_ICE_SERVERS = 0x1C private const val NETWORK_WAIT_TIMEOUT_MS = 20_000L + private const val BOOTSTRAP_OWN_PROFILE_FALLBACK_MS = 2_500L + private const val READY_PACKET_QUEUE_MAX = 500 + private const val READY_PACKET_QUEUE_TTL_MS = 120_000L // Desktop parity: use the same primary WebSocket endpoint as desktop client. private const val SERVER_ADDRESS = "wss://wss.rosetta.im" @@ -59,6 +62,19 @@ object ProtocolManager { private var appContext: Context? = null private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val protocolInstanceLock = Any() + private val connectionSupervisor = + ProtocolConnectionSupervisor( + scope = scope, + onEvent = ::handleConnectionEvent, + onError = { error -> android.util.Log.e(TAG, "ConnectionSupervisor event failed", error) }, + addLog = ::addLog + ) + private val sessionGeneration = AtomicLong(0L) + private val readyPacketGate = + ReadyPacketGate( + maxSize = READY_PACKET_QUEUE_MAX, + ttlMs = READY_PACKET_QUEUE_TTL_MS + ) @Volatile private var packetHandlersRegistered = false @Volatile private var stateMonitoringStarted = false @@ -68,6 +84,7 @@ object ProtocolManager { @Volatile private var networkReconnectRegistered = false @Volatile private var networkReconnectCallback: ConnectivityManager.NetworkCallback? = null @Volatile private var networkReconnectTimeoutJob: Job? = null + @Volatile private var ownProfileFallbackJob: Job? = null // Guard: prevent duplicate FCM token subscribe within a single session @Volatile @@ -116,10 +133,338 @@ object ProtocolManager { private fun normalizeSearchQuery(value: String): String = value.trim().removePrefix("@").lowercase(Locale.ROOT) + + private fun ensureConnectionSupervisor() { + connectionSupervisor.start() + } + + private fun postConnectionEvent(event: ConnectionEvent) { + connectionSupervisor.post(event) + } + + private fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState = + when (state) { + ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED + ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING + ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING + ProtocolState.DEVICE_VERIFICATION_REQUIRED -> + ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED + ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED + } + + private fun setConnectionLifecycleState(next: ConnectionLifecycleState, reason: String) { + if (_connectionLifecycleState.value == next) return + addLog("🧭 CONNECTION STATE: ${_connectionLifecycleState.value} -> $next ($reason)") + _connectionLifecycleState.value = next + } + + private fun recomputeConnectionLifecycleState(reason: String) { + val context = bootstrapContext + val nextState = + if (context.authenticated) { + if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) { + ConnectionLifecycleState.READY + } else { + ConnectionLifecycleState.BOOTSTRAPPING + } + } else { + protocolToLifecycleState(context.protocolState) + } + setConnectionLifecycleState(nextState, reason) + if (nextState == ConnectionLifecycleState.READY) { + flushReadyPacketQueue(context.accountPublicKey, reason) + } + } + + private fun clearReadyPacketQueue(reason: String) { + readyPacketGate.clear(reason = reason, addLog = ::addLog) + } + + private fun enqueueReadyPacket(packet: Packet) { + val accountKey = + messageRepository?.getCurrentAccountKey()?.trim().orEmpty().ifBlank { + bootstrapContext.accountPublicKey + } + readyPacketGate.enqueue( + packet = packet, + accountPublicKey = accountKey, + state = _connectionLifecycleState.value, + shortKeyForLog = ::shortKeyForLog, + addLog = ::addLog + ) + } + + private fun flushReadyPacketQueue(activeAccountKey: String, reason: String) { + val packetsToSend = + readyPacketGate.drainForAccount( + activeAccountKey = activeAccountKey, + reason = reason, + addLog = ::addLog + ) + if (packetsToSend.isEmpty()) return + val protocolInstance = getProtocol() + packetsToSend.forEach { protocolInstance.sendPacket(it) } + } + + private fun packetCanBypassReadyGate(packet: Packet): Boolean = + when (packet) { + is PacketHandshake, + is PacketSync, + is PacketSearch, + is PacketPushNotification, + is PacketRequestTransport, + is PacketRequestUpdate, + is PacketSignalPeer, + is PacketWebRTC, + is PacketIceServers, + is PacketDeviceResolve -> true + else -> false + } + + private suspend fun handleConnectionEvent(event: ConnectionEvent) { + when (event) { + is ConnectionEvent.InitializeAccount -> { + val normalizedPublicKey = event.publicKey.trim() + val normalizedPrivateKey = event.privateKey.trim() + if (normalizedPublicKey.isBlank() || normalizedPrivateKey.isBlank()) { + addLog("⚠️ initializeAccount skipped: missing account credentials") + return + } + + val protocolState = getProtocol().state.value + addLog( + "🔐 initializeAccount pk=${shortKeyForLog(normalizedPublicKey)} keyLen=${normalizedPrivateKey.length} state=$protocolState" + ) + setSyncInProgress(false) + clearTypingState() + if (messageRepository == null) { + appContext?.let { messageRepository = MessageRepository.getInstance(it) } + } + messageRepository?.initialize(normalizedPublicKey, normalizedPrivateKey) + + val sameAccount = + bootstrapContext.accountPublicKey.equals(normalizedPublicKey, ignoreCase = true) + if (!sameAccount) { + clearReadyPacketQueue("account_switch") + } + + bootstrapContext = + bootstrapContext.copy( + accountPublicKey = normalizedPublicKey, + accountInitialized = true, + syncCompleted = if (sameAccount) bootstrapContext.syncCompleted else false, + ownProfileResolved = if (sameAccount) bootstrapContext.ownProfileResolved else false + ) + recomputeConnectionLifecycleState("account_initialized") + + val shouldResync = + resyncRequiredAfterAccountInit || protocol?.isAuthenticated() == true + if (shouldResync) { + resyncRequiredAfterAccountInit = false + syncRequestInFlight = false + clearSyncRequestTimeout() + addLog( + "🔄 Account initialized (${shortKeyForLog(normalizedPublicKey)}) -> force sync" + ) + requestSynchronize() + } + if ( + protocol?.isAuthenticated() == true && + activeAuthenticatedSessionId > 0L && + lastBootstrappedSessionId != activeAuthenticatedSessionId + ) { + tryRunPostAuthBootstrap("account_initialized") + } + + scope.launch { + messageRepository?.checkAndSendVersionUpdateMessage() + } + } + is ConnectionEvent.Connect -> { + if (!hasActiveInternet()) { + waitForNetworkAndReconnect("connect:${event.reason}") + return + } + stopWaitingForNetwork("connect:${event.reason}") + getProtocol().connect() + } + is ConnectionEvent.FastReconnect -> { + if (!hasActiveInternet()) { + waitForNetworkAndReconnect("reconnect:${event.reason}") + return + } + stopWaitingForNetwork("reconnect:${event.reason}") + getProtocol().reconnectNowIfNeeded(event.reason) + } + is ConnectionEvent.Disconnect -> { + stopWaitingForNetwork(event.reason) + protocol?.disconnect() + if (event.clearCredentials) { + protocol?.clearCredentials() + } + messageRepository?.clearInitialization() + clearTypingState() + _devices.value = emptyList() + _pendingDeviceVerification.value = null + syncRequestInFlight = false + clearSyncRequestTimeout() + setSyncInProgress(false) + resyncRequiredAfterAccountInit = false + lastSubscribedToken = null + ownProfileFallbackJob?.cancel() + ownProfileFallbackJob = null + deferredAuthBootstrap = false + activeAuthenticatedSessionId = 0L + lastBootstrappedSessionId = 0L + bootstrapContext = ConnectionBootstrapContext() + clearReadyPacketQueue("disconnect:${event.reason}") + recomputeConnectionLifecycleState("disconnect:${event.reason}") + } + is ConnectionEvent.Authenticate -> { + appContext?.let { context -> + runCatching { + val accountManager = AccountManager(context) + accountManager.setLastLoggedPublicKey(event.publicKey) + accountManager.setLastLoggedPrivateKeyHash(event.privateHash) + } + } + val device = buildHandshakeDevice() + getProtocol().startHandshake(event.publicKey, event.privateHash, device) + } + is ConnectionEvent.ProtocolStateChanged -> { + val previousProtocolState = bootstrapContext.protocolState + val newProtocolState = event.state + + if ( + newProtocolState == ProtocolState.AUTHENTICATED && + previousProtocolState != ProtocolState.AUTHENTICATED + ) { + lastSubscribedToken = null + stopWaitingForNetwork("authenticated") + ownProfileFallbackJob?.cancel() + val generation = sessionGeneration.incrementAndGet() + activeAuthenticatedSessionId = authenticatedSessionCounter.incrementAndGet() + deferredAuthBootstrap = false + bootstrapContext = + bootstrapContext.copy( + protocolState = newProtocolState, + authenticated = true, + syncCompleted = false, + ownProfileResolved = false + ) + recomputeConnectionLifecycleState("protocol_authenticated") + ownProfileFallbackJob = + scope.launch { + delay(BOOTSTRAP_OWN_PROFILE_FALLBACK_MS) + postConnectionEvent( + ConnectionEvent.OwnProfileFallbackTimeout(generation) + ) + } + onAuthenticated() + return + } + + if ( + newProtocolState != ProtocolState.AUTHENTICATED && + newProtocolState != ProtocolState.HANDSHAKING + ) { + syncRequestInFlight = false + clearSyncRequestTimeout() + setSyncInProgress(false) + lastSubscribedToken = null + cancelAllOutgoingRetries() + ownProfileFallbackJob?.cancel() + ownProfileFallbackJob = null + deferredAuthBootstrap = false + activeAuthenticatedSessionId = 0L + bootstrapContext = + bootstrapContext.copy( + protocolState = newProtocolState, + authenticated = false, + syncCompleted = false, + ownProfileResolved = false + ) + recomputeConnectionLifecycleState("protocol_state_${newProtocolState.name.lowercase(Locale.ROOT)}") + return + } + + if (newProtocolState == ProtocolState.HANDSHAKING && bootstrapContext.authenticated) { + ownProfileFallbackJob?.cancel() + ownProfileFallbackJob = null + deferredAuthBootstrap = false + activeAuthenticatedSessionId = 0L + bootstrapContext = + bootstrapContext.copy( + protocolState = newProtocolState, + authenticated = false, + syncCompleted = false, + ownProfileResolved = false + ) + recomputeConnectionLifecycleState("protocol_re_handshaking") + return + } + + bootstrapContext = bootstrapContext.copy(protocolState = newProtocolState) + recomputeConnectionLifecycleState("protocol_state_${newProtocolState.name.lowercase(Locale.ROOT)}") + } + is ConnectionEvent.SendPacket -> { + val packet = event.packet + val lifecycle = _connectionLifecycleState.value + if (packetCanBypassReadyGate(packet) || lifecycle == ConnectionLifecycleState.READY) { + getProtocol().sendPacket(packet) + } else { + enqueueReadyPacket(packet) + if (!isAuthenticated()) { + if (!hasActiveInternet()) { + waitForNetworkAndReconnect("ready_gate_send") + } else { + getProtocol().reconnectNowIfNeeded("ready_gate_send") + } + } + } + } + is ConnectionEvent.SyncCompleted -> { + syncRequestInFlight = false + clearSyncRequestTimeout() + inboundProcessingFailures.set(0) + inboundTasksInCurrentBatch.set(0) + fullFailureBatchStreak.set(0) + addLog(event.reason) + setSyncInProgress(false) + retryWaitingMessages() + requestMissingUserInfo() + + bootstrapContext = bootstrapContext.copy(syncCompleted = true) + recomputeConnectionLifecycleState("sync_completed") + } + is ConnectionEvent.OwnProfileResolved -> { + val accountPublicKey = bootstrapContext.accountPublicKey + val matchesAccount = + accountPublicKey.isBlank() || + event.publicKey.equals(accountPublicKey, ignoreCase = true) + if (!matchesAccount) return + ownProfileFallbackJob?.cancel() + ownProfileFallbackJob = null + bootstrapContext = bootstrapContext.copy(ownProfileResolved = true) + recomputeConnectionLifecycleState("own_profile_resolved") + } + is ConnectionEvent.OwnProfileFallbackTimeout -> { + if (sessionGeneration.get() != event.sessionGeneration) return + if (!bootstrapContext.authenticated || bootstrapContext.ownProfileResolved) return + addLog( + "⏱️ Own profile fetch timeout — continuing bootstrap for ${shortKeyForLog(bootstrapContext.accountPublicKey)}" + ) + bootstrapContext = bootstrapContext.copy(ownProfileResolved = true) + recomputeConnectionLifecycleState("own_profile_fallback_timeout") + } + } + } // Keep heavy protocol/message UI logs disabled by default. private var uiLogsEnabled = false - private var lastProtocolState: ProtocolState? = null + private val _connectionLifecycleState = MutableStateFlow(ConnectionLifecycleState.DISCONNECTED) + val connectionLifecycleState: StateFlow = _connectionLifecycleState.asStateFlow() + private var bootstrapContext = ConnectionBootstrapContext() @Volatile private var syncBatchInProgress = false private val _syncInProgress = MutableStateFlow(false) val syncInProgress: StateFlow = _syncInProgress.asStateFlow() @@ -259,6 +604,7 @@ object ProtocolManager { appContext = context.applicationContext messageRepository = MessageRepository.getInstance(context) groupRepository = GroupRepository.getInstance(context) + ensureConnectionSupervisor() if (!packetHandlersRegistered) { setupPacketHandlers() packetHandlersRegistered = true @@ -275,27 +621,7 @@ object ProtocolManager { private fun setupStateMonitoring() { scope.launch { getProtocol().state.collect { newState -> - val previous = lastProtocolState - if (newState == ProtocolState.AUTHENTICATED && previous != ProtocolState.AUTHENTICATED) { - // New authenticated websocket session: always allow fresh push subscribe. - lastSubscribedToken = null - stopWaitingForNetwork("authenticated") - activeAuthenticatedSessionId = authenticatedSessionCounter.incrementAndGet() - deferredAuthBootstrap = false - onAuthenticated() - } - if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) { - syncRequestInFlight = false - clearSyncRequestTimeout() - setSyncInProgress(false) - // Connection/session dropped: force re-subscribe on next AUTHENTICATED. - lastSubscribedToken = null - deferredAuthBootstrap = false - // iOS parity: cancel all pending outgoing retries on disconnect. - // They will be retried via retryWaitingMessages() on next handshake. - cancelAllOutgoingRetries() - } - lastProtocolState = newState + postConnectionEvent(ConnectionEvent.ProtocolStateChanged(newState)) } } } @@ -305,38 +631,9 @@ object ProtocolManager { * Должен вызываться после авторизации пользователя */ fun initializeAccount(publicKey: String, privateKey: String) { - val normalizedPublicKey = publicKey.trim() - val normalizedPrivateKey = privateKey.trim() - if (normalizedPublicKey.isBlank() || normalizedPrivateKey.isBlank()) { - addLog("⚠️ initializeAccount skipped: missing account credentials") - return - } - - addLog( - "🔐 initializeAccount pk=${shortKeyForLog(normalizedPublicKey)} keyLen=${normalizedPrivateKey.length} state=${getProtocol().state.value}" + postConnectionEvent( + ConnectionEvent.InitializeAccount(publicKey = publicKey, privateKey = privateKey) ) - setSyncInProgress(false) - clearTypingState() - messageRepository?.initialize(normalizedPublicKey, normalizedPrivateKey) - if (deferredAuthBootstrap && protocol?.isAuthenticated() == true) { - addLog("🔁 AUTH bootstrap resume after initializeAccount") - } - - val shouldResync = resyncRequiredAfterAccountInit || protocol?.isAuthenticated() == true - if (shouldResync) { - // Late account init may happen while an old sync request flag is still set. - // Force a fresh synchronize request to recover dropped inbound packets. - resyncRequiredAfterAccountInit = false - syncRequestInFlight = false - clearSyncRequestTimeout() - addLog("🔄 Account initialized (${shortKeyForLog(normalizedPublicKey)}) -> force sync") - requestSynchronize() - } - tryRunPostAuthBootstrap("initialize_account") - // Send "Rosetta Updates" message on version change (like desktop useUpdateMessage) - scope.launch { - messageRepository?.checkAndSendVersionUpdateMessage() - } } /** @@ -576,6 +873,9 @@ object ProtocolManager { accountManager.updateAccountUsername(ownPublicKey, user.username) } _ownProfileUpdated.value = System.currentTimeMillis() + postConnectionEvent( + ConnectionEvent.OwnProfileResolved(user.publicKey) + ) } } } @@ -851,15 +1151,7 @@ object ProtocolManager { } private fun finishSyncCycle(reason: String) { - syncRequestInFlight = false - clearSyncRequestTimeout() - inboundProcessingFailures.set(0) - inboundTasksInCurrentBatch.set(0) - fullFailureBatchStreak.set(0) - addLog(reason) - setSyncInProgress(false) - retryWaitingMessages() - requestMissingUserInfo() + postConnectionEvent(ConnectionEvent.SyncCompleted(reason)) } /** @@ -1148,7 +1440,9 @@ object ProtocolManager { if (hasActiveInternet()) { addLog("📡 NETWORK AVAILABLE → reconnect") stopWaitingForNetwork("available") - getProtocol().connect() + postConnectionEvent( + ConnectionEvent.FastReconnect("network_available") + ) } } @@ -1159,7 +1453,9 @@ object ProtocolManager { if (networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)) { addLog("📡 NETWORK CAPABILITIES READY → reconnect") stopWaitingForNetwork("capabilities_changed") - getProtocol().connect() + postConnectionEvent( + ConnectionEvent.FastReconnect("network_capabilities_changed") + ) } } } @@ -1187,7 +1483,9 @@ object ProtocolManager { }.onFailure { error -> addLog("⚠️ NETWORK WAIT register failed: ${error.message}") stopWaitingForNetwork("register_failed") - getProtocol().reconnectNowIfNeeded("network_wait_register_failed") + postConnectionEvent( + ConnectionEvent.FastReconnect("network_wait_register_failed") + ) } networkReconnectTimeoutJob?.cancel() @@ -1197,7 +1495,9 @@ object ProtocolManager { if (!hasActiveInternet()) { addLog("⏱️ NETWORK WAIT timeout (${NETWORK_WAIT_TIMEOUT_MS}ms), reconnect fallback") stopWaitingForNetwork("timeout") - getProtocol().reconnectNowIfNeeded("network_wait_timeout") + postConnectionEvent( + ConnectionEvent.FastReconnect("network_wait_timeout") + ) } } } @@ -1240,24 +1540,14 @@ object ProtocolManager { * Connect to server */ fun connect() { - if (!hasActiveInternet()) { - waitForNetworkAndReconnect("connect") - return - } - stopWaitingForNetwork("connect") - getProtocol().connect() + postConnectionEvent(ConnectionEvent.Connect(reason = "api_connect")) } /** * Trigger immediate reconnect on app foreground (skip waiting backoff timer). */ fun reconnectNowIfNeeded(reason: String = "foreground_resume") { - if (!hasActiveInternet()) { - waitForNetworkAndReconnect("reconnect:$reason") - return - } - stopWaitingForNetwork("reconnect:$reason") - getProtocol().reconnectNowIfNeeded(reason) + postConnectionEvent(ConnectionEvent.FastReconnect(reason = reason)) } /** @@ -1308,15 +1598,9 @@ object ProtocolManager { * Authenticate with server */ fun authenticate(publicKey: String, privateHash: String) { - appContext?.let { context -> - runCatching { - val accountManager = AccountManager(context) - accountManager.setLastLoggedPublicKey(publicKey) - accountManager.setLastLoggedPrivateKeyHash(privateHash) - } - } - val device = buildHandshakeDevice() - getProtocol().startHandshake(publicKey, privateHash, device) + postConnectionEvent( + ConnectionEvent.Authenticate(publicKey = publicKey, privateHash = privateHash) + ) } /** @@ -1546,7 +1830,7 @@ object ProtocolManager { * Send packet (simplified) */ fun send(packet: Packet) { - getProtocol().sendPacket(packet) + postConnectionEvent(ConnectionEvent.SendPacket(packet)) } /** @@ -1842,21 +2126,12 @@ object ProtocolManager { * Disconnect and clear */ fun disconnect() { - stopWaitingForNetwork("manual_disconnect") - protocol?.disconnect() - protocol?.clearCredentials() - messageRepository?.clearInitialization() - clearTypingState() - _devices.value = emptyList() - _pendingDeviceVerification.value = null - syncRequestInFlight = false - clearSyncRequestTimeout() - setSyncInProgress(false) - resyncRequiredAfterAccountInit = false - deferredAuthBootstrap = false - activeAuthenticatedSessionId = 0L - lastBootstrappedSessionId = 0L - lastSubscribedToken = null // reset so token is re-sent on next connect + postConnectionEvent( + ConnectionEvent.Disconnect( + reason = "manual_disconnect", + clearCredentials = true + ) + ) } /** diff --git a/app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt b/app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt new file mode 100644 index 0000000..7e0217f --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt @@ -0,0 +1,88 @@ +package com.rosetta.messenger.network + +class ReadyPacketGate( + private val maxSize: Int, + private val ttlMs: Long +) { + private data class QueuedPacket( + val packet: Packet, + val accountPublicKey: String, + val queuedAtMs: Long + ) + + private val queue = ArrayDeque() + + fun clear(reason: String, addLog: (String) -> Unit) { + val clearedCount = + synchronized(queue) { + val count = queue.size + queue.clear() + count + } + if (clearedCount > 0) { + addLog("🧹 READY-GATE queue cleared: $clearedCount packet(s), reason=$reason") + } + } + + fun enqueue( + packet: Packet, + accountPublicKey: String, + state: ConnectionLifecycleState, + shortKeyForLog: (String) -> String, + addLog: (String) -> Unit + ) { + val now = System.currentTimeMillis() + val packetId = packet.getPacketId() + synchronized(queue) { + while (queue.isNotEmpty()) { + val oldest = queue.first() + if (now - oldest.queuedAtMs <= ttlMs) break + queue.removeFirst() + } + while (queue.size >= maxSize) { + queue.removeFirst() + } + queue.addLast( + QueuedPacket( + packet = packet, + accountPublicKey = accountPublicKey, + queuedAtMs = now + ) + ) + } + addLog( + "📦 READY-GATE queued id=0x${packetId.toString(16)} state=$state account=${shortKeyForLog(accountPublicKey)}" + ) + } + + fun drainForAccount( + activeAccountKey: String, + reason: String, + addLog: (String) -> Unit + ): List { + if (activeAccountKey.isBlank()) return emptyList() + + val now = System.currentTimeMillis() + val packetsToSend = mutableListOf() + + synchronized(queue) { + val iterator = queue.iterator() + while (iterator.hasNext()) { + val queued = iterator.next() + val isExpired = now - queued.queuedAtMs > ttlMs + val accountMatches = + queued.accountPublicKey.isBlank() || + queued.accountPublicKey.equals(activeAccountKey, ignoreCase = true) + if (!isExpired && accountMatches) { + packetsToSend += queued.packet + } + iterator.remove() + } + } + + if (packetsToSend.isNotEmpty()) { + addLog("📬 READY-GATE flush: ${packetsToSend.size} packet(s), reason=$reason") + } + return packetsToSend + } +}