feat: Bump version to 1.0.7, enhance message delivery handling, and add connection logs screen
This commit is contained in:
@@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown"
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
// Rosetta versioning — bump here on each release
|
||||
// ═══════════════════════════════════════════════════════════
|
||||
val rosettaVersionName = "1.0.6"
|
||||
val rosettaVersionCode = 6 // Increment on each release
|
||||
val rosettaVersionName = "1.0.7"
|
||||
val rosettaVersionCode = 7 // Increment on each release
|
||||
|
||||
android {
|
||||
namespace = "com.rosetta.messenger"
|
||||
|
||||
@@ -96,6 +96,9 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
companion object {
|
||||
@Volatile private var INSTANCE: MessageRepository? = null
|
||||
|
||||
/** Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80 (seconds) */
|
||||
private const val MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000L
|
||||
|
||||
const val SYSTEM_SAFE_PUBLIC_KEY = "0x000000000000000000000000000000000000000002"
|
||||
const val SYSTEM_SAFE_TITLE = "Safe"
|
||||
const val SYSTEM_SAFE_USERNAME = "safe"
|
||||
@@ -773,6 +776,9 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
)
|
||||
|
||||
// 🔥 Запрашиваем информацию о пользователе для отображения имени вместо ключа
|
||||
// Desktop parity: always re-fetch on incoming message so renamed contacts
|
||||
// get their new name/username updated in the chat list.
|
||||
requestedUserInfoKeys.remove(dialogOpponentKey)
|
||||
requestUserInfo(dialogOpponentKey)
|
||||
|
||||
// Обновляем кэш только если сообщение новое
|
||||
@@ -810,11 +816,22 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
status = "DELIVERED"
|
||||
)
|
||||
|
||||
messageDao.updateDeliveryStatus(account, packet.messageId, DeliveryStatus.DELIVERED.value)
|
||||
// Desktop parity: update both delivery status AND timestamp on delivery confirmation.
|
||||
// Desktop sets timestamp = Date.now() when PacketDelivery arrives (useSynchronize.ts).
|
||||
val deliveryTimestamp = System.currentTimeMillis()
|
||||
messageDao.updateDeliveryStatusAndTimestamp(
|
||||
account, packet.messageId, DeliveryStatus.DELIVERED.value, deliveryTimestamp
|
||||
)
|
||||
|
||||
// Обновляем кэш
|
||||
// Обновляем кэш (status + timestamp)
|
||||
val dialogKey = getDialogKey(packet.toPublicKey)
|
||||
updateMessageStatus(dialogKey, packet.messageId, DeliveryStatus.DELIVERED)
|
||||
messageCache[dialogKey]?.let { flow ->
|
||||
flow.value = flow.value.map { msg ->
|
||||
if (msg.messageId == packet.messageId)
|
||||
msg.copy(deliveryStatus = DeliveryStatus.DELIVERED, timestamp = deliveryTimestamp)
|
||||
else msg
|
||||
}
|
||||
}
|
||||
|
||||
// 🔔 Уведомляем UI о смене статуса доставки
|
||||
_deliveryStatusEvents.tryEmit(
|
||||
@@ -950,6 +967,86 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
}
|
||||
}
|
||||
|
||||
// ===============================
|
||||
// Retry WAITING messages on reconnect
|
||||
// ===============================
|
||||
|
||||
/**
|
||||
* Desktop parity: resend messages stuck in WAITING status.
|
||||
*
|
||||
* On Android, if the app is killed while a message is being sent, it stays in DB
|
||||
* with delivered=WAITING forever. Desktop has _packetQueue (in-memory) but desktop
|
||||
* apps are rarely force-killed. On Android this is critical.
|
||||
*
|
||||
* Messages older than MESSAGE_MAX_TIME_TO_DELIVERED_MS are marked as ERROR instead
|
||||
* of being retried (desktop: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80s).
|
||||
*/
|
||||
suspend fun retryWaitingMessages() {
|
||||
val account = currentAccount ?: return
|
||||
val privateKey = currentPrivateKey ?: return
|
||||
val now = System.currentTimeMillis()
|
||||
|
||||
// Mark expired messages as ERROR (older than 80 seconds)
|
||||
val expiredCount = messageDao.markExpiredWaitingAsError(account, now - MESSAGE_MAX_TIME_TO_DELIVERED_MS)
|
||||
if (expiredCount > 0) {
|
||||
android.util.Log.w("MessageRepository", "⚠️ Marked $expiredCount expired WAITING messages as ERROR")
|
||||
}
|
||||
|
||||
// Get remaining WAITING messages (younger than 80s)
|
||||
val waitingMessages = messageDao.getWaitingMessages(account, now - MESSAGE_MAX_TIME_TO_DELIVERED_MS)
|
||||
if (waitingMessages.isEmpty()) return
|
||||
|
||||
android.util.Log.i("MessageRepository", "🔄 Retrying ${waitingMessages.size} WAITING messages")
|
||||
|
||||
for (entity in waitingMessages) {
|
||||
// Skip saved messages (should not happen, but guard)
|
||||
if (entity.fromPublicKey == entity.toPublicKey) continue
|
||||
|
||||
try {
|
||||
// The message is already saved in DB with encrypted content and chachaKey.
|
||||
// We can re-send the PacketMessage directly using stored fields.
|
||||
val aesChachaKeyValue = if (entity.chachaKey.startsWith("sync:")) {
|
||||
entity.chachaKey.removePrefix("sync:")
|
||||
} else {
|
||||
// Re-generate aesChachaKey from the stored chachaKey + privateKey.
|
||||
// The chachaKey in DB is the ECC-encrypted key for the recipient.
|
||||
// We need to decrypt it, then re-encrypt with our private key for self-sync.
|
||||
try {
|
||||
val plainKeyAndNonce = MessageCrypto.decryptKeyFromSender(entity.chachaKey, privateKey)
|
||||
CryptoManager.encryptWithPassword(
|
||||
String(plainKeyAndNonce, Charsets.ISO_8859_1),
|
||||
privateKey
|
||||
)
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.w("MessageRepository", "⚠️ Cannot regenerate aesChachaKey for ${entity.messageId.take(8)}, sending without it")
|
||||
""
|
||||
}
|
||||
}
|
||||
|
||||
val packet = PacketMessage().apply {
|
||||
this.fromPublicKey = account
|
||||
this.toPublicKey = entity.toPublicKey
|
||||
this.content = entity.content
|
||||
this.chachaKey = if (entity.chachaKey.startsWith("sync:")) "" else entity.chachaKey
|
||||
this.aesChachaKey = aesChachaKeyValue
|
||||
this.timestamp = entity.timestamp
|
||||
this.privateKey = CryptoManager.generatePrivateKeyHash(privateKey)
|
||||
this.messageId = entity.messageId
|
||||
this.attachments = emptyList() // Attachments already uploaded, tags are in content
|
||||
}
|
||||
|
||||
ProtocolManager.send(packet)
|
||||
android.util.Log.d("MessageRepository", "🔄 Resent WAITING message: ${entity.messageId.take(8)}")
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.e("MessageRepository", "❌ Failed to retry message ${entity.messageId.take(8)}: ${e.message}")
|
||||
// Mark as ERROR if retry fails
|
||||
messageDao.updateDeliveryStatus(account, entity.messageId, DeliveryStatus.ERROR.value)
|
||||
val dialogKey = getDialogKey(entity.toPublicKey)
|
||||
updateMessageStatus(dialogKey, entity.messageId, DeliveryStatus.ERROR)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ===============================
|
||||
// Private helpers
|
||||
// ===============================
|
||||
@@ -1100,6 +1197,53 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
messageCache.remove(dialogKey)
|
||||
}
|
||||
|
||||
/**
|
||||
* Desktop parity: clear the one-shot guard so that names can be re-requested
|
||||
* after reconnect / sync. Desktop's useUserInformation re-fires on every render;
|
||||
* on Android we clear the guard after each sync cycle instead.
|
||||
*/
|
||||
fun clearUserInfoRequestCache() {
|
||||
requestedUserInfoKeys.clear()
|
||||
}
|
||||
|
||||
/**
|
||||
* Desktop parity: after sync, resolve names for ALL dialogs that still have
|
||||
* an empty / placeholder title. Desktop does this per-component via useUserInformation;
|
||||
* we batch it here for efficiency.
|
||||
*/
|
||||
suspend fun requestMissingUserInfo() {
|
||||
val account = currentAccount ?: return
|
||||
val privateKey = currentPrivateKey ?: return
|
||||
val privateKeyHash = CryptoManager.generatePrivateKeyHash(privateKey)
|
||||
|
||||
// Query dialogs with empty or placeholder titles
|
||||
val dialogs = dialogDao.getDialogsWithEmptyTitle(account)
|
||||
for (dialog in dialogs) {
|
||||
// Skip self (Saved Messages)
|
||||
if (dialog.opponentKey == account) continue
|
||||
// Skip if already requested in this cycle
|
||||
if (requestedUserInfoKeys.contains(dialog.opponentKey)) continue
|
||||
requestedUserInfoKeys.add(dialog.opponentKey)
|
||||
|
||||
val packet = PacketSearch().apply {
|
||||
this.privateKey = privateKeyHash
|
||||
this.search = dialog.opponentKey
|
||||
}
|
||||
ProtocolManager.send(packet)
|
||||
// Small delay to avoid flooding the server with search requests
|
||||
kotlinx.coroutines.delay(50)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Force-request user info, bypassing the one-shot guard.
|
||||
* Use when opening a dialog to ensure the name/username is fresh.
|
||||
*/
|
||||
fun forceRequestUserInfo(publicKey: String) {
|
||||
requestedUserInfoKeys.remove(publicKey)
|
||||
requestUserInfo(publicKey)
|
||||
}
|
||||
|
||||
/**
|
||||
* Запросить информацию о пользователе с сервера 🔥 Защита от бесконечных запросов - каждый ключ
|
||||
* запрашивается только один раз
|
||||
|
||||
@@ -17,15 +17,28 @@ object ReleaseNotes {
|
||||
val RELEASE_NOTICE = """
|
||||
Update v$VERSION_PLACEHOLDER
|
||||
|
||||
- Исправлена критическая ошибка синхронизации сообщений между ПК и мобильным устройством
|
||||
- Подтверждение доставки теперь отправляется только после успешной обработки
|
||||
- Автоматическая синхронизация при возврате из фона
|
||||
- Анимация сайдбара в стиле Telegram
|
||||
- Исправлены артефакты на разделителях при анимации
|
||||
- Улучшено качество блюра аватара на экранах профиля
|
||||
- Устранены артефакты по краям изображения
|
||||
- Обновлён цвет шапки и сайдбара в светлой теме
|
||||
- Белая галочка верификации на экранах профиля
|
||||
Синхронизация
|
||||
- Исправлена рассинхронизация сообщений между ПК и Android
|
||||
- Зависшие сообщения автоматически переотправляются после реконнекта
|
||||
- Уведомления больше не дублируются во время синхронизации
|
||||
- Время доставки сообщений теперь корректно обновляется
|
||||
|
||||
Контакты
|
||||
- Имена и юзернеймы загружаются автоматически при первом запуске
|
||||
- Имя контакта обновляется при открытии чата и входящем сообщении
|
||||
|
||||
Соединение
|
||||
- Исправлен баг с зависанием WebSocket-соединения
|
||||
- Автореконнект теперь срабатывает корректно при разрыве связи
|
||||
|
||||
Обновления
|
||||
- Система автообновлений — проверка, загрузка и установка APK
|
||||
- Новый экран обновлений с детальной информацией
|
||||
|
||||
Интерфейс
|
||||
- Бейдж непрочитанных на стрелке назад в чате
|
||||
- Новый значок верификации
|
||||
- Печатает = всегда онлайн
|
||||
""".trimIndent()
|
||||
|
||||
fun getNotice(version: String): String =
|
||||
|
||||
@@ -414,6 +414,50 @@ interface MessageDao {
|
||||
"""
|
||||
)
|
||||
suspend fun getLastMessageAttachments(account: String, opponent: String): String?
|
||||
|
||||
/**
|
||||
* Get all outgoing messages stuck in WAITING status (delivered = 0).
|
||||
* Used to retry sending on reconnect (desktop parity: _packetQueue flush).
|
||||
* Only returns messages younger than minTimestamp to avoid retrying stale messages.
|
||||
*/
|
||||
@Query(
|
||||
"""
|
||||
SELECT * FROM messages
|
||||
WHERE account = :account
|
||||
AND from_me = 1
|
||||
AND delivered = 0
|
||||
AND timestamp >= :minTimestamp
|
||||
ORDER BY timestamp ASC
|
||||
"""
|
||||
)
|
||||
suspend fun getWaitingMessages(account: String, minTimestamp: Long): List<MessageEntity>
|
||||
|
||||
/**
|
||||
* Mark old WAITING messages as ERROR (delivery timeout expired).
|
||||
* Desktop parity: MESSAGE_MAX_TIME_TO_DELEVERED_S = 80s.
|
||||
*/
|
||||
@Query(
|
||||
"""
|
||||
UPDATE messages SET delivered = 2
|
||||
WHERE account = :account
|
||||
AND from_me = 1
|
||||
AND delivered = 0
|
||||
AND timestamp < :maxTimestamp
|
||||
"""
|
||||
)
|
||||
suspend fun markExpiredWaitingAsError(account: String, maxTimestamp: Long): Int
|
||||
|
||||
/**
|
||||
* Update delivery status AND timestamp on delivery confirmation.
|
||||
* Desktop parity: useDialogFiber.ts sets timestamp = Date.now() on PacketDelivery.
|
||||
*/
|
||||
@Query(
|
||||
"""
|
||||
UPDATE messages SET delivered = :status, timestamp = :timestamp
|
||||
WHERE account = :account AND message_id = :messageId
|
||||
"""
|
||||
)
|
||||
suspend fun updateDeliveryStatusAndTimestamp(account: String, messageId: String, status: Int, timestamp: Long)
|
||||
}
|
||||
|
||||
/** DAO для работы с диалогами */
|
||||
@@ -480,6 +524,23 @@ interface DialogDao {
|
||||
)
|
||||
fun getRequestsCountFlow(account: String): Flow<Int>
|
||||
|
||||
/**
|
||||
* Desktop parity: get all dialogs where opponent_title is empty or equals the raw
|
||||
* public key (or its prefix). Used by requestMissingUserInfo() to batch-resolve names
|
||||
* after sync, like Desktop's useUserInformation per-component hook.
|
||||
*/
|
||||
@Query("""
|
||||
SELECT * FROM dialogs
|
||||
WHERE account = :account
|
||||
AND last_message_timestamp > 0
|
||||
AND (
|
||||
opponent_title = ''
|
||||
OR opponent_title = opponent_key
|
||||
OR LENGTH(opponent_title) <= 8
|
||||
)
|
||||
""")
|
||||
suspend fun getDialogsWithEmptyTitle(account: String): List<DialogEntity>
|
||||
|
||||
/** Получить диалог */
|
||||
@Query("SELECT * FROM dialogs WHERE account = :account AND opponent_key = :opponentKey LIMIT 1")
|
||||
suspend fun getDialog(account: String, opponentKey: String): DialogEntity?
|
||||
|
||||
@@ -97,8 +97,8 @@ class Protocol(
|
||||
// Packet waiters - callbacks for specific packet types (thread-safe)
|
||||
private val packetWaiters = java.util.concurrent.ConcurrentHashMap<Int, MutableList<(Packet) -> Unit>>()
|
||||
|
||||
// Packet queue for packets sent before handshake complete
|
||||
private val packetQueue = mutableListOf<Packet>()
|
||||
// Packet queue for packets sent before handshake complete (thread-safe)
|
||||
private val packetQueue = java.util.Collections.synchronizedList(mutableListOf<Packet>())
|
||||
|
||||
// Last used credentials for reconnection
|
||||
private var lastPublicKey: String? = null
|
||||
@@ -314,6 +314,15 @@ class Protocol(
|
||||
|
||||
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
|
||||
log("⚠️ WebSocket CLOSING: code=$code reason='$reason' state=${_state.value}")
|
||||
// Must respond with close() so OkHttp transitions to onClosed.
|
||||
// Without this, the socket stays in a half-closed "zombie" state —
|
||||
// heartbeat keeps running but no data flows, and handleDisconnect
|
||||
// is never called (it only fires from onClosed/onFailure).
|
||||
try {
|
||||
webSocket.close(code, reason)
|
||||
} catch (e: Exception) {
|
||||
log("⚠️ Error responding to CLOSING: ${e.message}")
|
||||
}
|
||||
}
|
||||
|
||||
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
|
||||
@@ -470,9 +479,12 @@ class Protocol(
|
||||
}
|
||||
|
||||
private fun flushPacketQueue() {
|
||||
log("📬 Flushing ${packetQueue.size} queued packets")
|
||||
val packets = packetQueue.toList()
|
||||
packetQueue.clear()
|
||||
val packets: List<Packet>
|
||||
synchronized(packetQueue) {
|
||||
packets = packetQueue.toList()
|
||||
packetQueue.clear()
|
||||
}
|
||||
log("📬 Flushing ${packets.size} queued packets")
|
||||
packets.forEach { sendPacketDirect(it) }
|
||||
}
|
||||
|
||||
|
||||
@@ -6,11 +6,10 @@ import com.rosetta.messenger.data.AccountManager
|
||||
import com.rosetta.messenger.data.MessageRepository
|
||||
import com.rosetta.messenger.data.isPlaceholderAccountName
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
import kotlinx.coroutines.flow.StateFlow
|
||||
import kotlinx.coroutines.flow.asStateFlow
|
||||
import kotlinx.coroutines.sync.Mutex
|
||||
import kotlinx.coroutines.sync.withLock
|
||||
import java.security.SecureRandom
|
||||
import java.text.SimpleDateFormat
|
||||
import java.util.*
|
||||
@@ -70,10 +69,14 @@ object ProtocolManager {
|
||||
private val _syncInProgress = MutableStateFlow(false)
|
||||
val syncInProgress: StateFlow<Boolean> = _syncInProgress.asStateFlow()
|
||||
@Volatile private var resyncRequiredAfterAccountInit = false
|
||||
// Desktop parity: sequential task queue with Job-based completion tracking
|
||||
// (replaces AtomicInteger polling with 15s timeout that could lose messages)
|
||||
private val inboundPacketMutex = Mutex()
|
||||
@Volatile private var lastInboundJob: Job = Job().also { it.complete() }
|
||||
// Desktop parity: sequential task queue matching dialogQueue.ts (promise chain).
|
||||
// Uses Channel to guarantee strict FIFO ordering (Mutex+lastInboundJob had a race
|
||||
// condition: Dispatchers.IO doesn't guarantee FIFO, so the last-launched job could
|
||||
// finish before earlier ones, causing whenInboundTasksFinish to return prematurely
|
||||
// and BATCH_END to advance the sync timestamp while messages were still processing).
|
||||
private val inboundTaskChannel = Channel<suspend () -> Unit>(Channel.UNLIMITED)
|
||||
// Tracks the tail of the sequential processing chain (like desktop's `tail` promise)
|
||||
@Volatile private var inboundQueueDrainJob: Job? = null
|
||||
|
||||
private fun setSyncInProgress(value: Boolean) {
|
||||
syncBatchInProgress = value
|
||||
@@ -86,10 +89,8 @@ object ProtocolManager {
|
||||
val timestamp = dateFormat.format(Date())
|
||||
val logLine = "[$timestamp] $message"
|
||||
|
||||
// UI логи отключены по умолчанию - вызывали ANR из-за перекомпозиций
|
||||
if (uiLogsEnabled) {
|
||||
_debugLogs.value = (_debugLogs.value + logLine).takeLast(50)
|
||||
}
|
||||
// Always keep logs in memory for the Logs screen (capped at 500)
|
||||
_debugLogs.value = (_debugLogs.value + logLine).takeLast(500)
|
||||
}
|
||||
|
||||
fun enableUILogs(enabled: Boolean) {
|
||||
@@ -184,6 +185,7 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
// Обработчик доставки (0x08)
|
||||
// Desktop parity: useDialogFiber.ts updates sync time on delivery (await updateSyncTime(Date.now()))
|
||||
waitPacket(0x08) { packet ->
|
||||
val deliveryPacket = packet as PacketDelivery
|
||||
|
||||
@@ -194,6 +196,9 @@ object ProtocolManager {
|
||||
return@launchInboundPacketTask
|
||||
}
|
||||
repository.handleDelivery(deliveryPacket)
|
||||
if (!syncBatchInProgress) {
|
||||
repository.updateLastSyncTimestamp(System.currentTimeMillis())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -331,22 +336,32 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
/**
|
||||
* Desktop parity: sequential task queue (like dialogQueue.ts runTaskInQueue).
|
||||
* All inbound packet tasks are serialized via mutex.
|
||||
* lastInboundJob tracks the last submitted job so BATCH_END can await completion
|
||||
* without an arbitrary timeout (desktop uses `await whenFinish()`).
|
||||
* Desktop parity: sequential task queue (like dialogQueue.ts runTaskInQueue / whenFinish).
|
||||
*
|
||||
* Desktop uses a promise chain: `tail = tail.then(fn).catch(...)` which guarantees
|
||||
* strict FIFO ordering and `whenFinish = () => tail` returns a promise that resolves
|
||||
* only after ALL queued tasks complete.
|
||||
*
|
||||
* We reproduce this with a Channel<suspend () -> Unit> (UNLIMITED buffer) consumed
|
||||
* by a single coroutine. Tasks are executed strictly in the order they were submitted,
|
||||
* and `whenInboundTasksFinish()` waits for the queue to drain completely.
|
||||
*/
|
||||
private fun launchInboundPacketTask(block: suspend () -> Unit) {
|
||||
val job = scope.launch {
|
||||
try {
|
||||
inboundPacketMutex.withLock {
|
||||
block()
|
||||
private fun ensureInboundQueueDrainRunning() {
|
||||
if (inboundQueueDrainJob?.isActive == true) return
|
||||
inboundQueueDrainJob = scope.launch {
|
||||
for (task in inboundTaskChannel) {
|
||||
try {
|
||||
task()
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.e(TAG, "Inbound packet task error", e)
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.e(TAG, "Inbound packet task error", e)
|
||||
}
|
||||
}
|
||||
lastInboundJob = job
|
||||
}
|
||||
|
||||
private fun launchInboundPacketTask(block: suspend () -> Unit) {
|
||||
ensureInboundQueueDrainRunning()
|
||||
inboundTaskChannel.trySend(block)
|
||||
}
|
||||
|
||||
private fun requireResyncAfterAccountInit(reason: String) {
|
||||
@@ -358,12 +373,14 @@ object ProtocolManager {
|
||||
|
||||
/**
|
||||
* Desktop parity: equivalent of `await whenFinish()` in useSynchronize.ts.
|
||||
* Waits for all currently queued inbound packet tasks to complete.
|
||||
* Since tasks are serialized via mutex, awaiting the last job
|
||||
* guarantees all previous jobs have finished.
|
||||
* Sends a sentinel task into the sequential queue and suspends until it executes.
|
||||
* Since the queue is strictly FIFO, when the sentinel runs, all previously
|
||||
* submitted tasks are guaranteed to have completed.
|
||||
*/
|
||||
private suspend fun whenInboundTasksFinish() {
|
||||
lastInboundJob.join()
|
||||
val done = CompletableDeferred<Unit>()
|
||||
launchInboundPacketTask { done.complete(Unit) }
|
||||
done.await()
|
||||
}
|
||||
|
||||
private fun onAuthenticated() {
|
||||
@@ -393,13 +410,21 @@ object ProtocolManager {
|
||||
}
|
||||
|
||||
private fun requestSynchronize() {
|
||||
// Desktop parity: set syncBatchInProgress=true BEFORE sending the sync request.
|
||||
// This closes the race window between AUTHENTICATED → BATCH_START where real-time
|
||||
// messages could arrive and update lastSync, potentially advancing the cursor past
|
||||
// messages the server hasn't delivered yet.
|
||||
setSyncInProgress(true)
|
||||
addLog("🔄 SYNC requested — fetching last sync timestamp...")
|
||||
scope.launch {
|
||||
val repository = messageRepository
|
||||
if (repository == null || !repository.isInitialized()) {
|
||||
setSyncInProgress(false)
|
||||
requireResyncAfterAccountInit("⏳ Sync postponed until account is initialized")
|
||||
return@launch
|
||||
}
|
||||
val lastSync = repository.getLastSyncTimestamp()
|
||||
addLog("🔄 SYNC sending request with lastSync=$lastSync")
|
||||
sendSynchronize(lastSync)
|
||||
}
|
||||
}
|
||||
@@ -414,29 +439,89 @@ object ProtocolManager {
|
||||
|
||||
/**
|
||||
* Desktop parity: useSynchronize.ts usePacket(25, ...)
|
||||
* BATCH_START → mark sync in progress
|
||||
* BATCH_START → mark sync in progress (synchronous — no scope.launch)
|
||||
* BATCH_END → wait for ALL message tasks to finish, save timestamp, request next batch
|
||||
* NOT_NEEDED → sync complete, save timestamp, mark connected
|
||||
* NOT_NEEDED → sync complete, mark connected (synchronous — no scope.launch)
|
||||
*
|
||||
* CRITICAL: BATCH_START and NOT_NEEDED are handled synchronously in the WebSocket
|
||||
* callback thread. This prevents a race condition where 0x06 message packets arrive
|
||||
* and check syncBatchInProgress BEFORE the scope.launch coroutine for BATCH_START
|
||||
* has been scheduled on Dispatchers.IO.
|
||||
*/
|
||||
private fun handleSyncPacket(packet: PacketSync) {
|
||||
scope.launch {
|
||||
when (packet.status) {
|
||||
SyncStatus.BATCH_START -> {
|
||||
setSyncInProgress(true)
|
||||
}
|
||||
SyncStatus.BATCH_END -> {
|
||||
when (packet.status) {
|
||||
SyncStatus.BATCH_START -> {
|
||||
addLog("🔄 SYNC BATCH_START — incoming message batch")
|
||||
// Synchronous — guarantees syncBatchInProgress=true before any
|
||||
// subsequent 0x06 packets are dispatched by OkHttp's sequential callback.
|
||||
setSyncInProgress(true)
|
||||
}
|
||||
SyncStatus.BATCH_END -> {
|
||||
addLog("🔄 SYNC BATCH_END — waiting for tasks to finish (ts=${packet.timestamp})")
|
||||
// BATCH_END requires suspend (whenInboundTasksFinish), so we launch a coroutine.
|
||||
// syncBatchInProgress stays true until NOT_NEEDED arrives.
|
||||
scope.launch {
|
||||
setSyncInProgress(true)
|
||||
// Desktop: await whenFinish() — wait for ALL queued tasks without timeout.
|
||||
// Old code used 15s polling timeout which could advance the sync timestamp
|
||||
// before all messages were processed, causing message loss on app crash.
|
||||
whenInboundTasksFinish()
|
||||
addLog("🔄 SYNC tasks done — saving timestamp ${packet.timestamp}, requesting next batch")
|
||||
messageRepository?.updateLastSyncTimestamp(packet.timestamp)
|
||||
sendSynchronize(packet.timestamp)
|
||||
}
|
||||
SyncStatus.NOT_NEEDED -> {
|
||||
setSyncInProgress(false)
|
||||
messageRepository?.updateLastSyncTimestamp(packet.timestamp)
|
||||
}
|
||||
}
|
||||
SyncStatus.NOT_NEEDED -> {
|
||||
addLog("✅ SYNC COMPLETE — no more messages to sync")
|
||||
// Synchronous — immediately marks sync as complete.
|
||||
// Desktop parity: NOT_NEEDED just sets state to CONNECTED,
|
||||
// does NOT update last_sync timestamp (unnecessary since client
|
||||
// was already up to date).
|
||||
setSyncInProgress(false)
|
||||
// Retry any messages stuck in WAITING status from previous sessions.
|
||||
retryWaitingMessages()
|
||||
// Desktop parity: resolve names for all dialogs with empty titles.
|
||||
// Desktop does this per-component via useUserInformation hook;
|
||||
// we batch it after sync for efficiency.
|
||||
requestMissingUserInfo()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Retry messages stuck in WAITING status on reconnect.
|
||||
* Desktop has in-memory _packetQueue that flushes on handshake, but desktop apps are
|
||||
* rarely force-killed. On Android, the app can be killed mid-send, leaving messages
|
||||
* in WAITING status in the DB. This method resends them after sync completes.
|
||||
*
|
||||
* Messages older than 80s (MESSAGE_MAX_TIME_TO_DELEVERED_S) are marked ERROR.
|
||||
*/
|
||||
private fun retryWaitingMessages() {
|
||||
scope.launch {
|
||||
val repository = messageRepository
|
||||
if (repository == null || !repository.isInitialized()) return@launch
|
||||
try {
|
||||
repository.retryWaitingMessages()
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.e(TAG, "retryWaitingMessages failed", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Desktop parity: after sync completes, resolve names/usernames for all dialogs
|
||||
* that still have empty titles. Clears the one-shot guard first so that previously
|
||||
* failed requests can be retried.
|
||||
*/
|
||||
private fun requestMissingUserInfo() {
|
||||
scope.launch {
|
||||
val repository = messageRepository
|
||||
if (repository == null || !repository.isInitialized()) return@launch
|
||||
try {
|
||||
repository.clearUserInfoRequestCache()
|
||||
repository.requestMissingUserInfo()
|
||||
} catch (e: Exception) {
|
||||
android.util.Log.e(TAG, "requestMissingUserInfo failed", e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -155,6 +155,11 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
|
||||
if (isAppInForeground || !areNotificationsEnabled()) {
|
||||
return
|
||||
}
|
||||
// Desktop parity: suppress notifications during sync (useDialogFiber.ts checks
|
||||
// protocolState != ProtocolState.SYNCHRONIZATION before calling notify()).
|
||||
if (ProtocolManager.syncInProgress.value) {
|
||||
return
|
||||
}
|
||||
val senderKey = senderPublicKey?.trim().orEmpty()
|
||||
if (senderKey.isNotEmpty() && isDialogMuted(senderKey)) {
|
||||
return
|
||||
|
||||
@@ -425,7 +425,9 @@ fun ChatDetailScreen(
|
||||
val inputText by viewModel.inputText.collectAsState()
|
||||
val isTyping by viewModel.opponentTyping.collectAsState()
|
||||
val isLoadingMore by viewModel.isLoadingMore.collectAsState()
|
||||
val isOnline by viewModel.opponentOnline.collectAsState()
|
||||
val rawIsOnline by viewModel.opponentOnline.collectAsState()
|
||||
// If typing, the user is obviously online — never show "offline" while typing
|
||||
val isOnline = rawIsOnline || isTyping
|
||||
val isLoading by viewModel.isLoading.collectAsState() // 🔥 Для скелетона
|
||||
|
||||
// <20>🔥 Reply/Forward state
|
||||
@@ -905,7 +907,7 @@ fun ChatDetailScreen(
|
||||
)
|
||||
.background(
|
||||
Color(
|
||||
0xFFFF3B30
|
||||
0xFF3B82F6
|
||||
)
|
||||
),
|
||||
contentAlignment =
|
||||
|
||||
@@ -584,6 +584,10 @@ class ChatViewModel(application: Application) : AndroidViewModel(application) {
|
||||
subscribedToOnlineStatus = false // 🔥 Сбрасываем флаг подписки при смене диалога
|
||||
isDialogActive = true // 🔥 Диалог активен!
|
||||
|
||||
// Desktop parity: refresh opponent name/username from server on dialog open,
|
||||
// so renamed contacts get their new name displayed immediately.
|
||||
messageRepository?.forceRequestUserInfo(publicKey)
|
||||
|
||||
// 📝 Восстанавливаем черновик для этого диалога (draft, как в Telegram)
|
||||
val draft = com.rosetta.messenger.data.DraftManager.getDraft(publicKey)
|
||||
_inputText.value = draft ?: ""
|
||||
|
||||
@@ -189,6 +189,10 @@ class ChatsListViewModel(application: Application) : AndroidViewModel(applicatio
|
||||
dialog.opponentTitle ==
|
||||
dialog.opponentKey.take(
|
||||
7
|
||||
) ||
|
||||
dialog.opponentTitle ==
|
||||
dialog.opponentKey.take(
|
||||
8
|
||||
))
|
||||
) {
|
||||
loadUserInfoForDialog(dialog.opponentKey)
|
||||
@@ -371,6 +375,20 @@ class ChatsListViewModel(application: Application) : AndroidViewModel(applicatio
|
||||
.collect { blockedSet -> _blockedUsers.value = blockedSet }
|
||||
}
|
||||
|
||||
// Desktop parity: when sync finishes (syncInProgress transitions true → false),
|
||||
// clear the one-shot requestedUserInfoKeys guard so the dialog-list .map{} block
|
||||
// can re-trigger loadUserInfoForDialog() on the next Room emission for any
|
||||
// dialogs that still have empty titles.
|
||||
launch {
|
||||
var wasSyncing = false
|
||||
ProtocolManager.syncInProgress.collect { syncing ->
|
||||
if (wasSyncing && !syncing) {
|
||||
requestedUserInfoKeys.clear()
|
||||
}
|
||||
wasSyncing = syncing
|
||||
}
|
||||
}
|
||||
|
||||
} // end accountSubscriptionsJob
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,215 @@
|
||||
package com.rosetta.messenger.ui.chats
|
||||
|
||||
import androidx.compose.foundation.background
|
||||
import androidx.compose.foundation.layout.*
|
||||
import androidx.compose.foundation.lazy.LazyColumn
|
||||
import androidx.compose.foundation.lazy.items
|
||||
import androidx.compose.foundation.lazy.rememberLazyListState
|
||||
import androidx.compose.foundation.shape.RoundedCornerShape
|
||||
import androidx.compose.material3.*
|
||||
import androidx.compose.runtime.*
|
||||
import androidx.compose.ui.Alignment
|
||||
import androidx.compose.ui.Modifier
|
||||
import androidx.compose.ui.graphics.Color
|
||||
import androidx.compose.ui.text.font.FontFamily
|
||||
import androidx.compose.ui.text.font.FontWeight
|
||||
import androidx.compose.ui.unit.dp
|
||||
import androidx.compose.ui.unit.sp
|
||||
import com.rosetta.messenger.network.ProtocolManager
|
||||
import com.rosetta.messenger.network.ProtocolState
|
||||
import compose.icons.TablerIcons
|
||||
import compose.icons.tablericons.*
|
||||
import kotlinx.coroutines.launch
|
||||
|
||||
/**
|
||||
* Full-screen connection logs viewer.
|
||||
* Shows all protocol/WebSocket logs from ProtocolManager.debugLogs.
|
||||
*/
|
||||
@OptIn(ExperimentalMaterial3Api::class)
|
||||
@Composable
|
||||
fun ConnectionLogsScreen(
|
||||
isDarkTheme: Boolean,
|
||||
onBack: () -> Unit
|
||||
) {
|
||||
val logs by ProtocolManager.debugLogs.collectAsState()
|
||||
val protocolState by ProtocolManager.getProtocol().state.collectAsState()
|
||||
val syncInProgress by ProtocolManager.syncInProgress.collectAsState()
|
||||
|
||||
val bgColor = if (isDarkTheme) Color(0xFF0E0E0E) else Color(0xFFF5F5F5)
|
||||
val cardColor = if (isDarkTheme) Color(0xFF1A1A1A) else Color.White
|
||||
val textColor = if (isDarkTheme) Color(0xFFE0E0E0) else Color(0xFF1A1A1A)
|
||||
val headerColor = if (isDarkTheme) Color(0xFF1E1E1E) else Color(0xFF228BE6)
|
||||
|
||||
val listState = rememberLazyListState()
|
||||
val scope = rememberCoroutineScope()
|
||||
|
||||
// Auto-scroll to bottom when new logs arrive
|
||||
LaunchedEffect(logs.size) {
|
||||
if (logs.isNotEmpty()) {
|
||||
listState.animateScrollToItem(logs.size - 1)
|
||||
}
|
||||
}
|
||||
|
||||
Column(
|
||||
modifier = Modifier
|
||||
.fillMaxSize()
|
||||
.background(bgColor)
|
||||
.statusBarsPadding()
|
||||
) {
|
||||
// Header
|
||||
Box(
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.background(headerColor)
|
||||
.padding(horizontal = 8.dp, vertical = 4.dp)
|
||||
) {
|
||||
Row(
|
||||
modifier = Modifier.fillMaxWidth(),
|
||||
verticalAlignment = Alignment.CenterVertically
|
||||
) {
|
||||
IconButton(onClick = onBack) {
|
||||
Icon(
|
||||
imageVector = TablerIcons.ArrowLeft,
|
||||
contentDescription = "Back",
|
||||
tint = Color.White
|
||||
)
|
||||
}
|
||||
|
||||
Text(
|
||||
text = "Connection Logs",
|
||||
color = Color.White,
|
||||
fontSize = 18.sp,
|
||||
fontWeight = FontWeight.SemiBold,
|
||||
modifier = Modifier.weight(1f)
|
||||
)
|
||||
|
||||
// Clear button
|
||||
IconButton(onClick = { ProtocolManager.clearLogs() }) {
|
||||
Icon(
|
||||
imageVector = TablerIcons.Trash,
|
||||
contentDescription = "Clear logs",
|
||||
tint = Color.White.copy(alpha = 0.8f),
|
||||
modifier = Modifier.size(22.dp)
|
||||
)
|
||||
}
|
||||
|
||||
// Scroll to bottom
|
||||
IconButton(onClick = {
|
||||
scope.launch {
|
||||
if (logs.isNotEmpty()) listState.animateScrollToItem(logs.size - 1)
|
||||
}
|
||||
}) {
|
||||
Icon(
|
||||
imageVector = TablerIcons.ArrowDown,
|
||||
contentDescription = "Scroll to bottom",
|
||||
tint = Color.White.copy(alpha = 0.8f),
|
||||
modifier = Modifier.size(22.dp)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Status bar
|
||||
Row(
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.background(if (isDarkTheme) Color(0xFF252525) else Color(0xFFE8E8E8))
|
||||
.padding(horizontal = 16.dp, vertical = 8.dp),
|
||||
verticalAlignment = Alignment.CenterVertically,
|
||||
horizontalArrangement = Arrangement.SpaceBetween
|
||||
) {
|
||||
val stateColor = when (protocolState) {
|
||||
ProtocolState.AUTHENTICATED -> Color(0xFF4CAF50)
|
||||
ProtocolState.CONNECTING, ProtocolState.HANDSHAKING -> Color(0xFFFFA726)
|
||||
ProtocolState.DISCONNECTED -> Color(0xFFEF5350)
|
||||
else -> Color(0xFF9E9E9E)
|
||||
}
|
||||
|
||||
Row(verticalAlignment = Alignment.CenterVertically) {
|
||||
Box(
|
||||
modifier = Modifier
|
||||
.size(8.dp)
|
||||
.background(stateColor, RoundedCornerShape(4.dp))
|
||||
)
|
||||
Spacer(modifier = Modifier.width(8.dp))
|
||||
Text(
|
||||
text = protocolState.name,
|
||||
color = textColor,
|
||||
fontSize = 13.sp,
|
||||
fontWeight = FontWeight.Medium,
|
||||
fontFamily = FontFamily.Monospace
|
||||
)
|
||||
}
|
||||
|
||||
if (syncInProgress) {
|
||||
Text(
|
||||
text = "SYNCING…",
|
||||
color = Color(0xFFFFA726),
|
||||
fontSize = 12.sp,
|
||||
fontWeight = FontWeight.Bold,
|
||||
fontFamily = FontFamily.Monospace
|
||||
)
|
||||
}
|
||||
|
||||
Text(
|
||||
text = "${logs.size} logs",
|
||||
color = textColor.copy(alpha = 0.5f),
|
||||
fontSize = 12.sp,
|
||||
fontFamily = FontFamily.Monospace
|
||||
)
|
||||
}
|
||||
|
||||
// Logs list
|
||||
if (logs.isEmpty()) {
|
||||
Box(
|
||||
modifier = Modifier.fillMaxSize(),
|
||||
contentAlignment = Alignment.Center
|
||||
) {
|
||||
Text(
|
||||
text = "No logs yet.\nConnect to see protocol activity.",
|
||||
color = textColor.copy(alpha = 0.4f),
|
||||
fontSize = 14.sp,
|
||||
textAlign = androidx.compose.ui.text.style.TextAlign.Center
|
||||
)
|
||||
}
|
||||
} else {
|
||||
LazyColumn(
|
||||
state = listState,
|
||||
modifier = Modifier
|
||||
.fillMaxSize()
|
||||
.padding(horizontal = 8.dp, vertical = 4.dp),
|
||||
verticalArrangement = Arrangement.spacedBy(2.dp)
|
||||
) {
|
||||
items(logs, key = { it.hashCode().toString() + logs.indexOf(it) }) { log ->
|
||||
val logColor = when {
|
||||
"❌" in log || "FAILED" in log || "Error" in log || "error" in log -> Color(0xFFEF5350)
|
||||
"✅" in log || "COMPLETE" in log || "SUCCESS" in log -> Color(0xFF4CAF50)
|
||||
"⚠️" in log || "WARNING" in log -> Color(0xFFFFA726)
|
||||
"🔄" in log || "RECONNECT" in log || "SYNC" in log -> Color(0xFF42A5F5)
|
||||
"💓" in log || "Heartbeat" in log -> Color(0xFF9E9E9E)
|
||||
"📤" in log || "Sending" in log -> Color(0xFF7E57C2)
|
||||
"📥" in log || "onMessage" in log -> Color(0xFF26A69A)
|
||||
"🤝" in log || "HANDSHAKE" in log -> Color(0xFFFFCA28)
|
||||
else -> textColor.copy(alpha = 0.85f)
|
||||
}
|
||||
|
||||
Text(
|
||||
text = log,
|
||||
color = logColor,
|
||||
fontSize = 11.sp,
|
||||
fontFamily = FontFamily.Monospace,
|
||||
lineHeight = 15.sp,
|
||||
modifier = Modifier
|
||||
.fillMaxWidth()
|
||||
.background(
|
||||
if ("❌" in log) Color.Red.copy(alpha = 0.08f)
|
||||
else Color.Transparent,
|
||||
RoundedCornerShape(4.dp)
|
||||
)
|
||||
.padding(horizontal = 6.dp, vertical = 2.dp)
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user