feat: Add delivery confirmation for incoming messages in ProtocolManager

This commit is contained in:
k1ngsterr1
2026-01-12 14:47:36 +05:00
parent a7976c7cf3
commit 99121ce996
11 changed files with 413 additions and 153 deletions

View File

@@ -364,6 +364,7 @@ class PacketRead : Packet() {
/**
* Delivery packet (ID: 0x08)
* Уведомление о доставке сообщения
* Порядок полей как в React Native: toPublicKey, messageId
*/
class PacketDelivery : Packet() {
var messageId: String = ""
@@ -372,15 +373,17 @@ class PacketDelivery : Packet() {
override fun getPacketId(): Int = 0x08
override fun receive(stream: Stream) {
messageId = stream.readString()
// React Native читает: toPublicKey, messageId
toPublicKey = stream.readString()
messageId = stream.readString()
}
override fun send(): Stream {
val stream = Stream()
stream.writeInt16(getPacketId())
stream.writeString(messageId)
// React Native пишет: toPublicKey, messageId
stream.writeString(toPublicKey)
stream.writeString(messageId)
return stream
}
}

View File

@@ -43,6 +43,7 @@ class Protocol(
private val client = OkHttpClient.Builder()
.readTimeout(0, TimeUnit.MILLISECONDS)
.connectTimeout(10, TimeUnit.SECONDS)
.pingInterval(30, TimeUnit.SECONDS) // Автоматический ping/pong для keep-alive
.build()
private var webSocket: WebSocket? = null
@@ -59,8 +60,8 @@ class Protocol(
private val _lastError = MutableStateFlow<String?>(null)
val lastError: StateFlow<String?> = _lastError.asStateFlow()
// Packet waiters - callbacks for specific packet types
private val packetWaiters = mutableMapOf<Int, MutableList<(Packet) -> Unit>>()
// Packet waiters - callbacks for specific packet types (thread-safe)
private val packetWaiters = java.util.concurrent.ConcurrentHashMap<Int, MutableList<(Packet) -> Unit>>()
// Packet queue for packets sent before handshake complete
private val packetQueue = mutableListOf<Packet>()
@@ -104,7 +105,7 @@ class Protocol(
/**
* Start heartbeat to keep connection alive
* Как в Архиве - отправляем text "heartbeat"
* Как в Архиве - отправляем text "heartbeat" СРАЗУ и потом с интервалом
*/
private fun startHeartbeat(intervalSeconds: Int) {
heartbeatJob?.cancel()
@@ -113,18 +114,36 @@ class Protocol(
log("💓 Starting heartbeat with interval: ${intervalSeconds}s (sending every ${intervalMs}ms)")
heartbeatJob = scope.launch {
// ⚡ СРАЗУ отправляем первый heartbeat (как в Архиве)
sendHeartbeat()
while (isActive) {
delay(intervalMs)
try {
if (webSocket?.send("heartbeat") == true) {
log("💓 Heartbeat sent")
} else {
log("💔 Heartbeat failed - socket closed or null")
sendHeartbeat()
}
}
}
/**
* Отправка heartbeat с проверкой состояния
*/
private fun sendHeartbeat() {
try {
if (_state.value == ProtocolState.AUTHENTICATED) {
val sent = webSocket?.send("heartbeat") ?: false
if (sent) {
log("💓 Heartbeat sent")
} else {
log("💔 Heartbeat failed - socket closed or null")
// Триггерим reconnect если heartbeat не прошёл
if (!isManuallyClosed) {
log("🔄 Triggering reconnect due to failed heartbeat")
handleDisconnect()
}
} catch (e: Exception) {
log("💔 Heartbeat error: ${e.message}")
}
}
} catch (e: Exception) {
log("💔 Heartbeat error: ${e.message}")
}
}

View File

@@ -62,6 +62,15 @@ object ProtocolManager {
val messagePacket = packet as PacketMessage
addLog("📩 Incoming message from ${messagePacket.fromPublicKey.take(16)}...")
// ⚡ ВАЖНО: Отправляем подтверждение доставки обратно серверу
// Без этого сервер не будет отправлять следующие сообщения!
val deliveryPacket = PacketDelivery().apply {
messageId = messagePacket.messageId
toPublicKey = messagePacket.fromPublicKey
}
send(deliveryPacket)
addLog("✅ Sent delivery confirmation for message ${messagePacket.messageId.take(16)}...")
scope.launch {
messageRepository?.handleIncomingMessage(messagePacket)
}