diff --git a/.github/workflows/backfill-test.yml b/.github/workflows/backfill-test.yml new file mode 100644 index 000000000..eac068bd3 --- /dev/null +++ b/.github/workflows/backfill-test.yml @@ -0,0 +1,41 @@ +name: Backfill ingestion (callable) + +on: + workflow_call: + inputs: + target_ref: + description: 'Ref or SHA to benchmark (resolved to a SHA on checkout)' + required: true + type: string + # re-export ec2-leg's results so the coordinator can read them + outputs: + passed: + value: ${{ jobs.leg.outputs.passed }} + verdict: + value: ${{ jobs.leg.outputs.verdict }} + result_key: + value: ${{ jobs.leg.outputs.result_key }} + bucket: + value: ${{ jobs.leg.outputs.bucket }} + instance_id: + value: ${{ jobs.leg.outputs.instance_id }} + +permissions: + id-token: write + contents: read + +jobs: + leg: + uses: ./.github/workflows/ec2-leg.yml + secrets: inherit + with: + target_ref: ${{ inputs.target_ref }} + run_label: backfill + leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh + runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner + # these cascade under the role's 8h MaxSessionDuration so deadline < poll < job < session < self-terminate + results_timeout: 28200 # 7h50m S3 poll ceiling + job_timeout_minutes: 475 # 7h55m (only effective on self-hosted, hosted runners hard-cap jobs at 6h) + role_duration_seconds: 28800 # 8h OIDC session == the role's MaxSessionDuration + self_terminate_minutes: 540 # 9h box backstop + extra_env: 'export HISTORY_RETENTION_WINDOW=120960 BACKFILL_DEADLINE=7h30m' diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 9557d4c98..a66e2eb3d 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -11,7 +11,7 @@ on: push: # temporary scaffolding: before merge to main, replace with line # branches: [release/**] - branches: 
[release/**, gha-coordinator] + branches: [release/**, gha-coordinator, backfill-test] workflow_dispatch: inputs: target_ref: @@ -52,7 +52,7 @@ jobs: run: | SHA="$(git rev-parse HEAD)" PR="$(gh pr list --repo "${{ github.repository }}" --state open \ - --base main --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)" + --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)" { echo "target_sha=$SHA" echo "pr_number=$PR" @@ -69,9 +69,17 @@ jobs: with: target_ref: ${{ needs.plan.outputs.target_sha }} + backfill: + name: Backfill ingestion + needs: plan + uses: ./.github/workflows/backfill-test.yml + secrets: inherit + with: + target_ref: ${{ needs.plan.outputs.target_sha }} + report: name: Aggregate + report - needs: [plan, load-test] + needs: [plan, load-test, backfill] if: always() runs-on: ubuntu-latest timeout-minutes: 10 @@ -87,10 +95,8 @@ jobs: role-to-assume: ${{ secrets.AWS_GHA_ROLE_ARN }} aws-region: us-east-1 - # coordinator-runner does the rendering + history fold (it fetches each leg's - # S3 result and emits the new comment body, under unit test). The gh api glue - # -- reading the existing comment and posting/editing -- stays here. Tolerant: - # a posting hiccup must not pre-empt the gate below. 
+ # coordinator-runner fetches each leg's S3 result and emits the new comment body, + # this ties that to the gh API - name: Render + post sticky report env: GH_TOKEN: ${{ github.token }} @@ -100,7 +106,9 @@ jobs: TARGET_SHA: ${{ needs.plan.outputs.target_sha }} TARGET_REF: ${{ inputs.target_ref || github.ref_name }} RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} - LEGS: '[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"}]' + LEGS: >- + [{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"}, + {"label":"Backfill ingestion","bucket":"${{ needs.backfill.outputs.bucket }}","key":"${{ needs.backfill.outputs.result_key }}","passed":"${{ needs.backfill.outputs.passed }}","verdict":"${{ needs.backfill.outputs.verdict }}"}] run: | set -uo pipefail MARKER='' @@ -137,8 +145,14 @@ jobs: - name: Set overall status if: always() run: | + rc=0 if [ "${{ needs.load-test.outputs.passed }}" != "true" ]; then echo "::error::Apply-load ingestion leg did not pass (result=${{ needs.load-test.result }}, verdict=${{ needs.load-test.outputs.verdict }})" - exit 1 + rc=1 + fi + if [ "${{ needs.backfill.outputs.passed }}" != "true" ]; then + echo "::error::Backfill ingestion leg did not pass (result=${{ needs.backfill.result }}, verdict=${{ needs.backfill.outputs.verdict }})" + rc=1 fi + if [ "$rc" -ne 0 ]; then exit 1; fi echo "all load-test legs passed" diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index 70d2b26a7..a40a989a3 100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -205,6 
+205,24 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { var ingestCfg ingest.Config daemon.ingestService, ingestCfg = createIngestService(cfg, logger, daemon, feewindows, historyArchive, rw) if cfg.Backfill { + // Bulk-load with backfill-tuned SQLite pragmas via a separate session that is + // swapped in for the backfill phase and closed once it completes. + tunedSession, err := db.OpenSQLiteBackfillSession(cfg.SQLiteDBPath) + if err != nil { + logger.WithError(err).Fatal("failed to open backfill-tuned database session") + } + servingSession := daemon.db.UseSession(tunedSession) + + // On a fresh DB, drop the events secondary indexes so the bulk-load avoids + // random B-tree inserts; EnsureEventIndexes rebuilds them below. + if _, err := db.NewLedgerReader(daemon.db).GetLedgerRange(context.Background()); errors.Is(err, db.ErrEmptyDB) { + if err := db.DropEventIndexes(context.Background(), daemon.db, logger); err != nil { + logger.WithError(err).Fatal("failed to drop events indexes for backfill") + } + } else if err != nil { + logger.WithError(err).Fatal("failed to check database emptiness for backfill") + } + backfillMeta, err := ingest.NewBackfillMeta( logger, daemon.ingestService, @@ -218,10 +236,22 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { if err := backfillMeta.RunBackfill(cfg); err != nil { logger.WithError(err).Fatal("failed to backfill ledgers") } + + // Restore the serving session and release the tuned session's resources. + daemon.db.UseSession(servingSession) + if err := tunedSession.Close(); err != nil { + logger.WithError(err).Warn("could not close backfill-tuned database session") + } // Clear the DB cache and fee windows so they re-populate from the database daemon.db.ResetCache() feewindows.Reset() } + // Rebuild any events indexes dropped for a backfill bulk-load (also covers + // crashed-backfill restarts). 
Must finish before ingestService.Start: SQLite + // is single-writer and a long CREATE INDEX would starve live commits. + if err := db.EnsureEventIndexes(context.Background(), daemon.db, cfg.SQLiteDBPath, logger); err != nil { + logger.WithError(err).Fatal("failed to ensure events indexes") + } // Start ingestion service only after backfill is complete daemon.ingestService.Start(ingestCfg) diff --git a/cmd/stellar-rpc/internal/db/db.go b/cmd/stellar-rpc/internal/db/db.go index 21b728436..98ecbd968 100644 --- a/cmd/stellar-rpc/internal/db/db.go +++ b/cmd/stellar-rpc/internal/db/db.go @@ -78,13 +78,28 @@ func (d *DB) ResetCache() { d.cache.firstLedgerCloseTime = 0 } +// Serving DSN pragmas: +// 1. Use Write-Ahead Logging (WAL). +// 2. Disable WAL auto-checkpointing (we do the checkpointing ourselves with +// wal_checkpoint pragmas after every write transaction). +// 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode. +const serveSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" + +// Backfill DSN pragmas, relative to serving: +// 1. Use synchronous=OFF (safe since backfill is restartable and gap-checked). +// 2. Use a 1 GiB page cache for hot interior B-tree pages; anything larger +// displaces the OS page cache and degrades the late run. +const backfillSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=OFF" + + "&_cache_size=-1048576" // 1 GiB page cache (negative value = KiB) + +// Index-build DSN pragmas: a large cache raises the CREATE INDEX sorter's +// in-memory budget (this session runs alone, single connection); default +// temp_store so multi-GB sorts spill to disk instead of RAM. +const indexBuildSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" + + "&_cache_size=-2097152" // 2 GiB page cache (negative value = KiB) + func openSQLiteDB(dbFilePath string) (*db.Session, error) { - // 1. Use Write-Ahead Logging (WAL). - // 2. 
Disable WAL auto-checkpointing (we will do the checkpointing ourselves with wal_checkpoint pragmas - // after every write transaction). - // 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode. - session, err := db.Open("sqlite3", - fmt.Sprintf("file:%s?_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL", dbFilePath)) + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, serveSQLitePragmas)) if err != nil { return nil, fmt.Errorf("open failed: %w", err) } @@ -96,6 +111,116 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) { return session, nil } +// OpenSQLiteBackfillSession opens an additional, backfill-tuned session to the +// same SQLite file. It performs no schema setup, so the schema must already +// exist. Swap it onto the *DB with UseSession, then restore the serving session. +func OpenSQLiteBackfillSession(dbFilePath string) (db.SessionInterface, error) { + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, backfillSQLitePragmas)) + if err != nil { + return nil, fmt.Errorf("open backfill session failed: %w", err) + } + return session, nil +} + +// UseSession swaps the underlying session, returning the previous one. Used to +// apply backfill-tuned SQLite pragmas without modifying the metrics-wrapped serving +// session; pass the returned session back to restore it. Single-threaded only. +func (d *DB) UseSession(s db.SessionInterface) db.SessionInterface { + prev := d.SessionInterface + d.SessionInterface = s + return prev +} + +type indexDDL struct { + name string + ddl string +} + +// eventIndexes are the events secondary indexes dropped during a fresh-DB +// backfill and rebuilt afterwards. DDL text must match 06_topic_indices.sql +// exactly so the rebuilt schema is identical to a migration-built one. +// NOTE for migration authors: startup migrations run before EnsureEventIndexes, +// so a future migration touching these indexes must tolerate their absence +// (e.g. 
DROP INDEX IF EXISTS). +// +//nolint:gochecknoglobals // effectively-constant DDL lookup +var eventIndexes = []indexDDL{ + {"idx_id_contract_id", "CREATE INDEX idx_id_contract_id ON events (contract_id, id)"}, + {"idx_id_topic1", "CREATE INDEX idx_id_topic1 ON events (topic1, id)"}, +} + +// DropEventIndexes drops the events secondary indexes so a fresh-DB backfill +// bulk-load avoids random B-tree inserts. EnsureEventIndexes rebuilds them. +func DropEventIndexes(ctx context.Context, session db.SessionInterface, logger *log.Entry) error { + for _, index := range eventIndexes { + if _, err := session.ExecRaw(ctx, "DROP INDEX IF EXISTS "+index.name); err != nil { + return fmt.Errorf("could not drop index %s: %w", index.name, err) + } + logger.Infof("Dropped events index %s for backfill bulk-load", index.name) + } + return nil +} + +// EnsureEventIndexes rebuilds any events secondary index dropped by +// DropEventIndexes. Must complete before live ingestion starts: SQLite is +// single-writer and a long CREATE INDEX would starve live commits past their +// busy timeout. 
+func EnsureEventIndexes(ctx context.Context, d *DB, dbFilePath string, logger *log.Entry) error { + missing, err := missingEventIndexes(ctx, d) + if err != nil { + return err + } + if len(missing) == 0 { + return nil + } + + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, indexBuildSQLitePragmas)) + if err != nil { + return fmt.Errorf("open index build session failed: %w", err) + } + defer func() { + if err := session.Close(); err != nil { + logger.WithError(err).Warn("could not close index build session") + } + }() + // Single connection so the threads pragma applies to the CREATE INDEX below + session.DB.SetMaxOpenConns(1) + if _, err := session.ExecRaw(ctx, "PRAGMA threads=4"); err != nil { + return fmt.Errorf("could not enable multithreaded sorter: %w", err) + } + + for _, index := range missing { + logger.Infof("Building events index %s (may take minutes, no progress output)", index.name) + startTime := time.Now() + if _, err := session.ExecRaw(ctx, index.ddl); err != nil { + return fmt.Errorf("could not build index %s: %w", index.name, err) + } + // Checkpoint now so the index's WAL debt isn't paid by the first live commit + if _, err := session.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { + return fmt.Errorf("could not checkpoint after building index %s: %w", index.name, err) + } + logger.WithField("duration", time.Since(startTime).String()). 
+ Infof("Built events index %s", index.name) + } + return nil +} + +func missingEventIndexes(ctx context.Context, d *DB) ([]indexDDL, error) { + var missing []indexDDL + for _, index := range eventIndexes { + var count int + err := d.GetRaw(ctx, &count, + "SELECT COUNT(*) FROM sqlite_master WHERE type = 'index' AND name = ?", index.name) + if err != nil { + return nil, fmt.Errorf("could not check index %s: %w", index.name, err) + } + if count == 0 { + missing = append(missing, index) + } + } + return missing, nil +} + func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub db.Subservice, registry *prometheus.Registry, ) (*DB, error) { diff --git a/cmd/stellar-rpc/internal/db/event.go b/cmd/stellar-rpc/internal/db/event.go index 8b2d7e98f..e14abbca1 100644 --- a/cmd/stellar-rpc/internal/db/event.go +++ b/cmd/stellar-rpc/internal/db/event.go @@ -211,10 +211,9 @@ func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error { } // Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER - // limit (32,767 by default). With 10 bind variables per event, we cap - // each INSERT at 1000 events (10,000 bind variables) to stay well - // within the limit. - const maxEventsPerBatch = 1000 + // limit (32,766 by default). 10 bind variables/event * 3,000 events = + // 30,000 bind variables < 32,766 limit. + const maxEventsPerBatch = 3000 for batchStart := 0; batchStart < len(insertableEvents); batchStart += maxEventsPerBatch { batchEnd := min(batchStart+maxEventsPerBatch, len(insertableEvents)) diff --git a/cmd/stellar-rpc/internal/db/transaction.go b/cmd/stellar-rpc/internal/db/transaction.go index 87f2cabcc..36633bedb 100644 --- a/cmd/stellar-rpc/internal/db/transaction.go +++ b/cmd/stellar-rpc/internal/db/transaction.go @@ -110,10 +110,9 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error } // Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER - // limit (32,766 by default). 
With 3 bind variables per transaction, we - // cap each INSERT at 3,000 rows (9,000 bind variables) to stay well - // within the limit. - const maxRowsPerBatch = 3000 + // limit (32,766 by default). 3 bind variables/tx * 10,000 tx rows = 30,000 + // bind variables < 32,766 limit. + const maxRowsPerBatch = 10000 rowsInBatch := 0 query := sq.Insert(transactionTableName). Columns("hash", "ledger_sequence", "application_order") diff --git a/cmd/stellar-rpc/internal/ingest/service.go b/cmd/stellar-rpc/internal/ingest/service.go index 5868249bb..2b8fa76c4 100644 --- a/cmd/stellar-rpc/internal/ingest/service.go +++ b/cmd/stellar-rpc/internal/ingest/service.go @@ -217,7 +217,8 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { } }() - if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta); err != nil { + durationMetrics := map[string]time.Duration{} + if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta, durationMetrics); err != nil { return err } @@ -230,7 +231,6 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { With(prometheus.Labels{"type": "fee-window"}). 
Observe(time.Since(feeWindowStartTime).Seconds()) - durationMetrics := map[string]time.Duration{} if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { return err } @@ -274,29 +274,32 @@ func (s *Service) ingestRange(ctx context.Context, backend backends.LedgerBacken }() var ledgerCloseMeta xdr.LedgerCloseMeta + durationMetrics := map[string]time.Duration{} for seq := seqRange.From(); seq <= seqRange.To(); seq++ { ledgerCloseMeta, err = backend.GetLedger(ctx, seq) if err != nil { return err } - if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta); err != nil { + if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta, durationMetrics); err != nil { return err } } - durationMetrics := map[string]time.Duration{} if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { return err } + fields := log.F{"total": time.Since(startTime).Seconds()} for key, duration := range durationMetrics { s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": key}). Observe(duration.Seconds()) + fields[key] = duration.Seconds() } + // Per-chunk phase timings for attributing where backfill time goes s.logger. - WithField("duration", time.Since(startTime).Seconds()). - Debugf("Ingested ledgers [%d, %d]", seqRange.From(), seqRange.To()) + WithFields(fields). + Infof("Ingested ledgers [%d, %d]", seqRange.From(), seqRange.To()) s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "total"}). @@ -308,30 +311,28 @@ func (s *Service) ingestRange(ctx context.Context, backend backends.LedgerBacken return nil } -func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error { +// ingestLedgerCloseMeta writes a ledger's rows, accumulating per-phase durations +// into durationMetrics; callers observe/log them. 
+func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta, + durationMetrics map[string]time.Duration, +) error { startTime := time.Now() if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "ledger_close_meta"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["ledger_close_meta"] += time.Since(startTime) startTime = time.Now() if err := tx.TransactionWriter().InsertTransactions(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "transactions"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["transactions"] += time.Since(startTime) startTime = time.Now() if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "events"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["events"] += time.Since(startTime) return nil } diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh new file mode 100644 index 000000000..4af0c96fb --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh @@ -0,0 +1,9 @@ +# Backfill ingestion leg. Concatenated after bootstrap-common.sh in the rendered +# EC2 user-data, so it relies on that file's env, helpers, bootstrap_box, and +# run_leg. It hands off to `runner instantiate`, which builds stellar-rpc and +# times a `--backfill` run against the pubnet datastore. The other half, +# `runner gather`, polls S3 for the result object. 
+LEG_TITLE="Backfill ingestion" + +bootstrap_box +run_leg ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go new file mode 100644 index 000000000..32f3e439a --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go @@ -0,0 +1,207 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" +) + +// legDir is this leg's path under the repo root, where its config template is +// checked in (the runner runs with cwd = repo root). +const legDir = "cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test" + +const ( + corePath = "/usr/local/bin/stellar-core" // fetched from S3 + binaryPath = "/data/stellar-rpc-bin" // built here (the repo checkout is /data/stellar-rpc) +) + +// backfillDoneRe matches the terminal line emitted on backfill's completion +var backfillDoneRe = regexp.MustCompile(`Backfill process complete, ledgers \[(\d+) -> (\d+)\]`) + +// instantiate is the instance's backfill task: it fetches + builds test fixtures, +// runs a timed backfill, then publishes the verdict. 
+func instantiate(ctx context.Context) error { + var ( + bucket = harness.Env("BUCKET", "stellar-rpc-ci-load-test") + region = harness.Env("REGION", "us-east-1") + workDir = harness.Env("WORK_DIR", "/data") + resultsFile = harness.Env("RESULTS_FILE", "/tmp/results.md") + resultKey = os.Getenv("RESULT_KEY") + targetSHA = os.Getenv("TARGET_SHA") + runID = harness.Env("RUN_ID", "manual") + // ~1 day by default for cheap test runs; the full week is 120960. + retention = harness.Env("HISTORY_RETENTION_WINDOW", "17280") + deadline = harness.Env("BACKFILL_DEADLINE", "4h") + ) + repoRoot, err := os.Getwd() + if err != nil { + return err + } + bail := func(format string, args ...any) error { + return harness.BailInstance(resultsFile, "Backfill ingestion", runID, targetSHA, fmt.Sprintf(format, args...)) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return bail("loading AWS config: %v", err) + } + fetch := &harness.S3Fetcher{Client: s3.NewFromConfig(awsCfg), Bucket: bucket} + + if err := fetch.FetchVerified(ctx, "core/stellar-core.zst", corePath, true, "stellar-core"); err != nil { + return bail("%v", err) + } + if err := os.Chmod(corePath, 0o755); err != nil { + return bail("chmod stellar-core: %v", err) + } + + logger.Infof("building stellar-rpc") + if err := harness.RunStreaming(ctx, repoRoot, nil, 40, "make", "build-libs"); err != nil { + return bail("make build-libs failed: %v", err) + } + if err := harness.RunStreaming(ctx, repoRoot, nil, 40, + "go", "build", "-o", binaryPath, "./cmd/stellar-rpc"); err != nil { + return bail("go build failed: %v", err) + } + + // fetch + write core config from SDK + coreCfg := filepath.Join(workDir, "captive-core-pubnet.cfg") + if err := os.WriteFile(coreCfg, ledgerbackend.PubnetDefaultConfig, 0o644); err != nil { + return bail("writing captive-core config: %v", err) + } + + cfgPath, err := renderConfig(repoRoot, workDir, coreCfg, retention) + if err != nil { + return bail("rendering 
config: %v", err) + } + + dl, err := time.ParseDuration(deadline) + if err != nil { + return bail("invalid BACKFILL_DEADLINE %q: %v", deadline, err) + } + logger.Infof("starting backfill (retention=%s, deadline=%s)", retention, deadline) + elapsed, lo, hi, err := runBackfill(ctx, dl, binaryPath, cfgPath) + if err != nil { + return bail("%v", err) + } + ingested := hi - lo + 1 + logger.Infof("backfill complete: %d ledgers [%d -> %d] in %s", ingested, lo, hi, elapsed.Round(time.Second)) + + md := renderMarkdown(targetSHA, retention, lo, hi, ingested, elapsed) + if err := os.WriteFile(resultsFile, []byte(md), 0o644); err != nil { + return bail("writing results: %v", err) + } + if err := harness.PublishResult( + ctx, fetch.Client, bucket, resultKey, "ok", runID, targetSHA, resultsFile, ""); err != nil { + return bail("publishing result: %v", err) + } + return nil +} + +// renderConfig fills the config template's ${...} placeholders (box paths + the +// retention window) via os.Expand, writes the result under workDir, and returns its path +func renderConfig(repoRoot, workDir, coreCfg, retention string) (string, error) { + tmpl, err := os.ReadFile(filepath.Join(repoRoot, legDir, "testdata", "backfill-pubnet.toml.tmpl")) + if err != nil { + return "", err + } + mapping := func(in string) string { + switch in { + case "CAPTIVE_CORE_CONFIG_PATH": + return coreCfg + case "CAPTIVE_CORE_STORAGE_PATH": + return filepath.Join(workDir, "core-storage") + case "DB_PATH": + return filepath.Join(workDir, "backfill.sqlite") + case "STELLAR_CORE_BINARY_PATH": + return corePath + case "HISTORY_RETENTION_WINDOW": + return retention + default: + return "${" + in + "}" // leave unknown placeholders intact (the template uses the ${...} form) + } + } + body := os.Expand(string(tmpl), mapping) + cfgPath := filepath.Join(workDir, "backfill-rpc.toml") + if err := os.WriteFile(cfgPath, []byte(body), 0o644); err != nil { + return "", err + } + return cfgPath, nil +} + +// runBackfill launches the daemon and streams its output (teeing to the box log) +// until the backfill-complete line fires, 
recording the wall-clock time from launch to that line; it returns that time and the ledger range [lo, hi] the line reports +func runBackfill(ctx context.Context, deadline time.Duration, binary, cfgPath string) (time.Duration, int, int, error) { + runCtx, cancel := context.WithTimeout(ctx, deadline) + defer cancel() + + cmd := exec.CommandContext(runCtx, binary, "--config-path", cfgPath) + // hide this box's IMDS creds as the public datalake 403s signed requests + cmd.Env = append(os.Environ(), "AWS_EC2_METADATA_DISABLED=true") + pr, pw, err := os.Pipe() + if err != nil { + return 0, 0, 0, err + } + cmd.Stdout, cmd.Stderr = pw, pw + + start := time.Now() + if err := cmd.Start(); err != nil { + pw.Close() + pr.Close() + return 0, 0, 0, fmt.Errorf("starting daemon: %w", err) + } + pw.Close() // the child holds the write end and we read until it dies + defer pr.Close() + + var elapsed time.Duration + var lo, hi int + scanner := bufio.NewScanner(pr) + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + for scanner.Scan() { + line := scanner.Text() + fmt.Fprintln(os.Stderr, line) // tee to the box user-data log (SSM debug tail) + if m := backfillDoneRe.FindStringSubmatch(line); m != nil { + elapsed = time.Since(start) + lo, _ = strconv.Atoi(m[1]) // the regex captures digits only, so Atoi can fail only on overflow + hi, _ = strconv.Atoi(m[2]) + break + } + } + scanErr := scanner.Err() // non-nil if reading stopped on an I/O error or a line over the 1 MiB buffer + cancel() // stop the daemon: before live ingestion on success, or if it is stuck writing to the now-unread pipe + _ = cmd.Wait() // reap; a kill from cancel surfaces here and is expected + if elapsed == 0 { + return 0, 0, 0, fmt.Errorf("daemon exited or hit the %s deadline before backfill completed (log scan error: %v)", deadline, scanErr) + } + return elapsed, lo, hi, nil +} + +func renderMarkdown(sha, retention string, lo, hi, ingested int, elapsed time.Duration) string { + shortSHA := sha + if len(shortSHA) > 12 { + shortSHA = shortSHA[:12] + } + lps := 0.0 + if s := elapsed.Seconds(); s > 0 { + lps = float64(ingested) / s + } + return fmt.Sprintf("### ⏳ Backfill ingestion — `%s`\n\n"+ + "| Metric | Value |\n|---|---|\n"+ + "| Ledgers ingested | %d (`[%d -> %d]`) |\n"+ + "| Retention window | %s |\n"+ + "| Wall-clock | %s |\n"+ + "| Ledgers/sec | %.1f 
|\n", + shortSHA, ingested, lo, hi, retention, elapsed.Round(time.Second), lps) +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go new file mode 100644 index 000000000..4db3e1b17 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go @@ -0,0 +1,10 @@ +// Command runner drives the backfill ingestion leg. The shared harness owns the +// dispatch: `runner gather` polls S3 for the result object on the GHA runner, +// and `runner instantiate` (default) runs the on-box backfill in instantiate.go. +package main + +import "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" + +var logger = harness.NewLogger() + +func main() { harness.Run(instantiate) } diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl new file mode 100644 index 000000000..80927c9fc --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl @@ -0,0 +1,36 @@ +# Template filled in and used by the backfill leg's runner (instantiate.go renderConfig), +# which substitutes the placeholders below via os.Expand before the daemon reads +# it -- not a standalone config. + +NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" +HISTORY_ARCHIVE_URLS = ["https://history.stellar.org/prd/core-live/core_live_001"] +ENDPOINT = "localhost:8000" +LOG_LEVEL = "info" + +# Captive core is constructed at startup (so the binary + config must exist) +# even though backfill itself reads from the datastore below. 
+CAPTIVE_CORE_CONFIG_PATH = "${CAPTIVE_CORE_CONFIG_PATH}" +STELLAR_CORE_BINARY_PATH = "${STELLAR_CORE_BINARY_PATH}" +CAPTIVE_CORE_STORAGE_PATH = "${CAPTIVE_CORE_STORAGE_PATH}" +CHECKPOINT_FREQUENCY = 64 + +DB_PATH = "${DB_PATH}" + +# Backfill the retention window from the datastore on startup, then stop (the +# leg kills the daemon at the completion log line). +BACKFILL = true +SERVE_LEDGERS_FROM_DATASTORE = true +HISTORY_RETENTION_WINDOW = ${HISTORY_RETENTION_WINDOW} +INGESTION_TIMEOUT = "50m0s" + +[datastore_config] + type = "S3" + + [datastore_config.params] + destination_bucket_path = "aws-public-blockchain/v1.1/stellar/ledgers/pubnet" + region = "us-east-2" + + [datastore_config.schema] + FileExtension = "zst" + files_per_partition = 64000 + ledgers_per_file = 1