From 14ec06136c54359fffe16bab046e8cae280fcd8e Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:25:40 +0200 Subject: [PATCH 1/6] feat(quic): add Quiche-backed MQTT-over-QUIC engine and improve feature flags for FFI bindings --- Cargo.toml | 57 ++++-- flowsdk_ffi/Cargo.toml | 23 ++- flowsdk_ffi/src/engine.rs | 267 +++++++++++++------------ src/mqtt_client/mod.rs | 4 + src/mqtt_client/quic_engine_quiche.rs | 270 ++++++++++++++++++++++++++ 5 files changed, 479 insertions(+), 142 deletions(-) create mode 100644 src/mqtt_client/quic_engine_quiche.rs diff --git a/Cargo.toml b/Cargo.toml index 3d9ae2df..565bc956 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -14,16 +14,16 @@ members = [".", "mqtt_grpc_duality", "flowsdk_ffi"] exclude = ["fuzz"] [dependencies] -tonic = "0.14.1" -prost = "0.14" -tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "net", "io-util", "sync", "time"] } -tonic-prost = "0.14.1" +tonic = { version = "0.14.1", optional = true } +prost = { version = "0.14", optional = true } +tokio = { version = "1.0", features = ["sync", "time", "io-util", "net"] } +tonic-prost = { version = "0.14.1", optional = true } serde = { version = "1.0.218", features = ["derive"] } hex = "0.4" bytes = { version = "1", features = ["serde"] } -tokio-stream = "0.1" +tokio-stream = { version = "0.1", optional = true } arbitrary = { version = "1", optional = true, features = ["derive"] } -serde_json = "1.0" +serde_json = { version = "1.0" } slab = "0.4.11" async-trait = "0.1" thiserror = "1.0" @@ -40,24 +40,39 @@ rustls = { version = "0.23", optional = true, default-features = false, features rustls-native-certs = { version = "0.7", optional = true } rustls-pki-types = { version = "1", optional = true } tokio-rustls = { version = "0.26", optional = true } +# QUIC via quiche (uses system OpenSSL dynamically — no vendored crypto) +quiche = { version = "0.28.0", default-features = false, features = ["openssl"], optional = true } [features] -default = ["strict-protocol-compliance", "tls"] -# TLS/SSL transport support -tls = ["dep:tokio-native-tls", "dep:native-tls"] -# QUIC transport support -quic = ["dep:quinn", "dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] -# Rustls-based TLS over TCP (mqtts://) transport support +default = ["strict-protocol-compliance", "tls", "async-client"] +# TLS/SSL transport support (native-tls based, used by the async TCP client) +tls = ["dep:tokio-native-tls", "dep:native-tls", "async-client"] +# Sans-I/O QUIC protocol engine (no tokio runtime required) +quic-proto = ["dep:quinn-proto", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types"] +# Full async QUIC support (adds tokio transport on top of quic-proto) +quic = ["quic-proto", "dep:quinn", "async-client"] +# Rustls-based TLS over TCP — tls_engine is Sans-I/O, tokio-rustls needed only for transport rustls-tls = [ - "dep:tokio-rustls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pki-types", + "dep:tokio-rustls", +] +# Tokio-based async MQTT client (adds the high-throughput multi-threaded runtime) +async-client = [ + "tokio/macros", + "tokio/rt-multi-thread", + "tokio/net", + "dep:tokio-stream", ] +# gRPC support (only needed for mqtt_grpc_duality proxy) +grpc = ["dep:tonic", "dep:prost", "dep:tonic-prost"] strict-protocol-compliance = [] # ⚠️ DANGEROUS: Enable raw packet API for protocol compliance testing # DO NOT enable in production builds protocol-testing = [] +# Quiche-backed QUIC (uses system OpenSSL via pkg-config, no vendored crypto) +quic-quiche = ["dep:quiche"] # Force static linking of the C runtime # usage: cargo build --target aarch64-unknown-linux-musl --examples --features quic @@ -82,3 +97,19 @@ required-features = ["quic"] [[example]] name = "tokio_async_mqtt_quic_example" required-features = ["quic"] + +# Release profile optimizations +[profile.release] +strip = true # Strip debug symbols automatically +lto = true # Enable link-time optimization +codegen-units = 1 # Better optimization (slower compile) +opt-level = 3 # Maximum performance + +# Size-optimized profile for FFI and embedded use cases +[profile.release-size] +inherits = "release" +opt-level = "z" # Optimize aggressively for size +lto = true +strip = true +codegen-units = 1 +panic = 'abort' # Remove panic unwinding code diff --git a/flowsdk_ffi/Cargo.toml b/flowsdk_ffi/Cargo.toml index fa6c51ec..a46f6e2a 100644 --- a/flowsdk_ffi/Cargo.toml +++ b/flowsdk_ffi/Cargo.toml @@ -12,13 +12,16 @@ categories = ["api-bindings"] [lib] crate-type = ["cdylib", "staticlib", "rlib"] +[[bin]] +name = "uniffi-bindgen" +path = "src/bin/uniffi-bindgen.rs" +required-features = ["uniffi-bindings"] [dependencies] -flowsdk = { path = "..", version = "0.4.1" } +flowsdk = { path = "..", version = "0.4.1", default-features = false } libc = "0.2" -serde = { version = "1.0", features = ["derive"] } -uniffi = { version = "0.28", features = ["cli"] } -uniffi_macros = "0.28" -serde_json = "1.0" +serde = { version = "1.0", features = ["derive"], optional = true } +uniffi = { version = "0.28", features = ["cli"], optional = true } +uniffi_macros = { version = "0.28", optional = true } rustls = { version = "0.23", optional = true } rustls-pemfile = { version = "2.1", optional = true } rustls-native-certs = { version = "0.7", optional = true } @@ -26,6 +29,10 @@ quinn-proto = { version = "0.11", optional = true } rustls-pki-types = { version = "1", optional = true } [features] -default = ["tls", "quic"] -quic = ["tls", "flowsdk/quic", "dep:quinn-proto"] -tls = ["flowsdk/rustls-tls", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +default = ["tls", "quic", "uniffi-bindings"] +quic = ["tls", "flowsdk/quic-proto", "flowsdk/strict-protocol-compliance", "dep:quinn-proto"] +# Quiche-backed QUIC: system OpenSSL (dynamic), no vendored crypto code +quic-quiche = ["flowsdk/quic-quiche", "flowsdk/strict-protocol-compliance"] +tls = ["flowsdk/rustls-tls", "flowsdk/strict-protocol-compliance", "dep:rustls", "dep:rustls-native-certs", "dep:rustls-pemfile", "dep:rustls-pki-types"] +# UniFFI bindings for Kotlin/Python/Swift language bindings +uniffi-bindings = ["dep:uniffi", "dep:uniffi_macros"] diff --git a/flowsdk_ffi/src/engine.rs b/flowsdk_ffi/src/engine.rs index b8142fa1..b79475cf 100644 --- a/flowsdk_ffi/src/engine.rs +++ b/flowsdk_ffi/src/engine.rs @@ -11,23 +11,28 @@ use ffi_types::*; use std::sync::Mutex; -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEngineFFI { engine: Mutex, start_time: Instant, events: Mutex>, } +// QUIC backend selection: `quic` (quinn-proto) takes priority over `quic-quiche`. #[cfg(feature = "quic")] -use flowsdk::mqtt_client::engine::QuicMqttEngine; +use flowsdk::mqtt_client::engine::QuicMqttEngine as ActiveQuicEngine; +#[cfg(all(feature = "quic-quiche", not(feature = "quic")))] +use flowsdk::mqtt_client::quic_engine_quiche::QuicMqttEngineQuiche as ActiveQuicEngine; + #[cfg(feature = "tls")] use flowsdk::mqtt_client::tls_engine::TlsMqttEngine; use std::net::SocketAddr; +#[cfg(feature = "quic")] use std::sync::Arc; -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(client_id: Option, mqtt_version: u8) -> Self { let client_id = client_id.unwrap_or_else(|| "mqtt_client".to_string()); let options = MqttClientOptions::builder() @@ -43,7 +48,7 @@ impl MqttEngineFFI { } } - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new_with_opts(opts: MqttOptionsFFI) -> Self { let mut builder = MqttClientOptions::builder() .client_id(opts.client_id) @@ -223,16 +228,18 @@ fn map_event(event: MqttEvent) -> MqttEventFFI { } } -#[derive(uniffi::Object)] +#[cfg(feature = "tls")] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct TlsMqttEngineFFI { engine: Mutex, start_time: Instant, events: Mutex>, } -#[uniffi::export] +#[cfg(feature = "tls")] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl TlsMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI, tls_opts: MqttTlsOptionsFFI, server_name: String) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -389,9 +396,11 @@ impl TlsMqttEngineFFI { } } +#[cfg(feature = "tls")] #[derive(Debug)] struct InsecureServerCertVerifier; +#[cfg(feature = "tls")] impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier { fn verify_server_cert( &self, @@ -431,16 +440,18 @@ impl rustls::client::danger::ServerCertVerifier for InsecureServerCertVerifier { } } -#[derive(uniffi::Object)] +#[cfg(any(feature = "quic", feature = "quic-quiche"))] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct QuicMqttEngineFFI { - engine: Mutex, + engine: Mutex, start_time: Instant, events: Mutex>, } -#[uniffi::export] +#[cfg(any(feature = "quic", feature = "quic-quiche"))] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl QuicMqttEngineFFI { - #[uniffi::constructor] + #[cfg_attr(feature = "uniffi-bindings", uniffi::constructor)] pub fn new(opts: MqttOptionsFFI) -> Self { let options = MqttClientOptions::builder() .client_id(opts.client_id) @@ -452,7 +463,7 @@ impl QuicMqttEngineFFI { .max_reconnect_attempts(opts.max_reconnect_attempts) .build(); - let engine = QuicMqttEngine::new(options).unwrap(); + let engine = ActiveQuicEngine::new(options).unwrap(); QuicMqttEngineFFI { engine: Mutex::new(engine), start_time: Instant::now(), @@ -460,67 +471,6 @@ impl QuicMqttEngineFFI { } } - pub fn connect( - &self, - server_addr: String, - server_name: String, - tls_opts: MqttTlsOptionsFFI, - now_ms: u64, - ) { - let addr: SocketAddr = server_addr.parse().unwrap(); - let now = self.start_time + Duration::from_millis(now_ms); - - let _ = rustls::crypto::ring::default_provider().install_default(); - let crypto_builder = rustls::ClientConfig::builder(); - - let mut config = if tls_opts.insecure_skip_verify { - crypto_builder - .dangerous() - .with_custom_certificate_verifier(Arc::new(InsecureServerCertVerifier)) - .with_no_client_auth() - } else { - let mut root_store = rustls::RootCertStore::empty(); - if let Some(ca_path) = tls_opts.ca_cert_file { - if let Ok(file) = std::fs::File::open(ca_path) { - let mut reader = std::io::BufReader::new(file); - let certs = rustls_pemfile::certs(&mut reader) - .filter_map(|r| r.ok()) - .collect::>(); - for cert in certs { - root_store.add(cert).ok(); - } - } - } else { - for cert in rustls_native_certs::load_native_certs().unwrap_or_default() { - root_store.add(cert).ok(); - } - } - crypto_builder - .with_root_certificates(root_store) - .with_no_client_auth() - }; - - if !tls_opts.alpn_protocols.is_empty() { - config.alpn_protocols = tls_opts - .alpn_protocols - .into_iter() - .map(|s| s.into_bytes()) - .collect(); - } else { - config.alpn_protocols = vec![b"mqtt".to_vec()]; - } - - if tls_opts.enable_key_log { - config.key_log = Arc::new(rustls::KeyLogFile::new()); - } - - self.engine - .lock() - .unwrap() - .connect(addr, &server_name, config, now) - .ok(); - } - pub fn handle_datagram(&self, data: Vec, remote_addr: String, now_ms: u64) { let addr: SocketAddr = remote_addr.parse().unwrap(); let now = self.start_time + Duration::from_millis(now_ms); @@ -594,6 +544,101 @@ impl QuicMqttEngineFFI { } } +/// Quinn-proto `connect()` — builds a `rustls::ClientConfig` and calls the QUIC engine. +#[cfg(feature = "quic")] +impl QuicMqttEngineFFI { + pub fn connect( + &self, + server_addr: String, + server_name: String, + tls_opts: MqttTlsOptionsFFI, + now_ms: u64, + ) { + let addr: SocketAddr = server_addr.parse().unwrap(); + let now = self.start_time + Duration::from_millis(now_ms); + + let _ = rustls::crypto::ring::default_provider().install_default(); + let crypto_builder = rustls::ClientConfig::builder(); + + let mut config = if tls_opts.insecure_skip_verify { + crypto_builder + .dangerous() + .with_custom_certificate_verifier(Arc::new(InsecureServerCertVerifier)) + .with_no_client_auth() + } else { + let mut root_store = rustls::RootCertStore::empty(); + if let Some(ca_path) = tls_opts.ca_cert_file { + if let Ok(file) = std::fs::File::open(ca_path) { + let mut reader = std::io::BufReader::new(file); + let certs = rustls_pemfile::certs(&mut reader) + .filter_map(|r| r.ok()) + .collect::>(); + for cert in certs { + root_store.add(cert).ok(); + } + } + } else { + for cert in rustls_native_certs::load_native_certs().unwrap_or_default() { + root_store.add(cert).ok(); + } + } + crypto_builder + .with_root_certificates(root_store) + .with_no_client_auth() + }; + + if !tls_opts.alpn_protocols.is_empty() { + config.alpn_protocols = tls_opts + .alpn_protocols + .into_iter() + .map(|s| s.into_bytes()) + .collect(); + } else { + config.alpn_protocols = vec![b"mqtt".to_vec()]; + } + + if tls_opts.enable_key_log { + config.key_log = Arc::new(rustls::KeyLogFile::new()); + } + + self.engine + .lock() + .unwrap() + .connect(addr, &server_name, config, now) + .ok(); + } +} + +/// Quiche `connect()` — builds quiche's TLS config directly (system OpenSSL). +#[cfg(all(feature = "quic-quiche", not(feature = "quic")))] +impl QuicMqttEngineFFI { + pub fn connect( + &self, + server_addr: String, + server_name: String, + tls_opts: MqttTlsOptionsFFI, + _now_ms: u64, + ) { + let addr: SocketAddr = server_addr.parse().unwrap(); + let alpn: Vec> = tls_opts + .alpn_protocols + .into_iter() + .map(|s| s.into_bytes()) + .collect(); + self.engine + .lock() + .unwrap() + .connect( + addr, + &server_name, + tls_opts.ca_cert_file.as_deref(), + tls_opts.insecure_skip_verify, + alpn, + ) + .ok(); + } +} + // --- C-Compatible FFI Layer --- // This layer provides a stable ABI for the C examples, mapping to the UniFFI objects. @@ -878,6 +923,7 @@ pub unsafe extern "C" fn mqtt_engine_free_string(ptr: *mut c_char) { /// /// This function is unsafe because it dereferences raw pointers for `client_id`, /// `server_name`, and `tls_opts`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_new( client_id: *const c_char, @@ -971,6 +1017,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_new( /// # Safety /// /// This function is unsafe because it performs manual memory deallocation of a `TlsMqttEngineFFI`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_free(ptr: *mut TlsMqttEngineFFI) { if !ptr.is_null() { @@ -981,6 +1028,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_free(ptr: *mut TlsMqttEngineFFI) { /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_connect(ptr: *mut TlsMqttEngineFFI) { if let Some(engine) = ptr.as_ref() { @@ -991,6 +1039,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_connect(ptr: *mut TlsMqttEngineFFI) { /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `data`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_handle_socket_data( ptr: *mut TlsMqttEngineFFI, @@ -1006,6 +1055,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_handle_socket_data( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `out_len`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_take_socket_data( ptr: *mut TlsMqttEngineFFI, @@ -1034,6 +1084,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_take_socket_data( /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_handle_tick(ptr: *mut TlsMqttEngineFFI, now_ms: u64) { if let Some(engine) = ptr.as_ref() { @@ -1044,6 +1095,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_handle_tick(ptr: *mut TlsMqttEngineFFI, /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr`, `topic`, and `payload`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_publish( ptr: *mut TlsMqttEngineFFI, @@ -1064,6 +1116,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_publish( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `topic_filter`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_subscribe( ptr: *mut TlsMqttEngineFFI, @@ -1081,6 +1134,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_subscribe( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `topic_filter`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_unsubscribe( ptr: *mut TlsMqttEngineFFI, @@ -1097,6 +1151,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_unsubscribe( /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_disconnect(ptr: *mut TlsMqttEngineFFI) { if let Some(engine) = ptr.as_ref() { @@ -1107,6 +1162,7 @@ pub unsafe extern "C" fn mqtt_tls_engine_disconnect(ptr: *mut TlsMqttEngineFFI) /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_is_connected(ptr: *mut TlsMqttEngineFFI) -> i32 { if let Some(engine) = ptr.as_ref() { @@ -1120,41 +1176,12 @@ pub unsafe extern "C" fn mqtt_tls_engine_is_connected(ptr: *mut TlsMqttEngineFFI } } -/// # Safety -/// -/// This function is unsafe because it dereferences a raw pointer to `MqttEngineFFI` -/// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. -#[no_mangle] -pub unsafe extern "C" fn mqtt_engine_take_events(ptr: *mut MqttEngineFFI) -> *mut c_char { - if let Some(engine) = ptr.as_ref() { - let events = engine.take_events(); - let json = serde_json::to_string(&events).unwrap_or_else(|_| "[]".to_string()); - CString::new(json).unwrap().into_raw() - } else { - std::ptr::null_mut() - } -} - -/// # Safety -/// -/// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI` -/// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. -#[no_mangle] -pub unsafe extern "C" fn mqtt_tls_engine_take_events(ptr: *mut TlsMqttEngineFFI) -> *mut c_char { - if let Some(engine) = ptr.as_ref() { - let events = engine.take_events(); - let json = serde_json::to_string(&events).unwrap_or_else(|_| "[]".to_string()); - CString::new(json).unwrap().into_raw() - } else { - std::ptr::null_mut() - } -} - // QUIC Engine C wrappers /// # Safety /// /// This function is unsafe because it dereferences a raw pointer for `client_id` /// and returns a raw pointer to a new `QuicMqttEngineFFI`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_new( client_id: *const c_char, @@ -1182,6 +1209,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_new( /// # Safety /// /// This function is unsafe because it performs manual memory deallocation of a `QuicMqttEngineFFI`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_free(ptr: *mut QuicMqttEngineFFI) { if !ptr.is_null() { @@ -1193,6 +1221,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_free(ptr: *mut QuicMqttEngineFFI) { /// /// This function is unsafe because it dereferences raw pointers for `ptr`, `server_addr`, /// `server_name`, and `tls_opts`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_connect( ptr: *mut QuicMqttEngineFFI, @@ -1264,6 +1293,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_connect( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr`, `data`, and `remote_addr`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_handle_datagram( ptr: *mut QuicMqttEngineFFI, @@ -1281,6 +1311,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_datagram( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `out_count`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_take_outgoing_datagrams( ptr: *mut QuicMqttEngineFFI, @@ -1324,6 +1355,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_take_outgoing_datagrams( /// # Safety /// /// This function is unsafe because it performs manual memory deallocation of a datagram slice. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_free_datagrams(ptr: *mut MqttDatagramC, count: usize) { if !ptr.is_null() { @@ -1346,6 +1378,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_free_datagrams(ptr: *mut MqttDatagramC /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_handle_tick(ptr: *mut QuicMqttEngineFFI, now_ms: u64) { if let Some(engine) = ptr.as_ref() { @@ -1353,24 +1386,10 @@ pub unsafe extern "C" fn mqtt_quic_engine_handle_tick(ptr: *mut QuicMqttEngineFF } } -/// # Safety -/// -/// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI` -/// and returns an allocated `c_char` pointer that must be freed using `mqtt_engine_free_string`. -#[no_mangle] -pub unsafe extern "C" fn mqtt_quic_engine_take_events(ptr: *mut QuicMqttEngineFFI) -> *mut c_char { - if let Some(engine) = ptr.as_ref() { - let events = engine.take_events(); - let json = serde_json::to_string(&events).unwrap_or_else(|_| "[]".to_string()); - CString::new(json).unwrap().into_raw() - } else { - std::ptr::null_mut() - } -} - /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr`, `topic`, and `payload`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_publish( ptr: *mut QuicMqttEngineFFI, @@ -1391,6 +1410,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_publish( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `topic_filter`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_subscribe( ptr: *mut QuicMqttEngineFFI, @@ -1408,6 +1428,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_subscribe( /// # Safety /// /// This function is unsafe because it dereferences raw pointers for `ptr` and `topic_filter`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_unsubscribe( ptr: *mut QuicMqttEngineFFI, @@ -1424,6 +1445,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_unsubscribe( /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_disconnect(ptr: *mut QuicMqttEngineFFI) { if let Some(engine) = ptr.as_ref() { @@ -1434,6 +1456,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_disconnect(ptr: *mut QuicMqttEngineFFI /// # Safety /// /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_is_connected(ptr: *mut QuicMqttEngineFFI) -> i32 { if let Some(engine) = ptr.as_ref() { @@ -1480,12 +1503,12 @@ pub struct MqttDatagramC { // Event Inspection API for C (Native Structs) // Actually, let's just use a dedicated "C Event List" object to manage the lifetime. -#[derive(uniffi::Object)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Object))] pub struct MqttEventListFFI { events: Vec, } -#[uniffi::export] +#[cfg_attr(feature = "uniffi-bindings", uniffi::export)] impl MqttEventListFFI { pub fn len(&self) -> u32 { self.events.len() as u32 @@ -1677,6 +1700,7 @@ pub unsafe extern "C" fn mqtt_event_list_get_error_message( /// /// This function is unsafe because it dereferences a raw pointer to `QuicMqttEngineFFI` /// and returns an allocated `MqttEventListFFI` pointer that must be freed with `mqtt_event_list_free`. +#[cfg(any(feature = "quic", feature = "quic-quiche"))] #[no_mangle] pub unsafe extern "C" fn mqtt_quic_engine_take_events_list( ptr: *mut QuicMqttEngineFFI, @@ -1693,6 +1717,7 @@ pub unsafe extern "C" fn mqtt_quic_engine_take_events_list( /// /// This function is unsafe because it dereferences a raw pointer to `TlsMqttEngineFFI` /// and returns an allocated `MqttEventListFFI` pointer that must be freed with `mqtt_event_list_free`. +#[cfg(feature = "tls")] #[no_mangle] pub unsafe extern "C" fn mqtt_tls_engine_take_events_list( ptr: *mut TlsMqttEngineFFI, diff --git a/src/mqtt_client/mod.rs b/src/mqtt_client/mod.rs index 418c2d2d..14981bbf 100644 --- a/src/mqtt_client/mod.rs +++ b/src/mqtt_client/mod.rs @@ -12,10 +12,13 @@ pub mod opts; pub mod raw_packet; #[cfg(feature = "rustls-tls")] pub mod tls_engine; +#[cfg(feature = "async-client")] pub mod tokio_async_client; #[cfg(feature = "quic")] pub mod tokio_quic_client; pub mod transport; +#[cfg(feature = "quic-quiche")] +pub mod quic_engine_quiche; // Re-exports pub use async_client::{AsyncClientConfig, AsyncMqttClient, MqttEventHandler}; @@ -33,6 +36,7 @@ pub use no_io_client::NoIoMqttClient; pub use opts::{MqttClientOptions, MqttClientOptionsBuilder}; #[cfg(feature = "rustls-tls")] pub use tls_engine::TlsMqttEngine; +#[cfg(feature = "async-client")] pub use tokio_async_client::{ TokioAsyncClientConfig, TokioAsyncMqttClient, TokioMqttEvent, TokioMqttEventHandler, }; diff --git a/src/mqtt_client/quic_engine_quiche.rs b/src/mqtt_client/quic_engine_quiche.rs new file mode 100644 index 00000000..cc08776e --- /dev/null +++ b/src/mqtt_client/quic_engine_quiche.rs @@ -0,0 +1,270 @@ +// SPDX-License-Identifier: MPL-2.0 +//! Quiche-backed sans-I/O MQTT-over-QUIC engine. +//! +//! Uses Cloudflare's `quiche` library which can be linked against the system OpenSSL +//! (via `features = ["openssl"]`) rather than vendoring BoringSSL, keeping the +//! compiled library footprint small. +//! +//! The public API mirrors [`super::engine::QuicMqttEngine`] so FFI and higher-level +//! code can switch backends via a feature flag. + +use std::collections::VecDeque; +use std::net::SocketAddr; +use std::time::Instant; + +use super::commands::{PublishCommand, SubscribeCommand, UnsubscribeCommand}; +use super::engine::{MqttEngine, MqttEvent}; +use super::error::MqttClientError; +use super::opts::MqttClientOptions; + +/// Maximum UDP datagram size we read/write (standard Ethernet MTU minus overhead). +const MAX_DATAGRAM_SIZE: usize = 1350; + +/// The QUIC stream ID used for the MQTT control channel. +/// Client-initiated bidirectional streams start at 0 and increment by 4. +const MQTT_STREAM_ID: u64 = 0; + +/// A "Sans-I/O" MQTT over QUIC engine backed by Cloudflare's `quiche` library. +/// +/// Using quiche with the `openssl` feature flag allows dynamic linking against +/// the system's OpenSSL/libcrypto, avoiding the ~400 KB of vendored crypto code +/// that the default BoringSSL backend adds to the shared library. +/// +/// # Usage +/// +/// Identical to `QuicMqttEngine` — feed datagrams in, pull datagrams out. +pub struct QuicMqttEngineQuiche { + mqtt_engine: MqttEngine, + connection: Option>, + local_addr: Option, + peer_addr: Option, + outgoing_datagrams: VecDeque<(SocketAddr, Vec)>, + mqtt_stream_open: bool, +} + +impl QuicMqttEngineQuiche { + pub fn new(options: MqttClientOptions) -> Result { + Ok(Self { + mqtt_engine: MqttEngine::new(options), + connection: None, + local_addr: None, + peer_addr: None, + outgoing_datagrams: VecDeque::new(), + mqtt_stream_open: false, + }) + } + + /// Initiate a QUIC connection to `server_addr`. + /// + /// # Parameters + /// - `ca_cert_path`: path to a PEM CA certificate file; if `None` and + /// `insecure_skip_verify` is false, the system/default roots are used via + /// quiche's default verification. + /// - `insecure_skip_verify`: disable peer certificate verification (for + /// testing only). + /// - `alpn`: ALPN protocol IDs; defaults to `["mqtt"]`. + pub fn connect( + &mut self, + server_addr: SocketAddr, + server_name: &str, + ca_cert_path: Option<&str>, + insecure_skip_verify: bool, + alpn: Vec>, + ) -> Result<(), MqttClientError> { + let mut config = + quiche::Config::new(quiche::PROTOCOL_VERSION).map_err(|e| { + MqttClientError::InternalError { + message: format!("quiche Config::new failed: {e}"), + } + })?; + + // ALPN + let alpn_refs: Vec<&[u8]> = if alpn.is_empty() { + vec![b"mqtt"] + } else { + alpn.iter().map(|v| v.as_slice()).collect() + }; + config + .set_application_protos(&alpn_refs) + .map_err(|e| MqttClientError::InternalError { + message: format!("quiche set_application_protos failed: {e}"), + })?; + + // TLS verification + config.verify_peer(!insecure_skip_verify); + if let Some(path) = ca_cert_path { + config + .load_verify_locations_from_file(path) + .map_err(|e| MqttClientError::InternalError { + message: format!("quiche load_verify_locations_from_file failed: {e}"), + })?; + } + + // Transport parameters — generous limits for MQTT workloads + config.set_max_idle_timeout(120_000); + config.set_max_recv_udp_payload_size(MAX_DATAGRAM_SIZE); + config.set_max_send_udp_payload_size(MAX_DATAGRAM_SIZE); + config.set_initial_max_data(10_000_000); + config.set_initial_max_stream_data_bidi_local(1_024_000); + config.set_initial_max_stream_data_bidi_remote(1_024_000); + config.set_initial_max_streams_bidi(100); + config.set_disable_active_migration(true); + + // Pick a random source connection ID + let mut scid_bytes = [0u8; quiche::MAX_CONN_ID_LEN]; + for b in &mut scid_bytes { + *b = rand_byte(); + } + let scid = quiche::ConnectionId::from_ref(&scid_bytes); + + // Use an unspecified local address — the actual UDP socket is managed externally. + let local_addr: SocketAddr = if server_addr.is_ipv6() { + "[::]:0".parse().unwrap() + } else { + "0.0.0.0:0".parse().unwrap() + }; + + let conn = quiche::connect(Some(server_name), &scid, local_addr, server_addr, &mut config) + .map_err(|e| MqttClientError::InternalError { + message: format!("quiche::connect failed: {e}"), + })?; + + self.connection = Some(Box::new(conn)); + self.local_addr = Some(local_addr); + self.peer_addr = Some(server_addr); + + // Drain the initial handshake packets into outgoing_datagrams so the + // caller can send them on the first poll. + self.drain_outgoing(); + + Ok(()) + } + + /// Feed an incoming UDP datagram. + pub fn handle_datagram(&mut self, mut data: Vec, remote_addr: SocketAddr, _now: Instant) { + let local = self.local_addr.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); + if let Some(conn) = &mut self.connection { + let recv_info = quiche::RecvInfo { + from: remote_addr, + to: local, + }; + // Ignore receive errors (e.g. Unknown Connection ID during handshake) + let _ = conn.recv(&mut data, recv_info); + } + } + + /// Drive the state machine, produce MQTT events, and queue outgoing datagrams. + pub fn handle_tick(&mut self, now: Instant) -> Vec { + let mut mqtt_events = Vec::new(); + + if let Some(conn) = &mut self.connection { + // Drive QUIC timeouts + conn.on_timeout(); + + // Check handshake completion + if conn.is_established() && !self.mqtt_stream_open { + self.mqtt_stream_open = true; + // Signal MQTT engine to send CONNECT + self.mqtt_engine.connect(); + } + + // Closed connection + if conn.is_closed() { + self.mqtt_engine.handle_connection_lost(); + mqtt_events.push(MqttEvent::Disconnected(None)); + } + + // Transfer QUIC stream data → MQTT engine + // We only run this after the stream has been opened + if self.mqtt_stream_open { + let mut buf = vec![0u8; 65536]; + loop { + match conn.stream_recv(MQTT_STREAM_ID, &mut buf) { + Ok((len, _fin)) => { + mqtt_events.extend(self.mqtt_engine.handle_incoming(&buf[..len])); + } + Err(quiche::Error::Done) => break, + Err(_) => break, + } + } + + // Transfer MQTT engine outgoing → QUIC stream + let outgoing = self.mqtt_engine.take_outgoing(); + if !outgoing.is_empty() { + let _ = conn.stream_send(MQTT_STREAM_ID, &outgoing, false); + } + } + } + + // Drive MQTT engine tick + let tick_events = self.mqtt_engine.handle_tick(now); + mqtt_events.extend(tick_events); + + // Drain any newly produced outgoing QUIC datagrams + self.drain_outgoing(); + + mqtt_events + } + + /// Drain all pending outgoing datagrams from the quiche connection. + fn drain_outgoing(&mut self) { + let peer = match self.peer_addr { + Some(a) => a, + None => return, + }; + let conn = match &mut self.connection { + Some(c) => c, + None => return, + }; + let mut out = vec![0u8; MAX_DATAGRAM_SIZE]; + loop { + match conn.send(&mut out) { + Ok((len, _send_info)) => { + self.outgoing_datagrams.push_back((peer, out[..len].to_vec())); + } + Err(quiche::Error::Done) => break, + Err(_) => break, + } + } + } + + pub fn take_outgoing_datagrams(&mut self) -> VecDeque<(SocketAddr, Vec)> { + std::mem::take(&mut self.outgoing_datagrams) + } + + pub fn take_events(&mut self) -> Vec { + self.mqtt_engine.take_events() + } + + pub fn publish(&mut self, command: PublishCommand) -> Result, MqttClientError> { + self.mqtt_engine.publish(command) + } + + pub fn subscribe(&mut self, command: SubscribeCommand) -> Result { + self.mqtt_engine.subscribe(command) + } + + pub fn unsubscribe(&mut self, command: UnsubscribeCommand) -> Result { + self.mqtt_engine.unsubscribe(command) + } + + pub fn disconnect(&mut self) { + self.mqtt_engine.disconnect(); + } + + pub fn is_connected(&self) -> bool { + self.mqtt_engine.is_connected() + } +} + +/// Cheap non-cryptographic random byte for connection ID generation. +/// Real deployments should use `rand`; this avoids adding a dependency. +fn rand_byte() -> u8 { + use std::collections::hash_map::DefaultHasher; + use std::hash::{Hash, Hasher}; + use std::time::SystemTime; + let mut h = DefaultHasher::new(); + SystemTime::now().hash(&mut h); + std::thread::current().id().hash(&mut h); + h.finish() as u8 +} From c79933b50c60ad8c48dffa37f6d00bb9d19feb96 Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:30:40 +0200 Subject: [PATCH 2/6] refactor(priority_queue): move Serialize+DeserializeOwned bounds to save/load methods Relaxes the impl PriorityQueue bounds so the struct is usable without serde when persistence isn't needed. The Serialize+DeserializeOwned constraints are now only required on the save_to_file/load_from_file methods themselves. --- src/priority_queue.rs | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/src/priority_queue.rs b/src/priority_queue.rs index c1ba675e..7698d230 100644 --- a/src/priority_queue.rs +++ b/src/priority_queue.rs @@ -27,8 +27,7 @@ where impl PriorityQueue where - P: Ord + Clone + Serialize + DeserializeOwned, - T: Serialize + DeserializeOwned, + P: Ord + Clone, { /// Creates a new `PriorityQueue` with the specified capacity. pub fn new(capacity: usize) -> Self { @@ -99,7 +98,11 @@ where } /// Saves the queue state to a file (JSON format). - pub fn save_to_file>(&self, path: Q) -> io::Result<()> { + pub fn save_to_file>(&self, path: Q) -> io::Result<()> + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::create(path)?; let writer = BufWriter::new(file); serde_json::to_writer(writer, self)?; @@ -107,7 +110,11 @@ where } /// Restores the queue state from a file. - pub fn load_from_file>(path: Q) -> io::Result { + pub fn load_from_file>(path: Q) -> io::Result + where + P: Serialize + DeserializeOwned, + T: Serialize + DeserializeOwned, + { let file = File::open(path)?; let reader = BufReader::new(file); let queue: Self = serde_json::from_reader(reader)?; From a61f9d17b490ceea5ad5e12cb223a0406964dd4c Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:30:49 +0200 Subject: [PATCH 3/6] feat(engine): recognise quic-proto feature in QuicMqttEngine cfg gates Changes #[cfg(feature = "quic")] to #[cfg(any(feature = "quic", feature = "quic-proto"))] so QuicMqttEngine and its quinn_proto imports are compiled when the lower-level quic-proto feature is enabled directly on the flowsdk crate. --- src/mqtt_client/engine.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/mqtt_client/engine.rs b/src/mqtt_client/engine.rs index 9843534d..9eac3b60 100644 --- a/src/mqtt_client/engine.rs +++ b/src/mqtt_client/engine.rs @@ -1,9 +1,9 @@ // SPDX-License-Identifier: MPL-2.0 -#[cfg(feature = "quic")] +#[cfg(any(feature = "quic", feature = "quic-proto"))] use quinn_proto::{ClientConfig, Connection, ConnectionHandle, Endpoint, EndpointConfig, StreamId}; use std::collections::VecDeque; -#[cfg(feature = "quic")] +#[cfg(any(feature = "quic", feature = "quic-proto"))] use std::sync::Arc; use std::time::{Duration, Instant}; @@ -853,7 +853,7 @@ impl MqttEngine { /// /// This engine combines the `MqttEngine` (MQTT state machine) with `quinn_proto` (QUIC state machine) /// to provide a complete MQTT-over-QUIC implementation that does not perform any direct I/O. -#[cfg(feature = "quic")] +#[cfg(any(feature = "quic", feature = "quic-proto"))] pub struct QuicMqttEngine { mqtt_engine: MqttEngine, endpoint: Endpoint, @@ -869,7 +869,7 @@ pub struct QuicMqttEngine { // stream_read_buffer removed } -#[cfg(feature = "quic")] +#[cfg(any(feature = "quic", feature = "quic-proto"))] impl QuicMqttEngine { pub fn new(options: MqttClientOptions) -> Result { // Initialize MqttEngine From 6bf05f221804b565ba6e7ba56ad0da895f09ab6f Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:30:55 +0200 Subject: [PATCH 4/6] fix(transport): gate RustlsTls re-exports behind async-client feature RustlsTlsConfig and RustlsTlsTransport are only meaningful in async contexts. Adding the async-client guard prevents a compile error when rustls-tls is enabled in isolation (e.g. the quic-quiche feature set). --- src/mqtt_client/transport/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/mqtt_client/transport/mod.rs b/src/mqtt_client/transport/mod.rs index af8d0337..0eac5e4e 100644 --- a/src/mqtt_client/transport/mod.rs +++ b/src/mqtt_client/transport/mod.rs @@ -106,5 +106,5 @@ pub use tcp::TcpTransport; #[cfg(feature = "tls")] pub use tls::TlsTransport; -#[cfg(feature = "rustls-tls")] +#[cfg(all(feature = "rustls-tls", feature = "async-client"))] pub use rustls_tls::{RustlsTlsConfig, RustlsTlsTransport}; From 38ed588849b0bcaf5e382725acb986b73145ec84 Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:31:01 +0200 Subject: [PATCH 5/6] refactor(ffi): gate uniffi derive macros and scaffolding behind uniffi-bindings feature Changes bare uniffi::Record/Enum/setup_scaffolding! invocations to #[cfg_attr(feature = "uniffi-bindings", ...)] so the FFI crate compiles cleanly when uniffi-bindings is not selected (e.g. C-only or quiche-only builds). --- flowsdk_ffi/src/engine/ffi_types.rs | 24 +++++++++++++++--------- flowsdk_ffi/src/lib.rs | 1 + 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/flowsdk_ffi/src/engine/ffi_types.rs b/flowsdk_ffi/src/engine/ffi_types.rs index d40c7ccb..f3a66794 100644 --- a/flowsdk_ffi/src/engine/ffi_types.rs +++ b/flowsdk_ffi/src/engine/ffi_types.rs @@ -1,6 +1,7 @@ // SPDX-License-Identifier: MPL-2.0 -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] +#[derive(Clone)] pub struct MqttMessageFFI { pub topic: String, pub payload: Vec, @@ -8,32 +9,37 @@ pub struct MqttMessageFFI { pub retain: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] +#[derive(Clone)] pub struct ConnectionResultFFI { pub reason_code: u8, pub session_present: bool, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] +#[derive(Clone)] pub struct PublishResultFFI { pub packet_id: Option, pub reason_code: Option, pub qos: u8, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] +#[derive(Clone)] pub struct SubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Record, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] +#[derive(Clone)] pub struct UnsubscribeResultFFI { pub packet_id: u16, pub reason_codes: Vec, } -#[derive(uniffi::Enum, Clone, serde::Serialize)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Enum))] +#[derive(Clone)] pub enum MqttEventFFI { Connected(ConnectionResultFFI), Disconnected { reason_code: Option }, @@ -47,7 +53,7 @@ pub enum MqttEventFFI { ReconnectScheduled { attempt: u32, delay_ms: u64 }, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttOptionsFFI { pub client_id: String, pub mqtt_version: u8, @@ -60,7 +66,7 @@ pub struct MqttOptionsFFI { pub max_reconnect_attempts: u32, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttTlsOptionsFFI { pub ca_cert_file: Option, pub client_cert_file: Option, @@ -70,7 +76,7 @@ pub struct MqttTlsOptionsFFI { pub enable_key_log: bool, } -#[derive(uniffi::Record)] +#[cfg_attr(feature = "uniffi-bindings", derive(uniffi::Record))] pub struct MqttDatagramFFI { pub addr: String, pub data: Vec, diff --git a/flowsdk_ffi/src/lib.rs b/flowsdk_ffi/src/lib.rs index 28dca148..9adf4837 100644 --- a/flowsdk_ffi/src/lib.rs +++ b/flowsdk_ffi/src/lib.rs @@ -2,4 +2,5 @@ pub mod engine; +#[cfg(feature = "uniffi-bindings")] uniffi::setup_scaffolding!("flowsdk_ffi"); From 7d4bf8ee1c4f01326a720311c7502b4f44ee4a1d Mon Sep 17 00:00:00 2001 From: William Yang Date: Sun, 19 Apr 2026 13:31:06 +0200 Subject: [PATCH 6/6] style: apply cargo fmt to quiche module files --- src/mqtt_client/mod.rs | 4 +-- src/mqtt_client/quic_engine_quiche.rs | 40 ++++++++++++++++----------- 2 files changed, 26 insertions(+), 18 deletions(-) diff --git a/src/mqtt_client/mod.rs b/src/mqtt_client/mod.rs index 14981bbf..2409afdb 100644 --- a/src/mqtt_client/mod.rs +++ b/src/mqtt_client/mod.rs @@ -8,6 +8,8 @@ pub mod error; pub mod inflight; pub mod no_io_client; pub mod opts; +#[cfg(feature = "quic-quiche")] +pub mod quic_engine_quiche; #[cfg(feature = "protocol-testing")] pub mod raw_packet; #[cfg(feature = "rustls-tls")] @@ -17,8 +19,6 @@ pub mod tokio_async_client; #[cfg(feature = "quic")] pub mod tokio_quic_client; pub mod transport; -#[cfg(feature = "quic-quiche")] -pub mod quic_engine_quiche; // Re-exports pub use async_client::{AsyncClientConfig, AsyncMqttClient, MqttEventHandler}; diff --git a/src/mqtt_client/quic_engine_quiche.rs b/src/mqtt_client/quic_engine_quiche.rs index cc08776e..50d0a1b6 100644 --- a/src/mqtt_client/quic_engine_quiche.rs +++ b/src/mqtt_client/quic_engine_quiche.rs @@ -71,12 +71,11 @@ impl QuicMqttEngineQuiche { insecure_skip_verify: bool, alpn: Vec>, ) -> Result<(), MqttClientError> { - let mut config = - quiche::Config::new(quiche::PROTOCOL_VERSION).map_err(|e| { - MqttClientError::InternalError { - message: format!("quiche Config::new failed: {e}"), - } - })?; + let mut config = quiche::Config::new(quiche::PROTOCOL_VERSION).map_err(|e| { + MqttClientError::InternalError { + message: format!("quiche Config::new failed: {e}"), + } + })?; // ALPN let alpn_refs: Vec<&[u8]> = if alpn.is_empty() { @@ -93,11 +92,11 @@ impl QuicMqttEngineQuiche { // TLS verification config.verify_peer(!insecure_skip_verify); if let Some(path) = ca_cert_path { - config - .load_verify_locations_from_file(path) - .map_err(|e| MqttClientError::InternalError { + config.load_verify_locations_from_file(path).map_err(|e| { + MqttClientError::InternalError { message: format!("quiche load_verify_locations_from_file failed: {e}"), - })?; + } + })?; } // Transport parameters — generous limits for MQTT workloads @@ -124,10 +123,16 @@ impl QuicMqttEngineQuiche { "0.0.0.0:0".parse().unwrap() }; - let conn = quiche::connect(Some(server_name), &scid, local_addr, server_addr, &mut config) - .map_err(|e| MqttClientError::InternalError { - message: format!("quiche::connect failed: {e}"), - })?; + let conn = quiche::connect( + Some(server_name), + &scid, + local_addr, + server_addr, + &mut config, + ) + .map_err(|e| MqttClientError::InternalError { + message: format!("quiche::connect failed: {e}"), + })?; self.connection = Some(Box::new(conn)); self.local_addr = Some(local_addr); @@ -142,7 +147,9 @@ impl QuicMqttEngineQuiche { /// Feed an incoming UDP datagram. pub fn handle_datagram(&mut self, mut data: Vec, remote_addr: SocketAddr, _now: Instant) { - let local = self.local_addr.unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); + let local = self + .local_addr + .unwrap_or_else(|| "0.0.0.0:0".parse().unwrap()); if let Some(conn) = &mut self.connection { let recv_info = quiche::RecvInfo { from: remote_addr, @@ -220,7 +227,8 @@ impl QuicMqttEngineQuiche { loop { match conn.send(&mut out) { Ok((len, _send_info)) => { - self.outgoing_datagrams.push_back((peer, out[..len].to_vec())); + self.outgoing_datagrams + .push_back((peer, out[..len].to_vec())); } Err(quiche::Error::Done) => break, Err(_) => break,