diff --git a/src/automation/__init__.py b/src/automation/__init__.py new file mode 100644 index 0000000..915c1c8 --- /dev/null +++ b/src/automation/__init__.py @@ -0,0 +1,70 @@ +""" +Serverless Python automation with pluggable components and generator variables. + +Quick-start:: + + from automation import AutomationEngine, AutomationDefinition, TriggerEvent + from automation import Component, ComponentInput, ComponentRegistry + from automation import GeneratorVariable, VariableRegistry + + engine = AutomationEngine() + + # 1. Register a component + engine.register_fn("greet", lambda inp: f"Hello, {inp.payload}!") + + # 2. Define a generator variable + engine.define_variable("counter", lambda: (i for i in range(100))) + + # 3. Define an automation + engine.define(AutomationDefinition( + name="greet_on_request", + triggers=["user.request"], + steps=["greet"], + variables=["counter"], + )) + + # 4. Fire an event (serverless – no threads, no server) + results = engine.trigger_type("user.request", payload="world") + print(results[0].status) # RunStatus.SUCCESS +""" + +from .engine import ( + AutomationDefinition, + AutomationEngine, + RunResult, + RunStatus, + TriggerEvent, + default_engine, + trigger, +) +from .components import ( + Component, + ComponentInput, + ComponentOutput, + ComponentRegistry, + FunctionComponent, +) +from .variables import ( + GeneratorVariable, + VariableRegistry, +) + +__all__ = [ + # Engine + "AutomationEngine", + "AutomationDefinition", + "TriggerEvent", + "RunResult", + "RunStatus", + "default_engine", + "trigger", + # Components + "Component", + "ComponentInput", + "ComponentOutput", + "ComponentRegistry", + "FunctionComponent", + # Variables + "GeneratorVariable", + "VariableRegistry", +] diff --git a/src/automation/__pycache__/__init__.cpython-312.pyc b/src/automation/__pycache__/__init__.cpython-312.pyc new file mode 100644 index 0000000..f698bd6 Binary files /dev/null and b/src/automation/__pycache__/__init__.cpython-312.pyc differ diff --git a/src/automation/__pycache__/components.cpython-312.pyc b/src/automation/__pycache__/components.cpython-312.pyc new file mode 100644 index 0000000..6c0a3f4 Binary files /dev/null and b/src/automation/__pycache__/components.cpython-312.pyc differ diff --git a/src/automation/__pycache__/engine.cpython-312.pyc b/src/automation/__pycache__/engine.cpython-312.pyc new file mode 100644 index 0000000..d34d7d4 Binary files /dev/null and b/src/automation/__pycache__/engine.cpython-312.pyc differ diff --git a/src/automation/__pycache__/variables.cpython-312.pyc b/src/automation/__pycache__/variables.cpython-312.pyc new file mode 100644 index 0000000..d03c19e Binary files /dev/null and b/src/automation/__pycache__/variables.cpython-312.pyc differ diff --git a/src/automation/components.py b/src/automation/components.py new file mode 100644 index 0000000..03712d3 --- /dev/null +++ b/src/automation/components.py @@ -0,0 +1,333 @@ +""" +Pluggable component registry for serverless automation. + +Components are self-describing units of work that can be registered by name +and composed inside automation pipelines. The registry enforces a consistent +interface while remaining fully decoupled from any specific runtime or +framework. +""" + +from __future__ import annotations + +import inspect +import traceback +from abc import ABC, abstractmethod +from dataclasses import dataclass, field +from typing import Any, Callable, Dict, List, Optional, Type + + +# --------------------------------------------------------------------------- +# Component contract +# --------------------------------------------------------------------------- + +@dataclass +class ComponentInput: + """Typed input envelope passed to a component.""" + name: str + payload: Any + metadata: Dict[str, Any] = field(default_factory=dict) + + +@dataclass +class ComponentOutput: + """Typed output envelope returned by a component.""" + component: str + result: Any + success: bool = True + error: Optional[str] = None + metadata: Dict[str, Any] = field(default_factory=dict) + + +class Component(ABC): + """ + Base class for all pluggable components. + + Subclass and implement :meth:`execute` to create a new component. + Register the component with a :class:`ComponentRegistry` using + :meth:`ComponentRegistry.register`. + + Subclasses may optionally implement: + + * :meth:`validate` – return ``(True, "")`` to accept the input or + ``(False, "")`` to reject it before execution. + * :meth:`setup` / :meth:`teardown` – hooks called once when the + component is registered / deregistered. + """ + + #: Human-readable name shown in registry listings. Defaults to the + #: class name when not overridden. + name: str = "" + + #: Short description surfaced by :meth:`ComponentRegistry.describe`. + description: str = "" + + def validate(self, input_: ComponentInput) -> tuple[bool, str]: + """ + Validate *input_* before execution. + + Returns: + A ``(valid, reason)`` tuple. ``valid`` is ``True`` when the + input is acceptable. ``reason`` is an empty string on success + or a human-readable message on failure. + """ + return True, "" + + @abstractmethod + def execute(self, input_: ComponentInput) -> ComponentOutput: + """Process *input_* and return a :class:`ComponentOutput`.""" + + def setup(self) -> None: + """Optional lifecycle hook called when the component is registered.""" + + def teardown(self) -> None: + """Optional lifecycle hook called when the component is removed.""" + + # Convenience method so subclasses can build outputs without importing the class + def _ok(self, result: Any, **meta: Any) -> ComponentOutput: + return ComponentOutput( + component=self.name or type(self).__name__, + result=result, + success=True, + metadata=meta, + ) + + def _err(self, error: str, **meta: Any) -> ComponentOutput: + return ComponentOutput( + component=self.name or type(self).__name__, + result=None, + success=False, + error=error, + metadata=meta, + ) + + +# --------------------------------------------------------------------------- +# Function-based component adapter +# --------------------------------------------------------------------------- + +class FunctionComponent(Component): + """ + Wraps a plain callable as a :class:`Component`. + + Useful for registering lambda functions or module-level functions + without creating a full subclass:: + + def double(inp): + return inp.payload * 2 + + registry.register_fn("double", double) + """ + + def __init__( + self, + fn: Callable[[ComponentInput], Any], + name: str = "", + description: str = "", + ) -> None: + self.name = name or fn.__name__ + self.description = description or (inspect.getdoc(fn) or "") + self._fn = fn + + def execute(self, input_: ComponentInput) -> ComponentOutput: + try: + result = self._fn(input_) + return self._ok(result) + except Exception: + return self._err(traceback.format_exc()) + + +# --------------------------------------------------------------------------- +# Registry +# --------------------------------------------------------------------------- + +class ComponentRegistry: + """ + Central store of named :class:`Component` instances. + + Components are looked up by *name* (a plain string) so that pipelines + can remain decoupled from concrete implementations. + + Usage:: + + registry = ComponentRegistry() + + @registry.component("greet") + class GreetComponent(Component): + description = "Returns a greeting string." + + def execute(self, inp): + return self._ok(f"Hello, {inp.payload}!") + + output = registry.run("greet", ComponentInput("greet", "world")) + print(output.result) # Hello, world! + """ + + def __init__(self) -> None: + self._components: Dict[str, Component] = {} + + # ------------------------------------------------------------------ + # Registration + # ------------------------------------------------------------------ + + def register(self, name: str, component: Component) -> "ComponentRegistry": + """ + Register *component* under *name*. + + Calls :meth:`Component.setup` and returns *self* for chaining. + """ + existing = self._components.pop(name, None) + if existing is not None: + existing.teardown() + component.name = component.name or name + component.setup() + self._components[name] = component + return self + + def register_class(self, name: str, cls: Type[Component], **kwargs: Any) -> "ComponentRegistry": + """Instantiate *cls* with **kwargs** and register the instance.""" + return self.register(name, cls(**kwargs)) + + def register_fn( + self, + name: str, + fn: Callable[[ComponentInput], Any], + description: str = "", + ) -> "ComponentRegistry": + """Wrap *fn* in a :class:`FunctionComponent` and register it.""" + return self.register(name, FunctionComponent(fn, name=name, description=description)) + + def component(self, name: str, description: str = "") -> Callable[[Type[Component]], Type[Component]]: + """ + Class decorator that registers a component under *name*:: + + @registry.component("my_step") + class MyStep(Component): + ... + """ + def _decorator(cls: Type[Component]) -> Type[Component]: + if description: + cls.description = description + self.register_class(name, cls) + return cls + + return _decorator + + def deregister(self, name: str) -> bool: + """ + Remove the component registered as *name*. + + Calls :meth:`Component.teardown` if found. Returns ``True`` when + a component was removed. + """ + component = self._components.pop(name, None) + if component is not None: + component.teardown() + return True + return False + + # ------------------------------------------------------------------ + # Execution + # ------------------------------------------------------------------ + + def run(self, name: str, input_: ComponentInput) -> ComponentOutput: + """ + Execute the component registered as *name*. + + Validates the input first; returns an error output if the component + is not found or validation fails. + """ + component = self._components.get(name) + if component is None: + return ComponentOutput( + component=name, + result=None, + success=False, + error=f"Component '{name}' not registered", + ) + + valid, reason = component.validate(input_) + if not valid: + return ComponentOutput( + component=name, + result=None, + success=False, + error=f"Validation failed: {reason}", + ) + + try: + return component.execute(input_) + except Exception: + return ComponentOutput( + component=name, + result=None, + success=False, + error=traceback.format_exc(), + ) + + def run_pipeline( + self, + steps: List[str], + initial_payload: Any, + metadata: Optional[Dict[str, Any]] = None, + ) -> List[ComponentOutput]: + """ + Run a sequential pipeline of named components. + + Each step's ``result`` is forwarded as the ``payload`` of the next + step's :class:`ComponentInput`. Execution stops on the first + failure encountered. + + Args: + steps: Ordered list of registered component names. + initial_payload: Payload for the first step. + metadata: Optional metadata forwarded to every step. + + Returns: + List of :class:`ComponentOutput` objects, one per step executed. + The list may be shorter than *steps* if a failure occurred early. + """ + outputs: List[ComponentOutput] = [] + payload = initial_payload + meta = metadata or {} + + for step in steps: + inp = ComponentInput(name=step, payload=payload, metadata=meta) + out = self.run(step, inp) + outputs.append(out) + if not out.success: + break + payload = out.result + + return outputs + + # ------------------------------------------------------------------ + # Introspection + # ------------------------------------------------------------------ + + def names(self) -> List[str]: + """Return registered component names.""" + return list(self._components.keys()) + + def get(self, name: str) -> Optional[Component]: + """Return the component registered as *name*, or ``None``.""" + return self._components.get(name) + + def describe(self, name: str) -> str: + """Return the description of the component registered as *name*.""" + component = self._components.get(name) + if component is None: + return f"(no component named '{name}')" + return component.description or "(no description)" + + def describe_all(self) -> Dict[str, str]: + """Return a mapping of name → description for every registered component.""" + return {name: (c.description or "") for name, c in self._components.items()} + + def __contains__(self, name: str) -> bool: + return name in self._components + + def __len__(self) -> int: + return len(self._components) + + def __repr__(self) -> str: + return f"ComponentRegistry({list(self._components.keys())!r})" diff --git a/src/automation/engine.py b/src/automation/engine.py new file mode 100644 index 0000000..a62a526 --- /dev/null +++ b/src/automation/engine.py @@ -0,0 +1,399 @@ +""" +Serverless automation engine for Pmaster AI Operator. + +The engine is *serverless* in the sense that it is purely function-driven: +there is no persistent background thread or network listener. Automation +runs are triggered by explicit calls, making the engine safe to use inside +FaaS environments (AWS Lambda, Google Cloud Functions, etc.) as well as +in-process within any Python application. + +Architecture:: + + ┌──────────────────────────────────────────────────────┐ + │ AutomationEngine │ + │ │ + │ VariableRegistry ←── GeneratorVariable(s) │ + │ │ │ + │ ComponentRegistry ←── Component / FunctionComponent│ + │ │ │ + │ Trigger dispatcher │ + │ │ │ + │ RunResult / RunHistory │ + └──────────────────────────────────────────────────────┘ +""" + +from __future__ import annotations + +import logging +import traceback +import uuid +from dataclasses import dataclass, field +from datetime import datetime +from enum import Enum +from typing import Any, Callable, Dict, List, Optional + +from .components import ( + Component, + ComponentInput, + ComponentOutput, + ComponentRegistry, + FunctionComponent, +) +from .variables import GeneratorVariable, VariableRegistry + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Data structures +# --------------------------------------------------------------------------- + +class RunStatus(Enum): + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + SKIPPED = "skipped" + + +@dataclass +class TriggerEvent: + """Represents an event that can trigger an automation run.""" + event_type: str + payload: Any = None + metadata: Dict[str, Any] = field(default_factory=dict) + timestamp: datetime = field(default_factory=datetime.now) + event_id: str = field(default_factory=lambda: uuid.uuid4().hex[:12]) + + +@dataclass +class RunResult: + """Result of a single automation run.""" + run_id: str + trigger: TriggerEvent + status: RunStatus + outputs: List[ComponentOutput] = field(default_factory=list) + started_at: Optional[datetime] = None + finished_at: Optional[datetime] = None + error: Optional[str] = None + + @property + def duration_ms(self) -> Optional[float]: + """Wall-clock duration in milliseconds, or ``None`` if not finished.""" + if self.started_at and self.finished_at: + delta = self.finished_at - self.started_at + return delta.total_seconds() * 1000 + return None + + def to_dict(self) -> Dict[str, Any]: + return { + "run_id": self.run_id, + "trigger": { + "event_type": self.trigger.event_type, + "event_id": self.trigger.event_id, + "timestamp": self.trigger.timestamp.isoformat(), + }, + "status": self.status.value, + "outputs": [ + { + "component": o.component, + "success": o.success, + "error": o.error, + } + for o in self.outputs + ], + "started_at": self.started_at.isoformat() if self.started_at else None, + "finished_at": self.finished_at.isoformat() if self.finished_at else None, + "duration_ms": self.duration_ms, + "error": self.error, + } + + +# --------------------------------------------------------------------------- +# Automation definition +# --------------------------------------------------------------------------- + +@dataclass +class AutomationDefinition: + """ + Declarative description of an automation. + + An automation is defined by: + + * A *name* (unique identifier). + * A list of *triggers*: event type strings that activate the automation. + * An ordered list of *steps*: component names executed in sequence. + * Optional *variables*: names from the :class:`VariableRegistry` that are + injected into each step's ``metadata`` before execution. + """ + name: str + triggers: List[str] + steps: List[str] + variables: List[str] = field(default_factory=list) + description: str = "" + enabled: bool = True + + +# --------------------------------------------------------------------------- +# Engine +# --------------------------------------------------------------------------- + +class AutomationEngine: + """ + Serverless automation engine. + + The engine binds together a :class:`ComponentRegistry` (what to run) and a + :class:`VariableRegistry` (runtime data), wires up event-based dispatch, + and keeps a lightweight run history. + + Usage:: + + engine = AutomationEngine() + + # Register a component + engine.components.register_fn("echo", lambda inp: inp.payload) + + # Define an automation + engine.define(AutomationDefinition( + name="on_hello", + triggers=["hello"], + steps=["echo"], + )) + + # Fire an event + result = engine.trigger(TriggerEvent("hello", payload="world")) + print(result[0].status) # RunStatus.SUCCESS + """ + + def __init__(self) -> None: + self.components = ComponentRegistry() + self.variables = VariableRegistry() + + self._automations: Dict[str, AutomationDefinition] = {} + self._history: List[RunResult] = [] + + # Middleware hooks: callables invoked before/after each run + self._before_run: List[Callable[[RunResult, TriggerEvent], None]] = [] + self._after_run: List[Callable[[RunResult], None]] = [] + + # ------------------------------------------------------------------ + # Component & variable shortcuts + # ------------------------------------------------------------------ + + def component(self, name: str, description: str = "") -> Callable: + """Decorator that registers a :class:`Component` subclass.""" + return self.components.component(name, description) + + def register_fn( + self, + name: str, + fn: Callable[[ComponentInput], Any], + description: str = "", + ) -> "AutomationEngine": + """Register a plain callable as a component. Returns self.""" + self.components.register_fn(name, fn, description) + return self + + def define_variable( + self, + name: str, + factory: Callable, + ) -> GeneratorVariable: + """Create a generator variable and add it to the variable registry.""" + return self.variables.define(name, factory) + + # ------------------------------------------------------------------ + # Automation registration + # ------------------------------------------------------------------ + + def define(self, automation: AutomationDefinition) -> "AutomationEngine": + """Register an automation definition. Returns self for chaining.""" + self._automations[automation.name] = automation + return self + + def undefine(self, name: str) -> bool: + """Remove an automation by name. Returns ``True`` if it existed.""" + return self._automations.pop(name, None) is not None + + def enable(self, name: str) -> None: + """Enable a previously disabled automation.""" + if name in self._automations: + self._automations[name].enabled = True + + def disable(self, name: str) -> None: + """Disable an automation without removing it.""" + if name in self._automations: + self._automations[name].enabled = False + + # ------------------------------------------------------------------ + # Middleware + # ------------------------------------------------------------------ + + def before_run(self, fn: Callable[[RunResult, TriggerEvent], None]) -> Callable: + """Register a hook called just before each automation run starts.""" + self._before_run.append(fn) + return fn + + def after_run(self, fn: Callable[[RunResult], None]) -> Callable: + """Register a hook called just after each automation run finishes.""" + self._after_run.append(fn) + return fn + + # ------------------------------------------------------------------ + # Triggering + # ------------------------------------------------------------------ + + def trigger(self, event: TriggerEvent) -> List[RunResult]: + """ + Dispatch *event* to all matching, enabled automations. + + Returns the list of :class:`RunResult` objects produced (one per + matching automation). + """ + results: List[RunResult] = [] + + for automation in self._automations.values(): + if not automation.enabled: + continue + if event.event_type not in automation.triggers: + continue + result = self._run(automation, event) + results.append(result) + + return results + + def trigger_type(self, event_type: str, payload: Any = None, **metadata: Any) -> List[RunResult]: + """Convenience wrapper: build a :class:`TriggerEvent` and dispatch it.""" + event = TriggerEvent(event_type=event_type, payload=payload, metadata=metadata) + return self.trigger(event) + + # ------------------------------------------------------------------ + # Internal execution + # ------------------------------------------------------------------ + + def _run(self, automation: AutomationDefinition, event: TriggerEvent) -> RunResult: + """Execute a single automation in response to *event*.""" + run_id = f"run-{uuid.uuid4().hex[:12]}" + result = RunResult( + run_id=run_id, + trigger=event, + status=RunStatus.PENDING, + ) + + # Before-run hooks + for hook in self._before_run: + try: + hook(result, event) + except Exception: + logger.exception("before_run hook %r raised an exception", hook) + + result.started_at = datetime.now() + result.status = RunStatus.RUNNING + + try: + # Build variable snapshot for this run + var_snapshot = self._snapshot_variables(automation.variables) + + # Merge event metadata with variable snapshot + run_meta: Dict[str, Any] = {**event.metadata, "variables": var_snapshot} + + # Execute the component pipeline + outputs = self.components.run_pipeline( + steps=automation.steps, + initial_payload=event.payload, + metadata=run_meta, + ) + result.outputs = outputs + + # Determine overall status + if outputs and not outputs[-1].success: + result.status = RunStatus.FAILED + result.error = outputs[-1].error + else: + result.status = RunStatus.SUCCESS + + except Exception as exc: + result.status = RunStatus.FAILED + result.error = f"{type(exc).__name__}: {exc}" + result.outputs.append( + ComponentOutput( + component="__engine__", + result=None, + success=False, + error=traceback.format_exc(), + ) + ) + + result.finished_at = datetime.now() + self._history.append(result) + + # After-run hooks + for hook in self._after_run: + try: + hook(result) + except Exception: + logger.exception("after_run hook %r raised an exception", hook) + + return result + + def _snapshot_variables(self, names: List[str]) -> Dict[str, Any]: + """ + Peek at the next value of each named variable. + + Values are peeked (not consumed) so the same run can be replayed + without advancing the generators. + """ + snapshot: Dict[str, Any] = {} + for name in names: + snapshot[name] = self.variables.require(name).peek() + return snapshot + + # ------------------------------------------------------------------ + # History & introspection + # ------------------------------------------------------------------ + + def history(self, limit: Optional[int] = None) -> List[RunResult]: + """Return run history, most-recent first, optionally limited.""" + runs = list(reversed(self._history)) + return runs[:limit] if limit is not None else runs + + def automations(self) -> Dict[str, AutomationDefinition]: + """Return a copy of the registered automations map.""" + return dict(self._automations) + + def stats(self) -> Dict[str, Any]: + """Return aggregate run statistics.""" + total = len(self._history) + by_status: Dict[str, int] = {} + for run in self._history: + key = run.status.value + by_status[key] = by_status.get(key, 0) + 1 + + return { + "total_runs": total, + "automations": len(self._automations), + "components": len(self.components), + "variables": len(self.variables), + "by_status": by_status, + } + + def __repr__(self) -> str: + return ( + f"AutomationEngine(" + f"automations={len(self._automations)}, " + f"components={len(self.components)}, " + f"variables={len(self.variables)})" + ) + + +# --------------------------------------------------------------------------- +# Module-level default engine +# --------------------------------------------------------------------------- + +#: Default engine instance – use this for simple single-engine setups. +default_engine = AutomationEngine() + + +def trigger(event_type: str, payload: Any = None, **metadata: Any) -> List[RunResult]: + """Trigger *event_type* on the module-level :data:`default_engine`.""" + return default_engine.trigger_type(event_type, payload=payload, **metadata) diff --git a/src/automation/variables.py b/src/automation/variables.py new file mode 100644 index 0000000..71eff9b --- /dev/null +++ b/src/automation/variables.py @@ -0,0 +1,244 @@ +""" +Generator-backed variable system for serverless automation. + +Variables support lazy evaluation via Python generators, allowing values to be +computed on demand, streamed, or composed into pipelines without eager loading. +""" + +from typing import Any, Callable, Generator, Iterable, Iterator, Optional, TypeVar + +T = TypeVar("T") + + +class GeneratorVariable: + """ + A variable whose value is produced by a Python generator. + + Values are computed lazily: each call to ``next()`` or iteration + yields the next value from the underlying generator factory. + + Examples:: + + counter = GeneratorVariable(lambda: (i for i in range(10))) + print(counter.next()) # 0 + print(counter.next()) # 1 + + seq = GeneratorVariable.from_iterable([10, 20, 30]) + for v in seq: + print(v) + """ + + def __init__(self, factory: Callable[[], Generator]) -> None: + """ + Args: + factory: Zero-argument callable that returns a fresh generator + each time the variable is reset or first accessed. + """ + self._factory = factory + self._gen: Optional[Iterator] = None + self._peeked: bool = False + self._peeked_value: Any = None + + # ------------------------------------------------------------------ + # Core interface + # ------------------------------------------------------------------ + + def _ensure_gen(self) -> Iterator: + if self._gen is None: + self._gen = self._factory() + return self._gen + + def next(self, default: Any = None) -> Any: + """Return the next value, or *default* when the generator is exhausted.""" + if self._peeked: + self._peeked = False + return self._peeked_value + try: + return next(self._ensure_gen()) + except StopIteration: + return default + + def reset(self) -> "GeneratorVariable": + """Restart the generator from the beginning.""" + self._gen = self._factory() + self._peeked = False + self._peeked_value = None + return self + + def peek(self) -> Any: + """ + Return the next value without advancing the generator. + + Consecutive calls to ``peek()`` return the same value until + ``next()`` or iteration consumes it. Returns ``None`` when + the generator is exhausted. + """ + if self._peeked: + return self._peeked_value + try: + self._peeked_value = next(self._ensure_gen()) + self._peeked = True + return self._peeked_value + except StopIteration: + return None + + def collect(self) -> list: + """Drain all remaining values into a list.""" + values: list[Any] = [] + if self._peeked: + values.append(self._peeked_value) + self._peeked = False + self._peeked_value = None + values.extend(list(self._ensure_gen())) + return values + + # ------------------------------------------------------------------ + # Composition helpers + # ------------------------------------------------------------------ + + def map(self, fn: Callable[[Any], Any]) -> "GeneratorVariable": + """Return a new variable that applies *fn* to each value.""" + source_factory = self._factory + + def _mapped(): + for v in source_factory(): + yield fn(v) + + return GeneratorVariable(_mapped) + + def filter(self, predicate: Callable[[Any], bool]) -> "GeneratorVariable": + """Return a new variable that yields only values matching *predicate*.""" + source_factory = self._factory + + def _filtered(): + for v in source_factory(): + if predicate(v): + yield v + + return GeneratorVariable(_filtered) + + def take(self, n: int) -> "GeneratorVariable": + """Return a new variable limited to the first *n* values.""" + source_factory = self._factory + + def _taken(): + for i, v in enumerate(source_factory()): + if i >= n: + break + yield v + + return GeneratorVariable(_taken) + + def chain(self, other: "GeneratorVariable") -> "GeneratorVariable": + """Concatenate this variable with *other*.""" + a_factory = self._factory + b_factory = other._factory + + def _chained(): + yield from a_factory() + yield from b_factory() + + return GeneratorVariable(_chained) + + # ------------------------------------------------------------------ + # Convenience constructors + # ------------------------------------------------------------------ + + @classmethod + def from_iterable(cls, iterable: Iterable) -> "GeneratorVariable": + """Create a variable that replays a fixed iterable on each reset.""" + items = list(iterable) + return cls(lambda: iter(items)) + + @classmethod + def from_value(cls, value: Any) -> "GeneratorVariable": + """Create a variable that yields a single value once.""" + return cls(lambda: iter([value])) + + @classmethod + def constant(cls, value: Any) -> "GeneratorVariable": + """Create an infinite variable that always yields *value*.""" + + def _infinite(): + while True: + yield value + + return cls(_infinite) + + @classmethod + def counter(cls, start: int = 0, step: int = 1) -> "GeneratorVariable": + """Create an infinite counter variable.""" + + def _count(): + n = start + while True: + yield n + n += step + + return cls(_count) + + # ------------------------------------------------------------------ + # Python protocol support + # ------------------------------------------------------------------ + + def __iter__(self) -> Iterator: + return self + + def __next__(self) -> Any: + if self._peeked: + self._peeked = False + return self._peeked_value + return next(self._ensure_gen()) + + def __repr__(self) -> str: + return f"GeneratorVariable(factory={self._factory!r})" + + +class VariableRegistry: + """ + Named registry of :class:`GeneratorVariable` instances. + + Allows automation components to share and look up variables by name. + """ + + def __init__(self) -> None: + self._vars: dict[str, GeneratorVariable] = {} + + def register(self, name: str, variable: GeneratorVariable) -> "VariableRegistry": + """Register *variable* under *name*. Returns self for chaining.""" + self._vars[name] = variable + return self + + def define(self, name: str, factory: Callable[[], Generator]) -> GeneratorVariable: + """Create a :class:`GeneratorVariable` from *factory* and register it.""" + var = GeneratorVariable(factory) + self._vars[name] = var + return var + + def get(self, name: str) -> Optional[GeneratorVariable]: + """Return the variable registered as *name*, or ``None``.""" + return self._vars.get(name) + + def require(self, name: str) -> GeneratorVariable: + """Return the variable registered as *name*; raise ``KeyError`` if absent.""" + if name not in self._vars: + raise KeyError(f"Variable '{name}' not found in registry") + return self._vars[name] + + def names(self) -> list[str]: + """Return the list of registered variable names.""" + return list(self._vars.keys()) + + def reset_all(self) -> None: + """Reset every registered variable to its initial state.""" + for var in self._vars.values(): + var.reset() + + def __contains__(self, name: str) -> bool: + return name in self._vars + + def __len__(self) -> int: + return len(self._vars) + + def __repr__(self) -> str: + return f"VariableRegistry({list(self._vars.keys())!r})"