From 96bab909b15131fcc8e465386afe919ee5af112f Mon Sep 17 00:00:00 2001 From: Christian-Manuel Butzke Date: Wed, 1 Jul 2026 12:24:01 +0900 Subject: [PATCH] =?UTF-8?q?engine:=20Observer=20adapter=20=E2=80=94=20pass?= =?UTF-8?q?ive=20per-step=20hook=20+=20built-ins=20(SPEC=20=C2=A78)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Host accepts an optional observer callback, invoked once per completed RTC step (auto and manual) with { instance, event, transition, entered, exited, published, spawned, faulted }. The per-step record construction is factored into one helper shared by run_to_quiescence and step, so both paths notify identically; the observer is purely passive. Add built-in JsonlObserver (a drop-in transition log) and CollectingObserver, export them, and document usage. Closes #28. --- README.md | 11 +++++ src/harel/__init__.py | 4 ++ src/harel/engine.py | 53 +++++++++++++----------- src/harel/observer.py | 46 +++++++++++++++++++++ tests/test_observer.py | 93 ++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 184 insertions(+), 23 deletions(-) create mode 100644 src/harel/observer.py create mode 100644 tests/test_observer.py diff --git a/README.md b/README.md index cd923bd..0001d2a 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,17 @@ The public surface is everything exported from the top-level `harel` package `load_definitions` / `load_definition`, `validate` / `collect_errors`, and the error types. See [`tests/test_library_api.py`](tests/test_library_api.py). +### Observing transitions (SPEC §8) +Pass an **observer** — a passive callback invoked once per RTC step (automatic *or* +manual) with `{ instance, event, transition, entered, exited, published, spawned, +faulted }`. Built-ins: `JsonlObserver(stream)` (a drop-in transition log) and +`CollectingObserver` (records to a list). + +```python +import sys, harel +host = harel.Host(observer=harel.JsonlObserver(sys.stdout)) # one JSON line per step +``` + ## Layout - `src/harel/` — the package. - `tests/` — unit tests and the conformance harness. diff --git a/src/harel/__init__.py b/src/harel/__init__.py index e7b3760..3d6ce20 100644 --- a/src/harel/__init__.py +++ b/src/harel/__init__.py @@ -14,6 +14,7 @@ from .errors import ErrorRecord, HarelError, SchemaError, ValidationError from .instance import Event, Instance, Status from .model import Machine, State +from .observer import CollectingObserver, JsonlObserver, Observer from .validator import collect_errors, validate __all__ = [ @@ -24,6 +25,9 @@ "Host", "Instance", "Machine", + "Observer", + "JsonlObserver", + "CollectingObserver", "SchemaError", "State", "Status", diff --git a/src/harel/engine.py b/src/harel/engine.py index e5bfa92..7cc378f 100644 --- a/src/harel/engine.py +++ b/src/harel/engine.py @@ -9,22 +9,25 @@ from __future__ import annotations -from typing import Any, cast +from typing import Any from . import cel, values from .definition import Definition from .instance import DELIVERABLE_RESERVED_EVENTS, Event, Instance, Status from .model import Machine +from .observer import Observer class Host: - def __init__(self) -> None: + def __init__(self, observer: Observer | None = None) -> None: self.machines: dict[str, Machine] = {} self.versions: dict[tuple[str, int], Machine] = {} self.instances: dict[str, Instance] = {} self.published: list[str] = [] # event names handed to the bus, in order self.spawned: list[str] = [] # child defIds, in order self._spawn_counters: dict[str, int] = {} + # Passive per-step observer (SPEC §8); None = no-op. + self.observer: Observer | None = observer self.now: int = 0 # virtual clock, in milliseconds (SPEC §5.9) self.mode: str = "auto" # processing mode, auto|manual (SPEC §14) self._seq: int = 0 @@ -105,27 +108,32 @@ def step(self, instance: Instance | str, n: int = 1) -> list[dict[str, Any]]: for _ in range(n): if inst.status is not Status.ACTIVE or not inst.queue: break - ev = inst.queue.popleft() - before = set(inst.active_leaf_names()) - pub_before = len(self.published) - sp_before = len(self.spawned) - inst._last_target = None # noqa: SLF001 - inst.step(ev) - after = set(inst.active_leaf_names()) - faulted = cast(Status, inst.status) is Status.FAULTED - records.append( - { - "event": ev.type, - "transition": inst._last_target, # noqa: SLF001 - "entered": sorted(after - before), - "exited": sorted(before - after), - "published": list(self.published[pub_before:]), - "spawned": list(self.spawned[sp_before:]), - "faulted": faulted, - } - ) + records.append(self._run_one_step(inst)) return records + def _run_one_step(self, inst: Instance) -> dict[str, Any]: + """Dequeue and process one event; build the per-step record and notify the + observer (SPEC §8/§14). The caller guarantees ``inst.queue`` is non-empty.""" + ev = inst.queue.popleft() + before = set(inst.active_leaf_names()) + pub_before = len(self.published) + sp_before = len(self.spawned) + inst._last_target = None # noqa: SLF001 + inst.step(ev) + after = set(inst.active_leaf_names()) + record = { + "event": ev.type, + "transition": inst._last_target, # noqa: SLF001 + "entered": sorted(after - before), + "exited": sorted(before - after), + "published": list(self.published[pub_before:]), + "spawned": list(self.spawned[sp_before:]), + "faulted": inst.status is Status.FAULTED, + } + if self.observer is not None: + self.observer({"instance": inst.id, **record}) + return record + def inspect(self, instance: Instance | str) -> dict[str, Any]: """Full internal state for debugging, beyond ``state`` (SPEC §14).""" inst = instance if isinstance(instance, Instance) else self.instances[instance] @@ -183,8 +191,7 @@ def run_to_quiescence(self) -> None: if inst.status is not Status.ACTIVE: continue while inst.queue: - ev = inst.queue.popleft() - inst.step(ev) + self._run_one_step(inst) progress = True # --- snapshot round-trip (SPEC §8) ------------------------------------- diff --git a/src/harel/observer.py b/src/harel/observer.py new file mode 100644 index 0000000..0ef3d6a --- /dev/null +++ b/src/harel/observer.py @@ -0,0 +1,46 @@ +"""Observer adapter (SPEC §8): a passive per-step callback. + +When a :class:`~harel.engine.Host` is given an observer, it is invoked once per +completed RTC step — for both automatic (run-to-quiescence) and manual (``step``) +processing — with a record:: + + { instance, event, transition, entered, exited, published, spawned, faulted } + +An observer is purely observational: it MUST NOT mutate engine state or influence +dispatch. It is the spec-native mechanism for transition logging and live +visualization, distinct from host-language diagnostic logging (``logging``). +""" + +from __future__ import annotations + +import json +from collections.abc import Callable +from typing import Any, TextIO + +# An observer is any callable taking one per-step record. +Observer = Callable[[dict[str, Any]], None] + + +class JsonlObserver: + """Write one JSON record per line — a drop-in transition log. + + >>> import sys + >>> host = Host(observer=JsonlObserver(sys.stdout)) # doctest: +SKIP + """ + + def __init__(self, stream: TextIO) -> None: + self._stream = stream + + def __call__(self, record: dict[str, Any]) -> None: + self._stream.write(json.dumps(record) + "\n") + self._stream.flush() + + +class CollectingObserver: + """Collect records into ``.records`` — handy for tests and inspection.""" + + def __init__(self) -> None: + self.records: list[dict[str, Any]] = [] + + def __call__(self, record: dict[str, Any]) -> None: + self.records.append(record) diff --git a/tests/test_observer.py b/tests/test_observer.py new file mode 100644 index 0000000..aa18f7f --- /dev/null +++ b/tests/test_observer.py @@ -0,0 +1,93 @@ +"""Observer adapter (SPEC §8): passive per-step callback.""" + +from __future__ import annotations + +import io +import json + +import harel +from harel import CollectingObserver, Host, JsonlObserver + +TURNSTILE = """\ +id: turnstile +events: + coin: { payload: { amount: { type: int, required: true } } } + push: {} +top: + esvs: + fare: { type: int, init: 50 } + initial: { transition_to: locked } + states: + locked: + on_events: + coin: { transition_to: unlocked, guard: "event.payload.amount >= fare" } + unlocked: + on_events: + push: { transition_to: locked } +""" + + +def _host(observer=None) -> Host: + host = Host(observer=observer) + host.register_all(harel.load_definitions(TURNSTILE)) + host.create_root(host.machines["turnstile"], "t1") + host.run_to_quiescence() + return host + + +def test_observer_fires_on_auto_processing() -> None: + obs = CollectingObserver() + host = _host(obs) + obs.records.clear() # ignore initial-transition churn; focus on the send + host.deliver("t1", "coin", {"amount": 100}) + host.run_to_quiescence() + assert len(obs.records) == 1 + rec = obs.records[0] + assert rec["instance"] == "t1" + assert rec["event"] == "coin" + assert rec["entered"] == ["unlocked"] + assert rec["exited"] == ["locked"] + assert rec["faulted"] is False + assert set(rec) == { + "instance", "event", "transition", "entered", "exited", + "published", "spawned", "faulted", + } + + +def test_observer_fires_on_manual_step() -> None: + obs = CollectingObserver() + host = _host(obs) + obs.records.clear() + host.inject("t1", "coin", {"amount": 100}) # enqueue, do not process + assert obs.records == [] # nothing fired yet + host.step("t1") # one manual RTC step + assert len(obs.records) == 1 + assert obs.records[0]["entered"] == ["unlocked"] + + +def test_observer_none_is_noop() -> None: + host = _host(None) # must not raise + host.deliver("t1", "coin", {"amount": 100}) + host.run_to_quiescence() + assert host.instances["t1"].active_leaf_names() == ["unlocked"] + + +def test_observer_is_passive() -> None: + """An observer that ignores its record must not change engine behavior.""" + a = _host(CollectingObserver()) + b = _host(None) + a.deliver("t1", "coin", {"amount": 100}) + a.run_to_quiescence() + b.deliver("t1", "coin", {"amount": 100}) + b.run_to_quiescence() + assert a.instances["t1"].active_leaf_names() == b.instances["t1"].active_leaf_names() + + +def test_jsonl_observer_writes_records() -> None: + buf = io.StringIO() + host = _host(JsonlObserver(buf)) + host.deliver("t1", "coin", {"amount": 100}) + host.run_to_quiescence() + lines = [json.loads(x) for x in buf.getvalue().splitlines() if x.strip()] + assert lines[-1]["event"] == "coin" + assert lines[-1]["entered"] == ["unlocked"]