// Source: mobile-ios/Rosetta/Core/Network/Protocol/WebSocketClient.swift
// (252 lines, 9.3 KiB, Swift)

import Foundation
import Network
import os
/// Native URLSession-based WebSocket client for Rosetta protocol.
///
/// Owns at most one `URLSessionWebSocketTask` at a time, reports lifecycle
/// events through the `on…` closures and schedules reconnects with an
/// exponential backoff after an unexpected disconnect.
///
/// Every asynchronous event (delegate callback, send/receive completion) is
/// matched against the task currently stored in `webSocketTask`; events that
/// belong to a socket that has already been replaced or dropped are ignored so
/// they cannot tear down a newer connection.
///
/// NOTE(review): the type is `@unchecked Sendable`, but this file does not
/// guard its mutable state with a lock or a queue. State is touched from the
/// URLSession delegate/completion callbacks, the path-monitor queue, the
/// reconnect `Task` and whichever thread calls the public API — confirm that
/// callers serialize access.
final class WebSocketClient: NSObject, @unchecked Sendable, URLSessionWebSocketDelegate {
    private static let logger = Logger(subsystem: "com.rosetta.messenger", category: "WebSocket")

    /// Endpoint of the Rosetta WebSocket server.
    private let url = URL(string: "wss://wss.rosetta.im")!
    /// Created in `init()` after `super.init()` because it needs `self` as delegate.
    private var session: URLSession!
    /// The socket that is currently connecting or connected; `nil` when idle.
    private var webSocketTask: URLSessionWebSocketTask?
    /// Set by `disconnect()`; suppresses automatic reconnects until `connect()` is called again.
    private var isManuallyClosed = false
    /// Pending delayed reconnect, if any.
    private var reconnectTask: Task<Void, Never>?
    /// Set when the socket opens; not read anywhere in this file.
    private var hasNotifiedConnected = false
    /// `true` between a successful open and the next disconnect.
    private(set) var isConnected = false
    /// Android parity: prevents concurrent connect() calls while a connection
    /// attempt is already in progress (Protocol.kt: `isConnecting` flag).
    private(set) var isConnecting = false
    /// Ensures a single socket produces at most one disconnect notification.
    private var disconnectHandledForCurrentSocket = false
    /// Android parity: exponential backoff counter, reset on AUTHENTICATED (not on open).
    private var reconnectAttempts = 0
    /// NWPathMonitor for instant reconnect on network changes (Wi-Fi ↔ cellular, etc.).
    private let networkMonitor = NWPathMonitor()
    /// Last status reported by `networkMonitor`; `nil` until the first callback.
    private var lastNetworkPath: NWPath.Status?

    /// Called when the socket has opened.
    var onConnected: (() -> Void)?
    /// Called once per socket when it is lost; the error is `nil` for a close frame.
    var onDisconnected: ((Error?) -> Void)?
    /// Called for every binary message received.
    var onDataReceived: ((Data) -> Void)?
    /// Called when network becomes available and we need to reconnect.
    var onNetworkRestored: (() -> Void)?

    /// Creates the URLSession (with `self` as delegate) and starts network monitoring.
    override init() {
        super.init()
        let config = URLSessionConfiguration.default
        // Don't wait for connectivity — fail fast so NWPathMonitor can trigger
        // instant reconnect when network becomes available.
        config.waitsForConnectivity = false
        config.timeoutIntervalForRequest = 10
        config.timeoutIntervalForResource = 15
        session = URLSession(configuration: config, delegate: self, delegateQueue: nil)
        startNetworkMonitor()
    }

    /// Starts the path monitor and forwards "network restored" transitions to `onNetworkRestored`.
    private func startNetworkMonitor() {
        networkMonitor.pathUpdateHandler = { [weak self] path in
            guard let self else { return }
            let previous = self.lastNetworkPath
            self.lastNetworkPath = path.status
            // Only trigger on transition to .satisfied (network restored).
            // Skip the initial callback (previous == nil).
            if path.status == .satisfied, previous != nil, previous != .satisfied {
                Self.logger.info("Network restored — triggering reconnect")
                self.onNetworkRestored?()
            }
        }
        networkMonitor.start(queue: DispatchQueue(label: "com.rosetta.networkMonitor"))
    }

    // MARK: - Connection

    /// Opens a new socket unless one already exists or a connection attempt is in progress.
    ///
    /// Clears the manual-close flag, so automatic reconnects are enabled again.
    func connect() {
        // Android parity: prevent duplicate connect() calls (Protocol.kt lines 237-256).
        guard webSocketTask == nil else { return }
        guard !isConnecting else {
            Self.logger.info("Already connecting, skipping duplicate connect()")
            return
        }
        isConnecting = true
        isManuallyClosed = false
        hasNotifiedConnected = false
        isConnected = false
        disconnectHandledForCurrentSocket = false
        Self.logger.info("Connecting to \(self.url.absoluteString)")
        let task = session.webSocketTask(with: url)
        webSocketTask = task
        task.resume()
        receiveLoop(on: task)
    }

    /// Closes the socket on purpose and cancels any pending reconnect.
    ///
    /// No automatic reconnect happens until `connect()` is called again.
    func disconnect() {
        Self.logger.info("Manual disconnect")
        isManuallyClosed = true
        isConnecting = false
        reconnectTask?.cancel()
        reconnectTask = nil
        webSocketTask?.cancel(with: .goingAway, reason: nil)
        webSocketTask = nil
        isConnected = false
    }

    /// Immediately reconnect, bypassing scheduled retry.
    /// Android parity: `reconnectNowIfNeeded()` — reset backoff and connect.
    ///
    /// Does nothing after a manual `disconnect()`.
    func forceReconnect() {
        guard !isManuallyClosed else { return }
        reconnectTask?.cancel()
        reconnectTask = nil
        // Always tear down and reconnect — connection may be zombie after background.
        webSocketTask?.cancel(with: .goingAway, reason: nil)
        webSocketTask = nil
        isConnected = false
        isConnecting = false
        disconnectHandledForCurrentSocket = false
        // Android parity: reset backoff so next failure starts from 1s, not stale 8s/16s.
        reconnectAttempts = 0
        Self.logger.info("Force reconnect triggered")
        connect()
    }

    /// Android parity: reset backoff counter on successful AUTHENTICATED state.
    func resetReconnectAttempts() {
        reconnectAttempts = 0
    }

    // MARK: - Sending

    /// Sends a binary message over the open socket.
    ///
    /// - Parameters:
    ///   - data: Payload to send.
    ///   - onFailure: Invoked with the error if the send fails asynchronously.
    /// - Returns: `false` when there is no open connection (nothing is sent and
    ///   `onFailure` is not called); `true` when the send was handed to the socket.
    @discardableResult
    func send(_ data: Data, onFailure: ((Error?) -> Void)? = nil) -> Bool {
        guard isConnected, let task = webSocketTask else {
            Self.logger.warning("Cannot send: no active connection")
            return false
        }
        task.send(.data(data)) { [weak self] error in
            guard let self, let error else { return }
            Self.logger.error("Send error: \(error.localizedDescription)")
            onFailure?(error)
            // Tied to the task that failed, so a late error from a replaced
            // socket cannot disconnect the current one.
            self.handleDisconnect(from: task, error: error)
        }
        return true
    }

    /// Sends a text message; silently does nothing when there is no socket.
    func sendText(_ text: String) {
        guard let task = webSocketTask else { return }
        task.send(.string(text)) { [weak self] error in
            guard let error else { return }
            Self.logger.error("Send text error: \(error.localizedDescription)")
            self?.handleDisconnect(from: task, error: error)
        }
    }

    /// Sends a WebSocket-level ping to verify the connection is alive.
    /// Completion receives nil on success (pong received) or an Error on failure.
    func sendPing(completion: @escaping (Error?) -> Void) {
        guard let task = webSocketTask else {
            completion(NSError(domain: "WebSocket", code: -1,
                               userInfo: [NSLocalizedDescriptionKey: "No active WebSocket task"]))
            return
        }
        task.sendPing { error in
            completion(error)
        }
    }

    // MARK: - URLSessionWebSocketDelegate

    /// Marks the client connected and notifies `onConnected` when the current socket opens.
    nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didOpenWithProtocol protocol: String?) {
        Self.logger.info("WebSocket didOpen")
        guard !isManuallyClosed else { return }
        // An open event from a socket that has already been replaced must not
        // flip the state of the current connection attempt.
        guard webSocketTask === self.webSocketTask else {
            Self.logger.info("didOpen ignored: event belongs to a stale socket")
            return
        }
        // Android parity: reset isConnecting on successful open (Protocol.kt onOpen).
        isConnecting = false
        hasNotifiedConnected = true
        isConnected = true
        disconnectHandledForCurrentSocket = false
        reconnectTask?.cancel()
        reconnectTask = nil
        // Android parity: backoff reset moved to AUTHENTICATED (ProtocolManager).
        // Do NOT reset here — handshake may still fail.
        onConnected?()
    }

    /// Treats a close frame on the current socket as a disconnect.
    nonisolated func urlSession(_ session: URLSession, webSocketTask: URLSessionWebSocketTask, didCloseWith closeCode: URLSessionWebSocketTask.CloseCode, reason: Data?) {
        Self.logger.info("WebSocket didClose: \(closeCode.rawValue)")
        // State is only touched inside handleDisconnect, after the identity
        // check, so a close from an old socket cannot corrupt the new one.
        handleDisconnect(from: webSocketTask, error: nil)
    }

    // MARK: - Receive Loop

    /// Receives one message from `task` and re-arms itself on the same task.
    ///
    /// The loop stops when the client was closed manually, when `task` is no
    /// longer the current socket, or when the receive fails.
    private func receiveLoop(on task: URLSessionWebSocketTask) {
        task.receive { [weak self] result in
            guard let self, !self.isManuallyClosed else { return }
            // Drop anything delivered by a socket that has been replaced.
            guard task === self.webSocketTask else { return }
            switch result {
            case .success(let message):
                switch message {
                case .data(let data):
                    self.onDataReceived?(data)
                case .string(let text):
                    Self.logger.debug("Received text: \(text)")
                @unknown default:
                    break
                }
                self.receiveLoop(on: task)
            case .failure(let error):
                Self.logger.error("Receive error: \(error.localizedDescription)")
                self.handleDisconnect(from: task, error: error)
            }
        }
    }

    // MARK: - Reconnection

    /// Delay before reconnect attempt number `attempt` (1-based), in milliseconds.
    ///
    /// Android parity: exponential backoff — 1s, 2s, 4s, 8s, 16s, 30s (cap).
    /// Formula: min(1000 * 2^(n-1), 30000). The exponent is clamped to 5 so the
    /// shift cannot overflow while still letting the 30s cap take effect.
    static func reconnectDelayMilliseconds(forAttempt attempt: Int) -> Int {
        let exponent = min(max(attempt - 1, 0), 5)
        return min(1000 * (1 << exponent), 30_000)
    }

    /// Handles the loss of `task`: resets state, notifies `onDisconnected` and
    /// schedules a delayed reconnect.
    ///
    /// - Parameters:
    ///   - task: The socket that reported the failure or close.
    ///   - error: The failure, or `nil` for a close frame.
    private func handleDisconnect(from task: URLSessionWebSocketTask, error: Error?) {
        // Android parity (Protocol.kt:562-566): ignore a stale disconnect from a
        // previous socket. Comparing task identity (rather than the
        // `isConnecting` flag) also lets a failure of the *current* connection
        // attempt through, so a failed connect schedules a retry.
        guard task === webSocketTask else {
            Self.logger.info("Disconnect ignored: event belongs to a stale socket")
            return
        }
        // A socket can report its loss more than once (receive failure + close).
        guard !disconnectHandledForCurrentSocket else { return }
        disconnectHandledForCurrentSocket = true
        webSocketTask = nil
        isConnected = false
        isConnecting = false
        onDisconnected?(error)

        guard !isManuallyClosed else { return }
        guard reconnectTask == nil else { return }

        // No instant first attempt: the first retry already waits 1s.
        reconnectAttempts += 1
        if reconnectAttempts > 20 {
            Self.logger.warning("⚠️ Too many reconnect attempts (\(self.reconnectAttempts)), may be stuck in loop")
        }
        let attempt = reconnectAttempts
        let delayMs = Self.reconnectDelayMilliseconds(forAttempt: attempt)
        reconnectTask = Task { [weak self] in
            Self.logger.info("Reconnecting in \(delayMs)ms (attempt #\(attempt))...")
            // A cancelled sleep returns early; the cancellation check below handles it.
            try? await Task.sleep(nanoseconds: UInt64(delayMs) * 1_000_000)
            guard let self, !self.isManuallyClosed, !Task.isCancelled else { return }
            self.reconnectTask = nil
            self.connect()
        }
    }
}