From 4a8a8c455eecada1c9628a0aabecb54fe1a55a2f Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Thu, 28 May 2026 04:40:03 +0530 Subject: [PATCH 1/3] feat: bulk event ingestion (POST /track/batch) for offline-first SDKs Reimplements the batch ingestion feature from PR #374 on top of current upstream/main (post buffer-perf, kafka client, ClickHouse round-robin). Adds __syncedAt property to all worker-processed events. See PR description for full architectural details. Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/api/src/controllers/event.controller.ts | 7 +- apps/api/src/controllers/track.controller.ts | 320 +++++++++--- .../api/src/routes/track-batch.router.test.ts | 455 ++++++++++++++++++ apps/api/src/routes/track.router.ts | 47 +- apps/api/src/utils/ids.ts | 24 +- apps/worker/src/jobs/events.incoming-event.ts | 153 ++++-- .../src/jobs/events.incoming-events.test.ts | 125 ++++- packages/queue/src/queues.ts | 1 - packages/validation/src/track.validation.ts | 13 + 9 files changed, 1022 insertions(+), 123 deletions(-) create mode 100644 apps/api/src/routes/track-batch.router.test.ts diff --git a/apps/api/src/controllers/event.controller.ts b/apps/api/src/controllers/event.controller.ts index 59a6d0f4a..9f7808b9f 100644 --- a/apps/api/src/controllers/event.controller.ts +++ b/apps/api/src/controllers/event.controller.ts @@ -14,10 +14,7 @@ export async function postEvent( }>, reply: FastifyReply ) { - const { timestamp, isTimestampFromThePast } = getTimestamp( - request.timestamp, - request.body - ); + const { timestamp } = getTimestamp(request.timestamp, request.body); const ip = request.clientIp; const ua = request.headers['user-agent'] ?? 
'unknown/1.0'; const projectId = request.client?.projectId; @@ -34,6 +31,7 @@ export async function postEvent( ip, ua, salts, + eventMs: new Date(timestamp).getTime(), }); const uaInfo = parseUserAgent(ua, request.body?.properties); @@ -48,7 +46,6 @@ export async function postEvent( event: { ...request.body, timestamp, - isTimestampFromThePast, }, uaInfo, geo, diff --git a/apps/api/src/controllers/track.controller.ts b/apps/api/src/controllers/track.controller.ts index bf6b2d4e3..fc0e180b6 100644 --- a/apps/api/src/controllers/track.controller.ts +++ b/apps/api/src/controllers/track.controller.ts @@ -22,14 +22,32 @@ import type { IIdentifyPayload, IIncrementPayload, IReplayPayload, + ITrackBatchBody, ITrackHandlerPayload, ITrackPayload, } from '@openpanel/validation'; +import { zTrackHandlerPayload } from '@openpanel/validation'; import type { FastifyReply, FastifyRequest } from 'fastify'; import { assocPath, pathOr, pick } from 'ramda'; import { HttpError } from '@/utils/errors'; import { getDeviceId } from '@/utils/ids'; +type Salts = Awaited>; + +/** + * Per-request data that is identical for every event in a batch. + * Computed once in the batch handler so we don't re-fetch salts/geo + * or re-parse headers N times. + */ +interface SharedRequestContext { + projectId: string; + requestIp: string; + requestUa: string; + requestHeaders: Record; + requestGeo: GeoLocation; + salts: Salts; +} + export function getStringHeaders(headers: FastifyRequest['headers']) { return Object.entries( pick( @@ -82,7 +100,7 @@ export function getTimestamp( : undefined; if (!userDefinedTimestamp) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } const clientTimestamp = new Date(userDefinedTimestamp); @@ -90,24 +108,26 @@ export function getTimestamp( // Constants for time validation const ONE_MINUTE_MS = 60 * 1000; - const FIFTEEN_MINUTES_MS = 15 * ONE_MINUTE_MS; + // Hard floor for accepted historical events. 
Public contract for /track + // and /track/batch, hard-coded (not per-project configurable). + const FIVE_DAYS_MS = 5 * 24 * 60 * 60 * 1000; // Use safeTimestamp if invalid or more than 1 minute in the future if ( Number.isNaN(clientTimestampNumber) || clientTimestampNumber > safeTimestamp + ONE_MINUTE_MS ) { - return { timestamp: safeTimestamp, isTimestampFromThePast: false }; + return { timestamp: safeTimestamp }; } - // isTimestampFromThePast is true only if timestamp is older than 15 minutes - const isTimestampFromThePast = - clientTimestampNumber < safeTimestamp - FIFTEEN_MINUTES_MS; + // Reject events older than 5 days. In /track/batch this surfaces as a + // per-row { reason: 'validation' } entry in rejected[]; in single-event + // /track it returns 400 to the caller. + if (clientTimestampNumber < safeTimestamp - FIVE_DAYS_MS) { + throw new HttpError('event timestamp older than 5 days', { status: 400 }); + } - return { - timestamp: clientTimestampNumber, - isTimestampFromThePast, - }; + return { timestamp: clientTimestampNumber }; } interface TrackContext { @@ -115,35 +135,72 @@ interface TrackContext { ip: string; ua?: string; headers: Record; - timestamp: { value: number; isFromPast: boolean }; + timestamp: { value: number }; identity?: IIdentifyPayload; deviceId: string; sessionId: string; geo: GeoLocation; } -async function buildContext( - request: FastifyRequest<{ - Body: ITrackHandlerPayload; - }>, - validatedBody: ITrackHandlerPayload -): Promise { +/** + * Build the per-request shared context. Done once per HTTP request — for + * single-event /track this is just an extra struct; for /track/batch it + * lets N events share one salts + one geo lookup. 
+ */ +async function buildSharedRequestContext( + request: FastifyRequest, +): Promise { const projectId = request.client?.projectId; if (!projectId) { throw new HttpError('Missing projectId', { status: 400 }); } - const timestamp = getTimestamp(request.timestamp, validatedBody.payload); - const ip = + const requestIp = request.clientIp; + const requestUa = request.headers['user-agent'] ?? 'unknown/1.0'; + const requestHeaders = getStringHeaders(request.headers); + + const [requestGeo, salts] = await Promise.all([ + getGeoLocation(requestIp), + getSalts(), + ]); + + return { + projectId, + requestIp, + requestUa, + requestHeaders, + requestGeo, + salts, + }; +} + +/** + * Build a per-event TrackContext from already-fetched shared data. + * Per-event work: timestamp, identity, ip override, deviceId, and a + * second geo lookup *only* if the event overrides __ip. + */ +async function buildEventContext( + shared: SharedRequestContext, + requestTimestamp: FastifyRequest['timestamp'], + validatedBody: ITrackHandlerPayload, +): Promise { + const timestamp = getTimestamp(requestTimestamp, validatedBody.payload); + + const overrideIp = validatedBody.type === 'track' && validatedBody.payload.properties?.__ip ? (validatedBody.payload.properties.__ip as string) - : request.clientIp; - const ua = request.headers['user-agent'] ?? 'unknown/1.0'; + : undefined; + const ip = overrideIp ?? shared.requestIp; + + // Only re-fetch geo when the event overrode the IP — the common case + // (browser SDK, no __ip) reuses the request-level geo computed once. + const geo = + overrideIp && overrideIp !== shared.requestIp + ? await getGeoLocation(overrideIp) + : shared.requestGeo; - const headers = getStringHeaders(request.headers); const identity = getIdentity(validatedBody); const profileId = identity?.profileId; - if (profileId && validatedBody.type === 'track') { validatedBody.payload.profileId = profileId; } @@ -154,25 +211,25 @@ async function buildContext( ? 
validatedBody.payload?.properties.__deviceId : undefined; - // Get geo location (needed for track and identify) - const [geo, salts] = await Promise.all([getGeoLocation(ip), getSalts()]); - const deviceIdResult = await getDeviceId({ - projectId, + projectId: shared.projectId, ip, - ua, - salts, + ua: shared.requestUa, + salts: shared.salts, overrideDeviceId, + // Bucket the deterministic session_id by the event's own __timestamp, + // not the wall-clock moment the request arrived. Critical for + // /track/batch where one request can contain events spanning days. + eventMs: timestamp.timestamp, }); return { - projectId, + projectId: shared.projectId, ip, - ua, - headers, + ua: shared.requestUa, + headers: shared.requestHeaders, timestamp: { value: timestamp.timestamp, - isFromPast: timestamp.isTimestampFromThePast, }, identity, deviceId: deviceIdResult.deviceId, @@ -181,6 +238,16 @@ async function buildContext( }; } +async function buildContext( + request: FastifyRequest<{ + Body: ITrackHandlerPayload; + }>, + validatedBody: ITrackHandlerPayload, +): Promise { + const shared = await buildSharedRequestContext(request); + return buildEventContext(shared, request.timestamp, validatedBody); +} + async function handleTrack( payload: ITrackPayload, context: TrackContext @@ -208,7 +275,6 @@ async function handleTrack( ...payload, groups: payload.groups ?? [], timestamp: timestamp.value, - isTimestampFromThePast: timestamp.isFromPast, }, uaInfo, geo, @@ -361,15 +427,61 @@ async function handleAssignGroup( }); } +/** + * Dispatch a validated event to the matching per-type handler. Shared by + * /track and /track/batch. Throws HttpError(400) for the unsupported `alias` + * type so single-event and batch can both surface it consistently. 
+ */ +async function dispatchEvent( + body: ITrackHandlerPayload, + context: TrackContext, +): Promise { + switch (body.type) { + case 'alias': + throw new HttpError('Alias is not supported', { status: 400 }); + case 'track': + await handleTrack(body.payload, context); + return; + case 'identify': + await handleIdentify(body.payload, context); + return; + case 'increment': + await handleIncrement(body.payload, context); + return; + case 'decrement': + await handleDecrement(body.payload, context); + return; + case 'replay': + await handleReplay(body.payload, context); + return; + case 'group': + await handleGroup(body.payload, context); + return; + case 'assign_group': + await handleAssignGroup(body.payload, context); + return; + default: { + // Exhaustiveness check: `body` narrows to `never` when every variant + // of ITrackHandlerPayload['type'] is handled. Adding a new variant + // makes this assignment fail to compile. + const exhaustive: never = body; + throw new HttpError(`Unhandled event type: ${exhaustive}`, { + status: 400, + }); + } + } +} + export async function handler( request: FastifyRequest<{ Body: ITrackHandlerPayload; }>, - reply: FastifyReply + reply: FastifyReply, ) { const validatedBody = request.body; - // Handle alias (not supported) + // Reject `alias` before building context — saves the salts/geo/deviceId work + // for a request that's going to fail anyway. 
if (validatedBody.type === 'alias') { return reply.status(400).send({ status: 400, @@ -378,39 +490,8 @@ export async function handler( }); } - // Build request context const context = await buildContext(request, validatedBody); - - // Dispatch to appropriate handler - switch (validatedBody.type) { - case 'track': - await handleTrack(validatedBody.payload, context); - break; - case 'identify': - await handleIdentify(validatedBody.payload, context); - break; - case 'increment': - await handleIncrement(validatedBody.payload, context); - break; - case 'decrement': - await handleDecrement(validatedBody.payload, context); - break; - case 'replay': - await handleReplay(validatedBody.payload, context); - break; - case 'group': - await handleGroup(validatedBody.payload, context); - break; - case 'assign_group': - await handleAssignGroup(validatedBody.payload, context); - break; - default: - return reply.status(400).send({ - status: 400, - error: 'Bad Request', - message: 'Invalid type', - }); - } + await dispatchEvent(validatedBody, context); reply.status(200).send({ deviceId: context.deviceId, @@ -418,6 +499,109 @@ export async function handler( }); } +type BatchItemResult = + | { index: number; status: 'accepted' } + | { + index: number; + status: 'rejected'; + reason: 'validation' | 'internal'; + error: string; + }; + +/** + * POST /track/batch — accepts up to TRACK_BATCH_MAX_EVENTS payloads in one + * request and dispatches each through the same per-event pipeline as /track. + * + * Per-event validation failures do NOT fail the whole batch: the response is + * always 202 (assuming envelope + auth pass) with `{ accepted, rejected[] }` + * so callers can fix and retry just the bad indices. + * + * Optimization: salts + request-IP geo are fetched once and shared across + * all events. Events that override `__ip` still get their own geo lookup. + */ +// Bounded concurrency for per-event processing inside a batch. 
Each event +// hits Redis (session lookup + groupmq queue add) and may trigger a geo +// lookup if it overrides `__ip`, so an unbounded `Promise.all` over 2000 +// events can spike Redis pool usage and the geo provider's rate budget on +// smaller self-hosted instances. 50 keeps the pipeline saturated without +// turning a single big batch into a thundering herd. +const BATCH_CONCURRENCY = 50; + +export async function batchHandler( + request: FastifyRequest<{ + Body: ITrackBatchBody; + }>, + reply: FastifyReply, +) { + const { events } = request.body; + const shared = await buildSharedRequestContext(request); + + const processOne = async ( + raw: unknown, + index: number, + ): Promise => { + const parsed = zTrackHandlerPayload.safeParse(raw); + if (!parsed.success) { + const issue = parsed.error.issues[0]; + const path = issue?.path?.join('.') ?? ''; + const error = path + ? `${path}: ${issue?.message}` + : issue?.message ?? 'invalid payload'; + return { index, status: 'rejected', reason: 'validation', error }; + } + + try { + const context = await buildEventContext( + shared, + request.timestamp, + parsed.data, + ); + await dispatchEvent(parsed.data, context); + return { index, status: 'accepted' }; + } catch (err) { + // HttpError with 4xx → caller's fault (validation-style: alias, + // unknown type, replay without session). Anything else → ours. + const isClientError = + err instanceof HttpError && err.status >= 400 && err.status < 500; + const reason: 'validation' | 'internal' = isClientError + ? 'validation' + : 'internal'; + const message = err instanceof Error ? err.message : 'unknown error'; + if (!isClientError) { + request.log.error( + { err, index }, + 'Batch event dispatch failed', + ); + } + return { index, status: 'rejected', reason, error: message }; + } + }; + + // Process in chunks of BATCH_CONCURRENCY. We keep results aligned with + // input indices via the `index` field on each BatchItemResult. 
+ const results: BatchItemResult[] = new Array(events.length); + for (let start = 0; start < events.length; start += BATCH_CONCURRENCY) { + const end = Math.min(start + BATCH_CONCURRENCY, events.length); + const chunk = await Promise.all( + events.slice(start, end).map((raw, i) => processOne(raw, start + i)), + ); + for (const r of chunk) { + results[r.index] = r; + } + } + + const accepted = results.filter((r) => r.status === 'accepted').length; + const rejected = results.filter( + (r): r is Extract => + r.status === 'rejected', + ); + + reply.status(202).send({ + accepted, + rejected, + }); +} + export async function fetchDeviceId( request: FastifyRequest, reply: FastifyReply diff --git a/apps/api/src/routes/track-batch.router.test.ts b/apps/api/src/routes/track-batch.router.test.ts new file mode 100644 index 000000000..adcd4c462 --- /dev/null +++ b/apps/api/src/routes/track-batch.router.test.ts @@ -0,0 +1,455 @@ +/** + * Integration tests for POST /track/batch. + * + * Side effects (queue, db, geo, redis) are mocked so the test runs without + * Docker. Auth uses the same getClientByIdCached mock as the insights + * router tests, except here we don't need real fixtures — we never read + * from PG/CH, we only verify the controller dispatches each item correctly. + */ + +import { afterAll, beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; + +// ─── Module mocks (hoisted before imports) ──────────────────────────────────── +// +// `vi.mock` is hoisted above all top-level statements, so any spies the factory +// references must be created via `vi.hoisted(...)` (also hoisted) — otherwise +// the factory runs first and hits a temporal-dead-zone ReferenceError. 
+ +const { queueAdd, upsertProfileMock } = vi.hoisted(() => ({ + queueAdd: vi.fn().mockResolvedValue(undefined), + upsertProfileMock: vi.fn().mockResolvedValue(undefined), +})); + +vi.mock('@openpanel/db', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getClientByIdCached: vi.fn(), + getSalts: vi.fn().mockResolvedValue({ current: 'salt-a', previous: 'salt-b' }), + getProfileById: vi.fn().mockResolvedValue(null), + upsertProfile: upsertProfileMock, + groupBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + replayBuffer: { add: vi.fn().mockResolvedValue(undefined) }, + }; +}); + +vi.mock('@openpanel/queue', async (importOriginal) => { + const actual = await importOriginal(); + return { + ...actual, + getEventsGroupQueueShard: vi.fn(() => ({ add: queueAdd })), + }; +}); + +vi.mock('@openpanel/geo', () => ({ + getGeoLocation: vi.fn().mockResolvedValue({ + country: 'US', + city: 'San Francisco', + region: 'CA', + longitude: -122.4, + latitude: 37.77, + }), +})); + +vi.mock('@openpanel/common/server', async (importOriginal) => { + const actual = + await importOriginal(); + return { + ...actual, + verifyPassword: vi.fn().mockResolvedValue(true), + generateDeviceId: vi.fn().mockReturnValue('device-test'), + }; +}); + +vi.mock('@openpanel/redis', async (importOriginal) => { + const actual = await importOriginal(); + const fakeRedisClient = new Proxy( + {}, + { + get: (_t, p) => { + if (p === 'status') return 'ready'; + if (p === 'multi') { + return () => ({ + hget: vi.fn().mockReturnThis(), + exec: vi.fn().mockResolvedValue([]), + }); + } + return vi.fn().mockResolvedValue(null); + }, + }, + ); + return { + ...actual, + getCache: async (_key: string, _ttl: number, fn: () => Promise) => + fn(), + getLock: vi.fn().mockResolvedValue(true), + getRedisCache: vi.fn().mockReturnValue(fakeRedisClient), + }; +}); + +// ─── Imports (after mocks) ──────────────────────────────────────────────────── + +import { ClientType, 
getClientByIdCached } from '@openpanel/db'; +import type { FastifyInstance } from 'fastify'; +import { buildApp } from '../app'; + +// ─── Test client constants ──────────────────────────────────────────────────── + +const CLIENT_ID = '00000000-0000-0000-0000-0000000000aa'; +const CLIENT_SECRET = 'test-secret'; +const PROJECT_ID = 'test-project'; +const ORG_ID = 'test-org'; + +const AUTH = { + 'openpanel-client-id': CLIENT_ID, + 'openpanel-client-secret': CLIENT_SECRET, + 'user-agent': 'Mozilla/5.0 (Macintosh) Chrome/120.0.0.0', + 'content-type': 'application/json', +}; + +const WRITE_CLIENT = { + id: CLIENT_ID, + type: ClientType.write, + projectId: PROJECT_ID, + organizationId: ORG_ID, + secret: 'hashed-secret', + name: 'Batch Test Client', + cors: ['*'], + description: '', + ignoreCorsAndSecret: true, + createdAt: new Date(), + updatedAt: new Date(), + project: { + id: PROJECT_ID, + organizationId: ORG_ID, + cors: ['*'], + filters: [], + allowUnsafeRevenueTracking: true, + }, +}; + +// ─── Lifecycle ──────────────────────────────────────────────────────────────── + +let app: FastifyInstance; + +beforeAll(async () => { + vi.mocked(getClientByIdCached).mockResolvedValue(WRITE_CLIENT as any); + app = await buildApp({ testing: true }); + await app.ready(); +}, 30_000); + +afterAll(async () => { + await app.close(); +}, 10_000); + +beforeEach(() => { + queueAdd.mockClear(); + upsertProfileMock.mockClear(); +}); + +// ─── Helpers ────────────────────────────────────────────────────────────────── + +function postBatch(body: unknown, headers: Record = AUTH) { + return app.inject({ + method: 'POST', + url: '/track/batch', + headers, + payload: body as any, + }); +} + +const validTrack = (name = 'page_view') => ({ + type: 'track' as const, + payload: { name, properties: { __path: '/home' } }, +}); + +const validIdentify = (profileId = 'user-1') => ({ + type: 'identify' as const, + payload: { profileId, email: 'a@b.com' }, +}); + +// ─── Tests 
──────────────────────────────────────────────────────────────────── + +describe('POST /track/batch — auth & envelope', () => { + it('returns 401 without client-id', async () => { + const res = await postBatch({ events: [validTrack()] }, { + 'content-type': 'application/json', + }); + expect(res.statusCode).toBe(401); + }); + + it('returns 400 on empty events array', async () => { + const res = await postBatch({ events: [] }); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 on missing events field', async () => { + const res = await postBatch({}); + expect(res.statusCode).toBe(400); + }); + + it('returns 400 when array exceeds the per-request cap', async () => { + const events = Array.from({ length: 2001 }, () => validTrack()); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(400); + }); +}); + +describe('POST /track/batch — happy path', () => { + it('accepts a single track event and queues it', async () => { + const res = await postBatch({ events: [validTrack()] }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('accepts a mixed batch (track + identify) and dispatches each', async () => { + const res = await postBatch({ + events: [validTrack('signup'), validIdentify('alice'), validTrack('purchase')], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 3, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); // two `track` events + expect(upsertProfileMock).toHaveBeenCalledTimes(1); // one `identify` + }); + + it('treats each event as if sent one by one (per-event queue add)', async () => { + const events = Array.from({ length: 5 }, (_, i) => validTrack(`event_${i}`)); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 5, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(5); + }); +}); + 
+describe('POST /track/batch — per-item validation', () => { + it('rejects bad rows by index without failing the batch', async () => { + const res = await postBatch({ + events: [ + validTrack('good_1'), + { type: 'track', payload: { name: '' } }, // empty name → invalid + validTrack('good_2'), + { type: 'wrong-type', payload: {} }, // unknown discriminator + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(2); + expect(body.rejected).toHaveLength(2); + expect(body.rejected.map((r: { index: number }) => r.index).sort()).toEqual([1, 3]); + expect(body.rejected.every((r: { reason: string }) => r.reason === 'validation')).toBe(true); + expect(queueAdd).toHaveBeenCalledTimes(2); + }); + + it('rejects alias as per-item validation (does not 400 the whole batch)', async () => { + const res = await postBatch({ + events: [ + validTrack(), + { type: 'alias', payload: { profileId: 'user-1', alias: 'u1' } }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/alias/i); + }); + + it('populates sessionId for events with __deviceId override', async () => { + // When a client supplies __deviceId, the API still resolves a + // sessionId — first via the live-session Redis lookup, then a + // deterministic 30-min bucket keyed on the event's __timestamp. + // Both deviceId and sessionId reach the queue. 
+ const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_device_override', + properties: { + __deviceId: 'mobile-device-abc', + __path: '/home', + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 1, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(1); + const queuedJob = queueAdd.mock.calls[0]?.[0]; + expect(queuedJob.data.deviceId).toBe('mobile-device-abc'); + expect(queuedJob.data.sessionId).toBeTruthy(); + expect(queuedJob.data.sessionId.length).toBeGreaterThan(0); + }); + + it('buckets historical events by __timestamp, not request time', async () => { + // Two events on the same device, 1h apart in __timestamp. They + // should land in different deterministic 30-min buckets and thus + // get different sessionIds, even though they arrive in the same + // request. Anchored to "now" so the events stay inside the 5-day + // acceptance window when the test runs. + const baseMs = Date.now() - 2 * 60 * 60 * 1000; // 2h ago + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_bucket_a', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_bucket_b', + properties: { + __deviceId: 'shared-device', + __timestamp: new Date(baseMs + 60 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBeTruthy(); + expect(sessionIdB).toBeTruthy(); + expect(sessionIdA).not.toBe(sessionIdB); + }); + + it('shares sessionId across events in the same 30-min bucket', async () => { + // Two events on the same device, 5 min apart inside the 
same + // wall-clock 30-min bucket. They should share a sessionId. Anchor + // to the bucket that closed ~1 hour ago so both timestamps are in + // the past (avoiding the future-timestamp guard) and well within + // the 5-day acceptance window. + const WINDOW_MS = 30 * 60 * 1000; + const oneHourAgoBucket = + Math.floor((Date.now() - 60 * 60 * 1000) / WINDOW_MS) * WINDOW_MS; + const baseMs = oneHourAgoBucket + 60_000; // 1 min past bucket start (past the grace) + const res = await postBatch({ + events: [ + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_a', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs).toISOString(), + }, + }, + }, + { + type: 'track' as const, + payload: { + name: 'probe_same_bucket_b', + properties: { + __deviceId: 'same-bucket-device', + __timestamp: new Date(baseMs + 5 * 60 * 1000).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + expect(res.json()).toEqual({ accepted: 2, rejected: [] }); + expect(queueAdd).toHaveBeenCalledTimes(2); + const sessionIdA = queueAdd.mock.calls[0]?.[0].data.sessionId; + const sessionIdB = queueAdd.mock.calls[1]?.[0].data.sessionId; + expect(sessionIdA).toBe(sessionIdB); + }); + + it('rejects events with __timestamp older than 5 days', async () => { + // Older events should be rejected per-row with a clear reason. 
+ const sixDaysAgo = Date.now() - 6 * 24 * 60 * 60 * 1000; + const fourDaysAgo = Date.now() - 4 * 24 * 60 * 60 * 1000; + const res = await postBatch({ + events: [ + // valid (4 days old, within window) + { + type: 'track' as const, + payload: { + name: 'probe_within_window', + properties: { + __deviceId: 'd-1', + __timestamp: new Date(fourDaysAgo).toISOString(), + }, + }, + }, + // too old (6 days) + { + type: 'track' as const, + payload: { + name: 'probe_too_old', + properties: { + __deviceId: 'd-2', + __timestamp: new Date(sixDaysAgo).toISOString(), + }, + }, + }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(1); + expect(body.rejected).toHaveLength(1); + expect(body.rejected[0]).toMatchObject({ + index: 1, + reason: 'validation', + }); + expect(body.rejected[0].error).toMatch(/5 days/i); + expect(queueAdd).toHaveBeenCalledTimes(1); + }); + + it('returns 202 with accepted=0 when every event fails validation', async () => { + const res = await postBatch({ + events: [ + { type: 'track', payload: { name: '' } }, + { type: 'track', payload: {} }, + { type: 'identify', payload: {} }, + ], + }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(0); + expect(body.rejected).toHaveLength(3); + expect(queueAdd).not.toHaveBeenCalled(); + }); + + // Regression: per-event processing is chunked (BATCH_CONCURRENCY = 50). + // A 200-event batch spans 4 chunks. Verifies that rejected indices land in + // the right positions across chunk boundaries — including the very first + // event in chunk 1, the last event in chunk 2, and one in chunk 4 — which + // would catch off-by-one slicing or out-of-order result accumulation. + it('preserves per-index results across chunk boundaries', async () => { + const SIZE = 200; + const badIndices = new Set([0, 50, 99, 100, 149, 199]); + const events = Array.from({ length: SIZE }, (_, i) => + badIndices.has(i) + ? 
{ type: 'track', payload: { name: '' } } // invalid + : validTrack(`chunked_${i}`), + ); + const res = await postBatch({ events }); + expect(res.statusCode).toBe(202); + const body = res.json(); + expect(body.accepted).toBe(SIZE - badIndices.size); + expect(body.rejected).toHaveLength(badIndices.size); + const rejectedIndices = new Set( + body.rejected.map((r: { index: number }) => r.index), + ); + expect(rejectedIndices).toEqual(badIndices); + expect(queueAdd).toHaveBeenCalledTimes(SIZE - badIndices.size); + }); +}); diff --git a/apps/api/src/routes/track.router.ts b/apps/api/src/routes/track.router.ts index 07afbcdc2..9d1d73b5a 100644 --- a/apps/api/src/routes/track.router.ts +++ b/apps/api/src/routes/track.router.ts @@ -1,25 +1,40 @@ -import { zTrackHandlerPayload } from '@openpanel/validation'; +import { + TRACK_BATCH_MAX_EVENTS, + zTrackBatchBody, + zTrackHandlerPayload, +} from '@openpanel/validation'; import type { FastifyPluginAsyncZodOpenApi } from 'fastify-zod-openapi'; import { z } from 'zod'; -import { fetchDeviceId, handler } from '@/controllers/track.controller'; +import { + batchHandler, + fetchDeviceId, + handler, +} from '@/controllers/track.controller'; import { clientHook } from '@/hooks/client.hook'; import { duplicateHook } from '@/hooks/duplicate.hook'; import { isBotHook } from '@/hooks/is-bot.hook'; +// Per-route body limit for /track/batch: 10 MB uncompressed, matching the +// stated public contract ("up to 2000 events and 10 MB per request"). 
+const TRACK_BATCH_BODY_LIMIT_BYTES = 10 * 1024 * 1024; + const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { - fastify.addHook('preValidation', duplicateHook); fastify.addHook('preHandler', clientHook); fastify.addHook('preHandler', isBotHook); await fastify.route({ method: 'POST', url: '/', + // The 100 ms body-hash dedup only runs on the single-event endpoint — + // applying it batch-wide would drop a whole 1000-event retry on hash + // collision, which is the opposite of what we want. + preValidation: duplicateHook, schema: { body: zTrackHandlerPayload.and( z.object({ clientId: z.string().optional(), clientSecret: z.string().optional(), - }) + }), ), tags: ['Track'], description: @@ -34,6 +49,30 @@ const trackRouter: FastifyPluginAsyncZodOpenApi = async (fastify) => { handler, }); + await fastify.route({ + method: 'POST', + url: '/batch', + bodyLimit: TRACK_BATCH_BODY_LIMIT_BYTES, + schema: { + body: zTrackBatchBody, + tags: ['Track'], + description: `We accept up to ${TRACK_BATCH_MAX_EVENTS} events and 10MB uncompressed per request. Events are part of the request body. Each event is dispatched through the same pipeline as POST /track. 
Per-event validation failures are returned in the rejected[] array — the whole batch does not fail on a single bad row.`, + response: { + 202: z.object({ + accepted: z.number().int().min(0), + rejected: z.array( + z.object({ + index: z.number().int().min(0), + reason: z.enum(['validation', 'internal']), + error: z.string(), + }), + ), + }), + }, + }, + handler: batchHandler, + }); + await fastify.route({ method: 'GET', url: '/device-id', diff --git a/apps/api/src/utils/ids.ts b/apps/api/src/utils/ids.ts index 3614da1b7..d579845fb 100644 --- a/apps/api/src/utils/ids.ts +++ b/apps/api/src/utils/ids.ts @@ -10,15 +10,31 @@ export async function getDeviceId({ ua, salts, overrideDeviceId, + eventMs, }: { projectId: string; ip: string; ua: string | undefined; salts: { current: string; previous: string }; overrideDeviceId?: string; -}) { + /** + * Wall-clock time of the event being processed. Used as the bucket input for + * the deterministic session_id fallback so historical/buffered events bucket + * into the window they actually happened in, not the moment the API + * received them. + */ + eventMs: number; +}): Promise { + // Client-supplied stable device id (mobile/server SDKs). We still need to + // resolve a sessionId — first try the live session lookup keyed on this + // exact deviceId, then fall back to the deterministic 30-min bucket. 
if (overrideDeviceId) { - return { deviceId: overrideDeviceId, sessionId: '' }; + return getInfoFromSession({ + projectId, + currentDeviceId: overrideDeviceId, + previousDeviceId: overrideDeviceId, + eventMs, + }); } if (!ua) { @@ -42,6 +58,7 @@ export async function getDeviceId({ projectId, currentDeviceId, previousDeviceId, + eventMs, }); } @@ -54,10 +71,12 @@ async function getInfoFromSession({ projectId, currentDeviceId, previousDeviceId, + eventMs, }: { projectId: string; currentDeviceId: string; previousDeviceId: string; + eventMs: number; }): Promise { try { const multi = getRedisCache().multi(); @@ -101,6 +120,7 @@ async function getInfoFromSession({ sessionId: getSessionId({ projectId, deviceId: currentDeviceId, + eventMs, graceMs: 5 * 1000, windowMs: 1000 * 60 * 30, }), diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index 5035e3186..ee1282de4 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -1,6 +1,6 @@ import { getTime, isSameDomain, parsePath } from '@openpanel/common'; import { getReferrerWithQuery, parseReferrer } from '@openpanel/common/server'; -import type { IServiceCreateEventPayload, IServiceEvent } from '@openpanel/db'; +import type { IServiceCreateEventPayload } from '@openpanel/db'; import { checkNotificationRulesForEvent, createEvent, @@ -10,14 +10,44 @@ import { } from '@openpanel/db'; import type { ILogger } from '@openpanel/logger'; import type { EventsQueuePayloadIncomingEvent } from '@openpanel/queue'; +import { getLock } from '@openpanel/redis'; import { anyPass, isEmpty, isNil, mergeDeepRight, omit, reject } from 'ramda'; import { logger as baseLogger } from '@/utils/logger'; import { createSessionEndJob, extendSessionEndJob, getActiveSessionEndJob, + SESSION_TIMEOUT, } from '@/utils/session-handler'; +/** + * Acquire a Redis-backed lock that prevents duplicate session_start rows for + * the same `(projectId, 
sessionId)`. Returns true if THIS caller should emit + * the session_start row; false if another worker (or earlier event in the + * same batch) already claimed it. + * + * TTL matches SESSION_TIMEOUT — a session can't extend beyond 30 min of + * inactivity in the live mechanism, and the deterministic bucket is exactly + * 30 min wide. By the time the lock TTL elapses, the session itself has + * rolled. + * + * Keyed on sessionId (not deviceId) so historical events from the same + * device but different 30-min buckets each get their own session_start. + */ +async function acquireSessionStartLock( + projectId: string, + sessionId: string, +): Promise<boolean> { + if (!sessionId) { + return false; + } + return getLock( + `session_start:${projectId}:${sessionId}`, + '1', + SESSION_TIMEOUT, + ); +} + const GLOBAL_PROPERTIES = ['__path', '__referrer', '__timestamp', '__revenue']; // This function will merge two objects. @@ -113,7 +143,14 @@ export async function incomingEvent( // this will get the profileId from the alias table if it exists const profileId = body.profileId ? String(body.profileId) : ''; const createdAt = new Date(body.timestamp); - const isTimestampFromThePast = body.isTimestampFromThePast; + // "Live" = the event is recent enough that it could plausibly belong to + // an active in-memory session. We use the same window as SESSION_TIMEOUT + // (30 min) so historical events never push the live sessionEnd job + // forward or create new live sessions. Server-side events are always + // treated as non-live (they get session enrichment from sessionBuffer + // when a profile is supplied; otherwise they keep the API-computed id). 
+ const isLiveEvent = + !uaInfo.isServer && Date.now() - createdAt.getTime() <= SESSION_TIMEOUT; const url = getProperty('__path'); const { path, hash, query, origin } = parsePath(url); const referrer = isSameDomain(getProperty('__referrer'), url) @@ -133,6 +170,7 @@ export async function incomingEvent( ...properties, __hash: hash, __query: query, + __syncedAt: new Date().toISOString(), }), groups: body.groups ?? [], createdAt, @@ -162,40 +200,42 @@ export async function incomingEvent( : undefined, }; - // if timestamp is from the past we dont want to create a new session - if (uaInfo.isServer || isTimestampFromThePast) { - const session = - profileId && !isTimestampFromThePast - ? await sessionBuffer.getExistingSession({ - profileId, - projectId, - }) - : null; + // Server-side events: when a profileId is supplied, enrich from the + // user's most recent browser session (deviceId, sessionId, geo, UA, path, + // referrer). Without a session, fall back to the API-computed identity. + // Server events never create or extend live sessions in Redis. + if (uaInfo.isServer) { + const enrichment = profileId + ? await sessionBuffer.getExistingSession({ profileId, projectId }) + : null; - const payload = { - ...baseEvent, - deviceId: session?.device_id ?? '', - sessionId: session?.id ?? '', - referrer: session?.referrer ?? undefined, - referrerName: session?.referrer_name ?? undefined, - referrerType: session?.referrer_type ?? undefined, - path: session?.exit_path ?? baseEvent.path, - origin: session?.exit_origin ?? baseEvent.origin, - os: session?.os ?? baseEvent.os, - osVersion: session?.os_version ?? baseEvent.osVersion, - browserVersion: session?.browser_version ?? baseEvent.browserVersion, - browser: session?.browser ?? baseEvent.browser, - device: session?.device ?? baseEvent.device, - brand: session?.brand ?? baseEvent.brand, - model: session?.model ?? baseEvent.model, - city: session?.city ?? baseEvent.city, - country: session?.country ?? 
baseEvent.country, - region: session?.region ?? baseEvent.region, - longitude: session?.longitude ?? baseEvent.longitude, - latitude: session?.latitude ?? baseEvent.latitude, - }; + const payload: IServiceCreateEventPayload = enrichment + ? { + ...baseEvent, + deviceId: enrichment.device_id, + sessionId: enrichment.id, + referrer: enrichment.referrer ?? undefined, + referrerName: enrichment.referrer_name ?? undefined, + referrerType: enrichment.referrer_type ?? undefined, + path: enrichment.exit_path ?? baseEvent.path, + origin: enrichment.exit_origin ?? baseEvent.origin, + os: enrichment.os ?? baseEvent.os, + osVersion: enrichment.os_version ?? baseEvent.osVersion, + browserVersion: + enrichment.browser_version ?? baseEvent.browserVersion, + browser: enrichment.browser ?? baseEvent.browser, + device: enrichment.device ?? baseEvent.device, + brand: enrichment.brand ?? baseEvent.brand, + model: enrichment.model ?? baseEvent.model, + city: enrichment.city ?? baseEvent.city, + country: enrichment.country ?? baseEvent.country, + region: enrichment.region ?? baseEvent.region, + longitude: enrichment.longitude ?? baseEvent.longitude, + latitude: enrichment.latitude ?? baseEvent.latitude, + } + : baseEvent; - return createEventAndNotify(payload as IServiceEvent, logger, projectId); + return createEventAndNotify(payload, logger, projectId); } const activeSessionEndJob = await getActiveSessionEndJob( @@ -219,6 +259,34 @@ export async function incomingEvent( return null; } + // Historical (buffered) events: the API has already computed a + // deterministic sessionId for them. Write the event and emit one + // session_start per bucket (Redis lock dedups across batches and + // workers). Do NOT touch live session state — historical events + // must not extend the user's current session or schedule a 30-min + // sessionEnd timer. 
+ if (!isLiveEvent) { + if (await acquireSessionStartLock(projectId, sessionId)) { + await createEventAndNotify( + { + ...payload, + name: 'session_start', + createdAt: new Date(getTime(payload.createdAt) - 100), + }, + logger, + projectId, + ).catch((error) => { + logger.error( + { err: error, event: payload }, + 'Error creating historical session start event', + ); + // Don't throw — historical session_start is best-effort. The + // event itself should still land. + }); + } + return createEventAndNotify(payload, logger, projectId); + } + if (activeSessionEndJob) { await extendSessionEndJob({ projectId, @@ -227,7 +295,11 @@ export async function incomingEvent( }).catch((error) => { logger.warn({ err: error }, 'Failed to extend session end job'); }); - } else { + } else if (await acquireSessionStartLock(projectId, sessionId)) { + // Lock prevents the previously-observed batch race: when N events for + // the same device land in the API in parallel, all see no Redis + // sessionEnd key yet, all queue with session: undefined, and would + // each try to emit session_start. The lock collapses them to one. await createEventAndNotify( { ...payload, @@ -251,6 +323,17 @@ export async function incomingEvent( ); throw error; }); + } else { + // Another worker (or earlier event in this batch) claimed the + // session_start. Still ensure a sessionEnd is scheduled so the + // session closes cleanly. createSessionEndJob is idempotent on + // jobId, so this is a no-op when the job already exists. 
+ await createSessionEndJob({ payload }).catch((error) => { + logger.warn( + { err: error, event: payload }, + 'Failed to schedule session end job (lock not acquired)', + ); + }); } return createEventAndNotify(payload, logger, projectId); diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 30547d7dc..345b57fd7 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -26,6 +26,16 @@ vi.mock('@openpanel/db', async () => { }, }; }); +// Mock the session_start dedup lock so tests don't need a live Redis. By +// default the lock is acquired (true) so existing tests' session_start +// expectations still hold; individual tests can override per-call. +vi.mock('@openpanel/redis', async () => { + const actual = await vi.importActual('@openpanel/redis'); + return { + ...actual, + getLock: vi.fn().mockResolvedValue(true), + }; +}); // 30 minutes const SESSION_TIMEOUT = 30 * 60 * 1000; @@ -79,7 +89,6 @@ describe('incomingEvent', () => { event: { name: 'test_event', timestamp: timestamp.toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -174,7 +183,6 @@ describe('incomingEvent', () => { name: 'test_event', timestamp: timestamp.toISOString(), properties: { __path: 'https://example.com/test' }, - isTimestampFromThePast: false, }, headers: { 'request-id': '123', @@ -256,7 +264,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -361,7 +368,6 @@ describe('incomingEvent', () => { timestamp: timestamp.toISOString(), properties: { custom_property: 'test_value' }, profileId: 'profile-123', - isTimestampFromThePast: false, }, headers: { 'user-agent': 'OpenPanel Server/1.0', @@ -379,6 +385,9 @@ 
describe('incomingEvent', () => { expect((createEvent as Mock).mock.calls[0]![0]).toStrictEqual({ name: 'server_event', + // Server event with profileId but no existing session: keep the + // API-computed identity instead of blanking deviceId/sessionId. + // The fixture sends '' for both so that's what we expect here. deviceId: '', sessionId: '', profileId: 'profile-123', @@ -405,9 +414,12 @@ describe('incomingEvent', () => { duration: 0, path: '', origin: '', - referrer: undefined, - referrerName: undefined, - referrerType: undefined, + // baseEvent fields fall through uniformly when there's no + // session enrichment available — empty strings for all referrer + // fields rather than the previous mix of undefined/''. + referrer: '', + referrerName: '', + referrerType: '', sdkName: 'server', sdkVersion: '1.0.0', groups: [], @@ -434,7 +446,6 @@ describe('incomingEvent', () => { event: { name: eventName, timestamp: new Date().toISOString(), - isTimestampFromThePast: false, properties: { __path: 'https://example.com/test' }, }, uaInfo, @@ -485,4 +496,102 @@ describe('incomingEvent', () => { // events extend the existing one via changeDelay. expect(spySessionsQueueAdd).toHaveBeenCalledTimes(1); }); + + it('does not emit duplicate session_start when lock is held', async () => { + const { getLock } = await import('@openpanel/redis'); + // Simulate "another worker already claimed session_start" by failing + // the lock acquisition. Live event still fires; sessionEnd job is + // still scheduled (it's idempotent on jobId). + vi.mocked(getLock).mockResolvedValueOnce(false); + // No active session-end job exists yet — force getJob to undefined so the + // worker falls into the "no active session" branch where the lock check + // matters. (vi.clearAllMocks resets call history but keeps implementations + // set via mockResolvedValue in previous tests.) 
+ vi.spyOn(sessionsQueue, 'getJob').mockResolvedValue(undefined); + const spySessionsQueueAdd = vi + .spyOn(sessionsQueue, 'add') + .mockResolvedValue({} as Job); + + const timestamp = new Date(); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'live_event', + timestamp: timestamp.toISOString(), + properties: { __path: 'https://example.com/test' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 Chrome/91.0.4472.124', + 'openpanel-sdk-name': 'web', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId, + sessionId: newSessionId, + }; + (createEvent as Mock).mockReturnValue({}); + + await incomingEvent(jobData); + + // No session_start emission (lock not acquired) + const startCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'session_start', + ); + expect(startCalls).toHaveLength(0); + // Live event itself still gets created + const liveCalls = (createEvent as Mock).mock.calls.filter( + (call) => call[0]?.name === 'live_event', + ); + expect(liveCalls).toHaveLength(1); + // sessionEnd is still scheduled even when lock not acquired (idempotent) + expect(spySessionsQueueAdd).toHaveBeenCalled(); + }); + + it('historical event preserves API-computed deviceId/sessionId', async () => { + // Event with __timestamp older than SESSION_TIMEOUT (30 min). Worker + // should write it with the deviceId/sessionId the API computed, + // without scheduling sessionEnd (live state untouched). 
+ const oneHourAgo = new Date(Date.now() - 60 * 60 * 1000); + const jobData: EventsQueuePayloadIncomingEvent['payload'] = { + geo, + event: { + name: 'historical_event', + timestamp: oneHourAgo.toISOString(), + properties: { __path: 'https://example.com/replay' }, + }, + uaInfo, + headers: { + 'request-id': '123', + 'user-agent': + 'Mozilla/5.0 (iPhone; CPU iPhone OS 17_2 like Mac OS X) AppleWebKit/605.1.15', + 'openpanel-sdk-name': 'react-native', + 'openpanel-sdk-version': '1.0.0', + }, + projectId, + deviceId: 'mobile-device-xyz', + sessionId: 'deterministic-bucket-id', + }; + + (createEvent as Mock).mockReturnValue({}); + await incomingEvent(jobData); + + // Live state untouched: no sessionEnd job scheduled + expect(sessionsQueue.add).not.toHaveBeenCalled(); + // Two createEvent calls: one for the historical session_start (lock + // acquired by default in the redis mock), one for the event itself + expect((createEvent as Mock).mock.calls).toHaveLength(2); + const startCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'session_start', + ); + const eventCall = (createEvent as Mock).mock.calls.find( + (call) => call[0]?.name === 'historical_event', + ); + expect(startCall).toBeDefined(); + expect(eventCall).toBeDefined(); + expect(eventCall![0].deviceId).toBe('mobile-device-xyz'); + expect(eventCall![0].sessionId).toBe('deterministic-bucket-id'); + }); }); diff --git a/packages/queue/src/queues.ts b/packages/queue/src/queues.ts index bb7b8cb45..8858bb052 100644 --- a/packages/queue/src/queues.ts +++ b/packages/queue/src/queues.ts @@ -33,7 +33,6 @@ export interface EventsQueuePayloadIncomingEvent { projectId: string; event: ITrackPayload & { timestamp: string | number; - isTimestampFromThePast: boolean; }; uaInfo: | { diff --git a/packages/validation/src/track.validation.ts b/packages/validation/src/track.validation.ts index eae1f055e..ec180516c 100644 --- a/packages/validation/src/track.validation.ts +++ 
b/packages/validation/src/track.validation.ts @@ -220,6 +220,19 @@ export const zTrackHandlerPayload = z.discriminatedUnion('type', [ .meta({ title: 'Assign Group' }), ]) satisfies z.ZodType; +// Batch ingestion: envelope is validated strictly (array length only); per-event +// validation runs inside the controller via `safeParse(zTrackHandlerPayload)` so +// invalid items can be rejected per-index without failing the whole batch. +// +// Per-request caps: up to 2000 events and 10 MB uncompressed body. +export const TRACK_BATCH_MAX_EVENTS = 2000; + +export const zTrackBatchBody = z.object({ + events: z.array(z.unknown()).min(1).max(TRACK_BATCH_MAX_EVENTS), +}); + +export type ITrackBatchBody = z.infer<typeof zTrackBatchBody>; + // Deprecated types for beta version of the SDKs export interface DeprecatedOpenpanelEventOptions { From 2ef16523e1034c0875f9d586522637badf0044cc Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Thu, 28 May 2026 05:19:58 +0530 Subject: [PATCH 2/3] fix: add __syncedAt to worker test assertions Tests use toStrictEqual, so the new __syncedAt property in event properties needs to be included in assertions. Uses expect.any(String) since the exact ISO timestamp varies per run. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/worker/src/jobs/events.incoming-events.test.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/worker/src/jobs/events.incoming-events.test.ts b/apps/worker/src/jobs/events.incoming-events.test.ts index 345b57fd7..65b9b0b55 100644 --- a/apps/worker/src/jobs/events.incoming-events.test.ts +++ b/apps/worker/src/jobs/events.incoming-events.test.ts @@ -115,6 +115,7 @@ describe('incomingEvent', () => { properties: { __hash: undefined, __query: undefined, + __syncedAt: expect.any(String), }, createdAt: timestamp, country: 'US', @@ -223,6 +224,7 @@ describe('incomingEvent', () => { properties: { __hash: undefined, __query: undefined, + __syncedAt: expect.any(String), }, createdAt: timestamp, country: 'US', @@ -330,6 +332,7 @@ describe('incomingEvent', () => { custom_property: 'test_value', __hash: undefined, __query: undefined, + __syncedAt: expect.any(String), }, createdAt: timestamp, country: 'CA', @@ -396,6 +399,7 @@ describe('incomingEvent', () => { custom_property: 'test_value', __hash: undefined, __query: undefined, + __syncedAt: expect.any(String), }, createdAt: timestamp, country: 'US', From 6149a204b63c6cc535ed32087fa37e011f721fb3 Mon Sep 17 00:00:00 2001 From: Mayank Raj Date: Thu, 28 May 2026 05:24:02 +0530 Subject: [PATCH 3/3] fix: use baseEvent fallback for referrer fields in server event enrichment When session enrichment has null referrer fields, fall back to baseEvent values instead of undefined. Consistent with all other enrichment fields (path, os, browser, geo, etc.). 
Co-Authored-By: Claude Opus 4.7 (1M context) --- apps/worker/src/jobs/events.incoming-event.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/apps/worker/src/jobs/events.incoming-event.ts b/apps/worker/src/jobs/events.incoming-event.ts index ee1282de4..9922fd43d 100644 --- a/apps/worker/src/jobs/events.incoming-event.ts +++ b/apps/worker/src/jobs/events.incoming-event.ts @@ -214,9 +214,9 @@ export async function incomingEvent( ...baseEvent, deviceId: enrichment.device_id, sessionId: enrichment.id, - referrer: enrichment.referrer ?? undefined, - referrerName: enrichment.referrer_name ?? undefined, - referrerType: enrichment.referrer_type ?? undefined, + referrer: enrichment.referrer ?? baseEvent.referrer, + referrerName: enrichment.referrer_name ?? baseEvent.referrerName, + referrerType: enrichment.referrer_type ?? baseEvent.referrerType, path: enrichment.exit_path ?? baseEvent.path, origin: enrichment.exit_origin ?? baseEvent.origin, os: enrichment.os ?? baseEvent.os,