diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go index e4f6a0f745..cb79fb721c 100644 --- a/pkg/logpoller/helper_test.go +++ b/pkg/logpoller/helper_test.go @@ -164,7 +164,7 @@ func (th *TestHarness) AdjustTime(t *testing.T, d time.Duration) { } func (th *TestHarness) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) int64 { - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) latest, _ := th.LogPoller.LatestBlock(ctx) return latest.BlockNumber + 1 } diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go index 25aca9082c..f97289635a 100644 --- a/pkg/logpoller/log_poller.go +++ b/pkg/logpoller/log_poller.go @@ -76,7 +76,7 @@ type LogPoller interface { type LogPollerTest interface { LogPoller - PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) + PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) BackupPollAndSaveLogs(ctx context.Context) error Filter(from, to *big.Int, bh *common.Hash) ethereum.FilterQuery GetReplayFromBlock(ctx context.Context, requested int64) (int64, error) @@ -662,7 +662,7 @@ func (lp *logPoller) run() { } else { start = lastProcessed.BlockNumber + 1 } - lp.PollAndSaveLogs(ctx, start) + lp.PollAndSaveLogs(ctx, start, false) case <-backupLogPollTicker.C: if lp.backupPollerBlockDelay == 0 { continue // backup poller is disabled @@ -772,7 +772,7 @@ func (lp *logPoller) handleReplayRequest(ctx context.Context, fromBlockReq int64 if err == nil { // Serially process replay requests. lp.lggr.Infow("Executing replay", "fromBlock", fromBlock, "requested", fromBlockReq) - lp.PollAndSaveLogs(ctx, fromBlock) + lp.PollAndSaveLogs(ctx, fromBlock, true) lp.lggr.Infow("Executing replay finished", "fromBlock", fromBlock, "requested", fromBlockReq) } } else { @@ -954,6 +954,25 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { return nil } +func (lp *logPoller) headerByNumber(ctx context.Context, blockNumber int64) (*evmtypes.Head, error) { + header, err := lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(blockNumber)) + if err != nil { + lp.lggr.Warnw("Unable to get currentBlock", "err", err, "blockNumber", blockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", blockNumber, err) + } + // Additional sanity checks, don't necessarily trust the RPC. + if header == nil { + lp.lggr.Errorw("Unexpected nil block from RPC", "blockNumber", blockNumber) + return nil, fmt.Errorf("got nil block for %d", blockNumber) + } + if header.Number != blockNumber { + lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "blockNumber", blockNumber, "got", header.Number) + return nil, fmt.Errorf("block mismatch have %d want %d", header.Number, blockNumber) + } + + return header, nil +} + // getCurrentBlockMaybeHandleReorg accepts a block number // and will return that block if its parent points to our last saved block. // One can optionally pass the block header if it has already been queried to avoid an extra RPC call. @@ -962,23 +981,12 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error { // 1. Find the LCA by following parent hashes. // 2. Delete all logs and blocks after the LCA // 3. Return the LCA+1, i.e. our new current (unprocessed) block. -func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head) (head *evmtypes.Head, err error) { +func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, currentBlockNumber int64, currentBlock *evmtypes.Head, isReplay bool) (head *evmtypes.Head, err error) { var err1 error if currentBlock == nil { - // If we don't have the current block already, lets get it. - currentBlock, err1 = lp.latencyMonitor.HeadByNumber(ctx, big.NewInt(currentBlockNumber)) + currentBlock, err1 = lp.headerByNumber(ctx, currentBlockNumber) if err1 != nil { - lp.lggr.Warnw("Unable to get currentBlock", "err", err1, "currentBlockNumber", currentBlockNumber) - return nil, err1 - } - // Additional sanity checks, don't necessarily trust the RPC. - if currentBlock == nil { - lp.lggr.Errorw("Unexpected nil block from RPC", "currentBlockNumber", currentBlockNumber) - return nil, pkgerrors.Errorf("Got nil block for %d", currentBlockNumber) - } - if currentBlock.Number != currentBlockNumber { - lp.lggr.Warnw("Unable to get currentBlock, rpc returned incorrect block", "currentBlockNumber", currentBlockNumber, "got", currentBlock.Number) - return nil, pkgerrors.Errorf("Block mismatch have %d want %d", currentBlock.Number, currentBlockNumber) + return nil, fmt.Errorf("unable to get current block header for block number %d: %w", currentBlockNumber, err1) } } // Does this currentBlock point to the same parent that we have saved? @@ -997,39 +1005,98 @@ func (lp *logPoller) getCurrentBlockMaybeHandleReorg(ctx context.Context, curren } // Check for reorg. if currentBlock.ParentHash != expectedParent.BlockHash { - // There can be another reorg while we're finding the LCA. - // That is ok, since we'll detect it on the next iteration. - // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. - blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, expectedParent.FinalizedBlockNumber) - if err2 != nil { - return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + return lp.handleReorg(ctx, currentBlock) + } + + if !isReplay { + // During normal polling DB does not have any blocks after currentBlockNumber, so no reorg is possible. We can skip extra checks and just return currentBlock. + return currentBlock, nil + } + + // Ensure that if DB contains current block it matches the current block from RPC. + currentBlockDB, err := lp.orm.SelectBlockByNumber(ctx, currentBlockNumber) + if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) { + return nil, fmt.Errorf("failed to get current block from DB %d: %w", currentBlockNumber, err) + } + + if currentBlockDB != nil && currentBlock.Hash != currentBlockDB.BlockHash { + return lp.handleReorg(ctx, currentBlock) + } + + // No reorg for current block, but during replay it's possible that current block is older than the latest block, let's check it too to avoid false positives on finality violation. + latestBlockDB, err1 := lp.orm.SelectLatestBlock(ctx) + if err1 != nil { + if pkgerrors.Is(err1, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when checking for reorg during replay, but got no rows", "currentBlockNumber", currentBlockNumber, "err", err1) } + return nil, pkgerrors.Wrap(err1, "unable to get latest block") + } - lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlockNumber) - // We truncate all the blocks and logs after the LCA. - // We could preserve the logs for forensics, since its possible - // that applications see them and take action upon it, however that - // results in significantly slower reads since we must then compute - // the canonical set per read. Typically, if an application took action on a log - // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. - // Its also nicely analogous to reading from the chain itself. - err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) - if err2 != nil { - // If we error on db commit, we can't know if the tx went through or not. - // We return an error here which will cause us to restart polling from lastBlockSaved + 1 - return nil, err2 + if currentBlock.BlockNumber() >= latestBlockDB.BlockNumber { + // currentBlock is newest, nothing more to check + return currentBlock, nil + } + + latestBlockRPC, err := lp.headerByNumber(ctx, latestBlockDB.BlockNumber) + if err != nil { + return nil, fmt.Errorf("unable to get latest block header for block number %d: %w", latestBlockDB.BlockNumber, err) + } + + if latestBlockRPC.Hash != latestBlockDB.BlockHash { + // Reorg detected, handle it + blockAfterLCA, err := lp.handleReorg(ctx, latestBlockRPC) + if err != nil { + return nil, fmt.Errorf("failed to handle reorg: %w", err) + } + + if blockAfterLCA.Number < currentBlock.BlockNumber() { + return blockAfterLCA, nil } - return blockAfterLCA, nil } - // No reorg, return current block. + return currentBlock, nil } +func (lp *logPoller) handleReorg(ctx context.Context, currentBlock *evmtypes.Head) (*evmtypes.Head, error) { + // during replay currentBlock may be older than the latest block, thus it's possible to miss finality violation, + // if we use its view on latest finalized block. To be safe, we get the latest block from the db. + latestBlock, err := lp.orm.SelectLatestBlock(ctx) + if err != nil { + if pkgerrors.Is(err, sql.ErrNoRows) { + lp.lggr.Criticalw("Unexpected state. Expected at least one block to be present in the db when handling reorg, but got no rows", "currentBlockNumber", currentBlock.Number, "err", err) + } + return nil, pkgerrors.Wrap(err, "failed to get latest finalized block from db") + } + // There can be another reorg while we're finding the LCA. + // That is ok, since we'll detect it on the next iteration. + // Since we go currentBlock by currentBlock for unfinalized logs, the mismatch starts at currentBlockNumber - 1. + blockAfterLCA, err2 := lp.findBlockAfterLCA(ctx, currentBlock, latestBlock.FinalizedBlockNumber) + if err2 != nil { + return nil, fmt.Errorf("unable to find LCA after reorg: %w", err2) + } + + lp.lggr.Infow("Reorg detected", "blockAfterLCA", blockAfterLCA.Number, "currentBlockNumber", currentBlock.Number) + // We truncate all the blocks and logs after the LCA. + // We could preserve the logs for forensics, since its possible + // that applications see them and take action upon it, however that + // results in significantly slower reads since we must then compute + // the canonical set per read. Typically, if an application took action on a log + // it would be saved elsewhere e.g. evm.txes, so it seems better to just support the fast reads. + // Its also nicely analogous to reading from the chain itself. + err2 = lp.orm.DeleteLogsAndBlocksAfter(ctx, blockAfterLCA.Number) + if err2 != nil { + // If we error on db commit, we can't know if the tx went through or not. + // We return an error here which will cause us to restart polling from lastBlockSaved + 1 + return nil, err2 + } + return blockAfterLCA, nil +} + // PollAndSaveLogs On startup/crash current is the first block after the last processed block. // currentBlockNumber is the block from where new logs are to be polled & saved. Under normal // conditions this would be equal to lastProcessed.BlockNumber + 1. -func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64) { - err := lp.pollAndSaveLogs(ctx, currentBlockNumber) +func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) { + err := lp.pollAndSaveLogs(ctx, currentBlockNumber, isReplay) if errors.Is(err, commontypes.ErrFinalityViolated) { lp.lggr.Criticalw("Failed to poll and save logs due to finality violation, retrying later", "err", err) lp.finalityViolated.Store(true) @@ -1048,7 +1115,7 @@ func (lp *logPoller) PollAndSaveLogs(ctx context.Context, currentBlockNumber int } } -func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64) (err error) { +func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int64, isReplay bool) (err error) { lp.lggr.Debugw("Polling for logs", "currentBlockNumber", currentBlockNumber) // Intentionally not using logPoller.finalityDepth directly but the latestFinalizedBlockNumber returned from lp.latestBlocks() // latestBlocks knows how to pick a proper latestFinalizedBlockNumber based on the logPoller's configuration @@ -1078,7 +1145,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int } // Possibly handle a reorg. For example if we crash, we'll be in the middle of processing unfinalized blocks. // Returns (currentBlock || LCA+1 if reorg detected, error) - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, currentBlock, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved and retry. @@ -1104,7 +1171,7 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int for { if currentBlockNumber > currentBlock.Number { - currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil) + currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay) if err != nil { // If there's an error handling the reorg, we can't be sure what state the db was left in. // Resume from the latest block saved. diff --git a/pkg/logpoller/log_poller_internal_test.go b/pkg/logpoller/log_poller_internal_test.go index 66fe07b7cc..337fc58520 100644 --- a/pkg/logpoller/log_poller_internal_test.go +++ b/pkg/logpoller/log_poller_internal_test.go @@ -271,7 +271,7 @@ func assertBackupPollerStartup(t *testing.T, head *evmtypes.Head, finalizedHead assert.Equal(t, int64(0), lp.backupPollerNextBlock) assert.Equal(t, 1, observedLogs.FilterMessageSnippet("ran before first successful log poller run").Len()) - lp.PollAndSaveLogs(ctx, head.Number) + lp.PollAndSaveLogs(ctx, head.Number, false) lastProcessed, err := lp.orm.SelectLatestBlock(ctx) require.NoError(t, err) @@ -380,7 +380,7 @@ func TestLogPoller_Replay(t *testing.T) { { ctx := testutils.Context(t) // process 1 log in block 3 - lp.PollAndSaveLogs(ctx, 4) + lp.PollAndSaveLogs(ctx, 4, false) latest, err := lp.LatestBlock(ctx) require.NoError(t, err) require.Equal(t, int64(4), latest.BlockNumber) @@ -785,7 +785,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { // insert finalized block with different hash than in RPC require.NoError(t, orm.InsertBlock(t.Context(), common.HexToHash("0x123"), 2, time.Unix(10, 0), 2, 2)) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("RPCs contradict each other and return different finalized blocks", func(t *testing.T) { @@ -806,7 +806,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { return evmtypes.Head{Number: num, Hash: utils.NewHash()} }) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Log's hash does not match block's", func(t *testing.T) { @@ -824,7 +824,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.HexToHash("0x123")}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.ErrorIs(t, lp.HealthReport()[lp.Name()], commontypes.ErrFinalityViolated) }) t.Run("Happy path", func(t *testing.T) { @@ -844,7 +844,7 @@ func Test_PollAndSaveLogs_BackfillFinalityViolation(t *testing.T) { ec.EXPECT().FilterLogs(mock.Anything, mock.Anything).Return([]types.Log{{BlockNumber: 5, BlockHash: common.BigToHash(big.NewInt(5)), Topics: []common.Hash{{}}}}, nil).Once() mockBatchCallContext(t, ec) lp := NewLogPoller(orm, ec, lggr, headTracker, lpOpts) - lp.PollAndSaveLogs(t.Context(), 4) + lp.PollAndSaveLogs(t.Context(), 4, false) require.NoError(t, lp.HealthReport()[lp.Name()]) }) } diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go index b42798b61f..ab6a69d0f9 100644 --- a/pkg/logpoller/log_poller_test.go +++ b/pkg/logpoller/log_poller_test.go @@ -377,7 +377,7 @@ func Test_BackupLogPoller(t *testing.T) { th.finalizeThroughBlock(t, 64) // Run ordinary poller + backup poller at least once more - th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber) + th.LogPoller.PollAndSaveLogs(ctx, currentBlockNumber, false) require.NoError(t, th.LogPoller.BackupPollAndSaveLogs(ctx)) currentBlock, err := th.LogPoller.LatestBlock(ctx) require.NoError(t, err) @@ -718,7 +718,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { backend.Commit() } currentBlockNumber := int64(1) - lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlockNumber, false) currentBlock, err := lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) matchesGeth := func() bool { @@ -770,7 +770,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) { require.NoError(t, err1) t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash()) } - lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber) + lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false) currentBlock, err = lp.LatestBlock(testutils.Context(t)) require.NoError(t, err) } @@ -1370,7 +1370,7 @@ func TestLogPoller_GetBlocks_Range(t *testing.T) { assert.Equal(t, 3, int(rpcBlocks2[1].FinalizedBlockNumber)) // after calling PollAndSaveLogs, block 3 (latest finalized block) is persisted in DB - th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1) + th.LogPoller.PollAndSaveLogs(testutils.Context(t), 1, false) block, err := th.ORM.SelectBlockByNumber(testutils.Context(t), 3) require.NoError(t, err) assert.Equal(t, 3, int(block.BlockNumber)) @@ -1621,7 +1621,7 @@ func TestTooManyLogResults(t *testing.T) { Addresses: []common.Address{addr}, }) require.NoError(t, err) - lp.PollAndSaveLogs(ctx, 5) + lp.PollAndSaveLogs(ctx, 5, false) block, err2 := o.SelectLatestBlock(ctx) require.NoError(t, err2) assert.Equal(t, int64(298), block.BlockNumber) @@ -1653,7 +1653,7 @@ func TestTooManyLogResults(t *testing.T) { return []types.Log{}, tooLargeErr // return "too many results" error if block range spans 4 or more blocks }) - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself @@ -1687,7 +1687,7 @@ func TestTooManyLogResults(t *testing.T) { headTracker.On("LatestAndFinalizedBlock", mock.Anything).Return(head, finalized, nil).Once() headTracker.On("LatestSafeBlock", mock.Anything).Return(finalized, nil).Once() - lp.PollAndSaveLogs(ctx, 298) + lp.PollAndSaveLogs(ctx, 298, false) block, err := o.SelectLatestBlock(ctx) if err != nil { require.ErrorContains(t, err, "no rows") // In case this subtest is run by itself @@ -2152,3 +2152,114 @@ func TestWhere(t *testing.T) { assert.Equal(t, []query.Expression{}, result) }) } + +func TestLogPoller_Reorg_On_Replay(t *testing.T) { + // TestCase: + // 1. LogPoller processes blocks to block 11 + // 2. Reorg replaces block 11 with a new block (some additional blocks may be added on top of it) + // 3. Replay is initiated from block below 11. + // Expected behaviour: + // 1. LogPoller should replace reorged block 11 with a new data. + // 2. DB must not contain at any point logs from both old and new block 11. + // 3. Finality Violation must not occur, since chain did not violate finality depth. + t.Parallel() + const reorgedBlockNumber = 11 + testCases := []struct { + name string + numberOfBlocksAfterReorg int + }{ + { + name: "Replay start right after reorg", + numberOfBlocksAfterReorg: 0, + }, + { + name: "Replay start a few blocks after reorg", + numberOfBlocksAfterReorg: 1, + }, + { + name: "Replay start once reorged block is finalized", + numberOfBlocksAfterReorg: 5, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + lpOpts := logpoller.Opts{ + PollPeriod: 24 * time.Hour, // effectively disable automatic polling, so we can control when we poll in the test + UseFinalityTag: false, + FinalityDepth: 3, + BackfillBatchSize: 3, + RPCBatchSize: 2, + KeepFinalizedBlocksDepth: 1000, + } + th := SetupTH(t, lpOpts) + + // Set up a log poller listening for log emitter logs. + err := th.LogPoller.RegisterFilter(testutils.Context(t), logpoller.Filter{ + Name: "Test Emitter 1", + EventSigs: []common.Hash{EmitterABI.Events["Log1"].ID}, + Addresses: []common.Address{th.EmitterAddress1}, + }) + require.NoError(t, err) + + // populate chain with data + for range reorgedBlockNumber - 1 { + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(1))}) + require.NoError(t, err) + th.Backend.Commit() + } + + // Start LogPoller and wait for it to complete the first poll. The second poll won't happen until we call Replay, since the poll period is very long. + require.NoError(t, th.LogPoller.Start(t.Context())) + defer func() { + require.NoError(t, th.LogPoller.Close()) + }() + testutils.RequireEventually(t, func() bool { + latest, err := th.LogPoller.LatestBlock(t.Context()) + return err == nil && latest.BlockNumber == reorgedBlockNumber + }) + + reorgedBlock, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(reorgedBlockNumber), reorgedBlock.Number().Int64()) + + // Replace block 11 with a new block and bury it under 1 new block + require.NoError(t, th.Backend.Fork(reorgedBlock.ParentHash())) + const newLogData = int64(123) + // Commit reorgedBlock and numberOfBlocksAfterReorg on top of it + for range tc.numberOfBlocksAfterReorg + 1 { + // emit log that is not tracked by LP to ensure that tracked log has a different index. + // So if reorg is not properly handled, both logs end up in the database and the test fails. + _, err = th.Emitter2.EmitLog1(th.Owner, []*big.Int{big.NewInt(int64(10))}) + require.NoError(t, err) + _, err = th.Emitter1.EmitLog1(th.Owner, []*big.Int{big.NewInt(newLogData)}) + require.NoError(t, err) + th.Backend.Commit() + } + + newReorgedBlock, err := th.Client.BlockByNumber(t.Context(), big.NewInt(reorgedBlockNumber)) + require.NoError(t, err) + require.NotEqual(t, reorgedBlock.Hash().String(), newReorgedBlock.Hash().String()) + + latest, err := th.Client.BlockByNumber(t.Context(), nil) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), latest.Number().Int64()) + + // Trigger replay, which should gracefully handle the reorg and end up on the new latest block + err = th.LogPoller.Replay(t.Context(), 5) + require.NoError(t, err) + // LP should be on latest block now + lpLatest, err := th.LogPoller.LatestBlock(t.Context()) + require.NoError(t, err) + require.Equal(t, int64(tc.numberOfBlocksAfterReorg+reorgedBlockNumber), lpLatest.BlockNumber) + logs, err := th.ORM.SelectLogsByBlockRange(t.Context(), reorgedBlockNumber, reorgedBlockNumber) + require.NoError(t, err) + require.Len(t, logs, 1) + require.Equal(t, newLogData, big.NewInt(0).SetBytes(logs[0].Data).Int64(), "Log data should match the log from the new block, indicating that the old block's log was properly removed during replay") + // Ensure reorged block was replaced by a new one + dbBlock, err := th.ORM.SelectBlockByNumber(testutils.Context(t), reorgedBlock.Number().Int64()) + require.NoError(t, err) + require.Equal(t, reorgedBlock.Number().Int64(), dbBlock.BlockNumber) + require.NotEqual(t, reorgedBlock.Hash(), dbBlock.BlockHash) + }) + } +}