
central forward sink has no cursor — re-reads & re-sends the whole dataset every tick #122

Description

@bgmcmullen

Problem

forwardPartition reads the entire partition every tick and re-streams all of it — no
cursor/watermark to skip already-forwarded rows:

// hypaware-core/plugins-workspace/central/src/sink.js:238
for await (const row of storage.readRows(tablePath)) { … }   // whole table, every tick

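Correctness leans entirely on the server's idempotency ledger (X-Hyp-Batch-Id, :266) to drop
re-sent chunks, but the client still reads and transmits everything on every run. While data
grows, the cumulative cost is O(N²): if c new rows arrive per tick, tick k reads ≈ k·c rows, so
K ticks read c·(1+2+…+K) = c·K(K+1)/2 ≈ c·K²/2 rows, i.e. ≈ N²/(2c) for N = c·K total rows.
With retention the per-tick cost plateaus at "re-send the whole window every tick", which is
still wasteful: each row is re-sent once per tick it stays in the retention window (≈ W times
for a window of W ticks) instead of the ideal once.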
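The cost arithmetic above can be checked with a few lines. This is a back-of-the-envelope model only, assuming a constant c new rows per tick and an optional retention window of W ticks; the function names are made up for illustration.

```javascript
// Rows read over K ticks when c new rows arrive per tick.
// Full re-read: tick k reads every row written so far (k·c), capped at the
// last W ticks of data (min(k, W)·c) when a retention window is configured.
function rowsReadFullReread(K, c, W = Infinity) {
  let total = 0;
  for (let k = 1; k <= K; k++) {
    total += Math.min(k, W) * c;
  }
  return total;
}

// Watermark: each tick reads only the c rows that arrived since the last tick.
function rowsReadWithWatermark(K, c) {
  return K * c;
}

console.log(rowsReadFullReread(100, 10));    // 50500
console.log(rowsReadWithWatermark(100, 10)); // 1000
console.log(rowsReadFullReread(100, 10, 5)); // 4900
```

For N = 1,000 rows the full re-read matches N²/(2c) = 50,000 plus the lower-order c·K/2 = 500 term; with a 5-tick window it settles at about W = 5 reads per row (4,900 vs 1,000).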

The fix already exists in the local sink

The iceberg table-format sink is already incremental — markerSubsumedBySnapshot / snapshot
ancestry (format-iceberg/src/table-format.js, state.js) skips already-exported data (O(N)
amortized). The forward sink just never got the equivalent. (A plain-Parquet blob sink has
the same gap — the blob path in src/core/sinks/materialize.js also does full-table reads — so
a shared fix should cover request + blob sinks.)
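For comparison, the snapshot-ancestry idea can be sketched generically: walk parent links from the current snapshot back to the last exported one and take only the files added in between, so the work is proportional to what is new rather than to the table. This is not the format-iceberg code (markerSubsumedBySnapshot is not reproduced here); the Map-of-snapshots shape and filesSinceMarker are hypothetical.

```javascript
// Hypothetical snapshot log: id -> { parentId, addedFiles }. Illustrative only,
// not the format-iceberg data model.
function filesSinceMarker(snapshots, currentSnapshotId, exportedSnapshotId) {
  const chain = [];
  let id = currentSnapshotId;
  // Walk back through parents until we reach the last exported snapshot.
  while (id !== exportedSnapshotId) {
    const snap = snapshots.get(id);
    // Marker is not an ancestor (e.g. expired or rewritten history):
    // signal the caller to fall back to a full export.
    if (snap === undefined) return null;
    chain.push(snap);
    id = snap.parentId;
  }
  // Oldest snapshot first, so files come out in commit order.
  return chain.reverse().flatMap((snap) => snap.addedFiles);
}

const snapshots = new Map([
  [1, { parentId: null, addedFiles: ['a'] }],
  [2, { parentId: 1, addedFiles: ['b', 'c'] }],
  [3, { parentId: 2, addedFiles: ['d'] }],
]);
console.log(filesSinceMarker(snapshots, 3, 1)); // [ 'b', 'c', 'd' ]
```

Passing null as the marker (nothing exported yet) returns every file; returning null for a missing ancestor keeps the fallback decision with the caller.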

Proposed fix

Persist a forward watermark per (sink instance, partition) under the sink's plugin state
dir; advance it only after a successful forward (the client already gets per-chunk 202 acks); on
the next tick, read only rows after it (extend storage.readRows with a since/offset parameter; the
cache already tracks epochs/offsets). Keep the ledger as the safety net for in-flight retries.
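The storage side of the proposal (readRows with a since/offset) could look roughly like this. It assumes a hypothetical cache layout of ordered segments, each holding a contiguous run of rows starting at a baseOffset; the real cache's epoch/offset bookkeeping may differ, and readRowsSince and firstSegmentIndex are illustrative names, not existing functions.

```javascript
// Hypothetical cache layout: an ordered list of segments, each holding a
// contiguous run of rows starting at baseOffset. Not the real storage API.

// Binary search for the first segment whose last offset is >= since.
function firstSegmentIndex(segments, since) {
  let lo = 0;
  let hi = segments.length;
  while (lo < hi) {
    const mid = (lo + hi) >>> 1;
    const seg = segments[mid];
    const lastOffset = seg.baseOffset + seg.rows.length - 1;
    if (lastOffset < since) lo = mid + 1;
    else hi = mid;
  }
  return lo;
}

// Yield { offset, row } for every row with offset >= since, never touching
// segments that end before the watermark.
async function* readRowsSince(segments, since = 0) {
  for (let i = firstSegmentIndex(segments, since); i < segments.length; i++) {
    const { baseOffset, rows } = segments[i];
    for (let j = Math.max(0, since - baseOffset); j < rows.length; j++) {
      yield { offset: baseOffset + j, row: rows[j] };
    }
  }
}
```

Because of the binary search, a tick with nothing new costs one O(log S) seek over S segments instead of a full scan.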

Acceptance

  • Tick with no new rows transmits ~0 bytes (currently: re-sends everything).
  • Tick after N new rows reads/sends ≈ N rows, independent of total size.
  • Exactly-once preserved (idempotent batch-ids still cover mid-batch retries).
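A minimal sketch of the proposed forward loop, written against the acceptance criteria above. It assumes a hypothetical readRowsSince(offset) async iterator yielding { offset, row } and a hypothetical sendChunk(rows, batchId) that resolves to the HTTP status; the state-file layout and all function names are illustrative, not hypaware's API. One detail matters for exactly-once: a retried chunk must carry the same batch id as the original send, or the ledger cannot recognize it. The sketch therefore derives the id from the chunk's offset range and persists the in-flight range (pendingLast) so the retry rebuilds the identical chunk.

```javascript
// Sketch only: every name and the state-file layout are hypothetical.
const fs = require('fs').promises;
const path = require('path');
const crypto = require('crypto');

// One state file per (sink instance, partition) under the sink's state dir.
function statePath(stateDir, sinkId, partition) {
  return path.join(stateDir, 'forward-watermarks', `${sinkId}__${partition}.json`);
}

// nextOffset: first row not yet acked. pendingLast: last offset of a chunk
// that was sent but not acked, so a retry can rebuild the identical chunk.
async function loadState(stateDir, sinkId, partition) {
  try {
    return JSON.parse(await fs.readFile(statePath(stateDir, sinkId, partition), 'utf8'));
  } catch (err) {
    if (err.code === 'ENOENT') return { nextOffset: 0, pendingLast: null };
    throw err;
  }
}

// Write a temp file, then rename over the old one, so a crash cannot leave a
// half-written watermark.
async function saveState(stateDir, sinkId, partition, state) {
  const file = statePath(stateDir, sinkId, partition);
  await fs.mkdir(path.dirname(file), { recursive: true });
  await fs.writeFile(`${file}.tmp`, JSON.stringify(state));
  await fs.rename(`${file}.tmp`, file);
}

// Same offset range -> same id, so a retried chunk hits the server's
// X-Hyp-Batch-Id ledger and is dropped instead of being ingested twice.
function batchId(sinkId, partition, first, last) {
  return crypto.createHash('sha256').update(`${sinkId}|${partition}|${first}|${last}`).digest('hex');
}

async function forwardPartitionIncremental({ readRowsSince, sendChunk, stateDir, sinkId, partition, chunkSize = 500 }) {
  const state = await loadState(stateDir, sinkId, partition);
  let chunk = [];
  let sent = 0;

  const flush = async () => {
    if (chunk.length === 0) return;
    const first = chunk[0].offset;
    const last = chunk[chunk.length - 1].offset;
    // Record the in-flight range before sending.
    await saveState(stateDir, sinkId, partition, { ...state, pendingLast: last });
    const status = await sendChunk(chunk.map((item) => item.row), batchId(sinkId, partition, first, last));
    if (status !== 202) throw new Error(`forward rejected with status ${status}`);
    // Advance the watermark only after the 202 ack.
    state.nextOffset = last + 1;
    state.pendingLast = null;
    await saveState(stateDir, sinkId, partition, state);
    sent += chunk.length;
    chunk = [];
  };

  // Ask storage only for rows at or after the watermark.
  for await (const item of readRowsSince(state.nextOffset)) {
    chunk.push(item);
    // Replay an unacked chunk with exactly its old boundaries; otherwise cut by size.
    const boundary = state.pendingLast !== null ? item.offset >= state.pendingLast : chunk.length >= chunkSize;
    if (boundary) await flush();
  }
  await flush();
  return sent;
}
```

A tick with no new rows reads nothing past the state file and sends no requests; the ledger still covers the window between the server accepting a chunk and the client persisting its ack.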

Metadata

Assignees

No one assigned

    Labels

    bug (Something isn't working)

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests
