From c57a23b3d1b840d8a4dc21889eae8b4f3250471d Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Mon, 29 Jun 2026 17:58:53 -0400 Subject: [PATCH 1/8] add initial backfill-specific testing and wiring --- .github/workflows/backfill-test.yml | 38 ++++ .github/workflows/load-test-coordinator.yml | 26 ++- .github/workflows/load-test.yml | 35 +++ .../perf-eval/backfill-test/run-backfill.sh | 9 + .../backfill-test/runner/instantiate.go | 207 ++++++++++++++++++ .../perf-eval/backfill-test/runner/main.go | 10 + .../testdata/backfill-pubnet.toml.tmpl | 36 +++ .../perf-eval/harness/harness.go | 31 ++- .../perf-eval/ingest-load-test/runner/main.go | 53 +---- 9 files changed, 393 insertions(+), 52 deletions(-) create mode 100644 .github/workflows/backfill-test.yml create mode 100644 .github/workflows/load-test.yml create mode 100644 cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh create mode 100644 cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go create mode 100644 cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go create mode 100644 cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl diff --git a/.github/workflows/backfill-test.yml b/.github/workflows/backfill-test.yml new file mode 100644 index 000000000..9a52432c8 --- /dev/null +++ b/.github/workflows/backfill-test.yml @@ -0,0 +1,38 @@ +name: Backfill ingestion (callable) + +on: + workflow_call: + inputs: + target_ref: + description: 'Ref or SHA to benchmark (resolved to a SHA on checkout)' + required: true + type: string + # re-export ec2-leg's results so the coordinator can read them + outputs: + passed: + value: ${{ jobs.leg.outputs.passed }} + verdict: + value: ${{ jobs.leg.outputs.verdict }} + result_key: + value: ${{ jobs.leg.outputs.result_key }} + bucket: + value: ${{ jobs.leg.outputs.bucket }} + instance_id: + 
value: ${{ jobs.leg.outputs.instance_id }} + +permissions: + id-token: write + contents: read + +jobs: + leg: + uses: ./.github/workflows/ec2-leg.yml + secrets: inherit + with: + target_ref: ${{ inputs.target_ref }} + run_label: backfill + leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh + runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner + # ~1 day for cheap test runs; raise to 120960 (1 week) once the role's + # MaxSessionDuration + the leg timeouts are bumped for the multi-hour run. + extra_env: 'export HISTORY_RETENTION_WINDOW=17280' diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 575d0f82f..7805fb9de 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -72,9 +72,23 @@ jobs: leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/run-load-test.sh runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner + backfill: + name: Backfill ingestion + needs: plan + uses: ./.github/workflows/ec2-leg.yml + secrets: inherit + with: + target_ref: ${{ needs.plan.outputs.target_sha }} + run_label: backfill + leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh + runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner + # ~1 day for cheap test runs; raise to 120960 (1 week) once the role's + # MaxSessionDuration + the leg timeouts are bumped for the multi-hour run. 
+ extra_env: 'export HISTORY_RETENTION_WINDOW=17280' + report: name: Aggregate + report - needs: [plan, load-test] + needs: [plan, load-test, backfill] if: always() runs-on: ubuntu-latest timeout-minutes: 10 @@ -103,7 +117,7 @@ jobs: TARGET_SHA: ${{ needs.plan.outputs.target_sha }} TARGET_REF: ${{ inputs.target_ref || github.ref_name }} RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} - LEGS: '[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"}]' + LEGS: '[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"},{"label":"Backfill ingestion","bucket":"${{ needs.backfill.outputs.bucket }}","key":"${{ needs.backfill.outputs.result_key }}","passed":"${{ needs.backfill.outputs.passed }}","verdict":"${{ needs.backfill.outputs.verdict }}"}]' run: | set -uo pipefail MARKER='' @@ -140,8 +154,14 @@ jobs: - name: Set overall status if: always() run: | + rc=0 if [ "${{ needs.load-test.outputs.passed }}" != "true" ]; then echo "::error::Apply-load ingestion leg did not pass (result=${{ needs.load-test.result }}, verdict=${{ needs.load-test.outputs.verdict }})" - exit 1 + rc=1 + fi + if [ "${{ needs.backfill.outputs.passed }}" != "true" ]; then + echo "::error::Backfill ingestion leg did not pass (result=${{ needs.backfill.result }}, verdict=${{ needs.backfill.outputs.verdict }})" + rc=1 fi + if [ "$rc" -ne 0 ]; then exit 1; fi echo "all load-test legs passed" diff --git a/.github/workflows/load-test.yml b/.github/workflows/load-test.yml new file mode 100644 index 000000000..f14ae120b --- /dev/null +++ b/.github/workflows/load-test.yml @@ -0,0 +1,35 @@ +name: Apply-load ingestion 
(callable) + +on: + workflow_call: + inputs: + target_ref: + description: 'Ref or SHA to benchmark (resolved to a SHA on checkout)' + required: true + type: string + # re-export ec2-leg's results so the coordinator can read them + outputs: + passed: + value: ${{ jobs.leg.outputs.passed }} + verdict: + value: ${{ jobs.leg.outputs.verdict }} + result_key: + value: ${{ jobs.leg.outputs.result_key }} + bucket: + value: ${{ jobs.leg.outputs.bucket }} + instance_id: + value: ${{ jobs.leg.outputs.instance_id }} + +permissions: + id-token: write + contents: read + +jobs: + leg: + uses: ./.github/workflows/ec2-leg.yml + secrets: inherit + with: + target_ref: ${{ inputs.target_ref }} + run_label: apply-load + leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/run-load-test.sh + runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh new file mode 100644 index 000000000..4af0c96fb --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh @@ -0,0 +1,9 @@ +# Backfill ingestion leg. Concatenated after bootstrap-common.sh in the rendered +# EC2 user-data, so it relies on that file's env, helpers, bootstrap_box, and +# run_leg. It hands off to `runner instantiate`, which builds stellar-rpc and +# times a `--backfill` run against the pubnet datastore. The other half, +# `runner gather`, polls S3 for the result object. 
+LEG_TITLE="Backfill ingestion" + +bootstrap_box +run_leg ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go new file mode 100644 index 000000000..c89339b65 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go @@ -0,0 +1,207 @@ +package main + +import ( + "bufio" + "context" + "fmt" + "os" + "os/exec" + "path/filepath" + "regexp" + "strconv" + "time" + + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" +) + +// legDir is this leg's path under the repo root, where its config template is +// checked in (the runner runs with cwd = repo root). +const legDir = "cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test" + +const ( + corePath = "/usr/local/bin/stellar-core" // fetched from S3 + binaryPath = "/data/stellar-rpc" // built here +) + +// backfillDoneRe matches the terminal line emitted on backfill's completion +var backfillDoneRe = regexp.MustCompile(`Backfill process complete, ledgers \[(\d+) -> (\d+)\]`) + +// instantiate is the instance's backfill task: it fetches + builds test fixtures, +// runs a timed backfill, then publishes the verdict. 
+func instantiate(ctx context.Context) error { + var ( + bucket = harness.Env("BUCKET", "stellar-rpc-ci-load-test") + region = harness.Env("REGION", "us-east-1") + workDir = harness.Env("WORK_DIR", "/data") + resultsFile = harness.Env("RESULTS_FILE", "/tmp/results.md") + resultKey = os.Getenv("RESULT_KEY") + targetSHA = os.Getenv("TARGET_SHA") + runID = harness.Env("RUN_ID", "manual") + // ~1 day by default for cheap test runs; the full week is 120960. + retention = harness.Env("HISTORY_RETENTION_WINDOW", "17280") + deadline = harness.Env("BACKFILL_DEADLINE", "4h") + ) + repoRoot, err := os.Getwd() + if err != nil { + return err + } + bail := func(format string, args ...any) error { + return harness.BailInstance(resultsFile, "Backfill ingestion", runID, targetSHA, fmt.Sprintf(format, args...)) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return bail("loading AWS config: %v", err) + } + fetch := &harness.S3Fetcher{Client: s3.NewFromConfig(awsCfg), Bucket: bucket} + + if err := fetch.FetchVerified(ctx, "core/stellar-core.zst", corePath, true, "stellar-core"); err != nil { + return bail("%v", err) + } + if err := os.Chmod(corePath, 0o755); err != nil { + return bail("chmod stellar-core: %v", err) + } + + logger.Infof("building stellar-rpc") + if err := harness.RunStreaming(ctx, repoRoot, nil, 40, "make", "build-libs"); err != nil { + return bail("make build-libs failed: %v", err) + } + if err := harness.RunStreaming(ctx, repoRoot, nil, 40, + "go", "build", "-o", binaryPath, "./cmd/stellar-rpc"); err != nil { + return bail("go build failed: %v", err) + } + + // fetch + write core config from SDK + coreCfg := filepath.Join(workDir, "captive-core-pubnet.cfg") + if err := os.WriteFile(coreCfg, ledgerbackend.PubnetDefaultConfig, 0o644); err != nil { + return bail("writing captive-core config: %v", err) + } + + cfgPath, err := renderConfig(repoRoot, workDir, coreCfg, retention) + if err != nil { + return bail("rendering 
config: %v", err) + } + + dl, err := time.ParseDuration(deadline) + if err != nil { + return bail("invalid BACKFILL_DEADLINE %q: %v", deadline, err) + } + logger.Infof("starting backfill (retention=%s, deadline=%s)", retention, deadline) + elapsed, lo, hi, err := runBackfill(ctx, dl, binaryPath, cfgPath) + if err != nil { + return bail("%v", err) + } + ingested := hi - lo + 1 + logger.Infof("backfill complete: %d ledgers [%d -> %d] in %s", ingested, lo, hi, elapsed.Round(time.Second)) + + md := renderMarkdown(targetSHA, retention, lo, hi, ingested, elapsed) + if err := os.WriteFile(resultsFile, []byte(md), 0o644); err != nil { + return bail("writing results: %v", err) + } + if err := harness.PublishResult( + ctx, fetch.Client, bucket, resultKey, "ok", runID, targetSHA, resultsFile, ""); err != nil { + return bail("publishing result: %v", err) + } + return nil +} + +// renderConfig fills the config template's ${...} placeholders (box paths + the +// retention window) via os.Expand +func renderConfig(repoRoot, workDir, coreCfg, retention string) (string, error) { + tmpl, err := os.ReadFile(filepath.Join(repoRoot, legDir, "testdata", "backfill-pubnet.toml.tmpl")) + if err != nil { + return "", err + } + mapping := func(in string) string { + switch in { + case "CAPTIVE_CORE_CONFIG_PATH": + return coreCfg + case "CAPTIVE_CORE_STORAGE_PATH": + return filepath.Join(workDir, "core-storage") + case "DB_PATH": + return filepath.Join(workDir, "backfill.sqlite") + case "STELLAR_CORE_BINARY_PATH": + return corePath + case "HISTORY_RETENTION_WINDOW": + return retention + default: + return "$" + in // leave unknown placeholders intact + } + } + body := os.Expand(string(tmpl), mapping) + cfgPath := filepath.Join(workDir, "backfill-rpc.toml") + if err := os.WriteFile(cfgPath, []byte(body), 0o644); err != nil { + return "", err + } + return cfgPath, nil +} + +// runBackfill launches the daemon and streams its output (teeing to the box log) +// until the backfill-complete line fires, 
recording the wall-clock +func runBackfill(ctx context.Context, deadline time.Duration, binary, cfgPath string) (time.Duration, int, int, error) { + runCtx, cancel := context.WithTimeout(ctx, deadline) + defer cancel() + + cmd := exec.CommandContext(runCtx, binary, "--config-path", cfgPath) + // hide this box's IMDS creds as the public datalake 403s signed requests + cmd.Env = append(os.Environ(), "AWS_EC2_METADATA_DISABLED=true") + pr, pw, err := os.Pipe() + if err != nil { + return 0, 0, 0, err + } + cmd.Stdout, cmd.Stderr = pw, pw + + start := time.Now() + if err := cmd.Start(); err != nil { + pw.Close() + pr.Close() + return 0, 0, 0, fmt.Errorf("starting daemon: %w", err) + } + pw.Close() // the child holds the write end and we read until it dies + defer pr.Close() + + var elapsed time.Duration + var lo, hi int + scanner := bufio.NewScanner(pr) + scanner.Buffer(make([]byte, 64*1024), 1024*1024) + for scanner.Scan() { + line := scanner.Text() + fmt.Fprintln(os.Stderr, line) // tee to the box user-data log (SSM debug tail) + if m := backfillDoneRe.FindStringSubmatch(line); m != nil { + elapsed = time.Since(start) + lo, _ = strconv.Atoi(m[1]) + hi, _ = strconv.Atoi(m[2]) + cancel() // stop the daemon before it starts live ingestion + break + } + } + _ = cmd.Wait() // reap; a kill from cancel surfaces here and is expected + + if elapsed == 0 { + return 0, 0, 0, fmt.Errorf("daemon exited or hit the %s deadline before backfill completed", deadline) + } + return elapsed, lo, hi, nil +} + +func renderMarkdown(sha, retention string, lo, hi, ingested int, elapsed time.Duration) string { + shortSHA := sha + if len(shortSHA) > 12 { + shortSHA = shortSHA[:12] + } + lps := 0.0 + if s := elapsed.Seconds(); s > 0 { + lps = float64(ingested) / s + } + return fmt.Sprintf("### ⏳ Backfill ingestion — `%s`\n\n"+ + "| Metric | Value |\n|---|---|\n"+ + "| Ledgers ingested | %d (`[%d -> %d]`) |\n"+ + "| Retention window | %s |\n"+ + "| Wall-clock | %s |\n"+ + "| Ledgers/sec | %.1f 
|\n", + shortSHA, ingested, lo, hi, retention, elapsed.Round(time.Second), lps) +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go new file mode 100644 index 000000000..4db3e1b17 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/main.go @@ -0,0 +1,10 @@ +// Command runner drives the backfill ingestion leg. The shared harness owns the +// dispatch: `runner gather` polls S3 for the result object on the GHA runner, +// and `runner instantiate` (default) runs the on-box backfill below. +package main + +import "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" + +var logger = harness.NewLogger() + +func main() { harness.Run(instantiate) } diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl new file mode 100644 index 000000000..80927c9fc --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/testdata/backfill-pubnet.toml.tmpl @@ -0,0 +1,36 @@ +# To fill in and use by the backfill leg's runner (instantiate.go renderConfig), +# which substitutes the placeholders below via os.Expand before the daemon reads +# it -- not a standalone config. + +NETWORK_PASSPHRASE = "Public Global Stellar Network ; September 2015" +HISTORY_ARCHIVE_URLS = ["https://history.stellar.org/prd/core-live/core_live_001"] +ENDPOINT = "localhost:8000" +LOG_LEVEL = "info" + +# Captive core is constructed at startup (so the binary + config must exist) +# even though backfill itself reads from the datastore below. 
+CAPTIVE_CORE_CONFIG_PATH = "${CAPTIVE_CORE_CONFIG_PATH}" +STELLAR_CORE_BINARY_PATH = "${STELLAR_CORE_BINARY_PATH}" +CAPTIVE_CORE_STORAGE_PATH = "${CAPTIVE_CORE_STORAGE_PATH}" +CHECKPOINT_FREQUENCY = 64 + +DB_PATH = "${DB_PATH}" + +# Backfill the retention window from the datastore on startup, then stop (the +# leg kills the daemon at the completion log line). +BACKFILL = true +SERVE_LEDGERS_FROM_DATASTORE = true +HISTORY_RETENTION_WINDOW = ${HISTORY_RETENTION_WINDOW} +INGESTION_TIMEOUT = "50m0s" + +[datastore_config] + type = "S3" + + [datastore_config.params] + destination_bucket_path = "aws-public-blockchain/v1.1/stellar/ledgers/pubnet" + region = "us-east-2" + + [datastore_config.schema] + FileExtension = "zst" + files_per_partition = 64000 + ledgers_per_file = 1 diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness/harness.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness/harness.go index d279d6bfb..2803687d1 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness/harness.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness/harness.go @@ -16,6 +16,7 @@ package harness import ( + "context" "fmt" "os" "strings" @@ -23,11 +24,37 @@ import ( supportlog "github.com/stellar/go-stellar-sdk/support/log" ) -var logger = func() *supportlog.Entry { +// NewLogger returns an Info-level logger (supportlog.New starts at WARN). Each +// leg's runner uses one for its own messages. 
+func NewLogger() *supportlog.Entry { l := supportlog.New() l.SetLevel(supportlog.InfoLevel) return l -}() +} + +var logger = NewLogger() + +func Run(instantiate func(context.Context) error) { + cmd := "instantiate" + if len(os.Args) > 1 { + cmd = os.Args[1] + } + ctx := context.Background() + var err error + switch cmd { + case "instantiate": + err = instantiate(ctx) + case "gather": + err = Gather(ctx) + default: + fmt.Fprintf(os.Stderr, "usage: %s [instantiate|gather]\n", os.Args[0]) + os.Exit(64) + } + if err != nil { + logger.Errorf("fatal: %v", err) + os.Exit(1) + } +} // Env returns the value of key, or def when unset/empty. func Env(key, def string) string { diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner/main.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner/main.go index 4b6bffc95..0757b4f99 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner/main.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner/main.go @@ -1,51 +1,10 @@ -// Command runner drives the apply-load ingestion leg. It has two subcommands, -// one per environment the leg spans: -// -// runner instantiate on the EC2 box, after the shell preamble has installed -// the toolchain and checked out the repo: streams the -// golden DB, stellar-core, and ledger bundles from S3 -// (sha-verified), runs the ingest benchmark, and writes -// an ok/fail verdict. -// runner gather on the GHA runner: polls S3 for the result object the -// instance publishes and relays the verdict + results as -// step outputs. Shared across legs (perf-eval/harness). -// -// The two halves coordinate through a single S3 result object (harness.Result). +// Command runner drives the apply-load ingestion leg. 
The shared harness owns +// the dispatch: `runner gather` polls S3 for the result object on the GHA runner, +// and `runner instantiate` (default) runs the on-box benchmark below. package main -import ( - "context" - "fmt" - "os" +import "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" - supportlog "github.com/stellar/go-stellar-sdk/support/log" +var logger = harness.NewLogger() - "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/harness" -) - -var logger = supportlog.New() - -func main() { - logger.SetLevel(supportlog.InfoLevel) - - cmd := "instantiate" - if len(os.Args) > 1 { - cmd = os.Args[1] - } - - ctx := context.Background() - var err error - switch cmd { - case "instantiate": - err = instantiate(ctx) - case "gather": - err = harness.Gather(ctx) - default: - fmt.Fprintf(os.Stderr, "usage: %s [instantiate|gather]\n", os.Args[0]) - os.Exit(64) - } - if err != nil { - logger.Errorf("fatal: %v", err) - os.Exit(1) - } -} +func main() { harness.Run(instantiate) } From 3901d85cafd603b0b8aa806f7b0ce1c0ee4b4bff Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Mon, 29 Jun 2026 18:05:14 -0400 Subject: [PATCH 2/8] use callable workflows in coordinator --- .github/workflows/load-test-coordinator.yml | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 7805fb9de..3d5f10f81 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -58,33 +58,24 @@ jobs: echo "pr_number=$PR" } >> "$GITHUB_OUTPUT" - # The fan-out. Each leg is a call into the generic ec2-leg.yml with its own - # bootstrap script + runner. Further legs (e.g. endpoint benchmarks, backfill) - # slot in as sibling jobs the report job will aggregate. + # The fan-out. 
Each leg is its own callable workflow (a thin wrapper over the + # shared ec2-leg.yml). Further legs slot in as sibling jobs the report + # aggregates. load-test: name: Apply-load ingestion needs: plan - uses: ./.github/workflows/ec2-leg.yml + uses: ./.github/workflows/load-test.yml secrets: inherit with: target_ref: ${{ needs.plan.outputs.target_sha }} - run_label: apply-load - leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/run-load-test.sh - runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/ingest-load-test/runner backfill: name: Backfill ingestion needs: plan - uses: ./.github/workflows/ec2-leg.yml + uses: ./.github/workflows/backfill-test.yml secrets: inherit with: target_ref: ${{ needs.plan.outputs.target_sha }} - run_label: backfill - leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh - runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner - # ~1 day for cheap test runs; raise to 120960 (1 week) once the role's - # MaxSessionDuration + the leg timeouts are bumped for the multi-hour run. 
- extra_env: 'export HISTORY_RETENTION_WINDOW=17280' report: name: Aggregate + report From 4d7084d7f455b7a2b6a8e4130d0833e24665ea9a Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Tue, 30 Jun 2026 02:57:03 -0400 Subject: [PATCH 3/8] add push to branch backfill-test to coordinator triggers --- .github/workflows/load-test-coordinator.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 3d5f10f81..7cd483093 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -11,7 +11,7 @@ on: push: # temporary scaffolding: before merge to main, replace with line # branches: [release/**] - branches: [release/**, gha-coordinator] + branches: [release/**, gha-coordinator, backfill-test] workflow_dispatch: inputs: target_ref: From 0d261a6c3b3bf7d4d04eee360087e58c289a5e66 Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Tue, 30 Jun 2026 03:11:52 -0400 Subject: [PATCH 4/8] resolve the PR by head without --base main --- .github/workflows/load-test-coordinator.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 7cd483093..723850954 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -51,8 +51,9 @@ jobs: REF: ${{ inputs.target_ref || github.ref_name }} run: | SHA="$(git rev-parse HEAD)" + # match the branch's open PR by head alone -- a stacked PR need not target main PR="$(gh pr list --repo "${{ github.repository }}" --state open \ - --base main --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)" + --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)" { echo "target_sha=$SHA" echo "pr_number=$PR" From 2bbb35e4bd47d06d75ea34f9862056be8a83a342 Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Tue, 30 
Jun 2026 12:45:17 -0400 Subject: [PATCH 5/8] in ec2, build daemon to a path that doesn't collide with the repo dir --- .../perf-eval/backfill-test/runner/instantiate.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go index c89339b65..32f3e439a 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner/instantiate.go @@ -25,7 +25,7 @@ const legDir = "cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eva const ( corePath = "/usr/local/bin/stellar-core" // fetched from S3 - binaryPath = "/data/stellar-rpc" // built here + binaryPath = "/data/stellar-rpc-bin" // built here (the repo checkout is /data/stellar-rpc) ) // backfillDoneRe matches the terminal line emitted on backfill's completion From cda0eb459866d1aef94d06558ea020e932811570 Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Tue, 30 Jun 2026 18:16:37 -0400 Subject: [PATCH 6/8] clean up comments, increase MaxSessionDuration and backfill time limits --- .github/workflows/backfill-test.yml | 9 ++++++--- .github/workflows/load-test-coordinator.yml | 11 +++++------ 2 files changed, 11 insertions(+), 9 deletions(-) diff --git a/.github/workflows/backfill-test.yml b/.github/workflows/backfill-test.yml index 9a52432c8..eac068bd3 100644 --- a/.github/workflows/backfill-test.yml +++ b/.github/workflows/backfill-test.yml @@ -33,6 +33,9 @@ jobs: run_label: backfill leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner - # ~1 day for cheap test runs; raise to 120960 (1 week) once the role's 
- # MaxSessionDuration + the leg timeouts are bumped for the multi-hour run. - extra_env: 'export HISTORY_RETENTION_WINDOW=17280' + # these cascade under the role's 8h MaxSessionDuration so deadline < poll < job < session < self-terminate + results_timeout: 28200 # 7h50m S3 poll ceiling + job_timeout_minutes: 475 # 7h55m (only effective on self-hosted, hosted runners hard-cap jobs at 6h) + role_duration_seconds: 28800 # 8h OIDC session == the role's MaxSessionDuration + self_terminate_minutes: 540 # 9h box backstop + extra_env: 'export HISTORY_RETENTION_WINDOW=120960 BACKFILL_DEADLINE=7h30m' diff --git a/.github/workflows/load-test-coordinator.yml b/.github/workflows/load-test-coordinator.yml index 723850954..a66e2eb3d 100644 --- a/.github/workflows/load-test-coordinator.yml +++ b/.github/workflows/load-test-coordinator.yml @@ -51,7 +51,6 @@ jobs: REF: ${{ inputs.target_ref || github.ref_name }} run: | SHA="$(git rev-parse HEAD)" - # match the branch's open PR by head alone -- a stacked PR need not target main PR="$(gh pr list --repo "${{ github.repository }}" --state open \ --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)" { @@ -96,10 +95,8 @@ jobs: role-to-assume: ${{ secrets.AWS_GHA_ROLE_ARN }} aws-region: us-east-1 - # coordinator-runner does the rendering + history fold (it fetches each leg's - # S3 result and emits the new comment body, under unit test). The gh api glue - # -- reading the existing comment and posting/editing -- stays here. Tolerant: - # a posting hiccup must not pre-empt the gate below. 
+ # coordinator-runner fetches each leg's S3 result and emits the new comment body, + # this ties that to the gh API - name: Render + post sticky report env: GH_TOKEN: ${{ github.token }} @@ -109,7 +106,9 @@ jobs: TARGET_SHA: ${{ needs.plan.outputs.target_sha }} TARGET_REF: ${{ inputs.target_ref || github.ref_name }} RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} - LEGS: '[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"},{"label":"Backfill ingestion","bucket":"${{ needs.backfill.outputs.bucket }}","key":"${{ needs.backfill.outputs.result_key }}","passed":"${{ needs.backfill.outputs.passed }}","verdict":"${{ needs.backfill.outputs.verdict }}"}]' + LEGS: >- + [{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"}, + {"label":"Backfill ingestion","bucket":"${{ needs.backfill.outputs.bucket }}","key":"${{ needs.backfill.outputs.result_key }}","passed":"${{ needs.backfill.outputs.passed }}","verdict":"${{ needs.backfill.outputs.verdict }}"}] run: | set -uo pipefail MARKER='' From 423131fa0e0b80e55105a7749c1d5ff326d4b6b4 Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Wed, 1 Jul 2026 14:24:39 -0400 Subject: [PATCH 7/8] experiment: adjust db parameters to optimize for backfill --- cmd/stellar-rpc/internal/daemon/daemon.go | 14 +++++++ cmd/stellar-rpc/internal/db/db.go | 44 +++++++++++++++++++--- cmd/stellar-rpc/internal/db/event.go | 7 ++-- cmd/stellar-rpc/internal/db/transaction.go | 7 ++-- 4 files changed, 58 insertions(+), 14 deletions(-) diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index 70d2b26a7..b156ece99 
100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -205,6 +205,14 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { var ingestCfg ingest.Config daemon.ingestService, ingestCfg = createIngestService(cfg, logger, daemon, feewindows, historyArchive, rw) if cfg.Backfill { + // Bulk-load with a large SQLite page cache + mmap and synchronous=OFF to avoid + // the random-index write cliff that happens in backfill runs. + tunedSession, err := db.OpenSQLiteBackfillSession(cfg.SQLiteDBPath) + if err != nil { + logger.WithError(err).Fatal("failed to open backfill-tuned database session") + } + servingSession := daemon.db.UseSession(tunedSession) + backfillMeta, err := ingest.NewBackfillMeta( logger, daemon.ingestService, @@ -218,6 +226,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { if err := backfillMeta.RunBackfill(cfg); err != nil { logger.WithError(err).Fatal("failed to backfill ledgers") } + + // Restore the serving session and release the tuned session's resources. + daemon.db.UseSession(servingSession) + if err := tunedSession.Close(); err != nil { + logger.WithError(err).Warn("could not close backfill-tuned database session") + } // Clear the DB cache and fee windows so they re-populate from the database daemon.db.ResetCache() feewindows.Reset() diff --git a/cmd/stellar-rpc/internal/db/db.go b/cmd/stellar-rpc/internal/db/db.go index 21b728436..628e2bcbb 100644 --- a/cmd/stellar-rpc/internal/db/db.go +++ b/cmd/stellar-rpc/internal/db/db.go @@ -78,13 +78,25 @@ func (d *DB) ResetCache() { d.cache.firstLedgerCloseTime = 0 } +// Serving DSN pragmas: +// 1. Use Write-Ahead Logging (WAL). +// 2. Disable WAL auto-checkpointing (we do the checkpointing ourselves with +// wal_checkpoint pragmas after every write transaction). +// 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode. 
+const serveSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" + +// Backfill DSN pragmas: bulk-loads into random-key indexes cliff once they +// outgrow the default ~2MB page cache, so relative to serving we: +// 1. Use a large page cache + mmap to keep hot index pages resident. +// 2. Use synchronous=OFF (safe since backfill is restartable and gap-checked). +// 3. Keep transient b-trees in memory (temp_store). +const backfillSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=OFF" + + "&_cache_size=-4194304" + // 4 GiB page cache (negative value = KiB) + "&_mmap_size=4294967296" + // 4 GiB memory-mapped I/O (file-backed, reclaimable) + "&_temp_store=MEMORY" + func openSQLiteDB(dbFilePath string) (*db.Session, error) { - // 1. Use Write-Ahead Logging (WAL). - // 2. Disable WAL auto-checkpointing (we will do the checkpointing ourselves with wal_checkpoint pragmas - // after every write transaction). - // 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode. - session, err := db.Open("sqlite3", - fmt.Sprintf("file:%s?_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL", dbFilePath)) + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, serveSQLitePragmas)) if err != nil { return nil, fmt.Errorf("open failed: %w", err) } @@ -96,6 +108,26 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) { return session, nil } +// OpenSQLiteBackfillSession opens an additional, backfill-tuned session to the +// same SQLite file. It assumes the schema already exists and runs no migrations. +// Swap it onto the *DB with UseSession for backfill, then restore the serving session. 
+func OpenSQLiteBackfillSession(dbFilePath string) (db.SessionInterface, error) { + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, backfillSQLitePragmas)) + if err != nil { + return nil, fmt.Errorf("open backfill session failed: %w", err) + } + return session, nil +} + +// UseSession swaps the underlying session, returning the previous one. Used to +// apply backfill-specific SQLite tuning for the backfill phase without disturbing +// the metrics-wrapped serving session, which the caller then restores. Single-threaded only. +func (d *DB) UseSession(s db.SessionInterface) db.SessionInterface { + prev := d.SessionInterface + d.SessionInterface = s + return prev +} + func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub db.Subservice, registry *prometheus.Registry, ) (*DB, error) { diff --git a/cmd/stellar-rpc/internal/db/event.go b/cmd/stellar-rpc/internal/db/event.go index 8b2d7e98f..e14abbca1 100644 --- a/cmd/stellar-rpc/internal/db/event.go +++ b/cmd/stellar-rpc/internal/db/event.go @@ -211,10 +211,9 @@ func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error { } // Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER - // limit (32,767 by default). With 10 bind variables per event, we cap - // each INSERT at 1000 events (10,000 bind variables) to stay well - // within the limit. - const maxEventsPerBatch = 1000 + // limit (32,766 by default). 10 bind variables/event * 3,000 events = + // 30,000 bind variables < 32,766 limit. 
+ const maxEventsPerBatch = 3000 for batchStart := 0; batchStart < len(insertableEvents); batchStart += maxEventsPerBatch { batchEnd := min(batchStart+maxEventsPerBatch, len(insertableEvents)) diff --git a/cmd/stellar-rpc/internal/db/transaction.go b/cmd/stellar-rpc/internal/db/transaction.go index 87f2cabcc..36633bedb 100644 --- a/cmd/stellar-rpc/internal/db/transaction.go +++ b/cmd/stellar-rpc/internal/db/transaction.go @@ -110,10 +110,9 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error } // Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER - // limit (32,766 by default). With 3 bind variables per transaction, we - // cap each INSERT at 3,000 rows (9,000 bind variables) to stay well - // within the limit. - const maxRowsPerBatch = 3000 + // limit (32,766 by default). 3 bind variables/tx * 10,000 tx rows = 30,000 + // bind variables < 32,766 limit. + const maxRowsPerBatch = 10000 rowsInBatch := 0 query := sq.Insert(transactionTableName). 
Columns("hash", "ledger_sequence", "application_order") From 1a55e50c35cb4492381d86fddb00f35cd248226c Mon Sep 17 00:00:00 2001 From: Christian Jonas Date: Wed, 1 Jul 2026 21:05:37 -0400 Subject: [PATCH 8/8] experiment: retune db parameters, defer building events indices --- cmd/stellar-rpc/internal/daemon/daemon.go | 20 +++- cmd/stellar-rpc/internal/db/db.go | 109 +++++++++++++++++++-- cmd/stellar-rpc/internal/ingest/service.go | 33 ++++--- 3 files changed, 136 insertions(+), 26 deletions(-) diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index b156ece99..a40a989a3 100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -205,14 +205,24 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { var ingestCfg ingest.Config daemon.ingestService, ingestCfg = createIngestService(cfg, logger, daemon, feewindows, historyArchive, rw) if cfg.Backfill { - // Bulk-load with a large SQLite page cache + mmap and synchronous=OFF to avoid - // the random-index write cliff that happens in backfill runs. + // Bulk-load with backfill-tuned SQLite pragmas via a separate session that is + // swapped in for the backfill phase and closed once it completes. tunedSession, err := db.OpenSQLiteBackfillSession(cfg.SQLiteDBPath) if err != nil { logger.WithError(err).Fatal("failed to open backfill-tuned database session") } servingSession := daemon.db.UseSession(tunedSession) + // On a fresh DB, drop the events secondary indexes so the bulk-load avoids + // random B-tree inserts; EnsureEventIndexes rebuilds them below. 
+ if _, err := db.NewLedgerReader(daemon.db).GetLedgerRange(context.Background()); errors.Is(err, db.ErrEmptyDB) { + if err := db.DropEventIndexes(context.Background(), daemon.db, logger); err != nil { + logger.WithError(err).Fatal("failed to drop events indexes for backfill") + } + } else if err != nil { + logger.WithError(err).Fatal("failed to check database emptiness for backfill") + } + backfillMeta, err := ingest.NewBackfillMeta( logger, daemon.ingestService, @@ -236,6 +246,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { daemon.db.ResetCache() feewindows.Reset() } + // Rebuild any events indexes dropped for a backfill bulk-load (also covers + // crashed-backfill restarts). Must finish before ingestService.Start: SQLite + // is single-writer and a long CREATE INDEX would starve live commits. + if err := db.EnsureEventIndexes(context.Background(), daemon.db, cfg.SQLiteDBPath, logger); err != nil { + logger.WithError(err).Fatal("failed to ensure events indexes") + } // Start ingestion service only after backfill is complete daemon.ingestService.Start(ingestCfg) diff --git a/cmd/stellar-rpc/internal/db/db.go b/cmd/stellar-rpc/internal/db/db.go index 628e2bcbb..98ecbd968 100644 --- a/cmd/stellar-rpc/internal/db/db.go +++ b/cmd/stellar-rpc/internal/db/db.go @@ -85,15 +85,18 @@ func (d *DB) ResetCache() { // 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode. const serveSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" -// Backfill DSN pragmas: bulk-loads into random-key indexes cliff once they -// outgrow the default ~2MB page cache, so relative to serving we: -// 1. Use a large page cache + mmap to keep hot index pages resident. -// 2. Use synchronous=OFF (safe since backfill is restartable and gap-checked). -// 3. Keep transient b-trees in memory (temp_store). +// Backfill DSN pragmas, relative to serving: +// 1. Use synchronous=OFF (safe since backfill is restartable and gap-checked). 
+// 2. Use a 1 GiB page cache for hot interior B-tree pages; anything larger +// displaces the OS page cache and degrades the late run. const backfillSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=OFF" + - "&_cache_size=-4194304" + // 4 GiB page cache (negative value = KiB) - "&_mmap_size=4294967296" + // 4 GiB memory-mapped I/O (file-backed, reclaimable) - "&_temp_store=MEMORY" + "&_cache_size=-1048576" // 1 GiB page cache (negative value = KiB) + +// Index-build DSN pragmas: a large cache raises the CREATE INDEX sorter's +// in-memory budget (this session runs alone, single connection); default +// temp_store so multi-GB sorts spill to disk instead of RAM. +const indexBuildSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" + + "&_cache_size=-2097152" // 2 GiB page cache (negative value = KiB) func openSQLiteDB(dbFilePath string) (*db.Session, error) { session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, serveSQLitePragmas)) @@ -128,6 +131,96 @@ func (d *DB) UseSession(s db.SessionInterface) db.SessionInterface { return prev } +type indexDDL struct { + name string + ddl string +} + +// eventIndexes are the events secondary indexes dropped during a fresh-DB +// backfill and rebuilt afterwards. DDL text must match 06_topic_indices.sql +// exactly so the rebuilt schema is identical to a migration-built one. +// NOTE for migration authors: startup migrations run before EnsureEventIndexes, +// so a future migration touching these indexes must tolerate their absence +// (e.g. DROP INDEX IF EXISTS). +// +//nolint:gochecknoglobals // effectively-constant DDL lookup +var eventIndexes = []indexDDL{ + {"idx_id_contract_id", "CREATE INDEX idx_id_contract_id ON events (contract_id, id)"}, + {"idx_id_topic1", "CREATE INDEX idx_id_topic1 ON events (topic1, id)"}, +} + +// DropEventIndexes drops the events secondary indexes so a fresh-DB backfill +// bulk-load avoids random B-tree inserts. 
EnsureEventIndexes rebuilds them. +func DropEventIndexes(ctx context.Context, session db.SessionInterface, logger *log.Entry) error { + for _, index := range eventIndexes { + if _, err := session.ExecRaw(ctx, "DROP INDEX IF EXISTS "+index.name); err != nil { + return fmt.Errorf("could not drop index %s: %w", index.name, err) + } + logger.Infof("Dropped events index %s for backfill bulk-load", index.name) + } + return nil +} + +// EnsureEventIndexes rebuilds any events secondary index dropped by +// DropEventIndexes. Must complete before live ingestion starts: SQLite is +// single-writer and a long CREATE INDEX would starve live commits past their +// busy timeout. +func EnsureEventIndexes(ctx context.Context, d *DB, dbFilePath string, logger *log.Entry) error { + missing, err := missingEventIndexes(ctx, d) + if err != nil { + return err + } + if len(missing) == 0 { + return nil + } + + session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, indexBuildSQLitePragmas)) + if err != nil { + return fmt.Errorf("open index build session failed: %w", err) + } + defer func() { + if err := session.Close(); err != nil { + logger.WithError(err).Warn("could not close index build session") + } + }() + // Single connection so the threads pragma applies to the CREATE INDEX below + session.DB.SetMaxOpenConns(1) + if _, err := session.ExecRaw(ctx, "PRAGMA threads=4"); err != nil { + return fmt.Errorf("could not enable multithreaded sorter: %w", err) + } + + for _, index := range missing { + logger.Infof("Building events index %s (may take minutes, no progress output)", index.name) + startTime := time.Now() + if _, err := session.ExecRaw(ctx, index.ddl); err != nil { + return fmt.Errorf("could not build index %s: %w", index.name, err) + } + // Checkpoint now so the index's WAL debt isn't paid by the first live commit + if _, err := session.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { + return fmt.Errorf("could not checkpoint after building index %s: 
%w", index.name, err) + } + logger.WithField("duration", time.Since(startTime).String()). + Infof("Built events index %s", index.name) + } + return nil +} + +func missingEventIndexes(ctx context.Context, d *DB) ([]indexDDL, error) { + var missing []indexDDL + for _, index := range eventIndexes { + var count int + err := d.GetRaw(ctx, &count, + "SELECT COUNT(*) FROM sqlite_master WHERE type = 'index' AND name = ?", index.name) + if err != nil { + return nil, fmt.Errorf("could not check index %s: %w", index.name, err) + } + if count == 0 { + missing = append(missing, index) + } + } + return missing, nil +} + func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub db.Subservice, registry *prometheus.Registry, ) (*DB, error) { diff --git a/cmd/stellar-rpc/internal/ingest/service.go b/cmd/stellar-rpc/internal/ingest/service.go index 5868249bb..2b8fa76c4 100644 --- a/cmd/stellar-rpc/internal/ingest/service.go +++ b/cmd/stellar-rpc/internal/ingest/service.go @@ -217,7 +217,8 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { } }() - if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta); err != nil { + durationMetrics := map[string]time.Duration{} + if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta, durationMetrics); err != nil { return err } @@ -230,7 +231,6 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { With(prometheus.Labels{"type": "fee-window"}). 
Observe(time.Since(feeWindowStartTime).Seconds()) - durationMetrics := map[string]time.Duration{} if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { return err } @@ -274,29 +274,32 @@ func (s *Service) ingestRange(ctx context.Context, backend backends.LedgerBacken }() var ledgerCloseMeta xdr.LedgerCloseMeta + durationMetrics := map[string]time.Duration{} for seq := seqRange.From(); seq <= seqRange.To(); seq++ { ledgerCloseMeta, err = backend.GetLedger(ctx, seq) if err != nil { return err } - if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta); err != nil { + if err := s.ingestLedgerCloseMeta(tx, ledgerCloseMeta, durationMetrics); err != nil { return err } } - durationMetrics := map[string]time.Duration{} if err := tx.Commit(ledgerCloseMeta, durationMetrics); err != nil { return err } + fields := log.F{"total": time.Since(startTime).Seconds()} for key, duration := range durationMetrics { s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": key}). Observe(duration.Seconds()) + fields[key] = duration.Seconds() } + // Per-chunk phase timings for attributing where backfill time goes s.logger. - WithField("duration", time.Since(startTime).Seconds()). - Debugf("Ingested ledgers [%d, %d]", seqRange.From(), seqRange.To()) + WithFields(fields). + Infof("Ingested ledgers [%d, %d]", seqRange.From(), seqRange.To()) s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "total"}). @@ -308,30 +311,28 @@ func (s *Service) ingestRange(ctx context.Context, backend backends.LedgerBacken return nil } -func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta) error { +// ingestLedgerCloseMeta writes a ledger's rows, accumulating per-phase durations +// into durationMetrics; callers observe/log them. 
+func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.LedgerCloseMeta, + durationMetrics map[string]time.Duration, +) error { startTime := time.Now() if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "ledger_close_meta"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["ledger_close_meta"] += time.Since(startTime) startTime = time.Now() if err := tx.TransactionWriter().InsertTransactions(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "transactions"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["transactions"] += time.Since(startTime) startTime = time.Now() if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil { return err } - s.metrics.ingestionDurationMetric. - With(prometheus.Labels{"type": "events"}). - Observe(time.Since(startTime).Seconds()) + durationMetrics["events"] += time.Since(startTime) return nil }