diff --git a/Makefile b/Makefile index 6d2086fc0..b39adb0f8 100644 --- a/Makefile +++ b/Makefile @@ -300,6 +300,7 @@ suball.lastz: ln -f submodules/lastz/src/lastz bin suball.paffy: + git -C submodules/paffy/submodules/sonLib checkout $$(git -C submodules/sonLib rev-parse HEAD) cd submodules/paffy && LIBS="${jemallocLib}" ${MAKE} rm -rf submodules/paffy/bin/*.dSYM ln -f submodules/paffy/bin/[a-zA-Z]* ${BINDIR} diff --git a/caf/impl/pinchIterator.c b/caf/impl/pinchIterator.c index 628c64cd1..8fb62c182 100644 --- a/caf/impl/pinchIterator.c +++ b/caf/impl/pinchIterator.c @@ -43,7 +43,7 @@ typedef struct _pairwiseAlignmentToPinch { Paf *(*getPairwiseAlignment)(void *); Paf *paf; int64_t xCoordinate, yCoordinate, xName, yName; - Cigar *op; + int64_t op_index; bool freeAlignments; } PairwiseAlignmentToPinch; @@ -63,43 +63,47 @@ static stPinch *pairwiseAlignmentToPinch_getNext(PairwiseAlignmentToPinch *pA, s if (pA->paf == NULL) { return NULL; } - pA->op = pA->paf->cigar; + pA->op_index = 0; pA->xCoordinate = pA->paf->same_strand ? 
pA->paf->query_start : pA->paf->query_end; pA->yCoordinate = pA->paf->target_start; pA->xName = cactusMisc_stringToName(pA->paf->query_name); pA->yName = cactusMisc_stringToName(pA->paf->target_name); } - while (pA->op != NULL) { - assert(pA->op->length >= 1); - if (pA->op->op == match || pA->op->op == sequence_match || pA->op->op == sequence_mismatch) { //deal with the possibility of a zero length match (strange, but not illegal) + int64_t n = cigar_count(pA->paf->cigar); + while (pA->op_index < n) { + CigarRecord *rec = cigar_get(pA->paf->cigar, pA->op_index); + assert(rec->length >= 1); + if (rec->op == match || rec->op == sequence_match || rec->op == sequence_mismatch) { // Make maximal length (in case run of sequence matches and mismatches) - int64_t i=0; // Represents the length of the previous matches in the sequence - while(pA->op->next != NULL && (pA->op->next->op == match || - pA->op->next->op == sequence_match || - pA->op->next->op == sequence_mismatch)) { - i+=pA->op->length; - pA->op = pA->op->next; + int64_t i = 0; // Represents the length of the previous matches in the sequence + while (pA->op_index + 1 < n) { + CigarRecord *next_rec = cigar_get(pA->paf->cigar, pA->op_index + 1); + if (next_rec->op != match && next_rec->op != sequence_match && + next_rec->op != sequence_mismatch) break; + i += rec->length; + pA->op_index++; + rec = cigar_get(pA->paf->cigar, pA->op_index); } if (pA->paf->same_strand) { - stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length, - 1); - pA->xCoordinate += i+pA->op->length; + stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, + pA->yCoordinate, i + rec->length, 1); + pA->xCoordinate += i + rec->length; } else { - pA->xCoordinate -= i+pA->op->length; - stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, pA->xCoordinate, pA->yCoordinate, i+pA->op->length, - 0); + pA->xCoordinate -= i + rec->length; + stPinch_fillOut(pinchToFillOut, pA->xName, pA->yName, 
pA->xCoordinate, + pA->yCoordinate, i + rec->length, 0); } - pA->yCoordinate += i+pA->op->length; - pA->op = pA->op->next; + pA->yCoordinate += i + rec->length; + pA->op_index++; return pinchToFillOut; } - if (pA->op->op != query_delete) { - pA->xCoordinate += pA->paf->same_strand ? pA->op->length : -pA->op->length; + if (rec->op != query_delete) { + pA->xCoordinate += pA->paf->same_strand ? rec->length : -rec->length; } - if (pA->op->op != query_insert) { - pA->yCoordinate += pA->op->length; + if (rec->op != query_insert) { + pA->yCoordinate += rec->length; } - pA->op = pA->op->next; + pA->op_index++; } if (pA->paf->same_strand) { assert(pA->xCoordinate == pA->paf->query_end); diff --git a/caf/tests/pinchIteratorTest.c b/caf/tests/pinchIteratorTest.c index e1f46fc4f..61ec93ecc 100644 --- a/caf/tests/pinchIteratorTest.c +++ b/caf/tests/pinchIteratorTest.c @@ -23,8 +23,9 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis sscanf(paf->target_name, "%" PRIi64 "", &contigY); int64_t x = paf->same_strand ? 
paf->query_start : paf->query_end; int64_t y = paf->target_start; - Cigar *c = paf->cigar; - while(c != NULL) { + int64_t nc = cigar_count(paf->cigar); + for (int64_t ci = 0; ci < nc; ci++) { + CigarRecord *c = cigar_get(paf->cigar, ci); if (c->op == match) { if (c->length > 2 * trim) { stPinch *pinch = stPinchIterator_getNext(pinchIterator, &pinchToFillOut); @@ -44,7 +45,6 @@ static void testIterator(CuTest *testCase, stPinchIterator *pinchIterator, stLis if (c->op != query_insert) { y += c->length; } - c = c->next; } } CuAssertPtrEquals(testCase, NULL, stPinchIterator_getNext(pinchIterator, &pinchToFillOut)); @@ -65,22 +65,28 @@ static stList *getRandomPairwiseAlignments() { paf->target_start = st_randomInt(100000, 1000000); paf->same_strand = st_random() > 0.5; int64_t i = paf->query_start, j = paf->target_start; - Cigar **pc = &(paf->cigar); + Cigar *cigar = st_calloc(1, sizeof(Cigar)); + int64_t max_ops = 100; + cigar->recs = st_calloc(max_ops, sizeof(CigarRecord)); + cigar->capacity = max_ops; + cigar->length = 0; + cigar->start = 0; + paf->cigar = cigar; CigarOp p_op_type = query_delete; // This is a fudge to ensure that we don't end up with two matches in succession // in the cigar - because the pinch iterator will smush them together do { - Cigar *c = st_calloc(1, sizeof(Cigar)); + if (cigar->length == cigar->capacity) { cigar->capacity *= 2; cigar->recs = st_realloc(cigar->recs, cigar->capacity * sizeof(CigarRecord)); } + CigarRecord *c = &cigar->recs[cigar->length]; c->length = st_randomInt(1, 10); c->op = st_random() > 0.3 ? ((st_random() > 0.5 && p_op_type != match) ? 
match : query_insert): query_delete; p_op_type = c->op; + cigar->length++; if (c->op != query_delete) { i += c->length; } if (c->op != query_insert) { j += c->length; } - *pc = c; - pc = &(c->next); } while(st_random() > 0.1 || paf->query_start == i || paf->target_start == j); paf->query_end = i; paf->target_end = j; diff --git a/src/cactus/cactus_progressive_config.xml b/src/cactus/cactus_progressive_config.xml index 4b821a953..5404950a2 100644 --- a/src/cactus/cactus_progressive_config.xml +++ b/src/cactus/cactus_progressive_config.xml @@ -60,6 +60,10 @@ + + @@ -96,6 +100,8 @@ minimumDistance="0.01" gpu="0" lastzMemory="littleMemory" + chainContigGroupSize="10000000" + chainSplitMinSize="1000000000" chainMaxGapLength="1000000" chainGapOpen="5000" chainGapExtend="1" diff --git a/src/cactus/paf/local_alignment.py b/src/cactus/paf/local_alignment.py index c41e4ce7b..7ba060fae 100755 --- a/src/cactus/paf/local_alignment.py +++ b/src/cactus/paf/local_alignment.py @@ -13,6 +13,7 @@ from toil.lib.bioio import getLogLevelString from toil.realtimeLogger import RealtimeLogger from sonLib.bioio import newickTreeParser +import glob import os import shutil import math @@ -611,7 +612,6 @@ def chain_one_alignment(job, alignment_file, alignment_name, params, include_inv work_dir = job.fileStore.getLocalTempDir() alignment_path = os.path.join(work_dir, alignment_name + '.paf') alignment_inv_path = os.path.join(work_dir, alignment_name + '.inv.paf') - output_path = os.path.join(work_dir, alignment_name + '.chained.paf') # Copy the alignments from the job store job.fileStore.readGlobalFile(alignment_file, alignment_path) @@ -623,17 +623,79 @@ def chain_one_alignment(job, alignment_file, alignment_name, params, include_inv cactus_call(parameters=['paffy', 'invert', "-i", alignment_inv_path], outfile=alignment_path, outappend=True, job_memory=job.memory) - # Now chain the alignments - cactus_call(parameters=['paffy', 'chain', "-i", alignment_path, - "--maxGapLength", 
params.find("blast").attrib["chainMaxGapLength"], - "--chainGapOpen", params.find("blast").attrib["chainGapOpen"], - "--chainGapExtend", params.find("blast").attrib["chainGapExtend"], - "--trimFraction", params.find("blast").attrib["chainTrimFraction"], - "--logLevel", getLogLevelString()], - outfile=output_path, job_memory=job.memory) + contig_group_size = int(params.find("blast").attrib.get("chainContigGroupSize", "10000000")) + chain_split_min_size = int(params.find("blast").attrib.get("chainSplitMinSize", "1000000000")) + + # If the file is small enough, chain it directly without splitting + if os.path.getsize(alignment_path) <= chain_split_min_size: + output_path = os.path.join(work_dir, alignment_name + '.chained.paf') + cactus_call(parameters=['paffy', 'chain', '-i', alignment_path, + '--maxGapLength', params.find("blast").attrib["chainMaxGapLength"], + '--chainGapOpen', params.find("blast").attrib["chainGapOpen"], + '--chainGapExtend', params.find("blast").attrib["chainGapExtend"], + '--trimFraction', params.find("blast").attrib["chainTrimFraction"], + '--logLevel', getLogLevelString()], + outfile=output_path, job_memory=job.memory) + job.fileStore.deleteGlobalFile(alignment_file) + return job.fileStore.writeGlobalFile(output_path) + + # Split by query contig to enable parallel chaining + split_dir = job.fileStore.getLocalTempDir() + split_prefix = os.path.join(split_dir, 'split_') + cactus_call(parameters=['paffy', 'split_file', '-i', alignment_path, + '-q', '-p', split_prefix, '-m', str(contig_group_size), + '--logLevel', getLogLevelString()], + job_memory=job.memory) + # Write each split to global store and spawn a child chain job (sorted for deterministic merge order) + split_paths = sorted(glob.glob(split_prefix + '*.paf')) + chained_split_rv = [] + for split_path in split_paths: + split_size = os.path.getsize(split_path) + split_file_id = job.fileStore.writeGlobalFile(split_path) + chained_split_rv.append( + job.addChildJobFn(chain_one_contig, split_file_id, params, + disk=2 * split_size, + 
memory=cactus_clamp_memory(2 * split_size)).rv() + ) + + # Mark alignment file for cleanup job.fileStore.deleteGlobalFile(alignment_file) + return job.addFollowOnJobFn(merge_chained_contigs, chained_split_rv).rv() + + +def chain_one_contig(job, split_file_id, params): + """Run paffy chain on one contig's worth of alignments.""" + work_dir = job.fileStore.getLocalTempDir() + input_path = os.path.join(work_dir, 'input.paf') + output_path = os.path.join(work_dir, 'chained.paf') + + job.fileStore.readGlobalFile(split_file_id, input_path) + cactus_call(parameters=['paffy', 'chain', '-i', input_path, + '--maxGapLength', params.find("blast").attrib["chainMaxGapLength"], + '--chainGapOpen', params.find("blast").attrib["chainGapOpen"], + '--chainGapExtend', params.find("blast").attrib["chainGapExtend"], + '--trimFraction', params.find("blast").attrib["chainTrimFraction"], + '--logLevel', getLogLevelString()], + outfile=output_path, job_memory=job.memory) + + job.fileStore.deleteGlobalFile(split_file_id) + return job.fileStore.writeGlobalFile(output_path) + + +def merge_chained_contigs(job, chained_file_ids): + """Concatenate all per-contig chained PAF files into one.""" + work_dir = job.fileStore.getLocalTempDir() + output_path = os.path.join(work_dir, 'merged_chained.paf') + + with open(output_path, 'w') as outf: + for file_id in chained_file_ids: + local_path = job.fileStore.readGlobalFile(file_id) + with open(local_path, 'r') as inf: + shutil.copyfileobj(inf, outf) + job.fileStore.deleteGlobalFile(file_id) + return job.fileStore.writeGlobalFile(output_path) diff --git a/submodules/paffy b/submodules/paffy index 8b3c73246..f114a7ece 160000 --- a/submodules/paffy +++ b/submodules/paffy @@ -1 +1 @@ -Subproject commit 8b3c732460ad8c307161ec558ba72736b4fd421a +Subproject commit f114a7ece1d411f63ea579ed177033a37f8b0b57 diff --git a/submodules/sonLib b/submodules/sonLib index ba4cf9540..c660699f1 160000 --- a/submodules/sonLib +++ b/submodules/sonLib @@ -1 +1 @@ -Subproject 
commit ba4cf95406f1d29c292d12428a95fde420e19c52 +Subproject commit c660699f1c99c18fc346fec2f91c985fa420b2ec