diff --git a/.gitignore b/.gitignore index bcc44bb..11d3632 100644 --- a/.gitignore +++ b/.gitignore @@ -1,10 +1,45 @@ +# Node / JS node_modules/ dist/ -.wrangler/ -.claude/ *.log -.DS_Store +package-lock.json + +# Wrangler / Cloudflare +.wrangler/ worker-configuration.d.ts # Vite cache directories .vite-test-cache/ + +# Editor / OS +.DS_Store +.claude/ + +# Python — byte-compiled / package metadata +__pycache__/ +*.py[cod] +*$py.class +*.egg-info/ +*.egg + +# Python — virtual environments (uv / venv / pywrangler) +.venv +.venv-* + +# Python — uv workspace lockfile +# Treat as application-style lockfile: NOT committed for now (research preview). +# Flip to allowing it when the project stabilises so installs become reproducible. +uv.lock + +# Python — pywrangler-vendored modules (re-generated by `pywrangler sync`) +python_modules/ + +# Python — pytest / type checkers / linters +.pytest_cache/ +.mypy_cache/ +.ruff_cache/ + +# Python — coverage +.coverage +.coverage.* +htmlcov/ diff --git a/examples/python/.gitignore b/examples/python/.gitignore new file mode 100644 index 0000000..8bb5ac7 --- /dev/null +++ b/examples/python/.gitignore @@ -0,0 +1,2 @@ +# Everything Python/Node/Wrangler-related is handled by the repo-root .gitignore. +# This file only adds example-specific ignores (if any). diff --git a/examples/python/README.md b/examples/python/README.md new file mode 100644 index 0000000..617d35d --- /dev/null +++ b/examples/python/README.md @@ -0,0 +1,133 @@ +# dynamic-workflows — Python playground + +100% Python port of `examples/basic/`. An interactive browser playground: +edit a tenant `WorkflowEntrypoint` in Python on the left, hit **Run**, watch +each `@step.do(...)` tick off on the right as the workflow progresses. 
+ +``` +┌──────────────────────────────────────────────────────────────────────┐ +│ GET / → dashboard (HTML + JS) │ +│ GET /api/source → default tenant Python source │ +│ POST /api/run → { source, payload } → starts a run │ +│ GET /api/status/:runId → polled workflow status │ +└──────────────────────────────────────────────────────────────────────┘ +``` + +> 🚧 **Differences from the JS basic playground (intentional for v1):** +> +> - **No live log streaming.** The JS example uses a streaming Tail Worker +> and a `LogSession` Durable Object to fan-out logs over SSE. The Python +> streaming-tail path isn't documented yet — skipped for v1. The step +> timeline is driven by polling `/api/status/:runId` instead, the same +> way `examples/basic/` does for step progress. +> - **No Durable Object source persistence.** Instead of a DO, the tenant +> source is embedded in the dispatcher metadata itself (the second arg +> to `wrap_workflow_binding`). That metadata travels with the workflow's +> persisted payload, so engine replays after isolate recycles still find +> the source. The trade-off: source code lives in the workflow event +> payload (which is fine for a playground; for production, a DO + a +> stable source id is cleaner). + +Everything else mirrors the JS example: per-run tenant code is loaded +dynamically via `env.LOADER.get(...)`, the wrapped binding tags every +`create()` with `{"runId": ...}`, and the dispatcher's `DynamicWorkflow` +routes the engine's `run(event, step)` back into the same tenant code. + +## Run it + +```bash +cd examples/python +npm install # wrangler +uv sync # pywrangler + workspace deps +uv run pywrangler dev # standard idiom +``` + +Then open . + +### Requirements + +- **uv ≥ 0.11.14**. Older versions ship a rustls that rejects ECDSA-P-521 + certs, which Cloudflare's corporate Zero Trust MITM uses on + `index.pyodide.org`. Upgrade via + `curl -LsSf https://astral.sh/uv/install.sh | sh`. 
+- **Node + npm** (for `wrangler`, which `pywrangler` shells out to). +- **wrangler 4.83.0** (pinned in `package.json`). `wrangler 4.92.0` has a + regression: it double-applies the `experimental` compat flag to the + internal `workflows:dynamic` service and refuses to start. + +### Layout + +``` +/ # repo root +├── pyproject.toml # uv workspace declaration (members + nothing else) +examples/python/ # uv workspace member +├── package.json # wrangler dev-dep only +├── pyproject.toml # dynamic-workflows = { workspace = true } + workers-py git pin +├── wrangler.jsonc # main=src/entry.py, workflows + worker_loaders +└── src/ + ├── entry.py # dispatcher (HTTP routes + DynamicWorkflow class) + ├── dashboard.py # DASHBOARD_HTML constant + └── default_source.py # DEFAULT_SOURCE constant (Python tenant seed) +``` + +No custom scripts — just standard `npm`, `uv`, and `pywrangler` commands. + +### How `dynamic_workflows` is resolved at runtime + +**No hard copy** of the library source lives inside the example. The repo is a +[uv workspace](https://docs.astral.sh/uv/concepts/projects/workspaces/): the +root `/pyproject.toml` declares both `packages/dynamic-workflows-py` and +`examples/python` as workspace members. The example's `[tool.uv.sources]` +resolves `dynamic-workflows` via `{ workspace = true }`. + +`pywrangler sync` would normally ignore `[tool.uv.sources]` / +`[tool.uv.workspace]` because it shells out to `uv pip install -r`. This +example pins a patched `workers-py` fork that makes `pywrangler sync` use +`uv export` instead, honoring the workspace transparently. See the upstream +PR [cloudflare/workers-py#107](https://github.com/cloudflare/workers-py/pull/107). +When that merges, the pin can be dropped. + +> Because we're in a uv workspace, `uv sync` from `examples/python/` creates +> a single venv at the repo root (`/.venv/`) shared by all workspace members. +> The dev tools (`pywrangler`) end up in `/.venv/bin/`, not in a member-local venv. 
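One way to check which copy of `dynamic_workflows` the shared venv resolves is to inspect the module spec from inside it. The following is only a minimal sketch: `module_origin` is a hypothetical helper that is not part of this change, and it assumes the script is run with the workspace venv's interpreter (for example via `uv run python`).

```python
import importlib.util
import sys
from typing import Optional


def module_origin(name: str) -> Optional[str]:
    """Return the file a top-level module would be imported from, or None if it is not found.

    Hypothetical helper for illustration; it is not part of this repository.
    """
    spec = importlib.util.find_spec(name)
    return spec.origin if spec is not None else None


if __name__ == "__main__":
    # In a uv workspace the prefix is expected to point at the repo-root venv.
    print("interpreter prefix:", sys.prefix)
    # Expected to point into the workspace member's source tree when the
    # workspace source is honored; prints None if the package is not installed.
    print("dynamic_workflows:", module_origin("dynamic_workflows"))
```

If the printed origin points into a vendored or site-packages copy instead of the workspace member, edits to the package source would likely not be picked up.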
+ +Edits to `packages/dynamic-workflows-py/src/dynamic_workflows/__init__.py` +are picked up automatically the next `uv run pywrangler dev|deploy` — the +patched sync resolves the workspace member fresh. + +## What this demonstrates + +- A Python `DynamicWorkflowBinding(WorkerEntrypoint)` registered via the + factory pattern `ctx.exports.DynamicWorkflowBinding({props: ...})` + (undocumented but verified working). +- A Python `DynamicWorkflow` (via `create_dynamic_workflow_entrypoint`) + bound as `class_name: "DynamicWorkflow"` in wrangler. +- A Python tenant calling `env.WORKFLOWS.create(...)` on the **wrapped** + binding — round-trips through the dispatcher's `DynamicWorkflowBinding`. +- The user's typed-in `@step.do` workflow running through the dispatcher + with the `step._js_step` forwarding trick. +- Worker Loader dynamically loading the user's Python source. + +## Compatibility flags + +```jsonc +"compatibility_flags": [ + "python_workers", + "python_workflows", + "python_no_global_handlers" +] +``` + +The third one is **required** — without it the runtime rewrites +class methods named `fetch` to `on_fetch` (legacy mode), and our +`async def fetch(self, request)` won't be called. + +## Troubleshooting + +| Symptom | Likely cause | +|---|---| +| `Method on_fetch does not exist` | Forgot `python_no_global_handlers` compat flag. | +| `DataCloneError` on RPC call | A Python dict was sent without `to_js(..., dict_converter=Object.fromEntries)`. | +| `MissingDispatcherMetadataError` | The workflow event has no envelope. Tenant must use `self.env.WORKFLOWS` (the wrapped binding) or the dispatcher must use `wrap_workflow_binding(metadata).create(...)` directly. | +| `RuntimeError: No source registered for run …` (in **tenant** mode) | The dispatcher's `SOURCES` dict was cleared between requests. The dispatcher source carries the tenant code in metadata to dodge this, but if you've changed that, the issue resurfaces. | +| `index.pyodide.org` TLS failure | uv ≤ 0.9.x. 
Upgrade via `curl -LsSf https://astral.sh/uv/install.sh \| sh`. | diff --git a/examples/python/package.json b/examples/python/package.json new file mode 100644 index 0000000..07b281e --- /dev/null +++ b/examples/python/package.json @@ -0,0 +1,7 @@ +{ + "name": "dynamic-workflows-python-example", + "private": true, + "devDependencies": { + "wrangler": "4.83.0" + } +} diff --git a/examples/python/pyproject.toml b/examples/python/pyproject.toml new file mode 100644 index 0000000..7d6ba8d --- /dev/null +++ b/examples/python/pyproject.toml @@ -0,0 +1,28 @@ +[project] +name = "dynamic-workflows-python-example" +version = "0.1.0" +description = "End-to-end demo of dynamic-workflows in Python." +requires-python = ">=3.12" +dependencies = [ + "dynamic-workflows", +] + +[dependency-groups] +dev = [ + "workers-py>=1.9.3", +] + +[tool.uv] +package = false + +# Workspace source. The patched workers-py (pinned below) makes +# `pywrangler sync` honor [tool.uv.sources] / [tool.uv.workspace] via +# `uv export`, so no pre-built wheel is needed for the Pyodide vendor step. +[tool.uv.sources] +dynamic-workflows = { workspace = true } +# Use a workers-py fork that patches `pywrangler sync` to honor +# [tool.uv.sources] and [tool.uv.workspace] via `uv export`. +# Tracked upstream as . +# When merged into cloudflare/workers-py, drop these overrides. +workers-py = { git = "https://github.com/LuisDuarte1/workers-py.git", subdirectory = "packages/cli", rev = "da951b48f65a39a77c826707de375e039f047228" } +workers-runtime-sdk = { git = "https://github.com/LuisDuarte1/workers-py.git", subdirectory = "packages/runtime-sdk", rev = "da951b48f65a39a77c826707de375e039f047228" } diff --git a/examples/python/src/dashboard.py b/examples/python/src/dashboard.py new file mode 100644 index 0000000..b39f568 --- /dev/null +++ b/examples/python/src/dashboard.py @@ -0,0 +1,471 @@ +# ruff: noqa: E501 +"""Static HTML dashboard served at GET /. 
+Pure frontend: editor + payload + step timeline + status polling.""" + +DASHBOARD_HTML = r""" + + + + + dynamic-workflows playground (Python) + + + +
+
+

dynamic-workflows playground (Python)

+

Write a Workflow in Python, hit Run, watch each @step.do tick off as the workflow progresses.

+
+
+
+
+ Tenant worker (Python) +
+ + +
+
+ +
Input payload (JSON)
+ +
+ Trigger + + +
+
+ +
+
Run output
+
+
No runs yet. Edit the code on the left and hit Run.
+
+
+
+
+ + + + +""" diff --git a/examples/python/src/default_source.py b/examples/python/src/default_source.py new file mode 100644 index 0000000..cd8aac6 --- /dev/null +++ b/examples/python/src/default_source.py @@ -0,0 +1,66 @@ +# ruff: noqa: E501 +"""Default Python tenant source code shown in the playground editor. +Mirrors examples/basic/src/default-source.ts but written in Python.""" + +DEFAULT_SOURCE = '''import json + +from workers import WorkerEntrypoint, WorkflowEntrypoint, Response +from pyodide.ffi import to_js +from js import Object + + +def _to_js(value): + return to_js(value, dict_converter=Object.fromEntries) + + +class TenantWorkflow(WorkflowEntrypoint): + """A multi-step demo workflow. + Edit me, hit Run, watch the steps tick off on the right.""" + + async def run(self, event, step): + payload = event["payload"] or {} + name = payload.get("name", "world") if isinstance(payload, dict) else "world" + + @step.do("greet") + async def greet(): + print(f"saying hello to {name}") + return f"hello, {name}" + + @step.do("count letters") + async def count_letters(): + return len(name) + + @step.do("combine results", depends=[greet, count_letters]) + async def combine(greet_result, count): + return f"{greet_result} (name has {count} letters)" + + return await combine() + + +class Default(WorkerEntrypoint): + """Tenant fetch entrypoint — only used in the playground's "tenant" mode. + In "dispatcher" mode the dispatcher calls wrap_workflow_binding().create() + itself and this class is never loaded. 
Safe to delete if you only ever + trigger workflows from the dispatcher.""" + + async def fetch(self, request): + body = await request.json() + result = await self.start_workflow( + body.get("id") or "", json.dumps(body.get("payload") or {}) + ) + return Response.json(result) + + async def start_workflow(self, workflow_id, payload_json): + params = json.loads(payload_json) if payload_json else {} + opts_dict = {"params": params} + if workflow_id: + opts_dict["id"] = workflow_id + opts = _to_js(opts_dict) + # Reach through workers-py's _FetcherWrapper to the raw JS RPC stub. + # Otherwise the {id, status, ...} JS Object returned by create() would + # be structurally converted to a Python dict by python_from_rpc and + # we'd lose the sync `.id` attribute access. + binding = getattr(self.env.WORKFLOWS, "_binding", self.env.WORKFLOWS) + handle = await binding.create(opts) + return {"instanceId": handle.id} +''' diff --git a/examples/python/src/entry.py b/examples/python/src/entry.py new file mode 100644 index 0000000..216d925 --- /dev/null +++ b/examples/python/src/entry.py @@ -0,0 +1,182 @@ +"""Interactive Python playground dispatcher for dynamic-workflows. +Mirrors examples/basic/src/index.ts: editor + run + status, minus live logs.""" + +import json +import uuid + +from workers import Response, WorkerEntrypoint, python_from_rpc +from pyodide.ffi import create_once_callable, to_js +from js import Object + +from dynamic_workflows import ( + DynamicWorkflowBinding, + create_dynamic_workflow_entrypoint, + wrap_workflow_binding, +) + +from dashboard import DASHBOARD_HTML +from default_source import DEFAULT_SOURCE + + +__all__ = [ + "Default", + "DynamicWorkflow", + "DynamicWorkflowBinding", +] + + +def _to_js(value): + return to_js(value, dict_converter=Object.fromEntries) + + +def _make_tenant_code(run_id: str, source: str): + """Build a WorkerCode for a tenant Python module loaded via Worker Loader. 
+ The dispatcher embeds `source` into the binding metadata so it travels + with the workflow's persisted payload — no DO/storage needed for replay.""" + return _to_js( + { + "compatibilityDate": "2026-01-28", + "compatibilityFlags": [ + "python_workers", + "python_workflows", + "experimental", + ], + "mainModule": "entry.py", + "modules": {"entry.py": {"py": source}}, + "env": { + "WORKFLOWS": wrap_workflow_binding( + {"runId": run_id, "source": source} + ), + }, + "globalOutbound": None, + "allowExperimental": True, + } + ) + + +def _load_tenant_worker(env, run_id: str, source: str): + """Get (or build) the tenant Worker stub for this run. + Source must always be in hand — caller has it from either the HTTP body or + the workflow event's dispatcher metadata.""" + return env.LOADER.get( + f"run-{run_id}", + create_once_callable(lambda: _make_tenant_code(run_id, source)), + ) + + +async def load_runner(load_ctx): + """dynamic-workflows load_runner: return the runner for this run. + The tenant source rides in `metadata["source"]` so workflow replays + survive dispatcher isolate recycles without a Durable Object.""" + metadata = load_ctx["metadata"] + run_id = metadata.get("runId") + source = metadata.get("source") + if not run_id or not source: + raise ValueError("Missing runId or source in dispatcher metadata") + stub = _load_tenant_worker(load_ctx["env"], run_id, source) + return stub.getEntrypoint("TenantWorkflow") + + +DynamicWorkflow = create_dynamic_workflow_entrypoint(load_runner) + + +def _json(payload, status: int = 200) -> Response: + return Response.json(payload, status=status) + + +class Default(WorkerEntrypoint): + """HTTP entrypoint serving the playground dashboard + API routes.""" + + async def fetch(self, request): + from urllib.parse import urlparse + + url = urlparse(request.url) + path = url.path + method = request.method + + if method == "GET" and path == "/": + return Response( + DASHBOARD_HTML, + headers={"content-type": "text/html; 
charset=utf-8"}, + ) + + if method == "GET" and path == "/api/source": + return Response( + DEFAULT_SOURCE, + headers={"content-type": "text/x-python; charset=utf-8"}, + ) + + if method == "POST" and path == "/api/run": + return await self._handle_run(request) + + if method == "GET" and path.startswith("/api/status/"): + run_id = path[len("/api/status/") :] + return await self._handle_status(run_id) + + return Response("Not Found", status=404) + + async def _handle_run(self, request) -> Response: + try: + body = await request.json() + except Exception as e: + return _json({"error": f"Invalid JSON body: {e}"}, status=400) + + source = body.get("source") + if not isinstance(source, str) or not source: + return _json({"error": "Missing source code"}, status=400) + + mode = body.get("mode") or "tenant" + if mode not in ("direct", "tenant"): + return _json({"error": "mode must be 'direct' or 'tenant'"}, status=400) + + run_id = str(uuid.uuid4()) + payload = body.get("payload") or {} + + try: + if mode == "direct": + instance_id = await self._start_direct(run_id, source, payload) + else: + instance_id = await self._start_via_tenant(run_id, source, payload) + return _json( + { + "runId": run_id, + "instanceId": instance_id, + "mode": mode, + "status": {"status": "running"}, + } + ) + except Exception as e: + return _json({"error": str(e)}, status=500) + + async def _start_direct(self, run_id: str, source: str, payload): + """Dispatcher-driven mode: mint a wrapped binding here and call create() + directly. The tenant only needs a TenantWorkflow(WorkflowEntrypoint) — + no Default(WorkerEntrypoint) required, since the dispatcher never RPCs + into the tenant for the create() call.""" + wrapped = wrap_workflow_binding({"runId": run_id, "source": source}) + opts = _to_js({"id": run_id, "params": payload}) + # create() returns a {id, status, pause, ...} JS handle; .id is the + # instance id transmitted by value, so this is a sync read. 
+ handle = await wrapped.create(opts) + return handle.id + + async def _start_via_tenant(self, run_id: str, source: str, payload): + """Tenant-driven mode: load the tenant, RPC into its Default.start_workflow. + Models the case where the tenant's own code is what triggers workflows, + from its own HTTP/RPC context. Requires `Default(WorkerEntrypoint)` + with a `start_workflow` method in the tenant source.""" + stub = _load_tenant_worker(self.env, run_id, source) + tenant = stub.getEntrypoint() + payload_json = json.dumps(payload) + result = python_from_rpc(await tenant.start_workflow(run_id, payload_json)) + return result.get("instanceId") if isinstance(result, dict) else None + + async def _handle_status(self, run_id: str) -> Response: + try: + instance = await self.env.WORKFLOWS.get(run_id) + status_obj = await instance.status() + if hasattr(status_obj, "then"): # JS Promise leaked through wrapper + status_obj = await status_obj + return _json({"runId": run_id, "status": python_from_rpc(status_obj)}) + except Exception as e: + return _json({"error": str(e)}, status=404) diff --git a/examples/python/wrangler.jsonc b/examples/python/wrangler.jsonc new file mode 100644 index 0000000..118cd42 --- /dev/null +++ b/examples/python/wrangler.jsonc @@ -0,0 +1,25 @@ +{ + "$schema": "node_modules/wrangler/config-schema.json", + "name": "dynamic-workflows-python-example", + "main": "src/entry.py", + "compatibility_date": "2026-01-28", + "compatibility_flags": [ + "python_workers", + "python_workflows", + "experimental" + ], + // `python_no_global_handlers` is the default as of 2025-08-14 (our compat + // date is later) and `experimental` is implicit on the workflows internal + // service. The patched workers-py fork pinned in pyproject.toml feeds the + // experimental flag through via Worker Loader's `allowExperimental`. 
+ "workflows": [ + { + "name": "dynamic", + "binding": "WORKFLOWS", + "class_name": "DynamicWorkflow" + } + ], + "worker_loaders": [ + { "binding": "LOADER" } + ] +} diff --git a/packages/dynamic-workflows-py/README.md b/packages/dynamic-workflows-py/README.md new file mode 100644 index 0000000..ba5aaf2 --- /dev/null +++ b/packages/dynamic-workflows-py/README.md @@ -0,0 +1,161 @@ +# `dynamic-workflows` (Python) + +Python port of [`@cloudflare/dynamic-workflows`](../dynamic-workflows). Lets a Python Cloudflare Worker act as a multi-tenant Workflows dispatcher, routing `run()` calls into per-tenant Python (or JS) workers loaded dynamically via Worker Loader. + +This is a **research preview** — the API mirrors the JS package but the +`pywrangler` integration relies on a patched workers-py fork (see +`examples/python/pyproject.toml` for the pin). + +## Status + +| Piece | Status | +|---|---| +| Envelope wrap/unwrap | ✅ ported, logic identical to JS | +| `DynamicWorkflowBinding` | ✅ ported as `WorkerEntrypoint` (no `RpcTarget` available) | +| `DynamicWorkflowInstanceStub` | ✅ ported; re-resolves instance each call | +| `dispatch_workflow` / `create_dynamic_workflow_entrypoint` | ✅ ported | +| `wrap_workflow_binding` factory | ✅ ported, depends on undocumented Python-class-as-`ctx.exports`-factory behavior | +| `WrappedWorkflow` / `WrappedInstance` tenant facades | ✅ new — not in JS, for Python ergonomics | +| Tests | ✅ 34 host pytest tests covering envelope + binding-impl + dispatch-core (Pattern A) | +| Verified end-to-end against `pywrangler dev` | ✅ examples/python runs the full  Dashboard → dispatcher → tenant → step.do   chain | + +## Installation + +This package is not yet on PyPI. For now, point your tenant/dispatcher's +`pyproject.toml` at the local path or vendor `dynamic_workflows/` directly +next to your `entry.py`. 
+ +```toml +[tool.uv.sources] +dynamic-workflows = { path = "../../packages/dynamic-workflows-py" } +``` + +## Quickstart + +```python +# src/entry.py +from workers import WorkerEntrypoint, Response +from dynamic_workflows import ( + DynamicWorkflowBinding, + DynamicWorkflowInstanceStub, + create_dynamic_workflow_entrypoint, + wrap_workflow_binding, +) +from pyodide.ffi import to_js +from js import Object + +__all__ = [ + "DynamicWorkflowBinding", + "DynamicWorkflowInstanceStub", + "DynamicWorkflow", + "Default", +] + +TENANT_SOURCE = """ +from workers import WorkerEntrypoint, WorkflowEntrypoint, Response +from dynamic_workflows import WrappedWorkflow + +class TenantWorkflow(WorkflowEntrypoint): + async def run(self, event, step): + @step.do("greet") + async def greet(): + name = event["payload"].get("name", "world") + return f"hello, {name}" + return await greet() + +class Default(WorkerEntrypoint): + async def fetch(self, request): + body = await request.json() + workflows = WrappedWorkflow(self.env.WORKFLOWS) + instance = await workflows.create(params=body) + return Response.json({"id": await instance.id()}) +""" + +async def load_runner(load_ctx): + tenant_id = load_ctx["metadata"]["tenantId"] + stub = load_ctx["env"].LOADER.get( + f"tenant-{tenant_id}", + lambda: to_js({ + "compatibilityDate": "2025-08-01", + "compatibilityFlags": ["python_workers", "python_workflows"], + "mainModule": "entry.py", + "modules": {"entry.py": TENANT_SOURCE}, + "env": { + "WORKFLOWS": wrap_workflow_binding({"tenantId": tenant_id}), + }, + }, dict_converter=Object.fromEntries), + ) + return stub.getEntrypoint("TenantWorkflow") + +DynamicWorkflow = create_dynamic_workflow_entrypoint(load_runner) + +class Default(WorkerEntrypoint): + async def fetch(self, request): + # forward request into tenant; tenant calls its env.WORKFLOWS.create() + ... 
+``` + +```jsonc +// wrangler.jsonc +{ + "main": "src/entry.py", + "compatibility_date": "2025-08-01", + "compatibility_flags": ["python_workers", "python_workflows"], + "workflows": [ + { "name": "dynamic", "binding": "WORKFLOWS", "class_name": "DynamicWorkflow" } + ], + "worker_loaders": [{ "binding": "LOADER" }] +} +``` + +See `examples/python/` for a runnable demo. + +## Public API + +| Name | Type | Purpose | +|---|---|---| +| `DynamicWorkflowBinding` | class (`WorkerEntrypoint`) | Wrapped `Workflow` binding factory. Re-import in main module. | +| `DynamicWorkflowInstanceStub` | class (`WorkerEntrypoint`) | RPC wrapper around a `WorkflowInstance`. Re-import in main module. | +| `wrap_workflow_binding(metadata, *, binding_name="WORKFLOWS")` | function | Mint a per-tenant binding stub. Pass into Worker Loader `env`. | +| `create_dynamic_workflow_entrypoint(load_runner, *, class_name="DynamicWorkflow")` | function | Returns the `WorkflowEntrypoint` subclass you bind in wrangler. | +| `dispatch_workflow(*, env, ctx, event, step, load_runner)` | function | Lower-level — for custom `WorkflowEntrypoint` subclasses. | +| `MissingDispatcherMetadataError` | exception | Raised when `event.payload` lacks an envelope. | +| `WrappedWorkflow` / `WrappedInstance` | classes | Optional Pythonic facades for tenant code. Hide `to_js` ceremony. | + +## Running the tests + +```bash +cd packages/dynamic-workflows-py +uv sync +uv run pytest +``` + +The tests target the pure-Python `_core` module (envelope wrap/unwrap, +`dispatcher_binding_impl`, `dispatch_workflow_core`) and don't require +workerd — they run on host CPython in ~60ms. The workerd-bound layer +(`_workerd.py`, `DynamicWorkflowBinding`, etc.) is only exercised end-to-end +via `examples/python` under `pywrangler dev`. 
+ +Layout: + +``` +src/dynamic_workflows/ +├── __init__.py # tries to import _workerd, falls back to _core-only +├── _core.py # pure Python — tested on host +└── _workerd.py # js/workers/pyodide bindings — only loadable inside workerd +``` + +The split mirrors the JS package's `binding.ts` factor-out pattern +(`_dispatcherBindingImpl`): real logic lives in pure functions, the +`WorkerEntrypoint` subclasses are thin shells over them. + +## Known limitations / open verification items + +1. **`ctx.exports.MyPythonClass({props: ...})` factory call is undocumented**. The mechanism is supported in `workerd`'s `EntrypointWrapper`, but no Cloudflare doc page demonstrates calling a Python-defined class this way. Test with `pywrangler dev` first. +2. **`step._js_step`** is a private attribute of `workers-py`. Stable today but could break with SDK upgrades. +3. **Python Worker cold starts are slow** (Cloudflare-published warning). Dynamic loading via Worker Loader makes this worse — there's no precomputed memory snapshot. For one-off / AI-generated code, the JS path is still recommended by Cloudflare. +4. **`@property` doesn't survive `collect_methods`** — the instance stub's `id` is exposed as `async def id(self)`, not a property. Slightly more await-ceremony than JS. +5. **No tests in workerd yet** — would need a `pytest` + `pywrangler` setup that this repo doesn't have. + +Each item is rooted in the workers-py / workerd source; the corresponding +behavior is captured by the host pytest suite under `tests/`. 
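The pure-function split described in this README is what lets the host suite check binding behavior with fakes instead of workerd. The sketch below shows the idea under stated assumptions: `FakeBinding`, `create_wrapped`, and `run_demo` are hypothetical names, and `create_wrapped` is a simplified stand-in for the `create()` path of `dispatcher_binding_impl`, not the package's actual code.

```python
import asyncio
from typing import Any

_METADATA_KEY = "__dispatcherMetadata"  # same key as in _core.py


class FakeBinding:
    """Hypothetical stand-in for the Workflow binding; records create() options."""

    def __init__(self) -> None:
        self.seen: list[dict] = []

    async def create(self, options: dict) -> Any:
        self.seen.append(options)
        return {"id": options.get("id", "generated-id")}


async def create_wrapped(binding: FakeBinding, metadata: dict, options: dict) -> Any:
    # Simplified create() path: copy the options, then wrap params in the envelope.
    opts = dict(options)
    opts["params"] = {_METADATA_KEY: metadata, "params": opts.get("params")}
    return await binding.create(opts)


def run_demo() -> dict:
    """Drive one create() call and return the options the fake binding received."""
    binding = FakeBinding()
    asyncio.run(
        create_wrapped(binding, {"runId": "r1"}, {"id": "r1", "params": {"name": "Ada"}})
    )
    return binding.seen[0]
```

Calling `run_demo()` returns the options the fake binding saw, so a test can assert that the tenant's params were nested under the envelope while other options such as `id` passed through unchanged.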
diff --git a/packages/dynamic-workflows-py/pyproject.toml b/packages/dynamic-workflows-py/pyproject.toml new file mode 100644 index 0000000..363b75b --- /dev/null +++ b/packages/dynamic-workflows-py/pyproject.toml @@ -0,0 +1,36 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + +[project] +name = "dynamic-workflows" +version = "0.1.0" +description = "Python port of @cloudflare/dynamic-workflows — multi-tenant workflow dispatcher for Cloudflare Dynamic Workers." +readme = "README.md" +requires-python = ">=3.12" +license = { text = "MIT" } +authors = [{ name = "dynamic-workflows contributors" }] +keywords = ["cloudflare", "workers", "workflows", "dynamic-workers"] +classifiers = [ + "Programming Language :: Python :: 3", + "License :: OSI Approved :: MIT License", + "Operating System :: OS Independent", +] + +[project.urls] +Homepage = "https://github.com/danlapid/dynamic-workflows" +Source = "https://github.com/danlapid/dynamic-workflows" + +[tool.hatch.build.targets.wheel] +packages = ["src/dynamic_workflows"] + +[dependency-groups] +dev = [ + "pytest>=8.0", + "pytest-asyncio>=0.24", +] + +[tool.pytest.ini_options] +testpaths = ["tests"] +asyncio_mode = "auto" +asyncio_default_fixture_loop_scope = "function" diff --git a/packages/dynamic-workflows-py/src/dynamic_workflows/__init__.py b/packages/dynamic-workflows-py/src/dynamic_workflows/__init__.py new file mode 100644 index 0000000..ce0cd8e --- /dev/null +++ b/packages/dynamic-workflows-py/src/dynamic_workflows/__init__.py @@ -0,0 +1,53 @@ +"""Python port of @cloudflare/dynamic-workflows. +See README.md for design rationale and the JS package for the canonical API.""" + +from ._core import ( + DispatcherMetadata, + MissingDispatcherMetadataError, + _METADATA_KEY, + _wrap_params, + _unwrap_params, + dispatcher_binding_impl, + dispatch_workflow_core, +) + +# JS/Pyodide bindings only available inside the workerd Pyodide runtime. +# Host-side imports (e.g. 
pytest) hit the ImportError and skip the workerd +# layer — only the pure helpers re-exported from _core remain available. +try: + from ._workerd import ( + DynamicWorkflowBinding, + LoadWorkflowRunner, + LoadWorkflowRunnerContext, + WorkflowRunner, + WrappedInstance, + WrappedWorkflow, + create_dynamic_workflow_entrypoint, + dispatch_workflow, + wrap_workflow_binding, + ) + + __all__ = [ + "DispatcherMetadata", + "DynamicWorkflowBinding", + "LoadWorkflowRunner", + "LoadWorkflowRunnerContext", + "MissingDispatcherMetadataError", + "WorkflowRunner", + "WrappedInstance", + "WrappedWorkflow", + "create_dynamic_workflow_entrypoint", + "dispatch_workflow", + "wrap_workflow_binding", + ] +except ImportError: + # Host-side / test context: only the pure-Python core is importable. + __all__ = [ + "DispatcherMetadata", + "MissingDispatcherMetadataError", + "_METADATA_KEY", + "_wrap_params", + "_unwrap_params", + "dispatcher_binding_impl", + "dispatch_workflow_core", + ] diff --git a/packages/dynamic-workflows-py/src/dynamic_workflows/_core.py b/packages/dynamic-workflows-py/src/dynamic_workflows/_core.py new file mode 100644 index 0000000..db98008 --- /dev/null +++ b/packages/dynamic-workflows-py/src/dynamic_workflows/_core.py @@ -0,0 +1,152 @@ +"""Pure-Python core of dynamic-workflows: envelope helpers + dispatch logic. +No `js`/`workers`/`pyodide.ffi` imports here so the helpers are host-testable.""" + +import inspect +from typing import Any, Awaitable, Callable, Optional, Tuple, Union + +DispatcherMetadata = dict +"""Opaque key the dispatcher uses to route a workflow back to a tenant. +Treat the contents as JSON-shaped; never put secrets in it (it's persisted).""" + +_METADATA_KEY = "__dispatcherMetadata" + + +class MissingDispatcherMetadataError(Exception): + """Raised by dispatch_workflow when event.payload has no envelope. 
+ Usually means the workflow was created against the raw binding.""" + + def __init__(self) -> None: + super().__init__( + "dynamic-workflows: workflow event is missing dispatcher metadata. " + "Did you forget to wrap the Workflow binding with wrap_workflow_binding()?" + ) + + +def _wrap_params(params: Any, metadata: DispatcherMetadata) -> dict: + """Wrap a tenant's params payload in a dispatcher envelope. + Mirrors wrapParams() from binding.ts.""" + return {_METADATA_KEY: metadata, "params": params} + + +def _unwrap_params(payload: Any) -> Optional[Tuple[DispatcherMetadata, Any]]: + """Pull (metadata, params) back out of a dispatcher envelope, or None. + Mirrors unwrapParams() from binding.ts.""" + if isinstance(payload, dict) and _METADATA_KEY in payload and "params" in payload: + return payload[_METADATA_KEY], payload["params"] + return None + + +def _is_awaitable(value: Any) -> bool: + """Awaitable check that doesn't get fooled by MagicMock's auto-attrs. + `inspect.isawaitable` handles coroutines, generators, and Pyodide JS Promise proxies.""" + return inspect.isawaitable(value) + + +def _identity(value: Any) -> Any: + """Default encode/get-step hook: pass-through. + Real `dispatch_workflow` wraps these to do JS conversions; tests don't.""" + return value + + +def _default_make_handle(instance: Any) -> Any: + """Host-side default: return a plain dict with just `id` so pure-Python + tests work. Workerd injects a builder that bundles the id by value with + JS-native `.bind()` method functors over the JS WorkflowInstance.""" + return {"id": instance.id} + + +def dispatcher_binding_impl( + get_binding: Callable[[], Any], + metadata: DispatcherMetadata, + *, + encode: Callable[[dict], Any] = _identity, + make_handle: Callable[[Any], Any] = _default_make_handle, +) -> Any: + """Pure binding-wrap logic, factored out for testability. + The WorkerEntrypoint subclass `DynamicWorkflowBinding` is a thin shell + over this. 
Mirrors `_dispatcherBindingImpl` from the JS package. + + `make_handle(instance)` builds the value returned from create/createBatch/ + get. Default returns `{"id": instance.id}` for host tests; workerd injects + a builder that returns a JS Object literal `{id, status, pause, ...}` + where methods are `instance.method.bind(instance)` — RPC-marshalable + JS-native functions, no Python proxies, no factory ceremony.""" + + class _Impl: + async def create(self, options: Optional[dict] = None) -> Any: + opts = dict(options) if options else {} + opts["params"] = _wrap_params(opts.get("params"), metadata) + instance = await get_binding().create(encode(opts)) + return make_handle(instance) + + async def create_batch(self, batch: list) -> list: + wrapped = [ + {**(item or {}), "params": _wrap_params((item or {}).get("params"), metadata)} + for item in batch + ] + # Call JS-native camelCase: works on raw JsProxy and on workers-py's + # _WorkflowBindingWrapper (via __getattr__ fallthrough). + instances = await get_binding().createBatch(encode(wrapped)) + return [make_handle(inst) for inst in instances] + + async def get(self, instance_id: str) -> Any: + instance = await get_binding().get(instance_id) + return make_handle(instance) + + return _Impl() + + +async def send_event_on_instance( + instance: Any, + event_type: str, + payload: Any, + *, + encode: Callable[[dict], Any] = _identity, +) -> Any: + """Forward a sendEvent call to a JS-like WorkflowInstance. + JS WorkflowInstance.sendEvent takes a single options object + `{type, payload}` (NOT kwargs). 
Keeping the wire-shape build here + so the contract is host-testable without workerd; `encode` is + `to_js(..., dict_converter=Object.fromEntries)` at runtime, + identity in tests.""" + return await instance.sendEvent(encode({"type": event_type, "payload": payload})) + + +async def dispatch_workflow_core( + *, + env: Any, + ctx: Any, + event: dict, + step: Any, + load_runner: Callable[[dict], Any], + encode: Callable[[dict], Any] = _identity, + decode: Callable[[Any], Any] = _identity, + get_js_step: Callable[[Any], Any] = _identity, +) -> Any: + """Pure dispatch logic: unwrap envelope, call loader, delegate to runner. + Mirrors `dispatchWorkflow` from the JS package. `encode`/`decode`/ + `get_js_step` are injected so host tests don't need workerd/Pyodide. + + `encode` and `decode` are a symmetric Workers-RPC converter pair: at + runtime they're `python_to_rpc` / `python_from_rpc` from workers-py, so + values cross the dispatcher→runner boundary the same way they would for + any other Workers RPC call. Defaults are identity for host tests.""" + unwrapped = _unwrap_params(event["payload"]) + if unwrapped is None: + raise MissingDispatcherMetadataError() + + metadata, params = unwrapped + + inner_event = { + "payload": params, + "timestamp": event.get("timestamp"), + "instanceId": event["instanceId"], + } + + load_ctx = {"metadata": metadata, "env": env, "ctx": ctx} + maybe_runner = load_runner(load_ctx) + runner = await maybe_runner if _is_awaitable(maybe_runner) else maybe_runner + + js_step = get_js_step(step) + result = await runner.run(encode(inner_event), js_step) + return decode(result) diff --git a/packages/dynamic-workflows-py/src/dynamic_workflows/_workerd.py b/packages/dynamic-workflows-py/src/dynamic_workflows/_workerd.py new file mode 100644 index 0000000..8bf4bfb --- /dev/null +++ b/packages/dynamic-workflows-py/src/dynamic_workflows/_workerd.py @@ -0,0 +1,262 @@ +"""workerd/Pyodide-bound implementations. 
+Imports `js`, `workers`, and `pyodide.ffi`; only importable inside the +workerd Pyodide runtime. Host-side code uses the pure helpers in `_core`.""" + +from typing import Any, Awaitable, Callable, Optional, Protocol, Union + +import js +from js import Object +from pyodide.ffi import to_js +from workers import ( + WorkerEntrypoint, + WorkflowEntrypoint, + import_from_javascript, + python_from_rpc as _sdk_python_from_rpc, + python_to_rpc as _sdk_python_to_rpc, +) + +from ._core import ( + DispatcherMetadata, + dispatcher_binding_impl, + dispatch_workflow_core, + send_event_on_instance, +) + + +def _to_js(value: Any) -> Any: + """Convert a Python dict/list/primitive to a real JS Object (not a Map). + Recurses into nested dicts via Object.fromEntries.""" + return to_js(value, dict_converter=Object.fromEntries) + + +def _exports() -> Any: + """Return the cloudflare:workers `exports` proxy. + Used to invoke the Python-defined entrypoint classes as factories.""" + return import_from_javascript("cloudflare:workers").exports + + +class _PassThrough: + """Wrapper that makes a JsProxy survive `python_to_rpc` unchanged. + The SDK's default converter unwraps any object exposing `.js_object`.""" + + __slots__ = ("js_object",) + + def __init__(self, js_obj: Any) -> None: + self.js_object = js_obj + + +def _props_to_py(props: Any) -> dict: + """Convert ctx.props (a raw JsProxy) to a Python dict. + `.to_py()` recurses, so nested metadata dicts come out correctly.""" + return props.to_py() + + +_INSTANCE_METHODS = ("status", "pause", "resume", "terminate", "restart", "sendEvent") + + +def _make_handle(instance: Any) -> Any: + """Build a {id, status, pause, ...} JS object the tenant gets from create(). + Each method is `js_instance.method.bind(js_instance)` — a JS-native bound + function that crosses RPC as a callable handle, no Python proxies, no + factory ceremony. 
Matches the official binding's ergonomic: sync `.id`, + async `.status()`/`.pause()`/`.sendEvent({type, payload})` etc. + + `instance` may be a workers-py `_WorkflowInstanceWrapper`; reach through + to the underlying JS WorkflowInstance so `.bind(this)` binds to the JS + object, not the Python wrapper.""" + js_instance = getattr(instance, "_binding", instance) + handle = js.Object.new() + handle.id = js_instance.id + for name in _INSTANCE_METHODS: + setattr(handle, name, getattr(js_instance, name).bind(js_instance)) + return _PassThrough(handle) + + +class DynamicWorkflowBinding(WorkerEntrypoint): + """Wrapped `Workflow` binding handed to each tenant via Worker Loader env. + Tags every create() call with dispatcher metadata, returns a JS handle.""" + + def _binding(self) -> Any: + binding_name = self.ctx.props.bindingName + return getattr(self.env, binding_name) + + def _metadata(self) -> DispatcherMetadata: + return _props_to_py(self.ctx.props).get("metadata", {}) + + def _impl(self) -> Any: + return dispatcher_binding_impl( + self._binding, + self._metadata(), + encode=_to_js, + make_handle=_make_handle, + ) + + async def create(self, options: Optional[dict] = None) -> Any: + return await self._impl().create(options) + + async def createBatch(self, batch: list) -> list: + return await self._impl().create_batch(batch) + + async def get(self, instance_id: str) -> Any: + return await self._impl().get(instance_id) + + +def wrap_workflow_binding( + metadata: DispatcherMetadata, + *, + binding_name: str = "WORKFLOWS", +) -> Any: + """Mint a per-tenant Workflow-shaped RPC stub the tenant sees as env.WORKFLOWS. + Equivalent of wrapWorkflowBinding() from binding.ts.""" + exports = _exports() + factory = getattr(exports, "DynamicWorkflowBinding", None) + if factory is None: + raise RuntimeError( + "dynamic-workflows: `DynamicWorkflowBinding` is not registered on " + "`cloudflare:workers` exports. 
Add `from dynamic_workflows import " + "DynamicWorkflowBinding` to your dispatcher's main module." + ) + return factory( + _to_js({"props": {"bindingName": binding_name, "metadata": metadata}}) + ) + + +class WorkflowRunner(Protocol): + """Anything with an async run(event, step) — e.g. a Worker Loader getEntrypoint(). + The dispatcher's load_runner callback returns one of these.""" + + async def run(self, event: dict, step: Any) -> Any: ... + + +LoadWorkflowRunnerContext = dict +"""Dict with keys {metadata, env, ctx} passed to load_runner. +Kept as a plain dict for ergonomic destructuring on the user side.""" + +LoadWorkflowRunner = Callable[ + [LoadWorkflowRunnerContext], Union[WorkflowRunner, Awaitable[WorkflowRunner]] +] + + +def _extract_js_step(step: Any) -> Any: + """Reach into _WorkflowStepWrapper to pull out the raw JS step JsProxy. + Falls back to the input if it's already a JsProxy (defensive).""" + return getattr(step, "_js_step", step) + + +async def dispatch_workflow( + *, + env: Any, + ctx: Any, + event: dict, + step: Any, + load_runner: LoadWorkflowRunner, +) -> Any: + """Unwrap the dispatcher envelope on event.payload and delegate to a runner. + Mirror of dispatchWorkflow() from entrypoint.ts. + + encode/decode are the Workers-RPC converter pair from workers-py so values + cross the dispatcher↔runner boundary with the same conversion semantics as + any other RPC call (Date↔datetime, Response wrappers, etc.).""" + return await dispatch_workflow_core( + env=env, + ctx=ctx, + event=event, + step=step, + load_runner=load_runner, + encode=_sdk_python_to_rpc, + decode=_sdk_python_from_rpc, + get_js_step=_extract_js_step, + ) + + +def create_dynamic_workflow_entrypoint( + load_runner: LoadWorkflowRunner, + *, + class_name: str = "DynamicWorkflow", +) -> type: + """Create a WorkflowEntrypoint subclass that delegates run() to a tenant. 
+ Assign the result to a module-level name matching `class_name` in wrangler.""" + + class _DynamicWorkflowEntrypoint(WorkflowEntrypoint): + async def run(self, event, step): + return await dispatch_workflow( + env=self.env, + ctx=self.ctx, + event=event, + step=step, + load_runner=load_runner, + ) + + _DynamicWorkflowEntrypoint.__name__ = class_name + _DynamicWorkflowEntrypoint.__qualname__ = class_name + return _DynamicWorkflowEntrypoint + + +class WrappedInstance: + """Pythonic facade around the JS handle a tenant gets from create(). + `.id` is captured sync at construction (the handle carries it by value); + method calls go through the bound JS functors. Optional convenience.""" + + __slots__ = ("id", "_handle") + + def __init__(self, handle: Any) -> None: + self.id = handle.id + self._handle = handle + + async def status(self) -> Any: + return _sdk_python_from_rpc(await self._handle.status()) + + async def pause(self) -> Any: + return await self._handle.pause() + + async def resume(self) -> Any: + return await self._handle.resume() + + async def terminate(self) -> Any: + return await self._handle.terminate() + + async def restart(self) -> Any: + return await self._handle.restart() + + async def send_event(self, *, type: str, payload: Any) -> Any: + return await send_event_on_instance( + self._handle, type, payload, encode=_to_js, + ) + + +class WrappedWorkflow: + """Pythonic facade for tenant code calling env.WORKFLOWS. + Hides the to_js dance and returns WrappedInstance objects. 
+ + Reaches through workers-py's _FetcherWrapper to the raw JS RPC stub so + that `python_from_rpc` is NOT auto-applied to results — otherwise our + `{id, status, ...}` JS Object handle would be structurally converted to + a Python dict and lose the sync `.id` attribute access.""" + + def __init__(self, js_binding: Any) -> None: + # If `js_binding` is a workers-py _FetcherWrapper (the case when a + # tenant passes `self.env.WORKFLOWS`), reach through to `._binding` + # which is the raw JsProxy. Otherwise (raw JsProxy already) use as-is. + self._binding = getattr(js_binding, "_binding", js_binding) + + async def create( + self, + *, + id: Optional[str] = None, + params: Any = None, + ) -> WrappedInstance: + opts: dict = {} + if id is not None: + opts["id"] = id + if params is not None: + opts["params"] = params + handle = await self._binding.create(_to_js(opts)) + return WrappedInstance(handle) + + async def create_batch(self, batch: list) -> list: + handles = await self._binding.createBatch(_to_js(batch)) + return [WrappedInstance(h) for h in handles] + + async def get(self, instance_id: str) -> WrappedInstance: + handle = await self._binding.get(instance_id) + return WrappedInstance(handle) diff --git a/packages/dynamic-workflows-py/tests/__init__.py b/packages/dynamic-workflows-py/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/packages/dynamic-workflows-py/tests/test_binding.py b/packages/dynamic-workflows-py/tests/test_binding.py new file mode 100644 index 0000000..9beb3f7 --- /dev/null +++ b/packages/dynamic-workflows-py/tests/test_binding.py @@ -0,0 +1,275 @@ +"""Tests for dispatcher_binding_impl — mirrors JS binding.test.ts (10 cases). 
+Exercises the pure envelope-injection logic shared by DynamicWorkflowBinding.""" + +from types import SimpleNamespace +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from dynamic_workflows._core import ( + _METADATA_KEY, + dispatcher_binding_impl, + send_event_on_instance, +) + + +def make_fake_instance(instance_id: str) -> Any: + """A minimal duck-typed WorkflowInstance with the attributes our impl reads.""" + inst = MagicMock() + inst.id = instance_id + return inst + + +def make_fake_binding(): + """Return (fake_binding, create_spy, create_batch_spy, get_spy). + Only the JS-native camelCase `createBatch` is exposed — see _core.py.""" + create = AsyncMock(side_effect=lambda opts=None: make_fake_instance( + (opts or {}).get("id", "auto-id") + )) + create_batch = AsyncMock(side_effect=lambda batch: [ + make_fake_instance((opts or {}).get("id", "auto-id")) for opts in batch + ]) + get = AsyncMock(side_effect=lambda instance_id: make_fake_instance(instance_id)) + # `spec` prevents MagicMock from auto-vivifying snake_case aliases, + # so a regression to .create_batch surfaces as AttributeError in tests. 
+ binding = MagicMock(spec=["create", "createBatch", "get"]) + binding.create = create + binding.createBatch = create_batch + binding.get = get + return binding, create, create_batch, get + + +def wrap(binding, metadata): + """dispatcher_binding_impl with a closure that returns the fake binding.""" + return dispatcher_binding_impl(lambda: binding, metadata) + + +@pytest.mark.asyncio +async def test_injects_metadata_into_create_params(): + binding, create, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "tenant-42"}) + + await wrapped.create({"id": "wf-1", "params": {"input": "hello"}}) + + assert create.await_count == 1 + called_with = create.await_args.args[0] + assert called_with["id"] == "wf-1" + assert called_with["params"] == { + _METADATA_KEY: {"tenantId": "tenant-42"}, + "params": {"input": "hello"}, + } + + +@pytest.mark.asyncio +async def test_injects_metadata_when_create_has_no_options(): + binding, create, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t1"}) + + await wrapped.create() + + called_with = create.await_args.args[0] + assert called_with["params"] == { + _METADATA_KEY: {"tenantId": "t1"}, + "params": None, + } + + +@pytest.mark.asyncio +async def test_injects_metadata_when_create_has_no_params(): + binding, create, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t1"}) + + await wrapped.create({"id": "wf-no-params"}) + + called_with = create.await_args.args[0] + assert called_with["id"] == "wf-no-params" + assert called_with["params"] == { + _METADATA_KEY: {"tenantId": "t1"}, + "params": None, + } + + +@pytest.mark.asyncio +async def test_passes_arbitrary_metadata_shapes(): + binding, create, _, _ = make_fake_binding() + metadata = { + "tenantId": "acme", + "region": "us-east", + "features": ["beta", "pro"], + "nested": {"version": 3}, + } + wrapped = wrap(binding, metadata) + + await wrapped.create({"params": {"job": "x"}}) + + called_with = create.await_args.args[0] + assert 
called_with["params"][_METADATA_KEY] == metadata + assert called_with["params"]["params"] == {"job": "x"} + + +@pytest.mark.asyncio +async def test_injects_metadata_into_every_item_of_create_batch(): + binding, _, create_batch, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t-1"}) + + await wrapped.create_batch([ + {"id": "wf-a", "params": {"x": 1}}, + {"id": "wf-b", "params": {"x": 2}}, + {"params": {"x": 3}}, + ]) + + called_with = create_batch.await_args.args[0] + assert len(called_with) == 3 + for i, item in enumerate(called_with): + assert item["params"][_METADATA_KEY] == {"tenantId": "t-1"} + assert item["params"]["params"] == {"x": i + 1} + assert called_with[0]["id"] == "wf-a" + assert called_with[1]["id"] == "wf-b" + assert "id" not in called_with[2] + + +@pytest.mark.asyncio +async def test_forwards_get_unchanged(): + """get() doesn't inject metadata — it's a lookup, no envelope needed.""" + binding, _, _, get = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t-1"}) + + result = await wrapped.get("some-id") + + get.assert_awaited_once_with("some-id") + assert result == {"id": "some-id"} + + +@pytest.mark.asyncio +async def test_returns_instance_id_from_underlying_binding(): + binding, _, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t-1"}) + + result = await wrapped.create({"id": "wf-xyz", "params": {}}) + + assert result == {"id": "wf-xyz"} + + +@pytest.mark.asyncio +async def test_does_not_mutate_caller_provided_options(): + binding, _, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t-1"}) + + user_options = {"id": "wf-1", "params": {"input": "hello"}} + user_params = user_options["params"] + + await wrapped.create(user_options) + + # The caller's dict + nested params should be untouched. 
+ assert user_options == {"id": "wf-1", "params": {"input": "hello"}} + assert user_params == {"input": "hello"} + + +@pytest.mark.asyncio +async def test_does_not_double_wrap_if_used_twice(): + """Calling .create() twice on the same wrapped binding shouldn't accumulate envelopes.""" + binding, create, _, _ = make_fake_binding() + wrapped = wrap(binding, {"tenantId": "t-1"}) + + await wrapped.create({"params": {"first": True}}) + await wrapped.create({"params": {"second": True}}) + + first_call = create.await_args_list[0].args[0] + second_call = create.await_args_list[1].args[0] + assert first_call["params"] == { + _METADATA_KEY: {"tenantId": "t-1"}, + "params": {"first": True}, + } + assert second_call["params"] == { + _METADATA_KEY: {"tenantId": "t-1"}, + "params": {"second": True}, + } + + +@pytest.mark.asyncio +async def test_resolves_underlying_binding_lazily(): + """The `get_binding` callable is invoked on every method call, not memoized.""" + binding, _, _, _ = make_fake_binding() + get_binding_spy = MagicMock(return_value=binding) + wrapped = dispatcher_binding_impl(get_binding_spy, {"tenantId": "t-1"}) + + await wrapped.create({"params": {}}) + await wrapped.get("some-id") + await wrapped.create({"params": {}}) + + assert get_binding_spy.call_count == 3 + + +@pytest.mark.asyncio +async def test_send_event_on_instance_calls_camelCase_sendEvent_with_options_object(): + """sendEvent must be called as `.sendEvent({type, payload})` — single + positional JS options object, JS-native camelCase name. Regressions to + kwargs (type=..., payload=...) or snake_case .send_event must fail.""" + # `spec=["sendEvent"]` blocks MagicMock auto-vivification, so calling + # `.send_event` (snake_case) raises AttributeError. 
+ instance = MagicMock(spec=["sendEvent"]) + instance.sendEvent = AsyncMock(return_value="ack") + + result = await send_event_on_instance(instance, "my-event", {"x": 1}) + + assert result == "ack" + instance.sendEvent.assert_awaited_once() + args, kwargs = instance.sendEvent.await_args + # Exactly one positional, no kwargs — catches regression to kwargs shape. + assert kwargs == {} + assert len(args) == 1 + assert args[0] == {"type": "my-event", "payload": {"x": 1}} + + +@pytest.mark.asyncio +async def test_send_event_on_instance_uses_injected_encode(): + """`encode` runs over the options dict before it reaches sendEvent. + Workerd injects `_to_js` so the JS side sees a real Object, not a Map.""" + instance = MagicMock(spec=["sendEvent"]) + instance.sendEvent = AsyncMock(return_value=None) + + sentinel = object() + encode_spy = MagicMock(return_value=sentinel) + + await send_event_on_instance(instance, "evt", "data", encode=encode_spy) + + encode_spy.assert_called_once_with({"type": "evt", "payload": "data"}) + instance.sendEvent.assert_awaited_once_with(sentinel) + + +@pytest.mark.asyncio +async def test_make_handle_callback_receives_instance_and_propagates_return(): + """create/createBatch/get pass the JS WorkflowInstance to make_handle and + propagate its return verbatim. Workerd uses this to build a JS handle + `{id, status, pause, ...}` with bound method functors over the instance.""" + binding, _, _, _ = make_fake_binding() + handle_calls: list[str] = [] + + def make_handle(instance: Any) -> Any: + # Workerd's real make_handle also reads instance.id (sync JS prop) and + # calls .bind(instance) on each method. The host fake only needs .id. 
+ handle_calls.append(instance.id) + return SimpleNamespace(kind="handle", id=instance.id) + + wrapped = dispatcher_binding_impl( + lambda: binding, {"tenantId": "t-1"}, make_handle=make_handle, + ) + + one = await wrapped.create({"id": "wf-1", "params": {}}) + batch = await wrapped.create_batch([ + {"id": "wf-a", "params": {}}, + {"id": "wf-b", "params": {}}, + ]) + fetched = await wrapped.get("wf-existing") + + # Each call routed through make_handle, in order, with the right instance. + assert handle_calls == ["wf-1", "wf-a", "wf-b", "wf-existing"] + + # The callback's return value is propagated unchanged — not coerced to a dict. + assert one == SimpleNamespace(kind="handle", id="wf-1") + assert batch == [ + SimpleNamespace(kind="handle", id="wf-a"), + SimpleNamespace(kind="handle", id="wf-b"), + ] + assert fetched == SimpleNamespace(kind="handle", id="wf-existing") diff --git a/packages/dynamic-workflows-py/tests/test_entrypoint.py b/packages/dynamic-workflows-py/tests/test_entrypoint.py new file mode 100644 index 0000000..445a5dc --- /dev/null +++ b/packages/dynamic-workflows-py/tests/test_entrypoint.py @@ -0,0 +1,281 @@ +"""Tests for dispatch_workflow_core — mirrors JS entrypoint.test.ts (12 cases). +Exercises the envelope-unwrap + delegate logic of the dispatch flow.""" + +import datetime +from typing import Any +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from dynamic_workflows._core import ( + MissingDispatcherMetadataError, + _METADATA_KEY, + dispatch_workflow_core, +) + + +def enveloped_event(params: Any, metadata: dict) -> dict: + """Build a fake WorkflowEvent whose payload carries a dispatcher envelope. 
+ The envelope shape is duplicated here rather than imported — the wrap + helpers are intentionally not part of the test surface.""" + return { + "payload": {_METADATA_KEY: metadata, "params": params}, + "timestamp": datetime.datetime(2026, 1, 1), + "instanceId": "instance-1", + } + + +def bare_event(payload: Any) -> dict: + return { + "payload": payload, + "timestamp": datetime.datetime(2026, 1, 1), + "instanceId": "instance-1", + } + + +DUMMY_CTX = MagicMock(name="execution_context") +DUMMY_STEP = MagicMock(name="workflow_step") + + +class TestDispatchWorkflow: + @pytest.mark.asyncio + async def test_unwraps_metadata_from_event_and_passes_to_loader(self): + captured = [] + async def loader(load_ctx): + captured.append(load_ctx["metadata"]) + return MagicMock(run=AsyncMock(return_value="ok")) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event({"hello": "world"}, {"tenantId": "tenant-a"}), + step=DUMMY_STEP, load_runner=loader, + ) + + assert result == "ok" + assert captured == [{"tenantId": "tenant-a"}] + + @pytest.mark.asyncio + async def test_passes_env_and_ctx_through_to_loader(self): + captured = {} + async def loader(load_ctx): + captured.update(load_ctx) + return MagicMock(run=AsyncMock(return_value=None)) + + my_env = {"greeting": "hi"} + await dispatch_workflow_core( + env=my_env, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + assert captured["env"] is my_env + assert captured["ctx"] is DUMMY_CTX + + @pytest.mark.asyncio + async def test_delivers_unwrapped_params_to_dynamic_worker(self): + run_spy = AsyncMock(return_value="done") + async def loader(_): + return MagicMock(run=run_spy) + + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event({"the": "actual params"}, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + inner_event = run_spy.await_args.args[0] + assert inner_event["payload"] == {"the": "actual 
params"} + assert inner_event["instanceId"] == "instance-1" + + @pytest.mark.asyncio + async def test_forwards_workflow_step_object_untouched(self): + run_spy = AsyncMock(return_value=None) + async def loader(_): + return MagicMock(run=run_spy) + + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {}), + step=DUMMY_STEP, load_runner=loader, + ) + + # Identity (the default get_js_step is identity). + assert run_spy.await_args.args[1] is DUMMY_STEP + + @pytest.mark.asyncio + async def test_throws_missing_dispatcher_metadata_error_when_payload_not_envelope(self): + async def loader(_): + raise AssertionError("loader should not be called") + + with pytest.raises(MissingDispatcherMetadataError): + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=bare_event({"just": "user params"}), + step=DUMMY_STEP, load_runner=loader, + ) + + @pytest.mark.asyncio + async def test_throws_missing_dispatcher_metadata_error_on_null_payload(self): + async def loader(_): + raise AssertionError("loader should not be called") + + with pytest.raises(MissingDispatcherMetadataError): + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=bare_event(None), + step=DUMMY_STEP, load_runner=loader, + ) + + @pytest.mark.asyncio + async def test_supports_synchronous_loaders_returning_runner_directly(self): + def sync_loader(_): + return MagicMock(run=AsyncMock(return_value="sync-ok")) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=sync_loader, + ) + + assert result == "sync-ok" + + @pytest.mark.asyncio + async def test_supports_async_loaders_returning_runner_via_promise(self): + async def async_loader(_): + return MagicMock(run=AsyncMock(return_value="async-ok")) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=async_loader, + ) + + assert 
result == "async-ok" + + @pytest.mark.asyncio + async def test_propagates_errors_thrown_by_loader(self): + class LoaderError(Exception): + pass + + async def failing_loader(_): + raise LoaderError("nope") + + with pytest.raises(LoaderError, match="nope"): + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=failing_loader, + ) + + @pytest.mark.asyncio + async def test_propagates_errors_thrown_by_dynamic_worker_run(self): + class RunError(Exception): + pass + + async def loader(_): + return MagicMock(run=AsyncMock(side_effect=RunError("boom"))) + + with pytest.raises(RunError, match="boom"): + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + @pytest.mark.asyncio + async def test_default_decode_passes_runner_result_through_unchanged(self): + """With no decode injected, runner.run()'s return value is returned verbatim. 
+ Host tests rely on this; workerd injects python_from_rpc.""" + async def loader(_): + return MagicMock(run=AsyncMock(return_value={"key": "value"})) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + assert result == {"key": "value"} + + @pytest.mark.asyncio + async def test_decode_is_applied_to_runner_result(self): + """`decode` runs over runner.run()'s return value — workerd injects + python_from_rpc here so the dispatcher↔runner RPC boundary uses the + same converter pair as encode (python_to_rpc).""" + decode_spy = MagicMock(side_effect=lambda x: f"decoded({x})") + + async def loader(_): + return MagicMock(run=AsyncMock(return_value="raw-value")) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + decode=decode_spy, + ) + + decode_spy.assert_called_once_with("raw-value") + assert result == "decoded(raw-value)" + + @pytest.mark.asyncio + async def test_default_decode_does_not_call_to_py_on_python_objects(self): + """A Python object that happens to expose a `to_py` method must pass + through untouched. 
The old `hasattr(result, 'to_py')` branch was a + false-positive footgun — the symmetric decode (python_from_rpc) gates + on hasattr(_, 'constructor') instead, so user objects are safe.""" + class HasToPyByCoincidence: + def to_py(self): + raise AssertionError("to_py should not be called on Python objects") + sentinel = HasToPyByCoincidence() + + async def loader(_): + return MagicMock(run=AsyncMock(return_value=sentinel)) + + result = await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + assert result is sentinel + + @pytest.mark.asyncio + async def test_invokes_loader_fresh_for_every_call(self): + call_count = 0 + async def loader(_): + nonlocal call_count + call_count += 1 + return MagicMock(run=AsyncMock(return_value=None)) + + for _ in range(3): + await dispatch_workflow_core( + env={}, ctx=DUMMY_CTX, + event=enveloped_event(None, {"tenantId": "t-1"}), + step=DUMMY_STEP, load_runner=loader, + ) + + assert call_count == 3 + + +class TestCreateDynamicWorkflowEntrypoint: + """Smoke tests for the factory. The full class can't be `new`'d on host + because WorkflowEntrypoint requires the workerd runtime — these are + structural assertions only, mirroring the JS smoke tests.""" + + def test_dispatch_workflow_core_is_a_callable(self): + from dynamic_workflows._core import dispatch_workflow_core + assert callable(dispatch_workflow_core) + + def test_unwrap_recognises_extra_keys_in_envelope(self): + """The envelope can carry additional keys (e.g. 
`source`) alongside
+    the required two — extra keys are passed through in metadata."""
+    from dynamic_workflows._core import _unwrap_params
+    payload = {
+        _METADATA_KEY: {"tenantId": "t-1", "source": "..."},
+        "params": {"input": "hello"},
+    }
+    result = _unwrap_params(payload)
+    assert result is not None
+    metadata, params = result
+    assert metadata == {"tenantId": "t-1", "source": "..."}
+    assert params == {"input": "hello"}
diff --git a/packages/dynamic-workflows-py/tests/test_envelope.py b/packages/dynamic-workflows-py/tests/test_envelope.py
new file mode 100644
index 0000000..932e15e
--- /dev/null
+++ b/packages/dynamic-workflows-py/tests/test_envelope.py
@@ -0,0 +1,80 @@
+"""Tests for the pure envelope helpers — wrap_params / unwrap_params.
+These don't touch js/workers/pyodide, so they run on the host Python."""
+
+import pytest
+
+from dynamic_workflows._core import (
+    MissingDispatcherMetadataError,
+    _METADATA_KEY,
+    _unwrap_params,
+    _wrap_params,
+)
+
+
+class TestWrapParams:
+    def test_wraps_dict_params(self):
+        result = _wrap_params({"input": "hello"}, {"tenantId": "t-1"})
+        assert result == {
+            _METADATA_KEY: {"tenantId": "t-1"},
+            "params": {"input": "hello"},
+        }
+
+    def test_wraps_none_params(self):
+        result = _wrap_params(None, {"tenantId": "t-1"})
+        assert result == {_METADATA_KEY: {"tenantId": "t-1"}, "params": None}
+
+    def test_wraps_primitive_params(self):
+        result = _wrap_params(42, {"tenantId": "t-1"})
+        assert result == {_METADATA_KEY: {"tenantId": "t-1"}, "params": 42}
+
+    def test_passes_arbitrary_metadata_shapes(self):
+        meta = {
+            "tenantId": "acme",
+            "region": "us-east",
+            "features": ["beta", "pro"],
+            "nested": {"version": 3},
+        }
+        result = _wrap_params({"job": "x"}, meta)
+        assert result[_METADATA_KEY] == meta
+        assert result["params"] == {"job": "x"}
+
+    def test_does_not_mutate_metadata(self):
+        meta = {"tenantId": "t-1"}
+        _wrap_params({"foo": "bar"}, meta)
+        assert meta == {"tenantId": "t-1"}
+
+
+class TestUnwrapParams:
+    def test_unwraps_valid_envelope(self):
+        envelope = {_METADATA_KEY: {"tenantId": "t-1"}, "params": {"input": "hello"}}
+        result = _unwrap_params(envelope)
+        assert result == ({"tenantId": "t-1"}, {"input": "hello"})
+
+    def test_returns_none_on_missing_metadata_key(self):
+        assert _unwrap_params({"params": {"input": "hello"}}) is None
+
+    def test_returns_none_on_missing_params_key(self):
+        assert _unwrap_params({_METADATA_KEY: {"tenantId": "t-1"}}) is None
+
+    def test_returns_none_on_non_dict_payload(self):
+        assert _unwrap_params("not a dict") is None
+        assert _unwrap_params(42) is None
+        assert _unwrap_params([1, 2, 3]) is None
+        assert _unwrap_params(None) is None
+
+
+def test_wrap_unwrap_roundtrip():
+    """Spot-check: wrap then unwrap returns the same data."""
+    meta = {"tenantId": "t-42"}
+    params = {"step": "first", "args": [1, 2, 3]}
+    envelope = _wrap_params(params, meta)
+    out = _unwrap_params(envelope)
+    assert out == (meta, params)
+
+
+def test_missing_dispatcher_metadata_error_message():
+    """The exception type carries a helpful message — verify the gist of it."""
+    err = MissingDispatcherMetadataError()
+    msg = str(err)
+    assert "missing dispatcher metadata" in msg
+    assert "wrap_workflow_binding" in msg
diff --git a/packages/dynamic-workflows/README.md b/packages/dynamic-workflows/README.md
index 7216da8..da6c3a4 100644
--- a/packages/dynamic-workflows/README.md
+++ b/packages/dynamic-workflows/README.md
@@ -42,6 +42,8 @@ You stay in charge of *how* tenant code is loaded — Worker Loader, service bin
 npm install @cloudflare/dynamic-workflows
 ```
 
+A Python port is available at [`packages/dynamic-workflows-py`](./packages/dynamic-workflows-py) with an end-to-end demo in [`examples/python`](./examples/python). It's a research preview.
+
 ## Use it
 
 The dispatcher needs three things: re-export `DynamicWorkflowBinding`, hand each tenant a wrapped binding, and register a `WorkflowEntrypoint` that knows how to load tenant code.
diff --git a/pyproject.toml b/pyproject.toml
new file mode 100644
index 0000000..6e25e04
--- /dev/null
+++ b/pyproject.toml
@@ -0,0 +1,11 @@
+# Virtual uv workspace root.
+# Declares the two Python packages in this repo as workspace members so
+# they can depend on each other via `{ workspace = true }` in [tool.uv.sources].
+# Co-exists with the JS pnpm workspace (pnpm-workspace.yaml) — separate tools,
+# separate config files.
+
+[tool.uv.workspace]
+members = [
+    "packages/dynamic-workflows-py",
+    "examples/python",
+]