From ca198b268746c635e5ed627354a16d6830838f1b Mon Sep 17 00:00:00 2001 From: David May <85513542+davidleomay@users.noreply.github.com> Date: Tue, 13 Jan 2026 21:54:22 +0100 Subject: [PATCH 1/2] feat(lm): speedup (#2930) --- .../liquidity-management-pipeline.service.ts | 58 +++++++++++-------- 1 file changed, 34 insertions(+), 24 deletions(-) diff --git a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts index decc02e2e0..bb0ad4426f 100644 --- a/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts +++ b/src/subdomains/core/liquidity-management/services/liquidity-management-pipeline.service.ts @@ -32,16 +32,16 @@ export class LiquidityManagementPipelineService { //*** JOBS ***// - @DfxCron(CronExpression.EVERY_MINUTE, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 }) - async processPipelines() { - await this.checkRunningOrders(); - await this.startNewPipelines(); - - let hasWaitingOrders = true; - for (let i = 0; i < 5 && hasWaitingOrders; i++) { - await this.checkRunningPipelines(); - - hasWaitingOrders = await this.startNewOrders(); + @DfxCron(CronExpression.EVERY_10_SECONDS, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 }) + async processPipelines(): Promise { + let hasChanges = true; + while (hasChanges) { + const newPipelinesStarted = await this.startNewPipelines(); + const ordersChanged = await this.checkRunningOrders(); + const pipelinesChanged = await this.checkRunningPipelines(); + const newOrdersStarted = await this.startNewOrders(); + + hasChanges = newPipelinesStarted || ordersChanged || pipelinesChanged || newOrdersStarted; } } @@ -82,7 +82,7 @@ export class LiquidityManagementPipelineService { //*** HELPER METHODS ***// - async startNewPipelines(): Promise { + async startNewPipelines(): Promise { const newPipelines = await this.pipelineRepo.findBy({ status: LiquidityManagementPipelineStatus.CREATED }); this.logNewPipelines(newPipelines); @@ -96,13 +96,16 @@ export class LiquidityManagementPipelineService { continue; } } + + return newPipelines.length > 0; } - private async checkRunningPipelines(): Promise { + private async checkRunningPipelines(): Promise { const runningPipelines = await this.pipelineRepo.find({ where: { status: LiquidityManagementPipelineStatus.IN_PROGRESS }, relations: { currentAction: { onSuccess: true, onFail: true } }, }); + let anyChanged = false; for (const pipeline of runningPipelines) { try { @@ -120,6 +123,7 @@ export class LiquidityManagementPipelineService { ) { pipeline.continue(lastOrder.status); await this.pipelineRepo.save(pipeline); + anyChanged = true; if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) { await this.handlePipelineCompletion(pipeline); @@ -146,11 +150,14 @@ export class LiquidityManagementPipelineService { ); await this.placeLiquidityOrder(pipeline, lastOrder); + anyChanged = true; } catch (e) { this.logger.error(`Error in checking running liquidity pipeline ${pipeline.id}:`, e); continue; } } + + return anyChanged; } private async placeLiquidityOrder( @@ -164,8 +171,6 @@ export class LiquidityManagementPipelineService { } private async startNewOrders(): Promise { - let hasFinishedOrders = false; - const newOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.CREATED }); for (const order of newOrders) { @@ -175,23 +180,19 @@ export class LiquidityManagementPipelineService { if (e instanceof OrderNotNecessaryException) { order.complete(); await this.orderRepo.save(order); - } - if (e instanceof OrderNotProcessableException) { + } else if (e instanceof OrderNotProcessableException) { order.notProcessable(e); await this.orderRepo.save(order); - } - if (e instanceof OrderFailedException) { + } else if (e instanceof OrderFailedException) { order.fail(e); await this.orderRepo.save(order); } - hasFinishedOrders = true; - this.logger.info(`Error in starting new liquidity order ${order.id}:`, e); } } - return hasFinishedOrders; + return newOrders.length > 0; } private async executeOrder(order: LiquidityManagementOrder): Promise { @@ -203,30 +204,36 @@ export class LiquidityManagementPipelineService { await this.orderRepo.save(order); } - private async checkRunningOrders(): Promise { + private async checkRunningOrders(): Promise { const runningOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.IN_PROGRESS }); + let anyChanged = false; for (const order of runningOrders) { try { - await this.checkOrder(order); + const changed = await this.checkOrder(order); + anyChanged = anyChanged || changed; } catch (e) { if (e instanceof OrderNotProcessableException) { order.notProcessable(e); await this.orderRepo.save(order); + anyChanged = true; continue; } if (e instanceof OrderFailedException) { order.fail(e); await this.orderRepo.save(order); + anyChanged = true; continue; } this.logger.error(`Error in checking running liquidity order ${order.id}:`, e); } } + + return anyChanged; } - private async checkOrder(order: LiquidityManagementOrder): Promise { + private async checkOrder(order: LiquidityManagementOrder): Promise { const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action); const isComplete = await actionIntegration.checkCompletion(order); @@ -235,7 +242,10 @@ export class LiquidityManagementPipelineService { await this.orderRepo.save(order); this.logger.verbose(`Liquidity management order ${order.id} complete`); + return true; } + + return false; } private async handlePipelineCompletion(pipeline: LiquidityManagementPipeline): Promise { From 2ce7abbcdc2e5576e4b3d82603e0123f0deed611 Mon Sep 17 00:00:00 2001 From: TaprootFreak <142087526+TaprootFreak@users.noreply.github.com> Date: Tue, 13 Jan 2026 21:56:36 +0100 Subject: [PATCH 2/2] fix: check unlocked balance per group for Monero payouts (#2919) * fix: check unlocked balance per group for Monero payouts Previously, the balance check was done once for all orders. If total amount exceeded unlocked balance, ALL orders were skipped - even if enough balance existed for the first group. Now checks balance per 15-order group, ensuring at least the first group gets processed when sufficient funds are available. Remaining groups retry in subsequent cronjob runs after change outputs unlock. * fix: reduce Monero payout group size to fit unlocked balance Instead of skipping the entire group when balance is insufficient, now reduces the group to include only orders that fit within the available unlocked balance while maintaining FIFO order. * style: format monero strategy * chore: refactoring --------- Co-authored-by: Bernd Co-authored-by: David May --- .../strategies/payout/impl/monero.strategy.ts | 54 ++++++++++--------- 1 file changed, 29 insertions(+), 25 deletions(-) diff --git a/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts b/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts index 372009d9c4..09639cd113 100644 --- a/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts +++ b/src/subdomains/supporting/payout/strategies/payout/impl/monero.strategy.ts @@ -3,7 +3,6 @@ import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.e import { Asset, AssetType } from 'src/shared/models/asset/asset.entity'; import { AssetService } from 'src/shared/models/asset/asset.service'; import { DfxLogger } from 'src/shared/services/dfx-logger'; -import { Util } from 'src/shared/utils/util'; import { NotificationService } from 'src/subdomains/supporting/notification/services/notification.service'; import { PayoutOrder, PayoutOrderContext } from '../../../entities/payout-order.entity'; import { FeeResult } from '../../../interfaces'; @@ -43,40 +42,45 @@ export class MoneroStrategy extends BitcoinBasedStrategy { } protected async doPayoutForContext(context: PayoutOrderContext, orders: PayoutOrder[]): Promise { - if (!(await this.hasEnoughUnlockedBalance(orders))) { - this.logger.info( - `Insufficient unlocked balance for paying out XMR orders(s). Order ID(s): ${orders.map((o) => o.id)}`, - ); - return; - } + const pendingOrders = [...orders]; + let paidOutOrders = 0; - const payoutGroups = this.createPayoutGroups(orders, 15); + while (pendingOrders.length > 0) { + const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance(); + if (unlockedBalance <= 0) break; - for (const group of payoutGroups) { - try { - if (group.length === 0) { - continue; - } - - this.logger.verbose(`Paying out ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`); + const group = this.splicePayoutGroup(pendingOrders, unlockedBalance, 15); + if (group.length === 0) break; + try { await this.sendXMR(context, group); + paidOutOrders += group.length; } catch (e) { - this.logger.error( - `Error in paying out a group of ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`, - e, - ); - // continue with next group in case payout failed - continue; + this.logger.error(`Error paying out XMR orders`, e); + break; } } + + if (paidOutOrders > 0 || pendingOrders.length > 0) { + this.logger.info( + `XMR payout: ${paidOutOrders} paid, ${pendingOrders.length} pending (insufficient unlocked balance)`, + ); + } } - private async hasEnoughUnlockedBalance(orders: PayoutOrder[]): Promise { - const totalOrderAmount = Util.sumObjValue(orders, 'amount'); - const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance(); + private splicePayoutGroup(orders: PayoutOrder[], maxAmount: number, maxSize: number): PayoutOrder[] { + let total = 0; + let count = 0; + + for (const order of orders) { + if (count >= maxSize) break; + if (total + order.amount > maxAmount) break; + + total += order.amount; + count++; + } - return totalOrderAmount <= unlockedBalance; + return orders.splice(0, count); } protected dispatchPayout(context: PayoutOrderContext, payout: PayoutGroup): Promise {