diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 1073fae..bcda213 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -1243,6 +1243,20 @@ dependencies = [ "web-sys", ] +[[package]] +name = "reqwest-middleware" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07bc3f1384cffa4f274dad2d4ddd73aed32fed8f786d96c6be8aa4e5fd3c3b58" +dependencies = [ + "anyhow", + "async-trait", + "http", + "reqwest", + "thiserror", + "tower-service", +] + [[package]] name = "ring" version = "0.17.14" @@ -1332,7 +1346,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -1515,12 +1529,16 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", + "pin-project-lite", "regex", "reqwest", + "reqwest-middleware", "serde", "serde_json", "thiserror", "tokio", + "tower-layer", + "tower-service", "uuid", "wiremock", ] @@ -1973,7 +1991,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/rust/observability/Cargo.toml b/rust/observability/Cargo.toml index 3de0c0d..450037b 100644 --- a/rust/observability/Cargo.toml +++ b/rust/observability/Cargo.toml @@ -34,6 +34,23 @@ opentelemetry-otlp = { version = "0.32", features = ["trace", "metrics", "http-j opentelemetry-http = { version = "0.32", features = ["reqwest"], default-features = false } opentelemetry-semantic-conventions = "0.32" +# --- Optional HTTP framework instrumentation (feature-gated, default off) ----- +# `tower` feature: a tower::Layer wrapping any http Service (incl. Axum routers) +# that opens a server span per request on the global tracer. +tower-layer = { version = "0.3", optional = true } +tower-service = { version = "0.3", optional = true } +pin-project-lite = { version = "0.2", optional = true } +# `reqwest-middleware` feature: a client-span middleware for outbound calls. +# 0.5 supports reqwest 0.13 (this crate's pin). +reqwest-middleware = { version = "0.5", optional = true } + +[features] +default = [] +# OTel server-span layer for Tower/Axum services. +tower = ["dep:tower-layer", "dep:tower-service", "dep:pin-project-lite"] +# OTel client-span middleware for reqwest. +reqwest-middleware = ["dep:reqwest-middleware"] + [dev-dependencies] tokio = { version = "1", features = ["rt-multi-thread", "macros", "time", "test-util"] } wiremock = "0.6" diff --git a/rust/observability/src/lib.rs b/rust/observability/src/lib.rs index a2693cf..e4b66a4 100644 --- a/rust/observability/src/lib.rs +++ b/rust/observability/src/lib.rs @@ -43,6 +43,16 @@ mod stack; pub mod transport; pub mod types; +/// Optional Tower / Axum server-span layer. Feature-gated (`tower`, default off) +/// so the core crate stays lean for services that don't run an HTTP server. +#[cfg(feature = "tower")] +pub mod tower; + +/// Optional reqwest client-span middleware. Feature-gated +/// (`reqwest-middleware`, default off). +#[cfg(feature = "reqwest-middleware")] +pub mod reqwest_mw; + use once_cell::sync::OnceCell; use std::time::{SystemTime, UNIX_EPOCH}; diff --git a/rust/observability/src/reqwest_mw.rs b/rust/observability/src/reqwest_mw.rs new file mode 100644 index 0000000..c6bae36 --- /dev/null +++ b/rust/observability/src/reqwest_mw.rs @@ -0,0 +1,236 @@ +//! Optional reqwest client-span instrumentation (feature `reqwest-middleware`). +//! +//! A [`reqwest_middleware::Middleware`] that opens one OpenTelemetry +//! `SpanKind::Client` span per outbound HTTP call, feeding the **same global +//! tracer** the SDK installs in [`crate::setup_otel_sdk`]. Drop it into a +//! `reqwest_middleware::ClientBuilder` and every request the client makes +//! becomes a client span — nesting under the current server span when one is +//! active (e.g. inside a handler wrapped by the `tower` layer). +//! +//! Span shape (HTTP semantic conventions): +//! - name: `{method}` (e.g. `GET`) — keeps cardinality low; the URL goes in +//! attributes, not the name, per OTel HTTP client semconv. +//! - kind: `Client` +//! - attrs on start: `http.request.method`, `url.full`, `server.address`, +//! `server.port` +//! - attrs on finish: `http.response.status_code` +//! - status: `Error` on a transport error or a 4xx/5xx response; `Ok` +//! otherwise. (Unlike the server span, a CLIENT span treats any 4xx/5xx as +//! an error for the caller — the OTel client semconv marks the span errored +//! for status >= 400.) +//! +//! ## Why a dedicated middleware (not a thin span-wrapping helper) +//! +//! `reqwest-middleware` 0.5 supports reqwest 0.13 (this crate's pin), so the +//! middleware path *is* cleanly feasible — it intercepts at the client's own +//! extension point, so retries / redirects the real reqwest client performs are +//! all covered by the one span. For callers who don't want the +//! `reqwest-middleware` dependency, [`instrument_client_call`] offers a manual +//! span-wrapping alternative around any future. + +use opentelemetry::trace::{FutureExt as _, SpanKind, Status, TraceContextExt, Tracer as _}; +use opentelemetry::{Context, KeyValue}; +use reqwest::{Request, Response}; +use reqwest_middleware::{Middleware, Next}; + +/// Instrumentation-scope name used for the global tracer lookup. +const TRACER_NAME: &str = "smooai-observability/reqwest"; + +/// reqwest middleware that wraps each outbound call in a client span on the +/// global tracer. Zero-config — `OtelReqwestMiddleware::default()` is all most +/// callers need. +/// +/// ```ignore +/// use reqwest_middleware::ClientBuilder; +/// use smooai_observability::reqwest_mw::OtelReqwestMiddleware; +/// +/// let client = ClientBuilder::new(reqwest::Client::new()) +/// .with(OtelReqwestMiddleware::default()) +/// .build(); +/// ``` +#[derive(Clone, Default)] +pub struct OtelReqwestMiddleware { + _private: (), +} + +impl OtelReqwestMiddleware { + pub fn new() -> Self { + Self::default() + } +} + +#[async_trait::async_trait] +impl Middleware for OtelReqwestMiddleware { + async fn handle( + &self, + req: Request, + extensions: &mut http::Extensions, + next: Next<'_>, + ) -> reqwest_middleware::Result { + let method = req.method().clone(); + let url = req.url().clone(); + + let tracer = opentelemetry::global::tracer(TRACER_NAME); + let mut attributes = vec![ + KeyValue::new("http.request.method", method.as_str().to_owned()), + KeyValue::new("url.full", url.as_str().to_owned()), + ]; + if let Some(host) = url.host_str() { + attributes.push(KeyValue::new("server.address", host.to_owned())); + } + if let Some(port) = url.port_or_known_default() { + attributes.push(KeyValue::new("server.port", port as i64)); + } + + let span = tracer + .span_builder(method.as_str().to_owned()) + .with_kind(SpanKind::Client) + .with_attributes(attributes) + .start(&tracer); + + // Run the rest of the chain with the client span as the active context, + // so the real reqwest request (and any nested instrumentation) sees it. + // `with_context` (vs `attach`) keeps the future `Send` — `ContextGuard` + // is `!Send` and can't be held across this `.await`. + let cx = Context::current_with_span(span); + let result = next.run(req, extensions).with_context(cx.clone()).await; + + let span = cx.span(); + match &result { + Ok(response) => { + let status = response.status(); + span.set_attribute(KeyValue::new( + "http.response.status_code", + status.as_u16() as i64, + )); + // Client semconv: any >= 400 status is an error for the caller. + if status.is_client_error() || status.is_server_error() { + span.set_status(Status::error( + status.canonical_reason().unwrap_or("http error").to_owned(), + )); + } else { + span.set_status(Status::Ok); + } + } + Err(e) => { + span.set_status(Status::error(e.to_string())); + } + } + span.end(); + result + } +} + +/// Manual span-wrapping alternative for callers who do NOT want the +/// `reqwest-middleware` machinery (or aren't using a `ClientWithMiddleware`). +/// Wraps any future representing one outbound call in a client span on the +/// global tracer, recording method + URL up front and the result on completion. +/// +/// ```ignore +/// let resp = instrument_client_call( +/// reqwest::Method::GET, +/// "https://api.smoo.ai/v1/models", +/// client.get("https://api.smoo.ai/v1/models").send(), +/// ) +/// .await?; +/// ``` +pub async fn instrument_client_call( + method: reqwest::Method, + url: impl Into, + fut: F, +) -> Result +where + F: std::future::Future>, + E: std::fmt::Display, + T: HttpStatus, +{ + let url = url.into(); + let tracer = opentelemetry::global::tracer(TRACER_NAME); + let span = tracer + .span_builder(method.as_str().to_owned()) + .with_kind(SpanKind::Client) + .with_attributes(vec![ + KeyValue::new("http.request.method", method.as_str().to_owned()), + KeyValue::new("url.full", url), + ]) + .start(&tracer); + + let cx = Context::current_with_span(span); + let result = fut.with_context(cx.clone()).await; + + let span = cx.span(); + match &result { + Ok(value) => { + if let Some(code) = value.status_code() { + span.set_attribute(KeyValue::new("http.response.status_code", code as i64)); + if code >= 400 { + span.set_status(Status::error("http error")); + } else { + span.set_status(Status::Ok); + } + } else { + span.set_status(Status::Ok); + } + } + Err(e) => span.set_status(Status::error(e.to_string())), + } + span.end(); + result +} + +/// Lets [`instrument_client_call`] read a status code off the success value +/// (e.g. a [`reqwest::Response`]) without knowing the concrete type. Implement +/// it for your own response type, or rely on the blanket impls below. +pub trait HttpStatus { + /// The HTTP status code, if this value carries one. + fn status_code(&self) -> Option; +} + +impl HttpStatus for Response { + fn status_code(&self) -> Option { + Some(self.status().as_u16()) + } +} + +impl HttpStatus for () { + fn status_code(&self) -> Option { + None + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn instrument_client_call_ok_path() { + // No global provider installed -> no-op span, but the code path (Ok, + // status_code None) must run without panicking and return the value. + let out: Result<(), std::io::Error> = + instrument_client_call(reqwest::Method::GET, "https://example.com", async { + Ok(()) + }) + .await; + assert!(out.is_ok()); + } + + #[tokio::test] + async fn instrument_client_call_err_path() { + let out: Result<(), std::io::Error> = + instrument_client_call(reqwest::Method::POST, "https://example.com", async { + Err(std::io::Error::other("connection reset")) + }) + .await; + assert!(out.is_err()); + } + + #[test] + fn unit_has_no_status() { + assert_eq!(HttpStatus::status_code(&()), None); + } + + #[test] + fn middleware_is_constructible() { + let _m = OtelReqwestMiddleware::new(); + } +} diff --git a/rust/observability/src/tower.rs b/rust/observability/src/tower.rs new file mode 100644 index 0000000..3bde41d --- /dev/null +++ b/rust/observability/src/tower.rs @@ -0,0 +1,330 @@ +//! Optional Tower / Axum server-span instrumentation (feature `tower`). +//! +//! A thin [`tower::Layer`] that wraps any HTTP `Service` — including Axum +//! routers, which *are* `tower::Service` — +//! and opens one OpenTelemetry `SpanKind::Server` span per request. The span +//! feeds the **same global tracer** the SDK installs in +//! [`crate::setup_otel_sdk`], so server spans land on api.smoo.ai alongside the +//! GenAI and client spans with zero extra wiring. +//! +//! Why a custom layer instead of `tower_http::trace::TraceLayer`? +//! `TraceLayer` emits `tracing` events, which only reach OTel if a +//! `tracing-opentelemetry` bridge is installed in the host. This crate talks to +//! the `opentelemetry` API directly (see `gen_ai.rs`, `otel.rs`) and installs no +//! such bridge, so a `TraceLayer`-based helper would silently produce no spans. +//! A direct layer keeps the integration honest: the span it creates is the span +//! that gets exported. +//! +//! Span shape (HTTP semantic conventions): +//! - name: `{method} {route}` (e.g. `GET /organizations/{org_id}/resource`) +//! - kind: `Server` +//! - attrs on start: `http.request.method`, `url.path`, `network.protocol.version` +//! - attrs on finish: `http.response.status_code` +//! - status: `Error` on a 5xx response or an inner-service error; `Unset` +//! otherwise (4xx is a client problem, not a server error — matches the +//! OTel HTTP semconv rule that only 5xx sets span status to error). +//! +//! The span is attached to an [`opentelemetry::Context`] that wraps the inner +//! future, so any spans created downstream (GenAI calls, the reqwest client +//! layer in `reqwest_mw.rs`) nest under it automatically. +//! +//! Routing note: the layer records `url.path` (the raw request path). To get the +//! low-cardinality matched route template as the span name — strongly +//! recommended for Axum so `/users/123` and `/users/456` share one span name — +//! enable route templating by reading Axum's `MatchedPath` from the request +//! extensions; see [`OtelTraceLayer::with_route_extractor`]. + +use std::future::Future; +use std::pin::Pin; +use std::sync::Arc; +use std::task::{Context as TaskContext, Poll}; +use std::time::Instant; + +use opentelemetry::global::BoxedSpan; +use opentelemetry::trace::{SpanKind, Status, TraceContextExt, Tracer as _}; +use opentelemetry::{Context, KeyValue}; +use tower_layer::Layer; +use tower_service::Service; + +/// Instrumentation-scope name used for the global tracer lookup. Lets backends +/// attribute these spans to the SmooAI tower integration. +const TRACER_NAME: &str = "smooai-observability/tower"; + +/// Extracts the low-cardinality route template (e.g. `/users/{id}`) from a +/// request, when the framework exposes it. Returning `None` falls back to the +/// raw request path. For Axum, a typical extractor reads `MatchedPath`: +/// +/// ```ignore +/// layer.with_route_extractor(|req: &http::Request| { +/// req.extensions() +/// .get::() +/// .map(|m| m.as_str().to_owned()) +/// }); +/// ``` +pub type RouteExtractor = Arc) -> Option + Send + Sync>; + +/// A [`tower::Layer`] that opens a server span per HTTP request on the global +/// tracer. Clone-cheap; share one instance across the whole router. +#[derive(Clone)] +pub struct OtelTraceLayer { + route_extractor: Option>, +} + +// The default body type parameter only matters when a caller writes +// `OtelTraceLayer::default()` without a route extractor; the real body type is +// inferred from the wrapped service at `layer()` time. We keep a private unit +// placeholder so the default generic has a concrete name in docs without +// pulling in `axum` as a dependency. +mod axum_body_placeholder { + /// Placeholder body type for the default `OtelTraceLayer` generic. Never + /// constructed — the actual body type is inferred from the wrapped service. + pub enum Body {} +} + +impl Default for OtelTraceLayer { + fn default() -> Self { + OtelTraceLayer { + route_extractor: None, + } + } +} + +impl OtelTraceLayer { + /// A layer that names spans `{method} {path}` using the raw request path. + pub fn new() -> Self { + Self::default() + } + + /// Supply a route-template extractor so spans get a low-cardinality name + /// (e.g. `GET /users/{id}` instead of `GET /users/123`). Strongly + /// recommended for Axum — see [`RouteExtractor`]. + pub fn with_route_extractor(mut self, f: F) -> Self + where + F: Fn(&http::Request) -> Option + Send + Sync + 'static, + { + self.route_extractor = Some(Arc::new(f)); + self + } +} + +impl Layer for OtelTraceLayer { + type Service = OtelTraceService; + + fn layer(&self, inner: S) -> Self::Service { + OtelTraceService { + inner, + route_extractor: self.route_extractor.clone(), + } + } +} + +/// The [`tower::Service`] produced by [`OtelTraceLayer`]. Wraps the inner +/// service, opening + closing a server span around each call. +#[derive(Clone)] +pub struct OtelTraceService { + inner: S, + route_extractor: Option>, +} + +impl Service> for OtelTraceService +where + S: Service, Response = http::Response>, + S::Future: Send + 'static, + S::Error: std::fmt::Display, +{ + type Response = S::Response; + type Error = S::Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, cx: &mut TaskContext<'_>) -> Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: http::Request) -> Self::Future { + let method = req.method().clone(); + let route = self + .route_extractor + .as_ref() + .and_then(|f| f(&req)) + .unwrap_or_else(|| req.uri().path().to_owned()); + let protocol = http_version_str(req.version()); + + // Open a server span on the GLOBAL tracer (the one setup_otel_sdk + // installed). If no provider is installed it's a cheap no-op span. + let tracer = opentelemetry::global::tracer(TRACER_NAME); + let span: BoxedSpan = tracer + .span_builder(format!("{method} {route}")) + .with_kind(SpanKind::Server) + .with_attributes(vec![ + KeyValue::new("http.request.method", method.as_str().to_owned()), + KeyValue::new("url.path", req.uri().path().to_owned()), + KeyValue::new("network.protocol.version", protocol), + ]) + .start(&tracer); + + // Build the request context that holds the server span. The inner + // service is called WITHOUT the context attached here — `ResponseFuture` + // re-attaches it on every poll instead, so the span is the active parent + // exactly while the inner future runs (and we never hold the !Send + // `ContextGuard` across an await point). + let cx = Context::current_with_span(span); + let inner_future = self.inner.call(req); + + ResponseFuture { + inner: inner_future, + cx, + start: Instant::now(), + } + } +} + +pin_project_lite::pin_project! { + /// Future returned by [`OtelTraceService`]. Re-attaches the request context + /// on every poll (so downstream code sees the server span as the active + /// parent) and finalizes the span once the inner service resolves. + pub struct ResponseFuture { + #[pin] + inner: F, + cx: Context, + start: Instant, + } +} + +impl Future for ResponseFuture +where + F: Future, E>>, + E: std::fmt::Display, +{ + type Output = Result, E>; + + fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + // Make the server span the active context while the inner service runs. + let _guard = this.cx.clone().attach(); + let result = this.inner.poll(cx); + + match &result { + Poll::Pending => Poll::Pending, + Poll::Ready(outcome) => { + let span = this.cx.span(); + let elapsed_ms = this.start.elapsed().as_millis() as i64; + span.set_attribute(KeyValue::new("http.server.duration_ms", elapsed_ms)); + match outcome { + Ok(response) => { + let status = response.status(); + span.set_attribute(KeyValue::new( + "http.response.status_code", + status.as_u16() as i64, + )); + // Per OTel HTTP semconv: only 5xx marks the SERVER span + // as errored. 4xx is a client issue. + if status.is_server_error() { + span.set_status(Status::error( + status + .canonical_reason() + .unwrap_or("server error") + .to_owned(), + )); + } else { + span.set_status(Status::Ok); + } + } + Err(e) => { + span.set_status(Status::error(e.to_string())); + } + } + span.end(); + result + } + } + } +} + +fn http_version_str(v: http::Version) -> &'static str { + match v { + http::Version::HTTP_09 => "0.9", + http::Version::HTTP_10 => "1.0", + http::Version::HTTP_11 => "1.1", + http::Version::HTTP_2 => "2", + http::Version::HTTP_3 => "3", + _ => "unknown", + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::convert::Infallible; + + // A trivial inner service that echoes a fixed status, for driving the layer. + #[derive(Clone)] + struct FixedStatus(http::StatusCode); + + impl Service> for FixedStatus { + type Response = http::Response<()>; + type Error = Infallible; + type Future = + std::pin::Pin> + Send>>; + + fn poll_ready(&mut self, _: &mut TaskContext<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: http::Request<()>) -> Self::Future { + let status = self.0; + Box::pin(async move { + let resp = http::Response::builder().status(status).body(()).unwrap(); + Ok(resp) + }) + } + } + + fn request() -> http::Request<()> { + http::Request::builder() + .method(http::Method::GET) + .uri("/users/123") + .body(()) + .unwrap() + } + + #[tokio::test] + async fn passes_through_response_unchanged() { + let mut svc = OtelTraceLayer::new().layer(FixedStatus(http::StatusCode::OK)); + let resp = svc.call(request()).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } + + #[tokio::test] + async fn server_error_passes_through() { + // Exercises the 5xx -> Status::error branch (no global provider installed, + // so the span is a no-op, but the code path must run without panicking). + let mut svc = + OtelTraceLayer::new().layer(FixedStatus(http::StatusCode::INTERNAL_SERVER_ERROR)); + let resp = svc.call(request()).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::INTERNAL_SERVER_ERROR); + } + + #[tokio::test] + async fn client_error_is_not_span_error() { + // 4xx -> Status::Ok branch; just assert pass-through + no panic. + let mut svc = OtelTraceLayer::new().layer(FixedStatus(http::StatusCode::NOT_FOUND)); + let resp = svc.call(request()).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::NOT_FOUND); + } + + #[tokio::test] + async fn route_extractor_overrides_path() { + // The extractor runs without panicking; span name uses the template. + let layer = OtelTraceLayer::new() + .with_route_extractor(|_req: &http::Request<()>| Some("/users/{id}".to_owned())); + let mut svc = layer.layer(FixedStatus(http::StatusCode::OK)); + let resp = svc.call(request()).await.unwrap(); + assert_eq!(resp.status(), http::StatusCode::OK); + } + + #[test] + fn http_version_strings() { + assert_eq!(http_version_str(http::Version::HTTP_11), "1.1"); + assert_eq!(http_version_str(http::Version::HTTP_2), "2"); + } +}