diff --git a/candumpr/src/format.rs b/candumpr/src/format.rs index 59593b7..207a457 100644 --- a/candumpr/src/format.rs +++ b/candumpr/src/format.rs @@ -6,6 +6,13 @@ use crate::frame::CanFrame; pub trait Formatter { /// Append the formatted representation of `frame` to `buf`. fn format(&self, frame: &CanFrame, buf: &mut Vec); + + /// Optional header bytes written once at the start of each output stream, before any frames. + /// + /// Used by formats with a file-level header (e.g. PCAP). + fn header(&self) -> Option<&[u8]> { + None + } } /// Formats frames in the can-utils candump file format. diff --git a/candumpr/src/lib.rs b/candumpr/src/lib.rs index c4e0b2f..c1f2665 100644 --- a/candumpr/src/lib.rs +++ b/candumpr/src/lib.rs @@ -1,8 +1,46 @@ pub mod can; pub mod format; pub mod frame; +pub mod pipeline; pub mod recv; -pub mod write; +pub mod sink; +pub mod writer; + +#[cfg(test)] +pub(crate) mod test_util { + pub(crate) struct TestBufWriter { + pub(crate) bytes: Vec, + } + + impl TestBufWriter { + pub(crate) fn new() -> Self { + Self { bytes: Vec::new() } + } + } + + impl crate::writer::Writer for TestBufWriter { + fn write(&mut self, b: &[u8]) -> eyre::Result<()> { + self.bytes.extend_from_slice(b); + Ok(()) + } + + fn flush(&mut self) -> eyre::Result<()> { + Ok(()) + } + + fn sync(&mut self) -> eyre::Result<()> { + Ok(()) + } + + fn finish(&mut self) -> eyre::Result<()> { + Ok(()) + } + + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } + } +} #[cfg(test)] #[ctor::ctor] diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 1c0d5bd..1e0b85d 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -1,11 +1,14 @@ +use std::process::ExitCode; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; use std::time::Duration; use candumpr::can; use candumpr::format::{CanutilsFormatter, Formatter}; -use candumpr::recv::receiver::Receiver; -use candumpr::write::{StdoutWriter, Writer}; +use candumpr::pipeline::Pipeline; +use 
candumpr::recv::receiver::{BATCH_CAPACITY, Receiver}; +use candumpr::sink::Sink; +use candumpr::writer::StdoutWriter; use clap::Parser; static STOP: AtomicBool = AtomicBool::new(false); @@ -27,8 +30,11 @@ struct Cli { log_level: tracing::Level, } -fn main() -> eyre::Result<()> { - color_eyre::install()?; +fn main() -> ExitCode { + if let Err(e) = color_eyre::install() { + eprintln!("failed to install error handler: {e:#}"); + return ExitCode::FAILURE; + } let cli = Cli::parse(); tracing_subscriber::fmt() @@ -36,57 +42,110 @@ fn main() -> eyre::Result<()> { .with_max_level(cli.log_level) .init(); - let sockets: Vec<_> = cli + let sockets: Vec<_> = match cli .interfaces .iter() .map(|name| can::open_can_raw(name)) - .collect::>()?; + .collect::>() + { + Ok(sockets) => sockets, + Err(e) => { + tracing::error!(error = ?e, "failed to open CAN sockets"); + return ExitCode::FAILURE; + } + }; + + const POOL_SIZE: usize = 4; + const RECYCLE_BOUND: usize = 8; - let (tx, rx) = mpsc::channel(); + let (full_tx, full_rx) = mpsc::channel::>(); + let (empty_tx, empty_rx) = mpsc::sync_channel::>(RECYCLE_BOUND); + for _ in 0..POOL_SIZE { + empty_tx + .send(Vec::with_capacity(BATCH_CAPACITY)) + .expect("recycle channel must accept initial pool"); + } - unsafe { - libc::signal( - libc::SIGINT, - signal_handler as *const () as libc::sighandler_t, - ); + for sig in [libc::SIGINT, libc::SIGTERM] { + unsafe { + libc::signal(sig, signal_handler as *const () as libc::sighandler_t); + } } let recv_handle = std::thread::spawn(move || -> eyre::Result { let mut recv = Receiver::new(sockets)?; - let total = recv.run(&STOP, &tx)?; + let total = recv.run(&STOP, &full_tx, &empty_rx)?; Ok(total) }); let formatter = CanutilsFormatter::new(cli.interfaces); - let mut writer = StdoutWriter::new(); - let mut buf = Vec::with_capacity(4096); + let header = formatter.header().map(|h| h.to_vec()); + let sink = Sink::new( + StdoutWriter::new(), + header, + 64 * 1024, + Some(Duration::from_secs(5)), + 
Some(Duration::from_secs(5 * 60)), + ); + let mut pipeline = Pipeline::new(formatter, vec![sink]); + + // Write-path errors are logged and recorded rather than propagated: returning early would skip + // draining the remaining batches and closing the pipeline, both of which can lose buffered data. + // Every error sets `failed` so the process still exits nonzero. + let mut failed = false; while !STOP.load(Ordering::Relaxed) { - match rx.recv_timeout(Duration::from_millis(100)) { - Ok(frame) => { - formatter.format(&frame, &mut buf); - // Drain any additional frames that are immediately ready. - while let Ok(frame) = rx.try_recv() { - formatter.format(&frame, &mut buf); + match full_rx.recv_timeout(Duration::from_millis(100)) { + Ok(mut batch) => { + if let Err(e) = pipeline.write_batch(&batch) { + tracing::error!(error = ?e, "failed to write batch"); + failed = true; } - writer.write(&buf)?; - buf.clear(); + batch.clear(); + let _ = empty_tx.try_send(batch); } - Err(mpsc::RecvTimeoutError::Timeout) => continue, + Err(mpsc::RecvTimeoutError::Timeout) => {} Err(mpsc::RecvTimeoutError::Disconnected) => break, } + if let Err(e) = pipeline.tick() { + tracing::error!(error = ?e, "periodic flush or sync failed"); + failed = true; + } } - // Drain remaining frames after stop. - while let Ok(frame) = rx.try_recv() { - formatter.format(&frame, &mut buf); + // Join before draining so we write every received frame. + match recv_handle.join() { + Ok(Ok(total)) => tracing::debug!(total_frames = total, "receiver finished"), + Ok(Err(e)) => { + tracing::error!(error = ?e, "receiver thread failed"); + failed = true; + } + Err(_) => { + tracing::error!("receiver thread panicked"); + failed = true; + } } - if !buf.is_empty() { - writer.write(&buf)?; + + // Drain everything the receiver queued before it exited. 
+ while let Ok(mut batch) = full_rx.try_recv() { + if let Err(e) = pipeline.write_batch(&batch) { + tracing::error!(error = ?e, "failed to write batch during drain"); + failed = true; + } + batch.clear(); + let _ = empty_tx.try_send(batch); } - let total = recv_handle.join().expect("receiver thread panicked")?; - tracing::debug!(total, "receiver finished"); + // close() always runs, even after write errors: for file and zstd writers it is what writes the + // epilogue and fsyncs, so skipping it could leave output unrecoverable. + if let Err(e) = pipeline.close() { + tracing::error!(error = ?e, "failed to close pipeline"); + failed = true; + } - Ok(()) + if failed { + ExitCode::FAILURE + } else { + ExitCode::SUCCESS + } } diff --git a/candumpr/src/pipeline.rs b/candumpr/src/pipeline.rs new file mode 100644 index 0000000..839055e --- /dev/null +++ b/candumpr/src/pipeline.rs @@ -0,0 +1,195 @@ +use crate::format::Formatter; +use crate::frame::CanFrame; +use crate::recv::Timestamp; +use crate::sink::Sink; + +/// Orchestrates formatting batches of [CanFrame]s and then writing them to each [Sink] +pub struct Pipeline { + formatter: F, + pub(crate) sinks: Vec, + bufs: Vec>, + first_ts: Vec>, +} + +impl Pipeline { + /// Construct a Pipeline over a non-empty set of sinks. + /// + /// There should either be one sink, or a sink for every CAN interface being logged. + pub fn new(formatter: F, sinks: Vec) -> Self { + assert!(!sinks.is_empty(), "Pipeline requires at least one Sink"); + let n = sinks.len(); + let bufs = (0..n).map(|_| Vec::with_capacity(4096)).collect(); + let first_ts = vec![None; n]; + Self { + formatter, + sinks, + bufs, + first_ts, + } + } + + /// Write the given batch of [CanFrame]s to the [Sink]s + /// + /// With a single sink, every frame is routed to it regardless of interface index; this is both + /// the common stdout case and what keeps a frame's [CanFrame::iface_idx] from indexing past the + /// only buffer. 
With multiple sinks, frames are dispatched by interface index. + pub fn write_batch(&mut self, frames: &[CanFrame]) -> eyre::Result<()> { + for buf in &mut self.bufs { + buf.clear(); + } + for slot in &mut self.first_ts { + *slot = None; + } + + let single = self.sinks.len() == 1; + for frame in frames { + let idx = if single { 0 } else { frame.iface_idx }; + if self.first_ts[idx].is_none() { + self.first_ts[idx] = Some(frame.timestamp); + } + self.formatter.format(frame, &mut self.bufs[idx]); + } + + let mut first_err: Option = None; + for (i, (sink, buf)) in self.sinks.iter_mut().zip(self.bufs.iter()).enumerate() { + if buf.is_empty() { + continue; + } + // first_ts[i] is Some whenever bufs[i] is non-empty: both are written together above. + if let Err(e) = sink.write(buf, self.first_ts[i].unwrap()) { + match &first_err { + None => first_err = Some(e), + Some(_) => tracing::error!(error = ?e, "sink write failed"), + } + } + } + first_err.map_or(Ok(()), Err) + } + + /// Run each sink's periodic flush and sync checks. + pub fn tick(&mut self) -> eyre::Result<()> { + self.for_each_sink(Sink::tick) + } + + /// Flush every sink. + pub fn flush(&mut self) -> eyre::Result<()> { + self.for_each_sink(Sink::flush) + } + + /// Sync every sink. + pub fn sync(&mut self) -> eyre::Result<()> { + self.for_each_sink(Sink::sync) + } + + /// Close every sink, finalizing each writer. 
+ pub fn close(&mut self) -> eyre::Result<()> { + self.for_each_sink(Sink::close) + } + + fn for_each_sink( + &mut self, + mut op: impl FnMut(&mut Sink) -> eyre::Result<()>, + ) -> eyre::Result<()> { + let mut first_err: Option = None; + for sink in &mut self.sinks { + if let Err(e) = op(sink) { + match &first_err { + None => first_err = Some(e), + Some(_) => tracing::error!(error = ?e, "sink operation failed"), + } + } + } + first_err.map_or(Ok(()), Err) + } +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + use crate::can::LinuxCanFrame; + use crate::format::CanutilsFormatter; + use crate::frame::Direction; + use crate::sink::Sink; + use crate::test_util::TestBufWriter; + + fn frame(iface_idx: usize, id: u32, data: &[u8]) -> CanFrame { + CanFrame { + iface_idx, + timestamp: Timestamp { + sec: 1000 + iface_idx as i64, + nsec: 0, + }, + direction: Direction::Rx, + raw: LinuxCanFrame::new(id | libc::CAN_EFF_FLAG, data), + } + } + + fn sink() -> Sink { + Sink::new(TestBufWriter::new(), None, 64 * 1024, None, None) + } + + fn bytes_in(sink: &mut Sink) -> Vec { + sink.writer + .as_any_mut() + .downcast_mut::() + .unwrap() + .bytes + .clone() + } + + fn formatted(names: &[String], frames: &[&CanFrame]) -> Vec { + let fmt = CanutilsFormatter::new(names.to_vec()); + let mut buf = Vec::new(); + for frame in frames { + fmt.format(frame, &mut buf); + } + buf + } + + #[test] + fn single_sink_dispatches_all_frames_to_index_zero() { + let names = vec![ + "can0".to_string(), + "can1".to_string(), + "can2".to_string(), + "can3".to_string(), + ]; + let mut pipeline = Pipeline::new(CanutilsFormatter::new(names.clone()), vec![sink()]); + + let frames = vec![frame(0, 0x100, &[0x01]), frame(3, 0x200, &[0x02])]; + pipeline.write_batch(&frames).unwrap(); + + let expected = formatted(&names, &[&frames[0], &frames[1]]); + assert_eq!(bytes_in(&mut pipeline.sinks[0]), expected); + } + + #[test] + fn per_interface_dispatches_by_iface_idx() { + let names = 
vec!["can0".to_string(), "can1".to_string(), "can2".to_string()]; + let sinks = vec![sink(), sink(), sink()]; + let mut pipeline = Pipeline::new(CanutilsFormatter::new(names.clone()), sinks); + + let frames = vec![ + frame(0, 0x100, &[0x0A]), + frame(2, 0x200, &[0x0B]), + frame(0, 0x300, &[0x0C]), + frame(1, 0x400, &[0x0D]), + ]; + pipeline.write_batch(&frames).unwrap(); + + assert_eq!( + bytes_in(&mut pipeline.sinks[0]), + formatted(&names, &[&frames[0], &frames[2]]) + ); + assert_eq!( + bytes_in(&mut pipeline.sinks[1]), + formatted(&names, &[&frames[3]]) + ); + assert_eq!( + bytes_in(&mut pipeline.sinks[2]), + formatted(&names, &[&frames[1]]) + ); + } +} diff --git a/candumpr/src/recv/receiver.rs b/candumpr/src/recv/receiver.rs index 568facc..ca11630 100644 --- a/candumpr/src/recv/receiver.rs +++ b/candumpr/src/recv/receiver.rs @@ -1,6 +1,7 @@ use std::alloc::Layout; use std::os::unix::io::{AsFd, AsRawFd, OwnedFd, RawFd}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; +use std::sync::mpsc; use io_uring::types::BufRingEntry; use io_uring::{IoUring, cqueue, opcode, types}; @@ -13,6 +14,10 @@ use crate::recv::Timestamp; const FRAMEBUF_COUNT: u16 = 256; const _: () = assert!(FRAMEBUF_COUNT.is_power_of_two()); +/// Capacity of a batch Vec sent over the SPSC channel. Sized to match the maximum number of +/// CQEs that can be drained in one cycle. +pub const BATCH_CAPACITY: usize = FRAMEBUF_COUNT as usize; + const BGID: u16 = 0; /// Number of CQEs to wait for before waking. @@ -112,13 +117,15 @@ impl Receiver { /// Run the receive loop until `stop` is set. /// - /// Each received frame is sent through `tx` as a [CanFrame]. If the channel is disconnected - /// (writer thread dropped), the receiver treats it as a shutdown signal. Returns the total - /// number of frames received. + /// Each CQE drain produces a Vec of [CanFrame] which is sent through `full_tx`. 
+ /// Empty Vecs are pulled from `empty_rx` and reused, falling back to a fresh allocation + /// when the recycle channel is empty. If `full_tx` is disconnected (writer thread dropped), + /// the receiver treats it as a shutdown signal. Returns the total number of frames received. pub fn run( &mut self, stop: &AtomicBool, - tx: &std::sync::mpsc::Sender, + full_tx: &mpsc::Sender>, + empty_rx: &mpsc::Receiver>, ) -> std::io::Result { let mut total = 0u64; // We wait for BATCH_SIZE completions before waking up, but we still want to be able to @@ -163,6 +170,10 @@ impl Receiver { cqe_count += 1; } + let mut batch = empty_rx + .try_recv() + .unwrap_or_else(|_| Vec::with_capacity(BATCH_CAPACITY)); + for &(ud, result, flags) in &cqe_buf[..cqe_count] { let idx = ud as usize; @@ -184,17 +195,12 @@ impl Receiver { let meta = parse_control_data(out.control_data()); let timestamp = meta.timestamp.unwrap_or(Timestamp { sec: 0, nsec: 0 }); - let frame = CanFrame { + batch.push(CanFrame { iface_idx: idx, timestamp, direction: Direction::Rx, raw, - }; - - if tx.send(frame).is_err() { - // Channel disconnected; writer thread is gone. - return Ok(total); - } + }); total += 1; } } @@ -211,6 +217,11 @@ impl Receiver { } } } + + if !batch.is_empty() && full_tx.send(batch).is_err() { + // Channel disconnected; writer thread is gone. + return Ok(total); + } } Ok(total) @@ -337,23 +348,33 @@ mod tests { static STOP: AtomicBool = AtomicBool::new(false); STOP.store(false, Ordering::Relaxed); - let (tx, rx_chan) = mpsc::channel(); + let (full_tx, full_rx) = mpsc::channel::>(); + let (empty_tx, empty_rx) = mpsc::sync_channel::>(8); + for _ in 0..4 { + empty_tx + .send(Vec::with_capacity(super::BATCH_CAPACITY)) + .unwrap(); + } // Receiver must be created on the same thread that calls run() due to SINGLE_ISSUER. 
let handle = std::thread::spawn(move || { let mut recv = Receiver::new(rx_sockets).unwrap(); - recv.run(&STOP, &tx) + recv.run(&STOP, &full_tx, &empty_rx) }); let mut count = 0u64; while count < TOTAL_FRAMES { - let frame = rx_chan + let mut batch = full_rx .recv_timeout(std::time::Duration::from_millis(100)) .unwrap(); - assert!(frame.iface_idx < IFACE_COUNT); - assert_eq!(frame.direction, Direction::Rx); - assert!(frame.raw.len <= 8); - count += 1; + for frame in &batch { + assert!(frame.iface_idx < IFACE_COUNT); + assert_eq!(frame.direction, Direction::Rx); + assert!(frame.raw.len <= 8); + count += 1; + } + batch.clear(); + let _ = empty_tx.try_send(batch); } STOP.store(true, Ordering::Relaxed); diff --git a/candumpr/src/sink.rs b/candumpr/src/sink.rs new file mode 100644 index 0000000..993bd6e --- /dev/null +++ b/candumpr/src/sink.rs @@ -0,0 +1,227 @@ +use std::time::{Duration, Instant}; + +use crate::recv::Timestamp; +use crate::writer::Writer; + +/// A [Sink] manages [Writer] operations to write formatted CAN frames to whatever writer is configured +pub struct Sink { + pub(crate) writer: Box, + header: Option>, + flush_threshold_bytes: usize, + flush_interval: Option, + sync_interval: Option, + pub(crate) state: SinkState, +} + +/// Lifecycle state of a [Sink]. +pub(crate) enum SinkState { + /// Writer exists, but no frame has been written yet + Pending, + /// Writer exists and is writing + Active { + bytes_since_flush: usize, + last_flush: Instant, + last_sync: Instant, + /// Timestamp of the first frame seen by this sink, captured at activation. + #[allow(dead_code)] + timestamp: Timestamp, + }, + Closed, +} + +impl Sink { + /// Construct a Sink in the Pending state with the given pre-built writer. 
+ pub fn new( + writer: W, + header: Option>, + flush_threshold_bytes: usize, + flush_interval: Option, + sync_interval: Option, + ) -> Self { + Self { + writer: Box::new(writer), + header, + flush_threshold_bytes, + flush_interval, + sync_interval, + state: SinkState::Pending, + } + } + + /// Write `bytes` to the writer, activating the sink on the first call. + /// + /// The bytes are expected to evenly divide CAN frames. That is, no partially formatted frames + /// should be given [Self::write]. + pub fn write(&mut self, bytes: &[u8], timestamp: Timestamp) -> eyre::Result<()> { + if matches!(self.state, SinkState::Closed) { + eyre::bail!("write to closed sink"); + } + + let mut wrote = 0; + + if matches!(self.state, SinkState::Pending) { + if let Some(header) = &self.header { + self.writer.write(header)?; + wrote += header.len(); + } + let now = Instant::now(); + self.state = SinkState::Active { + bytes_since_flush: 0, + last_flush: now, + last_sync: now, + timestamp, + }; + } + + self.writer.write(bytes)?; + wrote += bytes.len(); + + let SinkState::Active { + bytes_since_flush, + last_flush, + .. + } = &mut self.state + else { + unreachable!("state must be Active after the Pending branch above"); + }; + *bytes_since_flush += wrote; + if *bytes_since_flush >= self.flush_threshold_bytes { + self.writer.flush()?; + *bytes_since_flush = 0; + *last_flush = Instant::now(); + } + + Ok(()) + } + + /// Check the time-based flush and sync triggers + /// + /// Should be called periodically + pub fn tick(&mut self) -> eyre::Result<()> { + let SinkState::Active { + bytes_since_flush, + last_flush, + last_sync, + .. 
+ } = &mut self.state + else { + return Ok(()); + }; + + let now = Instant::now(); + + if let Some(d) = self.sync_interval + && now.duration_since(*last_sync) >= d + { + self.writer.sync()?; + *bytes_since_flush = 0; + *last_flush = now; + *last_sync = now; + return Ok(()); + } + + if let Some(d) = self.flush_interval + && now.duration_since(*last_flush) >= d + { + self.writer.flush()?; + *bytes_since_flush = 0; + *last_flush = now; + } + + Ok(()) + } + + /// Flush the writer if Active; no-op otherwise. + pub fn flush(&mut self) -> eyre::Result<()> { + let SinkState::Active { + bytes_since_flush, + last_flush, + .. + } = &mut self.state + else { + return Ok(()); + }; + self.writer.flush()?; + *bytes_since_flush = 0; + *last_flush = Instant::now(); + Ok(()) + } + + /// Sync the writer if Active; no-op otherwise + pub fn sync(&mut self) -> eyre::Result<()> { + let SinkState::Active { + bytes_since_flush, + last_flush, + last_sync, + .. + } = &mut self.state + else { + return Ok(()); + }; + self.writer.sync()?; + *bytes_since_flush = 0; + let now = Instant::now(); + *last_flush = now; + *last_sync = now; + Ok(()) + } + + /// Finalize the writer and transition to Closed. + pub fn close(&mut self) -> eyre::Result<()> { + let result = match self.state { + SinkState::Active { .. 
} => self.writer.finish(), + SinkState::Pending | SinkState::Closed => Ok(()), + }; + self.state = SinkState::Closed; + result + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::recv::Timestamp; + use crate::test_util::TestBufWriter; + + fn ts(sec: i64) -> Timestamp { + Timestamp { sec, nsec: 0 } + } + + fn sink(header: Option>) -> Sink { + Sink::new(TestBufWriter::new(), header, 64 * 1024, None, None) + } + + fn bytes_in(sink: &mut Sink) -> Vec { + sink.writer + .as_any_mut() + .downcast_mut::() + .unwrap() + .bytes + .clone() + } + + #[test] + fn header_written_on_activation() { + let mut sink = sink(Some(b"HDR".to_vec())); + sink.write(b"PAYLOAD", ts(42)).unwrap(); + assert!(matches!(sink.state, SinkState::Active { .. })); + assert_eq!(bytes_in(&mut sink), b"HDRPAYLOAD"); + } + + #[test] + fn write_after_close_on_active_returns_err() { + let mut sink = sink(None); + sink.write(b"PAYLOAD", ts(42)).unwrap(); + sink.close().unwrap(); + assert!(matches!(sink.state, SinkState::Closed)); + assert!(sink.write(b"MORE", ts(43)).is_err()); + } + + #[test] + fn write_after_close_on_pending_returns_err() { + let mut sink = sink(None); + sink.close().unwrap(); + assert!(matches!(sink.state, SinkState::Closed)); + assert!(sink.write(b"PAYLOAD", ts(42)).is_err()); + } +} diff --git a/candumpr/src/write.rs b/candumpr/src/write.rs deleted file mode 100644 index be09c65..0000000 --- a/candumpr/src/write.rs +++ /dev/null @@ -1,41 +0,0 @@ -use std::io::Write; - -/// Writes formatted frame data to an output destination. -/// -/// Flushing is an internal implementation detail of each writer. -pub trait Writer { - fn write(&mut self, buf: &[u8]) -> eyre::Result<()>; -} - -/// Writes formatted output to stdout, flushing after every write for live log viewing. 
-pub struct StdoutWriter { - inner: std::io::BufWriter, -} - -impl Default for StdoutWriter { - fn default() -> Self { - Self::new() - } -} - -impl StdoutWriter { - pub fn new() -> Self { - Self { - inner: std::io::BufWriter::new(std::io::stdout()), - } - } -} - -impl Writer for StdoutWriter { - fn write(&mut self, buf: &[u8]) -> eyre::Result<()> { - self.inner.write_all(buf)?; - self.inner.flush()?; - Ok(()) - } -} - -impl Drop for StdoutWriter { - fn drop(&mut self) { - let _ = self.inner.flush(); - } -} diff --git a/candumpr/src/writer/mod.rs b/candumpr/src/writer/mod.rs new file mode 100644 index 0000000..1f8f46c --- /dev/null +++ b/candumpr/src/writer/mod.rs @@ -0,0 +1,63 @@ +use std::io::Write; + +/// Writes formatted frame data to an output destination. +/// +/// Each leaf decides explicitly what each method does. There are no defaults; a leaf with nothing +/// to do for a given hook returns Ok(()). +pub trait Writer { + fn write(&mut self, buf: &[u8]) -> eyre::Result<()>; + fn flush(&mut self) -> eyre::Result<()>; + fn sync(&mut self) -> eyre::Result<()>; + fn finish(&mut self) -> eyre::Result<()>; + + #[cfg(test)] + fn as_any_mut(&mut self) -> &mut dyn std::any::Any; +} + +/// Writes formatted output to stdout. +/// +/// Each [Self::write] acquires the stdout lock, writes the full buffer, and flushes; +/// flush/sync/finish are no-ops. 
+pub struct StdoutWriter { + stdout: std::io::Stdout, +} + +impl Default for StdoutWriter { + fn default() -> Self { + Self::new() + } +} + +impl StdoutWriter { + pub fn new() -> Self { + Self { + stdout: std::io::stdout(), + } + } +} + +impl Writer for StdoutWriter { + fn write(&mut self, buf: &[u8]) -> eyre::Result<()> { + let mut lock = self.stdout.lock(); + lock.write_all(buf)?; + lock.flush()?; + Ok(()) + } + + fn flush(&mut self) -> eyre::Result<()> { + Ok(()) + } + + fn sync(&mut self) -> eyre::Result<()> { + Ok(()) + } + + fn finish(&mut self) -> eyre::Result<()> { + Ok(()) + } + + #[cfg(test)] + fn as_any_mut(&mut self) -> &mut dyn std::any::Any { + self + } +} diff --git a/docs/design/candumpr/02-architecture.md b/docs/design/candumpr/02-architecture.md index 6e5fde0..00fd709 100644 --- a/docs/design/candumpr/02-architecture.md +++ b/docs/design/candumpr/02-architecture.md @@ -42,11 +42,12 @@ graph TD io_uring end - io_uring --> |"SPSC (unbounded)"| main_loop + io_uring --> |"SPSC: Vec"| main_loop + main_loop -.-> |"SPSC: recycled Vec"| io_uring subgraph main [Main thread] - main_loop["recv_timeout / try_recv drain loop"] - main_loop --> |CanFrame| Pipeline + main_loop["recv_timeout"] + main_loop --> |"&[CanFrame]"| Pipeline end subgraph Pipeline @@ -66,7 +67,7 @@ graph TD direction LR compressed["ZstdWriter<FileWriter>"] uncompressed["BufWriter<FileWriter>"] - live["BufWriter<StdoutWriter>"] + live["StdoutWriter"] end sink --> writers @@ -76,12 +77,12 @@ graph TD The pipeline has four layers: -| Layer | Responsibility | -| --------- | ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | -| Main loop | Recv frames from SPSC channel, call `pipeline.write_frame()`, forward SIGHUP via `pipeline.rotate()`, call `pipeline.flush()` on idle timeout, 
`pipeline.close()` on STOP | -| Pipeline | Owns one Formatter and a `Vec` whose length is either 1 (single-file mode) or the number of interfaces (per-interface mode). Formats frame into a shared buffer, dispatches bytes to the Sink for `frame.iface_idx`, or to `sinks[0]` in single-file mode. | -| Sink | Two-state machine (Pending/Active). Owns filename template, rotation config, retention config, header blob, file index. Handles deferred file creation, rotation decisions, retention cleanup. Constructs the Writer stack on activation. | -| Writers | Composable, each wraps a generic inner Writer. Trait has `write`, `flush`, and `finish`. Leaf implementations are FileWriter and StdoutWriter. | +| Layer | Responsibility | +| --------- | -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| Main loop | Recv batches from SPSC channel, call `pipeline.write_batch()`, recycle the batch Vec back to the receiver, call `pipeline.tick()` each iteration for time-driven flush/sync, forward SIGHUP via `pipeline.rotate()`, `pipeline.close()` on STOP | +| Pipeline | Owns one Formatter and a `Vec` whose length is either 1 (single-file mode) or the number of interfaces (per-interface mode). Formats each frame into a per-Sink scratch buffer, then writes each non-empty buffer to its Sink once per batch. | +| Sink | Three-state machine (Pending/Active/Closed). Owns filename template, rotation config, retention config, header blob, file index. Handles deferred file creation, rotation decisions, retention cleanup. Constructs the Writer stack on activation. Terminal after `close`. | +| Writers | Composable, each wraps a generic inner Writer. Trait has `write`, `flush`, `sync`, and `finish`. Leaf implementations are FileWriter and StdoutWriter. 
| ## Receiver detail @@ -106,25 +107,36 @@ infinitely increase the batch size - there's a tipping point at which if we incr run the risk of filling up the recvbuf. A batch size of 32 or 64 seems like a reasonable upper limit. +## Batching CanFrames + +Sending the `CanFrame`s in batches over the channel allows the writer to format and write the whole +batch at once, which is a nice property. Using a second channel to send emptied `Vec`s back to the +receiver allows us to re-use the allocations without repeated high frequency alloc/free cycles. + +* At startup, a small pool of pre-allocated Vecs is pushed into the recycle channel. +* The receiver pulls a Vec from the recycle channel via `try_recv` and falls back to a fresh + allocation if the pool is empty. +* The main loop receives a batch, hands it to the Pipeline, clears the Vec and `try_send`s it back + to the recycle channel. If the recycle channel is full, the Vec is dropped. + ## Main loop detail -The main loop runs on the main thread. It receives frames from the SPSC channel and hands them to -the Pipeline. The loop drains all ready frames, relying on a byte threshold inside the Sink to -trigger flushes during normal operation: +The main loop runs on the main thread. 
It receives batches from the SPSC channel, hands them to the +Pipeline, and gives the Pipeline a periodic `tick` so its Sinks can check their flush and sync +timers: ```rust send_address_claims(&claim_ifaces) loop { - match rx.recv_timeout(100ms) { - Ok(frame) => { - pipeline.write_frame(&frame) - while let Ok(frame) = rx.try_recv() { - pipeline.write_frame(&frame) - } + match full_rx.recv_timeout(100ms) { + Ok(batch) => { + pipeline.write_batch(&batch) + recycle(batch) } - Err(Timeout) => pipeline.flush() + Err(Timeout) => {} } + pipeline.tick() if SIGHUP.swap(false) { pipeline.rotate() send_address_claims(&claim_ifaces) @@ -134,17 +146,26 @@ loop { } ``` -The `try_recv` drain terminates because the receiver produces frames at a finite rate bounded by the -CAN bus speeds. +Each batch represents one CQE drain from the receiver, so the main loop does not need to perform its +own draining: a batch already coalesces frames that were ready together. + +There are three independent triggers for pushing data down the Writer stack, all checked inside the +Sink: -Flushing is driven by a byte threshold inside the Sink, not by the drain loop. Each `write_frame` -call tracks bytes written since the last flush. When the threshold is crossed, the Sink calls -`writer.flush()` inline during the write. This produces predictable flush granularity regardless of -traffic burstiness. `flush()` is unconditional at each layer of the Writer stack: the Sink is the -single place that decides when to call it. +* **Byte threshold crossed during write** triggers `writer.flush()` immediately after the write. +* **flush_interval elapsed** triggers `writer.flush()` on the next `tick`. +* **sync_interval elapsed** triggers `writer.sync()` on the next `tick`. -The 100ms `recv_timeout` is a safety net for idle periods: when no frames arrive, the timeout fires -and `pipeline.flush()` pushes out whatever is buffered. Under normal load this timeout rarely fires. 
+`flush` and `sync` are only invoked between frames, never mid-write, preserving the invariant that +each `sink.write` carries whole formatted frames and that zstd block boundaries always fall between +frames. + +`pipeline.tick()` is called every loop iteration, so it fires under both sustained traffic (after +each batch) and idle periods (after each 100ms `recv_timeout`). + +`pipeline.close()` must run on every shutdown path, even when a preceding `write_batch` returns an +error. `close` is what calls `writer.finish()` on each Sink, which is where the zstd frame epilogue +is emitted and the file is fsynced. Skipping it would leave files in an unrecoverable state. Signal handling is cooperative: the main loop checks atomic flags between iterations. SIGHUP triggers rotation. SIGTERM/SIGINT trigger shutdown. Neither is latency-sensitive since finishing the @@ -169,23 +190,29 @@ offline analysis. ## Pipeline detail -The Pipeline owns a single Formatter and a `Vec`. +The Pipeline owns a single Formatter, a `Vec`, and one scratch format buffer per Sink. When the output path template contains `{interface}`, `sinks.len() == n_interfaces` and frames are dispatched by `frame.iface_idx`. Otherwise, `sinks.len() == 1`, all traffic is interleaved into that single Sink, and `frame.iface_idx` is used only by the Formatter (for the interface-name column in formats that need it), not for dispatch. -On `write_frame(&CanFrame)`: +On `write_batch(&[CanFrame])`: + +1. Clear each Sink's scratch buffer. +2. For each frame in the batch, format it into the buffer of its target Sink: `sinks[iface_idx]` in + per-interface mode, or `sinks[0]` in single-file mode. +3. For each Sink with a non-empty buffer, call `sink.write(buf)` exactly once. -1. Format the frame into a shared `Vec` buffer using the Formatter -2. 
Dispatch the formatted bytes to the target Sink: `sinks[frame.iface_idx]` in per-interface mode, - or `sinks[0]` in single-file mode +This collapses every frame in the batch destined for a given Sink into a single `sink.write` call, +preserving the invariant that each `sink.write` carries whole formatted frames. In single-sink mode +that is one write per batch; in per-interface mode, one write per (interface, batch) pair. -On `flush()`, `rotate()`, and `close()`: forward to all unique Sinks. +On `tick()`: forward to each Sink so it can check its `flush_interval` and `sync_interval` timers +and call `writer.flush()` / `writer.sync()` if the corresponding timer has elapsed. On `take_rotated()`: return and clear the set of interface indices whose Sinks rotated during -`write_frame()`. This allows the main loop to send address claim requests for interfaces that +`write_batch()`. This allows the main loop to send address claim requests for interfaces that rotated due to size or duration limits. ## Formatter detail @@ -206,7 +233,7 @@ Output formats: can-utils candump (file and console variants), Vector ASC, PCAP. ## Sink detail -The Sink is a two-state machine that manages one output destination. +The Sink is a three-state machine that manages one output destination. 
```mermaid stateDiagram-v2 @@ -214,8 +241,9 @@ stateDiagram-v2 Pending --> Active : first write (resolve filename, open file, write header) Active --> Active : write (within rotation threshold) Active --> Pending : rotate (size/time limit hit, or SIGHUP) - Active --> [*] : close (finalize, fsync) - Pending --> [*] : close (no-op) + Active --> Closed : close (finalize, fsync) + Pending --> Closed : close (no-op) + Closed --> [*] ``` ### Pending state @@ -227,7 +255,9 @@ Holds all configuration needed to open a new file: * Rotation config (size limit or duration limit, or none) * Retention config (size limit, or none) * Compression config (on/off) -* Flush byte threshold (drives how often the Sink calls `writer.flush()` during normal writes) +* Flush byte threshold (caps the size of one in-flight zstd block / one BufWriter fill) +* Flush interval (upper bound on time between `writer.flush()` calls; optional) +* Sync interval (upper bound on time between `writer.sync()` calls; optional) * Header blob from the Formatter ### Active state @@ -242,6 +272,14 @@ Adds the live output state: * File open instant from the monotonic clock (for duration-based rotation tracking) * Current filename (for logging rotation events to stderr) +### Closed state + +Terminal state. Reached by `close`, which calls `writer.finish()` if the Sink was Active (flushing +buffers, writing the zstd frame epilogue if compressed, fsyncing the file) or no-ops if the Sink was +Pending. A Closed Sink does not accept further writes: subsequent `write` calls return an error +rather than re-opening the file. `flush`, `rotate`, and `close` on an already-Closed Sink are +no-ops. + ### Deferred file creation Files are not created until the first frame arrives. On the first `write()` call: @@ -300,12 +338,15 @@ Writers are composable. Each Writer wraps an inner Writer, forming a stack. 
The Writer: write(&mut self, buf: &[u8]) -> Result<()> flush(&mut self) -> Result<()> + sync(&mut self) -> Result<()> finish(&mut self) -> Result<()> ``` -* `write` -- push bytes through to the inner Writer (possibly buffering or transforming) -* `flush` -- flush internal buffers so data is recoverable up to this point -* `finish` -- finalize the stream (called on rotation and shutdown only) +* `write` -- push bytes through to the inner Writer (possibly buffering or transforming). +* `flush` -- push buffered bytes down into the kernel page cache. Does not force a disk commit. +* `sync` -- push buffered bytes down and durably commit them to storage. +* `finish` -- write any per-stream epilogue (e.g. the zstd frame trailer), then durably commit. + Called on rotation and shutdown only. ### Writer implementations @@ -313,18 +354,21 @@ Writer: * `write`: `fd.write(buf)` * `flush`: no-op (data is in the kernel page cache) -* `finish`: `fsync` +* `sync`: `fdatasync` (commit data + size to disk, skip the inode-only metadata) +* `finish`: `fsync` (commit data + all metadata to disk) **StdoutWriter** (leaf): writes to stdout, flushes eagerly for live viewing. * `write`: `stdout.write(buf)` then `stdout.flush()` * `flush`: no-op (already flushed on every write) +* `sync`: no-op * `finish`: no-op **BufWriter\<W\>**: buffers small writes into larger ones. * `write`: buffer bytes, flush to inner Writer when buffer is full * `flush`: flush buffer to inner Writer, call `inner.flush()` +* `sync`: flush buffer to inner Writer, call `inner.sync()` * `finish`: flush buffer to inner Writer, call `inner.finish()` **ZstdWriter\<W\>**: compresses data with zstd streaming compression. @@ -333,6 +377,8 @@ Writer: * `flush`: call `ZSTD_e_flush` to emit a complete compressed block to the inner Writer, then call `inner.flush()`. The Sink's flush byte threshold gates how often this runs during normal writes, so emitting too-small blocks is already avoided at that layer.
+* `sync`: call `ZSTD_e_flush` to emit a complete compressed block, then call `inner.sync()`. The + zstd encoder state continues across the call; the file remains an in-progress zstd stream. * `finish`: call `ZSTD_e_end` to write the zstd frame epilogue, making the file a complete and independently decompressible zstd stream. Then call `inner.finish()`. @@ -342,23 +388,37 @@ Writer: | -------------------- | -------------------------- | | File, uncompressed | BufWriter\<FileWriter\> | | File, compressed | ZstdWriter\<FileWriter\> | -| Stdout, uncompressed | BufWriter\<StdoutWriter\> | +| Stdout, uncompressed | StdoutWriter | | Stdout, compressed | ZstdWriter\<StdoutWriter\> | No BufWriter wraps the ZstdWriter because the zstd encoder already batches output into block-sized chunks. The BufWriter is only needed when many small formatted frames (uncompressed) need to be batched into fewer `write()` syscalls. -### Flush behavior summary +We don't buffer writes to stdout because when used interactively, candumpr should be responsive. The +low-performance-impact goal matters most for the logging-daemon use case, and less for interactive +troubleshooting. + +### Flush behavior + +The Sink is the only place that decides when to call `flush`, `sync`, or `finish`; everything below +it responds unconditionally and forwards the call down its inner stack.
+ +| Event | ZstdWriter | BufWriter | FileWriter | StdoutWriter | +| ------------------------------------------ | ---------------------------- | ----------------------------- | ---------- | ------------ | +| `flush` (byte threshold or flush_interval) | ZSTD_e_flush + `inner.flush` | Flush buffer + `inner.flush` | No-op | No-op | +| `sync` (sync_interval) | ZSTD_e_flush + `inner.sync` | Flush buffer + `inner.sync` | fdatasync | No-op | +| `finish` (rotation / shutdown) | ZSTD_e_end + `inner.finish` | Flush buffer + `inner.finish` | fsync | No-op | + +## Durability -Both "Sink byte threshold crossed during write" and "idle-timeout `pipeline.flush()`" ultimately -call `writer.flush()` with the same semantics. The Sink is the only place that decides when to call -`flush`; everything below it responds unconditionally. +Two distinct guarantees: -| Event | ZstdWriter | BufWriter | FileWriter | StdoutWriter | -| ------------------------------- | --------------------- | --------------------- | ---------- | --------------------- | -| `flush` (byte threshold / idle) | ZSTD_e_flush | Flush buffer to inner | No-op | No-op (already eager) | -| `finish` (rotation / shutdown) | ZSTD_e_end (epilogue) | Flush buffer to inner | fsync | No-op | +* **Structurally readable on power loss.** `flush` and `sync` only ever fire between frames, so + whatever the kernel had written back at the moment of power loss truncates on a block and frame + boundary. The file is always decodable up to that point with standard tools. +* **No data lost.** Only `sync` (and `finish`) provide this. Data that has been `flush`ed but not + `sync`ed lives in the kernel page cache and may not reach disk before power loss. 
## Compression detail diff --git a/docs/design/candumpr/04-configuration.md b/docs/design/candumpr/04-configuration.md index 2e2cdb8..fa005fc 100644 --- a/docs/design/candumpr/04-configuration.md +++ b/docs/design/candumpr/04-configuration.md @@ -152,17 +152,19 @@ filters = ["0x200:0x7FF"] The `[defaults]` section provides base values that all interfaces inherit from. Any option set in an `[interface.]` section overrides the corresponding default. -| Key | Type | Default | Description | -| --------------- | ----------------- | --------------------------------------------- | -------------------------------------------------------------------------------------- | -| `format` | string | `"candump-file"` | Output format: `"candump-file"`, `"candump-console"`, `"asc"`, `"pcap"` | -| `compress` | boolean | `true` | Enable zstd compression | -| `timestamp` | string | `"absolute"` | Timestamp mode: `"absolute"`, `"delta"`, `"zero"`. Only applies to the candump formats | -| `path` | string | `"{index}_{interface}_{timestamp-iso}.{ext}"` | Output file path template | -| `batch_size` | string or integer | `"auto"` | io_uring batch size | -| `rcvbuf` | integer | system default | Socket receive buffer size in bytes | -| `address_claim` | boolean | `false` | Send J1939 address claim PGN request on rotation | -| `error_frames` | boolean | `true` | Log error frames | -| `filters` | array of strings | `[]` | candump-style filters | +| Key | Type | Default | Description | +| ---------------- | ----------------- | --------------------------------------------- | -------------------------------------------------------------------------------------- | +| `format` | string | `"candump-file"` | Output format: `"candump-file"`, `"candump-console"`, `"asc"`, `"pcap"` | +| `compress` | boolean | `true` | Enable zstd compression | +| `timestamp` | string | `"absolute"` | Timestamp mode: `"absolute"`, `"delta"`, `"zero"`. 
Only applies to the candump formats | +| `path` | string | `"{index}_{interface}_{timestamp-iso}.{ext}"` | Output file path template | +| `batch_size` | string or integer | `"auto"` | io_uring batch size | +| `rcvbuf` | integer | system default | Socket receive buffer size in bytes | +| `address_claim` | boolean | `false` | Send J1939 address claim PGN request on rotation | +| `error_frames` | boolean | `true` | Log error frames | +| `filters` | array of strings | `[]` | candump-style filters | +| `flush_interval` | string | `"5s"` | Upper bound between `flush` calls. `"off"` disables time-based flush | +| `sync_interval` | string | `"5min"` | Upper bound between `sync` calls. `"off"` disables periodic sync | ## Rotation