Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 38 additions & 27 deletions cmd/stellar-rpc/internal/fullhistory/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,37 +17,42 @@ import (
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
)

// HotStores holds the long-lived, caller-owned shared per-chunk hot DB injected
// into RunHot. The caller (the daemon) opens and closes it; RunHot only borrows
// it to drive the per-ledger atomic ingest. Under decision (a) this is ONE
// multi-CF RocksDB instance (ledgers + events CFs + txhash CFs), not three
// independent stores. The DB is chunk-bound (it accumulates exactly one chunk
// before being frozen into cold artifacts), so the injected DB must already be
// bound to the chunk being ingested — RunHot rejects a mismatch up front. A nil
// DB with any data type enabled in cfg is a configuration error caught by
// RunHot.
type HotStores struct {
// HotDB is the shared per-chunk multi-CF hot DB. Required when any hot data
// type (ledgers, txhash, or events) is enabled; RunHot returns an error if it
// is nil in that case.
HotDB *hotchunk.DB
}

// ingestContributions maps the ingest Config's enabled data types onto the
// hotchunk.Ingest toggles that select which CFs the single per-ledger batch
// writes.
//
// All three toggles (Ledgers, Txhash, Events) are forwarded verbatim from cfg,
// so the hot path writes exactly the data types that RunHot's "any hot data
// type enabled" check considers. Omitting Txhash here would silently skip the
// txhash CFs even when cfg.Txhash is set.
func ingestContributions(cfg Config) hotchunk.Ingest {
	return hotchunk.Ingest{Ledgers: cfg.Ledgers, Txhash: cfg.Txhash, Events: cfg.Events}
}

// buildColdIngesters opens one ColdIngester per data type enabled in cfg,
// each opening its own per-chunk writer under coldDir/<type> (constructors
// create their own directories and freely overwrite any prior attempt's
// files — see the package doc's artifact model). On any constructor error it
// closes the ingesters built so far and returns.
// files — see the package doc's artifact model). The constructor table below
// is the single definition site of the canonical ledgers→txhash→events order.
// On any constructor error it closes the ingesters built so far and returns.
func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
dataType string
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, NewLedgerColdIngester},
{cfg.Txhash, dataTypeTxhash, NewTxhashColdIngester},
{cfg.Events, dataTypeEvents, NewEventsColdIngester},
}
var ings []ColdIngester
Expand Down Expand Up @@ -115,15 +120,16 @@ func RunHot(
if verr := cfg.validate(); verr != nil {
return verr
}
anyEnabled := cfg.Ledgers || cfg.Events
anyEnabled := cfg.Ledgers || cfg.Txhash || cfg.Events
if anyEnabled && hotStores.HotDB == nil {
return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil")
}
// The hot DB is chunk-bound — it accumulates exactly one chunk's data
// before being frozen into the chunk's cold artifacts — and records its
// chunk at open time. An injected DB bound to a different chunk than we're
// ingesting would silently interleave two chunks' data, so catch the
// mismatch up front with a clear message.
// The shared hot DB is chunk-bound — it accumulates exactly one chunk's
// data before being frozen into the chunk's cold artifacts — and records
// its chunk at open time. An injected DB bound to a different chunk than
// we're ingesting would silently interleave two chunks' data or fail every
// per-ledger events write with an out-of-range offset (LedgerOffsets are
// chunk-relative), so catch the mismatch up front with a clear message.
if hotStores.HotDB != nil && hotStores.HotDB.ChunkID() != chunkID {
return fmt.Errorf("ingest: RunHot chunk %d but injected hot DB is bound to chunk %d",
uint32(chunkID), uint32(hotStores.HotDB.ChunkID()))
Expand Down Expand Up @@ -191,24 +197,28 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
// ColdDirs names the per-data-type output root for one chunk's cold artifacts.
// Each field is the directory UNDER WHICH the matching cold ingester composes
// its {bucketID:05d}/ subdirectory — i.e. the same `coldDir` the per-type
// constructor (NewLedgerColdIngester / NewTxhashColdIngester /
// NewEventsColdIngester) takes. A field left "" for a data type enabled in cfg
// is a configuration error caught by RunColdChunk.
//
// RunCold derives these three roots from a single coldDir by appending the
// fixed dataType subdirectory (coldDir/ledgers, coldDir/txhash, coldDir/events).
// ColdDirs exists so a caller with a DIFFERENT on-disk layout (e.g. the
// streaming daemon, whose raw txhash runs live under txhash/raw, not txhash)
// can place each artifact at its own canonical path while reusing the very same
// cold ingesters, ColdService, and drain loop.
type ColdDirs struct {
// Ledgers is the root directory handed to NewLedgerColdIngester.
Ledgers string
// Txhash is the root directory handed to NewTxhashColdIngester.
Txhash string
// Events is the root directory handed to NewEventsColdIngester.
Events string
}

// buildColdIngestersIn opens one ColdIngester per data type enabled in cfg,
// each under its OWN root from dirs (rather than coldDir/<dataType>). It is the
// ColdDirs counterpart of buildColdIngesters: same constructors, same
// rollback-on-constructor-error semantics; it differs only in resolving each
// type's root from an explicit field instead of a fixed subdirectory of one
// coldDir.
// ColdDirs counterpart of buildColdIngesters: same constructors, same canonical
// ledgers→txhash→events order, same rollback-on-constructor-error semantics; it
// differs only in resolving each type's root from an explicit field instead of
// a fixed subdirectory of one coldDir.
func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg Config) ([]ColdIngester, error) {
ctors := []struct {
enabled bool
Expand All @@ -217,6 +227,7 @@ func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, dirs.Ledgers, NewLedgerColdIngester},
{cfg.Txhash, dataTypeTxhash, dirs.Txhash, NewTxhashColdIngester},
{cfg.Events, dataTypeEvents, dirs.Events, NewEventsColdIngester},
}
var ings []ColdIngester
Expand Down
Loading
Loading