Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 79 additions & 4 deletions apps/asterisk-worker/src/workers/ivr.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import {
import { Broadcast, QueueBroadcastLog } from '@rumsan/connect/types';
import ari, { Channel, Client } from 'ari-client';
import { randomUUID } from 'crypto';
import { EventEmitter } from 'events';
import { ChannelStateManager } from './channel-state.manager';
import { PlaybackService } from './playback.service';

Expand All @@ -19,6 +20,9 @@ export class IVRService implements OnModuleInit, OnModuleDestroy {
private readonly logger = new Logger(IVRService.name);
private client: Client;
private config;
private isConnected = false;
private isShuttingDown = false;
private reconnectTimer: NodeJS.Timeout | null = null;

constructor(
private readonly batchManager: BatchManager,
Expand Down Expand Up @@ -51,6 +55,10 @@ export class IVRService implements OnModuleInit, OnModuleDestroy {
broadcastLog: QueueBroadcastLog,
ivrJSON?: string,
) {
if (!this.isConnected) {
throw new Error('ARI not connected — call will be retried');
}

const ivrDialPlan = ivrJSON ? JSON.parse(ivrJSON) : null;

// BUG FIX: Pre-generate channelId and register state BEFORE originate
Expand Down Expand Up @@ -115,6 +123,29 @@ export class IVRService implements OnModuleInit, OnModuleDestroy {
this.client = await ari.connect(server, user, password);
await this.client.start(appName);

this.isConnected = true;

// ari-client has built-in WebSocket retry (up to 10 attempts).
// Track connectivity so we can guard outbound calls during transient drops.
this.client.on('WebSocketReconnecting', () => {
this.isConnected = false;
this.logger.warn('ARI WebSocket reconnecting...');
});

this.client.on('WebSocketConnected', () => {
this.isConnected = true;
this.logger.log('ARI WebSocket reconnected');
});

// When ari-client exhausts all built-in retries, start our own reconnect loop.
this.client.once('WebSocketMaxRetries', () => {
this.isConnected = false;
this.logger.error(
'ARI WebSocket max retries exceeded — scheduling full reconnect',
);
this.scheduleReconnect(1);
});

// Share the ARI client with dependent services
this.channelStateManager.setClient(this.client);
this.playbackService.setClient(this.client);
Expand All @@ -126,10 +157,7 @@ export class IVRService implements OnModuleInit, OnModuleDestroy {
}
}

async onModuleInit() {
this.logger.log('Module Init');
await this.connect();

private setupEventHandlers() {
// Handle StasisStart - when a channel enters the Stasis application
this.client.on('StasisStart', async (event, incomingChannel) => {
try {
Expand Down Expand Up @@ -211,9 +239,56 @@ export class IVRService implements OnModuleInit, OnModuleDestroy {
});
}

private scheduleReconnect(attempt: number) {
if (this.isShuttingDown) return;
const delay = Math.min(5000 * attempt, 60_000);
this.logger.warn(
`ARI disconnected — reconnecting in ${delay / 1000}s (attempt ${attempt})`,
);
this.reconnectTimer = setTimeout(
() => this.attemptReconnect(attempt),
delay,
);
}

private async attemptReconnect(attempt: number) {
if (this.isShuttingDown) return;
this.logger.log(`Attempting ARI full reconnect (attempt ${attempt})`);
try {
// Clean up the dead client before creating a new one
if (this.client) {
(this.client as unknown as EventEmitter).removeAllListeners();
try { this.client.stop(); } catch (_) { /* ignore */ }
}
await this.connect();
this.setupEventHandlers();
this.logger.log(
`ARI full reconnect succeeded on attempt ${attempt}`,
);
} catch (error) {
this.logger.error(
`ARI reconnect attempt ${attempt} failed:`,
error,
);
this.scheduleReconnect(attempt + 1);
}
}

async onModuleInit() {
this.logger.log('Module Init');
await this.connect();
this.setupEventHandlers();
}

async onModuleDestroy() {
this.isShuttingDown = true;
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
if (this.client) {
this.logger.log('Stopping ARI client');
(this.client as unknown as EventEmitter).removeAllListeners();
this.client.stop();
}
}
Expand Down
Loading