From 9df74780474844f9543d0f07138e49521661d447 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Tue, 17 Feb 2026 16:22:00 -0500 Subject: [PATCH 1/3] feat(dogstatsd): decouple socket reading from metric processing Split spin() into a dedicated reader task and processor task connected by a bounded channel. The reader drains the socket into user-space buffering (~8 MB channel vs ~416 KiB kernel SO_RCVBUF cap), while the processor handles parsing and aggregation independently. This prevents head-of-line blocking where processing time caused kernel buffer overflow and silent packet drops under high-volume metric bursts. - Add configurable queue_size (DD_DOGSTATSD_QUEUE_SIZE, default 1024) - Extract process_packet() and insert_metrics() as free functions - Add read_loop() with cancellation-aware tokio::select! on reads - Add process_loop() that drains remaining packets on shutdown - Handle named pipe ConnectionReset as clean transport close - Validate queue_size=0 with fallback to default Co-Authored-By: Claude Opus 4.6 --- crates/datadog-serverless-compat/src/main.rs | 2 + crates/dogstatsd/src/dogstatsd.rs | 392 ++++++++++++++----- crates/dogstatsd/tests/integration_test.rs | 5 + 3 files changed, 311 insertions(+), 88 deletions(-) diff --git a/crates/datadog-serverless-compat/src/main.rs b/crates/datadog-serverless-compat/src/main.rs index 5c0f013..a4734b1 100644 --- a/crates/datadog-serverless-compat/src/main.rs +++ b/crates/datadog-serverless-compat/src/main.rs @@ -234,6 +234,7 @@ async fn start_dogstatsd( windows_pipe_name, so_rcvbuf: None, buffer_size: None, + queue_size: None, }; #[cfg(not(all(windows, feature = "windows-pipes")))] @@ -243,6 +244,7 @@ async fn start_dogstatsd( metric_namespace, so_rcvbuf: None, buffer_size: None, + queue_size: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 0e89c42..cfa9090 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -8,13 +8,12 @@ //! namespacing, and forwards them to an aggregator for batching and shipping to Datadog. use std::net::SocketAddr; -use std::str::Split; use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; use crate::metric::{id, parse, Metric}; use socket2::{Domain, Protocol, Socket, Type}; -use tracing::{debug, error, trace}; +use tracing::{debug, error}; // Windows-specific imports #[cfg(all(windows, feature = "windows-pipes"))] @@ -24,6 +23,10 @@ use {std::sync::Arc, tokio::io::AsyncReadExt, tokio::net::windows::named_pipe::S // Used for both UDP recv_from and Windows named pipe reads. const DEFAULT_BUFFER_SIZE: usize = 8192; +// Internal queue capacity between the socket reader and metric processor. +// At 8 KB per packet this caps user-space buffering at ~8 MB. +const DEFAULT_QUEUE_SIZE: usize = 1024; + /// Configuration for the DogStatsD server pub struct DogStatsDConfig { /// Host to bind UDP socket to (e.g., "127.0.0.1") @@ -42,6 +45,9 @@ pub struct DogStatsDConfig { /// Defaults to 8192. For UDP, the client must batch metrics into packets of /// this size for the increase to take effect. pub buffer_size: Option, + /// Internal queue capacity between the socket reader and metric processor. + /// Defaults to 1024. Increase if the processor can't keep up with burst traffic. + pub queue_size: Option, } /// Represents the source of a DogStatsD message. Varies by transport method. @@ -82,39 +88,35 @@ enum BufferReader { } impl BufferReader { - /// This is the main entry point for receiving metric data. - /// Note: Different transports have different blocking behaviors. - async fn read(&self, buf_size: usize) -> std::io::Result<(Vec, MessageSource)> { + /// Reads one packet into the provided buffer, waiting if none is available. + /// Returns `(bytes_read, source)`. The caller owns the buffer and can + /// pass it through a channel without copying. + async fn read_into(&mut self, buf: &mut [u8]) -> std::io::Result<(usize, MessageSource)> { match self { BufferReader::UdpSocket(socket) => { - // UDP socket: blocks until a packet arrives - let mut buf = vec![0u8; buf_size]; - - #[allow(clippy::expect_used)] - let (amt, src) = socket - .recv_from(&mut buf) - .await - .expect("didn't receive data"); - Ok((buf[..amt].to_owned(), MessageSource::Network(src))) + let (amt, src) = socket.recv_from(buf).await?; + Ok((amt, MessageSource::Network(src))) } - BufferReader::MirrorTest(data, socket) => { - // Mirror Reader: returns immediately with stored data - Ok((data.clone(), MessageSource::Network(*socket))) + BufferReader::MirrorTest(data, addr) => { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + Ok((len, MessageSource::Network(*addr))) } #[cfg(all(windows, feature = "windows-pipes"))] BufferReader::NamedPipe { pipe_name, receiver, - } => { - // Named Pipe Reader: receives data from client handler tasks - match receiver.lock().await.recv().await { - Some(data) => Ok((data, MessageSource::NamedPipe(pipe_name.clone()))), - None => { - // Channel closed - server exited, already triggered cancellation - Ok((Vec::new(), MessageSource::NamedPipe(pipe_name.clone()))) - } + } => match receiver.lock().await.recv().await { + Some(data) => { + let len = data.len().min(buf.len()); + buf[..len].copy_from_slice(&data[..len]); + Ok((len, MessageSource::NamedPipe(pipe_name.clone()))) } - } + None => Err(std::io::Error::new( + std::io::ErrorKind::ConnectionReset, + "named pipe channel closed", + )), + }, } } } @@ -126,6 +128,7 @@ pub struct DogStatsD { buffer_reader: BufferReader, metric_namespace: Option, buf_size: usize, + queue_size: usize, } impl DogStatsD { @@ -201,90 +204,242 @@ impl DogStatsD { BufferReader::UdpSocket(socket) }; + let queue_size = match config.queue_size { + Some(0) => { + error!( + "DogStatsD queue_size cannot be 0, falling back to default ({})", + DEFAULT_QUEUE_SIZE + ); + DEFAULT_QUEUE_SIZE + } + Some(size) => size, + None => DEFAULT_QUEUE_SIZE, + }; + DogStatsD { cancel_token, aggregator_handle, buffer_reader, metric_namespace: config.metric_namespace.clone(), buf_size, + queue_size, } } - /// Main event loop that continuously receives and processes metrics. + /// Starts the DogStatsD server with a dedicated reader task. + /// + /// Spawns a reader task that drains the socket into a bounded channel + /// with `queue_size` capacity, while this task processes packets from the + /// channel. + /// + /// This decoupling prevents packet loss when the OS `SO_RCVBUF` + /// is small (or capped by the OS) by moving buffering into + /// user space. + /// + /// If the queue fills up, the reader drops packets rather + /// than blocking. pub async fn spin(self) { - let mut spin_cancelled = false; - while !spin_cancelled { - self.consume_statsd().await; - spin_cancelled = self.cancel_token.is_cancelled(); - } + let DogStatsD { + cancel_token, + aggregator_handle, + buffer_reader, + metric_namespace, + buf_size, + queue_size, + } = self; + + let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size); + + let reader_token = cancel_token.clone(); + tokio::spawn(async move { + read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await; + }); + + process_loop( + &mut rx, + &cancel_token, + &aggregator_handle, + metric_namespace.as_deref(), + ) + .await; } - /// Receive one batch of metrics from the transport layer and process them. - async fn consume_statsd(&self) { + /// Process a single buffer from the transport. Used by tests via `MirrorTest`. + #[cfg(test)] + async fn consume_statsd(&mut self) { + let mut buf = vec![0u8; self.buf_size]; #[allow(clippy::expect_used)] - let (buf, src) = self + let (len, src) = self .buffer_reader - .read(self.buf_size) + .read_into(&mut buf) .await .expect("didn't receive data"); + process_packet( + &buf[..len], + &src, + &self.aggregator_handle, + self.metric_namespace.as_deref(), + ); + } +} - // Skip empty buffers (e.g., from channel close) - if buf.is_empty() { - debug!("Received empty buffer from {}, skipping", src); - return; +/// Drains the transport into the channel as fast as possible. +/// +/// Allocates a fresh buffer for each read. On queue full, drops the packet +/// and logs at power-of-two intervals (matching Go agent behavior). +async fn read_loop( + mut reader: BufferReader, + tx: tokio::sync::mpsc::Sender<(Vec, MessageSource)>, + cancel: tokio_util::sync::CancellationToken, + buf_size: usize, + queue_capacity: usize, +) { + let mut dropped: u64 = 0; + loop { + let mut buf = vec![0u8; buf_size]; + let result = tokio::select! { + r = reader.read_into(&mut buf) => r, + _ = cancel.cancelled() => break, + }; + let (len, source) = match result { + Ok(r) => r, + Err(e) if e.kind() == std::io::ErrorKind::ConnectionReset => { + debug!("DogStatsD transport closed: {}", e); + break; + } + Err(e) => { + error!("DogStatsD read error: {}", e); + continue; + } + }; + if len == 0 { + continue; + } + buf.truncate(len); + match tx.try_send((buf, source)) { + Ok(()) => {} + Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { + dropped += 1; + if dropped.is_power_of_two() { + debug!("DogStatsD queue full, {} packets dropped so far", dropped); + } + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break, } - - #[allow(clippy::expect_used)] - let msgs = std::str::from_utf8(&buf).expect("couldn't parse as string"); - trace!("Received message: {} from {}", msgs, src); - let statsd_metric_strings = msgs.split('\n'); - self.insert_metrics(statsd_metric_strings); } - - fn prepend_namespace(namespace: &str, metric: &mut Metric) { - let new_name = format!("{}.{}", namespace, metric.name); - metric.name = ustr::Ustr::from(&new_name); - metric.id = id(metric.name, &metric.tags, metric.timestamp); + if dropped > 0 { + error!( + "DogStatsD reader exiting, {} total packets dropped (queue capacity: {})", + dropped, queue_capacity + ); } +} - fn insert_metrics(&self, msg: Split) { - let namespace = self.metric_namespace.as_deref(); - let all_valid_metrics: Vec = msg - .filter(|m| { - !m.is_empty() - && !m.starts_with("_sc|") - && !m.starts_with("_e{") - // todo(serverless): remove this hack, and create a blocklist for metrics - // or another mechanism for this. - // - // avoid metric duplication with lambda layer - && !m.starts_with("aws.lambda.enhanced.invocations") - }) // exclude empty messages, service checks, and events - .map(|m| m.replace('\n', "")) - .filter_map(|m| match parse(m.as_str()) { - Ok(metric) => Some(metric), - Err(e) => { - // unsupported type is quite common with dd_trace metrics. Avoid perf issue and - // log spam in that case - match e { - UnsupportedType(_) => debug!("Unsupported metric type: {}. {}", m, e), - _ => error!("Failed to parse metric {}: {}", m, e), +/// Receives packets from the channel, parses them, and forwards metrics +/// to the aggregator. On cancellation, drains any remaining buffered +/// packets before exiting so no already-read data is lost. +async fn process_loop( + rx: &mut tokio::sync::mpsc::Receiver<(Vec, MessageSource)>, + cancel: &tokio_util::sync::CancellationToken, + aggregator: &AggregatorHandle, + namespace: Option<&str>, +) { + loop { + tokio::select! { + packet = rx.recv() => { + match packet { + Some((buf, src)) => { + process_packet(&buf, &src, aggregator, namespace); } - None + None => break, + } + } + _ = cancel.cancelled() => { + while let Ok((buf, src)) = rx.try_recv() { + process_packet(&buf, &src, aggregator, namespace); } - }) - .map(|mut metric| { - if let Some(ns) = namespace { - Self::prepend_namespace(ns, &mut metric); + break; + } + } + } +} + +/// Decodes a raw UDP packet into metric strings and sends them to the aggregator. +fn process_packet( + buf: &[u8], + src: &MessageSource, + aggregator: &AggregatorHandle, + namespace: Option<&str>, +) { + if buf.is_empty() { + debug!("Received empty buffer from {}, skipping", src); + return; + } + + debug!("Received packet: {} bytes from {}", buf.len(), src); + + #[allow(clippy::expect_used)] + let msgs = std::str::from_utf8(buf).expect("couldn't parse as string"); + debug!("Received message: {} from {}", msgs, src); + let statsd_metric_strings: Vec<&str> = msgs.split('\n').collect(); + let metric_count_in_packet = statsd_metric_strings + .iter() + .filter(|m| !m.is_empty()) + .count(); + debug!("Packet contains {} metric strings", metric_count_in_packet); + insert_metrics(statsd_metric_strings.into_iter(), aggregator, namespace); +} + +fn prepend_namespace(namespace: &str, metric: &mut Metric) { + let new_name = format!("{}.{}", namespace, metric.name); + metric.name = ustr::Ustr::from(&new_name); + metric.id = id(metric.name, &metric.tags, metric.timestamp); +} + +/// Parses metric strings, applies optional namespace, and sends them to the aggregator. +fn insert_metrics<'a, I>(msg: I, aggregator: &AggregatorHandle, namespace: Option<&str>) +where + I: Iterator, +{ + let all_valid_metrics: Vec = msg + .filter(|m| { + !m.is_empty() + && !m.starts_with("_sc|") + && !m.starts_with("_e{") + // todo(serverless): remove this hack, and create a blocklist for metrics + // or another mechanism for this. + // + // avoid metric duplication with lambda layer + && !m.starts_with("aws.lambda.enhanced.invocations") + }) // exclude empty messages, service checks, and events + .map(|m| m.replace('\n', "")) + .filter_map(|m| match parse(m.as_str()) { + Ok(metric) => Some(metric), + Err(e) => { + // unsupported type is quite common with dd_trace metrics. Avoid perf issue and + // log spam in that case + match e { + UnsupportedType(_) => debug!("Unsupported metric type: {}. {}", m, e), + _ => error!("Failed to parse metric {}: {}", m, e), } - metric - }) - .collect(); - if !all_valid_metrics.is_empty() { - // Send metrics through the channel - no lock needed! - if let Err(e) = self.aggregator_handle.insert_batch(all_valid_metrics) { - error!("Failed to send metrics to aggregator: {}", e); + None + } + }) + .map(|mut metric| { + if let Some(ns) = namespace { + prepend_namespace(ns, &mut metric); } + metric + }) + .collect(); + if !all_valid_metrics.is_empty() { + debug!( + "Parsed {} valid metrics, sending to aggregator", + all_valid_metrics.len() + ); + // Send metrics through the channel - no lock needed! + if let Err(e) = aggregator.insert_batch(all_valid_metrics) { + error!("Failed to send metrics to aggregator: {}", e); } } } @@ -462,6 +617,8 @@ mod tests { use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tracing_test::traced_test; + use super::*; + #[tokio::test] async fn test_dogstatsd_multi_distribution() { let response = setup_and_consume_dogstatsd( @@ -571,6 +728,62 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d requested ); } + async fn test_dogstatsd_custom_buffer_size() { + // Use a large buffer to verify custom buf_size is wired through. + // The MirrorTest reader returns pre-stored data (ignoring buf_size), + // so this test ensures the buf_size field is properly wired into DogStatsD. + let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n"; + let custom_buf_size: usize = 16384; + + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); + let service_task = tokio::spawn(service.run()); + let cancel_token = tokio_util::sync::CancellationToken::new(); + + let dogstatsd = DogStatsD { + cancel_token, + aggregator_handle: handle.clone(), + buffer_reader: BufferReader::MirrorTest( + payload.as_bytes().to_vec(), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), + ), + metric_namespace: None, + buf_size: custom_buf_size, + }; + dogstatsd.consume_statsd().await; + + let response = handle.flush().await.expect("Failed to flush"); + assert_eq!(response.series.len(), 1); + assert_eq!(response.series[0].series.len(), 2); + + handle.shutdown().expect("Failed to shutdown"); + service_task.await.expect("Service task failed"); + } + + #[tokio::test] + #[traced_test] + async fn test_dogstatsd_zero_buffer_size_falls_back_to_default() { + let cancel_token = tokio_util::sync::CancellationToken::new(); + let (service, handle) = + AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); + tokio::spawn(service.run()); + + let config = super::DogStatsDConfig { + host: "127.0.0.1".to_string(), + port: 0, + metric_namespace: None, + #[cfg(all(windows, feature = "windows-pipes"))] + windows_pipe_name: None, + so_rcvbuf: None, + buffer_size: Some(0), + }; + + let dogstatsd = DogStatsD::new(&config, handle.clone(), cancel_token).await; + assert_eq!(dogstatsd.buf_size, super::DEFAULT_BUFFER_SIZE); + assert!(logs_contain("buffer_size cannot be 0")); + + handle.shutdown().expect("Failed to shutdown"); + } #[tokio::test] async fn test_dogstatsd_custom_buffer_size() { @@ -585,7 +798,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d let service_task = tokio::spawn(service.run()); let cancel_token = tokio_util::sync::CancellationToken::new(); - let dogstatsd = DogStatsD { + let mut dogstatsd = DogStatsD { cancel_token, aggregator_handle: handle.clone(), buffer_reader: BufferReader::MirrorTest( @@ -594,6 +807,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d ), metric_namespace: None, buf_size: custom_buf_size, + queue_size: DEFAULT_QUEUE_SIZE, }; dogstatsd.consume_statsd().await; @@ -621,6 +835,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d windows_pipe_name: None, so_rcvbuf: None, buffer_size: Some(0), + queue_size: None, }; let dogstatsd = DogStatsD::new(&config, handle.clone(), cancel_token).await; @@ -643,7 +858,7 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d let cancel_token = tokio_util::sync::CancellationToken::new(); - let dogstatsd = DogStatsD { + let mut dogstatsd = DogStatsD { cancel_token, aggregator_handle: handle.clone(), buffer_reader: BufferReader::MirrorTest( @@ -651,7 +866,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d SocketAddr::new(IpAddr::V4(Ipv4Addr::new(111, 112, 113, 114)), 0), ), metric_namespace, - buf_size: super::DEFAULT_BUFFER_SIZE, + buf_size: DEFAULT_BUFFER_SIZE, + queue_size: DEFAULT_QUEUE_SIZE, }; dogstatsd.consume_statsd().await; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 1f12930..1b242ca 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -112,6 +112,7 @@ async fn start_dogstatsd_on_port( windows_pipe_name: None, so_rcvbuf: None, buffer_size, + queue_size: None, }; let dogstatsd_cancel_token = tokio_util::sync::CancellationToken::new(); let dogstatsd_client = DogStatsD::new( @@ -398,6 +399,7 @@ async fn test_named_pipe_basic_communication() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token, @@ -454,6 +456,7 @@ async fn test_named_pipe_disconnect_reconnect() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone, @@ -525,6 +528,7 @@ async fn test_named_pipe_cancellation() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone, @@ -570,6 +574,7 @@ async fn test_buffer_split_message() { windows_pipe_name: Some(pipe_name.to_string()), so_rcvbuf: None, buffer_size: None, + queue_size: None, }, handle, cancel_token_clone, From 77265ece059fa2deb405895df094882dd445cca7 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Tue, 17 Feb 2026 16:22:53 -0500 Subject: [PATCH 2/3] remove duplicate test --- crates/dogstatsd/src/dogstatsd.rs | 56 ------------------------------- 1 file changed, 56 deletions(-) diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index cfa9090..f27a4c6 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -728,62 +728,6 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d requested ); } - async fn test_dogstatsd_custom_buffer_size() { - // Use a large buffer to verify custom buf_size is wired through. - // The MirrorTest reader returns pre-stored data (ignoring buf_size), - // so this test ensures the buf_size field is properly wired into DogStatsD. - let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n"; - let custom_buf_size: usize = 16384; - - let (service, handle) = - AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); - let service_task = tokio::spawn(service.run()); - let cancel_token = tokio_util::sync::CancellationToken::new(); - - let dogstatsd = DogStatsD { - cancel_token, - aggregator_handle: handle.clone(), - buffer_reader: BufferReader::MirrorTest( - payload.as_bytes().to_vec(), - SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 0), - ), - metric_namespace: None, - buf_size: custom_buf_size, - }; - dogstatsd.consume_statsd().await; - - let response = handle.flush().await.expect("Failed to flush"); - assert_eq!(response.series.len(), 1); - assert_eq!(response.series[0].series.len(), 2); - - handle.shutdown().expect("Failed to shutdown"); - service_task.await.expect("Service task failed"); - } - - #[tokio::test] - #[traced_test] - async fn test_dogstatsd_zero_buffer_size_falls_back_to_default() { - let cancel_token = tokio_util::sync::CancellationToken::new(); - let (service, handle) = - AggregatorService::new(EMPTY_TAGS, 1_024).expect("aggregator service creation failed"); - tokio::spawn(service.run()); - - let config = super::DogStatsDConfig { - host: "127.0.0.1".to_string(), - port: 0, - metric_namespace: None, - #[cfg(all(windows, feature = "windows-pipes"))] - windows_pipe_name: None, - so_rcvbuf: None, - buffer_size: Some(0), - }; - - let dogstatsd = DogStatsD::new(&config, handle.clone(), cancel_token).await; - assert_eq!(dogstatsd.buf_size, super::DEFAULT_BUFFER_SIZE); - assert!(logs_contain("buffer_size cannot be 0")); - - handle.shutdown().expect("Failed to shutdown"); - } #[tokio::test] async fn test_dogstatsd_custom_buffer_size() { From b1bdfd541f39f6af4f6617e274b784e0a557a391 Mon Sep 17 00:00:00 2001 From: Jordan Gonzalez <30836115+duncanista@users.noreply.github.com> Date: Tue, 17 Feb 2026 16:48:53 -0500 Subject: [PATCH 3/3] change some `debug` to `trace` --- crates/dogstatsd/src/dogstatsd.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index f27a4c6..11eae2d 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -13,7 +13,7 @@ use crate::aggregator_service::AggregatorHandle; use crate::errors::ParseError::UnsupportedType; use crate::metric::{id, parse, Metric}; use socket2::{Domain, Protocol, Socket, Type}; -use tracing::{debug, error}; +use tracing::{debug, error, trace}; // Windows-specific imports #[cfg(all(windows, feature = "windows-pipes"))] @@ -376,17 +376,17 @@ fn process_packet( return; } - debug!("Received packet: {} bytes from {}", buf.len(), src); + trace!("Received packet: {} bytes from {}", buf.len(), src); #[allow(clippy::expect_used)] let msgs = std::str::from_utf8(buf).expect("couldn't parse as string"); - debug!("Received message: {} from {}", msgs, src); + trace!("Received message: {} from {}", msgs, src); let statsd_metric_strings: Vec<&str> = msgs.split('\n').collect(); let metric_count_in_packet = statsd_metric_strings .iter() .filter(|m| !m.is_empty()) .count(); - debug!("Packet contains {} metric strings", metric_count_in_packet); + trace!("Packet contains {} metric strings", metric_count_in_packet); insert_metrics(statsd_metric_strings.into_iter(), aggregator, namespace); }