From 1fc8cff777efc512f55f8493073ada651ec3bb0a Mon Sep 17 00:00:00 2001 From: MuRong Date: Wed, 26 Feb 2025 12:27:45 +0800 Subject: [PATCH 1/3] fix(reku): edge cases --- packages/orap/tests/test-case.ts | 35 ++++++++ .../src/event/crosschecker/autochecker.ts | 16 +++- .../src/event/crosschecker/basechecker.ts | 18 ++++- packages/reku/src/provider/provider.ts | 80 +++++++++++++++---- 4 files changed, 126 insertions(+), 23 deletions(-) create mode 100644 packages/orap/tests/test-case.ts diff --git a/packages/orap/tests/test-case.ts b/packages/orap/tests/test-case.ts new file mode 100644 index 0000000..6d10241 --- /dev/null +++ b/packages/orap/tests/test-case.ts @@ -0,0 +1,35 @@ +import dotenv from 'dotenv' +import { RekuProviderManager } from '@ora-io/reku' +// import { sleep } from '@ora-io/utils' +import { startDemo } from '../examples/declarativeDemo/app' + +dotenv.config({ path: './packages/orap/tests/.env' }) + +const chain = 'mainnet' + +const wsProvider: RekuProviderManager = new RekuProviderManager( + process.env[`${chain.toUpperCase()}_WSS`]!, + { + // heartbeatInterval: 100, + disabledHeartbeat: true, + }, +) +const httpProvider: RekuProviderManager = new RekuProviderManager( + process.env[`${chain.toUpperCase()}_HTTP`]!, + { + // heartbeatInterval: 500, + }, +) +const storeConfig = { port: parseInt(process.env.REDIS_PORT!), host: process.env.REDIS_HOST } + +setTimeout(async () => { + // for (let i = 0; i < 50; i++) { + // wsProvider.reconnect() + // await sleep(1000) + // } + // setInterval(() => { + // httpProvider.reconnect() + // }, 2000) +}, 10000 * 2) + +startDemo({ wsProvider, httpProvider }, storeConfig) diff --git a/packages/reku/src/event/crosschecker/autochecker.ts b/packages/reku/src/event/crosschecker/autochecker.ts index 284ecbb..e953cb0 100644 --- a/packages/reku/src/event/crosschecker/autochecker.ts +++ b/packages/reku/src/event/crosschecker/autochecker.ts @@ -58,7 +58,11 @@ export class AutoCrossChecker extends BaseCrossChecker { this.cache = new CrossCheckerCacheManager(options?.store, { keyPrefix: options?.storeKeyPrefix, ttl: options?.storeTtl }) - let latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3) + let latestBlockNum = await timeoutWithRetry(() => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider.provider?.getBlockNumber() + }, 15 * 1000, 3) // resume checkpoint priority: options.fromBlock > cache > latestBlockNum + 1 const defaultInitCheckpoint = await this.cache.getCheckpoint() ?? (latestBlockNum) @@ -87,7 +91,11 @@ export class AutoCrossChecker extends BaseCrossChecker { } const waitNextCrosscheck = async (): Promise => { - latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3) + latestBlockNum = await timeoutWithRetry(() => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider.provider?.getBlockNumber() + }, 15 * 1000, 3) // If auto-follow is enabled, update toBlock and check block range if (options.autoFollowLatestBlock) { @@ -145,7 +153,9 @@ export class AutoCrossChecker extends BaseCrossChecker { else { debug('Because the latest block %d is too old, skip this cross check', latestBlockNum) } - return endingCondition() + const end = endingCondition() + debug('polling ending condition: %s', end) + return end }, pollingInterval) } diff --git a/packages/reku/src/event/crosschecker/basechecker.ts b/packages/reku/src/event/crosschecker/basechecker.ts index d590348..cf7cae0 100644 --- a/packages/reku/src/event/crosschecker/basechecker.ts +++ b/packages/reku/src/event/crosschecker/basechecker.ts @@ -35,7 +35,11 @@ export class BaseCrossChecker { // define from, to // TODO: use blockNumber for performance - const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3) + const block = await timeoutWithRetry(() => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider.provider.getBlock('latest') + }, 15 * 1000, 3) if (!block) { console.warn('crosscheck failed to get latest block') return @@ -58,7 +62,11 @@ export class BaseCrossChecker { ccfOptions: CrossCheckFromParam, ) { // TODO: use blockNumber for performance - const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3) + const block = await timeoutWithRetry(() => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider.provider.getBlock('latest') + }, 15 * 1000, 3) if (!block) { console.warn('crosscheck failed to get latest block') return @@ -121,7 +129,11 @@ export class BaseCrossChecker { ...(topics && { topics }), } if (this.provider.provider) { - const logs = await timeoutWithRetry(() => this.provider.provider.getLogs(params), 15 * 1000, 3) + const logs = await timeoutWithRetry(() => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider.provider.getLogs(params) + }, 15 * 1000, 3) // get ignoreLogs keys const ignoreLogs = options.ignoreLogs diff --git a/packages/reku/src/provider/provider.ts b/packages/reku/src/provider/provider.ts index 1973e9d..6de951f 100644 --- a/packages/reku/src/provider/provider.ts +++ b/packages/reku/src/provider/provider.ts @@ -1,8 +1,9 @@ import { EventEmitter } from 'node:events' import type { InterfaceAbi } from 'ethers' import { Interface, WebSocketProvider, ethers } from 'ethers' -import type { ErrorEvent, WebSocket } from 'ws' -import type { ContractAddress } from '@ora-io/utils' +import { WebSocket } from 'ws' +import type { ErrorEvent } from 'ws' +import { type ContractAddress, isInstanceof, to } from '@ora-io/utils' import { debug } from '../debug' import { RekuContractManager } from './contract' @@ -39,13 +40,17 @@ export class RekuProviderManager { } connect() { - const url = new URL(this.providerUrl) - if (url.protocol === 'ws:' || url.protocol === 'wss:') + if (this.isWebSocketProviderUrl) this._provider = new ethers.WebSocketProvider(this.providerUrl) else this._provider = new ethers.JsonRpcProvider(this.providerUrl) } + get isWebSocketProviderUrl() { + const url = new URL(this.providerUrl) + return url.protocol === 'ws:' || url.protocol === 'wss:' + } + get provider() { return this._provider as ethers.JsonRpcProvider | WebSocketProvider } @@ -54,6 +59,16 @@ export class RekuProviderManager { return this._contracts } + get websocket() { + if (isInstanceof(this._provider, ethers.WebSocketProvider)) + return this._provider.websocket + return undefined + } + + get destroyed() { + return this._provider?.destroyed + } + addContract(address: ContractAddress, contract: ethers.Contract): RekuContractManager | undefined addContract(address: ContractAddress, abi: Interface | InterfaceAbi): RekuContractManager | undefined addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.Contract): RekuContractManager | undefined { @@ -156,7 +171,18 @@ export class RekuProviderManager { socket.onerror = null debug('remove all listeners of websocket provider') } - this._provider?.destroy() + debug('reconnect destroyed: %s', this._provider?.destroyed) + if (this._provider && !this._provider.destroyed) { + if (isInstanceof(this._provider, ethers.WebSocketProvider)) { + debug('reconnect websocket readyState: %s', this.websocket?.readyState) + if (this.websocket?.readyState !== WebSocket.CONNECTING) + to(Promise.resolve(this._provider.destroy())) + } + else { + to(Promise.resolve(this._provider.destroy())) + } + } + this._provider = undefined setTimeout(() => { @@ -186,21 +212,41 @@ export class RekuProviderManager { if (this._options?.disabledHeartbeat) return debug('start heartbeat') - this._heartbeatTimer = setInterval(() => { - debug('heartbeat running...') - debug('heartbeat has provider: %s', !!this._provider) - this._provider?.send('net_version', []) - .then((res) => { - debug('heartbeat response: %s', res) - }) - .catch((err) => { - this.reconnect() - this._event?.emit('error', err) - debug('heartbeat error: %s', err) - }) + this._heartbeatTimer = setInterval(async () => { + if (!this.destroyed) { + debug('heartbeat running...') + const hasProvider = this._hasProvider() + debug('heartbeat has provider: %s', hasProvider) + this._provider?.send('net_version', []) + .then((res) => { + debug('heartbeat response: %s', res) + }) + .catch((err) => { + this.reconnect() + this._event?.emit('error', err) + debug('heartbeat error: %s', err) + }) + .finally(() => { + debug('heartbeat finally') + }) + } + else { + debug('heartbeat destroyed') + } }, this._heartbeatInterval) } + private _hasProvider() { + const hasProvider = !!this._provider && !!this._provider.provider + let isInstance = false + if (this.isWebSocketProviderUrl) + isInstance = isInstanceof(this._provider, ethers.WebSocketProvider) && isInstanceof(this._provider.provider, ethers.WebSocketProvider) + else + isInstance = isInstanceof(this._provider, ethers.JsonRpcProvider) && isInstanceof(this._provider.provider, ethers.JsonRpcProvider) + + return hasProvider && isInstance && !this._provider?.destroyed + } + private _clearHeartbeat() { if (this._heartbeatTimer) { debug('clear heartbeat') From c9a2b1e525deeaf99fe3c0c27bf7951e928a4289 Mon Sep 17 00:00:00 2001 From: MuRong Date: Wed, 26 Feb 2025 12:28:05 +0800 Subject: [PATCH 2/3] feat(utils): upgrade utils --- packages/utils/package.json | 2 +- pnpm-lock.yaml | 9 +++++++-- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/packages/utils/package.json b/packages/utils/package.json index 5cc6174..86955e5 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -43,7 +43,7 @@ "ethers": ">=6.13.0" }, "dependencies": { - "@murongg/utils": "^0.1.28", + "@murongg/utils": "^0.2.0", "cache-manager": "5.7.6", "cache-manager-ioredis-yet": "2.1.1", "debug": "^4.3.7", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index db69681..fd06ca7 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -139,8 +139,8 @@ importers: packages/utils: dependencies: '@murongg/utils': - specifier: ^0.1.28 - version: 0.1.32 + specifier: ^0.2.0 + version: 0.2.0 cache-manager: specifier: 5.7.6 version: 5.7.6 @@ -911,6 +911,9 @@ packages: '@murongg/utils@0.1.32': resolution: {integrity: sha512-l+SRRVVCn8V2YjTgpyF0pVsav9+S4YENMSn5AjonXT/nen+QhcWJmZW1kXfEsBsRoQmDR4a8A5+PbKEOXe9GYw==} + '@murongg/utils@0.2.0': + resolution: {integrity: sha512-AW7QYoC0q6YrYgNS1dF/2sUvSzYPGMxd2p9SS3AcOmZ74j5Q7Ge5U/V5/LTJ7UZBH67XC+kMYdlgiX5cv9Kz3A==} + '@noble/curves@1.2.0': resolution: {integrity: sha512-oYclrNgRaM9SsBUBVbb8M6DTV7ZHRTKugureoYEncY5c65HOmRzvSiTE3y5CYaPYJA/GVkrhXEoF0M3Ya9PMnw==} @@ -4220,6 +4223,8 @@ snapshots: '@murongg/utils@0.1.32': {} + '@murongg/utils@0.2.0': {} + '@noble/curves@1.2.0': dependencies: '@noble/hashes': 1.3.2 From f59394b15979300726c4414ddfbf513c5bca7427 Mon Sep 17 00:00:00 2001 From: MuRong Date: Wed, 26 Feb 2025 12:29:46 +0800 Subject: [PATCH 3/3] chore: release v0.3.4 --- package.json | 2 +- packages/orap/package.json | 2 +- packages/reku/package.json | 2 +- packages/utils/package.json | 2 +- 4 files changed, 4 insertions(+), 4 deletions(-) diff --git a/package.json b/package.json index d1be551..9db6342 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ora-stack", - "version": "0.3.3", + "version": "0.3.4", "private": true, "packageManager": "pnpm@9.7.0", "description": "", diff --git a/packages/orap/package.json b/packages/orap/package.json index 4ea1e84..229f912 100644 --- a/packages/orap/package.json +++ b/packages/orap/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/orap", "type": "module", - "version": "0.3.3", + "version": "0.3.4", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong", diff --git a/packages/reku/package.json b/packages/reku/package.json index e310533..d8e9ca2 100644 --- a/packages/reku/package.json +++ b/packages/reku/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/reku", "type": "module", - "version": "0.3.4", + "version": "0.3.5", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong", diff --git a/packages/utils/package.json b/packages/utils/package.json index 86955e5..b65d78f 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/utils", "type": "module", - "version": "0.3.2", + "version": "0.3.3", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong",