Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 50 additions & 0 deletions .github/workflows/quakeguard-ci.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
name: QuakeGuard CI Pipeline

on:
  push:
    branches: [ "main", "develop" ]
  pull_request:
    branches: [ "main" ]

jobs:
  system-stress-test:
    runs-on: ubuntu-latest

    steps:
      # 1. Download the repository contents
      - name: Checkout code
        uses: actions/checkout@v3

      # 2. Prepare Python
      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.11'

      - name: Install Test Dependencies
        run: |
          python -m pip install --upgrade pip
          pip install aiohttp ecdsa redis sqlalchemy psycopg2-binary geoalchemy2

      # 3. Start the Docker stack
      - name: Start Docker Stack
        run: |
          docker compose up -d --build
          # Wait for the DB to be ready (the backend has wait_for_db, but give Docker extra time)
          echo "⏳ Waiting for services to stabilize..."
          sleep 15
          docker compose ps

      # 4. Execute the stress test
      - name: Run Critical Stress Test
        env:
          API_URL: "http://localhost:8000"
          NUM_SENSORS: 200 # Average load for CI
          CONCURRENCY_LIMIT: 50
        run: |
          python -m tests.stress_test_v2

      # 5. Dump logs if the test failed
      - name: Dump Docker Logs on Failure
        if: failure()
        run: docker compose logs
229 changes: 53 additions & 176 deletions backend-data-elaborator/api/src/main.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
"""
QuakeGuard Backend Service API
-------------------------------
Orchestrates IoT data ingestion, system health monitoring, and data retrieval.
Implements robust error handling for cryptographic verification (SHA256, DER/RAW).
QuakeGuard Backend Service API (v2.0 - Security Hardened)
---------------------------------------------------------
Features:
- ECDSA SHA-256 Verification (DER/RAW)
- Replay Attack Protection (Timestamp Window)
- High Concurrency DB Pooling
"""

import json
import asyncio
import time
import hashlib
import hashlib
from datetime import datetime
from typing import List, Dict, Any, Optional
from typing import List, Dict, Any

from fastapi import FastAPI, Depends, HTTPException, status
from sqlalchemy.orm import Session
Expand All @@ -25,19 +27,15 @@

from geoalchemy2.elements import WKTElement

# Local imports
from src.database import get_db, engine
import src.models as models
import src.schemas as schemas

# ==========================================
# DATABASE INITIALIZATION & WAITER
# ==========================================
# --- CONFIGURATION ---
MAX_TIMESTAMP_SKEW = 60 # Seconds. Rejects messages older than 1 min.

# 1. Wait for DB
def wait_for_db(retries=10, delay=3):
"""
Blocks startup until the Database is ready to accept connections.
"""
print("Checking Database connection...")
for i in range(retries):
try:
Expand All @@ -48,203 +46,118 @@ def wait_for_db(retries=10, delay=3):
except OperationalError:
print(f"⏳ Database not ready yet... waiting {delay}s ({i+1}/{retries})")
time.sleep(delay)

raise Exception("❌ Could not connect to Database after multiple retries.")
raise Exception("❌ Could not connect to Database.")

# 1. Wait for DB
wait_for_db()

# 2. Create Tables
models.Base.metadata.create_all(bind=engine)


# Initialize FastAPI
app = FastAPI(
title="Q Backend Service",
description="Core API for Earthquake Alarm System: Ingestion, Alerts, and Statistics.",
version="1.7.0"
)

# Initialize Redis
app = FastAPI(title="QuakeGuard Backend", version="2.0.0")
redis_client = aioredis.from_url("redis://redis:6379/0", decode_responses=True)


# --- UTILITY FUNCTIONS ---

# --- UTILITY ---
def verify_device_signature(public_key_hex: str, message: str, signature_hex: str) -> bool:
    """
    Verify an ECDSA/SHA-256 signature produced by an IoT device.

    Supports both key encodings seen in the fleet:
      - DER (ASN.1) keys — the ESP32/MbedTLS default;
      - RAW point keys on NIST P-256 (standard python-ecdsa output).
    Likewise tries DER-encoded signatures first, then RAW (r||s).

    Args:
        public_key_hex: Hex-encoded public key (DER or raw point).
        message: The exact message string that was signed.
        signature_hex: Hex-encoded signature (DER or raw r||s).

    Returns:
        True only when the signature validates; False on any failure
        (missing inputs, malformed hex/key/signature, or a bad signature).
    """
    try:
        if not public_key_hex or not signature_hex:
            return False

        key_bytes = bytes.fromhex(public_key_hex)
        sig_bytes = bytes.fromhex(signature_hex)
        message_bytes = message.encode('utf-8')

        # 1. Load the key: DER first (ESP32 standard), fall back to RAW P-256.
        try:
            vk = VerifyingKey.from_der(key_bytes)
        except (ValueError, MalformedPointError):
            vk = VerifyingKey.from_string(key_bytes, curve=NIST256p)

        # 2. Verify with SHA-256 (must match the device's mbedtls SHA256 digest).
        try:
            # DER (ASN.1) signature first. The catch is deliberately broad:
            # a RAW signature fed to sigdecode_der raises DER-decoding errors
            # (not BadSignatureError), and we still want the RAW fallback.
            return vk.verify(sig_bytes, message_bytes,
                             sigdecode=sigdecode_der, hashfunc=hashlib.sha256)
        except Exception:
            # Fallback: RAW (r||s) signature encoding.
            try:
                return vk.verify(sig_bytes, message_bytes,
                                 sigdecode=sigdecode_string, hashfunc=hashlib.sha256)
            except BadSignatureError:
                return False

    except Exception as e:
        # Defensive catch-all: malformed hex, wrong key size, etc.
        print(f"⚠️ Crypto Error: {e}")
        return False

# --- ENDPOINTS ---

# ==========================================
# REGISTRATION ENDPOINTS
# ==========================================

@app.post("/zones/", response_model=schemas.Zone, status_code=201)
def create_zone(zone: schemas.ZoneCreate, db: Session = Depends(get_db)):
    """Register a new geographical zone; idempotent on the city name."""
    # Return the existing row instead of erroring so re-registration is a no-op.
    existing = db.query(models.Zone).filter(models.Zone.city == zone.city).first()
    if existing:
        return existing
    db_zone = models.Zone(city=zone.city)
    db.add(db_zone)
    db.commit()
    db.refresh(db_zone)
    return db_zone

@app.get("/zones/", response_model=List[schemas.Zone])
def get_zones(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List registered zones with simple offset/limit pagination."""
    return db.query(models.Zone).offset(skip).limit(limit).all()


@app.post("/misurators/", response_model=schemas.Misurator, status_code=201)
def create_misurator(misurator: schemas.MisuratorCreate, db: Session = Depends(get_db)):
    """
    Register a new IoT sensor together with its public key.

    Idempotent on the public key: re-posting an already-known key returns
    the existing row. Raises 404 when the referenced zone does not exist.
    """
    existing = db.query(models.Misurator).filter(
        models.Misurator.public_key_hex == misurator.public_key_hex
    ).first()
    if existing:
        return existing

    zone = db.query(models.Zone).filter(models.Zone.id == misurator.zone_id).first()
    if not zone:
        raise HTTPException(404, "Zone not found")

    # Store the GPS position as a WKT point in WGS84 (SRID 4326).
    gps_point = f"POINT({misurator.longitude} {misurator.latitude})"

    db_misurator = models.Misurator(
        active=misurator.active,
        zone_id=misurator.zone_id,
        latitude=misurator.latitude,
        longitude=misurator.longitude,
        location=WKTElement(gps_point, srid=4326),
        public_key_hex=misurator.public_key_hex,
    )
    db.add(db_misurator)
    db.commit()
    db.refresh(db_misurator)
    return db_misurator


@app.get("/misurators/", response_model=List[schemas.Misurator])
def get_misurators(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    """List registered sensors with simple offset/limit pagination."""
    return db.query(models.Misurator).offset(skip).limit(limit).all()


# ==========================================
# INGESTION ENDPOINT
# ==========================================

@app.post("/misurations/", status_code=202)
async def create_misuration_async(misuration: schemas.MisurationCreate, db: Session = Depends(get_db)):
    """
    Ingest one signed sensor reading.

    Pipeline:
      1. Authorize the sensor (must exist and be active)        -> 403
      2. Verify the ECDSA/SHA-256 signature off the event loop  -> 401
      3. Reject stale timestamps (replay-attack protection)     -> 403
      4. Enqueue the payload on Redis for the worker            -> 202
    """
    misurator = db.query(models.Misurator).filter(
        models.Misurator.id == misuration.misurator_id
    ).first()
    if not misurator or not misurator.active:
        raise HTTPException(403, "Sensor unauthorized")

    # Rebuild the exact message the device signed: "value:int(timestamp)"
    # (must match the ESP32 firmware byte-for-byte).
    message = f"{misuration.value}:{int(misuration.device_timestamp)}"

    # Signature verification is CPU-bound; run it in the default executor so
    # the event loop keeps serving other requests under load.
    # get_running_loop() is the modern form — get_event_loop() is deprecated
    # inside coroutines.
    loop = asyncio.get_running_loop()
    is_valid = await loop.run_in_executor(
        None,
        verify_device_signature,
        misurator.public_key_hex,
        message,
        misuration.signature_hex,
    )
    if not is_valid:
        raise HTTPException(401, "Invalid digital signature")

    # Replay-attack guard: checked AFTER signature verification so we know
    # the timestamp itself is authentic before trusting it.
    server_now = time.time()
    device_ts = misuration.device_timestamp
    if abs(server_now - device_ts) > MAX_TIMESTAMP_SKEW:
        print(f"⚠️ Replay Blocked! Skew: {server_now - device_ts:.2f}s")
        raise HTTPException(403, "Replay Attack Detected: Timestamp out of bounds")

    # Hand the validated reading off to the async worker via Redis.
    payload = misuration.model_dump()
    payload['zone_id'] = misurator.zone_id
    await redis_client.lpush("seismic_events", json.dumps(payload))

    return {"status": "accepted"}

# ==========================================
# STATISTICS & ALERTS ENDPOINTS (RESTORED)
# ==========================================

@app.get("/zones/{zone_id}/alerts", response_model=List[schemas.AlertResponse])
def get_zone_alerts(zone_id: int, limit: int = 10, db: Session = Depends(get_db)):
    """Return the most recent seismic alerts for a zone, newest first."""
    return (
        db.query(models.Alert)
        .filter(models.Alert.zone_id == zone_id)
        .order_by(desc(models.Alert.timestamp))
        .limit(limit)
        .all()
    )


@app.get("/sensors/{misurator_id}/statistics", tags=["Analytics"])
def get_sensor_statistics(
misurator_id: int,
db: Session = Depends(get_db)
):
"""
Computes statistical aggregates (AVG, MAX, MIN, COUNT) for a sensor.
"""
@app.get("/sensors/{misurator_id}/statistics")
def get_sensor_statistics(misurator_id: int, db: Session = Depends(get_db)):
# This endpoint is CRITICAL for the End-to-End Stress Test verification
sensor = db.query(models.Misurator).filter(models.Misurator.id == misurator_id).first()
if not sensor:
raise HTTPException(status_code=404, detail="Sensor not found")
if not sensor: raise HTTPException(404, "Sensor not found")

stats = db.query(
func.count(models.Misuration.id).label("count"),
Expand All @@ -262,42 +175,6 @@ def get_sensor_statistics(
"generated_at": datetime.utcnow().isoformat()
}


# ==========================================
# SYSTEM HEALTH
# ==========================================

@app.get("/health")
async def health_check(db: Session = Depends(get_db)):
    """
    Liveness/readiness probe: reports Database and Redis connectivity.

    NOTE(review): the v2 diff reduced this endpoint to a static
    {"status": "ok"}, which no longer verifies anything; the dependency
    checks are restored here so orchestrators can detect real outages.

    Returns 200 with a per-service status map when everything is reachable,
    503 (same payload as `detail`) when any dependency is down.
    """
    health_status: Dict[str, Any] = {
        "status": "ok",
        "timestamp": datetime.utcnow().isoformat(),
        "services": {"database": "unknown", "redis": "unknown"},
    }

    # 1. Database round-trip (cheap SELECT now()).
    try:
        db.execute(func.now())
        health_status["services"]["database"] = "connected"
    except Exception as e:
        health_status["status"] = "degraded"
        health_status["services"]["database"] = f"error: {str(e)}"

    # 2. Redis PING.
    try:
        await redis_client.ping()
        health_status["services"]["redis"] = "connected"
    except Exception as e:
        health_status["status"] = "degraded"
        health_status["services"]["redis"] = f"error: {str(e)}"

    if health_status["status"] != "ok":
        raise HTTPException(status_code=503, detail=health_status)

    return health_status
Loading
Loading