diff --git a/pom.xml b/pom.xml
index fb0e6d4d..687064a9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -25,6 +25,16 @@
4.12
test
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.22.1
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.22.1
+
diff --git a/src/main/java/core/basesyntax/BlockingQueue.java b/src/main/java/core/basesyntax/BlockingQueue.java
index 77a20440..7f0e73df 100644
--- a/src/main/java/core/basesyntax/BlockingQueue.java
+++ b/src/main/java/core/basesyntax/BlockingQueue.java
@@ -4,24 +4,38 @@
import java.util.Queue;
public class BlockingQueue {
- private Queue queue = new LinkedList<>();
- private int capacity;
+ private final Queue queue = new LinkedList<>();
+ private final int capacity;
public BlockingQueue(int capacity) {
this.capacity = capacity;
}
public synchronized void put(T element) throws InterruptedException {
- // write your code here
+
+ while (queue.size() == capacity) {
+ wait();
+ }
+
+ queue.add(element);
+
+ notifyAll();
}
public synchronized T take() throws InterruptedException {
- // write your code here
- return null;
+
+ while (queue.isEmpty()) {
+ wait();
+ }
+
+ T element = queue.poll();
+
+ notifyAll();
+ return element;
}
public synchronized boolean isEmpty() {
- // write your code here
- return true;
+
+ return queue.isEmpty();
}
}
diff --git a/src/main/java/core/basesyntax/thread/Consumer.java b/src/main/java/core/basesyntax/thread/Consumer.java
index a28994fa..148e1c6d 100644
--- a/src/main/java/core/basesyntax/thread/Consumer.java
+++ b/src/main/java/core/basesyntax/thread/Consumer.java
@@ -1,9 +1,13 @@
package core.basesyntax.thread;
import core.basesyntax.BlockingQueue;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class Consumer implements Runnable {
- private BlockingQueue blockingQueue;
+
+ private static final Logger logger = LogManager.getLogger(Consumer.class);
+ private final BlockingQueue blockingQueue;
public Consumer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
@@ -11,11 +15,14 @@ public Consumer(BlockingQueue blockingQueue) {
@Override
public void run() {
- while (!blockingQueue.isEmpty()) {
+ while (!Thread.currentThread().isInterrupted()) {
try {
- System.out.println("Took value " + blockingQueue.take());
+ Integer value = blockingQueue.take();
+ System.out.println("Took value " + value);
} catch (InterruptedException e) {
- throw new RuntimeException("Consumer was interrupted!", e);
+ Thread.currentThread().interrupt();
+ logger.warn("Thread was interrupted while taking from queue", e);
+ break;
}
}
}
diff --git a/src/main/java/core/basesyntax/thread/Producer.java b/src/main/java/core/basesyntax/thread/Producer.java
index a08376ce..479a1ad2 100644
--- a/src/main/java/core/basesyntax/thread/Producer.java
+++ b/src/main/java/core/basesyntax/thread/Producer.java
@@ -1,9 +1,13 @@
package core.basesyntax.thread;
import core.basesyntax.BlockingQueue;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
public class Producer implements Runnable {
- private BlockingQueue blockingQueue;
+
+ private static final Logger logger = LogManager.getLogger(Consumer.class);
+ private final BlockingQueue blockingQueue;
public Producer(BlockingQueue blockingQueue) {
this.blockingQueue = blockingQueue;
@@ -15,7 +19,9 @@ public void run() {
try {
blockingQueue.put(i);
} catch (InterruptedException e) {
- throw new RuntimeException("Producer was interrupted!", e);
+ Thread.currentThread().interrupt();
+ logger.warn("Producer interrupted while putting value {}", i, e);
+ break;
}
}
}