Skip to content

feat: serverless Python automation with pluggable components and generator variables#1

Open
Copilot wants to merge 6 commits into
developfrom
copilot/complete-serverless-py-automation
Open

feat: serverless Python automation with pluggable components and generator variables#1
Copilot wants to merge 6 commits into
developfrom
copilot/complete-serverless-py-automation

Conversation

Copilot AI commented Jun 29, 2026

Copy link
Copy Markdown

Adds src/automation/ — a pure-Python, event-driven automation subsystem with no runtime dependencies beyond stdlib.

Variables (variables.py)

  • GeneratorVariable: lazy value production via generator factory; idempotent peek(), reset(), and composition via .map() / .filter() / .take() / .chain()
  • VariableRegistry: named lookup store shared across automation steps

Components (components.py)

  • Component (ABC): execute(input_) → ComponentOutput contract with optional validate / setup / teardown hooks
  • FunctionComponent: wraps any callable as a component without subclassing
  • ComponentRegistry: name-based registration, run() with pre-execution validation, run_pipeline() for sequential chaining

Engine (engine.py)

  • AutomationEngine: purely call-driven (no threads, no listener) — safe for FaaS environments
  • AutomationDefinition binds trigger event types → component pipeline + variable references
  • Variables are peeked into run metadata (not consumed), keeping runs reproducible
  • Before/after middleware hooks; hook exceptions are logged, not swallowed
  • stats() / history() for observability
engine = AutomationEngine()

engine.register_fn("upper", lambda inp: inp.payload.upper())
engine.define_variable("seq", lambda: (f"id-{i}" for i in range(100)))

engine.define(AutomationDefinition(
    name="shout", triggers=["msg.received"], steps=["upper"], variables=["seq"]
))

results = engine.trigger_type("msg.received", payload="hello")
# results[0].status == RunStatus.SUCCESS
# results[0].outputs[-1].result == "HELLO"

@Pmaster-dev Pmaster-dev left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Pmaster-dev Pmaster-dev marked this pull request as ready for review June 29, 2026 21:57
Copilot AI review requested due to automatic review settings June 29, 2026 21:57

Copilot AI commented Jun 29, 2026

Copy link
Copy Markdown
Author

@copilot

I see you mentioned me, but I'm not sure what you need. Could you please clarify what you'd like me to do? For example:

  • Is there something specific about the automation code you'd like me to change?
  • Do you have questions about the implementation?
  • Would you like me to add tests, documentation, or additional features?

Please let me know how I can help!

@qodo-code-review

Copy link
Copy Markdown

PR Summary by Qodo

Add serverless automation engine with pluggable components and generator variables

✨ Enhancement 🕐 40+ Minutes

Grey Divider

AI Description

• Introduce a stdlib-only automation subsystem with event-triggered component pipelines.
• Add generator-backed variables with idempotent peek/reset and functional composition helpers.
• Provide middleware hooks and basic run history/stats for observability in FaaS contexts.
Diagram

graph TD
  E{{"TriggerEvent"}} --> ENG(["AutomationEngine"]) --> VR["VariableRegistry"] --> GV["Generator vars (peek)" ] --> CR["ComponentRegistry"] --> C["Component pipeline"] --> RR["RunResult"] --> H[("Run history")]
  subgraph Legend
    direction LR
    _evt{{"Event"}} ~~~ _svc(["Engine"]) ~~~ _reg["Registry"] ~~~ _db[("History")]
  end
Loading
High-Level Assessment

The following are alternative approaches to this PR:

1. Adopt an existing workflow/orchestration library
  • ➕ Mature scheduling/retries/state models out of the box
  • ➕ Established patterns for observability and persistence
  • ➖ Violates the stdlib-only/no-runtime-deps constraint
  • ➖ Typically assumes a long-running worker/runtime, less FaaS-friendly
2. Make the engine async-first (async/await components)
  • ➕ Natural fit for I/O-heavy automations
  • ➕ Can improve throughput without threads
  • ➖ Raises complexity for callers and component authors
  • ➖ Async does not automatically help in short-lived FaaS invocations
3. Model variables as explicit snapshot inputs (no generator state)
  • ➕ Purely deterministic runs with no hidden state
  • ➕ Simpler semantics for replay/testing
  • ➖ Loses lazy streaming and generator composability benefits
  • ➖ Requires callers to precompute/provide values per-trigger

Recommendation: Given the stated constraints (stdlib-only, safe in serverless/FaaS, call-driven), the PR’s approach is a good fit: small surface area, no background workers, and flexible composition via registries. The main follow-up recommendation is to add targeted tests around variable peek/next/reset semantics, pipeline short-circuiting on failure, and hook-exception logging behavior.

Files changed (4) +1030 / -0

Enhancement (3) +960 / -0
components.pyImplement pluggable components and a name-based execution registry +321/-0

Implement pluggable components and a name-based execution registry

• Adds the Component ABC contract with optional validate/setup/teardown hooks, typed input/output envelopes, and convenience helpers for success/error outputs. Implements FunctionComponent to adapt plain callables, plus ComponentRegistry to register/deregister, validate, execute single components, and run sequential pipelines with early-stop on failure.

src/automation/components.py

engine.pyAdd serverless AutomationEngine with triggers, middleware, and run history +401/-0

Add serverless AutomationEngine with triggers, middleware, and run history

• Implements AutomationEngine and AutomationDefinition to bind trigger event types to component pipelines with variable references. Adds TriggerEvent/RunResult data models, before/after run hooks with exception logging, variable snapshotting via peek (non-consuming), and basic observability via history() and stats(); also includes a module-level default_engine and trigger() convenience.

src/automation/engine.py

variables.pyAdd generator-backed variables with peek/reset and composition operators +238/-0

Add generator-backed variables with peek/reset and composition operators

• Introduces GeneratorVariable for lazy generator-factory values with idempotent peek(), reset(), next()/iteration support, and functional composition (map/filter/take/chain). Adds VariableRegistry for named storage, definition helpers, and bulk reset.

src/automation/variables.py

Documentation (1) +70 / -0
__init__.pyAdd public automation package API and quick-start docs +70/-0

Add public automation package API and quick-start docs

• Introduces the automation package entrypoint with a module docstring quick-start and re-exports of engine, component, and variable primitives via __all__ for a stable import surface.

src/automation/init.py

@qodo-code-review

qodo-code-review Bot commented Jun 29, 2026

Copy link
Copy Markdown

Code Review by Qodo

🐞 Bugs (4) 📘 Rule violations (0) 📎 Requirement gaps (0) 🎨 UX issues (0) 🔗 Cross-repo conflicts (0) 📜 Skill insights (0)

Grey Divider


Action required

1. Execute exceptions escape registry 🐞 Bug ☼ Reliability
Description
ComponentRegistry.run() calls Component.execute() without try/except, so exceptions from Component
subclasses escape and abort execution. In AutomationEngine, these get recorded as a generic
"__engine__" failure, obscuring which component actually crashed.
Code

src/automation/components.py[R228-254]

+    def run(self, name: str, input_: ComponentInput) -> ComponentOutput:
+        """
+        Execute the component registered as *name*.
+
+        Validates the input first; returns an error output if the component
+        is not found or validation fails.
+        """
+        component = self._components.get(name)
+        if component is None:
+            return ComponentOutput(
+                component=name,
+                result=None,
+                success=False,
+                error=f"Component '{name}' not registered",
+            )
+
+        valid, reason = component.validate(input_)
+        if not valid:
+            return ComponentOutput(
+                component=name,
+                result=None,
+                success=False,
+                error=f"Validation failed: {reason}",
+            )
+
+        return component.execute(input_)
+
Evidence
ComponentRegistry.run() returns component.execute(input_) directly, so any raised exception
bubbles up. AutomationEngine._run() catches such exceptions and appends a ComponentOutput
labeled __engine__, which removes the failing component’s name from the error output.

src/automation/components.py[228-254]
src/automation/engine.py[293-325]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`ComponentRegistry.run()` does not guard `component.execute()`; if a custom `Component.execute()` raises, the exception propagates.
- When invoked through `AutomationEngine`, the exception is caught at the engine layer and recorded as a `ComponentOutput(component="__engine__", ...)`, which loses the identity of the failing component.

## Issue Context
- `FunctionComponent.execute()` already catches exceptions and returns an error output, but custom `Component` subclasses are not protected.
- `run_pipeline()` relies on `run()` and expects to stop on failure; an exception is a harder failure mode than a structured `ComponentOutput(success=False)`.

## Fix Focus Areas
- src/automation/components.py[228-254]
- src/automation/engine.py[293-325]

## Suggested fix
Wrap `component.execute(input_)` in `ComponentRegistry.run()` with `try/except` and return a `ComponentOutput` for that component name on exception (ideally including traceback text or at least exception type/message). This preserves per-step attribution and avoids escalating normal component failures into engine-level generic errors.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


2. Peek drops collected value ✓ Resolved 🐞 Bug ≡ Correctness
Description
GeneratorVariable.peek() advances the underlying generator and caches the value, but collect()
drains only the generator and ignores any cached peeked value. If peek() is called before collect(),
the peeked element is silently omitted from the collected list.
Code

src/automation/variables.py[R68-87]

+    def peek(self) -> Any:
+        """
+        Return the next value without advancing the generator.
+
+        Consecutive calls to ``peek()`` return the same value until
+        ``next()`` or iteration consumes it.  Returns ``None`` when
+        the generator is exhausted.
+        """
+        if self._peeked:
+            return self._peeked_value
+        try:
+            self._peeked_value = next(self._ensure_gen())
+            self._peeked = True
+            return self._peeked_value
+        except StopIteration:
+            return None
+
+    def collect(self) -> list:
+        """Drain all remaining values into a list."""
+        return list(self._ensure_gen())
Evidence
peek() consumes one item from the generator into _peeked_value, but collect() drains from the
generator’s current position and never emits the cached value, so the peeked element is lost from
the collection output.

src/automation/variables.py[68-88]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`GeneratorVariable.peek()` advances the generator and stores the next value in `_peeked_value`, but `collect()` currently returns `list(self._ensure_gen())` and does not include `_peeked_value` when `_peeked` is set. This causes a silent off-by-one/data-loss bug when callers mix `peek()` and `collect()`.

## Issue Context
- `peek()` implements lookahead by calling `next()` on the generator and caching the result.
- `next()`/`__next__()` return the cached value when `_peeked` is set, but `collect()` bypasses that logic.

## Fix Focus Areas
- src/automation/variables.py[68-88]

## Suggested fix
Update `collect()` to account for the peek cache, e.g.:
- If `_peeked` is True, prepend `_peeked_value` to the collected list and clear the peek state before draining the generator.
- Consider clearing `_peeked_value` after consumption (in `next()`/`__next__()`) to avoid retaining large objects unnecessarily.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Remediation recommended

3. Overwrite skips teardown 🐞 Bug ☼ Reliability
Description
ComponentRegistry.register() overwrites an existing component for the same name without calling
teardown() on the old instance. Re-registering components can leak resources or leave stale side
effects because teardown() is only called by deregister().
Code

src/automation/components.py[R171-180]

+    def register(self, name: str, component: Component) -> "ComponentRegistry":
+        """
+        Register *component* under *name*.
+
+        Calls :meth:`Component.setup` and returns *self* for chaining.
+        """
+        component.name = component.name or name
+        component.setup()
+        self._components[name] = component
+        return self
Evidence
register() assigns into _components[name] without checking for an existing entry, while
teardown() is only invoked on explicit deregister()—so replacements never clean up the previous
component.

src/automation/components.py[171-180]
src/automation/components.py[211-222]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`ComponentRegistry.register()` replaces `self._components[name]` unconditionally and does not teardown any previously-registered component under the same name.

## Issue Context
- Components may allocate resources in `setup()` and release them in `teardown()`.
- `teardown()` is called only in `deregister()`, not during replacement.

## Fix Focus Areas
- src/automation/components.py[171-180]
- src/automation/components.py[211-222]

## Suggested fix
On `register(name, component)`:
- If `name` already exists, either:
 - call `old.teardown()` before overwriting, or
 - raise a `ValueError` (forcing explicit `deregister()` first).
- Consider ordering: teardown old first, then setup new, then store new, so partial failures don’t leave registry in an inconsistent lifecycle state.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


4. Missing variables ignored 🐞 Bug ≡ Correctness
Description
AutomationEngine._snapshot_variables() silently skips variable names that are referenced by an
automation but not registered. This hides configuration errors and can lead to components running
with incomplete metadata.
Code

src/automation/engine.py[R339-351]

+    def _snapshot_variables(self, names: List[str]) -> Dict[str, Any]:
+        """
+        Peek at the next value of each named variable.
+
+        Values are peeked (not consumed) so the same run can be replayed
+        without advancing the generators.
+        """
+        snapshot: Dict[str, Any] = {}
+        for name in names:
+            var = self.variables.get(name)
+            if var is not None:
+                snapshot[name] = var.peek()
+        return snapshot
Evidence
The engine constructs its run metadata from _snapshot_variables(), but missing variable references
are silently dropped because it uses get() and only adds entries when var is not None. The
registry has a require() API that would catch this, but it’s not used here.

src/automation/engine.py[339-351]
src/automation/variables.py[212-220]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`AutomationEngine._snapshot_variables()` uses `VariableRegistry.get()` and omits missing variables with no error/warning, even if the automation explicitly references them.

## Issue Context
- `VariableRegistry` already provides `require()` which raises on missing variables.
- Engine passes the snapshot into run metadata for each component step.

## Fix Focus Areas
- src/automation/engine.py[339-351]
- src/automation/variables.py[212-220]

## Suggested fix
Decide on desired behavior and enforce it consistently:
- **Fail fast**: use `self.variables.require(name)` and convert missing-variable errors into a failed `RunResult` with a clear message, OR
- **Warn loudly**: log a warning and include the key with a sentinel (e.g. `None` plus an error field) so downstream components can detect the misconfiguration.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


5. Unbounded run history growth 🐞 Bug ➹ Performance
Description
AutomationEngine appends every RunResult to an in-memory list without any eviction, and the
module-level default_engine persists for the process lifetime. In warm serverless containers or
long-lived processes, history can grow without bound and increase memory usage over time.
Code

src/automation/engine.py[R327-329]

+        result.finished_at = datetime.now()
+        self._history.append(result)
+
Evidence
The engine unconditionally appends each run to _history and never truncates it. Because
default_engine is a module-level singleton, that unbounded list can persist for the full lifetime
of the Python process.

src/automation/engine.py[168-178]
src/automation/engine.py[327-329]
src/automation/engine.py[395-401]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`AutomationEngine` stores all `RunResult` entries in `_history` indefinitely (`append` only). With the module-level `default_engine`, that history can persist across many invocations in a single process.

## Issue Context
- This code is explicitly positioned for FaaS/serverless usage where containers can be reused (warm starts).
- `_history` contains `outputs` which may retain large payloads depending on component behavior.

## Fix Focus Areas
- src/automation/engine.py[168-178]
- src/automation/engine.py[327-360]
- src/automation/engine.py[395-401]

## Suggested fix
Add a bounded-history strategy, e.g.:
- `AutomationEngine(max_history: int | None = 1000)` and after appending, trim the list.
- Provide `clear_history()` for callers.
- Optionally default `default_engine` to a conservative `max_history` to avoid surprise memory growth.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

Qodo Logo

Comment thread src/automation/variables.py Outdated
Comment on lines +228 to +254
def run(self, name: str, input_: ComponentInput) -> ComponentOutput:
"""
Execute the component registered as *name*.

Validates the input first; returns an error output if the component
is not found or validation fails.
"""
component = self._components.get(name)
if component is None:
return ComponentOutput(
component=name,
result=None,
success=False,
error=f"Component '{name}' not registered",
)

valid, reason = component.validate(input_)
if not valid:
return ComponentOutput(
component=name,
result=None,
success=False,
error=f"Validation failed: {reason}",
)

return component.execute(input_)

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

3. Execute exceptions escape registry 🐞 Bug ☼ Reliability

ComponentRegistry.run() calls Component.execute() without try/except, so exceptions from Component
subclasses escape and abort execution. In AutomationEngine, these get recorded as a generic
"__engine__" failure, obscuring which component actually crashed.
Agent Prompt
## Issue description
`ComponentRegistry.run()` does not guard `component.execute()`; if a custom `Component.execute()` raises, the exception propagates.
- When invoked through `AutomationEngine`, the exception is caught at the engine layer and recorded as a `ComponentOutput(component="__engine__", ...)`, which loses the identity of the failing component.

## Issue Context
- `FunctionComponent.execute()` already catches exceptions and returns an error output, but custom `Component` subclasses are not protected.
- `run_pipeline()` relies on `run()` and expects to stop on failure; an exception is a harder failure mode than a structured `ComponentOutput(success=False)`.

## Fix Focus Areas
- src/automation/components.py[228-254]
- src/automation/engine.py[293-325]

## Suggested fix
Wrap `component.execute(input_)` in `ComponentRegistry.run()` with `try/except` and return a `ComponentOutput` for that component name on exception (ideally including traceback text or at least exception type/message). This preserves per-step attribution and avoids escalating normal component failures into engine-level generic errors.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

Copilot AI left a comment

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

Introduces a new src/automation/ Python package implementing a serverless, call-driven automation subsystem that composes registered “components” into pipelines and injects generator-backed “variables” into run metadata.

Changes:

  • Added generator-backed variables (GeneratorVariable) with composition helpers and a shared VariableRegistry.
  • Added pluggable components (Component, FunctionComponent) and a ComponentRegistry with sequential pipeline execution.
  • Added an AutomationEngine that dispatches trigger events to registered automations, snapshots variables, executes pipelines, and tracks run history/stats.

Reviewed changes

Copilot reviewed 4 out of 8 changed files in this pull request and generated 5 comments.

File Description
src/automation/variables.py Implements generator-backed variables, peeking/resetting, composition helpers, and a variable registry.
src/automation/components.py Defines the component contract and a registry for registering/running components and pipelines.
src/automation/engine.py Implements the automation engine, trigger dispatch, variable snapshotting, run execution, and history/stats.
src/automation/init.py Exposes the public API and provides a package-level quick-start example.

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread src/automation/variables.py Outdated
Comment on lines +171 to +180
def register(self, name: str, component: Component) -> "ComponentRegistry":
"""
Register *component* under *name*.

Calls :meth:`Component.setup` and returns *self* for chaining.
"""
component.name = component.name or name
component.setup()
self._components[name] = component
return self
Comment thread src/automation/components.py Outdated
Comment on lines +130 to +135
def execute(self, input_: ComponentInput) -> ComponentOutput:
try:
result = self._fn(input_)
return self._ok(result)
except Exception as exc:
return self._err(str(exc))
Comment thread src/automation/engine.py
Comment on lines +346 to +351
snapshot: Dict[str, Any] = {}
for name in names:
var = self.variables.get(name)
if var is not None:
snapshot[name] = var.peek()
return snapshot
Comment thread src/automation/__init__.py Outdated
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@qodo-code-review

Copy link
Copy Markdown

CI Feedback 🧐

A test triggered by this PR failed. Here is an AI-generated analysis of the failure:

Action: pyre

Failed stage: Set up job [❌]

Failed test name: ""

Failure summary:

The workflow failed because it uses a deprecated GitHub Action: actions/upload-artifact@v2.
GitHub
automatically fails requests that reference actions/upload-artifact v1 or v2 due to the announced
deprecation (see log line 32 and the linked deprecation notice).

Relevant error logs:
1:  ##[group]Runner Image Provisioner
2:  Hosted Compute Agent
...

17:  Image Release: https://github.com/actions/runner-images/releases/tag/ubuntu24%2F20260622.220
18:  ##[endgroup]
19:  ##[group]GITHUB_TOKEN Permissions
20:  Actions: read
21:  Contents: read
22:  Metadata: read
23:  SecurityEvents: write
24:  ##[endgroup]
25:  Secret source: Actions
26:  Prepare workflow directory
27:  Prepare all required actions
28:  Getting action download info
29:  Download action repository 'actions/checkout@v4' (SHA:34e114876b0b11c390a56381ad16ebd13914f8d5)
30:  Download action repository 'facebook/pyre-action@60697a7858f7cc8470d8cc494a3cf2ad6b06560d' (SHA:60697a7858f7cc8470d8cc494a3cf2ad6b06560d)
31:  Getting action download info
32:  ##[error]This request has been automatically failed because it uses a deprecated version of `actions/upload-artifact: v2`. Learn more: https://github.blog/changelog/2024-02-13-deprecation-notice-v1-and-v2-of-the-artifact-actions/

@Pmaster-dev Pmaster-dev added the enhancement New feature or request label Jun 30, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants