Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 40 additions & 49 deletions include/condy/runtime.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
#include <cstring>
#include <limits>
#include <mutex>
#include <thread>

namespace condy {

Expand Down Expand Up @@ -55,8 +56,7 @@ class Runtime {
return;
}

auto state = state_.load();
if (state == State::Enabled) {
if (ring_enabled_.load(std::memory_order_relaxed)) {
// Fast path: if the ring is enabled, we can directly schedule the
// work
detail::tsan_release(work);
Expand All @@ -67,8 +67,7 @@ class Runtime {
// Slow path: if the ring is not enabled, we need to acquire the
// mutex to ensure the work is scheduled before the ring is enabled
std::unique_lock<std::mutex> lock(mutex_);
state = state_.load();
if (state == State::Enabled) {
if (ring_enabled_.load(std::memory_order_relaxed)) {
lock.unlock();
detail::tsan_release(work);
schedule_msg_ring_(
Expand All @@ -89,8 +88,7 @@ class Runtime {
return;
}

auto state = state_.load();
if (state != State::Enabled) {
if (!ring_enabled_.load(std::memory_order_relaxed)) {
return;
}

Expand Down Expand Up @@ -138,41 +136,39 @@ class Runtime {

/**
* @brief Run the runtime event loop in the current thread.
* @details This function starts the event loop of the runtime in the
* current thread. It will process events, schedule tasks, and handle
* notifications until there are no pending works left.
* @throw std::runtime_error If the runtime is already running or has been
* stopped.
* @note Once exit, the runtime cannot be restarted.
* @details Runs the event loop in the current thread until there are no
* pending works and exit is allowed. Can be called multiple times from
* the same thread.
* @note This function can only be called from one thread. Calling it from
* multiple threads, either concurrently or sequentially is not allowed.
*/
void run() {
State expected = State::Idle;
bool success = state_.compare_exchange_strong(expected, State::Running);
if (!success) {
throw std::runtime_error(
"Runtime is already running or has been stopped");
}
auto d1 = detail::defer([this]() { state_.store(State::Stopped); });

[[maybe_unused]] int r;
r = io_uring_enable_rings(ring_.ring());
assert(r == 0);

{
std::lock_guard<std::mutex> lock(mutex_);
flush_global_queue_();
// Now that the ring is enabled and all pending works are scheduled,
// we can set the state to Enabled.
state_.store(State::Enabled);
}
detail::Context::current().init(this);
auto d = detail::defer([]() { detail::Context::current().reset(); });

if (!disable_register_ring_fd_) {
r = io_uring_register_ring_fd(ring_.ring());
assert(r == 1); // 1 indicates success for this call
}
if (run_thread_id_ == std::thread::id()) {
int r = io_uring_enable_rings(ring_.ring());
if (r < 0) {
throw detail::make_system_error("io_uring_enable_rings", -r);
}

detail::Context::current().init(this);
auto d2 = detail::defer([]() { detail::Context::current().reset(); });
run_thread_id_ = std::this_thread::get_id();

{
std::lock_guard<std::mutex> lock(mutex_);
flush_global_queue_();
ring_enabled_.store(true, std::memory_order_relaxed);
}

if (!disable_register_ring_fd_) {
io_uring_register_ring_fd(ring_.ring());
}
} else if (run_thread_id_ == std::this_thread::get_id()) {
flush_ring_();
} else {
throw std::runtime_error(
"Runtime::run() can only be called from the same thread");
}

while (true) {
tick_count_++;
Expand All @@ -193,6 +189,9 @@ class Runtime {
}
flush_ring_wait_();
}

tick_count_ = 0;
exit_allowed_.store(false, std::memory_order_release);
}

/**
Expand Down Expand Up @@ -335,8 +334,7 @@ class Runtime {
return;
}

auto state = state_.load();
if (state != State::Enabled) {
if (!ring_enabled_.load(std::memory_order_relaxed)) {
return;
}

Expand Down Expand Up @@ -418,29 +416,22 @@ class Runtime {
}

private:
enum class State : uint8_t {
Idle, // Not running
Running, // Started running
Enabled, // Running and ring enabled
Stopped, // Stopped
};
static_assert(std::atomic<State>::is_always_lock_free);

using WorkListQueue =
detail::IntrusiveSingleList<detail::WorkInvoker,
&detail::WorkInvoker::work_queue_entry_>;

// Global state
std::mutex mutex_;
WorkListQueue global_queue_;
size_t pending_works_ = 0;
std::atomic_bool exit_allowed_ = false;
std::atomic<State> state_ = State::Idle;
std::atomic_bool ring_enabled_ = false;

// Local state
WorkListQueue local_queue_;
detail::Ring ring_;
size_t pending_works_ = 0;
size_t tick_count_ = 0;
std::thread::id run_thread_id_;

// Configurable parameters
size_t event_interval_ = 61;
Expand Down
51 changes: 51 additions & 0 deletions tests/test_runtime.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,54 @@ TEST_CASE("test runtime - allow_exit from other thread") {

t1.join();
}

TEST_CASE("test runtime - multiple run calls") {
condy::Runtime runtime(options);
size_t run_count = 0;

auto func = [&]() -> condy::Coro<void> {
run_count++;
co_return;
};

// First run
condy::co_spawn(runtime, func()).detach();
runtime.allow_exit();
runtime.run();
REQUIRE(run_count == 1);

// Second run should also work
condy::co_spawn(runtime, func()).detach();
runtime.allow_exit();
runtime.run();
REQUIRE(run_count == 2);
}

TEST_CASE("test runtime - run from different thread throws") {
condy::Runtime runtime(options);

// First call binds the thread
runtime.allow_exit();
runtime.run();

// Second call from a different thread should throw
std::thread t(
[&]() { REQUIRE_THROWS_AS(runtime.run(), std::runtime_error); });
t.join();
}

TEST_CASE("test runtime - cross-thread schedule between runs") {
condy::Runtime runtime(options);

runtime.allow_exit();
runtime.run();

SetFinishInvoker invoker;
std::thread t([&]() { runtime.schedule(&invoker); });
t.join();

runtime.allow_exit();
runtime.run();

REQUIRE(invoker.finished);
}
25 changes: 25 additions & 0 deletions tests/test_sync_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,28 @@ TEST_CASE("test sync_wait - with allocator") {
REQUIRE(result == 42);
REQUIRE(finished);
}

TEST_CASE("test sync_wait - one runtime multiple times") {
bool finished1 = false;
bool finished2 = false;

auto func1 = [&]() -> condy::Coro<int> {
finished1 = true;
co_return 42;
};

auto func2 = [&]() -> condy::Coro<int> {
finished2 = true;
co_return 84;
};

condy::Runtime runtime;

auto result1 = condy::sync_wait(runtime, func1());
REQUIRE(result1 == 42);
REQUIRE(finished1);

auto result2 = condy::sync_wait(runtime, func2());
REQUIRE(result2 == 84);
REQUIRE(finished2);
}
Loading