Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/commands/compress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,7 @@ impl CompressOptions {
CompressionInputTopology::PairedFiles {
input_path_r1: PathBuf::from(&self.input_path),
input_path_r2: PathBuf::from(input2_path),
archive_layout: self.pe_layout,
}
} else if self.interleaved {
// Interleaved file
Expand Down
244 changes: 106 additions & 138 deletions src/commands/compression_engine.rs

Large diffs are not rendered by default.

57 changes: 57 additions & 0 deletions src/commands/compression_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub enum CompressionInputTopology {
PairedFiles {
input_path_r1: PathBuf,
input_path_r2: PathBuf,
archive_layout: PeLayout,
},
/// Interleaved FASTQ file (paired reads interleaved in one file)
InterleavedFile {
Expand All @@ -39,6 +40,62 @@ pub enum CompressionInputTopology {
Stdin { archive_layout: Option<PeLayout> },
}

/// Resolved input properties used by compression execution.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CompressionInputResolution {
pub primary_path: String,
pub secondary_path: Option<String>,
pub is_paired: bool,
pub is_interleaved: bool,
pub archive_layout: PeLayout,
}

impl CompressionInputTopology {
/// Resolve topology-specific details once so callers do not duplicate branching.
pub fn resolve(&self) -> CompressionInputResolution {
match self {
Self::SingleFile { input_path } => CompressionInputResolution {
primary_path: input_path.to_string_lossy().to_string(),
secondary_path: None,
is_paired: false,
is_interleaved: false,
archive_layout: PeLayout::Interleaved,
},
Self::PairedFiles {
input_path_r1,
input_path_r2,
archive_layout,
} => CompressionInputResolution {
primary_path: input_path_r1.to_string_lossy().to_string(),
secondary_path: Some(input_path_r2.to_string_lossy().to_string()),
is_paired: true,
is_interleaved: false,
archive_layout: *archive_layout,
},
Self::InterleavedFile {
input_path,
archive_layout,
} => CompressionInputResolution {
primary_path: input_path.to_string_lossy().to_string(),
secondary_path: None,
is_paired: true,
is_interleaved: true,
archive_layout: *archive_layout,
},
Self::Stdin { archive_layout } => {
let is_paired = archive_layout.is_some();
CompressionInputResolution {
primary_path: "-".to_string(),
secondary_path: None,
is_paired,
is_interleaved: is_paired,
archive_layout: archive_layout.unwrap_or(PeLayout::Interleaved),
}
}
}
}
}

/// Normalized compression request.
///
/// This type represents a validated, normalized compression request
Expand Down
139 changes: 116 additions & 23 deletions src/commands/decompress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ enum OutputWriters {
r1: Box<dyn Write>,
r2: Box<dyn Write>,
pe_layout: PeLayout,
streaming_block_layout: bool,
},
}

Expand All @@ -79,13 +80,20 @@ impl OutputWriters {
header_only: bool,
zero_based_read_idx: u64,
total_archive_reads: u64,
block_local_idx: Option<(u64, u64)>,
) -> Result<u64> {
match self {
Self::Single(output) => write_to_target(output.as_mut(), read, header_only),
Self::Split { r1, r2, pe_layout } => {
let to_r1 = match pe_layout {
PeLayout::Interleaved => zero_based_read_idx % 2 == 0,
PeLayout::Consecutive => zero_based_read_idx < (total_archive_reads / 2),
Self::Split {
r1,
r2,
pe_layout,
streaming_block_layout,
} => {
let to_r1 = match (pe_layout, *streaming_block_layout, block_local_idx) {
(PeLayout::Consecutive, true, Some((local_idx, block_reads))) => local_idx < (block_reads / 2),
(PeLayout::Interleaved, _, _) => zero_based_read_idx % 2 == 0,
(PeLayout::Consecutive, _, _) => zero_based_read_idx < (total_archive_reads / 2),
};

if to_r1 {
Expand Down Expand Up @@ -243,10 +251,12 @@ impl DecompressCommand {
}
}

let pe_layout = get_pe_layout(flags);
OutputWriters::Split {
r1: Box::new(BufWriter::new(File::create(&r1_path)?)),
r2: Box::new(BufWriter::new(File::create(&r2_path)?)),
pe_layout: get_pe_layout(flags),
pe_layout,
streaming_block_layout: (flags & flags::STREAMING_MODE) != 0 && pe_layout == PeLayout::Consecutive,
}
} else if self.opts.output_path == "-" {
OutputWriters::Single(Box::new(std::io::stdout()))
Expand Down Expand Up @@ -301,9 +311,14 @@ impl DecompressCommand {
}
Err(e) => {
if self.opts.skip_corrupted {
// Account for skipped reads so global_read_idx stays correct
if let Some(entry) = reader.block_index.entries.get(block_id) {
global_read_idx += entry.read_count as u64;
self.emit_corrupted_placeholders(
&mut output,
block_id as u32,
entry.read_count,
total_archive_reads,
&mut global_read_idx,
)?;
}
log::warn!("Block {} failed, skipping: {}", block_id, e);
self.stats.corrupted_blocks += 1;
Expand Down Expand Up @@ -353,11 +368,19 @@ impl DecompressCommand {

// Phase 1: Read block data sequentially
let mut block_data_vec: Vec<(u32, BlockData)> = Vec::with_capacity(batch_end - block_start);
let mut phase1_failures: Vec<(u32, String, u32)> = Vec::new();
for block_id in block_start..batch_end {
match reader.read_block(block_id as u32) {
Ok(bd) => block_data_vec.push((block_id as u32, bd)),
Err(e) => {
if skip_corrupted {
let read_count = reader
.block_index
.entries
.get(block_id)
.map(|entry| entry.read_count)
.unwrap_or(0);
phase1_failures.push((block_id as u32, e.to_string(), read_count));
log::warn!("Block {} read failed, skipping: {}", block_id, e);
self.stats.corrupted_blocks += 1;
} else {
Expand All @@ -369,39 +392,50 @@ impl DecompressCommand {

// Phase 2: Decompress in parallel
let cfg = Arc::clone(&config);
let results: Vec<std::result::Result<(u32, DecompressedBlockData), (u32, String)>> = block_data_vec
let results: Vec<std::result::Result<(u32, DecompressedBlockData), (u32, String, u32)>> = block_data_vec
.into_par_iter()
.map(|(bid, bd)| {
let mut comp = BlockCompressor::new((*cfg).clone());
match comp.decompress_block(&bd) {
Ok(dec) => Ok((bid, dec)),
Err(e) => Err((bid, format!("{}", e))),
Err(e) => Err((bid, format!("{}", e), bd.header.uncompressed_count)),
}
})
.collect();

// Phase 3: Write results sequentially (sorted by block_id)
let mut sorted: Vec<_> = results;
sorted.extend(phase1_failures.into_iter().map(Err));
sorted.sort_by_key(|r| match r {
Ok((bid, _)) => *bid,
Err((bid, _)) => *bid,
Err((bid, _, _)) => *bid,
});

for result in sorted {
match result {
Ok((_bid, decompressed)) => {
for read in &decompressed.reads {
self.emit_read(output, read, global_read_idx, total_archive_reads)?;
let block_read_count = decompressed.reads.len() as u64;
for (local_idx, read) in decompressed.reads.iter().enumerate() {
self.emit_read(
output,
read,
global_read_idx,
total_archive_reads,
Some((local_idx as u64, block_read_count)),
)?;
global_read_idx += 1;
}
self.stats.blocks_processed += 1;
}
Err((bid, msg)) => {
Err((bid, msg, read_count)) => {
if skip_corrupted {
// Account for skipped reads so global_read_idx stays correct
if let Some(entry) = reader.block_index.entries.get(bid as usize) {
global_read_idx += entry.read_count as u64;
}
self.emit_corrupted_placeholders(
output,
bid,
read_count,
total_archive_reads,
&mut global_read_idx,
)?;
log::warn!("Block {} decompress failed, skipping: {}", bid, msg);
self.stats.corrupted_blocks += 1;
} else {
Expand Down Expand Up @@ -430,10 +464,17 @@ impl DecompressCommand {

let decompressed = compressor.decompress_block(&block_data)?;

for read in &decompressed.reads {
let block_read_count = decompressed.reads.len() as u64;
for (local_idx, read) in decompressed.reads.iter().enumerate() {
let read_idx = *global_read_idx;
*global_read_idx += 1;
self.emit_read(output, read, read_idx, total_archive_reads)?;
self.emit_read(
output,
read,
read_idx,
total_archive_reads,
Some((local_idx as u64, block_read_count)),
)?;
}

Ok(())
Expand All @@ -460,7 +501,23 @@ impl DecompressCommand {
let mut all_reads: Vec<ReadRecord> = Vec::with_capacity(total_reads);

for block_id in 0..block_count {
let block_data = reader.read_block(block_id as u32)?;
let block_data = match reader.read_block(block_id as u32) {
Ok(block_data) => block_data,
Err(e) => {
if self.opts.skip_corrupted {
if let Some(entry) = reader.block_index.entries.get(block_id) {
all_reads.extend(
(0..entry.read_count as usize)
.map(|read_idx| self.opts.placeholder_record(block_id as u32, read_idx)),
);
}
log::warn!("Block {} failed, skipping: {}", block_id, e);
self.stats.corrupted_blocks += 1;
continue;
}
return Err(e);
}
};

match compressor.decompress_block(&block_data) {
Ok(decompressed) => {
Expand All @@ -469,6 +526,12 @@ impl DecompressCommand {
}
Err(e) => {
if self.opts.skip_corrupted {
if let Some(entry) = reader.block_index.entries.get(block_id) {
all_reads.extend(
(0..entry.read_count as usize)
.map(|read_idx| self.opts.placeholder_record(block_id as u32, read_idx)),
);
}
log::warn!("Block {} failed, skipping: {}", block_id, e);
self.stats.corrupted_blocks += 1;
} else {
Expand All @@ -494,18 +557,42 @@ impl DecompressCommand {
}

let read = &all_reads[archive_id];
self.emit_read(output, read, original_id as u64, total_archive_reads)?;
self.emit_read(output, read, original_id as u64, total_archive_reads, None)?;
}

Ok(())
}

fn emit_corrupted_placeholders(
&mut self,
output: &mut OutputWriters,
block_id: u32,
read_count: u32,
total_archive_reads: u64,
global_read_idx: &mut u64,
) -> Result<()> {
for local_idx in 0..read_count as usize {
let read_idx = *global_read_idx;
*global_read_idx += 1;
let placeholder = self.opts.placeholder_record(block_id, local_idx);
self.emit_read(
output,
&placeholder,
read_idx,
total_archive_reads,
Some((local_idx as u64, read_count as u64)),
)?;
}
Ok(())
}

fn emit_read(
&mut self,
output: &mut OutputWriters,
read: &ReadRecord,
zero_based_read_idx: u64,
total_archive_reads: u64,
block_local_idx: Option<(u64, u64)>,
) -> Result<()> {
let current_id = zero_based_read_idx + 1;

Expand All @@ -517,8 +604,13 @@ impl DecompressCommand {
return Ok(());
}

let bytes_written =
output.write_record(read, self.opts.header_only, zero_based_read_idx, total_archive_reads)?;
let bytes_written = output.write_record(
read,
self.opts.header_only,
zero_based_read_idx,
total_archive_reads,
block_local_idx,
)?;
self.stats.total_reads += 1;
self.stats.total_bases += read.sequence.len() as u64;
self.stats.output_bytes += bytes_written;
Expand Down Expand Up @@ -546,6 +638,7 @@ impl DecompressCommand {
original_order: false,
header_only: self.opts.header_only,
skip_corrupted: self.opts.skip_corrupted,
corrupted_placeholder: self.opts.corrupted_placeholder.clone(),
split_pe: false,
..Default::default()
};
Expand Down
14 changes: 6 additions & 8 deletions src/fastq/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -347,14 +347,12 @@ impl<R1: BufRead, R2: BufRead> PairedFastqReader<R1, R2> {
pub fn next_pair(&mut self) -> Result<Option<(ReadRecord, ReadRecord)>> {
match (self.r1.next_record()?, self.r2.next_record()?) {
(Some(a), Some(b)) => Ok(Some((a, b))),
(Some(_), None) => {
log::warn!("R1 has more reads than R2, truncating");
Ok(None)
}
(None, Some(_)) => {
log::warn!("R2 has more reads than R1, truncating");
Ok(None)
}
(Some(_), None) => Err(FqcError::Parse(
"Paired FASTQ inputs have mismatched mate counts: R1 has more reads than R2".to_string(),
)),
(None, Some(_)) => Err(FqcError::Parse(
"Paired FASTQ inputs have mismatched mate counts: R2 has more reads than R1".to_string(),
)),
(None, None) => Ok(None),
}
}
Expand Down
Loading
Loading