diff --git a/rust/Cargo.lock b/rust/Cargo.lock index bcda213..4e22845 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -181,6 +181,16 @@ dependencies = [ "serde_core", ] +[[package]] +name = "core-foundation" +version = "0.9.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "91e195e091a93c46f7102ec7818a2aa394e1e1771c3ab4825963fa03e45afb8f" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "core-foundation" version = "0.10.1" @@ -247,12 +257,37 @@ version = "1.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "91622ff5e7162018101f2fea40d6ebf4a78bbe5a49736a2020649edf9693679e" +[[package]] +name = "encoding_rs" +version = "0.8.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75030f3c4f45dafd7586dd6780965a8c7e8e285a5ecb86713e63a79c5b2766f3" +dependencies = [ + "cfg-if", +] + [[package]] name = "equivalent" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" +[[package]] +name = "errno" +version = "0.3.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" +dependencies = [ + "libc", + "windows-sys 0.61.2", +] + +[[package]] +name = "fastrand" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f1f227452a390804cdb637b74a86990f2a7d7ba4b7d5693aac9b4dd6defd8d6" + [[package]] name = "find-msvc-tools" version = "0.1.9" @@ -271,6 +306,21 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" +[[package]] +name = "foreign-types" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6f339eb8adc052cd2ca78910fda869aefa38d22d5cb648e6485e4d3fc06f3b1" +dependencies = [ + "foreign-types-shared", +] + +[[package]] +name = "foreign-types-shared" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "00b0228411908ca8685dba7fc2cdd70ec9990a6e753e89b6ac91a84c40fbaf4b" + [[package]] name = "form_urlencoded" version = "1.2.2" @@ -548,6 +598,22 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-tls" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70206fc6890eaca9fde8a0bf71caa2ddfc9fe045ac9e5c70df101a7dbde866e0" +dependencies = [ + "bytes", + "http-body-util", + "hyper", + "hyper-util", + "native-tls", + "tokio", + "tokio-native-tls", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -566,9 +632,11 @@ dependencies = [ "percent-encoding", "pin-project-lite", "socket2", + "system-configuration", "tokio", "tower-service", "tracing", + "windows-registry", ] [[package]] @@ -802,12 +870,27 @@ version = "0.2.186" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "68ab91017fe16c622486840e4c83c9a37afeff978bd239b5293d61ece587de66" +[[package]] +name = "linux-raw-sys" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a66949e030da00e8c7d4434b251670a91556f4144941d37452769c25d58a53" + [[package]] name = "litemap" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "92daf443525c4cce67b150400bc2316076100ce0b3686209eb8cf3c31612e6f0" +[[package]] +name = "lock_api" +version = "0.4.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "224399e74b87b5f3557511d98dff8b14089b3dadafcab6bb93eab67d3aace965" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.29" @@ -826,6 +909,12 @@ version = "2.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ca58f447f06ed17d5fc4043ce1b10dd205e060fb3ce5b979b8ed8e59ff3f79" +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "miniz_oxide" version = "0.8.9" @@ -846,6 +935,23 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "native-tls" +version = "0.2.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "465500e14ea162429d264d44189adc38b199b62b1c21eea9f69e4b73cb03bbf2" +dependencies = [ + "libc", + "log", + "openssl", + "openssl-probe", + "openssl-sys", + "schannel", + "security-framework", + "security-framework-sys", + "tempfile", +] + [[package]] name = "num-traits" version = "0.2.19" @@ -880,12 +986,49 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "openssl" +version = "0.10.81" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77823a27f0babb03091cb9ed9ef80af3b39dbc82f97e8fa530374b7dafd87a45" +dependencies = [ + "bitflags", + "cfg-if", + "foreign-types", + "libc", + "openssl-macros", + "openssl-sys", +] + +[[package]] +name = "openssl-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "openssl-probe" version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7c87def4c32ab89d880effc9e097653c8da5d6ef28e6b539d313baaacfbafcbe" +[[package]] +name = "openssl-sys" +version = "0.9.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b47e7e6bb2c38cd930d25a23b40fa52e068c10e85f3e03a7f5ba5aaca5713695" +dependencies = [ + "cc", + "libc", + "pkg-config", + "vcpkg", +] + [[package]] name = "opentelemetry" version = "0.32.0" @@ -910,7 +1053,7 @@ dependencies = [ "bytes", "http", "opentelemetry", - "reqwest", + "reqwest 0.13.4", ] [[package]] @@ -925,7 +1068,7 @@ dependencies = [ "opentelemetry-proto", "opentelemetry_sdk", "prost", - "reqwest", + "reqwest 0.13.4", "serde_json", "thiserror", ] @@ -962,12 +1105,35 @@ dependencies = [ "opentelemetry", "percent-encoding", "portable-atomic", - "rand", + "rand 0.9.4", "thiserror", "tokio", "tokio-stream", ] +[[package]] +name = "parking_lot" +version = "0.12.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93857453250e3077bd71ff98b6a65ea6621a19bb0f559a85248955ac12c45a1a" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2621685985a2ebf1c516881c026032ac7deafcda1a2c9b7850dc81e3dfcb64c1" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-link", +] + [[package]] name = "percent-encoding" version = "2.3.2" @@ -980,6 +1146,12 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" +[[package]] +name = "pkg-config" +version = "0.3.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19f132c84eca552bf34cab8ec81f1c1dcc229b811638f9d283dceabe58c5569e" + [[package]] name = "portable-atomic" version = "1.13.1" @@ -1031,8 +1203,8 @@ checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bitflags", "num-traits", - "rand", - "rand_chacha", + "rand 0.9.4", + "rand_chacha 0.9.0", "rand_xorshift", "regex-syntax", "unarray", @@ -1091,7 +1263,7 @@ dependencies = [ "bytes", "getrandom 0.3.4", "lru-slab", - "rand", + "rand 0.9.4", "ring", "rustc-hash", "rustls", @@ -1138,14 +1310,35 @@ version = "6.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" +[[package]] +name = "rand" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ca0ecfa931c29007047d1bc58e623ab12e5590e8c7cc53200d5202b69266d8a" +dependencies = [ + "libc", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + [[package]] name = "rand" version = "0.9.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "44c5af06bb1b7d3216d91932aed5265164bf384dc89cd6ba05cf59a35f5f76ea" dependencies = [ - "rand_chacha", - "rand_core", + "rand_chacha 0.9.0", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core 0.6.4", ] [[package]] @@ -1155,7 +1348,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d3022b5f1df60f26e1ffddd6c66e8aa15de382ae63b3a0c1bfc0e4d3e3f325cb" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom 0.2.17", ] [[package]] @@ -1173,7 +1375,16 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "513962919efc330f829edb2535844d1b912b0fbe2ca165d613e4e8788bb05a5a" dependencies = [ - "rand_core", + "rand_core 0.9.5", +] + +[[package]] +name = "redox_syscall" +version = "0.5.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed2bf2547551a7053d6fdfafda3f938979645c44812fbfcda098faae3f1a362d" +dependencies = [ + "bitflags", ] [[package]] @@ -1205,6 +1416,46 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d6f6ff9a378485b298a5286656da665ba74413d36db0979633275d2e708145d4" +[[package]] +name = "reqwest" +version = "0.12.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" +dependencies = [ + "base64", + "bytes", + "encoding_rs", + "futures-core", + "h2", + "http", + "http-body", + "http-body-util", + "hyper", + "hyper-rustls", + "hyper-tls", + "hyper-util", + "js-sys", + "log", + "mime", + "native-tls", + "percent-encoding", + "pin-project-lite", + "rustls-pki-types", + "serde", + "serde_json", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tokio-native-tls", + "tower", + "tower-http", + "tower-service", + "url", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "reqwest" version = "0.13.4" @@ -1252,7 +1503,7 @@ dependencies = [ "anyhow", "async-trait", "http", - "reqwest", + "reqwest 0.13.4", "thiserror", "tower-service", ] @@ -1292,6 +1543,19 @@ dependencies = [ "semver", ] +[[package]] +name = "rustix" +version = "1.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6fe4565b9518b83ef4f91bb47ce29620ca828bd32cb7e408f0062e9930ba190" +dependencies = [ + "bitflags", + "errno", + "libc", + "linux-raw-sys", + "windows-sys 0.61.2", +] + [[package]] name = "rustls" version = "0.23.40" @@ -1334,7 +1598,7 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "26d1e2536ce4f35f4846aa13bff16bd0ff40157cdb14cc056c7b14ba41233ba0" dependencies = [ - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "jni", "log", @@ -1397,6 +1661,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + [[package]] name = "security-framework" version = "3.7.0" @@ -1404,7 +1674,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b7f4bc775c73d9a02cde8bf7b2ec4c9d12743edf609006c7facc23998404cd1d" dependencies = [ "bitflags", - "core-foundation", + "core-foundation 0.10.1", "core-foundation-sys", "libc", "security-framework-sys", @@ -1487,6 +1757,16 @@ version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" +[[package]] +name = "signal-hook-registry" +version = "1.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4db69cba1110affc0e9f7bcd48bbf87b3f4fc7c61fc9155afd4c469eb3d6c1b" +dependencies = [ + "errno", + "libc", +] + [[package]] name = "simd_cesu8" version = "1.1.1" @@ -1515,6 +1795,21 @@ version = "1.15.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "67b1b7a3b5fe4f1376887184045fcf45c69e92af734b7aaddc05fb777b6fbd03" +[[package]] +name = "smooai-fetch" +version = "3.3.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5b9129f2e98af30f61be5ace745edb87086cd72e6c6f4c03b83d988eca12c43" +dependencies = [ + "rand 0.8.6", + "reqwest 0.12.28", + "serde", + "serde_json", + "thiserror", + "tokio", + "tracing", +] + [[package]] name = "smooai-observability" version = "0.1.0" @@ -1531,10 +1826,11 @@ dependencies = [ "opentelemetry_sdk", "pin-project-lite", "regex", - "reqwest", + "reqwest 0.13.4", "reqwest-middleware", "serde", "serde_json", + "smooai-fetch", "thiserror", "tokio", "tower-layer", @@ -1596,6 +1892,40 @@ dependencies = [ "syn", ] +[[package]] +name = "system-configuration" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a13f3d0daba03132c0aa9767f98351b3488edc2c100cda2d2ec2b04f3d8d3c8b" +dependencies = [ + "bitflags", + "core-foundation 0.9.4", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e1d1b10ced5ca923a1fcb8d03e96b8d3268065d724548c0211415ff6ac6bac4" +dependencies = [ + "core-foundation-sys", + "libc", +] + +[[package]] +name = "tempfile" +version = "3.27.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32497e9a4c7b38532efcdebeef879707aa9f794296a4f0244f6f69e9bc8574bd" +dependencies = [ + "fastrand", + "getrandom 0.4.2", + "once_cell", + "rustix", + "windows-sys 0.61.2", +] + [[package]] name = "thiserror" version = "2.0.18" @@ -1650,7 +1980,9 @@ dependencies = [ "bytes", "libc", "mio", + "parking_lot", "pin-project-lite", + "signal-hook-registry", "socket2", "tokio-macros", "windows-sys 0.61.2", @@ -1667,6 +1999,16 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-native-tls" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbae76ab933c85776efabc971569dd6119c580d8f5d448769dec1764bf796ef2" +dependencies = [ + "native-tls", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.26.4" @@ -1753,9 +2095,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "pin-project-lite", + "tracing-attributes", "tracing-core", ] +[[package]] +name = "tracing-attributes" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "tracing-core" version = "0.1.36" @@ -1824,6 +2178,12 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "vcpkg" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" + [[package]] name = "walkdir" version = "2.5.0" @@ -2000,6 +2360,35 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0805222e57f7521d6a62e36fa9163bc891acd422f971defe97d64e70d0a4fe5" +[[package]] +name = "windows-registry" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "02752bf7fbdcce7f2a27a742f798510f3e5ad88dbe84871e5168e2120c3d5720" +dependencies = [ + "windows-link", + "windows-result", + "windows-strings", +] + +[[package]] +name = "windows-result" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7781fa89eaf60850ac3d2da7af8e5242a5ea78d1a11c49bf2910bb5a73853eb5" +dependencies = [ + "windows-link", +] + +[[package]] +name = "windows-strings" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7837d08f69c77cf6b07689544538e017c1bfcf57e34b4c0ff58e6c2cd3b37091" +dependencies = [ + "windows-link", +] + [[package]] name = "windows-sys" version = "0.52.0" diff --git a/rust/observability/Cargo.toml b/rust/observability/Cargo.toml index 450037b..9f2ad3a 100644 --- a/rust/observability/Cargo.toml +++ b/rust/observability/Cargo.toml @@ -10,9 +10,19 @@ categories = ["development-tools::debugging", "api-bindings"] readme = "README.md" [dependencies] -# reqwest 0.13 to unify with opentelemetry-http 0.32 (its `reqwest` feature -# impls HttpClient on reqwest 0.13's Client; a 0.12 pin would be a different type -# and the impl wouldn't apply). +# `smooai-fetch` wraps reqwest with timeouts + retries + circuit-breaking; the +# SDK's own app-style outbound HTTP (M2M token exchange in `auth.rs`, the batched +# webhook POST in `transport.rs`) goes through it so those calls inherit the +# resilience policy every other SmooAI client uses (SMOODEV-2026). +smooai-fetch = "3" +# reqwest 0.13 stays for two reasons: (1) the OTLP exporter's auth-injecting +# client in `otel.rs` is coupled to `opentelemetry-http`'s `HttpClient` trait, +# which is impl'd for `reqwest::Client` — smooai-fetch doesn't impl that trait, and +# that path is the OTLP protocol transport, not an app call; (2) the +# `reqwest-middleware` feature IS the reqwest integration for downstream reqwest +# users. It also remains a transitive dep of smooai-fetch. 0.13 unifies with +# opentelemetry-http 0.32 (a 0.12 pin would be a different `Client` type and the +# trait impl wouldn't apply). reqwest = { version = "0.13", features = ["json", "form", "rustls"], default-features = false } serde = { version = "1", features = ["derive"] } serde_json = "1" diff --git a/rust/observability/src/auth.rs b/rust/observability/src/auth.rs index 46091b8..22e2be1 100644 --- a/rust/observability/src/auth.rs +++ b/rust/observability/src/auth.rs @@ -8,6 +8,9 @@ //! refresh share one in-flight request (a `tokio::sync::Mutex` guards the //! refresh so duplicate token exchanges don't churn the rate limiter). //! +//! The token exchange goes through `smooai-fetch` (timeouts + retries + circuit +//! breaking) rather than raw `reqwest` (SMOODEV-2026). +//! //! Server contract: //! //! ```text @@ -21,8 +24,13 @@ //! ``` use serde::Deserialize; +use smooai_fetch::defaults::default_retry_options; +use smooai_fetch::error::FetchError; +use smooai_fetch::types::{Method, RequestInit}; +use smooai_fetch::{FetchBuilder, FetchClient}; +use std::collections::HashMap; use std::sync::Arc; -use std::time::{Duration, SystemTime, UNIX_EPOCH}; +use std::time::{SystemTime, UNIX_EPOCH}; use tokio::sync::Mutex; /// Errors from the token exchange. Callers in the export path log + drop these; @@ -39,7 +47,9 @@ pub enum TokenError { Config(&'static str), } -#[derive(Deserialize)] +// `Clone` is required by smooai-fetch's `FetchClient` bound +// (`DeserializeOwned + Clone + Send + 'static`). +#[derive(Deserialize, Clone)] struct TokenResponse { access_token: Option, expires_in: Option, @@ -85,7 +95,9 @@ struct Inner { client_id: String, client_secret: String, refresh_window_secs: u64, - http: reqwest::Client, + // smooai-fetch client typed to the token JSON so a successful exchange parses + // straight into `FetchResponse::data`. Carries the 10s timeout + retry policy. + http: FetchClient, cached: Mutex>, } @@ -121,10 +133,10 @@ impl TokenProvider { if options.client_secret.is_empty() { return Err(TokenError::Config("clientSecret")); } - let http = reqwest::Client::builder() - .timeout(Duration::from_secs(10)) - .build() - .map_err(|e| TokenError::Http(e.to_string()))?; + let http = FetchBuilder::::new() + .with_timeout(10_000) + .with_retry(default_retry_options()) + .build(); Ok(TokenProvider { inner: Arc::new(Inner { auth_url: options.auth_url.trim_end_matches('/').to_string(), @@ -169,38 +181,41 @@ impl TokenProvider { } async fn refresh(&self) -> Result { - let res = self + // smooai-fetch's `RequestInit` takes a pre-serialized String body, so + // build the `application/x-www-form-urlencoded` payload by hand. Each + // value is percent-encoded (the `client_secret` can contain reserved + // characters); the keys are all token-safe literals. + let body = format!( + "grant_type=client_credentials&provider=client_credentials&client_id={}&client_secret={}", + form_encode(&self.inner.client_id), + form_encode(&self.inner.client_secret), + ); + + let mut headers = HashMap::new(); + headers.insert( + "content-type".to_string(), + "application/x-www-form-urlencoded".to_string(), + ); + + let resp = self .inner .http - .post(format!("{}/token", self.inner.auth_url)) - .header("Content-Type", "application/x-www-form-urlencoded") - .form(&[ - ("grant_type", "client_credentials"), - ("provider", "client_credentials"), - ("client_id", self.inner.client_id.as_str()), - ("client_secret", self.inner.client_secret.as_str()), - ]) - .send() + .fetch( + &format!("{}/token", self.inner.auth_url), + RequestInit { + method: Method::POST, + headers, + body: Some(body), + }, + ) .await - .map_err(|e| TokenError::Http(e.to_string()))?; + .map_err(token_error_from_fetch)?; - let status = res.status(); - if !status.is_success() { - let body = res - .text() - .await - .unwrap_or_else(|_| "".to_string()); - return Err(TokenError::Status { - status: status.as_u16(), - body, - }); - } - let body: TokenResponse = res - .json() - .await - .map_err(|e| TokenError::Http(e.to_string()))?; - let access_token = body.access_token.ok_or(TokenError::NoAccessToken)?; - let expires_in = body.expires_in.unwrap_or(3600); + // 2xx only reaches here (non-2xx is an `Err` from `fetch`). The typed + // client parsed the JSON into `resp.data`. + let parsed = resp.data.ok_or(TokenError::NoAccessToken)?; + let access_token = parsed.access_token.ok_or(TokenError::NoAccessToken)?; + let expires_in = parsed.expires_in.unwrap_or(3600); Ok(CachedToken { access_token, expires_at: now_secs() + expires_in, @@ -208,6 +223,40 @@ impl TokenProvider { } } +/// Map a [`FetchError`] onto a [`TokenError`]. A non-2xx response surfaces as +/// `TokenError::Status` (preserving the upstream status + body); everything else +/// (timeout, transport, exhausted retries) folds into `TokenError::Http`. The +/// retry loop wraps the final failure in `FetchError::Retry`, so unwrap one level +/// to recover an underlying `HttpResponse` status. +fn token_error_from_fetch(err: FetchError) -> TokenError { + let unwrapped = match &err { + FetchError::Retry { source, .. } => source.as_ref(), + other => other, + }; + match unwrapped { + FetchError::HttpResponse { status, body, .. } => TokenError::Status { + status: *status, + body: body.clone(), + }, + _ => TokenError::Http(err.to_string()), + } +} + +/// Minimal `application/x-www-form-urlencoded` value encoder: percent-encodes +/// everything outside the RFC 3986 unreserved set (`A-Z a-z 0-9 - . _ ~`). +fn form_encode(value: &str) -> String { + let mut out = String::with_capacity(value.len()); + for byte in value.bytes() { + match byte { + b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => { + out.push(byte as char); + } + _ => out.push_str(&format!("%{byte:02X}")), + } + } + out +} + fn now_secs() -> u64 { SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/rust/observability/src/transport.rs b/rust/observability/src/transport.rs index f9fbf88..31cc060 100644 --- a/rust/observability/src/transport.rs +++ b/rust/observability/src/transport.rs @@ -8,9 +8,18 @@ //! Errors are swallowed — observability must never throw into host code. On a //! failed flush the batch is pushed back to the FRONT of the queue for the next //! attempt (matching the TS `queue.unshift(...batch)` retry behavior). +//! +//! The webhook POST goes through `smooai-fetch` (timeouts + retries + circuit +//! breaking) rather than raw `reqwest` (SMOODEV-2026). smooai-fetch already +//! retries 429/5xx internally; the queue requeue here covers transport failures +//! and the post-retry surface so a permanently-failing endpoint still re-tries on +//! the next flush tick. use crate::types::{IngestPayload, IngestType, ObservabilityEvent}; -use std::collections::VecDeque; +use smooai_fetch::defaults::default_retry_options; +use smooai_fetch::types::{Method, RequestInit}; +use smooai_fetch::{FetchBuilder, FetchClient}; +use std::collections::{HashMap, VecDeque}; use std::sync::Arc; use std::time::Duration; use tokio::sync::Mutex; @@ -46,7 +55,9 @@ impl TransportOptions { struct TransportInner { opts: TransportOptions, - http: reqwest::Client, + // The webhook response body is ignored (we only care about success/failure), + // so the client is typed to `serde_json::Value`. + http: FetchClient, queue: Mutex>, } @@ -75,10 +86,10 @@ impl Transport { /// Build a transport and spawn its background flush loop. Requires a tokio /// runtime (the loop is a spawned task). pub fn new(opts: TransportOptions) -> Self { - let http = reqwest::Client::builder() - .timeout(Duration::from_secs(10)) - .build() - .unwrap_or_default(); + let http = FetchBuilder::::new() + .with_timeout(10_000) + .with_retry(default_retry_options()) + .build(); let inner = Arc::new(TransportInner { opts: opts.clone(), http, @@ -154,26 +165,46 @@ async fn flush_inner(inner: &Arc) { events: batch.clone(), }; + // Serialize the payload by hand — smooai-fetch's `RequestInit` takes a String + // body. A serialization failure can't realistically happen for our types, but + // if it did we treat it as a failed flush and requeue. + let Ok(body) = serde_json::to_string(&payload) else { + requeue(inner, batch).await; + return; + }; + + let mut headers = HashMap::new(); + headers.insert("content-type".to_string(), "application/json".to_string()); + + // smooai-fetch returns `Err` on non-2xx (after its own 429/5xx retries) and on + // transport/timeout errors — exactly the cases we want to requeue. let result = inner .http - .post(&inner.opts.dsn) - .header("content-type", "application/json") - .json(&payload) - .send() + .fetch( + &inner.opts.dsn, + RequestInit { + method: Method::POST, + headers, + body: Some(body), + }, + ) .await; - let ok = matches!(result, Ok(ref res) if res.status().is_success()); - if !ok { - // Push the batch back to the FRONT for the next attempt (matches the TS - // `queue.unshift(...batch)`). Bounded by max_queue_size so a permanently - // failing endpoint can't grow memory without limit. - let mut q = inner.queue.lock().await; - for event in batch.into_iter().rev() { - if q.len() >= inner.opts.max_queue_size { - q.pop_back(); - } - q.push_front(event); + if result.is_err() { + requeue(inner, batch).await; + } +} + +/// Push a failed batch back to the FRONT of the queue for the next attempt +/// (matches the TS `queue.unshift(...batch)`). Bounded by `max_queue_size` so a +/// permanently failing endpoint can't grow memory without limit. +async fn requeue(inner: &Arc, batch: Vec) { + let mut q = inner.queue.lock().await; + for event in batch.into_iter().rev() { + if q.len() >= inner.opts.max_queue_size { + q.pop_back(); } + q.push_front(event); } }