Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
220 changes: 137 additions & 83 deletions cmd/stellar-rpc/internal/fullhistory/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,67 +14,40 @@ import (
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/txhash"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
)

// HotStores holds the long-lived, caller-owned hot stores injected into RunHot.
// The caller (the daemon) opens and closes these; RunHot only borrows them to
// build the per-type hot ingesters. A field left nil for an enabled data type is
// a configuration error caught by RunHot. Every hot store is chunk-bound (each
// instance accumulates exactly one chunk before being frozen into cold
// artifacts), so each injected store must already be bound to the chunk being
// ingested — RunHot rejects a mismatch up front.
// HotStores holds the long-lived, caller-owned per-chunk hot DB injected into
// RunHot. The caller (the daemon) opens and closes it; RunHot only borrows it
// to drive the per-ledger atomic ingest. The DB is chunk-bound (it accumulates
// exactly one chunk before being frozen into cold artifacts), so the injected
// DB must already be bound to the chunk being ingested — RunHot rejects a
// mismatch up front. A nil DB with any data type enabled in cfg is a
// configuration error caught by RunHot.
type HotStores struct {
Ledgers *ledger.HotStore
Txhash *txhash.HotStore
Events *eventstore.HotStore
// HotDB is the per-chunk hot DB. Required when any hot data type is enabled.
HotDB *hotchunk.DB
}

// buildHotIngesters constructs one HotIngester per data type enabled in cfg, in
// canonical ledgers→txhash→events order, from the injected stores. It errors if
// an enabled type's store is nil.
func buildHotIngesters(stores HotStores, sink MetricSink, cfg Config) ([]HotIngester, error) {
var ings []HotIngester
if cfg.Ledgers {
if stores.Ledgers == nil {
return nil, errors.New("ingest: Ledgers enabled but HotStores.Ledgers is nil")
}
ings = append(ings, NewLedgerHotIngester(stores.Ledgers, sink))
}
if cfg.Txhash {
if stores.Txhash == nil {
return nil, errors.New("ingest: Txhash enabled but HotStores.Txhash is nil")
}
ings = append(ings, NewTxhashHotIngester(stores.Txhash, sink))
}
if cfg.Events {
if stores.Events == nil {
return nil, errors.New("ingest: Events enabled but HotStores.Events is nil")
}
ings = append(ings, NewEventsHotIngester(stores.Events, sink))
}
return ings, nil
// ingestContributions translates the ingest Config into the hotchunk.Ingest
// toggle set handed to NewHotService. Only the Ledgers toggle is populated
// here (copied from cfg.Ledgers); every other field of hotchunk.Ingest keeps
// its zero value.
func ingestContributions(cfg Config) hotchunk.Ingest {
	var toggles hotchunk.Ingest
	toggles.Ledgers = cfg.Ledgers
	return toggles
}

// buildColdIngesters opens one ColdIngester per data type enabled in cfg,
// each opening its own per-chunk writer under coldDir/<type> (constructors
// create their own directories and freely overwrite any prior attempt's
// files — see the package doc's artifact model). The constructor table below
// is the single definition site of the canonical ledgers→txhash→events order
// (buildHotIngesters keeps its explicit if-ladder because its three injected
// store types differ). On any constructor error it closes the ingesters built
// so far and returns.
// files — see the package doc's artifact model). On any constructor error it
// closes the ingesters built so far and returns.
func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
dataType string
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, NewLedgerColdIngester},
{cfg.Txhash, dataTypeTxhash, NewTxhashColdIngester},
{cfg.Events, dataTypeEvents, NewEventsColdIngester},
}
var ings []ColdIngester
for _, c := range ctors {
Expand Down Expand Up @@ -123,11 +96,12 @@ func closeColdAll(ings []ColdIngester, err error) error {
}

// RunHot opens one stream for chunkID from source and feeds each ledger (as a
// view) to a HotService over the enabled hot ingesters, built from the INJECTED,
// caller-owned stores in hotStores. Ingest errors abort fast; HotService.Ingest
// waits for all ingesters before the loop pulls again so the borrowed view is
// never read past its lifetime. The hot stores are NOT closed here — the caller
// owns their lifecycle.
// view) to a HotService backed by the INJECTED, caller-owned shared per-chunk
// hot DB in hotStores. Each ledger commits as ONE atomic synced WriteBatch
// across all enabled CFs (decision (a)); Ingest errors abort fast, and
// HotService.Ingest consumes the borrowed view synchronously before the loop
// pulls the next ledger. The hot DB is NOT closed here — the caller owns its
// lifecycle.
func RunHot(
ctx context.Context,
logger *supportlog.Entry,
Expand All @@ -140,47 +114,25 @@ func RunHot(
if verr := cfg.validate(); verr != nil {
return verr
}
// Every hot store is chunk-bound — each instance accumulates exactly one
// chunk's data before being frozen into the chunk's cold artifacts — and
// records its chunk at open time. An injected store bound to a different
// chunk than we're ingesting would silently interleave two chunks' data
// (ledgers, txhash) or fail every per-ledger write with an out-of-range
// offset (events, whose LedgerOffsets are chunk-relative), so catch the
// mismatch up front with a clear message. Nil stores are skipped here:
// buildHotIngesters rejects a nil store for an enabled type with a more
// specific error.
checkBinding := func(name string, got chunk.ID) error {
if got != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected %s store is bound to chunk %d",
uint32(chunkID), name, uint32(got))
}
return nil
}
if cfg.Ledgers && hotStores.Ledgers != nil {
if err := checkBinding("Ledgers", hotStores.Ledgers.ChunkID()); err != nil {
return err
}
}
if cfg.Txhash && hotStores.Txhash != nil {
if err := checkBinding("Txhash", hotStores.Txhash.ChunkID()); err != nil {
return err
}
}
if cfg.Events && hotStores.Events != nil {
if err := checkBinding("Events", hotStores.Events.ChunkID()); err != nil {
return err
}
anyEnabled := cfg.Ledgers
if anyEnabled && hotStores.HotDB == nil {
return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil")
}
ings, berr := buildHotIngesters(hotStores, sink, cfg)
if berr != nil {
return berr
// The hot DB is chunk-bound — it accumulates exactly one chunk's data
// before being frozen into the chunk's cold artifacts — and records its
// chunk at open time. An injected DB bound to a different chunk than we're
// ingesting would silently interleave two chunks' data, so catch the
// mismatch up front with a clear message.
if hotStores.HotDB != nil && hotStores.HotDB.ChunkID() != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected hot DB is bound to chunk %d",
uint32(chunkID), uint32(hotStores.HotDB.ChunkID()))
}
stream, oerr := source.OpenStream(chunkID)
if oerr != nil {
return fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), oerr)
}
logger.Debugf("RunHot: ingesting chunk %d [%d, %d]", uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())
service := NewHotService(ings, sink)
service := NewHotService(hotStores.HotDB, ingestContributions(cfg), sink)
return drain(ctx, stream, chunkID, service)
}

Expand Down Expand Up @@ -235,6 +187,108 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
return nil
}

// ColdDirs names the per-data-type output root for one chunk's cold artifacts.
// Each field is the directory UNDER WHICH the matching cold ingester composes
// its {bucketID:05d}/ subdirectory — i.e. the same `coldDir` the per-type
// constructor (NewLedgerColdIngester) takes. A field left "" for a data type
// enabled in cfg is a configuration error caught by RunColdChunk.
//
// RunCold derives this root from a single coldDir by appending the fixed
// dataType subdirectory (coldDir/ledgers). ColdDirs exists so a caller with a
// DIFFERENT on-disk layout can place each artifact at its own canonical path
// while reusing the very same cold ingesters, ColdService, and drain loop.
type ColdDirs struct {
// Ledgers is the root directory passed to NewLedgerColdIngester when
// cfg.Ledgers is enabled. buildColdIngestersIn rejects an empty value for
// an enabled data type.
Ledgers string
}

// buildColdIngestersIn is the explicit-layout variant of buildColdIngesters:
// for every data type enabled in cfg it calls that type's cold ingester
// constructor with the root taken from the matching ColdDirs field, instead
// of a fixed subdirectory of a single coldDir.
//
// An enabled type whose ColdDirs field is "" is rejected, and a constructor
// failure is wrapped with the data type's name. In both cases the ingesters
// opened so far are handed to closeColdAll together with the error, and a nil
// slice is returned alongside closeColdAll's result.
func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
	// coldSpec describes one data type: whether it is enabled, its name for
	// error messages, its output root, and the constructor that opens it.
	type coldSpec struct {
		enabled  bool
		dataType string
		dir      string
		open     func(string, chunk.ID, MetricSink) (ColdIngester, error)
	}
	specs := []coldSpec{
		{
			enabled:  cfg.Ledgers,
			dataType: dataTypeLedgers,
			dir:      dirs.Ledgers,
			open:     NewLedgerColdIngester,
		},
	}

	var opened []ColdIngester
	for i := range specs {
		spec := specs[i]
		if !spec.enabled {
			continue
		}
		// An enabled type must name its output root explicitly.
		if spec.dir == "" {
			misconfigured := fmt.Errorf("ingest: %s enabled but ColdDirs.%s is empty", spec.dataType, spec.dataType)
			return nil, closeColdAll(opened, misconfigured)
		}
		ingester, openErr := spec.open(spec.dir, chunkID, sink)
		if openErr != nil {
			wrapped := fmt.Errorf("open %s cold ingester: %w", spec.dataType, openErr)
			return nil, closeColdAll(opened, wrapped)
		}
		opened = append(opened, ingester)
	}
	return opened, nil
}

// RunColdChunk produces the cold artifacts for exactly one chunk: it opens a
// single stream for chunkID from source, opens the enabled cold ingesters
// under the per-data-type roots in dirs, and drains the stream into a
// ColdService before finalizing it.
//
// It is the single-chunk, explicit-layout sibling of RunCold. It goes through
// the same cold ingester constructors, NewColdService, and the package's drain
// loop, but takes its output roots from dirs so a caller whose layout is not
// coldDir/<dataType> can reuse the cold pipeline unchanged.
//
// Steps, in order:
//   - cfg.validate() failure is returned as-is.
//   - A nil sink is replaced via orNop.
//   - If ctx is already done, the stream cannot be opened, or the ingesters
//     cannot be built, the elapsed time is reported through
//     sink.ColdChunkTotal and the error is returned.
//   - Otherwise the stream is drained into the service and, on success,
//     service.Finalize(ctx) supplies the result.
//
// service.Close always runs on the way out once the service exists; a Close
// error is joined onto whatever error is already being returned.
//
// Per the package doc's artifact model, the cold ingesters overwrite files
// left at their canonical paths by a prior attempt, so a call can be repeated
// for the same chunk after a crashed attempt.
func RunColdChunk(
	ctx context.Context,
	logger *supportlog.Entry,
	source ChunkSource,
	dirs ColdDirs,
	chunkID chunk.ID,
	sink MetricSink,
	cfg Config,
) (err error) {
	if invalid := cfg.validate(); invalid != nil {
		return invalid
	}

	sink = orNop(sink)
	began := time.Now()
	// abort reports the elapsed time for a chunk that failed before the
	// service was created, then hands the cause back to the caller.
	abort := func(cause error) error {
		sink.ColdChunkTotal(time.Since(began))
		return cause
	}

	if ctxErr := ctx.Err(); ctxErr != nil {
		return abort(ctxErr)
	}

	stream, openErr := source.OpenStream(chunkID)
	if openErr != nil {
		return abort(fmt.Errorf("open stream for chunk %d: %w", uint32(chunkID), openErr))
	}

	ingesters, buildErr := buildColdIngestersIn(dirs, chunkID, sink, cfg)
	if buildErr != nil {
		return abort(buildErr)
	}

	logger.Debugf("RunColdChunk: ingesting chunk %d [%d, %d]",
		uint32(chunkID), chunkID.FirstLedger(), chunkID.LastLedger())

	service := NewColdService(ingesters, sink)
	// Close runs after the return value is set, so a Close failure is joined
	// onto the named result rather than replacing it.
	defer func() {
		closeErr := service.Close()
		if closeErr == nil {
			return
		}
		err = errors.Join(err, fmt.Errorf("close: %w", closeErr))
	}()

	drainErr := drain(ctx, stream, chunkID, service)
	if drainErr != nil {
		return drainErr
	}
	return service.Finalize(ctx)
}

// RunCold ingests numChunks consecutive chunks starting at startChunk into the
// cold stores under coldDir, processing up to chunkWorkers chunks concurrently.
// Each chunk worker opens its own stream via source.OpenStream(chunkID), builds
Expand Down
Loading
Loading