Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,17 @@ The public surface is everything exported from the top-level `harel` package
`load_definitions` / `load_definition`, `validate` / `collect_errors`, and the
error types. See [`tests/test_library_api.py`](tests/test_library_api.py).

### Observing transitions (SPEC §8)
Pass an **observer** — a passive callback invoked once per RTC step (automatic *or*
manual) with `{ instance, event, transition, entered, exited, published, spawned,
faulted }`. Built-ins: `JsonlObserver(stream)` (a drop-in transition log) and
`CollectingObserver` (records to a list).

```python
import sys, harel
host = harel.Host(observer=harel.JsonlObserver(sys.stdout)) # one JSON line per step
```

## Layout
- `src/harel/` — the package.
- `tests/` — unit tests and the conformance harness.
Expand Down
4 changes: 4 additions & 0 deletions src/harel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
from .errors import ErrorRecord, HarelError, SchemaError, ValidationError
from .instance import Event, Instance, Status
from .model import Machine, State
from .observer import CollectingObserver, JsonlObserver, Observer
from .validator import collect_errors, validate

__all__ = [
Expand All @@ -24,6 +25,9 @@
"Host",
"Instance",
"Machine",
"Observer",
"JsonlObserver",
"CollectingObserver",
"SchemaError",
"State",
"Status",
Expand Down
53 changes: 30 additions & 23 deletions src/harel/engine.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,25 @@

from __future__ import annotations

from typing import Any, cast
from typing import Any

from . import cel, values
from .definition import Definition
from .instance import DELIVERABLE_RESERVED_EVENTS, Event, Instance, Status
from .model import Machine
from .observer import Observer


class Host:
def __init__(self) -> None:
def __init__(self, observer: Observer | None = None) -> None:
self.machines: dict[str, Machine] = {}
self.versions: dict[tuple[str, int], Machine] = {}
self.instances: dict[str, Instance] = {}
self.published: list[str] = [] # event names handed to the bus, in order
self.spawned: list[str] = [] # child defIds, in order
self._spawn_counters: dict[str, int] = {}
# Passive per-step observer (SPEC §8); None = no-op.
self.observer: Observer | None = observer
self.now: int = 0 # virtual clock, in milliseconds (SPEC §5.9)
self.mode: str = "auto" # processing mode, auto|manual (SPEC §14)
self._seq: int = 0
Expand Down Expand Up @@ -105,27 +108,32 @@ def step(self, instance: Instance | str, n: int = 1) -> list[dict[str, Any]]:
for _ in range(n):
if inst.status is not Status.ACTIVE or not inst.queue:
break
ev = inst.queue.popleft()
before = set(inst.active_leaf_names())
pub_before = len(self.published)
sp_before = len(self.spawned)
inst._last_target = None # noqa: SLF001
inst.step(ev)
after = set(inst.active_leaf_names())
faulted = cast(Status, inst.status) is Status.FAULTED
records.append(
{
"event": ev.type,
"transition": inst._last_target, # noqa: SLF001
"entered": sorted(after - before),
"exited": sorted(before - after),
"published": list(self.published[pub_before:]),
"spawned": list(self.spawned[sp_before:]),
"faulted": faulted,
}
)
records.append(self._run_one_step(inst))
return records

def _run_one_step(self, inst: Instance) -> dict[str, Any]:
"""Dequeue and process one event; build the per-step record and notify the
observer (SPEC §8/§14). The caller guarantees ``inst.queue`` is non-empty."""
ev = inst.queue.popleft()
before = set(inst.active_leaf_names())
pub_before = len(self.published)
sp_before = len(self.spawned)
inst._last_target = None # noqa: SLF001
inst.step(ev)
after = set(inst.active_leaf_names())
record = {
"event": ev.type,
"transition": inst._last_target, # noqa: SLF001
"entered": sorted(after - before),
"exited": sorted(before - after),
"published": list(self.published[pub_before:]),
"spawned": list(self.spawned[sp_before:]),
"faulted": inst.status is Status.FAULTED,
}
if self.observer is not None:
self.observer({"instance": inst.id, **record})
return record

def inspect(self, instance: Instance | str) -> dict[str, Any]:
"""Full internal state for debugging, beyond ``state`` (SPEC §14)."""
inst = instance if isinstance(instance, Instance) else self.instances[instance]
Expand Down Expand Up @@ -183,8 +191,7 @@ def run_to_quiescence(self) -> None:
if inst.status is not Status.ACTIVE:
continue
while inst.queue:
ev = inst.queue.popleft()
inst.step(ev)
self._run_one_step(inst)
progress = True

# --- snapshot round-trip (SPEC §8) -------------------------------------
Expand Down
46 changes: 46 additions & 0 deletions src/harel/observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
"""Observer adapter (SPEC §8): a passive per-step callback.

When a :class:`~harel.engine.Host` is given an observer, it is invoked once per
completed RTC step — for both automatic (run-to-quiescence) and manual (``step``)
processing — with a record::

{ instance, event, transition, entered, exited, published, spawned, faulted }

An observer is purely observational: it MUST NOT mutate engine state or influence
dispatch. It is the spec-native mechanism for transition logging and live
visualization, distinct from host-language diagnostic logging (``logging``).
"""

from __future__ import annotations

import json
from collections.abc import Callable
from typing import Any, TextIO

# An observer is any callable taking one per-step record.
Observer = Callable[[dict[str, Any]], None]


class JsonlObserver:
"""Write one JSON record per line — a drop-in transition log.

>>> import sys
>>> host = Host(observer=JsonlObserver(sys.stdout)) # doctest: +SKIP
"""

def __init__(self, stream: TextIO) -> None:
self._stream = stream

def __call__(self, record: dict[str, Any]) -> None:
self._stream.write(json.dumps(record) + "\n")
self._stream.flush()


class CollectingObserver:
"""Collect records into ``.records`` — handy for tests and inspection."""

def __init__(self) -> None:
self.records: list[dict[str, Any]] = []

def __call__(self, record: dict[str, Any]) -> None:
self.records.append(record)
93 changes: 93 additions & 0 deletions tests/test_observer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Observer adapter (SPEC §8): passive per-step callback."""

from __future__ import annotations

import io
import json

import harel
from harel import CollectingObserver, Host, JsonlObserver

TURNSTILE = """\
id: turnstile
events:
coin: { payload: { amount: { type: int, required: true } } }
push: {}
top:
esvs:
fare: { type: int, init: 50 }
initial: { transition_to: locked }
states:
locked:
on_events:
coin: { transition_to: unlocked, guard: "event.payload.amount >= fare" }
unlocked:
on_events:
push: { transition_to: locked }
"""


def _host(observer=None) -> Host:
host = Host(observer=observer)
host.register_all(harel.load_definitions(TURNSTILE))
host.create_root(host.machines["turnstile"], "t1")
host.run_to_quiescence()
return host


def test_observer_fires_on_auto_processing() -> None:
obs = CollectingObserver()
host = _host(obs)
obs.records.clear() # ignore initial-transition churn; focus on the send
host.deliver("t1", "coin", {"amount": 100})
host.run_to_quiescence()
assert len(obs.records) == 1
rec = obs.records[0]
assert rec["instance"] == "t1"
assert rec["event"] == "coin"
assert rec["entered"] == ["unlocked"]
assert rec["exited"] == ["locked"]
assert rec["faulted"] is False
assert set(rec) == {
"instance", "event", "transition", "entered", "exited",
"published", "spawned", "faulted",
}


def test_observer_fires_on_manual_step() -> None:
obs = CollectingObserver()
host = _host(obs)
obs.records.clear()
host.inject("t1", "coin", {"amount": 100}) # enqueue, do not process
assert obs.records == [] # nothing fired yet
host.step("t1") # one manual RTC step
assert len(obs.records) == 1
assert obs.records[0]["entered"] == ["unlocked"]


def test_observer_none_is_noop() -> None:
host = _host(None) # must not raise
host.deliver("t1", "coin", {"amount": 100})
host.run_to_quiescence()
assert host.instances["t1"].active_leaf_names() == ["unlocked"]


def test_observer_is_passive() -> None:
"""An observer that ignores its record must not change engine behavior."""
a = _host(CollectingObserver())
b = _host(None)
a.deliver("t1", "coin", {"amount": 100})
a.run_to_quiescence()
b.deliver("t1", "coin", {"amount": 100})
b.run_to_quiescence()
assert a.instances["t1"].active_leaf_names() == b.instances["t1"].active_leaf_names()


def test_jsonl_observer_writes_records() -> None:
buf = io.StringIO()
host = _host(JsonlObserver(buf))
host.deliver("t1", "coin", {"amount": 100})
host.run_to_quiescence()
lines = [json.loads(x) for x in buf.getvalue().splitlines() if x.strip()]
assert lines[-1]["event"] == "coin"
assert lines[-1]["entered"] == ["unlocked"]
Loading