Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
fa93cb1
streaming(fullhistory): Phase 2 layer 1 — hot store + lifecycle (#816)
chowbao Jun 25, 2026
7e687c1
fullhistory: rename ingest.go -> hotloop.go to disambiguate from inge…
chowbao Jun 30, 2026
96d0e44
fullhistory: drop gratuitous comment churn in untouched sections
chowbao Jun 30, 2026
b61181a
fullhistory: drop redundant pkg-name prefix from PR-added error messages
chowbao Jun 30, 2026
a0b068c
streaming(fullhistory): Phase 2 layer 2 — live ingestion + daemon wir…
chowbao Jun 30, 2026
4d19700
fullhistory/ingest: fix stale comments describing the deleted HotServ…
chowbao Jun 30, 2026
cfca87a
fullhistory/observability: drop cold_tier_bytes + per-ledger LedgerCo…
chowbao Jun 30, 2026
f8c8cec
fullhistory: /simplify pass on the layer-2 test code
chowbao Jun 30, 2026
5b497b3
fullhistory: align prose to the design's vocabulary (backfill, last-c…
chowbao Jun 30, 2026
c2471fc
fullhistory: rename functions to the design doc's names
chowbao Jul 1, 2026
f1d602a
fullhistory: run captive core for live ingestion via ledgerbackend (#…
chowbao Jul 1, 2026
4a4e037
fullhistory: source captive-core config from the captive_core_config …
chowbao Jul 1, 2026
a07c973
Align hot ingestion with always-on hot tier
chowbao Jul 1, 2026
5cabb45
Recover hot progress from WAL on startup
chowbao Jul 1, 2026
27aae30
fullhistory: seed events contiguity in boundary test for always-on ho…
chowbao Jul 1, 2026
5112430
fullhistory: align hot txhash tier + index catalog key to design #787
chowbao Jul 1, 2026
9ea3b4c
fullhistory: fix golangci-lint failures
chowbao Jul 1, 2026
a5bf892
fullhistory: drop redundant "streaming:" prefix from PR-introduced fa…
chowbao Jul 1, 2026
0c993a8
fullhistory: make hot DB a required HotService dependency
chowbao Jul 1, 2026
cb99138
fullhistory: /simplify cleanups (reuse + dead-param + boundary dedup)
chowbao Jul 1, 2026
4c16406
stores: drop standalone hot-store open paths; wrap-only over shared p…
chowbao Jul 1, 2026
4481062
txhash: collapse vestigial Tuning()/tuning() split into one func
chowbao Jul 1, 2026
270bf50
Remove live catalog test seam
chowbao Jul 1, 2026
ce675b7
e2e_test: drop unused cyclop nolint directive
chowbao Jul 1, 2026
5a96d9c
fullhistory: drop orphaned self-committing hot writes + eventPayloads…
chowbao Jul 2, 2026
a298fec
fullhistory: clear lint fallout + finish the ReadOnly WAL comment sweep
chowbao Jul 2, 2026
e701785
fullhistory: address PR #820 review — concrete fixes
chowbao Jul 2, 2026
a694d8c
fullhistory/eventstore: delete NextEventID (test-only duplicate of Ev…
chowbao Jul 2, 2026
86285e3
fullhistory: hot-DB failure policy — no auto-create, loss is restarta…
chowbao Jul 2, 2026
893cc80
hotchunk: move hotLedgerStream in, add DB.Source() (#22 prep)
chowbao Jul 2, 2026
e0be578
fullhistory: delete HotProbe seam — backfill/watermark open hot DBs b…
chowbao Jul 2, 2026
4ad32fb
fullhistory: ingestion-loop rework — raw stream in, atomic cell out (…
chowbao Jul 2, 2026
e13fb21
fullhistory/stores: delete unreachable hot-write-path guards + events…
chowbao Jul 2, 2026
26f81df
fullhistory/lifecycle: delete the two dead plan-range clamps (#25)
chowbao Jul 2, 2026
cf6fe62
fullhistory: lifecycle returns error, joined with ingestion via errgr…
chowbao Jul 2, 2026
c6d018c
fullhistory/ingest: emit per-ingester ColdIngest off the Close path (…
chowbao Jul 2, 2026
4ff5e5b
fullhistory,events: halve hot per-ledger extraction to one walk (#18 …
chowbao Jul 2, 2026
f360ad4
fullhistory: unify the frozen-index coverage predicate (#37)
chowbao Jul 2, 2026
049db0e
fullhistory: make geometry.Layout the only cold-path derivation (#36)
chowbao Jul 2, 2026
f981593
fullhistory/ingest: batch-scope hot metric attribution (#18 part 2)
chowbao Jul 2, 2026
ec7697e
fullhistory: errgroup.WithContext + ingestion owns its first chunk (#…
chowbao Jul 2, 2026
6df6ad8
fullhistory/ingest: hot per-ledger phase timings (#18 amendment)
chowbao Jul 2, 2026
73f95dc
fullhistory: round-3 review polish — dead code, metric name, stale docs
chowbao Jul 2, 2026
c2d5cc9
fullhistory/catalog: test DiscardHotChunk crash-resume + absent-key n…
chowbao Jul 2, 2026
44d4faa
fullhistory: trim over-narrated comments (#3516018538)
chowbao Jul 2, 2026
6ebcc02
fullhistory: guard the errgroup no-hang invariant in run()
chowbao Jul 2, 2026
e6de0f9
fullhistory: fix golangci-lint findings on the review-round changes
chowbao Jul 3, 2026
10a7caf
fullhistory: fix 3 follow-on golangci findings (unparam + unused noli…
chowbao Jul 3, 2026
d2b1c12
fullhistory: lifecycle tick cleanups + gauge correctness (round-4 rev…
chowbao Jul 3, 2026
3cb0e7d
fullhistory/ingest,stores: delete dead seams (round-4 review)
chowbao Jul 3, 2026
11ed6fa
fullhistory: missing test pins + stale doc/comment fixes (round-4 rev…
chowbao Jul 3, 2026
44d66ae
fullhistory/ingest,hotchunk: unify hot metrics into one phase-keyed f…
chowbao Jul 3, 2026
79704d4
fullhistory: delete SeqValidatedCursor, enforce in-order at the sourc…
chowbao Jul 3, 2026
3dcd691
fullhistory: partial-duration on failed hot phase + restore lifecycle…
chowbao Jul 3, 2026
e7eb46d
fullhistory: open resume hot DB before serving reads (#3517545236, pa…
chowbao Jul 3, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 18 additions & 2 deletions cmd/stellar-rpc/internal/events/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ import (
// (ingest.ExtractLedgerEvents — one TxProcessing walk yields hash + events
// together). This function adds only the RPC-specific Payload shape, the
// Stage→(TxIdx, OpIdx) cursor-sentinel mapping, EventIdx, and the cursor
// ordering.
// ordering — all in PayloadsFromLedgerEvents, over which this is the thin
// view-reading wrapper.
func LCMViewToPayloads(lcm xdr.LedgerCloseMetaView) ([]Payload, error) {
ledgerSeq, err := lcm.LedgerSequence()
if err != nil {
Expand All @@ -44,11 +45,26 @@ func LCMViewToPayloads(lcm xdr.LedgerCloseMetaView) ([]Payload, error) {
if err != nil {
return nil, err
}

txEvents, err := ingest.ExtractLedgerEvents(lcm)
if err != nil {
return nil, err
}
return PayloadsFromLedgerEvents(txEvents, ledgerSeq, ledgerClosedAt)
}

// PayloadsFromLedgerEvents shapes an already-extracted per-transaction event
// slice (ingest.ExtractLedgerEvents output) into cursor-ordered Payloads. It is
// the body of LCMViewToPayloads minus the SDK walk, so a caller that already
// holds the txEvents — the hot ingest path, which also needs the paired tx
// hashes (txEvents[i].Hash) — can feed BOTH txhash and events from ONE
// ExtractLedgerEvents call instead of walking TxProcessing twice. ledgerSeq and
// ledgerClosedAt are the view's header values (cheap reads, not a walk). The
// cursor ordering and EventIdx assignment are IDENTICAL to what LCMViewToPayloads
// produced inline, so event IDs are unchanged across the refactor.
func PayloadsFromLedgerEvents(
txEvents []ingest.LedgerTransactionEvents, ledgerSeq uint32, ledgerClosedAt int64,
) ([]Payload, error) {
var err error
at := func(i int) (uint32, xdr.Hash) {
return uint32(i) + 1, xdr.Hash(txEvents[i].Hash) //nolint:gosec // 1-based, matching ingest reader's tx.Index
}
Expand Down
85 changes: 85 additions & 0 deletions cmd/stellar-rpc/internal/fullhistory/backfill/hotsource_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package backfill

import (
"context"
"testing"

"github.com/stretchr/testify/require"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/catalog"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/rocksdb"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
)

// seedReadyHotChunk brackets a "ready" hot DB for c (transient -> create -> ready)
// and commits ONE ledgers-CF entry at seq `top` so MaxCommittedSeq reads back
// `top`. It writes just the ledgers CF (the only CF the completeness gate reads)
// and closes the store — hygiene, not a lock requirement: a read-only open takes
// no RocksDB LOCK and would succeed against a writer-held DB too. The daemon opens
// this exact on-disk DB by its Layout path.
func seedReadyHotChunk(t *testing.T, cat *catalog.Catalog, c chunk.ID, top uint32) {
t.Helper()
require.NoError(t, cat.PutHotTransient(c))
store, err := rocksdb.New(rocksdb.Config{
Path: cat.Layout().HotChunkPath(c),
ColumnFamilies: hotchunk.ColumnFamilies(),
Logger: silentLogger(),
})
require.NoError(t, err)
h := ledger.NewWithStore(store)
require.NoError(t, store.Batch(func(b *rocksdb.BatchWriter) error {
return h.AddLedgerToBatch(b, ledger.Entry{Seq: top, Bytes: []byte("ledger")})
}))
require.NoError(t, store.Close())
require.NoError(t, cat.FlipHotReady(c))
}

// TestBackfillSource_HotComplete: a "ready" hot DB whose committed frontier
// reaches the chunk's last ledger IS the source — backfillSource returns it with
// NO backend configured, so success alone proves the hot branch was taken.
func TestBackfillSource_HotComplete(t *testing.T) {
cat, _ := testCatalog(t)
cfg := testProcessConfig(t, cat) // no Backend

c := chunk.ID(0)
seedReadyHotChunk(t, cat, c, c.LastLedger()) // complete: maxSeq == last ledger

src, closeSrc, err := backfillSource(context.Background(), c, catalog.AllArtifacts(), cfg)
require.NoError(t, err, "complete hot tier is used; no bulk backend needed")
require.NotNil(t, src)
require.NoError(t, closeSrc())
}

// TestBackfillSource_HotIncompleteFallsThrough: a "ready" but incomplete hot DB is
// staleness — backfillSource falls past it. With no pack and no backend, that
// fall-through surfaces as the "no bulk backend" error (not a hot-tier error).
func TestBackfillSource_HotIncompleteFallsThrough(t *testing.T) {
cat, _ := testCatalog(t)
cfg := testProcessConfig(t, cat) // no Backend, no frozen pack

c := chunk.ID(0)
seedReadyHotChunk(t, cat, c, c.FirstLedger()) // incomplete: maxSeq < last ledger

_, _, err := backfillSource(context.Background(), c, catalog.AllArtifacts(), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "no bulk backend",
"an incomplete hot tier falls through; it is not itself an error")
}

// TestBackfillSource_HotReadyButDirMissing: a "ready" key whose hot DB won't open
// (dir gone) is an ordinary restartable error — the read-only open never
// auto-heals it into a fresh empty DB.
func TestBackfillSource_HotReadyButDirMissing(t *testing.T) {
cat, _ := testCatalog(t)
cfg := testProcessConfig(t, cat)

c := chunk.ID(0)
require.NoError(t, cat.PutHotTransient(c))
require.NoError(t, cat.FlipHotReady(c)) // ready key, NO dir on disk

_, _, err := backfillSource(context.Background(), c, catalog.AllArtifacts(), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "won't open")
}
107 changes: 88 additions & 19 deletions cmd/stellar-rpc/internal/fullhistory/backfill/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/geometry"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
)

Expand Down Expand Up @@ -84,11 +85,12 @@ func processChunk(ctx context.Context, chunkID chunk.ID, artifacts catalog.Artif

// Choose the source before marking "freezing": a source error (a missing pack
// or a coverage timeout) must not leave "freezing" debris for a chunk we then
// refuse to produce.
src, err := backfillSource(ctx, chunkID, artifacts, cfg)
// refuse to produce. closeSource releases any opened hot DB after the pass.
src, closeSource, err := backfillSource(ctx, chunkID, artifacts, cfg)

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Design-doc traceability for #787: resolving the source before marking freezing keeps the processChunk one-write protocol safe: source/coverage errors do not leave freezing debris, and the subsequent mark -> write -> fsync barrier -> frozen flip below is the protocol from the design. See full-history-streaming-workflow.md lines 195-206 and the processChunk primitive at lines 271-302.

if err != nil {
return err
}
defer func() { _ = closeSource() }()

// The one-write protocol, straight-line (see catalog_protocol.go header). The
// // one-write: labels keep the four steps greppable without a wrapper.
Expand All @@ -101,9 +103,9 @@ func processChunk(ctx context.Context, chunkID chunk.ID, artifacts catalog.Artif
// one-write:create — materialize this chunk's cold artifacts from the resolved
// source's raw ledger iterator. WriteColdChunk is source-blind.
dirs := ingest.ColdDirs{
Ledgers: layout.LedgersRoot(),
Txhash: layout.TxHashRawRoot(),
Events: layout.EventsRoot(),
LedgerPack: layout.LedgerPackPath(chunkID),
TxhashBin: layout.TxHashBinPath(chunkID),
EventsDir: layout.EventsBucketDir(chunkID),
}
raw := src.RawLedgers(ctx, ledgerbackend.BoundedRange(chunkID.FirstLedger(), chunkID.LastLedger()))
if rerr := ingest.WriteColdChunk(
Expand All @@ -130,37 +132,53 @@ func processChunk(ctx context.Context, chunkID chunk.ID, artifacts catalog.Artif
return nil
}

// backfillSource picks a chunk's ledger source as a bare ledgerbackend.LedgerStream:
// 1. the frozen local .pack, unless ledgers is itself requested (circular);
// 2. the bulk backend (cfg.Backend), gated by a bounded waitForCoverage on its Tip.
//
// The local pack needs no coverage wait (it is complete) and no close (its reader
// is opened and closed per RawLedgers call). The bulk backend is caller-owned (the
// daemon Closes it), so backfillSource returns no closer either.
// backfillSource picks a chunk's ledger source (+ a closer for an opened hot DB;
// no-op otherwise), in preference order:
// 1. a ready, COMPLETE hot tier (decision (a): maxCommittedSeq >= last ledger);
// incomplete-but-present is staleness that falls through (re-derivation
// recovers it); a "ready" DB that won't open is an ordinary restartable error
// (read-only open, never auto-healed);
// 2. the frozen local .pack, unless ledgers is itself requested (circular);
// 3. the bulk backend, gated by a bounded waitForCoverage on its Tip.
func backfillSource(
ctx context.Context, chunkID chunk.ID, artifacts catalog.ArtifactSet, cfg ProcessConfig,
) (ledgerbackend.LedgerStream, error) {
) (ledgerbackend.LedgerStream, func() error, error) {
noClose := func() error { return nil }
cat := cfg.Catalog
layout := cat.Layout()

// (1) Hot branch: only when the hot key is "ready". A "transient" key (mid-op
// or recovery-demoted) is not a read source; an absent key falls through.
src, closer, used, herr := resolveHotSource(chunkID, cfg)
if herr != nil {
return nil, noClose, herr // hot-DB open failure — restartable, never auto-healed
}
if used {
cfg.Logger.Debugf("backfillSource: chunk %s from complete hot tier", chunkID)
return src, closer, nil
}

// (2) Frozen local .pack, only when ledgers is not requested (producing ledgers
// from the pack we'd write would be circular).
ledgersState, err := cat.State(chunkID, geometry.KindLedgers)
if err != nil {
return nil, fmt.Errorf("read ledgers state chunk %s: %w", chunkID, err)
return nil, noClose, fmt.Errorf("read ledgers state chunk %s: %w", chunkID, err)
}
if ledgersState == geometry.StateFrozen && !artifacts.Has(geometry.KindLedgers) {
packPath := layout.LedgerPackPath(chunkID)
if _, serr := os.Stat(packPath); serr == nil {
cfg.Logger.Debugf("backfillSource: chunk %s re-derived from frozen .pack", chunkID)
return ledger.NewPackStream(packPath), nil
return ledger.NewPackStream(packPath), noClose, nil
}
// frozen ⇒ file exists; a missing pack is a bug, not a re-download trigger.
return nil, fmt.Errorf(
return nil, noClose, fmt.Errorf(
"chunk %s ledgers is %q but pack file is missing at %s",
chunkID, geometry.StateFrozen, packPath)
}

// (3) Bulk backend — the only source for a chunk with no local copy.
if cfg.Backend == nil {
return nil, fmt.Errorf(
return nil, noClose, fmt.Errorf(
"chunk %s has no local copy and no bulk backend is configured", chunkID)
}
// The coverage wait is mandatory before reading the bulk backend: the freeze
Expand All @@ -169,8 +187,59 @@ func backfillSource(
if werr := waitForCoverage(
ctx, cfg.Backend, chunkID.LastLedger(), defaultCoveragePollInterval, defaultCoverageTimeout,
); werr != nil {
return nil, werr
return nil, noClose, werr
}
cfg.Logger.Debugf("backfillSource: chunk %s from bulk backend", chunkID)
return cfg.Backend, nil
return cfg.Backend, noClose, nil
}

// resolveHotSource applies the hot branch end to end: it reads the hot key and,
// only when "ready", tries the hot tier. used=true → src/closer are the hot
// source; used=false → no "ready" key or present-but-incomplete (caller falls
// through); err → a "ready" DB that won't open (restartable). Keeps backfillSource's
// hot branch flat.
func resolveHotSource(
chunkID chunk.ID, cfg ProcessConfig,
) (ledgerbackend.LedgerStream, func() error, bool, error) {
hotState, err := cfg.Catalog.HotState(chunkID)
if err != nil {
return nil, nil, false, fmt.Errorf("read hot state chunk %s: %w", chunkID, err)
}
if hotState != geometry.HotReady {
return nil, nil, false, nil // "transient"/absent: not a read source
}
return tryHotSource(chunkID, cfg)
}

// tryHotSource handles the hot branch under a "ready" key: it opens the chunk's
// shared hot DB read-only (never auto-healed) straight from its Layout path.
// used=true when present AND complete; used=false when present-but-incomplete
// (staleness, caller falls through); err when a "ready" DB is absent or unopenable
// — an ordinary restartable error, detected lazily on the open.
func tryHotSource(chunkID chunk.ID, cfg ProcessConfig) (ledgerbackend.LedgerStream, func() error, bool, error) {
dir := cfg.Catalog.Layout().HotChunkPath(chunkID)
// Open the chunk's shared multi-CF DB READ-ONLY: the freeze reads its ledgers to
// re-derive the cold artifacts and must never mutate it (the read-only open
// replays any un-synced WAL into memtables but persists nothing). An absent or
// gutted "ready" DB fails the open — restartable, never auto-created.
hot, err := hotchunk.OpenReadOnly(dir, chunkID, cfg.Logger)
if err != nil {
return nil, nil, false, fmt.Errorf("chunk %s is ready but its hot DB won't open: %w", chunkID, err)
}
maxSeq, present, merr := hot.MaxCommittedSeq()
if merr != nil {
_ = hot.Close()
// A read error against an opened DB: the DB opened but cannot answer its
// own progress. Surface it (restartable), don't treat as staleness.
return nil, nil, false, fmt.Errorf("chunk %s: read hot max committed seq: %w", chunkID, merr)
}
// decision (a): complete iff the single DB's maxCommittedSeq reaches the chunk's
// last ledger. An empty DB (present==false) cannot be complete.
if present && maxSeq >= chunkID.LastLedger() {
return hot.Source(), hot.Close, true, nil
}
// Present but incomplete: legitimate staleness — caller falls through.
cfg.Logger.Debugf("backfillSource: chunk %s hot tier present but incomplete; falling through", chunkID)
_ = hot.Close()
return nil, nil, false, nil
}
12 changes: 7 additions & 5 deletions cmd/stellar-rpc/internal/fullhistory/backfill/process_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,9 @@ func TestBackfillSource_PrefersFrozenPackWhenLFSNotRequested(t *testing.T) {
cfg.Backend = bulk

set := catalog.NewArtifactSet(geometry.KindEvents, geometry.KindTxHash) // ledgers NOT requested
src, err := backfillSource(context.Background(), chunkID, set, cfg)
src, closeSrc, err := backfillSource(context.Background(), chunkID, set, cfg)
require.NoError(t, err)
defer func() { require.NoError(t, closeSrc()) }()
// It is a pack stream (re-derivation without download); the bulk backend was
// not consulted.
require.IsType(t, ledger.NewPackStream(""), src)
Expand All @@ -354,8 +355,9 @@ func TestBackfillSource_DoesNotUsePackWhenLFSRequested(t *testing.T) {

// ledgers IS requested — the pack branch is skipped (circular), so it goes to
// the bulk backend (whose tip covers the chunk, so the wait passes).
src, err := backfillSource(context.Background(), chunkID, catalog.AllArtifacts(), cfg)
src, closeSrc, err := backfillSource(context.Background(), chunkID, catalog.AllArtifacts(), cfg)
require.NoError(t, err)
defer func() { require.NoError(t, closeSrc()) }()
require.Same(t, bulk, src)
}

Expand All @@ -369,7 +371,7 @@ func TestBackfillSource_BulkCoverageErrorAborts(t *testing.T) {
chunkID := chunk.ID(0)
cfg.Backend = &fakeBackend{t: t, gen: zeroTxLCMBytes, tipErr: errors.New("boom")}

_, err := backfillSource(context.Background(), chunkID, catalog.AllArtifacts(), cfg)
_, _, err := backfillSource(context.Background(), chunkID, catalog.AllArtifacts(), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "backend tip query")
}
Expand All @@ -379,7 +381,7 @@ func TestBackfillSource_NoBackendConfigured(t *testing.T) {
cfg := testProcessConfig(t, cat)
cfg.Backend = nil

_, err := backfillSource(context.Background(), chunk.ID(0), catalog.AllArtifacts(), cfg)
_, _, err := backfillSource(context.Background(), chunk.ID(0), catalog.AllArtifacts(), cfg)
require.Error(t, err)
require.Contains(t, err.Error(), "no bulk backend")
}
Expand Down Expand Up @@ -453,7 +455,7 @@ func writeRealPack(t *testing.T, cat *catalog.Catalog, chunkID chunk.ID) {
stream := &fullChunkStream{t: t, gen: zeroTxLCMBytes}
raw := stream.RawLedgers(context.Background(),
ledgerbackend.BoundedRange(chunkID.FirstLedger(), chunkID.LastLedger()))
dirs := ingest.ColdDirs{Ledgers: cat.Layout().LedgersRoot()}
dirs := ingest.ColdDirs{LedgerPack: cat.Layout().LedgerPackPath(chunkID)}
require.NoError(t, ingest.WriteColdChunk(
context.Background(), silentLogger(), chunkID, raw, dirs,
ingest.NopSink{}, ingest.Config{Ledgers: true}))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,11 @@ func (r *recordingMetrics) Prune(count int, d time.Duration) {
r.prune = append(r.prune, pruneRec{count, d})
}

func (*recordingMetrics) LastCommitted(uint32, uint32) {}
func (*recordingMetrics) BackfillPass(time.Duration) {}
func (*recordingMetrics) LastCommitted(uint32) {}
func (*recordingMetrics) RetentionFloor(uint32) {}
func (*recordingMetrics) ChunkBoundary() {}
func (*recordingMetrics) BackfillPass(time.Duration) {}
func (*recordingMetrics) LiveHotChunks(int) {}
func (*recordingMetrics) Discard(int, time.Duration) {}

var _ observability.Metrics = (*recordingMetrics)(nil)
10 changes: 2 additions & 8 deletions cmd/stellar-rpc/internal/fullhistory/backfill/resolve.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,6 @@ type coverageRange struct {
Lo, Hi chunk.ID
}

// covers reports whether this range fully contains other (other ⊆ this).
func (r coverageRange) covers(other coverageRange) bool {
return r.Lo <= other.Lo && r.Hi >= other.Hi
}

// resolve diffs the desired state (every artifact of [rangeStart, rangeEnd] durable)
// against the catalog, emitting a Plan. A pure read — recomputes from durable keys
// every run, so a restart re-plans cleanly.
Expand Down Expand Up @@ -98,12 +93,11 @@ func resolveTxHashIndex(
Hi: min(txLayout.LastChunk(w), rangeEnd), // capped by range end
}

frozen, hasFrozen, err := cat.FrozenTxHashIndex(w)
covered, err := cat.FrozenIndexCoversRange(w, desired.Lo, desired.Hi)
if err != nil {
return IndexBuild{}, false, err
}
stored := coverageRange{Lo: frozen.Lo, Hi: frozen.Hi}
if hasFrozen && stored.covers(desired) {
if covered {
// Frozen coverage already spans desired, so no rebuild is due — steady state, a
// risen floor, or a finalized window. Any non-frozen leftover a crashed build
// stranded (a superseded "pruning"/"freezing" coverage or a demoted .bin) is the
Expand Down
Loading
Loading