Prerequisites: Completed Parts 1, 2, and 3 of the walkthrough
This section covers best practices, optimization techniques, testing strategies, and deployment considerations for taking your FastAPI + DocumentDB application to production.
Scenario: Optimize database queries using proper indexing and query analysis.
DocumentDB leverages PostgreSQL's powerful indexing capabilities while maintaining MongoDB compatibility.
Index Types Available:
| Index Type | Use Case | DocumentDB Implementation |
|---|---|---|
| B-tree | Equality, range queries | PostgreSQL B-tree on JSONB fields |
| Unique | Enforce uniqueness (SKU, email) | PostgreSQL unique constraint |
| Compound | Multi-field queries | PostgreSQL multi-column index |
| Text | Full-text search | PostgreSQL ts_vector |
| Geospatial | Location queries | PostGIS (2d, 2dsphere) |
| Vector | Similarity search | pgvector (HNSW, IVF) |
File: backend/app/routers/debug.py
"""
Debug and performance analysis endpoints.
"""
from fastapi import APIRouter, Query
from motor.motor_asyncio import AsyncIOMotorClient
from app.core.config import settings
router = APIRouter(prefix="/debug", tags=["debug"])
@router.get("/explain-query")
async def explain_query(
    collection: str = Query(..., description="Collection name"),
    query_filter: str = Query("{}", description="MongoDB query as JSON string")
):
    """
    Explain query execution plan using DocumentDB's EXPLAIN.

    This shows how PostgreSQL executes your MongoDB query.

    Args:
        collection: Name of the collection to run the explain against.
        query_filter: MongoDB query filter, passed as a JSON string.

    Returns:
        dict with parsed execution stats, the winning plan's index name
        (if any), and the full explain output for deeper inspection.

    Raises:
        HTTPException: 400 when ``query_filter`` is not valid JSON.
    """
    import json
    from fastapi import HTTPException

    # Validate the user-supplied filter BEFORE opening a DB connection;
    # previously malformed JSON surfaced as an unhandled 500 error.
    try:
        filter_dict = json.loads(query_filter)
    except json.JSONDecodeError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"query_filter is not valid JSON: {exc}"
        ) from exc
    client = AsyncIOMotorClient(
        settings.DOCUMENTDB_URL,
        tls=True,
        tlsAllowInvalidCertificates=True,
    )
    db = client[settings.DOCUMENTDB_DB_NAME]
    try:
        # Use MongoDB's explain command with execution statistics
        explain_result = await db.command(
            "explain",
            {
                "find": collection,
                "filter": filter_dict
            },
            verbosity="executionStats"
        )
        # executionStats may be absent on some server versions, hence .get()
        exec_stats = explain_result.get("executionStats", {})
        return {
            "collection": collection,
            "query": filter_dict,
            "execution_stats": {
                "execution_time_ms": exec_stats.get("executionTimeMillis"),
                "total_docs_examined": exec_stats.get("totalDocsExamined"),
                "total_keys_examined": exec_stats.get("totalKeysExamined"),
                "docs_returned": exec_stats.get("nReturned"),
            },
            "index_used": explain_result.get("queryPlanner", {}).get("winningPlan", {}).get("inputStage", {}).get("indexName"),
            "full_explain": explain_result
        }
    finally:
        # Always release the per-request client, even when the command fails
        client.close()
@router.get("/collection-stats")
async def get_collection_stats(
collection: str = Query(..., description="Collection name")
):
"""Get statistics about a collection (size, indexes, document count)."""
client = AsyncIOMotorClient(
settings.DOCUMENTDB_URL,
tls=True,
tlsAllowInvalidCertificates=True,
)
db = client[settings.DOCUMENTDB_DB_NAME]
try:
# Get collection stats
stats = await db.command("collStats", collection)
# Get indexes
indexes = await db[collection].list_indexes().to_list(length=None)
return {
"collection": collection,
"document_count": stats.get("count"),
"size_bytes": stats.get("size"),
"avg_document_size_bytes": stats.get("avgObjSize"),
"storage_size_bytes": stats.get("storageSize"),
"indexes": [
{
"name": idx.get("name"),
"keys": idx.get("key"),
"unique": idx.get("unique", False)
}
for idx in indexes
],
"total_index_size_bytes": stats.get("totalIndexSize")
}
finally:
client.close()Usage Example:
# Explain a simple query
GET /api/v1/debug/explain-query?collection=products&query_filter={"category":"Electronics"}
# Get collection statistics
GET /api/v1/debug/collection-stats?collection=productsFile: scripts/create_production_indexes.py
"""
Create production-optimized indexes for DocumentDB collections.
"""
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
import sys
import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from backend.app.core.config import settings
async def create_product_indexes():
    """Create optimized indexes for the products collection.

    Covers single-field lookups, a unique SKU constraint, compound
    indexes for common query patterns, weighted text search, and a
    sparse index for the optional tags field.
    """
    client = AsyncIOMotorClient(
        settings.DOCUMENTDB_URL,
        tls=True,
        tlsAllowInvalidCertificates=True,
    )
    db = client[settings.DOCUMENTDB_DB_NAME]
    collection = db["products"]
    try:
        print("Creating indexes for products collection...")
        # (keys, create_index kwargs, label used in the progress message)
        index_specs = [
            ("name", {"name": "idx_products_name"}, "index"),
            ("category", {"name": "idx_products_category"}, "index"),
            ("sku", {"unique": True, "name": "idx_products_sku_unique"}, "unique index"),
            ([("category", 1), ("price", 1)],
             {"name": "idx_products_category_price"}, "compound index"),
            ([("is_active", 1), ("created_at", -1)],
             {"name": "idx_products_active_created"}, "compound index"),
            # Weighted text index: matches in "name" rank above "description"
            ([("name", "text"), ("description", "text")],
             {"name": "idx_products_text_search",
              "weights": {"name": 10, "description": 5}}, "text index"),
            # Sparse: only documents that actually carry a tags field are indexed
            ("tags", {"name": "idx_products_tags", "sparse": True}, "sparse index"),
        ]
        for keys, options, label in index_specs:
            await collection.create_index(keys, **options)
            print(f"✓ Created {label}: {options['name']}")
        # Show the final state so the operator can eyeball what exists
        indexes = await collection.list_indexes().to_list(length=None)
        print("\n✅ All indexes on products collection:")
        for idx in indexes:
            print(f" - {idx['name']}: {idx.get('key', {})}")
    finally:
        client.close()
async def create_order_indexes():
    """Create optimized indexes for the orders collection."""
    client = AsyncIOMotorClient(
        settings.DOCUMENTDB_URL,
        tls=True,
        tlsAllowInvalidCertificates=True,
    )
    db = client[settings.DOCUMENTDB_DB_NAME]
    orders = db["orders"]
    try:
        print("\nCreating indexes for orders collection...")
        # Single-field indexes: customer lookup and status filtering
        for keys, idx_name in (
            ("customer_email", "idx_orders_customer"),
            ("status", "idx_orders_status"),
        ):
            await orders.create_index(keys, name=idx_name)
            print(f"✓ Created index: {idx_name}")
        # Newest-first index for date range queries
        await orders.create_index([("created_at", -1)], name="idx_orders_created_desc")
        print("✓ Created index: idx_orders_created_desc")
        # Compound indexes: a customer's orders by date, and
        # status+date listings for admin queries
        for keys, idx_name in (
            ([("customer_email", 1), ("created_at", -1)], "idx_orders_customer_created"),
            ([("status", 1), ("created_at", -1)], "idx_orders_status_created"),
        ):
            await orders.create_index(keys, name=idx_name)
            print(f"✓ Created compound index: {idx_name}")
        print("\n✅ All indexes created for orders collection")
    finally:
        client.close()
async def create_review_indexes():
    """Create optimized indexes for the reviews collection."""
    client = AsyncIOMotorClient(
        settings.DOCUMENTDB_URL,
        tls=True,
        tlsAllowInvalidCertificates=True,
    )
    db = client[settings.DOCUMENTDB_DB_NAME]
    reviews = db["reviews"]
    try:
        print("\nCreating indexes for reviews collection...")
        # Product page: newest reviews first, plus filter/sort by rating
        for keys, idx_name in (
            ([("product_id", 1), ("created_at", -1)], "idx_reviews_product_created"),
            ([("product_id", 1), ("rating", -1)], "idx_reviews_product_rating"),
        ):
            await reviews.create_index(keys, name=idx_name)
            print(f"✓ Created compound index: {idx_name}")
        # "My reviews" lookup by customer
        await reviews.create_index("customer_email", name="idx_reviews_customer")
        print("✓ Created index: idx_reviews_customer")
        # One review per customer per product, enforced at the DB level
        await reviews.create_index(
            [("product_id", 1), ("customer_email", 1)],
            unique=True,
            name="idx_reviews_product_customer_unique",
        )
        print("✓ Created unique compound index: idx_reviews_product_customer_unique")
        print("\n✅ All indexes created for reviews collection")
    finally:
        client.close()
async def main():
"""Create all production indexes."""
print("=" * 60)
print("Creating Production Indexes for DocumentDB")
print("=" * 60)
await create_product_indexes()
await create_order_indexes()
await create_review_indexes()
print("\n" + "=" * 60)
print("✅ All production indexes created successfully!")
print("=" * 60)
if __name__ == "__main__":
asyncio.run(main())Run the script:
python scripts/create_production_indexes.py❌ Inefficient Query (No Index):
# Scans entire collection
products = await Product.find(
Product.description.regex("wireless", "i") # Case-insensitive regex
).to_list()
# Execution time: ~500ms for 10,000 products✅ Optimized Query (Using Index):
# Uses text index
products = await Product.find(
{"$text": {"$search": "wireless"}}
).to_list()
# Execution time: ~50ms for 10,000 productsCommon Optimization Patterns:
# 1. Use covered indexes (index contains all needed fields)
# Query returns only indexed fields - no document lookup needed
products = await Product.find(
{"category": "Electronics"},
projection={"name": 1, "category": 1, "_id": 0} # Only indexed fields
).to_list()
# 2. Use compound indexes for multi-field queries
# Index: [("category", 1), ("price", 1)]
products = await Product.find(
Product.category == "Electronics",
Product.price >= 20,
Product.price <= 100
).sort("price").to_list()
# 3. Limit results early
# Always use .limit() when you don't need all results
recent_products = await Product.find(
Product.is_active == True
).sort("-created_at").limit(20).to_list()
# 4. Use projection to reduce data transfer
# Only fetch fields you need
products = await Product.find(
{"category": "Electronics"}
).project({"name": 1, "price": 1, "sku": 1}).to_list()File: backend/app/core/database.py (Enhanced)
"""
Enhanced database connection with production-ready settings.
"""
from typing import Optional
from motor.motor_asyncio import AsyncIOMotorClient
from beanie import init_beanie
from contextlib import asynccontextmanager
from app.core.config import settings
from app.models.product import Product
from app.models.order import Order
from app.models.customer import Customer
from app.models.review import Review
class Database:
"""Database connection manager with connection pooling."""
client: Optional[AsyncIOMotorClient] = None
@classmethod
async def connect_db(cls):
"""
Connect to DocumentDB with production-optimized settings.
"""
if cls.client is not None:
return
# Production connection settings
cls.client = AsyncIOMotorClient(
settings.DOCUMENTDB_URL,
tls=True,
tlsAllowInvalidCertificates=True, # Use proper certs in production!
# Connection pool settings
maxPoolSize=50, # Max connections in pool
minPoolSize=10, # Min connections to maintain
maxIdleTimeMS=45000, # Close idle connections after 45s
# Timeout settings
serverSelectionTimeoutMS=5000, # Fail fast if can't connect
connectTimeoutMS=10000, # Connection timeout
socketTimeoutMS=30000, # Socket operation timeout
# Retry settings
retryWrites=True, # Retry failed writes
retryReads=True, # Retry failed reads
# Monitoring
appname="ecommerce-api", # Appears in logs
)
# Initialize Beanie ODM
await init_beanie(
database=cls.client[settings.DOCUMENTDB_DB_NAME],
document_models=[
Product,
Order,
Customer,
Review,
],
)
# Verify connection
await cls.client.admin.command('ping')
print("✓ Connected to DocumentDB with connection pool")
@classmethod
async def close_db(cls):
"""Close database connection and cleanup resources."""
if cls.client is None:
return
cls.client.close()
cls.client = None
print("✓ Closed DocumentDB connection")
@classmethod
def get_client(cls) -> AsyncIOMotorClient:
"""Get the database client."""
if cls.client is None:
raise RuntimeError("Database not connected. Call connect_db() first.")
return cls.clientFile: backend/app/main.py (Enhanced with lifespan)
"""
FastAPI application with production-ready configuration.
"""
from contextlib import asynccontextmanager
from fastapi import FastAPI, Request, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.gzip import GZipMiddleware
from fastapi.responses import JSONResponse
import time
import logging
from app.core.database import Database
from app.core.config import settings
from app.routers import products, customers, orders, admin, reviews, debug
# Configure logging
# Level comes from settings so production can tune verbosity via the
# LOG_LEVEL environment variable (default "INFO") instead of a code change.
logging.basicConfig(
    level=settings.LOG_LEVEL,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
# Module-level logger named after this module, per logging convention.
logger = logging.getLogger(__name__)
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan manager.

    Connects the database on startup and tears it down on shutdown; the
    application serves requests while this generator is suspended at
    ``yield``.
    """
    logger.info("Starting application...")
    try:
        await Database.connect_db()
    except Exception as e:
        # Abort startup entirely — serving requests without a DB would
        # only produce 500s.
        logger.error(f"✗ Failed to connect to database: {e}")
        raise
    else:
        logger.info("✓ Database connection established")
    yield  # Application runs here
    logger.info("Shutting down application...")
    await Database.close_db()
    logger.info("✓ Database connection closed")
# Create FastAPI application
app = FastAPI(
    title="E-Commerce API",
    description="Production-ready FastAPI + DocumentDB e-commerce API",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc",
    lifespan=lifespan,  # ties DB startup/shutdown to the lifespan manager above
)
# Middleware configuration
app.add_middleware(
    CORSMiddleware,
    # ALLOWED_ORIGINS is a comma-separated string in settings/.env
    allow_origins=settings.ALLOWED_ORIGINS.split(","),
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Enable response compression
# Only bodies of at least 1000 bytes are gzipped; smaller payloads aren't worth it.
app.add_middleware(GZipMiddleware, minimum_size=1000)
# Request timing middleware
@app.middleware("http")
async def add_process_time_header(request: Request, call_next):
    """Attach an X-Process-Time header and log slow requests."""
    started = time.time()
    response = await call_next(request)
    elapsed = time.time() - started
    response.headers["X-Process-Time"] = f"{elapsed:.4f}s"
    if elapsed > 1.0:
        # Anything over a second is worth surfacing in the logs
        logger.warning(
            f"Slow request: {request.method} {request.url.path} "
            f"took {elapsed:.2f}s"
        )
    return response
# Global exception handler
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception):
    """Turn any uncaught exception into a sanitized 500 response."""
    # Full traceback goes to the logs; the client only sees a generic body.
    logger.error(f"Unhandled exception: {exc}", exc_info=True)
    payload = {
        "error": "Internal server error",
        "message": "An unexpected error occurred. Please try again later.",
        "request_id": request.headers.get("X-Request-ID", "unknown"),
    }
    return JSONResponse(
        status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
        content=payload,
    )
# Health check endpoint
@app.get("/health", tags=["health"])
async def health_check():
    """
    Health check endpoint for load balancers and monitoring.

    Returns 200 if application and database are healthy, 503 otherwise.
    """
    try:
        # A ping round-trip verifies the pooled connection is alive
        await Database.get_client().admin.command('ping')
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={
                "status": "unhealthy",
                "database": "disconnected",
                "error": str(e)
            }
        )
    return {
        "status": "healthy",
        "database": "connected",
        "version": "1.0.0"
    }
# Readiness check (for Kubernetes)
@app.get("/ready", tags=["health"])
async def readiness_check():
    """
    Readiness check for Kubernetes.

    Returns 200 when application is ready to accept traffic.
    """
    try:
        await Database.get_client().admin.command('ping')
        return {"status": "ready"}
    except Exception:
        # Not ready: either never connected or the cluster is unreachable
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"status": "not ready"}
        )
# Register routers
# All business routers share the /api/v1 prefix so the API can be
# versioned side by side later.
app.include_router(products.router, prefix="/api/v1")
app.include_router(customers.router, prefix="/api/v1")
app.include_router(orders.router, prefix="/api/v1")
app.include_router(admin.router, prefix="/api/v1")
app.include_router(reviews.router, prefix="/api/v1")
# Debug router (disable in production!)
# Gated on DEBUG because the explain/stats endpoints expose schema details.
if settings.DEBUG:
    app.include_router(debug.router, prefix="/api/v1")
# Root endpoint
@app.get("/", tags=["root"])
async def root():
    """API root endpoint pointing at docs and health."""
    return dict(
        message="Welcome to E-Commerce API",
        version="1.0.0",
        docs="/docs",
        health="/health",
    )
File: backend/app/core/exceptions.py
"""
Custom exception classes for better error handling.
"""
from fastapi import HTTPException, status
class DatabaseConnectionError(HTTPException):
    """Raised when database connection fails."""

    def __init__(self, detail: str = "Database connection failed"):
        # 503: the service is temporarily unable to reach its backing store
        super().__init__(status_code=status.HTTP_503_SERVICE_UNAVAILABLE, detail=detail)
class ResourceNotFoundError(HTTPException):
    """Raised when a requested resource doesn't exist."""

    def __init__(self, resource: str, identifier: str):
        message = f"{resource} with ID '{identifier}' not found"
        super().__init__(status_code=status.HTTP_404_NOT_FOUND, detail=message)
class DuplicateResourceError(HTTPException):
    """Raised when trying to create a duplicate resource."""

    def __init__(self, resource: str, field: str, value: str):
        message = f"{resource} with {field}='{value}' already exists"
        # 409 Conflict: the request clashes with existing state
        super().__init__(status_code=status.HTTP_409_CONFLICT, detail=message)
class ValidationError(HTTPException):
    """Raised when business logic validation fails."""

    def __init__(self, detail: str):
        # 422 mirrors FastAPI's own request-validation failures
        super().__init__(status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, detail=detail)
class InsufficientStockError(HTTPException):
"""Raised when product stock is insufficient for order."""
def __init__(self, product_name: str, available: int, requested: int):
super().__init__(
status_code=status.HTTP_409_CONFLICT,
detail=f"Insufficient stock for {product_name}. "
f"Available: {available}, Requested: {requested}"
)Usage in endpoints:
from app.core.exceptions import ResourceNotFoundError, DuplicateResourceError
@router.post("", response_model=ProductResponse)
async def create_product(product_data: ProductCreate):
    """Create a product, rejecting duplicate SKUs with a 409."""
    # Check for duplicate
    # NOTE(review): find-then-insert is racy under concurrency; the unique
    # SKU index is the real guarantee — confirm DuplicateKeyError is also
    # handled at insert time.
    existing = await Product.find_one(Product.sku == product_data.sku)
    if existing:
        raise DuplicateResourceError("Product", "SKU", product_data.sku)
    product = Product(**product_data.model_dump())
    await product.insert()
    return ProductResponse(**product.model_dump())
@router.get("/{product_id}", response_model=ProductResponse)
async def get_product(product_id: str):
    """Fetch a single product by id; 404 when it does not exist."""
    product = await Product.get(product_id)
    if not product:
        raise ResourceNotFoundError("Product", product_id)
    return ProductResponse(**product.model_dump())File: backend/app/core/config.py (Production-ready)
"""
Application configuration using pydantic-settings.
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import Optional
class Settings(BaseSettings):
    """Application settings loaded from environment variables.

    Values come from the process environment first, then from the
    ``.env`` file configured in ``model_config``. Fields without a
    default (DOCUMENTDB_URL, SECRET_KEY) are required at startup.
    """
    # Application
    APP_NAME: str = "E-Commerce API"
    DEBUG: bool = False  # gates debug-only routers (see main.py)
    VERSION: str = "1.0.0"
    # DocumentDB Connection
    DOCUMENTDB_URL: str  # required; full mongodb:// connection string
    DOCUMENTDB_DB_NAME: str = "ecommerce"
    # Security
    SECRET_KEY: str  # required; token signing key (used with ALGORITHM, presumably for JWT)
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    # CORS
    # Comma-separated list; split where it is consumed (CORS middleware).
    ALLOWED_ORIGINS: str = "http://localhost:3000,http://localhost:8000"
    # Rate Limiting
    RATE_LIMIT_REQUESTS: int = 100
    RATE_LIMIT_PERIOD: int = 60  # seconds
    # Logging
    LOG_LEVEL: str = "INFO"
    LOG_FORMAT: str = "json"  # json or text
    # Monitoring
    SENTRY_DSN: Optional[str] = None  # error tracking disabled when None
    ENABLE_METRICS: bool = True
    # Feature Flags
    ENABLE_VECTOR_SEARCH: bool = True
    ENABLE_GEOSPATIAL: bool = True
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=True
    )
# Global settings instance
settings = Settings().env file for production:
# Application
DEBUG=False
SECRET_KEY=your-super-secret-key-change-in-production
# DocumentDB
DOCUMENTDB_URL=mongodb://admin:password@documentdb-cluster.us-east-1.docdb.amazonaws.com:27017/?tls=true&tlsCAFile=rds-combined-ca-bundle.pem
DOCUMENTDB_DB_NAME=ecommerce
# CORS
ALLOWED_ORIGINS=https://yourdomain.com,https://www.yourdomain.com
# Rate Limiting
RATE_LIMIT_REQUESTS=1000
RATE_LIMIT_PERIOD=60
# Monitoring
SENTRY_DSN=https://your-sentry-dsn@sentry.io/project
ENABLE_METRICS=TrueFile: backend/tests/test_product_service.py
"""
Unit tests for product service logic.
"""
import pytest
from decimal import Decimal
from app.models.product import Product
from app.schemas.product import ProductCreate
@pytest.mark.asyncio
async def test_create_product_success():
    """Test successful product creation."""
    payload = ProductCreate(
        name="Test Product",
        description="Test description",
        price=Decimal("99.99"),
        sku="TEST-PROD-001",
        category="Electronics",
        stock_quantity=100,
    )
    # Schema fields should round-trip unchanged into the document model
    product = Product(**payload.model_dump())
    assert product.name == "Test Product"
    assert product.price == Decimal("99.99")
    assert product.sku == "TEST-PROD-001"
def test_product_validation_negative_price():
    """Test that negative price raises validation error."""
    invalid = dict(
        name="Test Product",
        price=Decimal("-10.00"),  # invalid: price must not be negative
        sku="TEST-001",
        category="Electronics",
    )
    with pytest.raises(ValueError):
        ProductCreate(**invalid)
def test_product_validation_empty_name():
    """Test that empty name raises validation error."""
    invalid = dict(
        name="",  # invalid: name must be non-empty
        price=Decimal("99.99"),
        sku="TEST-001",
        category="Electronics",
    )
    with pytest.raises(ValueError):
        ProductCreate(**invalid)
File: backend/tests/test_api_integration.py
"""
Integration tests for API endpoints.
"""
import pytest
from httpx import AsyncClient
from app.main import app
@pytest.mark.asyncio
async def test_health_endpoint():
    """Test health check endpoint."""
    # httpx >= 0.27 removed the `AsyncClient(app=...)` shortcut; route
    # requests into the FastAPI app explicitly via ASGITransport.
    from httpx import ASGITransport
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as client:
        response = await client.get("/health")
    assert response.status_code == 200
    data = response.json()
    assert data["status"] == "healthy"
@pytest.mark.asyncio
async def test_create_product_integration():
    """Test product creation through API."""
    # httpx >= 0.27 removed the `AsyncClient(app=...)` shortcut.
    from httpx import ASGITransport
    product_data = {
        "name": "Integration Test Product",
        "price": 49.99,
        "sku": "INT-TEST-001",
        "category": "Test",
        "stock_quantity": 10
    }
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        response = await client.post("/api/v1/products", json=product_data)
    assert response.status_code == 201
    data = response.json()
    assert data["name"] == "Integration Test Product"
    assert "_id" in data
@pytest.mark.asyncio
async def test_create_duplicate_sku():
    """Test that duplicate SKU returns 409 Conflict."""
    # httpx >= 0.27 removed the `AsyncClient(app=...)` shortcut.
    from httpx import ASGITransport
    product_data = {
        "name": "Test Product",
        "price": 49.99,
        "sku": "DUP-TEST-001",
        "category": "Test"
    }
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as client:
        # First creation should succeed
        response1 = await client.post("/api/v1/products", json=product_data)
        assert response1.status_code == 201
        # Second creation should fail
        response2 = await client.post("/api/v1/products", json=product_data)
        assert response2.status_code == 409
File: tests/locustfile.py
"""
Load testing with Locust.
Run with: locust -f tests/locustfile.py --host=http://localhost:8000
"""
from locust import HttpUser, task, between
import random
class EcommerceUser(HttpUser):
    """Simulate e-commerce user behavior.

    Task weights approximate real traffic: browsing dominates, order
    creation is rarest.
    """
    wait_time = between(1, 3)  # Wait 1-3 seconds between tasks

    def on_start(self):
        """Called when a user starts."""
        # Could add login logic here
        pass

    @task(3)  # Weight: 3 (runs 3x more often)
    def list_products(self):
        """List products with pagination."""
        page = random.randint(1, 10)
        self.client.get(f"/api/v1/products?page={page}&page_size=20")

    @task(2)
    def get_product_details(self):
        """Get a specific product."""
        # Assuming we know some product IDs
        # NOTE(review): a fixed ID mostly measures cache-hot lookups —
        # consider sampling from seeded IDs for more realistic load.
        product_id = "507f1f77bcf86cd799439011"
        self.client.get(f"/api/v1/products/{product_id}")

    @task(1)
    def search_products(self):
        """Search for products."""
        categories = ["Electronics", "Books", "Clothing"]
        category = random.choice(categories)
        self.client.get(f"/api/v1/products?category={category}")

    @task(1)
    def create_order(self):
        """Create a new order."""
        order_data = {
            "customer_email": f"user{random.randint(1, 1000)}@test.com",
            "items": [
                {
                    "product_id": "507f1f77bcf86cd799439011",
                    "quantity": random.randint(1, 3),
                    "price": 29.99
                }
            ],
            "total_amount": 29.99
        }
        self.client.post("/api/v1/orders", json=order_data)Run load test:
pip install locust
locust -f tests/locustfile.py --host=http://localhost:8000 --users 100 --spawn-rate 10File: backend/Dockerfile.prod
FROM python:3.11-slim

# Set working directory
WORKDIR /app

# Install system dependencies (gcc for packages that build C extensions)
RUN apt-get update && apt-get install -y \
    gcc \
    && rm -rf /var/lib/apt/lists/*

# Copy requirements first so the dependency layer caches independently of code
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy application code
COPY ./app ./app

# Create non-root user
RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app
USER appuser

# Health check.
# Uses stdlib urllib instead of `requests` (which isn't guaranteed to be in
# requirements.txt), and urlopen raises on HTTP errors, so a non-2xx /health
# correctly fails the check — requests.get() never did.
HEALTHCHECK --interval=30s --timeout=3s --start-period=40s --retries=3 \
    CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"

# Run with Gunicorn
CMD ["gunicorn", "app.main:app", "--workers", "4", "--worker-class", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
File: k8s/deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: ecommerce-api
  labels:
    app: ecommerce-api
spec:
  replicas: 3  # three pods behind the service for basic availability
  selector:
    matchLabels:
      app: ecommerce-api
  template:
    metadata:
      labels:
        app: ecommerce-api
    spec:
      containers:
        - name: api
          image: your-registry/ecommerce-api:latest
          ports:
            - containerPort: 8000
          env:
            # Secrets are injected from Kubernetes Secret objects,
            # never baked into the image.
            - name: DOCUMENTDB_URL
              valueFrom:
                secretKeyRef:
                  name: documentdb-secret
                  key: connection-string
            - name: SECRET_KEY
              valueFrom:
                secretKeyRef:
                  name: api-secret
                  key: secret-key
          resources:
            requests:
              memory: "256Mi"
              cpu: "250m"
            limits:
              memory: "512Mi"
              cpu: "500m"
          # Liveness restarts a wedged container; readiness gates traffic.
          livenessProbe:
            httpGet:
              path: /health
              port: 8000
            initialDelaySeconds: 30
            periodSeconds: 10
          readinessProbe:
            httpGet:
              path: /ready
              port: 8000
            initialDelaySeconds: 5
            periodSeconds: 5
---
apiVersion: v1
kind: Service
metadata:
  name: ecommerce-api-service
spec:
  selector:
    app: ecommerce-api
  ports:
    - protocol: TCP
      port: 80          # external port on the load balancer
      targetPort: 8000  # container port
  type: LoadBalancerFile: backend/app/middleware/metrics.py
"""
Prometheus metrics middleware.
"""
from prometheus_client import Counter, Histogram, generate_latest
from fastapi import Request
import time
# Define metrics
# Counter: monotonically increasing request totals, labeled for drill-down.
request_count = Counter(
    'api_requests_total',
    'Total API requests',
    ['method', 'endpoint', 'status']
)
# Histogram: request latency distribution (default bucket boundaries).
request_duration = Histogram(
    'api_request_duration_seconds',
    'API request duration',
    ['method', 'endpoint']
)
async def metrics_middleware(request: Request, call_next):
"""Collect metrics for each request."""
method = request.method
path = request.url.path
# Start timer
start_time = time.time()
# Process request
response = await call_next(request)
# Record metrics
duration = time.time() - start_time
request_duration.labels(method=method, endpoint=path).observe(duration)
request_count.labels(
method=method,
endpoint=path,
status=response.status_code
).inc()
return responseAdd metrics endpoint:
from prometheus_client import generate_latest
@app.get("/metrics")
async def metrics():
"""Prometheus metrics endpoint."""
return Response(
content=generate_latest(),
media_type="text/plain"
)Choose one to demonstrate your mastery:
- Implement complete vector search with sentence transformers
- Create "Similar Products" and "Frequently Bought Together" features
- Build recommendation API with A/B testing support
- Add WebSocket support for live stock updates
- Implement distributed locking for concurrent order processing
- Create admin dashboard with real-time metrics
- Deploy to multiple regions (US, EU, Asia)
- Implement geolocation-based routing
- Add read replicas for DocumentDB
- Setup global CDN with CloudFront/Cloudflare
- Add user authentication (JWT)
- Implement shopping cart with Redis caching
- Add payment integration (Stripe)
- Create order tracking system
- Build admin analytics dashboard
- ✅ Optimal database indexes created
- ✅ Connection pooling configured
- ✅ Query performance analyzed with EXPLAIN
- ✅ Response compression enabled
- ✅ Caching strategy implemented
- ✅ Health check endpoints
- ✅ Graceful shutdown handling
- ✅ Proper error handling and logging
- ✅ Database connection retry logic
- ✅ Circuit breakers for external services
- ✅ Environment variables for secrets
- ✅ CORS properly configured
- ✅ Rate limiting implemented
- ✅ Input validation on all endpoints
- ✅ SQL injection prevention (JSONB queries)
- ✅ Prometheus metrics exposed
- ✅ Structured logging
- ✅ Performance monitoring
- ✅ Error tracking (Sentry)
- ✅ Database query monitoring
- ✅ Unit tests for business logic
- ✅ Integration tests for APIs
- ✅ Load testing completed
- ✅ Test coverage > 80%
- ✅ Dockerfile optimized
- ✅ Kubernetes manifests ready
- ✅ CI/CD pipeline configured
- ✅ Blue-green deployment strategy
- ✅ Rollback procedures documented
🎉 Congratulations! You've built a production-ready FastAPI + DocumentDB application with best practices for performance, reliability, and scalability!