Стабилизация sync и логов: heartbeat антиспам + Connection Logs через rosettadev2
Some checks failed
Android Kernel Build / build (push) Has been cancelled
Some checks failed
Android Kernel Build / build (push) Has been cancelled
This commit is contained in:
@@ -1200,6 +1200,11 @@ fun MainScreen(
|
|||||||
},
|
},
|
||||||
onNavigateToCrashLogs = {
|
onNavigateToCrashLogs = {
|
||||||
navStack = navStack.filterNot { it is Screen.Search } + Screen.CrashLogs
|
navStack = navStack.filterNot { it is Screen.Search } + Screen.CrashLogs
|
||||||
|
},
|
||||||
|
onNavigateToConnectionLogs = {
|
||||||
|
navStack =
|
||||||
|
navStack.filterNot { it is Screen.Search } +
|
||||||
|
Screen.ConnectionLogs
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -871,11 +871,10 @@ class MessageRepository private constructor(private val context: Context) {
|
|||||||
unreadCount = dialog?.unreadCount ?: 0
|
unreadCount = dialog?.unreadCount ?: 0
|
||||||
)
|
)
|
||||||
|
|
||||||
// 🔥 Запрашиваем информацию о пользователе для отображения имени вместо ключа
|
// 🔥 Запрашиваем информацию о пользователе для отображения имени вместо ключа.
|
||||||
// Desktop parity: always re-fetch on incoming message so renamed contacts
|
// Важно: не форсим повторный запрос на каждый входящий пакет — это создает
|
||||||
// get their new name/username updated in the chat list.
|
// шторм PacketSearch во время sync и заметно тормозит обработку.
|
||||||
if (!isGroupDialogKey(dialogOpponentKey)) {
|
if (!isGroupDialogKey(dialogOpponentKey)) {
|
||||||
requestedUserInfoKeys.remove(dialogOpponentKey)
|
|
||||||
requestUserInfo(dialogOpponentKey)
|
requestUserInfo(dialogOpponentKey)
|
||||||
} else {
|
} else {
|
||||||
applyGroupDisplayNameToDialog(account, dialogOpponentKey)
|
applyGroupDisplayNameToDialog(account, dialogOpponentKey)
|
||||||
|
|||||||
@@ -33,6 +33,9 @@ class Protocol(
|
|||||||
private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве)
|
private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве)
|
||||||
private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds
|
private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds
|
||||||
private const val MIN_PACKET_ID_BITS = 18 // Stream.readInt16() = 2 * readInt8() (9 bits each)
|
private const val MIN_PACKET_ID_BITS = 18 // Stream.readInt16() = 2 * readInt8() (9 bits each)
|
||||||
|
private const val DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 15
|
||||||
|
private const val MIN_HEARTBEAT_SEND_INTERVAL_MS = 2_000L
|
||||||
|
private const val HEARTBEAT_OK_LOG_THROTTLE_MS = 30_000L
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun log(message: String) {
|
private fun log(message: String) {
|
||||||
@@ -112,6 +115,9 @@ class Protocol(
|
|||||||
|
|
||||||
// Heartbeat
|
// Heartbeat
|
||||||
private var heartbeatJob: Job? = null
|
private var heartbeatJob: Job? = null
|
||||||
|
@Volatile private var heartbeatPeriodMs: Long = 0L
|
||||||
|
@Volatile private var lastHeartbeatOkLogAtMs: Long = 0L
|
||||||
|
@Volatile private var heartbeatOkSuppressedCount: Int = 0
|
||||||
|
|
||||||
// Supported packets
|
// Supported packets
|
||||||
private val supportedPackets = mapOf(
|
private val supportedPackets = mapOf(
|
||||||
@@ -179,11 +185,24 @@ class Protocol(
|
|||||||
* Как в Архиве - отправляем text "heartbeat" СРАЗУ и потом с интервалом
|
* Как в Архиве - отправляем text "heartbeat" СРАЗУ и потом с интервалом
|
||||||
*/
|
*/
|
||||||
private fun startHeartbeat(intervalSeconds: Int) {
|
private fun startHeartbeat(intervalSeconds: Int) {
|
||||||
heartbeatJob?.cancel()
|
val normalizedServerIntervalSec =
|
||||||
|
if (intervalSeconds > 0) intervalSeconds else DEFAULT_HEARTBEAT_INTERVAL_SECONDS
|
||||||
|
// Отправляем чаще - каждые 1/3 интервала, но с нижним лимитом чтобы исключить tight-loop.
|
||||||
|
val intervalMs =
|
||||||
|
((normalizedServerIntervalSec * 1000L) / 3).coerceAtLeast(MIN_HEARTBEAT_SEND_INTERVAL_MS)
|
||||||
|
|
||||||
// Отправляем чаще - каждые 1/3 интервала (чтобы не терять соединение)
|
if (heartbeatJob?.isActive == true && heartbeatPeriodMs == intervalMs) {
|
||||||
val intervalMs = (intervalSeconds * 1000L) / 3
|
return
|
||||||
log("💓 HEARTBEAT START: server=${intervalSeconds}s, sending=${intervalMs/1000}s, state=${_state.value}")
|
}
|
||||||
|
|
||||||
|
heartbeatJob?.cancel()
|
||||||
|
heartbeatPeriodMs = intervalMs
|
||||||
|
lastHeartbeatOkLogAtMs = 0L
|
||||||
|
heartbeatOkSuppressedCount = 0
|
||||||
|
log(
|
||||||
|
"💓 HEARTBEAT START: server=${intervalSeconds}s(normalized=${normalizedServerIntervalSec}s), " +
|
||||||
|
"sending=${intervalMs / 1000}s, state=${_state.value}"
|
||||||
|
)
|
||||||
|
|
||||||
heartbeatJob = scope.launch {
|
heartbeatJob = scope.launch {
|
||||||
// ⚡ СРАЗУ отправляем первый heartbeat (как в Архиве)
|
// ⚡ СРАЗУ отправляем первый heartbeat (как в Архиве)
|
||||||
@@ -210,7 +229,17 @@ class Protocol(
|
|||||||
) {
|
) {
|
||||||
val sent = webSocket?.send("heartbeat") ?: false
|
val sent = webSocket?.send("heartbeat") ?: false
|
||||||
if (sent) {
|
if (sent) {
|
||||||
log("💓 Heartbeat OK (socket=$socketAlive, state=$currentState)")
|
val now = System.currentTimeMillis()
|
||||||
|
if (now - lastHeartbeatOkLogAtMs >= HEARTBEAT_OK_LOG_THROTTLE_MS) {
|
||||||
|
val suppressed = heartbeatOkSuppressedCount
|
||||||
|
heartbeatOkSuppressedCount = 0
|
||||||
|
lastHeartbeatOkLogAtMs = now
|
||||||
|
val suffix =
|
||||||
|
if (suppressed > 0) ", +$suppressed suppressed" else ""
|
||||||
|
log("💓 Heartbeat OK (socket=$socketAlive, state=$currentState$suffix)")
|
||||||
|
} else {
|
||||||
|
heartbeatOkSuppressedCount++
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
log("💔 HEARTBEAT FAILED: socket=$socketAlive, state=$currentState, manuallyClosed=$isManuallyClosed")
|
log("💔 HEARTBEAT FAILED: socket=$socketAlive, state=$currentState, manuallyClosed=$isManuallyClosed")
|
||||||
// Триггерим reconnect если heartbeat не прошёл
|
// Триггерим reconnect если heartbeat не прошёл
|
||||||
@@ -573,6 +602,7 @@ class Protocol(
|
|||||||
handshakeComplete = false
|
handshakeComplete = false
|
||||||
handshakeJob?.cancel()
|
handshakeJob?.cancel()
|
||||||
heartbeatJob?.cancel()
|
heartbeatJob?.cancel()
|
||||||
|
heartbeatPeriodMs = 0L
|
||||||
|
|
||||||
// Автоматический reconnect с защитой от бесконечных попыток
|
// Автоматический reconnect с защитой от бесконечных попыток
|
||||||
if (!isManuallyClosed) {
|
if (!isManuallyClosed) {
|
||||||
@@ -628,6 +658,7 @@ class Protocol(
|
|||||||
reconnectJob = null
|
reconnectJob = null
|
||||||
handshakeJob?.cancel()
|
handshakeJob?.cancel()
|
||||||
heartbeatJob?.cancel()
|
heartbeatJob?.cancel()
|
||||||
|
heartbeatPeriodMs = 0L
|
||||||
webSocket?.close(1000, "User disconnected")
|
webSocket?.close(1000, "User disconnected")
|
||||||
webSocket = null
|
webSocket = null
|
||||||
_state.value = ProtocolState.DISCONNECTED
|
_state.value = ProtocolState.DISCONNECTED
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import com.rosetta.messenger.data.AccountManager
|
|||||||
import com.rosetta.messenger.data.GroupRepository
|
import com.rosetta.messenger.data.GroupRepository
|
||||||
import com.rosetta.messenger.data.MessageRepository
|
import com.rosetta.messenger.data.MessageRepository
|
||||||
import com.rosetta.messenger.data.isPlaceholderAccountName
|
import com.rosetta.messenger.data.isPlaceholderAccountName
|
||||||
|
import com.rosetta.messenger.utils.MessageLogger
|
||||||
import kotlinx.coroutines.*
|
import kotlinx.coroutines.*
|
||||||
import kotlinx.coroutines.channels.Channel
|
import kotlinx.coroutines.channels.Channel
|
||||||
import kotlinx.coroutines.flow.MutableStateFlow
|
import kotlinx.coroutines.flow.MutableStateFlow
|
||||||
@@ -30,6 +31,7 @@ object ProtocolManager {
|
|||||||
private const val SYNC_REQUEST_TIMEOUT_MS = 12_000L
|
private const val SYNC_REQUEST_TIMEOUT_MS = 12_000L
|
||||||
private const val MAX_DEBUG_LOGS = 600
|
private const val MAX_DEBUG_LOGS = 600
|
||||||
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
||||||
|
private const val HEARTBEAT_OK_LOG_MIN_INTERVAL_MS = 5_000L
|
||||||
private const val TYPING_INDICATOR_TIMEOUT_MS = 3_000L
|
private const val TYPING_INDICATOR_TIMEOUT_MS = 3_000L
|
||||||
|
|
||||||
// Desktop parity: use the same primary WebSocket endpoint as desktop client.
|
// Desktop parity: use the same primary WebSocket endpoint as desktop client.
|
||||||
@@ -58,6 +60,8 @@ object ProtocolManager {
|
|||||||
private val debugLogsLock = Any()
|
private val debugLogsLock = Any()
|
||||||
@Volatile private var debugFlushJob: Job? = null
|
@Volatile private var debugFlushJob: Job? = null
|
||||||
private val debugFlushPending = AtomicBoolean(false)
|
private val debugFlushPending = AtomicBoolean(false)
|
||||||
|
@Volatile private var lastHeartbeatOkLogAtMs: Long = 0L
|
||||||
|
@Volatile private var suppressedHeartbeatOkLogs: Int = 0
|
||||||
|
|
||||||
// Typing status
|
// Typing status
|
||||||
private val _typingUsers = MutableStateFlow<Set<String>>(emptySet())
|
private val _typingUsers = MutableStateFlow<Set<String>>(emptySet())
|
||||||
@@ -89,8 +93,8 @@ object ProtocolManager {
|
|||||||
private fun normalizeSearchQuery(value: String): String =
|
private fun normalizeSearchQuery(value: String): String =
|
||||||
value.trim().removePrefix("@").lowercase(Locale.ROOT)
|
value.trim().removePrefix("@").lowercase(Locale.ROOT)
|
||||||
|
|
||||||
// UI logs are enabled by default; updates are throttled and bounded by MAX_DEBUG_LOGS.
|
// Keep heavy protocol/message UI logs disabled by default.
|
||||||
private var uiLogsEnabled = true
|
private var uiLogsEnabled = false
|
||||||
private var lastProtocolState: ProtocolState? = null
|
private var lastProtocolState: ProtocolState? = null
|
||||||
@Volatile private var syncBatchInProgress = false
|
@Volatile private var syncBatchInProgress = false
|
||||||
private val _syncInProgress = MutableStateFlow(false)
|
private val _syncInProgress = MutableStateFlow(false)
|
||||||
@@ -128,9 +132,23 @@ object ProtocolManager {
|
|||||||
|
|
||||||
fun addLog(message: String) {
|
fun addLog(message: String) {
|
||||||
if (!uiLogsEnabled) return
|
if (!uiLogsEnabled) return
|
||||||
|
var normalizedMessage = message
|
||||||
|
val isHeartbeatOk = normalizedMessage.startsWith("💓 Heartbeat OK")
|
||||||
|
if (isHeartbeatOk) {
|
||||||
|
val now = System.currentTimeMillis()
|
||||||
|
if (now - lastHeartbeatOkLogAtMs < HEARTBEAT_OK_LOG_MIN_INTERVAL_MS) {
|
||||||
|
suppressedHeartbeatOkLogs++
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if (suppressedHeartbeatOkLogs > 0) {
|
||||||
|
normalizedMessage = "$normalizedMessage (+${suppressedHeartbeatOkLogs} skipped)"
|
||||||
|
suppressedHeartbeatOkLogs = 0
|
||||||
|
}
|
||||||
|
lastHeartbeatOkLogAtMs = now
|
||||||
|
}
|
||||||
val timestamp =
|
val timestamp =
|
||||||
java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date())
|
java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date())
|
||||||
val line = "[$timestamp] $message"
|
val line = "[$timestamp] $normalizedMessage"
|
||||||
synchronized(debugLogsLock) {
|
synchronized(debugLogsLock) {
|
||||||
if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) {
|
if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) {
|
||||||
debugLogsBuffer.removeFirst()
|
debugLogsBuffer.removeFirst()
|
||||||
@@ -142,6 +160,7 @@ object ProtocolManager {
|
|||||||
|
|
||||||
fun enableUILogs(enabled: Boolean) {
|
fun enableUILogs(enabled: Boolean) {
|
||||||
uiLogsEnabled = enabled
|
uiLogsEnabled = enabled
|
||||||
|
MessageLogger.setEnabled(enabled)
|
||||||
if (enabled) {
|
if (enabled) {
|
||||||
val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() }
|
val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() }
|
||||||
_debugLogs.value = snapshot
|
_debugLogs.value = snapshot
|
||||||
@@ -154,6 +173,8 @@ object ProtocolManager {
|
|||||||
synchronized(debugLogsLock) {
|
synchronized(debugLogsLock) {
|
||||||
debugLogsBuffer.clear()
|
debugLogsBuffer.clear()
|
||||||
}
|
}
|
||||||
|
suppressedHeartbeatOkLogs = 0
|
||||||
|
lastHeartbeatOkLogAtMs = 0L
|
||||||
_debugLogs.value = emptyList()
|
_debugLogs.value = emptyList()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -40,9 +40,16 @@ fun ConnectionLogsScreen(
|
|||||||
val listState = rememberLazyListState()
|
val listState = rememberLazyListState()
|
||||||
val scope = rememberCoroutineScope()
|
val scope = rememberCoroutineScope()
|
||||||
|
|
||||||
|
DisposableEffect(Unit) {
|
||||||
|
ProtocolManager.enableUILogs(true)
|
||||||
|
onDispose {
|
||||||
|
ProtocolManager.enableUILogs(false)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
LaunchedEffect(logs.size) {
|
LaunchedEffect(logs.size) {
|
||||||
if (logs.isNotEmpty()) {
|
if (logs.isNotEmpty()) {
|
||||||
listState.animateScrollToItem(logs.size - 1)
|
listState.scrollToItem(logs.size - 1)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -89,7 +96,7 @@ fun ConnectionLogsScreen(
|
|||||||
|
|
||||||
IconButton(onClick = {
|
IconButton(onClick = {
|
||||||
scope.launch {
|
scope.launch {
|
||||||
if (logs.isNotEmpty()) listState.animateScrollToItem(logs.size - 1)
|
if (logs.isNotEmpty()) listState.scrollToItem(logs.size - 1)
|
||||||
}
|
}
|
||||||
}) {
|
}) {
|
||||||
Icon(
|
Icon(
|
||||||
|
|||||||
@@ -101,7 +101,8 @@ fun SearchScreen(
|
|||||||
protocolState: ProtocolState,
|
protocolState: ProtocolState,
|
||||||
onBackClick: () -> Unit,
|
onBackClick: () -> Unit,
|
||||||
onUserSelect: (SearchUser) -> Unit,
|
onUserSelect: (SearchUser) -> Unit,
|
||||||
onNavigateToCrashLogs: () -> Unit = {}
|
onNavigateToCrashLogs: () -> Unit = {},
|
||||||
|
onNavigateToConnectionLogs: () -> Unit = {}
|
||||||
) {
|
) {
|
||||||
// Context и View для мгновенного закрытия клавиатуры
|
// Context и View для мгновенного закрытия клавиатуры
|
||||||
val context = LocalContext.current
|
val context = LocalContext.current
|
||||||
@@ -150,6 +151,11 @@ fun SearchScreen(
|
|||||||
if (searchQuery.trim().equals("rosettadev1", ignoreCase = true)) {
|
if (searchQuery.trim().equals("rosettadev1", ignoreCase = true)) {
|
||||||
searchViewModel.clearSearchQuery()
|
searchViewModel.clearSearchQuery()
|
||||||
onNavigateToCrashLogs()
|
onNavigateToCrashLogs()
|
||||||
|
return@LaunchedEffect
|
||||||
|
}
|
||||||
|
if (searchQuery.trim().equals("rosettadev2", ignoreCase = true)) {
|
||||||
|
searchViewModel.clearSearchQuery()
|
||||||
|
onNavigateToConnectionLogs()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -14,9 +14,12 @@ import com.rosetta.messenger.network.ProtocolManager
|
|||||||
object MessageLogger {
|
object MessageLogger {
|
||||||
private const val TAG = "RosettaMsg"
|
private const val TAG = "RosettaMsg"
|
||||||
|
|
||||||
// Всегда включён — вывод идёт только в ProtocolManager.addLog() (in-memory UI),
|
@Volatile
|
||||||
// не в logcat, безопасно для release
|
private var isEnabled: Boolean = false
|
||||||
private val isEnabled: Boolean = true
|
|
||||||
|
fun setEnabled(enabled: Boolean) {
|
||||||
|
isEnabled = enabled
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Добавить лог в UI (Debug Logs в чате)
|
* Добавить лог в UI (Debug Logs в чате)
|
||||||
|
|||||||
Reference in New Issue
Block a user