feat: update version to 1.1.1 and enhance group chat features, sync stability, and UI improvements
Some checks failed
Android Kernel Build / build (push) Failing after 19m6s
Some checks failed
Android Kernel Build / build (push) Failing after 19m6s
This commit is contained in:
@@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown"
|
|||||||
// ═══════════════════════════════════════════════════════════
|
// ═══════════════════════════════════════════════════════════
|
||||||
// Rosetta versioning — bump here on each release
|
// Rosetta versioning — bump here on each release
|
||||||
// ═══════════════════════════════════════════════════════════
|
// ═══════════════════════════════════════════════════════════
|
||||||
val rosettaVersionName = "1.1.0"
|
val rosettaVersionName = "1.1.1"
|
||||||
val rosettaVersionCode = 12 // Increment on each release
|
val rosettaVersionCode = 13 // Increment on each release
|
||||||
|
|
||||||
android {
|
android {
|
||||||
namespace = "com.rosetta.messenger"
|
namespace = "com.rosetta.messenger"
|
||||||
|
|||||||
@@ -407,8 +407,6 @@ class MainActivity : FragmentActivity() {
|
|||||||
com.rosetta.messenger.push.RosettaFirebaseMessagingService.isAppInForeground = true
|
com.rosetta.messenger.push.RosettaFirebaseMessagingService.isAppInForeground = true
|
||||||
// ⚡ На возврате в приложение пробуем мгновенный reconnect без ожидания backoff.
|
// ⚡ На возврате в приложение пробуем мгновенный reconnect без ожидания backoff.
|
||||||
ProtocolManager.reconnectNowIfNeeded("activity_onResume")
|
ProtocolManager.reconnectNowIfNeeded("activity_onResume")
|
||||||
// 🔄 Desktop parity: синхронизация при каждом заходе в приложение
|
|
||||||
ProtocolManager.syncOnForeground()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
override fun onPause() {
|
override fun onPause() {
|
||||||
|
|||||||
@@ -99,7 +99,6 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
|
|
||||||
/** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */
|
/** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */
|
||||||
private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L
|
private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L
|
||||||
private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h
|
|
||||||
|
|
||||||
const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002"
|
const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002"
|
||||||
const val SYSTEM_SAFE_TITLE = "Safe"
|
const val SYSTEM_SAFE_TITLE = "Safe"
|
||||||
@@ -385,43 +384,13 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
|
|
||||||
suspend fun getLastSyncTimestamp(): Long {
|
suspend fun getLastSyncTimestamp(): Long {
|
||||||
val account = currentAccount ?: return 0L
|
val account = currentAccount ?: return 0L
|
||||||
val stored = syncTimeDao.getLastSync(account) ?: 0L
|
return syncTimeDao.getLastSync(account) ?: 0L
|
||||||
val normalized = normalizeSyncTimestamp(stored)
|
|
||||||
if (normalized != stored) {
|
|
||||||
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized))
|
|
||||||
if (stored > 0) {
|
|
||||||
android.util.Log.w(
|
|
||||||
"MessageRepository",
|
|
||||||
"⚠️ Normalized invalid last_sync for account=${account.take(10)}...: $stored -> $normalized"
|
|
||||||
)
|
|
||||||
ProtocolManager.addLog("⚠️ SYNC cursor normalized: $stored -> $normalized")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return normalized
|
|
||||||
}
|
}
|
||||||
|
|
||||||
suspend fun updateLastSyncTimestamp(timestamp: Long) {
|
suspend fun updateLastSyncTimestamp(timestamp: Long) {
|
||||||
if (timestamp <= 0) return
|
|
||||||
val account = currentAccount ?: return
|
val account = currentAccount ?: return
|
||||||
val normalized = normalizeSyncTimestamp(timestamp)
|
// Desktop parity: store raw cursor value from sync/update events.
|
||||||
if (normalized <= 0) return
|
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp))
|
||||||
// Desktop parity: allow moving sync cursor backward if needed.
|
|
||||||
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized))
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun normalizeSyncTimestamp(rawTimestamp: Long): Long {
|
|
||||||
if (rawTimestamp <= 0) return 0L
|
|
||||||
val now = System.currentTimeMillis()
|
|
||||||
val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS
|
|
||||||
var normalized = rawTimestamp
|
|
||||||
|
|
||||||
// Heal common corruption where extra decimal places appear in timestamp.
|
|
||||||
while (normalized > maxAllowed) {
|
|
||||||
normalized /= 10L
|
|
||||||
if (normalized <= 0L) return 0L
|
|
||||||
}
|
|
||||||
|
|
||||||
return if (normalized > maxAllowed) 0L else normalized
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Получить поток сообщений для диалога */
|
/** Получить поток сообщений для диалога */
|
||||||
@@ -953,10 +922,26 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
// 1) from=opponent, to=account -> собеседник прочитал НАШИ сообщения (double check)
|
// 1) from=opponent, to=account -> собеседник прочитал НАШИ сообщения (double check)
|
||||||
// 2) from=account, to=opponent -> sync с другого нашего устройства (мы прочитали входящие)
|
// 2) from=account, to=opponent -> sync с другого нашего устройства (мы прочитали входящие)
|
||||||
val isOwnReadSync = fromPublicKey == account
|
val isOwnReadSync = fromPublicKey == account
|
||||||
|
|
||||||
|
// Desktop parity (group): from=groupMember, to=groupId -> mark own group messages as read.
|
||||||
if (!isOwnReadSync && isGroupDialogKey(toPublicKey)) {
|
if (!isOwnReadSync && isGroupDialogKey(toPublicKey)) {
|
||||||
// Group read receipts are currently not mapped to per-message states.
|
val dialogKey = getDialogKey(toPublicKey)
|
||||||
|
messageDao.markAllAsRead(account, toPublicKey)
|
||||||
|
|
||||||
|
val readCount = messageCache[dialogKey]?.value?.count { it.isFromMe && !it.isRead } ?: 0
|
||||||
|
messageCache[dialogKey]?.let { flow ->
|
||||||
|
flow.value =
|
||||||
|
flow.value.map { msg ->
|
||||||
|
if (msg.isFromMe && !msg.isRead) msg.copy(isRead = true) else msg
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
_deliveryStatusEvents.tryEmit(DeliveryStatusUpdate(dialogKey, "", DeliveryStatus.READ))
|
||||||
|
MessageLogger.logReadStatus(fromPublicKey = toPublicKey, messagesCount = readCount)
|
||||||
|
dialogDao.updateDialogFromMessages(account, toPublicKey)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
val opponentKey = if (isOwnReadSync) toPublicKey else fromPublicKey
|
val opponentKey = if (isOwnReadSync) toPublicKey else fromPublicKey
|
||||||
if (opponentKey.isBlank()) return
|
if (opponentKey.isBlank()) return
|
||||||
|
|
||||||
|
|||||||
@@ -17,9 +17,28 @@ object ReleaseNotes {
|
|||||||
val RELEASE_NOTICE = """
|
val RELEASE_NOTICE = """
|
||||||
Update v$VERSION_PLACEHOLDER
|
Update v$VERSION_PLACEHOLDER
|
||||||
|
|
||||||
Синхронизация сообщений
|
Группы и интерфейс
|
||||||
- Исправлен бесконечный цикл синхронизации, когда сервер возвращал пустые батчи с неизменным курсором
|
- Полностью обновлен экран группы в стиле приложения (по паритету с desktop логикой)
|
||||||
- Вынесена общая логика завершения sync-цикла для единообразной обработки всех сценариев
|
- В участниках добавлены верификации, админ-метка и тултип администратора
|
||||||
|
- Добавлен просмотр Encryption Key с QR-кодом
|
||||||
|
- Улучшены секции Media/Files/Links: корректные пустые состояния и выравнивание медиа-сетки
|
||||||
|
|
||||||
|
Сообщения и списки
|
||||||
|
- Group Invite теперь отображается как invite-карточка вместо хэша (в чате и в chat list)
|
||||||
|
- Для групп в chat list показывается иконка и автор последнего сообщения (You/имя отправителя)
|
||||||
|
- Исправлено выравнивание превью вида "You: Photo"
|
||||||
|
- Системные события группы (например joined the group) приведены к desktop-стилю
|
||||||
|
|
||||||
|
Модерация групп
|
||||||
|
- Добавлены свайп и long-press действия по участникам (Kick)
|
||||||
|
- Улучшены цвета, haptic и размеры action-кнопки; исправлен конфликт свайпа item vs экран
|
||||||
|
- Для групп в chat list добавлены swipe-actions: Pin, Leave, Delete
|
||||||
|
|
||||||
|
Синхронизация и стабильность
|
||||||
|
- Исправлены пропуски сообщений при массовой синхронизации личных и групповых чатов
|
||||||
|
- Sync теперь не продвигает курсор батча при ошибках обработки и делает безопасные ретраи
|
||||||
|
- Исправлены кейсы, где requests зависели от состояния устройства, а не аккаунта
|
||||||
|
- Rosetta Updates и Safe исключены из requests
|
||||||
""".trimIndent()
|
""".trimIndent()
|
||||||
|
|
||||||
fun getNotice(version: String): String =
|
fun getNotice(version: String): String =
|
||||||
|
|||||||
@@ -395,12 +395,11 @@ interface MessageDao {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Отметить все исходящие сообщения к собеседнику как прочитанные Используется когда приходит
|
* Отметить все исходящие сообщения к собеседнику как прочитанные Используется когда приходит
|
||||||
* PacketRead от собеседника 🔥 ВАЖНО: delivered=3 означает READ (синхронизировано с
|
* PacketRead от собеседника.
|
||||||
* ChatViewModel)
|
|
||||||
*/
|
*/
|
||||||
@Query(
|
@Query(
|
||||||
"""
|
"""
|
||||||
UPDATE messages SET delivered = 3, read = 1
|
UPDATE messages SET read = 1
|
||||||
WHERE account = :account AND to_public_key = :opponent AND from_me = 1
|
WHERE account = :account AND to_public_key = :opponent AND from_me = 1
|
||||||
"""
|
"""
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -27,8 +27,6 @@ import kotlin.coroutines.resume
|
|||||||
object ProtocolManager {
|
object ProtocolManager {
|
||||||
private const val TAG = "ProtocolManager"
|
private const val TAG = "ProtocolManager"
|
||||||
private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L
|
private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L
|
||||||
private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h
|
|
||||||
private const val MAX_STALLED_SYNC_BATCHES = 12
|
|
||||||
private const val MAX_DEBUG_LOGS = 600
|
private const val MAX_DEBUG_LOGS = 600
|
||||||
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
||||||
|
|
||||||
@@ -96,15 +94,10 @@ object ProtocolManager {
|
|||||||
// Tracks the tail of the sequential processing chain (like desktop's `tail` promise)
|
// Tracks the tail of the sequential processing chain (like desktop's `tail` promise)
|
||||||
@Volatile private var inboundQueueDrainJob: Job? = null
|
@Volatile private var inboundQueueDrainJob: Job? = null
|
||||||
private val inboundProcessingFailures = AtomicInteger(0)
|
private val inboundProcessingFailures = AtomicInteger(0)
|
||||||
private val syncBatchMessageCount = AtomicInteger(0)
|
|
||||||
private val stalledSyncBatchCount = AtomicInteger(0)
|
|
||||||
private val syncBatchEndMutex = Mutex()
|
private val syncBatchEndMutex = Mutex()
|
||||||
|
|
||||||
private fun setSyncInProgress(value: Boolean) {
|
private fun setSyncInProgress(value: Boolean) {
|
||||||
syncBatchInProgress = value
|
syncBatchInProgress = value
|
||||||
if (!value) {
|
|
||||||
stalledSyncBatchCount.set(0)
|
|
||||||
}
|
|
||||||
if (_syncInProgress.value != value) {
|
if (_syncInProgress.value != value) {
|
||||||
_syncInProgress.value = value
|
_syncInProgress.value = value
|
||||||
}
|
}
|
||||||
@@ -223,9 +216,7 @@ object ProtocolManager {
|
|||||||
*/
|
*/
|
||||||
private fun setupPacketHandlers() {
|
private fun setupPacketHandlers() {
|
||||||
// Обработчик входящих сообщений (0x06)
|
// Обработчик входящих сообщений (0x06)
|
||||||
// Desktop parity: delivery ACK is sent AFTER successful processing, not before.
|
// Desktop parity: desktop client does not send PacketDelivery manually.
|
||||||
// Desktop itself never sends PacketDelivery (server auto-generates them),
|
|
||||||
// but we still notify the sender after we've safely stored the message.
|
|
||||||
waitPacket(0x06) { packet ->
|
waitPacket(0x06) { packet ->
|
||||||
val messagePacket = packet as PacketMessage
|
val messagePacket = packet as PacketMessage
|
||||||
|
|
||||||
@@ -236,19 +227,6 @@ object ProtocolManager {
|
|||||||
markInboundProcessingFailure("Incoming packet skipped before account init")
|
markInboundProcessingFailure("Incoming packet skipped before account init")
|
||||||
return@launchInboundPacketTask
|
return@launchInboundPacketTask
|
||||||
}
|
}
|
||||||
val ownKey = getProtocol().getPublicKey().orEmpty()
|
|
||||||
if (ownKey.isBlank()) {
|
|
||||||
requireResyncAfterAccountInit("⏳ Incoming message before protocol account init, scheduling re-sync")
|
|
||||||
markInboundProcessingFailure("Incoming packet skipped before protocol account init")
|
|
||||||
return@launchInboundPacketTask
|
|
||||||
}
|
|
||||||
if (!isSupportedDirectMessagePacket(messagePacket, ownKey)) {
|
|
||||||
android.util.Log.w(
|
|
||||||
TAG,
|
|
||||||
"Skipping unsupported message packet (likely conversation): from=${messagePacket.fromPublicKey.take(16)}, to=${messagePacket.toPublicKey.take(16)}"
|
|
||||||
)
|
|
||||||
return@launchInboundPacketTask
|
|
||||||
}
|
|
||||||
val processed = repository.handleIncomingMessage(messagePacket)
|
val processed = repository.handleIncomingMessage(messagePacket)
|
||||||
if (!processed) {
|
if (!processed) {
|
||||||
markInboundProcessingFailure(
|
markInboundProcessingFailure(
|
||||||
@@ -258,17 +236,6 @@ object ProtocolManager {
|
|||||||
}
|
}
|
||||||
if (!syncBatchInProgress) {
|
if (!syncBatchInProgress) {
|
||||||
repository.updateLastSyncTimestamp(messagePacket.timestamp)
|
repository.updateLastSyncTimestamp(messagePacket.timestamp)
|
||||||
} else {
|
|
||||||
syncBatchMessageCount.incrementAndGet()
|
|
||||||
}
|
|
||||||
// ✅ Send delivery ACK only AFTER message is safely stored in DB.
|
|
||||||
// Skip for own sync messages (no need to ACK yourself).
|
|
||||||
if (messagePacket.fromPublicKey != ownKey) {
|
|
||||||
val deliveryPacket = PacketDelivery().apply {
|
|
||||||
messageId = messagePacket.messageId
|
|
||||||
toPublicKey = messagePacket.fromPublicKey
|
|
||||||
}
|
|
||||||
send(deliveryPacket)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -285,13 +252,6 @@ object ProtocolManager {
|
|||||||
markInboundProcessingFailure("Delivery packet skipped before account init")
|
markInboundProcessingFailure("Delivery packet skipped before account init")
|
||||||
return@launchInboundPacketTask
|
return@launchInboundPacketTask
|
||||||
}
|
}
|
||||||
if (isUnsupportedDialogKey(deliveryPacket.toPublicKey)) {
|
|
||||||
android.util.Log.w(
|
|
||||||
TAG,
|
|
||||||
"Skipping unsupported delivery packet: to=${deliveryPacket.toPublicKey.take(24)}"
|
|
||||||
)
|
|
||||||
return@launchInboundPacketTask
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
repository.handleDelivery(deliveryPacket)
|
repository.handleDelivery(deliveryPacket)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
@@ -322,13 +282,6 @@ object ProtocolManager {
|
|||||||
markInboundProcessingFailure("Read packet skipped before protocol account init")
|
markInboundProcessingFailure("Read packet skipped before protocol account init")
|
||||||
return@launchInboundPacketTask
|
return@launchInboundPacketTask
|
||||||
}
|
}
|
||||||
if (!isSupportedDirectReadPacket(readPacket, ownKey)) {
|
|
||||||
android.util.Log.w(
|
|
||||||
TAG,
|
|
||||||
"Skipping unsupported read packet (likely conversation): from=${readPacket.fromPublicKey.take(16)}, to=${readPacket.toPublicKey.take(16)}"
|
|
||||||
)
|
|
||||||
return@launchInboundPacketTask
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
repository.handleRead(readPacket)
|
repository.handleRead(readPacket)
|
||||||
} catch (e: Exception) {
|
} catch (e: Exception) {
|
||||||
@@ -336,7 +289,14 @@ object ProtocolManager {
|
|||||||
return@launchInboundPacketTask
|
return@launchInboundPacketTask
|
||||||
}
|
}
|
||||||
if (!syncBatchInProgress) {
|
if (!syncBatchInProgress) {
|
||||||
repository.updateLastSyncTimestamp(System.currentTimeMillis())
|
// Desktop parity:
|
||||||
|
// own direct read sync (from=me,to=peer) does not advance sync cursor.
|
||||||
|
val isOwnDirectReadSync =
|
||||||
|
readPacket.fromPublicKey.trim() == ownKey &&
|
||||||
|
!isGroupDialogKey(readPacket.toPublicKey)
|
||||||
|
if (!isOwnDirectReadSync) {
|
||||||
|
repository.updateLastSyncTimestamp(System.currentTimeMillis())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -548,52 +508,6 @@ object ProtocolManager {
|
|||||||
return normalized.startsWith("#group:") || normalized.startsWith("group:")
|
return normalized.startsWith("#group:") || normalized.startsWith("group:")
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun isConversationDialogKey(value: String): Boolean {
|
|
||||||
val normalized = value.trim().lowercase(Locale.ROOT)
|
|
||||||
return normalized.startsWith("conversation:")
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun isUnsupportedDialogKey(value: String): Boolean {
|
|
||||||
val normalized = value.trim().lowercase(Locale.ROOT)
|
|
||||||
if (normalized.isBlank()) return true
|
|
||||||
if (isConversationDialogKey(normalized)) return true
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun isSupportedDirectPeerKey(peerKey: String, ownKey: String): Boolean {
|
|
||||||
val normalized = peerKey.trim()
|
|
||||||
if (normalized.isBlank()) return false
|
|
||||||
if (normalized == ownKey) return true
|
|
||||||
if (MessageRepository.isSystemAccount(normalized)) return true
|
|
||||||
return !isUnsupportedDialogKey(normalized)
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun isSupportedDirectMessagePacket(packet: PacketMessage, ownKey: String): Boolean {
|
|
||||||
val from = packet.fromPublicKey.trim()
|
|
||||||
val to = packet.toPublicKey.trim()
|
|
||||||
if (from.isBlank() || to.isBlank()) return false
|
|
||||||
if (isConversationDialogKey(from) || isConversationDialogKey(to)) return false
|
|
||||||
if (isGroupDialogKey(to)) return true
|
|
||||||
return when {
|
|
||||||
from == ownKey -> isSupportedDirectPeerKey(to, ownKey)
|
|
||||||
to == ownKey -> isSupportedDirectPeerKey(from, ownKey)
|
|
||||||
else -> false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun isSupportedDirectReadPacket(packet: PacketRead, ownKey: String): Boolean {
|
|
||||||
val from = packet.fromPublicKey.trim()
|
|
||||||
val to = packet.toPublicKey.trim()
|
|
||||||
if (from.isBlank() || to.isBlank()) return false
|
|
||||||
if (isConversationDialogKey(from) || isConversationDialogKey(to)) return false
|
|
||||||
if (isGroupDialogKey(to)) return true
|
|
||||||
return when {
|
|
||||||
from == ownKey -> isSupportedDirectPeerKey(to, ownKey)
|
|
||||||
to == ownKey -> isSupportedDirectPeerKey(from, ownKey)
|
|
||||||
else -> false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private fun onAuthenticated() {
|
private fun onAuthenticated() {
|
||||||
setSyncInProgress(false)
|
setSyncInProgress(false)
|
||||||
TransportManager.requestTransportServer()
|
TransportManager.requestTransportServer()
|
||||||
@@ -605,8 +519,6 @@ object ProtocolManager {
|
|||||||
|
|
||||||
private fun finishSyncCycle(reason: String) {
|
private fun finishSyncCycle(reason: String) {
|
||||||
syncRequestInFlight = false
|
syncRequestInFlight = false
|
||||||
stalledSyncBatchCount.set(0)
|
|
||||||
syncBatchMessageCount.set(0)
|
|
||||||
inboundProcessingFailures.set(0)
|
inboundProcessingFailures.set(0)
|
||||||
addLog(reason)
|
addLog(reason)
|
||||||
setSyncInProgress(false)
|
setSyncInProgress(false)
|
||||||
@@ -671,7 +583,6 @@ object ProtocolManager {
|
|||||||
addLog("⚠️ SYNC request skipped: previous request still in flight")
|
addLog("⚠️ SYNC request skipped: previous request still in flight")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
stalledSyncBatchCount.set(0)
|
|
||||||
syncRequestInFlight = true
|
syncRequestInFlight = true
|
||||||
addLog("🔄 SYNC requested — fetching last sync timestamp...")
|
addLog("🔄 SYNC requested — fetching last sync timestamp...")
|
||||||
scope.launch {
|
scope.launch {
|
||||||
@@ -701,13 +612,9 @@ object ProtocolManager {
|
|||||||
|
|
||||||
private fun sendSynchronize(timestamp: Long) {
|
private fun sendSynchronize(timestamp: Long) {
|
||||||
syncRequestInFlight = true
|
syncRequestInFlight = true
|
||||||
val safeTimestamp = normalizeSyncTimestamp(timestamp)
|
|
||||||
val packet = PacketSync().apply {
|
val packet = PacketSync().apply {
|
||||||
status = SyncStatus.NOT_NEEDED
|
status = SyncStatus.NOT_NEEDED
|
||||||
this.timestamp = safeTimestamp
|
this.timestamp = timestamp
|
||||||
}
|
|
||||||
if (safeTimestamp != timestamp) {
|
|
||||||
addLog("⚠️ SYNC request timestamp normalized: $timestamp -> $safeTimestamp")
|
|
||||||
}
|
}
|
||||||
send(packet)
|
send(packet)
|
||||||
}
|
}
|
||||||
@@ -732,7 +639,6 @@ object ProtocolManager {
|
|||||||
// subsequent 0x06 packets are dispatched by OkHttp's sequential callback.
|
// subsequent 0x06 packets are dispatched by OkHttp's sequential callback.
|
||||||
setSyncInProgress(true)
|
setSyncInProgress(true)
|
||||||
inboundProcessingFailures.set(0)
|
inboundProcessingFailures.set(0)
|
||||||
syncBatchMessageCount.set(0)
|
|
||||||
}
|
}
|
||||||
SyncStatus.BATCH_END -> {
|
SyncStatus.BATCH_END -> {
|
||||||
addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})")
|
addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})")
|
||||||
@@ -740,77 +646,28 @@ object ProtocolManager {
|
|||||||
// syncBatchInProgress stays true until NOT_NEEDED arrives.
|
// syncBatchInProgress stays true until NOT_NEEDED arrives.
|
||||||
scope.launch {
|
scope.launch {
|
||||||
syncBatchEndMutex.withLock {
|
syncBatchEndMutex.withLock {
|
||||||
if (!syncBatchInProgress) {
|
|
||||||
addLog("⚠️ SYNC BATCH_END ignored: sync already completed")
|
|
||||||
return@launch
|
|
||||||
}
|
|
||||||
val tasksFinished = whenInboundTasksFinish()
|
val tasksFinished = whenInboundTasksFinish()
|
||||||
if (!tasksFinished) {
|
if (!tasksFinished) {
|
||||||
android.util.Log.w(
|
android.util.Log.w(
|
||||||
TAG,
|
TAG,
|
||||||
"SYNC BATCH_END: queue unavailable, skipping cursor update for this step"
|
"SYNC BATCH_END: queue unavailable, skipping cursor update for this step"
|
||||||
)
|
)
|
||||||
val fallbackCursor = normalizeSyncTimestamp(messageRepository?.getLastSyncTimestamp() ?: 0L)
|
val fallbackCursor = messageRepository?.getLastSyncTimestamp() ?: 0L
|
||||||
if (syncBatchInProgress) {
|
sendSynchronize(fallbackCursor)
|
||||||
sendSynchronize(fallbackCursor)
|
|
||||||
}
|
|
||||||
return@launch
|
return@launch
|
||||||
}
|
}
|
||||||
val failuresInBatch = inboundProcessingFailures.getAndSet(0)
|
val failuresInBatch = inboundProcessingFailures.getAndSet(0)
|
||||||
if (failuresInBatch > 0) {
|
if (failuresInBatch > 0) {
|
||||||
addLog(
|
addLog(
|
||||||
"⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior"
|
"⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop sync cursor behavior"
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
val processedMessagesInBatch = syncBatchMessageCount.getAndSet(0)
|
|
||||||
val repository = messageRepository
|
val repository = messageRepository
|
||||||
val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L)
|
// Desktop parity: save the cursor provided by BATCH_END and request next
|
||||||
val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp)
|
// chunk with the same cursor.
|
||||||
val nextCursor = if (safeBatchTimestamp > 0L) safeBatchTimestamp else currentCursor
|
repository?.updateLastSyncTimestamp(packet.timestamp)
|
||||||
val requestCursor =
|
addLog("🔄 SYNC tasks done — cursor=${packet.timestamp}, requesting next batch")
|
||||||
when {
|
sendSynchronize(packet.timestamp)
|
||||||
nextCursor > 0L -> nextCursor
|
|
||||||
currentCursor > 0L -> currentCursor
|
|
||||||
else -> 0L
|
|
||||||
}
|
|
||||||
addLog(
|
|
||||||
"🔄 SYNC cursor calc: current=$currentCursor, server=$safeBatchTimestamp, next=$nextCursor, messages=$processedMessagesInBatch, failures=$failuresInBatch"
|
|
||||||
)
|
|
||||||
|
|
||||||
// If server repeatedly returns an empty/non-advancing batch, allow a few retries
|
|
||||||
// first (to avoid premature stop), then finish to avoid endless "Synchronizing...".
|
|
||||||
val noProgress =
|
|
||||||
failuresInBatch == 0 &&
|
|
||||||
processedMessagesInBatch == 0 &&
|
|
||||||
nextCursor <= currentCursor
|
|
||||||
if (noProgress) {
|
|
||||||
val stalled = stalledSyncBatchCount.incrementAndGet()
|
|
||||||
if (stalled >= MAX_STALLED_SYNC_BATCHES) {
|
|
||||||
finishSyncCycle(
|
|
||||||
"✅ SYNC COMPLETE — stalled on cursor for $stalled batch(es) (server=$safeBatchTimestamp, current=$currentCursor)"
|
|
||||||
)
|
|
||||||
return@launch
|
|
||||||
}
|
|
||||||
addLog(
|
|
||||||
"⚠️ SYNC batch has no progress (#$stalled/$MAX_STALLED_SYNC_BATCHES), retrying with cursor=$requestCursor"
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
stalledSyncBatchCount.set(0)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (nextCursor > 0L) {
|
|
||||||
addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch")
|
|
||||||
repository?.updateLastSyncTimestamp(nextCursor)
|
|
||||||
} else {
|
|
||||||
addLog(
|
|
||||||
"⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor"
|
|
||||||
)
|
|
||||||
}
|
|
||||||
if (!syncBatchInProgress) {
|
|
||||||
addLog("⚠️ SYNC next batch skipped: sync already completed")
|
|
||||||
return@launch
|
|
||||||
}
|
|
||||||
sendSynchronize(requestCursor)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -936,32 +793,12 @@ object ProtocolManager {
|
|||||||
val currentSync = repository.getLastSyncTimestamp()
|
val currentSync = repository.getLastSyncTimestamp()
|
||||||
val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L)
|
val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L)
|
||||||
|
|
||||||
stalledSyncBatchCount.set(0)
|
|
||||||
syncRequestInFlight = true
|
syncRequestInFlight = true
|
||||||
addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo")
|
addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo")
|
||||||
sendSynchronize(rewindTo)
|
sendSynchronize(rewindTo)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Defensive normalization for sync cursor/timestamps.
|
|
||||||
* Some malformed values arrive with extra decimal digits; heal by scaling down.
|
|
||||||
*/
|
|
||||||
private fun normalizeSyncTimestamp(rawTimestamp: Long): Long {
|
|
||||||
if (rawTimestamp <= 0L) return 0L
|
|
||||||
val now = System.currentTimeMillis()
|
|
||||||
val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS
|
|
||||||
var normalized = rawTimestamp
|
|
||||||
|
|
||||||
// Try to recover values with accidental decimal scaling (x10, x100, ...).
|
|
||||||
while (normalized > maxAllowed) {
|
|
||||||
normalized /= 10L
|
|
||||||
if (normalized <= 0L) return 0L
|
|
||||||
}
|
|
||||||
|
|
||||||
return normalized
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Authenticate with server
|
* Authenticate with server
|
||||||
*/
|
*/
|
||||||
|
|||||||
Reference in New Issue
Block a user