Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,16 @@ export class LiquidityManagementPipelineService {

//*** JOBS ***//

@DfxCron(CronExpression.EVERY_MINUTE, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 })
async processPipelines() {
await this.checkRunningOrders();
await this.startNewPipelines();

let hasWaitingOrders = true;
for (let i = 0; i < 5 && hasWaitingOrders; i++) {
await this.checkRunningPipelines();

hasWaitingOrders = await this.startNewOrders();
@DfxCron(CronExpression.EVERY_10_SECONDS, { process: Process.LIQUIDITY_MANAGEMENT, timeout: 1800 })
async processPipelines(): Promise<void> {
let hasChanges = true;
while (hasChanges) {
const newPipelinesStarted = await this.startNewPipelines();
const ordersChanged = await this.checkRunningOrders();
const pipelinesChanged = await this.checkRunningPipelines();
const newOrdersStarted = await this.startNewOrders();

hasChanges = newPipelinesStarted || ordersChanged || pipelinesChanged || newOrdersStarted;
}
}

Expand Down Expand Up @@ -82,7 +82,7 @@ export class LiquidityManagementPipelineService {

//*** HELPER METHODS ***//

async startNewPipelines(): Promise<void> {
async startNewPipelines(): Promise<boolean> {
const newPipelines = await this.pipelineRepo.findBy({ status: LiquidityManagementPipelineStatus.CREATED });

this.logNewPipelines(newPipelines);
Expand All @@ -96,13 +96,16 @@ export class LiquidityManagementPipelineService {
continue;
}
}

return newPipelines.length > 0;
}

private async checkRunningPipelines(): Promise<void> {
private async checkRunningPipelines(): Promise<boolean> {
const runningPipelines = await this.pipelineRepo.find({
where: { status: LiquidityManagementPipelineStatus.IN_PROGRESS },
relations: { currentAction: { onSuccess: true, onFail: true } },
});
let anyChanged = false;

for (const pipeline of runningPipelines) {
try {
Expand All @@ -120,6 +123,7 @@ export class LiquidityManagementPipelineService {
) {
pipeline.continue(lastOrder.status);
await this.pipelineRepo.save(pipeline);
anyChanged = true;

if (pipeline.status === LiquidityManagementPipelineStatus.COMPLETE) {
await this.handlePipelineCompletion(pipeline);
Expand All @@ -146,11 +150,14 @@ export class LiquidityManagementPipelineService {
);

await this.placeLiquidityOrder(pipeline, lastOrder);
anyChanged = true;
} catch (e) {
this.logger.error(`Error in checking running liquidity pipeline ${pipeline.id}:`, e);
continue;
}
}

return anyChanged;
}

private async placeLiquidityOrder(
Expand All @@ -164,8 +171,6 @@ export class LiquidityManagementPipelineService {
}

private async startNewOrders(): Promise<boolean> {
let hasFinishedOrders = false;

const newOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.CREATED });

for (const order of newOrders) {
Expand All @@ -175,23 +180,19 @@ export class LiquidityManagementPipelineService {
if (e instanceof OrderNotNecessaryException) {
order.complete();
await this.orderRepo.save(order);
}
if (e instanceof OrderNotProcessableException) {
} else if (e instanceof OrderNotProcessableException) {
order.notProcessable(e);
await this.orderRepo.save(order);
}
if (e instanceof OrderFailedException) {
} else if (e instanceof OrderFailedException) {
order.fail(e);
await this.orderRepo.save(order);
}

hasFinishedOrders = true;

this.logger.info(`Error in starting new liquidity order ${order.id}:`, e);
}
}

return hasFinishedOrders;
return newOrders.length > 0;
}

private async executeOrder(order: LiquidityManagementOrder): Promise<void> {
Expand All @@ -203,30 +204,36 @@ export class LiquidityManagementPipelineService {
await this.orderRepo.save(order);
}

private async checkRunningOrders(): Promise<void> {
private async checkRunningOrders(): Promise<boolean> {
const runningOrders = await this.orderRepo.findBy({ status: LiquidityManagementOrderStatus.IN_PROGRESS });
let anyChanged = false;

for (const order of runningOrders) {
try {
await this.checkOrder(order);
const changed = await this.checkOrder(order);
anyChanged = anyChanged || changed;
} catch (e) {
if (e instanceof OrderNotProcessableException) {
order.notProcessable(e);
await this.orderRepo.save(order);
anyChanged = true;
continue;
}
if (e instanceof OrderFailedException) {
order.fail(e);
await this.orderRepo.save(order);
anyChanged = true;
continue;
}

this.logger.error(`Error in checking running liquidity order ${order.id}:`, e);
}
}

return anyChanged;
}

private async checkOrder(order: LiquidityManagementOrder): Promise<void> {
private async checkOrder(order: LiquidityManagementOrder): Promise<boolean> {
const actionIntegration = this.actionIntegrationFactory.getIntegration(order.action);
const isComplete = await actionIntegration.checkCompletion(order);

Expand All @@ -235,7 +242,10 @@ export class LiquidityManagementPipelineService {
await this.orderRepo.save(order);

this.logger.verbose(`Liquidity management order ${order.id} complete`);
return true;
}

return false;
}

private async handlePipelineCompletion(pipeline: LiquidityManagementPipeline): Promise<void> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import { Blockchain } from 'src/integration/blockchain/shared/enums/blockchain.e
import { Asset, AssetType } from 'src/shared/models/asset/asset.entity';
import { AssetService } from 'src/shared/models/asset/asset.service';
import { DfxLogger } from 'src/shared/services/dfx-logger';
import { Util } from 'src/shared/utils/util';
import { NotificationService } from 'src/subdomains/supporting/notification/services/notification.service';
import { PayoutOrder, PayoutOrderContext } from '../../../entities/payout-order.entity';
import { FeeResult } from '../../../interfaces';
Expand Down Expand Up @@ -43,40 +42,45 @@ export class MoneroStrategy extends BitcoinBasedStrategy {
}

protected async doPayoutForContext(context: PayoutOrderContext, orders: PayoutOrder[]): Promise<void> {
if (!(await this.hasEnoughUnlockedBalance(orders))) {
this.logger.info(
`Insufficient unlocked balance for paying out XMR orders(s). Order ID(s): ${orders.map((o) => o.id)}`,
);
return;
}
const pendingOrders = [...orders];
let paidOutOrders = 0;

const payoutGroups = this.createPayoutGroups(orders, 15);
while (pendingOrders.length > 0) {
const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance();
if (unlockedBalance <= 0) break;

for (const group of payoutGroups) {
try {
if (group.length === 0) {
continue;
}

this.logger.verbose(`Paying out ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`);
const group = this.splicePayoutGroup(pendingOrders, unlockedBalance, 15);
if (group.length === 0) break;

try {
await this.sendXMR(context, group);
paidOutOrders += group.length;
} catch (e) {
this.logger.error(
`Error in paying out a group of ${group.length} XMR orders(s). Order ID(s): ${group.map((o) => o.id)}`,
e,
);
// continue with next group in case payout failed
continue;
this.logger.error(`Error paying out XMR orders`, e);
break;
}
}

if (paidOutOrders > 0 || pendingOrders.length > 0) {
this.logger.info(
`XMR payout: ${paidOutOrders} paid, ${pendingOrders.length} pending (insufficient unlocked balance)`,
);
}
}

private async hasEnoughUnlockedBalance(orders: PayoutOrder[]): Promise<boolean> {
const totalOrderAmount = Util.sumObjValue<PayoutOrder>(orders, 'amount');
const unlockedBalance = await this.payoutMoneroService.getUnlockedBalance();
private splicePayoutGroup(orders: PayoutOrder[], maxAmount: number, maxSize: number): PayoutOrder[] {
let total = 0;
let count = 0;

for (const order of orders) {
if (count >= maxSize) break;
if (total + order.amount > maxAmount) break;

total += order.amount;
count++;
}

return totalOrderAmount <= unlockedBalance;
return orders.splice(0, count);
}

protected dispatchPayout(context: PayoutOrderContext, payout: PayoutGroup): Promise<string> {
Expand Down
Loading