fix: Фикс бага с подключением при первичной регистрации юзера

This commit is contained in:
2026-04-17 23:45:52 +05:00
parent 1a57d8f4d0
commit 7f4684082e
6 changed files with 936 additions and 103 deletions

382
Architecture.md Normal file
View File

@@ -0,0 +1,382 @@
# Rosetta Android — Architecture
> Документ описывает **текущую** архитектуру `rosetta-android` (ветка `dev`) по коду, без идеализаций.
## 1. Архитектурный стиль
Приложение построено как **layered + feature-oriented** архитектура:
- UI на Jetpack Compose (`MainActivity` + `ui/*`).
- Бизнес-оркестрация в singleton-сервисах (`ProtocolManager`, `CallManager`, `TransportManager`, `UpdateManager`).
- Data слой через репозитории (`MessageRepository`, `GroupRepository`, `AvatarRepository`, `AccountManager`).
- Persistence через Room (`RosettaDatabase`).
- Crypto изолирован в `crypto/*`.
DI-контейнера (Hilt/Koin) сейчас нет: зависимости поднимаются через `object`, `getInstance(...)`, singleton-инициализацию.
---
## 2. Слои и границы
```mermaid
flowchart TB
subgraph UI["UI Layer (Compose + ViewModel)"]
A1["MainActivity"]
A2["ui/chats/*"]
A3["ui/auth/*"]
A4["ui/settings/*"]
end
subgraph SVC["Service Layer (Singleton orchestrators)"]
B1["ProtocolManager"]
B2["CallManager"]
B3["TransportManager"]
B4["UpdateManager"]
B5["RosettaFirebaseMessagingService"]
end
subgraph DATA["Data Layer"]
C1["MessageRepository"]
C2["GroupRepository"]
C3["AvatarRepository"]
C4["AccountManager / PreferencesManager"]
C5["DraftManager / ForwardManager"]
end
subgraph DB["Persistence Layer"]
D1["Room: RosettaDatabase"]
D2["DAO: message/dialog/group/etc"]
end
subgraph NET["Network Layer"]
E1["Protocol (WebSocket)"]
E2["Packet* codec"]
E3["OkHttp HTTP (transport/update)"]
E4["WebRTC"]
end
subgraph CRYPTO["Crypto Layer"]
F1["CryptoManager"]
F2["MessageCrypto"]
F3["XChaCha20E2EE (calls)"]
end
UI --> SVC
UI --> DATA
SVC --> DATA
SVC --> NET
DATA --> DB
DATA --> CRYPTO
SVC --> CRYPTO
```
---
## 3. Главные модули и ответственность
### 3.1 `MainActivity` (composition root)
`MainActivity` — главный orchestration entrypoint приложения:
- поднимает `ProtocolManager.initialize(...)`, `CallManager.initialize(...)`;
- управляет auth-гейтингом, онбордингом и основным nav-stack (`Screen`);
- привязывает текущий аккаунт к runtime-сервисам;
- триггерит fast reconnect на `onResume`;
- следит за разрешениями (уведомления, fullscreen intent и т.д.).
### 3.2 `RosettaApplication`
На старте процесса инициализирует глобальные подсистемы:
- crash reporting,
- draft manager,
- transport manager,
- update manager.
### 3.3 Репозитории
#### `MessageRepository`
Ключевой data-центр для чатов:
- инициализация аккаунт-контекста (`publicKey/privateKey`);
- отправка сообщений (optimistic insert + сетевой send);
- обработка входящих `PacketMessage`/`PacketDelivery`/`PacketRead`;
- обновление `dialogs`, `messages`, `message_search_index`;
- синк timestamp (`accounts_sync_times`);
- шина событий для UI (`newMessageEvents`, `deliveryStatusEvents`).
#### `GroupRepository`
- хранение и операции по группам,
- интеграция с `PacketGroup*`.
#### `AvatarRepository`
- кэш/история аватаров (Room + file storage),
- реактивная выдача аватаров через `Flow`.
#### `AccountManager`
- хранение аккаунтов в DataStore,
- last logged public/private hash в SharedPreferences для быстрых синхронных чтений.
---
## 4. Сетевой стек
## 4.1 `Protocol` (низкий уровень)
`Protocol` отвечает за:
- WebSocket lifecycle (`DISCONNECTED/CONNECTING/CONNECTED/HANDSHAKING/DEVICE_VERIFICATION_REQUIRED/AUTHENTICATED`),
- heartbeat,
- reconnect/backoff,
- packet encode/decode,
- `waitPacket/unwaitPacket` обработчики,
- queue pre-handshake пакетов.
Ключевые механизмы устойчивости:
- `lifecycleMutex` для serialized lifecycle операций,
- `connectionGeneration` для игнора stale callbacks старых сокетов,
- guard `isConnecting` от параллельного `connect()`.
## 4.2 `ProtocolManager` (верхний уровень)
`ProtocolManager` — orchestration-слой над `Protocol`:
- единая точка для UI/Data слоёв (`send`, `authenticate`, `reconnect`, `waitPacket` и т.д.);
- bootstrap после auth (sync, own-profile resolve, push subscribe);
- маршрутизация входящих packet-ов в репозитории/подсистемы;
- call signaling bridge для `CallManager`;
- управление typed caches (`SearchUser`, user info cache).
Новый runtime-дизайн connection orchestration:
- `ProtocolConnectionModels.kt``ConnectionLifecycleState`, `ConnectionEvent`, bootstrap context;
- `ProtocolConnectionSupervisor.kt` — actor/event-loop для serialized событий;
- `ReadyPacketGate.kt` — очередь пакетов до состояния `READY` (TTL + max size).
---
## 5. Lifecycle состояния соединения (верхний уровень)
`ProtocolManager.connectionLifecycleState`:
- `DISCONNECTED`
- `CONNECTING`
- `HANDSHAKING`
- `AUTHENTICATED`
- `BOOTSTRAPPING`
- `READY`
- `DEVICE_VERIFICATION_REQUIRED`
Переход в `READY` происходит только когда одновременно выполнено:
- аккаунт инициализирован,
- протокол аутентифицирован,
- sync завершён,
- own-profile резолвнут (или истёк fallback timeout).
```mermaid
stateDiagram-v2
[*] --> DISCONNECTED
DISCONNECTED --> CONNECTING
CONNECTING --> HANDSHAKING
HANDSHAKING --> DEVICE_VERIFICATION_REQUIRED
HANDSHAKING --> AUTHENTICATED
AUTHENTICATED --> BOOTSTRAPPING
BOOTSTRAPPING --> READY
READY --> DISCONNECTED
DEVICE_VERIFICATION_REQUIRED --> CONNECTING
```
---
## 6. Поток auth / session bootstrap
```mermaid
sequenceDiagram
participant UI as AuthFlow/MainActivity
participant PM as ProtocolManager
participant P as Protocol
participant MR as MessageRepository
UI->>PM: initializeAccount(public, private)
UI->>PM: connect()
UI->>PM: authenticate(public, privateHash)
PM->>P: connect + startHandshake
P-->>PM: AUTHENTICATED
PM->>PM: onAuthenticated()
PM->>PM: subscribePushTokenIfAvailable()
PM->>PM: requestSynchronize()
PM->>MR: initialize(...)
PM-->>PM: SyncCompleted + OwnProfileResolved
PM-->>UI: connectionLifecycleState = READY
```
Критично: отправка пользовательских пакетов до `READY` не теряется, а попадает в `ReadyPacketGate`.
---
## 7. Поток сообщений (send/receive/delivery)
## 7.1 Отправка
1. UI (`ChatViewModel`) вызывает отправку через `MessageRepository.sendMessage(...)`.
2. Репозиторий делает optimistic insert в `messages` (`WAITING`).
3. Формируется `PacketMessage`.
4. Отправка идёт в `ProtocolManager.send(...)`.
5. Если состояние не `READY`, пакет ставится в `ReadyPacketGate`.
6. После `READY` очередь сбрасывается в `Protocol.sendPacket(...)`.
## 7.2 Подтверждение доставки
1. Сервер присылает `PacketDelivery (0x08)`.
2. `ProtocolManager` маршрутизирует в `MessageRepository.handleDelivery(...)`.
3. Репозиторий обновляет статус в Room.
4. UI получает апдейт через `Flow`/события.
## 7.3 Входящие
1. `PacketMessage (0x06)` приходит в `Protocol`.
2. `ProtocolManager` dispatch → `MessageRepository.handleIncomingMessage(...)`.
3. Сообщение сохраняется в Room, обновляется `dialogs` и индексы поиска.
4. UI реактивно перерисовывается.
---
## 8. Sync-пайплайн
Используется пакет `PacketSync (0x19)` и режимы:
- `BATCH_START`
- поток синк-пакетов (`MESSAGE/READ/DELIVERY/...`)
- `BATCH_END`
- `NOT_NEEDED`
Особенности:
- есть последовательная очередь inbound задач для сохранения порядка обработки;
- after-sync hooks: retry waiting messages, request missing user info;
- sync timestamp хранится в `accounts_sync_times`.
---
## 9. Calls (WebRTC + signaling)
`CallManager` использует:
- signaling пакеты: `PacketSignalPeer (0x1A)`, `PacketWebRTC (0x1B)`;
- ICE: `PacketIceServers (0x1C)`;
- WebRTC stack (`PeerConnectionFactory`, `PeerConnection`, audio track);
- E2EE голоса через `XChaCha20E2EE` обвязки sender/receiver.
Основные фазы звонка:
- `IDLE``INCOMING`/`OUTGOING``CONNECTING``ACTIVE``IDLE`.
```mermaid
stateDiagram-v2
[*] --> IDLE
IDLE --> OUTGOING
IDLE --> INCOMING
OUTGOING --> CONNECTING
INCOMING --> CONNECTING
CONNECTING --> ACTIVE
ACTIVE --> IDLE
OUTGOING --> IDLE
INCOMING --> IDLE
```
---
## 10. Transport (вложения)
`TransportManager`:
- получает transport server через `PacketRequestTransport (0x0F)` (desktop parity);
- upload/download через OkHttp;
- resumable download (HTTP Range);
- трекает состояния прогресса через `StateFlow` (`uploading`, `downloading`);
- поддерживает cancel/pause/resume через `FileDownloadManager`.
---
## 11. Push Notifications
`RosettaFirebaseMessagingService`:
- обрабатывает `onNewToken` и подписку токена через `ProtocolManager`;
- dedup пушей;
- маршрутизация типов (`personal_message`, `group_message`, `call`, `read`);
- очистка уведомлений по read events;
- wake-up reconnect при silent push.
---
## 12. Обновления (SDU)
`UpdateManager`:
1. запрашивает update server через `PacketRequestUpdate (0x0A)`;
2. ходит на SDU HTTP endpoint;
3. скачивает APK через `DownloadManager`;
4. ведёт update state machine (`Idle/Checking/UpdateAvailable/Downloading/ReadyToInstall/Error`).
---
## 13. Persistence (Room)
`RosettaDatabase` (version `17`) включает:
- `messages` — сообщения,
- `dialogs` — диалоги + денормализованные поля для быстрых списков,
- `message_search_index` — локальный индекс поиска,
- `groups` — группы,
- `pinned_messages` — закрепы,
- `avatar_cache` — аватары,
- `blacklist` — blacklist,
- `accounts_sync_times` — sync cursor,
- `encrypted_accounts` — аккаунты (legacy Room account storage).
Есть длинная цепочка миграций (4→17) с оптимизациями под производительность и денормализацию.
---
## 14. Crypto
- `CryptoManager`:
- seed phrase / keypair,
- PBKDF2-derived key caching,
- encrypt/decrypt для локального хранения.
- `MessageCrypto`:
- message-level XChaCha20-Poly1305,
- ECDH/AES-обмен ключом для payload,
- attachment decrypt logic.
Crypto и network связаны через `MessageRepository`/`ProtocolManager`.
---
## 15. Наблюдаемость и диагностика
- wire/protocol логи через `ProtocolManager.addLog(...)` + trace file;
- debug logs доступны в UI;
- отдельные диагностические логи для звонков (`CallManager` breadcrumbs).
---
## 16. Текущие архитектурные сильные стороны
- Реактивная модель состояния (`StateFlow`) на большинстве критических путей.
- Сильная декомпозиция packet протокола (`Packet*`).
- Наличие ready-gate и serialized supervisor снижает race-condition в соединении.
- Room + денормализация ускоряют списки чатов/поиск.
---
## 17. Текущие архитектурные риски
- `MainActivity` остаётся очень крупным composition root.
- `ProtocolManager` и `MessageRepository` всё ещё крупные “god objects”.
- Отсутствие DI усложняет управляемость зависимостей/тестируемость.
- Часть жизненного цикла связана через runtime singleton state, что повышает риск регрессий при эволюции.
---
## 18. Карта ключевых файлов
- `app/src/main/java/com/rosetta/messenger/MainActivity.kt`
- `app/src/main/java/com/rosetta/messenger/RosettaApplication.kt`
- `app/src/main/java/com/rosetta/messenger/network/Protocol.kt`
- `app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt`
- `app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionModels.kt`
- `app/src/main/java/com/rosetta/messenger/network/ProtocolConnectionSupervisor.kt`
- `app/src/main/java/com/rosetta/messenger/network/ReadyPacketGate.kt`
- `app/src/main/java/com/rosetta/messenger/network/CallManager.kt`
- `app/src/main/java/com/rosetta/messenger/network/TransportManager.kt`
- `app/src/main/java/com/rosetta/messenger/push/RosettaFirebaseMessagingService.kt`
- `app/src/main/java/com/rosetta/messenger/update/UpdateManager.kt`
- `app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt`
- `app/src/main/java/com/rosetta/messenger/database/RosettaDatabase.kt`
- `app/src/main/java/com/rosetta/messenger/crypto/CryptoManager.kt`
- `app/src/main/java/com/rosetta/messenger/crypto/MessageCrypto.kt`

View File

@@ -353,6 +353,13 @@ class MainActivity : FragmentActivity() {
// При открытии по звонку с lock screen — пропускаем auth
openedForCall && hasExistingAccount == true ->
"main"
// First-registration race: DataStore may flip isLoggedIn=true
// before AuthFlow returns DecryptedAccount to currentAccount.
// Do not enter MainScreen with null account (it leads to
// empty keys/UI placeholders until relog).
isLoggedIn == true && currentAccount == null &&
hasExistingAccount == false ->
"auth_new"
isLoggedIn != true && hasExistingAccount == false ->
"auth_new"
isLoggedIn != true && hasExistingAccount == true ->

View File

@@ -0,0 +1,33 @@
package com.rosetta.messenger.network
enum class ConnectionLifecycleState {
DISCONNECTED,
CONNECTING,
HANDSHAKING,
AUTHENTICATED,
BOOTSTRAPPING,
READY,
DEVICE_VERIFICATION_REQUIRED
}
sealed interface ConnectionEvent {
data class InitializeAccount(val publicKey: String, val privateKey: String) : ConnectionEvent
data class Connect(val reason: String) : ConnectionEvent
data class FastReconnect(val reason: String) : ConnectionEvent
data class Disconnect(val reason: String, val clearCredentials: Boolean) : ConnectionEvent
data class Authenticate(val publicKey: String, val privateHash: String) : ConnectionEvent
data class ProtocolStateChanged(val state: ProtocolState) : ConnectionEvent
data class SendPacket(val packet: Packet) : ConnectionEvent
data class SyncCompleted(val reason: String) : ConnectionEvent
data class OwnProfileResolved(val publicKey: String) : ConnectionEvent
data class OwnProfileFallbackTimeout(val sessionGeneration: Long) : ConnectionEvent
}
data class ConnectionBootstrapContext(
val accountPublicKey: String = "",
val accountInitialized: Boolean = false,
val protocolState: ProtocolState = ProtocolState.DISCONNECTED,
val authenticated: Boolean = false,
val syncCompleted: Boolean = false,
val ownProfileResolved: Boolean = false
)

View File

@@ -0,0 +1,48 @@
package com.rosetta.messenger.network
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
class ProtocolConnectionSupervisor(
private val scope: CoroutineScope,
private val onEvent: suspend (ConnectionEvent) -> Unit,
private val onError: (Throwable) -> Unit,
private val addLog: (String) -> Unit
) {
private val eventChannel = Channel<ConnectionEvent>(Channel.UNLIMITED)
private val lock = Any()
@Volatile private var job: Job? = null
fun start() {
if (job?.isActive == true) return
synchronized(lock) {
if (job?.isActive == true) return
job =
scope.launch {
for (event in eventChannel) {
try {
onEvent(event)
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
addLog("❌ ConnectionSupervisor event failed: ${e.message}")
onError(e)
}
}
}
addLog("🧠 ConnectionSupervisor started")
}
}
fun post(event: ConnectionEvent) {
start()
val result = eventChannel.trySend(event)
if (result.isFailure) {
scope.launch { eventChannel.send(event) }
}
}
}

View File

@@ -46,6 +46,9 @@ object ProtocolManager {
private const val PACKET_WEB_RTC = 0x1B
private const val PACKET_ICE_SERVERS = 0x1C
private const val NETWORK_WAIT_TIMEOUT_MS = 20_000L
private const val BOOTSTRAP_OWN_PROFILE_FALLBACK_MS = 2_500L
private const val READY_PACKET_QUEUE_MAX = 500
private const val READY_PACKET_QUEUE_TTL_MS = 120_000L
// Desktop parity: use the same primary WebSocket endpoint as desktop client.
private const val SERVER_ADDRESS = "wss://wss.rosetta.im"
@@ -59,6 +62,19 @@ object ProtocolManager {
private var appContext: Context? = null
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val protocolInstanceLock = Any()
private val connectionSupervisor =
ProtocolConnectionSupervisor(
scope = scope,
onEvent = ::handleConnectionEvent,
onError = { error -> android.util.Log.e(TAG, "ConnectionSupervisor event failed", error) },
addLog = ::addLog
)
private val sessionGeneration = AtomicLong(0L)
private val readyPacketGate =
ReadyPacketGate(
maxSize = READY_PACKET_QUEUE_MAX,
ttlMs = READY_PACKET_QUEUE_TTL_MS
)
@Volatile private var packetHandlersRegistered = false
@Volatile private var stateMonitoringStarted = false
@@ -68,6 +84,7 @@ object ProtocolManager {
@Volatile private var networkReconnectRegistered = false
@Volatile private var networkReconnectCallback: ConnectivityManager.NetworkCallback? = null
@Volatile private var networkReconnectTimeoutJob: Job? = null
@Volatile private var ownProfileFallbackJob: Job? = null
// Guard: prevent duplicate FCM token subscribe within a single session
@Volatile
@@ -117,9 +134,337 @@ object ProtocolManager {
private fun normalizeSearchQuery(value: String): String =
value.trim().removePrefix("@").lowercase(Locale.ROOT)
private fun ensureConnectionSupervisor() {
connectionSupervisor.start()
}
private fun postConnectionEvent(event: ConnectionEvent) {
connectionSupervisor.post(event)
}
private fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState =
when (state) {
ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED
ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING
ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING
ProtocolState.DEVICE_VERIFICATION_REQUIRED ->
ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED
ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED
}
private fun setConnectionLifecycleState(next: ConnectionLifecycleState, reason: String) {
if (_connectionLifecycleState.value == next) return
addLog("🧭 CONNECTION STATE: ${_connectionLifecycleState.value} -> $next ($reason)")
_connectionLifecycleState.value = next
}
private fun recomputeConnectionLifecycleState(reason: String) {
val context = bootstrapContext
val nextState =
if (context.authenticated) {
if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) {
ConnectionLifecycleState.READY
} else {
ConnectionLifecycleState.BOOTSTRAPPING
}
} else {
protocolToLifecycleState(context.protocolState)
}
setConnectionLifecycleState(nextState, reason)
if (nextState == ConnectionLifecycleState.READY) {
flushReadyPacketQueue(context.accountPublicKey, reason)
}
}
private fun clearReadyPacketQueue(reason: String) {
readyPacketGate.clear(reason = reason, addLog = ::addLog)
}
private fun enqueueReadyPacket(packet: Packet) {
val accountKey =
messageRepository?.getCurrentAccountKey()?.trim().orEmpty().ifBlank {
bootstrapContext.accountPublicKey
}
readyPacketGate.enqueue(
packet = packet,
accountPublicKey = accountKey,
state = _connectionLifecycleState.value,
shortKeyForLog = ::shortKeyForLog,
addLog = ::addLog
)
}
private fun flushReadyPacketQueue(activeAccountKey: String, reason: String) {
val packetsToSend =
readyPacketGate.drainForAccount(
activeAccountKey = activeAccountKey,
reason = reason,
addLog = ::addLog
)
if (packetsToSend.isEmpty()) return
val protocolInstance = getProtocol()
packetsToSend.forEach { protocolInstance.sendPacket(it) }
}
private fun packetCanBypassReadyGate(packet: Packet): Boolean =
when (packet) {
is PacketHandshake,
is PacketSync,
is PacketSearch,
is PacketPushNotification,
is PacketRequestTransport,
is PacketRequestUpdate,
is PacketSignalPeer,
is PacketWebRTC,
is PacketIceServers,
is PacketDeviceResolve -> true
else -> false
}
private suspend fun handleConnectionEvent(event: ConnectionEvent) {
when (event) {
is ConnectionEvent.InitializeAccount -> {
val normalizedPublicKey = event.publicKey.trim()
val normalizedPrivateKey = event.privateKey.trim()
if (normalizedPublicKey.isBlank() || normalizedPrivateKey.isBlank()) {
addLog("⚠️ initializeAccount skipped: missing account credentials")
return
}
val protocolState = getProtocol().state.value
addLog(
"🔐 initializeAccount pk=${shortKeyForLog(normalizedPublicKey)} keyLen=${normalizedPrivateKey.length} state=$protocolState"
)
setSyncInProgress(false)
clearTypingState()
if (messageRepository == null) {
appContext?.let { messageRepository = MessageRepository.getInstance(it) }
}
messageRepository?.initialize(normalizedPublicKey, normalizedPrivateKey)
val sameAccount =
bootstrapContext.accountPublicKey.equals(normalizedPublicKey, ignoreCase = true)
if (!sameAccount) {
clearReadyPacketQueue("account_switch")
}
bootstrapContext =
bootstrapContext.copy(
accountPublicKey = normalizedPublicKey,
accountInitialized = true,
syncCompleted = if (sameAccount) bootstrapContext.syncCompleted else false,
ownProfileResolved = if (sameAccount) bootstrapContext.ownProfileResolved else false
)
recomputeConnectionLifecycleState("account_initialized")
val shouldResync =
resyncRequiredAfterAccountInit || protocol?.isAuthenticated() == true
if (shouldResync) {
resyncRequiredAfterAccountInit = false
syncRequestInFlight = false
clearSyncRequestTimeout()
addLog(
"🔄 Account initialized (${shortKeyForLog(normalizedPublicKey)}) -> force sync"
)
requestSynchronize()
}
if (
protocol?.isAuthenticated() == true &&
activeAuthenticatedSessionId > 0L &&
lastBootstrappedSessionId != activeAuthenticatedSessionId
) {
tryRunPostAuthBootstrap("account_initialized")
}
scope.launch {
messageRepository?.checkAndSendVersionUpdateMessage()
}
}
is ConnectionEvent.Connect -> {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("connect:${event.reason}")
return
}
stopWaitingForNetwork("connect:${event.reason}")
getProtocol().connect()
}
is ConnectionEvent.FastReconnect -> {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("reconnect:${event.reason}")
return
}
stopWaitingForNetwork("reconnect:${event.reason}")
getProtocol().reconnectNowIfNeeded(event.reason)
}
is ConnectionEvent.Disconnect -> {
stopWaitingForNetwork(event.reason)
protocol?.disconnect()
if (event.clearCredentials) {
protocol?.clearCredentials()
}
messageRepository?.clearInitialization()
clearTypingState()
_devices.value = emptyList()
_pendingDeviceVerification.value = null
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
resyncRequiredAfterAccountInit = false
lastSubscribedToken = null
ownProfileFallbackJob?.cancel()
ownProfileFallbackJob = null
deferredAuthBootstrap = false
activeAuthenticatedSessionId = 0L
lastBootstrappedSessionId = 0L
bootstrapContext = ConnectionBootstrapContext()
clearReadyPacketQueue("disconnect:${event.reason}")
recomputeConnectionLifecycleState("disconnect:${event.reason}")
}
is ConnectionEvent.Authenticate -> {
appContext?.let { context ->
runCatching {
val accountManager = AccountManager(context)
accountManager.setLastLoggedPublicKey(event.publicKey)
accountManager.setLastLoggedPrivateKeyHash(event.privateHash)
}
}
val device = buildHandshakeDevice()
getProtocol().startHandshake(event.publicKey, event.privateHash, device)
}
is ConnectionEvent.ProtocolStateChanged -> {
val previousProtocolState = bootstrapContext.protocolState
val newProtocolState = event.state
if (
newProtocolState == ProtocolState.AUTHENTICATED &&
previousProtocolState != ProtocolState.AUTHENTICATED
) {
lastSubscribedToken = null
stopWaitingForNetwork("authenticated")
ownProfileFallbackJob?.cancel()
val generation = sessionGeneration.incrementAndGet()
activeAuthenticatedSessionId = authenticatedSessionCounter.incrementAndGet()
deferredAuthBootstrap = false
bootstrapContext =
bootstrapContext.copy(
protocolState = newProtocolState,
authenticated = true,
syncCompleted = false,
ownProfileResolved = false
)
recomputeConnectionLifecycleState("protocol_authenticated")
ownProfileFallbackJob =
scope.launch {
delay(BOOTSTRAP_OWN_PROFILE_FALLBACK_MS)
postConnectionEvent(
ConnectionEvent.OwnProfileFallbackTimeout(generation)
)
}
onAuthenticated()
return
}
if (
newProtocolState != ProtocolState.AUTHENTICATED &&
newProtocolState != ProtocolState.HANDSHAKING
) {
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
lastSubscribedToken = null
cancelAllOutgoingRetries()
ownProfileFallbackJob?.cancel()
ownProfileFallbackJob = null
deferredAuthBootstrap = false
activeAuthenticatedSessionId = 0L
bootstrapContext =
bootstrapContext.copy(
protocolState = newProtocolState,
authenticated = false,
syncCompleted = false,
ownProfileResolved = false
)
recomputeConnectionLifecycleState("protocol_state_${newProtocolState.name.lowercase(Locale.ROOT)}")
return
}
if (newProtocolState == ProtocolState.HANDSHAKING && bootstrapContext.authenticated) {
ownProfileFallbackJob?.cancel()
ownProfileFallbackJob = null
deferredAuthBootstrap = false
activeAuthenticatedSessionId = 0L
bootstrapContext =
bootstrapContext.copy(
protocolState = newProtocolState,
authenticated = false,
syncCompleted = false,
ownProfileResolved = false
)
recomputeConnectionLifecycleState("protocol_re_handshaking")
return
}
bootstrapContext = bootstrapContext.copy(protocolState = newProtocolState)
recomputeConnectionLifecycleState("protocol_state_${newProtocolState.name.lowercase(Locale.ROOT)}")
}
is ConnectionEvent.SendPacket -> {
val packet = event.packet
val lifecycle = _connectionLifecycleState.value
if (packetCanBypassReadyGate(packet) || lifecycle == ConnectionLifecycleState.READY) {
getProtocol().sendPacket(packet)
} else {
enqueueReadyPacket(packet)
if (!isAuthenticated()) {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("ready_gate_send")
} else {
getProtocol().reconnectNowIfNeeded("ready_gate_send")
}
}
}
}
is ConnectionEvent.SyncCompleted -> {
syncRequestInFlight = false
clearSyncRequestTimeout()
inboundProcessingFailures.set(0)
inboundTasksInCurrentBatch.set(0)
fullFailureBatchStreak.set(0)
addLog(event.reason)
setSyncInProgress(false)
retryWaitingMessages()
requestMissingUserInfo()
bootstrapContext = bootstrapContext.copy(syncCompleted = true)
recomputeConnectionLifecycleState("sync_completed")
}
is ConnectionEvent.OwnProfileResolved -> {
val accountPublicKey = bootstrapContext.accountPublicKey
val matchesAccount =
accountPublicKey.isBlank() ||
event.publicKey.equals(accountPublicKey, ignoreCase = true)
if (!matchesAccount) return
ownProfileFallbackJob?.cancel()
ownProfileFallbackJob = null
bootstrapContext = bootstrapContext.copy(ownProfileResolved = true)
recomputeConnectionLifecycleState("own_profile_resolved")
}
is ConnectionEvent.OwnProfileFallbackTimeout -> {
if (sessionGeneration.get() != event.sessionGeneration) return
if (!bootstrapContext.authenticated || bootstrapContext.ownProfileResolved) return
addLog(
"⏱️ Own profile fetch timeout — continuing bootstrap for ${shortKeyForLog(bootstrapContext.accountPublicKey)}"
)
bootstrapContext = bootstrapContext.copy(ownProfileResolved = true)
recomputeConnectionLifecycleState("own_profile_fallback_timeout")
}
}
}
// Keep heavy protocol/message UI logs disabled by default.
private var uiLogsEnabled = false
private var lastProtocolState: ProtocolState? = null
private val _connectionLifecycleState = MutableStateFlow(ConnectionLifecycleState.DISCONNECTED)
val connectionLifecycleState: StateFlow<ConnectionLifecycleState> = _connectionLifecycleState.asStateFlow()
private var bootstrapContext = ConnectionBootstrapContext()
@Volatile private var syncBatchInProgress = false
private val _syncInProgress = MutableStateFlow(false)
val syncInProgress: StateFlow<Boolean> = _syncInProgress.asStateFlow()
@@ -259,6 +604,7 @@ object ProtocolManager {
appContext = context.applicationContext
messageRepository = MessageRepository.getInstance(context)
groupRepository = GroupRepository.getInstance(context)
ensureConnectionSupervisor()
if (!packetHandlersRegistered) {
setupPacketHandlers()
packetHandlersRegistered = true
@@ -275,27 +621,7 @@ object ProtocolManager {
private fun setupStateMonitoring() {
scope.launch {
getProtocol().state.collect { newState ->
val previous = lastProtocolState
if (newState == ProtocolState.AUTHENTICATED && previous != ProtocolState.AUTHENTICATED) {
// New authenticated websocket session: always allow fresh push subscribe.
lastSubscribedToken = null
stopWaitingForNetwork("authenticated")
activeAuthenticatedSessionId = authenticatedSessionCounter.incrementAndGet()
deferredAuthBootstrap = false
onAuthenticated()
}
if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) {
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
// Connection/session dropped: force re-subscribe on next AUTHENTICATED.
lastSubscribedToken = null
deferredAuthBootstrap = false
// iOS parity: cancel all pending outgoing retries on disconnect.
// They will be retried via retryWaitingMessages() on next handshake.
cancelAllOutgoingRetries()
}
lastProtocolState = newState
postConnectionEvent(ConnectionEvent.ProtocolStateChanged(newState))
}
}
}
@@ -305,38 +631,9 @@ object ProtocolManager {
* Должен вызываться после авторизации пользователя
*/
fun initializeAccount(publicKey: String, privateKey: String) {
val normalizedPublicKey = publicKey.trim()
val normalizedPrivateKey = privateKey.trim()
if (normalizedPublicKey.isBlank() || normalizedPrivateKey.isBlank()) {
addLog("⚠️ initializeAccount skipped: missing account credentials")
return
}
addLog(
"🔐 initializeAccount pk=${shortKeyForLog(normalizedPublicKey)} keyLen=${normalizedPrivateKey.length} state=${getProtocol().state.value}"
postConnectionEvent(
ConnectionEvent.InitializeAccount(publicKey = publicKey, privateKey = privateKey)
)
setSyncInProgress(false)
clearTypingState()
messageRepository?.initialize(normalizedPublicKey, normalizedPrivateKey)
if (deferredAuthBootstrap && protocol?.isAuthenticated() == true) {
addLog("🔁 AUTH bootstrap resume after initializeAccount")
}
val shouldResync = resyncRequiredAfterAccountInit || protocol?.isAuthenticated() == true
if (shouldResync) {
// Late account init may happen while an old sync request flag is still set.
// Force a fresh synchronize request to recover dropped inbound packets.
resyncRequiredAfterAccountInit = false
syncRequestInFlight = false
clearSyncRequestTimeout()
addLog("🔄 Account initialized (${shortKeyForLog(normalizedPublicKey)}) -> force sync")
requestSynchronize()
}
tryRunPostAuthBootstrap("initialize_account")
// Send "Rosetta Updates" message on version change (like desktop useUpdateMessage)
scope.launch {
messageRepository?.checkAndSendVersionUpdateMessage()
}
}
/**
@@ -576,6 +873,9 @@ object ProtocolManager {
accountManager.updateAccountUsername(ownPublicKey, user.username)
}
_ownProfileUpdated.value = System.currentTimeMillis()
postConnectionEvent(
ConnectionEvent.OwnProfileResolved(user.publicKey)
)
}
}
}
@@ -851,15 +1151,7 @@ object ProtocolManager {
}
private fun finishSyncCycle(reason: String) {
syncRequestInFlight = false
clearSyncRequestTimeout()
inboundProcessingFailures.set(0)
inboundTasksInCurrentBatch.set(0)
fullFailureBatchStreak.set(0)
addLog(reason)
setSyncInProgress(false)
retryWaitingMessages()
requestMissingUserInfo()
postConnectionEvent(ConnectionEvent.SyncCompleted(reason))
}
/**
@@ -1148,7 +1440,9 @@ object ProtocolManager {
if (hasActiveInternet()) {
addLog("📡 NETWORK AVAILABLE → reconnect")
stopWaitingForNetwork("available")
getProtocol().connect()
postConnectionEvent(
ConnectionEvent.FastReconnect("network_available")
)
}
}
@@ -1159,7 +1453,9 @@ object ProtocolManager {
if (networkCapabilities.hasCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)) {
addLog("📡 NETWORK CAPABILITIES READY → reconnect")
stopWaitingForNetwork("capabilities_changed")
getProtocol().connect()
postConnectionEvent(
ConnectionEvent.FastReconnect("network_capabilities_changed")
)
}
}
}
@@ -1187,7 +1483,9 @@ object ProtocolManager {
}.onFailure { error ->
addLog("⚠️ NETWORK WAIT register failed: ${error.message}")
stopWaitingForNetwork("register_failed")
getProtocol().reconnectNowIfNeeded("network_wait_register_failed")
postConnectionEvent(
ConnectionEvent.FastReconnect("network_wait_register_failed")
)
}
networkReconnectTimeoutJob?.cancel()
@@ -1197,7 +1495,9 @@ object ProtocolManager {
if (!hasActiveInternet()) {
addLog("⏱️ NETWORK WAIT timeout (${NETWORK_WAIT_TIMEOUT_MS}ms), reconnect fallback")
stopWaitingForNetwork("timeout")
getProtocol().reconnectNowIfNeeded("network_wait_timeout")
postConnectionEvent(
ConnectionEvent.FastReconnect("network_wait_timeout")
)
}
}
}
@@ -1240,24 +1540,14 @@ object ProtocolManager {
* Connect to server
*/
fun connect() {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("connect")
return
}
stopWaitingForNetwork("connect")
getProtocol().connect()
postConnectionEvent(ConnectionEvent.Connect(reason = "api_connect"))
}
/**
* Trigger immediate reconnect on app foreground (skip waiting backoff timer).
*/
fun reconnectNowIfNeeded(reason: String = "foreground_resume") {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("reconnect:$reason")
return
}
stopWaitingForNetwork("reconnect:$reason")
getProtocol().reconnectNowIfNeeded(reason)
postConnectionEvent(ConnectionEvent.FastReconnect(reason = reason))
}
/**
@@ -1308,15 +1598,9 @@ object ProtocolManager {
* Authenticate with server
*/
fun authenticate(publicKey: String, privateHash: String) {
appContext?.let { context ->
runCatching {
val accountManager = AccountManager(context)
accountManager.setLastLoggedPublicKey(publicKey)
accountManager.setLastLoggedPrivateKeyHash(privateHash)
}
}
val device = buildHandshakeDevice()
getProtocol().startHandshake(publicKey, privateHash, device)
postConnectionEvent(
ConnectionEvent.Authenticate(publicKey = publicKey, privateHash = privateHash)
)
}
/**
@@ -1546,7 +1830,7 @@ object ProtocolManager {
* Send packet (simplified)
*/
fun send(packet: Packet) {
getProtocol().sendPacket(packet)
postConnectionEvent(ConnectionEvent.SendPacket(packet))
}
/**
@@ -1842,21 +2126,12 @@ object ProtocolManager {
* Disconnect and clear
*/
fun disconnect() {
stopWaitingForNetwork("manual_disconnect")
protocol?.disconnect()
protocol?.clearCredentials()
messageRepository?.clearInitialization()
clearTypingState()
_devices.value = emptyList()
_pendingDeviceVerification.value = null
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
resyncRequiredAfterAccountInit = false
deferredAuthBootstrap = false
activeAuthenticatedSessionId = 0L
lastBootstrappedSessionId = 0L
lastSubscribedToken = null // reset so token is re-sent on next connect
postConnectionEvent(
ConnectionEvent.Disconnect(
reason = "manual_disconnect",
clearCredentials = true
)
)
}
/**

View File

@@ -0,0 +1,88 @@
package com.rosetta.messenger.network
class ReadyPacketGate(
private val maxSize: Int,
private val ttlMs: Long
) {
private data class QueuedPacket(
val packet: Packet,
val accountPublicKey: String,
val queuedAtMs: Long
)
private val queue = ArrayDeque<QueuedPacket>()
fun clear(reason: String, addLog: (String) -> Unit) {
val clearedCount =
synchronized(queue) {
val count = queue.size
queue.clear()
count
}
if (clearedCount > 0) {
addLog("🧹 READY-GATE queue cleared: $clearedCount packet(s), reason=$reason")
}
}
fun enqueue(
packet: Packet,
accountPublicKey: String,
state: ConnectionLifecycleState,
shortKeyForLog: (String) -> String,
addLog: (String) -> Unit
) {
val now = System.currentTimeMillis()
val packetId = packet.getPacketId()
synchronized(queue) {
while (queue.isNotEmpty()) {
val oldest = queue.first()
if (now - oldest.queuedAtMs <= ttlMs) break
queue.removeFirst()
}
while (queue.size >= maxSize) {
queue.removeFirst()
}
queue.addLast(
QueuedPacket(
packet = packet,
accountPublicKey = accountPublicKey,
queuedAtMs = now
)
)
}
addLog(
"📦 READY-GATE queued id=0x${packetId.toString(16)} state=$state account=${shortKeyForLog(accountPublicKey)}"
)
}
fun drainForAccount(
activeAccountKey: String,
reason: String,
addLog: (String) -> Unit
): List<Packet> {
if (activeAccountKey.isBlank()) return emptyList()
val now = System.currentTimeMillis()
val packetsToSend = mutableListOf<Packet>()
synchronized(queue) {
val iterator = queue.iterator()
while (iterator.hasNext()) {
val queued = iterator.next()
val isExpired = now - queued.queuedAtMs > ttlMs
val accountMatches =
queued.accountPublicKey.isBlank() ||
queued.accountPublicKey.equals(activeAccountKey, ignoreCase = true)
if (!isExpired && accountMatches) {
packetsToSend += queued.packet
}
iterator.remove()
}
}
if (packetsToSend.isNotEmpty()) {
addLog("📬 READY-GATE flush: ${packetsToSend.size} packet(s), reason=$reason")
}
return packetsToSend
}
}