Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@ REDIS_URL=redis://redis:6379
# Frontend Port (Default 80)
FRONTEND_PORT=80

# MQTT Broker Port — exposed on LAN for hardware devices to publish to (Default 1883)
# Point RTL_433, Meshtastic, AIS-catcher, and other IoT devices at this Pi's LAN IP on this port.
MQTT_PORT=1883

# MQTT source credentials — only required when auth_enabled=true in sources.yml.
# Variable name format: MQTT_{UPPER_SNAKE_SOURCE_NAME}_USERNAME / _PASSWORD
# Example for a source named "Remote Broker":
# MQTT_REMOTE_BROKER_USERNAME=
# MQTT_REMOTE_BROKER_PASSWORD=

# Logging Level (DEBUG, INFO, WARNING, ERROR)
LOG_LEVEL=INFO

Expand Down Expand Up @@ -145,12 +155,7 @@ VITE_OBSERVATION_RANGE_KM=50
# Keep false for normal operation. True enables cleaner map snapshots but costs render performance.
VITE_PRESERVE_DRAWING_BUFFER=false

# ── 6. TINYGS (OPTIONAL) ─────────────────────────────────────────────────────
# TinyGS is sunset by default due to upstream API instability/404 responses.
# Set true to opt in and enable the TinyGS poller again.
TINYGS_ENABLED=false

# ── 7. TRIMET GTFS-RT (OPTIONAL) ─────────────────────────────────────────────
# ── 6. TRIMET GTFS-RT (OPTIONAL) ─────────────────────────────────────────────
# MAX light rail, WES commuter rail, Portland Streetcar real-time positions.
# Free AppID at: https://developer.trimet.org/
# route_types: 0=Tram/Streetcar, 1=Light Rail/MAX, 2=Rail/WES (comma-separated)
Expand Down
13 changes: 13 additions & 0 deletions backend/config_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,18 @@ class PollerSourceEntry(BaseModel):
source: Literal["config", "user"] = "config"


class MqttSourceEntry(BaseModel):
name: str
normalizer: Literal["rtl_433", "meshtastic", "ais"]
broker: str = "mosquitto"
port: int = 1883
topic: str
qos: int = 0
auth_enabled: bool = False
enabled: bool = True
source: Literal["config", "user"] = "config"


class AlertZonesConfig(BaseModel):
nws_zones: list[str] = []
source: Literal["config", "user"] = "config"
Expand All @@ -47,6 +59,7 @@ class SourcesConfig(BaseModel):
news_feeds: list[NewsFeedEntry] = []
poller_sources: list[PollerSourceEntry] = []
alert_zones: AlertZonesConfig = AlertZonesConfig()
mqtt_sources: list[MqttSourceEntry] = []


def load_sources_config() -> SourcesConfig:
Expand Down
33 changes: 33 additions & 0 deletions backend/config_writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,39 @@ async def update_entry(section: str, url: str, updates: dict) -> None:
await _write_raw(data)


async def add_mqtt_entry(entry: dict) -> None:
"""Append an MQTT source entry to sources.yml (keyed by name, not url)."""
async with _write_lock:
data = await _read_raw()
data.setdefault("mqtt_sources", [])
if data["mqtt_sources"] is None:
data["mqtt_sources"] = []
data["mqtt_sources"].append(entry)
await _write_raw(data)


async def remove_mqtt_entry(name: str) -> None:
"""Remove the MQTT source entry matching name from sources.yml."""
async with _write_lock:
data = await _read_raw()
entries = data.get("mqtt_sources") or []
data["mqtt_sources"] = [e for e in entries if e.get("name") != name]
await _write_raw(data)


async def update_mqtt_entry(name: str, updates: dict) -> None:
"""Apply field updates to the MQTT source entry matching name in sources.yml."""
async with _write_lock:
data = await _read_raw()
entries = data.get("mqtt_sources") or []
for entry in entries:
if entry.get("name") == name:
entry.update(updates)
break
data["mqtt_sources"] = entries
await _write_raw(data)


async def add_alert_zone(zone_code: str) -> None:
"""Append a zone code to alert_zones.nws_zones."""
async with _write_lock:
Expand Down
19 changes: 18 additions & 1 deletion backend/db/models.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from datetime import datetime
from typing import Optional
from sqlalchemy import String, Float, DateTime, Text, JSON, ForeignKey, Index, Boolean, func, UniqueConstraint
from sqlalchemy import String, Float, DateTime, Text, JSON, ForeignKey, Index, Boolean, Integer, func, UniqueConstraint
from sqlalchemy.orm import Mapped, mapped_column, relationship
from geoalchemy2 import Geometry

Expand Down Expand Up @@ -185,6 +185,23 @@ class AlertFeedConfig(Base):
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())


class MqttSource(Base):
__tablename__ = "mqtt_sources"

id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(128), unique=True)
normalizer: Mapped[str] = mapped_column(String(32), index=True)
broker: Mapped[str] = mapped_column(String(256), default="mosquitto")
port: Mapped[int] = mapped_column(Integer, default=1883)
topic: Mapped[str] = mapped_column(String(512))
qos: Mapped[int] = mapped_column(Integer, default=0)
auth_enabled: Mapped[bool] = mapped_column(Boolean, default=False)
enabled: Mapped[bool] = mapped_column(Boolean, default=True)
source: Mapped[str] = mapped_column(String(16), default="config")
created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())
updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())


class AlertRule(Base):
__tablename__ = "alert_rules"

Expand Down
2 changes: 1 addition & 1 deletion backend/routers/admin.py
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ def delta(key: str) -> float:
"news_article": "news",
"aprs": "aprs",
"traffic": "traffic",
"tinygs_station": "tinygs",
"rf_sensor": "mqtt",
"fire_incident": "fire",
"stream_gauge": "streamgauge",
}
Expand Down
98 changes: 96 additions & 2 deletions backend/routers/sources.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession

from config_writer import add_entry, add_alert_zone, remove_entry, remove_alert_zone, update_entry
from config_writer import add_entry, add_alert_zone, remove_entry, remove_alert_zone, update_entry, add_mqtt_entry, remove_mqtt_entry, update_mqtt_entry
from deps import get_db
from db.models import AlertZoneConfig, NewsFeed, PollerSource
from db.models import AlertZoneConfig, MqttSource, NewsFeed, PollerSource

router = APIRouter(prefix="/sources", tags=["sources"])

Expand Down Expand Up @@ -235,3 +235,97 @@ async def delete_alert_zone(zone_id: int, db: AsyncSession = Depends(get_db)):
await db.delete(az)
await db.commit()
await remove_alert_zone(code)


# ---------------------------------------------------------------------------
# MQTT sources
# ---------------------------------------------------------------------------

class MqttSourceCreate(BaseModel):
name: str
normalizer: Literal["rtl_433", "meshtastic", "ais"]
broker: str = "mosquitto"
port: int = 1883
topic: str
qos: int = 0
auth_enabled: bool = False
enabled: bool = True


class MqttSourceResponse(BaseModel):
id: int
name: str
normalizer: str
broker: str
port: int
topic: str
qos: int
auth_enabled: bool
enabled: bool
source: str


def _ms_response(ms: MqttSource) -> MqttSourceResponse:
return MqttSourceResponse(
id=ms.id, name=ms.name, normalizer=ms.normalizer,
broker=ms.broker, port=ms.port, topic=ms.topic,
qos=ms.qos, auth_enabled=ms.auth_enabled,
enabled=ms.enabled, source=ms.source,
)


@router.get("/mqtt", response_model=list[MqttSourceResponse])
async def list_mqtt_sources(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(MqttSource).order_by(MqttSource.normalizer, MqttSource.id))
return [_ms_response(ms) for ms in result.scalars().all()]


@router.post("/mqtt", response_model=MqttSourceResponse, status_code=201)
async def create_mqtt_source(body: MqttSourceCreate, db: AsyncSession = Depends(get_db)):
ms = MqttSource(
name=body.name, normalizer=body.normalizer, broker=body.broker,
port=body.port, topic=body.topic, qos=body.qos,
auth_enabled=body.auth_enabled, enabled=body.enabled,
source="user",
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
)
db.add(ms)
try:
await db.commit()
except IntegrityError:
await db.rollback()
raise HTTPException(409, f"MQTT source {body.name!r} already exists")
await db.refresh(ms)
await add_mqtt_entry({
"name": ms.name, "normalizer": ms.normalizer, "broker": ms.broker,
"port": ms.port, "topic": ms.topic, "qos": ms.qos,
"auth_enabled": ms.auth_enabled, "enabled": ms.enabled, "source": "user",
})
return _ms_response(ms)


@router.patch("/mqtt/{source_id}/toggle", response_model=MqttSourceResponse)
async def toggle_mqtt_source(source_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(MqttSource).where(MqttSource.id == source_id))
ms = result.scalar_one_or_none()
if not ms:
raise HTTPException(404, "MQTT source not found")
ms.enabled = not ms.enabled
ms.updated_at = datetime.now(timezone.utc)
await db.commit()
await db.refresh(ms)
await update_mqtt_entry(ms.name, {"enabled": ms.enabled})
return _ms_response(ms)


@router.delete("/mqtt/{source_id}", status_code=204)
async def delete_mqtt_source(source_id: int, db: AsyncSession = Depends(get_db)):
result = await db.execute(select(MqttSource).where(MqttSource.id == source_id))
ms = result.scalar_one_or_none()
if not ms:
raise HTTPException(404, "MQTT source not found")
name = ms.name
await db.delete(ms)
await db.commit()
await remove_mqtt_entry(name)
51 changes: 46 additions & 5 deletions config/sources.example.yml
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,52 @@ poller_sources:
source: config

# ---------------------------------------------------------------------------
# TinyGS — LoRa satellite ground station
# The poller connects to mqtt.tinygs.com:8883 using your TinyGS MQTT credentials.
# Set TINYGS_MQTT_USERNAME and TINYGS_MQTT_PASSWORD in .env (from the Telegram bot).
# No entry needed here — credentials are environment variables, not URL-based sources.
# ---------------------------------------------------------------------------
# MQTT sources — IoT and hardware devices that publish to the local broker.
# The Mosquitto broker starts automatically as part of the Vertex stack.
# Hardware devices should be configured to publish to this Pi's LAN IP on
# port 1883 (or the MQTT_PORT you set in .env).
#
# broker: "mosquitto" — internal Docker service name (use for all local hardware)
# "192.168.1.x" — LAN device running its own broker (e.g., Frigate on another Pi)
#
# normalizer: which parser handles messages on this topic
# rtl_433 — RTL_433 SDR RF sensor decoder (rtl_433 -F "mqtt://localhost:1883,events=rtl_433/events")
# meshtastic — Meshtastic node JSON MQTT uplink (configure node MQTT server to Pi LAN IP)
# ais — AIS-catcher MQTT output (alternative to WebSocket poller_source)
#
# auth_enabled: false for local Mosquitto (no credentials needed)
# true for external brokers — set MQTT_{UPPER_SNAKE_NAME}_USERNAME/PASSWORD in .env
# ---------------------------------------------------------------------------
mqtt_sources:
- name: "RTL_433 Sensors"
normalizer: rtl_433
broker: mosquitto
port: 1883
topic: "rtl_433/#"
qos: 0
auth_enabled: false
enabled: false
source: config

- name: "Meshtastic"
normalizer: meshtastic
broker: mosquitto
port: 1883
topic: "msh/#"
qos: 0
auth_enabled: false
enabled: false
source: config

- name: "AIS-Catcher MQTT"
normalizer: ais
broker: mosquitto
port: 1883
topic: "ais/#"
qos: 0
auth_enabled: false
enabled: false
source: config

# ---------------------------------------------------------------------------
# Alert zones — NWS zone codes for weather alerts.
Expand Down
22 changes: 22 additions & 0 deletions db/init/10_mqtt_sources.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
-- MQTT source configuration table
-- Runtime DB cache of mqtt_sources entries from config/sources.yml.
-- Seeded at startup by the config_sync process; kept in sync by the config watcher.
-- Credentials are never stored here — auth uses env vars keyed by source name.

CREATE TABLE IF NOT EXISTS mqtt_sources (
id SERIAL PRIMARY KEY,
name VARCHAR(128) NOT NULL UNIQUE,
normalizer VARCHAR(32) NOT NULL,
broker VARCHAR(256) NOT NULL DEFAULT 'mosquitto',
port INTEGER NOT NULL DEFAULT 1883,
topic VARCHAR(512) NOT NULL,
qos INTEGER NOT NULL DEFAULT 0,
auth_enabled BOOLEAN NOT NULL DEFAULT FALSE,
enabled BOOLEAN NOT NULL DEFAULT TRUE,
source VARCHAR(16) NOT NULL DEFAULT 'config',
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE INDEX IF NOT EXISTS ix_mqtt_sources_enabled ON mqtt_sources (enabled);
CREATE INDEX IF NOT EXISTS ix_mqtt_sources_normalizer ON mqtt_sources (normalizer);
28 changes: 28 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,31 @@ services:
networks:
- internal

# ── MQTT Broker ──────────────────────────────────
mosquitto:
image: eclipse-mosquitto:2
restart: unless-stopped
deploy:
resources:
limits:
memory: 64m
cpus: '0.25'
reservations:
memory: 16m
cpus: '0.05'
volumes:
- ./infra/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro
- mosquitto_data:/mosquitto/data
ports:
- "${MQTT_PORT:-1883}:1883"
healthcheck:
test: ["CMD-SHELL", "mosquitto_pub -h 127.0.0.1 -p 1883 -t 'health/check' -m 'ok' -q 0"]
interval: 10s
timeout: 5s
retries: 5
networks:
- internal

# ── Application ──────────────────────────────────
backend:
build:
Expand Down Expand Up @@ -126,6 +151,8 @@ services:
condition: service_healthy
redis:
condition: service_healthy
mosquitto:
condition: service_healthy
networks:
- internal
volumes:
Expand Down Expand Up @@ -225,5 +252,6 @@ networks:
volumes:
db_data:
redis_data:
mosquitto_data: # MQTT broker persistence (retained messages, subscriptions)
p25_audio: # P25 call recordings; shared with transcription service
whisper_models: # Whisper model weights cache; avoids re-downloading on container restart
2 changes: 1 addition & 1 deletion frontend/src/admin/metrics/SignalQualityChart.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const ENTITY_COLORS: Record<string, string> = {
mesh_node: '#FF8F00', // cat-mesh
aprs_position: '#B388FF', // cat-aprs
p25: '#FF8F00', // cat-mesh (P25 radio)
tinygs_packet: '#4FC3F7', // cat-stream

}

function barColor(type: string) {
Expand Down
Loading
Loading