From 3e039293a081608ad42ecfbb13b7324e24672e2a Mon Sep 17 00:00:00 2001 From: Arthur Vasseur Date: Mon, 6 Apr 2026 17:25:07 +0200 Subject: [PATCH] feat: implement ThreadPool class with task management and worker threads --- .github/workflows/coverage.yml | 2 +- .github/workflows/macos.yml | 15 +- .github/workflows/ubuntu24.yml | 18 +- Src/Concerto/Core/ThreadPool/ThreadPool.cpp | 136 +++++ Src/Concerto/Core/ThreadPool/ThreadPool.hpp | 115 ++++ Src/Concerto/Core/ThreadPool/ThreadPool.inl | 137 +++++ Src/Tests/ThreadPool.cpp | 583 ++++++++++++++++++++ xmake.lua | 10 + 8 files changed, 1003 insertions(+), 13 deletions(-) create mode 100644 Src/Concerto/Core/ThreadPool/ThreadPool.cpp create mode 100644 Src/Concerto/Core/ThreadPool/ThreadPool.hpp create mode 100644 Src/Concerto/Core/ThreadPool/ThreadPool.inl create mode 100644 Src/Tests/ThreadPool.cpp diff --git a/.github/workflows/coverage.yml b/.github/workflows/coverage.yml index e118ad2..b3a7e4c 100644 --- a/.github/workflows/coverage.yml +++ b/.github/workflows/coverage.yml @@ -94,7 +94,7 @@ jobs: if: runner.os == 'Linux' run: | xmake run concerto-core-tests - gcovr -x coverage.out -s -f "Src/Concerto/Core/.*" build/.objs/ + gcovr -x coverage.out -s -f "Src/Concerto/Core/.*" --gcov-ignore-parse-errors=negative_hits.warn build/.objs/ - name: Run unit tests and generate coverage output (Windows) if: runner.os == 'Windows' diff --git a/.github/workflows/macos.yml b/.github/workflows/macos.yml index f37638e..203a462 100644 --- a/.github/workflows/macos.yml +++ b/.github/workflows/macos.yml @@ -15,14 +15,17 @@ jobs: strategy: fail-fast: false matrix: - os: [macos-14] - arch: [x86_64, arm64] + os: [macos-15] + arch: [arm64] mode: [debug, release, releasedbg] kind: [static, shared] steps: - - name: Update clang to 18 - run: brew install llvm@18 + - name: Update clang to 20 + run: | + brew install llvm@20 && brew link llvm@20 + LLVM_PREFIX=$(brew --prefix llvm@20) + echo "LLVM_PREFIX=${LLVM_PREFIX}" >> $GITHUB_ENV - name: Get current date as package key id: cache_key @@ -55,7 +58,7 @@ jobs: key: macOS-${{ matrix.arch }}-${{ matrix.mode }}-${{ matrix.kind }}-${{ steps.dep_hash.outputs.hash }}-W${{ steps.cache_key.outputs.key }} - name: Configure xmake and install dependencies - run: xmake config --arch=${{ matrix.arch }} --mode=${{ matrix.mode }} -k ${{ matrix.kind }} --yes --cc=$(brew --prefix llvm@18)/bin/clang-18 --cxx=$(brew --prefix llvm@18)/bin/clang++ --tests=y + run: xmake config -vD --arch=${{ matrix.arch }} --mode=${{ matrix.mode }} -k ${{ matrix.kind }} --yes --toolchain=llvm --runtimes=c++_shared --sdk=${LLVM_PREFIX} --tests=y - name: Save cached xmake dependencies if: ${{ !steps.restore-depcache.outputs.cache-hit }} @@ -65,7 +68,7 @@ jobs: key: ${{ steps.restore-depcache.outputs.cache-primary-key }} - name: Build - run: xmake --yes -v + run: xmake --yes -vD - name: Run unit tests run: xmake run concerto-core-tests \ No newline at end of file diff --git a/.github/workflows/ubuntu24.yml b/.github/workflows/ubuntu24.yml index c8307f5..9e720cd 100644 --- a/.github/workflows/ubuntu24.yml +++ b/.github/workflows/ubuntu24.yml @@ -17,17 +17,23 @@ jobs: strategy: matrix: - os: [ubuntu-24.04] + os: [ubuntu-latest] arch: [x86_64, arm64] mode: [debug, release, releasedbg] kind: [static, shared] steps: - - uses: actions/checkout@v4 + - uses: actions/checkout@v5 - - name: Set gcc/g++ to 13 + - name: Set gcc/g++ to latest available run: | - ln -s /usr/bin/g++-13 /usr/local/bin/g++ - ln -s /usr/bin/gcc-13 /usr/local/bin/gcc + # Try gcc-14 first, fall back to gcc-13 + if command -v g++-14 &> /dev/null; then + ln -sf /usr/bin/g++-14 /usr/local/bin/g++ + ln -sf /usr/bin/gcc-14 /usr/local/bin/gcc + else + ln -sf /usr/bin/g++-13 /usr/local/bin/g++ + ln -sf /usr/bin/gcc-13 /usr/local/bin/gcc + fi - name: Get current date as package key id: cache_key @@ -39,7 +45,7 @@ jobs: - name: Setup xmake uses: xmake-io/github-action-setup-xmake@v1 with: - xmake-version: branch@dev + xmake-version: latest actions-cache-folder: .xmake-cache-W${{ steps.cache_key.outputs.key }} - name: Update xmake repo diff --git a/Src/Concerto/Core/ThreadPool/ThreadPool.cpp b/Src/Concerto/Core/ThreadPool/ThreadPool.cpp new file mode 100644 index 0000000..aef4e87 --- /dev/null +++ b/Src/Concerto/Core/ThreadPool/ThreadPool.cpp @@ -0,0 +1,136 @@ +// +// Created by arthur on 28/10/2025. +// + +#include "Concerto/Core/ThreadPool/ThreadPool.hpp" + +namespace cct +{ + ThreadPool::ThreadPool(unsigned int numThreads) + { + if (numThreads == 0) + { + numThreads = std::thread::hardware_concurrency(); + if (numThreads == 0) + { + numThreads = 1; + } + } + + m_workers.reserve(numThreads); + + for (unsigned int i = 0; i < numThreads; ++i) + { + m_workers.emplace_back([this, i]() + { WorkerLoop(i + 1); }); + } + } + + ThreadPool::~ThreadPool() noexcept + { + RequestStop(); + } + + void ThreadPool::WorkerLoop(unsigned int workerIndex) + { + while (true) + { + std::function task; + { + std::unique_lock lock(m_queueMutex); + + m_queueCv.wait(lock, [this]() + { return !m_taskQueue.empty() || + m_stopRequested.load(std::memory_order_acquire); }); + + if (m_stopRequested.load(std::memory_order_acquire) && m_taskQueue.empty()) + break; + + if (m_taskQueue.empty()) + continue; + + task = std::move(m_taskQueue.front()); + m_taskQueue.pop_front(); + } + + if (task) + { + try + { + task(); + } + catch (const std::exception& e) + { + std::cerr << "[ThreadPool::Worker#" << workerIndex + << "] Exception caught: " << e.what() << '\n'; + } + catch (...) + { + std::cerr << "[ThreadPool::Worker#" << workerIndex + << "] Unknown exception caught\n"; + } + TaskCompleted(); + } + } + } + + void ThreadPool::TaskCompleted() noexcept + { + size_t prev = m_tasksInFlight.fetch_sub(1, std::memory_order_acq_rel); + + if (prev == 1) + { + std::lock_guard lock(m_waitMutex); + m_waitCv.notify_all(); + } + } + + bool ThreadPool::Wait(std::chrono::steady_clock::time_point deadline) + { + std::unique_lock lock(m_waitMutex); + + return m_waitCv.wait_until(lock, deadline, [this]() + { return m_tasksInFlight.load(std::memory_order_acquire) == 0; }); + } + + bool ThreadPool::WaitFor(std::chrono::milliseconds timeout) + { + auto deadline = std::chrono::steady_clock::now() + timeout; + return Wait(deadline); + } + + void ThreadPool::RequestStop() noexcept + { + bool expected = false; + if (!m_stopRequested.compare_exchange_strong(expected, true, + std::memory_order_acq_rel)) + { + // Stop already requested, wait for workers to finish + for (auto& worker : m_workers) + { + if (worker.joinable()) + worker.join(); + } + return; + } + + // Notify all workers while holding the lock to ensure they see the stop flag + { + std::lock_guard lock(m_queueMutex); + m_queueCv.notify_all(); + } + + // Wait for all workers to finish + for (auto& worker : m_workers) + { + if (worker.joinable()) + worker.join(); + } + } + + size_t ThreadPool::GetWorkerCount() const noexcept + { + return m_workers.size(); + } + +} // namespace cct diff --git a/Src/Concerto/Core/ThreadPool/ThreadPool.hpp b/Src/Concerto/Core/ThreadPool/ThreadPool.hpp new file mode 100644 index 0000000..25c071b --- /dev/null +++ b/Src/Concerto/Core/ThreadPool/ThreadPool.hpp @@ -0,0 +1,115 @@ +// +// Created by arthur on 28/10/2025. +// + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "Concerto/Core/Types/Types.hpp" + +namespace cct +{ + + class CCT_CORE_PUBLIC_API ThreadPool + { + public: + explicit ThreadPool(unsigned int numThreads = 0); + ~ThreadPool() noexcept; + + ThreadPool(const ThreadPool&) = delete; + ThreadPool& operator=(const ThreadPool&) = delete; + ThreadPool(ThreadPool&&) = delete; + ThreadPool& operator=(ThreadPool&&) = delete; + + /** + * @brief Adds a task to the pool without returning a result. + * + * @tparam F Callable type that can be invoked with no arguments. + * @param f Callable to execute. + * + * @note This method is thread-safe. Tasks are not accepted after + * RequestStop(). + */ + template + requires std::invocable> && + std::is_void_v>> + void AddTask(F&& f); + + /** + * @brief Submits a task and returns a future for its result. + * + * @tparam F Callable type. + * @return std::future with the result of the callable. + * + * @note This method is thread-safe. Tasks are not accepted after + * RequestStop(). + */ + template + requires std::invocable> + auto Submit(F&& f) -> std::future>>; + + /** + * @brief Waits until all in-flight tasks complete or deadline is reached. + * + * @param deadline Absolute time point to wait until. + * @return true if all tasks completed before deadline, false otherwise. + */ + bool Wait(std::chrono::steady_clock::time_point deadline); + + /** + * @brief Waits until all in-flight tasks complete or timeout expires. + * + * @param timeout Duration to wait. + * @return true if all tasks completed before timeout, false otherwise. + */ + bool WaitFor(std::chrono::milliseconds timeout); + + /** + * @brief Requests graceful shutdown. No new tasks will be accepted. + * + * Workers will finish current tasks and then exit. + * This method is idempotent and thread-safe. + */ + void RequestStop() noexcept; + + size_t GetWorkerCount() const noexcept; + + private: + void WorkerLoop(unsigned int workerIndex); + + void TaskCompleted() noexcept; + + // Thread management + std::vector m_workers; + + // Task queue and synchronization + std::deque> m_taskQueue; + std::mutex m_queueMutex; + std::condition_variable m_queueCv; + + // Wait synchronization + std::atomic m_tasksInFlight{0}; + std::condition_variable m_waitCv; + std::mutex m_waitMutex; + + // State + std::atomic m_stopRequested{false}; + }; + +} // namespace cct + +#include "ThreadPool.inl" diff --git a/Src/Concerto/Core/ThreadPool/ThreadPool.inl b/Src/Concerto/Core/ThreadPool/ThreadPool.inl new file mode 100644 index 0000000..0906a1a --- /dev/null +++ b/Src/Concerto/Core/ThreadPool/ThreadPool.inl @@ -0,0 +1,137 @@ +// +// Created by arthur on 28/10/2025. +// + +#include +#include +#include "Concerto/Core/Assert.hpp" + +namespace cct +{ + template + requires std::invocable> && std::is_void_v>> + void ThreadPool::AddTask(F&& f) + { + if (m_stopRequested.load(std::memory_order_acquire)) + return; + + m_tasksInFlight.fetch_add(1, std::memory_order_acq_rel); + + auto wrapped = [func = std::forward(f), this]() mutable + { + try + { + func(); + } + catch (const std::exception& e) + { + std::cerr << "[ThreadPool] Task threw exception: " << e.what() << '\n'; + } + catch (...) + { + std::cerr << "[ThreadPool] Task threw unknown exception" << '\n'; + } + }; + + { + std::lock_guard lock(m_queueMutex); + + if (m_stopRequested.load(std::memory_order_acquire)) + { + m_tasksInFlight.fetch_sub(1, std::memory_order_acq_rel); + return; + } + + m_taskQueue.emplace_back(std::move(wrapped)); + } + + m_queueCv.notify_all(); + } + + template + requires std::invocable> + auto ThreadPool::Submit(F&& f) -> std::future>> + { + using ReturnType = std::invoke_result_t>; + + auto promise = std::make_shared>(); + auto func = std::make_shared>(std::forward(f)); + std::future result = promise->get_future(); + + if (m_stopRequested.load(std::memory_order_acquire)) + { + promise->set_exception(std::make_exception_ptr(std::runtime_error("ThreadPool is shutting down"))); + return result; + } + + m_tasksInFlight.fetch_add(1, std::memory_order_acq_rel); + + auto wrapped_task = [promise, func]() + { + std::exception_ptr ex_ptr; + + try + { + if constexpr (std::is_void_v) + { + (*func)(); + promise->set_value(); + } + else + { + promise->set_value((*func)()); + } + } + catch (...) + { + ex_ptr = std::current_exception(); + } + + if (ex_ptr) + { + try + { + promise->set_exception(ex_ptr); + } + catch (const std::future_error& e) + { + CCT_ASSERT_FALSE("[ThreadPool] Future error when setting exception: {}", e.what()); + } + catch (const std::exception& e) + { + CCT_ASSERT_FALSE("[ThreadPool] Error setting exception in promise: {}", e.what()); + } + catch (...) + { + CCT_ASSERT_FALSE("[ThreadPool] Unknown error setting exception in promise"); + } + } + }; + + { + std::lock_guard lock(m_queueMutex); + + if (m_stopRequested.load(std::memory_order_acquire)) + { + m_tasksInFlight.fetch_sub(1, std::memory_order_acq_rel); + + try + { + promise->set_exception(std::make_exception_ptr(std::runtime_error("ThreadPool is shutting down"))); + } + catch (...) + { + } + + return result; + } + + m_taskQueue.emplace_back(std::move(wrapped_task)); + } + + m_queueCv.notify_all(); + + return result; + } + +} // namespace cct diff --git a/Src/Tests/ThreadPool.cpp b/Src/Tests/ThreadPool.cpp new file mode 100644 index 0000000..26bf8e5 --- /dev/null +++ b/Src/Tests/ThreadPool.cpp @@ -0,0 +1,583 @@ +/** + * @file Tests/ThreadPool.cpp + * @brief Unit tests for ThreadPool + * @date 2025-10-31 + */ + +#include +#include +#include +#include +#include +#include + +#include + +#include + +using cct::ThreadPool; +using namespace std::chrono_literals; + +TEST(ThreadPoolInitialization, DefaultConstruction) +{ + ThreadPool pool; + EXPECT_GT(pool.GetWorkerCount(), 0u); +} + +TEST(ThreadPoolInitialization, ExplicitThreadCount) +{ + ThreadPool pool(4); + EXPECT_EQ(pool.GetWorkerCount(), 4u); +} + +TEST(ThreadPoolInitialization, SingleThreadPool) +{ + ThreadPool pool(1); + EXPECT_EQ(pool.GetWorkerCount(), 1u); +} + +TEST(ThreadPoolAddTask, SingleTaskExecution) +{ + ThreadPool pool(4); + std::atomic executed{false}; + + pool.AddTask( + [&executed]() + { executed.store(true, std::memory_order_relaxed); }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_TRUE(executed.load()); +} + +TEST(ThreadPoolAddTask, MultipleTasksExecution) +{ + ThreadPool pool(4); + std::atomic counter{0}; + constexpr int numTasks = 100; + + for (int i = 0; i < numTasks; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + EXPECT_EQ(counter.load(), numTasks); +} + +TEST(ThreadPoolAddTask, TaskWithSharedState) +{ + ThreadPool pool(4); + std::atomic sum{0}; + constexpr int numTasks = 50; + + for (int i = 1; i <= numTasks; ++i) + { + pool.AddTask([&sum, i]() + { sum.fetch_add(i, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + EXPECT_EQ(sum.load(), (numTasks * (numTasks + 1)) / 2); +} + +TEST(ThreadPoolSubmit, ReturningInt) +{ + ThreadPool pool(4); + auto future = pool.Submit([]() + { return 42; }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_EQ(future.get(), 42); +} + +TEST(ThreadPoolSubmit, ReturningString) +{ + ThreadPool pool(4); + auto future = pool.Submit([]() + { return std::string("Hello, ThreadPool!"); }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_EQ(future.get(), "Hello, ThreadPool!"); +} + +TEST(ThreadPoolSubmit, WithComputation) +{ + ThreadPool pool(4); + auto future = pool.Submit([]() + { + int sum = 0; + for (int i = 1; i <= 100; ++i) + sum += i; + return sum; }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_EQ(future.get(), 5050); +} + +TEST(ThreadPoolSubmit, MultipleSubmitCalls) +{ + ThreadPool pool(4); + std::vector> futures; + constexpr int numTasks = 20; + + for (int i = 0; i < numTasks; ++i) + { + futures.push_back(pool.Submit([i]() + { return i * i; })); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + + for (int i = 0; i < numTasks; ++i) + { + EXPECT_EQ(futures[i].get(), i * i); + } +} + +TEST(ThreadPoolExceptionHandling, TaskThrowingException) +{ + ThreadPool pool(4); + auto future = + pool.Submit([]() -> int + { throw std::runtime_error("Test exception"); }); + + EXPECT_TRUE(pool.WaitFor(1000ms)); + EXPECT_THROW(future.get(), std::runtime_error); +} + +TEST(ThreadPoolExceptionHandling, MultipleTasksWithExceptions) +{ + ThreadPool pool(4); + std::vector> futures; + + for (int i = 0; i < 10; ++i) + { + futures.push_back(pool.Submit([i]() -> int + { + if (i % 2 == 0) + throw std::runtime_error("Even number"); + return i; })); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + + for (int i = 0; i < 10; ++i) + { + if (i % 2 == 0) + EXPECT_THROW(futures[i].get(), std::runtime_error); + else + EXPECT_EQ(futures[i].get(), i); + } +} + +TEST(ThreadPoolExceptionHandling, PoolContinuesAfterException) +{ + ThreadPool pool(4); + auto future1 = + pool.Submit([]() -> int + { throw std::runtime_error("First exception"); }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_THROW(future1.get(), std::runtime_error); + + auto future2 = pool.Submit([]() + { return 42; }); + + ASSERT_TRUE(pool.WaitFor(1000ms)); + EXPECT_EQ(future2.get(), 42); +} + +TEST(ThreadPoolWait, WaitForWithImmediateCompletion) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + for (int i = 0; i < 10; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + EXPECT_EQ(counter.load(), 10); +} + +TEST(ThreadPoolWait, WaitForWithTimeout) +{ + ThreadPool pool(4); + + pool.AddTask([]() + { std::this_thread::sleep_for(500ms); }); + + EXPECT_FALSE(pool.WaitFor(100ms)); + EXPECT_TRUE(pool.WaitFor(1000ms)); +} + +TEST(ThreadPoolWait, WaitWithDeadline) +{ + ThreadPool pool(4); + + pool.AddTask([]() + { std::this_thread::sleep_for(500ms); }); + + auto deadline = std::chrono::steady_clock::now() + 200ms; + EXPECT_FALSE(pool.Wait(deadline)); + + deadline = std::chrono::steady_clock::now() + 1000ms; + EXPECT_TRUE(pool.Wait(deadline)); +} + +TEST(ThreadPoolWait, MultipleWaitCalls) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + for (int i = 0; i < 50; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + EXPECT_EQ(counter.load(), 50); + EXPECT_TRUE(pool.WaitFor(100ms)); +} + +TEST(ThreadPoolStop, RequestStopWithEmptyQueue) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + for (int i = 0; i < 10; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + pool.RequestStop(); + + EXPECT_EQ(counter.load(), 10); +} + +TEST(ThreadPoolStop, RequestStopIsIdempotent) +{ + ThreadPool pool(4); + + pool.RequestStop(); + pool.RequestStop(); + pool.RequestStop(); +} + +TEST(ThreadPoolStop, NoNewTasksAfterRequestStop) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + pool.RequestStop(); + + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + + std::this_thread::sleep_for(100ms); + EXPECT_EQ(counter.load(), 0); +} + +TEST(ThreadPoolStop, SubmitAfterRequestStopFailsGracefully) +{ + ThreadPool pool(4); + pool.RequestStop(); + + auto future = pool.Submit([]() + { return 42; }); + + EXPECT_TRUE(future.valid()); +} + +TEST(ThreadPoolDestruction, DestructionWithEmptyQueue) +{ + std::atomic counter{0}; + + { + ThreadPool pool(4); + + for (int i = 0; i < 10; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(5000ms)); + } + + EXPECT_EQ(counter.load(), 10); +} + +TEST(ThreadPoolDestruction, DestructionWithPendingTasks) +{ + std::atomic counter{0}; + + { + ThreadPool pool(4); + + for (int i = 0; i < 100; ++i) + { + pool.AddTask([&counter]() + { + std::this_thread::sleep_for(10ms); + counter.fetch_add(1, std::memory_order_relaxed); }); + } + } + + EXPECT_GE(counter.load(), 0); +} + +TEST(ThreadPoolConcurrentOperations, ConcurrentAddTaskFromMultipleThreads) +{ + ThreadPool pool(8); + std::atomic counter{0}; + constexpr int numThreads = 10; + constexpr int tasksPerThread = 100; + + std::vector threads; + for (int t = 0; t < numThreads; ++t) + { + threads.emplace_back([&pool, &counter]() + { + for (int i = 0; i < tasksPerThread; ++i) { + pool.AddTask( + [&counter]() { counter.fetch_add(1, std::memory_order_relaxed); }); + } }); + } + + for (auto& thread : threads) + thread.join(); + + ASSERT_TRUE(pool.WaitFor(10000ms)); + EXPECT_EQ(counter.load(), numThreads * tasksPerThread); +} + +TEST(ThreadPoolConcurrentOperations, ConcurrentSubmitFromMultipleThreads) +{ + ThreadPool pool(8); + constexpr int numThreads = 10; + constexpr int tasksPerThread = 50; + + std::vector threads; + std::vector>> allFutures(numThreads); + + for (int t = 0; t < numThreads; ++t) + { + threads.emplace_back([&pool, &allFutures, t]() + { + for (int i = 0; i < tasksPerThread; ++i) { + allFutures[t].push_back(pool.Submit([i]() { return i; })); + } }); + } + + for (auto& thread : threads) + thread.join(); + + ASSERT_TRUE(pool.WaitFor(10000ms)); + + for (int t = 0; t < numThreads; ++t) + { + for (int i = 0; i < tasksPerThread; ++i) + { + EXPECT_EQ(allFutures[t][i].get(), i); + } + } +} + +TEST(ThreadPoolConcurrentOperations, ConcurrentWaitFromMultipleThreads) +{ + ThreadPool pool(8); + std::atomic counter{0}; + + for (int i = 0; i < 100; ++i) + { + pool.AddTask([&counter]() + { + std::this_thread::sleep_for(10ms); + counter.fetch_add(1, std::memory_order_relaxed); }); + } + + std::vector threads; + std::atomic waitSuccessCount{0}; + + for (int t = 0; t < 5; ++t) + { + threads.emplace_back([&pool, &waitSuccessCount]() + { + if (pool.WaitFor(10000ms)) + waitSuccessCount.fetch_add(1, std::memory_order_relaxed); }); + } + + for (auto& thread : threads) + thread.join(); + + EXPECT_EQ(counter.load(), 100); + EXPECT_EQ(waitSuccessCount.load(), 5); +} + +TEST(ThreadPoolEdgeCases, TasksThatAddMoreTasks) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + pool.AddTask([&pool, &counter]() + { + counter.fetch_add(1, std::memory_order_relaxed); + + pool.AddTask( + [&counter]() { counter.fetch_add(1, std::memory_order_relaxed); }); }); + + ASSERT_TRUE(pool.WaitFor(5000ms)); + EXPECT_EQ(counter.load(), 2); +} + +TEST(ThreadPoolEdgeCases, LargeNumberOfThreads) +{ + ThreadPool pool(100); + std::atomic counter{0}; + + for (int i = 0; i < 1000; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(10000ms)); + EXPECT_EQ(counter.load(), 1000); +} + +TEST(ThreadPoolEdgeCases, TasksWithVaryingDurations) +{ + ThreadPool pool(4); + std::atomic counter{0}; + + for (int i = 0; i < 20; ++i) + { + pool.AddTask([&counter, i]() + { + if (i % 2 == 0) + std::this_thread::sleep_for(10ms); + else + std::this_thread::sleep_for(50ms); + counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(10000ms)); + EXPECT_EQ(counter.load(), 20); +} + +TEST(ThreadPoolEdgeCases, EmptyPoolBehavior) +{ + ThreadPool pool(4); + EXPECT_TRUE(pool.WaitFor(100ms)); +} + +TEST(ThreadPoolEdgeCases, ImmediateRequestStopAfterConstruction) +{ + ThreadPool pool(4); + pool.RequestStop(); + + std::atomic counter{0}; + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + + std::this_thread::sleep_for(100ms); + EXPECT_EQ(counter.load(), 0); +} + +TEST(ThreadPoolStress, HighVolumeTaskProcessing) +{ + ThreadPool pool(8); + std::atomic counter{0}; + constexpr int numTasks = 10000; + + for (int i = 0; i < numTasks; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(30000ms)); + EXPECT_EQ(counter.load(), numTasks); +} + +TEST(ThreadPoolStress, MixedAddTaskAndSubmit) +{ + ThreadPool pool(8); + std::atomic addTaskCounter{0}; + std::vector> futures; + constexpr int numOperations = 1000; + + for (int i = 0; i < numOperations; ++i) + { + if (i % 2 == 0) + { + pool.AddTask([&addTaskCounter]() + { addTaskCounter.fetch_add(1, std::memory_order_relaxed); }); + } + else + { + futures.push_back(pool.Submit([i]() + { return i; })); + } + } + + ASSERT_TRUE(pool.WaitFor(30000ms)); + EXPECT_EQ(addTaskCounter.load(), numOperations / 2); + + for (size_t i = 0; i < futures.size(); ++i) + { + EXPECT_EQ(futures[i].get(), static_cast(i * 2 + 1)); + } +} + +TEST(ThreadPoolThreadSafety, NoDataRacesWithSharedAtomic) +{ + ThreadPool pool(4); + std::atomic counter{0}; + constexpr int numIncrements = 10000; + + for (int i = 0; i < numIncrements; ++i) + { + pool.AddTask( + [&counter]() + { counter.fetch_add(1, std::memory_order_relaxed); }); + } + + ASSERT_TRUE(pool.WaitFor(30000ms)); + EXPECT_EQ(counter.load(), numIncrements); +} + +TEST(ThreadPoolThreadSafety, GetWorkerCountIsThreadSafe) +{ + ThreadPool pool(4); + std::vector threads; + + for (int t = 0; t < 10; ++t) + { + threads.emplace_back([&pool]() + { + for (int i = 0; i < 100; ++i) { + volatile size_t count = pool.GetWorkerCount(); + (void)count; + } }); + } + + for (auto& thread : threads) + thread.join(); +} diff --git a/xmake.lua b/xmake.lua index 580e187..f2edcb4 100644 --- a/xmake.lua +++ b/xmake.lua @@ -85,6 +85,16 @@ target("concerto-core") if has_config("asserts") then add_defines("CCT_ENABLE_ASSERTS") end + + -- macOS: ensure we link against the correct C++ runtime when using custom toolchain + if is_plat("macosx") then + local llvm_prefix = os.getenv("LLVM_PREFIX") + if llvm_prefix then + add_linkdirs(path.join(llvm_prefix, "lib")) + add_rpathdirs(path.join(llvm_prefix, "lib")) + end + end + target_end() if has_config("tests") then