-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspsc_lockfree.cpp
More file actions
139 lines (117 loc) · 2.96 KB
/
spsc_lockfree.cpp
File metadata and controls
139 lines (117 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
#include <array>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <iostream>
#include <random>
#include <thread>
#include <utility>
#include <vector>
/// Signature of a queued task: a binary operation on two ints.
using func_type = std::function<int(int,int)>;
/// The pair of integer arguments passed to a queued task.
using args_ary = std::array<int,2>;

/**
 * Fixed-capacity ring of tasks for exactly ONE producer thread (calling
 * enqueue) and exactly ONE consumer thread (calling process).
 *
 * Every slot carries an atomic "processed" flag that acts as the ownership
 * token for that slot:
 *   - flag == true  : the slot is free and belongs to the producer;
 *   - flag == false : the slot holds a pending task and belongs to the consumer.
 * The flag is written with release and read with acquire ordering, so the
 * task/arguments stored by one side are fully visible to the other side
 * before it touches them. Calling enqueue (or process) from more than one
 * thread at a time is not supported.
 */
struct ThreadPool{
    /// All slots start out free (see Work's default member initializers).
    ThreadPool() = default;

    /**
     * Tries to queue `fn` with `args` in the next slot after the last one
     * written by the producer.
     *
     * @param fn   task to run later on the consumer side; must not be empty.
     * @param args the two ints that will be passed to `fn`.
     * @return true if the task was stored; false if `fn` is empty or the
     *         next slot still holds an unprocessed task (ring is full).
     */
    bool enqueue(func_type fn, args_ary args){
        // An empty std::function would throw std::bad_function_call when the
        // consumer invokes it, so refuse it up front.
        if(!fn){
            return false;
        }
        // head_idx is only ever touched by the producer, so relaxed is enough;
        // cross-thread ordering is provided by the slot flag.
        const std::size_t slot = (head_idx.load(std::memory_order_relaxed) + 1) % max_size;
        Work& target = work[slot];
        if(!target.processed()){
            return false;  // consumer has not finished with this slot yet
        }
        target.set_work(std::move(fn));
        target.set_args(args);
        target.reset();  // release-store: publishes the task to the consumer
        head_idx.store(slot, std::memory_order_relaxed);
        return true;
    }

    /**
     * Runs the task in the next slot after the last one consumed, if any,
     * and prints its result followed by a newline to std::cout.
     *
     * @return true if a task was executed; false if no task was pending.
     */
    bool process(){
        // tail_idx is only ever touched by the consumer (see enqueue note).
        const std::size_t slot = (tail_idx.load(std::memory_order_relaxed) + 1) % max_size;
        Work& pending = work[slot];
        if(pending.processed()){
            return false;  // nothing has been published in this slot
        }
        const int output = pending.process();
        std::cout << output << '\n';
        pending.set_processed();  // release-store: hands the slot back to the producer
        tail_idx.store(slot, std::memory_order_relaxed);
        return true;
    }

private:
    /// One ring slot: a task, its arguments and the ownership flag.
    struct Work{
    private:
        func_type _work;
        args_ary _args{};
        // true  -> slot is free (never used, or its task has already run)
        // false -> slot holds a task waiting for the consumer
        std::atomic<bool> is_processed{true};
    public:
        Work() = default;

        /// Acquire-load of the flag; pairs with the release-stores below.
        bool processed() const {
            return is_processed.load(std::memory_order_acquire);
        }
        /// Invokes the stored task with the stored arguments.
        int process(){
            return _work(_args[0], _args[1]);
        }
        /// True when no callable is stored in this slot.
        bool invalid() const {
            return _work == nullptr;
        }
        /// Marks the slot as free again (called by the consumer).
        void set_processed(){
            is_processed.store(true, std::memory_order_release);
        }
        void set_work(func_type work){
            _work = std::move(work);
        }
        void set_args(args_ary args){
            _args = args;
        }
        /// Marks the slot as holding a pending task (called by the producer).
        void reset(){
            is_processed.store(false, std::memory_order_release);
        }
    };

    static constexpr std::size_t max_size = 1000;  ///< number of slots in the ring
    std::array<Work,max_size> work;
    // Last slot written by the producer / last slot consumed by the consumer.
    std::atomic<std::size_t> head_idx{0}, tail_idx{0};
};
/**
 * Returns a pseudo-random integer uniformly distributed in [1, 10000].
 *
 * The Mersenne Twister engine is created and seeded from std::random_device
 * once per calling thread and then reused, instead of being rebuilt and
 * re-seeded on every call.
 */
int random_num(){
    thread_local std::mt19937 rng{std::random_device{}()};
    std::uniform_int_distribution<int> dist(1, 10000); // closed range [1,10000]
    return dist(rng);
}
int main(){
ThreadPool tp;
std::thread producer1([&](){
int count = 0;
while(true){
std::function<int(int,int)> fn;
if (count % 3 == 0){
fn = [](int a, int b){
std::cout << "Adding \t\t" << a << '\t'<< b << '\t';
return a+b;
};
}else if(count % 3 == 1){
fn = [](int a, int b){
std::cout << "Subtracting \t" << a << '\t'<< b << '\t';
return a-b;
};
}else{
fn = [](int a, int b){
std::cout << "Multiplying \t" << a << '\t'<< b << '\t';
return a*b;};
}
tp.enqueue(fn, args_ary{random_num(),random_num()});
++count;
}
});
std::thread consumer1([&](){
while(true){
tp.process();
}
});
if(producer1.joinable()){
producer1.join();
}
if(consumer1.joinable()){
consumer1.join();
}
return 0;
}