-
Notifications
You must be signed in to change notification settings - Fork 21.2k
Expand file tree
/
Copy pathThreadSafeQueue.java
More file actions
186 lines (172 loc) · 5.25 KB
/
ThreadSafeQueue.java
File metadata and controls
186 lines (172 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
package com.thealgorithms.datastructures.queues;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
/**
* @brief Thread-safe bounded queue implementation using ReentrantLock and Condition variables
* @details A blocking queue that supports multiple producers and consumers.
* Uses a circular buffer internally with lock-based synchronization to ensure
* thread safety. Producers block when the queue is full, and consumers block
* when the queue is empty.
* @see <a href="https://en.wikipedia.org/wiki/Producer%E2%80%93consumer_problem">Producer-Consumer Problem</a>
*/
public class ThreadSafeQueue<T> {
private final Object[] buffer;
private final int capacity;
private int head;
private int tail;
private int count;
private final ReentrantLock lock;
private final Condition notFull;
private final Condition notEmpty;
/**
* @brief Constructs a ThreadSafeQueue with the specified capacity
* @param capacity the maximum number of elements the queue can hold
* @throws IllegalArgumentException if capacity is less than or equal to zero
*/
public ThreadSafeQueue(int capacity) {
if (capacity <= 0) {
throw new IllegalArgumentException("Capacity must be greater than zero.");
}
this.capacity = capacity;
this.buffer = new Object[capacity];
this.head = 0;
this.tail = 0;
this.count = 0;
this.lock = new ReentrantLock();
this.notFull = lock.newCondition();
this.notEmpty = lock.newCondition();
}
/**
* @brief Adds an element to the tail of the queue, blocking if full
* @param item the element to add
* @throws InterruptedException if the thread is interrupted while waiting
* @throws IllegalArgumentException if the item is null
*/
public void enqueue(T item) throws InterruptedException {
if (item == null) {
throw new IllegalArgumentException("Cannot enqueue null item.");
}
lock.lock();
try {
while (count == capacity) {
notFull.await();
}
buffer[tail] = item;
tail = (tail + 1) % capacity;
count++;
notEmpty.signalAll();
} finally {
lock.unlock();
}
}
/**
* @brief Removes and returns the element at the head of the queue, blocking if empty
* @return the element at the head of the queue
* @throws InterruptedException if the thread is interrupted while waiting
*/
@SuppressWarnings("unchecked")
public T dequeue() throws InterruptedException {
lock.lock();
try {
while (count == 0) {
notEmpty.await();
}
T item = (T) buffer[head];
buffer[head] = null;
head = (head + 1) % capacity;
count--;
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
/**
* @brief Adds an element to the tail of the queue without blocking
* @param item the element to add
* @return true if the element was added, false if the queue was full
* @throws IllegalArgumentException if the item is null
*/
public boolean offer(T item) {
if (item == null) {
throw new IllegalArgumentException("Cannot enqueue null item.");
}
lock.lock();
try {
if (count == capacity) {
return false;
}
buffer[tail] = item;
tail = (tail + 1) % capacity;
count++;
notEmpty.signalAll();
return true;
} finally {
lock.unlock();
}
}
/**
* @brief Removes and returns the element at the head without blocking
* @return the element at the head, or null if the queue is empty
*/
@SuppressWarnings("unchecked")
public T poll() {
lock.lock();
try {
if (count == 0) {
return null;
}
T item = (T) buffer[head];
buffer[head] = null;
head = (head + 1) % capacity;
count--;
notFull.signalAll();
return item;
} finally {
lock.unlock();
}
}
/**
* @brief Returns the number of elements in the queue
* @return the current size of the queue
*/
public int size() {
lock.lock();
try {
return count;
} finally {
lock.unlock();
}
}
/**
* @brief Checks if the queue is empty
* @return true if the queue contains no elements
*/
public boolean isEmpty() {
lock.lock();
try {
return count == 0;
} finally {
lock.unlock();
}
}
/**
* @brief Checks if the queue is full
* @return true if the queue has reached its capacity
*/
public boolean isFull() {
lock.lock();
try {
return count == capacity;
} finally {
lock.unlock();
}
}
/**
* @brief Returns the maximum capacity of the queue
* @return the capacity
*/
public int capacity() {
return capacity;
}
}