Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) {
}

// PollAndSaveLogs polls and saves logs starting at currentBlockNumber using the
// harness's LogPoller (isReplay=false, i.e. normal polling), then returns the
// next block to poll: latest saved block number + 1.
// The error from LatestBlock is intentionally ignored; in tests a zero
// latest.BlockNumber simply yields 1.
func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 {
	th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false)
	latest, _ := th.LogPoller.LatestBlock(ctx)
	return latest.BlockNumber + 1
}
Expand Down
153 changes: 110 additions & 43 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ type LogPoller interface {

type LogPollerTest interface {
LogPoller
PollAndSaveLogs(ctx context.Context, currentBlockNumber int64)
PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool)
BackupPollAndSaveLogs(ctx context.Context) error
Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery
GetReplayFromBlock(ctx context.Context, requested int64) (int64, error)
Expand Down Expand Up @@ -662,7 +662,7 @@ func (lp *logPoller) run() {
} else {
start = lastProcessed.BlockNumber + 1
}
lp.PollAndSaveLogs(ctx, start)
lp.PollAndSaveLogs(ctx, start, false)
case <-backupLogPollTicker.C:
if lp.backupPollerBlockDelay == 0 {
continue // backup poller is disabled
Expand Down Expand Up @@ -772,7 +772,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64
if err == nil {
// Serially process replay requests.
lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq)
lp.PollAndSaveLogs(ctx, fromBlock)
lp.PollAndSaveLogs(ctx, fromBlock, true)
lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq)
}
} else {
Expand Down Expand Up @@ -954,6 +954,25 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
return nil
}

// headerByNumber fetches the header for blockNumber from the RPC via the
// latency monitor and applies sanity checks, since the RPC is not necessarily
// trusted: a nil header or a header whose number does not match the requested
// one is returned as an error.
func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) {
	header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber))
	if err != nil {
		// Not necessarily the "current" block: this helper is also used to
		// fetch arbitrary blocks (e.g. the latest saved block during replay),
		// so keep the wording generic.
		lp.lggr.Warnw("Unable to get block", "err", err, "blockNumber", blockNumber)
		return nil, fmt.Errorf("unable to get block header for block number %d: %w", blockNumber, err)
	}
	// Additional sanity checks, don't necessarily trust the RPC.
	if header == nil {
		lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber)
		return nil, fmt.Errorf("got nil block for %d", blockNumber)
	}
	if header.Number != blockNumber {
		lp.lggr.Warnw("RPC returned incorrect block", "blockNumber", blockNumber, "got", header.Number)
		return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber)
	}

	return header, nil
}

// getCurrentBlockMaybeHandleReorg accepts a block number
// and will return that block if its parent points to our last saved block.
// One can optionally pass the block header if it has already been queried to avoid an extra RPC call.
Expand All @@ -962,23 +981,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
// 1. Find the LCA by following parent hashes.
// 2. Delete all logs and blocks after the LCA
// 3. Return the LCA+1, i.e. our new current (unprocessed) block.
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) {
func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) {
var err1 error
if currentBlock == nil {
// If we don't have the current block already, lets get it.
currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber))
currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber)
if err1 != nil {
lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber)
return nil, err1
}
// Additional sanity checks, don't necessarily trust the RPC.
if currentBlock == nil {
lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber)
return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber)
}
if currentBlock.Number != currentBlockNumber {
lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number)
return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber)
return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1)
}
}
// Does this currentBlock point to the same parent that we have saved?
Expand All @@ -997,39 +1005,98 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren
}
// Check for reorg.
if currentBlock.ParentHash != expectedParent.BlockHash {
// There can be another reorg while we're finding the LCA.
// That is ok, since we'll detect it on the next iteration.
// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber)
if err2 != nil {
return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
return lp.handleReorg(ctx, currentBlock)
}

if !isReplay {
// During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock.
return currentBlock, nil
}

// Ensure that if DB contains current block it matches the current block from RPC.
currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber)
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err)
}

if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash {
return lp.handleReorg(ctx, currentBlock)
}

// No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation.
latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx)
if err1 != nil {
if pkgerrors.Is(err1, sql.ErrNoRows) {
lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1)
}
return nil, pkgerrors.Wrap(err1, "unable to get latest block")
}

lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber)
// We truncate all the blocks and logs after the LCA.
// We could preserve the logs for forensics, since its possible
// that applications see them and take action upon it, however that
// results in significantly slower reads since we must then compute
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number)
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
return nil, err2
if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber {
// currentBlock is newest, nothing more to check
return currentBlock, nil
}

latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber)
if err != nil {
return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err)
}

if latestBlockRPC.Hash != latestBlockDB.BlockHash {
// Reorg detected, handle it
blockAfterLCA, err := lp.handleReorg(ctx, latestBlockRPC)
if err != nil {
return nil, fmt.Errorf("failed to handle reorg: %w", err)
}

if blockAfterLCA.Number < currentBlock.BlockNumber() {
return blockAfterLCA, nil
}
return blockAfterLCA, nil
}
// No reorg, return current block.

return currentBlock, nil
}

func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) {
// during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation,
// if we use its view on latest finalized block. To be safe, we get the latest block from the db.
latestBlock, err := lp.orm.SelectLatestBlock(ctx)
if err != nil {
if pkgerrors.Is(err, sql.ErrNoRows) {
lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err)
}
return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db")
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Misleading error message: The error message says "failed to get latest finalized block from db" but the function actually calls orm.SelectLatestBlock (line 1063), not SelectLatestFinalizedBlock. The error message should be updated to "failed to get latest block from db" for accuracy.

Suggested change
return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db")
return nil, pkgerrors.Wrap(err, "failed to get latest block from db")

Copilot uses AI. Check for mistakes.
}
// There can be another reorg while we're finding the LCA.
// That is ok, since we'll detect it on the next iteration.
// Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1.
blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber)
if err2 != nil {
return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2)
}

lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number)
// We truncate all the blocks and logs after the LCA.
// We could preserve the logs for forensics, since its possible
// that applications see them and take action upon it, however that
// results in significantly slower reads since we must then compute
// the canonical set per read. Typically, if an application took action on a log
// it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads.
// Its also nicely analogous to reading from the chain itself.
err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number)
if err2 != nil {
// If we error on db commit, we can't know if the tx went through or not.
// We return an error here which will cause us to restart polling from lastBlockSaved + 1
return nil, err2
}
return blockAfterLCA, nil
}

// PollAndSaveLogs On startup/crash current is the first block after the last processed block.
// currentBlockNumber is the block from where new logs are to be polled & saved. Under normal
// conditions this would be equal to lastProcessed.BlockNumber + 1.
Copy link

Copilot AI Feb 17, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing parameter documentation: The function comment should document the new isReplay parameter, explaining that it should be true when called during a replay operation and false during normal polling. This helps future maintainers understand when to use each value.

Suggested change
// conditions this would be equal to lastProcessed.BlockNumber + 1.
// conditions this would be equal to lastProcessed.BlockNumber + 1. The isReplay flag should be
// true when PollAndSaveLogs is called as part of a replay operation and false during normal polling.

Copilot uses AI. Check for mistakes.
func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) {
err := lp.pollAndSaveLogs(ctx, currentBlockNumber)
func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) {
err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay)
if errors.Is(err, commontypes.ErrFinalityViolated) {
lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err)
lp.finalityViolated.Store(true)
Expand All @@ -1048,7 +1115,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int
}
}

func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) {
func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) {
lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber)
// Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks()
// latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration
Expand Down Expand Up @@ -1078,7 +1145,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
}
// Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks.
// Returns (currentBlock || LCA+1 if reorg detected, error)
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock)
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved and retry.
Expand All @@ -1104,7 +1171,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int

for {
if currentBlockNumber > currentBlock.Number {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil)
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
Expand Down
12 changes: 6 additions & 6 deletions pkg/logpoller/log_poller_internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead
assert.Equal(t, int64(0), lp.backupPollerNextBlock)
assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len())

lp.PollAndSaveLogs(ctx, head.Number)
lp.PollAndSaveLogs(ctx, head.Number, false)

lastProcessed, err := lp.orm.SelectLatestBlock(ctx)
require.NoError(t, err)
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestLogPoller_Replay(t *testing.T) {
{
ctx := testutils.Context(t)
// process 1 log in block 3
lp.PollAndSaveLogs(ctx, 4)
lp.PollAndSaveLogs(ctx, 4, false)
latest, err := lp.LatestBlock(ctx)
require.NoError(t, err)
require.Equal(t, int64(4), latest.BlockNumber)
Expand Down Expand Up @@ -785,7 +785,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
// insert finalized block with different hash than in RPC
require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2))
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
lp.PollAndSaveLogs(t.Context(), 4)
lp.PollAndSaveLogs(t.Context(), 4, false)
require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
})
t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) {
Expand All @@ -806,7 +806,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
return evmtypes.Head{Number: num, Hash: utils.NewHash()}
})
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
lp.PollAndSaveLogs(t.Context(), 4)
lp.PollAndSaveLogs(t.Context(), 4, false)
require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
})
t.Run("Log's hash does not match block's", func(t *testing.T) {
Expand All @@ -824,7 +824,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once()
mockBatchCallContext(t, ec)
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
lp.PollAndSaveLogs(t.Context(), 4)
lp.PollAndSaveLogs(t.Context(), 4, false)
require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated)
})
t.Run("Happy path", func(t *testing.T) {
Expand All @@ -844,7 +844,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) {
ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once()
mockBatchCallContext(t, ec)
lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts)
lp.PollAndSaveLogs(t.Context(), 4)
lp.PollAndSaveLogs(t.Context(), 4, false)
require.NoError(t, lp.HealthReport()[lp.Name()])
})
}
Expand Down
Loading
Loading