Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 50 additions & 1 deletion include/condy/condy_uring.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,11 @@

#pragma once

#include <liburing.h> // IWYU pragma: export
#include <liburing.h>

#include <cerrno>
#include <cstring>
#include <sys/mman.h>

// liburing <= 2.3 has no version macros, define them here

Expand Down Expand Up @@ -35,3 +39,48 @@ inline void io_uring_prep_uring_cmd(struct io_uring_sqe *sqe, int cmd_op,
sqe->len = 0;
}
#endif

// Polyfill for io_uring_setup_buf_ring / io_uring_free_buf_ring
// (added in liburing 2.4)
#if IO_URING_CHECK_VERSION(2, 4) // < 2.4
inline struct io_uring_buf_ring *
io_uring_setup_buf_ring(struct io_uring *ring, unsigned int nentries, int bgid,
unsigned int flags, int *err) noexcept {
size_t ring_size = nentries * sizeof(struct io_uring_buf);
auto *br = static_cast<struct io_uring_buf_ring *>(
mmap(nullptr, ring_size, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
if (br == MAP_FAILED) {
*err = -errno;
return nullptr;
}

io_uring_buf_ring_init(br);

struct io_uring_buf_reg reg = {};
reg.ring_addr = reinterpret_cast<uint64_t>(br);
reg.ring_entries = nentries;
reg.bgid = bgid;

*err = 0;
int ret = io_uring_register_buf_ring(ring, &reg, flags);
if (ret != 0) {
munmap(br, ring_size);
*err = ret;
return nullptr;
}

return br;
}

inline int io_uring_free_buf_ring(struct io_uring *ring,
struct io_uring_buf_ring *br,
unsigned int nentries, int bgid) noexcept {
int ret = io_uring_unregister_buf_ring(ring, bgid);
if (ret != 0) {
return ret;
}
munmap(br, nentries * sizeof(struct io_uring_buf));
return 0;
}
#endif
72 changes: 19 additions & 53 deletions include/condy/provided_buffers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,40 +56,23 @@ class BundledProvidedBufferQueue {
br_flags_(flags) {
auto &bgid_pool = runtime_->bgid_pool();

size_t data_size = capacity_ * sizeof(io_uring_buf);
void *data = mmap(nullptr, data_size, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
if (data == MAP_FAILED) [[unlikely]] {
throw detail::make_system_error("mmap");
}
auto d1 = detail::defer([&]() { munmap(data, data_size); });

bgid_ = bgid_pool.allocate();
auto d2 = detail::defer([&]() { bgid_pool.recycle(bgid_); });

br_ = reinterpret_cast<io_uring_buf_ring *>(data);
io_uring_buf_ring_init(br_);

io_uring_buf_reg reg = {};
reg.ring_addr = reinterpret_cast<uint64_t>(br_);
reg.ring_entries = capacity_;
reg.bgid = bgid_;
int r = io_uring_register_buf_ring(runtime_->ring().ring(), &reg,
br_flags_);
if (r != 0) [[unlikely]] {
throw detail::make_system_error("io_uring_register_buf_ring", -r);
auto d = detail::defer([&]() { bgid_pool.recycle(bgid_); });

int err = 0;
br_ = io_uring_setup_buf_ring(runtime_->ring().ring(), capacity_, bgid_,
br_flags_, &err);
if (br_ == nullptr) [[unlikely]] {
throw detail::make_system_error("io_uring_setup_buf_ring", -err);
}

d1.dismiss();
d2.dismiss();
d.dismiss();
}

~BundledProvidedBufferQueue() {
assert(br_ != nullptr);
size_t data_size = capacity_ * sizeof(io_uring_buf);
munmap(br_, data_size);
[[maybe_unused]] int r =
io_uring_unregister_buf_ring(runtime_->ring().ring(), bgid_);
[[maybe_unused]] int r = io_uring_free_buf_ring(runtime_->ring().ring(),
br_, capacity_, bgid_);
assert(r == 0);
if (r == 0) {
runtime_->bgid_pool().recycle(bgid_);
Expand Down Expand Up @@ -274,26 +257,13 @@ class BundledProvidedBufferPool {
auto &bgid_pool = runtime_->bgid_pool();

bgid_ = bgid_pool.allocate();
auto d2 = detail::defer([&]() { bgid_pool.recycle(bgid_); });
auto d = detail::defer([&]() { bgid_pool.recycle(bgid_); });

size_t ring_size = num_buffers_ * sizeof(io_uring_buf);
void *ring_data = mmap(nullptr, ring_size, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
if (ring_data == MAP_FAILED) [[unlikely]] {
throw detail::make_system_error("mmap");
}
auto d1 = detail::defer([&]() { munmap(ring_data, ring_size); });
br_ = reinterpret_cast<io_uring_buf_ring *>(ring_data);
io_uring_buf_ring_init(br_);

io_uring_buf_reg reg = {};
reg.ring_addr = reinterpret_cast<uint64_t>(br_);
reg.ring_entries = num_buffers_;
reg.bgid = bgid_;
int r = io_uring_register_buf_ring(runtime_->ring().ring(), &reg,
br_flags_);
if (r != 0) [[unlikely]] {
throw detail::make_system_error("io_uring_register_buf_ring", -r);
int err = 0;
br_ = io_uring_setup_buf_ring(runtime_->ring().ring(), num_buffers_,
bgid_, br_flags_, &err);
if (br_ == nullptr) [[unlikely]] {
throw detail::make_system_error("io_uring_setup_buf_ring", -err);
}

for (size_t bid = 0; bid < num_buffers_; bid++) {
Expand All @@ -303,21 +273,17 @@ class BundledProvidedBufferPool {
}
io_uring_buf_ring_advance(br_, static_cast<int>(num_buffers_));

d1.dismiss();
d2.dismiss();
d.dismiss();
}

~BundledProvidedBufferPool() {
assert(br_ != nullptr);
[[maybe_unused]] int r =
io_uring_unregister_buf_ring(runtime_->ring().ring(), bgid_);
[[maybe_unused]] int r = io_uring_free_buf_ring(
runtime_->ring().ring(), br_, num_buffers_, bgid_);
assert(r == 0);
if (r == 0) {
runtime_->bgid_pool().recycle(bgid_);
}

size_t ring_size = num_buffers_ * sizeof(io_uring_buf);
munmap(br_, ring_size);
}

public:
Expand Down
Loading