Files
mobile-ios/Rosetta/Core/Network/Protocol/PacketAwaiter.swift

130 lines
4.6 KiB
Swift

import Foundation
// MARK: - Packet Awaiter
/// Async/await wrapper for request-response packet exchanges.
///
/// Registers a one-shot handler with `ProtocolManager`, optionally sends a packet, and
/// suspends the caller until a typed response matching a predicate arrives or the
/// timeout elapses.
///
/// Concurrency: the one-shot handler may fire on a different queue than the timeout task
/// (the original note states handlers fire on the URLSession delegate queue). Both paths
/// go through an `AtomicFlag`, so the continuation is resumed exactly once.
///
/// Cancelling the calling task does not end the wait early; the call returns only when a
/// matching packet arrives or the timeout fires.
enum PacketAwaiter {
    /// Failures reported by `PacketAwaiter`.
    enum AwaitError: Error, LocalizedError {
        /// No matching packet arrived before the timeout elapsed.
        case timeout
        /// The connection was not in the `.authenticated` state when `send` was called.
        case notConnected

        var errorDescription: String? {
            switch self {
            case .timeout: return "Server did not respond in time"
            case .notConnected: return "Not connected to server"
            }
        }
    }

    /// Sends `outgoing` packet and waits for a response of type `T` matching `predicate`.
    ///
    /// The response handler is registered before the packet is sent, so a reply cannot
    /// slip through between sending and listening.
    ///
    /// - Parameters:
    ///   - outgoing: The packet to send.
    ///   - responsePacketId: The packet ID of the expected response (e.g., `0x11` for PacketCreateGroup).
    ///   - timeout: Maximum wait time in seconds (default 15). Negative or NaN values are
    ///     treated as zero; very large or infinite values are clamped.
    ///   - predicate: Filter to match the correct response (e.g., matching groupId).
    /// - Returns: The matched response packet.
    /// - Throws: `AwaitError.notConnected` if the connection is not authenticated,
    ///   `AwaitError.timeout` if no matching response arrives in time.
    @MainActor
    static func send<T: Packet>(
        _ outgoing: some Packet,
        awaitResponse responsePacketId: Int,
        timeout: TimeInterval = 15,
        where predicate: @escaping @Sendable (T) -> Bool = { _ in true }
    ) async throws -> T {
        let proto = ProtocolManager.shared
        guard proto.connectionState == .authenticated else {
            throw AwaitError.notConnected
        }
        return try await awaitPacket(
            packetId: responsePacketId,
            timeout: timeout,
            where: predicate,
            afterRegistering: { proto.sendPacket(outgoing) }
        )
    }

    /// Waits for an incoming packet of type `T` without sending anything first.
    ///
    /// Unlike `send`, this does not check the connection state before waiting.
    ///
    /// - Parameters:
    ///   - packetId: The packet ID to listen for.
    ///   - timeout: Maximum wait time in seconds (default 15). Negative or NaN values are
    ///     treated as zero; very large or infinite values are clamped.
    ///   - predicate: Filter to match the correct packet.
    /// - Returns: The first matching packet.
    /// - Throws: `AwaitError.timeout` if no matching packet arrives in time.
    @MainActor
    static func awaitIncoming<T: Packet>(
        packetId: Int,
        timeout: TimeInterval = 15,
        where predicate: @escaping @Sendable (T) -> Bool = { _ in true }
    ) async throws -> T {
        try await awaitPacket(
            packetId: packetId,
            timeout: timeout,
            where: predicate,
            afterRegistering: {}
        )
    }

    /// Shared wait logic: registers a one-shot handler, runs `kickOff`, and suspends until
    /// the handler matches a packet or the timeout fires, whichever happens first.
    @MainActor
    private static func awaitPacket<T: Packet>(
        packetId: Int,
        timeout: TimeInterval,
        where predicate: @escaping @Sendable (T) -> Bool,
        afterRegistering kickOff: () -> Void
    ) async throws -> T {
        let proto = ProtocolManager.shared
        let sleepNanoseconds = nanoseconds(forTimeout: timeout)

        return try await withCheckedThrowingContinuation { continuation in
            // Guard against double-resume (timeout + response race).
            let resumed = AtomicFlag()
            var handlerId: UUID?

            // Timeout fallback. Created before the handler is registered so the handler
            // can capture the task as an immutable value instead of reading a `var`
            // that is still being assigned on another thread.
            // NOTE(review): this relies on the task body inheriting the enclosing
            // @MainActor isolation, so it cannot start running until this synchronous
            // setup has finished and `handlerId` has been assigned.
            let timeoutTask: Task<Void, Never> = Task {
                try? await Task.sleep(nanoseconds: sleepNanoseconds)
                guard !Task.isCancelled else { return }
                if resumed.setIfFalse() {
                    if let hid = handlerId {
                        proto.removeGroupOneShotHandler(hid)
                    }
                    continuation.resume(throwing: AwaitError.timeout)
                }
            }

            let id = proto.addGroupOneShotHandler(packetId: packetId) { rawPacket in
                // Returning false leaves the handler registered for the next packet.
                guard let response = rawPacket as? T else { return false }
                guard predicate(response) else { return false }
                // Matched: consume and resume (unless the timeout already won the race).
                if resumed.setIfFalse() {
                    timeoutTask.cancel()
                    continuation.resume(returning: response)
                }
                return true
            }
            handlerId = id

            // Trigger the request only after the handler is in place.
            kickOff()
        }
    }

    /// Converts a timeout in seconds to nanoseconds without trapping.
    ///
    /// A direct `UInt64(seconds * 1e9)` conversion crashes for negative, NaN, infinite,
    /// or out-of-range inputs. Here NaN and non-positive values become 0, and anything
    /// at or above `Int64.max` nanoseconds is clamped to `Int64.max`. The signed ceiling
    /// is a conservative choice in case the sleep implementation converts the value to a
    /// signed duration internally.
    private static func nanoseconds(forTimeout seconds: TimeInterval) -> UInt64 {
        // `NaN > 0` is false, so NaN takes the zero path as well.
        guard seconds > 0 else { return 0 }
        let ceiling = UInt64(Int64.max)
        let scaled = seconds * 1_000_000_000
        guard scaled < Double(ceiling) else { return ceiling }
        return UInt64(scaled)
    }
}
// MARK: - AtomicFlag (lock-guarded double-resume guard)
/// One-way boolean flag for preventing double continuation resume.
///
/// Starts out false and can only transition to true. State is protected by an `NSLock`
/// rather than the deprecated `OSAtomicCompareAndSwap32`, which was being handed the
/// address of a class stored property via `&` — Swift does not guarantee that address is
/// stable, so the compare-and-swap was not reliably atomic.
///
/// `@unchecked Sendable` is justified because every access to `isSet` happens while
/// holding `lock`.
private final class AtomicFlag: @unchecked Sendable {
    private let lock = NSLock()
    private var isSet = false

    /// Sets the flag to true. Returns `true` if it was previously false (first caller wins).
    func setIfFalse() -> Bool {
        lock.lock()
        defer { lock.unlock() }
        guard !isSet else { return false }
        isSet = true
        return true
    }
}