diff --git a/src/lightspeed_agentic/audit.py b/src/lightspeed_agentic/audit.py new file mode 100644 index 0000000..fcc5a9b --- /dev/null +++ b/src/lightspeed_agentic/audit.py @@ -0,0 +1,110 @@ +"""Structured audit event emission for compliance logging.""" + +from __future__ import annotations + +import json +import sys +from datetime import UTC, datetime +from typing import Any + +from lightspeed_agentic.tracing import get_tracer +from lightspeed_agentic.types import ProviderEvent + + +def derive_phase(context: dict[str, Any] | None) -> str: + if not context: + return "analysis" + if "phase" in context: + return str(context["phase"]) + if "executionResult" in context: + return "verification" + if "approvedOption" in context: + return "execution" + return "analysis" + + +class AuditLogger: + def __init__( + self, trace_id: str, phase: str, model: str, provider: str, *, log_enabled: bool = True + ) -> None: + self._trace_id = trace_id + self._phase = phase + self._model = model + self._provider = provider + self._log_enabled = log_enabled + self._text_buffer: list[str] = [] + self._thinking_buffer: list[str] = [] + self._last_tool_name = "unknown" + self._tool_span: Any = None + self._tracer = get_tracer() + self._emit("audit.agent.started", model=model, provider=provider) + + def process_event(self, event: ProviderEvent) -> None: + match event.type: + case "text_delta": + self._text_buffer.append(event.text) + case "thinking_delta": + self._thinking_buffer.append(event.thinking) + case "content_block_stop": + self._flush_buffers() + case "tool_call": + self._flush_buffers() + if self._tool_span is not None: + self._tool_span.end() + self._last_tool_name = event.name or "unknown" + self._tool_span = self._tracer.start_span(f"tool.{self._last_tool_name}") + self._emit( + "audit.agent.tool.call", tool_name=self._last_tool_name, tool_input=event.input + ) + case "tool_result": + self._emit( + "audit.agent.tool.result", + tool_name=self._last_tool_name, + 
tool_output=event.output, + success=True, + ) + if self._tool_span is not None: + self._tool_span.end() + self._tool_span = None + case "result": + self._flush_buffers() + + def complete( + self, *, success: bool, input_tokens: int, output_tokens: int, cost_usd: float + ) -> None: + self._flush_buffers() + if self._tool_span is not None: + self._tool_span.end() + self._tool_span = None + self._emit( + "audit.agent.completed", + success=success, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + ) + + def _flush_buffers(self) -> None: + if self._text_buffer: + text = "".join(self._text_buffer) + self._text_buffer.clear() + if text: + self._emit("audit.agent.text", text=text) + if self._thinking_buffer: + thinking = "".join(self._thinking_buffer) + self._thinking_buffer.clear() + if thinking: + self._emit("audit.agent.thinking", thinking=thinking) + + def _emit(self, event: str, **fields: Any) -> None: + if not self._log_enabled: + return + record = { + "timestamp": datetime.now(UTC).isoformat(), + "level": "audit", + "event": event, + "trace_id": self._trace_id, + "phase": self._phase, + **fields, + } + print(json.dumps(record, default=str), flush=True, file=sys.stdout) diff --git a/src/lightspeed_agentic/providers/openai.py b/src/lightspeed_agentic/providers/openai.py index 04f569f..43364fe 100644 --- a/src/lightspeed_agentic/providers/openai.py +++ b/src/lightspeed_agentic/providers/openai.py @@ -259,13 +259,11 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv yield ContentBlockStopEvent() - usage = getattr(result, "usage", None) or {} - input_tokens = getattr(usage, "input_tokens", 0) - output_tokens = getattr(usage, "output_tokens", 0) + usage = result.context_wrapper.usage yield ResultEvent( text=stringify(result.final_output), cost_usd=0, - input_tokens=input_tokens, - output_tokens=output_tokens, + input_tokens=usage.input_tokens, + output_tokens=usage.output_tokens, ) diff --git 
a/src/lightspeed_agentic/routes/query.py b/src/lightspeed_agentic/routes/query.py index 2ecf614..f907722 100644 --- a/src/lightspeed_agentic/routes/query.py +++ b/src/lightspeed_agentic/routes/query.py @@ -13,6 +13,7 @@ from fastapi import APIRouter, Request +from lightspeed_agentic.audit import AuditLogger, derive_phase from lightspeed_agentic.logging import EventLogger from lightspeed_agentic.routes.models import RunRequest, RunResponse from lightspeed_agentic.tools import DEFAULT_ALLOWED_TOOLS @@ -62,7 +63,7 @@ def register_query_routes( model: str, max_turns: int, default_timeout_ms: int, - audit_enabled: bool = False, # noqa: ARG001 — used in PR2 + audit_enabled: bool = False, ) -> None: async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: timeout = req.timeout_ms if req.timeout_ms is not None else default_timeout_ms @@ -77,6 +78,15 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: trace_id, trace_ctx = parse_traceparent(traceparent) tracer = get_tracer() + phase = derive_phase(req.context) + audit_logger = AuditLogger( + trace_id=trace_id, + phase=phase, + model=model, + provider=provider.name, + log_enabled=audit_enabled, + ) + logger.info( "[agent] Starting query (model=%s, provider=%s, trace_id=%s)", model, @@ -87,10 +97,12 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: try: text = "" cost = 0.0 + input_tokens = 0 + output_tokens = 0 event_logger = EventLogger("run") async def run() -> None: - nonlocal text, cost + nonlocal text, cost, input_tokens, output_tokens with tracer.start_as_current_span( "agent.run", context=trace_ctx, @@ -110,38 +122,68 @@ async def run() -> None: ) async for event in result: event_logger.log(event) + audit_logger.process_event(event) if event.type == "result": text = event.text cost = event.cost_usd + input_tokens = event.input_tokens + output_tokens = event.output_tokens break await asyncio.wait_for(run(), timeout=timeout / 1000) except TimeoutError: 
+ audit_logger.complete( + success=False, + input_tokens=0, + output_tokens=0, + cost_usd=0, + ) return RunResponse(success=False, summary=f"Agent timed out after {timeout}ms") except Exception as e: + audit_logger.complete( + success=False, + input_tokens=0, + output_tokens=0, + cost_usd=0, + ) logger.exception("[agent] query error") return RunResponse(success=False, summary=f"Agent error: {e}") if not text: + audit_logger.complete( + success=False, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + ) return RunResponse(success=False, summary="Agent returned empty response") try: parsed = json.loads(text) if not isinstance(parsed, dict): raise TypeError("expected dict") - logger.info( - "[agent] query complete: success=%s, cost=$%.4f", - parsed.get("success", True), - cost, - ) + success = parsed.get("success", True) + except (json.JSONDecodeError, TypeError): + parsed = None + success = True + + audit_logger.complete( + success=success, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + ) + + if parsed is not None: + logger.info("[agent] query complete: success=%s, cost=$%.4f", success, cost) return RunResponse( - success=parsed.get("success", True), + success=success, summary=parsed.get("summary", text), **{k: v for k, v in parsed.items() if k not in ("success", "summary")}, ) - except (json.JSONDecodeError, TypeError): - logger.info("[agent] query complete (text response), cost=$%.4f", cost) - return RunResponse(success=True, summary=text) + + logger.info("[agent] query complete (text response), cost=$%.4f", cost) + return RunResponse(success=True, summary=text) router.add_api_route("/run", run_endpoint, methods=["POST"], response_model=RunResponse) diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..97dfff7 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,192 @@ +"""Tests for audit event emission.""" + +from __future__ import annotations + +import contextlib 
class TestDerivePhase:
    """``derive_phase`` maps a request context onto a phase label."""

    def test_no_context_returns_analysis(self) -> None:
        phase = derive_phase(None)
        assert phase == "analysis"

    def test_empty_context_returns_analysis(self) -> None:
        phase = derive_phase({})
        assert phase == "analysis"

    def test_approved_option_returns_execution(self) -> None:
        context: dict[str, Any] = {"approvedOption": {"title": "fix"}}
        assert derive_phase(context) == "execution"

    def test_execution_result_returns_verification(self) -> None:
        context: dict[str, Any] = {"executionResult": {"success": True}}
        assert derive_phase(context) == "verification"

    def test_explicit_phase_takes_precedence(self) -> None:
        context: dict[str, Any] = {"phase": "escalation", "approvedOption": {"title": "fix"}}
        assert derive_phase(context) == "escalation"

    def test_both_approved_and_result_prefers_verification(self) -> None:
        context: dict[str, Any] = {}
        context["approvedOption"] = {"title": "fix"}
        context["executionResult"] = {"success": True}
        assert derive_phase(context) == "verification"


def _collect_audit_lines(capsys: pytest.CaptureFixture[str]) -> list[dict[str, Any]]:
    """Return every captured stdout line that parses as JSON, in order."""
    captured = capsys.readouterr().out
    records: list[dict[str, Any]] = []
    for raw in captured.strip().splitlines():
        # Lines that are not JSON are skipped rather than failing the test.
        with contextlib.suppress(json.JSONDecodeError):
            records.append(json.loads(raw))
    return records


def _started_logger(capsys: pytest.CaptureFixture[str], phase: str = "analysis") -> AuditLogger:
    """Build a logger with the shared test identity and discard its started record."""
    audit = AuditLogger(trace_id="t1", phase=phase, model="m", provider="p")
    capsys.readouterr()
    return audit


class TestAuditLoggerStarted:
    """Construction emits the started record."""

    def test_emits_started_on_construction(self, capsys: pytest.CaptureFixture[str]) -> None:
        AuditLogger(trace_id="abc123", phase="analysis", model="gpt-4", provider="openai")
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        started = records[0]
        expected = {
            "event": "audit.agent.started",
            "trace_id": "abc123",
            "phase": "analysis",
            "model": "gpt-4",
            "provider": "openai",
            "level": "audit",
        }
        for key, value in expected.items():
            assert started[key] == value
        assert "timestamp" in started


class TestAuditLoggerTextBuffering:
    """Text deltas are held back until a block boundary."""

    def test_buffers_text_deltas_emits_on_block_stop(
        self, capsys: pytest.CaptureFixture[str]
    ) -> None:
        audit = _started_logger(capsys)
        for chunk in ("hello ", "world"):
            audit.process_event(TextDeltaEvent(text=chunk))
        # Deltas alone must not produce any output.
        assert capsys.readouterr().out == ""
        audit.process_event(ContentBlockStopEvent())
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        (record,) = records
        assert record["event"] == "audit.agent.text"
        assert record["text"] == "hello world"

    def test_empty_text_buffer_no_event(self, capsys: pytest.CaptureFixture[str]) -> None:
        audit = _started_logger(capsys)
        audit.process_event(ContentBlockStopEvent())
        assert _collect_audit_lines(capsys) == []


class TestAuditLoggerThinkingBuffering:
    """Thinking deltas are held back until a block boundary."""

    def test_buffers_thinking_emits_on_block_stop(self, capsys: pytest.CaptureFixture[str]) -> None:
        audit = _started_logger(capsys)
        audit.process_event(ThinkingDeltaEvent(thinking="let me think"))
        audit.process_event(ContentBlockStopEvent())
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        (record,) = records
        assert record["event"] == "audit.agent.thinking"
        assert record["thinking"] == "let me think"


class TestAuditLoggerToolCall:
    """Tool calls are reported with their name and input."""

    def test_emits_tool_call(self, capsys: pytest.CaptureFixture[str]) -> None:
        audit = _started_logger(capsys, phase="execution")
        audit.process_event(ToolCallEvent(name="bash", input="ls -la"))
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        (record,) = records
        assert record["event"] == "audit.agent.tool.call"
        assert record["tool_name"] == "bash"
        assert record["tool_input"] == "ls -la"


class TestAuditLoggerToolResult:
    """Tool results are attributed to the most recent tool call."""

    def test_emits_tool_result_with_tracked_name(self, capsys: pytest.CaptureFixture[str]) -> None:
        audit = _started_logger(capsys, phase="execution")
        audit.process_event(ToolCallEvent(name="bash", input="ls"))
        capsys.readouterr()  # drop the tool call record
        audit.process_event(ToolResultEvent(output="file1.txt\nfile2.txt"))
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        (record,) = records
        assert record["event"] == "audit.agent.tool.result"
        assert record["tool_name"] == "bash"
        assert record["tool_output"] == "file1.txt\nfile2.txt"
        assert record["success"] is True

    def test_tool_result_without_prior_call_uses_unknown(
        self, capsys: pytest.CaptureFixture[str]
    ) -> None:
        audit = _started_logger(capsys)
        audit.process_event(ToolResultEvent(output="result"))
        records = _collect_audit_lines(capsys)
        assert records[0]["tool_name"] == "unknown"


class TestAuditLoggerCompleted:
    """``complete`` reports outcome, token counts and cost."""

    def test_emits_completed(self, capsys: pytest.CaptureFixture[str]) -> None:
        audit = _started_logger(capsys)
        audit.complete(success=True, input_tokens=100, output_tokens=50, cost_usd=0.01)
        records = _collect_audit_lines(capsys)
        assert len(records) == 1
        completed = records[0]
        assert completed["event"] == "audit.agent.completed"
        assert completed["success"] is True
        assert completed["input_tokens"] == 100
        assert completed["output_tokens"] == 50
        assert completed["cost_usd"] == 0.01


class TestAuditLoggerCommonFields:
    """Every record carries the shared envelope fields."""

    def test_all_events_carry_required_fields(self, capsys: pytest.CaptureFixture[str]) -> None:
        # The started record is deliberately kept so it is checked as well.
        audit = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p")
        audit.process_event(ToolCallEvent(name="bash", input="ls"))
        audit.complete(success=True, input_tokens=0, output_tokens=0, cost_usd=0)
        for record in _collect_audit_lines(capsys):
            assert "timestamp" in record
            assert record["level"] == "audit"
            assert record["trace_id"] == "t1"
            assert record["phase"] == "analysis"


class TestAuditLoggerDisabled:
    """Behaviour with ``log_enabled=False``."""

    def test_no_json_when_log_disabled(self, capsys: pytest.CaptureFixture[str]) -> None:
        """Nothing is written to stdout while JSON logging is switched off."""
        audit = AuditLogger(
            trace_id="t1", phase="analysis", model="m", provider="p", log_enabled=False
        )
        sequence = [
            TextDeltaEvent(text="hello"),
            ContentBlockStopEvent(),
            ToolCallEvent(name="bash", input="ls"),
            ToolResultEvent(output="file.txt"),
        ]
        for event in sequence:
            audit.process_event(event)
        audit.complete(success=True, input_tokens=100, output_tokens=50, cost_usd=0.01)
        assert capsys.readouterr().out == ""

    def test_spans_created_when_log_disabled(self) -> None:
        """A tool span is opened and closed even when JSON logging is off."""
        audit = AuditLogger(
            trace_id="t1", phase="analysis", model="m", provider="p", log_enabled=False
        )
        audit.process_event(ToolCallEvent(name="bash", input="ls"))
        assert audit._tool_span is not None
        audit.process_event(ToolResultEvent(output="done"))
        assert audit._tool_span is None
def _make_audit_app(provider) -> FastAPI:
    """Build a test app whose agent router has audit logging switched on."""
    audit_router = build_router(
        provider, skills_dir="/workspace", model="test-model", audit_enabled=True
    )
    application = FastAPI()
    application.include_router(audit_router, prefix="/v1/agent")
    return application


async def _post_run(app: FastAPI, payload: dict):
    """POST ``payload`` to the run endpoint of ``app`` and return the response."""
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        return await client.post("/v1/agent/run", json=payload)


def _json_records(captured: str) -> list[dict]:
    """Parse every captured line that starts with ``{`` as a JSON object."""
    candidates = captured.strip().splitlines()
    return [json.loads(line) for line in candidates if line.strip().startswith("{")]


@pytest.mark.asyncio
async def test_run_emits_audit_events_when_enabled(capsys: pytest.CaptureFixture[str]):
    """With audit enabled, a run produces started and completed records."""
    response = await _post_run(_make_audit_app(MockProvider()), {"query": "test"})
    assert response.status_code == 200
    records = _json_records(capsys.readouterr().out)
    seen = [record["event"] for record in records]
    assert "audit.agent.started" in seen
    assert "audit.agent.completed" in seen
    for record in records:
        assert record["level"] == "audit"
        assert "trace_id" in record
        assert "phase" in record


@pytest.mark.asyncio
async def test_run_no_audit_events_when_disabled(capsys: pytest.CaptureFixture[str]):
    """With the default (audit off), no audit records reach stdout."""
    response = await _post_run(_make_app(MockProvider()), {"query": "test"})
    assert response.status_code == 200
    captured = capsys.readouterr().out
    audit_lines = [line for line in captured.splitlines() if '"audit.agent.' in line]
    assert audit_lines == []


@pytest.mark.asyncio
async def test_run_audit_with_tool_events(capsys: pytest.CaptureFixture[str]):
    """Tool call and tool result events show up in the audit output."""
    final = ResultEvent(
        text='{"success": true, "summary": "done"}',
        cost_usd=0.01,
        input_tokens=10,
        output_tokens=5,
    )
    scripted = [
        ToolCallEvent(name="bash", input="ls"),
        ToolResultEvent(output="file.txt"),
        final,
    ]
    response = await _post_run(_make_audit_app(MockProvider(events=scripted)), {"query": "test"})
    assert response.status_code == 200
    seen = [record["event"] for record in _json_records(capsys.readouterr().out)]
    assert "audit.agent.tool.call" in seen
    assert "audit.agent.tool.result" in seen


@pytest.mark.asyncio
async def test_run_audit_with_text_buffering(capsys: pytest.CaptureFixture[str]):
    """Text deltas are merged into one audit.agent.text record at block stop."""
    final = ResultEvent(
        text='{"success": true, "summary": "done"}', cost_usd=0, input_tokens=0, output_tokens=0
    )
    scripted = [
        TextDeltaEvent(text="hello "),
        TextDeltaEvent(text="world"),
        ContentBlockStopEvent(),
        final,
    ]
    response = await _post_run(_make_audit_app(MockProvider(events=scripted)), {"query": "test"})
    assert response.status_code == 200
    records = _json_records(capsys.readouterr().out)
    text_records = [record for record in records if record["event"] == "audit.agent.text"]
    assert len(text_records) == 1
    assert text_records[0]["text"] == "hello world"


@pytest.mark.asyncio
async def test_run_audit_completed_captures_token_counts(capsys: pytest.CaptureFixture[str]):
    """The completed record repeats the token counts and cost of the ResultEvent."""
    scripted = [
        ResultEvent(
            text='{"success": true, "summary": "done"}',
            cost_usd=0.05,
            input_tokens=42,
            output_tokens=17,
        ),
    ]
    response = await _post_run(_make_audit_app(MockProvider(events=scripted)), {"query": "test"})
    assert response.status_code == 200
    records = _json_records(capsys.readouterr().out)
    completed = [record for record in records if record["event"] == "audit.agent.completed"]
    assert len(completed) == 1
    summary = completed[0]
    assert summary["input_tokens"] == 42
    assert summary["output_tokens"] == 17
    assert summary["cost_usd"] == 0.05


@pytest.mark.asyncio
async def test_run_audit_phase_derivation(capsys: pytest.CaptureFixture[str]):
    """An approvedOption in the context yields the execution phase on every record."""
    approved_option = {
        "title": "fix",
        "diagnosis": {"rootCause": "test"},
        "proposal": {"description": "test", "risk": "low", "reversible": True},
    }
    payload = {"query": "test", "context": {"approvedOption": approved_option}}
    response = await _post_run(_make_audit_app(MockProvider()), payload)
    assert response.status_code == 200
    for record in _json_records(capsys.readouterr().out):
        assert record["phase"] == "execution"