Фикс синхронизации Android ↔ iOS: retry механизм и нормализация sync-курсора
- Добавлен retry для исходящих сообщений (iOS parity): 4с интервал, 3 попытки, 80с таймаут - Нормализация sync timestamp в миллисекунды (предотвращает расхождение курсора) - resolveOutgoingRetry при получении delivery ACK (0x08) - cancelAllOutgoingRetries при дисконнекте - Обновлены release notes для 1.2.1 Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,16 @@
|
|||||||
# Release Notes
|
# Release Notes
|
||||||
|
|
||||||
|
## 1.2.1
|
||||||
|
|
||||||
|
### Синхронизация Android ↔ iOS
|
||||||
|
- Исправлена критическая проблема: сообщения зависали на «часиках» при одновременном использовании Android и iOS.
|
||||||
|
- Добавлен механизм автоматического повтора отправки (retry) — как в iOS: 3 попытки с интервалом 4 сек, таймаут 80 сек.
|
||||||
|
- Исправлена нормализация sync-курсора (секунды → миллисекунды) для корректной синхронизации между устройствами.
|
||||||
|
|
||||||
|
### UI-улучшения
|
||||||
|
- Дата «today/yesterday» и пустой стейт чата теперь белые при тёмных обоях или тёмной теме.
|
||||||
|
- Исправлена обрезка имени отправителя в групповых чатах — бабл расширяется под имя.
|
||||||
|
|
||||||
## 1.2.0 (обновление с 1.1.9)
|
## 1.2.0 (обновление с 1.1.9)
|
||||||
|
|
||||||
- Синхронизированы индикаторы отправки между чат-листом и диалогом:
|
- Синхронизированы индикаторы отправки между чат-листом и диалогом:
|
||||||
|
|||||||
@@ -389,8 +389,21 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
|
|
||||||
suspend fun updateLastSyncTimestamp(timestamp: Long) {
|
suspend fun updateLastSyncTimestamp(timestamp: Long) {
|
||||||
val account = currentAccount ?: return
|
val account = currentAccount ?: return
|
||||||
// Desktop parity: store raw cursor value from sync/update events.
|
// iOS parity: normalize timestamp to milliseconds before storing.
|
||||||
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp))
|
// Server may send seconds or milliseconds depending on the packet.
|
||||||
|
val normalized = normalizeSyncTimestamp(timestamp)
|
||||||
|
if (normalized <= 0L) return
|
||||||
|
val existing = syncTimeDao.getLastSync(account) ?: 0L
|
||||||
|
if (normalized <= existing) return
|
||||||
|
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* iOS parity: normalize sync timestamp to milliseconds.
|
||||||
|
* Values below 1_000_000_000_000 are assumed to be in seconds.
|
||||||
|
*/
|
||||||
|
private fun normalizeSyncTimestamp(raw: Long): Long {
|
||||||
|
return if (raw < 1_000_000_000_000L) raw * 1000L else raw
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Получить поток сообщений для диалога */
|
/** Получить поток сообщений для диалога */
|
||||||
@@ -589,7 +602,8 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
// 📝 LOG: Отправка пакета
|
// 📝 LOG: Отправка пакета
|
||||||
MessageLogger.logPacketSend(messageId, toPublicKey, timestamp)
|
MessageLogger.logPacketSend(messageId, toPublicKey, timestamp)
|
||||||
|
|
||||||
ProtocolManager.send(packet)
|
// iOS parity: send with automatic retry (4s interval, 3 attempts, 80s timeout)
|
||||||
|
ProtocolManager.sendMessageWithRetry(packet)
|
||||||
|
|
||||||
// 📝 LOG: Успешная отправка
|
// 📝 LOG: Успешная отправка
|
||||||
MessageLogger.logSendSuccess(messageId, System.currentTimeMillis() - startTime)
|
MessageLogger.logSendSuccess(messageId, System.currentTimeMillis() - startTime)
|
||||||
@@ -1135,7 +1149,8 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
this.attachments = emptyList() // Attachments already uploaded, tags are in content
|
this.attachments = emptyList() // Attachments already uploaded, tags are in content
|
||||||
}
|
}
|
||||||
|
|
||||||
ProtocolManager.send(packet)
|
// iOS parity: use retry mechanism for reconnect-resent messages too
|
||||||
|
ProtocolManager.sendMessageWithRetry(packet)
|
||||||
android.util.Log.d("MessageRepository", "🔄 Resent WAITING message: ${entity.messageId.take(8)}")
|
android.util.Log.d("MessageRepository", "🔄 Resent WAITING message: ${entity.messageId.take(8)}")
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
android.util.Log.e("MessageRepository", "❌ Failed to retry message ${entity.messageId.take(8)}: ${e.message}")
|
android.util.Log.e("MessageRepository", "❌ Failed to retry message ${entity.messageId.take(8)}: ${e.message}")
|
||||||
@@ -1158,7 +1173,7 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
* Получить ключ диалога для группировки сообщений 📁 SAVED MESSAGES: Для saved messages
|
* Получить ключ диалога для группировки сообщений 📁 SAVED MESSAGES: Для saved messages
|
||||||
* (account == opponentKey) возвращает просто account
|
* (account == opponentKey) возвращает просто account
|
||||||
*/
|
*/
|
||||||
private fun getDialogKey(opponentKey: String): String {
|
fun getDialogKey(opponentKey: String): String {
|
||||||
val account = currentAccount ?: return opponentKey
|
val account = currentAccount ?: return opponentKey
|
||||||
if (isGroupDialogKey(opponentKey)) {
|
if (isGroupDialogKey(opponentKey)) {
|
||||||
return opponentKey.trim()
|
return opponentKey.trim()
|
||||||
@@ -1238,6 +1253,16 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Public API for ProtocolManager to update delivery status (e.g., marking as ERROR on retry timeout).
|
||||||
|
*/
|
||||||
|
suspend fun updateMessageDeliveryStatus(dialogKey: String, messageId: String, status: DeliveryStatus) {
|
||||||
|
val account = currentAccount ?: return
|
||||||
|
messageDao.updateDeliveryStatus(account, messageId, status.value)
|
||||||
|
updateMessageStatus(dialogKey, messageId, status)
|
||||||
|
_deliveryStatusEvents.tryEmit(DeliveryStatusUpdate(dialogKey, messageId, status))
|
||||||
|
}
|
||||||
|
|
||||||
private fun updateMessageStatus(dialogKey: String, messageId: String, status: DeliveryStatus) {
|
private fun updateMessageStatus(dialogKey: String, messageId: String, status: DeliveryStatus) {
|
||||||
messageCache[dialogKey]?.let { flow ->
|
messageCache[dialogKey]?.let { flow ->
|
||||||
flow.value =
|
flow.value =
|
||||||
|
|||||||
@@ -100,6 +100,17 @@ object ProtocolManager {
|
|||||||
private val inboundProcessingFailures = AtomicInteger(0)
|
private val inboundProcessingFailures = AtomicInteger(0)
|
||||||
private val syncBatchEndMutex = Mutex()
|
private val syncBatchEndMutex = Mutex()
|
||||||
|
|
||||||
|
// iOS parity: outgoing message retry mechanism.
|
||||||
|
// iOS retries stuck WAITING messages every 4 seconds, up to 3 attempts,
|
||||||
|
// with a hard timeout of 80 seconds. Without this, messages stay as "clocks"
|
||||||
|
// until the next reconnect (which may never happen if the connection is alive).
|
||||||
|
private const val OUTGOING_RETRY_INTERVAL_MS = 4_000L
|
||||||
|
private const val OUTGOING_MAX_RETRY_ATTEMPTS = 3
|
||||||
|
private const val OUTGOING_MAX_LIFETIME_MS = 80_000L
|
||||||
|
private val pendingOutgoingPackets = ConcurrentHashMap<String, PacketMessage>()
|
||||||
|
private val pendingOutgoingAttempts = ConcurrentHashMap<String, Int>()
|
||||||
|
private val pendingOutgoingRetryJobs = ConcurrentHashMap<String, Job>()
|
||||||
|
|
||||||
private fun setSyncInProgress(value: Boolean) {
|
private fun setSyncInProgress(value: Boolean) {
|
||||||
syncBatchInProgress = value
|
syncBatchInProgress = value
|
||||||
if (_syncInProgress.value != value) {
|
if (_syncInProgress.value != value) {
|
||||||
@@ -196,6 +207,9 @@ object ProtocolManager {
|
|||||||
setSyncInProgress(false)
|
setSyncInProgress(false)
|
||||||
// Connection/session dropped: force re-subscribe on next AUTHENTICATED.
|
// Connection/session dropped: force re-subscribe on next AUTHENTICATED.
|
||||||
lastSubscribedToken = null
|
lastSubscribedToken = null
|
||||||
|
// iOS parity: cancel all pending outgoing retries on disconnect.
|
||||||
|
// They will be retried via retryWaitingMessages() on next handshake.
|
||||||
|
cancelAllOutgoingRetries()
|
||||||
}
|
}
|
||||||
lastProtocolState = newState
|
lastProtocolState = newState
|
||||||
}
|
}
|
||||||
@@ -263,6 +277,8 @@ object ProtocolManager {
|
|||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
repository.handleDelivery(deliveryPacket)
|
repository.handleDelivery(deliveryPacket)
|
||||||
|
// iOS parity: cancel retry timer on delivery ACK
|
||||||
|
resolveOutgoingRetry(deliveryPacket.messageId)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
markInboundProcessingFailure("Delivery processing failed", e)
|
markInboundProcessingFailure("Delivery processing failed", e)
|
||||||
return@launchInboundPacketTask
|
return@launchInboundPacketTask
|
||||||
@@ -1022,6 +1038,103 @@ object ProtocolManager {
|
|||||||
getProtocol().sendPacket(packet)
|
getProtocol().sendPacket(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Send an outgoing message packet and register it for automatic retry.
|
||||||
|
* iOS parity: registerOutgoingRetry + scheduleOutgoingRetry.
|
||||||
|
*/
|
||||||
|
fun sendMessageWithRetry(packet: PacketMessage) {
|
||||||
|
send(packet)
|
||||||
|
registerOutgoingRetry(packet)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* iOS parity: register an outgoing message for automatic retry.
|
||||||
|
*/
|
||||||
|
private fun registerOutgoingRetry(packet: PacketMessage) {
|
||||||
|
val messageId = packet.messageId
|
||||||
|
pendingOutgoingRetryJobs[messageId]?.cancel()
|
||||||
|
pendingOutgoingPackets[messageId] = packet
|
||||||
|
pendingOutgoingAttempts[messageId] = 0
|
||||||
|
scheduleOutgoingRetry(messageId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* iOS parity: schedule a retry attempt for a pending outgoing message.
|
||||||
|
* Retries every 4 seconds, marks as ERROR after 80s or 3 attempts.
|
||||||
|
*/
|
||||||
|
private fun scheduleOutgoingRetry(messageId: String) {
|
||||||
|
pendingOutgoingRetryJobs[messageId]?.cancel()
|
||||||
|
pendingOutgoingRetryJobs[messageId] = scope.launch {
|
||||||
|
delay(OUTGOING_RETRY_INTERVAL_MS)
|
||||||
|
|
||||||
|
val packet = pendingOutgoingPackets[messageId] ?: return@launch
|
||||||
|
val attempts = pendingOutgoingAttempts[messageId] ?: 0
|
||||||
|
|
||||||
|
// Check if message exceeded delivery timeout (80s)
|
||||||
|
val nowMs = System.currentTimeMillis()
|
||||||
|
val ageMs = nowMs - packet.timestamp
|
||||||
|
if (ageMs >= OUTGOING_MAX_LIFETIME_MS) {
|
||||||
|
addLog("⚠️ Message ${messageId.take(8)} expired after ${ageMs}ms — marking as error")
|
||||||
|
markOutgoingAsError(messageId, packet)
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
|
|
||||||
|
if (attempts >= OUTGOING_MAX_RETRY_ATTEMPTS) {
|
||||||
|
addLog("⚠️ Message ${messageId.take(8)} exhausted $attempts retries — marking as error")
|
||||||
|
markOutgoingAsError(messageId, packet)
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!isAuthenticated()) {
|
||||||
|
// Not authenticated — defer to reconnect retry
|
||||||
|
addLog("⏳ Message ${messageId.take(8)} retry deferred — not authenticated")
|
||||||
|
resolveOutgoingRetry(messageId)
|
||||||
|
return@launch
|
||||||
|
}
|
||||||
|
|
||||||
|
val nextAttempt = attempts + 1
|
||||||
|
pendingOutgoingAttempts[messageId] = nextAttempt
|
||||||
|
addLog("🔄 Retrying message ${messageId.take(8)}, attempt $nextAttempt")
|
||||||
|
send(packet)
|
||||||
|
scheduleOutgoingRetry(messageId)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* iOS parity: mark an outgoing message as error and clean up retry state.
|
||||||
|
*/
|
||||||
|
private fun markOutgoingAsError(messageId: String, packet: PacketMessage) {
|
||||||
|
scope.launch {
|
||||||
|
val repository = messageRepository ?: return@launch
|
||||||
|
val opponentKey = if (packet.fromPublicKey == repository.getCurrentAccountKey())
|
||||||
|
packet.toPublicKey else packet.fromPublicKey
|
||||||
|
val dialogKey = repository.getDialogKey(opponentKey)
|
||||||
|
repository.updateMessageDeliveryStatus(dialogKey, messageId, DeliveryStatus.ERROR)
|
||||||
|
}
|
||||||
|
resolveOutgoingRetry(messageId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* iOS parity: cancel retry and clean up state for a resolved outgoing message.
|
||||||
|
* Called when delivery ACK (0x08) is received.
|
||||||
|
*/
|
||||||
|
fun resolveOutgoingRetry(messageId: String) {
|
||||||
|
pendingOutgoingRetryJobs[messageId]?.cancel()
|
||||||
|
pendingOutgoingRetryJobs.remove(messageId)
|
||||||
|
pendingOutgoingPackets.remove(messageId)
|
||||||
|
pendingOutgoingAttempts.remove(messageId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancel all pending outgoing retry jobs (e.g., on disconnect).
|
||||||
|
*/
|
||||||
|
private fun cancelAllOutgoingRetries() {
|
||||||
|
pendingOutgoingRetryJobs.values.forEach { it.cancel() }
|
||||||
|
pendingOutgoingRetryJobs.clear()
|
||||||
|
pendingOutgoingPackets.clear()
|
||||||
|
pendingOutgoingAttempts.clear()
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Send packet (legacy name)
|
* Send packet (legacy name)
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user