From 5daeb3bc1fdf51ee88de40fc1c3c15499f880fbd Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 09:38:44 -0500 Subject: [PATCH 1/8] Update architecture doc with new netlink monitor --- docs/design/candumpr/02-architecture.md | 51 +++++++++++++++++++------ 1 file changed, 40 insertions(+), 11 deletions(-) diff --git a/docs/design/candumpr/02-architecture.md b/docs/design/candumpr/02-architecture.md index 00fd709..178b042 100644 --- a/docs/design/candumpr/02-architecture.md +++ b/docs/design/candumpr/02-architecture.md @@ -37,16 +37,22 @@ thread. ```mermaid graph TD can0 & can1 & can2 & can3 --> io_uring + nl[rtnetlink socket] --> nlthread subgraph recv [Receiver thread] io_uring end - io_uring --> |"SPSC: Vec"| main_loop - main_loop -.-> |"SPSC: recycled Vec"| io_uring + subgraph netlink [Netlink thread] + nlthread["poll + recvmsg"] + end + + io_uring --> |"crossbeam: Vec incl. error frames"| main_loop + main_loop -.-> |"crossbeam: recycled Vec"| io_uring + nlthread --> |"crossbeam: LinkEvent"| main_loop subgraph main [Main thread] - main_loop["recv_timeout"] + main_loop["select! over frames + link events + tick"] main_loop --> |"&[CanFrame]"| Pipeline end @@ -121,20 +127,23 @@ receiver allows us to re-use the allocations without repeated high frequency all ## Main loop detail -The main loop runs on the main thread. It receives batches from the SPSC channel, hands them to the -Pipeline, and gives the Pipeline a periodic `tick` so its Sinks can check their flush and sync -timers: +The main loop uses `crossbeam_channel::select!` over the frame channel, the link-event channel, and +a 100ms tick deadline. It writes each batch through the Pipeline, derives bus state from the batch's +error frames, logs link events, and calls `pipeline.tick()` each iteration so Sinks can check their +flush and sync timers: ```rust send_address_claims(&claim_ifaces) loop { - match full_rx.recv_timeout(100ms) { - Ok(batch) => { + select! { + recv(full_rx) => batch => { + log_bus_state(&batch) // inspect error frames for bus-state transitions pipeline.write_batch(&batch) recycle(batch) } - Err(Timeout) => {} + recv(event_rx) => link_event => handle_link_event(link_event) + default(100ms) => {} } pipeline.tick() if SIGHUP.swap(false) { @@ -142,8 +151,9 @@ loop { send_address_claims(&claim_ifaces) } send_address_claims(&pipeline.take_rotated()) - if STOP.load() { pipeline.close(); break } + if STOP.load() { break } } +// join the receiver and netlink threads, drain remaining batches, then pipeline.close() ``` Each batch represents one CQE drain from the receiver, so the main loop does not need to perform its @@ -161,7 +171,7 @@ each `sink.write` carries whole formatted frames and that zstd block boundaries frames. `pipeline.tick()` is called every loop iteration, so it fires under both sustained traffic (after -each batch) and idle periods (after each 100ms `recv_timeout`). +each batch) and idle periods (after each 100ms tick deadline). `pipeline.close()` must run on every shutdown path, even when a preceding `write_batch` returns an error. `close` is what calls `writer.finish()` on each Sink, which is where the zstd frame epilogue @@ -188,6 +198,25 @@ offline analysis. Pipeline tracks which interfaces rotated, and the main loop drains this set via `pipeline.take_rotated()` after each drain batch, sending address claims for those interfaces. +## Network state monitoring + +candumpr observes two health signals and routes both to the main thread; the receiver stays a +stateless pump. + +**Bus state**: the receive sockets enable `CAN_RAW_ERR_FILTER`, so bus-off, controller-state, and +restart notifications arrive as ordinary error frames on the frame channel, logged verbatim and +inspected by the main thread. Reception is unconditional, independent of whether error frames are +kept in the output. + +**Link state**: a dedicated thread binds an `RTMGRP_LINK` socket, dumps initial state with +`RTM_GETLINK`, and forwards `RTM_NEWLINK`/`RTM_DELLINK` as `LinkEvent`s over a second crossbeam +channel. It `poll`s with a short timeout to observe shutdown and is otherwise idle, keeping link +monitoring off the receiver's io_uring frame path at no steady-state cost. + +Neither signal is fatal; both are logged to stderr. The two streams are not strictly ordered +relative to each other. Acting on them (rotate on link-down, flush on bus-off, gate address claims) +is specified separately. + ## Pipeline detail The Pipeline owns a single Formatter, a `Vec`, and one scratch format buffer per Sink. From 031dd162390f6b1968273fb4d7a2682e84c02721 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 12:07:47 -0500 Subject: [PATCH 2/8] Rename iface_idx to sock_id to disambiguate from ifindex --- candumpr/src/format.rs | 6 +++--- candumpr/src/frame.rs | 4 ++-- candumpr/src/main.rs | 8 ++++++++ candumpr/src/pipeline.rs | 12 ++++++------ candumpr/src/recv/receiver.rs | 4 ++-- docs/design/candumpr/02-architecture.md | 8 ++++---- 6 files changed, 25 insertions(+), 17 deletions(-) diff --git a/candumpr/src/format.rs b/candumpr/src/format.rs index 207a457..7559653 100644 --- a/candumpr/src/format.rs +++ b/candumpr/src/format.rs @@ -33,7 +33,7 @@ impl CanutilsFormatter { impl Formatter for CanutilsFormatter { fn format(&self, frame: &CanFrame, buf: &mut Vec) { let ts = &frame.timestamp; - let iface = &self.iface_names[frame.iface_idx]; + let iface = &self.iface_names[frame.sock_id]; let id = frame.raw.can_id & !libc::CAN_EFF_FLAG & !libc::CAN_RTR_FLAG & !libc::CAN_ERR_FLAG; write!( @@ -64,7 +64,7 @@ mod tests { #[test] fn canutils_format_basic() { let frame = CanFrame { - iface_idx: 0, + sock_id: 0, timestamp: Timestamp { sec: 1616161616, nsec: 123000, @@ -85,7 +85,7 @@ mod tests { #[test] fn canutils_format_empty_data() { let frame = CanFrame { - iface_idx: 1, + sock_id: 1, timestamp: Timestamp { sec: 0, nsec: 0 }, direction: Direction::Rx, raw: LinuxCanFrame::new(0x123 | libc::CAN_EFF_FLAG, &[]), diff --git a/candumpr/src/frame.rs b/candumpr/src/frame.rs index b06a824..8ae6a7d 100644 --- a/candumpr/src/frame.rs +++ b/candumpr/src/frame.rs @@ -15,8 +15,8 @@ pub enum Direction { /// Sent from the receiver thread to the writer thread over the SPSC channel. #[derive(Clone, Debug)] pub struct CanFrame { - /// Index into the interface list passed at startup. - pub iface_idx: usize, + /// Source interface: its position in the configured sockets vector + pub sock_id: usize, /// Receive timestamp. pub timestamp: Timestamp, /// Whether this was received from the bus or is a local TX loopback. diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 1e0b85d..3cba4a2 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -42,6 +42,14 @@ fn main() -> ExitCode { .with_max_level(cli.log_level) .init(); + // The sockets vector defines the canonical interface ordering. The orderings of: + // + // 1. cli.interfaces + // 2. sockets + // 3. Pipeline::sinks / bufs + // 4. et al. + // + // all follow the same ordering, and are all indexed by CanFrame::sock_id let sockets: Vec<_> = match cli .interfaces .iter() diff --git a/candumpr/src/pipeline.rs b/candumpr/src/pipeline.rs index 839055e..3cbdbc3 100644 --- a/candumpr/src/pipeline.rs +++ b/candumpr/src/pipeline.rs @@ -31,7 +31,7 @@ impl Pipeline { /// Write the given batch of [CanFrame]s to the [Sink]s /// /// With a single sink, every frame is routed to it regardless of interface index; this is both - /// the common stdout case and what keeps a frame's [CanFrame::iface_idx] from indexing past the + /// the common stdout case and what keeps a frame's [CanFrame::sock_id] from indexing past the /// only buffer. With multiple sinks, frames are dispatched by interface index. pub fn write_batch(&mut self, frames: &[CanFrame]) -> eyre::Result<()> { for buf in &mut self.bufs { @@ -43,7 +43,7 @@ impl Pipeline { let single = self.sinks.len() == 1; for frame in frames { - let idx = if single { 0 } else { frame.iface_idx }; + let idx = if single { 0 } else { frame.sock_id }; if self.first_ts[idx].is_none() { self.first_ts[idx] = Some(frame.timestamp); } @@ -114,11 +114,11 @@ mod tests { use crate::sink::Sink; use crate::test_util::TestBufWriter; - fn frame(iface_idx: usize, id: u32, data: &[u8]) -> CanFrame { + fn frame(sock_id: usize, id: u32, data: &[u8]) -> CanFrame { CanFrame { - iface_idx, + sock_id, timestamp: Timestamp { - sec: 1000 + iface_idx as i64, + sec: 1000 + sock_id as i64, nsec: 0, }, direction: Direction::Rx, @@ -166,7 +166,7 @@ mod tests { } #[test] - fn per_interface_dispatches_by_iface_idx() { + fn per_interface_dispatches_by_sock_id() { let names = vec!["can0".to_string(), "can1".to_string(), "can2".to_string()]; let sinks = vec![sink(), sink(), sink()]; let mut pipeline = Pipeline::new(CanutilsFormatter::new(names.clone()), sinks); diff --git a/candumpr/src/recv/receiver.rs b/candumpr/src/recv/receiver.rs index ca11630..84e6587 100644 --- a/candumpr/src/recv/receiver.rs +++ b/candumpr/src/recv/receiver.rs @@ -196,7 +196,7 @@ impl Receiver { let timestamp = meta.timestamp.unwrap_or(Timestamp { sec: 0, nsec: 0 }); batch.push(CanFrame { - iface_idx: idx, + sock_id: idx, timestamp, direction: Direction::Rx, raw, @@ -368,7 +368,7 @@ mod tests { .recv_timeout(std::time::Duration::from_millis(100)) .unwrap(); for frame in &batch { - assert!(frame.iface_idx < IFACE_COUNT); + assert!(frame.sock_id < IFACE_COUNT); assert_eq!(frame.direction, Direction::Rx); assert!(frame.raw.len <= 8); count += 1; diff --git a/docs/design/candumpr/02-architecture.md b/docs/design/candumpr/02-architecture.md index 178b042..3ed708c 100644 --- a/docs/design/candumpr/02-architecture.md +++ b/docs/design/candumpr/02-architecture.md @@ -57,7 +57,7 @@ graph TD end subgraph Pipeline - Formatter --> |"&[u8]"| dispatch["sinks[frame.iface_idx]"] + Formatter --> |"&[u8]"| dispatch["sinks[frame.sock_id]"] end subgraph sink [Sink] @@ -222,14 +222,14 @@ is specified separately. The Pipeline owns a single Formatter, a `Vec`, and one scratch format buffer per Sink. When the output path template contains `{interface}`, `sinks.len() == n_interfaces` and frames are -dispatched by `frame.iface_idx`. Otherwise, `sinks.len() == 1`, all traffic is interleaved into that -single Sink, and `frame.iface_idx` is used only by the Formatter (for the interface-name column in +dispatched by `frame.sock_id`. Otherwise, `sinks.len() == 1`, all traffic is interleaved into that +single Sink, and `frame.sock_id` is used only by the Formatter (for the interface-name column in formats that need it), not for dispatch. On `write_batch(&[CanFrame])`: 1. Clear each Sink's scratch buffer. -2. For each frame in the batch, format it into the buffer of its target Sink: `sinks[iface_idx]` in +2. For each frame in the batch, format it into the buffer of its target Sink: `sinks[sock_id]` in per-interface mode, or `sinks[0]` in single-file mode. 3. For each Sink with a non-empty buffer, call `sink.write(buf)` exactly once. From aad928391c10b566b3e4b4299cf51da4bb732fc7 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 13:13:39 -0500 Subject: [PATCH 3/8] Log the rcvbuf size on startup --- candumpr/src/can.rs | 21 +++++++++++++++++++++ candumpr/src/main.rs | 10 ++++++++++ 2 files changed, 31 insertions(+) diff --git a/candumpr/src/can.rs b/candumpr/src/can.rs index 1de34e7..a971236 100644 --- a/candumpr/src/can.rs +++ b/candumpr/src/can.rs @@ -106,6 +106,27 @@ pub fn set_recv_buffer(fd: BorrowedFd<'_>, bytes: u32) -> std::io::Result<()> { Ok(()) } +/// Get the kernel receive buffer size for the socket, in bytes. +/// +/// The kernel reports the doubled value it actually allocated (see [set_recv_buffer]). +pub fn get_recv_buffer(fd: BorrowedFd<'_>) -> std::io::Result { + let mut val: libc::c_int = 0; + let mut len = std::mem::size_of::() as libc::socklen_t; + let ret = unsafe { + libc::getsockopt( + fd.as_raw_fd(), + libc::SOL_SOCKET, + libc::SO_RCVBUF, + std::ptr::from_mut(&mut val).cast::(), + &mut len, + ) + }; + if ret != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(val as u32) +} + /// Enable receive timestamping on the socket. /// /// Requests hardware timestamps with a software fallback. The kernel delivers diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 3cba4a2..3b27fab 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -1,3 +1,4 @@ +use std::os::unix::io::AsFd; use std::process::ExitCode; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::mpsc; @@ -63,6 +64,15 @@ fn main() -> ExitCode { } }; + for (name, sock) in cli.interfaces.iter().zip(&sockets) { + match can::get_recv_buffer(sock.as_fd()) { + Ok(bytes) => { + tracing::info!(interface = %name, rcvbuf_bytes = bytes, "opened CAN socket") + } + Err(e) => tracing::warn!(interface = %name, error = ?e, "failed to query rcvbuf size"), + } + } + const POOL_SIZE: usize = 4; const RECYCLE_BOUND: usize = 8; From 80d06bebb41e63489000bb74f693d962a234a3e7 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 13:15:04 -0500 Subject: [PATCH 4/8] Listen for error frames and format the same as candump --- candumpr/src/can.rs | 14 ++++++++++++ candumpr/src/format.rs | 48 +++++++++++++++++++++++++++++++++++++++--- 2 files changed, 59 insertions(+), 3 deletions(-) diff --git a/candumpr/src/can.rs b/candumpr/src/can.rs index a971236..55154c0 100644 --- a/candumpr/src/can.rs +++ b/candumpr/src/can.rs @@ -83,6 +83,20 @@ fn open_can_socket(ifname: &str, flags: i32) -> std::io::Result { return Err(std::io::Error::last_os_error()); } + let err_mask = libc::CAN_ERR_MASK; + let ret = unsafe { + libc::setsockopt( + fd.as_raw_fd(), + libc::SOL_CAN_RAW, + libc::CAN_RAW_ERR_FILTER, + std::ptr::from_ref(&err_mask).cast::(), + std::mem::size_of_val(&err_mask) as u32, + ) + }; + if ret != 0 { + return Err(std::io::Error::last_os_error()); + } + Ok(fd) } diff --git a/candumpr/src/format.rs b/candumpr/src/format.rs index 7559653..dd1d29f 100644 --- a/candumpr/src/format.rs +++ b/candumpr/src/format.rs @@ -34,15 +34,24 @@ impl Formatter for CanutilsFormatter { fn format(&self, frame: &CanFrame, buf: &mut Vec) { let ts = &frame.timestamp; let iface = &self.iface_names[frame.sock_id]; - let id = frame.raw.can_id & !libc::CAN_EFF_FLAG & !libc::CAN_RTR_FLAG & !libc::CAN_ERR_FLAG; + let can_id = frame.raw.can_id; + + let (id, width) = if can_id & libc::CAN_ERR_FLAG != 0 { + (can_id & (libc::CAN_ERR_MASK | libc::CAN_ERR_FLAG), 8) + } else if can_id & libc::CAN_EFF_FLAG != 0 { + (can_id & libc::CAN_EFF_MASK, 8) + } else { + (can_id & libc::CAN_SFF_MASK, 3) + }; write!( buf, - "({}.{:06}) {} {:08X}#", + "({}.{:06}) {} {:0width$X}#", ts.sec, ts.nsec / 1000, iface, - id + id, + width = width, ) .unwrap(); for i in 0..frame.raw.len as usize { @@ -99,4 +108,37 @@ mod tests { "(0.000000) vcan1 00000123#\n" ); } + + #[test] + fn canutils_format_error_frame_keeps_err_flag() { + let frame = CanFrame { + sock_id: 0, + timestamp: Timestamp { sec: 1, nsec: 0 }, + direction: Direction::Rx, + raw: LinuxCanFrame::new(libc::CAN_ERR_FLAG | 0x40, &[0, 0, 0, 0, 0, 0, 0, 0]), + }; + let names = vec!["can0".to_string()]; + let fmt = CanutilsFormatter::new(names); + let mut buf = Vec::new(); + fmt.format(&frame, &mut buf); + assert_eq!( + String::from_utf8(buf).unwrap(), + "(1.000000) can0 20000040#0000000000000000\n" + ); + } + + #[test] + fn canutils_format_standard_frame_uses_three_digits() { + let frame = CanFrame { + sock_id: 0, + timestamp: Timestamp { sec: 0, nsec: 0 }, + direction: Direction::Rx, + raw: LinuxCanFrame::new(0x123, &[0xAB]), + }; + let names = vec!["can0".to_string()]; + let fmt = CanutilsFormatter::new(names); + let mut buf = Vec::new(); + fmt.format(&frame, &mut buf); + assert_eq!(String::from_utf8(buf).unwrap(), "(0.000000) can0 123#AB\n"); + } } From 05153238169291c81e4f0ce8cb6f45d5b55222e1 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 13:36:45 -0500 Subject: [PATCH 5/8] Switch to crossbeam channels This will let us include the netlink link state events and select! over multiple channels nicely. --- Cargo.toml | 1 + candumpr/Cargo.toml | 1 + candumpr/src/main.rs | 36 +++++++++++++++++++++-------------- candumpr/src/recv/receiver.rs | 12 +++++------- 4 files changed, 29 insertions(+), 21 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 5bcef39..4e17c10 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,6 +17,7 @@ description = "Opinionated CAN utilities written in Rust" assert_cmd = { version = "2.2.0", features = ["color-auto"] } clap = { version = "4", features = ["derive"] } color-eyre = "0.6" +crossbeam-channel = "0.5" ctor = "0.6" eyre = "0.6" gungraun = "0.17" diff --git a/candumpr/Cargo.toml b/candumpr/Cargo.toml index 44d4074..4f3cc9f 100644 --- a/candumpr/Cargo.toml +++ b/candumpr/Cargo.toml @@ -13,6 +13,7 @@ ci = [] [dependencies] clap.workspace = true color-eyre.workspace = true +crossbeam-channel.workspace = true eyre.workspace = true io-uring.workspace = true libc.workspace = true diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 3b27fab..250d69a 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -1,7 +1,6 @@ use std::os::unix::io::AsFd; use std::process::ExitCode; use std::sync::atomic::{AtomicBool, Ordering}; -use std::sync::mpsc; use std::time::Duration; use candumpr::can; @@ -11,6 +10,7 @@ use candumpr::recv::receiver::{BATCH_CAPACITY, Receiver}; use candumpr::sink::Sink; use candumpr::writer::StdoutWriter; use clap::Parser; +use crossbeam_channel::select; static STOP: AtomicBool = AtomicBool::new(false); @@ -76,8 +76,8 @@ fn main() -> ExitCode { const POOL_SIZE: usize = 4; const RECYCLE_BOUND: usize = 8; - let (full_tx, full_rx) = mpsc::channel::>(); - let (empty_tx, empty_rx) = mpsc::sync_channel::>(RECYCLE_BOUND); + let (full_tx, full_rx) = crossbeam_channel::unbounded::>(); + let (empty_tx, empty_rx) = crossbeam_channel::bounded::>(RECYCLE_BOUND); for _ in 0..POOL_SIZE { empty_tx .send(Vec::with_capacity(BATCH_CAPACITY)) @@ -112,25 +112,33 @@ fn main() -> ExitCode { // Every error sets `failed` so the process still exits nonzero. let mut failed = false; - while !STOP.load(Ordering::Relaxed) { - match full_rx.recv_timeout(Duration::from_millis(100)) { - Ok(mut batch) => { - if let Err(e) = pipeline.write_batch(&batch) { - tracing::error!(error = ?e, "failed to write batch"); - failed = true; + loop { + select! { + recv(full_rx) -> msg => match msg { + Ok(mut batch) => { + if let Err(e) = pipeline.write_batch(&batch) { + tracing::error!(error = ?e, "failed to write batch"); + failed = true; + } + batch.clear(); + let _ = empty_tx.try_send(batch); } - batch.clear(); - let _ = empty_tx.try_send(batch); - } - Err(mpsc::RecvTimeoutError::Timeout) => {} - Err(mpsc::RecvTimeoutError::Disconnected) => break, + Err(_) => break, + }, + default(Duration::from_millis(100)) => {} } if let Err(e) = pipeline.tick() { tracing::error!(error = ?e, "periodic flush or sync failed"); failed = true; } + if STOP.load(Ordering::Relaxed) { + break; + } } + // Set STOP so the receiver exits even if we broke out on a channel disconnect. + STOP.store(true, Ordering::Relaxed); + // Join before draining so we write every received frame. match recv_handle.join() { Ok(Ok(total)) => tracing::debug!(total_frames = total, "receiver finished"), diff --git a/candumpr/src/recv/receiver.rs b/candumpr/src/recv/receiver.rs index 84e6587..f20e378 100644 --- a/candumpr/src/recv/receiver.rs +++ b/candumpr/src/recv/receiver.rs @@ -1,7 +1,6 @@ use std::alloc::Layout; use std::os::unix::io::{AsFd, AsRawFd, OwnedFd, RawFd}; use std::sync::atomic::{AtomicBool, AtomicU16, Ordering}; -use std::sync::mpsc; use io_uring::types::BufRingEntry; use io_uring::{IoUring, cqueue, opcode, types}; @@ -35,7 +34,7 @@ const CMSG_BUF_SIZE: usize = 128; const BUF_ENTRY_SIZE: usize = RECVMSG_OUT_HDR + CMSG_BUF_SIZE + FRAME_SIZE; /// Receives CAN frames from multiple interfaces using io_uring multishot RecvMsgMulti and sends -/// them to the writer thread through an mpsc channel. +/// them to the writer thread through a crossbeam channel. pub struct Receiver { ring: IoUring, sockets: Vec, @@ -124,8 +123,8 @@ impl Receiver { pub fn run( &mut self, stop: &AtomicBool, - full_tx: &mpsc::Sender>, - empty_rx: &mpsc::Receiver>, + full_tx: &crossbeam_channel::Sender>, + empty_rx: &crossbeam_channel::Receiver>, ) -> std::io::Result { let mut total = 0u64; // We wait for BATCH_SIZE completions before waking up, but we still want to be able to @@ -313,7 +312,6 @@ fn parse_control_data(control: &[u8]) -> FrameMeta { mod tests { use std::os::unix::io::AsFd; use std::sync::atomic::{AtomicBool, Ordering}; - use std::sync::mpsc; use super::Receiver; use crate::can::{self, LinuxCanFrame}; @@ -348,8 +346,8 @@ mod tests { static STOP: AtomicBool = AtomicBool::new(false); STOP.store(false, Ordering::Relaxed); - let (full_tx, full_rx) = mpsc::channel::>(); - let (empty_tx, empty_rx) = mpsc::sync_channel::>(8); + let (full_tx, full_rx) = crossbeam_channel::unbounded::>(); + let (empty_tx, empty_rx) = crossbeam_channel::bounded::>(8); for _ in 0..4 { empty_tx .send(Vec::with_capacity(super::BATCH_CAPACITY)) From 39d3b98a2b0def35053815eafc971836c748acec Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 15:56:15 -0500 Subject: [PATCH 6/8] Make receiver survive net up/down --- candumpr/src/recv/receiver.rs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/candumpr/src/recv/receiver.rs b/candumpr/src/recv/receiver.rs index f20e378..70e1e66 100644 --- a/candumpr/src/recv/receiver.rs +++ b/candumpr/src/recv/receiver.rs @@ -178,7 +178,10 @@ impl Receiver { if result < 0 { let err_code = -result; - if err_code != libc::ECANCELED && err_code != libc::ENOBUFS { + if err_code != libc::ECANCELED + && err_code != libc::ENOBUFS + && err_code != libc::ENETDOWN + { return Err(std::io::Error::from_raw_os_error(err_code)); } } else if let Some(buf_id) = cqueue::buffer_select(flags) { From b2628abeea0871f90e40ac46de4ef519e2d184b7 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 16:08:41 -0500 Subject: [PATCH 7/8] Handle and log netlink events --- candumpr/Cargo.toml | 1 + candumpr/src/main.rs | 53 ++++++++++++++- candumpr/src/recv/mod.rs | 1 + candumpr/src/recv/netlink.rs | 121 +++++++++++++++++++++++++++++++++++ candumpr/tests/link.rs | 66 +++++++++++++++++++ 5 files changed, 240 insertions(+), 2 deletions(-) create mode 100644 candumpr/src/recv/netlink.rs create mode 100644 candumpr/tests/link.rs diff --git a/candumpr/Cargo.toml b/candumpr/Cargo.toml index 4f3cc9f..5f22e21 100644 --- a/candumpr/Cargo.toml +++ b/candumpr/Cargo.toml @@ -17,6 +17,7 @@ crossbeam-channel.workspace = true eyre.workspace = true io-uring.workspace = true libc.workspace = true +neli.workspace = true tracing.workspace = true tracing-subscriber.workspace = true diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 250d69a..830c63f 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -6,6 +6,7 @@ use std::time::Duration; use candumpr::can; use candumpr::format::{CanutilsFormatter, Formatter}; use candumpr::pipeline::Pipeline; +use candumpr::recv::netlink::{self, LinkEvent}; use candumpr::recv::receiver::{BATCH_CAPACITY, Receiver}; use candumpr::sink::Sink; use candumpr::writer::StdoutWriter; @@ -18,6 +19,24 @@ extern "C" fn signal_handler(_sig: libc::c_int) { STOP.store(true, Ordering::Relaxed); } +/// Log a link-state edge to stderr, ignoring repeats of the last observed state. +fn handle_link_event(event: LinkEvent, link_up: &mut [Option], names: &[String]) { + let (sock_id, up) = match event { + LinkEvent::LinkUp { sock_id } => (sock_id, true), + LinkEvent::LinkDown { sock_id } => (sock_id, false), + }; + if link_up[sock_id] == Some(up) { + return; + } + link_up[sock_id] = Some(up); + let interface = &names[sock_id]; + if up { + tracing::info!(interface = %interface, "interface link up"); + } else { + tracing::warn!(interface = %interface, "interface link down"); + } +} + /// Log CAN traffic from multiple networks. #[derive(Parser)] #[command(version)] @@ -96,6 +115,15 @@ fn main() -> ExitCode { Ok(total) }); + let (event_tx, event_rx) = crossbeam_channel::unbounded::(); + let nl_names = cli.interfaces.clone(); + let nl_handle = std::thread::spawn(move || netlink::run(&STOP, &nl_names, &event_tx)); + + // Names indexed by sock_id, kept for logging link and bus transitions on the main thread. + let names = cli.interfaces.clone(); + // Last observed link state per sock_id, so we log only edges. + let mut link_up: Vec> = vec![None; names.len()]; + let formatter = CanutilsFormatter::new(cli.interfaces); let header = formatter.header().map(|h| h.to_vec()); let sink = Sink::new( @@ -125,6 +153,10 @@ fn main() -> ExitCode { } Err(_) => break, }, + recv(event_rx) -> msg => match msg { + Ok(event) => handle_link_event(event, &mut link_up, &names), + Err(_) => break, + }, default(Duration::from_millis(100)) => {} } if let Err(e) = pipeline.tick() { @@ -146,8 +178,20 @@ fn main() -> ExitCode { tracing::error!(error = ?e, "receiver thread failed"); failed = true; } - Err(_) => { - tracing::error!("receiver thread panicked"); + Err(e) => { + tracing::error!(panic = ?e, "receiver thread panicked"); + failed = true; + } + } + + match nl_handle.join() { + Ok(Ok(())) => {} + Ok(Err(e)) => { + tracing::error!(error = ?e, "netlink thread failed"); + failed = true; + } + Err(e) => { + tracing::error!(panic = ?e, "netlink thread panicked"); failed = true; } } @@ -162,6 +206,11 @@ fn main() -> ExitCode { let _ = empty_tx.try_send(batch); } + // Log any link transitions the netlink thread queued before it exited. + while let Ok(event) = event_rx.try_recv() { + handle_link_event(event, &mut link_up, &names); + } + // close() always runs, even after write errors: for file and zstd writers it is what writes the // epilogue and fsyncs, so skipping it could leave output unrecoverable. if let Err(e) = pipeline.close() { diff --git a/candumpr/src/recv/mod.rs b/candumpr/src/recv/mod.rs index 691db72..be7ac1e 100644 --- a/candumpr/src/recv/mod.rs +++ b/candumpr/src/recv/mod.rs @@ -1,5 +1,6 @@ #[cfg(any(test, feature = "bench"))] pub mod backends; +pub mod netlink; pub mod receiver; /// Per-frame metadata delivered alongside the CAN frame. diff --git a/candumpr/src/recv/netlink.rs b/candumpr/src/recv/netlink.rs new file mode 100644 index 0000000..bf7e7b6 --- /dev/null +++ b/candumpr/src/recv/netlink.rs @@ -0,0 +1,121 @@ +use std::collections::HashMap; +use std::io::Cursor; +use std::os::unix::io::AsRawFd; +use std::sync::atomic::{AtomicBool, Ordering}; + +use crossbeam_channel::Sender; +use neli::FromBytesWithInput; +use neli::consts::nl::NlmF; +use neli::consts::rtnl::{Iff, Ifla, RtAddrFamily, Rtm}; +use neli::consts::socket::NlFamily; +use neli::nl::{NlPayload, NlmsghdrBuilder}; +use neli::rtnl::{Ifinfomsg, IfinfomsgBuilder}; +use neli::socket::synchronous::NlSocketHandle; +use neli::types::Buffer; +use neli::utils::Groups; + +const RTNLGRP_LINK: u32 = 1; + +/// A link-state transition for one configured interface, keyed by [sock_id](crate::frame::CanFrame::sock_id). +#[derive(Clone, Copy, Debug)] +pub enum LinkEvent { + LinkUp { sock_id: usize }, + LinkDown { sock_id: usize }, +} + +/// Watch link state for the configured `names`, sending a [LinkEvent] on each up/down transition. +/// +/// Intended to run on a dedicated thread: it blocks while idle and returns when the given `stop` +/// flag is set, the `events` receiver is dropped, or the netlink socket fails. +pub fn run(stop: &AtomicBool, names: &[String], events: &Sender) -> eyre::Result<()> { + let name_to_sock_id: HashMap<&str, usize> = names + .iter() + .enumerate() + .map(|(sock_id, name)| (name.as_str(), sock_id)) + .collect(); + + let socket = + NlSocketHandle::connect(NlFamily::Route, None, Groups::new_groups(&[RTNLGRP_LINK]))?; + + let dump = NlmsghdrBuilder::default() + .nl_type(Rtm::Getlink) + .nl_flags(NlmF::REQUEST | NlmF::DUMP) + .nl_payload(NlPayload::Payload( + IfinfomsgBuilder::default() + .ifi_family(RtAddrFamily::Unspecified) + .build() + .map_err(|e| eyre::eyre!("{e}"))?, + )) + .build() + .map_err(|e| eyre::eyre!("{e}"))?; + socket.send(&dump)?; + + let mut pollfd = libc::pollfd { + fd: socket.as_raw_fd(), + events: libc::POLLIN, + revents: 0, + }; + + while !stop.load(Ordering::Relaxed) { + pollfd.revents = 0; + let ret = unsafe { libc::poll(&mut pollfd, 1, 100) }; + if ret < 0 { + let err = std::io::Error::last_os_error(); + if err.kind() == std::io::ErrorKind::Interrupted { + continue; + } + return Err(err.into()); + } + if ret == 0 { + continue; + } + if pollfd.revents & libc::POLLIN == 0 { + return Err(eyre::eyre!( + "netlink socket unusable (poll revents={})", + pollfd.revents + )); + } + + let (iter, _groups) = socket.recv::()?; + for msg in iter { + let msg = msg?; + let present = match *msg.nl_type() { + libc::RTM_NEWLINK => true, + libc::RTM_DELLINK => false, + _ => continue, + }; + let Some(payload) = msg.get_payload() else { + continue; + }; + let bytes: &[u8] = payload.as_ref(); + let ifi = Ifinfomsg::from_bytes_with_input(&mut Cursor::new(bytes), bytes.len())?; + let Some(name) = ifname(&ifi) else { continue }; + let Some(&sock_id) = name_to_sock_id.get(name) else { + continue; + }; + + let is_up = present && ifi.ifi_flags().contains(Iff::UP | Iff::LOWERUP); + let event = if is_up { + LinkEvent::LinkUp { sock_id } + } else { + LinkEvent::LinkDown { sock_id } + }; + if events.send(event).is_err() { + return Ok(()); + } + } + } + Ok(()) +} + +/// Extract the interface name (IFLA_IFNAME) from a parsed link message, if present. +fn ifname(ifi: &Ifinfomsg) -> Option<&str> { + for attr in ifi.rtattrs().as_ref() { + if *attr.rta_type() == Ifla::Ifname { + let bytes: &[u8] = attr.rta_payload().as_ref(); + let end = bytes.iter().position(|&b| b == 0).unwrap_or(bytes.len()); + return std::str::from_utf8(&bytes[..end]).ok(); + } + } + None +} diff --git a/candumpr/tests/link.rs b/candumpr/tests/link.rs new file mode 100644 index 0000000..5f88ccc --- /dev/null +++ b/candumpr/tests/link.rs @@ -0,0 +1,66 @@ +use std::os::unix::io::AsFd; +use std::process::{Command, Stdio}; +use std::time::Duration; + +use candumpr::can::{self, LinuxCanFrame}; +use vcan_fixture::VcanHarness; + +#[ctor::ctor] +fn setup() { + tracing_subscriber::fmt().with_test_writer().init(); + vcan_fixture::enter_namespace(); +} + +#[test] +#[cfg_attr(feature = "ci", ignore = "requires vcan")] +fn rides_through_link_down_and_resumes() { + let vcans = VcanHarness::new(1).unwrap(); + let iface = vcans.names()[0].clone(); + + let child = Command::new(env!("CARGO_BIN_EXE_candumpr")) + .arg("--log-level=INFO") + .arg(&iface) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .spawn() + .unwrap(); + + // Let the netlink monitor connect and dump the initial (up) state. + std::thread::sleep(Duration::from_millis(400)); + + vcans.set_down(&iface).unwrap(); + std::thread::sleep(Duration::from_millis(250)); + vcans.set_up(&iface).unwrap(); + std::thread::sleep(Duration::from_millis(250)); + + // A frame sent after the link returns proves the receiver survived ENETDOWN and resumed. + let tx = can::open_can_raw_blocking(&iface).unwrap(); + can::send_frame( + tx.as_fd(), + &LinuxCanFrame::new(0x18FECA00 | libc::CAN_EFF_FLAG, &[0xDE, 0xAD]), + ) + .unwrap(); + std::thread::sleep(Duration::from_millis(250)); + + unsafe { + libc::kill(child.id() as libc::pid_t, libc::SIGINT); + } + let output = child.wait_with_output().unwrap(); + let stderr = String::from_utf8_lossy(&output.stderr); + let stdout = String::from_utf8_lossy(&output.stdout); + eprint!("{stderr}"); + print!("{stdout}"); + + assert!( + stderr.contains("interface link down"), + "expected a link-down log line, got:\n{stderr}" + ); + assert!( + stderr.contains("interface link up"), + "expected a link-up log line, got:\n{stderr}" + ); + assert!( + stdout.contains("18FECA00#DEAD"), + "expected a frame received after the link returned, got:\n{stdout}" + ); +} From 43f9122a8f7af1d3f7287b8e8ce1958640bbacb4 Mon Sep 17 00:00:00 2001 From: Austin Gill Date: Sun, 31 May 2026 16:59:32 -0500 Subject: [PATCH 8/8] Parse and log error frames I may decide to hoist the error frame log into ERROR level (and probably implement some kind of rate limiting?) --- candumpr/src/errframe.rs | 296 +++++++++++++++++++++++++++++++++++++++ candumpr/src/lib.rs | 1 + candumpr/src/main.rs | 34 +++++ 3 files changed, 331 insertions(+) create mode 100644 candumpr/src/errframe.rs diff --git a/candumpr/src/errframe.rs b/candumpr/src/errframe.rs new file mode 100644 index 0000000..c1fde49 --- /dev/null +++ b/candumpr/src/errframe.rs @@ -0,0 +1,296 @@ +//! Parsing and decoding of SocketCAN error frames (linux/can/error.h). +use std::fmt; + +use crate::can::LinuxCanFrame; + +// Error-class bits in `can_id` (masked by CAN_ERR_MASK). libc exposes only CAN_ERR_MASK. +const CAN_ERR_TX_TIMEOUT: u32 = 0x0000_0001; +const CAN_ERR_LOSTARB: u32 = 0x0000_0002; +const CAN_ERR_CRTL: u32 = 0x0000_0004; +const CAN_ERR_PROT: u32 = 0x0000_0008; +const CAN_ERR_TRX: u32 = 0x0000_0010; +const CAN_ERR_ACK: u32 = 0x0000_0020; +const CAN_ERR_BUSOFF: u32 = 0x0000_0040; +const CAN_ERR_BUSERROR: u32 = 0x0000_0080; +const CAN_ERR_RESTARTED: u32 = 0x0000_0100; +const CAN_ERR_CNT: u32 = 0x0000_0200; + +// Controller status detail in data[1] (CAN_ERR_CRTL). +const CAN_ERR_CRTL_RX_OVERFLOW: u8 = 0x01; +const CAN_ERR_CRTL_TX_OVERFLOW: u8 = 0x02; +const CAN_ERR_CRTL_RX_WARNING: u8 = 0x04; +const CAN_ERR_CRTL_TX_WARNING: u8 = 0x08; +const CAN_ERR_CRTL_RX_PASSIVE: u8 = 0x10; +const CAN_ERR_CRTL_TX_PASSIVE: u8 = 0x20; +const CAN_ERR_CRTL_ACTIVE: u8 = 0x40; + +// Protocol violation type detail in data[2] (CAN_ERR_PROT). +const CAN_ERR_PROT_BIT: u8 = 0x01; +const CAN_ERR_PROT_FORM: u8 = 0x02; +const CAN_ERR_PROT_STUFF: u8 = 0x04; +const CAN_ERR_PROT_BIT0: u8 = 0x08; +const CAN_ERR_PROT_BIT1: u8 = 0x10; +const CAN_ERR_PROT_OVERLOAD: u8 = 0x20; +const CAN_ERR_PROT_ACTIVE: u8 = 0x40; +const CAN_ERR_PROT_TX: u8 = 0x80; + +// Protocol violation location, indexed directly by data[3] (CAN_ERR_PROT). +const PROT_LOCATIONS: [&str; 32] = [ + "unspecified", // 0x00 + "unspecified", // 0x01 + "id.28-to-id.21", // 0x02 + "start-of-frame", // 0x03 + "bit-srtr", // 0x04 + "bit-ide", // 0x05 + "id.20-to-id.18", // 0x06 + "id.17-to-id.13", // 0x07 + "crc-sequence", // 0x08 + "reserved-bit-0", // 0x09 + "data-field", // 0x0A + "data-length-code", // 0x0B + "bit-rtr", // 0x0C + "reserved-bit-1", // 0x0D + "id.4-to-id.0", // 0x0E + "id.12-to-id.5", // 0x0F + "unspecified", // 0x10 + "active-error-flag", // 0x11 + "intermission", // 0x12 + "tolerate-dominant-bits", // 0x13 + "unspecified", // 0x14 + "unspecified", // 0x15 + "passive-error-flag", // 0x16 + "error-delimiter", // 0x17 + "crc-delimiter", // 0x18 + "acknowledge-slot", // 0x19 + "end-of-frame", // 0x1A + "acknowledge-delimiter", // 0x1B + "overload-flag", // 0x1C + "unspecified", // 0x1D + "unspecified", // 0x1E + "unspecified", // 0x1F +]; + +/// The error state of a CAN controller, ordered from healthy (error-active) to faulted (bus-off). +#[derive(Clone, Copy, Debug, Default, PartialEq, Eq)] +pub enum BusState { + /// ErrorActive means we're Active in the error state machine, not that there's an active error. + #[default] + ErrorActive, + ErrorWarning, + ErrorPassive, + BusOff, +} + +impl fmt::Display for BusState { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(match self { + BusState::ErrorActive => "error-active", + BusState::ErrorWarning => "error-warning", + BusState::ErrorPassive => "error-passive", + BusState::BusOff => "bus-off", + }) + } +} + +/// A SocketCAN error frame. Construct with [parse](ErrorFrame::parse); +pub struct ErrorFrame<'a>(&'a LinuxCanFrame); + +impl<'a> ErrorFrame<'a> { + /// An error-frame view of `frame`, or `None` if it is an ordinary data frame. + pub fn parse(frame: &'a LinuxCanFrame) -> Option { + (frame.can_id & libc::CAN_ERR_FLAG != 0).then_some(ErrorFrame(frame)) + } + + /// The controller state this frame reports, or `None` if it carries no controller-state change. + pub fn bus_state(&self) -> Option { + let class = self.0.can_id & libc::CAN_ERR_MASK; + if class & CAN_ERR_BUSOFF != 0 { + Some(BusState::BusOff) + } else if class & CAN_ERR_RESTARTED != 0 { + Some(BusState::ErrorActive) + } else if class & CAN_ERR_CRTL != 0 { + let ctrl = self.0.data[1]; + if ctrl & (CAN_ERR_CRTL_RX_PASSIVE | CAN_ERR_CRTL_TX_PASSIVE) != 0 { + Some(BusState::ErrorPassive) + } else if ctrl & (CAN_ERR_CRTL_RX_WARNING | CAN_ERR_CRTL_TX_WARNING) != 0 { + Some(BusState::ErrorWarning) + } else if ctrl & CAN_ERR_CRTL_ACTIVE != 0 { + Some(BusState::ErrorActive) + } else { + None + } + } else { + None + } + } +} + +impl fmt::Display for ErrorFrame<'_> { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + let class = self.0.can_id & libc::CAN_ERR_MASK; + let data = &self.0.data; + let mut tokens: Vec = Vec::new(); + + if class & CAN_ERR_TX_TIMEOUT != 0 { + tokens.push("tx-timeout".to_string()); + } + if class & CAN_ERR_LOSTARB != 0 { + tokens.push(format!("lost-arbitration{{at bit {}}}", data[0])); + } + if class & CAN_ERR_CRTL != 0 { + let mut s = String::from("controller-problem{"); + s.push_str(&controller_problems(data[1])); + s.push('}'); + tokens.push(s); + } + if class & CAN_ERR_PROT != 0 { + let mut s = String::from("protocol-violation{{"); + s.push_str(&protocol_types(data[2])); + s.push_str("}{"); + s.push_str(protocol_location(data[3])); + s.push_str("}}"); + tokens.push(s); + } + if class & CAN_ERR_TRX != 0 { + tokens.push("transceiver-status".to_string()); + } + if class & CAN_ERR_ACK != 0 { + tokens.push("no-acknowledgement-on-tx".to_string()); + } + if class & CAN_ERR_BUSOFF != 0 { + tokens.push("bus-off".to_string()); + } + if class & CAN_ERR_BUSERROR != 0 { + tokens.push("bus-error".to_string()); + } + if class & CAN_ERR_RESTARTED != 0 { + tokens.push("restarted-after-bus-off".to_string()); + } + if class & CAN_ERR_CNT != 0 { + let mut s = String::from("error-counter-tx-rx{{"); + s.push_str(&data[6].to_string()); + s.push_str("}{"); + s.push_str(&data[7].to_string()); + s.push_str("}}"); + tokens.push(s); + } + + write!(f, "{}", tokens.join(",")) + } +} + +fn controller_problems(byte: u8) -> String { + let mut tokens = Vec::new(); + for (bit, name) in [ + (CAN_ERR_CRTL_RX_OVERFLOW, "rx-overflow"), + (CAN_ERR_CRTL_TX_OVERFLOW, "tx-overflow"), + (CAN_ERR_CRTL_RX_WARNING, "rx-error-warning"), + (CAN_ERR_CRTL_TX_WARNING, "tx-error-warning"), + (CAN_ERR_CRTL_RX_PASSIVE, "rx-error-passive"), + (CAN_ERR_CRTL_TX_PASSIVE, "tx-error-passive"), + (CAN_ERR_CRTL_ACTIVE, "back-to-error-active"), + ] { + if byte & bit != 0 { + tokens.push(name); + } + } + tokens.join(",") +} + +fn protocol_types(byte: u8) -> String { + let mut tokens = Vec::new(); + for (bit, name) in [ + (CAN_ERR_PROT_BIT, "single-bit-error"), + (CAN_ERR_PROT_FORM, "frame-format-error"), + (CAN_ERR_PROT_STUFF, "bit-stuffing-error"), + (CAN_ERR_PROT_BIT0, "tx-dominant-bit-error"), + (CAN_ERR_PROT_BIT1, "tx-recessive-bit-error"), + (CAN_ERR_PROT_OVERLOAD, "bus-overload"), + (CAN_ERR_PROT_ACTIVE, "active-error"), + (CAN_ERR_PROT_TX, "error-on-tx"), + ] { + if byte & bit != 0 { + tokens.push(name); + } + } + tokens.join(",") +} + +fn protocol_location(byte: u8) -> &'static str { + PROT_LOCATIONS + .get(usize::from(byte)) + .copied() + .unwrap_or("unspecified") +} + +#[cfg(test)] +mod tests { + use pretty_assertions::assert_eq; + + use super::*; + + fn err_frame(class: u32, data: &[u8]) -> LinuxCanFrame { + LinuxCanFrame::new(libc::CAN_ERR_FLAG | class, data) + } + + fn decode(frame: &LinuxCanFrame) -> String { + ErrorFrame::parse(frame).unwrap().to_string() + } + + #[test] + fn parse_rejects_data_frames() { + let data = LinuxCanFrame::new(0x123 | libc::CAN_EFF_FLAG, &[1, 2]); + assert!(ErrorFrame::parse(&data).is_none()); + } + + #[test] + fn bus_state_from_discrete_classes() { + assert_eq!( + ErrorFrame::parse(&err_frame(CAN_ERR_BUSOFF, &[])) + .unwrap() + .bus_state(), + Some(BusState::BusOff) + ); + assert_eq!( + ErrorFrame::parse(&err_frame(CAN_ERR_RESTARTED, &[])) + .unwrap() + .bus_state(), + Some(BusState::ErrorActive) + ); + assert_eq!( + ErrorFrame::parse(&err_frame(CAN_ERR_CRTL, &[0, CAN_ERR_CRTL_TX_PASSIVE])) + .unwrap() + .bus_state(), + Some(BusState::ErrorPassive) + ); + // Counters alone are informational and do not yield a state. + assert_eq!( + ErrorFrame::parse(&err_frame(CAN_ERR_CNT, &[0, 0, 0, 0, 0, 0, 200, 0])) + .unwrap() + .bus_state(), + None + ); + } + + #[test] + fn display_decodes_like_candump() { + assert_eq!(decode(&err_frame(CAN_ERR_BUSOFF, &[])), "bus-off"); + assert_eq!( + decode(&err_frame(CAN_ERR_CRTL, &[0, CAN_ERR_CRTL_TX_PASSIVE])), + "controller-problem{tx-error-passive}" + ); + assert_eq!( + decode(&err_frame(CAN_ERR_PROT, &[0, 0, CAN_ERR_PROT_BIT, 0x02])), + "protocol-violation{{single-bit-error}{id.28-to-id.21}}" + ); + assert_eq!( + decode(&err_frame(CAN_ERR_CNT, &[0, 0, 0, 0, 0, 0, 130, 0])), + "error-counter-tx-rx{{130}{0}}" + ); + // Multiple classes in one frame are comma-separated. + assert_eq!( + decode(&err_frame(CAN_ERR_RESTARTED | CAN_ERR_LOSTARB, &[5])), + "lost-arbitration{at bit 5},restarted-after-bus-off" + ); + } +} diff --git a/candumpr/src/lib.rs b/candumpr/src/lib.rs index c1f2665..2a97856 100644 --- a/candumpr/src/lib.rs +++ b/candumpr/src/lib.rs @@ -1,4 +1,5 @@ pub mod can; +pub mod errframe; pub mod format; pub mod frame; pub mod pipeline; diff --git a/candumpr/src/main.rs b/candumpr/src/main.rs index 830c63f..1271907 100644 --- a/candumpr/src/main.rs +++ b/candumpr/src/main.rs @@ -4,7 +4,9 @@ use std::sync::atomic::{AtomicBool, Ordering}; use std::time::Duration; use candumpr::can; +use candumpr::errframe::{BusState, ErrorFrame}; use candumpr::format::{CanutilsFormatter, Formatter}; +use candumpr::frame::CanFrame; use candumpr::pipeline::Pipeline; use candumpr::recv::netlink::{self, LinkEvent}; use candumpr::recv::receiver::{BATCH_CAPACITY, Receiver}; @@ -37,6 +39,35 @@ fn handle_link_event(event: LinkEvent, link_up: &mut [Option], names: &[St } } +/// Log each error frame in `batch` at debug level, and log bus-state transitions (edges only). +fn log_error_frames(batch: &[CanFrame], bus_state: &mut [BusState], names: &[String]) { + for frame in batch { + let Some(err) = ErrorFrame::parse(&frame.raw) else { + continue; + }; + let interface = &names[frame.sock_id]; + tracing::debug!(interface = %interface, error = %err, "CAN error frame"); + + let Some(new) = err.bus_state() else { + continue; + }; + let old = bus_state[frame.sock_id]; + if old == new { + continue; + } + bus_state[frame.sock_id] = new; + match new { + BusState::ErrorActive => { + tracing::info!(interface = %interface, "bus state {old} -> {new}") + } + BusState::ErrorWarning | BusState::ErrorPassive => { + tracing::warn!(interface = %interface, "bus state {old} -> {new}") + } + BusState::BusOff => tracing::error!(interface = %interface, "bus state {old} -> {new}"), + } + } +} + /// Log CAN traffic from multiple networks. #[derive(Parser)] #[command(version)] @@ -123,6 +154,8 @@ fn main() -> ExitCode { let names = cli.interfaces.clone(); // Last observed link state per sock_id, so we log only edges. let mut link_up: Vec> = vec![None; names.len()]; + // Last logged bus state per sock_id, so we log only transitions. + let mut bus_state: Vec = vec![BusState::default(); names.len()]; let formatter = CanutilsFormatter::new(cli.interfaces); let header = formatter.header().map(|h| h.to_vec()); @@ -144,6 +177,7 @@ fn main() -> ExitCode { select! { recv(full_rx) -> msg => match msg { Ok(mut batch) => { + log_error_frames(&batch, &mut bus_state, &names); if let Err(e) = pipeline.write_batch(&batch) { tracing::error!(error = ?e, "failed to write batch"); failed = true;