diff --git a/bottlecap/Cargo.lock b/bottlecap/Cargo.lock index 75137ab9a..298487f95 100644 --- a/bottlecap/Cargo.lock +++ b/bottlecap/Cargo.lock @@ -1761,10 +1761,11 @@ checksum = "459427e2af2b9c839b132acb702a1c654d95e10f8c326bfc2ad11310e458b1c5" [[package]] name = "libdd-common" -version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +version = "1.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "anyhow", + "bytes", "cc", "const_format", "futures", @@ -1795,15 +1796,15 @@ dependencies = [ [[package]] name = "libdd-ddsketch" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "prost 0.14.3", ] [[package]] name = "libdd-tinybytes" -version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +version = "1.1.0" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "serde", ] @@ -1811,7 +1812,7 @@ dependencies = [ [[package]] name = "libdd-trace-normalization" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "anyhow", "libdd-trace-protobuf", @@ -1820,7 +1821,7 @@ dependencies = [ [[package]] name = "libdd-trace-obfuscation" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "anyhow", "libdd-common", @@ -1837,7 +1838,7 @@ dependencies = [ [[package]] name = "libdd-trace-protobuf" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "prost 0.14.3", "serde", @@ -1847,7 +1848,7 @@ dependencies = [ [[package]] name = "libdd-trace-stats" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "hashbrown 0.15.5", "libdd-ddsketch", @@ -1858,15 +1859,15 @@ dependencies = [ [[package]] name = "libdd-trace-utils" version = "1.0.0" -source = "git+https://github.com/DataDog/libdatadog?rev=158b59471f1132e3cb36023fa3c46ccb2dd0eda1#158b59471f1132e3cb36023fa3c46ccb2dd0eda1" +source = "git+https://github.com/DataDog/libdatadog?rev=c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09#c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" dependencies = [ "anyhow", "bytes", "flate2", "futures", "http 1.4.0", + "http-body 1.0.1", "http-body-util", - "hyper 1.8.1", "indexmap 2.13.0", "libdd-common", "libdd-tinybytes", diff --git a/bottlecap/Cargo.toml b/bottlecap/Cargo.toml index b7b825b96..abc4ea306 100644 --- a/bottlecap/Cargo.toml +++ b/bottlecap/Cargo.toml @@ -67,12 +67,12 @@ indexmap = {version = "2.11.0", default-features = false} # be found in the clippy.toml file adjacent to this Cargo.toml. datadog-protos = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "f863626dbfe3c59bb390985fa6530b0621c2a0a2"} ddsketch-agent = { version = "0.1.0", default-features = false, git = "https://github.com/DataDog/saluki/", rev = "f863626dbfe3c59bb390985fa6530b0621c2a0a2"} -libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" } -libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" } -libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" , features = ["mini_agent"] } -libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" } -libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" } -libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "158b59471f1132e3cb36023fa3c46ccb2dd0eda1" } +libdd-common = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } +libdd-trace-protobuf = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } +libdd-trace-utils = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" , features = ["mini_agent"] } +libdd-trace-normalization = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } +libdd-trace-obfuscation = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } +libdd-trace-stats = { git = "https://github.com/DataDog/libdatadog", rev = "c8121f422d2c8d219f8d421ff3cdb1fcbe9e8b09" } dogstatsd = { git = "https://github.com/DataDog/serverless-components", rev = "2eb009a59ed07ffcaf174db1c31af413365e9bc6", default-features = false } datadog-fips = { git = "https://github.com/DataDog/serverless-components", rev = "2eb009a59ed07ffcaf174db1c31af413365e9bc6", default-features = false } libddwaf = { version = "1.28.1", git = "https://github.com/DataDog/libddwaf-rust", rev = "d1534a158d976bd4f747bf9fcc58e0712d2d17fc", default-features = false, features = ["serde"] } diff --git a/bottlecap/src/traces/hyper_client.rs b/bottlecap/src/traces/http_client.rs similarity index 85% rename from bottlecap/src/traces/hyper_client.rs rename to bottlecap/src/traces/http_client.rs index 99cd60fe5..c4638c35c 100644 --- a/bottlecap/src/traces/hyper_client.rs +++ b/bottlecap/src/traces/http_client.rs @@ -1,14 +1,14 @@ // Copyright 2023-Present Datadog, Inc. https://www.datadoghq.com/ // SPDX-License-Identifier: Apache-2.0 -//! Hyper-based HTTP client for trace and stats flushers. +//! HTTP client for trace and stats flushers. //! //! This module provides the HTTP client type required by `libdd_trace_utils` //! for sending traces and stats to Datadog intake endpoints. use hyper_http_proxy; use hyper_rustls::HttpsConnectorBuilder; -use libdd_common::{GenericHttpClient, hyper_migration}; +use libdd_common::{GenericHttpClient, http_common}; use rustls::RootCertStore; use rustls_pki_types::CertificateDer; use std::error::Error; @@ -20,7 +20,7 @@ use tracing::debug; /// Type alias for the HTTP client used by trace and stats flushers. /// /// This is the client type expected by `libdd_trace_utils::SendData::send()`. -pub type HyperClient = +pub type HttpClient = GenericHttpClient>; /// Initialize the crypto provider needed for setting custom root certificates. @@ -35,7 +35,7 @@ fn ensure_crypto_provider_initialized() { let () = &*INIT_CRYPTO_PROVIDER; } -/// Creates a new hyper-based HTTP client with the given configuration. +/// Creates a new HTTP client with the given configuration. /// /// This client is compatible with `libdd_trace_utils` and supports: /// - HTTPS proxy configuration @@ -54,7 +54,7 @@ fn ensure_crypto_provider_initialized() { pub fn create_client( proxy_https: Option<&String>, tls_cert_file: Option<&String>, -) -> Result> { +) -> Result> { // Create the base connector with optional custom TLS config let connector = if let Some(ca_cert_path) = tls_cert_file { // Ensure crypto provider is initialized before creating TLS config @@ -84,10 +84,7 @@ pub fn create_client( .enable_http1() .build(); - debug!( - "HYPER_CLIENT | Added root certificate from {}", - ca_cert_path - ); + debug!("HTTP_CLIENT | Added root certificate from {}", ca_cert_path); // Construct the Connector::Https variant directly libdd_common::connector::Connector::Https(https_connector) @@ -100,14 +97,14 @@ pub fn create_client( let proxy = hyper_http_proxy::Proxy::new(hyper_http_proxy::Intercept::Https, proxy.parse()?); let proxy_connector = hyper_http_proxy::ProxyConnector::from_proxy(connector, proxy)?; - let client = hyper_migration::client_builder().build(proxy_connector); + let client = http_common::client_builder().build(proxy_connector); debug!( - "HYPER_CLIENT | Proxy connector created with proxy: {:?}", + "HTTP_CLIENT | Proxy connector created with proxy: {:?}", proxy_https ); Ok(client) } else { let proxy_connector = hyper_http_proxy::ProxyConnector::new(connector)?; - Ok(hyper_migration::client_builder().build(proxy_connector)) + Ok(http_common::client_builder().build(proxy_connector)) } } diff --git a/bottlecap/src/traces/mod.rs b/bottlecap/src/traces/mod.rs index 7d1581c2e..2efc1de65 100644 --- a/bottlecap/src/traces/mod.rs +++ b/bottlecap/src/traces/mod.rs @@ -2,7 +2,7 @@ // SPDX-License-Identifier: Apache-2.0 pub mod context; -pub mod hyper_client; +pub mod http_client; pub mod propagation; pub mod proxy_aggregator; pub mod proxy_flusher; diff --git a/bottlecap/src/traces/stats_flusher.rs b/bottlecap/src/traces/stats_flusher.rs index f860ff763..5a3663cd2 100644 --- a/bottlecap/src/traces/stats_flusher.rs +++ b/bottlecap/src/traces/stats_flusher.rs @@ -8,7 +8,7 @@ use tokio::sync::OnceCell; use crate::config; use crate::lifecycle::invocation::processor::S_TO_MS; -use crate::traces::hyper_client::{self, HyperClient}; +use crate::traces::http_client::{self, HttpClient}; use crate::traces::stats_aggregator::StatsAggregator; use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; @@ -24,7 +24,7 @@ pub struct StatsFlusher { /// Cached HTTP client, lazily initialized on first use. /// TODO: `StatsFlusher` and `TraceFlusher` both hit trace.agent.datadoghq.{site} and could /// share a single HTTP client for better connection pooling. - http_client: OnceCell, + http_client: OnceCell, } impl StatsFlusher { @@ -72,6 +72,7 @@ impl StatsFlusher { api_key: Some(api_key_clone.into()), timeout_ms: self.config.flush_timeout * S_TO_MS, test_token: None, + use_system_resolver: false, } } }) @@ -176,11 +177,11 @@ impl StatsFlusher { /// /// Returns `None` if client creation fails. The error is logged but not cached, /// allowing retry on subsequent calls. - async fn get_or_init_http_client(&self) -> Option<&HyperClient> { + async fn get_or_init_http_client(&self) -> Option<&HttpClient> { match self .http_client .get_or_try_init(|| async { - hyper_client::create_client( + http_client::create_client( self.config.proxy_https.as_ref(), self.config.tls_cert_file.as_ref(), ) diff --git a/bottlecap/src/traces/stats_processor.rs b/bottlecap/src/traces/stats_processor.rs index 86d3d81ff..32bd76792 100644 --- a/bottlecap/src/traces/stats_processor.rs +++ b/bottlecap/src/traces/stats_processor.rs @@ -12,12 +12,12 @@ use axum::{ use tokio::sync::mpsc::Sender; use tracing::{debug, error}; -use libdd_common::hyper_migration; +use libdd_common::http_common; use libdd_trace_protobuf::pb; use libdd_trace_utils::stats_utils; -use super::trace_agent::MAX_CONTENT_LENGTH; use crate::http::extract_request_body; +use crate::traces::trace_agent::MAX_CONTENT_LENGTH; #[async_trait] pub trait StatsProcessor { @@ -66,7 +66,7 @@ impl StatsProcessor for ServerlessStatsProcessor { // deserialize trace stats from the request body, convert to protobuf structs (see // trace-protobuf crate) let mut stats: pb::ClientStatsPayload = - match stats_utils::get_stats_from_request_body(hyper_migration::Body::from_bytes(body)) + match stats_utils::get_stats_from_request_body(http_common::Body::from_bytes(body)) .await { Ok(result) => result, diff --git a/bottlecap/src/traces/trace_agent.rs b/bottlecap/src/traces/trace_agent.rs index 96a16520e..4035e24e8 100644 --- a/bottlecap/src/traces/trace_agent.rs +++ b/bottlecap/src/traces/trace_agent.rs @@ -43,7 +43,7 @@ use crate::{ trace_processor, }, }; -use libdd_common::hyper_migration; +use libdd_common::http_common; use libdd_trace_protobuf::pb; use libdd_trace_utils::trace_utils::{self}; @@ -503,22 +503,22 @@ impl TraceAgent { let tracer_header_tags = (&parts.headers).into(); - let (body_size, mut traces) = match version { - ApiVersion::V04 => match trace_utils::get_traces_from_request_body( - hyper_migration::Body::from_bytes(body), - ) - .await - { - Ok(result) => result, - Err(err) => { - return error_response( - StatusCode::INTERNAL_SERVER_ERROR, - format!("Error deserializing trace from request body: {err}"), - ); + let (body_size, mut traces): (usize, Vec>) = match version { + ApiVersion::V04 => { + match trace_utils::get_traces_from_request_body(http_common::Body::from_bytes(body)) + .await + { + Ok(result) => result, + Err(err) => { + return error_response( + StatusCode::INTERNAL_SERVER_ERROR, + format!("Error deserializing trace from request body: {err}"), + ); + } } - }, + } ApiVersion::V05 => match trace_utils::get_v05_traces_from_request_body( - hyper_migration::Body::from_bytes(body), + http_common::Body::from_bytes(body), ) .await { diff --git a/bottlecap/src/traces/trace_aggregator.rs b/bottlecap/src/traces/trace_aggregator.rs index a17d54c7b..c16e8d59b 100644 --- a/bottlecap/src/traces/trace_aggregator.rs +++ b/bottlecap/src/traces/trace_aggregator.rs @@ -1,4 +1,5 @@ use libdd_trace_utils::send_data::SendDataBuilder; +use libdd_trace_utils::trace_utils::TracerHeaderTags; use std::collections::VecDeque; /// Maximum content size per payload uncompressed in bytes, @@ -7,16 +8,71 @@ use std::collections::VecDeque; /// pub const MAX_CONTENT_SIZE_BYTES: usize = 3_200_000; +/// Owned version of `TracerHeaderTags<'a>` so it can be stored across async +/// boundaries without lifetime issues. +pub struct OwnedTracerHeaderTags { + pub lang: String, + pub lang_version: String, + pub lang_interpreter: String, + pub lang_vendor: String, + pub tracer_version: String, + pub container_id: String, + pub client_computed_top_level: bool, + pub client_computed_stats: bool, + pub dropped_p0_traces: usize, + pub dropped_p0_spans: usize, +} + +impl From> for OwnedTracerHeaderTags { + fn from(tags: TracerHeaderTags<'_>) -> Self { + Self { + lang: tags.lang.to_string(), + lang_version: tags.lang_version.to_string(), + lang_interpreter: tags.lang_interpreter.to_string(), + lang_vendor: tags.lang_vendor.to_string(), + tracer_version: tags.tracer_version.to_string(), + container_id: tags.container_id.to_string(), + client_computed_top_level: tags.client_computed_top_level, + client_computed_stats: tags.client_computed_stats, + dropped_p0_traces: tags.dropped_p0_traces, + dropped_p0_spans: tags.dropped_p0_spans, + } + } +} + +impl OwnedTracerHeaderTags { + #[must_use] + pub fn to_tracer_header_tags(&self) -> TracerHeaderTags<'_> { + TracerHeaderTags { + lang: &self.lang, + lang_version: &self.lang_version, + lang_interpreter: &self.lang_interpreter, + lang_vendor: &self.lang_vendor, + tracer_version: &self.tracer_version, + container_id: &self.container_id, + client_computed_top_level: self.client_computed_top_level, + client_computed_stats: self.client_computed_stats, + dropped_p0_traces: self.dropped_p0_traces, + dropped_p0_spans: self.dropped_p0_spans, + } + } +} + // Bundle SendDataBuilder with payload size because SendDataBuilder doesn't // expose a getter for the size pub struct SendDataBuilderInfo { pub builder: SendDataBuilder, pub size: usize, + pub header_tags: OwnedTracerHeaderTags, } impl SendDataBuilderInfo { - pub fn new(builder: SendDataBuilder, size: usize) -> Self { - Self { builder, size } + pub fn new(builder: SendDataBuilder, size: usize, header_tags: OwnedTracerHeaderTags) -> Self { + Self { + builder, + size, + header_tags, + } } } @@ -25,7 +81,7 @@ impl SendDataBuilderInfo { pub struct TraceAggregator { queue: VecDeque, max_content_size_bytes: usize, - buffer: Vec, + buffer: Vec, } impl Default for TraceAggregator { @@ -55,7 +111,7 @@ impl TraceAggregator { } /// Returns a batch of trace payloads, subject to the max content size. - pub fn get_batch(&mut self) -> Vec { + pub fn get_batch(&mut self) -> Vec { let mut batch_size = 0; // Fill the batch @@ -70,7 +126,7 @@ impl TraceAggregator { break; } batch_size += payload_size; - self.buffer.push(payload_info.builder); + self.buffer.push(payload_info); } else { break; } @@ -95,10 +151,8 @@ mod tests { use super::*; - #[test] - fn test_add() { - let mut aggregator = TraceAggregator::default(); - let tracer_header_tags = TracerHeaderTags { + fn make_header_tags() -> TracerHeaderTags<'static> { + TracerHeaderTags { lang: "lang", lang_version: "lang_version", lang_interpreter: "lang_interpreter", @@ -109,43 +163,39 @@ mod tests { client_computed_stats: true, dropped_p0_traces: 0, dropped_p0_spans: 0, - }; - let size = 1; - let payload = SendDataBuilder::new( + } + } + + fn make_builder_info(size: usize) -> SendDataBuilderInfo { + let tracer_header_tags = make_header_tags(); + let builder = SendDataBuilder::new( size, TracerPayloadCollection::V07(Vec::new()), - tracer_header_tags, + tracer_header_tags.clone(), &Endpoint::from_slice("localhost"), ); + SendDataBuilderInfo::new( + builder, + size, + OwnedTracerHeaderTags::from(tracer_header_tags), + ) + } - aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); + #[test] + fn test_add() { + let mut aggregator = TraceAggregator::default(); + let size = 1; + + aggregator.add(make_builder_info(size)); assert_eq!(aggregator.queue.len(), 1); } #[test] fn test_get_batch() { let mut aggregator = TraceAggregator::default(); - let tracer_header_tags = TracerHeaderTags { - lang: "lang", - lang_version: "lang_version", - lang_interpreter: "lang_interpreter", - lang_vendor: "lang_vendor", - tracer_version: "tracer_version", - container_id: "container_id", - client_computed_top_level: true, - client_computed_stats: true, - dropped_p0_traces: 0, - dropped_p0_spans: 0, - }; let size = 1; - let payload = SendDataBuilder::new( - size, - TracerPayloadCollection::V07(Vec::new()), - tracer_header_tags, - &Endpoint::from_slice("localhost"), - ); - aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); + aggregator.add(make_builder_info(size)); assert_eq!(aggregator.queue.len(), 1); let batch = aggregator.get_batch(); assert_eq!(batch.len(), 1); @@ -154,30 +204,12 @@ mod tests { #[test] fn test_get_batch_full_entries() { let mut aggregator = TraceAggregator::new(2); - let tracer_header_tags = TracerHeaderTags { - lang: "lang", - lang_version: "lang_version", - lang_interpreter: "lang_interpreter", - lang_vendor: "lang_vendor", - tracer_version: "tracer_version", - container_id: "container_id", - client_computed_top_level: true, - client_computed_stats: true, - dropped_p0_traces: 0, - dropped_p0_spans: 0, - }; let size = 1; - let payload = SendDataBuilder::new( - size, - TracerPayloadCollection::V07(Vec::new()), - tracer_header_tags, - &Endpoint::from_slice("localhost"), - ); // Add 3 payloads - aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); - aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); - aggregator.add(SendDataBuilderInfo::new(payload.clone(), size)); + aggregator.add(make_builder_info(size)); + aggregator.add(make_builder_info(size)); + aggregator.add(make_builder_info(size)); // The batch should only contain the first 2 payloads let first_batch = aggregator.get_batch(); diff --git a/bottlecap/src/traces/trace_aggregator_service.rs b/bottlecap/src/traces/trace_aggregator_service.rs index b8ffc4a30..6c23faed4 100644 --- a/bottlecap/src/traces/trace_aggregator_service.rs +++ b/bottlecap/src/traces/trace_aggregator_service.rs @@ -1,4 +1,3 @@ -use libdd_trace_utils::send_data::SendDataBuilder; use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error}; @@ -8,7 +7,7 @@ use crate::traces::trace_aggregator::{ pub enum AggregatorCommand { InsertPayload(Box), - GetBatches(oneshot::Sender>>), + GetBatches(oneshot::Sender>>), Clear, Shutdown, } @@ -27,7 +26,7 @@ impl AggregatorHandle { .send(AggregatorCommand::InsertPayload(Box::new(payload_info))) } - pub async fn get_batches(&self) -> Result>, String> { + pub async fn get_batches(&self) -> Result>, String> { let (response_tx, response_rx) = oneshot::channel(); self.tx .send(AggregatorCommand::GetBatches(response_tx)) @@ -105,9 +104,11 @@ impl AggregatorService { #[allow(clippy::unwrap_used)] mod tests { use super::*; + use crate::traces::trace_aggregator::OwnedTracerHeaderTags; use libdd_common::Endpoint; use libdd_trace_utils::{ - trace_utils::TracerHeaderTags, tracer_payload::TracerPayloadCollection, + send_data::SendDataBuilder, trace_utils::TracerHeaderTags, + tracer_payload::TracerPayloadCollection, }; #[tokio::test] @@ -131,6 +132,7 @@ mod tests { dropped_p0_spans: 0, }; let size = 1; + let owned_tags = OwnedTracerHeaderTags::from(tracer_header_tags.clone()); let payload = SendDataBuilder::new( size, TracerPayloadCollection::V07(Vec::new()), @@ -139,7 +141,7 @@ mod tests { ); handle - .insert_payload(SendDataBuilderInfo::new(payload, size)) + .insert_payload(SendDataBuilderInfo::new(payload, size, owned_tags)) .unwrap(); let batches = handle.get_batches().await.unwrap(); diff --git a/bottlecap/src/traces/trace_flusher.rs b/bottlecap/src/traces/trace_flusher.rs index 811751d8b..d7610f46b 100644 --- a/bottlecap/src/traces/trace_flusher.rs +++ b/bottlecap/src/traces/trace_flusher.rs @@ -5,8 +5,9 @@ use dogstatsd::api_key::ApiKeyFactory; use libdd_common::Endpoint; use libdd_trace_utils::{ config_utils::trace_intake_url_prefixed, - send_data::SendDataBuilder, - trace_utils::{self, SendData}, + send_data::SendData, + trace_utils::{self}, + tracer_payload::TracerPayloadCollection, }; use std::str::FromStr; use std::sync::Arc; @@ -16,7 +17,7 @@ use tracing::{debug, error}; use crate::config::Config; use crate::lifecycle::invocation::processor::S_TO_MS; -use crate::traces::hyper_client::{self, HyperClient}; +use crate::traces::http_client::{self, HttpClient}; use crate::traces::trace_aggregator_service::AggregatorHandle; pub struct TraceFlusher { @@ -30,7 +31,7 @@ pub struct TraceFlusher { /// Cached HTTP client, lazily initialized on first use. /// TODO: `TraceFlusher` and `StatsFlusher` both hit trace.agent.datadoghq.{site} and could /// share a single HTTP client for better connection pooling. - http_client: OnceCell, + http_client: OnceCell, } impl TraceFlusher { @@ -53,6 +54,7 @@ impl TraceFlusher { api_key: Some(api_key.clone().into()), timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, + use_system_resolver: false, }; additional_endpoints.push(endpoint); } @@ -91,17 +93,12 @@ impl TraceFlusher { if let Some(traces) = failed_traces { // If we have traces from a previous failed attempt, try to send those first. - // TODO: Currently retries always go to the primary endpoint (None), even if the - // original failure was for an additional endpoint. This means traces that failed - // to send to additional endpoints will be retried to the primary endpoint instead. - // To fix this, we need to track which endpoint each failed trace was destined for, - // possibly by storing (Vec, Option) pairs in failed_batch. if !traces.is_empty() { debug!( "TRACES | Retrying to send {} previously failed batches", traces.len() ); - let retry_result = Self::send_traces(traces, None, http_client.clone()).await; + let retry_result = Self::send_traces(traces, http_client.clone()).await; if retry_result.is_some() { // Still failed, return to retry later return retry_result; @@ -120,32 +117,43 @@ impl TraceFlusher { let mut batch_tasks = JoinSet::new(); for trace_builders in all_batches { - let traces: Vec<_> = trace_builders + let traces_with_tags: Vec<_> = trace_builders .into_iter() - .map(|builder| builder.with_api_key(api_key.as_str())) - .map(SendDataBuilder::build) + .map(|info| { + let trace = info.builder.with_api_key(api_key.as_str()).build(); + (trace, info.header_tags) + }) .collect(); - // Send to PRIMARY endpoint (the default endpoint configured in the trace). - // Passing None means "use the endpoint already configured in the SendData". - let traces_clone = traces.clone(); - let client_clone = http_client.clone(); - batch_tasks - .spawn(async move { Self::send_traces(traces_clone, None, client_clone).await }); - // Send to ADDITIONAL endpoints for dual-shipping. - // Each additional endpoint gets the same traces, enabling multi-region delivery. + // Construct separate SendData objects per endpoint by cloning the inner + // V07 payload data (TracerPayload is Clone, but SendData is not). for endpoint in self.additional_endpoints.clone() { - let traces_clone = traces.clone(); + let additional_traces: Vec<_> = traces_with_tags + .iter() + .filter_map(|(trace, tags)| match trace.get_payloads() { + TracerPayloadCollection::V07(payloads) => Some(SendData::new( + trace.len(), + TracerPayloadCollection::V07(payloads.clone()), + tags.to_tracer_header_tags(), + &endpoint, + )), + // All payloads in the extension are V07 (produced by + // collect_pb_trace_chunks), so this branch is unreachable. + _ => None, + }) + .collect(); let client_clone = http_client.clone(); - batch_tasks.spawn(async move { - Self::send_traces(traces_clone, Some(endpoint), client_clone).await - }); + batch_tasks + .spawn(async move { Self::send_traces(additional_traces, client_clone).await }); } + + // Send to PRIMARY endpoint (moves traces into the task). + let traces: Vec<_> = traces_with_tags.into_iter().map(|(t, _)| t).collect(); + let client_clone = http_client.clone(); + batch_tasks.spawn(async move { Self::send_traces(traces, client_clone).await }); } // Collect failed traces from all endpoints (primary + additional). - // Note: We lose track of which endpoint each failure came from here. - // All failures are mixed together and will be retried to the primary endpoint only. while let Some(result) = batch_tasks.join_next().await { if let Ok(Some(mut failed)) = result { failed_batch.append(&mut failed); @@ -166,11 +174,11 @@ impl TraceFlusher { /// /// Returns `None` if client creation fails. The error is logged but not cached, /// allowing retry on subsequent calls. - async fn get_or_init_http_client(&self) -> Option { + async fn get_or_init_http_client(&self) -> Option { match self .http_client .get_or_try_init(|| async { - hyper_client::create_client( + http_client::create_client( self.config.proxy_https.as_ref(), self.config.tls_cert_file.as_ref(), ) @@ -187,21 +195,9 @@ impl TraceFlusher { /// Sends traces to the Datadog intake endpoint using the provided HTTP client. /// - /// # Arguments - /// - /// * `traces` - The traces to send - /// * `override_endpoint` - If `Some`, sends to this endpoint instead of the trace's - /// configured endpoint. Used for sending to additional endpoints. - /// * `http_client` - The HTTP client to use for sending - /// - /// # Returns - /// - /// Returns the traces back if there was an error sending them (for retry). - async fn send_traces( - traces: Vec, - override_endpoint: Option, - http_client: HyperClient, - ) -> Option> { + /// Each `SendData` is sent to its own configured target endpoint. + /// Returns the traces back (by value) if there was an error sending them (for retry). + async fn send_traces(traces: Vec, http_client: HttpClient) -> Option> { if traces.is_empty() { return None; } @@ -211,17 +207,11 @@ impl TraceFlusher { debug!("TRACES | Flushing {} traces", coalesced_traces.len()); for trace in &coalesced_traces { - let trace_to_send = match &override_endpoint { - Some(endpoint) => trace.with_endpoint(endpoint.clone()), - None => trace.clone(), - }; - - let send_result = trace_to_send.send(&http_client).await.last_result; + let send_result = trace.send(&http_client).await.last_result; if let Err(e) = send_result { error!("TRACES | Request failed: {e:?}"); - // Return the original traces for retry - return Some(coalesced_traces.clone()); + return Some(coalesced_traces); } } diff --git a/bottlecap/src/traces/trace_processor.rs b/bottlecap/src/traces/trace_processor.rs index 8b3cc3e03..6b0cdf789 100644 --- a/bottlecap/src/traces/trace_processor.rs +++ b/bottlecap/src/traces/trace_processor.rs @@ -31,8 +31,8 @@ use tokio::sync::mpsc::Sender; use tokio::sync::mpsc::error::SendError; use tracing::{debug, error}; -use super::stats_generator::StatsGenerator; -use super::trace_aggregator::SendDataBuilderInfo; +use crate::traces::stats_generator::StatsGenerator; +use crate::traces::trace_aggregator::{OwnedTracerHeaderTags, SendDataBuilderInfo}; #[derive(Clone)] #[allow(clippy::module_name_repetitions)] @@ -357,9 +357,25 @@ impl TraceProcessor for ServerlessTraceProcessor { api_key: None, timeout_ms: config.flush_timeout * S_TO_MS, test_token: None, + use_system_resolver: false, }; - let builder = SendDataBuilder::new(body_size, payload.clone(), header_tags, &endpoint) + // Clone inner V07 payloads for stats generation (TracerPayload is Clone, + // but TracerPayloadCollection is not). + let payloads_for_stats = match &payload { + TracerPayloadCollection::V07(payloads) => { + TracerPayloadCollection::V07(payloads.clone()) + } + other => { + error!("TRACE_PROCESSOR | Unexpected payload type for stats: {other:?}"); + TracerPayloadCollection::V07(vec![]) + } + }; + + let owned_header_tags = OwnedTracerHeaderTags::from(header_tags.clone()); + + // Move original payload into builder (no clone needed) + let builder = SendDataBuilder::new(body_size, payload, header_tags, &endpoint) .with_compression(Compression::Zstd(config.apm_config_compression_level)) .with_retry_strategy(RetryStrategy::new( 1, @@ -368,7 +384,10 @@ impl TraceProcessor for ServerlessTraceProcessor { None, )); - (SendDataBuilderInfo::new(builder, body_size), payload) + ( + SendDataBuilderInfo::new(builder, body_size, owned_header_tags), + payloads_for_stats, + ) } }