diff --git a/.dockerignore b/.dockerignore
index 7ecb7d59d..4baeff184 100644
--- a/.dockerignore
+++ b/.dockerignore
@@ -2,4 +2,4 @@ target/
 storage/
 .soroban/
 .cargo/
-.cargo-husky/
\ No newline at end of file
+.cargo-husky/
diff --git a/.github/workflows/load-test.yml b/.github/workflows/load-test.yml
new file mode 100644
index 000000000..23ea60340
--- /dev/null
+++ b/.github/workflows/load-test.yml
@@ -0,0 +1,216 @@
+name: Load test (ephemeral)
+# Launches a c5.2xlarge in Horizon (203618453975), polls S3 for the result object
+# the box publishes, posts results to the PR, terminates. Box bootstrap lives in
+# run-load-test.sh; runner-side polling in runner/orchestrate.go.
+
+on:
+  workflow_call:
+
+permissions:
+  id-token: write # for OIDC AssumeRole into the GHA role
+  contents: read
+  pull-requests: write
+
+jobs:
+  load-test:
+    name: Launch + await ephemeral load-test box
+    runs-on: ubuntu-latest
+    timeout-minutes: 225 # 210min results wait + buffer for boot/SSM/poll latency and cleanup (role lasts 240min)
+    env:
+      AWS_REGION: us-east-1
+      INSTANCE_TYPE: c5.2xlarge
+      ROOT_VOLUME_GB: 500
+      BOOTSTRAP_VOLUME_IOPS: 3000
+      # 3000 IOPS is the gp3 floor; 125 MiB/s alone would need only 500.
+      BOOTSTRAP_VOLUME_THROUGHPUT: 125
+      INSTANCE_PROFILE: stellar-rpc-ci-load-test
+      TEST_TAG_KEY: test
+      TEST_TAG_VAL: stellar-rpc-ci-load-test
+      SSM_REGISTRATION_TIMEOUT: 240 # SSM agent registers ~30-90s after boot
+      RESULTS_TIMEOUT: 12600 # 210 min wait for the S3 result object: ~55m bootstrap+build + ~90m benchmark, under the 170m go-test budget.
+      POLL_INTERVAL: 30
+      DEBUG_LOG_LINES: 40
+      DEBUG_LOG_EVERY_POLLS: 5
+      LOAD_TEST_DIR: cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test
+
+    steps:
+      - name: Resolve target context
+        id: target
+        env:
+          GH_TOKEN: ${{ github.token }}
+        run: |
+          PR_NUMBER=$(gh pr list \
+            --repo "${{ github.repository }}" \
+            --state open \
+            --base main \
+            --head "$GITHUB_REF_NAME" \
+            --json number \
+            --jq '.[0].number // ""' 2>/dev/null || true)
+
+          RUN_LABEL="${PR_NUMBER:+pr$PR_NUMBER}"
+          {
+            echo "pr_number=$PR_NUMBER"
+            echo "pr_tag_value=${PR_NUMBER:-none}"
+            echo "run_label=${RUN_LABEL:-$(printf '%s' "$GITHUB_REF_NAME" | tr -c 'A-Za-z0-9._/-' '-')}" # ref names are untrusted: keep only tag/shell-safe characters
+          } >> "$GITHUB_OUTPUT"
+
+      - name: Checkout target ref
+        uses: actions/checkout@v4
+        with:
+          ref: ${{ github.sha }}
+
+      # The runner-side half is `go run ... runner orchestrate`.
+      - uses: ./.github/actions/setup-go
+
+      - name: Configure AWS via OIDC
+        uses: aws-actions/configure-aws-credentials@v4
+        with:
+          role-to-assume: ${{ secrets.AWS_GHA_ROLE_ARN }}
+          aws-region: ${{ env.AWS_REGION }}
+          role-duration-seconds: 14400
+
+      - name: Resolve latest Ubuntu 22.04 AMI
+        id: ami
+        run: |
+          AMI=$(aws ec2 describe-images \
+            --owners 099720109477 \
+            --filters \
+              "Name=name,Values=ubuntu/images/hvm-ssd*/ubuntu-jammy-22.04-amd64-server-*" \
+              "Name=architecture,Values=x86_64" \
+              "Name=state,Values=available" \
+            --query 'sort_by(Images, &CreationDate)[-1].ImageId' \
+            --output text)
+          echo "ami=$AMI" >> "$GITHUB_OUTPUT"
+
+      - name: Render user-data
+        # The script ships verbatim; parameters travel in a two-line preamble
+        # so the bytes that run on the box match the bytes in git.
+        run: |
+          {
+            echo '#!/usr/bin/env bash'
+            echo 'export TARGET_SHA=${{ github.sha }} RUN_ID=${{ github.run_id }}-${{ github.run_attempt }}'
+            echo 'export BUCKET=stellar-rpc-ci-load-test RESULT_KEY=runs/${{ github.run_id }}-${{ github.run_attempt }}/result.json'
+            cat "$LOAD_TEST_DIR/run-load-test.sh"
+          } > /tmp/user-data.sh
+
+      - name: Launch EC2 instance
+        id: launch
+        run: |
+          COMMON_TAGS="{Key=$TEST_TAG_KEY,Value=$TEST_TAG_VAL},
+            {Key=pr,Value=${{ steps.target.outputs.pr_tag_value }}},
+            {Key=ref,Value=$GITHUB_REF_NAME},
+            {Key=sha,Value=${{ github.sha }}},
+            {Key=run-id,Value=${{ github.run_id }}}"
+          RUN_INSTANCES_JSON=$(aws ec2 run-instances \
+            --image-id "${{ steps.ami.outputs.ami }}" \
+            --instance-type "$INSTANCE_TYPE" \
+            --iam-instance-profile "Name=$INSTANCE_PROFILE" \
+            --user-data file:///tmp/user-data.sh \
+            --instance-initiated-shutdown-behavior terminate \
+            --block-device-mappings "[{
+              \"DeviceName\":\"/dev/sda1\",
+              \"Ebs\":{\"VolumeSize\":$ROOT_VOLUME_GB,\"VolumeType\":\"gp3\",\"Iops\":$BOOTSTRAP_VOLUME_IOPS,\"Throughput\":$BOOTSTRAP_VOLUME_THROUGHPUT,\"DeleteOnTermination\":true}
+            }]" \
+            --tag-specifications \
+              "ResourceType=instance,Tags=[
+                {Key=Name,Value=load-test-${{ steps.target.outputs.run_label }}},
+                $COMMON_TAGS
+              ]" \
+              "ResourceType=volume,Tags=[
+                {Key=Name,Value=load-test-${{ steps.target.outputs.run_label }}-root},
+                $COMMON_TAGS
+              ]" \
+            --count 1 \
+            --output json)
+
+          INSTANCE_ID=$(printf '%s' "$RUN_INSTANCES_JSON" | jq -r '.Instances[0].InstanceId')
+          echo "instance_id=$INSTANCE_ID" >> "$GITHUB_OUTPUT"
+
+      - name: Acknowledge launch in PR
+        if: steps.target.outputs.pr_number != ''
+        env:
+          GH_TOKEN: ${{ github.token }}
+        run: |
+          if ! gh pr comment ${{ steps.target.outputs.pr_number }} \
+            --repo ${{ github.repository }} \
+            --body "⏳ Load test launching on \`${{ steps.launch.outputs.instance_id }}\` (commit \`${{ github.sha }}\`).
+ Workflow run: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + Posting results when the run finishes."; then + echo "::warning::Failed to post launch comment to PR #${{ steps.target.outputs.pr_number }}" + fi + + - name: Wait for SSM agent to register + env: + INSTANCE_ID: ${{ steps.launch.outputs.instance_id }} + run: | + DEADLINE=$(( $(date +%s) + SSM_REGISTRATION_TIMEOUT )) + while [ $(date +%s) -lt $DEADLINE ]; do + PING=$(aws ssm describe-instance-information \ + --filters "Key=InstanceIds,Values=$INSTANCE_ID" \ + --query 'InstanceInformationList[0].PingStatus' \ + --output text 2>/dev/null || echo "") + echo "[$(date -u +%FT%TZ)] ssm ping=$PING" + if [ "$PING" = "Online" ]; then + exit 0 + fi + sleep 10 + done + echo "::error::SSM agent never registered for $INSTANCE_ID — verify AmazonSSMManagedInstanceCore is attached to the stellar-rpc-ci-load-test role" + exit 1 + + - name: Poll for results + id: results + env: + INSTANCE_ID: ${{ steps.launch.outputs.instance_id }} + BUCKET: stellar-rpc-ci-load-test + RESULT_KEY: runs/${{ github.run_id }}-${{ github.run_attempt }}/result.json + run: go run "./$LOAD_TEST_DIR/runner" orchestrate + + - name: Write results summary + if: always() + run: | + if [ -f /tmp/results.md ]; then + cat /tmp/results.md >> "$GITHUB_STEP_SUMMARY" + elif [ -f /tmp/timeout-comment.md ]; then + cat /tmp/timeout-comment.md >> "$GITHUB_STEP_SUMMARY" + fi + + - name: Post results to PR + if: steps.target.outputs.pr_number != '' + env: + GH_TOKEN: ${{ github.token }} + run: | + if [ "${{ steps.results.outputs.found }}" = "true" ]; then + BODY=/tmp/results.md + else + BODY=/tmp/timeout-comment.md + fi + if [ ! -s "$BODY" ]; then + echo "::warning::No body to post to PR #${{ steps.target.outputs.pr_number }} ($BODY missing or empty)" + exit 0 + fi + if ! 
gh pr comment ${{ steps.target.outputs.pr_number }} \ + --repo ${{ github.repository }} \ + --body-file "$BODY"; then + echo "::warning::Failed to post comment to PR #${{ steps.target.outputs.pr_number }}" + fi + + - name: Fail workflow on timeout or load-test failure + if: always() + run: | + if [ "${{ steps.results.outputs.found }}" != "true" ]; then + echo "Load test timed out before producing instance results" + exit 1 + fi + + if [ "${{ steps.results.outputs.passed }}" != "true" ]; then + echo "Instance reported a failing verdict" + cat /tmp/results.md 2>/dev/null || true + exit 1 + fi + + - name: Terminate instance + if: always() && steps.launch.outputs.instance_id != '' + run: | + aws ec2 terminate-instances \ + --instance-ids ${{ steps.launch.outputs.instance_id }} || true diff --git a/README.md b/README.md index 06bfcc2d3..7c755966e 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ Integration tests: ```bash STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true \ -STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL=23 \ +STELLAR_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL=25 \ STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN=$(which stellar-core) \ go test -v -failfast ./cmd/stellar-rpc/internal/integrationtest/... 
``` diff --git a/cmd/stellar-rpc/internal/config/main.go b/cmd/stellar-rpc/internal/config/main.go index 0c5d25696..063f3d391 100644 --- a/cmd/stellar-rpc/internal/config/main.go +++ b/cmd/stellar-rpc/internal/config/main.go @@ -81,15 +81,43 @@ type Config struct { MaxSendTransactionExecutionDuration time.Duration MaxSimulateTransactionExecutionDuration time.Duration MaxGetFeeStatsExecutionDuration time.Duration - ServeLedgersFromDatastore bool - BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig - DataStoreConfig datastore.DataStoreConfig + + ServeLedgersFromDatastore bool + BufferedStorageBackendConfig ledgerbackend.BufferedStorageBackendConfig + DataStoreConfig datastore.DataStoreConfig + + IngestLoadTest IngestLoadTestConfig // We memoize these, so they bind to pflags correctly optionsCache *Options flagset *pflag.FlagSet } +// IngestLoadTestConfig groups the options for ingesting from pre-generated synthetic +// ledger bundles. If no files are given, normal captive-core ingestion runs. +type IngestLoadTestConfig struct { + // Files are .xdr.zstd bundles of LedgerCloseMeta records produced by + // stellar-core's apply-load, replayed in order. + Files []string `toml:"files"` + // Frequency paces ingestion, replaying one synthetic ledger per duration. + // Zero means "use DefaultIngestLoadTestFrequency". + Frequency time.Duration `toml:"frequency"` + // MaxLedgersPerFile optionally caps how many ledgers are replayed from each + // file in Files. Zero replays every ledger in every file. + MaxLedgersPerFile uint32 `toml:"max_ledgers_per_file"` + // OnLedgerIngested is a test-only seam, never set from TOML/flags (hence + // toml:"-"): the load-test harness sets it so the daemon reports exact + // per-ledger ingest timing via ingest.Config.OnLedgerIngested. Nil in + // production. 
+ OnLedgerIngested func(seq uint32, d time.Duration) `toml:"-"` +} + +func (cfg IngestLoadTestConfig) Enabled() bool { + return len(cfg.Files) > 0 +} + +const DefaultIngestLoadTestFrequency = 2 * time.Second + func (cfg *Config) ExtendedUserAgent(extension string) string { if cfg.HistoryArchiveUserAgent == "" { return extension diff --git a/cmd/stellar-rpc/internal/config/options.go b/cmd/stellar-rpc/internal/config/options.go index f0158cc38..8cf3fb9c2 100644 --- a/cmd/stellar-rpc/internal/config/options.go +++ b/cmd/stellar-rpc/internal/config/options.go @@ -617,11 +617,7 @@ func (cfg *Config) options() Options { return unmarshalTOMLTree(i, option.ConfigKey, "buffered_storage_backend_config") }, MarshalTOML: func(_ *Option) (any, error) { - tomlBytes, err := toml.Marshal(defaultBufferedStorageBackendConfig()) - if err != nil { - return nil, fmt.Errorf("failed to marshal buffered_storage_backend_config: %w", err) - } - return toml.LoadBytes(tomlBytes) + return marshalTOMLTree(defaultBufferedStorageBackendConfig(), "buffered_storage_backend_config") }, }, { @@ -632,11 +628,20 @@ func (cfg *Config) options() Options { return unmarshalTOMLTree(i, option.ConfigKey, "datastore_config") }, MarshalTOML: func(_ *Option) (any, error) { - tomlBytes, err := toml.Marshal(defaultDataStoreConfig()) - if err != nil { - return nil, fmt.Errorf("failed to marshal datastore_config: %w", err) - } - return toml.LoadBytes(tomlBytes) + return marshalTOMLTree(defaultDataStoreConfig(), "datastore_config") + }, + }, + { + TomlKey: "ingest_load_test_config", + ConfigKey: &cfg.IngestLoadTest, + Usage: "Load testing configuration: replay pre-generated .xdr.zstd ledger bundles " + + "through ingestion. Subkeys: files (list of bundle paths), frequency (duration; " + + "defaults to 2s), max_ledgers_per_file (0 = all). 
WARNING: destructive to your database.", + CustomSetValue: func(option *Option, i any) error { + return unmarshalTOMLTree(i, option.ConfigKey, "ingest_load_test_config") + }, + MarshalTOML: func(_ *Option) (any, error) { + return marshalTOMLTree(defaultIngestLoadTestConfig(), "ingest_load_test_config") }, }, } @@ -664,6 +669,22 @@ func defaultDataStoreConfig() datastore.DataStoreConfig { } } +func defaultIngestLoadTestConfig() IngestLoadTestConfig { + return IngestLoadTestConfig{ + Frequency: DefaultIngestLoadTestFrequency, + } +} + +// marshalTOMLTree renders a sub-config struct as the TOML tree the option's +// MarshalTOML hook must return. +func marshalTOMLTree(v any, configName string) (any, error) { + tomlBytes, err := toml.Marshal(v) + if err != nil { + return nil, fmt.Errorf("failed to marshal %s: %w", configName, err) + } + return toml.LoadBytes(tomlBytes) +} + func unmarshalTOMLTree(tree any, out any, configName string) error { t, ok := tree.(*toml.Tree) if !ok { diff --git a/cmd/stellar-rpc/internal/config/toml_test.go b/cmd/stellar-rpc/internal/config/toml_test.go index 4df290558..435795221 100644 --- a/cmd/stellar-rpc/internal/config/toml_test.go +++ b/cmd/stellar-rpc/internal/config/toml_test.go @@ -139,6 +139,12 @@ func TestRoundTrip(t *testing.T) { *v = logrus.InfoLevel case *LogFormat: *v = LogFormatText + case *IngestLoadTestConfig: + *v = IngestLoadTestConfig{ + Files: []string{"a.xdr.zstd", "b.xdr.zstd"}, + Frequency: 5 * time.Second, + MaxLedgersPerFile: 100, + } case *ledgerbackend.BufferedStorageBackendConfig: *v = defaultBufferedStorageBackendConfig() case *datastore.DataStoreConfig: diff --git a/cmd/stellar-rpc/internal/daemon/daemon.go b/cmd/stellar-rpc/internal/daemon/daemon.go index a053d8324..70d2b26a7 100644 --- a/cmd/stellar-rpc/internal/daemon/daemon.go +++ b/cmd/stellar-rpc/internal/daemon/daemon.go @@ -22,6 +22,7 @@ import ( "github.com/stellar/go-stellar-sdk/clients/stellarcore" "github.com/stellar/go-stellar-sdk/historyarchive" 
"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/ingest/loadtest" "github.com/stellar/go-stellar-sdk/support/datastore" supporthttp "github.com/stellar/go-stellar-sdk/support/http" supportlog "github.com/stellar/go-stellar-sdk/support/log" @@ -322,14 +323,36 @@ func createIngestService(cfg *config.Config, logger *supportlog.Entry, daemon *D logger.WithError(err).Error("could not run ingestion. Retrying") } + var backend ledgerbackend.LedgerBackend = daemon.core + if cfg.IngestLoadTest.Enabled() { + // CustomSetValue/MarshalTOML doesn't apply DefaultValue, so fall back here. + frequency := cfg.IngestLoadTest.Frequency + if frequency == 0 { + frequency = config.DefaultIngestLoadTestFrequency + } + logger. + WithField("files", cfg.IngestLoadTest.Files). + WithField("max_ledgers_per_file", cfg.IngestLoadTest.MaxLedgersPerFile). + WithField("close_time", frequency). + Warn("Ingestion will run with load testing") + + backend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: cfg.NetworkPassphrase, + LedgersFilePaths: cfg.IngestLoadTest.Files, + LedgerCloseDuration: frequency, + MaxLedgersPerFile: cfg.IngestLoadTest.MaxLedgersPerFile, + }) + } + ingestCfg := ingest.Config{ Logger: logger, DB: rw, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: *historyArchive, - LedgerBackend: daemon.core, + LedgerBackend: backend, Timeout: cfg.IngestionTimeout, OnIngestionRetry: onIngestionRetry, + OnLedgerIngested: cfg.IngestLoadTest.OnLedgerIngested, Daemon: daemon, FeeWindows: feewindows, } @@ -431,6 +454,12 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.db, ) + // In load-test mode the existing DB is treated as opaque carrier state for + // ingestion timing; skip the fee-stat / migration backfill + if cfg.IngestLoadTest.Enabled() { + return feeWindows + } + // 1. First, identify the ledger range for database migrations based on the // ledger retention window. 
Since we don't do "partial" migrations (all or // nothing), this represents the entire range of ledger metas we store. diff --git a/cmd/stellar-rpc/internal/ingest/service.go b/cmd/stellar-rpc/internal/ingest/service.go index 5808974e2..5868249bb 100644 --- a/cmd/stellar-rpc/internal/ingest/service.go +++ b/cmd/stellar-rpc/internal/ingest/service.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go-stellar-sdk/historyarchive" backends "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/ingest/loadtest" "github.com/stellar/go-stellar-sdk/support/log" "github.com/stellar/go-stellar-sdk/xdr" @@ -37,7 +38,11 @@ type Config struct { LedgerBackend backends.LedgerBackend Timeout time.Duration OnIngestionRetry backoff.Notify - Daemon interfaces.Daemon + // OnLedgerIngested, if non-nil, is invoked after each ledger commits with its + // sequence and the duration that's recorded to ledger_ingestion_duration_seconds. + // Set by the load test to capture exact per-ledger timing without polling. + OnLedgerIngested func(seq uint32, d time.Duration) + Daemon interfaces.Daemon } func NewService(cfg Config) *Service { @@ -81,6 +86,7 @@ func newService(cfg Config) *Service { ledgerBackend: cfg.LedgerBackend, networkPassPhrase: cfg.NetworkPassPhrase, timeout: cfg.Timeout, + onLedgerIngested: cfg.OnLedgerIngested, metrics: Metrics{ ingestionDurationMetric: ingestionDurationMetric, latestLedgerMetric: latestLedgerMetric, @@ -110,11 +116,16 @@ func (s *Service) Start(cfg Config) { // keep retrying until history archives are published constantBackoff.Reset() } + if errors.Is(err, loadtest.ErrLoadTestDone) { + // Load-test mode: synthetic ledger stream is exhausted. 
+ // Stop retrying and let the daemon stay up so its DB can be queried + return backoff.Permanent(err) + } return err }, contextBackoff, cfg.OnIngestionRetry) - if err != nil && !errors.Is(err, context.Canceled) { + if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, loadtest.ErrLoadTestDone) { s.logger.WithError(err).Fatal("could not run ingestion") } }) @@ -137,6 +148,7 @@ type Service struct { wg sync.WaitGroup metrics Metrics latestIngestedSeq uint32 + onLedgerIngested func(seq uint32, d time.Duration) } func (s *Service) Close() error { @@ -232,9 +244,13 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error { WithField("duration", time.Since(totalStartTime).Seconds()). Debugf("Ingested ledger %d", sequence) + total := time.Since(totalStartTime) s.metrics.ingestionDurationMetric. With(prometheus.Labels{"type": "total"}). - Observe(time.Since(totalStartTime).Seconds()) + Observe(total.Seconds()) + if s.onLedgerIngested != nil { + s.onLedgerIngested(sequence, total) + } if sequence > s.latestIngestedSeq { s.latestIngestedSeq = sequence s.metrics.latestLedgerMetric.Set(float64(sequence)) diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh new file mode 100755 index 000000000..3fb389458 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/run-load-test.sh @@ -0,0 +1,123 @@ +#!/usr/bin/env bash +# Bootstraps the ephemeral load-test box (EC2 user-data): installs the toolchain, +# checks out TARGET_SHA, then hands off to `runner instantiate`, which streams the +# corpus from S3 and runs the ingest benchmark. +# The other half, `runner orchestrate`, polls S3 for the result object. +# +# Result protocol: the box publishes one object to s3://$BUCKET/$RESULT_KEY holding +# {schemaVersion, verdict, markdown, bench, runId, targetSha}. 
The Go runner +# publishes the success object; this script publishes the fail object so the +# orchestrator always sees a verdict. + +set -euo pipefail +log() { echo "[$(date -u +%FT%TZ)] $*"; } + +exec > >(tee -a /var/log/user-data.log | logger -t user-data -s 2>/dev/console) 2>&1 + +# Hard self-terminate ceiling, independent of the GHA runner for if the runner is +# force-cancelled or crashes and skips its Terminate step. +# Sits above the workflow's 225min job timeout so it never pre-empts a healthy run. +SELF_TERMINATE_MINUTES="${SELF_TERMINATE_MINUTES:-240}" +shutdown -P "+${SELF_TERMINATE_MINUTES}" "load-test self-terminate ceiling" \ + || log "WARN: could not schedule self-terminate ceiling" + +TARGET_SHA="${TARGET_SHA:-}" +RUN_ID="${RUN_ID:-manual}" +REPO="${REPO:-stellar/stellar-rpc}" +WORK_DIR="${WORK_DIR:-/data}" +RESULTS_FILE="${RESULTS_FILE:-/tmp/results.md}" +BUCKET="${BUCKET:-stellar-rpc-ci-load-test}" +RESULT_KEY="${RESULT_KEY:-}" +DEFAULT_BRANCH=main + +# Install the AWS CLI + jq early so any later failure can still publish a result to S3. +log "installing aws cli + jq" +export DEBIAN_FRONTEND=noninteractive +apt-get update -qq +apt-get install -y -qq --no-install-recommends awscli jq curl ca-certificates + +# upload_result publishes {verdict, markdown} as the run's result object so the +# orchestrator (polling S3) sees a verdict. Covers the fail paths. 
+upload_result() { + local verdict="$1" body_file="$2" + if [ -z "$BUCKET" ] || [ -z "$RESULT_KEY" ]; then + log "WARN: BUCKET/RESULT_KEY unset; cannot publish $verdict result" + return 0 + fi + [ -s "$body_file" ] || printf 'Load test failed before producing a result body.\n' > "$body_file" + jq -n --arg v "$verdict" --rawfile md "$body_file" \ + --arg run "$RUN_ID" --arg sha "$TARGET_SHA" \ + '{schemaVersion: 1, verdict: $v, markdown: $md, runId: $run, targetSha: $sha}' > /tmp/result.json + aws s3api put-object --bucket "$BUCKET" --key "$RESULT_KEY" \ + --content-type application/json --body /tmp/result.json >/dev/null +} + +# bail publishes a fail result the orchestrator can read, then stops. It guards +# only the pre-Go bootstrap phase. +bail() { + log "FATAL: $*" + { printf '❌ **Ingest load test failed** (run %s on `%s`)\n\n```\n' "$RUN_ID" "$TARGET_SHA" + printf '%s\n' "$*" + printf '```\n'; } > "$RESULTS_FILE" + upload_result fail "$RESULTS_FILE" || log "WARN: fail result upload failed" + exit 1 +} +trap 'bail "unhandled error at line $LINENO while running: $BASH_COMMAND"' ERR + +log "clearing stale run state" +rm -f /tmp/bench-results.json /tmp/load-test-ledgers-*.xdr.zstd \ + "$RESULTS_FILE" +rm -rf "$WORK_DIR/stellar-rpc" + +log "installing build deps" +apt-get install -y -qq --no-install-recommends \ + git build-essential \ + libpq5 libsodium23 libunwind8 libc++1-14 + +GO_VERSION=1.25.11 +curl -fsSL "https://go.dev/dl/go${GO_VERSION}.linux-amd64.tar.gz" | tar -xz -C /usr/local +export HOME="${HOME:-/root}" +export GOPATH="${GOPATH:-$HOME/go}" +export GOMODCACHE="${GOMODCACHE:-$GOPATH/pkg/mod}" +export GOCACHE="${GOCACHE:-$HOME/.cache/go-build}" +export CARGO_HOME=/root/.cargo +export RUSTUP_HOME=/root/.rustup +export PATH="/usr/local/go/bin:${CARGO_HOME}/bin:$PATH" +mkdir -p "$GOMODCACHE" "$GOCACHE" "$GOPATH/bin" + +command -v cargo >/dev/null || curl -fsSL https://sh.rustup.rs \ + | sh -s -- -y --profile minimal --default-toolchain stable + +# 
build-libs needs real git metadata, so this is a git tree/not a source archive. +mkdir -p "$WORK_DIR" +cd "$WORK_DIR" +mkdir -p stellar-rpc && cd stellar-rpc +git init -q +git remote add origin "https://github.com/$REPO.git" +if [ -z "$TARGET_SHA" ]; then + log "TARGET_SHA unset; shallow fetching origin/$DEFAULT_BRANCH" + git fetch --depth 1 origin "$DEFAULT_BRANCH" || bail "failed to fetch origin/$DEFAULT_BRANCH" + git checkout --detach FETCH_HEAD + TARGET_SHA=$(git rev-parse HEAD) +elif git fetch --depth 1 origin "$TARGET_SHA"; then + git checkout --detach FETCH_HEAD +else + log "direct commit fetch failed; falling back to full clone" + cd "$WORK_DIR" && rm -rf stellar-rpc + git clone "https://github.com/$REPO.git" stellar-rpc && cd stellar-rpc + git fetch origin "+refs/pull/*:refs/remotes/origin/pr/*" 2>/dev/null || true + git checkout "$TARGET_SHA" +fi +log "checked out $TARGET_SHA; handing off to the Go runner" + +# The Go runner owns the verdict from here -> release the bootstrap trap. On +# success it publishes the result object itself; on any non-zero exit it has +# written the failure body to RESULTS_FILE (or died before doing so), so we +# publish the fail result it couldn't. +trap - ERR +export TARGET_SHA RUN_ID REPO WORK_DIR RESULTS_FILE BUCKET RESULT_KEY +if ! 
go run ./cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner instantiate; then
+  log "go runner exited non-zero; publishing fail result"
+  upload_result fail "$RESULTS_FILE"
+  exit 1
+fi
diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go
new file mode 100644
index 000000000..032999d93
--- /dev/null
+++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/instantiate.go
@@ -0,0 +1,333 @@
+package main
+
+import (
+	"bytes"
+	"context"
+	"crypto/sha256"
+	"encoding/hex"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"os"
+	"os/exec"
+	"path/filepath"
+	"strings"
+	"time"
+
+	"github.com/aws/aws-sdk-go-v2/aws"
+	"github.com/aws/aws-sdk-go-v2/config"
+	"github.com/aws/aws-sdk-go-v2/service/s3"
+	"github.com/klauspost/compress/zstd"
+)
+
+// result is the structured run outcome the box publishes to S3 as an atomic object.
+// The orchestrator polls for it and reads either a complete object or a 404.
+type result struct {
+	SchemaVersion int             `json:"schemaVersion"`
+	Verdict       string          `json:"verdict"` // "ok" or "fail"
+	Markdown      string          `json:"markdown"`
+	Bench         json.RawMessage `json:"bench,omitempty"`
+	RunID         string          `json:"runId"`
+	TargetSHA     string          `json:"targetSha"`
+}
+
+func env(key, def string) string {
+	if v := os.Getenv(key); v != "" {
+		return v
+	}
+	return def
+}
+
+// ledgerScenarios are the apply-load profiles ingested as one concatenated stream,
+// one bundle per scenario (load-test-ledgers-<version>-<scenario>.xdr.zstd).
+var (
+	ledgerScenarios = []string{"oz", "sac", "soroswap"}
+	curVersion      = "v27" // version of the above bundles
+)
+
+// instantiate is the instance half after the bootstrap, which streams the corpus
+// from S3, runs the benchmark, and writes the ok/fail verdict.
+func instantiate(ctx context.Context) error { + var ( + bucket = env("BUCKET", "stellar-rpc-ci-load-test") + region = env("REGION", "us-east-1") + workDir = env("WORK_DIR", "/data") + goldenDB = env("GOLDEN_DB", filepath.Join(workDir, "golden.sqlite")) + resultsFile = env("RESULTS_FILE", "/tmp/results.md") + benchResults = env("BENCH_RESULTS", "/tmp/bench-results.json") + resultKey = os.Getenv("RESULT_KEY") + targetSHA = os.Getenv("TARGET_SHA") + runID = env("RUN_ID", "manual") + ) + + repoRoot, err := os.Getwd() + if err != nil { + return err + } + bail := func(format string, args ...any) error { + return bailInstance(resultsFile, runID, targetSHA, fmt.Sprintf(format, args...)) + } + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return bail("loading AWS config: %v", err) + } + fetch := &s3Fetcher{client: s3.NewFromConfig(awsCfg), bucket: bucket} + + bundlePaths, goldenFetchSecs, err := fetchCorpus(ctx, fetch, goldenDB) + if err != nil { + return bail("%v", err) + } + + logger.Infof("download complete") + + logger.Infof("building rpc libs") + if err := runStreaming(ctx, repoRoot, nil, 40, "make", "build-libs"); err != nil { + return bail("make build-libs failed: %v", err) + } + + logger.Infof("running ingest perf benchmark") + benchEnv := []string{ + "LOADTEST_INGEST_LEDGER_PATH=" + strings.Join(bundlePaths, ","), + "LOADTEST_INGEST_DEADLINE=" + env("LOADTEST_INGEST_DEADLINE", "150m"), + "LOADTEST_SQLITE_PATH=" + goldenDB, + "PERF_RESULTS_PATH=" + benchResults, + "PERF_RESULTS_MD_PATH=" + resultsFile, + "PERF_TARGET_SHA=" + targetSHA, + "PERF_RUN_ID=" + runID, + "PERF_REPO=" + env("REPO", "stellar/stellar-rpc"), + fmt.Sprintf("PERF_GOLDEN_FETCH_SECONDS=%d", goldenFetchSecs), + "STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true", + } + if err := runStreaming(ctx, repoRoot, benchEnv, 80, + "go", "test", "-run", "TestIngestSyntheticLedgers", "-timeout", "170m", "-v", + "./cmd/stellar-rpc/internal/integrationtest/"); err != nil 
{ + return bail("benchmark failed:\n%v", err) + } + + if fi, err := os.Stat(resultsFile); err != nil || fi.Size() == 0 { + return bail("benchmark succeeded but did not emit %s", resultsFile) + } + logger.Infof("results ready; publishing verdict") + if err := publishResult( + ctx, fetch.client, bucket, resultKey, "ok", runID, targetSHA, resultsFile, benchResults, + ); err != nil { + return bail("publishing result: %v", err) + } + return nil +} + +// bailInstance writes the failure body and exits non-zero for the bash wrapper +func bailInstance(resultsFile, runID, targetSHA, msg string) error { + logger.Error(msg) + body := fmt.Sprintf("❌ **Ingest load test failed** (run %s on `%s`)\n\n```\n%s\n```\n", runID, targetSHA, msg) + _ = os.WriteFile(resultsFile, []byte(body), 0o644) + os.Exit(1) + return nil // unreachable +} + +// publishResult uploads the run result to s3://bucket/key as one atomic object +// that the orchestrator can poll for. +func publishResult( + ctx context.Context, + client *s3.Client, + bucket, key, verdict, runID, targetSHA, mdPath, benchPath string, +) error { + if key == "" { + logger.Infof("RESULT_KEY unset; skipping S3 result publish (verdict: %s)", verdict) + return nil + } + md, err := os.ReadFile(mdPath) + if err != nil { + return fmt.Errorf("reading result markdown %s: %w", mdPath, err) + } + res := result{ + SchemaVersion: 1, + Verdict: verdict, + Markdown: string(md), + RunID: runID, + TargetSHA: targetSHA, + } + if bench, berr := os.ReadFile(benchPath); berr == nil { + res.Bench = json.RawMessage(bench) + } else { + logger.Warnf("bench results %s unavailable: %v", benchPath, berr) + } + body, err := json.Marshal(res) + if err != nil { + return fmt.Errorf("marshaling result: %w", err) + } + logger.Infof("publishing result to s3://%s/%s (verdict: %s, %d bytes)", bucket, key, verdict, len(body)) + if _, err := client.PutObject(ctx, &s3.PutObjectInput{ + Bucket: &bucket, + Key: &key, + Body: bytes.NewReader(body), + ContentType: 
aws.String("application/json"),
+	}); err != nil {
+		return fmt.Errorf("uploading result: %w", err)
+	}
+	return nil
+}
+
+// runStreaming runs name in dir (with extra env appended) and streams combined
+// output to our log. On failure, the error carries the last tailN lines.
+func runStreaming(ctx context.Context, dir string, env []string, tailN int, name string, args ...string) error {
+	cmd := exec.CommandContext(ctx, name, args...)
+	cmd.Dir = dir
+	cmd.Env = append(os.Environ(), env...)
+	// The full stream goes to stderr (and on to the box's user-data log), but we
+	// keep a bounded tail in memory for the error.
+	tail := &tailWriter{max: 64 << 10}
+	w := io.MultiWriter(os.Stderr, tail)
+	cmd.Stdout, cmd.Stderr = w, w
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("%w\n%s", err, lastLines(tail.String(), tailN))
+	}
+	return nil
+}
+
+// tailWriter is a writer that appends and then trims, keeping only the last max bytes written to it.
+type tailWriter struct {
+	max int
+	buf []byte
+}
+
+func (w *tailWriter) Write(p []byte) (int, error) {
+	w.buf = append(w.buf, p...)
+	if len(w.buf) > w.max {
+		w.buf = w.buf[len(w.buf)-w.max:]
+	}
+	return len(p), nil
+}
+
+func (w *tailWriter) String() string { return string(w.buf) }
+
+func lastLines(s string, n int) string {
+	lines := strings.Split(strings.TrimRight(s, "\n"), "\n")
+	if len(lines) > n {
+		lines = lines[len(lines)-n:]
+	}
+	return strings.Join(lines, "\n")
+}
+
+// fetchCorpus streams the golden DB, stellar-core, and ledger bundles from S3,
+// returning the bundle paths and the golden DB fetch duration.
+func fetchCorpus(ctx context.Context, fetch *s3Fetcher, goldenDB string) ([]string, int, error) {
+	// current/prev1/prev2 lets a run fall back to an older golden DB snapshot
+	// while a fresh one is being published.
+ goldenFetchSecs := -1 + for _, pfx := range []string{"current", "prev1", "prev2"} { + key := pfx + "/golden.sqlite.zst" + logger.Infof("streaming s3://%s/%s", fetch.bucket, key) + start := time.Now() + if err := fetch.fetchVerified(ctx, key, goldenDB, true, "golden DB"); err != nil { + logger.Infof("%v", err) + _ = os.Remove(goldenDB) + continue + } + goldenFetchSecs = int(time.Since(start).Seconds()) + logger.Infof("golden DB ready in %ds", goldenFetchSecs) + break + } + if goldenFetchSecs < 0 { + return nil, 0, errors.New("no golden.sqlite.zst in current/, prev1/, or prev2/") + } + + const corePath = "/usr/local/bin/stellar-core" // fetch pre-built core cached in S3 + if err := fetch.fetchVerified(ctx, "core/stellar-core.zst", corePath, true, "stellar-core"); err != nil { + return nil, 0, err + } + if err := os.Chmod(corePath, 0o755); err != nil { + return nil, 0, fmt.Errorf("chmod stellar-core: %w", err) + } + + var bundlePaths []string + for _, sc := range ledgerScenarios { + bundlePath := fmt.Sprintf("/tmp/load-test-ledgers-%s-%s.xdr.zstd", curVersion, sc) + key := fmt.Sprintf("ledgers/load-test-ledgers-%s-%s.xdr.zstd", curVersion, sc) + if err := fetch.fetchVerified(ctx, key, bundlePath, false, "ledger bundle ("+sc+")"); err != nil { + return nil, 0, err + } + bundlePaths = append(bundlePaths, bundlePath) + } + return bundlePaths, goldenFetchSecs, nil +} + +// s3Fetcher streams objects from one bucket, sha-verifying when the object +// carries sha256-raw metadata. +type s3Fetcher struct { + client *s3.Client + bucket string +} + +// fetchVerified downloads key to dst (zstd-decoding when zstdMode), checking its +// sha256 against the object's sha256-raw metadata when present. 
+func (f *s3Fetcher) fetchVerified(ctx context.Context, key, dst string, zstdMode bool, label string) error { + expected := f.expectedSHA(ctx, key, label) + logger.Infof("fetching %s", label) + got, err := f.streamObject(ctx, key, dst, zstdMode) + if err != nil { + return fmt.Errorf("failed to download %s: %w", label, err) + } + if expected != "" && expected != got { + return fmt.Errorf("%s hash mismatch: expected %s, got %s", label, expected, got) + } + if expected == "" { + logger.Infof("%s hash computed (unverified) (%s)", label, got) + } else { + logger.Infof("%s hash OK (%s)", label, got) + } + return nil +} + +// expectedSHA returns the object's sha256-raw metadata, or "" if the object or +// the key is absent (caller then fetches unverified). +func (f *s3Fetcher) expectedSHA(ctx context.Context, key, label string) string { + head, err := f.client.HeadObject(ctx, &s3.HeadObjectInput{Bucket: &f.bucket, Key: &key}) + if err != nil { + logger.Warnf("head-object failed for s3://%s/%s; fetching %s without checksum", f.bucket, key, label) + return "" + } + // S3 lowercases user-metadata keys; the SDK strips the x-amz-meta- prefix. + if sha := head.Metadata["sha256-raw"]; sha != "" { + return sha + } + logger.Warnf("no sha256-raw on s3://%s/%s; skipping %s checksum", f.bucket, key, label) + return "" +} + +// streamObject downloads key to dst (zstd-decoding when zstdMode) and returns +// the sha256 of the bytes written. 
+func (f *s3Fetcher) streamObject(ctx context.Context, key, dst string, zstdMode bool) (string, error) { + out, err := f.client.GetObject(ctx, &s3.GetObjectInput{Bucket: &f.bucket, Key: &key}) + if err != nil { + return "", err + } + defer out.Body.Close() + + var src io.Reader = out.Body + if zstdMode { + zr, err := zstd.NewReader(out.Body) + if err != nil { + return "", err + } + defer zr.Close() + src = zr + } + + file, err := os.Create(dst) + if err != nil { + return "", err + } + defer file.Close() + + hasher := sha256.New() + if _, err := io.Copy(io.MultiWriter(file, hasher), src); err != nil { + return "", err + } + if err := file.Sync(); err != nil { + return "", err + } + return hex.EncodeToString(hasher.Sum(nil)), nil +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go new file mode 100644 index 000000000..4407281f2 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/orchestrate.go @@ -0,0 +1,274 @@ +// Command runner drives the ephemeral RPC ingestion load test. It has two +// subcommands, one per environment the test spans: +// +// runner instantiate on the EC2 box, after a shell preamble has installed +// the toolchain and checked out the repo: streams the +// golden DB, stellar-core, and ledger bundles from S3 +// (sha-verified), runs the ingest benchmark, and writes +// an ok/fail verdict. +// runner orchestrate on the GHA runner: polls S3 for the result object the +// instance publishes and relays the verdict + results as +// step outputs. SSM carries only best-effort debug tails. +// +// The two halves coordinate through a single S3 result object (see type result). +// SSM is used only for live-progress and timeout diagnostics. 
+package main + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "io" + "os" + "strconv" + "strings" + "time" + + "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/config" + "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/aws-sdk-go-v2/service/ssm" + "github.com/aws/smithy-go" + + supportlog "github.com/stellar/go-stellar-sdk/support/log" +) + +var logger = supportlog.New() + +func main() { + logger.SetLevel(supportlog.InfoLevel) + + cmd := "instantiate" + if len(os.Args) > 1 { + cmd = os.Args[1] + } + + ctx := context.Background() + var err error + switch cmd { + case "instantiate": + err = instantiate(ctx) + case "orchestrate": + err = orchestrate(ctx) + default: + fmt.Fprintf(os.Stderr, "usage: %s [instantiate|orchestrate]\n", os.Args[0]) + os.Exit(64) + } + if err != nil { + logger.Errorf("fatal: %v", err) + os.Exit(1) + } +} + +// commandWaitTimeout backstops a stuck SSM command (the debug-tail reads). +const commandWaitTimeout = 60 * time.Second + +// requireEnv returns the values of keys in order, erroring with every unset one. +func requireEnv(keys ...string) ([]string, error) { + vals := make([]string, len(keys)) + var missing []string + for i, k := range keys { + if vals[i] = os.Getenv(k); vals[i] == "" { + missing = append(missing, k) + } + } + if len(missing) > 0 { + return nil, fmt.Errorf("missing required env: %s", strings.Join(missing, ", ")) + } + return vals, nil +} + +// orchestrate polls the box until it reports a verdict and relays the result as step outputs +// On timeout it writes a debug comment instead. 
+func orchestrate(ctx context.Context) error { + vals, err := requireEnv("INSTANCE_ID", "AWS_REGION", + "RESULTS_TIMEOUT", "POLL_INTERVAL", "GITHUB_OUTPUT", "DEBUG_LOG_LINES", "DEBUG_LOG_EVERY_POLLS", + "BUCKET", "RESULT_KEY") + if err != nil { + return err + } + instanceID, region, githubOutput := vals[0], vals[1], vals[4] + bucket, resultKey := vals[7], vals[8] + + resultsTimeoutSec, err := strconv.Atoi(vals[2]) + if err != nil { + return fmt.Errorf("RESULTS_TIMEOUT: %w", err) + } + pollIntervalSec, err := strconv.Atoi(vals[3]) + if err != nil { + return fmt.Errorf("POLL_INTERVAL: %w", err) + } + debugLogLines, err := strconv.Atoi(vals[5]) + if err != nil { + return fmt.Errorf("DEBUG_LOG_LINES: %w", err) + } + debugEveryPolls, err := strconv.Atoi(vals[6]) + if err != nil { + return fmt.Errorf("DEBUG_LOG_EVERY_POLLS: %w", err) + } + resultsTimeout := time.Duration(resultsTimeoutSec) * time.Second + pollInterval := time.Duration(pollIntervalSec) * time.Second + + awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(region)) + if err != nil { + return err + } + s3Client := s3.NewFromConfig(awsCfg) + runner := &ssmRunner{client: ssm.NewFromConfig(awsCfg), instanceID: instanceID} + + deadline := time.Now().Add(resultsTimeout) + for pollCount := 1; time.Now().Before(deadline); pollCount++ { + res, derr := fetchResult(ctx, s3Client, bucket, resultKey) + switch { + case errors.Is(derr, errResultNotReady): + logger.Infof("still waiting for s3://%s/%s", bucket, resultKey) + case derr != nil: + logger.Warnf("result fetch failed; retrying: %v", derr) + default: + logger.Infof("result published by instance (verdict: %s)", res.Verdict) + if werr := os.WriteFile("/tmp/results.md", []byte(res.Markdown), 0o644); werr != nil { + return werr + } + return appendOutputs(githubOutput, + "found=true", + fmt.Sprintf("passed=%t", res.Verdict == "ok")) + } + + if pollCount%debugEveryPolls == 0 { + logger.Infof("debug tail:\n%s", runner.debugTail(ctx, debugLogLines)) + } + 
time.Sleep(pollInterval) + } + + return writeTimeoutComment(ctx, runner, githubOutput, instanceID, resultsTimeout, debugLogLines) +} + +// ssmRunner runs shell commands on one instance over SSM RunShellScript. +type ssmRunner struct { + client *ssm.Client + instanceID string +} + +// capture dispatches command, waits for it, and returns its stdout. A non-nil +// error means dispatch failed; an unreadable result is "". +func (r *ssmRunner) capture(ctx context.Context, command string) (string, error) { + var id string + var sendErr error + for attempt := 1; attempt <= 3; attempt++ { + out, err := r.client.SendCommand(ctx, &ssm.SendCommandInput{ + InstanceIds: []string{r.instanceID}, + DocumentName: aws.String("AWS-RunShellScript"), + Parameters: map[string][]string{"commands": {command}}, + }) + if err == nil { + id = aws.ToString(out.Command.CommandId) + break + } + sendErr = err + logger.Warnf("ssm send-command attempt %d failed", attempt) + time.Sleep(5 * time.Second) + } + if id == "" { + return "", fmt.Errorf("ssm send-command failed: %w", sendErr) + } + + in := &ssm.GetCommandInvocationInput{CommandId: &id, InstanceId: &r.instanceID} + _ = ssm.NewCommandExecutedWaiter(r.client).Wait(ctx, in, commandWaitTimeout) + inv, err := r.client.GetCommandInvocation(ctx, in) + if err != nil { + // Unreadable result is "not ready", not a dispatch failure. + return "", nil //nolint:nilerr + } + return aws.ToString(inv.StandardOutputContent), nil +} + +// debugTail returns the last n lines of the box's user-data log, or a sentinel. +func (r *ssmRunner) debugTail(ctx context.Context, n int) string { + cmd := fmt.Sprintf("if [ -f /var/log/user-data.log ]; then tail -n %d /var/log/user-data.log; "+ + "else echo __NO_DEBUG_LOG__; fi", n) + out, err := r.capture(ctx, cmd) + if err != nil || out == "" { + return "__DEBUG_TAIL_UNAVAILABLE__" + } + return out +} + +// errResultNotReady means the result object hasn't been published yet. 
+var errResultNotReady = errors.New("result not published yet") + +// fetchResult gets and decodes the result object, returning errResultNotReady +// when it is absent. +func fetchResult(ctx context.Context, client *s3.Client, bucket, key string) (*result, error) { + out, err := client.GetObject(ctx, &s3.GetObjectInput{Bucket: &bucket, Key: &key}) + if err != nil { + if isNotFound(err) { + return nil, errResultNotReady + } + return nil, err + } + defer out.Body.Close() + + data, err := io.ReadAll(out.Body) + if err != nil { + return nil, err + } + var res result + if err := json.Unmarshal(data, &res); err != nil { + return nil, fmt.Errorf("decoding result object: %w", err) + } + return &res, nil +} + +// isNotFound reports whether a GetObject error means the key is absent. +func isNotFound(err error) bool { + var nsk *types.NoSuchKey + if errors.As(err, &nsk) { + return true + } + var apiErr smithy.APIError + if errors.As(err, &apiErr) { + switch apiErr.ErrorCode() { + case "NoSuchKey", "NotFound": + return true + } + } + return false +} + +// appendOutputs appends lines to the GitHub Actions step-output file. +func appendOutputs(path string, lines ...string) error { + f, err := os.OpenFile(path, os.O_APPEND|os.O_WRONLY|os.O_CREATE, 0o644) + if err != nil { + return err + } + defer f.Close() + _, err = fmt.Fprintln(f, strings.Join(lines, "\n")) + return err +} + +// writeTimeoutComment is the no-verdict path: it writes a comment to +// /tmp/timeout-comment.md and records found=false. 
+func writeTimeoutComment( + ctx context.Context, + runner *ssmRunner, + githubOutput, instanceID string, + resultsTimeout time.Duration, + debugLogLines int, +) error { + var b strings.Builder + fmt.Fprintf(&b, "❌ Load test did not produce results within %.0fs.\n\n", resultsTimeout.Seconds()) + fmt.Fprintf(&b, "Instance: `%s`\n", instanceID) + srv, repo, run := os.Getenv("GITHUB_SERVER_URL"), os.Getenv("GITHUB_REPOSITORY"), os.Getenv("GITHUB_RUN_ID") + if srv != "" && repo != "" && run != "" { + fmt.Fprintf(&b, "Workflow run: %s/%s/actions/runs/%s\n", srv, repo, run) + } + if tail := runner.debugTail(ctx, debugLogLines); tail != "" { + fmt.Fprintf(&b, "\nLast %d lines of /var/log/user-data.log:\n\n```\n%s\n```\n", debugLogLines, tail) + } + if err := os.WriteFile("/tmp/timeout-comment.md", []byte(b.String()), 0o644); err != nil { + return err + } + return appendOutputs(githubOutput, "found=false") +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go new file mode 100644 index 000000000..33c394d99 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/runner/runner_test.go @@ -0,0 +1,44 @@ +package main + +import ( + "encoding/json" + "errors" + "testing" + + "github.com/aws/aws-sdk-go-v2/service/s3/types" + "github.com/aws/smithy-go" + "github.com/stretchr/testify/require" +) + +func TestIsNotFound(t *testing.T) { + require.True(t, isNotFound(&types.NoSuchKey{})) + require.True(t, isNotFound(&smithy.GenericAPIError{Code: "NotFound"})) + require.False(t, isNotFound(&smithy.GenericAPIError{Code: "AccessDenied"})) + require.False(t, isNotFound(errors.New("i/o timeout"))) +} + +// TestResultRoundTrip guards the publisher/poller contract: what publishResult +// writes must decode back to what orchestrate relays. 
+func TestResultRoundTrip(t *testing.T) { + in := result{ + SchemaVersion: 1, Verdict: "ok", Markdown: "# r", + Bench: json.RawMessage(`{"x":1}`), RunID: "123-1", TargetSHA: "abc", + } + data, err := json.Marshal(in) + require.NoError(t, err) + var out result + require.NoError(t, json.Unmarshal(data, &out)) + require.Equal(t, in, out) +} + +func TestTailWriter(t *testing.T) { + w := &tailWriter{max: 5} + for range 1000 { + _, err := w.Write([]byte("x")) + require.NoError(t, err) + } + _, err := w.Write([]byte("END")) + require.NoError(t, err) + require.Equal(t, "xxEND", w.String()) + require.LessOrEqual(t, len(w.buf), w.max) +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg new file mode 100644 index 000000000..c3d6e3ace --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-oz.cfg @@ -0,0 +1,65 @@ +# v27 apply-load config: OZ (custom_token) Soroban scenario, with meta enabled +# so the output is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 900 soroban (custom token transfer) txs/ledger + 1000 +# classic payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="custom_token" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. 
LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. Custom token has no batching, so +# APPLY_LOAD_MAX_SOROBAN_TX_COUNT is the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 900 + +# Classic payment txs per ledger (same for all three scenarios). +APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Number of parallel transaction clusters. +APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT*2 + CLASSIC_TXS_PER_LEDGER (~2800 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11627 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. 
+NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg new file mode 100644 index 000000000..31a2d2414 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-sac.cfg @@ -0,0 +1,82 @@ +# v27 apply-load config: SAC Soroban scenario, with meta enabled so the output +# is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 1000 soroban (SAC transfer) txs/ledger + 1000 classic +# payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="sac" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. For SAC, the number of tx envelopes +# is APPLY_LOAD_MAX_SOROBAN_TX_COUNT / APPLY_LOAD_BATCH_SAC_COUNT, so batch=1 +# makes MAX_SOROBAN_TX_COUNT the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 1000 +APPLY_LOAD_BATCH_SAC_COUNT = 1 + +# Classic payment txs per ledger (same for all three scenarios). 
+APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Keep classic payments DISJOINT from the other scenarios' bundles. +# Classic payment source accounts are the window +# [mNumAccounts - CLASSIC_TXS, mNumAccounts) +# where for SAC benchmark mode +# mNumAccounts = MAX_SOROBAN_TX_COUNT * SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER +# + CLASSIC_TXS + 2. +# Test-account keypairs are deterministic per index and apply-load always +# hashes with the fixed "Apply Load" passphrase, so overlapping windows yield +# byte-identical classic payments (colliding tx hashes) across scenario +# bundles. With the default multiplier (2) the sac window [2002,3002) overlaps +# oz's [1800,2800). Multiplier 3 shifts sac to [3002,4002), disjoint from oz +# [1800,2800) and soroswap [251,1251). +# (BUILD_TESTS-only key; sets SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER.) +SOROBAN_TRANSACTION_QUEUE_SIZE_MULTIPLIER_FOR_TESTING = 3 + +# Number of parallel transaction clusters. +APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT*3 + CLASSIC_TXS_PER_LEDGER + 2 (~4002 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11626 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. 
+NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg new file mode 100644 index 000000000..3e18bee7d --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/load-test/testdata/apply-load-v27-soroswap.cfg @@ -0,0 +1,65 @@ +# v27 apply-load config: Soroswap Soroban scenario, with meta enabled so the +# output is an ingestible LedgerCloseMeta corpus. +# +# Run in a dedicated work dir: +# stellar-core apply-load --conf +# This emits meta.xdr (a stream of LedgerCloseMeta). The benchmark closes +# exactly APPLY_LOAD_NUM_LEDGERS ledgers at the end, so the *last* +# APPLY_LOAD_NUM_LEDGERS frames of meta.xdr are the corpus. +# +# Scenario profile: 250 soroban (Soroswap swap) txs/ledger + 1000 classic +# payment txs/ledger. + +# Select the apply-load mode and benchmark model transaction. +APPLY_LOAD_MODE="benchmark" +APPLY_LOAD_MODEL_TX="soroswap" + +# Perf knobs (do not affect generated transaction content). +APPLY_LOAD_TIME_WRITES = true +DISABLE_SOROBAN_METRICS_FOR_TESTING = true + +# --- Meta ENABLED (this is what makes the output ingestible) --- +# NB: do NOT set DISABLE_TX_META_FOR_TESTING; BUILD_TESTS forces tx meta on, +# which we want. LOG_FILE_PATH="" sends logs to stdout. +METADATA_OUTPUT_STREAM = 'meta.xdr' +METADATA_DEBUG_LEDGERS = 0 +LOG_FILE_PATH = "" + +# Soroban txs per ledger for this scenario. Soroswap has no batching, so +# APPLY_LOAD_MAX_SOROBAN_TX_COUNT is the soroban TPL directly. +APPLY_LOAD_MAX_SOROBAN_TX_COUNT = 250 + +# Classic payment txs per ledger (same for all three scenarios). +APPLY_LOAD_CLASSIC_TXS_PER_LEDGER = 1000 + +# Number of parallel transaction clusters. 
+APPLY_LOAD_LEDGER_MAX_DEPENDENT_TX_CLUSTERS = 1 + +# Number of benchmark ledgers to close. +APPLY_LOAD_NUM_LEDGERS = 1000 + +# Disable bucket list pre-generation (not needed for benchmark mode). +APPLY_LOAD_BL_SIMULATED_LEDGERS = 0 +APPLY_LOAD_BL_WRITE_FREQUENCY = 0 +APPLY_LOAD_BL_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_SIZE = 0 +APPLY_LOAD_BL_LAST_BATCH_LEDGERS = 0 + +# Common apply-load boilerplate. +ARTIFICIALLY_GENERATE_LOAD_FOR_TESTING=true +ENABLE_SOROBAN_DIAGNOSTIC_EVENTS = false +# Must cover MAX_SOROBAN_TX_COUNT + 1 + CLASSIC_TXS_PER_LEDGER (~1251 here). +GENESIS_TEST_ACCOUNT_COUNT = 21000 + +# Minimal core config boilerplate. +UNSAFE_QUORUM=true +HTTP_PORT=11628 +# NB: the apply-load command hard-overrides NETWORK_PASSPHRASE to "Apply Load" +# (stellar-core CommandLine.cpp runApplyLoad), so this value is cosmetic and +# CANNOT be used to make tx hashes differ between scenarios. +NETWORK_PASSPHRASE="Apply Load" +NODE_SEED="SDQVDISRYN2JXBS7ICL7QJAEKB3HWBJFP2QECXG7GZICAHBK4UNJCWK2 self" + +[QUORUM_SET] +THRESHOLD_PERCENT=100 +VALIDATORS=["$self"] diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go index a886540f8..b4fb4c91d 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/test.go @@ -6,9 +6,12 @@ import ( "context" "crypto/sha256" "embed" + "encoding/json" "errors" "fmt" "net" + "net/http" + "net/http/httptest" "os" "os/exec" "os/signal" @@ -29,6 +32,7 @@ import ( client "github.com/stellar/go-stellar-sdk/clients/rpcclient" "github.com/stellar/go-stellar-sdk/clients/stellarcore" + "github.com/stellar/go-stellar-sdk/historyarchive" "github.com/stellar/go-stellar-sdk/keypair" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" proto "github.com/stellar/go-stellar-sdk/protocols/stellarcore" @@ -73,7 +77,8 @@ type TestOnlyRPCConfig struct { } type TestConfig struct { - 
ProtocolVersion int32 + ProtocolVersion int32 + NetworkPassphrase string // Run a previously released version of RPC (in a container) instead of the current version UseReleasedRPCVersion string // Use/Reuse a SQLite file path @@ -94,6 +99,14 @@ type TestConfig struct { DelayDaemonForLedgerN int // don't start daemon until ledger N reached by core DatastoreConfigFunc func(*config.Config) + + // LoadTest mode swaps the daemon's ingestion from captive-core to a synthetic + // ledger stream and skips all the captive-core/history-archive scaffolding. + IngestLoadTest config.IngestLoadTestConfig + + // HistoryRetentionWindow overrides the daemon's retention window. Zero + // uses the harness default (config.OneDayOfLedgers). + HistoryRetentionWindow uint32 } type TestCorePorts struct { @@ -120,6 +133,9 @@ type Test struct { protocolVersion int32 + historyRetentionWindow uint32 + networkPassphrase string + rpcConfigFilesDir string sqlitePath string @@ -143,6 +159,9 @@ type Test struct { ignoreLedgerCloseTimes bool datastoreConfigFunc func(*config.Config) + + ingestLoadTest config.IngestLoadTestConfig + fakeArchiveURL string } //nolint:cyclop @@ -152,6 +171,7 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { } i := &Test{t: t} + i.networkPassphrase = StandaloneNetworkPassphrase i.masterAccount = &txnbuild.SimpleAccount{ AccountID: i.MasterKey().Address(), Sequence: 0, @@ -167,6 +187,16 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { parallel = !cfg.NoParallel i.datastoreConfigFunc = cfg.DatastoreConfigFunc i.ignoreLedgerCloseTimes = cfg.IgnoreLedgerCloseTimes + i.ingestLoadTest = cfg.IngestLoadTest + i.historyRetentionWindow = cfg.HistoryRetentionWindow + if i.ingestLoadTest.Enabled() { + // apply-load ledgers have close time of 1970-01-01 + i.ignoreLedgerCloseTimes = true + } + + if cfg.NetworkPassphrase != "" { + i.networkPassphrase = cfg.NetworkPassphrase + } if cfg.OnlyRPC != nil { i.onlyRPC = true @@ -210,10 +240,15 @@ func NewTest(t testing.TB, cfg 
*TestConfig) *Test { i.rpcConfigFilesDir = i.t.TempDir() i.prepareShutdownHandlers() + if i.ingestLoadTest.Enabled() { + i.fakeArchiveURL = i.startFakeHistoryArchive() + } if i.areThereContainers() { i.spawnContainers() } - if !i.onlyRPC { + + // skipped in load test mode because it doesn't use a live core + if !i.onlyRPC && !i.ingestLoadTest.Enabled() { i.coreClient = &stellarcore.Client{URL: "http://" + i.testPorts.CoreHTTPHostPort} i.waitForCore() i.waitForCheckpoint() @@ -231,12 +266,40 @@ func NewTest(t testing.TB, cfg *TestConfig) *Test { i.waitForRPC() } - i.upgradeLimits() // upgrades need preflight so need RPC up + if !i.ingestLoadTest.Enabled() { + i.upgradeLimits() // upgrades need preflight so need RPC up + } return i } +// startFakeHistoryArchive serves a minimal .well-known/stellar-history.json. +// Ingestion consults it only when the DB is empty (to pick a start ledger), +// so a constant CurrentLedger of 1 starts the synthetic stream at ledger 1; +// non-empty DBs derive their start from the DB itself and never read this. +func (i *Test) startFakeHistoryArchive() string { + i.t.Helper() + srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path != "/.well-known/stellar-history.json" { + http.NotFound(w, r) + return + } + body, err := json.Marshal(historyarchive.HistoryArchiveState{ + Version: 1, + Server: "stellar-rpc-loadtest-fake", + NetworkPassphrase: i.networkPassphrase, + CurrentLedger: 1, + }) + require.NoError(i.t, err) + w.Header().Set("Content-Type", "application/json") + _, _ = w.Write(body) + })) + i.t.Cleanup(srv.Close) + return srv.URL +} + func (i *Test) areThereContainers() bool { - return i.runRPCInContainer() || !i.onlyRPC + // Load-test mode bypasses captive-core, so it needs no containers. 
+ return !i.ingestLoadTest.Enabled() && (i.runRPCInContainer() || !i.onlyRPC) } func (i *Test) spawnContainers() { @@ -357,26 +420,49 @@ func (i *Test) getRPConfigForContainer() rpcConfig { archiveURL: fmt.Sprintf("http://%s:%d", inContainerCoreHostname, inContainerCoreArchivePort), sqlitePath: "/db/" + filepath.Base(i.sqlitePath), captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, + networkPassphrase: i.networkPassphrase, + logLevel: "debug", + historyRetentionWindow: i.historyRetentionWindow, + } +} + +// findCoreBinary returns the stellar-core binary to use: +// STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN if set, otherwise +// "stellar-core" from PATH. +func findCoreBinary(t testing.TB) string { + if coreBinaryPath := os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN"); coreBinaryPath != "" { + return coreBinaryPath } + coreBinaryPath, err := exec.LookPath("stellar-core") + require.NoError(t, err, "stellar-core not found in PATH and STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN unset") + return coreBinaryPath } func (i *Test) getRPConfigForDaemon() rpcConfig { - coreBinaryPath := os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") - if coreBinaryPath == "" { - i.t.Fatal("missing STELLAR_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") + stellarCoreURL := "http://" + i.testPorts.CoreHTTPHostPort + archiveURL := "http://" + i.testPorts.CoreArchiveHostPort + logLevel := "debug" + if i.ingestLoadTest.Enabled() { + stellarCoreURL = "http://localhost:0" // unreachable + unused in load test mode, must be not empty + archiveURL = i.fakeArchiveURL + // warn makes the benchmark an "ingest-only" metric, reduces I/O noise drastically + logLevel = "warn" } return rpcConfig{ // Allocate port dynamically and then figure out what the port is endPoint: "localhost:0", adminEndpoint: "localhost:0", - stellarCoreURL: "http://" + i.testPorts.CoreHTTPHostPort, - coreBinaryPath: coreBinaryPath, + stellarCoreURL: stellarCoreURL, + coreBinaryPath: 
findCoreBinary(i.t), captiveCoreConfigPath: path.Join(i.rpcConfigFilesDir, captiveCoreConfigFilename), captiveCoreStoragePath: i.captiveCoreStoragePath, - archiveURL: "http://" + i.testPorts.CoreArchiveHostPort, + archiveURL: archiveURL, sqlitePath: i.sqlitePath, captiveCoreHTTPQueryPort: i.testPorts.captiveCoreHTTPQueryPort, ignoreLedgerCloseTimes: i.ignoreLedgerCloseTimes, + networkPassphrase: i.networkPassphrase, + logLevel: logLevel, + historyRetentionWindow: i.historyRetentionWindow, } } @@ -392,6 +478,9 @@ type rpcConfig struct { archiveURL string sqlitePath string ignoreLedgerCloseTimes bool + networkPassphrase string + logLevel string + historyRetentionWindow uint32 } func (vars rpcConfig) toMap() map[string]string { @@ -400,6 +489,10 @@ func (vars rpcConfig) toMap() map[string]string { // If we're ignoring close times, permit absurdly high latencies maxHealthyLedgerLatency = time.Duration(1<<63 - 1).String() } + retentionWindow := strconv.Itoa(config.OneDayOfLedgers) + if vars.historyRetentionWindow > 0 { + retentionWindow = strconv.FormatUint(uint64(vars.historyRetentionWindow), 10) + } return map[string]string{ "ENDPOINT": vars.endPoint, "ADMIN_ENDPOINT": vars.adminEndpoint, @@ -414,12 +507,12 @@ func (vars rpcConfig) toMap() map[string]string { "STELLAR_CAPTIVE_CORE_HTTP_QUERY_SNAPSHOT_LEDGERS": "1", "EMIT_CLASSIC_EVENTS": "true", "FRIENDBOT_URL": FriendbotURL, - "NETWORK_PASSPHRASE": StandaloneNetworkPassphrase, + "NETWORK_PASSPHRASE": vars.networkPassphrase, "HISTORY_ARCHIVE_URLS": vars.archiveURL, - "LOG_LEVEL": "debug", + "LOG_LEVEL": vars.logLevel, "DB_PATH": vars.sqlitePath, "INGESTION_TIMEOUT": "10m", - "HISTORY_RETENTION_WINDOW": strconv.Itoa(config.OneDayOfLedgers), + "HISTORY_RETENTION_WINDOW": retentionWindow, "CHECKPOINT_FREQUENCY": strconv.Itoa(checkpointFrequency), "MAX_HEALTHY_LEDGER_LATENCY": maxHealthyLedgerLatency, "PREFLIGHT_ENABLE_DEBUG": "true", @@ -555,6 +648,10 @@ func (i *Test) createRPCDaemon(c rpcConfig) *daemon.Daemon { 
i.datastoreConfigFunc(&cfg) } + if i.ingestLoadTest.Enabled() { + cfg.IngestLoadTest = i.ingestLoadTest + } + logger := supportlog.New() logger.SetOutput(newTestLogWriter(i.t, `rpc="daemon" `)) logger.SetExitFunc(func(code int) { diff --git a/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go b/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go new file mode 100644 index 000000000..5c27481e8 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/ingest_loadtest_test.go @@ -0,0 +1,664 @@ +package integrationtest + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "maps" + "os" + "path/filepath" + "sort" + "strconv" + "strings" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + rpcclient "github.com/stellar/go-stellar-sdk/clients/rpcclient" + "github.com/stellar/go-stellar-sdk/ingest/loadtest" + protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" + "github.com/stellar/go-stellar-sdk/xdr" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure" +) + +const ( + defaultLedgerBundlePath = "./infrastructure/testdata/load-test-ledgers-v27-sac.xdr.zstd" + + // applyLoadNetworkPassphrase is the network apply-load hard-overrides every bundle to. + applyLoadNetworkPassphrase = "Apply Load" // needed by RPC for ingest + ingestStallTimeout = 3 * time.Minute // rate should = ~1 ledger/sec +) + +// TestIngestSyntheticLedgers replays apply-load-generated ledger bundles through RPC +// ingestion and asserts the resulting DB matches the workloads that produced them. +// Bundles are built offline by stellar-core apply-load and fetched from S3. +// +// Requires STELLAR_RPC_INTEGRATION_TESTS_ENABLED=true. 
Optional: LOADTEST_SQLITE_PATH +// (DB to ingest into; empty = fresh tmp DB), comma-separated LOADTEST_INGEST_LEDGER_PATH +// (bundles, replayed in order), and LOADTEST_MAX_LEDGERS_PER_FILE to cap ledgers per bundle. +func TestIngestSyntheticLedgers(t *testing.T) { + if os.Getenv("STELLAR_RPC_INTEGRATION_TESTS_ENABLED") != "true" { + t.Skip("STELLAR_RPC_INTEGRATION_TESTS_ENABLED not set, skipping test") + } + + ledgerPaths := splitPathList(os.Getenv("LOADTEST_INGEST_LEDGER_PATH")) + if len(ledgerPaths) == 0 { + ledgerPaths = []string{defaultLedgerBundlePath} + } + for _, p := range ledgerPaths { + if _, err := os.Stat(p); err != nil { + t.Skipf("no generated ledger bundle at %q; generate one with stellar-core apply-load "+ + "(or set LOADTEST_INGEST_LEDGER_PATH)", p) + } + } + sqlitePath := os.Getenv("LOADTEST_SQLITE_PATH") + + maxPerFile := maxLedgersPerFile(t) + segments, total := buildSegments(t, ledgerPaths, maxPerFile) + prof := ingestProfile{networkPassphrase: applyLoadNetworkPassphrase, totalLedgers: total, segments: segments} + + runIngestPhase(t, sqlitePath, ledgerPaths, prof, maxPerFile) +} + +// runIngestPhase boots an RPC daemon ingesting from the bundles, waits for it to +// catch up to the last synthetic ledger, then verifies the range via getTransactions. +func runIngestPhase(t *testing.T, sqlitePath string, ledgerPaths []string, prof ingestProfile, maxPerFile uint32) { + t.Helper() + + // Empty path -> fresh tmp DB. + if sqlitePath == "" { + sqlitePath = filepath.Join(t.TempDir(), "stellar-rpc.sqlite") + } + + // Read the pre-test bounds and close the handle immediately: holding an + // idle second SQLite connection through the benchmark invites what-ifs. 
+ sdb, err := db.OpenSQLiteDB(sqlitePath) + require.NoError(t, err) + preTestLast, initialCount, err := getLedgerBounds(t.Context(), sdb) + require.NoError(t, sdb.Close()) + require.NoError(t, err) + require.True(t, initialCount == 0 || initialCount >= prof.totalLedgers, + "seed DB has %d ledgers but the corpus is %d; retention window %d will trim "+ + "the early synthetic ledgers before verification", initialCount, prof.totalLedgers, initialCount) + + // Synthetic ledgers append past the DB's pre-test latest (empty DB -> startSeq 1). + startSeq := preTestLast + 1 + endSeq := startSeq + prof.totalLedgers - 1 + + // The daemon fires OnLedgerIngested per committed ledger and the recorder captures + // exact per-ledger ingest durations (no polling, no quantization). + rec := newIngestRecorder(startSeq, endSeq) + i := infrastructure.NewTest(t, &infrastructure.TestConfig{ + NetworkPassphrase: prof.networkPassphrase, + SQLitePath: sqlitePath, + HistoryRetentionWindow: initialCount, + IngestLoadTest: config.IngestLoadTestConfig{ + Files: ledgerPaths, + Frequency: ingestFrequency(t), + MaxLedgersPerFile: maxPerFile, + OnLedgerIngested: rec.onLedger, + }, + }) + startedAt := time.Now().UTC() + client := i.GetRPCLient() + + awaitIngest(t, rec) + finishedAt := time.Now().UTC() + // elapsed runs from the first to the last synthetic ledger's commit, excluding + // the backend's one-time corpus preprocessing (not the code under test). 
+ elapsed := rec.elapsed() + t.Logf("Ingested %d ledgers in %s", prof.totalLedgers, elapsed) + + verifyLedgerRange(t, sqlitePath, startSeq, endSeq, prof.totalLedgers) // check every ledger present/contiguous + verifyOpsSample(t, client, startSeq, prof.segments) // sample to verify a mixed workload + + versionInfo, err := client.GetVersionInfo(t.Context()) + require.NoError(t, err) + + emitPerfReport(t, perfReportInput{ + startedAt: startedAt, + finishedAt: finishedAt, + ledgerCount: prof.totalLedgers, + initialLedgers: initialCount, + durations: rec.snapshotDurations(), + elapsed: elapsed, + startSeq: startSeq, + segments: prof.segments, + captiveCoreVersion: versionInfo.CaptiveCoreVersion, + }) +} + +// ingestFrequency is the backend's ledger emit rate (LOADTEST_INGEST_FREQUENCY, default 1ms). +func ingestFrequency(t *testing.T) time.Duration { + t.Helper() + v := os.Getenv("LOADTEST_INGEST_FREQUENCY") + if v == "" { + return time.Millisecond + } + d, err := time.ParseDuration(v) + require.NoError(t, err, "invalid LOADTEST_INGEST_FREQUENCY") + return d +} + +// maxLedgersPerFile is the per-bundle ceiling on replayed ledgers +// (LOADTEST_MAX_LEDGERS_PER_FILE, default 0 = every ledger in every file). +func maxLedgersPerFile(t *testing.T) uint32 { + t.Helper() + v := os.Getenv("LOADTEST_MAX_LEDGERS_PER_FILE") + if v == "" { + return 0 + } + n, err := strconv.ParseUint(v, 10, 32) + require.NoError(t, err, "invalid LOADTEST_MAX_LEDGERS_PER_FILE") + return uint32(n) +} + +// ingestRecorder collects exact per-ledger ingest durations from the daemon's +// OnLedgerIngested hook. It records only sequences in [startSeq, endSeq] and +// closes done once endSeq commits. 
+type ingestRecorder struct { + startSeq, endSeq uint32 + done chan struct{} + + mu sync.Mutex + durations map[uint32]time.Duration + firstAt time.Time // first in-range commit, for elapsed + lastAt time.Time // endSeq commit, for elapsed + latestSeq uint32 + lastProgressAt time.Time // for stall detection +} + +func newIngestRecorder(startSeq, endSeq uint32) *ingestRecorder { + return &ingestRecorder{ + startSeq: startSeq, + endSeq: endSeq, + done: make(chan struct{}), + durations: make(map[uint32]time.Duration, endSeq-startSeq+1), + } +} + +// onLedger is the OnLedgerIngested hook. It runs on the ingest loop, so it stays +// to a map insert under a short lock. Duration is measured before the call. +func (r *ingestRecorder) onLedger(seq uint32, d time.Duration) { + if seq < r.startSeq || seq > r.endSeq { + return + } + now := time.Now() + r.mu.Lock() + defer r.mu.Unlock() + r.durations[seq] = d + r.latestSeq = seq + r.lastProgressAt = now + if r.firstAt.IsZero() { + r.firstAt = now + } + if seq == r.endSeq { + r.lastAt = now + close(r.done) + } +} + +// elapsed is the wall-clock from the first to the last synthetic ledger's commit. +func (r *ingestRecorder) elapsed() time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + return r.lastAt.Sub(r.firstAt) +} + +// snapshotDurations returns a copy of the per-ledger durations for offline use. +func (r *ingestRecorder) snapshotDurations() map[uint32]time.Duration { + r.mu.Lock() + defer r.mu.Unlock() + out := make(map[uint32]time.Duration, len(r.durations)) + maps.Copy(out, r.durations) + return out +} + +// progress reports time since the last ledger, the latest sequence, and whether +// any ledger has been seen yet. +func (r *ingestRecorder) progress() (time.Duration, uint32, bool) { + r.mu.Lock() + defer r.mu.Unlock() + if r.lastProgressAt.IsZero() { + return 0, r.startSeq - 1, false + } + return time.Since(r.lastProgressAt), r.latestSeq, true +} + +// awaitIngest blocks until the recorder sees endSeq committed. 
It fails fast if +// ingestion stalls (ingestStallTimeout with no progress) and gives up at +// LOADTEST_INGEST_DEADLINE (default 30m). +func awaitIngest(t *testing.T, rec *ingestRecorder) { + t.Helper() + + ingestDeadline := 30 * time.Minute + if v := os.Getenv("LOADTEST_INGEST_DEADLINE"); v != "" { + var err error + ingestDeadline, err = time.ParseDuration(v) + require.NoError(t, err, "invalid LOADTEST_INGEST_DEADLINE") + } + deadline := time.After(ingestDeadline) + check := time.NewTicker(10 * time.Second) + defer check.Stop() + + for { + select { + case <-rec.done: + return + case <-deadline: + _, seq, _ := rec.progress() + t.Fatalf("RPC only ingested through ledger %d; wanted %d within %s", seq, rec.endSeq, ingestDeadline) + case <-check.C: + if since, seq, started := rec.progress(); started && since >= ingestStallTimeout { + t.Fatalf("ingestion stalled at ledger %d for %s; wanted endSeq %d", + seq, ingestStallTimeout, rec.endSeq) + } + } + } +} + +// verifyLedgerRange asserts the DB holds exactly expected synthetic ledgers across +// [startSeq, endSeq]; count==span with matching bounds confirms no gaps. +func verifyLedgerRange(t *testing.T, sqlitePath string, startSeq, endSeq, expected uint32) { + t.Helper() + sdb, err := db.OpenSQLiteDB(sqlitePath) + require.NoError(t, err) + count, lo, hi, err := db.NewLedgerReader(sdb).GetLedgerCountInRange(t.Context(), startSeq, endSeq) + require.NoError(t, sdb.Close()) + require.NoError(t, err) + require.Equal(t, expected, count, "want %d ledgers in [%d,%d], got %d", expected, startSeq, endSeq, count) + require.Equal(t, startSeq, lo, "first synthetic ledger") + require.Equal(t, endSeq, hi, "last synthetic ledger") +} + +// verifyOpsSample walks every tx of a few ledgers per segment to confirm the corpus is a real mixed workload +// and that every sampled ledger carries ops of mixed activity. 
+func verifyOpsSample(t *testing.T, client *rpcclient.Client, startSeq uint32, segments []profileSegment) { + t.Helper() + lo := startSeq + for _, seg := range segments { + totalClassic, totalSoroban := 0, 0 + hi := lo + seg.ledgers - 1 + for _, seq := range []uint32{lo, lo + (seg.ledgers-1)/2, hi} { + classic, soroban, err := walkTransactionRange(t.Context(), client, seq, seq, 200) + require.NoError(t, err, "sampling ledger %d (%s)", seq, seg.name) + require.Positive(t, classic+soroban, "ledger %d (%s) has no ops", seq, seg.name) + totalClassic += classic + totalSoroban += soroban + } + lo = hi + 1 + require.Positive(t, totalClassic, "no classic ops sampled across the segment %s", seg.name) + require.Positive(t, totalSoroban, "no Soroban ops sampled across the segment %s", seg.name) + } +} + +// walkTransactionRange pages through getTransactions for ledgers in [lo, hi] and +// counts classic Payment and Soroban ops. Pages are retried so one transient RPC +// error doesn't abort a walk of thousands of pages. +func walkTransactionRange( + ctx context.Context, client *rpcclient.Client, lo, hi uint32, pageLimit uint, +) (int, int, error) { + const pageAttempts = 3 + var countClassic, countSoroban int + cursor := "" + for { + // Empty Cursor (omitempty) covers both the first and subsequent pages. 
+ req := protocol.GetTransactionsRequest{ + Format: protocol.FormatBase64, + Pagination: &protocol.LedgerPaginationOptions{Cursor: cursor, Limit: pageLimit}, + } + if cursor == "" { + req.StartLedger = lo + } + + var resp protocol.GetTransactionsResponse + var err error + for attempt := 1; ; attempt++ { + resp, err = client.GetTransactions(ctx, req) + if err == nil || attempt >= pageAttempts { + break + } + select { + case <-ctx.Done(): + return countClassic, countSoroban, ctx.Err() + case <-time.After(time.Second): + } + } + if err != nil { + return countClassic, countSoroban, err + } + if len(resp.Transactions) == 0 { + return countClassic, countSoroban, nil + } + + for _, tx := range resp.Transactions { + if tx.Ledger > hi { + return countClassic, countSoroban, nil + } + var env xdr.TransactionEnvelope + if err := xdr.SafeUnmarshalBase64(tx.EnvelopeXDR, &env); err != nil { + return countClassic, countSoroban, fmt.Errorf("unmarshalling tx envelope: %w", err) + } + c, s := countOps(env) + countClassic += c + countSoroban += s + } + if resp.Cursor == "" { + return countClassic, countSoroban, nil + } + cursor = resp.Cursor + } +} + +// countOps counts classic Payment and Soroban ops in one envelope. +func countOps(env xdr.TransactionEnvelope) (int, int) { + var classic, soroban int + for _, op := range env.Operations() { + switch op.Body.Type { + case xdr.OperationTypePayment: + classic++ + case xdr.OperationTypeInvokeHostFunction: + soroban++ + default: + } + } + return classic, soroban +} + +// getLedgerBounds returns the DB's latest ledger sequence and its ledger count. 
+func getLedgerBounds(ctx context.Context, sdb *db.DB) (uint32, uint32, error) { + r, err := db.NewLedgerReader(sdb).GetLedgerRange(ctx) + if errors.Is(err, db.ErrEmptyDB) { + return 0, 0, nil // zero values for empty DB + } + return r.LastLedger.Sequence, r.LastLedger.Sequence - r.FirstLedger.Sequence + 1, err +} + +// profileSegment is one bundle's slice of the concatenated ledger stream +type profileSegment struct { + name string // from bundle filename + ledgers uint32 // how many ledgers this bundle contributes +} + +// ingestProfile is the expectation for an ingest run over one or more bundles: +// the shared network passphrase, total ledger count, and per-bundle segments. +type ingestProfile struct { + networkPassphrase string + totalLedgers uint32 + segments []profileSegment +} + +// buildSegments derives one segment per bundle straight from the corpus: the SDK +// counts each file under maxPerFile (the same cap the backend applies), so the +// per-bundle bounds and total exactly match what gets replayed. +func buildSegments(t *testing.T, ledgerPaths []string, maxPerFile uint32) ([]profileSegment, uint32) { + t.Helper() + counts, err := loadtest.CountLedgersPerFile(ledgerPaths, maxPerFile) + require.NoError(t, err) + segments := make([]profileSegment, 0, len(counts)) + var total uint32 + for _, c := range counts { + require.NotZero(t, c.Ledgers, "bundle %s contributed no ledgers", c.Path) + name := strings.TrimSuffix(filepath.Base(c.Path), ".xdr.zstd") + segments = append(segments, profileSegment{name: name, ledgers: c.Ledgers}) + total += c.Ledgers + } + return segments, total +} + +// splitPathList splits a comma-separated list, trimming whitespace and dropping empties. 
+func splitPathList(s string) []string { + var out []string + for p := range strings.SplitSeq(s, ",") { + if p = strings.TrimSpace(p); p != "" { + out = append(out, p) + } + } + return out +} + +func TestComputeProfilePerf(t *testing.T) { + // Two segments of 3 ledgers starting at seq 10, with exact per-ledger ingest + // durations: segment a at 100ms each, segment b at 200ms each. + durations := make(map[uint32]time.Duration) + for seq := uint32(10); seq <= 12; seq++ { + durations[seq] = 100 * time.Millisecond + } + for seq := uint32(13); seq <= 15; seq++ { + durations[seq] = 200 * time.Millisecond + } + + perf := computeProfilePerf(durations, 10, []profileSegment{ + {name: "a", ledgers: 3}, + {name: "b", ledgers: 3}, + }) + require.Len(t, perf, 2) + + // ms/ledger is the mean of each segment's per-ledger durations; quantiles come + // straight from the samples (no inter-arrival diffing, no quantization). + require.Equal(t, "a", perf[0].Profile) + require.InDelta(t, 100.0, perf[0].MsPerLedger, 1e-9) + require.InDelta(t, 100.0, perf[0].PerLedgerLatencyMs.P50, 1e-9) + require.InDelta(t, 100.0, perf[0].PerLedgerLatencyMs.Max, 1e-9) + + require.Equal(t, "b", perf[1].Profile) + require.InDelta(t, 200.0, perf[1].MsPerLedger, 1e-9) + require.InDelta(t, 200.0, perf[1].PerLedgerLatencyMs.P99, 1e-9) +} + +// --- perf metrics --------------------------------------------------- +// +// When PERF_RESULTS_PATH is set, runIngestPhase writes a JSON report to that +// path summarizing the ingest workload. + +type perfReport struct { + StartedAt string `json:"startedAt"` + FinishedAt string `json:"finishedAt"` + LedgerCount uint32 `json:"ledgerCount"` + // InitialLedgerCount is the DB's pre-corpus ledger count: ingestion cost grows + // with DB size, so runs are only comparable at similar initial sizes. 
+ InitialLedgerCount uint32 `json:"initialLedgerCount"` + + IngestWallClockSec float64 `json:"ingestWallClockSeconds"` // elapsed time from first to last ledger's commit + IngestBusySec float64 `json:"ingestBusySeconds"` // sum of per-ledger ingest durations + LedgersPerSecond float64 `json:"ledgersPerSecond"` + // PerLedgerLatencyMs is the distribution of exact per-ledger ingest durations + // (the daemon's ledger_ingestion_duration_seconds{type=total}), not inter-arrival. + PerLedgerLatencyMs latencyQuantiles `json:"perLedgerLatencyMs"` + Profiles []profilePerf `json:"profiles"` + CaptiveCoreVersion string `json:"captiveCoreVersion"` + + // Markdown-only context read from PERF_* env vars by emitPerfReport; + // excluded from the JSON artifact. + TargetSha string `json:"-"` + RunID string `json:"-"` + Repo string `json:"-"` + GoldenFetchSecs string `json:"-"` +} + +// profilePerf is the per-ledger ingest cost for one bundle's segment. Throughput +// is a system-wide number (backend pacing makes per-segment rate uninformative), +// so it lives only on perfReport, not here. +type profilePerf struct { + Profile string `json:"profile"` + Ledgers uint32 `json:"ledgers"` + MsPerLedger float64 `json:"msPerLedger"` // mean per-ledger ingest duration + PerLedgerLatencyMs latencyQuantiles `json:"perLedgerLatencyMs"` +} + +type latencyQuantiles struct { + P50 float64 `json:"p50"` + P95 float64 `json:"p95"` + P99 float64 `json:"p99"` + Min float64 `json:"min"` + Max float64 `json:"max"` + Mean float64 `json:"mean"` +} + +type perfReportInput struct { + startedAt time.Time + finishedAt time.Time + ledgerCount uint32 + initialLedgers uint32 + durations map[uint32]time.Duration // exact per-ledger ingest time from the hook + elapsed time.Duration // first-to-last commit, for throughput + startSeq uint32 + segments []profileSegment + captiveCoreVersion string +} + +// segmentDurationsMs returns the exact per-ledger ingest durations (ms) for the +// sequences in [lo, hi]. 
+func segmentDurationsMs(durations map[uint32]time.Duration, lo, hi uint32) []float64 { + out := make([]float64, 0, hi-lo+1) + for seq := lo; seq <= hi; seq++ { + if d, ok := durations[seq]; ok { + out = append(out, float64(d.Microseconds())/1000.0) + } + } + return out +} + +// computeProfilePerf slices the per-ledger durations into per-bundle segments (in +// bundle order) and summarizes each segment's ingest cost. +func computeProfilePerf(durations map[uint32]time.Duration, startSeq uint32, segments []profileSegment) []profilePerf { + out := make([]profilePerf, 0, len(segments)) + lo := startSeq + for _, seg := range segments { + hi := lo + seg.ledgers - 1 + out = append(out, summarizeDurations(seg.name, seg.ledgers, segmentDurationsMs(durations, lo, hi))) + lo = hi + 1 + } + return out +} + +// summarizeDurations builds a profilePerf from per-ledger ingest duration samples (ms). +func summarizeDurations(name string, ledgers uint32, samplesMs []float64) profilePerf { + q := computeQuantiles(samplesMs) + return profilePerf{ + Profile: name, + Ledgers: ledgers, + MsPerLedger: q.Mean, + PerLedgerLatencyMs: q, + } +} + +// emitPerfReport writes the perf report as JSON to PERF_RESULTS_PATH and as +// markdown to PERF_RESULTS_MD_PATH; either or both may be unset. 
+func emitPerfReport(t *testing.T, in perfReportInput) { + t.Helper() + jsonPath := os.Getenv("PERF_RESULTS_PATH") + mdPath := os.Getenv("PERF_RESULTS_MD_PATH") + if jsonPath == "" && mdPath == "" { + return + } + + overallSamples := segmentDurationsMs(in.durations, in.startSeq, in.startSeq+in.ledgerCount-1) + var busyMs float64 + for _, s := range overallSamples { + busyMs += s + } + var throughput float64 + if in.elapsed > 0 { + throughput = float64(in.ledgerCount) / in.elapsed.Seconds() + } + sha := os.Getenv("PERF_TARGET_SHA") + sha = sha[:min(len(sha), 7)] + report := perfReport{ + StartedAt: in.startedAt.Format(time.RFC3339), + FinishedAt: in.finishedAt.Format(time.RFC3339), + LedgerCount: in.ledgerCount, + InitialLedgerCount: in.initialLedgers, + IngestWallClockSec: in.elapsed.Seconds(), + IngestBusySec: busyMs / 1000, + LedgersPerSecond: throughput, + PerLedgerLatencyMs: computeQuantiles(overallSamples), + Profiles: computeProfilePerf(in.durations, in.startSeq, in.segments), + CaptiveCoreVersion: in.captiveCoreVersion, + TargetSha: sha, + RunID: os.Getenv("PERF_RUN_ID"), + Repo: os.Getenv("PERF_REPO"), + GoldenFetchSecs: os.Getenv("PERF_GOLDEN_FETCH_SECONDS"), + } + + if jsonPath != "" { + data, err := json.MarshalIndent(report, "", " ") + require.NoError(t, err) + require.NoError(t, os.WriteFile(jsonPath, data, 0o644)) + t.Logf("perf report written to %s", jsonPath) + } + if mdPath != "" { + require.NoError(t, os.WriteFile(mdPath, []byte(renderPerfMarkdown(report)), 0o644)) + t.Logf("perf markdown written to %s", mdPath) + } +} + +// renderPerfMarkdown returns the PR-comment table for an ingest run; missing +// context fields render as empty cells. 
+func renderPerfMarkdown(r perfReport) string { + lines := make([]string, 0, 4+len(r.Profiles)+12) + lines = append(lines, + fmt.Sprintf("### 📈 Ingest load test — `%s`", r.TargetSha), + "", + "| Profile | Ledgers | ms/ledger | p50 / p95 / p99 ms | max ms |", + "|---|---|---|---|---|", + ) + for _, p := range r.Profiles { + lines = append(lines, fmt.Sprintf("| %s | %d | %.3f | %.3f / %.3f / %.3f | %.3f |", + p.Profile, p.Ledgers, p.MsPerLedger, + p.PerLedgerLatencyMs.P50, p.PerLedgerLatencyMs.P95, p.PerLedgerLatencyMs.P99, + p.PerLedgerLatencyMs.Max)) + } + util := 0.0 + if r.IngestWallClockSec > 0 { + util = 100 * r.IngestBusySec / r.IngestWallClockSec + } + lines = append(lines, + "", + "| Metric | Value |", + "|---|---|", + fmt.Sprintf("| Ledgers replayed | %d |", r.LedgerCount), + fmt.Sprintf("| Initial DB ledger count | %d |", r.InitialLedgerCount), + fmt.Sprintf("| Throughput | %.2f ledgers/sec |", r.LedgersPerSecond), + fmt.Sprintf("| Elapsed wall-clock | %.3fs |", r.IngestWallClockSec), + fmt.Sprintf("| Ingest busy-time | %.3fs (%.1f%% utilization) |", r.IngestBusySec, util), + fmt.Sprintf("| Per-ledger p50 / p95 / p99 | %.3f / %.3f / %.3f ms |", + r.PerLedgerLatencyMs.P50, r.PerLedgerLatencyMs.P95, r.PerLedgerLatencyMs.P99), + fmt.Sprintf("| Golden DB fetch+decompress | %ss |", r.GoldenFetchSecs), + fmt.Sprintf("| stellar-core | `%s` |", r.CaptiveCoreVersion), + fmt.Sprintf("| Workflow run | [#%s](https://github.com/%s/actions/runs/%s) |", r.RunID, r.Repo, r.RunID), + "", + ) + return strings.Join(lines, "\n") +} + +func computeQuantiles(samplesMs []float64) latencyQuantiles { + if len(samplesMs) == 0 { + return latencyQuantiles{} + } + sorted := make([]float64, len(samplesMs)) + copy(sorted, samplesMs) + sort.Float64s(sorted) + at := func(p float64) float64 { + idx := int(p * float64(len(sorted)-1)) + return sorted[idx] + } + var sum float64 + for _, v := range sorted { + sum += v + } + return latencyQuantiles{ + P50: at(0.50), + P95: at(0.95), + P99: 
at(0.99), + Min: sorted[0], + Max: sorted[len(sorted)-1], + Mean: sum / float64(len(sorted)), + } +} diff --git a/go.mod b/go.mod index fb6101884..809d68c8d 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,11 @@ go 1.25 require ( github.com/Masterminds/squirrel v1.5.4 + github.com/aws/aws-sdk-go-v2 v1.42.0 + github.com/aws/aws-sdk-go-v2/config v1.31.16 + github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 + github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3 + github.com/aws/smithy-go v1.27.1 github.com/cenkalti/backoff/v4 v4.3.0 github.com/creachadair/jrpc2 v1.3.3 github.com/fsouza/fake-gcs-server v1.49.2 @@ -17,7 +22,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.7.0 github.com/spf13/pflag v1.0.5 - github.com/stellar/go-stellar-sdk v0.6.1-0.20260624185910-fc9acb3f1ba6 + github.com/stellar/go-stellar-sdk v0.6.1-0.20260625225930-6181cdf8bda5 github.com/stretchr/testify v1.11.1 ) @@ -29,25 +34,21 @@ require ( github.com/GoogleCloudPlatform/opentelemetry-operations-go/detectors/gcp v1.30.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric v0.54.0 // indirect github.com/GoogleCloudPlatform/opentelemetry-operations-go/internal/resourcemapping v0.54.0 // indirect - github.com/aws/aws-sdk-go-v2 v1.39.5 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 // indirect - github.com/aws/aws-sdk-go-v2/config v1.31.16 // indirect github.com/aws/aws-sdk-go-v2/credentials v1.18.20 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 // indirect github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 // indirect - github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 // indirect + github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 // indirect + github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect 
github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 // indirect github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3 // indirect - github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12 // indirect + github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 // indirect github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12 // indirect - github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 // indirect github.com/aws/aws-sdk-go-v2/service/sso v1.30.0 // indirect github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4 // indirect github.com/aws/aws-sdk-go-v2/service/sts v1.39.0 // indirect - github.com/aws/smithy-go v1.23.1 // indirect github.com/cncf/xds/go v0.0.0-20251031190108-5cf4b1949528 // indirect github.com/envoyproxy/go-control-plane/envoy v1.35.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect @@ -59,6 +60,7 @@ require ( github.com/pkg/xattr v0.4.9 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect + github.com/xdrpp/goxdr v0.1.1 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/contrib/detectors/gcp v1.38.0 // indirect go.opentelemetry.io/otel/sdk v1.38.0 // indirect @@ -98,7 +100,7 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/jmoiron/sqlx v1.3.5 // indirect - github.com/klauspost/compress v1.18.1 // indirect + github.com/klauspost/compress v1.18.1 github.com/lann/builder v0.0.0-20180802200727-47ae307949d0 // indirect github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0 // indirect github.com/lib/pq v1.10.9 // indirect diff --git a/go.sum b/go.sum index 262c6d611..2c09bdba4 100644 --- a/go.sum +++ b/go.sum @@ -88,8 +88,8 @@ 
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 h1:DklsrG3d github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2/go.mod h1:WaHUgvxTVq04UNunO+XhnAqY/wQc+bxr74GqbsZ/Jqw= github.com/aws/aws-sdk-go v1.55.8 h1:JRmEUbU52aJQZ2AjX4q4Wu7t4uZjOu71uyNmaWlUkJQ= github.com/aws/aws-sdk-go v1.55.8/go.mod h1:ZkViS9AqA6otK+JBBNH2++sx1sgxrPKcSzPPvQkUtXk= -github.com/aws/aws-sdk-go-v2 v1.39.5 h1:e/SXuia3rkFtapghJROrydtQpfQaaUgd1cUvyO1mp2w= -github.com/aws/aws-sdk-go-v2 v1.39.5/go.mod h1:yWSxrnioGUZ4WVv9TgMrNUeLV3PFESn/v+6T/Su8gnM= +github.com/aws/aws-sdk-go-v2 v1.42.0 h1:XvXMJTkFQtpBKIWZnmr9ZEOc2InWM2yldjXEJ/bymhA= +github.com/aws/aws-sdk-go-v2 v1.42.0/go.mod h1:27+ACypSLljLAEKsCYOmrjKh83vuTRkuAe9Uv/3A4bg= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2 h1:t9yYsydLYNBk9cJ73rgPhPWqOh/52fcWDQB5b1JsKSY= github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.7.2/go.mod h1:IusfVNTmiSN3t4rhxWFaBAqn+mcNdwKtPcV16eYdgko= github.com/aws/aws-sdk-go-v2/config v1.31.16 h1:E4Tz+tJiPc7kGnXwIfCyUj6xHJNpENlY11oKpRTgsjc= @@ -100,32 +100,34 @@ github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12 h1:VO3FIM2TDbm0kqp6sFNR0P github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.12/go.mod h1:6C39gB8kg82tx3r72muZSrNhHia9rjGkX7ORaS2GKNE= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2 h1:9/HxDeIgA7DcKK6e6ZaP5PQiXugYbNERx3Z5u30mN+k= github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.20.2/go.mod h1:3N1RoxKNcVHmbOKVMMw8pvMs5TUhGYPQP/aq1zmAWqo= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12 h1:p/9flfXdoAnwJnuW9xHEAFY22R3A6skYkW19JFF9F+8= -github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.12/go.mod h1:ZTLHakoVCTtW8AaLGSwJ3LXqHD9uQKnOcv1TrpO6u2k= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12 h1:2lTWFvRcnWFFLzHWmtddu5MTchc5Oj2OOey++99tPZ0= -github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.12/go.mod h1:hI92pK+ho8HVcWMHKHrK3Uml4pfG7wvL86FzO0LVtQQ= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29 
h1:f3vKqSo13fhTYb+JEcXwXefZQE26I1FB5eTSniU67ko= +github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.29/go.mod h1:MzoLFUArKGpGD+ukmPiTPG1X5x4o6M2kq4v2dr1FiEc= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29 h1:RdwIf/CuUsvJX3RgJagbOyotl/cxoLY4xviKuE7p2GY= +github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.29/go.mod h1:71wt8W2EgswdZy9Mf9KNnzxZ3TiZlv4caKghPktDOkA= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 h1:WKuaxf++XKWlHWu9ECbMlha8WOEGm0OUEZqm4K/Gcfk= github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4/go.mod h1:ZWy7j6v1vWGmPReu0iSGvRiise4YI5SkR3OHKTZ6Wuc= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12 h1:itu4KHu8JK/N6NcLIISlf3LL1LccMqruLUXZ9y7yBZw= github.com/aws/aws-sdk-go-v2/internal/v4a v1.4.12/go.mod h1:i+6vTU3xziikTY3vcox23X8pPGW5X3wVgd1VZ7ha+x8= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2 h1:xtuxji5CS0JknaXoACOunXOYOQzgfTvGAc9s2QdCJA4= -github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.2/go.mod h1:zxwi0DIR0rcRcgdbl7E2MSOvxDyyXGBlScvBkARFaLQ= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12 h1:ZD2+BSw9vFsNlKYIasSNt3uDbjqqXIBcM13UJv/Lx2k= +github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.12/go.mod h1:Ms4zlcVBbXbiP7EVLhl+lgjvA/a7YphqQ3Ih3174EmI= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3 h1:NEe7FaViguRQEm8zl8Ay/kC/QRsMtWUiCGZajQIsLdc= github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.9.3/go.mod h1:JLuCKu5VfiLBBBl/5IzZILU7rxS0koQpHzMOCzycOJU= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12 h1:MM8imH7NZ0ovIVX7D2RxfMDv7Jt9OiUXkcQ+GqywA7M= -github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.12/go.mod h1:gf4OGwdNkbEsb7elw2Sy76odfhwNktWII3WgvQgQQ6w= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29 h1:DRebniUGZ2MqiiIVmQJ04vIXr918hubdHMnarSLEWyU= +github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.29/go.mod h1:LfRkPCD8YHDM2E5eTkos2UpwYeZnBcVarTa8L59bJHA= 
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12 h1:R3uW0iKl8rgNEXNjVGliW/oMEh9fO/LlUEV8RvIFr1I= github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.19.12/go.mod h1:XEttbEr5yqsw8ebi7vlDoGJJjMXRez4/s9pibpJyL5s= github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1 h1:Dq82AV+Qxpno/fG162eAhnD8d48t9S+GZCfz7yv1VeA= github.com/aws/aws-sdk-go-v2/service/s3 v1.89.1/go.mod h1:MbKLznDKpf7PnSonNRUVYZzfP0CeLkRIUexeblgKcU4= +github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3 h1:58LjP8cp8UEHA1LG/JZ4fG9SobHE82kLYe46mogbSI4= +github.com/aws/aws-sdk-go-v2/service/ssm v1.69.3/go.mod h1:16Zd02ocSJp68o4r36MQ4Rikf/Ulv4On5qjMpJJf5Mo= github.com/aws/aws-sdk-go-v2/service/sso v1.30.0 h1:xHXvxst78wBpJFgDW07xllOx0IAzbryrSdM4nMVQ4Dw= github.com/aws/aws-sdk-go-v2/service/sso v1.30.0/go.mod h1:/e8m+AO6HNPPqMyfKRtzZ9+mBF5/x1Wk8QiDva4m07I= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4 h1:tBw2Qhf0kj4ZwtsVpDiVRU3zKLvjvjgIjHMKirxXg8M= github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.4/go.mod h1:Deq4B7sRM6Awq/xyOBlxBdgW8/Z926KYNNaGMW2lrkA= github.com/aws/aws-sdk-go-v2/service/sts v1.39.0 h1:C+BRMnasSYFcgDw8o9H5hzehKzXyAb9GY5v/8bP9DUY= github.com/aws/aws-sdk-go-v2/service/sts v1.39.0/go.mod h1:4EjU+4mIx6+JqKQkruye+CaigV7alL3thVPfDd9VlMs= -github.com/aws/smithy-go v1.23.1 h1:sLvcH6dfAFwGkHLZ7dGiYF7aK6mg4CgKA/iDKjLDt9M= -github.com/aws/smithy-go v1.23.1/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= +github.com/aws/smithy-go v1.27.1 h1:4T340VFndXtADGF52gYa1POyL7s9E4Z1OeZ1hCscIw8= +github.com/aws/smithy-go v1.27.1/go.mod h1:YE2RhdIuDbA5E5bTdciG9KrW3+TiEONeUWCqxX9i1Fc= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8= @@ -429,8 +431,8 @@ github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod 
h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= github.com/spiffe/go-spiffe/v2 v2.6.0 h1:l+DolpxNWYgruGQVV0xsfeya3CsC7m8iBzDnMpsbLuo= github.com/spiffe/go-spiffe/v2 v2.6.0/go.mod h1:gm2SeUoMZEtpnzPNs2Csc0D/gX33k1xIx7lEzqblHEs= -github.com/stellar/go-stellar-sdk v0.6.1-0.20260624185910-fc9acb3f1ba6 h1:NL5m+E+Ln5/HAYenkgyr/kI5xmgGOQ+BMRFktjl1eIw= -github.com/stellar/go-stellar-sdk v0.6.1-0.20260624185910-fc9acb3f1ba6/go.mod h1:IkcqcrE9UQi7n/1y+MxKB+7qzdjG1T2kGOD7Ss8dqjw= +github.com/stellar/go-stellar-sdk v0.6.1-0.20260625225930-6181cdf8bda5 h1:mFC9kqxTmHDNXWI78aPTgcvDADEsJOJSr4SRcEbMTvc= +github.com/stellar/go-stellar-sdk v0.6.1-0.20260625225930-6181cdf8bda5/go.mod h1:IkcqcrE9UQi7n/1y+MxKB+7qzdjG1T2kGOD7Ss8dqjw= github.com/stellar/go-xdr v0.0.0-20260529210834-0bf8f4956364 h1:gOKrfuWdZ92LFlv0TAwgZ7OsWKeBsOMDlGLyFgduI1w= github.com/stellar/go-xdr v0.0.0-20260529210834-0bf8f4956364/go.mod h1:If+U9Z1W5xU97VrOgJandQT+2dN7/iOpkCrxBJEyF80= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=