diff --git a/app/build.gradle.kts b/app/build.gradle.kts index 96f6ec3..54bfcd1 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown" // ═══════════════════════════════════════════════════════════ // Rosetta versioning — bump here on each release // ═══════════════════════════════════════════════════════════ -val rosettaVersionName = "1.1.0" -val rosettaVersionCode = 12 // Increment on each release +val rosettaVersionName = "1.1.1" +val rosettaVersionCode = 13 // Increment on each release android { namespace = "com.rosetta.messenger" diff --git a/app/src/main/java/com/rosetta/messenger/MainActivity.kt b/app/src/main/java/com/rosetta/messenger/MainActivity.kt index e34da6f..5ae24b4 100644 --- a/app/src/main/java/com/rosetta/messenger/MainActivity.kt +++ b/app/src/main/java/com/rosetta/messenger/MainActivity.kt @@ -407,8 +407,6 @@ class MainActivity : FragmentActivity() { com.rosetta.messenger.push.RosettaFirebaseMessagingService.isAppInForeground = true // ⚡ На возврате в приложение пробуем мгновенный reconnect без ожидания backoff. ProtocolManager.reconnectNowIfNeeded("activity_onResume") - // 🔄 Desktop parity: синхронизация при каждом заходе в приложение - ProtocolManager.syncOnForeground() } override fun onPause() { diff --git a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt index 142e8c0..c085b76 100644 --- a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt +++ b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt @@ -99,7 +99,6 @@ class MessageRepository private constructor(private val context: Context) { /** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */ private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L - private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002" const val SYSTEM_SAFE_TITLE = "Safe" @@ -385,43 +384,13 @@ class MessageRepository private constructor(private val context: Context) { suspend fun getLastSyncTimestamp(): Long { val account = currentAccount ?: return 0L - val stored = syncTimeDao.getLastSync(account) ?: 0L - val normalized = normalizeSyncTimestamp(stored) - if (normalized != stored) { - syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized)) - if (stored > 0) { - android.util.Log.w( - "MessageRepository", - "⚠️ Normalized invalid last_sync for account=${account.take(10)}...: $stored -> $normalized" - ) - ProtocolManager.addLog("⚠️ SYNC cursor normalized: $stored -> $normalized") - } - } - return normalized + return syncTimeDao.getLastSync(account) ?: 0L } suspend fun updateLastSyncTimestamp(timestamp: Long) { - if (timestamp <= 0) return val account = currentAccount ?: return - val normalized = normalizeSyncTimestamp(timestamp) - if (normalized <= 0) return - // Desktop parity: allow moving sync cursor backward if needed. - syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized)) - } - - private fun normalizeSyncTimestamp(rawTimestamp: Long): Long { - if (rawTimestamp <= 0) return 0L - val now = System.currentTimeMillis() - val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS - var normalized = rawTimestamp - - // Heal common corruption where extra decimal places appear in timestamp. - while (normalized > maxAllowed) { - normalized /= 10L - if (normalized <= 0L) return 0L - } - - return if (normalized > maxAllowed) 0L else normalized + // Desktop parity: store raw cursor value from sync/update events. + syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp)) } /** Получить поток сообщений для диалога */ @@ -953,10 +922,26 @@ class MessageRepository private constructor(private val context: Context) { // 1) from=opponent, to=account -> собеседник прочитал НАШИ сообщения (double check) // 2) from=account, to=opponent -> sync с другого нашего устройства (мы прочитали входящие) val isOwnReadSync = fromPublicKey == account + + // Desktop parity (group): from=groupMember, to=groupId -> mark own group messages as read. if (!isOwnReadSync && isGroupDialogKey(toPublicKey)) { - // Group read receipts are currently not mapped to per-message states. + val dialogKey = getDialogKey(toPublicKey) + messageDao.markAllAsRead(account, toPublicKey) + + val readCount = messageCache[dialogKey]?.value?.count { it.isFromMe && !it.isRead } ?: 0 + messageCache[dialogKey]?.let { flow -> + flow.value = + flow.value.map { msg -> + if (msg.isFromMe && !msg.isRead) msg.copy(isRead = true) else msg + } + } + + _deliveryStatusEvents.tryEmit(DeliveryStatusUpdate(dialogKey, "", DeliveryStatus.READ)) + MessageLogger.logReadStatus(fromPublicKey = toPublicKey, messagesCount = readCount) + dialogDao.updateDialogFromMessages(account, toPublicKey) return } + val opponentKey = if (isOwnReadSync) toPublicKey else fromPublicKey if (opponentKey.isBlank()) return diff --git a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt index 50edff2..fac48db 100644 --- a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt +++ b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt @@ -17,9 +17,28 @@ object ReleaseNotes { val RELEASE_NOTICE = """ Update v$VERSION_PLACEHOLDER - Синхронизация сообщений - - Исправлен бесконечный цикл синхронизации, когда сервер возвращал пустые батчи с неизменным курсором - - Вынесена общая логика завершения sync-цикла для единообразной обработки всех сценариев + Группы и интерфейс + - Полностью обновлен экран группы в стиле приложения (по паритету с desktop логикой) + - В участниках добавлены верификации, админ-метка и тултип администратора + - Добавлен просмотр Encryption Key с QR-кодом + - Улучшены секции Media/Files/Links: корректные пустые состояния и выравнивание медиа-сетки + + Сообщения и списки + - Group Invite теперь отображается как invite-карточка вместо хэша (в чате и в chat list) + - Для групп в chat list показывается иконка и автор последнего сообщения (You/имя отправителя) + - Исправлено выравнивание превью вида "You: Photo" + - Системные события группы (например joined the group) приведены к desktop-стилю + + Модерация групп + - Добавлены свайп и long-press действия по участникам (Kick) + - Улучшены цвета, haptic и размеры action-кнопки; исправлен конфликт свайпа item vs экран + - Для групп в chat list добавлены swipe-actions: Pin, Leave, Delete + + Синхронизация и стабильность + - Исправлены пропуски сообщений при массовой синхронизации личных и групповых чатов + - Sync теперь не продвигает курсор батча при ошибках обработки и делает безопасные ретраи + - Исправлены кейсы, где requests зависели от состояния устройства, а не аккаунта + - Rosetta Updates и Safe исключены из requests """.trimIndent() fun getNotice(version: String): String = diff --git a/app/src/main/java/com/rosetta/messenger/database/MessageEntities.kt b/app/src/main/java/com/rosetta/messenger/database/MessageEntities.kt index 14ca260..a9dcac0 100644 --- a/app/src/main/java/com/rosetta/messenger/database/MessageEntities.kt +++ b/app/src/main/java/com/rosetta/messenger/database/MessageEntities.kt @@ -395,12 +395,11 @@ interface MessageDao { /** * Отметить все исходящие сообщения к собеседнику как прочитанные Используется когда приходит - * PacketRead от собеседника 🔥 ВАЖНО: delivered=3 означает READ (синхронизировано с - * ChatViewModel) + * PacketRead от собеседника. */ @Query( """ - UPDATE messages SET delivered = 3, read = 1 + UPDATE messages SET read = 1 WHERE account = :account AND to_public_key = :opponent AND from_me = 1 """ ) diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index bc4265b..0f5d0db 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -27,8 +27,6 @@ import kotlin.coroutines.resume object ProtocolManager { private const val TAG = "ProtocolManager" private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L - private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h - private const val MAX_STALLED_SYNC_BATCHES = 12 private const val MAX_DEBUG_LOGS = 600 private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L @@ -96,15 +94,10 @@ object ProtocolManager { // Tracks the tail of the sequential processing chain (like desktop's `tail` promise) @Volatile private var inboundQueueDrainJob: Job? = null private val inboundProcessingFailures = AtomicInteger(0) - private val syncBatchMessageCount = AtomicInteger(0) - private val stalledSyncBatchCount = AtomicInteger(0) private val syncBatchEndMutex = Mutex() private fun setSyncInProgress(value: Boolean) { syncBatchInProgress = value - if (!value) { - stalledSyncBatchCount.set(0) - } if (_syncInProgress.value != value) { _syncInProgress.value = value } @@ -223,9 +216,7 @@ object ProtocolManager { */ private fun setupPacketHandlers() { // Обработчик входящих сообщений (0x06) - // Desktop parity: delivery ACK is sent AFTER successful processing, not before. - // Desktop itself never sends PacketDelivery (server auto-generates them), - // but we still notify the sender after we've safely stored the message. + // Desktop parity: desktop client does not send PacketDelivery manually. waitPacket(0x06) { packet -> val messagePacket = packet as PacketMessage @@ -236,19 +227,6 @@ object ProtocolManager { markInboundProcessingFailure("Incoming packet skipped before account init") return@launchInboundPacketTask } - val ownKey = getProtocol().getPublicKey().orEmpty() - if (ownKey.isBlank()) { - requireResyncAfterAccountInit("⏳ Incoming message before protocol account init, scheduling re-sync") - markInboundProcessingFailure("Incoming packet skipped before protocol account init") - return@launchInboundPacketTask - } - if (!isSupportedDirectMessagePacket(messagePacket, ownKey)) { - android.util.Log.w( - TAG, - "Skipping unsupported message packet (likely conversation): from=${messagePacket.fromPublicKey.take(16)}, to=${messagePacket.toPublicKey.take(16)}" - ) - return@launchInboundPacketTask - } val processed = repository.handleIncomingMessage(messagePacket) if (!processed) { markInboundProcessingFailure( @@ -258,17 +236,6 @@ object ProtocolManager { } if (!syncBatchInProgress) { repository.updateLastSyncTimestamp(messagePacket.timestamp) - } else { - syncBatchMessageCount.incrementAndGet() - } - // ✅ Send delivery ACK only AFTER message is safely stored in DB. - // Skip for own sync messages (no need to ACK yourself). - if (messagePacket.fromPublicKey != ownKey) { - val deliveryPacket = PacketDelivery().apply { - messageId = messagePacket.messageId - toPublicKey = messagePacket.fromPublicKey - } - send(deliveryPacket) } } } @@ -285,13 +252,6 @@ object ProtocolManager { markInboundProcessingFailure("Delivery packet skipped before account init") return@launchInboundPacketTask } - if (isUnsupportedDialogKey(deliveryPacket.toPublicKey)) { - android.util.Log.w( - TAG, - "Skipping unsupported delivery packet: to=${deliveryPacket.toPublicKey.take(24)}" - ) - return@launchInboundPacketTask - } try { repository.handleDelivery(deliveryPacket) } catch (e: Exception) { @@ -322,13 +282,6 @@ object ProtocolManager { markInboundProcessingFailure("Read packet skipped before protocol account init") return@launchInboundPacketTask } - if (!isSupportedDirectReadPacket(readPacket, ownKey)) { - android.util.Log.w( - TAG, - "Skipping unsupported read packet (likely conversation): from=${readPacket.fromPublicKey.take(16)}, to=${readPacket.toPublicKey.take(16)}" - ) - return@launchInboundPacketTask - } try { repository.handleRead(readPacket) } catch (e: Exception) { @@ -336,7 +289,14 @@ object ProtocolManager { return@launchInboundPacketTask } if (!syncBatchInProgress) { - repository.updateLastSyncTimestamp(System.currentTimeMillis()) + // Desktop parity: + // own direct read sync (from=me,to=peer) does not advance sync cursor. + val isOwnDirectReadSync = + readPacket.fromPublicKey.trim() == ownKey && + !isGroupDialogKey(readPacket.toPublicKey) + if (!isOwnDirectReadSync) { + repository.updateLastSyncTimestamp(System.currentTimeMillis()) + } } } } @@ -548,52 +508,6 @@ object ProtocolManager { return normalized.startsWith("#group:") || normalized.startsWith("group:") } - private fun isConversationDialogKey(value: String): Boolean { - val normalized = value.trim().lowercase(Locale.ROOT) - return normalized.startsWith("conversation:") - } - - private fun isUnsupportedDialogKey(value: String): Boolean { - val normalized = value.trim().lowercase(Locale.ROOT) - if (normalized.isBlank()) return true - if (isConversationDialogKey(normalized)) return true - return false - } - - private fun isSupportedDirectPeerKey(peerKey: String, ownKey: String): Boolean { - val normalized = peerKey.trim() - if (normalized.isBlank()) return false - if (normalized == ownKey) return true - if (MessageRepository.isSystemAccount(normalized)) return true - return !isUnsupportedDialogKey(normalized) - } - - private fun isSupportedDirectMessagePacket(packet: PacketMessage, ownKey: String): Boolean { - val from = packet.fromPublicKey.trim() - val to = packet.toPublicKey.trim() - if (from.isBlank() || to.isBlank()) return false - if (isConversationDialogKey(from) || isConversationDialogKey(to)) return false - if (isGroupDialogKey(to)) return true - return when { - from == ownKey -> isSupportedDirectPeerKey(to, ownKey) - to == ownKey -> isSupportedDirectPeerKey(from, ownKey) - else -> false - } - } - - private fun isSupportedDirectReadPacket(packet: PacketRead, ownKey: String): Boolean { - val from = packet.fromPublicKey.trim() - val to = packet.toPublicKey.trim() - if (from.isBlank() || to.isBlank()) return false - if (isConversationDialogKey(from) || isConversationDialogKey(to)) return false - if (isGroupDialogKey(to)) return true - return when { - from == ownKey -> isSupportedDirectPeerKey(to, ownKey) - to == ownKey -> isSupportedDirectPeerKey(from, ownKey) - else -> false - } - } - private fun onAuthenticated() { setSyncInProgress(false) TransportManager.requestTransportServer() @@ -605,8 +519,6 @@ object ProtocolManager { private fun finishSyncCycle(reason: String) { syncRequestInFlight = false - stalledSyncBatchCount.set(0) - syncBatchMessageCount.set(0) inboundProcessingFailures.set(0) addLog(reason) setSyncInProgress(false) @@ -671,7 +583,6 @@ object ProtocolManager { addLog("⚠️ SYNC request skipped: previous request still in flight") return } - stalledSyncBatchCount.set(0) syncRequestInFlight = true addLog("🔄 SYNC requested — fetching last sync timestamp...") scope.launch { @@ -701,13 +612,9 @@ object ProtocolManager { private fun sendSynchronize(timestamp: Long) { syncRequestInFlight = true - val safeTimestamp = normalizeSyncTimestamp(timestamp) val packet = PacketSync().apply { status = SyncStatus.NOT_NEEDED - this.timestamp = safeTimestamp - } - if (safeTimestamp != timestamp) { - addLog("⚠️ SYNC request timestamp normalized: $timestamp -> $safeTimestamp") + this.timestamp = timestamp } send(packet) } @@ -732,7 +639,6 @@ object ProtocolManager { // subsequent 0x06 packets are dispatched by OkHttp's sequential callback. setSyncInProgress(true) inboundProcessingFailures.set(0) - syncBatchMessageCount.set(0) } SyncStatus.BATCH_END -> { addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})") @@ -740,77 +646,28 @@ object ProtocolManager { // syncBatchInProgress stays true until NOT_NEEDED arrives. scope.launch { syncBatchEndMutex.withLock { - if (!syncBatchInProgress) { - addLog("⚠️ SYNC BATCH_END ignored: sync already completed") - return@launch - } val tasksFinished = whenInboundTasksFinish() if (!tasksFinished) { android.util.Log.w( TAG, "SYNC BATCH_END: queue unavailable, skipping cursor update for this step" ) - val fallbackCursor = normalizeSyncTimestamp(messageRepository?.getLastSyncTimestamp() ?: 0L) - if (syncBatchInProgress) { - sendSynchronize(fallbackCursor) - } + val fallbackCursor = messageRepository?.getLastSyncTimestamp() ?: 0L + sendSynchronize(fallbackCursor) return@launch } val failuresInBatch = inboundProcessingFailures.getAndSet(0) if (failuresInBatch > 0) { addLog( - "⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior" + "⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop sync cursor behavior" ) } - val processedMessagesInBatch = syncBatchMessageCount.getAndSet(0) val repository = messageRepository - val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L) - val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp) - val nextCursor = if (safeBatchTimestamp > 0L) safeBatchTimestamp else currentCursor - val requestCursor = - when { - nextCursor > 0L -> nextCursor - currentCursor > 0L -> currentCursor - else -> 0L - } - addLog( - "🔄 SYNC cursor calc: current=$currentCursor, server=$safeBatchTimestamp, next=$nextCursor, messages=$processedMessagesInBatch, failures=$failuresInBatch" - ) - - // If server repeatedly returns an empty/non-advancing batch, allow a few retries - // first (to avoid premature stop), then finish to avoid endless "Synchronizing...". - val noProgress = - failuresInBatch == 0 && - processedMessagesInBatch == 0 && - nextCursor <= currentCursor - if (noProgress) { - val stalled = stalledSyncBatchCount.incrementAndGet() - if (stalled >= MAX_STALLED_SYNC_BATCHES) { - finishSyncCycle( - "✅ SYNC COMPLETE — stalled on cursor for $stalled batch(es) (server=$safeBatchTimestamp, current=$currentCursor)" - ) - return@launch - } - addLog( - "⚠️ SYNC batch has no progress (#$stalled/$MAX_STALLED_SYNC_BATCHES), retrying with cursor=$requestCursor" - ) - } else { - stalledSyncBatchCount.set(0) - } - - if (nextCursor > 0L) { - addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch") - repository?.updateLastSyncTimestamp(nextCursor) - } else { - addLog( - "⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor" - ) - } - if (!syncBatchInProgress) { - addLog("⚠️ SYNC next batch skipped: sync already completed") - return@launch - } - sendSynchronize(requestCursor) + // Desktop parity: save the cursor provided by BATCH_END and request next + // chunk with the same cursor. + repository?.updateLastSyncTimestamp(packet.timestamp) + addLog("🔄 SYNC tasks done — cursor=${packet.timestamp}, requesting next batch") + sendSynchronize(packet.timestamp) } } } @@ -936,32 +793,12 @@ object ProtocolManager { val currentSync = repository.getLastSyncTimestamp() val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L) - stalledSyncBatchCount.set(0) syncRequestInFlight = true addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo") sendSynchronize(rewindTo) } } - /** - * Defensive normalization for sync cursor/timestamps. - * Some malformed values arrive with extra decimal digits; heal by scaling down. - */ - private fun normalizeSyncTimestamp(rawTimestamp: Long): Long { - if (rawTimestamp <= 0L) return 0L - val now = System.currentTimeMillis() - val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS - var normalized = rawTimestamp - - // Try to recover values with accidental decimal scaling (x10, x100, ...). - while (normalized > maxAllowed) { - normalized /= 10L - if (normalized <= 0L) return 0L - } - - return normalized - } - /** * Authenticate with server */