From de0dbbd616d3a40076407396cf29c2aa8167321d Mon Sep 17 00:00:00 2001 From: Vimal Kumar Date: Wed, 24 Jun 2026 16:27:00 +0530 Subject: [PATCH] OLS-3274: add structured audit event logging for sandbox agent runs Adds AuditLogger that emits structured JSON audit events to stdout during agent execution. Logging and OTEL tracing are independent controls per spec: JSON logs are gated by LIGHTSPEED_AUDIT_ENABLED, OTEL spans are always created when an endpoint is configured. Co-Authored-By: Claude Opus 4.6 Signed-off-by: Vimal Kumar --- src/lightspeed_agentic/audit.py | 110 ++++++++++++ src/lightspeed_agentic/providers/openai.py | 8 +- src/lightspeed_agentic/routes/query.py | 64 +++++-- tests/test_audit.py | 192 +++++++++++++++++++++ tests/test_routes.py | 142 ++++++++++++++- 5 files changed, 499 insertions(+), 17 deletions(-) create mode 100644 src/lightspeed_agentic/audit.py create mode 100644 tests/test_audit.py diff --git a/src/lightspeed_agentic/audit.py b/src/lightspeed_agentic/audit.py new file mode 100644 index 0000000..fcc5a9b --- /dev/null +++ b/src/lightspeed_agentic/audit.py @@ -0,0 +1,110 @@ +"""Structured audit event emission for compliance logging.""" + +from __future__ import annotations + +import json +import sys +from datetime import UTC, datetime +from typing import Any + +from lightspeed_agentic.tracing import get_tracer +from lightspeed_agentic.types import ProviderEvent + + +def derive_phase(context: dict[str, Any] | None) -> str: + if not context: + return "analysis" + if "phase" in context: + return str(context["phase"]) + if "executionResult" in context: + return "verification" + if "approvedOption" in context: + return "execution" + return "analysis" + + +class AuditLogger: + def __init__( + self, trace_id: str, phase: str, model: str, provider: str, *, log_enabled: bool = True + ) -> None: + self._trace_id = trace_id + self._phase = phase + self._model = model + self._provider = provider + self._log_enabled = log_enabled + self._text_buffer: list[str] = [] + self._thinking_buffer: list[str] = [] + self._last_tool_name = "unknown" + self._tool_span: Any = None + self._tracer = get_tracer() + self._emit("audit.agent.started", model=model, provider=provider) + + def process_event(self, event: ProviderEvent) -> None: + match event.type: + case "text_delta": + self._text_buffer.append(event.text) + case "thinking_delta": + self._thinking_buffer.append(event.thinking) + case "content_block_stop": + self._flush_buffers() + case "tool_call": + self._flush_buffers() + if self._tool_span is not None: + self._tool_span.end() + self._last_tool_name = event.name or "unknown" + self._tool_span = self._tracer.start_span(f"tool.{self._last_tool_name}") + self._emit( + "audit.agent.tool.call", tool_name=self._last_tool_name, tool_input=event.input + ) + case "tool_result": + self._emit( + "audit.agent.tool.result", + tool_name=self._last_tool_name, + tool_output=event.output, + success=True, + ) + if self._tool_span is not None: + self._tool_span.end() + self._tool_span = None + case "result": + self._flush_buffers() + + def complete( + self, *, success: bool, input_tokens: int, output_tokens: int, cost_usd: float + ) -> None: + self._flush_buffers() + if self._tool_span is not None: + self._tool_span.end() + self._tool_span = None + self._emit( + "audit.agent.completed", + success=success, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost_usd, + ) + + def _flush_buffers(self) -> None: + if self._text_buffer: + text = "".join(self._text_buffer) + self._text_buffer.clear() + if text: + self._emit("audit.agent.text", text=text) + if self._thinking_buffer: + thinking = "".join(self._thinking_buffer) + self._thinking_buffer.clear() + if thinking: + self._emit("audit.agent.thinking", thinking=thinking) + + def _emit(self, event: str, **fields: Any) -> None: + if not self._log_enabled: + return + record = { + "timestamp": datetime.now(UTC).isoformat(), + "level": "audit", + "event": event, + "trace_id": self._trace_id, + "phase": self._phase, + **fields, + } + print(json.dumps(record, default=str), flush=True, file=sys.stdout) diff --git a/src/lightspeed_agentic/providers/openai.py b/src/lightspeed_agentic/providers/openai.py index 04f569f..43364fe 100644 --- a/src/lightspeed_agentic/providers/openai.py +++ b/src/lightspeed_agentic/providers/openai.py @@ -259,13 +259,11 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv yield ContentBlockStopEvent() - usage = getattr(result, "usage", None) or {} - input_tokens = getattr(usage, "input_tokens", 0) - output_tokens = getattr(usage, "output_tokens", 0) + usage = result.context_wrapper.usage yield ResultEvent( text=stringify(result.final_output), cost_usd=0, - input_tokens=input_tokens, - output_tokens=output_tokens, + input_tokens=usage.input_tokens, + output_tokens=usage.output_tokens, ) diff --git a/src/lightspeed_agentic/routes/query.py b/src/lightspeed_agentic/routes/query.py index 2ecf614..f907722 100644 --- a/src/lightspeed_agentic/routes/query.py +++ b/src/lightspeed_agentic/routes/query.py @@ -13,6 +13,7 @@ from fastapi import APIRouter, Request +from lightspeed_agentic.audit import AuditLogger, derive_phase from lightspeed_agentic.logging import EventLogger from lightspeed_agentic.routes.models import RunRequest, RunResponse from lightspeed_agentic.tools import DEFAULT_ALLOWED_TOOLS @@ -62,7 +63,7 @@ def register_query_routes( model: str, max_turns: int, default_timeout_ms: int, - audit_enabled: bool = False, # noqa: ARG001 — used in PR2 + audit_enabled: bool = False, ) -> None: async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: timeout = req.timeout_ms if req.timeout_ms is not None else default_timeout_ms @@ -77,6 +78,15 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: trace_id, trace_ctx = parse_traceparent(traceparent) tracer = get_tracer() + phase = derive_phase(req.context) + audit_logger = AuditLogger( + trace_id=trace_id, + phase=phase, + model=model, + provider=provider.name, + log_enabled=audit_enabled, + ) + logger.info( "[agent] Starting query (model=%s, provider=%s, trace_id=%s)", model, @@ -87,10 +97,12 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse: try: text = "" cost = 0.0 + input_tokens = 0 + output_tokens = 0 event_logger = EventLogger("run") async def run() -> None: - nonlocal text, cost + nonlocal text, cost, input_tokens, output_tokens with tracer.start_as_current_span( "agent.run", context=trace_ctx, @@ -110,38 +122,68 @@ async def run() -> None: ) async for event in result: event_logger.log(event) + audit_logger.process_event(event) if event.type == "result": text = event.text cost = event.cost_usd + input_tokens = event.input_tokens + output_tokens = event.output_tokens break await asyncio.wait_for(run(), timeout=timeout / 1000) except TimeoutError: + audit_logger.complete( + success=False, + input_tokens=0, + output_tokens=0, + cost_usd=0, + ) return RunResponse(success=False, summary=f"Agent timed out after {timeout}ms") except Exception as e: + audit_logger.complete( + success=False, + input_tokens=0, + output_tokens=0, + cost_usd=0, + ) logger.exception("[agent] query error") return RunResponse(success=False, summary=f"Agent error: {e}") if not text: + audit_logger.complete( + success=False, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + ) return RunResponse(success=False, summary="Agent returned empty response") try: parsed = json.loads(text) if not isinstance(parsed, dict): raise TypeError("expected dict") - logger.info( - "[agent] query complete: success=%s, cost=$%.4f", - parsed.get("success", True), - cost, - ) + success = parsed.get("success", True) + except (json.JSONDecodeError, TypeError): + parsed = None + success = True + + audit_logger.complete( + success=success, + input_tokens=input_tokens, + output_tokens=output_tokens, + cost_usd=cost, + ) + + if parsed is not None: + logger.info("[agent] query complete: success=%s, cost=$%.4f", success, cost) return RunResponse( - success=parsed.get("success", True), + success=success, summary=parsed.get("summary", text), **{k: v for k, v in parsed.items() if k not in ("success", "summary")}, ) - except (json.JSONDecodeError, TypeError): - logger.info("[agent] query complete (text response), cost=$%.4f", cost) - return RunResponse(success=True, summary=text) + + logger.info("[agent] query complete (text response), cost=$%.4f", cost) + return RunResponse(success=True, summary=text) router.add_api_route("/run", run_endpoint, methods=["POST"], response_model=RunResponse) diff --git a/tests/test_audit.py b/tests/test_audit.py new file mode 100644 index 0000000..97dfff7 --- /dev/null +++ b/tests/test_audit.py @@ -0,0 +1,192 @@ +"""Tests for audit event emission.""" + +from __future__ import annotations + +import contextlib +import json +from typing import Any + +import pytest + +from lightspeed_agentic.audit import AuditLogger, derive_phase +from lightspeed_agentic.types import ( + ContentBlockStopEvent, + TextDeltaEvent, + ThinkingDeltaEvent, + ToolCallEvent, + ToolResultEvent, +) + + +class TestDerivePhase: + def test_no_context_returns_analysis(self) -> None: + assert derive_phase(None) == "analysis" + + def test_empty_context_returns_analysis(self) -> None: + assert derive_phase({}) == "analysis" + + def test_approved_option_returns_execution(self) -> None: + assert derive_phase({"approvedOption": {"title": "fix"}}) == "execution" + + def test_execution_result_returns_verification(self) -> None: + assert derive_phase({"executionResult": {"success": True}}) == "verification" + + def test_explicit_phase_takes_precedence(self) -> None: + assert ( + derive_phase({"phase": "escalation", "approvedOption": {"title": "fix"}}) + == "escalation" + ) + + def test_both_approved_and_result_prefers_verification(self) -> None: + ctx: dict[str, Any] = { + "approvedOption": {"title": "fix"}, + "executionResult": {"success": True}, + } + assert derive_phase(ctx) == "verification" + + +def _collect_audit_lines(capsys: pytest.CaptureFixture[str]) -> list[dict[str, Any]]: + """Parse all JSON lines from captured stdout.""" + out = capsys.readouterr().out + lines = [] + for line in out.strip().splitlines(): + with contextlib.suppress(json.JSONDecodeError): + lines.append(json.loads(line)) + return lines + + +class TestAuditLoggerStarted: + def test_emits_started_on_construction(self, capsys: pytest.CaptureFixture[str]) -> None: + AuditLogger(trace_id="abc123", phase="analysis", model="gpt-4", provider="openai") + events = _collect_audit_lines(capsys) + assert len(events) == 1 + e = events[0] + assert e["event"] == "audit.agent.started" + assert e["trace_id"] == "abc123" + assert e["phase"] == "analysis" + assert e["model"] == "gpt-4" + assert e["provider"] == "openai" + assert e["level"] == "audit" + assert "timestamp" in e + + +class TestAuditLoggerTextBuffering: + def test_buffers_text_deltas_emits_on_block_stop( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + capsys.readouterr() # clear started event + al.process_event(TextDeltaEvent(text="hello ")) + al.process_event(TextDeltaEvent(text="world")) + assert capsys.readouterr().out == "" # nothing yet + al.process_event(ContentBlockStopEvent()) + events = _collect_audit_lines(capsys) + assert len(events) == 1 + assert events[0]["event"] == "audit.agent.text" + assert events[0]["text"] == "hello world" + + def test_empty_text_buffer_no_event(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + capsys.readouterr() + al.process_event(ContentBlockStopEvent()) + assert _collect_audit_lines(capsys) == [] + + +class TestAuditLoggerThinkingBuffering: + def test_buffers_thinking_emits_on_block_stop(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + capsys.readouterr() + al.process_event(ThinkingDeltaEvent(thinking="let me think")) + al.process_event(ContentBlockStopEvent()) + events = _collect_audit_lines(capsys) + assert len(events) == 1 + assert events[0]["event"] == "audit.agent.thinking" + assert events[0]["thinking"] == "let me think" + + +class TestAuditLoggerToolCall: + def test_emits_tool_call(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="execution", model="m", provider="p") + capsys.readouterr() + al.process_event(ToolCallEvent(name="bash", input="ls -la")) + events = _collect_audit_lines(capsys) + assert len(events) == 1 + assert events[0]["event"] == "audit.agent.tool.call" + assert events[0]["tool_name"] == "bash" + assert events[0]["tool_input"] == "ls -la" + + +class TestAuditLoggerToolResult: + def test_emits_tool_result_with_tracked_name(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="execution", model="m", provider="p") + capsys.readouterr() + al.process_event(ToolCallEvent(name="bash", input="ls")) + capsys.readouterr() # clear tool call event + al.process_event(ToolResultEvent(output="file1.txt\nfile2.txt")) + events = _collect_audit_lines(capsys) + assert len(events) == 1 + assert events[0]["event"] == "audit.agent.tool.result" + assert events[0]["tool_name"] == "bash" + assert events[0]["tool_output"] == "file1.txt\nfile2.txt" + assert events[0]["success"] is True + + def test_tool_result_without_prior_call_uses_unknown( + self, capsys: pytest.CaptureFixture[str] + ) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + capsys.readouterr() + al.process_event(ToolResultEvent(output="result")) + events = _collect_audit_lines(capsys) + assert events[0]["tool_name"] == "unknown" + + +class TestAuditLoggerCompleted: + def test_emits_completed(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + capsys.readouterr() + al.complete(success=True, input_tokens=100, output_tokens=50, cost_usd=0.01) + events = _collect_audit_lines(capsys) + assert len(events) == 1 + e = events[0] + assert e["event"] == "audit.agent.completed" + assert e["success"] is True + assert e["input_tokens"] == 100 + assert e["output_tokens"] == 50 + assert e["cost_usd"] == 0.01 + + +class TestAuditLoggerCommonFields: + def test_all_events_carry_required_fields(self, capsys: pytest.CaptureFixture[str]) -> None: + al = AuditLogger(trace_id="t1", phase="analysis", model="m", provider="p") + al.process_event(ToolCallEvent(name="bash", input="ls")) + al.complete(success=True, input_tokens=0, output_tokens=0, cost_usd=0) + events = _collect_audit_lines(capsys) + for e in events: + assert "timestamp" in e + assert e["level"] == "audit" + assert e["trace_id"] == "t1" + assert e["phase"] == "analysis" + + +class TestAuditLoggerDisabled: + def test_no_json_when_log_disabled(self, capsys: pytest.CaptureFixture[str]) -> None: + """When log_enabled=False, no JSON audit lines are emitted but spans still work.""" + al = AuditLogger( + trace_id="t1", phase="analysis", model="m", provider="p", log_enabled=False + ) + al.process_event(TextDeltaEvent(text="hello")) + al.process_event(ContentBlockStopEvent()) + al.process_event(ToolCallEvent(name="bash", input="ls")) + al.process_event(ToolResultEvent(output="file.txt")) + al.complete(success=True, input_tokens=100, output_tokens=50, cost_usd=0.01) + assert capsys.readouterr().out == "" + + def test_spans_created_when_log_disabled(self) -> None: + """OTEL spans are created even when JSON logging is disabled.""" + al = AuditLogger( + trace_id="t1", phase="analysis", model="m", provider="p", log_enabled=False + ) + al.process_event(ToolCallEvent(name="bash", input="ls")) + assert al._tool_span is not None + al.process_event(ToolResultEvent(output="done")) + assert al._tool_span is None diff --git a/tests/test_routes.py b/tests/test_routes.py index 596ef81..f1413ec 100644 --- a/tests/test_routes.py +++ b/tests/test_routes.py @@ -2,13 +2,21 @@ from __future__ import annotations +import json + import pytest from fastapi import FastAPI from httpx import ASGITransport, AsyncClient from lightspeed_agentic.routes import _resolve_router_model, build_router, resolve_startup_model from lightspeed_agentic.routes.query import _format_context_prefix -from lightspeed_agentic.types import ResultEvent +from lightspeed_agentic.types import ( + ContentBlockStopEvent, + ResultEvent, + TextDeltaEvent, + ToolCallEvent, + ToolResultEvent, +) from .conftest import MockProvider @@ -20,6 +28,13 @@ def _make_app(provider) -> FastAPI: return app +def _make_audit_app(provider) -> FastAPI: + app = FastAPI() + router = build_router(provider, skills_dir="/workspace", model="test-model", audit_enabled=True) + app.include_router(router, prefix="/v1/agent") + return app + + @pytest.mark.asyncio async def test_run_endpoint(): app = _make_app(MockProvider()) @@ -358,3 +373,128 @@ def test_resolve_router_model_uses_default_when_unset( monkeypatch.delenv("OPENAI_MODEL", raising=False) monkeypatch.delenv("LIGHTSPEED_MODEL", raising=False) assert _resolve_router_model("openai") == DEFAULT_MODEL + + +@pytest.mark.asyncio +async def test_run_emits_audit_events_when_enabled(capsys: pytest.CaptureFixture[str]): + """When audit is enabled, started and completed events are emitted.""" + app = _make_audit_app(MockProvider()) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post("/v1/agent/run", json={"query": "test"}) + assert resp.status_code == 200 + out = capsys.readouterr().out + events = [json.loads(line) for line in out.strip().splitlines() if line.strip().startswith("{")] + event_types = [e["event"] for e in events] + assert "audit.agent.started" in event_types + assert "audit.agent.completed" in event_types + for e in events: + assert e["level"] == "audit" + assert "trace_id" in e + assert "phase" in e + + +@pytest.mark.asyncio +async def test_run_no_audit_events_when_disabled(capsys: pytest.CaptureFixture[str]): + """Default (audit disabled) emits no audit events.""" + app = _make_app(MockProvider()) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post("/v1/agent/run", json={"query": "test"}) + assert resp.status_code == 200 + out = capsys.readouterr().out + audit_lines = [line for line in out.splitlines() if '"audit.agent.' in line] + assert audit_lines == [] + + +@pytest.mark.asyncio +async def test_run_audit_with_tool_events(capsys: pytest.CaptureFixture[str]): + """Audit logger captures tool call and result events.""" + events_seq = [ + ToolCallEvent(name="bash", input="ls"), + ToolResultEvent(output="file.txt"), + ResultEvent( + text='{"success": true, "summary": "done"}', + cost_usd=0.01, + input_tokens=10, + output_tokens=5, + ), + ] + app = _make_audit_app(MockProvider(events=events_seq)) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post("/v1/agent/run", json={"query": "test"}) + assert resp.status_code == 200 + out = capsys.readouterr().out + events = [json.loads(line) for line in out.strip().splitlines() if line.strip().startswith("{")] + event_types = [e["event"] for e in events] + assert "audit.agent.tool.call" in event_types + assert "audit.agent.tool.result" in event_types + + +@pytest.mark.asyncio +async def test_run_audit_with_text_buffering(capsys: pytest.CaptureFixture[str]): + """Text deltas are buffered and emitted as audit.agent.text on block stop.""" + events_seq = [ + TextDeltaEvent(text="hello "), + TextDeltaEvent(text="world"), + ContentBlockStopEvent(), + ResultEvent( + text='{"success": true, "summary": "done"}', cost_usd=0, input_tokens=0, output_tokens=0 + ), + ] + app = _make_audit_app(MockProvider(events=events_seq)) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post("/v1/agent/run", json={"query": "test"}) + assert resp.status_code == 200 + out = capsys.readouterr().out + events = [json.loads(line) for line in out.strip().splitlines() if line.strip().startswith("{")] + text_events = [e for e in events if e["event"] == "audit.agent.text"] + assert len(text_events) == 1 + assert text_events[0]["text"] == "hello world" + + +@pytest.mark.asyncio +async def test_run_audit_completed_captures_token_counts(capsys: pytest.CaptureFixture[str]): + """Completed event carries input_tokens and output_tokens from ResultEvent.""" + events_seq = [ + ResultEvent( + text='{"success": true, "summary": "done"}', + cost_usd=0.05, + input_tokens=42, + output_tokens=17, + ), + ] + app = _make_audit_app(MockProvider(events=events_seq)) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post("/v1/agent/run", json={"query": "test"}) + assert resp.status_code == 200 + out = capsys.readouterr().out + events = [json.loads(line) for line in out.strip().splitlines() if line.strip().startswith("{")] + completed = [e for e in events if e["event"] == "audit.agent.completed"] + assert len(completed) == 1 + assert completed[0]["input_tokens"] == 42 + assert completed[0]["output_tokens"] == 17 + assert completed[0]["cost_usd"] == 0.05 + + +@pytest.mark.asyncio +async def test_run_audit_phase_derivation(capsys: pytest.CaptureFixture[str]): + """Phase is derived from context fields.""" + app = _make_audit_app(MockProvider()) + async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client: + resp = await client.post( + "/v1/agent/run", + json={ + "query": "test", + "context": { + "approvedOption": { + "title": "fix", + "diagnosis": {"rootCause": "test"}, + "proposal": {"description": "test", "risk": "low", "reversible": True}, + } + }, + }, + ) + assert resp.status_code == 200 + out = capsys.readouterr().out + events = [json.loads(line) for line in out.strip().splitlines() if line.strip().startswith("{")] + for e in events: + assert e["phase"] == "execution"