From 250d0656cbd829d4f2fc3c7ec8b4cdd1534f7e8a Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 13:51:16 +0100 Subject: [PATCH 1/3] Test: reorg during replay --- pkg/logpoller/log_poller_test.go | 110 +++++++++++++++++++++++++++++++ 1 file changed, 110 insertions(+) diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index b42798b61f..fcd395086b 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -2152,3 +2152,113 @@ func TestWhere(t *testing.T) { assert.Equal(t, []query.Expression{}, result) }) } + +func TestLogPoller_Reorg_On_Replay(t *testing.T) { + // TestCase: + // 1. LogPoller processes blocks to block 11 + // 2. Reorg replaces block 11 with a new block (some additional blocks may be added on top of it) + // 3. Replay is initiated from block below 11. + // Expected behaviour: + // 1. LogPoller should replace reorged block 11 with a new data. + // 2. DB must not contain at any point logs from both old and new block 11. + // 3. Finality Violation must not occur, since chain did not violate finality depth. + t.Parallel() + const reorgedBlockNumber = 11 + testCases := []struct { + name string + numberOfBlocksAfterReorg int + }{ + { + name: "Replay start right after reorg", + numberOfBlocksAfterReorg: 0, + }, + { + name: "Replay start a few blocks after reorg", + numberOfBlocksAfterReorg: 1, + }, + { + name: "Replay start once reorged block is finalized", + numberOfBlocksAfterReorg: 5, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lpOpts := logpoller.Opts{ + PollPeriod: 24 * time.Hour, // effectively disable automatic polling, so we can control when we poll in the test + UseFinalityTag: false, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) + + // Set up a log poller listening for log emitter logs. + err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ + Name: "Test Emitter 1", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + }) + require.NoError(t, err) + + // populate chain with data + for range reorgedBlockNumber - 1 { + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(1))}) + require.NoError(t, err) + th.Backend.Commit() + } + + // start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long. + require.NoError(t, th.LogPoller.Start(t.Context())) + defer func() { + require.NoError(t, th.LogPoller.Close()) + }() + testutils.RequireEventually(t, func() bool { + latest, err := th.LogPoller.LatestBlock(t.Context()) + return err == nil && latest.BlockNumber == reorgedBlockNumber + }) + + reorgedBlock, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64()) + + // Replace block 11 with a new block and burry it under 1 new block + require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash())) + const newLogData = int64(123) + // Commit reorgedBlock and numberOfBlocksAfterReorg on top of it + for range tc.numberOfBlocksAfterReorg + 1 { + // emit log that is not tracked by LP to ensure that tracked log has a different index. 
+ // So if reorg is not properly handled and both logs end up in the database + _, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) + require.NoError(t, err) + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + th.Backend.Commit() + } + + newReorgedBlock, err := th.Client.BlockByNumber(t.Context(), big.NewInt(reorgedBlockNumber)) + require.NoError(t, err) + require.NotEqual(t, reorgedBlock.Hash().String(), newReorgedBlock.Hash().String()) + + latest, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), latest.Number().Int64()) + + // Trigger replay, which should gracefully handle the reorg and end up on the new latest block + err = th.LogPoller.Replay(t.Context(), 5) + require.NoError(t, err) + // LP should be on latest block now + lpLatest, err := th.LogPoller.LatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), lpLatest.BlockNumber) + logs, err := th.ORM.SelectLogsByBlockRange(t.Context(), reorgedBlockNumber, reorgedBlockNumber) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay") + // Ensure reorged block was replaced by a new one + dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) + require.NoError(t, err) + require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) + require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + }) + } +} From abead20a19dbaa1a387d186b23d3f007d2e3914a Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Thu, 12 Feb 2026 14:37:28 +0100 Subject: [PATCH 2/3] Fix reorg handling during replay --- pkg/logpoller/helper_test.go | 2 +- pkg/logpoller/log_poller.go | 155 ++++++++++++++++------ pkg/logpoller/log_poller_internal_test.go | 12 +- pkg/logpoller/log_poller_test.go | 14 +- 4 files changed, 126 insertions(+), 57 deletions(-) diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index e4f6a0f745..cb79fb721c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) { } func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) latest, _ := th.LogPoller.LatestBlock(ctx) return latest.BlockNumber + 1 } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 25aca9082c..01381e52d3 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -76,7 +76,7 @@ type LogPoller interface { type LogPollerTest interface { LogPoller - PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) + PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) BackupPollAndSaveLogs(ctx context.Context) error Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) @@ -662,7 +662,7 @@ func (lp *logPoller) run() { } else { start = lastProcessed.BlockNumber + 1 } - lp.PollAndSaveLogs(ctx, start) + lp.PollAndSaveLogs(ctx, start, false) case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { 
continue // backup poller is disabled @@ -772,7 +772,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64 if err == nil { // Serially process replay requests. lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(ctx, fromBlock) + lp.PollAndSaveLogs(ctx, fromBlock, true) lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) } } else { @@ -954,6 +954,26 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { return nil } +func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { + // If we don't have the current block already, lets get it. + header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) + if err != nil { + lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", blockNumber, err) + } + // Additional sanity checks, don't necessarily trust the RPC. + if header == nil { + lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber) + return nil, fmt.Errorf("got nil block for %d", blockNumber) + } + if header.Number != blockNumber { + lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "blockNumber", blockNumber, "got", header.Number) + return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber) + } + + return header, nil +} + // getCurrentBlockMaybeHandleReorg accepts a block number // and will return that block if its parent points to our last saved block. // One can optionally pass the block header if it has already been queried to avoid an extra RPC call. @@ -962,23 +982,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { // 1. Find the LCA by following parent hashes. // 2. Delete all logs and blocks after the LCA // 3. Return the LCA+1, i.e. our new current (unprocessed) block. -func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) { +func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) { var err1 error if currentBlock == nil { - // If we don't have the current block already, lets get it. - currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber)) + currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber) if err1 != nil { - lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber) - return nil, err1 - } - // Additional sanity checks, don't necessarily trust the RPC. - if currentBlock == nil { - lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber) - } - if currentBlock.Number != currentBlockNumber { - lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number) - return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1) } } // Does this currentBlock point to the same parent that we have saved? 
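The doc comment above describes the whole recovery path: accept the candidate block only if its parent hash matches the block saved at the previous height, otherwise locate the last common ancestor (LCA) and truncate everything stored after it. A minimal, self-contained sketch of that top-level decision, using simplified stand-in types and helpers (head, savedBlock, handleReorg and nextBlockToProcess are illustrative placeholders, not the poller's real API):

package main

import "fmt"

type head struct {
	Number     int64
	Hash       string
	ParentHash string
}

// savedBlock stands in for the ORM lookup of the block stored at a given height.
var savedBlock = map[int64]head{
	10: {Number: 10, Hash: "0xaaa", ParentHash: "0x999"},
}

// handleReorg stands in for the LCA search and truncation performed in the diff:
// walk parent hashes back to the last block both sides agree on, delete everything
// stored after it, and hand back the first block that must be re-polled.
func handleReorg(candidate head) head {
	return head{Number: candidate.Number} // placeholder result: block after the LCA
}

// nextBlockToProcess mirrors "does this currentBlock point to the parent we saved?".
func nextBlockToProcess(candidate head) head {
	parent, ok := savedBlock[candidate.Number-1]
	if !ok || parent.Hash == candidate.ParentHash {
		return candidate // candidate extends the saved chain; nothing to repair
	}
	return handleReorg(candidate)
}

func main() {
	// Block 11 whose parent is the block 10 we saved: accepted as-is.
	fmt.Println(nextBlockToProcess(head{Number: 11, ParentHash: "0xaaa"}))
	// Block 11 built on a different block 10: routed through reorg handling.
	fmt.Println(nextBlockToProcess(head{Number: 11, ParentHash: "0xbbb"}))
}

The real code keeps the same shape, but reads the expected parent from the ORM and fetches the candidate header through latencyMonitor.HeadByNumber via the headerByNumber helper introduced above.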
@@ -997,39 +1006,99 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren } // Check for reorg. if currentBlock.ParentHash != expectedParent.BlockHash { - // There can be another reorg while we're finding the LCA. - // That is ok, since we'll detect it on the next iteration. - // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. - blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber) - if err2 != nil { - return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + return lp.handleReorg(ctx, currentBlock) + } + + if !isReplay { + // During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock. + return currentBlock, nil + } + + // Ensure that if DB contains current block it matches the current block from RPC. + currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err) + } + + if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash { + return lp.handleReorg(ctx, currentBlock) + } + + // No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation. + latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx) + if err1 != nil { + if pkgerrors.Is(err1, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1) + } + return nil, pkgerrors.Wrap(err1, "unable to get latest block") + } + + if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber { + // currentBlock is newest, nothing more to check + return currentBlock, nil + } + + latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber) + if err != nil { + return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err) + } + + if latestBlockRPC.Hash != latestBlockDB.BlockHash { + // Reorg detected, handle it + lca, err := lp.handleReorg(ctx, latestBlockRPC) + if err != nil { + return nil, fmt.Errorf("failed to handle reorg: %w", err) } - lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber) - // We truncate all the blocks and logs after the LCA. - // We could preserve the logs for forensics, since its possible - // that applications see them and take action upon it, however that - // results in significantly slower reads since we must then compute - // the canonical set per read. Typically, if an application took action on a log - // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. - // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) - if err2 != nil { - // If we error on db commit, we can't know if the tx went through or not. 
- // We return an error here which will cause us to restart polling from lastBlockSaved + 1 - return nil, err2 + if lca.Number < currentBlock.BlockNumber() { + // LCA is older than current block, we need to get the new current block after reorg + return lca, nil } - return blockAfterLCA, nil } - // No reorg, return current block. + return currentBlock, nil } +func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) { + // during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation, + // if we use its view on latest finalized block. To be safe, we get the latest block from the db. + latestBlock, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + if pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err) + } + return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db") + } + // There can be another reorg while we're finding the LCA. + // That is ok, since we'll detect it on the next iteration. + // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + if err2 != nil { + return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + } + + lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number) + // We truncate all the blocks and logs after the LCA. + // We could preserve the logs for forensics, since its possible + // that applications see them and take action upon it, however that + // results in significantly slower reads since we must then compute + // the canonical set per read. Typically, if an application took action on a log + // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. + // Its also nicely analogous to reading from the chain itself. + err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) + if err2 != nil { + // If we error on db commit, we can't know if the tx went through or not. + // We return an error here which will cause us to restart polling from lastBlockSaved + 1 + return nil, err2 + } + return blockAfterLCA, nil +} + // PollAndSaveLogs On startup/crash current is the first block after the last processed block. // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal // conditions this would be equal to lastProcessed.BlockNumber + 1. 
-func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) { - err := lp.pollAndSaveLogs(ctx, currentBlockNumber) +func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) { + err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay) if errors.Is(err, commontypes.ErrFinalityViolated) { lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err) lp.finalityViolated.Store(true) @@ -1048,7 +1117,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int } } -func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) { +func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) { lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber) // Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks() // latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration @@ -1078,7 +1147,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int } // Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks. // Returns (currentBlock || LCA+1 if reorg detected, error) - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved and retry. @@ -1104,7 +1173,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int for { if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved. 
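Taken together, the replay-only branch added above re-validates state the DB may already hold: the stored hash at the replay cursor and at the DB's latest height are compared against the RPC's current view, and a mismatch is routed through handleReorg. A condensed, self-contained sketch of that decision flow, with dbHashAt, rpcHeadAt and handleReorg as stand-ins for the ORM and RPC calls in the diff:

package main

import "fmt"

type head struct {
	Number int64
	Hash   string
}

// replayNext mirrors the isReplay branch of getCurrentBlockMaybeHandleReorg: during
// a replay the DB can already contain the cursor block and blocks past it, so their
// stored hashes are re-checked before polling resumes.
func replayNext(cur head, dbLatest int64,
	dbHashAt func(int64) (string, bool),
	rpcHeadAt func(int64) head,
	handleReorg func(head) head,
) head {
	// 1. If this height is already stored, its hash must still match the chain.
	if h, ok := dbHashAt(cur.Number); ok && h != cur.Hash {
		return handleReorg(cur)
	}
	// 2. Nothing newer is stored, so nothing else could have been reorged out.
	if cur.Number >= dbLatest {
		return cur
	}
	// 3. Re-check the newest stored block so a stale entry is repaired now instead
	//    of being mistaken for a finality violation later.
	latest := rpcHeadAt(dbLatest)
	if h, ok := dbHashAt(dbLatest); ok && h != latest.Hash {
		if after := handleReorg(latest); after.Number < cur.Number {
			return after // the reorg reaches below the cursor; resume after the LCA
		}
	}
	return cur
}

func main() {
	stored := map[int64]string{11: "old-11", 12: "old-12"}
	next := replayNext(head{Number: 11, Hash: "new-11"}, 12,
		func(n int64) (string, bool) { h, ok := stored[n]; return h, ok },
		func(n int64) head { return head{Number: n, Hash: fmt.Sprintf("new-%d", n)} },
		func(h head) head { return head{Number: 11, Hash: "new-11"} },
	)
	fmt.Println(next.Number) // 11: block 11 is re-polled on the new chain
}

Threading isReplay from PollAndSaveLogs down to this check keeps the normal polling path unchanged: outside of replays the DB never holds blocks at or above currentBlockNumber, so the extra DB and RPC round-trips are skipped.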
diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 66fe07b7cc..337fc58520 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -271,7 +271,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) - lp.PollAndSaveLogs(ctx, head.Number) + lp.PollAndSaveLogs(ctx, head.Number, false) lastProcessed, err := lp.orm.SelectLatestBlock(ctx) require.NoError(t, err) @@ -380,7 +380,7 @@ func TestLogPoller_Replay(t *testing.T) { { ctx := testutils.Context(t) // process 1 log in block 3 - lp.PollAndSaveLogs(ctx, 4) + lp.PollAndSaveLogs(ctx, 4, false) latest, err := lp.LatestBlock(ctx) require.NoError(t, err) require.Equal(t, int64(4), latest.BlockNumber) @@ -785,7 +785,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { // insert finalized block with different hash than in RPC require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2)) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) { @@ -806,7 +806,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { return evmtypes.Head{Number: num, Hash: utils.NewHash()} }) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Log's hash does not match block's", func(t *testing.T) { @@ -824,7 +824,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Happy path", func(t *testing.T) { @@ -844,7 +844,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.NoError(t, lp.HealthReport()[lp.Name()]) }) } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index fcd395086b..4d2a4efb4f 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -377,7 +377,7 @@ func Test_BackupLogPoller(t *testing.T) { th.finalizeThroughBlock(t, 64) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) require.NoError(t, th.LogPoller.BackupPollAndSaveLogs(ctx)) currentBlock, err := th.LogPoller.LatestBlock(ctx) require.NoError(t, err) @@ -718,7 +718,7 
@@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { backend.Commit() } currentBlockNumber := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) matchesGeth := func() bool { @@ -770,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } @@ -1370,7 +1370,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber)) // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB - th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) + th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1, false) block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1621,7 +1621,7 @@ func TestTooManyLogResults(t *testing.T) { Addresses: []common.Address{addr}, }) require.NoError(t, err) - lp.PollAndSaveLogs(ctx, 5) + lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) assert.Equal(t, int64(298), block.BlockNumber) @@ -1653,7 +1653,7 @@ func TestTooManyLogResults(t *testing.T) { return []types.Log{}, tooLargeErr // return "too many results" error if block range spans 4 or more blocks }) - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself @@ -1687,7 +1687,7 @@ func TestTooManyLogResults(t *testing.T) { headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself From f60d8b485086b77f225da38c8c53eb7c5d10d54d Mon Sep 17 00:00:00 2001 From: Dmytro Haidashenko Date: Tue, 17 Feb 2026 17:57:35 +0100 Subject: [PATCH 3/3] Fix nits --- pkg/logpoller/log_poller.go | 8 +++----- pkg/logpoller/log_poller_test.go | 9 +++++---- 2 files changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 01381e52d3..f97289635a 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -955,7 +955,6 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { } func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { - // If we don't have the current block already, lets get it. 
header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) if err != nil { lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) @@ -1045,14 +1044,13 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren if latestBlockRPC.Hash != latestBlockDB.BlockHash { // Reorg detected, handle it - lca, err := lp.handleReorg(ctx, latestBlockRPC) + blockAfterLCA, err := lp.handleReorg(ctx, latestBlockRPC) if err != nil { return nil, fmt.Errorf("failed to handle reorg: %w", err) } - if lca.Number < currentBlock.BlockNumber() { - // LCA is older than current block, we need to get the new current block after reorg - return lca, nil + if blockAfterLCA.Number < currentBlock.BlockNumber() { + return blockAfterLCA, nil } } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index 4d2a4efb4f..ab6a69d0f9 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -2161,7 +2161,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { // Expected behaviour: // 1. LogPoller should replace reorged block 11 with a new data. // 2. DB must not contain at any point logs from both old and new block 11. - // 3. Finality Violation must not occur, since chain did not violate finality depth. + // 3. Finality Violation must not occur, since chain did not violate finality depth. t.Parallel() const reorgedBlockNumber = 11 testCases := []struct { @@ -2208,7 +2208,7 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { th.Backend.Commit() } - // start LogPoller and wait for it to complete first poll. Second poll won't happen until we call PollAndSaveLogs again, since poll period is very long. + // Start LogPoller and wait for it to complete the first poll. The second poll won't happen until we call Replay, since the poll period is very long. require.NoError(t, th.LogPoller.Start(t.Context())) defer func() { require.NoError(t, th.LogPoller.Close()) @@ -2222,16 +2222,17 @@ func TestLogPoller_Reorg_On_Replay(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64()) - // Replace block 11 with a new block and burry it under 1 new block + // Replace block 11 with a new block and bury it under 1 new block require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash())) const newLogData = int64(123) // Commit reorgedBlock and numberOfBlocksAfterReorg on top of it for range tc.numberOfBlocksAfterReorg + 1 { // emit log that is not tracked by LP to ensure that tracked log has a different index. - // So if reorg is not properly handled and both logs end up in the database + // So if reorg is not properly handled, both logs end up in the database and the test fails. _, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) require.NoError(t, err) _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + require.NoError(t, err) th.Backend.Commit() }
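Both reorg paths above end in findBlockAfterLCA, whose body is not part of this diff. For orientation only, a rough sketch of what such a parent-hash walk bounded by the finalized block typically looks like; the names, signature and exact stopping rule here are assumptions, not the poller's actual implementation:

package main

import (
	"errors"
	"fmt"
)

type head struct {
	Number     int64
	Hash       string
	ParentHash string
}

// findBlockAfterLCA walks parent hashes from the mismatching head back toward the
// finalized block and returns the first block whose parent is still stored in the
// DB (the block after the last common ancestor). Walking past the finalized block
// without finding a match would mean finality was violated. dbHashAt and rpcByHash
// stand in for the ORM and RPC lookups the real poller performs.
func findBlockAfterLCA(current head, finalized int64,
	dbHashAt func(int64) (string, bool),
	rpcByHash func(hash string) head,
) (head, error) {
	blockAfterLCA := current
	parent := rpcByHash(current.ParentHash)
	for parent.Number >= finalized {
		if h, ok := dbHashAt(parent.Number); ok && h == parent.Hash {
			return blockAfterLCA, nil // parent is still canonical in the DB: LCA found
		}
		blockAfterLCA = parent
		parent = rpcByHash(parent.ParentHash)
	}
	return head{}, errors.New("reorg deeper than the finalized block: finality violated")
}

func main() {
	fmt.Println("sketch only; see findBlockAfterLCA above")
}

This bound is also why handleReorg reads the finalized block number from the DB's latest block rather than from the replayed block's own header: during a replay the cursor block can carry a stale, lower finalized number, which would let the walk continue past real finality and miss a violation.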