From 231d9c9f690d8ae1eeac16bac905f5c2abe5178f Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 22 May 2026 13:17:28 +0000 Subject: [PATCH 1/2] fix: prevent partial mesh entity updates from clobbering display_name and position MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Meshtastic sends position, nodeinfo, and telemetry as separate MQTT messages. Because publish_entity did a full Redis SET and the DB ON CONFLICT clause fully replaced display_name and identity, a position update would overwrite the human name set by nodeinfo (e.g. "K5ABC" → "!12345678"), and nodeinfo would wipe the lat/lon set by position. - bus.py: add merge=True option to publish_entity; performs a Redis GET+merge before writing, deep-merging the identity dict so partial updates only update the fields they provide - db.py: use COALESCE for display_name (NULL doesn't overwrite a good name) and the PostgreSQL || operator to merge the identity JSONB column instead of replacing it wholesale - meshtastic_mqtt.py: position and telemetry handlers drop display_name (not authoritative for names) and use merge=True; nodeinfo uses merge=True so it preserves existing lat/lon while setting the human name - meshcore_pymc.py: add altitude, status, and last_seen fields to entity dict for consistency with the other two mesh sources https://claude.ai/code/session_01C1LWuAM16EChKffhfZvAWd --- poller/bus.py | 24 +++++++++++++++++++++++- poller/db.py | 4 ++-- poller/normalizers/meshtastic_mqtt.py | 10 +++------- poller/pollers/meshcore_pymc.py | 2 ++ 4 files changed, 30 insertions(+), 10 deletions(-) diff --git a/poller/bus.py b/poller/bus.py index d46d337..49a83de 100644 --- a/poller/bus.py +++ b/poller/bus.py @@ -20,12 +20,34 @@ async def get_bus() -> Redis: return _redis -async def publish_entity(entity: dict, ttl: int = 120, record_observation: bool = True): +async def publish_entity( + entity: dict, + ttl: int = 120, + record_observation: bool = True, + merge: bool = False, +): r = await get_bus() entity = sanitize_payload(entity) entity_id = entity["entity_id"] key = f"entity:{entity_id}" + if merge: + existing_raw = await r.get(key) + if existing_raw: + try: + existing = json.loads(existing_raw) + merged = dict(existing) + for k, v in entity.items(): + if v is not None: + if k == "identity" and isinstance(v, dict) and isinstance(merged.get(k), dict): + # Deep-merge identity: new non-None keys win, existing keys preserved + merged[k] = {**merged[k], **{ik: iv for ik, iv in v.items() if iv is not None}} + else: + merged[k] = v + entity = merged + except Exception: + pass + should_publish = True if settings.adsb_publish_only_changes: previous = _entity_cache.get(entity_id) diff --git a/poller/db.py b/poller/db.py index a2f0c69..95853d6 100644 --- a/poller/db.py +++ b/poller/db.py @@ -135,8 +135,8 @@ async def write_entity_observation(entity: dict, record_observation: bool = True (entity_id, entity_type, source, display_name, identity, tags, first_seen, last_seen) VALUES ($1::text, $2::text, $3::text, $4::text, $5::jsonb, $6::jsonb, NOW(), NOW()) ON CONFLICT (entity_id) DO UPDATE SET - display_name = EXCLUDED.display_name, - identity = EXCLUDED.identity, + display_name = COALESCE(EXCLUDED.display_name, entities.display_name), + identity = entities.identity || EXCLUDED.identity, tags = EXCLUDED.tags, last_seen = NOW() """, diff --git a/poller/normalizers/meshtastic_mqtt.py b/poller/normalizers/meshtastic_mqtt.py index 9dcecc8..a8d9599 100644 --- a/poller/normalizers/meshtastic_mqtt.py +++ b/poller/normalizers/meshtastic_mqtt.py @@ -86,7 +86,6 @@ async def _handle_position(data: dict, entity_id: str, sender_hex: str) -> None: "entity_id": entity_id, "entity_type": "mesh_node", "source": "meshtastic", - "display_name": sender_hex, "lat": lat, "lon": lon, "altitude": float(alt) if alt is not None else None, @@ -95,7 +94,7 @@ async def _handle_position(data: dict, entity_id: str, sender_hex: str) -> None: "tags": ["mesh_node"], "signal_quality": _snr_to_quality(data.get("snr") or data.get("rxSnr")), } - await publish_entity(entity, ttl=_NODE_TTL) + await publish_entity(entity, ttl=_NODE_TTL, merge=True) async def _handle_nodeinfo(data: dict, entity_id: str, sender_hex: str) -> None: @@ -121,7 +120,7 @@ async def _handle_nodeinfo(data: dict, entity_id: str, sender_hex: str) -> None: }, "tags": ["mesh_node"], } - await publish_entity(entity, ttl=_NODE_TTL, record_observation=False) + await publish_entity(entity, ttl=_NODE_TTL, record_observation=False, merge=True) async def _handle_telemetry(data: dict, entity_id: str, sender_hex: str) -> None: @@ -150,14 +149,11 @@ async def _handle_telemetry(data: dict, entity_id: str, sender_hex: str) -> None "entity_id": entity_id, "entity_type": "mesh_node", "source": "meshtastic", - "display_name": sender_hex, - "lat": None, - "lon": None, "status": "active", "identity": identity_update, "tags": ["mesh_node"], } - await publish_entity(entity, ttl=_NODE_TTL, record_observation=False) + await publish_entity(entity, ttl=_NODE_TTL, record_observation=False, merge=True) async def _handle_text( diff --git a/poller/pollers/meshcore_pymc.py b/poller/pollers/meshcore_pymc.py index f073bb6..74b38fa 100644 --- a/poller/pollers/meshcore_pymc.py +++ b/poller/pollers/meshcore_pymc.py @@ -87,6 +87,8 @@ def _contact_to_entity(contact: dict, source_url: str) -> dict | None: "display_name": name, "lat": lat, "lon": lon, + "altitude": None, + "status": "", "identity": { "public_key": pub_key, "node_id": pub_key[:12], From f2135a38f50bec0f650b2b6c83386fe177ad6bfe Mon Sep 17 00:00:00 2001 From: Claude Date: Fri, 22 May 2026 20:04:29 +0000 Subject: [PATCH 2/2] fix: align signal_quality, hw_model, and node_a across all three mesh sources - mesh_node.py: add shared snr_to_quality() export so the formula is defined once; fix hw_model to emit None instead of "" when the model is unknown - meshtastic_mqtt.py: import snr_to_quality from the shared normalizer, removing the duplicate private copy - meshcore.py: import snr_to_quality; stamp signal_quality on the sending node's entity on every throttled packet event (merge=True so only that field changes); replace the hardcoded "local" node_a sentinel with the actual local node entity_id from _local_node_ids https://claude.ai/code/session_01C1LWuAM16EChKffhfZvAWd --- poller/normalizers/mesh_node.py | 14 ++++++++++++-- poller/normalizers/meshtastic_mqtt.py | 10 ++-------- poller/pollers/meshcore.py | 19 +++++++++++++++---- 3 files changed, 29 insertions(+), 14 deletions(-) diff --git a/poller/normalizers/mesh_node.py b/poller/normalizers/mesh_node.py index 32ec860..a33c796 100644 --- a/poller/normalizers/mesh_node.py +++ b/poller/normalizers/mesh_node.py @@ -2,6 +2,16 @@ from typing import Optional +def snr_to_quality(snr) -> float | None: + """Convert SNR (dB) to a normalised signal quality in [0, 1].""" + if snr is None: + return None + try: + return max(0.0, min(1.0, (float(snr) + 20) / 30)) + except (TypeError, ValueError): + return None + + _CONTACT_TYPES = { 0: "unknown", 1: "client", @@ -28,7 +38,7 @@ def normalize_mesh_node(data: dict) -> Optional[dict]: "identity": { "node_id": node_id, "short_name": data.get("short_name", ""), - "hw_model": data.get("hw_model", ""), + "hw_model": data.get("hw_model") or None, }, "lat": data.get("lat"), "lon": data.get("lon"), @@ -72,7 +82,7 @@ def normalize_remoteterm_contact(data: dict) -> Optional[dict]: "node_id": pub_key[:12], "short_name": pub_key[:12], "contact_type": node_type, - "hw_model": "", + "hw_model": None, "on_radio": data.get("on_radio", False), "favorite": data.get("favorite", False), "battery_level": data.get("battery_level"), diff --git a/poller/normalizers/meshtastic_mqtt.py b/poller/normalizers/meshtastic_mqtt.py index a8d9599..13d0ebc 100644 --- a/poller/normalizers/meshtastic_mqtt.py +++ b/poller/normalizers/meshtastic_mqtt.py @@ -30,6 +30,7 @@ from bus import publish_entity from db import write_mesh_message +from normalizers.mesh_node import snr_to_quality logger = logging.getLogger(__name__) @@ -92,7 +93,7 @@ async def _handle_position(data: dict, entity_id: str, sender_hex: str) -> None: "status": "active", "identity": {"node_id": sender_hex}, "tags": ["mesh_node"], - "signal_quality": _snr_to_quality(data.get("snr") or data.get("rxSnr")), + "signal_quality": snr_to_quality(data.get("snr") or data.get("rxSnr")), } await publish_entity(entity, ttl=_NODE_TTL, merge=True) @@ -204,10 +205,3 @@ def _channel_from_topic(topic: str) -> str: return "unknown" -def _snr_to_quality(snr) -> float | None: - if snr is None: - return None - try: - return max(0.0, min(1.0, (float(snr) + 20) / 30)) - except (TypeError, ValueError): - return None diff --git a/poller/pollers/meshcore.py b/poller/pollers/meshcore.py index 92425be..ea0e676 100644 --- a/poller/pollers/meshcore.py +++ b/poller/pollers/meshcore.py @@ -21,7 +21,7 @@ import websockets from bus import get_bus, publish_entity, set_feed -from normalizers.mesh_node import normalize_remoteterm_contact +from normalizers.mesh_node import normalize_remoteterm_contact, snr_to_quality from sanitize import sanitize_payload from .base import BasePoller @@ -192,7 +192,7 @@ async def _handle_ws_event(self, event: dict, base_url: str): # Ensure sender ID matches the store format node_b = f"mesh_node:{sender_id}" if not str(sender_id).startswith("mesh_node:") else str(sender_id) link_key = f"{base_url}:local->{node_b}" - + # Throttle real-time link updates to max once per 10s now = time.time() if now - self._last_link_update.get(link_key, 0) < 10: @@ -200,13 +200,24 @@ async def _handle_ws_event(self, event: dict, base_url: str): self._last_link_update[link_key] = now logger.debug("[meshcore] throttled link update from %s: snr=%s rssi=%s", sender_id, snr, rssi) - + + # Stamp signal_quality on the sending node's entity + await publish_entity({ + "entity_id": node_b, + "entity_type": "mesh_node", + "source": "meshcore", + "signal_quality": snr_to_quality(snr), + "identity": {}, + "tags": ["mesh_node"], + }, merge=True, record_observation=False) + + local_id = self._local_node_ids.get(base_url, "local") r = await get_bus() await r.publish("civic:updates", json.dumps(sanitize_payload({ "type": "mesh_links", "data": [{ "source_url": base_url, - "node_a": "local", + "node_a": local_id, "node_b": node_b, "snr": snr, "link_quality": None