Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/coverage.yml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
if: runner.os == 'Linux'
run: |
xmake run concerto-core-tests
gcovr -x coverage.out -s -f "Src/Concerto/Core/.*" build/.objs/
gcovr -x coverage.out -s -f "Src/Concerto/Core/.*" --gcov-ignore-parse-errors=negative_hits.warn build/.objs/

- name: Run unit tests and generate coverage output (Windows)
if: runner.os == 'Windows'
Expand Down
15 changes: 9 additions & 6 deletions .github/workflows/macos.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,17 @@ jobs:
strategy:
fail-fast: false
matrix:
os: [macos-14]
arch: [x86_64, arm64]
os: [macos-15]
arch: [arm64]
mode: [debug, release, releasedbg]
kind: [static, shared]

steps:
- name: Update clang to 18
run: brew install llvm@18
- name: Update clang to 20
run: |
brew install llvm@20 && brew link llvm@20
LLVM_PREFIX=$(brew --prefix llvm@20)
echo "LLVM_PREFIX=${LLVM_PREFIX}" >> $GITHUB_ENV

- name: Get current date as package key
id: cache_key
Expand Down Expand Up @@ -55,7 +58,7 @@ jobs:
key: macOS-${{ matrix.arch }}-${{ matrix.mode }}-${{ matrix.kind }}-${{ steps.dep_hash.outputs.hash }}-W${{ steps.cache_key.outputs.key }}

- name: Configure xmake and install dependencies
run: xmake config --arch=${{ matrix.arch }} --mode=${{ matrix.mode }} -k ${{ matrix.kind }} --yes --cc=$(brew --prefix llvm@18)/bin/clang-18 --cxx=$(brew --prefix llvm@18)/bin/clang++ --tests=y
run: xmake config -vD --arch=${{ matrix.arch }} --mode=${{ matrix.mode }} -k ${{ matrix.kind }} --yes --toolchain=llvm --runtimes=c++_shared --sdk=${LLVM_PREFIX} --tests=y

- name: Save cached xmake dependencies
if: ${{ !steps.restore-depcache.outputs.cache-hit }}
Expand All @@ -65,7 +68,7 @@ jobs:
key: ${{ steps.restore-depcache.outputs.cache-primary-key }}

- name: Build
run: xmake --yes -v
run: xmake --yes -vD

- name: Run unit tests
run: xmake run concerto-core-tests
18 changes: 12 additions & 6 deletions .github/workflows/ubuntu24.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ jobs:

strategy:
matrix:
os: [ubuntu-24.04]
os: [ubuntu-latest]
arch: [x86_64, arm64]
mode: [debug, release, releasedbg]
kind: [static, shared]
steps:
- uses: actions/checkout@v4
- uses: actions/checkout@v5

- name: Set gcc/g++ to 13
- name: Set gcc/g++ to latest available
run: |
ln -s /usr/bin/g++-13 /usr/local/bin/g++
ln -s /usr/bin/gcc-13 /usr/local/bin/gcc
# Try gcc-14 first, fall back to gcc-13
if command -v g++-14 &> /dev/null; then
ln -sf /usr/bin/g++-14 /usr/local/bin/g++
ln -sf /usr/bin/gcc-14 /usr/local/bin/gcc
else
ln -sf /usr/bin/g++-13 /usr/local/bin/g++
ln -sf /usr/bin/gcc-13 /usr/local/bin/gcc
fi

- name: Get current date as package key
id: cache_key
Expand All @@ -39,7 +45,7 @@ jobs:
- name: Setup xmake
uses: xmake-io/github-action-setup-xmake@v1
with:
xmake-version: branch@dev
xmake-version: latest
actions-cache-folder: .xmake-cache-W${{ steps.cache_key.outputs.key }}

- name: Update xmake repo
Expand Down
136 changes: 136 additions & 0 deletions Src/Concerto/Core/ThreadPool/ThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
//
// Created by arthur on 28/10/2025.
//

#include "Concerto/Core/ThreadPool/ThreadPool.hpp"

namespace cct
{
ThreadPool::ThreadPool(unsigned int numThreads)
{
if (numThreads == 0)
{
numThreads = std::thread::hardware_concurrency();
if (numThreads == 0)
{
numThreads = 1;
}
}

m_workers.reserve(numThreads);

for (unsigned int i = 0; i < numThreads; ++i)
{
m_workers.emplace_back([this, i]()
{ WorkerLoop(i + 1); });
}
}

ThreadPool::~ThreadPool() noexcept
{
RequestStop();
}

void ThreadPool::WorkerLoop(unsigned int workerIndex)
{
while (true)
{
std::function<void()> task;
{
std::unique_lock lock(m_queueMutex);

m_queueCv.wait(lock, [this]()
{ return !m_taskQueue.empty() ||
m_stopRequested.load(std::memory_order_acquire); });

if (m_stopRequested.load(std::memory_order_acquire) && m_taskQueue.empty())
break;

if (m_taskQueue.empty())
continue;

task = std::move(m_taskQueue.front());
m_taskQueue.pop_front();
}

if (task)
{
try
{
task();
}
catch (const std::exception& e)
{
std::cerr << "[ThreadPool::Worker#" << workerIndex
<< "] Exception caught: " << e.what() << '\n';
}
catch (...)
{
std::cerr << "[ThreadPool::Worker#" << workerIndex
<< "] Unknown exception caught\n";
}
TaskCompleted();
}
}
}

void ThreadPool::TaskCompleted() noexcept
{
size_t prev = m_tasksInFlight.fetch_sub(1, std::memory_order_acq_rel);

if (prev == 1)
{
std::lock_guard lock(m_waitMutex);
m_waitCv.notify_all();
}
}

bool ThreadPool::Wait(std::chrono::steady_clock::time_point deadline)
{
std::unique_lock lock(m_waitMutex);

return m_waitCv.wait_until(lock, deadline, [this]()
{ return m_tasksInFlight.load(std::memory_order_acquire) == 0; });
}

bool ThreadPool::WaitFor(std::chrono::milliseconds timeout)
{
auto deadline = std::chrono::steady_clock::now() + timeout;
return Wait(deadline);
}

void ThreadPool::RequestStop() noexcept
{
bool expected = false;
if (!m_stopRequested.compare_exchange_strong(expected, true,
std::memory_order_acq_rel))
{
// Stop already requested, wait for workers to finish
for (auto& worker : m_workers)
{
if (worker.joinable())
worker.join();
}
return;
}

// Notify all workers while holding the lock to ensure they see the stop flag
{
std::lock_guard lock(m_queueMutex);
m_queueCv.notify_all();
}

// Wait for all workers to finish
for (auto& worker : m_workers)
{
if (worker.joinable())
worker.join();
}
}

size_t ThreadPool::GetWorkerCount() const noexcept
{
return m_workers.size();
}

} // namespace cct
115 changes: 115 additions & 0 deletions Src/Concerto/Core/ThreadPool/ThreadPool.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
//
// Created by arthur on 28/10/2025.
//

#pragma once

#include <atomic>
#include <chrono>
#include <concepts>
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <iostream>
#include <memory>
#include <mutex>
#include <stop_token>
#include <thread>
#include <type_traits>
#include <vector>

#include "Concerto/Core/Types/Types.hpp"

namespace cct
{

class CCT_CORE_PUBLIC_API ThreadPool
{
public:
explicit ThreadPool(unsigned int numThreads = 0);
~ThreadPool() noexcept;

ThreadPool(const ThreadPool&) = delete;
ThreadPool& operator=(const ThreadPool&) = delete;
ThreadPool(ThreadPool&&) = delete;
ThreadPool& operator=(ThreadPool&&) = delete;

/**
* @brief Adds a task to the pool without returning a result.
*
* @tparam F Callable type that can be invoked with no arguments.
* @param f Callable to execute.
*
* @note This method is thread-safe. Tasks are not accepted after
* RequestStop().
*/
template<typename F>
requires std::invocable<std::decay_t<F>> &&
std::is_void_v<std::invoke_result_t<std::decay_t<F>>>
void AddTask(F&& f);

/**
* @brief Submits a task and returns a future for its result.
*
* @tparam F Callable type.
* @return std::future with the result of the callable.
*
* @note This method is thread-safe. Tasks are not accepted after
* RequestStop().
*/
template<typename F>
requires std::invocable<std::decay_t<F>>
auto Submit(F&& f) -> std::future<std::invoke_result_t<std::decay_t<F>>>;

/**
* @brief Waits until all in-flight tasks complete or deadline is reached.
*
* @param deadline Absolute time point to wait until.
* @return true if all tasks completed before deadline, false otherwise.
*/
bool Wait(std::chrono::steady_clock::time_point deadline);

/**
* @brief Waits until all in-flight tasks complete or timeout expires.
*
* @param timeout Duration to wait.
* @return true if all tasks completed before timeout, false otherwise.
*/
bool WaitFor(std::chrono::milliseconds timeout);

/**
* @brief Requests graceful shutdown. No new tasks will be accepted.
*
* Workers will finish current tasks and then exit.
* This method is idempotent and thread-safe.
*/
void RequestStop() noexcept;

size_t GetWorkerCount() const noexcept;

private:
void WorkerLoop(unsigned int workerIndex);

void TaskCompleted() noexcept;

// Thread management
std::vector<std::jthread> m_workers;

// Task queue and synchronization
std::deque<std::function<void()>> m_taskQueue;
std::mutex m_queueMutex;
std::condition_variable m_queueCv;

// Wait synchronization
std::atomic<size_t> m_tasksInFlight{0};
std::condition_variable m_waitCv;
std::mutex m_waitMutex;

// State
std::atomic<bool> m_stopRequested{false};
};

} // namespace cct

#include "ThreadPool.inl"
Loading
Loading