refactor: split protocol/session architecture and fix auth navigation regressions

This commit is contained in:
2026-04-18 01:28:30 +05:00
parent 7f4684082e
commit 660ba12c8c
23 changed files with 1478 additions and 620 deletions

View File

@@ -59,10 +59,12 @@ import com.rosetta.messenger.network.ProtocolManager
import com.rosetta.messenger.network.ProtocolState
import com.rosetta.messenger.network.SearchUser
import com.rosetta.messenger.repository.AvatarRepository
import com.rosetta.messenger.session.AppSessionCoordinator
import com.rosetta.messenger.session.IdentityStore
import com.rosetta.messenger.session.SessionState
import com.rosetta.messenger.ui.auth.AccountInfo
import com.rosetta.messenger.ui.auth.AuthFlow
import com.rosetta.messenger.ui.auth.DeviceConfirmScreen
import com.rosetta.messenger.ui.auth.startAuthHandshakeFast
import com.rosetta.messenger.ui.chats.ChatDetailScreen
import com.rosetta.messenger.ui.chats.ChatsListScreen
import com.rosetta.messenger.ui.chats.VoiceTopMiniPlayer
@@ -240,6 +242,8 @@ class MainActivity : FragmentActivity() {
var showOnboarding by remember { mutableStateOf(true) }
var hasExistingAccount by remember { mutableStateOf<Boolean?>(null) }
var currentAccount by remember { mutableStateOf(getCachedSessionAccount()) }
val identityState by IdentityStore.state.collectAsState()
val sessionState by AppSessionCoordinator.sessionState.collectAsState()
var accountInfoList by remember { mutableStateOf<List<AccountInfo>>(emptyList()) }
var startCreateAccountFlow by remember { mutableStateOf(false) }
var preservedMainNavStack by remember { mutableStateOf<List<Screen>>(emptyList()) }
@@ -247,6 +251,7 @@ class MainActivity : FragmentActivity() {
// Check for existing accounts and build AccountInfo list
LaunchedEffect(Unit) {
AppSessionCoordinator.syncFromCachedAccount(currentAccount)
val accounts = accountManager.getAllAccounts()
hasExistingAccount = accounts.isNotEmpty()
val infos = accounts.map { it.toAccountInfo() }
@@ -272,6 +277,38 @@ class MainActivity : FragmentActivity() {
}
}
LaunchedEffect(identityState.account) {
val identityAccount = identityState.account ?: return@LaunchedEffect
if (
identityAccount.publicKey != currentAccount?.publicKey ||
identityAccount.name != currentAccount?.name
) {
currentAccount = identityAccount
cacheSessionAccount(identityAccount)
}
}
LaunchedEffect(sessionState) {
val readyAccount = (sessionState as? SessionState.Ready)?.account ?: return@LaunchedEffect
if (
currentAccount?.publicKey != readyAccount.publicKey ||
currentAccount?.name != readyAccount.name
) {
currentAccount = readyAccount
cacheSessionAccount(readyAccount)
}
}
LaunchedEffect(currentAccount, isLoggedIn) {
val account = currentAccount
when {
account != null -> AppSessionCoordinator.markReady(account, reason = "main_activity_state")
isLoggedIn == true ->
AppSessionCoordinator.markAuthInProgress(reason = "main_activity_logged_in_no_account")
isLoggedIn == false -> AppSessionCoordinator.markLoggedOut(reason = "main_activity_logged_out")
}
}
// Wait for initial load
if (hasExistingAccount == null) {
Box(
@@ -353,24 +390,30 @@ class MainActivity : FragmentActivity() {
// При открытии по звонку с lock screen — пропускаем auth
openedForCall && hasExistingAccount == true ->
"main"
// First-registration race: DataStore may flip isLoggedIn=true
// before AuthFlow returns DecryptedAccount to currentAccount.
// Do not enter MainScreen with null account (it leads to
// empty keys/UI placeholders until relog).
isLoggedIn == true && currentAccount == null &&
// User explicitly entered account creation flow:
// keep auth screen even if intermediate session flags
// briefly oscillate during DataStore updates.
startCreateAccountFlow -> "auth_new"
sessionState is SessionState.LoggedOut &&
hasExistingAccount == false ->
"auth_new"
isLoggedIn != true && hasExistingAccount == false ->
"auth_new"
isLoggedIn != true && hasExistingAccount == true ->
sessionState is SessionState.LoggedOut &&
hasExistingAccount == true ->
"auth_unlock"
isLoggedIn == true &&
currentAccount == null &&
// First-registration race: DataStore may flip
// logged-in marker before currentAccount is ready.
// Keep user in create auth flow instead of jumping
// to main with incomplete session context.
sessionState is SessionState.AuthInProgress &&
hasExistingAccount == false ->
"auth_new"
sessionState is SessionState.AuthInProgress &&
hasExistingAccount == true ->
"auth_unlock"
protocolState ==
ProtocolState.DEVICE_VERIFICATION_REQUIRED ->
"device_confirm"
sessionState is SessionState.Ready -> "main"
else -> "main"
},
transitionSpec = {
@@ -437,52 +480,19 @@ class MainActivity : FragmentActivity() {
}
currentAccount = normalizedAccount
cacheSessionAccount(normalizedAccount)
hasExistingAccount = true
// Save as last logged account
normalizedAccount?.let {
accountManager.setLastLoggedPublicKey(it.publicKey)
// Initialize protocol/message account context
// immediately after auth completion to avoid
// packet processing race before MainScreen
// composition.
ProtocolManager.initializeAccount(
it.publicKey,
it.privateKey
AppSessionCoordinator.markReady(
account = it,
reason = "auth_complete"
)
}
// Первый запуск после регистрации:
// дополнительно перезапускаем auth/connect, чтобы не оставаться
// в "залипшем CONNECTING" до ручного рестарта приложения.
normalizedAccount?.let { authAccount ->
startAuthHandshakeFast(
authAccount.publicKey,
authAccount.privateKeyHash
} ?: AppSessionCoordinator.markAuthInProgress(
reason = "auth_complete_no_account"
)
scope.launch {
repeat(3) { attempt ->
if (ProtocolManager.isAuthenticated()) return@launch
delay(2000L * (attempt + 1))
if (ProtocolManager.isAuthenticated()) return@launch
ProtocolManager.reconnectNowIfNeeded(
"post_auth_complete_retry_${attempt + 1}"
)
startAuthHandshakeFast(
authAccount.publicKey,
authAccount.privateKeyHash
)
}
}
}
hasExistingAccount = true
// Reload accounts list
scope.launch {
normalizedAccount?.let {
// Синхронно помечаем текущий аккаунт активным в DataStore.
runCatching {
accountManager.setCurrentAccount(it.publicKey)
}
// Force-refresh account title from persisted
// profile (name/username) to avoid temporary
// public-key alias in UI after login.
@@ -516,6 +526,9 @@ class MainActivity : FragmentActivity() {
// lag
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "auth_flow_logout"
)
com.rosetta.messenger.network.ProtocolManager
.disconnect()
scope.launch {
@@ -525,9 +538,12 @@ class MainActivity : FragmentActivity() {
)
}
"main" -> {
val activeAccountKey = currentAccount?.publicKey.orEmpty()
val resolvedAccount =
currentAccount
?: (sessionState as? SessionState.Ready)?.account
val activeAccountKey = resolvedAccount?.publicKey.orEmpty()
MainScreen(
account = currentAccount,
account = resolvedAccount,
initialNavStack =
if (
activeAccountKey.isNotBlank() &&
@@ -565,6 +581,9 @@ class MainActivity : FragmentActivity() {
// lag
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "main_logout"
)
com.rosetta.messenger.network.ProtocolManager
.disconnect()
scope.launch {
@@ -600,6 +619,9 @@ class MainActivity : FragmentActivity() {
// 8. Navigate away last
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "delete_current_account"
)
} catch (e: Exception) {
android.util.Log.e("DeleteAccount", "Failed to delete account", e)
}
@@ -641,6 +663,9 @@ class MainActivity : FragmentActivity() {
preservedMainNavAccountKey = ""
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "delete_sidebar_current_account"
)
}
} catch (e: Exception) {
android.util.Log.e("DeleteAccount", "Failed to delete account from sidebar", e)
@@ -658,6 +683,9 @@ class MainActivity : FragmentActivity() {
// Switch to another account: logout current, then show unlock.
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "switch_account"
)
com.rosetta.messenger.network.ProtocolManager.disconnect()
scope.launch {
accountManager.logout()
@@ -669,6 +697,9 @@ class MainActivity : FragmentActivity() {
preservedMainNavAccountKey = ""
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "add_account"
)
com.rosetta.messenger.network.ProtocolManager.disconnect()
scope.launch {
accountManager.logout()
@@ -684,6 +715,9 @@ class MainActivity : FragmentActivity() {
preservedMainNavAccountKey = ""
currentAccount = null
clearCachedSessionAccount()
AppSessionCoordinator.markLoggedOut(
reason = "device_confirm_exit"
)
ProtocolManager.disconnect()
scope.launch {
accountManager.logout()
@@ -921,6 +955,7 @@ fun MainScreen(
// Following desktop version pattern: username is stored locally and loaded on app start
var accountUsername by remember { mutableStateOf("") }
var accountVerified by remember(accountPublicKey) { mutableIntStateOf(0) }
val identitySnapshot by IdentityStore.state.collectAsState()
var reloadTrigger by remember { mutableIntStateOf(0) }
// Load username AND name from AccountManager (persisted in DataStore)
@@ -1164,12 +1199,22 @@ fun MainScreen(
suspend fun refreshAccountIdentityState(accountKey: String) {
val accountManager = AccountManager(context)
val encryptedAccount = accountManager.getAccount(accountKey)
val identityOwn =
identitySnapshot.profile?.takeIf {
it.publicKey.equals(accountKey, ignoreCase = true)
}
val cachedOwn = ProtocolManager.getCachedUserInfo(accountKey)
val persistedName = encryptedAccount?.name?.trim().orEmpty()
val persistedUsername = encryptedAccount?.username?.trim().orEmpty()
val cachedName = cachedOwn?.title?.trim().orEmpty()
val cachedUsername = cachedOwn?.username?.trim().orEmpty()
val cachedName =
identityOwn?.displayName?.trim().orEmpty().ifBlank {
cachedOwn?.title?.trim().orEmpty()
}
val cachedUsername =
identityOwn?.username?.trim().orEmpty().ifBlank {
cachedOwn?.username?.trim().orEmpty()
}
if (cachedName.isNotBlank() &&
!isPlaceholderAccountName(cachedName) &&
@@ -1190,8 +1235,16 @@ fun MainScreen(
}
accountUsername = finalUsername
accountVerified = cachedOwn?.verified ?: 0
accountVerified = identityOwn?.verified ?: cachedOwn?.verified ?: 0
accountName = resolveAccountDisplayName(accountKey, preferredName, finalUsername)
IdentityStore.updateOwnProfile(
publicKey = accountKey,
displayName = accountName,
username = accountUsername,
verified = accountVerified,
resolved = true,
reason = "main_screen_refresh"
)
}
LaunchedEffect(accountPublicKey, reloadTrigger) {
@@ -1202,6 +1255,21 @@ fun MainScreen(
}
}
LaunchedEffect(identitySnapshot.profile, accountPublicKey) {
val profile = identitySnapshot.profile ?: return@LaunchedEffect
if (!profile.publicKey.equals(accountPublicKey, ignoreCase = true)) return@LaunchedEffect
val normalizedName =
resolveAccountDisplayName(
accountPublicKey,
profile.displayName.ifBlank { accountName },
profile.username.ifBlank { null }
)
accountName = normalizedName
accountUsername = profile.username
accountVerified = profile.verified
}
// Состояние протокола для передачи в SearchScreen
val protocolState by ProtocolManager.state.collectAsState()

View File

@@ -0,0 +1,108 @@
package com.rosetta.messenger.network
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.CopyOnWriteArrayList
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.asSharedFlow
import kotlinx.coroutines.launch
/**
* Centralized packet subscription registry.
*
* Guarantees exactly one low-level Protocol.waitPacket subscription per packet id
* and fans out packets to:
* 1) legacy callback listeners (waitPacket/unwaitPacket API),
* 2) SharedFlow collectors in network/UI layers.
*/
class PacketSubscriptionRegistry(
private val protocolProvider: () -> Protocol,
private val scope: CoroutineScope,
private val addLog: (String) -> Unit
) {
private data class PacketBus(
val packetId: Int,
val callbacks: CopyOnWriteArrayList<(Packet) -> Unit>,
val sharedFlow: MutableSharedFlow<Packet>,
val protocolBridge: (Packet) -> Unit
)
private val buses = ConcurrentHashMap<Int, PacketBus>()
private fun ensureBus(packetId: Int): PacketBus {
buses[packetId]?.let { return it }
val callbacks = CopyOnWriteArrayList<(Packet) -> Unit>()
val sharedFlow =
MutableSharedFlow<Packet>(
replay = 0,
extraBufferCapacity = 128,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
val bridge: (Packet) -> Unit = { packet ->
if (!sharedFlow.tryEmit(packet)) {
scope.launch { sharedFlow.emit(packet) }
}
callbacks.forEach { callback ->
runCatching { callback(packet) }
.onFailure { error ->
addLog("❌ PacketSubscriptionRegistry callback error: ${error.message}")
}
}
}
val created =
PacketBus(
packetId = packetId,
callbacks = callbacks,
sharedFlow = sharedFlow,
protocolBridge = bridge
)
val existing = buses.putIfAbsent(packetId, created)
if (existing == null) {
protocolProvider().waitPacket(packetId, bridge)
addLog(
"🧭 PacketSubscriptionRegistry attached id=0x${packetId.toString(16).uppercase()}"
)
return created
}
return existing
}
fun flow(packetId: Int): SharedFlow<Packet> = ensureBus(packetId).sharedFlow.asSharedFlow()
fun addCallback(packetId: Int, callback: (Packet) -> Unit) {
val bus = ensureBus(packetId)
if (bus.callbacks.contains(callback)) {
addLog(
"📝 registry waitPacket(0x${packetId.toString(16)}) skipped duplicate callback; callbacks=${bus.callbacks.size}"
)
return
}
bus.callbacks.add(callback)
addLog(
"📝 registry waitPacket(0x${packetId.toString(16)}) callback registered; callbacks=${bus.callbacks.size}"
)
}
fun removeCallback(packetId: Int, callback: (Packet) -> Unit) {
val bus = buses[packetId] ?: return
bus.callbacks.remove(callback)
addLog(
"📝 registry unwaitPacket(0x${packetId.toString(16)}) callback removed; callbacks=${bus.callbacks.size}"
)
}
fun destroy() {
buses.forEach { (packetId, bus) ->
runCatching {
protocolProvider().unwaitPacket(packetId, bus.protocolBridge)
}
}
buses.clear()
}
}

View File

@@ -187,6 +187,7 @@ class Protocol(
private var lastStateChangeTime = System.currentTimeMillis()
private var lastSuccessfulConnection = 0L
private var reconnectJob: Job? = null // Для отмены запланированных переподключений
private var connectingTimeoutJob: Job? = null
private var isConnecting = false // Флаг для защиты от одновременных подключений
private var connectingSinceMs = 0L
@@ -229,6 +230,7 @@ class Protocol(
val responseCode: Int?,
val responseMessage: String?
) : SessionEvent
data class ConnectingTimeout(val generation: Long) : SessionEvent
}
private val sessionEvents = Channel<SessionEvent>(Channel.UNLIMITED)
@@ -272,6 +274,7 @@ class Protocol(
is SessionEvent.SocketOpened -> handleSocketOpened(event)
is SessionEvent.SocketClosed -> handleSocketClosed(event)
is SessionEvent.SocketFailure -> handleSocketFailure(event)
is SessionEvent.ConnectingTimeout -> handleConnectingTimeout(event.generation)
}
} catch (e: CancellationException) {
throw e
@@ -323,6 +326,23 @@ class Protocol(
}
}
private fun cancelConnectingTimeout(reason: String) {
if (connectingTimeoutJob != null) {
log("⏱️ CONNECTING watchdog disarmed ($reason)")
}
connectingTimeoutJob?.cancel()
connectingTimeoutJob = null
}
private fun armConnectingTimeout(generation: Long) {
cancelConnectingTimeout(reason = "re-arm")
connectingTimeoutJob = scope.launch {
delay(CONNECTING_STUCK_TIMEOUT_MS)
enqueueSessionEvent(SessionEvent.ConnectingTimeout(generation))
}
log("⏱️ CONNECTING watchdog armed gen=$generation timeout=${CONNECTING_STUCK_TIMEOUT_MS}ms")
}
private fun handleSocketOpened(event: SessionEvent.SocketOpened) {
if (isStaleSocketEvent("onOpen", event.generation, event.socket)) return
log(
@@ -330,6 +350,7 @@ class Protocol(
"hasCredentials=${lastPublicKey != null}, gen=${event.generation}"
)
cancelConnectingTimeout(reason = "socket_opened")
isConnecting = false
connectingSinceMs = 0L
@@ -356,6 +377,7 @@ class Protocol(
"❌ WebSocket CLOSED: code=${event.code} reason='${event.reason}' state=${_state.value} " +
"manuallyClosed=$isManuallyClosed gen=${event.generation}"
)
cancelConnectingTimeout(reason = "socket_closed")
isConnecting = false
connectingSinceMs = 0L
handleDisconnectLocked("onClosed")
@@ -370,12 +392,41 @@ class Protocol(
log(" Reconnect attempts: $reconnectAttempts")
log(" Generation: ${event.generation}")
event.throwable.printStackTrace()
cancelConnectingTimeout(reason = "socket_failure")
isConnecting = false
connectingSinceMs = 0L
_lastError.value = event.throwable.message
handleDisconnectLocked("onFailure")
}
private fun handleConnectingTimeout(generation: Long) {
val currentState = _state.value
if (generation != activeConnectionGeneration) {
log(
"⏱️ CONNECTING watchdog ignored for stale generation " +
"(event=$generation active=$activeConnectionGeneration)"
)
return
}
if (!isConnecting || currentState != ProtocolState.CONNECTING) {
return
}
val elapsed = if (connectingSinceMs > 0L) {
System.currentTimeMillis() - connectingSinceMs
} else {
CONNECTING_STUCK_TIMEOUT_MS
}
log("🧯 CONNECTING TIMEOUT fired (elapsed=${elapsed}ms) -> forcing disconnect/reconnect")
cancelConnectingTimeout(reason = "timeout_fired")
isConnecting = false
connectingSinceMs = 0L
runCatching { webSocket?.cancel() }
webSocket = null
handleDisconnectLocked("connecting_timeout")
}
private fun handleHandshakeResponse(packet: PacketHandshake) {
handshakeJob?.cancel()
@@ -679,6 +730,7 @@ class Protocol(
return
}
log("🧯 CONNECTING STUCK detected (elapsed=${elapsed}ms) -> forcing reconnect reset")
cancelConnectingTimeout(reason = "connect_stuck_reset")
isConnecting = false
connectingSinceMs = 0L
runCatching { webSocket?.cancel() }
@@ -721,6 +773,7 @@ class Protocol(
isManuallyClosed = false
setState(ProtocolState.CONNECTING, "Starting new connection attempt #$reconnectAttempts")
_lastError.value = null
armConnectingTimeout(generation)
log("🔌 Connecting to: $serverAddress (attempt #$reconnectAttempts)")
@@ -1016,6 +1069,7 @@ class Protocol(
"🔌 DISCONNECT HANDLER: source=$source previousState=$previousState, manuallyClosed=$isManuallyClosed, " +
"reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId"
)
cancelConnectingTimeout(reason = "handle_disconnect:$source")
// Duplicate callbacks are possible (e.g. heartbeat failure + onFailure/onClosed).
// If we are already disconnected and a reconnect is pending, avoid scheduling another one.
@@ -1120,6 +1174,7 @@ class Protocol(
private fun disconnectLocked(manual: Boolean, reason: String) {
log("🔌 Disconnect requested: manual=$manual reason='$reason' instance=$instanceId")
isManuallyClosed = manual
cancelConnectingTimeout(reason = "disconnect_locked")
isConnecting = false // Сбрасываем флаг
connectingSinceMs = 0L
reconnectJob?.cancel() // Отменяем запланированные переподключения
@@ -1170,6 +1225,7 @@ class Protocol(
return
}
log("🧯 FAST RECONNECT: stuck CONNECTING (${elapsed}ms) -> reset and reconnect")
cancelConnectingTimeout(reason = "fast_reconnect_reset")
isConnecting = false
connectingSinceMs = 0L
runCatching { webSocket?.cancel() }
@@ -1225,6 +1281,7 @@ class Protocol(
sessionLoopJob.cancelAndJoin()
}
}
connectingTimeoutJob?.cancel()
heartbeatJob?.cancel()
scope.cancel()
}

View File

@@ -10,10 +10,17 @@ import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.data.GroupRepository
import com.rosetta.messenger.data.MessageRepository
import com.rosetta.messenger.data.isPlaceholderAccountName
import com.rosetta.messenger.network.connection.BootstrapCoordinator
import com.rosetta.messenger.network.connection.ConnectionOrchestrator
import com.rosetta.messenger.network.connection.OwnProfileSyncService
import com.rosetta.messenger.network.connection.PacketRouter
import com.rosetta.messenger.network.connection.RetryQueueService
import com.rosetta.messenger.session.IdentityStore
import com.rosetta.messenger.utils.MessageLogger
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.sync.Mutex
@@ -25,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.resume
/**
* Singleton manager for Protocol instance
@@ -62,6 +68,12 @@ object ProtocolManager {
private var appContext: Context? = null
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val protocolInstanceLock = Any()
private val packetSubscriptionRegistry =
PacketSubscriptionRegistry(
protocolProvider = ::getProtocol,
scope = scope,
addLog = ::addLog
)
private val connectionSupervisor =
ProtocolConnectionSupervisor(
scope = scope,
@@ -75,6 +87,43 @@ object ProtocolManager {
maxSize = READY_PACKET_QUEUE_MAX,
ttlMs = READY_PACKET_QUEUE_TTL_MS
)
private val bootstrapCoordinator =
BootstrapCoordinator(
readyPacketGate = readyPacketGate,
addLog = ::addLog,
shortKeyForLog = ::shortKeyForLog,
sendPacketDirect = { packet -> getProtocol().sendPacket(packet) }
)
private val connectionOrchestrator =
ConnectionOrchestrator(
hasActiveInternet = ::hasActiveInternet,
waitForNetworkAndReconnect = ::waitForNetworkAndReconnect,
stopWaitingForNetwork = { reason -> stopWaitingForNetwork(reason) },
getProtocol = ::getProtocol,
appContextProvider = { appContext },
buildHandshakeDevice = ::buildHandshakeDevice
)
private val ownProfileSyncService = OwnProfileSyncService(::isPlaceholderAccountName)
private val packetRouter by lazy {
PacketRouter(
sendSearchPacket = { packet -> send(packet) },
privateHashProvider = {
try {
getProtocol().getPrivateHash()
} catch (_: Exception) {
null
}
}
)
}
private val retryQueueService =
RetryQueueService(
scope = scope,
sendPacket = { packet -> send(packet) },
isAuthenticated = ::isAuthenticated,
addLog = ::addLog,
markOutgoingAsError = ::markOutgoingAsError
)
@Volatile private var packetHandlersRegistered = false
@Volatile private var stateMonitoringStarted = false
@@ -119,20 +168,7 @@ object ProtocolManager {
val pendingDeviceVerification: StateFlow<DeviceEntry?> = _pendingDeviceVerification.asStateFlow()
// Сигнал обновления own profile (username/name загружены с сервера)
private val _ownProfileUpdated = MutableStateFlow(0L)
val ownProfileUpdated: StateFlow<Long> = _ownProfileUpdated.asStateFlow()
// 🔍 Global user info cache (like Desktop's InformationProvider.cachedUsers)
// publicKey → SearchUser (resolved via PacketSearch 0x03)
private val userInfoCache = ConcurrentHashMap<String, SearchUser>()
// Pending resolves: publicKey → list of continuations waiting for the result
private val pendingResolves = ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<SearchUser?>>>()
// Pending search requests: query(username/publicKey fragment) → waiting continuations
private val pendingSearchQueries =
ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<List<SearchUser>>>>()
private fun normalizeSearchQuery(value: String): String =
value.trim().removePrefix("@").lowercase(Locale.ROOT)
val ownProfileUpdated: StateFlow<Long> = ownProfileSyncService.ownProfileUpdated
private fun ensureConnectionSupervisor() {
connectionSupervisor.start()
@@ -143,14 +179,7 @@ object ProtocolManager {
}
private fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState =
when (state) {
ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED
ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING
ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING
ProtocolState.DEVICE_VERIFICATION_REQUIRED ->
ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED
ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED
}
bootstrapCoordinator.protocolToLifecycleState(state)
private fun setConnectionLifecycleState(next: ConnectionLifecycleState, reason: String) {
if (_connectionLifecycleState.value == next) return
@@ -161,23 +190,18 @@ object ProtocolManager {
private fun recomputeConnectionLifecycleState(reason: String) {
val context = bootstrapContext
val nextState =
if (context.authenticated) {
if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) {
ConnectionLifecycleState.READY
} else {
ConnectionLifecycleState.BOOTSTRAPPING
}
} else {
protocolToLifecycleState(context.protocolState)
}
setConnectionLifecycleState(nextState, reason)
if (nextState == ConnectionLifecycleState.READY) {
flushReadyPacketQueue(context.accountPublicKey, reason)
bootstrapCoordinator.recomputeLifecycleState(
context = context,
currentState = _connectionLifecycleState.value,
reason = reason
) { state, updateReason ->
setConnectionLifecycleState(state, updateReason)
}
_connectionLifecycleState.value = nextState
}
private fun clearReadyPacketQueue(reason: String) {
readyPacketGate.clear(reason = reason, addLog = ::addLog)
bootstrapCoordinator.clearReadyPacketQueue(reason)
}
private fun enqueueReadyPacket(packet: Packet) {
@@ -185,41 +209,19 @@ object ProtocolManager {
messageRepository?.getCurrentAccountKey()?.trim().orEmpty().ifBlank {
bootstrapContext.accountPublicKey
}
readyPacketGate.enqueue(
bootstrapCoordinator.enqueueReadyPacket(
packet = packet,
accountPublicKey = accountKey,
state = _connectionLifecycleState.value,
shortKeyForLog = ::shortKeyForLog,
addLog = ::addLog
state = _connectionLifecycleState.value
)
}
private fun flushReadyPacketQueue(activeAccountKey: String, reason: String) {
val packetsToSend =
readyPacketGate.drainForAccount(
activeAccountKey = activeAccountKey,
reason = reason,
addLog = ::addLog
)
if (packetsToSend.isEmpty()) return
val protocolInstance = getProtocol()
packetsToSend.forEach { protocolInstance.sendPacket(it) }
bootstrapCoordinator.flushReadyPacketQueue(activeAccountKey, reason)
}
private fun packetCanBypassReadyGate(packet: Packet): Boolean =
when (packet) {
is PacketHandshake,
is PacketSync,
is PacketSearch,
is PacketPushNotification,
is PacketRequestTransport,
is PacketRequestUpdate,
is PacketSignalPeer,
is PacketWebRTC,
is PacketIceServers,
is PacketDeviceResolve -> true
else -> false
}
bootstrapCoordinator.packetCanBypassReadyGate(packet)
private suspend fun handleConnectionEvent(event: ConnectionEvent) {
when (event) {
@@ -281,20 +283,10 @@ object ProtocolManager {
}
}
is ConnectionEvent.Connect -> {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("connect:${event.reason}")
return
}
stopWaitingForNetwork("connect:${event.reason}")
getProtocol().connect()
connectionOrchestrator.handleConnect(event.reason)
}
is ConnectionEvent.FastReconnect -> {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("reconnect:${event.reason}")
return
}
stopWaitingForNetwork("reconnect:${event.reason}")
getProtocol().reconnectNowIfNeeded(event.reason)
connectionOrchestrator.handleFastReconnect(event.reason)
}
is ConnectionEvent.Disconnect -> {
stopWaitingForNetwork(event.reason)
@@ -321,15 +313,7 @@ object ProtocolManager {
recomputeConnectionLifecycleState("disconnect:${event.reason}")
}
is ConnectionEvent.Authenticate -> {
appContext?.let { context ->
runCatching {
val accountManager = AccountManager(context)
accountManager.setLastLoggedPublicKey(event.publicKey)
accountManager.setLastLoggedPrivateKeyHash(event.privateHash)
}
}
val device = buildHandshakeDevice()
getProtocol().startHandshake(event.publicKey, event.privateHash, device)
connectionOrchestrator.handleAuthenticate(event.publicKey, event.privateHash)
}
is ConnectionEvent.ProtocolStateChanged -> {
val previousProtocolState = bootstrapContext.protocolState
@@ -446,6 +430,11 @@ object ProtocolManager {
ownProfileFallbackJob?.cancel()
ownProfileFallbackJob = null
bootstrapContext = bootstrapContext.copy(ownProfileResolved = true)
IdentityStore.updateOwnProfile(
publicKey = event.publicKey,
resolved = true,
reason = "protocol_own_profile_resolved"
)
recomputeConnectionLifecycleState("own_profile_resolved")
}
is ConnectionEvent.OwnProfileFallbackTimeout -> {
@@ -455,6 +444,14 @@ object ProtocolManager {
"⏱️ Own profile fetch timeout — continuing bootstrap for ${shortKeyForLog(bootstrapContext.accountPublicKey)}"
)
bootstrapContext = bootstrapContext.copy(ownProfileResolved = true)
val accountPublicKey = bootstrapContext.accountPublicKey
if (accountPublicKey.isNotBlank()) {
IdentityStore.updateOwnProfile(
publicKey = accountPublicKey,
resolved = true,
reason = "protocol_own_profile_fallback_timeout"
)
}
recomputeConnectionLifecycleState("own_profile_fallback_timeout")
}
}
@@ -488,17 +485,6 @@ object ProtocolManager {
private val fullFailureBatchStreak = AtomicInteger(0)
private val syncBatchEndMutex = Mutex()
// iOS parity: outgoing message retry mechanism.
// iOS retries stuck WAITING messages every 4 seconds, up to 3 attempts,
// with a hard timeout of 80 seconds. Without this, messages stay as "clocks"
// until the next reconnect (which may never happen if the connection is alive).
private const val OUTGOING_RETRY_INTERVAL_MS = 4_000L
private const val OUTGOING_MAX_RETRY_ATTEMPTS = 3
private const val OUTGOING_MAX_LIFETIME_MS = 80_000L
private val pendingOutgoingPackets = ConcurrentHashMap<String, PacketMessage>()
private val pendingOutgoingAttempts = ConcurrentHashMap<String, Int>()
private val pendingOutgoingRetryJobs = ConcurrentHashMap<String, Job>()
private fun setSyncInProgress(value: Boolean) {
syncBatchInProgress = value
if (_syncInProgress.value != value) {
@@ -524,7 +510,9 @@ object ProtocolManager {
val timestamp =
java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date())
val line = "[$timestamp] $normalizedMessage"
if (shouldPersistProtocolTrace(normalizedMessage)) {
persistProtocolTraceLine(line)
}
if (!uiLogsEnabled) return
synchronized(debugLogsLock) {
if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) {
@@ -535,6 +523,23 @@ object ProtocolManager {
flushDebugLogsThrottled()
}
/**
* Keep crash_reports trace lightweight when UI logs are disabled.
* This avoids excessive disk writes during long sync sessions.
*/
private fun shouldPersistProtocolTrace(message: String): Boolean {
if (uiLogsEnabled) return true
if (message.startsWith("") || message.startsWith("⚠️")) return true
if (message.contains("STATE CHANGE")) return true
if (message.contains("CONNECTION FULLY ESTABLISHED")) return true
if (message.contains("HANDSHAKE COMPLETE")) return true
if (message.contains("SYNC COMPLETE")) return true
if (message.startsWith("🔌 CONNECT CALLED") || message.startsWith("🔌 Connecting to")) return true
if (message.startsWith("✅ WebSocket OPEN")) return true
if (message.startsWith("📡 NETWORK")) return true
return false
}
private fun persistProtocolTraceLine(line: String) {
val context = appContext ?: return
runCatching {
@@ -832,28 +837,14 @@ object ProtocolManager {
waitPacket(0x03) { packet ->
val searchPacket = packet as PacketSearch
if (searchPacket.users.isNotEmpty()) {
scope.launch(Dispatchers.IO) {
val ownPublicKey =
getProtocol().getPublicKey()?.trim().orEmpty().ifBlank {
messageRepository?.getCurrentAccountKey()?.trim().orEmpty()
}
searchPacket.users.forEach { user ->
packetRouter.onSearchPacket(searchPacket) { user ->
val normalizedUserPublicKey = user.publicKey.trim()
// 🔍 Кэшируем всех пользователей (desktop parity: cachedUsers)
userInfoCache[normalizedUserPublicKey] = user
// Resume pending resolves for this publicKey (case-insensitive).
pendingResolves
.keys
.filter { it.equals(normalizedUserPublicKey, ignoreCase = true) }
.forEach { key ->
pendingResolves.remove(key)?.forEach { cont ->
try { cont.resume(user) } catch (_: Exception) {}
}
}
// Обновляем инфо в диалогах (для всех пользователей)
messageRepository?.updateDialogUserInfo(
normalizedUserPublicKey,
user.title,
@@ -861,84 +852,19 @@ object ProtocolManager {
user.verified
)
// Если это наш own profile — сохраняем username/name в AccountManager
if (ownPublicKey.isNotBlank() &&
normalizedUserPublicKey.equals(ownPublicKey, ignoreCase = true) &&
appContext != null) {
val accountManager = AccountManager(appContext!!)
if (user.title.isNotBlank() && !isPlaceholderAccountName(user.title)) {
accountManager.updateAccountName(ownPublicKey, user.title)
}
if (user.username.isNotBlank()) {
accountManager.updateAccountUsername(ownPublicKey, user.username)
}
_ownProfileUpdated.value = System.currentTimeMillis()
postConnectionEvent(
ConnectionEvent.OwnProfileResolved(user.publicKey)
val ownProfileResolved =
ownProfileSyncService.applyOwnProfileFromSearch(
appContext = appContext,
ownPublicKey = ownPublicKey,
user = user
)
if (ownProfileResolved) {
postConnectionEvent(ConnectionEvent.OwnProfileResolved(user.publicKey))
}
}
}
}
// Resume pending resolves that got empty response (no match)
if (searchPacket.search.isNotEmpty() && searchPacket.users.none { it.publicKey == searchPacket.search }) {
pendingResolves.remove(searchPacket.search)?.forEach { cont ->
try { cont.resume(null) } catch (_: Exception) {}
}
}
// Resume pending username/query searches.
// Server may return query in different case/format, so match robustly.
if (searchPacket.search.isNotEmpty()) {
val rawQuery = searchPacket.search.trim()
val normalizedQuery = normalizeSearchQuery(rawQuery)
val continuations = LinkedHashSet<kotlinx.coroutines.CancellableContinuation<List<SearchUser>>>()
fun collectByKey(key: String) {
if (key.isEmpty()) return
pendingSearchQueries.remove(key)?.let { continuations.addAll(it) }
}
collectByKey(rawQuery)
if (normalizedQuery.isNotEmpty() && normalizedQuery != rawQuery) {
collectByKey(normalizedQuery)
}
if (continuations.isEmpty()) {
val matchedByQuery =
pendingSearchQueries.keys.firstOrNull { pendingKey ->
pendingKey.equals(rawQuery, ignoreCase = true) ||
normalizeSearchQuery(pendingKey) == normalizedQuery
}
if (matchedByQuery != null) collectByKey(matchedByQuery)
}
if (continuations.isEmpty() && searchPacket.users.isNotEmpty()) {
val responseUsernames =
searchPacket.users
.map { normalizeSearchQuery(it.username) }
.filter { it.isNotEmpty() }
.toSet()
if (responseUsernames.isNotEmpty()) {
val matchedByUsers =
pendingSearchQueries.keys.firstOrNull { pendingKey ->
val normalizedPending = normalizeSearchQuery(pendingKey)
normalizedPending.isNotEmpty() &&
responseUsernames.contains(normalizedPending)
}
if (matchedByUsers != null) collectByKey(matchedByUsers)
}
}
continuations.forEach { cont ->
try {
cont.resume(searchPacket.users)
} catch (_: Exception) {}
}
}
}
// 🚀 Обработчик транспортного сервера (0x0F)
waitPacket(0x0F) { packet ->
val transportPacket = packet as PacketRequestTransport
@@ -1634,13 +1560,11 @@ object ProtocolManager {
* Аналог Desktop: useUserInformation(ownPublicKey) → PacketSearch(0x03)
*/
private fun fetchOwnProfile() {
val publicKey = getProtocol().getPublicKey() ?: return
val privateHash = getProtocol().getPrivateHash() ?: return
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = publicKey
}
val packet =
ownProfileSyncService.buildOwnProfilePacket(
publicKey = getProtocol().getPublicKey(),
privateHash = getProtocol().getPrivateHash()
) ?: return
send(packet)
}
@@ -1651,63 +1575,25 @@ object ProtocolManager {
* @param timeoutMs max wait time for server response (default 3s)
*/
suspend fun resolveUserName(publicKey: String, timeoutMs: Long = 3000): String? {
if (publicKey.isEmpty()) return null
// 1. Check in-memory cache (instant)
userInfoCache[publicKey]?.let { cached ->
val name = cached.title.ifEmpty { cached.username }
if (name.isNotEmpty()) return name
}
// 2. Send PacketSearch and wait for response via suspendCancellableCoroutine
val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null }
?: return null
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
// Register continuation in pending list
pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont)
cont.invokeOnCancellation {
pendingResolves[publicKey]?.remove(cont)
if (pendingResolves[publicKey]?.isEmpty() == true) {
pendingResolves.remove(publicKey)
}
}
// Send search request
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = publicKey
}
send(packet)
}
}?.let { user -> user.title.ifEmpty { user.username }.ifEmpty { null } }
} catch (_: Exception) {
// Timeout or cancellation — clean up
pendingResolves.remove(publicKey)
null
}
return packetRouter.resolveUserName(publicKey = publicKey, timeoutMs = timeoutMs)
}
/**
* 🔍 Get cached user info (no network request)
*/
fun getCachedUserName(publicKey: String): String? {
val cached = userInfoCache[publicKey] ?: return null
return cached.title.ifEmpty { cached.username }.ifEmpty { null }
return packetRouter.getCachedUserName(publicKey)
}
/**
* 🔍 Get full cached user info (no network request)
*/
fun notifyOwnProfileUpdated() {
_ownProfileUpdated.value = System.currentTimeMillis()
ownProfileSyncService.notifyOwnProfileUpdated()
}
fun getCachedUserInfo(publicKey: String): SearchUser? {
return userInfoCache[publicKey]
return packetRouter.getCachedUserInfo(publicKey)
}
/**
@@ -1715,47 +1601,14 @@ object ProtocolManager {
* Username compare is case-insensitive and ignores '@'.
*/
fun getCachedUserByUsername(username: String): SearchUser? {
val normalizedUsername = normalizeSearchQuery(username)
if (normalizedUsername.isEmpty()) return null
return userInfoCache.values.firstOrNull { cached ->
normalizeSearchQuery(cached.username) == normalizedUsername
}
return packetRouter.getCachedUserByUsername(username)
}
/**
* 🔍 Resolve publicKey → full SearchUser (with server request if needed)
*/
suspend fun resolveUserInfo(publicKey: String, timeoutMs: Long = 3000): SearchUser? {
if (publicKey.isEmpty()) return null
// 1. Check in-memory cache
userInfoCache[publicKey]?.let { return it }
// 2. Send PacketSearch and wait
val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null }
?: return null
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont)
cont.invokeOnCancellation {
pendingResolves[publicKey]?.remove(cont)
if (pendingResolves[publicKey]?.isEmpty() == true) {
pendingResolves.remove(publicKey)
}
}
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = publicKey
}
send(packet)
}
}
} catch (_: Exception) {
pendingResolves.remove(publicKey)
null
}
return packetRouter.resolveUserInfo(publicKey = publicKey, timeoutMs = timeoutMs)
}
/**
@@ -1763,45 +1616,7 @@ object ProtocolManager {
* Returns raw PacketSearch users list for the exact query.
*/
suspend fun searchUsers(query: String, timeoutMs: Long = 3000): List<SearchUser> {
val normalizedQuery = normalizeSearchQuery(query)
if (normalizedQuery.isEmpty()) return emptyList()
val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null }
?: return emptyList()
val cachedMatches =
userInfoCache.values.filter { cached ->
normalizeSearchQuery(cached.username) == normalizedQuery && cached.publicKey.isNotBlank()
}
if (cachedMatches.isNotEmpty()) {
return cachedMatches.distinctBy { it.publicKey }
}
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
pendingSearchQueries
.getOrPut(normalizedQuery) { mutableListOf() }
.add(cont)
cont.invokeOnCancellation {
pendingSearchQueries[normalizedQuery]?.remove(cont)
if (pendingSearchQueries[normalizedQuery]?.isEmpty() == true) {
pendingSearchQueries.remove(normalizedQuery)
}
}
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = normalizedQuery
}
send(packet)
}
}
} catch (_: Exception) {
pendingSearchQueries.remove(normalizedQuery)
emptyList()
}
return packetRouter.searchUsers(query = query, timeoutMs = timeoutMs)
}
/**
@@ -1835,99 +1650,40 @@ object ProtocolManager {
/**
* Send an outgoing message packet and register it for automatic retry.
* iOS parity: registerOutgoingRetry + scheduleOutgoingRetry.
*/
fun sendMessageWithRetry(packet: PacketMessage) {
send(packet)
registerOutgoingRetry(packet)
retryQueueService.register(packet)
}
/**
* iOS parity: register an outgoing message for automatic retry.
* iOS parity: mark an outgoing message as error in persistent storage.
*/
private fun registerOutgoingRetry(packet: PacketMessage) {
val messageId = packet.messageId
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingPackets[messageId] = packet
pendingOutgoingAttempts[messageId] = 0
scheduleOutgoingRetry(messageId)
private suspend fun markOutgoingAsError(messageId: String, packet: PacketMessage) {
val repository = messageRepository ?: return
val opponentKey =
if (packet.fromPublicKey == repository.getCurrentAccountKey()) {
packet.toPublicKey
} else {
packet.fromPublicKey
}
/**
* iOS parity: schedule a retry attempt for a pending outgoing message.
* Retries every 4 seconds, marks as ERROR after 80s or 3 attempts.
*/
private fun scheduleOutgoingRetry(messageId: String) {
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingRetryJobs[messageId] = scope.launch {
delay(OUTGOING_RETRY_INTERVAL_MS)
val packet = pendingOutgoingPackets[messageId] ?: return@launch
val attempts = pendingOutgoingAttempts[messageId] ?: 0
// Check if message exceeded delivery timeout (80s)
val nowMs = System.currentTimeMillis()
val ageMs = nowMs - packet.timestamp
if (ageMs >= OUTGOING_MAX_LIFETIME_MS) {
addLog("⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error")
markOutgoingAsError(messageId, packet)
return@launch
}
if (attempts >= OUTGOING_MAX_RETRY_ATTEMPTS) {
addLog("⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error")
markOutgoingAsError(messageId, packet)
return@launch
}
if (!isAuthenticated()) {
// Not authenticated — defer to reconnect retry
addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated")
resolveOutgoingRetry(messageId)
return@launch
}
val nextAttempt = attempts + 1
pendingOutgoingAttempts[messageId] = nextAttempt
addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt")
send(packet)
scheduleOutgoingRetry(messageId)
}
}
/**
* iOS parity: mark an outgoing message as error and clean up retry state.
*/
private fun markOutgoingAsError(messageId: String, packet: PacketMessage) {
scope.launch {
val repository = messageRepository ?: return@launch
val opponentKey = if (packet.fromPublicKey == repository.getCurrentAccountKey())
packet.toPublicKey else packet.fromPublicKey
val dialogKey = repository.getDialogKey(opponentKey)
repository.updateMessageDeliveryStatus(dialogKey, messageId, DeliveryStatus.ERROR)
}
resolveOutgoingRetry(messageId)
}
/**
* iOS parity: cancel retry and clean up state for a resolved outgoing message.
* Called when delivery ACK (0x08) is received.
*/
fun resolveOutgoingRetry(messageId: String) {
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingRetryJobs.remove(messageId)
pendingOutgoingPackets.remove(messageId)
pendingOutgoingAttempts.remove(messageId)
retryQueueService.resolve(messageId)
}
/**
* Cancel all pending outgoing retry jobs (e.g., on disconnect).
*/
private fun cancelAllOutgoingRetries() {
pendingOutgoingRetryJobs.values.forEach { it.cancel() }
pendingOutgoingRetryJobs.clear()
pendingOutgoingPackets.clear()
pendingOutgoingAttempts.clear()
retryQueueService.clear()
}
/**
@@ -2056,14 +1812,21 @@ object ProtocolManager {
* Register packet handler
*/
fun waitPacket(packetId: Int, callback: (Packet) -> Unit) {
getProtocol().waitPacket(packetId, callback)
packetSubscriptionRegistry.addCallback(packetId, callback)
}
/**
* Unregister packet handler
*/
fun unwaitPacket(packetId: Int, callback: (Packet) -> Unit) {
getProtocol().unwaitPacket(packetId, callback)
packetSubscriptionRegistry.removeCallback(packetId, callback)
}
/**
* SharedFlow fan-out stream for packet id.
*/
fun packetFlow(packetId: Int): SharedFlow<Packet> {
return packetSubscriptionRegistry.flow(packetId)
}
private fun buildHandshakeDevice(): HandshakeDevice {
@@ -2139,6 +1902,7 @@ object ProtocolManager {
*/
fun destroy() {
stopWaitingForNetwork("destroy")
packetSubscriptionRegistry.destroy()
synchronized(protocolInstanceLock) {
protocol?.destroy()
protocol = null

View File

@@ -0,0 +1,92 @@
package com.rosetta.messenger.network.connection
import com.rosetta.messenger.network.*
class BootstrapCoordinator(
private val readyPacketGate: ReadyPacketGate,
private val addLog: (String) -> Unit,
private val shortKeyForLog: (String) -> String,
private val sendPacketDirect: (Packet) -> Unit
) {
fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState =
when (state) {
ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED
ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING
ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING
ProtocolState.DEVICE_VERIFICATION_REQUIRED ->
ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED
ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED
}
fun packetCanBypassReadyGate(packet: Packet): Boolean =
when (packet) {
is PacketHandshake,
is PacketSync,
is PacketSearch,
is PacketPushNotification,
is PacketRequestTransport,
is PacketRequestUpdate,
is PacketSignalPeer,
is PacketWebRTC,
is PacketIceServers,
is PacketDeviceResolve -> true
else -> false
}
fun recomputeLifecycleState(
context: ConnectionBootstrapContext,
currentState: ConnectionLifecycleState,
reason: String,
onStateChanged: (ConnectionLifecycleState, String) -> Unit
): ConnectionLifecycleState {
val nextState =
if (context.authenticated) {
if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) {
ConnectionLifecycleState.READY
} else {
ConnectionLifecycleState.BOOTSTRAPPING
}
} else {
protocolToLifecycleState(context.protocolState)
}
if (currentState != nextState) {
onStateChanged(nextState, reason)
}
if (nextState == ConnectionLifecycleState.READY) {
flushReadyPacketQueue(context.accountPublicKey, reason)
}
return nextState
}
fun clearReadyPacketQueue(reason: String) {
readyPacketGate.clear(reason = reason, addLog = addLog)
}
fun enqueueReadyPacket(
packet: Packet,
accountPublicKey: String,
state: ConnectionLifecycleState
) {
readyPacketGate.enqueue(
packet = packet,
accountPublicKey = accountPublicKey,
state = state,
shortKeyForLog = shortKeyForLog,
addLog = addLog
)
}
fun flushReadyPacketQueue(activeAccountKey: String, reason: String) {
val packetsToSend =
readyPacketGate.drainForAccount(
activeAccountKey = activeAccountKey,
reason = reason,
addLog = addLog
)
if (packetsToSend.isEmpty()) return
packetsToSend.forEach(sendPacketDirect)
}
}

View File

@@ -0,0 +1,45 @@
package com.rosetta.messenger.network.connection
import android.content.Context
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.network.HandshakeDevice
import com.rosetta.messenger.network.Protocol
class ConnectionOrchestrator(
private val hasActiveInternet: () -> Boolean,
private val waitForNetworkAndReconnect: (String) -> Unit,
private val stopWaitingForNetwork: (String) -> Unit,
private val getProtocol: () -> Protocol,
private val appContextProvider: () -> Context?,
private val buildHandshakeDevice: () -> HandshakeDevice
) {
fun handleConnect(reason: String) {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("connect:$reason")
return
}
stopWaitingForNetwork("connect:$reason")
getProtocol().connect()
}
fun handleFastReconnect(reason: String) {
if (!hasActiveInternet()) {
waitForNetworkAndReconnect("reconnect:$reason")
return
}
stopWaitingForNetwork("reconnect:$reason")
getProtocol().reconnectNowIfNeeded(reason)
}
fun handleAuthenticate(publicKey: String, privateHash: String) {
appContextProvider()?.let { context ->
runCatching {
val accountManager = AccountManager(context)
accountManager.setLastLoggedPublicKey(publicKey)
accountManager.setLastLoggedPrivateKeyHash(privateHash)
}
}
val device = buildHandshakeDevice()
getProtocol().startHandshake(publicKey, privateHash, device)
}
}

View File

@@ -0,0 +1,53 @@
package com.rosetta.messenger.network.connection
import android.content.Context
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.network.PacketSearch
import com.rosetta.messenger.network.SearchUser
import com.rosetta.messenger.session.IdentityStore
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
class OwnProfileSyncService(
private val isPlaceholderAccountName: (String?) -> Boolean
) {
private val _ownProfileUpdated = MutableStateFlow(0L)
val ownProfileUpdated: StateFlow<Long> = _ownProfileUpdated.asStateFlow()
fun notifyOwnProfileUpdated() {
_ownProfileUpdated.value = System.currentTimeMillis()
}
suspend fun applyOwnProfileFromSearch(
appContext: Context?,
ownPublicKey: String,
user: SearchUser
): Boolean {
if (ownPublicKey.isBlank()) return false
if (!user.publicKey.equals(ownPublicKey, ignoreCase = true)) return false
val context = appContext ?: return true
val accountManager = AccountManager(context)
if (user.title.isNotBlank() && !isPlaceholderAccountName(user.title)) {
accountManager.updateAccountName(ownPublicKey, user.title)
}
if (user.username.isNotBlank()) {
accountManager.updateAccountUsername(ownPublicKey, user.username)
}
IdentityStore.updateOwnProfile(user, reason = "protocol_search_own_profile")
_ownProfileUpdated.value = System.currentTimeMillis()
return true
}
fun buildOwnProfilePacket(publicKey: String?, privateHash: String?): PacketSearch? {
val normalizedPublicKey = publicKey?.trim().orEmpty()
val normalizedPrivateHash = privateHash?.trim().orEmpty()
if (normalizedPublicKey.isEmpty() || normalizedPrivateHash.isEmpty()) return null
return PacketSearch().apply {
this.privateKey = normalizedPrivateHash
this.search = normalizedPublicKey
}
}
}

View File

@@ -0,0 +1,221 @@
package com.rosetta.messenger.network.connection
import com.rosetta.messenger.network.PacketSearch
import com.rosetta.messenger.network.SearchUser
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withTimeout
import java.util.LinkedHashSet
import java.util.Locale
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.resume
class PacketRouter(
private val sendSearchPacket: (PacketSearch) -> Unit,
private val privateHashProvider: () -> String?
) {
private val userInfoCache = ConcurrentHashMap<String, SearchUser>()
private val pendingResolves =
ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<SearchUser?>>>()
private val pendingSearchQueries =
ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<List<SearchUser>>>>()
private fun normalizeSearchQuery(value: String): String =
value.trim().removePrefix("@").lowercase(Locale.ROOT)
suspend fun onSearchPacket(packet: PacketSearch, onUserDiscovered: suspend (SearchUser) -> Unit) {
if (packet.users.isNotEmpty()) {
packet.users.forEach { user ->
val normalizedUserPublicKey = user.publicKey.trim()
userInfoCache[normalizedUserPublicKey] = user
pendingResolves
.keys
.filter { it.equals(normalizedUserPublicKey, ignoreCase = true) }
.forEach { key ->
pendingResolves.remove(key)?.forEach { cont ->
try {
cont.resume(user)
} catch (_: Exception) {}
}
}
onUserDiscovered(user)
}
}
if (packet.search.isNotEmpty() && packet.users.none { it.publicKey == packet.search }) {
pendingResolves.remove(packet.search)?.forEach { cont ->
try {
cont.resume(null)
} catch (_: Exception) {}
}
}
if (packet.search.isNotEmpty()) {
val rawQuery = packet.search.trim()
val normalizedQuery = normalizeSearchQuery(rawQuery)
val continuations =
LinkedHashSet<kotlinx.coroutines.CancellableContinuation<List<SearchUser>>>()
fun collectByKey(key: String) {
if (key.isEmpty()) return
pendingSearchQueries.remove(key)?.let { continuations.addAll(it) }
}
collectByKey(rawQuery)
if (normalizedQuery.isNotEmpty() && normalizedQuery != rawQuery) {
collectByKey(normalizedQuery)
}
if (continuations.isEmpty()) {
val matchedByQuery =
pendingSearchQueries.keys.firstOrNull { pendingKey ->
pendingKey.equals(rawQuery, ignoreCase = true) ||
normalizeSearchQuery(pendingKey) == normalizedQuery
}
if (matchedByQuery != null) collectByKey(matchedByQuery)
}
if (continuations.isEmpty() && packet.users.isNotEmpty()) {
val responseUsernames =
packet.users
.map { normalizeSearchQuery(it.username) }
.filter { it.isNotEmpty() }
.toSet()
if (responseUsernames.isNotEmpty()) {
val matchedByUsers =
pendingSearchQueries.keys.firstOrNull { pendingKey ->
val normalizedPending = normalizeSearchQuery(pendingKey)
normalizedPending.isNotEmpty() &&
responseUsernames.contains(normalizedPending)
}
if (matchedByUsers != null) collectByKey(matchedByUsers)
}
}
continuations.forEach { cont ->
try {
cont.resume(packet.users)
} catch (_: Exception) {}
}
}
}
fun getCachedUserName(publicKey: String): String? {
val cached = userInfoCache[publicKey] ?: return null
return cached.title.ifEmpty { cached.username }.ifEmpty { null }
}
fun getCachedUserInfo(publicKey: String): SearchUser? = userInfoCache[publicKey]
fun getCachedUserByUsername(username: String): SearchUser? {
val normalizedUsername = normalizeSearchQuery(username)
if (normalizedUsername.isEmpty()) return null
return userInfoCache.values.firstOrNull { cached ->
normalizeSearchQuery(cached.username) == normalizedUsername
}
}
suspend fun resolveUserName(publicKey: String, timeoutMs: Long = 3000): String? {
if (publicKey.isEmpty()) return null
userInfoCache[publicKey]?.let { cached ->
val name = cached.title.ifEmpty { cached.username }
if (name.isNotEmpty()) return name
}
val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return null
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont)
cont.invokeOnCancellation {
pendingResolves[publicKey]?.remove(cont)
if (pendingResolves[publicKey]?.isEmpty() == true) {
pendingResolves.remove(publicKey)
}
}
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = publicKey
}
sendSearchPacket(packet)
}
}?.let { user -> user.title.ifEmpty { user.username }.ifEmpty { null } }
} catch (_: Exception) {
pendingResolves.remove(publicKey)
null
}
}
suspend fun resolveUserInfo(publicKey: String, timeoutMs: Long = 3000): SearchUser? {
if (publicKey.isEmpty()) return null
userInfoCache[publicKey]?.let { return it }
val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return null
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont)
cont.invokeOnCancellation {
pendingResolves[publicKey]?.remove(cont)
if (pendingResolves[publicKey]?.isEmpty() == true) {
pendingResolves.remove(publicKey)
}
}
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = publicKey
}
sendSearchPacket(packet)
}
}
} catch (_: Exception) {
pendingResolves.remove(publicKey)
null
}
}
suspend fun searchUsers(query: String, timeoutMs: Long = 3000): List<SearchUser> {
val normalizedQuery = normalizeSearchQuery(query)
if (normalizedQuery.isEmpty()) return emptyList()
val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return emptyList()
val cachedMatches =
userInfoCache.values.filter { cached ->
normalizeSearchQuery(cached.username) == normalizedQuery && cached.publicKey.isNotBlank()
}
if (cachedMatches.isNotEmpty()) {
return cachedMatches.distinctBy { it.publicKey }
}
return try {
withTimeout(timeoutMs) {
suspendCancellableCoroutine { cont ->
pendingSearchQueries.getOrPut(normalizedQuery) { mutableListOf() }.add(cont)
cont.invokeOnCancellation {
pendingSearchQueries[normalizedQuery]?.remove(cont)
if (pendingSearchQueries[normalizedQuery]?.isEmpty() == true) {
pendingSearchQueries.remove(normalizedQuery)
}
}
val packet = PacketSearch().apply {
this.privateKey = privateHash
this.search = normalizedQuery
}
sendSearchPacket(packet)
}
}
} catch (_: Exception) {
pendingSearchQueries.remove(normalizedQuery)
emptyList()
}
}
}

View File

@@ -0,0 +1,96 @@
package com.rosetta.messenger.network.connection
import com.rosetta.messenger.network.PacketMessage
import java.util.concurrent.ConcurrentHashMap
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
/**
* Outgoing retry queue for PacketMessage delivery.
*
* Mirrors iOS behavior:
* - retry every 4s,
* - max 3 attempts,
* - max 80s lifetime.
*/
class RetryQueueService(
private val scope: CoroutineScope,
private val sendPacket: (PacketMessage) -> Unit,
private val isAuthenticated: () -> Boolean,
private val addLog: (String) -> Unit,
private val markOutgoingAsError: suspend (messageId: String, packet: PacketMessage) -> Unit,
private val retryIntervalMs: Long = 4_000L,
private val maxRetryAttempts: Int = 3,
private val maxLifetimeMs: Long = 80_000L
) {
private val pendingOutgoingPackets = ConcurrentHashMap<String, PacketMessage>()
private val pendingOutgoingAttempts = ConcurrentHashMap<String, Int>()
private val pendingOutgoingRetryJobs = ConcurrentHashMap<String, Job>()
fun register(packet: PacketMessage) {
val messageId = packet.messageId
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingPackets[messageId] = packet
pendingOutgoingAttempts[messageId] = 0
schedule(messageId)
}
fun resolve(messageId: String) {
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingRetryJobs.remove(messageId)
pendingOutgoingPackets.remove(messageId)
pendingOutgoingAttempts.remove(messageId)
}
fun clear() {
pendingOutgoingRetryJobs.values.forEach { it.cancel() }
pendingOutgoingRetryJobs.clear()
pendingOutgoingPackets.clear()
pendingOutgoingAttempts.clear()
}
private fun schedule(messageId: String) {
pendingOutgoingRetryJobs[messageId]?.cancel()
pendingOutgoingRetryJobs[messageId] =
scope.launch {
delay(retryIntervalMs)
val packet = pendingOutgoingPackets[messageId] ?: return@launch
val attempts = pendingOutgoingAttempts[messageId] ?: 0
val nowMs = System.currentTimeMillis()
val ageMs = nowMs - packet.timestamp
if (ageMs >= maxLifetimeMs) {
addLog(
"⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error"
)
scope.launch { markOutgoingAsError(messageId, packet) }
resolve(messageId)
return@launch
}
if (attempts >= maxRetryAttempts) {
addLog(
"⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error"
)
scope.launch { markOutgoingAsError(messageId, packet) }
resolve(messageId)
return@launch
}
if (!isAuthenticated()) {
addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated")
resolve(messageId)
return@launch
}
val nextAttempt = attempts + 1
pendingOutgoingAttempts[messageId] = nextAttempt
addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt")
sendPacket(packet)
schedule(messageId)
}
}
}

View File

@@ -0,0 +1,78 @@
package com.rosetta.messenger.session
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.data.DecryptedAccount
import com.rosetta.messenger.network.ProtocolManager
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
sealed interface SessionState {
data object LoggedOut : SessionState
data class AuthInProgress(
val publicKey: String? = null,
val reason: String = ""
) : SessionState
data class Ready(
val account: DecryptedAccount,
val reason: String = ""
) : SessionState
}
/**
* Single source of truth for app-level auth/session lifecycle.
* UI should rely on this state instead of scattering account checks.
*/
object AppSessionCoordinator {
private val _sessionState = MutableStateFlow<SessionState>(SessionState.LoggedOut)
val sessionState: StateFlow<SessionState> = _sessionState.asStateFlow()
fun markLoggedOut(reason: String = "") {
_sessionState.value = SessionState.LoggedOut
IdentityStore.markLoggedOut(reason = reason)
}
fun markAuthInProgress(publicKey: String? = null, reason: String = "") {
_sessionState.value = SessionState.AuthInProgress(publicKey = publicKey, reason = reason)
IdentityStore.markAuthInProgress(publicKey = publicKey, reason = reason)
}
fun markReady(account: DecryptedAccount, reason: String = "") {
_sessionState.value = SessionState.Ready(account = account, reason = reason)
IdentityStore.setAccount(account = account, reason = reason)
}
fun syncFromCachedAccount(account: DecryptedAccount?) {
if (account == null) {
if (_sessionState.value is SessionState.Ready) {
_sessionState.value = SessionState.LoggedOut
}
IdentityStore.markLoggedOut(reason = "cached_account_cleared")
return
}
_sessionState.value = SessionState.Ready(account = account, reason = "cached")
IdentityStore.setAccount(account = account, reason = "cached")
}
/**
* Unified bootstrap used by registration and unlock flows.
* Keeps protocol/account initialization sequence in one place.
*/
suspend fun bootstrapAuthenticatedSession(
accountManager: AccountManager,
account: DecryptedAccount,
reason: String
) {
markAuthInProgress(publicKey = account.publicKey, reason = reason)
// Initialize storage-bound account context before handshake completes
// to avoid early sync/message race conditions.
ProtocolManager.initializeAccount(account.publicKey, account.privateKey)
ProtocolManager.connect()
ProtocolManager.authenticate(account.publicKey, account.privateKeyHash)
ProtocolManager.reconnectNowIfNeeded("session_bootstrap_$reason")
accountManager.setCurrentAccount(account.publicKey)
markReady(account, reason = reason)
}
}

View File

@@ -0,0 +1,125 @@
package com.rosetta.messenger.session
import com.rosetta.messenger.data.DecryptedAccount
import com.rosetta.messenger.network.SearchUser
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
data class IdentityProfile(
val publicKey: String,
val displayName: String = "",
val username: String = "",
val verified: Int = 0,
val resolved: Boolean = false,
val updatedAtMs: Long = System.currentTimeMillis()
)
data class IdentityStateSnapshot(
val account: DecryptedAccount? = null,
val profile: IdentityProfile? = null,
val authInProgress: Boolean = false,
val pendingPublicKey: String? = null,
val reason: String = ""
) {
val ownProfileResolved: Boolean
get() {
val activeAccount = account ?: return false
val ownProfile = profile ?: return false
return ownProfile.resolved && ownProfile.publicKey.equals(activeAccount.publicKey, ignoreCase = true)
}
}
/**
* Runtime identity source of truth for account/profile resolution.
*/
object IdentityStore {
private val _state = MutableStateFlow(IdentityStateSnapshot())
val state: StateFlow<IdentityStateSnapshot> = _state.asStateFlow()
fun markLoggedOut(reason: String = "") {
_state.value =
IdentityStateSnapshot(
account = null,
profile = null,
authInProgress = false,
pendingPublicKey = null,
reason = reason
)
}
fun markAuthInProgress(publicKey: String? = null, reason: String = "") {
_state.value =
_state.value.copy(
authInProgress = true,
pendingPublicKey = publicKey?.trim().orEmpty().ifBlank { null },
reason = reason
)
}
fun setAccount(account: DecryptedAccount, reason: String = "") {
val current = _state.value
val existingProfile = current.profile
val nextProfile =
if (
existingProfile != null &&
existingProfile.publicKey.equals(account.publicKey, ignoreCase = true)
) {
existingProfile
} else {
null
}
_state.value =
current.copy(
account = account,
profile = nextProfile,
authInProgress = false,
pendingPublicKey = null,
reason = reason
)
}
fun updateOwnProfile(
publicKey: String,
displayName: String? = null,
username: String? = null,
verified: Int? = null,
resolved: Boolean = true,
reason: String = ""
) {
val normalizedPublicKey = publicKey.trim()
if (normalizedPublicKey.isBlank()) return
val current = _state.value
val base =
current.profile?.takeIf { it.publicKey.equals(normalizedPublicKey, ignoreCase = true) }
?: IdentityProfile(publicKey = normalizedPublicKey)
val nextProfile =
base.copy(
displayName = displayName?.takeIf { it.isNotBlank() } ?: base.displayName,
username = username?.takeIf { it.isNotBlank() } ?: base.username,
verified = verified ?: base.verified,
resolved = base.resolved || resolved,
updatedAtMs = System.currentTimeMillis()
)
_state.value =
current.copy(
profile = nextProfile,
reason = reason
)
}
fun updateOwnProfile(user: SearchUser, reason: String = "") {
updateOwnProfile(
publicKey = user.publicKey,
displayName = user.title,
username = user.username,
verified = user.verified,
resolved = true,
reason = reason
)
}
}

View File

@@ -8,6 +8,7 @@ import androidx.compose.ui.platform.LocalView
import androidx.core.view.WindowCompat
import com.rosetta.messenger.data.DecryptedAccount
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.session.AppSessionCoordinator
enum class AuthScreen {
SELECT_ACCOUNT,
@@ -62,6 +63,13 @@ fun AuthFlow(
var showCreateModal by remember { mutableStateOf(false) }
var isImportMode by remember { mutableStateOf(false) }
LaunchedEffect(currentScreen, selectedAccountId) {
AppSessionCoordinator.markAuthInProgress(
publicKey = selectedAccountId,
reason = "auth_flow_${currentScreen.name.lowercase()}"
)
}
// If parent requests create mode while AuthFlow is alive, jump to Welcome/Create path.
LaunchedEffect(startInCreateMode) {
if (startInCreateMode && currentScreen == AuthScreen.UNLOCK) {

View File

@@ -29,7 +29,7 @@ import com.rosetta.messenger.crypto.CryptoManager
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.data.DecryptedAccount
import com.rosetta.messenger.data.EncryptedAccount
import com.rosetta.messenger.network.ProtocolManager
import com.rosetta.messenger.session.AppSessionCoordinator
import com.rosetta.messenger.ui.onboarding.PrimaryBlue
import kotlinx.coroutines.launch
@@ -309,11 +309,6 @@ fun SetPasswordScreen(
)
accountManager.saveAccount(account)
val privateKeyHash = CryptoManager.generatePrivateKeyHash(keyPair.privateKey)
// Initialize repository/account context before handshake completes to avoid
// "Sync postponed until account is initialized" race on first login.
ProtocolManager.initializeAccount(keyPair.publicKey, keyPair.privateKey)
startAuthHandshakeFast(keyPair.publicKey, privateKeyHash)
accountManager.setCurrentAccount(keyPair.publicKey)
val decryptedAccount = DecryptedAccount(
publicKey = keyPair.publicKey,
privateKey = keyPair.privateKey,
@@ -321,6 +316,11 @@ fun SetPasswordScreen(
privateKeyHash = privateKeyHash,
name = truncatedKey
)
AppSessionCoordinator.bootstrapAuthenticatedSession(
accountManager = accountManager,
account = decryptedAccount,
reason = "set_password"
)
onAccountCreated(decryptedAccount)
} catch (e: Exception) {
error = "Failed to create account: ${e.message}"

View File

@@ -45,8 +45,8 @@ import com.rosetta.messenger.data.DecryptedAccount
import com.rosetta.messenger.data.EncryptedAccount
import com.rosetta.messenger.data.resolveAccountDisplayName
import com.rosetta.messenger.database.RosettaDatabase
import com.rosetta.messenger.network.ProtocolManager
import com.rosetta.messenger.repository.AvatarRepository
import com.rosetta.messenger.session.AppSessionCoordinator
import com.rosetta.messenger.ui.components.AvatarImage
import com.rosetta.messenger.ui.chats.getAvatarColor
import com.rosetta.messenger.ui.chats.getAvatarText
@@ -117,12 +117,11 @@ val decryptedPrivateKey = CryptoManager.decryptWithPassword(
name = selectedAccount.name
)
// Initialize repository/account context before handshake completes to avoid
// "Sync postponed until account is initialized" race.
ProtocolManager.initializeAccount(account.publicKey, decryptedPrivateKey)
startAuthHandshakeFast(account.publicKey, privateKeyHash)
accountManager.setCurrentAccount(account.publicKey)
AppSessionCoordinator.bootstrapAuthenticatedSession(
accountManager = accountManager,
account = decryptedAccount,
reason = "unlock"
)
onSuccess(decryptedAccount)
} catch (e: Exception) {
onError("Failed to unlock: ${e.message}")

View File

@@ -18,6 +18,12 @@ import com.rosetta.messenger.network.*
import com.rosetta.messenger.ui.components.metaball.DevicePerformanceClass
import com.rosetta.messenger.ui.components.metaball.PerformanceClass
import com.rosetta.messenger.ui.chats.models.*
import com.rosetta.messenger.ui.chats.usecase.ForwardPayloadMessage
import com.rosetta.messenger.ui.chats.usecase.SendForwardUseCase
import com.rosetta.messenger.ui.chats.usecase.SendMediaMessageCommand
import com.rosetta.messenger.ui.chats.usecase.SendMediaMessageUseCase
import com.rosetta.messenger.ui.chats.usecase.SendTextMessageCommand
import com.rosetta.messenger.ui.chats.usecase.SendTextMessageUseCase
import com.rosetta.messenger.utils.AttachmentFileManager
import com.rosetta.messenger.utils.MessageLogger
import com.rosetta.messenger.utils.MessageThrottleManager
@@ -119,6 +125,12 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
// MessageRepository для подписки на события новых сообщений
private val messageRepository =
com.rosetta.messenger.data.MessageRepository.getInstance(application)
private val sendTextMessageUseCase =
SendTextMessageUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) })
private val sendMediaMessageUseCase =
SendMediaMessageUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) })
private val sendForwardUseCase =
SendForwardUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) })
// 🔥 Кэш расшифрованных сообщений (messageId -> plainText)
private val decryptionCache = ConcurrentHashMap<String, String>()
@@ -3987,26 +3999,24 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
}
val packet =
PacketMessage().apply {
fromPublicKey = command.senderPublicKey
toPublicKey = command.recipientPublicKey
content = encryptedContent
chachaKey = encryptedKey
this.aesChachaKey = aesChachaKey
this.timestamp = timestamp
this.privateKey = privateKeyHash
this.messageId = messageId
attachments = messageAttachments
}
sendTextMessageUseCase(
SendTextMessageCommand(
fromPublicKey = command.senderPublicKey,
toPublicKey = command.recipientPublicKey,
encryptedContent = encryptedContent,
encryptedKey = encryptedKey,
aesChachaKey = aesChachaKey,
privateKeyHash = privateKeyHash,
messageId = messageId,
timestamp = timestamp,
attachments = messageAttachments,
isSavedMessages = isSavedMessages
)
)
// 🔥 DEBUG: Log packet before sending
packet.attachments.forEachIndexed { idx, att -> }
// 📁 Для Saved Messages - НЕ отправляем пакет на сервер
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
withContext(Dispatchers.Main) {
if (isSavedMessages) {
updateMessageStatus(messageId, MessageStatus.SENT)
@@ -4167,74 +4177,30 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
encryptionContext.plainKeyAndNonce
?.joinToString("") { "%02x".format(it) }
.orEmpty()
val forwardPayloadMessages =
forwardMessages.map { fm ->
ForwardPayloadMessage(
messageId = fm.messageId,
senderPublicKey = fm.senderPublicKey,
senderName = fm.senderName,
text = fm.text,
timestamp = fm.timestamp,
chachaKeyPlain = fm.chachaKeyPlain,
attachments = fm.attachments
)
}
fun buildForwardReplyJson(
includeLocalUri: Boolean
): JSONArray {
val replyJsonArray = JSONArray()
forwardMessages.forEach { fm ->
val attachmentsArray = JSONArray()
fm.attachments.forEach { att ->
val fwdInfo =
forwardedAttMap[
forwardAttachmentRewriteKey(
fm.messageId,
att.id
return sendForwardUseCase.buildForwardReplyJson(
messages = forwardPayloadMessages,
rewrittenAttachments = forwardedAttMap,
rewrittenMessageIds = rewrittenForwardMessageIds,
outgoingForwardPlainKeyHex = outgoingForwardPlainKeyHex,
includeLocalUri = includeLocalUri,
rewriteKey = ::forwardAttachmentRewriteKey
)
]
val attId = fwdInfo?.id ?: att.id
val attPreview = fwdInfo?.preview ?: att.preview
val attTransportTag = fwdInfo?.transportTag ?: att.transportTag
val attTransportServer =
fwdInfo?.transportServer ?: att.transportServer
val attLocalUri = if (includeLocalUri) fwdInfo?.localUri ?: att.localUri else ""
attachmentsArray.put(
JSONObject().apply {
put("id", attId)
put("type", att.type.value)
put("preview", attPreview)
put("width", att.width)
put("height", att.height)
put("blob", "")
put("transportTag", attTransportTag)
put("transportServer", attTransportServer)
put(
"transport",
JSONObject().apply {
put("transport_tag", attTransportTag)
put("transport_server", attTransportServer)
}
)
if (includeLocalUri && attLocalUri.isNotEmpty()) {
put("localUri", attLocalUri)
}
}
)
}
val effectiveForwardPlainKey =
if (fm.messageId in rewrittenForwardMessageIds &&
outgoingForwardPlainKeyHex.isNotEmpty()
) {
outgoingForwardPlainKeyHex
} else {
fm.chachaKeyPlain
}
replyJsonArray.put(
JSONObject().apply {
put("message_id", fm.messageId)
put("publicKey", fm.senderPublicKey)
put("message", fm.text)
put("timestamp", fm.timestamp)
put("attachments", attachmentsArray)
put("forwarded", true)
put("senderName", fm.senderName)
if (effectiveForwardPlainKey.isNotEmpty()) {
put("chacha_key_plain", effectiveForwardPlainKey)
}
}
)
}
return replyJsonArray
}
// 1) 🚀 Optimistic forward: мгновенно показываем сообщение в текущем диалоге
@@ -4335,11 +4301,9 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
val finalMessageAttachments =
listOf(
MessageAttachment(
id = replyAttachmentId,
blob = encryptedReplyBlob,
type = AttachmentType.MESSAGES,
preview = ""
sendForwardUseCase.buildForwardAttachment(
replyAttachmentId = replyAttachmentId,
encryptedReplyBlob = encryptedReplyBlob
)
)
@@ -4355,9 +4319,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
this.messageId = messageId
attachments = finalMessageAttachments
}
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendForwardUseCase.dispatch(packet, isSavedMessages)
val finalAttachmentsJson =
JSONArray()
@@ -4720,8 +4682,8 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
}
// Отправляем пакет
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
packetSentToProtocol = true
logPhotoPipeline(messageId, "packet sent to protocol")
} else {
@@ -4920,9 +4882,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
}
// Отправляем пакет (без blob!)
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
// 💾 Сохраняем изображение в файл локально (как в desktop)
AttachmentFileManager.saveAttachment(
@@ -5252,9 +5212,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
attachments = networkAttachments
}
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
updateMessageStatusAndAttachmentsInDb(
messageId = messageId,
@@ -5474,8 +5432,8 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
// Для Saved Messages не отправляем на сервер
val isSavedMessages = (sender == recipient)
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
logPhotoPipeline(messageId, "group packet sent")
}
@@ -5656,9 +5614,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
}
// Отправляем пакет (без blob!)
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
// ⚠️ НЕ сохраняем файл локально - они слишком большие
// Файлы загружаются с Transport Server при необходимости
@@ -6009,21 +5965,21 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
transportServer = attachmentTransportServer
)
val packet =
PacketMessage().apply {
fromPublicKey = sender
toPublicKey = recipient
content = encryptedContent
chachaKey = encryptedKey
this.aesChachaKey = aesChachaKey
this.timestamp = timestamp
this.privateKey = privateKeyHash
this.messageId = messageId
attachments = listOf(videoAttachment)
}
sendMediaMessageUseCase(
SendMediaMessageCommand(
fromPublicKey = sender,
toPublicKey = recipient,
encryptedContent = encryptedContent,
encryptedKey = encryptedKey,
aesChachaKey = aesChachaKey,
privateKeyHash = privateKeyHash,
messageId = messageId,
timestamp = timestamp,
mediaAttachments = listOf(videoAttachment),
isSavedMessages = isSavedMessages
)
)
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
packetSentToProtocol = true
}
@@ -6202,9 +6158,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
attachments = listOf(voiceAttachment)
}
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
// Для отправителя сохраняем voice blob локально в encrypted cache.
runCatching {
@@ -6456,9 +6410,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
attachments = listOf(avatarAttachment)
}
if (!isSavedMessages) {
ProtocolManager.sendMessageWithRetry(packet)
}
sendMediaMessageUseCase.dispatch(packet, isSavedMessages)
// 💾 Сохраняем аватар в файл локально (как IMAGE - с приватным ключом)
AttachmentFileManager.saveAttachment(

View File

@@ -1855,10 +1855,10 @@ fun ChatsListScreen(
)
} else if (syncInProgress) {
AnimatedDotsText(
baseText = "Updating",
color = Color.White,
baseText = "Synchronizing",
fontWeight = FontWeight.Bold,
fontSize = 20.sp,
fontWeight = FontWeight.Bold
color = Color.White
)
} else {
Text(
@@ -5037,7 +5037,7 @@ fun TypingIndicatorSmall(
val typingColor = if (isDarkTheme) PrimaryBlue else Color(0xFF34C759)
val senderTypingColor =
remember(typingSenderPublicKey, isDarkTheme) {
if (typingSenderPublicKey.isBlank()) {
if (typingSenderPublicKey.isBlank() || !isDarkTheme) {
typingColor
} else {
getAvatarColor(typingSenderPublicKey, isDarkTheme).textColor

View File

@@ -61,7 +61,7 @@ fun RequestsListScreen(
},
title = {
Text(
text = if (syncInProgress) "Updating..." else "Requests",
text = if (syncInProgress) "Synchronizing..." else "Requests",
fontWeight = FontWeight.Bold,
fontSize = 20.sp,
color = Color.White

View File

@@ -13,6 +13,7 @@ import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
/**
@@ -39,37 +40,33 @@ class SearchUsersViewModel : ViewModel() {
// Приватные переменные
private var searchJob: Job? = null
private var packetFlowJob: Job? = null
private var privateKeyHash: String = ""
private val timeFormatter = SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault())
// Callback для обработки ответа поиска
private val searchPacketHandler: (com.rosetta.messenger.network.Packet) -> Unit = handler@{ packet ->
if (packet is PacketSearch) {
init {
packetFlowJob =
viewModelScope.launch {
ProtocolManager.packetFlow(0x03).collectLatest { packet ->
val searchPacket = packet as? PacketSearch ?: return@collectLatest
logSearch(
"📥 PacketSearch response: search='${packet.search}', users=${packet.users.size}"
"📥 PacketSearch response: search='${searchPacket.search}', users=${searchPacket.users.size}"
)
// Desktop parity: любой ответ PacketSearch обновляет результаты
// пока в поле есть активный поисковый запрос.
if (_searchQuery.value.trim().isEmpty()) {
logSearch("⏭ Ignored response: query is empty")
return@handler
return@collectLatest
}
_searchResults.value = packet.users
_searchResults.value = searchPacket.users
_isSearching.value = false
logSearch("✅ Results updated")
}
}
init {
// Регистрируем обработчик пакетов поиска
ProtocolManager.waitPacket(0x03, searchPacketHandler)
}
override fun onCleared() {
super.onCleared()
// Отписываемся от пакетов при уничтожении ViewModel
ProtocolManager.unwaitPacket(0x03, searchPacketHandler)
searchJob?.cancel()
packetFlowJob?.cancel()
}
/**

View File

@@ -237,7 +237,7 @@ fun TypingIndicator(
if (isDarkTheme) Color(0xFF54A9EB) else Color.White.copy(alpha = 0.7f)
val senderTypingColor =
remember(typingSenderPublicKey, isDarkTheme) {
if (typingSenderPublicKey.isBlank()) {
if (typingSenderPublicKey.isBlank() || !isDarkTheme) {
typingColor
} else {
groupSenderLabelColor(typingSenderPublicKey, isDarkTheme)

View File

@@ -0,0 +1,102 @@
package com.rosetta.messenger.ui.chats.usecase
import com.rosetta.messenger.network.AttachmentType
import com.rosetta.messenger.network.MessageAttachment
import com.rosetta.messenger.network.PacketMessage
import org.json.JSONArray
import org.json.JSONObject
data class ForwardPayloadMessage(
val messageId: String,
val senderPublicKey: String,
val senderName: String,
val text: String,
val timestamp: Long,
val chachaKeyPlain: String,
val attachments: List<MessageAttachment>
)
class SendForwardUseCase(
private val sendWithRetry: (PacketMessage) -> Unit
) {
fun buildForwardReplyJson(
messages: List<ForwardPayloadMessage>,
rewrittenAttachments: Map<String, MessageAttachment>,
rewrittenMessageIds: Set<String>,
outgoingForwardPlainKeyHex: String,
includeLocalUri: Boolean,
rewriteKey: (messageId: String, attachmentId: String) -> String
): JSONArray {
val replyJsonArray = JSONArray()
messages.forEach { message ->
val attachmentsArray = JSONArray()
message.attachments.forEach { attachment ->
val rewritten =
rewrittenAttachments[rewriteKey(message.messageId, attachment.id)]
val effectiveAttachment = rewritten ?: attachment
attachmentsArray.put(
JSONObject().apply {
put("id", effectiveAttachment.id)
put("type", effectiveAttachment.type.value)
put("preview", effectiveAttachment.preview)
put("width", effectiveAttachment.width)
put("height", effectiveAttachment.height)
put("blob", "")
put("transportTag", effectiveAttachment.transportTag)
put("transportServer", effectiveAttachment.transportServer)
put(
"transport",
JSONObject().apply {
put("transport_tag", effectiveAttachment.transportTag)
put("transport_server", effectiveAttachment.transportServer)
}
)
if (includeLocalUri && effectiveAttachment.localUri.isNotEmpty()) {
put("localUri", effectiveAttachment.localUri)
}
}
)
}
val effectiveForwardPlainKey =
if (message.messageId in rewrittenMessageIds && outgoingForwardPlainKeyHex.isNotEmpty()) {
outgoingForwardPlainKeyHex
} else {
message.chachaKeyPlain
}
replyJsonArray.put(
JSONObject().apply {
put("message_id", message.messageId)
put("publicKey", message.senderPublicKey)
put("message", message.text)
put("timestamp", message.timestamp)
put("attachments", attachmentsArray)
put("forwarded", true)
put("senderName", message.senderName)
if (effectiveForwardPlainKey.isNotEmpty()) {
put("chacha_key_plain", effectiveForwardPlainKey)
}
}
)
}
return replyJsonArray
}
fun buildForwardAttachment(
replyAttachmentId: String,
encryptedReplyBlob: String
): MessageAttachment =
MessageAttachment(
id = replyAttachmentId,
blob = encryptedReplyBlob,
type = AttachmentType.MESSAGES,
preview = ""
)
fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) {
if (!isSavedMessages) {
sendWithRetry(packet)
}
}
}

View File

@@ -0,0 +1,47 @@
package com.rosetta.messenger.ui.chats.usecase
import com.rosetta.messenger.network.MessageAttachment
import com.rosetta.messenger.network.PacketMessage
data class SendMediaMessageCommand(
val fromPublicKey: String,
val toPublicKey: String,
val encryptedContent: String,
val encryptedKey: String,
val aesChachaKey: String,
val privateKeyHash: String,
val messageId: String,
val timestamp: Long,
val mediaAttachments: List<MessageAttachment>,
val isSavedMessages: Boolean
)
class SendMediaMessageUseCase(
private val sendWithRetry: (PacketMessage) -> Unit
) {
operator fun invoke(command: SendMediaMessageCommand): PacketMessage {
val packet =
PacketMessage().apply {
fromPublicKey = command.fromPublicKey
toPublicKey = command.toPublicKey
content = command.encryptedContent
chachaKey = command.encryptedKey
aesChachaKey = command.aesChachaKey
privateKey = command.privateKeyHash
messageId = command.messageId
timestamp = command.timestamp
attachments = command.mediaAttachments
}
if (!command.isSavedMessages) {
sendWithRetry(packet)
}
return packet
}
fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) {
if (!isSavedMessages) {
sendWithRetry(packet)
}
}
}

View File

@@ -0,0 +1,47 @@
package com.rosetta.messenger.ui.chats.usecase
import com.rosetta.messenger.network.MessageAttachment
import com.rosetta.messenger.network.PacketMessage
data class SendTextMessageCommand(
val fromPublicKey: String,
val toPublicKey: String,
val encryptedContent: String,
val encryptedKey: String,
val aesChachaKey: String,
val privateKeyHash: String,
val messageId: String,
val timestamp: Long,
val attachments: List<MessageAttachment> = emptyList(),
val isSavedMessages: Boolean
)
class SendTextMessageUseCase(
private val sendWithRetry: (PacketMessage) -> Unit
) {
operator fun invoke(command: SendTextMessageCommand): PacketMessage {
val packet =
PacketMessage().apply {
fromPublicKey = command.fromPublicKey
toPublicKey = command.toPublicKey
content = command.encryptedContent
chachaKey = command.encryptedKey
aesChachaKey = command.aesChachaKey
privateKey = command.privateKeyHash
messageId = command.messageId
timestamp = command.timestamp
attachments = command.attachments
}
if (!command.isSavedMessages) {
sendWithRetry(packet)
}
return packet
}
fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) {
if (!isSavedMessages) {
sendWithRetry(packet)
}
}
}

View File

@@ -5,12 +5,12 @@ import androidx.lifecycle.AndroidViewModel
import androidx.lifecycle.viewModelScope
import com.rosetta.messenger.data.AccountManager
import com.rosetta.messenger.data.EncryptedAccount
import com.rosetta.messenger.network.Packet
import com.rosetta.messenger.network.PacketResult
import com.rosetta.messenger.network.PacketUserInfo
import com.rosetta.messenger.network.ProtocolManager
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.collectLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.delay
@@ -29,22 +29,21 @@ class ProfileViewModel(application: Application) : AndroidViewModel(application)
private val _state = MutableStateFlow(ProfileState())
val state: StateFlow<ProfileState> = _state
private var resultCallback: ((Packet) -> Unit)? = null
private var packetFlowJob: kotlinx.coroutines.Job? = null
init {
// Register listener for PacketResult (0x02)
resultCallback = { packet ->
if (packet is PacketResult) {
handlePacketResult(packet)
packetFlowJob =
viewModelScope.launch {
ProtocolManager.packetFlow(0x02).collectLatest { packet ->
val result = packet as? PacketResult ?: return@collectLatest
handlePacketResult(result)
}
}
ProtocolManager.waitPacket(0x02, resultCallback!!)
}
override fun onCleared() {
super.onCleared()
// Unregister listener
resultCallback?.let { ProtocolManager.unwaitPacket(0x02, it) }
packetFlowJob?.cancel()
}
private fun addLog(message: String) {