From 07a7909df4819ecf379fe042f95552b7d41e8f8a Mon Sep 17 00:00:00 2001 From: John Claus Date: Thu, 23 Apr 2026 12:21:39 -0600 Subject: [PATCH] check: add NDJSON output matching daemon (fixes #570) --format json streams NDJSON to stdout; --output also writes a file copy and is still required for --pcapify. Per-input Harness, no overwrites, --show-skipped opts skipped rows in. NdjsonWriter shared with the daemon via a new with_writer() over any AsyncWrite. --- Cargo.lock | 1 + check/Cargo.toml | 3 +- check/src/main.rs | 399 ++++++++++++++++++++++++++++++++++++--- daemon/src/analysis.rs | 38 +--- doc/reanalyzing.md | 53 +++++- lib/src/lib.rs | 1 + lib/src/ndjson_writer.rs | 46 +++++ 7 files changed, 479 insertions(+), 62 deletions(-) create mode 100644 lib/src/ndjson_writer.rs diff --git a/Cargo.lock b/Cargo.lock index 2a4f2791..97c7a12f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4571,6 +4571,7 @@ dependencies = [ "log", "pcap-file-tokio", "rayhunter", + "serde", "tokio", "walkdir", ] diff --git a/check/Cargo.toml b/check/Cargo.toml index 310827a8..f8bb087e 100644 --- a/check/Cargo.toml +++ b/check/Cargo.toml @@ -7,7 +7,8 @@ edition = "2024" rayhunter = { path = "../lib" } futures = { version = "0.3.30", default-features = false } log = "0.4.20" -tokio = { version = "1.44.2", default-features = false, features = ["fs", "signal", "process", "rt-multi-thread"] } +tokio = { version = "1.44.2", default-features = false, features = ["fs", "io-std", "io-util", "signal", "process", "rt-multi-thread"] } pcap-file-tokio = "0.1.0" clap = { version = "4.5.2", features = ["derive"] } walkdir = "2.5.0" +serde = "1" diff --git a/check/src/main.rs b/check/src/main.rs index 87105341..7279ca33 100644 --- a/check/src/main.rs +++ b/check/src/main.rs @@ -1,4 +1,4 @@ -use clap::Parser; +use clap::{Parser, ValueEnum}; use futures::TryStreamExt; use log::{debug, error, info, warn}; use pcap_file_tokio::pcapng::{Block, PcapNgReader}; @@ -6,10 +6,16 @@ use rayhunter::{ 
analysis::analyzer::{AnalysisRow, AnalyzerConfig, EventType, Harness},
     diag::DataType,
     gsmtap_parser,
+    ndjson_writer::NdjsonWriter,
     pcap::GsmtapPcapWriter,
     qmdl::QmdlReader,
 };
-use std::{collections::HashMap, future, path::PathBuf, pin::pin};
+use std::{
+    collections::HashMap,
+    future,
+    path::{Path, PathBuf},
+    pin::pin,
+};
 use tokio::fs::File;
 use walkdir::WalkDir;
@@ -25,6 +31,22 @@ struct Args {
     #[arg(long, help = "Show why some packets were skipped during analysis")]
     show_skipped: bool,
 
+    #[arg(
+        long,
+        value_enum,
+        default_value_t = OutputFormat::Text,
+        value_name = "FORMAT",
+        help = "Output format (NDJSON is written to stdout)"
+    )]
+    format: OutputFormat,
+
+    #[arg(
+        short = 'o',
+        long,
+        help = "Optional directory for output files. With --format json, NDJSON is also written to <output>/<input file name>.ndjson alongside stdout. Required for --pcapify."
+    )]
+    output: Option<PathBuf>,
+
     #[arg(short, long, help = "Only print warnings/errors to stdout")]
     quiet: bool,
 
@@ -32,6 +54,12 @@
     debug: bool,
 }
 
+#[derive(Copy, Clone, Debug, Eq, PartialEq, ValueEnum)]
+enum OutputFormat {
+    Text,
+    Json,
+}
+
 #[derive(Default)]
 struct Report {
     skipped_reasons: HashMap<String, u32>,
@@ -49,14 +77,14 @@ impl Report {
         }
     }
 
-    fn process_row(&mut self, row: AnalysisRow) {
+    fn process_row(&mut self, row: &AnalysisRow) {
         self.total_messages += 1;
-        if let Some(reason) = row.skipped_message_reason {
-            *self.skipped_reasons.entry(reason).or_insert(0) += 1;
+        if let Some(ref reason) = row.skipped_message_reason {
+            *self.skipped_reasons.entry(reason.clone()).or_insert(0) += 1;
             self.skipped += 1;
             return;
         }
-        for maybe_event in row.events {
+        for maybe_event in &row.events {
             let Some(event) = maybe_event else { continue };
             let Some(timestamp) = row.packet_timestamp else {
                 continue;
             };
@@ -90,13 +118,122 @@
     }
 }
 
-async fn analyze_pcap(pcap_path: &str, show_skipped: bool) {
-    let mut harness = Harness::new_with_config(&AnalyzerConfig::default());
-    let pcap_file = &mut 
File::open(&pcap_path).await.expect("failed to open file"); +// Decide whether an AnalysisRow should appear in NDJSON output. Empty rows +// (no warnings, no skipped reason) are always omitted; rows that exist only +// because a message was skipped are omitted unless --show-skipped is set, so +// NDJSON consumers see warnings by default and can opt in to the verbose +// stream the same way the text-mode summary does. +fn should_emit_to_ndjson(row: &AnalysisRow, show_skipped: bool) -> bool { + if row.is_empty() { + return false; + } + show_skipped || row.skipped_message_reason.is_none() +} + +fn output_path(output_dir: &Path, input_path: &str, extension: &str) -> PathBuf { + // Append the new extension to the input's full file name (rather than + // replacing it via Path::with_extension, which eats anything after the + // last dot in the stem). This preserves dotted stems like + // "2026-01-02_10.05.00_capture.qmdl" and keeps "session.qmdl" / + // "session.pcap" outputs distinct in the same target directory. + let input_name = Path::new(input_path) + .file_name() + .and_then(|s| s.to_str()) + .unwrap_or("output"); + output_dir.join(format!("{input_name}.{extension}")) +} + +// NDJSON sinks for one input file. Stdout is always set up (per the design +// in PR #941: ndjson always goes to stdout, --output is optional). When +// --output is provided, an extra file sink is created so the run produces a +// durable copy alongside the stream. Cross-directory collisions on the file +// sink are reported and the file is skipped, but stdout output continues so +// the user still sees the analysis. 
+
+struct NdjsonSinks {
+    stdout: NdjsonWriter,
+    file: Option<NdjsonFileSink>,
+}
+
+struct NdjsonFileSink {
+    writer: NdjsonWriter,
+    path: PathBuf,
+}
+
+impl NdjsonSinks {
+    async fn for_input(
+        output_dir: Option<&Path>,
+        input_path: &str,
+        harness: &Harness,
+    ) -> NdjsonSinks {
+        let stdout = NdjsonWriter::with_writer(tokio::io::stdout());
+        let mut sinks = NdjsonSinks { stdout, file: None };
+
+        if let Some(dir) = output_dir {
+            let out_path = output_path(dir, input_path, "ndjson");
+            if tokio::fs::try_exists(&out_path).await.unwrap_or(false) {
+                error!(
+                    "{input_path}: refusing to overwrite existing {}; skipping file copy (different inputs with the same file name collide in --output). NDJSON will still be written to stdout.",
+                    out_path.display()
+                );
+            } else {
+                let f = File::create(&out_path)
+                    .await
+                    .expect("failed to create ndjson file");
+                let writer = NdjsonWriter::new(f);
+                sinks.file = Some(NdjsonFileSink {
+                    writer,
+                    path: out_path,
+                });
+            }
+        }
+
+        sinks
+            .write(&harness.get_metadata())
+            .await
+            .expect("failed to write metadata");
+        sinks
+    }
+
+    async fn write<T: serde::Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
+        self.stdout.write(value).await?;
+        if let Some(ref mut f) = self.file {
+            f.writer.write(value).await?;
+        }
+        Ok(())
+    }
+
+    async fn close(self) -> Result<(), std::io::Error> {
+        self.stdout.close().await?;
+        if let Some(f) = self.file {
+            f.writer.close().await?;
+            info!("wrote {:?}", f.path);
+        }
+        Ok(())
+    }
+}
+
+async fn analyze_pcap(
+    pcap_path: &str,
+    show_skipped: bool,
+    format_json: bool,
+    output_dir: Option<&Path>,
+    config: &AnalyzerConfig,
+) {
+    let mut harness = Harness::new_with_config(config);
+    let pcap_file = &mut File::open(pcap_path).await.expect("failed to open file");
     let mut pcap_reader = PcapNgReader::new(pcap_file)
         .await
         .expect("failed to read PCAP file");
-    let mut report = Report::new(pcap_path);
+
+    let (mut ndjson, mut report) = if format_json {
+        (
+            Some(NdjsonSinks::for_input(output_dir, pcap_path, 
&harness).await), + None, + ) + } else { + (None, Some(Report::new(pcap_path))) + }; + while let Some(Ok(block)) = pcap_reader.next_block().await { let row = match block { Block::EnhancedPacket(packet) => harness.analyze_pcap_packet(packet), @@ -105,14 +242,32 @@ async fn analyze_pcap(pcap_path: &str, show_skipped: bool) { continue; } }; - report.process_row(row); + match &mut ndjson { + Some(sinks) => { + if should_emit_to_ndjson(&row, show_skipped) { + sinks.write(&row).await.expect("write"); + } + } + None => report.as_mut().unwrap().process_row(&row), + } + } + + if let Some(sinks) = ndjson { + sinks.close().await.expect("failed to flush"); + } else { + report.unwrap().print_summary(show_skipped); } - report.print_summary(show_skipped); } -async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { - let mut harness = Harness::new_with_config(&AnalyzerConfig::default()); - let qmdl_file = &mut File::open(&qmdl_path).await.expect("failed to open file"); +async fn analyze_qmdl( + qmdl_path: &str, + show_skipped: bool, + format_json: bool, + output_dir: Option<&Path>, + config: &AnalyzerConfig, +) { + let mut harness = Harness::new_with_config(config); + let qmdl_file = &mut File::open(qmdl_path).await.expect("failed to open file"); let file_size = qmdl_file .metadata() .await @@ -124,27 +279,56 @@ async fn analyze_qmdl(qmdl_path: &str, show_skipped: bool) { .as_stream() .try_filter(|container| future::ready(container.data_type == DataType::UserSpace)) ); - let mut report = Report::new(qmdl_path); + + let (mut ndjson, mut report) = if format_json { + ( + Some(NdjsonSinks::for_input(output_dir, qmdl_path, &harness).await), + None, + ) + } else { + (None, Some(Report::new(qmdl_path))) + }; + while let Some(container) = qmdl_stream .try_next() .await .expect("failed getting QMDL container") { for row in harness.analyze_qmdl_messages(container) { - report.process_row(row); + match &mut ndjson { + Some(sinks) => { + if should_emit_to_ndjson(&row, show_skipped) { + 
sinks.write(&row).await.expect("write"); + } + } + None => report.as_mut().unwrap().process_row(&row), + } } } - report.print_summary(show_skipped); + + if let Some(sinks) = ndjson { + sinks.close().await.expect("failed to flush"); + } else { + report.unwrap().print_summary(show_skipped); + } } -async fn pcapify(qmdl_path: &PathBuf) { - let qmdl_file = &mut File::open(&qmdl_path) +async fn pcapify(qmdl_path: &Path, output_dir: &Path) { + let qmdl_path_str = qmdl_path.to_string_lossy(); + let pcap_path = output_path(output_dir, qmdl_path_str.as_ref(), "pcapng"); + if tokio::fs::try_exists(&pcap_path).await.unwrap_or(false) { + error!( + "{}: refusing to overwrite existing {}; skipping pcapify (different inputs with the same file name collide in --output)", + qmdl_path_str, + pcap_path.display() + ); + return; + } + let qmdl_file = &mut File::open(qmdl_path) .await .expect("failed to open qmdl file"); let qmdl_file_size = qmdl_file.metadata().await.unwrap().len(); let mut qmdl_reader = QmdlReader::new(qmdl_file, Some(qmdl_file_size as usize)); - let mut pcap_path = qmdl_path.clone(); - pcap_path.set_extension("pcapng"); let pcap_file = &mut File::create(&pcap_path) .await .expect("failed to open pcap file"); @@ -177,17 +361,33 @@ async fn main() { } else { log::LevelFilter::Info }; + // All log output (info, warnings, errors) goes to stderr via env_logger. + // This keeps stdout clean so NDJSON output can be redirected independently. 
rayhunter::init_logging(level); - let harness = Harness::new_with_config(&AnalyzerConfig::default()); + if args.pcapify && args.output.is_none() { + error!("--output is required for --pcapify"); + std::process::exit(1); + } + + let output_dir = args.output.as_deref(); + if let Some(dir) = output_dir { + tokio::fs::create_dir_all(dir) + .await + .expect("failed to create output directory"); + } + + let analyzer_config = AnalyzerConfig::default(); + let metadata_harness = Harness::new_with_config(&analyzer_config); info!("Analyzers:"); - for analyzer in harness.get_metadata().analyzers { + for analyzer in metadata_harness.get_metadata().analyzers { info!( " - {} (v{}): {}", analyzer.name, analyzer.version, analyzer.description ); } + let format_json = args.format == OutputFormat::Json; for maybe_entry in WalkDir::new(&args.path) { let Ok(entry) = maybe_entry else { error!("failed to open dir entry {maybe_entry:?}"); @@ -201,14 +401,159 @@ async fn main() { // QMDL by inspecting the contents? 
if name_str.ends_with(".qmdl") { info!("**** Beginning analysis of {name_str}"); - analyze_qmdl(path_str, args.show_skipped).await; + analyze_qmdl( + path_str, + args.show_skipped, + format_json, + output_dir, + &analyzer_config, + ) + .await; if args.pcapify { - pcapify(&path.to_path_buf()).await; + let dir = output_dir.expect("--output required for --pcapify"); + pcapify(path, dir).await; } } else if name_str.ends_with(".pcap") || name_str.ends_with(".pcapng") { // TODO: if we've already analyzed a QMDL, skip its corresponding pcap info!("**** Beginning analysis of {name_str}"); - analyze_pcap(path_str, args.show_skipped).await; + analyze_pcap( + path_str, + args.show_skipped, + format_json, + output_dir, + &analyzer_config, + ) + .await; + } + } +} + +#[cfg(test)] +mod tests { + use super::{output_path, should_emit_to_ndjson}; + use rayhunter::analysis::analyzer::{AnalysisRow, Event, EventType}; + use std::path::{Path, PathBuf}; + + fn empty_row() -> AnalysisRow { + AnalysisRow { + packet_timestamp: None, + skipped_message_reason: None, + events: Vec::new(), + } + } + + fn skipped_row() -> AnalysisRow { + AnalysisRow { + packet_timestamp: None, + skipped_message_reason: Some("test reason".into()), + events: Vec::new(), + } + } + + fn warning_row() -> AnalysisRow { + AnalysisRow { + packet_timestamp: None, + skipped_message_reason: None, + events: vec![Some(Event { + event_type: EventType::High, + message: "test warning".into(), + })], + } + } + + fn informational_only_row() -> AnalysisRow { + AnalysisRow { + packet_timestamp: None, + skipped_message_reason: None, + events: vec![Some(Event { + event_type: EventType::Informational, + message: "fyi".into(), + })], } } + + #[test] + fn empty_row_never_emitted() { + assert!(!should_emit_to_ndjson(&empty_row(), false)); + assert!(!should_emit_to_ndjson(&empty_row(), true)); + } + + #[test] + fn informational_only_row_never_emitted() { + // Informational events don't count as warnings; AnalysisRow::is_empty + // treats 
this row as empty, so it stays out of NDJSON. + assert!(!should_emit_to_ndjson(&informational_only_row(), false)); + assert!(!should_emit_to_ndjson(&informational_only_row(), true)); + } + + #[test] + fn warning_row_always_emitted() { + assert!(should_emit_to_ndjson(&warning_row(), false)); + assert!(should_emit_to_ndjson(&warning_row(), true)); + } + + #[test] + fn skipped_row_only_emitted_with_show_skipped() { + assert!(!should_emit_to_ndjson(&skipped_row(), false)); + assert!(should_emit_to_ndjson(&skipped_row(), true)); + } + + #[test] + fn appends_extension_to_basic_name() { + let got = output_path(Path::new("/out"), "/in/capture.qmdl", "ndjson"); + assert_eq!(got, PathBuf::from("/out/capture.qmdl.ndjson")); + } + + #[test] + fn preserves_dotted_stem() { + let got = output_path(Path::new("/out"), "/in/recording.v2.qmdl", "ndjson"); + assert_eq!(got, PathBuf::from("/out/recording.v2.qmdl.ndjson")); + } + + #[test] + fn preserves_timestamped_name_with_multiple_dots() { + let got = output_path( + Path::new("/out"), + "/in/2026-01-02_10.05.00_capture.qmdl", + "ndjson", + ); + assert_eq!( + got, + PathBuf::from("/out/2026-01-02_10.05.00_capture.qmdl.ndjson") + ); + } + + #[test] + fn same_stem_with_different_extensions_stays_distinct() { + let qmdl = output_path(Path::new("/out"), "/in/session.qmdl", "ndjson"); + let pcap = output_path(Path::new("/out"), "/in/session.pcap", "ndjson"); + assert_eq!(qmdl, PathBuf::from("/out/session.qmdl.ndjson")); + assert_eq!(pcap, PathBuf::from("/out/session.pcap.ndjson")); + assert_ne!(qmdl, pcap); + } + + #[test] + fn pcapng_extension_uses_same_template() { + let got = output_path(Path::new("/out"), "/in/capture.qmdl", "pcapng"); + assert_eq!(got, PathBuf::from("/out/capture.qmdl.pcapng")); + } + + #[test] + fn input_without_file_name_falls_back_to_output_literal() { + // Path::file_name() returns None for "/" and similar; we should still + // produce a deterministic, non-panicking output path. 
+ let got = output_path(Path::new("/out"), "/", "ndjson"); + assert_eq!(got, PathBuf::from("/out/output.ndjson")); + } + + #[test] + fn ignores_input_directory_components() { + // Outputs are flat under output_dir; only the input file name matters. + let a = output_path(Path::new("/out"), "/captures/a/capture.qmdl", "ndjson"); + let b = output_path(Path::new("/out"), "/captures/b/capture.qmdl", "ndjson"); + // Both resolve to the same path; collision detection in the caller is + // what prevents the second from clobbering the first. + assert_eq!(a, PathBuf::from("/out/capture.qmdl.ndjson")); + assert_eq!(a, b); + } } diff --git a/daemon/src/analysis.rs b/daemon/src/analysis.rs index 48c29b94..023ac020 100644 --- a/daemon/src/analysis.rs +++ b/daemon/src/analysis.rs @@ -10,10 +10,10 @@ use futures::TryStreamExt; use log::{error, info}; use rayhunter::analysis::analyzer::{AnalyzerConfig, EventType, Harness}; use rayhunter::diag::{DataType, MessagesContainer}; +use rayhunter::ndjson_writer::NdjsonWriter; use rayhunter::qmdl::QmdlReader; use serde::Serialize; use tokio::fs::File; -use tokio::io::{AsyncWriteExt, BufWriter}; use tokio::sync::mpsc::Receiver; use tokio::sync::{RwLock, RwLockWriteGuard}; use tokio_util::task::TaskTracker; @@ -22,27 +22,17 @@ use crate::qmdl_store::RecordingStore; use crate::server::ServerState; pub struct AnalysisWriter { - writer: BufWriter, + writer: NdjsonWriter, harness: Harness, } -// We write our analysis results to a file immediately to minimize the amount of -// state Rayhunter has to keep track of in memory. The analysis file's format is -// Newline Delimited JSON -// (https://docs.mulesoft.com/dataweave/latest/dataweave-formats-ndjson), which -// lets us simply append new rows to the end without parsing the entire JSON -// object beforehand. 
impl AnalysisWriter { pub async fn new(file: File, analyzer_config: &AnalyzerConfig) -> Result { let harness = Harness::new_with_config(analyzer_config); - - let mut result = Self { - writer: BufWriter::new(file), - harness, - }; - let metadata = result.harness.get_metadata(); - result.write(&metadata).await?; - Ok(result) + let mut writer = NdjsonWriter::new(file); + let metadata = harness.get_metadata(); + writer.write(&metadata).await?; + Ok(Self { writer, harness }) } // Runs the analysis harness on the given container, serializing the results @@ -55,25 +45,15 @@ impl AnalysisWriter { for row in self.harness.analyze_qmdl_messages(container) { if !row.is_empty() { - self.write(&row).await?; + self.writer.write(&row).await?; } max_type = cmp::max(max_type, row.get_max_event_type()); } Ok(max_type) } - async fn write(&mut self, value: &T) -> Result<(), std::io::Error> { - let mut value_str = serde_json::to_string(value).unwrap(); - value_str.push('\n'); - self.writer.write_all(value_str.as_bytes()).await?; - self.writer.flush().await?; - Ok(()) - } - - // Flushes any pending I/O to disk before dropping the writer - pub async fn close(mut self) -> Result<(), std::io::Error> { - self.writer.flush().await?; - Ok(()) + pub async fn close(self) -> Result<(), std::io::Error> { + self.writer.close().await } } diff --git a/doc/reanalyzing.md b/doc/reanalyzing.md index 06447da1..e4a32b70 100644 --- a/doc/reanalyzing.md +++ b/doc/reanalyzing.md @@ -26,15 +26,52 @@ You can build `rayhunter-check` from source with the following command: rayhunter-check [OPTIONS] --path Options: - -p, --path Path to the PCAP, or QMDL file. If given a directory will - recursively scan all pcap, qmdl, and subdirectories - -P, --pcapify Turn QMDL file into PCAP - --show-skipped Show skipped messages + -p, --path Path to the PCAP, or QMDL file. 
If given a directory will
+                        recursively scan all pcap, qmdl, and subdirectories
+  -P, --pcapify         Turn QMDL file into PCAP in --output (requires --output)
+  --show-skipped        Show skipped messages (also includes them in NDJSON output)
+  --format <FORMAT>     Output format: [possible values: text, json].
+                        JSON is NDJSON (one record per line) and is always
+                        written to stdout.
+  -o, --output <DIR>    Optional directory for output files. With --format json,
+                        NDJSON is also written to <DIR>/<input file name>.ndjson
+                        alongside stdout. Required for --pcapify.
   -q, --quiet           Print only warnings
-  -d, --debug Print debug info
+  -d, --debug           Print debug info
   -h, --help            Print help
   -V, --version         Print version
 ```
+
+**Stdout vs. stderr:** with `--format json`, NDJSON records go to **stdout**;
+all log output (including the analyzer listing, per-file headers, warnings,
+and any errors) goes to **stderr**. This makes it safe to pipe stdout into
+`jq` or any other NDJSON consumer without log lines mixing in.
+
+**Skipped messages:** rows that exist only because a packet was skipped
+during analysis are omitted from NDJSON by default. Pass `--show-skipped`
+to include them in stdout (and in any `--output` file copy), the same way
+`--show-skipped` enables the per-reason summary in text mode.
+
+**Output file names:** with `--format json` or `--pcapify`, files in
+`--output` are named `<input file name>.<extension>`. For example,
+`capture.qmdl` produces `capture.qmdl.ndjson` (and, with `--pcapify`,
+`capture.qmdl.pcapng`); `capture.pcap` produces `capture.pcap.ndjson`.
+This preserves dotted names like `2026-01-02_10.05.00_capture.qmdl` and
+avoids collisions when a directory contains both a `.qmdl` and a `.pcap`
+with the same stem.
+
+`rayhunter-check` will refuse to overwrite an existing file in
+`--output`. If two inputs from different directories share the same file
+name, the second one's file copy is skipped with an error on stderr;
+NDJSON for that input still goes to stdout. 
Point `--output` at an empty +directory (or remove the conflicting file) and re-run if you need a +separate file copy of every input. + +When `--path` is a directory, `rayhunter-check` processes inputs +sequentially. With `--format json`, each input begins with its own +metadata record on stdout followed by its analysis rows; consumers can +key off the `analyzers` field to know when a new input starts. + ### Examples `rayhunter-check -p ~/Downloads/myfile.qmdl` @@ -43,3 +80,9 @@ Options: `rayhunter-check -p ~/Downloads #Check all files in downloads` `rayhunter-check -d -p ~/Downloads/myfile.qmdl #run in debug mode` + +`rayhunter-check -p ~/Downloads/myfile.qmdl --format json #stream NDJSON to stdout` + +`rayhunter-check -p ~/Downloads/myfile.qmdl --format json | jq . #pipe into jq` + +`rayhunter-check -p ~/Downloads/myfile.qmdl --format json -o ./reports #also write a copy to ./reports/myfile.qmdl.ndjson` diff --git a/lib/src/lib.rs b/lib/src/lib.rs index 0dae5e0d..74370af1 100644 --- a/lib/src/lib.rs +++ b/lib/src/lib.rs @@ -18,6 +18,7 @@ pub mod gsmtap; pub mod gsmtap_parser; pub mod hdlc; pub mod log_codes; +pub mod ndjson_writer; pub mod pcap; pub mod qmdl; pub mod util; diff --git a/lib/src/ndjson_writer.rs b/lib/src/ndjson_writer.rs new file mode 100644 index 00000000..4149f21e --- /dev/null +++ b/lib/src/ndjson_writer.rs @@ -0,0 +1,46 @@ +// Shared NDJSON (Newline Delimited JSON) writer used by both the daemon +// (real-time analysis) and rayhunter-check (offline analysis) to ensure +// consistent output format. +// +// We write analysis results in NDJSON format to minimize in-memory state. +// Each line is a self-contained JSON object, so we can append without +// parsing the entire file. +// +// We flush after every line so each record is durable. 
+
+use serde::Serialize;
+use tokio::fs::File;
+use tokio::io::{AsyncWrite, AsyncWriteExt};
+
+pub struct NdjsonWriter {
+    inner: Box<dyn AsyncWrite + Unpin + Send>,
+}
+
+impl NdjsonWriter {
+    /// Convenience constructor that matches the daemon's existing usage:
+    /// build a writer backed by an open `tokio::fs::File`.
+    pub fn new(file: File) -> Self {
+        Self::with_writer(file)
+    }
+
+    /// Build an NDJSON writer over any async writer (file, stdout, etc.).
+    /// Lets `rayhunter-check` fan output to both a file and stdout without
+    /// duplicating serialization logic.
+    pub fn with_writer<W: AsyncWrite + Unpin + Send + 'static>(writer: W) -> Self {
+        Self {
+            inner: Box::new(writer),
+        }
+    }
+
+    pub async fn write<T: Serialize>(&mut self, value: &T) -> Result<(), std::io::Error> {
+        let mut line = serde_json::to_string(value).unwrap();
+        line.push('\n');
+        self.inner.write_all(line.as_bytes()).await?;
+        self.inner.flush().await?;
+        Ok(())
+    }
+
+    pub async fn close(mut self) -> Result<(), std::io::Error> {
+        self.inner.flush().await
+    }
+}