Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
9946e6e
Design: incremental sink reads (LLP 0040)
philcunliffe Jun 25, 2026
f1cd3bb
Plan: incremental sink reads (LLP 0042)
philcunliffe Jun 25, 2026
ecc577a
T1: stamp internal monotonic _hyp_ingest_seq at the decorateRow flush…
philcunliffe Jun 25, 2026
ca8c4ee
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T1' …
philcunliffe Jun 25, 2026
20bf2c7
T2: extend storage read contract with cursor-aware incremental reads
philcunliffe Jun 25, 2026
7db714e
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T2' …
philcunliffe Jun 25, 2026
30e97c0
T3: persisted per-(sink,partition) watermark store keyed by logical path
philcunliffe Jun 25, 2026
88d6841
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T3' …
philcunliffe Jun 25, 2026
20e574c
Wire central forward sink to incremental readRowsSince watermark
philcunliffe Jun 25, 2026
0bb28ab
T5: wire core blob sink (local-fs + s3) to incremental readRowsSince
philcunliffe Jun 26, 2026
a3f0808
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T4' …
philcunliffe Jun 26, 2026
4e9f386
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T5' …
philcunliffe Jun 26, 2026
c337f49
T6: exactly-once acceptance suite + smoke for incremental sink reads
philcunliffe Jun 26, 2026
35d77bd
Merge remote-tracking branch 'origin/task/incremental-sink-reads/T6' …
philcunliffe Jun 26, 2026
1141f3d
fix(central-forward): keep chunk batch-id stable across watermark adv…
philcunliffe Jun 26, 2026
7819992
fix(incremental-sink-reads): close exactly-once holes in forward+blob…
philcunliffe Jun 26, 2026
9cab552
Merge remote-tracking branch 'origin/master' into HEAD
philcunliffe Jun 29, 2026
c94e6b6
Fix typecheck: root-anchor type-module imports in feature .js files
philcunliffe Jun 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 44 additions & 1 deletion collectivus-plugin-kernel-types.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1109,6 +1109,36 @@ export interface QueryScope {
limit: number
}

/**
* Opaque, versioned continuation token marking a sink's incremental-read
* watermark: the highest `_hyp_ingest_seq` a `(sink instance, partition)` has
* durably exported. `seq` is an int64 encoded as a decimal string to dodge
* bigint/JSON precision hazards. Opaque + versioned so the underlying watermark
* mechanism can change without invalidating persisted watermarks. See LLP 0040 §2.
*/
export interface SinkContinuation {
v: 1
seq: string
}

/** Options for the back-compatible incremental extension to `readRows`. */
export interface ReadRowsOptions {
/**
* Yield only rows newer than this watermark (`_hyp_ingest_seq > since.seq`).
* Absent ⇒ full scan (today's behaviour).
*/
since?: SinkContinuation
/**
* Disposition of pre-upgrade null-seq "legacy" rows when `since` is set.
* `true` (default) treats them as new (one-time backlog export); `false`
* treats them as already-exported (skip). A sink passes `false` once it has a
* durable watermark, so the legacy backlog re-exports exactly once instead of
* on every tick (LLP 0040 §6 risk #1). No new null-seq row can appear
* post-upgrade, so excluding them after the first export never skips live data.
*/
includeLegacy?: boolean
}

/**
* Intrinsic storage service exposed by core to plugins that materialize
* rows into the local Iceberg-backed cache. Plugins do not configure
Expand All @@ -1132,7 +1162,20 @@ export interface QueryStorageService {
discoverCachePartitions(scope?: Partial<QueryScope>): Promise<CachePartitionMeta[]>
tableExists(tablePath: string): boolean
tableUrl(tablePath: string): string
readRows(tablePath: string, columns?: string[]): AsyncIterable<Record<string, unknown>>
readRows(tablePath: string, columns?: string[], opts?: ReadRowsOptions): AsyncIterable<Record<string, unknown>>
/**
* Cursor-aware sibling of `readRows` for sinks that must advance a
* per-(sink instance, partition) watermark. Pairs each internal-stripped row
* with the `after` continuation to persist ONCE that row is durably exported.
* The internal `_hyp_ingest_seq` never reaches the row payload — it is read to
* derive `after`, then stripped. `after` is a monotonic high-water mark, so a
* null-seq legacy row carries the prior watermark forward unchanged. See
* LLP 0040 §2.
*/
readRowsSince(
tablePath: string,
opts: { since?: SinkContinuation; columns?: string[]; includeLegacy?: boolean },
): AsyncIterable<{ row: Record<string, unknown>; after: SinkContinuation }>
}

export interface CachePartitionMeta {
Expand Down
12 changes: 12 additions & 0 deletions hypaware-core/plugins-workspace/central/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import path from 'node:path'

import { createInstanceWatermarkStore } from '../../../src/core/sinks/incremental.js'

import { validateCentralConfig } from './src/config.js'
import { createConfigPullLoop } from './src/config_client.js'
import { IdentityClient } from './src/identity_client.js'
Expand Down Expand Up @@ -60,11 +62,21 @@ export async function activate(ctx) {
hyp_identity_source: source,
})

// Per-(sink instance, partition) incremental-read watermarks. The plugin
// `stateDir` is per-PLUGIN, so two `@hypaware/central` instances would
// share — and clobber — one watermark file and skip each other's rows;
// `createInstanceWatermarkStore` namespaces by the instance name, matching
// local-fs/s3. Each forward instance then reads only rows added since its
// own last successful export.
// @ref LLP 0040#watermark-contract [implements] — one watermark per (sink instance, partition), scoped by instance name
const watermarks = createInstanceWatermarkStore({ paths: sinkCtx.paths, instanceName: sinkCtx.name })

const sink = createForwardSink({
config,
identityClient,
query,
storage,
watermarks,
log: sinkCtx.log,
})

Expand Down
141 changes: 117 additions & 24 deletions hypaware-core/plugins-workspace/central/src/sink.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@ import { createHash } from 'node:crypto'
import { RETRY_BACKOFF_SECONDS, parseRetryAfter, abortableSleep } from './backoff.js'

/**
* @import { ExportBatch, ExportOptions, ExportResult, PluginLogger, QueryPartition, QueryRegistry, QueryStorageService, Sink } from '../../../../collectivus-plugin-kernel-types.js'
* @import { ExportBatch, ExportOptions, ExportResult, PluginLogger, QueryPartition, QueryRegistry, QueryStorageService, Sink, SinkContinuation } from '../../../../collectivus-plugin-kernel-types.js'
* @import { SinkWatermarkKey, SinkWatermarkStore } from '../../../../src/core/sinks/types.js'
* @import { IdentityClient } from './identity_client.js'
* @import { CentralSinkConfig } from './types.js'
*/
Expand Down Expand Up @@ -47,14 +48,15 @@ const MAX_CHUNK_BYTES = 4 * 1024 * 1024
* identityClient: IdentityClient,
* query: QueryRegistry,
* storage: QueryStorageService,
* watermarks: SinkWatermarkStore,
* log: PluginLogger,
* fetchFn?: typeof fetch,
* sleepFn?: (ms: number, signal?: AbortSignal) => Promise<void>,
* }} args
* @returns {Sink}
*/
export function createForwardSink(args) {
const { config, identityClient, query, storage, log } = args
const { config, identityClient, query, storage, watermarks, log } = args
const fetchFn = args.fetchFn ?? fetch
// Injectable so tests drive backpressure pacing without real waits.
const sleepFn = args.sleepFn ?? abortableSleep
Expand Down Expand Up @@ -90,7 +92,7 @@ export function createForwardSink(args) {
const signal = signalForPartition(query, partition)
try {
bytesWritten += await forwardPartition({
partition, signal, config, identityClient, storage, fetchFn, log,
partition, signal, config, identityClient, storage, watermarks, fetchFn, log,
abortSignal: abortController.signal, sleepFn,
})
partitionsExported += 1
Expand Down Expand Up @@ -158,31 +160,43 @@ function signalForPartition(query, partition) {

/**
* Stream one partition's rows to `/v1/ingest/{signal}` in bounded
* chunks, never materializing the whole table. Each chunk POSTs with an
* `X-Hyp-Batch-Id` derived from the signal, the partition identity, the
* chunk's position, and its bytes (see {@link batchIdForChunk}): stable
* across retries of that exact chunk, yet distinct for any other chunk,
* so two byte-identical chunks never collide. When the driver re-hands a
* partition after a transport failure, re-streaming reproduces the same
* chunk boundaries, so the unchanged prefix chunks hash to the same ids
* and the server's idempotency ledger (server LLP 0001) acks them `202`
* without re-storing. A partial-then-retried partition thus converges to
* exactly-once instead of duplicating every already-delivered row.
* chunks, never materializing the whole table. Only rows added since the
* last durable export are read: the `(sink instance, partition)`
* watermark is loaded up front and handed to `readRowsSince({ since })`,
* so a tick with no new rows reads zero rows and sends zero chunks. Each
* chunk POSTs with an `X-Hyp-Batch-Id` derived from the signal, the
* partition identity, the chunk's position, and its bytes (see
* {@link batchIdForChunk}): stable across retries of that exact chunk,
* yet distinct for any other chunk — so two byte-identical chunks never
* collide. When the driver re-hands a partition after a transport
* failure, re-streaming from the same watermark reproduces the same chunk
* boundaries, so the unchanged prefix chunks hash to the same ids and the
* server's idempotency ledger (server LLP 0001) acks them `202` without
* re-storing. The watermark advances ONCE, after the whole partition's chunks
* are acked (ship first, advance second), to the partition's high-water `after`
* — never mid-partition. A partial partition (an early chunk acked, a later one
* failed) therefore never checkpoints, so a crash/failure re-reads the whole
* partition next tick and the server ledger dedupes the already-acked prefix.
* Mid-partition advance is unsafe because the scan is NOT seq-ordered (LLP 0040
* §4 risk #3): `after` is a running max, so a chunk that physically precedes a
* lower-seq chunk would advance the watermark past rows still un-acked in a
* later chunk, silently skipping them forever on a between-chunk failure.
*
* @param {{
* partition: QueryPartition,
* signal: string,
* config: CentralSinkConfig,
* identityClient: IdentityClient,
* storage: QueryStorageService,
* watermarks: SinkWatermarkStore,
* fetchFn: typeof fetch,
* log: PluginLogger,
* abortSignal: AbortSignal,
* sleepFn: (ms: number, signal?: AbortSignal) => Promise<void>,
* }} args
* @returns {Promise<number>} bytes successfully POSTed for this partition
*/
async function forwardPartition({ partition, signal, config, identityClient, storage, fetchFn, log, abortSignal, sleepFn }) {
async function forwardPartition({ partition, signal, config, identityClient, storage, watermarks, fetchFn, log, abortSignal, sleepFn }) {
if (!KNOWN_SIGNALS.has(signal)) {
throw new Error(`central.forward: unknown signal '${signal}' (expected logs|traces|metrics|proxy)`)
}
Expand All @@ -193,18 +207,62 @@ async function forwardPartition({ partition, signal, config, identityClient, sto
const tablePath = partition.tablePath
await flushPartition(storage, tablePath, 'sink_export')

// @ref LLP 0040#watermark-contract [implements] — load the per-(sink instance, partition) watermark so this tick reads only rows added since the last durable export; a missing/unreadable watermark reads from the start (at-least-once + server dedup), never a silent skip.
/** @type {SinkContinuation | undefined} */
let since
/** @type {SinkWatermarkKey | undefined} */
let watermarkKey
let exportedRowCount = 0
try {
watermarkKey = watermarks.keyFor(storage.cacheRoot, tablePath)
const record = await watermarks.read(watermarkKey)
since = record?.continuation
exportedRowCount = record?.exportedRowCount ?? 0
} catch (err) {
// An underivable key or unreadable watermark must not wedge the sink:
// fall back to a full scan (the server ledger dedupes the redelivery)
// and skip watermark writes for this partition this tick.
watermarkKey = undefined
since = undefined
exportedRowCount = 0
log.warn('central.forward.watermark_read_failed', {
hyp_dataset: partition.dataset,
message: err instanceof Error ? err.message : String(err),
})
}

let bytesWritten = 0
let chunkIndex = 0
// Rows acked across THIS partition's chunks. Accumulated as each chunk POSTs
// so the single end-of-partition watermark write carries an accurate count.
let shippedRowCount = 0
/** @type {string[]} */
let lines = []
let pendingBytes = 0
// `after` token of the most recently buffered row; after the loop it is the
// partition's high-water `after`, the watermark to persist once every chunk
// is acked.
/** @type {SinkContinuation | undefined} */
let lastAfter
// The seq this chunk starts AFTER — the `since` watermark for the first
// chunk, then the previous chunk's last `after` seq. The idempotency key is
// derived from THIS (not the per-tick `chunkIndex`) so a chunk's id is stable
// across watermark advances: once an earlier chunk is acked and the watermark
// moves, a respool re-reads the un-acked suffix from that same watermark, the
// re-streamed chunk reproduces the same `[startSeq, body]`, and the server
// ledger dedupes the redelivery. Keying on `chunkIndex` would re-number the
// suffix from 0 and mint a NEW id for an already-committed-but-unacked chunk,
// double-storing it on the server.
let chunkStartSeq = since?.seq ?? '0'

const flushChunk = async () => {
if (lines.length === 0) return
const body = lines.join('\n') + '\n'
const batchId = batchIdForChunk(signal, tablePath, chunkIndex, body)
// @ref LLP 0040#applying-it-to-both-sinks [implements] — stable per-chunk batch id keyed by the chunk's start seq, so a post-watermark-advance respool reproduces the same id and the server ledger dedupes.
const batchId = batchIdForChunk(signal, tablePath, chunkStartSeq, body)
const bytes = Buffer.byteLength(body, 'utf8')
const rows = lines.length
const after = lastAfter
try {
await postNdjson({
centralUrl: config.url, signal, body, batchId, identityClient, fetchFn, log, abortSignal, sleepFn,
Expand All @@ -231,13 +289,24 @@ async function forwardPartition({ partition, signal, config, identityClient, sto
})
bytesWritten += bytes
chunkIndex += 1
shippedRowCount += rows
// The next chunk starts after this chunk's last row, so its batch id keys
// off this chunk's `after` — keeping ids stable whether a tick streams the
// whole partition or a respool replays only the un-acked suffix.
if (after) chunkStartSeq = after.seq
lines = []
pendingBytes = 0
}

for await (const row of storage.readRows(tablePath)) {
// @ref LLP 0040#storage-api-extension [implements] — pre-upgrade null-seq rows
// are "new" only on a sink with no durable watermark (export the backlog once);
// once a watermark exists they are already shipped, so exclude them and the
// legacy backlog never re-exports every tick (LLP 0040 §6 risk #1).
const includeLegacy = since === undefined
for await (const { row, after } of storage.readRowsSince(tablePath, { since, includeLegacy })) {
const line = JSON.stringify(serializeRow(row))
lines.push(line)
lastAfter = after
// Count UTF-8 bytes (not UTF-16 code units) so the budget bounds the
// actual wire size for multibyte payloads, e.g. CJK `content_text`.
pendingBytes += Buffer.byteLength(line, 'utf8') + 1
Expand All @@ -246,28 +315,52 @@ async function forwardPartition({ partition, signal, config, identityClient, sto
}
}
await flushChunk()

// @ref LLP 0040#watermark-contract [implements] — ship first, advance second,
// but advance ONLY at end-of-partition (like the blob sink). Every chunk is
// acked by the time we reach here (a failed POST throws out of flushChunk
// before this), so persisting the partition's high-water `after` can never
// checkpoint past an un-acked row. A between-chunk failure leaves the
// watermark untouched: the next tick re-reads the whole partition and the
// server ledger dedupes the already-acked prefix. Advancing per chunk to the
// running-max `after` would skip lower-seq rows in a later un-acked chunk
// whenever the scan is not seq-ordered (LLP 0040 §4 risk #3).
if (watermarkKey && lastAfter && shippedRowCount > 0) {
await watermarks.write(watermarkKey, {
continuation: lastAfter,
exportedRowCount: exportedRowCount + shippedRowCount,
})
}
return bytesWritten
}

/**
* Deterministic idempotency key for one chunk. Hashes the signal, the
* partition identity (`tablePath`), the chunk's ordinal position, and
* its exact bytes. Re-streaming a partition reproduces the same chunk
* boundaries and order, so a re-sent chunk hashes to the same id (the
* server dedupes it); two byte-identical chunks at different positions or
* in different partitions get distinct ids and are both stored.
* partition identity (`tablePath`), the seq this chunk starts AFTER, and its
* exact bytes.
*
* Keying on `chunkStartSeq` (the watermark the chunk resumes from) rather than a
* per-tick ordinal is what keeps the id stable across a watermark advance: when
* an earlier chunk is acked the watermark moves, and a respool re-reads only the
* un-acked suffix — which reproduces the same `[startSeq, body]` and so the same
* id, letting the server ledger dedupe a chunk that committed but whose ack was
* lost. (An ordinal would re-number the suffix from 0 and mint a fresh id for an
* already-stored chunk, double-storing it.) Two byte-identical chunks at
* different positions still get distinct ids because a row's `_hyp_ingest_seq`
* is unique, so their start seqs differ; chunks in different partitions differ
* on `tablePath`.
*
* @param {string} signal
* @param {string} tablePath
* @param {number} chunkIndex
* @param {string} chunkStartSeq decimal `_hyp_ingest_seq` the chunk starts after
* @param {string} body
* @returns {string}
*/
function batchIdForChunk(signal, tablePath, chunkIndex, body) {
function batchIdForChunk(signal, tablePath, chunkStartSeq, body) {
return createHash('sha256')
.update(signal).update('\0')
.update(tablePath).update('\0')
.update(String(chunkIndex)).update('\0')
.update(chunkStartSeq).update('\0')
.update(body)
.digest('hex').slice(0, 32)
}
Expand Down
Loading