Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/logpoller/helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ func (th *TestHarness) assertHaveCanonical(t *testing.T, start, end int) {
require.NoError(t, err, "block %v", i)
chainBlk, err := th.Client.BlockByNumber(testutils.Context(t), big.NewInt(int64(i)))
require.NoError(t, err)
assert.Equal(t, chainBlk.Hash().Bytes(), blk.BlockHash.Bytes(), "block %v", i)
assert.Equal(t, chainBlk.Hash().String(), blk.BlockHash.String(), "block %v", i)
}
}

Expand Down
125 changes: 94 additions & 31 deletions pkg/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -945,7 +945,7 @@ func (lp *logPoller) backfill(ctx context.Context, start, end int64) error {
}

lp.lggr.Debugw("Inserting backfilled logs with batch endblock", "from", from, "to", to, "logs", len(gethLogs), "blocks", blocks)
err = lp.orm.InsertLogsWithBlock(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), endblock)
err = lp.orm.InsertLogsWithBlocks(ctx, convertLogs(gethLogs, blocks, lp.lggr, lp.ec.ConfiguredChainID()), []Block{endblock})
if err != nil {
lp.lggr.Warnw("Unable to insert logs, retrying", "err", err, "from", from, "to", to)
return err
Expand Down Expand Up @@ -1167,52 +1167,115 @@ func (lp *logPoller) pollAndSaveLogs(ctx context.Context, currentBlockNumber int
return fmt.Errorf("failed to backfill finalized logs: %w", err)
}
currentBlockNumber = lastSafeBackfillBlock + 1
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
return fmt.Errorf("failed to get current block: %w", err)
}
currentBlockNumber = currentBlock.Number
}

for {
if currentBlockNumber > currentBlock.Number {
currentBlock, err = lp.getCurrentBlockMaybeHandleReorg(ctx, currentBlockNumber, nil, isReplay)
if err != nil {
// If there's an error handling the reorg, we can't be sure what state the db was left in.
// Resume from the latest block saved.
return fmt.Errorf("failed to get current block: %w", err)

blocks, logs, err := lp.getUnfinalizedLogs(ctx, currentBlock, latestBlockNumber, safeBlockNumber, latestFinalizedBlockNumber, isReplay)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should there be some batching as well? If a node is far behind, could this be a potential OOM risk?

// even if we have an error, we may have partial logs and blocks that can be saved, so we save what we have and then retry.
if len(logs) > 0 || len(blocks) > 0 {
lp.lggr.Debugw("Saving logs", "logs", len(logs), "blocks", len(blocks), "currentBlockNumber", currentBlockNumber)
insertErr := lp.orm.InsertLogsWithBlocks(ctx, logs, blocks)
if insertErr != nil {
lp.lggr.Warnw("Unable to save logs, retrying later", "insertErr", insertErr, "block", currentBlockNumber, "err", err)
return nil
}
currentBlockNumber = currentBlock.Number
}

if err == nil {
lp.lggr.Debugw("Finished processing unfinalized blocks", "from", currentBlockNumber, "to", latestBlockNumber)
return nil
}

var reorgErr *reorgError
if !errors.As(err, &reorgErr) {
return fmt.Errorf("failed to get unfinalized logs: %w", err)
}

lp.lggr.Warnw("Reorg detected during unfinalized log processing, handling reorg", "err", err, "currentBlockNumber", currentBlockNumber, "lastKnownMatchingHead", reorgErr.ReorgedAt.Number)
currentBlock, err = lp.handleReorg(ctx, reorgErr.ReorgedAt)
if err != nil {
return fmt.Errorf("failed to handle reorg: %w", err)
}
lp.lggr.Infow("Finished handling reorg, resuming log processing from new block after LCA", "currentBlockNumber", currentBlock.Number)
}
}

// reorgError signals that a chain reorg was detected while walking
// unfinalized blocks. Callers detect it with errors.As and hand
// ReorgedAt to the reorg-handling path.
type reorgError struct {
ReorgedAt *evmtypes.Head // head at which the mismatch was observed — NOTE(review): logged elsewhere as "lastKnownMatchingHead"; confirm which block this actually is
}

// newReorgError wraps reorgedAt in a *reorgError so callers can
// distinguish reorgs from other failures via errors.As.
func newReorgError(reorgedAt *evmtypes.Head) error {
return &reorgError{ReorgedAt: reorgedAt}
}

// Error implements the error interface, reporting the number of the
// block at which the reorg was detected.
func (e *reorgError) Error() string {
return fmt.Sprintf("reorg detected at block %d", e.ReorgedAt.Number)
}

func (lp *logPoller) getUnfinalizedLogs(ctx context.Context, currentBlock *evmtypes.Head, latest, safe, finalized int64, isReplay bool) ([]Block, []Log, error) {
const maxUnfinalizedBlocks = 2000
var logs []Log
var blocks []Block
for {
h := currentBlock.Hash
var logs []types.Log
logs, err = lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
rpcLogs, err := lp.latencyMonitor.FilterLogs(ctx, lp.Filter(nil, nil, &h))
if err != nil {
lp.lggr.Warnw("Unable to query for logs, retrying", "err", err, "block", currentBlockNumber)
return nil
lp.lggr.Warnw("Unable to query for logs, retrying on next poll", "err", err, "block", currentBlock.Number)
return blocks, logs, nil
}
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlockNumber, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
lp.lggr.Debugw("Unfinalized log query", "logs", len(logs), "currentBlockNumber", currentBlock.Number, "blockHash", currentBlock.Hash, "timestamp", currentBlock.Timestamp)
block := Block{
BlockHash: h,
BlockNumber: currentBlockNumber,
BlockNumber: currentBlock.Number,
BlockTimestamp: currentBlock.Timestamp,
FinalizedBlockNumber: latestFinalizedBlockNumber,
SafeBlockNumber: safeBlockNumber,
}
err = lp.orm.InsertLogsWithBlock(
ctx,
convertLogs(logs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID()),
block,
)
FinalizedBlockNumber: finalized,
SafeBlockNumber: safe,
}
logs = append(logs, convertLogs(rpcLogs, []Block{block}, lp.lggr, lp.ec.ConfiguredChainID())...)
blocks = append(blocks, block)

if currentBlock.Number >= latest {
return blocks, logs, nil
}

if len(blocks) >= maxUnfinalizedBlocks {
lp.lggr.Warnw("Too many unfinalized blocks, stopping log retrieval to avoid OOM", "currentBlockNumber", currentBlock.Number, "latestBlockNumber", latest)
return blocks, logs, nil
}

nextBlock, err := lp.headerByNumber(ctx, currentBlock.Number+1)
if err != nil {
lp.lggr.Warnw("Unable to save logs resuming from last saved block + 1", "err", err, "block", currentBlockNumber)
return nil
lp.lggr.Warnw("Unable to get next block header, retrying on next poll", "err", err, "block", currentBlock.Number)
return blocks, logs, nil
}
// Update current block.
// Same reorg detection on unfinalized blocks.
currentBlockNumber++
if currentBlockNumber > latestBlockNumber {
break

if nextBlock.ParentHash != currentBlock.Hash {
return blocks, logs, newReorgError(nextBlock)
}
}

return nil
if isReplay {
// During replay, we also check if the next block matches what we have in the DB to avoid false positives on reorgs due to finality violation.
nextBlockDB, err := lp.orm.SelectBlockByNumber(ctx, nextBlock.Number)
if err != nil && !pkgerrors.Is(err, sql.ErrNoRows) {
lp.lggr.Warnw("Unable to get next block from DB during replay, retrying on next poll", "err", err, "block", nextBlock.Number)
return blocks, logs, nil
}

if nextBlockDB != nil && nextBlock.Hash != nextBlockDB.BlockHash {
return blocks, logs, newReorgError(nextBlock)
}
}

currentBlock = nextBlock
}
}

// Returns information about latestBlock, latestFinalizedBlockNumber provided by HeadTracker
Expand Down
8 changes: 5 additions & 3 deletions pkg/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,7 +690,9 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
owner.GasPrice = big.NewInt(10e9)
p.Property("synchronized with geth", prop.ForAll(func(mineOrReorg []uint64) bool {
// After the set of reorgs, we should have the same canonical blocks that geth does.
t.Log("Starting test", mineOrReorg)
seed := time.Now().UnixNano()
localRand := rand.New(rand.NewSource(seed))
t.Log("Starting test", mineOrReorg, "seed", seed)
chainID := testutils.NewRandomEVMChainID()
// Set up a test chain with a log emitting contract deployed.
orm := logpoller.NewORM(chainID, db, lggr)
Expand Down Expand Up @@ -742,7 +744,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
}
// Randomly pick to mine or reorg
for i := 0; i < numChainInserts; i++ {
if rand.Int63()%2 == 0 {
if localRand.Int63()%2 == 0 {
// Mine blocks
for j := 0; j < int(mineOrReorg[i]); j++ {
backend.Commit()
Expand Down Expand Up @@ -770,7 +772,7 @@ func TestLogPoller_SynchronizedWithGeth(t *testing.T) {
require.NoError(t, err1)
t.Logf("New latest (%v, %x), latest parent %x)\n", latest.NumberU64(), latest.Hash(), latest.ParentHash())
}
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber, false)
lp.PollAndSaveLogs(testutils.Context(t), currentBlock.BlockNumber+1, false)
currentBlock, err = lp.LatestBlock(testutils.Context(t))
require.NoError(t, err)
}
Expand Down
14 changes: 7 additions & 7 deletions pkg/logpoller/observability.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,16 @@ func (o *ObservedORM) InsertLogs(ctx context.Context, logs []Log) error {
err := withObservedExec(ctx, o, "InsertLogs", metrics.Create, func() error {
return o.ORM.InsertLogs(ctx, logs)
})
trackInsertedLogsAndBlock(ctx, o, logs, nil, err)
trackInsertedLogsAndBlocks(ctx, o, logs, nil, err)
trackInsertedBlockLatency(ctx, o, logs, err)
return err
}

func (o *ObservedORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
func (o *ObservedORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
err := withObservedExec(ctx, o, "InsertLogsWithBlock", metrics.Create, func() error {
return o.ORM.InsertLogsWithBlock(ctx, logs, block)
return o.ORM.InsertLogsWithBlocks(ctx, logs, blocks)
})
trackInsertedLogsAndBlock(ctx, o, logs, &block, err)
trackInsertedLogsAndBlocks(ctx, o, logs, blocks, err)
trackInsertedBlockLatency(ctx, o, logs, err)
return err
}
Expand Down Expand Up @@ -290,15 +290,15 @@ func withObservedExec(ctx context.Context, o *ObservedORM, query string, queryTy
return exec()
}

func trackInsertedLogsAndBlock(ctx context.Context, o *ObservedORM, logs []Log, block *Block, err error) {
func trackInsertedLogsAndBlocks(ctx context.Context, o *ObservedORM, logs []Log, blocks []Block, err error) {
if err != nil {
return
}
ctx, cancel := context.WithTimeout(ctx, client.QueryTimeout)
defer cancel()
o.metrics.IncrementLogsInserted(ctx, int64(len(logs)))
if block != nil {
o.metrics.IncrementBlocksInserted(ctx, 1)
if len(blocks) > 0 {
o.metrics.IncrementBlocksInserted(ctx, int64(len(blocks)))
}
}

Expand Down
40 changes: 24 additions & 16 deletions pkg/logpoller/observability_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ func TestMultipleMetricsArePublished(t *testing.T) {
_, _ = orm.SelectLatestLogEventSigsAddrsWithConfs(ctx, 0, []common.Address{{}}, []common.Hash{{}}, 1)
_, _ = orm.SelectIndexedLogsCreatedAfter(ctx, common.Address{}, common.Hash{}, 1, []common.Hash{}, time.Now(), 0)
_ = orm.InsertLogs(ctx, []Log{})
_ = orm.InsertLogsWithBlock(ctx, []Log{}, Block{
BlockNumber: 1,
BlockTimestamp: time.Now(),
_ = orm.InsertLogsWithBlocks(ctx, []Log{}, []Block{
{
BlockNumber: 1,
BlockTimestamp: time.Now(),
},
})

require.Equal(t, 13, testutil.CollectAndCount(orm.queryDuration))
Expand Down Expand Up @@ -114,21 +116,25 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, 10, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 1, testutil.CollectAndCount(orm.discoveryLatency))
// Insert 5 more logs with block
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[10:15], Block{
BlockHash: utils.RandomBytes32(),
BlockNumber: 10,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[10:15], []Block{
{
BlockHash: utils.RandomBytes32(),
BlockNumber: 10,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
},
}))
assert.Equal(t, 15, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 1, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))

// Insert 5 more logs with block
require.NoError(t, orm.InsertLogsWithBlock(ctx, logs[15:], Block{
BlockHash: utils.RandomBytes32(),
BlockNumber: 15,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
require.NoError(t, orm.InsertLogsWithBlocks(ctx, logs[15:], []Block{
{
BlockHash: utils.RandomBytes32(),
BlockNumber: 15,
BlockTimestamp: time.Now(),
FinalizedBlockNumber: 5,
},
}))
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
Expand All @@ -144,9 +150,11 @@ func TestCountersAreProperlyPopulatedForWrites(t *testing.T) {
assert.Equal(t, 2, counterFromGaugeByLabels(orm.datasetSize, network, "420", "DeleteBlocksBefore", "delete"))

// Don't update counters in case of an error
require.Error(t, orm.InsertLogsWithBlock(ctx, logs, Block{
BlockHash: utils.RandomBytes32(),
BlockTimestamp: time.Now(),
require.Error(t, orm.InsertLogsWithBlocks(ctx, logs, []Block{
{
BlockHash: utils.RandomBytes32(),
BlockTimestamp: time.Now(),
},
}))
assert.Equal(t, 20, int(testutil.ToFloat64(orm.logsInserted.WithLabelValues(network, "420"))))
assert.Equal(t, 2, int(testutil.ToFloat64(orm.blocksInserted.WithLabelValues(network, "420"))))
Expand Down
40 changes: 27 additions & 13 deletions pkg/logpoller/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
// What is more, LogPoller should not be aware of the underlying database implementation and delegate all the queries to the ORM.
type ORM interface {
InsertLogs(ctx context.Context, logs []Log) error
InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error
InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error
InsertFilter(ctx context.Context, filter Filter) error

LoadFilters(ctx context.Context) (map[string]Filter, error)
Expand Down Expand Up @@ -113,6 +113,18 @@ func (o *DSORM) InsertBlock(ctx context.Context, blockHash common.Hash, blockNum
return err
}

// InsertBlocks idempotently persists blocks into evm.log_poller_blocks,
// stamping each row with this ORM's chain ID and inserting in batches of
// 1000. Rows that already exist are silently skipped (ON CONFLICT DO
// NOTHING). NOTE: this mutates the caller's slice — each element's
// EVMChainID field is overwritten.
func (o *DSORM) InsertBlocks(ctx context.Context, blocks []Block) error {
const q = `INSERT INTO evm.log_poller_blocks
(evm_chain_id, block_hash, block_number, block_timestamp, finalized_block_number, created_at, safe_block_number)
VALUES (:evm_chain_id, :block_hash, :block_number, :block_timestamp, :finalized_block_number, NOW(), :safe_block_number)
ON CONFLICT DO NOTHING`
// maintain behaviour of InsertBlock: every block is tagged with the ORM's configured chain ID
for i := range blocks {
blocks[i].EVMChainID = ubig.New(o.chainID)
}
return batchInsert(ctx, o.ds, q, blocks, 1000)
}

// InsertFilter is idempotent.
//
// Each address/event pair must have a unique job id, so it may be removed when the job is deleted.
Expand Down Expand Up @@ -545,10 +557,10 @@ func (o *DSORM) InsertLogs(ctx context.Context, logs []Log) error {
})
}

func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block) error {
func (o *DSORM) InsertLogsWithBlocks(ctx context.Context, logs []Log, blocks []Block) error {
// Optimization, don't open TX when there is only a block to be persisted
if len(logs) == 0 {
return o.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
return o.InsertBlocks(ctx, blocks)
}

if err := o.validateLogs(logs); err != nil {
Expand All @@ -557,7 +569,7 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block

// Block and logs goes with the same TX to ensure atomicity
return o.Transact(ctx, func(orm *DSORM) error {
err := orm.InsertBlock(ctx, block.BlockHash, block.BlockNumber, block.BlockTimestamp, block.FinalizedBlockNumber, block.SafeBlockNumber)
err := orm.InsertBlocks(ctx, blocks)
if err != nil {
return err
}
Expand All @@ -566,20 +578,22 @@ func (o *DSORM) InsertLogsWithBlock(ctx context.Context, logs []Log, block Block
}

func (o *DSORM) insertLogsWithinTx(ctx context.Context, logs []Log, tx sqlutil.DataSource) error {
batchInsertSize := 4000
for i := 0; i < len(logs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(logs) {
end = len(logs)
}

query := `INSERT INTO evm.logs
const q = `INSERT INTO evm.logs
(evm_chain_id, log_index, block_hash, block_number, block_timestamp, address, event_sig, topics, tx_hash, data, created_at)
VALUES
(:evm_chain_id, :log_index, :block_hash, :block_number, :block_timestamp, :address, :event_sig, :topics, :tx_hash, :data, NOW())
ON CONFLICT DO NOTHING`
return batchInsert(ctx, tx, q, logs, 4000)
}

func batchInsert[T any](ctx context.Context, ds sqlutil.DataSource, query string, objs []T, batchInsertSize int) error {
for i := 0; i < len(objs); i += batchInsertSize {
start, end := i, i+batchInsertSize
if end > len(objs) {
end = len(objs)
}

_, err := tx.NamedExecContext(ctx, query, logs[start:end])
_, err := ds.NamedExecContext(ctx, query, objs[start:end])
if err != nil {
if pkgerrors.Is(err, context.DeadlineExceeded) && batchInsertSize > 500 {
// In case of DB timeouts, try to insert again with a smaller batch up to a limit
Expand Down
Loading
Loading