Skip to content
Merged

Dev #35

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "ora-stack",
"version": "0.3.5",
"version": "0.4.0-beta.4",
"private": true,
"packageManager": "pnpm@9.7.0",
"description": "",
Expand Down Expand Up @@ -47,7 +47,8 @@
"publish": "nr prepublishOnly && esno scripts/publish.ts",
"release": "esno scripts/release.ts",
"start": "esno src/index.ts",
"test": "DEBUG=DEBUG:ora-stack:* vitest",
"test": "vitest",
"test:debug": "DEBUG=DEBUG:ora-stack:* vitest",
"typecheck": "tsc --noEmit"
},
"devDependencies": {
Expand Down
52 changes: 33 additions & 19 deletions packages/orap/examples/declarativeDemo/app.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import type { ContractEventPayload } from 'ethers'
import { Logger, objectKeys, randomStr, redisStore } from '@ora-io/utils'
import type { ListenOptions, ToKeyFn } from '../../src'
import { CheckTransactionStatus, Orap, StoreManager, getMiddlewareContext } from '../../src'
import { Orap, StoreManager, getMiddlewareContext } from '../../src'
import ABI from './erc20.abi.json'

const MAINNET_USDT_ADDR = '0xdAC17F958D2ee523a2206206994597C13D831ec7'
Expand All @@ -24,7 +24,12 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {

const toKey: ToKeyFn = (from: string, _to: string, _amount: number) => `${from}_${randomStr(4)}`

orap.event(eventSignalParam.address, eventSignalParam.abi, eventSignalParam.eventName)
const event = orap.event({
address: eventSignalParam.address,
abi: eventSignalParam.abi,
eventName: eventSignalParam.eventName,
enableSubscribe: false,
})
.crosscheck({
store,
storeKeyPrefix: 'ora-stack:orap:demo:cc:',
Expand All @@ -37,31 +42,40 @@ export function startDemo(options: ListenOptions, storeConfig?: any) {
// event hook, not necessary
.handle(newEventSignalHook)

// add a task
.task()
// add a task
event.task()
.cache(sm)
.key(toKey)
.prefix('ora-stack:orap:demo:TransferTask:', 'ora-stack:orap:demo:Done-TransferTask:')
.ttl({ taskTtl: 120000, doneTtl: 60000 })
.use(CheckTransactionStatus(options.wsProvider))
// .use(CheckTransactionStatus(options.wsProvider))
.handle(handleTask)
// add another task
.another()
.task()
.prefix('ora-stack:orap:demo:AnotherTask:', 'ora-stack:orap:demo:Done-AnotherTask:')
.cache(sm) // rm to use mem by default
.ttl({ taskTtl: 20000, doneTtl: 20000 })
.handle(handleTask_2)
// .another()
// .task()
// .prefix('ora-stack:orap:demo:AnotherTask:', 'ora-stack:orap:demo:Done-AnotherTask:')
// .cache(sm) // rm to use mem by default
// .ttl({ taskTtl: 20000, doneTtl: 20000 })
// .handle(handleTask_2)

// start signal listener
orap.listen(
options,
() => { logger.log('listening on provider.network') },
)

setTimeout(() => {
logger.log('[+] add another address')
event.addresses([
eventSignalParam.address,
'0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48',
])
event.restart()
}, 10 * 1000)
}

async function handleTask(from: string, to: string, amount: number, _event: ContractEventPayload) {
logger.log('[+] handleTask: from =', from, 'to =', to, 'amount =', amount)
async function handleTask(from: string, to: string, amount: number, event: ContractEventPayload) {
logger.log('[+] handleTask: from =', from, 'to =', to, 'amount =', amount, 'address =', event.log.address)
const args = objectKeys(arguments).map(k => arguments[k])

const { next } = getMiddlewareContext(...args)
Expand All @@ -74,10 +88,10 @@ async function newEventSignalHook(from: string, to: string, amount: number, even
return true // true to continue handle tasks, false to hijack the process.
}

async function handleTask_2(from: string, to: string, amount: number) {
logger.log('[+] handleTask_2: from =', from, 'to =', to, 'amount =', amount)
const args = objectKeys(arguments).map(k => arguments[k])
// async function handleTask_2(from: string, to: string, amount: number) {
// logger.log('[+] handleTask_2: from =', from, 'to =', to, 'amount =', amount)
// const args = objectKeys(arguments).map(k => arguments[k])

const { next } = getMiddlewareContext(...args)
await next()
}
// const { next } = getMiddlewareContext(...args)
// await next()
// }
2 changes: 1 addition & 1 deletion packages/orap/package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@ora-io/orap",
"type": "module",
"version": "0.3.5",
"version": "0.4.0-beta.4",
"packageManager": "pnpm@8.10.5",
"description": "",
"author": "Norman (nom4dv3), MuRong",
Expand Down
4 changes: 4 additions & 0 deletions packages/orap/src/beat/event.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,4 +20,8 @@ export class EventBeat extends EventSignal {
drop() {
this.listen(this.subscribeProvider, this.crosscheckProvider)
}

stop() {
super.stop()
}
}
45 changes: 43 additions & 2 deletions packages/orap/src/flow/event.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import { ethers } from 'ethers'
import { beforeEach, describe, expect, it } from 'vitest'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { EventVerse } from '../verse/event'
import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config'
import { OrapFlow } from './orap'
import { EventFlow } from './event'

Expand All @@ -9,7 +11,7 @@ describe('EventFlow', () => {

beforeEach(() => {
orapFlow = new OrapFlow()
eventFlow = new EventFlow(orapFlow)
eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
})

it('should create a task flow', () => {
Expand Down Expand Up @@ -48,4 +50,43 @@ describe('EventFlow', () => {
const parentFlow = eventFlow.another()
expect(parentFlow).toBe(orapFlow)
})

it('should stop the EventVerse', () => {
const stopFn = vi.fn()
vi.spyOn(EventVerse.prototype, 'stop').mockImplementation(stopFn)
eventFlow.stop()
expect(stopFn).toHaveBeenCalled()
})

describe('address', () => {
it('should set the address', () => {
const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
eventFlow.address('0x1234567890123456789012345678901234567890')
expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890'])
})

it.only('should set the array address', () => {
const eventFlow = new EventFlow(orapFlow, { address: [USDT_ADDRESS], abi: ERC20_ABI, eventName: 'Transfer' })
eventFlow.address(1, '0x1234567890123456789012345678901234567890')
expect(eventFlow.params.address).toContainEqual('0x1234567890123456789012345678901234567890')
})

it('should set the address with number', () => {
const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
eventFlow.address(1, '0x1234567890123456789012345678901234567890')
expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890'])
})

it('should set the addresses with array', () => {
const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
eventFlow.addresses(['0x1234567890123456789012345678901234567890'])
expect(eventFlow.params.address).toEqual(['0x1234567890123456789012345678901234567890'])
})

it('should set the addresses with array and number', () => {
const eventFlow = new EventFlow(orapFlow, { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
eventFlow.addresses([USDT_ADDRESS, '0x1234567890123456789012345678901234567890'])
expect(eventFlow.params.address).toEqual([USDT_ADDRESS, '0x1234567890123456789012345678901234567890'])
})
})
})
64 changes: 61 additions & 3 deletions packages/orap/src/flow/event.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { AutoCrossCheckParam, Providers } from '@ora-io/reku'
import type { ContractAddress } from '@ora-io/utils/src'
import type { Context } from '../task'
import type { TaskFlowParams } from '../flow/task'
import { TaskFlow } from '../flow/task'
Expand All @@ -18,11 +19,25 @@ export class EventFlow implements Flow {
private _subscribeProvider?: Providers
private _crosscheckProvider?: Providers

private _verse: EventVerse = new EventVerse(this)
private _addresses: ContractAddress[] = []
private _params: EventSignalRegisterParams

get verse() {
return this._verse
}

get params() {
return this._params
}

constructor(
private parentFlow?: OrapFlow,
public params?: EventSignalRegisterParams,
private parentFlow: OrapFlow,
params: EventSignalRegisterParams,
handleFn?: HandleFn, // return: succ & continue if true, stop if false
) {
this._params = params
this._addresses = Array.isArray(this.params.address) ? this.params.address : [this.params.address]
// Default handleFn
this.handleFn = handleFn ?? (async (..._args: Array<any>) => {
return true
Expand Down Expand Up @@ -52,6 +67,7 @@ export class EventFlow implements Flow {
tf = new TaskFlow(this, sm)
}
this._taskFlows.push(tf)
this._verse.setTaskVerses(this._taskFlows.map(flow => flow.verse))
return tf
}

Expand Down Expand Up @@ -89,6 +105,48 @@ export class EventFlow implements Flow {
}

another(): OrapFlow {
return this.parentFlow!
return this.parentFlow
}

stop(): this {
this._verse.stop()
return this
}

restart(): this {
if (this.parentFlow.wsProvider)
this._subscribeProvider = this.parentFlow.wsProvider
else
throw new Error('wsProvider is not set, cannot restart')
if (this.parentFlow.httpProvider)
this._crosscheckProvider = this.parentFlow.httpProvider
else
throw new Error('httpProvider is not set, cannot restart')
this._verse.restart()
return this
}

address(_index: number, _address: ContractAddress): this
address(address: ContractAddress): this
address(_first: ContractAddress | number, _second?: ContractAddress): this {
if (typeof _first === 'number') {
if (!_second)
throw new Error('address is required')
if (Array.isArray(this._params.address))
Reflect.set(this._addresses, _first, _second)

else this._addresses = [_second!]
}
else
if (Array.isArray(this._params.address)) { this._addresses = [...new Set([...this._params.address, _first])] }
else { this._addresses = [_first] }
this._params.address = this._addresses
return this
}

addresses(addresses: ContractAddress[]): this {
this._addresses = addresses
this._params.address = this._addresses
return this
}
}
8 changes: 8 additions & 0 deletions packages/orap/src/flow/orap.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ethers } from 'ethers'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { SEPOLIA_HTTP, SEPOLIA_WSS } from '../../tests/config'
import { OrapVerse } from '../verse/orap'
import { OrapFlow } from './orap'

describe('OrapFlow', () => {
Expand Down Expand Up @@ -33,4 +34,11 @@ describe('OrapFlow', () => {
expect(eventFlow).toBeDefined()
expect(orapFlow.eventFlows).toContain(eventFlow)
})

it('should stop the OrapVerse', () => {
const stopFn = vi.fn()
vi.spyOn(OrapVerse.prototype, 'stop').mockImplementation(stopFn)
orapFlow.stop()
expect(stopFn).toHaveBeenCalled()
})
})
43 changes: 36 additions & 7 deletions packages/orap/src/flow/orap.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,31 @@ export class OrapFlow implements Flow {
} = { event: [] }

onListenFn: Fn = () => { }
_wsProvider?: Providers
_httpProvider?: Providers

get eventFlows() {
return this.subflows.event
}

get wsProvider() {
return this._wsProvider
}

get httpProvider() {
return this._httpProvider
}

private _verse: OrapVerse = new OrapVerse(this)

get verse() {
return this._verse
}

event(params: EventSignalRegisterParams, handler?: HandleFn): EventFlow
event(address: ContractAddress, abi: Interface | InterfaceAbi | HandleFn, eventName: string, handler?: HandleFn): EventFlow
event(params: EventSignalRegisterParams | ContractAddress, abi?: Interface | InterfaceAbi | HandleFn, eventName?: string, handler?: HandleFn): EventFlow {
if (typeof params === 'string' || isAddressable(params))
event(address: ContractAddress | ContractAddress[], abi: Interface | InterfaceAbi | HandleFn, eventName: string, handler?: HandleFn): EventFlow
event(params: EventSignalRegisterParams | ContractAddress | ContractAddress[], abi?: Interface | InterfaceAbi | HandleFn, eventName?: string, handler?: HandleFn): EventFlow {
if (typeof params === 'string' || isAddressable(params) || Array.isArray(params))
params = { address: params, abi: abi as Interface | InterfaceAbi, eventName: eventName as string }
else handler = abi as HandleFn

Expand All @@ -45,24 +61,37 @@ export class OrapFlow implements Flow {
* @param options
* @param onListenFn
*/
listen(options: ListenOptions, onListenFn?: Fn) {
listen(options: ListenOptions, onListenFn?: Fn): this {
for (const eventFlow of this.subflows.event) {
eventFlow.setSubscribeProvider(options.wsProvider)
if (options.httpProvider)
eventFlow.setCrosscheckProvider(options.httpProvider)
}
this._wsProvider = options.wsProvider
this._httpProvider = options.httpProvider

if (onListenFn)
this.onListenFn = onListenFn
const eventVerses = this.subflows.event.map(flow => flow.verse)
this._verse.setEventVerses(eventVerses)

const orapVerse = this.assemble()
orapVerse.play()
this._verse.play()
this.onListenFn()
return this
}

stop(): this {
this._verse.stop()
return this
}

restart(): this {
this._verse.restart()
return this
}

assemble(): OrapVerse {
const eventVerses = this.subflows.event.map(flow => flow.assemble())
return new OrapVerse(this).setEventVerses(eventVerses)
// this.routes.event.push(es)
}
}
5 changes: 3 additions & 2 deletions packages/orap/src/flow/task.test.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
import { beforeEach, describe, expect, it, vi } from 'vitest'
import { memoryStore } from '@ora-io/utils'
import { StoreManager } from '../store'
import { EventFlow, TaskFlow } from '.'
import { ERC20_ABI, USDT_ADDRESS } from '../../tests/config'
import { EventFlow, OrapFlow, TaskFlow } from '.'

describe('TaskFlow', () => {
let parentFlow: any
let taskFlow: TaskFlow

beforeEach(() => {
parentFlow = new EventFlow()
parentFlow = new EventFlow(new OrapFlow(), { address: USDT_ADDRESS, abi: ERC20_ABI, eventName: 'Transfer' })
taskFlow = new TaskFlow(parentFlow)
})

Expand Down
Loading
Loading