Streaming daemon: Phase 2 — hot store, lifecycle + live ingestion (closes #816, #808)#820
Streaming daemon: Phase 2 — hot store, lifecycle + live ingestion (closes #816, #808)#820chowbao wants to merge 55 commits into
Conversation
3d12c9e to
84ff8c2
Compare
df8ec80 to
17b5c39
Compare
84ff8c2 to
419f7ec
Compare
17b5c39 to
145c1cc
Compare
04f9931 to
7f8e58f
Compare
e15575a to
bc56b0a
Compare
7f8e58f to
f3431cd
Compare
bc56b0a to
440443b
Compare
fa0083f to
cbc80ab
Compare
440443b to
c4944fb
Compare
…hanges Rebased onto the updated #820 and propagated #817's API changes into the Phase 2 live-ingestion/daemon layer: - window -> tx-hash index rename + key prefix index: -> txhash_index: (TxHashIndexCoverage.Index, Catalog.txhashIndex), Catalog.Get/Has -> get/has, config sections regrouped (cfg.Retention/Layout/Storage/Ingestion), pins via PinLayout. - daemon.go merge: kept #821's live-ingestion wiring (LifecycleConfig + Core) and deduped the HotProbe line (#821's Phase-2 wiring already set it, so #820's HotProbe fix is redundant here). - removed the #819 cold-only catch-up E2E (TestRunDaemon_CatchUpMaterializes...) + its someTxBackend/oneTxLCMBytes helpers: #821's daemon now requires Boundaries.Core and runs a continuous live loop, so a cold-only "catch up then return" test can't fit — and TestE2E_DaemonLifecycle covers it end to end. Mechanical propagation only; build/vet/test -short green (the heavy lifecycle E2E stays -short-gated).
cbc80ab to
ae91d20
Compare
c4944fb to
aeca6a0
Compare
Rebased the live-ingestion capstone onto the reorganized #820 and propagated: - qualify moved symbols (geometry./catalog.) in daemon.go, startup.go, e2e_test.go - window->tx-hash-index + RetentionGate->RetentionFloor renames; cat.layout->Layout(), cat.Has->public HotState shim, .IndexFilePath->.TxHashIndexFilePath - config regroup: cfg.Streaming.CaptiveCoreConfig -> cfg.Ingestion.CaptiveCoreConfig - restored #821's daemon_test.go (drops the cold-only catch-up test the full daemon supersedes; adds the supervise/backend-tip/boundaries tests) + the HotProbe/Core wiring - avoided the txhash_txhash_index find-replace corruption (was only in the dropped restack) build + vet + go test -short green EXCEPT the lifecycle E2E, whose generated TOML still uses the pre-regroup [streaming]/[backfill] schema (follow-up; per maintainer the stack will be re-rebased).
14aa4c8 to
aafbe0d
Compare
Relocate the one-write protocol ordering helper (mark -> create -> barrier -> flip) from backfill/process.go to catalog_protocol.go, where the protocol's states and mark/flip steps already live, and export it as catalog.OneWrite. processChunk and buildTxhashIndex now call it across the package boundary. It is a zero-dependency pure function and catalog never imports backfill, so there is no import cycle; #820's hot-tier openHotTierForChunk adopts it as the third caller by import alone, with no later relocation. Addresses the #818 review thread that asked to establish the shared helper here rather than deferring the move to #820.
The 10 --new-from-rev findings from the golangci-lint job, all on lines this review round touched: - lll (7): wrap the RunBackfill call (extract startChunk), loopConfig signature, and 5 WriteColdChunk/buildColdIngesters call sites that the added chunkID arg pushed past 120. - misspell: cancelling -> canceling (run()'s errgroup comment). - nolintlint: drop the now-unused //nolint:gosec in lastCompleteChunkAtID (gosec doesn't run on _test.go, so the directive suppressed nothing). - funcorder: move the unexported HotStore.index() below the exported IngestLedgerToBatch (next to the other unexported method, applyLedger). No behavior change. Build+vet+gofmt clean; ingest/eventstore/lifecycle -short green.
…nt:gosec) The index() move and coldDirsAt signature change pulled these lines into the --new-from-rev window on the previous fix's run: - unparam: coldDirsAt's chunk param always receives chunk 0 — add the same //nolint:unparam the sibling packPath/txhashBinPath helpers already carry. - nolintlint x2: the //nolint:gosec on the two eventID := startID + uint32(i) range-index conversions in hot_store.go are unused (this gosec config doesn't flag range-loop indices) — remove them. No behavior change (comment-only in hot_store; test-helper directive). Build+vet+ gofmt clean; ingest/eventstore -short green.
| floor := EffectiveRetentionFloor(through, cfg.RetentionChunks, earliest) | ||
|
|
||
| // Progress gauges: derived last-committed ledger and effective retention floor. | ||
| metrics.LastCommitted(through, floor) |
There was a problem hiding this comment.
The tick's gauge emission regresses the watermark on every mid-chunk restart
The tick re-emits last_committed_ledger as lastChunk.LastLedger(), regressing the gauge below startup's refined watermark on every mid-chunk restart — up to ~10k ledgers, factually false at emission, lasting until the next boundary. Suggest the ingestion loop own that gauge (it knows the true value; a per-ledger gauge set is one atomic store, nothing next to a synced batch) and the tick keep only the floor — which means splitting Metrics.LastCommitted(lc, floor) into two methods. The floor itself doesn't regress in this window (verified: it depends only on the last complete chunk, identical for both watermarks). Same site: the LiveHotChunks scan error is silently swallowed (if ...; herr == nil).
There was a problem hiding this comment.
Fixed in d2b1c12. Split Metrics.LastCommitted(lc, floor) into LastCommitted(lc) + RetentionFloor(floor). The ingestion loop now owns last_committed_ledger (one atomic gauge set per committed ledger — the true, possibly-mid-chunk value); the tick sets only the floor, so it can no longer regress the gauge below the refined watermark on a mid-chunk restart. The LiveHotChunks scan error is now surfaced (the tick returns it) instead of being swallowed.
|
|
||
| hotCommitErrors := prometheus.NewCounter(prometheus.CounterOpts{ | ||
| Namespace: namespace, Subsystem: metricsSubsystem, | ||
| Name: "hot_commit_errors_total", |
There was a problem hiding this comment.
One phase-keyed signal instead of three overlapping families
The hot path carries three overlapping signal families — HotLedgerTotal, the five phase timings, HotItems — and the outcome signal mislabels: hot_commit_errors_total ("failed hot batch commits") increments for any IngestLedger failure including pre-batch decode errors, d spans the whole call while its doc claims batch-only, and the IngestStage doc omits commit. Rather than patching each: one phase-keyed signal carrying duration + outcome per phase — HotLedgerTotal becomes the sum, commit errors become the commit phase's error dimension, decode failures land on extract by construction, and HotItems folds into the per-type write phases' existing items dimension, leaving exactly one family and retiring the "batch" pseudo-type. Two implementation notes: key the phases as a typed enum indexed into an array so an out-of-table emission is unrepresentable (the current map lookups nil-panic, and test recorders don't index, so it'd ship); and the (counts, phases, err) return wants to be one report struct that carries which phase failed. The cold side has the mirror problem: it registers the full 3×4 stage cross-product at construction, four series no code path can emit — enumerate its eight real pairs like hotPhaseKeys.
There was a problem hiding this comment.
Done in 44d66ae. Collapsed the three hot families into ONE phase-keyed signal. hotchunk.IngestLedger now returns a single LedgerReport (per-phase samples + the failed phase) instead of (LedgerCounts, LedgerPhases, error); phases are a typed enum indexed into a fixed-size array, so an out-of-table phase is unrepresentable (no nil-map emit). MetricSink.HotPhase(phase, d, items, err) replaces HotItems + HotLedgerTotal: the per-ledger total is the SUM of the phase durations, a decode failure lands on PhaseExtract and a commit failure on PhaseCommit BY CONSTRUCTION (no more hot_commit_errors_total firing for any IngestLedger failure), and item volume folds onto the write phases. The 'batch' pseudo-type and the hot IngestStage plumbing are retired. PrometheusSink resolves the hot family into a [NumPhases] array; the cold side now enumerates its 8 real (data_type, stage) pairs instead of the 3×4 cross-product. HotService.Ingest emits one HotPhase per phase from the report (phases [0, Failed] on error with zero items, all on success).
There was a problem hiding this comment.
One small inconsistency in the new shape: every failure branch returns before stamping the failed phase's Dur, so the error is emitted with a zero-duration sample — a commit that blocks for 30s and then fails shows the error but not the 30s. RunBackfill's freeze timing states the principle this breaks: "reported even on failure (partial duration is signal)". Stamping time.Since(...) on the failed phase before each error return closes it.
There was a problem hiding this comment.
Good catch — fixed in 3dcd691. Every failure branch now stamps the failed phase's partial Dur (time.Since(...)) before returning: the extract sub-steps stamp PhaseExtract, each queue step stamps its own phase, and a commit failure stamps PhaseCommit (whole-Batch minus the queue steps). So a slow-then-failed phase carries its wall-clock, matching RunBackfill's "reported even on failure".
|
|
||
| // TestIngestLedger_WritesEveryHotType confirms the hot tier always writes all | ||
| // three hot data types; per-type disabling is not a supported hot DB mode. | ||
| func TestIngestLedger_WritesEveryHotType(t *testing.T) { |
There was a problem hiding this comment.
Pin the eventless-transaction txhash coverage
Post-merge, txhash completeness rests entirely on ExtractLedgerEvents yielding event-less transactions (verified in the pinned SDK — one element per applied tx, hash included; and the events frontier advances per-ledger, so zero-event ledgers can't wedge the ordering check). Every hotchunk test uses one-tx-one-event ledgers, so nothing pins it: worth one test ingesting a ledger with {eventful tx, event-less tx}, asserting both hashes land in the txhash CF and only the eventful tx's events in the events CF. An SDK semantic change would otherwise silently gut the txhash index for every classic-only transaction.
There was a problem hiding this comment.
Fixed in 11ed6fa. Added TestIngestLedger_EventlessTxStillIndexesHash: one ledger with {eventful tx, event-less tx}, asserting both hashes land in the txhash CF (counts.Txhash == 2) and only the eventful tx's event lands in the events CF (counts.Events == 1, eventCount == 1). Pins the post-#18 invariant that txhash completeness rests on ExtractLedgerEvents yielding an element per applied tx, event-less included.
| // backend also validates its own output — this is defense-in-depth, a zero-copy | ||
| // header read). A source error, a decode error, or a non-contiguous sequence is | ||
| // yielded as the error element and ends iteration; the view is borrowed. | ||
| func SeqValidatedCursor( |
There was a problem hiding this comment.
Contract-side enforcement: delete the cursor, check where the contract lives
The LedgerStream contract promises in-order delivery; the SDK implementations enforce it structurally, and SeqValidatedCursor re-checks them redundantly. Preference: enforce at the contract side and delete the cursor — with two precisions from tracing the repo impls. packStream is already structurally contiguous (positional reads + coverage check), so only hotLedgerStream needs a check, and it's a key-derived seq-vs-counter comparison, no XDR parsing. While in there, unify the impls' range semantics: packStream self-bounds unbounded ranges while hotLedgerStream errors on them — it can self-bound at MaxCommittedSeq identically. drain keeps its overrun and completeness checks on a local counter; the hot loop's boundary detection works identically on its own counter.
There was a problem hiding this comment.
Done in 79704d4. Deleted SeqValidatedCursor + ValidatedLedger and moved the in-order contract to the source: packStream already reads positionally by key; hotLedgerStream now key-checks its own keyspace (a gap in the sole writer of recent history is a defect — TestSource_RejectsGap) and self-bounds an unbounded range at its committed frontier, mirroring packStream (TestSource_SelfBoundsUnboundedRange); the SDK backends validate their own output. drain and the hot loop each keep their overrun/completeness/boundary logic on a local counter. Net simpler, one contiguity guard per source instead of a redundant re-parse. (LedgerIngester was already deleted in 3cb0e7d.)
| // must copy any bytes it retains. Ledgers are ingested sequentially — the source | ||
| // pulls the next only after Ingest returns — so synchronous consumption inside | ||
| // Ingest is safe. | ||
| type LedgerIngester interface { |
There was a problem hiding this comment.
LedgerIngester can go
LedgerIngester has one consumer (drain) and one implementor reaching it (*ColdService); the ColdIngester seam below already provides testability (the rollback tests fake that layer). drain can take *ColdService directly; the interface, its contract doc, and the cross-reference in ColdIngester's doc go with it. Reintroduce when a second consumer of drain exists. Lands naturally with the cursor change, which rewords the same doc.
There was a problem hiding this comment.
Fixed in 3cb0e7d. Deleted LedgerIngester; drain takes *ColdService directly. The overrun test now fakes the ColdIngester seam (wrapped in a ColdService), which is the layer that provides drain's testability.
| return &HotService{ingesters: ingesters, sink: orNop(sink)} | ||
| // NewHotService builds a HotService that writes ledgers, txhash, and events into | ||
| // the shared per-chunk DB. A nil sink defaults to NopSink. | ||
| func NewHotService(db *hotchunk.DB, sink MetricSink) *HotService { |
There was a problem hiding this comment.
HotService's emission contract has no test
The reworked emission contract has no test — nothing constructs a HotService with a recording sink — and the migration left residue: testSink.hotDataTypes() has zero callers, hotLedgerTotals is counted but never asserted, and the "P1-c" banner heads an empty section. Best written against the reworked sink shape from the metrics comment, so once.
There was a problem hiding this comment.
Done in 44d66ae, written against the reworked sink. TestHotService_EmitsEveryPhaseOnSuccess (every phase emitted once; write phases carry per-type volume, extract/commit none; no phase carries an error) + TestHotService_CommitErrorLandsOnCommitPhase (a closed-DB commit failure surfaces on PhaseCommit by construction, no items on the failure path). Removed the residue in the same change: testSink.hotDataTypes() (zero callers), the counted-but-unasserted hotLedgerTotals, and the empty 'P1-c' banner.
| // the already-opened resume hot DB is closed on the way out, so a restart can | ||
| // reopen it (the rocksdb LOCK is released). ServeReads runs after the hot DB opens | ||
| // and core starts but before the blocking ingestion loop, so run returns here. | ||
| func TestRun_ServeReadsErrorSurfaces(t *testing.T) { |
There was a problem hiding this comment.
Vacuous LOCK assertion + stale getter-era comments
TestRun_ServeReadsErrorSurfaces' LOCK-release assertion is vacuous — the hot DB opens inside the loop, which never starts on that path, so it reopens a DB that was never opened; suggest deleting the assertion (the boundary-close fence is pinned where the loop owns the handle). Same file still narrates the deleted getter API ("ctx-canceled GetLedger error", "parked on the blocking getter"); hotloop_test.go:41 too.
There was a problem hiding this comment.
Fixed in 11ed6fa. Deleted the vacuous LOCK-release assertion in TestRun_ServeReadsErrorSurfaces — ServeReads now runs before the loop opens the hot DB, so the reopen was of a DB that was never opened. Also dropped the stale getter-era comments ('ctx-canceled GetLedger error', 'parked on the blocking getter' in startup_test.go; the per-GetLedger count note in hotloop_test.go).
|
|
||
| // makeHotDir creates the on-disk hot dir for a chunk so deriveWatermark's | ||
| // per-ready-key dir-existence loop sees it present. | ||
| func makeHotDir(t *testing.T, cat *catalog.Catalog, c chunk.ID) { |
There was a problem hiding this comment.
Stale test scaffolding docs
makeHotDir's comment cites a per-ready-key dir-existence loop that was deleted (the refinement opens only the highest ready chunk), and helpers_test.go:119,128 point at wrong files ("startup.go's openHotDBForChunk" lives in hotloop.go; the opener tests are in hotloop_test.go, not ingest_test.go).
There was a problem hiding this comment.
Fixed in d2b1c12 + 11ed6fa. makeHotDir's comment no longer cites the deleted per-ready-key dir loop (it now explains only the highest ready chunk is opened). helpers_test.go file refs corrected: openHotDBForChunk lives in hotloop.go (not startup.go) and the opener tests are in hotloop_test.go (not ingest_test.go).
There was a problem hiding this comment.
New collectors untested + three one-word doc strays
The three collectors this PR added (chunk_boundaries_total, live_hot_chunks, discarded_hot_chunks_total, plus the "discard" phase label) are registered but never recorded+gathered in the test. Three one-word doc strays elsewhere: progress.go:38 "(safe: … before ingestion locks the DB)" — right ordering, wrong reason, read-only opens take no LOCK; hotloop.go:71 still says "fatal"; hotchunk.go's OpenReadOnly says "un-synced WAL" where it means synced-but-unflushed (an unsynced WAL is exactly what a crash loses).
There was a problem hiding this comment.
Fixed. Collector coverage in d2b1c12 (observability_test now records + gathers live_hot_chunks, chunk_boundaries_total, discarded_hot_chunks_total, and the discard phase). The three doc strays in 11ed6fa: progress.go no longer claims the read is safe because it runs 'before ingestion locks the DB' (read-only opens take no LOCK — reworded to say so); hotloop.go 'fatal' → 'won't-open error'; hotchunk.OpenReadOnly 'un-synced WAL' → 'synced-but-unflushed WAL' (with a note that an unsynced tail is exactly what a crash loses).
| e.metrics.observe(time.Since(start), n, ierr) | ||
| if ierr != nil { | ||
| e.failed = true | ||
| e.metrics.emit(0, nil) // an Ingest error abandons the chunk; meter it now (Close no longer emits) |
There was a problem hiding this comment.
Make the terminal-error emission structural
The terminal Ingest-error emission is a hand-maintained observe(err) + emit(0, nil) pair copy-pasted in all three ingesters, enforced by nothing. An Ingest error is always terminal by the ColdIngester contract (verified: no retry path reuses an ingester, and the emitted flag already prevents double-emit even under a contract violation), so coldMetrics.observe can emit itself when err != nil — structural, deletes three copies. While there: Close's doc still claims "a chunk that failed after ingesting still produced one per-ingester signal," which contradicts the deliberate emit-nothing-without-a-terminal-step semantics.
There was a problem hiding this comment.
Fixed in 3cb0e7d. coldMetrics.observe now emits the single per-ingester ColdIngest itself when err != nil (an Ingest error is terminal by the ColdIngester contract), deleting the hand-paired observe+emit(0,nil) copied in all three cold ingesters. Fixed the ColdService.Close doc to state the per-ingester signal comes only from a terminal step (a failed Ingest via observe, or Finalize) — never from Close.
|
|
||
| // ChunkIDOfLedger maps a ledger to its chunk, signed so a sub-genesis ledger | ||
| // yields -1 instead of panicking. | ||
| func ChunkIDOfLedger(ledger uint32) int64 { |
There was a problem hiding this comment.
One home for the signed chunk arithmetic
The signed pre-genesis chunk arithmetic lives in two packages — geometry's LastCompleteChunkAt/ChunkFirstLedger and lifecycle's CompleteThrough/ChunkIDOfLedger — with two -1 sentinels meaning different things, mixed in adjacent startup.go lines exactly at the mid-chunk exclusion. One home in geometry; pure relocation, no signature changes. Bonus: under the mid-chunk precondition, startup.go's ChunkIDOfLedger(lastCommitted)-1 equals geometry.LastCompleteChunkAt(lastCommitted), which erases the mixed-vocabulary site outright.
There was a problem hiding this comment.
Fixed in d2b1c12. Moved CompleteThrough, ChunkIDOfLedger, and the pre-genesis sentinel (now geometry.PreGenesisLedger) into geometry alongside LastCompleteChunkAt/ChunkFirstLedger — one home, one -1 convention. TestCompleteThrough moved with them. And per the bonus: startup.go's mid-chunk exclusion now reads geometry.LastCompleteChunkAt(lastCommitted) directly instead of ChunkIDOfLedger(lastCommitted)-1, erasing the mixed-vocabulary site.
| // a completed chunk (boundary fence + post-backfill seed), so the only guard | ||
| // needed is the empty-range check (floor above lastChunk when retention outran | ||
| // production). | ||
| freezeStart := time.Now() |
There was a problem hiding this comment.
Delete the empty-range Freeze emission
RunBackfill emits the Freeze sample internally; the tick re-implements the emission for the empty-range branch only, with a freezeStart timer that times nothing. Simplest fix: delete the tick's empty-range emission and the timer outright — empty-tick visibility is already carried by the Discard/Prune samples every tick emits.
There was a problem hiding this comment.
Fixed in d2b1c12. Deleted the tick's empty-range Freeze emission and the freezeStart timer; empty-tick visibility is carried by the Discard/Prune samples every tick emits.
There was a problem hiding this comment.
The tip trio wants a type
(File-level comment — the anchor isn't in a diff hunk. The sites: networkTip at startup.go:339, its caller in backfillToTip at startup.go:~171, validateConfig and resolveEarliestFirstStart in config_validate.go, the three StartConfig fields NetworkTip/TipBackoff/TipMaxAttempts, and the assembler in daemon.go startConfig.)
That (NetworkTipBackend, TipBackoff, TipMaxAttempts) triple travels through all five signatures together, and the retry defaults are bound at two independent sites that must agree: daemon.go pre-resolves them for validateConfig, while StartConfig.withDefaults applies them again for run(). A concrete tipSampler with one Sample(ctx) method and defaults in its constructor collapses every signature to one value and gives the policy a single binding site.
There was a problem hiding this comment.
Consolidating the tip threads so these don't read as competing directives — the end state is one change: a tipSampler owning the source list (lake, then history archives — the same source #833 plans for frontfill) and the retry policy, used by both deployment modes. With a robust sampler, the serve-without-tip fallback branch deletes (per the correction on the synthetic-tip thread), and if every source fails the error propagates to supervise. The clump-collapse proposed here and the archive fallback proposed there are the same piece of work.
There was a problem hiding this comment.
Agreed on the tipSampler type (Sample(ctx), defaults in the constructor, one binding site). Folding it into the #833-tied tip rework from the synthetic-tip thread: r3517460304 redirects the tip path to a lake->history-archive fallback, and the sampler is exactly where that fallback belongs — so I'm building it as part of that change rather than as a lake-only type I'd immediately rework.
| return 0, fmt.Errorf("network tip unavailable and no local history to serve: %w", err) | ||
| } | ||
| // Restart with local progress: serve what's below lastCommitted, skip backfill. | ||
| tip = lastCommitted |
There was a problem hiding this comment.
The synthetic tip splices two jobs through one variable
On tip failure with local history, tip = lastCommitted fabricates a synthetic tip that flows through tip-semantics arithmetic — today producing identity/no-op results (max() and the exclusion are inert), so this is structure, not a wrong result: availability policy spliced into pass geometry through a sentinel variable that future edits must know is sometimes fake. A plain loop break is safe: the fallback pass's local repairs are re-run identically by the seeded first tick (same floor, same range — verified), shifting them post-serve, which is already the steady-state design. One behavioral note worth a line in the change: a repair failure then surfaces via the lifecycle errgroup instead of blocking startup — same supervised restart, different timing.
There was a problem hiding this comment.
Correction to the above: rather than restructuring this branch, delete it. The tip can fall back to the history archives (GetRootHAS().CurrentLedger — the URLs are already in the config, and #833 plans exactly this source for frontfill); if both the lake and the archives are unreachable, return the error and let the supervisor retry — at that point the box has no egress and live ingestion is stalled anyway. Archive checkpoint lag (≤64 ledgers, so the archive tip can sit below lastCommitted) is already handled by the existing arithmetic: anchor = max(tip, lastCommitted) guards the range, and withinOneChunkOfTip is signed precisely so a lagging tip reads as near-tip. Residuals: an earliest = "now" first-start pin can land up to 64 ledgers early (chunk-aligned, harmless), and a restart within 64 ledgers of a boundary during a lake outage replays one chunk from core instead of the lake (bounded, self-healing). This deletes the synthetic tip, the fallback pass, and the degraded serve-without-tip mode, and unifies the tip path with #833's.
There was a problem hiding this comment.
Taking r3517460304's correction — delete the degraded serve-without-tip mode and fall the tip back to the history archives (GetRootHAS().CurrentLedger). But the streaming daemon has no history-archive tip source wired yet: resolveNetworkTip is a placeholder pending #772, and the archive URLs aren't threaded into StartConfig. Since #833 plans exactly this source for frontfill, this unifies with #833's tip path and lands there (with the tipSampler from the sibling thread as its home). I reverted my interim break so the tip path stays in its known-good state until then.
| // - "transient" or absent: wipe any leftover dir and create fresh | ||
| // (transient -> fsync dir+parent -> ready), so a crash mid-create can't | ||
| // fabricate a "ready but DB gone" open failure above. | ||
| func openHotDBForChunk(cat *catalog.Catalog, chunkID chunk.ID, logger *supportlog.Entry) (*hotchunk.DB, error) { |
There was a problem hiding this comment.
The ready-open rule is enforced at three sites
The "ready key ⇒ must-exist, never-creating open" rule is enforced at three independent sites — openHotDBForChunk, refineWithHotDB, resolveHotSource — each pairing its own state read with its own open variant and near-duplicate error text. The probe deletion inlined the gate three times. Placement is genuinely constrained (catalog can't construct DBs; hotchunk can't read the catalog), so the unifiable core is a single open-ready helper in hotchunk taking the caller's state read, with the rest belonging to the #824 reshuffle.
There was a problem hiding this comment.
Agreed — a single open-ready helper in hotchunk taking the caller's state read, with openHotDBForChunk / refineWithHotDB / resolveHotSource routed through it. As you note the placement is constrained and the rest belongs to the #824 reshuffle, so I'm landing the helper with #824 to keep it and its call sites moving together.
…iew) - last_committed_ledger gauge: split Metrics.LastCommitted(lc, floor) into LastCommitted(lc) + RetentionFloor(floor). The ingestion loop now owns the last-committed gauge (per-ledger, the true possibly-mid-chunk value); the tick owns only the floor. Fixes the tick regressing the gauge below the refined watermark on every mid-chunk restart (thread 3517302906). - Tick: compute earliest + retention gate ONCE and pass into the discard/prune scans (was re-derived per scan); pendingArtifacts takes the caller's coverage (no double read); add RetentionFloor.FirstChunk() as the one floor->chunk boundary shared by plan/prune/backfill; surface the LiveHotChunks scan error instead of swallowing it; delete the empty-range Freeze emission + dead timer (threads 3511860199, 3517303403). - LastCommittedLedger: delete the nil-logger positional mode (zero prod callers after the clamp deletion); logger is required; delete the three `if logger != nil` tick guards (thread 3517302730). Positional-term unit tests seed real empty hot DBs. - Move the signed pre-genesis chunk arithmetic (CompleteThrough, ChunkIDOfLedger, PreGenesisLedger) into geometry alongside LastCompleteChunkAt/ChunkFirstLedger — one -1 convention, one home; startup's mid-chunk exclusion now reads geometry.LastCompleteChunkAt directly (thread 3517303377). - observability_test: cover the previously-untested collectors (live_hot_chunks, chunk_boundaries_total, discarded_hot_chunks_total, discard phase) (3517303282).
- Delete the LedgerIngester interface: it had one consumer (drain) and one implementor reaching it (*ColdService); drain takes *ColdService directly. The ColdIngester seam already provides drain's testability (the overrun test now fakes that layer) (thread 3517303073). - Terminal-error emission is now structural: coldMetrics.observe emits the single per-ingester ColdIngest itself when err != nil (an Ingest error is terminal by the ColdIngester contract), deleting the hand-paired observe+emit(0,nil) copied in all three cold ingesters. Fix the ColdService.Close doc to match the emit-only-on-a-terminal-step semantics (thread 3517303336). - Delete the ChunkID() accessor + chunkID field + ctor param from the two thin hot facades (ledger.HotStore, txhash.HotStore): the accessor had no production caller and the "driver can reject a mismatched store" binding it documented was never implemented. eventstore.HotStore keeps its chunkID (load-bearing for range checks + error messages) (thread 3511861630).
…iew) Tests: - hotchunk: TestIngestLedger_EventlessTxStillIndexesHash — a ledger with an eventful + an event-less tx; both hashes land in the txhash CF, only the eventful tx's event in the events CF. Pins the post-#18 invariant that txhash completeness rests on ExtractLedgerEvents yielding an element per applied tx, event-less included (thread 3517303006). - rocksdb: TestNew_MustExist_EmptyReadyDBReopens (must-exist reopen of an empty ready DB succeeds) + TestNew_MustExist_GuttedDirFailsOpen (a gutted dir fails the open, never auto-heals) — the two MustExist pins at the rocksdb level (threads 3516017921/3517306778). Docs/comments: - Delete the vacuous LOCK-release assertion in TestRun_ServeReadsErrorSurfaces (ServeReads runs before the loop opens the hot DB) + stale getter-era comments in startup_test.go / hotloop_test.go (thread 3517303157). - Doc strays: progress.go read-only-open-takes-no-LOCK; hotloop.go "fatal" -> "won't-open error"; hotchunk.OpenReadOnly "un-synced" -> "synced-but-unflushed" WAL; helpers_test.go file refs (hotloop.go / hotloop_test.go) (3517303282/3517303185).
|
|
||
| // Begin serving reads (injected) BEFORE launching the loops; it must return | ||
| // promptly (launch, not block). | ||
| if err := cfg.ServeReads(ctx); err != nil { |
There was a problem hiding this comment.
The design's startup order was: open the hot DB, start core, then serve — a broken deployment fails before it ever serves anything. The current shape (serve first, DB open and core start deferred into the loop) fell out of two earlier review fixes rather than a decision: the leak fix moved the resume open into the loop, and the stream swap made core start on the first pull. Both are restorable without losing what those fixes bought:
run()opens the resume hot DB beforeServeReadsand hands it to the loop, whose first statement is the deferred close. The leak fix's content was the defer's position ahead of any early return, not which function calls Open; restarts re-enterrun()from the top, so there's still one initial-open site, and the boundary reopen stays as is.OpenCorestarts the core process (or the stream grows an explicitStart) beforeServeReads;run()holds adefer cancel()so an error return after core start still tears it down. Lazy start was a side effect of keeping the stream closer-free, not a requirement.
With this order a misconfigured deployment fails at startup instead of serving while the loop restarts behind it. Whether that failure should also exit rather than cycle under supervise is a separate restart-policy question.
There was a problem hiding this comment.
Agree the design's order (open hot DB + start core, THEN serve) is the better failure shape — a misconfigured deployment should fail before it serves, not serve while the loop restarts behind it. Flagging the cost before I reverse it, since it undoes two prior review fixes: (1) the resume-open moved INTO the loop specifically for leak-safety (open + deferred close adjacent, no cross-call ownership gap) — restoring the open to run() reintroduces that gap, so run() needs a defer-with-disarm that closes the DB until the loop takes ownership; (2) #17 made the stream own core lifecycle (lazy start, closer-free) — eager start needs OpenCore (or an explicit stream.Start) plus a run()-held defer cancel(). Both are doable exactly as you describe; I want to confirm we're OK re-introducing the run()-scoped ownership/teardown on the crash-safety path (vs the current loop-owns-everything shape) before landing it. Will do it next round once confirmed.
There was a problem hiding this comment.
Done (part A) in e7eb46d. run() now opens the resume hot DB before ServeReads and hands the handle to the loop; loopOwnsDB flips at the errgroup launch so the loop's deferred close owns it from then on (g.Wait joins before run returns — no window where neither owns it), and an error between the open and the launch returns via run()'s defer. Leak-safety preserved (deferred close still ahead of any early return); restarts re-enter run() from the top so it stays the single initial-open site. A broken hot tier now fails startup instead of serving behind a crash-looping loop — TestRun_OpensHotDBAndCoreBeforeServe asserts ServeReads sees the resume chunk already "ready", and TestRun_ServeReadsErrorSurfaces regains a meaningful reopen assertion.
Part B (eager core start before serve): this one isn't a fullhistory-side change. The SDK LedgerStream is RawLedgers-only and deliberately starts core on the first pull — "no PrepareRange/GetLedger/Close for the consumer to sequence" — so there's no Start hook to call before ServeReads. It lands if the SDK stream grows an explicit Start, or with #772's real core wiring where an eager PrepareRange would live. OpenCore is already invoked before ServeReads, so a construction-time misconfig still surfaces pre-serve.
…amily (#3517302950)
Replaces the three overlapping hot signal families (HotLedgerTotal, the five
IngestStage phase timings, HotItems) with ONE phase-keyed family, and fixes the
mislabels the old shape carried.
- hotchunk.IngestLedger now returns a single LedgerReport (per-phase samples +
the phase that failed) instead of (LedgerCounts, LedgerPhases, error). Phases
are a typed enum (Phase) indexed into a fixed-size array, so an out-of-table
phase is unrepresentable.
- MetricSink: HotItems + HotLedgerTotal collapse into HotPhase(phase, d, items,
err). The per-ledger total is the SUM of the phase durations; a decode failure
lands on PhaseExtract and a commit failure on PhaseCommit BY CONSTRUCTION (no
more hot_commit_errors_total incrementing for any IngestLedger failure, no more
whole-call duration doc'd as batch-only). Item volume folds onto the write
phases. The "batch" pseudo-type and the hot IngestStage plumbing are retired.
- PrometheusSink: one hot family (hot_phase_{duration_seconds,items_total,
errors_total}{phase}) resolved into a [NumPhases] array (no nil-map emit). The
cold side now enumerates its 8 real (data_type, stage) pairs instead of
registering the 3×4 cross-product (4 series nothing emits). IngestStage is
cold-only (tier label dropped).
- HotService.Ingest emits one HotPhase per phase from the report: phases [0,
Failed] on error (with zero items — nothing landed), all phases on success.
Tests: TestHotService_EmitsEveryPhaseOnSuccess +
TestHotService_CommitErrorLandsOnCommitPhase (the emission contract had none);
removed the dead testSink residue (hotDataTypes, hotLedgerTotals, the empty P1-c
banner). (#3517303118)
| // runOps runs each op in order, returning the first error. It checks ctx between | ||
| // ops so a shutdown mid-scan stops promptly without starting the next storage op; | ||
| // the ctx error is surfaced up through Loop for supervise to classify as clean. | ||
| func runOps(ctx context.Context, ops []func() error) error { |
There was a problem hiding this comment.
The design had a small retry around each lifecycle op: a failed freeze/discard/prune was retried a few times with a short pause before giving up (its config carried 3 attempts / 5s backoff). That got lost when the errgroup join we suggested earlier went in — the retry didn't carry across. As it stands, one transient failure in a sweep (a busy file, a slow fsync) cancels ingestion through the shared errgroup, run() returns, and supervise restarts the whole daemon — killing and relaunching captive core, which then has to catch up again. That's a disproportionate response to a retryable file operation.
Two ways to settle it: put the retry back here in runOps (a few attempts with a fixed pause; the ops are idempotent, so re-running one is always safe), or decide restart-as-retry is acceptable and the design doc gets updated to match. I'd put the retry back — core relaunch isn't free, and these failure modes are exactly the transient kind retries exist for.
There was a problem hiding this comment.
Fixed in 3dcd691 — put the retry back in runOps. Each discard/prune op now runs under a bounded constant backoff (default 3 attempts / 5s, ctx-abortable) before the error propagates, so a transient sweep failure retries in place instead of canceling ingestion through the errgroup and forcing a captive-core relaunch. Config carries OpRetryAttempts/OpRetryBackoff (defaults in WithLifecycleDefaults); a zero-value Config runs each op once. Tests: TestRunOps_{RetriesTransientThenSucceeds,GivesUpAfterAttempts,CtxCancelStopsBeforeOp,ZeroConfigRunsOnce}. (Left the freeze/RunBackfill stage as-is — it has its own internal coverage retries; this targets the idempotent file-op sweeps you flagged.)
…e (#3517303043) The LedgerStream contract already promises in-order delivery and the sources enforce it structurally; the shared cursor re-checked it redundantly. - Delete SeqValidatedCursor + ValidatedLedger. - hotLedgerStream (the one source that could be mis-keyed — it wraps the sole writer of recent history) now key-checks its own keyspace (a gap is a defect) and self-bounds an unbounded range at its committed frontier, mirroring packStream. packStream is already positionally contiguous; the SDK backends validate their own output. - drain and the hot ingestion loop each consume the raw stream on a local sequence counter, keeping their overrun / completeness / boundary logic without re-parsing every view's sequence. Tests: hotchunk TestSource_RejectsGap + TestSource_SelfBoundsUnboundedRange pin the relocated guard; the drain per-seq-guard tests (which fed an artificially mis-ordered stream) were deleted with the cursor; the two drain stream-error tests assert the new chunk-scoped wrapping. Full -short suite + non-short E2E (85s) + golangci --new-from-rev all green.
… op retry
Two round-4 follow-ups on this batch's own changes:
- hotchunk.IngestLedger stamped the failed phase's Dur only on the success path,
so an error was emitted with a zero-duration sample (a commit that blocks 30s
then fails showed the error but not the 30s). Stamp time.Since(...) on the
failed phase before every error return — partial duration is signal, matching
RunBackfill's "reported even on failure" (#3517613508).
- Restore the per-op retry the design carried (3 attempts / 5s pause) that was
lost when the lifecycle loop joined run()'s errgroup: a transient discard/prune
failure (busy file, slow fsync) now retries in place instead of canceling
ingestion and forcing a whole-daemon restart (relaunching captive core) for a
retryable, idempotent file op. runOps wraps each op in a bounded constant
backoff (ctx-abortable); Config gains OpRetryAttempts/OpRetryBackoff with
defaults; a zero-value Config runs each op once (#3517561326).
Tests: TestRunOps_{RetriesTransientThenSucceeds,GivesUpAfterAttempts,
CtxCancelStopsBeforeOp,ZeroConfigRunsOnce}. lifecycle + hotchunk -short green;
golangci --new-from-rev clean.
…rt A) Restore the design's startup order for the hot tier: run() opens the resume chunk's hot DB BEFORE ServeReads and hands the handle to the ingestion loop, whose first deferred statement takes ownership of the close. So a broken hot tier fails startup instead of serving behind a crash-looping loop. The leak-safety the earlier fix bought is preserved: the deferred close still sits ahead of any early return. run() owns the close only until the errgroup launch (loopOwnsDB flips there); after that the loop's defer owns it and g.Wait joins before run returns, so there is no window where neither owns the handle. An error between the open and the launch (OpenCore / ServeReads) returns via run()'s defer, which closes it. Restarts re-enter run() from the top, so this stays the single initial-open site; the loop still reopens at each boundary. Part B (eager core start before serve) is NOT a fullhistory-side change: the SDK LedgerStream is RawLedgers-only and deliberately starts core on the first pull with no Start/PrepareRange hook, so eager start lands if the SDK stream grows one (or with #772's real core wiring). OpenCore is already called before ServeReads. Tests: TestRun_OpensHotDBAndCoreBeforeServe (ServeReads observes the resume chunk already "ready" + core opened); TestRun_ServeReadsErrorSurfaces restores the now-meaningful reopen assertion (run opened the DB, closed it on the error path). loopConfig opens the resume DB the way run() does. Root -short + non-short E2E (85s) + golangci --new-from-rev all green.
Closes #816, #808 — the complete Phase 2 (hot tier + lifecycle + live ingestion) in one PR.
Phase 2 introduces the hot tier + lifecycle machinery + live ingestion on top of Phase 1's source-blind cold pipeline:
Hot storage — the per-chunk
hotchunkDBtransient/readystate machine; a read-only view serves both the freeze source and the watermark refineringest.HotServicecommits each ledger as one atomic synced WriteBatch across all CFs (decision (a)) — onefsync, a single per-chunkMaxCommittedSeq, no per-store frontiers, no fan-outExtractLedgerEventswalk per ledger feeds both the tx-hash and events CFs (event-ID assignment order unchanged)Backfill integration — hot source by path (no probe seam)
backfillSource's hot branch opens the chunk's hot DB read-only straight from itsgeometry.Layoutpath (hotchunk.OpenReadOnly) and yields aledgerbackend.LedgerStreamreadychunk whose DB is missing/gutted fails the must-exist open — an ordinary restartable error, never auto-healed into a fresh empty DB (no watermark regression)Progress — derived watermark
LastCommittedLedger(cat, logger)maxes the cold term (highest fully-durable chunk) vs the highestreadyhot DB'sMaxCommittedSeq(one read-only open, which replays any synced WAL after an ungraceful crash) vs the earliest-pin floor — all in the signed domain, never stored; restart re-derives from durable stateLifecycle + live ingestion (the folded-in layer 2)
run()transitions from backfill to a serve+ingest steady state: start captive core (injectedCoreOpener) → serve reads (injected) → run the ingestion loop and the lifecycle loop as a joinederrgroup.WithContextpair (whichever returns first tears down the other; both joined beforerunreturns — the single-lifecycle-goroutine invariant across supervisor restarts)RawLedgersstream, commits each ledger, and at each chunk boundary closes the filled DB → opens the next → publishes the completed chunk (the handoff fence)lifecycle.Loop): freeze → index-aware discard → prune, driven by a latest-cellBoundarySignal— a slow lifecycle can never fall behind, since one tick over[floor, latest]subsumes every skipped boundarybackfill.RunBackfill— the same path catch-up usessuperviseis the single clean-vs-restart decision point (a canceled ctx is a clean shutdown; anything else is a warn + backoff restart). There is no fatal-and-exit class: genuine volume loss presents as a supervised crash-loop, upheld by the must-exist hot-DB open rather than by a hard exitDaemon wiring
captiveCoreOpenerbuilds a fresh captive-coreLedgerStreamper run (NewCaptiveCoreStream); captive-core config unification and the read-serving cutover are deferred to Cut over to RPC v2: remove SQLite ingestion/query, serve from v2 stores #772 (injectCore/ServeReadsto run today)Observability
HotLedgerTotal(duration, err)per ledger + per-typeHotItemsvolume + per-phase timings (extract / ledgers / txhash / events / commit)ColdIngestis emitted only on a terminal step (a Finalize, or an Ingest error) — never fromClose— so a rolled-back or sibling-abandoned ingester leaves no phantom-success sampleFolded-in cleanup
RunHot/HotStoresstream-drain orchestration and theHotProbe/HotChunk/ErrHotVolumeLostprobe machinery (verified zero production callers)Verification:
go build+go vet+go test(incl. the full-lifecycle E2E: ingest → freeze → fold → discard → cold+hot lookup → restart → prune) green on./cmd/stellar-rpc/internal/fullhistory/...(cgo RocksDB toolchain).golangci-lintruns in CI.Follow-ups: #772 (captive-core config unification + read-serving cutover), #835 (
LastCommittedLedger(cat)signature cleanup), #836 (halve cold-path extraction / thin theColdIngesterseam).