diff --git a/build_master.json b/build_master.json index bbfa670..c46f69b 100644 --- a/build_master.json +++ b/build_master.json @@ -66,6 +66,13 @@ "sources" : [ "source/manual_tests/ThreadNaming.cpp" ] + }, + { + "name" : "Pipeline", + "is_executable" : true, + "sources" : [ + "source/manual_tests/Pipeline.cpp" + ] }, { "name" : "main", diff --git a/include/common/DynamicPoolFast.hpp b/include/common/DynamicPoolFast.hpp index 42d4cca..53a4e6a 100644 --- a/include/common/DynamicPoolFast.hpp +++ b/include/common/DynamicPoolFast.hpp @@ -2,6 +2,8 @@ #include +#include + namespace com { template @@ -17,6 +19,8 @@ namespace com std::size_t getIndex() const noexcept { return m_index; } T& getValue() { return m_value; } public: + DynamicPoolFastElement() requires std::is_default_constructible_v + { } DynamicPoolFastElement(T value, std::size_t index) : m_value(std::move(value)), m_index(index) { } DynamicPoolFastElement(DynamicPoolFastElement&& el) : m_value(std::move(el.m_value)), m_index(el.m_index) { } diff --git a/meson.build b/meson.build index 70a6c3c..2a752a5 100644 --- a/meson.build +++ b/meson.build @@ -440,6 +440,40 @@ ThreadNaming = executable('ThreadNaming', gnu_symbol_visibility: 'hidden' ) +# -------------- Target: Pipeline ------------------ +Pipeline_sources_bm_internal__ = [ +'source/manual_tests/Pipeline.cpp' +] +Pipeline_include_dirs_bm_internal__ = [ + +] +Pipeline_dependencies_bm_internal__ = [ + +] +Pipeline_link_args_bm_internal__ = { +'windows' : [], +'linux' : [], +'darwin' : [] +} +Pipeline_platform_src_bm_internal__ = { +'windows' : [], +'linux' : [], +'darwin' : [] +} +Pipeline_defines_bm_internal__ = [ + +] +Pipeline = executable('Pipeline', + Pipeline_sources_bm_internal__ + Pipeline_platform_src_bm_internal__[host_machine.system()] + sources_bm_internal__, + dependencies: dependencies_bm_internal__ + Pipeline_dependencies_bm_internal__, + include_directories: [inc_bm_internal__, Pipeline_include_dirs_bm_internal__], + install: false, + c_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__, + cpp_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__, + link_args: Pipeline_link_args_bm_internal__[host_machine.system()], + gnu_symbol_visibility: 'hidden' +) + # -------------- Target: main ------------------ main_sources_bm_internal__ = [ 'source/main.cpp' diff --git a/source/manual_tests/Pipeline.cpp b/source/manual_tests/Pipeline.cpp new file mode 100644 index 0000000..e751c23 --- /dev/null +++ b/source/manual_tests/Pipeline.cpp @@ -0,0 +1,174 @@ +#include +#include + +#include +#include +#include +#include +#include +#include + +static std::atomic gPool1DataCounter = 0; +static std::atomic gPool2DataCounter = 0; + +static std::vector gGeneratedRandomNumberList; +static std::vector gProcessedRandomNumberList; + +static int GenerateRandomNumber() +{ + static std::random_device r; + static std::default_random_engine e1(r()); + static std::uniform_int_distribution uniform_dist(1, 100); + + int value = uniform_dist(e1); + gGeneratedRandomNumberList.push_back(value); + + return value; +} + +struct Context +{ + com::DynamicPoolFast pool12; + com::DynamicPoolFast pool23; + + std::mutex pool12Mutex; + std::mutex pool23Mutex; + + com::ProducerConsumerBuffer::ElementType> pipe1; + com::ProducerConsumerBuffer::ElementType> pipe2; + com::ProducerConsumerBuffer::ElementType> pipe3; + + Context() : pool12([]() + { + ++gPool1DataCounter; + return 0; + }, + [](int&) + { + --gPool1DataCounter; + }), + pool23([]() + { + ++gPool2DataCounter; + return 0; + }, + [](int&) + { + --gPool2DataCounter; + }) + { + + } +}; + +static Context gContext; +static constexpr std::size_t gDataCount = 100; + +static void _pipe1Process(com::DynamicPoolFast::ElementType& output) +{ + *output = GenerateRandomNumber(); +} + +static void pipe1Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + com::DynamicPoolFast::ElementType value; + { + std::lock_guard lock(gContext.pool12Mutex); + value = gContext.pool12.get(); + } + _pipe1Process(value); + gContext.pipe1.push(value); + --dataCount; + } +} + +static void _pipe2Process(const com::DynamicPoolFast::ElementType& input, com::DynamicPoolFast::ElementType& output) +{ + *output = *input; +} + +static void pipe2Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + // Take out the output of pipe1 + auto value = gContext.pipe1.pop(); + + // Process it + com::DynamicPoolFast::ElementType value2; + { + std::lock_guard lock(gContext.pool23Mutex); + value2 = gContext.pool23.get(); + } + _pipe2Process(value, value2); + + // Push the output of pipe2 + gContext.pipe2.push(value2); + + // Return the output of pipe1 back to pipe1's pool + { + std::lock_guard lock(gContext.pool12Mutex); + gContext.pool12.putFast(value); + } + --dataCount; + } +} + +static void pipe3Process() +{ + auto dataCount = gDataCount; + while(dataCount) + { + // Take out the output of pipe1 + auto value = gContext.pipe2.pop(); + + gProcessedRandomNumberList.push_back(*value); + + // Return the output of pipe2 back to pipe2's pool + { + std::lock_guard lock(gContext.pool23Mutex); + gContext.pool23.putFast(value); + } + --dataCount; + } +} + + +// 3 stage pipeline +// Pipe 1: generate input and put into Pipe 2 +// Pipe 2: process input and put into Pipe 3 +// Pipe 3: process input and compare with pipe 1's generated output +int main() +{ + std::thread pipe1Thread(pipe1Process); + std::thread pipe2Thread(pipe2Process); + std::thread pipe3Thread(pipe3Process); + + if(pipe1Thread.joinable()) + pipe1Thread.join(); + if(pipe2Thread.joinable()) + pipe2Thread.join(); + if(pipe3Thread.joinable()) + pipe3Thread.join(); + + assert(gProcessedRandomNumberList.size() == gGeneratedRandomNumberList.size()); + assert(gProcessedRandomNumberList.size() == gDataCount); + + for(std::size_t i = 0; i < gGeneratedRandomNumberList.size(); ++i) + { + assert(gGeneratedRandomNumberList[i] == gProcessedRandomNumberList[i]); + } + + std::cout << "gDataCount: " << gDataCount << "\n"; + std::cout << "gPool1DataCounter: " << gPool1DataCounter << "\n"; + std::cout << "gPool2DataCounter: " << gPool2DataCounter << "\n"; + + assert(gPool1DataCounter <= gDataCount); + assert(gPool2DataCounter <= gDataCount); + + return 0; +}