diff --git a/src/main/java/core/basesyntax/BlockingQueue.java b/src/main/java/core/basesyntax/BlockingQueue.java index 77a20440..daf35fb6 100644 --- a/src/main/java/core/basesyntax/BlockingQueue.java +++ b/src/main/java/core/basesyntax/BlockingQueue.java @@ -12,16 +12,23 @@ public BlockingQueue(int capacity) { } public synchronized void put(T element) throws InterruptedException { - // write your code here + while (queue.size() == capacity) { + wait(); + } + queue.add(element); + notifyAll(); } public synchronized T take() throws InterruptedException { - // write your code here - return null; + while (queue.isEmpty()) { + wait(); + } + T element = queue.remove(); + notifyAll(); + return element; } public synchronized boolean isEmpty() { - // write your code here - return true; + return queue.isEmpty(); } } diff --git a/src/main/java/core/basesyntax/thread/Consumer.java b/src/main/java/core/basesyntax/thread/Consumer.java index a28994fa..ad9166da 100644 --- a/src/main/java/core/basesyntax/thread/Consumer.java +++ b/src/main/java/core/basesyntax/thread/Consumer.java @@ -3,7 +3,7 @@ import core.basesyntax.BlockingQueue; public class Consumer implements Runnable { - private BlockingQueue blockingQueue; + private final BlockingQueue blockingQueue; public Consumer(BlockingQueue blockingQueue) { this.blockingQueue = blockingQueue; @@ -11,12 +11,17 @@ public Consumer(BlockingQueue blockingQueue) { @Override public void run() { - while (!blockingQueue.isEmpty()) { - try { - System.out.println("Took value " + blockingQueue.take()); - } catch (InterruptedException e) { - throw new RuntimeException("Consumer was interrupted!", e); + try { + while (true) { + Integer value = blockingQueue.take(); + if (value == null) { + break; + } + System.out.println("Consumed " + value); } + System.out.println("Consumer finished."); + } catch (InterruptedException e) { + System.out.println("Consumer was interrupted!"); } } } diff --git a/src/main/java/core/basesyntax/thread/Producer.java b/src/main/java/core/basesyntax/thread/Producer.java index a08376ce..fe502c0c 100644 --- a/src/main/java/core/basesyntax/thread/Producer.java +++ b/src/main/java/core/basesyntax/thread/Producer.java @@ -11,12 +11,14 @@ public Producer(BlockingQueue blockingQueue) { @Override public void run() { - for (int i = 0; i < 50; i++) { - try { + try { + for (int i = 0; i < 50; i++) { blockingQueue.put(i); - } catch (InterruptedException e) { - throw new RuntimeException("Producer was interrupted!", e); + System.out.println("Produced " + i); } + blockingQueue.put(null); + } catch (InterruptedException e) { + System.out.println("Producer was interrupted!"); } } }