-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathDefferedCallbackExecutor.java
More file actions
110 lines (86 loc) · 2.63 KB
/
DefferedCallbackExecutor.java
File metadata and controls
110 lines (86 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
package interview;
import java.util.HashSet;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
class DefferedCallbackExecutor{
private static Random random = new Random(System.currentTimeMillis());
PriorityQueue<CallBack> q= null;/*new PriorityQueue<CallBack>(new Comparartor<CallBack>(){
public int compare(CallBack c1,CallBack c2){
return (int) (c1.executeAt - c2.executeAt);
}
})*/;
ReentrantLock lock = new ReentrantLock();
Condition newCallBackArrived = lock.newCondition();
private long findSleepDuration(){
long currentTime = System.currentTimeMillis();
return q.peek().executeAt - currentTime;
}
public void start() throws InterruptedException{
long sleepFor = 0;
while(true){
lock.lock();
while(q.size() == 0){
newCallBackArrived.await();
}
while(q.size()!=0){
sleepFor = findSleepDuration();
if(sleepFor <= 0)
break;
newCallBackArrived.await(sleepFor,TimeUnit.MILLISECONDS);
}
CallBack cb = q.poll();
System.out.println("Executed at: " +System.currentTimeMillis()/1000 +"required at " +cb.executeAt + "message"+cb.message);
lock.unlock();
}
}
public void registerCallback(CallBack callback){
lock.lock();
q.add(callback);
newCallBackArrived.signal();
lock.unlock();
}
static class CallBack{
long executeAt;
String message;
public CallBack(long executeAfter, String message){
this.executeAt = System.currentTimeMillis()+ (executeAfter *1000);
this.message = message;
}
}
public static void runTestTenCallbacks() throws InterruptedException {
Set<Thread> allThreads = new HashSet<>();
final DefferedCallbackExecutor defferedCallbackExecutor = new DefferedCallbackExecutor();
Thread service = new Thread(new Runnable(){
public void run(){
try{
defferedCallbackExecutor.start();
}catch(InterruptedException e){
}
}
});
service.start();
for(int i=0;i<10;i++){
Thread thread = new Thread(new Runnable(){
public void run(){
try{
CallBack cb = new CallBack(1, "Hello this is "+Thread.currentThread().get());
defferedCallbackExecutor.registerCallback(cb);
.start();
}catch(InterruptedException e){
}
}
});
thread.setName("Thread_"+(i+1));
thread.start();
allThreads.add(thread);
Thread.sleep(1000);
}
for(Thread t: allThreads){
t.join();
}
}
}