diff --git a/app/build.gradle.kts b/app/build.gradle.kts index c509dd0..96f6ec3 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown" // ═══════════════════════════════════════════════════════════ // Rosetta versioning — bump here on each release // ═══════════════════════════════════════════════════════════ -val rosettaVersionName = "1.0.11" -val rosettaVersionCode = 11 // Increment on each release +val rosettaVersionName = "1.1.0" +val rosettaVersionCode = 12 // Increment on each release android { namespace = "com.rosetta.messenger" diff --git a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt index 2f29c43..50edff2 100644 --- a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt +++ b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt @@ -18,14 +18,8 @@ object ReleaseNotes { Update v$VERSION_PLACEHOLDER Синхронизация сообщений - - Исправлен недолет сообщений после оффлайна при массовой отправке (спам-тест) - - Исправлен сценарий, когда синхронизация останавливалась на первой пачке - - Нормализуется sync-cursor (last_sync), включая поврежденные timestamp - - Следующий sync-запрос отправляется с безопасным timestamp - - Стабильность протокола - - Улучшена защита чтения строк из бинарного потока - - Ошибки внутри батча больше не клинят дальнейшую догрузку пакетов + - Исправлен бесконечный цикл синхронизации, когда сервер возвращал пустые батчи с неизменным курсором + - Вынесена общая логика завершения sync-цикла для единообразной обработки всех сценариев """.trimIndent() fun getNotice(version: String): String = diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 804f4bb..f1d6bed 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -10,12 +10,13 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.StateFlow import kotlinx.coroutines.flow.asStateFlow +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import java.security.SecureRandom import java.util.* import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.resume /** @@ -26,9 +27,9 @@ object ProtocolManager { private const val TAG = "ProtocolManager" private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h + private const val MAX_STALLED_SYNC_BATCHES = 12 private const val MAX_DEBUG_LOGS = 600 private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L - private const val INBOUND_TASK_TIMEOUT_MS = 20_000L // Server address - same as React Native version private const val SERVER_ADDRESS = "ws://46.28.71.12:3000" @@ -40,6 +41,9 @@ object ProtocolManager { private var messageRepository: MessageRepository? = null private var appContext: Context? = null private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob()) + @Volatile private var packetHandlersRegistered = false + @Volatile private var stateMonitoringStarted = false + @Volatile private var syncRequestInFlight = false // Guard: prevent duplicate FCM token subscribe within a single session @Volatile @@ -90,10 +94,15 @@ object ProtocolManager { // Tracks the tail of the sequential processing chain (like desktop's `tail` promise) @Volatile private var inboundQueueDrainJob: Job? = null private val inboundProcessingFailures = AtomicInteger(0) - private val syncBatchMaxProcessedTimestamp = AtomicLong(0L) + private val syncBatchMessageCount = AtomicInteger(0) + private val stalledSyncBatchCount = AtomicInteger(0) + private val syncBatchEndMutex = Mutex() private fun setSyncInProgress(value: Boolean) { syncBatchInProgress = value + if (!value) { + stalledSyncBatchCount.set(0) + } if (_syncInProgress.value != value) { _syncInProgress.value = value } @@ -160,8 +169,14 @@ object ProtocolManager { fun initialize(context: Context) { appContext = context.applicationContext messageRepository = MessageRepository.getInstance(context) - setupPacketHandlers() - setupStateMonitoring() + if (!packetHandlersRegistered) { + setupPacketHandlers() + packetHandlersRegistered = true + } + if (!stateMonitoringStarted) { + setupStateMonitoring() + stateMonitoringStarted = true + } } /** @@ -175,6 +190,7 @@ object ProtocolManager { onAuthenticated() } if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) { + syncRequestInFlight = false setSyncInProgress(false) } lastProtocolState = newState @@ -239,10 +255,8 @@ object ProtocolManager { } if (!syncBatchInProgress) { repository.updateLastSyncTimestamp(messagePacket.timestamp) - } else if (messagePacket.timestamp > 0L) { - syncBatchMaxProcessedTimestamp.accumulateAndGet(messagePacket.timestamp) { prev, cur -> - if (cur > prev) cur else prev - } + } else { + syncBatchMessageCount.incrementAndGet() } // ✅ Send delivery ACK only AFTER message is safely stored in DB. // Skip for own sync messages (no need to ACK yourself). @@ -455,14 +469,9 @@ object ProtocolManager { inboundQueueDrainJob = scope.launch { for (task in inboundTaskChannel) { try { - withTimeout(INBOUND_TASK_TIMEOUT_MS) { - task() - } + task() } catch (t: Throwable) { - markInboundProcessingFailure( - "Dialog queue task failed or timed out (${INBOUND_TASK_TIMEOUT_MS}ms)", - t - ) + markInboundProcessingFailure("Dialog queue error", t) } } } @@ -551,6 +560,10 @@ object ProtocolManager { } private fun finishSyncCycle(reason: String) { + syncRequestInFlight = false + stalledSyncBatchCount.set(0) + syncBatchMessageCount.set(0) + inboundProcessingFailures.set(0) addLog(reason) setSyncInProgress(false) retryWaitingMessages() @@ -606,16 +619,21 @@ object ProtocolManager { } private fun requestSynchronize() { - // Desktop parity: set syncBatchInProgress=true BEFORE sending the sync request. - // This closes the race window between AUTHENTICATED → BATCH_START where real-time - // messages could arrive and update lastSync, potentially advancing the cursor past - // messages the server hasn't delivered yet. - setSyncInProgress(true) + if (syncBatchInProgress) { + addLog("⚠️ SYNC request skipped: sync already in progress") + return + } + if (syncRequestInFlight) { + addLog("⚠️ SYNC request skipped: previous request still in flight") + return + } + stalledSyncBatchCount.set(0) + syncRequestInFlight = true addLog("🔄 SYNC requested — fetching last sync timestamp...") scope.launch { val repository = messageRepository if (repository == null || !repository.isInitialized()) { - setSyncInProgress(false) + syncRequestInFlight = false requireResyncAfterAccountInit("⏳ Sync postponed until account is initialized") return@launch } @@ -626,6 +644,7 @@ object ProtocolManager { } private fun sendSynchronize(timestamp: Long) { + syncRequestInFlight = true val safeTimestamp = normalizeSyncTimestamp(timestamp) val packet = PacketSync().apply { status = SyncStatus.NOT_NEEDED @@ -649,6 +668,7 @@ object ProtocolManager { * has been scheduled on Dispatchers.IO. */ private fun handleSyncPacket(packet: PacketSync) { + syncRequestInFlight = false when (packet.status) { SyncStatus.BATCH_START -> { addLog("🔄 SYNC BATCH_START — incoming message batch") @@ -656,76 +676,86 @@ object ProtocolManager { // subsequent 0x06 packets are dispatched by OkHttp's sequential callback. setSyncInProgress(true) inboundProcessingFailures.set(0) - syncBatchMaxProcessedTimestamp.set(0L) + syncBatchMessageCount.set(0) } SyncStatus.BATCH_END -> { addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})") // BATCH_END requires suspend (whenInboundTasksFinish), so we launch a coroutine. // syncBatchInProgress stays true until NOT_NEEDED arrives. scope.launch { - setSyncInProgress(true) - val tasksFinished = whenInboundTasksFinish() - if (!tasksFinished) { - android.util.Log.w( - TAG, - "SYNC BATCH_END: queue unavailable, skipping cursor update for this step" - ) - return@launch - } - val failuresInBatch = inboundProcessingFailures.getAndSet(0) - if (failuresInBatch > 0) { - addLog( - "⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior" - ) - } - val repository = messageRepository - val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L) - val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp) - val processedMaxTimestamp = normalizeSyncTimestamp(syncBatchMaxProcessedTimestamp.get()) - val nextCursor = - when { - safeBatchTimestamp <= 0L -> processedMaxTimestamp - processedMaxTimestamp <= 0L -> safeBatchTimestamp - processedMaxTimestamp < safeBatchTimestamp -> processedMaxTimestamp - else -> safeBatchTimestamp + syncBatchEndMutex.withLock { + if (!syncBatchInProgress) { + addLog("⚠️ SYNC BATCH_END ignored: sync already completed") + return@launch + } + val tasksFinished = whenInboundTasksFinish() + if (!tasksFinished) { + android.util.Log.w( + TAG, + "SYNC BATCH_END: queue unavailable, skipping cursor update for this step" + ) + val fallbackCursor = normalizeSyncTimestamp(messageRepository?.getLastSyncTimestamp() ?: 0L) + if (syncBatchInProgress) { + sendSynchronize(fallbackCursor) } - val requestCursor = - when { - nextCursor > 0L -> nextCursor - currentCursor > 0L -> currentCursor - else -> 0L + return@launch + } + val failuresInBatch = inboundProcessingFailures.getAndSet(0) + if (failuresInBatch > 0) { + addLog( + "⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior" + ) + } + val processedMessagesInBatch = syncBatchMessageCount.getAndSet(0) + val repository = messageRepository + val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L) + val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp) + val nextCursor = if (safeBatchTimestamp > 0L) safeBatchTimestamp else currentCursor + val requestCursor = + when { + nextCursor > 0L -> nextCursor + currentCursor > 0L -> currentCursor + else -> 0L + } + addLog( + "🔄 SYNC cursor calc: current=$currentCursor, server=$safeBatchTimestamp, next=$nextCursor, messages=$processedMessagesInBatch, failures=$failuresInBatch" + ) + + // If server repeatedly returns an empty/non-advancing batch, allow a few retries + // first (to avoid premature stop), then finish to avoid endless "Synchronizing...". + val noProgress = + failuresInBatch == 0 && + processedMessagesInBatch == 0 && + nextCursor <= currentCursor + if (noProgress) { + val stalled = stalledSyncBatchCount.incrementAndGet() + if (stalled >= MAX_STALLED_SYNC_BATCHES) { + finishSyncCycle( + "✅ SYNC COMPLETE — stalled on cursor for $stalled batch(es) (server=$safeBatchTimestamp, current=$currentCursor)" + ) + return@launch } + addLog( + "⚠️ SYNC batch has no progress (#$stalled/$MAX_STALLED_SYNC_BATCHES), retrying with cursor=$requestCursor" + ) + } else { + stalledSyncBatchCount.set(0) + } - // Loop guard: if server keeps BATCH_END with unchanged cursor and we did not - // process anything in this batch, treat sync as finished to avoid infinite loop. - val noProgress = - failuresInBatch == 0 && - processedMaxTimestamp <= 0L && - (nextCursor <= 0L || nextCursor == currentCursor) - if (noProgress) { - finishSyncCycle( - "✅ SYNC COMPLETE — no progress on batch (server=$safeBatchTimestamp, current=$currentCursor)" - ) - return@launch + if (nextCursor > 0L) { + addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch") + repository?.updateLastSyncTimestamp(nextCursor) + } else { + addLog( + "⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor" + ) + } + if (!syncBatchInProgress) { + addLog("⚠️ SYNC next batch skipped: sync already completed") + return@launch + } + sendSynchronize(requestCursor) } - - // If server batch timestamp runs ahead of what we actually processed, clamp it. - // This avoids skipping tail messages when packet delivery/parsing was partial. - if (processedMaxTimestamp > 0L && processedMaxTimestamp < safeBatchTimestamp) { - addLog( - "⚠️ SYNC cursor clamped to processed max: server=$safeBatchTimestamp processed=$processedMaxTimestamp" - ) - } - - if (nextCursor > 0L) { - addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch") - repository?.updateLastSyncTimestamp(nextCursor) - } else { - addLog( - "⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, processed=$processedMaxTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor" - ) - } - sendSynchronize(requestCursor) } } SyncStatus.NOT_NEEDED -> { @@ -821,6 +851,7 @@ object ProtocolManager { fun syncOnForeground() { if (!isAuthenticated()) return if (syncBatchInProgress) return + if (syncRequestInFlight) return val now = System.currentTimeMillis() if (now - lastForegroundSyncTime < 5_000) return lastForegroundSyncTime = now @@ -838,6 +869,7 @@ object ProtocolManager { return } if (syncBatchInProgress) return + if (syncRequestInFlight) return scope.launch { val repository = messageRepository @@ -848,7 +880,8 @@ object ProtocolManager { val currentSync = repository.getLastSyncTimestamp() val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L) - setSyncInProgress(true) + stalledSyncBatchCount.set(0) + syncRequestInFlight = true addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo") sendSynchronize(rewindTo) } @@ -1096,6 +1129,7 @@ object ProtocolManager { protocol?.clearCredentials() _devices.value = emptyList() _pendingDeviceVerification.value = null + syncRequestInFlight = false setSyncInProgress(false) lastSubscribedToken = null // reset so token is re-sent on next connect } @@ -1108,6 +1142,7 @@ object ProtocolManager { protocol = null _devices.value = emptyList() _pendingDeviceVerification.value = null + syncRequestInFlight = false setSyncInProgress(false) scope.cancel() }