diff --git a/cmd/stellar-rpc/internal/fullhistory/ingest/driver.go b/cmd/stellar-rpc/internal/fullhistory/ingest/driver.go index c333d5236..0cf3e9bcf 100644 --- a/cmd/stellar-rpc/internal/fullhistory/ingest/driver.go +++ b/cmd/stellar-rpc/internal/fullhistory/ingest/driver.go @@ -33,7 +33,7 @@ type HotStores struct { // hotchunk.Ingest toggles that select which CFs the single per-ledger batch // writes. func ingestContributions(cfg Config) hotchunk.Ingest { - return hotchunk.Ingest{Ledgers: cfg.Ledgers} + return hotchunk.Ingest{Ledgers: cfg.Ledgers, Events: cfg.Events} } // buildColdIngesters opens one ColdIngester per data type enabled in cfg, @@ -48,6 +48,7 @@ func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg C open func(string, chunk.ID, MetricSink) (ColdIngester, error) }{ {cfg.Ledgers, dataTypeLedgers, NewLedgerColdIngester}, + {cfg.Events, dataTypeEvents, NewEventsColdIngester}, } var ings []ColdIngester for _, c := range ctors { @@ -114,7 +115,7 @@ func RunHot( if verr := cfg.validate(); verr != nil { return verr } - anyEnabled := cfg.Ledgers + anyEnabled := cfg.Ledgers || cfg.Events if anyEnabled && hotStores.HotDB == nil { return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil") } @@ -199,6 +200,7 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk // while reusing the very same cold ingesters, ColdService, and drain loop. type ColdDirs struct { Ledgers string + Events string } // buildColdIngestersIn opens one ColdIngester per data type enabled in cfg, @@ -215,6 +217,7 @@ func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg open func(string, chunk.ID, MetricSink) (ColdIngester, error) }{ {cfg.Ledgers, dataTypeLedgers, dirs.Ledgers, NewLedgerColdIngester}, + {cfg.Events, dataTypeEvents, dirs.Events, NewEventsColdIngester}, } var ings []ColdIngester for _, c := range ctors { diff --git a/cmd/stellar-rpc/internal/fullhistory/ingest/ingest_test.go b/cmd/stellar-rpc/internal/fullhistory/ingest/ingest_test.go index dcd040df1..c4ce5dbb9 100644 --- a/cmd/stellar-rpc/internal/fullhistory/ingest/ingest_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/ingest/ingest_test.go @@ -21,7 +21,9 @@ import ( supportlog "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/events" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger" ) @@ -277,6 +279,50 @@ func buildLCMReturningHashes( return lcm, hashes } +// eventTopic is the contract-event topic the events fixtures share, so two +// fixtures' events resolve to the same term key. +const eventTopic = "ingest_test" + +// eventLCM builds a V2 LCM at seq carrying one transaction that emits a single +// contract event. Returns the wire bytes and the event's term key. +func eventLCM(t *testing.T, seq uint32) ([]byte, events.TermKey) { + t.Helper() + ev := buildContractEvent(eventTopic) + meta := xdr.TransactionMeta{ + V: 4, + V4: &xdr.TransactionMetaV4{Operations: []xdr.OperationMetaV2{{Events: []xdr.ContractEvent{ev}}}}, + } + raw, err := buildLCM(t, seq, []xdr.TransactionMeta{meta}).MarshalBinary() + require.NoError(t, err) + + evBytes, err := ev.MarshalBinary() + require.NoError(t, err) + keys, err := events.TermsForBytes(evBytes) + require.NoError(t, err) + require.NotEmpty(t, keys) + return raw, keys[0] +} + +// buildContractEvent returns a contract ContractEvent with a single symbol +// topic, the minimal shape the events extractor indexes. +func buildContractEvent(topic string) xdr.ContractEvent { + var contractID xdr.ContractId + contractID[0] = 0xab + contractID[1] = 0xcd + sym := xdr.ScSymbol(topic) + return xdr.ContractEvent{ + ContractId: &contractID, + Type: xdr.ContractEventTypeContract, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Topics: []xdr.ScVal{{Type: xdr.ScValTypeScvSymbol, Sym: &sym}}, + Data: xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &sym}, + }, + }, + } +} + func testLogger() *supportlog.Entry { l := supportlog.New() l.SetLevel(logrus.ErrorLevel) @@ -499,6 +545,54 @@ func TestColdService_Success(t *testing.T) { require.Len(t, sink.coldIngests, 1, "Close after Finalize must not re-emit per-ingester signals") } +// TestColdService_LedgersAndEvents drives BOTH the ledger and events cold +// ingesters through one ColdService over event-bearing ledgers (via the +// explicit-dirs builder processChunk uses), then reads back the ledger pack +// AND the events cold segment, proving the events kind lands across CFs on the +// cold path. +func TestColdService_LedgersAndEvents(t *testing.T) { + chunkID := chunk.ID(0) + first := chunkID.FirstLedger() + dirs := ColdDirs{Ledgers: t.TempDir(), Events: t.TempDir()} + sink := &testSink{} + + ings, err := buildColdIngestersIn(dirs, chunkID, sink, Config{Ledgers: true, Events: true}) + require.NoError(t, err) + service := NewColdService(ings, sink) + defer func() { require.NoError(t, service.Close()) }() + + raw0, term0 := eventLCM(t, first) + raw1, _ := eventLCM(t, first+1) + require.NoError(t, service.Ingest(context.Background(), first, xdr.LedgerCloseMetaView(raw0))) + require.NoError(t, service.Ingest(context.Background(), first+1, xdr.LedgerCloseMetaView(raw1))) + require.NoError(t, service.Finalize(context.Background())) + + // Ledger cold readback: the boundary ledger reads back to the right bytes. + lcr, err := ledger.OpenColdReader(packPath(dirs.Ledgers, chunkID)) + require.NoError(t, err) + defer func() { require.NoError(t, lcr.Close()) }() + gotFirst, err := lcr.GetLedgerRaw(first) + require.NoError(t, err) + require.Equal(t, raw0, gotFirst) + + // Events cold readback: the shared event term resolves to both ledgers' + // events in the frozen cold segment. + ecr, err := eventstore.OpenColdReader( + chunkID, filepath.Join(dirs.Events, chunkID.BucketID()), eventstore.ColdReaderOptions{}) + require.NoError(t, err) + defer func() { require.NoError(t, ecr.Close()) }() + bm, err := ecr.Lookup(context.Background(), term0) + require.NoError(t, err) + require.NotNil(t, bm) + require.Equal(t, uint64(2), bm.GetCardinality(), "both ledgers share the event term") + + // Metrics: one ColdIngest per data type, no errors. + cdt := sink.coldDataTypes() + require.Equal(t, 1, cdt[dataTypeLedgers]) + require.Equal(t, 1, cdt[dataTypeEvents]) + require.Empty(t, sink.coldErrorTypes(), "success path records no ingester errors") +} + // failingCold is a ColdIngester whose Ingest always fails, modeling a mid-chunk // error. Finalize must NOT run on this path. type failingCold struct { diff --git a/cmd/stellar-rpc/internal/fullhistory/ingest/service.go b/cmd/stellar-rpc/internal/fullhistory/ingest/service.go index a6fb16722..56cb19946 100644 --- a/cmd/stellar-rpc/internal/fullhistory/ingest/service.go +++ b/cmd/stellar-rpc/internal/fullhistory/ingest/service.go @@ -64,6 +64,9 @@ func (s *HotService) emit(counts hotchunk.LedgerCounts, d time.Duration, err err if s.cfg.Ledgers { s.sink.HotIngest(dataTypeLedgers, d, itemsOnSuccess(counts.Ledgers, err), err) } + if s.cfg.Events { + s.sink.HotIngest(dataTypeEvents, d, itemsOnSuccess(counts.Events, err), err) + } } // itemsOnSuccess returns n on success and 0 on error — a failed atomic batch diff --git a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore/hot_store.go b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore/hot_store.go index 0b95fc8ef..50af4de31 100644 --- a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore/hot_store.go +++ b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore/hot_store.go @@ -79,6 +79,16 @@ func hotStoreCFOptions() map[string]rocksdb.CFOptions { } } +// CFNames returns the three column families this facade owns. Exported +// so the hotchunk shared-DB opener can register them alongside the +// ledger and txhash CFs (decision (a)). +func CFNames() []string { return []string{DataCF, IndexCF, OffsetsCF} } + +// CFOptions returns this facade's per-CF options (ZSTD on DataCF, tuned +// block sizes on all three). Exported so the hotchunk opener merges +// them into the shared per-chunk DB's PerCFOptions. +func CFOptions() map[string]rocksdb.CFOptions { return hotStoreCFOptions() } + // openHotChunk opens (or creates) chunkID's per-Chunk hot RocksDB DB // at HotChunkDir(dataDir, chunkID). The three per-Chunk CFs are // configured at New so they auto-create on a fresh DB and are @@ -153,6 +163,11 @@ type HotStore struct { chunkID chunk.ID mirror *events.ConcurrentBitmaps offsets *events.ConcurrentLedgerOffsets + // ownsStore is true when this HotStore opened its own dedicated DB + // (standalone OpenHotStore); false when wrapping the SHARED + // per-chunk multi-CF DB injected via NewWithStore (decision (a)), + // which the hotchunk.DB owns and closes once. + ownsStore bool } // Compile-time guard: *HotStore satisfies Reader. @@ -178,13 +193,31 @@ func OpenHotStore( if err != nil { return nil, err } - mirror, offsets, err := warmup(chunkStore, chunkID) + h, err := NewWithStore(chunkStore, chunkID) if err != nil { _ = chunkStore.Close() + return nil, err + } + h.ownsStore = true + return h, nil +} + +// NewWithStore wraps an ALREADY-OPEN rocksdb.Store as an events +// HotStore operating on the three events CFs (CFNames()), running the +// mandatory warmup over them to reconstruct the in-memory mirror + +// offsets. The store is NOT owned by the returned HotStore (Close is a +// no-op) — this is the constructor the hotchunk package uses to compose +// the events facade over the shared per-chunk multi-CF DB (decision +// (a)). The store must have been opened with CFNames() registered and +// CFOptions() applied. A warmup failure returns the error WITHOUT +// closing the shared store (the caller owns it). +func NewWithStore(store *rocksdb.Store, chunkID chunk.ID) (*HotStore, error) { + mirror, offsets, err := warmup(store, chunkID) + if err != nil { return nil, fmt.Errorf("events: warmup chunk %s: %w", chunkID, err) } return &HotStore{ - chunkStore: chunkStore, + chunkStore: store, chunkID: chunkID, mirror: mirror, offsets: offsets, @@ -203,6 +236,9 @@ func OpenHotStore( // race with either; chunkStore's IsClosed check inside // IngestLedgerEvents fast-fails any post-Close ingest attempt. func (h *HotStore) Close() error { + if !h.ownsStore { + return nil + } return h.chunkStore.Close() } @@ -453,7 +489,8 @@ func (h *HotStore) FetchRange(ctx context.Context, start, count uint32) iter.Seq if yielded != count { yield(events.Payload{}, fmt.Errorf( "events: FetchRange short scan for chunk %s: got %d of %d events at [%d, %d)", - h.chunkID, yielded, count, start, start+count)) + h.chunkID, yielded, count, start, start+count, + )) } } } @@ -509,18 +546,116 @@ func (h *HotStore) All(ctx context.Context) iter.Seq2[events.Payload, error] { // failure there panics rather than returning an error, because a // returned error would leave on-disk state ahead of in-memory state // with no clean recovery short of close + reopen. -// -//nolint:cyclop // sequential pipeline: validate -> marshal -> batch -> mirror updates func (h *HotStore) IngestLedgerEvents(ledgerSeq uint32, payloads []events.Payload) error { if h.chunkStore.IsClosed() { return ErrClosed } - // Validate ledger sequence BEFORE any disk write or mirror mutation. - // Failing the offsets.Append check after the RocksDB batch has - // committed would leave events orphaned under a bad ledger key. + // Atomic batch on the (here single-purpose) chunk DB: queue every CF + // Put for this ledger, commit once with sync=true, then apply the + // post-commit mirror/offsets update. This is the same prepare → queue + // → commit → apply pipeline the hotchunk package drives across the + // shared multi-CF DB; here the batch holds only the events CFs. + apply, err := h.IngestLedgerToBatchCommit(ledgerSeq, payloads) + if err != nil { + return err + } + if apply != nil { + apply() + } + return nil +} + +// IngestLedgerToBatchCommit is IngestLedgerEvents over a batch this +// facade owns end-to-end (validate → marshal → one synced batch). It +// returns the post-commit apply hook (mirror+offsets) the caller must +// run after the batch is durable, or (nil, nil) for an idempotent +// duplicate no-op. Split out so IngestLedgerToBatch can share the +// prepare step while committing into a SHARED cross-CF batch instead. +func (h *HotStore) IngestLedgerToBatchCommit(ledgerSeq uint32, payloads []events.Payload) (func(), error) { + prep, err := h.prepareLedger(ledgerSeq, payloads) + if err != nil { + return nil, err + } + if prep == nil { + return nil, nil //nolint:nilnil // idempotent duplicate no-op: nil apply hook, no error + } + if cerr := h.chunkStore.Batch(func(b *rocksdb.BatchWriter) error { + return prep.queue(b) + }); cerr != nil { + return nil, fmt.Errorf("events: commit ledger %d to chunk %s: %w", ledgerSeq, h.chunkID, cerr) + } + return prep.apply, nil +} + +// IngestLedgerToBatch validates+marshals one ledger's events and queues +// all their CF Puts (DataCF/IndexCF/OffsetsCF) into the SHARED batch b, +// returning the post-commit apply hook (mirror+offsets) the caller runs +// AFTER b commits durably (decision (a): one atomic synced WriteBatch +// per ledger across all CFs). Returns (nil, nil) for an idempotent +// duplicate no-op — the caller queues nothing for events and the apply +// hook is absent. All validation (range/order/overflow) and term +// derivation happen up front, so a rejected ledger leaves the shared +// batch untouched. +func (h *HotStore) IngestLedgerToBatch( + b *rocksdb.BatchWriter, ledgerSeq uint32, payloads []events.Payload, +) (func(), error) { + if h.chunkStore.IsClosed() { + return nil, ErrClosed + } + prep, err := h.prepareLedger(ledgerSeq, payloads) + if err != nil { + return nil, err + } + if prep == nil { + return nil, nil //nolint:nilnil // idempotent duplicate no-op: nil apply hook, no error + } + if qerr := prep.queue(b); qerr != nil { + return nil, qerr + } + return prep.apply, nil +} + +// preparedLedger is one validated, marshaled ledger ready to queue into +// a write batch (queue) and, once that batch is durable, apply to the +// in-memory mirror + offsets (apply). +type preparedLedger struct { + ledgerSeq uint32 + startID uint32 + blobs [][]byte // marshaled payload XDR, positional with payloads + termKeys [][]events.TermKey // per-payload term keys + apply func() // post-commit mirror + offsets update (infallible) +} + +// queue writes the prepared ledger's rows into b: one DataCF row per +// event, one IndexCF row per (term, event), and one OffsetsCF row for +// the ledger's per-ledger event count. +func (p *preparedLedger) queue(b *rocksdb.BatchWriter) error { + for i := range p.blobs { + eventID := p.startID + uint32(i) + b.Put(DataCF, encodeDataKey(eventID), p.blobs[i]) + for _, key := range p.termKeys[i] { + b.Put(IndexCF, encodeIndexKey(key, eventID), nil) + } + } + //nolint:gosec // bounds-checked in prepareLedger's overflow guard + eventCount := uint32(len(p.blobs)) + b.Put(OffsetsCF, encodeOffsetKey(p.ledgerSeq), encodeLedgerEventCount(eventCount)) + return nil +} + +// prepareLedger runs the full pre-commit pipeline for one ledger: +// sequence validation (range/order/overflow), term derivation, and +// payload marshaling into fresh per-event buffers. It returns a +// *preparedLedger ready to queue + apply, or (nil, nil) for an +// idempotent duplicate (already-committed ledger). It performs NO disk +// write and NO mirror mutation — a rejected ledger leaves all state +// untouched, so it is safe to call before touching a shared batch. +func (h *HotStore) prepareLedger(ledgerSeq uint32, payloads []events.Payload) (*preparedLedger, error) { + // Validate ledger sequence BEFORE any marshaling. Failing after a + // shared batch already holds this ledger's rows would orphan them. if ledgerSeq < h.chunkID.FirstLedger() || ledgerSeq > h.chunkID.LastLedger() { - return fmt.Errorf("%w: ledger %d not in chunk %s [%d, %d]", + return nil, fmt.Errorf("%w: ledger %d not in chunk %s [%d, %d]", ErrLedgerOutOfRange, ledgerSeq, h.chunkID, h.chunkID.FirstLedger(), h.chunkID.LastLedger()) } @@ -531,90 +666,80 @@ func (h *HotStore) IngestLedgerEvents(ledgerSeq uint32, payloads []events.Payloa // rather than erroring or double-appending. The re-delivered // events are not re-verified, so a re-delivery carrying different // events for an already-ingested ledger is silently ignored. - return nil + return nil, nil //nolint:nilnil // idempotent duplicate no-op: nil prepared ledger, no error } if ledgerSeq > expected { - return fmt.Errorf("%w: expected ledger %d, got %d", + return nil, fmt.Errorf("%w: expected ledger %d, got %d", ErrLedgerOutOfOrder, expected, ledgerSeq) } - // Pre-derive term keys per payload so the post-commit mirror - // update doesn't re-hash. Surfacing TermsForBytes errors here - // (pre-batch) cleanly rejects the ledger commit without touching disk — - // a decode failure on stellar-core-validated XDR is a corruption - // signal worth aborting on. + // Pre-derive term keys per payload so the post-commit mirror update + // doesn't re-hash. A TermsForBytes error here cleanly rejects the + // ledger without touching the batch — a decode failure on + // stellar-core-validated XDR is a corruption signal worth aborting on. termKeys := make([][]events.TermKey, len(payloads)) for i := range payloads { keys, err := events.TermsForBytes(payloads[i].ContractEventBytes) if err != nil { - return fmt.Errorf("events: derive terms for payload %d in ledger %d: %w", i, ledgerSeq, err) + return nil, fmt.Errorf("events: derive terms for payload %d in ledger %d: %w", i, ledgerSeq, err) } termKeys[i] = keys } startID := h.offsets.TotalEvents() if uint64(startID)+uint64(len(payloads)) > math.MaxUint32 { - return fmt.Errorf("events: chunk %s would overflow uint32 event-id space at ledger %d", + return nil, fmt.Errorf("events: chunk %s would overflow uint32 event-id space at ledger %d", h.chunkID, ledgerSeq) } - // Atomic batch on the per-Chunk DB. Each payload is marshaled into one - // reused scratch buffer: BatchWriter.Put copies the value into the write - // batch synchronously, so the scratch is free to reuse on the next - // iteration — no per-payload allocation. A marshal error returns from - // the callback, which aborts the batch so nothing commits. - var scratch []byte - err := h.chunkStore.Batch(func(b *rocksdb.BatchWriter) error { - for i := range payloads { - eventID := startID + uint32(i) - blob, err := payloads[i].MarshalInto(scratch[:0]) - if err != nil { - return fmt.Errorf("events: marshal payload %d for ledger %d: %w", i, ledgerSeq, err) - } - scratch = blob - b.Put(DataCF, encodeDataKey(eventID), blob) - for _, key := range termKeys[i] { - b.Put(IndexCF, encodeIndexKey(key, eventID), nil) - } + // Marshal each payload into its OWN fresh buffer (not a reused + // scratch): a shared batch may hold many ledgers' rows simultaneously + // before commit, so each blob must outlive the prepare call until the + // single Write copies it. BatchWriter.Put copies synchronously, so the + // buffers are free after queue returns. + blobs := make([][]byte, len(payloads)) + for i := range payloads { + blob, err := payloads[i].MarshalInto(nil) + if err != nil { + return nil, fmt.Errorf("events: marshal payload %d for ledger %d: %w", i, ledgerSeq, err) } - // On-disk shape matches the in-memory API: per-ledger event - // count, not cumulative. Warmup replays directly via - // offsets.Append(eventCount) — no delta arithmetic. - //nolint:gosec // bounds-checked above - eventCount := uint32(len(payloads)) - b.Put(OffsetsCF, encodeOffsetKey(ledgerSeq), encodeLedgerEventCount(eventCount)) - return nil - }) - if err != nil { - return fmt.Errorf("events: commit ledger %d to chunk %s: %w", ledgerSeq, h.chunkID, err) - } - - // Phase 3: the batch is durable — apply it to the in-memory cache. - // Infallible given the validation above (ledgerSeq == expected and - // in-chunk, single writer): mirror.AddTo cannot fail and offsets.Append - // appends at the already-validated next slot, so the only - // non-completion is a crash, after which warmup rebuilds the cache from - // disk. - // - // Ordering invariant: mirror BEFORE offsets. A concurrent Query - // that captures offsets via h.offsets.Snapshot() then later calls - // mirror.Get for the same key sees either the previous state - // (offsets count N-1, mirror without ledger-N events) or a - // consistent later one (offsets count ≥N, mirror with ledger-N - // events). Reversing the order would let a reader observe an - // offsets count that includes IDs the mirror hasn't published - // yet — Query would then ask FetchEvents for IDs not yet - // indexed; the bitmap intersection would simply miss them, with - // no error surface. - // + blobs[i] = blob + } + + prep := &preparedLedger{ + ledgerSeq: ledgerSeq, + startID: startID, + blobs: blobs, + termKeys: termKeys, + } + prep.apply = func() { h.applyLedger(prep) } + return prep, nil +} + +// applyLedger updates the in-memory mirror + offsets for a ledger whose +// rows are now durable. Infallible by construction (the prepare step +// validated ledgerSeq == expected and in-chunk under the single-writer +// contract): the only non-completion is a crash, after which warmup +// rebuilds the cache from disk. +// +// Ordering invariant: mirror BEFORE offsets. A concurrent Query that +// captures offsets via h.offsets.Snapshot() then later calls mirror.Get +// for the same key sees either the previous state (offsets count N-1, +// mirror without ledger-N events) or a consistent later one (offsets +// count ≥N, mirror with ledger-N events). Reversing the order would let +// a reader observe an offsets count that includes IDs the mirror hasn't +// published yet — Query would then ask FetchEvents for IDs not yet +// indexed; the bitmap intersection would simply miss them, with no +// error surface. +func (h *HotStore) applyLedger(p *preparedLedger) { // Batch by key so each ConcurrentBitmaps.AddTo call clones at most // once per (key, ledger), not once per (key, event). For popular // terms that receive many events in one ledger this turns N COW // clones into 1. Initial capacity 64 ≈ a few × unique-terms per // typical ledger; the map grows correctly past that. perKeyIDs := make(map[events.TermKey][]uint32, 64) - for i, keys := range termKeys { - eventID := startID + uint32(i) + for i, keys := range p.termKeys { + eventID := p.startID + uint32(i) for _, key := range keys { perKeyIDs[key] = append(perKeyIDs[key], eventID) } @@ -622,9 +747,8 @@ func (h *HotStore) IngestLedgerEvents(ledgerSeq uint32, payloads []events.Payloa for key, ids := range perKeyIDs { h.mirror.AddTo(key, ids...) } - //nolint:gosec // len bounded by the overflow check above - h.offsets.Append(uint32(len(payloads))) - return nil + //nolint:gosec // len bounded by prepareLedger's overflow guard + h.offsets.Append(uint32(len(p.blobs))) } // ────────────────────────────────────────────────────────────────── diff --git a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk.go b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk.go index 0563698b5..5b4949a40 100644 --- a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk.go +++ b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk.go @@ -16,9 +16,11 @@ import ( supportlog "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/events" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/rocksdb" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger" ) @@ -35,11 +37,17 @@ type DB struct { chunkID chunk.ID ledger *ledger.HotStore + events *eventstore.HotStore } -// columnFamilies returns the CF list for the per-chunk DB: the ledger CF. +// columnFamilies returns the CF list for the per-chunk DB: the ledger CF +// plus the three events CFs. Names are non-colliding across the facades +// ("ledgers"; "events_data"/"events_index"/"events_offsets"). func columnFamilies() []string { - return []string{ledger.LedgersCF} + cfs := make([]string, 0, 1+len(eventstore.CFNames())) + cfs = append(cfs, ledger.LedgersCF) + cfs = append(cfs, eventstore.CFNames()...) + return cfs } // config builds the per-chunk store's rocksdb.Config. It rides on @@ -51,6 +59,7 @@ func config(path string, logger *supportlog.Entry) rocksdb.Config { Path: path, ColumnFamilies: columnFamilies(), Logger: logger, + PerCFOptions: eventstore.CFOptions(), } } @@ -68,10 +77,16 @@ func Open(path string, chunkID chunk.ID, logger *supportlog.Entry) (*DB, error) return nil, fmt.Errorf("hotchunk: open chunk %s: %w", chunkID, err) } + es, err := eventstore.NewWithStore(store, chunkID) + if err != nil { + _ = store.Close() + return nil, fmt.Errorf("hotchunk: compose events facade for chunk %s: %w", chunkID, err) + } return &DB{ store: store, chunkID: chunkID, ledger: ledger.NewWithStore(store, chunkID), + events: es, }, nil } @@ -81,6 +96,9 @@ func (d *DB) ChunkID() chunk.ID { return d.chunkID } // Ledgers returns the ledger read/write facade over the shared store. func (d *DB) Ledgers() *ledger.HotStore { return d.ledger } +// Events returns the events read/write facade over the shared store. +func (d *DB) Events() *eventstore.HotStore { return d.events } + // Close releases the shared store exactly once. Idempotent (delegates // to rocksdb.Store.Close, which is itself idempotent). Must not be // called concurrently with in-flight reads/writes. @@ -98,6 +116,7 @@ func (d *DB) MaxCommittedSeq() (uint32, bool, error) { // dependency on the ingest package (which depends on the stores). type Ingest struct { Ledgers bool + Events bool } // LedgerCounts reports how many items each data type contributed to one @@ -105,6 +124,7 @@ type Ingest struct { // (HotService) emit per-type volume metrics without re-deriving them. type LedgerCounts struct { Ledgers int + Events int } // IngestLedger commits ONE ledger to the hot DB as a SINGLE atomic, @@ -121,21 +141,62 @@ func (d *DB) IngestLedger(seq uint32, lcm xdr.LedgerCloseMetaView, cfg Ingest) ( return counts, stores.ErrStoreClosed } + // Pre-extract the events payloads BEFORE opening the batch, so a decode + // error rejects the ledger without a half-built batch. + var payloads []events.Payload + if cfg.Events { + p, err := eventPayloads(seq, lcm) + if err != nil { + return counts, err + } + payloads = p + counts.Events = len(payloads) + } if cfg.Ledgers { counts.Ledgers = 1 } + // The events facade validates sequence/order and marshals up front so a + // rejected events ledger never touches the shared batch; it returns the + // post-commit apply hook (nil for an idempotent duplicate). + var applyEvents func() cerr := d.store.Batch(func(b *rocksdb.BatchWriter) error { if cfg.Ledgers { if err := d.ledger.AddLedgerToBatch(b, ledger.Entry{Seq: seq, Bytes: []byte(lcm)}); err != nil { return fmt.Errorf("hotchunk: queue ledger seq %d: %w", seq, err) } } + if cfg.Events { + apply, err := d.events.IngestLedgerToBatch(b, seq, payloads) + if err != nil { + return fmt.Errorf("hotchunk: queue events seq %d: %w", seq, err) + } + applyEvents = apply + } return nil }) if cerr != nil { return counts, fmt.Errorf("hotchunk: commit ledger %d to chunk %s: %w", seq, d.chunkID, cerr) } + // The batch is durable — now and only now apply the events in-memory + // mirror/offsets update (nil on an idempotent duplicate). + if applyEvents != nil { + applyEvents() + } return counts, nil } + +// eventPayloads derives one ledger's event payloads from the view. A V0 +// (pre-Soroban) ledger has no contract events and yields zero payloads, +// recorded like any event-free ledger — LCMViewToPayloads returns them +// empty, with no error. Mirrors ingest.eventPayloads — duplicated here +// (a few lines) rather than importing ingest, which would create a +// dependency cycle (ingest will depend on hotchunk). +func eventPayloads(seq uint32, lcm xdr.LedgerCloseMetaView) ([]events.Payload, error) { + payloads, err := events.LCMViewToPayloads(lcm) + if err != nil { + return nil, fmt.Errorf("hotchunk: LCMViewToPayloads seq %d: %w", seq, err) + } + return payloads, nil +} diff --git a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk_test.go b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk_test.go index 4afa38c5f..ca296a00b 100644 --- a/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk/hotchunk_test.go @@ -1,21 +1,28 @@ package hotchunk import ( + "context" "testing" "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/stellar/go-stellar-sdk/keypair" + "github.com/stellar/go-stellar-sdk/network" supportlog "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/events" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/rocksdb" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger" ) +const testPassphrase = "Public Global Stellar Network ; September 2015" + func silentLogger() *supportlog.Entry { log := supportlog.New() log.SetLevel(logrus.ErrorLevel) @@ -30,7 +37,7 @@ func openTestDB(t *testing.T, chunkID chunk.ID) *DB { return db } -func allTypes() Ingest { return Ingest{Ledgers: true} } +func allTypes() Ingest { return Ingest{Ledgers: true, Events: true} } func TestOpen_ValidatesInputs(t *testing.T) { _, err := Open("", chunk.ID(0), silentLogger()) @@ -40,10 +47,14 @@ func TestOpen_ValidatesInputs(t *testing.T) { require.ErrorIs(t, err, stores.ErrInvalidConfig) } -func TestColumnFamilies_IsLedgerCF(t *testing.T) { +func TestColumnFamilies_IsLedgerAndEventsCFs(t *testing.T) { cfs := columnFamilies() - require.Len(t, cfs, 1) + // 1 ledger CF + 3 events CFs. + require.Len(t, cfs, 1+len(eventstore.CFNames())) require.Equal(t, ledger.LedgersCF, cfs[0]) + for _, cf := range eventstore.CFNames() { + require.Contains(t, cfs, cf) + } } // TestIngestLedger_LedgerCommittedAndWatermarkAdvances is the core decision-(a) @@ -175,6 +186,58 @@ func TestIngestLedger_ClosedDBFails(t *testing.T) { require.ErrorIs(t, err, stores.ErrStoreClosed) } +// TestIngestLedger_EventsCommittedAcrossEventsCFs is the events decision-(a) +// proof: a ledger carrying one contract event commits the ledger AND the +// event in the SAME batch, so the events facade indexes the event's term and +// the event-id watermark advances alongside the ledger watermark. +func TestIngestLedger_EventsCommittedAcrossEventsCFs(t *testing.T) { + chunkID := chunk.ID(0) + first := chunkID.FirstLedger() + db := openTestDB(t, chunkID) + + rawA, termA := lcmWithEvent(t, first) + rawB, _ := lcmWithEvent(t, first+1) + + counts, err := db.IngestLedger(first, xdr.LedgerCloseMetaView(rawA), allTypes()) + require.NoError(t, err) + assert.Equal(t, LedgerCounts{Ledgers: 1, Events: 1}, counts) + + counts, err = db.IngestLedger(first+1, xdr.LedgerCloseMetaView(rawB), allTypes()) + require.NoError(t, err) + assert.Equal(t, LedgerCounts{Ledgers: 1, Events: 1}, counts) + + // events CFs: the shared term resolves to both ledgers' events, and the + // event-id watermark advanced to 2. + bm, err := db.Events().Lookup(context.Background(), termA) + require.NoError(t, err) + require.NotNil(t, bm) + assert.Equal(t, uint64(2), bm.GetCardinality(), "both ledgers share the event term") + assert.Equal(t, uint32(2), db.Events().NextEventID()) + + // The single watermark equals the last committed ledger seq. + maxSeq, ok, err := db.MaxCommittedSeq() + require.NoError(t, err) + require.True(t, ok) + assert.Equal(t, first+1, maxSeq) +} + +// TestIngestLedger_DisabledEventsUntouched confirms an Ingest selection without +// Events leaves the events CFs empty even when the ledger carries an event. +func TestIngestLedger_DisabledEventsUntouched(t *testing.T) { + chunkID := chunk.ID(0) + first := chunkID.FirstLedger() + db := openTestDB(t, chunkID) + + raw, term := lcmWithEvent(t, first) + counts, err := db.IngestLedger(first, xdr.LedgerCloseMetaView(raw), Ingest{Ledgers: true}) + require.NoError(t, err) + assert.Equal(t, LedgerCounts{Ledgers: 1}, counts) + + _, lerr := db.Events().Lookup(context.Background(), term) + require.ErrorIs(t, lerr, eventstore.ErrTermNotFound) + assert.Equal(t, uint32(0), db.Events().NextEventID()) +} + // ──────────────────────────── LCM fixtures ──────────────────────────── // zeroTxLCM builds a minimal V2 LCM with no transactions at the given sequence. @@ -200,3 +263,101 @@ func zeroTxLCM(t *testing.T, seq uint32) []byte { require.NoError(t, err) return raw } + +// lcmWithEvent builds a V2 LCM at seq carrying one transaction that emits a +// single contract event (topic="hotchunk_test"). Returns the wire bytes and +// the event's term key. +func lcmWithEvent(t *testing.T, seq uint32) ([]byte, events.TermKey) { + t.Helper() + ev := buildContractEvent("hotchunk_test") + meta := xdr.TransactionMeta{ + V: 4, + V4: &xdr.TransactionMetaV4{Operations: []xdr.OperationMetaV2{{Events: []xdr.ContractEvent{ev}}}}, + } + lcm := buildLCMWithTx(t, seq, meta) + raw, err := lcm.MarshalBinary() + require.NoError(t, err) + + evBytes, err := ev.MarshalBinary() + require.NoError(t, err) + keys, err := events.TermsForBytes(evBytes) + require.NoError(t, err) + require.NotEmpty(t, keys) + return raw, keys[0] +} + +func buildContractEvent(topic string) xdr.ContractEvent { + var contractID xdr.ContractId + contractID[0] = 0xab + contractID[1] = 0xcd + sym := xdr.ScSymbol(topic) + return xdr.ContractEvent{ + ContractId: &contractID, + Type: xdr.ContractEventTypeContract, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Topics: []xdr.ScVal{{Type: xdr.ScValTypeScvSymbol, Sym: &sym}}, + Data: xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &sym}, + }, + }, + } +} + +func successResult() xdr.TransactionResult { + opResults := []xdr.OperationResult{} + return xdr.TransactionResult{ + FeeCharged: 100, + Result: xdr.TransactionResultResult{ + Code: xdr.TransactionResultCodeTxSuccess, + Results: &opResults, + }, + } +} + +func buildLCMWithTx(t *testing.T, seq uint32, meta xdr.TransactionMeta) xdr.LedgerCloseMeta { + t.Helper() + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Ext: xdr.TransactionExt{ + V: 1, + SorobanData: &xdr.SorobanTransactionData{}, + }, + }, + }, + } + hash, err := network.HashTransactionInEnvelope(envelope, testPassphrase) + require.NoError(t, err) + + comp := []xdr.TxSetComponent{{ + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{envelope}, + }, + }} + return xdr.LedgerCloseMeta{ + V: 2, + V2: &xdr.LedgerCloseMetaV2{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{CloseTime: xdr.TimePoint(0)}, + LedgerSeq: xdr.Uint32(seq), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{Phases: []xdr.TransactionPhase{{V: 0, V0Components: &comp}}}, + }, + TxProcessing: []xdr.TransactionResultMetaV1{{ + TxApplyProcessing: meta, + Result: xdr.TransactionResultPair{ + TransactionHash: hash, + Result: successResult(), + }, + }}, + }, + } +} diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/artifacts.go b/cmd/stellar-rpc/internal/fullhistory/streaming/artifacts.go index 2da436423..a3253f5b5 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/artifacts.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/artifacts.go @@ -98,5 +98,6 @@ func (s ArtifactSet) String() string { func (s ArtifactSet) ingestConfig() ingest.Config { //nolint:unused // called from processChunk in a later layer return ingest.Config{ Ledgers: s.Has(KindLedgers), + Events: s.Has(KindEvents), } } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/audit_invariants.go b/cmd/stellar-rpc/internal/fullhistory/streaming/audit_invariants.go index 7e68d3185..ea8491f03 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/audit_invariants.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/audit_invariants.go @@ -361,6 +361,7 @@ func (c *Catalog) auditReadCorrectness(opts AuditOptions, report *AuditReport) e func (c *Catalog) artifactFileRoots() []string { return []string{ c.layout.LedgersRoot(), + c.layout.EventsRoot(), } } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/audit_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/audit_test.go index 720db37b6..edeb8da9c 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/audit_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/audit_test.go @@ -61,14 +61,31 @@ func TestAudit_CleanStoreNoViolations(t *testing.T) { cat, _ := testCatalog(t) require.NoError(t, cat.PutEarliestLedger(chunk.FirstLedgerSeq)) - freezeChunkArtifacts(t, cat, 0, KindLedgers) - freezeChunkArtifacts(t, cat, 1, KindLedgers) + freezeChunkArtifacts(t, cat, 0, KindLedgers, KindEvents) + freezeChunkArtifacts(t, cat, 1, KindLedgers, KindEvents) report, err := cat.Audit(AuditOptions{}) require.NoError(t, err) require.True(t, report.Clean(), "expected clean audit, got: %v", report.Violations) } +// TestAudit_INV3_OrphanEventsFileNoKey confirms the INV-3 disk->meta walk now +// covers the events tree: a stray events cold-segment file with no meta key is +// flagged as an orphan. +func TestAudit_INV3_OrphanEventsFileNoKey(t *testing.T) { + cat, _ := testCatalog(t) + require.NoError(t, cat.PutEarliestLedger(chunk.FirstLedgerSeq)) + + // An events file on disk at chunk 9's events path with NO meta key — orphan. + orphan := cat.layout.EventsPaths(9)[0] + writeArtifact(t, orphan) + + report, err := cat.Audit(AuditOptions{}) + require.NoError(t, err) + require.True(t, hasViolation(report, InvDiskMatchesMeta, ""), + "a keyless events file must be flagged as an orphan: %v", report.Violations) +} + // --------------------------------------------------------------------------- // INV-2 — single canonical state. // --------------------------------------------------------------------------- @@ -125,10 +142,10 @@ func TestAudit_INV2_OrphanHotForFullyServedChunk(t *testing.T) { cat, _ := testCatalog(t) require.NoError(t, cat.PutEarliestLedger(chunk.FirstLedgerSeq)) - // Chunk 0 fully served by cold artifacts (ledgers frozen) yet a "ready" hot DB - // persists — the discard scan missed it. - freezeChunkArtifacts(t, cat, 0, KindLedgers) - freezeChunkArtifacts(t, cat, 1, KindLedgers) + // Chunk 0 fully served by cold artifacts (ledgers + events frozen) yet a + // "ready" hot DB persists — the discard scan missed it. + freezeChunkArtifacts(t, cat, 0, KindLedgers, KindEvents) + freezeChunkArtifacts(t, cat, 1, KindLedgers, KindEvents) readyHot(t, cat, 0) report, err := cat.Audit(AuditOptions{}) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/convergence_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/convergence_test.go index 4be2a68db..c0658901a 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/convergence_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/convergence_test.go @@ -312,6 +312,8 @@ func TestConvergence_SurgicalRecoveryCase3ReDerives(t *testing.T) { h.auditClean(t) h.requireQuiescent(t) require.Equal(t, StateFrozen, mustState(t, h.cat, 0, KindLedgers)) + require.Equal(t, StateFrozen, mustState(t, h.cat, 0, KindEvents), + "the re-ingested hot DB re-derives the events cold segment too") before := snapshotAllKeys(t, h.cat) h.tick(t) @@ -334,9 +336,9 @@ func TestConvergence_HotVolumeLossCase4(t *testing.T) { h := newConvergenceHarness(t, 0) // Durable cold history through chunk 0 (survives on durable storage): frozen - // ledgers. Chunk 0's last ledger is the last frozen boundary the watermark must - // heal to. - freezeChunkArtifacts(t, h.cat, 0, KindLedgers) + // ledgers + events. Chunk 0's last ledger is the last frozen boundary the + // watermark must heal to. + freezeChunkArtifacts(t, h.cat, 0, KindLedgers, KindEvents) // The lost live chunk 1: "ready" with its hot dir GONE (the ephemeral volume // died while the meta store survived). @@ -398,7 +400,7 @@ func TestConvergence_RetentionShortenPrunesBelowRaisedFloor(t *testing.T) { // Six finalized chunks (0..5) with real files, plus a live chunk 6. for c := chunk.ID(0); c <= 5; c++ { - freezeChunkArtifacts(t, cat, c, KindLedgers) + freezeChunkArtifacts(t, cat, c, KindLedgers, KindEvents) writeArtifact(t, cat.layout.LedgerPackPath(c)) } makeReadyHotDirNoData(t, cat, 1) // a below-floor hot DB too @@ -444,7 +446,7 @@ func TestConvergence_RetentionWidenIsTickNoOpAuditClean(t *testing.T) { // Chunks 3..5 finalized (the existing bottom of storage is chunk 3), live 6. for c := chunk.ID(3); c <= 5; c++ { - freezeChunkArtifacts(t, cat, c, KindLedgers) + freezeChunkArtifacts(t, cat, c, KindLedgers, KindEvents) writeArtifact(t, cat.layout.LedgerPackPath(c)) } live := openLiveHotDB(t, cat, 6) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/doc.go b/cmd/stellar-rpc/internal/fullhistory/streaming/doc.go index 06a0a7f05..30bca677e 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/doc.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/doc.go @@ -4,13 +4,15 @@ // (fullhistory/pkg/...). It is built ON that layer — the catalog WRAPS // metastore.Store rather than reinventing a RocksDB wrapper. // -// This file map covers all of Slice 1 (Layers 1–4) — the assembled, -// ledgers-only daemon. Slices 2 and 3 then weave in the events and tx-hash data -// types (see "Later slices" below). +// This file map covers Slice 1 (the daemon skeleton) plus Slice 2 (events). +// Events is a second per-chunk artifact woven into the existing seams — it adds +// no new files here, only events column families, a processChunk segment +// writer, and the matching resolver/audit kind-loops. Slice 3 then adds the +// tx-hash data type (see "Later slices" below). // // # Data model (keys-first) // -// Every durable artifact (a per-chunk file) and every per-chunk hot DB is named +// Every durable artifact (a per-chunk file: ledger or events) and every per-chunk hot DB is named // by exactly one catalog key, and the path on disk is a fixed bijection of that // key. Nothing ever lists a directory to find work; every scan and sweep // iterates keys. The authoritative spec is @@ -68,7 +70,6 @@ // // # Later slices // -// Slice 2 weaves in the events data type (a second per-chunk artifact) and -// Slice 3 the tx-hash data type with its per-window rolling index — both -// additive on this ledgers-only skeleton. +// Slice 3 adds the tx-hash data type with its per-window rolling index — +// additive on this ledgers+events skeleton. package streaming diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go index 322cf3fd0..fd177b258 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/e2e_test.go @@ -295,12 +295,20 @@ func TestE2E_DaemonLifecycle_FirstStartIngestFreezeRestartPrune(t *testing.T) { require.Equal(t, c0First, core.resumeSeen.Load(), "first start resumes captive core at genesis (watermark+1)") - // --- Correctness: chunks 0 and 1 ledger cold artifacts froze and exist on disk. --- + // --- Correctness: chunks 0 and 1 ledger + events cold artifacts froze and + // exist on disk. --- for _, c := range []chunk.ID{c0, c1} { st, err := cat.State(c, KindLedgers) require.NoError(t, err) assert.Equal(t, StateFrozen, st, "chunk %s ledgers is frozen", c) require.FileExists(t, cat.layout.LedgerPackPath(c), "chunk %s pack exists on disk", c) + + est, err := cat.State(c, KindEvents) + require.NoError(t, err) + assert.Equal(t, StateFrozen, est, "chunk %s events is frozen", c) + for _, p := range cat.layout.EventsPaths(c) { + require.FileExists(t, p, "chunk %s events segment file %s exists on disk", c, p) + } } // Observability: the daemon emitted the boundary + freeze phase signals (the diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/eligibility.go b/cmd/stellar-rpc/internal/fullhistory/streaming/eligibility.go index c3a999fb0..626b42511 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/eligibility.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/eligibility.go @@ -63,10 +63,10 @@ func eligibleDiscardOps(cfg LifecycleConfig, cat *Catalog, through uint32) ([]fu } // pendingArtifacts lists which processChunk outputs chunk still needs: the -// per-chunk kinds (currently just ledgers) that are not yet frozen. +// per-chunk kinds (ledgers, events) that are not yet frozen. func pendingArtifacts(c chunk.ID, cat *Catalog) (ArtifactSet, error) { var need ArtifactSet - for _, kind := range []Kind{KindLedgers} { + for _, kind := range []Kind{KindLedgers, KindEvents} { state, err := cat.State(c, kind) if err != nil { return need, err diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/ingest.go b/cmd/stellar-rpc/internal/fullhistory/streaming/ingest.go index cb36a26b1..e9302b78e 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/ingest.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/ingest.go @@ -47,7 +47,7 @@ type LedgerGetter interface { // batch. // //nolint:gochecknoglobals // immutable selection, the production ingest config -var allHotTypes = hotchunk.Ingest{Ledgers: true} +var allHotTypes = hotchunk.Ingest{Ledgers: true, Events: true} // openHotTierForChunk opens (or recovers, or creates) the ONE shared hot DB for // chunkID under the Phase A catalog hot:chunk bracket, returning an open handle diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/keys.go b/cmd/stellar-rpc/internal/fullhistory/streaming/keys.go index be952d753..eecdb2401 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/keys.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/keys.go @@ -46,12 +46,14 @@ type Kind string const ( // KindLedgers is the ledger pack file (.pack). KindLedgers Kind = "ledgers" + // KindEvents is the events cold segment (three files per chunk). + KindEvents Kind = "events" ) // allKinds is the canonical iteration order for per-chunk artifact kinds. // //nolint:gochecknoglobals // immutable kind registry, single source of truth -var allKinds = []Kind{KindLedgers} +var allKinds = []Kind{KindLedgers, KindEvents} // AllKinds returns the per-chunk artifact kinds in canonical order. func AllKinds() []Kind { return append([]Kind(nil), allKinds...) } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/lifecycle_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/lifecycle_test.go index 6160ecfce..6a8cb4790 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/lifecycle_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/lifecycle_test.go @@ -241,8 +241,9 @@ func TestRunLifecycleTick_DiscardWhenComplete(t *testing.T) { require.NoError(t, err) require.Empty(t, ops, "ledgers not frozen yet: the hot DB stays") - // Now freeze chunk 0's ledger artifact: nothing pending => discard eligible. - freezeKinds(t, cat, 0, KindLedgers) + // Now freeze chunk 0's ledgers + events artifacts: nothing pending => discard + // eligible. + freezeKinds(t, cat, 0, KindLedgers, KindEvents) ops, err = eligibleDiscardOps(cfg, cat, through) require.NoError(t, err) require.Len(t, ops, 1, "frozen + nothing pending => discard eligible") @@ -263,7 +264,7 @@ func TestRunLifecycleTick_PastFloorPrune(t *testing.T) { // floor = lastCompleteChunkAt(through)-retention+1 = 5-2+1 = chunk 4's first // ledger. So chunks 0..3 are wholly past the floor and must be swept. for c := chunk.ID(0); c <= 5; c++ { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) writeArtifact(t, cat.layout.LedgerPackPath(c)) } // A past-floor hot DB too (chunk 1). @@ -418,10 +419,10 @@ func TestLifecycleLoop_RunsTickPerNotifyThenStopsOnCtx(t *testing.T) { cfg, rec := lifecycleTestConfig(t, cat, 0) // Make the tick observable WITHOUT a slow full ingest: chunk 0 is already - // fully frozen, with a leftover "ready" hot DB on disk. The plan stage is a - // no-op; the discard scan retires chunk 0's hot DB. A live chunk 1 keeps chunk - // 0 below the partition. - freezeKinds(t, cat, 0, KindLedgers) + // fully frozen (ledgers + events), with a leftover "ready" hot DB on disk. The + // plan stage is a no-op; the discard scan retires chunk 0's hot DB. A live + // chunk 1 keeps chunk 0 below the partition. + freezeKinds(t, cat, 0, KindLedgers, KindEvents) makeReadyHotDirNoData(t, cat, 0) live := openLiveHotDB(t, cat, 1) t.Cleanup(func() { _ = live.Close() }) @@ -458,7 +459,7 @@ func TestLifecycleLoop_DrainsToMostRecent(t *testing.T) { cfg, rec := lifecycleTestConfig(t, cat, 0) for c := chunk.ID(0); c <= 1; c++ { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) makeReadyHotDirNoData(t, cat, c) } live := openLiveHotDB(t, cat, 2) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go index 1897f43c6..fe9aaac26 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/observability_test.go @@ -411,7 +411,7 @@ func TestRunLifecycleTick_EmptyTickStillReportsStages(t *testing.T) { metrics := newRecordingMetrics() cfg.Metrics = metrics - freezeKinds(t, cat, 0, KindLedgers) + freezeKinds(t, cat, 0, KindLedgers, KindEvents) // Drive the tick with chunk 0 (the just-completed chunk): the range [0,0] is // already fully materialized, so no build, no discard, no prune. diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/paths.go b/cmd/stellar-rpc/internal/fullhistory/streaming/paths.go index 12d47caa8..56ce0def0 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/paths.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/paths.go @@ -17,7 +17,8 @@ import ( // {root}/ // ├── catalog/rocksdb/ // ├── hot/{chunk:08d}/ -// └── ledgers/{bucket:05d}/{chunk:08d}.pack +// ├── ledgers/{bucket:05d}/{chunk:08d}.pack +// └── events/{bucket:05d}/{chunk:08d}-events.pack (+ -index.pack, -index.hash) // // But each tree's root is independently settable (NewLayoutFromPaths) so an // operator's [catalog]/[immutable_storage.*]/[streaming.hot_storage] path @@ -30,6 +31,7 @@ type Layout struct { catalogRoot string // meta-store RocksDB dir (a leaf, not a tree root) hotRoot string // per-chunk hot RocksDB dirs live directly under here ledgersRoot string // {ledgersRoot}/{bucket}/{chunk}.pack + eventsRoot string // {eventsRoot}/{bucket}/{chunk}-*.{pack,hash} } // NewLayout returns a Layout with every tree defaulting under a single data @@ -41,6 +43,7 @@ func NewLayout(root string) Layout { catalogRoot: filepath.Join(root, "catalog", "rocksdb"), hotRoot: filepath.Join(root, "hot"), ledgersRoot: filepath.Join(root, "ledgers"), + eventsRoot: filepath.Join(root, "events"), } } @@ -55,6 +58,7 @@ func NewLayoutFromPaths(p Paths) Layout { catalogRoot: p.Catalog, hotRoot: p.HotStorage, ledgersRoot: filepath.Join(p.Cold, "ledgers"), + eventsRoot: filepath.Join(p.Cold, "events"), } } @@ -79,13 +83,31 @@ func (l Layout) LedgerPackPath(c chunk.ID) string { // path matching LedgerPackPath. func (l Layout) LedgersRoot() string { return l.ledgersRoot } +// EventsPaths are the three events cold-segment files for a chunk: +// {chunk}-events.pack, {chunk}-index.pack, {chunk}-index.hash. +func (l Layout) EventsPaths(c chunk.ID) []string { + dir := filepath.Join(l.eventsRoot, c.BucketID()) + base := c.String() + return []string{ + filepath.Join(dir, base+"-events.pack"), + filepath.Join(dir, base+"-index.pack"), + filepath.Join(dir, base+"-index.hash"), + } +} + +// EventsRoot is the directory under which per-chunk events segments are +// bucketed. Matches the dir EventsPaths composes. +func (l Layout) EventsRoot() string { return l.eventsRoot } + // ArtifactPaths returns every file a per-chunk artifact kind owns on disk. -// One path for ledgers. The single place that maps a (chunk, kind) to its -// files, so the sweep and the freeze writer agree. +// One path for ledgers; three for events. The single place that maps a +// (chunk, kind) to its files, so the sweep and the freeze writer agree. func (l Layout) ArtifactPaths(c chunk.ID, kind Kind) []string { switch kind { case KindLedgers: return []string{l.LedgerPackPath(c)} + case KindEvents: + return l.EventsPaths(c) default: return nil } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/process.go b/cmd/stellar-rpc/internal/fullhistory/streaming/process.go index e1bdfffd3..59cbfcb08 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/process.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/process.go @@ -177,6 +177,7 @@ func processChunk(ctx context.Context, chunkID chunk.ID, artifacts ArtifactSet, // any partial from a crashed "freezing" attempt. dirs := ingest.ColdDirs{ Ledgers: cat.layout.LedgersRoot(), + Events: cat.layout.EventsRoot(), } rerr := ingest.RunColdChunk(ctx, cfg.Logger, source, dirs, chunkID, cfg.Sink, artifacts.ingestConfig()) if rerr != nil { diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/process_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/process_test.go index caf84ff53..005be936d 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/process_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/process_test.go @@ -16,6 +16,7 @@ import ( "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/ingest" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger" ) @@ -189,6 +190,11 @@ func TestProcessChunk_ProducesAllArtifactsAndFreezes(t *testing.T) { // The ledger artifact exists on disk at its canonical Layout path. require.FileExists(t, cat.layout.LedgerPackPath(chunkID)) + // The events cold segment (all three files) exists at its canonical paths. + for _, p := range cat.layout.EventsPaths(chunkID) { + require.FileExists(t, p, "events cold-segment file %s should exist", p) + } + // The pack is a valid cold ledger pack covering the whole chunk. cr, err := ledger.OpenColdReader(cat.layout.LedgerPackPath(chunkID)) require.NoError(t, err) @@ -196,6 +202,12 @@ func TestProcessChunk_ProducesAllArtifactsAndFreezes(t *testing.T) { last, err := cr.LastSeq() require.NoError(t, err) require.Equal(t, chunkID.LastLedger(), last) + + // The events cold segment opens as a valid (eventless, since zero-tx) reader. + ecr, err := eventstore.OpenColdReader( + chunkID, filepath.Join(cat.layout.EventsRoot(), chunkID.BucketID()), eventstore.ColdReaderOptions{}) + require.NoError(t, err) + require.NoError(t, ecr.Close()) _ = root } diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/resolve.go b/cmd/stellar-rpc/internal/fullhistory/streaming/resolve.go index a3aa23905..c6676a96a 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/resolve.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/resolve.go @@ -35,7 +35,7 @@ func (p Plan) Empty() bool { return len(p.ChunkBuilds) == 0 } // // The kind rule: // -// - ledgers (per-chunk): chunk c is needed iff chunk:{c}:ledgers is not +// - ledgers / events (per-chunk): chunk c is needed iff chunk:{c}:{kind} is not // "frozen". A "freezing"/"pruning"/absent key re-materializes (idempotent // inside processChunk); a "frozen" key self-skips here. // @@ -51,9 +51,9 @@ func resolve(cfg ExecConfig, rangeStart, rangeEnd chunk.ID) (Plan, error) { // of how many kinds it needs (one processChunk pass produces all). needs := map[chunk.ID]ArtifactSet{} - // Per-chunk kinds: ledgers. + // Per-chunk kinds: ledgers, events. for c := rangeStart; ; c++ { - for _, kind := range []Kind{KindLedgers} { + for _, kind := range []Kind{KindLedgers, KindEvents} { state, err := cat.State(c, kind) if err != nil { return Plan{}, err diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/resolve_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/resolve_test.go index 05f7d6a03..9f459242d 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/resolve_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/resolve_test.go @@ -64,9 +64,10 @@ func TestResolve_InvertedRangeIsEmpty(t *testing.T) { func TestResolve_SteadyStateRestartIsEmpty(t *testing.T) { cat, _ := testCatalog(t) - // Every chunk in [0,3] has its ledgers frozen — the post-freeze steady state. + // Every chunk in [0,3] has its ledgers + events frozen — the post-freeze + // steady state. for c := chunk.ID(0); c <= 3; c++ { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) } plan, err := resolve(resolveCfg(cat), 0, 3) @@ -83,9 +84,9 @@ func TestResolve_SteadyStateRestartIsEmpty(t *testing.T) { func TestResolve_SchedulesOnlyUnfrozenChunks(t *testing.T) { cat, _ := testCatalog(t) - // Chunks 0,1,5 frozen; 2,3,4 absent. + // Chunks 0,1,5 frozen (ledgers + events); 2,3,4 absent. for _, c := range []chunk.ID{0, 1, 5} { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) } plan, err := resolve(resolveCfg(cat), 0, 5) diff --git a/cmd/stellar-rpc/internal/fullhistory/streaming/retention_test.go b/cmd/stellar-rpc/internal/fullhistory/streaming/retention_test.go index 56089f709..edda4f131 100644 --- a/cmd/stellar-rpc/internal/fullhistory/streaming/retention_test.go +++ b/cmd/stellar-rpc/internal/fullhistory/streaming/retention_test.go @@ -85,10 +85,10 @@ func TestRetentionGate_ChunkBelowFloor(t *testing.T) { func TestReaderRetention_StraddlingFloorServesInRangeNotBelow(t *testing.T) { cat, _ := testCatalog(t) - // Chunks 0..3 have their ledger artifacts frozen, written when the floor sat at - // genesis. + // Chunks 0..3 have their ledger + events artifacts frozen, written when the + // floor sat at genesis. for c := chunk.ID(0); c <= 3; c++ { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) writeArtifact(t, cat.layout.LedgerPackPath(c)) } @@ -139,10 +139,10 @@ func TestReaderRetention_StraddlingFloorServesInRangeNotBelow(t *testing.T) { func TestReaderRetention_ShorteningPrunesNewlyOutOfRangeChunks(t *testing.T) { cat, _ := testCatalog(t) - // Chunks 0..5 fully frozen, with a real .pack on disk. Live chunk 6 - // (positional ⇒ through = chunk 5's last). + // Chunks 0..5 fully frozen (ledgers + events), with a real .pack on disk. Live + // chunk 6 (positional ⇒ through = chunk 5's last). for c := chunk.ID(0); c <= 5; c++ { - freezeKinds(t, cat, c, KindLedgers) + freezeKinds(t, cat, c, KindLedgers, KindEvents) writeArtifact(t, cat.layout.LedgerPackPath(c)) } live := openLiveHotDB(t, cat, 6)