WIP: стабилизация звонков и E2EE + инструменты сборки WebRTC

This commit is contained in:
2026-03-25 22:20:24 +05:00
parent 530047c5d0
commit eea650face
8 changed files with 1119 additions and 219 deletions

View File

@@ -25,6 +25,7 @@ val gitShortSha = safeGitOutput("rev-parse", "--short", "HEAD") ?: "unknown"
// ═══════════════════════════════════════════════════════════
val rosettaVersionName = "1.3.0"
val rosettaVersionCode = 32 // Increment on each release
val customWebRtcAar = file("libs/libwebrtc-custom.aar")
android {
namespace = "com.rosetta.messenger"
@@ -182,8 +183,13 @@ dependencies {
implementation("androidx.camera:camera-lifecycle:1.3.1")
implementation("androidx.camera:camera-view:1.3.1")
// WebRTC for voice calls
implementation("io.github.webrtc-sdk:android:125.6422.07")
// WebRTC for voice calls.
// If app/libs/libwebrtc-custom.aar exists, prefer it (custom E2EE-enabled build).
if (customWebRtcAar.exists()) {
implementation(files(customWebRtcAar))
} else {
implementation("io.github.webrtc-sdk:android:125.6422.07")
}
// Baseline Profiles for startup performance
implementation("androidx.profileinstaller:profileinstaller:1.3.1")

View File

@@ -45,6 +45,280 @@ static void diag_write(const char *fmt, ...) {
if (n > 0) write(g_diag_fd, buf, n);
}
/* ── RTP helpers (for cases when additional_data is empty) ───── */
// Minimal parsed view of an RTP packet (RFC 3550 fixed header fields).
struct ParsedRtpPacket {
  size_t header_size = 0;  // total header length: fixed 12 + CSRCs + extension
  uint16_t sequence = 0;   // RTP sequence number (bytes 2..3, big-endian)
  uint32_t timestamp = 0;  // RTP media timestamp (bytes 4..7, big-endian)
  uint32_t ssrc = 0;       // synchronization source id (bytes 8..11, big-endian)
};
// State machine for locking onto an RTP stream found inside raw frames.
// While probing, one candidate packet is remembered (probe_*) and a second
// consistent packet is awaited; once locked, the confirmed stream is
// tracked via ssrc/last_* (see fill_nonce_from_rtp_frame).
struct RtpProbeState {
  bool locked = false;           // true once two consistent packets were seen
  bool has_probe = false;        // a candidate packet awaits confirmation
  uint32_t probe_ssrc = 0;       // candidate stream id
  uint16_t probe_sequence = 0;   // candidate sequence number
  uint32_t probe_timestamp = 0;  // candidate timestamp
  uint32_t ssrc = 0;             // confirmed stream id (valid when locked)
  uint16_t last_sequence = 0;    // last accepted sequence number
  uint32_t last_timestamp = 0;   // last accepted timestamp
};
// Fallback timestamp generator: used when neither additional_data nor an
// embedded RTP header yields a timestamp, so a synthetic one is advanced
// per frame by the inferred Opus packet duration.
struct GeneratedTsState {
  bool initialized = false;   // first frame seen; next_timestamp is valid
  uint32_t next_timestamp = 0;  // timestamp to use for the next frame
  uint32_t next_step = 960; // 20 ms @ 48 kHz (default Opus packetization)
};
// Normalization bases for absolute additional_data timestamps, so the
// nonce sequence starts at 0 on both sides. Kept separately for the
// 64-bit generic layout and the 32-bit RTP-header layout.
struct AdditionalTsState {
  bool initialized64 = false;  // base64 has been captured
  bool initialized32 = false;  // base32 has been captured
  uint64_t base64 = 0;         // first observed 64-bit timestamp
  uint32_t base32 = 0;         // first observed 32-bit (RTP) timestamp
};
// Reads a big-endian 16-bit integer from two bytes.
static inline uint16_t load16_be(const uint8_t* p) {
  uint16_t value = p[0];
  value = (uint16_t)((value << 8) | p[1]);
  return value;
}
// Reads a big-endian 32-bit integer from four bytes.
static inline uint32_t load32_be(const uint8_t* p) {
  uint32_t value = 0;
  for (int i = 0; i < 4; ++i) {
    value = (value << 8) | p[i];
  }
  return value;
}
// Reads a big-endian 64-bit integer from eight bytes.
static inline uint64_t load64_be(const uint8_t* p) {
  uint64_t value = 0;
  for (int i = 0; i < 8; ++i) {
    value = (value << 8) | p[i];
  }
  return value;
}
// Writes a 64-bit integer into eight bytes, big-endian (inverse of load64_be).
static inline void store64_be(uint8_t* p, uint64_t v) {
  for (int i = 7; i >= 0; --i) {
    p[i] = (uint8_t)(v & 0xFF);
    v >>= 8;
  }
}
// Parses the RTP fixed header (RFC 3550 §5.1), CSRC list, and optional
// header extension, filling |out| with the header size and id fields.
// Returns false for non-RTP data (version != 2), truncated headers, empty
// payloads, or payloads larger than 1200 bytes (implausible for audio).
// NOTE(review): the padding bit (data[0] & 0x20) is ignored, so any
// padding bytes count toward the payload size — same as before.
static bool parse_rtp_packet(const uint8_t* data, size_t len, ParsedRtpPacket* out) {
  if (data == nullptr || out == nullptr || len < 12) return false;
  const uint8_t first_byte = data[0];
  // Version field occupies the top two bits and must equal 2.
  if (((first_byte >> 6) & 0x03) != 2) return false;
  // Fixed 12-byte header plus one 32-bit word per CSRC.
  size_t header_bytes = 12 + (size_t)(first_byte & 0x0F) * 4;
  if (header_bytes > len) return false;
  if ((first_byte & 0x10) != 0) {
    // Header extension: 16-bit profile id + 16-bit length in 32-bit words.
    if (len < header_bytes + 4) return false;
    const size_t ext_words =
        ((size_t)data[header_bytes + 2] << 8) | (size_t)data[header_bytes + 3];
    header_bytes += 4 + ext_words * 4;
    if (header_bytes > len) return false;
  }
  const size_t payload_bytes = len - header_bytes;
  if (payload_bytes == 0 || payload_bytes > 1200) return false;
  out->header_size = header_bytes;
  out->sequence = load16_be(data + 2);
  out->timestamp = load32_be(data + 4);
  out->ssrc = load32_be(data + 8);
  return true;
}
/**
 * Derives the desktop-compatible nonce (RTP timestamp in nonce[4..7])
 * directly from an RTP packet embedded in |data|.
 *
 * Raw frames may also be non-RTP payloads, so the parser must first
 * "lock" onto a stream: it requires two packets with the same SSRC and a
 * small forward sequence jump (1..10) before trusting the header. While
 * locked, in-order packets and moderate loss gaps (sequence delta <= 200)
 * are accepted; an SSRC change or implausible jump drops the lock and
 * restarts probing.
 *
 * Returns true only when locked and the packet is accepted. On success,
 * nonce[4..7] receives the big-endian RTP timestamp and *header_size the
 * parsed header length (so the caller can leave the header unencrypted).
 */
static bool fill_nonce_from_rtp_frame(const uint8_t* data,
                                      size_t len,
                                      RtpProbeState* state,
                                      uint8_t nonce[24],
                                      size_t* header_size) {
  if (!state) return false;
  ParsedRtpPacket packet;
  if (!parse_rtp_packet(data, len, &packet)) return false;
  if (!state->locked) {
    if (!state->has_probe) {
      // First plausible packet: remember it and wait for confirmation.
      state->has_probe = true;
      state->probe_ssrc = packet.ssrc;
      state->probe_sequence = packet.sequence;
      state->probe_timestamp = packet.timestamp;
      return false;
    }
    const bool same_ssrc = packet.ssrc == state->probe_ssrc;
    // Unsigned subtraction handles sequence-number wraparound.
    const uint16_t seq_delta = (uint16_t)(packet.sequence - state->probe_sequence);
    const bool sequence_progressed = seq_delta > 0 && seq_delta <= 10;
    if (!same_ssrc || !sequence_progressed) {
      // Probe did not confirm: restart probing from this packet.
      state->probe_ssrc = packet.ssrc;
      state->probe_sequence = packet.sequence;
      state->probe_timestamp = packet.timestamp;
      return false;
    }
    // Two consistent packets observed: lock onto this stream.
    state->locked = true;
    state->has_probe = false;
    state->ssrc = packet.ssrc;
    state->last_sequence = packet.sequence;
    state->last_timestamp = packet.timestamp;
  } else {
    if (packet.ssrc != state->ssrc) {
      // Stream changed under us: drop the lock and re-probe.
      state->locked = false;
      state->has_probe = true;
      state->probe_ssrc = packet.ssrc;
      state->probe_sequence = packet.sequence;
      state->probe_timestamp = packet.timestamp;
      return false;
    }
    const uint16_t seq_delta = (uint16_t)(packet.sequence - state->last_sequence);
    // Accept in-order packets and small jumps (packet loss).
    if (seq_delta != 0 && seq_delta <= 200) {
      state->last_sequence = packet.sequence;
      state->last_timestamp = packet.timestamp;
    } else if (seq_delta != 0) {
      // Not plausible for a continuous stream: re-probe.
      state->locked = false;
      state->has_probe = true;
      state->probe_ssrc = packet.ssrc;
      state->probe_sequence = packet.sequence;
      state->probe_timestamp = packet.timestamp;
      return false;
    }
    // seq_delta == 0 (duplicate) falls through and reuses the packet's
    // own timestamp below.
  }
  // Desktop nonce layout: big-endian RTP timestamp at bytes 4..7.
  nonce[4] = (uint8_t)(packet.timestamp >> 24);
  nonce[5] = (uint8_t)(packet.timestamp >> 16);
  nonce[6] = (uint8_t)(packet.timestamp >> 8);
  nonce[7] = (uint8_t)(packet.timestamp);
  if (header_size) *header_size = packet.header_size;
  return true;
}
/**
 * Derives the nonce from WebRTC's |additional_data| (preferred source).
 *
 * Two layouts are recognized:
 *  1. len >= 12 with RTP version bits == 2: treat the data as an RTP
 *     header and take the 32-bit timestamp at bytes 4..7 into
 *     nonce[4..7]; *used_rtp_header is set.
 *  2. Otherwise the first 8 bytes are read as a big-endian 64-bit
 *     timestamp written into nonce[0..7] (desktop's generic layout).
 *
 * When |normalize_timestamps| is true and |ts_state| is non-null, the
 * first observed timestamp becomes a base subtracted from later ones so
 * the nonce sequence starts at 0; *used_normalized reports this.
 *
 * Returns false when data is missing or shorter than 8 bytes.
 */
static bool fill_nonce_from_additional_data(const uint8_t* data,
                                            size_t len,
                                            uint8_t nonce[24],
                                            AdditionalTsState* ts_state,
                                            bool normalize_timestamps,
                                            bool* used_normalized,
                                            bool* used_rtp_header) {
  if (used_normalized) *used_normalized = false;
  if (used_rtp_header) *used_rtp_header = false;
  if (!data || len < 8) return false;
  // Common native WebRTC layout: additional_data is RTP header bytes.
  if (len >= 12) {
    const uint8_t version = (data[0] >> 6) & 0x03;
    if (version == 2) {
      uint32_t ts = load32_be(data + 4);
      if (normalize_timestamps && ts_state) {
        if (!ts_state->initialized32) {
          // Capture the stream base on first use.
          ts_state->initialized32 = true;
          ts_state->base32 = ts;
        }
        ts = (uint32_t)(ts - ts_state->base32);
        if (used_normalized) *used_normalized = true;
      }
      nonce[4] = (uint8_t)(ts >> 24);
      nonce[5] = (uint8_t)(ts >> 16);
      nonce[6] = (uint8_t)(ts >> 8);
      nonce[7] = (uint8_t)(ts);
      if (used_rtp_header) *used_rtp_header = true;
      return true;
    }
  }
  // Generic 8-byte timestamp layout (desktop's nonce[0..7] layout).
  uint64_t ts = load64_be(data);
  if (normalize_timestamps && ts_state) {
    if (!ts_state->initialized64) {
      // Capture the stream base on first use.
      ts_state->initialized64 = true;
      ts_state->base64 = ts;
    }
    ts = (uint64_t)(ts - ts_state->base64);
    if (used_normalized) *used_normalized = true;
  }
  store64_be(nonce, ts);
  return true;
}
// Writes |ts| big-endian into nonce bytes 4..7 (desktop nonce layout).
static inline void fill_nonce_from_ts32(uint32_t ts, uint8_t nonce[24]) {
  for (int i = 0; i < 4; ++i) {
    nonce[4 + i] = (uint8_t)(ts >> (8 * (3 - i)));
  }
}
// Samples per Opus frame at 48 kHz for a TOC config value (RFC 6716 §3.1).
// Configs 0..11 are SILK-only, 12..15 hybrid, 16..31 CELT-only; within
// each mode the low bits select the frame duration.
static inline uint32_t opus_base_frame_samples(uint8_t config) {
  if (config <= 11) {
    // SILK-only: 10/20/40/60 ms frames.
    switch (config & 0x03) {
      case 0: return 480;
      case 1: return 960;
      case 2: return 1920;
      default: return 2880;
    }
  }
  if (config <= 15) {
    // Hybrid: odd configs are 20 ms, even are 10 ms.
    return (config & 0x01) ? 960 : 480;
  }
  // CELT-only: 2.5/5/10/20 ms frames.
  switch (config & 0x03) {
    case 0: return 120;
    case 1: return 240;
    case 2: return 480;
    default: return 960;
  }
}
// Infers an Opus packet's duration in 48 kHz samples from its TOC byte
// (RFC 6716 §3.1/§3.2), falling back to 960 (20 ms) whenever the packet
// is absent, truncated, or yields an implausible total.
static uint32_t infer_opus_packet_duration_samples(const uint8_t* packet, size_t len) {
  const uint32_t kDefault = 960;  // 20 ms @ 48 kHz
  if (!packet || len == 0) return kDefault;
  const uint8_t toc = packet[0];
  const uint8_t config = (uint8_t)(toc >> 3);
  const uint8_t code = (uint8_t)(toc & 0x03);
  uint32_t frames;
  switch (code) {
    case 0:
      frames = 1;  // single frame
      break;
    case 1:
    case 2:
      frames = 2;  // two frames (equal or different sizes)
      break;
    default:
      // Code 3: frame count lives in the low 6 bits of the second byte.
      if (len < 2) return kDefault;
      frames = (uint32_t)(packet[1] & 0x3F);
      if (frames == 0 || frames > 48) return kDefault;
      break;
  }
  const uint32_t total = opus_base_frame_samples(config) * frames;
  // Opus caps a packet at 120 ms (5760 samples); shortest frame is 2.5 ms.
  if (total < 120 || total > 5760) return kDefault;
  return total;
}
// Cheap sanity check: does this buffer look like an Opus packet?
// Used after trial decryption to decide whether a nonce guess was right.
// Rejects empty/oversized buffers and code-3 packets with an invalid
// frame count or a total duration above the 120 ms Opus cap.
// Note: config = toc >> 3 of a uint8_t is always <= 31, so the previous
// `config > 31` check was unreachable and has been removed.
static bool is_plausible_opus_packet(const uint8_t* packet, size_t len) {
  if (!packet || len == 0 || len > 2000) return false;
  const uint8_t toc = packet[0];
  const uint8_t config = (uint8_t)(toc >> 3);
  const uint8_t frame_code = (uint8_t)(toc & 0x03);
  // Codes 0..2 carry at most two frames — always within the duration cap.
  if (frame_code != 3) return true;
  // Code 3: frame count byte must be present and in range 1..48.
  if (len < 2) return false;
  const uint8_t frame_count = (uint8_t)(packet[1] & 0x3F);
  if (frame_count == 0 || frame_count > 48) return false;
  const uint32_t total = opus_base_frame_samples(config) * (uint32_t)frame_count;
  return total <= 5760;  // 120 ms @ 48 kHz
}
/* ── Native crash handler — writes to file before dying ──────── */
static char g_crash_path[512] = {0};
@@ -98,57 +372,114 @@ public:
}
/**
* Frame format: [4-byte counter BE] + [xchacha20_xor(frame)]
* Desktop-compatible frame format: ciphertext only (no custom prefix).
*
* Nonce (24 bytes): [0,0,0,0, counter_BE_4bytes, 0,...,0]
* This matches Desktop's layout where nonce[4..7] = timestamp.
* The counter is embedded so the receiver can reconstruct the nonce
* even if frames are dropped/reordered.
* Nonce (24 bytes) is derived exactly like desktop:
* - nonce[0..3] = 0
* - nonce[4..7] = RTP timestamp
* - nonce[8..23] = 0
*
* Primary source of timestamp: additional_data[4..7] (if provided by WebRTC).
* Fallback (Android path where additional_data can be empty):
* parse RTP header from frame and take timestamp from frame[4..7].
*
* If RTP header is found inside frame, we leave header bytes unencrypted
* and encrypt only payload (desktop-compatible).
*/
int Encrypt(cricket::MediaType /*media_type*/,
uint32_t /*ssrc*/,
rtc::ArrayView<const uint8_t> /*additional_data*/,
rtc::ArrayView<const uint8_t> additional_data,
rtc::ArrayView<const uint8_t> frame,
rtc::ArrayView<uint8_t> encrypted_frame,
size_t* bytes_written) override {
const size_t HEADER = 4; // counter prefix
if (frame.size() == 0 || encrypted_frame.size() < frame.size() + HEADER) {
if (frame.size() == 0 || encrypted_frame.size() < frame.size()) {
*bytes_written = 0;
return -1;
}
uint32_t ctr = counter_.fetch_add(1, std::memory_order_relaxed);
size_t header_size = 0;
bool nonce_from_rtp_header = false;
bool nonce_from_generated_ts = false;
bool nonce_from_additional_data = false;
bool nonce_from_additional_normalized = false;
bool additional_was_rtp_header = false;
uint32_t generated_ts_used = 0;
// Write 4-byte counter as big-endian prefix
encrypted_frame.data()[0] = (uint8_t)(ctr >> 24);
encrypted_frame.data()[1] = (uint8_t)(ctr >> 16);
encrypted_frame.data()[2] = (uint8_t)(ctr >> 8);
encrypted_frame.data()[3] = (uint8_t)(ctr);
// Build nonce from counter (same positions as Desktop's timestamp)
// Build nonce from RTP timestamp in additional_data (preferred).
uint8_t nonce[24] = {0};
nonce[4] = encrypted_frame.data()[0];
nonce[5] = encrypted_frame.data()[1];
nonce[6] = encrypted_frame.data()[2];
nonce[7] = encrypted_frame.data()[3];
nonce_from_additional_data = fill_nonce_from_additional_data(
additional_data.data(),
additional_data.size(),
nonce,
&additional_ts_,
true,
&nonce_from_additional_normalized,
&additional_was_rtp_header);
if (!nonce_from_additional_data) {
nonce_from_rtp_header =
fill_nonce_from_rtp_frame(frame.data(), frame.size(), &rtp_probe_, nonce, &header_size);
if (!nonce_from_rtp_header) {
if (!generated_ts_.initialized) {
generated_ts_.initialized = true;
generated_ts_.next_timestamp = 0;
generated_ts_.next_step = 960;
}
nonce_from_generated_ts = true;
generated_ts_used = generated_ts_.next_timestamp;
fill_nonce_from_ts32(generated_ts_used, nonce);
}
}
rosetta_xchacha20_xor(encrypted_frame.data() + HEADER,
frame.data(), frame.size(), nonce, key_);
*bytes_written = frame.size() + HEADER;
if (nonce_from_rtp_header && header_size <= frame.size()) {
// Keep RTP header clear, encrypt payload only.
if (header_size > 0) {
memcpy(encrypted_frame.data(), frame.data(), header_size);
}
const size_t payload_size = frame.size() - header_size;
rosetta_xchacha20_xor(
encrypted_frame.data() + header_size,
frame.data() + header_size,
payload_size,
nonce,
key_);
} else {
// Legacy path: frame is payload-only.
rosetta_xchacha20_xor(encrypted_frame.data(),
frame.data(), frame.size(), nonce, key_);
}
*bytes_written = frame.size();
if (nonce_from_generated_ts) {
const uint32_t step = infer_opus_packet_duration_samples(frame.data(), frame.size());
generated_ts_.next_step = step;
generated_ts_.next_timestamp = generated_ts_used + step;
}
// Diag: log first 3 frames
int n = diag_count_.fetch_add(1, std::memory_order_relaxed);
if (n < 3) {
LOGI("ENC frame#%d: sz=%zu ctr=%u out=%zu",
n, frame.size(), ctr, frame.size() + HEADER);
diag_write("ENC frame#%d: sz=%zu ctr=%u nonce[4..7]=%02x%02x%02x%02x\n",
n, frame.size(), ctr, nonce[4], nonce[5], nonce[6], nonce[7]);
const char* mode =
nonce_from_rtp_header
? "rtp"
: (nonce_from_generated_ts
? "gen"
: (nonce_from_additional_data
? (additional_was_rtp_header
? (nonce_from_additional_normalized ? "ad-rtp-norm" : "ad-rtp")
: (nonce_from_additional_normalized ? "raw-norm" : "raw-abs"))
: "raw-abs"));
LOGI("ENC frame#%d: sz=%zu ad=%zu hdr=%zu mode=%s nonce=%02x%02x%02x%02x",
n, frame.size(), additional_data.size(), header_size, mode,
nonce[4], nonce[5], nonce[6], nonce[7]);
diag_write("ENC frame#%d: sz=%zu ad=%zu hdr=%zu mode=%s nonce[4..7]=%02x%02x%02x%02x\n",
n, frame.size(), additional_data.size(), header_size, mode,
nonce[4], nonce[5], nonce[6], nonce[7]);
}
return 0;
}
size_t GetMaxCiphertextByteSize(cricket::MediaType, size_t frame_size) override {
return frame_size + 4; // +4 for counter prefix
return frame_size;
}
protected:
@@ -156,8 +487,10 @@ protected:
private:
mutable std::atomic<int> ref_{0};
mutable std::atomic<uint32_t> counter_{0};
mutable std::atomic<int> diag_count_{0};
mutable RtpProbeState rtp_probe_;
mutable GeneratedTsState generated_ts_;
mutable AdditionalTsState additional_ts_;
uint8_t key_[32];
};
@@ -185,57 +518,180 @@ public:
}
/**
* Decrypt frame: read 4-byte counter prefix → derive nonce → decrypt.
* If frame has no prefix (< 5 bytes or from Desktop), fallback to
* nonce derived from additional_data (RTP header) or zeros.
* Desktop-compatible decrypt:
* - nonce from RTP timestamp
* - if RTP header is present inside encrypted_frame (fallback path),
* keep header bytes untouched and decrypt payload only.
*/
Result Decrypt(cricket::MediaType /*media_type*/,
const std::vector<uint32_t>& /*csrcs*/,
rtc::ArrayView<const uint8_t> additional_data,
rtc::ArrayView<const uint8_t> encrypted_frame,
rtc::ArrayView<uint8_t> frame) override {
const size_t HEADER = 4;
uint8_t nonce[24] = {0};
const uint8_t *payload;
size_t payload_sz;
if (encrypted_frame.size() > HEADER) {
// Android format: [4-byte counter] + [encrypted data]
nonce[4] = encrypted_frame.data()[0];
nonce[5] = encrypted_frame.data()[1];
nonce[6] = encrypted_frame.data()[2];
nonce[7] = encrypted_frame.data()[3];
payload = encrypted_frame.data() + HEADER;
payload_sz = encrypted_frame.size() - HEADER;
} else {
// Fallback: no counter prefix
payload = encrypted_frame.data();
payload_sz = encrypted_frame.size();
size_t header_size = 0;
bool nonce_from_rtp_header = false;
bool nonce_from_generated_ts = false;
bool nonce_from_additional_data = false;
bool nonce_from_additional_normalized = false;
bool additional_was_rtp_header = false;
bool used_absolute_additional_fallback = false;
uint32_t generated_ts_used = 0;
nonce_from_additional_data = fill_nonce_from_additional_data(
additional_data.data(),
additional_data.size(),
nonce,
&additional_ts_,
true,
&nonce_from_additional_normalized,
&additional_was_rtp_header);
if (!nonce_from_additional_data) {
nonce_from_rtp_header =
fill_nonce_from_rtp_frame(encrypted_frame.data(), encrypted_frame.size(), &rtp_probe_, nonce, &header_size);
if (!nonce_from_rtp_header) {
if (!generated_ts_.initialized) {
generated_ts_.initialized = true;
generated_ts_.next_timestamp = 0;
generated_ts_.next_step = 960;
}
nonce_from_generated_ts = true;
generated_ts_used = generated_ts_.next_timestamp;
fill_nonce_from_ts32(generated_ts_used, nonce);
}
}
if (payload_sz == 0 || frame.size() < payload_sz) {
if (encrypted_frame.size() == 0 || frame.size() < encrypted_frame.size()) {
return {Result::Status::kFailedToDecrypt, 0};
}
rosetta_xchacha20_xor(frame.data(), payload, payload_sz, nonce, key_);
bool used_generated_resync = false;
if (nonce_from_rtp_header && header_size <= encrypted_frame.size()) {
if (header_size > 0) {
memcpy(frame.data(), encrypted_frame.data(), header_size);
}
const size_t payload_size = encrypted_frame.size() - header_size;
rosetta_xchacha20_xor(
frame.data() + header_size,
encrypted_frame.data() + header_size,
payload_size,
nonce,
key_);
} else {
rosetta_xchacha20_xor(frame.data(), encrypted_frame.data(), encrypted_frame.size(), nonce, key_);
}
// additional_data on Android can be absolute RTP-ish timestamp, while
// desktop nonce source is normalized stream timestamp. If normalized
// nonce gives implausible Opus, retry with absolute additional_data.
if (!nonce_from_generated_ts &&
nonce_from_additional_data &&
encrypted_frame.size() > 0 &&
additional_data.size() >= 8) {
const uint8_t* payload_ptr = frame.data() + header_size;
const size_t payload_size = encrypted_frame.size() - header_size;
if (!is_plausible_opus_packet(payload_ptr, payload_size)) {
uint8_t nonce_abs[24] = {0};
bool abs_norm = false;
bool abs_rtp = false;
if (fill_nonce_from_additional_data(
additional_data.data(),
additional_data.size(),
nonce_abs,
nullptr,
false,
&abs_norm,
&abs_rtp) &&
memcmp(nonce_abs, nonce, 24) != 0) {
if (nonce_from_rtp_header && header_size <= encrypted_frame.size()) {
if (header_size > 0) {
memcpy(frame.data(), encrypted_frame.data(), header_size);
}
rosetta_xchacha20_xor(
frame.data() + header_size,
encrypted_frame.data() + header_size,
payload_size,
nonce_abs,
key_);
} else {
rosetta_xchacha20_xor(
frame.data(),
encrypted_frame.data(),
encrypted_frame.size(),
nonce_abs,
key_);
}
payload_ptr = frame.data() + header_size;
if (is_plausible_opus_packet(payload_ptr, payload_size)) {
memcpy(nonce, nonce_abs, 24);
used_absolute_additional_fallback = true;
}
}
}
}
if (nonce_from_generated_ts) {
bool plausible = is_plausible_opus_packet(frame.data(), encrypted_frame.size());
// Recover after lost packets by probing a few forward timestamp steps.
if (!plausible) {
std::vector<uint8_t> candidate(encrypted_frame.size());
for (uint32_t i = 1; i <= 8; ++i) {
const uint32_t ts_try = generated_ts_used + generated_ts_.next_step * i;
uint8_t nonce_try[24] = {0};
fill_nonce_from_ts32(ts_try, nonce_try);
rosetta_xchacha20_xor(
candidate.data(),
encrypted_frame.data(),
encrypted_frame.size(),
nonce_try,
key_);
if (is_plausible_opus_packet(candidate.data(), candidate.size())) {
memcpy(frame.data(), candidate.data(), candidate.size());
generated_ts_used = ts_try;
used_generated_resync = true;
plausible = true;
break;
}
}
}
const uint32_t step = infer_opus_packet_duration_samples(frame.data(), encrypted_frame.size());
generated_ts_.next_step = step;
generated_ts_.next_timestamp = generated_ts_used + step;
}
// Diag: log first 3 frames
int n = diag_count_.fetch_add(1, std::memory_order_relaxed);
if (n < 3) {
LOGI("DEC frame#%d: enc_sz=%zu payload=%zu nonce=%02x%02x%02x%02x",
n, encrypted_frame.size(), payload_sz,
const char* mode = nullptr;
if (nonce_from_rtp_header) {
mode = "rtp";
} else if (nonce_from_generated_ts) {
mode = used_generated_resync ? "gen-resync" : "gen";
} else if (used_absolute_additional_fallback) {
mode = additional_was_rtp_header ? "ad-rtp-abs-fb" : "raw-abs-fb";
} else if (nonce_from_additional_data) {
mode =
additional_was_rtp_header
? (nonce_from_additional_normalized ? "ad-rtp-norm" : "ad-rtp")
: (nonce_from_additional_normalized ? "raw-norm" : "raw-abs");
} else {
mode = "raw-abs";
}
LOGI("DEC frame#%d: enc_sz=%zu ad=%zu hdr=%zu mode=%s nonce=%02x%02x%02x%02x",
n, encrypted_frame.size(), additional_data.size(), header_size, mode,
nonce[4], nonce[5], nonce[6], nonce[7]);
diag_write("DEC frame#%d: enc_sz=%zu payload=%zu nonce[4..7]=%02x%02x%02x%02x\n",
n, encrypted_frame.size(), payload_sz,
diag_write("DEC frame#%d: enc_sz=%zu ad=%zu hdr=%zu mode=%s nonce[4..7]=%02x%02x%02x%02x\n",
n, encrypted_frame.size(), additional_data.size(), header_size, mode,
nonce[4], nonce[5], nonce[6], nonce[7]);
}
return {Result::Status::kOk, payload_sz};
return {Result::Status::kOk, encrypted_frame.size()};
}
size_t GetMaxPlaintextByteSize(cricket::MediaType, size_t encrypted_frame_size) override {
return encrypted_frame_size; // >= actual (payload = enc - 4)
return encrypted_frame_size;
}
protected:
@@ -244,6 +700,9 @@ protected:
private:
mutable std::atomic<int> ref_{0};
mutable std::atomic<int> diag_count_{0};
mutable RtpProbeState rtp_probe_;
mutable GeneratedTsState generated_ts_;
mutable AdditionalTsState additional_ts_;
uint8_t key_[32];
};

View File

@@ -3,6 +3,7 @@ package com.rosetta.messenger.network
import android.content.Context
import android.media.AudioManager
import android.util.Log
import com.rosetta.messenger.data.MessageRepository
import java.security.SecureRandom
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
@@ -14,6 +15,8 @@ import kotlinx.coroutines.flow.StateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.coroutines.suspendCancellableCoroutine
import org.bouncycastle.math.ec.rfc7748.X25519
import org.json.JSONObject
@@ -24,6 +27,7 @@ import org.webrtc.MediaConstraints
import org.webrtc.PeerConnection
import org.webrtc.PeerConnectionFactory
import org.webrtc.RtpReceiver
import org.webrtc.RtpSender
import org.webrtc.RtpTransceiver
import org.webrtc.SdpObserver
import org.webrtc.SessionDescription
@@ -105,10 +109,12 @@ object CallManager {
private var durationJob: Job? = null
private var protocolStateJob: Job? = null
private var disconnectResetJob: Job? = null
private var signalWaiter: ((Packet) -> Unit)? = null
private var webRtcWaiter: ((Packet) -> Unit)? = null
private var iceWaiter: ((Packet) -> Unit)? = null
private val webRtcSignalMutex = Mutex()
private var peerConnectionFactory: PeerConnectionFactory? = null
private var peerConnection: PeerConnection? = null
@@ -228,6 +234,7 @@ object CallManager {
}
fun endCall() {
breadcrumb("UI: endCall requested")
resetSession(reason = null, notifyPeer = true)
}
@@ -392,58 +399,85 @@ object CallManager {
}
private suspend fun handleWebRtcPacket(packet: PacketWebRTC) {
val phase = _state.value.phase
if (phase != CallPhase.CONNECTING && phase != CallPhase.ACTIVE) {
breadcrumb("RTC: IGNORED ${packet.signalType} — phase=$phase")
return
}
val pc = peerConnection
if (pc == null) {
breadcrumb("RTC: IGNORED ${packet.signalType} — peerConnection=null!")
return
}
webRtcSignalMutex.withLock {
val phase = _state.value.phase
if (phase != CallPhase.CONNECTING && phase != CallPhase.ACTIVE) {
breadcrumb("RTC: IGNORED ${packet.signalType} — phase=$phase")
return@withLock
}
val pc = peerConnection
if (pc == null) {
breadcrumb("RTC: IGNORED ${packet.signalType} — peerConnection=null!")
return@withLock
}
when (packet.signalType) {
WebRTCSignalType.ANSWER -> {
breadcrumb("RTC: ANSWER received")
val answer = parseSessionDescription(packet.sdpOrCandidate) ?: return
try {
pc.setRemoteDescriptionAwait(answer)
remoteDescriptionSet = true
flushBufferedRemoteCandidates()
breadcrumb("RTC: ANSWER applied OK, remoteDesc=true")
} catch (e: Exception) {
breadcrumb("RTC: ANSWER FAILED — ${e.message}")
saveCrashReport("setRemoteDescription(answer) failed", e)
when (packet.signalType) {
WebRTCSignalType.ANSWER -> {
val answer = parseSessionDescription(packet.sdpOrCandidate) ?: return@withLock
if (answer.type != SessionDescription.Type.ANSWER) {
breadcrumb("RTC: ANSWER packet with type=${answer.type} ignored")
return@withLock
}
val stateBefore = pc.signalingState()
breadcrumb("RTC: ANSWER received state=$stateBefore")
if (stateBefore == PeerConnection.SignalingState.STABLE && remoteDescriptionSet) {
breadcrumb("RTC: ANSWER duplicate ignored (already stable)")
return@withLock
}
try {
pc.setRemoteDescriptionAwait(answer)
remoteDescriptionSet = true
flushBufferedRemoteCandidates()
breadcrumb("RTC: ANSWER applied OK, state=${pc.signalingState()}")
} catch (e: Exception) {
breadcrumb("RTC: ANSWER FAILED — ${e.message}")
saveCrashReport("setRemoteDescription(answer) failed", e)
}
}
}
WebRTCSignalType.ICE_CANDIDATE -> {
val candidate = parseIceCandidate(packet.sdpOrCandidate) ?: return
if (!remoteDescriptionSet) {
breadcrumb("RTC: ICE buffered (remoteDesc not set yet)")
bufferedRemoteCandidates.add(candidate)
return
WebRTCSignalType.ICE_CANDIDATE -> {
val candidate = parseIceCandidate(packet.sdpOrCandidate) ?: return@withLock
if (!remoteDescriptionSet) {
breadcrumb("RTC: ICE buffered (remoteDesc not set yet)")
bufferedRemoteCandidates.add(candidate)
return@withLock
}
breadcrumb("RTC: ICE added: ${candidate.sdp.take(40)}")
runCatching { pc.addIceCandidate(candidate) }
}
breadcrumb("RTC: ICE added: ${candidate.sdp.take(40)}")
runCatching { pc.addIceCandidate(candidate) }
}
WebRTCSignalType.OFFER -> {
breadcrumb("RTC: OFFER received (offerSent=$offerSent)")
val remoteOffer = parseSessionDescription(packet.sdpOrCandidate) ?: return
try {
pc.setRemoteDescriptionAwait(remoteOffer)
remoteDescriptionSet = true
flushBufferedRemoteCandidates()
val answer = pc.createAnswerAwait()
pc.setLocalDescriptionAwait(answer)
ProtocolManager.sendWebRtcSignal(
signalType = WebRTCSignalType.ANSWER,
sdpOrCandidate = serializeSessionDescription(answer)
)
breadcrumb("RTC: OFFER handled → ANSWER sent")
} catch (e: Exception) {
breadcrumb("RTC: OFFER FAILED — ${e.message}")
saveCrashReport("handleOffer failed", e)
WebRTCSignalType.OFFER -> {
val remoteOffer = parseSessionDescription(packet.sdpOrCandidate) ?: return@withLock
if (remoteOffer.type != SessionDescription.Type.OFFER) {
breadcrumb("RTC: OFFER packet with type=${remoteOffer.type} ignored")
return@withLock
}
breadcrumb("RTC: OFFER received (offerSent=$offerSent state=${pc.signalingState()})")
try {
pc.setRemoteDescriptionAwait(remoteOffer)
remoteDescriptionSet = true
flushBufferedRemoteCandidates()
val stateAfterRemote = pc.signalingState()
if (stateAfterRemote != PeerConnection.SignalingState.HAVE_REMOTE_OFFER &&
stateAfterRemote != PeerConnection.SignalingState.HAVE_LOCAL_PRANSWER
) {
breadcrumb("RTC: OFFER skip createAnswer, bad state=$stateAfterRemote")
return@withLock
}
val answer = pc.createAnswerAwait()
pc.setLocalDescriptionAwait(answer)
ProtocolManager.sendWebRtcSignal(
signalType = WebRTCSignalType.ANSWER,
sdpOrCandidate = serializeSessionDescription(answer)
)
breadcrumb("RTC: OFFER handled → ANSWER sent")
} catch (e: Exception) {
breadcrumb("RTC: OFFER FAILED — ${e.message}")
saveCrashReport("handleOffer failed", e)
}
}
}
}
@@ -493,9 +527,14 @@ object CallManager {
if (localAudioTrack == null) {
localAudioTrack = factory.createAudioTrack(LOCAL_AUDIO_TRACK_ID, audioSource)
localAudioTrack?.setEnabled(!_state.value.isMuted)
pc.addTrack(localAudioTrack, listOf(LOCAL_MEDIA_STREAM_ID))
breadcrumb("PC: audio track added, attaching E2EE…")
attachSenderE2EE(pc)
val txInit =
RtpTransceiver.RtpTransceiverInit(
RtpTransceiver.RtpTransceiverDirection.SEND_RECV,
listOf(LOCAL_MEDIA_STREAM_ID)
)
val transceiver = pc.addTransceiver(localAudioTrack, txInit)
breadcrumb("PC: audio transceiver added, attaching E2EE…")
attachSenderE2EE(transceiver?.sender)
}
try {
@@ -561,16 +600,37 @@ object CallManager {
breadcrumb("PC: connState=$newState")
when (newState) {
PeerConnection.PeerConnectionState.CONNECTED -> {
disconnectResetJob?.cancel()
disconnectResetJob = null
onCallConnected()
}
PeerConnection.PeerConnectionState.DISCONNECTED,
PeerConnection.PeerConnectionState.FAILED,
PeerConnection.PeerConnectionState.CLOSED -> {
disconnectResetJob?.cancel()
disconnectResetJob = null
// Dispatch to our scope — this callback fires on WebRTC thread
scope.launch {
resetSession(reason = "Connection lost", notifyPeer = false)
}
}
PeerConnection.PeerConnectionState.DISCONNECTED -> {
// Desktop tolerates short network dips; do not kill call immediately.
disconnectResetJob?.cancel()
disconnectResetJob =
scope.launch {
delay(5_000L)
val pcState = peerConnection?.connectionState()
if (pcState == PeerConnection.PeerConnectionState.DISCONNECTED ||
pcState == PeerConnection.PeerConnectionState.FAILED ||
pcState == PeerConnection.PeerConnectionState.CLOSED
) {
breadcrumb("PC: DISCONNECTED timeout → reset")
resetSession(reason = "Connection lost", notifyPeer = false)
} else {
breadcrumb("PC: DISCONNECTED recovered (state=$pcState)")
}
}
}
else -> Unit
}
}
@@ -625,6 +685,34 @@ object CallManager {
peerConnectionFactory = PeerConnectionFactory.builder().createPeerConnectionFactory()
}
/**
 * Sends a CALL attachment message to the peer after a call ends, mirroring
 * what desktop does. Only the caller emits it (to avoid duplicates); the
 * attachment's preview carries the call duration in seconds.
 */
private fun emitCallAttachmentIfNeeded(snapshot: CallUiState) {
    // Only the initiating side records the call in chat history.
    if (role != CallRole.CALLER) return
    val peerPublicKey = snapshot.peerPublicKey.trim()
    val context = appContext ?: return
    if (peerPublicKey.isBlank()) return
    // Negative durations are clamped; 0 renders as missed/rejected.
    val durationSec = snapshot.durationSec.coerceAtLeast(0)
    val callAttachment =
        MessageAttachment(
            // 16-char hex-ish id derived from a UUID, matching other attachments.
            id = java.util.UUID.randomUUID().toString().replace("-", "").take(16),
            blob = "",
            type = AttachmentType.CALL,
            // Duration travels in the preview field as a plain integer string.
            preview = durationSec.toString()
        )
    scope.launch {
        // Best effort: a failed history write must not affect call teardown.
        runCatching {
            MessageRepository.getInstance(context).sendMessage(
                toPublicKey = peerPublicKey,
                text = "",
                attachments = listOf(callAttachment)
            )
        }.onFailure { error ->
            Log.w(TAG, "Failed to send call attachment", error)
        }
    }
}
private fun resetSession(reason: String?, notifyPeer: Boolean) {
breadcrumb("RESET: reason=$reason notifyPeer=$notifyPeer phase=${_state.value.phase}")
val snapshot = _state.value
@@ -646,6 +734,7 @@ object CallManager {
if (!reason.isNullOrBlank()) {
Log.d(TAG, reason)
}
emitCallAttachmentIfNeeded(snapshot)
resetRtcObjects()
e2eeAvailable = true
role = null
@@ -656,6 +745,8 @@ object CallManager {
localPublicKey = null
durationJob?.cancel()
durationJob = null
disconnectResetJob?.cancel()
disconnectResetJob = null
setSpeakerphone(false)
_state.value = CallUiState()
}
@@ -732,10 +823,10 @@ object CallManager {
} catch (_: Throwable) {}
}
private fun attachSenderE2EE(pc: PeerConnection) {
private fun attachSenderE2EE(sender: RtpSender?) {
if (!e2eeAvailable) return
val key = sharedKeyBytes ?: return
val sender = pc.senders.firstOrNull() ?: return
if (sender == null) return
try {
breadcrumb("1. encryptor: nativeLoaded=${XChaCha20E2EE.nativeLoaded}")

View File

@@ -19,6 +19,7 @@ import androidx.compose.animation.core.tween
import androidx.compose.foundation.Canvas
import androidx.compose.foundation.Image
import androidx.compose.foundation.background
import androidx.compose.foundation.border
import androidx.compose.foundation.ExperimentalFoundationApi
import androidx.compose.foundation.clickable
import androidx.compose.foundation.combinedClickable
@@ -1555,27 +1556,48 @@ fun ImageAttachment(
}
}
private fun parseCallAttachmentPreview(preview: String): Pair<String, String?> {
if (preview.isBlank()) return "Call" to null
/** View model for rendering a call attachment bubble desktop-style. */
private data class DesktopCallUi(
    val title: String,    // e.g. "Outgoing call" / "Missed call"
    val subtitle: String, // formatted duration or an explanatory line
    val isError: Boolean  // true when the call was missed/rejected
)
val pieces = preview.split("::")
val title = pieces.firstOrNull()?.trim().orEmpty().ifBlank { "Call" }
val subtitle = pieces.drop(1).joinToString(" ").trim().ifBlank { null }
private fun parseCallDurationSeconds(preview: String): Int {
if (preview.isBlank()) return 0
val tail = preview.substringAfterLast("::").trim()
tail.toIntOrNull()?.let { return it.coerceAtLeast(0) }
val durationRegex = Regex("duration(?:Sec|Seconds)?\\s*[:=]\\s*(\\d+)", RegexOption.IGNORE_CASE)
val fallbackDurationRegex = Regex("^(\\d{1,5})$")
val durationSec =
durationRegex.find(preview)?.groupValues?.getOrNull(1)?.toIntOrNull()
?: fallbackDurationRegex.find(title)?.groupValues?.getOrNull(1)?.toIntOrNull()
durationRegex.find(preview)?.groupValues?.getOrNull(1)?.toIntOrNull()?.let {
return it.coerceAtLeast(0)
}
val normalizedSubtitle =
durationSec?.let { sec ->
val mins = sec / 60
val secs = sec % 60
"Duration ${"%d:%02d".format(mins, secs)}"
} ?: subtitle
return preview.trim().toIntOrNull()?.coerceAtLeast(0) ?: 0
}
return title to normalizedSubtitle
private fun formatDesktopCallDuration(durationSec: Int): String {
val minutes = durationSec / 60
val seconds = durationSec % 60
return "$minutes:${seconds.toString().padStart(2, '0')}"
}
private fun resolveDesktopCallUi(preview: String, isOutgoing: Boolean): DesktopCallUi {
val durationSec = parseCallDurationSeconds(preview)
val isError = durationSec == 0
val title =
if (isError) {
if (isOutgoing) "Rejected call" else "Missed call"
} else {
if (isOutgoing) "Outgoing call" else "Incoming call"
}
val subtitle =
if (isError) {
"Call was not answered or was rejected"
} else {
formatDesktopCallDuration(durationSec)
}
return DesktopCallUi(title = title, subtitle = subtitle, isError = isError)
}
/** Call attachment bubble */
@@ -1587,116 +1609,141 @@ fun CallAttachment(
timestamp: java.util.Date,
messageStatus: MessageStatus = MessageStatus.READ
) {
val (title, subtitle) = remember(attachment.preview) { parseCallAttachmentPreview(attachment.preview) }
Row(
modifier = Modifier.fillMaxWidth().padding(vertical = 4.dp),
verticalAlignment = Alignment.CenterVertically
) {
Box(
modifier =
Modifier.size(40.dp)
.clip(CircleShape)
.background(
if (isOutgoing) {
Color.White.copy(alpha = 0.18f)
} else {
if (isDarkTheme) Color(0xFF2B3A4D) else Color(0xFFE7F2FF)
}
),
contentAlignment = Alignment.Center
) {
Icon(
imageVector = Icons.Default.Call,
contentDescription = null,
tint =
if (isOutgoing) Color.White
else if (isDarkTheme) Color(0xFF8EC9FF) else PrimaryBlue,
modifier = Modifier.size(20.dp)
)
val callUi = remember(attachment.preview, isOutgoing) {
resolveDesktopCallUi(attachment.preview, isOutgoing)
}
val containerShape = RoundedCornerShape(10.dp)
val containerBackground =
if (isOutgoing) {
Color.White.copy(alpha = 0.12f)
} else {
if (isDarkTheme) Color(0xFF1F2733) else Color(0xFFF3F8FF)
}
val containerBorder =
if (isOutgoing) {
Color.White.copy(alpha = 0.2f)
} else {
if (isDarkTheme) Color(0xFF33435A) else Color(0xFFD8E5F4)
}
val iconBackground = if (callUi.isError) Color(0xFFE55A5A) else PrimaryBlue
val iconVector =
when {
callUi.isError -> Icons.Default.Close
isOutgoing -> Icons.Default.CallMade
else -> Icons.Default.CallReceived
}
Spacer(modifier = Modifier.width(10.dp))
Box(
modifier =
Modifier
.padding(vertical = 4.dp)
.widthIn(min = 200.dp)
.heightIn(min = 60.dp)
.clip(containerShape)
.background(containerBackground)
.border(width = 1.dp, color = containerBorder, shape = containerShape)
.padding(horizontal = 10.dp, vertical = 8.dp)
) {
Row(
verticalAlignment = Alignment.CenterVertically
) {
Box(
modifier =
Modifier.size(40.dp)
.clip(CircleShape)
.background(iconBackground),
contentAlignment = Alignment.Center
) {
Icon(
imageVector = iconVector,
contentDescription = null,
tint = Color.White,
modifier = Modifier.size(20.dp)
)
}
Column(modifier = Modifier.weight(1f)) {
Text(
text = title,
fontSize = 14.sp,
fontWeight = FontWeight.Medium,
color = if (isOutgoing) Color.White else if (isDarkTheme) Color.White else Color.Black,
maxLines = 1,
overflow = TextOverflow.Ellipsis
)
if (!subtitle.isNullOrBlank()) {
Spacer(modifier = Modifier.height(2.dp))
Spacer(modifier = Modifier.width(10.dp))
Column(modifier = Modifier.weight(1f)) {
Text(
text = subtitle,
fontSize = 12.sp,
color =
if (isOutgoing) {
Color.White.copy(alpha = 0.7f)
} else {
if (isDarkTheme) Color(0xFF8BA0B8) else Color(0xFF5E6E82)
},
text = callUi.title,
fontSize = 14.sp,
fontWeight = FontWeight.Medium,
color = if (isOutgoing) Color.White else if (isDarkTheme) Color.White else Color.Black,
maxLines = 1,
overflow = TextOverflow.Ellipsis
)
}
}
if (isOutgoing) {
Spacer(modifier = Modifier.width(8.dp))
Row(verticalAlignment = Alignment.CenterVertically) {
Spacer(modifier = Modifier.height(2.dp))
Text(
text = android.text.format.DateFormat.format("HH:mm", timestamp).toString(),
fontSize = 11.sp,
color = Color.White.copy(alpha = 0.7f)
text = callUi.subtitle,
fontSize = 12.sp,
color =
if (callUi.isError) {
Color(0xFFE55A5A)
} else if (isOutgoing) {
Color.White.copy(alpha = 0.72f)
} else {
if (isDarkTheme) Color(0xFF8EC9FF) else PrimaryBlue
},
maxLines = 1,
overflow = TextOverflow.Ellipsis
)
Spacer(modifier = Modifier.width(4.dp))
when (messageStatus) {
MessageStatus.SENDING -> {
Icon(
painter = TelegramIcons.Clock,
contentDescription = null,
tint = Color.White.copy(alpha = 0.7f),
modifier = Modifier.size(14.dp)
)
}
MessageStatus.SENT, MessageStatus.DELIVERED -> {
Icon(
painter = TelegramIcons.Done,
contentDescription = null,
tint = Color.White.copy(alpha = 0.8f),
modifier = Modifier.size(14.dp)
)
}
MessageStatus.READ -> {
Box(modifier = Modifier.height(14.dp)) {
}
if (isOutgoing) {
Spacer(modifier = Modifier.width(8.dp))
Row(verticalAlignment = Alignment.CenterVertically) {
Text(
text = android.text.format.DateFormat.format("HH:mm", timestamp).toString(),
fontSize = 11.sp,
color = Color.White.copy(alpha = 0.7f)
)
Spacer(modifier = Modifier.width(4.dp))
when (messageStatus) {
MessageStatus.SENDING -> {
Icon(
painter = TelegramIcons.Done,
painter = TelegramIcons.Clock,
contentDescription = null,
tint = Color.White,
tint = Color.White.copy(alpha = 0.7f),
modifier = Modifier.size(14.dp)
)
}
MessageStatus.SENT, MessageStatus.DELIVERED -> {
Icon(
painter = TelegramIcons.Done,
contentDescription = null,
tint = Color.White,
modifier = Modifier.size(14.dp).offset(x = 4.dp)
tint = Color.White.copy(alpha = 0.8f),
modifier = Modifier.size(14.dp)
)
}
MessageStatus.READ -> {
Box(modifier = Modifier.height(14.dp)) {
Icon(
painter = TelegramIcons.Done,
contentDescription = null,
tint = Color.White,
modifier = Modifier.size(14.dp)
)
Icon(
painter = TelegramIcons.Done,
contentDescription = null,
tint = Color.White,
modifier = Modifier.size(14.dp).offset(x = 4.dp)
)
}
}
MessageStatus.ERROR -> {
Icon(
imageVector = Icons.Default.Error,
contentDescription = null,
tint = Color(0xFFE53935),
modifier = Modifier.size(14.dp)
)
}
}
MessageStatus.ERROR -> {
Icon(
imageVector = Icons.Default.Error,
contentDescription = null,
tint = Color(0xFFE53935),
modifier = Modifier.size(14.dp)
)
}
}
}
}
}
}
}