Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions include/condy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "condy/sync_wait.hpp" // IWYU pragma: export
#include "condy/task.hpp" // IWYU pragma: export
#include "condy/version.hpp" // IWYU pragma: export
#include "condy/zcrx.hpp" // IWYU pragma: export

/**
* @brief The main namespace for the Condy library.
Expand Down
20 changes: 20 additions & 0 deletions include/condy/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "condy/condy_uring.hpp"
#include "condy/detail/async_operations.hpp"
#include "condy/detail/helpers.hpp"
#include "condy/zcrx.hpp"

namespace condy {

Expand Down Expand Up @@ -647,6 +648,25 @@ inline auto async_recv_multishot(Fd sockfd, Buffer &buf, int flags,
}
#endif

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15
// TODO: Consider the function signature later...
template <FdLike Fd, typename MultiShotFunc>
inline auto async_recv_multishot(Fd fd, ZeroCopyRxBufferPool &pool,
[[maybe_unused]] int flags,
MultiShotFunc &&func) {
auto zcrx_id = pool.zcrx_id();
auto prep_func = [=](detail::Ring *ring) {
auto *sqe = ring->get_sqe();
detail::prep_recv_zc_multishot(sqe, fd, zcrx_id);
return sqe;
};
auto op = build_multishot_op_awaiter<
SelectBufferCQEHandler<ZeroCopyRxBufferPool>>(
std::move(prep_func), std::forward<MultiShotFunc>(func), &pool);
return detail::maybe_flag_fixed_fd(std::move(op), fd);
}
#endif

/**
* @brief See io_uring_prep_openat2
*/
Expand Down
4 changes: 3 additions & 1 deletion include/condy/cqe_handler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,9 @@ struct SimpleCQEHandler {
* result of the operation (the value of `cqe->res`) and the selected buffer,
* whose type is determined by the buffer ring.
*/
template <BufferRingLike Br> class SelectBufferCQEHandler {
template <typename Br>
requires(requires(Br *br, io_uring_cqe *cqe) { br->handle_finish(cqe); })
class SelectBufferCQEHandler {
public:
SelectBufferCQEHandler(Br *buffers) : buffers_(buffers) {}

Expand Down
9 changes: 9 additions & 0 deletions include/condy/detail/async_operations.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,14 @@ inline void prep_sendto_zc_fixed(io_uring_sqe *sqe, int sockfd, const void *buf,
sqe->buf_index = buf_index;
}

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15
inline void prep_recv_zc_multishot(io_uring_sqe *sqe, int fd,
uint32_t zcrx_id) {
io_uring_prep_rw(IORING_OP_RECV_ZC, sqe, fd, nullptr, 0, 0);
sqe->ioprio |= IORING_RECV_MULTISHOT;
sqe->zcrx_ifq_idx = zcrx_id;
}
#endif

} // namespace detail
} // namespace condy
295 changes: 295 additions & 0 deletions include/condy/zcrx.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,295 @@
#pragma once

#include "condy/detail/buffers.hpp"
#include "condy/detail/context.hpp"
#include "condy/detail/utils.hpp"
#include "condy/runtime.hpp"
#include <sys/mman.h>

namespace condy {

#if !IO_URING_CHECK_VERSION(2, 15) // >= 2.15

class ZeroCopyRxBufferPool;

/**
* @brief Buffer from a ZeroCopyRxBufferPool.
* @details This buffer type is used for buffers obtained from a
* ZeroCopyRxBufferPool. It automatically returns the buffer to the pool when it
* is out of scope.
* @note The lifetime of the buffer must not exceed the lifetime of the
* ZeroCopyRxBufferPool it is associated with.
*/
using ZeroCopyRxBuffer = detail::ManagedBuffer<ZeroCopyRxBufferPool>;

/**
* @brief Area for zero-copy receive buffers.
*/
struct ZeroCopyRxArea {
void *addr = nullptr;
size_t size;
};

/**
* @brief Area for zero-copy receive buffers using DMA-BUF.
*/
struct ZeroCopyRxDMABufArea {
int dmabuf_fd;
size_t offset;
size_t size;
};

/**
* @brief Buffer pool for zero-copy receive buffers.
* @details This buffer pool utilizes the io_uring zcrx feature to provide
* zero-copy receive buffers. It can be used to receive data directly into
* user-space buffers without copying, which can improve performance for
* high-throughput network applications.
* @returns std::pair<int32_t, ZeroCopyRxBuffer> When passed to async
* operations, the return type will be a pair of the operation result and the
* @ref ZeroCopyRxBuffer.
* @note The lifetime of this pool must not exceed the running period of the
* associated Runtime, and the lifetime of any ZeroCopyRxBuffer obtained from
* this pool must not exceed the lifetime of this pool.
*/
class ZeroCopyRxBufferPool {
public:
/**
* @brief Construct a new Zero Copy Rx Buffer Pool object
* @param if_idx Network interface index to register the buffer pool with.
* @param if_rxq Receive queue index to register the buffer pool with.
* @param rq_entries Number of receive queue entries.
* @param area Area for zero-copy receive buffers.
*/
ZeroCopyRxBufferPool(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries,
const ZeroCopyRxArea &area)
: ZeroCopyRxBufferPool(*detail::Context::current().runtime(), if_idx,
if_rxq, rq_entries, area) {}

/**
* @brief Construct a new Zero Copy Rx Buffer Pool object
* @param runtime The runtime to register the buffer pool with.
* @param if_idx Network interface index to register the buffer pool with.
* @param if_rxq Receive queue index to register the buffer pool with.
* @param rq_entries Number of receive queue entries.
* @param area Area for zero-copy receive buffers.
*/
ZeroCopyRxBufferPool(Runtime &runtime, uint32_t if_idx, uint32_t if_rxq,
uint32_t rq_entries, const ZeroCopyRxArea &area)
: ZeroCopyRxBufferPool(runtime.ring(), if_idx, if_rxq, rq_entries, area,
0) {}

// Device-less constructor, DO NOT use this in production code if you don't
// know what you are doing.
ZeroCopyRxBufferPool(Runtime &runtime, uint32_t rq_entries,
const ZeroCopyRxArea &area)
: ZeroCopyRxBufferPool(runtime.ring(), 0, 0, rq_entries, area,
ZCRX_REG_NODEV) {
device_less_ = true;
}

/**
* @brief Construct a new Zero Copy Rx Buffer Pool object
* @param if_idx Network interface index to register the buffer pool with.
* @param if_rxq Receive queue index to register the buffer pool with.
* @param rq_entries Number of receive queue entries.
* @param area Area for zero-copy receive buffers using DMA-BUF.
*/
ZeroCopyRxBufferPool(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries,
const ZeroCopyRxDMABufArea &area)
: ZeroCopyRxBufferPool(*detail::Context::current().runtime(), if_idx,
if_rxq, rq_entries, area) {}

/**
* @brief Construct a new Zero Copy Rx Buffer Pool object
* @param runtime The runtime to register the buffer pool with.
* @param if_idx Network interface index to register the buffer pool with.
* @param if_rxq Receive queue index to register the buffer pool with.
* @param rq_entries Number of receive queue entries.
* @param area Area for zero-copy receive buffers using DMA-BUF.
*/
ZeroCopyRxBufferPool(Runtime &runtime, uint32_t if_idx, uint32_t if_rxq,
uint32_t rq_entries, const ZeroCopyRxDMABufArea &area)
: ring_(&runtime.ring()) {
area_size_ = 0;
area_ptr_ = nullptr;

io_uring_zcrx_area_reg area_reg = {};
area_reg.addr = area.offset;
area_reg.len = area.size;
area_reg.flags = IORING_ZCRX_AREA_DMABUF;

register_ifq_(if_idx, if_rxq, rq_entries, area_reg,
sysconf(_SC_PAGESIZE), 0);
}

~ZeroCopyRxBufferPool() {
[[maybe_unused]] int r;
if (area_size_ > 0) {
assert(area_ptr_ != nullptr);
r = munmap(area_ptr_, area_size_);
assert(r == 0);
}
assert(rq_ring_.ring_ptr != nullptr);
r = munmap(rq_ring_.ring_ptr, ring_size_);
assert(r == 0);
// TODO: Unregister ifq
}

CONDY_DELETE_COPY_MOVE(ZeroCopyRxBufferPool);

private:
ZeroCopyRxBufferPool(detail::Ring &ring, uint32_t if_idx, uint32_t if_rxq,
uint32_t rq_entries, const ZeroCopyRxArea &area,
uint32_t flags)
: ring_(&ring) {
const size_t page_size = sysconf(_SC_PAGESIZE);

if (area.addr == nullptr) {
area_size_ = detail::align_up(area.size, page_size);
area_ptr_ = mmap(nullptr, area_size_, PROT_READ | PROT_WRITE,
MAP_ANONYMOUS | MAP_PRIVATE, 0, 0);
if (area_ptr_ == MAP_FAILED) {
throw detail::make_system_error("mmap");
}
auto d = detail::defer([&]() { munmap(area_ptr_, area_size_); });

io_uring_zcrx_area_reg area_reg = {};
area_reg.addr = reinterpret_cast<uint64_t>(area_ptr_);
area_reg.len = area_size_;
area_reg.flags = 0;

register_ifq_(if_idx, if_rxq, rq_entries, area_reg, page_size,
flags);

d.dismiss();
} else {
// Not owned, so we don't track the size for unmapping
area_size_ = 0;
area_ptr_ = area.addr;

io_uring_zcrx_area_reg area_reg = {};
area_reg.addr = reinterpret_cast<uint64_t>(area_ptr_);
area_reg.len = area.size;
area_reg.flags = 0;

register_ifq_(if_idx, if_rxq, rq_entries, area_reg, page_size,
flags);
}
}

public:
uint32_t zcrx_id() const noexcept { return zcrx_id_; }

ZeroCopyRxBuffer handle_finish(io_uring_cqe *cqe) noexcept {
if (cqe->res < 0) {
return ZeroCopyRxBuffer();
}
io_uring_zcrx_cqe *rcqe =
reinterpret_cast<io_uring_zcrx_cqe *>(cqe + 1);
void *data = static_cast<char *>(area_ptr_) +
(rcqe->off & ~IORING_ZCRX_AREA_MASK);
size_t size = static_cast<size_t>(cqe->res);
return ZeroCopyRxBuffer(data, size, this);
}

void add_buffer_back(void *ptr, size_t size) noexcept {
rq_enqueue_(ptr, size);
maybe_flush_rq_();
}

private:
void register_ifq_(uint32_t if_idx, uint32_t if_rxq, uint32_t rq_entries,
io_uring_zcrx_area_reg &area_reg, size_t page_size,
uint32_t flags) {
rq_entries = std::bit_ceil(rq_entries);
io_uring_region_desc region_reg = {};
ring_size_ = get_refill_ring_size_(rq_entries, page_size);
region_reg.user_addr = 0;
region_reg.size = ring_size_;
region_reg.flags = 0;

io_uring_zcrx_ifq_reg reg = {};
reg.if_idx = if_idx;
reg.if_rxq = if_rxq;
reg.rq_entries = rq_entries;
reg.area_ptr = reinterpret_cast<uint64_t>(&area_reg);
reg.region_ptr = reinterpret_cast<uint64_t>(&region_reg);
reg.flags = flags;

int r = io_uring_register_ifq(ring_->ring(), &reg);
if (r != 0) {
throw detail::make_system_error("io_uring_register_ifq", -r);
}
// TODO: unregister ifq if any exception

void *ring_ptr = mmap(nullptr, ring_size_, PROT_READ | PROT_WRITE,
MAP_SHARED | MAP_POPULATE, ring_->ring()->ring_fd,
static_cast<off_t>(region_reg.mmap_offset));
if (ring_ptr == MAP_FAILED) {
throw detail::make_system_error("mmap");
}
rq_ring_.khead = (unsigned int *)((char *)ring_ptr + reg.offsets.head);
rq_ring_.ktail = (unsigned int *)((char *)ring_ptr + reg.offsets.tail);
rq_ring_.rqes =
(struct io_uring_zcrx_rqe *)((char *)ring_ptr + reg.offsets.rqes);
rq_ring_.rq_tail = 0;
rq_ring_.ring_entries = reg.rq_entries;
rq_ring_.ring_ptr = ring_ptr;

zcrx_id_ = reg.zcrx_id;
area_token_ = area_reg.rq_area_token;
}

static size_t get_refill_ring_size_(uint32_t rq_entries,
size_t page_size) noexcept {
size_t ring_size = rq_entries * sizeof(io_uring_zcrx_rqe);
ring_size += page_size;
ring_size = detail::align_up(ring_size, page_size);
return ring_size;
}

size_t rq_nr_queued_() const noexcept {
return rq_ring_.rq_tail - io_uring_smp_load_acquire(rq_ring_.khead);
}

void rq_enqueue_(void *ptr, size_t size) noexcept {
assert(rq_nr_queued_() < rq_ring_.ring_entries);
io_uring_zcrx_rqe *rqe;
unsigned rq_mask = rq_ring_.ring_entries - 1;
rqe = &rq_ring_.rqes[rq_ring_.rq_tail & rq_mask];
rqe->off = (static_cast<char *>(ptr) - static_cast<char *>(area_ptr_)) |
area_token_;
rqe->len = static_cast<uint32_t>(size);
io_uring_smp_store_release(rq_ring_.ktail, ++rq_ring_.rq_tail);
}

void flush_rq_() noexcept {
zcrx_ctrl ctrl = {};
ctrl.zcrx_id = zcrx_id_;
ctrl.op = ZCRX_CTRL_FLUSH_RQ;
[[maybe_unused]] int r =
io_uring_register_zcrx_ctrl(ring_->ring(), &ctrl);
assert(r == 0);
}

void maybe_flush_rq_() noexcept {
if (rq_nr_queued_() >= rq_ring_.ring_entries || device_less_) {
flush_rq_();
}
}

private:
detail::Ring *ring_;
void *area_ptr_;
size_t area_size_;
size_t ring_size_;
io_uring_zcrx_rq rq_ring_;
uint32_t zcrx_id_;
uint64_t area_token_;
bool device_less_ = false;
};

#endif

} // namespace condy
Loading
Loading