Modern C++ (C++17 and later) introduced a parallel programming model based on execution policies that integrate parallelism directly into the Standard Template Library (STL) algorithms. This model lets developers parallelize existing algorithms with minimal code changes while remaining portable across different hardware architectures.
The parallel algorithm programming model is built around execution policies that specify how algorithms should execute:
```cpp
#include <execution>
#include <algorithm>
#include <vector>

std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};

// Sequential execution (default behavior)
std::sort(data.begin(), data.end());

// Parallel execution
std::sort(std::execution::par, data.begin(), data.end());

// Parallel + vectorized execution
std::sort(std::execution::par_unseq, data.begin(), data.end());
```

`std::execution::seq` (sequenced_policy):
- Behavior: Sequential execution on a single thread
- Guarantee: Operations execute in the order they would in the sequential algorithm
- Use Case: Default behavior, debugging, when parallelism overhead isn't justified
- Performance: Baseline sequential performance

`std::execution::par` (parallel_policy):
- Behavior: Parallel execution across multiple threads
- Guarantee: No ordering guarantees between operations
- Use Case: CPU-bound algorithms where operations are independent
- Performance: Scales with available CPU cores

`std::execution::par_unseq` (parallel_unsequenced_policy):
- Behavior: Parallel execution with vectorization opportunities
- Guarantee: Operations may execute out of order and may be vectorized
- Use Case: SIMD/vectorizable operations, maximum performance
- Performance: Best performance on modern CPUs with SIMD support
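
One caveat worth showing in code: under `par_unseq`, element-access functions must be vectorization-safe, so they may not acquire locks or perform other synchronization. The sketch below illustrates the anti-pattern and the `par` alternative; the mutex and counter are purely illustrative:

```cpp
#include <algorithm>
#include <execution>
#include <mutex>
#include <vector>

std::mutex m;
int shared_total = 0;
std::vector<int> values(1000, 1);

// Undefined behavior with par_unseq: taking a lock inside the element
// function is not vectorization-safe (interleaved execution on a single
// thread can deadlock)
// std::for_each(std::execution::par_unseq, values.begin(), values.end(),
//               [&](int v) { std::lock_guard<std::mutex> g(m); shared_total += v; });

// With par (but not par_unseq) the lock is permitted, though it serializes:
std::for_each(std::execution::par, values.begin(), values.end(),
              [&](int v) { std::lock_guard<std::mutex> g(m); shared_total += v; });
```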
The parallel algorithm model uses an implementation-defined approach that typically leverages:
- Thread Pools: Managed thread pools for efficient thread reuse
- Work Stealing: Dynamic load balancing across threads
- Vectorization: SIMD instructions when possible
- Memory Access Patterns: Optimized for cache efficiency

For example, GCC's libstdc++ commonly implements the parallel algorithms on top of Intel oneTBB, so programs may need to link against TBB (`-ltbb`).
Most STL algorithms support parallel execution:
```cpp
// Non-modifying sequence operations
std::find(std::execution::par, vec.begin(), vec.end(), value);
std::count_if(std::execution::par, vec.begin(), vec.end(), predicate);

// Modifying sequence operations
std::transform(std::execution::par, input.begin(), input.end(), output.begin(), func);
std::copy_if(std::execution::par, input.begin(), input.end(), output.begin(), predicate);

// Sorting operations
std::sort(std::execution::par, vec.begin(), vec.end());
std::stable_sort(std::execution::par_unseq, vec.begin(), vec.end());

// Numeric operations (note: std::inner_product has no execution-policy
// overload; std::transform_reduce is its parallel counterpart)
std::transform_reduce(std::execution::par, vec1.begin(), vec1.end(), vec2.begin(), 0.0);
```

Some algorithms have no parallel overloads because each result depends on the previous one:

```cpp
// These remain sequential-only
std::partial_sum(vec.begin(), vec.end(), result.begin()); // Requires order; std::inclusive_scan is the parallel analogue
std::accumulate(vec.begin(), vec.end(), 0); // Strict left-to-right evaluation; std::reduce is the parallel analogue
```

Measuring the difference is straightforward with a small helper:

```cpp
#include <chrono>
#include <iostream>

template<typename ExecutionPolicy>
double benchmark_sort(std::vector<int>& data, ExecutionPolicy&& policy) {
    auto start = std::chrono::high_resolution_clock::now();
    std::sort(std::forward<ExecutionPolicy>(policy), data.begin(), data.end());
    auto end = std::chrono::high_resolution_clock::now();
    return std::chrono::duration<double>(end - start).count();
}
```

Typical Performance Scaling:
- seq: Baseline performance (1x)
- par: 2-8x improvement on 4-16 core systems
- par_unseq: 4-16x improvement with SIMD acceleration
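
As a usage illustration for the helper above (the data size and seed are arbitrary choices for this sketch; real numbers depend entirely on the hardware and standard library):

```cpp
#include <random>

int main() {
    std::vector<int> data(10'000'000);
    std::mt19937 rng(42);
    std::uniform_int_distribution<int> dist(0, 1'000'000);
    for (auto& x : data) x = dist(rng);

    auto copy1 = data;
    auto copy2 = data;
    std::cout << "seq: " << benchmark_sort(copy1, std::execution::seq) << " s\n";
    std::cout << "par: " << benchmark_sort(copy2, std::execution::par) << " s\n";
}
```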
```cpp
// Parallel execution has overhead - only beneficial for large datasets
const size_t threshold = 10000; // Implementation-dependent
if (data.size() > threshold) {
    std::sort(std::execution::par, data.begin(), data.end());
} else {
    std::sort(data.begin(), data.end()); // Sequential is faster for small datasets
}
```

Parallel algorithms guarantee no data races for:
- Input ranges: Read-only access
- Output ranges: Write-only access (no overlapping with input)
```cpp
std::vector<int> input = {1, 2, 3, 4, 5};
std::vector<int> output(input.size());

// Safe: input and output don't overlap
std::transform(std::execution::par, input.begin(), input.end(),
               output.begin(), [](int x) { return x * 2; });

// Also safe: in-place transform is explicitly allowed - each element is
// read and written by exactly one invocation
std::transform(std::execution::par, input.begin(), input.end(),
               input.begin(), [](int x) { return x * 2; });

// Unsafe: writing into a shifted, overlapping range means one iteration
// writes an element another iteration reads. Data race!
std::transform(std::execution::par, input.begin(), input.end() - 1,
               input.begin() + 1, [](int x) { return x * 2; });
```

Exceptions need special care: if an exception escapes an element-access function under `seq`, `par`, or `par_unseq`, the standard requires `std::terminate` to be called. A surrounding try/catch therefore only sees exceptions thrown by the algorithm itself (such as `std::bad_alloc`); a capture-and-rethrow workaround is shown later.

```cpp
try {
    std::transform(std::execution::par, input.begin(), input.end(),
                   output.begin(), [](int x) {
                       if (x == 0) throw std::runtime_error("Division by zero");
                       return 100 / x; // Throwing here calls std::terminate!
                   });
} catch (const std::exception& e) {
    // Only reached for exceptions thrown by the algorithm itself
    // (e.g., std::bad_alloc), not by the lambda
    std::cout << "Exception caught: " << e.what() << std::endl;
}
```

Custom execution policies are sometimes sketched like the type below, but the standard algorithms only accept the standard policy types (their overloads are constrained on `std::is_execution_policy`), so a type like this is only useful behind a custom dispatch layer:

```cpp
class custom_policy {
public:
    // Define execution behavior
    template<typename F>
    auto execute(F&& f) const {
        // Custom execution logic
        return std::forward<F>(f)();
    }
};

// Hypothetical usage via a custom dispatch layer (std::for_each itself
// would reject this policy type):
// for_each(custom_policy{}, data.begin(), data.end(), func);
```

Ranges views compose naturally, but the `std::ranges::` algorithms have no execution-policy overloads; materialize a lazy view before applying parallel algorithms:

```cpp
#include <ranges>

auto result = data
    | std::views::filter([](int x) { return x > 0; })
    | std::views::transform([](int x) { return x * 2; });

// Materialize the view, then process the vector in parallel
// (a parallel copy could not target std::back_inserter anyway -
// the parallel overloads require forward iterators)
std::vector<int> output(result.begin(), result.end());
std::sort(std::execution::par, output.begin(), output.end());
```

A multi-stage pipeline combines reusable view adaptors with a parallel reduction:

```cpp
// Stage 1: Filter
auto filtered = std::views::filter([](int x) { return x % 2 == 0; });
// Stage 2: Transform
auto transformed = std::views::transform([](int x) { return x * x; });
// Stage 3: Reduce
int sum = std::transform_reduce(
std::execution::par,
data.begin(), data.end(),
0, std::plus<int>{},
[](int x) { return x * x; } // Square each element then sum
);
```

Under the hood, implementations typically dispatch work to a thread pool:

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Simplified thread-pool skeleton (worker loop omitted for brevity)
class ThreadPool {
std::vector<std::thread> workers;
std::queue<std::function<void()>> tasks;
std::mutex queue_mutex;
std::condition_variable condition;
bool stop = false;
public:
// Submit task to thread pool
template<class F>
void enqueue(F&& f) {
{
std::unique_lock<std::mutex> lock(queue_mutex);
tasks.emplace(std::forward<F>(f));
}
condition.notify_one();
}
};
```

Work is distributed across those threads in several ways (a static partitioning sketch follows this list):

- Static Partitioning: Divide work evenly among threads
- Dynamic Partitioning: Threads request work as they complete tasks
- Work Stealing: Idle threads steal work from busy threads
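
As a minimal sketch of static partitioning (the `static_partition_for` helper and its chunking scheme are illustrative assumptions, not a standard API):

```cpp
#include <algorithm>
#include <cstddef>
#include <thread>
#include <vector>

// Split [0, n) into one contiguous chunk per hardware thread and run
// process_chunk(begin, end) on each chunk in its own thread
template<typename F>
void static_partition_for(std::size_t n, F process_chunk) {
    const std::size_t num_threads =
        std::max<std::size_t>(1, std::thread::hardware_concurrency());
    const std::size_t chunk = (n + num_threads - 1) / num_threads;
    std::vector<std::thread> workers;
    for (std::size_t t = 0; t < num_threads; ++t) {
        const std::size_t begin = t * chunk;
        const std::size_t end = std::min(n, begin + chunk);
        if (begin >= end) break;
        workers.emplace_back([=] { process_chunk(begin, end); });
    }
    for (auto& w : workers) w.join();
}
```

Dynamic partitioning replaces the fixed chunks with a shared atomic index that threads advance as they finish work, which balances uneven workloads at the cost of some contention.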
```cpp
// par_unseq enables vectorization
std::transform(std::execution::par_unseq,
               input.begin(), input.end(), output.begin(),
               [](float x) { return x * 2.0f + 1.0f; });

// The compiler may generate SIMD instructions along these lines:
//   movaps xmm0, [input]    ; Load 4 floats
//   mulps  xmm0, xmm1       ; Multiply by 2
//   addps  xmm0, xmm2       ; Add 1
//   movaps [output], xmm0   ; Store result
```

A dispatcher can pick a policy at runtime based on data size and workload characteristics:

```cpp
#include <cmath>

enum class ExecutionMode { Sequential, Parallel, Vectorized };

ExecutionMode choose_policy(size_t data_size, bool is_simd_friendly) {
if (data_size < 1000) return ExecutionMode::Sequential;
if (is_simd_friendly) return ExecutionMode::Vectorized;
return ExecutionMode::Parallel;
}
void process_data(std::vector<float>& data) {
auto mode = choose_policy(data.size(), true);
switch (mode) {
case ExecutionMode::Sequential:
std::transform(data.begin(), data.end(), data.begin(),
[](float x) { return std::sin(x); });
break;
case ExecutionMode::Parallel:
std::transform(std::execution::par, data.begin(), data.end(), data.begin(),
[](float x) { return std::sin(x); });
break;
case ExecutionMode::Vectorized:
std::transform(std::execution::par_unseq, data.begin(), data.end(), data.begin(),
[](float x) { return std::sin(x); });
break;
}
}
```

Memory layout matters too: cache-line-aligned records avoid false sharing between threads:

```cpp
// Optimize for cache efficiency
struct alignas(64) CacheLine { // 64-byte cache line alignment
float data[16]; // 16 floats = 64 bytes
};
// Process cache-line aligned data
std::vector<CacheLine> aligned_data;
std::transform(std::execution::par_unseq,
aligned_data.begin(), aligned_data.end(),
aligned_data.begin(),
[](const CacheLine& line) {
CacheLine result;
for (size_t i = 0; i < 16; ++i) {
result.data[i] = std::sqrt(line.data[i]);
}
return result;
});
```

```cpp
// Execution policies are implementation-defined
// Behavior may vary between compilers and standard library implementations
// Check for parallel algorithm support
#ifdef __cpp_lib_parallel_algorithm
std::cout << "Parallel algorithms supported" << std::endl;
#endif
// Runtime detection of available parallelism
unsigned int hardware_threads = std::thread::hardware_concurrency();
std::cout << "Available hardware threads: " << hardware_threads << std::endl;// Debugging parallel code requires special techniques
#define DEBUG_PARALLEL 1
// Pass the algorithm in as a callable so it can be redirected to the
// sequential policy in debug builds
template<typename Algorithm, typename ExecutionPolicy, typename... Args>
auto debug_algorithm(Algorithm&& algorithm, ExecutionPolicy&& policy, Args&&... args) {
#if DEBUG_PARALLEL
    // Run sequentially for debugging
    std::cout << "Debugging mode: running sequentially" << std::endl;
    return algorithm(std::execution::seq, std::forward<Args>(args)...);
#else
    return algorithm(std::forward<ExecutionPolicy>(policy), std::forward<Args>(args)...);
#endif
}
```

Because exceptions escaping the callable terminate the program, a common workaround captures the first exception inside the callable and rethrows it afterwards:

```cpp
#include <atomic>

// Exception handling in parallel algorithms
std::atomic<bool> exception_flag{false};
std::exception_ptr captured_exception;

std::transform(std::execution::par, input.begin(), input.end(), output.begin(),
               [&](int x) -> int {
                   try {
                       if (x == 42) throw std::runtime_error("Answer to everything");
                       return x * 2;
                   } catch (...) {
                       // Only the first failing invocation stores its exception,
                       // avoiding a race on the exception_ptr
                       if (!exception_flag.exchange(true)) {
                           captured_exception = std::current_exception();
                       }
                       return 0; // Or some sentinel value
                   }
               });

// Check for exceptions after parallel execution
if (exception_flag) {
    std::rethrow_exception(captured_exception);
}
```

The standard algorithms can be combined with OpenMP for fine-grained control, though mixing two threading runtimes is implementation-specific and should be done with care:

```cpp
#pragma omp parallel
{
#pragma omp single
{
std::for_each(std::execution::par, data.begin(), data.end(),
[](int& x) {
#pragma omp critical
std::cout << x << " ";
});
}
}
```

CPU parallel algorithms can complement GPU computing; this sketch uses NVIDIA Thrust (the `__device__` lambda requires nvcc with extended lambdas enabled):

```cpp
std::vector<float> host_data = generate_data();
// CPU preprocessing
std::transform(std::execution::par, host_data.begin(), host_data.end(),
host_data.begin(), [](float x) { return x * 0.1f; });
// GPU processing
thrust::device_vector<float> device_data = host_data;
thrust::transform(device_data.begin(), device_data.end(),
device_data.begin(),
[] __device__ (float x) { return sinf(x); });
// CPU postprocessing
host_data = device_data;
std::sort(std::execution::par, host_data.begin(), host_data.end());
```

## Future Directions

- Executor concepts: More flexible execution models
- Task graphs: Explicit dependency management
- Heterogeneous computing: CPU + GPU coordination
- Coroutine integration: Async parallel algorithms
None of the following policies exist in the standard today; this is a speculative sketch of what richer policies might look like:

```cpp
// Future: More sophisticated execution policies (hypothetical)
namespace std::execution {
// Quality of service policies
inline constexpr qos_policy high_throughput{};
inline constexpr qos_policy low_latency{};
inline constexpr qos_policy energy_efficient{};
// Hardware-specific policies
inline constexpr gpu_policy gpu{};
inline constexpr fpga_policy fpga{};
}
```

C++20 introduced an additional execution policy for more fine-grained control:

```cpp
#include <execution>
#include <algorithm>
#include <vector>
std::vector<int> data = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// Existing policies (C++17)
std::sort(std::execution::seq, data.begin(), data.end());
std::sort(std::execution::par, data.begin(), data.end());
std::sort(std::execution::par_unseq, data.begin(), data.end());
// C++20: Unsequenced execution - vectorized on the calling thread only,
// without spawning additional threads (unlike par_unseq, which does both)
std::sort(std::execution::unseq, data.begin(), data.end());
```

```cpp
// Parallel for_each with different execution policies
std::for_each(std::execution::par, data.begin(), data.end(),
[](int& x) { x = x * x; });
// Parallel for_each_n (process first n elements)
std::for_each_n(std::execution::par, data.begin(), 5,
[](int& x) { x = x + 1; });
```

```cpp
// Transform with two input ranges
std::vector<int> a = {1, 2, 3, 4};
std::vector<int> b = {5, 6, 7, 8};
std::vector<int> result(4);
std::transform(std::execution::par, a.begin(), a.end(), b.begin(),
result.begin(), std::plus<int>{});
```

```cpp
#include <numeric>
// Parallel reduce (similar to accumulate but parallelizable)
int sum = std::reduce(std::execution::par, data.begin(), data.end(), 0);
// Parallel transform_reduce (map-reduce pattern)
double dot_product = std::transform_reduce(
std::execution::par,
vec1.begin(), vec1.end(), vec2.begin(), 0.0,
std::plus<double>{}, std::multiplies<double>{}
);
// With a custom reduction operation. Caveat: std::reduce requires the
// operation to be both associative and commutative; string concatenation
// is not commutative, so the ordering of elements in the result is
// unspecified under a parallel policy
std::string concat = std::reduce(
    std::execution::par,
    strings.begin(), strings.end(), std::string{},
    [](const std::string& a, const std::string& b) {
        return a + "," + b;
    }
);
```

Prefix sums have dedicated parallel scan algorithms:

```cpp
std::vector<int> data = {1, 2, 3, 4, 5};
std::vector<int> prefix_sum(data.size());
// Inclusive scan: prefix_sum[i] = sum of first (i+1) elements
std::inclusive_scan(std::execution::par, data.begin(), data.end(),
prefix_sum.begin());
// Exclusive scan: prefix_sum[i] = sum of first i elements
std::exclusive_scan(std::execution::par, data.begin(), data.end(),
prefix_sum.begin(), 0);
```

C++20 also adds shift operations with parallel overloads:

```cpp
std::vector<int> data = {1, 2, 3, 4, 5, 6};
// Shift elements left by 2 positions
auto new_end = std::shift_left(std::execution::par, data.begin(),
data.end(), 2);
// data now: {3, 4, 5, 6, ?, ?}
// Shift elements right by 1 position
std::shift_right(std::execution::par, data.begin(), new_end, 1);
// data now: {?, 3, 4, 5, ?, ?} - only [begin, new_end) was shifted
```

```cpp
#include <ranges>
#include <algorithm>
// Note: the std::ranges:: algorithms have no execution-policy overloads,
// so a lazy view must be materialized before parallel processing
auto even_numbers = data
    | std::views::filter([](int x) { return x % 2 == 0; })
    | std::views::transform([](int x) { return x * x; });

// Materialize, then apply parallel algorithms to the vector
std::vector<int> result(even_numbers.begin(), even_numbers.end());
std::sort(std::execution::par, result.begin(), result.end());
```

As noted earlier, user-defined policy types are not accepted by the `std::` algorithms; a policy-like type is only useful behind a custom dispatch layer:

```cpp
struct custom_policy {
struct custom_policy {
template<typename F>
auto execute(F&& f) const {
// Custom execution logic
// Could implement load balancing, priority scheduling, etc.
return std::forward<F>(f)();
}
};
// Hypothetical usage through a custom dispatch layer
// (std::for_each itself rejects non-standard policy types):
// for_each(custom_policy{}, data.begin(), data.end(), func);
```

Execution-policy properties can be inspected with type traits:

```cpp
// Query execution policy properties
template<typename ExecutionPolicy>
void analyze_policy(const ExecutionPolicy& policy) {
if constexpr (std::is_execution_policy_v<std::remove_cvref_t<ExecutionPolicy>>) {
std::cout << "Is parallel policy: "
<< std::is_same_v<ExecutionPolicy, std::execution::parallel_policy>
<< std::endl;
std::cout << "Is unsequenced policy: "
<< std::is_same_v<ExecutionPolicy, std::execution::unsequenced_policy>
<< std::endl;
}
}
```

The sender/receiver model (P2300) provides more flexible asynchronous composition; originally targeted at C++23, it was ultimately adopted for C++26. The snippet below is a speculative sketch, not a shipping API:

```cpp
// Sender-based parallel algorithms (speculative sketch)
auto sender = std::execution::just(data)
| std::execution::bulk([](auto& x) { x *= 2; })
| std::execution::then([](auto&& result) {
return std::reduce(result.begin(), result.end(), 0);
});
// Execute on thread pool
auto future = std::execution::execute(std::execution::thread_pool{}, sender);
```

```cpp
// Parallel for_each with explicit dependencies (hypothetical sketch;
// there is no std::parallel namespace in the standard)
std::vector<std::future<void>> futures;
std::parallel::for_each(std::execution::par, data.begin(), data.end(),
[&](int& x) {
// Complex computation with dependencies
auto dep1 = compute_dependency1(x);
auto dep2 = compute_dependency2(x);
x = combine_results(dep1, dep2);
});
```

```cpp
// Enhanced transform_reduce with custom combiners (hypothetical sketch)
auto result = std::parallel::transform_reduce(
std::execution::par,
data.begin(), data.end(),
0.0, // Initial value
std::plus<double>{}, // Reduction combiner
[](int x) { return std::sqrt(x); }, // Transformer
std::multiplies<double>{} // Alternative combiner
);
```

Pipeline parallelism can be implemented with producer-consumer stages:

```cpp
#include <condition_variable>
#include <future>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
// Producer-consumer pipeline
template<typename T, typename F>
class PipelineStage {
std::queue<T> queue_;
std::mutex mutex_;
std::condition_variable cv_;
std::vector<std::thread> workers_;
F processor_;
bool done_ = false;
public:
PipelineStage(size_t num_workers, F processor)
: processor_(std::move(processor)) {
for (size_t i = 0; i < num_workers; ++i) {
workers_.emplace_back([this]() { worker_loop(); });
}
}
void submit(T item) {
{
std::unique_lock<std::mutex> lock(mutex_);
queue_.push(std::move(item));
}
cv_.notify_one();
}
void finish() {
{
std::unique_lock<std::mutex> lock(mutex_);
done_ = true;
}
cv_.notify_all();
for (auto& worker : workers_) {
worker.join();
}
}
private:
void worker_loop() {
while (true) {
T item;
{
std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this]() {
return !queue_.empty() || done_;
});
if (queue_.empty() && done_) break;
item = std::move(queue_.front());
queue_.pop();
}
processor_(std::move(item));
}
}
};
```

Graph traversal can be parallelized by partitioning vertices across threads:

```cpp
#include <atomic>
#include <stack>
#include <thread>
#include <vector>

// Parallel graph traversal via static vertex partitioning
template<typename Graph, typename Visitor>
void parallel_dfs(const Graph& graph,
                  Visitor visitor, size_t num_threads) {
std::vector<std::atomic<bool>> visited(graph.num_vertices());
std::vector<std::thread> threads;
auto worker = [&](size_t thread_id) {
std::stack<size_t> local_stack;
// Assign vertices to threads using simple partitioning
for (size_t v = thread_id; v < graph.num_vertices();
v += num_threads) {
if (!visited[v].exchange(true)) {
local_stack.push(v);
visitor(v); // Visit starting vertex
while (!local_stack.empty()) {
size_t current = local_stack.top();
local_stack.pop();
// Process neighbors
for (auto neighbor : graph.neighbors(current)) {
if (!visited[neighbor].exchange(true)) {
local_stack.push(neighbor);
visitor(neighbor);
}
}
}
}
}
};
// Launch worker threads
for (size_t i = 0; i < num_threads; ++i) {
threads.emplace_back(worker, i);
}
// Wait for completion
for (auto& thread : threads) {
thread.join();
}
}
```

A work-stealing scheduler lets idle threads take tasks queued for busy ones:

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Work-stealing scheduler (simplified: one global mutex guards all queues)
class WorkStealingScheduler {
std::vector<std::deque<std::function<void()>>> queues_;
std::vector<std::thread> threads_;
std::atomic<bool> done_{false};
std::mutex mutex_;
std::condition_variable cv_;
public:
WorkStealingScheduler(size_t num_threads)
: queues_(num_threads) {
for (size_t i = 0; i < num_threads; ++i) {
threads_.emplace_back([this, i]() { worker_loop(i); });
}
}
void submit(std::function<void()> task) {
{
std::unique_lock<std::mutex> lock(mutex_);
queues_[get_current_thread_id()].push_back(std::move(task));
}
cv_.notify_one();
}
void shutdown() {
done_ = true;
cv_.notify_all();
for (auto& thread : threads_) {
thread.join();
}
}
private:
void worker_loop(size_t thread_id) {
while (!done_) {
std::function<void()> task;
{
std::unique_lock<std::mutex> lock(mutex_);
// Try to get task from own queue first
if (!queues_[thread_id].empty()) {
task = std::move(queues_[thread_id].front());
queues_[thread_id].pop_front();
} else {
// Try to steal from other queues
for (size_t i = 0; i < queues_.size(); ++i) {
if (!queues_[i].empty()) {
task = std::move(queues_[i].front());
queues_[i].pop_front();
break;
}
}
}
if (!task) {
cv_.wait_for(lock, std::chrono::milliseconds(10));
continue;
}
}
// Execute task
task();
}
}
size_t get_current_thread_id() {
// Simplified thread ID mapping
static std::atomic<size_t> next_id{0};
thread_local size_t id = next_id++;
return id % queues_.size();
}
};
```

Coroutines can wrap parallel computations as lazily resumed tasks:

```cpp
#include <coroutine>
#include <future>
// Coroutine-based parallel computation
template<typename T>
struct ParallelTask {
struct promise_type {
T value;
std::suspend_always yield_value(T v) {
value = v;
return {};
}
ParallelTask get_return_object() {
return ParallelTask{std::coroutine_handle<promise_type>::from_promise(*this)};
}
std::suspend_always initial_suspend() { return {}; }
std::suspend_always final_suspend() noexcept { return {}; }
void unhandled_exception() { std::terminate(); }
};
std::coroutine_handle<promise_type> handle;
T get() {
handle.resume();
return handle.promise().value;
}
};
// Usage with parallel algorithms
ParallelTask<std::vector<double>> parallel_compute(std::vector<double> data) {
std::transform(std::execution::par, data.begin(), data.end(), data.begin(),
[](double x) { return std::sin(x) * std::cos(x); });
co_yield data;
}
```

C++20 modules can package reusable parallel helpers:

```cpp
// Parallel algorithms module (C++20 modules)
export module parallel_algorithms;
import <execution>;
import <algorithm>;
import <vector>;
export namespace parallel {
template<typename Range, typename Func>
void parallel_for_each(Range&& range, Func&& func) {
std::for_each(std::execution::par,
std::begin(range), std::end(range),
std::forward<Func>(func));
}
template<typename Range, typename T, typename BinaryOp>
T parallel_reduce(Range&& range, T init, BinaryOp&& op) {
return std::reduce(std::execution::par,
std::begin(range), std::end(range),
init, std::forward<BinaryOp>(op));
}
} // namespace parallel
```

Concepts can constrain generic parallel utilities:

```cpp
#include <concepts>
// Concept for parallelizable types
template<typename T>
concept Parallelizable = std::is_arithmetic_v<T> ||
requires(T t) { t.begin(); t.end(); };
// Concept for execution policies
template<typename T>
concept ExecutionPolicy = std::is_execution_policy_v<std::remove_cvref_t<T>>;
// Generic parallel algorithm with concepts
template<Parallelizable Range, ExecutionPolicy Policy, typename Func>
void parallel_transform(Policy&& policy, Range&& range, Func&& func) {
std::transform(std::forward<Policy>(policy),
std::begin(range), std::end(range), std::begin(range),
std::forward<Func>(func));
}
```

A profiling harness compares all three policies on the same workload:

```cpp
#include <chrono>
#include <functional> // for std::invoke
#include <iostream>
#include <string>
#include <tuple>
#include <utility>
#include <vector>
// Detailed performance profiling
template<typename Func, typename... Args>
auto profile_parallel_execution(Func&& func, Args&&... args) {
using namespace std::chrono;
// Profile different execution policies
std::vector<std::pair<std::string, double>> results;
// Sequential baseline
auto start = high_resolution_clock::now();
auto result_seq = std::invoke(func, std::execution::seq, args...);
auto end = high_resolution_clock::now();
results.emplace_back("sequential",
duration<double>(end - start).count());
// Parallel execution
start = high_resolution_clock::now();
auto result_par = std::invoke(func, std::execution::par, args...);
end = high_resolution_clock::now();
results.emplace_back("parallel",
duration<double>(end - start).count());
// Parallel unsequenced
start = high_resolution_clock::now();
auto result_unseq = std::invoke(func, std::execution::par_unseq, args...);
end = high_resolution_clock::now();
results.emplace_back("par_unseq",
duration<double>(end - start).count());
return std::make_tuple(results, result_seq, result_par, result_unseq);
}
```

Memory access patterns can be analyzed to spot coalescing opportunities:

```cpp
// Analyze memory access patterns for optimization
struct MemoryAccessProfiler {
std::vector<size_t> access_pattern;
std::chrono::nanoseconds total_time{0};
template<typename Func>
auto profile(Func&& func) {
access_pattern.clear();
auto start = std::chrono::high_resolution_clock::now();
// Instrument memory accesses (simplified)
auto result = std::forward<Func>(func)();
auto end = std::chrono::high_resolution_clock::now();
total_time = end - start;
return result;
}
void report() const {
std::cout << "Memory access pattern analysis:\n";
std::cout << "Total time: " << total_time.count() << " ns\n";
std::cout << "Access pattern size: " << access_pattern.size() << "\n";
// Analyze for coalescing opportunities
bool is_coalesced = true;
for (size_t i = 1; i < access_pattern.size(); ++i) {
if (access_pattern[i] != access_pattern[i-1] + 1) {
is_coalesced = false;
break;
}
}
std::cout << "Memory access coalesced: " << (is_coalesced ? "Yes" : "No") << "\n";
}
};
```

C++20 and future standards bring increasingly sophisticated parallel programming capabilities:
- Enhanced Execution Policies: More granular control over parallel execution
- New Parallel Algorithms: `reduce`, `transform_reduce`, `inclusive_scan`, etc.
- Ranges Integration: Seamless composition with modern C++ features
- Sender/Scheduler Model: Flexible async programming (P2300, adopted for C++26)
- Advanced Patterns: Pipeline parallelism, work-stealing, dynamic load balancing
- Modern C++ Integration: Coroutines, modules, concepts

These features enable developers to write high-performance parallel code with increasingly high-level abstractions while maintaining fine-grained control over execution characteristics. The evolution from C++17's basic parallel algorithms to C++20's rich ecosystem and beyond represents a significant advance in parallel programming productivity and performance.