14 changes: 14 additions & 0 deletions .env.example
@@ -118,6 +118,20 @@ LANGFUSE_HOST=https://cloud.langfuse.com
# Disable OpenTelemetry OTLP export (we use Langfuse callback handler instead)
OTEL_SDK_DISABLED=true

# =============================================================================
# LangWatch Configuration (Observability — alternative/complement to Langfuse)
# =============================================================================
# LangWatch provides OTLP-native tracing, 30+ built-in evaluators (RAGAS,
# safety, PII), and agent simulation. Both Langfuse and LangWatch can be
# enabled simultaneously — callbacks are merged at runtime.
#
# Self-host: git clone https://github.com/langwatch/langwatch && docker compose up -d
# Dashboard: http://localhost:5560
LANGWATCH_ENABLED=false
# Get your API key from LangWatch project settings (http://localhost:5560 or cloud)
LANGWATCH_API_KEY=
LANGWATCH_ENDPOINT=http://localhost:5560

# =============================================================================
# Container Configuration
# =============================================================================
47 changes: 47 additions & 0 deletions docs/developer-guide.md
@@ -486,6 +486,53 @@ LANGFUSE_ENABLED=false

---

## 9b. LangWatch Tracing (Alternative)

[LangWatch](https://langwatch.ai) is an OpenTelemetry-native LLM observability platform with 30+ built-in evaluators (RAGAS faithfulness, BLEU, ROUGE, PII detection, content safety), agent simulation, and prompt management with Git sync.

Both Langfuse and LangWatch can be enabled simultaneously — their callbacks are merged at runtime.
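The merge is a plain list concatenation in `_run_agent()`. A minimal standalone sketch of the pattern (the helper name here is illustrative, not part of the Forge API):

```python
from typing import Any

def merge_callback_configs(*configs: dict[str, Any]) -> dict[str, Any]:
    """Combine the ``callbacks`` lists from any number of backend configs.

    A disabled backend contributes an empty dict and is skipped naturally.
    """
    callbacks: list[Any] = []
    for cfg in configs:
        callbacks.extend(cfg.get("callbacks", []))
    return {"callbacks": callbacks} if callbacks else {}

# A disabled backend (empty dict) merges away cleanly:
merged = merge_callback_configs({"callbacks": ["langfuse_handler"]}, {})
print(merged)  # → {'callbacks': ['langfuse_handler']}
```

Because the merged dict is passed straight into the LangChain `config`, each handler receives the same run events independently, so neither backend needs to know the other exists.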

### Setup

1. Self-host LangWatch:

```bash
git clone https://github.com/langwatch/langwatch.git
cd langwatch
cp langwatch/.env.example langwatch/.env
docker compose up -d
# Dashboard at http://localhost:5560
```

2. Create an account and project in the LangWatch UI
3. Copy your API key from project settings
4. Add to `.env`:

```bash
LANGWATCH_ENABLED=true
LANGWATCH_API_KEY=your-api-key
LANGWATCH_ENDPOINT=http://localhost:5560
```

5. Restart the worker — traces appear in the LangWatch dashboard immediately

### Built-in evaluators

LangWatch includes evaluators that can be configured from the dashboard:

- **RAGAS**: faithfulness, context precision/recall, BLEU, ROUGE, factual correctness
- **Safety**: Azure content safety, jailbreak detection, prompt injection
- **Quality**: sentiment, similarity, off-topic detection, format validation
- **PII**: Presidio-based PII detection

### Disabling LangWatch

```bash
LANGWATCH_ENABLED=false
```

---

## 10. Debugging Tools

### Patch a workflow checkpoint
101 changes: 101 additions & 0 deletions proposals/009-langwatch-integration.md
@@ -0,0 +1,101 @@
# Proposal: LangWatch Integration as Optional Observability Backend

**Author:** Tushar Jadhav
**Date:** 2026-04-22
**Status:** Under Review

## Summary

Add LangWatch as an optional, parallel observability backend alongside the existing Langfuse integration. LangWatch provides OTLP-native LLM tracing, 30+ built-in evaluators (RAGAS, safety, PII detection), agent simulation, and prompt management with Git sync — capabilities that directly support Forge's AI quality metrics needs.

## Motivation

### Problem Statement

Forge currently integrates only with Langfuse for LLM observability. While Langfuse provides solid trace capture and cost tracking, it lacks built-in evaluation capabilities. Teams needing to measure AI output quality (faithfulness, BLEU/ROUGE scores, PII detection, content safety) must build custom evaluation pipelines and post scores manually via the Langfuse API.

### Current Workarounds

- Quality metrics like faithfulness and RAGAS scores require writing standalone evaluation scripts and posting results back to Langfuse via `langfuse.score()`.
- There is no built-in agent simulation capability for end-to-end workflow testing.
- Safety evaluations (prompt injection, PII, content moderation) require integrating separate third-party services.

## Proposal

### Overview

Introduce a `forge/integrations/langwatch/` module that mirrors the existing Langfuse integration pattern. Both backends can be enabled simultaneously — their LangChain callbacks are merged into a single list at runtime. LangWatch is disabled by default and requires no new mandatory dependencies.

### Detailed Design

**New module:** `src/forge/integrations/langwatch/`
- `__init__.py` — public API exports
- `tracing.py` — `setup_langwatch()`, `get_langwatch_callback()`, `get_langwatch_config()`, `shutdown_langwatch()`

**Modified modules:**
- `config.py` — three new settings: `LANGWATCH_ENABLED`, `LANGWATCH_API_KEY`, `LANGWATCH_ENDPOINT`
- `integrations/agents/agent.py` — `_run_agent()` collects callbacks from both Langfuse and LangWatch into a unified list
- `main.py` — API server calls `setup_langwatch()` during lifespan startup
- `orchestrator/worker.py` — worker calls `setup_langwatch()` at boot
- `sandbox/runner.py` — passes `LANGWATCH_API_KEY` and `LANGWATCH_ENDPOINT` into containers

**Configuration:** `.env.example` updated with documented LangWatch settings. Developer guide updated with setup instructions.

### User Experience

```bash
# Self-host LangWatch
git clone https://github.com/langwatch/langwatch.git
cd langwatch && docker compose up -d

# Configure in Forge .env
LANGWATCH_ENABLED=true
LANGWATCH_API_KEY=your-key-from-dashboard
LANGWATCH_ENDPOINT=http://localhost:5560

# Restart worker — traces appear at http://localhost:5560
uv run forge worker
```

## Alternatives Considered

| Alternative | Pros | Cons | Why Not |
|-------------|------|------|---------|
| Langfuse only + custom eval scripts | Already integrated, no new code | No built-in evaluators, no agent simulation, manual scoring | Doesn't scale for the quality metrics Forge needs |
| Replace Langfuse with LangWatch | Single backend, simpler | Breaks existing setups, loses Langfuse-specific features | Disruptive; both can coexist |
| OpenTelemetry direct export | Standard protocol | No LLM-specific UI, no evaluators, no prompt management | Too low-level for LLM observability |

## Implementation Plan

### Phases

1. **Phase 1:** Integration module + config + agent wiring — 1 day (this PR)
2. **Phase 2:** Container entrypoint LangWatch support (auto-setup inside containers) — follow-up
3. **Phase 3:** Built-in evaluator hooks (auto-run RAGAS/safety evals on traces) — follow-up

### Dependencies

- [x] `langwatch` Python SDK (`pip install langwatch`)
- [x] Self-hosted or cloud LangWatch instance
- [ ] No changes to existing Langfuse integration

### Risks

| Risk | Likelihood | Impact | Mitigation |
|------|------------|--------|------------|
| SDK compatibility issues with Python 3.11-3.13 | Low | Med | SDK supports 3.10-3.13; tested locally |
| Callback ordering conflicts with Langfuse | Low | Low | Callbacks are independent handlers in a list |
| LangWatch project maturity | Med | Low | Langfuse remains the default; LangWatch is opt-in |

## Open Questions

- [ ] Should LangWatch SDK be added to `pyproject.toml` dependencies or remain an optional install?
- [ ] Should the container entrypoint also support LangWatch auto-setup (Phase 2)?
- [ ] Should we add a unified observability config that picks one backend vs. both?

## References

- [LangWatch GitHub](https://github.com/langwatch/langwatch)
- [LangWatch Python SDK](https://pypi.org/project/langwatch/)
- [Existing Langfuse integration](../src/forge/integrations/langfuse/tracing.py)
- [Forge proposals template](TEMPLATE.md)
1 change: 1 addition & 0 deletions proposals/README.md
@@ -30,3 +30,4 @@ This directory contains proposals for new Forge features and enhancements.
| 006 | [PR Description Sync After CI Fix Commits](006-ci-fix-pr-description-sync.md) | Implemented | eshulman2 |
| 007 | [Dedicated implement_review Node for PR Review Feedback](007-implement-review-node.md) | Implemented | eshulman2 |
| 008 | [Stable PR-to-Ticket Association via State Lookup](008-stable-pr-to-ticket-association.md) | Draft | eshulman2 |
| 009 | [LangWatch Integration as Optional Observability Backend](009-langwatch-integration.md) | Under Review | tusharjadhav3302 |
13 changes: 13 additions & 0 deletions src/forge/config.py
@@ -171,6 +171,19 @@ def detect_model_provider(model_name: str) -> str:
default="https://cloud.langfuse.com", description="Langfuse host URL"
)

# LangWatch Configuration
langwatch_enabled: bool = Field(
default=False,
description="Enable LangWatch tracing (requires LANGWATCH_API_KEY)",
)
langwatch_api_key: SecretStr = Field(
default=SecretStr(""), description="LangWatch project API key"
)
langwatch_endpoint: str = Field(
default="http://localhost:5560",
description="LangWatch endpoint URL",
)

# Claude Agent SDK Configuration
agent_enable_tools: bool = Field(
default=True,
21 changes: 18 additions & 3 deletions src/forge/integrations/agents/agent.py
@@ -32,6 +32,7 @@

from forge.config import Settings, get_settings
from forge.integrations.langfuse import get_langfuse_config, get_langfuse_context
from forge.integrations.langwatch import get_langwatch_config, setup_langwatch
from forge.prompts import load_prompt, set_default_version

# Optional Vertex AI support (Claude and Gemini)
@@ -598,6 +599,33 @@ async def _run_agent(
# Build config with observability tracing if enabled
config: dict[str, Any] = {"configurable": {"thread_id": thread_id}}

# Add Langfuse callbacks for observability
# Add observability callbacks (Langfuse and/or LangWatch)
all_callbacks: list[Any] = []

langfuse_config = get_langfuse_config(
trace_name=trace_name or "deep_agent_invocation",
session_id=session_id,
metadata={"system_prompt_length": str(len(system_prompt))},
)
if langfuse_config:
# Extract context params and remove from config
langfuse_ctx_params = langfuse_config.pop("_langfuse_context", {})
config.update(langfuse_config)
all_callbacks.extend(langfuse_config.get("callbacks", []))
if "metadata" in langfuse_config:
config["metadata"] = langfuse_config["metadata"]
else:
langfuse_ctx_params = {}

langwatch_config = get_langwatch_config(
trace_name=trace_name or "deep_agent_invocation",
session_id=session_id,
metadata={"system_prompt_length": str(len(system_prompt))},
)
if langwatch_config:
all_callbacks.extend(langwatch_config.get("callbacks", []))

if all_callbacks:
config["callbacks"] = all_callbacks

# Invoke the agent with retry logic for transient errors
# Use async Langfuse context for session tracking (v3+ API)
async with get_langfuse_context(
15 changes: 15 additions & 0 deletions src/forge/integrations/langwatch/__init__.py
@@ -0,0 +1,15 @@
"""LangWatch integration for LLM observability."""

from forge.integrations.langwatch.tracing import (
get_langwatch_callback,
get_langwatch_config,
setup_langwatch,
shutdown_langwatch,
)

__all__ = [
"get_langwatch_callback",
"get_langwatch_config",
"setup_langwatch",
"shutdown_langwatch",
]
128 changes: 128 additions & 0 deletions src/forge/integrations/langwatch/tracing.py
@@ -0,0 +1,128 @@
"""LangWatch tracing integration for LLM observability.

Provides the same interface as the Langfuse integration so both can be
used interchangeably. LangWatch uses OpenTelemetry under the hood and
offers a LangChain ``BaseCallbackHandler`` that captures LLM calls,
tool use, and agent activity.
"""

import logging
import os
from typing import Any

logger = logging.getLogger(__name__)

_setup_done = False


def _langwatch_enabled() -> bool:
"""Check whether LangWatch is configured via environment."""
return bool(os.environ.get("LANGWATCH_API_KEY"))


def setup_langwatch() -> None:
"""Initialise the LangWatch SDK (idempotent).

Reads ``LANGWATCH_API_KEY`` and ``LANGWATCH_ENDPOINT`` from the
environment. Must be called once at application startup before any
traces are created.
"""
global _setup_done
if _setup_done or not _langwatch_enabled():
return

try:
import langwatch

endpoint = os.environ.get("LANGWATCH_ENDPOINT", "http://localhost:5560")
api_key = os.environ.get("LANGWATCH_API_KEY", "")

langwatch.setup(
api_key=api_key,
endpoint_url=endpoint,
)
_setup_done = True
logger.info(f"LangWatch SDK initialised (endpoint={endpoint})")
except ImportError:
logger.warning("langwatch package not installed — tracing disabled")
except Exception as exc:
logger.error(f"Failed to initialise LangWatch: {exc}")


def get_langwatch_callback(
trace_name: str | None = None,
metadata: dict[str, Any] | None = None,
) -> Any | None:
"""Return a LangChain callback handler that sends traces to LangWatch.

Args:
trace_name: Optional human-readable name for the trace.
metadata: Optional metadata dict attached to the trace.

Returns:
``LangChainTracer`` instance, or ``None`` if LangWatch is disabled.
"""
if not _langwatch_enabled():
return None

setup_langwatch()

try:
import langwatch
from langwatch.langchain import LangChainTracer

trace = langwatch.trace(name=trace_name or "forge")
if metadata:
trace.update(metadata=metadata)

return LangChainTracer(trace=trace)
except ImportError:
logger.warning("langwatch package not installed")
return None
except Exception as exc:
logger.error(f"Failed to create LangWatch callback: {exc}")
return None


def get_langwatch_config(
trace_name: str | None = None,
session_id: str | None = None,
metadata: dict[str, Any] | None = None,
) -> dict[str, Any]:
"""Build a LangChain-compatible config dict with LangWatch callbacks.

Can be passed directly to ``agent.ainvoke(..., config=config)``.

Args:
trace_name: Trace name for the LangWatch dashboard.
session_id: Session / thread ID (e.g. Jira ticket key).
metadata: Extra metadata dict.

Returns:
Config dict with ``callbacks`` key, or empty dict if disabled.
"""
meta = dict(metadata or {})
if session_id:
meta["thread_id"] = session_id

handler = get_langwatch_callback(trace_name=trace_name, metadata=meta)
if handler is None:
return {}

return {"callbacks": [handler]}


async def shutdown_langwatch() -> None:
"""Flush pending spans and shut down the LangWatch SDK."""
if not _setup_done:
return

try:
from opentelemetry.trace import get_tracer_provider

provider = get_tracer_provider()
if hasattr(provider, "force_flush"):
provider.force_flush()
logger.info("LangWatch traces flushed")
except Exception as exc:
logger.warning(f"Error flushing LangWatch: {exc}")