diff --git a/src/fastqchunkindex.cpp b/src/fastqchunkindex.cpp new file mode 100644 index 0000000..92857b6 --- /dev/null +++ b/src/fastqchunkindex.cpp @@ -0,0 +1,85 @@ +#include "fastqchunkindex.h" +#include <fcntl.h> +#include <sys/stat.h> +#include <unistd.h> +#include <cstring> + +// Scan buffer size: 4MB +static const size_t SCAN_BUF_SIZE = 1 << 22; + +bool FastqChunkIndex::build(const std::string& filename, int packSize) { + mFd = open(filename.c_str(), O_RDONLY); + if (mFd < 0) + return false; + + struct stat st; + if (fstat(mFd, &st) != 0 || !S_ISREG(st.st_mode)) { + ::close(mFd); + mFd = -1; + return false; + } + mFileSize = st.st_size; + if (mFileSize == 0) { + ::close(mFd); + mFd = -1; + return false; + } + +#ifdef __linux__ + posix_fadvise(mFd, 0, mFileSize, POSIX_FADV_SEQUENTIAL); +#endif + + // Every FASTQ record is 4 lines, so one pack = packSize * 4 newlines + const int newlinesPerPack = packSize * 4; + + mOffsets.clear(); + mOffsets.push_back(0); // first pack starts at byte 0 + + char* buf = new char[SCAN_BUF_SIZE]; + size_t fileOffset = 0; + int nlCount = 0; + + while (fileOffset < mFileSize) { + size_t toRead = SCAN_BUF_SIZE; + if (fileOffset + toRead > mFileSize) + toRead = mFileSize - fileOffset; + + ssize_t n = pread(mFd, buf, toRead, fileOffset); + if (n <= 0) + break; + + // Scan for newlines using memchr for speed + const char* p = buf; + const char* end = buf + n; + while (p < end) { + const char* nl = (const char*)memchr(p, '\n', end - p); + if (!nl) + break; + nlCount++; + if (nlCount == newlinesPerPack) { + // The next pack starts right after this newline + size_t offset = fileOffset + (nl - buf) + 1; + if (offset < mFileSize) + mOffsets.push_back(offset); + nlCount = 0; + } + p = nl + 1; + } + + fileOffset += n; + } + + // Sentinel: the end of the last pack is fileSize + if (mOffsets.back() != mFileSize) + mOffsets.push_back(mFileSize); + + delete[] buf; + return true; +} + +FastqChunkIndex::~FastqChunkIndex() { + if (mFd >= 0) { + ::close(mFd); + mFd = -1; + } +} diff --git 
a/src/fastqchunkindex.h b/src/fastqchunkindex.h new file mode 100644 index 0000000..f4d333c --- /dev/null +++ b/src/fastqchunkindex.h @@ -0,0 +1,41 @@ +#ifndef FASTQ_CHUNK_INDEX_H +#define FASTQ_CHUNK_INDEX_H + +#include <string> +#include <vector> +#include <cstddef> + +// Scans an uncompressed FASTQ file and builds a pack offset index. +// Each entry marks the byte offset where a pack of PACK_SIZE reads begins. +// The last pack may contain fewer than PACK_SIZE reads. +class FastqChunkIndex { +public: + // Scan the file, recording a byte offset every (4 * packSize) newlines. + // Returns false if the file cannot be opened or is not a regular file. + bool build(const std::string& filename, int packSize); + + // Number of packs + size_t packCount() const { return mOffsets.size() > 0 ? mOffsets.size() - 1 : 0; } + + // Byte offset of pack i + size_t packStart(size_t i) const { return mOffsets[i]; } + + // Byte length of pack i + size_t packLength(size_t i) const { return mOffsets[i + 1] - mOffsets[i]; } + + // File descriptor (caller must not close it; owned by this object) + int fd() const { return mFd; } + + // Total file size + size_t fileSize() const { return mFileSize; } + + // Clean up + ~FastqChunkIndex(); + +private: + std::vector<size_t> mOffsets; // packCount+1 entries: [0]=0, [last]=fileSize + int mFd = -1; + size_t mFileSize = 0; +}; + +#endif diff --git a/src/fastqchunkparser.cpp b/src/fastqchunkparser.cpp new file mode 100644 index 0000000..2b80b67 --- /dev/null +++ b/src/fastqchunkparser.cpp @@ -0,0 +1,117 @@ +#include "fastqchunkparser.h" +#include "common.h" +#include <cstring> +#include <cstdlib> +#include <string> + +using namespace std; + +// Find next newline in buf[pos..len), return position of char after \n. +// Handles \r\n. Returns len if no newline found. 
+static inline size_t nextLine(const char* buf, size_t pos, size_t len) { + const char* p = (const char*)memchr(buf + pos, '\n', len - pos); + if (!p) + return len; + return (p - buf) + 1; +} + +// Extract a line from buf[start..end) stripping trailing \r\n +static inline void extractLine(const char* buf, size_t start, size_t lineEnd, string* out) { + size_t end = lineEnd; + // strip trailing \n and \r + while (end > start && (buf[end - 1] == '\n' || buf[end - 1] == '\r')) + end--; + out->assign(buf + start, end - start); +} + +ReadPack* FastqChunkParser::parse(const char* buf, size_t len, int tid, ReadPool* pool, bool phred64) { + // Pre-count records: count newlines / 4 + int estimatedReads = 0; + { + const char* p = buf; + const char* end = buf + len; + while (p < end) { + p = (const char*)memchr(p, '\n', end - p); + if (!p) break; + estimatedReads++; + p++; + } + estimatedReads /= 4; + } + if (estimatedReads <= 0) + estimatedReads = 1; + + Read** data = new Read*[estimatedReads]; + int count = 0; + size_t pos = 0; + + while (pos < len) { + // Skip empty lines + while (pos < len && (buf[pos] == '\n' || buf[pos] == '\r')) + pos++; + if (pos >= len) + break; + + // Line 1: name (starts with @) + size_t nameStart = pos; + size_t nameEnd = nextLine(buf, pos, len); + if (nameStart >= len || buf[nameStart] != '@') + break; + pos = nameEnd; + + // Line 2: sequence + if (pos >= len) break; + size_t seqStart = pos; + size_t seqEnd = nextLine(buf, pos, len); + pos = seqEnd; + + // Line 3: strand (+) + if (pos >= len) break; + size_t strandStart = pos; + size_t strandEnd = nextLine(buf, pos, len); + pos = strandEnd; + + // Line 4: quality + if (pos >= len && strandEnd >= len) break; + size_t qualStart = pos; + size_t qualEnd = nextLine(buf, pos, len); + pos = qualEnd; + + // Build Read object + Read* r = nullptr; + if (pool) + r = pool->getOne(); + + if (r) { + extractLine(buf, nameStart, nameEnd, r->mName); + extractLine(buf, seqStart, seqEnd, r->mSeq); + extractLine(buf, 
strandStart, strandEnd, r->mStrand); + extractLine(buf, qualStart, qualEnd, r->mQuality); + } else { + string* name = new string(); + string* seq = new string(); + string* strand = new string(); + string* quality = new string(); + extractLine(buf, nameStart, nameEnd, name); + extractLine(buf, seqStart, seqEnd, seq); + extractLine(buf, strandStart, strandEnd, strand); + extractLine(buf, qualStart, qualEnd, quality); + r = new Read(name, seq, strand, quality, phred64); + } + + if (count >= estimatedReads) { + // Shouldn't happen, but be safe + Read** newData = new Read*[estimatedReads * 2]; + memcpy(newData, data, sizeof(Read*) * count); + delete[] data; + data = newData; + estimatedReads *= 2; + } + data[count++] = r; + } + + ReadPack* pack = new ReadPack; + pack->data = data; + pack->count = count; + return pack; +} diff --git a/src/fastqchunkparser.h b/src/fastqchunkparser.h new file mode 100644 index 0000000..d41e523 --- /dev/null +++ b/src/fastqchunkparser.h @@ -0,0 +1,17 @@ +#ifndef FASTQ_CHUNK_PARSER_H +#define FASTQ_CHUNK_PARSER_H + +#include "read.h" +#include "readpool.h" + +// Parse a raw byte buffer (from pread) into a ReadPack. +// The buffer must start at a record boundary and contain complete records. +class FastqChunkParser { +public: + // Parse buf[0..len) into Read* objects, return a ReadPack. + // If pool is non-null, try to reuse Read objects from the pool. + // tid is the thread id for the pool. 
+ static ReadPack* parse(const char* buf, size_t len, int tid, ReadPool* pool, bool phred64); +}; + +#endif diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 7411942..8427332 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -1,5 +1,6 @@ #include "peprocessor.h" #include "fastqreader.h" +#include "fastqchunkparser.h" #include #include #include @@ -44,6 +45,9 @@ PairEndProcessor::PairEndProcessor(Options* opt){ mLeftReadPool = new ReadPool(mOptions); mRightReadPool = new ReadPool(mOptions); + mChunkIndexLeft = NULL; + mChunkIndexRight = NULL; + mNextPackIndex = 0; } PairEndProcessor::~PairEndProcessor() { @@ -62,6 +66,14 @@ PairEndProcessor::~PairEndProcessor() { } delete[] mLeftInputLists; delete[] mRightInputLists; + if(mChunkIndexLeft) { + delete mChunkIndexLeft; + mChunkIndexLeft = NULL; + } + if(mChunkIndexRight) { + delete mChunkIndexRight; + mChunkIndexRight = NULL; + } } void PairEndProcessor::initOutput() { @@ -130,10 +142,40 @@ void PairEndProcessor::initConfig(ThreadConfig* config) { } +bool PairEndProcessor::canUseParallelRead() { + if (mOptions->interleavedInput) + return false; + if (mOptions->in1 == "/dev/stdin" || mOptions->in2 == "/dev/stdin") + return false; + if (ends_with(mOptions->in1, ".gz") || ends_with(mOptions->in2, ".gz")) + return false; + if (mOptions->readsToProcess > 0) + return false; + if (mOptions->split.enabled) + return false; + return true; +} + bool PairEndProcessor::process(){ if(!mOptions->split.enabled) initOutput(); + bool useParallelRead = canUseParallelRead(); + + if (useParallelRead) { + mChunkIndexLeft = new FastqChunkIndex(); + mChunkIndexRight = new FastqChunkIndex(); + if (!mChunkIndexLeft->build(mOptions->in1, PACK_SIZE) || + !mChunkIndexRight->build(mOptions->in2, PACK_SIZE) || + mChunkIndexLeft->packCount() != mChunkIndexRight->packCount()) { + delete mChunkIndexLeft; mChunkIndexLeft = NULL; + delete mChunkIndexRight; mChunkIndexRight = NULL; + useParallelRead = false; + } else if 
(mOptions->verbose) { + loginfo("parallel pread: indexed " + to_string(mChunkIndexLeft->packCount()) + " packs from R1/R2"); + } + } + std::thread* readerLeft = NULL; std::thread* readerRight = NULL; std::thread* readerInterveleaved = NULL; @@ -150,16 +192,32 @@ bool PairEndProcessor::process(){ initConfig(configs[t]); } - if(mOptions->interleavedInput) - readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this)); - else { - readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true)); - readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false)); + if (useParallelRead) { + size_t totalPacks = mChunkIndexLeft->packCount(); + if (mLeftWriter) mLeftWriter->setOrderedMode(totalPacks); + if (mRightWriter) mRightWriter->setOrderedMode(totalPacks); + if (mMergedWriter) mMergedWriter->setOrderedMode(totalPacks); + if (mFailedWriter) mFailedWriter->setOrderedMode(totalPacks); + if (mOverlappedWriter) mOverlappedWriter->setOrderedMode(totalPacks); + if (mUnpairedLeftWriter) mUnpairedLeftWriter->setOrderedMode(totalPacks); + if (mUnpairedRightWriter) mUnpairedRightWriter->setOrderedMode(totalPacks); + } + + if (!useParallelRead) { + if(mOptions->interleavedInput) + readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this)); + else { + readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true)); + readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false)); + } } std::thread** threads = new thread*[mOptions->thread]; for(int t=0; tthread; t++){ - threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t])); + if (useParallelRead) + threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTaskParallel, this, configs[t])); + else + threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t])); } std::thread* leftWriterThread = NULL; @@ -186,7 
+244,7 @@ bool PairEndProcessor::process(){ if(readerInterveleaved) { readerInterveleaved->join(); - } else { + } else if(readerLeft) { readerLeft->join(); readerRight->join(); } @@ -359,7 +417,7 @@ void PairEndProcessor::recycleToPool2(int tid, Read* r) { delete r; } -bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config){ +bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config, int64_t writeSeq){ if(leftPack->count != rightPack->count) { cerr << endl; cerr << "WARNING: different read numbers of the " << mPackProcessedCounter << " pack" << endl; @@ -646,33 +704,55 @@ bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, T } if(mMergedWriter) { - // move to heap for writer thread ownership - mMergedWriter->input(tid, new string(std::move(mergedOutput))); + if (writeSeq >= 0) + mMergedWriter->inputWithSeq(tid, new string(std::move(mergedOutput)), (size_t)writeSeq); + else + mMergedWriter->input(tid, new string(std::move(mergedOutput))); } if(mFailedWriter) { - mFailedWriter->input(tid, new string(std::move(failedOut))); + if (writeSeq >= 0) + mFailedWriter->inputWithSeq(tid, new string(std::move(failedOut)), (size_t)writeSeq); + else + mFailedWriter->input(tid, new string(std::move(failedOut))); } if(mOverlappedWriter) { - mOverlappedWriter->input(tid, new string(std::move(overlappedOut))); + if (writeSeq >= 0) + mOverlappedWriter->inputWithSeq(tid, new string(std::move(overlappedOut)), (size_t)writeSeq); + else + mOverlappedWriter->input(tid, new string(std::move(overlappedOut))); } // normal output by left/right writer thread if(mRightWriter && mLeftWriter) { - // write PE - move to heap for writer thread ownership - mLeftWriter->input(tid, new string(std::move(outstr1))); - mRightWriter->input(tid, new string(std::move(outstr2))); + if (writeSeq >= 0) { + mLeftWriter->inputWithSeq(tid, new string(std::move(outstr1)), (size_t)writeSeq); + 
mRightWriter->inputWithSeq(tid, new string(std::move(outstr2)), (size_t)writeSeq); + } else { + mLeftWriter->input(tid, new string(std::move(outstr1))); + mRightWriter->input(tid, new string(std::move(outstr2))); + } } else if(mLeftWriter) { - // write singleOutput - mLeftWriter->input(tid, new string(std::move(singleOutput))); + if (writeSeq >= 0) + mLeftWriter->inputWithSeq(tid, new string(std::move(singleOutput)), (size_t)writeSeq); + else + mLeftWriter->input(tid, new string(std::move(singleOutput))); } // output unpaired reads if(mUnpairedLeftWriter && mUnpairedRightWriter) { - mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); - mUnpairedRightWriter->input(tid, new string(std::move(unpairedOut2))); + if (writeSeq >= 0) { + mUnpairedLeftWriter->inputWithSeq(tid, new string(std::move(unpairedOut1)), (size_t)writeSeq); + mUnpairedRightWriter->inputWithSeq(tid, new string(std::move(unpairedOut2)), (size_t)writeSeq); + } else { + mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); + mUnpairedRightWriter->input(tid, new string(std::move(unpairedOut2))); + } } else if(mUnpairedLeftWriter) { - mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); + if (writeSeq >= 0) + mUnpairedLeftWriter->inputWithSeq(tid, new string(std::move(unpairedOut1)), (size_t)writeSeq); + else + mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); } if(mOptions->split.byFileLines) @@ -1062,6 +1142,64 @@ void PairEndProcessor::processorTask(ThreadConfig* config) } } +void PairEndProcessor::processorTaskParallel(ThreadConfig* config) +{ + int tid = config->getThreadId(); + size_t totalPacks = mChunkIndexLeft->packCount(); + int fdLeft = mChunkIndexLeft->fd(); + int fdRight = mChunkIndexRight->fd(); + + while (true) { + size_t packIdx = mNextPackIndex.fetch_add(1, std::memory_order_relaxed); + if (packIdx >= totalPacks) + break; + + // pread left (R1) + size_t offL = mChunkIndexLeft->packStart(packIdx); + size_t lenL = 
mChunkIndexLeft->packLength(packIdx); + char* bufL = new char[lenL]; + ssize_t nL = pread(fdLeft, bufL, lenL, offL); + if (nL <= 0) { delete[] bufL; break; } + + // pread right (R2) + size_t offR = mChunkIndexRight->packStart(packIdx); + size_t lenR = mChunkIndexRight->packLength(packIdx); + char* bufR = new char[lenR]; + ssize_t nR = pread(fdRight, bufR, lenR, offR); + if (nR <= 0) { delete[] bufL; delete[] bufR; break; } + + ReadPack* leftPack = FastqChunkParser::parse(bufL, nL, tid, nullptr, mOptions->phred64); + ReadPack* rightPack = FastqChunkParser::parse(bufR, nR, tid, nullptr, mOptions->phred64); + delete[] bufL; + delete[] bufR; + + processPairEnd(leftPack, rightPack, config, (int64_t)packIdx); + + if (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + std::unique_lock lk(mBackpressureMtx); + while (mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + } + } + } + + int finishedCount = mFinishedThreads.fetch_add(1, std::memory_order_release) + 1; + if (finishedCount == mOptions->thread) { + if (mLeftWriter) mLeftWriter->setInputCompleted(); + if (mRightWriter) mRightWriter->setInputCompleted(); + if (mUnpairedLeftWriter) mUnpairedLeftWriter->setInputCompleted(); + if (mUnpairedRightWriter) mUnpairedRightWriter->setInputCompleted(); + if (mMergedWriter) mMergedWriter->setInputCompleted(); + if (mFailedWriter) mFailedWriter->setInputCompleted(); + if (mOverlappedWriter) mOverlappedWriter->setInputCompleted(); + } + + if (mOptions->verbose) { + string msg = "thread " + to_string(tid + 1) + " finished (parallel pread)"; + loginfo(msg); + } +} + void PairEndProcessor::writerTask(WriterThread* config) { while(true) { diff --git a/src/peprocessor.h b/src/peprocessor.h index 707e41c..8b0ba16 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -17,7 +17,7 @@ #include "writerthread.h" #include "duplicate.h" #include "readpool.h" - +#include "fastqchunkindex.h" using namespace std; @@ 
-30,10 +30,12 @@ class PairEndProcessor{ bool process(); private: - bool processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config); + bool processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config, int64_t writeSeq = -1); void readerTask(bool isLeft); void interleavedReaderTask(); void processorTask(ThreadConfig* config); + void processorTaskParallel(ThreadConfig* config); + bool canUseParallelRead(); void initConfig(ThreadConfig* config); void initOutput(); void closeOutput(); @@ -69,6 +71,9 @@ class PairEndProcessor{ atomic_bool shouldStopReading; std::mutex mBackpressureMtx; std::condition_variable mBackpressureCV; + FastqChunkIndex* mChunkIndexLeft; + FastqChunkIndex* mChunkIndexRight; + std::atomic mNextPackIndex; }; diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 577cab2..423a22f 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,5 +1,6 @@ #include "seprocessor.h" #include "fastqreader.h" +#include "fastqchunkparser.h" #include #include #include @@ -30,6 +31,8 @@ SingleEndProcessor::SingleEndProcessor(Options* opt){ mPackProcessedCounter = 0; mReadPool = new ReadPool(mOptions); + mChunkIndex = NULL; + mNextPackIndex = 0; } SingleEndProcessor::~SingleEndProcessor() { @@ -42,6 +45,14 @@ SingleEndProcessor::~SingleEndProcessor() { delete mReadPool; mReadPool = NULL; } + if(mChunkIndex) { + delete mChunkIndex; + mChunkIndex = NULL; + } + if(mReadPool) { + delete mReadPool; + mReadPool = NULL; + } delete[] mInputLists; } @@ -73,10 +84,35 @@ void SingleEndProcessor::initConfig(ThreadConfig* config) { } } +bool SingleEndProcessor::canUseParallelRead() { + if (mOptions->in1 == "/dev/stdin") + return false; + if (ends_with(mOptions->in1, ".gz")) + return false; + if (mOptions->readsToProcess > 0) + return false; + if (mOptions->split.enabled) + return false; + return true; +} + bool SingleEndProcessor::process(){ if(!mOptions->split.enabled) initOutput(); + bool useParallelRead = canUseParallelRead(); + 
+ if (useParallelRead) { + mChunkIndex = new FastqChunkIndex(); + if (!mChunkIndex->build(mOptions->in1, PACK_SIZE)) { + delete mChunkIndex; + mChunkIndex = NULL; + useParallelRead = false; + } else if (mOptions->verbose) { + loginfo("parallel pread: indexed " + to_string(mChunkIndex->packCount()) + " packs from " + mOptions->in1); + } + } + mInputLists = new SingleProducerSingleConsumerList*[mOptions->thread]; ThreadConfig** configs = new ThreadConfig*[mOptions->thread]; @@ -87,11 +123,25 @@ bool SingleEndProcessor::process(){ initConfig(configs[t]); } - std::thread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); + if (useParallelRead) { + size_t totalPacks = mChunkIndex->packCount(); + if (mLeftWriter) + mLeftWriter->setOrderedMode(totalPacks); + if (mFailedWriter) + mFailedWriter->setOrderedMode(totalPacks); + } + + std::thread* readerThreadPtr = NULL; + if (!useParallelRead) { + readerThreadPtr = new std::thread(std::bind(&SingleEndProcessor::readerTask, this)); + } std::thread** threads = new thread*[mOptions->thread]; for(int t=0; tthread; t++){ - threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); + if (useParallelRead) + threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTaskParallel, this, configs[t])); + else + threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); } std::thread* leftWriterThread = NULL; @@ -101,7 +151,10 @@ bool SingleEndProcessor::process(){ if(mFailedWriter) failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); - readerThread.join(); + if (readerThreadPtr) { + readerThreadPtr->join(); + delete readerThreadPtr; + } for(int t=0; tthread; t++){ threads[t]->join(); } @@ -194,7 +247,7 @@ void SingleEndProcessor::recycleToPool(int tid, Read* r) { delete r; } -bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ +bool 
SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config, int64_t writeSeq){ // build output on stack strings, move to heap only when handing off to writers string outstr, failedOut; outstr.reserve(pack->count * 320); @@ -300,10 +353,16 @@ bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ } if(mLeftWriter) { - mLeftWriter->input(tid, new string(std::move(outstr))); + if (writeSeq >= 0) + mLeftWriter->inputWithSeq(tid, new string(std::move(outstr)), (size_t)writeSeq); + else + mLeftWriter->input(tid, new string(std::move(outstr))); } if(mFailedWriter) { - mFailedWriter->input(tid, new string(std::move(failedOut))); + if (writeSeq >= 0) + mFailedWriter->inputWithSeq(tid, new string(std::move(failedOut)), (size_t)writeSeq); + else + mFailedWriter->input(tid, new string(std::move(failedOut))); } if(mOptions->split.byFileLines) @@ -468,6 +527,52 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) } } +void SingleEndProcessor::processorTaskParallel(ThreadConfig* config) +{ + int tid = config->getThreadId(); + size_t totalPacks = mChunkIndex->packCount(); + int fd = mChunkIndex->fd(); + + while (true) { + size_t packIdx = mNextPackIndex.fetch_add(1, std::memory_order_relaxed); + if (packIdx >= totalPacks) + break; + + size_t offset = mChunkIndex->packStart(packIdx); + size_t len = mChunkIndex->packLength(packIdx); + char* buf = new char[len]; + ssize_t bytesRead = pread(fd, buf, len, offset); + if (bytesRead <= 0) { + delete[] buf; + break; + } + + ReadPack* pack = FastqChunkParser::parse(buf, bytesRead, tid, nullptr, mOptions->phred64); + delete[] buf; + + processSingleEnd(pack, config, (int64_t)packIdx); + + if (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + std::unique_lock lk(mBackpressureMtx); + while (mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + } + } + } + + if (mFinishedThreads.fetch_add(1, 
std::memory_order_release) + 1 == mOptions->thread) { + if (mLeftWriter) + mLeftWriter->setInputCompleted(); + if (mFailedWriter) + mFailedWriter->setInputCompleted(); + } + + if (mOptions->verbose) { + string msg = "thread " + to_string(tid + 1) + " finished (parallel pread)"; + loginfo(msg); + } +} + void SingleEndProcessor::writerTask(WriterThread* config) { while(true) { diff --git a/src/seprocessor.h b/src/seprocessor.h index b3c7152..f47d28c 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -17,6 +17,7 @@ #include "duplicate.h" #include "singleproducersingleconsumerlist.h" #include "readpool.h" +#include "fastqchunkindex.h" using namespace std; @@ -29,9 +30,11 @@ class SingleEndProcessor{ bool process(); private: - bool processSingleEnd(ReadPack* pack, ThreadConfig* config); + bool processSingleEnd(ReadPack* pack, ThreadConfig* config, int64_t writeSeq = -1); void readerTask(); void processorTask(ThreadConfig* config); + void processorTaskParallel(ThreadConfig* config); + bool canUseParallelRead(); void initConfig(ThreadConfig* config); void initOutput(); void closeOutput(); @@ -53,6 +56,8 @@ class SingleEndProcessor{ ReadPool* mReadPool; std::mutex mBackpressureMtx; std::condition_variable mBackpressureCV; + FastqChunkIndex* mChunkIndex; + std::atomic mNextPackIndex; }; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 3424e3e..db10bec 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -13,13 +13,18 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mInputCompleted = false; mFilename = filename; - mPwriteMode = !isSTDOUT && ends_with(filename, ".gz") && mOptions->thread > 1; + mPwriteMode = !isSTDOUT && mOptions->thread > 1; + mCompressInPwrite = mPwriteMode && ends_with(filename, ".gz"); mFd = -1; mOffsetRing = NULL; mNextSeq = NULL; mCompressors = NULL; mCompBufs = NULL; mCompBufSize = 0; + mOrderedMode = false; + mOrderedTotal = 0; + mOrderedRing = NULL; + mOrderedWriteCursor = 0; mBufferLists = 
NULL; if (mPwriteMode) { @@ -30,14 +35,15 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mNextSeq = new size_t[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mNextSeq[t] = t; - mCompressors = new libdeflate_compressor*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); - // Pre-allocate per-worker compress buffers (avoids malloc/free per pack) - mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case - mCompBufs = new char*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompBufs[t] = new char[mCompBufSize]; + if (mCompressInPwrite) { + mCompressors = new libdeflate_compressor*[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) + mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); + mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case + mCompBufs = new char*[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) + mCompBufs[t] = new char[mCompBufSize]; + } mWorkingBufferList = 0; mBufferLength = 0; } else { @@ -55,18 +61,45 @@ WriterThread::~WriterThread() { bool WriterThread::isCompleted() { if (mPwriteMode) return true; // no writer thread needed + if (mOrderedMode) + return mInputCompleted && mOrderedWriteCursor.load(std::memory_order_acquire) >= mOrderedTotal; return mInputCompleted && (mBufferLength==0); } bool WriterThread::setInputCompleted() { if (mPwriteMode) { - setInputCompletedPwrite(); + if (mOrderedMode) { + if (mOrderedTotal > 0) { + size_t lastSeq = mOrderedTotal - 1; + size_t lastSlot = lastSeq & (OFFSET_RING_SIZE - 1); + for (int spins = 0; mOffsetRing[lastSlot].published_seq.load(std::memory_order_acquire) != lastSeq; ) { + if (++spins > 256) { + usleep(1); + spins = 0; + } else { +#if defined(__aarch64__) + __asm__ volatile("yield"); +#elif defined(__x86_64__) || defined(__i386__) + __asm__ volatile("pause"); +#endif + } + } + size_t offset = 
mOffsetRing[lastSlot].cumulative_offset.load(std::memory_order_relaxed); + ftruncate(mFd, offset); + } else { + ftruncate(mFd, 0); + } + } else { + setInputCompletedPwrite(); + } mInputCompleted = true; return true; } mInputCompleted = true; - for(int t=0; tthread; t++) { - mBufferLists[t]->setProducerFinished(); + if (!mOrderedMode) { + for(int t=0; tthread; t++) { + mBufferLists[t]->setProducerFinished(); + } } return true; } @@ -89,8 +122,34 @@ void WriterThread::setInputCompletedPwrite() { ftruncate(mFd, offset); } +void WriterThread::setOrderedMode(size_t totalPacks) { + mOrderedMode = true; + mOrderedTotal = totalPacks; + if (!mPwriteMode) { + mOrderedRing = new std::atomic[OFFSET_RING_SIZE]; + for (int i = 0; i < OFFSET_RING_SIZE; i++) + mOrderedRing[i].store(nullptr, std::memory_order_relaxed); + } +} + void WriterThread::output(){ if (mPwriteMode) return; // no-op + if (mOrderedMode) { + size_t cursor = mOrderedWriteCursor.load(std::memory_order_relaxed); + if (cursor >= mOrderedTotal) return; + size_t slot = cursor & (OFFSET_RING_SIZE - 1); + string* str = mOrderedRing[slot].load(std::memory_order_acquire); + if (str) { + mWriter1->write(str->data(), str->length()); + delete str; + mOrderedRing[slot].store(nullptr, std::memory_order_release); + mOrderedWriteCursor.store(cursor + 1, std::memory_order_release); + mBufferLength--; + } else { + usleep(100); + } + return; + } SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { usleep(100); @@ -113,33 +172,48 @@ void WriterThread::input(int tid, string* data) { } void WriterThread::inputPwrite(int tid, string* data) { - size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); - // Grow pre-allocated buffer if needed - if (bound > mCompBufSize) { - delete[] mCompBufs[tid]; - mCompBufs[tid] = new char[bound]; - // Note: mCompBufSize is shared but only grows, safe for other threads - } - size_t outsize = 
libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), - mCompBufs[tid], bound); - if (outsize == 0) - error_exit("libdeflate gzip compression failed"); - delete data; - const char* writeData = mCompBufs[tid]; - size_t wsize = outsize; - size_t seq = mNextSeq[tid]; + doInputPwrite(tid, data, seq); + mNextSeq[tid] += mOptions->thread; +} + +void WriterThread::doInputPwrite(int tid, string* data, size_t seq) { + const char* writeData; + size_t wsize; + + if (mCompressInPwrite) { + size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); + if (bound > mCompBufSize) { + delete[] mCompBufs[tid]; + mCompBufs[tid] = new char[bound]; + } + size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), + mCompBufs[tid], bound); + if (outsize == 0) + error_exit("libdeflate gzip compression failed"); + delete data; + writeData = mCompBufs[tid]; + wsize = outsize; + } else { + writeData = data->data(); + wsize = data->size(); + } // Wait for previous batch's cumulative offset size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); - while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) { + for (int spins = 0; mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1; ) { + if (++spins > 256) { + usleep(1); + spins = 0; + } else { #if defined(__aarch64__) - __asm__ volatile("yield"); + __asm__ volatile("yield"); #elif defined(__x86_64__) || defined(__i386__) - __asm__ volatile("pause"); + __asm__ volatile("pause"); #endif + } } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -164,10 +238,36 @@ void WriterThread::inputPwrite(int tid, string* data) { } } - mNextSeq[tid] += mOptions->thread; + if (!mCompressInPwrite) + delete data; +} + +void WriterThread::inputOrderedRing(string* data, size_t seq) { + while (seq - mOrderedWriteCursor.load(std::memory_order_acquire) >= 
(size_t)OFFSET_RING_SIZE) { + usleep(10); + } + size_t slot = seq & (OFFSET_RING_SIZE - 1); + mOrderedRing[slot].store(data, std::memory_order_release); + mBufferLength++; +} + +void WriterThread::inputWithSeq(int tid, string* data, size_t seq) { + if (mPwriteMode) { + doInputPwrite(tid, data, seq); + } else { + inputOrderedRing(data, seq); + } } void WriterThread::cleanup() { + if (mOrderedRing) { + for (int i = 0; i < OFFSET_RING_SIZE; i++) { + string* s = mOrderedRing[i].load(std::memory_order_relaxed); + if (s) delete s; + } + delete[] mOrderedRing; + mOrderedRing = NULL; + } if (mPwriteMode) { if (mFd >= 0) { close(mFd); mFd = -1; } delete[] mOffsetRing; mOffsetRing = NULL; diff --git a/src/writerthread.h b/src/writerthread.h index a2b7dc8..8c36863 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -39,11 +39,15 @@ class WriterThread{ long bufferLength() {return mBufferLength;}; string getFilename() {return mFilename;} bool isPwriteMode() {return mPwriteMode;} + void setOrderedMode(size_t totalPacks); + void inputWithSeq(int tid, string* data, size_t seq); private: void deleteWriter(); void inputPwrite(int tid, string* data); + void doInputPwrite(int tid, string* data, size_t seq); void setInputCompletedPwrite(); + void inputOrderedRing(string* data, size_t seq); private: Writer* mWriter1; @@ -55,14 +59,20 @@ class WriterThread{ SingleProducerSingleConsumerList<string*>** mBufferLists; int mWorkingBufferList; - // pwrite mode: parallel libdeflate gz compression + direct file write + // pwrite mode: parallel direct file write (with optional gz compression) bool mPwriteMode; + bool mCompressInPwrite; int mFd; OffsetSlot* mOffsetRing; size_t* mNextSeq; libdeflate_compressor** mCompressors; char** mCompBufs; // per-worker pre-allocated compress output buffers size_t mCompBufSize; + // Ordered output mode (parallel pread): pack-index-based sequencing + bool mOrderedMode; + size_t mOrderedTotal; + std::atomic<string*>* mOrderedRing; + std::atomic<size_t> mOrderedWriteCursor; }; 
#endif