Надёжный фикс Protocol: singleton, connection generation и single-flight reconnect через Mutex
All checks were successful
Android Kernel Build / build (push) Successful in 21m24s

This commit is contained in:
2026-04-17 00:39:46 +05:00
parent 664f9fd7ae
commit 53e2119feb
2 changed files with 128 additions and 22 deletions

View File

@@ -4,10 +4,14 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import okhttp3.*
import okio.ByteString
import java.util.Locale
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
/**
* Protocol connection states
@@ -42,6 +46,7 @@ class Protocol(
private const val HEARTBEAT_OK_LOG_THROTTLE_MS = 30_000L
private const val HEX_PREVIEW_BYTES = 64
private const val TEXT_PREVIEW_CHARS = 80
private val INSTANCE_COUNTER = AtomicInteger(0)
}
private fun log(message: String) {
@@ -186,6 +191,10 @@ class Protocol(
private var connectingSinceMs = 0L
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val lifecycleMutex = Mutex()
private val connectionGeneration = AtomicLong(0L)
@Volatile private var activeConnectionGeneration: Long = 0L
private val instanceId = INSTANCE_COUNTER.incrementAndGet()
private val _state = MutableStateFlow(ProtocolState.DISCONNECTED)
val state: StateFlow<ProtocolState> = _state.asStateFlow()
@@ -217,6 +226,45 @@ class Protocol(
}
}
}
private fun launchLifecycleOperation(operation: String, block: suspend () -> Unit) {
scope.launch {
lifecycleMutex.withLock {
try {
block()
} catch (e: CancellationException) {
throw e
} catch (e: Exception) {
log("❌ Lifecycle operation '$operation' failed: ${e.message}")
e.printStackTrace()
}
}
}
}
private fun rotateConnectionGeneration(reason: String): Long {
val generation = connectionGeneration.incrementAndGet()
activeConnectionGeneration = generation
log("🧬 CONNECTION GENERATION: #$generation ($reason, instance=$instanceId)")
return generation
}
private fun isStaleSocketEvent(event: String, generation: Long, socket: WebSocket): Boolean {
val currentGeneration = activeConnectionGeneration
val activeSocket = webSocket
val staleByGeneration = generation != currentGeneration
val staleBySocket = activeSocket != null && activeSocket !== socket
if (!staleByGeneration && !staleBySocket) {
return false
}
log(
"🧊 STALE SOCKET EVENT ignored: event=$event gen=$generation activeGen=$currentGeneration " +
"sameSocket=${activeSocket === socket} instance=$instanceId"
)
runCatching { socket.close(1000, "Stale socket event") }
return true
}
private val _lastError = MutableStateFlow<String?>(null)
val lastError: StateFlow<String?> = _lastError.asStateFlow()
@@ -273,6 +321,8 @@ class Protocol(
)
init {
log("🧩 Protocol init: instance=$instanceId")
// Register handshake response handler
waitPacket(0x00) { packet ->
if (packet is PacketHandshake) {
@@ -415,7 +465,7 @@ class Protocol(
// Триггерим reconnect если heartbeat не прошёл
if (!isManuallyClosed) {
log("🔄 TRIGGERING RECONNECT due to failed heartbeat")
handleDisconnect()
handleDisconnect("heartbeat_failed")
}
}
} else {
@@ -433,9 +483,15 @@ class Protocol(
* Initialize connection to server
*/
fun connect() {
launchLifecycleOperation("connect") {
connectLocked()
}
}
private fun connectLocked() {
val currentState = _state.value
val now = System.currentTimeMillis()
log("🔌 CONNECT CALLED: currentState=$currentState, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting")
log("🔌 CONNECT CALLED: currentState=$currentState, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId")
// КРИТИЧНО: Если уже подключены и аутентифицированы - не переподключаемся!
if (
@@ -488,6 +544,7 @@ class Protocol(
reconnectAttempts++
log("📊 RECONNECT ATTEMPT #$reconnectAttempts")
val generation = rotateConnectionGeneration("connect_attempt_$reconnectAttempts")
// Закрываем старый сокет если есть (как в Архиве)
webSocket?.let { oldSocket ->
@@ -512,7 +569,8 @@ class Protocol(
webSocket = client.newWebSocket(request, object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
log("✅ WebSocket OPEN: response=${response.code}, hasCredentials=${lastPublicKey != null}")
if (isStaleSocketEvent("onOpen", generation, webSocket)) return
log("✅ WebSocket OPEN: response=${response.code}, hasCredentials=${lastPublicKey != null}, gen=$generation")
// Сбрасываем флаг подключения
isConnecting = false
@@ -538,15 +596,18 @@ class Protocol(
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
if (isStaleSocketEvent("onMessage(bytes)", generation, webSocket)) return
log("📥 onMessage called - ${bytes.size} bytes")
handleMessage(bytes.toByteArray())
}
override fun onMessage(webSocket: WebSocket, text: String) {
if (isStaleSocketEvent("onMessage(text)", generation, webSocket)) return
log("Received text message (unexpected): $text")
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
if (isStaleSocketEvent("onClosing", generation, webSocket)) return
log("⚠️ WebSocket CLOSING: code=$code reason='$reason' state=${_state.value}")
// Must respond with close() so OkHttp transitions to onClosed.
// Without this, the socket stays in a half-closed "zombie" state —
@@ -560,23 +621,26 @@ class Protocol(
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
log("❌ WebSocket CLOSED: code=$code reason='$reason' state=${_state.value} manuallyClosed=$isManuallyClosed")
if (isStaleSocketEvent("onClosed", generation, webSocket)) return
log("❌ WebSocket CLOSED: code=$code reason='$reason' state=${_state.value} manuallyClosed=$isManuallyClosed gen=$generation")
isConnecting = false // Сбрасываем флаг
connectingSinceMs = 0L
handleDisconnect()
handleDisconnect("onClosed")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
if (isStaleSocketEvent("onFailure", generation, webSocket)) return
log("❌ WebSocket FAILURE: ${t.message}")
log(" Response: ${response?.code} ${response?.message}")
log(" State: ${_state.value}")
log(" Manually closed: $isManuallyClosed")
log(" Reconnect attempts: $reconnectAttempts")
log(" Generation: $generation")
t.printStackTrace()
isConnecting = false // Сбрасываем флаг
connectingSinceMs = 0L
_lastError.value = t.message
handleDisconnect()
handleDisconnect("onFailure")
}
})
}
@@ -606,8 +670,10 @@ class Protocol(
// If switching accounts, force disconnect and reconnect with new credentials
if (switchingAccount) {
log("🔄 Account switch detected, forcing reconnect with new credentials")
disconnect()
connect() // Will auto-handshake with saved credentials (publicKey, privateHash) on connect
launchLifecycleOperation("account_switch_reconnect") {
disconnectLocked(manual = false, reason = "Account switch reconnect")
connectLocked() // Will auto-handshake with saved credentials on connect
}
return
}
@@ -797,9 +863,18 @@ class Protocol(
}
}
private fun handleDisconnect() {
private fun handleDisconnect(source: String = "unknown") {
launchLifecycleOperation("handle_disconnect:$source") {
handleDisconnectLocked(source)
}
}
private fun handleDisconnectLocked(source: String) {
val previousState = _state.value
log("🔌 DISCONNECT HANDLER: previousState=$previousState, manuallyClosed=$isManuallyClosed, reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting")
log(
"🔌 DISCONNECT HANDLER: source=$source previousState=$previousState, manuallyClosed=$isManuallyClosed, " +
"reconnectAttempts=$reconnectAttempts, isConnecting=$isConnecting, instance=$instanceId"
)
// Duplicate callbacks are possible (e.g. heartbeat failure + onFailure/onClosed).
// If we are already disconnected and a reconnect is pending, avoid scheduling another one.
@@ -813,6 +888,8 @@ class Protocol(
log("⚠️ DISCONNECT IGNORED: connection already in progress")
return
}
rotateConnectionGeneration("disconnect:$source")
setState(ProtocolState.DISCONNECTED, "Disconnect handler called from $previousState")
handshakeComplete = false
@@ -880,8 +957,14 @@ class Protocol(
* Disconnect from server
*/
fun disconnect() {
log("🔌 Manual disconnect requested")
isManuallyClosed = true
launchLifecycleOperation("disconnect_manual") {
disconnectLocked(manual = true, reason = "User disconnected")
}
}
private fun disconnectLocked(manual: Boolean, reason: String) {
log("🔌 Disconnect requested: manual=$manual reason='$reason' instance=$instanceId")
isManuallyClosed = manual
isConnecting = false // Сбрасываем флаг
connectingSinceMs = 0L
reconnectJob?.cancel() // Отменяем запланированные переподключения
@@ -889,9 +972,12 @@ class Protocol(
handshakeJob?.cancel()
heartbeatJob?.cancel()
heartbeatPeriodMs = 0L
webSocket?.close(1000, "User disconnected")
rotateConnectionGeneration("disconnect_locked:${if (manual) "manual" else "internal"}")
val socket = webSocket
webSocket = null
_state.value = ProtocolState.DISCONNECTED
runCatching { socket?.close(1000, reason) }
setState(ProtocolState.DISCONNECTED, "disconnectLocked(manual=$manual, reason=$reason)")
}
/**
@@ -904,6 +990,12 @@ class Protocol(
* on app resume we should not wait scheduled exponential backoff.
*/
fun reconnectNowIfNeeded(reason: String = "foreground") {
launchLifecycleOperation("fast_reconnect:$reason") {
reconnectNowIfNeededLocked(reason)
}
}
private fun reconnectNowIfNeededLocked(reason: String) {
val currentState = _state.value
val hasCredentials = !lastPublicKey.isNullOrBlank() && !lastPrivateHash.isNullOrBlank()
val now = System.currentTimeMillis()
@@ -929,6 +1021,7 @@ class Protocol(
connectingSinceMs = 0L
runCatching { webSocket?.cancel() }
webSocket = null
rotateConnectionGeneration("fast_reconnect_reset:$reason")
setState(ProtocolState.DISCONNECTED, "Fast reconnect reset stuck CONNECTING")
} else if (
currentState == ProtocolState.AUTHENTICATED ||
@@ -943,7 +1036,7 @@ class Protocol(
reconnectAttempts = 0
reconnectJob?.cancel()
reconnectJob = null
connect()
connectLocked()
}
/**
@@ -966,7 +1059,11 @@ class Protocol(
* Release resources
*/
fun destroy() {
disconnect()
runBlocking {
lifecycleMutex.withLock {
disconnectLocked(manual = true, reason = "Destroy protocol")
}
}
heartbeatJob?.cancel()
scope.cancel()
}

View File

@@ -52,11 +52,12 @@ object ProtocolManager {
private const val DEVICE_ID_KEY = "device_id"
private const val DEVICE_ID_LENGTH = 128
private var protocol: Protocol? = null
@Volatile private var protocol: Protocol? = null
private var messageRepository: MessageRepository? = null
private var groupRepository: GroupRepository? = null
private var appContext: Context? = null
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
private val protocolInstanceLock = Any()
@Volatile private var packetHandlersRegistered = false
@Volatile private var stateMonitoringStarted = false
@@ -1109,16 +1110,22 @@ object ProtocolManager {
* Get or create Protocol instance
*/
fun getProtocol(): Protocol {
if (protocol == null) {
protocol =
protocol?.let { return it }
synchronized(protocolInstanceLock) {
protocol?.let { return it }
val created =
Protocol(
serverAddress = SERVER_ADDRESS,
logger = { msg -> addLog(msg) },
isNetworkAvailable = { hasActiveInternet() },
onNetworkUnavailable = { waitForNetworkAndReconnect("protocol_connect") }
)
protocol = created
addLog("🧩 Protocol singleton created: id=${System.identityHashCode(created)}")
return created
}
return protocol!!
}
/**
@@ -1758,8 +1765,10 @@ object ProtocolManager {
*/
fun destroy() {
stopWaitingForNetwork("destroy")
protocol?.destroy()
protocol = null
synchronized(protocolInstanceLock) {
protocol?.destroy()
protocol = null
}
messageRepository?.clearInitialization()
clearTypingState()
_devices.value = emptyList()