Skip to content

feat: bulk event ingestion (POST /track/batch) for offline-first SDKs#377

Open
mraj602-tohands wants to merge 3 commits into
Openpanel-dev:mainfrom
Tohands-Private-Limited:feat/batch-v2-upstream
Open

feat: bulk event ingestion (POST /track/batch) for offline-first SDKs#377
mraj602-tohands wants to merge 3 commits into
Openpanel-dev:mainfrom
Tohands-Private-Limited:feat/batch-v2-upstream

Conversation

@mraj602-tohands
Copy link
Copy Markdown

@mraj602-tohands mraj602-tohands commented May 27, 2026

Supersedes #374. Since that PR was opened, upstream merged significant changes to the buffer subsystem, Kafka client, and ClickHouse round-robin layer that conflicted with the batch v1 branch. Rather than rebase the increasingly divergent branch, we reimplemented the batch feature on a clean branch from current main. The batch logic is functionally identical to #374; the only addition is __syncedAt (see below). Closing #374 in favour of this PR.

Closes #373.

Summary

Adds POST /track/batch — accepts up to 2000 events / 10 MB per request and back-dates each event to its own __timestamp (up to 5 days in the past). Built so offline-capable SDKs (mobile, IoT, edge) can drain a local buffer in one round-trip and have those events land in the right historical session, not all stamped with arrival time.

What's new vs #374

Change Detail
Rebased on current main Clean branch on top of the buffer-perf, Kafka client, and ClickHouse round-robin changes that landed since #374 was opened. No merge conflicts.
__syncedAt property Every event processed by the worker now gets a server-stamped __syncedAt (ISO 8601) in its properties, recording when the worker actually handled the event. For batch ingestion this is distinct from createdAt (the client's __timestamp), making it easy to diagnose ingestion latency and spot retried events.
No other functional changes The batch endpoint, deterministic session bucketing, live/historical split, Redis session_start lock, 5-day rejection, and concurrency bounding are all identical to #374.

Architecture

Per-row rejection instead of whole-batch failure

A 2000-event batch where one row is malformed shouldn't fail the other 1999. The controller safeParses each row through zTrackHandlerPayload; failures become { index, reason: 'validation' | 'internal', error } entries in rejected[]. Caller gets 202 with { accepted, rejected[] } and can re-send only the bad indices.

alias is rejected per-row with reason: 'validation', matching the existing single-event /track 400 for alias. Same behaviour, surfaced consistently in both shapes.

Salts + geo fetched once per request

buildSharedRequestContext does the salts query and request-IP geo lookup once. buildEventContext then uses those for every event. Only events that override __ip trigger a second geo lookup. For a typical 2000-event batch this is the difference between 1 PG round-trip + 1 geo lookup vs. 4000 of each.

2000 events / 10 MB caps

Mixpanel /import uses the same numbers. Going higher means a single bad request can OOM a Fastify worker. Going lower forces SDKs to chunk excessively for the IoT use case (a device with a week of 1-min readings has 10080 events; 2000-per-batch = 5 round-trips, which is fine; 1000-per-batch = 11 round-trips, noticeably worse on a flaky cell network).

The 10 MB body cap is enforced via Fastify's existing limit; no new mechanism needed.

Deterministic session_id from __timestamp

This is the load-bearing change for the IoT use case. Before this PR, when a client sends __deviceId (override path), the server returned sessionId: '' and let the worker re-derive a session at processing time. The worker uses Date.now() for that derivation. Result: a 6-hour-old buffered event lands in whatever session is currently active for that device, not the session it belonged to when captured.

After this PR:

  • getDeviceId accepts an eventMs parameter (the event's own timestamp, not wall-clock now)
  • The deterministic 30-min session bucket is keyed on eventMs
  • The __deviceId override path no longer short-circuits to empty sessionId
  • Every event leaves the API with both deviceId and sessionId populated

A batch covering 5 days of buffered readings produces 240 distinct sessions (one per 30-min bucket per device), each with session_start back-dated to the actual capture time. Dashboards show the activity in the right hour of the right day.

Hard 5-day floor

Same as Mixpanel's /import contract. The deterministic session bucket has finite memory in Redis (TTL = 30 min), and emitting session_start rows for events older than the analytics tool will reasonably backfill is wasteful. 5 days is the longest plausible offline window for the use cases this is designed to support.

In single-event /track this returns 400. In batch it surfaces as a per-row { reason: 'validation', error: 'event timestamp older than 5 days' }.

Three-way worker dispatch: server / historical / live

Before: the worker had two paths — server events (enrich from sessionBuffer if a profileId is supplied) and everything else (extend live session or emit session_start + schedule sessionEnd).

After: a third "historical" path. An event is historical when Date.now() - createdAt > SESSION_TIMEOUT (30 min) and not server-side. Historical events:

  • Write the event row with the API-computed deviceId / sessionId
  • Do not call extendSessionEndJob (would push the live timer for whatever current session exists)
  • Do not call createSessionEndJob (would schedule a 30-min close timer for a 6-hour-old session — meaningless)
  • Do emit one session_start per bucket via the lock

This keeps live session state and historical backfill in completely separate code paths. A device that's actively online while a batch of historical events is being processed for it doesn't get its current session disturbed.

Redis lock for session_start dedup

Two scenarios where duplicate session_start rows leak in without a lock:

  1. Live race: N events for the same (projectId, deviceId) arrive at the API in parallel. All see no Redis sessionEnd key. All queue with no active session. The worker — even with sequential per-device processing — has all N events trying to emit session_start before any of them has finished writing.

  2. Historical batch: a single batch contains 50 events spanning the same 30-min bucket. Each one independently tries to emit session_start.

The lock key is session_start:{projectId}:{sessionId}, TTL = SESSION_TIMEOUT (30 min). First caller acquires, emits, others skip. The lock-not-acquired branch still calls createSessionEndJob (idempotent on jobId) so the session still closes cleanly.

Keyed on sessionId, not deviceId, so two events from the same device but different historical buckets each get their own session_start.

__syncedAt — server-stamped sync time

Every event processed by the worker now gets __syncedAt: new Date().toISOString() in its properties. This is set at worker processing time (not API receipt time), so for batch ingestion the delta between createdAt (the client's __timestamp) and __syncedAt includes both network latency and queue wait time.

Use cases:

  • Diagnose batch ingestion pipeline latency
  • Spot retried events (same createdAt, different __syncedAt)
  • Distinguish "when did it happen" from "when did we process it"

Stored as a regular property (not a ClickHouse column) — no migration needed.

Bounded batch concurrency

Events within a batch are processed in chunks of 50 (BATCH_CONCURRENCY). Each event hits Redis (session lookup + queue add) and may trigger a geo lookup if __ip is overridden. Unbounded Promise.all over 2000 events can spike Redis pool usage and the geo provider's rate budget. 50 keeps the pipeline saturated without thundering-herd behaviour.

What this PR does NOT do

  • No insert_id / dedup. Two identical batches sent twice produce two copies of every event. Worker session_start is deduped via the Redis lock, but events themselves are not. This is deliberate so the dedup design can be discussed in isolation. Listed as out-of-scope in Bulk event ingestion (POST /track/batch) for offline-first SDKs — IoT, mobile, edge #373.
  • No SDK changes. API-only. SDKs continue to send single events; the batch endpoint is for new offline-buffer integrations and migration-time replays.
  • No request compression. Fastify can add gzip handling separately if it becomes a real bottleneck.

Production verification

Deployed against a self-hosted instance and ran ~80 assertions across 24 scenarios. All passing with ClickHouse cross-checks for session_id distribution, session_start counts, and bucket boundaries:

  • IoT 7-day backlog (168 events, 1/hour): 119 accepted, 119 distinct buckets, 119 session_starts at the historical timestamps. 49 rejected by the 5-day floor.
  • Cross-bucket boundary precision (events 30s apart spanning a bucket edge): 2 sessions, 2 session_starts, correct split.
  • Multi-device household (3 devices, shared IP): 3 separate sessions, no cross-talk.
  • Concurrent batches same device (3 parallel HTTP requests, 6 events total): 1 session_start total — Redis lock dedups across requests/workers.
  • Same-millisecond ties (20 events with identical timestamp): 20 events, 1 session, 1 session_start.
  • 5-day boundary precision (4d59m vs 5d1m): one accepted, one rejected.
  • Future timestamps: ≤1min preserved, >1min silently clamped to now (matches existing /track).
  • Per-event __ip override: distinct geo per event in CH within one shared session.

Test plan

  • pnpm vitest run apps/api/src/routes/track-batch.router.test.ts — 14 test cases
  • pnpm vitest run apps/worker/src/jobs/events.incoming-events.test.ts — existing + 2 new tests
  • Single-event /track regression — no behavioural change
  • Manual prod verification on self-hosted instance (24 scenarios above)
  • Verify __syncedAt appears in event properties in dashboard
  • Reviewer acceptance

🤖 Generated with Claude Code

Summary by CodeRabbit

  • New Features

    • Added batch event ingestion (up to 2,000 events) with per-item accepted/rejected results and deterministic session bucketing based on event timestamp.
    • Batch endpoint returns 202 with per-index validation/internal rejection reasons and uses bounded concurrency for processing.
  • Bug Fixes

    • Stronger timestamp validation: client timestamps older than 5 days are rejected.
    • Improved session_start deduplication and clearer live vs. historical event handling.
  • Tests

    • Added integration and worker tests covering batch ingestion, per-item validation, session behavior, and session_start locking.

Review Change Stack

Reimplements the batch ingestion feature from PR Openpanel-dev#374 on top of current
upstream/main (post buffer-perf, kafka client, ClickHouse round-robin).
Adds __syncedAt property to all worker-processed events.

See PR description for full architectural details.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
@coderabbitai
Copy link
Copy Markdown
Contributor

coderabbitai Bot commented May 27, 2026

No actionable comments were generated in the recent review. 🎉

ℹ️ Recent review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 6344404b-e8cf-4c1f-9f8d-6eefa67f88e9

📥 Commits

Reviewing files that changed from the base of the PR and between 2ef1652 and 6149a20.

📒 Files selected for processing (1)
  • apps/worker/src/jobs/events.incoming-event.ts
🚧 Files skipped from review as they are similar to previous changes (1)
  • apps/worker/src/jobs/events.incoming-event.ts

📝 Walkthrough

Walkthrough

This PR implements batch event ingestion (POST /track/batch) accepting up to 2000 timestamped events per request, enforces a 5-day historical acceptance window, derives session IDs deterministically from event timestamps rather than request time, and deduplicates session_start emission across workers using Redis locks.

Changes

Batch event ingestion with deterministic session bucketing

Layer / File(s) Summary
Batch validation schema and envelope contract
packages/validation/src/track.validation.ts
Define batch-ingestion envelope contract: TRACK_BATCH_MAX_EVENTS = 2000, zTrackBatchBody schema validating events array length, and inferred ITrackBatchBody type.
Timestamp validation and eventMs-based context threading
apps/api/src/controllers/track.controller.ts, apps/api/src/utils/ids.ts
Refactor getTimestamp to throw HttpError(400) for events older than 5 days (removing isTimestampFromThePast computation). Introduce buildSharedRequestContext and buildEventContext to fetch geo/salts once per request and reuse them per event. Thread eventMs parameter through getDeviceIdgetInfoFromSessiongetSessionId so deterministic session bucket uses event timestamp instead of request time.
Event controller and dispatch updates
apps/api/src/controllers/event.controller.ts, apps/api/src/controllers/track.controller.ts
Update postEvent to use only timestamp (not isTimestampFromThePast) and include eventMs in queued payload. Add dispatchEvent internal router to route validated payloads to handlers and update single-event handler to build context and return only deviceId/sessionId.
Batch handler implementation and POST /batch route
apps/api/src/controllers/track.controller.ts, apps/api/src/routes/track.router.ts
Implement batchHandler with BATCH_CONCURRENCY = 50, per-item validation via safeParse, per-event context + dispatch pipeline, and 202 response with per-index accepted/rejected plus reason ('validation' or 'internal'). Add POST /batch route with TRACK_BATCH_BODY_LIMIT_BYTES, zTrackBatchBody validation, and 202 schema. Move duplicateHook from router-level to route-level (single-event POST / only).
Queue payload type contract update
packages/queue/src/queues.ts
Remove isTimestampFromThePast: boolean from EventsQueuePayloadIncomingEvent.payload.event intersection type.
Worker session deduplication via Redis locks and live/historical branching
apps/worker/src/jobs/events.incoming-event.ts
Introduce acquireSessionStartLock to guard session_start emission per (projectId, sessionId) with TTL tied to SESSION_TIMEOUT. Add isLiveEvent classification (server-side and recent vs non-server-side or older than timeout). For server events with profileId, enrich from sessionBuffer. For historical events, optionally emit best-effort session_start guarded by lock (shifted createdAt), then create event. For live events without active sessionEnd, emit session_start only when lock acquired; when not acquired, schedule createSessionEndJob best-effort with warning. Add __syncedAt to stored event properties.
Batch endpoint integration tests and test harness
apps/api/src/routes/track-batch.router.test.ts
Establish test harness with hoisted mocks, Fastify app, and request helpers. Test envelope validation (401/400), happy path (202 with queue dispatch), per-item validation (index/reason/error mapping), sessionId deterministic bucketing (same 30-min bucket, different buckets across windows), 5-day rejection, all-invalid handling, and 200-event batch chunk-boundary regression (BATCH_CONCURRENCY=50).
Worker tests updates for lock behavior and event contract
apps/worker/src/jobs/events.incoming-events.test.ts
Add getLock mock (defaulting to lock-acquired, per-test overridable). Remove isTimestampFromThePast from all event payloads. Adjust referrer fields to empty strings. Add lock-not-acquired regression test (live event created, sessionEnd scheduled, session_start skipped) and historical-event test (API-computed IDs preserved, session_start emitted, sessionEnd not scheduled).

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes

Poem

A rabbit counts the buffered streams, 🐇
Five-day packets, batched-up dreams,
One eventMs to bind the past,
One lock to make the session last,
Hops, queues, and timestamps stitched at last.

🚥 Pre-merge checks | ✅ 4 | ❌ 1

❌ Failed checks (1 warning)

Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 25.00% which is insufficient. The required threshold is 80.00%. Write docstrings for the functions missing them to satisfy the coverage threshold.
✅ Passed checks (4 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The PR title clearly and concisely describes the main change: adding bulk event ingestion via POST /track/batch for offline-first SDKs, which is the primary objective.
Linked Issues check ✅ Passed The changes comprehensively implement all coding requirements from issue #373: batch endpoint accepting up to 2000 events, per-event timestamp-based session derivation, per-item validation with 202 response, 5-day window enforcement, same handlers as single-event /track, Redis-backed session_start deduplication, historical event isolation, and bounded concurrency.
Out of Scope Changes check ✅ Passed All changes are directly aligned with issue #373 objectives. No idempotent deduplication, compression, or SDK changes were introduced; scope matches the linked issue's stated boundaries.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing Touches
🧪 Generate unit tests (beta)
  • Create PR with unit tests

Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 ESLint

If the error stems from missing dependencies, add them to the package.json file. For unrecoverable errors (e.g., due to private dependencies), disable the tool in the CodeRabbit configuration.

ESLint skipped: no ESLint configuration detected in root package.json. To enable, add eslint to devDependencies.


Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link
Copy Markdown
Contributor

@coderabbitai coderabbitai Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (1)
apps/worker/src/jobs/events.incoming-events.test.ts (1)

553-596: 💤 Low value

Consider adding test coverage for historical events when lock is not acquired.

This test verifies historical event handling when the lock is acquired (session_start emitted). Consider adding a companion test where getLock returns false to verify that:

  1. session_start is skipped
  2. The event itself is still created
  3. sessionEnd is still not scheduled (historical path)

This would complete coverage of the historical event path.

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@apps/worker/src/jobs/events.incoming-events.test.ts` around lines 553 - 596,
Add a new test alongside the existing historical-event test that simulates the
lock NOT being acquired by mocking getLock to return false; call incomingEvent
with the same historical payload and assert that createEvent is still called for
the event (but there is no 'session_start' createEvent call), sessionsQueue.add
is not called (no sessionEnd scheduled), and that the event createEvent call
preserves deviceId/sessionId; reference the incomingEvent handler, getLock mock,
createEvent mock, and sessionsQueue.add to locate and update the test harness.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@apps/worker/src/jobs/events.incoming-event.ts`:
- Around line 217-219: The referrer fields (enrichment.referrer,
enrichment.referrer_name, enrichment.referrer_type) use "?? undefined" which can
yield undefined instead of falling back to the base event; change them to use
the same fallback pattern as the other fields by replacing "enrichment.referrer
?? undefined" with "enrichment.referrer ?? baseEvent.referrer" and likewise
"enrichment.referrer_name ?? baseEvent.referrer_name" and
"enrichment.referrer_type ?? baseEvent.referrer_type" so string-type
expectations are preserved.

---

Nitpick comments:
In `@apps/worker/src/jobs/events.incoming-events.test.ts`:
- Around line 553-596: Add a new test alongside the existing historical-event
test that simulates the lock NOT being acquired by mocking getLock to return
false; call incomingEvent with the same historical payload and assert that
createEvent is still called for the event (but there is no 'session_start'
createEvent call), sessionsQueue.add is not called (no sessionEnd scheduled),
and that the event createEvent call preserves deviceId/sessionId; reference the
incomingEvent handler, getLock mock, createEvent mock, and sessionsQueue.add to
locate and update the test harness.
🪄 Autofix (Beta)

Fix all unresolved CodeRabbit comments on this PR:

  • Push a commit to this branch (recommended)
  • Create a new PR with the fixes

ℹ️ Review info
⚙️ Run configuration

Configuration used: defaults

Review profile: CHILL

Plan: Pro

Run ID: 91718804-8af2-4a6e-9663-17127061acf7

📥 Commits

Reviewing files that changed from the base of the PR and between c470b8b and 4a8a8c4.

📒 Files selected for processing (9)
  • apps/api/src/controllers/event.controller.ts
  • apps/api/src/controllers/track.controller.ts
  • apps/api/src/routes/track-batch.router.test.ts
  • apps/api/src/routes/track.router.ts
  • apps/api/src/utils/ids.ts
  • apps/worker/src/jobs/events.incoming-event.ts
  • apps/worker/src/jobs/events.incoming-events.test.ts
  • packages/queue/src/queues.ts
  • packages/validation/src/track.validation.ts
💤 Files with no reviewable changes (1)
  • packages/queue/src/queues.ts

Comment thread apps/worker/src/jobs/events.incoming-event.ts Outdated
mraj602 and others added 2 commits May 28, 2026 05:19
Tests use toStrictEqual, so the new __syncedAt property in event
properties needs to be included in assertions. Uses expect.any(String)
since the exact ISO timestamp varies per run.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…hment

When session enrichment has null referrer fields, fall back to
baseEvent values instead of undefined. Consistent with all other
enrichment fields (path, os, browser, geo, etc.).

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Bulk event ingestion (POST /track/batch) for offline-first SDKs — IoT, mobile, edge

2 participants