Стабилизация sync и логов: heartbeat антиспам + Connection Logs через rosettadev2
This commit is contained in:
@@ -1379,6 +1379,11 @@ fun MainScreen(
|
||||
},
|
||||
onNavigateToCrashLogs = {
|
||||
navStack = navStack.filterNot { it is Screen.Search } + Screen.CrashLogs
|
||||
},
|
||||
onNavigateToConnectionLogs = {
|
||||
navStack =
|
||||
navStack.filterNot { it is Screen.Search } +
|
||||
Screen.ConnectionLogs
|
||||
}
|
||||
)
|
||||
}
|
||||
|
||||
@@ -885,11 +885,10 @@ class MessageRepository private constructor(private val context: Context) {
|
||||
unreadCount = dialog?.unreadCount ?: 0
|
||||
)
|
||||
|
||||
// 🔥 Запрашиваем информацию о пользователе для отображения имени вместо ключа
|
||||
// Desktop parity: always re-fetch on incoming message so renamed contacts
|
||||
// get their new name/username updated in the chat list.
|
||||
// 🔥 Запрашиваем информацию о пользователе для отображения имени вместо ключа.
|
||||
// Важно: не форсим повторный запрос на каждый входящий пакет — это создает
|
||||
// шторм PacketSearch во время sync и заметно тормозит обработку.
|
||||
if (!isGroupDialogKey(dialogOpponentKey)) {
|
||||
requestedUserInfoKeys.remove(dialogOpponentKey)
|
||||
requestUserInfo(dialogOpponentKey)
|
||||
} else {
|
||||
applyGroupDisplayNameToDialog(account, dialogOpponentKey)
|
||||
|
||||
@@ -33,6 +33,9 @@ class Protocol(
|
||||
private const val RECONNECT_INTERVAL = 5000L // 5 seconds (как в Архиве)
|
||||
private const val HANDSHAKE_TIMEOUT = 10000L // 10 seconds
|
||||
private const val MIN_PACKET_ID_BITS = 18 // Stream.readInt16() = 2 * readInt8() (9 bits each)
|
||||
private const val DEFAULT_HEARTBEAT_INTERVAL_SECONDS = 15
|
||||
private const val MIN_HEARTBEAT_SEND_INTERVAL_MS = 2_000L
|
||||
private const val HEARTBEAT_OK_LOG_THROTTLE_MS = 30_000L
|
||||
}
|
||||
|
||||
private fun log(message: String) {
|
||||
@@ -112,6 +115,9 @@ class Protocol(
|
||||
|
||||
// Heartbeat
|
||||
private var heartbeatJob: Job? = null
|
||||
@Volatile private var heartbeatPeriodMs: Long = 0L
|
||||
@Volatile private var lastHeartbeatOkLogAtMs: Long = 0L
|
||||
@Volatile private var heartbeatOkSuppressedCount: Int = 0
|
||||
|
||||
// Supported packets
|
||||
private val supportedPackets = mapOf(
|
||||
@@ -179,11 +185,24 @@ class Protocol(
|
||||
* Как в Архиве - отправляем text "heartbeat" СРАЗУ и потом с интервалом
|
||||
*/
|
||||
private fun startHeartbeat(intervalSeconds: Int) {
|
||||
val normalizedServerIntervalSec =
|
||||
if (intervalSeconds > 0) intervalSeconds else DEFAULT_HEARTBEAT_INTERVAL_SECONDS
|
||||
// Отправляем чаще - каждые 1/3 интервала, но с нижним лимитом чтобы исключить tight-loop.
|
||||
val intervalMs =
|
||||
((normalizedServerIntervalSec * 1000L) / 3).coerceAtLeast(MIN_HEARTBEAT_SEND_INTERVAL_MS)
|
||||
|
||||
if (heartbeatJob?.isActive == true && heartbeatPeriodMs == intervalMs) {
|
||||
return
|
||||
}
|
||||
|
||||
heartbeatJob?.cancel()
|
||||
|
||||
// Отправляем чаще - каждые 1/3 интервала (чтобы не терять соединение)
|
||||
val intervalMs = (intervalSeconds * 1000L) / 3
|
||||
log("💓 HEARTBEAT START: server=${intervalSeconds}s, sending=${intervalMs/1000}s, state=${_state.value}")
|
||||
heartbeatPeriodMs = intervalMs
|
||||
lastHeartbeatOkLogAtMs = 0L
|
||||
heartbeatOkSuppressedCount = 0
|
||||
log(
|
||||
"💓 HEARTBEAT START: server=${intervalSeconds}s(normalized=${normalizedServerIntervalSec}s), " +
|
||||
"sending=${intervalMs / 1000}s, state=${_state.value}"
|
||||
)
|
||||
|
||||
heartbeatJob = scope.launch {
|
||||
// ⚡ СРАЗУ отправляем первый heartbeat (как в Архиве)
|
||||
@@ -210,7 +229,17 @@ class Protocol(
|
||||
) {
|
||||
val sent = webSocket?.send("heartbeat") ?: false
|
||||
if (sent) {
|
||||
log("💓 Heartbeat OK (socket=$socketAlive, state=$currentState)")
|
||||
val now = System.currentTimeMillis()
|
||||
if (now - lastHeartbeatOkLogAtMs >= HEARTBEAT_OK_LOG_THROTTLE_MS) {
|
||||
val suppressed = heartbeatOkSuppressedCount
|
||||
heartbeatOkSuppressedCount = 0
|
||||
lastHeartbeatOkLogAtMs = now
|
||||
val suffix =
|
||||
if (suppressed > 0) ", +$suppressed suppressed" else ""
|
||||
log("💓 Heartbeat OK (socket=$socketAlive, state=$currentState$suffix)")
|
||||
} else {
|
||||
heartbeatOkSuppressedCount++
|
||||
}
|
||||
} else {
|
||||
log("💔 HEARTBEAT FAILED: socket=$socketAlive, state=$currentState, manuallyClosed=$isManuallyClosed")
|
||||
// Триггерим reconnect если heartbeat не прошёл
|
||||
@@ -573,6 +602,7 @@ class Protocol(
|
||||
handshakeComplete = false
|
||||
handshakeJob?.cancel()
|
||||
heartbeatJob?.cancel()
|
||||
heartbeatPeriodMs = 0L
|
||||
|
||||
// Автоматический reconnect с защитой от бесконечных попыток
|
||||
if (!isManuallyClosed) {
|
||||
@@ -628,6 +658,7 @@ class Protocol(
|
||||
reconnectJob = null
|
||||
handshakeJob?.cancel()
|
||||
heartbeatJob?.cancel()
|
||||
heartbeatPeriodMs = 0L
|
||||
webSocket?.close(1000, "User disconnected")
|
||||
webSocket = null
|
||||
_state.value = ProtocolState.DISCONNECTED
|
||||
|
||||
@@ -6,6 +6,7 @@ import com.rosetta.messenger.data.AccountManager
|
||||
import com.rosetta.messenger.data.GroupRepository
|
||||
import com.rosetta.messenger.data.MessageRepository
|
||||
import com.rosetta.messenger.data.isPlaceholderAccountName
|
||||
import com.rosetta.messenger.utils.MessageLogger
|
||||
import kotlinx.coroutines.*
|
||||
import kotlinx.coroutines.channels.Channel
|
||||
import kotlinx.coroutines.flow.MutableStateFlow
|
||||
@@ -30,6 +31,7 @@ object ProtocolManager {
|
||||
private const val SYNC_REQUEST_TIMEOUT_MS = 12_000L
|
||||
private const val MAX_DEBUG_LOGS = 600
|
||||
private const val DEBUG_LOG_FLUSH_DELAY_MS = 60L
|
||||
private const val HEARTBEAT_OK_LOG_MIN_INTERVAL_MS = 5_000L
|
||||
private const val TYPING_INDICATOR_TIMEOUT_MS = 3_000L
|
||||
private const val PACKET_SIGNAL_PEER = 0x1A
|
||||
private const val PACKET_WEB_RTC = 0x1B
|
||||
@@ -61,6 +63,8 @@ object ProtocolManager {
|
||||
private val debugLogsLock = Any()
|
||||
@Volatile private var debugFlushJob: Job? = null
|
||||
private val debugFlushPending = AtomicBoolean(false)
|
||||
@Volatile private var lastHeartbeatOkLogAtMs: Long = 0L
|
||||
@Volatile private var suppressedHeartbeatOkLogs: Int = 0
|
||||
|
||||
// Typing status
|
||||
private val _typingUsers = MutableStateFlow<Set<String>>(emptySet())
|
||||
@@ -92,8 +96,8 @@ object ProtocolManager {
|
||||
private fun normalizeSearchQuery(value: String): String =
|
||||
value.trim().removePrefix("@").lowercase(Locale.ROOT)
|
||||
|
||||
// UI logs are enabled by default; updates are throttled and bounded by MAX_DEBUG_LOGS.
|
||||
private var uiLogsEnabled = true
|
||||
// Keep heavy protocol/message UI logs disabled by default.
|
||||
private var uiLogsEnabled = false
|
||||
private var lastProtocolState: ProtocolState? = null
|
||||
@Volatile private var syncBatchInProgress = false
|
||||
private val _syncInProgress = MutableStateFlow(false)
|
||||
@@ -131,9 +135,23 @@ object ProtocolManager {
|
||||
|
||||
fun addLog(message: String) {
|
||||
if (!uiLogsEnabled) return
|
||||
var normalizedMessage = message
|
||||
val isHeartbeatOk = normalizedMessage.startsWith("💓 Heartbeat OK")
|
||||
if (isHeartbeatOk) {
|
||||
val now = System.currentTimeMillis()
|
||||
if (now - lastHeartbeatOkLogAtMs < HEARTBEAT_OK_LOG_MIN_INTERVAL_MS) {
|
||||
suppressedHeartbeatOkLogs++
|
||||
return
|
||||
}
|
||||
if (suppressedHeartbeatOkLogs > 0) {
|
||||
normalizedMessage = "$normalizedMessage (+${suppressedHeartbeatOkLogs} skipped)"
|
||||
suppressedHeartbeatOkLogs = 0
|
||||
}
|
||||
lastHeartbeatOkLogAtMs = now
|
||||
}
|
||||
val timestamp =
|
||||
java.text.SimpleDateFormat("HH:mm:ss.SSS", Locale.getDefault()).format(Date())
|
||||
val line = "[$timestamp] $message"
|
||||
val line = "[$timestamp] $normalizedMessage"
|
||||
synchronized(debugLogsLock) {
|
||||
if (debugLogsBuffer.size >= MAX_DEBUG_LOGS) {
|
||||
debugLogsBuffer.removeFirst()
|
||||
@@ -145,6 +163,7 @@ object ProtocolManager {
|
||||
|
||||
fun enableUILogs(enabled: Boolean) {
|
||||
uiLogsEnabled = enabled
|
||||
MessageLogger.setEnabled(enabled)
|
||||
if (enabled) {
|
||||
val snapshot = synchronized(debugLogsLock) { debugLogsBuffer.toList() }
|
||||
_debugLogs.value = snapshot
|
||||
@@ -157,6 +176,8 @@ object ProtocolManager {
|
||||
synchronized(debugLogsLock) {
|
||||
debugLogsBuffer.clear()
|
||||
}
|
||||
suppressedHeartbeatOkLogs = 0
|
||||
lastHeartbeatOkLogAtMs = 0L
|
||||
_debugLogs.value = emptyList()
|
||||
}
|
||||
|
||||
|
||||
@@ -40,9 +40,16 @@ fun ConnectionLogsScreen(
|
||||
val listState = rememberLazyListState()
|
||||
val scope = rememberCoroutineScope()
|
||||
|
||||
DisposableEffect(Unit) {
|
||||
ProtocolManager.enableUILogs(true)
|
||||
onDispose {
|
||||
ProtocolManager.enableUILogs(false)
|
||||
}
|
||||
}
|
||||
|
||||
LaunchedEffect(logs.size) {
|
||||
if (logs.isNotEmpty()) {
|
||||
listState.animateScrollToItem(logs.size - 1)
|
||||
listState.scrollToItem(logs.size - 1)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -89,7 +96,7 @@ fun ConnectionLogsScreen(
|
||||
|
||||
IconButton(onClick = {
|
||||
scope.launch {
|
||||
if (logs.isNotEmpty()) listState.animateScrollToItem(logs.size - 1)
|
||||
if (logs.isNotEmpty()) listState.scrollToItem(logs.size - 1)
|
||||
}
|
||||
}) {
|
||||
Icon(
|
||||
|
||||
@@ -101,7 +101,8 @@ fun SearchScreen(
|
||||
protocolState: ProtocolState,
|
||||
onBackClick: () -> Unit,
|
||||
onUserSelect: (SearchUser) -> Unit,
|
||||
onNavigateToCrashLogs: () -> Unit = {}
|
||||
onNavigateToCrashLogs: () -> Unit = {},
|
||||
onNavigateToConnectionLogs: () -> Unit = {}
|
||||
) {
|
||||
// Context и View для мгновенного закрытия клавиатуры
|
||||
val context = LocalContext.current
|
||||
@@ -150,6 +151,11 @@ fun SearchScreen(
|
||||
if (searchQuery.trim().equals("rosettadev1", ignoreCase = true)) {
|
||||
searchViewModel.clearSearchQuery()
|
||||
onNavigateToCrashLogs()
|
||||
return@LaunchedEffect
|
||||
}
|
||||
if (searchQuery.trim().equals("rosettadev2", ignoreCase = true)) {
|
||||
searchViewModel.clearSearchQuery()
|
||||
onNavigateToConnectionLogs()
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -13,10 +13,13 @@ import com.rosetta.messenger.network.ProtocolManager
|
||||
*/
|
||||
object MessageLogger {
|
||||
private const val TAG = "RosettaMsg"
|
||||
|
||||
// Всегда включён — вывод идёт только в ProtocolManager.addLog() (in-memory UI),
|
||||
// не в logcat, безопасно для release
|
||||
private val isEnabled: Boolean = true
|
||||
|
||||
@Volatile
|
||||
private var isEnabled: Boolean = false
|
||||
|
||||
fun setEnabled(enabled: Boolean) {
|
||||
isEnabled = enabled
|
||||
}
|
||||
|
||||
/**
|
||||
* Добавить лог в UI (Debug Logs в чате)
|
||||
|
||||
Reference in New Issue
Block a user