-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathexecutor_framework
More file actions
112 lines (97 loc) · 2.7 KB
/
executor_framework
File metadata and controls
112 lines (97 loc) · 2.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
````
import java.util.LinkedList;
import java.util.Queue;
/**
 * Demo entry point: builds a small thread pool and feeds it a handful of
 * numbered {@link TestTask}s.
 */
public class ExecutorMain {

    /**
     * Creates a pool with a 3-slot task queue and 4 worker threads, then
     * submits tasks numbered 1 through 7 in order.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the submitting thread is interrupted
     *         while waiting for room in the task queue
     */
    public static void main(String[] args) throws InterruptedException {
        final int queueCapacity = 3;
        final int workerCount = 4;
        final int taskCount = 7;

        ThreadPool pool = new ThreadPool(queueCapacity, workerCount);

        // Submit tasks 1..taskCount; submitTask may block while the queue is full.
        int nextTaskNumber = 1;
        while (nextTaskNumber <= taskCount) {
            pool.submitTask(new TestTask(nextTaskNumber));
            nextTaskNumber++;
        }
    }
}
/**
 * A fixed-size pool of worker threads fed from one bounded task queue.
 *
 * <p>The constructor starts every worker immediately. Each worker runs a
 * {@link TaskExecutor} that takes tasks from the shared queue.
 */
class ThreadPool {
    BlockingQueue<Runnable> queue;

    /** Worker threads started by the constructor, kept so {@link #shutdown()} can reach them. */
    private final Thread[] workers;

    /**
     * Creates the task queue and starts {@code nThread} workers named
     * {@code Thread-0}, {@code Thread-1}, and so on.
     *
     * @param queueSize maximum number of tasks waiting in the queue; must be at least 1
     * @param nThread number of worker threads to start; must be at least 1
     * @throws IllegalArgumentException if either argument is less than 1
     */
    public ThreadPool(int queueSize, int nThread) {
        // Validate before starting any thread so a bad argument leaves nothing running.
        if (queueSize < 1) {
            throw new IllegalArgumentException("queueSize must be at least 1, got " + queueSize);
        }
        if (nThread < 1) {
            // With no workers, submitted tasks would never run and submitTask
            // would block forever once the queue filled up.
            throw new IllegalArgumentException("nThread must be at least 1, got " + nThread);
        }
        queue = new BlockingQueue<>(queueSize);
        workers = new Thread[nThread];
        for (int count = 0; count < nThread; count++) {
            Thread thread = new Thread(new TaskExecutor(queue), "Thread-" + count);
            workers[count] = thread;
            thread.start();
        }
    }

    /**
     * Hands a task to the pool, blocking while the queue is full.
     *
     * @param task the task to run; must not be null
     * @throws NullPointerException if {@code task} is null
     * @throws InterruptedException if the calling thread is interrupted while waiting for room
     */
    public void submitTask(Runnable task) throws InterruptedException {
        if (task == null) {
            // Fail in the submitter rather than inside a worker thread later.
            throw new NullPointerException("task must not be null");
        }
        queue.enqueue(task);
    }

    /**
     * Interrupts every worker thread started by this pool.
     *
     * <p>A worker blocked waiting for a task ends its loop when interrupted.
     * Whether a worker that is currently running a task stops depends on how
     * that task handles interruption. Tasks still in the queue are not removed.
     */
    public void shutdown() {
        for (Thread worker : workers) {
            worker.interrupt();
        }
    }
}
/**
 * Worker loop for a pool thread: repeatedly takes a task from the shared
 * queue and runs it on the current thread.
 */
class TaskExecutor implements Runnable {
    BlockingQueue<Runnable> queue;

    /**
     * @param queue the queue this worker takes tasks from
     */
    public TaskExecutor(BlockingQueue<Runnable> queue) {
        this.queue = queue;
    }

    /**
     * Runs tasks until the thread is interrupted while waiting for one.
     *
     * <p>A task that throws a {@link RuntimeException} is reported on
     * {@code System.err} and the worker moves on to the next task, so one
     * failing task does not permanently remove a thread from the pool.
     */
    @Override
    public void run() {
        try {
            while (true) {
                String name = Thread.currentThread().getName();
                Runnable task = queue.dequeue();
                System.out.println("Task Started by Thread :" + name);
                try {
                    task.run();
                    System.out.println("Task Finished by Thread :" + name);
                } catch (RuntimeException e) {
                    // Keep the worker alive; letting this escape would end the
                    // thread and silently shrink the pool.
                    System.err.println("Task Failed in Thread :" + name);
                    e.printStackTrace();
                }
            }
        } catch (InterruptedException e) {
            // The exception cleared the interrupt status; set it again so code
            // further up the stack can still see that a stop was requested.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        }
    }
}
/**
 * A minimal bounded FIFO queue for handing items between threads.
 *
 * <p>All access is serialized on this instance's monitor. Producers wait
 * while the queue is full and consumers wait while it is empty.
 *
 * @param <T> the element type
 */
class BlockingQueue<T> {
    private final Queue<T> queue = new LinkedList<>();

    /** Maximum number of elements held at once; always at least 1. */
    private final int capacity;

    /**
     * Creates an empty queue with the given capacity.
     *
     * @param size maximum number of queued elements; must be at least 1
     * @throws IllegalArgumentException if {@code size} is less than 1
     */
    public BlockingQueue(int size) {
        if (size < 1) {
            // A capacity of 0 would make the queue both "full" and "empty" at
            // once, blocking every caller forever; a negative capacity would
            // silently disable the bound.
            throw new IllegalArgumentException("size must be at least 1, got " + size);
        }
        this.capacity = size;
    }

    /**
     * Appends an element, waiting while the queue is full.
     *
     * @param task the element to add; must not be null
     * @throws NullPointerException if {@code task} is null
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized void enqueue(T task) throws InterruptedException {
        if (task == null) {
            throw new NullPointerException("task must not be null");
        }
        // Re-check in a loop: a woken thread must confirm there is still room.
        while (queue.size() >= capacity) {
            wait();
        }
        queue.offer(task);
        // Producers and consumers share one monitor, so wake everyone after
        // each change and let each waiter re-check its own condition.
        notifyAll();
    }

    /**
     * Removes and returns the oldest element, waiting while the queue is empty.
     *
     * @return the element at the head of the queue
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public synchronized T dequeue() throws InterruptedException {
        while (queue.isEmpty()) {
            wait();
        }
        T head = queue.poll();
        // A slot was freed; wake any producer waiting for room.
        notifyAll();
        return head;
    }
}
/**
 * Demo task that prints a start line, sleeps for half a second to simulate
 * work, and then prints an end line.
 */
class TestTask implements Runnable {
    private final int number;

    /**
     * @param number identifier printed in this task's start and end messages
     */
    public TestTask(int number) {
        this.number = number;
    }

    /**
     * Prints the start message, sleeps 500 ms, and prints the end message.
     *
     * <p>If the sleep is interrupted, the stack trace is printed and the
     * thread's interrupt status is set again before the end message is printed.
     */
    @Override
    public void run() {
        System.out.println("Start executing of task number :" + number);
        try {
            // Simulating processing time
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
            // sleep() cleared the interrupt status when it threw; set it again
            // so the thread running this task can still detect the request.
            Thread.currentThread().interrupt();
        }
        System.out.println("End executing of task number :" + number);
    }
}
````