17 changes: 17 additions & 0 deletions core/execution/execution.go
@@ -102,3 +102,20 @@ type HeightProvider interface {
// - error: Any errors during height retrieval
GetLatestHeight(ctx context.Context) (uint64, error)
}

// Rollbackable is an optional interface that execution clients can implement
// to support automatic rollback when the execution layer is ahead of the target height.
// This enables automatic recovery during rolling restarts when the EL has committed
// blocks that were not replicated to Raft.
type Rollbackable interface {
// Rollback resets the execution layer head to the specified height.
// This is used for recovery when the EL is ahead of the consensus layer.
//
// Parameters:
// - ctx: Context for timeout/cancellation control
// - targetHeight: Height to roll back to (must be <= current height)
//
// Returns:
// - error: Any errors during rollback
Rollback(ctx context.Context, targetHeight uint64) error
}
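As a rough illustration of how a node could consume this optional interface during recovery — a minimal sketch, not code from this PR; the recoverIfAhead helper and raftCommittedHeight are placeholders:

// Sketch: on startup, detect whether the EL is ahead of the last
// Raft-committed height and, if the client supports it, roll it back
// so replay from Raft produces the authoritative chain.
func recoverIfAhead(ctx context.Context, client execution.Executor, raftCommittedHeight uint64) error {
	hp, ok := client.(execution.HeightProvider)
	if !ok {
		return nil // cannot compare heights; nothing to do
	}
	elHeight, err := hp.GetLatestHeight(ctx)
	if err != nil {
		return fmt.Errorf("get EL height: %w", err)
	}
	if elHeight <= raftCommittedHeight {
		return nil // EL is not ahead; no rollback needed
	}
	rb, ok := client.(execution.Rollbackable)
	if !ok {
		return fmt.Errorf("EL is ahead (%d > %d) but client does not support rollback", elHeight, raftCommittedHeight)
	}
	return rb.Rollback(ctx, raftCommittedHeight)
}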
123 changes: 106 additions & 17 deletions execution/evm/execution.go
Expand Up @@ -58,6 +58,12 @@ var (
// Ensure EngineClient implements the execution.Executor interface
var _ execution.Executor = (*EngineClient)(nil)

// Ensure EngineClient implements the execution.HeightProvider interface
var _ execution.HeightProvider = (*EngineClient)(nil)

// Ensure EngineClient implements the execution.Rollbackable interface
var _ execution.Rollbackable = (*EngineClient)(nil)

// validatePayloadStatus checks the payload status and returns appropriate errors.
// It implements the Engine API specification's status handling:
// - VALID: Operation succeeded, return nil
@@ -338,7 +344,7 @@ func (c *EngineClient) GetTxs(ctx context.Context) ([][]byte, error) {
func (c *EngineClient) ExecuteTxs(ctx context.Context, txs [][]byte, blockHeight uint64, timestamp time.Time, prevStateRoot []byte) (updatedStateRoot []byte, maxBytes uint64, err error) {

// 1. Check for idempotent execution
-	stateRoot, payloadID, found, err := c.checkIdempotency(ctx, blockHeight, timestamp, txs)
+	stateRoot, payloadID, found, err := c.reconcileExecutionAtHeight(ctx, blockHeight, timestamp, txs)
if err != nil {
c.logger.Warn().Err(err).Uint64("height", blockHeight).Msg("ExecuteTxs: idempotency check failed")
// Continue execution on error, as it might be transient
@@ -690,35 +696,57 @@ func (c *EngineClient) ResumePayload(ctx context.Context, payloadIDBytes []byte)
return stateRoot, err
}

-// checkIdempotency checks if the block at the given height and timestamp has already been executed.
+// reconcileExecutionAtHeight checks if the block at the given height and timestamp has already been executed.
// It returns:
// - stateRoot: non-nil if block is already promoted/finalized (idempotent success)
// - payloadID: non-nil if block execution was started but not finished (resume needed)
// - found: true if either of the above is true
// - err: error during checks
-func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
+func (c *EngineClient) reconcileExecutionAtHeight(ctx context.Context, height uint64, timestamp time.Time, txs [][]byte) (stateRoot []byte, payloadID *engine.PayloadID, found bool, err error) {
// 1. Check ExecMeta from store
execMeta, err := c.store.GetExecMeta(ctx, height)
if err == nil && execMeta != nil {
-	// If we already have a promoted block at this height, return the stored StateRoot
+	// If we already have a promoted block at this height, verify the timestamp matches
+	// to catch Dual-Store Conflicts where ExecMeta was saved for an old block
+	// that was later replaced via Raft consensus.
	if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
-		c.logger.Info().
-			Uint64("height", height).
-			Str("stage", execMeta.Stage).
-			Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
-		return execMeta.StateRoot, nil, true, nil
+		if execMeta.Timestamp == timestamp.Unix() {
+			c.logger.Info().
+				Uint64("height", height).
+				Str("stage", execMeta.Stage).
+				Msg("ExecuteTxs: reusing already-promoted execution (idempotent)")
+			return execMeta.StateRoot, nil, true, nil
+		}
+		// Timestamp mismatch - ExecMeta is stale from an old block that was replaced.
+		// Ignore it and proceed to the EL check, which will handle rollback if needed.
+		c.logger.Warn().
+			Uint64("height", height).
+			Str("stage", execMeta.Stage).
+			Int64("execmeta_timestamp", execMeta.Timestamp).
+			Int64("requested_timestamp", timestamp.Unix()).
+			Msg("ExecuteTxs: ExecMeta timestamp mismatch, ignoring stale promoted record")
	}

Contributor Author commented on the timestamp check:

This is still a weak check without hash but it fixed the problems on restart.
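As the author notes, matching on the Unix timestamp alone cannot distinguish two different blocks that happen to share a timestamp. A hash-based variant might look like the sketch below; the ExecMeta.BlockHash field (persisted at promotion time) and the bytes import are hypothetical additions, not part of this PR:

// Sketch only: assumes ExecMeta gained a BlockHash field recorded when the
// payload was promoted. getBlockInfo is the existing helper used elsewhere
// in this file to fetch the canonical block hash at a height.
if execMeta.Stage == ExecStagePromoted && len(execMeta.StateRoot) > 0 {
	canonicalHash, _, _, _, err := c.getBlockInfo(ctx, height)
	if err == nil && bytes.Equal(execMeta.BlockHash, canonicalHash.Bytes()) {
		// The stored record matches the EL's canonical block exactly: safe to reuse.
		return execMeta.StateRoot, nil, true, nil
	}
	// Hash mismatch or lookup failure: treat the record as stale and fall through.
}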

-	// If we have a started execution with a payloadID, return it to resume
+	// If we have a started execution with a payloadID, validate it still exists before resuming.
+	// After node restart, the EL's payload cache is ephemeral and the payloadID may be stale.
	if execMeta.Stage == ExecStageStarted && len(execMeta.PayloadID) == 8 {
-		c.logger.Info().
-			Uint64("height", height).
-			Str("stage", execMeta.Stage).
-			Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
-
		var pid engine.PayloadID
		copy(pid[:], execMeta.PayloadID)
-		return nil, &pid, true, nil
+
+		// Validate payload still exists by attempting to retrieve it
+		if _, err = c.engineClient.GetPayload(ctx, pid); err == nil {
+			c.logger.Info().
+				Uint64("height", height).
+				Str("stage", execMeta.Stage).
+				Msg("ExecuteTxs: found in-progress execution with payloadID, returning payloadID for resume")
+			return nil, &pid, true, nil
+		}
+		// Payload is stale (expired or node restarted) - proceed with fresh execution
+		c.logger.Warn().
+			Uint64("height", height).
+			Str("payloadID", pid.String()).
+			Err(err).
+			Msg("ExecuteTxs: stale ExecMeta payloadID no longer valid in EL, will re-execute")
+		// Don't return - fall through to fresh execution
	}
}

@@ -744,12 +772,31 @@ func (c *EngineClient) checkIdempotency(ctx context.Context, height uint64, time

return existingStateRoot.Bytes(), nil, true, nil
}
-		// Timestamp mismatch - log warning but proceed
+		// Timestamp mismatch - this indicates a Dual-Store Conflict where the EL produced a block
+		// that was not replicated via Raft before leadership changed. The new leader created a
+		// different block at the same height with a different timestamp, and that block is now
+		// the authoritative version. We need to roll back the EL to height-1 so it can re-execute
+		// with the correct timestamp from the Raft-replicated block.
		c.logger.Warn().
			Uint64("height", height).
			Uint64("existingTimestamp", existingTimestamp).
			Int64("requestedTimestamp", timestamp.Unix()).
-			Msg("ExecuteTxs: block exists at height but timestamp differs")
+			Msg("ExecuteTxs: block exists at height but timestamp differs - rolling back EL to re-sync")
+
+		// Rollback to height-1 to allow re-execution with the correct timestamp
+		if height > 0 {
+			if err := c.Rollback(ctx, height-1); err != nil {
+				c.logger.Error().Err(err).
+					Uint64("height", height).
+					Uint64("rollback_target", height-1).
+					Msg("ExecuteTxs: failed to rollback EL for timestamp mismatch")
+				return nil, nil, false, fmt.Errorf("failed to rollback EL for timestamp mismatch at height %d: %w", height, err)
+			}
+			c.logger.Info().
+				Uint64("height", height).
+				Uint64("rollback_target", height-1).
+				Msg("ExecuteTxs: EL rolled back successfully, will re-execute with correct timestamp")
+		}
	}

return nil, nil, false, nil
@@ -907,6 +954,48 @@ func (c *EngineClient) GetLatestHeight(ctx context.Context) (uint64, error) {
return header.Number.Uint64(), nil
}

// Rollback resets the execution layer head to the specified height using forkchoice update.
// This is used for recovery when the EL is ahead of the consensus layer (e.g., during rolling restarts
// where the EL committed blocks that were not replicated to Raft).
//
// Implements the execution.Rollbackable interface.
func (c *EngineClient) Rollback(ctx context.Context, targetHeight uint64) error {
Member commented:

Could we wire the command directly then? This would fix #2967.

For consistency, a good follow-up would be to implement this in ev-abci and use it in the rollback command as well, instead of directly calling: https://github.com/evstack/ev-abci/blob/e905b0b/server/rollback_cmd.go#L91-L95.

// Get block hash at target height
blockHash, _, _, _, err := c.getBlockInfo(ctx, targetHeight)
if err != nil {
return fmt.Errorf("get block at height %d: %w", targetHeight, err)
}

c.logger.Info().
Uint64("target_height", targetHeight).
Str("block_hash", blockHash.Hex()).
Msg("rolling back execution layer via forkchoice update")

// Reset head, safe, and finalized to target block
// This forces the EL to reorg its canonical chain to the target height
c.mu.Lock()
c.currentHeadBlockHash = blockHash
c.currentHeadHeight = targetHeight
c.currentSafeBlockHash = blockHash
c.currentFinalizedBlockHash = blockHash
args := engine.ForkchoiceStateV1{
HeadBlockHash: blockHash,
SafeBlockHash: blockHash,
FinalizedBlockHash: blockHash,
}
c.mu.Unlock()

if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
return fmt.Errorf("forkchoice update for rollback failed: %w", err)
}
Contributor commented on lines +976 to +990 (critical):
There is a critical race condition in how forkchoiceUpdated calls are managed. The mutex c.mu is unlocked before calling c.doForkchoiceUpdate, which means the update to the client's internal state (c.current... fields) and the RPC call that propagates this state to the execution layer are not atomic.

If another goroutine (e.g., handling SetFinal or another ExecuteTxs) modifies the state and sends its own forkchoiceUpdated call between the c.mu.Unlock() and the completion of this doForkchoiceUpdate, the state of the EngineClient and the execution layer can diverge.

For example:

  1. Rollback locks, updates c.current... to height H-1, and unlocks.
  2. Another goroutine calls SetFinal for height H. It locks, updates c.currentFinalizedBlockHash to H, and unlocks.
  3. SetFinal's doForkchoiceUpdate is sent to the EL, finalizing H.
  4. Rollback's doForkchoiceUpdate is sent, resetting head/safe/finalized to H-1.

The final state of the EL is H-1, but EngineClient's state has currentFinalizedBlockHash as H, which is a state inconsistency.

To ensure atomicity, the lock should be held over the doForkchoiceUpdate call. While this has performance implications due to holding a lock over a network call with retries, it is necessary for correctness. This atomicity should be enforced for all methods that call doForkchoiceUpdate.

Suggested change:

-	c.mu.Lock()
-	c.currentHeadBlockHash = blockHash
-	c.currentHeadHeight = targetHeight
-	c.currentSafeBlockHash = blockHash
-	c.currentFinalizedBlockHash = blockHash
-	args := engine.ForkchoiceStateV1{
-		HeadBlockHash:      blockHash,
-		SafeBlockHash:      blockHash,
-		FinalizedBlockHash: blockHash,
-	}
-	c.mu.Unlock()
-
-	if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
-		return fmt.Errorf("forkchoice update for rollback failed: %w", err)
-	}
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.currentHeadBlockHash = blockHash
+	c.currentHeadHeight = targetHeight
+	c.currentSafeBlockHash = blockHash
+	c.currentFinalizedBlockHash = blockHash
+	args := engine.ForkchoiceStateV1{
+		HeadBlockHash:      blockHash,
+		SafeBlockHash:      blockHash,
+		FinalizedBlockHash: blockHash,
+	}
+	if err := c.doForkchoiceUpdate(ctx, args, "Rollback"); err != nil {
+		return fmt.Errorf("forkchoice update for rollback failed: %w", err)
+	}
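If that atomicity is to be enforced for every doForkchoiceUpdate caller, one option is to funnel all forkchoice mutations through a single helper that holds the lock across both the state mutation and the RPC. A sketch reusing the fields and signature shown above; the helper name is illustrative:

// Sketch: one critical section covers both the cached state and the RPC,
// so the EngineClient and the EL cannot observe updates in different orders.
func (c *EngineClient) applyForkchoice(ctx context.Context, head, safe, finalized common.Hash, headHeight uint64, caller string) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.currentHeadBlockHash = head
	c.currentHeadHeight = headHeight
	c.currentSafeBlockHash = safe
	c.currentFinalizedBlockHash = finalized
	return c.doForkchoiceUpdate(ctx, engine.ForkchoiceStateV1{
		HeadBlockHash:      head,
		SafeBlockHash:      safe,
		FinalizedBlockHash: finalized,
	}, caller)
}

Rollback, SetFinal, and the ExecuteTxs path would then each reduce to one call into this helper, at the cost the review already notes: the lock is held across a network call with retries.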


c.logger.Info().
Uint64("target_height", targetHeight).
Msg("execution layer rollback completed")

return nil
}
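On the earlier suggestion to wire this into the rollback command directly: a rough sketch of the CLI side, assuming a cobra-based command. The command name, flag, and constructor wiring are illustrative, not taken from ev-node or ev-abci:

// Hypothetical CLI wiring; names and flags are illustrative only.
func newRollbackCmd(client execution.Rollbackable) *cobra.Command {
	var height uint64
	cmd := &cobra.Command{
		Use:   "rollback",
		Short: "Reset the execution layer head to a target height",
		RunE: func(cmd *cobra.Command, _ []string) error {
			return client.Rollback(cmd.Context(), height)
		},
	}
	cmd.Flags().Uint64Var(&height, "height", 0, "target height to roll back to")
	_ = cmd.MarkFlagRequired("height")
	return cmd
}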

// decodeSecret decodes a hex-encoded JWT secret string into a byte slice.
func decodeSecret(jwtSecret string) ([]byte, error) {
secret, err := hex.DecodeString(strings.TrimPrefix(jwtSecret, "0x"))