diff --git a/include/condy/runtime.hpp b/include/condy/runtime.hpp index db9bc144..9dcaab26 100644 --- a/include/condy/runtime.hpp +++ b/include/condy/runtime.hpp @@ -23,6 +23,7 @@ #include #include #include +#include namespace condy { @@ -55,8 +56,7 @@ class Runtime { return; } - auto state = state_.load(); - if (state == State::Enabled) { + if (ring_enabled_.load(std::memory_order_relaxed)) { // Fast path: if the ring is enabled, we can directly schedule the // work detail::tsan_release(work); @@ -67,8 +67,7 @@ class Runtime { // Slow path: if the ring is not enabled, we need to acquire the // mutex to ensure the work is scheduled before the ring is enabled std::unique_lock lock(mutex_); - state = state_.load(); - if (state == State::Enabled) { + if (ring_enabled_.load(std::memory_order_relaxed)) { lock.unlock(); detail::tsan_release(work); schedule_msg_ring_( @@ -89,8 +88,7 @@ class Runtime { return; } - auto state = state_.load(); - if (state != State::Enabled) { + if (!ring_enabled_.load(std::memory_order_relaxed)) { return; } @@ -138,41 +136,39 @@ class Runtime { /** * @brief Run the runtime event loop in the current thread. - * @details This function starts the event loop of the runtime in the - * current thread. It will process events, schedule tasks, and handle - * notifications until there are no pending works left. - * @throw std::runtime_error If the runtime is already running or has been - * stopped. - * @note Once exit, the runtime cannot be restarted. + * @details Runs the event loop in the current thread until there are no + * pending works and exit is allowed. Can be called multiple times from + * the same thread. + * @note This function can only be called from one thread. Calling it from + * multiple threads, either concurrently or sequentially is not allowed. */ void run() { - State expected = State::Idle; - bool success = state_.compare_exchange_strong(expected, State::Running); - if (!success) { - throw std::runtime_error( - "Runtime is already running or has been stopped"); - } - auto d1 = detail::defer([this]() { state_.store(State::Stopped); }); - - [[maybe_unused]] int r; - r = io_uring_enable_rings(ring_.ring()); - assert(r == 0); - - { - std::lock_guard lock(mutex_); - flush_global_queue_(); - // Now that the ring is enabled and all pending works are scheduled, - // we can set the state to Enabled. - state_.store(State::Enabled); - } + detail::Context::current().init(this); + auto d = detail::defer([]() { detail::Context::current().reset(); }); - if (!disable_register_ring_fd_) { - r = io_uring_register_ring_fd(ring_.ring()); - assert(r == 1); // 1 indicates success for this call - } + if (run_thread_id_ == std::thread::id()) { + int r = io_uring_enable_rings(ring_.ring()); + if (r < 0) { + throw detail::make_system_error("io_uring_enable_rings", -r); + } - detail::Context::current().init(this); - auto d2 = detail::defer([]() { detail::Context::current().reset(); }); + run_thread_id_ = std::this_thread::get_id(); + + { + std::lock_guard lock(mutex_); + flush_global_queue_(); + ring_enabled_.store(true, std::memory_order_relaxed); + } + + if (!disable_register_ring_fd_) { + io_uring_register_ring_fd(ring_.ring()); + } + } else if (run_thread_id_ == std::this_thread::get_id()) { + flush_ring_(); + } else { + throw std::runtime_error( + "Runtime::run() can only be called from the same thread"); + } while (true) { tick_count_++; @@ -193,6 +189,9 @@ class Runtime { } flush_ring_wait_(); } + + tick_count_ = 0; + exit_allowed_.store(false, std::memory_order_release); } /** @@ -335,8 +334,7 @@ class Runtime { return; } - auto state = state_.load(); - if (state != State::Enabled) { + if (!ring_enabled_.load(std::memory_order_relaxed)) { return; } @@ -418,14 +416,6 @@ class Runtime { } private: - enum class State : uint8_t { - Idle, // Not running - Running, // Started running - Enabled, // Running and ring enabled - Stopped, // Stopped - }; - static_assert(std::atomic::is_always_lock_free); - using WorkListQueue = detail::IntrusiveSingleList; @@ -433,14 +423,15 @@ class Runtime { // Global state std::mutex mutex_; WorkListQueue global_queue_; - size_t pending_works_ = 0; std::atomic_bool exit_allowed_ = false; - std::atomic state_ = State::Idle; + std::atomic_bool ring_enabled_ = false; // Local state WorkListQueue local_queue_; detail::Ring ring_; + size_t pending_works_ = 0; size_t tick_count_ = 0; + std::thread::id run_thread_id_; // Configurable parameters size_t event_interval_ = 61; diff --git a/tests/test_runtime.cpp b/tests/test_runtime.cpp index 68ca62f4..bd3d31f6 100644 --- a/tests/test_runtime.cpp +++ b/tests/test_runtime.cpp @@ -250,3 +250,54 @@ TEST_CASE("test runtime - allow_exit from other thread") { t1.join(); } + +TEST_CASE("test runtime - multiple run calls") { + condy::Runtime runtime(options); + size_t run_count = 0; + + auto func = [&]() -> condy::Coro { + run_count++; + co_return; + }; + + // First run + condy::co_spawn(runtime, func()).detach(); + runtime.allow_exit(); + runtime.run(); + REQUIRE(run_count == 1); + + // Second run should also work + condy::co_spawn(runtime, func()).detach(); + runtime.allow_exit(); + runtime.run(); + REQUIRE(run_count == 2); +} + +TEST_CASE("test runtime - run from different thread throws") { + condy::Runtime runtime(options); + + // First call binds the thread + runtime.allow_exit(); + runtime.run(); + + // Second call from a different thread should throw + std::thread t( + [&]() { REQUIRE_THROWS_AS(runtime.run(), std::runtime_error); }); + t.join(); +} + +TEST_CASE("test runtime - cross-thread schedule between runs") { + condy::Runtime runtime(options); + + runtime.allow_exit(); + runtime.run(); + + SetFinishInvoker invoker; + std::thread t([&]() { runtime.schedule(&invoker); }); + t.join(); + + runtime.allow_exit(); + runtime.run(); + + REQUIRE(invoker.finished); +} diff --git a/tests/test_sync_wait.cpp b/tests/test_sync_wait.cpp index db36fb4a..4cafb1f8 100644 --- a/tests/test_sync_wait.cpp +++ b/tests/test_sync_wait.cpp @@ -71,3 +71,28 @@ TEST_CASE("test sync_wait - with allocator") { REQUIRE(result == 42); REQUIRE(finished); } + +TEST_CASE("test sync_wait - one runtime multiple times") { + bool finished1 = false; + bool finished2 = false; + + auto func1 = [&]() -> condy::Coro { + finished1 = true; + co_return 42; + }; + + auto func2 = [&]() -> condy::Coro { + finished2 = true; + co_return 84; + }; + + condy::Runtime runtime; + + auto result1 = condy::sync_wait(runtime, func1()); + REQUIRE(result1 == 42); + REQUIRE(finished1); + + auto result2 = condy::sync_wait(runtime, func2()); + REQUIRE(result2 == 84); + REQUIRE(finished2); +} \ No newline at end of file