diff --git a/pkg/logpoller/helper_test.go b/pkg/logpoller/helper_test.go
index cb79fb721c..24c3b072fd 100644
--- a/pkg/logpoller/helper_test.go
+++ b/pkg/logpoller/helper_test.go
@@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
 		require.NoError(t, err, "block %v", i)
 		chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
 		require.NoError(t, err)
-		assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i)
+		assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
 	}
 }
 
diff --git a/pkg/logpoller/log_poller.go b/pkg/logpoller/log_poller.go
index f97289635a..618aa390a2 100644
--- a/pkg/logpoller/log_poller.go
+++ b/pkg/logpoller/log_poller.go
@@ -945,7 +945,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
 		}
 
 		lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
-		err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
+		err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock})
 		if err != nil {
 			lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
 			return err
@@ -1167,52 +1167,115 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
 			return fmt.Errorf("failed to backfill finalized logs: %w", err)
 		}
 		currentBlockNumber = lastSafeBackfillBlock + 1
+		currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
+		if err != nil {
+			// If there's an error handling the reorg, we can't be sure what state the db was left in.
+			// Resume from the latest block saved.
+			return fmt.Errorf("failed to get current block: %w", err)
+		}
+		currentBlockNumber = currentBlock.Number
 	}
 
 	for {
-		if currentBlockNumber > currentBlock.Number {
-			currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
-			if err != nil {
-				// If there's an error handling the reorg, we can't be sure what state the db was left in.
-				// Resume from the latest block saved.
-				return fmt.Errorf("failed to get current block: %w", err)
+
+		blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay)
+		// Even if err is non-nil, we may have partial logs and blocks worth saving, so save what we have and retry the rest on the next poll.
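+		// If the insert below fails we return nil rather than an error; the next poll
+		// resumes from the latest block saved in the DB, so nothing is lost.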
+		if len(logs) > 0 || len(blocks) > 0 {
+			lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber)
+			insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks)
+			if insertErr != nil {
+				lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err)
+				return nil
 			}
-			currentBlockNumber = currentBlock.Number
 		}
+		if err == nil {
+			lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber)
+			return nil
+		}
+
+		var reorgErr *reorgError
+		if !errors.As(err, &reorgErr) {
+			return fmt.Errorf("failed to get unfinalized logs: %w", err)
+		}
+
+		lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number)
+		currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt)
+		if err != nil {
+			return fmt.Errorf("failed to handle reorg: %w", err)
+		}
+		lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number)
+	}
+}
+
+type reorgError struct {
+	ReorgedAt *evmtypes.Head
+}
+
+func newReorgError(reorgedAt *evmtypes.Head) error {
+	return &reorgError{ReorgedAt: reorgedAt}
+}
+
+func (e *reorgError) Error() string {
+	return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
+}
+
+func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
+	const maxUnfinalizedBlocks = 2000
+	var logs []Log
+	var blocks []Block
+	for {
 		h := currentBlock.Hash
-		var logs []types.Log
-		logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
+		rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
 		if err != nil {
-			lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber)
-			return nil
+			lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number)
+			return blocks, logs, nil
 		}
-		lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
+		lp.lggr.Debugw("Unfinalized log query", "logs", len(rpcLogs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
 		block := Block{
 			BlockHash: h,
-			BlockNumber: currentBlockNumber,
+			BlockNumber: currentBlock.Number,
 			BlockTimestamp: currentBlock.Timestamp,
-			FinalizedBlockNumber: latestFinalizedBlockNumber,
-			SafeBlockNumber: safeBlockNumber,
-		}
-		err = lp.orm.InsertLogsWithBlock(
-			ctx,
-			convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()),
-			block,
-		)
+			FinalizedBlockNumber: finalized,
+			SafeBlockNumber: safe,
+		}
+		logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
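+		// Track the block header alongside its converted logs; the caller persists both
+		// together in a single InsertLogsWithBlocks call.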
+		blocks = append(blocks, block)
+
+		if currentBlock.Number >= latest {
+			return blocks, logs, nil
+		}
+
+		if len(blocks) >= maxUnfinalizedBlocks {
+			lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest)
+			return blocks, logs, nil
+		}
+
+		nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1)
 		if err != nil {
-			lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
-			return nil
+			lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number)
+			return blocks, logs, nil
 		}
-		// Update current block.
-		// Same reorg detection on unfinalized blocks.
-		currentBlockNumber++
-		if currentBlockNumber > latestBlockNumber {
-			break
+
+		if nextBlock.ParentHash != currentBlock.Hash {
+			return blocks, logs, newReorgError(nextBlock)
 		}
-	}
-	return nil
+		if isReplay {
+			// During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation.
+			nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number)
+			if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
+				lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number)
+				return blocks, logs, nil
+			}
+
+			if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash {
+				return blocks, logs, newReorgError(nextBlock)
+			}
+		}
+
+		currentBlock = nextBlock
+	}
 }
 
 // Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker
diff --git a/pkg/logpoller/log_poller_test.go b/pkg/logpoller/log_poller_test.go
index ab6a69d0f9..6004075946 100644
--- a/pkg/logpoller/log_poller_test.go
+++ b/pkg/logpoller/log_poller_test.go
@@ -690,7 +690,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
 	owner.GasPrice = big.NewInt(10e9)
 	p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool {
 		// After the set of reorgs, we should have the same canonical blocks that geth does.
-		t.Log("Starting test", mineOrReorg)
+		seed := time.Now().UnixNano()
+		localRand := rand.New(rand.NewSource(seed))
+		t.Log("Starting test", mineOrReorg, "seed", seed)
 		chainID := testutils.NewRandomEVMChainID()
 		// Set up a test chain with a log emitting contract deployed.
 		orm := logpoller.NewORM(chainID, db, lggr)
@@ -742,7 +744,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
 		}
 		// Randomly pick to mine or reorg
 		for i := 0; i < numChainInserts; i++ {
-			if rand.Int63()%2 == 0 {
+			if localRand.Int63()%2 == 0 {
 				// Mine blocks
 				for j := 0; j < int(mineOrReorg[i]); j++ {
 					backend.Commit()
@@ -770,7 +772,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
 				require.NoError(t, err1)
 				t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
 			}
-			lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false)
+			lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false)
 			currentBlock, err = lp.LatestBlock(testutils.Context(t))
 			require.NoError(t, err)
 		}
diff --git a/pkg/logpoller/observability.go b/pkg/logpoller/observability.go
index 0cdd040df0..ffa1b47456 100644
--- a/pkg/logpoller/observability.go
+++ b/pkg/logpoller/observability.go
@@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error {
 	err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error {
 		return o.ORM.InsertLogs(ctx, logs)
 	})
-	trackInsertedLogsAndBlock(ctx, o, logs, nil, err)
+	trackInsertedLogsAndBlocks(ctx, o, logs, nil, err)
 	trackInsertedBlockLatency(ctx, o, logs, err)
 	return err
 }
 
-func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
+func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
 	err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error {
-		return o.ORM.InsertLogsWithBlock(ctx, logs, block)
+		return o.ORM.InsertLogsWithBlocks(ctx, logs, blocks)
 	})
-	trackInsertedLogsAndBlock(ctx, o, logs, &block, err)
+	trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err)
 	trackInsertedBlockLatency(ctx, o, logs, err)
 	return err
 }
@@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy
 	return exec()
 }
 
-func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) {
+func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) {
 	if err != nil {
 		return
 	}
 	ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout)
 	defer cancel()
 	o.metrics.IncrementLogsInserted(ctx, int64(len(logs)))
-	if block != nil {
-		o.metrics.IncrementBlocksInserted(ctx, 1)
+	if len(blocks) > 0 {
+		o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks)))
 	}
 }
 
diff --git a/pkg/logpoller/observability_test.go b/pkg/logpoller/observability_test.go
index 5cdedeae47..08d78f3953 100644
--- a/pkg/logpoller/observability_test.go
+++ b/pkg/logpoller/observability_test.go
@@ -42,9 +42,11 @@ func TestMultipleMetricsArePublished(t *testing.T) {
 	_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1)
 	_, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0)
 	_ = orm.InsertLogs(ctx, []Log{})
-	_ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{
-		BlockNumber: 1,
-		BlockTimestamp: time.Now(),
+	_ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{
+		{
+			BlockNumber: 1,
+			BlockTimestamp: time.Now(),
+		},
 	})
 
 	require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
@@ -114,21 +116,25 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
 	assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
 	assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency))
 
 	// Insert 5 more logs with block
-	require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{
-		BlockHash: utils.RandomBytes32(),
-		BlockNumber: 10,
-		BlockTimestamp: time.Now(),
-		FinalizedBlockNumber: 5,
+	require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{
+		{
+			BlockHash: utils.RandomBytes32(),
+			BlockNumber: 10,
+			BlockTimestamp: time.Now(),
+			FinalizedBlockNumber: 5,
+		},
 	}))
 	assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
 	assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
 
 	// Insert 5 more logs with block
-	require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{
-		BlockHash: utils.RandomBytes32(),
-		BlockNumber: 15,
-		BlockTimestamp: time.Now(),
-		FinalizedBlockNumber: 5,
+	require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{
+		{
+			BlockHash: utils.RandomBytes32(),
+			BlockNumber: 15,
+			BlockTimestamp: time.Now(),
+			FinalizedBlockNumber: 5,
+		},
 	}))
 	assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
 	assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
@@ -144,9 +150,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
 	assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete"))
 
 	// Don't update counters in case of an error
-	require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{
-		BlockHash: utils.RandomBytes32(),
-		BlockTimestamp: time.Now(),
+	require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{
+		{
+			BlockHash: utils.RandomBytes32(),
+			BlockTimestamp: time.Now(),
+		},
 	}))
 	assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
 	assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
diff --git a/pkg/logpoller/orm.go b/pkg/logpoller/orm.go
index 5d6313169b..0b1836ac77 100644
--- a/pkg/logpoller/orm.go
+++ b/pkg/logpoller/orm.go
@@ -25,7 +25,7 @@ import (
 // What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
 type ORM interface {
 	InsertLogs(ctx context.Context, logs []Log) error
-	InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error
+	InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error
 	InsertFilter(ctx context.Context, filter Filter) error
 
 	LoadFilters(ctx context.Context) (map[string]Filter, error)
@@ -113,6 +113,18 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
 	return err
 }
 
+func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error {
+	const q = `INSERT INTO evm.log_poller_blocks
+		(evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number)
+		VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number)
+		ON CONFLICT DO NOTHING`
+	// maintain behaviour of InsertBlock
+	for i := range blocks {
+		blocks[i].EVMChainID = ubig.New(o.chainID)
+	}
+	return batchInsert(ctx, o.ds, q, blocks, 1000)
+}
+
 // InsertFilter is idempotent.
 //
 // Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
@@ -545,10 +557,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
 	})
 }
 
-func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
+func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
 	// Optimization, don't open TX when there is only a block to be persisted
 	if len(logs) == 0 {
-		return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
+		return o.InsertBlocks(ctx, blocks)
 	}
 
 	if err := o.validateLogs(logs); err != nil {
@@ -557,7 +569,7 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block
 
 	// Block and logs goes with the same TX to ensure atomicity
 	return o.Transact(ctx, func(orm *DSORM) error {
-		err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
+		err := orm.InsertBlocks(ctx, blocks)
 		if err != nil {
 			return err
 		}
@@ -566,20 +578,22 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block
 }
 
 func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
-	batchInsertSize := 4000
-	for i := 0; i < len(logs); i += batchInsertSize {
-		start, end := i, i+batchInsertSize
-		if end > len(logs) {
-			end = len(logs)
-		}
-
-		query := `INSERT INTO evm.logs
+	const q = `INSERT INTO evm.logs
 		(evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
 		VALUES (:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
 		ON CONFLICT DO NOTHING`
+	return batchInsert(ctx, tx, q, logs, 4000)
+}
+
+func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error {
+	for i := 0; i < len(objs); i += batchInsertSize {
+		start, end := i, i+batchInsertSize
+		if end > len(objs) {
+			end = len(objs)
+		}
 
-		_, err := tx.NamedExecContext(ctx, query, logs[start:end])
+		_, err := ds.NamedExecContext(ctx, query, objs[start:end])
 		if err != nil {
 			if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
 				// In case of DB timeouts, try to insert again with a smaller batch upto a limit
diff --git a/pkg/logpoller/orm_test.go b/pkg/logpoller/orm_test.go
index 3a32cc933c..94e3bff5b9 100644
--- a/pkg/logpoller/orm_test.go
+++ b/pkg/logpoller/orm_test.go
@@ -94,6 +94,32 @@ func TestLogPoller_Batching(t *testing.T) {
 	require.Equal(t, len(logs), len(lgs))
 }
 
+func TestLogPoller_Blocks_Batching(t *testing.T) {
+	t.Parallel()
+	ctx := testutils.Context(t)
+	th := SetupTH(t, lpOpts)
+	var blocks []logpoller.Block
+	var logs []logpoller.Log
+	const numBlocks = 2000
+	for i := 0; i < numBlocks; i++ {
+		blockHash := common.HexToHash(fmt.Sprintf("0x%d", i+1))
+		blocks = append(blocks, logpoller.Block{
+			EVMChainID: ubig.New(th.ChainID),
+			BlockHash: blockHash,
+			BlockNumber: int64(i + 1),
+		})
+		logs = append(logs, GenLog(th.ChainID, int64(i), int64(i+1), blockHash.String(), EmitterABI.Events["Log1"].ID.Bytes(), th.EmitterAddress1))
+	}
+	require.NoError(t, th.ORM.InsertLogsWithBlocks(ctx, logs, blocks))
+	lgs, err := th.ORM.SelectLogsByBlockRange(ctx, 1, numBlocks)
+	require.NoError(t, err)
+	// Make sure all logs are inserted
+	require.Equal(t, len(logs), len(lgs))
+	dbBlocks, err := th.ORM.GetBlocksRange(ctx, 1, numBlocks)
+	require.NoError(t, err)
+	require.Equal(t, numBlocks, len(dbBlocks))
+}
+
 func TestORM_GetBlocks_From_Range(t *testing.T) {
 	th := SetupTH(t, lpOpts)
 	o1 := th.ORM
@@ -2116,7 +2142,7 @@ func TestInsertLogsWithBlock(t *testing.T) {
 		t.Run(tt.name, func(t *testing.T) {
 			// clean all logs and blocks between test cases
 			defer func() { _ = o.DeleteLogsAndBlocksAfter(ctx, 0) }()
-			insertError := o.InsertLogsWithBlock(ctx, tt.logs, tt.block)
+			insertError := o.InsertLogsWithBlocks(ctx, tt.logs, []logpoller.Block{tt.block})
 
 			logs, logsErr := o.SelectLogs(ctx, 0, math.MaxInt, address, event)
 			block, blockErr := o.SelectLatestBlock(ctx)
@@ -2209,16 +2235,18 @@ func TestSelectLogsDataWordBetween(t *testing.T) {
 	secondLogData = append(secondLogData, logpoller.EvmWord(5).Bytes()...)
 	secondLogData = append(secondLogData, logpoller.EvmWord(20).Bytes()...)
 
-	err := th.ORM.InsertLogsWithBlock(ctx,
+	err := th.ORM.InsertLogsWithBlocks(ctx,
 		[]logpoller.Log{
 			GenLogWithData(th.ChainID, address, eventSig, 1, 1, firstLogData),
 			GenLogWithData(th.ChainID, address, eventSig, 2, 2, secondLogData),
 		},
-		logpoller.Block{
-			BlockHash: utils.RandomBytes32(),
-			BlockNumber: 10,
-			BlockTimestamp: time.Now(),
-			FinalizedBlockNumber: 1,
+		[]logpoller.Block{
+			{
+				BlockHash: utils.RandomBytes32(),
+				BlockNumber: 10,
+				BlockTimestamp: time.Now(),
+				FinalizedBlockNumber: 1,
+			},
 		},
 	)
 	require.NoError(t, err)
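For call sites not shown in this diff, a minimal sketch of migrating to the batched method. It is written as if it lived inside the logpoller package (so it relies on that package's existing imports); the saveUnfinalizedBatch helper is illustrative only, while ORM, Block, Log, and InsertLogsWithBlocks are the identifiers introduced above.

// Illustrative sketch (not part of this change): batch a set of unfinalized blocks
// and their logs through the new ORM method instead of calling the old
// InsertLogsWithBlock once per block.
func saveUnfinalizedBatch(ctx context.Context, orm ORM, blocks []Block, logs []Log) error {
	// With no logs the ORM skips the transaction and only inserts the blocks;
	// otherwise blocks and logs are written together in a single TX.
	return orm.InsertLogsWithBlocks(ctx, logs, blocks)
}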