feat: update version to 1.1.0 and enhance sync cycle handling in ProtocolManager
Some checks failed
Android Kernel Build / build (push) Failing after 2m6s
Some checks failed
Android Kernel Build / build (push) Failing after 2m6s
This commit is contained in:
@@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown"
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// Rosetta versioning — bump here on each release
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
val rosettaVersionName = "1.0.11"
|
||||
val rosettaVersionCode = 11 // Increment on each release
|
||||
val rosettaVersionName = "1.1.0"
|
||||
val rosettaVersionCode = 12 // Increment on each release
|
||||
|
||||
android {
|
||||
namespace = "com.rosetta.messenger"
|
||||
|
||||
@@ -18,14 +18,8 @@ object ReleaseNotes {
|
||||
Update v$VERSION_PLACEHOLDER
|
||||
|
||||
Синхронизация сообщений
|
||||
- Исправлен недолет сообщений после оффлайна при массовой отправке (спам-тест)
|
||||
- Исправлен сценарий, когда синхронизация останавливалась на первой пачке
|
||||
- Нормализуется sync-cursor (last_sync), включая поврежденные timestamp
|
||||
- Следующий sync-запрос отправляется с безопасным timestamp
|
||||
|
||||
Стабильность протокола
|
||||
- Улучшена защита чтения строк из бинарного потока
|
||||
- Ошибки внутри батча больше не клинят дальнейшую догрузку пакетов
|
||||
- Исправлен бесконечный цикл синхронизации, когда сервер возвращал пустые батчи с неизменным курсором
|
||||
- Вынесена общая логика завершения sync-цикла для единообразной обработки всех сценариев
|
||||
""".trimIndent()
|
||||
|
||||
fun getNotice(version: String): String =
|
||||
|
||||
@@ -10,12 +10,13 @@ import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.security.SecureRandom
|
||||
import java.util.*
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.concurrent.atomic.AtomicLong
|
||||
import kotlin.coroutines.resume
|
||||
|
||||
/**
|
||||
@@ -26,9 +27,9 @@ object ProtocolManager {
|
||||
private const val TAG = "ProtocolManager"
|
||||
private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L
|
||||
private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h
|
||||
private const val MAX_STALLED_SYNC_BATCHES = 12
|
||||
private const val MAX_DEBUG_LOGS = 600
|
||||
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
||||
private const val INBOUND_TASK_TIMEOUT_MS = 20_000L
|
||||
|
||||
// Server address - same as React Native version
|
||||
private const val SERVER_ADDRESS = "ws://46.28.71.12:3000"
|
||||
@@ -40,6 +41,9 @@ object ProtocolManager {
|
||||
private var messageRepository: MessageRepository? = null
|
||||
private var appContext: Context? = null
|
||||
private val scope = CoroutineScope(Dispatchers.IO + SupervisorJob())
|
||||
@Volatile private var packetHandlersRegistered = false
|
||||
@Volatile private var stateMonitoringStarted = false
|
||||
@Volatile private var syncRequestInFlight = false
|
||||
|
||||
// Guard: prevent duplicate FCM token subscribe within a single session
|
||||
@Volatile
|
||||
@@ -90,10 +94,15 @@ object ProtocolManager {
|
||||
// Tracks the tail of the sequential processing chain (like desktop's `tail` promise)
|
||||
@Volatile private var inboundQueueDrainJob: Job? = null
|
||||
private val inboundProcessingFailures = AtomicInteger(0)
|
||||
private val syncBatchMaxProcessedTimestamp = AtomicLong(0L)
|
||||
private val syncBatchMessageCount = AtomicInteger(0)
|
||||
private val stalledSyncBatchCount = AtomicInteger(0)
|
||||
private val syncBatchEndMutex = Mutex()
|
||||
|
||||
private fun setSyncInProgress(value: Boolean) {
|
||||
syncBatchInProgress = value
|
||||
if (!value) {
|
||||
stalledSyncBatchCount.set(0)
|
||||
}
|
||||
if (_syncInProgress.value != value) {
|
||||
_syncInProgress.value = value
|
||||
}
|
||||
@@ -160,8 +169,14 @@ object ProtocolManager {
|
||||
fun initialize(context: Context) {
|
||||
appContext = context.applicationContext
|
||||
messageRepository = MessageRepository.getInstance(context)
|
||||
setupPacketHandlers()
|
||||
setupStateMonitoring()
|
||||
if (!packetHandlersRegistered) {
|
||||
setupPacketHandlers()
|
||||
packetHandlersRegistered = true
|
||||
}
|
||||
if (!stateMonitoringStarted) {
|
||||
setupStateMonitoring()
|
||||
stateMonitoringStarted = true
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -175,6 +190,7 @@ object ProtocolManager {
|
||||
onAuthenticated()
|
||||
}
|
||||
if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) {
|
||||
syncRequestInFlight = false
|
||||
setSyncInProgress(false)
|
||||
}
|
||||
lastProtocolState = newState
|
||||
@@ -239,10 +255,8 @@ object ProtocolManager {
|
||||
}
|
||||
if (!syncBatchInProgress) {
|
||||
repository.updateLastSyncTimestamp(messagePacket.timestamp)
|
||||
} else if (messagePacket.timestamp > 0L) {
|
||||
syncBatchMaxProcessedTimestamp.accumulateAndGet(messagePacket.timestamp) { prev, cur ->
|
||||
if (cur > prev) cur else prev
|
||||
}
|
||||
} else {
|
||||
syncBatchMessageCount.incrementAndGet()
|
||||
}
|
||||
// ✅ Send delivery ACK only AFTER message is safely stored in DB.
|
||||
// Skip for own sync messages (no need to ACK yourself).
|
||||
@@ -455,14 +469,9 @@ object ProtocolManager {
|
||||
inboundQueueDrainJob = scope.launch {
|
||||
for (task in inboundTaskChannel) {
|
||||
try {
|
||||
withTimeout(INBOUND_TASK_TIMEOUT_MS) {
|
||||
task()
|
||||
}
|
||||
task()
|
||||
} catch (t: Throwable) {
|
||||
markInboundProcessingFailure(
|
||||
"Dialog queue task failed or timed out (${INBOUND_TASK_TIMEOUT_MS}ms)",
|
||||
t
|
||||
)
|
||||
markInboundProcessingFailure("Dialog queue error", t)
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -551,6 +560,10 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
private fun finishSyncCycle(reason: String) {
|
||||
syncRequestInFlight = false
|
||||
stalledSyncBatchCount.set(0)
|
||||
syncBatchMessageCount.set(0)
|
||||
inboundProcessingFailures.set(0)
|
||||
addLog(reason)
|
||||
setSyncInProgress(false)
|
||||
retryWaitingMessages()
|
||||
@@ -606,16 +619,21 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
private fun requestSynchronize() {
|
||||
// Desktop parity: set syncBatchInProgress=true BEFORE sending the sync request.
|
||||
// This closes the race window between AUTHENTICATED → BATCH_START where real-time
|
||||
// messages could arrive and update lastSync, potentially advancing the cursor past
|
||||
// messages the server hasn't delivered yet.
|
||||
setSyncInProgress(true)
|
||||
if (syncBatchInProgress) {
|
||||
addLog("⚠️ SYNC request skipped: sync already in progress")
|
||||
return
|
||||
}
|
||||
if (syncRequestInFlight) {
|
||||
addLog("⚠️ SYNC request skipped: previous request still in flight")
|
||||
return
|
||||
}
|
||||
stalledSyncBatchCount.set(0)
|
||||
syncRequestInFlight = true
|
||||
addLog("🔄 SYNC requested — fetching last sync timestamp...")
|
||||
scope.launch {
|
||||
val repository = messageRepository
|
||||
if (repository == null || !repository.isInitialized()) {
|
||||
setSyncInProgress(false)
|
||||
syncRequestInFlight = false
|
||||
requireResyncAfterAccountInit("⏳ Sync postponed until account is initialized")
|
||||
return@launch
|
||||
}
|
||||
@@ -626,6 +644,7 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
private fun sendSynchronize(timestamp: Long) {
|
||||
syncRequestInFlight = true
|
||||
val safeTimestamp = normalizeSyncTimestamp(timestamp)
|
||||
val packet = PacketSync().apply {
|
||||
status = SyncStatus.NOT_NEEDED
|
||||
@@ -649,6 +668,7 @@ object ProtocolManager {
|
||||
* has been scheduled on Dispatchers.IO.
|
||||
*/
|
||||
private fun handleSyncPacket(packet: PacketSync) {
|
||||
syncRequestInFlight = false
|
||||
when (packet.status) {
|
||||
SyncStatus.BATCH_START -> {
|
||||
addLog("🔄 SYNC BATCH_START — incoming message batch")
|
||||
@@ -656,76 +676,86 @@ object ProtocolManager {
|
||||
// subsequent 0x06 packets are dispatched by OkHttp's sequential callback.
|
||||
setSyncInProgress(true)
|
||||
inboundProcessingFailures.set(0)
|
||||
syncBatchMaxProcessedTimestamp.set(0L)
|
||||
syncBatchMessageCount.set(0)
|
||||
}
|
||||
SyncStatus.BATCH_END -> {
|
||||
addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})")
|
||||
// BATCH_END requires suspend (whenInboundTasksFinish), so we launch a coroutine.
|
||||
// syncBatchInProgress stays true until NOT_NEEDED arrives.
|
||||
scope.launch {
|
||||
setSyncInProgress(true)
|
||||
val tasksFinished = whenInboundTasksFinish()
|
||||
if (!tasksFinished) {
|
||||
android.util.Log.w(
|
||||
TAG,
|
||||
"SYNC BATCH_END: queue unavailable, skipping cursor update for this step"
|
||||
)
|
||||
return@launch
|
||||
}
|
||||
val failuresInBatch = inboundProcessingFailures.getAndSet(0)
|
||||
if (failuresInBatch > 0) {
|
||||
addLog(
|
||||
"⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior"
|
||||
)
|
||||
}
|
||||
val repository = messageRepository
|
||||
val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L)
|
||||
val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp)
|
||||
val processedMaxTimestamp = normalizeSyncTimestamp(syncBatchMaxProcessedTimestamp.get())
|
||||
val nextCursor =
|
||||
when {
|
||||
safeBatchTimestamp <= 0L -> processedMaxTimestamp
|
||||
processedMaxTimestamp <= 0L -> safeBatchTimestamp
|
||||
processedMaxTimestamp < safeBatchTimestamp -> processedMaxTimestamp
|
||||
else -> safeBatchTimestamp
|
||||
syncBatchEndMutex.withLock {
|
||||
if (!syncBatchInProgress) {
|
||||
addLog("⚠️ SYNC BATCH_END ignored: sync already completed")
|
||||
return@launch
|
||||
}
|
||||
val tasksFinished = whenInboundTasksFinish()
|
||||
if (!tasksFinished) {
|
||||
android.util.Log.w(
|
||||
TAG,
|
||||
"SYNC BATCH_END: queue unavailable, skipping cursor update for this step"
|
||||
)
|
||||
val fallbackCursor = normalizeSyncTimestamp(messageRepository?.getLastSyncTimestamp() ?: 0L)
|
||||
if (syncBatchInProgress) {
|
||||
sendSynchronize(fallbackCursor)
|
||||
}
|
||||
val requestCursor =
|
||||
when {
|
||||
nextCursor > 0L -> nextCursor
|
||||
currentCursor > 0L -> currentCursor
|
||||
else -> 0L
|
||||
return@launch
|
||||
}
|
||||
val failuresInBatch = inboundProcessingFailures.getAndSet(0)
|
||||
if (failuresInBatch > 0) {
|
||||
addLog(
|
||||
"⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior"
|
||||
)
|
||||
}
|
||||
val processedMessagesInBatch = syncBatchMessageCount.getAndSet(0)
|
||||
val repository = messageRepository
|
||||
val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L)
|
||||
val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp)
|
||||
val nextCursor = if (safeBatchTimestamp > 0L) safeBatchTimestamp else currentCursor
|
||||
val requestCursor =
|
||||
when {
|
||||
nextCursor > 0L -> nextCursor
|
||||
currentCursor > 0L -> currentCursor
|
||||
else -> 0L
|
||||
}
|
||||
addLog(
|
||||
"🔄 SYNC cursor calc: current=$currentCursor, server=$safeBatchTimestamp, next=$nextCursor, messages=$processedMessagesInBatch, failures=$failuresInBatch"
|
||||
)
|
||||
|
||||
// If server repeatedly returns an empty/non-advancing batch, allow a few retries
|
||||
// first (to avoid premature stop), then finish to avoid endless "Synchronizing...".
|
||||
val noProgress =
|
||||
failuresInBatch == 0 &&
|
||||
processedMessagesInBatch == 0 &&
|
||||
nextCursor <= currentCursor
|
||||
if (noProgress) {
|
||||
val stalled = stalledSyncBatchCount.incrementAndGet()
|
||||
if (stalled >= MAX_STALLED_SYNC_BATCHES) {
|
||||
finishSyncCycle(
|
||||
"✅ SYNC COMPLETE — stalled on cursor for $stalled batch(es) (server=$safeBatchTimestamp, current=$currentCursor)"
|
||||
)
|
||||
return@launch
|
||||
}
|
||||
addLog(
|
||||
"⚠️ SYNC batch has no progress (#$stalled/$MAX_STALLED_SYNC_BATCHES), retrying with cursor=$requestCursor"
|
||||
)
|
||||
} else {
|
||||
stalledSyncBatchCount.set(0)
|
||||
}
|
||||
|
||||
// Loop guard: if server keeps BATCH_END with unchanged cursor and we did not
|
||||
// process anything in this batch, treat sync as finished to avoid infinite loop.
|
||||
val noProgress =
|
||||
failuresInBatch == 0 &&
|
||||
processedMaxTimestamp <= 0L &&
|
||||
(nextCursor <= 0L || nextCursor == currentCursor)
|
||||
if (noProgress) {
|
||||
finishSyncCycle(
|
||||
"✅ SYNC COMPLETE — no progress on batch (server=$safeBatchTimestamp, current=$currentCursor)"
|
||||
)
|
||||
return@launch
|
||||
if (nextCursor > 0L) {
|
||||
addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch")
|
||||
repository?.updateLastSyncTimestamp(nextCursor)
|
||||
} else {
|
||||
addLog(
|
||||
"⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor"
|
||||
)
|
||||
}
|
||||
if (!syncBatchInProgress) {
|
||||
addLog("⚠️ SYNC next batch skipped: sync already completed")
|
||||
return@launch
|
||||
}
|
||||
sendSynchronize(requestCursor)
|
||||
}
|
||||
|
||||
// If server batch timestamp runs ahead of what we actually processed, clamp it.
|
||||
// This avoids skipping tail messages when packet delivery/parsing was partial.
|
||||
if (processedMaxTimestamp > 0L && processedMaxTimestamp < safeBatchTimestamp) {
|
||||
addLog(
|
||||
"⚠️ SYNC cursor clamped to processed max: server=$safeBatchTimestamp processed=$processedMaxTimestamp"
|
||||
)
|
||||
}
|
||||
|
||||
if (nextCursor > 0L) {
|
||||
addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch")
|
||||
repository?.updateLastSyncTimestamp(nextCursor)
|
||||
} else {
|
||||
addLog(
|
||||
"⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, processed=$processedMaxTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor"
|
||||
)
|
||||
}
|
||||
sendSynchronize(requestCursor)
|
||||
}
|
||||
}
|
||||
SyncStatus.NOT_NEEDED -> {
|
||||
@@ -821,6 +851,7 @@ object ProtocolManager {
|
||||
fun syncOnForeground() {
|
||||
if (!isAuthenticated()) return
|
||||
if (syncBatchInProgress) return
|
||||
if (syncRequestInFlight) return
|
||||
val now = System.currentTimeMillis()
|
||||
if (now - lastForegroundSyncTime < 5_000) return
|
||||
lastForegroundSyncTime = now
|
||||
@@ -838,6 +869,7 @@ object ProtocolManager {
|
||||
return
|
||||
}
|
||||
if (syncBatchInProgress) return
|
||||
if (syncRequestInFlight) return
|
||||
|
||||
scope.launch {
|
||||
val repository = messageRepository
|
||||
@@ -848,7 +880,8 @@ object ProtocolManager {
|
||||
val currentSync = repository.getLastSyncTimestamp()
|
||||
val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L)
|
||||
|
||||
setSyncInProgress(true)
|
||||
stalledSyncBatchCount.set(0)
|
||||
syncRequestInFlight = true
|
||||
addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo")
|
||||
sendSynchronize(rewindTo)
|
||||
}
|
||||
@@ -1096,6 +1129,7 @@ object ProtocolManager {
|
||||
protocol?.clearCredentials()
|
||||
_devices.value = emptyList()
|
||||
_pendingDeviceVerification.value = null
|
||||
syncRequestInFlight = false
|
||||
setSyncInProgress(false)
|
||||
lastSubscribedToken = null // reset so token is re-sent on next connect
|
||||
}
|
||||
@@ -1108,6 +1142,7 @@ object ProtocolManager {
|
||||
protocol = null
|
||||
_devices.value = emptyList()
|
||||
_pendingDeviceVerification.value = null
|
||||
syncRequestInFlight = false
|
||||
setSyncInProgress(false)
|
||||
scope.cancel()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user