-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy paththreadPool.c
More file actions
169 lines (160 loc) · 5.13 KB
/
threadPool.c
File metadata and controls
169 lines (160 loc) · 5.13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
#include "threadPool.h"
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
// declarations
void* waitForTask(void* threadPool);
void emptyQueue(ThreadPool* tp);
void freeResourses(ThreadPool* tp);
ThreadPool* tpCreate(int numOfThreads) {
ThreadPool* tp;
if ((tp = malloc(sizeof(ThreadPool))) == NULL) {
perror("malloc failed");
exit(-1);
}
if ((tp->jobQueue = osCreateQueue()) == NULL) {
perror("CreateQueue failed");
free(tp);
exit(-1);
}
if (pthread_mutex_init(&tp->queueMutex, NULL)) {
perror("mutex_init failed");
osDestroyQueue(tp->jobQueue);
free(tp);
exit(-1);
}
if (pthread_cond_init(&tp->cond, NULL)) {
perror("cond_init failed");
pthread_mutex_destroy(&tp->queueMutex);
osDestroyQueue(tp->jobQueue);
free(tp);
exit(-1);
}
if ((tp->threads = (pthread_t*)malloc(numOfThreads*sizeof(pthread_t))) == NULL) {
perror("malloc failed");
pthread_mutex_destroy(&tp->queueMutex);
pthread_cond_destroy(&tp->cond);
osDestroyQueue(tp->jobQueue);
free(tp);
exit(-1);
}
if (pthread_mutex_init(&tp->wasDestroyedMutex, NULL)) {
perror("malloc failed");
free(tp->threads);
pthread_mutex_destroy(&tp->queueMutex);
pthread_cond_destroy(&tp->cond);
osDestroyQueue(tp->jobQueue);
free(tp);
exit(-1);
}
tp->numOfThreads = numOfThreads;
tp->wasDestroyed = 0;
int i;
for (i = 0; i < numOfThreads; i++) {
if (pthread_create(&tp->threads[i], NULL, waitForTask, (void*)tp) != 0) {
perror("pthread create failed");
freeResourses(tp);
exit(-1);
}
}
return tp;
}
// take one task from the qeueue and execute it. assumes a locked queueMutex.
void doTask(ThreadPool* tp) {
if (osIsQueueEmpty(tp->jobQueue)) {
pthread_mutex_unlock(&tp->queueMutex);
return;
}
// take a job from the queue
funcAndArgs* job = (funcAndArgs*)osDequeue(tp->jobQueue);
pthread_mutex_unlock(&tp->queueMutex);
if (job) {
// run the job
job->func(job->args);
// free job struct
free(job);
}
}
void freeResourses(ThreadPool* tp) {
emptyQueue(tp);
free(tp->threads);
pthread_mutex_destroy(&tp->queueMutex);
pthread_mutex_destroy(&tp->wasDestroyedMutex);
pthread_cond_destroy(&tp->cond);
osDestroyQueue(tp->jobQueue);
free(tp);
}
// each thread runs this function. wait for a job to come and execute it.
void* waitForTask(void* threadPool) {
ThreadPool* tp = (ThreadPool*)threadPool;
pthread_mutex_lock(&tp->wasDestroyedMutex);
while (!tp->wasDestroyed) {
pthread_mutex_unlock(&tp->wasDestroyedMutex);
pthread_mutex_lock(&tp->queueMutex);
while (osIsQueueEmpty(tp->jobQueue) && !tp->wasDestroyed) {
// if there are no jobs, wait for a new job to arrive.
// thread will be signaled at arrival of a job.
pthread_cond_wait(&tp->cond, &tp->queueMutex);
}
// (try to) execute a job.
doTask(tp);
pthread_mutex_lock(&tp->wasDestroyedMutex);
}
pthread_mutex_unlock(&tp->wasDestroyedMutex);
// finish remaining tasks if needed (queue will be empty otherwise)
while (!osIsQueueEmpty(tp->jobQueue)) {
pthread_mutex_lock(&tp->queueMutex);
doTask(tp);
}
return NULL;
}
// empty jobs queue
void emptyQueue(ThreadPool* tp) {
pthread_mutex_lock(&tp->queueMutex);
while (!osIsQueueEmpty(tp->jobQueue)) {
free(osDequeue(tp->jobQueue));
}
pthread_mutex_unlock(&tp->queueMutex);
}
void tpDestroy(ThreadPool* threadPool, int shouldWaitForTasks) {
// dont allow to destroy twice
if (threadPool->wasDestroyed)
return;
// printf("destroy!\n");
pthread_mutex_lock(&threadPool->wasDestroyedMutex);
threadPool->wasDestroyed = 1;
pthread_mutex_unlock(&threadPool->wasDestroyedMutex);
// empty queue if shouldWaitForTasks = 0
if (!shouldWaitForTasks)
emptyQueue(threadPool);
int i;
// signal all waiting threads
pthread_cond_broadcast(&threadPool->cond);
for (i = 0; i < threadPool->numOfThreads; i++) {
pthread_join(threadPool->threads[i], NULL);
}
freeResourses(threadPool);
}
int tpInsertTask(ThreadPool* threadPool, void (*computeFunc) (void *), void* param) {
pthread_mutex_lock(&threadPool->wasDestroyedMutex);
// tp was already destroyed, cant add more jobs
if (threadPool->wasDestroyed) {
pthread_mutex_unlock(&threadPool->wasDestroyedMutex);
return -1;
}
pthread_mutex_unlock(&threadPool->wasDestroyedMutex);
funcAndArgs* func;
if((func = (funcAndArgs*)malloc(sizeof(funcAndArgs))) == NULL) {
perror("malloc failed");
freeResourses(threadPool);
exit(-1);
}
func->func = computeFunc;
func->args = param;
pthread_mutex_lock(&threadPool->queueMutex);
osEnqueue(threadPool->jobQueue, func);
// printf("job added\n");
pthread_mutex_unlock(&threadPool->queueMutex);
pthread_cond_signal(&threadPool->cond);
return 0;
}