diff --git a/01_getting_started/.gitignore b/01_getting_started/.gitignore new file mode 100644 index 0000000..ddc7a88 --- /dev/null +++ b/01_getting_started/.gitignore @@ -0,0 +1 @@ +.flash/ diff --git a/01_getting_started/01_hello_world/README.md b/01_getting_started/01_hello_world/README.md index 1764184..7ecc27d 100644 --- a/01_getting_started/01_hello_world/README.md +++ b/01_getting_started/01_hello_world/README.md @@ -33,7 +33,7 @@ Server starts at **http://localhost:8888** Visit **http://localhost:8888/docs** for interactive API documentation. QB endpoints are auto-generated by `flash run` based on your `@remote` functions. ```bash -curl -X POST http://localhost:8888/gpu_worker/run_sync \ +curl -X POST http://localhost:8888/gpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"message": "Hello GPU!"}' ``` diff --git a/01_getting_started/01_hello_world/gpu_worker.py b/01_getting_started/01_hello_world/gpu_worker.py index 6fbd837..be7cb06 100644 --- a/01_getting_started/01_hello_world/gpu_worker.py +++ b/01_getting_started/01_hello_world/gpu_worker.py @@ -13,7 +13,7 @@ @remote(resource_config=gpu_config) -async def gpu_hello(input_data: dict) -> dict: +async def gpu_hello(payload: dict) -> dict: """Simple GPU worker that returns GPU hardware info.""" import platform from datetime import datetime @@ -25,7 +25,7 @@ async def gpu_hello(input_data: dict) -> dict: gpu_count = torch.cuda.device_count() gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3) - message = input_data.get("message", "Hello from GPU worker!") + message = payload.get("message", "Hello from GPU worker!") return { "status": "success", diff --git a/01_getting_started/02_cpu_worker/.gitignore b/01_getting_started/02_cpu_worker/.gitignore index ed4682d..60819c5 100644 --- a/01_getting_started/02_cpu_worker/.gitignore +++ b/01_getting_started/02_cpu_worker/.gitignore @@ -46,3 +46,4 @@ uv.lock # OS .DS_Store Thumbs.db +.flash/ diff --git 
a/01_getting_started/02_cpu_worker/README.md b/01_getting_started/02_cpu_worker/README.md index 28741f4..967776d 100644 --- a/01_getting_started/02_cpu_worker/README.md +++ b/01_getting_started/02_cpu_worker/README.md @@ -33,7 +33,7 @@ Server starts at **http://localhost:8888** Visit **http://localhost:8888/docs** for interactive API documentation. QB endpoints are auto-generated by `flash run` based on your `@remote` functions. ```bash -curl -X POST http://localhost:8888/cpu_worker/run_sync \ +curl -X POST http://localhost:8888/cpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"name": "Flash User"}' ``` diff --git a/01_getting_started/02_cpu_worker/cpu_worker.py b/01_getting_started/02_cpu_worker/cpu_worker.py index 429d1dc..f17c321 100644 --- a/01_getting_started/02_cpu_worker/cpu_worker.py +++ b/01_getting_started/02_cpu_worker/cpu_worker.py @@ -13,12 +13,12 @@ @remote(resource_config=cpu_config) -async def cpu_hello(input_data: dict) -> dict: +async def cpu_hello(payload: dict) -> dict: """Simple CPU worker that returns a greeting.""" import platform from datetime import datetime - message = f"Hello, {input_data.get('name', 'Anonymous Panda')}!" + message = f"Hello, {payload.get('name', 'Anonymous Panda')}!" return { "status": "success", diff --git a/01_getting_started/03_mixed_workers/README.md b/01_getting_started/03_mixed_workers/README.md index dba3533..1e99a94 100644 --- a/01_getting_started/03_mixed_workers/README.md +++ b/01_getting_started/03_mixed_workers/README.md @@ -75,11 +75,11 @@ curl -X POST http://localhost:8888/classify \ -d '{"text": "This product is amazing! 
I love it!"}' # Individual stages -curl -X POST http://localhost:8888/cpu_worker/run_sync \ +curl -X POST http://localhost:8888/cpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"text": "Test message"}' -curl -X POST http://localhost:8888/gpu_worker/run_sync \ +curl -X POST http://localhost:8888/gpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"cleaned_text": "Test message", "word_count": 2}' ``` diff --git a/01_getting_started/03_mixed_workers/cpu_worker.py b/01_getting_started/03_mixed_workers/cpu_worker.py index 4a5a621..ef4a238 100644 --- a/01_getting_started/03_mixed_workers/cpu_worker.py +++ b/01_getting_started/03_mixed_workers/cpu_worker.py @@ -12,12 +12,12 @@ @remote(resource_config=cpu_config) -async def preprocess_text(input_data: dict) -> dict: +async def preprocess_text(payload: dict) -> dict: """Preprocess text: cleaning and tokenization (cheap CPU work).""" import re from datetime import datetime - text = input_data.get("text", "") + text = payload.get("text", "") cleaned_text = text.strip() cleaned_text = re.sub(r"\s+", " ", cleaned_text) @@ -39,13 +39,13 @@ async def preprocess_text(input_data: dict) -> dict: @remote(resource_config=cpu_config) -async def postprocess_results(input_data: dict) -> dict: +async def postprocess_results(payload: dict) -> dict: """Postprocess GPU results: formatting and aggregation (cheap CPU work).""" from datetime import datetime - predictions = input_data.get("predictions", []) - original_text = input_data.get("original_text", "") - metadata = input_data.get("metadata", {}) + predictions = payload.get("predictions", []) + original_text = payload.get("original_text", "") + metadata = payload.get("metadata", {}) if predictions: top_prediction = max(predictions, key=lambda x: x["confidence"]) diff --git a/01_getting_started/03_mixed_workers/gpu_worker.py b/01_getting_started/03_mixed_workers/gpu_worker.py index 064adfb..00414cc 100644 --- a/01_getting_started/03_mixed_workers/gpu_worker.py 
+++ b/01_getting_started/03_mixed_workers/gpu_worker.py @@ -14,15 +14,15 @@ @remote(resource_config=gpu_config, dependencies=["torch"]) -async def gpu_inference(input_data: dict) -> dict: +async def gpu_inference(payload: dict) -> dict: """GPU inference: mock sentiment classification.""" import random from datetime import datetime import torch - cleaned_text = input_data.get("cleaned_text", "") - word_count = input_data.get("word_count", 0) + cleaned_text = payload.get("cleaned_text", "") + word_count = payload.get("word_count", 0) gpu_available = torch.cuda.is_available() if gpu_available: diff --git a/01_getting_started/03_mixed_workers/pipeline.py b/01_getting_started/03_mixed_workers/pipeline.py index 62b6869..155df7d 100644 --- a/01_getting_started/03_mixed_workers/pipeline.py +++ b/01_getting_started/03_mixed_workers/pipeline.py @@ -34,3 +34,12 @@ async def classify(text: str) -> dict: }, } ) + + +if __name__ == "__main__": + import asyncio + + test_text = "This is a test message for the classification pipeline." + print(f"Testing classify with text: {test_text}") + result = asyncio.run(classify(test_text)) + print(f"Result: {result}") diff --git a/01_getting_started/04_dependencies/.gitignore b/01_getting_started/04_dependencies/.gitignore index ed4682d..60819c5 100644 --- a/01_getting_started/04_dependencies/.gitignore +++ b/01_getting_started/04_dependencies/.gitignore @@ -46,3 +46,4 @@ uv.lock # OS .DS_Store Thumbs.db +.flash/ diff --git a/01_getting_started/04_dependencies/cpu_worker.py b/01_getting_started/04_dependencies/cpu_worker.py index 9abac4e..9d9fa6e 100644 --- a/01_getting_started/04_dependencies/cpu_worker.py +++ b/01_getting_started/04_dependencies/cpu_worker.py @@ -29,7 +29,7 @@ "matplotlib", ], ) -async def process_data(input_data: dict) -> dict: +async def process_data(payload: dict) -> dict: """ Worker with data science dependencies. 
@@ -46,7 +46,7 @@ async def process_data(input_data: dict) -> dict: import pandas as pd import scipy - data = input_data.get("data", [[1, 2], [3, 4], [5, 6]]) + data = payload.get("data", [[1, 2], [3, 4], [5, 6]]) # Create DataFrame and compute statistics df = pd.DataFrame(data, columns=["A", "B"]) @@ -80,7 +80,7 @@ async def process_data(input_data: dict) -> dict: @remote(resource_config=minimal_config) # No dependencies! -async def minimal_process(input_data: dict) -> dict: +async def minimal_process(payload: dict) -> dict: """ Worker with NO external dependencies. @@ -93,7 +93,7 @@ async def minimal_process(input_data: dict) -> dict: import re from datetime import datetime - text = input_data.get("text", "") + text = payload.get("text", "") # Built-in operations only word_count = len(text.split()) diff --git a/01_getting_started/04_dependencies/gpu_worker.py b/01_getting_started/04_dependencies/gpu_worker.py index 64970bf..73227da 100644 --- a/01_getting_started/04_dependencies/gpu_worker.py +++ b/01_getting_started/04_dependencies/gpu_worker.py @@ -29,7 +29,7 @@ "numpy<2.0.0", # Maximum version constraint ], ) -async def process_with_ml_libs(input_data: dict) -> dict: +async def process_with_ml_libs(payload: dict) -> dict: """ Worker with versioned Python dependencies. @@ -74,7 +74,7 @@ async def process_with_ml_libs(input_data: dict) -> dict: dependencies=["opencv-python", "requests"], system_dependencies=["ffmpeg", "libgl1"], # System packages via apt ) -async def process_with_system_deps(input_data: dict) -> dict: +async def process_with_system_deps(payload: dict) -> dict: """ Worker with system-level dependencies. diff --git a/02_ml_inference/01_text_to_speech/README.md b/02_ml_inference/01_text_to_speech/README.md index c669660..2157f60 100644 --- a/02_ml_inference/01_text_to_speech/README.md +++ b/02_ml_inference/01_text_to_speech/README.md @@ -43,14 +43,14 @@ Visit http://localhost:8888/docs for interactive API documentation. 
QB endpoints **Generate speech (JSON with base64 audio):** ```bash -curl -X POST http://localhost:8888/gpu_worker/run_sync \ +curl -X POST http://localhost:8888/gpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"text": "Hello world!", "speaker": "Ryan", "language": "English"}' ``` **List available voices:** ```bash -curl -X POST http://localhost:8888/gpu_worker/run_sync \ +curl -X POST http://localhost:8888/gpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{}' ``` diff --git a/02_ml_inference/01_text_to_speech/gpu_worker.py b/02_ml_inference/01_text_to_speech/gpu_worker.py index 87eaff9..fe8f297 100644 --- a/02_ml_inference/01_text_to_speech/gpu_worker.py +++ b/02_ml_inference/01_text_to_speech/gpu_worker.py @@ -23,7 +23,7 @@ "soundfile", ], ) -async def generate_speech(input_data: dict) -> dict: +async def generate_speech(payload: dict) -> dict: """ Generate speech using Qwen3-TTS-12Hz-1.7B-CustomVoice model. @@ -72,10 +72,10 @@ async def generate_speech(input_data: dict) -> dict: "Auto", ] - text = input_data.get("text", "Hello, this is a test.") - speaker = input_data.get("speaker", "Ryan") - language = input_data.get("language", "Auto") - instruct = input_data.get("instruct", "") + text = payload.get("text", "Hello, this is a test.") + speaker = payload.get("speaker", "Ryan") + language = payload.get("language", "Auto") + instruct = payload.get("instruct", "") if speaker not in valid_speakers: return { @@ -133,7 +133,7 @@ async def generate_speech(input_data: dict) -> dict: @remote(resource_config=gpu_config, dependencies=["qwen-tts"]) -async def get_voices(input_data: dict) -> dict: +async def get_voices(payload: dict) -> dict: """Get available voices and languages.""" speakers = { "Vivian": "Bright, slightly edgy young female voice (Chinese native)", diff --git a/04_scaling_performance/01_autoscaling/README.md b/04_scaling_performance/01_autoscaling/README.md new file mode 100644 index 0000000..2baa7cf --- /dev/null +++ 
b/04_scaling_performance/01_autoscaling/README.md @@ -0,0 +1,181 @@ +# Autoscaling: Worker Scaling Strategies + +Configure Flash worker autoscaling for different workload patterns. This example demonstrates five scaling configurations across GPU and CPU workers, from cost-optimized scale-to-zero to latency-optimized always-on. + +## Quick Start + +**Prerequisites**: Complete the [repository setup](../../README.md#quick-start) first (clone, `make dev`, set API key). + +```bash +cd 04_scaling_performance/01_autoscaling +flash run +``` + +Server starts at http://localhost:8888 -- visit http://localhost:8888/docs for interactive API docs. + +### Test Individual Strategies + +```bash +# Scale-to-zero GPU worker +curl -X POST http://localhost:8888/gpu_worker/runsync \ + -H "Content-Type: application/json" \ + -d '{"matrix_size": 512}' + +# Always-on GPU worker (same payload, different endpoint) +curl -X POST http://localhost:8888/gpu_worker/runsync \ + -H "Content-Type: application/json" \ + -d '{"matrix_size": 512}' + +# CPU scale-to-zero +curl -X POST http://localhost:8888/cpu_worker/runsync \ + -H "Content-Type: application/json" \ + -d '{"text": "Hello autoscaling"}' +``` + +## Scaling Strategies + +### GPU Workers (`gpu_worker.py`) + +| Strategy | workersMin | workersMax | idleTimeout | scalerType | scalerValue | Use Case | +|----------|-----------|-----------|-------------|------------|-------------|----------| +| Scale to Zero | 0 | 3 | 5 min | QUEUE_DELAY | 4 | Sporadic/batch, cost-first | +| Always On | 1 | 3 | 60 min | QUEUE_DELAY | 4 | Steady traffic, latency-first | +| High Throughput | 2 | 10 | 30 min | REQUEST_COUNT | 3 | Bursty traffic, throughput-first | + +### CPU Workers (`cpu_worker.py`) + +| Strategy | workersMin | workersMax | idleTimeout | Use Case | +|----------|-----------|-----------|-------------|----------| +| Scale to Zero | 0 | 5 | 5 min | Cost-optimized preprocessing | +| Burst Ready | 1 | 10 | 30 min | Always-warm API gateway | + +## How 
Autoscaling Works + +``` +Requests arrive + | + v ++-------------------+ +| Request Queue | <-- scalerType monitors this ++-------------------+ + | + v ++-------------------+ scale up +| Scaler Logic | ----------------> Start new workers +| (QUEUE_DELAY or | (up to workersMax) +| REQUEST_COUNT) | ++-------------------+ + | + v ++-------------------+ idle > idleTimeout +| Active Workers | ----------------> Terminate worker +| (workersMin..Max) | (down to workersMin) ++-------------------+ +``` + +**Scaler types:** + +- **QUEUE_DELAY** -- Scales based on how long requests wait in the queue. `scalerValue` is the target queue delay in seconds. Good for latency-sensitive workloads. +- **REQUEST_COUNT** -- Scales based on pending request count per worker. `scalerValue` is the target requests per worker. Good for throughput-sensitive workloads. + +## Configuration Reference + +| Parameter | Type | Default | Description | +|-----------|------|---------|-------------| +| `workersMin` | int | 0 | Minimum workers kept warm (0 = scale to zero) | +| `workersMax` | int | 3 | Maximum concurrent workers | +| `idleTimeout` | int | 5 | Minutes before idle workers terminate | +| `scalerType` | ServerlessScalerType | QUEUE_DELAY | Scaling trigger metric | +| `scalerValue` | int | 4 | Target value for the scaler metric | +| `gpus` | list[GpuGroup] | -- | GPU types for LiveServerless | +| `instanceIds` | list[CpuInstanceType] | -- | CPU instance types for CpuLiveServerless | + +## Cost Analysis + +### Scale-to-Zero vs Always-On (GPU) + +Assumptions: GPU cost ~$0.0015/sec, 8 hours of actual compute per day. 
+ +**Scale to Zero (workersMin=0):** +``` +Compute: 8h x 3600s x $0.0015 = $43.20/day +Cold starts: ~5-30s penalty per scale-up event +Monthly: ~$1,296 +``` + +**Always On (workersMin=1):** +``` +Baseline: 24h x 3600s x $0.0015 = $129.60/day (1 worker always running) +Extra compute: handled by autoscaling +Monthly: ~$3,888+ (baseline alone) +``` + +**When to choose each:** + +- **Scale to Zero**: Traffic is sporadic (< 4 hours/day of active use), batch processing, dev/staging environments, cost is primary concern. +- **Always On**: Traffic is steady (> 8 hours/day), SLA requires < 1s p99 latency, cold start penalty is unacceptable. +- **High Throughput**: Traffic is bursty with unpredictable spikes, throughput matters more than per-request latency, willing to pay for warm capacity. + +### CPU Cost Comparison + +CPU workers cost ~7.5x less than GPU. Use CPU for preprocessing, validation, and orchestration to reduce overall spend. + +``` +CPU worker: ~$0.0002/sec +GPU worker: ~$0.0015/sec +Ratio: ~7.5x cheaper +``` + +## Load Testing + +Use `load_test.py` to observe scaling behavior: + +```bash +# Default: 20 requests, concurrency 10, 10s pause +python load_test.py + +# Target a specific endpoint +python load_test.py --endpoint /gpu_worker/runsync --requests 50 + +# Longer pause to observe scale-down +python load_test.py --pause 60 --concurrency 20 + +# Test CPU workers +python load_test.py --endpoint /cpu_worker/runsync --requests 100 --concurrency 50 +``` + +**Requires:** `pip install aiohttp` + +### Interpreting Results + +- **First burst p95 vs second burst p95**: If pause > idleTimeout, second burst includes cold start latency. A large gap indicates scale-down occurred. +- **Error rate > 0%**: Workers may be overwhelmed. Increase `workersMax` or reduce `scalerValue`. +- **High p99 with low p50**: Queue delay is building up. Consider switching to `REQUEST_COUNT` scaler. 
+ +## Cold Start Mitigation + +Cold starts occur when `workersMin=0` and all workers have been terminated. + +**Strategies:** + +1. **Set workersMin=1** -- Keeps one worker warm. Simplest fix, costs ~$130/day for GPU. +2. **Increase idleTimeout** -- Workers stay alive longer between requests. Good for intermittent traffic. +3. **Warm-up requests** -- Send periodic health checks to prevent scale-down. Application-level solution. +4. **Optimize container startup** -- Reduce model load time with smaller models, model caching, or quantization. + +## Production Checklist + +- [ ] Choose scaling strategy based on traffic pattern (sporadic, steady, bursty) +- [ ] Set `workersMax` based on budget and peak load +- [ ] Set `idleTimeout` based on traffic gaps (longer = fewer cold starts, higher cost) +- [ ] Choose `scalerType` based on priority (latency = QUEUE_DELAY, throughput = REQUEST_COUNT) +- [ ] Tune `scalerValue` after load testing (lower = more aggressive scaling) +- [ ] Run load test to verify scaling behavior +- [ ] Monitor cold start frequency in production +- [ ] Set up cost alerts for unexpected scaling + +## Next Steps + +- [02_gpu_optimization](../02_gpu_optimization/) -- GPU memory management and optimization +- [03_concurrency](../03_concurrency/) -- Async patterns and concurrency control +- [04_monitoring](../04_monitoring/) -- Logging, metrics, and observability diff --git a/04_scaling_performance/01_autoscaling/cpu_worker.py b/04_scaling_performance/01_autoscaling/cpu_worker.py new file mode 100644 index 0000000..f7e82d2 --- /dev/null +++ b/04_scaling_performance/01_autoscaling/cpu_worker.py @@ -0,0 +1,119 @@ +# CPU autoscaling strategies -- scale-to-zero and burst-ready. +# Run with: flash run +# Test directly: python cpu_worker.py +from runpod_flash import CpuInstanceType, CpuLiveServerless, remote + +# --- Strategy 1: Scale to Zero --- +# Cost-optimized for preprocessing tasks that tolerate cold starts. 
+cpu_scale_to_zero_config = CpuLiveServerless( + name="04_01_cpu_scale_to_zero", + instanceIds=[CpuInstanceType.CPU3C_1_2], + workersMin=0, + workersMax=5, + idleTimeout=5, +) + +# --- Strategy 2: Burst Ready --- +# Always-warm worker for API gateway or latency-sensitive CPU tasks. +cpu_burst_ready_config = CpuLiveServerless( + name="04_01_cpu_burst_ready", + instanceIds=[CpuInstanceType.CPU3G_2_8], + workersMin=1, + workersMax=10, + idleTimeout=30, +) + + +@remote(resource_config=cpu_scale_to_zero_config) +async def cpu_scale_to_zero(payload: dict) -> dict: + """CPU worker with scale-to-zero -- cost-optimized preprocessing.""" + import hashlib + import json + import time + + start_time = time.perf_counter() + + text = payload.get("text", "") + + # Simulate CPU-bound preprocessing: hashing, serialization, string ops + text_hash = hashlib.sha256(text.encode()).hexdigest() + normalized = " ".join(text.lower().split()) + tokens = normalized.split() + serialized = json.dumps({"tokens": tokens, "hash": text_hash}) + byte_size = len(serialized.encode()) + + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + return { + "status": "success", + "strategy": "cpu_scale_to_zero", + "duration_ms": duration_ms, + "result": { + "text_hash": text_hash[:16], + "token_count": len(tokens), + "byte_size": byte_size, + }, + "config": {"workersMin": 0, "workersMax": 5, "idleTimeout": 5}, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +@remote(resource_config=cpu_burst_ready_config) +async def cpu_burst_ready(payload: dict) -> dict: + """CPU worker with burst-ready scaling -- always-warm for low latency.""" + import hashlib + import json + import time + + start_time = time.perf_counter() + + text = payload.get("text", "") + + # Simulate CPU-bound API gateway work: validation, transformation, routing + text_hash = hashlib.sha256(text.encode()).hexdigest() + words = text.split() + word_lengths = [len(w) for w in words] + avg_word_length = sum(word_lengths) / 
len(word_lengths) if word_lengths else 0 + serialized = json.dumps( + {"words": words, "hash": text_hash, "avg_len": avg_word_length} + ) + byte_size = len(serialized.encode()) + + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + return { + "status": "success", + "strategy": "cpu_burst_ready", + "duration_ms": duration_ms, + "result": { + "text_hash": text_hash[:16], + "word_count": len(words), + "avg_word_length": round(avg_word_length, 2), + "byte_size": byte_size, + }, + "config": {"workersMin": 1, "workersMax": 10, "idleTimeout": 30}, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +if __name__ == "__main__": + import asyncio + + async def test_all(): + test_payload = { + "text": "Autoscaling CPU workers for preprocessing and validation" + } + + print("=== CPU Scale to Zero ===") + result = await cpu_scale_to_zero(test_payload) + print( + f"Duration: {result['duration_ms']}ms | Tokens: {result['result']['token_count']}\n" + ) + + print("=== CPU Burst Ready ===") + result = await cpu_burst_ready(test_payload) + print( + f"Duration: {result['duration_ms']}ms | Words: {result['result']['word_count']}\n" + ) + + asyncio.run(test_all()) diff --git a/04_scaling_performance/01_autoscaling/gpu_worker.py b/04_scaling_performance/01_autoscaling/gpu_worker.py new file mode 100644 index 0000000..67f3889 --- /dev/null +++ b/04_scaling_performance/01_autoscaling/gpu_worker.py @@ -0,0 +1,178 @@ +# GPU autoscaling strategies -- scale-to-zero, always-on, high-throughput. +# Run with: flash run +# Test directly: python gpu_worker.py +from runpod_flash import GpuGroup, LiveServerless, ServerlessScalerType, remote + +# --- Strategy 1: Scale to Zero --- +# Sporadic or batch workloads where cost matters more than cold-start latency. +# Workers scale down to zero after 5 minutes of idle time. 
+scale_to_zero_config = LiveServerless( + name="04_01_scale_to_zero", + gpus=[GpuGroup.ANY], + workersMin=0, + workersMax=3, + idleTimeout=5, + scalerType=ServerlessScalerType.QUEUE_DELAY, + scalerValue=4, +) + +# --- Strategy 2: Always On --- +# Steady traffic where low latency matters more than cost. +# At least one worker stays warm to avoid cold starts. +always_on_config = LiveServerless( + name="04_01_always_on", + gpus=[GpuGroup.ANY], + workersMin=1, + workersMax=3, + idleTimeout=60, + scalerType=ServerlessScalerType.QUEUE_DELAY, + scalerValue=4, +) + +# --- Strategy 3: High Throughput --- +# Bursty traffic where throughput matters most. +# Starts with 2 warm workers, scales aggressively to 10 based on request count. +high_throughput_config = LiveServerless( + name="04_01_high_throughput", + gpus=[GpuGroup.ANY], + workersMin=2, + workersMax=10, + idleTimeout=30, + scalerType=ServerlessScalerType.REQUEST_COUNT, + scalerValue=3, +) + + +@remote(resource_config=scale_to_zero_config) +async def scale_to_zero_inference(payload: dict) -> dict: + """GPU inference with scale-to-zero -- cost-optimized for sporadic workloads.""" + import asyncio + import time + + import torch + + start_time = time.perf_counter() + + device = "cuda" if torch.cuda.is_available() else "cpu" + size = payload.get("matrix_size", 512) + a = torch.randn(size, size, device=device) + b = torch.randn(size, size, device=device) + _ = torch.mm(a, b) + + await asyncio.sleep(0.5) + + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + return { + "status": "success", + "strategy": "scale_to_zero", + "duration_ms": duration_ms, + "gpu_info": { + "available": torch.cuda.is_available(), + "device": device, + "name": torch.cuda.get_device_name(0) + if torch.cuda.is_available() + else "N/A", + }, + "config": {"workersMin": 0, "workersMax": 3, "idleTimeout": 5}, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +@remote(resource_config=always_on_config) +async def 
always_on_inference(payload: dict) -> dict: + """GPU inference with always-on worker -- latency-optimized for steady traffic.""" + import asyncio + import time + + import torch + + start_time = time.perf_counter() + + device = "cuda" if torch.cuda.is_available() else "cpu" + size = payload.get("matrix_size", 512) + a = torch.randn(size, size, device=device) + b = torch.randn(size, size, device=device) + _ = torch.mm(a, b) + + await asyncio.sleep(0.5) + + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + return { + "status": "success", + "strategy": "always_on", + "duration_ms": duration_ms, + "gpu_info": { + "available": torch.cuda.is_available(), + "device": device, + "name": torch.cuda.get_device_name(0) + if torch.cuda.is_available() + else "N/A", + }, + "config": {"workersMin": 1, "workersMax": 3, "idleTimeout": 60}, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +@remote(resource_config=high_throughput_config) +async def high_throughput_inference(payload: dict) -> dict: + """GPU inference with high-throughput scaling -- optimized for bursty traffic.""" + import asyncio + import time + + import torch + + start_time = time.perf_counter() + + device = "cuda" if torch.cuda.is_available() else "cpu" + size = payload.get("matrix_size", 512) + a = torch.randn(size, size, device=device) + b = torch.randn(size, size, device=device) + _ = torch.mm(a, b) + + await asyncio.sleep(1.0) + + duration_ms = round((time.perf_counter() - start_time) * 1000, 2) + + return { + "status": "success", + "strategy": "high_throughput", + "duration_ms": duration_ms, + "gpu_info": { + "available": torch.cuda.is_available(), + "device": device, + "name": torch.cuda.get_device_name(0) + if torch.cuda.is_available() + else "N/A", + }, + "config": {"workersMin": 2, "workersMax": 10, "idleTimeout": 30}, + "timestamp": time.strftime("%Y-%m-%dT%H:%M:%S"), + } + + +if __name__ == "__main__": + import asyncio + + async def test_all(): + test_payload = {"matrix_size": 
256} + + print("=== Scale to Zero Strategy ===") + result = await scale_to_zero_inference(test_payload) + print( + f"Duration: {result['duration_ms']}ms | Device: {result['gpu_info']['device']}\n" + ) + + print("=== Always On Strategy ===") + result = await always_on_inference(test_payload) + print( + f"Duration: {result['duration_ms']}ms | Device: {result['gpu_info']['device']}\n" + ) + + print("=== High Throughput Strategy ===") + result = await high_throughput_inference(test_payload) + print( + f"Duration: {result['duration_ms']}ms | Device: {result['gpu_info']['device']}\n" + ) + + asyncio.run(test_all()) diff --git a/04_scaling_performance/01_autoscaling/load_test.py b/04_scaling_performance/01_autoscaling/load_test.py new file mode 100644 index 0000000..db92feb --- /dev/null +++ b/04_scaling_performance/01_autoscaling/load_test.py @@ -0,0 +1,206 @@ +"""Load test script for autoscaling examples. + +Sends concurrent requests in phases to observe scaling behavior: +1. Warm-up -- single request to establish baseline +2. Burst -- concurrent requests to trigger scale-up +3. Pause -- idle period to observe scale-down +4. 
Second burst -- measure cold vs warm start difference + +Usage: + python load_test.py + python load_test.py --url http://localhost:8888 --concurrency 10 --requests 50 + python load_test.py --endpoint /gpu_worker/runsync --pause 30 +""" + +import argparse +import asyncio +import statistics +import time + +import aiohttp + +DEFAULT_URL = "http://localhost:8888" +DEFAULT_ENDPOINT = "/gpu_worker/runsync" +DEFAULT_PAYLOAD = {"matrix_size": 256} + + +async def send_request( + session: aiohttp.ClientSession, + url: str, + payload: dict, +) -> dict: + """Send a single request and return timing data.""" + start = time.perf_counter() + try: + async with session.post(url, json=payload) as resp: + body = await resp.json() + duration_ms = round((time.perf_counter() - start) * 1000, 2) + return { + "status": resp.status, + "duration_ms": duration_ms, + "success": resp.status == 200, + "body": body, + } + except Exception as e: + duration_ms = round((time.perf_counter() - start) * 1000, 2) + return { + "status": 0, + "duration_ms": duration_ms, + "success": False, + "error": str(e), + } + + +async def run_burst( + session: aiohttp.ClientSession, + url: str, + payload: dict, + count: int, + concurrency: int, +) -> list[dict]: + """Send a burst of concurrent requests with a concurrency limit.""" + semaphore = asyncio.Semaphore(concurrency) + results = [] + + async def limited_request() -> dict: + async with semaphore: + return await send_request(session, url, payload) + + tasks = [limited_request() for _ in range(count)] + results = await asyncio.gather(*tasks) + return list(results) + + +def compute_stats(results: list[dict]) -> dict: + """Compute latency percentiles and error rate from results.""" + durations = [r["duration_ms"] for r in results] + successes = sum(1 for r in results if r["success"]) + errors = len(results) - successes + + if not durations: + return {"count": 0, "errors": 0, "error_rate": 0} + + durations_sorted = sorted(durations) + p50_idx = 
int(len(durations_sorted) * 0.50) + p95_idx = int(len(durations_sorted) * 0.95) + p99_idx = int(len(durations_sorted) * 0.99) + + return { + "count": len(results), + "successes": successes, + "errors": errors, + "error_rate": round(errors / len(results) * 100, 2), + "min_ms": round(min(durations), 2), + "max_ms": round(max(durations), 2), + "mean_ms": round(statistics.mean(durations), 2), + "p50_ms": round(durations_sorted[p50_idx], 2), + "p95_ms": round(durations_sorted[min(p95_idx, len(durations_sorted) - 1)], 2), + "p99_ms": round(durations_sorted[min(p99_idx, len(durations_sorted) - 1)], 2), + } + + +def print_stats(label: str, stats: dict) -> None: + """Print formatted statistics for a phase.""" + print(f"\n--- {label} ---") + print( + f" Requests: {stats['count']} ({stats['successes']} ok, {stats['errors']} errors)" + ) + print(f" Error rate: {stats['error_rate']}%") + if stats["count"] > 0 and stats.get("min_ms") is not None: + print( + f" Latency: min={stats['min_ms']}ms mean={stats['mean_ms']}ms max={stats['max_ms']}ms" + ) + print( + f" Percentiles: p50={stats['p50_ms']}ms p95={stats['p95_ms']}ms p99={stats['p99_ms']}ms" + ) + + +async def run_load_test(args: argparse.Namespace) -> None: + """Execute the four-phase load test.""" + url = f"{args.url}{args.endpoint}" + payload = DEFAULT_PAYLOAD + + print(f"Target: {url}") + print(f"Concurrency: {args.concurrency} | Requests per burst: {args.requests}") + print(f"Pause between bursts: {args.pause}s") + + timeout = aiohttp.ClientTimeout(total=args.timeout) + async with aiohttp.ClientSession(timeout=timeout) as session: + # Phase 1: Warm-up + print("\n[Phase 1] Warm-up: 1 request") + warmup = await run_burst(session, url, payload, count=1, concurrency=1) + print_stats("Warm-up", compute_stats(warmup)) + + # Phase 2: First burst + print(f"\n[Phase 2] Burst: {args.requests} concurrent requests") + burst1_start = time.perf_counter() + burst1 = await run_burst( + session, url, payload, count=args.requests, 
concurrency=args.concurrency + ) + burst1_wall = round((time.perf_counter() - burst1_start) * 1000, 2) + stats1 = compute_stats(burst1) + print_stats(f"First Burst (wall time: {burst1_wall}ms)", stats1) + + # Phase 3: Pause + print(f"\n[Phase 3] Pause: waiting {args.pause}s for idle timeout / scale-down") + await asyncio.sleep(args.pause) + + # Phase 4: Second burst (cold vs warm comparison) + print(f"\n[Phase 4] Second burst: {args.requests} concurrent requests") + burst2_start = time.perf_counter() + burst2 = await run_burst( + session, url, payload, count=args.requests, concurrency=args.concurrency + ) + burst2_wall = round((time.perf_counter() - burst2_start) * 1000, 2) + stats2 = compute_stats(burst2) + print_stats(f"Second Burst (wall time: {burst2_wall}ms)", stats2) + + # Summary + print("\n=== Summary ===") + if stats1.get("mean_ms") and stats2.get("mean_ms"): + diff = round(stats2["mean_ms"] - stats1["mean_ms"], 2) + direction = "slower" if diff > 0 else "faster" + print(f" Second burst was {abs(diff)}ms {direction} on average") + print( + f" First burst p95: {stats1['p95_ms']}ms | Second burst p95: {stats2['p95_ms']}ms" + ) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Load test for Flash autoscaling examples" + ) + parser.add_argument( + "--url", default=DEFAULT_URL, help=f"Base URL (default: {DEFAULT_URL})" + ) + parser.add_argument( + "--endpoint", + default=DEFAULT_ENDPOINT, + help=f"Endpoint path (default: {DEFAULT_ENDPOINT})", + ) + parser.add_argument( + "--concurrency", + type=int, + default=10, + help="Max concurrent requests (default: 10)", + ) + parser.add_argument( + "--requests", type=int, default=20, help="Requests per burst (default: 20)" + ) + parser.add_argument( + "--pause", + type=int, + default=10, + help="Pause between bursts in seconds (default: 10)", + ) + parser.add_argument( + "--timeout", + type=int, + default=60, + help="Request timeout in seconds (default: 60)", + ) + return 
parser.parse_args() + + +if __name__ == "__main__": + asyncio.run(run_load_test(parse_args())) diff --git a/04_scaling_performance/README.md b/04_scaling_performance/README.md index c458f44..761bfa4 100644 --- a/04_scaling_performance/README.md +++ b/04_scaling_performance/README.md @@ -4,7 +4,7 @@ Optimize Flash applications for production workloads. Learn autoscaling, GPU opt ## Examples -### 01_autoscaling _(coming soon)_ +### [01_autoscaling](01_autoscaling/) Worker autoscaling configuration and strategies. **What you'll learn:** diff --git a/05_data_workflows/01_network_volumes/README.md b/05_data_workflows/01_network_volumes/README.md index d0b28bf..6c82f18 100644 --- a/05_data_workflows/01_network_volumes/README.md +++ b/05_data_workflows/01_network_volumes/README.md @@ -36,7 +36,7 @@ Server starts at `http://localhost:8888` **Generate an image (GPU worker):** ```bash -curl -X POST http://localhost:8888/gpu_worker/run_sync \ +curl -X POST http://localhost:8888/gpu_worker/runsync \ -H "Content-Type: application/json" \ -d '{"prompt": "a sunset over mountains"}' ``` @@ -76,7 +76,7 @@ Visit `http://localhost:8888/docs` for interactive API documentation. ## API Endpoints -### POST /gpu_worker/run_sync +### POST /gpu_worker/runsync GPU worker (QB, class-based `@remote`). Generates an image and saves it to the shared volume. diff --git a/CLAUDE.md b/CLAUDE.md index a767730..3804c9c 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -1,21 +1,45 @@ -# Flash Examples: AI Coding Assistant Guidelines +# Flash Examples -Instructions for AI coding assistants working on the flash-examples repository. +> Auto-generated by /analyze-repos on 2026-02-22. Manual edits will be overwritten on next analysis. ## Project Overview -Production-ready examples demonstrating Flash framework capabilities. Each example is a flat directory of standalone worker files auto-discovered by `flash run`. +Production-ready examples demonstrating Flash framework capabilities. 
Flat-file pattern: each worker is a standalone `.py` file with `@remote` decorator, auto-discovered by `flash run`. 6 categories, 18 worker files, 31 `@remote` endpoints. Root `pyproject.toml` declares only `runpod-flash` dependency; runtime deps declared inline via `@remote(dependencies=[...])`. -## Architecture: Flat-File Pattern +## Architecture -No FastAPI boilerplate, no routers, no `main.py`, no `mothership.py`. Each worker is a self-contained file with a `@remote` decorator. +### Key Abstractions + +1. **@remote decorator (function)** -- Core pattern. `async def` marked for remote execution. All 18 worker files use this. +2. **@remote decorator (class)** -- Used on `SimpleSD` class (`05_data_workflows`). Class-based pattern for stateful workers. +3. **Resource config types** -- `LiveServerless` (GPU), `CpuLiveServerless` (CPU), `CpuLiveLoadBalancer` (CPU LB), `LiveLoadBalancer` (GPU LB). Module-level config objects. +4. **Cross-worker orchestration** -- Pipeline files import from QB workers, chain with `await`. LB imports from QB workers. +5. **Flat-file discovery** -- No FastAPI boilerplate, no routers, no `main.py`. `flash run` auto-generates routes from decorated functions. +6. **In-function imports** -- Heavy libs (torch, transformers, etc.) imported inside `@remote` body, only `runpod_flash` at module level. + +### Entry Points + +All 18 worker files across 6 categories. Total: 30 `@remote` functions + 1 `@remote` class. Each file is an independent entry point discovered by `flash run`. 
+
+### Module Structure
 
 ```
-example_name/
-├── README.md
-├── gpu_worker.py # @remote decorated functions
-├── cpu_worker.py # Optional CPU worker
-└── pipeline.py # Optional cross-worker orchestration
+01_getting_started/ # Fundamentals
+ 01_hello_world/ # Basic GPU worker
+ 02_cpu_worker/ # CPU-only worker
+ 03_mixed_workers/ # Cross-worker orchestration (CPU -> GPU -> LB)
+ 04_dependencies/ # Runtime dependency declaration
+ 05_multi_resource/ # Multiple resource types in one project
+02_ml_inference/ # ML deployment
+ 01_text_to_speech/ # Qwen3-TTS model serving
+03_advanced_workers/ # Advanced patterns
+ 01_lb_endpoint/ # LB endpoints with custom HTTP methods
+04_scaling_performance/ # Autoscaling
+ 01_autoscaling/ # Scaling strategy examples
+05_data_workflows/ # Data pipelines
+ 01_network_volumes/ # Network volume usage
+ 02_stable_diffusion/ # Stable Diffusion with @remote class
+06_real_world/ # Placeholder for production patterns
 ```
 
 ### Worker File Pattern
@@ -32,27 +56,21 @@ gpu_config = LiveServerless(
 )
 
 @remote(resource_config=gpu_config)
-async def my_function(input_data: dict) -> dict:
-    """All imports inside the function body."""
+async def my_function(payload: dict) -> dict:
+    """All runtime imports inside the function body."""
     import torch
     # implementation
     return {"status": "success"}
 ```
 
-Key rules:
-- Imports from `runpod_flash`, not `flash`
-- Config object at module level (`LiveServerless`, `CpuLiveLoadBalancer`, `CpuLiveServerless`)
-- All runtime imports inside the `@remote` function body
-- Return serializable data (dict, list, str)
-- Worker naming convention: `{category}_{example}_{worker_type}` (e.g., `01_01_gpu_worker`)
-
 ### Resource Types
 
 | Type | Import | Use Case |
 |------|--------|----------|
-| `LiveServerless` | `from runpod_flash import LiveServerless, GpuGroup` | GPU workers |
-| `CpuLiveLoadBalancer` | `from runpod_flash import CpuLiveLoadBalancer` | CPU workers, pipelines |
-| `CpuLiveServerless` | `from runpod_flash 
import CpuLiveServerless` | CPU serverless | +| `LiveServerless` | `from runpod_flash import LiveServerless, GpuGroup` | GPU workers (9 files) | +| `CpuLiveServerless` | `from runpod_flash import CpuLiveServerless, CpuInstanceType` | CPU serverless (4 files) | +| `CpuLiveLoadBalancer` | `from runpod_flash import CpuLiveLoadBalancer` | CPU LB endpoints (4 files) | +| `LiveLoadBalancer` | `from runpod_flash import LiveLoadBalancer` | GPU LB endpoints (1 file) | ### Cross-Worker Orchestration @@ -71,44 +89,137 @@ async def classify(text: str) -> dict: return await gpu_inference(result) ``` -## Creating New Examples +## Public API Surface -Always use `flash init`: +All examples import from `runpod_flash`. Import frequency by symbol: -```bash -cd 01_getting_started -flash init my_new_example +| Symbol | Files Using It | Breakage Risk | +|--------|---------------|---------------| +| `remote` | 18 | ALL examples break | +| `LiveServerless` | 9 | GPU examples break | +| `GpuGroup` | 7 | GPU config breaks | +| `CpuLiveServerless` | 4 | CPU examples break | +| `CpuInstanceType` | 4 | CPU config breaks | +| `CpuLiveLoadBalancer` | 4 | Pipeline examples break | +| `NetworkVolume` | 2 | Volume examples break | +| `LiveLoadBalancer` | 1 | LB example breaks | +| `ServerlessScalerType` | 1 | Scaling example breaks | + +## Cross-Repo Dependencies + +### Depends On + +- **flash** (`runpod_flash` package) -- all 18 files import from it. Any breaking change to `@remote` signature, resource config constructors, or enum values breaks examples at import time. + +### Depended On By + +- None. This is a leaf repo (documentation/examples only). + +### Interface Contracts + +- `@remote(resource_config=...)` decorator signature -- any parameter rename or removal breaks all 18 files +- Resource config constructors (`LiveServerless`, `CpuLiveServerless`, etc.) 
-- field name changes break config objects +- `GpuGroup`, `CpuInstanceType` enum values -- value removals break GPU/CPU configs +- `NetworkVolume` constructor -- field changes break volume examples + +### Dependency Chain + +``` +flash-examples --> flash (runpod_flash) --> runpod-python (runpod) ``` -Never copy-paste existing examples. +### Known Drift + +- No automated tests -- changes caught only at import time or `flash run` +- No CI that validates examples against current flash version +- Python version: inherits from flash (3.10+) + +## Development Commands + +### Setup + +```bash +uv venv && source .venv/bin/activate +uv sync --all-groups +``` -## Development Workflow +### Testing ```bash -flash run # Start local dev server (localhost:8888) +flash run # Start local dev server (localhost:8888) # Visit http://localhost:8888/docs for interactive API docs -python gpu_worker.py # Test a single worker directly +python gpu_worker.py # Test a single worker directly (if __name__ == "__main__" block) ``` -## Quality Gates +### Quality ```bash -make quality-check # Required before every commit -make lint # Ruff linter -make format-check # Ruff format check +make quality-check # REQUIRED BEFORE ALL COMMITS +make lint # Ruff linter +make format # Ruff formatter +make format-check # Check formatting ``` -## Code Standards +### Build and Deploy -- Type hints on all function signatures -- Early returns / guard clauses over nested conditions -- No hardcoded credentials (use `os.getenv()`) -- No `print()` in production code (logging module or skip for examples) -- Catch specific exceptions, never bare `except:` +```bash +flash build # Package build artifacts +flash deploy # Build + upload + provision endpoints +flash deploy --preview # Local Docker Compose preview +flash build --use-local-flash # Use local flash library instead of PyPI +``` + +## Code Health + +### High Severity + +- **No test infrastructure at all.** No `conftest.py`, no `tests/` directory, no pytest config. 
Only `if __name__ == "__main__"` blocks for manual testing. Any flash API change is caught only at import time.
+
+### Medium Severity
+
+- Broad `except Exception` catches in 4 files -- swallows specific errors, makes debugging harder
+- Duplicated GPU inference logic in `04_scaling_performance` -- 3 near-identical functions that should be extracted
+- No CI validation that examples work against the current flash version
+
+### Low Severity
+
+- Duplicated `speakers`/`languages` lists in `02_ml_inference/01_text_to_speech`
+- Missing input validation in some workers (accepts arbitrary dict without schema)
+
+## Testing
+
+### Structure
+
+No formal test infrastructure exists. Each worker has an optional `if __name__ == "__main__"` block for manual execution.
+
+### Coverage Gaps
+
+- **100% uncovered** -- no test framework, no conftest, no pytest config
+- No smoke tests that verify examples import successfully
+- No integration tests that run `flash run` against examples
+
+### Patterns
+
+To test manually:
+```bash
+cd 01_getting_started/01_hello_world
+flash run # Starts dev server, auto-discovers workers
+# Use http://localhost:8888/docs to invoke endpoints
+```
+
+### Recommended Test Strategy
+
+1. Add `tests/test_imports.py` that imports every worker file (catches `@remote` signature drift)
+2. Add `tests/test_configs.py` that validates all resource configs construct without error
+3. Add CI job that runs `flash run --check` (dry-run mode) against each example category
 
 ## Common Mistakes
 
-1. **Accessing external scope in @remote functions** -- only local variables, parameters, and internal imports work
-2. **Module-level imports of heavy libraries** -- import torch, numpy, etc. inside the function body
-3. **Missing `if __name__ == "__main__"` test block** -- each worker should be independently testable
-4. **Mutable default arguments** -- use `None` and initialize in function body
+1. 
**Accessing external scope in @remote functions** -- only local variables, parameters, and internal imports work. The function body is serialized and sent to a remote worker. +2. **Module-level imports of heavy libraries** -- import torch, numpy, transformers, etc. inside the function body, not at module level. +3. **Missing `if __name__ == "__main__"` test block** -- each worker should be independently testable. +4. **Mutable default arguments** -- use `None` and initialize in function body. +5. **Importing from `flash` instead of `runpod_flash`** -- the package name is `runpod_flash`. + +--- +*Last analyzed: 2026-02-22* diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index b75869c..6083a0a 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -187,12 +187,12 @@ from runpod_flash import remote, LiveServerless config = LiveServerless(name="your_worker") @remote(resource_config=config, dependencies=["torch"]) -async def your_function(input_data: dict) -> dict: +async def your_function(payload: dict) -> dict: """ Clear docstring explaining what this function does. 
Args: - input_data: Description of expected input + payload: Description of expected input Returns: Description of output format @@ -200,7 +200,7 @@ async def your_function(input_data: dict) -> dict: import torch # Your implementation - result = process(input_data) + result = process(payload) return {"status": "success", "result": result} @@ -347,8 +347,8 @@ from your_example.gpu_worker import your_function @pytest.mark.asyncio async def test_your_function(): - input_data = {"key": "value"} - result = await your_function(input_data) + payload = {"key": "value"} + result = await your_function(payload) assert result["status"] == "success" assert "result" in result diff --git a/docs/cli/commands.md b/docs/cli/commands.md index 36a1687..8e87e9f 100644 --- a/docs/cli/commands.md +++ b/docs/cli/commands.md @@ -1217,7 +1217,7 @@ curl -X POST https://abcd1234-my-api-gpu.runpod.io/run \ -H "Content-Type: application/json" \ -d '{ "input": { - "input_data": {"message": "test"} + "payload": {"message": "test"} } }' ``` diff --git a/docs/cli/getting-started.md b/docs/cli/getting-started.md index 53e6190..5863b68 100644 --- a/docs/cli/getting-started.md +++ b/docs/cli/getting-started.md @@ -74,13 +74,13 @@ gpu_config = LiveServerless( ) @remote(resource_config=gpu_config) -async def process_request(input_data: dict) -> dict: +async def process_request(payload: dict) -> dict: """Example GPU worker that processes requests.""" # Your GPU processing logic here return { "status": "success", "message": "Hello from Flash GPU worker!", - "input_received": input_data + "input_received": payload } ``` @@ -127,7 +127,7 @@ You'll see FastAPI's interactive Swagger UI with your endpoint. 3. Enter test JSON: ```json { - "input_data": {"message": "test"} + "payload": {"message": "test"} } ``` 4. Click "Execute" @@ -149,7 +149,7 @@ You'll see FastAPI's interactive Swagger UI with your endpoint. 
```bash curl -X POST http://localhost:8888/process \ -H "Content-Type: application/json" \ - -d '{"input_data": {"message": "test"}}' + -d '{"payload": {"message": "test"}}' ``` --- @@ -228,7 +228,7 @@ Test your endpoint: curl -X POST https://abcd1234-hello-flash-gpu.runpod.io/run \ -H "Authorization: Bearer $RUNPOD_API_KEY" \ -H "Content-Type: application/json" \ - -d '{"input": {"input_data": {"message": "production test"}}}' + -d '{"input": {"payload": {"message": "production test"}}}' ``` **What happened:** @@ -249,7 +249,7 @@ Use the curl command from the deployment output: curl -X POST https://abcd1234-hello-flash-gpu.runpod.io/run \ -H "Authorization: Bearer $RUNPOD_API_KEY" \ -H "Content-Type: application/json" \ - -d '{"input": {"input_data": {"message": "production test"}}}' + -d '{"input": {"payload": {"message": "production test"}}}' ``` **Expected response:** diff --git a/docs/cli/workflows.md b/docs/cli/workflows.md index 9c5eba4..bc087ef 100644 --- a/docs/cli/workflows.md +++ b/docs/cli/workflows.md @@ -100,10 +100,10 @@ Edit your worker files (e.g., `gpu_worker.py`): ```python @remote(resource_config=gpu_config) -async def process_request(input_data: dict) -> dict: +async def process_request(payload: dict) -> dict: """Updated function with new logic.""" # Add new feature here - result = perform_processing(input_data) + result = perform_processing(payload) return {"status": "success", "result": result} ``` @@ -120,7 +120,7 @@ async def process_request(input_data: dict) -> dict: 4. Enter test data: ```json { - "input_data": {"test": "value"} + "payload": {"test": "value"} } ``` 5. 
Click "Execute" @@ -131,7 +131,7 @@ async def process_request(input_data: dict) -> dict: ```bash curl -X POST http://localhost:8888/process \ -H "Content-Type: application/json" \ - -d '{"input_data": {"test": "value"}}' + -d '{"payload": {"test": "value"}}' ``` **Validation:** Response matches expected output @@ -348,7 +348,7 @@ curl -X POST https://abcd1234-my-api-gpu.runpod.io/run \ -H "Content-Type: application/json" \ -d '{ "input": { - "input_data": {"test": "production"} + "payload": {"test": "production"} } }' ``` @@ -787,7 +787,7 @@ Press Ctrl+C to stop preview ```bash curl -X POST http://localhost:8000/run \ -H "Content-Type: application/json" \ - -d '{"input": {"input_data": {"test": "preview"}}}' + -d '{"input": {"payload": {"test": "preview"}}}' ``` **3. Validate Response:** @@ -1589,11 +1589,11 @@ gpu_config = LiveServerless( _model = None @remote(resource_config=gpu_config) -async def infer(input_data: dict) -> dict: +async def infer(payload: dict) -> dict: global _model if _model is None: _model = load_model() # Only load once - return _model.predict(input_data) + return _model.predict(payload) ``` **C. Increase worker capacity:**