diff --git a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt index eeeea78..14747b8 100644 --- a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt +++ b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt @@ -4,10 +4,14 @@ import kotlinx.coroutines.* import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import okhttp3.* import okio.ByteString import java.util.Locale import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong /** * Protocol connection states @@ -42,6 +46,7 @@ class Protocol( private const val HEARTBEAT_OK_LOG_THROTTLE_MS = 30_000L private const val HEX_PREVIEW_BYTES = 64 private const val TEXT_PREVIEW_CHARS = 80 + private val INSTANCE_COUNTER = AtomicInteger(0) } private fun log(message: String) { @@ -186,6 +191,10 @@ class Protocol( private var connectingSinceMs = 0L private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private val lifecycleMutex = Mutex() + private val connectionGeneration = AtomicLong(0L) + @Volatile private var activeConnectionGeneration: Long = 0L + private val instanceId = INSTANCE_COUNTER.incrementAndGet() private val _state = MutableStateFlow(ProtocolState.DISCONNECTED) val state: StateFlow = _state.asStateFlow() @@ -217,6 +226,45 @@ class Protocol( } } } + + private fun launchLifecycleOperation(operation: String, block: suspend () -> Unit) { + scope.launch { + lifecycleMutex.withLock { + try { + block() + } catch (e: CancellationException) { + throw e + } catch (e: Exception) { + log("❌ Lifecycle operation '$operation' failed: ${e.message}") + e.printStackTrace() + } + } + } + } + + private fun rotateConnectionGeneration(reason: String): Long { + val generation = connectionGeneration.incrementAndGet() + activeConnectionGeneration = generation + log("🧬 CONNECTION GENERATION: #$generation ($reason, instance=$instanceId)") + return generation + } + + private fun isStaleSocketEvent(event: String, generation: Long, socket: WebSocket): Boolean { + val currentGeneration = activeConnectionGeneration + val activeSocket = webSocket + val staleByGeneration = generation != currentGeneration + val staleBySocket = activeSocket != null && activeSocket !== socket + if (!staleByGeneration && !staleBySocket) { + return false + } + + log( + "🧊 STALE SOCKET EVENT ignored: event=$event gen=$generation activeGen=$currentGeneration " + + "sameSocket=${activeSocket === socket} instance=$instanceId" + ) + runCatching { socket.close(1000, "Stale socket event") } + return true + } private val _lastError = MutableStateFlow(null) val lastError: StateFlow = _lastError.asStateFlow() @@ -273,6 +321,8 @@ class Protocol( ) init { + log("🧩 Protocol init: instance=$instanceId") + // Register handshake response handler waitPacket(0x00) { packet -> if (packet is PacketHandshake) { @@ -415,7 +465,7 @@ class Protocol( // Триггерим reconnect если heartbeat не прошёл if (!isManuallyClosed) { log("🔄 TRIGGERING RECONNECT due to failed heartbeat") - handleDisconnect() + handleDisconnect("heartbeat_failed") } } } else { @@ -433,9 +483,15 @@ class Protocol( * Initialize connection to server */ fun connect() { + launchLifecycleOperation("connect") { + connectLocked() + } + } + + private fun connectLocked() { val currentState = _state.value val now = System.currentTimeMillis() - log("🔌 CONNECT CALLED: currentState=$currentState, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting") + log("🔌 CONNECT CALLED: currentState=$currentState, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId") // КРИТИЧНО: Если уже подключены и аутентифицированы - не переподключаемся! if ( @@ -488,6 +544,7 @@ class Protocol( reconnectAttempts++ log("📊 RECONNECT ATTEMPT #$reconnectAttempts") + val generation = rotateConnectionGeneration("connect_attempt_$reconnectAttempts") // Закрываем старый сокет если есть (как в Архиве) webSocket?.let { oldSocket -> @@ -512,7 +569,8 @@ class Protocol( webSocket = client.newWebSocket(request, object : WebSocketListener() { override fun onOpen(webSocket: WebSocket, response: Response) { - log("✅ WebSocket OPEN: response=${response.code}, hasCredentials=${lastPublicKey != null}") + if (isStaleSocketEvent("onOpen", generation, webSocket)) return + log("✅ WebSocket OPEN: response=${response.code}, hasCredentials=${lastPublicKey != null}, gen=$generation") // Сбрасываем флаг подключения isConnecting = false @@ -538,15 +596,18 @@ class Protocol( } override fun onMessage(webSocket: WebSocket, bytes: ByteString) { + if (isStaleSocketEvent("onMessage(bytes)", generation, webSocket)) return log("📥 onMessage called - ${bytes.size} bytes") handleMessage(bytes.toByteArray()) } override fun onMessage(webSocket: WebSocket, text: String) { + if (isStaleSocketEvent("onMessage(text)", generation, webSocket)) return log("Received text message (unexpected): $text") } override fun onClosing(webSocket: WebSocket, code: Int, reason: String) { + if (isStaleSocketEvent("onClosing", generation, webSocket)) return log("⚠️ WebSocket CLOSING: code=$code reason='$reason' state=${_state.value}") // Must respond with close() so OkHttp transitions to onClosed. // Without this, the socket stays in a half-closed "zombie" state — @@ -560,23 +621,26 @@ class Protocol( } override fun onClosed(webSocket: WebSocket, code: Int, reason: String) { - log("❌ WebSocket CLOSED: code=$code reason='$reason' state=${_state.value} manuallyClosed=$isManuallyClosed") + if (isStaleSocketEvent("onClosed", generation, webSocket)) return + log("❌ WebSocket CLOSED: code=$code reason='$reason' state=${_state.value} manuallyClosed=$isManuallyClosed gen=$generation") isConnecting = false // Сбрасываем флаг connectingSinceMs = 0L - handleDisconnect() + handleDisconnect("onClosed") } override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) { + if (isStaleSocketEvent("onFailure", generation, webSocket)) return log("❌ WebSocket FAILURE: ${t.message}") log(" Response: ${response?.code} ${response?.message}") log(" State: ${_state.value}") log(" Manually closed: $isManuallyClosed") log(" Reconnect attempts: $reconnectAttempts") + log(" Generation: $generation") t.printStackTrace() isConnecting = false // Сбрасываем флаг connectingSinceMs = 0L _lastError.value = t.message - handleDisconnect() + handleDisconnect("onFailure") } }) } @@ -606,8 +670,10 @@ class Protocol( // If switching accounts, force disconnect and reconnect with new credentials if (switchingAccount) { log("🔄 Account switch detected, forcing reconnect with new credentials") - disconnect() - connect() // Will auto-handshake with saved credentials (publicKey, privateHash) on connect + launchLifecycleOperation("account_switch_reconnect") { + disconnectLocked(manual = false, reason = "Account switch reconnect") + connectLocked() // Will auto-handshake with saved credentials on connect + } return } @@ -797,9 +863,18 @@ class Protocol( } } - private fun handleDisconnect() { + private fun handleDisconnect(source: String = "unknown") { + launchLifecycleOperation("handle_disconnect:$source") { + handleDisconnectLocked(source) + } + } + + private fun handleDisconnectLocked(source: String) { val previousState = _state.value - log("🔌 DISCONNECT HANDLER: previousState=$previousState, manuallyClosed=$isManuallyClosed, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting") + log( + "🔌 DISCONNECT HANDLER: source=$source previousState=$previousState, manuallyClosed=$isManuallyClosed, " + + "reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId" + ) // Duplicate callbacks are possible (e.g. heartbeat failure + onFailure/onClosed). // If we are already disconnected and a reconnect is pending, avoid scheduling another one. @@ -813,6 +888,8 @@ class Protocol( log("⚠️ DISCONNECT IGNORED: connection already in progress") return } + + rotateConnectionGeneration("disconnect:$source") setState(ProtocolState.DISCONNECTED, "Disconnect handler called from $previousState") handshakeComplete = false @@ -880,8 +957,14 @@ class Protocol( * Disconnect from server */ fun disconnect() { - log("🔌 Manual disconnect requested") - isManuallyClosed = true + launchLifecycleOperation("disconnect_manual") { + disconnectLocked(manual = true, reason = "User disconnected") + } + } + + private fun disconnectLocked(manual: Boolean, reason: String) { + log("🔌 Disconnect requested: manual=$manual reason='$reason' instance=$instanceId") + isManuallyClosed = manual isConnecting = false // Сбрасываем флаг connectingSinceMs = 0L reconnectJob?.cancel() // Отменяем запланированные переподключения @@ -889,9 +972,12 @@ class Protocol( handshakeJob?.cancel() heartbeatJob?.cancel() heartbeatPeriodMs = 0L - webSocket?.close(1000, "User disconnected") + rotateConnectionGeneration("disconnect_locked:${if (manual) "manual" else "internal"}") + + val socket = webSocket webSocket = null - _state.value = ProtocolState.DISCONNECTED + runCatching { socket?.close(1000, reason) } + setState(ProtocolState.DISCONNECTED, "disconnectLocked(manual=$manual, reason=$reason)") } /** @@ -904,6 +990,12 @@ class Protocol( * on app resume we should not wait scheduled exponential backoff. */ fun reconnectNowIfNeeded(reason: String = "foreground") { + launchLifecycleOperation("fast_reconnect:$reason") { + reconnectNowIfNeededLocked(reason) + } + } + + private fun reconnectNowIfNeededLocked(reason: String) { val currentState = _state.value val hasCredentials = !lastPublicKey.isNullOrBlank() && !lastPrivateHash.isNullOrBlank() val now = System.currentTimeMillis() @@ -929,6 +1021,7 @@ class Protocol( connectingSinceMs = 0L runCatching { webSocket?.cancel() } webSocket = null + rotateConnectionGeneration("fast_reconnect_reset:$reason") setState(ProtocolState.DISCONNECTED, "Fast reconnect reset stuck CONNECTING") } else if ( currentState == ProtocolState.AUTHENTICATED || @@ -943,7 +1036,7 @@ class Protocol( reconnectAttempts = 0 reconnectJob?.cancel() reconnectJob = null - connect() + connectLocked() } /** @@ -966,7 +1059,11 @@ class Protocol( * Release resources */ fun destroy() { - disconnect() + runBlocking { + lifecycleMutex.withLock { + disconnectLocked(manual = true, reason = "Destroy protocol") + } + } heartbeatJob?.cancel() scope.cancel() } diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 2f2d427..3da4aaf 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -52,11 +52,12 @@ object ProtocolManager { private const val DEVICE_ID_KEY = "device_id" private const val DEVICE_ID_LENGTH = 128 - private var protocol: Protocol? = null + @Volatile private var protocol: Protocol? = null private var messageRepository: MessageRepository? = null private var groupRepository: GroupRepository? = null private var appContext: Context? = null private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + private val protocolInstanceLock = Any() @Volatile private var packetHandlersRegistered = false @Volatile private var stateMonitoringStarted = false @@ -1109,16 +1110,22 @@ object ProtocolManager { * Get or create Protocol instance */ fun getProtocol(): Protocol { - if (protocol == null) { - protocol = + protocol?.let { return it } + + synchronized(protocolInstanceLock) { + protocol?.let { return it } + + val created = Protocol( serverAddress = SERVER_ADDRESS, logger = { msg -> addLog(msg) }, isNetworkAvailable = { hasActiveInternet() }, onNetworkUnavailable = { waitForNetworkAndReconnect("protocol_connect") } ) + protocol = created + addLog("🧩 Protocol singleton created: id=${System.identityHashCode(created)}") + return created } - return protocol!! } /** @@ -1758,8 +1765,10 @@ object ProtocolManager { */ fun destroy() { stopWaitingForNetwork("destroy") - protocol?.destroy() - protocol = null + synchronized(protocolInstanceLock) { + protocol?.destroy() + protocol = null + } messageRepository?.clearInitialization() clearTypingState() _devices.value = emptyList()