diff --git a/execution/engineapi/engine_server.go b/execution/engineapi/engine_server.go index 54cabdaca3b..e25ff7fe90e 100644 --- a/execution/engineapi/engine_server.go +++ b/execution/engineapi/engine_server.go @@ -388,8 +388,6 @@ func (s *EngineServer) newPayload(ctx context.Context, req *engine_types.Executi } } - log.Debug(fmt.Sprintf("bal from header: %s", blockAccessList.DebugString())) - if (!s.config.IsCancun(header.Time) && version >= clparams.DenebVersion) || (s.config.IsCancun(header.Time) && version < clparams.DenebVersion) || (!s.config.IsPrague(header.Time) && version >= clparams.ElectraVersion) || diff --git a/execution/execmodule/exec_module_test.go b/execution/execmodule/exec_module_test.go index b931977c151..69680c1f708 100644 --- a/execution/execmodule/exec_module_test.go +++ b/execution/execmodule/exec_module_test.go @@ -39,6 +39,7 @@ import ( "github.com/erigontech/erigon/db/kv" "github.com/erigontech/erigon/db/kv/rawdbv3" "github.com/erigontech/erigon/db/rawdb" + "github.com/erigontech/erigon/db/state/changeset" "github.com/erigontech/erigon/execution/builder" "github.com/erigontech/erigon/execution/chain" "github.com/erigontech/erigon/execution/commitment/commitmentdb" @@ -2050,3 +2051,113 @@ func TestInsertBlocksWithBatchedFCU_BadBlockRecovery_Foreground(t *testing.T) { func TestInsertBlocksWithBatchedFCU_BadBlockRecovery_Background(t *testing.T) { runBatchedFCUBadBlockRecovery(t, true) } + +// transferGen returns a deterministic per-block tx generator: identical +// inputs produce identical blocks, which lets tests build forks that share +// a prefix with the canonical chain (requires a pre-Cancun config — Cancun+ +// blocks get a random ParentBeaconBlockRoot in blockgen). +func transferGen(t *testing.T, key *ecdsa.PrivateKey, to common.Address, amount uint64) func(int, *blockgen.BlockGen) { + return func(i int, b *blockgen.BlockGen) { + tx, err := types.SignTx( + types.NewTransaction(uint64(i), to, uint256.NewInt(amount), 50000, uint256.NewInt(1), nil), + *types.LatestSignerForChainID(nil), + key, + ) + require.NoError(t, err) + b.AddTx(tx) + } +} + +// A batch longer than MaxReorgDepth must still produce changesets for the +// last MaxReorgDepth blocks, otherwise no reorg of any depth is possible +// after the batch. +func TestLargeBatchExecGeneratesChangesetsForReorgWindow(t *testing.T) { + ctx := t.Context() + privKey, err := crypto.GenerateKey() + require.NoError(t, err) + senderAddr := crypto.PubkeyToAddress(privKey.PublicKey) + m := execmoduletester.New(t, + execmoduletester.WithKey(privKey), + execmoduletester.WithAlwaysGenerateChangesets(false), + ) + + maxReorgDepth := m.Cfg().Sync.MaxReorgDepth + chainLen := int(maxReorgDepth) + 14 + + chainPack, err := blockgen.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, chainLen, + transferGen(t, privKey, senderAddr, 1_000)) + require.NoError(t, err) + + insRes, err := insertBlocks(ctx, m.ExecModule, chainPack.Blocks) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, insRes) + fcuRes, err := updateForkChoice(ctx, m.ExecModule, chainPack.TopBlock.Header()) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, fcuRes.Status) + + require.NoError(t, m.DB.ViewTemporal(ctx, func(tx kv.TemporalTx) error { + lowest, err := changeset.ReadLowestUnwindableBlock(tx) + require.NoError(t, err) + require.Equal(t, uint64(chainLen)-maxReorgDepth, lowest, + "changesets must cover the last MaxReorgDepth blocks of the batch") + return nil + })) +} + +// After a single batch execution longer than MaxReorgDepth, an FCU onto a +// fork branching a few blocks below the tip must unwind and re-execute +// instead of failing with ReorgTooDeep. +func TestUpdateForkChoiceShallowReorgAfterLargeBatchExec(t *testing.T) { + ctx := t.Context() + privKey, err := crypto.GenerateKey() + require.NoError(t, err) + senderAddr := crypto.PubkeyToAddress(privKey.PublicKey) + m := execmoduletester.New(t, + execmoduletester.WithKey(privKey), + execmoduletester.WithAlwaysGenerateChangesets(false), + ) + + maxReorgDepth := m.Cfg().Sync.MaxReorgDepth + chainLen := int(maxReorgDepth) + 14 + const reorgDepth = 4 + divergeFrom := chainLen - reorgDepth + + canonical, err := blockgen.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, chainLen, + transferGen(t, privKey, senderAddr, 1_000)) + require.NoError(t, err) + fork, err := blockgen.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, chainLen, + func(i int, b *blockgen.BlockGen) { + amount := uint64(1_000) + if i >= divergeFrom { + amount = 2_000 + } + transferGen(t, privKey, senderAddr, amount)(i, b) + }) + require.NoError(t, err) + require.Equal(t, canonical.Blocks[divergeFrom-1].Hash(), fork.Blocks[divergeFrom-1].Hash()) + require.NotEqual(t, canonical.Blocks[divergeFrom].Hash(), fork.Blocks[divergeFrom].Hash()) + + insRes, err := insertBlocks(ctx, m.ExecModule, canonical.Blocks) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, insRes) + fcuRes, err := updateForkChoice(ctx, m.ExecModule, canonical.TopBlock.Header()) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, fcuRes.Status) + + insRes, err = insertBlocks(ctx, m.ExecModule, fork.Blocks[divergeFrom:]) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, insRes) + fcuRes, err = updateForkChoice(ctx, m.ExecModule, fork.TopBlock.Header()) + require.NoError(t, err) + require.Equal(t, execmodule.ExecutionStatusSuccess, fcuRes.Status, + "shallow reorg of %d blocks after a %d-block batch must succeed; status=%s validationError=%q", + reorgDepth, chainLen, fcuRes.Status, fcuRes.ValidationError) + + require.NoError(t, m.DB.ViewTemporal(ctx, func(tx kv.TemporalTx) error { + execProg, err := stages.GetStageProgress(tx, stages.Execution) + require.NoError(t, err) + require.Equal(t, uint64(chainLen), execProg) + require.Equal(t, fork.TopBlock.Hash(), rawdb.ReadHeadBlockHash(tx)) + return nil + })) +} diff --git a/execution/execmodule/execmoduletester/exec_module_tester.go b/execution/execmodule/execmoduletester/exec_module_tester.go index 395936eab38..78405e904f7 100644 --- a/execution/execmodule/execmoduletester/exec_module_tester.go +++ b/execution/execmodule/execmoduletester/exec_module_tester.go @@ -333,6 +333,16 @@ func WithFcuBackgroundCommit() Option { } } +// WithAlwaysGenerateChangesets pins --experimental.always-generate-changesets +// regardless of the tester default: true for tests that reorg deeper than +// MaxReorgDepth, false for tests that rely on the windowed-changesets +// production behaviour. +func WithAlwaysGenerateChangesets(v bool) Option { + return func(opts *options) { + opts.alwaysGenerateChangesets = &v + } +} + func WithFcuBackgroundPrune() Option { return func(opts *options) { opts.fcuBackgroundPrune = true @@ -340,17 +350,18 @@ func WithFcuBackgroundPrune() Option { } type options struct { - stepSize *uint64 - experimentalBAL bool - genesis *types.Genesis - chainConfig *chain.Config - key *ecdsa.PrivateKey - engine rules.Engine - pruneMode *prune.Mode - withTxPool bool - enableDomains []kv.Domain - fcuBackgroundCommit bool - fcuBackgroundPrune bool + stepSize *uint64 + experimentalBAL bool + genesis *types.Genesis + chainConfig *chain.Config + key *ecdsa.PrivateKey + engine rules.Engine + pruneMode *prune.Mode + withTxPool bool + enableDomains []kv.Domain + fcuBackgroundCommit bool + fcuBackgroundPrune bool + alwaysGenerateChangesets *bool } func applyOptions(opts []Option) options { @@ -420,7 +431,9 @@ func New(tb testing.TB, opts ...Option) *ExecModuleTester { cfg.Sync.BodyDownloadTimeoutSeconds = 10 cfg.TxPool.Disable = !withTxPool cfg.Dirs = dirs - cfg.AlwaysGenerateChangesets = true + if opt.alwaysGenerateChangesets != nil { + cfg.AlwaysGenerateChangesets = *opt.alwaysGenerateChangesets + } cfg.PersistReceiptsCacheV2 = true cfg.ChaosMonkey = false cfg.Snapshot.ChainName = gspec.Config.ChainName diff --git a/execution/stagedsync/bal_create.go b/execution/stagedsync/bal_create.go index e0dad3d75f2..351613bd264 100644 --- a/execution/stagedsync/bal_create.go +++ b/execution/stagedsync/bal_create.go @@ -5,6 +5,7 @@ import ( "os" "path/filepath" + "github.com/erigontech/erigon/common" "github.com/erigontech/erigon/common/dbg" "github.com/erigontech/erigon/common/log/v3" "github.com/erigontech/erigon/db/kv" @@ -14,40 +15,31 @@ import ( "github.com/erigontech/erigon/execution/types" ) -func CreateBAL(blockNum uint64, txIO *state.VersionedIO, dataDir string) types.BlockAccessList { +func CreateBAL(blockNum uint64, txIO *state.VersionedIO, dataDir string, logger log.Logger) types.BlockAccessList { bal := txIO.AsBlockAccessList() if dbg.TraceBlockAccessLists { - writeBALToFile(bal, blockNum, dataDir) + writeBALToFile(bal, dataDir, fmt.Sprintf("computed_bal_%d.txt", blockNum), logger) } return bal } -// writeBALToFile writes the Block Access List to a text file for debugging/analysis -func writeBALToFile(bal types.BlockAccessList, blockNum uint64, dataDir string) { +// writeBALToFile dumps the Block Access List to /bal/ for debugging/analysis. +func writeBALToFile(bal types.BlockAccessList, dataDir string, name string, logger log.Logger) { if dataDir == "" { return } - balDir := filepath.Join(dataDir, "bal") - if err := os.MkdirAll(balDir, 0755); err != nil { - log.Warn("Failed to create BAL directory", "dir", balDir, "error", err) + if err := os.MkdirAll(balDir, 0o755); err != nil { + logger.Warn("failed to create BAL debug directory", "dir", balDir, "err", err) return } - - filename := filepath.Join(balDir, fmt.Sprintf("bal_block_%d.txt", blockNum)) - - file, err := os.Create(filename) - if err != nil { - log.Warn("Failed to create BAL file", "blockNum", blockNum, "error", err) - return + path := filepath.Join(balDir, name) + if err := os.WriteFile(path, []byte(bal.DebugString()), 0o644); err != nil { + logger.Warn("failed to write BAL debug file", "path", path, "err", err) } - defer file.Close() - - fmt.Fprintf(file, "Block Access List for Block %d\n", blockNum) - bal.DebugPrint(file) } -func ProcessBAL(tx kv.TemporalRwTx, h *types.Header, vio *state.VersionedIO, isEIP7928 bool, experimental bool, dataDir string) error { +func ProcessBAL(tx kv.TemporalRwTx, h *types.Header, vio *state.VersionedIO, isEIP7928 bool, experimental bool, dataDir string, logger log.Logger) error { if !isEIP7928 && !experimental { return nil } @@ -56,97 +48,63 @@ func ProcessBAL(tx kv.TemporalRwTx, h *types.Header, vio *state.VersionedIO, isE } blockNum := h.Number.Uint64() blockHash := h.Hash() - bal := CreateBAL(blockNum, vio, dataDir) - err := bal.Validate() + computedBlockBal := CreateBAL(blockNum, vio, dataDir, logger) + err := computedBlockBal.Validate() if err != nil { return fmt.Errorf("block %d: invalid computed block access list: %w", blockNum, err) } - log.Debug("bal", "blockNum", blockNum, "hash", bal.Hash()) if !isEIP7928 { return nil } // EIP-7928 size bound is only consensus-binding once Amsterdam activates. - if err := bal.ValidateMaxItems(h.GasLimit); err != nil { + if err := computedBlockBal.ValidateMaxItems(h.GasLimit); err != nil { return fmt.Errorf("%w, block %d: %w", rules.ErrInvalidBlock, blockNum, err) } if h.BlockAccessListHash == nil { return fmt.Errorf("block %d: EIP-7928 active but BlockAccessListHash is nil in header", blockNum) } - headerBALHash := *h.BlockAccessListHash - dbBALBytes, err := rawdb.ReadBlockAccessListBytes(tx, blockHash, blockNum) + blockBalHash := *h.BlockAccessListHash + blockBalBytes, err := rawdb.ReadBlockAccessListBytes(tx, blockHash, blockNum) if err != nil { return fmt.Errorf("block %d: read stored block access list: %w", blockNum, err) } // A stored BAL sidecar may be absent — eth/71 backfill is best-effort and // never blocks stage progress — so cross-check it only when present. - if dbBALBytes != nil { - dbBAL, err := types.DecodeBlockAccessListBytes(dbBALBytes) + var blockBal types.BlockAccessList + if blockBalBytes != nil { + blockBal, err = types.DecodeBlockAccessListBytes(blockBalBytes) if err != nil { return fmt.Errorf("block %d: read stored block access list: %w", blockNum, err) } - if err = dbBAL.Validate(); err != nil { + if err = blockBal.Validate(); err != nil { return fmt.Errorf("block %d: db block access list is invalid: %w", blockNum, err) } - if err = dbBAL.ValidateMaxItems(h.GasLimit); err != nil { + if err = blockBal.ValidateMaxItems(h.GasLimit); err != nil { return fmt.Errorf("block %d: stored block access list exceeds max items: %w", blockNum, err) } - - if headerBALHash != dbBAL.Hash() { - log.Info(fmt.Sprintf("bal from block: %s", dbBAL.DebugString())) - return fmt.Errorf("block %d: invalid block access list, hash mismatch: got %s expected %s", blockNum, dbBAL.Hash(), headerBALHash) + if blockBalHash != blockBal.Hash() { + reportBALMismatch(blockNum, blockHash, blockBal, blockBalHash, computedBlockBal, dataDir, logger) + return fmt.Errorf("block %d: invalid block access list, hash mismatch: got %s expected %s", blockNum, blockBal.Hash(), blockBalHash) } } // Always validate computed BAL against header. The BalancePath cross-check // in VersionMap.validateRead ensures deterministic parallel execution even // without a stored BAL body (HasBAL=false), so the computed BAL is accurate. - if headerBALHash != bal.Hash() { - // Dump full BAL DebugStrings to stderr via the logger so container - // stdout captures (kurtosis/docker log collection) preserve the diff - // even when the on-disk dumpDir below is unreachable (e.g. ephemeral - // container, CI runner without artifact capture of /data). - // Each DebugString is one multi-line value tagged with block metadata - // so operators can diff computed vs stored without hunting for files. - computedDebug := bal.DebugString() - var storedDebug string - if dbBALBytes != nil { - dbBAL2, decErr := types.DecodeBlockAccessListBytes(dbBALBytes) - if decErr != nil { - log.Warn("failed to decode stored BAL for debug dump", "err", decErr) - } else if dbBAL2 != nil { - storedDebug = dbBAL2.DebugString() - } - } - log.Error("BAL mismatch: computed", "block", blockNum, "hash", bal.Hash(), "headerHash", headerBALHash, "bal", computedDebug) - if storedDebug != "" { - log.Error("BAL mismatch: stored (from sidecar)", "block", blockNum, "hash", headerBALHash, "bal", storedDebug) - } - - dumpDir := "" - if dataDir != "" { - balDir := filepath.Join(dataDir, "bal") - if err := os.MkdirAll(balDir, 0o755); err != nil { - log.Warn("failed to create BAL debug directory", "dir", balDir, "err", err) - } else { - computedPath := filepath.Join(balDir, fmt.Sprintf("computed_bal_%d.txt", blockNum)) - if err := os.WriteFile(computedPath, []byte(computedDebug), 0o644); err != nil { - log.Warn("failed to write computed BAL debug file", "path", computedPath, "err", err) - } else { - dumpDir = balDir - } - if storedDebug != "" { - storedPath := filepath.Join(balDir, fmt.Sprintf("stored_bal_%d.txt", blockNum)) - if err := os.WriteFile(storedPath, []byte(storedDebug), 0o644); err != nil { - log.Warn("failed to write stored BAL debug file", "path", storedPath, "err", err) - } - } - } - } - if dumpDir != "" { - return fmt.Errorf("%w, block=%d (hash=%s): block access list mismatch: got %s expected %s; debug dumps in %s", - rules.ErrInvalidBlock, blockNum, blockHash, bal.Hash(), headerBALHash, dumpDir) - } + computedBlockBalHash := computedBlockBal.Hash() + if blockBalHash != computedBlockBalHash { + reportBALMismatch(blockNum, blockHash, blockBal, blockBalHash, computedBlockBal, dataDir, logger) return fmt.Errorf("%w, block=%d (hash=%s): block access list mismatch: got %s expected %s", - rules.ErrInvalidBlock, blockNum, blockHash, bal.Hash(), headerBALHash) + rules.ErrInvalidBlock, blockNum, blockHash, computedBlockBalHash, blockBalHash) } return nil } + +// reportBALMismatch logs a BAL hash mismatch and dumps the block's BAL and the +// computed one under /bal for offline diffing. +func reportBALMismatch(blockNum uint64, blockHash common.Hash, blockBal types.BlockAccessList, blockBalHash common.Hash, computedBlockBal types.BlockAccessList, dataDir string, logger log.Logger) { + logger.Error("BAL mismatch", "blockNum", blockNum, "blockHash", blockHash, "blockBalHash", blockBalHash, "computedBlockBalHash", computedBlockBal.Hash()) + writeBALToFile(computedBlockBal, dataDir, fmt.Sprintf("computed_bal_%d.txt", blockNum), logger) + if blockBal != nil { + writeBALToFile(blockBal, dataDir, fmt.Sprintf("block_bal_%d.txt", blockNum), logger) + } +} diff --git a/execution/stagedsync/committer.go b/execution/stagedsync/committer.go index 70f1a59ed34..6729524e77d 100644 --- a/execution/stagedsync/committer.go +++ b/execution/stagedsync/committer.go @@ -109,6 +109,13 @@ type commitmentCalculator struct { // LAST block's changeset, which is wrong for per-block unwind. forcePerBlockCompute bool + // perBlockFrom is the batch's changeset window start: blocks >= it + // compute per-block (their changesets must record per-block branch + // deltas); blocks below it accumulate in batch mode. The last + // pre-window block triggers a transition compute (computeTransition) + // so no pre-window branch deltas leak into a window block's changeset. + perBlockFrom uint64 + wg sync.WaitGroup done chan struct{} } @@ -120,6 +127,7 @@ func newCommitmentCalculator( logPrefix string, logger log.Logger, forcePerBlockCompute bool, + perBlockFrom uint64, in chan applyResult, out chan commitmentResult, ) (*commitmentCalculator, error) { @@ -162,6 +170,7 @@ func newCommitmentCalculator( in: in, out: out, forcePerBlockCompute: forcePerBlockCompute, + perBlockFrom: perBlockFrom, done: make(chan struct{}), }, nil } @@ -250,8 +259,9 @@ func (cc *commitmentCalculator) handleMessage(ctx context.Context, msg applyResu // serial's gate (exec3_serial.go around the `if !dbg.BatchCommitments // || shouldGenerateChangesets || ...` check) — per-block compute is // required when changesets must record per-block branch deltas - // (reorg support, KeepExecutionProofs). - if !dbg.BatchCommitments || cc.forcePerBlockCompute { + // (reorg support, KeepExecutionProofs). Blocks from the changeset + // window (perBlockFrom) onward compute per-block for the same reason. + if !dbg.BatchCommitments || cc.forcePerBlockCompute || r.BlockNum >= cc.perBlockFrom { if cc.lastComputedBlock == 0 && r.isPartial { // First block is partial (resumed mid-block). // Compute it (like serial does) to save trie state, then @@ -261,6 +271,18 @@ func (cc *commitmentCalculator) handleMessage(ctx context.Context, msg applyResu } else { cc.computeAndCheck(ctx, r) } + if r.BlockNum+1 == cc.perBlockFrom { + // Pre-window per-block computes (BatchCommitments off) defer + // branch writes too — flush the boundary block's pending update + // outside any changeset before the first window block's compute + // routes into its saved CS. + cc.flushPendingUpdatesWithoutChangeset(ctx, r) + } + } else if r.BlockNum+1 == cc.perBlockFrom { + // Last pre-window block: fold everything accumulated in batch + // mode now, so the first window block's per-block compute (and + // changeset) covers only its own deltas. + cc.computeTransition(ctx, r) } // In BatchCommitments mode (without forcePerBlockCompute): just // accumulate — compute only on explicit commitComputeRequest from @@ -450,6 +472,79 @@ func (cc *commitmentCalculator) computeAndCheck(ctx context.Context, br *blockRe } } +// flushPendingUpdatesWithoutChangeset eagerly applies the pending deferred +// update under a nil accumulator — a pre-window block's branch deltas must +// not pend into the first window block's changeset-routed compute. +func (cc *commitmentCalculator) flushPendingUpdatesWithoutChangeset(ctx context.Context, br *blockResult) { + cc.doms.LockChangesetAccumulator() + prev := cc.doms.GetChangesetAccumulatorLocked() + cc.doms.SetChangesetAccumulatorLocked(nil) + err := cc.doms.FlushPendingUpdatesLocked(ctx, cc.roTx) + cc.doms.SetChangesetAccumulatorLocked(prev) + cc.doms.UnlockChangesetAccumulator() + if err != nil { + cc.publish(ctx, commitmentResult{ + blockNum: br.BlockNum, + txNum: br.lastTxNum, + err: fmt.Errorf("commitmentCalculator: %w", err), + }) + } +} + +// computeTransition folds all batch-mode blocks at the last pre-window block +// so the first window block's compute (and changeset) covers only its own +// deltas. Branch writes and the deferred-update flush run under a nil +// accumulator — pre-window deltas must not land in any block's changeset. +func (cc *commitmentCalculator) computeTransition(ctx context.Context, br *blockResult) { + if err := cc.state.LazyLoadErr(); err != nil { + cc.publish(ctx, commitmentResult{ + blockNum: br.BlockNum, + txNum: br.lastTxNum, + err: fmt.Errorf("commitmentCalculator: lazy-load failed: %w", err), + }) + return + } + cc.state.FlushToUpdates(cc.updates) + cc.state.ResetBlockFlags() + + sdCtx := cc.doms.GetCommitmentContext() + sdCtx.SetUpdates(cc.updates) + cc.updates = cc.updates.NewEmpty() + + cc.asOfReader.txNum = br.lastTxNum + 1 + sdCtx.SetStateReader(cc.asOfReader) + + cc.doms.LockChangesetAccumulator() + prev := cc.doms.GetChangesetAccumulatorLocked() + cc.doms.SetChangesetAccumulatorLocked(nil) + rh, err := cc.doms.ComputeCommitmentLocked(ctx, cc.roTx, true, br.BlockNum, br.lastTxNum, cc.logPrefix, nil) + if err == nil { + err = cc.doms.FlushPendingUpdatesLocked(ctx, cc.roTx) + } + cc.doms.SetChangesetAccumulatorLocked(prev) + cc.doms.UnlockChangesetAccumulator() + if err != nil { + cc.publish(ctx, commitmentResult{ + blockNum: br.BlockNum, + txNum: br.lastTxNum, + err: fmt.Errorf("commitmentCalculator: %w", err), + }) + return + } + + cc.lastComputedBlock = br.BlockNum + cc.hasComputed = true + + if !bytes.Equal(rh, br.StateRoot[:]) { + cc.publish(ctx, commitmentResult{ + blockNum: br.BlockNum, + txNum: br.lastTxNum, + rootHash: rh, + err: fmt.Errorf("%w: block %d root %x expected %x", ErrWrongTrieRoot, br.BlockNum, rh, br.StateRoot), + }) + } +} + func (cc *commitmentCalculator) publish(ctx context.Context, r commitmentResult) { select { case cc.out <- r: diff --git a/execution/stagedsync/exec3.go b/execution/stagedsync/exec3.go index 425adcd13b2..810b9ae9bb7 100644 --- a/execution/stagedsync/exec3.go +++ b/execution/stagedsync/exec3.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "math" "os" "sync" "sync/atomic" @@ -834,3 +835,22 @@ func shouldGenerateChangeSets(cfg ExecuteBlockCfg, blockNum, maxBlockNum uint64) // so the node can handle reorgs at the tip. return blockNum+cfg.syncCfg.MaxReorgDepth >= maxBlockNum } + +// changesetWindowStart returns the first block in [startBlockNum, maxBlockNum] +// for which shouldGenerateChangeSets is true, or math.MaxUint64 when there is +// none. Parallel exec gates per-block changeset capture and the commitment +// calculator's per-block mode on this boundary. +func changesetWindowStart(alwaysGenerateChangesets bool, maxReorgDepth uint64, frozenBlocks uint64, startBlockNum uint64, maxBlockNum uint64) uint64 { + if alwaysGenerateChangesets { + return startBlockNum + } + windowStart := startBlockNum + if maxBlockNum > maxReorgDepth { + windowStart = max(windowStart, maxBlockNum-maxReorgDepth) + } + windowStart = max(windowStart, frozenBlocks) + if windowStart > maxBlockNum { + return math.MaxUint64 + } + return windowStart +} diff --git a/execution/stagedsync/exec3_parallel.go b/execution/stagedsync/exec3_parallel.go index f3315029ad9..cf8ebb78e40 100644 --- a/execution/stagedsync/exec3_parallel.go +++ b/execution/stagedsync/exec3_parallel.go @@ -112,8 +112,11 @@ type parallelExecutor struct { // goroutine and avoids the data race between SetChangesetAccumulator // (apply loop) and ApplyStateWrites (exec loop, via SysCallContract for // block-end system calls) on SharedDomains.mem. - shouldGenerateChangesets bool - currentChangeSet *changeset.StateChangeSet + // changesetWindowStart is the first block of the batch that must capture + // a changeset (see changesetWindowStart in exec3.go); blocks below it run + // without an accumulator. + changesetWindowStart uint64 + currentChangeSet *changeset.StateChangeSet // currentChangeSetBlock is the block number currentChangeSet belongs to // (0 == none). Tracked so ensureChangesetAccumulator can be a no-op when the // accumulator is already installed for the block whose writes are about to @@ -130,7 +133,7 @@ type parallelExecutor struct { // SetChangesetAccumulator, which must be single-writer (see the comment on // currentChangeSet). func (pe *parallelExecutor) ensureChangesetAccumulator(blockNum uint64) { - if !pe.shouldGenerateChangesets || blockNum == 0 || blockNum > pe.maxBlockNum { + if blockNum < pe.changesetWindowStart || blockNum == 0 || blockNum > pe.maxBlockNum { return } if pe.currentChangeSet != nil && pe.currentChangeSetBlock == blockNum { @@ -256,18 +259,18 @@ func (pe *parallelExecutor) execImpl(ctx context.Context, execStage *StageState, // exec loop owns all subsequent SetChangesetAccumulator transitions // (per-block save/clear/install) so apply-loop and exec-loop sd.mem // writes never race on SharedDomains.mem. - pe.shouldGenerateChangesets = shouldGenerateChangeSets(pe.cfg, startBlockNum, maxBlockNum) + pe.changesetWindowStart = changesetWindowStart(pe.cfg.syncCfg.AlwaysGenerateChangesets, + pe.cfg.syncCfg.MaxReorgDepth, pe.cfg.blockReader.FrozenBlocks(), startBlockNum, maxBlockNum) pe.ensureChangesetAccumulator(startBlockNum) - // Start the commitment calculator. forcePerBlockCompute mirrors serial's - // per-block gate (exec3_serial.go: `if !dbg.BatchCommitments || - // shouldGenerateChangesets || KeepExecutionProofs`). When changesets - // must be generated (for unwind/reorg) the calculator must compute - // per-block — otherwise batch-mode dedupes branch updates across the - // batch and flushes them all into the last block's changeset, which - // fails on subsequent reorgs. - forcePerBlockCompute := pe.shouldGenerateChangesets || pe.cfg.syncCfg.KeepExecutionProofs - calculator, err := newCommitmentCalculator(executorContext, pe.rs.Domains(), pe.cfg.db, pe.logPrefix, pe.logger, forcePerBlockCompute, commitResults, rootResults) + // Start the commitment calculator. It mirrors serial's per-block gate + // (exec3_serial.go: `if !dbg.BatchCommitments || shouldGenerateChangesets + // || KeepExecutionProofs`): blocks from the changeset window onward must + // compute per-block — otherwise batch-mode dedupes branch updates across + // the batch and flushes them all into one block's changeset, which fails + // on subsequent reorgs. + forcePerBlockCompute := pe.cfg.syncCfg.KeepExecutionProofs + calculator, err := newCommitmentCalculator(executorContext, pe.rs.Domains(), pe.cfg.db, pe.logPrefix, pe.logger, forcePerBlockCompute, pe.changesetWindowStart, commitResults, rootResults) if err != nil { return nil, nil, err } @@ -310,7 +313,7 @@ func (pe *parallelExecutor) execImpl(ctx context.Context, execStage *StageState, } defer applyRoTx.Rollback() - // pe.shouldGenerateChangesets and pe.currentChangeSet were set up + // pe.changesetWindowStart and pe.currentChangeSet were set up // before pe.run/executeBlocks launched their goroutines (above the // calculator.Start call). Per-block accumulator save/clear/install // transitions are driven from the exec loop's blockResult handler. @@ -606,7 +609,7 @@ func (pe *parallelExecutor) execImpl(ctx context.Context, execStage *StageState, } if pe.cfg.chainConfig.IsAmsterdam(applyResult.BlockTime) || pe.cfg.experimentalBAL { - err = ProcessBAL(rwTx, lastHeader, applyResult.TxIO, pe.cfg.chainConfig.IsAmsterdam(applyResult.BlockTime), pe.cfg.experimentalBAL, pe.cfg.dirs.DataDir) + err = ProcessBAL(rwTx, lastHeader, applyResult.TxIO, pe.cfg.chainConfig.IsAmsterdam(applyResult.BlockTime), pe.cfg.experimentalBAL, pe.cfg.dirs.DataDir, pe.logger) if err != nil { return err } @@ -978,7 +981,7 @@ func (pe *parallelExecutor) execLoop(ctx context.Context) (err error) { // blockResult, so that the commitment calculator (which consumes // blockResults on a separate goroutine) can find this block's // saved changeset via GetChangesetByBlockNum at compute time. - // In per-block compute mode (shouldGenerateChangesets), the + // In per-block compute mode (changeset window), the // calculator switches the accumulator to this saved CS for the // duration of ComputeCommitment (committer.go:computeWithBlockAccumulator) // so branch writes land in block N's CS rather than whatever the diff --git a/execution/stagedsync/exec3_parallel_robustness_test.go b/execution/stagedsync/exec3_parallel_robustness_test.go index 60767de06cb..b6cffcb7405 100644 --- a/execution/stagedsync/exec3_parallel_robustness_test.go +++ b/execution/stagedsync/exec3_parallel_robustness_test.go @@ -12,6 +12,7 @@ package stagedsync import ( "context" "errors" + "math" "strings" "sync" "sync/atomic" @@ -22,6 +23,86 @@ import ( "github.com/erigontech/erigon/execution/protocol/rules" ) +// TestChangesetWindowStart covers the pure helper that decides where the +// changeset window of a batch begins. Evaluating the window once at +// startBlockNum (instead of per block) would leave any batch longer than +// MaxReorgDepth without changesets, making the node unable to reorg +// afterwards. +func TestChangesetWindowStart(t *testing.T) { + cases := []struct { + name string + alwaysGenerateChangesets bool + maxReorgDepth uint64 + frozenBlocks uint64 + startBlockNum uint64 + maxBlockNum uint64 + want uint64 + }{ + { + name: "big batch: window covers the last maxReorgDepth blocks", + maxReorgDepth: 96, + startBlockNum: 1, + maxBlockNum: 1000, + want: 904, + }, + { + name: "small batch: whole batch in window", + maxReorgDepth: 96, + startBlockNum: 950, + maxBlockNum: 1000, + want: 950, + }, + { + name: "batch end below maxReorgDepth: window from batch start", + maxReorgDepth: 96, + startBlockNum: 1, + maxBlockNum: 96, + want: 1, + }, + { + name: "alwaysGenerateChangesets overrides depth and frozen gates", + alwaysGenerateChangesets: true, + maxReorgDepth: 96, + frozenBlocks: 2000, + startBlockNum: 1, + maxBlockNum: 1000, + want: 1, + }, + { + name: "frozen blocks push the window up", + maxReorgDepth: 96, + frozenBlocks: 950, + startBlockNum: 1, + maxBlockNum: 1000, + want: 950, + }, + { + name: "fully frozen batch has no window", + maxReorgDepth: 96, + frozenBlocks: 2000, + startBlockNum: 1, + maxBlockNum: 1000, + want: math.MaxUint64, + }, + { + name: "long catch-up batch keeps a shallow reorg below its tip unwindable", + maxReorgDepth: 96, + startBlockNum: 5138, + maxBlockNum: 6137, + want: 6041, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + got := changesetWindowStart(tc.alwaysGenerateChangesets, tc.maxReorgDepth, tc.frozenBlocks, tc.startBlockNum, tc.maxBlockNum) + if got != tc.want { + t.Fatalf("changesetWindowStart got %d, want %d", got, tc.want) + } + }) + } +} + // TestApplyLoopMissingBlocks covers the pure completeness-check helper. // Every entry asserts a single invariant — see the comment on each case. func TestApplyLoopMissingBlocks(t *testing.T) { diff --git a/execution/state/intra_block_state.go b/execution/state/intra_block_state.go index f3c7645f681..14759bcd85a 100644 --- a/execution/state/intra_block_state.go +++ b/execution/state/intra_block_state.go @@ -372,6 +372,8 @@ func (sdb *IntraBlockState) Reset() { // references are dropped, while the next execution lazily allocates fresh ones. sdb.versionedReads = nil sdb.versionedWrites = nil + sdb.recordAccess = false + sdb.addressAccess = nil sdb.accountReadDuration = 0 sdb.accountReadCount = 0 sdb.storageReadDuration = 0 diff --git a/execution/state/parallel_fixes_test.go b/execution/state/parallel_fixes_test.go index 98ba9b353da..437e8a42662 100644 --- a/execution/state/parallel_fixes_test.go +++ b/execution/state/parallel_fixes_test.go @@ -7,6 +7,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/erigontech/erigon/execution/chain" "github.com/erigontech/erigon/execution/commitment" "github.com/erigontech/erigon/execution/types/accounts" ) @@ -170,6 +171,26 @@ func TestAccessListResetInIBSReset(t *testing.T) { assert.False(t, ibs.AddressInAccessList(testAddr), "Address should be cold after Reset") } +// TestAddressAccessResetInIBSReset verifies that IBS.Reset() clears BAL +// address-access recording. An aborted incarnation never harvests +// AccessedAddresses, and only regular txs call Prepare (which re-inits +// recording) — a worker next assigned a system block-start/block-end +// transaction never calls Prepare, so it would harvest the leftovers into +// its own block's access list as phantom entries. +func TestAddressAccessResetInIBSReset(t *testing.T) { + ibs := New(nil) + sender := accounts.InternAddress([20]byte{0x01}) + coinbase := accounts.InternAddress([20]byte{0x02}) + leaked := accounts.InternAddress([20]byte{0x42}) + // Prepare enables access recording at tx start. + require.NoError(t, ibs.Prepare(&chain.Rules{}, sender, coinbase, accounts.NilAddress, nil, nil, nil)) + ibs.MarkAddressAccess(leaked, false) + // Tx aborts: AccessedAddresses is never harvested. The worker resets + // the shared IBS before the next task. + ibs.Reset() + assert.Empty(t, ibs.AccessedAddresses(), "no recorded accesses should survive Reset") +} + // TestTransientStorageResetInIBSReset verifies that IBS.Reset() clears // transient storage (EIP-1153). func TestTransientStorageResetInIBSReset(t *testing.T) { diff --git a/execution/tests/blockchain_test.go b/execution/tests/blockchain_test.go index 55eed7ffe33..54b39983c8b 100644 --- a/execution/tests/blockchain_test.go +++ b/execution/tests/blockchain_test.go @@ -1230,7 +1230,9 @@ func TestLargeReorgTrieGC(t *testing.T) { t.Parallel() // Generate the original common chain segment and the two competing forks - m, m2 := execmoduletester.New(t), execmoduletester.New(t) + // The competitor fork reorgs ~2*triesInMemory blocks deep — beyond + // MaxReorgDepth, so the inserting node needs changesets for every block. + m, m2 := execmoduletester.New(t), execmoduletester.New(t, execmoduletester.WithAlwaysGenerateChangesets(true)) shared, err := blockgen.GenerateChain(m.ChainConfig, m.Genesis, m.Engine, m.DB, 64, func(i int, b *blockgen.BlockGen) { b.SetCoinbase(common.Address{1}) @@ -1317,8 +1319,9 @@ func TestLowDiffLongChain(t *testing.T) { t.Fatalf("generate fork: %v", err) } - // Import the canonical chain - m2 := execmoduletester.New(t) + // Import the canonical chain. The fork branches at block 11 — far beyond + // MaxReorgDepth — so the inserting node needs changesets for every block. + m2 := execmoduletester.New(t, execmoduletester.WithAlwaysGenerateChangesets(true)) if err := m2.InsertChain(chain); err != nil { t.Fatalf("failed to insert into chain: %v", err)