From 9427ac5cd96d7dca05656ff9580bc7b6fe17ca47 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 19:21:44 +0800 Subject: [PATCH 01/15] fix: spawn supervisor child to avoid OpenSSL fork deadlock MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The parent server opens Redis-over-TLS connections during lifespan startup, which initialises OpenSSL state. OpenSSL is not fork-safe — inheriting random-pool state and internal locks via `fork()` can deadlock the child the first time it calls `ssl.SSLContext.__new__` (observed as an intermittent "Supervisor child did not register a node within 30s" handshake timeout, with the child stuck in `ssl.create_default_context` under `redis-py`'s SSL connect path). Use `mp.get_context("spawn")` for the supervisor `Process` and for the IPC queues in `create_task_channel`. Spawn execs a fresh interpreter so the child gets clean OpenSSL state. The IPC primitives must be created from the same context — mixing fork-context `SemLock`s with a spawn-context process raises a RuntimeError. Signed-off-by: Noppanat Wadlom --- src/server/supervisor/supervisor.py | 15 ++++++++++----- src/server/utils/concurrent.py | 20 ++++++++++++++++++-- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/server/supervisor/supervisor.py b/src/server/supervisor/supervisor.py index 26007b76..2fe48f14 100644 --- a/src/server/supervisor/supervisor.py +++ b/src/server/supervisor/supervisor.py @@ -2,9 +2,9 @@ import asyncio import logging -import multiprocessing as mp import os import signal +from multiprocessing.process import BaseProcess from multiprocessing.queues import Queue as MPQueue from queue import Empty as QueueEmpty @@ -18,7 +18,12 @@ WorkerManagementConfig, ) from ..hooks import PrincipalContext -from ..utils.concurrent import TaskReceiver, TaskSender, create_task_channel +from ..utils.concurrent import ( + TaskReceiver, + TaskSender, + create_task_channel, + get_mp_context, +) _CMD_TIMEOUT = 120.0 _NODE_ID_HANDSHAKE_TIMEOUT = 30.0 @@ -42,10 +47,10 @@ def __init__( self._worker_management = worker_management self._logging_config = logging_config self._logger = logger - self._process: mp.Process | None = None + self._process: BaseProcess | None = None self._cmd_sender: TaskSender[CommandMessage, CommandResponse] | None = None self._cmd_receiver: TaskReceiver[CommandMessage, CommandResponse] | None = None - self._node_id_queue: MPQueue[str] = mp.Queue(maxsize=1) + self._node_id_queue: MPQueue[str] = get_mp_context().Queue(maxsize=1) self._node_id: str | None = None @property @@ -66,7 +71,7 @@ async def start(self, system_principal: PrincipalContext) -> None: return self._cmd_sender, self._cmd_receiver = create_task_channel() - self._process = mp.Process( + self._process = get_mp_context().Process( target=_run_supervisor, kwargs={ "identity": self._identity, diff --git a/src/server/utils/concurrent.py b/src/server/utils/concurrent.py index e9dce337..7e55e729 100644 --- a/src/server/utils/concurrent.py +++ b/src/server/utils/concurrent.py @@ -1,12 +1,27 @@ import asyncio +import functools import multiprocessing as mp import threading from collections.abc import Iterable +from multiprocessing.context import SpawnContext from multiprocessing.queues import Queue as MPQueue type TaskIDType = str +@functools.cache +def get_mp_context() -> SpawnContext: + """Return the shared multiprocessing context for FlowMesh child processes. + + Spawn (not fork) — the parent opens Redis-over-TLS connections during + lifespan startup, which initialises OpenSSL state. OpenSSL is not + fork-safe; the child can deadlock in `ssl.SSLContext.__new__` when it + builds its own connections. All IPC primitives shared between parent + and child must be created from this same context. + """ + return mp.get_context("spawn") + + class Sentinel: pass @@ -90,8 +105,9 @@ def send_result(self, task_id: TaskIDType, result: R) -> None: def create_task_channel[T, R]() -> tuple[TaskSender[T, R], TaskReceiver[T, R]]: - send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = mp.Queue() - recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = mp.Queue() + ctx = get_mp_context() + send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = ctx.Queue() + recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = ctx.Queue() sender = TaskSender(send_q, recv_q) receiver = TaskReceiver(send_q, recv_q) return sender, receiver From dd00769a6f7930cbd6ed15a2fe812bdafb5f752d Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 20:11:37 +0800 Subject: [PATCH 02/15] chore: silence diffusers GHSA-7wx4-6vff-v64p in pip-audit Newly disclosed CVE in diffusers 0.36.0, fixed in 0.38.0. Bumping is blocked by the same `safetensors>=0.8.0rc0` pre-release requirement that already gates GHSA-j7w6-vpvq-j3gm and GHSA-98h9-4798-4q5v; adding to the existing diffusers row block in the workflow and the advisory table. Signed-off-by: Noppanat Wadlom --- .github/workflows/security.yml | 2 ++ docs/CODE_STYLE.md | 1 + 2 files changed, 3 insertions(+) diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index d2540862..3ef041aa 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -105,6 +105,7 @@ jobs: --ignore-vuln GHSA-vfmq-68hx-4jfw \ --ignore-vuln GHSA-j7w6-vpvq-j3gm \ --ignore-vuln GHSA-98h9-4798-4q5v \ + --ignore-vuln GHSA-7wx4-6vff-v64p \ --ignore-vuln PYSEC-2025-189 \ --ignore-vuln PYSEC-2025-190 \ --ignore-vuln PYSEC-2025-191 \ @@ -152,6 +153,7 @@ jobs: --ignore-vuln GHSA-83vm-p52w-f9pw \ --ignore-vuln GHSA-j7w6-vpvq-j3gm \ --ignore-vuln GHSA-98h9-4798-4q5v \ + --ignore-vuln GHSA-7wx4-6vff-v64p \ --ignore-vuln PYSEC-2025-189 \ --ignore-vuln PYSEC-2025-190 \ --ignore-vuln PYSEC-2025-191 \ diff --git a/docs/CODE_STYLE.md b/docs/CODE_STYLE.md index 394628a9..b91efe78 100644 --- a/docs/CODE_STYLE.md +++ b/docs/CODE_STYLE.md @@ -117,6 +117,7 @@ as `--ignore-vuln` flags in `.github/workflows/security.yml`. | GHSA-w8v5-vhqr-4h9v | diskcache | (none) | upstream unmaintained, no fixed version published | | GHSA-j7w6-vpvq-j3gm | diffusers | 0.38.0 | fix requires safetensors>=0.8.0rc0 pre-release; uv lock won't pick up pre-releases without explicit opt-in | | GHSA-98h9-4798-4q5v | diffusers | 0.38.0 | same blocker as GHSA-j7w6-vpvq-j3gm — both fixed in 0.38.0 | +| GHSA-7wx4-6vff-v64p | diffusers | 0.38.0 | same blocker as GHSA-j7w6-vpvq-j3gm — fixed in 0.38.0 | | PYSEC-2025-189 | torch | (none) | no fix version published | | PYSEC-2025-190 | torch | (none) | same | | PYSEC-2025-191 | torch | (none) | same | From 6b26949a1a178ff2286d98ffeea4857f56359296 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 21:28:06 +0800 Subject: [PATCH 03/15] refactor: replace get_mp_context() with module-level MP_CTX constant `mp.get_context("spawn")` returns a singleton instantiated at `multiprocessing` import time, so `@functools.cache` on a getter was redundant and the "lazily constructed" framing was misleading. A module-level constant is simpler, honest about what it is, and reads naturally at call sites. Signed-off-by: Noppanat Wadlom --- src/server/supervisor/supervisor.py | 6 +++--- src/server/utils/concurrent.py | 27 +++++++++------------------ 2 files changed, 12 insertions(+), 21 deletions(-) diff --git a/src/server/supervisor/supervisor.py b/src/server/supervisor/supervisor.py index 2fe48f14..988ec1a5 100644 --- a/src/server/supervisor/supervisor.py +++ b/src/server/supervisor/supervisor.py @@ -19,10 +19,10 @@ ) from ..hooks import PrincipalContext from ..utils.concurrent import ( + MP_CTX, TaskReceiver, TaskSender, create_task_channel, - get_mp_context, ) _CMD_TIMEOUT = 120.0 @@ -50,7 +50,7 @@ def __init__( self._process: BaseProcess | None = None self._cmd_sender: TaskSender[CommandMessage, CommandResponse] | None = None self._cmd_receiver: TaskReceiver[CommandMessage, CommandResponse] | None = None - self._node_id_queue: MPQueue[str] = get_mp_context().Queue(maxsize=1) + self._node_id_queue: MPQueue[str] = MP_CTX.Queue(maxsize=1) self._node_id: str | None = None @property @@ -71,7 +71,7 @@ async def start(self, system_principal: PrincipalContext) -> None: return self._cmd_sender, self._cmd_receiver = create_task_channel() - self._process = get_mp_context().Process( + self._process = MP_CTX.Process( target=_run_supervisor, kwargs={ "identity": self._identity, diff --git a/src/server/utils/concurrent.py b/src/server/utils/concurrent.py index 7e55e729..8f284894 100644 --- a/src/server/utils/concurrent.py +++ b/src/server/utils/concurrent.py @@ -1,25 +1,17 @@ import asyncio -import functools import multiprocessing as mp import threading from collections.abc import Iterable -from multiprocessing.context import SpawnContext from multiprocessing.queues import Queue as MPQueue -type TaskIDType = str - +# Shared multiprocessing context for FlowMesh child processes. Spawn (not fork) because +# the parent opens Redis-over-TLS connections during lifespan startup, which initialises +# OpenSSL state; OpenSSL is not fork-safe and the child can deadlock in +# `ssl.SSLContext.__new__` when it builds its own connections. All IPC primitives shared +# between parent and child must be created from this same context. +MP_CTX = mp.get_context("spawn") -@functools.cache -def get_mp_context() -> SpawnContext: - """Return the shared multiprocessing context for FlowMesh child processes. - - Spawn (not fork) — the parent opens Redis-over-TLS connections during - lifespan startup, which initialises OpenSSL state. OpenSSL is not - fork-safe; the child can deadlock in `ssl.SSLContext.__new__` when it - builds its own connections. All IPC primitives shared between parent - and child must be created from this same context. - """ - return mp.get_context("spawn") +type TaskIDType = str class Sentinel: @@ -105,9 +97,8 @@ def send_result(self, task_id: TaskIDType, result: R) -> None: def create_task_channel[T, R]() -> tuple[TaskSender[T, R], TaskReceiver[T, R]]: - ctx = get_mp_context() - send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = ctx.Queue() - recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = ctx.Queue() + send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = MP_CTX.Queue() + recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = MP_CTX.Queue() sender = TaskSender(send_q, recv_q) receiver = TaskReceiver(send_q, recv_q) return sender, receiver From c8fa1b2ea9292c9648066864fcb165d2733e4a63 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 22:06:30 +0800 Subject: [PATCH 04/15] chore: silence starlette PYSEC-2026-161 in pip-audit MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Newly disclosed CVE in starlette 0.52.1, fixed in 1.0.1. Bumping is blocked by `gradio==5.50` (transitive via `vllm-omni==0.18`), which caps `starlette<1.0` — same chain that gates the existing gradio / vllm-omni CVE ignores. Add to the worker-GPU pip-audit invocation (where the failure surfaced) and document the row in the advisory table. Signed-off-by: Noppanat Wadlom --- .github/workflows/security.yml | 1 + docs/CODE_STYLE.md | 1 + 2 files changed, 2 insertions(+) diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index 3ef041aa..c9f83285 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -178,4 +178,5 @@ jobs: --ignore-vuln PYSEC-2024-277 \ --ignore-vuln PYSEC-2025-222 \ --ignore-vuln PYSEC-2024-274 \ + --ignore-vuln PYSEC-2026-161 \ -r src/worker/requirements/requirements.gpu.txt diff --git a/docs/CODE_STYLE.md b/docs/CODE_STYLE.md index b91efe78..15ac62d4 100644 --- a/docs/CODE_STYLE.md +++ b/docs/CODE_STYLE.md @@ -142,6 +142,7 @@ as `--ignore-vuln` flags in `.github/workflows/security.yml`. | PYSEC-2024-277 | joblib | (none) | no fix version published | | PYSEC-2025-222 | vllm | (none) | no fix version published; held by vllm-omni 0.18 pin | | PYSEC-2024-274 | gradio | (none) | no fix version published; vllm-omni 0.18 pins gradio==5.50 | +| PYSEC-2026-161 | starlette | 1.0.1 | gradio 5.50 caps starlette<1.0 (transitive via vllm-omni 0.18) | When a blocker lifts (e.g. transformers 5 ↔ vllm 0.19 line stabilizes), drop the corresponding `--ignore-vuln` flag from the workflow and the From c5e37bcca29b21d6e9c9d39191d60664c2c54fdb Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 22:16:15 +0800 Subject: [PATCH 05/15] chore: silence starlette PYSEC-2026-161 in server pip-audit too MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The previous commit added the ignore only to the worker-GPU step because that's where the CVE first surfaced. The lock then resolved starlette to a different (still <1.0.1) version on the server side, exposing the same advisory in the server pip-audit step. Same blocker (gradio 5.50 caps starlette<1.0 via vllm-omni 0.18 — already in the docs table). Signed-off-by: Noppanat Wadlom --- .github/workflows/security.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index c9f83285..8ebaa8d9 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -89,6 +89,7 @@ jobs: run: | grep -v '@ git+' src/server/requirements.txt > /tmp/requirements-server-audit.txt uvx pip-audit==2.9.0 --strict \ + --ignore-vuln PYSEC-2026-161 \ -r /tmp/requirements-server-audit.txt - name: Run pip-audit (worker CPU) # Drop ``@ git+`` deps before auditing — not on PyPI, no CVE feed. From d1cc8dbad691540c4b48181dfe64929f68bf231d Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Wed, 20 May 2026 14:51:29 +0800 Subject: [PATCH 06/15] docs: fix flowmesh-hook documentation Signed-off-by: Noppanat Wadlom --- hook/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/hook/README.md b/hook/README.md index 20950664..2e60def3 100644 --- a/hook/README.md +++ b/hook/README.md @@ -3,7 +3,9 @@ FlowMesh-specific plugin extension surface. Carries the pieces FlowMesh adds on top of [`lumid-hooks`](https://github.com/mlsys-io/lumid.hooks): -- `HookBindings` — concrete dataclass with FlowMesh's six fields (the five +- `HookBindings` — runtime-checkable Protocol extending the shared one with + `supplier_resolvers`, used by the server's plugin gate. +- `BaseBindings` — frozen dataclass with FlowMesh's six fields (the five shared from `lumid-hooks` plus `supplier_resolvers`). - `ResourceKind` / `ResourceAction` — FlowMesh resource and action enums. - `SupplierResolver` / `WorkerView` — supplier attribution at dispatch time. From ff8144d988aed0cd203200d7b1ca7bc5c13a1644 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Wed, 20 May 2026 17:51:55 +0800 Subject: [PATCH 07/15] feat(stack): add FLOWMESH_PLUGIN_DATA_DIR for plugin writable storage MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FLOWMESH_PLUGIN_DIR is deliberately read-only — it's the code mount — so plugins that need persistence (a SQLite ACL, a cache file) currently have to ask each operator to add a custom bind via docker-compose.override.yml, which is brittle. Add a second mount at /app/plugin-data driven by FLOWMESH_PLUGIN_DATA_DIR. A path value (the default, `./plugin-data`) is a host bind, auto-created on stack up; a bare name is an external Docker volume of that name, which the operator precreates and owns the lifecycle of. The CLI discriminates path vs bare name in apply_plugin_data_env and routes the value to either the service mount directly (path mode) or an internal FLOWMESH_PLUGIN_DATA_VOLUME parameter consumed by a parameterized external volume declaration (volume mode). Signed-off-by: Noppanat Wadlom --- .../src/flowmesh_cli_stack/assets/.env.example | 12 ++++++++---- .../src/flowmesh_cli_stack/assets/compose.yml | 4 ++++ cli/stack/src/flowmesh_cli_stack/env_schema.py | 18 ++++++++++++++---- cli/stack/src/flowmesh_cli_stack/stack.py | 2 ++ cli/stack/src/flowmesh_cli_stack/utils.py | 16 ++++++++++++++++ docs/ENV.md | 1 + docs/PLUGINS.md | 5 +++++ 7 files changed, 50 insertions(+), 8 deletions(-) diff --git a/cli/stack/src/flowmesh_cli_stack/assets/.env.example b/cli/stack/src/flowmesh_cli_stack/assets/.env.example index c34520bd..d1c56c2a 100644 --- a/cli/stack/src/flowmesh_cli_stack/assets/.env.example +++ b/cli/stack/src/flowmesh_cli_stack/assets/.env.example @@ -173,11 +173,15 @@ NEBULA_API_TOKEN= # ==== External Plugins ==== # Plugins are Python packages dropped under FLOWMESH_PLUGIN_DIR -# (host-mounted to /app/plugins on the server) and selected by -# FLOWMESH_PLUGINS as a comma-separated list of top-level module -# names. Each named module must expose `install()` returning a -# `HookBindings`. Leave both empty unless you ship a plugin. +# (read-only at /app/plugins) and selected by FLOWMESH_PLUGINS as +# a comma-separated list of top-level module names. Each must +# expose `install()` returning a `HookBindings`. +# FLOWMESH_PLUGIN_DATA_DIR is writable at /app/plugin-data for +# plugin state. Leave all empty unless you ship a plugin. FLOWMESH_PLUGIN_DIR=./plugins +# A path (`./x`, `/abs/x`) -> host bind-mount (auto-created). +# A bare name -> external Docker volume of that name. +FLOWMESH_PLUGIN_DATA_DIR=./plugin-data FLOWMESH_PLUGINS= # ==== Agent Executor (youtu-agent / utu) ==== diff --git a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml index 9bc07dfb..a1df7d71 100644 --- a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml +++ b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml @@ -166,6 +166,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock - ${SERVER_WORKER_CONFIG:-./configs/worker_config.yaml}:/etc/flowmesh/worker_config.yaml:ro - ${FLOWMESH_PLUGIN_DIR:-./plugins}:/app/plugins:ro + - ${FLOWMESH_PLUGIN_DATA_DIR:-./plugin-data}:/app/plugin-data - ${REDIS_TLS_DIR:-./secrets/tls/redis}:/etc/ssl/redis:ro - ${SERVER_TLS_DIR:-./secrets/tls/server}:/etc/ssl/server:ro restart: unless-stopped @@ -199,3 +200,6 @@ volumes: name: ${FLOWMESH_STACK_SLUG:-flowmesh_node}_metrics flowmesh_server_logs: name: ${FLOWMESH_STACK_SLUG:-flowmesh_node}_server_logs + flowmesh_plugin_data: + external: true + name: ${FLOWMESH_PLUGIN_DATA_VOLUME:-flowmesh_plugin_data} diff --git a/cli/stack/src/flowmesh_cli_stack/env_schema.py b/cli/stack/src/flowmesh_cli_stack/env_schema.py index 30644028..eef83667 100644 --- a/cli/stack/src/flowmesh_cli_stack/env_schema.py +++ b/cli/stack/src/flowmesh_cli_stack/env_schema.py @@ -530,10 +530,11 @@ title="External Plugins", description=[ "Plugins are Python packages dropped under FLOWMESH_PLUGIN_DIR ", - "(host-mounted to /app/plugins on the server) and selected by ", - "FLOWMESH_PLUGINS as a comma-separated list of top-level module ", - "names. Each named module must expose `install()` returning a ", - "`HookBindings`. Leave both empty unless you ship a plugin.", + "(read-only at /app/plugins) and selected by FLOWMESH_PLUGINS as ", + "a comma-separated list of top-level module names. Each must ", + "expose `install()` returning a `HookBindings`. ", + "FLOWMESH_PLUGIN_DATA_DIR is writable at /app/plugin-data for ", + "plugin state. Leave all empty unless you ship a plugin.", ], vars=[ EnvVar( @@ -543,6 +544,15 @@ use_default=True, ensure_path="create", ), + EnvVar( + "FLOWMESH_PLUGIN_DATA_DIR", + "./plugin-data", + use_default=True, + description=[ + "A path (`./x`, `/abs/x`) -> host bind-mount (auto-created).", + "A bare name -> external Docker volume of that name.", + ], + ), EnvVar("FLOWMESH_PLUGINS", ""), ], ), diff --git a/cli/stack/src/flowmesh_cli_stack/stack.py b/cli/stack/src/flowmesh_cli_stack/stack.py index c8a075b4..630b32b8 100644 --- a/cli/stack/src/flowmesh_cli_stack/stack.py +++ b/cli/stack/src/flowmesh_cli_stack/stack.py @@ -35,6 +35,7 @@ from .utils import ( DEFAULT_ENV_FILE, STACK_PATH_KEYS, + apply_plugin_data_env, apply_stack_resource_env, ensure_deploy_paths, parse_node_role, @@ -58,6 +59,7 @@ def _load(env_file: Path) -> None: except ValueError as exc: logging.error(str(exc)) raise typer.Exit(code=1) + apply_plugin_data_env(Path.cwd()) return DockerComposeStack( compose_file=stack_compose_file(), diff --git a/cli/stack/src/flowmesh_cli_stack/utils.py b/cli/stack/src/flowmesh_cli_stack/utils.py index 5acb487a..1fac0dd3 100644 --- a/cli/stack/src/flowmesh_cli_stack/utils.py +++ b/cli/stack/src/flowmesh_cli_stack/utils.py @@ -57,6 +57,22 @@ def apply_stack_resource_env() -> None: os.environ[WORKER_RESULTS_DIR_ENV] = results_volume +_PLUGIN_DATA_PATH_PREFIXES = ("/", "./", "../", "~/", "~") +_PLUGIN_DATA_ALIAS = "flowmesh_plugin_data" +_PLUGIN_DATA_DEFAULT = "./plugin-data" + + +def apply_plugin_data_env(base_dir: Path) -> None: + raw = os.environ.get("FLOWMESH_PLUGIN_DATA_DIR", "").strip() + if not raw or raw.startswith(_PLUGIN_DATA_PATH_PREFIXES): + resolved = resolve_path(raw, default=_PLUGIN_DATA_DEFAULT, base_dir=base_dir) + ensure_dir(resolved) + os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = str(resolved) + else: + os.environ["FLOWMESH_PLUGIN_DATA_VOLUME"] = raw + os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = _PLUGIN_DATA_ALIAS + + def stack_compose_file() -> Path: return asset_path("flowmesh_cli_stack.assets", "compose.yml") diff --git a/docs/ENV.md b/docs/ENV.md index 9bde44e0..11da7453 100644 --- a/docs/ENV.md +++ b/docs/ENV.md @@ -35,6 +35,7 @@ listed here is in `.env.example`. | `ENABLE_WORKER_WATCHDOG` | `true` | Worker death detection | | `WORKER_DEATH_GRACE_SEC` | `60` | Grace period before marking dead | | `FLOWMESH_PLUGINS` | – | Comma-separated plugin module names | +| `FLOWMESH_PLUGIN_DATA_DIR` | `./plugin-data` | Writable mount at `/app/plugin-data` for plugin state. A path -> host bind-mount (auto-created); a bare name -> external Docker volume of that name. | | `SERVER_CUDA_PROBE_IMAGE` | `nvidia/cuda:12.9.1-base-ubuntu24.04` | CUDA image the server runs briefly to query local GPU names/indices | | `DOCKER_GPU_RUNTIME` | nvidia | Optional Docker runtime name for GPU probe/worker containers; leave empty unless the host requires a named runtime such as `nvidia` | | `FLOWMESH_API_KEY` | – | Forwarded to spawned workers as their server-callback bearer | diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index 6e5397ef..23dd30d4 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -134,6 +134,11 @@ Each subdirectory of `FLOWMESH_PLUGIN_DIR` is importable as a top-level module. The mount is read-only, so the plugin code is treated as static deployment artifact. +For writable persistence, `FLOWMESH_PLUGIN_DATA_DIR` (default +`./plugin-data`) is mounted read-write at `/app/plugin-data`. A path +value is a host bind-mount (auto-created on `stack up`); a bare name +is an external Docker volume of that name. + This handles plugin **code** without rebuilding the server image. When that isn't enough, build a thin overlay on top of the prebuilt image. Two patterns, pick by need: From 8aad3abbf89bacdfd96dc6972c2beaea3eb6bafe Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Thu, 21 May 2026 16:08:29 +0800 Subject: [PATCH 08/15] fix(stack): slug-scope the default plugin-data volume name The default external volume name was the bare alias `flowmesh_plugin_data`, which would collide across multiple stacks sharing a host (FLOWMESH_STACK_SUFFIX exists exactly to scope these names per-stack). Match the convention used by the other named volumes in the same file. Signed-off-by: Noppanat Wadlom --- cli/stack/src/flowmesh_cli_stack/assets/compose.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml index a1df7d71..c381e9f8 100644 --- a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml +++ b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml @@ -202,4 +202,4 @@ volumes: name: ${FLOWMESH_STACK_SLUG:-flowmesh_node}_server_logs flowmesh_plugin_data: external: true - name: ${FLOWMESH_PLUGIN_DATA_VOLUME:-flowmesh_plugin_data} + name: ${FLOWMESH_PLUGIN_DATA_VOLUME:-${FLOWMESH_STACK_SLUG:-flowmesh_node}_plugin_data} From 8514182ddb4d14ac13146a27151be0db4e3282a5 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Thu, 21 May 2026 16:08:40 +0800 Subject: [PATCH 09/15] refactor(stack): defer plugin-data dir creation to deploy-time `apply_plugin_data_env` was hooked into `_stack()._load`, which runs on every stack subcommand (`up`, `down`, `ps`, `logs`, `clean`, ...) via DockerComposeStack.run, so `ensure_dir(resolved)` materialized ./plugin-data in the operator's cwd even on read-only commands. FLOWMESH_PLUGIN_DIR avoids this by routing its mkdir through `ensure_deploy_paths`, which only runs when `to_deploy=True`. Split the helper: env-var routing (path resolution + alias re-route) stays in `apply_plugin_data_env` so compose substitution always has the right values; the `ensure_dir` call moves into `ensure_deploy_paths`, gated on `FLOWMESH_PLUGIN_DATA_VOLUME` being unset (volume mode is operator-managed, no host dir to create). Also drops the redundant "~/" prefix from _PLUGIN_DATA_PATH_PREFIXES (the trailing "~" already subsumes it) and adds five unit tests covering the empty / relative / absolute / tilde / bare-name branches. Signed-off-by: Noppanat Wadlom --- cli/stack/src/flowmesh_cli_stack/utils.py | 16 ++++++-- tests/cli/test_stack_utils.py | 46 +++++++++++++++++++++++ 2 files changed, 58 insertions(+), 4 deletions(-) diff --git a/cli/stack/src/flowmesh_cli_stack/utils.py b/cli/stack/src/flowmesh_cli_stack/utils.py index 1fac0dd3..011eb14c 100644 --- a/cli/stack/src/flowmesh_cli_stack/utils.py +++ b/cli/stack/src/flowmesh_cli_stack/utils.py @@ -57,19 +57,19 @@ def apply_stack_resource_env() -> None: os.environ[WORKER_RESULTS_DIR_ENV] = results_volume -_PLUGIN_DATA_PATH_PREFIXES = ("/", "./", "../", "~/", "~") +_PLUGIN_DATA_PATH_PREFIXES = ("/", "./", "../", "~") _PLUGIN_DATA_ALIAS = "flowmesh_plugin_data" _PLUGIN_DATA_DEFAULT = "./plugin-data" +_PLUGIN_DATA_VOLUME_ENV = "FLOWMESH_PLUGIN_DATA_VOLUME" def apply_plugin_data_env(base_dir: Path) -> None: raw = os.environ.get("FLOWMESH_PLUGIN_DATA_DIR", "").strip() if not raw or raw.startswith(_PLUGIN_DATA_PATH_PREFIXES): resolved = resolve_path(raw, default=_PLUGIN_DATA_DEFAULT, base_dir=base_dir) - ensure_dir(resolved) - os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = str(resolved) + os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = resolved.as_posix() else: - os.environ["FLOWMESH_PLUGIN_DATA_VOLUME"] = raw + os.environ[_PLUGIN_DATA_VOLUME_ENV] = raw os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = _PLUGIN_DATA_ALIAS @@ -134,6 +134,14 @@ def ensure_deploy_paths(base_dir: Path) -> None: base_dir=base_dir, ) ) + if not os.environ.get(_PLUGIN_DATA_VOLUME_ENV): + ensure_dir( + resolve_path( + os.getenv("FLOWMESH_PLUGIN_DATA_DIR", ""), + default=_PLUGIN_DATA_DEFAULT, + base_dir=base_dir, + ) + ) def parse_node_role(raw: str) -> NodeRole: diff --git a/tests/cli/test_stack_utils.py b/tests/cli/test_stack_utils.py index 04390c64..fbf449e9 100644 --- a/tests/cli/test_stack_utils.py +++ b/tests/cli/test_stack_utils.py @@ -1,11 +1,14 @@ import os +from pathlib import Path from unittest.mock import patch import pytest from flowmesh_cli_stack.utils import ( + _PLUGIN_DATA_ALIAS, STACK_SLUG_ENV, STACK_SUFFIX_ENV, WORKER_RESULTS_DIR_ENV, + apply_plugin_data_env, apply_stack_resource_env, stack_resource_env_overrides, ) @@ -37,3 +40,46 @@ def test_apply_stack_resource_env_defaults_results_dirs_from_suffix() -> None: ): apply_stack_resource_env() assert os.environ[WORKER_RESULTS_DIR_ENV] == "flowmesh_node_alice.dev_results" + + +def test_apply_plugin_data_env_empty_resolves_default(tmp_path: Path) -> None: + with patch.dict(os.environ, {"FLOWMESH_PLUGIN_DATA_DIR": ""}, clear=True): + apply_plugin_data_env(tmp_path) + expected = (tmp_path / "plugin-data").as_posix() + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == expected + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + assert not (tmp_path / "plugin-data").exists() # routing only; no mkdir + + +def test_apply_plugin_data_env_relative_path_is_cwd_resolved(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "./custom-data"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + expected = (tmp_path / "custom-data").as_posix() + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == expected + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + + +def test_apply_plugin_data_env_absolute_path_passthrough(tmp_path: Path) -> None: + abs_path = "/var/lib/flowmesh-plugin" + with patch.dict(os.environ, {"FLOWMESH_PLUGIN_DATA_DIR": abs_path}, clear=True): + apply_plugin_data_env(tmp_path) + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == abs_path + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + + +def test_apply_plugin_data_env_tilde_is_path(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "~/flowmesh-data"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + # Resolved against base_dir; the leading ~ keeps it in path mode. + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] # set to something resolved + + +def test_apply_plugin_data_env_bare_name_routes_to_volume(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "my_external_vol"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + assert os.environ["FLOWMESH_PLUGIN_DATA_VOLUME"] == "my_external_vol" + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == _PLUGIN_DATA_ALIAS From d703f82c5e93718bcc615100d40b19d88614669c Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Thu, 21 May 2026 23:47:11 +0800 Subject: [PATCH 10/15] feat: startup reconcile fires ResourceRegistrar.refresh + purge_stale MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the host restarts past a ResourceRegistrar's TTL, persistent plugins were silently dropping grants on long-lived workers and workflows. The reconcile sweep replaces TTL pruning with a stronger guarantee: on boot, enumerate every live resource (workflows + their tasks, nodes, workers) and hand the full batch to each registrar's `refresh`, then call `purge_stale` once per registrar. Registrars that persist state use the pair to drop records the server no longer tracks. - `auth.security.refresh_resources(refs, logger)` and `purge_stale_resources(logger)` fan out to every `ResourceRegistrar`. - `_lifespan` runs `_reconcile_resources()` once after plugins load and the system principal resolves. Worker / workflow enumeration is gated on `IS_ROOT_NODE`; nodes are always present. - docs/PLUGINS.md notes the new sweep on the `ResourceRegistrar` bullet. Requires lumid-hooks 0.2.0 for the new Protocol methods. The `tool.uv.sources` entry pointing at `../lumid.hooks` is temporary — drop it (and refresh the pinned requirements files) once 0.2.0 is on PyPI. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Noppanat Wadlom --- docs/PLUGINS.md | 8 ++++- pyproject.toml | 5 +++- src/server/auth/__init__.py | 4 +++ src/server/auth/security.py | 24 ++++++++++++++- src/server/main.py | 42 ++++++++++++++++++++++++-- tests/server/test_hooks_wiring.py | 50 ++++++++++++++++++++++++++++++- uv.lock | 33 ++++++++++++++------ 7 files changed, 151 insertions(+), 15 deletions(-) diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index 23dd30d4..20fd46c8 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -59,7 +59,13 @@ The hooks: tables so subsequent `PermissionChecker` calls have data to decide on. `RESULT` ownership is inferred from the owning task; `RESULT` permission checks are always paired with a `task_id`, and - workflow-level operations check `WORKFLOW`. + workflow-level operations check `WORKFLOW`. At startup the server + runs a reconcile sweep: it enumerates every live workflow, task, + worker, and node and calls `refresh(resources, logger)` once per + registrar with the full batch, then `purge_stale(logger)` once. + Persistent registrars use this pair to drop records for resources + the server no longer knows about — stateless registrars implement + both as no-ops. The shared protocols treat `kind` and `action` as plain strings — `lumid-hooks` does not enumerate kinds. FlowMesh layers the diff --git a/pyproject.toml b/pyproject.toml index a269ee20..06890c45 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ runtime-server = [ "flowmesh-hook", "grpcio>=1.76.0", "httpx>=0.28.1", - "lumid-hooks>=0.1.0", + "lumid-hooks>=0.2.0", "protobuf>=5.29.6", "pydantic>=2.12.3", "python-multipart>=0.0.26", @@ -241,6 +241,9 @@ flowmesh-cli-stack = { workspace = true } flowmesh-hook = { workspace = true } flowmesh-sdk = { workspace = true } flowmesh-sdk-stack = { workspace = true } +# Local development against the in-flight lumid-hooks 0.2.0; drop once +# 0.2.0 is published to PyPI. +lumid-hooks = { path = "../lumid.hooks", editable = true } [tool.uv.workspace] members = ["cli", "cli/stack", "hook", "sdk", "sdk/stack"] diff --git a/src/server/auth/__init__.py b/src/server/auth/__init__.py index 20d126b1..9b23598c 100644 --- a/src/server/auth/__init__.py +++ b/src/server/auth/__init__.py @@ -5,6 +5,8 @@ authenticate_websocket, default_principal, deregister_resource, + purge_stale_resources, + refresh_resources, register_resource, require_permission, resolve_accessible_ids, @@ -18,6 +20,8 @@ "authenticate_websocket", "default_principal", "deregister_resource", + "purge_stale_resources", + "refresh_resources", "register_resource", "require_permission", "resolve_accessible_ids", diff --git a/src/server/auth/security.py b/src/server/auth/security.py index bb7de880..8f7c4a58 100644 --- a/src/server/auth/security.py +++ b/src/server/auth/security.py @@ -17,7 +17,7 @@ """ import logging -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from typing import Any from fastapi import HTTPException, WebSocket, WebSocketException, status @@ -205,3 +205,25 @@ async def deregister_resource( resource = ResourceRef(kind=resource_kind.value, id=resource_id) for registrar in RESOURCE_REGISTRARS: await registrar.deregister(principal, resource, logger) + + +async def refresh_resources( + resources: Iterable[ResourceRef], + logger: logging.Logger, +) -> None: + """Notify every registered `ResourceRegistrar` of the current live set. + + Called once during startup reconcile with every live workflow / task / + node / worker. Registrars use this to mark records as still live before + `purge_stale_resources` clears anything untouched. + """ + refs = list(resources) + for registrar in RESOURCE_REGISTRARS: + await registrar.refresh(refs, logger) + + +async def purge_stale_resources(logger: logging.Logger) -> None: + """Tell every registered `ResourceRegistrar` to drop records the + reconcile sweep didn't touch. Called once after `refresh_resources`.""" + for registrar in RESOURCE_REGISTRARS: + await registrar.purge_stale(logger) diff --git a/src/server/main.py b/src/server/main.py index 7fc264da..2fd1e807 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -8,7 +8,8 @@ import uvicorn from fastapi import FastAPI -from lumid_hooks import HookBindings +from flowmesh_hook import ResourceKind +from lumid_hooks import HookBindings, ResourceRef if __name__ == "__main__" and __package__ is None: import sys @@ -19,7 +20,7 @@ from shared._version import FLOWMESH_RELEASE_VERSION -from .auth import resolve_system_principal +from .auth import purge_stale_resources, refresh_resources, resolve_system_principal from .clients import RedisClient from .config import NodeRole, ServerConfig from .dispatcher.factory import create_dispatcher @@ -305,6 +306,40 @@ async def _load_plugins(stack: AsyncExitStack) -> None: register(bindings) +async def _reconcile_resources() -> None: + """Refresh registrar-tracked records for every live resource, then purge + anything the sweep didn't touch. Runs once at startup after plugins load + so registrars don't drop grants on resources that outlived their TTL. + """ + refs: list[ResourceRef] = [] + + # Nodes (always present). + for node in await NODE_REGISTRY.list_nodes_async(): + refs.append(ResourceRef(kind=ResourceKind.NODE.value, id=node.node_id)) + + # Workers and workflows live on the root node. + if WORKER_REGISTRY is not None: + for worker in await WORKER_REGISTRY.list_workers_async(): + refs.append( + ResourceRef(kind=ResourceKind.WORKER.value, id=worker.worker_id) + ) + if WORKFLOW_REGISTRY is not None: + workflow_ids = await WORKFLOW_REGISTRY.get_workflow_ids_async() + for workflow_id in workflow_ids: + record = await WORKFLOW_REGISTRY.get_workflow_record_async(workflow_id) + if record is None: + continue + refs.append( + ResourceRef(kind=ResourceKind.WORKFLOW.value, id=workflow_id) + ) + for task_id in record.task_ids: + refs.append(ResourceRef(kind=ResourceKind.TASK.value, id=task_id)) + + logger.info("Startup reconcile: refreshing %d resource(s)", len(refs)) + await refresh_resources(refs, logger) + await purge_stale_resources(logger) + + @asynccontextmanager async def _lifespan(_: FastAPI): async with AsyncExitStack() as plugin_stack: @@ -316,6 +351,9 @@ async def _lifespan(_: FastAPI): ) app.state.system_principal = system_principal + # --- Startup reconcile (registrar plugins) --- + await _reconcile_resources() + # --- Root-only startup --- if IS_ROOT_NODE: if SSH_FORWARD_SERVICE is not None: diff --git a/tests/server/test_hooks_wiring.py b/tests/server/test_hooks_wiring.py index 25eb558f..db33588a 100644 --- a/tests/server/test_hooks_wiring.py +++ b/tests/server/test_hooks_wiring.py @@ -7,7 +7,7 @@ """ import logging -from collections.abc import Iterator, Sequence +from collections.abc import Iterable, Iterator, Sequence from datetime import UTC, datetime from decimal import Decimal from typing import Any @@ -30,6 +30,8 @@ authenticate_api_key, authenticate_connection, deregister_resource, + purge_stale_resources, + refresh_resources, register_resource, require_permission, resolve_accessible_ids, @@ -605,6 +607,8 @@ class _RecordingRegistrar: def __init__(self) -> None: self.registered: list[tuple[str, ResourceRef]] = [] self.deregistered: list[tuple[str, ResourceRef]] = [] + self.refreshed: list[list[ResourceRef]] = [] + self.purged: int = 0 async def register( self, @@ -622,6 +626,16 @@ async def deregister( ) -> None: self.deregistered.append((principal.principal_id, resource)) + async def refresh( + self, + resources: Iterable[ResourceRef], + logger: logging.Logger, + ) -> None: + self.refreshed.append(list(resources)) + + async def purge_stale(self, logger: logging.Logger) -> None: + self.purged += 1 + class TestResourceRegistrarComposition: @pytest.fixture @@ -687,9 +701,43 @@ async def register(self, *args: Any, **kwargs: Any) -> None: async def deregister(self, *args: Any, **kwargs: Any) -> None: return None + async def refresh(self, *args: Any, **kwargs: Any) -> None: + return None + + async def purge_stale(self, *args: Any, **kwargs: Any) -> None: + return None + register(BaseBindings(resource_registrars=[_Boom()])) with pytest.raises(RuntimeError, match="plugin failure"): await register_resource( principal, ResourceKind.WORKFLOW, "wfl-1", {}, logger ) + + @pytest.mark.anyio + async def test_refresh_fans_out_with_full_batch( + self, logger: logging.Logger + ) -> None: + first = _RecordingRegistrar() + second = _RecordingRegistrar() + register(BaseBindings(resource_registrars=[first, second])) + + refs = [ + ResourceRef(kind=ResourceKind.WORKFLOW.value, id="wfl-1"), + ResourceRef(kind=ResourceKind.WORKER.value, id="wkr-1"), + ] + await refresh_resources(refs, logger) + + for r in (first, second): + assert r.refreshed == [refs] + + @pytest.mark.anyio + async def test_purge_stale_fans_out(self, logger: logging.Logger) -> None: + first = _RecordingRegistrar() + second = _RecordingRegistrar() + register(BaseBindings(resource_registrars=[first, second])) + + await purge_stale_resources(logger) + + assert first.purged == 1 + assert second.purged == 1 diff --git a/uv.lock b/uv.lock index 650e27a2..fb16ea90 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14'", @@ -2023,7 +2023,7 @@ ci = [ { name = "isort", specifier = ">=7.0.0" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "lumid-data-sdk", specifier = ">=0.1.0" }, - { name = "lumid-hooks", specifier = ">=0.1.0" }, + { name = "lumid-hooks", editable = "../lumid.hooks" }, { name = "matplotlib", specifier = ">=3.10.6" }, { name = "mcp", specifier = ">=1.23.0" }, { name = "mypy", specifier = ">=1.19.1" }, @@ -2185,7 +2185,7 @@ runtime-server = [ { name = "flowmesh-hook", editable = "hook" }, { name = "grpcio", specifier = ">=1.76.0" }, { name = "httpx", specifier = ">=0.28.1" }, - { name = "lumid-hooks", specifier = ">=0.1.0" }, + { name = "lumid-hooks", editable = "../lumid.hooks" }, { name = "protobuf", specifier = ">=5.29.6" }, { name = "pydantic", specifier = ">=2.12.3" }, { name = "python-multipart", specifier = ">=0.0.26" }, @@ -2338,7 +2338,7 @@ dependencies = [ ] [package.metadata] -requires-dist = [{ name = "lumid-hooks", specifier = ">=0.1.0" }] +requires-dist = [{ name = "lumid-hooks", editable = "../lumid.hooks" }] [[package]] name = "flowmesh-sdk" @@ -3567,14 +3567,24 @@ wheels = [ [[package]] name = "lumid-hooks" -version = "0.1.0" -source = { registry = "https://pypi.org/simple" } +version = "0.2.0" +source = { editable = "../lumid.hooks" } dependencies = [ { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/1f/b4/d660b386d13051669eae4c9056154c638302a1eea39fccb569075ed07243/lumid_hooks-0.1.0.tar.gz", hash = "sha256:0ae85cef586645391f21ea3d912de1bf2be77b6b6dd0690f3841bb238fa0315a", size = 11269, upload-time = "2026-05-12T09:07:16.782Z" } -wheels = [ - { url = "https://files.pythonhosted.org/packages/85/70/84678eb357151f84fd78e6ce8270a42180a2605a3a3168492201feed76bf/lumid_hooks-0.1.0-py3-none-any.whl", hash = "sha256:8cfd8f3ed36001711b70290a5043a213322deaced1046d4583a4739f11cb00a6", size = 12796, upload-time = "2026-05-12T09:07:15.629Z" }, + +[package.metadata] +requires-dist = [{ name = "pydantic", specifier = ">=2.12.3" }] + +[package.metadata.requires-dev] +dev = [ + { name = "black", specifier = ">=25.12.0" }, + { name = "codespell", specifier = ">=2.4.1" }, + { name = "isort", specifier = ">=7.0.0" }, + { name = "mypy", specifier = ">=1.19.1" }, + { name = "pre-commit", specifier = ">=4.5.1" }, + { name = "pytest", specifier = ">=8.4.2" }, + { name = "ruff", specifier = ">=0.14.10" }, ] [[package]] @@ -7481,10 +7491,15 @@ name = "triton" version = "3.6.0" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/17/5d/08201db32823bdf77a0e2b9039540080b2e5c23a20706ddba942924ebcd6/triton-3.6.0-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:374f52c11a711fd062b4bfbb201fd9ac0a5febd28a96fb41b4a0f51dde3157f4", size = 176128243, upload-time = "2026-01-20T16:16:07.857Z" }, { url = "https://files.pythonhosted.org/packages/ab/a8/cdf8b3e4c98132f965f88c2313a4b493266832ad47fb52f23d14d4f86bb5/triton-3.6.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:74caf5e34b66d9f3a429af689c1c7128daba1d8208df60e81106b115c00d6fca", size = 188266850, upload-time = "2026-01-20T16:00:43.041Z" }, + { url = "https://files.pythonhosted.org/packages/3c/12/34d71b350e89a204c2c7777a9bba0dcf2f19a5bfdd70b57c4dbc5ffd7154/triton-3.6.0-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:448e02fe6dc898e9e5aa89cf0ee5c371e99df5aa5e8ad976a80b93334f3494fd", size = 176133521, upload-time = "2026-01-20T16:16:13.321Z" }, { url = "https://files.pythonhosted.org/packages/f9/0b/37d991d8c130ce81a8728ae3c25b6e60935838e9be1b58791f5997b24a54/triton-3.6.0-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:10c7f76c6e72d2ef08df639e3d0d30729112f47a56b0c81672edc05ee5116ac9", size = 188289450, upload-time = "2026-01-20T16:00:49.136Z" }, + { url = "https://files.pythonhosted.org/packages/ce/4e/41b0c8033b503fd3cfcd12392cdd256945026a91ff02452bef40ec34bee7/triton-3.6.0-cp313-cp313t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1722e172d34e32abc3eb7711d0025bb69d7959ebea84e3b7f7a341cd7ed694d6", size = 176276087, upload-time = "2026-01-20T16:16:18.989Z" }, { url = "https://files.pythonhosted.org/packages/35/f8/9c66bfc55361ec6d0e4040a0337fb5924ceb23de4648b8a81ae9d33b2b38/triton-3.6.0-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d002e07d7180fd65e622134fbd980c9a3d4211fb85224b56a0a0efbd422ab72f", size = 188400296, upload-time = "2026-01-20T16:00:56.042Z" }, + { url = "https://files.pythonhosted.org/packages/49/55/5ecf0dcaa0f2fbbd4420f7ef227ee3cb172e91e5fede9d0ecaddc43363b4/triton-3.6.0-cp314-cp314-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ef5523241e7d1abca00f1d240949eebdd7c673b005edbbce0aca95b8191f1d43", size = 176138577, upload-time = "2026-01-20T16:16:25.426Z" }, { url = "https://files.pythonhosted.org/packages/df/3d/9e7eee57b37c80cec63322c0231bb6da3cfe535a91d7a4d64896fcb89357/triton-3.6.0-cp314-cp314-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a17a5d5985f0ac494ed8a8e54568f092f7057ef60e1b0fa09d3fd1512064e803", size = 188273063, upload-time = "2026-01-20T16:01:07.278Z" }, + { url = "https://files.pythonhosted.org/packages/48/db/56ee649cab5eaff4757541325aca81f52d02d4a7cd3506776cad2451e060/triton-3.6.0-cp314-cp314t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0b3a97e8ed304dfa9bd23bb41ca04cdf6b2e617d5e782a8653d616037a5d537d", size = 176274804, upload-time = "2026-01-20T16:16:31.528Z" }, { url = "https://files.pythonhosted.org/packages/f6/56/6113c23ff46c00aae423333eb58b3e60bdfe9179d542781955a5e1514cb3/triton-3.6.0-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:46bd1c1af4b6704e554cad2eeb3b0a6513a980d470ccfa63189737340c7746a7", size = 188397994, upload-time = "2026-01-20T16:01:14.236Z" }, ] From c959bbd9412018a2bda357605f0e26fa1a31a0e3 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 00:21:52 +0800 Subject: [PATCH 11/15] refactor: harden reconcile against partial-refresh wipes Review findings on the startup-reconcile work: - Move `_reconcile_resources()` after `SUPERVISOR.start(...)` so this node is in `NODE_REGISTRY` by the time the sweep enumerates it. Previously the local node was systematically missing from the refresh batch. - `refresh_resources` now returns the set of registrar names whose `refresh` raised, and `purge_stale_resources(..., skip=...)` excludes them. A registrar that fails mid-sweep no longer wipes rows it never marked live; other registrars still complete normally. - `docs/PLUGINS.md` locates the sweep relative to plugin install / supervisor handshake and documents the host-side guard. - Style/ops: `TODO(lumid-hooks-0.2.0):` prefix on the `tool.uv.sources` comment so future cleanup surfaces in grep; `uv.lock` revision restored to 3 (was downgraded by an older local uv). Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Noppanat Wadlom --- docs/PLUGINS.md | 16 +++++--- pyproject.toml | 4 +- src/server/auth/security.py | 37 +++++++++++++---- src/server/main.py | 12 +++--- tests/server/test_hooks_wiring.py | 67 +++++++++++++++++++++++++++++++ uv.lock | 2 +- 6 files changed, 117 insertions(+), 21 deletions(-) diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index 20fd46c8..a731b639 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -60,12 +60,16 @@ The hooks: on. `RESULT` ownership is inferred from the owning task; `RESULT` permission checks are always paired with a `task_id`, and workflow-level operations check `WORKFLOW`. At startup the server - runs a reconcile sweep: it enumerates every live workflow, task, - worker, and node and calls `refresh(resources, logger)` once per - registrar with the full batch, then `purge_stale(logger)` once. - Persistent registrars use this pair to drop records for resources - the server no longer knows about — stateless registrars implement - both as no-ops. + runs a reconcile sweep — after plugins load, the system principal + resolves, and the supervisor handshake completes — enumerating + every live workflow, task, worker, and node and calling + `refresh(resources, logger)` once per registrar with the full + batch, then `purge_stale(logger)` once. A registrar whose `refresh` + raises is logged and excluded from the same boot's `purge_stale` + call so it doesn't wipe rows it never marked live. Persistent + registrars use this pair to drop records for resources the server + no longer knows about — stateless registrars implement both as + no-ops. The shared protocols treat `kind` and `action` as plain strings — `lumid-hooks` does not enumerate kinds. FlowMesh layers the diff --git a/pyproject.toml b/pyproject.toml index 06890c45..67c9d678 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -241,8 +241,8 @@ flowmesh-cli-stack = { workspace = true } flowmesh-hook = { workspace = true } flowmesh-sdk = { workspace = true } flowmesh-sdk-stack = { workspace = true } -# Local development against the in-flight lumid-hooks 0.2.0; drop once -# 0.2.0 is published to PyPI. +# TODO(lumid-hooks-0.2.0): drop this override and re-sync requirements +# files once lumid-hooks 0.2.0 ships to PyPI. lumid-hooks = { path = "../lumid.hooks", editable = true } [tool.uv.workspace] diff --git a/src/server/auth/security.py b/src/server/auth/security.py index 8f7c4a58..e80f6943 100644 --- a/src/server/auth/security.py +++ b/src/server/auth/security.py @@ -210,20 +210,43 @@ async def deregister_resource( async def refresh_resources( resources: Iterable[ResourceRef], logger: logging.Logger, -) -> None: +) -> frozenset[str]: """Notify every registered `ResourceRegistrar` of the current live set. Called once during startup reconcile with every live workflow / task / - node / worker. Registrars use this to mark records as still live before - `purge_stale_resources` clears anything untouched. + node / worker. A registrar whose `refresh` raises is logged and its + name is returned in the failed set; the sweep does not abort — pass + the result to `purge_stale_resources(..., skip=...)` so failed + registrars don't wipe rows they never marked live. """ refs = list(resources) + failed: set[str] = set() for registrar in RESOURCE_REGISTRARS: - await registrar.refresh(refs, logger) + try: + await registrar.refresh(refs, logger) + except Exception: + logger.exception( + "ResourceRegistrar %s.refresh failed; skipping its purge_stale.", + registrar.name, + ) + failed.add(registrar.name) + return frozenset(failed) -async def purge_stale_resources(logger: logging.Logger) -> None: - """Tell every registered `ResourceRegistrar` to drop records the - reconcile sweep didn't touch. Called once after `refresh_resources`.""" +async def purge_stale_resources( + logger: logging.Logger, + *, + skip: frozenset[str] = frozenset(), +) -> None: + """Tell each `ResourceRegistrar` to drop records the reconcile sweep + didn't touch. + + Called once after `refresh_resources`. Registrars whose name is in + `skip` are bypassed — typically the set of registrars whose `refresh` + raised in the same sweep, so they don't wipe their rows on a partial + refresh. + """ for registrar in RESOURCE_REGISTRARS: + if registrar.name in skip: + continue await registrar.purge_stale(logger) diff --git a/src/server/main.py b/src/server/main.py index 2fd1e807..1d265f75 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -336,8 +336,8 @@ async def _reconcile_resources() -> None: refs.append(ResourceRef(kind=ResourceKind.TASK.value, id=task_id)) logger.info("Startup reconcile: refreshing %d resource(s)", len(refs)) - await refresh_resources(refs, logger) - await purge_stale_resources(logger) + failed = await refresh_resources(refs, logger) + await purge_stale_resources(logger, skip=failed) @asynccontextmanager @@ -351,9 +351,6 @@ async def _lifespan(_: FastAPI): ) app.state.system_principal = system_principal - # --- Startup reconcile (registrar plugins) --- - await _reconcile_resources() - # --- Root-only startup --- if IS_ROOT_NODE: if SSH_FORWARD_SERVICE is not None: @@ -371,6 +368,11 @@ async def _lifespan(_: FastAPI): if EVENT_MONITOR is not None: EVENT_MONITOR.set_own_node(SUPERVISOR.node_id) + # --- Startup reconcile (registrar plugins) --- + # Runs after the supervisor handshake so this node is in NODE_REGISTRY + # and is included in the live batch. + await _reconcile_resources() + try: yield finally: diff --git a/tests/server/test_hooks_wiring.py b/tests/server/test_hooks_wiring.py index db33588a..0f4824bf 100644 --- a/tests/server/test_hooks_wiring.py +++ b/tests/server/test_hooks_wiring.py @@ -741,3 +741,70 @@ async def test_purge_stale_fans_out(self, logger: logging.Logger) -> None: assert first.purged == 1 assert second.purged == 1 + + @pytest.mark.anyio + async def test_refresh_failure_is_logged_and_collected( + self, logger: logging.Logger + ) -> None: + ok = _RecordingRegistrar() + + class _FailingRefresh: + name = "failing" + + async def register(self, *args: Any, **kwargs: Any) -> None: + return None + + async def deregister(self, *args: Any, **kwargs: Any) -> None: + return None + + async def refresh(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("refresh boom") + + async def purge_stale(self, *args: Any, **kwargs: Any) -> None: + return None + + failing = _FailingRefresh() + register(BaseBindings(resource_registrars=[failing, ok])) + + failed = await refresh_resources([], logger) + + assert failed == frozenset({"failing"}) + assert ok.refreshed == [[]] # the OK registrar still ran + + @pytest.mark.anyio + async def test_purge_stale_skips_failed_registrars( + self, logger: logging.Logger + ) -> None: + ok = _RecordingRegistrar() + skipped = _RecordingRegistrar() + skipped.name = "skipped" + register(BaseBindings(resource_registrars=[ok, skipped])) + + await purge_stale_resources(logger, skip=frozenset({"skipped"})) + + assert ok.purged == 1 + assert skipped.purged == 0 + + @pytest.mark.anyio + async def test_purge_stale_propagates_unexpected_failure( + self, logger: logging.Logger + ) -> None: + class _Boom: + name = "boom" + + async def register(self, *args: Any, **kwargs: Any) -> None: + return None + + async def deregister(self, *args: Any, **kwargs: Any) -> None: + return None + + async def refresh(self, *args: Any, **kwargs: Any) -> None: + return None + + async def purge_stale(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("purge boom") + + register(BaseBindings(resource_registrars=[_Boom()])) + + with pytest.raises(RuntimeError, match="purge boom"): + await purge_stale_resources(logger) diff --git a/uv.lock b/uv.lock index fb16ea90..7f92909b 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 2 +revision = 3 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14'", From e23b6ceff91bf27720a96c28a5fec3f40e73b3e2 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 04:08:20 +0800 Subject: [PATCH 12/15] refactor: collapse refresh+purge_stale into reconcile_resources MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Match lumid-hooks 0.2.0's single-method Protocol. The fan-out helper in `auth.security` becomes one call; the failed-set plumbing in `_lifespan` goes away because `reconcile` is atomic on the plugin side — a raised exception leaves the registrar's store unchanged, so the host can log and move on. - `auth.security.reconcile_resources(resources, logger)` replaces `refresh_resources` / `purge_stale_resources`. Each registrar's `reconcile` is wrapped in try/except + `logger.exception`; the sweep continues with the next registrar on failure. - `_lifespan` calls `reconcile_resources(refs, logger)` once and drops the failed-set threading. - Tests trade the old refresh/purge dispatch tests for one happy-path fan-out test and one that asserts a failing registrar is isolated while the rest still run. - docs/PLUGINS.md reflects the single-call shape and the atomic-or-rollback semantics. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Noppanat Wadlom --- docs/PLUGINS.md | 14 ++--- src/server/auth/__init__.py | 6 +- src/server/auth/security.py | 41 ++++---------- src/server/main.py | 7 +-- tests/server/test_hooks_wiring.py | 93 ++++++------------------------- 5 files changed, 39 insertions(+), 122 deletions(-) diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index a731b639..7f762b1d 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -63,13 +63,13 @@ The hooks: runs a reconcile sweep — after plugins load, the system principal resolves, and the supervisor handshake completes — enumerating every live workflow, task, worker, and node and calling - `refresh(resources, logger)` once per registrar with the full - batch, then `purge_stale(logger)` once. A registrar whose `refresh` - raises is logged and excluded from the same boot's `purge_stale` - call so it doesn't wipe rows it never marked live. Persistent - registrars use this pair to drop records for resources the server - no longer knows about — stateless registrars implement both as - no-ops. + `reconcile(resources, logger)` once per registrar with the full + batch. `reconcile` is atomic per the `lumid-hooks` contract: on + failure the registrar's store is unchanged, so the server logs + the exception and moves on without risk of a partial wipe. + Persistent registrars use this call to drop records for resources + the server no longer knows about — stateless registrars implement + it as a no-op. The shared protocols treat `kind` and `action` as plain strings — `lumid-hooks` does not enumerate kinds. FlowMesh layers the diff --git a/src/server/auth/__init__.py b/src/server/auth/__init__.py index 9b23598c..04f006ae 100644 --- a/src/server/auth/__init__.py +++ b/src/server/auth/__init__.py @@ -5,8 +5,7 @@ authenticate_websocket, default_principal, deregister_resource, - purge_stale_resources, - refresh_resources, + reconcile_resources, register_resource, require_permission, resolve_accessible_ids, @@ -20,8 +19,7 @@ "authenticate_websocket", "default_principal", "deregister_resource", - "purge_stale_resources", - "refresh_resources", + "reconcile_resources", "register_resource", "require_permission", "resolve_accessible_ids", diff --git a/src/server/auth/security.py b/src/server/auth/security.py index e80f6943..c609406c 100644 --- a/src/server/auth/security.py +++ b/src/server/auth/security.py @@ -207,46 +207,25 @@ async def deregister_resource( await registrar.deregister(principal, resource, logger) -async def refresh_resources( +async def reconcile_resources( resources: Iterable[ResourceRef], logger: logging.Logger, -) -> frozenset[str]: - """Notify every registered `ResourceRegistrar` of the current live set. +) -> None: + """Tell every registered `ResourceRegistrar` to replace its stored live + set with `resources`. Called once during startup reconcile with every live workflow / task / - node / worker. A registrar whose `refresh` raises is logged and its - name is returned in the failed set; the sweep does not abort — pass - the result to `purge_stale_resources(..., skip=...)` so failed - registrars don't wipe rows they never marked live. + node / worker. Each registrar's `reconcile` is atomic — on failure the + registrar's store is unchanged — so a raised exception is logged and + the sweep continues with the next registrar. The failing registrar + retries next boot. """ refs = list(resources) - failed: set[str] = set() for registrar in RESOURCE_REGISTRARS: try: - await registrar.refresh(refs, logger) + await registrar.reconcile(refs, logger) except Exception: logger.exception( - "ResourceRegistrar %s.refresh failed; skipping its purge_stale.", + "ResourceRegistrar %s.reconcile failed; store left untouched.", registrar.name, ) - failed.add(registrar.name) - return frozenset(failed) - - -async def purge_stale_resources( - logger: logging.Logger, - *, - skip: frozenset[str] = frozenset(), -) -> None: - """Tell each `ResourceRegistrar` to drop records the reconcile sweep - didn't touch. - - Called once after `refresh_resources`. Registrars whose name is in - `skip` are bypassed — typically the set of registrars whose `refresh` - raised in the same sweep, so they don't wipe their rows on a partial - refresh. - """ - for registrar in RESOURCE_REGISTRARS: - if registrar.name in skip: - continue - await registrar.purge_stale(logger) diff --git a/src/server/main.py b/src/server/main.py index 1d265f75..268ad586 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -20,7 +20,7 @@ from shared._version import FLOWMESH_RELEASE_VERSION -from .auth import purge_stale_resources, refresh_resources, resolve_system_principal +from .auth import reconcile_resources, resolve_system_principal from .clients import RedisClient from .config import NodeRole, ServerConfig from .dispatcher.factory import create_dispatcher @@ -335,9 +335,8 @@ async def _reconcile_resources() -> None: for task_id in record.task_ids: refs.append(ResourceRef(kind=ResourceKind.TASK.value, id=task_id)) - logger.info("Startup reconcile: refreshing %d resource(s)", len(refs)) - failed = await refresh_resources(refs, logger) - await purge_stale_resources(logger, skip=failed) + logger.info("Startup reconcile: %d live resource(s)", len(refs)) + await reconcile_resources(refs, logger) @asynccontextmanager diff --git a/tests/server/test_hooks_wiring.py b/tests/server/test_hooks_wiring.py index 0f4824bf..bd384364 100644 --- a/tests/server/test_hooks_wiring.py +++ b/tests/server/test_hooks_wiring.py @@ -30,8 +30,7 @@ authenticate_api_key, authenticate_connection, deregister_resource, - purge_stale_resources, - refresh_resources, + reconcile_resources, register_resource, require_permission, resolve_accessible_ids, @@ -607,8 +606,7 @@ class _RecordingRegistrar: def __init__(self) -> None: self.registered: list[tuple[str, ResourceRef]] = [] self.deregistered: list[tuple[str, ResourceRef]] = [] - self.refreshed: list[list[ResourceRef]] = [] - self.purged: int = 0 + self.reconciled: list[list[ResourceRef]] = [] async def register( self, @@ -626,15 +624,12 @@ async def deregister( ) -> None: self.deregistered.append((principal.principal_id, resource)) - async def refresh( + async def reconcile( self, resources: Iterable[ResourceRef], logger: logging.Logger, ) -> None: - self.refreshed.append(list(resources)) - - async def purge_stale(self, logger: logging.Logger) -> None: - self.purged += 1 + self.reconciled.append(list(resources)) class TestResourceRegistrarComposition: @@ -701,10 +696,7 @@ async def register(self, *args: Any, **kwargs: Any) -> None: async def deregister(self, *args: Any, **kwargs: Any) -> None: return None - async def refresh(self, *args: Any, **kwargs: Any) -> None: - return None - - async def purge_stale(self, *args: Any, **kwargs: Any) -> None: + async def reconcile(self, *args: Any, **kwargs: Any) -> None: return None register(BaseBindings(resource_registrars=[_Boom()])) @@ -715,7 +707,7 @@ async def purge_stale(self, *args: Any, **kwargs: Any) -> None: ) @pytest.mark.anyio - async def test_refresh_fans_out_with_full_batch( + async def test_reconcile_fans_out_with_full_batch( self, logger: logging.Logger ) -> None: first = _RecordingRegistrar() @@ -726,29 +718,18 @@ async def test_refresh_fans_out_with_full_batch( ResourceRef(kind=ResourceKind.WORKFLOW.value, id="wfl-1"), ResourceRef(kind=ResourceKind.WORKER.value, id="wkr-1"), ] - await refresh_resources(refs, logger) + await reconcile_resources(refs, logger) for r in (first, second): - assert r.refreshed == [refs] - - @pytest.mark.anyio - async def test_purge_stale_fans_out(self, logger: logging.Logger) -> None: - first = _RecordingRegistrar() - second = _RecordingRegistrar() - register(BaseBindings(resource_registrars=[first, second])) - - await purge_stale_resources(logger) - - assert first.purged == 1 - assert second.purged == 1 + assert r.reconciled == [refs] @pytest.mark.anyio - async def test_refresh_failure_is_logged_and_collected( + async def test_reconcile_failure_is_logged_and_isolated( self, logger: logging.Logger ) -> None: ok = _RecordingRegistrar() - class _FailingRefresh: + class _FailingReconcile: name = "failing" async def register(self, *args: Any, **kwargs: Any) -> None: @@ -757,54 +738,14 @@ async def register(self, *args: Any, **kwargs: Any) -> None: async def deregister(self, *args: Any, **kwargs: Any) -> None: return None - async def refresh(self, *args: Any, **kwargs: Any) -> None: - raise RuntimeError("refresh boom") + async def reconcile(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("reconcile boom") - async def purge_stale(self, *args: Any, **kwargs: Any) -> None: - return None - - failing = _FailingRefresh() + failing = _FailingReconcile() register(BaseBindings(resource_registrars=[failing, ok])) - failed = await refresh_resources([], logger) - - assert failed == frozenset({"failing"}) - assert ok.refreshed == [[]] # the OK registrar still ran - - @pytest.mark.anyio - async def test_purge_stale_skips_failed_registrars( - self, logger: logging.Logger - ) -> None: - ok = _RecordingRegistrar() - skipped = _RecordingRegistrar() - skipped.name = "skipped" - register(BaseBindings(resource_registrars=[ok, skipped])) - - await purge_stale_resources(logger, skip=frozenset({"skipped"})) - - assert ok.purged == 1 - assert skipped.purged == 0 - - @pytest.mark.anyio - async def test_purge_stale_propagates_unexpected_failure( - self, logger: logging.Logger - ) -> None: - class _Boom: - name = "boom" - - async def register(self, *args: Any, **kwargs: Any) -> None: - return None - - async def deregister(self, *args: Any, **kwargs: Any) -> None: - return None - - async def refresh(self, *args: Any, **kwargs: Any) -> None: - return None - - async def purge_stale(self, *args: Any, **kwargs: Any) -> None: - raise RuntimeError("purge boom") - - register(BaseBindings(resource_registrars=[_Boom()])) + # Sweep does not raise — failure is logged and the OK registrar + # still runs. + await reconcile_resources([], logger) - with pytest.raises(RuntimeError, match="purge boom"): - await purge_stale_resources(logger) + assert ok.reconciled == [[]] From 235a8f909bd4aac12fb51f1c6d413b8d330a7264 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 11:57:44 +0800 Subject: [PATCH 13/15] chore(deps): bump lumid-hooks to 0.2.0 from PyPI lumid-hooks 0.2.0 (with the new `ResourceRegistrar.reconcile`) is released on PyPI, so the editable path override from `[tool.uv.sources]` is dropped. flowmesh-hook's pin moves to >=0.2.0 since it re-exports the breaking-changed Protocol surface. Server requirements regenerated. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Noppanat Wadlom --- hook/pyproject.toml | 2 +- pyproject.toml | 3 --- src/server/requirements.txt | 2 +- uv.lock | 26 ++++++++------------------ 4 files changed, 10 insertions(+), 23 deletions(-) diff --git a/hook/pyproject.toml b/hook/pyproject.toml index f64efe93..fa2dbbe1 100644 --- a/hook/pyproject.toml +++ b/hook/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.12" license = "Apache-2.0" license-files = ["LICENSE"] dependencies = [ - "lumid-hooks>=0.1.0", + "lumid-hooks>=0.2.0", ] [tool.setuptools] diff --git a/pyproject.toml b/pyproject.toml index 67c9d678..c77bf8dc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -241,9 +241,6 @@ flowmesh-cli-stack = { workspace = true } flowmesh-hook = { workspace = true } flowmesh-sdk = { workspace = true } flowmesh-sdk-stack = { workspace = true } -# TODO(lumid-hooks-0.2.0): drop this override and re-sync requirements -# files once lumid-hooks 0.2.0 ships to PyPI. -lumid-hooks = { path = "../lumid.hooks", editable = true } [tool.uv.workspace] members = ["cli", "cli/stack", "hook", "sdk", "sdk/stack"] diff --git a/src/server/requirements.txt b/src/server/requirements.txt index 9c4f33cf..36b9a24a 100644 --- a/src/server/requirements.txt +++ b/src/server/requirements.txt @@ -6,7 +6,7 @@ docker==7.1.0 fastapi==0.120.3 grpcio==1.76.0 httpx==0.28.1 -lumid-hooks==0.1.0 +lumid-hooks==0.2.0 protobuf==5.29.6 pydantic==2.12.3 python-multipart==0.0.27 diff --git a/uv.lock b/uv.lock index 7f92909b..d54284f7 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14'", @@ -2023,7 +2023,7 @@ ci = [ { name = "isort", specifier = ">=7.0.0" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "lumid-data-sdk", specifier = ">=0.1.0" }, - { name = "lumid-hooks", editable = "../lumid.hooks" }, + { name = "lumid-hooks", specifier = ">=0.2.0" }, { name = "matplotlib", specifier = ">=3.10.6" }, { name = "mcp", specifier = ">=1.23.0" }, { name = "mypy", specifier = ">=1.19.1" }, @@ -2185,7 +2185,7 @@ runtime-server = [ { name = "flowmesh-hook", editable = "hook" }, { name = "grpcio", specifier = ">=1.76.0" }, { name = "httpx", specifier = ">=0.28.1" }, - { name = "lumid-hooks", editable = "../lumid.hooks" }, + { name = "lumid-hooks", specifier = ">=0.2.0" }, { name = "protobuf", specifier = ">=5.29.6" }, { name = "pydantic", specifier = ">=2.12.3" }, { name = "python-multipart", specifier = ">=0.0.26" }, @@ -2338,7 +2338,7 @@ dependencies = [ ] [package.metadata] -requires-dist = [{ name = "lumid-hooks", editable = "../lumid.hooks" }] +requires-dist = [{ name = "lumid-hooks", specifier = ">=0.2.0" }] [[package]] name = "flowmesh-sdk" @@ -3568,23 +3568,13 @@ wheels = [ [[package]] name = "lumid-hooks" version = "0.2.0" -source = { editable = "../lumid.hooks" } +source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, ] - -[package.metadata] -requires-dist = [{ name = "pydantic", specifier = ">=2.12.3" }] - -[package.metadata.requires-dev] -dev = [ - { name = "black", specifier = ">=25.12.0" }, - { name = "codespell", specifier = ">=2.4.1" }, - { name = "isort", specifier = ">=7.0.0" }, - { name = "mypy", specifier = ">=1.19.1" }, - { name = "pre-commit", specifier = ">=4.5.1" }, - { name = "pytest", specifier = ">=8.4.2" }, - { name = "ruff", specifier = ">=0.14.10" }, +sdist = { url = "https://files.pythonhosted.org/packages/34/b7/f2c4544b4df91ce80908f17cd5645611ec2538ddc6444bc00ad8e5961531/lumid_hooks-0.2.0.tar.gz", hash = "sha256:3c33d067c238337c16be88708d674448d83642e767bc0724e0ab1b5f6122010d", size = 11655, upload-time = "2026-05-22T03:49:57.706Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/67/ac/1b39a49650a27f012b847b6be939866cec39d4aea96d50177ec8861d21ea/lumid_hooks-0.2.0-py3-none-any.whl", hash = "sha256:a3e471ee38cfe83eb46199a64c269bc0ad9f81154d820e68d45c604a16e1f2e7", size = 13002, upload-time = "2026-05-22T03:49:56.281Z" }, ] [[package]] From e4a7b7415d15422a63156a7b8cd759bff7f6dca5 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 17:28:57 +0800 Subject: [PATCH 14/15] fix(server): use Node.id / Worker.id in startup reconcile MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `_reconcile_resources` walked `NODE_REGISTRY` and `WORKER_REGISTRY` but read `.node_id` / `.worker_id` off the records — those attributes don't exist on the `Node` / `Worker` Pydantic models, both of which expose `.id`. The mismatch crashed the FastAPI lifespan during the startup reconcile sweep, taking the whole server down on every boot. Caught while running the lumid plugin e2e plan against a freshly built `FLOWMESH_VERSION=permission` image. Co-Authored-By: Claude Opus 4.7 (1M context) Signed-off-by: Noppanat Wadlom --- src/server/main.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/server/main.py b/src/server/main.py index 268ad586..3387e50f 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -315,13 +315,13 @@ async def _reconcile_resources() -> None: # Nodes (always present). for node in await NODE_REGISTRY.list_nodes_async(): - refs.append(ResourceRef(kind=ResourceKind.NODE.value, id=node.node_id)) + refs.append(ResourceRef(kind=ResourceKind.NODE.value, id=node.id)) # Workers and workflows live on the root node. if WORKER_REGISTRY is not None: for worker in await WORKER_REGISTRY.list_workers_async(): refs.append( - ResourceRef(kind=ResourceKind.WORKER.value, id=worker.worker_id) + ResourceRef(kind=ResourceKind.WORKER.value, id=worker.id) ) if WORKFLOW_REGISTRY is not None: workflow_ids = await WORKFLOW_REGISTRY.get_workflow_ids_async() From 3d73a65452dd4b0c4b4410fa1eccade6fa977a28 Mon Sep 17 00:00:00 2001 From: Noppanat Wadlom Date: Fri, 22 May 2026 23:34:56 +0800 Subject: [PATCH 15/15] refactor: apply black formatter Signed-off-by: Noppanat Wadlom --- src/server/main.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/src/server/main.py b/src/server/main.py index 3387e50f..a9d690ea 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -320,18 +320,14 @@ async def _reconcile_resources() -> None: # Workers and workflows live on the root node. if WORKER_REGISTRY is not None: for worker in await WORKER_REGISTRY.list_workers_async(): - refs.append( - ResourceRef(kind=ResourceKind.WORKER.value, id=worker.id) - ) + refs.append(ResourceRef(kind=ResourceKind.WORKER.value, id=worker.id)) if WORKFLOW_REGISTRY is not None: workflow_ids = await WORKFLOW_REGISTRY.get_workflow_ids_async() for workflow_id in workflow_ids: record = await WORKFLOW_REGISTRY.get_workflow_record_async(workflow_id) if record is None: continue - refs.append( - ResourceRef(kind=ResourceKind.WORKFLOW.value, id=workflow_id) - ) + refs.append(ResourceRef(kind=ResourceKind.WORKFLOW.value, id=workflow_id)) for task_id in record.task_ids: refs.append(ResourceRef(kind=ResourceKind.TASK.value, id=task_id))