Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions cmd/stellar-rpc/internal/fullhistory/ingest/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ type HotStores struct {
// hotchunk.Ingest toggles that select which CFs the single per-ledger batch
// writes.
func ingestContributions(cfg Config) hotchunk.Ingest {
return hotchunk.Ingest{Ledgers: cfg.Ledgers}
return hotchunk.Ingest{Ledgers: cfg.Ledgers, Events: cfg.Events}
}

// buildColdIngesters opens one ColdIngester per data type enabled in cfg,
Expand All @@ -48,6 +48,7 @@ func buildColdIngesters(coldDir string, chunkID chunk.ID, sink MetricSink, cfg C
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, NewLedgerColdIngester},
{cfg.Events, dataTypeEvents, NewEventsColdIngester},
}
var ings []ColdIngester
for _, c := range ctors {
Expand Down Expand Up @@ -114,7 +115,7 @@ func RunHot(
if verr := cfg.validate(); verr != nil {
return verr
}
anyEnabled := cfg.Ledgers
anyEnabled := cfg.Ledgers || cfg.Events
if anyEnabled && hotStores.HotDB == nil {
return errors.New("ingest: a hot data type is enabled but HotStores.HotDB is nil")
}
Expand Down Expand Up @@ -199,6 +200,7 @@ func drain(ctx context.Context, stream ledgerbackend.LedgerStream, chunkID chunk
// while reusing the very same cold ingesters, ColdService, and drain loop.
type ColdDirs struct {
Ledgers string
Events string
}

// buildColdIngestersIn opens one ColdIngester per data type enabled in cfg,
Expand All @@ -215,6 +217,7 @@ func buildColdIngestersIn(dirs ColdDirs, chunkID chunk.ID, sink MetricSink, cfg
open func(string, chunk.ID, MetricSink) (ColdIngester, error)
}{
{cfg.Ledgers, dataTypeLedgers, dirs.Ledgers, NewLedgerColdIngester},
{cfg.Events, dataTypeEvents, dirs.Events, NewEventsColdIngester},
}
var ings []ColdIngester
for _, c := range ctors {
Expand Down
94 changes: 94 additions & 0 deletions cmd/stellar-rpc/internal/fullhistory/ingest/ingest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@ import (
supportlog "github.com/stellar/go-stellar-sdk/support/log"
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/events"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/chunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/eventstore"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/hotchunk"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/fullhistory/pkg/stores/ledger"
)
Expand Down Expand Up @@ -277,6 +279,50 @@ func buildLCMReturningHashes(
return lcm, hashes
}

// eventTopic is the contract-event topic the events fixtures share, so two
// fixtures' events resolve to the same term key.
const eventTopic = "ingest_test"

// eventLCM builds a V2 LCM at seq carrying one transaction that emits a single
// contract event. Returns the wire bytes and the event's term key.
func eventLCM(t *testing.T, seq uint32) ([]byte, events.TermKey) {
t.Helper()
ev := buildContractEvent(eventTopic)
meta := xdr.TransactionMeta{
V: 4,
V4: &xdr.TransactionMetaV4{Operations: []xdr.OperationMetaV2{{Events: []xdr.ContractEvent{ev}}}},
}
raw, err := buildLCM(t, seq, []xdr.TransactionMeta{meta}).MarshalBinary()
require.NoError(t, err)

evBytes, err := ev.MarshalBinary()
require.NoError(t, err)
keys, err := events.TermsForBytes(evBytes)
require.NoError(t, err)
require.NotEmpty(t, keys)
return raw, keys[0]
}

// buildContractEvent returns a contract ContractEvent with a single symbol
// topic, the minimal shape the events extractor indexes.
func buildContractEvent(topic string) xdr.ContractEvent {
var contractID xdr.ContractId
contractID[0] = 0xab
contractID[1] = 0xcd
sym := xdr.ScSymbol(topic)
return xdr.ContractEvent{
ContractId: &contractID,
Type: xdr.ContractEventTypeContract,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Topics: []xdr.ScVal{{Type: xdr.ScValTypeScvSymbol, Sym: &sym}},
Data: xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &sym},
},
},
}
}

func testLogger() *supportlog.Entry {
l := supportlog.New()
l.SetLevel(logrus.ErrorLevel)
Expand Down Expand Up @@ -499,6 +545,54 @@ func TestColdService_Success(t *testing.T) {
require.Len(t, sink.coldIngests, 1, "Close after Finalize must not re-emit per-ingester signals")
}

// TestColdService_LedgersAndEvents drives BOTH the ledger and events cold
// ingesters through one ColdService over event-bearing ledgers (via the
// explicit-dirs builder processChunk uses), then reads back the ledger pack
// AND the events cold segment, proving the events kind lands across CFs on the
// cold path.
func TestColdService_LedgersAndEvents(t *testing.T) {
chunkID := chunk.ID(0)
first := chunkID.FirstLedger()
dirs := ColdDirs{Ledgers: t.TempDir(), Events: t.TempDir()}
sink := &testSink{}

ings, err := buildColdIngestersIn(dirs, chunkID, sink, Config{Ledgers: true, Events: true})
require.NoError(t, err)
service := NewColdService(ings, sink)
defer func() { require.NoError(t, service.Close()) }()

raw0, term0 := eventLCM(t, first)
raw1, _ := eventLCM(t, first+1)
require.NoError(t, service.Ingest(context.Background(), first, xdr.LedgerCloseMetaView(raw0)))
require.NoError(t, service.Ingest(context.Background(), first+1, xdr.LedgerCloseMetaView(raw1)))
require.NoError(t, service.Finalize(context.Background()))

// Ledger cold readback: the boundary ledger reads back to the right bytes.
lcr, err := ledger.OpenColdReader(packPath(dirs.Ledgers, chunkID))
require.NoError(t, err)
defer func() { require.NoError(t, lcr.Close()) }()
gotFirst, err := lcr.GetLedgerRaw(first)
require.NoError(t, err)
require.Equal(t, raw0, gotFirst)

// Events cold readback: the shared event term resolves to both ledgers'
// events in the frozen cold segment.
ecr, err := eventstore.OpenColdReader(
chunkID, filepath.Join(dirs.Events, chunkID.BucketID()), eventstore.ColdReaderOptions{})
require.NoError(t, err)
defer func() { require.NoError(t, ecr.Close()) }()
bm, err := ecr.Lookup(context.Background(), term0)
require.NoError(t, err)
require.NotNil(t, bm)
require.Equal(t, uint64(2), bm.GetCardinality(), "both ledgers share the event term")

// Metrics: one ColdIngest per data type, no errors.
cdt := sink.coldDataTypes()
require.Equal(t, 1, cdt[dataTypeLedgers])
require.Equal(t, 1, cdt[dataTypeEvents])
require.Empty(t, sink.coldErrorTypes(), "success path records no ingester errors")
}

// failingCold is a ColdIngester whose Ingest always fails, modeling a mid-chunk
// error. Finalize must NOT run on this path.
type failingCold struct {
Expand Down
3 changes: 3 additions & 0 deletions cmd/stellar-rpc/internal/fullhistory/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func (s *HotService) emit(counts hotchunk.LedgerCounts, d time.Duration, err err
if s.cfg.Ledgers {
s.sink.HotIngest(dataTypeLedgers, d, itemsOnSuccess(counts.Ledgers, err), err)
}
if s.cfg.Events {
s.sink.HotIngest(dataTypeEvents, d, itemsOnSuccess(counts.Events, err), err)
}
}

// itemsOnSuccess returns n on success and 0 on error — a failed atomic batch
Expand Down
Loading
Loading