Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,25 +1,48 @@
package hse.java.lectures.lecture6.tasks.queue;

public class BoundedBlockingQueue<T> {
import java.util.ArrayDeque;
import java.util.Queue;

public class BoundedBlockingQueue<T> {
private final Queue<T> queue = new ArrayDeque<>();
private final int capacity;

public BoundedBlockingQueue(int capacity) {

if(capacity <= 0){
throw new IllegalArgumentException("Capacity <= 0 :^(");
}
this.capacity = capacity;
}

public void put(T item) throws InterruptedException {
public synchronized void put(T item) throws InterruptedException{
if(item == null){
throw new NullPointerException("not null pls");
}

while(queue.size() == capacity()){
wait();
}

queue.add(item);
notifyAll();
}

public T take() throws InterruptedException {
return null;
public synchronized T take() throws InterruptedException{
while(queue.isEmpty()){
wait();
}

T item = queue.remove();
notifyAll();

return item;
}

public int size() {
return 0;
public synchronized int size() {
return queue.size();
}

public int capacity() {
return 0;
return capacity;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@ public void attachMonitor(StreamingMonitor monitor) {

@Override
public void run() {
// Writer threads are intentionally infinite for the task contract.
while (true) {
output.print(message);
onTick.run();
try {
if (!monitor.streamWait(id)) {
return;
}
output.print(message);
onTick.run();
monitor.streamComplete(id);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
}
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,50 @@
package hse.java.lectures.lecture6.tasks.synchronizer;

public class StreamingMonitor {
// impl your sync here
private int currentWriter = 0;
private final int writersAmount;
private final int[] ticksDone;
private final int ticksPerWriter;
private boolean isFinish = false;

public StreamingMonitor(int writersAmount, int ticksPerWriter) {
this.writersAmount = writersAmount;
this.ticksPerWriter = ticksPerWriter;
this.ticksDone = new int[writersAmount];
}

public synchronized boolean streamWait(int id) throws InterruptedException {
int myIndex = id - 1;

while (currentWriter != myIndex && !isFinish) {
wait();
}

return !isFinish;
}

public synchronized void streamComplete(int id) {
int myIndex = id - 1;
ticksDone[myIndex]++;

currentWriter = (currentWriter + 1) % writersAmount;

int checker = 0;
while (checker < writersAmount && ticksDone[currentWriter] >= ticksPerWriter) {
currentWriter = (currentWriter + 1) % writersAmount;
checker++;
}

if (checker == writersAmount) {
isFinish = true;
}

notifyAll();
}

public synchronized void synchFinish() throws InterruptedException {
while (!isFinish) {
wait();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,23 @@ public Synchronizer(List<StreamWriter> tasks, int ticksPerWriter) {
this.ticksPerWriter = ticksPerWriter;
}

/**
* Starts infinite writer threads and waits until each writer prints exactly ticksPerWriter ticks
* in strict ascending id order.
*/
public void execute() {
// add monitor and sync
StreamingMonitor monitor = new StreamingMonitor(tasks.size(), ticksPerWriter);

for (StreamWriter writer : tasks) {
writer.attachMonitor(monitor);
}

for (StreamWriter writer : tasks) {
Thread worker = new Thread(writer, "stream-writer-" + writer.getId());
worker.setDaemon(true);
worker.start();
}
}

try {
monitor.synchFinish();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package hse.java.lectures.lesson7.dau;

import java.time.Clock;
import java.time.LocalDate;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

public class DauServiceImplementation implements DauService {
private final Clock clock;
private volatile LocalDate current;

private final ConcurrentHashMap<Integer, Set<Integer>> today = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Integer, Set<Integer>> yesterday = new ConcurrentHashMap<>();

public DauServiceImplementation(Clock clock) {
this.clock = clock;
this.current = LocalDate.now(clock);
}

private void isNextDay() {
LocalDate now = LocalDate.now(clock);

if (!now.equals(current)) {
synchronized (this) {
if (!now.equals(current)) {
yesterday.clear();
yesterday.putAll(today);

today.clear();

current = now;
}
}
}
}

@Override
public void postEvent(Event event) {
isNextDay();
today.computeIfAbsent(event.authorId(), a -> ConcurrentHashMap.newKeySet()).add(event.userId());
}

@Override
public Map<Integer, Long> getDauStatistics(List<Integer> authorIds) {
isNextDay();

Map<Integer, Long> result = new HashMap<>();

for (Integer authorId : authorIds) {
Set<Integer> users = yesterday.get(authorId);
result.put(authorId, users == null ? 0L : (long) users.size());
}

return result;
}

@Override
public Long getAuthorDauStatistics(int authorId) {
return getDauStatistics(List.of(authorId)).get(authorId);
}
}
116 changes: 116 additions & 0 deletions src/test/java/hse/java/lectures/lecture7/DauServiceTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package hse.java.lectures.lecture7;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;

import java.time.*;
import java.util.List;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

import hse.java.lectures.lesson7.dau.*;

@Tag("Dau")
class DauServiceTest {

private MutableClock clock;
private DauServiceImplementation service;

private static final ZoneId ZONE = ZoneOffset.UTC;

@BeforeEach
void setUp() {
clock = new MutableClock(LocalDate.of(2026, 3, 21).atStartOfDay(ZONE).toInstant(), ZONE);

service = new DauServiceImplementation(clock);
}

private void setDate(LocalDate date) {
clock.setInstant(date.atStartOfDay(ZONE).toInstant());
}

@Test
void singleUser_singleClick() {
setDate(LocalDate.of(2026, 3, 20));
service.postEvent(new Event(1, 100));

setDate(LocalDate.of(2026, 3, 21));

Long result = service.getAuthorDauStatistics(100);

assertEquals(1L, result);
}

@Test
void sameUser_multipleClicks() {
setDate(LocalDate.of(2026, 3, 20));

service.postEvent(new Event(1, 100));
service.postEvent(new Event(1, 100));
service.postEvent(new Event(1, 100));

setDate(LocalDate.of(2026, 3, 21));

assertEquals(1L, service.getAuthorDauStatistics(100));
}

@Test
void multipleUsers() {
setDate(LocalDate.of(2026, 3, 20));

service.postEvent(new Event(1, 100));
service.postEvent(new Event(2, 100));
service.postEvent(new Event(3, 100));

setDate(LocalDate.of(2026, 3, 21));

assertEquals(3L, service.getAuthorDauStatistics(100));
}

@Test
void differentAuthors() {
setDate(LocalDate.of(2026, 3, 20));

service.postEvent(new Event(1, 100));
service.postEvent(new Event(2, 200));

setDate(LocalDate.of(2026, 3, 21));

Map<Integer, Long> stats = service.getDauStatistics(List.of(100, 200));

assertEquals(1L, stats.get(100));
assertEquals(1L, stats.get(200));
}

@Test
void emptyClick() {
Map<Integer, Long> stats = service.getDauStatistics(List.of(100));

assertEquals(0L, stats.get(100));
}

@Test
void isTodayClicks() {
service.postEvent(new Event(1, 100));

assertEquals(0L, service.getAuthorDauStatistics(100));
}

@Test
void isDataClear() {
setDate(LocalDate.of(2026, 3, 20));
service.postEvent(new Event(1, 100));

setDate(LocalDate.of(2026, 3, 21));
service.postEvent(new Event(2, 100));

assertEquals(1L, service.getAuthorDauStatistics(100));

setDate(LocalDate.of(2026, 3, 22));
service.postEvent(new Event(3, 100));

assertEquals(1L, service.getAuthorDauStatistics(100));
}
}
34 changes: 34 additions & 0 deletions src/test/java/hse/java/lectures/lecture7/MutableClock.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package hse.java.lectures.lecture7;

import lombok.Setter;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

class MutableClock extends Clock {

@Setter
private Instant instant;
private final ZoneId zone;

public MutableClock(Instant instant, ZoneId zone) {
this.instant = instant;
this.zone = zone;
}

@Override
public ZoneId getZone() {
return zone;
}

@Override
public Clock withZone(ZoneId zone) {
return new MutableClock(instant, zone);
}

@Override
public Instant instant() {
return instant;
}
}
Loading