From 5d612e0f048fe1091c204924252184096868215d Mon Sep 17 00:00:00 2001 From: kathir Date: Tue, 29 Jul 2025 06:18:39 +0000 Subject: [PATCH 1/2] mcap io_ring implementation --- cpp/mcap/include/mcap/writer.inl | 354 ++++++++++++++++++++++++++++++- 1 file changed, 352 insertions(+), 2 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index f925596c24..f7284257f6 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -11,8 +11,59 @@ # include #endif +#define ALIGNED_WRITE + +#include +#include +#include +#include +#include +#include +#include +#include + +#ifdef ALIGNED_WRITE + #include + #include + #include + #include + #include + #include + #include + #include + #include +#endif + + namespace mcap { + +#ifdef ALIGNED_WRITE + void neon_memcpy(uint8_t* dst, const uint8_t* src, size_t size) { + size_t i = 0; + // Copy 16 bytes (128 bits) at a time + for (; i + 15 < size; i += 16) { + uint8x16_t data = vld1q_u8(src + i); + vst1q_u8(dst + i, data); + } + // Copy remaining bytes + for (; i < size; ++i) { + dst[i] = src[i]; + } + } + void neon_memset(uint8_t* dst, uint8_t value, size_t size) { + size_t i = 0; + uint8x16_t val = vdupq_n_u8(value); + for (; i + 15 < size; i += 16) { + vst1q_u8(dst + i, val); + } + for (; i < size; ++i) { + dst[i] = value; + } + } +#endif + + // IWritable /////////////////////////////////////////////////////////////////// IWritable::IWritable() noexcept @@ -37,12 +88,305 @@ void IWritable::resetCrc() { crc_ = internal::CRC32_INIT; } -// FileWriter ////////////////////////////////////////////////////////////////// +// FileWriter /////////////////////////////////////////////////////////////////// +#ifdef ALIGNED_WRITE + constexpr size_t BUFFER_SIZE = 120 * 1024 * 1024; + constexpr size_t IORING_BUFF_SIZE = 80 * 1024 * 1024; + int fd_=-1; + struct io_uring ring_; + bool ringInited_=false; + void* buf_ = nullptr; + void *bufpong_=nullptr; + uint64_t writeOffset_=0; // current offset in file + static constexpr int QUEUE_DEPTH = 128; + static constexpr size_t ALIGNMENT = 512; + std::mutex mtx; + std::condition_variable cv; + bool ready = false; + std::atomic running{true}; + uint64_t mcap_buffsize=0; + int final_footer_count = 0; + bool final_footer = false; + bool ping_buffer_used=true; + std::thread workerThread_; + uint64_t aligned_blocks=0; + + void worker_thread(); + + Status FileWriter::open(std::string_view filename) { + end(); + fd_ = ::open(filename.data(), O_CREAT |O_RDWR | O_TRUNC | O_DIRECT, 0644); + if (fd_ < 0) { + perror("open"); + } + + if (posix_memalign(&buf_, ALIGNMENT, BUFFER_SIZE) != 0) { + perror("posix_memalign"); + close(fd_); + } + + if (posix_memalign(&bufpong_, ALIGNMENT,BUFFER_SIZE) != 0) { + perror("posix_memalign mcapbuf_"); + free(buf_); // Clean up buf_ + close(fd_); + } + int ret = io_uring_queue_init(QUEUE_DEPTH, &ring_, 0); + if (ret < 0) { + std::cerr << "io_uring_queue_init failed: " << strerror(-ret) << "\n"; + close(fd_); + } + ringInited_ = true; + + // Register buffer + struct iovec iovecs[2]; + iovecs[0].iov_base = buf_; + iovecs[0].iov_len = BUFFER_SIZE; + + iovecs[1].iov_base = bufpong_; + iovecs[1].iov_len = BUFFER_SIZE; + ret = io_uring_register_buffers(&ring_, iovecs, 2); + if (ret < 0) { + std::cerr << "io_uring_register_buffers failed: " << strerror(-ret) << "\n"; + close(fd_); + } + + // Register file descriptor + ret = io_uring_register_files(&ring_, &fd_, 1); + if (ret < 0) { + std::cerr << "io_uring_register_files failed: " << strerror(-ret) << "\n"; + close(fd_); + } + + size_ = 0; + writeOffset_ = 0; + running=true; + ready=false; + mcap_buffsize=0; + ping_buffer_used=true; + std::thread t(worker_thread); + workerThread_ = std::move(t); // Store in a class member + return StatusCode::Success; + } + + + void worker_thread() { + std::unique_lock lock(mtx); // ✅ This must be defined + while(running) + { + cv.wait(lock, [] { return ready; }); // Wait until ready is + io_uring_cqe* cqe; + if (io_uring_wait_cqe(&ring_, &cqe) < 0) { + std::cerr << "io_uring_wait_cqe failed\n"; + continue; + } + if (cqe->res < 0) { + std::cerr << "Async write error: " << strerror(-cqe->res) << "\n"; + } else { + //std::cout << " io_ring Write completed: " << cqe->res << " bytes\n"; + } + io_uring_cqe_seen(&ring_, cqe); + ready=false; + } + std::cout << "Detached thread received signal. Proceeding..." << std::endl; + } + inline void* getActiveBuffer() { + return ping_buffer_used ? buf_ : bufpong_; + } + + inline void switchBuffer() { + ping_buffer_used = !ping_buffer_used; + } + + inline void* getInactiveBuffer() { + return ping_buffer_used ? bufpong_ : buf_; + } + + // Your requested handleWrite equivalent + void FileWriter::handleWrite(const std::byte* data, uint64_t size) { + if (!ringInited_ || !data || size == 0) + return; + + // Append incoming data to internal MCAP buffer if size is reasonable + if (mcap_buffsize + size > BUFFER_SIZE) { + std::cerr << "Error: Buffer overflow risk. Data size exceeds allocated buffer.\n"; + return; + } + void* activeBuf = getActiveBuffer(); + neon_memcpy(static_cast(activeBuf) + mcap_buffsize, + reinterpret_cast(data), + size); + mcap_buffsize += size; + + // Check for MCAP magic footer (assuming Magic is a byte array and sizeof(Magic) is valid) + if (size == sizeof(Magic) && memcmp(data, Magic, sizeof(Magic)) == 0) { + final_footer_count++; + if (final_footer_count == 2) { + final_footer = true; + final_footer_count=0; + } + } + + // Lambda to submit a fixed buffer write using io_uring + auto submit_fixed_write = [&](void* buf, size_t bufSize, off_t offset, int bufIndex) -> bool { + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + std::cerr << "io_uring_get_sqe failed\n"; + return false; + } + io_uring_prep_write_fixed(sqe, 0, buf, bufSize, offset, bufIndex); + sqe->flags |= IOSQE_FIXED_FILE; + sqe->buf_index = bufIndex; + sqe->fd = 0; + return true; + }; + + if (mcap_buffsize >= IORING_BUFF_SIZE) { + // Align mcap_buffsize down to multiple of ALIGNMENT + size_t aligned_blocks = (mcap_buffsize / ALIGNMENT) * ALIGNMENT; + size_t remain_mcapbytes = mcap_buffsize % ALIGNMENT; + + void* activeBuf = getActiveBuffer(); + if(ping_buffer_used) + { + if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 0)) + return; + } + else + { + if (!submit_fixed_write(activeBuf, aligned_blocks, writeOffset_, 1)) + return; + } + + int ret = io_uring_submit(&ring_); + if (ret < 0) { + std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n"; + return; + } + { + std::lock_guard lock(mtx); + ready = true; + } + cv.notify_one(); // response thread + + if (remain_mcapbytes > 0) { + void* InactiveBuf = getInactiveBuffer(); + void* activeBuf = getActiveBuffer(); + std::memcpy(reinterpret_cast(InactiveBuf), + reinterpret_cast(activeBuf) + aligned_blocks, + remain_mcapbytes); + switchBuffer(); + } + mcap_buffsize = remain_mcapbytes; + writeOffset_ += aligned_blocks; + size_ += aligned_blocks; + } + + if (final_footer) { + // Align mcap_buffsize up to next multiple of ALIGNMENT + size_t aligned_size = (mcap_buffsize + (ALIGNMENT - 1)) & ~(ALIGNMENT - 1); + size_t appended_bytes = aligned_size - mcap_buffsize; + + // Submit final footer write + struct io_uring_sqe* sqe = io_uring_get_sqe(&ring_); + if (!sqe) { + std::cerr << "io_uring_get_sqe failed\n"; + return; + } + void* activeBuf = getActiveBuffer(); + if(ping_buffer_used) + { + if (!submit_fixed_write(activeBuf, aligned_size, writeOffset_, 0)) + return; + } + else + { + if (!submit_fixed_write(activeBuf,aligned_size, writeOffset_, 1)) + return; + } + + int ret = io_uring_submit(&ring_); + if (ret < 0) { + std::cerr << "io_uring_submit failed: " << strerror(-ret) << "\n"; + return; + } else if (ret == 0) { + std::cerr << "io_uring_submit submitted 0 entries, nothing queued\n"; + return; + } else { + std::cout << "Submitted " << ret << " sqe(s) successfully\n"; + } + + // Wait for completion + struct io_uring_cqe* cqe; + if (io_uring_wait_cqe(&ring_, &cqe) < 0) { + std::cerr << "io_uring_wait_cqe failed\n"; + return; + } + if (cqe->res < 0) { + std::cerr << "Async write error 0001: " << strerror(-cqe->res) << "\n"; + } else { + std::cout << "Write completed: " << cqe->res << " bytes\n"; + } + io_uring_cqe_seen(&ring_, cqe); + + writeOffset_ += aligned_size; + + // Truncate file to remove padded bytes + struct stat st; + if (fstat(fd_, &st) < 0) { + perror("fstat"); + return; + } + off_t current_size = st.st_size; + off_t new_size = current_size - appended_bytes; + + if (new_size < 0) { + std::cerr << "Cannot remove more bytes than file size\n"; + return; + } + + if (ftruncate(fd_, new_size) < 0) { + perror("ftruncate"); + return; + } else { + std::cout << "File truncated to " << new_size << " bytes\n"; + } + + + std::cout << " after current_size " << current_size << " bytes\n"; + final_footer = false; + } + + } + + + void io_close() { + if (ringInited_) { + io_uring_unregister_buffers(&ring_); + io_uring_unregister_files(&ring_); + io_uring_queue_exit(&ring_); + ringInited_ = false; + } + if (fd_ >= 0) { + ::close(fd_); + fd_ = -1; + } + if (buf_) { + free(buf_); + buf_ = nullptr; + } + writeOffset_ = 0; + running = false; + final_footer_count=0; + final_footer=false; + cv.notify_all(); + if (workerThread_.joinable()) workerThread_.join(); + } +#endif FileWriter::~FileWriter() { end(); } - +#ifndef ALIGNED_WRITE Status FileWriter::open(std::string_view filename) { end(); file_ = std::fopen(filename.data(), "wb"); @@ -61,6 +405,8 @@ void FileWriter::handleWrite(const std::byte* data, uint64_t size) { size_ += size; } +#endif + void FileWriter::flush() { if (file_) { std::fflush(file_); @@ -72,6 +418,10 @@ void FileWriter::end() { std::fclose(file_); file_ = nullptr; } +#ifdef ALIGNED_WRITE + io_close(); + close(fd_); +#endif size_ = 0; } From 9745c10ef961db6a7475397f7cd9ceeb14800416 Mon Sep 17 00:00:00 2001 From: kathir Date: Mon, 4 Aug 2025 07:27:38 +0000 Subject: [PATCH 2/2] instead of fixed wite use normal write and fixed write size --- cpp/mcap/include/mcap/writer.inl | 31 ++++++++++++++++++++----------- 1 file changed, 20 insertions(+), 11 deletions(-) diff --git a/cpp/mcap/include/mcap/writer.inl b/cpp/mcap/include/mcap/writer.inl index f7284257f6..7b6451fb7e 100644 --- a/cpp/mcap/include/mcap/writer.inl +++ b/cpp/mcap/include/mcap/writer.inl @@ -21,6 +21,7 @@ #include #include #include +#include // setpriority, PRIO_PROCESS #ifdef ALIGNED_WRITE #include @@ -90,8 +91,8 @@ void IWritable::resetCrc() { // FileWriter /////////////////////////////////////////////////////////////////// #ifdef ALIGNED_WRITE - constexpr size_t BUFFER_SIZE = 120 * 1024 * 1024; - constexpr size_t IORING_BUFF_SIZE = 80 * 1024 * 1024; + constexpr size_t BUFFER_SIZE = 140 * 1024 * 1024; + constexpr size_t IORING_BUFF_SIZE = 100 * 1024 * 1024; int fd_=-1; struct io_uring ring_; bool ringInited_=false; @@ -170,6 +171,17 @@ void IWritable::resetCrc() { void worker_thread() { + cpu_set_t cpuset; + CPU_ZERO(&cpuset); + CPU_SET(0, &cpuset); + pthread_setaffinity_np(pthread_self(), sizeof(cpu_set_t), &cpuset); + pid_t tid = gettid(); // get thread ID (Linux-specific) + if (setpriority(PRIO_PROCESS, tid, -18) != 0) { + perror("setpriority failed"); + } else { + std::cout << " rest Nice value set to " << std::endl; + } + std::unique_lock lock(mtx); // ✅ This must be defined while(running) { @@ -233,17 +245,14 @@ void IWritable::resetCrc() { std::cerr << "io_uring_get_sqe failed\n"; return false; } - io_uring_prep_write_fixed(sqe, 0, buf, bufSize, offset, bufIndex); + io_uring_prep_write(sqe, 0, buf,IORING_BUFF_SIZE, offset); sqe->flags |= IOSQE_FIXED_FILE; - sqe->buf_index = bufIndex; - sqe->fd = 0; return true; }; if (mcap_buffsize >= IORING_BUFF_SIZE) { // Align mcap_buffsize down to multiple of ALIGNMENT - size_t aligned_blocks = (mcap_buffsize / ALIGNMENT) * ALIGNMENT; - size_t remain_mcapbytes = mcap_buffsize % ALIGNMENT; + size_t remain_mcapbytes = mcap_buffsize -IORING_BUFF_SIZE; void* activeBuf = getActiveBuffer(); if(ping_buffer_used) @@ -271,14 +280,14 @@ void IWritable::resetCrc() { if (remain_mcapbytes > 0) { void* InactiveBuf = getInactiveBuffer(); void* activeBuf = getActiveBuffer(); - std::memcpy(reinterpret_cast(InactiveBuf), - reinterpret_cast(activeBuf) + aligned_blocks, + std::memcpy(reinterpret_cast(InactiveBuf), + reinterpret_cast(activeBuf) +IORING_BUFF_SIZE, remain_mcapbytes); switchBuffer(); } mcap_buffsize = remain_mcapbytes; - writeOffset_ += aligned_blocks; - size_ += aligned_blocks; + writeOffset_ +=IORING_BUFF_SIZE; + size_ += IORING_BUFF_SIZE; } if (final_footer) {