From 2159f3e071454a4e0cbd8b380ed4078162e7def2 Mon Sep 17 00:00:00 2001 From: ravi688 Date: Sun, 1 Mar 2026 12:49:56 +0530 Subject: [PATCH 1/3] WIP --- build_master.json | 7 ++ meson.build | 34 +++++++ source/manual_tests/Pipeline.cpp | 156 +++++++++++++++++++++++++++++++ 3 files changed, 197 insertions(+) create mode 100644 source/manual_tests/Pipeline.cpp diff --git a/build_master.json b/build_master.json index bbfa670..c46f69b 100644 --- a/build_master.json +++ b/build_master.json @@ -66,6 +66,13 @@ "sources" : [ "source/manual_tests/ThreadNaming.cpp" ] + }, + { + "name" : "Pipeline", + "is_executable" : true, + "sources" : [ + "source/manual_tests/Pipeline.cpp" + ] }, { "name" : "main", diff --git a/meson.build b/meson.build index 70a6c3c..2a752a5 100644 --- a/meson.build +++ b/meson.build @@ -440,6 +440,40 @@ ThreadNaming = executable('ThreadNaming', gnu_symbol_visibility: 'hidden' ) +# -------------- Target: Pipeline ------------------ +Pipeline_sources_bm_internal__ = [ +'source/manual_tests/Pipeline.cpp' +] +Pipeline_include_dirs_bm_internal__ = [ + +] +Pipeline_dependencies_bm_internal__ = [ + +] +Pipeline_link_args_bm_internal__ = { +'windows' : [], +'linux' : [], +'darwin' : [] +} +Pipeline_platform_src_bm_internal__ = { +'windows' : [], +'linux' : [], +'darwin' : [] +} +Pipeline_defines_bm_internal__ = [ + +] +Pipeline = executable('Pipeline', + Pipeline_sources_bm_internal__ + Pipeline_platform_src_bm_internal__[host_machine.system()] + sources_bm_internal__, + dependencies: dependencies_bm_internal__ + Pipeline_dependencies_bm_internal__, + include_directories: [inc_bm_internal__, Pipeline_include_dirs_bm_internal__], + install: false, + c_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__, + cpp_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__, + link_args: Pipeline_link_args_bm_internal__[host_machine.system()], + gnu_symbol_visibility: 'hidden' +) + # -------------- Target: main ------------------ main_sources_bm_internal__ = [ 'source/main.cpp' diff --git a/source/manual_tests/Pipeline.cpp b/source/manual_tests/Pipeline.cpp new file mode 100644 index 0000000..30639be --- /dev/null +++ b/source/manual_tests/Pipeline.cpp @@ -0,0 +1,156 @@ +#include +#include + +#include +#include +#include +#include +#include + +static std::atomic gPool1DataCounter = 0; +static std::atomic gPool2DataCounter = 0; + +static std::vector gGeneratedRandomNumberList; +static std::vector gProcessedRandomNumberList; + +static int GenerateRandomNumber() +{ + static std::random_device r; + static std::default_random_engine e1(r()); + static std::uniform_int_distribution uniform_dist(1, 100); + + int value = uniform_dist(e1); + gGeneratedRandomNumberList.push_back(value); + + return value; +} + +struct Context +{ + com::DynamicPoolFast pool12; + com::DynamicPoolFast pool23; + + com::ProducerConsumerBuffer::ElementType> pipe1; + com::ProducerConsumerBuffer::ElementType> pipe2; + com::ProducerConsumerBuffer::ElementType> pipe3; + + Context() : pool12([]() + { + ++gPool1DataCounter; + return 0; + }, + [](int&) + { + --gPool1DataCounter; + }), + pool23([]() + { + ++gPool2DataCounter; + return 0; + }, + [](int&) + { + --gPool2DataCounter; + }) + { + + } +}; + +static Context gContext; +static constexpr std::size_t gDataCount = 100; + +static void _pipe1Process(com::DynamicPoolFast::ElementType& output) +{ + *output = GenerateRandomNumber(); +} + +static void pipe1Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + auto value = gContext.pool12.get(); + _pipe1Process(value); + gContext.pipe1.push(value); + --dataCount; + } +} + +static void _pipe2Process(const com::DynamicPoolFast::ElementType& input, com::DynamicPoolFast::ElementType& output) +{ + *output = *input; +} + +static void pipe2Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + // Take out the output of pipe1 + auto value = gContext.pipe1.pop(); + + // Process it + auto value2 = gContext.pool23.get(); + _pipe2Process(value, value2); + + // Push the output of pipe2 + gContext.pipe2.push(value2); + + // Return the output of pipe1 back to pipe1's pool + gContext.pool12.putFast(value); + --dataCount; + } +} + +static void pipe3Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + // Take out the output of pipe1 + auto value = gContext.pipe2.pop(); + + gProcessedRandomNumberList.push_back(*value); + + // Return the output of pipe2 back to pipe2's pool + gContext.pool23.putFast(value); + --dataCount; + } +} + + +// 3 stage pipeline +// Pipe 1: generate input and put into Pipe 2 +// Pipe 2: process input and put into Pipe 3 +// Pipe 3: process input and compare with pipe 1's generated output +int main() +{ + std::thread pipe1Thread(pipe1Process); + std::thread pipe2Thread(pipe2Process); + std::thread pipe3Thread(pipe3Process); + + if(pipe1Thread.joinable()) + pipe1Thread.join(); + if(pipe2Thread.joinable()) + pipe2Thread.join(); + if(pipe3Thread.joinable()) + pipe3Thread.join(); + + assert(gProcessedRandomNumberList.size() == gGeneratedRandomNumberList.size()); + assert(gProcessedRandomNumberList.size() == gDataCount); + + for(std::size_t i = 0; i < gGeneratedRandomNumberList.size(); ++i) + { + assert(gGeneratedRandomNumberList[i] == gProcessedRandomNumberList[i]); + } + + std::cout << "gDataCount: " << gDataCount << "\n"; + std::cout << "gPool1DataCounter: " << gPool1DataCounter << "\n"; + std::cout << "gPool2DataCounter: " << gPool2DataCounter << "\n"; + + assert(gPool1DataCounter <= gDataCount); + assert(gPool2DataCounter <= gDataCount); + + return 0; +} From c3337e78858d692ef40650d81bd53d3174333265 Mon Sep 17 00:00:00 2001 From: ravi688 Date: Sun, 1 Mar 2026 13:10:01 +0530 Subject: [PATCH 2/3] Enable DynamicPoolFastElement() if T is default constructible --- include/common/DynamicPoolFast.hpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/include/common/DynamicPoolFast.hpp b/include/common/DynamicPoolFast.hpp index 42d4cca..53a4e6a 100644 --- a/include/common/DynamicPoolFast.hpp +++ b/include/common/DynamicPoolFast.hpp @@ -2,6 +2,8 @@ #include +#include + namespace com { template @@ -17,6 +19,8 @@ namespace com std::size_t getIndex() const noexcept { return m_index; } T& getValue() { return m_value; } public: + DynamicPoolFastElement() requires std::is_default_constructible_v + { } DynamicPoolFastElement(T value, std::size_t index) : m_value(std::move(value)), m_index(index) { } DynamicPoolFastElement(DynamicPoolFastElement&& el) : m_value(std::move(el.m_value)), m_index(el.m_index) { } From 578beecd8d63aef63dd24348ac8036e5e86ac679 Mon Sep 17 00:00:00 2001 From: ravi688 Date: Sun, 1 Mar 2026 13:11:18 +0530 Subject: [PATCH 3/3] Guard dynamic pools with mutexes --- source/manual_tests/Pipeline.cpp | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/source/manual_tests/Pipeline.cpp b/source/manual_tests/Pipeline.cpp index 30639be..e751c23 100644 --- a/source/manual_tests/Pipeline.cpp +++ b/source/manual_tests/Pipeline.cpp @@ -3,6 +3,7 @@ #include #include +#include #include #include #include @@ -30,6 +31,9 @@ struct Context com::DynamicPoolFast pool12; com::DynamicPoolFast pool23; + std::mutex pool12Mutex; + std::mutex pool23Mutex; + com::ProducerConsumerBuffer::ElementType> pipe1; com::ProducerConsumerBuffer::ElementType> pipe2; com::ProducerConsumerBuffer::ElementType> pipe3; @@ -70,7 +74,11 @@ static void pipe1Process() auto dataCount = gDataCount; while(dataCount) { - auto value = gContext.pool12.get(); + com::DynamicPoolFast::ElementType value; + { + std::lock_guard lock(gContext.pool12Mutex); + value = gContext.pool12.get(); + } _pipe1Process(value); gContext.pipe1.push(value); --dataCount; @@ -91,14 +99,21 @@ static void pipe2Process() auto value = gContext.pipe1.pop(); // Process it - auto value2 = gContext.pool23.get(); + com::DynamicPoolFast::ElementType value2; + { + std::lock_guard lock(gContext.pool23Mutex); + value2 = gContext.pool23.get(); + } _pipe2Process(value, value2); // Push the output of pipe2 gContext.pipe2.push(value2); // Return the output of pipe1 back to pipe1's pool - gContext.pool12.putFast(value); + { + std::lock_guard lock(gContext.pool12Mutex); + gContext.pool12.putFast(value); + } --dataCount; } } @@ -114,7 +129,10 @@ static void pipe3Process() gProcessedRandomNumberList.push_back(*value); // Return the output of pipe2 back to pipe2's pool - gContext.pool23.putFast(value); + { + std::lock_guard lock(gContext.pool23Mutex); + gContext.pool23.putFast(value); + } --dataCount; } }