Переход Android звонков на новый серверный протокол

This commit is contained in:
2026-04-04 23:18:23 +05:00
parent 7d4b9a8fc4
commit 2bb3281ccf
6 changed files with 179 additions and 82 deletions

View File

@@ -110,13 +110,14 @@ object CallManager {
private var ownPublicKey: String = ""
private var role: CallRole? = null
private var roomId: String = ""
private var serverCallId: String = ""
private var serverJoinToken: String = ""
private var offerSent = false
private var remoteDescriptionSet = false
private var callSessionId: String = ""
private var callStartedAtMs: Long = 0L
private var keyExchangeSent = false
private var createRoomSent = false
private var activeSignalSent = false
private var lastPeerSharedPublicHex: String = ""
private var localPrivateKey: ByteArray? = null
@@ -183,7 +184,8 @@ object CallManager {
if (phase == CallPhase.IDLE) {
val hasResidualSession =
callSessionId.isNotBlank() ||
roomId.isNotBlank() ||
serverCallId.isNotBlank() ||
serverJoinToken.isNotBlank() ||
role != null ||
_state.value.peerPublicKey.isNotBlank() ||
sharedKeyBytes != null ||
@@ -214,7 +216,12 @@ object CallManager {
* Ставит CallManager в INCOMING сразу, не дожидаясь WebSocket сигнала.
* Если WebSocket CALL придёт позже — дедупликация его отбросит.
*/
fun setIncomingFromPush(peerPublicKey: String, peerTitle: String) {
fun setIncomingFromPush(
peerPublicKey: String,
peerTitle: String,
callId: String = "",
joinToken: String = ""
) {
val peer = peerPublicKey.trim()
if (peer.isBlank()) return
// Уже в звонке — не перебиваем
@@ -222,7 +229,12 @@ object CallManager {
breadcrumb("setIncomingFromPush SKIP: phase=${_state.value.phase}")
return
}
breadcrumb("setIncomingFromPush peer=${peer.take(8)}… title=$peerTitle")
serverCallId = callId.trim()
serverJoinToken = joinToken.trim()
breadcrumb(
"setIncomingFromPush peer=${peer.take(8)}… title=$peerTitle " +
"callId=${serverCallId.take(12)} join=${serverJoinToken.take(12)}"
)
beginCallSession("incoming-push:${peer.take(8)}")
role = CallRole.CALLEE
resetRtcObjects()
@@ -299,7 +311,6 @@ object CallManager {
role = CallRole.CALLEE
generateSessionKeys()
val localPublic = localPublicKey ?: return CallActionResult.INVALID_TARGET
incomingRingTimeoutJob?.cancel()
incomingRingTimeoutJob = null
@@ -312,19 +323,30 @@ object CallManager {
}
armConnectingTimeout("acceptIncomingCall")
// Отправляем KEY_EXCHANGE — если WebSocket не подключен, ждём и ретраим
// Отправляем ACCEPT с callId/joinToken. Если push пришел раньше WS CALL,
// подождем немного пока идентификаторы звонка подтянутся.
scope.launch {
var sent = false
for (attempt in 1..30) { // 30 * 200ms = 6 sec
val callIdNow = serverCallId.trim()
val joinTokenNow = serverJoinToken.trim()
if (callIdNow.isBlank() || joinTokenNow.isBlank()) {
breadcrumb("acceptIncomingCall: waiting callId/joinToken (attempt #$attempt)")
kotlinx.coroutines.delay(200)
continue
}
if (ProtocolManager.isAuthenticated()) {
ProtocolManager.sendCallSignal(
signalType = SignalType.KEY_EXCHANGE,
signalType = SignalType.ACCEPT,
src = ownPublicKey,
dst = snapshot.peerPublicKey,
sharedPublic = localPublic.toHex()
callId = callIdNow,
joinToken = joinTokenNow
)
breadcrumb(
"acceptIncomingCall: ACCEPT sent (attempt #$attempt) " +
"callId=${callIdNow.take(12)} join=${joinTokenNow.take(12)}"
)
keyExchangeSent = true
breadcrumb("acceptIncomingCall: KEY_EXCHANGE sent (attempt #$attempt)")
sent = true
break
}
@@ -332,7 +354,7 @@ object CallManager {
kotlinx.coroutines.delay(200)
}
if (!sent) {
breadcrumb("acceptIncomingCall: FAILED to send KEY_EXCHANGE after 6s — resetting")
breadcrumb("acceptIncomingCall: FAILED to send ACCEPT after 6s — resetting")
resetSession(reason = "Failed to connect", notifyPeer = false)
}
}
@@ -346,11 +368,20 @@ object CallManager {
if (snapshot.phase != CallPhase.INCOMING) return
incomingRingTimeoutJob?.cancel()
incomingRingTimeoutJob = null
if (ownPublicKey.isNotBlank() && snapshot.peerPublicKey.isNotBlank()) {
val callIdNow = serverCallId.trim()
val joinTokenNow = serverJoinToken.trim()
if (ownPublicKey.isNotBlank() && snapshot.peerPublicKey.isNotBlank() && callIdNow.isNotBlank() && joinTokenNow.isNotBlank()) {
ProtocolManager.sendCallSignal(
signalType = SignalType.END_CALL,
src = ownPublicKey,
dst = snapshot.peerPublicKey
dst = snapshot.peerPublicKey,
callId = callIdNow,
joinToken = joinTokenNow
)
} else {
breadcrumb(
"declineIncomingCall: skip END_CALL (missing ids) " +
"callId=${callIdNow.take(12)} join=${joinTokenNow.take(12)}"
)
}
resetSession(reason = null, notifyPeer = false)
@@ -400,6 +431,11 @@ object CallManager {
resetSession(reason = "Peer disconnected", notifyPeer = false)
return
}
SignalType.RINGING_TIMEOUT -> {
breadcrumb("SIG: ringing timeout → reset")
resetSession(reason = "No answer", notifyPeer = false)
return
}
SignalType.END_CALL -> {
breadcrumb("SIG: END_CALL → reset")
resetSession(reason = "Call ended", notifyPeer = false)
@@ -419,10 +455,15 @@ object CallManager {
SignalType.CALL -> {
val incomingPeer = packet.src.trim()
if (incomingPeer.isBlank()) return
serverCallId = packet.callId.trim()
serverJoinToken = packet.joinToken.trim()
// Дедупликация: push уже поставил INCOMING для этого peer — обновляем только имя
if (_state.value.phase == CallPhase.INCOMING && _state.value.peerPublicKey == incomingPeer) {
breadcrumb("SIG: CALL from ${incomingPeer.take(8)}… but already INCOMING — dedup")
breadcrumb(
"SIG: CALL from ${incomingPeer.take(8)}… but already INCOMING — dedup " +
"callId=${serverCallId.take(12)} join=${serverJoinToken.take(12)}"
)
resolvePeerIdentity(incomingPeer)
return
}
@@ -439,7 +480,10 @@ object CallManager {
return
}
beginCallSession("incoming:${incomingPeer.take(8)}")
breadcrumb("SIG: CALL from ${incomingPeer.take(8)}… → INCOMING")
breadcrumb(
"SIG: CALL from ${incomingPeer.take(8)}… → INCOMING " +
"callId=${serverCallId.take(12)} join=${serverJoinToken.take(12)}"
)
role = CallRole.CALLEE
resetRtcObjects()
// Пробуем сразу взять имя из кэша чтобы ForegroundService показал его
@@ -490,29 +534,51 @@ object CallManager {
breadcrumb("SIG: KEY_EXCHANGE → handleKeyExchange")
handleKeyExchange(packet)
}
SignalType.CREATE_ROOM -> {
val incomingRoomId = packet.roomId.trim()
breadcrumb("SIG: CREATE_ROOM roomId=${incomingRoomId.take(16)}")
if (incomingRoomId.isBlank()) {
breadcrumb("SIG: CREATE_ROOM IGNORED — empty roomId!")
SignalType.ACCEPT -> {
breadcrumb(
"SIG: ACCEPT callId=${packet.callId.take(12)} join=${packet.joinToken.take(12)}"
)
serverCallId = packet.callId.trim()
serverJoinToken = packet.joinToken.trim()
if (role != CallRole.CALLER) {
breadcrumb("SIG: ACCEPT ignored — role=$role")
return
}
// Если ключей нет — звонок был принят на другом устройстве,
// а сервер всё равно прислал CREATE_ROOM. Сбрасываем.
if (localPrivateKey == null || localPublicKey == null) {
breadcrumb("SIG: ACCEPT — generating local session keys")
generateSessionKeys()
}
val localPublic = localPublicKey ?: return
ProtocolManager.sendCallSignal(
signalType = SignalType.KEY_EXCHANGE,
src = ownPublicKey,
dst = _state.value.peerPublicKey,
sharedPublic = localPublic.toHex()
)
keyExchangeSent = true
updateState {
it.copy(
phase = CallPhase.CONNECTING,
statusText = "Exchanging keys..."
)
}
armConnectingTimeout("signal:accept")
}
SignalType.ACTIVE -> {
breadcrumb("SIG: ACTIVE")
if (sharedKeyBytes == null && localPrivateKey == null) {
breadcrumb("SIG: CREATE_ROOM but no session keys — call accepted on another device, resetting")
breadcrumb("SIG: ACTIVE but no session keys — resetting")
CallSoundManager.stop()
resetSession(reason = null, notifyPeer = false)
return
}
roomId = incomingRoomId
updateState {
it.copy(
phase = CallPhase.CONNECTING,
statusText = "Connecting..."
)
}
armConnectingTimeout("signal:create_room")
armConnectingTimeout("signal:active")
ensurePeerConnectionAndOffer()
}
SignalType.ACTIVE_CALL -> Unit
@@ -552,25 +618,15 @@ object CallManager {
return
}
setupE2EE(sharedKey)
breadcrumb("KE: CALLER — E2EE ready, sending missing signaling packets")
updateState { it.copy(keyCast = sharedKey, statusText = "Creating room...") }
val localPublic = localPublicKey ?: return
if (!keyExchangeSent) {
breadcrumb("KE: CALLER — E2EE ready, notifying ACTIVE")
updateState { it.copy(keyCast = sharedKey, statusText = "Connecting...") }
if (!activeSignalSent) {
ProtocolManager.sendCallSignal(
signalType = SignalType.KEY_EXCHANGE,
src = ownPublicKey,
dst = peerKey,
sharedPublic = localPublic.toHex()
)
keyExchangeSent = true
}
if (!createRoomSent) {
ProtocolManager.sendCallSignal(
signalType = SignalType.CREATE_ROOM,
signalType = SignalType.ACTIVE,
src = ownPublicKey,
dst = peerKey
)
createRoomSent = true
activeSignalSent = true
}
updateState { it.copy(phase = CallPhase.CONNECTING) }
armConnectingTimeout("key_exchange:caller")
@@ -588,10 +644,23 @@ object CallManager {
return
}
setupE2EE(sharedKey)
breadcrumb("KE: CALLEE — E2EE ready, waiting for CREATE_ROOM")
if (!keyExchangeSent) {
val localPublic = localPublicKey ?: return
ProtocolManager.sendCallSignal(
signalType = SignalType.KEY_EXCHANGE,
src = ownPublicKey,
dst = peerKey,
sharedPublic = localPublic.toHex()
)
keyExchangeSent = true
}
breadcrumb("KE: CALLEE — E2EE ready, waiting for ACTIVE")
updateState { it.copy(keyCast = sharedKey, phase = CallPhase.CONNECTING) }
armConnectingTimeout("key_exchange:callee")
return
}
breadcrumb("KE: ignored — unknown role")
}
private suspend fun handleWebRtcPacket(packet: PacketWebRTC) {
@@ -710,8 +779,8 @@ object CallManager {
private suspend fun ensurePeerConnectionAndOffer() {
val peerKey = _state.value.peerPublicKey
if (peerKey.isBlank() || roomId.isBlank()) {
breadcrumb("PC: ensurePCAndOffer SKIP — peer=${peerKey.take(8)} room=${roomId.take(8)}")
if (peerKey.isBlank()) {
breadcrumb("PC: ensurePCAndOffer SKIP — peer=${peerKey.take(8)}")
return
}
if (offerSent) {
@@ -719,7 +788,7 @@ object CallManager {
return
}
breadcrumb("PC: ensurePCAndOffer START role=$role room=${roomId.take(8)}")
breadcrumb("PC: ensurePCAndOffer START role=$role")
ensurePeerFactory()
val factory = peerConnectionFactory
if (factory == null) {
@@ -897,8 +966,8 @@ object CallManager {
val snapshot = _state.value
if (snapshot.phase != CallPhase.CONNECTING) return@launch
breadcrumb(
"CONNECTING TIMEOUT origin=$origin role=$role room=${roomId.take(12)} " +
"keyExSent=$keyExchangeSent createRoomSent=$createRoomSent offerSent=$offerSent " +
"CONNECTING TIMEOUT origin=$origin role=$role callId=${serverCallId.take(12)} join=${serverJoinToken.take(12)} " +
"keyExSent=$keyExchangeSent activeSent=$activeSignalSent offerSent=$offerSent " +
"remoteDesc=$remoteDescriptionSet peer=${snapshot.peerPublicKey.take(8)}"
)
resetSession(reason = "Connecting timeout", notifyPeer = false)
@@ -983,7 +1052,9 @@ object CallManager {
ProtocolManager.sendCallSignal(
signalType = SignalType.END_CALL,
src = ownPublicKey,
dst = peerToNotify
dst = peerToNotify,
callId = serverCallId,
joinToken = serverJoinToken
)
}
// Отменяем все jobs ПЕРВЫМИ — чтобы они не вызвали updateState с пустым state
@@ -1011,11 +1082,12 @@ object CallManager {
lastHealthLog = ""
healthLogCount = 0
role = null
roomId = ""
serverCallId = ""
serverJoinToken = ""
offerSent = false
remoteDescriptionSet = false
keyExchangeSent = false
createRoomSent = false
activeSignalSent = false
lastPeerSharedPublicHex = ""
lastRemoteOfferFingerprint = ""
lastLocalOfferFingerprint = ""
@@ -1273,7 +1345,8 @@ object CallManager {
append(" phase=").append(st.phase)
append(" role=").append(role)
append(" peer=").append(st.peerPublicKey.take(12))
append(" room=").append(roomId.take(16))
append(" callId=").append(serverCallId.take(16))
append(" join=").append(serverJoinToken.take(16))
append(" offerSent=").append(offerSent)
append(" remoteDescSet=").append(remoteDescriptionSet)
append(" e2eeAvail=").append(e2eeAvailable)

View File

@@ -5,9 +5,11 @@ enum class SignalType(val value: Int) {
KEY_EXCHANGE(1),
ACTIVE_CALL(2),
END_CALL(3),
CREATE_ROOM(4),
ACTIVE(4),
END_CALL_BECAUSE_PEER_DISCONNECTED(5),
END_CALL_BECAUSE_BUSY(6);
END_CALL_BECAUSE_BUSY(6),
ACCEPT(7),
RINGING_TIMEOUT(8);
companion object {
fun fromValue(value: Int): SignalType =
@@ -25,7 +27,8 @@ class PacketSignalPeer : Packet() {
var dst: String = ""
var sharedPublic: String = ""
var signalType: SignalType = SignalType.CALL
var roomId: String = ""
var callId: String = ""
var joinToken: String = ""
override fun getPacketId(): Int = 0x1A
@@ -33,7 +36,8 @@ class PacketSignalPeer : Packet() {
signalType = SignalType.fromValue(stream.readInt8())
if (
signalType == SignalType.END_CALL_BECAUSE_BUSY ||
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED ||
signalType == SignalType.RINGING_TIMEOUT
) {
return
}
@@ -42,8 +46,13 @@ class PacketSignalPeer : Packet() {
if (signalType == SignalType.KEY_EXCHANGE) {
sharedPublic = stream.readString()
}
if (signalType == SignalType.CREATE_ROOM) {
roomId = stream.readString()
if (
signalType == SignalType.CALL ||
signalType == SignalType.ACCEPT ||
signalType == SignalType.END_CALL
) {
callId = stream.readString()
joinToken = stream.readString()
}
}
@@ -53,7 +62,8 @@ class PacketSignalPeer : Packet() {
stream.writeInt8(signalType.value)
if (
signalType == SignalType.END_CALL_BECAUSE_BUSY ||
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED
signalType == SignalType.END_CALL_BECAUSE_PEER_DISCONNECTED ||
signalType == SignalType.RINGING_TIMEOUT
) {
return stream
}
@@ -62,8 +72,13 @@ class PacketSignalPeer : Packet() {
if (signalType == SignalType.KEY_EXCHANGE) {
stream.writeString(sharedPublic)
}
if (signalType == SignalType.CREATE_ROOM) {
stream.writeString(roomId)
if (
signalType == SignalType.CALL ||
signalType == SignalType.ACCEPT ||
signalType == SignalType.END_CALL
) {
stream.writeString(callId)
stream.writeString(joinToken)
}
return stream
}

View File

@@ -19,16 +19,12 @@ enum class WebRTCSignalType(val value: Int) {
class PacketWebRTC : Packet() {
var signalType: WebRTCSignalType = WebRTCSignalType.OFFER
var sdpOrCandidate: String = ""
var publicKey: String = ""
var deviceId: String = ""
override fun getPacketId(): Int = 0x1B
override fun receive(stream: Stream) {
signalType = WebRTCSignalType.fromValue(stream.readInt8())
sdpOrCandidate = stream.readString()
publicKey = stream.readString()
deviceId = stream.readString()
}
override fun send(): Stream {
@@ -36,8 +32,6 @@ class PacketWebRTC : Packet() {
stream.writeInt16(getPacketId())
stream.writeInt8(signalType.value)
stream.writeString(sdpOrCandidate)
stream.writeString(publicKey)
stream.writeString(deviceId)
return stream
}
}

View File

@@ -135,10 +135,9 @@ class Protocol(
"status=${packet.status} ts=${packet.timestamp}"
is PacketSignalPeer ->
"type=${packet.signalType} src=${shortKey(packet.src)} dst=${shortKey(packet.dst)} " +
"sharedLen=${packet.sharedPublic.length} room=${shortKey(packet.roomId, 12)}"
"sharedLen=${packet.sharedPublic.length} callId=${shortKey(packet.callId, 12)} join=${shortKey(packet.joinToken, 12)}"
is PacketWebRTC ->
"type=${packet.signalType} sdpLen=${packet.sdpOrCandidate.length} " +
"pk=${shortKey(packet.publicKey)} device=${shortKey(packet.deviceId, 12)} " +
"preview='${shortText(packet.sdpOrCandidate, 64)}'"
is PacketIceServers ->
"count=${packet.iceServers.size} firstUrl='${packet.iceServers.firstOrNull()?.url?.let { shortText(it, 40) } ?: "<none>"}'"

View File

@@ -1373,11 +1373,12 @@ object ProtocolManager {
src: String = "",
dst: String = "",
sharedPublic: String = "",
roomId: String = ""
callId: String = "",
joinToken: String = ""
) {
addLog(
"📡 CALL TX type=$signalType src=${shortKeyForLog(src)} dst=${shortKeyForLog(dst)} " +
"sharedLen=${sharedPublic.length} room=${shortKeyForLog(roomId, 12)}"
"sharedLen=${sharedPublic.length} callId=${shortKeyForLog(callId, 12)} join=${shortKeyForLog(joinToken, 12)}"
)
send(
PacketSignalPeer().apply {
@@ -1385,7 +1386,8 @@ object ProtocolManager {
this.src = src
this.dst = dst
this.sharedPublic = sharedPublic
this.roomId = roomId
this.callId = callId
this.joinToken = joinToken
}
)
}
@@ -1394,19 +1396,14 @@ object ProtocolManager {
* Send WebRTC signaling packet (0x1B).
*/
fun sendWebRtcSignal(signalType: WebRTCSignalType, sdpOrCandidate: String) {
val pk = try { getProtocol().getPublicKey().orEmpty() } catch (_: Exception) { "" }
val did = appContext?.let { getOrCreateDeviceId(it) } ?: ""
addLog(
"📡 WEBRTC TX type=$signalType sdpLen=${sdpOrCandidate.length} " +
"pk=${shortKeyForLog(pk)} did=${shortKeyForLog(did, 12)} " +
"preview='${shortTextForLog(sdpOrCandidate, 56)}'"
)
send(
PacketWebRTC().apply {
this.signalType = signalType
this.sdpOrCandidate = sdpOrCandidate
this.publicKey = pk
this.deviceId = did
}
)
}
@@ -1428,7 +1425,7 @@ object ProtocolManager {
(packet as? PacketSignalPeer)?.let {
addLog(
"📡 CALL RX type=${it.signalType} src=${shortKeyForLog(it.src)} dst=${shortKeyForLog(it.dst)} " +
"sharedLen=${it.sharedPublic.length} room=${shortKeyForLog(it.roomId, 12)}"
"sharedLen=${it.sharedPublic.length} callId=${shortKeyForLog(it.callId, 12)} join=${shortKeyForLog(it.joinToken, 12)}"
)
callback(it)
}
@@ -1450,7 +1447,6 @@ object ProtocolManager {
(packet as? PacketWebRTC)?.let {
addLog(
"📡 WEBRTC RX type=${it.signalType} sdpLen=${it.sdpOrCandidate.length} " +
"pk=${shortKeyForLog(it.publicKey)} did=${shortKeyForLog(it.deviceId, 12)} " +
"preview='${shortTextForLog(it.sdpOrCandidate, 56)}'"
)
callback(it)

View File

@@ -160,6 +160,8 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
"public_key",
"publicKey"
)
val callId = firstNonBlank(data, "callId", "call_id")
val joinToken = firstNonBlank(data, "joinToken", "join_token")
val senderName =
firstNonBlank(
data,
@@ -203,7 +205,8 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
handleIncomingCallPush(
dialogKey = dialogKey ?: senderPublicKey.orEmpty(),
title = senderName,
body = messagePreview
callId = callId.orEmpty(),
joinToken = joinToken.orEmpty()
)
handledByData = true
}
@@ -395,7 +398,12 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
}
/** Супер push входящего звонка: пробуждаем протокол и запускаем ForegroundService с incoming call UI */
private fun handleIncomingCallPush(dialogKey: String, title: String, body: String) {
private fun handleIncomingCallPush(
dialogKey: String,
title: String,
callId: String,
joinToken: String
) {
pushCallLog("handleIncomingCallPush dialog=$dialogKey title=$title")
wakeProtocolFromPush("call")
@@ -415,7 +423,14 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
return
}
val dedupKey = "call:${normalizedDialog.ifEmpty { "__no_dialog__" }}"
val normalizedCallId = callId.trim()
val normalizedJoinToken = joinToken.trim()
val dedupKey =
if (normalizedCallId.isNotBlank()) {
"call:$normalizedCallId"
} else {
"call:${normalizedDialog.ifEmpty { "__no_dialog__" }}"
}
val now = System.currentTimeMillis()
val lastTs = lastNotifTimestamps[dedupKey]
if (lastTs != null && now - lastTs < DEDUP_WINDOW_MS) {
@@ -428,7 +443,12 @@ class RosettaFirebaseMessagingService : FirebaseMessagingService() {
pushCallLog("resolvedName=$resolvedName, calling setIncomingFromPush")
// Сразу ставим CallManager в INCOMING — не ждём WebSocket
CallManager.setIncomingFromPush(normalizedDialog, resolvedName)
CallManager.setIncomingFromPush(
peerPublicKey = normalizedDialog,
peerTitle = resolvedName,
callId = normalizedCallId,
joinToken = normalizedJoinToken
)
pushCallLog("setIncomingFromPush done, phase=${CallManager.state.value.phase}")
// Пробуем запустить IncomingCallActivity напрямую из FCM