Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,9 @@ func New(stack *node.Node, config *ethconfig.Config) (*Ethereum, error) {
// The `config.TxPool.PriceLimit` used above doesn't reflect the sanitized/enforced changes
// made in the txpool. Update the `gasTip` explicitly to reflect the enforced value.
eth.txPool.SetGasTip(new(big.Int).SetUint64(params.BorDefaultTxPoolPriceLimit))

// Allow private tx store to check if txs are still in the pool for cleanup
relayService.SetTxPoolChecker(eth.txPool.Has)
}

if !config.TxPool.NoLocals {
Expand Down
52 changes: 47 additions & 5 deletions eth/relay/multiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@ var (
rpcErrorInPreconfMeter = metrics.NewRegisteredMeter("preconfs/rpcerror", nil)
belowThresholdPreconfMeter = metrics.NewRegisteredMeter("preconfs/belowthreshold", nil)
alreadyKnownErrMeter = metrics.NewRegisteredMeter("relay/txalreadyknown", nil)

// Track BP level RPC rejections for all tx submissions
preconfRejectionMeter = metrics.NewRegisteredMeter("relay/bprpc/preconf/error", nil)
privateTxRejectionMeter = metrics.NewRegisteredMeter("relay/bprpc/privatetx/error", nil)
)

// isAlreadyKnownError checks if the error indicates the transaction is already known to the node
Expand All @@ -43,6 +47,10 @@ type multiClient struct {
clients []*rpc.Client // rpc client instances dialed to each block producer
closed atomic.Bool
retryInterval time.Duration // 0 means use privateTxRetryInterval; configurable for testing

rejectionTracker rejectionTracker
reporterDone chan struct{} // closed to signal the reporter goroutine to exit
closeOnce sync.Once
}

func newMultiClient(urls []string) *multiClient {
Expand Down Expand Up @@ -94,9 +102,12 @@ func newMultiClient(urls []string) *multiClient {
}

log.Info("[tx-relay] Initialised rpc client for each block producer", "success", len(clients), "failed", failed)
return &multiClient{
clients: clients,
mc := &multiClient{
clients: clients,
reporterDone: make(chan struct{}),
}
go mc.reportRejections()
return mc
}

type SendTxForPreconfResponse struct {
Expand Down Expand Up @@ -129,6 +140,8 @@ func (mc *multiClient) submitPreconfTx(rawTx []byte) (bool, error) {
preconfOfferedCount.Add(1)
return
}
preconfRejectionMeter.Mark(1)
mc.rejectionTracker.record(err)
setError.Do(func() {
firstErr = err
})
Expand Down Expand Up @@ -194,6 +207,8 @@ func (mc *multiClient) submitPrivateTx(rawTx []byte, hash common.Hash, retry boo
successfulIndices = append(successfulIndices, index)
return
}
privateTxRejectionMeter.Mark(1)
mc.rejectionTracker.record(err)
setError.Do(func() {
firstErr = err
})
Expand Down Expand Up @@ -284,6 +299,8 @@ func (mc *multiClient) retryPrivateTxSubmission(hexTx string, hash common.Hash,
alreadyKnownErrMeter.Mark(1)
return
}
privateTxRejectionMeter.Mark(1)
mc.rejectionTracker.record(err)
mu.Lock()
newFailedIndices = append(newFailedIndices, idx)
mu.Unlock()
Expand Down Expand Up @@ -354,8 +371,33 @@ func (mc *multiClient) checkTxStatus(hash common.Hash) (bool, error) {

// Close closes all rpc client connections
func (mc *multiClient) close() {
mc.closed.Store(true)
for _, client := range mc.clients {
client.Close()
mc.closeOnce.Do(func() {
mc.closed.Store(true)
if mc.reporterDone != nil {
close(mc.reporterDone)
}
for _, client := range mc.clients {
client.Close()
}
})
}

// reportRejections runs in the background and flushes the rejection tracker on a
// fixed interval, emitting one aggregated error log for different error types.
func (mc *multiClient) reportRejections() {
ticker := time.NewTicker(rejectionReportInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
total, counts := mc.rejectionTracker.flush()
log.Info("[tx-relay] BP rejection summary",
"total", total,
"errors", formatRejectionCounts(counts),
)
Comment thread
manav2401 marked this conversation as resolved.
case <-mc.reporterDone:
return
}
}
}
195 changes: 195 additions & 0 deletions eth/relay/multiclient_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package relay
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"sync"
Expand Down Expand Up @@ -1406,3 +1407,197 @@ func TestPrivateTxSubmissionRetry(t *testing.T) {
require.Equal(t, int32(5), txGetterCallCount.Load(), "expected txGetter to be called 5 times during retries")
})
}

// TestRejectionTracker covers the in-memory aggregation used by the BP rejection
// reporter. The production reporter goroutine pulls from this same tracker, so
// verifying record/flush here covers the data path end-to-end.
func TestRejectionTracker(t *testing.T) {
	t.Parallel()

	t.Run("record groups identical errors and counts total", func(t *testing.T) {
		var tr rejectionTracker
		errA := fmt.Errorf("nonce too low")
		errB := fmt.Errorf("transaction underpriced")

		// Identical messages should share one bucket; distinct ones get their own.
		tr.record(errA)
		tr.record(errA)
		tr.record(errB)
		tr.record(errA)

		total, counts := tr.flush()
		require.Equal(t, uint64(4), total)
		require.Equal(t, uint64(3), counts["nonce too low"])
		require.Equal(t, uint64(1), counts["transaction underpriced"])
	})

	t.Run("flush resets state", func(t *testing.T) {
		var tr rejectionTracker
		tr.record(fmt.Errorf("boom"))

		total1, counts1 := tr.flush()
		require.Equal(t, uint64(1), total1)
		require.NotNil(t, counts1)

		// A second flush with no intervening records must observe a clean slate.
		total2, counts2 := tr.flush()
		require.Equal(t, uint64(0), total2)
		require.Nil(t, counts2, "flush should leave the tracker empty")
	})

	t.Run("nil error is ignored", func(t *testing.T) {
		var tr rejectionTracker
		tr.record(nil)
		total, counts := tr.flush()
		require.Equal(t, uint64(0), total)
		require.Nil(t, counts)
	})

	t.Run("cardinality cap diverts overflow into the <other> bucket", func(t *testing.T) {
		var tr rejectionTracker
		// Fill the map to the cap with unique errors.
		for i := 0; i < maxRejectionCategories; i++ {
			tr.record(fmt.Errorf("unique error %d", i))
		}
		// Extra unique errors should land in the overflow bucket, not expand the map.
		tr.record(fmt.Errorf("brand new error 1"))
		tr.record(fmt.Errorf("brand new error 2"))
		// But an error already in the map should still increment its own bucket.
		tr.record(fmt.Errorf("unique error 0"))

		total, counts := tr.flush()
		require.Equal(t, uint64(maxRejectionCategories+3), total)
		require.Equal(t, maxRejectionCategories+1, len(counts), "exactly one overflow bucket should be added")
		require.Equal(t, uint64(2), counts[rejectionOtherCategoryLabel])
		require.Equal(t, uint64(2), counts["unique error 0"])
	})

	t.Run("concurrent records do not lose count", func(t *testing.T) {
		var tr rejectionTracker
		var wg sync.WaitGroup
		const (
			goroutines  = 20
			perRoutine  = 500
			expectTotal = goroutines * perRoutine
		)
		// Hammer record from many goroutines; run with -race to surface data races.
		for g := 0; g < goroutines; g++ {
			wg.Add(1)
			go func() {
				defer wg.Done()
				for i := 0; i < perRoutine; i++ {
					tr.record(fmt.Errorf("e%d", i%3))
				}
			}()
		}
		wg.Wait()
		total, _ := tr.flush()
		require.Equal(t, uint64(expectTotal), total)
	})
}

// TestFormatRejectionCounts verifies the summary-line formatting helper:
// entries are rendered in descending count order and zero-count buckets
// are omitted from the output.
func TestFormatRejectionCounts(t *testing.T) {
	t.Parallel()

	t.Run("sorted desc, zero-count entries skipped", func(t *testing.T) {
		input := map[string]uint64{
			"nonce too low":           10,
			"transaction underpriced": 3,
			"invalid sender":          3,
			"pool full":               25,
			"<other>":                 0, // must be skipped
		}
		want := `pool full: 25, nonce too low: 10, invalid sender: 3, transaction underpriced: 3`
		require.Equal(t, want, formatRejectionCounts(input))
	})

	t.Run("empty input yields empty string", func(t *testing.T) {
		for _, m := range []map[string]uint64{nil, {"skip": 0}} {
			require.Equal(t, "", formatRejectionCounts(m))
		}
	})
}

// TestRejectionTrackerWiredToSubmit verifies the full production path: BP
// rejections from both submitPrivateTx and submitPreconfTx flow into the same
// tracker on the multiClient, while "already known" is filtered out in both cases.
func TestRejectionTrackerWiredToSubmit(t *testing.T) {
	t.Parallel()

	// A minimal legacy tx suffices; the mock servers reject regardless of payload.
	tx := types.NewTransaction(1, common.Address{}, nil, 0, nil, nil)
	rawTx, err := tx.MarshalBinary()
	require.NoError(t, err)

	t.Run("private tx rejection is tracked, already-known is not", func(t *testing.T) {
		s1 := newMockRpcServer()
		s2 := newMockRpcServer()
		defer s1.close()
		defer s2.close()

		// s1 returns a genuine rejection; s2 returns "already known", which
		// must be filtered out before it ever reaches the tracker.
		s1.setHandleSendPrivateTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, "tx fee (1.00 ether) exceeds the configured cap (0.50 ether)")
		})
		s2.setHandleSendPrivateTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, "already known")
		})

		mc := newMultiClient([]string{s1.server.URL, s2.server.URL})
		defer mc.close()

		// Return values are irrelevant here; only the tracker state is asserted.
		_, _ = mc.submitPrivateTx(rawTx, tx.Hash(), false, nil)

		total, counts := mc.rejectionTracker.flush()
		require.Equal(t, uint64(1), total, "only the non-already-known rejection should be tracked")
		require.Equal(t, uint64(1), counts["tx fee (1.00 ether) exceeds the configured cap (0.50 ether)"])
		require.NotContains(t, counts, "already known", "already-known must not be classified as a rejection")
	})

	t.Run("preconf rejection is tracked, already-known is not", func(t *testing.T) {
		s1 := newMockRpcServer()
		s2 := newMockRpcServer()
		defer s1.close()
		defer s2.close()

		// Same shape as the private-tx case above, but via the preconf endpoint.
		s1.setHandleSendPreconfTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, "tx fee (1.00 ether) exceeds the configured cap (0.50 ether)")
		})
		s2.setHandleSendPreconfTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, "already known")
		})

		mc := newMultiClient([]string{s1.server.URL, s2.server.URL})
		defer mc.close()

		_, _ = mc.submitPreconfTx(rawTx)

		total, counts := mc.rejectionTracker.flush()
		require.Equal(t, uint64(1), total, "only the non-already-known rejection should be tracked")
		require.Equal(t, uint64(1), counts["tx fee (1.00 ether) exceeds the configured cap (0.50 ether)"])
		require.NotContains(t, counts, "already known", "already-known must not be classified as a rejection")
	})

	t.Run("preconf and private rejections aggregate into the same tracker", func(t *testing.T) {
		// Same BP rejects both preconf and private with the same config-mismatch
		// error. The tracker should accumulate the count across both submission
		// types, while the two separate meters distinguish the source.
		s := newMockRpcServer()
		defer s.close()

		errMsg := "tx fee exceeds the configured cap"
		s.setHandleSendPreconfTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, errMsg)
		})
		s.setHandleSendPrivateTx(func(w http.ResponseWriter, id int, params json.RawMessage) {
			defaultSendError(w, id, -32000, errMsg)
		})

		mc := newMultiClient([]string{s.server.URL})
		defer mc.close()

		_, _ = mc.submitPreconfTx(rawTx)
		_, _ = mc.submitPrivateTx(rawTx, tx.Hash(), false, nil)

		total, counts := mc.rejectionTracker.flush()
		require.Equal(t, uint64(2), total, "both rejections should be tracked together")
		require.Equal(t, uint64(2), counts[errMsg], "same error message should aggregate across submission types")
	})
}
Loading
Loading