From d2be62e90c4f0103043c79ad3b53cd261d3852b2 Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Fri, 27 Mar 2026 12:10:58 +0800 Subject: [PATCH 1/3] perf: extend pwrite parallel write to raw FASTQ output Previously pwrite mode only activated for .gz output (parallel libdeflate compression). Now any non-stdout multi-threaded output uses pwrite: worker threads write directly to non-overlapping file regions via pwrite(2), bypassing the single writer thread. For .gz: compress with libdeflate then pwrite (existing behavior). For .fq: pwrite raw output bytes directly (new). Also adds hybrid backoff to the pwrite spin-wait: yield for 256 iterations then usleep(1), preventing CPU burn when threads wait on predecessor sequence numbers. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/writerthread.cpp | 74 +++++++++++++++++++++++++++----------------- src/writerthread.h | 4 ++- 2 files changed, 49 insertions(+), 29 deletions(-) diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 3424e3e..2ef8a52 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -13,7 +13,8 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mInputCompleted = false; mFilename = filename; - mPwriteMode = !isSTDOUT && ends_with(filename, ".gz") && mOptions->thread > 1; + mPwriteMode = !isSTDOUT && mOptions->thread > 1; + mCompressInPwrite = mPwriteMode && ends_with(filename, ".gz"); mFd = -1; mOffsetRing = NULL; mNextSeq = NULL; @@ -30,14 +31,15 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mNextSeq = new size_t[mOptions->thread]; for (int t = 0; t < mOptions->thread; t++) mNextSeq[t] = t; - mCompressors = new libdeflate_compressor*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); - // Pre-allocate per-worker compress buffers (avoids malloc/free per pack) - mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case - mCompBufs = new char*[mOptions->thread]; - for (int t = 0; t < mOptions->thread; t++) - mCompBufs[t] = new char[mCompBufSize]; + if (mCompressInPwrite) { + mCompressors = new libdeflate_compressor*[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) + mCompressors[t] = libdeflate_alloc_compressor(mOptions->compression); + mCompBufSize = PACK_SIZE * 500; // ~500 bytes/read worst case + mCompBufs = new char*[mOptions->thread]; + for (int t = 0; t < mOptions->thread; t++) + mCompBufs[t] = new char[mCompBufSize]; + } mWorkingBufferList = 0; mBufferLength = 0; } else { @@ -113,33 +115,48 @@ void WriterThread::input(int tid, string* data) { } void WriterThread::inputPwrite(int tid, string* data) { - size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); - // Grow pre-allocated buffer if needed - if (bound > mCompBufSize) { - delete[] mCompBufs[tid]; - mCompBufs[tid] = new char[bound]; - // Note: mCompBufSize is shared but only grows, safe for other threads - } - size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), - mCompBufs[tid], bound); - if (outsize == 0) - error_exit("libdeflate gzip compression failed"); - delete data; - const char* writeData = mCompBufs[tid]; - size_t wsize = outsize; - size_t seq = mNextSeq[tid]; + doInputPwrite(tid, data, seq); + mNextSeq[tid] += mOptions->thread; +} + +void WriterThread::doInputPwrite(int tid, string* data, size_t seq) { + const char* writeData; + size_t wsize; + + if (mCompressInPwrite) { + size_t bound = libdeflate_gzip_compress_bound(mCompressors[tid], data->size()); + if (bound > mCompBufSize) { + delete[] mCompBufs[tid]; + mCompBufs[tid] = new char[bound]; + } + size_t outsize = libdeflate_gzip_compress(mCompressors[tid], data->data(), data->size(), + mCompBufs[tid], bound); + if (outsize == 0) + error_exit("libdeflate gzip compression failed"); + delete data; + writeData = mCompBufs[tid]; + wsize = outsize; + } else { + writeData = data->data(); + wsize = data->size(); + } // Wait for previous batch's cumulative offset size_t offset = 0; if (seq > 0) { size_t prevSlot = (seq - 1) & (OFFSET_RING_SIZE - 1); - while (mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1) { + for (int spins = 0; mOffsetRing[prevSlot].published_seq.load(std::memory_order_acquire) != seq - 1; ) { + if (++spins > 256) { + usleep(1); + spins = 0; + } else { #if defined(__aarch64__) - __asm__ volatile("yield"); + __asm__ volatile("yield"); #elif defined(__x86_64__) || defined(__i386__) - __asm__ volatile("pause"); + __asm__ volatile("pause"); #endif + } } offset = mOffsetRing[prevSlot].cumulative_offset.load(std::memory_order_relaxed); } @@ -164,7 +181,8 @@ void WriterThread::inputPwrite(int tid, string* data) { } } - mNextSeq[tid] += mOptions->thread; + if (!mCompressInPwrite) + delete data; } void WriterThread::cleanup() { diff --git a/src/writerthread.h b/src/writerthread.h index a2b7dc8..19a4ac5 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -43,6 +43,7 @@ class WriterThread{ private: void deleteWriter(); void inputPwrite(int tid, string* data); + void doInputPwrite(int tid, string* data, size_t seq); void setInputCompletedPwrite(); private: @@ -55,8 +56,9 @@ class WriterThread{ SingleProducerSingleConsumerList** mBufferLists; int mWorkingBufferList; - // pwrite mode: parallel libdeflate gz compression + direct file write + // pwrite mode: parallel direct file write (with optional gz compression) bool mPwriteMode; + bool mCompressInPwrite; int mFd; OffsetSlot* mOffsetRing; size_t* mNextSeq; From 8622042cb34217bdaceb44674cb07b7230bf6021 Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Fri, 27 Mar 2026 12:15:08 +0800 Subject: [PATCH 2/3] perf: SE parallel pread with ordered output Worker threads use pread(2) to read chunks from uncompressed FASTQ files in parallel, bypassing the single reader thread bottleneck. Each thread atomically grabs the next pack index via fetch_add, reads its chunk with pread, parses it, and processes it. WriterThread gains ordered-mode (setOrderedMode/inputWithSeq): - pwrite path: uses pack index as sequence number in offset ring - non-pwrite path: ordered ring buffer drained by writer thread Parallel pread is disabled for stdin, .gz input, split mode, and readsToProcess limit. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/seprocessor.cpp | 117 ++++++++++++++++++++++++++++++++++++++++--- src/seprocessor.h | 7 ++- src/writerthread.cpp | 88 ++++++++++++++++++++++++++++++-- src/writerthread.h | 8 +++ 4 files changed, 210 insertions(+), 10 deletions(-) diff --git a/src/seprocessor.cpp b/src/seprocessor.cpp index 577cab2..423a22f 100644 --- a/src/seprocessor.cpp +++ b/src/seprocessor.cpp @@ -1,5 +1,6 @@ #include "seprocessor.h" #include "fastqreader.h" +#include "fastqchunkparser.h" #include #include #include @@ -30,6 +31,8 @@ SingleEndProcessor::SingleEndProcessor(Options* opt){ mPackProcessedCounter = 0; mReadPool = new ReadPool(mOptions); + mChunkIndex = NULL; + mNextPackIndex = 0; } SingleEndProcessor::~SingleEndProcessor() { @@ -42,6 +45,14 @@ SingleEndProcessor::~SingleEndProcessor() { delete mReadPool; mReadPool = NULL; } + if(mChunkIndex) { + delete mChunkIndex; + mChunkIndex = NULL; + } + if(mReadPool) { + delete mReadPool; + mReadPool = NULL; + } delete[] mInputLists; } @@ -73,10 +84,35 @@ void SingleEndProcessor::initConfig(ThreadConfig* config) { } } +bool SingleEndProcessor::canUseParallelRead() { + if (mOptions->in1 == "/dev/stdin") + return false; + if (ends_with(mOptions->in1, ".gz")) + return false; + if (mOptions->readsToProcess > 0) + return false; + if (mOptions->split.enabled) + return false; + return true; +} + bool SingleEndProcessor::process(){ if(!mOptions->split.enabled) initOutput(); + bool useParallelRead = canUseParallelRead(); + + if (useParallelRead) { + mChunkIndex = new FastqChunkIndex(); + if (!mChunkIndex->build(mOptions->in1, PACK_SIZE)) { + delete mChunkIndex; + mChunkIndex = NULL; + useParallelRead = false; + } else if (mOptions->verbose) { + loginfo("parallel pread: indexed " + to_string(mChunkIndex->packCount()) + " packs from " + mOptions->in1); + } + } + mInputLists = new SingleProducerSingleConsumerList*[mOptions->thread]; ThreadConfig** configs = new ThreadConfig*[mOptions->thread]; @@ -87,11 +123,25 @@ bool SingleEndProcessor::process(){ initConfig(configs[t]); } - std::thread readerThread(std::bind(&SingleEndProcessor::readerTask, this)); + if (useParallelRead) { + size_t totalPacks = mChunkIndex->packCount(); + if (mLeftWriter) + mLeftWriter->setOrderedMode(totalPacks); + if (mFailedWriter) + mFailedWriter->setOrderedMode(totalPacks); + } + + std::thread* readerThreadPtr = NULL; + if (!useParallelRead) { + readerThreadPtr = new std::thread(std::bind(&SingleEndProcessor::readerTask, this)); + } std::thread** threads = new thread*[mOptions->thread]; for(int t=0; tthread; t++){ - threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); + if (useParallelRead) + threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTaskParallel, this, configs[t])); + else + threads[t] = new std::thread(std::bind(&SingleEndProcessor::processorTask, this, configs[t])); } std::thread* leftWriterThread = NULL; @@ -101,7 +151,10 @@ bool SingleEndProcessor::process(){ if(mFailedWriter) failedWriterThread = new std::thread(std::bind(&SingleEndProcessor::writerTask, this, mFailedWriter)); - readerThread.join(); + if (readerThreadPtr) { + readerThreadPtr->join(); + delete readerThreadPtr; + } for(int t=0; tthread; t++){ threads[t]->join(); } @@ -194,7 +247,7 @@ void SingleEndProcessor::recycleToPool(int tid, Read* r) { delete r; } -bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ +bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config, int64_t writeSeq){ // build output on stack strings, move to heap only when handing off to writers string outstr, failedOut; outstr.reserve(pack->count * 320); @@ -300,10 +353,16 @@ bool SingleEndProcessor::processSingleEnd(ReadPack* pack, ThreadConfig* config){ } if(mLeftWriter) { - mLeftWriter->input(tid, new string(std::move(outstr))); + if (writeSeq >= 0) + mLeftWriter->inputWithSeq(tid, new string(std::move(outstr)), (size_t)writeSeq); + else + mLeftWriter->input(tid, new string(std::move(outstr))); } if(mFailedWriter) { - mFailedWriter->input(tid, new string(std::move(failedOut))); + if (writeSeq >= 0) + mFailedWriter->inputWithSeq(tid, new string(std::move(failedOut)), (size_t)writeSeq); + else + mFailedWriter->input(tid, new string(std::move(failedOut))); } if(mOptions->split.byFileLines) @@ -468,6 +527,52 @@ void SingleEndProcessor::processorTask(ThreadConfig* config) } } +void SingleEndProcessor::processorTaskParallel(ThreadConfig* config) +{ + int tid = config->getThreadId(); + size_t totalPacks = mChunkIndex->packCount(); + int fd = mChunkIndex->fd(); + + while (true) { + size_t packIdx = mNextPackIndex.fetch_add(1, std::memory_order_relaxed); + if (packIdx >= totalPacks) + break; + + size_t offset = mChunkIndex->packStart(packIdx); + size_t len = mChunkIndex->packLength(packIdx); + char* buf = new char[len]; + ssize_t bytesRead = pread(fd, buf, len, offset); + if (bytesRead <= 0) { + delete[] buf; + break; + } + + ReadPack* pack = FastqChunkParser::parse(buf, bytesRead, tid, nullptr, mOptions->phred64); + delete[] buf; + + processSingleEnd(pack, config, (int64_t)packIdx); + + if (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + std::unique_lock lk(mBackpressureMtx); + while (mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + } + } + } + + if (mFinishedThreads.fetch_add(1, std::memory_order_release) + 1 == mOptions->thread) { + if (mLeftWriter) + mLeftWriter->setInputCompleted(); + if (mFailedWriter) + mFailedWriter->setInputCompleted(); + } + + if (mOptions->verbose) { + string msg = "thread " + to_string(tid + 1) + " finished (parallel pread)"; + loginfo(msg); + } +} + void SingleEndProcessor::writerTask(WriterThread* config) { while(true) { diff --git a/src/seprocessor.h b/src/seprocessor.h index b3c7152..f47d28c 100644 --- a/src/seprocessor.h +++ b/src/seprocessor.h @@ -17,6 +17,7 @@ #include "duplicate.h" #include "singleproducersingleconsumerlist.h" #include "readpool.h" +#include "fastqchunkindex.h" using namespace std; @@ -29,9 +30,11 @@ class SingleEndProcessor{ bool process(); private: - bool processSingleEnd(ReadPack* pack, ThreadConfig* config); + bool processSingleEnd(ReadPack* pack, ThreadConfig* config, int64_t writeSeq = -1); void readerTask(); void processorTask(ThreadConfig* config); + void processorTaskParallel(ThreadConfig* config); + bool canUseParallelRead(); void initConfig(ThreadConfig* config); void initOutput(); void closeOutput(); @@ -53,6 +56,8 @@ class SingleEndProcessor{ ReadPool* mReadPool; std::mutex mBackpressureMtx; std::condition_variable mBackpressureCV; + FastqChunkIndex* mChunkIndex; + std::atomic mNextPackIndex; }; diff --git a/src/writerthread.cpp b/src/writerthread.cpp index 2ef8a52..db10bec 100644 --- a/src/writerthread.cpp +++ b/src/writerthread.cpp @@ -21,6 +21,10 @@ WriterThread::WriterThread(Options* opt, string filename, bool isSTDOUT){ mCompressors = NULL; mCompBufs = NULL; mCompBufSize = 0; + mOrderedMode = false; + mOrderedTotal = 0; + mOrderedRing = NULL; + mOrderedWriteCursor = 0; mBufferLists = NULL; if (mPwriteMode) { @@ -57,18 +61,45 @@ WriterThread::~WriterThread() { bool WriterThread::isCompleted() { if (mPwriteMode) return true; // no writer thread needed + if (mOrderedMode) + return mInputCompleted && mOrderedWriteCursor.load(std::memory_order_acquire) >= mOrderedTotal; return mInputCompleted && (mBufferLength==0); } bool WriterThread::setInputCompleted() { if (mPwriteMode) { - setInputCompletedPwrite(); + if (mOrderedMode) { + if (mOrderedTotal > 0) { + size_t lastSeq = mOrderedTotal - 1; + size_t lastSlot = lastSeq & (OFFSET_RING_SIZE - 1); + for (int spins = 0; mOffsetRing[lastSlot].published_seq.load(std::memory_order_acquire) != lastSeq; ) { + if (++spins > 256) { + usleep(1); + spins = 0; + } else { +#if defined(__aarch64__) + __asm__ volatile("yield"); +#elif defined(__x86_64__) || defined(__i386__) + __asm__ volatile("pause"); +#endif + } + } + size_t offset = mOffsetRing[lastSlot].cumulative_offset.load(std::memory_order_relaxed); + ftruncate(mFd, offset); + } else { + ftruncate(mFd, 0); + } + } else { + setInputCompletedPwrite(); + } mInputCompleted = true; return true; } mInputCompleted = true; - for(int t=0; tthread; t++) { - mBufferLists[t]->setProducerFinished(); + if (!mOrderedMode) { + for(int t=0; tthread; t++) { + mBufferLists[t]->setProducerFinished(); + } } return true; } @@ -91,8 +122,34 @@ void WriterThread::setInputCompletedPwrite() { ftruncate(mFd, offset); } +void WriterThread::setOrderedMode(size_t totalPacks) { + mOrderedMode = true; + mOrderedTotal = totalPacks; + if (!mPwriteMode) { + mOrderedRing = new std::atomic[OFFSET_RING_SIZE]; + for (int i = 0; i < OFFSET_RING_SIZE; i++) + mOrderedRing[i].store(nullptr, std::memory_order_relaxed); + } +} + void WriterThread::output(){ if (mPwriteMode) return; // no-op + if (mOrderedMode) { + size_t cursor = mOrderedWriteCursor.load(std::memory_order_relaxed); + if (cursor >= mOrderedTotal) return; + size_t slot = cursor & (OFFSET_RING_SIZE - 1); + string* str = mOrderedRing[slot].load(std::memory_order_acquire); + if (str) { + mWriter1->write(str->data(), str->length()); + delete str; + mOrderedRing[slot].store(nullptr, std::memory_order_release); + mOrderedWriteCursor.store(cursor + 1, std::memory_order_release); + mBufferLength--; + } else { + usleep(100); + } + return; + } SingleProducerSingleConsumerList* list = mBufferLists[mWorkingBufferList]; if(!list->canBeConsumed()) { usleep(100); @@ -185,7 +242,32 @@ void WriterThread::doInputPwrite(int tid, string* data, size_t seq) { delete data; } +void WriterThread::inputOrderedRing(string* data, size_t seq) { + while (seq - mOrderedWriteCursor.load(std::memory_order_acquire) >= (size_t)OFFSET_RING_SIZE) { + usleep(10); + } + size_t slot = seq & (OFFSET_RING_SIZE - 1); + mOrderedRing[slot].store(data, std::memory_order_release); + mBufferLength++; +} + +void WriterThread::inputWithSeq(int tid, string* data, size_t seq) { + if (mPwriteMode) { + doInputPwrite(tid, data, seq); + } else { + inputOrderedRing(data, seq); + } +} + void WriterThread::cleanup() { + if (mOrderedRing) { + for (int i = 0; i < OFFSET_RING_SIZE; i++) { + string* s = mOrderedRing[i].load(std::memory_order_relaxed); + if (s) delete s; + } + delete[] mOrderedRing; + mOrderedRing = NULL; + } if (mPwriteMode) { if (mFd >= 0) { close(mFd); mFd = -1; } delete[] mOffsetRing; mOffsetRing = NULL; diff --git a/src/writerthread.h b/src/writerthread.h index 19a4ac5..8c36863 100644 --- a/src/writerthread.h +++ b/src/writerthread.h @@ -39,12 +39,15 @@ class WriterThread{ long bufferLength() {return mBufferLength;}; string getFilename() {return mFilename;} bool isPwriteMode() {return mPwriteMode;} + void setOrderedMode(size_t totalPacks); + void inputWithSeq(int tid, string* data, size_t seq); private: void deleteWriter(); void inputPwrite(int tid, string* data); void doInputPwrite(int tid, string* data, size_t seq); void setInputCompletedPwrite(); + void inputOrderedRing(string* data, size_t seq); private: Writer* mWriter1; @@ -65,6 +68,11 @@ class WriterThread{ libdeflate_compressor** mCompressors; char** mCompBufs; // per-worker pre-allocated compress output buffers size_t mCompBufSize; + // Ordered output mode (parallel pread): pack-index-based sequencing + bool mOrderedMode; + size_t mOrderedTotal; + std::atomic* mOrderedRing; + std::atomic mOrderedWriteCursor; }; #endif From 3dbbec79c9be427a05da7626cc6340543db7c541 Mon Sep 17 00:00:00 2001 From: Kim Yang Date: Fri, 27 Mar 2026 12:26:39 +0800 Subject: [PATCH 3/3] perf: PE parallel pread with ordered output Worker threads use pread(2) to read R1/R2 chunks in parallel, bypassing the dual reader thread bottleneck. Both files are indexed with FastqChunkIndex; pack counts must match (same number of reads). Each thread atomically grabs a pack index, preads from both files, parses both into ReadPacks, and calls processPairEnd. All 7 PE writers (left, right, unpaired x2, merged, failed, overlapped) use pack-index-based ordered output via inputWithSeq. Parallel pread is disabled for interleaved input, stdin, .gz input, split mode, and readsToProcess limit. Also adds FastqChunkIndex and FastqChunkParser utility classes for scanning uncompressed FASTQ files into pack-aligned byte ranges and parsing raw byte buffers into Read objects. Co-Authored-By: Claude Opus 4.6 (1M context) --- src/fastqchunkindex.cpp | 85 +++++++++++++++++++ src/fastqchunkindex.h | 41 +++++++++ src/fastqchunkparser.cpp | 117 +++++++++++++++++++++++++ src/fastqchunkparser.h | 17 ++++ src/peprocessor.cpp | 178 ++++++++++++++++++++++++++++++++++----- src/peprocessor.h | 9 +- 6 files changed, 425 insertions(+), 22 deletions(-) create mode 100644 src/fastqchunkindex.cpp create mode 100644 src/fastqchunkindex.h create mode 100644 src/fastqchunkparser.cpp create mode 100644 src/fastqchunkparser.h diff --git a/src/fastqchunkindex.cpp b/src/fastqchunkindex.cpp new file mode 100644 index 0000000..92857b6 --- /dev/null +++ b/src/fastqchunkindex.cpp @@ -0,0 +1,85 @@ +#include "fastqchunkindex.h" +#include +#include +#include +#include + +// Scan buffer size: 4MB +static const size_t SCAN_BUF_SIZE = 1 << 22; + +bool FastqChunkIndex::build(const std::string& filename, int packSize) { + mFd = open(filename.c_str(), O_RDONLY); + if (mFd < 0) + return false; + + struct stat st; + if (fstat(mFd, &st) != 0 || !S_ISREG(st.st_mode)) { + ::close(mFd); + mFd = -1; + return false; + } + mFileSize = st.st_size; + if (mFileSize == 0) { + ::close(mFd); + mFd = -1; + return false; + } + +#ifdef __linux__ + posix_fadvise(mFd, 0, mFileSize, POSIX_FADV_SEQUENTIAL); +#endif + + // Every FASTQ record is 4 lines, so one pack = packSize * 4 newlines + const int newlinesPerPack = packSize * 4; + + mOffsets.clear(); + mOffsets.push_back(0); // first pack starts at byte 0 + + char* buf = new char[SCAN_BUF_SIZE]; + size_t fileOffset = 0; + int nlCount = 0; + + while (fileOffset < mFileSize) { + size_t toRead = SCAN_BUF_SIZE; + if (fileOffset + toRead > mFileSize) + toRead = mFileSize - fileOffset; + + ssize_t n = pread(mFd, buf, toRead, fileOffset); + if (n <= 0) + break; + + // Scan for newlines using memchr for speed + const char* p = buf; + const char* end = buf + n; + while (p < end) { + const char* nl = (const char*)memchr(p, '\n', end - p); + if (!nl) + break; + nlCount++; + if (nlCount == newlinesPerPack) { + // The next pack starts right after this newline + size_t offset = fileOffset + (nl - buf) + 1; + if (offset < mFileSize) + mOffsets.push_back(offset); + nlCount = 0; + } + p = nl + 1; + } + + fileOffset += n; + } + + // Sentinel: the end of the last pack is fileSize + if (mOffsets.back() != mFileSize) + mOffsets.push_back(mFileSize); + + delete[] buf; + return true; +} + +FastqChunkIndex::~FastqChunkIndex() { + if (mFd >= 0) { + ::close(mFd); + mFd = -1; + } +} diff --git a/src/fastqchunkindex.h b/src/fastqchunkindex.h new file mode 100644 index 0000000..f4d333c --- /dev/null +++ b/src/fastqchunkindex.h @@ -0,0 +1,41 @@ +#ifndef FASTQ_CHUNK_INDEX_H +#define FASTQ_CHUNK_INDEX_H + +#include +#include +#include + +// Scans an uncompressed FASTQ file and builds a pack offset index. +// Each entry marks the byte offset where a pack of PACK_SIZE reads begins. +// The last pack may contain fewer than PACK_SIZE reads. +class FastqChunkIndex { +public: + // Scan the file, recording a byte offset every (4 * packSize) newlines. + // Returns false if the file cannot be opened or is not a regular file. + bool build(const std::string& filename, int packSize); + + // Number of packs + size_t packCount() const { return mOffsets.size() > 0 ? mOffsets.size() - 1 : 0; } + + // Byte offset of pack i + size_t packStart(size_t i) const { return mOffsets[i]; } + + // Byte length of pack i + size_t packLength(size_t i) const { return mOffsets[i + 1] - mOffsets[i]; } + + // File descriptor (caller must not close it; owned by this object) + int fd() const { return mFd; } + + // Total file size + size_t fileSize() const { return mFileSize; } + + // Clean up + ~FastqChunkIndex(); + +private: + std::vector mOffsets; // packCount+1 entries: [0]=0, [last]=fileSize + int mFd = -1; + size_t mFileSize = 0; +}; + +#endif diff --git a/src/fastqchunkparser.cpp b/src/fastqchunkparser.cpp new file mode 100644 index 0000000..2b80b67 --- /dev/null +++ b/src/fastqchunkparser.cpp @@ -0,0 +1,117 @@ +#include "fastqchunkparser.h" +#include "common.h" +#include +#include +#include + +using namespace std; + +// Find next newline in buf[pos..len), return position of char after \n. +// Handles \r\n. Returns len if no newline found. +static inline size_t nextLine(const char* buf, size_t pos, size_t len) { + const char* p = (const char*)memchr(buf + pos, '\n', len - pos); + if (!p) + return len; + return (p - buf) + 1; +} + +// Extract a line from buf[start..end) stripping trailing \r\n +static inline void extractLine(const char* buf, size_t start, size_t lineEnd, string* out) { + size_t end = lineEnd; + // strip trailing \n and \r + while (end > start && (buf[end - 1] == '\n' || buf[end - 1] == '\r')) + end--; + out->assign(buf + start, end - start); +} + +ReadPack* FastqChunkParser::parse(const char* buf, size_t len, int tid, ReadPool* pool, bool phred64) { + // Pre-count records: count newlines / 4 + int estimatedReads = 0; + { + const char* p = buf; + const char* end = buf + len; + while (p < end) { + p = (const char*)memchr(p, '\n', end - p); + if (!p) break; + estimatedReads++; + p++; + } + estimatedReads /= 4; + } + if (estimatedReads <= 0) + estimatedReads = 1; + + Read** data = new Read*[estimatedReads]; + int count = 0; + size_t pos = 0; + + while (pos < len) { + // Skip empty lines + while (pos < len && (buf[pos] == '\n' || buf[pos] == '\r')) + pos++; + if (pos >= len) + break; + + // Line 1: name (starts with @) + size_t nameStart = pos; + size_t nameEnd = nextLine(buf, pos, len); + if (nameStart >= len || buf[nameStart] != '@') + break; + pos = nameEnd; + + // Line 2: sequence + if (pos >= len) break; + size_t seqStart = pos; + size_t seqEnd = nextLine(buf, pos, len); + pos = seqEnd; + + // Line 3: strand (+) + if (pos >= len) break; + size_t strandStart = pos; + size_t strandEnd = nextLine(buf, pos, len); + pos = strandEnd; + + // Line 4: quality + if (pos >= len && strandEnd >= len) break; + size_t qualStart = pos; + size_t qualEnd = nextLine(buf, pos, len); + pos = qualEnd; + + // Build Read object + Read* r = nullptr; + if (pool) + r = pool->getOne(); + + if (r) { + extractLine(buf, nameStart, nameEnd, r->mName); + extractLine(buf, seqStart, seqEnd, r->mSeq); + extractLine(buf, strandStart, strandEnd, r->mStrand); + extractLine(buf, qualStart, qualEnd, r->mQuality); + } else { + string* name = new string(); + string* seq = new string(); + string* strand = new string(); + string* quality = new string(); + extractLine(buf, nameStart, nameEnd, name); + extractLine(buf, seqStart, seqEnd, seq); + extractLine(buf, strandStart, strandEnd, strand); + extractLine(buf, qualStart, qualEnd, quality); + r = new Read(name, seq, strand, quality, phred64); + } + + if (count >= estimatedReads) { + // Shouldn't happen, but be safe + Read** newData = new Read*[estimatedReads * 2]; + memcpy(newData, data, sizeof(Read*) * count); + delete[] data; + data = newData; + estimatedReads *= 2; + } + data[count++] = r; + } + + ReadPack* pack = new ReadPack; + pack->data = data; + pack->count = count; + return pack; +} diff --git a/src/fastqchunkparser.h b/src/fastqchunkparser.h new file mode 100644 index 0000000..d41e523 --- /dev/null +++ b/src/fastqchunkparser.h @@ -0,0 +1,17 @@ +#ifndef FASTQ_CHUNK_PARSER_H +#define FASTQ_CHUNK_PARSER_H + +#include "read.h" +#include "readpool.h" + +// Parse a raw byte buffer (from pread) into a ReadPack. +// The buffer must start at a record boundary and contain complete records. +class FastqChunkParser { +public: + // Parse buf[0..len) into Read* objects, return a ReadPack. + // If pool is non-null, try to reuse Read objects from the pool. + // tid is the thread id for the pool. + static ReadPack* parse(const char* buf, size_t len, int tid, ReadPool* pool, bool phred64); +}; + +#endif diff --git a/src/peprocessor.cpp b/src/peprocessor.cpp index 7411942..8427332 100644 --- a/src/peprocessor.cpp +++ b/src/peprocessor.cpp @@ -1,5 +1,6 @@ #include "peprocessor.h" #include "fastqreader.h" +#include "fastqchunkparser.h" #include #include #include @@ -44,6 +45,9 @@ PairEndProcessor::PairEndProcessor(Options* opt){ mLeftReadPool = new ReadPool(mOptions); mRightReadPool = new ReadPool(mOptions); + mChunkIndexLeft = NULL; + mChunkIndexRight = NULL; + mNextPackIndex = 0; } PairEndProcessor::~PairEndProcessor() { @@ -62,6 +66,14 @@ PairEndProcessor::~PairEndProcessor() { } delete[] mLeftInputLists; delete[] mRightInputLists; + if(mChunkIndexLeft) { + delete mChunkIndexLeft; + mChunkIndexLeft = NULL; + } + if(mChunkIndexRight) { + delete mChunkIndexRight; + mChunkIndexRight = NULL; + } } void PairEndProcessor::initOutput() { @@ -130,10 +142,40 @@ void PairEndProcessor::initConfig(ThreadConfig* config) { } +bool PairEndProcessor::canUseParallelRead() { + if (mOptions->interleavedInput) + return false; + if (mOptions->in1 == "/dev/stdin" || mOptions->in2 == "/dev/stdin") + return false; + if (ends_with(mOptions->in1, ".gz") || ends_with(mOptions->in2, ".gz")) + return false; + if (mOptions->readsToProcess > 0) + return false; + if (mOptions->split.enabled) + return false; + return true; +} + bool PairEndProcessor::process(){ if(!mOptions->split.enabled) initOutput(); + bool useParallelRead = canUseParallelRead(); + + if (useParallelRead) { + mChunkIndexLeft = new FastqChunkIndex(); + mChunkIndexRight = new FastqChunkIndex(); + if (!mChunkIndexLeft->build(mOptions->in1, PACK_SIZE) || + !mChunkIndexRight->build(mOptions->in2, PACK_SIZE) || + mChunkIndexLeft->packCount() != mChunkIndexRight->packCount()) { + delete mChunkIndexLeft; mChunkIndexLeft = NULL; + delete mChunkIndexRight; mChunkIndexRight = NULL; + useParallelRead = false; + } else if (mOptions->verbose) { + loginfo("parallel pread: indexed " + to_string(mChunkIndexLeft->packCount()) + " packs from R1/R2"); + } + } + std::thread* readerLeft = NULL; std::thread* readerRight = NULL; std::thread* readerInterveleaved = NULL; @@ -150,16 +192,32 @@ bool PairEndProcessor::process(){ initConfig(configs[t]); } - if(mOptions->interleavedInput) - readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this)); - else { - readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true)); - readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false)); + if (useParallelRead) { + size_t totalPacks = mChunkIndexLeft->packCount(); + if (mLeftWriter) mLeftWriter->setOrderedMode(totalPacks); + if (mRightWriter) mRightWriter->setOrderedMode(totalPacks); + if (mMergedWriter) mMergedWriter->setOrderedMode(totalPacks); + if (mFailedWriter) mFailedWriter->setOrderedMode(totalPacks); + if (mOverlappedWriter) mOverlappedWriter->setOrderedMode(totalPacks); + if (mUnpairedLeftWriter) mUnpairedLeftWriter->setOrderedMode(totalPacks); + if (mUnpairedRightWriter) mUnpairedRightWriter->setOrderedMode(totalPacks); + } + + if (!useParallelRead) { + if(mOptions->interleavedInput) + readerInterveleaved= new std::thread(std::bind(&PairEndProcessor::interleavedReaderTask, this)); + else { + readerLeft = new std::thread(std::bind(&PairEndProcessor::readerTask, this, true)); + readerRight = new std::thread(std::bind(&PairEndProcessor::readerTask, this, false)); + } } std::thread** threads = new thread*[mOptions->thread]; for(int t=0; tthread; t++){ - threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t])); + if (useParallelRead) + threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTaskParallel, this, configs[t])); + else + threads[t] = new std::thread(std::bind(&PairEndProcessor::processorTask, this, configs[t])); } std::thread* leftWriterThread = NULL; @@ -186,7 +244,7 @@ bool PairEndProcessor::process(){ if(readerInterveleaved) { readerInterveleaved->join(); - } else { + } else if(readerLeft) { readerLeft->join(); readerRight->join(); } @@ -359,7 +417,7 @@ void PairEndProcessor::recycleToPool2(int tid, Read* r) { delete r; } -bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config){ +bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config, int64_t writeSeq){ if(leftPack->count != rightPack->count) { cerr << endl; cerr << "WARNING: different read numbers of the " << mPackProcessedCounter << " pack" << endl; @@ -646,33 +704,55 @@ bool PairEndProcessor::processPairEnd(ReadPack* leftPack, ReadPack* rightPack, T } if(mMergedWriter) { - // move to heap for writer thread ownership - mMergedWriter->input(tid, new string(std::move(mergedOutput))); + if (writeSeq >= 0) + mMergedWriter->inputWithSeq(tid, new string(std::move(mergedOutput)), (size_t)writeSeq); + else + mMergedWriter->input(tid, new string(std::move(mergedOutput))); } if(mFailedWriter) { - mFailedWriter->input(tid, new string(std::move(failedOut))); + if (writeSeq >= 0) + mFailedWriter->inputWithSeq(tid, new string(std::move(failedOut)), (size_t)writeSeq); + else + mFailedWriter->input(tid, new string(std::move(failedOut))); } if(mOverlappedWriter) { - mOverlappedWriter->input(tid, new string(std::move(overlappedOut))); + if (writeSeq >= 0) + mOverlappedWriter->inputWithSeq(tid, new string(std::move(overlappedOut)), (size_t)writeSeq); + else + mOverlappedWriter->input(tid, new string(std::move(overlappedOut))); } // normal output by left/right writer thread if(mRightWriter && mLeftWriter) { - // write PE - move to heap for writer thread ownership - mLeftWriter->input(tid, new string(std::move(outstr1))); - mRightWriter->input(tid, new string(std::move(outstr2))); + if (writeSeq >= 0) { + mLeftWriter->inputWithSeq(tid, new string(std::move(outstr1)), (size_t)writeSeq); + mRightWriter->inputWithSeq(tid, new string(std::move(outstr2)), (size_t)writeSeq); + } else { + mLeftWriter->input(tid, new string(std::move(outstr1))); + mRightWriter->input(tid, new string(std::move(outstr2))); + } } else if(mLeftWriter) { - // write singleOutput - mLeftWriter->input(tid, new string(std::move(singleOutput))); + if (writeSeq >= 0) + mLeftWriter->inputWithSeq(tid, new string(std::move(singleOutput)), (size_t)writeSeq); + else + mLeftWriter->input(tid, new string(std::move(singleOutput))); } // output unpaired reads if(mUnpairedLeftWriter && mUnpairedRightWriter) { - mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); - mUnpairedRightWriter->input(tid, new string(std::move(unpairedOut2))); + if (writeSeq >= 0) { + mUnpairedLeftWriter->inputWithSeq(tid, new string(std::move(unpairedOut1)), (size_t)writeSeq); + mUnpairedRightWriter->inputWithSeq(tid, new string(std::move(unpairedOut2)), (size_t)writeSeq); + } else { + mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); + mUnpairedRightWriter->input(tid, new string(std::move(unpairedOut2))); + } } else if(mUnpairedLeftWriter) { - mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); + if (writeSeq >= 0) + mUnpairedLeftWriter->inputWithSeq(tid, new string(std::move(unpairedOut1)), (size_t)writeSeq); + else + mUnpairedLeftWriter->input(tid, new string(std::move(unpairedOut1))); } if(mOptions->split.byFileLines) @@ -1062,6 +1142,64 @@ void PairEndProcessor::processorTask(ThreadConfig* config) } } +void PairEndProcessor::processorTaskParallel(ThreadConfig* config) +{ + int tid = config->getThreadId(); + size_t totalPacks = mChunkIndexLeft->packCount(); + int fdLeft = mChunkIndexLeft->fd(); + int fdRight = mChunkIndexRight->fd(); + + while (true) { + size_t packIdx = mNextPackIndex.fetch_add(1, std::memory_order_relaxed); + if (packIdx >= totalPacks) + break; + + // pread left (R1) + size_t offL = mChunkIndexLeft->packStart(packIdx); + size_t lenL = mChunkIndexLeft->packLength(packIdx); + char* bufL = new char[lenL]; + ssize_t nL = pread(fdLeft, bufL, lenL, offL); + if (nL <= 0) { delete[] bufL; break; } + + // pread right (R2) + size_t offR = mChunkIndexRight->packStart(packIdx); + size_t lenR = mChunkIndexRight->packLength(packIdx); + char* bufR = new char[lenR]; + ssize_t nR = pread(fdRight, bufR, lenR, offR); + if (nR <= 0) { delete[] bufL; delete[] bufR; break; } + + ReadPack* leftPack = FastqChunkParser::parse(bufL, nL, tid, nullptr, mOptions->phred64); + ReadPack* rightPack = FastqChunkParser::parse(bufR, nR, tid, nullptr, mOptions->phred64); + delete[] bufL; + delete[] bufR; + + processPairEnd(leftPack, rightPack, config, (int64_t)packIdx); + + if (mLeftWriter && mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + std::unique_lock lk(mBackpressureMtx); + while (mLeftWriter->bufferLength() > PACK_IN_MEM_LIMIT) { + mBackpressureCV.wait_for(lk, std::chrono::milliseconds(1)); + } + } + } + + int finishedCount = mFinishedThreads.fetch_add(1, std::memory_order_release) + 1; + if (finishedCount == mOptions->thread) { + if (mLeftWriter) mLeftWriter->setInputCompleted(); + if (mRightWriter) mRightWriter->setInputCompleted(); + if (mUnpairedLeftWriter) mUnpairedLeftWriter->setInputCompleted(); + if (mUnpairedRightWriter) mUnpairedRightWriter->setInputCompleted(); + if (mMergedWriter) mMergedWriter->setInputCompleted(); + if (mFailedWriter) mFailedWriter->setInputCompleted(); + if (mOverlappedWriter) mOverlappedWriter->setInputCompleted(); + } + + if (mOptions->verbose) { + string msg = "thread " + to_string(tid + 1) + " finished (parallel pread)"; + loginfo(msg); + } +} + void PairEndProcessor::writerTask(WriterThread* config) { while(true) { diff --git a/src/peprocessor.h b/src/peprocessor.h index 707e41c..8b0ba16 100644 --- a/src/peprocessor.h +++ b/src/peprocessor.h @@ -17,7 +17,7 @@ #include "writerthread.h" #include "duplicate.h" #include "readpool.h" - +#include "fastqchunkindex.h" using namespace std; @@ -30,10 +30,12 @@ class PairEndProcessor{ bool process(); private: - bool processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config); + bool processPairEnd(ReadPack* leftPack, ReadPack* rightPack, ThreadConfig* config, int64_t writeSeq = -1); void readerTask(bool isLeft); void interleavedReaderTask(); void processorTask(ThreadConfig* config); + void processorTaskParallel(ThreadConfig* config); + bool canUseParallelRead(); void initConfig(ThreadConfig* config); void initOutput(); void closeOutput(); @@ -69,6 +71,9 @@ class PairEndProcessor{ atomic_bool shouldStopReading; std::mutex mBackpressureMtx; std::condition_variable mBackpressureCV; + FastqChunkIndex* mChunkIndexLeft; + FastqChunkIndex* mChunkIndexRight; + std::atomic mNextPackIndex; };