Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/tasks/plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Replace Telegraf with grammY and @grammyjs/conversations to fix lifecycle/state
| 3 | Rewrite add-bill flow as grammY conversation | done (PR #4) | 1 | [003](003-rewrite-add-bill-conversation.md) |
| 4 | Rewrite categorization flow as grammY conversation | done (PR #5) | 1 | [004](004-rewrite-categorization-conversation.md) |
| 5 | Migrate TelegramAdapter and remove Telegraf | done (PR #6) | 2, 3, 4 | [005](005-migrate-telegram-adapter.md) |
| 6 | Wire event-driven categorization entry | pending | 4, 5 | [006](006-wire-event-driven-categorization.md) |
| 6 | Wire event-driven categorization entry | reviewed | 4, 5 | [006](006-wire-event-driven-categorization.md) |

## Dependency Graph

Expand Down
43 changes: 38 additions & 5 deletions src/infrastructure/events/message-queue.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { logger } from "../utils/logger";
import { getQueueEventName } from "./event-types";
import { container } from "../utils";
import { AutomationService } from "../../application/services/automation.service";
import { removePendingMerchantByMerchantId } from "../telegram/telegram.adapter";

interface QueueItem {
event: string;
Expand All @@ -11,13 +12,18 @@ interface QueueItem {
taskId?: string;
}

const DEFAULT_TASK_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes

export class MessageQueueService {
private queue: QueueItem[] = [];
private isProcessing: boolean = false;
private eventEmitter: EventEmitter;
private taskTimeoutMs: number;
private timeoutTimer: ReturnType<typeof setTimeout> | null = null;

constructor(eventEmitter: EventEmitter) {
constructor(eventEmitter: EventEmitter, taskTimeoutMs?: number) {
this.eventEmitter = eventEmitter;
this.taskTimeoutMs = taskTimeoutMs ?? DEFAULT_TASK_TIMEOUT_MS;
}

public enqueue(event: string, data: any, taskId?: string): void {
Expand Down Expand Up @@ -45,16 +51,18 @@ export class MessageQueueService {
}

public completeTask(taskId: string): void {
this.isProcessing = false;

// Find the index of the task with matching taskId
const taskIndex = this.queue.findIndex(item => item.taskId === taskId);

if (taskIndex === -1) {
logger.debug(`No task found with ID: ${taskId}`);
// Task already removed (e.g., by timeout) — do not disturb current processing
logger.debug(`No task found with ID: ${taskId}, ignoring late completion`);
return;
}

this.isProcessing = false;
this.clearTaskTimeout();

// Remove the specific task from the queue
this.queue.splice(taskIndex, 1);
logger.debug(`Task completed with ID: ${taskId}`, {
Expand All @@ -81,6 +89,7 @@ export class MessageQueueService {
}

this.isProcessing = true;
this.startTaskTimeout();

const item = this.queue[0];
const queueEventName = getQueueEventName(item.event);
Expand All @@ -100,6 +109,30 @@ export class MessageQueueService {
}
}

private startTaskTimeout(): void {
this.clearTaskTimeout();
if (this.taskTimeoutMs <= 0) return;

this.timeoutTimer = setTimeout(() => {
const item = this.queue[0];
if (!item?.taskId) return;

logger.warn(`Task ${item.taskId} timed out after ${this.taskTimeoutMs}ms, auto-completing`);
// Clean up the pending merchant registry to invalidate stale Telegram buttons
if (item.taskId) {
removePendingMerchantByMerchantId(item.taskId);
}
this.completeTask(item.taskId);
}, this.taskTimeoutMs);
}

private clearTaskTimeout(): void {
if (this.timeoutTimer) {
clearTimeout(this.timeoutTimer);
this.timeoutTimer = null;
}
}

private async processEvent(eventName: string, data: any): Promise<void> {
return new Promise((resolve, reject) => {
this.eventEmitter.emit(eventName, data, (error?: Error) => {
Expand Down
10 changes: 10 additions & 0 deletions src/infrastructure/telegram/telegram.adapter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,16 @@ export function removePendingMerchant(shortId: string): void {
}
}

export function removePendingMerchantByMerchantId(merchantId: string): void {
pendingMerchantRegistry.delete(merchantId);
for (const [shortId, mid] of shortIdToMerchantId.entries()) {
if (mid === merchantId) {
shortIdToMerchantId.delete(shortId);
break;
}
}
}

export class TelegramAdapter {
private bot: Bot<BotContext>;
private logger: ILogger;
Expand Down
Loading