diff --git a/bin/prepare_fastqs.py b/bin/prepare_fastqs.py index dbe7f15..15fc27b 100755 --- a/bin/prepare_fastqs.py +++ b/bin/prepare_fastqs.py @@ -145,7 +145,7 @@ def process_sample( dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") transfer_file(fastqs[0], dest, remote_input=remote_input) - yield sample, False + yield sample, False, 1 elif fastqs: @@ -185,9 +185,7 @@ def rx_filter(s, x=1): return re.search(r"[._R]" + str(x) + r"$", s) and not re.search(r"(orphan|single)s?", s) # partition fastqs into R1, R2, and 'other' sets - # r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)] r1 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=1)] - # r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)] r2 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=2)] others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2}))) @@ -228,18 +226,21 @@ def rx_filter(s, x=1): pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + n_parts = 0 if r1: # if R1 is not empty, transfer R1-files + n_parts += 1 dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression) if r2: # if R2 is not empty, transfer R2-files, # if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end + n_parts += 1 target_r = "R2" if r1 else "R1" dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}") transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression) - yield sample, bool(r1 and r2) + yield sample, bool(r1 and r2), bool(r1 or r2) + bool(others) if others: # if single-end reads exist, @@ -252,7 +253,7 @@ def rx_filter(s, x=1): dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") transfer_multifiles(others, dest, remote_input=remote_input, compression=compression) - yield sample, bool(r1 or r2) 
+ yield sample, bool(r1 or r2), bool(r1 or r2) + bool(others) def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes): @@ -309,6 +310,7 @@ def main(): ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq") ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2") ap.add_argument("--add_sample_suffix", type=str) + ap.add_argument("--override_pair_check", action="store_true") args = ap.parse_args() @@ -370,8 +372,8 @@ def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffi except Exception as e: raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.") else: - for sample, is_paired in renamed: - print(sample, int(is_paired), sep="\t", file=lib_out) + for sample, is_paired, n_parts in renamed: + print(sample, int(is_paired), n_parts, sep="\t", file=lib_out) if __name__ == "__main__": main() diff --git a/nevermore/bin/prepare_fastqs.py b/nevermore/bin/prepare_fastqs.py index 6b5af35..15fc27b 100755 --- a/nevermore/bin/prepare_fastqs.py +++ b/nevermore/bin/prepare_fastqs.py @@ -22,6 +22,19 @@ def check_pairwise(r1, r2): + d = {} + for prefix, fn in itertools.chain(r1, r2): + d.setdefault(prefix[:-1], []).append((prefix, fn)) + for prefix, reads in d.items(): + if len(reads) == 2: + yield reads[0][1], reads[1][1] + elif len(reads) == 1: + yield (None, reads[0][1]) if reads[0][0][-1] == "2" else (reads[0][1], None) + else: + raise ValueError(f"Weird number of reads found: {reads}") + + +def check_pairwise_old(r1, r2): """ Checks if two sets of read files contain the same prefixes. 
Input: @@ -80,11 +93,13 @@ def transfer_multifiles(files, dest, remote_input=False, compression=None): if compression in ("gz", "bz2"): # multiple compressed files can just be concatenated logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate', compression, remote_input) + logging.debug(' cmd: %s', ' '.join(cat_cmd)) with open(dest, "wt") as _out: subprocess.run(cat_cmd, stdout=_out) else: # multiple uncompressed files will be cat | gzipped logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate+gzip', compression, remote_input) + logging.debug(' cmd: %s', ' | '.join((' '.join(cat_cmd), "gzip -c -"))) cat_pr = subprocess.Popen(cat_cmd, stdout=subprocess.PIPE) with open(dest, "wt") as _out: subprocess.run(("gzip", "-c", "-"), stdin=cat_pr.stdout, stdout=_out) @@ -130,7 +145,7 @@ def process_sample( dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") transfer_file(fastqs[0], dest, remote_input=remote_input) - yield sample, False + yield sample, False, 1 elif fastqs: @@ -166,23 +181,37 @@ def process_sample( print("PRE", prefixes, file=sys.stderr) + def rx_filter(s, x=1): + return re.search(r"[._R]" + str(x) + r"$", s) and not re.search(r"(orphan|single)s?", s) + # partition fastqs into R1, R2, and 'other' sets - r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)] - r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)] + r1 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=1)] + r2 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=2)] others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2}))) - # check if R1/R2 sets have equal sizes or are empty - # R1 empty: potential scRNAseq (or any protocol with barcode reads in R1) - # R2 empty: typical single end reads with (R?)1 suffix - assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same 
length" + if False: + # check if R1/R2 sets have equal sizes or are empty + # R1 empty: potential scRNAseq (or any protocol with barcode reads in R1) + # R2 empty: typical single end reads with (R?)1 suffix + assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length" + + # if R1 and R2 are of equal size, check if the prefixes match + if len(r1) == len(r2) and r1: + check_pairwise(r1, r2) - # if R1 and R2 are of equal size, check if the prefixes match - if len(r1) == len(r2) and r1: - check_pairwise(r1, r2) + # sort R1/R2 for concatenation, get rid off prefixes + r1 = sorted(f for _, f in r1) + r2 = sorted(f for _, f in r2) + else: + reads = list(check_pairwise(r1, r2)) + if reads: + others += [ + r1 or r2 + for r1, r2 in reads + if r1 is None or r2 is None + ] + pairs = [(f1, f2) for f1, f2 in reads if f1 and f2]; r1, r2 = zip(*pairs) if pairs else ((), ()) - # sort R1/R2 for concatenation, get rid off prefixes - r1 = sorted(f for _, f in r1) - r2 = sorted(f for _, f in r2) print("R1", r1, file=sys.stderr) print("R2", r2, file=sys.stderr) @@ -197,18 +226,21 @@ def process_sample( pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True) + n_parts = 0 if r1: # if R1 is not empty, transfer R1-files + n_parts += 1 dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}") transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression) if r2: # if R2 is not empty, transfer R2-files, # if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end + n_parts += 1 target_r = "R2" if r1 else "R1" dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}") transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression) - yield sample, bool(r1 and r2) + yield sample, bool(r1 and r2), bool(r1 or r2) + bool(others) if others: # if single-end reads exist, @@ -221,7 +253,7 @@ def process_sample( dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_multifiles(others, dest, remote_input=remote_input, compression=compression) - yield sample, bool(r1 or r2) + yield sample, bool(r1 or r2), bool(r1 or r2) + bool(others) def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes): @@ -278,6 +310,7 @@ def main(): ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq") ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2") ap.add_argument("--add_sample_suffix", type=str) + ap.add_argument("--override_pair_check", action="store_true") args = ap.parse_args() @@ -339,8 +372,8 @@ def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffi except Exception as e: raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.") else: - for sample, is_paired in renamed: - print(sample, int(is_paired), sep="\t", file=lib_out) + for sample, is_paired, n_parts in renamed: + print(sample, int(is_paired), n_parts, sep="\t", file=lib_out) if __name__ == "__main__": main() diff --git a/nevermore/modules/align/bowtie2.nf b/nevermore/modules/align/bowtie2.nf index 9b51129..d02b85a 100644 --- a/nevermore/modules/align/bowtie2.nf +++ b/nevermore/modules/align/bowtie2.nf @@ -1,5 +1,5 @@ process bowtie2_build { - container "quay.io/biocontainers/bowtie2:2.5.3--py39h6fed5c7_1" + container "registry.git.embl.org/schudoma/bowtie2-docker:latest" tag "${sample.id}" input: @@ -23,8 +23,7 @@ process bowtie2_build { process bowtie2_align { - // container "quay.io/biocontainers/bowtie2:2.5.3--py39h6fed5c7_1" - container "registry.git.embl.de/schudoma/bowtie2-docker:latest" + container "registry.git.embl.org/schudoma/bowtie2-docker:latest" tag "${sample.id}" input: diff --git a/nevermore/modules/align/bwa.nf b/nevermore/modules/align/bwa.nf index 1e23e22..e51637c 100644 --- a/nevermore/modules/align/bwa.nf +++ b/nevermore/modules/align/bwa.nf @@ -1,5 +1,8 @@ process bwa_mem_align { - container 
"registry.git.embl.de/schudoma/align-docker:latest" + cpus 4 + memory { 20.GB * task.attempt } + time { 3.d * task.attempt } + container "registry.git.embl.org/schudoma/align-docker:latest" label 'align' input: @@ -12,10 +15,17 @@ process bwa_mem_align { script: def maxmem = task.memory.toGiga() - def align_cpus = 4 // figure out the groovy division garbage later (task.cpus >= 8) ? - def sort_cpus = 4 + + def align_cpus = 1 + def sort_cpus = 1 + if (task.cpus > 1) { + def half_cpus = task.cpus.intdiv(2) + sort_cpus = half_cpus + align_cpus = task.cpus - half_cpus + } def blocksize = "-K 10000000" // shamelessly taken from NGLess + // def sort_cmd = "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam" r1_files = reads.findAll( { it.name.endsWith("_R1.fastq.gz") && !it.name.matches("(.*)(singles|orphans|chimeras)(.*)") } ) r2_files = reads.findAll( { it.name.endsWith("_R2.fastq.gz") } ) @@ -27,14 +37,9 @@ process bwa_mem_align { } else if (orphan_files.size() != 0) { r1_input += "${orphan_files.join(' ')}" } - def pre_sort_cmd_1 = "sortbyname.sh -Xmx${maxmem}g in=${r1_input} out=${sample.id}_R1.sorted.fastq.gz interleaved=f" - def pre_sort_cmd_2 = "" def r2_input = "" - def reads2 = "" if (r2_files.size() != 0) { r2_input += "${r2_files.join(' ')}" - pre_sort_cmd_2 = "sortbyname.sh -Xmx${maxmem}g in=${r2_input} out=${sample.id}_R2.sorted.fastq.gz interleaved=f" - reads2 = "${sample.id}_R2.sorted.fastq.gz" } def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -" @@ -42,16 +47,10 @@ process bwa_mem_align { def read_group_id = (sample.library == "paired") ? ((sample.is_paired) ?
2 : 2) : 1 def read_group = "'@RG\\tID:${read_group_id}\\tSM:${sample.id}'" - pre_sort_cmd_1 = "" - pre_sort_cmd_2 = "" - """ set -e -o pipefail mkdir -p tmp/ - ${pre_sort_cmd_1} - ${pre_sort_cmd_2} - bwa mem -R ${read_group} -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${r1_input} ${r2_input} | samtools view -F 4 -buSh - | ${sort_cmd} + bwa mem -R ${read_group} -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${r1_input} ${r2_input} | samtools view -buSh - | ${sort_cmd} rm -rvf tmp/ *.sorted.fastq.gz """ - // sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=${sample.id}_R1.sorted.fastq.gz interleaved=f } diff --git a/nevermore/modules/align/helpers.nf b/nevermore/modules/align/helpers.nf index 56b20aa..56b638c 100644 --- a/nevermore/modules/align/helpers.nf +++ b/nevermore/modules/align/helpers.nf @@ -35,19 +35,16 @@ process merge_sam { input: tuple val(sample), path(samfiles) - // val(do_name_sort) - + output: tuple val(sample), path("sam/${sample.id}.sam"), emit: sam tuple val(sample), path("stats/sam/${sample.id}.flagstats.txt"), emit: flagstats script: - // def sort_order = (do_name_sort) ? 
"-n" : "" def merge_cmd = "" // need a better detection for this if (samfiles instanceof Collection && samfiles.size() >= 2) { - // merge_cmd = "samtools merge -@ $task.cpus ${sort_order} bam/${sample.id}.bam ${bamfiles}" merge_cmd += "samtools view --no-PG -Sh ${samfiles[0]} > sam/${sample.id}.sam\n" merge_cmd += "samtools view -S ${samfiles[1]} >> sam/${sample.id}.sam" diff --git a/nevermore/modules/align/hisat2.nf b/nevermore/modules/align/hisat2.nf index ba0b49d..32c3eb4 100644 --- a/nevermore/modules/align/hisat2.nf +++ b/nevermore/modules/align/hisat2.nf @@ -1,7 +1,5 @@ process hisat2_build { - container "quay.io/biocontainers/hisat2:2.2.1--hdbdd923_6" - // we need a hisat2/samtools mixed container - // container "registry.git.embl.de/schudoma/hisat2-docker:latest" + container "registry.git.embl.org/schudoma/hisat2-docker:latest" input: tuple val(sample), path(genomeseq) @@ -23,9 +21,7 @@ process hisat2_build { } process hisat2_align { - // container "quay.io/biocontainers/hisat2:2.2.1--hdbdd923_6" - // we need a hisat2/samtools mixed container - container "registry.git.embl.de/schudoma/hisat2-docker:latest" + container "registry.git.embl.org/schudoma/hisat2-docker:latest" input: tuple val(sample), path(fastqs), path(index) diff --git a/nevermore/modules/align/minimap2.nf b/nevermore/modules/align/minimap2.nf index ba30067..ae53df7 100644 --- a/nevermore/modules/align/minimap2.nf +++ b/nevermore/modules/align/minimap2.nf @@ -1,5 +1,5 @@ process minimap2_align { - container "registry.git.embl.de/schudoma/minimap2-docker:latest" + container "registry.git.embl.org/schudoma/minimap2-docker:latest" label 'align' input: diff --git a/nevermore/modules/collate.nf b/nevermore/modules/collate.nf deleted file mode 100644 index 524a652..0000000 --- a/nevermore/modules/collate.nf +++ /dev/null @@ -1,16 +0,0 @@ -process collate_stats { - label "default" - - input: - path(stats_files) - - output: - path("reports/read_count_table.txt") - - script: - """ - mkdir -p reports/ - 
collate_stats.py . > reports/read_count_table.txt - """ -} - diff --git a/nevermore/modules/converters/bam2fq.nf b/nevermore/modules/converters/bam2fq.nf index d2d4ff3..0282b73 100644 --- a/nevermore/modules/converters/bam2fq.nf +++ b/nevermore/modules/converters/bam2fq.nf @@ -2,15 +2,27 @@ process bam2fq { container "quay.io/biocontainers/samtools:1.19.2--h50ea8bc_1" input: tuple val(sample), path(bam) + val(keep_unmapped) + label "process_high" output: tuple val(sample), path("fastq/${sample.id}/${sample.id}*.fastq.gz"), emit: reads script: + + def filter_flags = "-F 0x900" + if (keep_unmapped == true) { + if (sample.is_paired) { + filter_flags = "-F 0x900 -f 0xc" + } else { + filter_flags = "-F 0x900 -f 0x4" + } + } + """ set -o pipefail - mkdir -p fastq/${sample.id} - samtools collate -@ $task.cpus -u -O $bam | samtools fastq -F 0x900 -0 ${sample.id}_other.fastq.gz -1 ${sample.id}_R1.fastq.gz -2 ${sample.id}_R2.fastq.gz + mkdir -p fastq/${sample.id} tmp/ + samtools collate -T tmp/tmpfile -@ $task.cpus -u -O $bam | samtools fastq ${filter_flags} -0 ${sample.id}_other.fastq.gz -1 ${sample.id}_R1.fastq.gz -2 ${sample.id}_R2.fastq.gz if [[ "\$?" -eq 0 ]]; then diff --git a/nevermore/modules/converters/fq2bam.nf b/nevermore/modules/converters/fq2bam.nf index 64faae7..71e4151 100644 --- a/nevermore/modules/converters/fq2bam.nf +++ b/nevermore/modules/converters/fq2bam.nf @@ -1,24 +1,45 @@ process fq2bam { container "quay.io/biocontainers/bbmap:39.06--h92535d8_0" + label "process_high" input: - tuple val(sample), path(fq) + tuple val(sample), path(fastqs) output: tuple val(sample), path("out/${sample.id}.bam"), emit: reads script: def maxmem = task.memory.toGiga() - def r2 = (sample.is_paired) ? "in2=${sample.id}_R2.fastq.gz" : "" + // def r2 = (sample.is_paired) ? 
"in2=${sample.id}_R2.fastq.gz" : "" def qual_modifier = "" if (params.pb_reads) { qual_modifier = "qin=33" } + def r1_files = fastqs.findAll( { it.name.endsWith("_R1.fastq.gz") && !it.name.matches("(.*)(singles|orphans|chimeras)(.*)") } ) + def r2_files = fastqs.findAll( { it.name.endsWith("_R2.fastq.gz") } ) + def orphans = fastqs.findAll( { it.name.matches("(.*)(singles|orphans|chimeras)(.*)") } ) + + def input_files = "" + if (r1_files.size() != 0 && r2_files.size() != 0) { + // input_files += "-f ${r1_files.join(' ')} -r ${r2_files.join(' ')}" + input_files = "in=${r1_files.join(',')} in2=${r2_files.join(',')}" + } else if (r1_files.size() != 0) { + // input_files += "-s ${r1_files.join(' ')}" + input_files += "in=${r1_files.join(',')}" + } else if (r2_files.size() != 0) { + // input_files += "-s ${r2_files.join(' ')}" + input_files += "in=${r2_files.join(',')}" + } else if (orphans.size() != 0) { + input_files += "in=${orphans.join(',')}" + } + + """ set -e -o pipefail mkdir -p out/ - reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz ${r2} trimreaddescription=t out=stdout.bam ${qual_modifier} | samtools addreplacerg -r "ID:A" -r "SM:${sample.id}" -o out/${sample.id}.bam - + reformat.sh -Xmx${maxmem}g ${input_files} trimreaddescription=t out=stdout.bam ${qual_modifier} | samtools addreplacerg -r "ID:A" -r "SM:${sample.id}" -o out/${sample.id}.bam - """ + // reformat.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz ${r2} trimreaddescription=t out=stdout.bam ${qual_modifier} | samtools addreplacerg -r "ID:A" -r "SM:${sample.id}" -o out/${sample.id}.bam - // https://forum.qiime2.org/t/bug-q2-itsxpresss-dependency-bbmap-cannot-handle-pacbio-ccs/17612/4 } diff --git a/nevermore/modules/converters/merge_fastqs.nf b/nevermore/modules/converters/merge_fastqs.nf index e85620d..9d9ff14 100644 --- a/nevermore/modules/converters/merge_fastqs.nf +++ b/nevermore/modules/converters/merge_fastqs.nf @@ -1,6 +1,7 @@ process merge_single_fastqs { container 
"quay.io/biocontainers/bbmap:39.06--h92535d8_0" label "medium" + tag "${sample.id}" input: tuple val(sample), path(fastqs) diff --git a/nevermore/modules/decon/hostile.nf b/nevermore/modules/decon/hostile.nf index 6f74f2d..ce4e0d0 100644 --- a/nevermore/modules/decon/hostile.nf +++ b/nevermore/modules/decon/hostile.nf @@ -4,6 +4,7 @@ params.hostile.aligner = "bowtie2" process hostile { container "quay.io/biocontainers/hostile:2.0.0--pyhdfd78af_0" + tag "${sample.id}" input: tuple val(sample), path(fastqs) diff --git a/nevermore/modules/decon/kraken2.nf b/nevermore/modules/decon/kraken2.nf index 15cab45..386099a 100644 --- a/nevermore/modules/decon/kraken2.nf +++ b/nevermore/modules/decon/kraken2.nf @@ -2,7 +2,7 @@ params.kraken2_min_hit_groups = 10 params.fix_read_ids = true process remove_host_kraken2_individual { - container "registry.git.embl.de/schudoma/kraken2-docker:latest" + container "registry.git.embl.org/schudoma/kraken2-docker:latest" label 'kraken2' label "large" @@ -17,8 +17,7 @@ process remove_host_kraken2_individual { tuple val(sample), path("no_host/${sample.id}/KRAKEN_FINISHED"), emit: sentinel script: - // def kraken2_call = "kraken2 --threads ${task.cpus} --db ${kraken_db} --report-minimizer-data --gzip-compressed --minimum-hit-groups ${params.kraken2_min_hit_groups}" - def kraken2_call = "kraken2 --threads ${task.cpus} --db ${kraken_db} --report-minimizer-data --minimum-hit-groups ${params.kraken2_min_hit_groups}" + def kraken2_call = "kraken2 --threads ${task.cpus} --db ${kraken_db} --report-minimizer-data --gzip-compressed --minimum-hit-groups ${params.kraken2_min_hit_groups}" def r1_files = fastqs.findAll( { it.name.endsWith("_R1.fastq.gz") } ) def r2_files = fastqs.findAll( { it.name.endsWith("_R2.fastq.gz") } ) @@ -40,6 +39,7 @@ process remove_host_kraken2_individual { } } + def kraken_cmd = "" def postprocessing = "" @@ -167,7 +167,7 @@ process remove_host_kraken2_individual { } process remove_host_kraken2 { - container 
"registry.git.embl.de/schudoma/kraken2-docker:latest" + container "registry.git.embl.org/schudoma/kraken2-docker:latest" label 'kraken2' input: @@ -181,8 +181,7 @@ process remove_host_kraken2 { def out_options = (sample.is_paired) ? "--paired --unclassified-out ${sample.id}#.fastq" : "--unclassified-out ${sample.id}_1.fastq" def move_r2 = (sample.is_paired) ? "gzip -c ${sample.id}_2.fastq > no_host/${sample.id}/${sample.id}_R2.fastq.gz" : "" - // def kraken2_call = "kraken2 --threads $task.cpus --db ${kraken_db} --report-minimizer-data --gzip-compressed --minimum-hit-groups ${params.kraken2_min_hit_groups}" - def kraken2_call = "kraken2 --threads $task.cpus --db ${kraken_db} --report-minimizer-data --minimum-hit-groups ${params.kraken2_min_hit_groups}" + def kraken2_call = "kraken2 --threads $task.cpus --db ${kraken_db} --report-minimizer-data --gzip-compressed --minimum-hit-groups ${params.kraken2_min_hit_groups}" """ mkdir -p no_host/${sample.id} diff --git a/nevermore/modules/stats.nf b/nevermore/modules/stats.nf index a64f6e4..d9eb62d 100644 --- a/nevermore/modules/stats.nf +++ b/nevermore/modules/stats.nf @@ -1,20 +1,58 @@ process flagstats { - container "registry.git.embl.de/schudoma/align-docker:latest" + container "registry.git.embl.org/schudoma/align-docker:latest" label "default" input: tuple val(sample), path(bam) + val(stage) output: - tuple val(sample), path("${sample.id}/${sample.id}.flagstats.txt"), emit: flagstats - tuple val(sample), path("${sample.id}/${sample.id}.libsize.txt"), emit: counts - tuple val(sample), path("${sample.id}/${sample.id}.is_paired.txt"), emit: is_paired + tuple val(sample), path("${stage}/${sample.id}/${sample.id}.flagstats.txt"), emit: flagstats + tuple val(sample), path("${stage}/${sample.id}/${sample.id}.libsize.txt"), emit: counts + tuple val(sample), path("${stage}/${sample.id}/${sample.id}.is_paired.txt"), emit: is_paired script: """ - mkdir -p ${sample.id} - samtools flagstat $bam > 
"${sample.id}/${sample.id}.flagstats.txt" - head -n 1 "${sample.id}/${sample.id}.flagstats.txt" | awk '{print \$1 + \$3}' > "${sample.id}/${sample.id}.libsize.txt" - grep -m 1 "paired in sequencing" "${sample.id}/${sample.id}.flagstats.txt" | awk '{npaired = \$1 + \$3; if (npaired==0) {print "unpaired"} else {print "paired"};}' > "${sample.id}/${sample.id}.is_paired.txt" + mkdir -p ${stage}/${sample.id} + samtools flagstat $bam > "${stage}/${sample.id}/${sample.id}.flagstats.txt" + head -n 1 "${stage}/${sample.id}/${sample.id}.flagstats.txt" | awk '{print \$1 + \$3}' > "${stage}/${sample.id}/${sample.id}.libsize.txt" + grep -m 1 "paired in sequencing" "${stage}/${sample.id}/${sample.id}.flagstats.txt" | awk '{npaired = \$1 + \$3; if (npaired==0) {print "unpaired"} else {print "paired"};}' > "${stage}/${sample.id}/${sample.id}.is_paired.txt" + """ +} + + +process flagstats_libtype { + container "quay.io/biocontainers/gawk:5.1.0--2" + label "default" + publishDir "${params.output_dir}", mode: "copy" + + input: + path(files) + + output: + path("stats/library_type.txt") + + script: + """ + mkdir -p stats/ + find . -maxdepth 1 -mindepth 1 -name '*is_paired.txt' | xargs -I {} awk -v OFS='\t' '{ print gensub(/.+\\/(.+).is_paired.txt/, "\\\\1", "g", FILENAME), \$0;}' {} > stats/library_type.txt + """ +} + + +process collate_stats { + container "registry.git.embl.org/schudoma/portraits_metatraits:latest" + label "default" + + input: + path(stats_files) + + output: + path("reports/read_count_table.txt") + + script: + """ + mkdir -p reports/ + collate_stats.py . 
> reports/read_count_table.txt """ } diff --git a/nevermore/version.json b/nevermore/version.json index 68dd77b..3cd997b 100644 --- a/nevermore/version.json +++ b/nevermore/version.json @@ -1,4 +1,4 @@ { - "base_version": "0.14.12", - "local_version": "0.14.12_0.0" + "base_version": "0.15.0", + "local_version": "0.15.0" } \ No newline at end of file diff --git a/nevermore/workflows/align.nf b/nevermore/workflows/align.nf index 21f67b7..8ea5fa0 100644 --- a/nevermore/workflows/align.nf +++ b/nevermore/workflows/align.nf @@ -6,10 +6,8 @@ include { bwa_mem_align } from "../modules/align/bwa" include { minimap2_align } from "../modules/align/minimap2" include { merge_and_sort; merge_sam } from "../modules/align/helpers" -def asset_dir = "${projectDir}/nevermore/assets" def do_alignment = params.run_gffquant || !params.skip_alignment def do_stream = params.gq_stream -def do_preprocessing = (!params.skip_preprocessing || params.run_preprocessing) params.do_name_sort = true diff --git a/nevermore/workflows/decon.nf b/nevermore/workflows/decon.nf index 273a0da..7174710 100644 --- a/nevermore/workflows/decon.nf +++ b/nevermore/workflows/decon.nf @@ -35,7 +35,7 @@ workflow nevermore_decon { hostile(preprocessed_ch, params.hostile_db) preprocessed_ch = hostile.out.reads - } else if ((params.remove_host != false && params.remove_host != null ) || params.remove_host == "kraken") { + } else if (params.remove_host == "kraken") { remove_host_kraken2_individual(preprocessed_ch, params.remove_host_kraken2_db) preprocessed_ch = remove_host_kraken2_individual.out.reads diff --git a/nevermore/workflows/input.nf b/nevermore/workflows/input.nf index 2596c27..e2aa948 100644 --- a/nevermore/workflows/input.nf +++ b/nevermore/workflows/input.nf @@ -1,7 +1,7 @@ nextflow.enable.dsl=2 include { classify_sample; classify_sample_with_library_info } from "../modules/functions" - +include { bam2fq } from "../modules/converters/bam2fq" params.bam_input_pattern = "**.bam" @@ -34,6 +34,8 @@ process 
transfer_bams { process prepare_fastqs { + // container "ghcr.io/astral-sh/uv:python3.14-trixie-slim" + container "registry.git.embl.org/schudoma/portraits_metatraits:latest" label "default" input: @@ -60,11 +62,6 @@ process prepare_fastqs { } - - - - - workflow remote_fastq_input { take: fastq_ch @@ -115,23 +112,23 @@ workflow fastq_input { library_info_ch = prepare_fastqs.out.library_info .splitCsv(header:false, sep:'\t', strip:true) - .map { row -> - return tuple(row[0], row[1]) - } + .map { row -> [ row[0], row[1], row[2] ] } prepped_fastq_ch = prepare_fastqs.out.singles - .map { sample_id, files -> return tuple("${sample_id}.singles", files, false) } + .map { sample_id, files -> [ "${sample_id}.singles", files, false ] } .mix(prepare_fastqs.out.pairs - .map { sample_id, files -> return tuple(sample_id, files, true) } + .map { sample_id, files -> [ sample_id, files, true ] } ) .join(by: 0, library_info_ch) - .map { sample_id, files, is_paired, library_is_paired -> + .map { sample_id, files, is_paired, library_is_paired, n_parts -> def meta = [:] meta.id = sample_id meta.is_paired = is_paired meta.library = (library_is_paired == "1") ? 
"paired" : "single" - return tuple(meta, [files].flatten()) + meta.multilib = n_parts != "1" + return [ meta, [files].flatten() ] } + prepped_fastq_ch.dump(pretty: true, tag: "prepped_fastq_ch") emit: @@ -158,7 +155,7 @@ workflow bam_input { fastq_ch = Channel.empty() if (params.do_bam2fq_conversion) { - bam2fq(bam_ch) + bam2fq(bam_ch, false) fastq_ch = bam2fq.out.reads .map { classify_sample(it[0].id, it[1]) } } @@ -166,4 +163,3 @@ workflow bam_input { bamfiles = bam_ch fastqs = fastq_ch } - diff --git a/nevermore/workflows/nevermore.nf b/nevermore/workflows/nevermore.nf index b48a1b2..a964d39 100644 --- a/nevermore/workflows/nevermore.nf +++ b/nevermore/workflows/nevermore.nf @@ -5,13 +5,14 @@ nextflow.enable.dsl=2 include { nevermore_simple_preprocessing } from "./prep" include { fastqc } from "../modules/qc/fastqc" include { multiqc } from "../modules/qc/multiqc" -include { collate_stats } from "../modules/collate" +include { collate_stats } from "../modules/stats" include { nevermore_align } from "./align" include { nevermore_pack_reads } from "./pack" include { nevermore_qa } from "./qa" include { nevermore_decon } from "./decon" +params.run_preprocessing = params.run_qc def do_preprocessing = (!params.skip_preprocessing || params.run_preprocessing) def do_alignment = params.run_gffquant || !params.skip_alignment def do_stream = params.gq_stream @@ -57,8 +58,25 @@ workflow nevermore_main { } + collate_stats(collate_ch.collect()) + + + preprocessed_fastq_ch = nevermore_pack_reads.out.fastqs + .map { sample, fastqs -> + sample_id = sample.id.replaceAll(/\.singles$/, "") + return [ sample_id, sample.is_paired, fastqs ] //tuple(sample_id, fastqs) + } + .groupTuple(size: ((params.single_end_libraries) ? 
1 : 2), remainder: true) + .map { sample_id, pair_info, fastqs -> + def meta = [:] + meta.id = sample_id + // meta.is_paired = pair_info.contains(true) + return tuple(meta, [fastqs].flatten()) + } + emit: - fastqs = nevermore_pack_reads.out.fastqs + // fastqs = nevermore_pack_reads.out.fastqs + fastqs = preprocessed_fastq_ch readcounts = collate_ch } diff --git a/nevermore/workflows/pack.nf b/nevermore/workflows/pack.nf index b85ddaa..a81f1e4 100644 --- a/nevermore/workflows/pack.nf +++ b/nevermore/workflows/pack.nf @@ -45,8 +45,9 @@ workflow nevermore_pack_reads { single_ch .map { sample, fastq -> - sample.id = sample.id.replaceAll(/.(orphans|singles|chimeras)$/, ".singles") - return tuple(sample, fastq) + def meta = sample.clone() + meta.id = sample.id.replaceAll(/.(orphans|singles|chimeras)$/, ".singles") + return tuple(meta, fastq) } .branch { single_end: it[0].library == "single" @@ -54,31 +55,41 @@ workflow nevermore_pack_reads { } .set { single_reads_ch } - def se_group_size = 2 - ((params.single_end_libraries || params.drop_orphans) ? 1 : 0) + def orphan_merge = !params.single_end_libraries && !params.drop_orphans && params.run_preprocessing && params.remove_host; + def se_group_size = 2 - ((orphan_merge) ? 
0 : 1); single_reads_ch.paired_end - .groupTuple(sort: true, size: se_group_size, remainder: true) .branch { - do_merge: it[1].size() > 1 + do_merge: it[0].multilib no_merge: true } .set { pe_singles_ch } merged_single_ch = pe_singles_ch.do_merge + .map { meta, fastq -> [ meta.id, fastq ] } + .groupTuple(by: 0, sort: true, size: se_group_size, remainder: true) + .map { sample_id, fastqs -> + def meta = [:] + meta.id = sample_id + meta.is_paired = false + meta.library = "paired" + meta.merged = true + meta.multilib = true - /* then merge single-read file groups into single files */ + return [meta, [fastqs].flatten()] + } - if (params.single_end_libraries) { + merged_single_ch.dump(pretty: true, tag: "merged_single_ch") - merged_ch = Channel.empty() + /* then merge single-read file groups into single files */ - } else { + merged_ch = Channel.empty() + if (!params.single_end_libraries) { merge_single_fastqs(merged_single_ch) - merged_ch = Channel.empty() - .mix(merge_single_fastqs.out.fastq) + merged_ch = merge_single_fastqs.out.fastq - } + } /* take all single-read files except for the qc-survivors, concat with merged single-read files (takes care of single-end qc-survivors), @@ -95,16 +106,14 @@ workflow nevermore_pack_reads { meta.merged = false return tuple(meta, fastq) } - .mix(pe_singles_ch.no_merge) - .mix(single_reads_ch.single_end) - .mix(paired_ch) - .mix(merged_ch) - // .mix(merge_single_fastqs.out.fastq) + .mix(pe_singles_ch.no_merge) // raw PE library orphans + .mix(single_reads_ch.single_end) // SE library reads + .mix(paired_ch) // PE library pairs + .mix(merged_ch) // merged preprocessed PE library orphans fastq_prep_ch = paired_ch .mix(single_reads_ch.single_end) .mix(pe_singles_ch.no_merge) - // .mix(merge_single_fastqs.out.fastq) .mix(merged_ch) emit: diff --git a/nextflow.config b/nextflow.config index b4a8877..1181f5d 100644 --- a/nextflow.config +++ b/nextflow.config @@ -4,5 +4,5 @@ manifest { description = "Metaomics pipeline toolbox" name = 
"nevermore" nextflowVersion = ">=22.10.6" - version = "0.14.14" + version = "0.15.0" }