diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 5c0f013..a4734b1 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -234,6 +234,7 @@ async fn start_dogstatsd( windows_pipe_name, so_rcvbuf: None, buffer_size: None, + queue_size: None, }; #[cfg(not(all(windows, feature = "windows-pipes")))] @@ -243,6 +244,7 @@ async fn start_dogstatsd( metric_namespace, so_rcvbuf: None, buffer_size: None, + queue_size: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 0e89c42..11eae2d 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -8,7 +8,6 @@ //! namespacing, and forwards them to an aggregator for batching and shipping to Datadog. use std::net::SocketAddr; -use std::str::Split; use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; @@ -24,6 +23,10 @@ use {std::sync::Arc, tokio::io::AsyncReadExt, tokio::net::windows::named_pipe::S // Used for both UDP recv_from and Windows named pipe reads. const DEFAULT_BUFFER_SIZE: usize = 8192; +// Internal queue capacity between the socket reader and metric processor. +// At 8 KB per packet this caps user-space buffering at ~8 MB. +const DEFAULT_QUEUE_SIZE: usize = 1024; + /// Configuration for the DogStatsD server pub struct DogStatsDConfig { /// Host to bind UDP socket to (e.g., "127.0.0.1") @@ -42,6 +45,9 @@ pub struct DogStatsDConfig { /// Defaults to 8192. For UDP, the client must batch metrics into packets of /// this size for the increase to take effect. pub buffer_size: Option, + /// Internal queue capacity between the socket reader and metric processor. + /// Defaults to 1024. Increase if the processor can't keep up with burst traffic. + pub queue_size: Option, } /// Represents the source of a DogStatsD message. 
Varies by transport method. @@ -82,39 +88,35 @@ enum BufferReader { } impl BufferReader { - /// This is the main entry point for receiving metric data. - /// Note: Different transports have different blocking behaviors. - async fn read(&self, buf_size: usize) -> std::io::Result<(Vec, MessageSource)> { + /// Reads one packet into the provided buffer, waiting if none is available. + /// Returns `(bytes_read, source)`. The caller owns the buffer and can + /// pass it through a channel without copying. + async fn read_into(&mut self, buf: &mut [u8]) -> std::io::Result<(usize, MessageSource)> { match self { BufferReader::UdpSocket(socket) => { - // UDP socket: blocks until a packet arrives - let mut buf = vec![0u8; buf_size]; - - #[allow(clippy::expect_used)] - let (amt, src) = socket - .recv_from(&mut buf) - .await - .expect("didn't receive data"); - Ok((buf[..amt].to_owned(), MessageSource::Network(src))) + let (amt, src) = socket.recv_from(buf).await?; + Ok((amt, MessageSource::Network(src))) } - BufferReader::MirrorTest(data, socket) => { - // Mirror Reader: returns immediately with stored data - Ok((data.clone(), MessageSource::Network(*socket))) + BufferReader::MirrorTest(data, addr) => { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + Ok((len, MessageSource::Network(*addr))) } #[cfg(all(windows, feature = "windows-pipes"))] BufferReader::NamedPipe { pipe_name, receiver, - } => { - // Named Pipe Reader: receives data from client handler tasks - match receiver.lock().await.recv().await { - Some(data) => Ok((data, MessageSource::NamedPipe(pipe_name.clone()))), - None => { - // Channel closed - server exited, already triggered cancellation - Ok((Vec::new(), MessageSource::NamedPipe(pipe_name.clone()))) - } + } => match receiver.lock().await.recv().await { + Some(data) => { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + Ok((len, MessageSource::NamedPipe(pipe_name.clone()))) } - } + None => 
Err(std::io::Error::new( + std::io::ErrorKind::ConnectionReset, + "named pipe channel closed", + )), + }, } } } @@ -126,6 +128,7 @@ pub struct DogStatsD { buffer_reader: BufferReader, metric_namespace: Option, buf_size: usize, + queue_size: usize, } impl DogStatsD { @@ -201,90 +204,242 @@ impl DogStatsD { BufferReader::UdpSocket(socket) }; + let queue_size = match config.queue_size { + Some(0) => { + error!( + "DogStatsD queue_size cannot be 0, falling back to default ({})", + DEFAULT_QUEUE_SIZE + ); + DEFAULT_QUEUE_SIZE + } + Some(size) => size, + None => DEFAULT_QUEUE_SIZE, + }; + DogStatsD { cancel_token, aggregator_handle, buffer_reader, metric_namespace: config.metric_namespace.clone(), buf_size, + queue_size, } } - /// Main event loop that continuously receives and processes metrics. + /// Starts the DogStatsD server with a dedicated reader task. + /// + /// Spawns a reader task that drains the socket into a bounded channel + /// with `queue_size` capacity, while this task processes packets from the + /// channel. + /// + /// This decoupling prevents packet loss when the OS `SO_RCVBUF` + /// is small (or capped by the OS) by moving buffering into + /// user space. + /// + /// If the queue fills up, the reader drops packets rather + /// than blocking. pub async fn spin(self) { - let mut spin_cancelled = false; - while !spin_cancelled { - self.consume_statsd().await; - spin_cancelled = self.cancel_token.is_cancelled(); - } + let DogStatsD { + cancel_token, + aggregator_handle, + buffer_reader, + metric_namespace, + buf_size, + queue_size, + } = self; + + let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size); + + let reader_token = cancel_token.clone(); + tokio::spawn(async move { + read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await; + }); + + process_loop( + &mut rx, + &cancel_token, + &aggregator_handle, + metric_namespace.as_deref(), + ) + .await; } - /// Receive one batch of metrics from the transport layer and process them. 
- async fn consume_statsd(&self) { + /// Process a single buffer from the transport. Used by tests via `MirrorTest`. + #[cfg(test)] + async fn consume_statsd(&mut self) { + let mut buf = vec![0u8; self.buf_size]; #[allow(clippy::expect_used)] - let (buf, src) = self + let (len, src) = self .buffer_reader - .read(self.buf_size) + .read_into(&mut buf) .await .expect("didn't receive data"); + process_packet( + &buf[..len], + &src, + &self.aggregator_handle, + self.metric_namespace.as_deref(), + ); + } +} - // Skip empty buffers (e.g., from channel close) - if buf.is_empty() { - debug!("Received empty buffer from {}, skipping", src); - return; +/// Drains the transport into the channel as fast as possible. +/// +/// Allocates a fresh buffer for each read. On queue full, drops the packet +/// and logs at power-of-two intervals (matching Go agent behavior). +async fn read_loop( + mut reader: BufferReader, + tx: tokio::sync::mpsc::Sender<(Vec, MessageSource)>, + cancel: tokio_util::sync::CancellationToken, + buf_size: usize, + queue_capacity: usize, +) { + let mut dropped: u64 = 0; + loop { + let mut buf = vec![0u8; buf_size]; + let result = tokio::select! 
{ + r = reader.read_into(&mut buf) => r, + _ = cancel.cancelled() => break, + }; + let (len, source) = match result { + Ok(r) => r, + Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => { + debug!("DogStatsD transport closed: {}", e); + break; + } + Err(e) => { + error!("DogStatsD read error: {}", e); + continue; + } + }; + if len == 0 { + continue; + } + buf.truncate(len); + match tx.try_send((buf, source)) { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + dropped += 1; + if dropped.is_power_of_two() { + debug!("DogStatsD queue full, {} packets dropped so far", dropped); + } + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break, } - - #[allow(clippy::expect_used)] - let msgs = std::str::from_utf8(&buf).expect("couldn't parse as string"); - trace!("Received message: {} from {}", msgs, src); - let statsd_metric_strings = msgs.split('\n'); - self.insert_metrics(statsd_metric_strings); } - - fn prepend_namespace(namespace: &str, metric: &mut Metric) { - let new_name = format!("{}.{}", namespace, metric.name); - metric.name = ustr::Ustr::from(&new_name); - metric.id = id(metric.name, &metric.tags, metric.timestamp); + if dropped > 0 { + error!( + "DogStatsD reader exiting, {} total packets dropped (queue capacity: {})", + dropped, queue_capacity + ); } +} - fn insert_metrics(&self, msg: Split) { - let namespace = self.metric_namespace.as_deref(); - let all_valid_metrics: Vec = msg - .filter(|m| { - !m.is_empty() - && !m.starts_with("_sc|") - && !m.starts_with("_e{") - // todo(serverless): remove this hack, and create a blocklist for metrics - // or another mechanism for this. 
- // - // avoid metric duplication with lambda layer - && !m.starts_with("aws.lambda.enhanced.invocations") - }) // exclude empty messages, service checks, and events - .map(|m| m.replace('\n', "")) - .filter_map(|m| match parse(m.as_str()) { - Ok(metric) => Some(metric), - Err(e) => { - // unsupported type is quite common with dd_trace metrics. Avoid perf issue and - // log spam in that case - match e { - UnsupportedType(_) => debug!("Unsupported metric type: {}. {}", m, e), - _ => error!("Failed to parse metric {}: {}", m, e), +/// Receives packets from the channel, parses them, and forwards metrics +/// to the aggregator. On cancellation, drains any remaining buffered +/// packets before exiting so no already-read data is lost. +async fn process_loop( + rx: &mut tokio::sync::mpsc::Receiver<(Vec, MessageSource)>, + cancel: &tokio_util::sync::CancellationToken, + aggregator: &AggregatorHandle, + namespace: Option<&str>, +) { + loop { + tokio::select! { + packet = rx.recv() => { + match packet { + Some((buf, src)) => { + process_packet(&buf, &src, aggregator, namespace); } - None + None => break, + } + } + _ = cancel.cancelled() => { + while let Ok((buf, src)) = rx.try_recv() { + process_packet(&buf, &src, aggregator, namespace); } - }) - .map(|mut metric| { - if let Some(ns) = namespace { - Self::prepend_namespace(ns, &mut metric); + break; + } + } + } +} + +/// Decodes a raw packet (UDP or named pipe) into metric strings and sends them to the aggregator.
+fn process_packet( + buf: &[u8], + src: &MessageSource, + aggregator: &AggregatorHandle, + namespace: Option<&str>, +) { + if buf.is_empty() { + debug!("Received empty buffer from {}, skipping", src); + return; + } + + trace!("Received packet: {} bytes from {}", buf.len(), src); + + #[allow(clippy::expect_used)] + let msgs = std::str::from_utf8(buf).expect("couldn't parse as string"); + trace!("Received message: {} from {}", msgs, src); + let statsd_metric_strings: Vec<&str> = msgs.split('\n').collect(); + let metric_count_in_packet = statsd_metric_strings + .iter() + .filter(|m| !m.is_empty()) + .count(); + trace!("Packet contains {} metric strings", metric_count_in_packet); + insert_metrics(statsd_metric_strings.into_iter(), aggregator, namespace); +} + +fn prepend_namespace(namespace: &str, metric: &mut Metric) { + let new_name = format!("{}.{}", namespace, metric.name); + metric.name = ustr::Ustr::from(&new_name); + metric.id = id(metric.name, &metric.tags, metric.timestamp); +} + +/// Parses metric strings, applies optional namespace, and sends them to the aggregator. +fn insert_metrics<'a, I>(msg: I, aggregator: &AggregatorHandle, namespace: Option<&str>) +where + I: Iterator, +{ + let all_valid_metrics: Vec = msg + .filter(|m| { + !m.is_empty() + && !m.starts_with("_sc|") + && !m.starts_with("_e{") + // todo(serverless): remove this hack, and create a blocklist for metrics + // or another mechanism for this. + // + // avoid metric duplication with lambda layer + && !m.starts_with("aws.lambda.enhanced.invocations") + }) // exclude empty messages, service checks, and events + .map(|m| m.replace('\n', "")) + .filter_map(|m| match parse(m.as_str()) { + Ok(metric) => Some(metric), + Err(e) => { + // unsupported type is quite common with dd_trace metrics. Avoid perf issue and + // log spam in that case + match e { + UnsupportedType(_) => debug!("Unsupported metric type: {}. 
{}", m, e), + _ => error!("Failed to parse metric {}: {}", m, e), } - metric - }) - .collect(); - if !all_valid_metrics.is_empty() { - // Send metrics through the channel - no lock needed! - if let Err(e) = self.aggregator_handle.insert_batch(all_valid_metrics) { - error!("Failed to send metrics to aggregator: {}", e); + None } + }) + .map(|mut metric| { + if let Some(ns) = namespace { + prepend_namespace(ns, &mut metric); + } + metric + }) + .collect(); + if !all_valid_metrics.is_empty() { + debug!( + "Parsed {} valid metrics, sending to aggregator", + all_valid_metrics.len() + ); + // Send metrics through the channel - no lock needed! + if let Err(e) = aggregator.insert_batch(all_valid_metrics) { + error!("Failed to send metrics to aggregator: {}", e); } } } @@ -462,6 +617,8 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tracing_test::traced_test; + use super::*; + #[tokio::test] async fn test_dogstatsd_multi_distribution() { let response = setup_and_consume_dogstatsd( @@ -585,7 +742,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d let service_task = tokio::spawn(service.run()); let cancel_token = tokio_util::sync::CancellationToken::new(); - let dogstatsd = DogStatsD { + let mut dogstatsd = DogStatsD { cancel_token, aggregator_handle: handle.clone(), buffer_reader: BufferReader::MirrorTest( @@ -594,6 +751,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d ), metric_namespace: None, buf_size: custom_buf_size, + queue_size: DEFAULT_QUEUE_SIZE, }; dogstatsd.consume_statsd().await; @@ -621,6 +779,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d windows_pipe_name: None, so_rcvbuf: None, buffer_size: Some(0), + queue_size: None, }; let dogstatsd = DogStatsD::new(&config, handle.clone(), cancel_token).await; @@ -643,7 +802,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d let cancel_token = 
tokio_util::sync::CancellationToken::new(); - let dogstatsd = DogStatsD { + let mut dogstatsd = DogStatsD { cancel_token, aggregator_handle: handle.clone(), buffer_reader: BufferReader::MirrorTest( @@ -651,7 +810,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), ), metric_namespace, - buf_size: super::DEFAULT_BUFFER_SIZE, + buf_size: DEFAULT_BUFFER_SIZE, + queue_size: DEFAULT_QUEUE_SIZE, }; dogstatsd.consume_statsd().await; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 1f12930..1b242ca 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -112,6 +112,7 @@ async fn start_dogstatsd_on_port( windows_pipe_name: None, so_rcvbuf: None, buffer_size, + queue_size: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( @@ -398,6 +399,7 @@ async fn test_named_pipe_basic_communication() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token, @@ -454,6 +456,7 @@ async fn test_named_pipe_disconnect_reconnect() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone, @@ -525,6 +528,7 @@ async fn test_named_pipe_cancellation() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone, @@ -570,6 +574,7 @@ async fn test_buffer_split_message() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone,