Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions build_master.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@
"sources" : [
"source/manual_tests/ThreadNaming.cpp"
]
},
{
"name" : "Pipeline",
"is_executable" : true,
"sources" : [
"source/manual_tests/Pipeline.cpp"
]
},
{
"name" : "main",
Expand Down
4 changes: 4 additions & 0 deletions include/common/DynamicPoolFast.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <common/DynamicPool.hpp>

#include <type_traits>

namespace com
{
template<typename T>
Expand All @@ -17,6 +19,8 @@ namespace com
std::size_t getIndex() const noexcept { return m_index; }
T& getValue() { return m_value; }
public:
DynamicPoolFastElement() requires std::is_default_constructible_v<T>
{ }
DynamicPoolFastElement(T value, std::size_t index) : m_value(std::move(value)), m_index(index) { }

DynamicPoolFastElement(DynamicPoolFastElement&& el) : m_value(std::move(el.m_value)), m_index(el.m_index) { }
Expand Down
34 changes: 34 additions & 0 deletions meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,40 @@ ThreadNaming = executable('ThreadNaming',
gnu_symbol_visibility: 'hidden'
)

# -------------- Target: Pipeline ------------------
Pipeline_sources_bm_internal__ = [
'source/manual_tests/Pipeline.cpp'
]
Pipeline_include_dirs_bm_internal__ = [

]
Pipeline_dependencies_bm_internal__ = [

]
Pipeline_link_args_bm_internal__ = {
'windows' : [],
'linux' : [],
'darwin' : []
}
Pipeline_platform_src_bm_internal__ = {
'windows' : [],
'linux' : [],
'darwin' : []
}
Pipeline_defines_bm_internal__ = [

]
Pipeline = executable('Pipeline',
Pipeline_sources_bm_internal__ + Pipeline_platform_src_bm_internal__[host_machine.system()] + sources_bm_internal__,
dependencies: dependencies_bm_internal__ + Pipeline_dependencies_bm_internal__,
include_directories: [inc_bm_internal__, Pipeline_include_dirs_bm_internal__],
install: false,
c_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__,
cpp_args: Pipeline_defines_bm_internal__ + project_build_mode_defines_bm_internal__,
link_args: Pipeline_link_args_bm_internal__[host_machine.system()],
gnu_symbol_visibility: 'hidden'
)

# -------------- Target: main ------------------
main_sources_bm_internal__ = [
'source/main.cpp'
Expand Down
174 changes: 174 additions & 0 deletions source/manual_tests/Pipeline.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#include <common/DynamicPoolFast.hpp>
#include <common/ProducerConsumerBuffer.hpp>

#include <atomic>
#include <thread>
#include <mutex>
#include <vector>
#include <random>
#include <cassert>

static std::atomic<std::size_t> gPool1DataCounter = 0;
static std::atomic<std::size_t> gPool2DataCounter = 0;

static std::vector<int> gGeneratedRandomNumberList;
static std::vector<int> gProcessedRandomNumberList;

static int GenerateRandomNumber()
{
static std::random_device r;
static std::default_random_engine e1(r());
static std::uniform_int_distribution<int> uniform_dist(1, 100);

int value = uniform_dist(e1);
gGeneratedRandomNumberList.push_back(value);

return value;
}

struct Context
{
com::DynamicPoolFast<int> pool12;
com::DynamicPoolFast<int> pool23;

std::mutex pool12Mutex;
std::mutex pool23Mutex;

com::ProducerConsumerBuffer<com::DynamicPoolFast<int>::ElementType> pipe1;
com::ProducerConsumerBuffer<com::DynamicPoolFast<int>::ElementType> pipe2;
com::ProducerConsumerBuffer<com::DynamicPoolFast<int>::ElementType> pipe3;

Context() : pool12([]()
{
++gPool1DataCounter;
return 0;
},
[](int&)
{
--gPool1DataCounter;
}),
pool23([]()
{
++gPool2DataCounter;
return 0;
},
[](int&)
{
--gPool2DataCounter;
})
{

}
};

static Context gContext;
static constexpr std::size_t gDataCount = 100;

static void _pipe1Process(com::DynamicPoolFast<int>::ElementType& output)
{
*output = GenerateRandomNumber();
}

static void pipe1Process()
{
auto dataCount = gDataCount;
while(dataCount)
{
com::DynamicPoolFast<int>::ElementType value;
{
std::lock_guard<std::mutex> lock(gContext.pool12Mutex);
value = gContext.pool12.get();
}
_pipe1Process(value);
gContext.pipe1.push(value);
--dataCount;
}
}

static void _pipe2Process(const com::DynamicPoolFast<int>::ElementType& input, com::DynamicPoolFast<int>::ElementType& output)
{
*output = *input;
}

static void pipe2Process()
{
auto dataCount = gDataCount;
while(dataCount)
{
// Take out the output of pipe1
auto value = gContext.pipe1.pop();

// Process it
com::DynamicPoolFast<int>::ElementType value2;
{
std::lock_guard<std::mutex> lock(gContext.pool23Mutex);
value2 = gContext.pool23.get();
}
_pipe2Process(value, value2);

// Push the output of pipe2
gContext.pipe2.push(value2);

// Return the output of pipe1 back to pipe1's pool
{
std::lock_guard<std::mutex> lock(gContext.pool12Mutex);
gContext.pool12.putFast(value);
}
--dataCount;
}
}

static void pipe3Process()
{
auto dataCount = gDataCount;
while(dataCount)
{
// Take out the output of pipe1
auto value = gContext.pipe2.pop();

gProcessedRandomNumberList.push_back(*value);

// Return the output of pipe2 back to pipe2's pool
{
std::lock_guard<std::mutex> lock(gContext.pool23Mutex);
gContext.pool23.putFast(value);
}
--dataCount;
}
}


// 3 stage pipeline
// Pipe 1: generate input and put into Pipe 2
// Pipe 2: process input and put into Pipe 3
// Pipe 3: process input and compare with pipe 1's generated output
int main()
{
std::thread pipe1Thread(pipe1Process);
std::thread pipe2Thread(pipe2Process);
std::thread pipe3Thread(pipe3Process);

if(pipe1Thread.joinable())
pipe1Thread.join();
if(pipe2Thread.joinable())
pipe2Thread.join();
if(pipe3Thread.joinable())
pipe3Thread.join();

assert(gProcessedRandomNumberList.size() == gGeneratedRandomNumberList.size());
assert(gProcessedRandomNumberList.size() == gDataCount);

for(std::size_t i = 0; i < gGeneratedRandomNumberList.size(); ++i)
{
assert(gGeneratedRandomNumberList[i] == gProcessedRandomNumberList[i]);
}

std::cout << "gDataCount: " << gDataCount << "\n";
std::cout << "gPool1DataCounter: " << gPool1DataCounter << "\n";
std::cout << "gPool2DataCounter: " << gPool2DataCounter << "\n";

assert(gPool1DataCounter <= gDataCount);
assert(gPool2DataCounter <= gDataCount);

return 0;
}
Loading