Skip to content
Merged

Dev #33

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ora-stack",
"version": "0.3.3",
"version": "0.3.4",
"private": true,
"packageManager": "pnpm@9.7.0",
"description": "",
Expand Down
2 changes: 1 addition & 1 deletion packages/orap/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/orap",
"type": "module",
"version": "0.3.3",
"version": "0.3.4",
"packageManager": "pnpm@8.10.5",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down
35 changes: 35 additions & 0 deletions packages/orap/tests/test-case.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import dotenv from 'dotenv'
import { RekuProviderManager } from '@ora-io/reku'
// import { sleep } from '@ora-io/utils'
import { startDemo } from '../examples/declarativeDemo/app'

dotenv.config({ path: './packages/orap/tests/.env' })

const chain = 'mainnet'

const wsProvider: RekuProviderManager = new RekuProviderManager(
process.env[`${chain.toUpperCase()}_WSS`]!,
{
// heartbeatInterval: 100,
disabledHeartbeat: true,
},
)
const httpProvider: RekuProviderManager = new RekuProviderManager(
process.env[`${chain.toUpperCase()}_HTTP`]!,
{
// heartbeatInterval: 500,
},
)
const storeConfig = { port: parseInt(process.env.REDIS_PORT!), host: process.env.REDIS_HOST }

setTimeout(async () => {
// for (let i = 0; i < 50; i++) {
// wsProvider.reconnect()
// await sleep(1000)
// }
// setInterval(() => {
// httpProvider.reconnect()
// }, 2000)
}, 10000 * 2)

startDemo({ wsProvider, httpProvider }, storeConfig)
2 changes: 1 addition & 1 deletion packages/reku/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/reku",
"type": "module",
"version": "0.3.4",
"version": "0.3.5",
"packageManager": "pnpm@8.10.5",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down
16 changes: 13 additions & 3 deletions packages/reku/src/event/crosschecker/autochecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ export class AutoCrossChecker extends BaseCrossChecker {

this.cache = new CrossCheckerCacheManager(options?.store, { keyPrefix: options?.storeKeyPrefix, ttl: options?.storeTtl })

let latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
let latestBlockNum = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider?.getBlockNumber()
}, 15 * 1000, 3)

// resume checkpoint priority: options.fromBlock > cache > latestBlockNum + 1
const defaultInitCheckpoint = await this.cache.getCheckpoint() ?? (latestBlockNum)
Expand Down Expand Up @@ -87,7 +91,11 @@ export class AutoCrossChecker extends BaseCrossChecker {
}

const waitNextCrosscheck = async (): Promise<boolean> => {
latestBlockNum = await timeoutWithRetry(() => this.provider.provider?.getBlockNumber(), 15 * 1000, 3)
latestBlockNum = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider?.getBlockNumber()
}, 15 * 1000, 3)

// If auto-follow is enabled, update toBlock and check block range
if (options.autoFollowLatestBlock) {
Expand Down Expand Up @@ -145,7 +153,9 @@ export class AutoCrossChecker extends BaseCrossChecker {
else {
debug('Because the latest block %d is too old, skip this cross check', latestBlockNum)
}
return endingCondition()
const end = endingCondition()
debug('polling ending condition: %s', end)
return end
}, pollingInterval)
}

Expand Down
18 changes: 15 additions & 3 deletions packages/reku/src/event/crosschecker/basechecker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,11 @@ export class BaseCrossChecker {

// define from, to
// TODO: use blockNumber for performance
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
const block = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getBlock('latest')
}, 15 * 1000, 3)
if (!block) {
console.warn('crosscheck failed to get latest block')
return
Expand All @@ -58,7 +62,11 @@ export class BaseCrossChecker {
ccfOptions: CrossCheckFromParam,
) {
// TODO: use blockNumber for performance
const block = await timeoutWithRetry(() => this.provider.provider.getBlock('latest'), 15 * 1000, 3)
const block = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getBlock('latest')
}, 15 * 1000, 3)
if (!block) {
console.warn('crosscheck failed to get latest block')
return
Expand Down Expand Up @@ -121,7 +129,11 @@ export class BaseCrossChecker {
...(topics && { topics }),
}
if (this.provider.provider) {
const logs = await timeoutWithRetry(() => this.provider.provider.getLogs(params), 15 * 1000, 3)
const logs = await timeoutWithRetry(() => {
if (!this.provider || !this.provider.provider)
throw new Error('provider not ready')
return this.provider.provider.getLogs(params)
}, 15 * 1000, 3)
// get ignoreLogs keys
const ignoreLogs = options.ignoreLogs

Expand Down
80 changes: 63 additions & 17 deletions packages/reku/src/provider/provider.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { EventEmitter } from 'node:events'
import type { InterfaceAbi } from 'ethers'
import { Interface, WebSocketProvider, ethers } from 'ethers'
import type { ErrorEvent, WebSocket } from 'ws'
import type { ContractAddress } from '@ora-io/utils'
import { WebSocket } from 'ws'
import type { ErrorEvent } from 'ws'
import { type ContractAddress, isInstanceof, to } from '@ora-io/utils'
import { debug } from '../debug'
import { RekuContractManager } from './contract'

Expand Down Expand Up @@ -39,13 +40,17 @@ export class RekuProviderManager {
}

connect() {
const url = new URL(this.providerUrl)
if (url.protocol === 'ws:' || url.protocol === 'wss:')
if (this.isWebSocketProviderUrl)
this._provider = new ethers.WebSocketProvider(this.providerUrl)
else
this._provider = new ethers.JsonRpcProvider(this.providerUrl)
}

get isWebSocketProviderUrl() {
const url = new URL(this.providerUrl)
return url.protocol === 'ws:' || url.protocol === 'wss:'
}

get provider() {
return this._provider as ethers.JsonRpcProvider | WebSocketProvider
}
Expand All @@ -54,6 +59,16 @@ export class RekuProviderManager {
return this._contracts
}

get websocket() {
if (isInstanceof(this._provider, ethers.WebSocketProvider))
return this._provider.websocket
return undefined
}

get destroyed() {
return this._provider?.destroyed
}

addContract(address: ContractAddress, contract: ethers.Contract): RekuContractManager | undefined
addContract(address: ContractAddress, abi: Interface | InterfaceAbi): RekuContractManager | undefined
addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.Contract): RekuContractManager | undefined {
Expand Down Expand Up @@ -156,7 +171,18 @@ export class RekuProviderManager {
socket.onerror = null
debug('remove all listeners of websocket provider')
}
this._provider?.destroy()
debug('reconnect destroyed: %s', this._provider?.destroyed)
if (this._provider && !this._provider.destroyed) {
if (isInstanceof(this._provider, ethers.WebSocketProvider)) {
debug('reconnect websocket readyState: %s', this.websocket?.readyState)
if (this.websocket?.readyState !== WebSocket.CONNECTING)
to(Promise.resolve(this._provider.destroy()))
}
else {
to(Promise.resolve(this._provider.destroy()))
}
}

this._provider = undefined

setTimeout(() => {
Expand Down Expand Up @@ -186,21 +212,41 @@ export class RekuProviderManager {
if (this._options?.disabledHeartbeat)
return
debug('start heartbeat')
this._heartbeatTimer = setInterval(() => {
debug('heartbeat running...')
debug('heartbeat has provider: %s', !!this._provider)
this._provider?.send('net_version', [])
.then((res) => {
debug('heartbeat response: %s', res)
})
.catch((err) => {
this.reconnect()
this._event?.emit('error', err)
debug('heartbeat error: %s', err)
})
this._heartbeatTimer = setInterval(async () => {
if (!this.destroyed) {
debug('heartbeat running...')
const hasProvider = this._hasProvider()
debug('heartbeat has provider: %s', hasProvider)
this._provider?.send('net_version', [])
.then((res) => {
debug('heartbeat response: %s', res)
})
.catch((err) => {
this.reconnect()
this._event?.emit('error', err)
debug('heartbeat error: %s', err)
})
.finally(() => {
debug('heartbeat finally')
})
}
else {
debug('heartbeat destroyed')
}
}, this._heartbeatInterval)
}

private _hasProvider() {
const hasProvider = !!this._provider && !!this._provider.provider
let isInstance = false
if (this.isWebSocketProviderUrl)
isInstance = isInstanceof(this._provider, ethers.WebSocketProvider) && isInstanceof(this._provider.provider, ethers.WebSocketProvider)
else
isInstance = isInstanceof(this._provider, ethers.JsonRpcProvider) && isInstanceof(this._provider.provider, ethers.JsonRpcProvider)

return hasProvider && isInstance && !this._provider?.destroyed
}

private _clearHeartbeat() {
if (this._heartbeatTimer) {
debug('clear heartbeat')
Expand Down
4 changes: 2 additions & 2 deletions packages/utils/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/utils",
"type": "module",
"version": "0.3.2",
"version": "0.3.3",
"packageManager": "pnpm@8.10.5",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down Expand Up @@ -43,7 +43,7 @@
"ethers": ">=6.13.0"
},
"dependencies": {
"@murongg/utils": "^0.1.28",
"@murongg/utils": "^0.2.0",
"cache-manager": "5.7.6",
"cache-manager-ioredis-yet": "2.1.1",
"debug": "^4.3.7",
Expand Down
9 changes: 7 additions & 2 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.