Problem
forwardPartition reads the entire partition every tick and re-streams all of it — no
cursor/watermark to skip already-forwarded rows:
// hypaware-core/plugins-workspace/central/src/sink.js:238
for await (const row of storage.readRows(tablePath)) { … } // whole table, every tick
Correctness leans entirely on the server's idempotency ledger (X-Hyp-Batch-Id, :266) to drop
re-sent chunks, but the client still reads and transmits everything each run. While data
grows, cumulative cost is O(N²): if each tick adds about c rows, tick k reads ≈ k·c rows, so K
ticks read c·(1+2+…+K) ≈ c·K²/2 rows in total. With retention the per-tick cost plateaus at
"re-send the whole window every tick", so each row is still sent roughly once per tick for as
long as it stays in the retention window, versus the ideal of once.
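To make the cost argument concrete, here is a small back-of-the-envelope model. It is only a sketch: the function names and the assumption of a constant number of new rows per tick are illustrative and not taken from the codebase.

```javascript
// Illustrative cost model (hypothetical helpers, not part of the sink code).
// Assumes every tick appends the same number of rows to the partition.

// Total rows read when each tick re-reads the whole partition.
// retentionTicks caps how many ticks of data the partition holds.
function fullRescanCost(ticks, rowsPerTick, retentionTicks = Infinity) {
  let total = 0;
  for (let k = 1; k <= ticks; k++) {
    total += Math.min(k, retentionTicks) * rowsPerTick;
  }
  return total;
}

// Total rows read when each tick reads only the rows added since the last one.
function incrementalCost(ticks, rowsPerTick) {
  return ticks * rowsPerTick;
}

console.log(fullRescanCost(100, 10));    // 50500 (≈ c·K²/2)
console.log(fullRescanCost(100, 10, 7)); // 6790  (plateau: 7 ticks' worth per tick)
console.log(incrementalCost(100, 10));   // 1000  (each row read once)
```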
The fix already exists in the local sink
The iceberg table-format sink is already incremental — markerSubsumedBySnapshot / snapshot
ancestry (format-iceberg/src/table-format.js, state.js) skips already-exported data (O(N)
amortized). The forward sink just never got the equivalent. (A plain-Parquet blob sink has
the same gap — the blob path in src/core/sinks/materialize.js also does full-table reads — so
a shared fix should cover request + blob sinks.)
Proposed fix
Persist a forward watermark per (sink instance, partition) under the sink's plugin state
dir. Advance it after each successful forward (the client already gets per-chunk 202 acks).
On the next tick, read only rows after the watermark by extending storage.readRows with a
since/offset parameter (the cache already tracks epochs/offsets). Keep the ledger as the
safety net for in-flight retries.
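A minimal sketch of the watermark loop, under stated assumptions: the `since` option on readRows, the `{ offset, row }` shape it yields, the Map used as a watermark store, the `sendChunk` callback, and the way the batch id is derived from the offset range are all hypothetical stand-ins, not the existing storage or transport API. A real implementation would persist the watermark under the plugin state dir rather than in memory.

```javascript
// Hypothetical in-memory stand-in for storage; the real readRows signature may differ.
function makeStorage(rows) {
  return {
    // Assumed extension: yield only rows whose offset is >= since.
    async *readRows(tablePath, { since = 0 } = {}) {
      for (let offset = since; offset < rows.length; offset++) {
        yield { offset, row: rows[offset] };
      }
    },
  };
}

// Forward only rows past the stored watermark; returns the number of rows sent.
async function forwardPartition({ storage, tablePath, watermarks, key, sendChunk, chunkSize = 2 }) {
  const start = watermarks.get(key) ?? 0;
  let chunk = [];
  let sent = 0;

  const flush = async () => {
    if (chunk.length === 0) return;
    const first = chunk[0].offset;
    const next = chunk[chunk.length - 1].offset + 1;
    // Assumption: a batch id derived from the offset range stays stable across
    // retries, so the server ledger can still drop a chunk that is re-sent.
    await sendChunk({ batchId: `${key}:${first}-${next}`, rows: chunk.map((c) => c.row) });
    // Advance only after the chunk was acknowledged (sendChunk resolved).
    watermarks.set(key, next);
    sent += chunk.length;
    chunk = [];
  };

  for await (const entry of storage.readRows(tablePath, { since: start })) {
    chunk.push(entry);
    if (chunk.length >= chunkSize) await flush();
  }
  await flush();
  return sent;
}
```

If sendChunk rejects, the watermark stays at the last acknowledged chunk, so the next tick resumes from there and re-sends at most the one unacknowledged chunk, which the ledger can deduplicate.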
Acceptance
- Tick with no new rows transmits ~0 bytes (currently: re-sends everything).
- Tick after N new rows reads/sends ≈ N rows, independent of total size.
- Exactly-once preserved (idempotent batch-ids still cover mid-batch retries).