diff --git a/.github/workflows/security.yml b/.github/workflows/security.yml index d254086..8ebaa8d 100644 --- a/.github/workflows/security.yml +++ b/.github/workflows/security.yml @@ -89,6 +89,7 @@ jobs: run: | grep -v '@ git+' src/server/requirements.txt > /tmp/requirements-server-audit.txt uvx pip-audit==2.9.0 --strict \ + --ignore-vuln PYSEC-2026-161 \ -r /tmp/requirements-server-audit.txt - name: Run pip-audit (worker CPU) # Drop ``@ git+`` deps before auditing — not on PyPI, no CVE feed. @@ -105,6 +106,7 @@ jobs: --ignore-vuln GHSA-vfmq-68hx-4jfw \ --ignore-vuln GHSA-j7w6-vpvq-j3gm \ --ignore-vuln GHSA-98h9-4798-4q5v \ + --ignore-vuln GHSA-7wx4-6vff-v64p \ --ignore-vuln PYSEC-2025-189 \ --ignore-vuln PYSEC-2025-190 \ --ignore-vuln PYSEC-2025-191 \ @@ -152,6 +154,7 @@ jobs: --ignore-vuln GHSA-83vm-p52w-f9pw \ --ignore-vuln GHSA-j7w6-vpvq-j3gm \ --ignore-vuln GHSA-98h9-4798-4q5v \ + --ignore-vuln GHSA-7wx4-6vff-v64p \ --ignore-vuln PYSEC-2025-189 \ --ignore-vuln PYSEC-2025-190 \ --ignore-vuln PYSEC-2025-191 \ @@ -176,4 +179,5 @@ jobs: --ignore-vuln PYSEC-2024-277 \ --ignore-vuln PYSEC-2025-222 \ --ignore-vuln PYSEC-2024-274 \ + --ignore-vuln PYSEC-2026-161 \ -r src/worker/requirements/requirements.gpu.txt diff --git a/cli/stack/src/flowmesh_cli_stack/assets/.env.example b/cli/stack/src/flowmesh_cli_stack/assets/.env.example index c34520b..d1c56c2 100644 --- a/cli/stack/src/flowmesh_cli_stack/assets/.env.example +++ b/cli/stack/src/flowmesh_cli_stack/assets/.env.example @@ -173,11 +173,15 @@ NEBULA_API_TOKEN= # ==== External Plugins ==== # Plugins are Python packages dropped under FLOWMESH_PLUGIN_DIR -# (host-mounted to /app/plugins on the server) and selected by -# FLOWMESH_PLUGINS as a comma-separated list of top-level module -# names. Each named module must expose `install()` returning a -# `HookBindings`. Leave both empty unless you ship a plugin. +# (read-only at /app/plugins) and selected by FLOWMESH_PLUGINS as +# a comma-separated list of top-level module names. Each must +# expose `install()` returning a `HookBindings`. +# FLOWMESH_PLUGIN_DATA_DIR is writable at /app/plugin-data for +# plugin state. Leave all empty unless you ship a plugin. FLOWMESH_PLUGIN_DIR=./plugins +# A path (`./x`, `/abs/x`) -> host bind-mount (auto-created). +# A bare name -> external Docker volume of that name. +FLOWMESH_PLUGIN_DATA_DIR=./plugin-data FLOWMESH_PLUGINS= # ==== Agent Executor (youtu-agent / utu) ==== diff --git a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml index 9bc07df..c381e9f 100644 --- a/cli/stack/src/flowmesh_cli_stack/assets/compose.yml +++ b/cli/stack/src/flowmesh_cli_stack/assets/compose.yml @@ -166,6 +166,7 @@ services: - /var/run/docker.sock:/var/run/docker.sock - ${SERVER_WORKER_CONFIG:-./configs/worker_config.yaml}:/etc/flowmesh/worker_config.yaml:ro - ${FLOWMESH_PLUGIN_DIR:-./plugins}:/app/plugins:ro + - ${FLOWMESH_PLUGIN_DATA_DIR:-./plugin-data}:/app/plugin-data - ${REDIS_TLS_DIR:-./secrets/tls/redis}:/etc/ssl/redis:ro - ${SERVER_TLS_DIR:-./secrets/tls/server}:/etc/ssl/server:ro restart: unless-stopped @@ -199,3 +200,6 @@ volumes: name: ${FLOWMESH_STACK_SLUG:-flowmesh_node}_metrics flowmesh_server_logs: name: ${FLOWMESH_STACK_SLUG:-flowmesh_node}_server_logs + flowmesh_plugin_data: + external: true + name: ${FLOWMESH_PLUGIN_DATA_VOLUME:-${FLOWMESH_STACK_SLUG:-flowmesh_node}_plugin_data} diff --git a/cli/stack/src/flowmesh_cli_stack/env_schema.py b/cli/stack/src/flowmesh_cli_stack/env_schema.py index 3064402..eef8366 100644 --- a/cli/stack/src/flowmesh_cli_stack/env_schema.py +++ b/cli/stack/src/flowmesh_cli_stack/env_schema.py @@ -530,10 +530,11 @@ title="External Plugins", description=[ "Plugins are Python packages dropped under FLOWMESH_PLUGIN_DIR ", - "(host-mounted to /app/plugins on the server) and selected by ", - "FLOWMESH_PLUGINS as a comma-separated list of top-level module ", - "names. Each named module must expose `install()` returning a ", - "`HookBindings`. Leave both empty unless you ship a plugin.", + "(read-only at /app/plugins) and selected by FLOWMESH_PLUGINS as ", + "a comma-separated list of top-level module names. Each must ", + "expose `install()` returning a `HookBindings`. ", + "FLOWMESH_PLUGIN_DATA_DIR is writable at /app/plugin-data for ", + "plugin state. Leave all empty unless you ship a plugin.", ], vars=[ EnvVar( @@ -543,6 +544,15 @@ use_default=True, ensure_path="create", ), + EnvVar( + "FLOWMESH_PLUGIN_DATA_DIR", + "./plugin-data", + use_default=True, + description=[ + "A path (`./x`, `/abs/x`) -> host bind-mount (auto-created).", + "A bare name -> external Docker volume of that name.", + ], + ), EnvVar("FLOWMESH_PLUGINS", ""), ], ), diff --git a/cli/stack/src/flowmesh_cli_stack/stack.py b/cli/stack/src/flowmesh_cli_stack/stack.py index c8a075b..630b32b 100644 --- a/cli/stack/src/flowmesh_cli_stack/stack.py +++ b/cli/stack/src/flowmesh_cli_stack/stack.py @@ -35,6 +35,7 @@ from .utils import ( DEFAULT_ENV_FILE, STACK_PATH_KEYS, + apply_plugin_data_env, apply_stack_resource_env, ensure_deploy_paths, parse_node_role, @@ -58,6 +59,7 @@ def _load(env_file: Path) -> None: except ValueError as exc: logging.error(str(exc)) raise typer.Exit(code=1) + apply_plugin_data_env(Path.cwd()) return DockerComposeStack( compose_file=stack_compose_file(), diff --git a/cli/stack/src/flowmesh_cli_stack/utils.py b/cli/stack/src/flowmesh_cli_stack/utils.py index 5acb487..011eb14 100644 --- a/cli/stack/src/flowmesh_cli_stack/utils.py +++ b/cli/stack/src/flowmesh_cli_stack/utils.py @@ -57,6 +57,22 @@ def apply_stack_resource_env() -> None: os.environ[WORKER_RESULTS_DIR_ENV] = results_volume +_PLUGIN_DATA_PATH_PREFIXES = ("/", "./", "../", "~") +_PLUGIN_DATA_ALIAS = "flowmesh_plugin_data" +_PLUGIN_DATA_DEFAULT = "./plugin-data" +_PLUGIN_DATA_VOLUME_ENV = "FLOWMESH_PLUGIN_DATA_VOLUME" + + +def apply_plugin_data_env(base_dir: Path) -> None: + raw = os.environ.get("FLOWMESH_PLUGIN_DATA_DIR", "").strip() + if not raw or raw.startswith(_PLUGIN_DATA_PATH_PREFIXES): + resolved = resolve_path(raw, default=_PLUGIN_DATA_DEFAULT, base_dir=base_dir) + os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = resolved.as_posix() + else: + os.environ[_PLUGIN_DATA_VOLUME_ENV] = raw + os.environ["FLOWMESH_PLUGIN_DATA_DIR"] = _PLUGIN_DATA_ALIAS + + def stack_compose_file() -> Path: return asset_path("flowmesh_cli_stack.assets", "compose.yml") @@ -118,6 +134,14 @@ def ensure_deploy_paths(base_dir: Path) -> None: base_dir=base_dir, ) ) + if not os.environ.get(_PLUGIN_DATA_VOLUME_ENV): + ensure_dir( + resolve_path( + os.getenv("FLOWMESH_PLUGIN_DATA_DIR", ""), + default=_PLUGIN_DATA_DEFAULT, + base_dir=base_dir, + ) + ) def parse_node_role(raw: str) -> NodeRole: diff --git a/docs/CODE_STYLE.md b/docs/CODE_STYLE.md index 394628a..15ac62d 100644 --- a/docs/CODE_STYLE.md +++ b/docs/CODE_STYLE.md @@ -117,6 +117,7 @@ as `--ignore-vuln` flags in `.github/workflows/security.yml`. | GHSA-w8v5-vhqr-4h9v | diskcache | (none) | upstream unmaintained, no fixed version published | | GHSA-j7w6-vpvq-j3gm | diffusers | 0.38.0 | fix requires safetensors>=0.8.0rc0 pre-release; uv lock won't pick up pre-releases without explicit opt-in | | GHSA-98h9-4798-4q5v | diffusers | 0.38.0 | same blocker as GHSA-j7w6-vpvq-j3gm — both fixed in 0.38.0 | +| GHSA-7wx4-6vff-v64p | diffusers | 0.38.0 | same blocker as GHSA-j7w6-vpvq-j3gm — fixed in 0.38.0 | | PYSEC-2025-189 | torch | (none) | no fix version published | | PYSEC-2025-190 | torch | (none) | same | | PYSEC-2025-191 | torch | (none) | same | @@ -141,6 +142,7 @@ as `--ignore-vuln` flags in `.github/workflows/security.yml`. | PYSEC-2024-277 | joblib | (none) | no fix version published | | PYSEC-2025-222 | vllm | (none) | no fix version published; held by vllm-omni 0.18 pin | | PYSEC-2024-274 | gradio | (none) | no fix version published; vllm-omni 0.18 pins gradio==5.50 | +| PYSEC-2026-161 | starlette | 1.0.1 | gradio 5.50 caps starlette<1.0 (transitive via vllm-omni 0.18) | When a blocker lifts (e.g. transformers 5 ↔ vllm 0.19 line stabilizes), drop the corresponding `--ignore-vuln` flag from the workflow and the diff --git a/docs/ENV.md b/docs/ENV.md index 9bde44e..11da745 100644 --- a/docs/ENV.md +++ b/docs/ENV.md @@ -35,6 +35,7 @@ listed here is in `.env.example`. | `ENABLE_WORKER_WATCHDOG` | `true` | Worker death detection | | `WORKER_DEATH_GRACE_SEC` | `60` | Grace period before marking dead | | `FLOWMESH_PLUGINS` | – | Comma-separated plugin module names | +| `FLOWMESH_PLUGIN_DATA_DIR` | `./plugin-data` | Writable mount at `/app/plugin-data` for plugin state. A path -> host bind-mount (auto-created); a bare name -> external Docker volume of that name. | | `SERVER_CUDA_PROBE_IMAGE` | `nvidia/cuda:12.9.1-base-ubuntu24.04` | CUDA image the server runs briefly to query local GPU names/indices | | `DOCKER_GPU_RUNTIME` | nvidia | Optional Docker runtime name for GPU probe/worker containers; leave empty unless the host requires a named runtime such as `nvidia` | | `FLOWMESH_API_KEY` | – | Forwarded to spawned workers as their server-callback bearer | diff --git a/docs/PLUGINS.md b/docs/PLUGINS.md index 6e5397e..7f762b1 100644 --- a/docs/PLUGINS.md +++ b/docs/PLUGINS.md @@ -59,7 +59,17 @@ The hooks: tables so subsequent `PermissionChecker` calls have data to decide on. `RESULT` ownership is inferred from the owning task; `RESULT` permission checks are always paired with a `task_id`, and - workflow-level operations check `WORKFLOW`. + workflow-level operations check `WORKFLOW`. At startup the server + runs a reconcile sweep — after plugins load, the system principal + resolves, and the supervisor handshake completes — enumerating + every live workflow, task, worker, and node and calling + `reconcile(resources, logger)` once per registrar with the full + batch. `reconcile` is atomic per the `lumid-hooks` contract: on + failure the registrar's store is unchanged, so the server logs + the exception and moves on without risk of a partial wipe. + Persistent registrars use this call to drop records for resources + the server no longer knows about — stateless registrars implement + it as a no-op. The shared protocols treat `kind` and `action` as plain strings — `lumid-hooks` does not enumerate kinds. FlowMesh layers the @@ -134,6 +144,11 @@ Each subdirectory of `FLOWMESH_PLUGIN_DIR` is importable as a top-level module. The mount is read-only, so the plugin code is treated as static deployment artifact. +For writable persistence, `FLOWMESH_PLUGIN_DATA_DIR` (default +`./plugin-data`) is mounted read-write at `/app/plugin-data`. A path +value is a host bind-mount (auto-created on `stack up`); a bare name +is an external Docker volume of that name. + This handles plugin **code** without rebuilding the server image. When that isn't enough, build a thin overlay on top of the prebuilt image. Two patterns, pick by need: diff --git a/hook/README.md b/hook/README.md index 2095066..2e60def 100644 --- a/hook/README.md +++ b/hook/README.md @@ -3,7 +3,9 @@ FlowMesh-specific plugin extension surface. Carries the pieces FlowMesh adds on top of [`lumid-hooks`](https://github.com/mlsys-io/lumid.hooks): -- `HookBindings` — concrete dataclass with FlowMesh's six fields (the five +- `HookBindings` — runtime-checkable Protocol extending the shared one with + `supplier_resolvers`, used by the server's plugin gate. +- `BaseBindings` — frozen dataclass with FlowMesh's six fields (the five shared from `lumid-hooks` plus `supplier_resolvers`). - `ResourceKind` / `ResourceAction` — FlowMesh resource and action enums. - `SupplierResolver` / `WorkerView` — supplier attribution at dispatch time. diff --git a/hook/pyproject.toml b/hook/pyproject.toml index f64efe9..fa2dbbe 100644 --- a/hook/pyproject.toml +++ b/hook/pyproject.toml @@ -11,7 +11,7 @@ requires-python = ">=3.12" license = "Apache-2.0" license-files = ["LICENSE"] dependencies = [ - "lumid-hooks>=0.1.0", + "lumid-hooks>=0.2.0", ] [tool.setuptools] diff --git a/pyproject.toml b/pyproject.toml index a269ee2..c77bf8d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -29,7 +29,7 @@ runtime-server = [ "flowmesh-hook", "grpcio>=1.76.0", "httpx>=0.28.1", - "lumid-hooks>=0.1.0", + "lumid-hooks>=0.2.0", "protobuf>=5.29.6", "pydantic>=2.12.3", "python-multipart>=0.0.26", diff --git a/src/server/auth/__init__.py b/src/server/auth/__init__.py index 20d126b..04f006a 100644 --- a/src/server/auth/__init__.py +++ b/src/server/auth/__init__.py @@ -5,6 +5,7 @@ authenticate_websocket, default_principal, deregister_resource, + reconcile_resources, register_resource, require_permission, resolve_accessible_ids, @@ -18,6 +19,7 @@ "authenticate_websocket", "default_principal", "deregister_resource", + "reconcile_resources", "register_resource", "require_permission", "resolve_accessible_ids", diff --git a/src/server/auth/security.py b/src/server/auth/security.py index bb7de88..c609406 100644 --- a/src/server/auth/security.py +++ b/src/server/auth/security.py @@ -17,7 +17,7 @@ """ import logging -from collections.abc import Mapping +from collections.abc import Iterable, Mapping from typing import Any from fastapi import HTTPException, WebSocket, WebSocketException, status @@ -205,3 +205,27 @@ async def deregister_resource( resource = ResourceRef(kind=resource_kind.value, id=resource_id) for registrar in RESOURCE_REGISTRARS: await registrar.deregister(principal, resource, logger) + + +async def reconcile_resources( + resources: Iterable[ResourceRef], + logger: logging.Logger, +) -> None: + """Tell every registered `ResourceRegistrar` to replace its stored live + set with `resources`. + + Called once during startup reconcile with every live workflow / task / + node / worker. Each registrar's `reconcile` is atomic — on failure the + registrar's store is unchanged — so a raised exception is logged and + the sweep continues with the next registrar. The failing registrar + retries next boot. + """ + refs = list(resources) + for registrar in RESOURCE_REGISTRARS: + try: + await registrar.reconcile(refs, logger) + except Exception: + logger.exception( + "ResourceRegistrar %s.reconcile failed; store left untouched.", + registrar.name, + ) diff --git a/src/server/main.py b/src/server/main.py index 7fc264d..a9d690e 100644 --- a/src/server/main.py +++ b/src/server/main.py @@ -8,7 +8,8 @@ import uvicorn from fastapi import FastAPI -from lumid_hooks import HookBindings +from flowmesh_hook import ResourceKind +from lumid_hooks import HookBindings, ResourceRef if __name__ == "__main__" and __package__ is None: import sys @@ -19,7 +20,7 @@ from shared._version import FLOWMESH_RELEASE_VERSION -from .auth import resolve_system_principal +from .auth import reconcile_resources, resolve_system_principal from .clients import RedisClient from .config import NodeRole, ServerConfig from .dispatcher.factory import create_dispatcher @@ -305,6 +306,35 @@ async def _load_plugins(stack: AsyncExitStack) -> None: register(bindings) +async def _reconcile_resources() -> None: + """Refresh registrar-tracked records for every live resource, then purge + anything the sweep didn't touch. Runs once at startup after plugins load + so registrars don't drop grants on resources that outlived their TTL. + """ + refs: list[ResourceRef] = [] + + # Nodes (always present). + for node in await NODE_REGISTRY.list_nodes_async(): + refs.append(ResourceRef(kind=ResourceKind.NODE.value, id=node.id)) + + # Workers and workflows live on the root node. + if WORKER_REGISTRY is not None: + for worker in await WORKER_REGISTRY.list_workers_async(): + refs.append(ResourceRef(kind=ResourceKind.WORKER.value, id=worker.id)) + if WORKFLOW_REGISTRY is not None: + workflow_ids = await WORKFLOW_REGISTRY.get_workflow_ids_async() + for workflow_id in workflow_ids: + record = await WORKFLOW_REGISTRY.get_workflow_record_async(workflow_id) + if record is None: + continue + refs.append(ResourceRef(kind=ResourceKind.WORKFLOW.value, id=workflow_id)) + for task_id in record.task_ids: + refs.append(ResourceRef(kind=ResourceKind.TASK.value, id=task_id)) + + logger.info("Startup reconcile: %d live resource(s)", len(refs)) + await reconcile_resources(refs, logger) + + @asynccontextmanager async def _lifespan(_: FastAPI): async with AsyncExitStack() as plugin_stack: @@ -333,6 +363,11 @@ async def _lifespan(_: FastAPI): if EVENT_MONITOR is not None: EVENT_MONITOR.set_own_node(SUPERVISOR.node_id) + # --- Startup reconcile (registrar plugins) --- + # Runs after the supervisor handshake so this node is in NODE_REGISTRY + # and is included in the live batch. + await _reconcile_resources() + try: yield finally: diff --git a/src/server/requirements.txt b/src/server/requirements.txt index 9c4f33c..36b9a24 100644 --- a/src/server/requirements.txt +++ b/src/server/requirements.txt @@ -6,7 +6,7 @@ docker==7.1.0 fastapi==0.120.3 grpcio==1.76.0 httpx==0.28.1 -lumid-hooks==0.1.0 +lumid-hooks==0.2.0 protobuf==5.29.6 pydantic==2.12.3 python-multipart==0.0.27 diff --git a/src/server/supervisor/supervisor.py b/src/server/supervisor/supervisor.py index 26007b7..988ec1a 100644 --- a/src/server/supervisor/supervisor.py +++ b/src/server/supervisor/supervisor.py @@ -2,9 +2,9 @@ import asyncio import logging -import multiprocessing as mp import os import signal +from multiprocessing.process import BaseProcess from multiprocessing.queues import Queue as MPQueue from queue import Empty as QueueEmpty @@ -18,7 +18,12 @@ WorkerManagementConfig, ) from ..hooks import PrincipalContext -from ..utils.concurrent import TaskReceiver, TaskSender, create_task_channel +from ..utils.concurrent import ( + MP_CTX, + TaskReceiver, + TaskSender, + create_task_channel, +) _CMD_TIMEOUT = 120.0 _NODE_ID_HANDSHAKE_TIMEOUT = 30.0 @@ -42,10 +47,10 @@ def __init__( self._worker_management = worker_management self._logging_config = logging_config self._logger = logger - self._process: mp.Process | None = None + self._process: BaseProcess | None = None self._cmd_sender: TaskSender[CommandMessage, CommandResponse] | None = None self._cmd_receiver: TaskReceiver[CommandMessage, CommandResponse] | None = None - self._node_id_queue: MPQueue[str] = mp.Queue(maxsize=1) + self._node_id_queue: MPQueue[str] = MP_CTX.Queue(maxsize=1) self._node_id: str | None = None @property @@ -66,7 +71,7 @@ async def start(self, system_principal: PrincipalContext) -> None: return self._cmd_sender, self._cmd_receiver = create_task_channel() - self._process = mp.Process( + self._process = MP_CTX.Process( target=_run_supervisor, kwargs={ "identity": self._identity, diff --git a/src/server/utils/concurrent.py b/src/server/utils/concurrent.py index e9dce33..8f28489 100644 --- a/src/server/utils/concurrent.py +++ b/src/server/utils/concurrent.py @@ -4,6 +4,13 @@ from collections.abc import Iterable from multiprocessing.queues import Queue as MPQueue +# Shared multiprocessing context for FlowMesh child processes. Spawn (not fork) because +# the parent opens Redis-over-TLS connections during lifespan startup, which initialises +# OpenSSL state; OpenSSL is not fork-safe and the child can deadlock in +# `ssl.SSLContext.__new__` when it builds its own connections. All IPC primitives shared +# between parent and child must be created from this same context. +MP_CTX = mp.get_context("spawn") + type TaskIDType = str @@ -90,8 +97,8 @@ def send_result(self, task_id: TaskIDType, result: R) -> None: def create_task_channel[T, R]() -> tuple[TaskSender[T, R], TaskReceiver[T, R]]: - send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = mp.Queue() - recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = mp.Queue() + send_q: MPQueue[tuple[TaskIDType, T | Sentinel]] = MP_CTX.Queue() + recv_q: MPQueue[tuple[TaskIDType, R | Sentinel]] = MP_CTX.Queue() sender = TaskSender(send_q, recv_q) receiver = TaskReceiver(send_q, recv_q) return sender, receiver diff --git a/tests/cli/test_stack_utils.py b/tests/cli/test_stack_utils.py index 04390c6..fbf449e 100644 --- a/tests/cli/test_stack_utils.py +++ b/tests/cli/test_stack_utils.py @@ -1,11 +1,14 @@ import os +from pathlib import Path from unittest.mock import patch import pytest from flowmesh_cli_stack.utils import ( + _PLUGIN_DATA_ALIAS, STACK_SLUG_ENV, STACK_SUFFIX_ENV, WORKER_RESULTS_DIR_ENV, + apply_plugin_data_env, apply_stack_resource_env, stack_resource_env_overrides, ) @@ -37,3 +40,46 @@ def test_apply_stack_resource_env_defaults_results_dirs_from_suffix() -> None: ): apply_stack_resource_env() assert os.environ[WORKER_RESULTS_DIR_ENV] == "flowmesh_node_alice.dev_results" + + +def test_apply_plugin_data_env_empty_resolves_default(tmp_path: Path) -> None: + with patch.dict(os.environ, {"FLOWMESH_PLUGIN_DATA_DIR": ""}, clear=True): + apply_plugin_data_env(tmp_path) + expected = (tmp_path / "plugin-data").as_posix() + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == expected + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + assert not (tmp_path / "plugin-data").exists() # routing only; no mkdir + + +def test_apply_plugin_data_env_relative_path_is_cwd_resolved(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "./custom-data"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + expected = (tmp_path / "custom-data").as_posix() + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == expected + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + + +def test_apply_plugin_data_env_absolute_path_passthrough(tmp_path: Path) -> None: + abs_path = "/var/lib/flowmesh-plugin" + with patch.dict(os.environ, {"FLOWMESH_PLUGIN_DATA_DIR": abs_path}, clear=True): + apply_plugin_data_env(tmp_path) + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == abs_path + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + + +def test_apply_plugin_data_env_tilde_is_path(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "~/flowmesh-data"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + # Resolved against base_dir; the leading ~ keeps it in path mode. + assert "FLOWMESH_PLUGIN_DATA_VOLUME" not in os.environ + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] # set to something resolved + + +def test_apply_plugin_data_env_bare_name_routes_to_volume(tmp_path: Path) -> None: + env = {"FLOWMESH_PLUGIN_DATA_DIR": "my_external_vol"} + with patch.dict(os.environ, env, clear=True): + apply_plugin_data_env(tmp_path) + assert os.environ["FLOWMESH_PLUGIN_DATA_VOLUME"] == "my_external_vol" + assert os.environ["FLOWMESH_PLUGIN_DATA_DIR"] == _PLUGIN_DATA_ALIAS diff --git a/tests/server/test_hooks_wiring.py b/tests/server/test_hooks_wiring.py index 25eb558..bd38436 100644 --- a/tests/server/test_hooks_wiring.py +++ b/tests/server/test_hooks_wiring.py @@ -7,7 +7,7 @@ """ import logging -from collections.abc import Iterator, Sequence +from collections.abc import Iterable, Iterator, Sequence from datetime import UTC, datetime from decimal import Decimal from typing import Any @@ -30,6 +30,7 @@ authenticate_api_key, authenticate_connection, deregister_resource, + reconcile_resources, register_resource, require_permission, resolve_accessible_ids, @@ -605,6 +606,7 @@ class _RecordingRegistrar: def __init__(self) -> None: self.registered: list[tuple[str, ResourceRef]] = [] self.deregistered: list[tuple[str, ResourceRef]] = [] + self.reconciled: list[list[ResourceRef]] = [] async def register( self, @@ -622,6 +624,13 @@ async def deregister( ) -> None: self.deregistered.append((principal.principal_id, resource)) + async def reconcile( + self, + resources: Iterable[ResourceRef], + logger: logging.Logger, + ) -> None: + self.reconciled.append(list(resources)) + class TestResourceRegistrarComposition: @pytest.fixture @@ -687,9 +696,56 @@ async def register(self, *args: Any, **kwargs: Any) -> None: async def deregister(self, *args: Any, **kwargs: Any) -> None: return None + async def reconcile(self, *args: Any, **kwargs: Any) -> None: + return None + register(BaseBindings(resource_registrars=[_Boom()])) with pytest.raises(RuntimeError, match="plugin failure"): await register_resource( principal, ResourceKind.WORKFLOW, "wfl-1", {}, logger ) + + @pytest.mark.anyio + async def test_reconcile_fans_out_with_full_batch( + self, logger: logging.Logger + ) -> None: + first = _RecordingRegistrar() + second = _RecordingRegistrar() + register(BaseBindings(resource_registrars=[first, second])) + + refs = [ + ResourceRef(kind=ResourceKind.WORKFLOW.value, id="wfl-1"), + ResourceRef(kind=ResourceKind.WORKER.value, id="wkr-1"), + ] + await reconcile_resources(refs, logger) + + for r in (first, second): + assert r.reconciled == [refs] + + @pytest.mark.anyio + async def test_reconcile_failure_is_logged_and_isolated( + self, logger: logging.Logger + ) -> None: + ok = _RecordingRegistrar() + + class _FailingReconcile: + name = "failing" + + async def register(self, *args: Any, **kwargs: Any) -> None: + return None + + async def deregister(self, *args: Any, **kwargs: Any) -> None: + return None + + async def reconcile(self, *args: Any, **kwargs: Any) -> None: + raise RuntimeError("reconcile boom") + + failing = _FailingReconcile() + register(BaseBindings(resource_registrars=[failing, ok])) + + # Sweep does not raise — failure is logged and the OK registrar + # still runs. + await reconcile_resources([], logger) + + assert ok.reconciled == [[]] diff --git a/uv.lock b/uv.lock index 650e27a..d54284f 100644 --- a/uv.lock +++ b/uv.lock @@ -1,5 +1,5 @@ version = 1 -revision = 3 +revision = 2 requires-python = ">=3.12" resolution-markers = [ "python_full_version >= '3.14'", @@ -2023,7 +2023,7 @@ ci = [ { name = "isort", specifier = ">=7.0.0" }, { name = "jinja2", specifier = ">=3.1.6" }, { name = "lumid-data-sdk", specifier = ">=0.1.0" }, - { name = "lumid-hooks", specifier = ">=0.1.0" }, + { name = "lumid-hooks", specifier = ">=0.2.0" }, { name = "matplotlib", specifier = ">=3.10.6" }, { name = "mcp", specifier = ">=1.23.0" }, { name = "mypy", specifier = ">=1.19.1" }, @@ -2185,7 +2185,7 @@ runtime-server = [ { name = "flowmesh-hook", editable = "hook" }, { name = "grpcio", specifier = ">=1.76.0" }, { name = "httpx", specifier = ">=0.28.1" }, - { name = "lumid-hooks", specifier = ">=0.1.0" }, + { name = "lumid-hooks", specifier = ">=0.2.0" }, { name = "protobuf", specifier = ">=5.29.6" }, { name = "pydantic", specifier = ">=2.12.3" }, { name = "python-multipart", specifier = ">=0.0.26" }, @@ -2338,7 +2338,7 @@ dependencies = [ ] [package.metadata] -requires-dist = [{ name = "lumid-hooks", specifier = ">=0.1.0" }] +requires-dist = [{ name = "lumid-hooks", specifier = ">=0.2.0" }] [[package]] name = "flowmesh-sdk" @@ -3567,14 +3567,14 @@ wheels = [ [[package]] name = "lumid-hooks" -version = "0.1.0" +version = "0.2.0" source = { registry = "https://pypi.org/simple" } dependencies = [ { name = "pydantic" }, ] -sdist = { url = "https://files.pythonhosted.org/packages/1f/b4/d660b386d13051669eae4c9056154c638302a1eea39fccb569075ed07243/lumid_hooks-0.1.0.tar.gz", hash = "sha256:0ae85cef586645391f21ea3d912de1bf2be77b6b6dd0690f3841bb238fa0315a", size = 11269, upload-time = "2026-05-12T09:07:16.782Z" } +sdist = { url = "https://files.pythonhosted.org/packages/34/b7/f2c4544b4df91ce80908f17cd5645611ec2538ddc6444bc00ad8e5961531/lumid_hooks-0.2.0.tar.gz", hash = "sha256:3c33d067c238337c16be88708d674448d83642e767bc0724e0ab1b5f6122010d", size = 11655, upload-time = "2026-05-22T03:49:57.706Z" } wheels = [ - { url = "https://files.pythonhosted.org/packages/85/70/84678eb357151f84fd78e6ce8270a42180a2605a3a3168492201feed76bf/lumid_hooks-0.1.0-py3-none-any.whl", hash = "sha256:8cfd8f3ed36001711b70290a5043a213322deaced1046d4583a4739f11cb00a6", size = 12796, upload-time = "2026-05-12T09:07:15.629Z" }, + { url = "https://files.pythonhosted.org/packages/67/ac/1b39a49650a27f012b847b6be939866cec39d4aea96d50177ec8861d21ea/lumid_hooks-0.2.0-py3-none-any.whl", hash = "sha256:a3e471ee38cfe83eb46199a64c269bc0ad9f81154d820e68d45c604a16e1f2e7", size = 13002, upload-time = "2026-05-22T03:49:56.281Z" }, ] [[package]] @@ -7481,10 +7481,15 @@ name = "triton" version = "3.6.0" source = { registry = "https://pypi.org/simple" } wheels = [ + { url = "https://files.pythonhosted.org/packages/17/5d/08201db32823bdf77a0e2b9039540080b2e5c23a20706ddba942924ebcd6/triton-3.6.0-cp312-cp312-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:374f52c11a711fd062b4bfbb201fd9ac0a5febd28a96fb41b4a0f51dde3157f4", size = 176128243, upload-time = "2026-01-20T16:16:07.857Z" }, { url = "https://files.pythonhosted.org/packages/ab/a8/cdf8b3e4c98132f965f88c2313a4b493266832ad47fb52f23d14d4f86bb5/triton-3.6.0-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:74caf5e34b66d9f3a429af689c1c7128daba1d8208df60e81106b115c00d6fca", size = 188266850, upload-time = "2026-01-20T16:00:43.041Z" }, + { url = "https://files.pythonhosted.org/packages/3c/12/34d71b350e89a204c2c7777a9bba0dcf2f19a5bfdd70b57c4dbc5ffd7154/triton-3.6.0-cp313-cp313-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:448e02fe6dc898e9e5aa89cf0ee5c371e99df5aa5e8ad976a80b93334f3494fd", size = 176133521, upload-time = "2026-01-20T16:16:13.321Z" }, { url = "https://files.pythonhosted.org/packages/f9/0b/37d991d8c130ce81a8728ae3c25b6e60935838e9be1b58791f5997b24a54/triton-3.6.0-cp313-cp313-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:10c7f76c6e72d2ef08df639e3d0d30729112f47a56b0c81672edc05ee5116ac9", size = 188289450, upload-time = "2026-01-20T16:00:49.136Z" }, + { url = "https://files.pythonhosted.org/packages/ce/4e/41b0c8033b503fd3cfcd12392cdd256945026a91ff02452bef40ec34bee7/triton-3.6.0-cp313-cp313t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:1722e172d34e32abc3eb7711d0025bb69d7959ebea84e3b7f7a341cd7ed694d6", size = 176276087, upload-time = "2026-01-20T16:16:18.989Z" }, { url = "https://files.pythonhosted.org/packages/35/f8/9c66bfc55361ec6d0e4040a0337fb5924ceb23de4648b8a81ae9d33b2b38/triton-3.6.0-cp313-cp313t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d002e07d7180fd65e622134fbd980c9a3d4211fb85224b56a0a0efbd422ab72f", size = 188400296, upload-time = "2026-01-20T16:00:56.042Z" }, + { url = "https://files.pythonhosted.org/packages/49/55/5ecf0dcaa0f2fbbd4420f7ef227ee3cb172e91e5fede9d0ecaddc43363b4/triton-3.6.0-cp314-cp314-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:ef5523241e7d1abca00f1d240949eebdd7c673b005edbbce0aca95b8191f1d43", size = 176138577, upload-time = "2026-01-20T16:16:25.426Z" }, { url = "https://files.pythonhosted.org/packages/df/3d/9e7eee57b37c80cec63322c0231bb6da3cfe535a91d7a4d64896fcb89357/triton-3.6.0-cp314-cp314-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a17a5d5985f0ac494ed8a8e54568f092f7057ef60e1b0fa09d3fd1512064e803", size = 188273063, upload-time = "2026-01-20T16:01:07.278Z" }, + { url = "https://files.pythonhosted.org/packages/48/db/56ee649cab5eaff4757541325aca81f52d02d4a7cd3506776cad2451e060/triton-3.6.0-cp314-cp314t-manylinux_2_27_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:0b3a97e8ed304dfa9bd23bb41ca04cdf6b2e617d5e782a8653d616037a5d537d", size = 176274804, upload-time = "2026-01-20T16:16:31.528Z" }, { url = "https://files.pythonhosted.org/packages/f6/56/6113c23ff46c00aae423333eb58b3e60bdfe9179d542781955a5e1514cb3/triton-3.6.0-cp314-cp314t-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:46bd1c1af4b6704e554cad2eeb3b0a6513a980d470ccfa63189737340c7746a7", size = 188397994, upload-time = "2026-01-20T16:01:14.236Z" }, ]