From dadafc59fa3405d6ad92c08f568b9f0726ac8cad Mon Sep 17 00:00:00 2001 From: shijiashuai Date: Tue, 26 May 2026 09:25:18 +0800 Subject: [PATCH 1/2] fix: harden parser and compression metadata Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/commands/compression_engine.rs | 25 ++++++----- src/fastq/parser.rs | 5 ++- src/fqc_writer.rs | 3 +- src/pipeline/compression.rs | 46 +++++++++++++------- src/pipeline/decompression.rs | 2 + src/pipeline/mod.rs | 6 ++- tests/test_compression_engine.rs | 70 ++++++++++++++++++++++++++++++ tests/test_parser.rs | 13 ++++++ 8 files changed, 139 insertions(+), 31 deletions(-) diff --git a/src/commands/compression_engine.rs b/src/commands/compression_engine.rs index 37ef1b4..4a02cbd 100644 --- a/src/commands/compression_engine.rs +++ b/src/commands/compression_engine.rs @@ -226,7 +226,7 @@ impl CompressionEngine { ))); } - let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; + let mut writer = FqcWriter::create(&request.output_path)?; // Build flags let flags = build_flags( @@ -544,20 +544,23 @@ impl CompressionEngine { .and_then(|n| n.to_str()) .unwrap_or("unknown"); - let output_path = request.output_path.to_string_lossy().to_string(); - if let Some(ref path2) = input.secondary_path { pipeline.run_paired( &input.primary_path, path2, - &output_path, + &request.output_path, input_filename, input.archive_layout, )?; } else if input.is_interleaved { - pipeline.run_interleaved(&input.primary_path, &output_path, input_filename, input.archive_layout)?; + pipeline.run_interleaved( + &input.primary_path, + &request.output_path, + input_filename, + input.archive_layout, + )?; } else { - pipeline.run(&input.primary_path, &output_path, input_filename)?; + pipeline.run(&input.primary_path, &request.output_path, input_filename)?; } let stats = pipeline.stats(); @@ -575,7 +578,7 @@ impl CompressionEngine { // Build outcome let processing_stats = ProcessingStats { total_reads: stats.total_reads, - total_bases: 0, // pipeline doesn't track bases separately + total_bases: stats.total_bases, input_bytes: stats.input_bytes, output_bytes: stats.output_bytes, blocks_written: stats.total_blocks as u64, @@ -585,7 +588,7 @@ impl CompressionEngine { Ok(CompressionOutcome { mode: CompressionExecutionMode::Pipeline, detected_read_length_class: effective_length_class, - reorder_map_written: false, // Pipeline stats don't track this separately + reorder_map_written: stats.reorder_map_written, blocks_written: stats.total_blocks as usize, reads_compressed: stats.total_reads, bytes_read: stats.input_bytes, @@ -737,7 +740,7 @@ impl CompressionEngine { }; // Open writer - let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; + let mut writer = FqcWriter::create(&request.output_path)?; let flags = build_flags( false, @@ -843,7 +846,7 @@ impl CompressionEngine { log::info!("Streaming compression mode (paired-end)"); let mut pe_reader = open_fastq_paired(input_path, input2_path)?; - let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; + let mut writer = FqcWriter::create(&request.output_path)?; let flags = build_flags( true, @@ -960,7 +963,7 @@ impl CompressionEngine { open_fastq_interleaved(input_path)? }; - let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; + let mut writer = FqcWriter::create(&request.output_path)?; let flags = build_flags( true, diff --git a/src/fastq/parser.rs b/src/fastq/parser.rs index d7e5575..5c370d2 100644 --- a/src/fastq/parser.rs +++ b/src/fastq/parser.rs @@ -172,7 +172,10 @@ impl FastqParser { let id_line = self.line_buf.trim_end(); if id_line.is_empty() { - return Ok(None); + return Err(FqcError::Parse(format!( + "Line {}: Blank line encountered where FASTQ record header was expected", + self.line_number + ))); } if !id_line.starts_with('@') { return Err(FqcError::Parse(format!( diff --git a/src/fqc_writer.rs b/src/fqc_writer.rs index a42ef8e..4499136 100644 --- a/src/fqc_writer.rs +++ b/src/fqc_writer.rs @@ -10,6 +10,7 @@ use byteorder::LittleEndian; use byteorder::WriteBytesExt; use std::fs::File; use std::io::{BufWriter, Seek, SeekFrom, Write}; +use std::path::Path; use xxhash_rust::xxh64::Xxh64; // ============================================================================= @@ -26,7 +27,7 @@ pub struct FqcWriter { } impl FqcWriter { - pub fn create(path: &str) -> Result { + pub fn create(path: impl AsRef) -> Result { let file = File::create(path).map_err(|e| FqcError::Io(e))?; let mut writer = BufWriter::new(file); diff --git a/src/pipeline/compression.rs b/src/pipeline/compression.rs index 173f437..2a64849 100644 --- a/src/pipeline/compression.rs +++ b/src/pipeline/compression.rs @@ -5,6 +5,7 @@ // Uses bounded channels for backpressure control. // ============================================================================= +use std::path::{Path, PathBuf}; use std::sync::Arc; use std::thread; use std::time::Instant; @@ -139,12 +140,12 @@ impl CompressionPipeline { } /// Run compression on a single-end input file - pub fn run(&mut self, input_path: &str, output_path: &str, original_filename: &str) -> Result<()> { + pub fn run(&mut self, input_path: &str, output_path: impl AsRef, original_filename: &str) -> Result<()> { self.config.validate()?; let start = Instant::now(); let threads = self.config.effective_threads(); let block_size = self.config.effective_block_size(); - let output_path_owned = output_path.to_string(); + let output_path_owned: PathBuf = output_path.as_ref().to_path_buf(); let original_filename_owned = original_filename.to_string(); log::info!("Compression pipeline: {} threads, block_size={}", threads, block_size); @@ -163,10 +164,12 @@ impl CompressionPipeline { } let total_reads = all_reads.len(); - let input_bytes: usize = all_reads - .iter() - .map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4) - .sum(); + let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| { + ( + bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4, + bases + record.sequence.len() as u64, + ) + }); log::info!("Read {} records ({} bytes)", total_reads, input_bytes); @@ -190,6 +193,7 @@ impl CompressionPipeline { } else { (all_reads, None, None) }; + let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map; // ---- Phase 2: Pipeline compression ---- let is_paired = false; @@ -350,12 +354,14 @@ impl CompressionPipeline { let elapsed = start.elapsed(); self.stats = PipelineStats { total_reads: total_reads as u64, + total_bases, total_blocks: num_chunks as u32, input_bytes: input_bytes as u64, output_bytes, processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, + reorder_map_written, }; log::info!( @@ -378,7 +384,7 @@ impl CompressionPipeline { &mut self, input1_path: &str, input2_path: &str, - output_path: &str, + output_path: impl AsRef, original_filename: &str, pe_layout: PeLayout, ) -> Result<()> { @@ -399,10 +405,12 @@ impl CompressionPipeline { // Store reads, then run pipeline (reuse single-end logic for Phase 2) let total_reads = all_reads.len(); - let input_bytes: usize = all_reads - .iter() - .map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4) - .sum(); + let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| { + ( + bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4, + bases + record.sequence.len() as u64, + ) + }); let block_size = self.config.effective_block_size(); let threads = self.config.effective_threads(); @@ -474,12 +482,14 @@ impl CompressionPipeline { let elapsed = start.elapsed(); self.stats = PipelineStats { total_reads: total_reads as u64, + total_bases, total_blocks: total_reads.div_ceil(block_size) as u32, input_bytes: input_bytes as u64, output_bytes, processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, + reorder_map_written: forward_map.is_some() && self.config.save_reorder_map, }; Ok(()) @@ -489,7 +499,7 @@ impl CompressionPipeline { pub fn run_interleaved( &mut self, input_path: &str, - output_path: &str, + output_path: impl AsRef, original_filename: &str, pe_layout: PeLayout, ) -> Result<()> { @@ -513,10 +523,12 @@ impl CompressionPipeline { } let total_reads = all_reads.len(); - let input_bytes: usize = all_reads - .iter() - .map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4) - .sum(); + let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| { + ( + bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4, + bases + record.sequence.len() as u64, + ) + }); let block_size = self.config.effective_block_size(); let threads = self.config.effective_threads(); @@ -588,12 +600,14 @@ impl CompressionPipeline { let elapsed = start.elapsed(); self.stats = PipelineStats { total_reads: total_reads as u64, + total_bases, total_blocks: total_reads.div_ceil(block_size) as u32, input_bytes: input_bytes as u64, output_bytes, processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, + reorder_map_written: forward_map.is_some() && self.config.save_reorder_map, }; Ok(()) diff --git a/src/pipeline/decompression.rs b/src/pipeline/decompression.rs index 2556e6e..8ac6ae9 100644 --- a/src/pipeline/decompression.rs +++ b/src/pipeline/decompression.rs @@ -388,12 +388,14 @@ impl DecompressionPipeline { let elapsed = start.elapsed(); self.stats = PipelineStats { total_reads: reads_written, + total_bases: 0, total_blocks: (end_block - start_block) as u32, input_bytes: file_size, output_bytes, processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, + reorder_map_written: false, }; log::info!( diff --git a/src/pipeline/mod.rs b/src/pipeline/mod.rs index 3e89556..71b8068 100644 --- a/src/pipeline/mod.rs +++ b/src/pipeline/mod.rs @@ -49,12 +49,14 @@ pub const MAX_BLOCK_SIZE: usize = 1_000_000; #[derive(Debug, Clone, Default)] pub struct PipelineStats { pub total_reads: u64, + pub total_bases: u64, pub total_blocks: u32, pub input_bytes: u64, pub output_bytes: u64, pub processing_time_ms: u64, pub peak_memory_bytes: usize, pub threads_used: usize, + pub reorder_map_written: bool, } impl PipelineStats { @@ -66,10 +68,10 @@ impl PipelineStats { } pub fn bits_per_base(&self) -> f64 { - if self.input_bytes == 0 { + if self.total_bases == 0 { return 0.0; } - (self.output_bytes as f64 * 8.0) / (self.input_bytes as f64 * 0.5) + (self.output_bytes as f64 * 8.0) / self.total_bases as f64 } pub fn throughput_mbps(&self) -> f64 { diff --git a/tests/test_compression_engine.rs b/tests/test_compression_engine.rs index 4f469ba..c10e629 100644 --- a/tests/test_compression_engine.rs +++ b/tests/test_compression_engine.rs @@ -5,6 +5,7 @@ use fqc::commands::compress::CompressOptions; use fqc::commands::compression_engine::{CompressionEngine, CompressionOutcome}; use fqc::commands::compression_request::{CompressionExecutionMode, CompressionInputTopology, CompressionRequest}; +use fqc::fastq::parser::open_fastq; use fqc::types::{IdMode, PeLayout, QualityMode, ReadLengthClass}; // ============================================================================= @@ -172,3 +173,72 @@ fn streaming_request_reports_streaming_mode_in_outcome() { let outcome = CompressionEngine::new().run(opts.to_request()).unwrap(); assert_eq!(outcome.mode, CompressionExecutionMode::Streaming); } + +#[test] +fn pipeline_outcome_tracks_total_bases_for_summary_metrics() { + let output = tempfile::NamedTempFile::new().unwrap(); + let opts = CompressOptions { + input_path: "tests/data/test_se.fastq".into(), + output_path: output.path().to_string_lossy().to_string(), + use_pipeline: true, + force_overwrite: true, + show_progress: false, + ..CompressOptions::default() + }; + + let outcome = CompressionEngine::new().run(opts.to_request()).unwrap(); + + let mut parser = open_fastq("tests/data/test_se.fastq").unwrap(); + let expected_total_bases: u64 = parser + .collect_all() + .unwrap() + .iter() + .map(|record| record.sequence.len() as u64) + .sum(); + + assert_eq!(outcome.stats.total_bases, expected_total_bases); + assert!(outcome.stats.bits_per_base() > 0.0); +} + +#[test] +fn pipeline_outcome_reports_written_reorder_map() { + let output = tempfile::NamedTempFile::new().unwrap(); + let opts = CompressOptions { + input_path: "tests/data/test_se.fastq".into(), + output_path: output.path().to_string_lossy().to_string(), + use_pipeline: true, + force_overwrite: true, + show_progress: false, + ..CompressOptions::default() + }; + + let outcome = CompressionEngine::new().run(opts.to_request()).unwrap(); + + assert!(outcome.reorder_map_written); +} + +#[cfg(unix)] +#[test] +fn archive_execution_accepts_non_utf8_output_path() { + use std::ffi::OsString; + use std::os::unix::ffi::OsStringExt; + + let dir = tempfile::tempdir().unwrap(); + let output_path = dir.path().join(OsString::from_vec(b"nonutf8-\xff.fqc".to_vec())); + let request = CompressionRequest { + mode: CompressionExecutionMode::Archive, + input: CompressionInputTopology::SingleFile { + input_path: "tests/data/test_se.fastq".into(), + }, + output_path: output_path.clone(), + quality_mode: QualityMode::Lossless, + id_mode: IdMode::Exact, + force_overwrite: true, + ..CompressionRequest::for_tests() + }; + + CompressionEngine::new().run(request).unwrap(); + + let bytes = std::fs::read(&output_path).unwrap(); + assert!(bytes.starts_with(&fqc::format::MAGIC_BYTES)); +} diff --git a/tests/test_parser.rs b/tests/test_parser.rs index 1807b07..2705736 100644 --- a/tests/test_parser.rs +++ b/tests/test_parser.rs @@ -205,6 +205,19 @@ fn test_parse_length_mismatch() { assert!(parser.next_record().is_err()); } +#[test] +fn test_parse_blank_line_between_records_is_error() { + let data = b"@read1\nACGT\n+\nIIII\n\n@read2\nTGCA\n+\nJJJJ\n"; + let reader = BufReader::new(data.as_slice()); + let mut parser = FastqParser::new(reader); + + let first = parser.next_record().unwrap().unwrap(); + assert_eq!(first.id, "read1"); + + let err = parser.next_record().unwrap_err(); + assert!(format!("{err}").contains("Blank line")); +} + // ============================================================================= // for_each callback // ============================================================================= From e3932dcb38b4ea1efa8648867555ed2c61e180c9 Mon Sep 17 00:00:00 2001 From: shijiashuai Date: Tue, 26 May 2026 09:38:22 +0800 Subject: [PATCH 2/2] fix: honor pipeline reorder-map settings Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/pipeline/compression.rs | 33 +++++++++++++++++++++++++-------- tests/test_e2e.rs | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 8 deletions(-) diff --git a/src/pipeline/compression.rs b/src/pipeline/compression.rs index 2a64849..b51841f 100644 --- a/src/pipeline/compression.rs +++ b/src/pipeline/compression.rs @@ -202,7 +202,7 @@ impl CompressionPipeline { !self.config.enable_reorder, self.config.quality_mode, self.config.id_mode, - forward_map.is_some(), + reorder_map_written, self.config.pe_layout, self.config.read_length_class, self.config.streaming_mode, @@ -330,7 +330,12 @@ impl CompressionPipeline { } // Write reorder map if present - if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) { + if reorder_map_written { + let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else { + return Err(FqcError::Compression( + "Reorder map metadata missing despite reorder_map_written=true".to_string(), + )); + }; writer.write_reorder_map(fwd, rev)?; } @@ -431,13 +436,14 @@ impl CompressionPipeline { } else { (all_reads, None, None) }; + let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map; let flags = build_flags( true, !self.config.enable_reorder, self.config.quality_mode, self.config.id_mode, - forward_map.is_some(), + reorder_map_written, pe_layout, self.config.read_length_class, self.config.streaming_mode, @@ -473,7 +479,12 @@ impl CompressionPipeline { writer.write_block(&compressed)?; } - if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) { + if reorder_map_written { + let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else { + return Err(FqcError::Compression( + "Reorder map metadata missing despite reorder_map_written=true".to_string(), + )); + }; writer.write_reorder_map(fwd, rev)?; } @@ -489,7 +500,7 @@ impl CompressionPipeline { processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, - reorder_map_written: forward_map.is_some() && self.config.save_reorder_map, + reorder_map_written, }; Ok(()) @@ -549,13 +560,14 @@ impl CompressionPipeline { } else { (all_reads, None, None) }; + let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map; let flags = build_flags( true, !self.config.enable_reorder, self.config.quality_mode, self.config.id_mode, - forward_map.is_some(), + reorder_map_written, pe_layout, self.config.read_length_class, self.config.streaming_mode, @@ -591,7 +603,12 @@ impl CompressionPipeline { writer.write_block(&compressed)?; } - if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) { + if reorder_map_written { + let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else { + return Err(FqcError::Compression( + "Reorder map metadata missing despite reorder_map_written=true".to_string(), + )); + }; writer.write_reorder_map(fwd, rev)?; } @@ -607,7 +624,7 @@ impl CompressionPipeline { processing_time_ms: elapsed.as_millis() as u64, peak_memory_bytes: 0, threads_used: threads, - reorder_map_written: forward_map.is_some() && self.config.save_reorder_map, + reorder_map_written, }; Ok(()) diff --git a/tests/test_e2e.rs b/tests/test_e2e.rs index 3907a0e..84efb11 100644 --- a/tests/test_e2e.rs +++ b/tests/test_e2e.rs @@ -993,6 +993,38 @@ fn test_e2e_pipeline_roundtrip() { assert_roundtrip_match(&original, &restored); } +#[test] +fn test_e2e_pipeline_respects_save_reorder_map_flag() { + use fqc::pipeline::compression::{CompressionPipeline, CompressionPipelineConfig}; + + let input = test_data_dir().join("test_se.fastq").to_string_lossy().to_string(); + let compressed = TempFile::new("e2e_pipeline_no_reorder_map.fqc"); + + let config = CompressionPipelineConfig { + num_threads: 2, + block_size: 100, + read_length_class: ReadLengthClass::Short, + quality_mode: QualityMode::Lossless, + id_mode: IdMode::Exact, + compression_level: 3, + enable_reorder: true, + save_reorder_map: false, + streaming_mode: false, + pe_layout: PeLayout::Interleaved, + memory_limit_mb: 1024, + ..Default::default() + }; + + let mut pipeline = CompressionPipeline::new(config); + pipeline.run(&input, compressed.path(), "test_se.fastq").unwrap(); + + let stats = pipeline.stats(); + assert!(!stats.reorder_map_written); + + let reader = FqcReader::open(compressed.path()).unwrap(); + assert!(!reader.has_reorder_map()); +} + // ============================================================================= // E2E: Decompression Pipeline Round-Trip // =============================================================================