diff --git a/.gitignore b/.gitignore index 3978ca2bd..bd24138b6 100644 --- a/.gitignore +++ b/.gitignore @@ -3,3 +3,6 @@ captive-core/ .soroban/ !test.toml *.sqlite* + +# Old design-docs-og reference files (local only) +full-history/design-docs/_ref-old-* diff --git a/full-history/design-docs/01-backfill-workflow.md b/full-history/design-docs/01-backfill-workflow.md new file mode 100644 index 000000000..e09385a1c --- /dev/null +++ b/full-history/design-docs/01-backfill-workflow.md @@ -0,0 +1,588 @@ +# Backfill Workflow + +## Overview + +Backfill is the RPC service's historical-ingestion subroutine — it pulls ledgers from a configured remote object store (GCS or S3) and writes them as **immutable, query-ready artifacts** on local disk. +It runs once per service start, as part of Phase 1 (catchup), to close the gap between on-disk state and the current network tip before live ingestion takes over. Interruption at any point leaves recoverable state; on restart, already-complete work is skipped. + +**What it produces:** + +Three immutable artifact types, one per full-history RPC query, scoped to **chunks** (10_000-ledger blocks) and **tx indexes** (consecutive chunks, default 1_000 = 10_000_000 ledgers each — see [Geometry](#geometry)). + +| Immutable output | Query it enables | Scope | +|-----------------|-----------------|-------| +| Ledger [pack file](../../design-docs/packfile-library.md) | `getLedger` | Per chunk (10_000 ledgers) | +| Tx-index files | `getTransaction` | Per tx index (default 10_000_000 ledgers) | +| [Events cold segment](../../design-docs/getevents-full-history-design.md) | `getEvents` | Per chunk | + +**How it does it:** + +- Backfill is a subroutine invoked by the RPC service's **Phase 1 (catchup)** — see [02-streaming-workflow.md — Phase 1](./02-streaming-workflow.md#phase-1--catchup). Internal to the service; no `full-history-backfill` subcommand, no per-run flags. 
+- Ledger source is **BSB** (Buffered Storage Backend) — a remote object-store reader for `LedgerCloseMeta`, configured under `[BSB]` in the TOML config. Interface details in [02-streaming-workflow.md — Ledger Source](./02-streaming-workflow.md#ledger-source). +- Ingests historical ledgers one chunk at a time. Each chunk uses its own BSB reader scoped to that chunk's 10_000 ledgers; no shared source state across chunks. +- Writes directly to immutable file formats — no RocksDB active stores (mutable RocksDB instances holding in-flight live-ingestion data; streaming's concern, see [02-streaming-workflow.md — Active Store Architecture](./02-streaming-workflow.md#active-store-architecture)). +- Tracks per-chunk and per-tx-index completion in a small **meta store** — a dedicated RocksDB with WAL always on, separate from streaming's active stores. Each flag is written after its artifact's `fsync`, and flag presence drives all resume decisions. +- Schedules work as a DAG of idempotent tasks dispatched via a bounded worker pool. +- Returns when every chunk in the range is complete; on crash, Phase 1 (catchup) re-invokes with the same range and already-complete chunks are skipped via per-chunk idempotency primitives. +- **BSB-only.** + - Backfill does not use captive core (the embedded `stellar-core` subprocess that the service runs during live ingestion) as a ledger source. + - Captive core belongs to Phase 4 (live ingestion). + - If BSB isn't configured, backfill is not invoked at all and Phase 4 (live ingestion)'s captive core catches up from a leapfrogged resume ledger — a start ledger chosen forward of genesis so ingestion stays within the retention window — as part of normal startup. See [02-streaming-workflow.md — Phase 4](./02-streaming-workflow.md#phase-4--live-ingestion) and [Ledger Source](./02-streaming-workflow.md#ledger-source). 
+ +For the distinction between **backfill (this subroutine)** and **Phase 1 (catchup) (the startup phase that invokes backfill)** (two terms that get conflated because their scopes overlap), see [02-streaming-workflow.md — Backfill vs Phase 1 (catchup)](./02-streaming-workflow.md#backfill-vs-phase-1-catchup). + +--- + +## Geometry + +Stellar's first ledger is `GENESIS_LEDGER = 2`. Mapping functions subtract it to zero-base the `ledger_seq ↔ chunk_id` axis. + +```python +GENESIS_LEDGER = 2 +LEDGERS_PER_CHUNK = 10_000 # hardcoded; not configurable +CHUNKS_PER_TX_INDEX = 1_000 # read from config, immutable after first run. Accepted values: 1, 10, 100, 1_000; default 1_000 +LEDGERS_PER_TX_INDEX = CHUNKS_PER_TX_INDEX * LEDGERS_PER_CHUNK # at cpi=1_000 this is 10_000_000 +``` + +- In pseudocode, `cpi` in inline comments is shorthand for `CHUNKS_PER_TX_INDEX`. +- All IDs use uniform `%08d` zero-padding (supports up to `99_999_999`). + +--- + +## Configuration +Backfill reads the subset of the unified TOML config described below. +_Service-level keys, used by the streaming flow, are specified in [02-streaming-workflow.md — Configuration](./02-streaming-workflow.md#configuration)._ + +### TOML Config + +**[SERVICE]** + +| Key | Type | Default | Description | +|--------------------------------------|------|---------|-------------| +| `DEFAULT_DATA_DIR` | string | **required** | Base directory for meta store and default storage paths. | +| `CHUNKS_PER_TX_INDEX` (optional) | int | `1000` | Chunks per tx index. Defines data layout; stored in the meta store on first run and fatal if changed on any subsequent run. | + +**[IMMUTABLE_STORAGE.LEDGERS]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `PATH` | string | `{DEFAULT_DATA_DIR}/ledgers` | Base path for ledger pack files. 
| + +**[IMMUTABLE_STORAGE.EVENTS]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `PATH` | string | `{DEFAULT_DATA_DIR}/events` | Base path for events cold segments. | + +**[IMMUTABLE_STORAGE.TXHASH_RAW]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `PATH` | string | `{DEFAULT_DATA_DIR}/txhash/raw` | Base path for raw txhash `.bin` files (transient). | + +**[IMMUTABLE_STORAGE.TXHASH_INDEX]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `PATH` | string | `{DEFAULT_DATA_DIR}/txhash/index` | Base path for RecSplit (minimal-perfect-hash index library) `.idx` files (permanent). | + +The `IMMUTABLE_STORAGE` prefix disambiguates from `ACTIVE_STORAGE` (RocksDB-backed mutable stores owned by the streaming workflow). + +**[BSB]** — Buffered Storage Backend (optional at the service level; required when [Phase 1 (catchup)](./02-streaming-workflow.md#phase-1--catchup) selects `BSBSource`) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `BUCKET_PATH` | string | **required** | Remote object store path to fetch LedgerCloseMeta (without `gs://` prefix for GCS). | +| `BUFFER_SIZE` | int | `1000` | Prefetch buffer depth per connection. | +| `NUM_WORKERS` | int | `20` | Download workers per connection. | + +- `[BSB]` is effectively required when backfill runs. If absent, Phase 1 (catchup) does not invoke `run_backfill` at all — Phase 4's captive core handles initial catchup instead (see [02-streaming-workflow.md — Ledger Source](./02-streaming-workflow.md#ledger-source)). + +**[LOGGING]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `LEVEL` | string | `"info"` | Minimum log severity. Accepted values: `debug` / `info` / `warn` / `error`. Service CLI flag `--log-level` wins when both are set. | +| `FORMAT` | string | `"text"` | Log output format. 
Accepted values: `text` / `json`. Service CLI flag `--log-format` wins when both are set. | + +**[META_STORE]** (optional) + +| Key | Type | Default | Description | +|-----|------|---------|-------------| +| `PATH` | string | `{DEFAULT_DATA_DIR}/meta/rocksdb` | Meta store RocksDB directory. | + +### Example TOML + +```toml +[SERVICE] +DEFAULT_DATA_DIR = "/data/stellar-rpc" +CHUNKS_PER_TX_INDEX = 1000 + +[IMMUTABLE_STORAGE.LEDGERS] +PATH = "/mnt/nvme/ledgers" + +[IMMUTABLE_STORAGE.EVENTS] +PATH = "/mnt/nvme/events" + +[IMMUTABLE_STORAGE.TXHASH_RAW] +PATH = "/mnt/nvme/txhash/raw" + +[IMMUTABLE_STORAGE.TXHASH_INDEX] +PATH = "/mnt/nvme/txhash/index" + +[BSB] +BUCKET_PATH = "sdf-ledger-close-meta/v1/ledgers/pubnet" + +[LOGGING] +LEVEL = "info" +FORMAT = "text" +``` + +The TOML above is consumed by the RPC service entry point (`stellar-rpc --config ...`); backfill is invoked internally by [Phase 1 (catchup)](./02-streaming-workflow.md#phase-1--catchup) with the chunk range and source it computed. + +--- + +## Directory Structure + +With geometry and storage paths (`IMMUTABLE_STORAGE.*`) defined above, here is how they map to the filesystem. + +- Each data type has its own directory tree rooted at its `IMMUTABLE_STORAGE.*.PATH`. +- Chunk-level files (ledgers, events, raw txhash) are grouped into subdirectories (`bucket`) of 1_000 chunks: + - `bucket_id = chunk_id // 1000` (hardcoded, not configurable), formatted as `%05d`. + - `bucket_id` is purely a filesystem concern — it does not appear in meta store keys, DAG dependencies, or config. +- Tx-index output is the only structure that uses `tx_index_id` instead of `bucket_id`. +- Directories are created on-demand via `os.MkdirAll` (safe for concurrent writes). 
+ +``` +{DEFAULT_DATA_DIR}/ +├── meta/ +│ └── rocksdb/ ← Meta store (WAL = Write-Ahead Log; always enabled) +│ +├── ledgers/ ← IMMUTABLE_STORAGE.LEDGERS.PATH +│ ├── 00000/ ← chunk_ids 0–999 (1_000 .pack files) +│ │ ├── 00000000.pack ← ledger pack file for chunk_id=0 (ledgers 2–10_001) +│ │ ├── 00000001.pack +│ │ └── ... +│ ├── 00001/ ← chunk_ids 1_000–1_999 +│ │ └── ... +│ └── .../ +│ +├── events/ ← IMMUTABLE_STORAGE.EVENTS.PATH +│ ├── 00000/ ← chunk_ids 0–999 (3_000 files: 3 per chunk) +│ │ ├── 00000000-events.pack ← compressed event blocks +│ │ ├── 00000000-index.pack ← serialized roaring bitmaps +│ │ ├── 00000000-index.hash ← MPHF (Minimal Perfect Hash Function) for term → slot lookup +│ │ └── ... +│ └── .../ +│ +└── txhash/ + ├── raw/ ← IMMUTABLE_STORAGE.TXHASH_RAW.PATH + │ ├── 00000/ ← chunk_ids 0–999 (1_000 .bin files) + │ │ ├── 00000000.bin ← TRANSIENT — deleted after RecSplit or by Phase 2 (.bin hydration) + │ │ └── ... + │ └── .../ + └── index/ ← IMMUTABLE_STORAGE.TXHASH_INDEX.PATH + ├── 00000000/ ← tx_index_id=0 (16 RecSplit CF files) + │ └── cf-{0-f}.idx ← PERMANENT + └── .../ +``` + +`CHUNKS_PER_TX_INDEX` only affects `txhash/index/` — all other trees use the hardcoded 1_000-chunk `bucket_id` grouping regardless. 
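
The geometry and directory grouping above can be sketched as a few pure functions. This is a minimal illustration, not the service's implementation: the function bodies are derived from the stated constants (`GENESIS_LEDGER = 2`, 10_000 ledgers per chunk, 1_000 chunks per bucket directory), `CHUNKS_PER_BUCKET`, `chunk_id_of_ledger`, and `recsplit_index_dir` are hypothetical names, and the explicit `base` argument stands in for the configured `IMMUTABLE_STORAGE.*.PATH`.

```python
from pathlib import PurePosixPath

GENESIS_LEDGER = 2
LEDGERS_PER_CHUNK = 10_000
CHUNKS_PER_BUCKET = 1_000  # hypothetical name for the hardcoded directory grouping


def chunk_id_of_ledger(ledger_seq: int) -> int:
    # Subtract genesis so ledger 2 maps to chunk 0.
    return (ledger_seq - GENESIS_LEDGER) // LEDGERS_PER_CHUNK


def first_ledger_in_chunk(chunk_id: int) -> int:
    return chunk_id * LEDGERS_PER_CHUNK + GENESIS_LEDGER


def last_ledger_in_chunk(chunk_id: int) -> int:
    return first_ledger_in_chunk(chunk_id) + LEDGERS_PER_CHUNK - 1


def tx_index_id_of_chunk(chunk_id: int, cpi: int = 1_000) -> int:
    # cpi = CHUNKS_PER_TX_INDEX; only the txhash/index tree depends on it.
    return chunk_id // cpi


def ledger_pack_path(base: str, chunk_id: int) -> str:
    # bucket_id is a filesystem-only grouping: %05d directory, %08d file name.
    bucket_id = chunk_id // CHUNKS_PER_BUCKET
    return str(PurePosixPath(base) / f"{bucket_id:05d}" / f"{chunk_id:08d}.pack")


def recsplit_index_dir(base: str, tx_index_id: int) -> str:
    # Tx-index output is keyed by tx_index_id, not bucket_id.
    return str(PurePosixPath(base) / f"{tx_index_id:08d}")
```

For example, `ledger_pack_path("ledgers", 0)` gives `ledgers/00000/00000000.pack`, covering ledgers `first_ledger_in_chunk(0) = 2` through `last_ledger_in_chunk(0) = 10_001`, which matches the tree above.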
+ +Directory-count tradeoffs for a 2_000-chunk (20M-ledger) dataset: + +| `CHUNKS_PER_TX_INDEX` | Tx-index dirs | Tradeoff | +|---------------------------|---------------|----------| +| `1000` (default) | `2_000 / 1_000 = 2` | Fewer dirs, larger indexes — longer build time per index, fewer files to search at query time | +| `100` | `2_000 / 100 = 20` | More dirs, smaller indexes — faster build time per index, more files to search at query time | +| `1` | `2_000 / 1 = 2_000` | One index per chunk — fastest build, most files to search | + +### Path Conventions + +| File Type | Pattern | Example | +|-----------|---------|---------| +| Ledger pack | `{IMMUTABLE_STORAGE.LEDGERS.PATH}/{bucket_id:05d}/{chunk_id:08d}.pack` | `ledgers/00000/00000042.pack` | +| Raw txhash | `{IMMUTABLE_STORAGE.TXHASH_RAW.PATH}/{bucket_id:05d}/{chunk_id:08d}.bin` | `txhash/raw/00000/00000042.bin` | +| RecSplit CF | `{IMMUTABLE_STORAGE.TXHASH_INDEX.PATH}/{tx_index_id:08d}/cf-{nibble}.idx` | `txhash/index/00000000/cf-a.idx` | +| Events data | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucket_id:05d}/{chunk_id:08d}-events.pack` | `events/00000/00000042-events.pack` | +| Events index | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucket_id:05d}/{chunk_id:08d}-index.pack` | `events/00000/00000042-index.pack` | +| Events hash | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucket_id:05d}/{chunk_id:08d}-index.hash` | `events/00000/00000042-index.hash` | + +- **Nibble** = high 4 bits of `txhash[0]`, i.e., `txhash[0] >> 4`. Values `0`–`f`. Determines which of 16 CFs a txhash is routed to. +- **Raw txhash format**: 36 bytes per entry, no header: `[txhash: 32 bytes][ledger_seq: 4 bytes big-endian]`. +- **Events cold segment**: see [getEvents full-history design](../../design-docs/getevents-full-history-design.md) for the full format. + +--- + +## Meta Store Keys + +*This section is a reference for the key schema and lifecycle. 
It reads more naturally after [How Backfill Runs](#how-backfill-runs) below, which defines the tasks that write and consume these keys.* + +- Single RocksDB instance with WAL (Write-Ahead Log) always enabled. +- Authoritative for everything backfill decides: which chunks and tx indexes are done (progress tracker), which config values can't change across runs (e.g., `CHUNKS_PER_TX_INDEX`, stored on first run and fatal if changed), and where to resume after a crash (every resume decision derives from key presence). + +### Key Schema + +All IDs use uniform `%08d` zero-padding, matching the directory structure. + +| Key Pattern | Value | Written When | +|-------------|-------|-------------| +| `chunk:{chunk_id:08d}:lfs` | `"1"` | After ledger `.pack` file is fsynced | +| `chunk:{chunk_id:08d}:txhash` | `"1"` | After raw txhash `.bin` file is fsynced | +| `chunk:{chunk_id:08d}:events` | `"1"` | After events cold segment files (`events.pack`, `index.pack`, `index.hash`) are fsynced | +| `index:{tx_index_id:08d}:txhash` | `"1"` | After all 16 RecSplit CF `.idx` files are built and fsynced | + +- Values are `"1"` (retained for `ldb`/`sst_dump` readability); key presence is the signal. +- Key absence means not started or incomplete — treated identically on resume. +- Each chunk flag is written independently after its output's fsync — a crash may leave some flags set and others absent for the same chunk. +- On resume, each chunk's flags are checked independently — only missing outputs are produced. +- WAL is always enabled — disabling it would invalidate all crash recovery. 
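
A minimal sketch of the flag-after-fsync rule and the per-flag resume check described above. It assumes a plain `dict` as a stand-in for the meta store (the real store is RocksDB with WAL, which is what makes the flag write itself durable); `chunk_key`, `missing_outputs`, and `write_then_flag` are hypothetical helper names, and parent-directory fsync is left out for brevity.

```python
import os

CHUNK_OUTPUTS = ("lfs", "txhash", "events")


def chunk_key(chunk_id: int, output: str) -> str:
    # Same %08d padding as the key schema table.
    return f"chunk:{chunk_id:08d}:{output}"


def missing_outputs(meta_store: dict, chunk_id: int) -> list:
    # Key absence means "not started or incomplete"; both are handled the same way.
    return [o for o in CHUNK_OUTPUTS if chunk_key(chunk_id, o) not in meta_store]


def write_then_flag(meta_store: dict, path: str, payload: bytes, key: str) -> None:
    # "wb" truncates any partial file left by a crashed attempt.
    with open(path, "wb") as f:
        f.write(payload)
        f.flush()
        os.fsync(f.fileno())  # data is durable before the flag exists
    # Flag is written last: a crash before this line leaves the key absent,
    # so the output is simply produced again on resume.
    meta_store[key] = "1"
```

On resume, a task would call `missing_outputs(meta_store, chunk_id)` and produce only what the returned list names.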
+ +**Examples:** +``` +chunk:00000000:lfs → "1" chunk_id=0 ledger pack done +chunk:00000000:txhash → "1" chunk_id=0 raw txhash done +chunk:00000000:events → "1" chunk_id=0 events cold segment done +chunk:00000999:events → "1" last chunk of tx_index_id=0 (at cpi=1_000) +index:00000000:txhash → "1" tx_index_id=0 RecSplit complete +index:00000001:txhash → absent tx_index_id=1 not yet built +``` + +### Key Lifecycle + +``` +chunk ingestion → sets chunk:{chunk_id:08d}:lfs, chunk:{chunk_id:08d}:txhash, chunk:{chunk_id:08d}:events + (each independently, after its output's fsync) +tx index build → sets index:{tx_index_id:08d}:txhash +txhash cleanup → deletes chunk:{chunk_id:08d}:txhash keys + raw .bin files +``` + +After a completed tx index: +- `chunk:{chunk_id:08d}:lfs`, `chunk:{chunk_id:08d}:events`, `index:{tx_index_id:08d}:txhash` — permanent within backfill's scope. +- `chunk:{chunk_id:08d}:txhash` keys + raw `.bin` files — deleted after tx index is built. + +--- + +## How Backfill Runs + +Backfill's work is a static DAG. The `run_backfill` orchestrator validates the caller's range, builds the DAG over that range, and dispatches it with a bounded-concurrency worker pool (`MAX_CPU_THREADS`-capped). Each task is idempotent and checks its own completion state — the scheduler is just the dispatcher. 
+ +### Task Types and Dependencies + +The backfill DAG has three task types: + +| Task | Cadence | Dependencies | Produces | +|------|---------|-------------|----------| +| `process_chunk(chunk_id, source)` | Per chunk (10_000 ledgers) | None | Ledger `.pack` + raw txhash `.bin` + events cold segment | +| `build_txhash_index(tx_index_id)` | Per tx index | All `process_chunk` tasks for this tx index | 16 RecSplit `.idx` files | +| `cleanup_txhash(tx_index_id)` | Per tx index | `build_txhash_index` for this tx index | Deletes raw `.bin` files + `chunk:{chunk_id:08d}:txhash` meta keys | + +- Each task is a black box to the DAG scheduler — it calls `execute()` and waits for return. +- What happens inside (concurrency, I/O, parallelism) is up to the task. + +For the chunks of a single tx index (first chunk through last chunk, inclusive), the dependencies look like this: + +``` +process_chunk(chunk_id=first) ─┐ +process_chunk(chunk_id=first+1) ─┤ +process_chunk(chunk_id=first+2) ─┼──→ build_txhash_index(tx_index_id) ──→ cleanup_txhash(tx_index_id) +... │ +process_chunk(chunk_id=last) ─┘ +``` + +- All `process_chunk` tasks for a tx index must complete before `build_txhash_index` fires. +- `cleanup_txhash` runs after `build_txhash_index` succeeds. +- Cleanup deletes the raw `.bin` files and their `chunk:{chunk_id:08d}:txhash` meta keys. + +### Main Flow + +`run_backfill` is invoked by the service's [Phase 1 (catchup)](./02-streaming-workflow.md#phase-1--catchup) with an integer chunk range: + +```python +def run_backfill(config, range_start_chunk_id, range_end_chunk_id): + validate(range_start_chunk_id, range_end_chunk_id) + + dag = build_dag(config, range_start_chunk_id, range_end_chunk_id) + dag.execute(max_workers=MAX_CPU_THREADS) +``` + +### Pre-DAG Validation + +- Validation runs in two layers, both pre-DAG: + 1. **Service startup** (`validate_config`) — runs once per process start, before any backfill is invoked. 
Authoritative enforcer of config-immutability: `CHUNKS_PER_TX_INDEX` (and `RETENTION_LEDGERS`) cannot change across runs. Defined in [02-streaming-workflow.md — Validation Pseudocode](./02-streaming-workflow.md#validation-pseudocode). + 2. **Per `run_backfill` call** - `validate` runs before DAG construction. Argument sanity only (chunk-range bounds). +- Why pre-DAG and not a DAG task: no-dependency tasks would start concurrently; a validation failure would leave in-flight work to cancel. Pre-DAG = clean abort, no partial work. +- No source probe. `run_backfill` trusts the caller's range; source-coverage problems surface at runtime as task failures — see [Error Handling](#error-handling). +- `[BSB]` must be configured. Phase 1 (catchup) only calls `run_backfill` when `[BSB]` is present. + +```python +def validate(range_start_chunk_id, range_end_chunk_id): + # Argument sanity only. run_backfill trusts the caller's range — any source-coverage + # issue (upper or lower bound) surfaces at runtime as a per-task get_ledger failure. + assert range_start_chunk_id >= 0 + assert range_end_chunk_id >= range_start_chunk_id +``` + +### DAG Setup + +```python +def build_dag(config, range_start_chunk_id, range_end_chunk_id): + # Invariant: range_start_chunk_id is always tx-index-aligned + # Phase 1 (catchup) is the only caller of this function, and it aligns the start chunk ID to the nearest tx index boundary, which is why validate() doesn't check for that. + # This means the first tx index in the range is always fully covered by the chunk range, and thus always buildable. + # A partial-at-end (trailing partial) is normal: BSB-tip lands wherever network production is, mid-index is typical. 
+ + dag = new DAG() + first_index = tx_index_id_of_chunk(range_start_chunk_id) + last_index = tx_index_id_of_chunk(range_end_chunk_id) + + for tx_index_id in range(first_index, last_index + 1): + chunk_tasks = [] + for chunk_id in chunks_for_tx_index(tx_index_id): + if range_start_chunk_id <= chunk_id <= range_end_chunk_id: + t = dag.add(ProcessChunkTask(chunk_id, config), deps=[]) + chunk_tasks.append(t.id) + + # If the tx_index is fully covered (i.e., last chunk ≤ range_end), + # schedule build_txhash_index + cleanup_txhash. + # Otherwise the tx_index is the trailing partial — its last chunk isn't available for + # consumption yet — and only the process_chunk tasks above run; + # tx-build is skipped in that case. + if last_chunk_in_tx_index(tx_index_id) <= range_end_chunk_id: + build_task = dag.add(BuildTxHashIndexTask(tx_index_id), deps=chunk_tasks) + dag.add(CleanupTxHashTask(tx_index_id), deps=[build_task.id]) + + return dag +``` + +**Examples:** + +- `input chunk range = [0, 5_999]`, `cpi = 1_000`, starting chunk is 0 - already tx-index aligned → tx_indexes 0..5 fully covered and created; no trailing partial. +- `input chunk range = [3_000, 6_100]`, `cpi = 1_000`, starting chunk is 3_000 - already tx-index aligned → tx_indexes 3..5 fully covered; tx_index 6 trailing partial with only chunks 6_000..6_100 created; `build_txhash_index` skipped for tx_index 6. + +**Trailing partial tx-index:** + +- On disk: chunks have `.bin` files + `:lfs` + `:events` + `:txhash` flags; `index:{tx_index_id:08d}:txhash` absent. +- Ledger and events data are not blocked by tx-index alignment — their flags land as each chunk's outputs are durable. 
+- The deferred build runs later: via a subsequent `run_backfill` call when BSB covers the tail, or via streaming's live-ingestion path — see [02-streaming-workflow.md — Phase 2 (`.bin` hydration)](./02-streaming-workflow.md#phase-2--hydrate-txhash-data-from-bin) and [02-streaming-workflow.md — RecSplit Transition](./02-streaming-workflow.md#recsplit-transition). + +### DAG Scheduler + +- The subroutine builds a single DAG per invocation and executes it with bounded concurrency. +- The DAG is the only scheduling mechanism — no per-tx-index coordinators, no secondary worker pools. +- Each task's `execute()` is wrapped with a retry loop bounded by `MAX_RETRIES` (implementation-defined constant). Any transient failure (BSB errors, temporary I/O issues) triggers a retry at the task level. + +```python +def run_dag(dag, max_workers): + worker_slots = Semaphore(max_workers) + runnable_tasks = ThreadSafeQueue(dag.tasks_with_no_pending_dependencies()) + + def execute_task(task): + for attempt in range(1, MAX_RETRIES + 1): + error = task.execute() + if error is None: + break + if attempt == MAX_RETRIES: + mark_failed(task, error) # halt dependents + worker_slots.release() + return # failed task: downstream tasks are never unblocked + log.warn("retry", task, attempt, error) + worker_slots.release() + + for downstream_task in dag.dependents_of(task): + downstream_task.mark_dependency_done(task) + if downstream_task.all_dependencies_done(): + runnable_tasks.push(downstream_task) + + while runnable_tasks: + current_task = runnable_tasks.pop() + worker_slots.acquire() + run_in_background(execute_task, current_task) +``` + +### Worker Pool + +- Single flat pool of `max_workers = MAX_CPU_THREADS` slots. +- Any mix of task types can occupy slots simultaneously. +- `process_chunk`: 1 slot per task. +- `build_txhash_index`: 1 slot per task (uses internal parallelism across many concurrent workers). +- `cleanup_txhash`: 1 slot per task. +- BSB's `NUM_WORKERS` is a per-BSB internal download pool, not a cross-task concurrency knob. 
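
The scheduling model above (one flat pool, tasks released only when all their dependencies succeed) can be sketched as a small runnable executor. This is an illustration under stated assumptions, not the service's scheduler: `run_dag_sketch` is a hypothetical name, tasks are plain callables that raise on failure, retries are omitted, and Python's `ThreadPoolExecutor` plays the role of the `max_workers` slot pool.

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def run_dag_sketch(tasks, deps, max_workers=4):
    """tasks: {name: callable}; deps: {name: [names it depends on]}.

    Returns (done, failed, skipped) as sets of task names.
    """
    pending = {name: set(deps.get(name, ())) for name in tasks}
    dependents = {name: [] for name in tasks}
    for name, needs in pending.items():
        for dep in needs:
            dependents[dep].append(name)

    lock = threading.Lock()
    finished = threading.Event()
    done, failed = set(), set()
    state = {"in_flight": 0}

    with ThreadPoolExecutor(max_workers=max_workers) as pool:

        def submit(name):  # caller must hold `lock`
            state["in_flight"] += 1
            pool.submit(run, name)

        def run(name):
            try:
                tasks[name]()
                ok = True
            except Exception:
                ok = False
            with lock:
                (done if ok else failed).add(name)
                if ok:
                    # Only a successful task unblocks its dependents.
                    for child in dependents[name]:
                        pending[child].discard(name)
                        if not pending[child]:
                            submit(child)
                state["in_flight"] -= 1
                if state["in_flight"] == 0:
                    finished.set()  # nothing running, nothing newly runnable

        with lock:
            roots = [n for n, needs in pending.items() if not needs]
            for name in roots:
                submit(name)
            if not roots:
                finished.set()
        finished.wait()

    skipped = set(tasks) - done - failed
    return done, failed, skipped
```

With `deps = {"build": ["c0", "c1"], "cleanup": ["build"]}`, `build` runs only after both chunk tasks return, and a failing chunk task leaves `build` and `cleanup` in `skipped`, mirroring "task marked failed, DAG halts dependents".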
+ +--- + +### `process_chunk` + +- Processes a single 10_000-ledger chunk end-to-end. +- Idempotent at flag granularity — produces only outputs whose flag is missing; a partially-completed chunk resumes from where it left off. + +**Outputs** (each only if its flag is missing): + +- Ledger pack file (`{chunk_id:08d}.pack`) — see [packfile format](../../design-docs/packfile-library.md). +- Raw txhash flat file (`{chunk_id:08d}.bin`) — 36-byte entries (`txhash[32]` + `ledgerSeq[4]`) consumed by the RecSplit builder. +- Events cold segment (`events.pack` + `index.pack` + `index.hash`) — see [getEvents design](../../design-docs/getevents-full-history-design.md). + +**Pseudocode:** + +```python +def process_chunk(chunk_id, config): + first_ledger = first_ledger_in_chunk(chunk_id) + last_ledger = last_ledger_in_chunk(chunk_id) + + need_lfs = not meta_store.has(f"chunk:{chunk_id:08d}:lfs") + need_txhash = not meta_store.has(f"chunk:{chunk_id:08d}:txhash") + need_events = not meta_store.has(f"chunk:{chunk_id:08d}:events") + if not (need_lfs or need_txhash or need_events): + return + + # If :lfs is already on disk, read from the local packfile — no need to use BSB. + # Otherwise instantiate a per-task BSB scoped to THIS chunk's 10_000 ledgers. 
+ if need_lfs: + ledger_reader = BSBSource(config.bsb) + ledger_reader.prepare_range(first_ledger, last_ledger) + else: + ledger_reader = local_packfile(ledger_pack_path(chunk_id)) + + ledger_writer = packfile.create(ledger_pack_path(chunk_id), overwrite=True) if need_lfs else None + txhash_writer = open(raw_txhash_path(chunk_id), overwrite=True) if need_txhash else None + events_writer = events_segment.create(events_segment_path(chunk_id), overwrite=True) if need_events else None + + try: + for ledger_seq in range(first_ledger, last_ledger + 1): + lcm = ledger_reader.get_ledger(ledger_seq) + if need_lfs: ledger_writer.append(compress(lcm)) + if need_txhash: txhash_writer.append(extract_txhashes(lcm)) # 36 bytes per tx + if need_events: events_writer.append(extract_events(lcm)) + + # Fsync + flag each output independently (flag-after-fsync). + if need_lfs: + ledger_writer.fsync_and_close() + meta_store.put(f"chunk:{chunk_id:08d}:lfs", "1") + if need_txhash: + txhash_writer.fsync_and_close() + meta_store.put(f"chunk:{chunk_id:08d}:txhash", "1") + if need_events: + events_writer.finalize() + meta_store.put(f"chunk:{chunk_id:08d}:events", "1") + finally: + ledger_reader.close() # BSB: tears down the per-task instance. Local packfile: closes file handle. +``` + +**Notes:** + +- If `:lfs` is set (pack file already on disk), reads from local NVMe instead of BSB — avoids redundant downloads on restart. +- Each flag is written independently after its output's `fsync`; no atomic WriteBatch needed. +- `packfile.create(..., overwrite=True)` handles truncation of partial files from prior crashes; no explicit cleanup before write. +- Each `process_chunk` owns its own BSB instance, scoped to the chunk's 10_000 ledgers and torn down at task exit. Cross-task concurrency cap is the DAG [Worker Pool](#worker-pool); the BSB interface is documented in [02-streaming-workflow.md — Ledger Source](./02-streaming-workflow.md#ledger-source). 
+- Adding a new data type = adding a fourth flag + writer; no other task changes. + +--- + +### `build_txhash_index` + +- Builds the RecSplit index for one completed tx index. +- Occupies one DAG worker slot but spawns multiple concurrent workers internally (per-stage worker counts are in the pseudocode). + +**Pseudocode:** + +```python +def build_txhash_index(tx_index_id): + if meta_store.has(f"index:{tx_index_id:08d}:txhash"): + return + + # All-or-nothing recovery: absent flag ⇒ any .idx on disk is from a crashed attempt. + delete_partial_idx_files(recsplit_index_path(tx_index_id)) + + # Invariant: every chunk's .bin is on disk when this runs. Prior-iteration chunks + # keep their .bin until cleanup_txhash runs; cleanup is DAG-gated on this build. + bin_files = list_bin_files(tx_index_id) + + # Stage 1 (COUNT) — two passes total over .bin; count entries per CF. + cf_counts = parallel_count(bin_files, workers=100) + + # Stage 2 (ADD) — route entries into 16 per-CF builders (nibble = txhash[0] >> 4). + cf_builders = [RecSplitBuilder(cf_counts[nibble]) for nibble in range(16)] + parallel_add(bin_files, cf_builders, workers=100) + + # Stage 3 (BUILD) — 16 parallel CF builds; each produces one .idx; all fsynced. + parallel_build(cf_builders, workers=16) + + # Stage 4 (VERIFY) — full verification; no wall-clock pressure at backfill time. + parallel_verify(bin_files, cf_builders, workers=100) + + # Backfill writes "1" only; streaming's prune path may transition through "deleting". + meta_store.put(f"index:{tx_index_id:08d}:txhash", "1") +``` + +**Notes:** + +- VERIFY always runs — no `--verify-recsplit=false` escape hatch; backfill trades throughput for correctness every time. +- All-or-nothing recovery on restart: absent `index:{tx_index_id:08d}:txhash` ⇒ delete partial `.idx` files and re-run the full build. + +--- + +### `cleanup_txhash` + +- Runs after `build_txhash_index` completes successfully. 
Modeled as a separate DAG task (not inline in `build_txhash_index`) so crash recovery falls out naturally — on restart, the DAG sees the tx-index flag set but per-chunk `:txhash` flags still present, and cleanup re-runs as a normal task. + +**Pseudocode:** + +```python +def cleanup_txhash(tx_index_id): + # File-before-flag-delete on every cleanup pair (see 02-streaming-workflow.md — Flag Semantics). + # On any crash mid-pair, the flag is the recovery signal — never an orphan file with no record. + for chunk_id in chunks_for_tx_index(tx_index_id): + if not meta_store.has(f"chunk:{chunk_id:08d}:txhash"): + continue + delete_if_exists(raw_txhash_path(chunk_id)) + meta_store.delete(f"chunk:{chunk_id:08d}:txhash") +``` + +**Notes:** + +- Per-chunk idempotency: each chunk checks its own `chunk:{chunk_id:08d}:txhash` flag before deleting; a crash mid-cleanup resumes safely. + +--- + +## Resilience + +Crash recovery and error handling share one foundation: flag-after-fsync makes the meta store authoritative, and every task checks its own flags before doing work. Transient failures retry at BSB-internal and task-level layers; persistent failures abort the run, and on restart already-complete work is skipped. + +### Crash Recovery + +No separate reconciliation phase — every task's `execute()` checks its own completion state: + +- `build_dag()` registers ALL tasks for the chunk range on every invocation; no meta-store scanning in setup. +- `process_chunk` checks each output flag independently — missing output is produced; existing output is skipped. +- `build_txhash_index` checks `index:{tx_index_id:08d}:txhash` — present → early return; absent → delete partial `.idx` files, rerun full build. +- `cleanup_txhash` checks `chunk:{chunk_id:08d}:txhash` per-chunk — chunks whose flag is already deleted are skipped; the remaining chunks are cleaned. + +Three invariants make this work: + +1. **Key implies durable file** — a meta store flag is set only after fsync. +2. 
**Tasks are idempotent** — each checks its own outputs and skips or overwrites what exists. +3. **DAG registers all tasks on every invocation** — completed tasks return immediately from `execute()`. + +### Concurrent Access Prevention + +The service acquires a directory flock on the meta-store at startup. A second process against the same datadir fails immediately. + +### Error Handling + +Two layers of retry: + +- **BSB-internal retries.** `BSBSource` handles transient errors (connection resets, throttling) inside a single task execution. Invisible to the DAG. +- **Task-level retries.** DAG wraps each task's `execute()` in a retry loop bounded by `MAX_RETRIES`. + - Source retries exhausted → task retries whole. + - `MAX_RETRIES` exhausted → task marked failed → DAG halts dependents → `run_backfill` returns fatal → [Phase 1 (catchup)](./02-streaming-workflow.md#phase-1--catchup) propagates error to the service → service exits non-zero. + - Operator fixes root cause + restarts → Phase 1 (catchup) re-enters → `run_backfill` re-invoked with a fresh range → completed work skipped via per-chunk idempotency. 
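
The task-level retry layer above can be sketched as a small wrapper. This is a hedged illustration only: `execute_with_retries` and `NonRetryableError` are hypothetical names, `MAX_RETRIES = 3` is an assumed value (the document leaves it implementation-defined), and the immediate-abort branch models the "ABORT immediately" rows of the error table that follows.

```python
MAX_RETRIES = 3  # assumed value; the design leaves this implementation-defined


class NonRetryableError(Exception):
    """Hypothetical marker for failures that abort without retrying."""


def execute_with_retries(task, max_retries=MAX_RETRIES, log=print):
    """Run `task` up to `max_retries` times.

    Returns None on success, or the final error when the task must be
    marked failed (which halts its dependents and aborts the run).
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            task()
            return None
        except NonRetryableError as err:
            # e.g. a VERIFY mismatch or a meta store write failure: no retry.
            return err
        except Exception as err:
            last_error = err
            if attempt < max_retries:
                log(f"retry attempt={attempt} error={err!r}")
    return last_error
```

Because every task re-checks its own flags on entry, a retried `task()` call repeats only the work whose flag is still missing.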
+ +| Error | Handled by | Action | +|-------|-----------|--------| +| BSB transient error (throttle, connection reset) | BSB-internal retry | Retried within the task; transparent to DAG | +| BSB persistent error (BSB retries exhausted) | Task-level retry | `MAX_RETRIES` attempts; then ABORT | +| Ledger pack write / fsync failure | Task-level retry | `MAX_RETRIES` attempts; then ABORT; flag not set | +| Txhash write / fsync failure | Task-level retry | `MAX_RETRIES` attempts; then ABORT; flag not set | +| Events write / fsync failure | Task-level retry | `MAX_RETRIES` attempts; then ABORT; flag not set | +| RecSplit build failure | Task-level retry | `MAX_RETRIES` attempts; then ABORT; tx-index key absent | +| VERIFY stage mismatch | None | ABORT immediately — data corruption; operator investigates | +| Meta store write failure | None | ABORT immediately — treat as crash; operator re-runs service | diff --git a/full-history/design-docs/02-streaming-workflow.md b/full-history/design-docs/02-streaming-workflow.md new file mode 100644 index 000000000..f62938607 --- /dev/null +++ b/full-history/design-docs/02-streaming-workflow.md @@ -0,0 +1,945 @@ +# Streaming Workflow + +## Overview + +stellar-rpc is the **unified full-history RPC service** — historical backfill and live streaming under one binary, one invocation, one long-running process. + +- Operator runs `stellar-rpc --config path/to/config.toml`. No subcommand. No `--mode` flag. No behavior-switching flags. +- On every start, the service runs four sequential startup phases, then enters a live ingestion loop it stays in until killed. +- Behavior across the three operator profiles — **archive** (full history), **pruning-history** (retention-windowed history with bulk catchup from a remote object store), **tip-tracker** (retention-windowed history, no object store; captive-core-only) — is determined entirely by TOML config; no profile flag. Full matrix: [Operator Profiles](#operator-profiles). 
+- Backfill (specified in [01-backfill-workflow.md](./01-backfill-workflow.md)) is used as an internal subroutine by Phase 1 (catchup). Operators never invoke backfill directly. + +**What the service does end-to-end:** +- Validates config against immutable meta-store state (`CHUNKS_PER_TX_INDEX`, `RETENTION_LEDGERS`). +- Catches up to the current network tip using **BSB** (Buffered Storage Backend — remote object-store reader for `LedgerCloseMeta`) or captive core (embedded `stellar-core` subprocess), whichever is configured. See [Ledger Source](#ledger-source). +- Hydrates any in-flight state left by a prior run. +- Ingests live ledgers from captive core into three **active RocksDB stores** (per-chunk ledger + events, per-index txhash). See [Active Store Architecture](#active-store-architecture). +- Freezes active stores to immutable files at chunk and index boundaries in background. +- Prunes past-retention indexes atomically when retention is configured. +- Serves `getLedger` / `getTransaction` / `getEvents` only after startup phases complete; returns HTTP 4xx during startup. + +--- + +## Geometry + +See [01-backfill-workflow.md — Geometry](./01-backfill-workflow.md#geometry). Streaming uses the same constants (`GENESIS_LEDGER`, `LEDGERS_PER_CHUNK`, `LEDGERS_PER_TX_INDEX`, `CHUNKS_PER_TX_INDEX`), mapping functions, and derived helpers. + +--- + +## Configuration + +Streaming reads the same TOML file as backfill, plus additional keys described below. + +### Shared Config (from backfill) + +These sections come from backfill — see [01-backfill-workflow.md — Configuration](./01-backfill-workflow.md#configuration) for the full schemas: + +- `[SERVICE]` — service-wide settings (`DEFAULT_DATA_DIR`, `CHUNKS_PER_TX_INDEX`). +- `[BSB]` — Buffered Storage Backend source settings. +- `[IMMUTABLE_STORAGE.*]` — on-disk paths for immutable artifacts (ledger packs, events, raw txhash, txhash index). +- `[META_STORE]` — meta-store RocksDB path. 
+- `[LOGGING]` — log level + format. + +Streaming extends `[SERVICE]` with extra keys and introduces `[CAPTIVE_CORE]` (embedded `stellar-core` subprocess settings), `[ACTIVE_STORAGE]` (active RocksDB paths), and `[HISTORY_ARCHIVES]` (Stellar history-archive URLs for tip sampling). + +### Immutable Keys (stored in meta store, fatal if changed) + +Stored on first start; fatal on any subsequent start where the config value differs. Changing either requires wiping the datadir. + +| Key | Stored under | Set by | Rule | +|---|---|---|---| +| `CHUNKS_PER_TX_INDEX` | `config:chunks_per_tx_index` | first run | Fatal if changed. | +| `RETENTION_LEDGERS` | `config:retention_ledgers` | first run | Fatal if changed. | + +Source selection (BSB vs captive core) is determined per-startup by `[BSB]` presence; operators _may add or remove_ `[BSB]` between runs. `RETENTION_LEDGERS` already pins the retained window, so locking the source choice would add nothing. + +### Streaming TOML Config + +**[SERVICE] — streaming additions** + +Extends the `[SERVICE]` table in [01-backfill-workflow.md — Configuration](./01-backfill-workflow.md#configuration) + +| Key | Type | Default | Description | +|---|---|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------| +| `RETENTION_LEDGERS` | uint32 | `0` | `0` = full history; otherwise must be a positive multiple of `LEDGERS_PER_TX_INDEX`. See [Validation Rules](#validation-rules). | +| `NETWORK_PASSPHRASE` | string | **required** | Stellar network passphrase. Must match the `NETWORK_PASSPHRASE` in the captive-core config file. | + +**[CAPTIVE_CORE]** + +| Key | Type | Default | Description | +|---|---|---|---| +| `CONFIG_PATH` | string | **required** | Path to the captive-core TOML config file (consumed by the embedded `stellar-core` subprocess). 
| +| `STELLAR_CORE_BINARY_PATH` | string | **required** | Path to the `stellar-core` binary that captive core spawns as a subprocess. | + +**[ACTIVE_STORAGE]** (optional) + +| Key | Type | Default | Description | +|---|---|---|---| +| `PATH` | string | `{DEFAULT_DATA_DIR}/active` | Base path for active RocksDB stores (ledger, txhash, events). | + +**[HISTORY_ARCHIVES]** + +| Key | Type | Default | Description | +|---|---|---|---| +| `URLS` | []string | **required** | List of Stellar history archive URLs. Used to sample the live network tip for the fresh-start resume-ledger calculation, the stale-marker check in `compute_resume_ledger`, and the Phase 3 retention floor. | + +**[BSB]** (optional) — same schema as in the backfill doc; presence determines Phase 1 (catchup) behavior. See [Operator Profiles](#operator-profiles). + +### CLI Flags + +| Flag | Type | Default | Description | +|---|---|---|---| +| `--config` | string | **required** | Path to TOML config file. | + +**No other flags.** No `--mode`; no `--start-ledger`, `--end-ledger`; no subcommand for backfill or streaming. +Per-run behavior is driven by config or derived at runtime from meta store + tip. + +### Validation Rules + +- `CHUNKS_PER_TX_INDEX` and `RETENTION_LEDGERS` are immutable across runs (see [Immutable Keys](#immutable-keys-stored-in-meta-store-fatal-if-changed)). +- `RETENTION_LEDGERS` must be `0` OR a positive integer multiple of `LEDGERS_PER_TX_INDEX`. Valid at `CHUNKS_PER_TX_INDEX = 1_000`: `0`, `10_000_000`, `20_000_000`, ...; invalid: `5_000_000`, `15_000_000`. Pruning is whole-index — non-aligned windows would leave partial indexes perpetually on disk. +- **`[BSB]` absent AND `RETENTION_LEDGERS = 0` is fatal.** Full history requires BSB — captive-core archive-catchup from genesis would take weeks to months. 
+- `[HISTORY_ARCHIVES].URLS`, `[CAPTIVE_CORE].CONFIG_PATH`, `[CAPTIVE_CORE].STELLAR_CORE_BINARY_PATH`, `[SERVICE].NETWORK_PASSPHRASE` are required in all profiles. + +### Validation Pseudocode + +```python +def validate_config(config, meta_store): + apply_static_rules(config) # see "Validation Rules" above + _enforce_immutable(meta_store, "config:chunks_per_tx_index", str(config.service.chunks_per_tx_index)) + _enforce_immutable(meta_store, "config:retention_ledgers", str(config.service.retention_ledgers)) + + +def _enforce_immutable(meta_store, key, current_value): + stored = meta_store.get(key) + if stored is None: + meta_store.put(key, current_value) + elif stored != current_value: + fatal(f"{key} changed: stored={stored}, config={current_value}. Wipe datadir.") +``` + +### Operator Profiles + +Three profiles emerge from config combinations. No separate profile flag. + +| Profile | `RETENTION_LEDGERS` | `[BSB]` | Use case | Backfill behavior | +|---|---|---|---|---| +| Archive | `0` | present | Public archive node; full history. | Backfill over full history (chunks `[0, bsb_tip_chunk]`) | +| Pruning-history | `N × LEDGERS_PER_TX_INDEX`, N ≥ 1 | present | Windowed history with bulk initial catchup. | Backfill over retention window (start aligned to first chunk of the tx index containing the retention floor) | +| Tip-tracker | `N × LEDGERS_PER_TX_INDEX`, N ≥ 1 | absent | App developer; short retention; no object-store dep. | **No-op.** Phase 4 (live ingestion)'s captive core archive-catches-up from a `resume_ledger` aligned to the retention-aligned tx-index boundary | +| (invalid) | `0` | absent | Rejected by `validate_config`: full history requires BSB. | — | + +--- + +## Meta Store Keys + +Single RocksDB instance, WAL always enabled. Authoritative source for every startup decision. Reference for the schema and lifecycle below; reads more naturally after [Startup Sequence](#startup-sequence) defines the phases that write and consume these keys. 
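
To make the meta store's role as startup gatekeeper concrete, here is a minimal sketch of the config checks that read and write its `config:*` keys. It uses a plain `dict` as a stand-in for the RocksDB meta store. `ConfigError`, `check_retention`, and the dict stand-in are illustrative assumptions, not names from the service.

```python
LEDGERS_PER_CHUNK = 10_000


class ConfigError(Exception):
    """Stand-in for the service's fatal() path (hypothetical name)."""


def check_retention(retention_ledgers, chunks_per_tx_index, bsb_configured):
    """Static rules: 0 needs [BSB]; otherwise a whole number of tx indexes."""
    ledgers_per_tx_index = LEDGERS_PER_CHUNK * chunks_per_tx_index
    if retention_ledgers == 0:
        if not bsb_configured:
            raise ConfigError("RETENTION_LEDGERS = 0 requires [BSB]")
        return
    if retention_ledgers < 0 or retention_ledgers % ledgers_per_tx_index != 0:
        raise ConfigError(
            f"RETENTION_LEDGERS must be a multiple of {ledgers_per_tx_index}"
        )


def enforce_immutable(meta_store, key, current_value):
    """First start stores the value; any later mismatch is fatal."""
    stored = meta_store.get(key)
    if stored is None:
        meta_store[key] = current_value
    elif stored != current_value:
        raise ConfigError(f"{key} changed: stored={stored}, config={current_value}")
```

On a first start the dict is empty, so both `config:*` keys are simply recorded. On every later start the same calls act as a comparison, which is why changing either value requires wiping the datadir.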
+ +### Keys Introduced by Streaming + +| Key | Value | Written when | +|--|-|--| +| `streaming:last_committed_ledger` | uint32 | Monotonic progress marker; written via `advance_progress_marker`. Two writers: Phase 1 (post-catchup) and the live ingestion loop (per ledger). | +| `config:retention_ledgers` | uint32 | First run (stored); enforced on subsequent starts. | +| `hot:chunk:{chunk_id:08d}:lfs` | `"1"` | Set BEFORE active ledger store dir is created; cleared AFTER dir is removed by freeze. Presence ⇒ dir exists or its lifecycle is incomplete. | +| `hot:chunk:{chunk_id:08d}:events` | `"1"` | Same pattern, active events store dir. | +| `hot:index:{tx_index_id:08d}:txhash` | `"1"` | Same pattern, active txhash store dir. Per-index cadence (one per tx index). | +| `pruning:index:{tx_index_id:08d}` | `"1"` | Set by `prune_tx_index` BEFORE any file delete; cleared AFTER everything else (artifacts + `index:{N}:txhash`). QueryRouter returns 4xx while present; `prunable_tx_index_ids` re-enqueues N if the marker survives a crash. | + +### Keys Shared with Backfill + +Defined in [01-backfill-workflow.md — Meta Store Keys](./01-backfill-workflow.md#meta-store-keys); streaming uses the same contract: + +- `config:chunks_per_tx_index` +- `chunk:{chunk_id:08d}:lfs` +- `chunk:{chunk_id:08d}:events` +- `chunk:{chunk_id:08d}:txhash` +- `index:{tx_index_id:08d}:txhash` + +All values are binary (`"1"` or absent); prune-in-progress is tracked via the separate `pruning:index:*` key family rather than overloading the value space. + +### Flag Semantics + +- **Flag-after-fsync** — a freeze flag is set only AFTER its artifact is fsynced. Present ⇒ artifact durable. +- **File-before-flag-delete** — cleanup paths delete the file/dir FIRST, clear the key LAST. Reverse order would orphan a file with no meta-store record on a crash mid-pair, recoverable only by filesystem scan. 
+- **Hot keys before mkdir, cleared after dir-delete** — every active store dir has a `hot:*` key set BEFORE `mkdir` and cleared AFTER `delete_dir_if_exists`. + +_**The meta-store flag is the always-correct signal of artifact state on disk, both for immutable files and for active-store directories. A crash anywhere leaves a state the next start recovers from by flag presence alone — no filesystem scan anywhere.**_ + +--- + +## Active Store Architecture + +Three RocksDB-backed active stores; WAL always enabled. + +- **Ledger** — one per chunk at `{ACTIVE_STORAGE.PATH}/ledger-store-chunk-{chunk_id:08d}/`. Key `uint32BE(ledgerSeq)`, value `zstd(LCM bytes)`. +- **TxHash** — one per tx index at `{ACTIVE_STORAGE.PATH}/txhash-store-index-{tx_index_id:08d}/`. Key `txhash[32]`, value `uint32BE(ledgerSeq)`. 16 column families (`cf-0`..`cf-f`) routed by `txhash[0] >> 4`; each CF pairs 1:1 with one of the 16 RecSplit `.idx` files. +- **Events** — one per chunk at `{ACTIVE_STORAGE.PATH}/events-store-chunk-{chunk_id:08d}/`. Schema per [getEvents full-history design](../../design-docs/getevents-full-history-design.md). Per-ledger writes are idempotent. + +### Store Lifecycle + +- At every chunk boundary, the next chunk's ledger + events stores open synchronously (~100 ms) while the just-finished ones are handed to background freeze tasks; tx-index boundaries do the same for txhash. Each kind holds at most one active + one transitioning. Ingestion never blocks on the freeze. +- The freeze task deletes the active dir only after the immutable artifact is fsynced and its freeze flag is set. +- Active-store dirs surviving a crash are reconciled by [Phase 3](#phase-3--reconcile). + +--- + +## Ledger Source + +Two ledger sources, scoped to different phases: + +- **Backfill (Phase 1) uses `BSBSource`** — backfill-only reader (`PrepareRange` + `GetLedger`). Each `process_chunk` constructs its own instance scoped to its chunk's 10_000 ledgers. 
Captive core cannot be a backfill source — see [Backfill vs Phase 1 (catchup)](#backfill-vs-phase-1-catchup). +- **Live streaming (Phase 4, live ingestion) uses captive core directly** — `PrepareRange(UnboundedRange(resume_ledger))` + per-ledger `GetLedger(seq)` against the captive-core subprocess. + +--- + +## Startup Sequence + +Four sequential phases on every start. The first three are bounded bootstrap; Phase 4 is the long-running ingestion state. + +- **Phase 1 — catchup.** When `[BSB]` is configured, invokes the backfill subroutine in a loop to close the gap between on-disk artifacts and current network tip. Without `[BSB]`, Phase 1 is a no-op and Phase 4's captive core handles initial catchup via `PrepareRange(UnboundedRange(resume_ledger))`. +- **Phase 2 — hydrate txhash.** Loads any `.bin` files Phase 1 left (trailing partial index) into the active txhash store, then deletes them. +- **Phase 3 — reconcile.** Two passes: drop past-retention state, then recover in-flight freezes left by a prior crash. +- **Phase 4 — live ingestion.** Opens active stores, spawns the lifecycle task, starts captive core, enters the ingestion loop. Runs until process exit. + +**Backfill vs Phase 1 (catchup):** `run_backfill` (subroutine, [01-backfill-workflow.md](./01-backfill-workflow.md)) is BSB-only — captive core's serial subprocess can't be sharded per-chunk like BSB. `phase1_catchup` (startup phase) invokes backfill when `[BSB]` is configured, no-ops otherwise. + +### Service Entry Point + +```python +def main(): + args = parse_cli_flags() + config = load_config_toml(args.config) + run_rpc_service(config) + +def run_rpc_service(config): + meta_store = open_meta_store(config) + validate_config(config, meta_store) + start_http_server(config) # /getHealth servable; getLedger/Tx/Events 4xx until set_service_ready + + # Phases 1-3 + compute_resume_ledger: bring on-disk state into consistency. + # No query traffic during this window. 
+ last_phase1_chunk_id = phase1_catchup(config, meta_store) + phase2_hydrate_txhash(config, meta_store, last_phase1_chunk_id) + phase3_reconcile(config, meta_store) + resume_ledger = compute_resume_ledger(config, meta_store) + + # Frozen-artifact queries don't need captive core; flip service_ready here + # (not after spinup) to avoid an unnecessary 4-5 min outage per restart. + set_service_ready() + phase4_live_ingest(config, meta_store, resume_ledger) +``` + +See [Query Contract](#query-contract) for the query-gating contract. + +### Phase 1 — Catchup + +```python +def phase1_catchup(config, meta_store) -> Optional[int]: + """ + Catch up history via BSB; no-op when [BSB] absent. + + Loop samples BSB tip, computes retention-aligned start, runs backfill, and + repeats until BSB stops advancing. The loop exists because the remote + object store lags the live network tip — each iteration may surface new + chunks that landed while the previous backfill was running. After the + loop, advances the progress marker so Phase 3 / compute_resume_ledger + see the post-catchup position rather than a stale prior-run value + (long-downtime correctness). + + Idempotent across restarts — backfill skips already-flagged chunks. + last_scheduled_end_chunk is local-only and resets every start. + + Returns: highest chunk_id completed (input to Phase 2), or None on no-op. + """ + if config.bsb is None: + # Tip-tracker profile. Nothing to backfill. + return None + + retention_ledgers = config.service.retention_ledgers + last_scheduled_end_chunk = -1 + + while True: + end_chunk = bsb_latest_complete_chunk_id(config.bsb) + if end_chunk <= last_scheduled_end_chunk: + break # BSB has no new complete chunks since last iter + start_chunk = retention_aligned_start_chunk(last_ledger_in_chunk(end_chunk), retention_ledgers) + if end_chunk < start_chunk: + # Defensive only — unreachable under current geometry. 
retention is 0 + # or N×LEDGERS_PER_TX_INDEX and floor is clamped to GENESIS, so + # first_chunk_of_tx_index_containing(floor) <= end_chunk always. + break + log.info(f"phase1_catchup bsb_tip_chunk={end_chunk} range=[{start_chunk}, {end_chunk}]") + run_backfill(config, start_chunk, end_chunk) + last_scheduled_end_chunk = end_chunk + + if last_scheduled_end_chunk < 0: + # Reached only when iter 1 broke on `end_chunk <= last_scheduled_end_chunk` + # with last_scheduled_end_chunk still at -1, i.e., bsb_latest_complete_chunk_id + # returned -1 (BSB has zero complete chunks — fresh bucket or transient + # empty state). No end_chunk to advance the marker to; Phase 4's captive + # core handles resume from whatever state exists. + # The other loop break (end_chunk < start_chunk) cannot land here under + # the current retention geometry — start_chunk <= end_chunk always. + return None + + # Advance marker past any stale prior-run value so Phase 3's floor calc and + # compute_resume_ledger don't replay chunks that are about to be pruned. + advance_progress_marker(meta_store, last_ledger_in_chunk(last_scheduled_end_chunk)) + return last_scheduled_end_chunk + +def retention_floor_ledger(tip_ledger, retention_ledgers) -> int: + # Bottom edge of the retention window — oldest ledger to keep. Clamped to + # GENESIS for the early-bootstrap case where tip < retention_ledgers + # (tip - retention would otherwise be a non-existent negative ledger). + # Shared by Phase 1 (start-chunk computation) and Phase 3 Pass 1 (past-retention floor). + return max(tip_ledger - retention_ledgers, GENESIS_LEDGER) + + +def retention_aligned_start_chunk(tip_ledger, retention_ledgers): + # Aligns DOWN to a tx-index boundary (no-gaps); up to LEDGERS_PER_TX_INDEX - 1 ledgers below strict retention. 
+ if retention_ledgers == 0: + return 0 + return first_chunk_id_of_tx_index_containing(retention_floor_ledger(tip_ledger, retention_ledgers)) + +def advance_progress_marker(meta_store, candidate_ledger): + """ + Monotonic write to streaming:last_committed_ledger. Two callers — Phase 1 + (post-catchup) and the live ingestion loop (per ledger, after all three + active stores commit). Regression would cause re-ingest waste and a stale + retention floor. + """ + prior = meta_store.get("streaming:last_committed_ledger") + if prior is None or candidate_ledger > prior: + meta_store.put("streaming:last_committed_ledger", candidate_ledger) +``` + +**Worker concurrency:** `run_backfill` caps DAG concurrency at `MAX_CPU_THREADS` — see [01-backfill-workflow.md — process_chunk](./01-backfill-workflow.md#process_chunk). Catchup time ≈ `retention_window / (BSB throughput)`. + +### Phase 2 — Hydrate TxHash Data from `.bin` + +Phase 1's backfill range almost always ends mid-tx-index — BSB tip lands wherever the live network is, rarely on an index boundary. +For that trailing partial tx index, per-chunk `.bin` files are on disk but `index:N:txhash` is not yet written (RecSplit waits until every chunk of N is complete). +Phase 2 loads each surviving `.bin` into the active txhash store, then deletes the `.bin` and `chunk:{chunk_id:08d}:txhash` flag. +After Phase 2: no `.bin` files and no `:txhash` chunk flags remain. + +```python +def phase2_hydrate_txhash(config, meta_store, last_phase1_chunk_id): + # Both sweeps: file-before-flag-delete (see Flag Semantics). + + # Sweep 1: clean leftover .bin from completed indexes (cleanup_txhash crashed mid-pair). 
+ for tx_index_id in tx_index_ids_with_txhash_flag(meta_store): + if not meta_store.has(f"index:{tx_index_id:08d}:txhash"): + continue # incomplete index: its .bin files are Sweep 2's input, not leftovers + for chunk_id in chunks_for_tx_index(tx_index_id): + if meta_store.has(f"chunk:{chunk_id:08d}:txhash"): + delete_if_exists(raw_txhash_path(chunk_id)) + meta_store.delete(f"chunk:{chunk_id:08d}:txhash") + + # Sweep 2: hydrate the trailing incomplete tx index into the active txhash store. + # Phase 1 returns the highest chunk it completed; the trailing tx index is + # the one containing that chunk — no separate scan needed. + if last_phase1_chunk_id is None: + return # no Phase 1 work → no .bin files + tx_index_id = tx_index_id_of_chunk(last_phase1_chunk_id) + if meta_store.has(f"index:{tx_index_id:08d}:txhash"): + return # last touched index already complete (RecSplit done) + + txhash_store = open_active_txhash_store(config, meta_store, tx_index_id) + try: + for chunk_id in chunks_for_tx_index(tx_index_id): + if not meta_store.has(f"chunk:{chunk_id:08d}:txhash"): + # Phase 1 didn't reach this chunk in the trailing tx_index + # (chunk_id > last_phase1_chunk_id) — no .bin file to hydrate. + continue + bin_path = raw_txhash_path(chunk_id) + # .bin absent + :txhash flag set ⇒ a prior Phase 2 deleted the file + # but crashed before clearing the flag (file-before-flag-delete). + # The data is already durable in the active txhash store from that + # prior load; skip the re-load and just finish flag cleanup below. + if os.path.exists(bin_path): + load_bin_into_rocksdb(bin_path, txhash_store) + delete_if_exists(bin_path) + meta_store.delete(f"chunk:{chunk_id:08d}:txhash") + finally: + txhash_store.close() + +def tx_index_ids_with_txhash_flag(meta_store) -> Set[int]: + # Scans chunk:*:txhash and returns the unique tx_index_ids those chunks + # belong to. Used by Sweep 1 to find indexes whose cleanup_txhash crashed + # mid-pair (some chunks of N still carry :txhash flags + .bin files). 
+ result = set() + for key in meta_store.scan_prefix("chunk:"): + if not key.endswith(":txhash"): + continue + chunk_id = parse_chunk_id_from_chunk_key(key) + result.add(tx_index_id_of_chunk(chunk_id)) + return result + +def open_active_txhash_store(config, meta_store, tx_index_id) -> RocksDBStore: + """ + Idempotent. Safe to call repeatedly on the same tx_index_id: + + Hot key is written BEFORE mkdir (See Flag Semantics). + A crash in between leaves hot:* set + dir absent, + which Phase 3 Pass 2 reconciles. The reverse order would orphan the + dir with no meta-store record — only recoverable via filesystem scan, which + violates the meta-store-driven-recovery invariant. + """ + meta_store.put(f"hot:index:{tx_index_id:08d}:txhash", "1") + path = active_store_path_for("index:txhash", tx_index_id) + os.makedirs(path, exist_ok=True) + column_families = [f"cf-{nibble:x}" for nibble in range(16)] + # open_rocksdb_store recovers from any partial WAL on an existing DB. + return open_rocksdb_store(path, txhash_rocksdb_settings(column_families)) + + +def open_active_ledger_store(config, meta_store, chunk_id) -> RocksDBStore: + ... # same shape: put hot:chunk:{C}:lfs (idempotent), mkdir, open RocksDB. + +def open_active_events_store(config, meta_store, chunk_id) -> RocksDBStore: + ... # same shape: put hot:chunk:{C}:events (idempotent), mkdir, open RocksDB. +``` + +Pure-streaming restarts (no recent Phase 1 output) never see `.bin` files; the live path writes txhash directly to the active store. Phase 2 is a no-op. + +### Phase 3 — Reconcile + +Two passes, both meta-store-driven. Pass 1 drops past-retention state first so Pass 2 only has to handle hot:* keys that are still within retention. 
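
Pass 1's notion of "past retention" comes down to a little arithmetic on the retention floor. The sketch below works one example through it. It assumes `GENESIS_LEDGER = 2` and the simple mapping formulas shown here. The authoritative definitions live in the backfill doc's Geometry section, so treat these bodies as illustrative, and `is_past_retention` as a hypothetical helper.

```python
GENESIS_LEDGER = 2            # assumption; see the Geometry section for the real value
LEDGERS_PER_CHUNK = 10_000
CHUNKS_PER_TX_INDEX = 1_000   # default
LEDGERS_PER_TX_INDEX = LEDGERS_PER_CHUNK * CHUNKS_PER_TX_INDEX


def retention_floor_ledger(tip_ledger, retention_ledgers):
    # Oldest ledger to keep, clamped to genesis during early bootstrap.
    return max(tip_ledger - retention_ledgers, GENESIS_LEDGER)


def chunk_id_of_ledger(ledger_seq):
    # Assumed mapping: chunks are counted from the genesis ledger.
    return (ledger_seq - GENESIS_LEDGER) // LEDGERS_PER_CHUNK


def tx_index_id_of_chunk(chunk_id):
    return chunk_id // CHUNKS_PER_TX_INDEX


def last_ledger_in_tx_index(tx_index_id):
    return GENESIS_LEDGER + (tx_index_id + 1) * LEDGERS_PER_TX_INDEX - 1


def is_past_retention(tx_index_id, tip_ledger, retention_ledgers):
    # A tx index is droppable only when ALL of it lies below the floor.
    floor = retention_floor_ledger(tip_ledger, retention_ledgers)
    return last_ledger_in_tx_index(tx_index_id) < floor


# Worked example: tip at 55M with a 10M-ledger window puts the floor at 45M,
# inside tx index 4, so indexes 0..3 are past retention and 4 is kept whole.
floor = retention_floor_ledger(55_000_000, 10_000_000)
floor_index = tx_index_id_of_chunk(chunk_id_of_ledger(floor))
```

The whole-index test is what keeps retained data slightly above the strict window: the index containing the floor is never partially dropped.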
+ +```python +def phase3_reconcile(config, meta_store): + pass1_drop_past_retention_state(config, meta_store) + pass2_recover_in_flight_transitions(config, meta_store) + + +def pass1_drop_past_retention_state(config, meta_store): + """ + Drop every meta-store key and on-disk artifact whose ledger range falls + below the retention floor. Three distinct categories of state get cleaned: + + 1. Lifecycle-unreachable orphans. + - hot:* keys + active-store dirs (LFS / events / txhash) for chunks + and indexes below the floor. The freeze can't run on past-retention + data, so these dirs have no path forward. + - chunk:{C}:lfs / :events / :txhash flags + their immutable files for + chunks of an INCOMPLETE prior-run tx index. Because index:N:txhash + was never written for that tx index, the pruning lifecycle's + past-retention scan (which keys off index:N:txhash = "1") cannot + find them — Pass 1 is their only cleanup path. The chunk loop also + applies a sibling blanket-delete: for every chunk K below floor that + carries any chunk:K:* key, delete_if_exists is called on ALL three + artifact paths (.pack, .bin, events cold segment), not just the one + this key represents. Catches partial files left by a prior crashed + process_chunk where one kind completed (flag set) but the next kind + hadn't — its partial file on disk has no key tracking it. + + 2. Lifecycle-reachable complete state, drained eagerly. + - index:N:txhash flags + the 16 RecSplit .idx files for fully-built + indexes below the floor. The pruning lifecycle would pick these up + on its next sweep; Pass 1 drops them at startup so the first sweep + starts with no backlog. + + 3. Mid-prune markers from a prior-run crash, below floor. + - pruning:index:N markers below floor. The chunk/index loops above + have already deleted N's files + flags, so this loop just clears + the marker. Above-floor markers are left alone — the pruning + lifecycle's crash-recovery scan picks them up on its next sweep. 
+ + Floor reference: sample the live network tip from the history archive. + The progress marker is NOT a reliable tip proxy — `[BSB]` may be configured + but no longer advancing (operator stopped updating the remote store; BSB + outage; etc.), in which case the marker reflects BSB's last seen ledger, + not the live network tip. Falling back to the marker on archive-unreachable + is degraded best-effort cleanup; the pruning lifecycle catches up later as + captive core advances the marker through the gap. + """ + retention_ledgers = config.service.retention_ledgers + if retention_ledgers == 0: + return # archive profile — no floor + + current_tip = try_sample_network_tip(config) + if current_tip is None: + # Archive unreachable. Degraded fallback to the local marker — Pass 1 + # under-cleans for now; pruning lifecycle catches up after captive core + # advances the marker. + current_tip = meta_store.get("streaming:last_committed_ledger") + if current_tip is None: + log.warn("phase3 pass1: no tip reference available (archive unreachable + marker absent); skipping past-retention cleanup") + return + + floor_ledger = retention_floor_ledger(current_tip, retention_ledgers) + # Align the chunk floor DOWN to the tx-index boundary, matching + # retention_aligned_start_chunk: chunks of the tx index that contains + # floor_ledger are inside the window Phase 1 just backfilled and must survive. + floor_chunk = first_chunk_id_of_tx_index_containing(floor_ledger) + + # Category 1 — lifecycle-unreachable orphans (hot keys + chunks of incomplete prior-run tx indexes). 
+ for hot_key in meta_store.scan_prefix("hot:chunk:"): + store_kind, chunk_id = parse_hot_key(hot_key) + if chunk_id < floor_chunk: + delete_dir_if_exists(active_store_path_for(store_kind, chunk_id)) + meta_store.delete(hot_key) + log.info(f"phase3 pass1: discarded past-retention {hot_key}") + + for hot_key in meta_store.scan_prefix("hot:index:"): + _, tx_index_id = parse_hot_key(hot_key) + if last_ledger_in_tx_index(tx_index_id) < floor_ledger: + delete_dir_if_exists(active_store_path_for("index:txhash", tx_index_id)) + meta_store.delete(hot_key) + log.info(f"phase3 pass1: discarded past-retention {hot_key}") + + # Sibling blanket-delete: the first time we see any chunk:K:* key for K + # below floor, delete_if_exists ALL three artifact paths for K (.pack, .bin, + # events cold segment) — not just the one this key represents. Catches + # partial files from a prior crashed process_chunk where some kinds + # completed (flag set) but others didn't (flag absent + partial file on + # disk with no key tracking it). Idempotent — extra delete_if_exists calls + # on already-clean paths are no-ops. + blanket_cleaned = set() + for chunk_key in meta_store.scan_prefix("chunk:"): + chunk_id, _ = parse_chunk_key(chunk_key) + if chunk_id >= floor_chunk: + continue + if chunk_id not in blanket_cleaned: + delete_if_exists(ledger_pack_path(chunk_id)) + delete_events_segment(chunk_id) + delete_if_exists(raw_txhash_path(chunk_id)) + blanket_cleaned.add(chunk_id) + meta_store.delete(chunk_key) + + # Category 2 — past-retention complete indexes; lifecycle could reach but Pass 1 drains eagerly. + for index_key in meta_store.scan_prefix("index:"): + _, tx_index_id = parse_index_key(index_key) + if last_ledger_in_tx_index(tx_index_id) < floor_ledger: + delete_recsplit_idx_files(tx_index_id) + meta_store.delete(index_key) + + # Category 3 — clear below-floor pruning:index:* markers; files + flags already removed above. 
+ for pruning_key in meta_store.scan_prefix("pruning:index:"): + tx_index_id = parse_pruning_index_id(pruning_key) + if last_ledger_in_tx_index(tx_index_id) < floor_ledger: + meta_store.delete(pruning_key) + + +def pass2_recover_in_flight_transitions(config, meta_store): + # Pass 1 has already removed past-retention hot keys; every entry here is + # at or above the retention floor. + last_committed = meta_store.get("streaming:last_committed_ledger") + if last_committed is None: + # No progress marker means neither Phase 1 nor the live loop has ever + # advanced it. Without a resume position, hot:* keys can't be classified + # into scenarios A/B/C/D described below — bail. Any stranded hot:* keys from a prior abnormal + # exit get classified on a later restart once the marker is set. + return + + resume_chunk_id = chunk_id_of_ledger(last_committed + 1) + resume_tx_index_id = tx_index_id_of_chunk(resume_chunk_id) + + for hot_key in meta_store.scan_prefix("hot:"): + store_kind, scope_id = parse_hot_key(hot_key) + resume_id = resume_chunk_id if store_kind.startswith("chunk:") else resume_tx_index_id + store_path = active_store_path_for(store_kind, scope_id) + freeze_flag_key = freeze_flag_key_for(store_kind, scope_id) + + if scope_id == resume_id: + continue # A: resume target — Phase 4 reopens. + elif meta_store.has(freeze_flag_key): + # B: flag-is-truth. Frozen, but cleanup didn't finish. + delete_dir_if_exists(store_path) + meta_store.delete(hot_key) + elif scope_id < resume_id: + # C: freeze interrupted; restart to completion. + finish_interrupted_freeze(store_kind, scope_id, meta_store) + else: + # D: future-orphan — shouldn't occur in normal flow. Log + cleanup. 
+ log.warn(f"phase3 pass2: future-orphan {store_kind}/{scope_id:08d} > resume {resume_id:08d}") + delete_dir_if_exists(store_path) + meta_store.delete(hot_key) + + +``` + +`finish_interrupted_freeze` reopens the active store (idempotent on existing or partial dirs) and runs the corresponding live-path freeze ([LFS](#lfs-transition), [events](#events-transition), or [RecSplit](#recsplit-transition)) to produce the artifact. + +### Compute Resume Ledger + +Runs once per service start, after Phase 3, before Phase 4. Returns the ledger sequence Phase 4's captive core resumes from via `PrepareRange(UnboundedRange(resume_ledger))`. + +```python +def compute_resume_ledger(config, meta_store) -> int: + """ + Where should captive core resume ingestion? + Shorthand: `marker` = `streaming:last_committed_ledger`. + + Why not just "if marker exists, return marker + 1"? The marker is our + local progress checkpoint, not the live network's tip. In long-downtime + or BSB-stale scenarios it can be tens of millions of ledgers behind the + live tip; resuming at marker + 1 there would archive-catchup from the + stale point and re-ingest weeks of ledgers that the pruning lifecycle + would delete moments later. Branch 2 sidesteps that by sampling the + live tip and skipping forward when the marker has fallen behind. + + Three branches: + + Branch 1 — marker present and current (or staleness unconfirmed): + return marker + 1. Covers archive profile (no floor), normal + restarts, and the degraded path where the history archive is + unreachable. + + Branch 2 — marker present, retention > 0, marker below the retention + floor: stale. Return retention_aligned_resume_ledger_with_tip. Two + cases land here: (1) no-BSB long-downtime — prior live loop hasn't + committed in a while; (2) BSB-stale-but-configured — operator + stopped advancing BSB while the network kept producing, so Phase 1 + only advanced the marker to BSB's stale tip. 
The first live commit + on resume monotonically overwrites the stale marker via + advance_progress_marker. + + Branch 3 — marker absent (fresh operator start): + return retention_aligned_resume_ledger. + + No consistency validation. Phase 1 self-heals incomplete chunks; Phase 3 + recovers in-flight freezes; pruning lifecycle handles past-retention state. + """ + marker = meta_store.get("streaming:last_committed_ledger") + + # Branch 3 — fresh operator start. + if marker is None: + return retention_aligned_resume_ledger(config) + + # Branch 2 — staleness check. Skipped when retention=0 (no floor) or + # when the archive is unreachable (best-effort fall-through to Branch 1). + if config.service.retention_ledgers > 0: + current_tip = try_sample_network_tip(config) + if current_tip is not None: + floor = retention_floor_ledger(current_tip, config.service.retention_ledgers) + if marker < floor: + log.info(f"compute_resume_ledger: marker ({marker}) below retention floor ({floor}); skipping forward") + return retention_aligned_resume_ledger_with_tip(config, current_tip) + + # Branch 1 — marker is current (or staleness unconfirmed). + return marker + 1 + + +def retention_aligned_resume_ledger(config) -> int: + # No-BSB resume cursor: align to the first ledger of the tx index containing + # the retention floor; captive core archive-catches-up from that point. + network_tip = get_latest_network_tip(config.history_archives.urls) + return retention_aligned_resume_ledger_with_tip(config, network_tip) + + +def retention_aligned_resume_ledger_with_tip(config, network_tip_ledger) -> int: + retention_ledgers = config.service.retention_ledgers + target_ledger = max(network_tip_ledger - retention_ledgers, GENESIS_LEDGER) + return first_ledger_of_tx_index_containing(target_ledger) + + +def try_sample_network_tip(config) -> Optional[int]: + # Returns None on archive failure instead of raising. Used where a missing + # tip is recoverable (compute_resume_ledger's stale-marker check). 
+ try: + return get_latest_network_tip(config.history_archives.urls) + except NetworkTipUnreachable: + return None +``` + +### Phase 4 — Live Ingestion + +Opens active stores for the resume position, spawns the lifecycle task, starts captive core, enters the ingestion loop. Query serving was already enabled by `run_rpc_service`; Phase 4 is purely about ingestion. Captive core takes 4–5 minutes to spin up — historical queries continue to be served against frozen artifacts during that window. See [Query Contract](#query-contract). + +```python +def phase4_live_ingest(config, meta_store, resume_ledger): + # open_active_*_store writes the hot:* key BEFORE mkdir; Phase 3 has cleaned + # stale hot keys, so mkdir lands on an empty path or (SCENARIO A) on the + # prior-run active dir, idempotently re-opened. + active_stores = open_active_stores_for_resume(config, meta_store, resume_ledger) + + # Initial sweep handles any pruning:index:* markers left by a prior crash. + run_in_background(run_prune_lifecycle_loop, config, meta_store) + + # PrepareRange blocks ~4-5 min during captive-core spinup. + ledger_backend = make_ledger_backend(config.captive_core.config_path) + ledger_backend.PrepareRange(UnboundedRange(resume_ledger)) + + run_live_ingestion_loop(config, ledger_backend, active_stores, meta_store, resume_ledger) + + +def open_active_stores_for_resume(config, meta_store, resume_ledger): + # Per-kind stores share no state; opening order is free. Idempotent on + # existing dirs (mkdir no-ops; RocksDB recovers from partial WAL). + resume_chunk_id = chunk_id_of_ledger(resume_ledger) + resume_tx_index_id = tx_index_id_of_chunk(resume_chunk_id) + + return ActiveStores( + ledger = open_active_ledger_store(config, meta_store, resume_chunk_id), + events = open_active_events_store(config, meta_store, resume_chunk_id), + txhash = open_active_txhash_store(config, meta_store, resume_tx_index_id), + ) +``` + +--- + +## Ingestion Loop + +Single background task. 
Pull-based: the service drives sequential `GetLedger(seq)` calls. Same code path drains captive core's internal buffer during catchup and switches cadence to live closes (~5 s per ledger) once caught up. + +```python +def run_live_ingestion_loop(config, ledger_backend, active_stores, meta_store, resume_ledger): + ledger_seq = resume_ledger + while True: + lcm = ledger_backend.GetLedger(ledger_seq) # blocks until available + wait_all( # all three writes durably commit before advancing the checkpoint + run_in_background(write_ledger_store, active_stores.ledger, ledger_seq, lcm), + run_in_background(write_txhash_store, active_stores.txhash, ledger_seq, lcm), + run_in_background(write_events_store, active_stores.events, ledger_seq, lcm), + ) + meta_store.put("streaming:last_committed_ledger", ledger_seq) + + chunk_id = chunk_id_of_ledger(ledger_seq) + if ledger_seq == last_ledger_in_chunk(chunk_id): + on_chunk_boundary(chunk_id, active_stores, meta_store) + + # Every tx-index boundary is also a chunk boundary; index handler runs after chunk handler. + tx_index_id = tx_index_id_of_chunk(chunk_id) + if ledger_seq == last_ledger_in_tx_index(tx_index_id): + on_tx_index_boundary(tx_index_id, active_stores, meta_store) + + ledger_seq += 1 +``` + +Per-store writes are atomic via RocksDB WriteBatch + WAL. + +--- + +## Freeze Transitions + +Three independent background transitions per boundary; each has its own task, flag, and cleanup. Live ingestion never blocks on them. + +- **LFS transition** (per chunk) — retired ledger RocksDB → `.pack` file. +- **Events transition** (per chunk) — retired events RocksDB → cold segment (3 files). +- **RecSplit transition** (per index) — retired txhash RocksDB → 16 `.idx` files. + +Streaming's freezes never produce `.bin` files; those are transient backfill output (Phase 1 only). 
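A minimal sketch of which ledgers trigger which transitions, assuming the geometry these docs describe: genesis at ledger 2, 10_000-ledger chunks, and the default 10_000_000 ledgers per tx index. The helper names match the ones the ingestion loop calls, but their bodies are reconstructed from the ID formulas rather than taken from the implementation, and `transitions_triggered_by` is a hypothetical name used only for illustration.

```python
GENESIS_LEDGER = 2                  # first ledger on the network
LEDGERS_PER_CHUNK = 10_000          # fixed chunk size
LEDGERS_PER_TX_INDEX = 10_000_000   # default value; assumed here
CHUNKS_PER_TX_INDEX = LEDGERS_PER_TX_INDEX // LEDGERS_PER_CHUNK


def chunk_id_of_ledger(ledger_seq: int) -> int:
    # Chunks are counted from the genesis ledger, not from ledger 0.
    return (ledger_seq - GENESIS_LEDGER) // LEDGERS_PER_CHUNK


def last_ledger_in_chunk(chunk_id: int) -> int:
    return (chunk_id + 1) * LEDGERS_PER_CHUNK + GENESIS_LEDGER - 1


def tx_index_id_of_chunk(chunk_id: int) -> int:
    return chunk_id // CHUNKS_PER_TX_INDEX


def last_ledger_in_tx_index(tx_index_id: int) -> int:
    last_chunk_id = (tx_index_id + 1) * CHUNKS_PER_TX_INDEX - 1
    return last_ledger_in_chunk(last_chunk_id)


def transitions_triggered_by(ledger_seq: int) -> list[str]:
    """Freeze transitions that start once ledger_seq is committed (illustrative)."""
    chunk_id = chunk_id_of_ledger(ledger_seq)
    if ledger_seq != last_ledger_in_chunk(chunk_id):
        return []  # mid-chunk ledger: nothing to freeze
    triggered = ["lfs", "events"]  # every chunk boundary
    if ledger_seq == last_ledger_in_tx_index(tx_index_id_of_chunk(chunk_id)):
        triggered.append("recsplit")  # tx-index boundary is also a chunk boundary
    return triggered
```

Under these assumptions, ledger 10_001 closes chunk 0 and starts only the LFS and events transitions, while ledger 10_000_001 closes both chunk 999 and tx index 0 and so starts all three.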
+ +### Concurrency Model + +- **`active_stores` is owned by the ingestion loop.** Fields `ledger` / `events` / `txhash` are mutated only inside the boundary handlers. Freeze tasks receive a handle by value at spawn and never read back through `active_stores`. +- **Meta-store is single-writer.** Serialized across the ingestion loop (per-ledger checkpoint), freeze tasks (flags), and lifecycle loop (prune marker + key deletes). +- **Per-kind single-flight gates.** One outstanding LFS / events / RecSplit transition each; the next starts only after the previous releases. Not a global barrier — kinds remain independent. +- **Query routing.** Per-data-type storage managers own state-transition synchronization; query handlers never touch `active_stores` directly. A query sees either pre-transition or post-transition data, never a mix; never routes to an immutable artifact whose freeze flag is unset. Concrete lock primitives + routing logic are deferred to a separate query-routing doc. + +### Chunk Boundary (every 10_000 ledgers) + +Triggered when the ingestion loop commits `last_ledger_in_chunk(chunk_id)`. Hands off to two freeze transitions (LFS + events) that run in the background. + +```python +def on_chunk_boundary(chunk_id, active_stores, meta_store): + # LFS + events run independently — events doesn't wait on LFS. 
+ wait_for_lfs_complete() + transitioning_ledger_store = active_stores.ledger + active_stores.ledger = open_active_ledger_store(config, meta_store, chunk_id + 1) + run_in_background(freeze_ledger_chunk_to_pack_file, chunk_id, transitioning_ledger_store, meta_store) + + wait_for_events_complete() + transitioning_events_store = active_stores.events + active_stores.events = open_active_events_store(config, meta_store, chunk_id + 1) + run_in_background(freeze_events_chunk_to_cold_segment, chunk_id, transitioning_events_store, meta_store) + + notify_lifecycle() # wake prune loop +``` + +### LFS Transition + +Converts the retired ledger RocksDB store to an immutable `.pack` file, then discards the store. + +```python +def freeze_ledger_chunk_to_pack_file(chunk_id, transitioning_ledger_store, meta_store): + # overwrite=True discards any prior partial. Crash after the freeze flag but + # before delete_dir leaves an orphan; Phase 3 (reconcile) picks it up. + pack_path = ledger_pack_path(chunk_id) + writer = packfile.create(pack_path, overwrite=True) + for ledger_seq in range(first_ledger_in_chunk(chunk_id), last_ledger_in_chunk(chunk_id) + 1): + writer.append(transitioning_ledger_store.get(uint32_big_endian(ledger_seq))) + writer.fsync_and_close() + meta_store.put(f"chunk:{chunk_id:08d}:lfs", "1") + transitioning_ledger_store.close() + delete_dir_if_exists(ledger_store_path(chunk_id)) + meta_store.delete(f"hot:chunk:{chunk_id:08d}:lfs") # cleared AFTER dir removed + signal_lfs_complete() +``` + +### Events Transition + +Converts the retired events RocksDB store to three immutable files (events cold segment). 
+ +```python +def freeze_events_chunk_to_cold_segment(chunk_id, transitioning_events_store, meta_store): + events_path = events_segment_path(chunk_id) + write_cold_segment(transitioning_events_store, events_path) # 3 files: events.pack, index.pack, index.hash + fsync_all(events_path) + meta_store.put(f"chunk:{chunk_id:08d}:events", "1") + transitioning_events_store.close() + delete_dir_if_exists(events_store_path(chunk_id)) + meta_store.delete(f"hot:chunk:{chunk_id:08d}:events") + signal_events_complete() +``` + +### Tx-Index Boundary (every `LEDGERS_PER_TX_INDEX` ledgers) + +The last chunk of a tx index has just rolled over. Before RecSplit can start, every chunk in the tx index must have its `:lfs` and `:events` flags set. + +```python +def on_tx_index_boundary(tx_index_id, active_stores, meta_store): + # Drain all in-flight chunk-level freezes for this tx index before RecSplit. + wait_for_lfs_complete() + wait_for_events_complete() + verify_all_chunk_flags(tx_index_id, meta_store) + transitioning_txhash_store = active_stores.txhash + active_stores.txhash = open_active_txhash_store(config, meta_store, tx_index_id + 1) + run_in_background(build_tx_index_recsplit_files, tx_index_id, transitioning_txhash_store, meta_store) +``` + +### RecSplit Transition + +Builds the 16 RecSplit `.idx` files for tx_index_id from the retired txhash active store. + +```python +def build_tx_index_recsplit_files(tx_index_id, transitioning_txhash_store, meta_store): + # Verify before flag; flag-after-fsync as in LFS / events. 
+ idx_path = recsplit_index_path(tx_index_id) + delete_partial_idx_files(idx_path) + build_recsplit(transitioning_txhash_store, idx_path) # 16 .idx files + fsync_all_idx_files(idx_path) + verify_spot_check(tx_index_id, idx_path, meta_store) + meta_store.put(f"index:{tx_index_id:08d}:txhash", "1") + transitioning_txhash_store.close() + delete_dir_if_exists(txhash_store_path(tx_index_id)) + meta_store.delete(f"hot:index:{tx_index_id:08d}:txhash") +``` + +--- + +## Pruning + +Retention is enforced by a single background task, woken at chunk boundaries. Prune granularity is the whole txhash index — never per chunk. + +```python +def run_prune_lifecycle_loop(config, meta_store): + # Sole caller of prune_tx_index. Initial sweep on entry catches any + # in-progress prunes from a prior-run crash; subsequent sweeps fire on + # chunk-boundary notifications. + retention_ledgers = config.service.retention_ledgers + _run_prune_sweep(meta_store, retention_ledgers, config) + while True: + wait_for_chunk_boundary_notification() # set by on_chunk_boundary's notify_lifecycle() + _run_prune_sweep(meta_store, retention_ledgers, config) + + +def _run_prune_sweep(meta_store, retention_ledgers, config): + for tx_index_id in prunable_tx_index_ids(meta_store, retention_ledgers): + prune_tx_index(tx_index_id, meta_store, config) + + +def prunable_tx_index_ids(meta_store, retention_ledgers): + """ + Returns tx_index_ids that need pruning, from two sources unioned together: + + - Crash-recovery scan — any pruning:index:N key set means a prior prune + was interrupted (marker survives crashes by design); re-run regardless + of retention status. + - Past-retention scan — indexes with index:N:txhash = "1" whose last + ledger is below the retention floor. + """ + if retention_ledgers == 0: + return [] + + result = set() + + # Crash-recovery scan: in-progress prunes from a prior crash. 
+ for key in meta_store.scan_prefix("pruning:index:"): + result.add(parse_pruning_index_id(key)) + + # Past-retention scan: newly-eligible indexes since the last sweep. + last_committed_ledger = meta_store.get("streaming:last_committed_ledger") + max_eligible_tx_index_id = max_prunable_tx_index_id(last_committed_ledger, retention_ledgers) + for tx_index_id in range(0, max_eligible_tx_index_id + 1): + if tx_index_id in result: + continue + if meta_store.get(f"index:{tx_index_id:08d}:txhash") == "1": + result.add(tx_index_id) + + return sorted(result) + + +def max_prunable_tx_index_id(last_committed_ledger, retention_ledgers) -> int: + # Highest tx_index_id whose last_ledger is strictly below the retention floor. + # Returns -1 when no index is past-retention (range(0, 0) below is empty). + if last_committed_ledger is None: + return -1 + floor_ledger = last_committed_ledger - retention_ledgers + if floor_ledger <= GENESIS_LEDGER: + return -1 + # last_ledger_in_tx_index(N) < floor_ledger ⇔ N < tx_index_id_of_ledger(floor_ledger) + return tx_index_id_of_ledger(floor_ledger) - 1 + + +def prune_tx_index(tx_index_id, meta_store, config): + """ + Tear down all artifacts for tx_index_id. Marker set FIRST (atomic 4xx gate + for any tx in N) and cleared LAST (survives crashes so the next sweep + picks N back up). Everything between is individually idempotent. + """ + # OP 1: gate queries off; mark "prune in progress" for crash recovery. + meta_store.put(f"pruning:index:{tx_index_id:08d}", "1") + + # WORK: per-chunk file + key deletion (file-before-flag-delete). 
+ for chunk_id in chunks_for_tx_index(tx_index_id): + delete_if_exists(ledger_pack_path(chunk_id)) + delete_events_segment(chunk_id) + delete_if_exists(raw_txhash_path(chunk_id)) # defence-in-depth; normally already gone via cleanup_txhash + meta_store.delete(f"chunk:{chunk_id:08d}:lfs") + meta_store.delete(f"chunk:{chunk_id:08d}:events") + meta_store.delete(f"chunk:{chunk_id:08d}:txhash") + delete_recsplit_idx_files(tx_index_id) + + # OP 2: queries still 4xx via the marker check. + meta_store.delete(f"index:{tx_index_id:08d}:txhash") + + # OP 3: tx index N is now fully gone. + meta_store.delete(f"pruning:index:{tx_index_id:08d}") +``` + +- **Why index-atomic.** Per-chunk pruning would open a window where `getTransaction` resolves but its `getLedger` 4xxs (pack deleted); whole-index gating closes it. +- **Extra data on disk.** Up to `LEDGERS_PER_TX_INDEX - 1` ledgers past strict retention. `RETENTION_LEDGERS` is always a multiple of `LEDGERS_PER_TX_INDEX`, so the next-eligible index is exactly `LEDGERS_PER_TX_INDEX` further. +- **Separate marker family vs overloading `index:N:txhash` with a `"deleting"` value.** Keeps every meta-store key binary (present-or-absent) so reader code never decodes special values; cost is one extra sub-microsecond write per prune. +- **Marker is steady-state-only.** Phase 3 Pass 1 deletes past-retention state directly (no marker needed — `service_ready = false`, no queries to gate) and clears any below-floor `pruning:index:*` it finds. + +--- + +## Query Contract + +`getLedger` / `getTransaction` / `getEvents` are gated on `service_ready`; during startup phases they return **HTTP 4xx**. + +### Readiness Signal + +- In-memory boolean. Flipped `true` by `set_service_ready()` after Phases 1–3 complete and `compute_resume_ledger` returns, BEFORE Phase 4's captive-core spinup. Frozen-artifact queries don't need captive core; flipping after spinup would add an unnecessary 4–5 minute outage per restart. 
+- Not persisted; every startup begins `false`. Clients see 4xx on every startup until Phase 4 is reached, regardless of prior runs. +- HTTP server binds before Phase 1, so `/getHealth` is always servable. + +### Behavior During Phases 1–3 + +- `/getLedger`, `/getTransaction`, `/getEvents` → HTTP 4xx, no payload. +- `/getHealth` → served. Response shape matches existing stellar-rpc: `status` (`catching_up` during Phases 1–3, `healthy` during Phase 4), `latestLedger` (= `streaming:last_committed_ledger`, or `0` if absent), `oldestLedger`, `ledgerRetentionWindow`. +- No partial / incremental serving while Phases 1–3 run. + +### Behavior When an Index Is Being Pruned + +- QueryRouter checks `pruning:index:N` first; if set, returns 4xx as if the index were past retention. Queries flip to 4xx the instant the marker is set, not when files actually disappear — no window where queries route into a half-deleted index. + +--- + +## Resilience + +Streaming extends backfill's resilience model ([01-backfill-workflow.md — Crash Recovery](./01-backfill-workflow.md#crash-recovery)) with per-ledger checkpoint discipline, per-kind single-flight freeze gates, and the `pruning:index:*` marker family. No separate recovery phase — every startup runs Phases 1–4 and skips already-complete work via meta-store flags. + +### Streaming-Specific Invariants + +1. **No permanently-partial tx index.** Every persisted tx index reaches a terminal state — either *complete* (`index:N:txhash = "1"`, RecSplit built) or *fully discarded* (all chunks + `index:N:txhash` deleted). The intermediate "trailing partial" state (chunks with `:lfs+:events` but no `index:N:txhash`) persists only transiently, and is completed by either (a) a future Phase 1 invocation extending the backfill range past `last_chunk_in_tx_index(N)`, (b) Phase 4 ingestion reaching `last_chunk_in_tx_index(N)` (which fires `on_tx_index_boundary` → `build_tx_index_recsplit_files`), or (c) Phase 3 Pass 1 discarding it as past-retention. +2. 
**No permanent orphans.** Every meta-store flag has a corresponding artifact (or is mid-cleanup, recoverable via file-before-flag-delete). Every active-store dir has a `hot:*` key. Every immutable file has a freeze flag. +3. **Pruning intent marker.** `pruning:index:{N}` is set BEFORE any file delete and cleared AFTER everything else; the lifecycle loop's `prunable_tx_index_ids` picks up surviving markers via `scan_prefix("pruning:index:")` and re-runs the prune idempotently. See [Pruning](#pruning). diff --git a/full-history/design-docs/03-backfill-workflow.md b/full-history/design-docs/03-backfill-workflow.md deleted file mode 100644 index dbb7aa05f..000000000 --- a/full-history/design-docs/03-backfill-workflow.md +++ /dev/null @@ -1,698 +0,0 @@ -# Backfill Workflow - -## Overview - -Backfill populates the immutable stores for a configured ledger range `[start_ledger, end_ledger]`. - -**What it does:** -- Ingests historical ledgers offline — no live queries served (only `getHealth` / `getStatus`). `getHealth` is the existing lightweight liveness check; `getStatus` is the new backfill-specific progress endpoint (see [getStatus API Response](#getstatus-api-response) below). -- Writes directly to immutable file formats — no RocksDB active stores -- Schedules work as a DAG of idempotent tasks, dispatched via a flat worker pool (default GOMAXPROCS slots) -- Exits when done; on failure, re-run the same command — completed work is never repeated - -**What it produces:** - -| Query it enables | Immutable output | Scope | -|-----------------|-----------------|-------| -| `getLedger` | Ledger [pack file](https://github.com/stellar/stellar-rpc/pull/633) | Per chunk (10K ledgers) | -| `getTransaction` | Txhash index files | Per txhash index (default 10M ledgers) | -| `getEvents` | [Events cold segment](https://github.com/stellar/stellar-rpc/pull/635) | Per chunk | - ---- - -## Geometry - -The Stellar blockchain starts at ledger 2. 
Backfill organizes data using two concepts: - -- **Chunk** — 10_000 ledgers (hardcoded, not configurable) - - Atomic unit of ingestion and crash recovery - - Produces: one ledger `.pack` file, one raw txhash `.bin` file, one events cold segment (`events.pack`, `index.pack`, `index.hash`) - - `chunk_id = (ledger_seq - 2) / 10_000` -- **Txhash Index** — `CHUNKS_PER_TXHASH_INDEX` chunks (default 1000 = 10M ledgers) - - One RecSplit index covers all transactions across `CHUNKS_PER_TXHASH_INDEX` chunks (default: 10M ledgers worth of transactions) - - Produces 16 CF (column family) `.idx` files per txhash index - - `index_id = chunk_id / CHUNKS_PER_TXHASH_INDEX` - - Configurable via TOML, but must not change across runs — once set, it is fixed - -### ID Formulas - -``` -chunk_id = (ledger_seq - 2) / 10_000 -index_id = chunk_id / CHUNKS_PER_TXHASH_INDEX -``` - -Example with `CHUNKS_PER_TXHASH_INDEX = 1000` (default): - -| Txhash Index ID | First Ledger | Last Ledger | Chunks | -|-----------------|-------------|------------|--------| -| 0 | 2 | 10_000_001 | 0–999 | -| 1 | 10_000_002 | 20_000_001 | 1000–1999 | -| 2 | 20_000_002 | 30_000_001 | 2000–2999 | -| N | (N × 10M) + 2 | ((N+1) × 10M) + 1 | N×1000 – (N+1)×1000 - 1 | - -All IDs use uniform `%08d` zero-padding (supports up to 99_999_999). - ---- - -## Configuration - -TOML file, passed via `stellar-rpc full-history-backfill --config path/to/config.toml`. - -- **TOML** defines data layout and storage paths — must be stable across runs -- **CLI flags** define per-run parameters (range, workers, retries) - -### TOML Config - -**[SERVICE]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `DEFAULT_DATA_DIR` | string | **required** | Base directory for meta store and default storage paths. | - -**[BACKFILL]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `CHUNKS_PER_TXHASH_INDEX` | int | `1000` | Chunks per txhash index. 
Defines data layout — must be stable across runs. | - -**[IMMUTABLE_STORAGE.LEDGERS]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `PATH` | string | `{DEFAULT_DATA_DIR}/ledgers` | Base path for ledger pack files. | - -**[IMMUTABLE_STORAGE.EVENTS]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `PATH` | string | `{DEFAULT_DATA_DIR}/events` | Base path for events cold segments. | - -**[IMMUTABLE_STORAGE.TXHASH_RAW]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `PATH` | string | `{DEFAULT_DATA_DIR}/txhash/raw` | Base path for raw txhash `.bin` files (transient). | - -**[IMMUTABLE_STORAGE.TXHASH_INDEX]** - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `PATH` | string | `{DEFAULT_DATA_DIR}/txhash/index` | Base path for RecSplit index files (permanent). | - -The `IMMUTABLE_STORAGE` prefix disambiguates from `ACTIVE_STORAGE` (RocksDB-backed mutable stores used by the streaming workflow). - -**[BACKFILL.BSB]** — BSB / Buffered Storage Backend (required) - -| Key | Type | Default | Description | -|-----|------|---------|-------------------------------------------------------------------------------------| -| `BUCKET_PATH` | string | **required** | Remote object store path to fetch LedgerCloseMeta (without `gs://` prefix for GCS). | -| `BUFFER_SIZE` | int | `1000` | Prefetch buffer depth per connection. | -| `NUM_WORKERS` | int | `20` | Download workers per connection. | - -**[LOGGING]** - -Both keys are optional. When a key is set in both TOML and on the CLI, the CLI flag wins — specifying both is not an error. - -| Key | Type | Default | Description | -|-----|------|---------|-------------| -| `LEVEL` | string | `"info"` | Minimum log severity. Accepted values: `debug` / `info` / `warn` / `error`. | -| `FORMAT` | string | `"text"` | Log output format. Accepted values: `text` / `json`. 
| - -### CLI Flags - -| Flag | Type | Default | Description | -|------|------|---------|-------------| -| `--start-ledger` | uint32 | **required** | First ledger (inclusive). Must be ≥ 2. | -| `--end-ledger` | uint32 | **required** | Last ledger (inclusive). Must be > `start_ledger`. | -| `--workers` | int | `GOMAXPROCS` | Total concurrent DAG task slots. | -| `--verify-recsplit` | bool | `true` | Run RecSplit verify phase after build. | -| `--max-retries` | int | `3` | Max retries per task before marking it failed. | -| `--log-level` | string | — | Overrides `[LOGGING].LEVEL` when set. | -| `--log-format` | string | — | Overrides `[LOGGING].FORMAT` when set. | - -### Optional TOML Sections - -| Section | Key | Default | Description | -|---------|-----|---------|-------------| -| `[META_STORE]` | `PATH` | `{DEFAULT_DATA_DIR}/meta/rocksdb` | Meta store RocksDB directory | - -### Validation Rules - -The only hard constraints are: - -- `start_ledger >= 2` -- `end_ledger > start_ledger` -- `[BACKFILL.BSB]` must be present -- `CHUNKS_PER_TXHASH_INDEX` must not change after the first run — changing it invalidates existing txhash index boundaries -- Backfill never prunes existing data — narrowing the range between runs is safe (completed work outside the new range is simply left untouched) -- No txhash-index-alignment required — the operator can pass any arbitrary ledger range -- If gaps remain after backfill, streaming mode validates completeness for all chunks and all txhash indexes at startup, reports any gaps to the operator, and aborts - -#### Chunk Boundary Expansion - -- System expands the requested range **outward** to the nearest chunk boundaries -- Start expands DOWN to the first ledger of its chunk -- End expands UP to the last ledger of its chunk -- Never clamps inward — the effective range is always ≥ the requested range -- Operator doesn't need to manually calculate chunk-aligned values - -``` -Operator requests: --start-ledger 5_000_000 --end-ledger 
56_337_842 -Chunk boundary expand: start=5_000_000 falls within chunk 499 (starts at 4_990_002) - → expand start to 4_990_002 - end=56_337_842 falls within chunk 5633 (ends at 56_340_001) - → expand end to 56_340_001 -Effective range: ledgers 4_990_002–56_340_001 = 5_135 chunks -``` - -#### BSB Availability Validation - -After expansion, the system validates that the remote object store referenced by BSB contains all ledgers in the expanded range: - -- Expanded end exceeds BSB availability → error at startup (no silent truncation) -- Operator must either reduce `--end-ledger` or wait for more ledgers to become available in BSB - -#### Partial Txhash Index Ranges - -If the expanded range does not complete a full txhash index: - -- Chunks are still backfilled and immediately serve `getLedger`/`getEvents` when the service is started in streaming mode -- Txhash index creation only happens once **all** input chunks for the txhash index are ready -- If txhash index creation does not happen in the current backfill run, the remaining chunks are completed either by a subsequent backfill run (should the operator run backfill again) or when streaming mode starts for the first time (see [Implications for Streaming Workflow](#implications-for-streaming-workflow) below) - -Ledger and events data are useful per-chunk and should not be blocked by txhash index alignment. Without relaxed validation: - -- A node at ledger 56_340_000 cannot backfill the latest ~6.3M ledgers because `50_000_002–56_340_001` doesn't align to a 10M txhash index boundary — the operator would have to wait until ledger 60_000_001 -- Incremental backfill (extending coverage from a completed txhash index to recent history) would be blocked unless the chain happens to sit on a txhash index boundary - -#### Implications for Streaming Workflow - -When backfill completes at a non-txhash-index-aligned boundary, a partially-filled txhash index remains. 
The streaming workflow completes the remaining chunks: - -- Streaming continues chunk ingestion from where backfill left off, writing the same per-chunk outputs (LFS, txhash, events) using the same flag-based idempotency -- When streaming completes the last chunk needed for a pending txhash index, txhash index creation becomes eligible and runs -- The meta store is the shared coordination point — streaming checks the same chunk flags as backfill, so there is no gap or overlap between backfill and streaming coverage - -See [PR #617 discussion](https://github.com/stellar/stellar-rpc/pull/617#discussion_r2969796337) for the original rationale. - -### Example: GCS Backfill Config - -```toml -[SERVICE] -DEFAULT_DATA_DIR = "/data/stellar-rpc" - -[BACKFILL] -CHUNKS_PER_TXHASH_INDEX = 1000 - -[IMMUTABLE_STORAGE.LEDGERS] -PATH = "/mnt/nvme/ledgers" - -[IMMUTABLE_STORAGE.EVENTS] -PATH = "/mnt/nvme/events" - -[IMMUTABLE_STORAGE.TXHASH_RAW] -PATH = "/mnt/nvme/txhash/raw" - -[IMMUTABLE_STORAGE.TXHASH_INDEX] -PATH = "/mnt/nvme/txhash/index" - -[BACKFILL.BSB] -BUCKET_PATH = "sdf-ledger-close-meta/v1/ledgers/pubnet" - -[LOGGING] -LEVEL = "info" -FORMAT = "text" -``` - -```bash -stellar-rpc full-history-backfill --config config.toml \ - --start-ledger 2 \ - --end-ledger 30_000_001 \ - --workers 40 -``` - ---- - -## Directory Structure - -With geometry (chunk, txhash index) and storage paths (`IMMUTABLE_STORAGE.*`) defined above, here is how they map to the filesystem. 
- -- Each data type has its own directory tree rooted at its `IMMUTABLE_STORAGE.*.PATH` -- Chunk-level files (ledgers, events, raw txhash) are grouped into subdirectories (bucket) of 1_000 chunks: - - `bucket_id = chunk_id / 1000` (hardcoded, not configurable), formatted as `%05d` - - `bucket_id` is purely a filesystem concern — it does not appear in meta store keys, DAG dependencies, or config -- Txhash index output is the only structure that uses `index_id` instead of `bucket_id` -- Directories are created on-demand via `os.MkdirAll` (safe for concurrent writes) - -``` -{DEFAULT_DATA_DIR}/ -├── meta/ -│ └── rocksdb/ ← Meta store (WAL always enabled) -│ -├── ledgers/ ← IMMUTABLE_STORAGE.LEDGERS.PATH -│ ├── 00000/ ← chunks 0–999 (1_000 .pack files) -│ │ ├── 00000000.pack ← ledger pack file (PR #633) -│ │ ├── 00000001.pack -│ │ └── ... -│ ├── 00001/ ← chunks 1000–1999 -│ │ └── ... -│ └── .../ -│ -├── events/ ← IMMUTABLE_STORAGE.EVENTS.PATH -│ ├── 00000/ ← chunks 0–999 (3_000 files: 3 per chunk) -│ │ ├── 00000000-events.pack ← compressed event blocks -│ │ ├── 00000000-index.pack ← serialized roaring bitmaps -│ │ ├── 00000000-index.hash ← MPHF for term → slot lookup -│ │ └── ... -│ └── .../ -│ -└── txhash/ - ├── raw/ ← IMMUTABLE_STORAGE.TXHASH_RAW.PATH - │ ├── 00000/ ← chunks 0–999 (1_000 .bin files) - │ │ ├── 00000000.bin ← TRANSIENT (deleted after RecSplit) - │ │ └── ... - │ └── .../ - └── index/ ← IMMUTABLE_STORAGE.TXHASH_INDEX.PATH - ├── 00000000/ ← txhash index 0 (16 RecSplit CF files) - │ └── cf-{0-f}.idx ← PERMANENT - └── .../ -``` - -`CHUNKS_PER_TXHASH_INDEX` only affects `txhash/index/` — all other trees use the hardcoded 1_000-chunk `bucket_id` grouping regardless. - -The directory tree above reflects the default `CHUNKS_PER_TXHASH_INDEX = 1000`. 
Using 20M ledgers (2_000 chunks) as an example: - -| `CHUNKS_PER_TXHASH_INDEX` | Txhash index dirs | Tradeoff | -|---------------------------|-------------------|----------| -| `1000` (default) | 2_000 / 1000 = 2 | Fewer dirs, larger indexes — longer build time per index, fewer files to search at query time | -| `100` | 2_000 / 100 = 20 | More dirs, smaller indexes — faster build time per index, more files to search at query time | -| `1` | 2_000 / 1 = 2_000 | One index per chunk — fastest build, most files to search | - -### Path Conventions - -| File Type | Pattern | Example | -|-----------|---------|---------| -| Ledger pack | `{IMMUTABLE_STORAGE.LEDGERS.PATH}/{bucketID:05d}/{chunkID:08d}.pack` | `ledgers/00000/00000042.pack` | -| Raw txhash | `{IMMUTABLE_STORAGE.TXHASH_RAW.PATH}/{bucketID:05d}/{chunkID:08d}.bin` | `txhash/raw/00000/00000042.bin` | -| RecSplit CF | `{IMMUTABLE_STORAGE.TXHASH_INDEX.PATH}/{indexID:08d}/cf-{nibble}.idx` | `txhash/index/00000000/cf-a.idx` | -| Events data | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucketID:05d}/{chunkID:08d}-events.pack` | `events/00000/00000042-events.pack` | -| Events index | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucketID:05d}/{chunkID:08d}-index.pack` | `events/00000/00000042-index.pack` | -| Events hash | `{IMMUTABLE_STORAGE.EVENTS.PATH}/{bucketID:05d}/{chunkID:08d}-index.hash` | `events/00000/00000042-index.hash` | - -- **Nibble** = high 4 bits of `txhash[0]`, i.e., `txhash[0] >> 4`. Values `0`–`f`. Determines which of 16 CFs a txhash is routed to. -- **Raw txhash format**: 36 bytes per entry, no header: `[txhash: 32 bytes][ledgerSeq: 4 bytes big-endian]` -- **Events cold segment**: See [getEvents full-history design](https://github.com/stellar/stellar-rpc/pull/635) for the full format specification. 
- ---- - -## Meta Store Keys - -- Single RocksDB instance with WAL (Write-Ahead Log) always enabled -- Authoritative source for crash recovery — all resume decisions derive from key presence in this store - -### Key Schema - -All IDs use uniform `%08d` zero-padding, matching the directory structure. - -| Key Pattern | Value | Written When | -|-------------|-------|-------------| -| `chunk:{C:08d}:lfs` | `"1"` | After ledger `.pack` file is fsynced | -| `chunk:{C:08d}:txhash` | `"1"` | After raw txhash `.bin` file is fsynced | -| `chunk:{C:08d}:events` | `"1"` | After events cold segment files (`events.pack`, `index.pack`, `index.hash`) are fsynced | -| `index:{N:08d}:txhash` | `"1"` | After all 16 RecSplit CF `.idx` files are built and fsynced | - -- Values are `"1"` (retained for `ldb`/`sst_dump` readability); key presence is the signal -- Key absence means not started or incomplete — treated identically on resume -- Each chunk flag is written independently after its output's fsync — a crash may leave some flags set and others absent for the same chunk -- On resume, each chunk's flags are checked independently — only missing outputs are produced -- WAL is always enabled — disabling it would invalidate all crash recovery -- `chunk:{C}:txhash` keys are deleted after the txhash index is built (the raw `.bin` files they reference are also deleted); all other flags are permanent - -**Examples:** -``` -chunk:00000000:lfs → "1" chunk 0 ledger pack done -chunk:00000000:txhash → "1" chunk 0 raw txhash done -chunk:00000000:events → "1" chunk 0 events cold segment done -chunk:00000999:events → "1" last chunk of txhash index 0 -index:00000000:txhash → "1" txhash index 0 RecSplit complete -index:00000001:txhash → absent txhash index 1 not yet built -``` - -### Key Lifecycle - -``` -chunk ingestion → sets chunk:{C}:lfs, chunk:{C}:txhash, chunk:{C}:events - (each independently, after its output's fsync) -txhash index build → sets index:{N}:txhash -txhash cleanup → deletes 
chunk:{C}:txhash keys + raw .bin files
-```
-
-After a completed txhash index:
-- `chunk:{C}:lfs`, `chunk:{C}:events`, `index:{N}:txhash` — permanent
-- `chunk:{C}:txhash` keys + raw `.bin` files — deleted after txhash index is built
-
----
-
-## Tasks and Dependencies
-
-The backfill DAG has three task types:
-
-| Task | Cadence | Dependencies | Produces |
-|------|---------|-------------|----------|
-| `process_chunk(chunk_id)` | Per chunk (10K ledgers) | None | Ledger `.pack` + raw txhash `.bin` + events cold segment |
-| `build_txhash_index(index_id)` | Per txhash index | All `process_chunk` tasks for this txhash index | 16 RecSplit `.idx` files |
-| `cleanup_txhash(index_id)` | Per txhash index | `build_txhash_index` for this txhash index | Deletes raw `.bin` files + `chunk:{C}:txhash` meta keys |
-
-- Each task is a black box to the DAG scheduler — it calls `Execute()` and waits for return
-- What happens inside (goroutines, I/O, parallelism) is up to the task
-
-### Dependency Diagram
-
-For a single txhash index with N chunks:
-
-```
-process_chunk(chunk 0)   ─┐
-process_chunk(chunk 1)   ─┤
-process_chunk(chunk 2)   ─┼──→ build_txhash_index(index_id) ──→ cleanup_txhash(index_id)
-...                       │
-process_chunk(chunk N-1) ─┘
-```
-
-- All `process_chunk` tasks for a txhash index must complete before `build_txhash_index` fires
-- `cleanup_txhash` runs after `build_txhash_index` succeeds
-- Cleanup deletes the raw `.bin` files and their `chunk:{C}:txhash` meta keys
-
-### Main Flow
-
-```python
-def run_backfill(config, flags):
-
-    # 1. Validate — abort before any work if config is incompatible with existing state
-    validate(config, flags)
-
-    # 2. Build DAG — register all tasks; each task's execute() handles its own no-op check
-    dag = build_dag(config, flags)
-
-    # 3. 
Execute — dispatch all tasks concurrently, bounded by worker count
-    dag.execute(max_workers=flags.workers)  # default GOMAXPROCS
-```
-
-### Validation
-
-Validation runs before DAG construction, not as a DAG task. If it were a DAG task, other tasks with no dependencies would start executing concurrently before validation completes — and if validation fails, in-flight work that should never have started would need to be cancelled. Running it first means a clean abort with no partial work.
-
-```python
-def validate(config, flags):
-    # See Validation Rules for the full list of checks.
-    assert flags.start_ledger >= 2
-    assert flags.end_ledger > flags.start_ledger
-    assert config.backfill.bsb is not None
-    assert CHUNKS_PER_TXHASH_INDEX unchanged from prior runs (if meta store is non-empty)
-```
-
-### DAG Setup
-
-```python
-def build_dag(config, flags):
-    # Wires up tasks and dependency edges — no completion checks or skip logic.
-    # Each task's execute() handles its own no-op check (early return if already complete). 
-
-    dag = DAG()
-
-    for index_id in configured_indexes(config, flags):
-        chunk_tasks = []
-        for chunk_id in chunks_for_index(index_id):
-            t = dag.add(ProcessChunkTask(chunk_id), deps=[])
-            chunk_tasks.append(t.id)
-        b = dag.add(BuildTxHashIndexTask(index_id),
-                    deps=chunk_tasks)
-        dag.add(CleanupTxHashTask(index_id), deps=[b.id])
-
-    return dag
-```
-
----
-
-## Task Details
-
-### process_chunk(chunk_id)
-
-- Processes a single 10K-ledger chunk end-to-end
-- Occupies one DAG worker slot
-- Only produces missing outputs — checks each flag independently
-- Internal concurrency is an implementation detail
-
-**Outputs** (all produced in a single task, only if missing):
-- Ledger pack file (`{chunkID:08d}.pack`) — compressed ledger data in [packfile format](https://github.com/stellar/stellar-rpc/pull/633)
-- Raw txhash flat file (`{chunkID:08d}.bin`) — 36-byte entries consumed by RecSplit builder
-- Events cold segment (`events.pack` + `index.pack` + `index.hash`) — per [getEvents design](https://github.com/stellar/stellar-rpc/pull/635)
-
-**Pseudocode:**
-
-```python
-def process_chunk(chunk_id):
-    bucket_id = chunk_id // 1000  # integer division; hardcoded subdirectory grouping (see Directory Structure)
-    first_ledger = chunk_first_ledger(chunk_id)
-    last_ledger = chunk_last_ledger(chunk_id)
-
-    # 1. Check which outputs are missing
-    need_lfs = not meta_store.has(f"chunk:{chunk_id:08d}:lfs")
-    need_txhash = not meta_store.has(f"chunk:{chunk_id:08d}:txhash")
-    need_events = not meta_store.has(f"chunk:{chunk_id:08d}:events")
-
-    if not (need_lfs or need_txhash or need_events):
-        return  # all outputs already present
-
-    # 2. Choose data source
-    if not need_lfs:
-        source = local_packfile(ledger_pack_path(bucket_id, chunk_id))  # NVMe, no BSB
-    else:
-        source = BSBFactory.create(first_ledger, last_ledger)  # BSB connection
-
-    # 3. 
Open writers only for missing outputs
-    ledger_writer = packfile.create(ledger_pack_path(bucket_id, chunk_id),
-                                    overwrite=True) if need_lfs else None
-    txhash_writer = open(raw_txhash_path(bucket_id, chunk_id),
-                         overwrite=True) if need_txhash else None
-    events_writer = events_segment.create(events_path(bucket_id, chunk_id),
-                                          overwrite=True) if need_events else None
-
-    # 4. Process each ledger
-    for seq in range(first_ledger, last_ledger + 1):
-        lcm = source.get_ledger(seq)
-
-        if need_lfs: ledger_writer.append(compress(lcm))
-        if need_txhash: txhash_writer.append(extract_txhashes(lcm))  # 36 bytes per tx
-        if need_events: events_writer.append(extract_events(lcm))
-
-    # 5. Fsync + flag each output independently
-    if need_lfs:
-        ledger_writer.fsync_and_close()
-        meta_store.put(f"chunk:{chunk_id:08d}:lfs", "1")
-
-    if need_txhash:
-        txhash_writer.fsync_and_close()
-        meta_store.put(f"chunk:{chunk_id:08d}:txhash", "1")
-
-    if need_events:
-        events_writer.finalize()  # flush, build MPHF + bitmap index, fsync
-        meta_store.put(f"chunk:{chunk_id:08d}:events", "1")
-
-    source.close()
-```
-
-Key properties:
-- Only missing outputs are produced — a partially-completed chunk resumes from where it left off
-- If LFS is already present, reads from local NVMe instead of BSB (avoids redundant download)
-- Each flag is written independently after its output's fsync — no atomic WriteBatch needed
-- `packfile.Create()` with `overwrite=True` handles truncation of partial files from prior crashes — no explicit `delete_if_exists` check needed
-- Naturally extends to new data types (add a fourth flag)
-
-**BSB** (BufferedStorageBackend):
-- Ledger source backed by a remote object store
-- Each `process_chunk` task creates its own BSB connection
-- Internal prefetch workers: `BUFFER_SIZE` ledgers ahead, `NUM_WORKERS` download goroutines
-
-### build_txhash_index(index_id)
-
-- Builds the RecSplit txhash index for one completed txhash index
-- Occupies one DAG worker slot, but spawns 
several goroutines internally
-- The DAG guarantees all chunk `.bin` files exist before this runs
-
-**Pseudocode:**
-
-```python
-def build_txhash_index(index_id):
-    if meta_store.has(f"index:{index_id:08d}:txhash"):
-        return  # already built — no-op
-
-    # Flag absent: discard partial .idx files from a crashed prior build (all-or-nothing recovery)
-    delete_partial_idx_files(index_id)
-
-    bin_files = list_bin_files(index_id)  # all .bin files for chunks in this txhash index
-
-    # Phase 1: COUNT — scan all .bin files, count entries per CF
-    cf_counts = parallel_count(bin_files, workers=100)
-    # cf_counts[nibble] = number of (txhash, ledgerSeq) entries routed to that CF
-
-    # Phase 2: ADD — re-read .bin files, route entries to CF builders
-    cf_builders = [RecSplitBuilder(cf_counts[n]) for n in range(16)]
-    parallel_add(bin_files, cf_builders, workers=100)
-    # each entry routed to cf_builders[txhash[0] >> 4] (mutex per CF)
-
-    # Phase 3: BUILD — build MPH index per CF, one .idx file each
-    parallel_build(cf_builders, workers=16)
-    # each CF produces one .idx file; all fsynced
-
-    # Phase 4: VERIFY (optional) — look up every key in the built indexes
-    if verify_recsplit:
-        parallel_verify(bin_files, cf_builders, workers=100)
-
-    # Mark index complete
-    meta_store.put(f"index:{index_id:08d}:txhash", "1")
-```
-
-Key properties:
-- COUNT and ADD each read all `.bin` files (two full passes over the data)
-- BUILD runs 16 goroutines in parallel (one per CF) — each CF is independent
-- VERIFY is skippable via the `--verify-recsplit=false` CLI flag
-- All-or-nothing recovery: if `index:{N}:txhash` is absent on restart → delete partial `.idx` files → rerun entire build
-
-### cleanup_txhash(index_id)
-
-- Runs after `build_txhash_index` completes successfully
-
-**Pseudocode:**
-
-```python
-def cleanup_txhash(index_id):
-    for chunk_id in chunks_for_index(index_id):
-        if not meta_store.has(f"chunk:{chunk_id:08d}:txhash"):
-            continue  # already cleaned up — skip
-        bucket_id = chunk_id // 1000  # same subdirectory grouping as process_chunk
-        delete(raw_txhash_path(bucket_id, chunk_id))  # remove .bin file
-        meta_store.delete(f"chunk:{chunk_id:08d}:txhash")  # remove meta key
-```
-
-Key properties:
-- 
Modeled as a separate DAG task (not inline in `build_txhash_index`) so crash recovery works naturally
-- Per-chunk idempotency: each chunk checks its own `chunk:{C}:txhash` key before deleting — a crash mid-cleanup resumes from where cleanup left off
-- On restart: DAG sees txhash index key present (build complete) but `chunk:{C}:txhash` keys still exist → cleanup runs as a normal task
-
----
-
-## Execution Model
-
-### DAG Scheduler
-
-- Pipeline builds a single DAG at startup, executes it with bounded concurrency
-- The DAG is the only scheduling mechanism — no per-txhash-index coordinators, no secondary worker pools
-- Each task's `Execute()` is wrapped with a retry loop bounded by `--max-retries` (default 3). Any transient failure (BSB errors, temporary I/O issues) triggers a retry at the task level.
-
-```python
-def run_dag(dag, max_workers):
-    worker_slots = Semaphore(max_workers)
-    runnable_tasks = ThreadSafeQueue(dag.tasks_with_no_pending_dependencies())
-
-    def execute_task(task):
-        """Runs in a background thread — one per dispatched task."""
-        for attempt in range(1, max_retries + 1):
-            error = task.execute()
-            if error is None:
-                break
-            if attempt == max_retries:
-                mark_failed(task, error)  # halt all dependents
-                worker_slots.release()    # free worker slot
-                return                    # a failed task must never unblock downstream tasks
-            log.warn("retry", task, attempt, error)
-
-        worker_slots.release()  # free worker slot
-
-        # Check if completing this task unblocks any downstream tasks
-        for downstream in dag.dependents_of(task):
-            downstream.mark_dependency_done(task)
-            if downstream.all_dependencies_done():
-                runnable_tasks.push(downstream)  # now eligible to run
-
-    # Main loop — dispatches tasks as they become runnable
-    while runnable_tasks:
-        current_task = runnable_tasks.pop()
-        worker_slots.acquire()  # block until a worker slot is free
-        run_in_background(execute_task, current_task)  # launch — returns immediately
-```
-
-### Worker Pool
-
-- Single flat pool of `workers` slots (default `GOMAXPROCS`)
-- Any mix of task types can occupy slots simultaneously
-- 
`process_chunk`: 1 slot per task
-- `build_txhash_index`: 1 slot per task (uses many goroutines internally)
-- `cleanup_txhash`: 1 slot per task
-
-### How Work Flows Through the Pipeline
-
-- All `process_chunk` tasks have no dependencies → DAG dispatches up to `workers` slots immediately at startup
-- Chunks from different txhash indexes run side by side — the scheduler does not process txhash indexes sequentially
-- When the last chunk of a txhash index completes → `build_txhash_index` becomes eligible, claims a slot
-- After build completes → `cleanup_txhash` becomes eligible
-- Remaining slots continue processing chunks for other txhash indexes throughout — no special coordination needed
-
----
-
-## Crash Recovery
-
-There is no separate crash recovery, reconciliation, or startup triage phase. Recovery happens organically because every task's `execute()` checks its own completion state:
-
-- On every startup, `build_dag()` registers ALL tasks for the configured range — no meta store scanning in DAG setup
-- `process_chunk` checks each output flag independently — missing outputs are produced, existing outputs are skipped
-- `build_txhash_index` checks `index:{N}:txhash` — if present, returns immediately; if absent, deletes partial `.idx` files and reruns the full build
-- `cleanup_txhash` checks `chunk:{C}:txhash` per-chunk — already-cleaned chunks are skipped, remaining chunks are cleaned up
-
-This works because of three invariants:
-
-1. **Key implies durable file** — a meta store flag is set only after fsync
-2. **Tasks are idempotent** — each checks its own outputs and skips or overwrites what exists
-3. 
**DAG registers all tasks on every startup** — completed tasks return immediately from `execute()`
-
-### Concurrent Access Prevention
-
-- Meta store RocksDB uses kernel-level `flock()` on a `LOCK` file
-- A second process attempting to open the same meta store fails immediately
-- Released automatically on process exit (including `kill -9`)
-
-
----
-
-## getStatus API Response
-
-During backfill, `getStatus` returns progress as task-type summaries:
-- No per-txhash-index breakdown — just completed/pending/in_progress counts per task type
-
-```json
-{
-  "mode": "BACKFILL",
-  "tasks": {
-    "process_chunk": {"completed": 288, "pending": 5712, "in_progress": 40},
-    "build_txhash_index": {"completed": 0, "pending": 6, "in_progress": 0},
-    "cleanup_txhash": {"completed": 0, "pending": 6, "in_progress": 0}
-  },
-  "eta_seconds": 1820
-}
-```
-
----
-
-## Error Handling
-
-Two layers of retry:
-
-- **BSB retries** — BSB handles transient errors internally (connection resets, throttling, etc). These retries happen within a single task execution and are not visible to the DAG scheduler.
-- **Task-level retries** — the DAG scheduler wraps each task's `execute()` with a retry loop bounded by `--max-retries` (default 3). If a task returns an error after BSB has exhausted its own retries, the scheduler retries the entire task. After `--max-retries` exhausted → task marked failed → DAG halts all dependent tasks → process exits non-zero.
-
-Operator re-runs the same command; completed work is never repeated.
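The task-level retry layer described above can be sketched as a small wrapper. This is a minimal illustration under stated assumptions, not the scheduler's actual code: `run_with_retries`, `TaskFailed`, and `AbortImmediately` are hypothetical names, tasks are modeled as callables that raise on error, and `max_retries` stands in for `--max-retries`. Retrying a whole task is safe here only because each task skips outputs whose flags are already set.

```python
class TaskFailed(Exception):
    """All attempts were used up (hypothetical exception type)."""


class AbortImmediately(Exception):
    """Non-retryable failure, e.g. a verify mismatch (hypothetical exception type)."""


def run_with_retries(task, max_retries=3, log=print):
    """Call task() up to max_retries times and return its result.

    Errors raised as AbortImmediately bypass the retry loop entirely;
    any other exception counts as one failed attempt.
    """
    last_error = None
    for attempt in range(1, max_retries + 1):
        try:
            return task()
        except AbortImmediately:
            raise  # never retried; the caller should stop the run
        except Exception as err:
            last_error = err
            if attempt < max_retries:
                log(f"retry: attempt {attempt} of {max_retries} failed: {err}")
    # Every attempt failed: surface one error so dependents can be halted.
    raise TaskFailed(f"gave up after {max_retries} attempts") from last_error
```

A scheduler built on this wrapper would treat `TaskFailed` as the signal to mark the task failed and stop dispatching its dependents, and `AbortImmediately` as the signal to exit without further attempts.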
-
-| Error | Handled by | Action |
-|-------|-----------|--------|
-| BSB transient error (throttle, connection reset) | BSB internal retry | Retried within the task; transparent to DAG |
-| BSB persistent error (BSB retries exhausted) | Task-level retry | `--max-retries` attempts; then ABORT |
-| Ledger pack write / fsync failure | Task-level retry | `--max-retries` attempts; then ABORT; flag not set |
-| TxHash write / fsync failure | Task-level retry | `--max-retries` attempts; then ABORT; flag not set |
-| Events write / fsync failure | Task-level retry | `--max-retries` attempts; then ABORT; flag not set |
-| RecSplit build failure | Task-level retry | `--max-retries` attempts; then ABORT; txhash index key absent |
-| Verify phase mismatch | None | ABORT immediately — data corruption, operator investigates |
-| Meta store write failure | None | ABORT immediately — treat as crash, operator re-runs |
diff --git a/full-history/design-docs/README.md b/full-history/design-docs/README.md
index e682068d6..8b3816547 100644
--- a/full-history/design-docs/README.md
+++ b/full-history/design-docs/README.md
@@ -1,26 +1,18 @@
 # Stellar Full History RPC Service — Design Docs
-> **Scope**: Backfill pipeline only. Streaming pipeline design is covered separately.
-
 ## Documents
-| Document | Description |
-|----------|-------------|
-| [03-backfill-workflow.md](./03-backfill-workflow.md) | Complete backfill design — geometry, meta store keys, directory layout, configuration, DAG task graph, execution model, crash recovery, getStatus API |
-
-The backfill doc is self-contained. Read it top-to-bottom for the full picture.
-
-## Quick Context
-
-The Stellar Full History RPC Service ingests the complete blockchain history. 
Primary use cases:
+| Doc | Scope |
+|-----|-------|
+| [01-backfill-workflow.md](./01-backfill-workflow.md) | Backfill subroutine internals — DAG, per-chunk tasks, shared TOML config, meta-store key schema, crash recovery |
+| [02-streaming-workflow.md](./02-streaming-workflow.md) | Unified service end-to-end — startup phases, live ingestion, freeze transitions, pruning, query contract, resilience (crash recovery + concurrent-access guards + error handling) |
-
-- Retrieve any ledger from history
-- Retrieve any transaction from history
-- Retrieve any events with filter matching from history
+## Reading Order
-
-It has two modes:
+- Read **01 Backfill** first. It defines shared concepts used by both docs: geometry, meta-store key schema, shared TOML config, flag-after-fsync.
+- Read **02 Streaming** second. It builds on 01's vocabulary and describes how the service invokes backfill as its Phase 1 (catchup) subroutine.
-
-- **Backfill** — offline bulk import. Writes directly to immutable files (LFS chunks + RecSplit indexes). No RocksDB, no queries during ingestion. DAG-scheduled with a flat worker pool.
-- **Streaming** — real-time ingestion via CaptiveStellarCore. Writes to RocksDB active stores, serves queries, transitions to immutable storage at index boundaries. Covered in a separate design doc.
+## See Also
-
-These modes are fully independent — separate code, separate crash recovery, separate transition workflows.
+- [packfile library design](../../design-docs/packfile-library.md) — binary format for immutable `.pack` files (ledger packs + events cold segments); consumed by both docs above.
+- [getEvents full-history design](../../design-docs/getevents-full-history-design.md) — events hot/cold segment layout, roaring bitmap indexes, MPHF; consumed by both docs above.