diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 439f38c4..dcc9ca95 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -70,8 +70,8 @@ concurrency: cancel-in-progress: false permissions: - contents: write # commit + tag + GitHub release - id-token: write # OIDC trusted publishing + npm provenance + contents: write # commit + tag + GitHub release + id-token: write # OIDC trusted publishing + npm provenance jobs: release: diff --git a/cli/XCWH264Encoder.m b/cli/XCWH264Encoder.m index 0d2077bd..2b65b432 100644 --- a/cli/XCWH264Encoder.m +++ b/cli/XCWH264Encoder.m @@ -231,11 +231,15 @@ static int32_t XCWRoundToEvenDimension(double value) { return rounded; } -static CGSize XCWScaledDimensionsForSourceSize(int32_t width, int32_t height) { +static CGSize XCWScaledDimensionsForSourceSize(int32_t width, int32_t height, XCWVideoEncoderMode mode) { if (width <= 0 || height <= 0) { return CGSizeZero; } + if (mode == XCWVideoEncoderModeH264Software) { + return CGSizeMake(width, height); + } + int32_t longestEdge = MAX(width, height); if (longestEdge <= XCWMaximumEncodedDimension) { return CGSizeMake(width, height); @@ -494,7 +498,7 @@ - (BOOL)encodePixelBufferLocked:(CVPixelBufferRef)pixelBuffer { return NO; } - CGSize targetSize = XCWScaledDimensionsForSourceSize(sourceWidth, sourceHeight); + CGSize targetSize = XCWScaledDimensionsForSourceSize(sourceWidth, sourceHeight, _encoderMode); int32_t targetWidth = (int32_t)targetSize.width; int32_t targetHeight = (int32_t)targetSize.height; if (targetWidth <= 0 || targetHeight <= 0) { diff --git a/cli/XCWPrivateSimulatorSession.h b/cli/XCWPrivateSimulatorSession.h index 18ca721c..c6571f26 100644 --- a/cli/XCWPrivateSimulatorSession.h +++ b/cli/XCWPrivateSimulatorSession.h @@ -28,6 +28,7 @@ typedef void (^XCWPrivateSimulatorEncodedFrameHandler)(NSData *sampleData, - (BOOL)waitUntilReadyWithTimeout:(NSTimeInterval)timeout; - (BOOL)waitForFirstEncodedFrameWithTimeout:(NSTimeInterval)timeout; - (void)requestKeyFrameRefresh; +- (void)requestFrameRefresh; - (id)addEncodedFrameListener:(XCWPrivateSimulatorEncodedFrameHandler)handler; - (void)removeEncodedFrameListener:(id)token; diff --git a/cli/XCWPrivateSimulatorSession.m b/cli/XCWPrivateSimulatorSession.m index d4f9f616..23f310d1 100644 --- a/cli/XCWPrivateSimulatorSession.m +++ b/cli/XCWPrivateSimulatorSession.m @@ -167,6 +167,10 @@ - (void)requestKeyFrameRefresh { [_videoEncoder requestKeyFrame]; } +- (void)requestFrameRefresh { + [self refreshCurrentFrame]; +} + - (id)addEncodedFrameListener:(XCWPrivateSimulatorEncodedFrameHandler)handler { if (handler == nil) { return [NSUUID UUID]; diff --git a/cli/native/XCWNativeBridge.h b/cli/native/XCWNativeBridge.h index 715e1290..616ead97 100644 --- a/cli/native/XCWNativeBridge.h +++ b/cli/native/XCWNativeBridge.h @@ -70,6 +70,7 @@ void * _Nullable xcw_native_session_create(const char * _Nonnull udid, char * _N void xcw_native_session_destroy(void * _Nullable handle); bool xcw_native_session_start(void * _Nonnull handle, char * _Nullable * _Nullable error_message); void xcw_native_session_request_refresh(void * _Nonnull handle); +void xcw_native_session_request_keyframe(void * _Nonnull handle); bool xcw_native_session_send_touch(void * _Nonnull handle, double x, double y, const char * _Nonnull phase, char * _Nullable * _Nullable error_message); bool xcw_native_session_send_multitouch(void * _Nonnull handle, double x1, double y1, double x2, double y2, const char * _Nonnull phase, char * _Nullable * _Nullable error_message); bool xcw_native_session_send_key(void * _Nonnull handle, uint16_t key_code, uint32_t modifiers, char * _Nullable * _Nullable error_message); diff --git a/cli/native/XCWNativeBridge.m b/cli/native/XCWNativeBridge.m index 5b7e6e31..ee773288 100644 --- a/cli/native/XCWNativeBridge.m +++ b/cli/native/XCWNativeBridge.m @@ -668,6 +668,12 @@ void xcw_native_session_request_refresh(void *handle) { } } +void xcw_native_session_request_keyframe(void *handle) { + @autoreleasepool { + [XCWNativeSessionFromHandle(handle) requestKeyFrame]; + } +} + bool xcw_native_session_send_touch(void *handle, double x, double y, const char *phase, char **error_message) { @autoreleasepool { NSError *error = nil; diff --git a/cli/native/XCWNativeSession.h b/cli/native/XCWNativeSession.h index 84b5dd9e..19075e88 100644 --- a/cli/native/XCWNativeSession.h +++ b/cli/native/XCWNativeSession.h @@ -13,6 +13,7 @@ NS_ASSUME_NONNULL_BEGIN - (BOOL)start:(NSError * _Nullable * _Nullable)error; - (void)requestRefresh; +- (void)requestKeyFrame; - (BOOL)sendTouchAtX:(double)x y:(double)y phase:(NSString *)phase diff --git a/cli/native/XCWNativeSession.m b/cli/native/XCWNativeSession.m index 3296b164..76656806 100644 --- a/cli/native/XCWNativeSession.m +++ b/cli/native/XCWNativeSession.m @@ -91,6 +91,10 @@ - (BOOL)start:(NSError * _Nullable __autoreleasing *)error { } - (void)requestRefresh { + [self.session requestFrameRefresh]; +} + +- (void)requestKeyFrame { [self.session requestKeyFrameRefresh]; } diff --git a/client/src/features/stream/streamWorkerClient.ts b/client/src/features/stream/streamWorkerClient.ts index f097ae7c..e7a96797 100644 --- a/client/src/features/stream/streamWorkerClient.ts +++ b/client/src/features/stream/streamWorkerClient.ts @@ -105,6 +105,7 @@ class WebRtcStreamClient implements StreamClientBackend { private connectGeneration = 0; private context: CanvasRenderingContext2D | null = null; private controlChannel: RTCDataChannel | null = null; + private diagnostics = createWebRtcDiagnostics(); private peerConnection: RTCPeerConnection | null = null; private reconnectTimeout = 0; private shouldReconnect = false; @@ -141,6 +142,7 @@ class WebRtcStreamClient implements StreamClientBackend { } const generation = ++this.connectGeneration; this.shouldReconnect = true; + this.diagnostics = createWebRtcDiagnostics(); this.stats = createEmptyStreamStats(); this.onMessage({ type: "status", @@ -150,8 +152,10 @@ class WebRtcStreamClient implements StreamClientBackend { try { const peerConnection = new RTCPeerConnection({ iceServers: iceServers(), + iceTransportPolicy: iceTransportPolicy(), }); this.peerConnection = peerConnection; + this.attachDiagnostics(peerConnection, target, generation); const transceiver = peerConnection.addTransceiver("video", { direction: "recvonly", }); @@ -174,6 +178,7 @@ class WebRtcStreamClient implements StreamClientBackend { if (generation !== this.connectGeneration) { return; } + event.track.contentHint = "motion"; for (const receiver of peerConnection.getReceivers()) { configureLowLatencyReceiver(receiver); } @@ -206,11 +211,16 @@ class WebRtcStreamClient implements StreamClientBackend { }; peerConnection.onconnectionstatechange = () => { + this.diagnostics.peerConnectionState = peerConnection.connectionState; + this.postDiagnostics(target, "connectionstatechange"); if ( generation === this.connectGeneration && (peerConnection.connectionState === "failed" || peerConnection.connectionState === "disconnected") ) { + if (peerConnection.connectionState === "failed") { + void this.updateSelectedCandidatePair(peerConnection, target); + } this.handleConnectionError( target, generation, @@ -232,6 +242,10 @@ class WebRtcStreamClient implements StreamClientBackend { if (!localDescription) { throw new Error("WebRTC local offer was not created."); } + this.diagnostics.localCandidateSummary = summarizeSdpCandidates( + localDescription.sdp, + ); + this.postDiagnostics(target, "local-offer"); const response = await fetch( `/api/simulators/${encodeURIComponent(target.udid)}/webrtc/offer`, @@ -251,6 +265,10 @@ class WebRtcStreamClient implements StreamClientBackend { if (generation !== this.connectGeneration) { return; } + this.diagnostics.remoteCandidateSummary = summarizeSdpCandidates( + answer.sdp ?? "", + ); + this.postDiagnostics(target, "remote-answer"); await peerConnection.setRemoteDescription(answer); } catch (error) { this.handleConnectionError(target, generation, error); @@ -330,6 +348,139 @@ class WebRtcStreamClient implements StreamClientBackend { this.reconnectTimeout = 0; } + private attachDiagnostics( + peerConnection: RTCPeerConnection, + target: StreamConnectTarget, + generation: number, + ) { + peerConnection.onicecandidate = (event) => { + if (generation !== this.connectGeneration) { + return; + } + if (event.candidate) { + this.diagnostics.localCandidateSummary = summarizeCandidateLines([ + ...(this.diagnostics.localCandidateLines ?? []), + event.candidate.candidate, + ]); + this.diagnostics.localCandidateLines = [ + ...(this.diagnostics.localCandidateLines ?? []), + event.candidate.candidate, + ]; + } + this.postDiagnostics( + target, + event.candidate ? "local-candidate" : "local-candidates-complete", + ); + }; + peerConnection.oniceconnectionstatechange = () => { + if (generation !== this.connectGeneration) { + return; + } + this.diagnostics.iceConnectionState = peerConnection.iceConnectionState; + this.postDiagnostics(target, "iceconnectionstatechange"); + if ( + peerConnection.iceConnectionState === "connected" || + peerConnection.iceConnectionState === "completed" || + peerConnection.iceConnectionState === "failed" + ) { + void this.updateSelectedCandidatePair(peerConnection, target); + } + }; + peerConnection.onicegatheringstatechange = () => { + if (generation !== this.connectGeneration) { + return; + } + this.diagnostics.iceGatheringState = peerConnection.iceGatheringState; + this.postDiagnostics(target, "icegatheringstatechange"); + }; + peerConnection.onsignalingstatechange = () => { + if (generation !== this.connectGeneration) { + return; + } + this.diagnostics.signalingState = peerConnection.signalingState; + this.postDiagnostics(target, "signalingstatechange"); + }; + } + + private async updateSelectedCandidatePair( + peerConnection: RTCPeerConnection, + target: StreamConnectTarget, + ) { + try { + const stats = await peerConnection.getStats(); + let selectedPair: RTCStats | undefined; + stats.forEach((report) => { + const pair = report as RTCStats & { + nominated?: boolean; + selected?: boolean; + state?: string; + localCandidateId?: string; + remoteCandidateId?: string; + }; + if ( + report.type === "candidate-pair" && + (pair.selected || pair.nominated || pair.state === "succeeded") + ) { + selectedPair = report; + } + }); + if (!selectedPair) { + this.diagnostics.selectedCandidatePair = "none"; + this.postDiagnostics(target, "candidate-pair-none"); + return; + } + const pair = selectedPair as RTCStats & { + localCandidateId?: string; + remoteCandidateId?: string; + state?: string; + currentRoundTripTime?: number; + }; + const local = pair.localCandidateId + ? stats.get(pair.localCandidateId) + : undefined; + const remote = pair.remoteCandidateId + ? stats.get(pair.remoteCandidateId) + : undefined; + this.diagnostics.selectedCandidatePair = `state=${pair.state ?? "?"},rtt=${pair.currentRoundTripTime ?? "?"},local=${candidateStatsSummary(local)},remote=${candidateStatsSummary(remote)}`; + this.postDiagnostics(target, "candidate-pair-selected"); + } catch (error) { + this.diagnostics.selectedCandidatePair = `stats-error:${error instanceof Error ? error.message : String(error)}`; + this.postDiagnostics(target, "candidate-pair-error"); + } + } + + private postDiagnostics(target: StreamConnectTarget, detail: string) { + const payload = { + ...this.stats, + clientId: "webrtc-page", + connectionId: this.connectGeneration, + detail, + iceConnectionState: this.diagnostics.iceConnectionState, + iceGatheringState: this.diagnostics.iceGatheringState, + kind: "webrtc", + localCandidateSummary: this.diagnostics.localCandidateSummary, + peerConnectionState: this.diagnostics.peerConnectionState, + remoteCandidateSummary: this.diagnostics.remoteCandidateSummary, + selectedCandidatePair: this.diagnostics.selectedCandidatePair, + signalingState: this.diagnostics.signalingState, + status: + this.diagnostics.peerConnectionState || + this.diagnostics.iceConnectionState, + timestampMs: Date.now(), + udid: target.udid, + url: window.location.href, + userAgent: window.navigator.userAgent, + }; + void fetch(new URL("/api/client-stream-stats", window.location.href), { + body: JSON.stringify(payload), + cache: "no-store", + headers: apiHeaders(), + method: "POST", + }).catch(() => { + // Diagnostics only. + }); + } + private drawVideoFrame = () => { this.videoFrameCallback = 0; if (!this.canvas || !this.context || !this.video) { @@ -413,7 +564,7 @@ function configureLowLatencyReceiver(receiver: RTCRtpReceiver) { jitterBufferTarget?: number; }; if ("jitterBufferTarget" in lowLatencyReceiver) { - lowLatencyReceiver.jitterBufferTarget = 0.03; + lowLatencyReceiver.jitterBufferTarget = 0.001; } } @@ -427,6 +578,9 @@ function streamTransportMode(): string { function iceServers(): RTCIceServer[] { const params = new URLSearchParams(window.location.search); const raw = params.get("iceServers") ?? "stun:stun.l.google.com:19302"; + if (raw === "none") { + return []; + } return [ { urls: raw @@ -437,6 +591,86 @@ function iceServers(): RTCIceServer[] { ]; } +function iceTransportPolicy(): RTCIceTransportPolicy { + const value = new URLSearchParams(window.location.search).get( + "iceTransportPolicy", + ); + return value === "relay" || value === "all" ? value : "all"; +} + +interface WebRtcDiagnostics { + iceConnectionState: string; + iceGatheringState: string; + localCandidateLines?: string[]; + localCandidateSummary: string; + peerConnectionState: string; + remoteCandidateSummary: string; + selectedCandidatePair: string; + signalingState: string; +} + +function createWebRtcDiagnostics(): WebRtcDiagnostics { + return { + iceConnectionState: "", + iceGatheringState: "", + localCandidateSummary: "", + peerConnectionState: "", + remoteCandidateSummary: "", + selectedCandidatePair: "", + signalingState: "", + }; +} + +function summarizeSdpCandidates(sdp: string): string { + return summarizeCandidateLines( + sdp + .split(/\r?\n/) + .filter((line) => line.startsWith("a=candidate:")) + .map((line) => line.slice("a=".length)), + ); +} + +function summarizeCandidateLines(lines: string[]): string { + const counts: Record = { + host: 0, + prflx: 0, + relay: 0, + srflx: 0, + tcp: 0, + udp: 0, + other: 0, + }; + for (const line of lines) { + const parts = line.split(/\s+/); + const typIndex = parts.indexOf("typ"); + const typ = typIndex >= 0 ? parts[typIndex + 1] : ""; + if (typ && typ in counts) { + counts[typ] += 1; + } else { + counts.other += 1; + } + const protocol = parts[2]?.toLowerCase(); + if (protocol === "udp" || protocol === "tcp") { + counts[protocol] += 1; + } + } + return `host=${counts.host},srflx=${counts.srflx},prflx=${counts.prflx},relay=${counts.relay},udp=${counts.udp},tcp=${counts.tcp},other=${counts.other}`; +} + +function candidateStatsSummary(candidate: RTCStats | undefined): string { + if (!candidate) { + return "none"; + } + const stats = candidate as RTCStats & { + address?: string; + candidateType?: string; + ip?: string; + port?: number; + protocol?: string; + }; + return `${stats.candidateType ?? "?"}/${stats.protocol ?? "?"}/${stats.address || stats.ip ? "addr" : "noaddr"}/${stats.port ?? "?"}`; +} + function waitForIceGathering(peerConnection: RTCPeerConnection) { if (peerConnection.iceGatheringState === "complete") { return Promise.resolve(); diff --git a/server/src/metrics/counters.rs b/server/src/metrics/counters.rs index fccd405b..879ec86f 100644 --- a/server/src/metrics/counters.rs +++ b/server/src/metrics/counters.rs @@ -45,6 +45,15 @@ pub struct ClientStreamStats { pub udid: Option, pub connection_id: Option, pub status: Option, + pub detail: Option, + pub error: Option, + pub ice_connection_state: Option, + pub peer_connection_state: Option, + pub ice_gathering_state: Option, + pub signaling_state: Option, + pub local_candidate_summary: Option, + pub remote_candidate_summary: Option, + pub selected_candidate_pair: Option, pub url: Option, pub user_agent: Option, pub visibility_state: Option, diff --git a/server/src/native/bridge.rs b/server/src/native/bridge.rs index c453932e..0dbc9d60 100644 --- a/server/src/native/bridge.rs +++ b/server/src/native/bridge.rs @@ -624,6 +624,12 @@ impl NativeSession { } } + pub fn request_keyframe(&self) { + unsafe { + ffi::xcw_native_session_request_keyframe(self.handle); + } + } + pub unsafe fn set_frame_callback( &self, callback: Option, diff --git a/server/src/native/ffi.rs b/server/src/native/ffi.rs index 7312a7f3..9ecb2523 100644 --- a/server/src/native/ffi.rs +++ b/server/src/native/ffi.rs @@ -174,6 +174,7 @@ unsafe extern "C" { pub fn xcw_native_session_destroy(handle: *mut c_void); pub fn xcw_native_session_start(handle: *mut c_void, error_message: *mut *mut c_char) -> bool; pub fn xcw_native_session_request_refresh(handle: *mut c_void); + pub fn xcw_native_session_request_keyframe(handle: *mut c_void); pub fn xcw_native_session_set_frame_callback( handle: *mut c_void, callback: Option, diff --git a/server/src/simulators/session.rs b/server/src/simulators/session.rs index 74f17e6b..b712468f 100644 --- a/server/src/simulators/session.rs +++ b/server/src/simulators/session.rs @@ -3,7 +3,8 @@ use crate::metrics::counters::Metrics; use crate::native::bridge::{NativeBridge, NativeSession}; use crate::native::ffi; use crate::simulators::state::SessionState; -use crate::transport::packet::{ForeignBytes, FramePacket, SharedFrame}; +use crate::transport::packet::{FramePacket, SharedFrame}; +use bytes::Bytes; use std::ffi::c_void; use std::sync::atomic::{AtomicBool, AtomicU64, Ordering}; use std::sync::{Arc, Mutex, RwLock, Weak}; @@ -13,7 +14,9 @@ use tokio::task; use tokio::time::{timeout, Instant}; use tracing::debug; -const FRAME_BROADCAST_CAPACITY: usize = 240; +const FRAME_BROADCAST_CAPACITY: usize = 32; +const MIN_REFRESH_INTERVAL_MS: u64 = 16; +const MIN_KEYFRAME_INTERVAL_MS: u64 = 250; pub struct SimulatorSession { inner: Arc, @@ -32,6 +35,7 @@ struct SimulatorSessionInner { display_height: AtomicU64, frame_sequence: AtomicU64, last_refresh_ms: AtomicU64, + last_keyframe_ms: AtomicU64, } impl SimulatorSession { @@ -54,6 +58,7 @@ impl SimulatorSession { display_height: AtomicU64::new(0), frame_sequence: AtomicU64::new(0), last_refresh_ms: AtomicU64::new(0), + last_keyframe_ms: AtomicU64::new(0), }); let user_data = Weak::into_raw(Arc::downgrade(&inner)) as *mut c_void; @@ -136,7 +141,7 @@ impl SimulatorSession { pub fn request_refresh(&self) { let now = now_ms(); let previous = self.inner.last_refresh_ms.load(Ordering::Relaxed); - if now.saturating_sub(previous) < 200 { + if now.saturating_sub(previous) < MIN_REFRESH_INTERVAL_MS { return; } self.inner.last_refresh_ms.store(now, Ordering::Relaxed); @@ -147,6 +152,22 @@ impl SimulatorSession { self.inner.native.request_refresh(); } + pub fn request_keyframe(&self) { + let now = now_ms(); + let previous = self.inner.last_keyframe_ms.load(Ordering::Relaxed); + if now.saturating_sub(previous) < MIN_KEYFRAME_INTERVAL_MS { + self.request_refresh(); + return; + } + self.inner.last_keyframe_ms.store(now, Ordering::Relaxed); + self.inner.last_refresh_ms.store(now, Ordering::Relaxed); + self.inner + .metrics + .keyframe_requests + .fetch_add(1, Ordering::Relaxed); + self.inner.native.request_keyframe(); + } + pub fn send_touch(&self, x: f64, y: f64, phase: &str) -> Result<(), AppError> { self.inner.native.send_touch(x, y, phase) } @@ -206,8 +227,8 @@ unsafe extern "C" fn native_frame_callback( impl SimulatorSessionInner { fn handle_frame(&self, frame: &ffi::xcw_native_frame) { - let description = unsafe { ForeignBytes::from_ffi(frame.description) }; - let Some(data) = (unsafe { ForeignBytes::from_ffi(frame.data) }) else { + let description = unsafe { copy_ffi_bytes(frame.description) }; + let Some(data) = (unsafe { copy_ffi_bytes(frame.data) }) else { return; }; let packet = Arc::new(FramePacket { @@ -249,6 +270,24 @@ impl SimulatorSessionInner { } } +unsafe fn copy_ffi_bytes(bytes: ffi::xcw_native_shared_bytes) -> Option { + if bytes.data.is_null() || bytes.length == 0 { + if !bytes.owner.is_null() { + unsafe { + ffi::xcw_native_release_shared_bytes(bytes); + } + } + return None; + } + + let copied = + unsafe { Bytes::copy_from_slice(std::slice::from_raw_parts(bytes.data, bytes.length)) }; + unsafe { + ffi::xcw_native_release_shared_bytes(bytes); + } + Some(copied) +} + fn c_string(ptr: *const i8) -> Option { if ptr.is_null() { return None; diff --git a/server/src/transport/packet.rs b/server/src/transport/packet.rs index 5e3e40f3..40d61252 100644 --- a/server/src/transport/packet.rs +++ b/server/src/transport/packet.rs @@ -1,7 +1,5 @@ -use crate::native::ffi; +use bytes::Bytes; use serde::Serialize; -use std::ffi::c_void; -use std::fmt; use std::sync::Arc; pub const PACKET_VERSION: u8 = 1; @@ -10,69 +8,6 @@ pub const FLAG_CONFIG: u8 = 1 << 1; pub const FLAG_DISCONTINUITY: u8 = 1 << 2; pub const PACKET_HEADER_BYTES: usize = 36; -pub struct ForeignBytes { - data: *const u8, - length: usize, - owner: *const c_void, -} - -impl ForeignBytes { - pub unsafe fn from_ffi(bytes: ffi::xcw_native_shared_bytes) -> Option { - if bytes.data.is_null() || bytes.length == 0 { - if !bytes.owner.is_null() { - unsafe { - ffi::xcw_native_release_shared_bytes(bytes); - } - } - return None; - } - - Some(Self { - data: bytes.data, - length: bytes.length, - owner: bytes.owner, - }) - } - - pub fn as_slice(&self) -> &[u8] { - unsafe { std::slice::from_raw_parts(self.data, self.length) } - } - - pub fn len(&self) -> usize { - self.length - } -} - -impl AsRef<[u8]> for ForeignBytes { - fn as_ref(&self) -> &[u8] { - self.as_slice() - } -} - -impl Drop for ForeignBytes { - fn drop(&mut self) { - unsafe { - ffi::xcw_native_release_shared_bytes(ffi::xcw_native_shared_bytes { - data: self.data, - length: self.length, - owner: self.owner, - }); - } - } -} - -impl fmt::Debug for ForeignBytes { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - formatter - .debug_struct("ForeignBytes") - .field("length", &self.length) - .finish() - } -} - -unsafe impl Send for ForeignBytes {} -unsafe impl Sync for ForeignBytes {} - #[derive(Debug)] pub struct FramePacket { pub frame_sequence: u64, @@ -81,13 +16,13 @@ pub struct FramePacket { pub width: u32, pub height: u32, pub codec: Option, - pub description: Option, - pub data: ForeignBytes, + pub description: Option, + pub data: Bytes, } impl FramePacket { pub fn header_bytes(&self, discontinuity: bool) -> [u8; PACKET_HEADER_BYTES] { - let description_length = self.description.as_ref().map_or(0, ForeignBytes::len); + let description_length = self.description.as_ref().map_or(0, Bytes::len); let mut flags = 0u8; if self.is_keyframe { flags |= FLAG_KEYFRAME; diff --git a/server/src/transport/webrtc.rs b/server/src/transport/webrtc.rs index 513e7b63..4aa2170e 100644 --- a/server/src/transport/webrtc.rs +++ b/server/src/transport/webrtc.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use std::time::Duration; use tokio::sync::broadcast::{self, error::TryRecvError}; use tokio::time; -use tracing::warn; +use tracing::{info, warn}; use webrtc::api::interceptor_registry::register_default_interceptors; use webrtc::api::media_engine::{MediaEngine, MIME_TYPE_H264}; use webrtc::api::APIBuilder; @@ -26,7 +26,10 @@ const ANNEX_B_START_CODE: &[u8] = &[0, 0, 0, 1]; const DEFAULT_STUN_URL: &str = "stun:stun.l.google.com:19302"; const WEBRTC_CONTROL_CHANNEL_LABEL: &str = "simdeck-control"; const WEBRTC_BOOTSTRAP_KEYFRAME_INTERVAL: Duration = Duration::from_millis(250); -const WEBRTC_BOOTSTRAP_KEYFRAME_REPEATS: u8 = 12; +const WEBRTC_BOOTSTRAP_KEYFRAME_REPEATS: u8 = 4; +const WEBRTC_MIN_REFRESH_INTERVAL: Duration = Duration::from_millis(16); +const WEBRTC_MAX_REFRESH_INTERVAL: Duration = Duration::from_millis(100); +const WEBRTC_WRITE_TIMEOUT: Duration = Duration::from_millis(120); #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] @@ -60,7 +63,16 @@ pub async fn create_answer( state.registry.remove(&udid); return Err(error); } - session.request_refresh(); + session.request_keyframe(); + info!( + "WebRTC offer for {udid}: remote_candidates={} remote_candidate_types={} ice_servers={}", + count_sdp_candidates(&payload.sdp), + summarize_sdp_candidate_types(&payload.sdp), + std::env::var("SIMDECK_WEBRTC_ICE_SERVERS") + .ok() + .filter(|value| !value.trim().is_empty()) + .unwrap_or_else(|| DEFAULT_STUN_URL.to_owned()) + ); let first_frame = session .wait_for_keyframe(Duration::from_secs(3)) @@ -97,6 +109,7 @@ pub async fn create_answer( .await .map_err(|error| AppError::internal(format!("create WebRTC peer connection: {error}")))?, ); + register_diagnostics(&peer_connection, &udid); register_control_data_channel(&peer_connection, session.clone(), udid.clone()); let video_track = Arc::new(TrackLocalStaticSample::new( @@ -141,6 +154,11 @@ pub async fn create_answer( .local_description() .await .ok_or_else(|| AppError::internal("WebRTC local description was not set."))?; + info!( + "WebRTC answer for {udid}: local_candidates={} local_candidate_types={}", + count_sdp_candidates(&local_description.sdp), + summarize_sdp_candidate_types(&local_description.sdp) + ); tokio::spawn(stream_h264_frames( state, @@ -157,6 +175,103 @@ pub async fn create_answer( }) } +fn register_diagnostics( + peer_connection: &Arc, + udid: &str, +) { + let candidate_udid = udid.to_owned(); + peer_connection.on_ice_candidate(Box::new(move |candidate| { + let candidate_udid = candidate_udid.clone(); + Box::pin(async move { + match candidate { + Some(candidate) => { + info!( + "WebRTC local candidate for {candidate_udid}: type={} protocol={} address={} port={} related={}:{} tcp={}", + candidate.typ, + candidate.protocol, + redact_candidate_address(&candidate.address), + candidate.port, + redact_candidate_address(&candidate.related_address), + candidate.related_port, + candidate.tcp_type + ); + } + None => { + info!("WebRTC local candidate gathering complete for {candidate_udid}"); + } + } + }) + })); + + let gathering_udid = udid.to_owned(); + peer_connection.on_ice_gathering_state_change(Box::new(move |state| { + let gathering_udid = gathering_udid.clone(); + Box::pin(async move { + info!("WebRTC ICE gathering state for {gathering_udid}: {state}"); + }) + })); + + let ice_udid = udid.to_owned(); + peer_connection.on_ice_connection_state_change(Box::new(move |state| { + let ice_udid = ice_udid.clone(); + Box::pin(async move { + info!("WebRTC ICE connection state for {ice_udid}: {state}"); + }) + })); + + let peer_udid = udid.to_owned(); + peer_connection.on_peer_connection_state_change(Box::new(move |state| { + let peer_udid = peer_udid.clone(); + Box::pin(async move { + info!("WebRTC peer connection state for {peer_udid}: {state}"); + }) + })); +} + +fn count_sdp_candidates(sdp: &str) -> usize { + sdp.lines() + .filter(|line| line.starts_with("a=candidate:")) + .count() +} + +fn summarize_sdp_candidate_types(sdp: &str) -> String { + let mut host = 0usize; + let mut srflx = 0usize; + let mut prflx = 0usize; + let mut relay = 0usize; + let mut other = 0usize; + for line in sdp.lines().filter(|line| line.starts_with("a=candidate:")) { + match line + .split_whitespace() + .collect::>() + .windows(2) + .find_map(|pair| { + if pair[0] == "typ" { + Some(pair[1]) + } else { + None + } + }) { + Some("host") => host += 1, + Some("srflx") => srflx += 1, + Some("prflx") => prflx += 1, + Some("relay") => relay += 1, + Some(_) | None => other += 1, + } + } + format!("host={host},srflx={srflx},prflx={prflx},relay={relay},other={other}") +} + +fn redact_candidate_address(address: &str) -> String { + if address.is_empty() { + return String::new(); + } + if address.parse::().is_ok() { + return "".to_owned(); + } + "".to_owned() +} + fn register_control_data_channel( peer_connection: &Arc, session: crate::simulators::session::SimulatorSession, @@ -241,11 +356,20 @@ async fn stream_h264_frames( let mut last_sequence = 0u64; let mut send_timing = WebRtcSendTiming::new(); let mut bootstrap_interval = time::interval(WEBRTC_BOOTSTRAP_KEYFRAME_INTERVAL); + let mut refresh_sleep = Box::pin(time::sleep(WEBRTC_MIN_REFRESH_INTERVAL)); let mut bootstrap_frames_remaining = WEBRTC_BOOTSTRAP_KEYFRAME_REPEATS; + let mut adaptive_refresh_interval = WEBRTC_MIN_REFRESH_INTERVAL; + let mut waiting_for_keyframe = false; let _guard = WebRtcMetricsGuard::new(state.metrics.clone()); loop { tokio::select! { + _ = &mut refresh_sleep => { + session.request_refresh(); + refresh_sleep + .as_mut() + .reset(time::Instant::now() + adaptive_refresh_interval); + } _ = bootstrap_interval.tick(), if bootstrap_frames_remaining > 0 => { if let Err(error) = write_frame_sample( &video_track, @@ -266,7 +390,8 @@ async fn stream_h264_frames( .metrics .frames_dropped_server .fetch_add(skipped, Ordering::Relaxed); - session.request_refresh(); + waiting_for_keyframe = true; + session.request_keyframe(); continue; } Err(broadcast::error::RecvError::Closed) => break, @@ -278,7 +403,8 @@ async fn stream_h264_frames( .frames_dropped_server .fetch_add(skipped, Ordering::Relaxed); if !frame.is_keyframe { - session.request_refresh(); + waiting_for_keyframe = true; + session.request_keyframe(); continue; } } @@ -287,14 +413,32 @@ async fn stream_h264_frames( .metrics .frames_dropped_server .fetch_add(frame.frame_sequence - last_sequence - 1, Ordering::Relaxed); - session.request_refresh(); + waiting_for_keyframe = true; + session.request_keyframe(); + continue; + } + if waiting_for_keyframe && !frame.is_keyframe { + state.metrics.frames_dropped_server.fetch_add(1, Ordering::Relaxed); continue; } if frame.is_keyframe { latest_keyframe = frame.clone(); + waiting_for_keyframe = false; } let duration = send_timing.duration_for(&frame); - if let Err(error) = write_frame_sample(&video_track, &frame, duration).await { + let started_at = time::Instant::now(); + let write_result = time::timeout( + WEBRTC_WRITE_TIMEOUT, + write_frame_sample(&video_track, &frame, duration), + ).await; + adaptive_refresh_interval = adaptive_interval_for_write(started_at.elapsed()); + if let Err(error) = write_result + .map_err(|_| anyhow::anyhow!( + "timed out writing WebRTC frame after {}ms", + WEBRTC_WRITE_TIMEOUT.as_millis() + )) + .and_then(|result| result) + { warn!("WebRTC frame write failed for {udid}: {error}"); break; } @@ -307,6 +451,14 @@ async fn stream_h264_frames( let _ = peer_connection.close().await; } +fn adaptive_interval_for_write(write_elapsed: Duration) -> Duration { + let target_ms = (write_elapsed.as_millis() as u64).saturating_mul(2).clamp( + WEBRTC_MIN_REFRESH_INTERVAL.as_millis() as u64, + WEBRTC_MAX_REFRESH_INTERVAL.as_millis() as u64, + ); + Duration::from_millis(target_ms) +} + async fn write_frame_sample( video_track: &TrackLocalStaticSample, frame: &crate::transport::packet::SharedFrame, @@ -324,8 +476,8 @@ async fn write_frame_sample( } fn h264_annex_b_sample(frame: &crate::transport::packet::FramePacket) -> anyhow::Result> { - let data = frame.data.as_slice(); - let description = frame.description.as_ref().map(|bytes| bytes.as_slice()); + let data = frame.data.as_ref(); + let description = frame.description.as_ref().map(bytes::Bytes::as_ref); let mut sample = Vec::with_capacity(data.len() + description.map_or(0, |bytes| bytes.len())); if frame.is_keyframe { diff --git a/server/src/transport/webtransport.rs b/server/src/transport/webtransport.rs index 3d70f317..3abbdfba 100644 --- a/server/src/transport/webtransport.rs +++ b/server/src/transport/webtransport.rs @@ -2,7 +2,7 @@ use crate::api::routes::AppState; use crate::auth; use crate::metrics::counters::Metrics; use crate::simulators::session::SimulatorSession; -use crate::transport::packet::{ControlHello, ForeignBytes, SharedFrame, PACKET_VERSION}; +use crate::transport::packet::{ControlHello, SharedFrame, PACKET_VERSION}; use anyhow::Context; use std::sync::atomic::Ordering; use std::sync::Arc; @@ -214,13 +214,13 @@ async fn send_frame( let description = frame .description .as_ref() - .map(ForeignBytes::as_slice) + .map(bytes::Bytes::as_ref) .unwrap_or(&[]); stream.write_all(&header).await?; if !description.is_empty() { stream.write_all(description).await?; } - let data = frame.data.as_slice(); + let data = frame.data.as_ref(); if !data.is_empty() { stream.write_all(data).await?; }