diff --git a/pkg/client/metrics.go b/pkg/client/metrics.go new file mode 100644 index 0000000000..5893358029 --- /dev/null +++ b/pkg/client/metrics.go @@ -0,0 +1,70 @@ +package client + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" +) + +type rpcClientMetrics struct { + callsTotal metric.Int64Counter + callsSuccess metric.Int64Counter + callsFailed metric.Int64Counter +} + +func newRPCClientMetrics() (*rpcClientMetrics, error) { + callsTotal, err := beholder.GetMeter().Int64Counter("evm_pool_rpc_node_calls_total") + if err != nil { + return nil, fmt.Errorf("failed to register rpc calls total metric: %w", err) + } + + callsSuccess, err := beholder.GetMeter().Int64Counter("evm_pool_rpc_node_calls_success") + if err != nil { + return nil, fmt.Errorf("failed to register rpc calls success metric: %w", err) + } + + callsFailed, err := beholder.GetMeter().Int64Counter("evm_pool_rpc_node_calls_failed") + if err != nil { + return nil, fmt.Errorf("failed to register rpc calls failed metric: %w", err) + } + + return &rpcClientMetrics{ + callsTotal: callsTotal, + callsSuccess: callsSuccess, + callsFailed: callsFailed, + }, nil +} + +func (m *rpcClientMetrics) IncrementTotal(ctx context.Context, chainID, nodeName, rpcDomain, callName string) { + m.callsTotal.Add(ctx, 1, metric.WithAttributes( + attribute.String("chainFamily", "EVM"), + attribute.String("chainID", chainID), + attribute.String("nodeName", nodeName), + attribute.String("rpcDomain", rpcDomain), + attribute.String("callName", callName), + )) +} + +func (m *rpcClientMetrics) IncrementSuccess(ctx context.Context, chainID, nodeName, rpcDomain, callName string) { + m.callsSuccess.Add(ctx, 1, metric.WithAttributes( + attribute.String("chainFamily", "EVM"), + attribute.String("chainID", chainID), + attribute.String("nodeName", nodeName), + attribute.String("rpcDomain", rpcDomain), + attribute.String("callName", callName), + )) +} + +func (m *rpcClientMetrics) IncrementFailed(ctx context.Context, chainID, nodeName, rpcDomain, callName string) { + m.callsFailed.Add(ctx, 1, metric.WithAttributes( + attribute.String("chainFamily", "EVM"), + attribute.String("chainID", chainID), + attribute.String("nodeName", nodeName), + attribute.String("rpcDomain", rpcDomain), + attribute.String("callName", callName), + )) +} diff --git a/pkg/client/metrics_test.go b/pkg/client/metrics_test.go new file mode 100644 index 0000000000..0c62be587a --- /dev/null +++ b/pkg/client/metrics_test.go @@ -0,0 +1,35 @@ +package client + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestNewRPCClientMetrics(t *testing.T) { + m, err := newRPCClientMetrics() + require.NoError(t, err) + require.NotNil(t, m) + assert.NotNil(t, m.callsTotal) + assert.NotNil(t, m.callsSuccess) + assert.NotNil(t, m.callsFailed) +} + +func TestRPCClientMetrics_Increment(t *testing.T) { + m, err := newRPCClientMetrics() + require.NoError(t, err) + + ctx := context.Background() + + assert.NotPanics(t, func() { + m.IncrementTotal(ctx, "1", "node-1", "rpc.example.com", "eth_call") + }) + assert.NotPanics(t, func() { + m.IncrementSuccess(ctx, "1", "node-1", "rpc.example.com", "eth_call") + }) + assert.NotPanics(t, func() { + m.IncrementFailed(ctx, "1", "node-1", "rpc.example.com", "eth_call") + }) +} diff --git a/pkg/client/rpc_client.go b/pkg/client/rpc_client.go index 10a545bf7f..a1977f7d80 100644 --- a/pkg/client/rpc_client.go +++ b/pkg/client/rpc_client.go @@ -107,6 +107,8 @@ type RPCClient struct { safeDepth uint32 externalRequestMaxResponseSize uint32 + beholderMetrics *rpcClientMetrics + ws atomic.Pointer[rawclient] limitedWS atomic.Pointer[rawclient] // ws client with limited response size http atomic.Pointer[rawclient] @@ -167,6 +169,13 @@ func NewRPCClient( ) r.rpcLog = logger.Sugared(lggr).Named("RPC") + bm, bmErr := newRPCClientMetrics() + if bmErr != nil { + lggr.Warnw("Failed to initialize beholder metrics for RPC client", "err", bmErr) + } else { + r.beholderMetrics = bm + } + if httpuri == nil && externalRequestMaxResponseSize > 0 { lggr.Error("RPC client is configured with only WebSocket URL. If this CL Node serves external requests, it must also have an HTTP URL configured. Otherwise, there is a serious DDoS risk.") } @@ -271,6 +280,7 @@ func (r *RPCClient) String() string { } func (r *RPCClient) logResult( + ctx context.Context, lggr logger.Logger, err error, callDuration time.Duration, @@ -279,18 +289,28 @@ func (r *RPCClient) logResult( results ...interface{}, ) { lggr = logger.With(lggr, "duration", callDuration, "rpcDomain", rpcDomain, "callName", callName) - promEVMPoolRPCNodeCalls.WithLabelValues(r.chainID.String(), r.name).Inc() + chainID := r.chainID.String() + promEVMPoolRPCNodeCalls.WithLabelValues(chainID, r.name).Inc() if err == nil { - promEVMPoolRPCNodeCallsSuccess.WithLabelValues(r.chainID.String(), r.name).Inc() + promEVMPoolRPCNodeCallsSuccess.WithLabelValues(chainID, r.name).Inc() logger.Sugared(lggr).Tracew(fmt.Sprintf("evmclient.Client#%s RPC call success", callName), results...) } else { - promEVMPoolRPCNodeCallsFailed.WithLabelValues(r.chainID.String(), r.name).Inc() + promEVMPoolRPCNodeCallsFailed.WithLabelValues(chainID, r.name).Inc() lggr.Debugw( fmt.Sprintf("evmclient.Client#%s RPC call failure", callName), append(results, "err", err)..., ) } + if r.beholderMetrics != nil { + r.beholderMetrics.IncrementTotal(ctx, chainID, r.name, rpcDomain, callName) + if err == nil { + r.beholderMetrics.IncrementSuccess(ctx, chainID, r.name, rpcDomain, callName) + } else { + r.beholderMetrics.IncrementFailed(ctx, chainID, r.name, rpcDomain, callName) + } + } + metrics.RPCCallLatency. WithLabelValues( metrics.EVM, // chain family @@ -343,7 +363,7 @@ func (r *RPCClient) CallContext(ctx context.Context, result interface{}, method err := r.wrapRPCClientError(client.rpc.CallContext(ctx, result, method, args...)) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "CallContext") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "CallContext") return err } @@ -382,7 +402,7 @@ func (r *RPCClient) BatchCallContext(rootCtx context.Context, b []rpc.BatchElem) err := r.wrapRPCClientError(client.rpc.BatchCallContext(ctx, b)) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BatchCallContext") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BatchCallContext") if err != nil { return err } @@ -458,7 +478,7 @@ func (r *RPCClient) SubscribeToHeads(ctx context.Context) (ch <-chan *evmtypes.H lggr.Debug("RPC call: evmclient.Client#EthSubscribe") defer func() { duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "EthSubscribe") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "EthSubscribe") err = r.wrapRPCClientError(err) }() @@ -509,7 +529,7 @@ func (r *RPCClient) TransactionReceiptGethWithOpts(ctx context.Context, txHash c err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "TransactionReceipt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "TransactionReceipt", "receipt", receipt, ) @@ -537,7 +557,7 @@ func (r *RPCClient) TransactionByHashWithOpts(ctx context.Context, txHash common err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "TransactionByHash", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "TransactionByHash", "receipt", tx, ) @@ -555,7 +575,7 @@ func (r *RPCClient) HeaderByNumber(ctx context.Context, number *big.Int) (header err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "HeaderByNumber", "header", header) + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "HeaderByNumber", "header", header) return } @@ -571,7 +591,7 @@ func (r *RPCClient) HeaderByHash(ctx context.Context, hash common.Hash) (header err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "HeaderByHash", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "HeaderByHash", "header", header, ) @@ -729,7 +749,7 @@ func (r *RPCClient) ethGetBlockByNumber(ctx context.Context, number string, resu err = r.wrapRPCClientError(client.rpc.CallContext(ctx, result, method, args...)) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "CallContext") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "CallContext") return err } @@ -757,7 +777,7 @@ func (r *RPCClient) BlockByHashGeth(ctx context.Context, hash common.Hash) (bloc err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BlockByHash", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BlockByHash", "block", block, ) @@ -775,7 +795,7 @@ func (r *RPCClient) BlockByNumberGeth(ctx context.Context, number *big.Int) (blo err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BlockByNumber", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BlockByNumber", "block", block, ) @@ -797,7 +817,7 @@ func (r *RPCClient) SendTransaction(ctx context.Context, tx *types.Transaction) err := r.wrapRPCClientError(client.geth.SendTransaction(ctx, tx)) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "SendTransaction") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "SendTransaction") return struct{}{}, ClassifySendError(err, r.clientErrors, logger.Sugared(logger.Nop()), tx, common.Address{}, r.chainType.IsL2()), err } @@ -841,7 +861,7 @@ func (r *RPCClient) PendingSequenceAt(ctx context.Context, account common.Addres err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "PendingNonceAt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "PendingNonceAt", "nonce", nonce, ) @@ -869,7 +889,7 @@ func (r *RPCClient) NonceAt(ctx context.Context, account common.Address, blockNu err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "NonceAt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "NonceAt", "nonce", nonce, ) @@ -887,7 +907,7 @@ func (r *RPCClient) PendingCodeAt(ctx context.Context, account common.Address) ( err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "PendingCodeAt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "PendingCodeAt", "code", code, ) @@ -905,7 +925,7 @@ func (r *RPCClient) CodeAt(ctx context.Context, account common.Address, blockNum err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "CodeAt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "CodeAt", "code", code, ) @@ -930,7 +950,7 @@ func (r *RPCClient) EstimateGas(ctx context.Context, c interface{}) (gas uint64, err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "EstimateGas", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "EstimateGas", "gas", gas, ) @@ -948,7 +968,7 @@ func (r *RPCClient) SuggestGasPrice(ctx context.Context) (price *big.Int, err er err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "SuggestGasPrice", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "SuggestGasPrice", "price", price, ) @@ -971,7 +991,7 @@ func (r *RPCClient) CallContract(ctx context.Context, msg interface{}, blockNumb } duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "CallContract", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "CallContract", "val", val, ) @@ -1025,7 +1045,7 @@ func (r *RPCClient) PendingCallContract(ctx context.Context, msg interface{}) (v } duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "PendingCallContract", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "PendingCallContract", "val", val, ) @@ -1049,7 +1069,7 @@ func (r *RPCClient) BlockNumber(ctx context.Context) (height uint64, err error) err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BlockNumber", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BlockNumber", "height", height, ) @@ -1067,7 +1087,7 @@ func (r *RPCClient) BalanceAt(ctx context.Context, account common.Address, block err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BalanceAt", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BalanceAt", "balance", balance, ) @@ -1115,7 +1135,7 @@ func (r *RPCClient) FeeHistory(ctx context.Context, blockCount uint64, lastBlock err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "FeeHistory", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "FeeHistory", "feeHistory", feeHistory, ) @@ -1185,7 +1205,7 @@ func (r *RPCClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) (l [ err = r.makeLogsValid(l) } duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "FilterLogs", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "FilterLogs", "log", l, ) @@ -1233,7 +1253,7 @@ func (r *RPCClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQu start := time.Now() defer func() { duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "SubscribeFilterLogs") + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "SubscribeFilterLogs") err = r.wrapRPCClientError(err) }() sub := newSubForwarder(ch, r.makeLogValid, r.wrapRPCClientError) @@ -1261,7 +1281,7 @@ func (r *RPCClient) SuggestGasTipCap(ctx context.Context) (tipCap *big.Int, err err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "SuggestGasTipCap", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "SuggestGasTipCap", "tipCap", tipCap, ) @@ -1350,7 +1370,7 @@ func (r *RPCClient) IsSyncing(ctx context.Context) (bool, error) { err = r.wrapRPCClientError(err) duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), "BlockNumber", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), "BlockNumber", "syncProgress", syncProgress, ) @@ -1460,7 +1480,7 @@ func (r *RPCClient) doWithConfidence(ctx context.Context, request rpc.BatchElem, defer func() { duration := time.Since(start) - r.logResult(lggr, err, duration, r.getRPCDomain(), request.Method+"WithConfidence", + r.logResult(ctx, lggr, err, duration, r.getRPCDomain(), request.Method+"WithConfidence", "result", request.Result, ) }()