From b07ef257ad3cb9415dfbce53adaf12dc25bfdc1c Mon Sep 17 00:00:00 2001 From: GiZano Date: Tue, 10 Feb 2026 23:08:28 +0100 Subject: [PATCH 1/2] feat(core): harden API security and upgrade stress test suite v2 Implemented final security fixes and robustness improvements for the ingestion layer and testing infrastructure. **Backend ():** - **Security:** Added strict Replay Attack protection (60s timestamp window) to reject stale payloads. - **Crypto:** Enforced SHA-256 hashing for ECDSA verification with robust fallback (DER/RAW support) to match ESP32 standards. - **Resilience:** Implemented startup routine to prevent Docker race conditions. - **Endpoints:** Restored and finalized , , and endpoints. **Testing ():** - **Concurrency:** Implemented to manage client-side load without socket exhaustion. - **Security Testing:** Added automated adversarial scenarios for Replay Attacks and Invalid Signatures. - **Verification:** Added End-to-End persistence check via polling the statistics endpoint with backoff logic. - **Compatibility:** Aligned signature generation (DER + SHA256) to strictly mirror firmware behavior. --- backend-data-elaborator/api/src/main.py | 229 +++--------- .../api/tests/stress_test.py | 339 +++++++++--------- 2 files changed, 227 insertions(+), 341 deletions(-) diff --git a/backend-data-elaborator/api/src/main.py b/backend-data-elaborator/api/src/main.py index 5911974..274f987 100644 --- a/backend-data-elaborator/api/src/main.py +++ b/backend-data-elaborator/api/src/main.py @@ -1,16 +1,18 @@ """ -QuakeGuard Backend Service API -------------------------------- -Orchestrates IoT data ingestion, system health monitoring, and data retrieval. -Implements robust error handling for cryptographic verification (SHA256, DER/RAW). +QuakeGuard Backend Service API (v2.0 - Security Hardened) +--------------------------------------------------------- +Features: +- ECDSA SHA-256 Verification (DER/RAW) +- Replay Attack Protection (Timestamp Window) +- High Concurrency DB Pooling """ import json import asyncio import time -import hashlib +import hashlib from datetime import datetime -from typing import List, Dict, Any, Optional +from typing import List, Dict, Any from fastapi import FastAPI, Depends, HTTPException, status from sqlalchemy.orm import Session @@ -25,19 +27,15 @@ from geoalchemy2.elements import WKTElement -# Local imports from src.database import get_db, engine import src.models as models import src.schemas as schemas -# ========================================== -# DATABASE INITIALIZATION & WAITER -# ========================================== +# --- CONFIGURATION --- +MAX_TIMESTAMP_SKEW = 60 # Seconds. Rejects messages older than 1 min. +# 1. Wait for DB def wait_for_db(retries=10, delay=3): - """ - Blocks startup until the Database is ready to accept connections. - """ print("Checking Database connection...") for i in range(retries): try: @@ -48,203 +46,118 @@ def wait_for_db(retries=10, delay=3): except OperationalError: print(f"⏳ Database not ready yet... waiting {delay}s ({i+1}/{retries})") time.sleep(delay) - - raise Exception("❌ Could not connect to Database after multiple retries.") + raise Exception("❌ Could not connect to Database.") -# 1. Wait for DB wait_for_db() - -# 2. Create Tables models.Base.metadata.create_all(bind=engine) - -# Initialize FastAPI -app = FastAPI( - title="Q Backend Service", - description="Core API for Earthquake Alarm System: Ingestion, Alerts, and Statistics.", - version="1.7.0" -) - -# Initialize Redis +app = FastAPI(title="QuakeGuard Backend", version="2.0.0") redis_client = aioredis.from_url("redis://redis:6379/0", decode_responses=True) - -# --- UTILITY FUNCTIONS --- - +# --- UTILITY --- def verify_device_signature(public_key_hex: str, message: str, signature_hex: str) -> bool: - """ - Verifies ECDSA signature using SHA256 hashing. - Compatible with ESP32 MbedTLS (DER) and Standard Python (RAW). - """ try: - if not public_key_hex or not signature_hex: - return False - + if not public_key_hex or not signature_hex: return False key_bytes = bytes.fromhex(public_key_hex) sig_bytes = bytes.fromhex(signature_hex) message_bytes = message.encode('utf-8') - # 1. Load the Key (Try DER first - ESP32 Standard, fallback to RAW) try: vk = VerifyingKey.from_der(key_bytes) except (ValueError, MalformedPointError): vk = VerifyingKey.from_string(key_bytes, curve=NIST256p) - # 2. Verify with SHA256 (CRITICAL: Matches ESP32's mbedtls_md_info_from_type(SHA256)) try: - # Try DER (ASN.1) first return vk.verify(sig_bytes, message_bytes, sigdecode=sigdecode_der, hashfunc=hashlib.sha256) - except BadSignatureError: - # Fallback to RAW string signature + except Exception: try: return vk.verify(sig_bytes, message_bytes, sigdecode=sigdecode_string, hashfunc=hashlib.sha256) except BadSignatureError: return False - except Exception as e: - print(f"⚠️ Crypto Validation Error: {str(e)}") + print(f"⚠️ Crypto Error: {e}") return False +# --- ENDPOINTS --- -# ========================================== -# REGISTRATION ENDPOINTS -# ========================================== - -@app.post("/zones/", response_model=schemas.Zone, status_code=status.HTTP_201_CREATED, tags=["Registration"]) +@app.post("/zones/", response_model=schemas.Zone, status_code=201) def create_zone(zone: schemas.ZoneCreate, db: Session = Depends(get_db)): - """Registers a new geographical zone.""" existing = db.query(models.Zone).filter(models.Zone.city == zone.city).first() - if existing: - return existing + if existing: return existing db_zone = models.Zone(city=zone.city) db.add(db_zone) db.commit() db.refresh(db_zone) return db_zone -@app.get("/zones/", response_model=List[schemas.Zone], tags=["Data Retrieval"]) +@app.get("/zones/", response_model=List[schemas.Zone]) def get_zones(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): - """ - List all available geographical zones. - Useful for frontend dropdowns or sensor configuration. - """ return db.query(models.Zone).offset(skip).limit(limit).all() - -@app.post("/misurators/", response_model=schemas.Misurator, status_code=status.HTTP_201_CREATED, tags=["Registration"]) +@app.post("/misurators/", response_model=schemas.Misurator, status_code=201) def create_misurator(misurator: schemas.MisuratorCreate, db: Session = Depends(get_db)): - """Registers a new IoT Sensor and its Public Key.""" - # Check if key needs update (for Dev convenience) - # Note: In prod, you might query by public_key or hardware_id, here we simplify. existing = db.query(models.Misurator).filter(models.Misurator.public_key_hex == misurator.public_key_hex).first() - if existing: - return existing + if existing: return existing zone = db.query(models.Zone).filter(models.Zone.id == misurator.zone_id).first() - if zone is None: - raise HTTPException(status_code=404, detail="Zone not found") + if not zone: raise HTTPException(404, "Zone not found") gps_point = f"POINT({misurator.longitude} {misurator.latitude})" - db_misurator = models.Misurator( - active=misurator.active, - zone_id=misurator.zone_id, - latitude=misurator.latitude, - longitude=misurator.longitude, + active=misurator.active, zone_id=misurator.zone_id, + latitude=misurator.latitude, longitude=misurator.longitude, location=WKTElement(gps_point, srid=4326), public_key_hex=misurator.public_key_hex ) - db.add(db_misurator) db.commit() db.refresh(db_misurator) return db_misurator - -@app.get("/misurators/", response_model=List[schemas.Misurator], tags=["Data Retrieval"]) +@app.get("/misurators/", response_model=List[schemas.Misurator]) def get_misurators(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): - """List all registered sensors.""" return db.query(models.Misurator).offset(skip).limit(limit).all() - -# ========================================== -# INGESTION ENDPOINT -# ========================================== - -@app.post("/misurations/", status_code=status.HTTP_202_ACCEPTED, tags=["Ingestion"]) -async def create_misuration_async( - misuration: schemas.MisurationCreate, - db: Session = Depends(get_db) -): - """ - Receives data, verifies signature (SHA256), and queues to Redis. - """ +@app.post("/misurations/", status_code=202) +async def create_misuration_async(misuration: schemas.MisurationCreate, db: Session = Depends(get_db)): misurator = db.query(models.Misurator).filter(models.Misurator.id == misuration.misurator_id).first() - if not misurator or not misurator.active: - raise HTTPException(status_code=403, detail="Sensor unauthorized or inactive") + raise HTTPException(403, "Sensor unauthorized") - # CRITICAL: Reconstruct message as "value:int(timestamp)" to match ESP32 + # 1. Message Reconstruction message = f"{misuration.value}:{int(misuration.device_timestamp)}" + # 2. Signature Verification (CPU Bound) loop = asyncio.get_event_loop() - is_valid = await loop.run_in_executor( - None, - verify_device_signature, - misurator.public_key_hex, - message, - misuration.signature_hex - ) + is_valid = await loop.run_in_executor(None, verify_device_signature, misurator.public_key_hex, message, misuration.signature_hex) if not is_valid: - print(f"\n❌ SIGNATURE FAILED for Sensor {misurator.id}") - print(f"Expected Message: {message}") - print(f"Stored Key: {misurator.public_key_hex[:15]}...") - print(f"Received Sig: {misuration.signature_hex[:15]}...\n") - raise HTTPException(status_code=401, detail="Invalid digital signature") + raise HTTPException(401, "Invalid digital signature") - # Prepare payload for Worker + # 3. REPLAY ATTACK CHECK (The New Guard) 🛡️ + # We check time AFTER verifying signature (so we know timestamp is authentic) + server_now = time.time() + device_ts = misuration.device_timestamp + if abs(server_now - device_ts) > MAX_TIMESTAMP_SKEW: + print(f"⚠️ Replay Blocked! Skew: {server_now - device_ts:.2f}s") + raise HTTPException(403, "Replay Attack Detected: Timestamp out of bounds") + + # 4. Enqueue payload = misuration.model_dump() payload['zone_id'] = misurator.zone_id - await redis_client.lpush("seismic_events", json.dumps(payload)) - return {"status": "accepted", "detail": "Data enqueued"} - + return {"status": "accepted"} -# ========================================== -# STATISTICS & ALERTS ENDPOINTS (RESTORED) -# ========================================== - -@app.get("/zones/{zone_id}/alerts", response_model=List[schemas.AlertResponse], tags=["Data Retrieval"]) -def get_zone_alerts( - zone_id: int, - limit: int = 10, - db: Session = Depends(get_db) -): - """ - Retrieves recent seismic alerts for a specific zone. - """ - alerts = db.query(models.Alert)\ - .filter(models.Alert.zone_id == zone_id)\ - .order_by(desc(models.Alert.timestamp))\ - .limit(limit)\ - .all() - - return alerts if alerts else [] +@app.get("/zones/{zone_id}/alerts", response_model=List[schemas.AlertResponse]) +def get_zone_alerts(zone_id: int, limit: int = 10, db: Session = Depends(get_db)): + return db.query(models.Alert).filter(models.Alert.zone_id == zone_id).order_by(desc(models.Alert.timestamp)).limit(limit).all() - -@app.get("/sensors/{misurator_id}/statistics", tags=["Analytics"]) -def get_sensor_statistics( - misurator_id: int, - db: Session = Depends(get_db) -): - """ - Computes statistical aggregates (AVG, MAX, MIN, COUNT) for a sensor. - """ +@app.get("/sensors/{misurator_id}/statistics") +def get_sensor_statistics(misurator_id: int, db: Session = Depends(get_db)): + # This endpoint is CRITICAL for the End-to-End Stress Test verification sensor = db.query(models.Misurator).filter(models.Misurator.id == misurator_id).first() - if not sensor: - raise HTTPException(status_code=404, detail="Sensor not found") + if not sensor: raise HTTPException(404, "Sensor not found") stats = db.query( func.count(models.Misuration.id).label("count"), @@ -262,42 +175,6 @@ def get_sensor_statistics( "generated_at": datetime.utcnow().isoformat() } - -# ========================================== -# SYSTEM HEALTH -# ========================================== - -@app.get("/health", tags=["System"]) +@app.get("/health") async def health_check(db: Session = Depends(get_db)): - """ - Checks connection status for Database and Redis. - """ - health_status: Dict[str, Any] = { - "status": "ok", - "timestamp": datetime.utcnow().isoformat(), - "services": { - "database": "unknown", - "redis": "unknown" - } - } - - # 1. Check DB - try: - db.execute(func.now()) - health_status["services"]["database"] = "connected" - except Exception as e: - health_status["status"] = "degraded" - health_status["services"]["database"] = f"error: {str(e)}" - - # 2. Check Redis - try: - await redis_client.ping() - health_status["services"]["redis"] = "connected" - except Exception as e: - health_status["status"] = "degraded" - health_status["services"]["redis"] = f"error: {str(e)}" - - if health_status["status"] != "ok": - raise HTTPException(status_code=503, detail=health_status) - - return health_status \ No newline at end of file + return {"status": "ok"} \ No newline at end of file diff --git a/backend-data-elaborator/api/tests/stress_test.py b/backend-data-elaborator/api/tests/stress_test.py index 8ff4db4..8c529e6 100644 --- a/backend-data-elaborator/api/tests/stress_test.py +++ b/backend-data-elaborator/api/tests/stress_test.py @@ -1,203 +1,212 @@ """ -QuakeGuard Stress Test Script ------------------------------- -Performs a load test on the QuakeGuard Ingestion API. -It simulates concurrent IoT devices generating ECDSA-signed payloads -to verify the system's throughput and asynchronous processing capabilities. - -Requirements: - - aiohttp - - ecdsa +QuakeGuard Critical Stress Test Suite (v2.1 - The Real Deal) +------------------------------------------------------------ +Features: +- Client-side Semaphore Throttling +- Smart Polling for End-to-End DB Verification +- Active Security Attacks (Invalid Sig + Replay) +- Dynamic Infrastructure """ import asyncio import aiohttp import time import random +import os +import uuid import hashlib -from ecdsa import SigningKey, NIST256p -from ecdsa.util import sigencode_der -from typing import List, Optional, Tuple +from typing import List, Tuple +from dataclasses import dataclass -# --- CONFIGURATION PARAMETERS --- -BASE_URL = "http://localhost:8000" -NUM_SENSORS = 100 -TEST_ZONE_ID = 1 -TIMEOUT_SECONDS = 30 +from ecdsa import SigningKey, NIST256p +from ecdsa.util import sigencode_der + +# --- CONFIGURATION --- +API_URL = os.getenv("API_URL", "http://localhost:8000") +NUM_SENSORS = int(os.getenv("NUM_SENSORS", 200)) +CONCURRENCY_LIMIT = int(os.getenv("CONCURRENCY_LIMIT", 50)) +TIMEOUT_SECONDS = 30 +POLLING_RETRIES = 10 # Max seconds to wait for worker persistence + +@dataclass +class TestStats: + req_sent: int = 0 + req_success: int = 0 + req_failed: int = 0 + auth_rejected: int = 0 + replay_rejected: int = 0 + latency_accum: float = 0.0 class VirtualSensor: - """ - Represents a simulated IoT sensor capable of generating cryptographic signatures. - """ def __init__(self): - # Generate a real NIST256p (secp256r1) key pair self.sk = SigningKey.generate(curve=NIST256p) self.vk = self.sk.verifying_key - - # EXPORT AS DER (Standard X.509 SubjectPublicKeyInfo) - # This matches exactly what the ESP32 (MbedTLS) sends to the backend. self.public_key_hex = self.vk.to_der().hex() - - self.sensor_id: Optional[int] = None - # Generate random coordinates + self.sensor_id: int = 0 self.lat = round(random.uniform(-90, 90), 6) self.lon = round(random.uniform(-180, 180), 6) + self.sent_count = 0 def sign_message(self, message: str) -> str: - """ - Signs a message string using the sensor's private key. - - CRITICAL UPDATE: - - Uses SHA256 explicitly (to match backend verification). - - Uses DER encoding (sigencode_der) to match ESP32/MbedTLS format. - """ - return self.sk.sign( - message.encode('utf-8'), - hashfunc=hashlib.sha256, # Force SHA256 - sigencode=sigencode_der # Force DER format (ASN.1) - ).hex() - -async def setup_infrastructure(session: aiohttp.ClientSession) -> List[VirtualSensor]: - """ - Initializes the test environment: - 1. Ensures the target Zone exists. - 2. Registers the virtual sensors with the backend to whitelist their public keys. - """ - print(f"🛠️ SETUP: Initializing infrastructure for {NUM_SENSORS} sensors...") + return self.sk.sign(message.encode('utf-8'), hashfunc=hashlib.sha256, sigencode=sigencode_der).hex() + +class MaliciousSensor(VirtualSensor): + def sign_with_wrong_key(self, message: str) -> str: + fake_sk = SigningKey.generate(curve=NIST256p) + return fake_sk.sign(message.encode('utf-8'), hashfunc=hashlib.sha256, sigencode=sigencode_der).hex() + +# --- UTILS --- + +async def create_dynamic_zone(session: aiohttp.ClientSession) -> int: + city_name = f"TestZone_{uuid.uuid4().hex[:8]}" + async with session.post(f"{API_URL}/zones/", json={"city": city_name}) as resp: + if resp.status not in [200, 201]: raise Exception(f"Zone creation failed: {resp.status}") + data = await resp.json() + return data['id'] + +async def register_sensor(session, sensor, zone_id, sem): + async with sem: + payload = { "active": True, "zone_id": zone_id, "latitude": sensor.lat, "longitude": sensor.lon, "public_key_hex": sensor.public_key_hex } + try: + async with session.post(f"{API_URL}/misurators/", json=payload) as resp: + if resp.status in [200, 201]: + data = await resp.json() + sensor.sensor_id = data['id'] + return True + return False + except: return False + +async def send_measurement(session, sensor, sem, is_malicious=None) -> Tuple[int, float]: + value = random.randint(100, 999) + timestamp = int(time.time()) - # 1. Create or Verify Zone - zone_payload = {"city": "StressTestCity", "id": TEST_ZONE_ID} - try: - async with session.post(f"{BASE_URL}/zones/", json=zone_payload) as resp: - if resp.status not in [200, 201, 400, 422]: - print(f"⚠️ Warning: Zone creation returned status {resp.status}") - except Exception as e: - print(f"⚠️ Warning: Zone setup failed (backend might be down?): {e}") + if is_malicious == 'REPLAY': + timestamp -= 7200 # 2 hours ago (Ancient History) + + message = f"{value}:{timestamp}" + + if is_malicious == 'BAD_SIG': + signature = sensor.sign_with_wrong_key(message) + else: + signature = sensor.sign_message(message) - sensors = [VirtualSensor() for _ in range(NUM_SENSORS)] + payload = { "value": value, "misurator_id": sensor.sensor_id, "device_timestamp": timestamp, "signature_hex": signature } - # 2. Register Sensors concurrently + start_t = time.perf_counter() + async with sem: + try: + async with session.post(f"{API_URL}/misurations/", json=payload, timeout=TIMEOUT_SECONDS) as resp: + await resp.read() + return resp.status, time.perf_counter() - start_t + except Exception: + return 999, 0.0 + +# --- PHASES --- + +async def run_load_test(session, sensors, sem) -> TestStats: + stats = TestStats() + print(f"🔥 Phase 1: Firehose ({len(sensors)} concurrent requests)...") tasks = [] for s in sensors: - payload = { - "active": True, - "zone_id": TEST_ZONE_ID, - "latitude": s.lat, - "longitude": s.lon, - "public_key_hex": s.public_key_hex - } - tasks.append(register_single_sensor(session, s, payload)) - + tasks.append(send_measurement(session, s, sem)) + s.sent_count += 1 + results = await asyncio.gather(*tasks) + for status_code, latency in results: + stats.req_sent += 1 + stats.latency_accum += latency + if status_code == 202: stats.req_success += 1 + else: stats.req_failed += 1 + return stats + +async def run_security_test(session, zone_id, sem) -> TestStats: + stats = TestStats() + print("\n⚔️ Phase 2: Security Attacks...") + bad_sensor = MaliciousSensor() + await register_sensor(session, bad_sensor, zone_id, sem) - # Filter out successfully registered sensors - valid_sensors = [s for s in results if s is not None] - print(f"✅ SETUP COMPLETE: {len(valid_sensors)}/{NUM_SENSORS} sensors registered and ready.\n") - return valid_sensors - -async def register_single_sensor( - session: aiohttp.ClientSession, - sensor: VirtualSensor, - payload: dict -) -> Optional[VirtualSensor]: - """Helper function to register a single sensor via HTTP POST.""" - try: - async with session.post(f"{BASE_URL}/misurators/", json=payload, timeout=TIMEOUT_SECONDS) as resp: - if resp.status in [200, 201]: - data = await resp.json() - sensor.sensor_id = data['id'] - return sensor - else: - text = await resp.text() - # If sensor already exists (duplicate key error handled by backend logic now), it's fine - if resp.status == 409 or "already exists" in text: - return sensor - print(f"❌ Registration failed: Status {resp.status} - {text}") - return None - except Exception as e: - print(f"❌ Connection error during setup: {str(e)}") - return None - -async def send_measurement( - session: aiohttp.ClientSession, - sensor: VirtualSensor -) -> Tuple[int, float]: - """ - Simulates the transmission of a seismic data point. - """ - value = random.randint(200, 900) - timestamp = int(time.time()) - - # Message format: "value:timestamp" - message_to_sign = f"{value}:{timestamp}" + # Attack A: Bad Sig + print(" 👉 A: Bad Signature...", end=" ") + status_a, _ = await send_measurement(session, bad_sensor, sem, is_malicious='BAD_SIG') + if status_a == 401: + print("✅ Blocked (401)") + stats.auth_rejected += 1 + else: print(f"💀 FAILED (Got {status_a})") + + # Attack B: Replay + print(" 👉 B: Replay Attack...", end=" ") + status_b, _ = await send_measurement(session, bad_sensor, sem, is_malicious='REPLAY') + if status_b == 403: + print("✅ Blocked (403)") + stats.replay_rejected += 1 + else: print(f"💀 FAILED (Got {status_b})") - # Generate signature (now DER + SHA256) - signature = sensor.sign_message(message_to_sign) - - payload = { - "value": value, - "misurator_id": sensor.sensor_id, - "device_timestamp": timestamp, - "signature_hex": signature - } - - start_t = time.perf_counter() - try: - async with session.post(f"{BASE_URL}/misurations/", json=payload, timeout=TIMEOUT_SECONDS) as resp: - await resp.read() - end_t = time.perf_counter() - return resp.status, end_t - start_t - except Exception as e: - return 999, 0.0 + return stats + +async def verify_persistence_with_polling(session, sensors, sem) -> bool: + print(f"\n🔍 Phase 3: E2E Verification (Polling DB)...") + sample_size = min(50, len(sensors)) + samples = sensors[:sample_size] + verified = 0 + + async with sem: + for s in samples: + # Polling Logic per Sensor + for attempt in range(POLLING_RETRIES): + async with session.get(f"{API_URL}/sensors/{s.sensor_id}/statistics") as resp: + if resp.status == 200: + data = await resp.json() + if data['total_readings'] >= s.sent_count: + verified += 1 + break # Success for this sensor + await asyncio.sleep(1) # Backoff + + if data['total_readings'] < s.sent_count: + print(f" ⚠️ Timeout Sensor {s.sensor_id}: DB has {data['total_readings']}, Sent {s.sent_count}") + + print(f" ✅ Persistence Rate: {(verified/sample_size)*100:.1f}% ({verified}/{sample_size})") + return verified == sample_size + +# --- MAIN --- async def main(): - print(f"--- 🌋 QUAKEGUARD LOAD TEST: {NUM_SENSORS} CONCURRENT SENSORS ---") - + print(f"🚀 QUAKEGUARD CRITICAL TEST v2.1") + sem = asyncio.Semaphore(CONCURRENCY_LIMIT) + async with aiohttp.ClientSession() as session: - # PHASE 1: Setup (Device Registration) - sensors = await setup_infrastructure(session) - if not sensors: - print("❌ No sensors registered. Aborting test.") + # Setup + try: + zone_id = await create_dynamic_zone(session) + sensors = [VirtualSensor() for _ in range(NUM_SENSORS)] + await asyncio.gather(*[register_sensor(session, s, zone_id, sem) for s in sensors]) + print(f"📝 Registered {NUM_SENSORS} sensors.") + except Exception as e: + print(f"❌ Setup Failed: {e}") return - print("⏳ Preparing payload (Thundering Herd simulation)...") - tasks = [send_measurement(session, s) for s in sensors] - - print("🚀 FIRE! Sending concurrent requests...") - start_time = time.perf_counter() - - results = await asyncio.gather(*tasks) - - total_time = time.perf_counter() - start_time - - # --- METRICS & REPORTING --- - success_count = sum(1 for status, _ in results if status == 202) - fail_count = sum(1 for status, _ in results if status != 202) - - avg_req_time = sum(t for _, t in results) / len(results) if results else 0 - rps = len(results) / total_time if total_time > 0 else 0 - - print("\n" + "="*50) - print(f"📊 FINAL TEST REPORT") - print("="*50) - print(f"⏱️ Total Execution Time: {total_time:.4f} seconds") - print(f"🚀 Throughput (RPS): {rps:.2f} requests/sec") - print(f"✅ Success (HTTP 202): {success_count}") - print(f"❌ Failures: {fail_count}") - print(f"🐢 Avg Request Latency: {avg_req_time*1000:.2f} ms") - print("="*50) - - if fail_count == 0 and success_count == len(sensors): - print("🏆 TEST PASSED: System handled the load successfully.") + # Execution + load_stats = await run_load_test(session, sensors, sem) + sec_stats = await run_security_test(session, zone_id, sem) + e2e_passed = await verify_persistence_with_polling(session, sensors, sem) + + # Report + print("\n" + "="*40) + print("📊 MISSION REPORT") + print("="*40) + print(f"Traffic: {load_stats.req_success}/{load_stats.req_sent} Accepted") + print(f"Sec (BadSig): {sec_stats.auth_rejected} Blocked") + print(f"Sec (Replay): {sec_stats.replay_rejected} Blocked") + print(f"Persistence: {'PASS' if e2e_passed else 'FAIL'}") + print("="*40) + + if load_stats.req_failed == 0 and e2e_passed and sec_stats.auth_rejected > 0 and sec_stats.replay_rejected > 0: + print("🏆 SYSTEM CERTIFIED") else: - print("⚠️ TEST FAILED or PARTIAL: Check server logs for details.") + print("⚠️ SYSTEM FAILURE") if __name__ == "__main__": try: import sys - if sys.platform == 'win32': - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - except AttributeError: - pass - + if sys.platform == 'win32': asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + except: pass asyncio.run(main()) \ No newline at end of file From 6609fd78019a5d3d51bc77192fb1038024df7819 Mon Sep 17 00:00:00 2001 From: GiZano Date: Tue, 10 Feb 2026 23:12:28 +0100 Subject: [PATCH 2/2] Feat: Added new Action to perform a stress test on push --- .github/workflows/quakeguard-ci.yml | 50 +++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) create mode 100644 .github/workflows/quakeguard-ci.yml diff --git a/.github/workflows/quakeguard-ci.yml b/.github/workflows/quakeguard-ci.yml new file mode 100644 index 0000000..3e493e4 --- /dev/null +++ b/.github/workflows/quakeguard-ci.yml @@ -0,0 +1,50 @@ +name: QuakeGuard CI Pipeline + +on: + push: + branches: [ "main", "develop" ] + pull_request: + branches: [ "main" ] + +jobs: + system-stress-test: + runs-on: ubuntu-latest + + steps: + # 1. Download Code + - name: Checkout code + uses: actions/checkout@v3 + + # 2. Prepare Python + - name: Set up Python + uses: actions/setup-python@v4 + with: + python-version: '3.11' + + - name: Install Test Dependencies + run: | + python -m pip install --upgrade pip + pip install aiohttp ecdsa redis sqlalchemy psycopg2-binary geoalchemy2 + + # 3. Start Docker Container + - name: Start Docker Stack + run: | + docker compose up -d --build + # Wait DB to be ready (backend has wait_for_db, but let's give time to docket) + echo "⏳ Waiting for services to stabilize..." + sleep 15 + docker compose ps + + # 4. Execute Stress Test + - name: Run Critical Stress Test + env: + API_URL: "http://localhost:8000" + NUM_SENSORS: 200 # Average Load for CI + CONCURRENCY_LIMIT: 50 + run: | + python -m tests.stress_test_v2 + + # 5. Logs if there are errors + - name: Dump Docker Logs on Failure + if: failure() + run: docker compose logs \ No newline at end of file