diff --git a/src/harel/cli.py b/src/harel/cli.py index 2487ffb..b9403ef 100644 --- a/src/harel/cli.py +++ b/src/harel/cli.py @@ -109,6 +109,26 @@ def add(cmd: str, **kw: Any) -> argparse.ArgumentParser: run = add("run") run.add_argument("source", nargs="?", default="-", help="'-' for stdin, or an NDJSON file") run.set_defaults(cmd=cmd_run) + + md = add("mode") + md.add_argument("mode", nargs="?", choices=["auto", "manual"]) + md.set_defaults(cmd=cmd_mode) + + ij = add("inject") + ij.add_argument("instance") + ij.add_argument("event") + ij.add_argument("--payload", action="append", default=[]) + ij.add_argument("--payload-json", default=None) + ij.set_defaults(cmd=cmd_inject) + + sp = add("step") + sp.add_argument("instance") + sp.add_argument("--steps", type=int, default=1) + sp.set_defaults(cmd=cmd_step) + + ip = add("inspect") + ip.add_argument("instance") + ip.set_defaults(cmd=cmd_inspect) return p @@ -116,6 +136,7 @@ def add(cmd: str, **kw: Any) -> argparse.ArgumentParser: def _build_host(state: StoreState) -> Host: host = Host() host.now = state.now + host.mode = state.mode host._spawn_counters = dict(state.spawn_counters) # noqa: SLF001 for text in state.defs.values(): host.register_all(load_definitions(text)) @@ -126,6 +147,7 @@ def _build_host(state: StoreState) -> Host: def _persist(store: Store, state: StoreState, host: Host) -> None: state.instances = host.snapshot_all() state.now = host.now + state.mode = host.mode state.spawn_counters = dict(host._spawn_counters) # noqa: SLF001 store.save(state) @@ -194,7 +216,7 @@ def cmd_send(args: argparse.Namespace, store: Store) -> int: if not host.deliver(args.instance, args.event, payload): print(f"rejected: {args.event}", file=sys.stderr) return EXIT_VALIDATION - host.run_to_quiescence() + host.maybe_run() if args.json: obj = _state_json(host, host.instances[args.instance]) obj["published"] = host.published[before:] @@ -210,7 +232,7 @@ def cmd_advance(args: argparse.Namespace, store: Store) -> int: state = store.load() host = _build_host(state) host.advance(args.duration) - host.run_to_quiescence() + host.maybe_run() if args.json: print(json.dumps({"now": host.now})) _persist(store, state, host) @@ -225,7 +247,7 @@ def cmd_env(args: argparse.Namespace, store: Store) -> int: return EXIT_NOT_FOUND changed = _parse_csv_kv(args.changed) host.deliver(args.instance, "env", {"changed": changed}) - host.run_to_quiescence() + host.maybe_run() _print_state(args, host, host.instances[args.instance]) _persist(store, state, host) return EXIT_OK @@ -305,6 +327,76 @@ def cmd_export(args: argparse.Namespace, store: Store) -> int: return EXIT_OK +# --- introspection & stepping (SPEC §14) ------------------------------------ +def cmd_mode(args: argparse.Namespace, store: Store) -> int: + state = store.load() + if args.mode is not None: + state.mode = args.mode + store.save(state) + if args.json: + print(json.dumps({"mode": state.mode})) + else: + print(state.mode) + return EXIT_OK + + +def cmd_inject(args: argparse.Namespace, store: Store) -> int: + state = store.load() + host = _build_host(state) + inst = host.instances.get(args.instance) + if inst is None: + print(f"no such instance: {args.instance}", file=sys.stderr) + return EXIT_NOT_FOUND + payload = _build_payload(args, inst.machine) + if not host.inject(args.instance, args.event, payload): + print(f"rejected: {args.event}", file=sys.stderr) + return EXIT_VALIDATION + _print_state(args, host, inst) + _persist(store, state, host) + return EXIT_OK + + +def cmd_step(args: argparse.Namespace, store: Store) -> int: + state = store.load() + host = _build_host(state) + inst = host.instances.get(args.instance) + if inst is None: + print(f"no such instance: {args.instance}", file=sys.stderr) + return EXIT_NOT_FOUND + records = host.step(inst, args.steps) + if args.json: + obj = _state_json(host, inst) + obj["steps"] = records + print(json.dumps(obj)) + _persist(store, state, host) + if inst.status is Status.FAULTED: + return EXIT_FAULTED + return EXIT_OK + + +def cmd_inspect(args: argparse.Namespace, store: Store) -> int: + state = store.load() + host = _build_host(state) + inst = host.instances.get(args.instance) + if inst is None: + print(f"no such instance: {args.instance}", file=sys.stderr) + return EXIT_NOT_FOUND + if args.json: + obj = {"instance": inst.id, **host.inspect(inst)} + print(json.dumps(obj)) + else: + _print_inspect(inst, host.inspect(inst)) + return EXIT_OK + + +def _print_inspect(inst: Instance, info: dict[str, Any]) -> None: + print( + f"{inst.id}\t{info['status']}\t{info['config']}\t" + f"queue={len(info['queue'])} deferred={len(info['deferred'])} " + f"timers={len(info['timers'])}" + ) + + # --- batch / streaming mode (SPEC §13.7) ------------------------------------ def cmd_run(args: argparse.Namespace, store: Store) -> int: """Drive many commands from NDJSON stdin against one store + virtual clock. diff --git a/src/harel/engine.py b/src/harel/engine.py index 66e39e3..e5bfa92 100644 --- a/src/harel/engine.py +++ b/src/harel/engine.py @@ -9,7 +9,7 @@ from __future__ import annotations -from typing import Any +from typing import Any, cast from . import cel, values from .definition import Definition @@ -26,6 +26,7 @@ def __init__(self) -> None: self.spawned: list[str] = [] # child defIds, in order self._spawn_counters: dict[str, int] = {} self.now: int = 0 # virtual clock, in milliseconds (SPEC §5.9) + self.mode: str = "auto" # processing mode, auto|manual (SPEC §14) self._seq: int = 0 # --- registration / creation ------------------------------------------- @@ -70,6 +71,15 @@ def deliver( payload: dict[str, Any] | None = None, ) -> bool: """Validate and enqueue an event; return False if rejected (§4.3).""" + return self.inject(instance_id, event_type, payload) + + def inject( + self, + instance_id: str, + event_type: str, + payload: dict[str, Any] | None = None, + ) -> bool: + """Validate and enqueue without processing, in either mode (SPEC §14).""" inst = self.instances[instance_id] ok, _reason = self.validate_event(inst.machine, event_type, payload) if not ok: @@ -78,6 +88,65 @@ def deliver( return True # --- execution ---------------------------------------------------------- + def maybe_run(self) -> None: + """Run all instances to quiescence in auto mode only (SPEC §14).""" + if self.mode == "auto": + self.run_to_quiescence() + + def step(self, instance: Instance | str, n: int = 1) -> list[dict[str, Any]]: + """Process exactly ``n`` RTC steps of one instance (SPEC §14). + + Returns one per-step record per step taken: + ``{ event, transition, entered, exited, published, spawned, faulted }``. + Stops early if the instance faults or its queue drains. + """ + inst = instance if isinstance(instance, Instance) else self.instances[instance] + records: list[dict[str, Any]] = [] + for _ in range(n): + if inst.status is not Status.ACTIVE or not inst.queue: + break + ev = inst.queue.popleft() + before = set(inst.active_leaf_names()) + pub_before = len(self.published) + sp_before = len(self.spawned) + inst._last_target = None # noqa: SLF001 + inst.step(ev) + after = set(inst.active_leaf_names()) + faulted = cast(Status, inst.status) is Status.FAULTED + records.append( + { + "event": ev.type, + "transition": inst._last_target, # noqa: SLF001 + "entered": sorted(after - before), + "exited": sorted(before - after), + "published": list(self.published[pub_before:]), + "spawned": list(self.spawned[sp_before:]), + "faulted": faulted, + } + ) + return records + + def inspect(self, instance: Instance | str) -> dict[str, Any]: + """Full internal state for debugging, beyond ``state`` (SPEC §14).""" + inst = instance if isinstance(instance, Instance) else self.instances[instance] + out: dict[str, Any] = { + "status": inst.status.value, + "config": inst.active_leaf_names(), + "esvs": inst.resolved_esvs(), + "queue": [Instance._event_to_snap(e) for e in inst.queue], + "deferred": [Instance._event_to_snap(e) for e in inst.deferred], + "timers": [ + {"fire_at": t["fire_at"], "state_path": t["state_path"], "spec": t["spec"]} + for t in inst.timers + ], + "history": { + p: {"kind": k, "data": d} for p, (k, d) in inst.history.items() + }, + } + if inst.dead_letter: + out["dead_letter"] = list(inst.dead_letter) + return out + def next_seq(self) -> int: self._seq += 1 return self._seq diff --git a/src/harel/instance.py b/src/harel/instance.py index 6cd882e..ad89675 100644 --- a/src/harel/instance.py +++ b/src/harel/instance.py @@ -73,6 +73,7 @@ def __init__( self.external: dict[str, Any] = dict(external or {}) self.current_event: Event | None = None self._pending_terminate = False + self._last_target: str | None = None # target of the last transition (§14) if auto_enter: self._enter_top() @@ -295,9 +296,11 @@ def run_transition( actions = transition.get("action") or [] if target_ref is None: # internal transition: actions only, no exit/entry (SPEC §5.5) + self._last_target = None self.run_actions(actions, owner, event) return target = self.machine.resolve_target(owner, target_ref) + self._last_target = target.name local = bool(transition.get("local")) if local: lca = owner # the containing composite is not exited/re-entered diff --git a/src/harel/store.py b/src/harel/store.py index 9a04482..3a8bdbc 100644 --- a/src/harel/store.py +++ b/src/harel/store.py @@ -21,6 +21,7 @@ class StoreState: instances: list[dict[str, Any]] = field(default_factory=list) now: int = 0 spawn_counters: dict[str, int] = field(default_factory=dict) + mode: str = "auto" # processing mode, auto|manual (SPEC §14) class Store: @@ -45,6 +46,7 @@ def load(self) -> StoreState: instances=instances, now=int(meta.get("now", 0)), spawn_counters=dict(meta.get("spawn_counters") or {}), + mode=str(meta.get("mode", "auto")), ) def save(self, state: StoreState) -> None: @@ -57,7 +59,12 @@ def save(self, state: StoreState) -> None: ) (self.path / "meta.json").write_text( json.dumps( - {"now": state.now, "spawn_counters": state.spawn_counters}, indent=2 + { + "now": state.now, + "spawn_counters": state.spawn_counters, + "mode": state.mode, + }, + indent=2, ), encoding="utf-8", ) diff --git a/tests/test_conformance.py b/tests/test_conformance.py index 52496bd..bf8ec37 100644 --- a/tests/test_conformance.py +++ b/tests/test_conformance.py @@ -86,7 +86,7 @@ def test_suite_present() -> None: if not CONFORMANCE_DIR.exists(): pytest.skip("conformance suite not fetched (offline; set HAREL_CONFORMANCE_DIR)") assert len(engine_cases()) == 22, "expected 22 engine cases" - assert len(cli_cases()) == 2, "expected 2 CLI cases" + assert len(cli_cases()) == 3, "expected 3 CLI cases" @pytest.mark.parametrize("case", engine_cases(), ids=lambda c: c.name) diff --git a/tests/test_stepping.py b/tests/test_stepping.py new file mode 100644 index 0000000..3876fdc --- /dev/null +++ b/tests/test_stepping.py @@ -0,0 +1,253 @@ +"""Introspection + step-by-step execution (SPEC §14). + +Covers the library primitives (``inject`` / ``step`` / ``inspect`` / manual mode) +and the CLI verbs (``mode`` / ``inject`` / ``step`` / ``inspect``), including the +manual-mode toggle where ``send`` enqueues without processing. +""" + +from __future__ import annotations + +import io +import json +from pathlib import Path + +import pytest + +import harel +import harel.cli as cli + +TURNSTILE = """\ +id: turnstile +events: + coin: { payload: { amount: { type: int, required: true } } } + push: {} +top: + esvs: + fare: { type: int, init: 50 } + initial: { transition_to: locked } + states: + locked: + on_events: + coin: { transition_to: unlocked, guard: "event.payload.amount >= fare" } + unlocked: + on_events: + push: { transition_to: locked } +""" + + +def _host() -> harel.Host: + host = harel.Host() + host.register_all(harel.load_definitions(TURNSTILE)) + host.create_root(host.machines["turnstile"], "t1") + host.run_to_quiescence() + return host + + +# --- library: inject / step / inspect --------------------------------------- +def test_inject_enqueues_without_processing() -> None: + host = _host() + inst = host.instances["t1"] + assert inst.active_leaf_names() == ["locked"] + + accepted = host.inject("t1", "coin", {"amount": 100}) + assert accepted is True + # enqueued, but the config is unchanged (nothing processed). + assert inst.active_leaf_names() == ["locked"] + assert [e.type for e in inst.queue] == ["coin"] + + +def test_inject_rejects_invalid_payload() -> None: + host = _host() + assert host.inject("t1", "coin", {"amount": "nope"}) is False + assert len(host.instances["t1"].queue) == 0 # not enqueued + + +def test_step_returns_per_step_record_and_advances() -> None: + host = _host() + host.inject("t1", "coin", {"amount": 100}) + records = host.step("t1", 1) + inst = host.instances["t1"] + + assert len(records) == 1 + rec = records[0] + assert rec["event"] == "coin" + assert rec["transition"] == "unlocked" + assert rec["entered"] == ["unlocked"] + assert rec["exited"] == ["locked"] + assert rec["published"] == [] + assert rec["spawned"] == [] + assert rec["faulted"] is False + assert inst.active_leaf_names() == ["unlocked"] + + +def test_step_drains_only_n_events() -> None: + host = _host() + host.inject("t1", "coin", {"amount": 100}) # -> unlocked + host.inject("t1", "push") # -> locked + records = host.step("t1", 1) # one RTC step only + assert len(records) == 1 + assert host.instances["t1"].active_leaf_names() == ["unlocked"] + # one event still pending. + assert [e.type for e in host.instances["t1"].queue] == ["push"] + + +def test_step_with_empty_queue_returns_no_records() -> None: + host = _host() + assert host.step("t1", 5) == [] + + +def test_inspect_exposes_full_internal_state() -> None: + host = _host() + host.inject("t1", "coin", {"amount": 100}) + info = host.inspect("t1") + assert info["status"] == "active" + assert info["config"] == ["locked"] + assert info["esvs"] == {"fare": 50} + assert [e["type"] for e in info["queue"]] == ["coin"] + assert info["deferred"] == [] + assert info["timers"] == [] + assert info["history"] == {} + + +def test_manual_mode_send_enqueues_via_maybe_run() -> None: + host = _host() + host.mode = "manual" + host.deliver("t1", "coin", {"amount": 100}) + host.maybe_run() # manual -> does NOT run + assert host.instances["t1"].active_leaf_names() == ["locked"] + host.step("t1", 1) # explicit step advances + assert host.instances["t1"].active_leaf_names() == ["unlocked"] + + +# --- CLI verbs --------------------------------------------------------------- +def _run( + tmp_path: Path, + argv: list[str], + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> tuple[int, str]: + rc = cli.main(["--store", str(tmp_path / "store"), *argv]) + out = capsys.readouterr().out + return rc, out + + +def _run_batch( + tmp_path: Path, + lines: list[list[str]], + monkeypatch: pytest.MonkeyPatch, + capsys: pytest.CaptureFixture[str], +) -> tuple[int, list[dict]]: + stdin = "".join(json.dumps(line) + "\n" for line in lines) + monkeypatch.setattr("sys.stdin", io.StringIO(stdin)) + rc = cli.main(["--store", str(tmp_path / "store"), "run", "-"]) + out = capsys.readouterr().out + return rc, [json.loads(x) for x in out.splitlines() if x.strip()] + + +def test_cli_mode_persists_and_toggles(tmp_path, monkeypatch, capsys): + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + + rc, out = _run(tmp_path, ["mode", "--json"], monkeypatch, capsys) + assert rc == 0 and json.loads(out) == {"mode": "auto"} + + rc, _ = _run(tmp_path, ["mode", "manual"], monkeypatch, capsys) + assert rc == 0 + + rc, out = _run(tmp_path, ["mode", "--json"], monkeypatch, capsys) + assert json.loads(out) == {"mode": "manual"} + + +def test_cli_inject_enqueues_without_processing(tmp_path, monkeypatch, capsys): + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + _run(tmp_path, ["new", "t1", str(machine)], monkeypatch, capsys) + + rc, out = _run( + tmp_path, ["inject", "t1", "coin", "--payload", "amount=100", "--json"], + monkeypatch, capsys, + ) + assert rc == 0 + obj = json.loads(out) + assert obj["config"] == ["locked"] # not processed + + _, out = _run(tmp_path, ["inspect", "t1", "--json"], monkeypatch, capsys) + info = json.loads(out) + assert [e["type"] for e in info["queue"]] == ["coin"] + + +def test_cli_step_advances_and_reports(tmp_path, monkeypatch, capsys): + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + _run(tmp_path, ["new", "t1", str(machine)], monkeypatch, capsys) + _run(tmp_path, ["inject", "t1", "coin", "--payload", "amount=100"], monkeypatch, capsys) + + rc, out = _run(tmp_path, ["step", "t1", "--steps", "1", "--json"], monkeypatch, capsys) + assert rc == 0 + obj = json.loads(out) + assert obj["config"] == ["unlocked"] + assert len(obj["steps"]) == 1 + assert obj["steps"][0]["transition"] == "unlocked" + + +def test_cli_inspect_full_shape(tmp_path, monkeypatch, capsys): + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + _run(tmp_path, ["new", "t1", str(machine)], monkeypatch, capsys) + _run(tmp_path, ["inject", "t1", "coin", "--payload", "amount=100"], monkeypatch, capsys) + + rc, out = _run(tmp_path, ["inspect", "t1", "--json"], monkeypatch, capsys) + assert rc == 0 + info = json.loads(out) + assert info["instance"] == "t1" + assert info["config"] == ["locked"] + assert info["queue"] == [{"type": "coin", "payload": {"amount": 100}}] + assert info["deferred"] == [] + assert info["timers"] == [] + + +def test_cli_send_in_manual_mode_enqueues_only(tmp_path, monkeypatch, capsys): + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + _run(tmp_path, ["new", "t1", str(machine)], monkeypatch, capsys) + _run(tmp_path, ["mode", "manual"], monkeypatch, capsys) + + # send enqueues but does not process -> config stays locked. + rc, out = _run( + tmp_path, ["send", "t1", "coin", "--payload", "amount=100", "--json"], + monkeypatch, capsys, + ) + assert rc == 0 + assert json.loads(out)["config"] == ["locked"] + + _, out = _run(tmp_path, ["inspect", "t1", "--json"], monkeypatch, capsys) + assert [e["type"] for e in json.loads(out)["queue"]] == ["coin"] + + rc, out = _run(tmp_path, ["step", "t1", "--json"], monkeypatch, capsys) + assert rc == 0 and json.loads(out)["config"] == ["unlocked"] + + +def test_cli_batch_stepping_session(tmp_path, monkeypatch, capsys): + """The black-box §13.7 form: one run process, manual mode, step once.""" + machine = tmp_path / "m.yaml" + machine.write_text(TURNSTILE) + rc, results = _run_batch( + tmp_path, + [ + ["new", "t1", str(machine)], + ["mode", "manual"], + ["send", "t1", "coin", "--payload", "amount=100"], + ["inspect", "t1"], + ["step", "t1", "--steps", "1"], + ["mode", "auto"], + ], + monkeypatch, + capsys, + ) + assert rc == 0 + assert [r["ok"] for r in results] == [True, True, True, True, True, True] + assert results[1]["result"] == {"mode": "manual"} + assert results[2]["result"]["config"] == ["locked"] + assert [e["type"] for e in results[3]["result"]["queue"]] == ["coin"] + assert results[4]["result"]["config"] == ["unlocked"] + assert results[5]["result"] == {"mode": "auto"}