diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java index 816f3ee6..c5794f3d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/queue/BoundedBlockingQueue.java @@ -1,25 +1,48 @@ package hse.java.lectures.lecture6.tasks.queue; -public class BoundedBlockingQueue { +import java.util.ArrayDeque; +import java.util.Queue; +public class BoundedBlockingQueue { + private final Queue queue = new ArrayDeque<>(); + private final int capacity; public BoundedBlockingQueue(int capacity) { - + if(capacity <= 0){ + throw new IllegalArgumentException("Capacity <= 0 :^("); + } + this.capacity = capacity; } - public void put(T item) throws InterruptedException { + public synchronized void put(T item) throws InterruptedException{ + if(item == null){ + throw new NullPointerException("not null pls"); + } + while(queue.size() == capacity()){ + wait(); + } + + queue.add(item); + notifyAll(); } - public T take() throws InterruptedException { - return null; + public synchronized T take() throws InterruptedException{ + while(queue.isEmpty()){ + wait(); + } + + T item = queue.remove(); + notifyAll(); + + return item; } - public int size() { - return 0; + public synchronized int size() { + return queue.size(); } public int capacity() { - return 0; + return capacity; } -} +} \ No newline at end of file diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java index fedb5e66..0407988d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamWriter.java @@ -26,11 +26,18 @@ public void attachMonitor(StreamingMonitor monitor) { @Override public void run() { - // Writer threads are intentionally infinite for the task contract. while (true) { - output.print(message); - onTick.run(); + try { + if (!monitor.streamWait(id)) { + return; + } + output.print(message); + onTick.run(); + monitor.streamComplete(id); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + return; + } } } - } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java index 68e8f279..72259d2d 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/StreamingMonitor.java @@ -1,5 +1,50 @@ package hse.java.lectures.lecture6.tasks.synchronizer; public class StreamingMonitor { - // impl your sync here + private int currentWriter = 0; + private final int writersAmount; + private final int[] ticksDone; + private final int ticksPerWriter; + private boolean isFinish = false; + + public StreamingMonitor(int writersAmount, int ticksPerWriter) { + this.writersAmount = writersAmount; + this.ticksPerWriter = ticksPerWriter; + this.ticksDone = new int[writersAmount]; + } + + public synchronized boolean streamWait(int id) throws InterruptedException { + int myIndex = id - 1; + + while (currentWriter != myIndex && !isFinish) { + wait(); + } + + return !isFinish; + } + + public synchronized void streamComplete(int id) { + int myIndex = id - 1; + ticksDone[myIndex]++; + + currentWriter = (currentWriter + 1) % writersAmount; + + int checker = 0; + while (checker < writersAmount && ticksDone[currentWriter] >= ticksPerWriter) { + currentWriter = (currentWriter + 1) % writersAmount; + checker++; + } + + if (checker == writersAmount) { + isFinish = true; + } + + notifyAll(); + } + + public synchronized void synchFinish() throws InterruptedException { + while (!isFinish) { + wait(); + } + } } diff --git a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java index 3cb8aded..fdc2d429 100644 --- a/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java +++ b/src/main/java/hse/java/lectures/lecture6/tasks/synchronizer/Synchronizer.java @@ -17,17 +17,23 @@ public Synchronizer(List tasks, int ticksPerWriter) { this.ticksPerWriter = ticksPerWriter; } - /** - * Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks - * in strict ascending id order. - */ public void execute() { - // add monitor and sync + StreamingMonitor monitor = new StreamingMonitor(tasks.size(), ticksPerWriter); + + for (StreamWriter writer : tasks) { + writer.attachMonitor(monitor); + } + for (StreamWriter writer : tasks) { Thread worker = new Thread(writer, "stream-writer-" + writer.getId()); worker.setDaemon(true); worker.start(); } - } + try { + monitor.synchFinish(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } } diff --git a/src/main/java/hse/java/lectures/lesson7/dau/DauServiceImplementation.java b/src/main/java/hse/java/lectures/lesson7/dau/DauServiceImplementation.java new file mode 100644 index 00000000..198f6668 --- /dev/null +++ b/src/main/java/hse/java/lectures/lesson7/dau/DauServiceImplementation.java @@ -0,0 +1,61 @@ +package hse.java.lectures.lesson7.dau; + +import java.time.Clock; +import java.time.LocalDate; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class DauServiceImplementation implements DauService { + private final Clock clock; + private volatile LocalDate current; + + private final ConcurrentHashMap> today = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> yesterday = new ConcurrentHashMap<>(); + + public DauServiceImplementation(Clock clock) { + this.clock = clock; + this.current = LocalDate.now(clock); + } + + private void isNextDay() { + LocalDate now = LocalDate.now(clock); + + if (!now.equals(current)) { + synchronized (this) { + if (!now.equals(current)) { + yesterday.clear(); + yesterday.putAll(today); + + today.clear(); + + current = now; + } + } + } + } + + @Override + public void postEvent(Event event) { + isNextDay(); + today.computeIfAbsent(event.authorId(), a -> ConcurrentHashMap.newKeySet()).add(event.userId()); + } + + @Override + public Map getDauStatistics(List authorIds) { + isNextDay(); + + Map result = new HashMap<>(); + + for (Integer authorId : authorIds) { + Set users = yesterday.get(authorId); + result.put(authorId, users == null ? 0L : (long) users.size()); + } + + return result; + } + + @Override + public Long getAuthorDauStatistics(int authorId) { + return getDauStatistics(List.of(authorId)).get(authorId); + } +} \ No newline at end of file diff --git a/src/test/java/hse/java/lectures/lecture7/DauServiceTest.java b/src/test/java/hse/java/lectures/lecture7/DauServiceTest.java new file mode 100644 index 00000000..92a46e48 --- /dev/null +++ b/src/test/java/hse/java/lectures/lecture7/DauServiceTest.java @@ -0,0 +1,116 @@ +package hse.java.lectures.lecture7; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.time.*; +import java.util.List; +import java.util.Map; + +import static org.junit.jupiter.api.Assertions.*; + +import hse.java.lectures.lesson7.dau.*; + +@Tag("Dau") +class DauServiceTest { + + private MutableClock clock; + private DauServiceImplementation service; + + private static final ZoneId ZONE = ZoneOffset.UTC; + + @BeforeEach + void setUp() { + clock = new MutableClock(LocalDate.of(2026, 3, 21).atStartOfDay(ZONE).toInstant(), ZONE); + + service = new DauServiceImplementation(clock); + } + + private void setDate(LocalDate date) { + clock.setInstant(date.atStartOfDay(ZONE).toInstant()); + } + + @Test + void singleUser_singleClick() { + setDate(LocalDate.of(2026, 3, 20)); + service.postEvent(new Event(1, 100)); + + setDate(LocalDate.of(2026, 3, 21)); + + Long result = service.getAuthorDauStatistics(100); + + assertEquals(1L, result); + } + + @Test + void sameUser_multipleClicks() { + setDate(LocalDate.of(2026, 3, 20)); + + service.postEvent(new Event(1, 100)); + service.postEvent(new Event(1, 100)); + service.postEvent(new Event(1, 100)); + + setDate(LocalDate.of(2026, 3, 21)); + + assertEquals(1L, service.getAuthorDauStatistics(100)); + } + + @Test + void multipleUsers() { + setDate(LocalDate.of(2026, 3, 20)); + + service.postEvent(new Event(1, 100)); + service.postEvent(new Event(2, 100)); + service.postEvent(new Event(3, 100)); + + setDate(LocalDate.of(2026, 3, 21)); + + assertEquals(3L, service.getAuthorDauStatistics(100)); + } + + @Test + void differentAuthors() { + setDate(LocalDate.of(2026, 3, 20)); + + service.postEvent(new Event(1, 100)); + service.postEvent(new Event(2, 200)); + + setDate(LocalDate.of(2026, 3, 21)); + + Map stats = service.getDauStatistics(List.of(100, 200)); + + assertEquals(1L, stats.get(100)); + assertEquals(1L, stats.get(200)); + } + + @Test + void emptyClick() { + Map stats = service.getDauStatistics(List.of(100)); + + assertEquals(0L, stats.get(100)); + } + + @Test + void isTodayClicks() { + service.postEvent(new Event(1, 100)); + + assertEquals(0L, service.getAuthorDauStatistics(100)); + } + + @Test + void isDataClear() { + setDate(LocalDate.of(2026, 3, 20)); + service.postEvent(new Event(1, 100)); + + setDate(LocalDate.of(2026, 3, 21)); + service.postEvent(new Event(2, 100)); + + assertEquals(1L, service.getAuthorDauStatistics(100)); + + setDate(LocalDate.of(2026, 3, 22)); + service.postEvent(new Event(3, 100)); + + assertEquals(1L, service.getAuthorDauStatistics(100)); + } +} diff --git a/src/test/java/hse/java/lectures/lecture7/MutableClock.java b/src/test/java/hse/java/lectures/lecture7/MutableClock.java new file mode 100644 index 00000000..a2c0d043 --- /dev/null +++ b/src/test/java/hse/java/lectures/lecture7/MutableClock.java @@ -0,0 +1,34 @@ +package hse.java.lectures.lecture7; + +import lombok.Setter; + +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; + +class MutableClock extends Clock { + + @Setter + private Instant instant; + private final ZoneId zone; + + public MutableClock(Instant instant, ZoneId zone) { + this.instant = instant; + this.zone = zone; + } + + @Override + public ZoneId getZone() { + return zone; + } + + @Override + public Clock withZone(ZoneId zone) { + return new MutableClock(instant, zone); + } + + @Override + public Instant instant() { + return instant; + } +} \ No newline at end of file