Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions src/commands/compression_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ impl CompressionEngine {
)));
}

let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?;
let mut writer = FqcWriter::create(&request.output_path)?;

// Build flags
let flags = build_flags(
Expand Down Expand Up @@ -544,20 +544,23 @@ impl CompressionEngine {
.and_then(|n| n.to_str())
.unwrap_or("unknown");

let output_path = request.output_path.to_string_lossy().to_string();

if let Some(ref path2) = input.secondary_path {
pipeline.run_paired(
&input.primary_path,
path2,
&output_path,
&request.output_path,
input_filename,
input.archive_layout,
)?;
} else if input.is_interleaved {
pipeline.run_interleaved(&input.primary_path, &output_path, input_filename, input.archive_layout)?;
pipeline.run_interleaved(
&input.primary_path,
&request.output_path,
input_filename,
input.archive_layout,
)?;
} else {
pipeline.run(&input.primary_path, &output_path, input_filename)?;
pipeline.run(&input.primary_path, &request.output_path, input_filename)?;
}

let stats = pipeline.stats();
Expand All @@ -575,7 +578,7 @@ impl CompressionEngine {
// Build outcome
let processing_stats = ProcessingStats {
total_reads: stats.total_reads,
total_bases: 0, // pipeline doesn't track bases separately
total_bases: stats.total_bases,
input_bytes: stats.input_bytes,
output_bytes: stats.output_bytes,
blocks_written: stats.total_blocks as u64,
Expand All @@ -585,7 +588,7 @@ impl CompressionEngine {
Ok(CompressionOutcome {
mode: CompressionExecutionMode::Pipeline,
detected_read_length_class: effective_length_class,
reorder_map_written: false, // Pipeline stats don't track this separately
reorder_map_written: stats.reorder_map_written,
blocks_written: stats.total_blocks as usize,
reads_compressed: stats.total_reads,
bytes_read: stats.input_bytes,
Expand Down Expand Up @@ -737,7 +740,7 @@ impl CompressionEngine {
};

// Open writer
let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?;
let mut writer = FqcWriter::create(&request.output_path)?;

let flags = build_flags(
false,
Expand Down Expand Up @@ -843,7 +846,7 @@ impl CompressionEngine {
log::info!("Streaming compression mode (paired-end)");

let mut pe_reader = open_fastq_paired(input_path, input2_path)?;
let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?;
let mut writer = FqcWriter::create(&request.output_path)?;

let flags = build_flags(
true,
Expand Down Expand Up @@ -960,7 +963,7 @@ impl CompressionEngine {
open_fastq_interleaved(input_path)?
};

let mut writer = FqcWriter::create(request.output_path.to_str().unwrap())?;
let mut writer = FqcWriter::create(&request.output_path)?;

let flags = build_flags(
true,
Expand Down
5 changes: 4 additions & 1 deletion src/fastq/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,10 @@ impl<R: BufRead> FastqParser<R> {

let id_line = self.line_buf.trim_end();
if id_line.is_empty() {
return Ok(None);
return Err(FqcError::Parse(format!(
"Line {}: Blank line encountered where FASTQ record header was expected",
self.line_number
)));
}
if !id_line.starts_with('@') {
return Err(FqcError::Parse(format!(
Expand Down
3 changes: 2 additions & 1 deletion src/fqc_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use byteorder::LittleEndian;
use byteorder::WriteBytesExt;
use std::fs::File;
use std::io::{BufWriter, Seek, SeekFrom, Write};
use std::path::Path;
use xxhash_rust::xxh64::Xxh64;

// =============================================================================
Expand All @@ -26,7 +27,7 @@ pub struct FqcWriter {
}

impl FqcWriter {
pub fn create(path: &str) -> Result<Self> {
pub fn create(path: impl AsRef<Path>) -> Result<Self> {
let file = File::create(path).map_err(|e| FqcError::Io(e))?;
let mut writer = BufWriter::new(file);

Expand Down
75 changes: 53 additions & 22 deletions src/pipeline/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
// Uses bounded channels for backpressure control.
// =============================================================================

use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::thread;
use std::time::Instant;
Expand Down Expand Up @@ -139,12 +140,12 @@ impl CompressionPipeline {
}

/// Run compression on a single-end input file
pub fn run(&mut self, input_path: &str, output_path: &str, original_filename: &str) -> Result<()> {
pub fn run(&mut self, input_path: &str, output_path: impl AsRef<Path>, original_filename: &str) -> Result<()> {
self.config.validate()?;
let start = Instant::now();
let threads = self.config.effective_threads();
let block_size = self.config.effective_block_size();
let output_path_owned = output_path.to_string();
let output_path_owned: PathBuf = output_path.as_ref().to_path_buf();
let original_filename_owned = original_filename.to_string();

log::info!("Compression pipeline: {} threads, block_size={}", threads, block_size);
Expand All @@ -163,10 +164,12 @@ impl CompressionPipeline {
}

let total_reads = all_reads.len();
let input_bytes: usize = all_reads
.iter()
.map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4)
.sum();
let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| {
(
bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4,
bases + record.sequence.len() as u64,
)
});

log::info!("Read {} records ({} bytes)", total_reads, input_bytes);

Expand All @@ -190,6 +193,7 @@ impl CompressionPipeline {
} else {
(all_reads, None, None)
};
let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map;

// ---- Phase 2: Pipeline compression ----
let is_paired = false;
Expand All @@ -198,7 +202,7 @@ impl CompressionPipeline {
!self.config.enable_reorder,
self.config.quality_mode,
self.config.id_mode,
forward_map.is_some(),
reorder_map_written,
self.config.pe_layout,
self.config.read_length_class,
self.config.streaming_mode,
Expand Down Expand Up @@ -326,7 +330,12 @@ impl CompressionPipeline {
}

// Write reorder map if present
if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) {
if reorder_map_written {
let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else {
return Err(FqcError::Compression(
"Reorder map metadata missing despite reorder_map_written=true".to_string(),
));
};
writer.write_reorder_map(fwd, rev)?;
}

Expand All @@ -350,12 +359,14 @@ impl CompressionPipeline {
let elapsed = start.elapsed();
self.stats = PipelineStats {
total_reads: total_reads as u64,
total_bases,
total_blocks: num_chunks as u32,
input_bytes: input_bytes as u64,
output_bytes,
processing_time_ms: elapsed.as_millis() as u64,
peak_memory_bytes: 0,
threads_used: threads,
reorder_map_written,
};

log::info!(
Expand All @@ -378,7 +389,7 @@ impl CompressionPipeline {
&mut self,
input1_path: &str,
input2_path: &str,
output_path: &str,
output_path: impl AsRef<Path>,
original_filename: &str,
pe_layout: PeLayout,
) -> Result<()> {
Expand All @@ -399,10 +410,12 @@ impl CompressionPipeline {

// Store reads, then run pipeline (reuse single-end logic for Phase 2)
let total_reads = all_reads.len();
let input_bytes: usize = all_reads
.iter()
.map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4)
.sum();
let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| {
(
bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4,
bases + record.sequence.len() as u64,
)
});
let block_size = self.config.effective_block_size();
let threads = self.config.effective_threads();

Expand All @@ -423,13 +436,14 @@ impl CompressionPipeline {
} else {
(all_reads, None, None)
};
let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map;

let flags = build_flags(
true,
!self.config.enable_reorder,
self.config.quality_mode,
self.config.id_mode,
forward_map.is_some(),
reorder_map_written,
pe_layout,
self.config.read_length_class,
self.config.streaming_mode,
Expand Down Expand Up @@ -465,7 +479,12 @@ impl CompressionPipeline {
writer.write_block(&compressed)?;
}

if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) {
if reorder_map_written {
let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else {
return Err(FqcError::Compression(
"Reorder map metadata missing despite reorder_map_written=true".to_string(),
));
};
writer.write_reorder_map(fwd, rev)?;
}

Expand All @@ -474,12 +493,14 @@ impl CompressionPipeline {
let elapsed = start.elapsed();
self.stats = PipelineStats {
total_reads: total_reads as u64,
total_bases,
total_blocks: total_reads.div_ceil(block_size) as u32,
input_bytes: input_bytes as u64,
output_bytes,
processing_time_ms: elapsed.as_millis() as u64,
peak_memory_bytes: 0,
threads_used: threads,
reorder_map_written,
};

Ok(())
Expand All @@ -489,7 +510,7 @@ impl CompressionPipeline {
pub fn run_interleaved(
&mut self,
input_path: &str,
output_path: &str,
output_path: impl AsRef<Path>,
original_filename: &str,
pe_layout: PeLayout,
) -> Result<()> {
Expand All @@ -513,10 +534,12 @@ impl CompressionPipeline {
}

let total_reads = all_reads.len();
let input_bytes: usize = all_reads
.iter()
.map(|r| r.id.len() + r.sequence.len() + r.quality.len() + 4)
.sum();
let (input_bytes, total_bases) = all_reads.iter().fold((0usize, 0u64), |(bytes, bases), record| {
(
bytes + record.id.len() + record.sequence.len() + record.quality.len() + 4,
bases + record.sequence.len() as u64,
)
});
let block_size = self.config.effective_block_size();
let threads = self.config.effective_threads();

Expand All @@ -537,13 +560,14 @@ impl CompressionPipeline {
} else {
(all_reads, None, None)
};
let reorder_map_written = forward_map.is_some() && self.config.save_reorder_map;

let flags = build_flags(
true,
!self.config.enable_reorder,
self.config.quality_mode,
self.config.id_mode,
forward_map.is_some(),
reorder_map_written,
pe_layout,
self.config.read_length_class,
self.config.streaming_mode,
Expand Down Expand Up @@ -579,7 +603,12 @@ impl CompressionPipeline {
writer.write_block(&compressed)?;
}

if let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) {
if reorder_map_written {
let (Some(fwd), Some(rev)) = (&forward_map, &reverse_map) else {
return Err(FqcError::Compression(
"Reorder map metadata missing despite reorder_map_written=true".to_string(),
));
};
writer.write_reorder_map(fwd, rev)?;
}

Expand All @@ -588,12 +617,14 @@ impl CompressionPipeline {
let elapsed = start.elapsed();
self.stats = PipelineStats {
total_reads: total_reads as u64,
total_bases,
total_blocks: total_reads.div_ceil(block_size) as u32,
input_bytes: input_bytes as u64,
output_bytes,
processing_time_ms: elapsed.as_millis() as u64,
peak_memory_bytes: 0,
threads_used: threads,
reorder_map_written,
};

Ok(())
Expand Down
2 changes: 2 additions & 0 deletions src/pipeline/decompression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,12 +388,14 @@ impl DecompressionPipeline {
let elapsed = start.elapsed();
self.stats = PipelineStats {
total_reads: reads_written,
total_bases: 0,
total_blocks: (end_block - start_block) as u32,
input_bytes: file_size,
output_bytes,
processing_time_ms: elapsed.as_millis() as u64,
peak_memory_bytes: 0,
threads_used: threads,
reorder_map_written: false,
};

log::info!(
Expand Down
6 changes: 4 additions & 2 deletions src/pipeline/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,14 @@ pub const MAX_BLOCK_SIZE: usize = 1_000_000;
/// Summary counters recorded at the end of a pipeline run.
///
/// Both the compression and the decompression pipelines store one of these
/// in `self.stats` when they finish.
#[derive(Debug, Clone, Default)]
pub struct PipelineStats {
/// Number of reads processed (records read on compression, reads written
/// on decompression).
pub total_reads: u64,
/// Sum of sequence lengths over all records. The compression pipeline
/// fills this in; the decompression pipeline currently stores 0.
/// Used as the denominator of `bits_per_base`.
pub total_bases: u64,
/// Number of blocks handled by the run.
pub total_blocks: u32,
/// Input size in bytes. On compression this is computed per record as
/// id + sequence + quality lengths plus 4; on decompression it is the
/// archive file size.
pub input_bytes: u64,
/// Output size in bytes.
/// NOTE(review): how this is measured is not visible here — confirm at the
/// construction sites.
pub output_bytes: u64,
/// Elapsed time of the run in milliseconds, taken from an `Instant`.
pub processing_time_ms: u64,
/// Peak memory in bytes. The pipelines shown here always store 0, so this
/// appears not to be tracked yet — TODO confirm.
pub peak_memory_bytes: usize,
/// Thread count the pipeline ran with.
pub threads_used: usize,
/// Whether a reorder map was written to the output. The compression
/// pipeline sets this only when a forward map exists and
/// `save_reorder_map` is enabled; decompression always stores `false`.
pub reorder_map_written: bool,
}

impl PipelineStats {
Expand All @@ -66,10 +68,10 @@ impl PipelineStats {
}

pub fn bits_per_base(&self) -> f64 {
if self.input_bytes == 0 {
if self.total_bases == 0 {
return 0.0;
}
(self.output_bytes as f64 * 8.0) / (self.input_bytes as f64 * 0.5)
(self.output_bytes as f64 * 8.0) / self.total_bases as f64
}

pub fn throughput_mbps(&self) -> f64 {
Expand Down
Loading
Loading