Синхронизация 1.3.0: parity с desktop/server и стабилизация sync-цикла

This commit is contained in:
2026-03-22 19:47:23 +05:00
parent 69c0c377d1
commit f915333a44
8 changed files with 219 additions and 14 deletions

View File

@@ -686,13 +686,6 @@ class MessageRepository private constructor(private val context: Context) {
return true
}
// 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях)
val isDuplicate = messageDao.messageExists(account, messageId)
MessageLogger.logDuplicateCheck(messageId, isDuplicate)
if (isDuplicate) {
return true
}
val dialogOpponentKey =
when {
isGroupMessage -> packet.toPublicKey
@@ -701,6 +694,33 @@ class MessageRepository private constructor(private val context: Context) {
}
val dialogKey = getDialogKey(dialogOpponentKey)
// 🔥 ВТОРОЙ УРОВЕНЬ ЗАЩИТЫ: Проверка в БД (для сообщений сохранённых в предыдущих сессиях)
val isDuplicate = messageDao.messageExists(account, messageId)
MessageLogger.logDuplicateCheck(messageId, isDuplicate)
if (isDuplicate) {
// Desktop/server parity:
// own messages that arrive via sync must be treated as delivered.
// If a local optimistic row already exists (WAITING/ERROR), normalize it.
if (isOwnMessage) {
messageDao.updateDeliveryStatus(account, messageId, DeliveryStatus.DELIVERED.value)
messageCache[dialogKey]?.let { flow ->
flow.value =
flow.value.map { msg ->
if (msg.messageId == messageId) {
msg.copy(deliveryStatus = DeliveryStatus.DELIVERED)
} else {
msg
}
}
}
_deliveryStatusEvents.tryEmit(
DeliveryStatusUpdate(dialogKey, messageId, DeliveryStatus.DELIVERED)
)
dialogDao.updateDialogFromMessages(account, dialogOpponentKey)
}
return true
}
try {
val groupKey =
if (isGroupMessage) {

View File

@@ -17,10 +17,11 @@ object ReleaseNotes {
val RELEASE_NOTICE = """
Update v$VERSION_PLACEHOLDER
Полноэкранный просмотр фото
- Убраны лишние искусственные отступы в fullscreen viewer
- Фото (включая большие скриншоты) снова открываются edge-to-edge, как в Telegram
- Исправлены большие чёрные бордеры вокруг изображения при открытии
Синхронизация 1 в 1 с desktop/server
- Выровнен сетевой контракт пакетов как в desktop: добавлена поддержка 0x10 (push), 0x1A (signal), 0x1B (webrtc), 0x1C (ice)
- Исправлена нормализация дубликатов своих сообщений из sync: локальные WAITING/ERROR теперь автоматически переходят в DELIVERED
- Добавлен watchdog для sync-запроса: если ответ на PacketSync завис, запрос перезапускается автоматически
- Повышена стабильность цикла BATCH_START/BATCH_END/NOT_NEEDED при reconnect
""".trimIndent()
fun getNotice(version: String): String =

View File

@@ -0,0 +1,47 @@
package com.rosetta.messenger.network
data class IceServer(
val url: String,
val username: String,
val credential: String,
val transport: String
)
/**
* ICE servers packet (ID: 0x1C / 28).
* Wire format mirrors desktop packet.ice.servers.ts.
*/
class PacketIceServers : Packet() {
var iceServers: List<IceServer> = emptyList()
override fun getPacketId(): Int = 0x1C
override fun receive(stream: Stream) {
val count = stream.readInt16()
val servers = ArrayList<IceServer>(count.coerceAtLeast(0))
for (i in 0 until count) {
servers.add(
IceServer(
url = stream.readString(),
username = stream.readString(),
credential = stream.readString(),
transport = stream.readString()
)
)
}
iceServers = servers
}
override fun send(): Stream {
val stream = Stream()
stream.writeInt16(getPacketId())
stream.writeInt16(iceServers.size)
for (server in iceServers) {
stream.writeString(server.url)
stream.writeString(server.username)
stream.writeString(server.credential)
stream.writeString(server.transport)
}
return stream
}
}

View File

@@ -0,0 +1,69 @@
package com.rosetta.messenger.network
enum class SignalType(val value: Int) {
CALL(0),
KEY_EXCHANGE(1),
ACTIVE_CALL(2),
END_CALL(3),
CREATE_ROOM(4),
END_CALL_BECAUSE_PEER_DISCONNECTED(5),
END_CALL_BECAUSE_BUSY(6);
companion object {
fun fromValue(value: Int): SignalType =
entries.firstOrNull { it.value == value } ?: CALL
}
}
/**
* Signaling packet (ID: 0x1A / 26).
* Wire format mirrors desktop packet.signal.peer.ts.
*/
class PacketSignalPeer : Packet() {
var src: String = ""
var dst: String = ""
var sharedPublic: String = ""
var signalType: SignalType = SignalType.CALL
var roomId: String = ""
override fun getPacketId(): Int = 0x1A
override fun receive(stream: Stream) {
signalType = SignalType.fromValue(stream.readInt8())
if (
signalType == SignalType.END_CALL_BECAUSE_BUSY ||
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED
) {
return
}
src = stream.readString()
dst = stream.readString()
if (signalType == SignalType.KEY_EXCHANGE) {
sharedPublic = stream.readString()
}
if (signalType == SignalType.CREATE_ROOM) {
roomId = stream.readString()
}
}
override fun send(): Stream {
val stream = Stream()
stream.writeInt16(getPacketId())
stream.writeInt8(signalType.value)
if (
signalType == SignalType.END_CALL_BECAUSE_BUSY ||
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED
) {
return stream
}
stream.writeString(src)
stream.writeString(dst)
if (signalType == SignalType.KEY_EXCHANGE) {
stream.writeString(sharedPublic)
}
if (signalType == SignalType.CREATE_ROOM) {
stream.writeString(roomId)
}
return stream
}
}

View File

@@ -0,0 +1,36 @@
package com.rosetta.messenger.network
enum class WebRTCSignalType(val value: Int) {
OFFER(0),
ANSWER(1),
ICE_CANDIDATE(2);
companion object {
fun fromValue(value: Int): WebRTCSignalType =
entries.firstOrNull { it.value == value } ?: OFFER
}
}
/**
* WebRTC exchange packet (ID: 0x1B / 27).
* Wire format mirrors desktop packet.webrtc.ts.
*/
class PacketWebRTC : Packet() {
var signalType: WebRTCSignalType = WebRTCSignalType.OFFER
var sdpOrCandidate: String = ""
override fun getPacketId(): Int = 0x1B
override fun receive(stream: Stream) {
signalType = WebRTCSignalType.fromValue(stream.readInt8())
sdpOrCandidate = stream.readString()
}
override fun send(): Stream {
val stream = Stream()
stream.writeInt16(getPacketId())
stream.writeInt8(signalType.value)
stream.writeString(sdpOrCandidate)
return stream
}
}

View File

@@ -127,6 +127,7 @@ class Protocol(
0x09 to { PacketDeviceNew() },
0x0A to { PacketRequestUpdate() },
0x0B to { PacketTyping() },
0x10 to { PacketPushNotification() },
0x11 to { PacketCreateGroup() },
0x12 to { PacketGroupInfo() },
0x13 to { PacketGroupInviteInfo() },
@@ -136,7 +137,10 @@ class Protocol(
0x0F to { PacketRequestTransport() },
0x17 to { PacketDeviceList() },
0x18 to { PacketDeviceResolve() },
0x19 to { PacketSync() }
0x19 to { PacketSync() },
0x1A to { PacketSignalPeer() },
0x1B to { PacketWebRTC() },
0x1C to { PacketIceServers() }
)
init {

View File

@@ -27,6 +27,7 @@ import kotlin.coroutines.resume
object ProtocolManager {
private const val TAG = "ProtocolManager"
private const val MANUAL_SYNC_BACKTRACK_MS = 120_000L
private const val SYNC_REQUEST_TIMEOUT_MS = 12_000L
private const val MAX_DEBUG_LOGS = 600
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
private const val TYPING_INDICATOR_TIMEOUT_MS = 3_000L
@@ -45,6 +46,7 @@ object ProtocolManager {
@Volatile private var packetHandlersRegistered = false
@Volatile private var stateMonitoringStarted = false
@Volatile private var syncRequestInFlight = false
@Volatile private var syncRequestTimeoutJob: Job? = null
// Guard: prevent duplicate FCM token subscribe within a single session
@Volatile
@@ -210,6 +212,7 @@ object ProtocolManager {
}
if (newState != ProtocolState.AUTHENTICATED && newState != ProtocolState.HANDSHAKING) {
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
// Connection/session dropped: force re-subscribe on next AUTHENTICATED.
lastSubscribedToken = null
@@ -678,6 +681,7 @@ object ProtocolManager {
private fun finishSyncCycle(reason: String) {
syncRequestInFlight = false
clearSyncRequestTimeout()
inboundProcessingFailures.set(0)
addLog(reason)
setSyncInProgress(false)
@@ -734,6 +738,7 @@ object ProtocolManager {
val repository = messageRepository
if (repository == null || !repository.isInitialized()) {
syncRequestInFlight = false
clearSyncRequestTimeout()
requireResyncAfterAccountInit("⏳ Sync postponed until account is initialized")
return@launch
}
@@ -744,6 +749,7 @@ object ProtocolManager {
repositoryAccount != protocolAccount
) {
syncRequestInFlight = false
clearSyncRequestTimeout()
requireResyncAfterAccountInit(
"⏳ Sync postponed: repository bound to another account"
)
@@ -757,6 +763,7 @@ object ProtocolManager {
private fun sendSynchronize(timestamp: Long) {
syncRequestInFlight = true
scheduleSyncRequestTimeout(timestamp)
val packet = PacketSync().apply {
status = SyncStatus.NOT_NEEDED
this.timestamp = timestamp
@@ -777,6 +784,7 @@ object ProtocolManager {
*/
private fun handleSyncPacket(packet: PacketSync) {
syncRequestInFlight = false
clearSyncRequestTimeout()
when (packet.status) {
SyncStatus.BATCH_START -> {
addLog("🔄 SYNC BATCH_START — incoming message batch")
@@ -826,6 +834,24 @@ object ProtocolManager {
}
}
private fun scheduleSyncRequestTimeout(cursor: Long) {
syncRequestTimeoutJob?.cancel()
syncRequestTimeoutJob = scope.launch {
delay(SYNC_REQUEST_TIMEOUT_MS)
if (!syncRequestInFlight || !isAuthenticated()) return@launch
syncRequestInFlight = false
addLog(
"⏱️ SYNC response timeout for cursor=$cursor, retrying request"
)
requestSynchronize()
}
}
private fun clearSyncRequestTimeout() {
syncRequestTimeoutJob?.cancel()
syncRequestTimeoutJob = null
}
/**
* Retry messages stuck in WAITING status on reconnect.
* Desktop has in-memory _packetQueue that flushes on handshake, but desktop apps are
@@ -1325,6 +1351,7 @@ object ProtocolManager {
_devices.value = emptyList()
_pendingDeviceVerification.value = null
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
resyncRequiredAfterAccountInit = false
lastSubscribedToken = null // reset so token is re-sent on next connect
@@ -1341,6 +1368,7 @@ object ProtocolManager {
_devices.value = emptyList()
_pendingDeviceVerification.value = null
syncRequestInFlight = false
clearSyncRequestTimeout()
setSyncInProgress(false)
resyncRequiredAfterAccountInit = false
scope.cancel()