diff --git a/docs/proposals/extension-architecture-refactor-plan.md b/docs/proposals/extension-architecture-refactor-plan.md new file mode 100644 index 00000000..2e363a2a --- /dev/null +++ b/docs/proposals/extension-architecture-refactor-plan.md @@ -0,0 +1,478 @@ +# Extension Architecture Refactor Plan + +Companion to: +- `/home/trevor/dev/PLUGIN_DESIGN.md` + +## Summary + +This proposal turns the updated extension-architecture design into a concrete refactor roadmap for `roar`. + +The goal is not to build a generic plugin system. The goal is to make `roar` easier to maintain and extend by tightening the extension seams that already exist: + +1. **execution backends** for alternate execution semantics +2. **typed provider registries** for provider-shaped integrations +3. **static lazy CLI commands** unless command growth justifies a very small command-set seam later + +The main maintainability problems in the current codebase are not a lack of generic plugin machinery. They are a handful of specific boundary leaks: + +- proxy lifecycle logic still lives in the shared host execution path +- bootstrap still mixes core initialization with concrete provider registration +- optional provider discovery still uses one broad entry-point group +- the CLI has no narrow command-contribution seam if optional command groups multiply + +This plan addresses those issues in small, independently shippable phases. + +## Goals + +1. Keep the local-first tracked execution path small and predictable. +2. Make extension points explicit, typed, and easy to test. +3. Preserve the existing execution-backend framework as the primary extension surface. +4. Reduce direct knowledge of optional runtime resources inside shared host execution. +5. Keep labels, lineage, composites, and persistence as core product semantics. +6. Avoid introducing a generic hook bus or plugin manager. +7. Sequence changes so each phase is independently deployable and low-risk. + +## Non-goals + +1. 
Replacing the current execution-backend framework. +2. Moving GLaaS flows behind a new generic plugin abstraction. +3. Generalizing all CLI commands into dynamically discovered plugins. +4. Changing the persistence model for labels, lineage, artifacts, or composites. +5. Refactoring Ray and OSMO behavior unless needed to support clearer shared seams. + +## Current-state audit + +### What is already in good shape + +#### 1. Execution backends are the right extension seam + +The current backend framework already owns the important execution-planning surface: + +- `roar/execution/framework/contract.py` +- `roar/execution/framework/planning.py` +- `roar/execution/framework/registry.py` +- `roar/backends/local/plugin.py` +- `roar/backends/ray/plugin.py` +- `roar/backends/osmo/plugin.py` + +This is the correct abstraction for: + +- command matching +- command rewriting +- host execution dispatch +- distributed runtime bootstrap +- runtime import behavior +- fragment reconstitution +- backend-owned config + +This framework should be preserved and tightened, not replaced. + +#### 2. Labels and lineage semantics are correctly core-owned + +Current core/application ownership of labels and publish-time label sync is the right long-term boundary: + +- `roar/application/labels.py` +- `roar/cli/commands/label.py` +- `roar/application/publish/registration.py` + +#### 3. Static lazy CLI loading is still a good default + +`roar/cli/__init__.py` keeps startup and `--help` fast while making top-level commands easy to audit. That is still the right default. 
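The static lazy pattern can be pictured with a minimal sketch (illustrative only, not the real `roar/cli` module; the `label` entry is copied from the actual table, and the real module also carries the click group wiring):

```python
import importlib

# Minimal sketch of the static lazy-command idea. Help text is served from the
# static table, so `--help` never imports a command module.
LAZY_COMMANDS: dict[str, tuple[str, str, str]] = {
    "label": ("roar.cli.commands.label", "label", "Manage local labels"),
}

def short_help(name: str) -> str:
    # --help only reads the static table.
    return LAZY_COMMANDS[name][2]

def load_command(name: str):
    # The command module is imported only when the command is actually invoked.
    module_path, attr_name, _short = LAZY_COMMANDS[name]
    return getattr(importlib.import_module(module_path), attr_name)
```

This is why the table stores the short help alongside the module path: the help path and the import path stay fully decoupled.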
+ +### What currently leaks + +#### Leak A: proxy lifecycle in shared host execution + +The current shared host execution path still decides whether proxy support is enabled and instantiates `ProxyService` directly: + +- `roar/execution/runtime/host_execution.py` +- `roar/execution/runtime/coordinator.py` + +The coordinator then owns: + +- resource startup +- env patching +- teardown +- S3 observation collection + +That means a specific optional runtime resource is still coupled to the generic host path. + +#### Leak B: bootstrap mixes core init and provider wiring + +`roar/core/bootstrap.py` currently does all of the following: + +- configures logging +- imports concrete built-in providers +- registers built-in providers +- discovers optional providers + +That works, but it mixes application bootstrap with concrete integration registration. + +#### Leak C: one broad provider entry-point group + +`roar/integrations/discovery.py` currently discovers optional integrations via the generic `roar.integrations` entry-point group. + +That is workable for now, but over time a single catch-all group becomes less maintainable than typed groups. + +#### Leak D: no narrow future seam for command-family growth + +The CLI is correctly static today, but if more backend-owned or optional command families appear, `roar` will need a small command-set seam. The important thing is to add that only if there is a real need, and to keep it separate from execution and provider concerns. 
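Leak A in miniature — a hypothetical simplification of the coupling (the real logic lives in `host_execution.py` and `coordinator.py`; the names below are stand-ins, not the actual `roar` types):

```python
from dataclasses import dataclass
from typing import Callable

# Stand-in types; the real ProxyService lives in roar/execution/cluster/proxy.py.
@dataclass
class ProxyConfig:
    enabled: bool

class ProxyService:
    def start(self) -> None:
        pass

    def stop(self) -> None:
        pass

def execute_host_run(run: Callable[[], int], proxy_cfg: ProxyConfig) -> int:
    # The leak: the generic host path branches on one specific optional
    # feature and owns its start/teardown lifecycle inline.
    proxy = ProxyService() if proxy_cfg.enabled else None
    if proxy is not None:
        proxy.start()
    try:
        return run()
    finally:
        if proxy is not None:
            proxy.stop()
```

Phase 1 of the plan replaces exactly this shape with a resource contract the coordinator can treat uniformly.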
+ +## Architecture decision + +The accepted long-term strategy is: + +- keep **execution backends** as the main extensibility model +- keep **provider registries** narrow and typed +- keep **CLI commands static** unless growth justifies a small command-set registry +- keep **remote lineage semantics built-in** unless a second remote registry target appears +- keep **labels, lineage, composites, and persistence in core/application** +- do **not** introduce a generic plugin manager or hook bus + +## Success criteria + +This refactor is successful when: + +1. `roar/execution/runtime/host_execution.py` no longer directly branches on `proxy.enabled` and constructs `ProxyService` inline. +2. `RunCoordinator` no longer contains proxy-specific lifecycle logic. +3. `roar/core/bootstrap.py` no longer imports concrete built-in provider implementations directly. +4. provider discovery can distinguish telemetry and VCS entry points explicitly. +5. no new generic lifecycle hook bus or plugin manifest abstraction is introduced. +6. the current execution-backend tests and integration-discovery tests still pass. +7. the local product path remains the default, simple path. + +## Refactor plan + +## Phase 1: Extract host runtime resources from shared host execution + +### Why this phase comes first + +This is the most important maintainability fix because it removes the most obvious optional-feature branching from the shared host path without changing the execution-backend model. + +### Proposed design + +Introduce a **small host runtime-resource contract** for resources that must be started around a local host execution. + +This should be intentionally narrower than a hook system. 
+ +Suggested shape: + +```python +@dataclass(frozen=True) +class RuntimeResourceStart: + env: Mapping[str, str] = field(default_factory=dict) + +@dataclass(frozen=True) +class RuntimeResourceStop: + observations: Mapping[str, Any] = field(default_factory=dict) + +class HostRuntimeResource(Protocol): + name: str + + def start(self, ctx: RunContext, environ: Mapping[str, str]) -> RuntimeResourceStart: ... + def stop(self, *, exit_code: int | None) -> RuntimeResourceStop: ... +``` + +The first concrete implementation should be: + +- `ProxyRuntimeResource` + +Responsibilities: + +- decide enablement from config or explicit runtime selection +- start and stop `ProxyService` +- provide `AWS_ENDPOINT_URL` / `ROAR_UPSTREAM_S3_ENDPOINT` env patches +- return collected S3 observations on stop + +The coordinator should know only that it is working with runtime resources and resource observations. It should not know about proxy-specific startup rules. + +### Important constraint + +Do **not** generalize this into a cross-cutting lifecycle hook bus. This seam is only for host runtime resources with explicit startup, env patch, and teardown semantics. + +### Likely files + +Core implementation: +- `roar/execution/runtime/host_execution.py` +- `roar/execution/runtime/coordinator.py` +- new: `roar/execution/runtime/resources.py` +- new: `roar/execution/runtime/proxy_resource.py` or `roar/execution/cluster/proxy_resource.py` + +Proxy code reused: +- `roar/execution/cluster/proxy.py` + +Potential shared helper cleanup: +- `roar/execution/runtime/driver_entrypoint.py` + +Tests to add/update: +- `tests/unit/test_proxy_coordinator.py` +- `tests/unit/test_proxy_service.py` +- `tests/integration/test_proxy_integration.py` +- `tests/backends/ray/unit/test_driver_entrypoint.py` + +### Acceptance criteria + +1. `execute_host_run()` no longer directly checks `proxy.enabled`. +2. `RunCoordinator` no longer has proxy-specific env/start/stop branches. +3. 
proxy lifecycle remains deterministic on success and failure paths. +4. existing S3 lineage behavior stays unchanged. +5. Ray driver proxy tests still pass. + +### PR boundary + +This should be one PR. + +It is cohesive because it removes one specific boundary leak without changing backend contracts or CLI behavior. + +## Phase 2: Move provider wiring out of core bootstrap + +### Why this phase is next + +After runtime cleanup, the next biggest clarity win is separating core process bootstrap from integration registration. + +### Proposed design + +Keep `roar.core.bootstrap.bootstrap()` responsible for: + +- one-time initialization +- logging setup +- calling a dedicated integration bootstrap helper + +Move provider registration/discovery into an explicit integration bootstrap module, for example: + +- `roar/integrations/bootstrap.py` + +Suggested functions: + +- `register_builtin_providers()` +- `discover_optional_providers()` +- `bootstrap_integrations()` + +This keeps concrete provider wiring in the integrations layer rather than the core bootstrap layer. + +### Important constraint + +Do not turn bootstrap into a plugin manager. This change is about moving concrete provider ownership to the right package, not about adding more generalized architecture. + +### Likely files + +Core/bootstrap: +- `roar/core/bootstrap.py` + +New/updated integration files: +- new: `roar/integrations/bootstrap.py` +- `roar/integrations/__init__.py` +- `roar/integrations/discovery.py` +- `roar/integrations/registry.py` + +Tests to add/update: +- `tests/integration/test_bootstrap_integrations.py` +- `tests/integrations/test_integration_discovery.py` +- `tests/unit/test_bootstrap_config_path.py` + +### Acceptance criteria + +1. `roar/core/bootstrap.py` no longer imports `GitVCSProvider` or `WandBTelemetryProvider` directly. +2. built-in provider registration still happens exactly once. +3. bootstrap remains lazy and deterministic. +4. current provider-discovery tests still pass. 
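The Phase 2 split could look roughly like the following sketch of the proposed `roar/integrations/bootstrap.py` (the function names come from the plan; the dict-shaped registry and the placeholder bodies are assumptions for illustration):

```python
# Sketch of the proposed roar/integrations/bootstrap.py.
_BOOTSTRAPPED = False

def register_builtin_providers(registry: dict) -> None:
    # In the real module, the concrete imports (GitVCSProvider,
    # WandBTelemetryProvider) would happen here, keeping core bootstrap
    # free of them.
    registry["builtin_registered"] = True

def discover_optional_providers(registry: dict) -> None:
    # Entry-point discovery would populate this; left as a placeholder.
    registry.setdefault("optional", [])

def bootstrap_integrations(registry: dict) -> None:
    # Module-level guard keeps built-in registration happening exactly once.
    global _BOOTSTRAPPED
    if _BOOTSTRAPPED:
        return
    register_builtin_providers(registry)
    discover_optional_providers(registry)
    _BOOTSTRAPPED = True
```

`roar.core.bootstrap.bootstrap()` would then call `bootstrap_integrations()` and nothing more, satisfying acceptance criteria 1 and 2.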
+ +### PR boundary + +This should be one PR after Phase 1. + +## Phase 3: Introduce typed provider entry-point groups + +### Why this phase matters + +A single `roar.integrations` entry-point group is convenient early on, but it becomes harder to validate and reason about as integration types grow. + +Typed groups make extension clearer without inventing a generic plugin host. + +### Proposed design + +Add typed entry-point groups such as: + +- `roar.telemetry_providers` +- `roar.vcs_providers` + +Discovery should support both: + +- new typed groups +- the legacy `roar.integrations` group during a compatibility window + +The new preferred path should be typed groups. The legacy group should remain supported temporarily for backwards compatibility. + +### Important constraint + +Keep discovery type-specific. Do not reintroduce a manifest model that mixes commands, runtime hooks, config, and providers into one registration payload. + +### Likely files + +Packaging and discovery: +- `pyproject.toml` +- `roar/integrations/discovery.py` +- `roar/integrations/registry.py` + +Docs/tests: +- `docs/developer/execution-backend-adapter.md` +- `tests/integrations/test_integration_discovery.py` +- possibly new targeted tests for typed-group loading + +### Acceptance criteria + +1. telemetry and VCS providers can be discovered through typed groups. +2. legacy `roar.integrations` discovery continues to work during migration. +3. provider validation stays explicit. +4. no runtime behavior changes for users who do not install optional providers. + +### PR boundary + +This should be one additive compatibility PR. + +## Phase 4: Add a command-set registry only if the CLI crosses a clear threshold + +### Decision gate + +Do **not** implement this phase now unless one of these becomes true: + +1. there are at least three backend-owned or optional command families beyond the current static set, or +2. a third-party package needs to contribute top-level commands, or +3. 
static `LAZY_COMMANDS` ownership becomes a recurring maintenance problem + +### If the gate is met + +Introduce a small command-set registry with a narrow contract only: + +```python +@dataclass(frozen=True) +class CommandSetSpec: + name: str + module_path: str + attr_name: str + short_help: str + help_section: str | None = None +``` + +This registry should handle only: + +- command contribution +- lazy loading +- help grouping + +It should **not** also own runtime hooks, backend registration, or provider wiring. + +### Likely files if triggered + +- `roar/cli/__init__.py` +- new: `roar/cli/command_registry.py` +- `pyproject.toml` if external command sets are allowed +- tests around help rendering and lazy loading + +### Acceptance criteria if triggered + +1. startup/help remains lazy. +2. command contribution stays separate from backend/provider contracts. +3. `LAZY_COMMANDS` can be partly generated from contributed command specs. +4. no generic plugin host appears. + +### PR boundary + +Only proceed if the trigger is hit. + +## Phase 5: Abstract remote registry transport only if a second remote target appears + +### Decision gate + +Do **not** implement this phase unless `roar` must support something meaningfully different from GLaaS. + +### Why this is deferred + +GLaaS is still part of `roar`’s main product story. Abstracting it too early would add indirection without solving a real problem. + +### If the gate is met + +Add a narrow remote registry transport contract for: + +- session registration +- job registration +- artifact registration +- label sync +- fragment transport/finalization + +Do not expand this into a generic plugin host. 
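If that gate is ever met, the contract could be as narrow as this sketch (method names are illustrative stand-ins derived from the bulleted responsibilities above, not a finalized surface):

```python
from typing import Any, Protocol, runtime_checkable

@runtime_checkable
class RemoteRegistryTransport(Protocol):
    """Sketch of a narrow remote registry transport; names are illustrative."""

    def register_session(self, session_hash: str, git_context: Any) -> Any: ...
    def register_job(self, payload: dict[str, Any]) -> Any: ...
    def register_artifact(self, payload: dict[str, Any]) -> Any: ...
    def sync_labels(self, payloads: list[dict[str, Any]]) -> Any: ...
    def finalize_fragment(self, fragment_id: str) -> Any: ...
```

A GLaaS-backed implementation would be the first (and, until the gate is met, only) concrete transport.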
+ +### Likely files if triggered + +- `roar/application/publish/*` +- `roar/integrations/glaas/*` +- new transport contract module under `roar/application/publish/` or `roar/core/interfaces/` +- publish/get/reproduce tests + +## What not to do + +To keep this effort aligned with the maintainability goal, do **not** do the following: + +1. Do not add `roar.plugins.*`. +2. Do not add a plugin manifest type. +3. Do not add a generic hook bus for run lifecycle stages. +4. Do not let optional extensions mutate recorder or DB behavior indirectly. +5. Do not move labels or lineage semantics out of core. +6. Do not abstract GLaaS behind a transport contract until there is a second real target. +7. Do not add a command registry unless the CLI actually needs one. + +## Verification plan + +Every phase should run the standard `roar` gates before merge. + +Baseline repo gates: + +```bash +pytest -m "not live_glaas and not ebpf" +ruff check . +mypy roar +``` + +Targeted fast checks for this refactor track: + +```bash +.venv/bin/pytest \ + tests/execution/framework/test_execution_framework_layout.py \ + tests/integration/test_bootstrap_integrations.py \ + tests/integrations/test_integration_discovery.py \ + tests/unit/test_proxy_coordinator.py \ + tests/backends/ray/unit/test_driver_entrypoint.py +``` + +If proxy/resource behavior changes materially, also run: + +```bash +.venv/bin/pytest \ + tests/integration/test_proxy_integration.py \ + tests/integration/test_proxy_cli_integration.py +``` + +## Recommended rollout order + +1. **Phase 1**: runtime-resource extraction for proxy +2. **Phase 2**: provider bootstrap cleanup +3. **Phase 3**: typed provider entry-point groups with compatibility fallback +4. **Phase 4**: command-set registry only if the CLI crosses the trigger +5. **Phase 5**: remote registry abstraction only if a second target appears + +This order gives the biggest maintainability win first while keeping each step small and safe. 
+ +## Suggested first PR + +If work starts immediately, the first PR should be: + +**`refactor(runtime): extract proxy lifecycle from shared host execution`** + +Scope: + +- introduce the host runtime-resource seam +- move proxy startup/env/teardown out of `RunCoordinator` +- keep user-facing behavior unchanged +- add/update targeted proxy lifecycle tests + +That is the most concrete step toward making `roar` easier to maintain without backsliding into a generic plugin architecture. diff --git a/roar/application/publish/put_composites.py b/roar/application/publish/put_composites.py index d5527e01..4d6da1ad 100644 --- a/roar/application/publish/put_composites.py +++ b/roar/application/publish/put_composites.py @@ -21,14 +21,15 @@ ) from ...core.interfaces.logger import ILogger from ...db.context import optional_repo -from ...integrations.glaas import GlaasClient from ...integrations.glaas.registration import _artifact_ref +from .remote_registry import RemoteRegistryTransport, coerce_remote_registry def preregister_put_lineage_composites_with_glaas( *, db_ctx: Any, - glaas_client: GlaasClient, + remote_registry: RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, lineage_artifacts: list[dict[str, Any]], session_hash: str, registration_errors: list[str], @@ -46,6 +47,7 @@ def preregister_put_lineage_composites_with_glaas( logger=logger, ) return preregister_lineage_composites( + remote_registry=remote_registry, glaas_client=glaas_client, payloads=payloads, registration_errors=registration_errors, @@ -153,7 +155,8 @@ def resolve_put_lineage_component_for_registration( def register_put_composites_with_glaas( *, db_ctx: Any, - glaas_client: GlaasClient, + remote_registry: RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, composite_results: list[CompositeBuildResult], registration_errors: list[str], dataset_identifiers: list[dict[str, Any]] | None, @@ -161,6 +164,10 @@ def register_put_composites_with_glaas( ) -> 
list[dict[str, Any]]: """Register generated composite artifacts with GLaaS and persist local state.""" composite_registrations: list[dict[str, Any]] = [] + resolved_remote_registry = coerce_remote_registry( + remote_registry=remote_registry, + glaas_client=glaas_client, + ) for composite in composite_results: payload = dict(composite.payload) @@ -173,7 +180,7 @@ def register_put_composites_with_glaas( if metadata_json is not None: payload["metadata"] = metadata_json - response = glaas_client.register_composite_artifact(payload) + response = resolved_remote_registry.register_composite_artifact(payload) result, error = parse_composite_registration_response(response) composite_registration: dict[str, Any] = { diff --git a/roar/application/publish/put_preparation.py b/roar/application/publish/put_preparation.py index d023d4de..26f55d6c 100644 --- a/roar/application/publish/put_preparation.py +++ b/roar/application/publish/put_preparation.py @@ -15,6 +15,7 @@ detect_additional_publish_composite_roots, infer_publish_dataset_identifiers, ) +from .remote_registry import coerce_remote_registry from .runtime import PublishRuntime from .session import prepare_publish_session @@ -62,9 +63,15 @@ def prepare_put_execution( logger=logger, git_commit=git_commit, ) - publish_session = prepare_publish_session( + runtime_dict = getattr(runtime, "__dict__", {}) + remote_registry = coerce_remote_registry( + remote_registry=runtime_dict.get("remote_registry"), glaas_client=runtime.glaas_client, session_service=runtime.session_service, + registration_coordinator=runtime_dict.get("registration_coordinator"), + ) + publish_session = prepare_publish_session( + remote_registry=remote_registry, roar_dir=roar_dir, session_id=session_id, git_context=git_context, diff --git a/roar/application/publish/register_preparation.py b/roar/application/publish/register_preparation.py index d4db2640..72af1a7d 100644 --- a/roar/application/publish/register_preparation.py +++ 
b/roar/application/publish/register_preparation.py @@ -10,6 +10,7 @@ from ...core.interfaces.registration import GitContext from ...publish_auth import resolve_publish_creator_identity from ..git import build_roar_git_tag_name, ensure_clean_git_repo, resolve_roar_git_context +from .remote_registry import coerce_remote_registry from .runtime import PublishRuntime from .session import prepare_publish_session @@ -59,10 +60,16 @@ def prepare_register_execution( git_tag_name = build_roar_git_tag_name(git_context.commit, short=True) git_tag_repo_root = git_state.repo_root - creator_identity = resolve_publish_creator_identity(runtime.glaas_client.publish_auth) - publish_session = prepare_publish_session( + runtime_dict = getattr(runtime, "__dict__", {}) + remote_registry = coerce_remote_registry( + remote_registry=runtime_dict.get("remote_registry"), glaas_client=runtime.glaas_client, session_service=runtime.session_service, + registration_coordinator=runtime_dict.get("registration_coordinator"), + ) + creator_identity = resolve_publish_creator_identity(remote_registry.publish_auth) + publish_session = prepare_publish_session( + remote_registry=remote_registry, roar_dir=roar_dir, session_id=session_id, git_context=git_context, diff --git a/roar/application/publish/registration.py b/roar/application/publish/registration.py index 1c837f8c..73bbcedc 100644 --- a/roar/application/publish/registration.py +++ b/roar/application/publish/registration.py @@ -7,8 +7,8 @@ from ...core.interfaces.logger import ILogger from ...core.interfaces.registration import BatchRegistrationResult, GitContext -from ...integrations.glaas import GlaasClient from ..labels import collect_label_sync_payloads +from .remote_registry import RemoteRegistryTransport, coerce_remote_registry _VALID_REMOTE_SOURCE_TYPES = {"s3", "gs", "https"} @@ -328,7 +328,8 @@ def prepare_batch_registration_artifacts( def register_publish_lineage( *, coordinator: Any, - glaas_client: GlaasClient, + remote_registry: 
RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, session_hash: str, git_context: GitContext, jobs: list[dict[str, Any]], @@ -349,11 +350,16 @@ def register_publish_lineage( if pre_registration_errors: batch_result.errors = [*pre_registration_errors, *batch_result.errors] + resolved_remote_registry = coerce_remote_registry( + remote_registry=remote_registry, + glaas_client=glaas_client, + ) + labels_are_safe_to_sync = batch_result.jobs_failed == 0 and batch_result.artifacts_failed == 0 if session_id is not None and db_ctx is not None and labels_are_safe_to_sync: sync_publish_labels( - glaas_client=glaas_client, + remote_registry=resolved_remote_registry, db_ctx=db_ctx, session_id=session_id, session_hash=session_hash, @@ -367,16 +373,21 @@ def register_publish_lineage( def preregister_lineage_composites( *, - glaas_client: GlaasClient, + remote_registry: RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, payloads: list[CompositeRegistrationCandidate], registration_errors: list[str], logger: ILogger, ) -> list[dict[str, Any]]: """Register lineage composites before the main link phase.""" registrations: list[dict[str, Any]] = [] + resolved_remote_registry = coerce_remote_registry( + remote_registry=remote_registry, + glaas_client=glaas_client, + ) for item in payloads: - response = glaas_client.register_composite_artifact(item.payload) + response = resolved_remote_registry.register_composite_artifact(item.payload) result, error = parse_composite_registration_response(response) registration: dict[str, Any] = { @@ -413,7 +424,8 @@ def preregister_lineage_composites( def sync_publish_labels( *, - glaas_client: GlaasClient, + remote_registry: RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, db_ctx: Any, session_id: int | None, session_hash: str, @@ -432,7 +444,12 @@ def sync_publish_labels( if not payloads: return - _label_result, label_error = glaas_client.sync_labels(payloads) + 
resolved_remote_registry = coerce_remote_registry( + remote_registry=remote_registry, + glaas_client=glaas_client, + ) + + _label_result, label_error = resolved_remote_registry.sync_labels(payloads) if label_error and errors is not None: errors.append(f"Label sync failed: {label_error}") diff --git a/roar/application/publish/remote_registry.py b/roar/application/publish/remote_registry.py new file mode 100644 index 00000000..ac91d141 --- /dev/null +++ b/roar/application/publish/remote_registry.py @@ -0,0 +1,85 @@ +"""Remote lineage registry transport contracts for publish workflows.""" + +from __future__ import annotations + +from dataclasses import dataclass +from typing import Any, Protocol + + +class RemoteRegistryTransport(Protocol): + """Narrow transport contract for remote lineage registration workflows.""" + + @property + def name(self) -> str: ... + + @property + def client(self) -> Any: ... + + @property + def session_service(self) -> Any | None: ... + + @property + def registration_coordinator(self) -> Any | None: ... + + @property + def publish_auth(self) -> Any: ... + + def is_configured(self) -> bool: ... + + def health_check(self) -> Any: ... + + def register_composite_artifact(self, payload: dict[str, Any]) -> Any: ... + + def sync_labels(self, payloads: list[dict[str, Any]]) -> Any: ... 
+ + +@dataclass(frozen=True) +class GlaasRemoteRegistryTransport: + """GLaaS-backed implementation of the remote registry transport contract.""" + + client: Any + session_service: Any | None = None + registration_coordinator: Any | None = None + name: str = "glaas" + + @property + def publish_auth(self) -> Any: + return getattr(self.client, "publish_auth", None) + + def is_configured(self) -> bool: + return bool(self.client.is_configured()) + + def health_check(self) -> Any: + return self.client.health_check() + + def register_composite_artifact(self, payload: dict[str, Any]) -> Any: + return self.client.register_composite_artifact(payload) + + def sync_labels(self, payloads: list[dict[str, Any]]) -> Any: + return self.client.sync_labels(payloads) + + +def coerce_remote_registry( + *, + remote_registry: RemoteRegistryTransport | None = None, + glaas_client: Any | None = None, + session_service: Any | None = None, + registration_coordinator: Any | None = None, +) -> RemoteRegistryTransport: + """Resolve the concrete remote registry transport from new or legacy inputs.""" + if remote_registry is not None: + return remote_registry + if glaas_client is None: + raise ValueError("remote registry transport requires a client") + return GlaasRemoteRegistryTransport( + client=glaas_client, + session_service=session_service, + registration_coordinator=registration_coordinator, + ) + + +__all__ = [ + "GlaasRemoteRegistryTransport", + "RemoteRegistryTransport", + "coerce_remote_registry", +] diff --git a/roar/application/publish/runtime.py b/roar/application/publish/runtime.py index 69ae00f9..3c43344c 100644 --- a/roar/application/publish/runtime.py +++ b/roar/application/publish/runtime.py @@ -12,17 +12,31 @@ SessionRegistrationService, ) from .lineage import LineageCollector +from .remote_registry import GlaasRemoteRegistryTransport, RemoteRegistryTransport @dataclass(frozen=True) class PublishRuntime: """Concrete dependency set for publish workflows.""" - glaas_client: 
GlaasClient - session_service: SessionRegistrationService - registration_coordinator: RegistrationCoordinator + remote_registry: RemoteRegistryTransport lineage_collector: LineageCollector + @property + def glaas_client(self): + """Backward-compatible access to the underlying GLaaS client.""" + return self.remote_registry.client + + @property + def session_service(self): + """Backward-compatible access to the publish session service.""" + return self.remote_registry.session_service + + @property + def registration_coordinator(self): + """Backward-compatible access to the registration coordinator.""" + return self.remote_registry.registration_coordinator + def build_publish_runtime( *, @@ -39,13 +53,16 @@ def build_publish_runtime( session_service = SessionRegistrationService(glaas_client) artifact_service = ArtifactRegistrationService(glaas_client) job_service = JobRegistrationService(glaas_client) - return PublishRuntime( - glaas_client=glaas_client, + registration_coordinator = RegistrationCoordinator( session_service=session_service, - registration_coordinator=RegistrationCoordinator( + artifact_service=artifact_service, + job_service=job_service, + ) + return PublishRuntime( + remote_registry=GlaasRemoteRegistryTransport( + client=glaas_client, session_service=session_service, - artifact_service=artifact_service, - job_service=job_service, + registration_coordinator=registration_coordinator, ), lineage_collector=LineageCollector(), ) diff --git a/roar/application/publish/service.py b/roar/application/publish/service.py index 073e30fc..4488e8e5 100644 --- a/roar/application/publish/service.py +++ b/roar/application/publish/service.py @@ -164,10 +164,17 @@ class _PutPlanResult: class _RegisterPreviewRuntime: """Minimal runtime surface for local `roar register --dry-run` flows.""" - glaas_client: Any - session_service: Any + remote_registry: Any lineage_collector: Any + @property + def glaas_client(self) -> Any: + return self.remote_registry.client + + @property + 
def session_service(self) -> Any:
+        return self.remote_registry.session_service
+
 
 @dataclass(frozen=True)
 class _PreparedRegisterPreviewExecution:
@@ -191,6 +198,7 @@ def build_register_preview_runtime(
     from ...integrations.glaas.registration.session import SessionRegistrationService
     from ...publish_auth import PublishAuthContext
     from .lineage import LineageCollector
+    from .remote_registry import GlaasRemoteRegistryTransport
 
     publish_auth = None
     if not allow_public_without_binding:
@@ -208,9 +216,12 @@ def build_register_preview_runtime(
         publish_auth=publish_auth,
         allow_public_without_binding=allow_public_without_binding,
     )
+    session_service = SessionRegistrationService(glaas_client)
     return _RegisterPreviewRuntime(
-        glaas_client=glaas_client,
-        session_service=SessionRegistrationService(glaas_client),
+        remote_registry=GlaasRemoteRegistryTransport(
+            client=glaas_client,
+            session_service=session_service,
+        ),
         lineage_collector=LineageCollector(),
     )
 
@@ -234,7 +245,7 @@ def prepare_register_preview_execution(
     if session_hash_override:
         session_hash = session_hash_override
     elif lineage is not None:
-        creator_identity = resolve_publish_creator_identity(runtime.glaas_client.publish_auth)
+        creator_identity = resolve_publish_creator_identity(runtime.remote_registry.publish_auth)
         session_hash = compute_canonical_session_hash(
             build_canonical_session_payload(
                 lineage=lineage,
diff --git a/roar/application/publish/session.py b/roar/application/publish/session.py
index 45a3c9f9..50785cea 100644
--- a/roar/application/publish/session.py
+++ b/roar/application/publish/session.py
@@ -10,7 +10,7 @@
 from ...core.interfaces.lineage import LineageData
 from ...core.interfaces.logger import ILogger
 from ...core.interfaces.registration import GitContext, SessionRegistrationResult
-from ...integrations.glaas import GlaasClient
+from .remote_registry import RemoteRegistryTransport, coerce_remote_registry
 
 
 class PublishSessionService(Protocol):
@@ -153,8 +153,9 @@ def _normalize_metadata(value: Any) -> dict[str, Any]:
 
 def prepare_publish_session(
     *,
-    glaas_client: GlaasClient,
-    session_service: PublishSessionService,
+    remote_registry: RemoteRegistryTransport | None = None,
+    glaas_client: Any | None = None,
+    session_service: PublishSessionService | None = None,
     roar_dir: Path,
     session_id: int | None,
     git_context: GitContext,
@@ -166,6 +167,15 @@ def prepare_publish_session(
     creator_identity: str | None = None,
 ) -> PreparedPublishSession:
     """Compute and optionally register the publish session."""
+    resolved_remote_registry = coerce_remote_registry(
+        remote_registry=remote_registry,
+        glaas_client=glaas_client,
+        session_service=session_service,
+    )
+    resolved_session_service = resolved_remote_registry.session_service
+    if resolved_session_service is None:
+        raise ValueError("publish session requires a remote registry session service")
+
     if session_hash_override:
         session_hash = session_hash_override
     elif lineage is not None and creator_identity is not None:
@@ -179,7 +189,7 @@ def prepare_publish_session(
     else:
         if session_id is None:
             raise ValueError("Cannot compute a session hash without a local session id.")
-        session_hash = session_service.compute_session_hash(
+        session_hash = resolved_session_service.compute_session_hash(
             roar_dir=str(roar_dir),
             session_id=session_id,
         )
@@ -189,18 +199,18 @@ def prepare_publish_session(
     if not register_with_glaas:
         return PreparedPublishSession(session_hash=session_hash)
 
-    if configured_error is not None and not glaas_client.is_configured():
+    if configured_error is not None and not resolved_remote_registry.is_configured():
         raise ValueError(configured_error)
 
     logger.debug("Running GLaaS health check")
     try:
-        glaas_client.health_check()
+        resolved_remote_registry.health_check()
     except Exception as exc:
         logger.debug("GLaaS health check failed: %s", exc)
         raise ValueError(f"GLaaS health check failed: {exc}") from exc
 
     logger.debug("Registering session with GLaaS")
-    session_result = session_service.register(session_hash, git_context)
+    session_result = resolved_session_service.register(session_hash, git_context)
     if not session_result.success:
         logger.debug("Session registration failed: %s", session_result.error)
         raise ValueError(f"Session registration failed: {session_result.error}")
diff --git a/roar/application/run/execution.py b/roar/application/run/execution.py
index bae25460..6de7563e 100644
--- a/roar/application/run/execution.py
+++ b/roar/application/run/execution.py
@@ -10,7 +10,7 @@
 from ...core.models.run import RunContext
 from ...execution.framework.registry import get_execution_backend
-from ...execution.runtime.host_execution import ExecutionSetupError
+from ...execution.runtime.errors import ExecutionSetupError
 from ...presenters.console import ConsolePresenter
 from ...presenters.run_report import RunReportPresenter
 
diff --git a/roar/backends/osmo/host_execution.py b/roar/backends/osmo/host_execution.py
index 0c0e72fd..c9a96a3f 100644
--- a/roar/backends/osmo/host_execution.py
+++ b/roar/backends/osmo/host_execution.py
@@ -31,7 +31,7 @@
 from roar.db.context import create_database_context
 from roar.db.hashing import hash_files_blake3
 from roar.execution.recording import LocalJobRecorder, LocalRecordedArtifact, StalenessAnalyzer
-from roar.execution.runtime.host_execution import ExecutionSetupError
+from roar.execution.runtime.errors import ExecutionSetupError
 
 _TERMINAL_WORKFLOW_STATUSES = {
     "CANCELLED",
diff --git a/roar/cli/__init__.py b/roar/cli/__init__.py
index 394845ef..e69f0dd3 100644
--- a/roar/cli/__init__.py
+++ b/roar/cli/__init__.py
@@ -18,6 +18,8 @@
 
 import click
 
+from .command_registry import COMMAND_SPECS, build_help_groups, build_lazy_commands
+
 # Version is loaded from package metadata
 try:
     from importlib.metadata import version
@@ -29,59 +31,9 @@
 # Lazy command registry: maps command name to (module_path, command_name, short_help)
 # Short help is stored here to avoid importing commands just for --help
-LAZY_COMMANDS: dict[str, tuple[str, str, str]] = {
-    "auth": ("roar.cli.commands.auth", "auth", "Manage GLaaS auth and SSH keys"),
-    "build": ("roar.cli.commands.build", "build", "Track a build step before the main pipeline"),
-    "config": ("roar.cli.commands.config", "config", "View or set configuration"),
-    "dag": ("roar.cli.commands.dag", "dag", "Inspect the local execution DAG"),
-    "diff": ("roar.cli.commands.diff", "diff", "Compare provenance of two artifacts or steps"),
-    "env": ("roar.cli.commands.env", "env", "Manage persistent environment variables"),
-    "get": ("roar.cli.commands.get", "get", "Download published artifacts"),
-    "init": ("roar.cli.commands.init", "init", "Set up roar in a project"),
-    "inputs": ("roar.cli.commands.inputs", "inputs", "Show root input artifacts for a target"),
-    "label": ("roar.cli.commands.label", "label", "Manage local labels"),
-    "lineage": ("roar.cli.commands.lineage", "lineage", "Inspect lineage for a tracked artifact"),
-    "log": ("roar.cli.commands.log", "log", "List jobs in the active session"),
-    "login": ("roar.cli.commands.login", "login", "Store global GLaaS/TReqs auth state"),
-    "logout": ("roar.cli.commands.logout", "logout", "Clear global GLaaS/TReqs auth state"),
-    "osmo": ("roar.cli.commands.osmo", "osmo", "Manage OSMO workflow attachment"),
-    "pop": ("roar.cli.commands.pop", "pop", "Remove the last local step"),
-    "proxy": ("roar.cli.commands.proxy", "proxy", "Manage S3 proxy for lineage tracking"),
-    "put": ("roar.cli.commands.put", "put", "Publish artifacts and register lineage"),
-    "projects": (
-        "roar.cli.commands.projects",
-        "projects",
-        "Manage GLaaS projects visible through your TReqs account",
-    ),
-    "register": ("roar.cli.commands.register", "register", "Register local lineage with GLaaS"),
-    "reproduce": ("roar.cli.commands.reproduce", "reproduce", "Generate a reproduction plan"),
-    "reset": ("roar.cli.commands.reset", "reset", "Reset roar state"),
-    "run": ("roar.cli.commands.run", "run", "Track a command with provenance"),
-    "show": ("roar.cli.commands.show", "show", "Inspect a session, job, or artifact"),
-    "status": ("roar.cli.commands.status", "status", "Show the active session summary"),
-    "tracer": ("roar.cli.commands.tracer", "tracer", "Configure tracer backend defaults"),
-    "whoami": (
-        "roar.cli.commands.whoami",
-        "whoami",
-        "Show current GLaaS/TReqs login and repo binding",
-    ),
-    "workflow": (
-        "roar.cli.commands.workflow",
-        "workflow",
-        "Generate TReqs workflow YAML from local sessions",
-    ),
-}
-
-HELP_GROUPS: tuple[tuple[str, tuple[str, ...]], ...] = (
-    ("Start Here", ("init", "run", "build", "dag")),
-    (
-        "Inspect Local Lineage",
-        ("status", "log", "show", "diff", "lineage", "inputs", "pop", "reproduce"),
-    ),
-    ("Share and Publish", ("put", "register", "get", "label")),
-    ("Setup and Admin", ("auth", "config", "env", "tracer", "proxy", "reset")),
-    ("GLaaS / TReqs Account", ("login", "logout", "whoami", "projects", "workflow")),
-)
+LAZY_COMMANDS: dict[str, tuple[str, str, str]] = build_lazy_commands()
+
+HELP_GROUPS: tuple[tuple[str, tuple[str, ...]], ...] = build_help_groups()
 
 EXPERIMENTAL_ACCOUNT_COMMANDS_FLAG = "ROAR_ENABLE_EXPERIMENTAL_ACCOUNT_COMMANDS"
 
 _EXPERIMENTAL_ACCOUNT_COMMANDS = frozenset(
@@ -276,6 +228,7 @@ def cli(ctx: click.Context) -> None:
 
 # Export public API
 __all__ = [
+    "COMMAND_SPECS",
     "EXPERIMENTAL_ACCOUNT_COMMANDS_FLAG",
     "LazyGroup",
     "__version__",
diff --git a/roar/cli/command_registry.py b/roar/cli/command_registry.py
new file mode 100644
index 00000000..759d40dc
--- /dev/null
+++ b/roar/cli/command_registry.py
@@ -0,0 +1,225 @@
+"""Static command-set registry for the top-level roar CLI."""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class CommandSpec:
+    """Lazy-load metadata for a top-level CLI command."""
+
+    name: str
+    module_path: str
+    attr_name: str
+    short_help: str
+    help_section: str | None = None
+
+
+_HELP_SECTION_ORDER: tuple[str, ...] = (
+    "Start Here",
+    "Inspect Local Lineage",
+    "Share and Publish",
+    "Setup and Admin",
+    "GLaaS / TReqs Account",
+)
+
+_COMMAND_SPECS: tuple[CommandSpec, ...] = (
+    CommandSpec("init", "roar.cli.commands.init", "init", "Set up roar in a project", "Start Here"),
+    CommandSpec(
+        "run", "roar.cli.commands.run", "run", "Track a command with provenance", "Start Here"
+    ),
+    CommandSpec(
+        "build",
+        "roar.cli.commands.build",
+        "build",
+        "Track a build step before the main pipeline",
+        "Start Here",
+    ),
+    CommandSpec(
+        "dag", "roar.cli.commands.dag", "dag", "Inspect the local execution DAG", "Start Here"
+    ),
+    CommandSpec(
+        "status",
+        "roar.cli.commands.status",
+        "status",
+        "Show the active session summary",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "log",
+        "roar.cli.commands.log",
+        "log",
+        "List jobs in the active session",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "show",
+        "roar.cli.commands.show",
+        "show",
+        "Inspect a session, job, or artifact",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "diff",
+        "roar.cli.commands.diff",
+        "diff",
+        "Compare provenance of two artifacts or steps",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "lineage",
+        "roar.cli.commands.lineage",
+        "lineage",
+        "Inspect lineage for a tracked artifact",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "inputs",
+        "roar.cli.commands.inputs",
+        "inputs",
+        "Show root input artifacts for a target",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "pop", "roar.cli.commands.pop", "pop", "Remove the last local step", "Inspect Local Lineage"
+    ),
+    CommandSpec(
+        "reproduce",
+        "roar.cli.commands.reproduce",
+        "reproduce",
+        "Generate a reproduction plan",
+        "Inspect Local Lineage",
+    ),
+    CommandSpec(
+        "put",
+        "roar.cli.commands.put",
+        "put",
+        "Publish artifacts and register lineage",
+        "Share and Publish",
+    ),
+    CommandSpec(
+        "register",
+        "roar.cli.commands.register",
+        "register",
+        "Register local lineage with GLaaS",
+        "Share and Publish",
+    ),
+    CommandSpec(
+        "get", "roar.cli.commands.get", "get", "Download published artifacts", "Share and Publish"
+    ),
+    CommandSpec(
+        "label", "roar.cli.commands.label", "label", "Manage local labels", "Share and Publish"
+    ),
+    CommandSpec(
+        "auth",
+        "roar.cli.commands.auth",
+        "auth",
+        "Manage GLaaS auth and SSH keys",
+        "Setup and Admin",
+    ),
+    CommandSpec(
+        "config",
+        "roar.cli.commands.config",
+        "config",
+        "View or set configuration",
+        "Setup and Admin",
+    ),
+    CommandSpec(
+        "env",
+        "roar.cli.commands.env",
+        "env",
+        "Manage persistent environment variables",
+        "Setup and Admin",
+    ),
+    CommandSpec(
+        "tracer",
+        "roar.cli.commands.tracer",
+        "tracer",
+        "Configure tracer backend defaults",
+        "Setup and Admin",
+    ),
+    CommandSpec(
+        "proxy",
+        "roar.cli.commands.proxy",
+        "proxy",
+        "Manage S3 proxy for lineage tracking",
+        "Setup and Admin",
+    ),
+    CommandSpec("reset", "roar.cli.commands.reset", "reset", "Reset roar state", "Setup and Admin"),
+    CommandSpec(
+        "login",
+        "roar.cli.commands.login",
+        "login",
+        "Store global GLaaS/TReqs auth state",
+        "GLaaS / TReqs Account",
+    ),
+    CommandSpec(
+        "logout",
+        "roar.cli.commands.logout",
+        "logout",
+        "Clear global GLaaS/TReqs auth state",
+        "GLaaS / TReqs Account",
+    ),
+    CommandSpec(
+        "whoami",
+        "roar.cli.commands.whoami",
+        "whoami",
+        "Show current GLaaS/TReqs login and repo binding",
+        "GLaaS / TReqs Account",
+    ),
+    CommandSpec(
+        "projects",
+        "roar.cli.commands.projects",
+        "projects",
+        "Manage GLaaS projects visible through your TReqs account",
+        "GLaaS / TReqs Account",
+    ),
+    CommandSpec(
+        "workflow",
+        "roar.cli.commands.workflow",
+        "workflow",
+        "Generate TReqs workflow YAML from local sessions",
+        "GLaaS / TReqs Account",
+    ),
+    CommandSpec("osmo", "roar.cli.commands.osmo", "osmo", "Manage OSMO workflow attachment"),
+)
+
+
+def iter_command_specs() -> tuple[CommandSpec, ...]:
+    """Return the registered top-level command specs."""
+    return _COMMAND_SPECS
+
+
+def build_lazy_commands() -> dict[str, tuple[str, str, str]]:
+    """Build the lazy-command registry consumed by ``LazyGroup``."""
+    return {
+        spec.name: (spec.module_path, spec.attr_name, spec.short_help)
+        for spec in iter_command_specs()
+    }
+
+
+def build_help_groups() -> tuple[tuple[str, tuple[str, ...]], ...]:
+    """Build grouped help sections from registered command specs."""
+    sections: dict[str, list[str]] = {section: [] for section in _HELP_SECTION_ORDER}
+    for spec in iter_command_specs():
+        if spec.help_section is None:
+            continue
+        sections.setdefault(spec.help_section, []).append(spec.name)
+
+    return tuple(
+        (section, tuple(sections.get(section, ())))
+        for section in _HELP_SECTION_ORDER
+        if sections.get(section)
+    )
+
+
+COMMAND_SPECS = iter_command_specs()
+
+__all__ = [
+    "COMMAND_SPECS",
+    "CommandSpec",
+    "build_help_groups",
+    "build_lazy_commands",
+    "iter_command_specs",
+]
diff --git a/roar/core/bootstrap.py b/roar/core/bootstrap.py
index fa5fcc77..08b994be 100644
--- a/roar/core/bootstrap.py
+++ b/roar/core/bootstrap.py
@@ -2,12 +2,7 @@
 
 from pathlib import Path
 
-from ..integrations import (
-    discover_optional_integrations,
-    register_telemetry_provider,
-    register_vcs_provider,
-    reset_integrations,
-)
+from ..integrations import bootstrap_integrations, reset_integrations
 from .logging import configure_logger, reset_logger
 
 _initialized = False
@@ -33,11 +28,7 @@ def bootstrap(roar_dir: Path | None = None) -> None:
 
     _configure_core_logging(roar_dir)
 
-    # Register built-in integrations that should not depend on plugin discovery.
-    _register_builtin_integrations()
-
-    # Discover and register optional integrations
-    discover_optional_integrations()
+    bootstrap_integrations()
 
     _initialized = True
     return
@@ -65,15 +56,6 @@ def _configure_core_logging(roar_dir: Path | None = None) -> None:
     )
 
 
-def _register_builtin_integrations() -> None:
-    """Register built-in integrations that are part of the core product path."""
-    from ..integrations.git import GitVCSProvider
-    from ..integrations.telemetry import WandBTelemetryProvider
-
-    register_vcs_provider("git", GitVCSProvider)
-    register_telemetry_provider("wandb", WandBTelemetryProvider)
-
-
 def reset() -> None:
     """
     Reset the application state.
diff --git a/roar/execution/runtime/__init__.py b/roar/execution/runtime/__init__.py
index 6cb52a37..83950f9b 100644
--- a/roar/execution/runtime/__init__.py
+++ b/roar/execution/runtime/__init__.py
@@ -5,7 +5,8 @@
 if TYPE_CHECKING:
     from .backup import PreviousOutputBackupService
     from .coordinator import RunCoordinator
-    from .host_execution import ExecutionSetupError, execute_host_run
+    from .errors import ExecutionSetupError
+    from .host_execution import execute_host_run
     from .signal_handler import ProcessSignalHandler
     from .tracer import TracerService
 
@@ -29,13 +30,14 @@ def __getattr__(name: str):
         from .coordinator import RunCoordinator
 
         return RunCoordinator
-    if name in {"ExecutionSetupError", "execute_host_run"}:
-        from .host_execution import ExecutionSetupError, execute_host_run
-
-        return {
-            "ExecutionSetupError": ExecutionSetupError,
-            "execute_host_run": execute_host_run,
-        }[name]
+    if name == "ExecutionSetupError":
+        from .errors import ExecutionSetupError
+
+        return ExecutionSetupError
+    if name == "execute_host_run":
+        from .host_execution import execute_host_run
+
+        return execute_host_run
     if name == "ProcessSignalHandler":
         from .signal_handler import ProcessSignalHandler
diff --git a/roar/execution/runtime/coordinator.py b/roar/execution/runtime/coordinator.py
index 86ebd837..d3025757 100644
--- a/roar/execution/runtime/coordinator.py
+++ b/roar/execution/runtime/coordinator.py
@@ -12,19 +12,21 @@
 import subprocess
 import sys
 import time
+from collections.abc import Sequence
 from typing import TYPE_CHECKING, Any
 
 from roar.execution.framework.contract import ROAR_EXECUTION_BACKEND_ENV
 from roar.execution.recording import ExecutionJobRecorder
 
 from .backup import PreviousOutputBackupService
+from .resources import RuntimeObservationBundle, RuntimeResourceController
 from .tracer import TracerService
 
 if TYPE_CHECKING:
     from ...core.interfaces.logger import ILogger
     from ...core.interfaces.presenter import IPresenter
     from ...core.models.run import RunContext, RunResult
-    from ..cluster.proxy import ProxyService
+    from .resources import HostRuntimeResource
 
 
 class RunCoordinator:
@@ -39,7 +41,7 @@ class RunCoordinator:
     def __init__(
         self,
         tracer_service: TracerService | None = None,
-        proxy_service: ProxyService | None = None,
+        runtime_resources: Sequence[HostRuntimeResource] | None = None,
         presenter: IPresenter | None = None,
         logger: ILogger | None = None,
         job_recorder: ExecutionJobRecorder | None = None,
@@ -50,18 +52,21 @@ def __init__(
 
         Args:
             tracer_service: Service for process tracing
-            proxy_service: Optional S3 proxy service for lineage tracking
+            runtime_resources: Explicit runtime resources to start around host execution
            presenter: Presenter for output
             logger: Logger for internal diagnostics
             job_recorder: Persistence service for job recording and stale analysis
             backup_service: Service for reversible output backup behavior
         """
         self._tracer = tracer_service or TracerService()
-        self._proxy = proxy_service
         self._presenter = presenter
         self._logger = logger
         self._job_recorder = job_recorder or ExecutionJobRecorder()
         self._backup_service = backup_service or PreviousOutputBackupService()
+        self._runtime_resources = RuntimeResourceController(
+            resources=runtime_resources,
+            logger=logger,
+        )
 
     @property
     def presenter(self) -> IPresenter:
@@ -116,44 +121,18 @@ def execute(self, ctx: RunContext) -> RunResult:
 
         run_presenter = RunReportPresenter(self.presenter, quiet=ctx.quiet)
 
-        # Start proxy if configured
-        proxy_handle = None
         extra_env: dict[str, str] = {
             ROAR_EXECUTION_BACKEND_ENV: str(ctx.execution_backend),
         }
-        s3_entries: list = []
-        proxy_stopped = False
-        if self._proxy:
-            try:
-                # Capture existing AWS_ENDPOINT_URL so the proxy can chain to it
-                existing_endpoint = os.environ.get("AWS_ENDPOINT_URL")
-                proxy_handle = self._proxy.start_for_run(
-                    upstream_url=existing_endpoint,
-                )
-                extra_env["AWS_ENDPOINT_URL"] = f"http://127.0.0.1:{proxy_handle.port}"
-                # Preserve the real upstream for cluster-backed submit paths.
-                if existing_endpoint:
-                    extra_env["ROAR_UPSTREAM_S3_ENDPOINT"] = existing_endpoint
-                self.logger.debug("Proxy started on port %d", proxy_handle.port)
-            except Exception as e:
-                self.logger.warning("Failed to start proxy: %s", e)
-
-        def stop_proxy_if_running() -> list:
-            """Stop per-run proxy exactly once and return parsed entries."""
-            nonlocal proxy_stopped, s3_entries
-
-            if proxy_stopped:
-                return s3_entries
-            proxy_stopped = True
-
-            if proxy_handle and self._proxy:
-                try:
-                    s3_entries = self._proxy.stop_for_run(proxy_handle)
-                    self.logger.debug("Proxy stopped, collected %d S3 entries", len(s3_entries))
-                except Exception as e:
-                    self.logger.warning("Failed to stop proxy cleanly: %s", e)
-
-            return s3_entries
+        runtime_observations = RuntimeObservationBundle()
+        resource_env = self._runtime_resources.start_all(ctx, os.environ)
+        extra_env.update(resource_env)
+
+        def stop_runtime_resources(exit_code: int | None) -> RuntimeObservationBundle:
+            """Stop runtime resources exactly once and return collected observations."""
+            nonlocal runtime_observations
+            runtime_observations = self._runtime_resources.stop_all(exit_code=exit_code)
+            return runtime_observations
 
         # Execute via tracer
         from ...core.exceptions import TracerNotFoundError
@@ -161,7 +140,7 @@ def stop_proxy_if_running() -> list:
         # Resolve the backend name before execution so the trace_starting
         # line can show the actual tracer, not "auto". Mirror the same
         # resolution logic that execute() uses (config → override → auto).
-        proxy_active = proxy_handle is not None
+        proxy_active = "proxy" in self._runtime_resources.active_resource_names()
         resolved_mode: str | None = None
         try:
             mode = ctx.tracer_mode or self._tracer._get_tracer_mode()
@@ -198,7 +177,7 @@ def stop_proxy_if_running() -> list:
         except TracerNotFoundError as e:
             from ...core.models.run import RunResult
 
-            stop_proxy_if_running()
+            stop_runtime_resources(e.exit_code)
             self.logger.debug("Tracer not found: %s", e)
             self.presenter.print_error(str(e))
             return RunResult(
@@ -211,10 +190,13 @@ def stop_proxy_if_running() -> list:
                 interrupted=False,
                 is_build=is_build,
             )
+        except Exception:
+            stop_runtime_resources(None)
+            raise
 
         # Check if we should abort (double Ctrl-C)
         if signal_handler.should_abort():
-            stop_proxy_if_running()
+            stop_runtime_resources(tracer_result.exit_code)
             self._cleanup_logs(tracer_result.tracer_log_path, tracer_result.inject_log_path)
             sys.exit(130)
 
@@ -234,7 +216,7 @@ def stop_proxy_if_running() -> list:
         if not os.path.exists(tracer_result.tracer_log_path):
             self.logger.warning("Tracer log not found at %s", tracer_result.tracer_log_path)
             self.logger.warning("The tracer may have failed to start. Run was not recorded.")
-            stop_proxy_if_running()
+            stop_runtime_resources(tracer_result.exit_code)
             self._cleanup_logs(tracer_result.tracer_log_path, tracer_result.inject_log_path)
             return RunResult(
                 exit_code=tracer_result.exit_code,
@@ -271,8 +253,8 @@ def stop_proxy_if_running() -> list:
                 n_written,
             )
 
-        # Stop proxy and collect S3 entries before DB recording.
-        s3_entries = stop_proxy_if_running()
+        # Stop runtime resources and collect observations before DB recording.
+        runtime_observations = stop_runtime_resources(tracer_result.exit_code)
 
         total_files = n_read + n_written
         with run_presenter.hashing(total=total_files or None):
@@ -285,7 +267,7 @@ def stop_proxy_if_running() -> list:
                 tracer_result,
                 start_time,
                 is_build,
-                s3_entries,
+                list(runtime_observations.s3_entries),
                 run_job_uid=run_job_uid,
             )
         )
diff --git a/roar/execution/runtime/errors.py b/roar/execution/runtime/errors.py
new file mode 100644
index 00000000..2031b1ef
--- /dev/null
+++ b/roar/execution/runtime/errors.py
@@ -0,0 +1,5 @@
+class ExecutionSetupError(RuntimeError):
+    """Raised when a backend cannot start a host-side execution path."""
+
+
+__all__ = ["ExecutionSetupError"]
diff --git a/roar/execution/runtime/host_execution.py b/roar/execution/runtime/host_execution.py
index 5a42977c..76c4d31e 100644
--- a/roar/execution/runtime/host_execution.py
+++ b/roar/execution/runtime/host_execution.py
@@ -2,34 +2,21 @@
 
 from typing import TYPE_CHECKING
 
+from .errors import ExecutionSetupError
+
 if TYPE_CHECKING:
     from roar.core.models.run import RunContext, RunResult
 
 
-class ExecutionSetupError(RuntimeError):
-    """Raised when a backend cannot start a host-side execution path."""
-
-
 def execute_host_run(ctx: RunContext) -> RunResult:
     from roar.core.bootstrap import bootstrap
     from roar.execution.runtime.coordinator import RunCoordinator
-    from roar.integrations.config import config_get
+    from roar.execution.runtime.resources import build_host_runtime_resources
 
     bootstrap(ctx.roar_dir)
 
-    proxy_service = None
-    if config_get("proxy.enabled", start_dir=ctx.repo_root):
-        from roar.execution.cluster.proxy import ProxyService
-
-        proxy_service = ProxyService()
-        if not proxy_service.find_proxy():
-            raise ExecutionSetupError(
-                "Error: S3 proxy is enabled but roar-proxy binary not found.\n"
-                "Build it with: cargo build --release --manifest-path rust/Cargo.toml -p roar-proxy\n"
-                "Or disable: roar proxy disable"
-            )
-
-    coordinator = RunCoordinator(proxy_service=proxy_service)
+    runtime_resources = build_host_runtime_resources(ctx)
+    coordinator = RunCoordinator(runtime_resources=runtime_resources)
     return coordinator.execute(ctx)
diff --git a/roar/execution/runtime/proxy_resource.py b/roar/execution/runtime/proxy_resource.py
new file mode 100644
index 00000000..7fed3a6c
--- /dev/null
+++ b/roar/execution/runtime/proxy_resource.py
@@ -0,0 +1,81 @@
+from __future__ import annotations
+
+from collections.abc import Mapping
+from typing import TYPE_CHECKING
+
+from roar.execution.cluster.proxy import ProxyService
+from roar.execution.runtime.errors import ExecutionSetupError
+
+from .resources import RuntimeObservationBundle, RuntimeResourceStart
+
+if TYPE_CHECKING:
+    from roar.core.interfaces.logger import ILogger
+    from roar.core.models.run import RunContext
+    from roar.execution.cluster.proxy import ProxyHandle
+
+
+class ProxyRuntimeResource:
+    """Runtime resource wrapper for per-run S3 proxy capture."""
+
+    name = "proxy"
+
+    def __init__(
+        self,
+        service: ProxyService | None = None,
+        logger: ILogger | None = None,
+    ) -> None:
+        self._service = service or ProxyService()
+        self._logger = logger
+        self._handle: ProxyHandle | None = None
+
+        if not self._service.find_proxy():
+            raise ExecutionSetupError(
+                "Error: S3 proxy is enabled but roar-proxy binary not found.\n"
+                "Build it with: cargo build --release --manifest-path rust/Cargo.toml -p roar-proxy\n"
+                "Or disable: roar proxy disable"
+            )
+
+    @property
+    def logger(self) -> ILogger:
+        if self._logger is None:
+            from roar.core.logging import get_logger
+
+            self._logger = get_logger()
+        return self._logger
+
+    @property
+    def active(self) -> bool:
+        return self._handle is not None
+
+    def start(self, ctx: RunContext, environ: Mapping[str, str]) -> RuntimeResourceStart:
+        del ctx
+        existing_endpoint = str(environ.get("AWS_ENDPOINT_URL") or "").strip() or None
+        try:
+            self._handle = self._service.start_for_run(upstream_url=existing_endpoint)
+        except Exception as exc:
+            self._handle = None
+            self.logger.warning("Failed to start proxy: %s", exc)
+            return RuntimeResourceStart()
+
+        env = {
+            "AWS_ENDPOINT_URL": f"http://127.0.0.1:{self._handle.port}",
+        }
+        if existing_endpoint:
+            env["ROAR_UPSTREAM_S3_ENDPOINT"] = existing_endpoint
+
+        self.logger.debug("Proxy runtime resource started on port %d", self._handle.port)
+        return RuntimeResourceStart(env=env)
+
+    def stop(self, *, exit_code: int | None) -> RuntimeObservationBundle:
+        del exit_code
+        if self._handle is None:
+            return RuntimeObservationBundle()
+
+        handle = self._handle
+        self._handle = None
+        entries = tuple(self._service.stop_for_run(handle))
+        self.logger.debug("Proxy runtime resource stopped, collected %d S3 entries", len(entries))
+        return RuntimeObservationBundle(s3_entries=entries)
+
+
+__all__ = ["ProxyRuntimeResource"]
diff --git a/roar/execution/runtime/resources.py b/roar/execution/runtime/resources.py
new file mode 100644
index 00000000..661c65fd
--- /dev/null
+++ b/roar/execution/runtime/resources.py
@@ -0,0 +1,133 @@
+from __future__ import annotations
+
+from collections.abc import Mapping, Sequence
+from dataclasses import dataclass, field
+from typing import TYPE_CHECKING, Any, Protocol
+
+from roar.execution.cluster.proxy import S3LogEntry
+
+if TYPE_CHECKING:
+    from roar.core.interfaces.logger import ILogger
+    from roar.core.models.run import RunContext
+
+
+@dataclass(frozen=True)
+class RuntimeResourceStart:
+    """Result of starting a host runtime resource."""
+
+    env: Mapping[str, str] = field(default_factory=dict)
+
+
+@dataclass(frozen=True)
+class RuntimeObservationBundle:
+    """Observations collected from host runtime resources."""
+
+    s3_entries: tuple[S3LogEntry, ...] = ()
+    metadata: Mapping[str, Any] = field(default_factory=dict)
+
+
+class HostRuntimeResource(Protocol):
+    """Explicit start/stop contract for host runtime resources."""
+
+    name: str
+
+    def start(self, ctx: RunContext, environ: Mapping[str, str]) -> RuntimeResourceStart: ...
+
+    def stop(self, *, exit_code: int | None) -> RuntimeObservationBundle: ...
+
+    @property
+    def active(self) -> bool: ...
+
+
+class RuntimeResourceController:
+    """Owns deterministic startup and teardown of host runtime resources."""
+
+    def __init__(
+        self,
+        resources: Sequence[HostRuntimeResource] | None = None,
+        logger: ILogger | None = None,
+    ) -> None:
+        self._resources = tuple(resources or ())
+        self._started_resources: list[HostRuntimeResource] = []
+        self._stopped = False
+        self._cached_stop_result = RuntimeObservationBundle()
+        self._logger = logger
+
+    @property
+    def logger(self) -> ILogger:
+        if self._logger is None:
+            from roar.core.logging import get_logger
+
+            self._logger = get_logger()
+        return self._logger
+
+    def start_all(self, ctx: RunContext, environ: Mapping[str, str]) -> dict[str, str]:
+        """Start all configured resources and return merged env patches."""
+        merged_env: dict[str, str] = {}
+        self._started_resources = []
+        self._stopped = False
+        self._cached_stop_result = RuntimeObservationBundle()
+
+        for resource in self._resources:
+            effective_environ = dict(environ)
+            effective_environ.update(merged_env)
+            result = resource.start(ctx, effective_environ)
+            merged_env.update({str(key): str(value) for key, value in result.env.items()})
+            self._started_resources.append(resource)
+
+        return merged_env
+
+    def stop_all(self, *, exit_code: int | None) -> RuntimeObservationBundle:
+        """Stop resources exactly once and return aggregated observations."""
+        if self._stopped:
+            return self._cached_stop_result
+
+        collected_s3_entries: list[S3LogEntry] = []
+        merged_metadata: dict[str, Any] = {}
+
+        for resource in reversed(self._started_resources):
+            try:
+                result = resource.stop(exit_code=exit_code)
+            except Exception as exc:
+                self.logger.warning(
+                    "Failed to stop runtime resource %s cleanly: %s",
+                    resource.name,
+                    exc,
+                )
+                continue
+
+            collected_s3_entries.extend(result.s3_entries)
+            merged_metadata.update(dict(result.metadata))
+
+        self._cached_stop_result = RuntimeObservationBundle(
+            s3_entries=tuple(collected_s3_entries),
+            metadata=merged_metadata,
+        )
+        self._stopped = True
+        return self._cached_stop_result
+
+    def active_resource_names(self) -> tuple[str, ...]:
+        """Return active resource names in start order."""
+        return tuple(resource.name for resource in self._started_resources if resource.active)
+
+
+def build_host_runtime_resources(ctx: RunContext) -> tuple[HostRuntimeResource, ...]:
+    """Build the host runtime resources required for a tracked host execution."""
+    from roar.integrations.config import config_get
+
+    resources: list[HostRuntimeResource] = []
+    if config_get("proxy.enabled", start_dir=ctx.repo_root):
+        from .proxy_resource import ProxyRuntimeResource
+
+        resources.append(ProxyRuntimeResource())
+
+    return tuple(resources)
+
+
+__all__ = [
+    "HostRuntimeResource",
+    "RuntimeObservationBundle",
+    "RuntimeResourceController",
+    "RuntimeResourceStart",
+    "build_host_runtime_resources",
+]
diff --git a/roar/integrations/__init__.py b/roar/integrations/__init__.py
index 4a31154f..53eb93c2 100644
--- a/roar/integrations/__init__.py
+++ b/roar/integrations/__init__.py
@@ -1,5 +1,6 @@
 """Integration adapters and provider registries for external systems."""
 
+from .bootstrap import bootstrap_integrations, register_builtin_integrations
 from .discovery import discover_optional_integrations
 from .registry import (
     get_all_telemetry_providers,
@@ -14,6 +15,7 @@
 )
 
 __all__ = [
+    "bootstrap_integrations",
     "discover_optional_integrations",
     "get_all_telemetry_providers",
     "get_integration_registry",
@@ -21,6 +23,7 @@
     "get_vcs_provider",
     "list_telemetry_providers",
     "list_vcs_providers",
+    "register_builtin_integrations",
    "register_telemetry_provider",
     "register_vcs_provider",
     "reset_integrations",
diff --git a/roar/integrations/bootstrap.py b/roar/integrations/bootstrap.py
new file mode 100644
index 00000000..2401e43d
--- /dev/null
+++ b/roar/integrations/bootstrap.py
@@ -0,0 +1,27 @@
+"""Bootstrap helpers for built-in and optional integrations."""
+
+from __future__ import annotations
+
+from .discovery import discover_optional_integrations
+from .registry import register_telemetry_provider, register_vcs_provider
+
+
+def register_builtin_integrations() -> None:
+    """Register built-in providers that are part of the core product path."""
+    from .git import GitVCSProvider
+    from .telemetry import WandBTelemetryProvider
+
+    register_vcs_provider("git", GitVCSProvider)
+    register_telemetry_provider("wandb", WandBTelemetryProvider)
+
+
+def bootstrap_integrations() -> None:
+    """Register built-in providers and discover optional providers."""
+    register_builtin_integrations()
+    discover_optional_integrations()
+
+
+__all__ = [
+    "bootstrap_integrations",
+    "register_builtin_integrations",
+]
diff --git a/roar/integrations/discovery.py b/roar/integrations/discovery.py
index dcb37d5e..05133138 100644
--- a/roar/integrations/discovery.py
+++ b/roar/integrations/discovery.py
@@ -2,15 +2,25 @@
 
 from __future__ import annotations
 
+from collections.abc import Callable
+from typing import Any
+
 from ..core.interfaces.telemetry import ITelemetryProvider
 from ..core.interfaces.vcs import IVCSProvider
 from ..core.logging import get_logger as _get_logger
 from .registry import register_telemetry_provider, register_vcs_provider
 
+_LEGACY_ENTRYPOINT_GROUP = "roar.integrations"
+_TYPED_PROVIDER_GROUPS: tuple[tuple[str, type, Callable[[str, type], None]], ...] = (
+    ("roar.telemetry_providers", ITelemetryProvider, register_telemetry_provider),
+    ("roar.vcs_providers", IVCSProvider, register_vcs_provider),
+)
+
 
 def discover_optional_integrations() -> None:
     """Auto-discover and register optional integration providers."""
-    _discover_entrypoint_integrations()
+    _discover_typed_entrypoint_integrations()
+    _discover_legacy_entrypoint_integrations()
 
 
 def _implements(cls: type, interface: type) -> bool:
@@ -26,29 +36,72 @@ def _implements(cls: type, interface: type) -> bool:
     return False
 
 
-def _discover_entrypoint_integrations() -> None:
-    """Discover providers registered via the ``roar.integrations`` entrypoint group."""
-    try:
-        from importlib.metadata import entry_points
+def _discover_typed_entrypoint_integrations() -> None:
+    """Discover providers via typed entry-point groups."""
+    for group, interface, register in _TYPED_PROVIDER_GROUPS:
+        try:
+            for entry_point in _iter_entry_points(group):
+                try:
+                    provider_cls = entry_point.load()
+                    _register_typed_entrypoint_provider(provider_cls, interface, register)
+                except Exception as exc:
+                    _get_logger().debug(
+                        "Failed to load integration entry point %s from %s: %s",
+                        entry_point.name,
+                        group,
+                        exc,
+                    )
+                    continue
+        except Exception as exc:
+            _get_logger().debug("Failed to discover optional integrations from %s: %s", group, exc)
 
-        eps = entry_points(group="roar.integrations")
 
-        for ep in eps:
+def _discover_legacy_entrypoint_integrations() -> None:
+    """Discover providers registered via the legacy ``roar.integrations`` group."""
+    try:
+        for entry_point in _iter_entry_points(_LEGACY_ENTRYPOINT_GROUP):
             try:
-                provider_cls = ep.load()
+                provider_cls = entry_point.load()
                 _register_entrypoint_provider(provider_cls)
             except Exception as exc:
-                _get_logger().debug("Failed to load integration entry point %s: %s", ep.name, exc)
+                _get_logger().debug(
+                    "Failed to load integration entry point %s: %s", entry_point.name, exc
+                )
                 continue
     except Exception as exc:
         _get_logger().debug("Failed to discover optional integrations: %s", exc)
 
 
 def _register_entrypoint_provider(provider_cls: type) -> None:
-    """Register an entry point provider based on its interface."""
+    """Register a legacy entry point provider based on its interface."""
     if _implements(provider_cls, ITelemetryProvider):
         instance = provider_cls()
         register_telemetry_provider(instance.name, provider_cls)
     elif _implements(provider_cls, IVCSProvider):
         instance = provider_cls()
         register_vcs_provider(instance.name, provider_cls)
+
+
+def _register_typed_entrypoint_provider(
+    provider_cls: type,
+    interface: type,
+    register: Callable[[str, type], None],
+) -> None:
+    """Register an entry point provider from a typed group."""
+    if not _implements(provider_cls, interface):
+        return
+    instance = provider_cls()
+    register(instance.name, provider_cls)
+
+
+def _iter_entry_points(group: str) -> tuple[Any, ...]:
+    from importlib.metadata import entry_points
+
+    try:
+        return tuple(entry_points(group=group))
+    except TypeError:
+        all_entry_points = entry_points()
+        select = getattr(all_entry_points, "select", None)
+        if callable(select):
+            return tuple(select(group=group))
+        return tuple(all_entry_points.get(group, ()))
diff --git a/tests/application/publish/test_register_preparation.py b/tests/application/publish/test_register_preparation.py
index f8bef567..5de088e4 100644
--- a/tests/application/publish/test_register_preparation.py
+++ b/tests/application/publish/test_register_preparation.py
@@ -75,19 +75,22 @@ def test_prepare_register_execution_builds_session_git_and_tag_plan(tmp_path: Pa
         tmp_path,
         error_message="Cannot register with uncommitted changes. Commit your changes first.",
     )
-    prepare_session.assert_called_once_with(
-        glaas_client=runtime.glaas_client,
-        session_service=runtime.session_service,
-        roar_dir=tmp_path / ".roar",
-        session_id=7,
-        git_context=git_context,
-        logger=logger,
-        register_with_glaas=True,
-        configured_error="GLaaS not configured. Run 'roar config set glaas.url ' first.",
-        session_hash_override=None,
-        lineage=None,
-        creator_identity="anonymous",
+    prepare_session.assert_called_once()
+    prepare_kwargs = prepare_session.call_args.kwargs
+    assert prepare_kwargs["remote_registry"].client is runtime.glaas_client
+    assert prepare_kwargs["remote_registry"].session_service is runtime.session_service
+    assert prepare_kwargs["roar_dir"] == tmp_path / ".roar"
+    assert prepare_kwargs["session_id"] == 7
+    assert prepare_kwargs["git_context"] == git_context
+    assert prepare_kwargs["logger"] is logger
+    assert prepare_kwargs["register_with_glaas"] is True
+    assert (
+        prepare_kwargs["configured_error"]
+        == "GLaaS not configured. Run 'roar config set glaas.url ' first."
     )
+    assert prepare_kwargs["session_hash_override"] is None
+    assert prepare_kwargs["lineage"] is None
+    assert prepare_kwargs["creator_identity"] == "anonymous"
 
 
 def test_prepare_register_execution_passes_lineage_and_creator_identity_to_session_preparation(
diff --git a/tests/application/publish/test_remote_registry.py b/tests/application/publish/test_remote_registry.py
new file mode 100644
index 00000000..79f29d69
--- /dev/null
+++ b/tests/application/publish/test_remote_registry.py
@@ -0,0 +1,39 @@
+from __future__ import annotations
+
+from unittest.mock import MagicMock
+
+from roar.application.publish.remote_registry import (
+    GlaasRemoteRegistryTransport,
+    coerce_remote_registry,
+)
+
+
+def test_glaas_remote_registry_transport_delegates_to_client() -> None:
+    client = MagicMock()
+    client.is_configured.return_value = True
+    client.health_check.return_value = None
+    client.register_composite_artifact.return_value = {"ok": True}
+    client.sync_labels.return_value = ({"ok": True}, None)
+    client.publish_auth = {"mode": "device"}
+    session_service = MagicMock()
+    coordinator = MagicMock()
+
+    transport = GlaasRemoteRegistryTransport(
+        client=client,
+        session_service=session_service,
+        registration_coordinator=coordinator,
+    )
+
assert transport.publish_auth == {"mode": "device"} + assert transport.is_configured() is True + assert transport.health_check() is None + assert transport.register_composite_artifact({"hash": "abc"}) == {"ok": True} + assert transport.sync_labels([{"entity_type": "artifact"}]) == ({"ok": True}, None) + assert transport.session_service is session_service + assert transport.registration_coordinator is coordinator + + +def test_coerce_remote_registry_returns_existing_transport() -> None: + transport = GlaasRemoteRegistryTransport(client=MagicMock()) + + assert coerce_remote_registry(remote_registry=transport) is transport diff --git a/tests/application/publish/test_runtime.py b/tests/application/publish/test_runtime.py index 5bdca852..dbd7d75f 100644 --- a/tests/application/publish/test_runtime.py +++ b/tests/application/publish/test_runtime.py @@ -3,6 +3,7 @@ from unittest.mock import MagicMock, patch from roar.application.publish.lineage import LineageCollector +from roar.application.publish.remote_registry import GlaasRemoteRegistryTransport from roar.application.publish.runtime import build_publish_runtime @@ -34,6 +35,10 @@ def test_build_publish_runtime_builds_shared_dependency_stack() -> None: ): runtime = build_publish_runtime(glaas_url="http://localhost:3001") + assert isinstance(runtime.remote_registry, GlaasRemoteRegistryTransport) + assert runtime.remote_registry.client is client + assert runtime.remote_registry.session_service is session_service + assert runtime.remote_registry.registration_coordinator is coordinator assert runtime.glaas_client is client assert runtime.session_service is session_service assert runtime.registration_coordinator is coordinator diff --git a/tests/integration/test_bootstrap_integrations.py b/tests/integration/test_bootstrap_integrations.py index 30bc4c79..cc703468 100644 --- a/tests/integration/test_bootstrap_integrations.py +++ b/tests/integration/test_bootstrap_integrations.py @@ -1,5 +1,7 @@ from __future__ import 
annotations +from unittest.mock import patch + from roar.core.bootstrap import bootstrap, reset from roar.integrations import list_telemetry_providers, list_vcs_providers @@ -13,3 +15,14 @@ def test_bootstrap_registers_builtin_integrations(tmp_path): assert "git" in list_vcs_providers() assert "wandb" in list_telemetry_providers() + + +def test_bootstrap_delegates_integration_wiring_to_integrations_layer(tmp_path): + roar_dir = tmp_path / ".roar" + roar_dir.mkdir() + + reset() + with patch("roar.core.bootstrap.bootstrap_integrations") as bootstrap_integrations: + bootstrap(roar_dir) + + bootstrap_integrations.assert_called_once_with() diff --git a/tests/integrations/test_integration_discovery.py b/tests/integrations/test_integration_discovery.py index fe69d952..09e740f3 100644 --- a/tests/integrations/test_integration_discovery.py +++ b/tests/integrations/test_integration_discovery.py @@ -4,8 +4,14 @@ from roar.core.bootstrap import reset from roar.core.interfaces.telemetry import ITelemetryProvider +from roar.core.interfaces.vcs import IVCSProvider from roar.core.models.telemetry import TelemetryRunInfo -from roar.integrations import discover_optional_integrations, list_telemetry_providers +from roar.core.models.vcs import VCSInfo +from roar.integrations import ( + discover_optional_integrations, + list_telemetry_providers, + list_vcs_providers, +) class _ExampleTelemetryProvider(ITelemetryProvider): @@ -26,17 +32,93 @@ def get_run_url(self, run_id: str) -> str | None: return None -def test_discover_optional_integrations_registers_entrypoint_telemetry(monkeypatch) -> None: +class _ExampleVcsProvider(IVCSProvider): + @property + def name(self) -> str: + return "example-vcs" + + def get_repo_root(self, path: str | None = None) -> str | None: + return path + + def get_info(self, repo_root: str) -> VCSInfo: + del repo_root + return VCSInfo() + + def get_status(self, repo_root: str) -> tuple[bool, list[str]]: + del repo_root + return True, [] + + def is_tracked(self, 
repo_root: str, path: str) -> bool: + del repo_root, path + return True + + def classify_file(self, repo_root: str, path: str) -> str: + del repo_root, path + return "tracked" + + +def test_discover_optional_integrations_registers_typed_entrypoint_telemetry(monkeypatch) -> None: + reset() + + def fake_entry_points(*, group: str): + if group == "roar.telemetry_providers": + return [ + SimpleNamespace( + name="example", + load=lambda: _ExampleTelemetryProvider, + ) + ] + if group == "roar.vcs_providers": + return [] + if group == "roar.integrations": + return [] + raise AssertionError(f"Unexpected group: {group}") + + monkeypatch.setattr("importlib.metadata.entry_points", fake_entry_points) + + discover_optional_integrations() + + assert "example" in list_telemetry_providers() + + +def test_discover_optional_integrations_registers_typed_entrypoint_vcs(monkeypatch) -> None: + reset() + + def fake_entry_points(*, group: str): + if group == "roar.telemetry_providers": + return [] + if group == "roar.vcs_providers": + return [ + SimpleNamespace( + name="example-vcs", + load=lambda: _ExampleVcsProvider, + ) + ] + if group == "roar.integrations": + return [] + raise AssertionError(f"Unexpected group: {group}") + + monkeypatch.setattr("importlib.metadata.entry_points", fake_entry_points) + + discover_optional_integrations() + + assert "example-vcs" in list_vcs_providers() + + +def test_discover_optional_integrations_keeps_legacy_group_compatibility(monkeypatch) -> None: reset() def fake_entry_points(*, group: str): - assert group == "roar.integrations" - return [ - SimpleNamespace( - name="example", - load=lambda: _ExampleTelemetryProvider, - ) - ] + if group in {"roar.telemetry_providers", "roar.vcs_providers"}: + return [] + if group == "roar.integrations": + return [ + SimpleNamespace( + name="example", + load=lambda: _ExampleTelemetryProvider, + ) + ] + raise AssertionError(f"Unexpected group: {group}") monkeypatch.setattr("importlib.metadata.entry_points", 
fake_entry_points) diff --git a/tests/unit/test_cli_registry.py b/tests/unit/test_cli_registry.py index 63803463..e4b5da3c 100644 --- a/tests/unit/test_cli_registry.py +++ b/tests/unit/test_cli_registry.py @@ -4,13 +4,33 @@ from click.testing import CliRunner -from roar.cli import EXPERIMENTAL_ACCOUNT_COMMANDS_FLAG, LAZY_COMMANDS, cli +from roar.cli import COMMAND_SPECS, EXPERIMENTAL_ACCOUNT_COMMANDS_FLAG, LAZY_COMMANDS, cli +from roar.cli.command_registry import build_help_groups def test_composite_command_removed_from_lazy_registry() -> None: assert "composite" not in LAZY_COMMANDS +def test_lazy_registry_is_built_from_command_specs() -> None: + assert {spec.name for spec in COMMAND_SPECS} == set(LAZY_COMMANDS) + assert len(COMMAND_SPECS) == len({spec.name for spec in COMMAND_SPECS}) + + +def test_help_groups_are_built_from_command_specs() -> None: + help_groups = dict(build_help_groups()) + + assert help_groups["Start Here"] == ("init", "run", "build", "dag") + assert help_groups["Share and Publish"] == ("put", "register", "get", "label") + assert help_groups["GLaaS / TReqs Account"] == ( + "login", + "logout", + "whoami", + "projects", + "workflow", + ) + + def test_help_does_not_list_composite_command() -> None: runner = CliRunner() result = runner.invoke(cli, ["--help"]) diff --git a/tests/unit/test_proxy_coordinator.py b/tests/unit/test_proxy_coordinator.py index 0e9ca8dd..7bfa7bd3 100644 --- a/tests/unit/test_proxy_coordinator.py +++ b/tests/unit/test_proxy_coordinator.py @@ -1,15 +1,45 @@ -"""Tests for RunCoordinator proxy integration and AWS_ENDPOINT_URL chaining.""" +"""Tests for RunCoordinator runtime-resource integration.""" from pathlib import Path from unittest.mock import MagicMock, patch from roar.core.exceptions import TracerNotFoundError -from roar.execution.cluster.proxy import ProxyHandle +from roar.execution.cluster.proxy import S3LogEntry from roar.execution.runtime.coordinator import RunCoordinator +from roar.execution.runtime.resources 
import RuntimeObservationBundle, RuntimeResourceStart + + +class _FakeRuntimeResource: + name = "proxy" + + def __init__( + self, + *, + start_env: dict[str, str] | None = None, + stop_result: RuntimeObservationBundle | None = None, + ) -> None: + self.start_env = start_env or {} + self.stop_result = stop_result or RuntimeObservationBundle() + self.start_calls: list[dict[str, object]] = [] + self.stop_calls: list[int | None] = [] + self._active = False + + @property + def active(self) -> bool: + return self._active + + def start(self, ctx, environ): + self.start_calls.append({"ctx": ctx, "environ": dict(environ)}) + self._active = bool(self.start_env) + return RuntimeResourceStart(env=self.start_env) + + def stop(self, *, exit_code: int | None) -> RuntimeObservationBundle: + self.stop_calls.append(exit_code) + self._active = False + return self.stop_result def _make_ctx(): - """Create a minimal RunContext mock.""" ctx = MagicMock() ctx.command = ["python", "train.py"] ctx.execution_backend = "local" @@ -19,31 +49,23 @@ def _make_ctx(): ctx.hash_algorithms = ["blake3"] ctx.tracer_mode = None ctx.tracer_fallback = None + ctx.quiet = False return ctx def _make_tracer_result(): - """Create a mock tracer result.""" result = MagicMock() result.exit_code = 0 result.duration = 1.0 result.tracer_log_path = "/tmp/repo/.roar/run_1_tracer.msgpack" result.inject_log_path = "/tmp/repo/.roar/run_1_inject.json" result.interrupted = False + result.backend = "ptrace" return result -def _patch_coordinator_deps(coord): - """Return a combined context manager that patches coordinator dependencies.""" - return ( - patch("os.path.exists", return_value=True), - patch("roar.integrations.config.load_config", return_value={}), - ) - - -class TestProxyLifecycle: - def _run_coord(self, coord): - """Execute coordinator with all deps mocked.""" +class TestRuntimeResourceLifecycle: + def _run_coord(self, coord, ctx=None): mock_prov = MagicMock() mock_prov.collect.return_value = {"data": 
{"read_files": [], "written_files": []}} @@ -55,71 +77,51 @@ def _run_coord(self, coord): patch.object(coord, "_backup_previous_outputs"), patch.object(coord, "_cleanup_logs"), ): - return coord.execute(_make_ctx()) - - def test_start_for_run_called_when_proxy_service_provided(self): - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) + return coord.execute(ctx or _make_ctx()) + def test_runtime_resource_start_called_when_configured(self): + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) self._run_coord(coord) - mock_proxy.start_for_run.assert_called_once() - - def test_stop_for_run_called_after_tracer(self): - mock_proxy = MagicMock() - handle = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.start_for_run.return_value = handle - mock_proxy.stop_for_run.return_value = [] + assert len(resource.start_calls) == 1 + def test_runtime_resource_stop_called_after_tracer(self): + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) self._run_coord(coord) - mock_proxy.stop_for_run.assert_called_once_with(handle) - - def test_extra_env_with_aws_endpoint_url_passed_to_tracer(self): - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.stop_for_run.return_value = [] + assert resource.stop_calls == [0] + def test_runtime_resource_env_patch_is_forwarded_to_tracer(self): + resource = 
_FakeRuntimeResource( + start_env={ + "AWS_ENDPOINT_URL": "http://127.0.0.1:9090", + "ROAR_UPSTREAM_S3_ENDPOINT": "http://localhost:4566", + } + ) mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) self._run_coord(coord) - call_kwargs = mock_tracer.execute.call_args.kwargs - assert "extra_env" in call_kwargs - assert call_kwargs["extra_env"]["AWS_ENDPOINT_URL"] == "http://127.0.0.1:9090" - - def test_proxy_start_failure_continues_without_proxy(self): - mock_proxy = MagicMock() - mock_proxy.start_for_run.side_effect = RuntimeError("binary not found") - - mock_tracer = MagicMock() - mock_tracer.execute.return_value = _make_tracer_result() - - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - result = self._run_coord(coord) - - # Execution should succeed despite proxy failure - assert result.exit_code == 0 - # Tracer still receives the selected execution backend call_kwargs = mock_tracer.execute.call_args.kwargs assert call_kwargs["extra_env"]["ROAR_EXECUTION_BACKEND"] == "local" - assert "AWS_ENDPOINT_URL" not in call_kwargs["extra_env"] + assert call_kwargs["extra_env"]["AWS_ENDPOINT_URL"] == "http://127.0.0.1:9090" + assert call_kwargs["extra_env"]["ROAR_UPSTREAM_S3_ENDPOINT"] == "http://localhost:4566" - def test_no_proxy_service_means_no_extra_env(self): + def test_no_runtime_resource_means_no_extra_env_beyond_backend(self): mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=None) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[]) self._run_coord(coord) call_kwargs = mock_tracer.execute.call_args.kwargs @@ -128,8 +130,7 @@ def test_no_proxy_service_means_no_extra_env(self): def 
test_run_job_uid_is_forwarded_to_record_job(self): mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=None) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[]) mock_prov = MagicMock() mock_prov.collect.return_value = {"data": {"read_files": [], "written_files": []}} @@ -149,64 +150,40 @@ def test_run_job_uid_is_forwarded_to_record_job(self): assert mock_record.call_args.kwargs["run_job_uid"] == "runuid12" def test_tracer_overrides_are_forwarded(self): - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.stop_for_run.return_value = [] - + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) ctx = _make_ctx() ctx.tracer_mode = "ptrace" ctx.tracer_fallback = False - - mock_prov = MagicMock() - mock_prov.collect.return_value = {"data": {"read_files": [], "written_files": []}} - with ( - patch("os.path.exists", return_value=True), - patch("roar.integrations.config.load_config", return_value={}), - patch("roar.execution.provenance.ProvenanceService", return_value=mock_prov), - patch.object(coord, "_record_job", return_value=(1, "abc123", [], [], [], [])), - patch.object(coord, "_backup_previous_outputs"), - patch.object(coord, "_cleanup_logs"), - ): - coord.execute(ctx) + self._run_coord(coord, ctx=ctx) call_kwargs = mock_tracer.execute.call_args.kwargs assert call_kwargs["tracer_mode_override"] == "ptrace" assert call_kwargs["fallback_enabled_override"] is False - def test_proxy_is_stopped_on_tracer_not_found(self): - mock_proxy = MagicMock() - handle = ProxyHandle(process=MagicMock(), port=9090) - 
mock_proxy.start_for_run.return_value = handle - mock_proxy.stop_for_run.return_value = [] - + def test_runtime_resource_is_stopped_on_tracer_not_found(self): + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() mock_tracer.execute.side_effect = TracerNotFoundError("no tracer") - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) with patch.object(coord, "_backup_previous_outputs"): result = coord.execute(_make_ctx()) assert result.exit_code == 1 - mock_proxy.stop_for_run.assert_called_once_with(handle) - - def test_proxy_is_stopped_when_tracer_log_missing(self): - mock_proxy = MagicMock() - handle = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.start_for_run.return_value = handle - mock_proxy.stop_for_run.return_value = [] + assert resource.stop_calls == [1] + def test_runtime_resource_is_stopped_when_tracer_log_missing(self): + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() tracer_result = _make_tracer_result() tracer_result.exit_code = 1 mock_tracer.execute.return_value = tracer_result - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) with ( patch.object(coord, "_backup_previous_outputs"), @@ -217,12 +194,20 @@ def test_proxy_is_stopped_when_tracer_log_missing(self): result = coord.execute(_make_ctx()) assert result.exit_code == 1 - mock_proxy.stop_for_run.assert_called_once_with(handle) - + assert resource.stop_calls == [1] + + def test_runtime_resource_observations_are_forwarded_to_record_job(self): + s3_entries = ( + S3LogEntry(operation="GetObject", bucket="bucket", key="input.csv", etag="etag"), + ) + resource = _FakeRuntimeResource( + start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}, + 
stop_result=RuntimeObservationBundle(s3_entries=s3_entries), + ) + mock_tracer = MagicMock() + mock_tracer.execute.return_value = _make_tracer_result() -class TestEndpointUrlChaining: - def _run_coord(self, coord): - """Execute coordinator with all deps mocked.""" + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) mock_prov = MagicMock() mock_prov.collect.return_value = {"data": {"read_files": [], "written_files": []}} @@ -230,55 +215,22 @@ def _run_coord(self, coord): patch("os.path.exists", return_value=True), patch("roar.integrations.config.load_config", return_value={}), patch("roar.execution.provenance.ProvenanceService", return_value=mock_prov), - patch.object(coord, "_record_job", return_value=(1, "abc123", [], [], [], [])), + patch.object( + coord, "_record_job", return_value=(1, "abc123", [], [], [], []) + ) as mock_record, patch.object(coord, "_backup_previous_outputs"), patch.object(coord, "_cleanup_logs"), ): - return coord.execute(_make_ctx()) - - def test_existing_aws_endpoint_url_passed_as_upstream(self, monkeypatch): - monkeypatch.setenv("AWS_ENDPOINT_URL", "http://localhost:4566") - - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.stop_for_run.return_value = [] - - mock_tracer = MagicMock() - mock_tracer.execute.return_value = _make_tracer_result() - - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - self._run_coord(coord) - - mock_proxy.start_for_run.assert_called_once_with(upstream_url="http://localhost:4566") - - def test_no_aws_endpoint_url_means_upstream_none(self, monkeypatch): - monkeypatch.delenv("AWS_ENDPOINT_URL", raising=False) - - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) - mock_proxy.stop_for_run.return_value = [] - - mock_tracer = MagicMock() - mock_tracer.execute.return_value = _make_tracer_result() - - coord = 
RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - self._run_coord(coord) - - mock_proxy.start_for_run.assert_called_once_with(upstream_url=None) - - def test_proxy_url_replaces_original_in_extra_env(self, monkeypatch): - monkeypatch.setenv("AWS_ENDPOINT_URL", "http://localhost:4566") + coord.execute(_make_ctx()) - mock_proxy = MagicMock() - mock_proxy.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=8888) - mock_proxy.stop_for_run.return_value = [] + assert mock_record.call_args.args[5] == list(s3_entries) + def test_proxy_active_is_reflected_in_run_result(self): + resource = _FakeRuntimeResource(start_env={"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"}) mock_tracer = MagicMock() mock_tracer.execute.return_value = _make_tracer_result() - coord = RunCoordinator(tracer_service=mock_tracer, proxy_service=mock_proxy) - self._run_coord(coord) + coord = RunCoordinator(tracer_service=mock_tracer, runtime_resources=[resource]) + result = self._run_coord(coord) - # The child should see our proxy's URL, not the original - call_kwargs = mock_tracer.execute.call_args.kwargs - assert call_kwargs["extra_env"]["AWS_ENDPOINT_URL"] == "http://127.0.0.1:8888" + assert result.proxy_active is True diff --git a/tests/unit/test_proxy_runtime_resource.py b/tests/unit/test_proxy_runtime_resource.py new file mode 100644 index 00000000..2e6216db --- /dev/null +++ b/tests/unit/test_proxy_runtime_resource.py @@ -0,0 +1,92 @@ +from unittest.mock import MagicMock + +import pytest + +from roar.execution.cluster.proxy import ProxyHandle, S3LogEntry +from roar.execution.runtime.errors import ExecutionSetupError +from roar.execution.runtime.proxy_resource import ProxyRuntimeResource + + +@pytest.fixture +def ctx(): + value = MagicMock() + value.repo_root = "/tmp/repo" + value.roar_dir = "/tmp/repo/.roar" + return value + + +def test_init_raises_when_proxy_binary_is_missing(): + service = MagicMock() + service.find_proxy.return_value = None + + with 
pytest.raises(ExecutionSetupError, match="roar-proxy binary not found"): + ProxyRuntimeResource(service=service) + + +def test_start_passes_existing_aws_endpoint_as_upstream(ctx): + service = MagicMock() + service.find_proxy.return_value = "/tmp/roar-proxy" + service.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) + + resource = ProxyRuntimeResource(service=service) + result = resource.start(ctx, {"AWS_ENDPOINT_URL": "http://localhost:4566"}) + + service.start_for_run.assert_called_once_with(upstream_url="http://localhost:4566") + assert result.env == { + "AWS_ENDPOINT_URL": "http://127.0.0.1:9090", + "ROAR_UPSTREAM_S3_ENDPOINT": "http://localhost:4566", + } + assert resource.active is True + + +def test_start_without_existing_endpoint_only_sets_proxy_url(ctx): + service = MagicMock() + service.find_proxy.return_value = "/tmp/roar-proxy" + service.start_for_run.return_value = ProxyHandle(process=MagicMock(), port=9090) + + resource = ProxyRuntimeResource(service=service) + result = resource.start(ctx, {}) + + service.start_for_run.assert_called_once_with(upstream_url=None) + assert result.env == {"AWS_ENDPOINT_URL": "http://127.0.0.1:9090"} + + +def test_start_failure_returns_empty_env_and_keeps_resource_inactive(ctx): + service = MagicMock() + service.find_proxy.return_value = "/tmp/roar-proxy" + service.start_for_run.side_effect = RuntimeError("boom") + + resource = ProxyRuntimeResource(service=service) + result = resource.start(ctx, {}) + + assert result.env == {} + assert resource.active is False + + +def test_stop_returns_collected_s3_entries_and_deactivates(ctx): + service = MagicMock() + service.find_proxy.return_value = "/tmp/roar-proxy" + handle = ProxyHandle(process=MagicMock(), port=9090) + service.start_for_run.return_value = handle + entries = [S3LogEntry(operation="GetObject", bucket="bucket", key="input.csv", etag="abc")] + service.stop_for_run.return_value = entries + + resource = ProxyRuntimeResource(service=service) + 
resource.start(ctx, {}) + + result = resource.stop(exit_code=0) + + service.stop_for_run.assert_called_once_with(handle) + assert result.s3_entries == tuple(entries) + assert resource.active is False + + +def test_stop_without_active_handle_returns_empty_observations(): + service = MagicMock() + service.find_proxy.return_value = "/tmp/roar-proxy" + + resource = ProxyRuntimeResource(service=service) + result = resource.stop(exit_code=0) + + assert result.s3_entries == () + service.stop_for_run.assert_not_called()
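
---

For reference, the typed entry-point groups introduced in `roar/integrations/discovery.py` (`roar.telemetry_providers` and `roar.vcs_providers`) let an out-of-tree package register providers without patching `roar` itself. A hypothetical `pyproject.toml` fragment for such a package is sketched below — the package, module, and class names are illustrative and not part of this change:

```toml
# Hypothetical external package advertising providers to roar's typed
# entry-point groups. Discovery loads each class, checks it against the
# matching interface (ITelemetryProvider / IVCSProvider), and registers
# it under the provider instance's `name`.
[project.entry-points."roar.telemetry_providers"]
example-telemetry = "roar_example.telemetry:ExampleTelemetryProvider"

[project.entry-points."roar.vcs_providers"]
example-vcs = "roar_example.vcs:ExampleVcsProvider"
```

During the transition the legacy `roar.integrations` group keeps working, and a provider that fails to load is logged at debug level and skipped rather than aborting discovery, matching the behavior of `_discover_typed_entrypoint_integrations` above.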