Skip to content

feat(jobs): durable JobStore backends (file + astra)#58

Merged
erichare merged 1 commit intomainfrom
feat/durable-job-store
Apr 24, 2026
Merged

feat(jobs): durable JobStore backends (file + astra)#58
erichare merged 1 commit intomainfrom
feat/durable-job-store

Conversation

@erichare
Copy link
Copy Markdown
Collaborator

Summary

Async ingest jobs now survive process restart. The `JobStore` backend auto-matches `controlPlane.driver`:

  • `memory` → existing `MemoryJobStore` (unchanged)
  • `file` → new `FileJobStore` at `<controlPlane.root>/jobs.json`
  • `astra` → new `AstraJobStore`, adds one table (`wb_jobs_by_workspace`) to the existing Data API connection

Highlights

  • In-process pub/sub extracted to `src/jobs/subscriptions.ts` so all three backends reuse the listener logic verbatim. SSE subscribers work on every backend today; cross-replica fan-out stays future work.
  • `applyUpdate` shared out of `MemoryJobStore` so all backends agree on patch semantics.
  • Astra wiring reuses the existing `TablesBundle` rather than opening a second connection — `buildControlPlane()` now returns `{ store, astraTables? }` so the jobs factory can consume the bundle. Old `buildControlPlaneStore` / `storeFromConfig` kept as thin back-compat wrappers.
  • New table: `wb_jobs_by_workspace` (partition by workspace, sort by job_id, serialized `result_json` text column — same pattern as `filter_json`). Created with `ifNotExists: true` on boot like every other `wb_*` table.

Tests

  • `tests/jobs/contract.ts` — 8-assertion shared contract covering create→get, scoping, update persistence, serialized `result` round-trip, subscribe replay + fire + unsub, throwing-listener isolation. Run against all three backends.
  • `tests/jobs/memory-store.test.ts` — one-liner contract invocation (was 90 lines; deduplicated).
  • `tests/jobs/file-store.test.ts` — contract + a "second instance over the same root sees prior jobs" case that simulates a restart.
  • `tests/jobs/astra-store.test.ts` — contract against `createFakeTablesBundle()` + nested-`result` serialization test.

Not in scope

In-flight job resume after restart — the record survives but the worker is gone. The operator-facing fix (timeout-based `running` → `failed` flip) and worker-side resume are a separate slice. Cross-replica pub/sub (Redis etc.) also stays future work; the seam is isolated in `JobSubscriptions`.

Test plan

  • `npm test` — 443 passing (was 421; +22 for the contract × 3 backends + 2 backend-specific)
  • `npm run typecheck` — clean
  • `npm run build` — clean
  • `npm run lint` — clean
  • Durability pinned: a fresh `FileJobStore` over the same root surfaces a prior job's full state including a serialized `result` object

First of three PRs in this Phase 2b-tail batch. Astra-native hybrid/rerank + UI catch-up follow.

Adds two durable `JobStore` impls so the async ingest records
survive process restart. The backend is auto-matched to
`controlPlane.driver` — `memory` keeps the existing ephemeral
behavior, `file` writes `<root>/jobs.json` alongside the other
control-plane files, and `astra` reuses the already-open tables
bundle to read/write `wb_jobs_by_workspace`.

## Shared plumbing

- `src/jobs/subscriptions.ts` — in-process pub/sub helper. Every
  backend reuses it; listener management doesn't drift.
- `src/jobs/memory-store.ts` — refactored to use `JobSubscriptions`.
  `applyUpdate()` extracted as the shared patch helper so all three
  backends agree on update semantics.
- `src/jobs/factory.ts` — builds the right impl from
  `ControlPlaneConfig`. Memory/file need no extra deps; astra takes
  the existing `TablesBundle` from `buildControlPlane()` so we don't
  open a second Data API connection.
- `src/control-plane/factory.ts` — new `buildControlPlane()` that
  returns `{ store, astraTables? }`; old `buildControlPlaneStore()`
  and `storeFromConfig()` kept as thin back-compat wrappers. A new
  `controlPlaneFromConfig()` returns the bundle for callers that
  want the tables.

## Astra wiring

- New `wb_jobs_by_workspace` table: partition by workspace, sort by
  `job_id`, nullable `catalog_uid` / `document_uid`, serialized
  `result_json` (same text-column pattern as `filter_json` on saved
  queries).
- `JobRow` row type + in-store converters (kept inside the astra
  store since they're not shared).
- `openAstraClient` creates the table on startup with
  `ifNotExists: true`, same as every other `wb_*` table.

## Tests

- `tests/jobs/contract.ts` — 8-assertion shared contract. Covers
  create→get round-trip, workspace scoping, update→get persists,
  serialized `result` round-trip, subscribe replay + fire + unsub,
  throwing-listener isolation.
- `tests/jobs/memory-store.test.ts` — now a one-liner that runs the
  contract.
- `tests/jobs/file-store.test.ts` — contract + a "second instance
  over the same root sees prior jobs" case to pin durability.
- `tests/jobs/astra-store.test.ts` — contract against the fake tables
  bundle + a nested-result serialization test.

## What didn't change

The `/jobs/{id}` and `/jobs/{id}/events` routes are untouched — they
consume `JobStore`, and every backend satisfies the same interface.
In-flight jobs still don't resume after restart (the worker that
owned them is gone); the record now survives for operator
inspection. Cross-replica pub/sub is the remaining follow-up for a
truly multi-node deployment.

Total: 443 tests (was 421; +22 for the new contract × 3 backends +
two backend-specific tests).
@erichare erichare merged commit 79f7774 into main Apr 24, 2026
9 checks passed
@erichare erichare deleted the feat/durable-job-store branch April 24, 2026 20:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant