Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
6883ce4
feat(fullhistory/streaming): meta-store catalog + one write protocol …
chowbao Jun 18, 2026
8a65f00
feat(fullhistory/streaming): processChunk + catchupSource
chowbao Jun 18, 2026
6fb7add
feat(fullhistory/streaming): tx-hash rolling rebuild + coverage protocol
chowbao Jun 18, 2026
460ccdf
feat(fullhistory/streaming): postcondition resolver + executor
chowbao Jun 18, 2026
257f742
refactor(fullhistory): collapse per-chunk hot stores into one multi-C…
chowbao Jun 18, 2026
410b996
feat(fullhistory/streaming): derived progress (deriveWatermark/derive…
chowbao Jun 18, 2026
9cc4537
feat(fullhistory/streaming): hot-DB ingestion loop
chowbao Jun 18, 2026
cef49f6
feat(fullhistory/streaming): lifecycle tick + loop (clean shutdown)
chowbao Jun 18, 2026
4937a59
feat(fullhistory/streaming): startup orchestration (startStreaming)
chowbao Jun 18, 2026
ea6f11e
test(fullhistory/streaming): close derived-progress coverage gaps
chowbao Jun 18, 2026
d0d9c9e
feat(fullhistory/streaming): config schema + validateConfig + single-…
chowbao Jun 18, 2026
b1c2445
feat(fullhistory/streaming): retention widen/shorten + reader-retenti…
chowbao Jun 18, 2026
9911731
feat(fullhistory/streaming): surgical recovery + hot-volume-loss hand…
chowbao Jun 18, 2026
3ef68ad
feat(fullhistory/streaming): audit command (INV-1..4 invariant walks)
chowbao Jun 18, 2026
edf512f
fix(full-history): audit completes (not aborts) when an INV-2 two-fro…
chowbao Jun 18, 2026
00a8a07
feat(fullhistory/streaming): runnable streaming-daemon entrypoint + c…
chowbao Jun 18, 2026
93b88e1
fix(fullhistory/streaming): honor storage-path overrides in the data …
chowbao Jun 18, 2026
66db3de
feat(fullhistory/streaming): observability — metrics + structured log…
chowbao Jun 18, 2026
f801c09
fix(fullhistory/streaming): real steady-state liveness signal + log/l…
chowbao Jun 18, 2026
f80259f
test(fullhistory/streaming): crash-injection + convergence suite (INV…
chowbao Jun 18, 2026
c9083e2
test(fullhistory/streaming): document convergence caveats; verify cas…
chowbao Jun 18, 2026
ec731b5
test(fullhistory/streaming): end-to-end daemon integration (in-process)
chowbao Jun 18, 2026
424e60f
test/docs(fullhistory/streaming): tx-hash format alignment + perf exp…
chowbao Jun 18, 2026
ecafbd0
refactor(fullhistory/streaming): rename lfs->ledgers, catch_up->backf…
chowbao Jun 18, 2026
dd7e5b7
refactor(fullhistory/streaming): align execution layer to design c586…
chowbao Jun 18, 2026
3280595
test(fullhistory/streaming): address review -- preserve assertion intent
chowbao Jun 18, 2026
68adacc
fix(fullhistory/streaming): join lifecycle goroutine per supervise it…
chowbao Jun 18, 2026
295aa92
docs(fullhistory/streaming): address review -- comment accuracy + wid…
chowbao Jun 18, 2026
856330d
docs(full-history): add implementation issue breakdown + status trace…
chowbao Jun 18, 2026
566792b
refactor(fullhistory/streaming): organize package -- doc map, layer-g…
chowbao Jun 19, 2026
aa599c1
refactor(fullhistory): /simplify pass -- prune onto RetentionGate, sl…
chowbao Jun 19, 2026
93091a4
docs(fullhistory/streaming): clarify surgical-recovery Hi=live contra…
chowbao Jun 19, 2026
974675a
docs(fullhistory): reconcile impl breakdown with design + add vertica…
chowbao Jun 23, 2026
3aa50f5
fix(hotchunk): adapt eventPayloads to the internal/events API (post-r…
chowbao Jun 23, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
227 changes: 147 additions & 80 deletions cmd/stellar-rpc/internal/fullhistory/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,37 @@ import (
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
)

// HotStores holds the long-lived, caller-owned hot stores injected into RunHot.
// The caller (the daemon) opens and closes these; RunHot only borrows them to
// build the per-type hot ingesters. A field left nil for an enabled data type is
// a configuration error caught by RunHot. Every hot store is chunk-bound (each
// instance accumulates exactly one chunk before being frozen into cold
// artifacts), so each injected store must already be bound to the chunk being
// ingested — RunHot rejects a mismatch up front.
// HotStores holds the long-lived, caller-owned shared per-chunk hot DB injected
// into RunHot. The caller (the daemon) opens and closes it; RunHot only borrows
// it to drive the per-ledger atomic ingest. Under decision (a) this is ONE
// multi-CF RocksDB instance (ledgers + events CFs + txhash CFs), not three
// independent stores. The DB is chunk-bound (it accumulates exactly one chunk
// before being frozen into cold artifacts), so the injected DB must already be
// bound to the chunk being ingested — RunHot rejects a mismatch up front. A nil
// DB with any data type enabled in cfg is a configuration error caught by
// RunHot.
type HotStores struct {
Ledgers *ledger.HotStore
Txhash *txhash.HotStore
Events *eventstore.HotStore
// HotDB is the shared per-chunk multi-CF hot DB. Required when any hot data
// type is enabled.
HotDB *hotchunk.DB
}

// buildHotIngesters constructs one HotIngester per data type enabled in cfg, in
// canonical ledgers→txhash→events order, from the injected stores. It errors if
// an enabled type's store is nil.
func buildHotIngesters(stores HotStores, sink MetricSink, cfg Config) ([]HotIngester, error) {
var ings []HotIngester
if cfg.Ledgers {
if stores.Ledgers == nil {
return nil, errors.New("ingest: Ledgers enabled but HotStores.Ledgers is nil")
}
ings = append(ings, NewLedgerHotIngester(stores.Ledgers, sink))
}
if cfg.Txhash {
if stores.Txhash == nil {
return nil, errors.New("ingest: Txhash enabled but HotStores.Txhash is nil")
}
ings = append(ings, NewTxhashHotIngester(stores.Txhash, sink))
}
if cfg.Events {
if stores.Events == nil {
return nil, errors.New("ingest: Events enabled but HotStores.Events is nil")
}
ings = append(ings, NewEventsHotIngester(stores.Events, sink))
}
return ings, nil
// ingestContributions maps the ingest Config's enabled data types onto the
// hotchunk.Ingest toggles that select which CFs the single per-ledger batch
// writes.
func ingestContributions(cfg Config) hotchunk.Ingest {
return hotchunk.Ingest{Ledgers: cfg.Ledgers, Txhash: cfg.Txhash, Events: cfg.Events}
}

// buildColdIngesters opens one ColdIngester per data type enabled in cfg,
// each opening its own per-chunk writer under coldDir/<type> (constructors
// create their own directories and freely overwrite any prior attempt's
// files — see the package doc's artifact model). The constructor table below
// is the single definition site of the canonical ledgers→txhash→events order
// (buildHotIngesters keeps its explicit if-ladder because its three injected
// store types differ). On any constructor error it closes the ingesters built
// so far and returns.
// is the single definition site of the canonical ledgers→txhash→events order.
// On any constructor error it closes the ingesters built so far and returns.
func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
Expand Down Expand Up @@ -123,11 +102,12 @@ func closeColdAll(ings []ColdIngester, err error) error {
}

// RunHot opens one stream for chunkID from source and feeds each ledger (as a
// view) to a HotService over the enabled hot ingesters, built from the INJECTED,
// caller-owned stores in hotStores. Ingest errors abort fast; HotService.Ingest
// waits for all ingesters before the loop pulls again so the borrowed view is
// never read past its lifetime. The hot stores are NOT closed here — the caller
// owns their lifecycle.
// view) to a HotService backed by the INJECTED, caller-owned shared per-chunk
// hot DB in hotStores. Each ledger commits as ONE atomic synced WriteBatch
// across all enabled CFs (decision (a)); Ingest errors abort fast, and
// HotService.Ingest consumes the borrowed view synchronously before the loop
// pulls the next ledger. The hot DB is NOT closed here — the caller owns its
// lifecycle.
func RunHot(
ctx context.Context,
logger *supportlog.Entry,
Expand All @@ -140,47 +120,26 @@ func RunHot(
if verr := cfg.validate(); verr != nil {
return verr
}
// Every hot store is chunk-bound — each instance accumulates exactly one
// chunk's data before being frozen into the chunk's cold artifacts — and
// records its chunk at open time. An injected store bound to a different
// chunk than we're ingesting would silently interleave two chunks' data
// (ledgers, txhash) or fail every per-ledger write with an out-of-range
// offset (events, whose LedgerOffsets are chunk-relative), so catch the
// mismatch up front with a clear message. Nil stores are skipped here:
// buildHotIngesters rejects a nil store for an enabled type with a more
// specific error.
checkBinding := func(name string, got chunk.ID) error {
if got != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected %s store is bound to chunk %d",
uint32(chunkID), name, uint32(got))
}
return nil
}
if cfg.Ledgers && hotStores.Ledgers != nil {
if err := checkBinding("Ledgers", hotStores.Ledgers.ChunkID()); err != nil {
return err
}
}
if cfg.Txhash && hotStores.Txhash != nil {
if err := checkBinding("Txhash", hotStores.Txhash.ChunkID()); err != nil {
return err
}
}
if cfg.Events && hotStores.Events != nil {
if err := checkBinding("Events", hotStores.Events.ChunkID()); err != nil {
return err
}
anyEnabled := cfg.Ledgers || cfg.Txhash || cfg.Events
if anyEnabled && hotStores.HotDB == nil {
return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil")
}
ings, berr := buildHotIngesters(hotStores, sink, cfg)
if berr != nil {
return berr
// The shared hot DB is chunk-bound — it accumulates exactly one chunk's
// data before being frozen into the chunk's cold artifacts — and records
// its chunk at open time. An injected DB bound to a different chunk than
// we're ingesting would silently interleave two chunks' data or fail every
// per-ledger events write with an out-of-range offset (LedgerOffsets are
// chunk-relative), so catch the mismatch up front with a clear message.
if hotStores.HotDB != nil && hotStores.HotDB.ChunkID() != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected hot DB is bound to chunk %d",
uint32(chunkID), uint32(hotStores.HotDB.ChunkID()))
}
stream, oerr := source.OpenStream(chunkID)
if oerr != nil {
return fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), oerr)
}
logger.Debugf("RunHot: ingesting chunk %d [%d, %d]", uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())
service := NewHotService(ings, sink)
service := NewHotService(hotStores.HotDB, ingestContributions(cfg), sink)
return drain(ctx, stream, chunkID, service)
}

Expand Down Expand Up @@ -235,6 +194,114 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
return nil
}

// ColdDirs names the per-data-type output root for one chunk's cold artifacts.
// Each field is the directory UNDER WHICH the matching cold ingester composes
// its {bucketID:05d}/ subdirectory — i.e. the same `coldDir` the per-type
// constructor (NewLedgerColdIngester / NewTxhashColdIngester /
// NewEventsColdIngester) takes. A field left "" for a data type enabled in cfg
// is a configuration error caught by RunColdChunk.
//
// RunCold derives these three roots from a single coldDir by appending the
// fixed dataType subdirectory (coldDir/ledgers, coldDir/txhash, coldDir/events).
// ColdDirs exists so a caller with a DIFFERENT on-disk layout (e.g. the
// streaming daemon, whose raw txhash runs live under txhash/raw, not txhash)
// can place each artifact at its own canonical path while reusing the very same
// cold ingesters, ColdService, and drain loop.
type ColdDirs struct {
Ledgers string
Txhash string
Events string
}

// buildColdIngestersIn opens one ColdIngester per data type enabled in cfg,
// each under its OWN root from dirs (rather than coldDir/<dataType>). It is the
// ColdDirs counterpart of buildColdIngesters: same constructors, same canonical
// ledgers→txhash→events order, same rollback-on-constructor-error semantics; it
// differs only in resolving each type's root from an explicit field instead of
// a fixed subdirectory of one coldDir.
func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
dataType string
dir string
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, dirs.Ledgers, NewLedgerColdIngester},
{cfg.Txhash, dataTypeTxhash, dirs.Txhash, NewTxhashColdIngester},
{cfg.Events, dataTypeEvents, dirs.Events, NewEventsColdIngester},
}
var ings []ColdIngester
for _, c := range ctors {
if !c.enabled {
continue
}
if c.dir == "" {
return nil, closeColdAll(ings, fmt.Errorf("ingest: %s enabled but ColdDirs.%s is empty", c.dataType, c.dataType))
}
ing, err := c.open(c.dir, chunkID, sink)
if err != nil {
return nil, closeColdAll(ings, fmt.Errorf("open %s cold ingester: %w", c.dataType, err))
}
ings = append(ings, ing)
}
return ings, nil
}

// RunColdChunk ingests EXACTLY ONE chunk's cold artifacts from source into the
// per-data-type roots named by dirs, in a single streaming pass over the
// chunk's ledgers. It is the single-chunk, explicit-layout sibling of RunCold:
// it reuses the same cold ingester constructors, the same ColdService, and the
// same drain loop (sequence/overrun validation, full-range completeness check
// before Finalize), differing only in (1) producing one chunk rather than N
// concurrent chunks and (2) taking explicit per-type output roots so a caller
// whose layout is not coldDir/<dataType> can still reuse the cold pipeline
// verbatim.
//
// The cold ingesters overwrite any prior attempt's files at their canonical
// paths (see the package doc's artifact model), so RunColdChunk is the
// re-materialization primitive the streaming freeze protocol drives: a partial
// file from a crashed attempt is inert scratch the next call overwrites.
func RunColdChunk(
ctx context.Context,
logger *supportlog.Entry,
source ChunkSource,
dirs ColdDirs,
chunkID chunk.ID,
sink MetricSink,
cfg Config,
) (err error) {
if verr := cfg.validate(); verr != nil {
return verr
}
sink = orNop(sink)
start := time.Now()
if cerr := ctx.Err(); cerr != nil {
sink.ColdChunkTotal(time.Since(start))
return cerr
}
stream, oerr := source.OpenStream(chunkID)
if oerr != nil {
sink.ColdChunkTotal(time.Since(start))
return fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), oerr)
}
ings, berr := buildColdIngestersIn(dirs, chunkID, sink, cfg)
if berr != nil {
sink.ColdChunkTotal(time.Since(start))
return berr
}
logger.Debugf("RunColdChunk: ingesting chunk %d [%d, %d]", uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())
service := NewColdService(ings, sink)
defer func() {
if cerr := service.Close(); cerr != nil {
err = errors.Join(err, fmt.Errorf("close: %w", cerr))
}
}()
if derr := drain(ctx, stream, chunkID, service); derr != nil {
return derr
}
return service.Finalize(ctx)
}

// RunCold ingests numChunks consecutive chunks starting at startChunk into the
// cold stores under coldDir, processing up to chunkWorkers chunks concurrently.
// Each chunk worker opens its own stream via source.OpenStream(chunkID), builds
Expand Down
Loading
Loading