diff --git a/.env.example b/.env.example index 701cdd7..9952b48 100644 --- a/.env.example +++ b/.env.example @@ -118,6 +118,20 @@ LANGFUSE_HOST=https://cloud.langfuse.com # Disable OpenTelemetry OTLP export (we use Langfuse callback handler instead) OTEL_SDK_DISABLED=true +# ============================================================================= +# LangWatch Configuration (Observability — alternative/complement to Langfuse) +# ============================================================================= +# LangWatch provides OTLP-native tracing, 30+ built-in evaluators (RAGAS, +# safety, PII), and agent simulation. Both Langfuse and LangWatch can be +# enabled simultaneously — callbacks are merged at runtime. +# +# Self-host: git clone https://github.com/langwatch/langwatch && docker compose up -d +# Dashboard: http://localhost:5560 +LANGWATCH_ENABLED=false +# Get your API key from LangWatch project settings (http://localhost:5560 or cloud) +LANGWATCH_API_KEY= +LANGWATCH_ENDPOINT=http://localhost:5560 + # ============================================================================= # Container Configuration # ============================================================================= diff --git a/docs/developer-guide.md b/docs/developer-guide.md index ddf96d0..b24aa62 100644 --- a/docs/developer-guide.md +++ b/docs/developer-guide.md @@ -486,6 +486,53 @@ LANGFUSE_ENABLED=false --- +## 9b. LangWatch Tracing (Alternative) + +[LangWatch](https://langwatch.ai) is an OpenTelemetry-native LLM observability platform with 30+ built-in evaluators (RAGAS faithfulness, BLEU, ROUGE, PII detection, content safety), agent simulation, and prompt management with Git sync. + +Both Langfuse and LangWatch can be enabled simultaneously — their callbacks are merged at runtime. + +### Setup + +1. 
Self-host LangWatch: + +```bash +git clone https://github.com/langwatch/langwatch.git +cd langwatch +cp langwatch/.env.example langwatch/.env +docker compose up -d +# Dashboard at http://localhost:5560 +``` + +2. Create an account and project in the LangWatch UI +3. Copy your API key from project settings +4. Add to `.env`: + +```bash +LANGWATCH_ENABLED=true +LANGWATCH_API_KEY=your-api-key +LANGWATCH_ENDPOINT=http://localhost:5560 +``` + +5. Restart the worker — traces appear in the LangWatch dashboard immediately + +### Built-in evaluators + +LangWatch includes evaluators that can be configured from the dashboard: + +- **RAGAS**: faithfulness, context precision/recall, BLEU, ROUGE, factual correctness +- **Safety**: Azure content safety, jailbreak detection, prompt injection +- **Quality**: sentiment, similarity, off-topic detection, format validation +- **PII**: Presidio-based PII detection + +### Disabling LangWatch + +```bash +LANGWATCH_ENABLED=false +``` + +--- + ## 10. Debugging Tools ### Patch a workflow checkpoint diff --git a/proposals/009-langwatch-integration.md b/proposals/009-langwatch-integration.md new file mode 100644 index 0000000..8b12fc8 --- /dev/null +++ b/proposals/009-langwatch-integration.md @@ -0,0 +1,101 @@ +# Proposal: LangWatch Integration as Optional Observability Backend + +**Author:** Tushar Jadhav +**Date:** 2026-04-22 +**Status:** Under Review + +## Summary + +Add LangWatch as an optional, parallel observability backend alongside the existing Langfuse integration. LangWatch provides OTLP-native LLM tracing, 30+ built-in evaluators (RAGAS, safety, PII detection), agent simulation, and prompt management with Git sync — capabilities that directly support Forge's AI quality metrics needs. + +## Motivation + +### Problem Statement + +Forge currently integrates only with Langfuse for LLM observability. While Langfuse provides solid trace capture and cost tracking, it lacks built-in evaluation capabilities. 
Teams needing to measure AI output quality (faithfulness, BLEU/ROUGE scores, PII detection, content safety) must build custom evaluation pipelines and post scores manually via the Langfuse API. + +### Current Workarounds + +- Quality metrics like faithfulness and RAGAS scores require writing standalone evaluation scripts and posting results back to Langfuse via `langfuse.score()`. +- There is no built-in agent simulation capability for end-to-end workflow testing. +- Safety evaluations (prompt injection, PII, content moderation) require integrating separate third-party services. + +## Proposal + +### Overview + +Introduce a `forge/integrations/langwatch/` module that mirrors the existing Langfuse integration pattern. Both backends can be enabled simultaneously — their LangChain callbacks are merged into a single list at runtime. LangWatch is disabled by default and requires no new mandatory dependencies. + +### Detailed Design + +**New module:** `src/forge/integrations/langwatch/` +- `__init__.py` — public API exports +- `tracing.py` — `setup_langwatch()`, `get_langwatch_callback()`, `get_langwatch_config()`, `shutdown_langwatch()` + +**Modified modules:** +- `config.py` — three new settings: `LANGWATCH_ENABLED`, `LANGWATCH_API_KEY`, `LANGWATCH_ENDPOINT` +- `integrations/agents/agent.py` — `_run_agent()` collects callbacks from both Langfuse and LangWatch into a unified list +- `main.py` — API server calls `setup_langwatch()` during lifespan startup +- `orchestrator/worker.py` — worker calls `setup_langwatch()` at boot +- `sandbox/runner.py` — passes `LANGWATCH_API_KEY` and `LANGWATCH_ENDPOINT` into containers + +**Configuration:** `.env.example` updated with documented LangWatch settings. Developer guide updated with setup instructions. 
+ +### User Experience + +```bash +# Self-host LangWatch +git clone https://github.com/langwatch/langwatch.git +cd langwatch && docker compose up -d + +# Configure in Forge .env +LANGWATCH_ENABLED=true +LANGWATCH_API_KEY=your-key-from-dashboard +LANGWATCH_ENDPOINT=http://localhost:5560 + +# Restart worker — traces appear at http://localhost:5560 +uv run forge worker +``` + +## Alternatives Considered + +| Alternative | Pros | Cons | Why Not | +|-------------|------|------|---------| +| Langfuse only + custom eval scripts | Already integrated, no new code | No built-in evaluators, no agent simulation, manual scoring | Doesn't scale for the quality metrics Forge needs | +| Replace Langfuse with LangWatch | Single backend, simpler | Breaks existing setups, loses Langfuse-specific features | Disruptive; both can coexist | +| OpenTelemetry direct export | Standard protocol | No LLM-specific UI, no evaluators, no prompt management | Too low-level for LLM observability | + +## Implementation Plan + +### Phases + +1. **Phase 1:** Integration module + config + agent wiring — 1 day (this PR) +2. **Phase 2:** Container entrypoint LangWatch support (auto-setup inside containers) — follow-up +3. 
**Phase 3:** Built-in evaluator hooks (auto-run RAGAS/safety evals on traces) — follow-up + +### Dependencies + +- [x] `langwatch` Python SDK (pip install langwatch) +- [x] Self-hosted or cloud LangWatch instance +- [ ] No changes to existing Langfuse integration + +### Risks + +| Risk | Likelihood | Impact | Mitigation | +|------|------------|--------|------------| +| SDK compatibility issues with Python 3.11-3.13 | Low | Med | SDK supports 3.10-3.13; tested locally | +| Callback ordering conflicts with Langfuse | Low | Low | Callbacks are independent handlers in a list | +| LangWatch project maturity | Med | Low | Langfuse remains the default; LangWatch is opt-in | + +## Open Questions + +- [ ] Should LangWatch SDK be added to `pyproject.toml` dependencies or remain an optional install? +- [ ] Should the container entrypoint also support LangWatch auto-setup (Phase 2)? +- [ ] Should we add a unified observability config that picks one backend vs. both? + +## References + +- [LangWatch GitHub](https://github.com/langwatch/langwatch) +- [LangWatch Python SDK](https://pypi.org/project/langwatch/) +- [Existing Langfuse integration](../src/forge/integrations/langfuse/tracing.py) +- [Forge proposals template](TEMPLATE.md) diff --git a/proposals/README.md b/proposals/README.md index 76d31ff..8d2543a 100644 --- a/proposals/README.md +++ b/proposals/README.md @@ -30,3 +30,4 @@ This directory contains proposals for new Forge features and enhancements. 
| 006 | [PR Description Sync After CI Fix Commits](006-ci-fix-pr-description-sync.md) | Implemented | eshulman2 | | 007 | [Dedicated implement_review Node for PR Review Feedback](007-implement-review-node.md) | Implemented | eshulman2 | | 008 | [Stable PR-to-Ticket Association via State Lookup](008-stable-pr-to-ticket-association.md) | Draft | eshulman2 | +| 009 | [LangWatch Integration as Optional Observability Backend](009-langwatch-integration.md) | Under Review | tusharjadhav3302 | diff --git a/src/forge/config.py b/src/forge/config.py index 1b88a4d..8e95a59 100644 --- a/src/forge/config.py +++ b/src/forge/config.py @@ -171,6 +171,19 @@ def detect_model_provider(model_name: str) -> str: default="https://cloud.langfuse.com", description="Langfuse host URL" ) + # LangWatch Configuration + langwatch_enabled: bool = Field( + default=False, + description="Enable LangWatch tracing (requires LANGWATCH_API_KEY)", + ) + langwatch_api_key: SecretStr = Field( + default=SecretStr(""), description="LangWatch project API key" + ) + langwatch_endpoint: str = Field( + default="http://localhost:5560", + description="LangWatch endpoint URL", + ) + # Claude Agent SDK Configuration agent_enable_tools: bool = Field( default=True, diff --git a/src/forge/integrations/agents/agent.py b/src/forge/integrations/agents/agent.py index c3c9f23..55dd464 100644 --- a/src/forge/integrations/agents/agent.py +++ b/src/forge/integrations/agents/agent.py @@ -32,6 +32,7 @@ from forge.config import Settings, get_settings from forge.integrations.langfuse import get_langfuse_config, get_langfuse_context +from forge.integrations.langwatch import get_langwatch_config, setup_langwatch from forge.prompts import load_prompt, set_default_version # Optional Vertex AI support (Claude and Gemini) @@ -598,19 +599,33 @@ async def _run_agent( # Build config with Langfuse tracing if enabled config: dict[str, Any] = {"configurable": {"thread_id": thread_id}} - # Add Langfuse callbacks for observability + # Add 
observability callbacks (Langfuse and/or LangWatch) + all_callbacks: list[Any] = [] + langfuse_config = get_langfuse_config( trace_name=trace_name or "deep_agent_invocation", session_id=session_id, metadata={"system_prompt_length": str(len(system_prompt))}, ) if langfuse_config: - # Extract context params and remove from config langfuse_ctx_params = langfuse_config.pop("_langfuse_context", {}) - config.update(langfuse_config) + all_callbacks.extend(langfuse_config.get("callbacks", [])) + if "metadata" in langfuse_config: + config["metadata"] = langfuse_config["metadata"] else: langfuse_ctx_params = {} + langwatch_config = get_langwatch_config( + trace_name=trace_name or "deep_agent_invocation", + session_id=session_id, + metadata={"system_prompt_length": str(len(system_prompt))}, + ) + if langwatch_config: + all_callbacks.extend(langwatch_config.get("callbacks", [])) + + if all_callbacks: + config["callbacks"] = all_callbacks + # Invoke the agent with retry logic for transient errors # Use async Langfuse context for session tracking (v3+ API) async with get_langfuse_context( diff --git a/src/forge/integrations/langwatch/__init__.py b/src/forge/integrations/langwatch/__init__.py new file mode 100644 index 0000000..036bec5 --- /dev/null +++ b/src/forge/integrations/langwatch/__init__.py @@ -0,0 +1,15 @@ +"""LangWatch integration for LLM observability.""" + +from forge.integrations.langwatch.tracing import ( + get_langwatch_callback, + get_langwatch_config, + setup_langwatch, + shutdown_langwatch, +) + +__all__ = [ + "get_langwatch_callback", + "get_langwatch_config", + "setup_langwatch", + "shutdown_langwatch", +] diff --git a/src/forge/integrations/langwatch/tracing.py b/src/forge/integrations/langwatch/tracing.py new file mode 100644 index 0000000..2590b70 --- /dev/null +++ b/src/forge/integrations/langwatch/tracing.py @@ -0,0 +1,128 @@ +"""LangWatch tracing integration for LLM observability. 
+ +Provides the same interface as the Langfuse integration so both can be +used interchangeably. LangWatch uses OpenTelemetry under the hood and +offers a LangChain ``BaseCallbackHandler`` that captures LLM calls, +tool use, and agent activity. +""" + +import logging +import os +from typing import Any + +logger = logging.getLogger(__name__) + +_setup_done = False + + +def _langwatch_enabled() -> bool: + """Check for an API key, honouring an explicit ``LANGWATCH_ENABLED=false``.""" + if not os.environ.get("LANGWATCH_API_KEY"): + return False + # An unset or empty flag keeps the key-only behaviour; only an explicit falsy value disables. + flag = os.environ.get("LANGWATCH_ENABLED", "true").strip().lower() + return flag not in {"0", "false", "no", "off"} + + +def setup_langwatch() -> None: + """Initialise the LangWatch SDK (idempotent). + + Reads ``LANGWATCH_API_KEY`` and ``LANGWATCH_ENDPOINT`` from the + environment. Must be called once at application startup before any + traces are created. + """ + global _setup_done + if _setup_done or not _langwatch_enabled(): + return + + try: + import langwatch + + endpoint = os.environ.get("LANGWATCH_ENDPOINT", "http://localhost:5560") + api_key = os.environ.get("LANGWATCH_API_KEY", "") + langwatch.setup(api_key=api_key, endpoint_url=endpoint) + _setup_done = True + logger.info("LangWatch SDK initialised (endpoint=%s)", endpoint) + except ImportError: + logger.warning("langwatch package not installed — tracing disabled") + except Exception: + logger.exception("Failed to initialise LangWatch") + + +def get_langwatch_callback( + trace_name: str | None = None, + metadata: dict[str, Any] | None = None, +) -> Any | None: + """Return a LangChain callback handler that sends traces to LangWatch. + + Args: + trace_name: Optional human-readable name for the trace. + metadata: Optional metadata dict attached to the trace. + + Returns: + ``LangChainTracer`` instance, or ``None`` if LangWatch is disabled. 
+ """ + if not _langwatch_enabled(): + return None + + setup_langwatch() + + try: + import langwatch + from langwatch.langchain import LangChainTracer + + trace = langwatch.trace(name=trace_name or "forge") + if metadata: + trace.update(metadata=metadata) + + return LangChainTracer(trace=trace) + except ImportError: + logger.warning("langwatch package not installed") + return None + except Exception as exc: + logger.error(f"Failed to create LangWatch callback: {exc}") + return None + + +def get_langwatch_config( + trace_name: str | None = None, + session_id: str | None = None, + metadata: dict[str, Any] | None = None, +) -> dict[str, Any]: + """Build a LangChain-compatible config dict with LangWatch callbacks. + + Can be passed directly to ``agent.ainvoke(..., config=config)``. + + Args: + trace_name: Trace name for the LangWatch dashboard. + session_id: Session / thread ID (e.g. Jira ticket key). + metadata: Extra metadata dict. + + Returns: + Config dict with ``callbacks`` key, or empty dict if disabled. 
+ """ + meta = dict(metadata or {}) + if session_id: + meta["thread_id"] = session_id + + handler = get_langwatch_callback(trace_name=trace_name, metadata=meta) + if handler is None: + return {} + + return {"callbacks": [handler]} + + +async def shutdown_langwatch() -> None: + """Flush pending spans and shut down the LangWatch SDK.""" + if not _setup_done: + return + + try: + from opentelemetry.trace import get_tracer_provider + + provider = get_tracer_provider() + if hasattr(provider, "force_flush"): + provider.force_flush() + logger.info("LangWatch traces flushed") + except Exception as exc: + logger.warning(f"Error flushing LangWatch: {exc}") diff --git a/src/forge/main.py b/src/forge/main.py index 0826a7f..b12bdc5 100644 --- a/src/forge/main.py +++ b/src/forge/main.py @@ -40,6 +40,14 @@ async def lifespan(_app: FastAPI) -> AsyncGenerator[None, None]: ) logger.info("Distributed tracing initialized") + # Startup - initialize LangWatch if enabled + if settings.langwatch_enabled and settings.langwatch_api_key.get_secret_value(): + os.environ.setdefault("LANGWATCH_API_KEY", settings.langwatch_api_key.get_secret_value()) + os.environ.setdefault("LANGWATCH_ENDPOINT", settings.langwatch_endpoint) + from forge.integrations.langwatch import setup_langwatch + setup_langwatch() + logger.info("LangWatch tracing initialized") + yield # Shutdown diff --git a/src/forge/orchestrator/worker.py b/src/forge/orchestrator/worker.py index a05ab9a..1ad0a75 100644 --- a/src/forge/orchestrator/worker.py +++ b/src/forge/orchestrator/worker.py @@ -970,6 +970,12 @@ def main() -> None: format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", ) + # Initialise LangWatch if configured + if os.environ.get("LANGWATCH_API_KEY"): + from forge.integrations.langwatch import setup_langwatch + setup_langwatch() + logger.info("LangWatch tracing initialized in worker") + # Check for single-ticket mode via command line if len(sys.argv) > 1: ticket_key = sys.argv[1] diff --git 
a/src/forge/sandbox/runner.py b/src/forge/sandbox/runner.py index 789fb69..283c5f0 100644 --- a/src/forge/sandbox/runner.py +++ b/src/forge/sandbox/runner.py @@ -150,6 +150,12 @@ def _build_env_vars( env["LANGFUSE_HOST"] = self.settings.langfuse_host logger.debug("Container Langfuse tracing enabled") + # Pass LangWatch tracing credentials if enabled + if self.settings.langwatch_enabled and self.settings.langwatch_api_key.get_secret_value(): + env["LANGWATCH_API_KEY"] = self.settings.langwatch_api_key.get_secret_value() + env["LANGWATCH_ENDPOINT"] = self.settings.langwatch_endpoint + logger.debug("Container LangWatch tracing enabled") + # Pass system prompt template (unformatted - entrypoint will interpolate) # Load raw template without interpolation by passing empty values prompt_template = load_prompt("container-system")