From 2857fa3af58c3617bb38d3e0bf5198604d74e154 Mon Sep 17 00:00:00 2001 From: shijiashuai Date: Fri, 22 May 2026 10:28:34 +0800 Subject: [PATCH] Harden archive reads and corrupted block recovery Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- src/commands/compress.rs | 1 + src/commands/compression_engine.rs | 244 +++++++------- src/commands/compression_request.rs | 57 ++++ src/commands/decompress.rs | 139 ++++++-- src/fastq/parser.rs | 14 +- src/format.rs | 92 +++++- src/fqc_reader.rs | 165 ++++++++++ src/pipeline/compression.rs | 116 ++++++- src/pipeline/decompression.rs | 117 +++++-- tests/test_compression_orchestration.rs | 180 +++++++++++ tests/test_e2e.rs | 405 ++++++++++++++++++++++++ tests/test_format.rs | 106 +++++++ tests/test_parser.rs | 13 + 13 files changed, 1438 insertions(+), 211 deletions(-) create mode 100644 tests/test_compression_orchestration.rs diff --git a/src/commands/compress.rs b/src/commands/compress.rs index ee1605f..be4f745 100644 --- a/src/commands/compress.rs +++ b/src/commands/compress.rs @@ -136,6 +136,7 @@ impl CompressOptions { CompressionInputTopology::PairedFiles { input_path_r1: PathBuf::from(&self.input_path), input_path_r2: PathBuf::from(input2_path), + archive_layout: self.pe_layout, } } else if self.interleaved { // Interleaved file diff --git a/src/commands/compression_engine.rs b/src/commands/compression_engine.rs index e3a1fc4..37ef1b4 100644 --- a/src/commands/compression_engine.rs +++ b/src/commands/compression_engine.rs @@ -8,9 +8,9 @@ use crate::algo::block_compressor::{BlockCompressor, BlockCompressorConfig, CompressedBlockData}; use crate::algo::global_analyzer::{GlobalAnalyzer, GlobalAnalyzerConfig}; -use crate::commands::compression_request::{CompressionExecutionMode, CompressionInputTopology, CompressionRequest}; +use crate::commands::compression_request::{CompressionExecutionMode, CompressionRequest}; use crate::error::{FqcError, Result}; -use crate::fastq::parser::{open_fastq, open_fastq_paired, open_fastq_stdin}; +use crate::fastq::parser::{open_fastq, open_fastq_interleaved, open_fastq_paired, open_fastq_stdin}; use crate::format::{build_flags, GlobalHeader}; use crate::fqc_writer::FqcWriter; use crate::types::*; @@ -152,27 +152,16 @@ impl CompressionEngine { /// 5. Write archive with optional reorder map #[allow(clippy::needless_pass_by_value)] fn run_archive(&self, request: CompressionRequest) -> Result { - // Determine input path and pairing - let (input_path, input2_path, is_paired) = match &request.input { - CompressionInputTopology::SingleFile { input_path } => { - (input_path.to_string_lossy().to_string(), None, false) - } - CompressionInputTopology::PairedFiles { - input_path_r1, - input_path_r2, - } => ( - input_path_r1.to_string_lossy().to_string(), - Some(input_path_r2.to_string_lossy().to_string()), - true, - ), - CompressionInputTopology::InterleavedFile { input_path, .. } => { - (input_path.to_string_lossy().to_string(), None, true) - } - CompressionInputTopology::Stdin { .. } => ("-".to_string(), None, false), - }; + let input = request.input.resolve(); - log::info!("Reading input file: {}", input_path); - let records = Self::read_all_records(&input_path, input2_path.as_deref())?; + log::info!("Reading input file: {}", input.primary_path); + let records = if let Some(path2) = input.secondary_path.as_deref() { + Self::read_all_paired_records(&input.primary_path, path2, input.archive_layout)? + } else if input.is_interleaved { + Self::read_all_interleaved_records(&input.primary_path, input.archive_layout)? + } else { + Self::read_all_records(&input.primary_path)? + }; if records.is_empty() { return Err(FqcError::InvalidArgument( @@ -195,7 +184,7 @@ impl CompressionEngine { &length_stats, request.max_block_bases, ); - let enable_reorder = !is_paired && effective_length_class == ReadLengthClass::Short; + let enable_reorder = !input.is_paired && effective_length_class == ReadLengthClass::Short; Self::enforce_archive_mode_memory_limit(records.len(), block_size, &length_stats, request.memory_limit_mb)?; @@ -240,17 +229,13 @@ impl CompressionEngine { let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; // Build flags - let pe_layout = match &request.input { - CompressionInputTopology::InterleavedFile { archive_layout, .. } => *archive_layout, - _ => PeLayout::Interleaved, - }; let flags = build_flags( - is_paired, + input.is_paired, !analysis.reordering_performed, request.quality_mode, request.id_mode, analysis.reordering_performed, - pe_layout, + input.archive_layout, effective_length_class, false, // not streaming ); @@ -260,7 +245,7 @@ impl CompressionEngine { .map(|d| d.as_secs()) .unwrap_or(0); - let input_filename = std::path::Path::new(&input_path) + let input_filename = std::path::Path::new(&input.primary_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown"); @@ -406,39 +391,21 @@ impl CompressionEngine { ))); } - // Determine input path and pairing - let (input_path, input2_path, _is_paired, is_interleaved) = match &request.input { - CompressionInputTopology::SingleFile { input_path } => { - (input_path.to_string_lossy().to_string(), None, false, false) - } - CompressionInputTopology::PairedFiles { - input_path_r1, - input_path_r2, - } => ( - input_path_r1.to_string_lossy().to_string(), - Some(input_path_r2.to_string_lossy().to_string()), - true, - false, - ), - CompressionInputTopology::InterleavedFile { - input_path, - archive_layout: _, - } => (input_path.to_string_lossy().to_string(), None, true, true), - CompressionInputTopology::Stdin { archive_layout } => { - let is_interleaved = archive_layout.is_some(); - ("-".to_string(), None, is_interleaved, is_interleaved) - } - }; + let input = request.input.resolve(); // Inspect input lengths - let length_stats = - Self::inspect_input_lengths_for_streaming(&input_path, input2_path.as_deref(), request.scan_all_lengths)? - .unwrap_or(LengthStats { - sample_size: 0, - avg_length: MEDIUM_READ_THRESHOLD, - median_length: MEDIUM_READ_THRESHOLD, - max_length: MEDIUM_READ_THRESHOLD, - }); + let length_stats = Self::inspect_input_lengths_for_streaming( + &input.primary_path, + input.secondary_path.as_deref(), + input.is_interleaved, + request.scan_all_lengths, + )? + .unwrap_or(LengthStats { + sample_size: 0, + avg_length: MEDIUM_READ_THRESHOLD, + median_length: MEDIUM_READ_THRESHOLD, + max_length: MEDIUM_READ_THRESHOLD, + }); let effective_length_class = Self::effective_length_class(request.requested_read_length_class, &length_stats); let block_size = Self::effective_block_size( @@ -459,26 +426,26 @@ impl CompressionEngine { ); // Get archive layout for paired/interleaved - let pe_layout = match &request.input { - CompressionInputTopology::InterleavedFile { archive_layout, .. } => *archive_layout, - CompressionInputTopology::Stdin { archive_layout } => archive_layout.unwrap_or(PeLayout::Interleaved), - _ => PeLayout::Interleaved, - }; - // Route to appropriate streaming handler - if let Some(path2) = input2_path { + if let Some(path2) = input.secondary_path { Self::run_streaming_paired( - &input_path, + &input.primary_path, &path2, &request, effective_length_class, block_size, - pe_layout, + input.archive_layout, + ) + } else if input.is_interleaved { + Self::run_streaming_interleaved( + &input.primary_path, + &request, + effective_length_class, + block_size, + input.archive_layout, ) - } else if is_interleaved { - Self::run_streaming_interleaved(&input_path, &request, effective_length_class, block_size, pe_layout) } else { - Self::run_streaming_single(&input_path, &request, effective_length_class, block_size) + Self::run_streaming_single(&input.primary_path, &request, effective_length_class, block_size) } } @@ -502,37 +469,21 @@ impl CompressionEngine { ))); } - // Determine input path and pairing - let (input_path, input2_path, is_paired) = match &request.input { - CompressionInputTopology::SingleFile { input_path } => { - (input_path.to_string_lossy().to_string(), None, false) - } - CompressionInputTopology::PairedFiles { - input_path_r1, - input_path_r2, - } => ( - input_path_r1.to_string_lossy().to_string(), - Some(input_path_r2.to_string_lossy().to_string()), - true, - ), - CompressionInputTopology::InterleavedFile { input_path, .. } => { - (input_path.to_string_lossy().to_string(), None, true) - } - CompressionInputTopology::Stdin { archive_layout } => { - let is_paired = archive_layout.is_some(); - ("-".to_string(), None, is_paired) - } - }; + let input = request.input.resolve(); // Inspect input lengths - let length_stats = - Self::inspect_input_lengths_for_streaming(&input_path, input2_path.as_deref(), request.scan_all_lengths)? - .unwrap_or(LengthStats { - sample_size: 0, - avg_length: 150, - median_length: 150, - max_length: 150, - }); + let length_stats = Self::inspect_input_lengths_for_streaming( + &input.primary_path, + input.secondary_path.as_deref(), + input.is_interleaved, + request.scan_all_lengths, + )? + .unwrap_or(LengthStats { + sample_size: 0, + avg_length: 150, + median_length: 150, + max_length: 150, + }); let effective_length_class = Self::effective_length_class(request.requested_read_length_class, &length_stats); let block_size = Self::effective_block_size_for_pipeline( @@ -564,12 +515,6 @@ impl CompressionEngine { ); // Get archive layout for paired/interleaved - let pe_layout = match &request.input { - CompressionInputTopology::InterleavedFile { archive_layout, .. } => *archive_layout, - CompressionInputTopology::Stdin { archive_layout } => archive_layout.unwrap_or(PeLayout::Interleaved), - _ => PeLayout::Interleaved, - }; - // Create pipeline config let effective_threads = if request.threads == 0 { std::thread::available_parallelism().map(|n| n.get()).unwrap_or(1) @@ -585,26 +530,34 @@ impl CompressionEngine { quality_mode: request.quality_mode, id_mode: request.id_mode, compression_level: request.level, - enable_reorder: !is_paired, - save_reorder_map: !is_paired, + enable_reorder: !input.is_paired, + save_reorder_map: !input.is_paired, streaming_mode: false, - pe_layout, + pe_layout: input.archive_layout, memory_limit_mb: request.memory_limit_mb, }; let mut pipeline = CompressionPipeline::new(pipeline_config); - let input_filename = std::path::Path::new(&input_path) + let input_filename = std::path::Path::new(&input.primary_path) .file_name() .and_then(|n| n.to_str()) .unwrap_or("unknown"); let output_path = request.output_path.to_string_lossy().to_string(); - if let Some(ref path2) = input2_path { - pipeline.run_paired(&input_path, path2, &output_path, input_filename, pe_layout)?; + if let Some(ref path2) = input.secondary_path { + pipeline.run_paired( + &input.primary_path, + path2, + &output_path, + input_filename, + input.archive_layout, + )?; + } else if input.is_interleaved { + pipeline.run_interleaved(&input.primary_path, &output_path, input_filename, input.archive_layout)?; } else { - pipeline.run(&input_path, &output_path, input_filename)?; + pipeline.run(&input.primary_path, &output_path, input_filename)?; } let stats = pipeline.stats(); @@ -650,16 +603,8 @@ impl CompressionEngine { // Helper methods (extracted from CompressCommand) // ============================================================================= - fn read_all_records(input_path: &str, input2_path: Option<&str>) -> Result> { - if let Some(path2) = input2_path { - let mut pe_reader = open_fastq_paired(input_path, path2)?; - let mut records = Vec::new(); - while let Some((r1, r2)) = pe_reader.next_pair()? { - records.push(r1); - records.push(r2); - } - Ok(records) - } else if input_path == "-" { + fn read_all_records(input_path: &str) -> Result> { + if input_path == "-" { let mut parser = open_fastq_stdin(); let mut records = Vec::new(); while let Some(rec) = parser.next_record()? { @@ -676,6 +621,26 @@ impl CompressionEngine { } } + fn read_all_paired_records(input_path: &str, input2_path: &str, pe_layout: PeLayout) -> Result> { + let mut pe_reader = open_fastq_paired(input_path, input2_path)?; + match pe_layout { + PeLayout::Interleaved => pe_reader.collect_all_interleaved(), + PeLayout::Consecutive => pe_reader.collect_all_consecutive(), + } + } + + fn read_all_interleaved_records(input_path: &str, pe_layout: PeLayout) -> Result> { + let mut parser = if input_path == "-" { + crate::fastq::parser::InterleavedPeParser::new(open_fastq_stdin()) + } else { + open_fastq_interleaved(input_path)? + }; + match pe_layout { + PeLayout::Interleaved => parser.collect_all_interleaved(), + PeLayout::Consecutive => parser.collect_all_consecutive(), + } + } + fn length_stats_from_records(records: &[ReadRecord]) -> LengthStats { let lengths: Vec = records.iter().map(|r| r.sequence.len()).collect(); LengthStats::from_lengths(&lengths) @@ -990,9 +955,9 @@ impl CompressionEngine { log::info!("Streaming compression mode (interleaved single-file PE)"); let mut parser = if input_path == "-" { - open_fastq_stdin() + crate::fastq::parser::InterleavedPeParser::new(open_fastq_stdin()) } else { - open_fastq(input_path)? + open_fastq_interleaved(input_path)? }; let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?; @@ -1038,17 +1003,11 @@ impl CompressionEngine { let mut output_bytes = 0u64; let mut blocks_written = 0; - let mut is_r1 = true; - while let Some(rec) = parser.next_record()? { - total_reads += 1; - total_bases += rec.sequence.len() as u64; - - if is_r1 { - r1_buf.push(rec); - } else { - r2_buf.push(rec); - } - is_r1 = !is_r1; + while let Some((r1, r2)) = parser.next_pair()? { + total_reads += 2; + total_bases += (r1.sequence.len() + r2.sequence.len()) as u64; + r1_buf.push(r1); + r2_buf.push(r2); if r1_buf.len() >= target_pairs { let block_buf = pe_layout.arrange(std::mem::take(&mut r1_buf), std::mem::take(&mut r2_buf)); @@ -1106,6 +1065,7 @@ impl CompressionEngine { fn inspect_input_lengths_for_streaming( input_path: &str, input2_path: Option<&str>, + is_interleaved: bool, scan_all: bool, ) -> Result> { if input_path == "-" { @@ -1128,6 +1088,14 @@ impl CompressionEngine { break; } } + } else if is_interleaved { + let mut parser = open_fastq_interleaved(input_path)?; + while let Some((r1, r2)) = parser.next_pair()? { + if scan_all || lengths.len() < sample_limit { + lengths.push(r1.sequence.len()); + lengths.push(r2.sequence.len()); + } + } } else { let mut parser = open_fastq(input_path)?; while let Some(record) = parser.next_record()? { diff --git a/src/commands/compression_request.rs b/src/commands/compression_request.rs index a2c12ba..8fc62bf 100644 --- a/src/commands/compression_request.rs +++ b/src/commands/compression_request.rs @@ -29,6 +29,7 @@ pub enum CompressionInputTopology { PairedFiles { input_path_r1: PathBuf, input_path_r2: PathBuf, + archive_layout: PeLayout, }, /// Interleaved FASTQ file (paired reads interleaved in one file) InterleavedFile { @@ -39,6 +40,62 @@ pub enum CompressionInputTopology { Stdin { archive_layout: Option }, } +/// Resolved input properties used by compression execution. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct CompressionInputResolution { + pub primary_path: String, + pub secondary_path: Option, + pub is_paired: bool, + pub is_interleaved: bool, + pub archive_layout: PeLayout, +} + +impl CompressionInputTopology { + /// Resolve topology-specific details once so callers do not duplicate branching. + pub fn resolve(&self) -> CompressionInputResolution { + match self { + Self::SingleFile { input_path } => CompressionInputResolution { + primary_path: input_path.to_string_lossy().to_string(), + secondary_path: None, + is_paired: false, + is_interleaved: false, + archive_layout: PeLayout::Interleaved, + }, + Self::PairedFiles { + input_path_r1, + input_path_r2, + archive_layout, + } => CompressionInputResolution { + primary_path: input_path_r1.to_string_lossy().to_string(), + secondary_path: Some(input_path_r2.to_string_lossy().to_string()), + is_paired: true, + is_interleaved: false, + archive_layout: *archive_layout, + }, + Self::InterleavedFile { + input_path, + archive_layout, + } => CompressionInputResolution { + primary_path: input_path.to_string_lossy().to_string(), + secondary_path: None, + is_paired: true, + is_interleaved: true, + archive_layout: *archive_layout, + }, + Self::Stdin { archive_layout } => { + let is_paired = archive_layout.is_some(); + CompressionInputResolution { + primary_path: "-".to_string(), + secondary_path: None, + is_paired, + is_interleaved: is_paired, + archive_layout: archive_layout.unwrap_or(PeLayout::Interleaved), + } + } + } + } +} + /// Normalized compression request. /// /// This type represents a validated, normalized compression request diff --git a/src/commands/decompress.rs b/src/commands/decompress.rs index 3a6862b..0f1722c 100644 --- a/src/commands/decompress.rs +++ b/src/commands/decompress.rs @@ -69,6 +69,7 @@ enum OutputWriters { r1: Box, r2: Box, pe_layout: PeLayout, + streaming_block_layout: bool, }, } @@ -79,13 +80,20 @@ impl OutputWriters { header_only: bool, zero_based_read_idx: u64, total_archive_reads: u64, + block_local_idx: Option<(u64, u64)>, ) -> Result { match self { Self::Single(output) => write_to_target(output.as_mut(), read, header_only), - Self::Split { r1, r2, pe_layout } => { - let to_r1 = match pe_layout { - PeLayout::Interleaved => zero_based_read_idx % 2 == 0, - PeLayout::Consecutive => zero_based_read_idx < (total_archive_reads / 2), + Self::Split { + r1, + r2, + pe_layout, + streaming_block_layout, + } => { + let to_r1 = match (pe_layout, *streaming_block_layout, block_local_idx) { + (PeLayout::Consecutive, true, Some((local_idx, block_reads))) => local_idx < (block_reads / 2), + (PeLayout::Interleaved, _, _) => zero_based_read_idx % 2 == 0, + (PeLayout::Consecutive, _, _) => zero_based_read_idx < (total_archive_reads / 2), }; if to_r1 { @@ -243,10 +251,12 @@ impl DecompressCommand { } } + let pe_layout = get_pe_layout(flags); OutputWriters::Split { r1: Box::new(BufWriter::new(File::create(&r1_path)?)), r2: Box::new(BufWriter::new(File::create(&r2_path)?)), - pe_layout: get_pe_layout(flags), + pe_layout, + streaming_block_layout: (flags & flags::STREAMING_MODE) != 0 && pe_layout == PeLayout::Consecutive, } } else if self.opts.output_path == "-" { OutputWriters::Single(Box::new(std::io::stdout())) @@ -301,9 +311,14 @@ impl DecompressCommand { } Err(e) => { if self.opts.skip_corrupted { - // Account for skipped reads so global_read_idx stays correct if let Some(entry) = reader.block_index.entries.get(block_id) { - global_read_idx += entry.read_count as u64; + self.emit_corrupted_placeholders( + &mut output, + block_id as u32, + entry.read_count, + total_archive_reads, + &mut global_read_idx, + )?; } log::warn!("Block {} failed, skipping: {}", block_id, e); self.stats.corrupted_blocks += 1; @@ -353,11 +368,19 @@ impl DecompressCommand { // Phase 1: Read block data sequentially let mut block_data_vec: Vec<(u32, BlockData)> = Vec::with_capacity(batch_end - block_start); + let mut phase1_failures: Vec<(u32, String, u32)> = Vec::new(); for block_id in block_start..batch_end { match reader.read_block(block_id as u32) { Ok(bd) => block_data_vec.push((block_id as u32, bd)), Err(e) => { if skip_corrupted { + let read_count = reader + .block_index + .entries + .get(block_id) + .map(|entry| entry.read_count) + .unwrap_or(0); + phase1_failures.push((block_id as u32, e.to_string(), read_count)); log::warn!("Block {} read failed, skipping: {}", block_id, e); self.stats.corrupted_blocks += 1; } else { @@ -369,39 +392,50 @@ impl DecompressCommand { // Phase 2: Decompress in parallel let cfg = Arc::clone(&config); - let results: Vec> = block_data_vec + let results: Vec> = block_data_vec .into_par_iter() .map(|(bid, bd)| { let mut comp = BlockCompressor::new((*cfg).clone()); match comp.decompress_block(&bd) { Ok(dec) => Ok((bid, dec)), - Err(e) => Err((bid, format!("{}", e))), + Err(e) => Err((bid, format!("{}", e), bd.header.uncompressed_count)), } }) .collect(); // Phase 3: Write results sequentially (sorted by block_id) let mut sorted: Vec<_> = results; + sorted.extend(phase1_failures.into_iter().map(Err)); sorted.sort_by_key(|r| match r { Ok((bid, _)) => *bid, - Err((bid, _)) => *bid, + Err((bid, _, _)) => *bid, }); for result in sorted { match result { Ok((_bid, decompressed)) => { - for read in &decompressed.reads { - self.emit_read(output, read, global_read_idx, total_archive_reads)?; + let block_read_count = decompressed.reads.len() as u64; + for (local_idx, read) in decompressed.reads.iter().enumerate() { + self.emit_read( + output, + read, + global_read_idx, + total_archive_reads, + Some((local_idx as u64, block_read_count)), + )?; global_read_idx += 1; } self.stats.blocks_processed += 1; } - Err((bid, msg)) => { + Err((bid, msg, read_count)) => { if skip_corrupted { - // Account for skipped reads so global_read_idx stays correct - if let Some(entry) = reader.block_index.entries.get(bid as usize) { - global_read_idx += entry.read_count as u64; - } + self.emit_corrupted_placeholders( + output, + bid, + read_count, + total_archive_reads, + &mut global_read_idx, + )?; log::warn!("Block {} decompress failed, skipping: {}", bid, msg); self.stats.corrupted_blocks += 1; } else { @@ -430,10 +464,17 @@ impl DecompressCommand { let decompressed = compressor.decompress_block(&block_data)?; - for read in &decompressed.reads { + let block_read_count = decompressed.reads.len() as u64; + for (local_idx, read) in decompressed.reads.iter().enumerate() { let read_idx = *global_read_idx; *global_read_idx += 1; - self.emit_read(output, read, read_idx, total_archive_reads)?; + self.emit_read( + output, + read, + read_idx, + total_archive_reads, + Some((local_idx as u64, block_read_count)), + )?; } Ok(()) @@ -460,7 +501,23 @@ impl DecompressCommand { let mut all_reads: Vec = Vec::with_capacity(total_reads); for block_id in 0..block_count { - let block_data = reader.read_block(block_id as u32)?; + let block_data = match reader.read_block(block_id as u32) { + Ok(block_data) => block_data, + Err(e) => { + if self.opts.skip_corrupted { + if let Some(entry) = reader.block_index.entries.get(block_id) { + all_reads.extend( + (0..entry.read_count as usize) + .map(|read_idx| self.opts.placeholder_record(block_id as u32, read_idx)), + ); + } + log::warn!("Block {} failed, skipping: {}", block_id, e); + self.stats.corrupted_blocks += 1; + continue; + } + return Err(e); + } + }; match compressor.decompress_block(&block_data) { Ok(decompressed) => { @@ -469,6 +526,12 @@ impl DecompressCommand { } Err(e) => { if self.opts.skip_corrupted { + if let Some(entry) = reader.block_index.entries.get(block_id) { + all_reads.extend( + (0..entry.read_count as usize) + .map(|read_idx| self.opts.placeholder_record(block_id as u32, read_idx)), + ); + } log::warn!("Block {} failed, skipping: {}", block_id, e); self.stats.corrupted_blocks += 1; } else { @@ -494,18 +557,42 @@ impl DecompressCommand { } let read = &all_reads[archive_id]; - self.emit_read(output, read, original_id as u64, total_archive_reads)?; + self.emit_read(output, read, original_id as u64, total_archive_reads, None)?; } Ok(()) } + fn emit_corrupted_placeholders( + &mut self, + output: &mut OutputWriters, + block_id: u32, + read_count: u32, + total_archive_reads: u64, + global_read_idx: &mut u64, + ) -> Result<()> { + for local_idx in 0..read_count as usize { + let read_idx = *global_read_idx; + *global_read_idx += 1; + let placeholder = self.opts.placeholder_record(block_id, local_idx); + self.emit_read( + output, + &placeholder, + read_idx, + total_archive_reads, + Some((local_idx as u64, read_count as u64)), + )?; + } + Ok(()) + } + fn emit_read( &mut self, output: &mut OutputWriters, read: &ReadRecord, zero_based_read_idx: u64, total_archive_reads: u64, + block_local_idx: Option<(u64, u64)>, ) -> Result<()> { let current_id = zero_based_read_idx + 1; @@ -517,8 +604,13 @@ impl DecompressCommand { return Ok(()); } - let bytes_written = - output.write_record(read, self.opts.header_only, zero_based_read_idx, total_archive_reads)?; + let bytes_written = output.write_record( + read, + self.opts.header_only, + zero_based_read_idx, + total_archive_reads, + block_local_idx, + )?; self.stats.total_reads += 1; self.stats.total_bases += read.sequence.len() as u64; self.stats.output_bytes += bytes_written; @@ -546,6 +638,7 @@ impl DecompressCommand { original_order: false, header_only: self.opts.header_only, skip_corrupted: self.opts.skip_corrupted, + corrupted_placeholder: self.opts.corrupted_placeholder.clone(), split_pe: false, ..Default::default() }; diff --git a/src/fastq/parser.rs b/src/fastq/parser.rs index c04865e..d7e5575 100644 --- a/src/fastq/parser.rs +++ b/src/fastq/parser.rs @@ -347,14 +347,12 @@ impl PairedFastqReader { pub fn next_pair(&mut self) -> Result> { match (self.r1.next_record()?, self.r2.next_record()?) { (Some(a), Some(b)) => Ok(Some((a, b))), - (Some(_), None) => { - log::warn!("R1 has more reads than R2, truncating"); - Ok(None) - } - (None, Some(_)) => { - log::warn!("R2 has more reads than R1, truncating"); - Ok(None) - } + (Some(_), None) => Err(FqcError::Parse( + "Paired FASTQ inputs have mismatched mate counts: R1 has more reads than R2".to_string(), + )), + (None, Some(_)) => Err(FqcError::Parse( + "Paired FASTQ inputs have mismatched mate counts: R2 has more reads than R1".to_string(), + )), (None, None) => Ok(None), } } diff --git a/src/format.rs b/src/format.rs index 3e14e0e..224918b 100644 --- a/src/format.rs +++ b/src/format.rs @@ -41,6 +41,7 @@ pub const CURRENT_VERSION: u8 = (FORMAT_VERSION_MAJOR << 4) | FORMAT_VERSION_MIN pub const MAGIC_HEADER_SIZE: usize = 9; pub const FILE_FOOTER_SIZE: usize = 32; +const MAX_FORWARD_COMPAT_PADDING: usize = 1 << 20; // ============================================================================= // Flag Bit Definitions @@ -109,12 +110,38 @@ pub fn get_read_length_class(f: u64) -> ReadLengthClass { ReadLengthClass::from_u8(((f & flags::READ_LENGTH_CLASS_MASK) >> flags::READ_LENGTH_CLASS_SHIFT) as u8) } +fn validate_declared_size(section: &str, declared_size: usize, minimum_size: usize, maximum_size: usize) -> Result<()> { + if declared_size < minimum_size { + return Err(FqcError::Format(format!( + "{section} header size {declared_size} < required {minimum_size}" + ))); + } + if declared_size > maximum_size { + return Err(FqcError::Format(format!( + "{section} header size {declared_size} > allowed {maximum_size}" + ))); + } + Ok(()) +} + +fn skip_extra_bytes(r: &mut R, extra: usize) -> Result<()> { + let mut remaining = extra; + let mut scratch = [0u8; 8192]; + while remaining > 0 { + let chunk = remaining.min(scratch.len()); + r.read_exact(&mut scratch[..chunk])?; + remaining -= chunk; + } + Ok(()) +} + // ============================================================================= // GlobalHeader // ============================================================================= /// Minimum size: 34 bytes pub const GLOBAL_HEADER_MIN_SIZE: usize = 34; +const MAX_GLOBAL_HEADER_SIZE: usize = GLOBAL_HEADER_MIN_SIZE + MAX_FORWARD_COMPAT_PADDING; #[derive(Debug, Clone, Default)] pub struct GlobalHeader { @@ -169,6 +196,21 @@ impl GlobalHeader { let reserved = r.read_u16::()?; let total_read_count = r.read_u64::()?; let fname_len = r.read_u16::()? as usize; + + validate_declared_size( + "GlobalHeader", + header_size as usize, + GLOBAL_HEADER_MIN_SIZE, + MAX_GLOBAL_HEADER_SIZE, + )?; + let minimum_required = GLOBAL_HEADER_MIN_SIZE + fname_len; + if (header_size as usize) < minimum_required { + return Err(FqcError::Format(format!( + "GlobalHeader header size {} < required {} for filename length {}", + header_size, minimum_required, fname_len + ))); + } + let mut fname_buf = vec![0u8; fname_len]; r.read_exact(&mut fname_buf)?; let original_filename = @@ -179,8 +221,7 @@ impl GlobalHeader { let read_so_far = GLOBAL_HEADER_MIN_SIZE + fname_len; if header_size as usize > read_so_far { let extra = header_size as usize - read_so_far; - let mut skip = vec![0u8; extra]; - r.read_exact(&mut skip)?; + skip_extra_bytes(r, extra)?; } if reserved != 0 { @@ -205,6 +246,7 @@ impl GlobalHeader { // ============================================================================= pub const BLOCK_HEADER_SIZE: usize = 104; +const MAX_BLOCK_HEADER_SIZE: usize = BLOCK_HEADER_SIZE + MAX_FORWARD_COMPAT_PADDING; #[derive(Debug, Clone, Default)] pub struct BlockHeader { @@ -280,11 +322,17 @@ impl BlockHeader { let size_qual = r.read_u64::()?; let size_aux = r.read_u64::()?; + validate_declared_size( + "BlockHeader", + header_size as usize, + BLOCK_HEADER_SIZE, + MAX_BLOCK_HEADER_SIZE, + )?; + // Skip extra bytes if header is larger if header_size as usize > BLOCK_HEADER_SIZE { let extra = header_size as usize - BLOCK_HEADER_SIZE; - let mut skip = vec![0u8; extra]; - r.read_exact(&mut skip)?; + skip_extra_bytes(r, extra)?; } if reserved1 != 0 || reserved2 != 0 { @@ -330,6 +378,7 @@ impl BlockHeader { // ============================================================================= pub const INDEX_ENTRY_SIZE: usize = 28; +const MAX_BLOCK_INDEX_ENTRY_SIZE: usize = INDEX_ENTRY_SIZE + MAX_FORWARD_COMPAT_PADDING; #[derive(Debug, Clone, Default)] pub struct IndexEntry { @@ -375,6 +424,7 @@ impl IndexEntry { // ============================================================================= pub const BLOCK_INDEX_HEADER_SIZE: usize = 16; +const MAX_BLOCK_INDEX_HEADER_SIZE: usize = BLOCK_INDEX_HEADER_SIZE + MAX_FORWARD_COMPAT_PADDING; #[derive(Debug, Clone, Default)] pub struct BlockIndex { @@ -398,27 +448,41 @@ impl BlockIndex { let entry_size = r.read_u32::()? as usize; let num_blocks = r.read_u64::()?; + validate_declared_size( + "BlockIndex", + header_size, + BLOCK_INDEX_HEADER_SIZE, + MAX_BLOCK_INDEX_HEADER_SIZE, + )?; if entry_size < INDEX_ENTRY_SIZE { return Err(FqcError::Format(format!( "BlockIndex entry size {entry_size} < required {INDEX_ENTRY_SIZE}" ))); } + if entry_size > MAX_BLOCK_INDEX_ENTRY_SIZE { + return Err(FqcError::Format(format!( + "BlockIndex entry size {entry_size} > allowed {MAX_BLOCK_INDEX_ENTRY_SIZE}" + ))); + } + if usize::try_from(num_blocks).is_err() { + return Err(FqcError::Format(format!( + "BlockIndex block count {num_blocks} does not fit in memory on this platform" + ))); + } // Skip extra header bytes if header_size > BLOCK_INDEX_HEADER_SIZE { let extra = header_size - BLOCK_INDEX_HEADER_SIZE; - let mut skip = vec![0u8; extra]; - r.read_exact(&mut skip)?; + skip_extra_bytes(r, extra)?; } - let mut entries = Vec::with_capacity(num_blocks as usize); + let mut entries = Vec::new(); for _ in 0..num_blocks { let entry = IndexEntry::read(r)?; // Skip extra entry bytes for forward compatibility if entry_size > INDEX_ENTRY_SIZE { let extra = entry_size - INDEX_ENTRY_SIZE; - let mut skip = vec![0u8; extra]; - r.read_exact(&mut skip)?; + skip_extra_bytes(r, extra)?; } entries.push(entry); } @@ -432,6 +496,7 @@ impl BlockIndex { // ============================================================================= pub const REORDER_MAP_HEADER_SIZE: usize = 32; +const MAX_REORDER_MAP_HEADER_SIZE: usize = REORDER_MAP_HEADER_SIZE + MAX_FORWARD_COMPAT_PADDING; #[derive(Debug, Clone, Default)] pub struct ReorderMapHeader { @@ -458,10 +523,15 @@ impl ReorderMapHeader { let forward_map_size = r.read_u64::()?; let reverse_map_size = r.read_u64::()?; + validate_declared_size( + "ReorderMapHeader", + header_size, + REORDER_MAP_HEADER_SIZE, + MAX_REORDER_MAP_HEADER_SIZE, + )?; if header_size > REORDER_MAP_HEADER_SIZE { let extra = header_size - REORDER_MAP_HEADER_SIZE; - let mut skip = vec![0u8; extra]; - r.read_exact(&mut skip)?; + skip_extra_bytes(r, extra)?; } Ok(Self { diff --git a/src/fqc_reader.rs b/src/fqc_reader.rs index 9e2e979..97fe0c0 100644 --- a/src/fqc_reader.rs +++ b/src/fqc_reader.rs @@ -80,10 +80,15 @@ impl FqcReader { // Read global header (after magic) reader.seek(SeekFrom::Start(MAGIC_HEADER_SIZE as u64))?; let global_header = GlobalHeader::read(&mut reader)?; + let header_end = reader.stream_position()?; + + Self::validate_footer_offsets(&footer, header_end, footer_pos)?; // Read block index reader.seek(SeekFrom::Start(footer.index_offset))?; let block_index = BlockIndex::read(&mut reader)?; + Self::validate_block_index(&block_index, header_end, &footer, global_header.total_read_count)?; + Self::validate_block_headers(&mut reader, &block_index, &footer)?; Ok(Self { path: path.to_string(), @@ -97,6 +102,166 @@ impl FqcReader { }) } + fn validate_footer_offsets(footer: &FileFooter, header_end: u64, footer_pos: u64) -> Result<()> { + if footer.index_offset < header_end || footer.index_offset >= footer_pos { + return Err(FqcError::Format(format!( + "Block index offset {} is outside archive data region", + footer.index_offset + ))); + } + + if footer.has_reorder_map() + && (footer.reorder_map_offset < header_end || footer.reorder_map_offset >= footer.index_offset) + { + return Err(FqcError::Format(format!( + "Reorder map offset {} is outside archive data region", + footer.reorder_map_offset + ))); + } + + Ok(()) + } + + fn validate_block_index( + block_index: &BlockIndex, + header_end: u64, + footer: &FileFooter, + total_read_count: u64, + ) -> Result<()> { + let data_end = if footer.has_reorder_map() { + footer.reorder_map_offset + } else { + footer.index_offset + }; + let mut previous_end = header_end; + let mut expected_archive_id_start = 0u64; + + for (idx, entry) in block_index.entries.iter().enumerate() { + if entry.compressed_size < BLOCK_HEADER_SIZE as u64 { + return Err(FqcError::Format(format!( + "Block index entry {idx} compressed size {} is smaller than block header", + entry.compressed_size + ))); + } + if entry.offset < header_end { + return Err(FqcError::Format(format!( + "Block index entry {idx} offset {} precedes block region", + entry.offset + ))); + } + if entry.offset < previous_end { + return Err(FqcError::Format(format!( + "Block index entry {idx} overlaps or reorders block data" + ))); + } + if entry.archive_id_start != expected_archive_id_start { + return Err(FqcError::Format(format!( + "Block index entry {idx} archive start {} does not match expected {}", + entry.archive_id_start, expected_archive_id_start + ))); + } + + let entry_end = entry + .offset + .checked_add(entry.compressed_size) + .ok_or_else(|| FqcError::Format(format!("Block index entry {idx} overflows archive offsets")))?; + if entry_end > data_end { + return Err(FqcError::Format(format!( + "Block index entry {idx} exceeds archive data region" + ))); + } + + previous_end = entry_end; + expected_archive_id_start = expected_archive_id_start + .checked_add(entry.read_count as u64) + .ok_or_else(|| FqcError::Format("Block index read counts overflow total read count".to_string()))?; + } + + if expected_archive_id_start != total_read_count { + return Err(FqcError::Format(format!( + "Block index total read count {} does not match global header {}", + expected_archive_id_start, total_read_count + ))); + } + + Ok(()) + } + + fn validate_block_headers( + reader: &mut BufReader, + block_index: &BlockIndex, + footer: &FileFooter, + ) -> Result<()> { + let block_region_end = if footer.has_reorder_map() { + footer.reorder_map_offset + } else { + footer.index_offset + }; + + for (idx, entry) in block_index.entries.iter().enumerate() { + reader.seek(SeekFrom::Start(entry.offset))?; + let header = BlockHeader::read(reader)?; + + if header.block_id != idx as u32 { + return Err(FqcError::Format(format!( + "Block header id {} does not match block index position {}", + header.block_id, idx + ))); + } + if header.uncompressed_count != entry.read_count { + return Err(FqcError::Format(format!( + "Block header read count {} does not match block index {} for block {}", + header.uncompressed_count, entry.read_count, idx + ))); + } + + let block_end = entry + .offset + .checked_add(entry.compressed_size) + .ok_or_else(|| FqcError::Format(format!("Block {} overflows archive offsets", idx)))?; + let payload_start = entry + .offset + .checked_add(header.header_size as u64) + .ok_or_else(|| FqcError::Format(format!("Block {} header overflows archive offsets", idx)))?; + if payload_start > block_end { + return Err(FqcError::Format(format!( + "Block {} stream extent exceeds block payload bounds", + idx + ))); + } + + let declared_stream_end = [ + header.offset_ids.checked_add(header.size_ids), + header.offset_seq.checked_add(header.size_seq), + header.offset_qual.checked_add(header.size_qual), + header.offset_aux.checked_add(header.size_aux), + ] + .into_iter() + .flatten() + .max() + .ok_or_else(|| FqcError::Format(format!("Block {} stream extent overflows block payload", idx)))?; + + if declared_stream_end > header.compressed_size { + return Err(FqcError::Format(format!( + "Block {} stream extent exceeds declared block payload", + idx + ))); + } + + let declared_block_end = payload_start + .checked_add(header.compressed_size) + .ok_or_else(|| FqcError::Format(format!("Block {} payload overflows archive offsets", idx)))?; + if declared_block_end > block_end || declared_block_end > block_region_end { + return Err(FqcError::Format(format!( + "Block {} stream extent exceeds block payload bounds", + idx + ))); + } + } + + Ok(()) + } + pub fn block_count(&self) -> usize { self.block_index.entries.len() } diff --git a/src/pipeline/compression.rs b/src/pipeline/compression.rs index c9f91e5..173f437 100644 --- a/src/pipeline/compression.rs +++ b/src/pipeline/compression.rs @@ -14,7 +14,7 @@ use crossbeam_channel::{bounded, Receiver, Sender}; use crate::algo::block_compressor::{BlockCompressor, BlockCompressorConfig, CompressedBlockData}; use crate::algo::global_analyzer::{GlobalAnalyzer, GlobalAnalyzerConfig}; use crate::error::{FqcError, Result}; -use crate::fastq::parser::{open_fastq, open_fastq_paired, open_fastq_stdin}; +use crate::fastq::parser::{open_fastq, open_fastq_interleaved, open_fastq_paired, open_fastq_stdin}; use crate::format::{build_flags, GlobalHeader}; use crate::fqc_writer::FqcWriter; use crate::types::*; @@ -485,6 +485,120 @@ impl CompressionPipeline { Ok(()) } + /// Run compression on interleaved paired-end input in a single file. + pub fn run_interleaved( + &mut self, + input_path: &str, + output_path: &str, + original_filename: &str, + pe_layout: PeLayout, + ) -> Result<()> { + self.config.validate()?; + let start = Instant::now(); + + log::info!("Reading interleaved paired-end file: {}", input_path); + + let mut parser = if input_path == "-" { + crate::fastq::parser::InterleavedPeParser::new(open_fastq_stdin()) + } else { + open_fastq_interleaved(input_path)? + }; + let all_reads = match pe_layout { + PeLayout::Interleaved => parser.collect_all_interleaved()?, + PeLayout::Consecutive => parser.collect_all_consecutive()?, + }; + + if all_reads.is_empty() { + return Err(FqcError::InvalidArgument("Input file is empty".to_string())); + } + + let total_reads = all_reads.len(); + let input_bytes: usize = all_reads + .iter() + .map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4) + .sum(); + let block_size = self.config.effective_block_size(); + let threads = self.config.effective_threads(); + + let (ordered_reads, forward_map, reverse_map) = if self.config.enable_reorder && !self.config.streaming_mode { + let ga_config = GlobalAnalyzerConfig { + reads_per_block: block_size, + ..Default::default() + }; + let sequences: Vec = all_reads.iter().map(|r| r.sequence.clone()).collect(); + let analyzer = GlobalAnalyzer::new(ga_config); + let result = analyzer.analyze(&sequences)?; + let ordered: Vec = result + .reverse_map + .iter() + .map(|&orig_idx| all_reads[orig_idx as usize].clone()) + .collect(); + (ordered, Some(result.forward_map), Some(result.reverse_map)) + } else { + (all_reads, None, None) + }; + + let flags = build_flags( + true, + !self.config.enable_reorder, + self.config.quality_mode, + self.config.id_mode, + forward_map.is_some(), + pe_layout, + self.config.read_length_class, + self.config.streaming_mode, + ); + + let compressor_config = BlockCompressorConfig { + read_length_class: self.config.read_length_class, + compression_level: self.config.compression_level, + quality_mode: self.config.quality_mode, + id_mode: self.config.id_mode, + zstd_level: BlockCompressorConfig::zstd_level_for_compression_level(self.config.compression_level), + ..Default::default() + }; + + let mut compressor = BlockCompressor::new(compressor_config); + let mut writer = FqcWriter::create(output_path)?; + + let gh = GlobalHeader::new( + flags, + total_reads as u64, + original_filename, + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d| d.as_secs()) + .unwrap_or(0), + ); + writer.write_global_header(&gh)?; + + let mut output_bytes: u64 = 0; + for (i, chunk) in ordered_reads.chunks(block_size).enumerate() { + let compressed = compressor.compress(chunk, i as u32)?; + output_bytes += compressed.total_compressed_size() as u64; + writer.write_block(&compressed)?; + } + + if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) { + writer.write_reorder_map(fwd, rev)?; + } + + writer.finalize()?; + + let elapsed = start.elapsed(); + self.stats = PipelineStats { + total_reads: total_reads as u64, + total_blocks: total_reads.div_ceil(block_size) as u32, + input_bytes: input_bytes as u64, + output_bytes, + processing_time_ms: elapsed.as_millis() as u64, + peak_memory_bytes: 0, + threads_used: threads, + }; + + Ok(()) + } + pub fn cancel(&self) { self.control.cancel(); } diff --git a/src/pipeline/decompression.rs b/src/pipeline/decompression.rs index c1faeb8..2556e6e 100644 --- a/src/pipeline/decompression.rs +++ b/src/pipeline/decompression.rs @@ -35,6 +35,7 @@ pub struct DecompressionPipelineConfig { pub header_only: bool, pub verify_checksums: bool, pub skip_corrupted: bool, + pub corrupted_placeholder: Option, pub split_pe: bool, } @@ -49,6 +50,7 @@ impl Default for DecompressionPipelineConfig { header_only: false, verify_checksums: true, skip_corrupted: false, + corrupted_placeholder: None, split_pe: false, } } @@ -88,6 +90,39 @@ struct DecompressedResult { expected_read_count: u32, } +fn placeholder_record(placeholder_seq: &str, block_id: u32, read_idx: usize) -> crate::types::ReadRecord { + crate::types::ReadRecord { + id: format!("corrupted_block{}_read{}", block_id, read_idx), + comment: String::new(), + sequence: placeholder_seq.to_string(), + quality: "!".repeat(placeholder_seq.len()), + } +} + +fn write_output_read( + output: &mut dyn std::io::Write, + read: &crate::types::ReadRecord, + header_only: bool, +) -> Result { + if header_only { + let line = if read.comment.is_empty() { + format!("@{}\n", read.id) + } else { + format!("@{} {}\n", read.id, read.comment) + }; + output.write_all(line.as_bytes()).map_err(FqcError::Io)?; + Ok(line.len() as u64) + } else { + write_record(output, read)?; + let comment_bytes = if read.comment.is_empty() { + 0_u64 + } else { + read.comment.len() as u64 + 1 + }; + Ok(read.id.len() as u64 + comment_bytes + read.sequence.len() as u64 + read.quality.len() as u64 + 4) + } +} + // ============================================================================= // DecompressionPipeline // ============================================================================= @@ -171,6 +206,8 @@ impl DecompressionPipeline { let (result_tx, result_rx): (Sender, Receiver) = bounded(max_inflight); let control = self.control.clone(); + let skip_corrupted = self.config.skip_corrupted; + let reader_result_tx = result_tx.clone(); // ---- Reader thread ---- let reader_control = control.clone(); @@ -180,16 +217,36 @@ impl DecompressionPipeline { break; } - let block_data = reader.read_block(block_id as u32)?; + let expected_read_count = reader + .block_index + .entries + .get(block_id) + .map(|entry| entry.read_count) + .unwrap_or(0); let is_last = block_id + 1 == end_block; - - task_tx - .send(BlockTask { - block_id: block_id as u32, - block_data, - is_last, - }) - .map_err(|_| FqcError::Decompression("Reader: channel closed".to_string()))?; + match reader.read_block(block_id as u32) { + Ok(block_data) => task_tx + .send(BlockTask { + block_id: block_id as u32, + block_data, + is_last, + }) + .map_err(|_| FqcError::Decompression("Reader: channel closed".to_string()))?, + Err(e) => { + if skip_corrupted { + reader_result_tx + .send(DecompressedResult { + block_id: block_id as u32, + result: Err(e), + is_last, + expected_read_count, + }) + .map_err(|_| FqcError::Decompression("Reader: result channel closed".to_string()))?; + continue; + } + return Err(e); + } + } } Ok(()) }); @@ -236,6 +293,11 @@ impl DecompressionPipeline { let writer_control = control.clone(); let header_only = self.config.header_only; let skip_corrupted = self.config.skip_corrupted; + let corrupted_placeholder = self + .config + .corrupted_placeholder + .clone() + .unwrap_or_else(|| "N".to_string()); let range_start = self.config.range_start; let range_end = self.config.range_end; let has_range = self.config.has_range(); @@ -276,32 +338,27 @@ impl DecompressionPipeline { continue; } } - if header_only { - let line = if read.comment.is_empty() { - format!("@{}\n", read.id) - } else { - format!("@{} {}\n", read.id, read.comment) - }; - output.write_all(line.as_bytes()).map_err(FqcError::Io)?; - total_output_bytes += line.len() as u64; - } else { - write_record(output.as_mut(), read)?; - let comment_bytes = if read.comment.is_empty() { - 0 - } else { - read.comment.len() + 1 - }; - total_output_bytes += - (read.id.len() + comment_bytes + read.sequence.len() + read.quality.len() + 5) - as u64; - } + total_output_bytes += write_output_read(output.as_mut(), read, header_only)?; total_reads_written += 1; } } Err(e) => { if skip_corrupted { - // Account for skipped reads so global_read_idx stays correct - global_read_idx += dr.expected_read_count as u64; + for read_idx in 0..dr.expected_read_count as usize { + global_read_idx += 1; + if has_range { + if range_start > 0 && global_read_idx < range_start { + continue; + } + if range_end > 0 && global_read_idx > range_end { + continue; + } + } + let placeholder = placeholder_record(&corrupted_placeholder, dr.block_id, read_idx); + total_output_bytes += + write_output_read(output.as_mut(), &placeholder, header_only)?; + total_reads_written += 1; + } log::warn!("Block {} corrupted, skipping: {}", dr.block_id, e); } else { return Err(e); diff --git a/tests/test_compression_orchestration.rs b/tests/test_compression_orchestration.rs new file mode 100644 index 0000000..c56dcf7 --- /dev/null +++ b/tests/test_compression_orchestration.rs @@ -0,0 +1,180 @@ +use std::io::BufWriter; +use std::path::{Path, PathBuf}; + +use fqc::commands::compress::{CompressCommand, CompressOptions}; +use fqc::fastq::parser::write_record; +use fqc::fqc_reader::FqcReader; +use fqc::types::{PeLayout, ReadRecord}; +use tempfile::tempdir; + +fn test_data_dir() -> PathBuf { + Path::new(env!("CARGO_MANIFEST_DIR")).join("tests").join("data") +} + +fn paired_inputs() -> (String, String) { + let dir = test_data_dir(); + ( + dir.join("test_R1.fastq").to_string_lossy().to_string(), + dir.join("test_R2.fastq").to_string_lossy().to_string(), + ) +} + +fn interleaved_input() -> String { + test_data_dir() + .join("test_interleaved.fastq") + .to_string_lossy() + .to_string() +} + +fn base_paired_options(output_path: &Path) -> CompressOptions { + let (r1, r2) = paired_inputs(); + CompressOptions { + input_path: r1, + input2_path: Some(r2), + output_path: output_path.to_string_lossy().to_string(), + pe_layout: PeLayout::Consecutive, + force_overwrite: true, + show_progress: false, + threads: 1, + ..Default::default() + } +} + +fn run_and_read_info(opts: &CompressOptions) -> fqc::fqc_reader::ArchiveInfo { + let exit_code = CompressCommand::new(opts.clone()).execute(); + assert_eq!(exit_code, 0, "compression should succeed"); + FqcReader::open(&opts.output_path).unwrap().info() +} + +fn write_fastq(path: &Path, records: &[ReadRecord]) { + let file = std::fs::File::create(path).unwrap(); + let mut writer = BufWriter::new(file); + for record in records { + write_record(&mut writer, record).unwrap(); + } +} + +#[test] +fn test_archive_mode_preserves_requested_paired_layout_for_paired_files() { + let dir = tempdir().unwrap(); + let output = dir.path().join("archive-layout.fqc"); + + let opts = base_paired_options(&output); + let info = run_and_read_info(&opts); + + assert!(info.is_paired); + assert_eq!(info.pe_layout, PeLayout::Consecutive); +} + +#[test] +fn test_streaming_mode_preserves_requested_paired_layout_for_paired_files() { + let dir = tempdir().unwrap(); + let output = dir.path().join("streaming-layout.fqc"); + let mut opts = base_paired_options(&output); + opts.streaming_mode = true; + + let info = run_and_read_info(&opts); + + assert!(info.is_paired); + assert!(info.streaming_mode); + assert_eq!(info.pe_layout, PeLayout::Consecutive); +} + +#[test] +fn test_pipeline_mode_preserves_requested_paired_layout_for_paired_files() { + let dir = tempdir().unwrap(); + let output = dir.path().join("pipeline-layout.fqc"); + let mut opts = base_paired_options(&output); + opts.use_pipeline = true; + + let info = run_and_read_info(&opts); + + assert!(info.is_paired); + assert_eq!(info.pe_layout, PeLayout::Consecutive); +} + +#[test] +fn test_pipeline_mode_marks_interleaved_input_as_paired() { + let dir = tempdir().unwrap(); + let output = dir.path().join("pipeline-interleaved.fqc"); + let opts = CompressOptions { + input_path: interleaved_input(), + output_path: output.to_string_lossy().to_string(), + interleaved: true, + use_pipeline: true, + pe_layout: PeLayout::Consecutive, + force_overwrite: true, + show_progress: false, + threads: 1, + ..Default::default() + }; + + let info = run_and_read_info(&opts); + + assert!(info.is_paired); + assert_eq!(info.pe_layout, PeLayout::Consecutive); +} + +#[test] +fn test_archive_mode_rejects_mismatched_paired_files() { + let dir = tempdir().unwrap(); + let r1 = dir.path().join("mismatch_R1.fastq"); + let r2 = dir.path().join("mismatch_R2.fastq"); + let output = dir.path().join("mismatch.fqc"); + + write_fastq( + &r1, + &[ + ReadRecord::new("pair1/1".into(), "ACGT".into(), "IIII".into()), + ReadRecord::new("pair2/1".into(), "TGCA".into(), "JJJJ".into()), + ], + ); + write_fastq(&r2, &[ReadRecord::new("pair1/2".into(), "ACGT".into(), "IIII".into())]); + + let opts = CompressOptions { + input_path: r1.to_string_lossy().to_string(), + input2_path: Some(r2.to_string_lossy().to_string()), + output_path: output.to_string_lossy().to_string(), + force_overwrite: true, + show_progress: false, + threads: 1, + ..Default::default() + }; + + let exit_code = CompressCommand::new(opts).execute(); + assert_ne!(exit_code, 0, "mismatched paired inputs must fail"); +} + +#[test] +fn test_streaming_mode_rejects_odd_interleaved_input() { + let dir = tempdir().unwrap(); + let input = dir.path().join("odd-interleaved.fastq"); + let output = dir.path().join("odd-interleaved.fqc"); + + write_fastq( + &input, + &[ + ReadRecord::new("pair1/1".into(), "ACGT".into(), "IIII".into()), + ReadRecord::new("pair1/2".into(), "TGCA".into(), "JJJJ".into()), + ReadRecord::new("pair2/1".into(), "GGGG".into(), "HHHH".into()), + ], + ); + + let opts = CompressOptions { + input_path: input.to_string_lossy().to_string(), + output_path: output.to_string_lossy().to_string(), + interleaved: true, + streaming_mode: true, + force_overwrite: true, + show_progress: false, + threads: 1, + ..Default::default() + }; + + let exit_code = CompressCommand::new(opts).execute(); + assert_ne!(exit_code, 0, "odd interleaved paired input must fail"); + assert!( + !output.exists(), + "streaming interleaved validation should fail before writing an archive" + ); +} diff --git a/tests/test_e2e.rs b/tests/test_e2e.rs index 9daff8e..3907a0e 100644 --- a/tests/test_e2e.rs +++ b/tests/test_e2e.rs @@ -79,6 +79,28 @@ fn write_fastq_records(path: &str, records: &[ReadRecord]) { } } +fn corrupt_block_codec_seq(path: &str, block_id: usize, codec_seq: u8) { + let reader = FqcReader::open(path).unwrap(); + let block_offset = reader.block_index.entries[block_id].offset as usize; + drop(reader); + + let mut bytes = std::fs::read(path).unwrap(); + bytes[block_offset + 10] = codec_seq; + std::fs::write(path, bytes).unwrap(); +} + +fn patch_u64(path: &str, offset: usize, value: u64) { + let mut bytes = std::fs::read(path).unwrap(); + bytes[offset..offset + 8].copy_from_slice(&value.to_le_bytes()); + std::fs::write(path, bytes).unwrap(); +} + +fn patch_u32(path: &str, offset: usize, value: u32) { + let mut bytes = std::fs::read(path).unwrap(); + bytes[offset..offset + 4].copy_from_slice(&value.to_le_bytes()); + std::fs::write(path, bytes).unwrap(); +} + fn gzip_file(src: &str, dst: &str) { let mut input = std::fs::File::open(src).unwrap(); let output = std::fs::File::create(dst).unwrap(); @@ -461,6 +483,389 @@ fn test_e2e_compress_decompress_gzip_paired_end_roundtrip() { let _ = std::fs::remove_file(restored_r2); } +#[test] +fn test_e2e_archive_mode_consecutive_layout_split_roundtrip() { + let input_r1 = test_data_dir().join("test_R1.fastq"); + let input_r2 = test_data_dir().join("test_R2.fastq"); + let archive = TempFile::new("e2e_archive_consecutive_pe.fqc"); + let output = TempFile::new("e2e_archive_consecutive.fastq"); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input_r1.to_string_lossy().to_string(), + input2_path: Some(input_r2.to_string_lossy().to_string()), + output_path: archive.path().to_string(), + pe_layout: PeLayout::Consecutive, + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let info = FqcReader::open(archive.path()).unwrap().info(); + assert_eq!(info.pe_layout, PeLayout::Consecutive); + + let decompress_exit = DecompressCommand::new(DecompressOptions { + input_path: archive.path().to_string(), + output_path: output.path().to_string(), + split_pe: true, + force_overwrite: true, + show_progress: false, + ..DecompressOptions::default() + }) + .execute(); + assert_eq!(decompress_exit, 0); + + let restored_r1 = format!("{}_R1.fastq", output.path().trim_end_matches(".fastq")); + let restored_r2 = format!("{}_R2.fastq", output.path().trim_end_matches(".fastq")); + + let original_r1 = read_fastq_records(input_r1.to_str().unwrap()); + let original_r2 = read_fastq_records(input_r2.to_str().unwrap()); + let restored_r1_reads = read_fastq_records(&restored_r1); + let restored_r2_reads = read_fastq_records(&restored_r2); + + assert_roundtrip_match(&original_r1, &restored_r1_reads); + assert_roundtrip_match(&original_r2, &restored_r2_reads); + + let _ = std::fs::remove_file(restored_r1); + let _ = std::fs::remove_file(restored_r2); +} + +#[test] +fn test_e2e_archive_mode_interleaved_input_consecutive_layout_split_roundtrip() { + let input = test_data_dir().join("test_interleaved.fastq"); + let archive = TempFile::new("e2e_archive_interleaved_consecutive_pe.fqc"); + let output = TempFile::new("e2e_archive_interleaved_consecutive.fastq"); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.to_string_lossy().to_string(), + output_path: archive.path().to_string(), + interleaved: true, + pe_layout: PeLayout::Consecutive, + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let info = FqcReader::open(archive.path()).unwrap().info(); + assert!(info.is_paired); + assert_eq!(info.pe_layout, PeLayout::Consecutive); + + let decompress_exit = DecompressCommand::new(DecompressOptions { + input_path: archive.path().to_string(), + output_path: output.path().to_string(), + split_pe: true, + force_overwrite: true, + show_progress: false, + ..DecompressOptions::default() + }) + .execute(); + assert_eq!(decompress_exit, 0); + + let restored_r1 = format!("{}_R1.fastq", output.path().trim_end_matches(".fastq")); + let restored_r2 = format!("{}_R2.fastq", output.path().trim_end_matches(".fastq")); + + let original_interleaved = read_fastq_records(input.to_str().unwrap()); + let (original_r1, original_r2) = PeLayout::Interleaved.split(original_interleaved); + let restored_r1_reads = read_fastq_records(&restored_r1); + let restored_r2_reads = read_fastq_records(&restored_r2); + + assert_roundtrip_match(&original_r1, &restored_r1_reads); + assert_roundtrip_match(&original_r2, &restored_r2_reads); + + let _ = std::fs::remove_file(restored_r1); + let _ = std::fs::remove_file(restored_r2); +} + +#[test] +fn test_e2e_streaming_consecutive_layout_split_roundtrip_multiblock() { + let r1_records = synthetic_reads(&[12, 12, 12, 12, 12, 12], "stream_r1"); + let r2_records = synthetic_reads(&[12, 12, 12, 12, 12, 12], "stream_r2"); + let input_r1 = TempFile::new("e2e_streaming_consecutive_r1.fastq"); + let input_r2 = TempFile::new("e2e_streaming_consecutive_r2.fastq"); + let archive = TempFile::new("e2e_streaming_consecutive.fqc"); + let output = TempFile::new("e2e_streaming_consecutive.fastq"); + + write_fastq_records(input_r1.path(), &r1_records); + write_fastq_records(input_r2.path(), &r2_records); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input_r1.path().to_string(), + input2_path: Some(input_r2.path().to_string()), + output_path: archive.path().to_string(), + streaming_mode: true, + pe_layout: PeLayout::Consecutive, + block_size: 4, + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let info = FqcReader::open(archive.path()).unwrap().info(); + assert!(info.streaming_mode); + assert_eq!(info.pe_layout, PeLayout::Consecutive); + assert!(info.num_blocks > 1); + + let decompress_exit = DecompressCommand::new(DecompressOptions { + input_path: archive.path().to_string(), + output_path: output.path().to_string(), + split_pe: true, + force_overwrite: true, + show_progress: false, + ..DecompressOptions::default() + }) + .execute(); + assert_eq!(decompress_exit, 0); + + let restored_r1 = format!("{}_R1.fastq", output.path().trim_end_matches(".fastq")); + let restored_r2 = format!("{}_R2.fastq", output.path().trim_end_matches(".fastq")); + let restored_r1_reads = read_fastq_records(&restored_r1); + let restored_r2_reads = read_fastq_records(&restored_r2); + + assert_roundtrip_match(&r1_records, &restored_r1_reads); + assert_roundtrip_match(&r2_records, &restored_r2_reads); + + let _ = std::fs::remove_file(restored_r1); + let _ = std::fs::remove_file(restored_r2); +} + +#[test] +fn test_reader_rejects_footer_index_offset_past_file_end() { + let input = TempFile::new("reader_index_offset_input.fastq"); + let archive = TempFile::new("reader_index_offset_bad_footer.fqc"); + write_fastq_records(input.path(), &synthetic_reads(&[16, 16, 16], "reader_index_offset")); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let mut bytes = std::fs::read(archive.path()).unwrap(); + let file_size = bytes.len() as u64; + let footer_pos = bytes.len() - FILE_FOOTER_SIZE; + bytes[footer_pos..footer_pos + 8].copy_from_slice(&(file_size + 1).to_le_bytes()); + std::fs::write(archive.path(), bytes).unwrap(); + + let err = match FqcReader::open(archive.path()) { + Ok(_) => panic!("expected malformed footer index offset to be rejected"), + Err(err) => err, + }; + assert!( + err.to_string().contains("index offset"), + "expected index-offset validation error, got {err}" + ); +} + +#[test] +fn test_reader_rejects_block_index_entry_offset_past_file_end() { + let input = TempFile::new("reader_entry_offset_input.fastq"); + let archive = TempFile::new("reader_entry_offset_bad_index.fqc"); + write_fastq_records(input.path(), &synthetic_reads(&[20, 20, 20, 20], "reader_entry_offset")); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let valid_reader = FqcReader::open(archive.path()).unwrap(); + let index_offset = valid_reader.footer.index_offset as usize; + drop(valid_reader); + + let mut bytes = std::fs::read(archive.path()).unwrap(); + let file_size = bytes.len() as u64; + let entry_offset_pos = index_offset + BLOCK_INDEX_HEADER_SIZE; + bytes[entry_offset_pos..entry_offset_pos + 8].copy_from_slice(&file_size.to_le_bytes()); + std::fs::write(archive.path(), bytes).unwrap(); + + let err = match FqcReader::open(archive.path()) { + Ok(_) => panic!("expected malformed block index entry offset to be rejected"), + Err(err) => err, + }; + assert!( + err.to_string().contains("Block index entry"), + "expected block-index bounds validation error, got {err}" + ); +} + +#[test] +fn test_reader_rejects_block_header_stream_extent_past_payload() { + let input = TempFile::new("reader_bad_block_extent.fastq"); + let archive = TempFile::new("reader_bad_block_extent.fqc"); + write_fastq_records( + input.path(), + &synthetic_reads(&[20, 20, 20, 20], "reader_bad_block_extent"), + ); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let reader = FqcReader::open(archive.path()).unwrap(); + let block_offset = reader.block_index.entries[0].offset as usize; + drop(reader); + + patch_u64(archive.path(), block_offset + 88, u64::MAX / 2); + + let err = match FqcReader::open(archive.path()) { + Ok(_) => panic!("expected malformed block stream extents to be rejected"), + Err(err) => err, + }; + assert!( + err.to_string().contains("stream extent"), + "expected block stream extent validation error, got {err}" + ); +} + +#[test] +fn test_reader_rejects_block_index_total_reads_mismatch() { + let input = TempFile::new("reader_bad_total_reads.fastq"); + let archive = TempFile::new("reader_bad_total_reads.fqc"); + write_fastq_records( + input.path(), + &synthetic_reads(&[20, 20, 20, 20], "reader_bad_total_reads"), + ); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + let reader = FqcReader::open(archive.path()).unwrap(); + let index_offset = reader.footer.index_offset as usize; + drop(reader); + + patch_u32(archive.path(), index_offset + BLOCK_INDEX_HEADER_SIZE + 24, 999); + + let err = match FqcReader::open(archive.path()) { + Ok(_) => panic!("expected block-index read-count mismatch to be rejected"), + Err(err) => err, + }; + assert!( + err.to_string().contains("total read count"), + "expected total-read-count validation error, got {err}" + ); +} + +#[test] +fn test_skip_corrupted_emits_placeholder_reads() { + let input = TempFile::new("skip_corrupted_placeholders.fastq"); + let archive = TempFile::new("skip_corrupted_placeholders.fqc"); + let output = TempFile::new("skip_corrupted_placeholders.out.fastq"); + let original = synthetic_reads(&[18, 18, 18, 18, 18, 18], "skip_corrupted"); + write_fastq_records(input.path(), &original); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + block_size: 4, + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + corrupt_block_codec_seq(archive.path(), 0, 0xFF); + + let decompress_exit = DecompressCommand::new(DecompressOptions { + input_path: archive.path().to_string(), + output_path: output.path().to_string(), + skip_corrupted: true, + corrupted_placeholder: Some("NNNN".to_string()), + show_progress: false, + force_overwrite: true, + ..DecompressOptions::default() + }) + .execute(); + assert_eq!(decompress_exit, 0); + + let restored = read_fastq_records(output.path()); + assert_eq!( + restored.len(), + original.len(), + "placeholder reads should preserve read count" + ); + for (idx, read) in restored.iter().take(4).enumerate() { + assert_eq!(read.id, format!("corrupted_block0_read{idx}")); + assert_eq!(read.sequence, "NNNN"); + assert_eq!(read.quality, "!!!!"); + } + assert_roundtrip_match(&original[4..], &restored[4..]); +} + +#[test] +fn test_pipeline_skip_corrupted_emits_placeholder_reads() { + let input = TempFile::new("pipeline_skip_corrupted_placeholders.fastq"); + let archive = TempFile::new("pipeline_skip_corrupted_placeholders.fqc"); + let output = TempFile::new("pipeline_skip_corrupted_placeholders.out.fastq"); + let original = synthetic_reads(&[18, 18, 18, 18, 18, 18], "pipeline_skip_corrupted"); + write_fastq_records(input.path(), &original); + + let compress_exit = CompressCommand::new(CompressOptions { + input_path: input.path().to_string(), + output_path: archive.path().to_string(), + block_size: 4, + show_progress: false, + force_overwrite: true, + ..CompressOptions::default() + }) + .execute(); + assert_eq!(compress_exit, 0); + + corrupt_block_codec_seq(archive.path(), 0, 0xFF); + + let decompress_exit = DecompressCommand::new(DecompressOptions { + input_path: archive.path().to_string(), + output_path: output.path().to_string(), + skip_corrupted: true, + corrupted_placeholder: Some("NNNN".to_string()), + show_progress: false, + force_overwrite: true, + use_pipeline: true, + ..DecompressOptions::default() + }) + .execute(); + assert_eq!(decompress_exit, 0); + + let restored = read_fastq_records(output.path()); + assert_eq!( + restored.len(), + original.len(), + "pipeline placeholders should preserve read count" + ); + for (idx, read) in restored.iter().take(4).enumerate() { + assert_eq!(read.id, format!("corrupted_block0_read{idx}")); + assert_eq!(read.sequence, "NNNN"); + assert_eq!(read.quality, "!!!!"); + } + assert_roundtrip_match(&original[4..], &restored[4..]); +} + #[test] fn test_e2e_scan_all_lengths_changes_length_classification() { let mut lengths = vec![150; 4096]; diff --git a/tests/test_format.rs b/tests/test_format.rs index 6a437c7..5625b1c 100644 --- a/tests/test_format.rs +++ b/tests/test_format.rs @@ -108,6 +108,48 @@ fn test_global_header_empty_filename() { assert_eq!(gh2.total_read_count, 0); } +#[test] +fn test_global_header_rejects_undersized_declared_header() { + let gh = GlobalHeader::new(0, 0, "tiny.fastq", 7); + let mut buf = Vec::new(); + gh.write(&mut buf).unwrap(); + buf[..4].copy_from_slice(&((GLOBAL_HEADER_MIN_SIZE as u32) - 1).to_le_bytes()); + + let err = GlobalHeader::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("header size"), + "expected header-size validation error, got {err}" + ); +} + +#[test] +fn test_global_header_rejects_oversized_declared_header() { + let gh = GlobalHeader::new(0, 0, "tiny.fastq", 7); + let mut buf = Vec::new(); + gh.write(&mut buf).unwrap(); + buf[..4].copy_from_slice(&((GLOBAL_HEADER_MIN_SIZE as u32) + 1_048_577_u32).to_le_bytes()); + + let err = GlobalHeader::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("header size"), + "expected header-size validation error, got {err}" + ); +} + +#[test] +fn test_global_header_rejects_filename_length_inconsistent_with_declared_size() { + let gh = GlobalHeader::new(0, 0, "tiny.fastq", 7); + let mut buf = Vec::new(); + gh.write(&mut buf).unwrap(); + buf[..4].copy_from_slice(&(GLOBAL_HEADER_MIN_SIZE as u32).to_le_bytes()); + + let err = GlobalHeader::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("filename length"), + "expected filename-length consistency error, got {err}" + ); +} + #[test] fn test_block_header_roundtrip() { let bh = BlockHeader { @@ -193,6 +235,42 @@ fn test_block_header_quality_discarded() { assert!(!bh2.is_quality_discarded()); } +#[test] +fn test_block_header_rejects_undersized_declared_header() { + let bh = BlockHeader { + block_id: 9, + codec_seq: encode_codec(CodecFamily::AbcV1, 0), + ..Default::default() + }; + let mut buf = Vec::new(); + bh.write(&mut buf).unwrap(); + buf[..4].copy_from_slice(&((BLOCK_HEADER_SIZE as u32) - 1).to_le_bytes()); + + let err = BlockHeader::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("header size"), + "expected header-size validation error, got {err}" + ); +} + +#[test] +fn test_block_header_rejects_oversized_declared_header() { + let bh = BlockHeader { + block_id: 9, + codec_seq: encode_codec(CodecFamily::AbcV1, 0), + ..Default::default() + }; + let mut buf = Vec::new(); + bh.write(&mut buf).unwrap(); + buf[..4].copy_from_slice(&((BLOCK_HEADER_SIZE as u32) + 1_048_577_u32).to_le_bytes()); + + let err = BlockHeader::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("header size"), + "expected header-size validation error, got {err}" + ); +} + #[test] fn test_index_entry_roundtrip() { let entry = IndexEntry { @@ -258,6 +336,34 @@ fn test_block_index_roundtrip() { assert_eq!(index2.entries[2].read_count, 50); } +#[test] +fn test_block_index_accepts_large_declared_count_until_stream_ends() { + let mut buf = Vec::new(); + buf.extend_from_slice(&(BLOCK_INDEX_HEADER_SIZE as u32).to_le_bytes()); + buf.extend_from_slice(&(INDEX_ENTRY_SIZE as u32).to_le_bytes()); + buf.extend_from_slice(&100_000_u64.to_le_bytes()); + + let err = BlockIndex::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("failed to fill whole buffer"), + "expected stream exhaustion error, got {err}" + ); +} + +#[test] +fn test_block_index_rejects_oversized_entry_size() { + let mut buf = Vec::new(); + buf.extend_from_slice(&(BLOCK_INDEX_HEADER_SIZE as u32).to_le_bytes()); + buf.extend_from_slice(&((INDEX_ENTRY_SIZE as u32) + 1_048_577_u32).to_le_bytes()); + buf.extend_from_slice(&1_u64.to_le_bytes()); + + let err = BlockIndex::read(&mut Cursor::new(&buf)).unwrap_err(); + assert!( + err.to_string().contains("entry size"), + "expected entry-size validation error, got {err}" + ); +} + #[test] fn test_reorder_map_header_roundtrip() { let rmh = ReorderMapHeader { diff --git a/tests/test_parser.rs b/tests/test_parser.rs index 463e4a4..1807b07 100644 --- a/tests/test_parser.rs +++ b/tests/test_parser.rs @@ -259,3 +259,16 @@ fn test_pe_id_validation_illumina_convention() { fn test_pe_id_validation_different() { assert!(!validate_pe_pair_ids("read1", "read2")); } + +#[test] +fn test_paired_reader_rejects_mismatched_mate_counts() { + let r1_data = make_fastq_data(&[("pair1/1", "AAAA", "IIII"), ("pair2/1", "CCCC", "JJJJ")]); + let r2_data = make_fastq_data(&[("pair1/2", "TTTT", "KKKK")]); + + let r1 = FastqParser::new(BufReader::new(r1_data.as_slice())); + let r2 = FastqParser::new(BufReader::new(r2_data.as_slice())); + let mut reader = PairedFastqReader::new(r1, r2); + + assert!(reader.next_pair().unwrap().is_some()); + assert!(reader.next_pair().is_err(), "mismatched mates must fail"); +}