Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 110 additions & 0 deletions src/lightspeed_agentic/audit.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
"""Structured audit event emission for compliance logging."""

from __future__ import annotations

import json
import sys
from datetime import UTC, datetime
from typing import Any

from lightspeed_agentic.tracing import get_tracer
from lightspeed_agentic.types import ProviderEvent


def derive_phase(context: dict[str, Any] | None) -> str:
if not context:
return "analysis"
if "phase" in context:
return str(context["phase"])
if "executionResult" in context:
return "verification"
if "approvedOption" in context:
return "execution"
return "analysis"


class AuditLogger:
def __init__(
self, trace_id: str, phase: str, model: str, provider: str, *, log_enabled: bool = True
) -> None:
self._trace_id = trace_id
self._phase = phase
self._model = model
self._provider = provider
self._log_enabled = log_enabled
self._text_buffer: list[str] = []
self._thinking_buffer: list[str] = []
self._last_tool_name = "unknown"
self._tool_span: Any = None
self._tracer = get_tracer()
self._emit("audit.agent.started", model=model, provider=provider)

def process_event(self, event: ProviderEvent) -> None:
match event.type:
case "text_delta":
self._text_buffer.append(event.text)
case "thinking_delta":
self._thinking_buffer.append(event.thinking)
case "content_block_stop":
self._flush_buffers()
case "tool_call":
self._flush_buffers()
if self._tool_span is not None:
self._tool_span.end()
self._last_tool_name = event.name or "unknown"
self._tool_span = self._tracer.start_span(f"tool.{self._last_tool_name}")
self._emit(
"audit.agent.tool.call", tool_name=self._last_tool_name, tool_input=event.input
)
case "tool_result":
self._emit(
"audit.agent.tool.result",
tool_name=self._last_tool_name,
tool_output=event.output,
success=True,
)
if self._tool_span is not None:
self._tool_span.end()
self._tool_span = None
case "result":
self._flush_buffers()

def complete(
self, *, success: bool, input_tokens: int, output_tokens: int, cost_usd: float
) -> None:
self._flush_buffers()
if self._tool_span is not None:
self._tool_span.end()
self._tool_span = None
self._emit(
"audit.agent.completed",
success=success,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost_usd,
)

def _flush_buffers(self) -> None:
if self._text_buffer:
text = "".join(self._text_buffer)
self._text_buffer.clear()
if text:
self._emit("audit.agent.text", text=text)
if self._thinking_buffer:
thinking = "".join(self._thinking_buffer)
self._thinking_buffer.clear()
if thinking:
self._emit("audit.agent.thinking", thinking=thinking)

def _emit(self, event: str, **fields: Any) -> None:
if not self._log_enabled:
return
record = {
"timestamp": datetime.now(UTC).isoformat(),
"level": "audit",
"event": event,
"trace_id": self._trace_id,
"phase": self._phase,
**fields,
}
print(json.dumps(record, default=str), flush=True, file=sys.stdout)
8 changes: 3 additions & 5 deletions src/lightspeed_agentic/providers/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,13 +259,11 @@ async def query(self, options: ProviderQueryOptions) -> AsyncIterator[ProviderEv

yield ContentBlockStopEvent()

usage = getattr(result, "usage", None) or {}
input_tokens = getattr(usage, "input_tokens", 0)
output_tokens = getattr(usage, "output_tokens", 0)
usage = result.context_wrapper.usage

yield ResultEvent(
text=stringify(result.final_output),
cost_usd=0,
input_tokens=input_tokens,
output_tokens=output_tokens,
input_tokens=usage.input_tokens,
output_tokens=usage.output_tokens,
)
64 changes: 53 additions & 11 deletions src/lightspeed_agentic/routes/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from fastapi import APIRouter, Request

from lightspeed_agentic.audit import AuditLogger, derive_phase
from lightspeed_agentic.logging import EventLogger
from lightspeed_agentic.routes.models import RunRequest, RunResponse
from lightspeed_agentic.tools import DEFAULT_ALLOWED_TOOLS
Expand Down Expand Up @@ -62,7 +63,7 @@ def register_query_routes(
model: str,
max_turns: int,
default_timeout_ms: int,
audit_enabled: bool = False, # noqa: ARG001 — used in PR2
audit_enabled: bool = False,
) -> None:
async def run_endpoint(req: RunRequest, request: Request) -> RunResponse:
timeout = req.timeout_ms if req.timeout_ms is not None else default_timeout_ms
Expand All @@ -77,6 +78,15 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse:
trace_id, trace_ctx = parse_traceparent(traceparent)
tracer = get_tracer()

phase = derive_phase(req.context)
audit_logger = AuditLogger(
trace_id=trace_id,
phase=phase,
model=model,
provider=provider.name,
log_enabled=audit_enabled,
)

logger.info(
"[agent] Starting query (model=%s, provider=%s, trace_id=%s)",
model,
Expand All @@ -87,10 +97,12 @@ async def run_endpoint(req: RunRequest, request: Request) -> RunResponse:
try:
text = ""
cost = 0.0
input_tokens = 0
output_tokens = 0
event_logger = EventLogger("run")

async def run() -> None:
nonlocal text, cost
nonlocal text, cost, input_tokens, output_tokens
with tracer.start_as_current_span(
"agent.run",
context=trace_ctx,
Expand All @@ -110,38 +122,68 @@ async def run() -> None:
)
async for event in result:
event_logger.log(event)
audit_logger.process_event(event)
if event.type == "result":
text = event.text
cost = event.cost_usd
input_tokens = event.input_tokens
output_tokens = event.output_tokens
break

await asyncio.wait_for(run(), timeout=timeout / 1000)

except TimeoutError:
audit_logger.complete(
success=False,
input_tokens=0,
output_tokens=0,
cost_usd=0,
)
return RunResponse(success=False, summary=f"Agent timed out after {timeout}ms")
except Exception as e:
audit_logger.complete(
success=False,
input_tokens=0,
output_tokens=0,
cost_usd=0,
)
logger.exception("[agent] query error")
return RunResponse(success=False, summary=f"Agent error: {e}")
Comment thread
coderabbitai[bot] marked this conversation as resolved.

if not text:
audit_logger.complete(
success=False,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
)
return RunResponse(success=False, summary="Agent returned empty response")

try:
parsed = json.loads(text)
if not isinstance(parsed, dict):
raise TypeError("expected dict")
logger.info(
"[agent] query complete: success=%s, cost=$%.4f",
parsed.get("success", True),
cost,
)
success = parsed.get("success", True)
except (json.JSONDecodeError, TypeError):
parsed = None
success = True

audit_logger.complete(
success=success,
input_tokens=input_tokens,
output_tokens=output_tokens,
cost_usd=cost,
)

if parsed is not None:
logger.info("[agent] query complete: success=%s, cost=$%.4f", success, cost)
return RunResponse(
success=parsed.get("success", True),
success=success,
summary=parsed.get("summary", text),
**{k: v for k, v in parsed.items() if k not in ("success", "summary")},
)
except (json.JSONDecodeError, TypeError):
logger.info("[agent] query complete (text response), cost=$%.4f", cost)
return RunResponse(success=True, summary=text)

logger.info("[agent] query complete (text response), cost=$%.4f", cost)
return RunResponse(success=True, summary=text)

router.add_api_route("/run", run_endpoint, methods=["POST"], response_model=RunResponse)
Loading