From f401b159621872400fc6b416dbb104426859937f Mon Sep 17 00:00:00 2001 From: manjik-rumsan Date: Wed, 15 Apr 2026 07:45:11 +0545 Subject: [PATCH 1/2] fixed ivr sequece --- .../src/workers/ami.service.ts | 34 +- .../src/workers/asterisk.module.ts | 6 +- .../src/workers/channel-state.manager.ts | 153 +++++++ .../src/workers/ivr.service.ts | 372 +++--------------- .../src/workers/playback.service.ts | 144 +++++++ .../src/workers/types/ivr.types.ts | 31 ++ 6 files changed, 415 insertions(+), 325 deletions(-) create mode 100644 apps/asterisk-worker/src/workers/channel-state.manager.ts create mode 100644 apps/asterisk-worker/src/workers/playback.service.ts create mode 100644 apps/asterisk-worker/src/workers/types/ivr.types.ts diff --git a/apps/asterisk-worker/src/workers/ami.service.ts b/apps/asterisk-worker/src/workers/ami.service.ts index 64213f0..d103530 100644 --- a/apps/asterisk-worker/src/workers/ami.service.ts +++ b/apps/asterisk-worker/src/workers/ami.service.ts @@ -23,6 +23,10 @@ const amiConfig = { export class AMIService { private readonly logger = new Logger(AMIService.name); private ami: any; + // BUG FIX: Per-channel DTMF tracking instead of a single shared array. + // The old code used one ivrSequence[] for ALL concurrent calls, causing + // jumbled reports where digits from different calls were mixed together. + private ivrSequences = new Map(); constructor( @Inject('AMQP_CONNECTION') @@ -35,7 +39,6 @@ export class AMIService { connect() { console.log(amiConfig); - const ivrSequence: string[] = []; this.ami = new AsteriskManager( amiConfig.port, amiConfig.host, @@ -49,17 +52,18 @@ export class AMIService { this.ami.on('managerevent', async (evt) => { const eventType = evt.event; - // console.log('=====', eventType); - // console.log(evt); if (eventType === 'Hangup') { - //console.log('Hangup Event', evt); const disposition = getAsteriskDisposition(evt.cause, evt.channelstate); const broadcastLog: QueueBroadcastLogVoice = this.batchManager.getLog( evt.uniqueid, ); if (broadcastLog) { + // Retrieve the per-channel DTMF sequence and clean up + const ivrSequence = this.ivrSequences.get(evt.uniqueid) || []; + this.ivrSequences.delete(evt.uniqueid); + broadcastLog.status = disposition === CallDisposition.ANSWERED ? BroadcastStatus.SUCCESS @@ -68,13 +72,16 @@ export class AMIService { trunk: amiConfig.trunk, disposition, hangupDetails: evt, - ivrSequence, + ivrSequence: [...ivrSequence], // snapshot copy }; await this.broadcastLogQueue.addVoice(broadcastLog); await this.batchManager.endMonitoring(evt.uniqueid); - this.logger.log(`Call Hangup: ${evt.uniqueid}`); - ivrSequence.length = 0; + this.logger.log( + `Call Hangup: ${evt.uniqueid}, DTMF sequence: [${ivrSequence.join(',')}]`, + ); } else { + // Not a tracked call — clean up any stale DTMF data just in case + this.ivrSequences.delete(evt.uniqueid); console.log('=====> Call Received: ', evt.calleridnum); } } @@ -103,7 +110,18 @@ export class AMIService { } if (eventType === 'DTMFEnd') { - ivrSequence.push(evt.digit); + // Track DTMF digit per-channel using uniqueid + const uniqueid = evt.uniqueid; + if (!this.ivrSequences.has(uniqueid)) { + this.ivrSequences.set(uniqueid, []); + } + const sequence = this.ivrSequences.get(uniqueid); + if (sequence) { + sequence.push(evt.digit); + } + this.logger.debug( + `DTMF digit '${evt.digit}' recorded for channel ${uniqueid}`, + ); } }); } diff --git a/apps/asterisk-worker/src/workers/asterisk.module.ts b/apps/asterisk-worker/src/workers/asterisk.module.ts index 3c06ffa..dc51c6e 100644 --- a/apps/asterisk-worker/src/workers/asterisk.module.ts +++ b/apps/asterisk-worker/src/workers/asterisk.module.ts @@ -7,7 +7,9 @@ import { SessionModel } from '../entities/session.entity'; import { AMIService } from './ami.service'; import { AsteriskWorker } from './asterisk.worker'; import { AudioService } from './audio.service'; +import { ChannelStateManager } from './channel-state.manager'; import { IVRService } from './ivr.service'; +import { PlaybackService } from './playback.service'; @Module({ imports: [ @@ -20,7 +22,9 @@ import { IVRService } from './ivr.service'; AudioService, AMIService, BatchManger, - IVRService + ChannelStateManager, + PlaybackService, + IVRService, ], exports: [AsteriskWorker, AMIService], }) diff --git a/apps/asterisk-worker/src/workers/channel-state.manager.ts b/apps/asterisk-worker/src/workers/channel-state.manager.ts new file mode 100644 index 0000000..7676623 --- /dev/null +++ b/apps/asterisk-worker/src/workers/channel-state.manager.ts @@ -0,0 +1,153 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Client } from 'ari-client'; +import { ChannelState, IVRDialPlan } from './types/ivr.types'; + +@Injectable() +export class ChannelStateManager { + private readonly logger = new Logger(ChannelStateManager.name); + private client: Client; + private channelStates = new Map(); + + setClient(client: Client) { + this.client = client; + } + + registerChannel(params: { + channelId: string; + ivrDialPlan: IVRDialPlan | null; + sessionId: string; + broadcastLogId: string; + address: string; + }): ChannelState { + const channelState: ChannelState = { + channelId: params.channelId, + ivrDialPlan: params.ivrDialPlan, + sessionId: params.sessionId, + broadcastLogId: params.broadcastLogId, + address: params.address, + activePlayback: null, + activePlaybackId: null, + hangupTimer: null, + isActive: true, + }; + + this.channelStates.set(params.channelId, channelState); + this.logger.log( + `Channel registered: ${params.channelId}, Address: ${params.address}`, + ); + return channelState; + } + + getState(channelId: string): ChannelState | undefined { + return this.channelStates.get(channelId); + } + + hasChannel(channelId: string): boolean { + return this.channelStates.has(channelId); + } + + removeChannel(channelId: string) { + this.channelStates.delete(channelId); + } + + async stopActivePlayback(channelId: string) { + const channelState = this.channelStates.get(channelId); + if (!channelState?.activePlayback) { + return; + } + + const oldPlaybackId = channelState.activePlaybackId; + channelState.activePlaybackId = null; + + try { + await channelState.activePlayback.stop(); + this.logger.log( + `Stopped active playback ${oldPlaybackId} on channel: ${channelId}`, + ); + } catch (err) { + // Playback may already be finished or channel gone — not an error + this.logger.debug( + `Could not stop playback ${oldPlaybackId} on channel ${channelId}: ${(err as Error).message}`, + ); + } finally { + channelState.activePlayback = null; + } + } + + scheduleHangup(channelId: string, delay: number) { + const channelState = this.channelStates.get(channelId); + if (!channelState?.isActive) { + return; + } + + this.cancelScheduledHangup(channelId); + + const timer = setTimeout(async () => { + const currentState = this.channelStates.get(channelId); + if (!currentState?.isActive) { + return; + } + + try { + this.logger.log(`Scheduled hangup firing for channel: ${channelId}`); + await this.client.channels.hangup({ channelId }); + this.logger.log(`Channel ${channelId} successfully hung up.`); + } catch (err) { + this.logger.debug( + `Hangup failed for channel ${channelId} (likely already gone): ${(err as Error).message}`, + ); + } + // Cleanup will be triggered by StasisEnd event + }, delay); + + channelState.hangupTimer = timer; + this.logger.log( + `Scheduled hangup for channel ${channelId} in ${delay / 1000}s`, + ); + } + + cancelScheduledHangup(channelId: string) { + const channelState = this.channelStates.get(channelId); + if (!channelState?.hangupTimer) { + return; + } + + clearTimeout(channelState.hangupTimer); + channelState.hangupTimer = null; + this.logger.log(`Cancelled scheduled hangup for channel: ${channelId}`); + } + + async cleanupChannel(channelId: string) { + const channelState = this.channelStates.get(channelId); + if (!channelState) { + return; // Already cleaned up — idempotent guard + } + + // Mark inactive first to prevent new operations + channelState.isActive = false; + + // Remove from map immediately to prevent re-entrant cleanup + this.channelStates.delete(channelId); + + await this.stopActivePlayback(channelId); + + // stopActivePlayback won't find it in the map anymore since we already deleted, + // so stop it directly from the state we captured + if (channelState.activePlayback) { + try { + channelState.activePlaybackId = null; + await channelState.activePlayback.stop(); + } catch { + // already gone + } + channelState.activePlayback = null; + } + + if (channelState.hangupTimer) { + clearTimeout(channelState.hangupTimer); + channelState.hangupTimer = null; + } + + this.logger.log(`Cleaned up resources for channel: ${channelId}`); + } +} diff --git a/apps/asterisk-worker/src/workers/ivr.service.ts b/apps/asterisk-worker/src/workers/ivr.service.ts index 9fa0116..134ee04 100644 --- a/apps/asterisk-worker/src/workers/ivr.service.ts +++ b/apps/asterisk-worker/src/workers/ivr.service.ts @@ -9,47 +9,22 @@ import { BroadcastLogQueue, } from '@rsconnect/queue'; import { Broadcast, QueueBroadcastLog } from '@rumsan/connect/types'; -import ari, { Channel, Client, Playback } from 'ari-client'; - -interface IVRMenuOption { - digit: number; - prompt?: string; - hangup?: boolean; - action?: string; // For webhooks - destination?: string; // For webhooks -} - -interface IVRMenu { - prompt: string; - options: IVRMenuOption[]; -} - -interface IVRDialPlan { - main: IVRMenu; - [key: string]: IVRMenu; // For other menus -} +import ari, { Channel, Client } from 'ari-client'; +import { randomUUID } from 'crypto'; +import { ChannelStateManager } from './channel-state.manager'; +import { PlaybackService } from './playback.service'; -interface ChannelState { - channelId: string; - ivrDialPlan: IVRDialPlan | null; - sessionId: string; - broadcastLogId: string; - address: string; - activePlayback: Playback | null; - activePlaybackId: string | null; // Track playback ID to prevent stale event handlers - hangupTimer: NodeJS.Timeout | null; - isActive: boolean; -} @Injectable() export class IVRService implements OnModuleInit, OnModuleDestroy { private readonly logger = new Logger(IVRService.name); private client: Client; private config; - private channelStates: Map; // Maps channelId -> ChannelState constructor( private readonly batchManager: BatchManager, private readonly broadcastLogQueue: BroadcastLogQueue, + private readonly channelStateManager: ChannelStateManager, + private readonly playbackService: PlaybackService, ) { this.config = { appName: 'rs-connect', @@ -61,7 +36,6 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { audioPath: process.env.ASTERISK_AUDIO_PATH, callerId: process.env.ASTERISK_CALLER_ID, }; - this.channelStates = new Map(); // Maps channelId -> ChannelState this.logger.log('IVRService initialized', this.config); } @@ -77,50 +51,49 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { broadcastLog: QueueBroadcastLog, ivrJSON?: string, ) { - try { - const ivrDialPlan = ivrJSON ? JSON.parse(ivrJSON) : null; + const ivrDialPlan = ivrJSON ? JSON.parse(ivrJSON) : null; + + // BUG FIX: Pre-generate channelId and register state BEFORE originate + // to prevent race condition where StasisStart fires before state is set + const channelId = randomUUID(); + + this.channelStateManager.registerChannel({ + channelId, + ivrDialPlan, + sessionId: broadcast.session, + broadcastLogId: broadcastLog.broadcastLogId, + address: broadcast.address, + }); - const channel = await this.originateCall( + try { + await this.originateCall( + channelId, this.callEndpoint(broadcast.address), `${broadcastLog.broadcastId} <${broadcast.address}>`, [broadcastLog.broadcastLogId, broadcast.session, broadcast.address], ); - if (!channel?.id) { - throw new Error('Failed to originate call - no channel ID returned'); - } - - // Create and store channel state - const channelState: ChannelState = { - channelId: channel.id, - ivrDialPlan, - sessionId: broadcast.session, - broadcastLogId: broadcastLog.broadcastLogId, - address: broadcast.address, - activePlayback: null, - activePlaybackId: null, - hangupTimer: null, - isActive: true, - }; - - this.channelStates.set(channel.id, channelState); - this.batchManager.startMonitoring(channel.id, broadcastLog); + this.batchManager.startMonitoring(channelId, broadcastLog); this.logger.log( - `Broadcast started for IVR - Channel: ${channel.id}, Address: ${broadcast.address}`, + `Broadcast started for IVR - Channel: ${channelId}, Address: ${broadcast.address}`, ); } catch (error) { + // Clean up pre-registered state on originate failure + this.channelStateManager.removeChannel(channelId); this.logger.error('Error in sendBroadcast:', error); throw error; } } async originateCall( + channelId: string, callEndpoint: string, callerId: string, appArgs: string[] = [], ) { try { return await this.client.channels.originate({ + channelId, endpoint: callEndpoint, context: 'from-internal', priority: 1, @@ -142,6 +115,10 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { this.client = await ari.connect(server, user, password); await this.client.start(appName); + // Share the ARI client with dependent services + this.channelStateManager.setClient(this.client); + this.playbackService.setClient(this.client); + this.logger.log('ARI connected'); } catch (error) { this.logger.error('Error connecting to ARI', error); @@ -152,7 +129,6 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { async onModuleInit() { this.logger.log('Module Init'); await this.connect(); - // Register event listeners only once at startup // Handle StasisStart - when a channel enters the Stasis application this.client.on('StasisStart', async (event, incomingChannel) => { @@ -160,8 +136,7 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { const channelId = event.channel.id; const [broadcastLogId, sessionId, incomingAddress] = event.args || []; - // Get channel state - const channelState = this.channelStates.get(channelId); + const channelState = this.channelStateManager.getState(channelId); if (!channelState) { this.logger.warn( `StasisStart received for unknown channel: ${channelId}`, @@ -169,7 +144,6 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { return; } - // Update state with actual session info if available if (sessionId) channelState.sessionId = sessionId; if (broadcastLogId) channelState.broadcastLogId = broadcastLogId; if (incomingAddress) channelState.address = incomingAddress; @@ -179,16 +153,17 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { `Call Answered: ${incomingChannel.caller.number} on channel: ${channelId}`, ); - // Check if this is an IVR call or regular audio call if (channelState.ivrDialPlan?.main?.prompt) { const mainPrompt = channelState.ivrDialPlan.main.prompt.replace( '.wav', '', ); - await this.playPrompt(channelId, mainPrompt); + await this.playbackService.playPrompt(channelId, mainPrompt); } else { - // Regular audio playback - await this.playAudio(channelState.sessionId, incomingChannel); + await this.playbackService.playAudio( + channelState.sessionId, + incomingChannel, + ); } } catch (error) { this.logger.error('Error in StasisStart handler:', error); @@ -209,7 +184,7 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { try { const channelId = event.channel.id; this.logger.log(`StasisEnd received for channel: ${channelId}`); - this.cleanupChannel(channelId); + await this.channelStateManager.cleanupChannel(channelId); } catch (error) { this.logger.error('Error in StasisEnd handler:', error); } @@ -221,14 +196,13 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { const channelId = event.channel.id; const state = event.channel.state; - // If channel is hung up, cleanup if (state === 'Down' || state === 'Rsrvd') { - const channelState = this.channelStates.get(channelId); + const channelState = this.channelStateManager.getState(channelId); if (channelState?.isActive) { this.logger.log( `Channel ${channelId} state changed to ${state}, cleaning up`, ); - this.cleanupChannel(channelId); + await this.channelStateManager.cleanupChannel(channelId); } } } catch (error) { @@ -244,246 +218,11 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { } } - async playAudio(sessionId: string, channel: Channel) { - const channelId = channel.id; - const channelState = this.channelStates.get(channelId); - - if (!channelState || !channelState.isActive) { - this.logger.warn( - `Attempted to play audio on inactive or unknown channel: ${channelId}`, - ); - return; - } - - const audio = `${this.config.audioPath}/${sessionId}`; - const playbackId = `${channelId}-${audio}`; - - const playback = this.client.Playback(); - channelState.activePlayback = playback; - channelState.activePlaybackId = playbackId; - - playback.once('PlaybackFinished', async () => { - // Only process if this is still the active playback - if (channelState.activePlaybackId !== playbackId) { - this.logger.log( - `PlaybackFinished for stale playback ${playbackId} on channel: ${channelId}, ignoring`, - ); - return; - } - - try { - if (channelState.isActive) { - await channel.hangup(); - } - } catch (error) { - this.logger.error( - `Error hanging up channel ${channelId} after playback:`, - error, - ); - } - this.logger.log( - `PlaybackFinished for channel: ${channelId}, caller: ${channel?.caller?.number}`, - ); - channelState.activePlayback = null; - channelState.activePlaybackId = null; - }); - - await channel.play( - { playbackId: playbackId, media: `sound:${audio}` }, - playback, - ); - this.logger.log(`Playing recording for channel: ${channelId}`); - playback.once('PlaybackStarted', () => { - this.logger.log( - `PlaybackStarted for channel: ${channelId}, caller: ${channel?.caller?.number}`, - ); - }); - } - - //////////// WORK FLOW CODE /////////// - private async playPrompt( - channelId: string, - media: string, - immediateHangup = false, - ) { - const channelState = this.channelStates.get(channelId); - if (!channelState || !channelState.isActive) { - this.logger.warn( - `Attempted to play prompt on inactive or unknown channel: ${channelId}`, - ); - return; - } - - try { - // Stop any active playback - await this.stopActivePlayback(channelId); - this.cancelScheduledHangup(channelId); - - // Create a new playback - const playback = this.client.Playback(); - const playbackId = playback.id; - channelState.activePlayback = playback; - channelState.activePlaybackId = playbackId; - - await this.client.channels.play({ - channelId: channelId, - playbackId: playbackId, - media: media, - }); - - this.logger.log(`Playback started: ${media} on channel: ${channelId}`); - - // Handle playback completion - playback.once('PlaybackFinished', async () => { - // Only process if this is still the active playback - if (channelState.activePlaybackId !== playbackId) { - this.logger.log( - `PlaybackFinished for stale playback ${playbackId} on channel: ${channelId}, ignoring`, - ); - return; - } - - this.logger.log(`Playback finished on channel: ${channelId}`); - channelState.activePlayback = null; - channelState.activePlaybackId = null; - if (immediateHangup) { - // Hang up immediately after playback (option has hangup: true) - try { - await this.client.channels.hangup({ channelId }); - this.logger.log( - `Channel ${channelId} hung up immediately after playback (hangup: true)`, - ); - this.cleanupChannel(channelId); - } catch (error) { - this.logger.error( - `Error hanging up channel ${channelId}: ${error.message}`, - ); - this.cleanupChannel(channelId); - } - } else { - // Schedule hangup after 10 seconds - if (channelState.isActive) { - this.scheduleHangup(channelId, 10000); // 10000 ms = 10s - } - } - }); - } catch (error) { - this.logger.error( - `Error playing prompt on channel ${channelId}: ${error.message}`, - ); - channelState.activePlayback = null; - channelState.activePlaybackId = null; - } - } - - // Stop any active playback on a channel - async stopActivePlayback(channelId: string) { - const channelState = this.channelStates.get(channelId); - if (!channelState) { - return; - } - - try { - if (channelState.activePlayback) { - // Clear the playback ID first to prevent event handlers from firing - const oldPlaybackId = channelState.activePlaybackId; - channelState.activePlaybackId = null; - - await channelState.activePlayback.stop(); - this.logger.log( - `Stopped active playback ${oldPlaybackId} on channel: ${channelId}`, - ); - channelState.activePlayback = null; - } - } catch (error) { - this.logger.error( - `Error stopping playback on channel ${channelId}: ${error.message}`, - ); - channelState.activePlayback = null; - channelState.activePlaybackId = null; - } - } - - // Schedule call hangup after a delay - async scheduleHangup(channelId: string, delay: number) { - const channelState = this.channelStates.get(channelId); - if (!channelState || !channelState.isActive) { - return; - } - - // Cancel any existing hangup timer - this.cancelScheduledHangup(channelId); - - const timer = setTimeout(async () => { - try { - // Double-check channel is still active before hanging up - const currentState = this.channelStates.get(channelId); - if (currentState?.isActive) { - this.logger.log(`Attempting to hang up channel: ${channelId}`); - await this.client.channels.hangup({ channelId }); - this.logger.log(`Channel ${channelId} successfully hung up.`); - this.cleanupChannel(channelId); - } - } catch (error) { - this.logger.error( - `Error hanging up channel ${channelId}: ${error.message}`, - ); - // Still cleanup even if hangup fails - this.cleanupChannel(channelId); - } - }, delay); - - channelState.hangupTimer = timer; - this.logger.log( - `Scheduled hangup for channel ${channelId} in ${delay / 1000} seconds`, - ); - } - - // Cancel a scheduled hangup for a specific channel - cancelScheduledHangup(channelId: string) { - const channelState = this.channelStates.get(channelId); - if (!channelState) { - return; - } - - if (channelState.hangupTimer) { - clearTimeout(channelState.hangupTimer); - channelState.hangupTimer = null; - this.logger.log(`Cancelled scheduled hangup for channel: ${channelId}`); - } - } - - // Cleanup resources when the call ends - cleanupChannel(channelId: string) { - const channelState = this.channelStates.get(channelId); - if (!channelState) { - return; - } - - // Mark channel as inactive first to prevent new operations - channelState.isActive = false; - - // Stop any active playback (this will also clear activePlaybackId) - this.stopActivePlayback(channelId); - - // Cancel any scheduled hangup - this.cancelScheduledHangup(channelId); - - // Ensure playback ID is cleared - channelState.activePlaybackId = null; - - // Remove channel state - this.channelStates.delete(channelId); - - this.logger.log(`Cleaned up resources for channel: ${channelId}`); - } - // HANDLE DTMF SIGNALS async handleDTMF(channel: Channel, digit: string) { const channelId = channel.id; - const channelState = this.channelStates.get(channelId); + const channelState = this.channelStateManager.getState(channelId); - // Verify channel exists and is active if (!channelState) { this.logger.warn( `DTMF received for unknown channel: ${channelId}, digit: ${digit}`, @@ -498,7 +237,6 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { return; } - // Verify this is an IVR call (has dial plan) if (!channelState.ivrDialPlan) { this.logger.warn( `DTMF received for non-IVR channel: ${channelId}, digit: ${digit}`, @@ -508,15 +246,14 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { try { this.logger.log(`DTMF received: ${digit} on channel: ${channelId}`); - await this.stopActivePlayback(channelId); - // Cancel any scheduled hangup if new interaction occurs - this.cancelScheduledHangup(channelId); + await this.channelStateManager.stopActivePlayback(channelId); + this.channelStateManager.cancelScheduledHangup(channelId); - // If digit is '0', replay the main IVR prompt + // Digit '0' replays the main IVR prompt if (digit === '0') { const mainPrompt = channelState.ivrDialPlan.main?.prompt; if (mainPrompt) { - await this.playPrompt( + await this.playbackService.playPrompt( channelId, mainPrompt.replace('.wav', ''), ); @@ -526,24 +263,27 @@ export class IVRService implements OnModuleInit, OnModuleDestroy { const options = channelState.ivrDialPlan.main?.options || []; const option = options.find((opt) => opt.digit === parseInt(digit)); - // Play the corresponding prompt; if option has hangup: true, hang up immediately after playback - if (option && option.prompt) { - await this.playPrompt( - channel.id, + + if (option?.prompt) { + await this.playbackService.playPrompt( + channelId, option.prompt.replace('.wav', ''), option.hangup === true, ); } else { - await this.playPrompt(channelId, 'sound:option-is-invalid'); + await this.playbackService.playPrompt( + channelId, + 'sound:option-is-invalid', + ); } // TODO: Trigger the webhook if provided // if (option?.action === 'webhook' && option?.destination) { // this.sendWebhook(option.destination, { channelId, digit }); // } - } catch (error) { + } catch (err) { this.logger.error( - `Error handling DTMF on channel ${channelId}: ${error.message}`, + `Error handling DTMF on channel ${channelId}: ${(err as Error).message}`, ); } } diff --git a/apps/asterisk-worker/src/workers/playback.service.ts b/apps/asterisk-worker/src/workers/playback.service.ts new file mode 100644 index 0000000..8eeed33 --- /dev/null +++ b/apps/asterisk-worker/src/workers/playback.service.ts @@ -0,0 +1,144 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { Channel, Client } from 'ari-client'; +import { ChannelStateManager } from './channel-state.manager'; + +@Injectable() +export class PlaybackService { + private readonly logger = new Logger(PlaybackService.name); + private client: Client; + private audioPath: string; + + constructor(private readonly channelStateManager: ChannelStateManager) { + this.audioPath = process.env.ASTERISK_AUDIO_PATH || ''; + } + + setClient(client: Client) { + this.client = client; + } + + async playAudio(sessionId: string, channel: Channel) { + const channelId = channel.id; + const channelState = this.channelStateManager.getState(channelId); + + if (!channelState?.isActive) { + this.logger.warn( + `Attempted to play audio on inactive or unknown channel: ${channelId}`, + ); + return; + } + + const audio = `${this.audioPath}/${sessionId}`; + const playbackId = `${channelId}-${Date.now()}`; + + const playback = this.client.Playback(); + channelState.activePlayback = playback; + channelState.activePlaybackId = playbackId; + + playback.once('PlaybackFinished', async () => { + if (channelState.activePlaybackId !== playbackId) { + this.logger.log( + `PlaybackFinished for stale playback on channel: ${channelId}, ignoring`, + ); + return; + } + + channelState.activePlayback = null; + channelState.activePlaybackId = null; + + this.logger.log( + `PlaybackFinished for channel: ${channelId}, caller: ${channel?.caller?.number}`, + ); + + try { + if (channelState.isActive) { + await channel.hangup(); + } + } catch (err) { + this.logger.debug( + `Hangup after audio playback failed for channel ${channelId} (likely already gone): ${(err as Error).message}`, + ); + } + }); + + try { + await channel.play( + { playbackId: playbackId, media: `sound:${audio}` }, + playback, + ); + this.logger.log(`Playing recording for channel: ${channelId}`); + } catch (err) { + this.logger.error( + `Error starting audio playback on channel ${channelId}: ${(err as Error).message}`, + ); + channelState.activePlayback = null; + channelState.activePlaybackId = null; + } + } + + async playPrompt( + channelId: string, + media: string, + immediateHangup = false, + ) { + const channelState = this.channelStateManager.getState(channelId); + if (!channelState?.isActive) { + this.logger.warn( + `Attempted to play prompt on inactive or unknown channel: ${channelId}`, + ); + return; + } + + try { + await this.channelStateManager.stopActivePlayback(channelId); + this.channelStateManager.cancelScheduledHangup(channelId); + + const playback = this.client.Playback(); + const playbackId = playback.id; + channelState.activePlayback = playback; + channelState.activePlaybackId = playbackId; + + await this.client.channels.play({ + channelId: channelId, + playbackId: playbackId, + media: media, + }); + + this.logger.log(`Playback started: ${media} on channel: ${channelId}`); + + playback.once('PlaybackFinished', async () => { + if (channelState.activePlaybackId !== playbackId) { + this.logger.log( + `PlaybackFinished for stale playback on channel: ${channelId}, ignoring`, + ); + return; + } + + this.logger.log(`Playback finished on channel: ${channelId}`); + channelState.activePlayback = null; + channelState.activePlaybackId = null; + + if (immediateHangup) { + try { + await this.client.channels.hangup({ channelId }); + this.logger.log( + `Channel ${channelId} hung up after playback (hangup: true)`, + ); + } catch (err) { + this.logger.debug( + `Hangup after prompt failed for channel ${channelId} (likely already gone): ${(err as Error).message}`, + ); + } + // Cleanup will be triggered by StasisEnd event + } else if (channelState.isActive) { + this.channelStateManager.scheduleHangup(channelId, 10000); + } + }); + } catch (err) { + this.logger.error( + `Error playing prompt on channel ${channelId}: ${(err as Error).message}`, + ); + channelState.activePlayback = null; + channelState.activePlaybackId = null; + } + } +} diff --git a/apps/asterisk-worker/src/workers/types/ivr.types.ts b/apps/asterisk-worker/src/workers/types/ivr.types.ts new file mode 100644 index 0000000..e67abab --- /dev/null +++ b/apps/asterisk-worker/src/workers/types/ivr.types.ts @@ -0,0 +1,31 @@ +import { Playback } from 'ari-client'; + +export interface IVRMenuOption { + digit: number; + prompt?: string; + hangup?: boolean; + action?: string; + destination?: string; +} + +export interface IVRMenu { + prompt: string; + options: IVRMenuOption[]; +} + +export interface IVRDialPlan { + main: IVRMenu; + [key: string]: IVRMenu; +} + +export interface ChannelState { + channelId: string; + ivrDialPlan: IVRDialPlan | null; + sessionId: string; + broadcastLogId: string; + address: string; + activePlayback: Playback | null; + activePlaybackId: string | null; + hangupTimer: NodeJS.Timeout | null; + isActive: boolean; +} From 9ee9806fc490e90c5c2a597043c6ac55152ee4c3 Mon Sep 17 00:00:00 2001 From: manjik-rumsan Date: Mon, 27 Apr 2026 14:28:56 +0545 Subject: [PATCH 2/2] fixed reconnect issue Co-authored-by: Copilot --- .../src/workers/ami.service.ts | 81 ++++++++++++++++++- 1 file changed, 77 insertions(+), 4 deletions(-) diff --git a/apps/asterisk-worker/src/workers/ami.service.ts b/apps/asterisk-worker/src/workers/ami.service.ts index d103530..c219a42 100644 --- a/apps/asterisk-worker/src/workers/ami.service.ts +++ b/apps/asterisk-worker/src/workers/ami.service.ts @@ -1,4 +1,4 @@ -import { Inject, Injectable, Logger } from '@nestjs/common'; +import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common'; import { BatchManger, BroadcastLogQueue } from '@rsconnect/queue'; import { BroadcastStatus, @@ -19,14 +19,20 @@ const amiConfig = { trunk_max_channels: Number(process.env.ASTERISK_TRUNK_CHANNELS), }; +const RECONNECT_BASE_MS = 10_000; +const RECONNECT_MAX_MS = 60_000; + @Injectable() -export class AMIService { +export class AMIService implements OnModuleDestroy { private readonly logger = new Logger(AMIService.name); private ami: any; // BUG FIX: Per-channel DTMF tracking instead of a single shared array. // The old code used one ivrSequence[] for ALL concurrent calls, causing // jumbled reports where digits from different calls were mixed together. private ivrSequences = new Map(); + private reconnectAttempts = 0; + private reconnectTimer: NodeJS.Timeout | null = null; + private isDestroyed = false; constructor( @Inject('AMQP_CONNECTION') @@ -38,7 +44,18 @@ export class AMIService { } connect() { - console.log(amiConfig); + // Clean up the previous instance to prevent socket/listener leaks + if (this.ami) { + try { + this.ami.removeAllListeners(); + this.ami.disconnect(); + } catch (_) { + // ignore cleanup errors + } + this.ami = null; + } + + this.logger.log('Connecting to AMI...'); this.ami = new AsteriskManager( amiConfig.port, amiConfig.host, @@ -47,9 +64,32 @@ export class AMIService { true, ); - this.ami.keepConnected(); this.ami.action(); + this.ami.on('connect', () => { + this.logger.log('AMI connected and authenticated'); + this.reconnectAttempts = 0; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + }); + + this.ami.on('close', () => { + this.logger.warn('AMI connection closed'); + if (!this.isDestroyed) { + this.scheduleReconnect(); + } + }); + + this.ami.on('end', () => { + this.logger.warn('AMI connection ended'); + }); + + this.ami.on('error', (err: Error) => { + this.logger.error(`AMI connection error: ${err?.message ?? String(err)}`); + }); + this.ami.on('managerevent', async (evt) => { const eventType = evt.event; @@ -125,6 +165,39 @@ export class AMIService { } }); } + + private scheduleReconnect() { + if (this.reconnectTimer) return; // already scheduled + + this.reconnectAttempts += 1; + const delay = Math.min( + RECONNECT_BASE_MS * this.reconnectAttempts, + RECONNECT_MAX_MS, + ); + this.logger.warn( + `Scheduling AMI reconnect in ${delay / 1000}s (attempt #${this.reconnectAttempts})`, + ); + this.reconnectTimer = setTimeout(() => { + this.reconnectTimer = null; + this.connect(); + }, delay); + } + + onModuleDestroy() { + this.isDestroyed = true; + if (this.reconnectTimer) { + clearTimeout(this.reconnectTimer); + this.reconnectTimer = null; + } + if (this.ami) { + try { + this.ami.removeAllListeners(); + this.ami.disconnect(); + } catch (_) { + // ignore + } + } + } } // ======== DialState-Event ========