feat: Bump version to 1.0.11, add ConnectionLogsScreen, and enhance message synchronization logic

This commit is contained in:
2026-02-27 02:43:30 +05:00
parent 829e19364a
commit 2da2c6ab36
9 changed files with 338 additions and 102 deletions

View File

@@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown"
// ═══════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════
// Rosetta versioning — bump here on each release // Rosetta versioning — bump here on each release
// ═══════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════
val rosettaVersionName = "1.0.10" val rosettaVersionName = "1.0.11"
val rosettaVersionCode = 10 // Increment on each release val rosettaVersionCode = 11 // Increment on each release
android { android {
namespace = "com.rosetta.messenger" namespace = "com.rosetta.messenger"

View File

@@ -41,6 +41,7 @@ import com.rosetta.messenger.ui.auth.AuthFlow
import com.rosetta.messenger.ui.auth.DeviceConfirmScreen import com.rosetta.messenger.ui.auth.DeviceConfirmScreen
import com.rosetta.messenger.ui.chats.ChatDetailScreen import com.rosetta.messenger.ui.chats.ChatDetailScreen
import com.rosetta.messenger.ui.chats.ChatsListScreen import com.rosetta.messenger.ui.chats.ChatsListScreen
import com.rosetta.messenger.ui.chats.ConnectionLogsScreen
import com.rosetta.messenger.ui.chats.RequestsListScreen import com.rosetta.messenger.ui.chats.RequestsListScreen
import com.rosetta.messenger.ui.chats.SearchScreen import com.rosetta.messenger.ui.chats.SearchScreen
import com.rosetta.messenger.ui.components.OptimizedEmojiCache import com.rosetta.messenger.ui.components.OptimizedEmojiCache
@@ -503,6 +504,7 @@ sealed class Screen {
data object Safety : Screen() data object Safety : Screen()
data object Backup : Screen() data object Backup : Screen()
data object Logs : Screen() data object Logs : Screen()
data object ConnectionLogs : Screen()
data object CrashLogs : Screen() data object CrashLogs : Screen()
data object Biometric : Screen() data object Biometric : Screen()
data object Appearance : Screen() data object Appearance : Screen()
@@ -610,6 +612,9 @@ fun MainScreen(
val isSafetyVisible by remember { derivedStateOf { navStack.any { it is Screen.Safety } } } val isSafetyVisible by remember { derivedStateOf { navStack.any { it is Screen.Safety } } }
val isBackupVisible by remember { derivedStateOf { navStack.any { it is Screen.Backup } } } val isBackupVisible by remember { derivedStateOf { navStack.any { it is Screen.Backup } } }
val isLogsVisible by remember { derivedStateOf { navStack.any { it is Screen.Logs } } } val isLogsVisible by remember { derivedStateOf { navStack.any { it is Screen.Logs } } }
val isConnectionLogsVisible by remember {
derivedStateOf { navStack.any { it is Screen.ConnectionLogs } }
}
val isCrashLogsVisible by remember { val isCrashLogsVisible by remember {
derivedStateOf { navStack.any { it is Screen.CrashLogs } } derivedStateOf { navStack.any { it is Screen.CrashLogs } }
} }
@@ -1010,6 +1015,17 @@ fun MainScreen(
) )
} }
SwipeBackContainer(
isVisible = isConnectionLogsVisible,
onBack = { navStack = navStack.filterNot { it is Screen.ConnectionLogs } },
isDarkTheme = isDarkTheme
) {
ConnectionLogsScreen(
isDarkTheme = isDarkTheme,
onBack = { navStack = navStack.filterNot { it is Screen.ConnectionLogs } }
)
}
var isOtherProfileSwipeEnabled by remember { mutableStateOf(true) } var isOtherProfileSwipeEnabled by remember { mutableStateOf(true) }
LaunchedEffect(selectedOtherUser?.publicKey) { LaunchedEffect(selectedOtherUser?.publicKey) {
isOtherProfileSwipeEnabled = true isOtherProfileSwipeEnabled = true

View File

@@ -98,6 +98,7 @@ class MessageRepository private constructor(private val context: Context) {
/** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */ /** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */
private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L
private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h
const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002" const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002"
const val SYSTEM_SAFE_TITLE = "Safe" const val SYSTEM_SAFE_TITLE = "Safe"
@@ -366,16 +367,43 @@ class MessageRepository private constructor(private val context: Context) {
suspend fun getLastSyncTimestamp(): Long { suspend fun getLastSyncTimestamp(): Long {
val account = currentAccount ?: return 0L val account = currentAccount ?: return 0L
return syncTimeDao.getLastSync(account) ?: 0L val stored = syncTimeDao.getLastSync(account) ?: 0L
val normalized = normalizeSyncTimestamp(stored)
if (normalized != stored) {
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized))
if (stored > 0) {
android.util.Log.w(
"MessageRepository",
"⚠️ Normalized invalid last_sync for account=${account.take(10)}...: $stored -> $normalized"
)
ProtocolManager.addLog("⚠️ SYNC cursor normalized: $stored -> $normalized")
}
}
return normalized
} }
suspend fun updateLastSyncTimestamp(timestamp: Long) { suspend fun updateLastSyncTimestamp(timestamp: Long) {
if (timestamp <= 0) return if (timestamp <= 0) return
val account = currentAccount ?: return val account = currentAccount ?: return
val existing = syncTimeDao.getLastSync(account) ?: 0L val normalized = normalizeSyncTimestamp(timestamp)
if (timestamp > existing) { if (normalized <= 0) return
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = timestamp)) // Desktop parity: allow moving sync cursor backward if needed.
syncTimeDao.upsert(AccountSyncTimeEntity(account = account, lastSync = normalized))
}
private fun normalizeSyncTimestamp(rawTimestamp: Long): Long {
if (rawTimestamp <= 0) return 0L
val now = System.currentTimeMillis()
val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS
var normalized = rawTimestamp
// Heal common corruption where extra decimal places appear in timestamp.
while (normalized > maxAllowed) {
normalized /= 10L
if (normalized <= 0L) return 0L
} }
return if (normalized > maxAllowed) 0L else normalized
} }
/** Получить поток сообщений для диалога */ /** Получить поток сообщений для диалога */
@@ -591,21 +619,21 @@ class MessageRepository private constructor(private val context: Context) {
return optimisticMessage return optimisticMessage
} }
/** Обработка входящего сообщения */ /** Обработка входящего сообщения. Возвращает true если пакет обработан безопасно. */
suspend fun handleIncomingMessage(packet: PacketMessage) { suspend fun handleIncomingMessage(packet: PacketMessage): Boolean {
val startTime = System.currentTimeMillis() val startTime = System.currentTimeMillis()
val account = val account =
currentAccount currentAccount
?: run { ?: run {
MessageLogger.debug("📥 RECEIVE SKIP: account is null") MessageLogger.debug("📥 RECEIVE SKIP: account is null")
return return false
} }
val privateKey = val privateKey =
currentPrivateKey currentPrivateKey
?: run { ?: run {
MessageLogger.debug("📥 RECEIVE SKIP: privateKey is null") MessageLogger.debug("📥 RECEIVE SKIP: privateKey is null")
return return false
} }
// 📝 LOG: Начало обработки входящего сообщения // 📝 LOG: Начало обработки входящего сообщения
@@ -625,7 +653,7 @@ class MessageRepository private constructor(private val context: Context) {
val isBlocked = database.blacklistDao().isUserBlocked(packet.fromPublicKey, account) val isBlocked = database.blacklistDao().isUserBlocked(packet.fromPublicKey, account)
if (isBlocked) { if (isBlocked) {
MessageLogger.logBlockedSender(packet.fromPublicKey) MessageLogger.logBlockedSender(packet.fromPublicKey)
return return true
} }
} }
@@ -650,14 +678,14 @@ class MessageRepository private constructor(private val context: Context) {
MessageLogger.debug( MessageLogger.debug(
"📥 SKIP (in-memory cache): Message $messageId already being processed" "📥 SKIP (in-memory cache): Message $messageId already being processed"
) )
return return true
} }
// 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях) // 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях)
val isDuplicate = messageDao.messageExists(account, messageId) val isDuplicate = messageDao.messageExists(account, messageId)
MessageLogger.logDuplicateCheck(messageId, isDuplicate) MessageLogger.logDuplicateCheck(messageId, isDuplicate)
if (isDuplicate) { if (isDuplicate) {
return return true
} }
val dialogOpponentKey = if (isOwnMessage) packet.toPublicKey else packet.fromPublicKey val dialogOpponentKey = if (isOwnMessage) packet.toPublicKey else packet.fromPublicKey
@@ -794,6 +822,7 @@ class MessageRepository private constructor(private val context: Context) {
// 📝 LOG: Успешная обработка // 📝 LOG: Успешная обработка
MessageLogger.logReceiveSuccess(messageId, System.currentTimeMillis() - startTime) MessageLogger.logReceiveSuccess(messageId, System.currentTimeMillis() - startTime)
return true
} catch (e: Exception) { } catch (e: Exception) {
// 📝 LOG: Ошибка обработки // 📝 LOG: Ошибка обработки
MessageLogger.logDecryptionError(messageId, e) MessageLogger.logDecryptionError(messageId, e)
@@ -803,6 +832,7 @@ class MessageRepository private constructor(private val context: Context) {
// Разрешаем повторную обработку через re-sync, если пакет не удалось сохранить. // Разрешаем повторную обработку через re-sync, если пакет не удалось сохранить.
processedMessageIds.remove(messageId) processedMessageIds.remove(messageId)
e.printStackTrace() e.printStackTrace()
return false
} }
} }

View File

@@ -17,15 +17,15 @@ object ReleaseNotes {
val RELEASE_NOTICE = """ val RELEASE_NOTICE = """
Update v$VERSION_PLACEHOLDER Update v$VERSION_PLACEHOLDER
Верификация аккаунта Синхронизация сообщений
- Бейдж верификации отображается в боковом меню рядом с именем - Исправлен недолет сообщений после оффлайна при массовой отправке (спам-тест)
- Бейдж верификации отображается в экране профиля - Исправлен сценарий, когда синхронизация останавливалась на первой пачке
- Статус загружается из кэша пользователей при старте - Нормализуется sync-cursor (last_sync), включая поврежденные timestamp
- Следующий sync-запрос отправляется с безопасным timestamp
Стабильность Стабильность протокола
- Фильтрация неподдерживаемых пакетов (группы, conversations) - Улучшена защита чтения строк из бинарного потока
- Добавлена фильтрация delivery-пакетов для неподдерживаемых диалогов - Ошибки внутри батча больше не клинят дальнейшую догрузку пакетов
- Улучшена обработка ошибок в очереди входящих пакетов
""".trimIndent() """.trimIndent()
fun getNotice(version: String): String = fun getNotice(version: String): String =

View File

@@ -32,6 +32,7 @@ class Protocol(
private const val TAG = "RosettaProtocol" private const val TAG = "RosettaProtocol"
private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве) private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве)
private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds
private const val MIN_PACKET_ID_BITS = 18 // Stream.readInt16() = 2 * readInt8() (9 bits each)
} }
private fun log(message: String) { private fun log(message: String) {
@@ -493,32 +494,54 @@ class Protocol(
// Debug: log first 50 bytes as hex // Debug: log first 50 bytes as hex
val hexDump = data.take(50).joinToString(" ") { String.format("%02X", it.toInt() and 0xFF) } val hexDump = data.take(50).joinToString(" ") { String.format("%02X", it.toInt() and 0xFF) }
log("📥 Received ${data.size} bytes: $hexDump${if (data.size > 50) "..." else ""}") log("📥 Received ${data.size} bytes: $hexDump${if (data.size > 50) "..." else ""}")
val stream = Stream(data) val stream = Stream(data)
val packetId = stream.readInt16() var parsedPackets = 0
log("📥 Packet ID: $packetId") while (stream.getRemainingBits() >= MIN_PACKET_ID_BITS) {
val packetStartBits = stream.getReadPointerBits()
val packetFactory = supportedPackets[packetId] val packetId = stream.readInt16()
if (packetFactory == null) {
log("⚠️ Unknown packet ID: $packetId") log("📥 Packet ID: $packetId")
return
} val packetFactory = supportedPackets[packetId]
if (packetFactory == null) {
val packet = packetFactory() log("⚠️ Unknown packet ID: $packetId, stopping frame parse")
packet.receive(stream) break
// Notify waiters
val waitersCount = packetWaiters[packetId]?.size ?: 0
log("📥 Notifying $waitersCount waiter(s) for packet $packetId")
packetWaiters[packetId]?.forEach { callback ->
try {
callback(packet)
} catch (e: Exception) {
log("❌ Error in packet handler: ${e.message}")
e.printStackTrace()
} }
val packet = packetFactory()
try {
packet.receive(stream)
} catch (e: Exception) {
log("❌ Error parsing packet $packetId: ${e.message}")
e.printStackTrace()
break
}
// Notify waiters
val waitersCount = packetWaiters[packetId]?.size ?: 0
log("📥 Notifying $waitersCount waiter(s) for packet $packetId")
packetWaiters[packetId]?.forEach { callback ->
try {
callback(packet)
} catch (e: Exception) {
log("❌ Error in packet handler: ${e.message}")
e.printStackTrace()
}
}
parsedPackets++
val consumedBits = stream.getReadPointerBits() - packetStartBits
if (consumedBits <= 0) {
log("⚠️ Packet parser made no progress for packet $packetId, stopping frame parse")
break
}
}
if (parsedPackets > 1) {
log("📦 Parsed $parsedPackets packets from single WebSocket frame")
} }
} catch (e: Exception) { } catch (e: Exception) {
log("❌ Error parsing packet: ${e.message}") log("❌ Error parsing packet: ${e.message}")

View File

@@ -13,6 +13,9 @@ import kotlinx.coroutines.flow.asStateFlow
import java.security.SecureRandom import java.security.SecureRandom
import java.util.* import java.util.*
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicLong
import kotlin.coroutines.resume import kotlin.coroutines.resume
/** /**
@@ -21,6 +24,11 @@ import kotlin.coroutines.resume
*/ */
object ProtocolManager { object ProtocolManager {
private const val TAG = "ProtocolManager" private const val TAG = "ProtocolManager"
private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L
private const val MAX_SYNC_FUTURE_DRIFT_MS = 86_400_000L // 24h
private const val MAX_DEBUG_LOGS = 600
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
private const val INBOUND_TASK_TIMEOUT_MS = 20_000L
// Server address - same as React Native version // Server address - same as React Native version
private const val SERVER_ADDRESS = "ws://46.28.71.12:3000" private const val SERVER_ADDRESS = "ws://46.28.71.12:3000"
@@ -37,10 +45,12 @@ object ProtocolManager {
@Volatile @Volatile
private var lastSubscribedToken: String? = null private var lastSubscribedToken: String? = null
// Debug logs for dev console - 🚀 ОТКЛЮЧЕНО для производительности
// Логи только в Logcat, не в StateFlow (это вызывало ANR!)
private val _debugLogs = MutableStateFlow<List<String>>(emptyList()) private val _debugLogs = MutableStateFlow<List<String>>(emptyList())
val debugLogs: StateFlow<List<String>> = _debugLogs.asStateFlow() val debugLogs: StateFlow<List<String>> = _debugLogs.asStateFlow()
private val debugLogsBuffer = ArrayDeque<String>(MAX_DEBUG_LOGS)
private val debugLogsLock = Any()
@Volatile private var debugFlushJob: Job? = null
private val debugFlushPending = AtomicBoolean(false)
// Typing status // Typing status
private val _typingUsers = MutableStateFlow<Set<String>>(emptySet()) private val _typingUsers = MutableStateFlow<Set<String>>(emptySet())
@@ -63,8 +73,8 @@ object ProtocolManager {
// Pending resolves: publicKey → list of continuations waiting for the result // Pending resolves: publicKey → list of continuations waiting for the result
private val pendingResolves = ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<SearchUser?>>>() private val pendingResolves = ConcurrentHashMap<String, MutableList<kotlinx.coroutines.CancellableContinuation<SearchUser?>>>()
// 🚀 Флаг для включения UI логов (по умолчанию ВЫКЛЮЧЕНО - это вызывало ANR!) // UI logs are enabled by default; updates are throttled and bounded by MAX_DEBUG_LOGS.
private var uiLogsEnabled = false private var uiLogsEnabled = true
private var lastProtocolState: ProtocolState? = null private var lastProtocolState: ProtocolState? = null
@Volatile private var syncBatchInProgress = false @Volatile private var syncBatchInProgress = false
private val _syncInProgress = MutableStateFlow(false) private val _syncInProgress = MutableStateFlow(false)
@@ -79,6 +89,8 @@ object ProtocolManager {
private val inboundTaskChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED) private val inboundTaskChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
// Tracks the tail of the sequential processing chain (like desktop's `tail` promise) // Tracks the tail of the sequential processing chain (like desktop's `tail` promise)
@Volatile private var inboundQueueDrainJob: Job? = null @Volatile private var inboundQueueDrainJob: Job? = null
private val inboundProcessingFailures = AtomicInteger(0)
private val syncBatchMaxProcessedTimestamp = AtomicLong(0L)
private fun setSyncInProgress(value: Boolean) { private fun setSyncInProgress(value: Boolean) {
syncBatchInProgress = value syncBatchInProgress = value
@@ -87,18 +99,60 @@ object ProtocolManager {
} }
} }
fun addLog(@Suppress("UNUSED_PARAMETER") message: String) { fun addLog(message: String) {
// Disabled by request: keep debug log buffer empty. if (!uiLogsEnabled) return
return val timestamp =
java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date())
val line = "[$timestamp] $message"
synchronized(debugLogsLock) {
if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) {
debugLogsBuffer.removeFirst()
}
debugLogsBuffer.addLast(line)
}
flushDebugLogsThrottled()
} }
fun enableUILogs(enabled: Boolean) { fun enableUILogs(enabled: Boolean) {
uiLogsEnabled = enabled uiLogsEnabled = enabled
if (enabled) {
val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() }
_debugLogs.value = snapshot
} else {
_debugLogs.value = emptyList()
}
} }
fun clearLogs() { fun clearLogs() {
synchronized(debugLogsLock) {
debugLogsBuffer.clear()
}
_debugLogs.value = emptyList() _debugLogs.value = emptyList()
} }
private fun flushDebugLogsThrottled() {
debugFlushPending.set(true)
if (debugFlushJob?.isActive == true) return
debugFlushJob =
scope.launch {
while (debugFlushPending.getAndSet(false)) {
delay(DEBUG_LOG_FLUSH_DELAY_MS)
val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() }
_debugLogs.value = snapshot
}
}
}
private fun markInboundProcessingFailure(reason: String, error: Throwable? = null) {
inboundProcessingFailures.incrementAndGet()
if (error != null) {
android.util.Log.e(TAG, reason, error)
addLog("$reason: ${error.message ?: error.javaClass.simpleName}")
} else {
android.util.Log.w(TAG, reason)
addLog("⚠️ $reason")
}
}
/** /**
* Инициализация с контекстом для доступа к MessageRepository * Инициализация с контекстом для доступа к MessageRepository
@@ -160,11 +214,13 @@ object ProtocolManager {
val repository = messageRepository val repository = messageRepository
if (repository == null || !repository.isInitialized()) { if (repository == null || !repository.isInitialized()) {
requireResyncAfterAccountInit("⏳ Incoming message before account init, scheduling re-sync") requireResyncAfterAccountInit("⏳ Incoming message before account init, scheduling re-sync")
markInboundProcessingFailure("Incoming packet skipped before account init")
return@launchInboundPacketTask return@launchInboundPacketTask
} }
val ownKey = getProtocol().getPublicKey().orEmpty() val ownKey = getProtocol().getPublicKey().orEmpty()
if (ownKey.isBlank()) { if (ownKey.isBlank()) {
requireResyncAfterAccountInit("⏳ Incoming message before protocol account init, scheduling re-sync") requireResyncAfterAccountInit("⏳ Incoming message before protocol account init, scheduling re-sync")
markInboundProcessingFailure("Incoming packet skipped before protocol account init")
return@launchInboundPacketTask return@launchInboundPacketTask
} }
if (!isSupportedDirectMessagePacket(messagePacket, ownKey)) { if (!isSupportedDirectMessagePacket(messagePacket, ownKey)) {
@@ -174,22 +230,28 @@ object ProtocolManager {
) )
return@launchInboundPacketTask return@launchInboundPacketTask
} }
try { val processed = repository.handleIncomingMessage(messagePacket)
repository.handleIncomingMessage(messagePacket) if (!processed) {
if (!syncBatchInProgress) { markInboundProcessingFailure(
repository.updateLastSyncTimestamp(messagePacket.timestamp) "Message processing failed for ${messagePacket.messageId.take(8)}"
)
return@launchInboundPacketTask
}
if (!syncBatchInProgress) {
repository.updateLastSyncTimestamp(messagePacket.timestamp)
} else if (messagePacket.timestamp > 0L) {
syncBatchMaxProcessedTimestamp.accumulateAndGet(messagePacket.timestamp) { prev, cur ->
if (cur > prev) cur else prev
} }
// ✅ Send delivery ACK only AFTER message is safely stored in DB. }
// Skip for own sync messages (no need to ACK yourself). // ✅ Send delivery ACK only AFTER message is safely stored in DB.
if (messagePacket.fromPublicKey != ownKey) { // Skip for own sync messages (no need to ACK yourself).
val deliveryPacket = PacketDelivery().apply { if (messagePacket.fromPublicKey != ownKey) {
messageId = messagePacket.messageId val deliveryPacket = PacketDelivery().apply {
toPublicKey = messagePacket.fromPublicKey messageId = messagePacket.messageId
} toPublicKey = messagePacket.fromPublicKey
send(deliveryPacket)
} }
} catch (e: Exception) { send(deliveryPacket)
android.util.Log.e(TAG, "❌ Message processing failed: ${messagePacket.messageId.take(8)}, err=${e.message}")
} }
} }
} }
@@ -203,6 +265,7 @@ object ProtocolManager {
val repository = messageRepository val repository = messageRepository
if (repository == null || !repository.isInitialized()) { if (repository == null || !repository.isInitialized()) {
requireResyncAfterAccountInit("⏳ Delivery status before account init, scheduling re-sync") requireResyncAfterAccountInit("⏳ Delivery status before account init, scheduling re-sync")
markInboundProcessingFailure("Delivery packet skipped before account init")
return@launchInboundPacketTask return@launchInboundPacketTask
} }
if (isUnsupportedDialogKey(deliveryPacket.toPublicKey)) { if (isUnsupportedDialogKey(deliveryPacket.toPublicKey)) {
@@ -212,7 +275,12 @@ object ProtocolManager {
) )
return@launchInboundPacketTask return@launchInboundPacketTask
} }
repository.handleDelivery(deliveryPacket) try {
repository.handleDelivery(deliveryPacket)
} catch (e: Exception) {
markInboundProcessingFailure("Delivery processing failed", e)
return@launchInboundPacketTask
}
if (!syncBatchInProgress) { if (!syncBatchInProgress) {
repository.updateLastSyncTimestamp(System.currentTimeMillis()) repository.updateLastSyncTimestamp(System.currentTimeMillis())
} }
@@ -228,11 +296,13 @@ object ProtocolManager {
val repository = messageRepository val repository = messageRepository
if (repository == null || !repository.isInitialized()) { if (repository == null || !repository.isInitialized()) {
requireResyncAfterAccountInit("⏳ Read status before account init, scheduling re-sync") requireResyncAfterAccountInit("⏳ Read status before account init, scheduling re-sync")
markInboundProcessingFailure("Read packet skipped before account init")
return@launchInboundPacketTask return@launchInboundPacketTask
} }
val ownKey = getProtocol().getPublicKey().orEmpty() val ownKey = getProtocol().getPublicKey().orEmpty()
if (ownKey.isBlank()) { if (ownKey.isBlank()) {
requireResyncAfterAccountInit("⏳ Read status before protocol account init, scheduling re-sync") requireResyncAfterAccountInit("⏳ Read status before protocol account init, scheduling re-sync")
markInboundProcessingFailure("Read packet skipped before protocol account init")
return@launchInboundPacketTask return@launchInboundPacketTask
} }
if (!isSupportedDirectReadPacket(readPacket, ownKey)) { if (!isSupportedDirectReadPacket(readPacket, ownKey)) {
@@ -242,7 +312,12 @@ object ProtocolManager {
) )
return@launchInboundPacketTask return@launchInboundPacketTask
} }
repository.handleRead(readPacket) try {
repository.handleRead(readPacket)
} catch (e: Exception) {
markInboundProcessingFailure("Read processing failed", e)
return@launchInboundPacketTask
}
if (!syncBatchInProgress) { if (!syncBatchInProgress) {
repository.updateLastSyncTimestamp(System.currentTimeMillis()) repository.updateLastSyncTimestamp(System.currentTimeMillis())
} }
@@ -380,9 +455,14 @@ object ProtocolManager {
inboundQueueDrainJob = scope.launch { inboundQueueDrainJob = scope.launch {
for (task in inboundTaskChannel) { for (task in inboundTaskChannel) {
try { try {
task() withTimeout(INBOUND_TASK_TIMEOUT_MS) {
task()
}
} catch (t: Throwable) { } catch (t: Throwable) {
android.util.Log.e(TAG, "Dialog queue error", t) markInboundProcessingFailure(
"Dialog queue task failed or timed out (${INBOUND_TASK_TIMEOUT_MS}ms)",
t
)
} }
} }
} }
@@ -392,7 +472,10 @@ object ProtocolManager {
ensureInboundQueueDrainRunning() ensureInboundQueueDrainRunning()
val result = inboundTaskChannel.trySend(block) val result = inboundTaskChannel.trySend(block)
if (result.isFailure) { if (result.isFailure) {
android.util.Log.e(TAG, "Failed to enqueue inbound task", result.exceptionOrNull()) markInboundProcessingFailure(
"Failed to enqueue inbound task",
result.exceptionOrNull()
)
return false return false
} }
return true return true
@@ -536,9 +619,13 @@ object ProtocolManager {
} }
private fun sendSynchronize(timestamp: Long) { private fun sendSynchronize(timestamp: Long) {
val safeTimestamp = normalizeSyncTimestamp(timestamp)
val packet = PacketSync().apply { val packet = PacketSync().apply {
status = SyncStatus.NOT_NEEDED status = SyncStatus.NOT_NEEDED
this.timestamp = timestamp this.timestamp = safeTimestamp
}
if (safeTimestamp != timestamp) {
addLog("⚠️ SYNC request timestamp normalized: $timestamp -> $safeTimestamp")
} }
send(packet) send(packet)
} }
@@ -561,6 +648,8 @@ object ProtocolManager {
// Synchronous — guarantees syncBatchInProgress=true before any // Synchronous — guarantees syncBatchInProgress=true before any
// subsequent 0x06 packets are dispatched by OkHttp's sequential callback. // subsequent 0x06 packets are dispatched by OkHttp's sequential callback.
setSyncInProgress(true) setSyncInProgress(true)
inboundProcessingFailures.set(0)
syncBatchMaxProcessedTimestamp.set(0L)
} }
SyncStatus.BATCH_END -> { SyncStatus.BATCH_END -> {
addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})") addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})")
@@ -572,20 +661,51 @@ object ProtocolManager {
if (!tasksFinished) { if (!tasksFinished) {
android.util.Log.w( android.util.Log.w(
TAG, TAG,
"SYNC BATCH_END: queue unavailable, requesting re-sync without advancing cursor" "SYNC BATCH_END: queue unavailable, skipping cursor update for this step"
) )
val fallbackTimestamp = try {
messageRepository?.getLastSyncTimestamp() ?: packet.timestamp
} catch (e: Exception) {
android.util.Log.e(TAG, "Failed to read last sync timestamp for fallback", e)
packet.timestamp
}
sendSynchronize(fallbackTimestamp)
return@launch return@launch
} }
addLog("🔄 SYNC tasks done — saving timestamp ${packet.timestamp}, requesting next batch") val failuresInBatch = inboundProcessingFailures.getAndSet(0)
messageRepository?.updateLastSyncTimestamp(packet.timestamp) if (failuresInBatch > 0) {
sendSynchronize(packet.timestamp) addLog(
"⚠️ SYNC batch had $failuresInBatch processing error(s), continuing with desktop cursor behavior"
)
}
val repository = messageRepository
val currentCursor = normalizeSyncTimestamp(repository?.getLastSyncTimestamp() ?: 0L)
val safeBatchTimestamp = normalizeSyncTimestamp(packet.timestamp)
val processedMaxTimestamp = normalizeSyncTimestamp(syncBatchMaxProcessedTimestamp.get())
val nextCursor =
when {
safeBatchTimestamp <= 0L -> processedMaxTimestamp
processedMaxTimestamp <= 0L -> safeBatchTimestamp
processedMaxTimestamp < safeBatchTimestamp -> processedMaxTimestamp
else -> safeBatchTimestamp
}
val requestCursor =
when {
nextCursor > 0L -> nextCursor
currentCursor > 0L -> currentCursor
else -> 0L
}
// If server batch timestamp runs ahead of what we actually processed, clamp it.
// This avoids skipping tail messages when packet delivery/parsing was partial.
if (processedMaxTimestamp > 0L && processedMaxTimestamp < safeBatchTimestamp) {
addLog(
"⚠️ SYNC cursor clamped to processed max: server=$safeBatchTimestamp processed=$processedMaxTimestamp"
)
}
if (nextCursor > 0L) {
addLog("🔄 SYNC tasks done — saving timestamp $nextCursor, requesting next batch")
repository?.updateLastSyncTimestamp(nextCursor)
} else {
addLog(
"⚠️ SYNC batch cursor unresolved (server=$safeBatchTimestamp, processed=$processedMaxTimestamp, current=$currentCursor), requesting next batch with cursor=$requestCursor"
)
}
sendSynchronize(requestCursor)
} }
} }
SyncStatus.NOT_NEEDED -> { SyncStatus.NOT_NEEDED -> {
@@ -694,6 +814,51 @@ object ProtocolManager {
addLog("🔄 SYNC on foreground resume") addLog("🔄 SYNC on foreground resume")
requestSynchronize() requestSynchronize()
} }
/**
* Manual sync trigger from UI.
* Rewinds lastSync a bit to safely re-fetch recent packets and re-starts sync.
*/
fun forceSynchronize(backtrackMs: Long = MANUAL_SYNC_BACKTRACK_MS) {
if (!isAuthenticated()) {
reconnectNowIfNeeded("manual_sync_button")
return
}
if (syncBatchInProgress) return
scope.launch {
val repository = messageRepository
if (repository == null || !repository.isInitialized()) {
requireResyncAfterAccountInit("⏳ Manual sync postponed until account is initialized")
return@launch
}
val currentSync = repository.getLastSyncTimestamp()
val rewindTo = (currentSync - backtrackMs.coerceAtLeast(0L)).coerceAtLeast(0L)
setSyncInProgress(true)
addLog("🔄 MANUAL SYNC requested: lastSync=$currentSync -> rewind=$rewindTo")
sendSynchronize(rewindTo)
}
}
/**
* Defensive normalization for sync cursor/timestamps.
* Some malformed values arrive with extra decimal digits; heal by scaling down.
*/
private fun normalizeSyncTimestamp(rawTimestamp: Long): Long {
if (rawTimestamp <= 0L) return 0L
val now = System.currentTimeMillis()
val maxAllowed = now + MAX_SYNC_FUTURE_DRIFT_MS
var normalized = rawTimestamp
// Try to recover values with accidental decimal scaling (x10, x100, ...).
while (normalized > maxAllowed) {
normalized /= 10L
if (normalized <= 0L) return 0L
}
return normalized
}
/** /**
* Authenticate with server * Authenticate with server

View File

@@ -16,6 +16,14 @@ class Stream(stream: ByteArray = ByteArray(0)) {
fun getStream(): ByteArray { fun getStream(): ByteArray {
return _stream.map { it.toByte() }.toByteArray() return _stream.map { it.toByte() }.toByteArray()
} }
fun getReadPointerBits(): Int = _readPointer
fun getTotalBits(): Int = _stream.size * 8
fun getRemainingBits(): Int = getTotalBits() - _readPointer
fun hasRemainingBits(): Boolean = _readPointer < getTotalBits()
fun setStream(stream: ByteArray) { fun setStream(stream: ByteArray) {
_stream = stream.map { it.toInt() and 0xFF }.toMutableList() _stream = stream.map { it.toInt() and 0xFF }.toMutableList()
@@ -115,6 +123,15 @@ class Stream(stream: ByteArray = ByteArray(0)) {
fun readString(): String { fun readString(): String {
val length = readInt32() val length = readInt32()
// Desktop parity + safety: don't trust malformed string length.
val bytesAvailable = _stream.size - (_readPointer shr 3)
if (length < 0 || (length.toLong() * 2L) > bytesAvailable.toLong()) {
android.util.Log.w(
"RosettaStream",
"readString invalid length=$length, bytesAvailable=$bytesAvailable, readPointer=$_readPointer"
)
return ""
}
val sb = StringBuilder() val sb = StringBuilder()
for (i in 0 until length) { for (i in 0 until length) {
sb.append(readInt16().toChar()) sb.append(readInt16().toChar())

View File

@@ -1149,21 +1149,6 @@ fun ChatsListScreen(
} }
) )
// 🔄 Sync logs
DrawerMenuItemEnhanced(
painter = painterResource(id = R.drawable.files_document),
text = "Sync Logs",
iconColor = menuIconColor,
textColor = menuTextColor,
onClick = {
scope.launch {
drawerState.close()
kotlinx.coroutines.delay(100)
showSduLogs = true
}
}
)
} }
// ═══════════════════════════════════════════════════════════ // ═══════════════════════════════════════════════════════════

View File

@@ -3,7 +3,7 @@ package com.rosetta.messenger.ui.chats
import androidx.compose.foundation.background import androidx.compose.foundation.background
import androidx.compose.foundation.layout.* import androidx.compose.foundation.layout.*
import androidx.compose.foundation.lazy.LazyColumn import androidx.compose.foundation.lazy.LazyColumn
import androidx.compose.foundation.lazy.items import androidx.compose.foundation.lazy.itemsIndexed
import androidx.compose.foundation.lazy.rememberLazyListState import androidx.compose.foundation.lazy.rememberLazyListState
import androidx.compose.foundation.shape.RoundedCornerShape import androidx.compose.foundation.shape.RoundedCornerShape
import androidx.compose.material3.* import androidx.compose.material3.*
@@ -180,7 +180,7 @@ fun ConnectionLogsScreen(
.padding(horizontal = 8.dp, vertical = 4.dp), .padding(horizontal = 8.dp, vertical = 4.dp),
verticalArrangement = Arrangement.spacedBy(2.dp) verticalArrangement = Arrangement.spacedBy(2.dp)
) { ) {
items(logs, key = { it.hashCode().toString() + logs.indexOf(it) }) { log -> itemsIndexed(logs, key = { index, _ -> index }) { _, log ->
val logColor = when { val logColor = when {
"" in log || "FAILED" in log || "Error" in log || "error" in log -> Color(0xFFEF5350) "" in log || "FAILED" in log || "Error" in log || "error" in log -> Color(0xFFEF5350)
"" in log || "COMPLETE" in log || "SUCCESS" in log -> Color(0xFF4CAF50) "" in log || "COMPLETE" in log || "SUCCESS" in log -> Color(0xFF4CAF50)