From 23065dc3ff916802e22575fa09e636db4662daf3 Mon Sep 17 00:00:00 2001 From: Aldon Smith Date: Mon, 25 May 2026 11:24:17 -0400 Subject: [PATCH] feat: add mqtt read-only adapter foundation --- docs/LEARNING_LOG.md | 51 +++ services/ingestion/README.md | 44 ++ .../mqtt_read_only_adapter.py | 375 ++++++++++++++++++ .../tests/test_mqtt_read_only_adapter.py | 359 +++++++++++++++++ 4 files changed, 829 insertions(+) create mode 100644 services/ingestion/factory_ingestion/mqtt_read_only_adapter.py create mode 100644 services/ingestion/tests/test_mqtt_read_only_adapter.py diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index 17e63dc..94ef7ec 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -22,6 +22,57 @@ This file should be updated by Codex after each meaningful change. ### What to learn next ``` +## 2026-05-25 - MQTT Read-Only Adapter Foundation + +### What changed + +Added a read-only MQTT adapter foundation that loads enabled MQTT connection +profiles, consumes configured topic filters through an injected message source, +and emits normalized `process.measurement.recorded` FactoryEvents from +Sparkplug-style JSON demo payloads. + +### Why it matters + +This implements the next protocol adapter step after OPC-UA while preserving +the same safety boundary: configured source reads flow into FactoryEvents, but +the adapter does not publish commands, browse arbitrary topics, perform +writeback, or add UI ingestion controls. + +### How it works + +`load_enabled_mqtt_profiles()` reads the local connection profile store and +filters to enabled MQTT profiles. `run_mqtt_read_only_adapter()` passes each +profile to an `MqttMessageSource`, which reads messages from the profile's +configured topic filters. The adapter validates topic filters, parses +Sparkplug-style JSON with a `metrics` array, maps numeric metrics into process +measurement events, and records the profile ID, topic, metric name, adapter +name, and poll index in source metadata. Broker-unavailable, malformed payload, +and unmapped topic states raise clear adapter errors. + +### How to run it + +The foundation is currently exercised from Python code and tests. It reads the +same local profile store used by the connection profile API and writes to the +same event store interface used by ingestion. + +### How to test it + +```bash +.venv/bin/python -m pytest services/ingestion/tests/test_mqtt_read_only_adapter.py +``` + +### Key files + +- `services/ingestion/factory_ingestion/mqtt_read_only_adapter.py` +- `services/ingestion/tests/test_mqtt_read_only_adapter.py` +- `services/ingestion/README.md` + +### What to learn next + +Add a real broker-backed message source once connector runtime orchestration and +secret resolution are defined, then keep publish/writeback behavior out of that +runtime unless a dedicated ADR explicitly approves it. + ## 2026-05-24 - OPC-UA Read-Only Adapter Foundation ### What changed diff --git a/services/ingestion/README.md b/services/ingestion/README.md index 3cbbc24..5341c4f 100644 --- a/services/ingestion/README.md +++ b/services/ingestion/README.md @@ -228,6 +228,50 @@ The test suite covers fake reader behavior, bad node and unavailable server errors, FactoryEvent source metadata, and a practical integration test against the local simulator-backed OPC-UA server. +## MQTT Read-Only Adapter Foundation + +The read-only MQTT adapter foundation reads enabled MQTT +`ProtocolConnectionProfile` records from the local connection profile store, +uses an injected message source to consume configured topic filters, and emits +normalized `process.measurement.recorded` FactoryEvents into an event store. + +This foundation does not add a broker runtime, command publisher, arbitrary +topic discovery, writeback path, or UI ingestion controls. Until a connector ADR +explicitly expands the behavior, the adapter is limited to: + +- enabled profiles where `protocol = "mqtt"`; +- the configured broker URL in `endpoint`; +- the configured MQTT client ID, topic filters, QoS, TLS flag, and secret or + certificate references already present in the profile; +- read-only consumption from configured topic filters; +- Sparkplug-style JSON demo payloads with a non-empty `metrics` list. + +The first supported payload shape is a JSON object with optional `context` and a +`metrics` array. Each metric object must include `name` and a numeric `value`. +Optional metric fields include `signal_id`, `signal_name`, `unit`, `quality`, +and `context`. Context from the payload and metric is used to populate the +FactoryEvent context; adapter defaults fill any missing site, area, line, asset, +batch, work order, or unit values. + +The adapter captures source metadata in each emitted event: + +- `source.system` is `mqtt:`; +- `source.adapter` is `mqtt-read-only-adapter`; +- `source.source_event_id` includes the connection profile ID, topic, metric + name, and poll index; +- `payload.tag_name` stores `:`. + +Run the focused tests: + +```bash +.venv/bin/python -m pytest services/ingestion/tests/test_mqtt_read_only_adapter.py +``` + +The test suite covers fake MQTT messages, topic wildcard matching, profile +configuration access, Sparkplug-style JSON payload mapping, broker-unavailable +errors, malformed payload errors, unmapped topic errors, and the absence of a +publish/writeback surface. + ## Accepted Event Storage Accepted events are written to the local JSONL event store: diff --git a/services/ingestion/factory_ingestion/mqtt_read_only_adapter.py b/services/ingestion/factory_ingestion/mqtt_read_only_adapter.py new file mode 100644 index 0000000..5843f43 --- /dev/null +++ b/services/ingestion/factory_ingestion/mqtt_read_only_adapter.py @@ -0,0 +1,375 @@ +from __future__ import annotations + +import json +import re +from collections.abc import Sequence +from dataclasses import dataclass +from datetime import UTC, datetime +from pathlib import Path +from typing import Any, Literal, Protocol + +from factory_events import ( + EventContext, + EventEnvelope, + EventMetadata, + EventSource, + ProcessMeasurementPayload, + ProtocolConnectionProfile, + validate_connection_profile, +) + +from factory_ingestion.ingest import EventStore + +SENSITIVE_URL_CREDENTIAL_PATTERN = re.compile(r"(://)([^/@:]+):([^/@]+)@") + + +class MqttReadOnlyAdapterError(RuntimeError): + """Base error for read-only MQTT adapter failures.""" + + +class MqttConnectionProfilesStoreError(MqttReadOnlyAdapterError): + """Raised when the configured connection profile store cannot be read.""" + + +class MqttBrokerUnavailableError(MqttReadOnlyAdapterError): + """Raised when the configured MQTT broker cannot be reached.""" + + +class MqttMalformedPayloadError(MqttReadOnlyAdapterError): + """Raised when a subscribed MQTT payload cannot be normalized.""" + + +class MqttUnmappedTopicError(MqttReadOnlyAdapterError): + """Raised when a message topic is outside the configured topic filters.""" + + +@dataclass(frozen=True) +class MqttMessage: + topic: str + payload: bytes | str + qos: int | None = None + retained: bool = False + + +class MqttMessageSource(Protocol): + async def read_configured_messages( + self, + profile: ProtocolConnectionProfile, + ) -> Sequence[MqttMessage]: + """Read only messages from the topic filters configured on the MQTT profile.""" + + +@dataclass(frozen=True) +class MqttEventContextDefaults: + site_id: str = "unmapped_site" + area_id: str = "unmapped_area" + line_id: str = "unmapped_line" + asset_id: str | None = None + batch_id: str | None = None + work_order_id: str | None = None + unit: str = "unknown" + simulated: bool = False + + +@dataclass(frozen=True) +class MqttAdapterRunResult: + profiles_read: int + messages_read: int + emitted_events: int + events_store_path: Path | None + + +class UnconfiguredMqttMessageSource: + async def read_configured_messages( + self, + profile: ProtocolConnectionProfile, + ) -> Sequence[MqttMessage]: + msg = ( + f"no MQTT broker message source is configured for profile {profile.id} " + f"at {_redact_endpoint(profile.endpoint)}" + ) + raise MqttBrokerUnavailableError(msg) + + +def load_enabled_mqtt_profiles(profile_store_path: Path) -> list[ProtocolConnectionProfile]: + if not profile_store_path.exists(): + return [] + + try: + raw_profiles = json.loads(profile_store_path.read_text(encoding="utf-8")) + except json.JSONDecodeError as exc: + msg = f"connection profile store is not valid JSON: {profile_store_path}" + raise MqttConnectionProfilesStoreError(msg) from exc + + if not isinstance(raw_profiles, list): + msg = f"connection profile store must contain a list: {profile_store_path}" + raise MqttConnectionProfilesStoreError(msg) + + profiles: list[ProtocolConnectionProfile] = [] + for raw_profile in raw_profiles: + if not isinstance(raw_profile, dict): + msg = "connection profile store entries must be JSON objects" + raise MqttConnectionProfilesStoreError(msg) + profile = validate_connection_profile(raw_profile) + if profile.protocol == "mqtt" and profile.enabled: + profiles.append(profile) + return profiles + + +async def run_mqtt_read_only_adapter( + *, + profile_store_path: Path, + events_store: EventStore, + message_source: MqttMessageSource | None = None, + context_defaults: MqttEventContextDefaults | None = None, + poll_index: int = 0, + timestamp: datetime | None = None, +) -> MqttAdapterRunResult: + if poll_index < 0: + msg = "poll_index must be greater than or equal to 0" + raise ValueError(msg) + + profiles = load_enabled_mqtt_profiles(profile_store_path) + source = message_source or UnconfiguredMqttMessageSource() + defaults = context_defaults or MqttEventContextDefaults() + event_timestamp = timestamp or datetime.now(UTC) + messages_read = 0 + emitted_events = 0 + + for profile in profiles: + try: + messages = await source.read_configured_messages(profile) + except MqttReadOnlyAdapterError: + raise + except (ConnectionError, TimeoutError) as exc: + msg = ( + f"unable to read MQTT broker {_redact_endpoint(profile.endpoint)} " + f"for profile {profile.id}: {exc}" + ) + raise MqttBrokerUnavailableError(msg) from exc + + messages_read += len(messages) + for message in messages: + _ensure_configured_topic(profile, message.topic) + events = build_events_from_sparkplug_json( + profile, + message, + poll_index=poll_index, + timestamp=event_timestamp, + context_defaults=defaults, + ) + for event in events: + events_store.append(event) + emitted_events += 1 + + return MqttAdapterRunResult( + profiles_read=len(profiles), + messages_read=messages_read, + emitted_events=emitted_events, + events_store_path=getattr(events_store, "path", None), + ) + + +def build_events_from_sparkplug_json( + profile: ProtocolConnectionProfile, + message: MqttMessage, + *, + poll_index: int, + timestamp: datetime, + context_defaults: MqttEventContextDefaults | None = None, +) -> list[EventEnvelope]: + payload = _decode_payload(message) + metrics = payload.get("metrics") + if not isinstance(metrics, list) or not metrics: + msg = f"MQTT message on topic {message.topic} must contain a non-empty metrics list" + raise MqttMalformedPayloadError(msg) + + defaults = context_defaults or MqttEventContextDefaults() + events: list[EventEnvelope] = [] + for metric_index, raw_metric in enumerate(metrics): + if not isinstance(raw_metric, dict): + msg = f"MQTT metric at index {metric_index} on topic {message.topic} must be an object" + raise MqttMalformedPayloadError(msg) + events.append( + _event_from_metric( + profile, + message, + raw_metric, + metric_index=metric_index, + poll_index=poll_index, + timestamp=timestamp, + payload_context=payload.get("context"), + defaults=defaults, + ) + ) + return events + + +def topic_matches_filter(topic: str, topic_filter: str) -> bool: + topic_levels = topic.split("/") + filter_levels = topic_filter.split("/") + + for index, filter_level in enumerate(filter_levels): + if filter_level == "#": + return index == len(filter_levels) - 1 + if index >= len(topic_levels): + return False + if filter_level == "+": + continue + if filter_level != topic_levels[index]: + return False + return len(topic_levels) == len(filter_levels) + + +def _event_from_metric( + profile: ProtocolConnectionProfile, + message: MqttMessage, + raw_metric: dict[str, Any], + *, + metric_index: int, + poll_index: int, + timestamp: datetime, + payload_context: Any, + defaults: MqttEventContextDefaults, +) -> EventEnvelope: + metric_name = _required_string(raw_metric, "name", message.topic, metric_index) + metric_value = _numeric_metric_value(raw_metric.get("value"), message.topic, metric_name) + signal_id = _identifier_key(str(raw_metric.get("signal_id") or metric_name)) + metric_context = raw_metric.get("context") + context = _context(defaults, payload_context, metric_context) + event_id = ( + f"evt_mqtt_{_identifier_key(profile.id)}_{_identifier_key(message.topic)}_" + f"{signal_id}_{poll_index:04d}_{metric_index:02d}" + ) + return EventEnvelope( + event_id=event_id, + event_type="process.measurement.recorded", + schema_version="1.0.0", + timestamp=timestamp, + source=EventSource( + system=f"mqtt:{profile.id}", + adapter="mqtt-read-only-adapter", + source_event_id=( + f"mqtt:{profile.id}:{message.topic}:{metric_name}:poll:{poll_index:04d}" + ), + ), + context=context, + payload=ProcessMeasurementPayload( + signal_id=signal_id, + signal_name=str(raw_metric.get("signal_name") or _signal_name(signal_id)), + tag_name=f"{message.topic}:{metric_name}", + value=metric_value, + unit=str(raw_metric.get("unit") or defaults.unit), + quality=_quality(raw_metric.get("quality")), + ), + metadata=EventMetadata( + simulated=defaults.simulated, + trace_id=f"trace_mqtt_{_identifier_key(profile.id)}_{poll_index:04d}", + ), + ) + + +def _decode_payload(message: MqttMessage) -> dict[str, Any]: + try: + raw_payload = ( + message.payload.decode("utf-8") + if isinstance(message.payload, bytes) + else message.payload + ) + decoded = json.loads(raw_payload) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + msg = f"malformed MQTT JSON payload on topic {message.topic}: {exc}" + raise MqttMalformedPayloadError(msg) from exc + + if not isinstance(decoded, dict): + msg = f"MQTT payload on topic {message.topic} must be a JSON object" + raise MqttMalformedPayloadError(msg) + return decoded + + +def _ensure_configured_topic(profile: ProtocolConnectionProfile, topic: str) -> None: + if profile.mqtt is None: + msg = f"MQTT profile {profile.id} is missing its mqtt config block." + raise MqttUnmappedTopicError(msg) + if any( + topic_matches_filter(topic, topic_filter) + for topic_filter in profile.mqtt.topic_filters + ): + return + msg = ( + f"MQTT topic {topic} is not covered by configured topic filters " + f"for profile {profile.id}" + ) + raise MqttUnmappedTopicError(msg) + + +def _context( + defaults: MqttEventContextDefaults, + payload_context: Any, + metric_context: Any, +) -> EventContext: + context: dict[str, Any] = {} + if isinstance(payload_context, dict): + context.update(payload_context) + if isinstance(metric_context, dict): + context.update(metric_context) + return EventContext( + site_id=str(context.get("site_id") or defaults.site_id), + area_id=str(context.get("area_id") or defaults.area_id), + line_id=str(context.get("line_id") or defaults.line_id), + asset_id=_optional_str(context.get("asset_id"), defaults.asset_id), + batch_id=_optional_str(context.get("batch_id"), defaults.batch_id), + work_order_id=_optional_str(context.get("work_order_id"), defaults.work_order_id), + ) + + +def _required_string( + raw_metric: dict[str, Any], + field_name: str, + topic: str, + metric_index: int, +) -> str: + value = raw_metric.get(field_name) + if not isinstance(value, str) or not value: + msg = f"MQTT metric at index {metric_index} on topic {topic} must include {field_name}" + raise MqttMalformedPayloadError(msg) + return value + + +def _numeric_metric_value(value: Any, topic: str, metric_name: str) -> float: + if isinstance(value, bool) or not isinstance(value, int | float): + msg = ( + f"MQTT metric {metric_name} on topic {topic} returned a nonnumeric value " + "that cannot be normalized as a process measurement" + ) + raise MqttMalformedPayloadError(msg) + return float(value) + + +def _quality(value: Any) -> Literal["good", "uncertain", "bad"]: + if value is None: + return "good" + if value in ("good", "uncertain", "bad"): + return value + msg = f"MQTT metric quality must be one of good, uncertain, or bad: {value}" + raise MqttMalformedPayloadError(msg) + + +def _optional_str(value: Any, fallback: str | None) -> str | None: + if value is None: + return fallback + return str(value) + + +def _identifier_key(value: str) -> str: + lowered = value.lower() + normalized = re.sub(r"[^a-z0-9]+", "_", lowered).strip("_") + return normalized or "unknown" + + +def _signal_name(signal_id: str) -> str: + return " ".join(part.capitalize() for part in signal_id.split("_")) + + +def _redact_endpoint(endpoint: str) -> str: + return SENSITIVE_URL_CREDENTIAL_PATTERN.sub(r"\1@", endpoint) diff --git a/services/ingestion/tests/test_mqtt_read_only_adapter.py b/services/ingestion/tests/test_mqtt_read_only_adapter.py new file mode 100644 index 0000000..a75c37c --- /dev/null +++ b/services/ingestion/tests/test_mqtt_read_only_adapter.py @@ -0,0 +1,359 @@ +from __future__ import annotations + +import asyncio +import json +from datetime import UTC, datetime +from pathlib import Path + +import pytest +from factory_events import ProtocolConnectionProfile +from factory_ingestion.mqtt_read_only_adapter import ( + MqttAdapterRunResult, + MqttBrokerUnavailableError, + MqttEventContextDefaults, + MqttMalformedPayloadError, + MqttMessage, + MqttReadOnlyAdapterError, + MqttUnmappedTopicError, + build_events_from_sparkplug_json, + load_enabled_mqtt_profiles, + run_mqtt_read_only_adapter, + topic_matches_filter, +) +from factory_ingestion.storage import JsonlEventStore + +REPO_ROOT = Path(__file__).resolve().parents[3] +PROFILE_FIXTURES = REPO_ROOT / "packages" / "test-fixtures" / "valid-connection-profiles" + + +class RecordingMqttMessageSource: + def __init__( + self, + messages: list[MqttMessage], + *, + failure: Exception | None = None, + ) -> None: + self.messages = messages + self.failure = failure + self.calls: list[dict[str, object]] = [] + + async def read_configured_messages( + self, + profile: ProtocolConnectionProfile, + ) -> list[MqttMessage]: + assert profile.mqtt is not None + self.calls.append( + { + "endpoint": profile.endpoint, + "client_id": profile.mqtt.client_id, + "topic_filters": tuple(profile.mqtt.topic_filters), + "qos": profile.mqtt.qos, + "use_tls": profile.mqtt.use_tls, + "auth_secret_ref": profile.mqtt.auth_secret_ref.ref + if profile.mqtt.auth_secret_ref + else None, + "ca_certificate_ref": profile.mqtt.ca_certificate_ref.ref + if profile.mqtt.ca_certificate_ref + else None, + "client_certificate_ref": profile.mqtt.client_certificate_ref.ref + if profile.mqtt.client_certificate_ref + else None, + } + ) + if self.failure is not None: + raise self.failure + return self.messages + + +def _load_fixture(name: str) -> dict[str, object]: + return json.loads((PROFILE_FIXTURES / name).read_text(encoding="utf-8")) + + +def _mqtt_profile_data(**updates: object) -> dict[str, object]: + profile = _load_fixture("mqtt_connection_profile.json") + profile.update(updates) + return profile + + +def _write_profiles(path: Path, profiles: list[dict[str, object]]) -> None: + path.write_text(json.dumps(profiles), encoding="utf-8") + + +def _sparkplug_payload() -> str: + return json.dumps( + { + "timestamp": 1_797_000_000_000, + "context": { + "site_id": "site_demo", + "area_id": "area_packaging", + "line_id": "line_1", + "work_order_id": "wo_1001", + }, + "metrics": [ + { + "name": "filler_f_201.fill_weight", + "value": 501.4, + "unit": "g", + "quality": "good", + "context": {"asset_id": "filler_f_201"}, + }, + { + "name": "filler_f_201.nozzle_pressure", + "value": 2.2, + "unit": "bar", + "quality": "uncertain", + "context": {"asset_id": "filler_f_201"}, + }, + ], + } + ) + + +def test_load_enabled_mqtt_profiles_reads_only_enabled_mqtt_profiles(tmp_path: Path) -> None: + store_path = tmp_path / "connection_profiles.json" + disabled_mqtt = _mqtt_profile_data(id="disabled-mqtt", enabled=False) + enabled_opcua = _load_fixture("opcua_connection_profile.json") + enabled_mqtt = _mqtt_profile_data(id="enabled-mqtt") + _write_profiles(store_path, [disabled_mqtt, enabled_opcua, enabled_mqtt]) + + profiles = load_enabled_mqtt_profiles(store_path) + + assert [profile.id for profile in profiles] == ["enabled-mqtt"] + assert profiles[0].protocol == "mqtt" + + +def test_topic_matches_mqtt_wildcards() -> None: + assert topic_matches_filter( + "ofi/site_demo/packaging/line_1/filler_f_201/telemetry", + "ofi/site_demo/packaging/line_1/+/telemetry", + ) + assert topic_matches_filter( + "spBv1.0/OFI/DDATA/Line1/Filler", + "spBv1.0/OFI/DDATA/#", + ) + assert not topic_matches_filter( + "ofi/site_demo/packaging/line_2/filler_f_201/telemetry", + "ofi/site_demo/packaging/line_1/+/telemetry", + ) + + +def test_mqtt_adapter_reads_profiles_and_maps_sparkplug_json_events( + tmp_path: Path, +) -> None: + profile_store_path = tmp_path / "connection_profiles.json" + profile = _mqtt_profile_data( + endpoint="mqtts://operator:secret-password@broker.local:8883", + mqtt={ + **_load_fixture("mqtt_connection_profile.json")["mqtt"], + "topic_filters": ["spBv1.0/OFI/DDATA/#"], + }, + ) + _write_profiles(profile_store_path, [profile]) + events_store = JsonlEventStore(tmp_path / "events.jsonl") + source = RecordingMqttMessageSource( + [ + MqttMessage( + topic="spBv1.0/OFI/DDATA/Line1/Filler", + payload=_sparkplug_payload(), + qos=1, + ) + ] + ) + + result = asyncio.run( + run_mqtt_read_only_adapter( + profile_store_path=profile_store_path, + events_store=events_store, + message_source=source, + context_defaults=MqttEventContextDefaults( + site_id="fallback_site", + area_id="fallback_area", + line_id="fallback_line", + simulated=True, + ), + poll_index=5, + timestamp=datetime(2026, 5, 25, 10, 0, tzinfo=UTC), + ) + ) + + assert result == MqttAdapterRunResult( + profiles_read=1, + messages_read=1, + emitted_events=2, + events_store_path=events_store.path, + ) + assert source.calls == [ + { + "endpoint": "mqtts://operator:secret-password@broker.local:8883", + "client_id": "ofi-fip-packaging-readonly", + "topic_filters": ("spBv1.0/OFI/DDATA/#",), + "qos": 1, + "use_tls": True, + "auth_secret_ref": "local/mqtt/packaging/client-auth", + "ca_certificate_ref": "local/mqtt/packaging/ca-cert", + "client_certificate_ref": "local/mqtt/packaging/client-cert", + } + ] + + stored_events = events_store.list_events() + assert len(stored_events) == 2 + assert {event.event_type for event in stored_events} == {"process.measurement.recorded"} + assert {event.source.adapter for event in stored_events} == {"mqtt-read-only-adapter"} + assert {event.source.system for event in stored_events} == {"mqtt:conn_mqtt_packaging_uns"} + assert { + event.source.source_event_id for event in stored_events + } == { + "mqtt:conn_mqtt_packaging_uns:spBv1.0/OFI/DDATA/Line1/Filler:" + "filler_f_201.fill_weight:poll:0005", + "mqtt:conn_mqtt_packaging_uns:spBv1.0/OFI/DDATA/Line1/Filler:" + "filler_f_201.nozzle_pressure:poll:0005", + } + assert {event.payload.tag_name for event in stored_events} == { + "spBv1.0/OFI/DDATA/Line1/Filler:filler_f_201.fill_weight", + "spBv1.0/OFI/DDATA/Line1/Filler:filler_f_201.nozzle_pressure", + } + assert {event.context.site_id for event in stored_events} == {"site_demo"} + assert {event.context.asset_id for event in stored_events} == {"filler_f_201"} + assert {event.metadata.simulated for event in stored_events} == {True} + serialized = json.dumps([event.model_dump(mode="json") for event in stored_events]) + assert "secret-password" not in serialized + + +def test_sparkplug_mapping_uses_defaults_when_payload_context_is_absent() -> None: + profile = ProtocolConnectionProfile.model_validate(_mqtt_profile_data()) + message = MqttMessage( + topic="ofi/site_demo/packaging/line_1/filler_f_201/telemetry", + payload=json.dumps( + { + "metrics": [ + { + "name": "FillWeight", + "signal_id": "fill_weight", + "signal_name": "Fill Weight", + "value": 500.9, + } + ] + } + ), + ) + + events = build_events_from_sparkplug_json( + profile, + message, + poll_index=2, + timestamp=datetime(2026, 5, 25, 11, 0, tzinfo=UTC), + context_defaults=MqttEventContextDefaults( + site_id="site_default", + area_id="area_default", + line_id="line_default", + asset_id="asset_default", + unit="g", + ), + ) + + assert len(events) == 1 + event = events[0] + assert event.event_id == ( + "evt_mqtt_conn_mqtt_packaging_uns_" + "ofi_site_demo_packaging_line_1_filler_f_201_telemetry_fill_weight_0002_00" + ) + assert event.context.site_id == "site_default" + assert event.context.asset_id == "asset_default" + assert event.payload.signal_id == "fill_weight" + assert event.payload.signal_name == "Fill Weight" + assert event.payload.unit == "g" + assert event.payload.value == 500.9 + + +def test_mqtt_adapter_reports_unmapped_topic_clearly(tmp_path: Path) -> None: + profile_store_path = tmp_path / "connection_profiles.json" + _write_profiles(profile_store_path, [_mqtt_profile_data()]) + source = RecordingMqttMessageSource( + [MqttMessage(topic="unmapped/topic", payload=_sparkplug_payload())] + ) + + with pytest.raises(MqttUnmappedTopicError) as exc_info: + asyncio.run( + run_mqtt_read_only_adapter( + profile_store_path=profile_store_path, + events_store=JsonlEventStore(tmp_path / "events.jsonl"), + message_source=source, + ) + ) + + assert "unmapped/topic" in str(exc_info.value) + assert "configured topic filters" in str(exc_info.value) + + +@pytest.mark.parametrize( + "payload", + [ + "{bad-json", + json.dumps({"metrics": []}), + json.dumps({"metrics": [{"name": "fill_weight", "value": "bad"}]}), + json.dumps({"metrics": [{"name": "fill_weight", "value": 1.2, "quality": "offline"}]}), + ], +) +def test_mqtt_adapter_reports_malformed_payload_clearly( + tmp_path: Path, + payload: str, +) -> None: + profile_store_path = tmp_path / "connection_profiles.json" + profile = _mqtt_profile_data( + mqtt={ + **_load_fixture("mqtt_connection_profile.json")["mqtt"], + "topic_filters": ["spBv1.0/OFI/DDATA/#"], + } + ) + _write_profiles(profile_store_path, [profile]) + source = RecordingMqttMessageSource( + [MqttMessage(topic="spBv1.0/OFI/DDATA/Line1/Filler", payload=payload)] + ) + + with pytest.raises(MqttMalformedPayloadError) as exc_info: + asyncio.run( + run_mqtt_read_only_adapter( + profile_store_path=profile_store_path, + events_store=JsonlEventStore(tmp_path / "events.jsonl"), + message_source=source, + ) + ) + + assert "MQTT" in str(exc_info.value) + + +def test_mqtt_adapter_reports_broker_unavailable_with_redacted_endpoint( + tmp_path: Path, +) -> None: + profile_store_path = tmp_path / "connection_profiles.json" + profile = _mqtt_profile_data(endpoint="mqtts://operator:secret-password@offline.local:8883") + _write_profiles(profile_store_path, [profile]) + source = RecordingMqttMessageSource([], failure=ConnectionError("connection refused")) + + with pytest.raises(MqttReadOnlyAdapterError) as exc_info: + asyncio.run( + run_mqtt_read_only_adapter( + profile_store_path=profile_store_path, + events_store=JsonlEventStore(tmp_path / "events.jsonl"), + message_source=source, + ) + ) + + assert isinstance(exc_info.value, MqttBrokerUnavailableError) + assert "offline.local" in str(exc_info.value) + assert "secret-password" not in str(exc_info.value) + assert "" in str(exc_info.value) + + +def test_mqtt_adapter_has_no_publish_or_writeback_surface() -> None: + adapter_source = ( + REPO_ROOT + / "services" + / "ingestion" + / "factory_ingestion" + / "mqtt_read_only_adapter.py" + ).read_text(encoding="utf-8") + + assert "def publish" not in adapter_source + assert ".publish(" not in adapter_source + assert "writeback" not in adapter_source.lower()