From f72138a8a2e829ff556db255a788bb07f4f1e5e2 Mon Sep 17 00:00:00 2001 From: k1ngsterr1 Date: Mon, 16 Mar 2026 23:36:35 +0700 Subject: [PATCH] =?UTF-8?q?=D0=A4=D0=B8=D0=BA=D1=81=20=D1=81=D0=B8=D0=BD?= =?UTF-8?q?=D1=85=D1=80=D0=BE=D0=BD=D0=B8=D0=B7=D0=B0=D1=86=D0=B8=D0=B8=20?= =?UTF-8?q?Android=20=E2=86=94=20iOS:=20retry=20=D0=BC=D0=B5=D1=85=D0=B0?= =?UTF-8?q?=D0=BD=D0=B8=D0=B7=D0=BC=20=D0=B8=20=D0=BD=D0=BE=D1=80=D0=BC?= =?UTF-8?q?=D0=B0=D0=BB=D0=B8=D0=B7=D0=B0=D1=86=D0=B8=D1=8F=20sync-=D0=BA?= =?UTF-8?q?=D1=83=D1=80=D1=81=D0=BE=D1=80=D0=B0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Добавлен retry для исходящих сообщений (iOS parity): 4с интервал, 3 попытки, 80с таймаут - Нормализация sync timestamp в миллисекунды (предотвращает расхождение курсора) - resolveOutgoingRetry при получении delivery ACK (0x08) - cancelAllOutgoingRetries при дисконнекте - Обновлены release notes для 1.2.1 Co-Authored-By: Claude Opus 4.6 (1M context) --- RELEASE_NOTES.md | 11 ++ .../messenger/data/MessageRepository.kt | 35 +++++- .../messenger/network/ProtocolManager.kt | 115 +++++++++++++++++- 3 files changed, 155 insertions(+), 6 deletions(-) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index c2eab6e..a3dc7f4 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -1,5 +1,16 @@ # Release Notes +## 1.2.1 + +### Синхронизация Android ↔ iOS +- Исправлена критическая проблема: сообщения зависали на «часиках» при одновременном использовании Android и iOS. +- Добавлен механизм автоматического повтора отправки (retry) — как в iOS: 3 попытки с интервалом 4 сек, таймаут 80 сек. +- Исправлена нормализация sync-курсора (секунды → миллисекунды) для корректной синхронизации между устройствами. + +### UI-улучшения +- Дата «today/yesterday» и пустой стейт чата теперь белые при тёмных обоях или тёмной теме. +- Исправлена обрезка имени отправителя в групповых чатах — бабл расширяется под имя. + ## 1.2.0 (обновление с 1.1.9) - Синхронизированы индикаторы отправки между чат-листом и диалогом: diff --git a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt index 6f8cd42..f90c150 100644 --- a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt +++ b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt @@ -389,8 +389,21 @@ class MessageRepository private constructor(private val context: Context) { suspend fun updateLastSyncTimestamp(timestamp: Long) { val account = currentAccount ?: return - // Desktop parity: store raw cursor value from sync/update events. - syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp)) + // iOS parity: normalize timestamp to milliseconds before storing. + // Server may send seconds or milliseconds depending on the packet. + val normalized = normalizeSyncTimestamp(timestamp) + if (normalized <= 0L) return + val existing = syncTimeDao.getLastSync(account) ?: 0L + if (normalized <= existing) return + syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized)) + } + + /** + * iOS parity: normalize sync timestamp to milliseconds. + * Values below 1_000_000_000_000 are assumed to be in seconds. + */ + private fun normalizeSyncTimestamp(raw: Long): Long { + return if (raw < 1_000_000_000_000L) raw * 1000L else raw } /** Получить поток сообщений для диалога */ @@ -589,7 +602,8 @@ class MessageRepository private constructor(private val context: Context) { // 📝 LOG: Отправка пакета MessageLogger.logPacketSend(messageId, toPublicKey, timestamp) - ProtocolManager.send(packet) + // iOS parity: send with automatic retry (4s interval, 3 attempts, 80s timeout) + ProtocolManager.sendMessageWithRetry(packet) // 📝 LOG: Успешная отправка MessageLogger.logSendSuccess(messageId, System.currentTimeMillis() - startTime) @@ -1135,7 +1149,8 @@ class MessageRepository private constructor(private val context: Context) { this.attachments = emptyList() // Attachments already uploaded, tags are in content } - ProtocolManager.send(packet) + // iOS parity: use retry mechanism for reconnect-resent messages too + ProtocolManager.sendMessageWithRetry(packet) android.util.Log.d("MessageRepository", "🔄 Resent WAITING message: ${entity.messageId.take(8)}") } catch (e: Exception) { android.util.Log.e("MessageRepository", "❌ Failed to retry message ${entity.messageId.take(8)}: ${e.message}") @@ -1158,7 +1173,7 @@ class MessageRepository private constructor(private val context: Context) { * Получить ключ диалога для группировки сообщений 📁 SAVED MESSAGES: Для saved messages * (account == opponentKey) возвращает просто account */ - private fun getDialogKey(opponentKey: String): String { + fun getDialogKey(opponentKey: String): String { val account = currentAccount ?: return opponentKey if (isGroupDialogKey(opponentKey)) { return opponentKey.trim() @@ -1238,6 +1253,16 @@ class MessageRepository private constructor(private val context: Context) { } } + /** + * Public API for ProtocolManager to update delivery status (e.g., marking as ERROR on retry timeout). + */ + suspend fun updateMessageDeliveryStatus(dialogKey: String, messageId: String, status: DeliveryStatus) { + val account = currentAccount ?: return + messageDao.updateDeliveryStatus(account, messageId, status.value) + updateMessageStatus(dialogKey, messageId, status) + _deliveryStatusEvents.tryEmit(DeliveryStatusUpdate(dialogKey, messageId, status)) + } + private fun updateMessageStatus(dialogKey: String, messageId: String, status: DeliveryStatus) { messageCache[dialogKey]?.let { flow -> flow.value = diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index 841e2b0..71bb1e6 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -100,6 +100,17 @@ object ProtocolManager { private val inboundProcessingFailures = AtomicInteger(0) private val syncBatchEndMutex = Mutex() + // iOS parity: outgoing message retry mechanism. + // iOS retries stuck WAITING messages every 4 seconds, up to 3 attempts, + // with a hard timeout of 80 seconds. Without this, messages stay as "clocks" + // until the next reconnect (which may never happen if the connection is alive). + private const val OUTGOING_RETRY_INTERVAL_MS = 4_000L + private const val OUTGOING_MAX_RETRY_ATTEMPTS = 3 + private const val OUTGOING_MAX_LIFETIME_MS = 80_000L + private val pendingOutgoingPackets = ConcurrentHashMap() + private val pendingOutgoingAttempts = ConcurrentHashMap() + private val pendingOutgoingRetryJobs = ConcurrentHashMap() + private fun setSyncInProgress(value: Boolean) { syncBatchInProgress = value if (_syncInProgress.value != value) { @@ -196,6 +207,9 @@ object ProtocolManager { setSyncInProgress(false) // Connection/session dropped: force re-subscribe on next AUTHENTICATED. lastSubscribedToken = null + // iOS parity: cancel all pending outgoing retries on disconnect. + // They will be retried via retryWaitingMessages() on next handshake. + cancelAllOutgoingRetries() } lastProtocolState = newState } @@ -263,6 +277,8 @@ object ProtocolManager { } try { repository.handleDelivery(deliveryPacket) + // iOS parity: cancel retry timer on delivery ACK + resolveOutgoingRetry(deliveryPacket.messageId) } catch (e: Exception) { markInboundProcessingFailure("Delivery processing failed", e) return@launchInboundPacketTask @@ -1021,7 +1037,104 @@ object ProtocolManager { fun send(packet: Packet) { getProtocol().sendPacket(packet) } - + + /** + * Send an outgoing message packet and register it for automatic retry. + * iOS parity: registerOutgoingRetry + scheduleOutgoingRetry. + */ + fun sendMessageWithRetry(packet: PacketMessage) { + send(packet) + registerOutgoingRetry(packet) + } + + /** + * iOS parity: register an outgoing message for automatic retry. + */ + private fun registerOutgoingRetry(packet: PacketMessage) { + val messageId = packet.messageId + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingPackets[messageId] = packet + pendingOutgoingAttempts[messageId] = 0 + scheduleOutgoingRetry(messageId) + } + + /** + * iOS parity: schedule a retry attempt for a pending outgoing message. + * Retries every 4 seconds, marks as ERROR after 80s or 3 attempts. + */ + private fun scheduleOutgoingRetry(messageId: String) { + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingRetryJobs[messageId] = scope.launch { + delay(OUTGOING_RETRY_INTERVAL_MS) + + val packet = pendingOutgoingPackets[messageId] ?: return@launch + val attempts = pendingOutgoingAttempts[messageId] ?: 0 + + // Check if message exceeded delivery timeout (80s) + val nowMs = System.currentTimeMillis() + val ageMs = nowMs - packet.timestamp + if (ageMs >= OUTGOING_MAX_LIFETIME_MS) { + addLog("⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error") + markOutgoingAsError(messageId, packet) + return@launch + } + + if (attempts >= OUTGOING_MAX_RETRY_ATTEMPTS) { + addLog("⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error") + markOutgoingAsError(messageId, packet) + return@launch + } + + if (!isAuthenticated()) { + // Not authenticated — defer to reconnect retry + addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated") + resolveOutgoingRetry(messageId) + return@launch + } + + val nextAttempt = attempts + 1 + pendingOutgoingAttempts[messageId] = nextAttempt + addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt") + send(packet) + scheduleOutgoingRetry(messageId) + } + } + + /** + * iOS parity: mark an outgoing message as error and clean up retry state. + */ + private fun markOutgoingAsError(messageId: String, packet: PacketMessage) { + scope.launch { + val repository = messageRepository ?: return@launch + val opponentKey = if (packet.fromPublicKey == repository.getCurrentAccountKey()) + packet.toPublicKey else packet.fromPublicKey + val dialogKey = repository.getDialogKey(opponentKey) + repository.updateMessageDeliveryStatus(dialogKey, messageId, DeliveryStatus.ERROR) + } + resolveOutgoingRetry(messageId) + } + + /** + * iOS parity: cancel retry and clean up state for a resolved outgoing message. + * Called when delivery ACK (0x08) is received. + */ + fun resolveOutgoingRetry(messageId: String) { + pendingOutgoingRetryJobs[messageId]?.cancel() + pendingOutgoingRetryJobs.remove(messageId) + pendingOutgoingPackets.remove(messageId) + pendingOutgoingAttempts.remove(messageId) + } + + /** + * Cancel all pending outgoing retry jobs (e.g., on disconnect). + */ + private fun cancelAllOutgoingRetries() { + pendingOutgoingRetryJobs.values.forEach { it.cancel() } + pendingOutgoingRetryJobs.clear() + pendingOutgoingPackets.clear() + pendingOutgoingAttempts.clear() + } + /** * Send packet (legacy name) */