-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathrun_agent.py
More file actions
67 lines (51 loc) · 2.1 KB
/
run_agent.py
File metadata and controls
67 lines (51 loc) · 2.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
# Tencent is pleased to support the open source community by making tRPC-Agent-Python available.
#
# Copyright (C) 2026 Tencent. All rights reserved.
#
# tRPC-Agent-Python is licensed under Apache-2.0.
import asyncio
import uuid
from dotenv import load_dotenv
from trpc_agent_sdk.runners import Runner
from trpc_agent_sdk.sessions import InMemorySessionService
from trpc_agent_sdk.types import Content
from trpc_agent_sdk.types import Part
load_dotenv()
async def run_compose_agent():
    """Run the compose-agent demo once and print what each agent produced.

    Builds a ``Runner`` around ``root_agent`` with an in-memory session
    service, sends a single hard-coded product description as the user
    message under a freshly generated session id, and prints the text parts
    of every non-partial event that was not authored by ``"user"``. Text
    from the ``"report_generator"`` author is printed under a
    ``[Comprehensive Report]`` heading; all other authors are printed as
    ``[author] text``.
    """
    app_name = "compose_agent_demo"
    user_id = "demo_user"

    banner = "=" * 40
    print(banner)
    print("Compose Agent Demo - Combined Orchestration")
    print(banner)

    # Kept as a function-local import, as in the original.
    # NOTE(review): presumably deferred so load_dotenv() has already run
    # when the agent module is imported - confirm.
    from agent.agent import root_agent

    runner = Runner(
        app_name=app_name,
        agent=root_agent,
        session_service=InMemorySessionService(),
    )

    test_content = """
Smart Home Security System
Our AI-powered security system provides 24/7 monitoring with facial recognition,
motion detection, and mobile alerts. The system stores user data including video
recordings and personal information for security analysis.
Features:
- Real-time monitoring
- Mobile app notifications
- Cloud storage for recordings
- User data analytics
"""

    print("Analyze Content:")
    print(test_content.strip())
    print("\nRun Process:")

    # The un-stripped text is what gets sent to the agent.
    user_message = Content(parts=[Part.from_text(text=test_content)])
    event_stream = runner.run_async(
        user_id=user_id,
        session_id=str(uuid.uuid4()),
        new_message=user_message,
    )

    async for event in event_stream:
        # Skip events that carry no content parts.
        if not event.content or not event.content.parts:
            continue
        # Skip the user's own message and partial events.
        if event.author == "user" or event.partial:
            continue

        for part in event.content.parts:
            if not part.text:
                continue
            if event.author == "report_generator":
                print("\n[Comprehensive Report]")
                print(part.text)
            else:
                print(f"[{event.author}] {part.text}")

    # NOTE(review): the source's indentation was lost; this separator is
    # assumed to be printed once, after the event stream is exhausted - confirm.
    print("-" * 30)
# Script entry point: run the demo coroutine to completion on a new event loop.
if __name__ == "__main__":
    demo_coroutine = run_compose_agent()
    asyncio.run(demo_coroutine)