diff --git a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts index decc02e2e0..bb0ad4426f 100644 --- a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts +++ b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts @@ -32,16 +32,16 @@ export class LiquidityManagementPipelineService { //*** JOBS ***// - @DfxCron(CronExpression.EVERY_MINUTE, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 }) - async processPipelines() { - await this.checkRunningOrders(); - await this.startNewPipelines(); - - let hasWaitingOrders = true; - for (let i = 0; i < 5 && hasWaitingOrders; i++) { - await this.checkRunningPipelines(); - - hasWaitingOrders = await this.startNewOrders(); + @DfxCron(CronExpression.EVERY_10_SECONDS, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 }) + async processPipelines(): Promise { + let hasChanges = true; + while (hasChanges) { + const newPipelinesStarted = await this.startNewPipelines(); + const ordersChanged = await this.checkRunningOrders(); + const pipelinesChanged = await this.checkRunningPipelines(); + const newOrdersStarted = await this.startNewOrders(); + + hasChanges = newPipelinesStarted || ordersChanged || pipelinesChanged || newOrdersStarted; } } @@ -82,7 +82,7 @@ export class LiquidityManagementPipelineService { //*** HELPER METHODS ***// - async startNewPipelines(): Promise { + async startNewPipelines(): Promise { const newPipelines = await this.pipelineRepo.findBy({ status: LiquidityManagementPipelineStatus.CREATED }); this.logNewPipelines(newPipelines); @@ -96,13 +96,16 @@ export class LiquidityManagementPipelineService { continue; } } + + return newPipelines.length > 0; } - private async checkRunningPipelines(): Promise { + private async checkRunningPipelines(): Promise { const runningPipelines = await this.pipelineRepo.find({ where: { status: LiquidityManagementPipelineStatus.IN_PROGRESS }, relations: { currentAction: { onSuccess: true, onFail: true } }, }); + let anyChanged = false; for (const pipeline of runningPipelines) { try { @@ -120,6 +123,7 @@ export class LiquidityManagementPipelineService { ) { pipeline.continue(lastOrder.status); await this.pipelineRepo.save(pipeline); + anyChanged = true; if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) { await this.handlePipelineCompletion(pipeline); @@ -146,11 +150,14 @@ export class LiquidityManagementPipelineService { ); await this.placeLiquidityOrder(pipeline, lastOrder); + anyChanged = true; } catch (e) { this.logger.error(`Error in checking running liquidity pipeline ${pipeline.id}:`, e); continue; } } + + return anyChanged; } private async placeLiquidityOrder( @@ -164,8 +171,6 @@ export class LiquidityManagementPipelineService { } private async startNewOrders(): Promise { - let hasFinishedOrders = false; - const newOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.CREATED }); for (const order of newOrders) { @@ -175,23 +180,19 @@ export class LiquidityManagementPipelineService { if (e instanceof OrderNotNecessaryException) { order.complete(); await this.orderRepo.save(order); - } - if (e instanceof OrderNotProcessableException) { + } else if (e instanceof OrderNotProcessableException) { order.notProcessable(e); await this.orderRepo.save(order); - } - if (e instanceof OrderFailedException) { + } else if (e instanceof OrderFailedException) { order.fail(e); await this.orderRepo.save(order); } - hasFinishedOrders = true; - this.logger.info(`Error in starting new liquidity order ${order.id}:`, e); } } - return hasFinishedOrders; + return newOrders.length > 0; } private async executeOrder(order: LiquidityManagementOrder): Promise { @@ -203,30 +204,36 @@ export class LiquidityManagementPipelineService { await this.orderRepo.save(order); } - private async checkRunningOrders(): Promise { + private async checkRunningOrders(): Promise { const runningOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.IN_PROGRESS }); + let anyChanged = false; for (const order of runningOrders) { try { - await this.checkOrder(order); + const changed = await this.checkOrder(order); + anyChanged = anyChanged || changed; } catch (e) { if (e instanceof OrderNotProcessableException) { order.notProcessable(e); await this.orderRepo.save(order); + anyChanged = true; continue; } if (e instanceof OrderFailedException) { order.fail(e); await this.orderRepo.save(order); + anyChanged = true; continue; } this.logger.error(`Error in checking running liquidity order ${order.id}:`, e); } } + + return anyChanged; } - private async checkOrder(order: LiquidityManagementOrder): Promise { + private async checkOrder(order: LiquidityManagementOrder): Promise { const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); const isComplete = await actionIntegration.checkCompletion(order); @@ -235,7 +242,10 @@ export class LiquidityManagementPipelineService { await this.orderRepo.save(order); this.logger.verbose(`Liquidity management order ${order.id} complete`); + return true; } + + return false; } private async handlePipelineCompletion(pipeline: LiquidityManagementPipeline): Promise { diff --git a/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts b/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts index 372009d9c4..09639cd113 100644 --- a/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts +++ b/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts @@ -3,7 +3,6 @@ import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.e import { Asset, AssetType } from 'src/shared/models/asset/asset.entity'; import { AssetService } from 'src/shared/models/asset/asset.service'; import { DfxLogger } from 'src/shared/services/dfx-logger'; -import { Util } from 'src/shared/utils/util'; import { NotificationService } from 'src/subdomains/supporting/notification/services/notification.service'; import { PayoutOrder, PayoutOrderContext } from '../../../entities/payout-order.entity'; import { FeeResult } from '../../../interfaces'; @@ -43,40 +42,45 @@ export class MoneroStrategy extends BitcoinBasedStrategy { } protected async doPayoutForContext(context: PayoutOrderContext, orders: PayoutOrder[]): Promise { - if (!(await this.hasEnoughUnlockedBalance(orders))) { - this.logger.info( - `Insufficient unlocked balance for paying out XMR orders(s). Order ID(s): ${orders.map((o) => o.id)}`, - ); - return; - } + const pendingOrders = [...orders]; + let paidOutOrders = 0; - const payoutGroups = this.createPayoutGroups(orders, 15); + while (pendingOrders.length > 0) { + const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance(); + if (unlockedBalance <= 0) break; - for (const group of payoutGroups) { - try { - if (group.length === 0) { - continue; - } - - this.logger.verbose(`Paying out ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`); + const group = this.splicePayoutGroup(pendingOrders, unlockedBalance, 15); + if (group.length === 0) break; + try { await this.sendXMR(context, group); + paidOutOrders += group.length; } catch (e) { - this.logger.error( - `Error in paying out a group of ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`, - e, - ); - // continue with next group in case payout failed - continue; + this.logger.error(`Error paying out XMR orders`, e); + break; } } + + if (paidOutOrders > 0 || pendingOrders.length > 0) { + this.logger.info( + `XMR payout: ${paidOutOrders} paid, ${pendingOrders.length} pending (insufficient unlocked balance)`, + ); + } } - private async hasEnoughUnlockedBalance(orders: PayoutOrder[]): Promise { - const totalOrderAmount = Util.sumObjValue(orders, 'amount'); - const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance(); + private splicePayoutGroup(orders: PayoutOrder[], maxAmount: number, maxSize: number): PayoutOrder[] { + let total = 0; + let count = 0; + + for (const order of orders) { + if (count >= maxSize) break; + if (total + order.amount > maxAmount) break; + + total += order.amount; + count++; + } - return totalOrderAmount <= unlockedBalance; + return orders.splice(0, count); } protected dispatchPayout(context: PayoutOrderContext, payout: PayoutGroup): Promise {