Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions docs/LEARNING_LOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,54 @@ This file should be updated by Codex after each meaningful change.
### What to learn next
```

## 2026-05-24 - OPC-UA Read-Only Adapter Foundation

### What changed

Added a read-only OPC-UA adapter foundation that loads enabled OPC-UA connection
profiles, reads only the configured node IDs, and emits normalized
`process.measurement.recorded` FactoryEvents.

### Why it matters

This is the first protocol adapter step after connection profile management. It
proves the safe data path from configured OPC-UA source reads into the existing
FactoryEvent ingestion boundary without adding browse-all-tags behavior,
industrial writeback, or UI ingestion controls.

### How it works

`load_enabled_opcua_profiles()` reads the local connection profile store and
filters to enabled OPC-UA profiles. `run_opcua_read_only_adapter()` passes each
profile to an `OpcUaNodeReader`, which is constrained to the profile's
configured `opcua.node_ids`. Each numeric node value becomes a
`process.measurement.recorded` event whose source metadata includes the profile
ID, node ID, and poll index. Unavailable endpoints and bad configured nodes
raise clear adapter errors.

### How to run it

The foundation is currently exercised from Python code and tests. It reads the
same local profile store used by the connection profile API and writes to the
same event store interface used by ingestion.

### How to test it

```bash
.venv/bin/python -m pytest services/ingestion/tests/test_opcua_read_only_adapter.py
```

### Key files

- `services/ingestion/factory_ingestion/opcua_read_only_adapter.py`
- `services/ingestion/tests/test_opcua_read_only_adapter.py`
- `services/ingestion/README.md`

### What to learn next

Add explicit tag/source mapping lookup so `mapping_reference` can provide site,
line, asset, unit, and signal metadata instead of adapter defaults.

## 2026-05-24 - Connections Workbench page

### What changed
Expand Down
41 changes: 41 additions & 0 deletions services/ingestion/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,47 @@ demo_boundary: simulator-backed demo OPC UA source; not a production connector
If the OPC UA server is unavailable, the worker exits with a readable error
that includes the endpoint and the Docker Compose startup command.

## OPC-UA Read-Only Adapter Foundation

The read-only OPC-UA adapter foundation reads enabled OPC-UA
`ProtocolConnectionProfile` records from the local connection profile store,
connects to each configured endpoint, reads only the node IDs listed in the
profile, and emits normalized `process.measurement.recorded` FactoryEvents into
an event store.

This is the first production-oriented adapter foundation. It is not a
browse-all-tags tool, writeback path, PLC/DCS/SCADA control surface, or full
source-to-FactoryEvent mapping engine. Until a connector ADR explicitly expands
the behavior, the adapter is limited to:

- enabled profiles where `protocol = "opcua"`;
- `profile.opcua.node_ids` as the complete read list;
- read-only connect and configured-node read operations;
- numeric node values that can be normalized as process measurements.

The adapter captures source metadata in each emitted event:

- `source.system` is `opcua:<connection-profile-id>`;
- `source.adapter` is `opcua-read-only-adapter`;
- `source.source_event_id` includes the connection profile ID, configured node
ID, and poll index;
- `payload.tag_name` stores the configured OPC-UA node ID.

The current foundation accepts default event context values because the
connection profile contract only carries `mapping_reference`, not the mapping
document itself. A future tag/source mapping issue should replace those defaults
with an explicit mapping lookup.

Run the focused tests:

```bash
.venv/bin/python -m pytest services/ingestion/tests/test_opcua_read_only_adapter.py
```

The test suite covers fake reader behavior, bad node and unavailable server
errors, FactoryEvent source metadata, and a practical integration test against
the local simulator-backed OPC-UA server.

## Accepted Event Storage

Accepted events are written to the local JSONL event store:
Expand Down
264 changes: 264 additions & 0 deletions services/ingestion/factory_ingestion/opcua_read_only_adapter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,264 @@
from __future__ import annotations

import json
import re
from collections.abc import Sequence
from dataclasses import dataclass
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal, Protocol

from factory_events import (
EventContext,
EventEnvelope,
EventMetadata,
EventSource,
ProcessMeasurementPayload,
ProtocolConnectionProfile,
validate_connection_profile,
)

from factory_ingestion.ingest import EventStore

SENSITIVE_URL_CREDENTIAL_PATTERN = re.compile(r"(://)([^/@:]+):([^/@]+)@")


class OpcUaReadOnlyAdapterError(RuntimeError):
"""Base error for read-only OPC-UA adapter failures."""


class OpcUaConnectionProfilesStoreError(OpcUaReadOnlyAdapterError):
"""Raised when the configured connection profile store cannot be read."""


class OpcUaServerUnavailableError(OpcUaReadOnlyAdapterError):
"""Raised when an OPC-UA endpoint cannot be reached."""


class OpcUaConfiguredNodeReadError(OpcUaReadOnlyAdapterError):
"""Raised when a configured OPC-UA node cannot be read or normalized."""


@dataclass(frozen=True)
class OpcUaNodeValue:
node_id: str
value: float | int
quality: Literal["good", "uncertain", "bad"] = "good"


class OpcUaNodeReader(Protocol):
async def read_configured_nodes(
self,
profile: ProtocolConnectionProfile,
) -> Sequence[OpcUaNodeValue]:
"""Read only the node IDs configured on the supplied OPC-UA profile."""


@dataclass(frozen=True)
class OpcUaEventContextDefaults:
site_id: str = "unmapped_site"
area_id: str = "unmapped_area"
line_id: str = "unmapped_line"
asset_id: str | None = None
batch_id: str | None = None
work_order_id: str | None = None
unit: str = "unknown"
simulated: bool = False


@dataclass(frozen=True)
class OpcUaAdapterRunResult:
profiles_read: int
nodes_read: int
emitted_events: int
events_store_path: Path | None


class AsyncuaOpcUaNodeReader:
async def read_configured_nodes(
self,
profile: ProtocolConnectionProfile,
) -> Sequence[OpcUaNodeValue]:
if profile.opcua is None:
msg = f"OPC-UA profile {profile.id} is missing its opcua config block."
raise OpcUaConfiguredNodeReadError(msg)

from asyncua import Client

try:
async with Client(profile.endpoint) as client:
values: list[OpcUaNodeValue] = []
for node_id in profile.opcua.node_ids:
try:
value = await client.get_node(node_id).read_value()
except Exception as exc:
msg = (
f"failed to read configured OPC-UA node {node_id} "
f"for profile {profile.id}: {exc}"
)
raise OpcUaConfiguredNodeReadError(msg) from exc
values.append(_node_value(profile, node_id, value))
return values
except OpcUaConfiguredNodeReadError:
raise
except Exception as exc:
msg = (
f"unable to connect to OPC-UA endpoint {_redact_endpoint(profile.endpoint)} "
f"for profile {profile.id}: {exc}"
)
raise OpcUaServerUnavailableError(msg) from exc


def load_enabled_opcua_profiles(profile_store_path: Path) -> list[ProtocolConnectionProfile]:
if not profile_store_path.exists():
return []

try:
raw_profiles = json.loads(profile_store_path.read_text(encoding="utf-8"))
except json.JSONDecodeError as exc:
msg = f"connection profile store is not valid JSON: {profile_store_path}"
raise OpcUaConnectionProfilesStoreError(msg) from exc

if not isinstance(raw_profiles, list):
msg = f"connection profile store must contain a list: {profile_store_path}"
raise OpcUaConnectionProfilesStoreError(msg)

profiles: list[ProtocolConnectionProfile] = []
for raw_profile in raw_profiles:
if not isinstance(raw_profile, dict):
msg = "connection profile store entries must be JSON objects"
raise OpcUaConnectionProfilesStoreError(msg)
profile = validate_connection_profile(raw_profile)
if profile.protocol == "opcua" and profile.enabled:
profiles.append(profile)
return profiles


async def run_opcua_read_only_adapter(
*,
profile_store_path: Path,
events_store: EventStore,
reader: OpcUaNodeReader | None = None,
context_defaults: OpcUaEventContextDefaults | None = None,
poll_index: int = 0,
timestamp: datetime | None = None,
) -> OpcUaAdapterRunResult:
if poll_index < 0:
msg = "poll_index must be greater than or equal to 0"
raise ValueError(msg)

profiles = load_enabled_opcua_profiles(profile_store_path)
node_reader = reader or AsyncuaOpcUaNodeReader()
defaults = context_defaults or OpcUaEventContextDefaults()
event_timestamp = timestamp or datetime.now(UTC)
nodes_read = 0
emitted_events = 0

for profile in profiles:
node_values = await node_reader.read_configured_nodes(profile)
nodes_read += len(node_values)
for node_value in node_values:
event = build_process_measurement_event(
profile,
node_value,
poll_index=poll_index,
timestamp=event_timestamp,
context_defaults=defaults,
)
events_store.append(event)
emitted_events += 1

return OpcUaAdapterRunResult(
profiles_read=len(profiles),
nodes_read=nodes_read,
emitted_events=emitted_events,
events_store_path=getattr(events_store, "path", None),
)


def build_process_measurement_event(
profile: ProtocolConnectionProfile,
node_value: OpcUaNodeValue,
*,
poll_index: int,
timestamp: datetime,
context_defaults: OpcUaEventContextDefaults | None = None,
) -> EventEnvelope:
defaults = context_defaults or OpcUaEventContextDefaults()
node_key = _node_key(node_value.node_id)
event_id = f"evt_opcua_{_identifier_key(profile.id)}_{node_key}_{poll_index:04d}"
return EventEnvelope(
event_id=event_id,
event_type="process.measurement.recorded",
schema_version="1.0.0",
timestamp=timestamp,
source=EventSource(
system=f"opcua:{profile.id}",
adapter="opcua-read-only-adapter",
source_event_id=f"opcua:{profile.id}:{node_value.node_id}:poll:{poll_index:04d}",
),
context=EventContext(
site_id=defaults.site_id,
area_id=defaults.area_id,
line_id=defaults.line_id,
asset_id=defaults.asset_id,
batch_id=defaults.batch_id,
work_order_id=defaults.work_order_id,
),
payload=ProcessMeasurementPayload(
signal_id=node_key,
signal_name=_signal_name(node_key),
tag_name=node_value.node_id,
value=_numeric_node_value(profile, node_value.node_id, node_value.value),
unit=defaults.unit,
quality=node_value.quality,
),
metadata=EventMetadata(
simulated=defaults.simulated,
trace_id=f"trace_opcua_{_identifier_key(profile.id)}_{poll_index:04d}",
),
)


def _node_value(
profile: ProtocolConnectionProfile,
node_id: str,
value: Any,
) -> OpcUaNodeValue:
return OpcUaNodeValue(node_id=node_id, value=_numeric_node_value(profile, node_id, value))


def _numeric_node_value(
profile: ProtocolConnectionProfile,
node_id: str,
value: Any,
) -> float:
if isinstance(value, bool) or not isinstance(value, int | float):
msg = (
f"configured OPC-UA node {node_id} for profile {profile.id} returned "
f"a nonnumeric value that cannot be normalized as a process measurement"
)
raise OpcUaConfiguredNodeReadError(msg)
return float(value)


def _node_key(node_id: str) -> str:
if ";s=" in node_id:
node_id = node_id.rsplit(";s=", maxsplit=1)[1]
elif "=" in node_id:
node_id = node_id.rsplit("=", maxsplit=1)[1]
return _identifier_key(node_id)


def _identifier_key(value: str) -> str:
lowered = value.lower()
normalized = re.sub(r"[^a-z0-9]+", "_", lowered).strip("_")
return normalized or "unknown_node"


def _signal_name(signal_id: str) -> str:
return " ".join(part.capitalize() for part in signal_id.split("_"))


def _redact_endpoint(endpoint: str) -> str:
return SENSITIVE_URL_CREDENTIAL_PATTERN.sub(r"\1<redacted>@", endpoint)
Loading
Loading