Skip to content
Merged

V4 #37

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,13 @@ jobs:
- name: Compile and build (skip integration tests)
run: ./gradlew build -x :eventlens-stream-kafka:test -x :eventlens-source-postgres:test --no-daemon

- name: Install Playwright browser
working-directory: eventlens-ui
run: npx playwright install --with-deps chromium

- name: Run frontend browser gate
run: ./gradlew :eventlens-ui:npmTestE2e --no-daemon

integration-tests:
runs-on: ubuntu-latest
needs: build
Expand Down
6 changes: 6 additions & 0 deletions .github/workflows/codeql.yml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,12 @@ jobs:
with:
java-version: '21'
distribution: 'temurin'

- name: Set up Node.js 22
if: matrix.language == 'java'
uses: actions/setup-node@v6
with:
node-version: '22'

- name: Build Java
if: matrix.language == 'java'
Expand Down
4 changes: 2 additions & 2 deletions .github/workflows/release.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ jobs:
distribution: 'temurin'
cache: gradle

- name: Set up Node.js 18
- name: Set up Node.js 22
uses: actions/setup-node@v6
with:
node-version: '18'
node-version: '22'

- name: Give execute permission for gradlew
run: chmod +x gradlew
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,17 @@ eventlens.yaml

# Test runtime
.testcontainers.properties
eventlens-api/exports/

# Local AI agent rules / dev tooling config
.agents/
.gemini/

#personal files
tests/
!eventlens-ui/tests/
!eventlens-ui/tests/e2e/
!eventlens-ui/tests/e2e/*.ts
versions/
loadtests/
plans/
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ public EventLensServer(
Duration.ofSeconds(config.getQueryCache().getTimelineTtlSeconds()));
var datasourceRoutes = new DatasourceRoutes(sourceRegistry);
var pluginRoutes = new PluginRoutes(sourceRegistry);
var statisticsRoutes = new StatisticsRoutes(sourceRegistry);
var bisectRoutes = new BisectRoutes(bisectEngine);
var anomalyRoutes = new AnomalyRoutes(sourceRegistry, config.getAnomaly(), auditLogger);
var exportRoutes = new ExportRoutes(exportEngine, auditLogger);
Expand Down Expand Up @@ -334,6 +335,7 @@ public EventLensServer(
cfg.routes.get("/api/v1/datasources", datasourceRoutes::list);
cfg.routes.get("/api/v1/datasources/{id}/health", datasourceRoutes::health);
cfg.routes.get("/api/v1/plugins", pluginRoutes::list);
cfg.routes.get("/api/v1/statistics", statisticsRoutes::get);

// Legacy aggregate routes (no redirect, but marked deprecated)
cfg.routes.get("/api/aggregates/search", ctx -> {
Expand Down Expand Up @@ -498,3 +500,5 @@ private static String extractClientIp(io.javalin.http.Context ctx) {
}
}



Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.eventlens.api.routes;

import io.eventlens.api.source.SourceRegistry;
import io.eventlens.spi.EventStatisticsQuery;
import io.javalin.http.Context;

public final class StatisticsRoutes {

private final SourceRegistry sourceRegistry;

public StatisticsRoutes(SourceRegistry sourceRegistry) {
this.sourceRegistry = sourceRegistry;
}

public void get(Context ctx) {
int bucketHours = parsePositiveInt(ctx.queryParam("bucketHours"), 1);
int maxBuckets = parsePositiveInt(ctx.queryParam("maxBuckets"), 24);
ctx.json(sourceRegistry.statistics(ctx.queryParam("source"), new EventStatisticsQuery(bucketHours, maxBuckets)));
}

private static int parsePositiveInt(String value, int fallback) {
if (value == null || value.isBlank()) {
return fallback;
}
int parsed = Integer.parseInt(value);
if (parsed <= 0) {
throw new IllegalArgumentException("Expected a positive integer");
}
return parsed;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,20 @@
import io.eventlens.core.engine.ReplayEngine;
import io.eventlens.core.plugin.DatasourceListingModel;
import io.eventlens.core.plugin.PluginInstance;
import io.eventlens.spi.HealthStatus;
import io.eventlens.core.plugin.PluginListingModel;
import io.eventlens.core.plugin.PluginManager;
import io.eventlens.core.spi.EventStoreReader;
import io.eventlens.spi.EventSourcePlugin;
import io.eventlens.spi.EventStatistics;
import io.eventlens.spi.EventStatisticsQuery;
import io.eventlens.spi.HealthStatus;
import io.eventlens.spi.PluginLifecycle;

import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;

public final class SourceRegistry {
Expand Down Expand Up @@ -104,6 +106,19 @@ public List<PluginListingModel> listPlugins() {
.toList();
}

public EventStatistics statistics(String requestedSourceId, EventStatisticsQuery query) {
if (requestedSourceId == null || requestedSourceId.isBlank() || requestedSourceId.equals(defaultSourceId)) {
return pluginManager.getEventSource(defaultSourceId)
.or(() -> pluginManager.getFirstReadyEventSource())
.map(plugin -> plugin.statistics(query))
.orElse(EventStatistics.unavailable("Statistics not available for the primary datasource"));
}

EventSourcePlugin plugin = pluginManager.getEventSource(requestedSourceId)
.orElseThrow(() -> new IllegalArgumentException("Unknown datasource: " + requestedSourceId));
return plugin.statistics(query);
}

public record ResolvedSource(
String id,
String displayName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,27 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/**
* WebSocket live tail - streams events to connected browser clients in
* real-time.
* WebSocket live tail with bounded buffering and batched flushes.
*/
public class LiveTailWebSocket {

private static final Logger log = LoggerFactory.getLogger(LiveTailWebSocket.class);
private static final int MAX_CONNECTIONS = 500;
private static final int BACKFILL_EVENT_COUNT = 100;
private static final int MAX_BUFFERED_MESSAGES = 200;
private static final int MAX_MESSAGES_PER_FLUSH = 50;
private static final long FLUSH_DELAY_MS = 100;

private final Map<String, Set<WsContext>> sessionsBySource = new ConcurrentHashMap<>();
private final Map<String, SessionState> sessionStates = new ConcurrentHashMap<>();
private final Set<String> subscribedSources = ConcurrentHashMap.newKeySet();
private final ObjectMapper mapper = new ObjectMapper()
.findAndRegisterModules()
Expand All @@ -41,8 +49,13 @@ public class LiveTailWebSocket {
private final AuditLogger auditLogger;
private final String defaultSourceId;
private final Map<String, String> sourceStreamBindings;
private final ExecutorService backfillExecutor = java.util.concurrent.Executors.newCachedThreadPool(
private final ExecutorService backfillExecutor = Executors.newCachedThreadPool(
Thread.ofVirtual().name("eventlens-backfill-", 0).factory());
private final ScheduledExecutorService flushExecutor = Executors.newSingleThreadScheduledExecutor(r -> {
Thread thread = new Thread(r, "eventlens-live-flush");
thread.setDaemon(true);
return thread;
});

public LiveTailWebSocket(
SourceRegistry sourceRegistry,
Expand All @@ -69,6 +82,7 @@ public void configureHandlers(WsConfig ws) {
ctx.attribute("eventlensSourceId", sourceId);
ctx.enableAutomaticPings();
sessionsBySource.computeIfAbsent(sourceId, ignored -> ConcurrentHashMap.newKeySet()).add(ctx);
sessionStates.put(ctx.sessionId(), new SessionState(ctx));
EventLensMetrics.setWebsocketConnections(totalSessions());
log.debug("WebSocket client connected: {} on source {} ({} active)", ctx.sessionId(), sourceId, totalSessions());

Expand All @@ -91,7 +105,7 @@ public void configureHandlers(WsConfig ws) {
ensureSubscribed(sourceId, streamAdapter.get());
backfillExecutor.submit(() -> backfill(ctx, sourceId));
} else {
sendControl(ctx, new ControlMessage("NO_LIVE_STREAM", sourceId));
enqueueControl(ctx, new ControlMessage("NO_LIVE_STREAM", sourceId));
}
});

Expand All @@ -104,7 +118,7 @@ private void backfill(WsContext ctx, String sourceId) {
Thread.sleep(250);
var recent = sourceRegistry.resolve(sourceId).reader().getRecentEvents(BACKFILL_EVENT_COUNT);
for (var event : recent) {
if (!trySend(ctx, event)) {
if (!enqueue(ctx, event)) {
break;
}
}
Expand All @@ -127,10 +141,63 @@ public void broadcast(String sourceId, StoredEvent event) {
if (sessions == null || sessions.isEmpty()) {
return;
}
sessions.removeIf(session -> !trySend(session, event));
sessions.removeIf(session -> !enqueue(session, event));
EventLensMetrics.setWebsocketConnections(totalSessions());
}

private boolean enqueue(WsContext ctx, Object payload) {
SessionState state = sessionStates.get(ctx.sessionId());
if (state == null || state.closed.get()) {
return false;
}
try {
String json = mapper.writeValueAsString(payload);
while (state.queue.size() >= MAX_BUFFERED_MESSAGES) {
state.queue.pollFirst();
state.droppedCount++;
}
state.queue.addLast(json);
scheduleFlush(state);
return true;
} catch (Exception e) {
log.debug("WebSocket enqueue failed for {}: {}", ctx.sessionId(), e.getMessage());
return false;
}
}

private void enqueueControl(WsContext ctx, ControlMessage message) {
enqueue(ctx, message);
}

private void scheduleFlush(SessionState state) {
if (!state.flushScheduled.compareAndSet(false, true)) {
return;
}
flushExecutor.schedule(() -> flush(state), FLUSH_DELAY_MS, TimeUnit.MILLISECONDS);
}

private void flush(SessionState state) {
state.flushScheduled.set(false);
if (state.closed.get()) {
return;
}
int sent = 0;
while (sent < MAX_MESSAGES_PER_FLUSH) {
String next = state.queue.pollFirst();
if (next == null) {
break;
}
if (!trySendRaw(state.ctx, next)) {
removeSession(state.ctx);
return;
}
sent++;
}
if (!state.queue.isEmpty()) {
scheduleFlush(state);
}
}

private Optional<StreamAdapterPlugin> streamForSource(String sourceId) {
String explicit = sourceStreamBindings.get(sourceId);
if (explicit != null) {
Expand Down Expand Up @@ -173,6 +240,11 @@ private void removeSession(WsContext ctx) {
} else {
sessionsBySource.values().forEach(sessions -> sessions.remove(ctx));
}
SessionState state = sessionStates.remove(ctx.sessionId());
if (state != null) {
state.closed.set(true);
state.queue.clear();
}
EventLensMetrics.setWebsocketConnections(totalSessions());
log.debug("WebSocket session ended: {}", ctx.sessionId());
}
Expand All @@ -181,24 +253,16 @@ private int totalSessions() {
return sessionsBySource.values().stream().mapToInt(Set::size).sum();
}

private boolean trySend(WsContext ctx, StoredEvent event) {
private boolean trySendRaw(WsContext ctx, String json) {
try {
ctx.send(mapper.writeValueAsString(event));
ctx.send(json);
return true;
} catch (Exception e) {
log.debug("WebSocket send failed for {}: {}", ctx.sessionId(), e.getMessage());
return false;
}
}

private void sendControl(WsContext ctx, ControlMessage message) {
try {
ctx.send(mapper.writeValueAsString(message));
} catch (Exception e) {
log.debug("WebSocket control send failed for {}: {}", ctx.sessionId(), e.getMessage());
}
}

private StoredEvent toStoredEvent(Event event) {
return new StoredEvent(
event.eventId(),
Expand All @@ -224,4 +288,16 @@ private static String extractIp(WsContext ctx) {

private record ControlMessage(String type, String source) {
}

private static final class SessionState {
private final WsContext ctx;
private final ConcurrentLinkedDeque<String> queue = new ConcurrentLinkedDeque<>();
private final AtomicBoolean flushScheduled = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
private volatile long droppedCount = 0;

private SessionState(WsContext ctx) {
this.ctx = ctx;
}
}
}

Large diffs are not rendered by default.

14 changes: 0 additions & 14 deletions eventlens-api/src/main/resources/web/assets/index-B-pPVu7c.js

This file was deleted.

This file was deleted.

14 changes: 14 additions & 0 deletions eventlens-api/src/main/resources/web/assets/index-ri1P99Z8.js

Large diffs are not rendered by default.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions eventlens-api/src/main/resources/web/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
<link rel="preconnect" href="https://fonts.googleapis.com" />
<link rel="preconnect" href="https://fonts.gstatic.com" crossorigin />
<link href="https://fonts.googleapis.com/css2?family=Inter:wght@400;500;600;700&family=JetBrains+Mono:wght@400;500&family=Orbitron:wght@400;500;600;700;800;900&display=swap" rel="stylesheet" />
<script type="module" crossorigin src="/assets/index-B-pPVu7c.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-Cw0Fu2da.css">
<script type="module" crossorigin src="/assets/index-ri1P99Z8.js"></script>
<link rel="stylesheet" crossorigin href="/assets/index-2dMaJUyt.css">
</head>
<body>
<div id="root"></div>
Expand Down
Loading
Loading