diff --git a/app/src/main/java/com/rosetta/messenger/MainActivity.kt b/app/src/main/java/com/rosetta/messenger/MainActivity.kt index 8c0963e..bd480bf 100644 --- a/app/src/main/java/com/rosetta/messenger/MainActivity.kt +++ b/app/src/main/java/com/rosetta/messenger/MainActivity.kt @@ -59,10 +59,12 @@ import com.rosetta.messenger.network.ProtocolManager import com.rosetta.messenger.network.ProtocolState import com.rosetta.messenger.network.SearchUser import com.rosetta.messenger.repository.AvatarRepository +import com.rosetta.messenger.session.AppSessionCoordinator +import com.rosetta.messenger.session.IdentityStore +import com.rosetta.messenger.session.SessionState import com.rosetta.messenger.ui.auth.AccountInfo import com.rosetta.messenger.ui.auth.AuthFlow import com.rosetta.messenger.ui.auth.DeviceConfirmScreen -import com.rosetta.messenger.ui.auth.startAuthHandshakeFast import com.rosetta.messenger.ui.chats.ChatDetailScreen import com.rosetta.messenger.ui.chats.ChatsListScreen import com.rosetta.messenger.ui.chats.VoiceTopMiniPlayer @@ -240,6 +242,8 @@ class MainActivity : FragmentActivity() { var showOnboarding by remember { mutableStateOf(true) } var hasExistingAccount by remember { mutableStateOf(null) } var currentAccount by remember { mutableStateOf(getCachedSessionAccount()) } + val identityState by IdentityStore.state.collectAsState() + val sessionState by AppSessionCoordinator.sessionState.collectAsState() var accountInfoList by remember { mutableStateOf>(emptyList()) } var startCreateAccountFlow by remember { mutableStateOf(false) } var preservedMainNavStack by remember { mutableStateOf>(emptyList()) } @@ -247,6 +251,7 @@ class MainActivity : FragmentActivity() { // Check for existing accounts and build AccountInfo list LaunchedEffect(Unit) { + AppSessionCoordinator.syncFromCachedAccount(currentAccount) val accounts = accountManager.getAllAccounts() hasExistingAccount = accounts.isNotEmpty() val infos = accounts.map { it.toAccountInfo() } @@ -272,6 +277,38 @@ class MainActivity : FragmentActivity() { } } + LaunchedEffect(identityState.account) { + val identityAccount = identityState.account ?: return@LaunchedEffect + if ( + identityAccount.publicKey != currentAccount?.publicKey || + identityAccount.name != currentAccount?.name + ) { + currentAccount = identityAccount + cacheSessionAccount(identityAccount) + } + } + + LaunchedEffect(sessionState) { + val readyAccount = (sessionState as? SessionState.Ready)?.account ?: return@LaunchedEffect + if ( + currentAccount?.publicKey != readyAccount.publicKey || + currentAccount?.name != readyAccount.name + ) { + currentAccount = readyAccount + cacheSessionAccount(readyAccount) + } + } + + LaunchedEffect(currentAccount, isLoggedIn) { + val account = currentAccount + when { + account != null -> AppSessionCoordinator.markReady(account, reason = "main_activity_state") + isLoggedIn == true -> + AppSessionCoordinator.markAuthInProgress(reason = "main_activity_logged_in_no_account") + isLoggedIn == false -> AppSessionCoordinator.markLoggedOut(reason = "main_activity_logged_out") + } + } + // Wait for initial load if (hasExistingAccount == null) { Box( @@ -353,24 +390,30 @@ class MainActivity : FragmentActivity() { // При открытии по звонку с lock screen — пропускаем auth openedForCall && hasExistingAccount == true -> "main" - // First-registration race: DataStore may flip isLoggedIn=true - // before AuthFlow returns DecryptedAccount to currentAccount. - // Do not enter MainScreen with null account (it leads to - // empty keys/UI placeholders until relog). - isLoggedIn == true && currentAccount == null && + // User explicitly entered account creation flow: + // keep auth screen even if intermediate session flags + // briefly oscillate during DataStore updates. + startCreateAccountFlow -> "auth_new" + sessionState is SessionState.LoggedOut && hasExistingAccount == false -> "auth_new" - isLoggedIn != true && hasExistingAccount == false -> - "auth_new" - isLoggedIn != true && hasExistingAccount == true -> + sessionState is SessionState.LoggedOut && + hasExistingAccount == true -> "auth_unlock" - isLoggedIn == true && - currentAccount == null && + // First-registration race: DataStore may flip + // logged-in marker before currentAccount is ready. + // Keep user in create auth flow instead of jumping + // to main with incomplete session context. + sessionState is SessionState.AuthInProgress && + hasExistingAccount == false -> + "auth_new" + sessionState is SessionState.AuthInProgress && hasExistingAccount == true -> "auth_unlock" protocolState == ProtocolState.DEVICE_VERIFICATION_REQUIRED -> "device_confirm" + sessionState is SessionState.Ready -> "main" else -> "main" }, transitionSpec = { @@ -437,52 +480,19 @@ class MainActivity : FragmentActivity() { } currentAccount = normalizedAccount cacheSessionAccount(normalizedAccount) - hasExistingAccount = true - // Save as last logged account normalizedAccount?.let { - accountManager.setLastLoggedPublicKey(it.publicKey) - // Initialize protocol/message account context - // immediately after auth completion to avoid - // packet processing race before MainScreen - // composition. - ProtocolManager.initializeAccount( - it.publicKey, - it.privateKey + AppSessionCoordinator.markReady( + account = it, + reason = "auth_complete" ) - } - - // Первый запуск после регистрации: - // дополнительно перезапускаем auth/connect, чтобы не оставаться - // в "залипшем CONNECTING" до ручного рестарта приложения. - normalizedAccount?.let { authAccount -> - startAuthHandshakeFast( - authAccount.publicKey, - authAccount.privateKeyHash - ) - scope.launch { - repeat(3) { attempt -> - if (ProtocolManager.isAuthenticated()) return@launch - delay(2000L * (attempt + 1)) - if (ProtocolManager.isAuthenticated()) return@launch - ProtocolManager.reconnectNowIfNeeded( - "post_auth_complete_retry_${attempt + 1}" - ) - startAuthHandshakeFast( - authAccount.publicKey, - authAccount.privateKeyHash - ) - } - } - } + } ?: AppSessionCoordinator.markAuthInProgress( + reason = "auth_complete_no_account" + ) + hasExistingAccount = true // Reload accounts list scope.launch { normalizedAccount?.let { - // Синхронно помечаем текущий аккаунт активным в DataStore. - runCatching { - accountManager.setCurrentAccount(it.publicKey) - } - // Force-refresh account title from persisted // profile (name/username) to avoid temporary // public-key alias in UI after login. @@ -516,6 +526,9 @@ class MainActivity : FragmentActivity() { // lag currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "auth_flow_logout" + ) com.rosetta.messenger.network.ProtocolManager .disconnect() scope.launch { @@ -525,9 +538,12 @@ class MainActivity : FragmentActivity() { ) } "main" -> { - val activeAccountKey = currentAccount?.publicKey.orEmpty() + val resolvedAccount = + currentAccount + ?: (sessionState as? SessionState.Ready)?.account + val activeAccountKey = resolvedAccount?.publicKey.orEmpty() MainScreen( - account = currentAccount, + account = resolvedAccount, initialNavStack = if ( activeAccountKey.isNotBlank() && @@ -565,6 +581,9 @@ class MainActivity : FragmentActivity() { // lag currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "main_logout" + ) com.rosetta.messenger.network.ProtocolManager .disconnect() scope.launch { @@ -600,6 +619,9 @@ class MainActivity : FragmentActivity() { // 8. Navigate away last currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "delete_current_account" + ) } catch (e: Exception) { android.util.Log.e("DeleteAccount", "Failed to delete account", e) } @@ -641,6 +663,9 @@ class MainActivity : FragmentActivity() { preservedMainNavAccountKey = "" currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "delete_sidebar_current_account" + ) } } catch (e: Exception) { android.util.Log.e("DeleteAccount", "Failed to delete account from sidebar", e) @@ -658,6 +683,9 @@ class MainActivity : FragmentActivity() { // Switch to another account: logout current, then show unlock. currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "switch_account" + ) com.rosetta.messenger.network.ProtocolManager.disconnect() scope.launch { accountManager.logout() @@ -669,6 +697,9 @@ class MainActivity : FragmentActivity() { preservedMainNavAccountKey = "" currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "add_account" + ) com.rosetta.messenger.network.ProtocolManager.disconnect() scope.launch { accountManager.logout() @@ -684,6 +715,9 @@ class MainActivity : FragmentActivity() { preservedMainNavAccountKey = "" currentAccount = null clearCachedSessionAccount() + AppSessionCoordinator.markLoggedOut( + reason = "device_confirm_exit" + ) ProtocolManager.disconnect() scope.launch { accountManager.logout() @@ -921,6 +955,7 @@ fun MainScreen( // Following desktop version pattern: username is stored locally and loaded on app start var accountUsername by remember { mutableStateOf("") } var accountVerified by remember(accountPublicKey) { mutableIntStateOf(0) } + val identitySnapshot by IdentityStore.state.collectAsState() var reloadTrigger by remember { mutableIntStateOf(0) } // Load username AND name from AccountManager (persisted in DataStore) @@ -1164,12 +1199,22 @@ fun MainScreen( suspend fun refreshAccountIdentityState(accountKey: String) { val accountManager = AccountManager(context) val encryptedAccount = accountManager.getAccount(accountKey) + val identityOwn = + identitySnapshot.profile?.takeIf { + it.publicKey.equals(accountKey, ignoreCase = true) + } val cachedOwn = ProtocolManager.getCachedUserInfo(accountKey) val persistedName = encryptedAccount?.name?.trim().orEmpty() val persistedUsername = encryptedAccount?.username?.trim().orEmpty() - val cachedName = cachedOwn?.title?.trim().orEmpty() - val cachedUsername = cachedOwn?.username?.trim().orEmpty() + val cachedName = + identityOwn?.displayName?.trim().orEmpty().ifBlank { + cachedOwn?.title?.trim().orEmpty() + } + val cachedUsername = + identityOwn?.username?.trim().orEmpty().ifBlank { + cachedOwn?.username?.trim().orEmpty() + } if (cachedName.isNotBlank() && !isPlaceholderAccountName(cachedName) && @@ -1190,8 +1235,16 @@ fun MainScreen( } accountUsername = finalUsername - accountVerified = cachedOwn?.verified ?: 0 + accountVerified = identityOwn?.verified ?: cachedOwn?.verified ?: 0 accountName = resolveAccountDisplayName(accountKey, preferredName, finalUsername) + IdentityStore.updateOwnProfile( + publicKey = accountKey, + displayName = accountName, + username = accountUsername, + verified = accountVerified, + resolved = true, + reason = "main_screen_refresh" + ) } LaunchedEffect(accountPublicKey, reloadTrigger) { @@ -1202,6 +1255,21 @@ fun MainScreen( } } + LaunchedEffect(identitySnapshot.profile, accountPublicKey) { + val profile = identitySnapshot.profile ?: return@LaunchedEffect + if (!profile.publicKey.equals(accountPublicKey, ignoreCase = true)) return@LaunchedEffect + + val normalizedName = + resolveAccountDisplayName( + accountPublicKey, + profile.displayName.ifBlank { accountName }, + profile.username.ifBlank { null } + ) + accountName = normalizedName + accountUsername = profile.username + accountVerified = profile.verified + } + // Состояние протокола для передачи в SearchScreen val protocolState by ProtocolManager.state.collectAsState() diff --git a/app/src/main/java/com/rosetta/messenger/network/PacketSubscriptionRegistry.kt b/app/src/main/java/com/rosetta/messenger/network/PacketSubscriptionRegistry.kt new file mode 100644 index 0000000..14e8bcc --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/PacketSubscriptionRegistry.kt @@ -0,0 +1,108 @@ +package com.rosetta.messenger.network + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.CopyOnWriteArrayList +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow +import kotlinx.coroutines.flow.asSharedFlow +import kotlinx.coroutines.launch + +/** + * Centralized packet subscription registry. + * + * Guarantees exactly one low-level Protocol.waitPacket subscription per packet id + * and fans out packets to: + * 1) legacy callback listeners (waitPacket/unwaitPacket API), + * 2) SharedFlow collectors in network/UI layers. + */ +class PacketSubscriptionRegistry( + private val protocolProvider: () -> Protocol, + private val scope: CoroutineScope, + private val addLog: (String) -> Unit +) { + + private data class PacketBus( + val packetId: Int, + val callbacks: CopyOnWriteArrayList<(Packet) -> Unit>, + val sharedFlow: MutableSharedFlow, + val protocolBridge: (Packet) -> Unit + ) + + private val buses = ConcurrentHashMap() + + private fun ensureBus(packetId: Int): PacketBus { + buses[packetId]?.let { return it } + + val callbacks = CopyOnWriteArrayList<(Packet) -> Unit>() + val sharedFlow = + MutableSharedFlow( + replay = 0, + extraBufferCapacity = 128, + onBufferOverflow = BufferOverflow.DROP_OLDEST + ) + val bridge: (Packet) -> Unit = { packet -> + if (!sharedFlow.tryEmit(packet)) { + scope.launch { sharedFlow.emit(packet) } + } + callbacks.forEach { callback -> + runCatching { callback(packet) } + .onFailure { error -> + addLog("❌ PacketSubscriptionRegistry callback error: ${error.message}") + } + } + } + + val created = + PacketBus( + packetId = packetId, + callbacks = callbacks, + sharedFlow = sharedFlow, + protocolBridge = bridge + ) + + val existing = buses.putIfAbsent(packetId, created) + if (existing == null) { + protocolProvider().waitPacket(packetId, bridge) + addLog( + "🧭 PacketSubscriptionRegistry attached id=0x${packetId.toString(16).uppercase()}" + ) + return created + } + return existing + } + + fun flow(packetId: Int): SharedFlow = ensureBus(packetId).sharedFlow.asSharedFlow() + + fun addCallback(packetId: Int, callback: (Packet) -> Unit) { + val bus = ensureBus(packetId) + if (bus.callbacks.contains(callback)) { + addLog( + "📝 registry waitPacket(0x${packetId.toString(16)}) skipped duplicate callback; callbacks=${bus.callbacks.size}" + ) + return + } + bus.callbacks.add(callback) + addLog( + "📝 registry waitPacket(0x${packetId.toString(16)}) callback registered; callbacks=${bus.callbacks.size}" + ) + } + + fun removeCallback(packetId: Int, callback: (Packet) -> Unit) { + val bus = buses[packetId] ?: return + bus.callbacks.remove(callback) + addLog( + "📝 registry unwaitPacket(0x${packetId.toString(16)}) callback removed; callbacks=${bus.callbacks.size}" + ) + } + + fun destroy() { + buses.forEach { (packetId, bus) -> + runCatching { + protocolProvider().unwaitPacket(packetId, bus.protocolBridge) + } + } + buses.clear() + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt index 9da7556..67a8f95 100644 --- a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt +++ b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt @@ -187,6 +187,7 @@ class Protocol( private var lastStateChangeTime = System.currentTimeMillis() private var lastSuccessfulConnection = 0L private var reconnectJob: Job? = null // Для отмены запланированных переподключений + private var connectingTimeoutJob: Job? = null private var isConnecting = false // Флаг для защиты от одновременных подключений private var connectingSinceMs = 0L @@ -229,6 +230,7 @@ class Protocol( val responseCode: Int?, val responseMessage: String? ) : SessionEvent + data class ConnectingTimeout(val generation: Long) : SessionEvent } private val sessionEvents = Channel(Channel.UNLIMITED) @@ -272,6 +274,7 @@ class Protocol( is SessionEvent.SocketOpened -> handleSocketOpened(event) is SessionEvent.SocketClosed -> handleSocketClosed(event) is SessionEvent.SocketFailure -> handleSocketFailure(event) + is SessionEvent.ConnectingTimeout -> handleConnectingTimeout(event.generation) } } catch (e: CancellationException) { throw e @@ -323,6 +326,23 @@ class Protocol( } } + private fun cancelConnectingTimeout(reason: String) { + if (connectingTimeoutJob != null) { + log("⏱️ CONNECTING watchdog disarmed ($reason)") + } + connectingTimeoutJob?.cancel() + connectingTimeoutJob = null + } + + private fun armConnectingTimeout(generation: Long) { + cancelConnectingTimeout(reason = "re-arm") + connectingTimeoutJob = scope.launch { + delay(CONNECTING_STUCK_TIMEOUT_MS) + enqueueSessionEvent(SessionEvent.ConnectingTimeout(generation)) + } + log("⏱️ CONNECTING watchdog armed gen=$generation timeout=${CONNECTING_STUCK_TIMEOUT_MS}ms") + } + private fun handleSocketOpened(event: SessionEvent.SocketOpened) { if (isStaleSocketEvent("onOpen", event.generation, event.socket)) return log( @@ -330,6 +350,7 @@ class Protocol( "hasCredentials=${lastPublicKey != null}, gen=${event.generation}" ) + cancelConnectingTimeout(reason = "socket_opened") isConnecting = false connectingSinceMs = 0L @@ -356,6 +377,7 @@ class Protocol( "❌ WebSocket CLOSED: code=${event.code} reason='${event.reason}' state=${_state.value} " + "manuallyClosed=$isManuallyClosed gen=${event.generation}" ) + cancelConnectingTimeout(reason = "socket_closed") isConnecting = false connectingSinceMs = 0L handleDisconnectLocked("onClosed") @@ -370,12 +392,41 @@ class Protocol( log(" Reconnect attempts: $reconnectAttempts") log(" Generation: ${event.generation}") event.throwable.printStackTrace() + cancelConnectingTimeout(reason = "socket_failure") isConnecting = false connectingSinceMs = 0L _lastError.value = event.throwable.message handleDisconnectLocked("onFailure") } + private fun handleConnectingTimeout(generation: Long) { + val currentState = _state.value + if (generation != activeConnectionGeneration) { + log( + "⏱️ CONNECTING watchdog ignored for stale generation " + + "(event=$generation active=$activeConnectionGeneration)" + ) + return + } + if (!isConnecting || currentState != ProtocolState.CONNECTING) { + return + } + + val elapsed = if (connectingSinceMs > 0L) { + System.currentTimeMillis() - connectingSinceMs + } else { + CONNECTING_STUCK_TIMEOUT_MS + } + log("🧯 CONNECTING TIMEOUT fired (elapsed=${elapsed}ms) -> forcing disconnect/reconnect") + + cancelConnectingTimeout(reason = "timeout_fired") + isConnecting = false + connectingSinceMs = 0L + runCatching { webSocket?.cancel() } + webSocket = null + handleDisconnectLocked("connecting_timeout") + } + private fun handleHandshakeResponse(packet: PacketHandshake) { handshakeJob?.cancel() @@ -679,6 +730,7 @@ class Protocol( return } log("🧯 CONNECTING STUCK detected (elapsed=${elapsed}ms) -> forcing reconnect reset") + cancelConnectingTimeout(reason = "connect_stuck_reset") isConnecting = false connectingSinceMs = 0L runCatching { webSocket?.cancel() } @@ -721,6 +773,7 @@ class Protocol( isManuallyClosed = false setState(ProtocolState.CONNECTING, "Starting new connection attempt #$reconnectAttempts") _lastError.value = null + armConnectingTimeout(generation) log("🔌 Connecting to: $serverAddress (attempt #$reconnectAttempts)") @@ -1016,6 +1069,7 @@ class Protocol( "🔌 DISCONNECT HANDLER: source=$source previousState=$previousState, manuallyClosed=$isManuallyClosed, " + "reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId" ) + cancelConnectingTimeout(reason = "handle_disconnect:$source") // Duplicate callbacks are possible (e.g. heartbeat failure + onFailure/onClosed). // If we are already disconnected and a reconnect is pending, avoid scheduling another one. @@ -1120,6 +1174,7 @@ class Protocol( private fun disconnectLocked(manual: Boolean, reason: String) { log("🔌 Disconnect requested: manual=$manual reason='$reason' instance=$instanceId") isManuallyClosed = manual + cancelConnectingTimeout(reason = "disconnect_locked") isConnecting = false // Сбрасываем флаг connectingSinceMs = 0L reconnectJob?.cancel() // Отменяем запланированные переподключения @@ -1170,6 +1225,7 @@ class Protocol( return } log("🧯 FAST RECONNECT: stuck CONNECTING (${elapsed}ms) -> reset and reconnect") + cancelConnectingTimeout(reason = "fast_reconnect_reset") isConnecting = false connectingSinceMs = 0L runCatching { webSocket?.cancel() } @@ -1225,6 +1281,7 @@ class Protocol( sessionLoopJob.cancelAndJoin() } } + connectingTimeoutJob?.cancel() heartbeatJob?.cancel() scope.cancel() } diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 28aeb69..53096da 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -10,10 +10,17 @@ import com.rosetta.messenger.data.AccountManager import com.rosetta.messenger.data.GroupRepository import com.rosetta.messenger.data.MessageRepository import com.rosetta.messenger.data.isPlaceholderAccountName +import com.rosetta.messenger.network.connection.BootstrapCoordinator +import com.rosetta.messenger.network.connection.ConnectionOrchestrator +import com.rosetta.messenger.network.connection.OwnProfileSyncService +import com.rosetta.messenger.network.connection.PacketRouter +import com.rosetta.messenger.network.connection.RetryQueueService +import com.rosetta.messenger.session.IdentityStore import com.rosetta.messenger.utils.MessageLogger import kotlinx.coroutines.* import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.sync.Mutex @@ -25,7 +32,6 @@ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicLong -import kotlin.coroutines.resume /** * Singleton manager for Protocol instance @@ -62,6 +68,12 @@ object ProtocolManager { private var appContext: Context? = null private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) private val protocolInstanceLock = Any() + private val packetSubscriptionRegistry = + PacketSubscriptionRegistry( + protocolProvider = ::getProtocol, + scope = scope, + addLog = ::addLog + ) private val connectionSupervisor = ProtocolConnectionSupervisor( scope = scope, @@ -75,6 +87,43 @@ object ProtocolManager { maxSize = READY_PACKET_QUEUE_MAX, ttlMs = READY_PACKET_QUEUE_TTL_MS ) + private val bootstrapCoordinator = + BootstrapCoordinator( + readyPacketGate = readyPacketGate, + addLog = ::addLog, + shortKeyForLog = ::shortKeyForLog, + sendPacketDirect = { packet -> getProtocol().sendPacket(packet) } + ) + private val connectionOrchestrator = + ConnectionOrchestrator( + hasActiveInternet = ::hasActiveInternet, + waitForNetworkAndReconnect = ::waitForNetworkAndReconnect, + stopWaitingForNetwork = { reason -> stopWaitingForNetwork(reason) }, + getProtocol = ::getProtocol, + appContextProvider = { appContext }, + buildHandshakeDevice = ::buildHandshakeDevice + ) + private val ownProfileSyncService = OwnProfileSyncService(::isPlaceholderAccountName) + private val packetRouter by lazy { + PacketRouter( + sendSearchPacket = { packet -> send(packet) }, + privateHashProvider = { + try { + getProtocol().getPrivateHash() + } catch (_: Exception) { + null + } + } + ) + } + private val retryQueueService = + RetryQueueService( + scope = scope, + sendPacket = { packet -> send(packet) }, + isAuthenticated = ::isAuthenticated, + addLog = ::addLog, + markOutgoingAsError = ::markOutgoingAsError + ) @Volatile private var packetHandlersRegistered = false @Volatile private var stateMonitoringStarted = false @@ -119,20 +168,7 @@ object ProtocolManager { val pendingDeviceVerification: StateFlow = _pendingDeviceVerification.asStateFlow() // Сигнал обновления own profile (username/name загружены с сервера) - private val _ownProfileUpdated = MutableStateFlow(0L) - val ownProfileUpdated: StateFlow = _ownProfileUpdated.asStateFlow() - - // 🔍 Global user info cache (like Desktop's InformationProvider.cachedUsers) - // publicKey → SearchUser (resolved via PacketSearch 0x03) - private val userInfoCache = ConcurrentHashMap() - // Pending resolves: publicKey → list of continuations waiting for the result - private val pendingResolves = ConcurrentHashMap>>() - // Pending search requests: query(username/publicKey fragment) → waiting continuations - private val pendingSearchQueries = - ConcurrentHashMap>>>() - - private fun normalizeSearchQuery(value: String): String = - value.trim().removePrefix("@").lowercase(Locale.ROOT) + val ownProfileUpdated: StateFlow = ownProfileSyncService.ownProfileUpdated private fun ensureConnectionSupervisor() { connectionSupervisor.start() @@ -143,14 +179,7 @@ object ProtocolManager { } private fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState = - when (state) { - ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED - ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING - ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING - ProtocolState.DEVICE_VERIFICATION_REQUIRED -> - ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED - ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED - } + bootstrapCoordinator.protocolToLifecycleState(state) private fun setConnectionLifecycleState(next: ConnectionLifecycleState, reason: String) { if (_connectionLifecycleState.value == next) return @@ -161,23 +190,18 @@ object ProtocolManager { private fun recomputeConnectionLifecycleState(reason: String) { val context = bootstrapContext val nextState = - if (context.authenticated) { - if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) { - ConnectionLifecycleState.READY - } else { - ConnectionLifecycleState.BOOTSTRAPPING - } - } else { - protocolToLifecycleState(context.protocolState) + bootstrapCoordinator.recomputeLifecycleState( + context = context, + currentState = _connectionLifecycleState.value, + reason = reason + ) { state, updateReason -> + setConnectionLifecycleState(state, updateReason) } - setConnectionLifecycleState(nextState, reason) - if (nextState == ConnectionLifecycleState.READY) { - flushReadyPacketQueue(context.accountPublicKey, reason) - } + _connectionLifecycleState.value = nextState } private fun clearReadyPacketQueue(reason: String) { - readyPacketGate.clear(reason = reason, addLog = ::addLog) + bootstrapCoordinator.clearReadyPacketQueue(reason) } private fun enqueueReadyPacket(packet: Packet) { @@ -185,41 +209,19 @@ object ProtocolManager { messageRepository?.getCurrentAccountKey()?.trim().orEmpty().ifBlank { bootstrapContext.accountPublicKey } - readyPacketGate.enqueue( + bootstrapCoordinator.enqueueReadyPacket( packet = packet, accountPublicKey = accountKey, - state = _connectionLifecycleState.value, - shortKeyForLog = ::shortKeyForLog, - addLog = ::addLog + state = _connectionLifecycleState.value ) } private fun flushReadyPacketQueue(activeAccountKey: String, reason: String) { - val packetsToSend = - readyPacketGate.drainForAccount( - activeAccountKey = activeAccountKey, - reason = reason, - addLog = ::addLog - ) - if (packetsToSend.isEmpty()) return - val protocolInstance = getProtocol() - packetsToSend.forEach { protocolInstance.sendPacket(it) } + bootstrapCoordinator.flushReadyPacketQueue(activeAccountKey, reason) } private fun packetCanBypassReadyGate(packet: Packet): Boolean = - when (packet) { - is PacketHandshake, - is PacketSync, - is PacketSearch, - is PacketPushNotification, - is PacketRequestTransport, - is PacketRequestUpdate, - is PacketSignalPeer, - is PacketWebRTC, - is PacketIceServers, - is PacketDeviceResolve -> true - else -> false - } + bootstrapCoordinator.packetCanBypassReadyGate(packet) private suspend fun handleConnectionEvent(event: ConnectionEvent) { when (event) { @@ -281,20 +283,10 @@ object ProtocolManager { } } is ConnectionEvent.Connect -> { - if (!hasActiveInternet()) { - waitForNetworkAndReconnect("connect:${event.reason}") - return - } - stopWaitingForNetwork("connect:${event.reason}") - getProtocol().connect() + connectionOrchestrator.handleConnect(event.reason) } is ConnectionEvent.FastReconnect -> { - if (!hasActiveInternet()) { - waitForNetworkAndReconnect("reconnect:${event.reason}") - return - } - stopWaitingForNetwork("reconnect:${event.reason}") - getProtocol().reconnectNowIfNeeded(event.reason) + connectionOrchestrator.handleFastReconnect(event.reason) } is ConnectionEvent.Disconnect -> { stopWaitingForNetwork(event.reason) @@ -321,15 +313,7 @@ object ProtocolManager { recomputeConnectionLifecycleState("disconnect:${event.reason}") } is ConnectionEvent.Authenticate -> { - appContext?.let { context -> - runCatching { - val accountManager = AccountManager(context) - accountManager.setLastLoggedPublicKey(event.publicKey) - accountManager.setLastLoggedPrivateKeyHash(event.privateHash) - } - } - val device = buildHandshakeDevice() - getProtocol().startHandshake(event.publicKey, event.privateHash, device) + connectionOrchestrator.handleAuthenticate(event.publicKey, event.privateHash) } is ConnectionEvent.ProtocolStateChanged -> { val previousProtocolState = bootstrapContext.protocolState @@ -446,6 +430,11 @@ object ProtocolManager { ownProfileFallbackJob?.cancel() ownProfileFallbackJob = null bootstrapContext = bootstrapContext.copy(ownProfileResolved = true) + IdentityStore.updateOwnProfile( + publicKey = event.publicKey, + resolved = true, + reason = "protocol_own_profile_resolved" + ) recomputeConnectionLifecycleState("own_profile_resolved") } is ConnectionEvent.OwnProfileFallbackTimeout -> { @@ -455,6 +444,14 @@ object ProtocolManager { "⏱️ Own profile fetch timeout — continuing bootstrap for ${shortKeyForLog(bootstrapContext.accountPublicKey)}" ) bootstrapContext = bootstrapContext.copy(ownProfileResolved = true) + val accountPublicKey = bootstrapContext.accountPublicKey + if (accountPublicKey.isNotBlank()) { + IdentityStore.updateOwnProfile( + publicKey = accountPublicKey, + resolved = true, + reason = "protocol_own_profile_fallback_timeout" + ) + } recomputeConnectionLifecycleState("own_profile_fallback_timeout") } } @@ -488,17 +485,6 @@ object ProtocolManager { private val fullFailureBatchStreak = AtomicInteger(0) private val syncBatchEndMutex = Mutex() - // iOS parity: outgoing message retry mechanism. - // iOS retries stuck WAITING messages every 4 seconds, up to 3 attempts, - // with a hard timeout of 80 seconds. Without this, messages stay as "clocks" - // until the next reconnect (which may never happen if the connection is alive). - private const val OUTGOING_RETRY_INTERVAL_MS = 4_000L - private const val OUTGOING_MAX_RETRY_ATTEMPTS = 3 - private const val OUTGOING_MAX_LIFETIME_MS = 80_000L - private val pendingOutgoingPackets = ConcurrentHashMap() - private val pendingOutgoingAttempts = ConcurrentHashMap() - private val pendingOutgoingRetryJobs = ConcurrentHashMap() - private fun setSyncInProgress(value: Boolean) { syncBatchInProgress = value if (_syncInProgress.value != value) { @@ -524,7 +510,9 @@ object ProtocolManager { val timestamp = java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date()) val line = "[$timestamp] $normalizedMessage" - persistProtocolTraceLine(line) + if (shouldPersistProtocolTrace(normalizedMessage)) { + persistProtocolTraceLine(line) + } if (!uiLogsEnabled) return synchronized(debugLogsLock) { if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) { @@ -535,6 +523,23 @@ object ProtocolManager { flushDebugLogsThrottled() } + /** + * Keep crash_reports trace lightweight when UI logs are disabled. + * This avoids excessive disk writes during long sync sessions. + */ + private fun shouldPersistProtocolTrace(message: String): Boolean { + if (uiLogsEnabled) return true + if (message.startsWith("❌") || message.startsWith("⚠️")) return true + if (message.contains("STATE CHANGE")) return true + if (message.contains("CONNECTION FULLY ESTABLISHED")) return true + if (message.contains("HANDSHAKE COMPLETE")) return true + if (message.contains("SYNC COMPLETE")) return true + if (message.startsWith("🔌 CONNECT CALLED") || message.startsWith("🔌 Connecting to")) return true + if (message.startsWith("✅ WebSocket OPEN")) return true + if (message.startsWith("📡 NETWORK")) return true + return false + } + private fun persistProtocolTraceLine(line: String) { val context = appContext ?: return runCatching { @@ -831,112 +836,33 @@ object ProtocolManager { // + обновляет own profile (username/name) аналогично Desktop useUserInformation() waitPacket(0x03) { packet -> val searchPacket = packet as PacketSearch - - if (searchPacket.users.isNotEmpty()) { - scope.launch(Dispatchers.IO) { - val ownPublicKey = - getProtocol().getPublicKey()?.trim().orEmpty().ifBlank { - messageRepository?.getCurrentAccountKey()?.trim().orEmpty() - } - searchPacket.users.forEach { user -> - val normalizedUserPublicKey = user.publicKey.trim() - // 🔍 Кэшируем всех пользователей (desktop parity: cachedUsers) - userInfoCache[normalizedUserPublicKey] = user - - // Resume pending resolves for this publicKey (case-insensitive). - pendingResolves - .keys - .filter { it.equals(normalizedUserPublicKey, ignoreCase = true) } - .forEach { key -> - pendingResolves.remove(key)?.forEach { cont -> - try { cont.resume(user) } catch (_: Exception) {} - } - } - - // Обновляем инфо в диалогах (для всех пользователей) - messageRepository?.updateDialogUserInfo( - normalizedUserPublicKey, - user.title, - user.username, - user.verified + + scope.launch(Dispatchers.IO) { + val ownPublicKey = + getProtocol().getPublicKey()?.trim().orEmpty().ifBlank { + messageRepository?.getCurrentAccountKey()?.trim().orEmpty() + } + + packetRouter.onSearchPacket(searchPacket) { user -> + val normalizedUserPublicKey = user.publicKey.trim() + messageRepository?.updateDialogUserInfo( + normalizedUserPublicKey, + user.title, + user.username, + user.verified + ) + + val ownProfileResolved = + ownProfileSyncService.applyOwnProfileFromSearch( + appContext = appContext, + ownPublicKey = ownPublicKey, + user = user ) - - // Если это наш own profile — сохраняем username/name в AccountManager - if (ownPublicKey.isNotBlank() && - normalizedUserPublicKey.equals(ownPublicKey, ignoreCase = true) && - appContext != null) { - val accountManager = AccountManager(appContext!!) - if (user.title.isNotBlank() && !isPlaceholderAccountName(user.title)) { - accountManager.updateAccountName(ownPublicKey, user.title) - } - if (user.username.isNotBlank()) { - accountManager.updateAccountUsername(ownPublicKey, user.username) - } - _ownProfileUpdated.value = System.currentTimeMillis() - postConnectionEvent( - ConnectionEvent.OwnProfileResolved(user.publicKey) - ) - } + if (ownProfileResolved) { + postConnectionEvent(ConnectionEvent.OwnProfileResolved(user.publicKey)) } } } - - // Resume pending resolves that got empty response (no match) - if (searchPacket.search.isNotEmpty() && searchPacket.users.none { it.publicKey == searchPacket.search }) { - pendingResolves.remove(searchPacket.search)?.forEach { cont -> - try { cont.resume(null) } catch (_: Exception) {} - } - } - - // Resume pending username/query searches. - // Server may return query in different case/format, so match robustly. - if (searchPacket.search.isNotEmpty()) { - val rawQuery = searchPacket.search.trim() - val normalizedQuery = normalizeSearchQuery(rawQuery) - val continuations = LinkedHashSet>>() - - fun collectByKey(key: String) { - if (key.isEmpty()) return - pendingSearchQueries.remove(key)?.let { continuations.addAll(it) } - } - - collectByKey(rawQuery) - if (normalizedQuery.isNotEmpty() && normalizedQuery != rawQuery) { - collectByKey(normalizedQuery) - } - - if (continuations.isEmpty()) { - val matchedByQuery = - pendingSearchQueries.keys.firstOrNull { pendingKey -> - pendingKey.equals(rawQuery, ignoreCase = true) || - normalizeSearchQuery(pendingKey) == normalizedQuery - } - if (matchedByQuery != null) collectByKey(matchedByQuery) - } - - if (continuations.isEmpty() && searchPacket.users.isNotEmpty()) { - val responseUsernames = - searchPacket.users - .map { normalizeSearchQuery(it.username) } - .filter { it.isNotEmpty() } - .toSet() - if (responseUsernames.isNotEmpty()) { - val matchedByUsers = - pendingSearchQueries.keys.firstOrNull { pendingKey -> - val normalizedPending = normalizeSearchQuery(pendingKey) - normalizedPending.isNotEmpty() && - responseUsernames.contains(normalizedPending) - } - if (matchedByUsers != null) collectByKey(matchedByUsers) - } - } - - continuations.forEach { cont -> - try { - cont.resume(searchPacket.users) - } catch (_: Exception) {} - } - } } // 🚀 Обработчик транспортного сервера (0x0F) @@ -1634,13 +1560,11 @@ object ProtocolManager { * Аналог Desktop: useUserInformation(ownPublicKey) → PacketSearch(0x03) */ private fun fetchOwnProfile() { - val publicKey = getProtocol().getPublicKey() ?: return - val privateHash = getProtocol().getPrivateHash() ?: return - - val packet = PacketSearch().apply { - this.privateKey = privateHash - this.search = publicKey - } + val packet = + ownProfileSyncService.buildOwnProfilePacket( + publicKey = getProtocol().getPublicKey(), + privateHash = getProtocol().getPrivateHash() + ) ?: return send(packet) } @@ -1651,63 +1575,25 @@ object ProtocolManager { * @param timeoutMs max wait time for server response (default 3s) */ suspend fun resolveUserName(publicKey: String, timeoutMs: Long = 3000): String? { - if (publicKey.isEmpty()) return null - - // 1. Check in-memory cache (instant) - userInfoCache[publicKey]?.let { cached -> - val name = cached.title.ifEmpty { cached.username } - if (name.isNotEmpty()) return name - } - - // 2. Send PacketSearch and wait for response via suspendCancellableCoroutine - val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null } - ?: return null - - return try { - withTimeout(timeoutMs) { - suspendCancellableCoroutine { cont -> - // Register continuation in pending list - pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont) - - cont.invokeOnCancellation { - pendingResolves[publicKey]?.remove(cont) - if (pendingResolves[publicKey]?.isEmpty() == true) { - pendingResolves.remove(publicKey) - } - } - - // Send search request - val packet = PacketSearch().apply { - this.privateKey = privateHash - this.search = publicKey - } - send(packet) - } - }?.let { user -> user.title.ifEmpty { user.username }.ifEmpty { null } } - } catch (_: Exception) { - // Timeout or cancellation — clean up - pendingResolves.remove(publicKey) - null - } + return packetRouter.resolveUserName(publicKey = publicKey, timeoutMs = timeoutMs) } /** * 🔍 Get cached user info (no network request) */ fun getCachedUserName(publicKey: String): String? { - val cached = userInfoCache[publicKey] ?: return null - return cached.title.ifEmpty { cached.username }.ifEmpty { null } + return packetRouter.getCachedUserName(publicKey) } /** * 🔍 Get full cached user info (no network request) */ fun notifyOwnProfileUpdated() { - _ownProfileUpdated.value = System.currentTimeMillis() + ownProfileSyncService.notifyOwnProfileUpdated() } fun getCachedUserInfo(publicKey: String): SearchUser? { - return userInfoCache[publicKey] + return packetRouter.getCachedUserInfo(publicKey) } /** @@ -1715,47 +1601,14 @@ object ProtocolManager { * Username compare is case-insensitive and ignores '@'. */ fun getCachedUserByUsername(username: String): SearchUser? { - val normalizedUsername = normalizeSearchQuery(username) - if (normalizedUsername.isEmpty()) return null - return userInfoCache.values.firstOrNull { cached -> - normalizeSearchQuery(cached.username) == normalizedUsername - } + return packetRouter.getCachedUserByUsername(username) } /** * 🔍 Resolve publicKey → full SearchUser (with server request if needed) */ suspend fun resolveUserInfo(publicKey: String, timeoutMs: Long = 3000): SearchUser? { - if (publicKey.isEmpty()) return null - - // 1. Check in-memory cache - userInfoCache[publicKey]?.let { return it } - - // 2. Send PacketSearch and wait - val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null } - ?: return null - - return try { - withTimeout(timeoutMs) { - suspendCancellableCoroutine { cont -> - pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont) - cont.invokeOnCancellation { - pendingResolves[publicKey]?.remove(cont) - if (pendingResolves[publicKey]?.isEmpty() == true) { - pendingResolves.remove(publicKey) - } - } - val packet = PacketSearch().apply { - this.privateKey = privateHash - this.search = publicKey - } - send(packet) - } - } - } catch (_: Exception) { - pendingResolves.remove(publicKey) - null - } + return packetRouter.resolveUserInfo(publicKey = publicKey, timeoutMs = timeoutMs) } /** @@ -1763,45 +1616,7 @@ object ProtocolManager { * Returns raw PacketSearch users list for the exact query. */ suspend fun searchUsers(query: String, timeoutMs: Long = 3000): List { - val normalizedQuery = normalizeSearchQuery(query) - if (normalizedQuery.isEmpty()) return emptyList() - - val privateHash = try { getProtocol().getPrivateHash() } catch (_: Exception) { null } - ?: return emptyList() - - val cachedMatches = - userInfoCache.values.filter { cached -> - normalizeSearchQuery(cached.username) == normalizedQuery && cached.publicKey.isNotBlank() - } - if (cachedMatches.isNotEmpty()) { - return cachedMatches.distinctBy { it.publicKey } - } - - return try { - withTimeout(timeoutMs) { - suspendCancellableCoroutine { cont -> - pendingSearchQueries - .getOrPut(normalizedQuery) { mutableListOf() } - .add(cont) - - cont.invokeOnCancellation { - pendingSearchQueries[normalizedQuery]?.remove(cont) - if (pendingSearchQueries[normalizedQuery]?.isEmpty() == true) { - pendingSearchQueries.remove(normalizedQuery) - } - } - - val packet = PacketSearch().apply { - this.privateKey = privateHash - this.search = normalizedQuery - } - send(packet) - } - } - } catch (_: Exception) { - pendingSearchQueries.remove(normalizedQuery) - emptyList() - } + return packetRouter.searchUsers(query = query, timeoutMs = timeoutMs) } /** @@ -1835,78 +1650,25 @@ object ProtocolManager { /** * Send an outgoing message packet and register it for automatic retry. - * iOS parity: registerOutgoingRetry + scheduleOutgoingRetry. */ fun sendMessageWithRetry(packet: PacketMessage) { send(packet) - registerOutgoingRetry(packet) + retryQueueService.register(packet) } /** - * iOS parity: register an outgoing message for automatic retry. + * iOS parity: mark an outgoing message as error in persistent storage. */ - private fun registerOutgoingRetry(packet: PacketMessage) { - val messageId = packet.messageId - pendingOutgoingRetryJobs[messageId]?.cancel() - pendingOutgoingPackets[messageId] = packet - pendingOutgoingAttempts[messageId] = 0 - scheduleOutgoingRetry(messageId) - } - - /** - * iOS parity: schedule a retry attempt for a pending outgoing message. - * Retries every 4 seconds, marks as ERROR after 80s or 3 attempts. - */ - private fun scheduleOutgoingRetry(messageId: String) { - pendingOutgoingRetryJobs[messageId]?.cancel() - pendingOutgoingRetryJobs[messageId] = scope.launch { - delay(OUTGOING_RETRY_INTERVAL_MS) - - val packet = pendingOutgoingPackets[messageId] ?: return@launch - val attempts = pendingOutgoingAttempts[messageId] ?: 0 - - // Check if message exceeded delivery timeout (80s) - val nowMs = System.currentTimeMillis() - val ageMs = nowMs - packet.timestamp - if (ageMs >= OUTGOING_MAX_LIFETIME_MS) { - addLog("⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error") - markOutgoingAsError(messageId, packet) - return@launch + private suspend fun markOutgoingAsError(messageId: String, packet: PacketMessage) { + val repository = messageRepository ?: return + val opponentKey = + if (packet.fromPublicKey == repository.getCurrentAccountKey()) { + packet.toPublicKey + } else { + packet.fromPublicKey } - - if (attempts >= OUTGOING_MAX_RETRY_ATTEMPTS) { - addLog("⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error") - markOutgoingAsError(messageId, packet) - return@launch - } - - if (!isAuthenticated()) { - // Not authenticated — defer to reconnect retry - addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated") - resolveOutgoingRetry(messageId) - return@launch - } - - val nextAttempt = attempts + 1 - pendingOutgoingAttempts[messageId] = nextAttempt - addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt") - send(packet) - scheduleOutgoingRetry(messageId) - } - } - - /** - * iOS parity: mark an outgoing message as error and clean up retry state. - */ - private fun markOutgoingAsError(messageId: String, packet: PacketMessage) { - scope.launch { - val repository = messageRepository ?: return@launch - val opponentKey = if (packet.fromPublicKey == repository.getCurrentAccountKey()) - packet.toPublicKey else packet.fromPublicKey - val dialogKey = repository.getDialogKey(opponentKey) - repository.updateMessageDeliveryStatus(dialogKey, messageId, DeliveryStatus.ERROR) - } - resolveOutgoingRetry(messageId) + val dialogKey = repository.getDialogKey(opponentKey) + repository.updateMessageDeliveryStatus(dialogKey, messageId, DeliveryStatus.ERROR) } /** @@ -1914,20 +1676,14 @@ object ProtocolManager { * Called when delivery ACK (0x08) is received. */ fun resolveOutgoingRetry(messageId: String) { - pendingOutgoingRetryJobs[messageId]?.cancel() - pendingOutgoingRetryJobs.remove(messageId) - pendingOutgoingPackets.remove(messageId) - pendingOutgoingAttempts.remove(messageId) + retryQueueService.resolve(messageId) } /** * Cancel all pending outgoing retry jobs (e.g., on disconnect). */ private fun cancelAllOutgoingRetries() { - pendingOutgoingRetryJobs.values.forEach { it.cancel() } - pendingOutgoingRetryJobs.clear() - pendingOutgoingPackets.clear() - pendingOutgoingAttempts.clear() + retryQueueService.clear() } /** @@ -2056,14 +1812,21 @@ object ProtocolManager { * Register packet handler */ fun waitPacket(packetId: Int, callback: (Packet) -> Unit) { - getProtocol().waitPacket(packetId, callback) + packetSubscriptionRegistry.addCallback(packetId, callback) } /** * Unregister packet handler */ fun unwaitPacket(packetId: Int, callback: (Packet) -> Unit) { - getProtocol().unwaitPacket(packetId, callback) + packetSubscriptionRegistry.removeCallback(packetId, callback) + } + + /** + * SharedFlow fan-out stream for packet id. + */ + fun packetFlow(packetId: Int): SharedFlow { + return packetSubscriptionRegistry.flow(packetId) } private fun buildHandshakeDevice(): HandshakeDevice { @@ -2139,6 +1902,7 @@ object ProtocolManager { */ fun destroy() { stopWaitingForNetwork("destroy") + packetSubscriptionRegistry.destroy() synchronized(protocolInstanceLock) { protocol?.destroy() protocol = null diff --git a/app/src/main/java/com/rosetta/messenger/network/connection/BootstrapCoordinator.kt b/app/src/main/java/com/rosetta/messenger/network/connection/BootstrapCoordinator.kt new file mode 100644 index 0000000..0ca48b9 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/connection/BootstrapCoordinator.kt @@ -0,0 +1,92 @@ +package com.rosetta.messenger.network.connection + +import com.rosetta.messenger.network.* + +class BootstrapCoordinator( + private val readyPacketGate: ReadyPacketGate, + private val addLog: (String) -> Unit, + private val shortKeyForLog: (String) -> String, + private val sendPacketDirect: (Packet) -> Unit +) { + fun protocolToLifecycleState(state: ProtocolState): ConnectionLifecycleState = + when (state) { + ProtocolState.DISCONNECTED -> ConnectionLifecycleState.DISCONNECTED + ProtocolState.CONNECTING -> ConnectionLifecycleState.CONNECTING + ProtocolState.CONNECTED, ProtocolState.HANDSHAKING -> ConnectionLifecycleState.HANDSHAKING + ProtocolState.DEVICE_VERIFICATION_REQUIRED -> + ConnectionLifecycleState.DEVICE_VERIFICATION_REQUIRED + ProtocolState.AUTHENTICATED -> ConnectionLifecycleState.AUTHENTICATED + } + + fun packetCanBypassReadyGate(packet: Packet): Boolean = + when (packet) { + is PacketHandshake, + is PacketSync, + is PacketSearch, + is PacketPushNotification, + is PacketRequestTransport, + is PacketRequestUpdate, + is PacketSignalPeer, + is PacketWebRTC, + is PacketIceServers, + is PacketDeviceResolve -> true + else -> false + } + + fun recomputeLifecycleState( + context: ConnectionBootstrapContext, + currentState: ConnectionLifecycleState, + reason: String, + onStateChanged: (ConnectionLifecycleState, String) -> Unit + ): ConnectionLifecycleState { + val nextState = + if (context.authenticated) { + if (context.accountInitialized && context.syncCompleted && context.ownProfileResolved) { + ConnectionLifecycleState.READY + } else { + ConnectionLifecycleState.BOOTSTRAPPING + } + } else { + protocolToLifecycleState(context.protocolState) + } + + if (currentState != nextState) { + onStateChanged(nextState, reason) + } + + if (nextState == ConnectionLifecycleState.READY) { + flushReadyPacketQueue(context.accountPublicKey, reason) + } + + return nextState + } + + fun clearReadyPacketQueue(reason: String) { + readyPacketGate.clear(reason = reason, addLog = addLog) + } + + fun enqueueReadyPacket( + packet: Packet, + accountPublicKey: String, + state: ConnectionLifecycleState + ) { + readyPacketGate.enqueue( + packet = packet, + accountPublicKey = accountPublicKey, + state = state, + shortKeyForLog = shortKeyForLog, + addLog = addLog + ) + } + + fun flushReadyPacketQueue(activeAccountKey: String, reason: String) { + val packetsToSend = + readyPacketGate.drainForAccount( + activeAccountKey = activeAccountKey, + reason = reason, + addLog = addLog + ) + if (packetsToSend.isEmpty()) return + packetsToSend.forEach(sendPacketDirect) + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/connection/ConnectionOrchestrator.kt b/app/src/main/java/com/rosetta/messenger/network/connection/ConnectionOrchestrator.kt new file mode 100644 index 0000000..a5e9f86 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/connection/ConnectionOrchestrator.kt @@ -0,0 +1,45 @@ +package com.rosetta.messenger.network.connection + +import android.content.Context +import com.rosetta.messenger.data.AccountManager +import com.rosetta.messenger.network.HandshakeDevice +import com.rosetta.messenger.network.Protocol + +class ConnectionOrchestrator( + private val hasActiveInternet: () -> Boolean, + private val waitForNetworkAndReconnect: (String) -> Unit, + private val stopWaitingForNetwork: (String) -> Unit, + private val getProtocol: () -> Protocol, + private val appContextProvider: () -> Context?, + private val buildHandshakeDevice: () -> HandshakeDevice +) { + fun handleConnect(reason: String) { + if (!hasActiveInternet()) { + waitForNetworkAndReconnect("connect:$reason") + return + } + stopWaitingForNetwork("connect:$reason") + getProtocol().connect() + } + + fun handleFastReconnect(reason: String) { + if (!hasActiveInternet()) { + waitForNetworkAndReconnect("reconnect:$reason") + return + } + stopWaitingForNetwork("reconnect:$reason") + getProtocol().reconnectNowIfNeeded(reason) + } + + fun handleAuthenticate(publicKey: String, privateHash: String) { + appContextProvider()?.let { context -> + runCatching { + val accountManager = AccountManager(context) + accountManager.setLastLoggedPublicKey(publicKey) + accountManager.setLastLoggedPrivateKeyHash(privateHash) + } + } + val device = buildHandshakeDevice() + getProtocol().startHandshake(publicKey, privateHash, device) + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/connection/OwnProfileSyncService.kt b/app/src/main/java/com/rosetta/messenger/network/connection/OwnProfileSyncService.kt new file mode 100644 index 0000000..add8d95 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/connection/OwnProfileSyncService.kt @@ -0,0 +1,53 @@ +package com.rosetta.messenger.network.connection + +import android.content.Context +import com.rosetta.messenger.data.AccountManager +import com.rosetta.messenger.network.PacketSearch +import com.rosetta.messenger.network.SearchUser +import com.rosetta.messenger.session.IdentityStore +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow + +class OwnProfileSyncService( + private val isPlaceholderAccountName: (String?) -> Boolean +) { + private val _ownProfileUpdated = MutableStateFlow(0L) + val ownProfileUpdated: StateFlow = _ownProfileUpdated.asStateFlow() + + fun notifyOwnProfileUpdated() { + _ownProfileUpdated.value = System.currentTimeMillis() + } + + suspend fun applyOwnProfileFromSearch( + appContext: Context?, + ownPublicKey: String, + user: SearchUser + ): Boolean { + if (ownPublicKey.isBlank()) return false + if (!user.publicKey.equals(ownPublicKey, ignoreCase = true)) return false + + val context = appContext ?: return true + val accountManager = AccountManager(context) + if (user.title.isNotBlank() && !isPlaceholderAccountName(user.title)) { + accountManager.updateAccountName(ownPublicKey, user.title) + } + if (user.username.isNotBlank()) { + accountManager.updateAccountUsername(ownPublicKey, user.username) + } + IdentityStore.updateOwnProfile(user, reason = "protocol_search_own_profile") + _ownProfileUpdated.value = System.currentTimeMillis() + return true + } + + fun buildOwnProfilePacket(publicKey: String?, privateHash: String?): PacketSearch? { + val normalizedPublicKey = publicKey?.trim().orEmpty() + val normalizedPrivateHash = privateHash?.trim().orEmpty() + if (normalizedPublicKey.isEmpty() || normalizedPrivateHash.isEmpty()) return null + + return PacketSearch().apply { + this.privateKey = normalizedPrivateHash + this.search = normalizedPublicKey + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/connection/PacketRouter.kt b/app/src/main/java/com/rosetta/messenger/network/connection/PacketRouter.kt new file mode 100644 index 0000000..7033564 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/connection/PacketRouter.kt @@ -0,0 +1,221 @@ +package com.rosetta.messenger.network.connection + +import com.rosetta.messenger.network.PacketSearch +import com.rosetta.messenger.network.SearchUser +import kotlinx.coroutines.suspendCancellableCoroutine +import kotlinx.coroutines.withTimeout +import java.util.LinkedHashSet +import java.util.Locale +import java.util.concurrent.ConcurrentHashMap +import kotlin.coroutines.resume + +class PacketRouter( + private val sendSearchPacket: (PacketSearch) -> Unit, + private val privateHashProvider: () -> String? +) { + private val userInfoCache = ConcurrentHashMap() + private val pendingResolves = + ConcurrentHashMap>>() + private val pendingSearchQueries = + ConcurrentHashMap>>>() + + private fun normalizeSearchQuery(value: String): String = + value.trim().removePrefix("@").lowercase(Locale.ROOT) + + suspend fun onSearchPacket(packet: PacketSearch, onUserDiscovered: suspend (SearchUser) -> Unit) { + if (packet.users.isNotEmpty()) { + packet.users.forEach { user -> + val normalizedUserPublicKey = user.publicKey.trim() + userInfoCache[normalizedUserPublicKey] = user + + pendingResolves + .keys + .filter { it.equals(normalizedUserPublicKey, ignoreCase = true) } + .forEach { key -> + pendingResolves.remove(key)?.forEach { cont -> + try { + cont.resume(user) + } catch (_: Exception) {} + } + } + + onUserDiscovered(user) + } + } + + if (packet.search.isNotEmpty() && packet.users.none { it.publicKey == packet.search }) { + pendingResolves.remove(packet.search)?.forEach { cont -> + try { + cont.resume(null) + } catch (_: Exception) {} + } + } + + if (packet.search.isNotEmpty()) { + val rawQuery = packet.search.trim() + val normalizedQuery = normalizeSearchQuery(rawQuery) + val continuations = + LinkedHashSet>>() + + fun collectByKey(key: String) { + if (key.isEmpty()) return + pendingSearchQueries.remove(key)?.let { continuations.addAll(it) } + } + + collectByKey(rawQuery) + if (normalizedQuery.isNotEmpty() && normalizedQuery != rawQuery) { + collectByKey(normalizedQuery) + } + + if (continuations.isEmpty()) { + val matchedByQuery = + pendingSearchQueries.keys.firstOrNull { pendingKey -> + pendingKey.equals(rawQuery, ignoreCase = true) || + normalizeSearchQuery(pendingKey) == normalizedQuery + } + if (matchedByQuery != null) collectByKey(matchedByQuery) + } + + if (continuations.isEmpty() && packet.users.isNotEmpty()) { + val responseUsernames = + packet.users + .map { normalizeSearchQuery(it.username) } + .filter { it.isNotEmpty() } + .toSet() + if (responseUsernames.isNotEmpty()) { + val matchedByUsers = + pendingSearchQueries.keys.firstOrNull { pendingKey -> + val normalizedPending = normalizeSearchQuery(pendingKey) + normalizedPending.isNotEmpty() && + responseUsernames.contains(normalizedPending) + } + if (matchedByUsers != null) collectByKey(matchedByUsers) + } + } + + continuations.forEach { cont -> + try { + cont.resume(packet.users) + } catch (_: Exception) {} + } + } + } + + fun getCachedUserName(publicKey: String): String? { + val cached = userInfoCache[publicKey] ?: return null + return cached.title.ifEmpty { cached.username }.ifEmpty { null } + } + + fun getCachedUserInfo(publicKey: String): SearchUser? = userInfoCache[publicKey] + + fun getCachedUserByUsername(username: String): SearchUser? { + val normalizedUsername = normalizeSearchQuery(username) + if (normalizedUsername.isEmpty()) return null + return userInfoCache.values.firstOrNull { cached -> + normalizeSearchQuery(cached.username) == normalizedUsername + } + } + + suspend fun resolveUserName(publicKey: String, timeoutMs: Long = 3000): String? { + if (publicKey.isEmpty()) return null + + userInfoCache[publicKey]?.let { cached -> + val name = cached.title.ifEmpty { cached.username } + if (name.isNotEmpty()) return name + } + + val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return null + + return try { + withTimeout(timeoutMs) { + suspendCancellableCoroutine { cont -> + pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont) + + cont.invokeOnCancellation { + pendingResolves[publicKey]?.remove(cont) + if (pendingResolves[publicKey]?.isEmpty() == true) { + pendingResolves.remove(publicKey) + } + } + + val packet = PacketSearch().apply { + this.privateKey = privateHash + this.search = publicKey + } + sendSearchPacket(packet) + } + }?.let { user -> user.title.ifEmpty { user.username }.ifEmpty { null } } + } catch (_: Exception) { + pendingResolves.remove(publicKey) + null + } + } + + suspend fun resolveUserInfo(publicKey: String, timeoutMs: Long = 3000): SearchUser? { + if (publicKey.isEmpty()) return null + + userInfoCache[publicKey]?.let { return it } + val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return null + + return try { + withTimeout(timeoutMs) { + suspendCancellableCoroutine { cont -> + pendingResolves.getOrPut(publicKey) { mutableListOf() }.add(cont) + cont.invokeOnCancellation { + pendingResolves[publicKey]?.remove(cont) + if (pendingResolves[publicKey]?.isEmpty() == true) { + pendingResolves.remove(publicKey) + } + } + val packet = PacketSearch().apply { + this.privateKey = privateHash + this.search = publicKey + } + sendSearchPacket(packet) + } + } + } catch (_: Exception) { + pendingResolves.remove(publicKey) + null + } + } + + suspend fun searchUsers(query: String, timeoutMs: Long = 3000): List { + val normalizedQuery = normalizeSearchQuery(query) + if (normalizedQuery.isEmpty()) return emptyList() + + val privateHash = privateHashProvider()?.takeIf { it.isNotBlank() } ?: return emptyList() + + val cachedMatches = + userInfoCache.values.filter { cached -> + normalizeSearchQuery(cached.username) == normalizedQuery && cached.publicKey.isNotBlank() + } + if (cachedMatches.isNotEmpty()) { + return cachedMatches.distinctBy { it.publicKey } + } + + return try { + withTimeout(timeoutMs) { + suspendCancellableCoroutine { cont -> + pendingSearchQueries.getOrPut(normalizedQuery) { mutableListOf() }.add(cont) + + cont.invokeOnCancellation { + pendingSearchQueries[normalizedQuery]?.remove(cont) + if (pendingSearchQueries[normalizedQuery]?.isEmpty() == true) { + pendingSearchQueries.remove(normalizedQuery) + } + } + + val packet = PacketSearch().apply { + this.privateKey = privateHash + this.search = normalizedQuery + } + sendSearchPacket(packet) + } + } + } catch (_: Exception) { + pendingSearchQueries.remove(normalizedQuery) + emptyList() + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/connection/RetryQueueService.kt b/app/src/main/java/com/rosetta/messenger/network/connection/RetryQueueService.kt new file mode 100644 index 0000000..f3ac434 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/connection/RetryQueueService.kt @@ -0,0 +1,96 @@ +package com.rosetta.messenger.network.connection + +import com.rosetta.messenger.network.PacketMessage +import java.util.concurrent.ConcurrentHashMap +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch + +/** + * Outgoing retry queue for PacketMessage delivery. + * + * Mirrors iOS behavior: + * - retry every 4s, + * - max 3 attempts, + * - max 80s lifetime. + */ +class RetryQueueService( + private val scope: CoroutineScope, + private val sendPacket: (PacketMessage) -> Unit, + private val isAuthenticated: () -> Boolean, + private val addLog: (String) -> Unit, + private val markOutgoingAsError: suspend (messageId: String, packet: PacketMessage) -> Unit, + private val retryIntervalMs: Long = 4_000L, + private val maxRetryAttempts: Int = 3, + private val maxLifetimeMs: Long = 80_000L +) { + private val pendingOutgoingPackets = ConcurrentHashMap() + private val pendingOutgoingAttempts = ConcurrentHashMap() + private val pendingOutgoingRetryJobs = ConcurrentHashMap() + + fun register(packet: PacketMessage) { + val messageId = packet.messageId + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingPackets[messageId] = packet + pendingOutgoingAttempts[messageId] = 0 + schedule(messageId) + } + + fun resolve(messageId: String) { + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingRetryJobs.remove(messageId) + pendingOutgoingPackets.remove(messageId) + pendingOutgoingAttempts.remove(messageId) + } + + fun clear() { + pendingOutgoingRetryJobs.values.forEach { it.cancel() } + pendingOutgoingRetryJobs.clear() + pendingOutgoingPackets.clear() + pendingOutgoingAttempts.clear() + } + + private fun schedule(messageId: String) { + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingRetryJobs[messageId] = + scope.launch { + delay(retryIntervalMs) + + val packet = pendingOutgoingPackets[messageId] ?: return@launch + val attempts = pendingOutgoingAttempts[messageId] ?: 0 + + val nowMs = System.currentTimeMillis() + val ageMs = nowMs - packet.timestamp + if (ageMs >= maxLifetimeMs) { + addLog( + "⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error" + ) + scope.launch { markOutgoingAsError(messageId, packet) } + resolve(messageId) + return@launch + } + + if (attempts >= maxRetryAttempts) { + addLog( + "⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error" + ) + scope.launch { markOutgoingAsError(messageId, packet) } + resolve(messageId) + return@launch + } + + if (!isAuthenticated()) { + addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated") + resolve(messageId) + return@launch + } + + val nextAttempt = attempts + 1 + pendingOutgoingAttempts[messageId] = nextAttempt + addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt") + sendPacket(packet) + schedule(messageId) + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/session/AppSessionCoordinator.kt b/app/src/main/java/com/rosetta/messenger/session/AppSessionCoordinator.kt new file mode 100644 index 0000000..a7001f7 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/session/AppSessionCoordinator.kt @@ -0,0 +1,78 @@ +package com.rosetta.messenger.session + +import com.rosetta.messenger.data.AccountManager +import com.rosetta.messenger.data.DecryptedAccount +import com.rosetta.messenger.network.ProtocolManager +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow + +sealed interface SessionState { + data object LoggedOut : SessionState + data class AuthInProgress( + val publicKey: String? = null, + val reason: String = "" + ) : SessionState + data class Ready( + val account: DecryptedAccount, + val reason: String = "" + ) : SessionState +} + +/** + * Single source of truth for app-level auth/session lifecycle. + * UI should rely on this state instead of scattering account checks. + */ +object AppSessionCoordinator { + private val _sessionState = MutableStateFlow(SessionState.LoggedOut) + val sessionState: StateFlow = _sessionState.asStateFlow() + + fun markLoggedOut(reason: String = "") { + _sessionState.value = SessionState.LoggedOut + IdentityStore.markLoggedOut(reason = reason) + } + + fun markAuthInProgress(publicKey: String? = null, reason: String = "") { + _sessionState.value = SessionState.AuthInProgress(publicKey = publicKey, reason = reason) + IdentityStore.markAuthInProgress(publicKey = publicKey, reason = reason) + } + + fun markReady(account: DecryptedAccount, reason: String = "") { + _sessionState.value = SessionState.Ready(account = account, reason = reason) + IdentityStore.setAccount(account = account, reason = reason) + } + + fun syncFromCachedAccount(account: DecryptedAccount?) { + if (account == null) { + if (_sessionState.value is SessionState.Ready) { + _sessionState.value = SessionState.LoggedOut + } + IdentityStore.markLoggedOut(reason = "cached_account_cleared") + return + } + _sessionState.value = SessionState.Ready(account = account, reason = "cached") + IdentityStore.setAccount(account = account, reason = "cached") + } + + /** + * Unified bootstrap used by registration and unlock flows. + * Keeps protocol/account initialization sequence in one place. + */ + suspend fun bootstrapAuthenticatedSession( + accountManager: AccountManager, + account: DecryptedAccount, + reason: String + ) { + markAuthInProgress(publicKey = account.publicKey, reason = reason) + + // Initialize storage-bound account context before handshake completes + // to avoid early sync/message race conditions. + ProtocolManager.initializeAccount(account.publicKey, account.privateKey) + ProtocolManager.connect() + ProtocolManager.authenticate(account.publicKey, account.privateKeyHash) + ProtocolManager.reconnectNowIfNeeded("session_bootstrap_$reason") + + accountManager.setCurrentAccount(account.publicKey) + markReady(account, reason = reason) + } +} diff --git a/app/src/main/java/com/rosetta/messenger/session/IdentityStore.kt b/app/src/main/java/com/rosetta/messenger/session/IdentityStore.kt new file mode 100644 index 0000000..c60edf9 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/session/IdentityStore.kt @@ -0,0 +1,125 @@ +package com.rosetta.messenger.session + +import com.rosetta.messenger.data.DecryptedAccount +import com.rosetta.messenger.network.SearchUser +import kotlinx.coroutines.flow.MutableStateFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.asStateFlow + +data class IdentityProfile( + val publicKey: String, + val displayName: String = "", + val username: String = "", + val verified: Int = 0, + val resolved: Boolean = false, + val updatedAtMs: Long = System.currentTimeMillis() +) + +data class IdentityStateSnapshot( + val account: DecryptedAccount? = null, + val profile: IdentityProfile? = null, + val authInProgress: Boolean = false, + val pendingPublicKey: String? = null, + val reason: String = "" +) { + val ownProfileResolved: Boolean + get() { + val activeAccount = account ?: return false + val ownProfile = profile ?: return false + return ownProfile.resolved && ownProfile.publicKey.equals(activeAccount.publicKey, ignoreCase = true) + } +} + +/** + * Runtime identity source of truth for account/profile resolution. + */ +object IdentityStore { + private val _state = MutableStateFlow(IdentityStateSnapshot()) + val state: StateFlow = _state.asStateFlow() + + fun markLoggedOut(reason: String = "") { + _state.value = + IdentityStateSnapshot( + account = null, + profile = null, + authInProgress = false, + pendingPublicKey = null, + reason = reason + ) + } + + fun markAuthInProgress(publicKey: String? = null, reason: String = "") { + _state.value = + _state.value.copy( + authInProgress = true, + pendingPublicKey = publicKey?.trim().orEmpty().ifBlank { null }, + reason = reason + ) + } + + fun setAccount(account: DecryptedAccount, reason: String = "") { + val current = _state.value + val existingProfile = current.profile + val nextProfile = + if ( + existingProfile != null && + existingProfile.publicKey.equals(account.publicKey, ignoreCase = true) + ) { + existingProfile + } else { + null + } + + _state.value = + current.copy( + account = account, + profile = nextProfile, + authInProgress = false, + pendingPublicKey = null, + reason = reason + ) + } + + fun updateOwnProfile( + publicKey: String, + displayName: String? = null, + username: String? = null, + verified: Int? = null, + resolved: Boolean = true, + reason: String = "" + ) { + val normalizedPublicKey = publicKey.trim() + if (normalizedPublicKey.isBlank()) return + + val current = _state.value + val base = + current.profile?.takeIf { it.publicKey.equals(normalizedPublicKey, ignoreCase = true) } + ?: IdentityProfile(publicKey = normalizedPublicKey) + + val nextProfile = + base.copy( + displayName = displayName?.takeIf { it.isNotBlank() } ?: base.displayName, + username = username?.takeIf { it.isNotBlank() } ?: base.username, + verified = verified ?: base.verified, + resolved = base.resolved || resolved, + updatedAtMs = System.currentTimeMillis() + ) + + _state.value = + current.copy( + profile = nextProfile, + reason = reason + ) + } + + fun updateOwnProfile(user: SearchUser, reason: String = "") { + updateOwnProfile( + publicKey = user.publicKey, + displayName = user.title, + username = user.username, + verified = user.verified, + resolved = true, + reason = reason + ) + } +} diff --git a/app/src/main/java/com/rosetta/messenger/ui/auth/AuthFlow.kt b/app/src/main/java/com/rosetta/messenger/ui/auth/AuthFlow.kt index cd84f62..215c2b8 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/auth/AuthFlow.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/auth/AuthFlow.kt @@ -8,6 +8,7 @@ import androidx.compose.ui.platform.LocalView import androidx.core.view.WindowCompat import com.rosetta.messenger.data.DecryptedAccount import com.rosetta.messenger.data.AccountManager +import com.rosetta.messenger.session.AppSessionCoordinator enum class AuthScreen { SELECT_ACCOUNT, @@ -62,6 +63,13 @@ fun AuthFlow( var showCreateModal by remember { mutableStateOf(false) } var isImportMode by remember { mutableStateOf(false) } + LaunchedEffect(currentScreen, selectedAccountId) { + AppSessionCoordinator.markAuthInProgress( + publicKey = selectedAccountId, + reason = "auth_flow_${currentScreen.name.lowercase()}" + ) + } + // If parent requests create mode while AuthFlow is alive, jump to Welcome/Create path. LaunchedEffect(startInCreateMode) { if (startInCreateMode && currentScreen == AuthScreen.UNLOCK) { diff --git a/app/src/main/java/com/rosetta/messenger/ui/auth/SetPasswordScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/auth/SetPasswordScreen.kt index 2d0459d..3ec341a 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/auth/SetPasswordScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/auth/SetPasswordScreen.kt @@ -29,7 +29,7 @@ import com.rosetta.messenger.crypto.CryptoManager import com.rosetta.messenger.data.AccountManager import com.rosetta.messenger.data.DecryptedAccount import com.rosetta.messenger.data.EncryptedAccount -import com.rosetta.messenger.network.ProtocolManager +import com.rosetta.messenger.session.AppSessionCoordinator import com.rosetta.messenger.ui.onboarding.PrimaryBlue import kotlinx.coroutines.launch @@ -309,11 +309,6 @@ fun SetPasswordScreen( ) accountManager.saveAccount(account) val privateKeyHash = CryptoManager.generatePrivateKeyHash(keyPair.privateKey) - // Initialize repository/account context before handshake completes to avoid - // "Sync postponed until account is initialized" race on first login. - ProtocolManager.initializeAccount(keyPair.publicKey, keyPair.privateKey) - startAuthHandshakeFast(keyPair.publicKey, privateKeyHash) - accountManager.setCurrentAccount(keyPair.publicKey) val decryptedAccount = DecryptedAccount( publicKey = keyPair.publicKey, privateKey = keyPair.privateKey, @@ -321,6 +316,11 @@ fun SetPasswordScreen( privateKeyHash = privateKeyHash, name = truncatedKey ) + AppSessionCoordinator.bootstrapAuthenticatedSession( + accountManager = accountManager, + account = decryptedAccount, + reason = "set_password" + ) onAccountCreated(decryptedAccount) } catch (e: Exception) { error = "Failed to create account: ${e.message}" diff --git a/app/src/main/java/com/rosetta/messenger/ui/auth/UnlockScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/auth/UnlockScreen.kt index b7365f4..9c4152e 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/auth/UnlockScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/auth/UnlockScreen.kt @@ -45,8 +45,8 @@ import com.rosetta.messenger.data.DecryptedAccount import com.rosetta.messenger.data.EncryptedAccount import com.rosetta.messenger.data.resolveAccountDisplayName import com.rosetta.messenger.database.RosettaDatabase -import com.rosetta.messenger.network.ProtocolManager import com.rosetta.messenger.repository.AvatarRepository +import com.rosetta.messenger.session.AppSessionCoordinator import com.rosetta.messenger.ui.components.AvatarImage import com.rosetta.messenger.ui.chats.getAvatarColor import com.rosetta.messenger.ui.chats.getAvatarText @@ -117,12 +117,11 @@ val decryptedPrivateKey = CryptoManager.decryptWithPassword( name = selectedAccount.name ) - // Initialize repository/account context before handshake completes to avoid - // "Sync postponed until account is initialized" race. - ProtocolManager.initializeAccount(account.publicKey, decryptedPrivateKey) - startAuthHandshakeFast(account.publicKey, privateKeyHash) - - accountManager.setCurrentAccount(account.publicKey) + AppSessionCoordinator.bootstrapAuthenticatedSession( + accountManager = accountManager, + account = decryptedAccount, + reason = "unlock" + ) onSuccess(decryptedAccount) } catch (e: Exception) { onError("Failed to unlock: ${e.message}") diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatViewModel.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatViewModel.kt index 713385a..3088c73 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatViewModel.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatViewModel.kt @@ -18,6 +18,12 @@ import com.rosetta.messenger.network.* import com.rosetta.messenger.ui.components.metaball.DevicePerformanceClass import com.rosetta.messenger.ui.components.metaball.PerformanceClass import com.rosetta.messenger.ui.chats.models.* +import com.rosetta.messenger.ui.chats.usecase.ForwardPayloadMessage +import com.rosetta.messenger.ui.chats.usecase.SendForwardUseCase +import com.rosetta.messenger.ui.chats.usecase.SendMediaMessageCommand +import com.rosetta.messenger.ui.chats.usecase.SendMediaMessageUseCase +import com.rosetta.messenger.ui.chats.usecase.SendTextMessageCommand +import com.rosetta.messenger.ui.chats.usecase.SendTextMessageUseCase import com.rosetta.messenger.utils.AttachmentFileManager import com.rosetta.messenger.utils.MessageLogger import com.rosetta.messenger.utils.MessageThrottleManager @@ -119,6 +125,12 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { // MessageRepository для подписки на события новых сообщений private val messageRepository = com.rosetta.messenger.data.MessageRepository.getInstance(application) + private val sendTextMessageUseCase = + SendTextMessageUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) }) + private val sendMediaMessageUseCase = + SendMediaMessageUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) }) + private val sendForwardUseCase = + SendForwardUseCase(sendWithRetry = { packet -> ProtocolManager.sendMessageWithRetry(packet) }) // 🔥 Кэш расшифрованных сообщений (messageId -> plainText) private val decryptionCache = ConcurrentHashMap() @@ -3987,26 +3999,24 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { } val packet = - PacketMessage().apply { - fromPublicKey = command.senderPublicKey - toPublicKey = command.recipientPublicKey - content = encryptedContent - chachaKey = encryptedKey - this.aesChachaKey = aesChachaKey - this.timestamp = timestamp - this.privateKey = privateKeyHash - this.messageId = messageId - attachments = messageAttachments - } + sendTextMessageUseCase( + SendTextMessageCommand( + fromPublicKey = command.senderPublicKey, + toPublicKey = command.recipientPublicKey, + encryptedContent = encryptedContent, + encryptedKey = encryptedKey, + aesChachaKey = aesChachaKey, + privateKeyHash = privateKeyHash, + messageId = messageId, + timestamp = timestamp, + attachments = messageAttachments, + isSavedMessages = isSavedMessages + ) + ) // 🔥 DEBUG: Log packet before sending packet.attachments.forEachIndexed { idx, att -> } - // 📁 Для Saved Messages - НЕ отправляем пакет на сервер - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } - withContext(Dispatchers.Main) { if (isSavedMessages) { updateMessageStatus(messageId, MessageStatus.SENT) @@ -4167,74 +4177,30 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { encryptionContext.plainKeyAndNonce ?.joinToString("") { "%02x".format(it) } .orEmpty() + val forwardPayloadMessages = + forwardMessages.map { fm -> + ForwardPayloadMessage( + messageId = fm.messageId, + senderPublicKey = fm.senderPublicKey, + senderName = fm.senderName, + text = fm.text, + timestamp = fm.timestamp, + chachaKeyPlain = fm.chachaKeyPlain, + attachments = fm.attachments + ) + } fun buildForwardReplyJson( includeLocalUri: Boolean ): JSONArray { - val replyJsonArray = JSONArray() - forwardMessages.forEach { fm -> - val attachmentsArray = JSONArray() - fm.attachments.forEach { att -> - val fwdInfo = - forwardedAttMap[ - forwardAttachmentRewriteKey( - fm.messageId, - att.id - ) - ] - val attId = fwdInfo?.id ?: att.id - val attPreview = fwdInfo?.preview ?: att.preview - val attTransportTag = fwdInfo?.transportTag ?: att.transportTag - val attTransportServer = - fwdInfo?.transportServer ?: att.transportServer - val attLocalUri = if (includeLocalUri) fwdInfo?.localUri ?: att.localUri else "" - attachmentsArray.put( - JSONObject().apply { - put("id", attId) - put("type", att.type.value) - put("preview", attPreview) - put("width", att.width) - put("height", att.height) - put("blob", "") - put("transportTag", attTransportTag) - put("transportServer", attTransportServer) - put( - "transport", - JSONObject().apply { - put("transport_tag", attTransportTag) - put("transport_server", attTransportServer) - } - ) - if (includeLocalUri && attLocalUri.isNotEmpty()) { - put("localUri", attLocalUri) - } - } - ) - } - val effectiveForwardPlainKey = - if (fm.messageId in rewrittenForwardMessageIds && - outgoingForwardPlainKeyHex.isNotEmpty() - ) { - outgoingForwardPlainKeyHex - } else { - fm.chachaKeyPlain - } - replyJsonArray.put( - JSONObject().apply { - put("message_id", fm.messageId) - put("publicKey", fm.senderPublicKey) - put("message", fm.text) - put("timestamp", fm.timestamp) - put("attachments", attachmentsArray) - put("forwarded", true) - put("senderName", fm.senderName) - if (effectiveForwardPlainKey.isNotEmpty()) { - put("chacha_key_plain", effectiveForwardPlainKey) - } - } - ) - } - return replyJsonArray + return sendForwardUseCase.buildForwardReplyJson( + messages = forwardPayloadMessages, + rewrittenAttachments = forwardedAttMap, + rewrittenMessageIds = rewrittenForwardMessageIds, + outgoingForwardPlainKeyHex = outgoingForwardPlainKeyHex, + includeLocalUri = includeLocalUri, + rewriteKey = ::forwardAttachmentRewriteKey + ) } // 1) 🚀 Optimistic forward: мгновенно показываем сообщение в текущем диалоге @@ -4335,11 +4301,9 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { val finalMessageAttachments = listOf( - MessageAttachment( - id = replyAttachmentId, - blob = encryptedReplyBlob, - type = AttachmentType.MESSAGES, - preview = "" + sendForwardUseCase.buildForwardAttachment( + replyAttachmentId = replyAttachmentId, + encryptedReplyBlob = encryptedReplyBlob ) ) @@ -4355,9 +4319,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { this.messageId = messageId attachments = finalMessageAttachments } - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendForwardUseCase.dispatch(packet, isSavedMessages) val finalAttachmentsJson = JSONArray() @@ -4720,8 +4682,8 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { } // Отправляем пакет + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) packetSentToProtocol = true logPhotoPipeline(messageId, "packet sent to protocol") } else { @@ -4920,9 +4882,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { } // Отправляем пакет (без blob!) - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) // 💾 Сохраняем изображение в файл локально (как в desktop) AttachmentFileManager.saveAttachment( @@ -5252,9 +5212,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { attachments = networkAttachments } - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) updateMessageStatusAndAttachmentsInDb( messageId = messageId, @@ -5474,8 +5432,8 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { // Для Saved Messages не отправляем на сервер val isSavedMessages = (sender == recipient) + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) logPhotoPipeline(messageId, "group packet sent") } @@ -5656,9 +5614,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { } // Отправляем пакет (без blob!) - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) // ⚠️ НЕ сохраняем файл локально - они слишком большие // Файлы загружаются с Transport Server при необходимости @@ -6009,21 +5965,21 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { transportServer = attachmentTransportServer ) - val packet = - PacketMessage().apply { - fromPublicKey = sender - toPublicKey = recipient - content = encryptedContent - chachaKey = encryptedKey - this.aesChachaKey = aesChachaKey - this.timestamp = timestamp - this.privateKey = privateKeyHash - this.messageId = messageId - attachments = listOf(videoAttachment) - } - + sendMediaMessageUseCase( + SendMediaMessageCommand( + fromPublicKey = sender, + toPublicKey = recipient, + encryptedContent = encryptedContent, + encryptedKey = encryptedKey, + aesChachaKey = aesChachaKey, + privateKeyHash = privateKeyHash, + messageId = messageId, + timestamp = timestamp, + mediaAttachments = listOf(videoAttachment), + isSavedMessages = isSavedMessages + ) + ) if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) packetSentToProtocol = true } @@ -6202,9 +6158,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { attachments = listOf(voiceAttachment) } - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) // Для отправителя сохраняем voice blob локально в encrypted cache. runCatching { @@ -6456,9 +6410,7 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) { attachments = listOf(avatarAttachment) } - if (!isSavedMessages) { - ProtocolManager.sendMessageWithRetry(packet) - } + sendMediaMessageUseCase.dispatch(packet, isSavedMessages) // 💾 Сохраняем аватар в файл локально (как IMAGE - с приватным ключом) AttachmentFileManager.saveAttachment( diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt index a3e2271..31d327d 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt @@ -1855,10 +1855,10 @@ fun ChatsListScreen( ) } else if (syncInProgress) { AnimatedDotsText( - baseText = "Updating", - color = Color.White, + baseText = "Synchronizing", + fontWeight = FontWeight.Bold, fontSize = 20.sp, - fontWeight = FontWeight.Bold + color = Color.White ) } else { Text( @@ -5037,7 +5037,7 @@ fun TypingIndicatorSmall( val typingColor = if (isDarkTheme) PrimaryBlue else Color(0xFF34C759) val senderTypingColor = remember(typingSenderPublicKey, isDarkTheme) { - if (typingSenderPublicKey.isBlank()) { + if (typingSenderPublicKey.isBlank() || !isDarkTheme) { typingColor } else { getAvatarColor(typingSenderPublicKey, isDarkTheme).textColor diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/RequestsListScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/RequestsListScreen.kt index 76f07d4..cafe142 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/RequestsListScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/RequestsListScreen.kt @@ -61,7 +61,7 @@ fun RequestsListScreen( }, title = { Text( - text = if (syncInProgress) "Updating..." else "Requests", + text = if (syncInProgress) "Synchronizing..." else "Requests", fontWeight = FontWeight.Bold, fontSize = 20.sp, color = Color.White diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/SearchUsersViewModel.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/SearchUsersViewModel.kt index 3955c19..8b220bf 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/SearchUsersViewModel.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/SearchUsersViewModel.kt @@ -13,6 +13,7 @@ import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch /** @@ -39,37 +40,33 @@ class SearchUsersViewModel : ViewModel() { // Приватные переменные private var searchJob: Job? = null + private var packetFlowJob: Job? = null private var privateKeyHash: String = "" private val timeFormatter = SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()) - // Callback для обработки ответа поиска - private val searchPacketHandler: (com.rosetta.messenger.network.Packet) -> Unit = handler@{ packet -> - if (packet is PacketSearch) { - logSearch( - "📥 PacketSearch response: search='${packet.search}', users=${packet.users.size}" - ) - // Desktop parity: любой ответ PacketSearch обновляет результаты - // пока в поле есть активный поисковый запрос. - if (_searchQuery.value.trim().isEmpty()) { - logSearch("⏭ Ignored response: query is empty") - return@handler - } - _searchResults.value = packet.users - _isSearching.value = false - logSearch("✅ Results updated") - } - } - init { - // Регистрируем обработчик пакетов поиска - ProtocolManager.waitPacket(0x03, searchPacketHandler) + packetFlowJob = + viewModelScope.launch { + ProtocolManager.packetFlow(0x03).collectLatest { packet -> + val searchPacket = packet as? PacketSearch ?: return@collectLatest + logSearch( + "📥 PacketSearch response: search='${searchPacket.search}', users=${searchPacket.users.size}" + ) + if (_searchQuery.value.trim().isEmpty()) { + logSearch("⏭ Ignored response: query is empty") + return@collectLatest + } + _searchResults.value = searchPacket.users + _isSearching.value = false + logSearch("✅ Results updated") + } + } } override fun onCleared() { super.onCleared() - // Отписываемся от пакетов при уничтожении ViewModel - ProtocolManager.unwaitPacket(0x03, searchPacketHandler) searchJob?.cancel() + packetFlowJob?.cancel() } /** diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/components/ChatDetailComponents.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/components/ChatDetailComponents.kt index d2b1fdf..60de3b1 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/components/ChatDetailComponents.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/components/ChatDetailComponents.kt @@ -237,7 +237,7 @@ fun TypingIndicator( if (isDarkTheme) Color(0xFF54A9EB) else Color.White.copy(alpha = 0.7f) val senderTypingColor = remember(typingSenderPublicKey, isDarkTheme) { - if (typingSenderPublicKey.isBlank()) { + if (typingSenderPublicKey.isBlank() || !isDarkTheme) { typingColor } else { groupSenderLabelColor(typingSenderPublicKey, isDarkTheme) diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendForwardUseCase.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendForwardUseCase.kt new file mode 100644 index 0000000..0b2a887 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendForwardUseCase.kt @@ -0,0 +1,102 @@ +package com.rosetta.messenger.ui.chats.usecase + +import com.rosetta.messenger.network.AttachmentType +import com.rosetta.messenger.network.MessageAttachment +import com.rosetta.messenger.network.PacketMessage +import org.json.JSONArray +import org.json.JSONObject + +data class ForwardPayloadMessage( + val messageId: String, + val senderPublicKey: String, + val senderName: String, + val text: String, + val timestamp: Long, + val chachaKeyPlain: String, + val attachments: List +) + +class SendForwardUseCase( + private val sendWithRetry: (PacketMessage) -> Unit +) { + fun buildForwardReplyJson( + messages: List, + rewrittenAttachments: Map, + rewrittenMessageIds: Set, + outgoingForwardPlainKeyHex: String, + includeLocalUri: Boolean, + rewriteKey: (messageId: String, attachmentId: String) -> String + ): JSONArray { + val replyJsonArray = JSONArray() + messages.forEach { message -> + val attachmentsArray = JSONArray() + message.attachments.forEach { attachment -> + val rewritten = + rewrittenAttachments[rewriteKey(message.messageId, attachment.id)] + val effectiveAttachment = rewritten ?: attachment + attachmentsArray.put( + JSONObject().apply { + put("id", effectiveAttachment.id) + put("type", effectiveAttachment.type.value) + put("preview", effectiveAttachment.preview) + put("width", effectiveAttachment.width) + put("height", effectiveAttachment.height) + put("blob", "") + put("transportTag", effectiveAttachment.transportTag) + put("transportServer", effectiveAttachment.transportServer) + put( + "transport", + JSONObject().apply { + put("transport_tag", effectiveAttachment.transportTag) + put("transport_server", effectiveAttachment.transportServer) + } + ) + if (includeLocalUri && effectiveAttachment.localUri.isNotEmpty()) { + put("localUri", effectiveAttachment.localUri) + } + } + ) + } + + val effectiveForwardPlainKey = + if (message.messageId in rewrittenMessageIds && outgoingForwardPlainKeyHex.isNotEmpty()) { + outgoingForwardPlainKeyHex + } else { + message.chachaKeyPlain + } + + replyJsonArray.put( + JSONObject().apply { + put("message_id", message.messageId) + put("publicKey", message.senderPublicKey) + put("message", message.text) + put("timestamp", message.timestamp) + put("attachments", attachmentsArray) + put("forwarded", true) + put("senderName", message.senderName) + if (effectiveForwardPlainKey.isNotEmpty()) { + put("chacha_key_plain", effectiveForwardPlainKey) + } + } + ) + } + return replyJsonArray + } + + fun buildForwardAttachment( + replyAttachmentId: String, + encryptedReplyBlob: String + ): MessageAttachment = + MessageAttachment( + id = replyAttachmentId, + blob = encryptedReplyBlob, + type = AttachmentType.MESSAGES, + preview = "" + ) + + fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) { + if (!isSavedMessages) { + sendWithRetry(packet) + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendMediaMessageUseCase.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendMediaMessageUseCase.kt new file mode 100644 index 0000000..6190c69 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendMediaMessageUseCase.kt @@ -0,0 +1,47 @@ +package com.rosetta.messenger.ui.chats.usecase + +import com.rosetta.messenger.network.MessageAttachment +import com.rosetta.messenger.network.PacketMessage + +data class SendMediaMessageCommand( + val fromPublicKey: String, + val toPublicKey: String, + val encryptedContent: String, + val encryptedKey: String, + val aesChachaKey: String, + val privateKeyHash: String, + val messageId: String, + val timestamp: Long, + val mediaAttachments: List, + val isSavedMessages: Boolean +) + +class SendMediaMessageUseCase( + private val sendWithRetry: (PacketMessage) -> Unit +) { + operator fun invoke(command: SendMediaMessageCommand): PacketMessage { + val packet = + PacketMessage().apply { + fromPublicKey = command.fromPublicKey + toPublicKey = command.toPublicKey + content = command.encryptedContent + chachaKey = command.encryptedKey + aesChachaKey = command.aesChachaKey + privateKey = command.privateKeyHash + messageId = command.messageId + timestamp = command.timestamp + attachments = command.mediaAttachments + } + + if (!command.isSavedMessages) { + sendWithRetry(packet) + } + return packet + } + + fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) { + if (!isSavedMessages) { + sendWithRetry(packet) + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendTextMessageUseCase.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendTextMessageUseCase.kt new file mode 100644 index 0000000..8af62bf --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/usecase/SendTextMessageUseCase.kt @@ -0,0 +1,47 @@ +package com.rosetta.messenger.ui.chats.usecase + +import com.rosetta.messenger.network.MessageAttachment +import com.rosetta.messenger.network.PacketMessage + +data class SendTextMessageCommand( + val fromPublicKey: String, + val toPublicKey: String, + val encryptedContent: String, + val encryptedKey: String, + val aesChachaKey: String, + val privateKeyHash: String, + val messageId: String, + val timestamp: Long, + val attachments: List = emptyList(), + val isSavedMessages: Boolean +) + +class SendTextMessageUseCase( + private val sendWithRetry: (PacketMessage) -> Unit +) { + operator fun invoke(command: SendTextMessageCommand): PacketMessage { + val packet = + PacketMessage().apply { + fromPublicKey = command.fromPublicKey + toPublicKey = command.toPublicKey + content = command.encryptedContent + chachaKey = command.encryptedKey + aesChachaKey = command.aesChachaKey + privateKey = command.privateKeyHash + messageId = command.messageId + timestamp = command.timestamp + attachments = command.attachments + } + + if (!command.isSavedMessages) { + sendWithRetry(packet) + } + return packet + } + + fun dispatch(packet: PacketMessage, isSavedMessages: Boolean) { + if (!isSavedMessages) { + sendWithRetry(packet) + } + } +} diff --git a/app/src/main/java/com/rosetta/messenger/ui/settings/ProfileViewModel.kt b/app/src/main/java/com/rosetta/messenger/ui/settings/ProfileViewModel.kt index 4582fdc..f257c6c 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/settings/ProfileViewModel.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/settings/ProfileViewModel.kt @@ -5,12 +5,12 @@ import androidx.lifecycle.AndroidViewModel import androidx.lifecycle.viewModelScope import com.rosetta.messenger.data.AccountManager import com.rosetta.messenger.data.EncryptedAccount -import com.rosetta.messenger.network.Packet import com.rosetta.messenger.network.PacketResult import com.rosetta.messenger.network.PacketUserInfo import com.rosetta.messenger.network.ProtocolManager import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.flow.collectLatest import kotlinx.coroutines.launch import kotlinx.coroutines.delay @@ -29,22 +29,21 @@ class ProfileViewModel(application: Application) : AndroidViewModel(application) private val _state = MutableStateFlow(ProfileState()) val state: StateFlow = _state - private var resultCallback: ((Packet) -> Unit)? = null + private var packetFlowJob: kotlinx.coroutines.Job? = null init { - // Register listener for PacketResult (0x02) - resultCallback = { packet -> - if (packet is PacketResult) { - handlePacketResult(packet) + packetFlowJob = + viewModelScope.launch { + ProtocolManager.packetFlow(0x02).collectLatest { packet -> + val result = packet as? PacketResult ?: return@collectLatest + handlePacketResult(result) + } } - } - ProtocolManager.waitPacket(0x02, resultCallback!!) } override fun onCleared() { super.onCleared() - // Unregister listener - resultCallback?.let { ProtocolManager.unwaitPacket(0x02, it) } + packetFlowJob?.cancel() } private fun addLog(message: String) {