2500 lines
111 KiB
Swift
2500 lines
111 KiB
Swift
import Foundation
|
||
import Observation
|
||
import os
|
||
import UIKit
|
||
import CommonCrypto
|
||
import UserNotifications
|
||
|
||
/// Bridges AccountManager, CryptoManager, and ProtocolManager into a unified session lifecycle.
|
||
@Observable
|
||
@MainActor
|
||
final class SessionManager {
|
||
|
||
static let shared = SessionManager()
|
||
|
||
nonisolated private static let logger = Logger(subsystem: "com.rosetta.messenger", category: "Session")
|
||
|
||
/// Failures surfaced by `startSession(password:)`.
enum StartSessionError: LocalizedError {
    /// Thrown when decrypting the stored private key with the supplied password fails.
    case invalidCredentials
    /// Thrown when the account database cannot be bootstrapped; carries the original error.
    case databaseBootstrapFailed(underlying: Error)

    /// Human-readable message for each failure.
    var errorDescription: String? {
        let message: String
        switch self {
        case .invalidCredentials:
            message = "Wrong password. Please try again."
        case .databaseBootstrapFailed:
            message = "Database migration failed. Please restart the app and try again."
        }
        return message
    }
}
|
||
|
||
private(set) var isAuthenticated = false
|
||
private(set) var currentPublicKey: String = ""
|
||
private(set) var displayName: String = ""
|
||
private(set) var username: String = ""
|
||
|
||
/// Hex-encoded private key hash, kept in memory for the session duration.
|
||
private(set) var privateKeyHash: String?
|
||
/// Hex-encoded raw private key, kept in memory for message decryption.
|
||
private(set) var privateKeyHex: String?
|
||
private var lastTypingSentAt: [String: Int64] = [:]
|
||
/// Desktop parity: exposed so chat list can suppress unread badges during sync.
|
||
private(set) var syncBatchInProgress = false
|
||
private var syncRequestInFlight = false
|
||
private var pendingIncomingMessages: [PacketMessage] = []
|
||
private var isProcessingIncomingMessages = false
|
||
/// Android parity: tracks the latest incoming message timestamp per dialog
|
||
/// for which a read receipt was already sent. Prevents redundant sends.
|
||
private var lastReadReceiptTimestamp: [String: Int64] = [:]
|
||
/// Cross-device reads received during sync — re-applied at BATCH_END.
|
||
/// PacketRead can arrive BEFORE the sync messages, so markIncomingAsRead
|
||
/// updates 0 rows. Re-applying after sync ensures the read state sticks.
|
||
private var pendingSyncReads: Set<String> = []
|
||
/// Opponent read receipts received during sync — re-applied at BATCH_END.
|
||
/// Mirrors `pendingSyncReads` for own-device reads. Without this,
|
||
/// markOutgoingAsRead() races with message insertion (0 rows affected)
|
||
/// because iOS processes reads via separate Task, not the inbound queue.
|
||
/// Android/Desktop don't need this — all packets go through one FIFO queue.
|
||
private var pendingOpponentReads: Set<String> = []
|
||
private var requestedUserInfoKeys: Set<String> = []
|
||
private var onlineSubscribedKeys: Set<String> = []
|
||
private var pendingOutgoingRetryTasks: [String: Task<Void, Never>] = [:]
|
||
private var pendingOutgoingPackets: [String: PacketMessage] = [:]
|
||
private var pendingOutgoingAttempts: [String: Int] = [:]
|
||
private let maxOutgoingRetryAttempts = ProtocolConstants.maxOutgoingRetryAttempts
|
||
private let maxOutgoingWaitingLifetimeMs: Int64 = ProtocolConstants.messageDeliveryTimeoutS * 1000
|
||
var attachmentFlowTransport: AttachmentFlowTransporting = LiveAttachmentFlowTransport()
|
||
var packetFlowSender: PacketFlowSending = LivePacketFlowSender()
|
||
|
||
// MARK: - Foreground Detection (Android parity)
|
||
|
||
private var foregroundObserverToken: NSObjectProtocol?
|
||
/// Android parity: 5s debounce between foreground sync requests.
|
||
private var lastForegroundSyncTime: TimeInterval = 0
|
||
|
||
/// `true` while `UIApplication` reports the `.active` application state.
private var isAppInForeground: Bool {
    let state = UIApplication.shared.applicationState
    return state == .active
}
|
||
|
||
private var userInfoSearchHandlerToken: UUID?
|
||
|
||
/// Private initializer backing the `shared` singleton.
/// Registers protocol callbacks, the user-info search handler and the
/// foreground observer, in that order.
private init() {
    setupProtocolCallbacks()
    setupUserInfoSearchHandler()
    setupForegroundObserver()
}
|
||
|
||
/// Replays the cross-device reads collected in `pendingSyncReads`.
///
/// A PacketRead may arrive before the sync messages it refers to, in which case
/// `markIncomingAsRead` touches 0 rows and the messages stay unread. Calling this
/// once the messages exist marks them (and their dialogs) as read again.
/// The pending set is emptied before the repositories are updated.
private func reapplyPendingSyncReads() {
    if pendingSyncReads.isEmpty { return }

    // Snapshot first, then clear, so the set is already empty while we replay.
    let deferredDialogKeys = pendingSyncReads
    pendingSyncReads.removeAll()

    let ownKey = currentPublicKey
    for dialogKey in deferredDialogKeys {
        MessageRepository.shared.markIncomingAsRead(opponentKey: dialogKey, myPublicKey: ownKey)
        DialogRepository.shared.markAsRead(opponentKey: dialogKey)
    }
}
|
||
|
||
/// Replays the opponent read receipts collected in `pendingOpponentReads`.
///
/// iOS handles reads in a separate Task, so a receipt can run before the matching
/// outgoing messages are committed and `markOutgoingAsRead` updates 0 rows.
/// (Android funnels every packet through one FIFO queue and does not need this.)
/// The pending set is emptied before the repositories are updated.
private func reapplyPendingOpponentReads() {
    if pendingOpponentReads.isEmpty { return }

    // Snapshot first, then clear, so the set is already empty while we replay.
    let deferredDialogKeys = pendingOpponentReads
    pendingOpponentReads.removeAll()

    let ownKey = currentPublicKey
    for dialogKey in deferredDialogKeys {
        MessageRepository.shared.markOutgoingAsRead(opponentKey: dialogKey, myPublicKey: ownKey)
        DialogRepository.shared.markOutgoingAsRead(opponentKey: dialogKey)
    }
}
|
||
|
||
/// Android parity (ON_RESUME): marks every currently active dialog as read again
/// and sends a read receipt for it.
///
/// System accounts and dialogs that `MessageRepository` reports as not
/// read-eligible are skipped. There is no idle detection — the dialogs are simply
/// re-marked each time this is called.
func markActiveDialogsAsRead() {
    let openDialogKeys = MessageRepository.shared.activeDialogKeys
    let ownKey = currentPublicKey

    for dialogKey in openDialogKeys
    where !SystemAccounts.isSystemAccount(dialogKey)
        && MessageRepository.shared.isDialogReadEligible(dialogKey) {
        DialogRepository.shared.markAsRead(opponentKey: dialogKey)
        MessageRepository.shared.markIncomingAsRead(opponentKey: dialogKey, myPublicKey: ownKey)
        sendReadReceipt(toPublicKey: dialogKey)
    }
}
|
||
|
||
// MARK: - Session Lifecycle
|
||
|
||
/// Called after password verification. Decrypts private key, connects WebSocket, and starts handshake.
///
/// Steps, in order: decrypt the private key, open the account database, publish the
/// account identity, migrate legacy JSON storage, bootstrap the dialog and message
/// repositories, send release notes if needed, pre-warm the PBKDF2 cache, then
/// connect and set `isAuthenticated`.
///
/// - Parameter password: Password used to decrypt the stored private key.
/// - Throws: `StartSessionError.invalidCredentials` when the key cannot be decrypted,
///   `CryptoError.decryptionFailed` when there is no current account, and
///   `StartSessionError.databaseBootstrapFailed` when the database cannot be opened.
///   In the latter two cases the decrypted key is removed from memory before throwing.
func startSession(password: String) async throws {
    let accountManager = AccountManager.shared
    let crypto = CryptoManager.shared

    // Decrypt private key. Every failure here is reported as a wrong password.
    let privateKeyHex: String
    do {
        privateKeyHex = try await accountManager.decryptPrivateKey(password: password)
    } catch {
        throw StartSessionError.invalidCredentials
    }
    self.privateKeyHex = privateKeyHex
    // Android parity: provide private key to caches for encryption at rest
    AttachmentCache.shared.privateKey = privateKeyHex
    Self.logger.info("Private key decrypted")

    guard let account = accountManager.currentAccount else {
        // Same cleanup as the database-failure path below: never keep the decrypted
        // key around for a session that did not start.
        self.privateKeyHex = nil
        AttachmentCache.shared.privateKey = nil
        Self.logger.error("No current account after private key decryption")
        throw CryptoError.decryptionFailed
    }

    // Open SQLite database for this account (must happen before repository bootstrap).
    do {
        try DatabaseManager.shared.bootstrap(accountPublicKey: account.publicKey)
    } catch {
        self.privateKeyHex = nil
        AttachmentCache.shared.privateKey = nil
        Self.logger.error("Database bootstrap failed: \(error.localizedDescription)")
        throw StartSessionError.databaseBootstrapFailed(underlying: error)
    }

    currentPublicKey = account.publicKey
    displayName = account.displayName ?? ""
    username = account.username ?? ""
    CallManager.shared.bindAccount(publicKey: account.publicKey)

    // Migrate legacy JSON → SQLite on first launch (before repositories read from DB).
    let migrated = await DatabaseMigrationFromJSON.migrateIfNeeded(
        accountPublicKey: account.publicKey,
        storagePassword: privateKeyHex
    )
    if migrated > 0 {
        Self.logger.info("Migrated \(migrated) messages from JSON to SQLite")
    }

    // Warm local state immediately, then let network sync reconcile updates.
    await DialogRepository.shared.bootstrap(
        accountPublicKey: account.publicKey,
        storagePassword: privateKeyHex
    )
    await MessageRepository.shared.bootstrap(
        accountPublicKey: account.publicKey,
        storagePassword: privateKeyHex
    )
    RecentSearchesRepository.shared.setAccount(account.publicKey)

    // Desktop parity: send release notes as a system message from "Rosetta Updates"
    // account if the app version changed since the last notice.
    sendReleaseNotesIfNeeded(publicKey: account.publicKey)

    // Pre-warm PBKDF2 cache for message storage encryption.
    // First encryptWithPassword() call costs 50-100ms (PBKDF2 derivation).
    // All subsequent calls use NSLock-protected cache (<1ms).
    // Fire-and-forget on a detached utility-priority task; nothing below waits for it.
    let pkForCache = privateKeyHex
    Task.detached(priority: .utility) {
        _ = CryptoManager.shared.cachedPBKDF2(password: pkForCache)
    }

    // Generate private key hash for handshake
    let hash = crypto.generatePrivateKeyHash(privateKeyHex: privateKeyHex)
    privateKeyHash = hash

    Self.logger.info("Connecting to server...")

    // Connect + handshake
    ProtocolManager.shared.connect(publicKey: account.publicKey, privateKeyHash: hash)

    isAuthenticated = true
}
|
||
|
||
// MARK: - Message Sending
|
||
|
||
/// Encrypts `text` and sends it to `toPublicKey`, mirroring Android's outgoing flow.
///
/// The message is stored locally as WAITING and persisted before anything goes on
/// the wire. Messages addressed to the current account (Saved Messages) are marked
/// delivered locally and never sent to the server.
///
/// - Parameters:
///   - text: Message body. Whitespace-only input is ignored without an error.
///   - toPublicKey: Recipient public key.
///   - opponentTitle: Preferred dialog title; falls back to the existing dialog, then group metadata.
///   - opponentUsername: Preferred dialog username; same fallback chain (group description last).
/// - Throws: `CryptoError.decryptionFailed` when session keys are missing, or any
///   error thrown by `makeOutgoingPacket`.
func sendMessage(text: String, toPublicKey: String, opponentTitle: String = "", opponentUsername: String = "") async throws {
    PerformanceLogger.shared.track("session.sendMessage")

    // Desktop parity: whitespace-only messages are dropped before any work is done.
    if text.trimmingCharacters(in: .whitespacesAndNewlines).isEmpty {
        Self.logger.debug("📤 Ignoring empty message")
        return
    }

    guard let privKey = privateKeyHex, let hash = privateKeyHash else {
        Self.logger.error("📤 Cannot send — missing keys")
        throw CryptoError.decryptionFailed
    }

    let connState = ProtocolManager.shared.connectionState
    Self.logger.info("📤 sendMessage to \(toPublicKey.prefix(12))… conn=\(String(describing: connState))")

    let messageId = UUID().uuidString.lowercased().replacingOccurrences(of: "-", with: "")
    let timestamp = Int64(Date().timeIntervalSince1970 * 1000)
    let packet = try makeOutgoingPacket(
        text: text,
        toPublicKey: toPublicKey,
        messageId: messageId,
        timestamp: timestamp,
        privateKeyHex: privKey,
        privateKeyHash: hash
    )
    let dialogKey = packet.toPublicKey

    // Title/username resolution order: caller-supplied value (ChatDetailView route),
    // then the existing dialog, then group metadata, then empty.
    let knownDialog = DialogRepository.shared.dialogs[dialogKey]
    let groupMetadata = GroupRepository.shared.groupMetadata(
        account: currentPublicKey,
        groupDialogKey: dialogKey
    )

    let resolvedTitle: String
    if !opponentTitle.isEmpty {
        resolvedTitle = opponentTitle
    } else if let knownTitle = knownDialog?.opponentTitle, !knownTitle.isEmpty {
        resolvedTitle = knownTitle
    } else {
        resolvedTitle = groupMetadata?.title ?? ""
    }

    let resolvedUsername: String
    if !opponentUsername.isEmpty {
        resolvedUsername = opponentUsername
    } else if let knownUsername = knownDialog?.opponentUsername, !knownUsername.isEmpty {
        resolvedUsername = knownUsername
    } else {
        resolvedUsername = groupMetadata?.description ?? ""
    }

    DialogRepository.shared.ensureDialog(
        opponentKey: dialogKey,
        title: resolvedTitle,
        username: resolvedUsername,
        myPublicKey: currentPublicKey
    )

    // Android parity: the message is always inserted as WAITING (status=0), whatever
    // the connection state. ProtocolManager queues the packet and sends it on
    // reconnect; the retry mechanism turns it into ERROR on timeout (80s).
    // iOS used to mark offline sends as ERROR right away (Desktop parity), which
    // produced false ❌ on short WebSocket drops.
    MessageRepository.shared.upsertFromMessagePacket(
        packet,
        myPublicKey: currentPublicKey,
        decryptedText: text,
        fromSync: false,
        dialogIdentityOverride: dialogKey
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: dialogKey)

    // Android parity: flush to disk right away. Otherwise a kill inside the 800ms
    // debounce window loses the message, which at that point lives only in memory.
    MessageRepository.shared.persistNow()

    // Saved Messages: local-only, no server send
    let isSavedMessages = dialogKey == currentPublicKey
    if isSavedMessages {
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .delivered)
        DialogRepository.shared.updateDeliveryStatus(messageId: messageId, opponentKey: dialogKey, status: .delivered)
        return
    }

    // Hand the packet to the WebSocket layer (queued while offline, sent directly
    // when online) and arm the retry timer.
    ProtocolManager.shared.sendPacket(packet)
    registerOutgoingRetry(for: packet)
}
|
||
|
||
/// Sends current user's avatar to a chat as a message attachment.
/// Desktop parity: `onClickCamera()` in `DialogInput.tsx` → loads avatar → attaches as AVATAR type
/// → `prepareAttachmentsToSend()` encrypts blob → uploads to transport → sends PacketMessage.
///
/// Flow: encrypt an empty-text message, encrypt the avatar data URI with the hex
/// key+nonce password, show and persist an optimistic message, upload the encrypted
/// blob, store the transport tag on the message, then send the packet.
/// Returns without sending when the current account has no stored avatar.
///
/// - Parameters:
///   - toPublicKey: Recipient public key. When equal to the current account (Saved
///     Messages) the message is marked delivered locally and the packet is still sent.
///   - opponentTitle: Preferred dialog title; falls back to the existing dialog's title.
///   - opponentUsername: Preferred dialog username; falls back to the existing dialog's username.
/// - Throws: `CryptoError.decryptionFailed` when session keys are missing,
///   `CryptoError.encryptionFailed` when the key cannot be Latin-1 encoded, any
///   encryption error, or the upload error (after the message was marked `.error`).
func sendAvatar(toPublicKey: String, opponentTitle: String = "", opponentUsername: String = "") async throws {
    guard let privKey = privateKeyHex, let hash = privateKeyHash else {
        Self.logger.error("📤 Cannot send avatar — missing keys")
        throw CryptoError.decryptionFailed
    }

    // Load avatar from local storage as base64 (desktop: avatars[0].avatar)
    guard let avatarBase64 = AvatarRepository.shared.loadAvatarBase64(publicKey: currentPublicKey) else {
        Self.logger.error("📤 No avatar to send")
        return
    }

    let messageId = UUID().uuidString.replacingOccurrences(of: "-", with: "").lowercased()
    let timestamp = Int64(Date().timeIntervalSince1970 * 1000)
    // 8 random characters from [a-z0-9]; indexing a fixed array avoids a force unwrap.
    let attachmentIdAlphabet = Array("abcdefghijklmnopqrstuvwxyz0123456789")
    let attachmentId = String((0..<8).map { _ in
        attachmentIdAlphabet[Int.random(in: attachmentIdAlphabet.indices)]
    })

    // Android/Desktop parity: avatar messages have empty text.
    // Desktop shows "$a=Avatar" in chat list ONLY if decrypted text is empty.
    // Sending " " (space) causes Desktop chat list to show nothing.
    let encrypted = try MessageCrypto.encryptOutgoing(
        plaintext: "",
        recipientPublicKeyHex: toPublicKey
    )

    // Attachment password: HEX encoding of raw 56-byte key+nonce.
    // Desktop commit 61e83bd: changed from Buffer.toString('utf-8') to key.toString('hex').
    // HEX is lossless for all byte values (no U+FFFD data loss).
    let attachmentPassword = encrypted.plainKeyAndNonce.hexString

    // aesChachaKey = Latin-1 encoding (matches desktop sync chain:
    // Buffer.from(decryptedString, 'binary') takes low byte of each char).
    // NEVER use WHATWG UTF-8 for aesChachaKey — U+FFFD round-trips as 0xFD, not original byte.
    guard let latin1ForSync = String(data: encrypted.plainKeyAndNonce, encoding: .isoLatin1) else {
        throw CryptoError.encryptionFailed
    }

    // Desktop parity: avatar blob is a full data URI (e.g. "data:image/png;base64,iVBOR...")
    // not just raw base64. Desktop's AvatarProvider stores and sends data URIs.
    let dataURI = "data:image/jpeg;base64,\(avatarBase64)"
    let avatarData = Data(dataURI.utf8)
    let encryptedBlob = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        avatarData,
        password: attachmentPassword
    )

    // Cache avatar locally BEFORE upload so outgoing avatar shows instantly
    // (same pattern as sendMessageWithAttachments — AttachmentCache.saveImage before upload).
    let avatarImage = AvatarRepository.shared.loadAvatar(publicKey: currentPublicKey)
    if let avatarImage {
        AttachmentCache.shared.saveImage(avatarImage, forAttachmentId: attachmentId)
    }

    // BlurHash for preview (computed before upload so optimistic UI has it)
    let blurhash = avatarImage?.blurHash(numberOfComponents: (4, 3)) ?? ""

    // Build aesChachaKey with Latin-1 payload (desktop sync parity)
    let aesChachaPayload = Data(latin1ForSync.utf8)
    let aesChachaKey = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        aesChachaPayload,
        password: privKey
    )

    // Build packet with a placeholder avatar attachment; the transport tag and
    // server are filled in after the upload (the preview stays a plain blurhash).
    var packet = PacketMessage()
    packet.fromPublicKey = currentPublicKey
    packet.toPublicKey = toPublicKey
    packet.content = encrypted.content
    packet.chachaKey = encrypted.chachaKey
    packet.timestamp = timestamp
    packet.privateKey = hash
    packet.messageId = messageId
    packet.aesChachaKey = aesChachaKey
    packet.attachments = [
        MessageAttachment(
            id: attachmentId,
            preview: blurhash,
            blob: "",
            type: .avatar
        ),
    ]

    // Ensure dialog exists
    let existingDialog = DialogRepository.shared.dialogs[toPublicKey]
    let title = !opponentTitle.isEmpty ? opponentTitle : (existingDialog?.opponentTitle ?? "")
    let username = !opponentUsername.isEmpty ? opponentUsername : (existingDialog?.opponentUsername ?? "")
    DialogRepository.shared.ensureDialog(
        opponentKey: toPublicKey,
        title: title,
        username: username,
        myPublicKey: currentPublicKey
    )

    // Optimistic UI — show message IMMEDIATELY (before upload)
    MessageRepository.shared.upsertFromMessagePacket(
        packet, myPublicKey: currentPublicKey, decryptedText: "",
        attachmentPassword: "rawkey:" + attachmentPassword,
        fromSync: false
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: packet.toPublicKey)
    // Persist right after the optimistic insert, as the text and attachment send
    // paths do, so the message is not lost if the app dies during the upload.
    MessageRepository.shared.persistNow()

    // Upload encrypted blob to transport server (desktop: uploadFile). This awaits
    // the upload; the optimistic message above is already visible meanwhile.
    let upload: (tag: String, server: String)
    do {
        upload = try await attachmentFlowTransport.uploadFile(
            id: attachmentId,
            content: Data(encryptedBlob.utf8)
        )
    } catch {
        // Upload failed — mark as error
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .error)
        DialogRepository.shared.updateDeliveryStatus(messageId: messageId, opponentKey: toPublicKey, status: .error)
        MessageRepository.shared.persistNow()
        Self.logger.error("📤 Avatar upload failed: \(error.localizedDescription)")
        throw error
    }

    // Desktop parity: preview = pure blurhash (no tag prefix).
    // Desktop MessageAvatar.tsx passes preview directly to blurhash decoder —
    // including the "tag::" prefix causes "blurhash length mismatch" errors.
    // CDN tag is stored in transportTag for download.
    packet.attachments = [
        MessageAttachment(
            id: attachmentId,
            preview: blurhash,
            blob: "",
            type: .avatar,
            transportTag: upload.tag,
            transportServer: upload.server
        ),
    ]

    // Store the transport tag/server on the local message too, matching what
    // sendMessageWithAttachments does after its uploads.
    MessageRepository.shared.updateAttachments(messageId: messageId, attachments: packet.attachments)

    // Saved Messages — mark delivered locally but STILL send to server
    // for cross-device avatar sync. Other devices receive via sync and
    // update their local avatar cache.
    if toPublicKey == currentPublicKey {
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .delivered)
        DialogRepository.shared.updateDeliveryStatus(messageId: messageId, opponentKey: toPublicKey, status: .delivered)
        // Send to server for multi-device sync (unlike text Saved Messages)
        packetFlowSender.sendPacket(packet)
        MessageRepository.shared.persistNow()
        Self.logger.info("📤 Avatar synced to Saved Messages (multi-device) tag=\(upload.tag)")
        return
    }

    packetFlowSender.sendPacket(packet)
    registerOutgoingRetry(for: packet)
    MessageRepository.shared.persistNow()
    Self.logger.info("📤 Avatar sent to \(toPublicKey.prefix(12))… tag=\(upload.tag)")
}
|
||
|
||
/// Sends a message with image/file attachments.
///
/// Desktop parity: `prepareAttachmentsToSend()` in `DialogProvider.tsx` →
/// for each attachment: encrypt blob → upload to transport → clear blob.
///
/// Flow per attachment:
/// 1. Build data URI (desktop: `FileReader.readAsDataURL()`)
/// 2. Encrypt with `encryptWithPasswordDesktopCompat(dataURI, password: hexOfPlainKeyAndNonce)`
/// 3. Upload encrypted blob to transport → get server tag
/// 4. Set preview without a tag prefix: IMAGE → blurhash, FILE → `"size::filename"`;
///    the tag and server go into `transportTag` / `transportServer`
/// 5. Clear blob (not sent over WebSocket)
///
/// The message is shown and persisted optimistically before the uploads start.
/// Messages addressed to the current account (Saved Messages) are marked delivered
/// locally and are neither uploaded nor sent.
///
/// - Parameters:
///   - text: Optional caption; trimmed, and encrypted as `""` when empty.
///   - attachments: Attachments to encrypt and upload.
///   - toPublicKey: Recipient public key.
///   - opponentTitle: Preferred dialog title; falls back to the existing dialog's title.
///   - opponentUsername: Preferred dialog username; falls back to the existing dialog's username.
/// - Throws: `CryptoError.decryptionFailed` when session keys are missing,
///   `CryptoError.encryptionFailed` when the key cannot be Latin-1 encoded, any
///   encryption error, or the first upload error (after the message was marked `.error`).
func sendMessageWithAttachments(
    text: String,
    attachments: [PendingAttachment],
    toPublicKey: String,
    opponentTitle: String = "",
    opponentUsername: String = ""
) async throws {
    guard let privKey = privateKeyHex, let hash = privateKeyHash else {
        Self.logger.error("📤 Cannot send — missing keys")
        throw CryptoError.decryptionFailed
    }

    let messageId = UUID().uuidString.replacingOccurrences(of: "-", with: "").lowercased()
    let timestamp = Int64(Date().timeIntervalSince1970 * 1000)

    // Android parity: no caption → encrypt empty string "".
    // Receivers decrypt "" → show "Photo"/"File" in chat list.
    let messageText = text.trimmingCharacters(in: .whitespacesAndNewlines)
    let encrypted = try MessageCrypto.encryptOutgoing(
        plaintext: messageText,
        recipientPublicKeyHex: toPublicKey
    )

    // Attachment password: HEX encoding of raw 56-byte key+nonce.
    // Desktop commit 61e83bd: changed from Buffer.toString('utf-8') to key.toString('hex').
    // HEX is lossless for all byte values (no U+FFFD data loss).
    let attachmentPassword = encrypted.plainKeyAndNonce.hexString

    #if DEBUG
    // NOTE(review): these debug lines log the attachment key material — confirm this
    // is acceptable for DEBUG builds.
    let pwdUtf8Bytes = Array(attachmentPassword.utf8)
    let pbkdf2Key = CryptoPrimitives.pbkdf2(
        password: attachmentPassword, salt: "rosetta", iterations: 1000,
        keyLength: 32, prf: CCPseudoRandomAlgorithm(kCCPRFHmacAlgSHA256)
    )
    Self.logger.debug("📎 rawKey (hex password): \(attachmentPassword)")
    Self.logger.debug("📎 pwdUTF8(\(pwdUtf8Bytes.count)b): \(Data(pwdUtf8Bytes).hexString)")
    Self.logger.debug("📎 pbkdf2Key: \(pbkdf2Key.hexString)")
    #endif

    // Phase 1: Encrypt all attachments sequentially (same password, CPU-bound).
    struct EncryptedAttachment {
        let original: PendingAttachment
        let encryptedData: Data
        let preview: String // preview payload without any tag: blurhash or "size::filename"
    }
    var encryptedAttachments: [EncryptedAttachment] = []
    encryptedAttachments.reserveCapacity(attachments.count)
    for attachment in attachments {
        let dataURI = buildDataURI(attachment)
        let encryptedBlob = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
            Data(dataURI.utf8),
            password: attachmentPassword
        )

        #if DEBUG
        // Self-test: decrypt with the same hex password used for encryption.
        if let selfTestData = try? CryptoManager.shared.decryptWithPassword(
            encryptedBlob, password: attachmentPassword, requireCompression: true
        ), String(data: selfTestData, encoding: .utf8)?.hasPrefix("data:") == true {
            Self.logger.debug("📎 Blob self-test PASSED for \(attachment.id)")
        } else {
            Self.logger.error("📎 Blob self-test FAILED for \(attachment.id)")
        }
        #endif

        // Pre-compute the preview payload (everything except the transport tag)
        let previewSuffix: String
        switch attachment.type {
        case .image:
            // Plain blurhash only — NO dimension suffix.
            // Desktop's blurhash decoder (woltapp/blurhash) does strict length validation;
            // appending "|WxH" causes it to throw → no preview on desktop.
            // Android also sends plain blurhash without dimensions.
            previewSuffix = attachment.thumbnail?.blurHash(numberOfComponents: (4, 3)) ?? ""
        case .file:
            previewSuffix = "\(attachment.fileSize ?? 0)::\(attachment.fileName ?? "file")"
        default:
            previewSuffix = ""
        }

        encryptedAttachments.append(EncryptedAttachment(
            original: attachment,
            encryptedData: Data(encryptedBlob.utf8),
            preview: previewSuffix
        ))
    }

    // Android parity: cache original images and show optimistic message BEFORE upload.
    // Android: addMessageSafely(optimisticMessage) → then background upload.
    // Without this, photo doesn't appear until upload completes (5-10 seconds).
    for item in encryptedAttachments {
        if item.original.type == .image, let image = UIImage(data: item.original.data) {
            AttachmentCache.shared.saveImage(image, forAttachmentId: item.original.id)
        } else if item.original.type == .file {
            AttachmentCache.shared.saveFile(
                item.original.data,
                forAttachmentId: item.original.id,
                fileName: item.original.fileName ?? "file"
            )
        }
    }

    // Build placeholder attachments (no tag yet — filled after upload).
    // preview = "::<payload>" (empty tag slot) or "" when there is no payload, blob = "".
    let placeholderAttachments = encryptedAttachments.map { item in
        MessageAttachment(
            id: item.original.id,
            preview: item.preview.isEmpty ? "" : "::\(item.preview)",
            blob: "",
            type: item.original.type
        )
    }

    // Build aesChachaKey (for sync/backup — same encoding as makeOutgoingPacket).
    // MUST use Latin-1 (not WHATWG UTF-8) so desktop can recover original raw bytes
    // via Buffer.from(decryptedString, 'binary') which takes the low byte of each char.
    // Latin-1 maps every byte 0x00-0xFF to its codepoint losslessly.
    // WHATWG UTF-8 replaces invalid sequences with U+FFFD (codepoint 0xFFFD) —
    // Buffer.from('\uFFFD', 'binary') recovers 0xFD, not the original byte.
    guard let latin1ForSync = String(data: encrypted.plainKeyAndNonce, encoding: .isoLatin1) else {
        throw CryptoError.encryptionFailed
    }
    let aesChachaPayload = Data(latin1ForSync.utf8)
    let aesChachaKey = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        aesChachaPayload,
        password: privKey
    )

    #if DEBUG
    // aesChachaKey round-trip self-test: simulates EXACT desktop sync chain.
    do {
        let rtDecrypted = try CryptoManager.shared.decryptWithPassword(
            aesChachaKey, password: privKey, requireCompression: true
        )
        guard let rtString = String(data: rtDecrypted, encoding: .utf8) else {
            Self.logger.error("📎 aesChachaKey FAILED — not valid UTF-8")
            throw CryptoError.decryptionFailed
        }
        // Simulate Buffer.from(string, 'binary').toString('hex')
        let rtRawBytes = Data(rtString.unicodeScalars.map { UInt8($0.value & 0xFF) })
        let rtHex = rtRawBytes.hexString
        let match = rtHex == attachmentPassword
        Self.logger.debug("📎 aesChachaKey roundtrip: \(match ? "PASS" : "FAIL") (\(rtRawBytes.count) bytes recovered)")
    } catch {
        Self.logger.error("📎 aesChachaKey roundtrip FAILED: \(error)")
    }
    #endif

    // ── Optimistic UI: show message INSTANTLY with placeholder attachments ──
    // Android parity: addMessageSafely(optimisticMessage) before upload.
    var optimisticPacket = PacketMessage()
    optimisticPacket.fromPublicKey = currentPublicKey
    optimisticPacket.toPublicKey = toPublicKey
    optimisticPacket.content = encrypted.content
    optimisticPacket.chachaKey = encrypted.chachaKey
    optimisticPacket.timestamp = timestamp
    optimisticPacket.privateKey = hash
    optimisticPacket.messageId = messageId
    optimisticPacket.aesChachaKey = aesChachaKey
    optimisticPacket.attachments = placeholderAttachments

    let existingDialog = DialogRepository.shared.dialogs[toPublicKey]
    let title = !opponentTitle.isEmpty ? opponentTitle : (existingDialog?.opponentTitle ?? "")
    let username = !opponentUsername.isEmpty ? opponentUsername : (existingDialog?.opponentUsername ?? "")
    DialogRepository.shared.ensureDialog(
        opponentKey: toPublicKey, title: title, username: username, myPublicKey: currentPublicKey
    )

    // Android parity: always insert as WAITING (fromSync: false).
    // Retry mechanism handles timeout (80s → ERROR) once the packet has been sent.
    MessageRepository.shared.upsertFromMessagePacket(
        optimisticPacket, myPublicKey: currentPublicKey, decryptedText: messageText,
        attachmentPassword: "rawkey:" + attachmentPassword, fromSync: false
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: toPublicKey)
    MessageRepository.shared.persistNow()

    if toPublicKey == currentPublicKey {
        // Keep self/saved files immediately openable without transport tag/download.
        for item in encryptedAttachments where item.original.type == .file {
            let fallback = AttachmentPreviewCodec.parseFilePreview(
                item.preview,
                fallbackFileName: item.original.fileName ?? "file",
                fallbackFileSize: item.original.fileSize ?? item.original.data.count
            )
            _ = AttachmentCache.shared.saveFile(
                item.original.data,
                forAttachmentId: item.original.id,
                fileName: fallback.fileName
            )
        }
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .delivered)
        DialogRepository.shared.updateDeliveryStatus(messageId: messageId, opponentKey: toPublicKey, status: .delivered)
        return
    }

    // ── Phase 2: Upload all attachments concurrently, then send packet ──
    let flowTransport = attachmentFlowTransport
    let messageAttachments: [MessageAttachment]
    do {
        messageAttachments = try await withThrowingTaskGroup(
            of: (Int, String, String).self,
            returning: [MessageAttachment].self
        ) { group in
            for (index, item) in encryptedAttachments.enumerated() {
                group.addTask {
                    let result = try await flowTransport.uploadFile(
                        id: item.original.id, content: item.encryptedData
                    )
                    return (index, result.tag, result.server)
                }
            }
            // Results arrive in completion order; key them by index to restore input order.
            var uploads = [Int: (tag: String, server: String)]()
            for try await (index, tag, server) in group { uploads[index] = (tag, server) }
            return encryptedAttachments.enumerated().map { index, item in
                let upload = uploads[index] ?? (tag: "", server: "")
                // Desktop parity: preview = payload only (no tag prefix).
                // Desktop MessageFile.tsx does preview.split("::")[0] for filesize —
                // embedding tag prefix makes it parse the UUID as filesize → shows wrong filename.
                // CDN tag stored in transportTag for download.
                let preview = item.preview
                Self.logger.info("📤 Attachment uploaded: type=\(String(describing: item.original.type)), tag=\(upload.tag), server=\(upload.server)")
                return MessageAttachment(
                    id: item.original.id, preview: preview, blob: "",
                    type: item.original.type,
                    transportTag: upload.tag, transportServer: upload.server
                )
            }
        }
    } catch {
        // No packet was sent and no retry is armed at this point, so without this the
        // optimistic message would stay in WAITING forever. Mirrors sendAvatar.
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .error)
        DialogRepository.shared.updateDeliveryStatus(messageId: messageId, opponentKey: toPublicKey, status: .error)
        MessageRepository.shared.persistNow()
        Self.logger.error("📤 Attachment upload failed: \(error.localizedDescription)")
        throw error
    }

    // Replace the placeholders with the uploaded attachments (transport tag/server set,
    // preview without tag prefix).
    MessageRepository.shared.updateAttachments(messageId: messageId, attachments: messageAttachments)

    // Build final packet for WebSocket send
    var packet = optimisticPacket
    packet.attachments = messageAttachments

    packetFlowSender.sendPacket(packet)
    registerOutgoingRetry(for: packet)
    MessageRepository.shared.persistNow()
    Self.logger.info("📤 Message with \(attachments.count) attachment(s) sent to \(toPublicKey.prefix(12))…")
}
|
||
|
||
/// Wraps the attachment bytes in a base64 data URI (desktop: `FileReader.readAsDataURL()`).
///
/// Images are always labelled `image/jpeg`, files get the MIME type derived from
/// their file name (`"file"` when absent), and every other type is labelled
/// `application/octet-stream`.
private func buildDataURI(_ attachment: PendingAttachment) -> String {
    let base64 = attachment.data.base64EncodedString()

    if attachment.type == .image {
        return "data:image/jpeg;base64,\(base64)"
    }
    if attachment.type == .file {
        let mimeType = mimeTypeForFileName(attachment.fileName ?? "file")
        return "data:\(mimeType);base64,\(base64)"
    }
    return "data:application/octet-stream;base64,\(base64)"
}
|
||
|
||
/// Returns the MIME type for a file name, chosen from its (case-insensitive) path extension.
///
/// - Parameter name: File name or path; only the extension after the last `.` is inspected.
/// - Returns: The matching MIME type, or `application/octet-stream` when the extension
///   is missing or not in the table below.
private func mimeTypeForFileName(_ name: String) -> String {
    let ext = (name as NSString).pathExtension.lowercased()
    switch ext {
    case "jpg", "jpeg": return "image/jpeg"
    case "png": return "image/png"
    case "gif": return "image/gif"
    case "pdf": return "application/pdf"
    case "zip": return "application/zip"
    case "mp4": return "video/mp4"
    case "mp3": return "audio/mpeg"
    case "doc": return "application/msword"
    // OOXML documents have their own registered type; `application/msword` is legacy `.doc` only.
    case "docx": return "application/vnd.openxmlformats-officedocument.wordprocessingml.document"
    case "txt": return "text/plain"
    default: return "application/octet-stream"
    }
}
|
||
|
||
/// Sends a message that carries a reply/forward blob (`AttachmentType.messages`).
///
/// Desktop parity: `prepareAttachmentsToSend()` in `DialogProvider.tsx`.
/// Android: `ChatViewModel.sendMessageWithReply()`.
///
/// The reply blob is the JSON array of `ReplyMessageData`, encrypted with
/// `encryptWithPasswordDesktopCompat()` using the **hex** encoding of the message's
/// `plainKeyAndNonce` as the password (desktop commit 61e83bd replaced the older UTF-8 form).
///
/// - Parameters:
///   - text: Message text; trimmed of surrounding whitespace. May be empty for forwards.
///   - replyMessages: Quoted/forwarded messages serialized into the attachment blob.
///   - toPublicKey: Recipient key. When equal to the own key the message stays local (Saved Messages).
///   - opponentTitle: Dialog title to use; falls back to the existing dialog's title when empty.
///   - opponentUsername: Dialog username to use; falls back to the existing dialog's username when empty.
/// - Throws: `CryptoError.decryptionFailed` when session keys are missing,
///   `CryptoError.encryptionFailed` when the JSON or key cannot be re-encoded as text,
///   and any error thrown by the encryption or JSON-encoding calls.
func sendMessageWithReply(
    text: String,
    replyMessages: [ReplyMessageData],
    toPublicKey: String,
    opponentTitle: String = "",
    opponentUsername: String = ""
) async throws {
    // The raw private key wraps the sync key; its hash goes into the outgoing packet.
    guard let privKey = privateKeyHex, let hash = privateKeyHash else {
        Self.logger.error("📤 Cannot send reply — missing keys")
        throw CryptoError.decryptionFailed
    }

    let messageId = UUID().uuidString.replacingOccurrences(of: "-", with: "").lowercased()
    let timestamp = Int64(Date().timeIntervalSince1970 * 1000)

    // Forward messages have empty text — only the MESSAGES attachment carries content.
    let messageText = text.trimmingCharacters(in: .whitespacesAndNewlines)
    let encrypted = try MessageCrypto.encryptOutgoing(
        plaintext: messageText,
        recipientPublicKeyHex: toPublicKey
    )

    // Reply password: HEX encoding of raw 56-byte key+nonce.
    // Desktop commit 61e83bd: changed from Buffer.toString('utf-8') to key.toString('hex').
    let replyPassword = encrypted.plainKeyAndNonce.hexString

    #if DEBUG
    // Never log the password itself — it is the message key material.
    Self.logger.debug("📤 Reply password derived (\(replyPassword.count) hex chars)")
    #endif

    // Desktop commit aaa4b42: no re-upload needed for forwards.
    // chacha_key_plain in ReplyMessageData carries the original message's key,
    // so the recipient can decrypt original CDN blobs directly.

    // Build the reply JSON blob.
    let replyJSON = try JSONEncoder().encode(replyMessages)
    guard let replyJSONString = String(data: replyJSON, encoding: .utf8) else {
        throw CryptoError.encryptionFailed
    }

    // Encrypt reply blob with desktop-compatible encryption.
    let encryptedReplyBlob = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        replyJSON,
        password: replyPassword
    )

    #if DEBUG
    Self.logger.debug("📤 Reply blob: \(replyJSON.count) raw → \(encryptedReplyBlob.count) encrypted bytes")
    // Self-test: decrypt with the same hex password and make sure the result is valid UTF-8.
    if let selfTestData = try? CryptoManager.shared.decryptWithPassword(
        encryptedReplyBlob, password: replyPassword, requireCompression: true
    ), String(data: selfTestData, encoding: .utf8) != nil {
        Self.logger.debug("📤 Reply blob self-test PASSED")
    } else {
        Self.logger.error("📤 Reply blob self-test FAILED")
    }
    #endif

    // Build reply attachment (encrypted blob goes on the wire).
    let replyAttachmentId = "reply_\(timestamp)"
    let replyAttachment = MessageAttachment(
        id: replyAttachmentId,
        preview: "",
        blob: encryptedReplyBlob,
        type: .messages
    )

    // Build aesChachaKey (same as sendMessageWithAttachments):
    // key bytes → Latin-1 string → UTF-8 bytes → encrypted with the private key.
    guard let latin1ForSync = String(data: encrypted.plainKeyAndNonce, encoding: .isoLatin1) else {
        throw CryptoError.encryptionFailed
    }
    let aesChachaPayload = Data(latin1ForSync.utf8)
    let aesChachaKey = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        aesChachaPayload,
        password: privKey
    )

    // Build packet for wire (encrypted blob).
    var packet = PacketMessage()
    packet.fromPublicKey = currentPublicKey
    packet.toPublicKey = toPublicKey
    packet.content = encrypted.content
    packet.chachaKey = encrypted.chachaKey
    packet.timestamp = timestamp
    packet.privateKey = hash
    packet.messageId = messageId
    packet.aesChachaKey = aesChachaKey
    packet.attachments = [replyAttachment]

    // Build a local copy with decrypted blob for UI storage
    // (incoming messages get decrypted in handleIncomingMessage; outgoing must be pre-decrypted).
    let localReplyAttachment = MessageAttachment(
        id: replyAttachmentId,
        preview: "",
        blob: replyJSONString, // Decrypted JSON for local rendering
        type: .messages
    )
    var localPacket = packet
    localPacket.attachments = [localReplyAttachment]

    // Ensure dialog exists; explicit arguments win over what the dialog already stores.
    let existingDialog = DialogRepository.shared.dialogs[toPublicKey]
    let title = !opponentTitle.isEmpty ? opponentTitle : (existingDialog?.opponentTitle ?? "")
    let username = !opponentUsername.isEmpty ? opponentUsername : (existingDialog?.opponentUsername ?? "")
    DialogRepository.shared.ensureDialog(
        opponentKey: toPublicKey,
        title: title,
        username: username,
        myPublicKey: currentPublicKey
    )

    // Optimistic UI update — use localPacket (decrypted blob) for storage.
    // Android parity: always insert as WAITING (fromSync: false).
    MessageRepository.shared.upsertFromMessagePacket(
        localPacket,
        myPublicKey: currentPublicKey,
        decryptedText: messageText,
        attachmentPassword: "rawkey:" + replyPassword,
        fromSync: false
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: localPacket.toPublicKey)

    // Saved Messages: local-only — mark delivered and skip the network send.
    if toPublicKey == currentPublicKey {
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .delivered)
        DialogRepository.shared.updateDeliveryStatus(
            messageId: messageId, opponentKey: toPublicKey, status: .delivered
        )
        return
    }

    packetFlowSender.sendPacket(packet)
    registerOutgoingRetry(for: packet)
    MessageRepository.shared.persistNow()
    Self.logger.info("📤 Reply message sent to \(toPublicKey.prefix(12))… with \(replyMessages.count) quoted message(s)")
}
|
||
|
||
/// Records a call in the dialog history by sending a message whose only content is
/// a call attachment (`AttachmentType.call`, type=4).
///
/// Desktop/Android parity: the caller emits one attachment with `preview = durationSec`.
///
/// - Parameters:
///   - toPublicKey: Recipient key. When equal to the own key nothing is sent over the network.
///   - durationSec: Call duration in seconds; negative values are clamped to 0.
///   - opponentTitle: Dialog title to use; falls back to the existing dialog's title when empty.
///   - opponentUsername: Dialog username to use; falls back to the existing dialog's username when empty.
/// - Throws: `CryptoError.decryptionFailed` when session keys are missing,
///   `CryptoError.encryptionFailed` when the key cannot be re-encoded as Latin-1,
///   and any error thrown by the encryption calls.
func sendCallAttachment(
    toPublicKey: String,
    durationSec: Int,
    opponentTitle: String = "",
    opponentUsername: String = ""
) async throws {
    guard let rawPrivateKey = privateKeyHex, let keyHash = privateKeyHash else {
        Self.logger.error("📤 Cannot send call attachment — missing keys")
        throw CryptoError.decryptionFailed
    }

    let messageId = UUID().uuidString.replacingOccurrences(of: "-", with: "").lowercased()
    let sentAtMs = Int64(Date().timeIntervalSince1970 * 1000)

    // The message body is empty; the attachment carries the call information.
    let sealed = try MessageCrypto.encryptOutgoing(
        plaintext: "",
        recipientPublicKeyHex: toPublicKey
    )

    // Wrap the message key with the private key: bytes → Latin-1 string → UTF-8 bytes → encrypted.
    guard let latin1Key = String(data: sealed.plainKeyAndNonce, encoding: .isoLatin1) else {
        throw CryptoError.encryptionFailed
    }
    let wrappedKey = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        Data(latin1Key.utf8),
        password: rawPrivateKey
    )

    // Attachment id: first 16 characters of a dash-less lowercase UUID.
    let attachmentId = String(
        UUID().uuidString.replacingOccurrences(of: "-", with: "").lowercased().prefix(16)
    )
    let callAttachment = MessageAttachment(
        id: attachmentId,
        preview: String(max(durationSec, 0)),
        blob: "",
        type: .call
    )

    var packet = PacketMessage()
    packet.fromPublicKey = currentPublicKey
    packet.toPublicKey = toPublicKey
    packet.content = sealed.content
    packet.chachaKey = sealed.chachaKey
    packet.timestamp = sentAtMs
    packet.privateKey = keyHash
    packet.messageId = messageId
    packet.aesChachaKey = wrappedKey
    packet.attachments = [callAttachment]

    // Make sure the dialog exists; explicit arguments win over stored values.
    let knownDialog = DialogRepository.shared.dialogs[toPublicKey]
    let resolvedTitle = opponentTitle.isEmpty ? (knownDialog?.opponentTitle ?? "") : opponentTitle
    let resolvedUsername = opponentUsername.isEmpty
        ? (knownDialog?.opponentUsername ?? "")
        : opponentUsername
    DialogRepository.shared.ensureDialog(
        opponentKey: toPublicKey,
        title: resolvedTitle,
        username: resolvedUsername,
        myPublicKey: currentPublicKey
    )

    // Store locally first, then refresh the dialog row and persist.
    MessageRepository.shared.upsertFromMessagePacket(
        packet,
        myPublicKey: currentPublicKey,
        decryptedText: "",
        fromSync: false
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: toPublicKey)
    MessageRepository.shared.persistNow()

    // Message to self: mark delivered locally and skip the network send.
    guard toPublicKey != currentPublicKey else {
        MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .delivered)
        DialogRepository.shared.updateDeliveryStatus(
            messageId: messageId,
            opponentKey: toPublicKey,
            status: .delivered
        )
        return
    }

    packetFlowSender.sendPacket(packet)
    registerOutgoingRetry(for: packet)
}
|
||
|
||
/// Sends a typing indicator to a dialog, throttled per dialog
/// (desktop parity: max once per 3s per dialog).
///
/// Nothing is sent when the target is empty or the own key, when the session has no
/// private-key hash, or when the connection is not authenticated.
///
/// - Parameter toPublicKey: Dialog key; trimmed, and normalized when it is a group dialog key.
func sendTypingIndicator(toPublicKey: String) {
    let trimmedKey = toPublicKey.trimmingCharacters(in: .whitespacesAndNewlines)
    let dialogKey: String
    if DatabaseManager.isGroupDialogKey(trimmedKey) {
        dialogKey = Self.normalizedGroupDialogIdentity(trimmedKey)
    } else {
        dialogKey = trimmedKey
    }

    guard dialogKey != currentPublicKey,
          !dialogKey.isEmpty,
          let keyHash = privateKeyHash,
          ProtocolManager.shared.connectionState == .authenticated
    else { return }

    // Throttle: drop the indicator if the previous one for this dialog is too recent.
    let nowMs = Int64(Date().timeIntervalSince1970 * 1000)
    let previousMs = lastTypingSentAt[dialogKey] ?? 0
    guard nowMs - previousMs >= ProtocolConstants.typingThrottleMs else { return }
    lastTypingSentAt[dialogKey] = nowMs

    var typingPacket = PacketTyping()
    typingPacket.privateKey = keyHash
    typingPacket.fromPublicKey = currentPublicKey
    typingPacket.toPublicKey = dialogKey
    ProtocolManager.shared.sendPacket(typingPacket)
}
|
||
|
||
/// Android parity: sends a read receipt for a direct or group dialog.
///
/// De-duplicates by timestamp rather than by elapsed time: a receipt is sent only when the
/// latest incoming message in the dialog is newer than the one the previous receipt covered.
/// The same receipt is sent a second time after 2 seconds.
///
/// - Parameter toPublicKey: Dialog key; trimmed, and normalized when it is a group dialog key.
func sendReadReceipt(toPublicKey: String) {
    let trimmedKey = toPublicKey.trimmingCharacters(in: .whitespacesAndNewlines)
    let dialogKey: String
    if DatabaseManager.isGroupDialogKey(trimmedKey) {
        dialogKey = Self.normalizedGroupDialogIdentity(trimmedKey)
    } else {
        dialogKey = trimmedKey
    }

    let connectionState = ProtocolManager.shared.connectionState
    guard dialogKey != currentPublicKey,
          !dialogKey.isEmpty,
          !SystemAccounts.isSystemAccount(dialogKey),
          let keyHash = privateKeyHash,
          connectionState == .authenticated
    else { return }

    // Android parity: timestamp dedup — skip when the newest incoming message
    // is already covered by a previously sent receipt.
    let newestIncomingTs = MessageRepository.shared.latestIncomingTimestamp(
        for: dialogKey, myPublicKey: currentPublicKey
    ) ?? 0
    let alreadyCoveredTs = lastReadReceiptTimestamp[dialogKey] ?? 0
    guard newestIncomingTs <= 0 || newestIncomingTs > alreadyCoveredTs else { return }

    var readPacket = PacketRead()
    readPacket.privateKey = keyHash
    readPacket.fromPublicKey = currentPublicKey
    readPacket.toPublicKey = dialogKey
    ProtocolManager.shared.sendPacket(readPacket)
    lastReadReceiptTimestamp[dialogKey] = newestIncomingTs

    // Android parity: retry once after 2 seconds in case send failed silently.
    // The retry re-reads the session state, so it is dropped after logout or disconnect.
    Task { @MainActor [weak self] in
        try? await Task.sleep(for: .seconds(2))
        guard let self,
              ProtocolManager.shared.connectionState == .authenticated,
              let retryHash = self.privateKeyHash
        else { return }
        var retryPacket = PacketRead()
        retryPacket.privateKey = retryHash
        retryPacket.fromPublicKey = self.currentPublicKey
        retryPacket.toPublicKey = dialogKey
        ProtocolManager.shared.sendPacket(retryPacket)
    }
}
|
||
|
||
/// Replaces the in-memory display name and username of the current session
/// (called from ProfileEditView).
///
/// - Parameters:
///   - displayName: New display name to cache.
///   - username: New username to cache.
func updateDisplayNameAndUsername(displayName: String, username: String) {
    (self.displayName, self.username) = (displayName, username)
}
|
||
|
||
/// Ends the session: disconnects from the server, wipes in-memory keys and
/// per-session bookkeeping, and resets the repositories and caches that hold account data.
func endSession() {
    ProtocolManager.shared.disconnect()

    // Key material.
    privateKeyHash = nil
    privateKeyHex = nil

    // Typing / sync / inbound-queue bookkeeping.
    lastTypingSentAt.removeAll()
    syncBatchInProgress = false
    syncRequestInFlight = false
    pendingSyncReads.removeAll()
    pendingOpponentReads.removeAll()
    pendingIncomingMessages.removeAll()
    isProcessingIncomingMessages = false
    lastReadReceiptTimestamp.removeAll()
    requestedUserInfoKeys.removeAll()

    // Outgoing retries: cancel the timers before dropping their state.
    for retryTask in pendingOutgoingRetryTasks.values {
        retryTask.cancel()
    }
    pendingOutgoingRetryTasks.removeAll()
    pendingOutgoingPackets.removeAll()
    pendingOutgoingAttempts.removeAll()

    // Identity.
    isAuthenticated = false
    currentPublicKey = ""
    displayName = ""
    username = ""

    // Shared stores and caches.
    DialogRepository.shared.reset()
    MessageRepository.shared.reset()
    DatabaseManager.shared.close()
    CryptoManager.shared.clearCaches()
    AttachmentCache.shared.privateKey = nil
    RecentSearchesRepository.shared.clearSession()
    DraftManager.shared.reset()
    CallManager.shared.resetForSessionEnd()
}
|
||
|
||
// MARK: - Protocol Callbacks
|
||
|
||
/// Wires every `ProtocolManager` packet callback to the session's handlers.
///
/// Callbacks that touch session or repository state hop onto the main actor with
/// `Task { @MainActor ... }`; the remaining callbacks only log the packet.
private func setupProtocolCallbacks() {
    let protocolManager = ProtocolManager.shared

    // Incoming chat messages go through the serialized inbound queue.
    protocolManager.onMessageReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor [weak self] in
            self?.enqueueIncomingMessage(packet)
        }
    }

    // Delivery ACK for one of our outgoing messages.
    protocolManager.onDeliveryReceived = { [weak self] packet in
        Task { @MainActor in
            let ackedMessageId = packet.messageId
            let dialogKey = MessageRepository.shared.dialogKey(forMessageId: ackedMessageId)
                ?? packet.toPublicKey
            // Desktop parity: update both status AND timestamp on delivery ACK.
            let ackTimestamp = Int64(Date().timeIntervalSince1970 * 1000)
            MessageRepository.shared.updateDeliveryStatus(
                messageId: ackedMessageId,
                status: .delivered,
                newTimestamp: ackTimestamp
            )
            // Android parity 1:1: full dialog recalculation after delivery status change.
            DialogRepository.shared.updateDialogFromMessages(opponentKey: dialogKey)
            self?.resolveOutgoingRetry(messageId: ackedMessageId)
            // Desktop parity (useDialogFiber.ts): update sync cursor on delivery ACK.
            if let self, !self.syncBatchInProgress {
                self.saveLastSyncTimestamp(ackTimestamp)
            }
        }
    }

    // Read receipts: either from the opponent or from another device of this account.
    protocolManager.onReadReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor in
            let ownKey = self.currentPublicKey
            guard let context = Self.resolveReadPacketContext(packet, ownKey: ownKey) else {
                Self.logger.debug(
                    "Skipping unsupported read packet: from=\(packet.fromPublicKey), to=\(packet.toPublicKey)"
                )
                return
            }
            let dialogKey = context.dialogKey

            if context.fromMe {
                // Android parity: read sync from another own device means
                // incoming messages in this dialog should become read locally.
                DialogRepository.shared.markAsRead(opponentKey: dialogKey)
                MessageRepository.shared.markIncomingAsRead(
                    opponentKey: dialogKey,
                    myPublicKey: ownKey
                )
                // Race fix: if sync is in progress, the messages may not be
                // in DB yet (PacketRead arrives before sync messages).
                // Store for re-application at BATCH_END.
                if self.syncBatchInProgress {
                    self.pendingSyncReads.insert(dialogKey)
                }
                return
            }

            DialogRepository.shared.markOutgoingAsRead(opponentKey: dialogKey)
            MessageRepository.shared.markOutgoingAsRead(
                opponentKey: dialogKey,
                myPublicKey: ownKey
            )
            // Race fix: if sync is in progress, messages may not be in DB yet.
            // markOutgoingAsRead() may have updated 0 rows. Store for re-application
            // at BATCH_END after waitForInboundQueueToDrain() ensures all messages
            // are committed. Android doesn't need this — single FIFO queue.
            if self.syncBatchInProgress {
                self.pendingOpponentReads.insert(dialogKey)
            }
            // Resolve pending retry timers for all messages to this opponent —
            // read receipt proves delivery, no need to retry further.
            self.resolveAllOutgoingRetries(toPublicKey: dialogKey)
            // Desktop parity (useDialogFiber.ts): update sync cursor on read receipt.
            if !self.syncBatchInProgress {
                let nowMs = Int64(Date().timeIntervalSince1970 * 1000)
                self.saveLastSyncTimestamp(nowMs)
            }
        }
    }

    // Typing indicators from other users (our own echoes are ignored).
    protocolManager.onTypingReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor in
            guard let context = Self.resolveTypingPacketContext(packet, ownKey: self.currentPublicKey),
                  !context.fromMe
            else { return }
            MessageRepository.shared.markTyping(from: context.dialogKey)
        }
    }

    // Online-state updates: apply every entry to its dialog.
    protocolManager.onOnlineStateReceived = { packet in
        Task { @MainActor in
            for stateEntry in packet.entries {
                DialogRepository.shared.updateOnlineState(
                    publicKey: stateEntry.publicKey,
                    isOnline: stateEntry.isOnline
                )
            }
        }
    }

    // Profile data: non-empty fields overwrite the cached and stored profile.
    protocolManager.onUserInfoReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor in
            Self.logger.debug("UserInfo received: username='\(packet.username)', title='\(packet.title)'")
            if !packet.title.isEmpty {
                self.displayName = packet.title
                AccountManager.shared.updateProfile(displayName: packet.title, username: nil)
            }
            if !packet.username.isEmpty {
                self.username = packet.username
                AccountManager.shared.updateProfile(displayName: nil, username: packet.username)
            }
            NotificationCenter.default.post(name: .profileDidUpdate, object: nil)
        }
    }

    // Note: onSearchResult is set by ChatListViewModel for user search.
    // SessionManager does NOT override it — the ViewModel handles both
    // displaying results and updating dialog user info.

    protocolManager.onHandshakeCompleted = { [weak self] _ in
        guard let self else { return }
        Task { @MainActor in
            Self.logger.info("Handshake completed")

            guard let keyHash = self.privateKeyHash else { return }

            // Only send UserInfo if we have profile data to update.
            let localTitle = self.displayName
            let localUsername = self.username
            if localTitle.isEmpty && localUsername.isEmpty {
                // No local profile — fetch from server (e.g. after import via seed phrase).
                // Server may have our profile from a previous session on another device.
                Self.logger.debug("Skipping UserInfo send — requesting own profile from server")
                var searchPacket = PacketSearch()
                searchPacket.privateKey = keyHash
                searchPacket.search = self.currentPublicKey
                ProtocolManager.shared.sendSearchPacket(searchPacket, channel: .userInfo)
            } else {
                var userInfoPacket = PacketUserInfo()
                userInfoPacket.username = localUsername
                userInfoPacket.title = localTitle
                userInfoPacket.privateKey = keyHash
                ProtocolManager.shared.sendPacket(userInfoPacket)
            }

            // Reset sync state — if a previous connection dropped mid-sync,
            // syncRequestInFlight would stay true and block all future syncs.
            self.syncRequestInFlight = false
            self.syncBatchInProgress = false
            self.pendingIncomingMessages.removeAll()
            self.isProcessingIncomingMessages = false

            // Cancel stale retry timers from previous connection —
            // they would fire and duplicate-send messages that are about to be retried fresh.
            for staleRetry in self.pendingOutgoingRetryTasks.values {
                staleRetry.cancel()
            }
            self.pendingOutgoingRetryTasks.removeAll()
            self.pendingOutgoingPackets.removeAll()
            self.pendingOutgoingAttempts.removeAll()

            // Desktop parity: request message synchronization after authentication.
            self.requestSynchronize()
            // Safety net: reconcile dialog delivery indicators and unread counts
            // with actual message statuses, fixing any desync from stale retry timers.
            DialogRepository.shared.reconcileDeliveryStatuses()
            DialogRepository.shared.reconcileUnreadCounts()
            // NOTE: retryWaitingOutgoingMessages moved to NOT_NEEDED (Android parity:
            // finishSyncCycle() retries AFTER sync completes, not during).

            // Clear dedup sets on reconnect so subscriptions can be re-established lazily.
            self.requestedUserInfoKeys.removeAll()
            self.onlineSubscribedKeys.removeAll()

            // Send push token to server for push notifications (Android parity).
            self.sendPushTokenToServer()
            CallManager.shared.onAuthenticated()

            // Desktop parity: user info refresh is deferred until sync completes.
            // Desktop fetches lazily per-component (useUserInformation); we do it
            // in bulk after sync ends to avoid flooding the server during sync streaming.
        }
    }

    // Sync state machine: BATCH_START → BATCH_END (repeat) → NOT_NEEDED.
    protocolManager.onSyncReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor in
            self.syncRequestInFlight = false

            switch packet.status {
            case .batchStart:
                self.syncBatchInProgress = true
                Self.logger.debug("SYNC BATCH_START")

            case .batchEnd:
                // Desktop parity (useSynchronize.ts): await whenFinish() then
                // save server cursor and request next batch.
                await self.waitForInboundQueueToDrain()
                let serverCursor = packet.timestamp
                self.saveLastSyncTimestamp(serverCursor)
                // Re-apply cross-device reads that arrived during sync.
                // PacketRead can arrive BEFORE sync messages — markIncomingAsRead
                // updates 0 rows because the message isn't in DB yet.
                self.reapplyPendingSyncReads()
                self.reapplyPendingOpponentReads()
                // Android parity: reconcile unread counts after each batch.
                DialogRepository.shared.reconcileUnreadCounts()
                Self.logger.debug("SYNC BATCH_END cursor=\(serverCursor)")
                self.requestSynchronize(cursor: serverCursor)

            case .notNeeded:
                // Android parity: finishSyncCycle() — clear flag, retry, refresh.
                self.syncBatchInProgress = false
                // Re-apply cross-device reads one final time.
                self.reapplyPendingSyncReads()
                self.reapplyPendingOpponentReads()
                DialogRepository.shared.reconcileDeliveryStatuses()
                DialogRepository.shared.reconcileUnreadCounts()
                self.retryWaitingOutgoingMessagesAfterReconnect()
                Self.logger.debug("SYNC NOT_NEEDED")
                // Refresh online status shortly after sync is done.
                Task { @MainActor [weak self] in
                    try? await Task.sleep(for: .milliseconds(300))
                    await self?.refreshOnlineStatusForAllDialogs()
                }
            }
        }
    }

    // The following packets are only logged here.
    protocolManager.onRequestUpdateReceived = { packet in
        Self.logger.debug("RequestUpdate packet received: server=\(packet.updateServer)")
    }

    protocolManager.onCreateGroupReceived = { packet in
        Self.logger.debug("CreateGroup packet received: groupId=\(packet.groupId)")
    }

    protocolManager.onGroupInfoReceived = { packet in
        Self.logger.debug("GroupInfo packet received: groupId=\(packet.groupId), members=\(packet.members.count)")
    }

    protocolManager.onGroupInviteInfoReceived = { packet in
        Self.logger.debug(
            "GroupInviteInfo packet received: groupId=\(packet.groupId), members=\(packet.membersCount), status=\(packet.status.rawValue)"
        )
    }

    // Group join: store the group, make sure its dialog exists, refresh it if it has messages.
    protocolManager.onGroupJoinReceived = { [weak self] packet in
        guard let self else { return }
        Task { @MainActor in
            guard let rawPrivateKey = self.privateKeyHex,
                  !self.currentPublicKey.isEmpty
            else { return }
            let account = self.currentPublicKey

            guard let groupDialogKey = GroupRepository.shared.upsertFromGroupJoin(
                account: account,
                privateKeyHex: rawPrivateKey,
                packet: packet
            ) else { return }

            let groupMetadata = GroupRepository.shared.groupMetadata(
                account: account,
                groupDialogKey: groupDialogKey
            )

            DialogRepository.shared.ensureDialog(
                opponentKey: groupDialogKey,
                title: groupMetadata?.title ?? "",
                username: groupMetadata?.description ?? "",
                myPublicKey: account
            )

            let hasDecryptedMessage = MessageRepository.shared.lastDecryptedMessage(
                account: account,
                opponentKey: groupDialogKey
            ) != nil
            if hasDecryptedMessage {
                DialogRepository.shared.updateDialogFromMessages(opponentKey: groupDialogKey)
            }
        }
    }

    protocolManager.onGroupLeaveReceived = { packet in
        Self.logger.debug("GroupLeave packet received: groupId=\(packet.groupId)")
    }

    protocolManager.onGroupBanReceived = { packet in
        Self.logger.debug("GroupBan packet received: groupId=\(packet.groupId), publicKey=\(packet.publicKey)")
    }

    // Login from a new device: surface it as a local message.
    protocolManager.onDeviceNewReceived = { [weak self] packet in
        Task { @MainActor in
            self?.handleDeviceNewLogin(packet)
        }
    }
}
|
||
|
||
/// Turns a "new device login" packet into a locally stored message in the Safe
/// system dialog. Nothing is sent to the server.
///
/// - Parameter packet: Login details (device id, OS, IP address, device name).
private func handleDeviceNewLogin(_ packet: PacketDeviceNew) {
    let ownKey = currentPublicKey
    guard !ownKey.isEmpty else { return }

    // Desktop parity: dotCenterIfNeeded(deviceId, 12, 4)
    let deviceId = packet.deviceId
    let shortDeviceId = deviceId.count > 12
        ? "\(deviceId.prefix(4))...\(deviceId.suffix(4))"
        : deviceId

    // Desktop parity: useDeviceMessage.ts messageTemplate
    let body = """
    **Attempt to login from a new device**

    We detected a login to your account from **\(packet.ipAddress)** a new device **by seed phrase**. If this was you, you can safely ignore this message.

    **Arch:** \(packet.deviceOs)
    **IP:** \(packet.ipAddress)
    **Device:** \(packet.deviceName)
    **ID:** \(shortDeviceId)
    """

    let safeAccountKey = SystemAccounts.safePublicKey
    let createdAtMs = Int64(Date().timeIntervalSince1970 * 1000)
    let localMessageId = UUID().uuidString

    // Desktop parity: Safe account has verified: 1 (public figure/brand)
    DialogRepository.shared.ensureDialog(
        opponentKey: safeAccountKey,
        title: SystemAccounts.safeTitle,
        username: "safe",
        verified: 1,
        myPublicKey: ownKey
    )

    // Synthetic incoming packet from the Safe account; it only exists locally.
    var syntheticPacket = PacketMessage()
    syntheticPacket.fromPublicKey = safeAccountKey
    syntheticPacket.toPublicKey = ownKey
    syntheticPacket.messageId = localMessageId
    syntheticPacket.timestamp = createdAtMs

    MessageRepository.shared.upsertFromMessagePacket(
        syntheticPacket,
        myPublicKey: ownKey,
        decryptedText: body,
        fromSync: false
    )
    DialogRepository.shared.updateDialogFromMessages(opponentKey: safeAccountKey)
}
|
||
|
||
/// Appends a packet to the inbound queue and starts the drain loop unless one is already running.
///
/// - Parameter packet: Incoming message packet to process in arrival order.
private func enqueueIncomingMessage(_ packet: PacketMessage) {
    pendingIncomingMessages.append(packet)

    // A running drain loop will pick the new packet up on its own.
    if isProcessingIncomingMessages { return }
    isProcessingIncomingMessages = true

    Task { @MainActor [weak self] in
        await self?.processIncomingMessagesQueue()
    }
}
|
||
|
||
/// Drains the inbound queue, processing packets one by one in arrival order, then clears
/// the processing flag and signals that the queue is empty.
private func processIncomingMessagesQueue() async {
    // PERF: During sync, batch all message writes → single @Published event.
    // Real-time messages publish immediately for instant UI feedback.
    // Android parity: check thread-safe flag too (BATCH_START MainActor Task may lag).
    let useBatchUpdates = syncBatchInProgress || ProtocolManager.shared.isSyncBatchActive
    if useBatchUpdates {
        MessageRepository.shared.beginBatchUpdates()
    }

    // Packets enqueued while a snapshot is being processed are picked up by the next pass.
    while !pendingIncomingMessages.isEmpty {
        let snapshot = pendingIncomingMessages
        pendingIncomingMessages.removeAll(keepingCapacity: true)
        for queuedPacket in snapshot {
            await processIncomingMessage(queuedPacket)
            await Task.yield()
        }
    }

    if useBatchUpdates {
        MessageRepository.shared.endBatchUpdates()
    }
    isProcessingIncomingMessages = false
    signalQueueDrained()
}
|
||
|
||
private func processIncomingMessage(_ packet: PacketMessage) async {
|
||
PerformanceLogger.shared.track("session.processIncoming")
|
||
let myKey = currentPublicKey
|
||
let currentPrivateKeyHex = self.privateKeyHex
|
||
let currentPrivateKeyHash = self.privateKeyHash
|
||
|
||
guard let context = Self.resolveMessagePacketContext(packet, ownKey: myKey) else {
|
||
return
|
||
}
|
||
let fromMe = context.fromMe
|
||
|
||
let opponentKey = context.dialogKey
|
||
let isGroupDialog = context.kind == .group
|
||
let wasKnownBefore = MessageRepository.shared.hasMessage(packet.messageId)
|
||
let groupKey: String? = {
|
||
guard isGroupDialog, let currentPrivateKeyHex else { return nil }
|
||
return GroupRepository.shared.groupKey(
|
||
account: myKey,
|
||
privateKeyHex: currentPrivateKeyHex,
|
||
groupDialogKey: opponentKey
|
||
)
|
||
}()
|
||
if isGroupDialog, groupKey == nil {
|
||
Self.logger.warning("processIncoming: group key not found for \(opponentKey)")
|
||
return
|
||
}
|
||
|
||
// ── PERF: Offload all crypto to background thread ──
|
||
// decryptIncomingMessage (ECDH + XChaCha20) and attachment blob
|
||
// decryption (PBKDF2 × N candidates) are CPU-heavy pure computation.
|
||
// Running them on MainActor blocked the UI for seconds during sync.
|
||
let cryptoResult = await Task.detached(priority: .userInitiated) {
|
||
Self.decryptAndProcessAttachments(
|
||
packet: packet,
|
||
myPublicKey: myKey,
|
||
privateKeyHex: currentPrivateKeyHex,
|
||
groupKey: groupKey
|
||
)
|
||
}.value
|
||
|
||
guard let cryptoResult else {
|
||
Self.logger.warning("processIncoming: decryptIncomingMessage returned nil for msgId=\(packet.messageId.prefix(8))…")
|
||
return
|
||
}
|
||
let text = cryptoResult.text
|
||
let processedPacket = cryptoResult.processedPacket
|
||
let resolvedAttachmentPassword = cryptoResult.attachmentPassword
|
||
|
||
// For outgoing messages received from the server (sent by another device
|
||
// on the same account), treat as sync-equivalent so status = .delivered.
|
||
// Without this, real-time fromMe messages get .waiting → timeout → .error.
|
||
// Android parity: also check isSyncBatchActive (set synchronously on receive
|
||
// queue) to handle race where BATCH_START MainActor Task hasn't run yet.
|
||
let effectiveFromSync = syncBatchInProgress || ProtocolManager.shared.isSyncBatchActive || fromMe
|
||
|
||
// Android parity: insert message to DB FIRST, then update dialog.
|
||
// Dialog's unreadCount uses COUNT(*) WHERE read=0, so the message
|
||
// must exist in DB before the count query runs.
|
||
MessageRepository.shared.upsertFromMessagePacket(
|
||
processedPacket,
|
||
myPublicKey: myKey,
|
||
decryptedText: text,
|
||
attachmentPassword: resolvedAttachmentPassword,
|
||
fromSync: effectiveFromSync,
|
||
dialogIdentityOverride: opponentKey
|
||
)
|
||
#if DEBUG
|
||
if processedPacket.attachments.contains(where: { $0.type == .call }) {
|
||
print("[CallAtt] Stored call attachment msgId=\(processedPacket.messageId.prefix(12))… text='\(text.prefix(20))' attCount=\(processedPacket.attachments.count)")
|
||
}
|
||
#endif
|
||
// Android parity 1:1: dialogDao.updateDialogFromMessages(account, opponentKey)
|
||
// Full recalculation of lastMessage, unread, iHaveSent, delivery from DB.
|
||
DialogRepository.shared.updateDialogFromMessages(opponentKey: opponentKey)
|
||
if isGroupDialog,
|
||
let metadata = GroupRepository.shared.groupMetadata(account: myKey, groupDialogKey: opponentKey) {
|
||
DialogRepository.shared.ensureDialog(
|
||
opponentKey: opponentKey,
|
||
title: metadata.title,
|
||
username: metadata.description,
|
||
myPublicKey: myKey
|
||
)
|
||
}
|
||
|
||
// Desktop parity: if we received a message from the opponent (not our own),
|
||
// they are clearly online — update their online status immediately.
|
||
// This supplements PacketOnlineState (0x05) which may arrive with delay.
|
||
if !fromMe && !effectiveFromSync && !isGroupDialog {
|
||
DialogRepository.shared.updateOnlineState(publicKey: opponentKey, isOnline: true)
|
||
}
|
||
|
||
let dialog = DialogRepository.shared.dialogs[opponentKey]
|
||
|
||
if !isGroupDialog, dialog?.opponentTitle.isEmpty == true {
|
||
requestUserInfoIfNeeded(opponentKey: opponentKey, privateKeyHash: currentPrivateKeyHash)
|
||
}
|
||
|
||
// Desktop parity: do NOT send PacketDelivery (0x08) back to server.
|
||
// The server auto-generates delivery confirmations when it forwards
|
||
// the message — the client never needs to acknowledge receipt explicitly.
|
||
// Sending 0x08 for every received message was causing a packet flood
|
||
// that triggered server RST disconnects.
|
||
|
||
// Android parity: mark as read if dialog is active AND app is in foreground.
|
||
// Android has NO idle detection — only isDialogActive flag (ON_RESUME/ON_PAUSE).
|
||
let dialogIsActive = MessageRepository.shared.isDialogActive(opponentKey)
|
||
let dialogIsReadEligible = MessageRepository.shared.isDialogReadEligible(opponentKey)
|
||
let isSystem = SystemAccounts.isSystemAccount(opponentKey)
|
||
let fg = isAppInForeground
|
||
let shouldMarkRead = dialogIsActive && dialogIsReadEligible && fg && !isSystem
|
||
|
||
if shouldMarkRead {
|
||
DialogRepository.shared.markAsRead(opponentKey: opponentKey)
|
||
MessageRepository.shared.markIncomingAsRead(
|
||
opponentKey: opponentKey,
|
||
myPublicKey: myKey
|
||
)
|
||
if !fromMe && !wasKnownBefore {
|
||
// Android/Desktop parity: send read receipt immediately,
|
||
// even during sync. 400ms debounce prevents flooding.
|
||
sendReadReceipt(toPublicKey: opponentKey)
|
||
}
|
||
}
|
||
|
||
// Desktop parity (useUpdateSyncTime.ts): no-op during SYNCHRONIZATION.
|
||
// Sync cursor is updated once at BATCH_END with the server's timestamp.
|
||
// Android parity: check both MainActor flag and thread-safe flag for race safety.
|
||
if !effectiveFromSync {
|
||
saveLastSyncTimestamp(packet.timestamp)
|
||
}
|
||
}
|
||
|
||
/// Callers suspended in `waitForInboundQueueToDrain()`; each entry is resumed
/// (exactly once) by `signalQueueDrained()`.
private var drainContinuations = [CheckedContinuation<Void, Never>]()
|
||
|
||
/// Wakes every caller currently parked in `drainContinuations`.
///
/// The stored list is emptied before any continuation is resumed, so the
/// property never holds a continuation that has already been resumed.
private func signalQueueDrained() {
    let parked = drainContinuations
    drainContinuations.removeAll()
    parked.forEach { $0.resume() }
}
|
||
|
||
/// Desktop parity (dialogQueue.ts `whenFinish`): suspends — with no timeout —
/// until the inbound message queue is idle.
///
/// Returns immediately when nothing is being processed and nothing is pending;
/// otherwise parks a continuation that `signalQueueDrained()` resumes.
private func waitForInboundQueueToDrain() async {
    let queueIsBusy = isProcessingIncomingMessages || !pendingIncomingMessages.isEmpty
    guard queueIsBusy else {
        // Nothing in flight and nothing queued — no need to suspend.
        return
    }

    // Park until the queue reports it has drained.
    await withCheckedContinuation { (parked: CheckedContinuation<Void, Never>) in
        self.drainContinuations.append(parked)
    }
}
|
||
|
||
// Per-account UserDefaults key under which the sync cursor used to be stored.
// Only read (and then removed) by the one-time migration in loadLastSyncTimestamp().
private var syncCursorKey: String {
    get {
        return "rosetta_last_sync_\(currentPublicKey)"
    }
}
|
||
|
||
// MARK: - Online Status Subscription
|
||
|
||
/// Subscribes to online-status updates for one user (called when a chat is opened, like Android).
///
/// Does nothing when the key is empty, the connection is not authenticated,
/// the key is already in `onlineSubscribedKeys`, or no private-key hash is
/// available. Nothing is queued for later in those cases, which avoids
/// flooding the server on reconnect.
///
/// - Parameter publicKey: Public key of the user to subscribe to.
func subscribeToOnlineStatus(publicKey: String) {
    if publicKey.isEmpty { return }
    guard ProtocolManager.shared.connectionState == .authenticated else { return }
    if onlineSubscribedKeys.contains(publicKey) { return }
    guard let hash = privateKeyHash else { return }

    // Remember the subscription so repeated calls for the same key are no-ops.
    onlineSubscribedKeys.insert(publicKey)

    var subscribePacket = PacketOnlineSubscribe()
    subscribePacket.privateKey = hash
    subscribePacket.publicKeys = [publicKey]
    ProtocolManager.shared.sendPacket(subscribePacket)
}
|
||
|
||
/// Sends a `PacketSearch` for `publicKey` on the `.userInfo` channel without
/// consulting the `requestedUserInfoKeys` dedup set.
/// Desktop parity: `forceUpdateUserInformation()`.
///
/// Silently returns when the key is empty, no private-key hash is held, or
/// the connection is not authenticated.
///
/// - Parameter publicKey: Public key of the user whose info should be re-fetched.
func forceRefreshUserInfo(publicKey: String) {
    if publicKey.isEmpty { return }
    guard let hash = privateKeyHash else { return }
    guard ProtocolManager.shared.connectionState == .authenticated else { return }

    var request = PacketSearch()
    request.privateKey = hash
    request.search = publicKey
    ProtocolManager.shared.sendSearchPacket(request, channel: .userInfo)
}
|
||
|
||
/// Sends a `PacketSync` asking the server for everything after the sync cursor.
///
/// At most one request is in flight: the call is a no-op while
/// `syncRequestInFlight` is set, and sets the flag itself before sending.
///
/// - Parameter cursor: Cursor in milliseconds. When `nil`, the persisted
///   cursor from `loadLastSyncTimestamp()` is used.
private func requestSynchronize(cursor: Int64? = nil) {
    // Deliberately no connectionState check. Per the original note, callers are
    // the handshake-completion and BATCH_END handlers (both already
    // authenticated), and ProtocolManager flips `.authenticated` in a separate
    // MainActor task — a guard here could silently drop the request if this ran first.
    if syncRequestInFlight { return }
    syncRequestInFlight = true

    // Cursors are milliseconds on the server and every client; forwarded untouched.
    let lastSync: Int64
    if let cursor {
        lastSync = cursor
    } else {
        lastSync = loadLastSyncTimestamp()
    }

    var syncPacket = PacketSync()
    syncPacket.status = .notNeeded
    syncPacket.timestamp = lastSync

    Self.logger.debug("Requesting sync with timestamp \(lastSync)")
    ProtocolManager.shared.sendPacket(syncPacket)
}
|
||
|
||
/// Returns the persisted sync cursor for the current account, or 0 when none exists.
///
/// Android parity: the cursor lives in SQLite. If SQLite has no positive
/// value, a legacy UserDefaults value is migrated once: values below
/// 1_000_000_000_000 are multiplied by 1000 (treated as seconds), the result
/// is saved to SQLite and the UserDefaults entry is removed.
private func loadLastSyncTimestamp() -> Int64 {
    if currentPublicKey.isEmpty { return 0 }

    // Preferred source: SQLite.
    let stored = DatabaseManager.shared.loadSyncCursor(account: currentPublicKey)
    guard stored <= 0 else { return stored }

    // Fallback: one-time migration from the legacy UserDefaults slot.
    let defaults = UserDefaults.standard
    let legacy = Int64(defaults.integer(forKey: syncCursorKey))
    guard legacy > 0 else { return 0 }

    let migrated: Int64
    if legacy < 1_000_000_000_000 {
        migrated = legacy * 1000
    } else {
        migrated = legacy
    }
    DatabaseManager.shared.saveSyncCursor(account: currentPublicKey, timestamp: migrated)
    defaults.removeObject(forKey: syncCursorKey)
    return migrated
}
|
||
|
||
/// Persists the sync cursor for the current account in SQLite (Android parity).
///
/// Ignored when no account is active or `raw` is not positive.
/// NOTE(review): the cursor is meant to be monotonic, but this method does not
/// compare against the stored value — presumably `saveSyncCursor` enforces it; confirm.
///
/// - Parameter raw: Cursor value to store, passed through unchanged.
private func saveLastSyncTimestamp(_ raw: Int64) {
    let account = currentPublicKey
    if account.isEmpty || raw <= 0 { return }
    DatabaseManager.shared.saveSyncCursor(account: account, timestamp: raw)
}
|
||
|
||
// MARK: - Background Crypto (off MainActor)
|
||
|
||
/// Output of `decryptAndProcessAttachments` for a single incoming message.
private struct IncomingCryptoResult: Sendable {
    /// Decrypted message text.
    let text: String
    /// Copy of the incoming packet in which `.messages` attachment blobs have
    /// been replaced by their decrypted string wherever decryption succeeded.
    let processedPacket: PacketMessage
    /// `"rawkey:"` followed by the hex of the raw key material, or `nil` when
    /// no raw key data was recovered for the message.
    let attachmentPassword: String?
}
|
||
|
||
/// Decrypts the message text and then the blobs of `.messages` attachments.
/// `nonisolated`, so it can run off the MainActor; the visible code touches no
/// UI or repository state.
///
/// - With raw key data: every password candidate derived from
///   `"rawkey:" + hex` is tried with `requireCompression: true` first, then
///   again without that flag if none succeeded.
/// - Without raw key data but with a group key: a single attempt using the group key.
/// - A blob that cannot be decrypted to UTF-8 is left unchanged.
///
/// - Returns: `nil` when the text itself cannot be decrypted.
nonisolated private static func decryptAndProcessAttachments(
    packet: PacketMessage,
    myPublicKey: String,
    privateKeyHex: String?,
    groupKey: String?
) -> IncomingCryptoResult? {
    guard let decrypted = decryptIncomingMessage(
        packet: packet,
        myPublicKey: myPublicKey,
        privateKeyHex: privateKeyHex,
        groupKey: groupKey
    ) else {
        return nil
    }

    var output = packet
    var attachmentPassword: String?

    if let keyData = decrypted.rawKeyData {
        let rawKeyPassword = "rawkey:" + keyData.hexString
        attachmentPassword = rawKeyPassword
        let candidates = MessageCrypto.attachmentPasswordCandidates(from: rawKeyPassword)

        for index in output.attachments.indices {
            guard output.attachments[index].type == .messages else { continue }
            let blob = output.attachments[index].blob
            if blob.isEmpty { continue }

            var plaintext: String?

            // Pass 1: strict — payload must be compressed.
            for candidate in candidates where plaintext == nil {
                guard let data = try? CryptoManager.shared.decryptWithPassword(
                    blob, password: candidate, requireCompression: true
                ) else { continue }
                plaintext = String(data: data, encoding: .utf8)
            }

            // Pass 2: same candidates without the compression requirement.
            if plaintext == nil {
                for candidate in candidates where plaintext == nil {
                    guard let data = try? CryptoManager.shared.decryptWithPassword(
                        blob, password: candidate
                    ) else { continue }
                    plaintext = String(data: data, encoding: .utf8)
                }
            }

            if let plaintext {
                output.attachments[index].blob = plaintext
            }
        }
    } else if let groupKey {
        for index in output.attachments.indices {
            guard output.attachments[index].type == .messages else { continue }
            let blob = output.attachments[index].blob
            if blob.isEmpty { continue }

            guard let data = try? CryptoManager.shared.decryptWithPassword(blob, password: groupKey),
                  let plaintext = String(data: data, encoding: .utf8)
            else { continue }
            output.attachments[index].blob = plaintext
        }
    }

    return IncomingCryptoResult(
        text: decrypted.text,
        processedPacket: output,
        attachmentPassword: attachmentPassword
    )
}
|
||
|
||
/// Decrypts the text of an incoming packet.
///
/// Paths, in order:
/// 1. Group (when `groupKey` is given): password-decrypt `content` with the
///    group key; empty content yields `("", nil)`.
/// 2. Own message carrying `aesChachaKey`: recover key+nonce with the private
///    key as password, then decrypt `content`. On failure, falls through.
/// 3. ECDH via `chachaKey`.
///
/// - Returns: `(text, rawKeyData)` where `rawKeyData` is the key+nonce usable
///   for attachment decryption (`nil` for group messages), or `nil` when the
///   message cannot be decrypted.
nonisolated private static func decryptIncomingMessage(
    packet: PacketMessage,
    myPublicKey: String,
    privateKeyHex: String?,
    groupKey: String?
) -> (text: String, rawKeyData: Data?)? {
    // 1. Group messages.
    if let groupKey {
        guard !packet.content.isEmpty else {
            return ("", nil)
        }
        guard let data = try? CryptoManager.shared.decryptWithPassword(
                  packet.content,
                  password: groupKey
              ),
              let text = String(data: data, encoding: .utf8)
        else {
            Self.logger.warning("Group decrypt failed for msgId=\(packet.messageId.prefix(8))…")
            return nil
        }
        return (text, nil)
    }

    guard let privateKeyHex else { return nil }
    if packet.content.isEmpty { return nil }

    // 2. Own sync packets: prefer aesChachaKey (PBKDF2+AES encrypted key+nonce).
    let sentByMe = packet.fromPublicKey == myPublicKey
    if sentByMe && !packet.aesChachaKey.isEmpty {
        do {
            // The payload is the UTF-8 encoding of a Latin-1 string;
            // androidUtf8BytesToLatin1Bytes recovers the raw 56-byte key+nonce.
            let payload = try CryptoManager.shared.decryptWithPassword(
                packet.aesChachaKey,
                password: privateKeyHex
            )
            let keyAndNonce = MessageCrypto.androidUtf8BytesToLatin1Bytes(payload)
            let text = try MessageCrypto.decryptIncomingWithPlainKey(
                ciphertext: packet.content,
                plainKeyAndNonce: keyAndNonce
            )
            // Hand back the raw bytes (not the UTF-8 payload) so
            // attachmentPasswordCandidates can derive WHATWG/Android passwords.
            return (text, keyAndNonce)
        } catch {
            Self.logger.debug("AES-CHACHA sync path failed, falling through to ECDH…")
        }
    }

    // 3. ECDH (opponent messages; may also work for own if chachaKey targets us).
    if packet.chachaKey.isEmpty { return nil }

    do {
        let decrypted = try MessageCrypto.decryptIncomingFull(
            ciphertext: packet.content,
            encryptedKey: packet.chachaKey,
            myPrivateKeyHex: privateKeyHex
        )
        return (decrypted.0, decrypted.1)
    } catch {
        Self.logger.warning("ECDH decrypt failed for msgId=\(packet.messageId.prefix(8))…: \(error)")
        return nil
    }
}
|
||
|
||
/// Kind of conversation a packet belongs to, as resolved by `resolveDialogContext`.
private enum PacketDialogKind {
    /// One-to-one chat with another key.
    case direct
    /// Sender and recipient are both the own key.
    case saved
    /// Recipient is a group dialog key.
    case group
}
|
||
|
||
/// Routing information derived from a packet's from/to keys relative to the own key.
private struct PacketDialogContext {
    /// Conversation kind.
    let kind: PacketDialogKind
    /// Key identifying the dialog: the normalized group identity for groups,
    /// otherwise the other party's key.
    let dialogKey: String
    /// Trimmed sender key.
    let fromKey: String
    /// Trimmed recipient key.
    let toKey: String
    /// Whether the sender is the own key.
    let fromMe: Bool
    /// Whether the recipient is the own key.
    let toMe: Bool
}
|
||
|
||
/// Canonicalizes a group dialog identifier.
///
/// After trimming whitespace, a value starting with `group:` (any letter case)
/// becomes `#group:<id>`, with `<id>` being the trimmed remainder. Everything
/// else — including a bare `group:` with no id — is returned trimmed but
/// otherwise unchanged.
private static func normalizedGroupDialogIdentity(_ value: String) -> String {
    let marker = "group:"
    let candidate = value.trimmingCharacters(in: .whitespacesAndNewlines)
    guard candidate.lowercased().hasPrefix(marker) else {
        return candidate
    }

    let identifier = String(candidate.dropFirst(marker.count))
        .trimmingCharacters(in: .whitespacesAndNewlines)
    if identifier.isEmpty {
        return candidate
    }
    return "#group:\(identifier)"
}
|
||
|
||
/// Reports whether a dialog key cannot be handled.
///
/// The key is trimmed and lowercased first. Empty keys are unsupported; group
/// dialog keys are supported; any other key beginning with `#` is unsupported.
private static func isUnsupportedDialogKey(_ value: String) -> Bool {
    let key = value
        .trimmingCharacters(in: .whitespacesAndNewlines)
        .lowercased()
    guard !key.isEmpty else { return true }
    return !DatabaseManager.isGroupDialogKey(key) && key.hasPrefix("#")
}
|
||
|
||
/// Reports whether `peerKey` is acceptable as the other side of a direct dialog.
///
/// Accepts the own key and system accounts outright; otherwise accepts any
/// non-empty key that `isUnsupportedDialogKey` does not reject.
private static func isSupportedDirectPeerKey(_ peerKey: String, ownKey: String) -> Bool {
    let peer = peerKey.trimmingCharacters(in: .whitespacesAndNewlines)
    guard !peer.isEmpty else { return false }
    if peer == ownKey || SystemAccounts.isSystemAccount(peer) {
        return true
    }
    return !isUnsupportedDialogKey(peer)
}
|
||
|
||
/// Classifies a from/to key pair relative to `ownKey`.
///
/// - Recipient is a group key → `.group`, keyed by the normalized group identity.
/// - Sender is the own key → `.saved` when the recipient is also the own key,
///   else `.direct`, keyed by the recipient.
/// - Recipient is the own key → `.direct`, keyed by the sender.
///
/// - Returns: `nil` when any key is empty after trimming, when the peer key is
///   not a supported direct peer, or when neither side is the own key.
private static func resolveDialogContext(from: String, to: String, ownKey: String) -> PacketDialogContext? {
    guard !ownKey.isEmpty else { return nil }

    let sender = from.trimmingCharacters(in: .whitespacesAndNewlines)
    let recipient = to.trimmingCharacters(in: .whitespacesAndNewlines)
    guard !sender.isEmpty, !recipient.isEmpty else { return nil }

    let sentByMe = sender == ownKey
    let addressedToMe = recipient == ownKey

    // Group traffic is recognized purely by the recipient key.
    if DatabaseManager.isGroupDialogKey(recipient) {
        return PacketDialogContext(
            kind: .group,
            dialogKey: normalizedGroupDialogIdentity(recipient),
            fromKey: sender,
            toKey: recipient,
            fromMe: sentByMe,
            toMe: addressedToMe
        )
    }

    // Outgoing (possibly to ourselves = saved messages).
    if sentByMe {
        guard isSupportedDirectPeerKey(recipient, ownKey: ownKey) else { return nil }
        return PacketDialogContext(
            kind: addressedToMe ? .saved : .direct,
            dialogKey: recipient,
            fromKey: sender,
            toKey: recipient,
            fromMe: true,
            toMe: addressedToMe
        )
    }

    // Incoming from someone else. The sender cannot be the own key here
    // (handled above), so the kind is always `.direct`.
    guard addressedToMe, isSupportedDirectPeerKey(sender, ownKey: ownKey) else { return nil }
    return PacketDialogContext(
        kind: .direct,
        dialogKey: sender,
        fromKey: sender,
        toKey: recipient,
        fromMe: false,
        toMe: true
    )
}
|
||
|
||
/// Resolves the dialog context of a message packet from its from/to public keys.
private static func resolveMessagePacketContext(_ packet: PacketMessage, ownKey: String) -> PacketDialogContext? {
    let sender = packet.fromPublicKey
    let recipient = packet.toPublicKey
    return resolveDialogContext(from: sender, to: recipient, ownKey: ownKey)
}
|
||
|
||
/// Resolves the dialog context of a read-receipt packet from its from/to public keys.
private static func resolveReadPacketContext(_ packet: PacketRead, ownKey: String) -> PacketDialogContext? {
    let sender = packet.fromPublicKey
    let recipient = packet.toPublicKey
    return resolveDialogContext(from: sender, to: recipient, ownKey: ownKey)
}
|
||
|
||
/// Resolves the dialog context of a typing packet from its from/to public keys.
private static func resolveTypingPacketContext(_ packet: PacketTyping, ownKey: String) -> PacketDialogContext? {
    let sender = packet.fromPublicKey
    let recipient = packet.toPublicKey
    return resolveDialogContext(from: sender, to: recipient, ownKey: ownKey)
}
|
||
|
||
// MARK: - Test Support
|
||
|
||
/// Test hook exposing `resolveMessagePacketContext` without leaking the private
/// context types: the kind is returned as `"direct"`, `"saved"` or `"group"`.
///
/// - Returns: `nil` when the packet does not resolve to a dialog context.
static func testResolveMessagePacketContext(
    _ packet: PacketMessage,
    ownKey: String
) -> (kind: String, dialogKey: String, fromMe: Bool)? {
    guard let resolved = resolveMessagePacketContext(packet, ownKey: ownKey) else {
        return nil
    }
    let label = switch resolved.kind {
    case .direct: "direct"
    case .saved: "saved"
    case .group: "group"
    }
    return (label, resolved.dialogKey, resolved.fromMe)
}
|
||
|
||
/// Test hook exposing `resolveReadPacketContext` without leaking the private
/// context types: the kind is returned as `"direct"`, `"saved"` or `"group"`.
///
/// - Returns: `nil` when the packet does not resolve to a dialog context.
static func testResolveReadPacketContext(
    _ packet: PacketRead,
    ownKey: String
) -> (kind: String, dialogKey: String, fromMe: Bool)? {
    guard let resolved = resolveReadPacketContext(packet, ownKey: ownKey) else {
        return nil
    }
    let label = switch resolved.kind {
    case .direct: "direct"
    case .saved: "saved"
    case .group: "group"
    }
    return (label, resolved.dialogKey, resolved.fromMe)
}
|
||
|
||
/// Test hook exposing `resolveTypingPacketContext` without leaking the private
/// context types: the kind is returned as `"direct"`, `"saved"` or `"group"`.
///
/// - Returns: `nil` when the packet does not resolve to a dialog context.
static func testResolveTypingPacketContext(
    _ packet: PacketTyping,
    ownKey: String
) -> (kind: String, dialogKey: String, fromMe: Bool)? {
    guard let resolved = resolveTypingPacketContext(packet, ownKey: ownKey) else {
        return nil
    }
    let label = switch resolved.kind {
    case .direct: "direct"
    case .saved: "saved"
    case .group: "group"
    }
    return (label, resolved.dialogKey, resolved.fromMe)
}
|
||
|
||
/// Test hook: installs session credentials directly, bypassing the normal login flow.
///
/// - Parameters:
///   - currentPublicKey: Public key to treat as the active account.
///   - privateKeyHex: Raw private key (hex) to keep in memory.
///   - privateKeyHash: Optional precomputed hash; when `nil` it is derived
///     from `privateKeyHex` via `CryptoManager`.
func testConfigureSessionForParityFlows(
    currentPublicKey: String,
    privateKeyHex: String,
    privateKeyHash: String? = nil
) {
    self.currentPublicKey = currentPublicKey
    self.privateKeyHex = privateKeyHex

    if let privateKeyHash {
        self.privateKeyHash = privateKeyHash
    } else {
        self.privateKeyHash = CryptoManager.shared.generatePrivateKeyHash(privateKeyHex: privateKeyHex)
    }
}
|
||
|
||
/// Test hook: replaces the attachment transport and packet sender with fresh
/// live implementations (e.g. after a test swapped in its own).
func testResetParityFlowDependencies() {
    self.attachmentFlowTransport = LiveAttachmentFlowTransport()
    self.packetFlowSender = LivePacketFlowSender()
}
|
||
|
||
/// Convenience entry point for views: requests user info for `publicKey`
/// using the session's own private-key hash.
///
/// - Parameter publicKey: Key of the user whose info should be fetched.
func requestUserInfoIfNeeded(forKey publicKey: String) {
    let sessionHash = privateKeyHash
    requestUserInfoIfNeeded(opponentKey: publicKey, privateKeyHash: sessionHash)
}
|
||
|
||
/// Sends a `PacketSearch` on the `.userInfo` channel for `opponentKey`, at
/// most once per key.
///
/// Skipped when no hash is supplied, the trimmed key is empty, the key is a
/// group dialog key, or the key is already recorded in `requestedUserInfoKeys`.
///
/// - Parameters:
///   - opponentKey: Public key to look up; surrounding whitespace is ignored.
///   - privateKeyHash: Hash placed in the packet's `privateKey` field.
private func requestUserInfoIfNeeded(opponentKey: String, privateKeyHash: String?) {
    guard let hash = privateKeyHash else { return }

    let target = opponentKey.trimmingCharacters(in: .whitespacesAndNewlines)
    if target.isEmpty { return }
    if DatabaseManager.isGroupDialogKey(target) { return }
    if requestedUserInfoKeys.contains(target) { return }

    // Record first so re-entrant calls for the same key are deduplicated.
    requestedUserInfoKeys.insert(target)

    var request = PacketSearch()
    request.privateKey = hash
    request.search = target
    ProtocolManager.shared.sendSearchPacket(request, channel: .userInfo)
}
|
||
|
||
/// Requests user info for existing (non-group, non-self) dialog opponents after sync.
///
/// Desktop parity: `useUserInformation` sends `PacketSearch(publicKey)` lazily
/// per component; here it is done in bulk with staggering to avoid server
/// rate-limiting.
///
/// PERF: at most 30 requests per call. Dialogs with a missing title go first
/// (200 ms stagger); the remaining budget goes to the most recently active
/// named dialogs (300 ms stagger).
///
/// Stops early when the connection is no longer authenticated, or when the
/// surrounding task is cancelled. Without the cancellation check, every
/// `Task.sleep` would throw immediately and the rest of the requests would
/// go out back-to-back — the burst the stagger exists to prevent.
private func refreshOnlineStatusForAllDialogs() async {
    let requestCap = 30
    let dialogs = DialogRepository.shared.dialogs
    let ownKey = currentPublicKey

    // Split into priority (missing name) and normal (has name, refresh online).
    var missingName: [String] = []
    var hasName: [String] = []
    for (key, dialog) in dialogs {
        guard key != ownKey, !key.isEmpty else { continue }
        guard !DatabaseManager.isGroupDialogKey(key) else { continue }
        if dialog.opponentTitle.isEmpty {
            missingName.append(key)
        } else {
            hasName.append(key)
        }
    }

    // Priority: fetch missing names first (200 ms stagger).
    var count = 0
    for key in missingName {
        guard ProtocolManager.shared.connectionState == .authenticated else { break }
        guard count < requestCap else { break } // PERF: cap to prevent request storm
        requestUserInfoIfNeeded(opponentKey: key, privateKeyHash: privateKeyHash)
        count += 1
        if count > 1 {
            do {
                try await Task.sleep(for: .milliseconds(200))
            } catch {
                // Cancelled: stop instead of firing the rest without delay.
                Self.logger.debug("User info refresh cancelled after \(count) requests")
                return
            }
        }
    }
    let missingRequested = count

    // Then refresh recently active dialogs only (300 ms stagger),
    // most recent lastMessageTimestamp first.
    let recentKeys = hasName
        .compactMap { key -> (String, Int64)? in
            guard let dialog = dialogs[key] else { return nil }
            return (key, dialog.lastMessageTimestamp)
        }
        .sorted { $0.1 > $1.1 }
        .prefix(max(0, requestCap - count))
        .map(\.0)

    for key in recentKeys {
        guard ProtocolManager.shared.connectionState == .authenticated else { break }
        requestUserInfoIfNeeded(opponentKey: key, privateKeyHash: privateKeyHash)
        count += 1
        if count > 1 {
            do {
                try await Task.sleep(for: .milliseconds(300))
            } catch {
                Self.logger.debug("User info refresh cancelled after \(count) requests")
                return
            }
        }
    }

    // Report what was actually attempted, so the parts add up to the total.
    let onlineRequested = count - missingRequested
    Self.logger.info("Refreshed user info: \(missingRequested) missing names + \(onlineRequested) online status = \(count) total (capped at 30)")
}
|
||
|
||
/// Installs the long-lived `.userInfo` search-result handler and stores its
/// token in `userInfoSearchHandlerToken`.
///
/// For each returned user with a non-empty public key the handler:
/// - if the user is the own account, fills in `displayName` / `username`
///   when they are currently empty, mirrors the change to `AccountManager`
///   and posts `.profileDidUpdate`;
/// - forwards title, username, verified and online state to `DialogRepository`.
///
/// Runs independently of ChatListViewModel's search UI handler.
private func setupUserInfoSearchHandler() {
    userInfoSearchHandlerToken = ProtocolManager.shared.addSearchResultHandler(channel: .userInfo) { [weak self] packet in
        Task { @MainActor [weak self] in
            guard let self else { return }
            let ownKey = self.currentPublicKey

            for user in packet.users where !user.publicKey.isEmpty {
                Self.logger.debug("🔍 Search result: \(user.publicKey.prefix(12))… title='\(user.title)' online=\(user.online) verified=\(user.verified)")

                // Own profile echoed by the server: only fill fields we lack locally.
                if user.publicKey == ownKey {
                    var profileChanged = false

                    if self.displayName.isEmpty, !user.title.isEmpty {
                        self.displayName = user.title
                        AccountManager.shared.updateProfile(displayName: user.title, username: nil)
                        Self.logger.info("Own profile restored from server: title='\(user.title)'")
                        profileChanged = true
                    }

                    if self.username.isEmpty, !user.username.isEmpty {
                        self.username = user.username
                        AccountManager.shared.updateProfile(displayName: nil, username: user.username)
                        Self.logger.info("Own profile restored from server: username='\(user.username)'")
                        profileChanged = true
                    }

                    if profileChanged {
                        NotificationCenter.default.post(name: .profileDidUpdate, object: nil)
                    }
                }

                // Always refresh the dialog-side copy (name, username, verified, online).
                DialogRepository.shared.updateUserInfo(
                    publicKey: user.publicKey,
                    title: user.title,
                    username: user.username,
                    verified: user.verified,
                    online: user.online
                )
            }
        }
    }
}
|
||
|
||
/// Builds an encrypted `PacketMessage` for a text message.
///
/// - Group target: content is password-encrypted with the group key;
///   `chachaKey` and `aesChachaKey` are left empty.
/// - Direct target: content is encrypted via `MessageCrypto.encryptOutgoing`;
///   `aesChachaKey` carries the key+nonce (as UTF-8 of its Latin-1 string
///   form) encrypted with the private key as password.
///
/// - Parameters:
///   - text: Plaintext to send.
///   - toPublicKey: Recipient or group key; surrounding whitespace is trimmed.
///   - messageId: Identifier written into the packet.
///   - timestamp: Timestamp written into the packet.
///   - privateKeyHex: Raw private key; used for group-key lookup and `aesChachaKey`.
///   - privateKeyHash: Hash written into the packet's `privateKey` field.
/// - Throws: `CryptoError.invalidData` when the group key is missing,
///   `CryptoError.encryptionFailed` when the key cannot be represented as
///   Latin-1, plus whatever the underlying encryption calls throw.
private func makeOutgoingPacket(
    text: String,
    toPublicKey: String,
    messageId: String,
    timestamp: Int64,
    privateKeyHex: String,
    privateKeyHash: String
) throws -> PacketMessage {
    let recipient = toPublicKey.trimmingCharacters(in: .whitespacesAndNewlines)

    // Fields common to both packet flavours.
    var packet = PacketMessage()
    packet.fromPublicKey = currentPublicKey
    packet.timestamp = timestamp
    packet.privateKey = privateKeyHash
    packet.messageId = messageId

    // Group message: symmetric encryption with the shared group key.
    if DatabaseManager.isGroupDialogKey(recipient) {
        let groupTarget = Self.normalizedGroupDialogIdentity(recipient)
        guard let groupKey = GroupRepository.shared.groupKey(
            account: currentPublicKey,
            privateKeyHex: privateKeyHex,
            groupDialogKey: groupTarget
        ) else {
            throw CryptoError.invalidData("Missing group key for \(groupTarget)")
        }

        packet.toPublicKey = groupTarget
        packet.content = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
            Data(text.utf8),
            password: groupKey
        )
        packet.chachaKey = ""
        packet.aesChachaKey = ""
        return packet
    }

    // Direct message.
    let encrypted = try MessageCrypto.encryptOutgoing(
        plaintext: text,
        recipientPublicKeyHex: recipient
    )

    // Self-readable copy of key+nonce: Latin-1 string → UTF-8 bytes → password-encrypted.
    guard let keyAsLatin1 = String(data: encrypted.plainKeyAndNonce, encoding: .isoLatin1) else {
        throw CryptoError.encryptionFailed
    }
    let selfKeyBlob = try CryptoManager.shared.encryptWithPasswordDesktopCompat(
        Data(keyAsLatin1.utf8),
        password: privateKeyHex
    )

    packet.toPublicKey = recipient
    packet.content = encrypted.content
    packet.chachaKey = encrypted.chachaKey
    packet.aesChachaKey = selfKeyBlob
    return packet
}
|
||
|
||
/// Re-sends outgoing messages still in WAITING state after a reconnect.
///
/// Steps (Android parity):
/// 1. Messages older than 80 s (MESSAGE_MAX_TIME_TO_DELIVERED_MS) are
///    batch-marked as ERROR.
/// 2. The remaining retryable messages are re-encrypted with a fresh
///    timestamp, sent, and registered for ACK-timeout retries.
///
/// Messages addressed to the own key and messages whose trimmed text is empty
/// are skipped. A message that fails to encrypt is marked `.error` in both
/// repositories. No-op without an active account and key material.
private func retryWaitingOutgoingMessagesAfterReconnect() {
    guard !currentPublicKey.isEmpty,
          let privateKeyHex,
          let privateKeyHash
    else { return }

    // Android parity: MESSAGE_MAX_TIME_TO_DELIVERED_MS = 80_000.
    let maxRetryAgeMs: Int64 = 80_000
    let now = Int64(Date().timeIntervalSince1970 * 1000)

    // Step 1: expire everything outside the retry window (single SQL UPDATE).
    let expiredCount = MessageRepository.shared.markExpiredWaitingAsError(
        myPublicKey: currentPublicKey,
        maxTimestamp: now - maxRetryAgeMs
    )
    if expiredCount > 0 {
        Self.logger.warning("Marked \(expiredCount) expired WAITING messages as ERROR")
    }

    // Step 2: retry what is still inside the window.
    let retryable = MessageRepository.shared.resolveRetryableOutgoingMessages(
        myPublicKey: currentPublicKey,
        nowMs: now,
        maxRetryAgeMs: maxRetryAgeMs
    )

    for message in retryable where message.toPublicKey != currentPublicKey {
        let text = message.text.trimmingCharacters(in: .whitespacesAndNewlines)
        if text.isEmpty { continue }

        // Put the dialog back to .waiting (clock icon) while the retry is in flight.
        DialogRepository.shared.updateDeliveryStatus(
            messageId: message.id,
            opponentKey: message.toPublicKey,
            status: .waiting
        )

        do {
            // Use a fresh timestamp: the server silently rejects messages older
            // than 30s (Executor6Message maxPaddingSec=30), so the original one
            // would fail if reconnecting took longer. The server overwrites the
            // timestamp with System.currentTimeMillis() anyway
            // (Executor6Message:102); the client value is only age-validated.
            let freshTimestamp = Int64(Date().timeIntervalSince1970 * 1000)
            let packet = try makeOutgoingPacket(
                text: text,
                toPublicKey: message.toPublicKey,
                messageId: message.id,
                timestamp: freshTimestamp,
                privateKeyHex: privateKeyHex,
                privateKeyHash: privateKeyHash
            )
            ProtocolManager.shared.sendPacket(packet)
            registerOutgoingRetry(for: packet)
            Self.logger.info("Retrying message \(message.id.prefix(8))… to \(message.toPublicKey.prefix(12))…")
        } catch {
            Self.logger.error("Failed to retry message \(message.id): \(error.localizedDescription)")
            MessageRepository.shared.updateDeliveryStatus(messageId: message.id, status: .error)
            DialogRepository.shared.updateDeliveryStatus(
                messageId: message.id,
                opponentKey: message.toPublicKey,
                status: .error
            )
        }
    }
}
|
||
|
||
/// Starts ACK-timeout tracking for a packet that was just sent.
///
/// Cancels any retry task already registered for the same message id, stores
/// the packet, resets its attempt counter to 0 and schedules the first retry check.
private func registerOutgoingRetry(for packet: PacketMessage) {
    let id = packet.messageId
    if let existing = pendingOutgoingRetryTasks[id] {
        existing.cancel()
    }
    pendingOutgoingPackets[id] = packet
    pendingOutgoingAttempts[id] = 0
    scheduleOutgoingRetry(messageId: id)
}
|
||
|
||
/// Schedules one retry check for `messageId`, 4 seconds from now, replacing
/// any check already scheduled for that id.
///
/// When the timer fires and the packet is still pending:
/// - older than `maxOutgoingWaitingLifetimeMs` → marked ERROR;
/// - `maxOutgoingRetryAttempts` reached → marked ERROR;
/// - not authenticated → in-memory retry state is dropped
///   (`retryWaitingOutgoingMessagesAfterReconnect()` re-reads from the DB later);
/// - otherwise the packet is re-sent and the next check is scheduled.
///
/// A cancelled check exits without doing anything. Previously the cancellation
/// error from `Task.sleep` was discarded, so a task cancelled by
/// `registerOutgoingRetry` woke up immediately, found the re-registered packet,
/// re-sent it and rescheduled — cancelling its successor, which did the same.
/// The chain ran with no delay until the attempt limit marked the message as ERROR.
private func scheduleOutgoingRetry(messageId: String) {
    pendingOutgoingRetryTasks[messageId]?.cancel()
    pendingOutgoingRetryTasks[messageId] = Task { @MainActor [weak self] in
        guard let self else { return }

        do {
            try await Task.sleep(for: .seconds(4))
        } catch {
            // Cancelled while waiting: this check was superseded or resolved.
            return
        }
        // Covers a cancel that lands after the sleep finished but before we resumed.
        guard !Task.isCancelled else { return }

        guard let packet = self.pendingOutgoingPackets[messageId] else { return }
        let attempts = self.pendingOutgoingAttempts[messageId] ?? 0

        // Android parity: flat 80s timeout (MESSAGE_MAX_TIME_TO_DELIVERED_MS).
        let nowMs = Int64(Date().timeIntervalSince1970 * 1000)
        let ageMs = nowMs - packet.timestamp
        if ageMs >= self.maxOutgoingWaitingLifetimeMs {
            // Android parity: mark as ERROR after timeout, not DELIVERED.
            // If the message was actually delivered, the server will send 0x08 on
            // reconnect (or sync will restore it). Marking DELIVERED optimistically
            // would hide the problem from the user.
            Self.logger.warning("Message \(messageId) — no ACK after \(ageMs)ms, marking as ERROR")
            self.markOutgoingAsError(messageId: messageId, packet: packet)
            return
        }

        guard attempts < self.maxOutgoingRetryAttempts else {
            // Android parity: mark as ERROR after max retries.
            // The user can retry manually via the error menu.
            Self.logger.warning("Message \(messageId) — no ACK after \(attempts) retries, marking as ERROR")
            self.markOutgoingAsError(messageId: messageId, packet: packet)
            return
        }

        guard ProtocolManager.shared.connectionState == .authenticated else {
            // Android parity: drop in-memory retry state when not authenticated.
            // retryWaitingOutgoingMessagesAfterReconnect() re-reads from DB after sync completes.
            Self.logger.debug("Message \(messageId) retry deferred — not authenticated")
            self.resolveOutgoingRetry(messageId: messageId)
            return
        }

        let nextAttempt = attempts + 1
        self.pendingOutgoingAttempts[messageId] = nextAttempt
        Self.logger.warning("Retrying message \(messageId), attempt \(nextAttempt)")
        ProtocolManager.shared.sendPacket(packet)
        self.scheduleOutgoingRetry(messageId: messageId)
    }
}
|
||
|
||
/// Optimistically flags an outgoing message as delivered although no ACK
/// arrived (typical cause per the original note: the server deduplicates by
/// messageId because the first send was delivered before a disconnect).
///
/// Updates the message (with the current time as its new timestamp) and the
/// dialog, then drops the in-memory retry state.
private func markOutgoingAsDelivered(messageId: String, packet: PacketMessage) {
    let opponentKey: String
    if packet.fromPublicKey == currentPublicKey {
        opponentKey = packet.toPublicKey
    } else {
        opponentKey = packet.fromPublicKey
    }

    let deliveredAt = Int64(Date().timeIntervalSince1970 * 1000)
    MessageRepository.shared.updateDeliveryStatus(
        messageId: messageId,
        status: .delivered,
        newTimestamp: deliveredAt
    )
    DialogRepository.shared.updateDeliveryStatus(
        messageId: messageId,
        opponentKey: opponentKey,
        status: .delivered
    )
    resolveOutgoingRetry(messageId: messageId)
}
|
||
|
||
/// Flags an outgoing message as `.error` in both repositories and drops its
/// retry state.
///
/// If the stored status is already `.delivered` (e.g. the delivery packet
/// arrived while the retry timer was running) the status is left alone and
/// only the retry state is cleaned up.
/// NOTE(review): only `.delivered` is checked here; a read status, if one
/// exists, is not guarded by this method — confirm whether that is intended.
private func markOutgoingAsError(messageId: String, packet: PacketMessage) {
    let storedStatus = MessageRepository.shared.deliveryStatus(forMessageId: messageId)
    guard storedStatus != .delivered else {
        Self.logger.info("Skipping markOutgoingAsError for \(messageId.prefix(8))… — already delivered")
        resolveOutgoingRetry(messageId: messageId)
        return
    }

    let opponentKey: String
    if packet.fromPublicKey == currentPublicKey {
        opponentKey = packet.toPublicKey
    } else {
        opponentKey = packet.fromPublicKey
    }

    MessageRepository.shared.updateDeliveryStatus(messageId: messageId, status: .error)
    DialogRepository.shared.updateDeliveryStatus(
        messageId: messageId,
        opponentKey: opponentKey,
        status: .error
    )
    resolveOutgoingRetry(messageId: messageId)
}
|
||
|
||
private func resolveOutgoingRetry(messageId: String) {
    // Cancel the scheduled retry (if any) while it is still registered,
    // then forget everything tracked for this message id.
    if let retryTask = pendingOutgoingRetryTasks[messageId] {
        retryTask.cancel()
    }
    pendingOutgoingRetryTasks[messageId] = nil
    pendingOutgoingPackets[messageId] = nil
    pendingOutgoingAttempts[messageId] = nil
}
|
||
|
||
/// Resolve all pending outgoing retries for messages to a specific opponent.
|
||
/// Called when PacketRead (0x07) proves that messages were delivered.
|
||
private func resolveAllOutgoingRetries(toPublicKey: String) {
    // Take a snapshot of the matching entries first: resolving a retry
    // removes it from `pendingOutgoingPackets`.
    let pendingForPeer = pendingOutgoingPackets.filter { entry in
        entry.value.toPublicKey == toPublicKey
    }

    for messageId in pendingForPeer.keys {
        Self.logger.info("Resolving retry for \(messageId.prefix(8))… — read receipt received")
        resolveOutgoingRetry(messageId: messageId)
    }
}
|
||
|
||
// MARK: - Push Notifications
|
||
|
||
/// Stores the APNs device token received from AppDelegate.
|
||
/// Called from AppDelegate.didRegisterForRemoteNotificationsWithDeviceToken.
|
||
func setAPNsToken(_ token: String) {
    // Persist the token; sendPushTokenToServer() reads it back from this key.
    UserDefaults.standard.set(token, forKey: "apns_device_token")

    // Only push it to the server right away when the connection is already authenticated.
    guard ProtocolManager.shared.connectionState == .authenticated else { return }
    sendPushTokenToServer()
}
|
||
|
||
/// Sends the stored APNs push token to the server via PacketPushNotification (0x10).
|
||
/// Android parity: called after successful handshake.
|
||
private func sendPushTokenToServer() {
    // A missing token is treated the same as an empty one: nothing to send.
    let storedToken = UserDefaults.standard.string(forKey: "apns_device_token") ?? ""
    guard !storedToken.isEmpty else { return }

    // The subscription packet is only sent over an authenticated connection.
    guard ProtocolManager.shared.connectionState == .authenticated else { return }

    var subscribePacket = PacketPushNotification()
    subscribePacket.notificationsToken = storedToken
    subscribePacket.action = .subscribe
    ProtocolManager.shared.sendPacket(subscribePacket)

    Self.logger.info("Push token sent to server")
}
|
||
|
||
// MARK: - Release Notes (Desktop Parity)
|
||
|
||
/// Desktop parity: sends release notes as a local system message from
|
||
/// the "Rosetta Updates" account when the app version changes.
|
||
/// Desktop equivalent: `useUpdateMessage.ts` → `useSendSystemMessage("updates")`.
|
||
private func sendReleaseNotesIfNeeded(publicKey: String) {
    // Nothing to announce when the bundled notice text is empty.
    let noticeText = ReleaseNotes.releaseNoticeText
    if noticeText.isEmpty { return }

    // Android parity: the fingerprint is "version + text hash", so an edited
    // notice within the same version is delivered again.
    // A hand-rolled 31-multiplier rolling hash over the UTF-8 bytes is used
    // because Swift's `hashValue` is seeded randomly per process and therefore
    // differs between launches; Android's Java `hashCode()` is deterministic.
    var textHash = 0
    for byte in noticeText.utf8 {
        textHash = (textHash &* 31) &+ Int(byte)
    }
    let fingerprint = "\(ReleaseNotes.appVersion)_\(textHash)"

    // The last delivered fingerprint is tracked per account in UserDefaults.
    let defaults = UserDefaults.standard
    let defaultsKey = "lastReleaseNoticeVersion_\(publicKey)"
    let storedFingerprint = defaults.string(forKey: defaultsKey) ?? ""
    if storedFingerprint == fingerprint { return }

    let updatesKey = Account.updatesPublicKey

    // Build a synthetic PacketMessage — local-only, never sent to the server.
    // The message id depends on the version only, so a changed text for the
    // same version is upserted under the same id.
    var notice = PacketMessage()
    notice.fromPublicKey = updatesKey
    notice.toPublicKey = publicKey
    notice.timestamp = Int64(Date().timeIntervalSince1970 * 1000)
    notice.messageId = "release_notes_\(ReleaseNotes.appVersion)"

    // Store the message; fromSync = true → message marked as delivered + read.
    MessageRepository.shared.upsertFromMessagePacket(
        notice,
        myPublicKey: publicKey,
        decryptedText: noticeText,
        fromSync: true
    )

    let dialogs = DialogRepository.shared

    // Android parity: recalculate the dialog from the DB after inserting the note.
    dialogs.updateDialogFromMessages(opponentKey: updatesKey)
    // Release notes are always "read": force-clear unread for the Updates account.
    dialogs.markAsRead(opponentKey: updatesKey)

    // System account display info (title, username, verified badge).
    // Desktop parity: both system accounts use verified=1 (blue rosette badge).
    dialogs.updateUserInfo(
        publicKey: updatesKey,
        title: SystemAccounts.updatesTitle,
        username: "updates",
        verified: 1
    )

    // Remember what was delivered so the same notice is not sent again.
    defaults.set(fingerprint, forKey: defaultsKey)
    Self.logger.info("Release notes v\(ReleaseNotes.appVersion) sent to Updates chat")
}
|
||
|
||
// MARK: - Foreground Observer (Android parity)
|
||
|
||
private func setupForegroundObserver() {
    // Android parity: ON_RESUME → markVisibleMessagesAsRead() + reconnect + sync.
    // The returned token is kept in `foregroundObserverToken`.
    let center = NotificationCenter.default
    foregroundObserverToken = center.addObserver(
        forName: UIApplication.willEnterForegroundNotification,
        object: nil,
        queue: OperationQueue.main
    ) { [weak self] _ in
        Task { @MainActor [weak self] in
            // Android parity (onResume line 428): clear ALL delivered notifications.
            let notificationCenter = UNUserNotificationCenter.current()
            notificationCenter.removeAllDeliveredNotifications()

            // Android: ON_RESUME calls markVisibleMessagesAsRead() for the active dialog.
            self?.markActiveDialogsAsRead()

            // Android parity: always force a reconnect on foreground resume.
            // Android's OkHttp fires onFailure in the background, so its state is
            // accurate. iOS URLSession does NOT fire didCloseWith while the process
            // is suspended, so connectionState can stay .authenticated on a zombie
            // socket. An earlier ping-first approach cost 3 seconds waiting for a
            // pong that never comes; tearing down and reconnecting costs <1s when
            // the connection was alive and wastes nothing when it was a zombie.
            ProtocolManager.shared.forceReconnectOnForeground()

            // syncOnForeground() returns early unless the connection is
            // `.authenticated`, so calling it here is safe.
            // NOTE(review): the forced reconnect presumably leaves the state
            // non-authenticated, in which case sync is requested from
            // onHandshakeCompleted via requestSynchronize() instead — confirm.
            self?.syncOnForeground()
        }
    }
}
|
||
|
||
/// Android parity: `syncOnForeground()` — request sync on foreground resume
|
||
/// if already authenticated and 5s have passed since last foreground sync.
|
||
private func syncOnForeground() {
    // Requires an authenticated connection with no sync batch running and
    // no sync request already in flight.
    guard ProtocolManager.shared.connectionState == .authenticated,
          !syncBatchInProgress,
          !syncRequestInFlight
    else { return }

    // Throttle: at most one foreground-triggered sync every 5 seconds.
    let currentTime = Date().timeIntervalSince1970
    let secondsSinceLastSync = currentTime - lastForegroundSyncTime
    guard secondsSinceLastSync >= 5 else { return }

    lastForegroundSyncTime = currentTime
    Self.logger.info("🔄 Sync on foreground resume")
    requestSynchronize()
}
|
||
}
|