diff --git a/docs/tasks/plan.md b/docs/tasks/plan.md index fd4ba78..cb56e89 100644 --- a/docs/tasks/plan.md +++ b/docs/tasks/plan.md @@ -14,7 +14,7 @@ Replace Telegraf with grammY and @grammyjs/conversations to fix lifecycle/state | 3 | Rewrite add-bill flow as grammY conversation | done (PR #4) | 1 | [003](003-rewrite-add-bill-conversation.md) | | 4 | Rewrite categorization flow as grammY conversation | done (PR #5) | 1 | [004](004-rewrite-categorization-conversation.md) | | 5 | Migrate TelegramAdapter and remove Telegraf | done (PR #6) | 2, 3, 4 | [005](005-migrate-telegram-adapter.md) | -| 6 | Wire event-driven categorization entry | pending | 4, 5 | [006](006-wire-event-driven-categorization.md) | +| 6 | Wire event-driven categorization entry | reviewed | 4, 5 | [006](006-wire-event-driven-categorization.md) | ## Dependency Graph diff --git a/src/infrastructure/events/message-queue.service.ts b/src/infrastructure/events/message-queue.service.ts index 84f8b3c..a2d2858 100644 --- a/src/infrastructure/events/message-queue.service.ts +++ b/src/infrastructure/events/message-queue.service.ts @@ -3,6 +3,7 @@ import { logger } from "../utils/logger"; import { getQueueEventName } from "./event-types"; import { container } from "../utils"; import { AutomationService } from "../../application/services/automation.service"; +import { removePendingMerchantByMerchantId } from "../telegram/telegram.adapter"; interface QueueItem { event: string; @@ -11,13 +12,18 @@ interface QueueItem { taskId?: string; } +const DEFAULT_TASK_TIMEOUT_MS = 30 * 60 * 1000; // 30 minutes + export class MessageQueueService { private queue: QueueItem[] = []; private isProcessing: boolean = false; private eventEmitter: EventEmitter; + private taskTimeoutMs: number; + private timeoutTimer: ReturnType | null = null; - constructor(eventEmitter: EventEmitter) { + constructor(eventEmitter: EventEmitter, taskTimeoutMs?: number) { this.eventEmitter = eventEmitter; + this.taskTimeoutMs = taskTimeoutMs ?? DEFAULT_TASK_TIMEOUT_MS; } public enqueue(event: string, data: any, taskId?: string): void { @@ -45,16 +51,18 @@ export class MessageQueueService { } public completeTask(taskId: string): void { - this.isProcessing = false; - // Find the index of the task with matching taskId const taskIndex = this.queue.findIndex(item => item.taskId === taskId); - + if (taskIndex === -1) { - logger.debug(`No task found with ID: ${taskId}`); + // Task already removed (e.g., by timeout) — do not disturb current processing + logger.debug(`No task found with ID: ${taskId}, ignoring late completion`); return; } + this.isProcessing = false; + this.clearTaskTimeout(); + // Remove the specific task from the queue this.queue.splice(taskIndex, 1); logger.debug(`Task completed with ID: ${taskId}`, { @@ -81,6 +89,7 @@ export class MessageQueueService { } this.isProcessing = true; + this.startTaskTimeout(); const item = this.queue[0]; const queueEventName = getQueueEventName(item.event); @@ -100,6 +109,30 @@ export class MessageQueueService { } } + private startTaskTimeout(): void { + this.clearTaskTimeout(); + if (this.taskTimeoutMs <= 0) return; + + this.timeoutTimer = setTimeout(() => { + const item = this.queue[0]; + if (!item?.taskId) return; + + logger.warn(`Task ${item.taskId} timed out after ${this.taskTimeoutMs}ms, auto-completing`); + // Clean up the pending merchant registry to invalidate stale Telegram buttons + if (item.taskId) { + removePendingMerchantByMerchantId(item.taskId); + } + this.completeTask(item.taskId); + }, this.taskTimeoutMs); + } + + private clearTaskTimeout(): void { + if (this.timeoutTimer) { + clearTimeout(this.timeoutTimer); + this.timeoutTimer = null; + } + } + private async processEvent(eventName: string, data: any): Promise { return new Promise((resolve, reject) => { this.eventEmitter.emit(eventName, data, (error?: Error) => { diff --git a/src/infrastructure/telegram/telegram.adapter.ts b/src/infrastructure/telegram/telegram.adapter.ts index 8760bad..fb75de6 100644 --- a/src/infrastructure/telegram/telegram.adapter.ts +++ b/src/infrastructure/telegram/telegram.adapter.ts @@ -37,6 +37,16 @@ export function removePendingMerchant(shortId: string): void { } } +export function removePendingMerchantByMerchantId(merchantId: string): void { + pendingMerchantRegistry.delete(merchantId); + for (const [shortId, mid] of shortIdToMerchantId.entries()) { + if (mid === merchantId) { + shortIdToMerchantId.delete(shortId); + break; + } + } +} + export class TelegramAdapter { private bot: Bot; private logger: ILogger;