Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 140 additions & 24 deletions crates/dogstatsd/src/dogstatsd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,21 @@ impl std::fmt::Display for MessageSource {
}
}

/// A packet read off the transport, backed by a buffer drawn from the pool.
/// Only the first `len` bytes of `buf` are payload; after processing, the
/// buffer goes back to the pool so the reader can reuse it allocation-free.
struct Packet {
    // Pooled backing storage; its capacity may exceed the payload length.
    buf: Vec<u8>,
    // Count of valid payload bytes at the front of `buf`.
    len: usize,
    // Origin of the packet (e.g. the peer address for UDP).
    source: MessageSource,
}

impl Packet {
    /// Returns the valid payload slice — the first `len` bytes of `buf`.
    fn data(&self) -> &[u8] {
        let Packet { buf, len, .. } = self;
        &buf[..*len]
    }
}

// BufferReader abstracts transport methods for metric data.
enum BufferReader {
/// UDP socket reader (cross-platform, default transport)
Expand Down Expand Up @@ -119,6 +134,22 @@ impl BufferReader {
},
}
}

/// Non-blocking read into the provided buffer. Returns `Ok(Some(...))`
/// if a packet is immediately available, `Ok(None)` if the socket would
/// block. Used after `read_into()` to drain all pending packets from the
/// kernel buffer without re-entering tokio's event loop.
fn try_read_into(&mut self, buf: &mut [u8]) -> std::io::Result<Option<(usize, MessageSource)>> {
match self {
BufferReader::UdpSocket(socket) => match socket.try_recv_from(buf) {
Ok((amt, src)) => Ok(Some((amt, MessageSource::Network(src)))),
Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None),
Err(e) => Err(e),
},
// Non-UDP transports don't support non-blocking reads
_ => Ok(None),
}
}
}

/// DogStatsD server to receive, parse, and forward metrics.
Expand Down Expand Up @@ -249,17 +280,31 @@ impl DogStatsD {
} = self;

let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size);
// Buffer pool: processor returns used buffers, reader reuses them.
// Avoids a heap allocation + zero-fill per packet in steady state.
let (pool_tx, pool_rx) = tokio::sync::mpsc::unbounded_channel();
let reader_pool_tx = pool_tx.clone();

let reader_token = cancel_token.clone();
tokio::spawn(async move {
read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await;
read_loop(
buffer_reader,
tx,
reader_token,
buf_size,
queue_size,
pool_rx,
reader_pool_tx,
)
.await;
});

process_loop(
&mut rx,
&cancel_token,
&aggregator_handle,
metric_namespace.as_deref(),
pool_tx,
)
.await;
}
Expand All @@ -285,21 +330,39 @@ impl DogStatsD {

/// Drains the transport into the channel as fast as possible.
///
/// Allocates a fresh buffer for each read. On queue full, drops the packet
/// and logs at power-of-two intervals (matching Go agent behavior).
/// Buffers are drawn from a pool (or allocated on first use) and returned by
/// the processor after parsing. In steady state this means zero heap
/// allocations in the read loop — only a `recv_from` syscall + channel send.
///
/// After each async `read_into()`, calls `try_read_into()` in a tight loop to
/// drain all packets already sitting in the kernel buffer without re-entering
/// tokio's event loop.
///
/// Uses `is_cancelled()` as a fast-path check at the top of each iteration,
/// and `tokio::select!` on the async `read_into()` to ensure the reader exits
/// promptly even on a quiet socket (where `read_into` would block forever).
async fn read_loop(
mut reader: BufferReader,
tx: tokio::sync::mpsc::Sender<(Vec<u8>, MessageSource)>,
tx: tokio::sync::mpsc::Sender<Packet>,
cancel: tokio_util::sync::CancellationToken,
buf_size: usize,
queue_capacity: usize,
mut pool_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>,
pool_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
) {
let mut dropped: u64 = 0;
loop {
let mut buf = vec![0u8; buf_size];
while !cancel.is_cancelled() {
// Get a buffer from the pool, or allocate if the pool is empty (cold start).
let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]);

// Async wait for the first packet, but remain cancellation-aware
// so the reader exits promptly on a quiet socket.
let result = tokio::select! {
r = reader.read_into(&mut buf) => r,
_ = cancel.cancelled() => break,
_ = cancel.cancelled() => {
let _ = pool_tx.send(buf);
break;
}
};
let (len, source) = match result {
Ok(r) => r,
Expand All @@ -309,22 +372,45 @@ async fn read_loop(
}
Err(e) => {
error!("DogStatsD read error: {}", e);
let _ = pool_tx.send(buf);
continue;
}
};
if len == 0 {
let _ = pool_tx.send(buf);
continue;
}
buf.truncate(len);
match tx.try_send((buf, source)) {
Ok(()) => {}
Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => {
dropped += 1;
if dropped.is_power_of_two() {
debug!("DogStatsD queue full, {} packets dropped so far", dropped);
if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) {
break;
}

// Drain any packets already in the kernel buffer without going
// through tokio's event loop — just raw non-blocking syscalls.
loop {
let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]);
match reader.try_read_into(&mut buf) {
Ok(Some((0, _))) => {
let _ = pool_tx.send(buf);
}
Ok(Some((len, source))) => {
if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) {
break;
}
}
Ok(None) => {
let _ = pool_tx.send(buf);
break;
}
Comment on lines 387 to 403
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The batch-drain loop runs unconditionally, but try_read_into() returns Ok(None) for non-UDP transports. That means each iteration still pulls a buffer from the pool (or allocates on cold start) and immediately returns it, adding extra channel traffic and potentially an extra allocation per packet for named pipes/mirror transport. Consider gating the drain loop to only run for UdpSocket (e.g., via a supports_try_read()/is_udp() check) so non-UDP paths don’t pay this overhead.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The overhead is trivial — one try_recv() + one send() on unbounded channels per packet for named pipes. Adding a supports_try_read() method adds API surface and branching for a path that barely matters. Named pipes are Windows-only and low-volume compared to UDP.

Err(e) => {
error!("DogStatsD read error: {}", e);
let _ = pool_tx.send(buf);
break;
}
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break,
}
// If the channel was closed during batch drain, exit immediately.
if tx.is_closed() {
break;
}
}
if dropped > 0 {
Expand All @@ -335,28 +421,58 @@ async fn read_loop(
}
}

/// Sends a packet to the channel. Returns `false` if the channel is closed
/// (caller should exit the loop). On failure, returns the buffer to the pool
/// so it can be reused.
fn try_send_packet(
tx: &tokio::sync::mpsc::Sender<Packet>,
packet: Packet,
dropped: &mut u64,
pool: &tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
) -> bool {
match tx.try_send(packet) {
Ok(()) => true,
Err(tokio::sync::mpsc::error::TrySendError::Full(packet)) => {
let _ = pool.send(packet.buf);
*dropped += 1;
if dropped.is_power_of_two() {
debug!("DogStatsD queue full, {} packets dropped so far", *dropped);
}
true
}
Err(tokio::sync::mpsc::error::TrySendError::Closed(packet)) => {
let _ = pool.send(packet.buf);
false
}
}
}

/// Receives packets from the channel, parses them, and forwards metrics
/// to the aggregator. On cancellation, drains any remaining buffered
/// packets before exiting so no already-read data is lost.
/// to the aggregator. After processing each packet, returns the buffer to
/// the pool for reuse by the reader. On cancellation, drains any remaining
/// buffered packets before exiting so no already-read data is lost.
async fn process_loop(
rx: &mut tokio::sync::mpsc::Receiver<(Vec<u8>, MessageSource)>,
rx: &mut tokio::sync::mpsc::Receiver<Packet>,
cancel: &tokio_util::sync::CancellationToken,
aggregator: &AggregatorHandle,
namespace: Option<&str>,
pool: tokio::sync::mpsc::UnboundedSender<Vec<u8>>,
) {
loop {
tokio::select! {
packet = rx.recv() => {
match packet {
Some((buf, src)) => {
process_packet(&buf, &src, aggregator, namespace);
Some(packet) => {
process_packet(packet.data(), &packet.source, aggregator, namespace);
let _ = pool.send(packet.buf);
}
None => break,
}
}
_ = cancel.cancelled() => {
while let Ok((buf, src)) = rx.try_recv() {
process_packet(&buf, &src, aggregator, namespace);
while let Ok(packet) = rx.try_recv() {
process_packet(packet.data(), &packet.source, aggregator, namespace);
let _ = pool.send(packet.buf);
}
break;
}
Expand Down Expand Up @@ -732,8 +848,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d
#[tokio::test]
async fn test_dogstatsd_custom_buffer_size() {
// Use a large buffer to verify custom buf_size is wired through.
// The MirrorTest reader returns pre-stored data (ignoring buf_size),
// so this test ensures the buf_size field is properly wired into DogStatsD.
// The MirrorTest reader copies data into the caller's buffer, so
// a payload that fits within the custom size should parse correctly.
let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n";
let custom_buf_size: usize = 16384;

Expand Down
115 changes: 115 additions & 0 deletions crates/dogstatsd/tests/integration_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,121 @@ async fn test_send_with_retry_immediate_failure_after_one_attempt() {
mock.assert_async().await;
}

/// Verifies that `spin()` exits promptly when cancelled on a quiet socket
/// (no packets ever arrive). Without cancellation-aware awaiting in the
/// reader, the task would sit in `recv_from` forever and the join below
/// would time out.
#[cfg(test)]
#[tokio::test]
async fn test_spin_exits_on_cancellation_without_traffic() {
    use dogstatsd::metric::SortedTags;

    // Stand up a real aggregator service for the server to forward into.
    let (service, handle) =
        AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS)
            .expect("aggregator service creation failed");
    tokio::spawn(service.run());

    let cancel_token = CancellationToken::new();
    let server = DogStatsD::new(
        &DogStatsDConfig {
            host: "127.0.0.1".to_string(),
            port: 18128,
            metric_namespace: None,
            #[cfg(all(windows, feature = "windows-pipes"))]
            windows_pipe_name: None,
            so_rcvbuf: None,
            buffer_size: None,
            queue_size: None,
        },
        handle.clone(),
        cancel_token.clone(),
    )
    .await;

    let spin_task = tokio::spawn(async move {
        server.spin().await;
    });

    // Cancel right away — deliberately before any traffic is sent.
    cancel_token.cancel();

    // The spin task must wind down within 500ms; a reader stuck in
    // recv_from would make this timeout fire instead.
    let outcome = timeout(Duration::from_millis(500), spin_task).await;
    assert!(
        outcome.is_ok(),
        "spin() should exit promptly after cancellation on a quiet socket"
    );

    handle.shutdown().expect("shutdown failed");
}

/// Verifies that when the internal queue is full the server sheds packets
/// instead of blocking, while still delivering whatever it did accept.
/// A queue_size of 1 makes the queue saturate almost immediately.
#[cfg(test)]
#[tokio::test]
async fn test_queue_full_drops_packets_without_blocking() {
    use dogstatsd::metric::SortedTags;

    let (service, handle) =
        AggregatorService::new(SortedTags::parse("test:value").unwrap(), CONTEXTS)
            .expect("aggregator service creation failed");
    tokio::spawn(service.run());

    let cancel_token = CancellationToken::new();
    let server = DogStatsD::new(
        &DogStatsDConfig {
            host: "127.0.0.1".to_string(),
            port: 18129,
            metric_namespace: None,
            #[cfg(all(windows, feature = "windows-pipes"))]
            windows_pipe_name: None,
            so_rcvbuf: None,
            buffer_size: None,
            queue_size: Some(1),
        },
        handle.clone(),
        cancel_token.clone(),
    )
    .await;

    let spin_task = tokio::spawn(async move {
        server.spin().await;
    });

    // Give the server a moment to bind and start reading.
    sleep(Duration::from_millis(50)).await;

    // Blast 200 packets back-to-back — with queue_size=1 most get dropped.
    let sender_socket = UdpSocket::bind("0.0.0.0:0").await.expect("bind failed");
    for i in 0..200 {
        let payload = format!("qfull.m{}:{}|c\n", i, i);
        let _ = sender_socket
            .send_to(payload.as_bytes(), "127.0.0.1:18129")
            .await;
    }

    // Let the processor catch up on whatever made it through the queue.
    sleep(Duration::from_millis(300)).await;

    let response = handle.flush().await.expect("flush failed");
    let received = response
        .series
        .iter()
        .fold(0usize, |total, s| total + s.len());

    // Timing makes the exact count nondeterministic, so assert only that:
    // 1. At least some metrics got through (server isn't broken)
    // 2. Fewer than all 200 arrived (drops happened)
    assert!(
        received > 0,
        "expected at least some metrics to arrive, got 0"
    );
    assert!(
        received < 200,
        "expected some packets to be dropped with queue_size=1, but all {} arrived",
        received
    );

    cancel_token.cancel();
    let _ = timeout(Duration::from_millis(500), spin_task).await;
    handle.shutdown().expect("shutdown failed");
}

/// Verifies that `buffer_size` actually controls how many bytes the server
/// reads per UDP packet. Sends a payload larger than a small buffer and
/// checks that metrics are lost, then sends the same payload to a server
Expand Down
Loading