Skip to content

Can I answer a call if I don't have "Event Grid" service and a "Registered Phone Number"? #81

@ANYMS-A

Description

@ANYMS-A

Hi,
Recently I am trying to build a voice AI agent with ACS. I tried to follow the intro from this example: https://github.com/Azure-Samples/communication-services-python-quickstarts/tree/main/callautomation-azure-openai-voice

However, I saw in the README, this example need a registered Phone number. But I thougt I can call with the created ACS user name right?
Besides, I don't have an Event Grid subscription yet. I only deploy my app to a Azure App Service to verify if it can work.

But I can only recieve the following events:

INFO:     xxx:xxx:xxx:xxxx- "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CallDisconnected, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abd
INFO:     xxx:xxx:xxx:xxxx - "POST /api/callbacks/incomingCall HTTP/1.1" 200 OK
INFO:main:Received Event:-> Microsoft.Communication.CreateCallFailed, Correlation Id:-> 447dd7fd-0b4b-4279-b609-08792e7295cd, CallConnectionId:-> 07004c80-34ed-4107-bfbe-c93d89e77abd

I will paste my code below:

from fastapi.responses import JSONResponse
from urllib.parse import urlencode, urlparse, urlunparse
import logging
import json
from azure.communication.callautomation import (
    MediaStreamingOptions,
    AudioFormat,
    MediaStreamingContentType,
    MediaStreamingAudioChannelType,
    StreamingTransportType
    )
from azure.communication.callautomation.aio import (
    CallAutomationClient
    )
import os
import uuid
import uvicorn
from dotenv import load_dotenv


load_dotenv()

ACS_HOST = os.environ.get("ACS_HOST")
ACS_KEY = os.environ.get("ACS_KEY")
# Callback events URI to handle callback events.
CALLBACK_URI_HOST = os.environ.get("CALLBACK_URI_HOST")
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"

ONE_ON_ONE_CALLER_MAP = {}

acs_client = CallAutomationClient(ACS_HOST, ACS_KEY)
app = FastAPI()

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


@app.get("/api/getOneOnOneCallUser")
async def get_one_on_one_call_user():

    try:
        # Create an identity
        identity = acs_client.create_user()
        logging.info("\nCreated an caller identity with ID: " + identity.properties['id'])
        # Issue an access token with a validity of 24 hours and the "voip" scope for an identity
        token_result = acs_client.get_token(identity, ["voip"])
        logging.info("\nIssued an caller access token with 'voip' scope that expires at " + token_result.expires_on + ":")
        logging.info(token_result.token)

        # Create an identity
        identity_acc = acs_client.create_user()
        logging.info("\nCreated an accepter identity with ID: " + identity_acc.properties['id'])
        # Issue an access token with a validity of 24 hours and the "voip" scope for an identity
        token_result_acc = acs_client.get_token(identity_acc, ["voip"])
        logging.info("\nIssued an accepter access token with 'voip' scope that expires at " + token_result_acc.expires_on + ":")
        logging.info(token_result_acc.token)
        id_str = uuid.uuid4()
        data = {
            "caller": {
                "identity": identity,
                "token": token_result
            },
            "accepter": {
                "identity": identity_acc,
                "token": token_result_acc
            }
        }
        ONE_ON_ONE_CALLER_MAP[id_str] = data
        return JSONResponse(content=data, status_code=200)
    except Exception as e:
        logger.error(f"Error creating users or tokens: {e}")
        return JSONResponse(content={"error": "Failed to create users or tokens"}, status_code=500)


@app.post('/api/callbacks/incomingCall')
async def incoming_call_callbacks(request: Request):
    event_list = await request.json()
    for event in event_list:
        # Parsing callback events
        global call_connection_id
        event_data = event['data']
        call_connection_id = event_data["callConnectionId"]
        logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
        if event['type'] == "Microsoft.Communication.IncomingCall":
            logger.info(f"Received IncomingCall event for connection id: {call_connection_id}")
            logger.info("Start answering call")
            incoming_call_response = await incoming_call_handler(event)
            return incoming_call_response
    return JSONResponse(content={}, status_code=200)

@app.post('/api/callbacks/{contextId}')
async def callbacks(contextId: str, request: Request):
     event_list = await request.json()
     for event in event_list:
        # Parsing callback events
        global call_connection_id
        event_data = event['data']
        call_connection_id = event_data["callConnectionId"]
        logger.info(f"Received Event:-> {event['type']}, Correlation Id:-> {event_data['correlationId']}, CallConnectionId:-> {call_connection_id}")
        if event['type'] == "Microsoft.Communication.CallConnected":
            call_connection_properties = await acs_client.get_call_connection(call_connection_id).get_call_properties()
            media_streaming_subscription = call_connection_properties.media_streaming_subscription
            logger.info(f"MediaStreamingSubscription:--> {media_streaming_subscription}")
            logger.info(f"Received CallConnected event for connection id: {call_connection_id}")
            logger.info("CORRELATION ID:--> %s", event_data["correlationId"])
            logger.info("CALL CONNECTION ID:--> %s", event_data["callConnectionId"])
        elif event['type'] == "Microsoft.Communication.MediaStreamingStarted":
            logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingStopped":
            logger.info(f"Media streaming content type:--> {event_data['mediaStreamingUpdate']['contentType']}")
            logger.info(f"Media streaming status:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatus']}")
            logger.info(f"Media streaming status details:--> {event_data['mediaStreamingUpdate']['mediaStreamingStatusDetails']}")
        elif event['type'] == "Microsoft.Communication.MediaStreamingFailed":
            logger.info(f"Code:->{event_data['resultInformation']['code']}, Subcode:-> {event_data['resultInformation']['subCode']}")
            logger.info(f"Message:->{event_data['resultInformation']['message']}")
        elif event['type'] == "Microsoft.Communication.CallDisconnected":
            logger.info(f"Received CallDisconnected event for connection id: {call_connection_id}")

     return JSONResponse(content={}, status_code=200)

# WebSocket.
@app.websocket('/ws')
async def ws(websocket: WebSocket):
    await websocket.accept()
    print("Client connected to WebSocket")
    try:
        while True:
            # Receive data from the client
            data = await websocket.receive_text()
            print(f"Received data: {data}")
            data = json.loads(data)
            kind = data['kind']
            if kind == "AudioData":
                audio_data_section = data.get("audioData", {})
                if not audio_data_section.get("silent", True):
                    audio_data = audio_data_section.get("data")
                    logging.info(f"Received audio data, type {type(data)}, length: {len(audio_data)}")
    except WebSocketDisconnect:
        print("WebSocket connection closed")
    except Exception as e:
        print(f"WebSocket error: {e}")


@app.get('/')
def home():
    return 'Hello ACS CallAutomation!'


async def incoming_call_handler(event: dict):
    caller_id =  event["data"]['from']['rawId'] 
    logger.info("incoming call handler caller id: %s", caller_id)
    incoming_call_context=event.data['incomingCallContext']
    logging.info(f"incoming call context:{incoming_call_context}")
    guid =uuid.uuid4()
    query_parameters = urlencode({"callerId": caller_id})
    callback_uri = f"{CALLBACK_EVENTS_URI}/{guid}?{query_parameters}"
    
    parsed_url = urlparse(CALLBACK_EVENTS_URI)
    websocket_url = urlunparse(('wss',parsed_url.netloc,'/ws','', '', ''))

    logger.info("callback url: %s",  callback_uri)
    logger.info("websocket url: %s",  websocket_url)

    media_streaming_options = MediaStreamingOptions(
            transport_url=websocket_url,
            transport_type=StreamingTransportType.WEBSOCKET,
            content_type=MediaStreamingContentType.AUDIO,
            audio_channel_type=MediaStreamingAudioChannelType.MIXED,
            start_media_streaming=True,
            enable_bidirectional=True,
            audio_format=AudioFormat.PCM24_K_MONO)
    
    answer_call_result = await acs_client.answer_call(
        incoming_call_context=incoming_call_context,
        operation_context="incomingCall",
        callback_url=callback_uri, 
        media_streaming=media_streaming_options
    )
    logger.info("Answered call for connection id: %s", answer_call_result.call_connection_id)
    return JSONResponse(content={}, status_code=200)

if __name__ == '__main__':
    uvicorn.run(app, host="0.0.0.0", port=8000)

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type
    No fields configured for issues without a type.

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions