diff --git a/package.json b/package.json index f57a702..ec50e94 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "ora-stack", - "version": "0.3.5", + "version": "0.4.0-beta.4", "private": true, "packageManager": "pnpm@9.7.0", "description": "", @@ -47,7 +47,8 @@ "publish": "nr prepublishOnly && esno scripts/publish.ts", "release": "esno scripts/release.ts", "start": "esno src/index.ts", - "test": "DEBUG=DEBUG:ora-stack:* vitest", + "test": "vitest", + "test:debug": "DEBUG=DEBUG:ora-stack:* vitest", "typecheck": "tsc --noEmit" }, "devDependencies": { diff --git a/packages/orap/examples/declarativeDemo/app.ts b/packages/orap/examples/declarativeDemo/app.ts index 9ed6e3f..4ff9d4a 100644 --- a/packages/orap/examples/declarativeDemo/app.ts +++ b/packages/orap/examples/declarativeDemo/app.ts @@ -2,7 +2,7 @@ import type { ContractEventPayload } from 'ethers' import { Logger, objectKeys, randomStr, redisStore } from '@ora-io/utils' import type { ListenOptions, ToKeyFn } from '../../src' -import { CheckTransactionStatus, Orap, StoreManager, getMiddlewareContext } from '../../src' +import { Orap, StoreManager, getMiddlewareContext } from '../../src' import ABI from './erc20.abi.json' const MAINNET_USDT_ADDR = '0xdAC17F958D2ee523a2206206994597C13D831ec7' @@ -24,7 +24,12 @@ export function startDemo(options: ListenOptions, storeConfig?: any) { const toKey: ToKeyFn = (from: string, _to: string, _amount: number) => `${from}_${randomStr(4)}` - orap.event(eventSignalParam.address, eventSignalParam.abi, eventSignalParam.eventName) + const event = orap.event({ + address: eventSignalParam.address, + abi: eventSignalParam.abi, + eventName: eventSignalParam.eventName, + enableSubscribe: false, + }) .crosscheck({ store, storeKeyPrefix: 'ora-stack:orap:demo:cc:', @@ -37,31 +42,40 @@ export function startDemo(options: ListenOptions, storeConfig?: any) { // event hook, not necessary .handle(newEventSignalHook) - // add a task - .task() + // add a task + event.task() .cache(sm) .key(toKey) .prefix('ora-stack:orap:demo:TransferTask:', 'ora-stack:orap:demo:Done-TransferTask:') .ttl({ taskTtl: 120000, doneTtl: 60000 }) - .use(CheckTransactionStatus(options.wsProvider)) + // .use(CheckTransactionStatus(options.wsProvider)) .handle(handleTask) // add another task - .another() - .task() - .prefix('ora-stack:orap:demo:AnotherTask:', 'ora-stack:orap:demo:Done-AnotherTask:') - .cache(sm) // rm to use mem by default - .ttl({ taskTtl: 20000, doneTtl: 20000 }) - .handle(handleTask_2) + // .another() + // .task() + // .prefix('ora-stack:orap:demo:AnotherTask:', 'ora-stack:orap:demo:Done-AnotherTask:') + // .cache(sm) // rm to use mem by default + // .ttl({ taskTtl: 20000, doneTtl: 20000 }) + // .handle(handleTask_2) // start signal listener orap.listen( options, () => { logger.log('listening on provider.network') }, ) + + setTimeout(() => { + logger.log('[+] add another address') + event.addresses([ + eventSignalParam.address, + '0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48', + ]) + event.restart() + }, 10 * 1000) } -async function handleTask(from: string, to: string, amount: number, _event: ContractEventPayload) { - logger.log('[+] handleTask: from =', from, 'to =', to, 'amount =', amount) +async function handleTask(from: string, to: string, amount: number, event: ContractEventPayload) { + logger.log('[+] handleTask: from =', from, 'to =', to, 'amount =', amount, 'address =', event.log.address) const args = objectKeys(arguments).map(k => arguments[k]) const { next } = getMiddlewareContext(...args) @@ -74,10 +88,10 @@ async function newEventSignalHook(from: string, to: string, amount: number, even return true // true to continue handle tasks, false to hijack the process. } -async function handleTask_2(from: string, to: string, amount: number) { - logger.log('[+] handleTask_2: from =', from, 'to =', to, 'amount =', amount) - const args = objectKeys(arguments).map(k => arguments[k]) +// async function handleTask_2(from: string, to: string, amount: number) { +// logger.log('[+] handleTask_2: from =', from, 'to =', to, 'amount =', amount) +// const args = objectKeys(arguments).map(k => arguments[k]) - const { next } = getMiddlewareContext(...args) - await next() -} +// const { next } = getMiddlewareContext(...args) +// await next() +// } diff --git a/packages/orap/package.json b/packages/orap/package.json index d6e58f9..661b38d 100644 --- a/packages/orap/package.json +++ b/packages/orap/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/orap", "type": "module", - "version": "0.3.5", + "version": "0.4.0-beta.4", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong", diff --git a/packages/orap/src/beat/event.ts b/packages/orap/src/beat/event.ts index f44e237..d29d43b 100644 --- a/packages/orap/src/beat/event.ts +++ b/packages/orap/src/beat/event.ts @@ -20,4 +20,8 @@ export class EventBeat extends EventSignal { drop() { this.listen(this.subscribeProvider, this.crosscheckProvider) } + + stop() { + super.stop() + } } diff --git a/packages/orap/src/flow/event.test.ts b/packages/orap/src/flow/event.test.ts index dc53e0e..c570c2a 100644 --- a/packages/orap/src/flow/event.test.ts +++ b/packages/orap/src/flow/event.test.ts @@ -1,5 +1,7 @@ import { ethers } from 'ethers' -import { beforeEach, describe, expect, it } from 'vitest' +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { EventVerse } from '../verse/event' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' import { OrapFlow } from './orap' import { EventFlow } from './event' @@ -9,7 +11,7 @@ describe('EventFlow', () => { beforeEach(() => { orapFlow = new OrapFlow() - eventFlow = new EventFlow(orapFlow) + eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) }) it('should create a task flow', () => { @@ -48,4 +50,43 @@ describe('EventFlow', () => { const parentFlow = eventFlow.another() expect(parentFlow).toBe(orapFlow) }) + + it('should stop the EventVerse', () => { + const stopFn = vi.fn() + vi.spyOn(EventVerse.prototype, 'stop').mockImplementation(stopFn) + eventFlow.stop() + expect(stopFn).toHaveBeenCalled() + }) + + describe('address', () => { + it('should set the address', () => { + const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) + eventFlow.address('0x1234567890123456789012345678901234567890') + expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890']) + }) + + it.only('should set the array address', () => { + const eventFlow = new EventFlow(orapFlow, { address: [USDT_ADDRESS], abi: ERC20_ABI, eventName: 'Transfer' }) + eventFlow.address(1, '0x1234567890123456789012345678901234567890') + expect(eventFlow.params.address).toContainEqual('0x1234567890123456789012345678901234567890') + }) + + it('should set the address with number', () => { + const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) + eventFlow.address(1, '0x1234567890123456789012345678901234567890') + expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890']) + }) + + it('should set the addresses with array', () => { + const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) + eventFlow.addresses(['0x1234567890123456789012345678901234567890']) + expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890']) + }) + + it('should set the addresses with array and number', () => { + const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) + eventFlow.addresses([USDT_ADDRESS, '0x1234567890123456789012345678901234567890']) + expect(eventFlow.params.address).toEqual([USDT_ADDRESS, '0x1234567890123456789012345678901234567890']) + }) + }) }) diff --git a/packages/orap/src/flow/event.ts b/packages/orap/src/flow/event.ts index 35f11f3..8485bc6 100644 --- a/packages/orap/src/flow/event.ts +++ b/packages/orap/src/flow/event.ts @@ -1,4 +1,5 @@ import type { AutoCrossCheckParam, Providers } from '@ora-io/reku' +import type { ContractAddress } from '@ora-io/utils/src' import type { Context } from '../task' import type { TaskFlowParams } from '../flow/task' import { TaskFlow } from '../flow/task' @@ -18,11 +19,25 @@ export class EventFlow implements Flow { private _subscribeProvider?: Providers private _crosscheckProvider?: Providers + private _verse: EventVerse = new EventVerse(this) + private _addresses: ContractAddress[] = [] + private _params: EventSignalRegisterParams + + get verse() { + return this._verse + } + + get params() { + return this._params + } + constructor( - private parentFlow?: OrapFlow, - public params?: EventSignalRegisterParams, + private parentFlow: OrapFlow, + params: EventSignalRegisterParams, handleFn?: HandleFn, // return: succ & continue if true, stop if false ) { + this._params = params + this._addresses = Array.isArray(this.params.address) ? this.params.address : [this.params.address] // Default handleFn this.handleFn = handleFn ?? (async (..._args: Array) => { return true @@ -52,6 +67,7 @@ export class EventFlow implements Flow { tf = new TaskFlow(this, sm) } this._taskFlows.push(tf) + this._verse.setTaskVerses(this._taskFlows.map(flow => flow.verse)) return tf } @@ -89,6 +105,48 @@ export class EventFlow implements Flow { } another(): OrapFlow { - return this.parentFlow! + return this.parentFlow + } + + stop(): this { + this._verse.stop() + return this + } + + restart(): this { + if (this.parentFlow.wsProvider) + this._subscribeProvider = this.parentFlow.wsProvider + else + throw new Error('wsProvider is not set, cannot restart') + if (this.parentFlow.httpProvider) + this._crosscheckProvider = this.parentFlow.httpProvider + else + throw new Error('httpProvider is not set, cannot restart') + this._verse.restart() + return this + } + + address(_index: number, _address: ContractAddress): this + address(address: ContractAddress): this + address(_first: ContractAddress | number, _second?: ContractAddress): this { + if (typeof _first === 'number') { + if (!_second) + throw new Error('address is required') + if (Array.isArray(this._params.address)) + Reflect.set(this._addresses, _first, _second) + + else this._addresses = [_second!] + } + else + if (Array.isArray(this._params.address)) { this._addresses = [...new Set([...this._params.address, _first])] } + else { this._addresses = [_first] } + this._params.address = this._addresses + return this + } + + addresses(addresses: ContractAddress[]): this { + this._addresses = addresses + this._params.address = this._addresses + return this } } diff --git a/packages/orap/src/flow/orap.test.ts b/packages/orap/src/flow/orap.test.ts index 4eeab12..90c0791 100644 --- a/packages/orap/src/flow/orap.test.ts +++ b/packages/orap/src/flow/orap.test.ts @@ -1,6 +1,7 @@ import { ethers } from 'ethers' import { beforeEach, describe, expect, it, vi } from 'vitest' import { SEPOLIA_HTTP, SEPOLIA_WSS } from '../../tests/config' +import { OrapVerse } from '../verse/orap' import { OrapFlow } from './orap' describe('OrapFlow', () => { @@ -33,4 +34,11 @@ describe('OrapFlow', () => { expect(eventFlow).toBeDefined() expect(orapFlow.eventFlows).toContain(eventFlow) }) + + it('should stop the OrapVerse', () => { + const stopFn = vi.fn() + vi.spyOn(OrapVerse.prototype, 'stop').mockImplementation(stopFn) + orapFlow.stop() + expect(stopFn).toHaveBeenCalled() + }) }) diff --git a/packages/orap/src/flow/orap.ts b/packages/orap/src/flow/orap.ts index 0f41ae1..d91e79f 100644 --- a/packages/orap/src/flow/orap.ts +++ b/packages/orap/src/flow/orap.ts @@ -22,15 +22,31 @@ export class OrapFlow implements Flow { } = { event: [] } onListenFn: Fn = () => { } + _wsProvider?: Providers + _httpProvider?: Providers get eventFlows() { return this.subflows.event } + get wsProvider() { + return this._wsProvider + } + + get httpProvider() { + return this._httpProvider + } + + private _verse: OrapVerse = new OrapVerse(this) + + get verse() { + return this._verse + } + event(params: EventSignalRegisterParams, handler?: HandleFn): EventFlow - event(address: ContractAddress, abi: Interface | InterfaceAbi | HandleFn, eventName: string, handler?: HandleFn): EventFlow - event(params: EventSignalRegisterParams | ContractAddress, abi?: Interface | InterfaceAbi | HandleFn, eventName?: string, handler?: HandleFn): EventFlow { - if (typeof params === 'string' || isAddressable(params)) + event(address: ContractAddress | ContractAddress[], abi: Interface | InterfaceAbi | HandleFn, eventName: string, handler?: HandleFn): EventFlow + event(params: EventSignalRegisterParams | ContractAddress | ContractAddress[], abi?: Interface | InterfaceAbi | HandleFn, eventName?: string, handler?: HandleFn): EventFlow { + if (typeof params === 'string' || isAddressable(params) || Array.isArray(params)) params = { address: params, abi: abi as Interface | InterfaceAbi, eventName: eventName as string } else handler = abi as HandleFn @@ -45,24 +61,37 @@ export class OrapFlow implements Flow { * @param options * @param onListenFn */ - listen(options: ListenOptions, onListenFn?: Fn) { + listen(options: ListenOptions, onListenFn?: Fn): this { for (const eventFlow of this.subflows.event) { eventFlow.setSubscribeProvider(options.wsProvider) if (options.httpProvider) eventFlow.setCrosscheckProvider(options.httpProvider) } + this._wsProvider = options.wsProvider + this._httpProvider = options.httpProvider if (onListenFn) this.onListenFn = onListenFn + const eventVerses = this.subflows.event.map(flow => flow.verse) + this._verse.setEventVerses(eventVerses) - const orapVerse = this.assemble() - orapVerse.play() + this._verse.play() this.onListenFn() + return this + } + + stop(): this { + this._verse.stop() + return this + } + + restart(): this { + this._verse.restart() + return this } assemble(): OrapVerse { const eventVerses = this.subflows.event.map(flow => flow.assemble()) return new OrapVerse(this).setEventVerses(eventVerses) - // this.routes.event.push(es) } } diff --git a/packages/orap/src/flow/task.test.ts b/packages/orap/src/flow/task.test.ts index 103938a..fdbc493 100644 --- a/packages/orap/src/flow/task.test.ts +++ b/packages/orap/src/flow/task.test.ts @@ -1,14 +1,15 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import { memoryStore } from '@ora-io/utils' import { StoreManager } from '../store' -import { EventFlow, TaskFlow } from '.' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' +import { EventFlow, OrapFlow, TaskFlow } from '.' describe('TaskFlow', () => { let parentFlow: any let taskFlow: TaskFlow beforeEach(() => { - parentFlow = new EventFlow() + parentFlow = new EventFlow(new OrapFlow(), { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) taskFlow = new TaskFlow(parentFlow) }) diff --git a/packages/orap/src/flow/task.ts b/packages/orap/src/flow/task.ts index 52f6f89..489ea76 100644 --- a/packages/orap/src/flow/task.ts +++ b/packages/orap/src/flow/task.ts @@ -16,6 +16,11 @@ const defaultHandleFn: HandleFn = () => { } const defaultToKeyFn: ToKeyFn = _ => randomStr(8, alphabetHex) + +const defaultFailFn: HandleResultFn = async (task: TaskRaplized) => { + await task.remove() +} + export interface TaskFlowTTL { taskTtl: Milliseconds; doneTtl: Milliseconds } export interface TaskFlowParams { @@ -41,15 +46,17 @@ export class TaskFlow implements Flow { toKeyFn: ToKeyFn = defaultToKeyFn handleFn: HandleFn = defaultHandleFn successFn: HandleResultFn = defaultSuccessFn + failFn: HandleResultFn = defaultFailFn private _middlewares: Array = [] - - failFn: HandleResultFn = async (task: TaskRaplized) => { - await task.remove() - } + private _verse: TaskVerse = new TaskVerse(this) ctx?: Context + get verse() { + return this._verse + } + constructor( private parentFlow: EventFlow, params?: TaskFlowParams, @@ -136,4 +143,14 @@ export class TaskFlow implements Flow { assemble(): TaskVerse { return new TaskVerse(this) } + + stop(): this { + this._verse.stop() + return this + } + + restart(): this { + this._verse.restart() + return this + } } diff --git a/packages/orap/src/signal/event.test.ts b/packages/orap/src/signal/event.test.ts index ca6723a..bdd4e05 100644 --- a/packages/orap/src/signal/event.test.ts +++ b/packages/orap/src/signal/event.test.ts @@ -2,6 +2,7 @@ import { ethers } from 'ethers' import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import type { AutoCrossCheckParam } from '@ora-io/reku' import { AutoCrossChecker, RekuProviderManager } from '@ora-io/reku' +import type { ContractAddress } from '@ora-io/utils/src' import { ERC20_ABI, MAINNET_WSS, USDT_ADDRESS } from '../../tests/config' import type { EventSignalCallback, EventSignalRegisterParams } from './event' import { EventSignal } from './event' @@ -30,11 +31,6 @@ describe('EventSignal', () => { expect(eventSignal).toBeInstanceOf(EventSignal) }) - it('should set the contract property', () => { - expect(eventSignal.contract).toBeInstanceOf(ethers.Contract) - expect(eventSignal.contract.interface).toEqual(new ethers.Interface(ERC20_ABI)) - }) - it('should set the eventFragment property', () => { expect(eventSignal.eventFragment).toBeInstanceOf(ethers.EventFragment) expect(eventSignal.eventFragment.name).toBe(params.eventName) @@ -91,29 +87,37 @@ describe('EventSignal', () => { const provider = {} as any beforeEach(() => { - eventSignal.contract.connect = vi.fn().mockReturnValue({ - on: vi.fn(), - }) + // Mock the contract in the contractMap + const mockContract = { + connect: vi.fn().mockReturnValue({ + on: vi.fn(), + }), + removeListener: vi.fn(), + } as any + eventSignal.contractMap.set(params.address as ContractAddress, mockContract) }) it('should add the contract to the provider if provider is an instance of RekuProviderManager', () => { rekuProviderManager.addContract = vi.fn() rekuProviderManager.addListener = vi.fn() eventSignal.startEventListener(rekuProviderManager) - expect(rekuProviderManager.addContract).toHaveBeenCalledWith(params.address, eventSignal.contract) + expect(rekuProviderManager.addContract).toHaveBeenCalledWith(params.address, eventSignal.contractMap.get(params.address as ContractAddress)) expect(rekuProviderManager.addListener).toHaveBeenCalledWith(params.address, params.eventName, eventSignal.subscribeCallback) }) it('should connect the contract to the provider if provider is not an instance of RekuProviderManager', () => { eventSignal.startEventListener(provider) - expect(eventSignal.contract.connect).toHaveBeenCalledWith(provider) + expect(eventSignal.contractMap.get(params.address as ContractAddress)?.connect).toHaveBeenCalledWith(provider) }) it('should call the on method of the listener with the eventName and subscribeCallback', () => { const listener = { on: vi.fn(), } - eventSignal.contract.connect = vi.fn().mockReturnValue(listener) + const mockContract = eventSignal.contractMap.get(params.address as ContractAddress) + if (mockContract) + mockContract.connect = vi.fn().mockReturnValue(listener) + eventSignal.startEventListener(provider) expect(listener.on).toHaveBeenCalledWith(params.eventName, eventSignal.subscribeCallback) }) @@ -156,4 +160,14 @@ describe('EventSignal', () => { expect(eventSignal.crosschecker?.start).toHaveBeenCalledWith(eventSignal.crosscheckerOptions) }) }) + + describe('stop', () => { + it('should stop the event listener and the crosschecker', () => { + eventSignal.stopEventListener = vi.fn() + eventSignal.stopCrossChecker = vi.fn() + eventSignal.stop() + expect(eventSignal.stopEventListener).toHaveBeenCalled() + expect(eventSignal.stopCrossChecker).toHaveBeenCalledWith() + }) + }) }) diff --git a/packages/orap/src/signal/event.ts b/packages/orap/src/signal/event.ts index 980ca77..b3d4729 100644 --- a/packages/orap/src/signal/event.ts +++ b/packages/orap/src/signal/event.ts @@ -6,9 +6,11 @@ import type { ContractAddress } from '@ora-io/utils' import type { Signal } from './interface' export interface EventSignalRegisterParams { - address: ContractAddress + address: ContractAddress | ContractAddress[] abi: Interface | InterfaceAbi eventName: string + enableCrosscheck?: boolean // default: true + enableSubscribe?: boolean // default: true // esig?: string, } @@ -25,7 +27,10 @@ export type EventSignalCallback = ethers.Listener export class EventSignal implements Signal { provider?: Providers - contract: ethers.Contract + params: EventSignalRegisterParams + addresses: ContractAddress[] + contractMap: Map = new Map() + interface: ethers.Interface esig: string eventFragment: EventFragment @@ -37,17 +42,26 @@ export class EventSignal implements Signal { crosscheckerParams?: CrosscheckParams constructor( - public params: EventSignalRegisterParams, + params: EventSignalRegisterParams, public callback: EventSignalCallback, crosscheckOptions?: CrosscheckOptions, ) { - this.contract = new ethers.Contract( - params.address, - params.abi, - ) + params = this._transformParams(params) + this.params = params + + if (Array.isArray(params.address)) + this.addresses = params.address.map(this._getAddressStr) + + else this.addresses = [this._getAddressStr(params.address)] + + for (const address of this.addresses) { + const contract = new ethers.Contract(address, params.abi) + this.contractMap.set(address, contract) + } // Get the event fragment by name - const iface = this.contract.interface + const iface = ethers.Interface.from(params.abi) + this.interface = iface const _ef = iface.getEvent(params.eventName) if (!_ef) throw new Error('event not found in abi') @@ -63,7 +77,7 @@ export class EventSignal implements Signal { } // to align with subscribe listener, parse event params and add EventLog to the last this.crosscheckCallback = async (log: Log) => { - const parsedLog = this.contract.interface.decodeEventLog(this.eventFragment, log.data, log.topics) + const parsedLog = this.interface.decodeEventLog(this.eventFragment, log.data, log.topics) const payload = this._wrapContractEventPayload(log) await this.callback(...parsedLog, payload) } @@ -73,10 +87,27 @@ export class EventSignal implements Signal { this._setCrosscheckOptions(crosscheckOptions) } + private _transformParams(params: EventSignalRegisterParams) { + if (params.enableCrosscheck === undefined) + params.enableCrosscheck = true + if (params.enableSubscribe === undefined) + params.enableSubscribe = true + return params + } + private _wrapContractEventPayload(log: Log) { + const contract = this.contractMap.get(this._getAddressStr(log.address)) + if (!contract) + throw new Error(`contract not found for address: ${log.address}`) if (this.eventFragment) - return new ContractEventPayload(this.contract, this.subscribeCallback, this.params.eventName, this.eventFragment, log) - return new ContractUnknownEventPayload(this.contract, this.subscribeCallback, this.params.eventName, log) + return new ContractEventPayload(contract, this.subscribeCallback, this.params.eventName, this.eventFragment, log) + return new ContractUnknownEventPayload(contract, this.subscribeCallback, this.params.eventName, log) + } + + private _getAddressStr(address: ContractAddress) { + if (typeof address === 'string') + return ethers.getAddress(address) + return address } private _setCrosscheckOptions(options: CrosscheckOptions) { @@ -112,21 +143,51 @@ export class EventSignal implements Signal { } startEventListener(provider: Providers) { + if (!this.params.enableSubscribe) + return if (provider instanceof RekuProviderManager) { - provider.addContract(this.params.address, this.contract) - provider.addListener(this.params.address, this.params.eventName, this.subscribeCallback) + for (const address of this.addresses) + provider.addContract(address, this.contractMap.get(address)!) + for (const address of this.addresses) + provider.addListener(address, this.params.eventName, this.subscribeCallback) } else { - const listener = this.contract.connect(provider) - listener?.on( - this.params.eventName, - // TODO: calling this seems to be async, should we make it to sequential? - this.subscribeCallback, - ) + // const listener = this.contract.connect(provider) + // listener?.on( + // this.params.eventName, + // // TODO: calling this seems to be async, should we make it to sequential? + // this.subscribeCallback, + // ) + for (const address of this.addresses) { + const newContract = this.contractMap.get(address)!.connect(provider) + newContract.on(this.params.eventName, this.subscribeCallback) + this.contractMap.set(address, newContract) + } } } + stop() { + this.stopEventListener() + this.stopCrossChecker() + } + + stopEventListener() { + if (this.provider instanceof RekuProviderManager) + this.contractMap.forEach(contract => contract.removeAllListeners()) + + else + this.contractMap.forEach(contract => contract.off(this.params.eventName, this.subscribeCallback)) + // this.provider?.destroy() + } + + stopCrossChecker() { + this.crosschecker?.stop() + } + async startCrossChecker(provider?: Providers) { + if (!this.params.enableCrosscheck) + return + if (this.crosscheckerParams?.disabled) return diff --git a/packages/orap/src/task/verse.test.ts b/packages/orap/src/task/verse.test.ts index f5c3cf6..f723fd0 100644 --- a/packages/orap/src/task/verse.test.ts +++ b/packages/orap/src/task/verse.test.ts @@ -4,6 +4,7 @@ import { EventFlow, OrapFlow, TaskFlow } from '../flow' import type { StoreManager } from '../store' import { getMiddlewareContext } from '../utils' import { HandleFailedMiddleware, HandleSuccessMiddleware } from '../middlewares/private' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' import { TaskRaplized } from './verse' describe('TaskRaplized', () => { @@ -15,7 +16,7 @@ describe('TaskRaplized', () => { const taskFlowDoneTtl = 2000 beforeEach(() => { - taskFlow = new TaskFlow(new EventFlow(new OrapFlow())) + taskFlow = new TaskFlow(new EventFlow(new OrapFlow(), { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })) taskFlow.handle(vi.fn(async (...args: any[]) => { const { next } = getMiddlewareContext(args) diff --git a/packages/orap/src/verse/event.test.ts b/packages/orap/src/verse/event.test.ts index 0d3290e..84aed4b 100644 --- a/packages/orap/src/verse/event.test.ts +++ b/packages/orap/src/verse/event.test.ts @@ -2,6 +2,7 @@ import { ethers } from 'ethers' import { beforeEach, describe, expect, it, vi } from 'vitest' import { EventFlow, OrapFlow } from '../flow' import { EventBeat } from '../beat/event' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' import { EventVerse } from './event' import type { TaskVerse } from './task' @@ -12,7 +13,7 @@ describe('EventVerse', () => { beforeEach(() => { orapFlow = new OrapFlow() - eventFlow = new EventFlow(orapFlow) + eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) eventVerse = new EventVerse(eventFlow) }) @@ -73,4 +74,24 @@ describe('EventVerse', () => { expect(eventBeatDrop).toHaveBeenCalled() }) + + it('should stop event verse', () => { + const eventBeatStop = vi.fn() + vi.spyOn(EventBeat.prototype, 'stop').mockImplementation(eventBeatStop) + + // Mock the _play method to avoid actual EventBeat creation + vi.spyOn(eventVerse as any, '_play').mockImplementation(() => { + // Create a mock eventBeat + (eventVerse as any).eventBeat = { + stop: eventBeatStop, + } + }) + + // Call play first to create the eventBeat + eventVerse.play() + // Then call stop + eventVerse.stop() + + expect(eventBeatStop).toHaveBeenCalled() + }) }) diff --git a/packages/orap/src/verse/event.ts b/packages/orap/src/verse/event.ts index 6d05c64..400ae40 100644 --- a/packages/orap/src/verse/event.ts +++ b/packages/orap/src/verse/event.ts @@ -5,6 +5,7 @@ import type { TaskVerse } from './task' export class EventVerse implements Verse { private taskVerses: TaskVerse[] = [] + private eventBeat: EventBeat | undefined constructor(private flow: EventFlow) { } @@ -35,6 +36,18 @@ export class EventVerse implements Verse { this._play() } + stop() { + for (const verse of this.taskVerses) + verse.stop() + this.eventBeat?.stop() + this.eventBeat = undefined + } + + restart() { + this.stop() + this.play() + } + setTaskVerses(taskVerses: TaskVerse[]) { this.taskVerses = taskVerses return this @@ -47,22 +60,15 @@ export class EventVerse implements Verse { private _play() { // create an beat per verse - const eventBeat = new EventBeat( + this.eventBeat = new EventBeat( // for create signal - this.flow.params!, + this.flow.params, this.handleSignal.bind(this), this.flow.partialCrosscheckOptions, // for listen this.flow.subscribeProvider!, this.flow.crosscheckProvider, ) - eventBeat.drop() - - // this.eventSignal = new EventSignal( - // this.flow.params!, - // this.handleSignal, - // this.flow.logger, - // this.flow.partialCrosscheckOptions, - // ) + this.eventBeat.drop() } } diff --git a/packages/orap/src/verse/orap.test.ts b/packages/orap/src/verse/orap.test.ts index d8c3640..71efc7a 100644 --- a/packages/orap/src/verse/orap.test.ts +++ b/packages/orap/src/verse/orap.test.ts @@ -1,6 +1,7 @@ import { beforeEach, describe, expect, it, vi } from 'vitest' import { EventVerse } from '../verse/event' import { EventFlow, OrapFlow } from '../flow' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' import { OrapVerse } from './orap' describe('OrapVerse', () => { @@ -11,7 +12,7 @@ describe('OrapVerse', () => { beforeEach(() => { orapFlow = new OrapFlow() orapVerse = new OrapVerse(orapFlow) - eventVerse = new EventVerse(new EventFlow(orapFlow)) + eventVerse = new EventVerse(new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })) }) it('should play the OrapVerse', () => { @@ -35,4 +36,12 @@ describe('OrapVerse', () => { orapVerse.setEventVerses(eventVerses) expect(orapVerse.eventVerses).toEqual(eventVerses) }) + + it('should stop the OrapVerse', () => { + const stopFn = vi.fn() + vi.spyOn(EventVerse.prototype, 'stop').mockImplementation(stopFn) + orapVerse.setEventVerses([eventVerse]) + orapVerse.stop() + expect(stopFn).toHaveBeenCalled() + }) }) diff --git a/packages/orap/src/verse/orap.ts b/packages/orap/src/verse/orap.ts index 6a880f8..9bc3b00 100644 --- a/packages/orap/src/verse/orap.ts +++ b/packages/orap/src/verse/orap.ts @@ -21,6 +21,16 @@ export class OrapVerse implements Verse { // return this } + stop() { + for (const verse of this._eventVerses) + verse.stop() + } + + restart() { + for (const verse of this._eventVerses) + verse.restart() + } + setEventVerses(_eventVerses: EventVerse[]) { this._eventVerses = _eventVerses return this diff --git a/packages/orap/src/verse/task.test.ts b/packages/orap/src/verse/task.test.ts new file mode 100644 index 0000000..0b63a7d --- /dev/null +++ b/packages/orap/src/verse/task.test.ts @@ -0,0 +1,225 @@ +import { beforeEach, describe, expect, it, vi } from 'vitest' +import { sleep } from '@murongg/utils' +import { TaskFlow } from '../flow/task' +import { EventFlow } from '../flow/event' +import { OrapFlow } from '../flow' +import { TaskRaplized } from '../task/verse' +import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config' +import { TaskVerse } from './task' + +// Mock the sleep function +vi.mock('@murongg/utils', () => ({ + sleep: vi.fn(), +})) + +describe('TaskVerse', () => { + let orapFlow: OrapFlow + let eventFlow: EventFlow + let taskFlow: TaskFlow + let taskVerse: TaskVerse + let mockStoreManager: any + + beforeEach(() => { + vi.clearAllMocks() + + // Mock store manager + mockStoreManager = { + keys: vi.fn(), + get: vi.fn(), + set: vi.fn(), + del: vi.fn(), + } + + orapFlow = new OrapFlow() + eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' }) + taskFlow = new TaskFlow(eventFlow) + taskFlow.cache(mockStoreManager) + taskVerse = new TaskVerse(taskFlow) + }) + + describe('constructor', () => { + it('should create instance with task flow', () => { + expect(taskVerse).toBeInstanceOf(TaskVerse) + expect(taskVerse.flow).toBe(taskFlow) + }) + }) + + describe('createTask', () => { + it('should create and save a task', async () => { + vi.spyOn(TaskRaplized.prototype, 'save').mockResolvedValue(undefined) + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + + const args = ['arg1', 'arg2', 'arg3'] + taskVerse.play() + const result = await taskVerse.createTask(...args) + + expect(result).toBe(taskVerse) + expect(TaskRaplized.prototype.save).toHaveBeenCalled() + }) + + it('should handle multiple tasks when loading is false', async () => { + vi.spyOn(TaskRaplized.prototype, 'save').mockResolvedValue(undefined) + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys + .mockResolvedValueOnce(['Task:test:1', 'Task:test:2']) + .mockResolvedValueOnce([]) + + const args = ['arg1', 'arg2'] + taskVerse.play() + await taskVerse.createTask(...args) + + // Wait for the async operation to complete + await new Promise(resolve => setTimeout(resolve, 0)) + + expect(mockStoreManager.keys).toHaveBeenCalledWith('Task:test*') + expect(TaskRaplized.prototype.loadByKey).toHaveBeenCalledTimes(2) + expect(TaskRaplized.prototype.handle).toHaveBeenCalledTimes(2) + }) + + it('should not handle tasks when loading is true', async () => { + vi.spyOn(TaskRaplized.prototype, 'save').mockResolvedValue(undefined) + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + ;(taskVerse as any).loading = true + + const args = ['arg1', 'arg2'] + taskVerse.play() + await taskVerse.createTask(...args) + + expect(TaskRaplized.prototype.save).toHaveBeenCalled() + }) + }) + + describe('loadAndHandleAll', () => { + it('should load and handle all tasks with given prefix', async () => { + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys + .mockResolvedValueOnce(['Task:test:1', 'Task:test:2']) + .mockResolvedValueOnce([]) + + const args = ['arg1', 'arg2'] + await taskVerse.loadAndHandleAll('Task:test', ...args) + + expect(mockStoreManager.keys).toHaveBeenCalledWith('Task:test*') + expect(TaskRaplized.prototype.loadByKey).toHaveBeenCalledTimes(2) + expect(TaskRaplized.prototype.handle).toHaveBeenCalledTimes(2) + expect(sleep).toHaveBeenCalledWith(1000) + }) + + it('should handle multiple batches of tasks', async () => { + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys + .mockResolvedValueOnce(['Task:test:1', 'Task:test:2']) + .mockResolvedValueOnce(['Task:test:3']) + .mockResolvedValueOnce([]) + + const args = ['arg1', 'arg2'] + await taskVerse.loadAndHandleAll('Task:test', ...args) + + expect(mockStoreManager.keys).toHaveBeenCalledTimes(3) + expect(TaskRaplized.prototype.loadByKey).toHaveBeenCalledTimes(3) + expect(TaskRaplized.prototype.handle).toHaveBeenCalledTimes(3) + expect(sleep).toHaveBeenCalledTimes(2) + }) + + it('should stop when no more keys are found', async () => { + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys.mockResolvedValue([]) + + const args = ['arg1', 'arg2'] + await taskVerse.loadAndHandleAll('Task:test', ...args) + + expect(mockStoreManager.keys).toHaveBeenCalledTimes(1) + expect(TaskRaplized.prototype.loadByKey).not.toHaveBeenCalled() + expect(TaskRaplized.prototype.handle).not.toHaveBeenCalled() + expect(sleep).not.toHaveBeenCalled() + }) + }) + + describe('preload', () => { + it('should start preloading tasks', async () => { + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys.mockResolvedValue([]) + + taskVerse.preload() + + // Wait for the async operation to complete + await sleep(0) + + expect((taskVerse as any).loading).toBe(true) + expect(TaskRaplized.prototype.getTaskPrefix).toHaveBeenCalled() + }) + + it('should set loading to false after preloading completes', async () => { + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys.mockResolvedValue([]) + + taskVerse.preload() + + // Wait for the async operation to complete + await new Promise(resolve => setTimeout(resolve, 0)) + + expect((taskVerse as any).loading).toBe(false) + }) + }) + + describe('play', () => { + it('should call preload', () => { + const preloadSpy = vi.spyOn(taskVerse, 'preload') + taskVerse.play() + expect(preloadSpy).toHaveBeenCalled() + }) + }) + + describe('stop', () => { + it('should set playing to false', () => { + // Set playing to true first + (taskVerse as any).playing = true + + taskVerse.stop() + + expect((taskVerse as any).loading).toBe(false) + }) + }) + + describe('integration', () => { + it('should handle complete task lifecycle', async () => { + vi.spyOn(TaskRaplized.prototype, 'save').mockResolvedValue(undefined) + vi.spyOn(TaskRaplized.prototype, 'getTaskPrefix').mockResolvedValue('Task:test') + vi.spyOn(TaskRaplized.prototype, 'loadByKey').mockResolvedValue({} as TaskRaplized) + vi.spyOn(TaskRaplized.prototype, 'handle').mockResolvedValue(undefined) + + mockStoreManager.keys.mockResolvedValue([]) + + // Start the task verse + taskVerse.play() + + // Create a task + const args = ['arg1', 'arg2'] + await taskVerse.createTask(...args) + + // Wait for async operations + await new Promise(resolve => setTimeout(resolve, 0)) + + // Stop the task verse + taskVerse.stop() + + expect((taskVerse as any).loading).toBe(false) + expect(TaskRaplized.prototype.save).toHaveBeenCalled() + }) + }) +}) diff --git a/packages/orap/src/verse/task.ts b/packages/orap/src/verse/task.ts index af34bc7..c1a1ea7 100644 --- a/packages/orap/src/verse/task.ts +++ b/packages/orap/src/verse/task.ts @@ -5,6 +5,7 @@ import type { Verse } from './interface' export class TaskVerse implements Verse { private loading = false + private playing = false constructor(private _flow: TaskFlow) { } @@ -18,6 +19,9 @@ export class TaskVerse implements Verse { * @param args exactly the same with eventsignal.callback, i.e. event log fields + eventlog obj */ async createTask(...args: Array) { + if (!this.playing) + return + const task = new TaskRaplized(this._flow, args) task.save().finally(async () => { if (this.loading) @@ -35,7 +39,7 @@ export class TaskVerse implements Verse { let keys = await this.flow.sm.keys(`${prefix}*`) while (true) { // break if no more keys - if (keys.length === 0) + if (!keys || keys.length === 0) break for (const key of keys) { const task = new TaskRaplized(this.flow, args) @@ -64,6 +68,15 @@ export class TaskVerse implements Verse { * start processor for this task _flow */ play() { + this.playing = true this.preload() } + + stop() { + this.playing = false + } + + restart() { + this.play() + } } diff --git a/packages/orap/tests/orap.test.ts b/packages/orap/tests/orap.test.ts index c215b3c..bbd6278 100644 --- a/packages/orap/tests/orap.test.ts +++ b/packages/orap/tests/orap.test.ts @@ -37,7 +37,7 @@ describe('Orap', () => { }, { timeout: 100000000, }) - it.skip('should run Declarative Demo without errors', async () => { + it('should run Declarative Demo without errors', async () => { const consoleSpy = vi.spyOn(console, 'log') const storeConfig = { port: parseInt(process.env.REDIS_PORT!), host: process.env.REDIS_HOST } diff --git a/packages/reku/package.json b/packages/reku/package.json index 10ade28..34210e6 100644 --- a/packages/reku/package.json +++ b/packages/reku/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/reku", "type": "module", - "version": "0.3.7", + "version": "0.4.0-beta.1", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong", diff --git a/packages/reku/src/event/crosschecker/autochecker.ts b/packages/reku/src/event/crosschecker/autochecker.ts index 541a363..ded5a96 100644 --- a/packages/reku/src/event/crosschecker/autochecker.ts +++ b/packages/reku/src/event/crosschecker/autochecker.ts @@ -10,6 +10,7 @@ import { BaseCrossChecker } from './basechecker' export class AutoCrossChecker extends BaseCrossChecker { cache: CrossCheckerCacheManager | undefined = undefined checkpointBlockNumber: number | undefined + playing = false constructor( provider: Providers, @@ -140,10 +141,14 @@ export class AutoCrossChecker extends BaseCrossChecker { // never ends if options.toBlock is not provided : () => false + this.playing = true debug('crosscheck running') // TODO: replace polling with schedule cron - await polling(async () => { + + const pollingFn = async () => { + if (!this.playing) + return true try { debug('start polling') const wait = await waitOrUpdateToBlock() @@ -165,7 +170,14 @@ export class AutoCrossChecker extends BaseCrossChecker { debug('polling error', error) return false } - }, pollingInterval) + } + + polling(pollingFn, pollingInterval) + } + + stop() { + this.playing = false + debug('auto crosscheck stop') } async diff(logs: ethers.Log[], ignoreLogs: SimpleLog[]): Promise { diff --git a/packages/reku/src/event/crosschecker/basechecker.ts b/packages/reku/src/event/crosschecker/basechecker.ts index cf7cae0..a061868 100644 --- a/packages/reku/src/event/crosschecker/basechecker.ts +++ b/packages/reku/src/event/crosschecker/basechecker.ts @@ -1,5 +1,6 @@ import type { ethers } from 'ethers' -import { timeoutWithRetry } from '@ora-io/utils' +import type { ContractAddress } from '@ora-io/utils' +import { timeoutWithRetry, to } from '@ora-io/utils' import { ETH_BLOCK_COUNT_ONE_HOUR } from '../../constants' import type { Providers } from '../../types/w3' import { debug } from '../../debug' @@ -35,11 +36,15 @@ export class BaseCrossChecker { // define from, to // TODO: use blockNumber for performance - const block = await timeoutWithRetry(() => { + const [err, block] = await to(timeoutWithRetry(() => { if (!this.provider || !this.provider.provider) throw new Error('provider not ready') return this.provider.provider.getBlock('latest') - }, 15 * 1000, 3) + }, 15 * 1000, 3)) + if (err) { + console.warn('crosscheck failed to get latest block', err) + return + } if (!block) { console.warn('crosscheck failed to get latest block') return @@ -62,11 +67,15 @@ export class BaseCrossChecker { ccfOptions: CrossCheckFromParam, ) { // TODO: use blockNumber for performance - const block = await timeoutWithRetry(() => { + const [err, block] = await to(timeoutWithRetry(() => { if (!this.provider || !this.provider.provider) throw new Error('provider not ready') return this.provider.provider.getBlock('latest') - }, 15 * 1000, 3) + }, 15 * 1000, 3)) + if (err) { + console.warn('crosscheck failed to get latest block', err) + return + } if (!block) { console.warn('crosscheck failed to get latest block') return @@ -86,24 +95,6 @@ export class BaseCrossChecker { return block.number } - /** - * @deprecated - * @param logs - * @param txHashList - * @param logIndexList - * @returns - */ - async diff_old(logs: ethers.Log[], txHashList: string[], logIndexList: number[][]): Promise { - const missing = (log: ethers.Log) => { - const txIndex = txHashList?.indexOf(log.transactionHash) || -1 - // 1. tx missing, or 2. if passed in logIndexList, event idx missing - return txIndex === -1 || (logIndexList && !logIndexList[txIndex].includes(log.index)) - } - // filter missing logs - const missingLogs = logs.filter((log) => { return missing(log) })// - return missingLogs - } - async diff(logs: ethers.Log[], ignoreLogs: SimpleLog[]): Promise { const missing = (logToFind: ethers.Log) => { const logIndex = ignoreLogs.findIndex( @@ -122,18 +113,46 @@ export class BaseCrossChecker { // get period logs const { fromBlock, toBlock, address, topics } = options debug('start crosscheck from %d to %d', fromBlock, toBlock) - const params = { - fromBlock, - toBlock, - ...(address && { address }), - ...(topics && { topics }), - } + if (this.provider.provider) { - const logs = await timeoutWithRetry(() => { - if (!this.provider || !this.provider.provider) - throw new Error('provider not ready') - return this.provider.provider.getLogs(params) - }, 15 * 1000, 3) + const addresses: ContractAddress[][] = [] + if (Array.isArray(address)) { + if (options.addressGroupLimit) { + for (let i = 0; i < address.length; i += options.addressGroupLimit) + addresses.push(address.slice(i, i + options.addressGroupLimit)) + } + else { + addresses.push(address) + } + } + else { + addresses.push([address]) + } + + const requests = addresses.map((address) => { + const params = { + fromBlock, + toBlock, + ...(address && { address }), + ...(topics && { topics }), + } + + const fn = async () => { + if (!this.provider || !this.provider.provider) + throw new Error('provider not ready') + return this.provider?.provider?.getLogs(params) + } + if (options.retryOptions) + return timeoutWithRetry(fn, options.retryOptions.timeout || 15 * 1000, options.retryOptions.retries || 3) + + else + return fn() + }) + + const logs = (await Promise.all(requests)).reduce((acc, curr) => { + return acc.concat(curr) + }, []) + // get ignoreLogs keys const ignoreLogs = options.ignoreLogs diff --git a/packages/reku/src/event/crosschecker/interface.ts b/packages/reku/src/event/crosschecker/interface.ts index d2de489..b90c7f1 100644 --- a/packages/reku/src/event/crosschecker/interface.ts +++ b/packages/reku/src/event/crosschecker/interface.ts @@ -6,7 +6,7 @@ export type FnOnMissingLog = (log: ethers.Log) => Awaitable export interface SimpleLog { transactionHash: string; index?: number } export interface LogFilterParam { - address: ContractAddress + address: ContractAddress | ContractAddress[] topics: string[] fromBlock?: number toBlock?: number @@ -15,6 +15,11 @@ export interface LogFilterParam { export interface BaseCrossCheckParam extends LogFilterParam { onMissingLog: FnOnMissingLog ignoreLogs?: SimpleLog[] + addressGroupLimit?: number // how many addresses to get logs at most + retryOptions?: { + timeout?: Milliseconds + retries?: number + } } export interface CrossCheckFromParam extends BaseCrossCheckParam { diff --git a/packages/reku/src/provider/provider.ts b/packages/reku/src/provider/provider.ts index 6de951f..ed6e291 100644 --- a/packages/reku/src/provider/provider.ts +++ b/packages/reku/src/provider/provider.ts @@ -3,7 +3,7 @@ import type { InterfaceAbi } from 'ethers' import { Interface, WebSocketProvider, ethers } from 'ethers' import { WebSocket } from 'ws' import type { ErrorEvent } from 'ws' -import { type ContractAddress, isInstanceof, to } from '@ora-io/utils' +import { type ContractAddress, isInstanceof, timeout, to } from '@ora-io/utils' import { debug } from '../debug' import { RekuContractManager } from './contract' @@ -69,9 +69,9 @@ export class RekuProviderManager { return this._provider?.destroyed } - addContract(address: ContractAddress, contract: ethers.Contract): RekuContractManager | undefined + addContract(address: ContractAddress, contract: ethers.BaseContract): RekuContractManager | undefined addContract(address: ContractAddress, abi: Interface | InterfaceAbi): RekuContractManager | undefined - addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.Contract): RekuContractManager | undefined { + addContract(address: ContractAddress, abi: Interface | InterfaceAbi | ethers.BaseContract): RekuContractManager | undefined { if (this._provider) { if (abi instanceof Interface || Array.isArray(abi)) { if (!abi) @@ -82,7 +82,7 @@ export class RekuProviderManager { debug('add contract %s', address) return contract } - else if (abi instanceof ethers.Contract) { + else if (abi instanceof ethers.BaseContract) { const contract = new RekuContractManager(address, abi.interface, this._provider) this._contracts.set(address, contract) debug('add contract %s', address) @@ -217,18 +217,14 @@ export class RekuProviderManager { debug('heartbeat running...') const hasProvider = this._hasProvider() debug('heartbeat has provider: %s', hasProvider) - this._provider?.send('net_version', []) - .then((res) => { - debug('heartbeat response: %s', res) - }) - .catch((err) => { - this.reconnect() - this._event?.emit('error', err) - debug('heartbeat error: %s', err) - }) - .finally(() => { - debug('heartbeat finally') - }) + + const [err, res] = await to(timeout(async () => this._provider?.send('net_version', []), 10 * 1000)) + if (err) { + this.reconnect() + this._event?.emit('error', err) + debug('heartbeat timeout error: %s', err) + } + else { debug('heartbeat response: %s', res) } } else { debug('heartbeat destroyed') diff --git a/packages/reku/tests/autochecker.test.ts b/packages/reku/tests/autochecker.test.ts new file mode 100644 index 0000000..060527d --- /dev/null +++ b/packages/reku/tests/autochecker.test.ts @@ -0,0 +1,342 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { Logger, setLogger } from '@ora-io/utils' +import { AutoCrossChecker } from '../src/event/crosschecker/autochecker' +import type { AutoCrossCheckParam, SimpleLog } from '../src/event/crosschecker/interface' + +// Mock ethers provider +const mockProvider = { + provider: { + getBlock: vi.fn().mockResolvedValue({ number: 2000 }), + getLogs: vi.fn().mockResolvedValue([]), + getBlockNumber: vi.fn().mockResolvedValue(2000), + }, +} + +// Mock logs for testing +const mockLogs: any[] = [ + { + transactionHash: '0x1234567890abcdef', + index: 0, + blockNumber: 1000, + blockHash: '0xblockhash1', + address: '0xcontract1', + topics: ['0xtopic1'], + data: '0xdata1', + removed: false, + transactionIndex: 0, + }, + { + transactionHash: '0xabcdef1234567890', + index: 1, + blockNumber: 1000, + blockHash: '0xblockhash1', + address: '0xcontract1', + topics: ['0xtopic1'], + data: '0xdata2', + removed: false, + transactionIndex: 0, + }, +] + +setLogger(new Logger('debug', 'reku-autochecker-tests')) + +describe('AutoCrossChecker', () => { + let autoChecker: AutoCrossChecker + let onMissingLogSpy: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + autoChecker = new AutoCrossChecker(mockProvider as any) + onMissingLogSpy = vi.fn() + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + describe('constructor', () => { + it('should create instance with provider', () => { + expect(autoChecker).toBeInstanceOf(AutoCrossChecker) + expect(autoChecker.provider).toBe(mockProvider) + expect(autoChecker.cache).toBeUndefined() + expect(autoChecker.checkpointBlockNumber).toBeUndefined() + expect(autoChecker.playing).toBe(false) + }) + }) + + describe('validate', () => { + it('should validate valid options', () => { + const options: AutoCrossCheckParam = { + fromBlock: 1000, + toBlock: 2000, + batchBlocksCount: 10, + pollingInterval: 3000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).not.toThrow() + }) + + it('should throw error when batchBlocksCount is 0', () => { + const options: AutoCrossCheckParam = { + batchBlocksCount: 0, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).toThrow('options invalid: should batchBlocksCount >= 1') + }) + + it('should throw error when batchBlocksCount is negative', () => { + const options: AutoCrossCheckParam = { + batchBlocksCount: -1, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).toThrow('options invalid: should batchBlocksCount >= 1') + }) + + it('should throw error when toBlock is provided without fromBlock', () => { + const options: AutoCrossCheckParam = { + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).toThrow('options invalid: need fromBlock when toBlock presents') + }) + + it('should throw error when toBlock is less than fromBlock', () => { + const options: AutoCrossCheckParam = { + fromBlock: 2000, + toBlock: 1000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).toThrow('options invalid: should fromBlock <= toBlock') + }) + + it('should throw error when pollingInterval is too high in realtime mode', () => { + const options: AutoCrossCheckParam = { + batchBlocksCount: 10, + pollingInterval: 150000, // Higher than 10 * 12000 = 120000 + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + expect(() => autoChecker.validate(options)).toThrow('options invalid: should pollingInterval <= batchBlocksCount * blockInterval when no toBlock present') + }) + }) + + describe('setCheckpoint', () => { + it('should set checkpoint and call cache.setCheckpoint', async () => { + const checkpoint = 1500 + + // Mock the cache property + const mockCache = { + setCheckpoint: vi.fn(), + } + Object.defineProperty(autoChecker, 'cache', { + value: mockCache, + writable: true, + }) + + await autoChecker.setCheckpoint(checkpoint) + + expect(autoChecker.checkpointBlockNumber).toBe(checkpoint) + expect(mockCache.setCheckpoint).toHaveBeenCalledWith(checkpoint) + }) + }) + + describe('start', () => { + it('should start with valid options and create cache manager', async () => { + mockProvider.provider.getBlockNumber.mockResolvedValue(2000) + + const options: AutoCrossCheckParam = { + fromBlock: 1000, + toBlock: 2000, + batchBlocksCount: 10, + pollingInterval: 3000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + // Mock the polling function to return immediately + const mockPolling = vi.fn().mockResolvedValue(true) + vi.doMock('@ora-io/utils', () => ({ + ...vi.importActual('@ora-io/utils'), + polling: mockPolling, + })) + + await autoChecker.start(options) + + expect(autoChecker.cache).toBeDefined() + expect(autoChecker.checkpointBlockNumber).toBe(1000) + expect(autoChecker.playing).toBe(true) + }) + + it('should handle provider not ready error', async () => { + mockProvider.provider.getBlockNumber.mockRejectedValue(new Error('provider not ready')) + + const options: AutoCrossCheckParam = { + fromBlock: 1000, + toBlock: 2000, + batchBlocksCount: 10, + pollingInterval: 3000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await expect(autoChecker.start(options)).rejects.toThrow('provider not ready') + }) + + it('should use default values when options are not provided', async () => { + mockProvider.provider.getBlockNumber.mockResolvedValue(2000) + + const options: AutoCrossCheckParam = { + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + // Mock the polling function to return immediately + const mockPolling = vi.fn().mockResolvedValue(true) + vi.doMock('@ora-io/utils', () => ({ + ...vi.importActual('@ora-io/utils'), + polling: mockPolling, + })) + + await autoChecker.start(options) + + expect(autoChecker.checkpointBlockNumber).toBe(2000) // Should use latest block number + }) + + it('should handle autoFollowLatestBlock option', async () => { + mockProvider.provider.getBlockNumber.mockResolvedValue(2000) + + const options: AutoCrossCheckParam = { + fromBlock: 1000, + batchBlocksCount: 10, + pollingInterval: 3000, + autoFollowLatestBlock: true, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + // Mock the polling function to return immediately + const mockPolling = vi.fn().mockResolvedValue(true) + vi.doMock('@ora-io/utils', () => ({ + ...vi.importActual('@ora-io/utils'), + polling: mockPolling, + })) + + await autoChecker.start(options) + + expect(autoChecker.playing).toBe(true) + }) + }) + + describe('stop', () => { + it('should set playing to false', () => { + autoChecker.playing = true + + autoChecker.stop() + + expect(autoChecker.playing).toBe(false) + }) + }) + + describe('diff', () => { + it('should call parent diff method and filter based on cache', async () => { + // Mock the cache property + const mockCache = { + encodeLogKey: vi.fn().mockReturnValue('log-key-1'), + has: vi.fn().mockResolvedValue(false), + } + Object.defineProperty(autoChecker, 'cache', { + value: mockCache, + writable: true, + }) + + const ignoreLogs: SimpleLog[] = [ + { transactionHash: '0x1234567890abcdef', index: 0 }, + ] + + const result = await autoChecker.diff(mockLogs, ignoreLogs) + + expect(mockCache.encodeLogKey).toHaveBeenCalledWith(mockLogs[1]) + expect(mockCache.has).toHaveBeenCalledWith('log-key-1') + expect(result).toHaveLength(1) + }) + }) + + describe('integration tests', () => { + it('should handle complete start and stop cycle', async () => { + mockProvider.provider.getBlockNumber.mockResolvedValue(2000) + + const options: AutoCrossCheckParam = { + fromBlock: 1000, + toBlock: 2000, + batchBlocksCount: 10, + pollingInterval: 3000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + // Mock the polling function to return immediately + const mockPolling = vi.fn().mockResolvedValue(true) + vi.doMock('@ora-io/utils', () => ({ + ...vi.importActual('@ora-io/utils'), + polling: mockPolling, + })) + + await autoChecker.start(options) + expect(autoChecker.playing).toBe(true) + + autoChecker.stop() + expect(autoChecker.playing).toBe(false) + }) + + it('should handle onMissingLog callback with cache update', async () => { + mockProvider.provider.getBlockNumber.mockResolvedValue(2000) + + const options: AutoCrossCheckParam = { + fromBlock: 1000, + toBlock: 2000, + batchBlocksCount: 10, + pollingInterval: 3000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + // Mock the polling function to return immediately + const mockPolling = vi.fn().mockResolvedValue(true) + vi.doMock('@ora-io/utils', () => ({ + ...vi.importActual('@ora-io/utils'), + polling: mockPolling, + })) + + await autoChecker.start(options) + + // Simulate a missing log callback + const log = mockLogs[0] + await options.onMissingLog(log) + + expect(onMissingLogSpy).toHaveBeenCalledWith(log) + }) + }) +}) diff --git a/packages/reku/tests/basechecker.test.ts b/packages/reku/tests/basechecker.test.ts new file mode 100644 index 0000000..78bcaef --- /dev/null +++ b/packages/reku/tests/basechecker.test.ts @@ -0,0 +1,442 @@ +import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' +import { Logger, setLogger } from '@ora-io/utils' +import { BaseCrossChecker } from '../src/event/crosschecker/basechecker' +import type { CrossCheckFromParam, CrossCheckRangeParam, CrossCheckRetroParam, SimpleLog } from '../src/event/crosschecker/interface' + +// Mock ethers provider +const mockProvider = { + provider: { + getBlock: vi.fn(), + getLogs: vi.fn(), + provider: { + getBlock: vi.fn(), + getLogs: vi.fn(), + }, + }, +} + +// Mock logs for testing +const mockLogs: any[] = [ + { + transactionHash: '0x1234567890abcdef', + index: 0, + blockNumber: 1000, + blockHash: '0xblockhash1', + address: '0xcontract1', + topics: ['0xtopic1'], + data: '0xdata1', + removed: false, + transactionIndex: 0, + }, + { + transactionHash: '0xabcdef1234567890', + index: 1, + blockNumber: 1000, + blockHash: '0xblockhash1', + address: '0xcontract1', + topics: ['0xtopic1'], + data: '0xdata2', + removed: false, + transactionIndex: 0, + }, + { + transactionHash: '0x9876543210fedcba', + index: 2, + blockNumber: 1001, + blockHash: '0xblockhash2', + address: '0xcontract1', + topics: ['0xtopic1'], + data: '0xdata3', + removed: false, + transactionIndex: 0, + }, +] + +// Mock block for testing +const mockBlock = { + number: 2000, + hash: '0xlatestblockhash', + timestamp: Date.now(), +} + +setLogger(new Logger('debug', 'reku-basechecker-tests')) + +describe('BaseCrossChecker', () => { + let baseChecker: BaseCrossChecker + let onMissingLogSpy: ReturnType + + beforeEach(() => { + vi.clearAllMocks() + baseChecker = new BaseCrossChecker(mockProvider as any) + onMissingLogSpy = vi.fn() + }) + + afterEach(() => { + vi.restoreAllMocks() + }) + + describe('constructor', () => { + it('should create instance with provider', () => { + expect(baseChecker).toBeInstanceOf(BaseCrossChecker) + expect(baseChecker.provider).toBe(mockProvider) + }) + }) + + describe('crossCheckRange', () => { + it('should call _crossCheck with correct parameters', async () => { + const spy = vi.spyOn(baseChecker as any, '_crossCheck').mockResolvedValue(undefined) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckRange(options) + + expect(spy).toHaveBeenCalledWith(options) + }) + }) + + describe('crossCheckRetro', () => { + it('should get latest block and call _crossCheck with correct range', async () => { + const spy = vi.spyOn(baseChecker as any, '_crossCheck').mockResolvedValue(undefined) + mockProvider.provider.getBlock.mockResolvedValue(mockBlock) + + const options: CrossCheckRetroParam = { + retroBlockCount: 100, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckRetro(options) + + expect(mockProvider.provider.getBlock).toHaveBeenCalledWith('latest') + expect(spy).toHaveBeenCalledWith({ + ...options, + fromBlock: mockBlock.number - options.retroBlockCount, + toBlock: mockBlock.number, + }) + }) + + it('should warn when retroBlockCount is too low', async () => { + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + mockProvider.provider.getBlock.mockResolvedValue(mockBlock) + vi.spyOn(baseChecker as any, '_crossCheck').mockResolvedValue(undefined) + + const options: CrossCheckRetroParam = { + retroBlockCount: 100, // Less than ETH_BLOCK_COUNT_ONE_HOUR + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckRetro(options) + + expect(consoleSpy).toHaveBeenCalledWith( + 'crosscheck retroBlockCount too low, recommend block range >= 1 hour', + ) + consoleSpy.mockRestore() + }) + + it('should handle provider not ready error', async () => { + mockProvider.provider.getBlock.mockRejectedValue(new Error('provider not ready')) + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + const options: CrossCheckRetroParam = { + retroBlockCount: 100, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckRetro(options) + + expect(consoleSpy).toHaveBeenCalledWith( + 'crosscheck failed to get latest block', + new Error('provider not ready'), + ) + consoleSpy.mockRestore() + }) + + it('should handle null block response', async () => { + // Reset mock before setting up the null response + vi.clearAllMocks() + mockProvider.provider.getBlock.mockResolvedValue(null) + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + const options: CrossCheckRetroParam = { + retroBlockCount: 100, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckRetro(options) + + expect(consoleSpy).toHaveBeenCalledWith('crosscheck failed to get latest block') + consoleSpy.mockRestore() + }) + }) + + describe('crossCheckFrom', () => { + it('should get latest block and call _crossCheck with correct range', async () => { + const spy = vi.spyOn(baseChecker as any, '_crossCheck').mockResolvedValue(undefined) + mockProvider.provider.getBlock.mockResolvedValue(mockBlock) + + const options: CrossCheckFromParam = { + fromBlock: 1000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + const result = await baseChecker.crossCheckFrom(options) + + expect(mockProvider.provider.getBlock).toHaveBeenCalledWith('latest') + expect(spy).toHaveBeenCalledWith({ + ...options, + fromBlock: options.fromBlock, + toBlock: mockBlock.number, + }) + expect(result).toBe(mockBlock.number) + }) + + it('should warn when block range is too small', async () => { + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + mockProvider.provider.getBlock.mockResolvedValue(mockBlock) + vi.spyOn(baseChecker as any, '_crossCheck').mockResolvedValue(undefined) + + const options: CrossCheckFromParam = { + fromBlock: 1999, // Very close to latest block + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await baseChecker.crossCheckFrom(options) + + expect(consoleSpy).toHaveBeenCalledWith( + 'crosscheck retroBlockCount too low, recommend crosscheck interval >= 1 hour', + ) + consoleSpy.mockRestore() + }) + + it('should handle provider not ready error', async () => { + mockProvider.provider.getBlock.mockRejectedValue(new Error('provider not ready')) + const consoleSpy = vi.spyOn(console, 'warn').mockImplementation(() => {}) + + const options: CrossCheckFromParam = { + fromBlock: 1000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + const result = await baseChecker.crossCheckFrom(options) + + expect(consoleSpy).toHaveBeenCalledWith( + 'crosscheck failed to get latest block', + new Error('provider not ready'), + ) + expect(result).toBeUndefined() + consoleSpy.mockRestore() + }) + }) + + describe('diff', () => { + it('should return logs that are not in ignoreLogs', async () => { + const ignoreLogs: SimpleLog[] = [ + { transactionHash: '0x1234567890abcdef', index: 0 }, + { transactionHash: '0xabcdef1234567890', index: 1 }, + ] + + const result = await baseChecker.diff(mockLogs, ignoreLogs) + + expect(result).toHaveLength(1) + expect(result[0].transactionHash).toBe('0x9876543210fedcba') + }) + + it('should return all logs when ignoreLogs is empty', async () => { + const result = await baseChecker.diff(mockLogs, []) + + expect(result).toHaveLength(3) + expect(result).toEqual(mockLogs) + }) + + it('should handle logs without index in ignoreLogs', async () => { + const ignoreLogs: SimpleLog[] = [ + { transactionHash: '0x1234567890abcdef' }, // No index specified + ] + + const result = await baseChecker.diff(mockLogs, ignoreLogs) + + expect(result).toHaveLength(2) + expect(result[0].transactionHash).toBe('0xabcdef1234567890') + expect(result[1].transactionHash).toBe('0x9876543210fedcba') + }) + + it('should return empty array when all logs are ignored', async () => { + const ignoreLogs: SimpleLog[] = [ + { transactionHash: '0x1234567890abcdef', index: 0 }, + { transactionHash: '0xabcdef1234567890', index: 1 }, + { transactionHash: '0x9876543210fedcba', index: 2 }, + ] + + const result = await baseChecker.diff(mockLogs, ignoreLogs) + + expect(result).toHaveLength(0) + }) + }) + + describe('_crossCheck', () => { + it('should get logs and call onMissingLog for missing logs', async () => { + mockProvider.provider.getLogs.mockResolvedValue(mockLogs) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + ignoreLogs: [ + { transactionHash: '0x1234567890abcdef', index: 0 }, + ], + } + + await (baseChecker as any)._crossCheck(options) + + expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({ + fromBlock: 1000, + toBlock: 2000, + address: ['0xcontract1'], + topics: ['0xtopic1'], + }) + expect(onMissingLogSpy).toHaveBeenCalledTimes(2) + }) + + it('should handle provider not ready error', async () => { + mockProvider.provider.getLogs.mockRejectedValue(new Error('provider not ready')) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: ['0xcontract1'], + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await expect((baseChecker as any)._crossCheck(options)).rejects.toThrow('provider not ready') + }) + + it('should handle missing address and topics', async () => { + mockProvider.provider.getLogs.mockResolvedValue(mockLogs) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await (baseChecker as any)._crossCheck(options) + + expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({ + fromBlock: 1000, + toBlock: 2000, + address: ['0xcontract1'], + topics: ['0xtopic1'], + }) + }) + + it('should call onMissingLog for all logs when no ignoreLogs provided', async () => { + mockProvider.provider.getLogs.mockResolvedValue(mockLogs) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + } + + await (baseChecker as any)._crossCheck(options) + + expect(onMissingLogSpy).toHaveBeenCalledTimes(3) + }) + + it('should handle addressGroupLimit', async () => { + mockProvider.provider.getLogs.mockResolvedValue(mockLogs) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: ['0xcontract1', '0xcontract2'], + addressGroupLimit: 1, + onMissingLog: onMissingLogSpy, + topics: [], + } + + await (baseChecker as any)._crossCheck(options) + + expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(2) + }) + + it('should handle addressGroupLimit with single address', async () => { + mockProvider.provider.getLogs.mockResolvedValue(mockLogs) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + addressGroupLimit: 1, + onMissingLog: onMissingLogSpy, + topics: [], + } + + await (baseChecker as any)._crossCheck(options) + + expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(1) + + expect(mockProvider.provider.getLogs).toHaveBeenCalledWith({ + fromBlock: 1000, + toBlock: 2000, + address: ['0xcontract1'], + topics: [], + }) + }) + + it.only('should handle retryOptions', async () => { + mockProvider.provider.getLogs.mockRejectedValue(Error('provider not ready')) + + const options: CrossCheckRangeParam = { + fromBlock: 1000, + toBlock: 2000, + address: '0xcontract1', + topics: ['0xtopic1'], + onMissingLog: onMissingLogSpy, + retryOptions: { + timeout: 1000, + retries: 1, + }, + } + + const defaultCallTimes = 1 + + try { + await (baseChecker as any)._crossCheck(options) + } + catch (error) { + expect(error).toBeInstanceOf(Error) + expect((error as Error).message).toBe('provider not ready') + } + + expect(mockProvider.provider.getLogs).toHaveBeenCalledTimes(defaultCallTimes + options.retryOptions!.retries!) + expect(onMissingLogSpy).toHaveBeenCalledTimes(0) + }) + }) +}) diff --git a/packages/reku/tests/crosscheck.test.ts b/packages/reku/tests/crosscheck.test.ts index bbf3f5c..e334efc 100644 --- a/packages/reku/tests/crosscheck.test.ts +++ b/packages/reku/tests/crosscheck.test.ts @@ -61,7 +61,7 @@ export async function crossCheckerTest() { }) } -describe('CrossChecker Tests', () => { +describe.skip('CrossChecker Tests', () => { it('should run crossCheckerTest without errors', async () => { const consoleSpy = vi.spyOn(console, 'log') diff --git a/packages/utils/package.json b/packages/utils/package.json index b65d78f..13ee0b3 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -1,7 +1,7 @@ { "name": "@ora-io/utils", "type": "module", - "version": "0.3.3", + "version": "0.4.0-beta.1", "packageManager": "pnpm@8.10.5", "description": "", "author": "Norman (nom4dv3), MuRong",