Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dashboard/jest.config.cjs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
module.exports = {
preset: 'ts-jest/presets/default-esm',
testEnvironment: 'jsdom',
testTimeout: 15000,
setupFilesAfterEnv: ['<rootDir>/jest.setup.cjs'],
extensionsToTreatAsEsm: ['.ts', '.tsx'],
moduleNameMapper: {
Expand Down
16 changes: 16 additions & 0 deletions dashboard/src/pages/EventExplorerPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { useWalletAccountSync } from '../hooks/useWalletAccountSync';
const DEFAULT_EVENT_COUNT = 5000;
const DEFAULT_LIMIT = 12;
const API_URL = import.meta.env.VITE_EVENTS_API_URL ?? 'http://localhost:8787/api/events';
const POLL_INTERVAL_MS = 15_000;
const LISTENER_BASE_URL = API_URL.replace('/api/events', '');
const INDEXING_HEALTH_URL =
import.meta.env.VITE_INDEXING_HEALTH_URL ?? resolveIndexingHealthUrl(API_URL);
Expand Down Expand Up @@ -102,8 +103,23 @@ export function EventExplorerPage() {
loadEvents();
loadStatus();

// Poll for status updates so delivered/failed notifications are reflected
// without requiring a manual page refresh.
const intervalId = setInterval(async () => {
try {
const remoteEvents = await fetchEvents(API_URL);
if (!cancelled) {
setEvents(remoteEvents);
}
} catch {
// Silently ignore polling errors — the error banner is reserved for
// the initial load failure so background polls don't disrupt the user.
}
}, POLL_INTERVAL_MS);

return () => {
cancelled = true;
clearInterval(intervalId);
};
}, [lastFetchedAt, setEvents, setError, setLoading]);

Expand Down
13 changes: 13 additions & 0 deletions dashboard/src/pages/EventsPage.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { restoreWalletSession } from '../services/wallet';
const DEFAULT_EVENT_COUNT = 5000;
const API_URL =
import.meta.env.VITE_EVENTS_API_URL ?? 'http://localhost:8787/api/events';
const POLL_INTERVAL_MS = 15_000;

export function EventsPage() {
const setEvents = useEventStore((state) => state.setEvents);
Expand Down Expand Up @@ -59,8 +60,20 @@ export function EventsPage() {

loadEvents();

const intervalId = setInterval(async () => {
try {
const remoteEvents = await fetchEvents(API_URL);
if (!cancelled) {
setEvents(remoteEvents);
}
} catch {
// Silently ignore background poll errors.
}
}, POLL_INTERVAL_MS);

return () => {
cancelled = true;
clearInterval(intervalId);
};
}, [lastFetchedAt, setEvents, setError, setLoading]);

Expand Down
42 changes: 42 additions & 0 deletions dashboard/src/store/eventStore.test.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -117,3 +117,45 @@ describe('pagination + filter interaction', () => {
expect(after[0].textContent).toContain('Withdrawal');
});
});

describe('stale cache regression tests', () => {
beforeEach(() => {
useEventStore.setState({ events: [], filters: { search: '', contractAddress: 'all', eventType: 'all' }, isLoading: false, error: null });
});

it('setEvents with an updated record replaces the stale copy, not silently dropped', () => {
const [event] = generateMockEvents(1);
const staleEvent = { ...event, value: 'stale-value' };
const freshEvent = { ...event, value: 'fresh-value' };

useEventStore.getState().setEvents([staleEvent]);
// Simulate a poll returning the same eventId with updated data
useEventStore.getState().setEvents([freshEvent]);

const stored = useEventStore.getState().events;
expect(stored).toHaveLength(1);
expect(stored[0].value).toBe('fresh-value');
});

it('appendEvents with an updated record replaces the stale copy', () => {
const [event] = generateMockEvents(1);
const staleEvent = { ...event, value: 'stale-value' };
const freshEvent = { ...event, value: 'fresh-value' };

useEventStore.getState().setEvents([staleEvent]);
// Poll brings in the updated record
useEventStore.getState().appendEvents([freshEvent]);

const stored = useEventStore.getState().events;
expect(stored).toHaveLength(1);
expect(stored[0].value).toBe('fresh-value');
});

it('setEvents keeps order stable when no duplicates are present', () => {
const [a, b, c] = generateMockEvents(3);
useEventStore.getState().setEvents([a, b, c]);

const stored = useEventStore.getState().events;
expect(stored.map((e) => e.eventId)).toEqual([a.eventId, b.eventId, c.eventId]);
});
});
19 changes: 9 additions & 10 deletions dashboard/src/store/eventStore.ts
Original file line number Diff line number Diff line change
Expand Up @@ -49,16 +49,13 @@ interface EventStoreState {
}

function dedupeEventsById(events: BlockchainEvent[]): BlockchainEvent[] {
const seenEventIds = new Set<string>();

return events.filter((event) => {
if (seenEventIds.has(event.eventId)) {
return false;
}

seenEventIds.add(event.eventId);
return true;
});
// Use a Map to keep the last-seen record for each eventId so that status
// updates (newer entries) overwrite stale cached copies rather than being dropped.
const byId = new Map<string, BlockchainEvent>();
for (const event of events) {
byId.set(event.eventId, event);
}
return Array.from(byId.values());
}

export const useEventStore = create<EventStoreState>((set) => ({
Expand All @@ -77,6 +74,8 @@ export const useEventStore = create<EventStoreState>((set) => ({
setEvents: (events) => set({ events: dedupeEventsById(events), lastFetchedAt: Date.now() }),
appendEvents: (events) =>
set((state) => ({
// Existing events go first so incoming (fresh) events overwrite stale
// copies when the Map processes duplicates last-write-wins.
events: dedupeEventsById([...state.events, ...events]),
})),
setSearch: (search) => set((state) => ({ filters: { ...state.filters, search } })),
Expand Down
7 changes: 5 additions & 2 deletions listener/API.md
Original file line number Diff line number Diff line change
Expand Up @@ -305,7 +305,8 @@ Returns paginated delivery execution records from `notification_execution_log`.
| Name | Type | Required | Description |
|-----------|--------|----------|-------------------------------------------------------------------|
| limit | number | No | Maximum records per page (default `20`, max `100`) |
| offset | number | No | Number of records to skip (default `0`) |
| offset | number | No | Number of records to skip (default `0`). Prefer `cursor`. |
| cursor | string | No | Opaque token for cursor-based pagination |
| status | string | No | Filter by execution status: `SUCCESS`, `FAILED`, or `RETRY` |
| startDate | string | No | ISO 8601 lower bound on `execution_time` (inclusive) |
| endDate | string | No | ISO 8601 upper bound on `execution_time` (inclusive) |
Expand All @@ -329,7 +330,8 @@ Returns paginated delivery execution records from `notification_execution_log`.
"itemCount": 5,
"totalPages": 3,
"limit": 2,
"offset": 0
"offset": 0,
"nextCursor": "MjAyNC0wNi0yMFQxNTowMDowMC4wMDBaLDQy"
}
```

Expand All @@ -341,6 +343,7 @@ Returns paginated delivery execution records from `notification_execution_log`.
| totalPages | number | Total pages available at the requested `limit` (`0` when `itemCount` is `0`) |
| limit | number | Effective page size applied to the query |
| offset | number | Number of records skipped before this page |
| nextCursor | string | Opaque token to fetch the next page of results |

Existing clients that read `total`, `limit`, `offset`, and `records` continue to work unchanged. New clients should prefer `itemCount` and `totalPages` for pagination UI.

Expand Down
3 changes: 3 additions & 0 deletions listener/src/api/events-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -818,6 +818,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
const url = new URL(req.url, 'http://localhost');
const limit = url.searchParams.get('limit') ? parseInt(url.searchParams.get('limit')!, 10) : undefined;
const offset = url.searchParams.get('offset') ? parseInt(url.searchParams.get('offset')!, 10) : undefined;
const cursor = url.searchParams.get('cursor') || undefined;
const status = url.searchParams.get('status') as 'SUCCESS' | 'FAILED' | 'RETRY' | null;
const startDate = url.searchParams.get('startDate');
const endDate = url.searchParams.get('endDate');
Expand All @@ -827,6 +828,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
correlationId,
limit,
offset,
cursor,
status,
startDate,
endDate,
Expand All @@ -835,6 +837,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server {
historyService.getHistory({
limit,
offset,
cursor,
status: status || undefined,
startDate: startDate || undefined,
endDate: endDate || undefined,
Expand Down
98 changes: 98 additions & 0 deletions listener/src/api/notifications-history.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -227,6 +227,104 @@ describe('GET /api/notifications/history', () => {
expect(status).toBe(200);
expect((body as any).limit).toBeLessThanOrEqual(100);
});

it('supports cursor-based pagination', async () => {
server = await startServer(BASE_OPTIONS);

for (let i = 0; i < 5; i++) {
await db.run(
`INSERT INTO scheduled_notifications
(payload, notification_type, target_recipient, execute_at, status)
VALUES (?, ?, ?, ?, ?)`,
[JSON.stringify({ test: true }), 'discord', 'test_user', new Date().toISOString(), 'COMPLETED']
);
}

const times = [
'2026-06-20T10:00:00.000Z',
'2026-06-20T10:00:01.000Z',
'2026-06-20T10:00:02.000Z',
'2026-06-20T10:00:03.000Z',
'2026-06-20T10:00:04.000Z',
];

for (let i = 1; i <= 5; i++) {
await db.run(
`INSERT INTO notification_execution_log
(scheduled_notification_id, execution_attempt, execution_time, status, duration_ms)
VALUES (?, ?, ?, ?, ?)`,
[i, 1, times[i - 1], 'SUCCESS', 100]
);
}

const { status: status1, body: body1 } = await makeRequest(
server,
'/api/notifications/history?limit=2'
);

expect(status1).toBe(200);
expect((body1 as any).records.length).toBe(2);
expect((body1 as any).records[0].executionTime).toBe(times[4]); // DESC order
expect((body1 as any).records[1].executionTime).toBe(times[3]);
expect((body1 as any).nextCursor).toBeDefined();

const cursor = encodeURIComponent((body1 as any).nextCursor);
const { status: status2, body: body2 } = await makeRequest(
server,
`/api/notifications/history?limit=2&cursor=${cursor}`
);

expect(status2).toBe(200);
expect((body2 as any).records.length).toBe(2);
expect((body2 as any).records[0].executionTime).toBe(times[2]);
expect((body2 as any).records[1].executionTime).toBe(times[1]);
});

it('handles sorting consistency with tie-breakers', async () => {
server = await startServer(BASE_OPTIONS);

for (let i = 0; i < 3; i++) {
await db.run(
`INSERT INTO scheduled_notifications
(payload, notification_type, target_recipient, execute_at, status)
VALUES (?, ?, ?, ?, ?)`,
[JSON.stringify({ test: true }), 'discord', 'test_user', new Date().toISOString(), 'COMPLETED']
);
}

const sameTime = '2026-06-20T10:00:00.000Z';

// Insert multiple records with the exact same execution_time
for (let i = 1; i <= 3; i++) {
await db.run(
`INSERT INTO notification_execution_log
(scheduled_notification_id, execution_attempt, execution_time, status, duration_ms)
VALUES (?, ?, ?, ?, ?)`,
[i, 1, sameTime, 'SUCCESS', 100]
);
}

const { status: status1, body: body1 } = await makeRequest(
server,
'/api/notifications/history?limit=2'
);

expect(status1).toBe(200);
expect((body1 as any).records.length).toBe(2);
expect((body1 as any).records[0].id).toBe(3); // DESC order
expect((body1 as any).records[1].id).toBe(2);
expect((body1 as any).nextCursor).toBeDefined();

const cursor = encodeURIComponent((body1 as any).nextCursor);
const { status: status2, body: body2 } = await makeRequest(
server,
`/api/notifications/history?limit=2&cursor=${cursor}`
);

expect(status2).toBe(200);
expect((body2 as any).records.length).toBe(1);
expect((body2 as any).records[0].id).toBe(1);
});
});

describe('GET /api/notifications/history database failures', () => {
Expand Down
Loading
Loading