-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimple_multi_example.py
More file actions
143 lines (110 loc) · 4.26 KB
/
simple_multi_example.py
File metadata and controls
143 lines (110 loc) · 4.26 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import logging
from dataclasses import dataclass
from dotenv import load_dotenv
from livekit import api
from livekit.agents import (
Agent,
AgentServer,
AgentSession,
ChatContext,
JobContext,
JobProcess,
RunContext,
cli,
inference,
metrics,
)
from livekit.agents.job import get_job_context
from livekit.agents.llm import function_tool
from livekit.agents.voice import MetricsCollectedEvent
from livekit.plugins import silero # only VAD plugin used
logger = logging.getLogger("multi-agent")
load_dotenv()
common_instructions = (
"Your name is Echo. You are a story teller that interacts with the user via voice. "
"You are curious and friendly, with a sense of humor."
)
@dataclass
class StoryData:
name: str | None = None
location: str | None = None
class IntroAgent(Agent):
def __init__(self) -> None:
super().__init__(
instructions=(
f"{common_instructions} Your goal is to gather a few pieces of "
"information from the user to make the story personalized and engaging. "
"Ask the user for their name and where they are from. "
"Start the conversation with a short introduction."
)
)
async def on_enter(self):
self.session.generate_reply()
@function_tool
async def information_gathered(
self,
context: RunContext[StoryData],
name: str,
location: str,
):
"""Called when the user has provided name + location."""
context.userdata.name = name
context.userdata.location = location
story_agent = StoryAgent(name, location)
logger.info("switching to the story agent with userdata: %s", context.userdata)
return story_agent
class StoryAgent(Agent):
def __init__(self, name: str, location: str, *, chat_ctx: ChatContext | None = None) -> None:
super().__init__(
instructions=(
f"{common_instructions} Use the user's information to make the story personalized. "
"Create the entire story, weaving in elements of their information, and make it "
"interactive by occasionally asking the user questions. "
"Do not end on a statement where the user is not expected to respond. "
"When interrupted, ask if the user would like to continue or end. "
f"The user's name is {name}, from {location}."
),
# IMPORTANT: do NOT set llm/tts here unless you want to override the session defaults.
# We rely on the AgentSession's inference LLM/TTS for a provider-free setup.
chat_ctx=chat_ctx,
)
async def on_enter(self):
self.session.generate_reply()
@function_tool
async def story_finished(self, context: RunContext[StoryData]):
"""End the conversation and delete the room."""
self.session.interrupt()
await self.session.generate_reply(
instructions=f"Say goodbye to {context.userdata.name}.",
allow_interruptions=False,
)
job_ctx = get_job_context()
await job_ctx.api.room.delete_room(api.DeleteRoomRequest(room=job_ctx.room.name))
server = AgentServer()
def prewarm(proc: JobProcess):
proc.userdata["vad"] = silero.VAD.load()
server.setup_fnc = prewarm
@server.rtc_session()
async def entrypoint(ctx: JobContext):
session = AgentSession[StoryData](
vad=ctx.proc.userdata["vad"],
# LiveKit Inference (no OpenAI/Deepgram/Cartesia keys needed)
stt=inference.STT("deepgram/nova-3", language="multi"),
llm=inference.LLM("openai/gpt-4.1-mini"),
tts=inference.TTS(
"cartesia/sonic-3",
voice="9626c31c-bec5-4cca-baa8-f8ba9e84c8bc",
),
userdata=StoryData(),
)
usage_collector = metrics.UsageCollector()
@session.on("metrics_collected")
def _on_metrics_collected(ev: MetricsCollectedEvent):
metrics.log_metrics(ev.metrics)
usage_collector.collect(ev.metrics)
async def log_usage():
logger.info("Usage: %s", usage_collector.get_summary())
ctx.add_shutdown_callback(log_usage)
await session.start(agent=IntroAgent(), room=ctx.room)
if __name__ == "__main__":
cli.run_app(server)