From fbf49f7f644662b1ae6b00e52099b0c88234ba23 Mon Sep 17 00:00:00 2001 From: Alex Peters Date: Thu, 15 Jan 2026 21:57:08 +0100 Subject: [PATCH] Fix inconsistent state detection and rollback --- core/execution/execution.go | 17 +++++ execution/evm/execution.go | 123 +++++++++++++++++++++++++++++++----- 2 files changed, 123 insertions(+), 17 deletions(-) diff --git a/core/execution/execution.go b/core/execution/execution.go index 5085ebe578..f9c6df9be2 100644 --- a/core/execution/execution.go +++ b/core/execution/execution.go @@ -102,3 +102,20 @@ type HeightProvider interface { // - error: Any errors during height retrieval GetLatestHeight(ctx context.Context) (uint64, error) } + +// Rollbackable is an optional interface that execution clients can implement +// to support automatic rollback when the execution layer is ahead of the target height. +// This enables automatic recovery during rolling restarts when the EL has committed +// blocks that were not replicated to Raft. +type Rollbackable interface { + // Rollback resets the execution layer head to the specified height. + // This is used for recovery when the EL is ahead of the consensus layer. + // + // Parameters: + // - ctx: Context for timeout/cancellation control + // - targetHeight: Height to rollback to (must be <= current height) + // + // Returns: + // - error: Any errors during rollback + Rollback(ctx context.Context, targetHeight uint64) error +} diff --git a/execution/evm/execution.go b/execution/evm/execution.go index c310af06d7..000dc7ca92 100644 --- a/execution/evm/execution.go +++ b/execution/evm/execution.go @@ -58,6 +58,12 @@ var ( // Ensure EngineAPIExecutionClient implements the execution.Execute interface var _ execution.Executor = (*EngineClient)(nil) +// Ensure EngineClient implements the execution.HeightProvider interface +var _ execution.HeightProvider = (*EngineClient)(nil) + +// Ensure EngineClient implements the execution.Rollbackable interface +var _ execution.Rollbackable = (*EngineClient)(nil) + // validatePayloadStatus checks the payload status and returns appropriate errors. // It implements the Engine API specification's status handling: // - VALID: Operation succeeded, return nil @@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) { func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) { // 1. Check for idempotent execution - stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs) + stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs) if err != nil { c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed") // Continue execution on error, as it might be transient @@ -690,35 +696,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte) return stateRoot, err } -// checkIdempotency checks if the block at the given height and timestamp has already been executed. +// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed. // It returns: // - stateRoot: non-nil if block is already promoted/finalized (idempotent success) // - payloadID: non-nil if block execution was started but not finished (resume needed) // - found: true if either of the above is true // - err: error during checks -func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) { +func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) { // 1. Check ExecMeta from store execMeta, err := c.store.GetExecMeta(ctx, height) if err == nil && execMeta != nil { - // If we already have a promoted block at this height, return the stored StateRoot + // If we already have a promoted block at this height, verify timestamp matches + // to catch Dual-Store Conflicts where ExecMeta was saved for an old block + // that was later replaced via Raft consensus. if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 { - c.logger.Info(). + if execMeta.Timestamp == timestamp.Unix() { + c.logger.Info(). + Uint64("height", height). + Str("stage", execMeta.Stage). + Msg("ExecuteTxs: reusing already-promoted execution (idempotent)") + return execMeta.StateRoot, nil, true, nil + } + // Timestamp mismatch - ExecMeta is stale from an old block that was replaced. + // Ignore it and proceed to EL check which will handle rollback if needed. + c.logger.Warn(). Uint64("height", height). - Str("stage", execMeta.Stage). - Msg("ExecuteTxs: reusing already-promoted execution (idempotent)") - return execMeta.StateRoot, nil, true, nil + Int64("execmeta_timestamp", execMeta.Timestamp). + Int64("requested_timestamp", timestamp.Unix()). + Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record") } - // If we have a started execution with a payloadID, return it to resume + // If we have a started execution with a payloadID, validate it still exists before resuming. + // After node restart, the EL's payload cache is ephemeral and the payloadID may be stale. if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 { - c.logger.Info(). - Uint64("height", height). - Str("stage", execMeta.Stage). - Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume") - var pid engine.PayloadID copy(pid[:], execMeta.PayloadID) - return nil, &pid, true, nil + + // Validate payload still exists by attempting to retrieve it + if _, err = c.engineClient.GetPayload(ctx, pid); err == nil { + c.logger.Info(). + Uint64("height", height). + Str("stage", execMeta.Stage). + Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume") + return nil, &pid, true, nil + } + // Payload is stale (expired or node restarted) - proceed with fresh execution + c.logger.Warn(). + Uint64("height", height). + Str("payloadID", pid.String()). + Err(err). + Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute") + // Don't return - fall through to fresh execution } } @@ -744,12 +772,31 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time return existingStateRoot.Bytes(), nil, true, nil } - // Timestamp mismatch - log warning but proceed + // Timestamp mismatch - this indicates a Dual-Store Conflict where the EL produced a block + // that was not replicated via Raft before leadership changed. The new leader created a + // different block at the same height with a different timestamp, and that block is now + // the authoritative version. We need to rollback the EL to height-1 so it can re-execute + // with the correct timestamp from the Raft-replicated block. c.logger.Warn(). Uint64("height", height). Uint64("existingTimestamp", existingTimestamp). Int64("requestedTimestamp", timestamp.Unix()). - Msg("ExecuteTxs: block exists at height but timestamp differs") + Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync") + + // Rollback to height-1 to allow re-execution with correct timestamp + if height > 0 { + if err := c.Rollback(ctx, height-1); err != nil { + c.logger.Error().Err(err). + Uint64("height", height). + Uint64("rollback_target", height-1). + Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch") + return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err) + } + c.logger.Info(). + Uint64("height", height). + Uint64("rollback_target", height-1). + Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp") + } } return nil, nil, false, nil @@ -907,6 +954,48 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) { return header.Number.Uint64(), nil } +// Rollback resets the execution layer head to the specified height using forkchoice update. +// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts +// where the EL committed blocks that were not replicated to Raft). +// +// Implements the execution.Rollbackable interface. +func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error { + // Get block hash at target height + blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight) + if err != nil { + return fmt.Errorf("get block at height %d: %w", targetHeight, err) + } + + c.logger.Info(). + Uint64("target_height", targetHeight). + Str("block_hash", blockHash.Hex()). + Msg("rolling back execution layer via forkchoice update") + + // Reset head, safe, and finalized to target block + // This forces the EL to reorg its canonical chain to the target height + c.mu.Lock() + c.currentHeadBlockHash = blockHash + c.currentHeadHeight = targetHeight + c.currentSafeBlockHash = blockHash + c.currentFinalizedBlockHash = blockHash + args := engine.ForkchoiceStateV1{ + HeadBlockHash: blockHash, + SafeBlockHash: blockHash, + FinalizedBlockHash: blockHash, + } + c.mu.Unlock() + + if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil { + return fmt.Errorf("forkchoice update for rollback failed: %w", err) + } + + c.logger.Info(). + Uint64("target_height", targetHeight). + Msg("execution layer rollback completed") + + return nil +} + // decodeSecret decodes a hex-encoded JWT secret string into a byte slice. func decodeSecret(jwtSecret string) ([]byte, error) { secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))