Skip to content
Draft
41 changes: 41 additions & 0 deletions .github/workflows/backfill-test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Reusable (workflow_call) workflow for the backfill-ingestion benchmark leg.
# It does no work itself: the single `leg` job delegates to ec2-leg.yml with
# backfill-specific inputs and re-exports that job's outputs to the caller.
name: Backfill ingestion (callable)

on:
  workflow_call:
    inputs:
      target_ref:
        description: 'Ref or SHA to benchmark (resolved to a SHA on checkout)'
        required: true
        type: string
    # Re-export the ec2-leg job's outputs so the calling coordinator workflow
    # can read them (e.g. passed / verdict / result_key / bucket).
    outputs:
      passed:
        value: ${{ jobs.leg.outputs.passed }}
      verdict:
        value: ${{ jobs.leg.outputs.verdict }}
      result_key:
        value: ${{ jobs.leg.outputs.result_key }}
      bucket:
        value: ${{ jobs.leg.outputs.bucket }}
      instance_id:
        value: ${{ jobs.leg.outputs.instance_id }}

# id-token: write allows an OIDC token to be requested; presumably ec2-leg.yml
# uses it to assume the AWS role whose session length is set via
# role_duration_seconds below -- confirm against ec2-leg.yml.
permissions:
  id-token: write
  contents: read

jobs:
  leg:
    uses: ./.github/workflows/ec2-leg.yml
    secrets: inherit
    with:
      target_ref: ${{ inputs.target_ref }}
      run_label: backfill
      leg_script: cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/run-backfill.sh
      runner_dir: ./cmd/stellar-rpc/internal/integrationtest/infrastructure/perf-eval/backfill-test/runner
      # These limits cascade under the role's 8h MaxSessionDuration, ordered as
      # deadline < poll < job < session < self-terminate:
      #   BACKFILL_DEADLINE (7h30m) < results_timeout (7h50m)
      #   < job_timeout_minutes (7h55m) < role_duration_seconds (8h)
      #   < self_terminate_minutes (9h)
      results_timeout: 28200  # 7h50m S3 poll ceiling
      job_timeout_minutes: 475  # 7h55m (only effective on self-hosted, hosted runners hard-cap jobs at 6h)
      role_duration_seconds: 28800  # 8h OIDC session == the role's MaxSessionDuration
      self_terminate_minutes: 540  # 9h box backstop
      extra_env: 'export HISTORY_RETENTION_WINDOW=120960 BACKFILL_DEADLINE=7h30m'
32 changes: 23 additions & 9 deletions .github/workflows/load-test-coordinator.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ on:
push:
# temporary scaffolding: before merge to main, replace with line
# branches: [release/**]
branches: [release/**, gha-coordinator]
branches: [release/**, gha-coordinator, backfill-test]
workflow_dispatch:
inputs:
target_ref:
Expand Down Expand Up @@ -52,7 +52,7 @@ jobs:
run: |
SHA="$(git rev-parse HEAD)"
PR="$(gh pr list --repo "${{ github.repository }}" --state open \
--base main --head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)"
--head "$REF" --json number --jq '.[0].number // ""' 2>/dev/null || true)"
{
echo "target_sha=$SHA"
echo "pr_number=$PR"
Expand All @@ -69,9 +69,17 @@ jobs:
with:
target_ref: ${{ needs.plan.outputs.target_sha }}

backfill:
name: Backfill ingestion
needs: plan
uses: ./.github/workflows/backfill-test.yml
secrets: inherit
with:
target_ref: ${{ needs.plan.outputs.target_sha }}

report:
name: Aggregate + report
needs: [plan, load-test]
needs: [plan, load-test, backfill]
if: always()
runs-on: ubuntu-latest
timeout-minutes: 10
Expand All @@ -87,10 +95,8 @@ jobs:
role-to-assume: ${{ secrets.AWS_GHA_ROLE_ARN }}
aws-region: us-east-1

# coordinator-runner does the rendering + history fold (it fetches each leg's
# S3 result and emits the new comment body, under unit test). The gh api glue
# -- reading the existing comment and posting/editing -- stays here. Tolerant:
# a posting hiccup must not pre-empt the gate below.
# coordinator-runner fetches each leg's S3 result and emits the new comment body;
# this step is the gh API glue that posts that body as the sticky report comment.
- name: Render + post sticky report
env:
GH_TOKEN: ${{ github.token }}
Expand All @@ -100,7 +106,9 @@ jobs:
TARGET_SHA: ${{ needs.plan.outputs.target_sha }}
TARGET_REF: ${{ inputs.target_ref || github.ref_name }}
RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}
LEGS: '[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"}]'
LEGS: >-
[{"label":"Apply-load ingestion","bucket":"${{ needs.load-test.outputs.bucket }}","key":"${{ needs.load-test.outputs.result_key }}","passed":"${{ needs.load-test.outputs.passed }}","verdict":"${{ needs.load-test.outputs.verdict }}"},
{"label":"Backfill ingestion","bucket":"${{ needs.backfill.outputs.bucket }}","key":"${{ needs.backfill.outputs.result_key }}","passed":"${{ needs.backfill.outputs.passed }}","verdict":"${{ needs.backfill.outputs.verdict }}"}]
run: |
set -uo pipefail
MARKER='<!-- load-test-results -->'
Expand Down Expand Up @@ -137,8 +145,14 @@ jobs:
- name: Set overall status
if: always()
run: |
rc=0
if [ "${{ needs.load-test.outputs.passed }}" != "true" ]; then
echo "::error::Apply-load ingestion leg did not pass (result=${{ needs.load-test.result }}, verdict=${{ needs.load-test.outputs.verdict }})"
exit 1
rc=1
fi
if [ "${{ needs.backfill.outputs.passed }}" != "true" ]; then
echo "::error::Backfill ingestion leg did not pass (result=${{ needs.backfill.result }}, verdict=${{ needs.backfill.outputs.verdict }})"
rc=1
fi
if [ "$rc" -ne 0 ]; then exit 1; fi
echo "all load-test legs passed"
30 changes: 30 additions & 0 deletions cmd/stellar-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,6 +205,24 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
var ingestCfg ingest.Config
daemon.ingestService, ingestCfg = createIngestService(cfg, logger, daemon, feewindows, historyArchive, rw)
if cfg.Backfill {
// Bulk-load with backfill-tuned SQLite pragmas via a separate session that is
// swapped in for the backfill phase and closed once it completes.
tunedSession, err := db.OpenSQLiteBackfillSession(cfg.SQLiteDBPath)
if err != nil {
logger.WithError(err).Fatal("failed to open backfill-tuned database session")
}
servingSession := daemon.db.UseSession(tunedSession)

// On a fresh DB, drop the events secondary indexes so the bulk-load avoids
// random B-tree inserts; EnsureEventIndexes rebuilds them below.
if _, err := db.NewLedgerReader(daemon.db).GetLedgerRange(context.Background()); errors.Is(err, db.ErrEmptyDB) {
if err := db.DropEventIndexes(context.Background(), daemon.db, logger); err != nil {
logger.WithError(err).Fatal("failed to drop events indexes for backfill")
}
} else if err != nil {
logger.WithError(err).Fatal("failed to check database emptiness for backfill")
}

backfillMeta, err := ingest.NewBackfillMeta(
logger,
daemon.ingestService,
Expand All @@ -218,10 +236,22 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
if err := backfillMeta.RunBackfill(cfg); err != nil {
logger.WithError(err).Fatal("failed to backfill ledgers")
}

// Restore the serving session and release the tuned session's resources.
daemon.db.UseSession(servingSession)
if err := tunedSession.Close(); err != nil {
logger.WithError(err).Warn("could not close backfill-tuned database session")
}
// Clear the DB cache and fee windows so they re-populate from the database
daemon.db.ResetCache()
feewindows.Reset()
}
// Rebuild any events indexes dropped for a backfill bulk-load (also covers
// crashed-backfill restarts). Must finish before ingestService.Start: SQLite
// is single-writer and a long CREATE INDEX would starve live commits.
if err := db.EnsureEventIndexes(context.Background(), daemon.db, cfg.SQLiteDBPath, logger); err != nil {
logger.WithError(err).Fatal("failed to ensure events indexes")
}
// Start ingestion service only after backfill is complete
daemon.ingestService.Start(ingestCfg)

Expand Down
137 changes: 131 additions & 6 deletions cmd/stellar-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,13 +78,28 @@ func (d *DB) ResetCache() {
d.cache.firstLedgerCloseTime = 0
}

// Serving DSN pragmas:
//  1. Use Write-Ahead Logging (WAL).
//  2. Disable WAL auto-checkpointing (we do the checkpointing ourselves with
//     wal_checkpoint pragmas after every write transaction).
//  3. Use synchronous=NORMAL, which is faster and still safe in WAL mode.
const serveSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL"

// Backfill DSN pragmas. Same WAL and auto-checkpoint settings as serving, with
// two differences:
//  1. Use synchronous=OFF (safe since backfill is restartable and gap-checked).
//  2. Use a 1 GiB page cache for hot interior B-tree pages; anything larger
//     displaces the OS page cache and degrades performance late in the run.
const backfillSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=OFF" +
	"&_cache_size=-1048576" // 1 GiB page cache (negative value = KiB)

// Index-build DSN pragmas. Same WAL, auto-checkpoint and synchronous settings
// as serving, plus a large page cache that raises the CREATE INDEX sorter's
// in-memory budget (this session runs alone, on a single connection).
// temp_store is deliberately left at its default so that multi-GB sorts spill
// to disk instead of being held in RAM.
const indexBuildSQLitePragmas = "_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL" +
	"&_cache_size=-2097152" // 2 GiB page cache (negative value = KiB)

func openSQLiteDB(dbFilePath string) (*db.Session, error) {
// 1. Use Write-Ahead Logging (WAL).
// 2. Disable WAL auto-checkpointing (we will do the checkpointing ourselves with wal_checkpoint pragmas
// after every write transaction).
// 3. Use synchronous=NORMAL, which is faster and still safe in WAL mode.
session, err := db.Open("sqlite3",
fmt.Sprintf("file:%s?_journal_mode=WAL&_wal_autocheckpoint=0&_synchronous=NORMAL", dbFilePath))
session, err := db.Open("sqlite3", fmt.Sprintf("file:%s?%s", dbFilePath, serveSQLitePragmas))
if err != nil {
return nil, fmt.Errorf("open failed: %w", err)
}
Expand All @@ -96,6 +111,116 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) {
return session, nil
}

// OpenSQLiteBackfillSession opens an additional session on the SQLite file at
// dbFilePath, configured with backfillSQLitePragmas. It only opens the
// connection: no schema setup is performed here, so the schema must already
// exist. The intended use is to install the returned session on the *DB with
// UseSession for the duration of a backfill and then restore the serving
// session.
//
// The returned error wraps the underlying open failure.
func OpenSQLiteBackfillSession(dbFilePath string) (db.SessionInterface, error) {
	dsn := fmt.Sprintf("file:%s?%s", dbFilePath, backfillSQLitePragmas)
	tuned, openErr := db.Open("sqlite3", dsn)
	if openErr != nil {
		return nil, fmt.Errorf("open backfill session failed: %w", openErr)
	}
	return tuned, nil
}

// UseSession installs s as the session backing d and returns the session it
// replaced, so the caller can reinstate it later. Backfill uses this to run
// with backfill-specific SQLite tuning and afterwards restore the
// metrics-wrapped serving session untouched.
//
// No locking is performed: call it only while nothing else is using d.
func (d *DB) UseSession(s db.SessionInterface) db.SessionInterface {
	var previous db.SessionInterface
	previous, d.SessionInterface = d.SessionInterface, s
	return previous
}

// indexDDL pairs an index name with the statement that creates it.
type indexDDL struct {
	// name is the index identifier; it is used in DROP INDEX statements and
	// looked up in sqlite_master to detect whether the index exists.
	name string
	// ddl is the CREATE INDEX statement executed to (re)build the index.
	ddl string
}

// eventIndexes lists the secondary indexes on the events table that are
// dropped before a fresh-DB backfill and rebuilt once it finishes. Each DDL
// string must match 06_topic_indices.sql exactly, so that a rebuilt schema is
// identical to one produced by the migration.
//
// NOTE for migration authors: startup migrations run before
// EnsureEventIndexes, so any future migration that touches these indexes must
// tolerate their absence (e.g. DROP INDEX IF EXISTS).
//
//nolint:gochecknoglobals // effectively-constant DDL lookup
var eventIndexes = []indexDDL{
	{
		name: "idx_id_contract_id",
		ddl:  "CREATE INDEX idx_id_contract_id ON events (contract_id, id)",
	},
	{
		name: "idx_id_topic1",
		ddl:  "CREATE INDEX idx_id_topic1 ON events (topic1, id)",
	},
}

// DropEventIndexes removes every index listed in eventIndexes from the
// database behind session, so that a fresh-DB backfill bulk-load avoids random
// B-tree inserts. EnsureEventIndexes rebuilds them afterwards.
//
// Each drop uses DROP INDEX IF EXISTS, so an index that is already absent is
// not an error. The first failing statement aborts the loop and its error is
// returned wrapped with the index name; indexes dropped before the failure
// stay dropped.
func DropEventIndexes(ctx context.Context, session db.SessionInterface, logger *log.Entry) error {
	for i := range eventIndexes {
		name := eventIndexes[i].name
		statement := "DROP INDEX IF EXISTS " + name
		_, err := session.ExecRaw(ctx, statement)
		if err != nil {
			return fmt.Errorf("could not drop index %s: %w", name, err)
		}
		logger.Infof("Dropped events index %s for backfill bulk-load", name)
	}
	return nil
}

// EnsureEventIndexes creates whichever entries of eventIndexes are currently
// absent from the database (for example after DropEventIndexes) and returns
// immediately when none are missing.
//
// Existence is checked through d. The builds themselves run on a dedicated
// session opened on dbFilePath with indexBuildSQLitePragmas, which is closed
// before returning. After each index is built the WAL is checkpointed with
// TRUNCATE.
//
// This must complete before live ingestion starts: SQLite is single-writer,
// and a long CREATE INDEX would starve live commits past their busy timeout.
func EnsureEventIndexes(ctx context.Context, d *DB, dbFilePath string, logger *log.Entry) error {
	pending, err := missingEventIndexes(ctx, d)
	if err != nil {
		return err
	}
	if len(pending) == 0 {
		return nil
	}

	dsn := fmt.Sprintf("file:%s?%s", dbFilePath, indexBuildSQLitePragmas)
	builder, err := db.Open("sqlite3", dsn)
	if err != nil {
		return fmt.Errorf("open index build session failed: %w", err)
	}
	defer func() {
		if closeErr := builder.Close(); closeErr != nil {
			logger.WithError(closeErr).Warn("could not close index build session")
		}
	}()

	// Pin the pool to one connection so the threads pragma set here is in
	// effect on the connection that later runs CREATE INDEX.
	builder.DB.SetMaxOpenConns(1)
	if _, err := builder.ExecRaw(ctx, "PRAGMA threads=4"); err != nil {
		return fmt.Errorf("could not enable multithreaded sorter: %w", err)
	}

	for _, idx := range pending {
		logger.Infof("Building events index %s (may take minutes, no progress output)", idx.name)
		began := time.Now()

		if _, err := builder.ExecRaw(ctx, idx.ddl); err != nil {
			return fmt.Errorf("could not build index %s: %w", idx.name, err)
		}
		// Checkpoint right away so the WAL growth caused by the index build is
		// not left for the first live commit to pay for.
		if _, err := builder.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
			return fmt.Errorf("could not checkpoint after building index %s: %w", idx.name, err)
		}

		elapsed := time.Since(began).String()
		logger.WithField("duration", elapsed).Infof("Built events index %s", idx.name)
	}
	return nil
}

// missingEventIndexes returns, in eventIndexes order, the entries whose index
// name has no matching row of type 'index' in sqlite_master. The result is nil
// when every index exists. A failed lookup aborts the scan and is returned
// wrapped with the name of the index being checked.
func missingEventIndexes(ctx context.Context, d *DB) ([]indexDDL, error) {
	const existsQuery = "SELECT COUNT(*) FROM sqlite_master WHERE type = 'index' AND name = ?"

	var absent []indexDDL
	for _, candidate := range eventIndexes {
		var matches int
		if err := d.GetRaw(ctx, &matches, existsQuery, candidate.name); err != nil {
			return nil, fmt.Errorf("could not check index %s: %w", candidate.name, err)
		}
		if matches != 0 {
			continue
		}
		absent = append(absent, candidate)
	}
	return absent, nil
}

func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub db.Subservice,
registry *prometheus.Registry,
) (*DB, error) {
Expand Down
7 changes: 3 additions & 4 deletions cmd/stellar-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,10 +211,9 @@ func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error {
}

// Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER
// limit (32,767 by default). With 10 bind variables per event, we cap
// each INSERT at 1000 events (10,000 bind variables) to stay well
// within the limit.
const maxEventsPerBatch = 1000
// limit (32,766 by default). 10 bind variables/event * 3,000 events =
// 30,000 bind variables < 32,766 limit.
const maxEventsPerBatch = 3000

for batchStart := 0; batchStart < len(insertableEvents); batchStart += maxEventsPerBatch {
batchEnd := min(batchStart+maxEventsPerBatch, len(insertableEvents))
Expand Down
7 changes: 3 additions & 4 deletions cmd/stellar-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,9 @@ func (txn *transactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error
}

// Batch inserts to avoid exceeding SQLite's SQLITE_MAX_VARIABLE_NUMBER
// limit (32,766 by default). With 3 bind variables per transaction, we
// cap each INSERT at 3,000 rows (9,000 bind variables) to stay well
// within the limit.
const maxRowsPerBatch = 3000
// limit (32,766 by default). 3 bind variables/tx * 10,000 tx rows = 30,000
// bind variables < 32,766 limit.
const maxRowsPerBatch = 10000
rowsInBatch := 0
query := sq.Insert(transactionTableName).
Columns("hash", "ledger_sequence", "application_order")
Expand Down
Loading
Loading