From 2da2c6ab360fd79e66cbc34654d9ea4b1b81607d Mon Sep 17 00:00:00 2001 From: k1ngsterr1 Date: Fri, 27 Feb 2026 02:43:30 +0500 Subject: [PATCH] feat: Bump version to 1.0.11, add ConnectionLogsScreen, and enhance message synchronization logic --- app/build.gradle.kts | 4 +- .../com/rosetta/messenger/MainActivity.kt | 16 ++ .../messenger/data/MessageRepository.kt | 52 +++- .../rosetta/messenger/data/ReleaseNotes.kt | 16 +- .../com/rosetta/messenger/network/Protocol.kt | 71 +++-- .../messenger/network/ProtocolManager.kt | 245 +++++++++++++++--- .../com/rosetta/messenger/network/Stream.kt | 17 ++ .../messenger/ui/chats/ChatsListScreen.kt | 15 -- .../ui/chats/ConnectionLogsScreen.kt | 4 +- 9 files changed, 338 insertions(+), 102 deletions(-) diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 675523e..c509dd0 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown" // ═══════════════════════════════════════════════════════════ // Rosetta versioning — bump here on each release // ═══════════════════════════════════════════════════════════ -val rosettaVersionName = "1.0.10" -val rosettaVersionCode = 10 // Increment on each release +val rosettaVersionName = "1.0.11" +val rosettaVersionCode = 11 // Increment on each release android { namespace = "com.rosetta.messenger" diff --git a/app/src/main/java/com/rosetta/messenger/MainActivity.kt b/app/src/main/java/com/rosetta/messenger/MainActivity.kt index a4a51cf..cd2c127 100644 --- a/app/src/main/java/com/rosetta/messenger/MainActivity.kt +++ b/app/src/main/java/com/rosetta/messenger/MainActivity.kt @@ -41,6 +41,7 @@ import com.rosetta.messenger.ui.auth.AuthFlow import com.rosetta.messenger.ui.auth.DeviceConfirmScreen import com.rosetta.messenger.ui.chats.ChatDetailScreen import com.rosetta.messenger.ui.chats.ChatsListScreen +import com.rosetta.messenger.ui.chats.ConnectionLogsScreen import com.rosetta.messenger.ui.chats.RequestsListScreen import com.rosetta.messenger.ui.chats.SearchScreen import com.rosetta.messenger.ui.components.OptimizedEmojiCache @@ -503,6 +504,7 @@ sealed class Screen { data object Safety : Screen() data object Backup : Screen() data object Logs : Screen() + data object ConnectionLogs : Screen() data object CrashLogs : Screen() data object Biometric : Screen() data object Appearance : Screen() @@ -610,6 +612,9 @@ fun MainScreen( val isSafetyVisible by remember { derivedStateOf { navStack.any { it is Screen.Safety } } } val isBackupVisible by remember { derivedStateOf { navStack.any { it is Screen.Backup } } } val isLogsVisible by remember { derivedStateOf { navStack.any { it is Screen.Logs } } } + val isConnectionLogsVisible by remember { + derivedStateOf { navStack.any { it is Screen.ConnectionLogs } } + } val isCrashLogsVisible by remember { derivedStateOf { navStack.any { it is Screen.CrashLogs } } } @@ -1010,6 +1015,17 @@ fun MainScreen( ) } + SwipeBackContainer( + isVisible = isConnectionLogsVisible, + onBack = { navStack = navStack.filterNot { it is Screen.ConnectionLogs } }, + isDarkTheme = isDarkTheme + ) { + ConnectionLogsScreen( + isDarkTheme = isDarkTheme, + onBack = { navStack = navStack.filterNot { it is Screen.ConnectionLogs } } + ) + } + var isOtherProfileSwipeEnabled by remember { mutableStateOf(true) } LaunchedEffect(selectedOtherUser?.publicKey) { isOtherProfileSwipeEnabled = true diff --git a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt index 60d3b79..f57fe37 100644 --- a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt +++ b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt @@ -98,6 +98,7 @@ class MessageRepository private constructor(private val context: Context) { /** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */ private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L + private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002" const val SYSTEM_SAFE_TITLE = "Safe" @@ -366,16 +367,43 @@ class MessageRepository private constructor(private val context: Context) { suspend fun getLastSyncTimestamp(): Long { val account = currentAccount ?: return 0L - return syncTimeDao.getLastSync(account) ?: 0L + val stored = syncTimeDao.getLastSync(account) ?: 0L + val normalized = normalizeSyncTimestamp(stored) + if (normalized != stored) { + syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized)) + if (stored > 0) { + android.util.Log.w( + "MessageRepository", + "⚠️ Normalized invalid last_sync for account=${account.take(10)}...: $stored -> $normalized" + ) + ProtocolManager.addLog("⚠️ SYNC cursor normalized: $stored -> $normalized") + } + } + return normalized } suspend fun updateLastSyncTimestamp(timestamp: Long) { if (timestamp <= 0) return val account = currentAccount ?: return - val existing = syncTimeDao.getLastSync(account) ?: 0L - if (timestamp > existing) { - syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp)) + val normalized = normalizeSyncTimestamp(timestamp) + if (normalized <= 0) return + // Desktop parity: allow moving sync cursor backward if needed. + syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized)) + } + + private fun normalizeSyncTimestamp(rawTimestamp: Long): Long { + if (rawTimestamp <= 0) return 0L + val now = System.currentTimeMillis() + val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS + var normalized = rawTimestamp + + // Heal common corruption where extra decimal places appear in timestamp. + while (normalized > maxAllowed) { + normalized /= 10L + if (normalized <= 0L) return 0L } + + return if (normalized > maxAllowed) 0L else normalized } /** Получить поток сообщений для диалога */ @@ -591,21 +619,21 @@ class MessageRepository private constructor(private val context: Context) { return optimisticMessage } - /** Обработка входящего сообщения */ - suspend fun handleIncomingMessage(packet: PacketMessage) { + /** Обработка входящего сообщения. Возвращает true если пакет обработан безопасно. */ + suspend fun handleIncomingMessage(packet: PacketMessage): Boolean { val startTime = System.currentTimeMillis() val account = currentAccount ?: run { MessageLogger.debug("📥 RECEIVE SKIP: account is null") - return + return false } val privateKey = currentPrivateKey ?: run { MessageLogger.debug("📥 RECEIVE SKIP: privateKey is null") - return + return false } // 📝 LOG: Начало обработки входящего сообщения @@ -625,7 +653,7 @@ class MessageRepository private constructor(private val context: Context) { val isBlocked = database.blacklistDao().isUserBlocked(packet.fromPublicKey, account) if (isBlocked) { MessageLogger.logBlockedSender(packet.fromPublicKey) - return + return true } } @@ -650,14 +678,14 @@ class MessageRepository private constructor(private val context: Context) { MessageLogger.debug( "📥 SKIP (in-memory cache): Message $messageId already being processed" ) - return + return true } // 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях) val isDuplicate = messageDao.messageExists(account, messageId) MessageLogger.logDuplicateCheck(messageId, isDuplicate) if (isDuplicate) { - return + return true } val dialogOpponentKey = if (isOwnMessage) packet.toPublicKey else packet.fromPublicKey @@ -794,6 +822,7 @@ class MessageRepository private constructor(private val context: Context) { // 📝 LOG: Успешная обработка MessageLogger.logReceiveSuccess(messageId, System.currentTimeMillis() - startTime) + return true } catch (e: Exception) { // 📝 LOG: Ошибка обработки MessageLogger.logDecryptionError(messageId, e) @@ -803,6 +832,7 @@ class MessageRepository private constructor(private val context: Context) { // Разрешаем повторную обработку через re-sync, если пакет не удалось сохранить. processedMessageIds.remove(messageId) e.printStackTrace() + return false } } diff --git a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt index eb4980a..2f29c43 100644 --- a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt +++ b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt @@ -17,15 +17,15 @@ object ReleaseNotes { val RELEASE_NOTICE = """ Update v$VERSION_PLACEHOLDER - Верификация аккаунта - - Бейдж верификации отображается в боковом меню рядом с именем - - Бейдж верификации отображается в экране профиля - - Статус загружается из кэша пользователей при старте + Синхронизация сообщений + - Исправлен недолет сообщений после оффлайна при массовой отправке (спам-тест) + - Исправлен сценарий, когда синхронизация останавливалась на первой пачке + - Нормализуется sync-cursor (last_sync), включая поврежденные timestamp + - Следующий sync-запрос отправляется с безопасным timestamp - Стабильность - - Фильтрация неподдерживаемых пакетов (группы, conversations) - - Добавлена фильтрация delivery-пакетов для неподдерживаемых диалогов - - Улучшена обработка ошибок в очереди входящих пакетов + Стабильность протокола + - Улучшена защита чтения строк из бинарного потока + - Ошибки внутри батча больше не клинят дальнейшую догрузку пакетов """.trimIndent() fun getNotice(version: String): String = diff --git a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt index 5ed1bf3..48eccf9 100644 --- a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt +++ b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt @@ -32,6 +32,7 @@ class Protocol( private const val TAG = "RosettaProtocol" private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве) private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds + private const val MIN_PACKET_ID_BITS = 18 // Stream.readInt16() = 2 * readInt8() (9 bits each) } private fun log(message: String) { @@ -493,32 +494,54 @@ class Protocol( // Debug: log first 50 bytes as hex val hexDump = data.take(50).joinToString(" ") { String.format("%02X", it.toInt() and 0xFF) } log("📥 Received ${data.size} bytes: $hexDump${if (data.size > 50) "..." else ""}") - + val stream = Stream(data) - val packetId = stream.readInt16() - - log("📥 Packet ID: $packetId") - - val packetFactory = supportedPackets[packetId] - if (packetFactory == null) { - log("⚠️ Unknown packet ID: $packetId") - return - } - - val packet = packetFactory() - packet.receive(stream) - - // Notify waiters - val waitersCount = packetWaiters[packetId]?.size ?: 0 - log("📥 Notifying $waitersCount waiter(s) for packet $packetId") - - packetWaiters[packetId]?.forEach { callback -> - try { - callback(packet) - } catch (e: Exception) { - log("❌ Error in packet handler: ${e.message}") - e.printStackTrace() + var parsedPackets = 0 + + while (stream.getRemainingBits() >= MIN_PACKET_ID_BITS) { + val packetStartBits = stream.getReadPointerBits() + val packetId = stream.readInt16() + + log("📥 Packet ID: $packetId") + + val packetFactory = supportedPackets[packetId] + if (packetFactory == null) { + log("⚠️ Unknown packet ID: $packetId, stopping frame parse") + break } + + val packet = packetFactory() + try { + packet.receive(stream) + } catch (e: Exception) { + log("❌ Error parsing packet $packetId: ${e.message}") + e.printStackTrace() + break + } + + // Notify waiters + val waitersCount = packetWaiters[packetId]?.size ?: 0 + log("📥 Notifying $waitersCount waiter(s) for packet $packetId") + + packetWaiters[packetId]?.forEach { callback -> + try { + callback(packet) + } catch (e: Exception) { + log("❌ Error in packet handler: ${e.message}") + e.printStackTrace() + } + } + + parsedPackets++ + val consumedBits = stream.getReadPointerBits() - packetStartBits + if (consumedBits <= 0) { + log("⚠️ Packet parser made no progress for packet $packetId, stopping frame parse") + break + } + } + + if (parsedPackets > 1) { + log("📦 Parsed $parsedPackets packets from single WebSocket frame") } } catch (e: Exception) { log("❌ Error parsing packet: ${e.message}") diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 127b191..56b2b1f 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -13,6 +13,9 @@ import kotlinx.coroutines.flow.asStateFlow import java.security.SecureRandom import java.util.* import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.atomic.AtomicLong import kotlin.coroutines.resume /** @@ -21,6 +24,11 @@ import kotlin.coroutines.resume */ object ProtocolManager { private const val TAG = "ProtocolManager" + private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L + private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h + private const val MAX_DEBUG_LOGS = 600 + private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L + private const val INBOUND_TASK_TIMEOUT_MS = 20_000L // Server address - same as React Native version private const val SERVER_ADDRESS = "ws://46.28.71.12:3000" @@ -37,10 +45,12 @@ object ProtocolManager { @Volatile private var lastSubscribedToken: String? = null - // Debug logs for dev console - 🚀 ОТКЛЮЧЕНО для производительности - // Логи только в Logcat, не в StateFlow (это вызывало ANR!) private val _debugLogs = MutableStateFlow>(emptyList()) val debugLogs: StateFlow> = _debugLogs.asStateFlow() + private val debugLogsBuffer = ArrayDeque(MAX_DEBUG_LOGS) + private val debugLogsLock = Any() + @Volatile private var debugFlushJob: Job? = null + private val debugFlushPending = AtomicBoolean(false) // Typing status private val _typingUsers = MutableStateFlow>(emptySet()) @@ -63,8 +73,8 @@ object ProtocolManager { // Pending resolves: publicKey → list of continuations waiting for the result private val pendingResolves = ConcurrentHashMap>>() - // 🚀 Флаг для включения UI логов (по умолчанию ВЫКЛЮЧЕНО - это вызывало ANR!) - private var uiLogsEnabled = false + // UI logs are enabled by default; updates are throttled and bounded by MAX_DEBUG_LOGS. + private var uiLogsEnabled = true private var lastProtocolState: ProtocolState? = null @Volatile private var syncBatchInProgress = false private val _syncInProgress = MutableStateFlow(false) @@ -79,6 +89,8 @@ object ProtocolManager { private val inboundTaskChannel = Channel Unit>(Channel.UNLIMITED) // Tracks the tail of the sequential processing chain (like desktop's `tail` promise) @Volatile private var inboundQueueDrainJob: Job? = null + private val inboundProcessingFailures = AtomicInteger(0) + private val syncBatchMaxProcessedTimestamp = AtomicLong(0L) private fun setSyncInProgress(value: Boolean) { syncBatchInProgress = value @@ -87,18 +99,60 @@ object ProtocolManager { } } - fun addLog(@Suppress("UNUSED_PARAMETER") message: String) { - // Disabled by request: keep debug log buffer empty. - return + fun addLog(message: String) { + if (!uiLogsEnabled) return + val timestamp = + java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date()) + val line = "[$timestamp] $message" + synchronized(debugLogsLock) { + if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) { + debugLogsBuffer.removeFirst() + } + debugLogsBuffer.addLast(line) + } + flushDebugLogsThrottled() } - + fun enableUILogs(enabled: Boolean) { uiLogsEnabled = enabled + if (enabled) { + val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() } + _debugLogs.value = snapshot + } else { + _debugLogs.value = emptyList() + } } - + fun clearLogs() { + synchronized(debugLogsLock) { + debugLogsBuffer.clear() + } _debugLogs.value = emptyList() } + + private fun flushDebugLogsThrottled() { + debugFlushPending.set(true) + if (debugFlushJob?.isActive == true) return + debugFlushJob = + scope.launch { + while (debugFlushPending.getAndSet(false)) { + delay(DEBUG_LOG_FLUSH_DELAY_MS) + val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() } + _debugLogs.value = snapshot + } + } + } + + private fun markInboundProcessingFailure(reason: String, error: Throwable? = null) { + inboundProcessingFailures.incrementAndGet() + if (error != null) { + android.util.Log.e(TAG, reason, error) + addLog("❌ $reason: ${error.message ?: error.javaClass.simpleName}") + } else { + android.util.Log.w(TAG, reason) + addLog("⚠️ $reason") + } + } /** * Инициализация с контекстом для доступа к MessageRepository @@ -160,11 +214,13 @@ object ProtocolManager { val repository = messageRepository if (repository == null || !repository.isInitialized()) { requireResyncAfterAccountInit("⏳ Incoming message before account init, scheduling re-sync") + markInboundProcessingFailure("Incoming packet skipped before account init") return@launchInboundPacketTask } val ownKey = getProtocol().getPublicKey().orEmpty() if (ownKey.isBlank()) { requireResyncAfterAccountInit("⏳ Incoming message before protocol account init, scheduling re-sync") + markInboundProcessingFailure("Incoming packet skipped before protocol account init") return@launchInboundPacketTask } if (!isSupportedDirectMessagePacket(messagePacket, ownKey)) { @@ -174,22 +230,28 @@ object ProtocolManager { ) return@launchInboundPacketTask } - try { - repository.handleIncomingMessage(messagePacket) - if (!syncBatchInProgress) { - repository.updateLastSyncTimestamp(messagePacket.timestamp) + val processed = repository.handleIncomingMessage(messagePacket) + if (!processed) { + markInboundProcessingFailure( + "Message processing failed for ${messagePacket.messageId.take(8)}" + ) + return@launchInboundPacketTask + } + if (!syncBatchInProgress) { + repository.updateLastSyncTimestamp(messagePacket.timestamp) + } else if (messagePacket.timestamp > 0L) { + syncBatchMaxProcessedTimestamp.accumulateAndGet(messagePacket.timestamp) { prev, cur -> + if (cur > prev) cur else prev } - // ✅ Send delivery ACK only AFTER message is safely stored in DB. - // Skip for own sync messages (no need to ACK yourself). - if (messagePacket.fromPublicKey != ownKey) { - val deliveryPacket = PacketDelivery().apply { - messageId = messagePacket.messageId - toPublicKey = messagePacket.fromPublicKey - } - send(deliveryPacket) + } + // ✅ Send delivery ACK only AFTER message is safely stored in DB. + // Skip for own sync messages (no need to ACK yourself). + if (messagePacket.fromPublicKey != ownKey) { + val deliveryPacket = PacketDelivery().apply { + messageId = messagePacket.messageId + toPublicKey = messagePacket.fromPublicKey } - } catch (e: Exception) { - android.util.Log.e(TAG, "❌ Message processing failed: ${messagePacket.messageId.take(8)}, err=${e.message}") + send(deliveryPacket) } } } @@ -203,6 +265,7 @@ object ProtocolManager { val repository = messageRepository if (repository == null || !repository.isInitialized()) { requireResyncAfterAccountInit("⏳ Delivery status before account init, scheduling re-sync") + markInboundProcessingFailure("Delivery packet skipped before account init") return@launchInboundPacketTask } if (isUnsupportedDialogKey(deliveryPacket.toPublicKey)) { @@ -212,7 +275,12 @@ object ProtocolManager { ) return@launchInboundPacketTask } - repository.handleDelivery(deliveryPacket) + try { + repository.handleDelivery(deliveryPacket) + } catch (e: Exception) { + markInboundProcessingFailure("Delivery processing failed", e) + return@launchInboundPacketTask + } if (!syncBatchInProgress) { repository.updateLastSyncTimestamp(System.currentTimeMillis()) } @@ -228,11 +296,13 @@ object ProtocolManager { val repository = messageRepository if (repository == null || !repository.isInitialized()) { requireResyncAfterAccountInit("⏳ Read status before account init, scheduling re-sync") + markInboundProcessingFailure("Read packet skipped before account init") return@launchInboundPacketTask } val ownKey = getProtocol().getPublicKey().orEmpty() if (ownKey.isBlank()) { requireResyncAfterAccountInit("⏳ Read status before protocol account init, scheduling re-sync") + markInboundProcessingFailure("Read packet skipped before protocol account init") return@launchInboundPacketTask } if (!isSupportedDirectReadPacket(readPacket, ownKey)) { @@ -242,7 +312,12 @@ object ProtocolManager { ) return@launchInboundPacketTask } - repository.handleRead(readPacket) + try { + repository.handleRead(readPacket) + } catch (e: Exception) { + markInboundProcessingFailure("Read processing failed", e) + return@launchInboundPacketTask + } if (!syncBatchInProgress) { repository.updateLastSyncTimestamp(System.currentTimeMillis()) } @@ -380,9 +455,14 @@ object ProtocolManager { inboundQueueDrainJob = scope.launch { for (task in inboundTaskChannel) { try { - task() + withTimeout(INBOUND_TASK_TIMEOUT_MS) { + task() + } } catch (t: Throwable) { - android.util.Log.e(TAG, "Dialog queue error", t) + markInboundProcessingFailure( + "Dialog queue task failed or timed out (${INBOUND_TASK_TIMEOUT_MS}ms)", + t + ) } } } @@ -392,7 +472,10 @@ object ProtocolManager { ensureInboundQueueDrainRunning() val result = inboundTaskChannel.trySend(block) if (result.isFailure) { - android.util.Log.e(TAG, "Failed to enqueue inbound task", result.exceptionOrNull()) + markInboundProcessingFailure( + "Failed to enqueue inbound task", + result.exceptionOrNull() + ) return false } return true @@ -536,9 +619,13 @@ object ProtocolManager { } private fun sendSynchronize(timestamp: Long) { + val safeTimestamp = normalizeSyncTimestamp(timestamp) val packet = PacketSync().apply { status = SyncStatus.NOT_NEEDED - this.timestamp = timestamp + this.timestamp = safeTimestamp + } + if (safeTimestamp != timestamp) { + addLog("⚠️ SYNC request timestamp normalized: $timestamp -> $safeTimestamp") } send(packet) } @@ -561,6 +648,8 @@ object ProtocolManager { // Synchronous — guarantees syncBatchInProgress=true before any // subsequent 0x06 packets are dispatched by OkHttp's sequential callback. setSyncInProgress(true) + inboundProcessingFailures.set(0) + syncBatchMaxProcessedTimestamp.set(0L) } SyncStatus.BATCH_END -> { addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})") @@ -572,20 +661,51 @@ object ProtocolManager { if (!tasksFinished) { android.util.Log.w( TAG, - "SYNC BATCH_END: queue unavailable, requesting re-sync without advancing cursor" + "SYNC BATCH_END: queue unavailable, skipping cursor update for this step" ) - val fallbackTimestamp = try { - messageRepository?.getLastSyncTimestamp() ?: packet.timestamp - } catch (e: Exception) { - android.util.Log.e(TAG, "Failed to read last sync timestamp for fallback", e) - packet.timestamp - } - sendSynchronize(fallbackTimestamp) return@launch } - addLog("🔄 SYNC tasks done — saving timestamp ${packet.timestamp}, requesting next batch") - messageRepository?.updateLastSyncTimestamp(packet.timestamp) - sendSynchronize(packet.timestamp) + val failuresInBatch = inboundProcessingFailures.getAndSet(0) + if (failuresInBatch > 0) { + addLog( + "⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior" + ) + } + val repository = messageRepository + val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L) + val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp) + val processedMaxTimestamp = normalizeSyncTimestamp(syncBatchMaxProcessedTimestamp.get()) + val nextCursor = + when { + safeBatchTimestamp <= 0L -> processedMaxTimestamp + processedMaxTimestamp <= 0L -> safeBatchTimestamp + processedMaxTimestamp < safeBatchTimestamp -> processedMaxTimestamp + else -> safeBatchTimestamp + } + val requestCursor = + when { + nextCursor > 0L -> nextCursor + currentCursor > 0L -> currentCursor + else -> 0L + } + + // If server batch timestamp runs ahead of what we actually processed, clamp it. + // This avoids skipping tail messages when packet delivery/parsing was partial. + if (processedMaxTimestamp > 0L && processedMaxTimestamp < safeBatchTimestamp) { + addLog( + "⚠️ SYNC cursor clamped to processed max: server=$safeBatchTimestamp processed=$processedMaxTimestamp" + ) + } + + if (nextCursor > 0L) { + addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch") + repository?.updateLastSyncTimestamp(nextCursor) + } else { + addLog( + "⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, processed=$processedMaxTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor" + ) + } + sendSynchronize(requestCursor) } } SyncStatus.NOT_NEEDED -> { @@ -694,6 +814,51 @@ object ProtocolManager { addLog("🔄 SYNC on foreground resume") requestSynchronize() } + + /** + * Manual sync trigger from UI. + * Rewinds lastSync a bit to safely re-fetch recent packets and re-starts sync. + */ + fun forceSynchronize(backtrackMs: Long = MANUAL_SYNC_BACKTRACK_MS) { + if (!isAuthenticated()) { + reconnectNowIfNeeded("manual_sync_button") + return + } + if (syncBatchInProgress) return + + scope.launch { + val repository = messageRepository + if (repository == null || !repository.isInitialized()) { + requireResyncAfterAccountInit("⏳ Manual sync postponed until account is initialized") + return@launch + } + val currentSync = repository.getLastSyncTimestamp() + val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L) + + setSyncInProgress(true) + addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo") + sendSynchronize(rewindTo) + } + } + + /** + * Defensive normalization for sync cursor/timestamps. + * Some malformed values arrive with extra decimal digits; heal by scaling down. + */ + private fun normalizeSyncTimestamp(rawTimestamp: Long): Long { + if (rawTimestamp <= 0L) return 0L + val now = System.currentTimeMillis() + val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS + var normalized = rawTimestamp + + // Try to recover values with accidental decimal scaling (x10, x100, ...). + while (normalized > maxAllowed) { + normalized /= 10L + if (normalized <= 0L) return 0L + } + + return normalized + } /** * Authenticate with server diff --git a/app/src/main/java/com/rosetta/messenger/network/Stream.kt b/app/src/main/java/com/rosetta/messenger/network/Stream.kt index b1a6287..c2e825f 100644 --- a/app/src/main/java/com/rosetta/messenger/network/Stream.kt +++ b/app/src/main/java/com/rosetta/messenger/network/Stream.kt @@ -16,6 +16,14 @@ class Stream(stream: ByteArray = ByteArray(0)) { fun getStream(): ByteArray { return _stream.map { it.toByte() }.toByteArray() } + + fun getReadPointerBits(): Int = _readPointer + + fun getTotalBits(): Int = _stream.size * 8 + + fun getRemainingBits(): Int = getTotalBits() - _readPointer + + fun hasRemainingBits(): Boolean = _readPointer < getTotalBits() fun setStream(stream: ByteArray) { _stream = stream.map { it.toInt() and 0xFF }.toMutableList() @@ -115,6 +123,15 @@ class Stream(stream: ByteArray = ByteArray(0)) { fun readString(): String { val length = readInt32() + // Desktop parity + safety: don't trust malformed string length. + val bytesAvailable = _stream.size - (_readPointer shr 3) + if (length < 0 || (length.toLong() * 2L) > bytesAvailable.toLong()) { + android.util.Log.w( + "RosettaStream", + "readString invalid length=$length, bytesAvailable=$bytesAvailable, readPointer=$_readPointer" + ) + return "" + } val sb = StringBuilder() for (i in 0 until length) { sb.append(readInt16().toChar()) diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt index 1cc75e2..cc77562 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/ChatsListScreen.kt @@ -1149,21 +1149,6 @@ fun ChatsListScreen( } ) - // 🔄 Sync logs - DrawerMenuItemEnhanced( - painter = painterResource(id = R.drawable.files_document), - text = "Sync Logs", - iconColor = menuIconColor, - textColor = menuTextColor, - onClick = { - scope.launch { - drawerState.close() - kotlinx.coroutines.delay(100) - showSduLogs = true - } - } - ) - } // ═══════════════════════════════════════════════════════════ diff --git a/app/src/main/java/com/rosetta/messenger/ui/chats/ConnectionLogsScreen.kt b/app/src/main/java/com/rosetta/messenger/ui/chats/ConnectionLogsScreen.kt index f15880f..dbeed8f 100644 --- a/app/src/main/java/com/rosetta/messenger/ui/chats/ConnectionLogsScreen.kt +++ b/app/src/main/java/com/rosetta/messenger/ui/chats/ConnectionLogsScreen.kt @@ -3,7 +3,7 @@ package com.rosetta.messenger.ui.chats import androidx.compose.foundation.background import androidx.compose.foundation.layout.* import androidx.compose.foundation.lazy.LazyColumn -import androidx.compose.foundation.lazy.items +import androidx.compose.foundation.lazy.itemsIndexed import androidx.compose.foundation.lazy.rememberLazyListState import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.material3.* @@ -180,7 +180,7 @@ fun ConnectionLogsScreen( .padding(horizontal = 8.dp, vertical = 4.dp), verticalArrangement = Arrangement.spacedBy(2.dp) ) { - items(logs, key = { it.hashCode().toString() + logs.indexOf(it) }) { log -> + itemsIndexed(logs, key = { index, _ -> index }) { _, log -> val logColor = when { "❌" in log || "FAILED" in log || "Error" in log || "error" in log -> Color(0xFFEF5350) "✅" in log || "COMPLETE" in log || "SUCCESS" in log -> Color(0xFF4CAF50)