Skip to content

Commit 33cfa00

Browse files
InnexphilhetzeldanielericleeAbhiPrasad
authored
feat: Add Agno Workflow tracing support (#46)
Co-authored-by: Hossein Niazmandi <hossein@braintrustdata.com> Co-authored-by: Phil Hetzel <phetzel1@protonmail.com> Co-authored-by: Dan Lee <dan@braintrustdata.com> Co-authored-by: Abhijeet Prasad <abhijeet@braintrustdata.com>
1 parent eb53011 commit 33cfa00

8 files changed

Lines changed: 1239 additions & 212 deletions

File tree

py/noxfile.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,9 @@ def test_agno(session, version):
143143
_install_test_deps(session)
144144
_install(session, "agno", version)
145145
_install(session, "openai") # Required for agno.models.openai
146-
_run_tests(session, f"{WRAPPER_DIR}/test_agno.py")
146+
_install(session, "fastapi") # Required for agno.workflow
147+
_run_tests(session, f"{WRAPPER_DIR}/agno/test_agno.py")
148+
_run_tests(session, f"{WRAPPER_DIR}/agno/test_workflow.py")
147149
_run_core_tests(session)
148150

149151

py/src/braintrust/wrappers/agno/__init__.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
response = agent.run(...)
1919
"""
2020

21-
__all__ = ["setup_agno", "wrap_agent", "wrap_function_call", "wrap_model", "wrap_team"]
21+
__all__ = ["setup_agno", "wrap_agent", "wrap_function_call", "wrap_model", "wrap_team", "wrap_workflow"]
2222

2323
import logging
2424

@@ -28,6 +28,7 @@
2828
from .function_call import wrap_function_call
2929
from .model import wrap_model
3030
from .team import wrap_team
31+
from .workflow import wrap_workflow
3132

3233
logger = logging.getLogger(__name__)
3334

@@ -61,7 +62,16 @@ def setup_agno(
6162
team.Team = wrap_team(team.Team) # pyright: ignore[reportUnknownMemberType]
6263
models.base.Model = wrap_model(models.base.Model) # pyright: ignore[reportUnknownMemberType]
6364
tools.function.FunctionCall = wrap_function_call(tools.function.FunctionCall) # pyright: ignore[reportUnknownMemberType]
64-
return True
6565
except ImportError:
6666
# Not installed - this is expected when using auto_instrument()
6767
return False
68+
69+
try:
70+
from agno import workflow # pyright: ignore
71+
72+
workflow.Workflow = wrap_workflow(workflow.Workflow) # pyright: ignore[reportUnknownMemberType]
73+
except ImportError:
74+
# agno.workflow requires fastapi which may not be installed
75+
pass
76+
77+
return True
Lines changed: 308 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,308 @@
1+
# pyright: reportMissingParameterType=false
2+
# pyright: reportUnknownMemberType=false
3+
# pyright: reportUnknownParameterType=false
4+
# pyright: reportUnknownVariableType=false
5+
6+
from inspect import isawaitable
7+
8+
from braintrust.wrappers.agno.agent import wrap_agent
9+
10+
PROJECT_NAME = "test-agno-app"
11+
12+
13+
class FakeMetrics:
14+
def __init__(self):
15+
self.input_tokens = 1
16+
self.output_tokens = 2
17+
self.total_tokens = 3
18+
self.duration = 0.1
19+
self.time_to_first_token = 0.01
20+
21+
22+
class FakeRunOutput:
23+
def __init__(self, content: str):
24+
self.content = content
25+
self.status = "COMPLETED"
26+
self.model = "fake-model"
27+
self.model_provider = "FakeProvider"
28+
self.metrics = FakeMetrics()
29+
30+
31+
class FakeEvent:
32+
def __init__(self, event: str, **kwargs):
33+
self.event = event
34+
for k, v in kwargs.items():
35+
setattr(self, k, v)
36+
37+
38+
class FakeExecutionInput:
39+
def __init__(self, input):
40+
self.input = input
41+
self.kind = "workflow-execution"
42+
43+
44+
class FakeWorkflowRunResponse:
45+
def __init__(self, input=None, content: str | None = None):
46+
self.input = input
47+
self.content = content
48+
self.status = "COMPLETED"
49+
self.metrics = FakeMetrics()
50+
51+
52+
def make_fake_workflow(name: str):
53+
class FakeWorkflow:
54+
def __init__(self):
55+
self.name = name
56+
self.steps = ["first-step"]
57+
58+
async def _aexecute(self, session_id, user_id, execution_input, workflow_run_response, run_context=None):
59+
return FakeWorkflowRunResponse(input=execution_input.input, content="workflow-async")
60+
61+
def _execute_stream(self, session, execution_input, workflow_run_response, run_context=None):
62+
yield FakeEvent("WorkflowStarted", content=None)
63+
yield FakeEvent("StepStarted", content=None)
64+
yield FakeEvent("StepCompleted", content="hello ")
65+
yield FakeEvent("WorkflowCompleted", content="world", metrics=FakeMetrics(), status="COMPLETED")
66+
67+
return FakeWorkflow
68+
69+
70+
def make_fake_duplicate_content_workflow(name: str):
71+
class FakeWorkflow:
72+
def __init__(self):
73+
self.name = name
74+
self.steps = ["first-step"]
75+
76+
def _execute_stream(self, session, execution_input, workflow_run_response, run_context=None):
77+
yield FakeEvent("StepCompleted", content="hello")
78+
yield FakeEvent("WorkflowCompleted", content="hello", metrics=FakeMetrics(), status="COMPLETED")
79+
80+
return FakeWorkflow
81+
82+
83+
def make_fake_streaming_workflow_with_mutated_run_response(name: str):
84+
class FakeWorkflow:
85+
def __init__(self):
86+
self.name = name
87+
self.steps = ["first-step"]
88+
89+
def _execute_stream(self, session, execution_input, workflow_run_response, run_context=None):
90+
yield FakeEvent("WorkflowStarted", content=None)
91+
yield FakeEvent("StepCompleted", content="hello ")
92+
workflow_run_response.content = "world"
93+
workflow_run_response.status = "FAILED"
94+
workflow_run_response.metrics = FakeMetrics()
95+
yield FakeEvent("WorkflowCompleted", content="world")
96+
97+
return FakeWorkflow
98+
99+
100+
def make_fake_workflow_with_async_agent(name: str, agent_name: str):
101+
class FakeAgent:
102+
def __init__(self):
103+
self.name = agent_name
104+
105+
async def arun(self, input, stream=False, **kwargs):
106+
return FakeRunOutput(f"{input}-async")
107+
108+
WrappedAgent = wrap_agent(FakeAgent)
109+
110+
class FakeWorkflow:
111+
def __init__(self):
112+
self.name = name
113+
self.id = "workflow-123"
114+
self.steps = ["agent-step"]
115+
self.agent = WrappedAgent()
116+
117+
async def _aexecute(self, session_id, user_id, execution_input, workflow_run_response, run_context=None):
118+
return await self.agent.arun(execution_input.input)
119+
120+
return FakeWorkflow
121+
122+
123+
def make_fake_workflow_agent_path(name: str):
124+
class FakeWorkflow:
125+
def __init__(self):
126+
self.name = name
127+
self.id = "workflow-agent-123"
128+
self.steps = ["agent-step"]
129+
130+
def _execute_workflow_agent(self, user_input, session, execution_input, run_context, stream=False, **kwargs):
131+
if stream:
132+
def _stream():
133+
yield FakeEvent("WorkflowStarted")
134+
yield FakeEvent(
135+
"WorkflowCompleted",
136+
content=f"{user_input}-sync-stream",
137+
metrics=FakeMetrics(),
138+
status="COMPLETED",
139+
)
140+
141+
return _stream()
142+
return FakeRunOutput(f"{user_input}-sync")
143+
144+
async def _aexecute_workflow_agent(self, user_input, run_context, execution_input, stream=False, **kwargs):
145+
if stream:
146+
147+
async def _astream():
148+
yield FakeEvent("WorkflowStarted")
149+
yield FakeEvent(
150+
"WorkflowCompleted",
151+
content=f"{user_input}-async-stream",
152+
metrics=FakeMetrics(),
153+
status="COMPLETED",
154+
)
155+
156+
return _astream()
157+
return FakeRunOutput(f"{user_input}-async")
158+
159+
return FakeWorkflow
160+
161+
162+
def make_fake_component(name: str):
163+
class FakeComponent:
164+
def __init__(self):
165+
self.name = name
166+
167+
def run(self, input, stream=False, **kwargs):
168+
if stream:
169+
170+
def _stream():
171+
yield FakeEvent("RunStarted", model="fake-model", model_provider="FakeProvider")
172+
yield FakeEvent("RunContent", content=f"{input}-sync")
173+
yield FakeEvent("RunCompleted", metrics=FakeMetrics())
174+
175+
return _stream()
176+
return FakeRunOutput(f"{input}-sync")
177+
178+
def arun(self, input, stream=False, **kwargs):
179+
if stream:
180+
181+
async def _astream():
182+
yield FakeEvent("RunStarted", model="fake-model", model_provider="FakeProvider")
183+
yield FakeEvent("RunContent", content=f"{input}-async")
184+
yield FakeEvent("RunCompleted", metrics=FakeMetrics())
185+
186+
return _astream()
187+
188+
async def _result():
189+
return FakeRunOutput(f"{input}-async")
190+
191+
return _result()
192+
193+
return FakeComponent
194+
195+
196+
def make_fake_async_dispatch_component(name: str):
197+
class FakeComponent:
198+
def __init__(self):
199+
self.name = name
200+
201+
async def arun(self, input, stream=False, **kwargs):
202+
if stream:
203+
204+
async def _astream():
205+
yield FakeEvent("RunStarted", model="fake-model", model_provider="FakeProvider")
206+
yield FakeEvent("RunContent", content=f"{input}-awaited-async")
207+
yield FakeEvent("RunCompleted", metrics=FakeMetrics())
208+
209+
return _astream()
210+
return {"content": f"{input}-awaited-async"}
211+
212+
return FakeComponent
213+
214+
215+
def make_fake_error_component(name: str):
216+
class FakeComponent:
217+
def __init__(self):
218+
self.name = name
219+
220+
def run(self, input, stream=False, **kwargs):
221+
if stream:
222+
223+
def _stream():
224+
yield FakeEvent("RunStarted", model="fake-model", model_provider="FakeProvider")
225+
raise RuntimeError("sync-stream-error")
226+
227+
return _stream()
228+
return FakeRunOutput(f"{input}-sync")
229+
230+
def arun(self, input, stream=False, **kwargs):
231+
if stream:
232+
233+
async def _astream():
234+
yield FakeEvent("RunStarted", model="fake-model", model_provider="FakeProvider")
235+
raise RuntimeError("async-stream-error")
236+
237+
return _astream()
238+
239+
async def _result():
240+
return FakeRunOutput(f"{input}-async")
241+
242+
return _result()
243+
244+
return FakeComponent
245+
246+
247+
def make_fake_private_public_component(name: str):
248+
class FakeComponent:
249+
def __init__(self):
250+
self.name = name
251+
self.calls = []
252+
253+
def _run(self, run_response=None, run_messages=None, **kwargs):
254+
self.calls.append("_run")
255+
return FakeRunOutput("private-run")
256+
257+
def run(self, input, **kwargs):
258+
self.calls.append("run")
259+
return FakeRunOutput("public-run")
260+
261+
async def _arun(self, run_response=None, input=None, **kwargs):
262+
self.calls.append("_arun")
263+
return FakeRunOutput("private-arun")
264+
265+
def arun(self, input, **kwargs):
266+
self.calls.append("arun")
267+
268+
async def _result():
269+
return FakeRunOutput("public-arun")
270+
271+
return _result()
272+
273+
return FakeComponent
274+
275+
276+
class StrictSpan:
277+
def __init__(self):
278+
self.ended = False
279+
280+
def set_current(self):
281+
return None
282+
283+
def unset_current(self):
284+
return None
285+
286+
def log(self, **kwargs):
287+
if self.ended:
288+
raise AssertionError("log called after span.end()")
289+
290+
def end(self):
291+
self.ended = True
292+
293+
294+
__all__ = [
295+
"FakeExecutionInput",
296+
"FakeWorkflowRunResponse",
297+
"PROJECT_NAME",
298+
"StrictSpan",
299+
"isawaitable",
300+
"make_fake_async_dispatch_component",
301+
"make_fake_component",
302+
"make_fake_workflow_agent_path",
303+
"make_fake_workflow_with_async_agent",
304+
"make_fake_duplicate_content_workflow",
305+
"make_fake_error_component",
306+
"make_fake_private_public_component",
307+
"make_fake_workflow",
308+
]

0 commit comments

Comments
 (0)