From 6be98961d0f67cae278864527e27cc0ccc169eae Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 04:05:19 +0000 Subject: [PATCH 1/3] Add local MQTT broker and IoT hardware ingress pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Mosquitto as a core container (always-on, no profile flag) and a new mqtt_subscriber poller that dispatches incoming messages to per-normalizer handlers. Replaces the unstable TinyGS REST-only integration with a proper local-node MQTT path while keeping the REST poller as an opt-in fallback. Infrastructure: - docker-compose.yml: mosquitto service (eclipse-mosquitto:2), mosquitto_data volume, LAN port 1883 (MQTT_PORT), poller now depends on mosquitto health - infra/mosquitto/mosquitto.conf: anonymous, plaintext, local-network-only Data pipeline (poller): - pollers/mqtt_subscriber.py: groups sources by broker, one aiomqtt connection per broker, MQTT wildcard topic matching, reconnects on failure - normalizers/tinygs_mqtt.py: tinygs_satellite entities + satellite_contact events from local TinyGS node packets (SNR, RSSI, satPos, decoded payload) - normalizers/rtl_433.py: rf_sensor entities at home coords from RTL_433 SDR decoded frames (temperature, humidity, wind, power, etc.) - normalizers/meshtastic_mqtt.py: mesh_node entities from Meshtastic JSON MQTT uplink; text messages fenced from MeshCore by source_url prefix "mqtt:" - normalizers/ais_mqtt.py: vessel entities via AIS-catcher MQTT output, delegates to existing normalize_ais_catcher, alternative to WebSocket poller - db.py: add write_mesh_message() for Meshtastic MQTT chat persistence Config layer (both backend and poller): - config_loader.py (x2): MqttSourceEntry model, added to SourcesConfig - config_writer.py: add/remove/update_mqtt_entry keyed by name (not url) - config_sync.py: _sync_mqtt_sources with full config/user diff semantics - db/init/10_mqtt_sources.sql: mqtt_sources table (no credential columns) API: - routers/sources.py: GET/POST/PATCH/DELETE /sources/mqtt endpoints - routers/admin.py: tinygs_satellite and rf_sensor added to _TYPE_TO_POLLER - db/models.py: MqttSource ORM model Auth: boolean auth_enabled gate per source; credentials from env vars MQTT_{UPPER_SNAKE_NAME}_USERNAME/PASSWORD — never stored in DB or YAML Frontend: - storeTypes.ts: tinygs_satellite and rf_sensor added to EntityTypeFilter - store.ts: defaults (true) and TTLs (3600s satellite, 900s sensor) - MapOverlay.tsx: entityFilterRef initializer updated to include new types Config docs: - sources.example.yml: mqtt_sources section with all four source types - .env.example: MQTT_PORT, auth credential naming convention, updated TINYGS_ENABLED comment pointing users toward local MQTT path --- .env.example | 16 +- backend/config_loader.py | 13 ++ backend/config_writer.py | 33 ++++ backend/db/models.py | 19 ++- backend/routers/admin.py | 2 + backend/routers/sources.py | 98 ++++++++++- config/sources.example.yml | 66 +++++++- db/init/10_mqtt_sources.sql | 22 +++ docker-compose.yml | 28 ++++ frontend/src/components/MapOverlay.tsx | 2 +- frontend/src/store.ts | 8 +- frontend/src/storeTypes.ts | 2 + infra/mosquitto/mosquitto.conf | 17 ++ poller/config_loader.py | 13 ++ poller/config_sync.py | 54 +++++- poller/db.py | 37 +++++ poller/main.py | 5 +- poller/normalizers/ais_mqtt.py | 41 +++++ poller/normalizers/meshtastic_mqtt.py | 217 +++++++++++++++++++++++++ poller/normalizers/rtl_433.py | 127 +++++++++++++++ poller/normalizers/tinygs_mqtt.py | 112 +++++++++++++ poller/pollers/mqtt_subscriber.py | 175 ++++++++++++++++++++ 22 files changed, 1088 insertions(+), 19 deletions(-) create mode 100644 db/init/10_mqtt_sources.sql create mode 100644 infra/mosquitto/mosquitto.conf create mode 100644 poller/normalizers/ais_mqtt.py create mode 100644 poller/normalizers/meshtastic_mqtt.py create mode 100644 poller/normalizers/rtl_433.py create mode 100644 poller/normalizers/tinygs_mqtt.py create mode 100644 poller/pollers/mqtt_subscriber.py diff --git a/.env.example b/.env.example index 99f180d..89970e0 100644 --- a/.env.example +++ b/.env.example @@ -18,6 +18,16 @@ REDIS_URL=redis://redis:6379 # Frontend Port (Default 80) FRONTEND_PORT=80 +# MQTT Broker Port — exposed on LAN for hardware devices to publish to (Default 1883) +# Point TinyGS nodes, RTL_433, Meshtastic, and AIS-catcher at this Pi's LAN IP on this port. +MQTT_PORT=1883 + +# MQTT source credentials — only required when auth_enabled=true in sources.yml. +# Variable name format: MQTT_{UPPER_SNAKE_SOURCE_NAME}_USERNAME / _PASSWORD +# Example for a source named "Remote Broker": +# MQTT_REMOTE_BROKER_USERNAME= +# MQTT_REMOTE_BROKER_PASSWORD= + # Logging Level (DEBUG, INFO, WARNING, ERROR) LOG_LEVEL=INFO @@ -146,8 +156,10 @@ VITE_OBSERVATION_RANGE_KM=50 VITE_PRESERVE_DRAWING_BUFFER=false # ── 6. TINYGS (OPTIONAL) ───────────────────────────────────────────────────── -# TinyGS is sunset by default due to upstream API instability/404 responses. -# Set true to opt in and enable the TinyGS poller again. +# Global TinyGS REST poller — polls the public TinyGS API for all community +# ground station locations. Disabled by default due to upstream API instability. +# For richer local data (packet SNR/RSSI, satellite positions), configure a +# tinygs mqtt_source in sources.yml pointing at your local TinyGS node instead. TINYGS_ENABLED=false # ── 7. TRIMET GTFS-RT (OPTIONAL) ───────────────────────────────────────────── diff --git a/backend/config_loader.py b/backend/config_loader.py index 7b69ee1..2f73241 100644 --- a/backend/config_loader.py +++ b/backend/config_loader.py @@ -32,6 +32,18 @@ class PollerSourceEntry(BaseModel): source: Literal["config", "user"] = "config" +class MqttSourceEntry(BaseModel): + name: str + normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + broker: str = "mosquitto" + port: int = 1883 + topic: str + qos: int = 0 + auth_enabled: bool = False + enabled: bool = True + source: Literal["config", "user"] = "config" + + class AlertZonesConfig(BaseModel): nws_zones: list[str] = [] source: Literal["config", "user"] = "config" @@ -47,6 +59,7 @@ class SourcesConfig(BaseModel): news_feeds: list[NewsFeedEntry] = [] poller_sources: list[PollerSourceEntry] = [] alert_zones: AlertZonesConfig = AlertZonesConfig() + mqtt_sources: list[MqttSourceEntry] = [] def load_sources_config() -> SourcesConfig: diff --git a/backend/config_writer.py b/backend/config_writer.py index 88bd9f3..4792ac3 100644 --- a/backend/config_writer.py +++ b/backend/config_writer.py @@ -66,6 +66,39 @@ async def update_entry(section: str, url: str, updates: dict) -> None: await _write_raw(data) +async def add_mqtt_entry(entry: dict) -> None: + """Append an MQTT source entry to sources.yml (keyed by name, not url).""" + async with _write_lock: + data = await _read_raw() + data.setdefault("mqtt_sources", []) + if data["mqtt_sources"] is None: + data["mqtt_sources"] = [] + data["mqtt_sources"].append(entry) + await _write_raw(data) + + +async def remove_mqtt_entry(name: str) -> None: + """Remove the MQTT source entry matching name from sources.yml.""" + async with _write_lock: + data = await _read_raw() + entries = data.get("mqtt_sources") or [] + data["mqtt_sources"] = [e for e in entries if e.get("name") != name] + await _write_raw(data) + + +async def update_mqtt_entry(name: str, updates: dict) -> None: + """Apply field updates to the MQTT source entry matching name in sources.yml.""" + async with _write_lock: + data = await _read_raw() + entries = data.get("mqtt_sources") or [] + for entry in entries: + if entry.get("name") == name: + entry.update(updates) + break + data["mqtt_sources"] = entries + await _write_raw(data) + + async def add_alert_zone(zone_code: str) -> None: """Append a zone code to alert_zones.nws_zones.""" async with _write_lock: diff --git a/backend/db/models.py b/backend/db/models.py index e08ba1f..f7f4039 100644 --- a/backend/db/models.py +++ b/backend/db/models.py @@ -1,6 +1,6 @@ from datetime import datetime from typing import Optional -from sqlalchemy import String, Float, DateTime, Text, JSON, ForeignKey, Index, Boolean, func, UniqueConstraint +from sqlalchemy import String, Float, DateTime, Text, JSON, ForeignKey, Index, Boolean, Integer, func, UniqueConstraint from sqlalchemy.orm import Mapped, mapped_column, relationship from geoalchemy2 import Geometry @@ -185,6 +185,23 @@ class AlertFeedConfig(Base): updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +class MqttSource(Base): + __tablename__ = "mqtt_sources" + + id: Mapped[int] = mapped_column(primary_key=True, autoincrement=True) + name: Mapped[str] = mapped_column(String(128), unique=True) + normalizer: Mapped[str] = mapped_column(String(32), index=True) + broker: Mapped[str] = mapped_column(String(256), default="mosquitto") + port: Mapped[int] = mapped_column(Integer, default=1883) + topic: Mapped[str] = mapped_column(String(512)) + qos: Mapped[int] = mapped_column(Integer, default=0) + auth_enabled: Mapped[bool] = mapped_column(Boolean, default=False) + enabled: Mapped[bool] = mapped_column(Boolean, default=True) + source: Mapped[str] = mapped_column(String(16), default="config") + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now()) + updated_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + + class AlertRule(Base): __tablename__ = "alert_rules" diff --git a/backend/routers/admin.py b/backend/routers/admin.py index aad1f4f..0631be6 100644 --- a/backend/routers/admin.py +++ b/backend/routers/admin.py @@ -187,6 +187,8 @@ def delta(key: str) -> float: "aprs": "aprs", "traffic": "traffic", "tinygs_station": "tinygs", + "tinygs_satellite": "mqtt", + "rf_sensor": "mqtt", "fire_incident": "fire", "stream_gauge": "streamgauge", } diff --git a/backend/routers/sources.py b/backend/routers/sources.py index 1c483fc..28b859b 100644 --- a/backend/routers/sources.py +++ b/backend/routers/sources.py @@ -15,9 +15,9 @@ from sqlalchemy.exc import IntegrityError from sqlalchemy.ext.asyncio import AsyncSession -from config_writer import add_entry, add_alert_zone, remove_entry, remove_alert_zone, update_entry +from config_writer import add_entry, add_alert_zone, remove_entry, remove_alert_zone, update_entry, add_mqtt_entry, remove_mqtt_entry, update_mqtt_entry from deps import get_db -from db.models import AlertZoneConfig, NewsFeed, PollerSource +from db.models import AlertZoneConfig, MqttSource, NewsFeed, PollerSource router = APIRouter(prefix="/sources", tags=["sources"]) @@ -235,3 +235,97 @@ async def delete_alert_zone(zone_id: int, db: AsyncSession = Depends(get_db)): await db.delete(az) await db.commit() await remove_alert_zone(code) + + +# --------------------------------------------------------------------------- +# MQTT sources +# --------------------------------------------------------------------------- + +class MqttSourceCreate(BaseModel): + name: str + normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + broker: str = "mosquitto" + port: int = 1883 + topic: str + qos: int = 0 + auth_enabled: bool = False + enabled: bool = True + + +class MqttSourceResponse(BaseModel): + id: int + name: str + normalizer: str + broker: str + port: int + topic: str + qos: int + auth_enabled: bool + enabled: bool + source: str + + +def _ms_response(ms: MqttSource) -> MqttSourceResponse: + return MqttSourceResponse( + id=ms.id, name=ms.name, normalizer=ms.normalizer, + broker=ms.broker, port=ms.port, topic=ms.topic, + qos=ms.qos, auth_enabled=ms.auth_enabled, + enabled=ms.enabled, source=ms.source, + ) + + +@router.get("/mqtt", response_model=list[MqttSourceResponse]) +async def list_mqtt_sources(db: AsyncSession = Depends(get_db)): + result = await db.execute(select(MqttSource).order_by(MqttSource.normalizer, MqttSource.id)) + return [_ms_response(ms) for ms in result.scalars().all()] + + +@router.post("/mqtt", response_model=MqttSourceResponse, status_code=201) +async def create_mqtt_source(body: MqttSourceCreate, db: AsyncSession = Depends(get_db)): + ms = MqttSource( + name=body.name, normalizer=body.normalizer, broker=body.broker, + port=body.port, topic=body.topic, qos=body.qos, + auth_enabled=body.auth_enabled, enabled=body.enabled, + source="user", + created_at=datetime.now(timezone.utc), + updated_at=datetime.now(timezone.utc), + ) + db.add(ms) + try: + await db.commit() + except IntegrityError: + await db.rollback() + raise HTTPException(409, f"MQTT source {body.name!r} already exists") + await db.refresh(ms) + await add_mqtt_entry({ + "name": ms.name, "normalizer": ms.normalizer, "broker": ms.broker, + "port": ms.port, "topic": ms.topic, "qos": ms.qos, + "auth_enabled": ms.auth_enabled, "enabled": ms.enabled, "source": "user", + }) + return _ms_response(ms) + + +@router.patch("/mqtt/{source_id}/toggle", response_model=MqttSourceResponse) +async def toggle_mqtt_source(source_id: int, db: AsyncSession = Depends(get_db)): + result = await db.execute(select(MqttSource).where(MqttSource.id == source_id)) + ms = result.scalar_one_or_none() + if not ms: + raise HTTPException(404, "MQTT source not found") + ms.enabled = not ms.enabled + ms.updated_at = datetime.now(timezone.utc) + await db.commit() + await db.refresh(ms) + await update_mqtt_entry(ms.name, {"enabled": ms.enabled}) + return _ms_response(ms) + + +@router.delete("/mqtt/{source_id}", status_code=204) +async def delete_mqtt_source(source_id: int, db: AsyncSession = Depends(get_db)): + result = await db.execute(select(MqttSource).where(MqttSource.id == source_id)) + ms = result.scalar_one_or_none() + if not ms: + raise HTTPException(404, "MQTT source not found") + name = ms.name + await db.delete(ms) + await db.commit() + await remove_mqtt_entry(name) diff --git a/config/sources.example.yml b/config/sources.example.yml index fc520b1..216c114 100644 --- a/config/sources.example.yml +++ b/config/sources.example.yml @@ -116,11 +116,67 @@ poller_sources: source: config # --------------------------------------------------------------------------- -# TinyGS — LoRa satellite ground station -# The poller connects to mqtt.tinygs.com:8883 using your TinyGS MQTT credentials. -# Set TINYGS_MQTT_USERNAME and TINYGS_MQTT_PASSWORD in .env (from the Telegram bot). -# No entry needed here — credentials are environment variables, not URL-based sources. -# --------------------------------------------------------------------------- +# MQTT sources — IoT and hardware devices that publish to the local broker. +# The Mosquitto broker starts automatically as part of the Vertex stack. +# Hardware devices should be configured to publish to this Pi's LAN IP on +# port 1883 (or the MQTT_PORT you set in .env). +# +# broker: "mosquitto" — internal Docker service name (use for all local hardware) +# "192.168.1.x" — LAN device running its own broker (e.g., Frigate on another Pi) +# +# normalizer: which parser handles messages on this topic +# tinygs — TinyGS LoRa ground station (configure node MQTT server to Pi LAN IP) +# rtl_433 — RTL_433 SDR RF sensor decoder (rtl_433 -F "mqtt://localhost:1883,events=rtl_433/events") +# meshtastic — Meshtastic node JSON MQTT uplink (configure node MQTT server to Pi LAN IP) +# ais — AIS-catcher MQTT output (alternative to WebSocket poller_source) +# +# auth_enabled: false for local Mosquitto (no credentials needed) +# true for external brokers — set MQTT_{UPPER_SNAKE_NAME}_USERNAME/PASSWORD in .env +# +# TinyGS note: configure your node's MQTT settings to point at this Pi's LAN IP. +# The global TinyGS REST poller (TINYGS_ENABLED) can run alongside or be disabled +# once your local node is providing data. +# --------------------------------------------------------------------------- +mqtt_sources: + - name: "TinyGS Node" + normalizer: tinygs + broker: mosquitto + port: 1883 + topic: "tinygs/#" + qos: 0 + auth_enabled: false + enabled: false + source: config + + - name: "RTL_433 Sensors" + normalizer: rtl_433 + broker: mosquitto + port: 1883 + topic: "rtl_433/#" + qos: 0 + auth_enabled: false + enabled: false + source: config + + - name: "Meshtastic" + normalizer: meshtastic + broker: mosquitto + port: 1883 + topic: "msh/#" + qos: 0 + auth_enabled: false + enabled: false + source: config + + - name: "AIS-Catcher MQTT" + normalizer: ais + broker: mosquitto + port: 1883 + topic: "ais/#" + qos: 0 + auth_enabled: false + enabled: false + source: config # --------------------------------------------------------------------------- # Alert zones — NWS zone codes for weather alerts. diff --git a/db/init/10_mqtt_sources.sql b/db/init/10_mqtt_sources.sql new file mode 100644 index 0000000..c3d553b --- /dev/null +++ b/db/init/10_mqtt_sources.sql @@ -0,0 +1,22 @@ +-- MQTT source configuration table +-- Runtime DB cache of mqtt_sources entries from config/sources.yml. +-- Seeded at startup by the config_sync process; kept in sync by the config watcher. +-- Credentials are never stored here — auth uses env vars keyed by source name. + +CREATE TABLE IF NOT EXISTS mqtt_sources ( + id SERIAL PRIMARY KEY, + name VARCHAR(128) NOT NULL UNIQUE, + normalizer VARCHAR(32) NOT NULL, + broker VARCHAR(256) NOT NULL DEFAULT 'mosquitto', + port INTEGER NOT NULL DEFAULT 1883, + topic VARCHAR(512) NOT NULL, + qos INTEGER NOT NULL DEFAULT 0, + auth_enabled BOOLEAN NOT NULL DEFAULT FALSE, + enabled BOOLEAN NOT NULL DEFAULT TRUE, + source VARCHAR(16) NOT NULL DEFAULT 'config', + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE INDEX IF NOT EXISTS ix_mqtt_sources_enabled ON mqtt_sources (enabled); +CREATE INDEX IF NOT EXISTS ix_mqtt_sources_normalizer ON mqtt_sources (normalizer); diff --git a/docker-compose.yml b/docker-compose.yml index 02a97f0..00104ce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -50,6 +50,31 @@ services: networks: - internal + # ── MQTT Broker ────────────────────────────────── + mosquitto: + image: eclipse-mosquitto:2 + restart: unless-stopped + deploy: + resources: + limits: + memory: 64m + cpus: '0.25' + reservations: + memory: 16m + cpus: '0.05' + volumes: + - ./infra/mosquitto/mosquitto.conf:/mosquitto/config/mosquitto.conf:ro + - mosquitto_data:/mosquitto/data + ports: + - "${MQTT_PORT:-1883}:1883" + healthcheck: + test: ["CMD-SHELL", "mosquitto_pub -h 127.0.0.1 -p 1883 -t 'health/check' -m 'ok' -q 0"] + interval: 10s + timeout: 5s + retries: 5 + networks: + - internal + # ── Application ────────────────────────────────── backend: build: @@ -126,6 +151,8 @@ services: condition: service_healthy redis: condition: service_healthy + mosquitto: + condition: service_healthy networks: - internal volumes: @@ -225,5 +252,6 @@ networks: volumes: db_data: redis_data: + mosquitto_data: # MQTT broker persistence (retained messages, subscriptions) p25_audio: # P25 call recordings; shared with transcription service whisper_models: # Whisper model weights cache; avoids re-downloading on container restart diff --git a/frontend/src/components/MapOverlay.tsx b/frontend/src/components/MapOverlay.tsx index d5476a5..f4afa85 100644 --- a/frontend/src/components/MapOverlay.tsx +++ b/frontend/src/components/MapOverlay.tsx @@ -107,7 +107,7 @@ export function MapOverlay({ map }: Props) { const camerasRef = useRef([]) const selectedCamRef = useRef(null) const activeTabRef = useRef('safety') - const entityFilterRef = useRef({ aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, train: true }) + const entityFilterRef = useRef({ aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, tinygs_satellite: true, rf_sensor: true, train: true }) const searchQueryRef = useRef('') const altRangeRef = useRef([0, 60_000]) const speedRangeRef = useRef([0, 600]) diff --git a/frontend/src/store.ts b/frontend/src/store.ts index 4de0c72..0895613 100644 --- a/frontend/src/store.ts +++ b/frontend/src/store.ts @@ -368,7 +368,7 @@ export const useCivicStore = create()( mobileNavOpen: false, settingsOpen: false, helpOpen: false, - entityFilter: { aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, train: true }, + entityFilter: { aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, tinygs_satellite: true, rf_sensor: true, train: true }, entitySearchQuery: '', entityAltRange: ALT_RANGE_DEFAULT, entitySpeedRange: SPD_RANGE_DEFAULT, @@ -457,8 +457,10 @@ export const useCivicStore = create()( vessel: 600_000, // 10 min — AIS updates are infrequent mesh_node: 604_800_000, // 7 days — mesh nodes are semi-permanent infrastructure satellite: 1_800_000, // 30 min — matches poller TTL - tinygs_station: 600_000, // 10 min — station ping is every ~60 s - stream_gauge: 600_000, // 10 min — gauges are polled every 5 min + tinygs_station: 600_000, // 10 min — station ping is every ~60 s + tinygs_satellite: 3_600_000, // 60 min — matches poller TTL, satellites pass infrequently + rf_sensor: 900_000, // 15 min — matches poller TTL, sensors broadcast every few min + stream_gauge: 600_000, // 10 min — gauges are polled every 5 min tak_client: 300_000, // 5 min — TAK SA ping is every 30 s–2 min train: 600_000, // 10 min — Amtrak polls every 60 s } diff --git a/frontend/src/storeTypes.ts b/frontend/src/storeTypes.ts index e0e1768..20439dc 100644 --- a/frontend/src/storeTypes.ts +++ b/frontend/src/storeTypes.ts @@ -260,6 +260,8 @@ export type EntityTypeFilter = { fire_incident: boolean satellite: boolean tinygs_station: boolean + tinygs_satellite: boolean + rf_sensor: boolean train: boolean } diff --git a/infra/mosquitto/mosquitto.conf b/infra/mosquitto/mosquitto.conf new file mode 100644 index 0000000..53def74 --- /dev/null +++ b/infra/mosquitto/mosquitto.conf @@ -0,0 +1,17 @@ +# Vertex local MQTT broker — IoT hardware ingress bus +# Accepts connections from LAN hardware devices (TinyGS nodes, RTL_433, etc.) +# and from the poller container via internal Docker networking. +# No TLS, no authentication — intended for local network use only. +# Do NOT expose port 1883 beyond your LAN. + +listener 1883 +allow_anonymous true + +persistence true +persistence_location /mosquitto/data/ + +log_dest stdout +log_type error +log_type warning +log_type notice +log_type information diff --git a/poller/config_loader.py b/poller/config_loader.py index fe55b5b..bd1bf3b 100644 --- a/poller/config_loader.py +++ b/poller/config_loader.py @@ -32,6 +32,18 @@ class PollerSourceEntry(BaseModel): source: Literal["config", "user"] = "config" +class MqttSourceEntry(BaseModel): + name: str + normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + broker: str = "mosquitto" + port: int = 1883 + topic: str + qos: int = 0 + auth_enabled: bool = False + enabled: bool = True + source: Literal["config", "user"] = "config" + + class AlertFeedEntry(BaseModel): name: str url: str @@ -72,6 +84,7 @@ class SourcesConfig(BaseModel): alert_zones: AlertZonesConfig = AlertZonesConfig() alert_feeds: list[AlertFeedEntry] = [] regions: list[RegionEntry] = [] + mqtt_sources: list[MqttSourceEntry] = [] def load_sources_config() -> SourcesConfig: diff --git a/poller/config_sync.py b/poller/config_sync.py index 8449b56..42af202 100644 --- a/poller/config_sync.py +++ b/poller/config_sync.py @@ -2,7 +2,7 @@ import asyncpg -from config_loader import AlertFeedEntry, AlertZonesConfig, NewsFeedEntry, PollerSourceEntry, RadioStreamEntry, SourcesConfig +from config_loader import AlertFeedEntry, AlertZonesConfig, MqttSourceEntry, NewsFeedEntry, PollerSourceEntry, RadioStreamEntry, SourcesConfig logger = logging.getLogger(__name__) @@ -21,11 +21,12 @@ async def sync_sources_to_db(config: SourcesConfig, pool: asyncpg.Pool) -> None: ps = await _sync_poller_sources(config.poller_sources, conn) az = await _sync_alert_zones(config.alert_zones, conn) af = await _sync_alert_feeds(config.alert_feeds, conn) + ms = await _sync_mqtt_sources(config.mqtt_sources, conn) - if any((rs, nf, ps, az, af)): + if any((rs, nf, ps, az, af, ms)): logger.info( - "[config_sync] radio_streams(%s) news_feeds(%s) poller_sources(%s) alert_zones(%s) alert_feeds(%s)", - rs, nf, ps, az, af, + "[config_sync] radio_streams(%s) news_feeds(%s) poller_sources(%s) alert_zones(%s) alert_feeds(%s) mqtt_sources(%s)", + rs, nf, ps, az, af, ms, ) else: logger.debug("[config_sync] no changes") @@ -204,3 +205,48 @@ async def _sync_alert_feeds( removed += len(to_remove) return f"+{added} -{removed}" if (added or removed) else "" + + +async def _sync_mqtt_sources( + entries: list[MqttSourceEntry], conn: asyncpg.Connection +) -> str: + existing = await conn.fetch("SELECT name, source FROM mqtt_sources") + db_all_names = {row["name"] for row in existing} + db_config_names = {row["name"] for row in existing if row["source"] == "config"} + + yaml_config_names = {e.name for e in entries if e.source == "config"} + added = removed = 0 + + for entry in entries: + if entry.name not in db_all_names: + await conn.execute( + """ + INSERT INTO mqtt_sources + (name, normalizer, broker, port, topic, qos, auth_enabled, enabled, source, created_at, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, NOW(), NOW()) + """, + entry.name, entry.normalizer, entry.broker, entry.port, + entry.topic, entry.qos, entry.auth_enabled, entry.enabled, entry.source, + ) + added += 1 + elif entry.source == "config": + await conn.execute( + """ + UPDATE mqtt_sources + SET normalizer=$1, broker=$2, port=$3, topic=$4, qos=$5, + auth_enabled=$6, enabled=$7, updated_at=NOW() + WHERE name=$8 AND source='config' + """, + entry.normalizer, entry.broker, entry.port, entry.topic, entry.qos, + entry.auth_enabled, entry.enabled, entry.name, + ) + + to_remove = db_config_names - yaml_config_names + if to_remove: + await conn.execute( + "DELETE FROM mqtt_sources WHERE source = 'config' AND name = ANY($1::text[])", + list(to_remove), + ) + removed += len(to_remove) + + return f"+{added} -{removed}" if (added or removed) else "" diff --git a/poller/db.py b/poller/db.py index d512d72..a2f0c69 100644 --- a/poller/db.py +++ b/poller/db.py @@ -285,3 +285,40 @@ async def write_acars_message(msg: dict) -> bool: except Exception as exc: logger.warning("[acars] DB write failed: %s", exc) return False + + +async def write_mesh_message(msg: dict) -> None: + """Upsert a mesh message from Meshtastic MQTT into mesh_messages. + + msg keys: id, msg_type, conversation_key, channel_name, text, + sender_name, sender_key, outgoing, acked, ts, source_url + """ + if _pool is None: + return + ts = msg.get("ts") + if isinstance(ts, (int, float)): + from datetime import datetime, timezone + ts = datetime.fromtimestamp(float(ts), tz=timezone.utc) + try: + await _pool.execute( + """ + INSERT INTO mesh_messages + (id, msg_type, conversation_key, channel_name, text, + sender_name, sender_key, outgoing, acked, ts, source_url) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11) + ON CONFLICT (id) DO NOTHING + """, + sanitize_text(msg.get("id") or ""), + sanitize_text(msg.get("msg_type") or "channel"), + sanitize_text(msg.get("conversation_key") or ""), + sanitize_text(msg.get("channel_name") or ""), + sanitize_text(msg.get("text") or ""), + sanitize_text(msg.get("sender_name") or ""), + sanitize_text(msg.get("sender_key") or ""), + bool(msg.get("outgoing", False)), + bool(msg.get("acked", False)), + ts, + sanitize_text(msg.get("source_url") or ""), + ) + except Exception as exc: + logger.warning("[mesh_message] DB write failed: %s", exc) diff --git a/poller/main.py b/poller/main.py index fbbdfd3..e9e0acc 100644 --- a/poller/main.py +++ b/poller/main.py @@ -24,6 +24,7 @@ from pollers.p25_recorder import P25AudioRecorder from pollers.anomaly import AnomalyDetectionPoller from pollers.tinygs import TinyGSPoller +from pollers.mqtt_subscriber import MqttSubscriberPoller from pollers.lightning import LightningPoller from pollers.streamgauge import StreamGaugePoller from pollers.gdacs import GdacsPoller @@ -96,7 +97,9 @@ async def main(): if settings.tinygs_enabled: pollers.append(TinyGSPoller()) else: - logger.info("[tinygs] integration sunset by default (set TINYGS_ENABLED=true to re-enable)") + logger.info("[tinygs] REST poller disabled (set TINYGS_ENABLED=true, or configure a tinygs mqtt_source for local node data)") + + pollers.append(MqttSubscriberPoller()) tasks = [asyncio.create_task(p.run()) for p in pollers] tasks.append(asyncio.create_task(_purge_loop())) diff --git a/poller/normalizers/ais_mqtt.py b/poller/normalizers/ais_mqtt.py new file mode 100644 index 0000000..b0dfcdf --- /dev/null +++ b/poller/normalizers/ais_mqtt.py @@ -0,0 +1,41 @@ +""" +AIS MQTT normalizer. + +Handles JSON messages published by AIS-catcher to the local Mosquitto broker. +AIS-catcher's MQTT output uses the same JSON schema as its WebSocket output, +so this normalizer delegates directly to the existing normalize_ais_catcher +function and feeds into the same vessel entity pipeline. + +Configure AIS-catcher with: + -N 10101 -H 192.168.1.x:1883 MQTT ON + +This is an alternative transport to the WebSocket poller (poller_sources type +'ais'). Both can run simultaneously — vessel entity upserts are idempotent. +If you switch to MQTT, you can remove the 'ais' entry from poller_sources to +avoid the redundant WebSocket connection. +""" + +import json +import logging + +from bus import publish_entity +from normalizers.vessel import normalize_ais_catcher + +logger = logging.getLogger(__name__) + +_VESSEL_TTL = 600 # 10 minutes, matching the WebSocket AIS poller + + +async def handle(topic: str, payload: str) -> None: + try: + data = json.loads(payload) + except (json.JSONDecodeError, ValueError): + logger.debug("[ais_mqtt] non-JSON payload on %s", topic) + return + + if not isinstance(data, dict): + return + + entity = normalize_ais_catcher(data) + if entity: + await publish_entity(entity, ttl=_VESSEL_TTL) diff --git a/poller/normalizers/meshtastic_mqtt.py b/poller/normalizers/meshtastic_mqtt.py new file mode 100644 index 0000000..9dcecc8 --- /dev/null +++ b/poller/normalizers/meshtastic_mqtt.py @@ -0,0 +1,217 @@ +""" +Meshtastic MQTT normalizer. + +Handles JSON messages published by Meshtastic nodes to a local MQTT broker +when the node's MQTT uplink is configured to use JSON mode. + +Configure each Meshtastic node: + Settings → Module Config → MQTT → Server Address: + JSON Enabled: on + +Topic format: msh/{region}/2/json/{channel}/{node_hex_id} +Recommended subscription topic: msh/# + +Message types handled: + position → updates mesh_node entity lat/lon/alt + nodeinfo → updates mesh_node entity display name and hardware info + telemetry → updates mesh_node identity (battery, voltage, utilization) + text → persists to mesh_messages table (fenced from MeshCore by source_url) + +MeshCore and Meshtastic MQTT can run simultaneously. Entities from this +normalizer carry source='meshtastic'; MeshCore entities carry source='meshcore'. +The mesh panel uses source_url in mesh_messages to separate chat streams: + MeshCore messages: source_url starts with 'http' + Meshtastic messages: source_url starts with 'mqtt:' +""" + +import json +import logging +import time + +from bus import publish_entity +from db import write_mesh_message + +logger = logging.getLogger(__name__) + +_NODE_TTL = 1_800 # 30 minutes +_CHANNEL_TTL = 86_400 # 24 hours (used for source_url namespace) + + +async def handle(topic: str, payload: str) -> None: + try: + data = json.loads(payload) + except (json.JSONDecodeError, ValueError): + logger.debug("[meshtastic] non-JSON payload on %s", topic) + return + + if not isinstance(data, dict): + return + + msg_type = data.get("type") + sender = data.get("sender") or data.get("from") + if not sender: + return + + sender_hex = _to_hex(sender) + entity_id = f"mesh_node:{sender_hex}" + + # Extract channel from topic: msh/{region}/2/json/{channel}/{node_id} + channel = _channel_from_topic(topic) + + if msg_type == "position": + await _handle_position(data, entity_id, sender_hex) + + elif msg_type == "nodeinfo": + await _handle_nodeinfo(data, entity_id, sender_hex) + + elif msg_type == "telemetry": + await _handle_telemetry(data, entity_id, sender_hex) + + elif msg_type == "text": + await _handle_text(data, entity_id, sender_hex, channel, topic) + + +async def _handle_position(data: dict, entity_id: str, sender_hex: str) -> None: + payload = data.get("payload") or {} + lat_i = payload.get("latitude_i") + lon_i = payload.get("longitude_i") + if lat_i is None or lon_i is None: + return + + lat = lat_i / 1e7 + lon = lon_i / 1e7 + alt = payload.get("altitude") + + entity = { + "entity_id": entity_id, + "entity_type": "mesh_node", + "source": "meshtastic", + "display_name": sender_hex, + "lat": lat, + "lon": lon, + "altitude": float(alt) if alt is not None else None, + "status": "active", + "identity": {"node_id": sender_hex}, + "tags": ["mesh_node"], + "signal_quality": _snr_to_quality(data.get("snr") or data.get("rxSnr")), + } + await publish_entity(entity, ttl=_NODE_TTL) + + +async def _handle_nodeinfo(data: dict, entity_id: str, sender_hex: str) -> None: + payload = data.get("payload") or {} + long_name = str(payload.get("longname") or "").strip() + short_name = str(payload.get("shortname") or "").strip() + display = long_name or short_name or sender_hex + + entity = { + "entity_id": entity_id, + "entity_type": "mesh_node", + "source": "meshtastic", + "display_name": display, + "lat": None, + "lon": None, + "status": "active", + "identity": { + "node_id": sender_hex, + "long_name": long_name or None, + "short_name": short_name or None, + "hw_model": str(payload.get("hardware", "")) or None, + "role": payload.get("role"), + }, + "tags": ["mesh_node"], + } + await publish_entity(entity, ttl=_NODE_TTL, record_observation=False) + + +async def _handle_telemetry(data: dict, entity_id: str, sender_hex: str) -> None: + payload = data.get("payload") or {} + device = payload.get("device_metrics") or {} + + battery = device.get("battery_level") + voltage = device.get("voltage") + chan_util = device.get("channel_utilization") + air_util = device.get("air_util_tx") + + if not any(v is not None for v in (battery, voltage, chan_util, air_util)): + return + + identity_update: dict = {"node_id": sender_hex} + if battery is not None: + identity_update["battery_level"] = battery + if voltage is not None: + identity_update["voltage"] = round(float(voltage), 2) + if chan_util is not None: + identity_update["channel_utilization"] = round(float(chan_util), 1) + if air_util is not None: + identity_update["air_util_tx"] = round(float(air_util), 1) + + entity = { + "entity_id": entity_id, + "entity_type": "mesh_node", + "source": "meshtastic", + "display_name": sender_hex, + "lat": None, + "lon": None, + "status": "active", + "identity": identity_update, + "tags": ["mesh_node"], + } + await publish_entity(entity, ttl=_NODE_TTL, record_observation=False) + + +async def _handle_text( + data: dict, entity_id: str, sender_hex: str, channel: str, topic: str +) -> None: + payload = data.get("payload") or {} + text = payload.get("text") if isinstance(payload, dict) else str(payload) + if not text: + return + + msg_id = str(data.get("id") or f"msh_{int(time.time())}_{sender_hex}") + ts = data.get("timestamp") or data.get("rxTime") or time.time() + to_field = data.get("to", 0) + # 0xFFFFFFFF is the broadcast address in Meshtastic + is_broadcast = (to_field == 0xFFFFFFFF or to_field == 4294967295) + msg_type_str = "channel" if is_broadcast else "direct" + source_url = f"mqtt:{channel}" + + try: + await write_mesh_message({ + "id": f"meshtastic:{msg_id}", + "msg_type": msg_type_str, + "conversation_key": channel if is_broadcast else f"{sender_hex}:direct", + "channel_name": channel, + "text": str(text), + "sender_name": sender_hex, + "sender_key": sender_hex, + "outgoing": False, + "acked": False, + "ts": ts, + "source_url": source_url, + }) + except Exception as exc: + logger.warning("[meshtastic] message save failed: %s", exc) + + +def _to_hex(sender) -> str: + if isinstance(sender, int): + return f"!{sender:08x}" + return str(sender) + + +def _channel_from_topic(topic: str) -> str: + """Extract channel name from topic msh/{region}/2/json/{channel}/{node_id}.""" + parts = topic.split("/") + if len(parts) >= 5: + return parts[4] + return "unknown" + + +def _snr_to_quality(snr) -> float | None: + if snr is None: + return None + try: + return max(0.0, min(1.0, (float(snr) + 20) / 30)) + except (TypeError, ValueError): + return None diff --git a/poller/normalizers/rtl_433.py b/poller/normalizers/rtl_433.py new file mode 100644 index 0000000..2cd2602 --- /dev/null +++ b/poller/normalizers/rtl_433.py @@ -0,0 +1,127 @@ +""" +RTL_433 MQTT normalizer. + +Handles JSON messages published by rtl_433 (SDR-based RF decoder) to the +local Mosquitto broker. Each message becomes an rf_sensor entity pinned to +the Pi's home coordinates; the sensor values are stored as observations. + +RTL_433 publishes one message per decoded frame on topics like: + rtl_433/HOST/events — all decoded events (recommended) + rtl_433/HOST/devices/MODEL/ID/FIELD/VALUE — per-field topics + +This normalizer handles the JSON event format only. Configure rtl_433 with: + rtl_433 -F "mqtt://localhost:1883,events=rtl_433/events" + +Decoded fields vary by device model. Common ones we capture: + temperature_C, humidity, wind_speed_km_h, wind_dir_deg, rain_mm, + pressure_hPa, battery_ok, rssi, snr, noise +""" + +import json +import logging + +from bus import publish_entity +from config import settings + +logger = logging.getLogger(__name__) + +_SENSOR_TTL = 900 # 15 minutes — sensors broadcast every few minutes + + +async def handle(topic: str, payload: str) -> None: + try: + data = json.loads(payload) + except (json.JSONDecodeError, ValueError): + logger.debug("[rtl_433] non-JSON payload on %s", topic) + return + + if not isinstance(data, dict): + return + + model = str(data.get("model") or "unknown") + device_id = data.get("id") if data.get("id") is not None else data.get("channel", 0) + channel = data.get("channel") + + entity_id = f"rtl_433:{model}:{device_id}" + + identity: dict = { + "model": model, + "device_id": str(device_id), + "channel": str(channel) if channel is not None else None, + "battery_ok": data.get("battery_ok"), + } + + # Capture all numeric sensor fields present in this frame + _sensor_fields = ( + "temperature_C", + "temperature_F", + "humidity", + "wind_speed_km_h", + "wind_avg_km_h", + "wind_dir_deg", + "rain_mm", + "pressure_hPa", + "uv", + "lux", + "moisture", + "depth_cm", + "power_W", + "energy_kWh", + "current_A", + "voltage_V", + ) + for field in _sensor_fields: + if data.get(field) is not None: + try: + identity[field] = float(data[field]) + except (TypeError, ValueError): + pass + + snr = _coerce_float(data.get("snr")) + rssi = _coerce_float(data.get("rssi")) + + # Build a human-readable display name + display_name = _display_name(model, device_id, channel, identity) + + entity = { + "entity_id": entity_id, + "entity_type": "rf_sensor", + "source": "rtl_433", + "display_name": display_name, + "lat": settings.region_lat, + "lon": settings.region_lon, + "status": "active", + "identity": identity, + "tags": ["rtl_433", "rf_sensor"], + "signal_quality": _rssi_to_quality(rssi) if rssi is not None else _snr_to_quality(snr), + } + await publish_entity(entity, ttl=_SENSOR_TTL) + + +def _display_name(model: str, device_id, channel, identity: dict) -> str: + base = model + if channel is not None: + base = f"{model} Ch{channel}" + temp = identity.get("temperature_C") + if temp is not None: + return f"{base} ({temp:.1f}°C)" + return base + + +def _coerce_float(v) -> float | None: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def _rssi_to_quality(rssi: float) -> float: + # Typical RSSI range for close-range 433 MHz: -120 dBm (floor) to -40 dBm (excellent) + return max(0.0, min(1.0, (rssi + 120) / 80)) + + +def _snr_to_quality(snr: float) -> float: + # Typical SNR range: 0 to 30 dB + return max(0.0, min(1.0, snr / 30)) diff --git a/poller/normalizers/tinygs_mqtt.py b/poller/normalizers/tinygs_mqtt.py new file mode 100644 index 0000000..13dd43e --- /dev/null +++ b/poller/normalizers/tinygs_mqtt.py @@ -0,0 +1,112 @@ +""" +TinyGS MQTT normalizer. + +Handles packets published by a local TinyGS LoRa ground station to the +local Mosquitto broker. Each received satellite packet produces: + - A tinygs_satellite entity updated with the satellite's computed position + at time of reception (satPos field). + - A satellite_contact event recording SNR, RSSI, frequency, and decoded + payload. + +The ground station entity (tinygs_station) continues to be published by +the REST-based TinyGS poller when no local MQTT source is active. +""" + +import json +import logging +import time + +from bus import publish_entity +from db import write_event + +logger = logging.getLogger(__name__) + +_SATELLITE_TTL = 3600 # satellite stays on map for 1 hour after last contact +_STATION_TTL = 600 # ground station entity TTL (matches REST poller) + + +async def handle(topic: str, payload: str) -> None: + try: + data = json.loads(payload) + except (json.JSONDecodeError, ValueError): + logger.debug("[tinygs_mqtt] non-JSON payload on %s", topic) + return + + if not isinstance(data, dict): + return + + sat_name = data.get("satellite") or data.get("sat") or "" + station = data.get("station") or data.get("stationName") or "" + snr = _coerce_float(data.get("SNR") or data.get("snr")) + rssi = _coerce_float(data.get("RSSI") or data.get("rssi")) + frequency = _coerce_float(data.get("frequency") or data.get("freq")) + frame = data.get("frame") + parsed = data.get("parsed") + rx_time = data.get("time") or data.get("rxTime") or time.time() + + # Satellite position from satPos field + sat_pos = data.get("satPos") or {} + sat_lat = _coerce_float(sat_pos.get("lat") or sat_pos.get("latitude")) + sat_lon = _coerce_float(sat_pos.get("lon") or sat_pos.get("longitude") or sat_pos.get("lng")) + sat_alt = _coerce_float(sat_pos.get("alt") or sat_pos.get("altitude")) + + if sat_name and sat_lat is not None and sat_lon is not None: + entity = { + "entity_id": f"tinygs:satellite:{sat_name}", + "entity_type": "tinygs_satellite", + "source": "tinygs", + "display_name": sat_name, + "lat": sat_lat, + "lon": sat_lon, + "altitude": sat_alt, + "status": "active", + "identity": { + "satellite_name": sat_name, + "frequency": frequency, + "mode": data.get("mode"), + "last_station": station, + }, + "tags": ["tinygs", "satellite"], + "signal_quality": _snr_to_quality(snr), + } + await publish_entity(entity, ttl=_SATELLITE_TTL) + + # Always record a contact event when a packet arrives + entity_id = f"tinygs:satellite:{sat_name}" if sat_name else "tinygs:unknown" + try: + await write_event( + event_type="satellite_contact", + entity_id=entity_id, + severity="info", + summary=f"Packet from {sat_name}" if sat_name else "TinyGS packet received", + details={ + "satellite": sat_name or None, + "station": station or None, + "snr": snr, + "rssi": rssi, + "frequency": frequency, + "mode": data.get("mode"), + "frame": frame, + "parsed": parsed, + "rx_time": rx_time, + }, + ) + except Exception as exc: + logger.warning("[tinygs_mqtt] event write failed: %s", exc) + + +def _coerce_float(v) -> float | None: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def _snr_to_quality(snr: float | None) -> float | None: + """Map SNR (dB) to a 0–1 signal quality score.""" + if snr is None: + return None + # Typical LoRa SNR range: -20 dB (minimum) to +10 dB (excellent) + return max(0.0, min(1.0, (snr + 20) / 30)) diff --git a/poller/pollers/mqtt_subscriber.py b/poller/pollers/mqtt_subscriber.py new file mode 100644 index 0000000..3f582c9 --- /dev/null +++ b/poller/pollers/mqtt_subscriber.py @@ -0,0 +1,175 @@ +""" +MQTT subscriber poller — subscribes to the local Mosquitto broker and +dispatches incoming messages to per-normalizer handlers. + +Sources are loaded from the mqtt_sources DB table (seeded from sources.yml). +Multiple sources on the same (broker, port) share a single connection. +Each broker connection runs as an independent async task and reconnects +automatically on failure. + +Authentication: when auth_enabled=True for a source, credentials are loaded +from environment variables keyed by the sanitized source name: + MQTT_{UPPER_SNAKE_NAME}_USERNAME + MQTT_{UPPER_SNAKE_NAME}_PASSWORD + +Supported normalizers: tinygs, rtl_433, meshtastic, ais +""" + +import asyncio +import logging +import os +import re + +import aiomqtt + +from .base import BasePoller +import normalizers.tinygs_mqtt as _tinygs +import normalizers.rtl_433 as _rtl_433 +import normalizers.meshtastic_mqtt as _meshtastic +import normalizers.ais_mqtt as _ais + +logger = logging.getLogger(__name__) + +_RETRY_DELAY = 10 # seconds between reconnect attempts +_KEEPALIVE = 60 # MQTT keepalive interval + +_NORMALIZERS: dict = { + "tinygs": _tinygs.handle, + "rtl_433": _rtl_433.handle, + "meshtastic": _meshtastic.handle, + "ais": _ais.handle, +} + + +class MqttSubscriberPoller(BasePoller): + name = "mqtt" + interval = 60 # heartbeat only; actual work is event-driven + + def __init__(self): + self._sources: list[dict] = [] + + async def poll(self): + pass # streaming poller — overrides run() + + async def setup(self): + from db import get_pool + rows = await get_pool().fetch( + "SELECT name, normalizer, broker, port, topic, qos, auth_enabled " + "FROM mqtt_sources WHERE enabled = TRUE" + ) + self._sources = [dict(row) for row in rows] + if self._sources: + logger.info("[mqtt] %d source(s): %s", len(self._sources), + [s["name"] for s in self._sources]) + else: + logger.info("[mqtt] no enabled MQTT sources — subscriber inactive") + + async def run(self): + await self.setup() + if not self._sources: + return + + # Group sources by (broker, port) — one connection per broker + groups: dict[tuple, list[dict]] = {} + for src in self._sources: + key = (src["broker"], src["port"]) + groups.setdefault(key, []).append(src) + + tasks = [ + asyncio.create_task( + self._run_broker(broker, port, sources), + name=f"mqtt:{broker}:{port}", + ) + for (broker, port), sources in groups.items() + ] + await asyncio.gather(*tasks) + + async def _run_broker( + self, broker: str, port: int, sources: list[dict] + ) -> None: + username, password = _broker_auth(sources) + topic_map = {src["topic"]: src for src in sources} + + logger.info("[mqtt] connecting to %s:%s (%d topic(s))", broker, port, len(sources)) + + while True: + try: + async with aiomqtt.Client( + hostname=broker, + port=port, + username=username, + password=password, + keepalive=_KEEPALIVE, + ) as client: + for src in sources: + await client.subscribe(src["topic"], qos=src["qos"]) + logger.debug("[mqtt] subscribed: %s → %s", src["name"], src["topic"]) + + async for message in client.messages: + topic_str = str(message.topic) + payload = _decode_payload(message.payload) + src = _match_source(topic_str, topic_map) + if src is None: + continue + normalizer_name = src["normalizer"] + handler = _NORMALIZERS.get(normalizer_name) + if handler is None: + logger.warning("[mqtt] unknown normalizer %r for topic %s", + normalizer_name, topic_str) + continue + try: + await handler(topic_str, payload) + except Exception as exc: + logger.warning("[mqtt] handler error (%s / %s): %s", + normalizer_name, topic_str, exc) + + except aiomqtt.MqttError as exc: + logger.warning("[mqtt] broker %s:%s error: %s — retry in %ds", + broker, port, exc, _RETRY_DELAY) + await asyncio.sleep(_RETRY_DELAY) + except Exception as exc: + logger.error("[mqtt] unexpected error on %s:%s: %s — retry in %ds", + broker, port, exc, _RETRY_DELAY) + await asyncio.sleep(_RETRY_DELAY) + + +def _broker_auth(sources: list[dict]) -> tuple[str | None, str | None]: + """Return (username, password) for the first auth-enabled source in the group.""" + for src in sources: + if src.get("auth_enabled"): + key = re.sub(r"[^A-Z0-9]+", "_", src["name"].upper()).strip("_") + username = os.environ.get(f"MQTT_{key}_USERNAME") + password = os.environ.get(f"MQTT_{key}_PASSWORD") + if username: + return username, password + return None, None + + +def _decode_payload(payload) -> str: + if isinstance(payload, (bytes, bytearray)): + return payload.decode("utf-8", errors="replace") + return str(payload) + + +def _match_source(topic: str, topic_map: dict[str, dict]) -> dict | None: + """Match an incoming topic against registered subscription patterns.""" + for pattern, src in topic_map.items(): + if _topic_matches(pattern, topic): + return src + return None + + +def _topic_matches(pattern: str, topic: str) -> bool: + """MQTT wildcard matching: + matches one level, # matches remaining levels.""" + p_parts = pattern.split("/") + t_parts = topic.split("/") + + for i, p in enumerate(p_parts): + if p == "#": + return True + if i >= len(t_parts): + return False + if p != "+" and p != t_parts[i]: + return False + + return len(p_parts) == len(t_parts) From 111566d3e3cc7b5c02b4a915ae3a6081328d753e Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 14:56:31 +0000 Subject: [PATCH 2/3] Remove TinyGS integration TinyGS nodes can only connect to one MQTT broker at a time (local or mqtt.tinygs.com), making local integration impossible without losing cloud connectivity. Sunset the entire integration. - Delete poller/pollers/tinygs.py, poller/normalizers/tinygs_mqtt.py - Delete frontend TinyGS layer files (TinyGSLayer.tsx, buildTinyGSLayer.ts) - Remove tinygs_station / tinygs_satellite from entity type filters, store TTLs, sidebar counts, EntityDetail, EntitySearch, CommsPanel, MapOverlay, useWebSocket, SignalQualityChart - Remove tinygs_station from admin.py _TYPE_TO_POLLER - Remove TINYGS_ENABLED env var and all TinyGS config_loader/config_sync references - Update sources.example.yml and .env.example to remove TinyGS entries https://claude.ai/code/session_01AgyyGJSXN21kGaSYF5ZMu9 --- .env.example | 11 +- backend/config_loader.py | 2 +- backend/routers/admin.py | 2 - backend/routers/sources.py | 2 +- config/sources.example.yml | 15 --- .../src/admin/metrics/SignalQualityChart.tsx | 2 +- frontend/src/components/MapOverlay.tsx | 37 +----- .../src/components/layers/TinyGSLayer.tsx | 125 ------------------ frontend/src/components/layout/Sidebar.tsx | 20 --- frontend/src/components/panels/CommsPanel.tsx | 2 +- .../src/components/panels/EntityDetail.tsx | 2 - .../components/panels/EntitySearchPanel.tsx | 65 +-------- frontend/src/hooks/useWebSocket.ts | 1 - frontend/src/layers/buildTinyGSLayer.ts | 117 ---------------- frontend/src/store.ts | 4 +- frontend/src/storeTypes.ts | 2 - poller/config.py | 4 - poller/config_loader.py | 2 +- poller/main.py | 6 - poller/normalizers/tinygs_mqtt.py | 112 ---------------- poller/pollers/mqtt_subscriber.py | 4 +- poller/pollers/tinygs.py | 103 --------------- 22 files changed, 13 insertions(+), 627 deletions(-) delete mode 100644 frontend/src/components/layers/TinyGSLayer.tsx delete mode 100644 frontend/src/layers/buildTinyGSLayer.ts delete mode 100644 poller/normalizers/tinygs_mqtt.py delete mode 100644 poller/pollers/tinygs.py diff --git a/.env.example b/.env.example index 89970e0..cc3d935 100644 --- a/.env.example +++ b/.env.example @@ -19,7 +19,7 @@ REDIS_URL=redis://redis:6379 FRONTEND_PORT=80 # MQTT Broker Port — exposed on LAN for hardware devices to publish to (Default 1883) -# Point TinyGS nodes, RTL_433, Meshtastic, and AIS-catcher at this Pi's LAN IP on this port. +# Point RTL_433, Meshtastic, AIS-catcher, and other IoT devices at this Pi's LAN IP on this port. MQTT_PORT=1883 # MQTT source credentials — only required when auth_enabled=true in sources.yml. @@ -155,14 +155,7 @@ VITE_OBSERVATION_RANGE_KM=50 # Keep false for normal operation. True enables cleaner map snapshots but costs render performance. VITE_PRESERVE_DRAWING_BUFFER=false -# ── 6. TINYGS (OPTIONAL) ───────────────────────────────────────────────────── -# Global TinyGS REST poller — polls the public TinyGS API for all community -# ground station locations. Disabled by default due to upstream API instability. -# For richer local data (packet SNR/RSSI, satellite positions), configure a -# tinygs mqtt_source in sources.yml pointing at your local TinyGS node instead. -TINYGS_ENABLED=false - -# ── 7. TRIMET GTFS-RT (OPTIONAL) ───────────────────────────────────────────── +# ── 6. TRIMET GTFS-RT (OPTIONAL) ───────────────────────────────────────────── # MAX light rail, WES commuter rail, Portland Streetcar real-time positions. # Free AppID at: https://developer.trimet.org/ # route_types: 0=Tram/Streetcar, 1=Light Rail/MAX, 2=Rail/WES (comma-separated) diff --git a/backend/config_loader.py b/backend/config_loader.py index 2f73241..48e5101 100644 --- a/backend/config_loader.py +++ b/backend/config_loader.py @@ -34,7 +34,7 @@ class PollerSourceEntry(BaseModel): class MqttSourceEntry(BaseModel): name: str - normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + normalizer: Literal["rtl_433", "meshtastic", "ais"] broker: str = "mosquitto" port: int = 1883 topic: str diff --git a/backend/routers/admin.py b/backend/routers/admin.py index 0631be6..f50561b 100644 --- a/backend/routers/admin.py +++ b/backend/routers/admin.py @@ -186,8 +186,6 @@ def delta(key: str) -> float: "news_article": "news", "aprs": "aprs", "traffic": "traffic", - "tinygs_station": "tinygs", - "tinygs_satellite": "mqtt", "rf_sensor": "mqtt", "fire_incident": "fire", "stream_gauge": "streamgauge", diff --git a/backend/routers/sources.py b/backend/routers/sources.py index 28b859b..a6d46b2 100644 --- a/backend/routers/sources.py +++ b/backend/routers/sources.py @@ -243,7 +243,7 @@ async def delete_alert_zone(zone_id: int, db: AsyncSession = Depends(get_db)): class MqttSourceCreate(BaseModel): name: str - normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + normalizer: Literal["rtl_433", "meshtastic", "ais"] broker: str = "mosquitto" port: int = 1883 topic: str diff --git a/config/sources.example.yml b/config/sources.example.yml index 216c114..1ee5e73 100644 --- a/config/sources.example.yml +++ b/config/sources.example.yml @@ -125,29 +125,14 @@ poller_sources: # "192.168.1.x" — LAN device running its own broker (e.g., Frigate on another Pi) # # normalizer: which parser handles messages on this topic -# tinygs — TinyGS LoRa ground station (configure node MQTT server to Pi LAN IP) # rtl_433 — RTL_433 SDR RF sensor decoder (rtl_433 -F "mqtt://localhost:1883,events=rtl_433/events") # meshtastic — Meshtastic node JSON MQTT uplink (configure node MQTT server to Pi LAN IP) # ais — AIS-catcher MQTT output (alternative to WebSocket poller_source) # # auth_enabled: false for local Mosquitto (no credentials needed) # true for external brokers — set MQTT_{UPPER_SNAKE_NAME}_USERNAME/PASSWORD in .env -# -# TinyGS note: configure your node's MQTT settings to point at this Pi's LAN IP. -# The global TinyGS REST poller (TINYGS_ENABLED) can run alongside or be disabled -# once your local node is providing data. # --------------------------------------------------------------------------- mqtt_sources: - - name: "TinyGS Node" - normalizer: tinygs - broker: mosquitto - port: 1883 - topic: "tinygs/#" - qos: 0 - auth_enabled: false - enabled: false - source: config - - name: "RTL_433 Sensors" normalizer: rtl_433 broker: mosquitto diff --git a/frontend/src/admin/metrics/SignalQualityChart.tsx b/frontend/src/admin/metrics/SignalQualityChart.tsx index 8ae179a..fc0365a 100644 --- a/frontend/src/admin/metrics/SignalQualityChart.tsx +++ b/frontend/src/admin/metrics/SignalQualityChart.tsx @@ -6,7 +6,7 @@ const ENTITY_COLORS: Record = { mesh_node: '#FF8F00', // cat-mesh aprs_position: '#B388FF', // cat-aprs p25: '#FF8F00', // cat-mesh (P25 radio) - tinygs_packet: '#4FC3F7', // cat-stream + } function barColor(type: string) { diff --git a/frontend/src/components/MapOverlay.tsx b/frontend/src/components/MapOverlay.tsx index f4afa85..0da7002 100644 --- a/frontend/src/components/MapOverlay.tsx +++ b/frontend/src/components/MapOverlay.tsx @@ -14,7 +14,7 @@ import { buildCustomLayers } from '../layers/buildCustomLayers' import { buildLightningLayer } from '../layers/buildLightningLayer' import { buildStreamGaugeLayers, type StreamGaugePoint } from '../layers/buildStreamGaugeLayer' import { buildMeshNodeLayers, type MeshNodePoint } from '../layers/buildMeshNodeLayer' -import { buildTinyGSLayers, type TinyGSSatellitePoint, type TinyGSStationPoint } from '../layers/buildTinyGSLayer' + import { extractRailSegments, snapPointToRail, type RailSegment } from '../layers/railSnap' import { applyPVB, type PVBState } from '../layers/pvb' import { DEFAULT_CENTER, OBSERVATION_RANGE_KM, API_BASE } from '../config' @@ -107,7 +107,7 @@ export function MapOverlay({ map }: Props) { const camerasRef = useRef([]) const selectedCamRef = useRef(null) const activeTabRef = useRef('safety') - const entityFilterRef = useRef({ aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, tinygs_satellite: true, rf_sensor: true, train: true }) + const entityFilterRef = useRef({ aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, rf_sensor: true, train: true }) const searchQueryRef = useRef('') const altRangeRef = useRef([0, 60_000]) const speedRangeRef = useRef([0, 600]) @@ -457,28 +457,6 @@ export function MapOverlay({ map }: Props) { ${node.status ? `
${escHtml(node.status)}
` : ''} ` - } else if (layer.id === 'tinygs-satellite-dot') { - const sat = object as TinyGSSatellitePoint - html = ` -
-
- satellite_alt - ${escHtml(sat.name)} -
-
ALT: ${sat.alt_km !== null ? `${sat.alt_km} km` : 'n/a'}
-
- ` - } else if (layer.id === 'tinygs-station-dot') { - const stn = object as TinyGSStationPoint - html = ` -
-
- sensors - ${escHtml(stn.name)} -
-
${stn.online ? 'ONLINE' : 'OFFLINE'}
-
- ` } else if (layer.id === 'geofence-fill') { const geofence = object as GeofenceItem html = ` @@ -529,12 +507,6 @@ export function MapOverlay({ map }: Props) { } else if (picked.layer?.id === 'mesh-node-dots') { const node = picked.object as MeshNodePoint | undefined if (node?.entity_id) selectEntity(node.entity_id) - } else if (picked.layer?.id === 'tinygs-satellite-dot') { - const sat = picked.object as TinyGSSatellitePoint | undefined - if (sat?.entity_id) selectEntity(sat.entity_id) - } else if (picked.layer?.id === 'tinygs-station-dot') { - const stn = picked.object as TinyGSStationPoint | undefined - if (stn?.entity_id) selectEntity(stn.entity_id) } else { const track = picked.object as Track | undefined if (track?.uid) selectEntity(track.uid) @@ -697,11 +669,6 @@ export function MapOverlay({ map }: Props) { ...buildCustomLayers(customLayersRef.current), ...buildGeofenceLayers(geofencesRef.current, geofencesVisibleRef.current), ...buildObservationRingLayers(DEFAULT_CENTER, OBSERVATION_RANGE_KM, true), - ...buildTinyGSLayers( - Object.values(entitiesRef.current), - entityFilterRef.current.satellite, - entityFilterRef.current.tinygs_station, - ), ...buildMeshNodeLayers( Object.values(entitiesRef.current), entityFilterRef.current.mesh_node, diff --git a/frontend/src/components/layers/TinyGSLayer.tsx b/frontend/src/components/layers/TinyGSLayer.tsx deleted file mode 100644 index 796268d..0000000 --- a/frontend/src/components/layers/TinyGSLayer.tsx +++ /dev/null @@ -1,125 +0,0 @@ -import { useEffect } from 'react' -import maplibregl from 'maplibre-gl' -import { useEntitiesByType } from '../../hooks/useEntities' -import { useCivicStore } from '../../store' - -interface Props { map: maplibregl.Map } - -const SAT_SRC = 'tinygs-satellites' -const SAT_GLOW = 'tinygs-satellite-glow' -const SAT_DOT = 'tinygs-satellite-dot' -const SAT_LABEL = 'tinygs-satellite-label' - -const STN_SRC = 'tinygs-stations' -const STN_RING = 'tinygs-station-ring' -const STN_DOT = 'tinygs-station-dot' - -const SAT_COLOR = '#9E6CFF' -const SAT_LABEL_COLOR = '#C4A8FF' -const STN_COLOR = '#FF8F00' - -export function TinyGSLayer({ map }: Props) { - const satellites = useEntitiesByType('satellite') - const stations = useEntitiesByType('tinygs_station') - const filterSat = useCivicStore((s) => s.entityFilter.satellite) - const filterStn = useCivicStore((s) => s.entityFilter.tinygs_station) - const selectEntity = useCivicStore((s) => s.selectEntity) - - // ── Satellite layer setup / teardown ────────────────────────────────────── - useEffect(() => { - const handleClick = (e: maplibregl.MapMouseEvent & { features?: maplibregl.MapGeoJSONFeature[] }) => { - const f = e.features?.[0] - if (f?.properties?.id) selectEntity(f.properties.id as string) - } - const handleEnter = () => { map.getCanvas().style.cursor = 'pointer' } - const handleLeave = () => { map.getCanvas().style.cursor = '' } - - if (!map.getSource(SAT_SRC)) { - map.addSource(SAT_SRC, { type: 'geojson', data: { type: 'FeatureCollection', features: [] } }) - map.addLayer({ id: SAT_GLOW, type: 'circle', source: SAT_SRC, paint: { 'circle-radius': 16, 'circle-color': SAT_COLOR, 'circle-opacity': 0.15 } }) - map.addLayer({ id: SAT_DOT, type: 'circle', source: SAT_SRC, paint: { 'circle-radius': 5, 'circle-color': SAT_COLOR, 'circle-stroke-width': 1.5, 'circle-stroke-color': '#fff' } }) - map.addLayer({ - id: SAT_LABEL, type: 'symbol', source: SAT_SRC, - layout: { - 'text-field': ['case', ['has', 'alt'], ['concat', ['get', 'name'], '\n', ['to-string', ['get', 'alt']], ' km'], ['get', 'name']], - 'text-font': ['Open Sans Regular', 'Arial Unicode MS Regular'], - 'text-size': 10, 'text-offset': [0, 1.6], 'text-anchor': 'top', 'text-max-width': 10, - }, - paint: { 'text-color': SAT_LABEL_COLOR, 'text-halo-color': '#050505', 'text-halo-width': 1.2 }, - }) - map.on('click', SAT_DOT, handleClick) - map.on('mouseenter', SAT_DOT, handleEnter) - map.on('mouseleave', SAT_DOT, handleLeave) - } - - return () => { - map.off('click', SAT_DOT, handleClick) - map.off('mouseenter', SAT_DOT, handleEnter) - map.off('mouseleave', SAT_DOT, handleLeave) - if (map.getLayer(SAT_LABEL)) map.removeLayer(SAT_LABEL) - if (map.getLayer(SAT_DOT)) map.removeLayer(SAT_DOT) - if (map.getLayer(SAT_GLOW)) map.removeLayer(SAT_GLOW) - if (map.getSource(SAT_SRC)) map.removeSource(SAT_SRC) - } - }, [map, selectEntity]) - - // ── Satellite data updates ───────────────────────────────────────────────── - useEffect(() => { - const src = map.getSource(SAT_SRC) as maplibregl.GeoJSONSource | undefined - if (!src) return - const visible = filterSat ? satellites : [] - src.setData({ - type: 'FeatureCollection', - features: visible.map((s) => ({ - type: 'Feature', - geometry: { type: 'Point', coordinates: [s.lon!, s.lat!] }, - properties: { id: s.entity_id, name: s.display_name ?? s.entity_id, ...(s.altitude != null && { alt: Math.round(s.altitude / 1000) }) }, - })), - }) - }, [satellites, map, filterSat]) - - // ── Station layer setup / teardown ──────────────────────────────────────── - useEffect(() => { - const handleClick = (e: maplibregl.MapMouseEvent & { features?: maplibregl.MapGeoJSONFeature[] }) => { - const f = e.features?.[0] - if (f?.properties?.id) selectEntity(f.properties.id as string) - } - const handleEnter = () => { map.getCanvas().style.cursor = 'pointer' } - const handleLeave = () => { map.getCanvas().style.cursor = '' } - - if (!map.getSource(STN_SRC)) { - map.addSource(STN_SRC, { type: 'geojson', data: { type: 'FeatureCollection', features: [] } }) - map.addLayer({ id: STN_RING, type: 'circle', source: STN_SRC, paint: { 'circle-radius': 9, 'circle-color': ['case', ['get', 'online'], STN_COLOR, '#555'], 'circle-opacity': 0.25 } }) - map.addLayer({ id: STN_DOT, type: 'circle', source: STN_SRC, paint: { 'circle-radius': 5, 'circle-color': ['case', ['get', 'online'], STN_COLOR, '#888'], 'circle-stroke-width': 1, 'circle-stroke-color': '#fff' } }) - map.on('click', STN_DOT, handleClick) - map.on('mouseenter', STN_DOT, handleEnter) - map.on('mouseleave', STN_DOT, handleLeave) - } - - return () => { - map.off('click', STN_DOT, handleClick) - map.off('mouseenter', STN_DOT, handleEnter) - map.off('mouseleave', STN_DOT, handleLeave) - if (map.getLayer(STN_DOT)) map.removeLayer(STN_DOT) - if (map.getLayer(STN_RING)) map.removeLayer(STN_RING) - if (map.getSource(STN_SRC)) map.removeSource(STN_SRC) - } - }, [map, selectEntity]) - - // ── Station data updates ─────────────────────────────────────────────────── - useEffect(() => { - const src = map.getSource(STN_SRC) as maplibregl.GeoJSONSource | undefined - if (!src) return - const visible = filterStn ? stations : [] - src.setData({ - type: 'FeatureCollection', - features: visible.map((s) => ({ - type: 'Feature', - geometry: { type: 'Point', coordinates: [s.lon!, s.lat!] }, - properties: { id: s.entity_id, name: s.display_name ?? s.entity_id, online: s.status === 'online' }, - })), - }) - }, [stations, map, filterStn]) - - return null -} diff --git a/frontend/src/components/layout/Sidebar.tsx b/frontend/src/components/layout/Sidebar.tsx index 87bb6df..58ee108 100644 --- a/frontend/src/components/layout/Sidebar.tsx +++ b/frontend/src/components/layout/Sidebar.tsx @@ -133,7 +133,6 @@ export function Sidebar() { const meshNodes = entityList.filter((e) => e.entity_type === 'mesh_node').length const streamGauges = entityList.filter((e) => e.entity_type === 'stream_gauge').length const satellites = entityList.filter((e) => e.entity_type === 'satellite').length - const tinygsStations = entityList.filter((e) => e.entity_type === 'tinygs_station').length const lightningCount = lightningStrikes.length const cams = cameras.length const wAlerts = weather.alerts.length @@ -396,16 +395,6 @@ export function Sidebar() { {satellites} - - - - @@ -443,41 +417,6 @@ export function EntitySearchPanel() { )} - {/* TinyGS satellites + stations */} - {tinygsEntities.map((e) => { - const isSelected = selectedEntityId === e.entity_id - const color = TYPE_COLOR[e.entity_type] ?? 'text-on-surface-variant' - const icon = TYPE_ICON[e.entity_type] ?? 'sensors' - const identity = e.identity as Record | undefined - const rssi = identity?.rssi != null ? `${identity.rssi} dBm` : null - const altKm = e.altitude != null ? `${Math.round(e.altitude / 1000)} km` : null - const subtitle = e.entity_type === 'satellite' - ? [altKm, rssi].filter(Boolean).join(' · ') - : e.status ?? '' - return ( - - ) - })} ) diff --git a/frontend/src/hooks/useWebSocket.ts b/frontend/src/hooks/useWebSocket.ts index bd5fb0f..a74e613 100644 --- a/frontend/src/hooks/useWebSocket.ts +++ b/frontend/src/hooks/useWebSocket.ts @@ -18,7 +18,6 @@ const FILTER_KEY_TO_ENTITY_TYPE: Partial> aprs: 'aprs', fire_incident: 'fire_incident', satellite: 'satellite', - tinygs_station: 'tinygs_station', train: 'train', } diff --git a/frontend/src/layers/buildTinyGSLayer.ts b/frontend/src/layers/buildTinyGSLayer.ts deleted file mode 100644 index aa19a4f..0000000 --- a/frontend/src/layers/buildTinyGSLayer.ts +++ /dev/null @@ -1,117 +0,0 @@ -import { ScatterplotLayer } from '@deck.gl/layers' -import type { Entity } from '../store' - -export interface TinyGSSatellitePoint { - entity_id: string - name: string - lon: number - lat: number - alt_km: number | null -} - -export interface TinyGSStationPoint { - entity_id: string - name: string - lon: number - lat: number - online: boolean -} - -function toSatPoint(e: Entity): TinyGSSatellitePoint | null { - if (e.entity_type !== 'satellite' || e.lat == null || e.lon == null) return null - return { - entity_id: e.entity_id, - name: e.display_name ?? e.entity_id, - lon: e.lon, - lat: e.lat, - alt_km: typeof e.altitude === 'number' ? Math.round(e.altitude / 1000) : null, - } -} - -function toStationPoint(e: Entity): TinyGSStationPoint | null { - if (e.entity_type !== 'tinygs_station' || e.lat == null || e.lon == null) return null - return { - entity_id: e.entity_id, - name: e.display_name ?? e.entity_id, - lon: e.lon, - lat: e.lat, - online: e.status === 'online', - } -} - -export function buildTinyGSLayers( - entities: Entity[], - satellitesVisible: boolean, - stationsVisible: boolean, -) { - const satPoints = satellitesVisible - ? entities.map(toSatPoint).filter((p): p is TinyGSSatellitePoint => p !== null) - : [] - const stnPoints = stationsVisible - ? entities.map(toStationPoint).filter((p): p is TinyGSStationPoint => p !== null) - : [] - - const layers = [] - - if (satPoints.length > 0) { - layers.push( - new ScatterplotLayer({ - id: 'tinygs-satellite-glow', - data: satPoints, - pickable: false, - filled: true, - stroked: false, - radiusUnits: 'pixels', - getPosition: (p) => [p.lon, p.lat], - getRadius: 16, - getFillColor: [158, 108, 255, 55], - }), - new ScatterplotLayer({ - id: 'tinygs-satellite-dot', - data: satPoints, - pickable: true, - filled: true, - stroked: true, - radiusUnits: 'pixels', - getPosition: (p) => [p.lon, p.lat], - getRadius: 5, - getFillColor: [158, 108, 255, 240], - getLineColor: [255, 255, 255, 255], - lineWidthUnits: 'pixels', - getLineWidth: 1.5, - }), - ) - } - - if (stnPoints.length > 0) { - layers.push( - new ScatterplotLayer({ - id: 'tinygs-station-ring', - data: stnPoints, - pickable: false, - filled: true, - stroked: false, - radiusUnits: 'pixels', - getPosition: (p) => [p.lon, p.lat], - getRadius: 9, - getFillColor: (p) => (p.online ? [255, 143, 0, 64] : [85, 85, 85, 64]), - }), - new ScatterplotLayer({ - id: 'tinygs-station-dot', - data: stnPoints, - pickable: true, - filled: true, - stroked: true, - radiusUnits: 'pixels', - getPosition: (p) => [p.lon, p.lat], - getRadius: 5, - getFillColor: (p) => (p.online ? [255, 143, 0, 230] : [136, 136, 136, 230]), - getLineColor: [255, 255, 255, 255], - lineWidthUnits: 'pixels', - getLineWidth: 1, - }), - ) - } - - return layers -} diff --git a/frontend/src/store.ts b/frontend/src/store.ts index 0895613..fab17c5 100644 --- a/frontend/src/store.ts +++ b/frontend/src/store.ts @@ -368,7 +368,7 @@ export const useCivicStore = create()( mobileNavOpen: false, settingsOpen: false, helpOpen: false, - entityFilter: { aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, tinygs_station: true, tinygs_satellite: true, rf_sensor: true, train: true }, + entityFilter: { aircraft: true, adsbLocal: true, adsbSupplement: true, vessel: true, mesh_node: true, aprs: true, fire_incident: true, satellite: true, rf_sensor: true, train: true }, entitySearchQuery: '', entityAltRange: ALT_RANGE_DEFAULT, entitySpeedRange: SPD_RANGE_DEFAULT, @@ -457,8 +457,6 @@ export const useCivicStore = create()( vessel: 600_000, // 10 min — AIS updates are infrequent mesh_node: 604_800_000, // 7 days — mesh nodes are semi-permanent infrastructure satellite: 1_800_000, // 30 min — matches poller TTL - tinygs_station: 600_000, // 10 min — station ping is every ~60 s - tinygs_satellite: 3_600_000, // 60 min — matches poller TTL, satellites pass infrequently rf_sensor: 900_000, // 15 min — matches poller TTL, sensors broadcast every few min stream_gauge: 600_000, // 10 min — gauges are polled every 5 min tak_client: 300_000, // 5 min — TAK SA ping is every 30 s–2 min diff --git a/frontend/src/storeTypes.ts b/frontend/src/storeTypes.ts index 20439dc..0aa0af1 100644 --- a/frontend/src/storeTypes.ts +++ b/frontend/src/storeTypes.ts @@ -259,8 +259,6 @@ export type EntityTypeFilter = { aprs: boolean fire_incident: boolean satellite: boolean - tinygs_station: boolean - tinygs_satellite: boolean rf_sensor: boolean train: boolean } diff --git a/poller/config.py b/poller/config.py index b22b31a..6179e3e 100644 --- a/poller/config.py +++ b/poller/config.py @@ -182,10 +182,6 @@ def regions(self) -> list[RegionConfig]: tvfr_enabled: bool = False tvfr_rss_url: str = "" - # TinyGS integration is sunset by default due to upstream API instability. - # Set true to opt in and re-enable the poller. - tinygs_enabled: bool = False - # TriMet GTFS-RT — Portland Metro rail (MAX light rail, WES commuter, Portland Streetcar) # Free AppID at: https://developer.trimet.org/ trimet_gtfs_enabled: bool = False diff --git a/poller/config_loader.py b/poller/config_loader.py index bd1bf3b..6131243 100644 --- a/poller/config_loader.py +++ b/poller/config_loader.py @@ -34,7 +34,7 @@ class PollerSourceEntry(BaseModel): class MqttSourceEntry(BaseModel): name: str - normalizer: Literal["tinygs", "rtl_433", "meshtastic", "ais"] + normalizer: Literal["rtl_433", "meshtastic", "ais"] broker: str = "mosquitto" port: int = 1883 topic: str diff --git a/poller/main.py b/poller/main.py index e9e0acc..04c3c93 100644 --- a/poller/main.py +++ b/poller/main.py @@ -23,7 +23,6 @@ from pollers.cot_receiver import CotReceiver from pollers.p25_recorder import P25AudioRecorder from pollers.anomaly import AnomalyDetectionPoller -from pollers.tinygs import TinyGSPoller from pollers.mqtt_subscriber import MqttSubscriberPoller from pollers.lightning import LightningPoller from pollers.streamgauge import StreamGaugePoller @@ -94,11 +93,6 @@ async def main(): RailInfrastructurePoller(), ] - if settings.tinygs_enabled: - pollers.append(TinyGSPoller()) - else: - logger.info("[tinygs] REST poller disabled (set TINYGS_ENABLED=true, or configure a tinygs mqtt_source for local node data)") - pollers.append(MqttSubscriberPoller()) tasks = [asyncio.create_task(p.run()) for p in pollers] diff --git a/poller/normalizers/tinygs_mqtt.py b/poller/normalizers/tinygs_mqtt.py deleted file mode 100644 index 13dd43e..0000000 --- a/poller/normalizers/tinygs_mqtt.py +++ /dev/null @@ -1,112 +0,0 @@ -""" -TinyGS MQTT normalizer. - -Handles packets published by a local TinyGS LoRa ground station to the -local Mosquitto broker. Each received satellite packet produces: - - A tinygs_satellite entity updated with the satellite's computed position - at time of reception (satPos field). - - A satellite_contact event recording SNR, RSSI, frequency, and decoded - payload. - -The ground station entity (tinygs_station) continues to be published by -the REST-based TinyGS poller when no local MQTT source is active. -""" - -import json -import logging -import time - -from bus import publish_entity -from db import write_event - -logger = logging.getLogger(__name__) - -_SATELLITE_TTL = 3600 # satellite stays on map for 1 hour after last contact -_STATION_TTL = 600 # ground station entity TTL (matches REST poller) - - -async def handle(topic: str, payload: str) -> None: - try: - data = json.loads(payload) - except (json.JSONDecodeError, ValueError): - logger.debug("[tinygs_mqtt] non-JSON payload on %s", topic) - return - - if not isinstance(data, dict): - return - - sat_name = data.get("satellite") or data.get("sat") or "" - station = data.get("station") or data.get("stationName") or "" - snr = _coerce_float(data.get("SNR") or data.get("snr")) - rssi = _coerce_float(data.get("RSSI") or data.get("rssi")) - frequency = _coerce_float(data.get("frequency") or data.get("freq")) - frame = data.get("frame") - parsed = data.get("parsed") - rx_time = data.get("time") or data.get("rxTime") or time.time() - - # Satellite position from satPos field - sat_pos = data.get("satPos") or {} - sat_lat = _coerce_float(sat_pos.get("lat") or sat_pos.get("latitude")) - sat_lon = _coerce_float(sat_pos.get("lon") or sat_pos.get("longitude") or sat_pos.get("lng")) - sat_alt = _coerce_float(sat_pos.get("alt") or sat_pos.get("altitude")) - - if sat_name and sat_lat is not None and sat_lon is not None: - entity = { - "entity_id": f"tinygs:satellite:{sat_name}", - "entity_type": "tinygs_satellite", - "source": "tinygs", - "display_name": sat_name, - "lat": sat_lat, - "lon": sat_lon, - "altitude": sat_alt, - "status": "active", - "identity": { - "satellite_name": sat_name, - "frequency": frequency, - "mode": data.get("mode"), - "last_station": station, - }, - "tags": ["tinygs", "satellite"], - "signal_quality": _snr_to_quality(snr), - } - await publish_entity(entity, ttl=_SATELLITE_TTL) - - # Always record a contact event when a packet arrives - entity_id = f"tinygs:satellite:{sat_name}" if sat_name else "tinygs:unknown" - try: - await write_event( - event_type="satellite_contact", - entity_id=entity_id, - severity="info", - summary=f"Packet from {sat_name}" if sat_name else "TinyGS packet received", - details={ - "satellite": sat_name or None, - "station": station or None, - "snr": snr, - "rssi": rssi, - "frequency": frequency, - "mode": data.get("mode"), - "frame": frame, - "parsed": parsed, - "rx_time": rx_time, - }, - ) - except Exception as exc: - logger.warning("[tinygs_mqtt] event write failed: %s", exc) - - -def _coerce_float(v) -> float | None: - if v is None: - return None - try: - return float(v) - except (TypeError, ValueError): - return None - - -def _snr_to_quality(snr: float | None) -> float | None: - """Map SNR (dB) to a 0–1 signal quality score.""" - if snr is None: - return None - # Typical LoRa SNR range: -20 dB (minimum) to +10 dB (excellent) - return max(0.0, min(1.0, (snr + 20) / 30)) diff --git a/poller/pollers/mqtt_subscriber.py b/poller/pollers/mqtt_subscriber.py index 3f582c9..26fb130 100644 --- a/poller/pollers/mqtt_subscriber.py +++ b/poller/pollers/mqtt_subscriber.py @@ -12,7 +12,7 @@ MQTT_{UPPER_SNAKE_NAME}_USERNAME MQTT_{UPPER_SNAKE_NAME}_PASSWORD -Supported normalizers: tinygs, rtl_433, meshtastic, ais +Supported normalizers: rtl_433, meshtastic, ais """ import asyncio @@ -23,7 +23,6 @@ import aiomqtt from .base import BasePoller -import normalizers.tinygs_mqtt as _tinygs import normalizers.rtl_433 as _rtl_433 import normalizers.meshtastic_mqtt as _meshtastic import normalizers.ais_mqtt as _ais @@ -34,7 +33,6 @@ _KEEPALIVE = 60 # MQTT keepalive interval _NORMALIZERS: dict = { - "tinygs": _tinygs.handle, "rtl_433": _rtl_433.handle, "meshtastic": _meshtastic.handle, "ais": _ais.handle, diff --git a/poller/pollers/tinygs.py b/poller/pollers/tinygs.py deleted file mode 100644 index bcc0a42..0000000 --- a/poller/pollers/tinygs.py +++ /dev/null @@ -1,103 +0,0 @@ -""" -TinyGS poller — polls the public TinyGS community REST API. - -No authentication required. Publishes all public ground stations as -`tinygs_station` entities so they appear on the situational-awareness map. - -The previous MQTT-based implementation required a physical LoRa ground station, -Telegram-issued credentials, and a private-CA TLS chain that could not be -verified by the default system store. The public REST API avoids all of that. - -Endpoint: https://api.tinygs.com/v1/stations -Interval: every 5 minutes (station data updates infrequently) -""" - -import logging -import time - -import httpx - -from bus import publish_entity -from .base import BasePoller - -logger = logging.getLogger(__name__) - -_STATIONS_URL = "https://api.tinygs.com/v1/stations" -_STATION_TTL = 600 # entities expire after 10 min if not re-polled - - -class TinyGSPoller(BasePoller): - name = "tinygs" - interval = 300 # seconds between polls - - async def poll(self): - async with httpx.AsyncClient(timeout=20, follow_redirects=True) as client: - await self._poll_stations(client) - - async def _poll_stations(self, client: httpx.AsyncClient) -> None: - try: - resp = await client.get(_STATIONS_URL) - resp.raise_for_status() - except Exception as exc: - logger.warning("[tinygs] station fetch failed: %s", exc) - return - - try: - data = resp.json() - except Exception as exc: - logger.warning("[tinygs] could not parse response: %s", exc) - return - - # API may return a bare list or {"stations": [...]} - stations: list = data if isinstance(data, list) else data.get("stations", []) - if not isinstance(stations, list): - logger.warning("[tinygs] unexpected response shape") - return - - published = 0 - for stn in stations: - if not isinstance(stn, dict): - continue - - name = stn.get("name") or stn.get("stationName") or "" - if not name: - continue - - lat = stn.get("lat") or stn.get("latitude") - lon = stn.get("lng") or stn.get("lon") or stn.get("longitude") - if lat is None or lon is None: - continue - - try: - lat, lon = float(lat), float(lon) - except (TypeError, ValueError): - continue - - # Stations that heard a packet in the last hour count as "online" - last_ts = stn.get("lastPacketTime") or stn.get("lastSeen") or 0 - try: - last_ts_f = float(last_ts) - except (TypeError, ValueError): - last_ts_f = 0.0 - is_online = last_ts_f > 0 and (time.time() - last_ts_f) < 3600 - - entity = { - "entity_id": f"tinygs:station:{name}", - "entity_type": "tinygs_station", - "source": "tinygs", - "display_name": name, - "lat": lat, - "lon": lon, - "status": "online" if is_online else "offline", - "identity": { - "station_name": name, - "total_packets": stn.get("totalPackets"), - "confirmed_packets": stn.get("confirmed"), - "last_active_ts": last_ts or None, - }, - "tags": ["tinygs", "ground_station"], - } - await publish_entity(entity, ttl=_STATION_TTL) - published += 1 - - logger.info("[tinygs] published %d stations", published) From 324e3fbc3abc9ee18db070f2c779e9c551fa5820 Mon Sep 17 00:00:00 2001 From: Claude Date: Thu, 21 May 2026 22:45:08 +0000 Subject: [PATCH 3/3] Surface rf_sensor entities from RTL_433 MQTT normalizer MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Wires rf_sensor into the full frontend pipeline so 433MHz RF sensors decoded by RTL_433 are visible on the map and inspectable. - Add 'sensor' Track type to storeTypes; add signal_quality to Entity - entityToTrack: recognize rf_sensor → type 'sensor' - atlasIcons: add diamond icon (row 3, col 1) for rf_sensor - buildEntityLayers: render sensor type with diamond icon + lime label at z10+ - colorUtils: lime-rf (#76DD00) color for sensor tracks - tailwind.config.js: add lime-rf / lime-rf-muted design tokens - RfSensorOverview: new detail panel showing readings grid (temp, humidity, pressure, wind, rain, UV, power, etc.), battery status, signal quality bar - EntityDetail: wire in RfSensorOverview, add rf_sensor to TYPE_COLORS/TYPE_ICONS - useWebSocket: add rf_sensor to FILTER_KEY_TO_ENTITY_TYPE - Sidebar: add RF Sensors count + toggle button (compact and expanded) https://claude.ai/code/session_01AgyyGJSXN21kGaSYF5ZMu9 --- frontend/src/components/layout/Sidebar.tsx | 20 +++ .../src/components/panels/EntityDetail.tsx | 5 + .../panels/entity/RfSensorOverview.tsx | 115 ++++++++++++++++++ frontend/src/entityUtils.ts | 5 +- frontend/src/hooks/useWebSocket.ts | 1 + frontend/src/layers/atlasIcons.ts | 23 ++++ frontend/src/layers/buildEntityLayers.ts | 20 ++- frontend/src/layers/colorUtils.ts | 1 + frontend/src/storeTypes.ts | 7 +- frontend/tailwind.config.js | 2 + 10 files changed, 193 insertions(+), 6 deletions(-) create mode 100644 frontend/src/components/panels/entity/RfSensorOverview.tsx diff --git a/frontend/src/components/layout/Sidebar.tsx b/frontend/src/components/layout/Sidebar.tsx index 58ee108..7f4d3f9 100644 --- a/frontend/src/components/layout/Sidebar.tsx +++ b/frontend/src/components/layout/Sidebar.tsx @@ -131,6 +131,7 @@ export function Sidebar() { const aprs = entityList.filter((e) => e.entity_type === 'aprs').length const fire = entityList.filter((e) => e.entity_type === 'fire_incident').length const meshNodes = entityList.filter((e) => e.entity_type === 'mesh_node').length + const rfSensors = entityList.filter((e) => e.entity_type === 'rf_sensor').length const streamGauges = entityList.filter((e) => e.entity_type === 'stream_gauge').length const satellites = entityList.filter((e) => e.entity_type === 'satellite').length const lightningCount = lightningStrikes.length @@ -365,6 +366,16 @@ export function Sidebar() { {meshNodes} + + +