Skip to content
7 changes: 7 additions & 0 deletions candumpr/src/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ use crate::frame::CanFrame;
pub trait Formatter {
/// Append the formatted representation of `frame` to `buf`.
fn format(&self, frame: &CanFrame, buf: &mut Vec<u8>);

/// Optional header bytes written once at the start of each output stream, before any frames.
///
/// Used by formats with a file-level header (e.g. PCAP).
fn header(&self) -> Option<&[u8]> {
None
}
}

/// Formats frames in the can-utils candump file format.
Expand Down
40 changes: 39 additions & 1 deletion candumpr/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,46 @@
pub mod can;
pub mod format;
pub mod frame;
pub mod pipeline;
pub mod recv;
pub mod write;
pub mod sink;
pub mod writer;

#[cfg(test)]
pub(crate) mod test_util {
pub(crate) struct TestBufWriter {
pub(crate) bytes: Vec<u8>,
}

impl TestBufWriter {
pub(crate) fn new() -> Self {
Self { bytes: Vec::new() }
}
}

impl crate::writer::Writer for TestBufWriter {
fn write(&mut self, b: &[u8]) -> eyre::Result<()> {
self.bytes.extend_from_slice(b);
Ok(())
}

fn flush(&mut self) -> eyre::Result<()> {
Ok(())
}

fn sync(&mut self) -> eyre::Result<()> {
Ok(())
}

fn finish(&mut self) -> eyre::Result<()> {
Ok(())
}

fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
}
}

#[cfg(test)]
#[ctor::ctor]
Expand Down
123 changes: 91 additions & 32 deletions candumpr/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
use std::process::ExitCode;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::mpsc;
use std::time::Duration;

use candumpr::can;
use candumpr::format::{CanutilsFormatter, Formatter};
use candumpr::recv::receiver::Receiver;
use candumpr::write::{StdoutWriter, Writer};
use candumpr::pipeline::Pipeline;
use candumpr::recv::receiver::{BATCH_CAPACITY, Receiver};
use candumpr::sink::Sink;
use candumpr::writer::StdoutWriter;
use clap::Parser;

static STOP: AtomicBool = AtomicBool::new(false);
Expand All @@ -27,66 +30,122 @@ struct Cli {
log_level: tracing::Level,
}

fn main() -> eyre::Result<()> {
color_eyre::install()?;
fn main() -> ExitCode {
if let Err(e) = color_eyre::install() {
eprintln!("failed to install error handler: {e:#}");
return ExitCode::FAILURE;
}
let cli = Cli::parse();

tracing_subscriber::fmt()
.with_writer(std::io::stderr)
.with_max_level(cli.log_level)
.init();

let sockets: Vec<_> = cli
let sockets: Vec<_> = match cli
.interfaces
.iter()
.map(|name| can::open_can_raw(name))
.collect::<std::io::Result<_>>()?;
.collect::<std::io::Result<_>>()
{
Ok(sockets) => sockets,
Err(e) => {
tracing::error!(error = ?e, "failed to open CAN sockets");
return ExitCode::FAILURE;
}
};

const POOL_SIZE: usize = 4;
const RECYCLE_BOUND: usize = 8;

let (tx, rx) = mpsc::channel();
let (full_tx, full_rx) = mpsc::channel::<Vec<_>>();
let (empty_tx, empty_rx) = mpsc::sync_channel::<Vec<_>>(RECYCLE_BOUND);
for _ in 0..POOL_SIZE {
empty_tx
.send(Vec::with_capacity(BATCH_CAPACITY))
.expect("recycle channel must accept initial pool");
}

unsafe {
libc::signal(
libc::SIGINT,
signal_handler as *const () as libc::sighandler_t,
);
for sig in [libc::SIGINT, libc::SIGTERM] {
unsafe {
libc::signal(sig, signal_handler as *const () as libc::sighandler_t);
}
}

let recv_handle = std::thread::spawn(move || -> eyre::Result<u64> {
let mut recv = Receiver::new(sockets)?;
let total = recv.run(&STOP, &tx)?;
let total = recv.run(&STOP, &full_tx, &empty_rx)?;
Ok(total)
});

let formatter = CanutilsFormatter::new(cli.interfaces);
let mut writer = StdoutWriter::new();
let mut buf = Vec::with_capacity(4096);
let header = formatter.header().map(|h| h.to_vec());
let sink = Sink::new(
StdoutWriter::new(),
header,
64 * 1024,
Some(Duration::from_secs(5)),
Some(Duration::from_secs(5 * 60)),
);
let mut pipeline = Pipeline::new(formatter, vec![sink]);

// Write-path errors are logged and recorded rather than propagated: returning early would skip
// draining the remaining batches and closing the pipeline, both of which can lose buffered data.
// Every error sets `failed` so the process still exits nonzero.
let mut failed = false;

while !STOP.load(Ordering::Relaxed) {
match rx.recv_timeout(Duration::from_millis(100)) {
Ok(frame) => {
formatter.format(&frame, &mut buf);
// Drain any additional frames that are immediately ready.
while let Ok(frame) = rx.try_recv() {
formatter.format(&frame, &mut buf);
match full_rx.recv_timeout(Duration::from_millis(100)) {
Ok(mut batch) => {
if let Err(e) = pipeline.write_batch(&batch) {
tracing::error!(error = ?e, "failed to write batch");
failed = true;
}
writer.write(&buf)?;
buf.clear();
batch.clear();
let _ = empty_tx.try_send(batch);
}
Err(mpsc::RecvTimeoutError::Timeout) => continue,
Err(mpsc::RecvTimeoutError::Timeout) => {}
Err(mpsc::RecvTimeoutError::Disconnected) => break,
}
if let Err(e) = pipeline.tick() {
tracing::error!(error = ?e, "periodic flush or sync failed");
failed = true;
}
}

// Drain remaining frames after stop.
while let Ok(frame) = rx.try_recv() {
formatter.format(&frame, &mut buf);
// Join before draining so we write every received frame.
match recv_handle.join() {
Ok(Ok(total)) => tracing::debug!(total_frames = total, "receiver finished"),
Ok(Err(e)) => {
tracing::error!(error = ?e, "receiver thread failed");
failed = true;
}
Err(_) => {
tracing::error!("receiver thread panicked");
failed = true;
}
}
if !buf.is_empty() {
writer.write(&buf)?;

// Drain everything the receiver queued before it exited.
while let Ok(mut batch) = full_rx.try_recv() {
if let Err(e) = pipeline.write_batch(&batch) {
tracing::error!(error = ?e, "failed to write batch during drain");
failed = true;
}
batch.clear();
let _ = empty_tx.try_send(batch);
}

let total = recv_handle.join().expect("receiver thread panicked")?;
tracing::debug!(total, "receiver finished");
// close() always runs, even after write errors: for file and zstd writers it is what writes the
// epilogue and fsyncs, so skipping it could leave output unrecoverable.
if let Err(e) = pipeline.close() {
tracing::error!(error = ?e, "failed to close pipeline");
failed = true;
}

Ok(())
if failed {
ExitCode::FAILURE
} else {
ExitCode::SUCCESS
}
}
Loading