-
Notifications
You must be signed in to change notification settings - Fork 0
fix(dogstatsd): decouple socket reading from metric processing #76
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -70,6 +70,21 @@ impl std::fmt::Display for MessageSource { | |
| } | ||
| } | ||
|
|
||
| /// A packet read from the transport: a pooled buffer, the count of valid bytes in it, and the `MessageSource` it came from. | ||
| /// The actual data is `buf[..len]` (exposed by `data()`, which panics if `len > buf.len()`) — the buffer itself is returned | ||
| /// to the pool after processing so it can be reused without allocation. | ||
| struct Packet { | ||
| buf: Vec<u8>, | ||
| len: usize, | ||
| source: MessageSource, | ||
| } | ||
|
|
||
| impl Packet { | ||
| fn data(&self) -> &[u8] { | ||
| &self.buf[..self.len] | ||
| } | ||
| } | ||
|
|
||
| // BufferReader abstracts transport methods for metric data. | ||
| enum BufferReader { | ||
| /// UDP socket reader (cross-platform, default transport) | ||
|
|
@@ -119,6 +134,22 @@ impl BufferReader { | |
| }, | ||
| } | ||
| } | ||
|
|
||
| /// Non-blocking read into the provided buffer. Returns `Ok(Some((len, source)))` | ||
| /// if a packet is immediately available, `Ok(None)` if the socket would | ||
| /// block, and `Err` for any other I/O error. Used after `read_into()` to drain all pending packets from the | ||
| /// kernel buffer without re-entering tokio's event loop. | ||
| fn try_read_into(&mut self, buf: &mut [u8]) -> std::io::Result<Option<(usize, MessageSource)>> { | ||
| match self { | ||
| BufferReader::UdpSocket(socket) => match socket.try_recv_from(buf) { | ||
| Ok((amt, src)) => Ok(Some((amt, MessageSource::Network(src)))), | ||
| Err(ref e) if e.kind() == std::io::ErrorKind::WouldBlock => Ok(None), | ||
| Err(e) => Err(e), | ||
| }, | ||
| // Every non-UDP transport reports "nothing pending" here, so it is never drained through this path. | ||
| _ => Ok(None), | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// DogStatsD server to receive, parse, and forward metrics. | ||
|
|
@@ -249,17 +280,31 @@ impl DogStatsD { | |
| } = self; | ||
|
|
||
| let (tx, mut rx) = tokio::sync::mpsc::channel(queue_size); | ||
| // Buffer pool: processor returns used buffers, reader reuses them. | ||
| // Avoids a heap allocation + zero-fill per packet in steady state. | ||
| let (pool_tx, pool_rx) = tokio::sync::mpsc::unbounded_channel(); | ||
| let reader_pool_tx = pool_tx.clone(); | ||
|
|
||
| let reader_token = cancel_token.clone(); | ||
| tokio::spawn(async move { | ||
| read_loop(buffer_reader, tx, reader_token, buf_size, queue_size).await; | ||
| read_loop( | ||
| buffer_reader, | ||
| tx, | ||
| reader_token, | ||
| buf_size, | ||
| queue_size, | ||
| pool_rx, | ||
| reader_pool_tx, | ||
| ) | ||
| .await; | ||
| }); | ||
|
|
||
| process_loop( | ||
| &mut rx, | ||
| &cancel_token, | ||
| &aggregator_handle, | ||
| metric_namespace.as_deref(), | ||
| pool_tx, | ||
| ) | ||
| .await; | ||
| } | ||
|
|
@@ -285,21 +330,39 @@ impl DogStatsD { | |
|
|
||
| /// Drains the transport into the channel as fast as possible. | ||
| /// | ||
| /// Allocates a fresh buffer for each read. On queue full, drops the packet | ||
| /// and logs at power-of-two intervals (matching Go agent behavior). | ||
| /// Buffers are drawn from a pool (or allocated on first use) and returned by | ||
| /// the processor after parsing. In steady state this means zero heap | ||
| /// allocations in the read loop — only a `recv_from` syscall + channel send. | ||
| /// | ||
| /// After each async `read_into()`, calls `try_read_into()` in a tight loop to | ||
| /// drain all packets already sitting in the kernel buffer without re-entering | ||
| /// tokio's event loop. | ||
| /// | ||
| /// Uses `is_cancelled()` as a fast-path check at the top of each iteration, | ||
| /// and `tokio::select!` on the async `read_into()` to ensure the reader exits | ||
| /// promptly even on a quiet socket (where `read_into` would block forever). | ||
| async fn read_loop( | ||
| mut reader: BufferReader, | ||
| tx: tokio::sync::mpsc::Sender<(Vec<u8>, MessageSource)>, | ||
| tx: tokio::sync::mpsc::Sender<Packet>, | ||
| cancel: tokio_util::sync::CancellationToken, | ||
| buf_size: usize, | ||
| queue_capacity: usize, | ||
| mut pool_rx: tokio::sync::mpsc::UnboundedReceiver<Vec<u8>>, | ||
| pool_tx: tokio::sync::mpsc::UnboundedSender<Vec<u8>>, | ||
| ) { | ||
| let mut dropped: u64 = 0; | ||
| loop { | ||
| let mut buf = vec![0u8; buf_size]; | ||
| while !cancel.is_cancelled() { | ||
| // Get a buffer from the pool, or allocate if the pool is empty (cold start). | ||
| let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]); | ||
|
|
||
| // Async wait for the first packet, but remain cancellation-aware | ||
| // so the reader exits promptly on a quiet socket. | ||
| let result = tokio::select! { | ||
| r = reader.read_into(&mut buf) => r, | ||
| _ = cancel.cancelled() => break, | ||
| _ = cancel.cancelled() => { | ||
| let _ = pool_tx.send(buf); | ||
| break; | ||
| } | ||
| }; | ||
| let (len, source) = match result { | ||
| Ok(r) => r, | ||
|
|
@@ -309,22 +372,45 @@ async fn read_loop( | |
| } | ||
| Err(e) => { | ||
| error!("DogStatsD read error: {}", e); | ||
| let _ = pool_tx.send(buf); | ||
| continue; | ||
| } | ||
| }; | ||
| if len == 0 { | ||
| let _ = pool_tx.send(buf); | ||
| continue; | ||
| } | ||
| buf.truncate(len); | ||
| match tx.try_send((buf, source)) { | ||
| Ok(()) => {} | ||
| Err(tokio::sync::mpsc::error::TrySendError::Full(_)) => { | ||
| dropped += 1; | ||
| if dropped.is_power_of_two() { | ||
| debug!("DogStatsD queue full, {} packets dropped so far", dropped); | ||
| if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) { | ||
| break; | ||
| } | ||
|
|
||
| // Drain any packets already in the kernel buffer without going | ||
| // through tokio's event loop — just raw non-blocking syscalls. | ||
| loop { | ||
| let mut buf = pool_rx.try_recv().unwrap_or_else(|_| vec![0u8; buf_size]); | ||
| match reader.try_read_into(&mut buf) { | ||
| Ok(Some((0, _))) => { | ||
| let _ = pool_tx.send(buf); | ||
| } | ||
| Ok(Some((len, source))) => { | ||
| if !try_send_packet(&tx, Packet { buf, len, source }, &mut dropped, &pool_tx) { | ||
| break; | ||
| } | ||
| } | ||
| Ok(None) => { | ||
| let _ = pool_tx.send(buf); | ||
| break; | ||
| } | ||
|
Comment on lines
387
to
403
|
||
| Err(e) => { | ||
| error!("DogStatsD read error: {}", e); | ||
| let _ = pool_tx.send(buf); | ||
| break; | ||
| } | ||
| } | ||
| Err(tokio::sync::mpsc::error::TrySendError::Closed(_)) => break, | ||
| } | ||
duncanista marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| // If the channel was closed during batch drain, exit immediately. | ||
| if tx.is_closed() { | ||
| break; | ||
| } | ||
| } | ||
| if dropped > 0 { | ||
|
|
@@ -335,28 +421,58 @@ async fn read_loop( | |
| } | ||
| } | ||
|
|
||
| /// Tries to send a packet to the channel without waiting. Returns `false` only if the channel is closed | ||
| /// (caller should exit the loop). If the queue is full, the packet is dropped, `dropped` is incremented (with a debug log at power-of-two counts), and `true` is returned. On either failure, returns the buffer to the pool | ||
| /// so it can be reused. | ||
| fn try_send_packet( | ||
| tx: &tokio::sync::mpsc::Sender<Packet>, | ||
| packet: Packet, | ||
| dropped: &mut u64, | ||
| pool: &tokio::sync::mpsc::UnboundedSender<Vec<u8>>, | ||
| ) -> bool { | ||
| match tx.try_send(packet) { | ||
| Ok(()) => true, | ||
| Err(tokio::sync::mpsc::error::TrySendError::Full(packet)) => { | ||
| let _ = pool.send(packet.buf); | ||
kathiehuang marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| *dropped += 1; | ||
| if dropped.is_power_of_two() { | ||
| debug!("DogStatsD queue full, {} packets dropped so far", *dropped); | ||
| } | ||
| true | ||
| } | ||
| Err(tokio::sync::mpsc::error::TrySendError::Closed(packet)) => { | ||
| let _ = pool.send(packet.buf); | ||
| false | ||
| } | ||
| } | ||
| } | ||
|
|
||
| /// Receives packets from the channel, parses them, and forwards metrics | ||
| /// to the aggregator. On cancellation, drains any remaining buffered | ||
| /// packets before exiting so no already-read data is lost. | ||
| /// to the aggregator. After processing each packet, returns the buffer to | ||
| /// the pool for reuse by the reader. On cancellation, drains any remaining | ||
| /// buffered packets before exiting so no already-read data is lost. | ||
| async fn process_loop( | ||
| rx: &mut tokio::sync::mpsc::Receiver<(Vec<u8>, MessageSource)>, | ||
| rx: &mut tokio::sync::mpsc::Receiver<Packet>, | ||
| cancel: &tokio_util::sync::CancellationToken, | ||
| aggregator: &AggregatorHandle, | ||
| namespace: Option<&str>, | ||
| pool: tokio::sync::mpsc::UnboundedSender<Vec<u8>>, | ||
| ) { | ||
| loop { | ||
| tokio::select! { | ||
| packet = rx.recv() => { | ||
| match packet { | ||
| Some((buf, src)) => { | ||
| process_packet(&buf, &src, aggregator, namespace); | ||
| Some(packet) => { | ||
| process_packet(packet.data(), &packet.source, aggregator, namespace); | ||
| let _ = pool.send(packet.buf); | ||
| } | ||
| None => break, | ||
| } | ||
| } | ||
| _ = cancel.cancelled() => { | ||
| while let Ok((buf, src)) = rx.try_recv() { | ||
| process_packet(&buf, &src, aggregator, namespace); | ||
| while let Ok(packet) = rx.try_recv() { | ||
| process_packet(packet.data(), &packet.source, aggregator, namespace); | ||
| let _ = pool.send(packet.buf); | ||
| } | ||
| break; | ||
| } | ||
|
|
@@ -732,8 +848,8 @@ single_machine_performance.rouster.metrics_max_timestamp_latency:1376.90870216|d | |
| #[tokio::test] | ||
| async fn test_dogstatsd_custom_buffer_size() { | ||
| // Use a large buffer to verify custom buf_size is wired through. | ||
| // The MirrorTest reader returns pre-stored data (ignoring buf_size), | ||
| // so this test ensures the buf_size field is properly wired into DogStatsD. | ||
| // The MirrorTest reader copies data into the caller's buffer, so | ||
| // a payload that fits within the custom size should parse correctly. | ||
| let payload = "large.buf.metric:1|c\nlarge.buf.metric2:2|c\n"; | ||
| let custom_buf_size: usize = 16384; | ||
|
|
||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.