From d64b9895782e395275515fea1ecdfc930472209b Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sun, 15 Mar 2026 15:49:52 +0100 Subject: [PATCH 1/6] chore: remove reqwest from default OTLP dependency graph --- Cargo.lock | 124 +++-------------------------------------------- Cargo.toml | 2 +- src/telemetry.rs | 1 - 3 files changed, 7 insertions(+), 120 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index b1ea132f..562bdb12 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2194,37 +2194,21 @@ dependencies = [ "want", ] -[[package]] -name = "hyper-timeout" -version = "0.5.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" -dependencies = [ - "hyper 1.8.1", - "hyper-util", - "pin-project-lite", - "tokio", - "tower-service", -] - [[package]] name = "hyper-util" version = "0.1.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" dependencies = [ - "base64 0.22.1", "bytes", "futures-channel", "futures-util", "http 1.4.0", "http-body 1.0.1", "hyper 1.8.1", - "ipnet", "libc", - "percent-encoding", "pin-project-lite", - "socket2 0.6.3", + "socket2 0.5.10", "tokio", "tower-service", "tracing", @@ -2416,22 +2400,6 @@ dependencies = [ "serde_core", ] -[[package]] -name = "ipnet" -version = "2.12.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" - -[[package]] -name = "iri-string" -version = "0.7.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" -dependencies = [ - "memchr", - "serde", -] - [[package]] name = "is_terminal_polyfill" version = "1.70.2" @@ -2548,7 +2516,7 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", - "tower 0.4.13", + "tower", "tracing", ] @@ -3269,8 +3237,11 @@ dependencies = [ "async-trait", "bytes", "http 1.4.0", + "http-body-util", + "hyper 1.8.1", + "hyper-util", "opentelemetry", - "reqwest", + "tokio", ] [[package]] @@ -3285,11 +3256,7 @@ dependencies = [ "opentelemetry-proto", "opentelemetry_sdk", "prost", - "reqwest", "thiserror 2.0.18", - "tokio", - "tonic", - "tracing", ] [[package]] @@ -3896,40 +3863,6 @@ version = "0.8.10" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc897dd8d9e8bd1ed8cdad82b5966c3e0ecae09fb1907d58efaa013543185d0a" -[[package]] -name = "reqwest" -version = "0.12.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eddd3ca559203180a307f12d114c268abf583f59b03cb906fd0b3ff8646c1147" -dependencies = [ - "base64 0.22.1", - "bytes", - "futures-channel", - "futures-core", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "http-body-util", - "hyper 1.8.1", - "hyper-util", - "js-sys", - "log", - "percent-encoding", - "pin-project-lite", - "serde", - "serde_json", - "serde_urlencoded", - "sync_wrapper", - "tokio", - "tower 0.5.3", - "tower-http", - "tower-service", - "url", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "rfc6979" version = "0.4.0" @@ -4726,9 +4659,6 @@ name = "sync_wrapper" version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0bf256ce5efdfa370213c1dabab5935a12e49f2c58d15e9eac2870d3b4f27263" -dependencies = [ - "futures-core", -] [[package]] name = "synstructure" @@ -4986,15 +4916,10 @@ dependencies = [ "http 1.4.0", "http-body 1.0.1", "http-body-util", - "hyper 1.8.1", - "hyper-timeout", - "hyper-util", "percent-encoding", "pin-project", "sync_wrapper", - "tokio", "tokio-stream", - "tower 0.5.3", "tower-layer", "tower-service", "tracing", @@ -5026,43 +4951,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "tower" -version = "0.5.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" -dependencies = [ - "futures-core", - "futures-util", - "indexmap 2.13.0", - "pin-project-lite", - "slab", - "sync_wrapper", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", -] - -[[package]] -name = "tower-http" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d4e6559d53cc268e5031cd8429d05415bc4cb4aefc4aa5d6cc35fbf5b924a1f8" -dependencies = [ - "bitflags 2.11.0", - "bytes", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "iri-string", - "pin-project-lite", - "tower 0.5.3", - "tower-layer", - "tower-service", -] - [[package]] name = "tower-layer" version = "0.3.3" diff --git a/Cargo.toml b/Cargo.toml index 00f4776a..2f0b4953 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,7 @@ tracing-actix-web = { version = "0.7", default-features = false, features = ["op tracing-log = "0.2" opentelemetry = "0.31" opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread", "spec_unstable_metrics_views"] } -opentelemetry-otlp = { version = "0.31", features = ["http-proto", "grpc-tonic", "metrics"] } +opentelemetry-otlp = { version = "0.31", default-features = false, features = ["http-proto", "metrics", "hyper-client"] } opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } diff --git a/src/telemetry.rs b/src/telemetry.rs index adfb060e..3e50a7b1 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -89,7 +89,6 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { // W3C TraceContext propagation (traceparent header) global::set_text_map_propagator(TraceContextPropagator::new()); - // OTLP exporter — reads OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_SERVICE_NAME, etc. let exporter = opentelemetry_otlp::SpanExporter::builder() .with_http() From aa8d30b6e57715cf469da6553a1d79690982f22b Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sun, 15 Mar 2026 16:02:26 +0100 Subject: [PATCH 2/6] fix: run OTLP hyper exporters on tokio async runtime --- Cargo.toml | 2 +- src/telemetry.rs | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2f0b4953..9a499005 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -87,7 +87,7 @@ tracing-opentelemetry = "0.32" tracing-actix-web = { version = "0.7", default-features = false, features = ["opentelemetry_0_31"] } tracing-log = "0.2" opentelemetry = "0.31" -opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio-current-thread", "spec_unstable_metrics_views"] } +opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio", "spec_unstable_metrics_views", "experimental_trace_batch_span_processor_with_async_runtime", "experimental_metrics_periodicreader_with_async_runtime"] } opentelemetry-otlp = { version = "0.31", default-features = false, features = ["http-proto", "metrics", "hyper-client"] } opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } diff --git a/src/telemetry.rs b/src/telemetry.rs index 3e50a7b1..4adfc5d3 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -95,8 +95,14 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { .build() .expect("Failed to build OTLP span exporter"); + let span_processor = + opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder( + exporter, + opentelemetry_sdk::runtime::Tokio, + ) + .build(); let provider = SdkTracerProvider::builder() - .with_batch_exporter(exporter) + .with_span_processor(span_processor) .build(); let tracer = provider.tracer("sqlpage"); @@ -109,7 +115,12 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { .build() .expect("Failed to build OTLP metric exporter"); - let reader = opentelemetry_sdk::metrics::PeriodicReader::builder(metric_exporter).build(); + let reader = + opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder( + metric_exporter, + opentelemetry_sdk::runtime::Tokio, + ) + .build(); let meter_provider = SdkMeterProvider::builder() .with_reader(reader) .with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| { From 695b608c4f7f0e37b9ebe9dd5d19e29a0339b9b6 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sun, 15 Mar 2026 21:32:32 +0100 Subject: [PATCH 3/6] fix: propagate OTLP init errors without panic --- src/main.rs | 11 ++++++--- src/telemetry.rs | 63 ++++++++++++++++++++++++------------------------ 2 files changed, 40 insertions(+), 34 deletions(-) diff --git a/src/main.rs b/src/main.rs index ed683025..b75b5ed6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,7 +7,10 @@ use sqlpage::{ #[actix_web::main] async fn main() { - init_logging(); + if let Err(e) = init_logging() { + eprintln!("Failed to initialize logging/telemetry: {e:#}"); + std::process::exit(1); + } if let Err(e) = start().await { log::error!("{e:?}"); std::process::exit(1); @@ -33,10 +36,10 @@ async fn start() -> anyhow::Result<()> { Ok(()) } -fn init_logging() { +fn init_logging() -> anyhow::Result<()> { let load_env = dotenvy::dotenv(); - let otel_active = telemetry::init_telemetry(); + let otel_active = telemetry::init_telemetry()?; match load_env { Ok(path) => log::info!("Loaded environment variables from {path:?}"), @@ -49,4 +52,6 @@ fn init_logging() { if otel_active { log::info!("OpenTelemetry tracing enabled (OTEL_EXPORTER_OTLP_ENDPOINT is set)"); } + + Ok(()) } diff --git a/src/telemetry.rs b/src/telemetry.rs index 4adfc5d3..7cb36731 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -9,6 +9,7 @@ use std::env; use std::sync::{Once, OnceLock}; +use anyhow::Context as _; use opentelemetry_sdk::metrics::SdkMeterProvider; use opentelemetry_sdk::trace::SdkTracerProvider; @@ -17,23 +18,22 @@ static METER_PROVIDER: OnceLock = OnceLock::new(); static TEST_LOGGING_INIT: Once = Once::new(); const DEFAULT_ENV_FILTER_DIRECTIVES: &str = "sqlpage=info,actix_web=info,tracing_actix_web=info"; -/// Initializes logging / tracing. Returns `true` if `OTel` was activated. -#[must_use] -pub fn init_telemetry() -> bool { +/// Initializes logging / tracing. Returns whether `OTel` was activated. +pub fn init_telemetry() -> anyhow::Result { init_telemetry_with_log_layer(logfmt::LogfmtLayer::new()) } -fn init_telemetry_with_log_layer(logfmt_layer: logfmt::LogfmtLayer) -> bool { +fn init_telemetry_with_log_layer(logfmt_layer: logfmt::LogfmtLayer) -> anyhow::Result { let otel_endpoint = env::var("OTEL_EXPORTER_OTLP_ENDPOINT").ok(); let otel_active = otel_endpoint.as_deref().is_some_and(|v| !v.is_empty()); if otel_active { - init_otel_tracing(logfmt_layer); + init_otel_tracing(logfmt_layer)?; } else { - init_tracing(logfmt_layer); + init_tracing(logfmt_layer)?; } - otel_active + Ok(otel_active) } /// Initializes logging once for tests using the same formatter as production. @@ -61,14 +61,14 @@ pub fn shutdown_telemetry() { } /// Tracing subscriber without `OTel` export — logfmt output only. -fn init_tracing(logfmt_layer: logfmt::LogfmtLayer) { +fn init_tracing(logfmt_layer: logfmt::LogfmtLayer) -> anyhow::Result<()> { use tracing_subscriber::layer::SubscriberExt; let subscriber = tracing_subscriber::registry() .with(default_env_filter()) .with(logfmt_layer); - set_global_subscriber(subscriber); + set_global_subscriber(subscriber) } fn init_test_tracing() { @@ -78,12 +78,13 @@ fn init_test_tracing() { .with(test_env_filter()) .with(logfmt::LogfmtLayer::test_writer()); - set_global_subscriber(subscriber); + set_global_subscriber(subscriber).expect("Failed to initialize test tracing subscriber"); } -fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { +fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) -> anyhow::Result<()> { use opentelemetry::global; use opentelemetry::trace::TracerProvider as _; + use opentelemetry_otlp::WithExportConfig as _; use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing_subscriber::layer::SubscriberExt; @@ -92,8 +93,9 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { // OTLP exporter — reads OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_SERVICE_NAME, etc. let exporter = opentelemetry_otlp::SpanExporter::builder() .with_http() + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) .build() - .expect("Failed to build OTLP span exporter"); + .context("Failed to build OTLP span exporter")?; let span_processor = opentelemetry_sdk::trace::span_processor_with_async_runtime::BatchSpanProcessor::builder( @@ -112,8 +114,9 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { // OTLP Metric exporter let metric_exporter = opentelemetry_otlp::MetricExporter::builder() .with_http() + .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) .build() - .expect("Failed to build OTLP metric exporter"); + .context("Failed to build OTLP metric exporter")?; let reader = opentelemetry_sdk::metrics::periodic_reader_with_async_runtime::PeriodicReader::builder( @@ -125,20 +128,18 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { .with_reader(reader) .with_view(|instrument: &opentelemetry_sdk::metrics::Instrument| { if instrument.kind() == opentelemetry_sdk::metrics::InstrumentKind::Histogram { - Some( - opentelemetry_sdk::metrics::Stream::builder() - .with_aggregation( - opentelemetry_sdk::metrics::Aggregation::ExplicitBucketHistogram { - boundaries: vec![ - 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, - 1.0, 2.5, 5.0, 7.5, 10.0, - ], - record_min_max: true, - }, - ) - .build() - .expect("Failed to build metrics stream"), - ) + opentelemetry_sdk::metrics::Stream::builder() + .with_aggregation( + opentelemetry_sdk::metrics::Aggregation::ExplicitBucketHistogram { + boundaries: vec![ + 0.001, 0.005, 0.01, 0.025, 0.05, 0.075, 0.1, 0.25, 0.5, 0.75, 1.0, + 2.5, 5.0, 7.5, 10.0, + ], + record_min_max: true, + }, + ) + .build() + .ok() } else { None } @@ -157,7 +158,7 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) { .with(otel_layer) .with(tracing_opentelemetry::MetricsLayer::new(meter_provider)); - set_global_subscriber(subscriber); + set_global_subscriber(subscriber) } fn default_env_filter() -> tracing_subscriber::EnvFilter { @@ -192,10 +193,10 @@ fn env_filter_directives(log_level: Option<&str>, rust_log: Option<&str>) -> Str } } -fn set_global_subscriber(subscriber: impl tracing::Subscriber + Send + Sync) { +fn set_global_subscriber(subscriber: impl tracing::Subscriber + Send + Sync) -> anyhow::Result<()> { tracing::subscriber::set_global_default(subscriber) - .expect("Failed to set global tracing subscriber"); - tracing_log::LogTracer::init().expect("Failed to set log→tracing bridge"); + .context("Failed to set global tracing subscriber")?; + tracing_log::LogTracer::init().context("Failed to set log→tracing bridge") } /// Custom logfmt logging layer. From 3f4bbbc505103762506c31c198b6897058fcf284 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Sun, 15 Mar 2026 23:52:00 +0100 Subject: [PATCH 4/6] fix: surface OTLP exporter failures in default logs --- src/telemetry.rs | 50 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 47 insertions(+), 3 deletions(-) diff --git a/src/telemetry.rs b/src/telemetry.rs index 7cb36731..b6d45bff 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -16,7 +16,8 @@ use opentelemetry_sdk::trace::SdkTracerProvider; static TRACER_PROVIDER: OnceLock = OnceLock::new(); static METER_PROVIDER: OnceLock = OnceLock::new(); static TEST_LOGGING_INIT: Once = Once::new(); -const DEFAULT_ENV_FILTER_DIRECTIVES: &str = "sqlpage=info,actix_web=info,tracing_actix_web=info"; +const DEFAULT_ENV_FILTER_DIRECTIVES: &str = + "sqlpage=info,actix_web=info,tracing_actix_web=info,opentelemetry=warn,opentelemetry_sdk=warn,opentelemetry_otlp=warn"; /// Initializes logging / tracing. Returns whether `OTel` was activated. pub fn init_telemetry() -> anyhow::Result { @@ -335,11 +336,14 @@ mod logfmt { let target = event_target(event, &event_fields); let msg = event_fields.get("message"); let multiline_msg = is_multiline_terminal_message(colors, msg); + let include_all_event_fields = + include_all_span_fields || msg.is_none_or(String::is_empty); write_timestamp(&mut buf, colors); write_level(&mut buf, level, colors); write_message(&mut buf, msg, multiline_msg); write_dimmed_field(&mut buf, "target", target, colors); + write_event_fields(&mut buf, &event_fields, include_all_event_fields); write_span_fields(&mut buf, ctx.event_scope(event), include_all_span_fields); write_trace_id(&mut buf, ctx.event_scope(event), colors); @@ -407,12 +411,34 @@ mod logfmt { fn write_message(buf: &mut String, msg: Option<&String>, multiline_msg: bool) { if !multiline_msg { - if let Some(msg) = msg { + if let Some(msg) = msg.filter(|msg| !msg.is_empty()) { write_logfmt_value(buf, "msg", msg); } } } + fn write_event_fields( + buf: &mut String, + event_fields: &HashMap<&'static str, String>, + include_all_event_fields: bool, + ) { + if !include_all_event_fields { + return; + } + + let mut extra_fields = BTreeMap::new(); + for (&key, value) in event_fields { + if key == "message" || key == "log.target" { + continue; + } + extra_fields.insert(key, value); + } + + for (key, value) in extra_fields { + write_logfmt_value(buf, key, value); + } + } + fn write_span_fields( buf: &mut String, scope: Option>, @@ -573,7 +599,7 @@ mod logfmt { fn empty_values_fall_back_to_default_filter() { assert_eq!( env_filter_directives(Some(""), Some("")), - "sqlpage=info,actix_web=info,tracing_actix_web=info" + "sqlpage=info,actix_web=info,tracing_actix_web=info,opentelemetry=warn,opentelemetry_sdk=warn,opentelemetry_otlp=warn" ); } @@ -604,5 +630,23 @@ mod logfmt { assert_eq!(buf, " method=GET"); } + + #[test] + fn event_fields_are_rendered_when_message_is_missing() { + let mut buf = String::new(); + let event_fields = HashMap::from([ + ("message", String::new()), + ("log.target", "opentelemetry_sdk".to_string()), + ("name", "BatchSpanProcessor.ExportFailed".to_string()), + ("reason", "connection error".to_string()), + ]); + + write_event_fields(&mut buf, &event_fields, true); + + assert_eq!( + buf, + " name=BatchSpanProcessor.ExportFailed reason=\"connection error\"" + ); + } } } From fd8072f3598f14771fd43a9a7d22e3c2940b4588 Mon Sep 17 00:00:00 2001 From: lovasoa Date: Mon, 16 Mar 2026 00:03:11 +0100 Subject: [PATCH 5/6] feat: use awc for OTLP HTTP exports --- Cargo.lock | 80 ++------------------------ Cargo.toml | 3 +- src/telemetry.rs | 144 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 151 insertions(+), 76 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 562bdb12..17d30957 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -40,7 +40,7 @@ dependencies = [ "flate2", "foldhash 0.1.5", "futures-core", - "h2 0.3.27", + "h2", "http 0.2.12", "httparse", "httpdate", @@ -629,7 +629,7 @@ dependencies = [ "derive_more 2.1.1", "futures-core", "futures-util", - "h2 0.3.27", + "h2", "http 0.2.12", "itoa", "log", @@ -1970,25 +1970,6 @@ dependencies = [ "tracing", ] -[[package]] -name = "h2" -version = "0.4.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2f44da3a8150a6703ed5d34e164b875fd14c2cdab9af1252a9a1020bde2bdc54" -dependencies = [ - "atomic-waker", - "bytes", - "fnv", - "futures-core", - "futures-sink", - "http 1.4.0", - "indexmap 2.13.0", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "handlebars" version = "6.4.0" @@ -2172,48 +2153,6 @@ dependencies = [ "want", ] -[[package]] -name = "hyper" -version = "1.8.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ab2d4f250c3d7b1c9fcdff1cece94ea4e2dfbec68614f7b87cb205f24ca9d11" -dependencies = [ - "atomic-waker", - "bytes", - "futures-channel", - "futures-core", - "h2 0.4.13", - "http 1.4.0", - "http-body 1.0.1", - "httparse", - "itoa", - "pin-project-lite", - "pin-utils", - "smallvec", - "tokio", - "want", -] - -[[package]] -name = "hyper-util" -version = "0.1.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "96547c2556ec9d12fb1578c4eaf448b04993e7fb79cbaad930a656880a6bdfa0" -dependencies = [ - "bytes", - "futures-channel", - "futures-util", - "http 1.4.0", - "http-body 1.0.1", - "hyper 1.8.1", - "libc", - "pin-project-lite", - "socket2 0.5.10", - "tokio", - "tower-service", - "tracing", -] - [[package]] name = "iana-time-zone" version = "0.1.65" @@ -2510,7 +2449,7 @@ dependencies = [ "bytes", "futures", "http 0.2.12", - "hyper 0.14.32", + "hyper", "lambda_runtime_api_client", "serde", "serde_json", @@ -2527,7 +2466,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7210012be904051520f0dc502140ba599bae3042b65b3737b87727f1aa88a7d6" dependencies = [ "http 0.2.12", - "hyper 0.14.32", + "hyper", "tokio", "tower-service", ] @@ -3237,11 +3176,7 @@ dependencies = [ "async-trait", "bytes", "http 1.4.0", - "http-body-util", - "hyper 1.8.1", - "hyper-util", "opentelemetry", - "tokio", ] [[package]] @@ -3506,12 +3441,6 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a89322df9ebe1c1578d689c92318e070967d1042b512afbe49518723f4e6d5cd" -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "piper" version = "0.2.5" @@ -4463,6 +4392,7 @@ dependencies = [ "odbc-sys 0.29.0", "openidconnect", "opentelemetry", + "opentelemetry-http", "opentelemetry-otlp", "opentelemetry-semantic-conventions", "opentelemetry_sdk", diff --git a/Cargo.toml b/Cargo.toml index 9a499005..3b6fd53e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -88,7 +88,8 @@ tracing-actix-web = { version = "0.7", default-features = false, features = ["op tracing-log = "0.2" opentelemetry = "0.31" opentelemetry_sdk = { version = "0.31", features = ["metrics", "rt-tokio", "spec_unstable_metrics_views", "experimental_trace_batch_span_processor_with_async_runtime", "experimental_metrics_periodicreader_with_async_runtime"] } -opentelemetry-otlp = { version = "0.31", default-features = false, features = ["http-proto", "metrics", "hyper-client"] } +opentelemetry-otlp = { version = "0.31", default-features = false, features = ["http-proto", "metrics"] } +opentelemetry-http = { version = "0.31", default-features = false } opentelemetry-semantic-conventions = { version = "0.31", features = ["semconv_experimental"] } diff --git a/src/telemetry.rs b/src/telemetry.rs index b6d45bff..040dae02 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -16,9 +16,148 @@ use opentelemetry_sdk::trace::SdkTracerProvider; static TRACER_PROVIDER: OnceLock = OnceLock::new(); static METER_PROVIDER: OnceLock = OnceLock::new(); static TEST_LOGGING_INIT: Once = Once::new(); +static OTLP_HTTP_WORKER_SENDER: OnceLock< + Result, String>, +> = OnceLock::new(); const DEFAULT_ENV_FILTER_DIRECTIVES: &str = "sqlpage=info,actix_web=info,tracing_actix_web=info,opentelemetry=warn,opentelemetry_sdk=warn,opentelemetry_otlp=warn"; +type OtlpHttpResponse = Result, String>; + +struct OtlpHttpJob { + request: opentelemetry_http::Request, + response_sender: tokio::sync::oneshot::Sender, +} + +#[derive(Clone)] +struct AwcOtlpHttpClient { + sender: tokio::sync::mpsc::UnboundedSender, +} + +impl AwcOtlpHttpClient { + fn new() -> anyhow::Result { + Ok(Self { + sender: otlp_http_worker_sender()?, + }) + } +} + +impl std::fmt::Debug for AwcOtlpHttpClient { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AwcOtlpHttpClient").finish_non_exhaustive() + } +} + +fn otlp_http_worker_sender() -> anyhow::Result> { + let sender_result = OTLP_HTTP_WORKER_SENDER + .get_or_init(|| init_otlp_http_worker_sender().map_err(|error| error.to_string())); + + match sender_result { + Ok(sender) => Ok(sender.clone()), + Err(error) => { + anyhow::bail!("Failed to initialize OTLP AWC worker thread: {error}"); + } + } +} + +fn init_otlp_http_worker_sender() -> anyhow::Result> +{ + let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); + + std::thread::Builder::new() + .name("sqlpage-otlp-http".to_owned()) + .spawn(move || { + actix_web::rt::System::new().block_on(async move { + let awc_client = awc::Client::builder() + .add_default_header((awc::http::header::USER_AGENT, env!("CARGO_PKG_NAME"))) + .finish(); + + while let Some(job) = receiver.recv().await { + let response = execute_otlp_http_request_with_awc(&awc_client, job.request) + .await + .map_err(|error| error.to_string()); + let _ = job.response_sender.send(response); + } + }); + }) + .context("Failed to spawn OTLP AWC worker thread")?; + + Ok(sender) +} + +async fn execute_otlp_http_request_with_awc( + awc_client: &awc::Client, + request: opentelemetry_http::Request, +) -> anyhow::Result> { + let (request_parts, request_body) = request.into_parts(); + + let awc_method = awc::http::Method::from_bytes(request_parts.method.as_str().as_bytes()) + .with_context(|| format!("Invalid OTLP HTTP method: {}", request_parts.method))?; + let awc_uri: awc::http::Uri = request_parts + .uri + .to_string() + .parse() + .with_context(|| format!("Invalid OTLP collector URI: {}", request_parts.uri))?; + + let mut awc_request = awc_client.request(awc_method, awc_uri.clone()); + for (header_name, header_value) in &request_parts.headers { + let header_name_str = header_name.as_str(); + let awc_header_name = awc::http::header::HeaderName::from_bytes(header_name_str.as_bytes()) + .with_context(|| format!("Invalid OTLP header name: {header_name_str}"))?; + let awc_header_value = awc::http::header::HeaderValue::from_bytes(header_value.as_bytes()) + .with_context(|| format!("Invalid OTLP header value for {header_name_str}"))?; + awc_request = awc_request.insert_header((awc_header_name, awc_header_value)); + } + + let mut awc_response = awc_request.send_body(request_body).await.map_err(|error| { + anyhow::anyhow!("Failed to send OTLP HTTP request to {awc_uri}: {error}") + })?; + + let mut response_builder = + opentelemetry_http::Response::builder().status(awc_response.status().as_u16()); + for (header_name, header_value) in awc_response.headers() { + let header_value = header_value.to_str().map_err(|error| { + anyhow::anyhow!( + "Invalid OTLP response header value for {}: {error}", + header_name.as_str() + ) + })?; + response_builder = response_builder.header(header_name.as_str(), header_value); + } + + let response_body = awc_response.body().await.map_err(|error| { + anyhow::anyhow!("Failed to read OTLP HTTP response body from {awc_uri}: {error}") + })?; + + response_builder + .body(response_body) + .context("Failed to build OTLP HTTP response") +} + +#[async_trait::async_trait] +impl opentelemetry_http::HttpClient for AwcOtlpHttpClient { + async fn send_bytes( + &self, + request: opentelemetry_http::Request, + ) -> Result< + opentelemetry_http::Response, + opentelemetry_http::HttpError, + > { + let (response_sender, response_receiver) = tokio::sync::oneshot::channel(); + self.sender + .send(OtlpHttpJob { + request, + response_sender, + }) + .map_err(|_| anyhow::anyhow!("OTLP AWC worker thread is unavailable"))?; + + response_receiver + .await + .map_err(|_| anyhow::anyhow!("OTLP AWC worker dropped response channel"))? + .map_err(Into::into) + } +} + /// Initializes logging / tracing. Returns whether `OTel` was activated. pub fn init_telemetry() -> anyhow::Result { init_telemetry_with_log_layer(logfmt::LogfmtLayer::new()) @@ -86,14 +225,18 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) -> anyhow::Result<()> { use opentelemetry::global; use opentelemetry::trace::TracerProvider as _; use opentelemetry_otlp::WithExportConfig as _; + use opentelemetry_otlp::WithHttpConfig as _; use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing_subscriber::layer::SubscriberExt; + let http_client = AwcOtlpHttpClient::new().context("Failed to initialize OTLP AWC client")?; + // W3C TraceContext propagation (traceparent header) global::set_text_map_propagator(TraceContextPropagator::new()); // OTLP exporter — reads OTEL_EXPORTER_OTLP_ENDPOINT, OTEL_SERVICE_NAME, etc. let exporter = opentelemetry_otlp::SpanExporter::builder() .with_http() + .with_http_client(http_client.clone()) .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) .build() .context("Failed to build OTLP span exporter")?; @@ -115,6 +258,7 @@ fn init_otel_tracing(logfmt_layer: logfmt::LogfmtLayer) -> anyhow::Result<()> { // OTLP Metric exporter let metric_exporter = opentelemetry_otlp::MetricExporter::builder() .with_http() + .with_http_client(http_client) .with_protocol(opentelemetry_otlp::Protocol::HttpBinary) .build() .context("Failed to build OTLP metric exporter")?; From 2fb1a568449c0fabe6efe97a1a17774fd0573c9a Mon Sep 17 00:00:00 2001 From: lovasoa Date: Mon, 16 Mar 2026 00:17:56 +0100 Subject: [PATCH 6/6] refactor: share awc client construction with telemetry --- src/telemetry.rs | 29 ++++++++++++++++++++++++++--- src/webserver/http_client.rs | 13 ++++++++++++- 2 files changed, 38 insertions(+), 4 deletions(-) diff --git a/src/telemetry.rs b/src/telemetry.rs index 040dae02..355cfda3 100644 --- a/src/telemetry.rs +++ b/src/telemetry.rs @@ -63,14 +63,31 @@ fn otlp_http_worker_sender() -> anyhow::Result anyhow::Result> { let (sender, mut receiver) = tokio::sync::mpsc::unbounded_channel::(); + let (init_result_sender, init_result_receiver) = std::sync::mpsc::sync_channel(1); + let use_system_root_ca_certificates = + crate::webserver::http_client::default_system_root_ca_certificates_from_env(); std::thread::Builder::new() .name("sqlpage-otlp-http".to_owned()) .spawn(move || { actix_web::rt::System::new().block_on(async move { - let awc_client = awc::Client::builder() - .add_default_header((awc::http::header::USER_AGENT, env!("CARGO_PKG_NAME"))) - .finish(); + let awc_client = + match crate::webserver::http_client::make_http_client_with_system_roots( + use_system_root_ca_certificates, + ) { + Ok(client) => { + let _ = init_result_sender.send(Ok(())); + client + } + Err(error) => { + let error = format!("Failed to initialize OTLP AWC client: {error:#}"); + let _ = init_result_sender.send(Err(error.clone())); + while let Some(job) = receiver.recv().await { + let _ = job.response_sender.send(Err(error.clone())); + } + return; + } + }; while let Some(job) = receiver.recv().await { let response = execute_otlp_http_request_with_awc(&awc_client, job.request) @@ -82,6 +99,12 @@ fn init_otlp_http_worker_sender() -> anyhow::Result {} + Ok(Err(error)) => anyhow::bail!("{error}"), + Err(error) => anyhow::bail!("OTLP AWC worker initialization channel closed: {error}"), + } + Ok(sender) } diff --git a/src/webserver/http_client.rs b/src/webserver/http_client.rs index 3160492a..d374a9ee 100644 --- a/src/webserver/http_client.rs +++ b/src/webserver/http_client.rs @@ -6,7 +6,18 @@ use std::sync::OnceLock; static NATIVE_CERTS: OnceLock> = OnceLock::new(); pub fn make_http_client(config: &crate::app_config::AppConfig) -> anyhow::Result { - let connector = if config.system_root_ca_certificates { + make_http_client_with_system_roots(config.system_root_ca_certificates) +} + +pub(crate) fn default_system_root_ca_certificates_from_env() -> bool { + std::env::var("SSL_CERT_FILE").is_ok_and(|value| !value.is_empty()) + || std::env::var("SSL_CERT_DIR").is_ok_and(|value| !value.is_empty()) +} + +pub(crate) fn make_http_client_with_system_roots( + system_root_ca_certificates: bool, +) -> anyhow::Result { + let connector = if system_root_ca_certificates { let roots = NATIVE_CERTS .get_or_init(|| { log::debug!("Loading native certificates because system_root_ca_certificates is enabled");