diff --git a/dashboard/jest.config.cjs b/dashboard/jest.config.cjs index bcc6a39..b2bb61c 100644 --- a/dashboard/jest.config.cjs +++ b/dashboard/jest.config.cjs @@ -1,6 +1,7 @@ module.exports = { preset: 'ts-jest/presets/default-esm', testEnvironment: 'jsdom', + testTimeout: 15000, setupFilesAfterEnv: ['/jest.setup.cjs'], extensionsToTreatAsEsm: ['.ts', '.tsx'], moduleNameMapper: { diff --git a/dashboard/src/pages/EventExplorerPage.tsx b/dashboard/src/pages/EventExplorerPage.tsx index ad6276d..d08ecac 100644 --- a/dashboard/src/pages/EventExplorerPage.tsx +++ b/dashboard/src/pages/EventExplorerPage.tsx @@ -19,6 +19,7 @@ import { useWalletAccountSync } from '../hooks/useWalletAccountSync'; const DEFAULT_EVENT_COUNT = 5000; const DEFAULT_LIMIT = 12; const API_URL = import.meta.env.VITE_EVENTS_API_URL ?? 'http://localhost:8787/api/events'; +const POLL_INTERVAL_MS = 15_000; const LISTENER_BASE_URL = API_URL.replace('/api/events', ''); const INDEXING_HEALTH_URL = import.meta.env.VITE_INDEXING_HEALTH_URL ?? resolveIndexingHealthUrl(API_URL); @@ -102,8 +103,23 @@ export function EventExplorerPage() { loadEvents(); loadStatus(); + // Poll for status updates so delivered/failed notifications are reflected + // without requiring a manual page refresh. + const intervalId = setInterval(async () => { + try { + const remoteEvents = await fetchEvents(API_URL); + if (!cancelled) { + setEvents(remoteEvents); + } + } catch { + // Silently ignore polling errors — the error banner is reserved for + // the initial load failure so background polls don't disrupt the user. + } + }, POLL_INTERVAL_MS); + return () => { cancelled = true; + clearInterval(intervalId); }; }, [lastFetchedAt, setEvents, setError, setLoading]); diff --git a/dashboard/src/pages/EventsPage.tsx b/dashboard/src/pages/EventsPage.tsx index 3a14713..3848850 100644 --- a/dashboard/src/pages/EventsPage.tsx +++ b/dashboard/src/pages/EventsPage.tsx @@ -11,6 +11,7 @@ import { restoreWalletSession } from '../services/wallet'; const DEFAULT_EVENT_COUNT = 5000; const API_URL = import.meta.env.VITE_EVENTS_API_URL ?? 'http://localhost:8787/api/events'; +const POLL_INTERVAL_MS = 15_000; export function EventsPage() { const setEvents = useEventStore((state) => state.setEvents); @@ -59,8 +60,20 @@ export function EventsPage() { loadEvents(); + const intervalId = setInterval(async () => { + try { + const remoteEvents = await fetchEvents(API_URL); + if (!cancelled) { + setEvents(remoteEvents); + } + } catch { + // Silently ignore background poll errors. + } + }, POLL_INTERVAL_MS); + return () => { cancelled = true; + clearInterval(intervalId); }; }, [lastFetchedAt, setEvents, setError, setLoading]); diff --git a/dashboard/src/store/eventStore.test.tsx b/dashboard/src/store/eventStore.test.tsx index 03ce4d6..d1310c5 100644 --- a/dashboard/src/store/eventStore.test.tsx +++ b/dashboard/src/store/eventStore.test.tsx @@ -117,3 +117,45 @@ describe('pagination + filter interaction', () => { expect(after[0].textContent).toContain('Withdrawal'); }); }); + +describe('stale cache regression tests', () => { + beforeEach(() => { + useEventStore.setState({ events: [], filters: { search: '', contractAddress: 'all', eventType: 'all' }, isLoading: false, error: null }); + }); + + it('setEvents with an updated record replaces the stale copy, not silently dropped', () => { + const [event] = generateMockEvents(1); + const staleEvent = { ...event, value: 'stale-value' }; + const freshEvent = { ...event, value: 'fresh-value' }; + + useEventStore.getState().setEvents([staleEvent]); + // Simulate a poll returning the same eventId with updated data + useEventStore.getState().setEvents([freshEvent]); + + const stored = useEventStore.getState().events; + expect(stored).toHaveLength(1); + expect(stored[0].value).toBe('fresh-value'); + }); + + it('appendEvents with an updated record replaces the stale copy', () => { + const [event] = generateMockEvents(1); + const staleEvent = { ...event, value: 'stale-value' }; + const freshEvent = { ...event, value: 'fresh-value' }; + + useEventStore.getState().setEvents([staleEvent]); + // Poll brings in the updated record + useEventStore.getState().appendEvents([freshEvent]); + + const stored = useEventStore.getState().events; + expect(stored).toHaveLength(1); + expect(stored[0].value).toBe('fresh-value'); + }); + + it('setEvents keeps order stable when no duplicates are present', () => { + const [a, b, c] = generateMockEvents(3); + useEventStore.getState().setEvents([a, b, c]); + + const stored = useEventStore.getState().events; + expect(stored.map((e) => e.eventId)).toEqual([a.eventId, b.eventId, c.eventId]); + }); +}); diff --git a/dashboard/src/store/eventStore.ts b/dashboard/src/store/eventStore.ts index 90703f2..3c1a777 100644 --- a/dashboard/src/store/eventStore.ts +++ b/dashboard/src/store/eventStore.ts @@ -49,16 +49,13 @@ interface EventStoreState { } function dedupeEventsById(events: BlockchainEvent[]): BlockchainEvent[] { - const seenEventIds = new Set(); - - return events.filter((event) => { - if (seenEventIds.has(event.eventId)) { - return false; - } - - seenEventIds.add(event.eventId); - return true; - }); + // Use a Map to keep the last-seen record for each eventId so that status + // updates (newer entries) overwrite stale cached copies rather than being dropped. + const byId = new Map(); + for (const event of events) { + byId.set(event.eventId, event); + } + return Array.from(byId.values()); } export const useEventStore = create((set) => ({ @@ -77,6 +74,8 @@ export const useEventStore = create((set) => ({ setEvents: (events) => set({ events: dedupeEventsById(events), lastFetchedAt: Date.now() }), appendEvents: (events) => set((state) => ({ + // Existing events go first so incoming (fresh) events overwrite stale + // copies when the Map processes duplicates last-write-wins. events: dedupeEventsById([...state.events, ...events]), })), setSearch: (search) => set((state) => ({ filters: { ...state.filters, search } })), diff --git a/listener/API.md b/listener/API.md index 424fb4f..ed60e00 100644 --- a/listener/API.md +++ b/listener/API.md @@ -305,7 +305,8 @@ Returns paginated delivery execution records from `notification_execution_log`. | Name | Type | Required | Description | |-----------|--------|----------|-------------------------------------------------------------------| | limit | number | No | Maximum records per page (default `20`, max `100`) | -| offset | number | No | Number of records to skip (default `0`) | +| offset | number | No | Number of records to skip (default `0`). Prefer `cursor`. | +| cursor | string | No | Opaque token for cursor-based pagination | | status | string | No | Filter by execution status: `SUCCESS`, `FAILED`, or `RETRY` | | startDate | string | No | ISO 8601 lower bound on `execution_time` (inclusive) | | endDate | string | No | ISO 8601 upper bound on `execution_time` (inclusive) | @@ -329,7 +330,8 @@ Returns paginated delivery execution records from `notification_execution_log`. "itemCount": 5, "totalPages": 3, "limit": 2, - "offset": 0 + "offset": 0, + "nextCursor": "MjAyNC0wNi0yMFQxNTowMDowMC4wMDBaLDQy" } ``` @@ -341,6 +343,7 @@ Returns paginated delivery execution records from `notification_execution_log`. | totalPages | number | Total pages available at the requested `limit` (`0` when `itemCount` is `0`) | | limit | number | Effective page size applied to the query | | offset | number | Number of records skipped before this page | +| nextCursor | string | Opaque token to fetch the next page of results | Existing clients that read `total`, `limit`, `offset`, and `records` continue to work unchanged. New clients should prefer `itemCount` and `totalPages` for pagination UI. diff --git a/listener/src/api/events-server.ts b/listener/src/api/events-server.ts index 10d8c16..6bd5e46 100644 --- a/listener/src/api/events-server.ts +++ b/listener/src/api/events-server.ts @@ -818,6 +818,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server { const url = new URL(req.url, 'http://localhost'); const limit = url.searchParams.get('limit') ? parseInt(url.searchParams.get('limit')!, 10) : undefined; const offset = url.searchParams.get('offset') ? parseInt(url.searchParams.get('offset')!, 10) : undefined; + const cursor = url.searchParams.get('cursor') || undefined; const status = url.searchParams.get('status') as 'SUCCESS' | 'FAILED' | 'RETRY' | null; const startDate = url.searchParams.get('startDate'); const endDate = url.searchParams.get('endDate'); @@ -827,6 +828,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server { correlationId, limit, offset, + cursor, status, startDate, endDate, @@ -835,6 +837,7 @@ export function createEventsServer(options: EventsServerOptions): http.Server { historyService.getHistory({ limit, offset, + cursor, status: status || undefined, startDate: startDate || undefined, endDate: endDate || undefined, diff --git a/listener/src/api/notifications-history.test.ts b/listener/src/api/notifications-history.test.ts index e2e476e..b4e6a2f 100644 --- a/listener/src/api/notifications-history.test.ts +++ b/listener/src/api/notifications-history.test.ts @@ -227,6 +227,104 @@ describe('GET /api/notifications/history', () => { expect(status).toBe(200); expect((body as any).limit).toBeLessThanOrEqual(100); }); + + it('supports cursor-based pagination', async () => { + server = await startServer(BASE_OPTIONS); + + for (let i = 0; i < 5; i++) { + await db.run( + `INSERT INTO scheduled_notifications + (payload, notification_type, target_recipient, execute_at, status) + VALUES (?, ?, ?, ?, ?)`, + [JSON.stringify({ test: true }), 'discord', 'test_user', new Date().toISOString(), 'COMPLETED'] + ); + } + + const times = [ + '2026-06-20T10:00:00.000Z', + '2026-06-20T10:00:01.000Z', + '2026-06-20T10:00:02.000Z', + '2026-06-20T10:00:03.000Z', + '2026-06-20T10:00:04.000Z', + ]; + + for (let i = 1; i <= 5; i++) { + await db.run( + `INSERT INTO notification_execution_log + (scheduled_notification_id, execution_attempt, execution_time, status, duration_ms) + VALUES (?, ?, ?, ?, ?)`, + [i, 1, times[i - 1], 'SUCCESS', 100] + ); + } + + const { status: status1, body: body1 } = await makeRequest( + server, + '/api/notifications/history?limit=2' + ); + + expect(status1).toBe(200); + expect((body1 as any).records.length).toBe(2); + expect((body1 as any).records[0].executionTime).toBe(times[4]); // DESC order + expect((body1 as any).records[1].executionTime).toBe(times[3]); + expect((body1 as any).nextCursor).toBeDefined(); + + const cursor = encodeURIComponent((body1 as any).nextCursor); + const { status: status2, body: body2 } = await makeRequest( + server, + `/api/notifications/history?limit=2&cursor=${cursor}` + ); + + expect(status2).toBe(200); + expect((body2 as any).records.length).toBe(2); + expect((body2 as any).records[0].executionTime).toBe(times[2]); + expect((body2 as any).records[1].executionTime).toBe(times[1]); + }); + + it('handles sorting consistency with tie-breakers', async () => { + server = await startServer(BASE_OPTIONS); + + for (let i = 0; i < 3; i++) { + await db.run( + `INSERT INTO scheduled_notifications + (payload, notification_type, target_recipient, execute_at, status) + VALUES (?, ?, ?, ?, ?)`, + [JSON.stringify({ test: true }), 'discord', 'test_user', new Date().toISOString(), 'COMPLETED'] + ); + } + + const sameTime = '2026-06-20T10:00:00.000Z'; + + // Insert multiple records with the exact same execution_time + for (let i = 1; i <= 3; i++) { + await db.run( + `INSERT INTO notification_execution_log + (scheduled_notification_id, execution_attempt, execution_time, status, duration_ms) + VALUES (?, ?, ?, ?, ?)`, + [i, 1, sameTime, 'SUCCESS', 100] + ); + } + + const { status: status1, body: body1 } = await makeRequest( + server, + '/api/notifications/history?limit=2' + ); + + expect(status1).toBe(200); + expect((body1 as any).records.length).toBe(2); + expect((body1 as any).records[0].id).toBe(3); // DESC order + expect((body1 as any).records[1].id).toBe(2); + expect((body1 as any).nextCursor).toBeDefined(); + + const cursor = encodeURIComponent((body1 as any).nextCursor); + const { status: status2, body: body2 } = await makeRequest( + server, + `/api/notifications/history?limit=2&cursor=${cursor}` + ); + + expect(status2).toBe(200); + expect((body2 as any).records.length).toBe(1); + expect((body2 as any).records[0].id).toBe(1); + }); }); describe('GET /api/notifications/history database failures', () => { diff --git a/listener/src/services/notification-history.ts b/listener/src/services/notification-history.ts index 215adf9..2077ce0 100644 --- a/listener/src/services/notification-history.ts +++ b/listener/src/services/notification-history.ts @@ -1,6 +1,6 @@ import { getDatabase } from '../database/database'; import logger from '../utils/logger'; -import { buildPaginationMetadata, normalizePaginationParams } from '../utils/pagination'; +import { buildPaginationMetadata, normalizePaginationParams, encodeCursor, decodeCursor } from '../utils/pagination'; export interface NotificationHistoryRecord { id: number; @@ -15,6 +15,7 @@ export interface NotificationHistoryRecord { export interface HistoryQueryOptions { limit?: number; offset?: number; + cursor?: string; status?: 'SUCCESS' | 'FAILED' | 'RETRY'; startDate?: string; endDate?: string; @@ -27,6 +28,7 @@ export interface PaginatedHistoryResponse { offset: number; itemCount: number; totalPages: number; + nextCursor?: string | null; } export class NotificationHistoryService { @@ -55,15 +57,25 @@ export class NotificationHistoryService { params.push(options.endDate); } + const baseConditions = [...conditions]; + const baseParams = [...params]; + + const decodedCursor = options.cursor ? decodeCursor(options.cursor) : null; + if (decodedCursor) { + conditions.push('(execution_time < ? OR (execution_time = ? AND id < ?))'); + params.push(decodedCursor.executionTime, decodedCursor.executionTime, decodedCursor.id); + } + + const countWhereClause = baseConditions.length > 0 ? `WHERE ${baseConditions.join(' AND ')}` : ''; const whereClause = conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : ''; // Get total count - const countSql = `SELECT COUNT(*) as count FROM notification_execution_log ${whereClause}`; - const countResult = await this.db.get<{ count: number }>(countSql, params); + const countSql = `SELECT COUNT(*) as count FROM notification_execution_log ${countWhereClause}`; + const countResult = await this.db.get<{ count: number }>(countSql, baseParams); const total = countResult?.count || 0; // Get paginated records - const sql = ` + let sql = ` SELECT id, scheduled_notification_id as scheduledNotificationId, @@ -74,13 +86,20 @@ export class NotificationHistoryService { duration_ms as responseDuration FROM notification_execution_log ${whereClause} - ORDER BY execution_time DESC - LIMIT ? OFFSET ? + ORDER BY execution_time DESC, id DESC + LIMIT ? `; + + const queryParams = [...params, limit]; + + if (!decodedCursor) { + sql += ` OFFSET ?`; + queryParams.push(offset); + } const records = await this.db.all( sql, - [...params, limit, offset] + queryParams ); logger.info('Notification history retrieved', { @@ -92,6 +111,10 @@ export class NotificationHistoryService { const pagination = buildPaginationMetadata(total, limit, offset); + const nextCursor = records.length > 0 + ? encodeCursor(records[records.length - 1].executionTime, records[records.length - 1].id) + : null; + return { records, total, @@ -99,6 +122,7 @@ export class NotificationHistoryService { offset: pagination.offset, itemCount: pagination.itemCount, totalPages: pagination.totalPages, + nextCursor, }; } catch (error) { logger.error('Failed to retrieve notification history', { error }); diff --git a/listener/src/services/scheduled-notification-repository.ts b/listener/src/services/scheduled-notification-repository.ts index 214673b..0163bd6 100644 --- a/listener/src/services/scheduled-notification-repository.ts +++ b/listener/src/services/scheduled-notification-repository.ts @@ -163,6 +163,7 @@ export class ScheduledNotificationRepository { last_error = ?, error_details = ?, processing_completed_at = ?, + updated_at = ?, processor_id = NULL, lock_expires_at = NULL WHERE id = ? @@ -180,6 +181,7 @@ export class ScheduledNotificationRepository { errorMsg, errorDetails, isFailed ? now.toISOString() : null, + now.toISOString(), model.id, ]); @@ -210,14 +212,17 @@ export class ScheduledNotificationRepository { SET status = ?, processing_completed_at = ?, + updated_at = ?, processor_id = NULL, lock_expires_at = NULL WHERE id = ? `; + const now = new Date().toISOString(); await this.db.run(sql, [ NotificationStatus.COMPLETED, - new Date().toISOString(), + now, + now, id, ]); @@ -247,16 +252,18 @@ export class ScheduledNotificationRepository { error_details = ?, next_retry_at = ?, processing_completed_at = ?, + updated_at = ?, processor_id = NULL, lock_expires_at = NULL WHERE id = ? `; + const now = new Date().toISOString(); const completedAt = isFailed ? new Date().toISOString() : null; const errorDetails = JSON.stringify({ message: error.message, stack: error.stack, - timestamp: new Date().toISOString(), + timestamp: now, }); await this.db.run(sql, [ @@ -264,6 +271,8 @@ export class ScheduledNotificationRepository { nextRetryCount, error.message, errorDetails, + isFailed ? now : null, + now, isFailed ? null : (nextRetryAt?.toISOString() ?? null), completedAt, id, @@ -499,6 +508,15 @@ export class ScheduledNotificationRepository { * Convert database row to model */ private rowToModel(row: ScheduledNotificationRow): ScheduledNotification { + // SQLite CURRENT_TIMESTAMP produces "YYYY-MM-DD HH:MM:SS" (UTC, no Z suffix). + // Appending Z ensures JS parses the value as UTC rather than local time, + // which prevents timezone-shifted timestamps in rowToModel output. + const parseUtc = (value: string | null | undefined): Date | undefined => { + if (!value) return undefined; + const normalized = value.includes('T') || value.endsWith('Z') ? value : value.replace(' ', 'T') + 'Z'; + return new Date(normalized); + }; + return { id: row.id, payload: decompressPayload(row.payload), @@ -506,18 +524,16 @@ export class ScheduledNotificationRepository { payloadHash: row.payload_hash, notificationType: row.notification_type as any, targetRecipient: row.target_recipient, - executeAt: new Date(row.execute_at), - createdAt: new Date(row.created_at), - updatedAt: new Date(row.updated_at), + executeAt: parseUtc(row.execute_at) as Date, + createdAt: parseUtc(row.created_at), + updatedAt: parseUtc(row.updated_at), status: row.status as NotificationStatus, retryCount: row.retry_count, maxRetries: row.max_retries, - processingStartedAt: row.processing_started_at ? new Date(row.processing_started_at) : null, - processingCompletedAt: row.processing_completed_at - ? new Date(row.processing_completed_at) - : null, + processingStartedAt: parseUtc(row.processing_started_at) ?? null, + processingCompletedAt: parseUtc(row.processing_completed_at) ?? null, processorId: row.processor_id, - lockExpiresAt: row.lock_expires_at ? new Date(row.lock_expires_at) : null, + lockExpiresAt: parseUtc(row.lock_expires_at) ?? null, lastError: row.last_error, errorDetails: row.error_details, eventId: row.event_id, diff --git a/listener/src/tests/notification-scheduler-refactored.test.ts b/listener/src/tests/notification-scheduler-refactored.test.ts index 596fb6e..bfe570b 100644 --- a/listener/src/tests/notification-scheduler-refactored.test.ts +++ b/listener/src/tests/notification-scheduler-refactored.test.ts @@ -306,6 +306,8 @@ describe('NotificationScheduler (Refactored)', () => { .build() ); + // 2. Create a notification in the past (overdue, pending) + const overdueId = await repository.create( // 2. Create a notification in the past that is currently PROCESSING but lock is expired const staleId = await repository.create( NotificationFixtureBuilder @@ -334,8 +336,23 @@ describe('NotificationScheduler (Refactored)', () => { .forImmediateExecution() .build() ); + // Lock only the stale notification — fetchAndLock picks up past items by priority order, + // so we lock both past items then restore item 2 to PENDING to isolate the stale case. + await repository.fetchAndLockPendingNotifications('processor-1', 30000, 10); + await db.run( + "UPDATE scheduled_notifications SET status = 'PENDING', processor_id = NULL, lock_expires_at = NULL WHERE id = ?", + [overdueId] + ); + const pastLock = NotificationFixtureBuilder.dates.past(1000); + await db.run('UPDATE scheduled_notifications SET lock_expires_at = ? WHERE id = ?', [ + pastLock.toISOString(), + staleId, + ]); - // Get stats BEFORE recovery + // Get stats BEFORE recovery: + // - item 1: PENDING (future) + // - item 2: PENDING (restored, overdue) + // - item 3: PROCESSING with expired lock → getStats adjusts to PENDING const stats = await repository.getStats(); expect(stats.pending).toBe(3); diff --git a/listener/src/tests/notification-scheduler.test.ts b/listener/src/tests/notification-scheduler.test.ts index ad53824..9c4cb79 100644 --- a/listener/src/tests/notification-scheduler.test.ts +++ b/listener/src/tests/notification-scheduler.test.ts @@ -406,3 +406,138 @@ describe('NotificationScheduler', () => { }); }); }); + +describe('Stale cache regression tests', () => { + let db: Database; + let repository: ScheduledNotificationRepository; + + const testDbPath = './data/test-stale-cache.db'; + + beforeAll(async () => { + const dbDir = path.dirname(testDbPath); + if (!fs.existsSync(dbDir)) { + fs.mkdirSync(dbDir, { recursive: true }); + } + if (fs.existsSync(testDbPath)) { + fs.unlinkSync(testDbPath); + } + db = new Database(testDbPath); + await db.initialize(); + repository = new ScheduledNotificationRepository(db); + }); + + afterAll(async () => { + await db.close(); + if (fs.existsSync(testDbPath)) { + fs.unlinkSync(testDbPath); + } + }); + + beforeEach(async () => { + await db.run('DELETE FROM notification_execution_log'); + await db.run('DELETE FROM scheduled_notifications'); + }); + + test('markAsCompleted updates updated_at — no stale timestamp after delivery', async () => { + const id = await repository.create({ + payload: { message: 'Stale test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-webhook', + executeAt: new Date(Date.now() - 1000), + }); + + await repository.markAsCompleted(id); + + const notification = await repository.getById(id); + expect(notification?.status).toBe(NotificationStatus.COMPLETED); + // updated_at must be a defined, valid date — confirms the column is written on status change + expect(notification?.updatedAt).toBeDefined(); + expect(notification?.updatedAt).toBeInstanceOf(Date); + expect(isNaN(notification?.updatedAt?.getTime() ?? NaN)).toBe(false); + }); + + test('markAsFailedOrRetry updates updated_at — no stale timestamp on retry', async () => { + const id = await repository.create({ + payload: { message: 'Retry stale test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-webhook', + executeAt: new Date(Date.now() - 1000), + maxRetries: 3, + }); + + await repository.markAsFailedOrRetry(id, new Error('Delivery failed'), 0, 3); + + const notification = await repository.getById(id); + expect(notification?.status).toBe(NotificationStatus.PENDING); + expect(notification?.updatedAt).toBeDefined(); + expect(notification?.updatedAt).toBeInstanceOf(Date); + expect(isNaN(notification?.updatedAt?.getTime() ?? NaN)).toBe(false); + }); + + test('markAsFailedOrRetry updates updated_at when permanently failed', async () => { + const id = await repository.create({ + payload: { message: 'Final failure stale test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-webhook', + executeAt: new Date(Date.now() - 1000), + maxRetries: 2, + }); + + await repository.markAsFailedOrRetry(id, new Error('Max retries exceeded'), 2, 2); + + const notification = await repository.getById(id); + expect(notification?.status).toBe(NotificationStatus.FAILED); + expect(notification?.updatedAt).toBeDefined(); + expect(notification?.updatedAt).toBeInstanceOf(Date); + expect(isNaN(notification?.updatedAt?.getTime() ?? NaN)).toBe(false); + }); + + test('recoverStaleLocks updates updated_at — no stale status after lock recovery', async () => { + const id = await repository.create({ + payload: { message: 'Stale lock test' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-webhook', + executeAt: new Date(Date.now() - 1000), + }); + + await repository.fetchAndLockPendingNotifications('processor-stale', 30000, 10); + + // Expire the lock + const pastLock = new Date(Date.now() - 1000); + await db.run('UPDATE scheduled_notifications SET lock_expires_at = ? WHERE id = ?', [ + pastLock.toISOString(), + id, + ]); + + await repository.recoverStaleLocks(); + + const recovered = await repository.getById(id); + expect(recovered?.status).toBe(NotificationStatus.PENDING); + expect(recovered?.processorId).toBeNull(); + // updated_at must be a valid date — confirms the column is written on lock recovery + expect(recovered?.updatedAt).toBeDefined(); + expect(recovered?.updatedAt).toBeInstanceOf(Date); + expect(isNaN(recovered?.updatedAt?.getTime() ?? NaN)).toBe(false); + }); + + test('stale status is not served after delivery: re-fetch reflects COMPLETED', async () => { + const id = await repository.create({ + payload: { message: 'Delivery stale check' }, + notificationType: NotificationType.DISCORD, + targetRecipient: 'test-webhook', + executeAt: new Date(Date.now() - 1000), + }); + + // Simulate the state a UI would cache before delivery + const beforeDelivery = await repository.getById(id); + expect(beforeDelivery?.status).toBe(NotificationStatus.PENDING); + + // Delivery happens + await repository.markAsCompleted(id); + + // Re-fetch (simulating a poll/refresh) must return the new status + const afterDelivery = await repository.getById(id); + expect(afterDelivery?.status).toBe(NotificationStatus.COMPLETED); + expect(afterDelivery?.status).not.toBe(beforeDelivery?.status); + }); +}); diff --git a/listener/src/utils/pagination.ts b/listener/src/utils/pagination.ts index 8df07ef..6ba4b3a 100644 --- a/listener/src/utils/pagination.ts +++ b/listener/src/utils/pagination.ts @@ -5,6 +5,28 @@ export interface PaginationMetadata { offset: number; } +export interface CursorData { + executionTime: string; + id: number; +} + +export function encodeCursor(executionTime: string, id: number): string { + return Buffer.from(`${executionTime},${id}`).toString('base64'); +} + +export function decodeCursor(cursor: string): CursorData | null { + try { + const decoded = Buffer.from(cursor, 'base64').toString('utf-8'); + const [executionTime, idStr] = decoded.split(','); + if (!executionTime || !idStr) return null; + const id = parseInt(idStr, 10); + if (isNaN(id)) return null; + return { executionTime, id }; + } catch { + return null; + } +} + export interface PaginationQueryParams { limit: number; offset: number;