Skip to content

Commit fe8a521

Browse files
committed
added stats for job queue and manaer and print functions
1 parent b6f3127 commit fe8a521

File tree

4 files changed

+205
-29
lines changed

4 files changed

+205
-29
lines changed

src/DataModelBase/JobQueue.cpp

Lines changed: 55 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,22 @@
22

33
using namespace ToolFramework;
44

5+
6+
QueueStats::QueueStats(){
7+
8+
Clear();
9+
10+
}
11+
12+
void QueueStats::Clear(){
13+
14+
submitted = 0;
15+
queued = 0;
16+
17+
}
18+
19+
20+
521
JobQueue::JobQueue(){}
622

723
JobQueue::~JobQueue(){
@@ -24,7 +40,10 @@ bool JobQueue::AddJob(Job* job){
2440
job->m_failed=false;
2541
m_lock.lock();
2642
m_jobs.push(job);
43+
m_stats[job->m_id].submitted++;
44+
m_stats[job->m_id].queued++;
2745
m_lock.unlock();
46+
2847
return true;
2948
}
3049
return false;
@@ -41,6 +60,7 @@ Job* JobQueue::GetJob(){
4160
Job* ret = m_jobs.front();
4261
m_jobs.front()=0;
4362
m_jobs.pop();
63+
m_stats[ret->m_id].queued--;
4464
m_lock.unlock();
4565
return ret;
4666
}
@@ -49,8 +69,9 @@ bool JobQueue::pop(){
4969

5070
m_lock.lock();
5171
if(m_jobs.size()){
52-
m_jobs.pop();
53-
m_lock.unlock();
72+
m_stats[m_jobs.front()->m_id].queued--;
73+
m_jobs.pop();
74+
m_lock.unlock();
5475
return true;
5576
}
5677
m_lock.unlock();
@@ -65,3 +86,35 @@ unsigned int JobQueue::size(){
6586
return tmp;
6687

6788
}
89+
90+
void JobQueue::Print(){
91+
92+
m_lock.lock();
93+
printf("Total jobs queued = %u\n", size());
94+
for(std::map<std::string, QueueStats>::iterator it = m_stats.begin(); it!=m_stats.end(); it++){
95+
printf(" %s : submitted = %lu, queued = %lu \n", it->first.c_str(), it->second.submitted, it->second.queued);
96+
}
97+
m_lock.unlock();
98+
99+
}
100+
101+
void JobQueue::ClearStats(){
102+
103+
m_lock.lock();
104+
m_stats.clear();
105+
m_lock.unlock();
106+
107+
}
108+
109+
void JobQueue::Clear(){
110+
111+
m_lock.lock();
112+
while(m_jobs.size()){
113+
delete m_jobs.front();
114+
m_jobs.front()=0;
115+
m_jobs.pop();
116+
}
117+
m_stats.clear();
118+
m_lock.unlock();
119+
120+
}

src/DataModelBase/JobQueue.h

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,28 @@
44
#include <queue>
55
#include <mutex>
66
#include <Job.h>
7+
#include <map>
78

89
namespace ToolFramework{
910

11+
/**
12+
* \struct QueueStats
13+
*
14+
* This is a struct to define stats measured for job types;
15+
*
16+
* $Author: B.Richards $
17+
* $Date: 2024/06/08 1:17:00 $
18+
*/
19+
20+
struct QueueStats{
21+
22+
QueueStats();
23+
void Clear();
24+
unsigned long submitted;
25+
unsigned long queued;
26+
27+
};
28+
1029
/**
1130
* \class JobQueue
1231
*
@@ -18,6 +37,8 @@ namespace ToolFramework{
1837

1938

2039
class JobQueue{
40+
41+
friend class WorkerPoolManager;
2142

2243
public:
2344

@@ -28,11 +49,16 @@ namespace ToolFramework{
2849
Job* GetJob(); ///< function to get job from the front of the queue, the function pops the job off the queue
2950
bool pop(); ///< function to pop a job off the front of the queue
3051
unsigned int size(); ///< function to return number of jobs in the queue
52+
void Print();
53+
void ClearStats();
54+
void Clear();
55+
3156

3257
private:
3358

3459
std::queue<Job*> m_jobs;
3560
std::mutex m_lock;
61+
std::map<std::string, QueueStats> m_stats;
3662

3763
};
3864

src/DataModelBase/WorkerPoolManager.cpp

Lines changed: 98 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,14 @@
22

33
using namespace ToolFramework;
44

5+
PoolManagerStats::PoolManagerStats(){
6+
7+
processing = 0;
8+
completed = 0 ;
9+
failed = 0;
10+
11+
}
12+
513
PoolWorker_args::PoolWorker_args() : Thread_args() {}
614

715
PoolWorker_args::~PoolWorker_args() {}
@@ -32,7 +40,7 @@ WorkerPoolManager::WorkerPoolManager(JobQueue& job_queue, unsigned int* thread_c
3240
m_manager_args.sleep = false;
3341
m_manager_args.sleep_us = ( m_manager_args.thread_management_period_us < m_manager_args.job_assignment_period_us ? m_manager_args.thread_management_period_us : m_manager_args.job_assignment_period_us );
3442

35-
CreateWorkerThread(m_manager_args.args, m_manager_args.self_serving, m_manager_args.thread_sleep_us, m_manager_args.job_queue, m_manager_args.job_out_deque, m_manager_args.thread_num, m_util, global_thread_num);
43+
CreateWorkerThread(m_manager_args.args, m_manager_args.self_serving, m_manager_args.thread_sleep_us, m_manager_args.job_queue, m_manager_args.job_out_deque, m_manager_args.thread_num, m_util, &m_manager_args.stats, &m_manager_args.stats_mtx, global_thread_num);
3644

3745
m_manager_args.free_threads = 1;
3846
if (m_threaded) CreateManagerThread();
@@ -61,21 +69,23 @@ void WorkerPoolManager::CreateManagerThread() {
6169
}
6270

6371

64-
void WorkerPoolManager::CreateWorkerThread(std::vector<PoolWorker_args*>& in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, unsigned int* global_thread_num) {
72+
void WorkerPoolManager::CreateWorkerThread(std::vector<PoolWorker_args*>& in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, unsigned int* global_thread_num) {
6573
PoolWorker_args* tmparg = new PoolWorker_args();
6674
tmparg->busy = false;
6775
tmparg->thread_sleep_us = in_thread_sleep_us;
6876
tmparg->job = 0;
6977
tmparg->job_queue = 0;
7078
tmparg->job_out_deque = in_job_out_deque;
79+
tmparg->stats = in_stats;
80+
tmparg->stats_mtx = in_stats_mtx;
7181
if(in_self_serving) tmparg->job_queue=in_job_queue;
7282
tmparg->self_serving = in_self_serving;
7383
in_args.push_back(tmparg);
7484
std::stringstream tmp;
7585
tmp << "T" << thread_num;
7686
in_util->CreateThread(tmp.str(), &WorkerThread, in_args.at(in_args.size() - 1));
7787
thread_num++;
78-
if(global_thread_num) global_thread_num++;
88+
if(global_thread_num) (*global_thread_num)++;
7989
}
8090

8191
void WorkerPoolManager::DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, unsigned int* global_thread_num) {
@@ -93,34 +103,52 @@ void WorkerPoolManager::WorkerThread(Thread_args* arg) {
93103
else {
94104
if(args->self_serving){
95105
args->job=args->job_queue->GetJob();
96-
if(!args->job) return;
106+
args->stats_mtx->lock();
107+
(*args->stats)[args->job->m_id].processing++;
108+
args->stats_mtx->unlock();
97109
args->job->m_in_progress=true;
98110
args->busy = true;
99111
}
100-
101-
try{
102-
if(args->job->func(args->job->data)) args->job->m_complete=true;
103-
else args->job->m_failed=true;
104-
}
105-
catch (std::exception& e) {
106-
std::clog<<"Job Failed \""<<args->job->m_id<<"\": "<<e.what() <<std::endl;
107-
args->job->m_failed=true;
112+
113+
if(args->job){
114+
try{
115+
if(args->job->func(args->job->data)){
116+
args->job->m_complete=true;
117+
args->stats_mtx->lock();
118+
(*args->stats)[args->job->m_id].processing--;
119+
(*args->stats)[args->job->m_id].completed++;
120+
args->stats_mtx->unlock();
121+
}
122+
else args->job->m_failed=true;
123+
}
124+
catch (std::exception& e) {
125+
std::clog<<"Job Failed \""<<args->job->m_id<<"\": "<<e.what() <<std::endl;
126+
args->job->m_failed=true;
127+
}
128+
catch(...){
129+
std::clog<<"Job Failed \""<<args->job->m_id<<"\""<<std::endl;
130+
args->job->m_failed=true;
131+
}
108132
}
109-
catch(...){
110-
std::clog<<"Job Failed \""<<args->job->m_id<<"\""<<std::endl;
133+
else{
134+
std::clog<<"Job Failed \""<<args->job->m_id<<"\": null job pointer"<<std::endl;
111135
args->job->m_failed=true;
112136
}
113137

114138
if(args->job->m_failed){
115-
try{
116-
if(args->job->fail_func) args->job->fail_func(args->job->data);
117-
}
118-
catch (std::exception& p) {
119-
std::clog<<"Job fail_func Failed \"args->job->m_id\" likely memory leaking: "<<p.what() <<std::endl;
120-
}
121-
catch(...){
122-
std::clog<<"Job fail_func Failed \"args->job->m_id\" likely memory leaking: "<<std::endl;
123-
}
139+
args->stats_mtx->lock();
140+
(*args->stats)[args->job->m_id].processing--;
141+
(*args->stats)[args->job->m_id].failed++;
142+
args->stats_mtx->unlock();
143+
try{
144+
if(args->job->fail_func) args->job->fail_func(args->job->data);
145+
}
146+
catch (std::exception& p) {
147+
std::clog<<"Job fail_func Failed \"args->job->m_id\" likely memory leaking: "<<p.what() <<std::endl;
148+
}
149+
catch(...){
150+
std::clog<<"Job fail_func Failed \"args->job->m_id\" likely memory leaking: "<<std::endl;
151+
}
124152
}
125153

126154
if (args->job_out_deque) {
@@ -159,6 +187,10 @@ void WorkerPoolManager::ManagerThread(Thread_args* arg) {
159187
for (unsigned int i = 0; i < args->args.size(); i++) {
160188
if (!args->args.at(i)->busy && args->job_queue->size() > 0) {
161189
args->args.at(i)->job = args->job_queue->GetJob();
190+
args->stats_mtx.lock();
191+
args->stats[args->args.at(i)->job->m_id].processing++;
192+
args->stats_mtx.unlock();
193+
162194
args->args.at(i)->job->m_in_progress=true;
163195
args->args.at(i)->busy = true;
164196
}
@@ -178,7 +210,7 @@ void WorkerPoolManager::ManagerThread(Thread_args* arg) {
178210
}
179211
}
180212

181-
if (args->free_threads < 1 && args->args.size()<(*(args->thread_cap)) && ( !args->global_thread_cap || (*(args->global_thread_num))<(*(args->global_thread_cap)) ) ) CreateWorkerThread(args->args, args->self_serving, args->thread_sleep_us, args->job_queue, args->job_out_deque, args->thread_num, args->util, args->global_thread_num);
213+
if (args->free_threads < 1 && args->args.size()<(*(args->thread_cap)) && ( !args->global_thread_cap || (*(args->global_thread_num))<(*(args->global_thread_cap)) ) ) CreateWorkerThread(args->args, args->self_serving, args->thread_sleep_us, args->job_queue, args->job_out_deque, args->thread_num, args->util, &args->stats, &args->stats_mtx, args->global_thread_num);
182214

183215
if (args->free_threads > 1) DeleteWorkerThread(last_free, args->util, args->args, args->global_thread_num);
184216

@@ -200,3 +232,45 @@ unsigned int WorkerPoolManager::NumThreads() {
200232
return m_manager_args.args.size();
201233

202234
}
235+
236+
std::string WorkerPoolManager::GetStats(){
237+
238+
std::string ret="";
239+
240+
m_job_queue->m_lock.lock();
241+
m_manager_args.stats_mtx.lock();
242+
243+
ret="Queued Jobs Total = " + std::to_string(m_job_queue->size()) + " \n";
244+
245+
for(std::map<std::string, QueueStats>::iterator it = m_job_queue->m_stats.begin(); it!=m_job_queue->m_stats.end(); it++){
246+
247+
ret += " " + it->first + ": submitted = " + std::to_string(it->second.submitted) + ", queued = " + std::to_string(it->second.queued) + ", processing = " + std::to_string(m_manager_args.stats[it->first].processing) + ", completed = " + std::to_string(m_manager_args.stats[it->first].completed) + ", failed = " + std::to_string(m_manager_args.stats[it->first].failed) +"\n";
248+
249+
}
250+
251+
m_manager_args.stats_mtx.unlock();
252+
m_job_queue->m_lock.unlock();
253+
254+
return ret;
255+
256+
}
257+
258+
void WorkerPoolManager::PrintStats(){
259+
260+
printf("%s\n", GetStats().c_str());
261+
262+
}
263+
264+
void WorkerPoolManager::ClearStats(){
265+
266+
m_job_queue->m_lock.lock();
267+
m_manager_args.stats_mtx.lock();
268+
269+
m_job_queue->ClearStats();
270+
m_manager_args.stats.clear();
271+
272+
m_manager_args.stats_mtx.unlock();
273+
m_job_queue->m_lock.unlock();
274+
275+
276+
}

src/DataModelBase/WorkerPoolManager.h

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,24 @@
1111
#include <chrono>
1212

1313
namespace ToolFramework{
14+
15+
/**
16+
* \struct PoolManagerStats
17+
*
18+
* This is a struct to define stats measured for job types;
19+
*
20+
* $Author: B.Richards $
21+
* $Date: 2024/06/08 1:17:00 $
22+
*/
23+
24+
struct PoolManagerStats{
25+
26+
PoolManagerStats();
27+
unsigned long processing;
28+
unsigned long completed;
29+
unsigned long failed;
30+
31+
};
1432

1533
/**
1634
* \struct PoolWorker_args
@@ -32,7 +50,8 @@ namespace ToolFramework{
3250
Job* job;
3351
JobQueue* job_queue;
3452
JobDeque* job_out_deque;
35-
53+
std::map<std::string,PoolManagerStats>* stats;
54+
std::mutex* stats_mtx;
3655
};
3756

3857

@@ -70,6 +89,8 @@ namespace ToolFramework{
7089
std::chrono::high_resolution_clock::time_point now;
7190
std::chrono::high_resolution_clock::time_point managing_timer;
7291
std::chrono::high_resolution_clock::time_point serving_timer;
92+
std::map<std::string,PoolManagerStats> stats;
93+
std::mutex stats_mtx;
7394

7495
};
7596
/**
@@ -104,12 +125,14 @@ namespace ToolFramework{
104125

105126
void ManageWorkers(); ///< Function to manage workers and distribute jobs to be run when unthreaded if you choose to not have the managment run on a thread.
106127
unsigned int NumThreads(); ///< Function to return the number of current worker threads
107-
128+
std::string GetStats(); ///< Function to get the current stats
129+
void PrintStats(); ///< Function to print the current stats to screen
130+
void ClearStats(); ///< Function to clear the current stats
108131

109132
private:
110133

111134
void CreateManagerThread(); ///< Function to Create Manager Thread
112-
static void CreateWorkerThread(std::vector<PoolWorker_args*> &in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, unsigned int* global_thread_num=0); ///< Function to Create Worker Thread
135+
static void CreateWorkerThread(std::vector<PoolWorker_args*> &in_args, bool &in_self_serving, unsigned int &in_thread_sleep_us, JobQueue* in_job_queue, JobDeque* in_job_out_deque,unsigned long &thread_num, Utilities* in_util, std::map<std::string,PoolManagerStats>* in_stats, std::mutex* in_stats_mtx, unsigned int* global_thread_num=0 ); ///< Function to Create Worker Thread
113136
static void DeleteWorkerThread(unsigned int pos, Utilities* in_util, std::vector<PoolWorker_args*> &in_args, unsigned int* global_thread_num=0); ///< Function to delete thread @param pos is the position in the args vector below
114137

115138
static void WorkerThread(Thread_args* arg); ///< Function to be run by the thread in a loop. Make sure not to block in it

0 commit comments

Comments
 (0)