diff --git a/app/build.gradle.kts b/app/build.gradle.kts index bb6cadb..c856419 100644 --- a/app/build.gradle.kts +++ b/app/build.gradle.kts @@ -23,8 +23,8 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown" // ═══════════════════════════════════════════════════════════ // Rosetta versioning — bump here on each release // ═══════════════════════════════════════════════════════════ -val rosettaVersionName = "1.2.9" -val rosettaVersionCode = 31 // Increment on each release +val rosettaVersionName = "1.3.0" +val rosettaVersionCode = 32 // Increment on each release android { namespace = "com.rosetta.messenger" diff --git a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt index 2ff320c..b524b41 100644 --- a/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt +++ b/app/src/main/java/com/rosetta/messenger/data/MessageRepository.kt @@ -686,13 +686,6 @@ class MessageRepository private constructor(private val context: Context) { return true } - // 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях) - val isDuplicate = messageDao.messageExists(account, messageId) - MessageLogger.logDuplicateCheck(messageId, isDuplicate) - if (isDuplicate) { - return true - } - val dialogOpponentKey = when { isGroupMessage -> packet.toPublicKey @@ -701,6 +694,33 @@ class MessageRepository private constructor(private val context: Context) { } val dialogKey = getDialogKey(dialogOpponentKey) + // 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях) + val isDuplicate = messageDao.messageExists(account, messageId) + MessageLogger.logDuplicateCheck(messageId, isDuplicate) + if (isDuplicate) { + // Desktop/server parity: + // own messages that arrive via sync must be treated as delivered. + // If a local optimistic row already exists (WAITING/ERROR), normalize it. + if (isOwnMessage) { + messageDao.updateDeliveryStatus(account, messageId, DeliveryStatus.DELIVERED.value) + messageCache[dialogKey]?.let { flow -> + flow.value = + flow.value.map { msg -> + if (msg.messageId == messageId) { + msg.copy(deliveryStatus = DeliveryStatus.DELIVERED) + } else { + msg + } + } + } + _deliveryStatusEvents.tryEmit( + DeliveryStatusUpdate(dialogKey, messageId, DeliveryStatus.DELIVERED) + ) + dialogDao.updateDialogFromMessages(account, dialogOpponentKey) + } + return true + } + try { val groupKey = if (isGroupMessage) { diff --git a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt index 356cfa5..affa43b 100644 --- a/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt +++ b/app/src/main/java/com/rosetta/messenger/data/ReleaseNotes.kt @@ -17,10 +17,11 @@ object ReleaseNotes { val RELEASE_NOTICE = """ Update v$VERSION_PLACEHOLDER - Полноэкранный просмотр фото - - Убраны лишние искусственные отступы в fullscreen viewer - - Фото (включая большие скриншоты) снова открываются edge-to-edge, как в Telegram - - Исправлены большие чёрные бордеры вокруг изображения при открытии + Синхронизация 1 в 1 с desktop/server + - Выровнен сетевой контракт пакетов как в desktop: добавлена поддержка 0x10 (push), 0x1A (signal), 0x1B (webrtc), 0x1C (ice) + - Исправлена нормализация дубликатов своих сообщений из sync: локальные WAITING/ERROR теперь автоматически переходят в DELIVERED + - Добавлен watchdog для sync-запроса: если ответ на PacketSync завис, запрос перезапускается автоматически + - Повышена стабильность цикла BATCH_START/BATCH_END/NOT_NEEDED при reconnect """.trimIndent() fun getNotice(version: String): String = diff --git a/app/src/main/java/com/rosetta/messenger/network/PacketIceServers.kt b/app/src/main/java/com/rosetta/messenger/network/PacketIceServers.kt new file mode 100644 index 0000000..4875db6 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/PacketIceServers.kt @@ -0,0 +1,47 @@ +package com.rosetta.messenger.network + +data class IceServer( + val url: String, + val username: String, + val credential: String, + val transport: String +) + +/** + * ICE servers packet (ID: 0x1C / 28). + * Wire format mirrors desktop packet.ice.servers.ts. + */ +class PacketIceServers : Packet() { + var iceServers: List = emptyList() + + override fun getPacketId(): Int = 0x1C + + override fun receive(stream: Stream) { + val count = stream.readInt16() + val servers = ArrayList(count.coerceAtLeast(0)) + for (i in 0 until count) { + servers.add( + IceServer( + url = stream.readString(), + username = stream.readString(), + credential = stream.readString(), + transport = stream.readString() + ) + ) + } + iceServers = servers + } + + override fun send(): Stream { + val stream = Stream() + stream.writeInt16(getPacketId()) + stream.writeInt16(iceServers.size) + for (server in iceServers) { + stream.writeString(server.url) + stream.writeString(server.username) + stream.writeString(server.credential) + stream.writeString(server.transport) + } + return stream + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/PacketSignalPeer.kt b/app/src/main/java/com/rosetta/messenger/network/PacketSignalPeer.kt new file mode 100644 index 0000000..4b2aee9 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/PacketSignalPeer.kt @@ -0,0 +1,69 @@ +package com.rosetta.messenger.network + +enum class SignalType(val value: Int) { + CALL(0), + KEY_EXCHANGE(1), + ACTIVE_CALL(2), + END_CALL(3), + CREATE_ROOM(4), + END_CALL_BECAUSE_PEER_DISCONNECTED(5), + END_CALL_BECAUSE_BUSY(6); + + companion object { + fun fromValue(value: Int): SignalType = + entries.firstOrNull { it.value == value } ?: CALL + } +} + +/** + * Signaling packet (ID: 0x1A / 26). + * Wire format mirrors desktop packet.signal.peer.ts. + */ +class PacketSignalPeer : Packet() { + var src: String = "" + var dst: String = "" + var sharedPublic: String = "" + var signalType: SignalType = SignalType.CALL + var roomId: String = "" + + override fun getPacketId(): Int = 0x1A + + override fun receive(stream: Stream) { + signalType = SignalType.fromValue(stream.readInt8()) + if ( + signalType == SignalType.END_CALL_BECAUSE_BUSY || + signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED + ) { + return + } + src = stream.readString() + dst = stream.readString() + if (signalType == SignalType.KEY_EXCHANGE) { + sharedPublic = stream.readString() + } + if (signalType == SignalType.CREATE_ROOM) { + roomId = stream.readString() + } + } + + override fun send(): Stream { + val stream = Stream() + stream.writeInt16(getPacketId()) + stream.writeInt8(signalType.value) + if ( + signalType == SignalType.END_CALL_BECAUSE_BUSY || + signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED + ) { + return stream + } + stream.writeString(src) + stream.writeString(dst) + if (signalType == SignalType.KEY_EXCHANGE) { + stream.writeString(sharedPublic) + } + if (signalType == SignalType.CREATE_ROOM) { + stream.writeString(roomId) + } + return stream + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/PacketWebRTC.kt b/app/src/main/java/com/rosetta/messenger/network/PacketWebRTC.kt new file mode 100644 index 0000000..809ac62 --- /dev/null +++ b/app/src/main/java/com/rosetta/messenger/network/PacketWebRTC.kt @@ -0,0 +1,36 @@ +package com.rosetta.messenger.network + +enum class WebRTCSignalType(val value: Int) { + OFFER(0), + ANSWER(1), + ICE_CANDIDATE(2); + + companion object { + fun fromValue(value: Int): WebRTCSignalType = + entries.firstOrNull { it.value == value } ?: OFFER + } +} + +/** + * WebRTC exchange packet (ID: 0x1B / 27). + * Wire format mirrors desktop packet.webrtc.ts. + */ +class PacketWebRTC : Packet() { + var signalType: WebRTCSignalType = WebRTCSignalType.OFFER + var sdpOrCandidate: String = "" + + override fun getPacketId(): Int = 0x1B + + override fun receive(stream: Stream) { + signalType = WebRTCSignalType.fromValue(stream.readInt8()) + sdpOrCandidate = stream.readString() + } + + override fun send(): Stream { + val stream = Stream() + stream.writeInt16(getPacketId()) + stream.writeInt8(signalType.value) + stream.writeString(sdpOrCandidate) + return stream + } +} diff --git a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt index ae578e8..760e3e7 100644 --- a/app/src/main/java/com/rosetta/messenger/network/Protocol.kt +++ b/app/src/main/java/com/rosetta/messenger/network/Protocol.kt @@ -127,6 +127,7 @@ class Protocol( 0x09 to { PacketDeviceNew() }, 0x0A to { PacketRequestUpdate() }, 0x0B to { PacketTyping() }, + 0x10 to { PacketPushNotification() }, 0x11 to { PacketCreateGroup() }, 0x12 to { PacketGroupInfo() }, 0x13 to { PacketGroupInviteInfo() }, @@ -136,7 +137,10 @@ class Protocol( 0x0F to { PacketRequestTransport() }, 0x17 to { PacketDeviceList() }, 0x18 to { PacketDeviceResolve() }, - 0x19 to { PacketSync() } + 0x19 to { PacketSync() }, + 0x1A to { PacketSignalPeer() }, + 0x1B to { PacketWebRTC() }, + 0x1C to { PacketIceServers() } ) init { diff --git a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt index e5eaca7..ec7047c 100644 --- a/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt +++ b/app/src/main/java/com/rosetta/messenger/network/ProtocolManager.kt @@ -27,6 +27,7 @@ import kotlin.coroutines.resume object ProtocolManager { private const val TAG = "ProtocolManager" private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L + private const val SYNC_REQUEST_TIMEOUT_MS = 12_000L private const val MAX_DEBUG_LOGS = 600 private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L private const val TYPING_INDICATOR_TIMEOUT_MS = 3_000L @@ -45,6 +46,7 @@ object ProtocolManager { @Volatile private var packetHandlersRegistered = false @Volatile private var stateMonitoringStarted = false @Volatile private var syncRequestInFlight = false + @Volatile private var syncRequestTimeoutJob: Job? = null // Guard: prevent duplicate FCM token subscribe within a single session @Volatile @@ -210,6 +212,7 @@ object ProtocolManager { } if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) { syncRequestInFlight = false + clearSyncRequestTimeout() setSyncInProgress(false) // Connection/session dropped: force re-subscribe on next AUTHENTICATED. lastSubscribedToken = null @@ -678,6 +681,7 @@ object ProtocolManager { private fun finishSyncCycle(reason: String) { syncRequestInFlight = false + clearSyncRequestTimeout() inboundProcessingFailures.set(0) addLog(reason) setSyncInProgress(false) @@ -734,6 +738,7 @@ object ProtocolManager { val repository = messageRepository if (repository == null || !repository.isInitialized()) { syncRequestInFlight = false + clearSyncRequestTimeout() requireResyncAfterAccountInit("⏳ Sync postponed until account is initialized") return@launch } @@ -744,6 +749,7 @@ object ProtocolManager { repositoryAccount != protocolAccount ) { syncRequestInFlight = false + clearSyncRequestTimeout() requireResyncAfterAccountInit( "⏳ Sync postponed: repository bound to another account" ) @@ -757,6 +763,7 @@ object ProtocolManager { private fun sendSynchronize(timestamp: Long) { syncRequestInFlight = true + scheduleSyncRequestTimeout(timestamp) val packet = PacketSync().apply { status = SyncStatus.NOT_NEEDED this.timestamp = timestamp @@ -777,6 +784,7 @@ object ProtocolManager { */ private fun handleSyncPacket(packet: PacketSync) { syncRequestInFlight = false + clearSyncRequestTimeout() when (packet.status) { SyncStatus.BATCH_START -> { addLog("🔄 SYNC BATCH_START — incoming message batch") @@ -826,6 +834,24 @@ object ProtocolManager { } } + private fun scheduleSyncRequestTimeout(cursor: Long) { + syncRequestTimeoutJob?.cancel() + syncRequestTimeoutJob = scope.launch { + delay(SYNC_REQUEST_TIMEOUT_MS) + if (!syncRequestInFlight || !isAuthenticated()) return@launch + syncRequestInFlight = false + addLog( + "⏱️ SYNC response timeout for cursor=$cursor, retrying request" + ) + requestSynchronize() + } + } + + private fun clearSyncRequestTimeout() { + syncRequestTimeoutJob?.cancel() + syncRequestTimeoutJob = null + } + /** * Retry messages stuck in WAITING status on reconnect. * Desktop has in-memory _packetQueue that flushes on handshake, but desktop apps are @@ -1325,6 +1351,7 @@ object ProtocolManager { _devices.value = emptyList() _pendingDeviceVerification.value = null syncRequestInFlight = false + clearSyncRequestTimeout() setSyncInProgress(false) resyncRequiredAfterAccountInit = false lastSubscribedToken = null // reset so token is re-sent on next connect @@ -1341,6 +1368,7 @@ object ProtocolManager { _devices.value = emptyList() _pendingDeviceVerification.value = null syncRequestInFlight = false + clearSyncRequestTimeout() setSyncInProgress(false) resyncRequiredAfterAccountInit = false scope.cancel()