Skip to content
Merged

Dev #90

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 103 additions & 12 deletions apps/asterisk-worker/src/workers/ami.service.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Inject, Injectable, Logger } from '@nestjs/common';
import { Inject, Injectable, Logger, OnModuleDestroy } from '@nestjs/common';
import { BatchManger, BroadcastLogQueue } from '@rsconnect/queue';
import {
BroadcastStatus,
Expand All @@ -19,10 +19,20 @@ const amiConfig = {
trunk_max_channels: Number(process.env.ASTERISK_TRUNK_CHANNELS),
};

const RECONNECT_BASE_MS = 10_000;
const RECONNECT_MAX_MS = 60_000;

@Injectable()
export class AMIService {
export class AMIService implements OnModuleDestroy {
private readonly logger = new Logger(AMIService.name);
private ami: any;
// BUG FIX: Per-channel DTMF tracking instead of a single shared array.
// The old code used one ivrSequence[] for ALL concurrent calls, causing
// jumbled reports where digits from different calls were mixed together.
private ivrSequences = new Map<string, string[]>();
private reconnectAttempts = 0;
private reconnectTimer: NodeJS.Timeout | null = null;
private isDestroyed = false;

constructor(
@Inject('AMQP_CONNECTION')
Expand All @@ -34,8 +44,18 @@ export class AMIService {
}

connect() {
console.log(amiConfig);
const ivrSequence: string[] = [];
// Clean up the previous instance to prevent socket/listener leaks
if (this.ami) {
try {
this.ami.removeAllListeners();
this.ami.disconnect();
} catch (_) {
// ignore cleanup errors
}
this.ami = null;
}

this.logger.log('Connecting to AMI...');
this.ami = new AsteriskManager(
amiConfig.port,
amiConfig.host,
Expand All @@ -44,22 +64,46 @@ export class AMIService {
true,
);

this.ami.keepConnected();
this.ami.action();

this.ami.on('connect', () => {
this.logger.log('AMI connected and authenticated');
this.reconnectAttempts = 0;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
});

this.ami.on('close', () => {
this.logger.warn('AMI connection closed');
if (!this.isDestroyed) {
this.scheduleReconnect();
}
});

this.ami.on('end', () => {
this.logger.warn('AMI connection ended');
});

this.ami.on('error', (err: Error) => {
this.logger.error(`AMI connection error: ${err?.message ?? String(err)}`);
});

this.ami.on('managerevent', async (evt) => {
const eventType = evt.event;
// console.log('=====', eventType);
// console.log(evt);

if (eventType === 'Hangup') {
//console.log('Hangup Event', evt);
const disposition = getAsteriskDisposition(evt.cause, evt.channelstate);
const broadcastLog: QueueBroadcastLogVoice = this.batchManager.getLog(
evt.uniqueid,
);

if (broadcastLog) {
// Retrieve the per-channel DTMF sequence and clean up
const ivrSequence = this.ivrSequences.get(evt.uniqueid) || [];
this.ivrSequences.delete(evt.uniqueid);

broadcastLog.status =
disposition === CallDisposition.ANSWERED
? BroadcastStatus.SUCCESS
Expand All @@ -68,13 +112,16 @@ export class AMIService {
trunk: amiConfig.trunk,
disposition,
hangupDetails: evt,
ivrSequence,
ivrSequence: [...ivrSequence], // snapshot copy
};
await this.broadcastLogQueue.addVoice(broadcastLog);
await this.batchManager.endMonitoring(evt.uniqueid);
this.logger.log(`Call Hangup: ${evt.uniqueid}`);
ivrSequence.length = 0;
this.logger.log(
`Call Hangup: ${evt.uniqueid}, DTMF sequence: [${ivrSequence.join(',')}]`,
);
} else {
// Not a tracked call — clean up any stale DTMF data just in case
this.ivrSequences.delete(evt.uniqueid);
console.log('=====> Call Received: ', evt.calleridnum);
}
}
Expand Down Expand Up @@ -103,10 +150,54 @@ export class AMIService {
}

if (eventType === 'DTMFEnd') {
ivrSequence.push(evt.digit);
// Track DTMF digit per-channel using uniqueid
const uniqueid = evt.uniqueid;
if (!this.ivrSequences.has(uniqueid)) {
this.ivrSequences.set(uniqueid, []);
}
const sequence = this.ivrSequences.get(uniqueid);
if (sequence) {
sequence.push(evt.digit);
}
this.logger.debug(
`DTMF digit '${evt.digit}' recorded for channel ${uniqueid}`,
);
}
});
}

private scheduleReconnect() {
if (this.reconnectTimer) return; // already scheduled

this.reconnectAttempts += 1;
const delay = Math.min(
RECONNECT_BASE_MS * this.reconnectAttempts,
RECONNECT_MAX_MS,
);
this.logger.warn(
`Scheduling AMI reconnect in ${delay / 1000}s (attempt #${this.reconnectAttempts})`,
);
this.reconnectTimer = setTimeout(() => {
this.reconnectTimer = null;
this.connect();
}, delay);
}

onModuleDestroy() {
this.isDestroyed = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.ami) {
try {
this.ami.removeAllListeners();
this.ami.disconnect();
} catch (_) {
// ignore
}
}
}
}

// ======== DialState-Event ========
Expand Down
6 changes: 5 additions & 1 deletion apps/asterisk-worker/src/workers/asterisk.module.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { SessionModel } from '../entities/session.entity';
import { AMIService } from './ami.service';
import { AsteriskWorker } from './asterisk.worker';
import { AudioService } from './audio.service';
import { ChannelStateManager } from './channel-state.manager';
import { IVRService } from './ivr.service';
import { PlaybackService } from './playback.service';

@Module({
imports: [
Expand All @@ -20,7 +22,9 @@ import { IVRService } from './ivr.service';
AudioService,
AMIService,
BatchManger,
IVRService
ChannelStateManager,
PlaybackService,
IVRService,
],
exports: [AsteriskWorker, AMIService],
})
Expand Down
153 changes: 153 additions & 0 deletions apps/asterisk-worker/src/workers/channel-state.manager.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
import { Injectable, Logger } from '@nestjs/common';
import { Client } from 'ari-client';
import { ChannelState, IVRDialPlan } from './types/ivr.types';

@Injectable()
export class ChannelStateManager {
private readonly logger = new Logger(ChannelStateManager.name);
private client: Client;
private channelStates = new Map<string, ChannelState>();

setClient(client: Client) {
this.client = client;
}

registerChannel(params: {
channelId: string;
ivrDialPlan: IVRDialPlan | null;
sessionId: string;
broadcastLogId: string;
address: string;
}): ChannelState {
const channelState: ChannelState = {
channelId: params.channelId,
ivrDialPlan: params.ivrDialPlan,
sessionId: params.sessionId,
broadcastLogId: params.broadcastLogId,
address: params.address,
activePlayback: null,
activePlaybackId: null,
hangupTimer: null,
isActive: true,
};

this.channelStates.set(params.channelId, channelState);
this.logger.log(
`Channel registered: ${params.channelId}, Address: ${params.address}`,
);
return channelState;
}

getState(channelId: string): ChannelState | undefined {
return this.channelStates.get(channelId);
}

hasChannel(channelId: string): boolean {
return this.channelStates.has(channelId);
}

removeChannel(channelId: string) {
this.channelStates.delete(channelId);
}

async stopActivePlayback(channelId: string) {
const channelState = this.channelStates.get(channelId);
if (!channelState?.activePlayback) {
return;
}

const oldPlaybackId = channelState.activePlaybackId;
channelState.activePlaybackId = null;

try {
await channelState.activePlayback.stop();
this.logger.log(
`Stopped active playback ${oldPlaybackId} on channel: ${channelId}`,
);
} catch (err) {
// Playback may already be finished or channel gone — not an error
this.logger.debug(
`Could not stop playback ${oldPlaybackId} on channel ${channelId}: ${(err as Error).message}`,
);
} finally {
channelState.activePlayback = null;
}
}

scheduleHangup(channelId: string, delay: number) {
const channelState = this.channelStates.get(channelId);
if (!channelState?.isActive) {
return;
}

this.cancelScheduledHangup(channelId);

const timer = setTimeout(async () => {
const currentState = this.channelStates.get(channelId);
if (!currentState?.isActive) {
return;
}

try {
this.logger.log(`Scheduled hangup firing for channel: ${channelId}`);
await this.client.channels.hangup({ channelId });
this.logger.log(`Channel ${channelId} successfully hung up.`);
} catch (err) {
this.logger.debug(
`Hangup failed for channel ${channelId} (likely already gone): ${(err as Error).message}`,
);
}
// Cleanup will be triggered by StasisEnd event
}, delay);

channelState.hangupTimer = timer;
this.logger.log(
`Scheduled hangup for channel ${channelId} in ${delay / 1000}s`,
);
}

cancelScheduledHangup(channelId: string) {
const channelState = this.channelStates.get(channelId);
if (!channelState?.hangupTimer) {
return;
}

clearTimeout(channelState.hangupTimer);
channelState.hangupTimer = null;
this.logger.log(`Cancelled scheduled hangup for channel: ${channelId}`);
}

async cleanupChannel(channelId: string) {
const channelState = this.channelStates.get(channelId);
if (!channelState) {
return; // Already cleaned up — idempotent guard
}

// Mark inactive first to prevent new operations
channelState.isActive = false;

// Remove from map immediately to prevent re-entrant cleanup
this.channelStates.delete(channelId);

await this.stopActivePlayback(channelId);

// stopActivePlayback won't find it in the map anymore since we already deleted,
// so stop it directly from the state we captured
if (channelState.activePlayback) {
try {
channelState.activePlaybackId = null;
await channelState.activePlayback.stop();
} catch {
// already gone
}
channelState.activePlayback = null;
}

if (channelState.hangupTimer) {
clearTimeout(channelState.hangupTimer);
channelState.hangupTimer = null;
}

this.logger.log(`Cleaned up resources for channel: ${channelId}`);
}
}
Loading
Loading