diff --git a/crates/dogstatsd/src/dogstatsd.rs b/crates/dogstatsd/src/dogstatsd.rs index 11eae2d..e2ab2a0 100644 --- a/crates/dogstatsd/src/dogstatsd.rs +++ b/crates/dogstatsd/src/dogstatsd.rs @@ -70,6 +70,21 @@ impl std::fmt::Display for MessageSource { } } +/// A packet read from the transport, carrying a pooled buffer. +/// The actual data is `buf[..len]` — the buffer itself is returned +/// to the pool after processing so it can be reused without allocation. +struct Packet { + buf: Vec, + len: usize, + source: MessageSource, +} + +impl Packet { + fn data(&self) -> &[u8] { + &self.buf[..self.len] + } +} + // BufferReader abstracts transport methods for metric data. enum BufferReader { /// UDP socket reader (cross-platform, default transport) @@ -119,6 +134,22 @@ impl BufferReader { }, } } + + /// Non-blocking read into the provided buffer. Returns `Ok(Some(...))` + /// if a packet is immediately available, `Ok(None)` if the socket would + /// block. Used after `read_into()` to drain all pending packets from the + /// kernel buffer without re-entering tokio's event loop. + fn try_read_into(&mut self, buf: &mut [u8]) -> std::io::Result> { + match self { + BufferReader::UdpSocket(socket) => match socket.try_recv_from(buf) { + Ok((amt, src)) => Ok(Some((amt, MessageSource::Network(src)))), + Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None), + Err(e) => Err(e), + }, + // Non-UDP transports don't support non-blocking reads + _ => Ok(None), + } + } } /// DogStatsD server to receive, parse, and forward metrics. @@ -249,10 +280,23 @@ impl DogStatsD { } = self; let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size); + // Buffer pool: processor returns used buffers, reader reuses them. + // Avoids a heap allocation + zero-fill per packet in steady state. + let (pool_tx, pool_rx) = tokio::sync::mpsc::unbounded_channel(); + let reader_pool_tx = pool_tx.clone(); let reader_token = cancel_token.clone(); tokio::spawn(async move { - read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await; + read_loop( + buffer_reader, + tx, + reader_token, + buf_size, + queue_size, + pool_rx, + reader_pool_tx, + ) + .await; }); process_loop( @@ -260,6 +304,7 @@ impl DogStatsD { &cancel_token, &aggregator_handle, metric_namespace.as_deref(), + pool_tx, ) .await; } @@ -285,21 +330,39 @@ impl DogStatsD { /// Drains the transport into the channel as fast as possible. /// -/// Allocates a fresh buffer for each read. On queue full, drops the packet -/// and logs at power-of-two intervals (matching Go agent behavior). +/// Buffers are drawn from a pool (or allocated on first use) and returned by +/// the processor after parsing. In steady state this means zero heap +/// allocations in the read loop — only a `recv_from` syscall + channel send. +/// +/// After each async `read_into()`, calls `try_read_into()` in a tight loop to +/// drain all packets already sitting in the kernel buffer without re-entering +/// tokio's event loop. +/// +/// Uses `is_cancelled()` as a fast-path check at the top of each iteration, +/// and `tokio::select!` on the async `read_into()` to ensure the reader exits +/// promptly even on a quiet socket (where `read_into` would block forever). async fn read_loop( mut reader: BufferReader, - tx: tokio::sync::mpsc::Sender<(Vec, MessageSource)>, + tx: tokio::sync::mpsc::Sender, cancel: tokio_util::sync::CancellationToken, buf_size: usize, queue_capacity: usize, + mut pool_rx: tokio::sync::mpsc::UnboundedReceiver>, + pool_tx: tokio::sync::mpsc::UnboundedSender>, ) { let mut dropped: u64 = 0; - loop { - let mut buf = vec![0u8; buf_size]; + while !cancel.is_cancelled() { + // Get a buffer from the pool, or allocate if the pool is empty (cold start). + let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]); + + // Async wait for the first packet, but remain cancellation-aware + // so the reader exits promptly on a quiet socket. let result = tokio::select! { r = reader.read_into(&mut buf) => r, - _ = cancel.cancelled() => break, + _ = cancel.cancelled() => { + let _ = pool_tx.send(buf); + break; + } }; let (len, source) = match result { Ok(r) => r, @@ -309,22 +372,45 @@ async fn read_loop( } Err(e) => { error!("DogStatsD read error: {}", e); + let _ = pool_tx.send(buf); continue; } }; if len == 0 { + let _ = pool_tx.send(buf); continue; } - buf.truncate(len); - match tx.try_send((buf, source)) { - Ok(()) => {} - Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { - dropped += 1; - if dropped.is_power_of_two() { - debug!("DogStatsD queue full, {} packets dropped so far", dropped); + if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) { + break; + } + + // Drain any packets already in the kernel buffer without going + // through tokio's event loop — just raw non-blocking syscalls. + loop { + let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]); + match reader.try_read_into(&mut buf) { + Ok(Some((0, _))) => { + let _ = pool_tx.send(buf); + } + Ok(Some((len, source))) => { + if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) { + break; + } + } + Ok(None) => { + let _ = pool_tx.send(buf); + break; + } + Err(e) => { + error!("DogStatsD read error: {}", e); + let _ = pool_tx.send(buf); + break; } } - Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break, + } + // If the channel was closed during batch drain, exit immediately. + if tx.is_closed() { + break; } } if dropped > 0 { @@ -335,28 +421,58 @@ async fn read_loop( } } +/// Sends a packet to the channel. Returns `false` if the channel is closed +/// (caller should exit the loop). On failure, returns the buffer to the pool +/// so it can be reused. +fn try_send_packet( + tx: &tokio::sync::mpsc::Sender, + packet: Packet, + dropped: &mut u64, + pool: &tokio::sync::mpsc::UnboundedSender>, +) -> bool { + match tx.try_send(packet) { + Ok(()) => true, + Err(tokio::sync::mpsc::error::TrySendError::Full(packet)) => { + let _ = pool.send(packet.buf); + *dropped += 1; + if dropped.is_power_of_two() { + debug!("DogStatsD queue full, {} packets dropped so far", *dropped); + } + true + } + Err(tokio::sync::mpsc::error::TrySendError::Closed(packet)) => { + let _ = pool.send(packet.buf); + false + } + } +} + /// Receives packets from the channel, parses them, and forwards metrics -/// to the aggregator. On cancellation, drains any remaining buffered -/// packets before exiting so no already-read data is lost. +/// to the aggregator. After processing each packet, returns the buffer to +/// the pool for reuse by the reader. On cancellation, drains any remaining +/// buffered packets before exiting so no already-read data is lost. async fn process_loop( - rx: &mut tokio::sync::mpsc::Receiver<(Vec, MessageSource)>, + rx: &mut tokio::sync::mpsc::Receiver, cancel: &tokio_util::sync::CancellationToken, aggregator: &AggregatorHandle, namespace: Option<&str>, + pool: tokio::sync::mpsc::UnboundedSender>, ) { loop { tokio::select! { packet = rx.recv() => { match packet { - Some((buf, src)) => { - process_packet(&buf, &src, aggregator, namespace); + Some(packet) => { + process_packet(packet.data(), &packet.source, aggregator, namespace); + let _ = pool.send(packet.buf); } None => break, } } _ = cancel.cancelled() => { - while let Ok((buf, src)) = rx.try_recv() { - process_packet(&buf, &src, aggregator, namespace); + while let Ok(packet) = rx.try_recv() { + process_packet(packet.data(), &packet.source, aggregator, namespace); + let _ = pool.send(packet.buf); } break; } @@ -732,8 +848,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d #[tokio::test] async fn test_dogstatsd_custom_buffer_size() { // Use a large buffer to verify custom buf_size is wired through. - // The MirrorTest reader returns pre-stored data (ignoring buf_size), - // so this test ensures the buf_size field is properly wired into DogStatsD. + // The MirrorTest reader copies data into the caller's buffer, so + // a payload that fits within the custom size should parse correctly. let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n"; let custom_buf_size: usize = 16384; diff --git a/crates/dogstatsd/tests/integration_test.rs b/crates/dogstatsd/tests/integration_test.rs index 1b242ca..839dab0 100644 --- a/crates/dogstatsd/tests/integration_test.rs +++ b/crates/dogstatsd/tests/integration_test.rs @@ -297,6 +297,121 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() { mock.assert_async().await; } +/// Verifies that `spin()` exits promptly when cancelled on a quiet socket +/// (no packets arriving). Without cancellation-aware awaiting in the reader, +/// the task would block on `recv_from` indefinitely. +#[cfg(test)] +#[tokio::test] +async fn test_spin_exits_on_cancellation_without_traffic() { + use dogstatsd::metric::SortedTags; + + let (service, handle) = + AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS) + .expect("aggregator service creation failed"); + tokio::spawn(service.run()); + + let cancel_token = CancellationToken::new(); + let dogstatsd = DogStatsD::new( + &DogStatsDConfig { + host: "127.0.0.1".to_string(), + port: 18128, + metric_namespace: None, + #[cfg(all(windows, feature = "windows-pipes"))] + windows_pipe_name: None, + so_rcvbuf: None, + buffer_size: None, + queue_size: None, + }, + handle.clone(), + cancel_token.clone(), + ) + .await; + + let spin_handle = tokio::spawn(async move { + dogstatsd.spin().await; + }); + + // Cancel immediately — no packets sent + cancel_token.cancel(); + + // spin() must exit within 500ms; if it blocks on recv_from this times out. + let result = timeout(Duration::from_millis(500), spin_handle).await; + assert!( + result.is_ok(), + "spin() should exit promptly after cancellation on a quiet socket" + ); + + handle.shutdown().expect("shutdown failed"); +} + +/// Verifies that when the internal queue is full, the server drops packets +/// rather than blocking, and still delivers the metrics it did accept. +/// Uses queue_size=1 so the queue saturates almost immediately. +#[cfg(test)] +#[tokio::test] +async fn test_queue_full_drops_packets_without_blocking() { + use dogstatsd::metric::SortedTags; + + let (service, handle) = + AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS) + .expect("aggregator service creation failed"); + tokio::spawn(service.run()); + + let cancel_token = CancellationToken::new(); + let dogstatsd = DogStatsD::new( + &DogStatsDConfig { + host: "127.0.0.1".to_string(), + port: 18129, + metric_namespace: None, + #[cfg(all(windows, feature = "windows-pipes"))] + windows_pipe_name: None, + so_rcvbuf: None, + buffer_size: None, + queue_size: Some(1), + }, + handle.clone(), + cancel_token.clone(), + ) + .await; + + let spin_handle = tokio::spawn(async move { + dogstatsd.spin().await; + }); + + sleep(Duration::from_millis(50)).await; + + // Fire 200 packets as fast as possible — with queue_size=1, most will be dropped. + let socket = UdpSocket::bind("0.0.0.0:0").await.expect("bind failed"); + for i in 0..200 { + let msg = format!("qfull.m{}:{}|c\n", i, i); + let _ = socket.send_to(msg.as_bytes(), "127.0.0.1:18129").await; + } + + // Give the processor time to consume what it received. + sleep(Duration::from_millis(300)).await; + + let response = handle.flush().await.expect("flush failed"); + let received: usize = response.series.iter().map(|s| s.len()).sum(); + + // With queue_size=1, we expect some metrics to arrive but not all 200. + // The exact number depends on timing, so just verify: + // 1. At least some metrics got through (server isn't broken) + // 2. Fewer than all 200 arrived (drops happened) + assert!( + received > 0, + "expected at least some metrics to arrive, got 0" + ); + assert!( + received < 200, + "expected some packets to be dropped with queue_size=1, but all {} arrived", + received + ); + + cancel_token.cancel(); + let _ = timeout(Duration::from_millis(500), spin_handle).await; + handle.shutdown().expect("shutdown failed"); +} + /// Verifies that `buffer_size` actually controls how many bytes the server /// reads per UDP packet. Sends a payload larger than a small buffer and /// checks that metrics are lost, then sends the same payload to a server