Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions examples/fastapi-example/app.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
"""FastAPI application configuration."""

import sys
from pathlib import Path

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import FileResponse
from fastapi.staticfiles import StaticFiles

# Add parent packages to path for development
sys.path.insert(0, str(Path(__file__).parent.parent.parent / "packages" / "llmpane-py"))

from routes import action_router, chat_router, refinement_router


def create_app() -> FastAPI:
    """Build the FastAPI app: CORS middleware, API routers, static serving."""
    application = FastAPI(title="llmpane Example", version="0.1.0")

    # Permissive CORS so a dev frontend on another port can reach the API.
    application.add_middleware(
        CORSMiddleware,
        allow_origins=["*"],
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )

    # Wire up the API routers (order preserved: chat, action, refinement).
    for router in (chat_router, action_router, refinement_router):
        application.include_router(router)

    # Mount the built React bundle only when it is present.
    static_dir = Path(__file__).parent / "static"
    if static_dir.exists():
        application.mount("/static", StaticFiles(directory=static_dir), name="static")

    @application.get("/")
    async def serve_index():
        """Serve the React app."""
        index_path = static_dir / "index.html"
        if not index_path.exists():
            # No built frontend: return a small JSON directory of the API.
            return {
                "message": "llmpane example API",
                "endpoints": [
                    "/api/chat - Basic streaming chat",
                    "/api/chat/action - Action message pattern",
                    "/api/chat/refinement - Refinement message pattern",
                ],
            }
        return FileResponse(index_path)

    return application


# Module-level instance so `uvicorn app:app` and `from app import app` both work.
app = create_app()
257 changes: 3 additions & 254 deletions examples/fastapi-example/main.py
Original file line number Diff line number Diff line change
@@ -1,262 +1,11 @@
"""FastAPI example demonstrating all llmpane patterns with a mock LLM."""
"""Entry point for the FastAPI example application."""

import asyncio
import json
import os
import random
import uuid
from collections.abc import AsyncGenerator
from pathlib import Path

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
from pydantic import BaseModel

# Add parent packages to path for development
import sys

sys.path.insert(0, str(Path(__file__).parent.parent.parent / "packages" / "llmpane-py"))

from llmpane import ChatRequest, StreamChunk
from llmpane.streaming import create_sse_response

app = FastAPI(title="llmpane Example", version="0.1.0")

# CORS for development
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)


# --- Mock LLM Responses ---


MOCK_RESPONSES = [
    "That's a great question! Let me think about it...\n\nBased on my analysis, I'd say the answer involves considering multiple factors. First, we need to understand the context. Then, we can break down the problem into smaller parts.",
    "I'd be happy to help with that! Here's what I think:\n\n1. Start by defining the problem clearly\n2. Gather relevant information\n3. Analyze the options\n4. Make a decision based on the evidence",
    "Interesting point! From my perspective, there are several ways to approach this. The most effective method depends on your specific situation and goals.",
    "Thanks for asking! This is a topic I find fascinating. Let me share some thoughts:\n\nThe key insight here is that simplicity often leads to better outcomes. Focus on the fundamentals and build from there.",
]


async def generate_mock_response(message: str) -> AsyncGenerator[str, None]:
    """Simulate realistic LLM streaming with variable-sized chunks.

    Real LLMs stream in chunks of varying sizes (1-10 words typically),
    with the delay being the model's generation time, not network latency.
    NOTE(review): the `message` argument is not used — a canned response
    is picked at random; confirm this is intended for the demo.
    """
    words = random.choice(MOCK_RESPONSES).split(" ")
    pos = 0
    total = len(words)

    while pos < total:
        # 1-5 words per chunk mimics real LLM token batching.
        take = random.randint(1, 5)
        text = " ".join(words[pos : pos + take])
        # Every chunk after the first needs its separating space restored.
        yield text if pos == 0 else " " + text
        pos += take
        # ~50-100 tokens/second pacing, which is typical of real models.
        await asyncio.sleep(random.uniform(0.01, 0.03))


# --- Basic Chat Endpoint ---


@app.post("/api/chat")
async def chat(request: ChatRequest):
    """Basic streaming chat endpoint.

    Streams the mock LLM reply as `StreamChunk` deltas over SSE, then a
    final chunk carrying `done=True` and the generated message id.
    """

    async def generate() -> AsyncGenerator[StreamChunk, None]:
        message_id = f"msg_{uuid.uuid4().hex[:12]}"

        # Each item from the mock LLM is a multi-word chunk, not a single
        # character; forward it straight through as a delta. (The previous
        # version also accumulated the text into a local that was never read.)
        async for chunk in generate_mock_response(request.message):
            yield StreamChunk(delta=chunk)

        yield StreamChunk(done=True, message_id=message_id)

    return create_sse_response(generate())


# --- Action Message Pattern ---


class FilterAction(BaseModel):
    """Example action: filter data by a field.

    Embedded in action-message metadata so the client can render a
    confirm/reject card for the proposed filter.
    """

    field: str  # column/attribute to filter on (e.g. "date", "status")
    operator: str  # comparison operator as text (e.g. ">=", "=", "in")
    value: str  # right-hand side, always string-encoded


class ActionMetadataPayload(BaseModel):
    """Metadata for action messages."""

    action: FilterAction  # the proposed filter for the user to confirm
    status: str = "pending"  # lifecycle state; only "pending" is produced here
    explanation: str  # human-readable summary of the proposed action


# Canned filter proposals; the action endpoint picks one at random per request.
FILTER_SUGGESTIONS = [
    FilterAction(field="date", operator=">=", value="2024-01-01"),
    FilterAction(field="status", operator="=", value="active"),
    FilterAction(field="amount", operator=">", value="1000"),
    FilterAction(field="category", operator="in", value="electronics,clothing"),
]


@app.post("/api/chat/action")
async def chat_with_action(request: ChatRequest):
    """Chat endpoint that returns proposed actions.

    Streams an explanatory message first, then one metadata chunk carrying a
    randomly chosen filter proposal, then the terminating done chunk.
    NOTE(review): generate_mock_response ignores its argument and streams a
    canned response, so the `explanation` text itself is never sent — confirm
    this is intended for the demo.
    """

    async def stream() -> AsyncGenerator[StreamChunk, None]:
        message_id = f"msg_{uuid.uuid4().hex[:12]}"

        # Stream the explanation in realistic chunks.
        explanation = "Based on your request, I suggest applying the following filter:"
        async for delta in generate_mock_response(explanation):
            yield StreamChunk(delta=delta)

        # Brief pause before the action card appears.
        await asyncio.sleep(0.05)

        # Propose one of the canned filters as the action payload.
        proposal = random.choice(FILTER_SUGGESTIONS)
        yield StreamChunk(
            metadata=ActionMetadataPayload(
                action=proposal,
                status="pending",
                explanation=f"Filter {proposal.field} where it {proposal.operator} {proposal.value}",
            )
        )
        yield StreamChunk(done=True, message_id=message_id)

    return create_sse_response(stream())


# --- Refinement Message Pattern ---


class SQLQuery(BaseModel):
    """Example output: a SQL query."""

    query: str  # the SQL text itself
    tables: list[str]  # tables referenced by the query
    estimated_rows: int  # mock row-count estimate for display


class RefinementMetadataPayload(BaseModel):
    """Metadata for refinement messages."""

    output: SQLQuery  # the current iteration of the refined query
    iteration: int  # 1-based iteration number
    is_final: bool = False  # True once the last canned iteration is reached


# Simulate refining queries over iterations: each entry is a progressively
# narrower version of the same report query.
QUERY_ITERATIONS = [
    SQLQuery(
        query="SELECT * FROM orders",
        tables=["orders"],
        estimated_rows=10000,
    ),
    SQLQuery(
        query="SELECT id, customer_id, total FROM orders WHERE status = 'completed'",
        tables=["orders"],
        estimated_rows=5000,
    ),
    SQLQuery(
        query=(
            "SELECT o.id, c.name, o.total "
            "FROM orders o "
            "JOIN customers c ON o.customer_id = c.id "
            "WHERE o.status = 'completed' AND o.total > 100"
        ),
        tables=["orders", "customers"],
        estimated_rows=1500,
    ),
]

# Track iteration per conversation, keyed by conversation id.
# (Simplified and not thread/process-safe - in production use proper session management.)
iteration_tracker: dict[str, int] = {}


@app.post("/api/chat/refinement")
async def chat_with_refinement(request: ChatRequest):
    """Chat endpoint that returns iteratively refined output.

    Each call for the same conversation advances (cyclically) through
    QUERY_ITERATIONS, streaming an explanation followed by the refined
    query as a metadata chunk.
    """
    conv_id = request.conversation_id or "default"

    # Read this conversation's iteration (0 on first call), then advance it.
    current_iteration = iteration_tracker.setdefault(conv_id, 0)
    iteration_tracker[conv_id] = (current_iteration + 1) % len(QUERY_ITERATIONS)

    async def stream() -> AsyncGenerator[StreamChunk, None]:
        message_id = f"msg_{uuid.uuid4().hex[:12]}"

        # Stream the explanation in realistic chunks.
        explanation = f"Here's iteration {current_iteration + 1} of the query based on your feedback:"
        async for delta in generate_mock_response(explanation):
            yield StreamChunk(delta=delta)

        # Brief pause before the refinement card appears.
        await asyncio.sleep(0.05)

        refined = QUERY_ITERATIONS[current_iteration]
        yield StreamChunk(
            metadata=RefinementMetadataPayload(
                output=refined,
                iteration=current_iteration + 1,
                is_final=current_iteration == len(QUERY_ITERATIONS) - 1,
            )
        )
        yield StreamChunk(done=True, message_id=message_id)

    return create_sse_response(stream())


# --- Serve Static Files (for the React example) ---

static_dir = Path(__file__).parent / "static"
if static_dir.exists():
    # Mount only when a built frontend is present; otherwise only API routes exist.
    app.mount("/static", StaticFiles(directory=static_dir), name="static")


@app.get("/")
async def serve_index():
    """Serve the React app.

    Falls back to a small JSON directory of the API endpoints when no
    built frontend (static/index.html) is available.
    """
    index_path = static_dir / "index.html"
    if index_path.exists():
        return FileResponse(index_path)
    return {
        "message": "llmpane example API",
        "endpoints": [
            "/api/chat - Basic streaming chat",
            "/api/chat/action - Action message pattern",
            "/api/chat/refinement - Refinement message pattern",
        ],
    }
import os

import uvicorn

from app import app

if __name__ == "__main__":
    # Fix: `os` was used below but never imported in the new entry point
    # (the old top-of-file `import os` was removed in this refactor), and
    # `import uvicorn` was redundantly repeated inside this guard.
    # PORT env var overrides the default dev port.
    port = int(os.environ.get("PORT", 8000))
    uvicorn.run(app, host="0.0.0.0", port=port)
41 changes: 41 additions & 0 deletions examples/fastapi-example/mock_llm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Mock LLM response generation for demonstration purposes."""

import asyncio
import random
from collections.abc import AsyncGenerator

MOCK_RESPONSES = [
    "That's a great question! Let me think about it...\n\nBased on my analysis, I'd say the answer involves considering multiple factors. First, we need to understand the context. Then, we can break down the problem into smaller parts.",
    "I'd be happy to help with that! Here's what I think:\n\n1. Start by defining the problem clearly\n2. Gather relevant information\n3. Analyze the options\n4. Make a decision based on the evidence",
    "Interesting point! From my perspective, there are several ways to approach this. The most effective method depends on your specific situation and goals.",
    "Thanks for asking! This is a topic I find fascinating. Let me share some thoughts:\n\nThe key insight here is that simplicity often leads to better outcomes. Focus on the fundamentals and build from there.",
]


async def generate_mock_response(_message: str) -> AsyncGenerator[str, None]:
    """Simulate realistic LLM streaming with variable-sized chunks.

    The user's message is deliberately ignored (`_message`); a canned
    response is chosen at random and re-chunked to mimic the variable
    pacing of real model generation.
    """
    words = random.choice(MOCK_RESPONSES).split(" ")
    cursor = 0

    while cursor < len(words):
        # 1-5 words per chunk mimics real LLM token batching.
        size = random.randint(1, 5)
        piece = " ".join(words[cursor : cursor + size])
        # Restore the separating space on every chunk after the first.
        yield piece if cursor == 0 else " " + piece
        cursor += size
        # ~50-100 tokens/second pacing, which is typical of real models.
        await asyncio.sleep(random.uniform(0.01, 0.03))
7 changes: 7 additions & 0 deletions examples/fastapi-example/routes/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
"""API route modules."""

from routes.action import router as action_router
from routes.chat import router as chat_router
from routes.refinement import router as refinement_router

__all__ = ["action_router", "chat_router", "refinement_router"]
Loading