diff --git a/docs/LEARNING_LOG.md b/docs/LEARNING_LOG.md index 5faab8f..17e63dc 100644 --- a/docs/LEARNING_LOG.md +++ b/docs/LEARNING_LOG.md @@ -22,6 +22,54 @@ This file should be updated by Codex after each meaningful change. ### What to learn next ``` +## 2026-05-24 - OPC-UA Read-Only Adapter Foundation + +### What changed + +Added a read-only OPC-UA adapter foundation that loads enabled OPC-UA connection +profiles, reads only the configured node IDs, and emits normalized +`process.measurement.recorded` FactoryEvents. + +### Why it matters + +This is the first protocol adapter step after connection profile management. It +proves the safe data path from configured OPC-UA source reads into the existing +FactoryEvent ingestion boundary without adding browse-all-tags behavior, +industrial writeback, or UI ingestion controls. + +### How it works + +`load_enabled_opcua_profiles()` reads the local connection profile store and +filters to enabled OPC-UA profiles. `run_opcua_read_only_adapter()` passes each +profile to an `OpcUaNodeReader`, which is constrained to the profile's +configured `opcua.node_ids`. Each numeric node value becomes a +`process.measurement.recorded` event whose source metadata includes the profile +ID, node ID, and poll index. Unavailable endpoints and bad configured nodes +raise clear adapter errors. + +### How to run it + +The foundation is currently exercised from Python code and tests. It reads the +same local profile store used by the connection profile API and writes to the +same event store interface used by ingestion. 
+ +### How to test it + +```bash +.venv/bin/python -m pytest services/ingestion/tests/test_opcua_read_only_adapter.py +``` + +### Key files + +- `services/ingestion/factory_ingestion/opcua_read_only_adapter.py` +- `services/ingestion/tests/test_opcua_read_only_adapter.py` +- `services/ingestion/README.md` + +### What to learn next + +Add explicit tag/source mapping lookup so `mapping_reference` can provide site, +line, asset, unit, and signal metadata instead of adapter defaults. + ## 2026-05-24 - Connections Workbench page ### What changed diff --git a/services/ingestion/README.md b/services/ingestion/README.md index ca23878..3cbbc24 100644 --- a/services/ingestion/README.md +++ b/services/ingestion/README.md @@ -187,6 +187,47 @@ demo_boundary: simulator-backed demo OPC UA source; not a production connector If the OPC UA server is unavailable, the worker exits with a readable error that includes the endpoint and the Docker Compose startup command. +## OPC-UA Read-Only Adapter Foundation + +The read-only OPC-UA adapter foundation reads enabled OPC-UA +`ProtocolConnectionProfile` records from the local connection profile store, +connects to each configured endpoint, reads only the node IDs listed in the +profile, and emits normalized `process.measurement.recorded` FactoryEvents into +an event store. + +This is the first production-oriented adapter foundation. It is not a +browse-all-tags tool, writeback path, PLC/DCS/SCADA control surface, or full +source-to-FactoryEvent mapping engine. Until a connector ADR explicitly expands +the behavior, the adapter is limited to: + +- enabled profiles where `protocol = "opcua"`; +- `profile.opcua.node_ids` as the complete read list; +- read-only connect and configured-node read operations; +- numeric node values that can be normalized as process measurements. 
+
+The adapter captures source metadata in each emitted event:
+
+- `source.system` is `opcua:<connection-profile-id>`;
+- `source.adapter` is `opcua-read-only-adapter`;
+- `source.source_event_id` includes the connection profile ID, configured node
+  ID, and poll index;
+- `payload.tag_name` stores the configured OPC-UA node ID.
+
+The current foundation accepts default event context values because the
+connection profile contract only carries `mapping_reference`, not the mapping
+document itself. A future tag/source mapping issue should replace those defaults
+with an explicit mapping lookup.
+
+Run the focused tests:
+
+```bash
+.venv/bin/python -m pytest services/ingestion/tests/test_opcua_read_only_adapter.py
+```
+
+The test suite covers fake reader behavior, bad node and unavailable server
+errors, FactoryEvent source metadata, and a practical integration test against
+the local simulator-backed OPC-UA server.
+
 ## Accepted Event Storage
 
 Accepted events are written to the local JSONL event store:
diff --git a/services/ingestion/factory_ingestion/opcua_read_only_adapter.py b/services/ingestion/factory_ingestion/opcua_read_only_adapter.py
new file mode 100644
index 0000000..9fbf289
--- /dev/null
+++ b/services/ingestion/factory_ingestion/opcua_read_only_adapter.py
@@ -0,0 +1,367 @@
+from __future__ import annotations
+
+import json
+import math
+import re
+from collections.abc import Sequence
+from dataclasses import dataclass
+from datetime import UTC, datetime
+from pathlib import Path
+from typing import Any, Literal, Protocol
+
+from factory_events import (
+    EventContext,
+    EventEnvelope,
+    EventMetadata,
+    EventSource,
+    ProcessMeasurementPayload,
+    ProtocolConnectionProfile,
+    validate_connection_profile,
+)
+
+from factory_ingestion.ingest import EventStore
+
+# Matches the "user:password@" userinfo of a URL so the credentials can be
+# dropped before the text is placed in an error message.
+SENSITIVE_URL_CREDENTIAL_PATTERN = re.compile(r"(://)([^/@:]+):([^/@]+)@")
+
+
+class OpcUaReadOnlyAdapterError(RuntimeError):
+    """Base error for read-only OPC-UA adapter failures."""
+
+
+class OpcUaConnectionProfilesStoreError(OpcUaReadOnlyAdapterError):
+    """Raised when the configured connection profile store cannot be read."""
+
+
+class OpcUaServerUnavailableError(OpcUaReadOnlyAdapterError):
+    """Raised when an OPC-UA endpoint cannot be reached."""
+
+
+class OpcUaConfiguredNodeReadError(OpcUaReadOnlyAdapterError):
+    """Raised when a configured OPC-UA node cannot be read or normalized."""
+
+
+@dataclass(frozen=True)
+class OpcUaNodeValue:
+    """A single value read from one configured OPC-UA node."""
+
+    node_id: str
+    value: float | int
+    quality: Literal["good", "uncertain", "bad"] = "good"
+
+
+class OpcUaNodeReader(Protocol):
+    """Structural interface implemented by real and fake node readers."""
+
+    async def read_configured_nodes(
+        self,
+        profile: ProtocolConnectionProfile,
+    ) -> Sequence[OpcUaNodeValue]:
+        """Read only the node IDs configured on the supplied OPC-UA profile."""
+
+
+@dataclass(frozen=True)
+class OpcUaEventContextDefaults:
+    """Context, unit, and simulated-flag values applied to every emitted event."""
+
+    site_id: str = "unmapped_site"
+    area_id: str = "unmapped_area"
+    line_id: str = "unmapped_line"
+    asset_id: str | None = None
+    batch_id: str | None = None
+    work_order_id: str | None = None
+    unit: str = "unknown"
+    simulated: bool = False
+
+
+@dataclass(frozen=True)
+class OpcUaAdapterRunResult:
+    """Counters describing one `run_opcua_read_only_adapter` call."""
+
+    profiles_read: int
+    nodes_read: int
+    emitted_events: int
+    # `path` attribute of the event store, or None when the store has none.
+    events_store_path: Path | None
+
+
+class AsyncuaOpcUaNodeReader:
+    """`OpcUaNodeReader` implementation backed by the `asyncua` client."""
+
+    async def read_configured_nodes(
+        self,
+        profile: ProtocolConnectionProfile,
+    ) -> Sequence[OpcUaNodeValue]:
+        """Connect to the profile endpoint and read each configured node.
+
+        Raises:
+            OpcUaConfiguredNodeReadError: the profile has no `opcua` block, a
+                configured node cannot be read, or a value is not a finite
+                number.
+            OpcUaServerUnavailableError: any other failure while connecting
+                to or talking to the endpoint.
+        """
+        if profile.opcua is None:
+            msg = f"OPC-UA profile {profile.id} is missing its opcua config block."
+            raise OpcUaConfiguredNodeReadError(msg)
+
+        # Function-scope import: asyncua is only needed by this reader.
+        from asyncua import Client
+
+        try:
+            async with Client(profile.endpoint) as client:
+                values: list[OpcUaNodeValue] = []
+                for node_id in profile.opcua.node_ids:
+                    try:
+                        value = await client.get_node(node_id).read_value()
+                    except Exception as exc:
+                        # Redact the library's text too, in case it echoes the URL.
+                        msg = (
+                            f"failed to read configured OPC-UA node {node_id} "
+                            f"for profile {profile.id}: {_redact_endpoint(str(exc))}"
+                        )
+                        raise OpcUaConfiguredNodeReadError(msg) from exc
+                    values.append(_node_value(profile, node_id, value))
+                return values
+        except OpcUaConfiguredNodeReadError:
+            raise
+        except Exception as exc:
+            # Anything that is not a node read failure is reported as the
+            # endpoint being unavailable; credentials are redacted first.
+            msg = (
+                f"unable to connect to OPC-UA endpoint {_redact_endpoint(profile.endpoint)} "
+                f"for profile {profile.id}: {_redact_endpoint(str(exc))}"
+            )
+            raise OpcUaServerUnavailableError(msg) from exc
+
+
+def load_enabled_opcua_profiles(profile_store_path: Path) -> list[ProtocolConnectionProfile]:
+    """Return the enabled OPC-UA profiles found in the JSON profile store.
+
+    A store file that does not exist yields an empty list. Every entry is
+    passed through `validate_connection_profile` before filtering, so whatever
+    that function raises for an invalid entry propagates to the caller.
+
+    Raises:
+        OpcUaConnectionProfilesStoreError: the store cannot be read or decoded,
+            is not valid JSON, is not a list, or contains a non-object entry.
+    """
+    try:
+        store_text = profile_store_path.read_text(encoding="utf-8")
+    except (FileNotFoundError, NotADirectoryError):
+        # A store that has not been created yet simply holds no profiles.
+        return []
+    except (OSError, UnicodeDecodeError) as exc:
+        msg = f"connection profile store cannot be read: {profile_store_path}"
+        raise OpcUaConnectionProfilesStoreError(msg) from exc
+
+    try:
+        raw_profiles = json.loads(store_text)
+    except json.JSONDecodeError as exc:
+        msg = f"connection profile store is not valid JSON: {profile_store_path}"
+        raise OpcUaConnectionProfilesStoreError(msg) from exc
+
+    if not isinstance(raw_profiles, list):
+        msg = f"connection profile store must contain a list: {profile_store_path}"
+        raise OpcUaConnectionProfilesStoreError(msg)
+
+    profiles: list[ProtocolConnectionProfile] = []
+    for raw_profile in raw_profiles:
+        if not isinstance(raw_profile, dict):
+            msg = "connection profile store entries must be JSON objects"
+            raise OpcUaConnectionProfilesStoreError(msg)
+        profile = validate_connection_profile(raw_profile)
+        if profile.protocol == "opcua" and profile.enabled:
+            profiles.append(profile)
+    return profiles
+
+
+async def run_opcua_read_only_adapter(
+    *,
+    profile_store_path: Path,
+    events_store: EventStore,
+    reader: OpcUaNodeReader | None = None,
+    context_defaults: OpcUaEventContextDefaults | None = None,
+    poll_index: int = 0,
+    timestamp: datetime | None = None,
+) -> OpcUaAdapterRunResult:
+    """Read every enabled OPC-UA profile once and store one event per node value.
+
+    Args:
+        profile_store_path: JSON connection profile store to load.
+        events_store: receives each event through `append`.
+        reader: node reader to use; defaults to `AsyncuaOpcUaNodeReader`.
+        context_defaults: event context values; defaults to
+            `OpcUaEventContextDefaults()`.
+        poll_index: non-negative index embedded in event and trace IDs.
+        timestamp: timestamp for all events; defaults to the current UTC time.
+
+    Returns:
+        Counts of profiles read, nodes read, and events emitted, plus the
+        store's `path` attribute when it has one.
+
+    Raises:
+        ValueError: `poll_index` is negative.
+        OpcUaReadOnlyAdapterError: raised by the profile loader or the default
+            reader. Events appended before the failure stay in the store.
+    """
+    if poll_index < 0:
+        msg = "poll_index must be greater than or equal to 0"
+        raise ValueError(msg)
+
+    profiles = load_enabled_opcua_profiles(profile_store_path)
+    # `is None` rather than `or`, so a caller-supplied falsy object is still used.
+    node_reader = AsyncuaOpcUaNodeReader() if reader is None else reader
+    defaults = OpcUaEventContextDefaults() if context_defaults is None else context_defaults
+    event_timestamp = datetime.now(UTC) if timestamp is None else timestamp
+    nodes_read = 0
+    emitted_events = 0
+
+    for profile in profiles:
+        node_values = await node_reader.read_configured_nodes(profile)
+        nodes_read += len(node_values)
+        for node_value in node_values:
+            event = build_process_measurement_event(
+                profile,
+                node_value,
+                poll_index=poll_index,
+                timestamp=event_timestamp,
+                context_defaults=defaults,
+            )
+            events_store.append(event)
+            emitted_events += 1
+
+    return OpcUaAdapterRunResult(
+        profiles_read=len(profiles),
+        nodes_read=nodes_read,
+        emitted_events=emitted_events,
+        events_store_path=getattr(events_store, "path", None),
+    )
+
+
+def build_process_measurement_event(
+    profile: ProtocolConnectionProfile,
+    node_value: OpcUaNodeValue,
+    *,
+    poll_index: int,
+    timestamp: datetime,
+    context_defaults: OpcUaEventContextDefaults | None = None,
+) -> EventEnvelope:
+    """Build a `process.measurement.recorded` event for one node value.
+
+    The event ID, signal ID, and signal name are derived from the profile
+    ID and the node ID; the unmodified node ID is kept in
+    `payload.tag_name` and `source.source_event_id`.
+
+    Raises:
+        OpcUaConfiguredNodeReadError: the node value is a bool, is not an
+            int or float, or is not finite.
+    """
+    defaults = OpcUaEventContextDefaults() if context_defaults is None else context_defaults
+    node_key = _node_key(node_value.node_id)
+    event_id = f"evt_opcua_{_identifier_key(profile.id)}_{node_key}_{poll_index:04d}"
+    return EventEnvelope(
+        event_id=event_id,
+        event_type="process.measurement.recorded",
+        schema_version="1.0.0",
+        timestamp=timestamp,
+        source=EventSource(
+            system=f"opcua:{profile.id}",
+            adapter="opcua-read-only-adapter",
+            source_event_id=f"opcua:{profile.id}:{node_value.node_id}:poll:{poll_index:04d}",
+        ),
+        context=EventContext(
+            site_id=defaults.site_id,
+            area_id=defaults.area_id,
+            line_id=defaults.line_id,
+            asset_id=defaults.asset_id,
+            batch_id=defaults.batch_id,
+            work_order_id=defaults.work_order_id,
+        ),
+        payload=ProcessMeasurementPayload(
+            signal_id=node_key,
+            signal_name=_signal_name(node_key),
+            tag_name=node_value.node_id,
+            value=_numeric_node_value(profile, node_value.node_id, node_value.value),
+            unit=defaults.unit,
+            quality=node_value.quality,
+        ),
+        metadata=EventMetadata(
+            simulated=defaults.simulated,
+            trace_id=f"trace_opcua_{_identifier_key(profile.id)}_{poll_index:04d}",
+        ),
+    )
+
+
+def _node_value(
+    profile: ProtocolConnectionProfile,
+    node_id: str,
+    value: Any,
+) -> OpcUaNodeValue:
+    """Wrap a raw read result in `OpcUaNodeValue` after numeric validation."""
+    return OpcUaNodeValue(node_id=node_id, value=_numeric_node_value(profile, node_id, value))
+
+
+def _numeric_node_value(
+    profile: ProtocolConnectionProfile,
+    node_id: str,
+    value: Any,
+) -> float:
+    """Return `value` as a finite float or raise a node read error.
+
+    Raises:
+        OpcUaConfiguredNodeReadError: `value` is a bool, is not an int or
+            float, or converts to NaN or an infinity.
+    """
+    # bool is a subclass of int, so it has to be excluded explicitly.
+    if isinstance(value, bool) or not isinstance(value, int | float):
+        msg = (
+            f"configured OPC-UA node {node_id} for profile {profile.id} returned "
+            f"a nonnumeric value that cannot be normalized as a process measurement"
+        )
+        raise OpcUaConfiguredNodeReadError(msg)
+    numeric = float(value)
+    # NaN and infinities are not valid JSON numbers (RFC 8259); reject them
+    # here rather than emit an event that cannot be serialized portably.
+    if not math.isfinite(numeric):
+        msg = (
+            f"configured OPC-UA node {node_id} for profile {profile.id} returned "
+            "a non-finite value that cannot be normalized as a process measurement"
+        )
+        raise OpcUaConfiguredNodeReadError(msg)
+    return numeric
+
+
+def _node_key(node_id: str) -> str:
+    """Derive an identifier-safe key from the identifier part of a node ID.
+
+    Example: "ns=2;s=Line1.Filler1.FillWeight" -> "line1_filler1_fillweight".
+    """
+    if ";s=" in node_id:
+        node_id = node_id.rsplit(";s=", maxsplit=1)[1]
+    elif "=" in node_id:
+        node_id = node_id.rsplit("=", maxsplit=1)[1]
+    return _identifier_key(node_id)
+
+
+def _identifier_key(value: str) -> str:
+    """Lowercase `value` and collapse non-alphanumeric runs into underscores.
+
+    Returns "unknown_node" when nothing alphanumeric remains.
+    """
+    lowered = value.lower()
+    normalized = re.sub(r"[^a-z0-9]+", "_", lowered).strip("_")
+    return normalized or "unknown_node"
+
+
+def _signal_name(signal_id: str) -> str:
+    """Turn a snake_case signal ID into a capitalized, space-separated name."""
+    return " ".join(part.capitalize() for part in signal_id.split("_"))
+
+
+def _redact_endpoint(endpoint: str) -> str:
+    """Strip any "user:password" URL userinfo found in `endpoint`.
+
+    Example: "opc.tcp://u:p@host:4840" -> "opc.tcp://@host:4840".
+    """
+    return SENSITIVE_URL_CREDENTIAL_PATTERN.sub(r"\1@", endpoint)
diff --git a/services/ingestion/tests/test_opcua_read_only_adapter.py b/services/ingestion/tests/test_opcua_read_only_adapter.py
new file mode 100644
index 0000000..7ee03c0
--- /dev/null
+++ b/services/ingestion/tests/test_opcua_read_only_adapter.py
@@ -0,0 +1,328 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import socket
+from datetime import UTC, datetime
+from pathlib import Path
+
+import pytest
+from factory_events import ProtocolConnectionProfile, validate_connection_profile
+from factory_ingestion.opcua_read_only_adapter import (
+    OpcUaAdapterRunResult,
+    OpcUaConfiguredNodeReadError,
+    OpcUaEventContextDefaults,
+    OpcUaNodeValue,
+    OpcUaReadOnlyAdapterError,
+    OpcUaServerUnavailableError,
+    build_process_measurement_event,
+    load_enabled_opcua_profiles,
+    run_opcua_read_only_adapter,
+)
+from factory_ingestion.storage import JsonlEventStore
+from factory_simulator.opcua_demo import (
+    DEMO_AREA_ID,
+    DEMO_LINE_ID,
+    DEMO_OPC_UA_NODES,
+    DEMO_SITE_ID,
+)
+from factory_simulator.opcua_server import run_server
+
+# Three directory levels above this file's directory; the fixture path below
+# is resolved relative to it.
+REPO_ROOT = Path(__file__).resolve().parents[3]
+PROFILE_FIXTURES = REPO_ROOT / "packages" / "test-fixtures" / "valid-connection-profiles"
+
+
+class RecordingOpcUaReader:
+    """Fake node reader that records each call and returns canned values.
+
+    When `failure` is given, every read raises it after the call is recorded.
+    """
+
+    def __init__(
+        self,
+        values: dict[str, float],
+        *,
+        failure: Exception | None = None,
+    ) -> None:
+        """Store the node-ID-to-value mapping and the optional failure."""
+        self.values = values
+        self.failure = failure
+        # One (endpoint, node_ids) tuple per read_configured_nodes call.
+        self.calls: list[tuple[str, tuple[str, ...]]] = []
+
+    async def read_configured_nodes(
+        self,
+        profile: ProtocolConnectionProfile,
+    ) -> list[OpcUaNodeValue]:
+        """Record the call, then raise the failure or return canned values."""
+        assert profile.opcua is not None
+        node_ids = tuple(profile.opcua.node_ids)
+        self.calls.append((profile.endpoint, node_ids))
+        if self.failure is not None:
+            raise self.failure
+        return [
+            OpcUaNodeValue(node_id=node_id, value=self.values[node_id])
+            for node_id in profile.opcua.node_ids
+        ]
+
+
+def _free_port() -> int:
+    """Return a port number the OS assigned to a temporary loopback socket.
+
+    The socket is closed before returning, so the port is not reserved.
+    """
+    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
+        sock.bind(("127.0.0.1", 0))
+        return int(sock.getsockname()[1])
+
+
+def _load_fixture(name: str) -> dict[str, object]:
+    """Load one JSON fixture file from the connection profile fixtures."""
+    return json.loads((PROFILE_FIXTURES / name).read_text(encoding="utf-8"))
+
+
+def _opcua_profile_data(**updates: object) -> dict[str, object]:
+    """Return the OPC-UA fixture profile with top-level keys overridden."""
+    profile = _load_fixture("opcua_connection_profile.json")
+    profile.update(updates)
+    return profile
+
+
+def _write_profiles(path: Path, profiles: list[dict[str, object]]) -> None:
+    """Write the profiles to `path` as a JSON list."""
+    path.write_text(json.dumps(profiles), encoding="utf-8")
+
+
+def test_load_enabled_opcua_profiles_reads_only_enabled_opcua_profiles(tmp_path: Path) -> None:
+    """Disabled OPC-UA and non-OPC-UA profiles are filtered out of the result."""
+    store_path = tmp_path / "connection_profiles.json"
+    disabled_opcua = _opcua_profile_data(id="disabled-opcua", enabled=False)
+    enabled_mqtt = _load_fixture("mqtt_connection_profile.json")
+    enabled_opcua = _opcua_profile_data(id="enabled-opcua")
+    _write_profiles(store_path, [disabled_opcua, enabled_mqtt, enabled_opcua])
+
+    profiles = load_enabled_opcua_profiles(store_path)
+
+    assert [profile.id for profile in profiles] == ["enabled-opcua"]
+    assert profiles[0].protocol == "opcua"
+
+
+def test_read_only_adapter_reads_configured_nodes_and_emits_factory_events(
+    tmp_path: Path,
+) -> None:
+    """A run with a fake reader stores one event per configured node.
+
+    Also checks that the endpoint password does not appear in the stored
+    events.
+    """
+    node_ids = [
+        "ns=2;s=Packaging.Line1.Filler1.FillWeight",
+        "ns=2;s=Packaging.Line1.Filler1.Temperature",
+    ]
+    profile = _opcua_profile_data(
+        endpoint="opc.tcp://operator:secret-password@127.0.0.1:4840/ofi/demo",
+        opcua={**_load_fixture("opcua_connection_profile.json")["opcua"], "node_ids": node_ids},
+    )
+    profile_store_path = tmp_path / "connection_profiles.json"
+    _write_profiles(profile_store_path, [profile])
+    events_store = JsonlEventStore(tmp_path / "events.jsonl")
+    reader = RecordingOpcUaReader(
+        {
+            "ns=2;s=Packaging.Line1.Filler1.FillWeight": 501.2,
+            "ns=2;s=Packaging.Line1.Filler1.Temperature": 72.4,
+        }
+    )
+
+    result = asyncio.run(
+        run_opcua_read_only_adapter(
+            profile_store_path=profile_store_path,
+            events_store=events_store,
+            reader=reader,
+            context_defaults=OpcUaEventContextDefaults(
+                site_id="site_demo",
+                area_id="area_packaging",
+                line_id="line_1",
+                asset_id="asset_filler_1",
+                unit="engineering_unit",
+                simulated=True,
+            ),
+            poll_index=7,
+            timestamp=datetime(2026, 5, 24, 14, 0, tzinfo=UTC),
+        )
+    )
+
+    assert result == OpcUaAdapterRunResult(
+        profiles_read=1,
+        nodes_read=2,
+        emitted_events=2,
+        events_store_path=events_store.path,
+    )
+    # The reader was asked exactly once, for exactly the configured node IDs.
+    assert reader.calls == [(str(profile["endpoint"]), tuple(node_ids))]
+    stored_events = events_store.list_events()
+    assert len(stored_events) == 2
+    assert {event.event_type for event in stored_events} == {"process.measurement.recorded"}
+    assert {event.source.adapter for event in stored_events} == {"opcua-read-only-adapter"}
+    assert {
+        event.source.source_event_id for event in stored_events
+    } == {
+        f"opcua:{profile['id']}:{node_ids[0]}:poll:0007",
+        f"opcua:{profile['id']}:{node_ids[1]}:poll:0007",
+    }
+    assert {event.payload.tag_name for event in stored_events} == set(node_ids)
+    assert {event.context.site_id for event in stored_events} == {"site_demo"}
+    assert {event.metadata.simulated for event in stored_events} == {True}
+    serialized = json.dumps([event.model_dump(mode="json") for event in stored_events])
+    assert "secret-password" not in serialized
+
+
+def test_process_measurement_event_captures_node_id_and_source_metadata() -> None:
+    """The built event carries the derived IDs and the unmodified node ID."""
+    profile = validate_connection_profile(_opcua_profile_data())
+
+    event = build_process_measurement_event(
+        profile,
+        OpcUaNodeValue(node_id="ns=2;s=Line1.Filler1.FillWeight", value=500.8),
+        poll_index=3,
+        timestamp=datetime(2026, 5, 24, 14, 30, tzinfo=UTC),
+        context_defaults=OpcUaEventContextDefaults(
+            site_id="site",
+            area_id="area",
+            line_id="line",
+            unit="g",
+        ),
+    )
+
+    assert event.event_id == "evt_opcua_conn_opcua_filler_1_line1_filler1_fillweight_0003"
+    assert event.source.system == "opcua:conn_opcua_filler_1"
+    assert event.source.adapter == "opcua-read-only-adapter"
+    assert event.source.source_event_id == (
+        "opcua:conn_opcua_filler_1:ns=2;s=Line1.Filler1.FillWeight:poll:0003"
+    )
+    assert event.payload.signal_id == "line1_filler1_fillweight"
+    assert event.payload.tag_name == "ns=2;s=Line1.Filler1.FillWeight"
+    assert event.payload.value == 500.8
+    assert event.payload.unit == "g"
+
+
+def test_adapter_reports_unavailable_server_with_profile_and_endpoint(
+    tmp_path: Path,
+) -> None:
+    """A server-unavailable error from the reader propagates out of the run."""
+    profile_store_path = tmp_path / "connection_profiles.json"
+    profile = _opcua_profile_data(
+        endpoint="opc.tcp://operator:secret-password@unavailable.local:4840/ofi"
+    )
+    _write_profiles(profile_store_path, [profile])
+    # The fake raises a hand-written, already redacted message, so this checks
+    # propagation through the adapter rather than the real reader's redaction.
+    reader = RecordingOpcUaReader(
+        {},
+        failure=OpcUaServerUnavailableError(
+            "unable to connect to OPC-UA endpoint opc.tcp://@unavailable.local:4840/ofi "
+            "for profile conn_opcua_filler_1"
+        ),
+    )
+
+    with pytest.raises(OpcUaReadOnlyAdapterError) as exc_info:
+        asyncio.run(
+            run_opcua_read_only_adapter(
+                profile_store_path=profile_store_path,
+                events_store=JsonlEventStore(tmp_path / "events.jsonl"),
+                reader=reader,
+            )
+        )
+
+    assert "unable to connect to OPC-UA endpoint" in str(exc_info.value)
+    assert "conn_opcua_filler_1" in str(exc_info.value)
+    assert "secret-password" not in str(exc_info.value)
+
+
+def test_adapter_reports_bad_configured_node_without_browsing(
+    tmp_path: Path,
+) -> None:
+    """A node read error from the reader propagates with the node ID in it."""
+    profile_store_path = tmp_path / "connection_profiles.json"
+    bad_node = "ns=2;s=Missing.Node"
+    profile = _opcua_profile_data(
+        opcua={**_load_fixture("opcua_connection_profile.json")["opcua"], "node_ids": [bad_node]}
+    )
+    _write_profiles(profile_store_path, [profile])
+    reader = RecordingOpcUaReader(
+        {},
+        failure=OpcUaConfiguredNodeReadError(
+            f"failed to read configured OPC-UA node {bad_node} for profile conn_opcua_filler_1"
+        ),
+    )
+
+    with pytest.raises(OpcUaConfiguredNodeReadError) as exc_info:
+        asyncio.run(
+            run_opcua_read_only_adapter(
+                profile_store_path=profile_store_path,
+                events_store=JsonlEventStore(tmp_path / "events.jsonl"),
+                reader=reader,
+            )
+        )
+
+    assert bad_node in str(exc_info.value)
+    assert "configured OPC-UA node" in str(exc_info.value)
+
+
+def test_adapter_rejects_nonnumeric_node_value_clearly() -> None:
+    """Building an event from a bool node value raises a node read error."""
+    profile = validate_connection_profile(_opcua_profile_data())
+
+    with pytest.raises(OpcUaConfiguredNodeReadError) as exc_info:
+        build_process_measurement_event(
+            profile,
+            OpcUaNodeValue(node_id="ns=2;s=Line1.State", value=True),
+            poll_index=0,
+            timestamp=datetime(2026, 5, 24, 14, 30, tzinfo=UTC),
+        )
+
+    assert "nonnumeric value" in str(exc_info.value)
+
+
+def test_read_only_adapter_reads_local_demo_opcua_server(tmp_path: Path) -> None:
+    """Run the adapter with its default reader against the demo OPC-UA server.
+
+    Skipped when `asyncua` is not installed.
+    """
+    pytest.importorskip("asyncua")
+
+    async def exercise_adapter() -> None:
+        """Start the demo server, run the adapter once, then stop the server."""
+        port = _free_port()
+        ready = asyncio.Event()
+        stop = asyncio.Event()
+        server_task = asyncio.create_task(
+            run_server(
+                host="127.0.0.1",
+                port=port,
+                endpoint_path="/ofi/adapter-test",
+                ready_event=ready,
+                stop_event=stop,
+            )
+        )
+        # Only the first demo node whose ID ends in "FillWeight" is configured.
+        node_ids = [
+            f"ns=2;s={node.node_id}"
+            for node in DEMO_OPC_UA_NODES
+            if node.node_id.endswith("FillWeight")
+        ][:1]
+        profile = _opcua_profile_data(
+            endpoint=f"opc.tcp://127.0.0.1:{port}/ofi/adapter-test",
+            opcua={
+                **_load_fixture("opcua_connection_profile.json")["opcua"],
+                "node_ids": node_ids,
+                "security_mode": "none",
+                "security_policy": None,
+                "auth_secret_ref": None,
+                "client_certificate_ref": None,
+                "trusted_server_certificate_ref": None,
+            },
+        )
+        profile_store_path = tmp_path / "connection_profiles.json"
+        _write_profiles(profile_store_path, [profile])
+        events_store = JsonlEventStore(tmp_path / "events.jsonl")
+
+        try:
+            # Wait for the server to signal readiness before connecting.
+            await asyncio.wait_for(ready.wait(), timeout=5)
+            result = await run_opcua_read_only_adapter(
+                profile_store_path=profile_store_path,
+                events_store=events_store,
+                context_defaults=OpcUaEventContextDefaults(
+                    site_id=DEMO_SITE_ID,
+                    area_id=DEMO_AREA_ID,
+                    line_id=DEMO_LINE_ID,
+                    asset_id="filler_f_201",
+                    unit="g",
+                    simulated=True,
+                ),
+                poll_index=1,
+                timestamp=datetime(2026, 5, 24, 15, 0, tzinfo=UTC),
+            )
+        finally:
+            # Always stop the server task, even when the adapter run failed.
+            stop.set()
+            await asyncio.wait_for(server_task, timeout=5)
+
+        assert result.profiles_read == 1
+        assert result.nodes_read == 1
+        assert result.emitted_events == 1
+        stored_events = events_store.list_events()
+        assert len(stored_events) == 1
+        event = stored_events[0]
+        assert event.payload.tag_name == node_ids[0]
+        assert event.payload.value == 500.12
+        assert event.source.source_event_id == (
+            f"opcua:conn_opcua_filler_1:{node_ids[0]}:poll:0001"
+        )
+        assert event.context.site_id == DEMO_SITE_ID
+
+    asyncio.run(exercise_adapter())