Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions bin/prepare_fastqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def process_sample(
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_file(fastqs[0], dest, remote_input=remote_input)

yield sample, False
yield sample, False, 1

elif fastqs:

Expand Down Expand Up @@ -185,9 +185,7 @@ def rx_filter(s, x=1):
return re.search(r"[._R]" + str(x) + r"$", s) and not re.search(r"(orphan|single)s?", s)

# partition fastqs into R1, R2, and 'other' sets
# r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)]
r1 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=1)]
# r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)]
r2 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=2)]
others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2})))

Expand Down Expand Up @@ -228,18 +226,21 @@ def rx_filter(s, x=1):

pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True)

n_parts = 0
if r1:
# if R1 is not empty, transfer R1-files
n_parts += 1
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression)
if r2:
# if R2 is not empty, transfer R2-files,
# if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end
n_parts += 1
target_r = "R2" if r1 else "R1"
dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}")
transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression)

yield sample, bool(r1 and r2)
yield sample, bool(r1 and r2), bool(r1 or r2) + bool(others)

if others:
# if single-end reads exist,
Expand All @@ -252,7 +253,7 @@ def rx_filter(s, x=1):
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_multifiles(others, dest, remote_input=remote_input, compression=compression)

yield sample, bool(r1 or r2)
yield sample, bool(r1 or r2), bool(r1 or r2) + bool(others)


def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes):
Expand Down Expand Up @@ -309,6 +310,7 @@ def main():
ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq")
ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2")
ap.add_argument("--add_sample_suffix", type=str)
ap.add_argument("--override_pair_check", action="store_true")

args = ap.parse_args()

Expand Down Expand Up @@ -370,8 +372,8 @@ def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffi
except Exception as e:
raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.")
else:
for sample, is_paired in renamed:
print(sample, int(is_paired), sep="\t", file=lib_out)
for sample, is_paired, n_parts in renamed:
print(sample, int(is_paired), n_parts, sep="\t", file=lib_out)

if __name__ == "__main__":
main()
67 changes: 50 additions & 17 deletions nevermore/bin/prepare_fastqs.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@


def check_pairwise(r1, r2):
    """Pair up R1/R2 fastq files by their shared prefix stem.

    Input:
     - r1, r2: iterables of (prefix, filename) tuples; each prefix is assumed
       to end in its mate indicator character ("1" or "2").

    Yields one tuple per prefix stem (prefix minus its final character):
     - (r1_file, r2_file) when both mates are present,
     - (r1_file, None) when only the "1" mate exists,
     - (None, r2_file) when only the "2" mate exists.

    Raises:
     - ValueError if more than two files share a stem, or if two files share
       a stem but carry the same mate suffix (e.g. two R1 files) — previously
       such files were silently emitted as a bogus R1/R2 pair.
    """
    groups = {}
    for prefix, fn in itertools.chain(r1, r2):
        groups.setdefault(prefix[:-1], []).append((prefix, fn))
    for stem, reads in groups.items():
        if len(reads) == 2:
            # order explicitly by mate suffix ("1" sorts before "2") instead of
            # relying on input order, and reject same-mate collisions
            reads.sort(key=lambda pf: pf[0][-1])
            if reads[0][0][-1] == reads[1][0][-1]:
                raise ValueError(f"Files share prefix '{stem}' but have the same mate suffix: {reads}")
            yield reads[0][1], reads[1][1]
        elif len(reads) == 1:
            # orphaned mate: place the file in the slot matching its suffix
            yield (None, reads[0][1]) if reads[0][0][-1] == "2" else (reads[0][1], None)
        else:
            raise ValueError(f"Weird number of reads found: {reads}")


def check_pairwise_old(r1, r2):
""" Checks if two sets of read files contain the same prefixes.

Input:
Expand Down Expand Up @@ -80,11 +93,13 @@ def transfer_multifiles(files, dest, remote_input=False, compression=None):
if compression in ("gz", "bz2"):
# multiple compressed files can just be concatenated
logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate', compression, remote_input)
logging.debug(' cmd: %s', ' '.join(cat_cmd))
with open(dest, "wt") as _out:
subprocess.run(cat_cmd, stdout=_out)
else:
# multiple uncompressed files will be cat | gzipped
logging.debug('transfer_multifiles: compression=%s, remote_input=%s, action=concatenate+gzip', compression, remote_input)
logging.debug(' cmd: %s', ' | '.join((' '.join(cat_cmd), "gzip -c -")))
cat_pr = subprocess.Popen(cat_cmd, stdout=subprocess.PIPE)
with open(dest, "wt") as _out:
subprocess.run(("gzip", "-c", "-"), stdin=cat_pr.stdout, stdout=_out)
Expand Down Expand Up @@ -130,7 +145,7 @@ def process_sample(
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_file(fastqs[0], dest, remote_input=remote_input)

yield sample, False
yield sample, False, 1

elif fastqs:

Expand Down Expand Up @@ -166,23 +181,37 @@ def process_sample(

print("PRE", prefixes, file=sys.stderr)

def rx_filter(s, x=1):
return re.search(r"[._R]" + str(x) + r"$", s) and not re.search(r"(orphan|single)s?", s)

# partition fastqs into R1, R2, and 'other' sets
r1 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]1$", p)]
r2 = [(p, f) for p, f in zip(prefixes, fastqs) if re.search(r"[._R]2$", p)]
r1 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=1)]
r2 = [(p, f) for p, f in zip(prefixes, fastqs) if rx_filter(p, x=2)]
others = sorted(list(set(fastqs).difference({f for _, f in r1}).difference({f for _, f in r2})))

# check if R1/R2 sets have equal sizes or are empty
# R1 empty: potential scRNAseq (or any protocol with barcode reads in R1)
# R2 empty: typical single end reads with (R?)1 suffix
assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length"
if False:
# check if R1/R2 sets have equal sizes or are empty
# R1 empty: potential scRNAseq (or any protocol with barcode reads in R1)
# R2 empty: typical single end reads with (R?)1 suffix
assert len(r2) == 0 or len(r1) == 0 or (r1 and len(r1) == len(r2)), "R1/R2 sets are not of the same length"

# if R1 and R2 are of equal size, check if the prefixes match
if len(r1) == len(r2) and r1:
check_pairwise(r1, r2)

# if R1 and R2 are of equal size, check if the prefixes match
if len(r1) == len(r2) and r1:
check_pairwise(r1, r2)
# sort R1/R2 for concatenation, get rid off prefixes
r1 = sorted(f for _, f in r1)
r2 = sorted(f for _, f in r2)
else:
reads = list(check_pairwise(r1, r2))
if reads:
others += [
r1 or r2
for r1, r2 in reads
if r1 is None or r2 is None
]
r1, r2 = zip(*((f1, f2) for f1, f2 in reads if f1 and f2))

# sort R1/R2 for concatenation, get rid off prefixes
r1 = sorted(f for _, f in r1)
r2 = sorted(f for _, f in r2)

print("R1", r1, file=sys.stderr)
print("R2", r2, file=sys.stderr)
Expand All @@ -197,18 +226,21 @@ def process_sample(

pathlib.Path(sample_dir).mkdir(parents=True, exist_ok=True)

n_parts = 0
if r1:
# if R1 is not empty, transfer R1-files
n_parts += 1
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_multifiles(r1, dest, remote_input=remote_input, compression=compression)
if r2:
# if R2 is not empty, transfer R2-files,
# if R1 is empty, rename R2 to R1 so that files can be processed as normal single-end
n_parts += 1
target_r = "R2" if r1 else "R1"
dest = os.path.join(sample_dir, f"{sample}_{target_r}.fastq.{dest_compression}")
transfer_multifiles(r2, dest, remote_input=remote_input, compression=compression)

yield sample, bool(r1 and r2)
yield sample, bool(r1 and r2), bool(r1 or r2) + bool(others)

if others:
# if single-end reads exist,
Expand All @@ -221,7 +253,7 @@ def process_sample(
dest = os.path.join(sample_dir, f"{sample}_R1.fastq.{dest_compression}")
transfer_multifiles(others, dest, remote_input=remote_input, compression=compression)

yield sample, bool(r1 or r2)
yield sample, bool(r1 or r2), bool(r1 or r2) + bool(others)


def is_fastq(f, valid_fastq_suffixes, valid_compression_suffixes):
Expand Down Expand Up @@ -278,6 +310,7 @@ def main():
ap.add_argument("--valid-fastq-suffixes", type=str, default="fastq,fq")
ap.add_argument("--valid-compression-suffixes", type=str, default="gz,bz2")
ap.add_argument("--add_sample_suffix", type=str)
ap.add_argument("--override_pair_check", action="store_true")

args = ap.parse_args()

Expand Down Expand Up @@ -339,8 +372,8 @@ def collect_fastq_files(input_dir, valid_fastq_suffixes, valid_compression_suffi
except Exception as e:
raise ValueError(f"Encountered problems processing sample '{sample}': {e}.\nPlease check your file names.")
else:
for sample, is_paired in renamed:
print(sample, int(is_paired), sep="\t", file=lib_out)
for sample, is_paired, n_parts in renamed:
print(sample, int(is_paired), n_parts, sep="\t", file=lib_out)

if __name__ == "__main__":
main()
5 changes: 2 additions & 3 deletions nevermore/modules/align/bowtie2.nf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
process bowtie2_build {
container "quay.io/biocontainers/bowtie2:2.5.3--py39h6fed5c7_1"
container "registry.git.embl.org/schudoma/bowtie2-docker:latest"
tag "${sample.id}"

input:
Expand All @@ -23,8 +23,7 @@ process bowtie2_build {


process bowtie2_align {
// container "quay.io/biocontainers/bowtie2:2.5.3--py39h6fed5c7_1"
container "registry.git.embl.de/schudoma/bowtie2-docker:latest"
container "registry.git.embl.org/schudoma/bowtie2-docker:latest"
tag "${sample.id}"

input:
Expand Down
29 changes: 14 additions & 15 deletions nevermore/modules/align/bwa.nf
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
process bwa_mem_align {
container "registry.git.embl.de/schudoma/align-docker:latest"
cpus 4
memory { 20.GB * task.attempt }
time { 3.d * task.attempt }
container "registry.git.embl.org/schudoma/align-docker:latest"
label 'align'

input:
Expand All @@ -12,10 +15,17 @@ process bwa_mem_align {

script:
def maxmem = task.memory.toGiga()
def align_cpus = 4 // figure out the groovy division garbage later (task.cpus >= 8) ?
def sort_cpus = 4

def align_cpus = 1
def sort_cpus = 1
if (task.cpus > 1) {
    // split the allocated cpus between bwa (align) and samtools (sort);
    // alignment gets the remainder when task.cpus is odd
    def half_cpus = task.cpus.intdiv(2)
    sort_cpus = half_cpus          // was: half.cpus — no such property; crashed whenever task.cpus > 1
    align_cpus = task.cpus - half_cpus
}
def blocksize = "-K 10000000" // shamelessly taken from NGLess

// def sort_cmd = "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam"

r1_files = reads.findAll( { it.name.endsWith("_R1.fastq.gz") && !it.name.matches("(.*)(singles|orphans|chimeras)(.*)") } )
r2_files = reads.findAll( { it.name.endsWith("_R2.fastq.gz") } )
Expand All @@ -27,31 +37,20 @@ process bwa_mem_align {
} else if (orphan_files.size() != 0) {
r1_input += "${orphan_files.join(' ')}"
}
def pre_sort_cmd_1 = "sortbyname.sh -Xmx${maxmem}g in=${r1_input} out=${sample.id}_R1.sorted.fastq.gz interleaved=f"
def pre_sort_cmd_2 = ""
def r2_input = ""
def reads2 = ""
if (r2_files.size() != 0) {
r2_input += "${r2_files.join(' ')}"
pre_sort_cmd_2 = "sortbyname.sh -Xmx${maxmem}g in=${r2_input} out=${sample.id}_R2.sorted.fastq.gz interleaved=f"
reads2 = "${sample.id}_R2.sorted.fastq.gz"
}

def sort_cmd = (do_name_sort) ? "samtools collate -@ ${sort_cpus} -o ${sample.id}.bam - tmp/collated_bam" : "samtools sort -@ ${sort_cpus} -o ${sample.id}.bam -"

def read_group_id = (sample.library == "paired") ? ((sample.is_paired) ? 2 : 2) : 1
def read_group = "'@RG\\tID:${read_group_id}\\tSM:${sample.id}'"

pre_sort_cmd_1 = ""
pre_sort_cmd_2 = ""

"""
set -e -o pipefail
mkdir -p tmp/
${pre_sort_cmd_1}
${pre_sort_cmd_2}
bwa mem -R ${read_group} -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${r1_input} ${r2_input} | samtools view -F 4 -buSh - | ${sort_cmd}
bwa mem -R ${read_group} -a -t ${align_cpus} ${blocksize} \$(readlink ${reference}) ${r1_input} ${r2_input} | samtools view -buSh - | ${sort_cmd}
rm -rvf tmp/ *.sorted.fastq.gz
"""
// sortbyname.sh -Xmx${maxmem}g in=${sample.id}_R1.fastq.gz out=${sample.id}_R1.sorted.fastq.gz interleaved=f
}
5 changes: 1 addition & 4 deletions nevermore/modules/align/helpers.nf
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,16 @@ process merge_sam {

input:
tuple val(sample), path(samfiles)
// val(do_name_sort)


output:
tuple val(sample), path("sam/${sample.id}.sam"), emit: sam
tuple val(sample), path("stats/sam/${sample.id}.flagstats.txt"), emit: flagstats

script:
// def sort_order = (do_name_sort) ? "-n" : ""
def merge_cmd = ""

// need a better detection for this
if (samfiles instanceof Collection && samfiles.size() >= 2) {
// merge_cmd = "samtools merge -@ $task.cpus ${sort_order} bam/${sample.id}.bam ${bamfiles}"
merge_cmd += "samtools view --no-PG -Sh ${samfiles[0]} > sam/${sample.id}.sam\n"
merge_cmd += "samtools view -S ${samfiles[1]} >> sam/${sample.id}.sam"

Expand Down
8 changes: 2 additions & 6 deletions nevermore/modules/align/hisat2.nf
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
process hisat2_build {
container "quay.io/biocontainers/hisat2:2.2.1--hdbdd923_6"
// we need a hisat2/samtools mixed container
// container "registry.git.embl.de/schudoma/hisat2-docker:latest"
container "registry.git.embl.org/schudoma/hisat2-docker:latest"

input:
tuple val(sample), path(genomeseq)
Expand All @@ -23,9 +21,7 @@ process hisat2_build {
}

process hisat2_align {
// container "quay.io/biocontainers/hisat2:2.2.1--hdbdd923_6"
// we need a hisat2/samtools mixed container
container "registry.git.embl.de/schudoma/hisat2-docker:latest"
container "registry.git.embl.org/schudoma/hisat2-docker:latest"

input:
tuple val(sample), path(fastqs), path(index)
Expand Down
2 changes: 1 addition & 1 deletion nevermore/modules/align/minimap2.nf
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
process minimap2_align {
container "registry.git.embl.de/schudoma/minimap2-docker:latest"
container "registry.git.embl.org/schudoma/minimap2-docker:latest"
label 'align'

input:
Expand Down
16 changes: 0 additions & 16 deletions nevermore/modules/collate.nf

This file was deleted.

16 changes: 14 additions & 2 deletions nevermore/modules/converters/bam2fq.nf
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,27 @@ process bam2fq {
container "quay.io/biocontainers/samtools:1.19.2--h50ea8bc_1"
input:
tuple val(sample), path(bam)
val(keep_unmapped)
label "process_high"

output:
tuple val(sample), path("fastq/${sample.id}/${sample.id}*.fastq.gz"), emit: reads

script:

def filter_flags = "-F 0x900"
if (keep_unmapped == true) {
if (sample.is_paired) {
filter_flags = "-F 0x900 -f 0xc"
} else {
filter_flags = "-F 0x900 -f 0x4"
}
}

"""
set -o pipefail
mkdir -p fastq/${sample.id}
samtools collate -@ $task.cpus -u -O $bam | samtools fastq -F 0x900 -0 ${sample.id}_other.fastq.gz -1 ${sample.id}_R1.fastq.gz -2 ${sample.id}_R2.fastq.gz
mkdir -p fastq/${sample.id} tmp/
samtools collate -T tmp/tmpfile -@ $task.cpus -u -O $bam | samtools fastq ${filter_flags} -0 ${sample.id}_other.fastq.gz -1 ${sample.id}_R1.fastq.gz -2 ${sample.id}_R2.fastq.gz

if [[ "\$?" -eq 0 ]];
then
Expand Down
Loading