diff --git a/backend-data-elaborator/api/docker-compose.yml b/backend-data-elaborator/api/docker-compose.yml index 48f5a6a..a03f88f 100644 --- a/backend-data-elaborator/api/docker-compose.yml +++ b/backend-data-elaborator/api/docker-compose.yml @@ -1,36 +1,38 @@ version: '3.8' services: - # --- PERSISTENCE LAYER (PostgreSQL + PostGIS) --- + # --- DATABASE --- postgres: image: postgis/postgis:15-3.4-alpine + restart: unless-stopped + command: postgres -c 'max_connections=200' -c 'shared_buffers=128MB' environment: - POSTGRES_DB: monitoraggio_db - POSTGRES_USER: developer - POSTGRES_PASSWORD: password - ports: - - "5432:5432" + POSTGRES_DB: ${POSTGRES_DB} + POSTGRES_USER: ${POSTGRES_USER} + POSTGRES_PASSWORD: ${POSTGRES_PASSWORD} volumes: - postgres_data:/var/lib/postgresql/data healthcheck: - test: ["CMD-SHELL", "pg_isready -U developer -d monitoraggio_db"] + test: ["CMD-SHELL", "pg_isready -U ${POSTGRES_USER} -d ${POSTGRES_DB}"] interval: 5s timeout: 3s - retries: 10 + retries: 5 - # --- MESSAGE BROKER & CACHING LAYER --- + # --- REDIS --- redis: image: redis:7-alpine - ports: - - "6379:6379" + restart: unless-stopped + volumes: + - redis_data:/data - # --- INGESTION API (Asynchronous FastAPI) --- + # --- API --- fastapi-app: build: . + restart: always ports: - - "8000:8000" + - "${API_PORT}:8000" environment: - - DATABASE_URL=postgresql://developer:password@postgres:5432/monitoraggio_db + - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB} - REDIS_URL=redis://redis:6379/0 depends_on: postgres: @@ -38,16 +40,18 @@ services: redis: condition: service_started - # --- BACKGROUND WORKER (Consumer & Alert Logic) --- + # --- WORKER --- worker: build: . 
+ restart: always command: python src/worker.py environment: - - DATABASE_URL=postgresql://developer:password@postgres:5432/monitoraggio_db + - DATABASE_URL=postgresql://${POSTGRES_USER}:${POSTGRES_PASSWORD}@postgres:5432/${POSTGRES_DB} - REDIS_URL=redis://redis:6379/0 depends_on: - redis - postgres volumes: - postgres_data: \ No newline at end of file + postgres_data: + redis_data: \ No newline at end of file diff --git a/backend-data-elaborator/api/src/main.py b/backend-data-elaborator/api/src/main.py index 274f987..a32c8da 100644 --- a/backend-data-elaborator/api/src/main.py +++ b/backend-data-elaborator/api/src/main.py @@ -1,40 +1,102 @@ """ -QuakeGuard Backend Service API (v2.0 - Security Hardened) ---------------------------------------------------------- +QuakeGuard Backend Service API (v2.6 - Provisioning Restored) +------------------------------------------------------------- +Core API Gateway. Features: -- ECDSA SHA-256 Verification (DER/RAW) -- Replay Attack Protection (Timestamp Window) -- High Concurrency DB Pooling +1. Auto-Provisioning (Device Handshake). +2. IoT Data Ingestion (ECDSA Validation). +3. Real-Time Alert Distribution (Redis Pub/Sub -> WebSocket). 
""" import json import asyncio import time import hashlib +import os from datetime import datetime -from typing import List, Dict, Any +from typing import List, Dict, Any, Optional +from contextlib import asynccontextmanager -from fastapi import FastAPI, Depends, HTTPException, status +# FastAPI & Pydantic +from fastapi import FastAPI, Depends, HTTPException, status, WebSocket, WebSocketDisconnect +from pydantic import BaseModel, Field + +# Database & Redis from sqlalchemy.orm import Session from sqlalchemy import func, desc, text from sqlalchemy.exc import OperationalError from redis import asyncio as aioredis -# --- CRYPTO IMPORTS --- +# Cryptography from ecdsa import VerifyingKey, NIST256p, BadSignatureError from ecdsa.errors import MalformedPointError from ecdsa.util import sigdecode_der, sigdecode_string - from geoalchemy2.elements import WKTElement +# Local Modules from src.database import get_db, engine import src.models as models import src.schemas as schemas # --- CONFIGURATION --- -MAX_TIMESTAMP_SKEW = 60 # Seconds. Rejects messages older than 1 min. +MAX_TIMESTAMP_SKEW = 60 +REDIS_URL = "redis://redis:6379/0" + +# ⚠️ SECURITY: Shared Secret for Device Provisioning (Must match Firmware) +ENROLLMENT_TOKEN = os.getenv("ENROLLMENT_TOKEN", "S3cret_Qu4k3_K3y") + +# ========================================== +# WEBSOCKET CONNECTION MANAGER +# ========================================== + +class ConnectionManager: + """Manages active WebSocket connections for broadcasting alerts.""" + def __init__(self): + self.active_connections: List[WebSocket] = [] + + async def connect(self, websocket: WebSocket): + await websocket.accept() + self.active_connections.append(websocket) + # print(f"📱 Client Connected. 
Active: {len(self.active_connections)}") + + def disconnect(self, websocket: WebSocket): + if websocket in self.active_connections: + self.active_connections.remove(websocket) + + async def broadcast(self, message: str): + for connection in self.active_connections: + try: + await connection.send_text(message) + except Exception: + pass + +ws_manager = ConnectionManager() + +# ========================================== +# BACKGROUND SERVICES +# ========================================== + +async def redis_listener(): + """Listens to Redis 'quake_alerts' channel and broadcasts via WebSocket.""" + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + pubsub = redis.pubsub() + await pubsub.subscribe("quake_alerts") + print("🎧 Backend listening on Redis Pub/Sub: 'quake_alerts'") + + try: + async for message in pubsub.listen(): + if message["type"] == "message": + # print(f"🔥 RELAYING ALERT: {message['data']}") + await ws_manager.broadcast(message['data']) + except Exception as e: + print(f"❌ Redis Listener Error: {e}") + finally: + await redis.close() + +# ========================================== +# LIFESPAN & INIT +# ========================================== -# 1. Wait for DB def wait_for_db(retries=10, delay=3): print("Checking Database connection...") for i in range(retries): @@ -44,43 +106,136 @@ def wait_for_db(retries=10, delay=3): print("✅ Database is up and running!") return except OperationalError: - print(f"⏳ Database not ready yet... waiting {delay}s ({i+1}/{retries})") + print(f"⏳ Waiting for DB... 
({i+1}/{retries})") time.sleep(delay) - raise Exception("❌ Could not connect to Database.") + raise Exception("❌ DB Connection Failed.") -wait_for_db() -models.Base.metadata.create_all(bind=engine) +@asynccontextmanager +async def lifespan(app: FastAPI): + wait_for_db() + models.Base.metadata.create_all(bind=engine) + redis_task = asyncio.create_task(redis_listener()) + yield + redis_task.cancel() -app = FastAPI(title="QuakeGuard Backend", version="2.0.0") -redis_client = aioredis.from_url("redis://redis:6379/0", decode_responses=True) +app = FastAPI(title="QuakeGuard Backend", version="2.6.0", lifespan=lifespan) +redis_client = aioredis.from_url(REDIS_URL, decode_responses=True) + +# ========================================== +# CRYPTO UTILS +# ========================================== -# --- UTILITY --- def verify_device_signature(public_key_hex: str, message: str, signature_hex: str) -> bool: try: if not public_key_hex or not signature_hex: return False key_bytes = bytes.fromhex(public_key_hex) sig_bytes = bytes.fromhex(signature_hex) message_bytes = message.encode('utf-8') - try: vk = VerifyingKey.from_der(key_bytes) - except (ValueError, MalformedPointError): + except: vk = VerifyingKey.from_string(key_bytes, curve=NIST256p) - try: return vk.verify(sig_bytes, message_bytes, sigdecode=sigdecode_der, hashfunc=hashlib.sha256) - except Exception: + except: try: return vk.verify(sig_bytes, message_bytes, sigdecode=sigdecode_string, hashfunc=hashlib.sha256) - except BadSignatureError: + except: return False - except Exception as e: - print(f"⚠️ Crypto Error: {e}") + except: return False -# --- ENDPOINTS --- +# ========================================== +# 🚨 PROVISIONING ENDPOINT (RESTORED) 🚨 +# ========================================== + +# Internal Schema for Provisioning Request +class DeviceRegistrationRequest(BaseModel): + public_key_hex: str + mac_address: str + enrollment_token: str + firmware_version: Optional[str] = "1.0.0" + 
+@app.post("/devices/register", status_code=201, tags=["Registration"]) +def register_device(payload: DeviceRegistrationRequest, db: Session = Depends(get_db)): + """ + Auto-Provisioning Endpoint. + Allows new sensors to register themselves securely using a shared secret. + """ + # 1. SECURITY CHECK + if payload.enrollment_token != ENROLLMENT_TOKEN: + print(f"⛔ Unauthorized registration attempt from {payload.mac_address}") + # Artificial delay to slow down brute-force attacks + time.sleep(1) + raise HTTPException(status_code=403, detail="Invalid Enrollment Token") + + # 2. IDEMPOTENCY CHECK (Check by Public Key OR Mac Address) + existing_sensor = db.query(models.Misurator).filter( + (models.Misurator.public_key_hex == payload.public_key_hex) | + (models.Misurator.mac_address == payload.mac_address) + ).first() + + if existing_sensor: + # Update metadata if changed (e.g. firmware update) + if existing_sensor.firmware_version != payload.firmware_version: + existing_sensor.firmware_version = payload.firmware_version + db.commit() + + return { + "sensor_id": existing_sensor.id, + "status": "existing", + "message": "Device already registered", + "zone_id": existing_sensor.zone_id + } -@app.post("/zones/", response_model=schemas.Zone, status_code=201) + # 3. NEW DEVICE REGISTRATION + # Ensure a default zone exists + default_zone = db.query(models.Zone).first() + if not default_zone: + default_zone = models.Zone(city="Default Staging Zone") + db.add(default_zone) + db.commit() + db.refresh(default_zone) + + # Create new sensor record + # Note: We initialize lat/lon to 0.0. These should be updated via GPS later. 
+ gps_point = "POINT(0 0)" + + new_sensor = models.Misurator( + active=True, + zone_id=default_zone.id, + latitude=0.0, + longitude=0.0, + location=WKTElement(gps_point, srid=4326), + public_key_hex=payload.public_key_hex, + # IMPORTANT: Persist hardware identifiers + mac_address=payload.mac_address, + firmware_version=payload.firmware_version + ) + + try: + db.add(new_sensor) + db.commit() + db.refresh(new_sensor) + print(f"🎉 New Device Enrolled: ID {new_sensor.id} (MAC: {payload.mac_address})") + + return { + "sensor_id": new_sensor.id, + "status": "created", + "message": "Device successfully enrolled", + "zone_id": default_zone.id + } + except Exception as e: + db.rollback() + print(f"❌ DB Error during registration: {e}") + raise HTTPException(status_code=500, detail="Registration failed") + + +# ========================================== +# STANDARD API ENDPOINTS +# ========================================== + +@app.post("/zones/", response_model=schemas.Zone, status_code=201, tags=["Registration"]) def create_zone(zone: schemas.ZoneCreate, db: Session = Depends(get_db)): existing = db.query(models.Zone).filter(models.Zone.city == zone.city).first() if existing: return existing @@ -90,91 +245,102 @@ def create_zone(zone: schemas.ZoneCreate, db: Session = Depends(get_db)): db.refresh(db_zone) return db_zone -@app.get("/zones/", response_model=List[schemas.Zone]) +@app.get("/zones/", response_model=List[schemas.Zone], tags=["Data Retrieval"]) def get_zones(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): return db.query(models.Zone).offset(skip).limit(limit).all() -@app.post("/misurators/", response_model=schemas.Misurator, status_code=201) +@app.post("/misurators/", response_model=schemas.Misurator, status_code=201, tags=["Registration"]) def create_misurator(misurator: schemas.MisuratorCreate, db: Session = Depends(get_db)): + """Manual registration (Admin panel)""" existing = db.query(models.Misurator).filter(models.Misurator.public_key_hex == 
misurator.public_key_hex).first() if existing: return existing - zone = db.query(models.Zone).filter(models.Zone.id == misurator.zone_id).first() - if not zone: raise HTTPException(404, "Zone not found") + if not zone: raise HTTPException(404, detail="Zone not found") gps_point = f"POINT({misurator.longitude} {misurator.latitude})" db_misurator = models.Misurator( active=misurator.active, zone_id=misurator.zone_id, latitude=misurator.latitude, longitude=misurator.longitude, - location=WKTElement(gps_point, srid=4326), - public_key_hex=misurator.public_key_hex + location=WKTElement(gps_point, srid=4326), public_key_hex=misurator.public_key_hex ) db.add(db_misurator) db.commit() db.refresh(db_misurator) return db_misurator -@app.get("/misurators/", response_model=List[schemas.Misurator]) +@app.get("/misurators/", response_model=List[schemas.Misurator], tags=["Data Retrieval"]) def get_misurators(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)): return db.query(models.Misurator).offset(skip).limit(limit).all() -@app.post("/misurations/", status_code=202) +@app.post("/misurations/", status_code=202, tags=["Ingestion"]) async def create_misuration_async(misuration: schemas.MisurationCreate, db: Session = Depends(get_db)): misurator = db.query(models.Misurator).filter(models.Misurator.id == misuration.misurator_id).first() - if not misurator or not misurator.active: - raise HTTPException(403, "Sensor unauthorized") + if not misurator or not misurator.active: raise HTTPException(403, detail="Sensor unauthorized") - # 1. Message Reconstruction + # 1. Signature message = f"{misuration.value}:{int(misuration.device_timestamp)}" - - # 2. Signature Verification (CPU Bound) loop = asyncio.get_event_loop() is_valid = await loop.run_in_executor(None, verify_device_signature, misurator.public_key_hex, message, misuration.signature_hex) + if not is_valid: raise HTTPException(401, detail="Invalid digital signature") + + # 2. 
Replay + if abs(time.time() - misuration.device_timestamp) > MAX_TIMESTAMP_SKEW: + raise HTTPException(403, detail="Replay Attack Detected") - if not is_valid: - raise HTTPException(401, "Invalid digital signature") - - # 3. REPLAY ATTACK CHECK (The New Guard) 🛡️ - # We check time AFTER verifying signature (so we know timestamp is authentic) - server_now = time.time() - device_ts = misuration.device_timestamp - if abs(server_now - device_ts) > MAX_TIMESTAMP_SKEW: - print(f"⚠️ Replay Blocked! Skew: {server_now - device_ts:.2f}s") - raise HTTPException(403, "Replay Attack Detected: Timestamp out of bounds") - - # 4. Enqueue + # 3. Queue payload = misuration.model_dump() payload['zone_id'] = misurator.zone_id await redis_client.lpush("seismic_events", json.dumps(payload)) - return {"status": "accepted"} -@app.get("/zones/{zone_id}/alerts", response_model=List[schemas.AlertResponse]) +@app.get("/zones/{zone_id}/alerts", response_model=List[schemas.AlertResponse], tags=["Data Retrieval"]) def get_zone_alerts(zone_id: int, limit: int = 10, db: Session = Depends(get_db)): return db.query(models.Alert).filter(models.Alert.zone_id == zone_id).order_by(desc(models.Alert.timestamp)).limit(limit).all() -@app.get("/sensors/{misurator_id}/statistics") +@app.get("/sensors/{misurator_id}/statistics", tags=["Analytics"]) def get_sensor_statistics(misurator_id: int, db: Session = Depends(get_db)): - # This endpoint is CRITICAL for the End-to-End Stress Test verification sensor = db.query(models.Misurator).filter(models.Misurator.id == misurator_id).first() - if not sensor: raise HTTPException(404, "Sensor not found") - + if not sensor: raise HTTPException(404, detail="Sensor not found") stats = db.query( func.count(models.Misuration.id).label("count"), func.avg(models.Misuration.value).label("average"), func.max(models.Misuration.value).label("max_value"), func.min(models.Misuration.value).label("min_value") ).filter(models.Misuration.misurator_id == misurator_id).first() - return { 
- "misurator_id": misurator_id, - "total_readings": stats.count, + "misurator_id": misurator_id, "total_readings": stats.count, "average_value": round(stats.average, 2) if stats.average else 0.0, - "max_recorded": stats.max_value, - "min_recorded": stats.min_value, + "max_recorded": stats.max_value, "min_recorded": stats.min_value, "generated_at": datetime.utcnow().isoformat() } -@app.get("/health") +@app.get("/health", tags=["System"]) async def health_check(db: Session = Depends(get_db)): - return {"status": "ok"} \ No newline at end of file + return {"status": "ok"} + +# ========================================== +# WS ENDPOINTS +# ========================================== + +@app.websocket("/ws/alerts") +async def websocket_endpoint(websocket: WebSocket): + await ws_manager.connect(websocket) + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + ws_manager.disconnect(websocket) + except Exception: + ws_manager.disconnect(websocket) + +@app.post("/simulate-alert", tags=["Testing"]) +async def simulate_alert(zone_id: int, magnitude: float): + alert = { + "type": "CRITICAL_TEST", "zone_id": zone_id, + "magnitude": magnitude, "message": "⚠️ SIMULATION TEST", + "timestamp": datetime.utcnow().isoformat() + } + redis = aioredis.from_url(REDIS_URL, decode_responses=True) + await redis.publish("quake_alerts", json.dumps(alert)) + await redis.close() + return {"status": "Simulated Alert Broadcasted"} \ No newline at end of file diff --git a/backend-data-elaborator/api/src/models.py b/backend-data-elaborator/api/src/models.py index 5fa4cb1..3c46c51 100644 --- a/backend-data-elaborator/api/src/models.py +++ b/backend-data-elaborator/api/src/models.py @@ -1,9 +1,8 @@ """ Database Models Definition -------------------------- -This module defines the SQLAlchemy ORM models for the Earthquake Monitoring System. 
-It integrates PostGIS geometry types for GPS location handling and defines -the schema for Zones, Sensors (Misurators), Measurements, and Alerts. +Defines the SQLAlchemy ORM models. +Updated to support Device Provisioning (MAC Address & Firmware Version). """ from sqlalchemy import Column, Integer, String, Boolean, DateTime, ForeignKey, Float @@ -13,10 +12,6 @@ from src.database import Base class Zone(Base): - """ - Represents a geographical zone (e.g., a city or district). - Acts as the parent entity for sensors and alerts. - """ __tablename__ = "zones" id = Column(Integer, primary_key=True, index=True) @@ -28,8 +23,8 @@ class Misurator(Base): """ - Represents a physical IoT sensor device installed in a specific Zone. - Includes GPS coordinates managed via PostGIS. + IoT Sensor Device. + Includes Security (ECDSA Key) and Hardware Identity (MAC, Firmware). """ __tablename__ = "misurators" @@ -44,9 +39,13 @@ longitude = Column(Float, nullable=True) location = Column(Geometry('POINT', srid=4326), nullable=True) - # --- SECURITY (Ecco il pezzo mancante!) --- - # Stores the ECDSA Public Key used to verify message signatures - public_key_hex = Column(String, nullable=False) + # --- SECURITY & IDENTITY --- + # The public key is the device's primary cryptographic identity + public_key_hex = Column(String, nullable=False, unique=True, index=True) + + # [CRITICAL FIX] Hardware identifiers required for auto-provisioning + mac_address = Column(String(17), nullable=True, unique=True, index=True) + firmware_version = Column(String(20), nullable=True) # Relationships zone = relationship("Zone", back_populates="misurators") @@ -54,26 +53,18 @@ class Misuration(Base): - """ - Represents a single data point (acceleration/vibration) recorded by a Misurator.
- """ __tablename__ = "misurations" id = Column(Integer, primary_key=True, index=True) created_at = Column(DateTime(timezone=True), server_default=func.now(), nullable=False, index=True) value = Column(Integer, nullable=False) - # Foreign Key misurator_id = Column(Integer, ForeignKey("misurators.id"), nullable=False) - # Relationships misurator = relationship("Misurator", back_populates="misurations") class Alert(Base): - """ - Represents an aggregated/confirmed seismic event for a specific zone. - """ __tablename__ = "alerts" id = Column(Integer, primary_key=True, index=True) @@ -82,5 +73,4 @@ class Alert(Base): severity = Column(Float, nullable=False) message = Column(String(255), nullable=True) - # Relationships zone = relationship("Zone") \ No newline at end of file