feat(ledger): Go PgLedger pgxpool impl + executor wiring (PR-2 of #95)#122
Conversation
PR-2 of the trade-ledger plan. Mirrors the Rust PgLedger surface so a
unified /metrics scrape across both binaries surfaces a single set of
aether_ledger_* families, and brings 'rows on a fork run' from
half-functional (Rust side only) to end-to-end.
internal/db/ledger_pg.go — new
- PgLedger on pgxpool. Hot path is non-blocking try-send onto a bounded
chan (cap 1024), drained by a dispatcher goroutine that fans out via
a counting semaphore (limit 8) so the pgx pool runs at capacity
without acquire-queueing. Saturation drops the row and bumps
aether_ledger_drops_total{op}; never blocks the executor.
- 2 s connect timeout so a misconfigured DATABASE_URL fails fast and
LedgerFromEnv falls back to NoopLedger instead of stalling boot.
- Idempotent INSERTs for bundles + ON CONFLICT upsert for inclusion +
pnl_daily delta accumulation (multiple writers can contribute to the
same day without lost updates; NUMERIC(78,0) preserves U256 economics
losslessly).
internal/db/metrics.go — new
- LedgerMetrics over the default Prometheus registry. Names mirror the
Rust LedgerMetrics::register surface exactly:
aether_ledger_writes_total{op, result}
aether_ledger_drops_total{op}
aether_ledger_queue_depth
aether_ledger_write_latency_ms{op}
internal/db/ledger.go — extended
- ArbIDNamespace + BundleIDNamespace constants. ArbIDNamespace is
byte-identical to the Rust ARB_ID_NAMESPACE in
crates/grpc-server/src/engine.rs so log↔DB join keys are symmetric
across the gRPC boundary; a TestArbIDNamespaceMatchesRust pin guards
drift.
- ArbIDFromOppID(string) -> uuid.UUID and BundleIDFor(uuid, block)
-> uuid.UUID for deterministic ids. Same (arb, block) pair always
produces the same bundle_id so resubmission ON CONFLICT is naturally
idempotent.
cmd/executor/main.go — wired
- LedgerFromEnv constructed once at startup, deferred Close() flushes
in-flight writes on shutdown.
- processArb gains a db.Ledger param. After bundle build, derives
arb_db_id + bundle_id deterministically and emits both as structured
fields on the submission log line so operators can grep id -> psql
WHERE arb_id = ... straight from logs.
- Shadow path persists the bundle row (IsShadow=true, no builders).
- Live path persists bundle + per-builder inclusion rows. Critical
semantic note: Included on these rows reflects builder *acceptance*
for next-block inclusion, not on-chain inclusion — the future
GetBundleStats poll loop UPSERTs the same (bundle_id, builder) row
with included_block + landed_tx_hash populated.
- pnl_daily inline roll-up: bundle_count bumps every submit; on
acceptance realized_profit_wei + inclusion_count accumulate. Adds
~30 LOC to keep the table populated during fork runs without a
separate cron.
Tests
- Existing 14 processArb / 2 consumeArbStream call sites updated to
pass db.NewNoopLedger().
- New deterministic-id tests pin the ArbIDNamespace and
BundleIDFor / ArbIDFromOppID semantics.
- go test ./... all green.
go.mod — adds github.com/jackc/pgx/v5/pgxpool.
|
The latest updates on your projects. Learn more about Vercel for GitHub.
|
Self-review: alignment vs #95 + correctness on what landedReviewed locally before requesting external eyes. Posting here so the rationale travels with the PR. Alignment matrix (#95 acceptance criteria)
Justified deferrals — and their consequences
CRITICAL — block mergeCRIT 1 — CRIT 2 — HIGHH1 — H2 — H3 — MEDIUMM1 — M2 — M3 — M4 — LOW / NIT
What's already right
VerdictRequest changes. Architecture is right. Criterion 6 nominally met, but criterion 3 is effectively unmet — CRIT 1 kills every live |
bundles.builders is JSONB (per migration 0001) but the previous bind sent a Go []string straight through, which pgx maps to Postgres text[] — every live insert would fail with 'column "builders" is of type jsonb but expression is of type text[]'. Self-review caught it before any user-facing run hit the path. Marshal []string → []byte JSON via encoding/json and bind with an explicit $8::jsonb cast so the wire format matches the column type regardless of nil / empty Builders slices. Refs PR #122 self-review (CRIT 1).
The Rust engine and Go executor each own one half of the trade-ledger
write surface and both run fire-and-forget through their own bounded
mpsc → writer task. There is no cross-process ordering between Rust's
'ARB PUBLISHED → insert_arb' and Go's 'bundle signed → insert_bundle';
under load the Go bundle insert lands first and the immediate FK check
fails. Self-review flagged this as CRIT 2 — it would surface as a
steady stream of bundle drops on every busy block and mask real ledger
health.
Drops bundles_arb_id_fkey via an idempotent migration so both writers
can race freely. Trade-off: a transient Rust connection blip can
produce an orphan bundle row; that is already metered as
aether_ledger_writes_total{op="insert_arb",result="err"}, and
downstream LEFT JOIN queries treat the NULL arb side as informative.
Comment in the migration spells out the future path (coordinator or
reconciliation worker) for re-adding the FK once cross-process
ordering exists.
Refs PR #122 self-review (CRIT 2).
Two issues caught in self-review: 1. Close() called wg.Wait() unconditionally. A wedged Postgres would hang executor shutdown forever. Now caps the drain at ledgerCloseDrainTimeout (5s), and on timeout cancels the writer dispatcher's context so any in-flight queries abort cleanly before pool.Close() runs. 2. dispatch() used ctx for both the dispatcher loop and per-op runs, shared with the caller's ctx. Caller cancellation would also kill in-flight queries, defeating Close()'s drain. Splits the dispatcher onto an independent context, owned by PgLedger and only cancelled from Close() on timeout. 3. Reorganises dispatch() so defer inflight.Wait() runs on every exit path (channel close, ctx cancel, future error branches). Previously a ctx-cancel return path skipped the wait and left dangling writer goroutines holding pool connections after the dispatcher had already reported wg.Done(). Refs PR #122 self-review (CRIT 3, MED 1).
… loop `inclusion_results.included` is the on-chain outcome the schema's `WHERE included` partial index expects, not the JSON-RPC ACK from a builder. The previous wiring set Included=r.Success on the submit-time row, which would make dashboards think every accepted bundle landed — silently inflating inclusion-rate metrics by an order of magnitude. Submit-time row now writes Included=false unconditionally with the builder's error string preserved on failure. The future GetBundleStats poll loop UPSERTs the same (bundle_id, builder) row with the real on-chain truth (Included=true, IncludedBlock, LandedTxHash) once the target block lands. Dashboards can distinguish "builder rejected" (Error != NULL, Included=false) from "never landed" (Error=NULL, Included=false) on day one. Drops the included-branch from the pnl_daily inline roll-up for the same reason: realized_profit_wei + inclusion_count are deferred to the poll loop. bundle_count still bumps every submit so the table stays useful for "how many bundles did we attempt" queries during fork runs. Refs PR #122 self-review (HIGH 2).
…pace doc Bundle of small self-review nits: - enqueue() flips Inc/send order so the gauge can't briefly read negative when the dispatcher's Dec() lands between a slow enqueuer's send and its Inc(). Failed sends now Revert via Dec(), keeping the gauge consistent with actual channel depth. - write_latency_ms histogram adds 0.1 / 0.25 ms buckets so local- Postgres inserts (~150-300 µs) show up at p50 instead of being flattened into the 0.5 ms bucket. - gasSpentApprox switches from big.Float intermediate to direct big.Int arithmetic so the value round-trips into NUMERIC(78,0) without precision drift across cumulative pnl_daily updates. - BundleIDNamespace gains a doc note explaining why no parallel Rust pin test exists today (Rust does not write bundles), and what to add if a future chain-backfill reconciliation worker on the Rust side starts producing bundle ids. Refs PR #122 self-review (MED 2, MED 4, NIT histogram, NIT gas).
|
Self-review round 1 addressed across 5 atomic commits ( Fixed (5 commits)
Deferred (real but follow-up scope)
Noise (skipped)
Local checks
Re-requesting review. |
bundles.builders is JSONB (per migration 0001) but the previous bind sent a Go []string straight through, which pgx maps to Postgres text[] — every live insert would fail with 'column "builders" is of type jsonb but expression is of type text[]'. Self-review caught it before any user-facing run hit the path. Marshal []string → []byte JSON via encoding/json and bind with an explicit $8::jsonb cast so the wire format matches the column type regardless of nil / empty Builders slices. Refs PR #122 self-review (CRIT 1).
The Rust engine and Go executor each own one half of the trade-ledger
write surface and both run fire-and-forget through their own bounded
mpsc → writer task. There is no cross-process ordering between Rust's
'ARB PUBLISHED → insert_arb' and Go's 'bundle signed → insert_bundle';
under load the Go bundle insert lands first and the immediate FK check
fails. Self-review flagged this as CRIT 2 — it would surface as a
steady stream of bundle drops on every busy block and mask real ledger
health.
Drops bundles_arb_id_fkey via an idempotent migration so both writers
can race freely. Trade-off: a transient Rust connection blip can
produce an orphan bundle row; that is already metered as
aether_ledger_writes_total{op="insert_arb",result="err"}, and
downstream LEFT JOIN queries treat the NULL arb side as informative.
Comment in the migration spells out the future path (coordinator or
reconciliation worker) for re-adding the FK once cross-process
ordering exists.
Refs PR #122 self-review (CRIT 2).
Two issues caught in self-review: 1. Close() called wg.Wait() unconditionally. A wedged Postgres would hang executor shutdown forever. Now caps the drain at ledgerCloseDrainTimeout (5s), and on timeout cancels the writer dispatcher's context so any in-flight queries abort cleanly before pool.Close() runs. 2. dispatch() used ctx for both the dispatcher loop and per-op runs, shared with the caller's ctx. Caller cancellation would also kill in-flight queries, defeating Close()'s drain. Splits the dispatcher onto an independent context, owned by PgLedger and only cancelled from Close() on timeout. 3. Reorganises dispatch() so defer inflight.Wait() runs on every exit path (channel close, ctx cancel, future error branches). Previously a ctx-cancel return path skipped the wait and left dangling writer goroutines holding pool connections after the dispatcher had already reported wg.Done(). Refs PR #122 self-review (CRIT 3, MED 1).
… loop `inclusion_results.included` is the on-chain outcome the schema's `WHERE included` partial index expects, not the JSON-RPC ACK from a builder. The previous wiring set Included=r.Success on the submit-time row, which would make dashboards think every accepted bundle landed — silently inflating inclusion-rate metrics by an order of magnitude. Submit-time row now writes Included=false unconditionally with the builder's error string preserved on failure. The future GetBundleStats poll loop UPSERTs the same (bundle_id, builder) row with the real on-chain truth (Included=true, IncludedBlock, LandedTxHash) once the target block lands. Dashboards can distinguish "builder rejected" (Error != NULL, Included=false) from "never landed" (Error=NULL, Included=false) on day one. Drops the included-branch from the pnl_daily inline roll-up for the same reason: realized_profit_wei + inclusion_count are deferred to the poll loop. bundle_count still bumps every submit so the table stays useful for "how many bundles did we attempt" queries during fork runs. Refs PR #122 self-review (HIGH 2).
Summary
PR-2 of the trade-ledger plan. Adds the Go side of the writer surface:
PgLedgeronpgxpool, executor wiring on bundle submission and per-builder result handling, inlinepnl_dailyroll-up, andaether_ledger_*metrics that mirror the Rust side exactly. After this lands, running aether against a fork or staging produces row trails inbundles,inclusion_results, andpnl_daily— closing the practical "DB integration on develop" goal.PR-3 (CI Postgres + counter-vs-row reconciliation test) is intentionally not in scope per the agreed-to minimum-viable plan.
Single commit
39492a2feat(ledger): Go PgLedger pgxpool impl + executor wiringWhat landed
internal/db/ledger_pg.go(new, 232 LOC)PgLedgeronpgxpool. Hot-path enqueue is non-blockingtry-sendonto a boundedchan(cap 1024). Dispatcher goroutine drains and fans out via aSemaphore(8)so the pgx pool runs at capacity without acquire-queueing.aether_ledger_drops_total{op}— never blocks the executor.DATABASE_URLfails fast andLedgerFromEnvfalls back toNoopLedger.bundles+ ON CONFLICT upsert oninclusion_results+ delta-accumulation upsert onpnl_daily(multiple writers can contribute to the same day without lost updates;NUMERIC(78,0)preserves U256 economics losslessly).Close()for graceful shutdown drains in-flight writes.internal/db/metrics.go(new, 51 LOC)LedgerMetricsregistered against the default Prometheus registry. Names byte-identical to the Rust side:aether_ledger_writes_total{op, result}aether_ledger_drops_total{op}aether_ledger_queue_depthaether_ledger_write_latency_ms{op}internal/db/ledger.go(extended)ArbIDNamespace+BundleIDNamespaceconstants.ArbIDNamespaceis byte-identical to the RustARB_ID_NAMESPACEso log↔DB join keys are symmetric across the gRPC boundary; aTestArbIDNamespaceMatchesRustpin guards drift.ArbIDFromOppID(string) → uuid.UUIDandBundleIDFor(uuid, block) → uuid.UUIDfor deterministic ids. Same(arb, block)always produces the samebundle_idso resubmissionON CONFLICTis naturally idempotent.cmd/executor/main.go(wired)LedgerFromEnvat startup, deferredClose()on shutdown.processArbgainsdb.Ledgerparam. After bundle build, derivesarb_db_id+bundle_iddeterministically and emits both as structured log fields on thearb submittedline sogrep <id> logs/* | psql ... WHERE arb_id = ...works straight from logs.bundlesrow withIsShadow=true(no builders).bundles+ per-builderinclusion_results. Critical semantic note (in code comment):Includedhere reflects builder acceptance for next-block inclusion, not on-chain inclusion — the futureGetBundleStatspoll loop will UPSERT the same(bundle_id, builder)row withincluded_block+landed_tx_hashpopulated.pnl_dailyinline roll-up:bundle_countbumps every submit; on builder acceptancerealized_profit_wei+inclusion_countaccumulate. ~30 LOC to keep the table populated during fork / staging runs without a separate cron.Tests
processArb+ 2consumeArbStreamcall sites in*_test.goupdated to passdb.NewNoopLedger().ArbIDNamespacebyte-equality with Rust and the determinism / variance contract onArbIDFromOppID/BundleIDFor.go test ./...100% green.Cargo / go.mod
github.com/jackc/pgx/v5/pgxpool.Acceptance criteria progress (#95)
sqlx::PgPool)pgxpool)Criteria 8 (counter-vs-row reconciliation) and 9 (CI Postgres) intentionally deferred per the minimum-viable plan.
Out of scope
GetBundleStatspoll loop that UPSERTsinclusion_resultswith on-chain truth. Schema already supports the upsert; this PR's writes are submission-time. Followup.update_inclusion).arbs+inclusion_results.Local verification
go build ./...clean.go vet ./...clean.go test ./...: all green (executor + db + risk + config + pooldiscovery).