Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions common/changes/@subsquid/rpc-client/master_2026-06-26-14-18.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/rpc-client",
"comment": "",
"type": "none"
}
],
"packageName": "@subsquid/rpc-client"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@subsquid/util-internal-dump-cli",
"comment": "",
"type": "none"
}
],
"packageName": "@subsquid/util-internal-dump-cli"
}
26 changes: 14 additions & 12 deletions util/rpc-client/src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import assert from 'assert'
import {RetryError, RpcConnectionError, RpcError} from './errors'
import {Connection, HttpHeaders, RpcCall, RpcErrorInfo, RpcNotification, RpcRequest, RpcResponse} from './interfaces'
import {RateMeter} from './rate'
import {MethodMetrics} from './method-metrics'
import {Subscription, SubscriptionHandle, Subscriptions} from './subscriptions'
import {HttpConnection} from './transport/http'
import {WsConnection} from './transport/ws'
Expand Down Expand Up @@ -62,16 +63,6 @@ export interface RpcClientOptions {
log?: Logger | null
}

// Add interface for RPC metrics
export interface RpcMetrics {
url: string
requestsServed: number
connectionErrors: number
notificationsReceived: number
avg_response_time: number
}


export interface RpcMetrics {
url: string
requestsServed: number
Expand Down Expand Up @@ -130,6 +121,7 @@ export class RpcClient {
private connectionErrorsInRow = 0
private connectionErrors = 0
private requestsServed = 0
private requestsByMethod = new MethodMetrics()
private notificationsReceived = 0
private totalResponseTime = 0
private backoffEpoch = 0
Expand Down Expand Up @@ -204,12 +196,14 @@ export class RpcClient {
requestsServed: this.requestsServed,
connectionErrors: this.connectionErrors,
notificationsReceived: this.notificationsReceived,
// FIXME: only one of these metrics should remain; decide which to keep
avg_response_time: this.requestsServed > 0 ? this.totalResponseTime / this.requestsServed : 0,
avgResponseTime: this.requestsServed > 0 ? this.totalResponseTime / this.requestsServed : 0,
}
}

getMethodMetrics() {
return this.requestsByMethod.getMethodMetrics()
}

private onNotification(msg: RpcNotification): void {
this.notificationsReceived += 1
this.log?.debug({rpcMsg: msg}, 'rpc notification')
Expand Down Expand Up @@ -391,17 +385,25 @@ export class RpcClient {
let call = req.call
this.log?.debug({rpcBatchId: [call[0].id, last(call).id]}, 'rpc send')
promise = this.con.batchCall(call, req.timeout).then(res => {
this.requestsByMethod.recordBatch(call, res)
let result = new Array(res.length)
for (let i = 0; i < res.length; i++) {
result[i] = this.receiveResult(call[i], res[i], req.validateResult, req.validateError)
}
return result
}, err => {
this.requestsByMethod.recordBatchError(call, err)
throw err
})
} else {
let call = req.call
this.log?.debug({rpcId: call.id}, 'rpc send')
promise = this.con.call(call, req.timeout).then(res => {
this.requestsByMethod.record(call.method, res)
return this.receiveResult(call, res, req.validateResult, req.validateError)
}, err => {
this.requestsByMethod.recordError(call, err)
throw err
})
}
promise.then(result => {
Expand Down
64 changes: 64 additions & 0 deletions util/rpc-client/src/method-metrics.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import {HttpError} from '@subsquid/http-client'
import {RpcError, RpcProtocolError} from './errors'
import {RpcRequest, RpcResponse} from './interfaces'


/**
* Per-method request tally counted by batch element, keyed by a triplet of
* labels: RPC method name, HTTP status code, and JSON-RPC error code.
*/
export class MethodMetrics {
private items: Record<string, Record<string, Record<string, number>>> = {}

private inc(method: string, httpCode: string, rpcCode: string): void {
let byHttp = this.items[method]
if (!byHttp) byHttp = this.items[method] = {}
let byRpc = byHttp[httpCode]
if (!byRpc) byRpc = byHttp[httpCode] = {}
byRpc[rpcCode] = (byRpc[rpcCode] || 0) + 1
}

record(method: string, res: RpcResponse): void {
let rpcCode = res.error ? res.error.code.toString() : ''
this.inc(method, '200', rpcCode)
}

recordBatch(calls: RpcRequest[], responses: RpcResponse[]): void {
for (let i = 0; i < calls.length; i++) {
this.record(calls[i].method, responses[i])
}
}

recordError(call: RpcRequest, err: unknown): void {
let {httpCode, rpcCode} = extractTransportLabels(err)
this.inc(call.method, httpCode, rpcCode)
}

recordBatchError(calls: RpcRequest[], err: unknown): void {
let {httpCode, rpcCode} = extractTransportLabels(err)
for (let i = 0; i < calls.length; i++) {
this.inc(calls[i].method, httpCode, rpcCode)
}
}

*getMethodMetrics() {
for (let [method, byHttp] of Object.entries(this.items)) {
for (let [httpCode, byRpc] of Object.entries(byHttp)) {
for (let [rpcCode, count] of Object.entries(byRpc)) {
yield {method, httpCode, rpcCode, count}
}
}
}
}
}


function extractTransportLabels(err: unknown) {
if (err instanceof HttpError) {
return {httpCode: err.response.status.toString(), rpcCode: ''}
}
if (err instanceof RpcError || err instanceof RpcProtocolError) {
return {httpCode: '200', rpcCode: err.code.toString()}
}
return {httpCode: '', rpcCode: ''}
}
23 changes: 18 additions & 5 deletions util/util-internal-dump-cli/src/prometheus.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export class PrometheusServer {
private rpcRequestsServedTotal: Counter
private rpcAvgResponseTimeSeconds: Gauge
private rpcConnectionErrorsTotal: Counter
private rpcMethodCallsTotal: Counter
private s3RequestsCounter: Counter
private latestReceivedBlockNumberGauge: Gauge
private latestReceivedBlockTimestampGauge: Gauge
Expand Down Expand Up @@ -126,11 +127,6 @@ export class PrometheusServer {
registers: [this.registry],
collect() {
const metrics = rpc.getMetrics()

this.set({
url: metrics.url,
}, metrics.avg_response_time)

this.set({
url: metrics.url,
}, metrics.avgResponseTime)
Expand All @@ -150,6 +146,23 @@ export class PrometheusServer {
}
});

this.rpcMethodCallsTotal = new Counter({
name: 'sqd_rpc_method_calls_total',
help: 'Total number of RPC requests by method, HTTP code, and RPC code (counted by batch element)',
labelNames: ['method', 'http_code', 'rpc_code'],
registers: [this.registry],
collect() {
this.reset()
for (let item of rpc.getMethodMetrics()) {
this.inc({
method: item.method,
http_code: item.httpCode,
rpc_code: item.rpcCode
}, item.count)
}
}
});

this.s3RequestsCounter = new Counter({
name: 'sqd_s3_request_count',
help: 'Number of s3 requests made',
Expand Down
Loading