From 61d36b261ef70c1aada9ad51f9af4f74c064f90e Mon Sep 17 00:00:00 2001 From: Cody Hart Date: Wed, 24 Dec 2025 16:13:50 -0500 Subject: [PATCH] separate example project modules --- examples/fastapi-example/app.py | 58 ++++ examples/fastapi-example/main.py | 257 +----------------- examples/fastapi-example/mock_llm.py | 41 +++ examples/fastapi-example/routes/__init__.py | 7 + examples/fastapi-example/routes/action.py | 67 +++++ examples/fastapi-example/routes/chat.py | 27 ++ examples/fastapi-example/routes/refinement.py | 94 +++++++ 7 files changed, 297 insertions(+), 254 deletions(-) create mode 100644 examples/fastapi-example/app.py create mode 100644 examples/fastapi-example/mock_llm.py create mode 100644 examples/fastapi-example/routes/__init__.py create mode 100644 examples/fastapi-example/routes/action.py create mode 100644 examples/fastapi-example/routes/chat.py create mode 100644 examples/fastapi-example/routes/refinement.py diff --git a/examples/fastapi-example/app.py b/examples/fastapi-example/app.py new file mode 100644 index 0000000..463cd4e --- /dev/null +++ b/examples/fastapi-example/app.py @@ -0,0 +1,58 @@ +"""FastAPI application configuration.""" + +import sys +from pathlib import Path + +from fastapi import FastAPI +from fastapi.middleware.cors import CORSMiddleware +from fastapi.responses import FileResponse +from fastapi.staticfiles import StaticFiles + +# Add parent packages to path for development +sys.path.insert(0, str(Path(__file__).parent.parent.parent / "packages" / "llmpane-py")) + +from routes import action_router, chat_router, refinement_router + + +def create_app() -> FastAPI: + """Create and configure the FastAPI application.""" + app = FastAPI(title="llmpane Example", version="0.1.0") + + # CORS for development + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # Register routers + app.include_router(chat_router) + app.include_router(action_router) + app.include_router(refinement_router) + + # Serve static files (for the React example) + static_dir = Path(__file__).parent / "static" + if static_dir.exists(): + app.mount("/static", StaticFiles(directory=static_dir), name="static") + + @app.get("/") + async def serve_index(): + """Serve the React app.""" + index_path = static_dir / "index.html" + if index_path.exists(): + return FileResponse(index_path) + return { + "message": "llmpane example API", + "endpoints": [ + "/api/chat - Basic streaming chat", + "/api/chat/action - Action message pattern", + "/api/chat/refinement - Refinement message pattern", + ], + } + + return app + + +app = create_app() diff --git a/examples/fastapi-example/main.py b/examples/fastapi-example/main.py index b1dcfb9..8454486 100644 --- a/examples/fastapi-example/main.py +++ b/examples/fastapi-example/main.py @@ -1,262 +1,11 @@ -"""FastAPI example demonstrating all llmpane patterns with a mock LLM.""" +"""Entry point for the FastAPI example application.""" -import asyncio -import json import os -import random -import uuid -from collections.abc import AsyncGenerator -from pathlib import Path -from fastapi import FastAPI -from fastapi.middleware.cors import CORSMiddleware -from fastapi.staticfiles import StaticFiles -from fastapi.responses import FileResponse -from pydantic import BaseModel - -# Add parent packages to path for development -import sys - -sys.path.insert(0, str(Path(__file__).parent.parent.parent / "packages" / "llmpane-py")) - -from llmpane import ChatRequest, StreamChunk -from llmpane.streaming import create_sse_response - -app = FastAPI(title="llmpane Example", version="0.1.0") - -# CORS for development -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - - -# --- Mock LLM Responses --- - - -MOCK_RESPONSES = [ - "That's a great question! Let me think about it...\n\nBased on my analysis, I'd say the answer involves considering multiple factors. First, we need to understand the context. Then, we can break down the problem into smaller parts.", - "I'd be happy to help with that! Here's what I think:\n\n1. Start by defining the problem clearly\n2. Gather relevant information\n3. Analyze the options\n4. Make a decision based on the evidence", - "Interesting point! From my perspective, there are several ways to approach this. The most effective method depends on your specific situation and goals.", - "Thanks for asking! This is a topic I find fascinating. Let me share some thoughts:\n\nThe key insight here is that simplicity often leads to better outcomes. Focus on the fundamentals and build from there.", -] - - -async def generate_mock_response(message: str) -> AsyncGenerator[str, None]: - """Simulate realistic LLM streaming with variable-sized chunks. - - Real LLMs stream in chunks of varying sizes (1-10 words typically), - with the delay being the model's generation time, not network latency. - """ - response = random.choice(MOCK_RESPONSES) - words = response.split(" ") - i = 0 - - while i < len(words): - # Random chunk size (1-5 words) - mimics real LLM behavior - chunk_size = random.randint(1, 5) - chunk_words = words[i : i + chunk_size] - - # Build chunk with spaces - if i > 0: - chunk = " " + " ".join(chunk_words) - else: - chunk = " ".join(chunk_words) - - yield chunk - i += chunk_size - - # Minimal delay - real LLMs stream as fast as they generate - # This simulates ~50-100 tokens/second which is typical - await asyncio.sleep(random.uniform(0.01, 0.03)) - - -# --- Basic Chat Endpoint --- - - -@app.post("/api/chat") -async def chat(request: ChatRequest): - """Basic streaming chat endpoint.""" - - async def generate() -> AsyncGenerator[StreamChunk, None]: - message_id = f"msg_{uuid.uuid4().hex[:12]}" - accumulated = "" - - async for char in generate_mock_response(request.message): - accumulated += char - yield StreamChunk(delta=char) - - yield StreamChunk(done=True, message_id=message_id) - - return create_sse_response(generate()) - - -# --- Action Message Pattern --- - - -class FilterAction(BaseModel): - """Example action: filter data by a field.""" - - field: str - operator: str - value: str - - -class ActionMetadataPayload(BaseModel): - """Metadata for action messages.""" - - action: FilterAction - status: str = "pending" - explanation: str - - -FILTER_SUGGESTIONS = [ - FilterAction(field="date", operator=">=", value="2024-01-01"), - FilterAction(field="status", operator="=", value="active"), - FilterAction(field="amount", operator=">", value="1000"), - FilterAction(field="category", operator="in", value="electronics,clothing"), -] - - -@app.post("/api/chat/action") -async def chat_with_action(request: ChatRequest): - """Chat endpoint that returns proposed actions.""" - - async def generate() -> AsyncGenerator[StreamChunk, None]: - message_id = f"msg_{uuid.uuid4().hex[:12]}" - - # Stream explanation in realistic chunks - explanation = "Based on your request, I suggest applying the following filter:" - async for chunk in generate_mock_response(explanation): - yield StreamChunk(delta=chunk) - # Small pause before showing the action card - await asyncio.sleep(0.05) - - # Then send the action metadata - action = random.choice(FILTER_SUGGESTIONS) - metadata = ActionMetadataPayload( - action=action, - status="pending", - explanation=f"Filter {action.field} where it {action.operator} {action.value}", - ) - - yield StreamChunk(metadata=metadata) - yield StreamChunk(done=True, message_id=message_id) - - return create_sse_response(generate()) - - -# --- Refinement Message Pattern --- - - -class SQLQuery(BaseModel): - """Example output: a SQL query.""" - - query: str - tables: list[str] - estimated_rows: int - - -class RefinementMetadataPayload(BaseModel): - """Metadata for refinement messages.""" - - output: SQLQuery - iteration: int - is_final: bool = False - - -# Simulate refining queries over iterations -QUERY_ITERATIONS = [ - SQLQuery( - query="SELECT * FROM orders", - tables=["orders"], - estimated_rows=10000, - ), - SQLQuery( - query="SELECT id, customer_id, total FROM orders WHERE status = 'completed'", - tables=["orders"], - estimated_rows=5000, - ), - SQLQuery( - query=( - "SELECT o.id, c.name, o.total " - "FROM orders o " - "JOIN customers c ON o.customer_id = c.id " - "WHERE o.status = 'completed' AND o.total > 100" - ), - tables=["orders", "customers"], - estimated_rows=1500, - ), -] - -# Track iteration per conversation (simplified - in production use proper session management) -iteration_tracker: dict[str, int] = {} - - -@app.post("/api/chat/refinement") -async def chat_with_refinement(request: ChatRequest): - """Chat endpoint that returns iteratively refined output.""" - conv_id = request.conversation_id or "default" - - # Get or initialize iteration count - if conv_id not in iteration_tracker: - iteration_tracker[conv_id] = 0 - - current_iteration = iteration_tracker[conv_id] - iteration_tracker[conv_id] = (current_iteration + 1) % len(QUERY_ITERATIONS) - - async def generate() -> AsyncGenerator[StreamChunk, None]: - message_id = f"msg_{uuid.uuid4().hex[:12]}" - - # Stream explanation in realistic chunks - explanation = f"Here's iteration {current_iteration + 1} of the query based on your feedback:" - async for chunk in generate_mock_response(explanation): - yield StreamChunk(delta=chunk) - # Small pause before showing the refinement card - await asyncio.sleep(0.05) - - # Send refinement metadata - query = QUERY_ITERATIONS[current_iteration] - metadata = RefinementMetadataPayload( - output=query, - iteration=current_iteration + 1, - is_final=current_iteration == len(QUERY_ITERATIONS) - 1, - ) - - yield StreamChunk(metadata=metadata) - yield StreamChunk(done=True, message_id=message_id) - - return create_sse_response(generate()) - - -# --- Serve Static Files (for the React example) --- - -static_dir = Path(__file__).parent / "static" -if static_dir.exists(): - app.mount("/static", StaticFiles(directory=static_dir), name="static") - - -@app.get("/") -async def serve_index(): - """Serve the React app.""" - index_path = static_dir / "index.html" - if index_path.exists(): - return FileResponse(index_path) - return { - "message": "llmpane example API", - "endpoints": [ - "/api/chat - Basic streaming chat", - "/api/chat/action - Action message pattern", - "/api/chat/refinement - Refinement message pattern", - ], - } +import uvicorn +from app import app if __name__ == "__main__": - import uvicorn - port = int(os.environ.get("PORT", 8000)) uvicorn.run(app, host="0.0.0.0", port=port) diff --git a/examples/fastapi-example/mock_llm.py b/examples/fastapi-example/mock_llm.py new file mode 100644 index 0000000..c8fca9d --- /dev/null +++ b/examples/fastapi-example/mock_llm.py @@ -0,0 +1,41 @@ +"""Mock LLM response generation for demonstration purposes.""" + +import asyncio +import random +from collections.abc import AsyncGenerator + +MOCK_RESPONSES = [ + "That's a great question! Let me think about it...\n\nBased on my analysis, I'd say the answer involves considering multiple factors. First, we need to understand the context. Then, we can break down the problem into smaller parts.", + "I'd be happy to help with that! Here's what I think:\n\n1. Start by defining the problem clearly\n2. Gather relevant information\n3. Analyze the options\n4. Make a decision based on the evidence", + "Interesting point! From my perspective, there are several ways to approach this. The most effective method depends on your specific situation and goals.", + "Thanks for asking! This is a topic I find fascinating. Let me share some thoughts:\n\nThe key insight here is that simplicity often leads to better outcomes. Focus on the fundamentals and build from there.", +] + + +async def generate_mock_response(_message: str) -> AsyncGenerator[str, None]: + """Simulate realistic LLM streaming with variable-sized chunks. + + Real LLMs stream in chunks of varying sizes (1-10 words typically), + with the delay being the model's generation time, not network latency. + """ + response = random.choice(MOCK_RESPONSES) + words = response.split(" ") + i = 0 + + while i < len(words): + # Random chunk size (1-5 words) - mimics real LLM behavior + chunk_size = random.randint(1, 5) + chunk_words = words[i : i + chunk_size] + + # Build chunk with spaces + if i > 0: + chunk = " " + " ".join(chunk_words) + else: + chunk = " ".join(chunk_words) + + yield chunk + i += chunk_size + + # Minimal delay - real LLMs stream as fast as they generate + # This simulates ~50-100 tokens/second which is typical + await asyncio.sleep(random.uniform(0.01, 0.03)) diff --git a/examples/fastapi-example/routes/__init__.py b/examples/fastapi-example/routes/__init__.py new file mode 100644 index 0000000..67adab0 --- /dev/null +++ b/examples/fastapi-example/routes/__init__.py @@ -0,0 +1,7 @@ +"""API route modules.""" + +from routes.action import router as action_router +from routes.chat import router as chat_router +from routes.refinement import router as refinement_router + +__all__ = ["action_router", "chat_router", "refinement_router"] diff --git a/examples/fastapi-example/routes/action.py b/examples/fastapi-example/routes/action.py new file mode 100644 index 0000000..b2c4cd7 --- /dev/null +++ b/examples/fastapi-example/routes/action.py @@ -0,0 +1,67 @@ +"""Action message pattern endpoint.""" + +import asyncio +import random +import uuid +from collections.abc import AsyncGenerator + +from fastapi import APIRouter +from pydantic import BaseModel + +from llmpane import ChatRequest, StreamChunk +from llmpane.streaming import create_sse_response +from mock_llm import generate_mock_response + +router = APIRouter() + + +class FilterAction(BaseModel): + """Example action: filter data by a field.""" + + field: str + operator: str + value: str + + +class ActionMetadataPayload(BaseModel): + """Metadata for action messages.""" + + action: FilterAction + status: str = "pending" + explanation: str + + +FILTER_SUGGESTIONS = [ + FilterAction(field="date", operator=">=", value="2024-01-01"), + FilterAction(field="status", operator="=", value="active"), + FilterAction(field="amount", operator=">", value="1000"), + FilterAction(field="category", operator="in", value="electronics,clothing"), +] + + +@router.post("/api/chat/action") +async def chat_with_action(request: ChatRequest): + """Chat endpoint that returns proposed actions.""" + + async def generate() -> AsyncGenerator[StreamChunk, None]: + message_id = f"msg_{uuid.uuid4().hex[:12]}" + + # Stream explanation in realistic chunks + explanation = "Based on your request, I suggest applying the following filter:" + async for chunk in generate_mock_response(explanation): + yield StreamChunk(delta=chunk) + # Small pause before showing the action card + await asyncio.sleep(0.05) + + # Then send the action metadata + action = random.choice(FILTER_SUGGESTIONS) + metadata = ActionMetadataPayload( + action=action, + status="pending", + explanation=f"Filter {action.field} where it {action.operator} {action.value}", + ) + + yield StreamChunk(metadata=metadata) + yield StreamChunk(done=True, message_id=message_id) + + return create_sse_response(generate()) diff --git a/examples/fastapi-example/routes/chat.py b/examples/fastapi-example/routes/chat.py new file mode 100644 index 0000000..8807b22 --- /dev/null +++ b/examples/fastapi-example/routes/chat.py @@ -0,0 +1,27 @@ +"""Basic streaming chat endpoint.""" + +import uuid +from collections.abc import AsyncGenerator + +from fastapi import APIRouter + +from llmpane import ChatRequest, StreamChunk +from llmpane.streaming import create_sse_response +from mock_llm import generate_mock_response + +router = APIRouter() + + +@router.post("/api/chat") +async def chat(request: ChatRequest): + """Basic streaming chat endpoint.""" + + async def generate() -> AsyncGenerator[StreamChunk, None]: + message_id = f"msg_{uuid.uuid4().hex[:12]}" + + async for chunk in generate_mock_response(request.message): + yield StreamChunk(delta=chunk) + + yield StreamChunk(done=True, message_id=message_id) + + return create_sse_response(generate()) diff --git a/examples/fastapi-example/routes/refinement.py b/examples/fastapi-example/routes/refinement.py new file mode 100644 index 0000000..c156c94 --- /dev/null +++ b/examples/fastapi-example/routes/refinement.py @@ -0,0 +1,94 @@ +"""Refinement message pattern endpoint.""" + +import asyncio +import uuid +from collections.abc import AsyncGenerator + +from fastapi import APIRouter +from pydantic import BaseModel + +from llmpane import ChatRequest, StreamChunk +from llmpane.streaming import create_sse_response +from mock_llm import generate_mock_response + +router = APIRouter() + + +class SQLQuery(BaseModel): + """Example output: a SQL query.""" + + query: str + tables: list[str] + estimated_rows: int + + +class RefinementMetadataPayload(BaseModel): + """Metadata for refinement messages.""" + + output: SQLQuery + iteration: int + is_final: bool = False + + +# Simulate refining queries over iterations +QUERY_ITERATIONS = [ + SQLQuery( + query="SELECT * FROM orders", + tables=["orders"], + estimated_rows=10000, + ), + SQLQuery( + query="SELECT id, customer_id, total FROM orders WHERE status = 'completed'", + tables=["orders"], + estimated_rows=5000, + ), + SQLQuery( + query=( + "SELECT o.id, c.name, o.total " + "FROM orders o " + "JOIN customers c ON o.customer_id = c.id " + "WHERE o.status = 'completed' AND o.total > 100" + ), + tables=["orders", "customers"], + estimated_rows=1500, + ), +] + +# Track iteration per conversation (simplified - in production use proper session management) +iteration_tracker: dict[str, int] = {} + + +@router.post("/api/chat/refinement") +async def chat_with_refinement(request: ChatRequest): + """Chat endpoint that returns iteratively refined output.""" + conv_id = request.conversation_id or "default" + + # Get or initialize iteration count + if conv_id not in iteration_tracker: + iteration_tracker[conv_id] = 0 + + current_iteration = iteration_tracker[conv_id] + iteration_tracker[conv_id] = (current_iteration + 1) % len(QUERY_ITERATIONS) + + async def generate() -> AsyncGenerator[StreamChunk, None]: + message_id = f"msg_{uuid.uuid4().hex[:12]}" + + # Stream explanation in realistic chunks + explanation = f"Here's iteration {current_iteration + 1} of the query based on your feedback:" + async for chunk in generate_mock_response(explanation): + yield StreamChunk(delta=chunk) + # Small pause before showing the refinement card + await asyncio.sleep(0.05) + + # Send refinement metadata + query = QUERY_ITERATIONS[current_iteration] + metadata = RefinementMetadataPayload( + output=query, + iteration=current_iteration + 1, + is_final=current_iteration == len(QUERY_ITERATIONS) - 1, + ) + + yield StreamChunk(metadata=metadata) + yield StreamChunk(done=True, message_id=message_id) + + return create_sse_response(generate())