Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
478 changes: 478 additions & 0 deletions docs/proposals/extension-architecture-refactor-plan.md

Large diffs are not rendered by default.

15 changes: 11 additions & 4 deletions roar/application/publish/put_composites.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@
)
from ...core.interfaces.logger import ILogger
from ...db.context import optional_repo
from ...integrations.glaas import GlaasClient
from ...integrations.glaas.registration import _artifact_ref
from .remote_registry import RemoteRegistryTransport, coerce_remote_registry


def preregister_put_lineage_composites_with_glaas(
*,
db_ctx: Any,
glaas_client: GlaasClient,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
lineage_artifacts: list[dict[str, Any]],
session_hash: str,
registration_errors: list[str],
Expand All @@ -46,6 +47,7 @@ def preregister_put_lineage_composites_with_glaas(
logger=logger,
)
return preregister_lineage_composites(
remote_registry=remote_registry,
glaas_client=glaas_client,
payloads=payloads,
registration_errors=registration_errors,
Expand Down Expand Up @@ -153,14 +155,19 @@ def resolve_put_lineage_component_for_registration(
def register_put_composites_with_glaas(
*,
db_ctx: Any,
glaas_client: GlaasClient,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
composite_results: list[CompositeBuildResult],
registration_errors: list[str],
dataset_identifiers: list[dict[str, Any]] | None,
logger: ILogger,
) -> list[dict[str, Any]]:
"""Register generated composite artifacts with GLaaS and persist local state."""
composite_registrations: list[dict[str, Any]] = []
resolved_remote_registry = coerce_remote_registry(
remote_registry=remote_registry,
glaas_client=glaas_client,
)

for composite in composite_results:
payload = dict(composite.payload)
Expand All @@ -173,7 +180,7 @@ def register_put_composites_with_glaas(
if metadata_json is not None:
payload["metadata"] = metadata_json

response = glaas_client.register_composite_artifact(payload)
response = resolved_remote_registry.register_composite_artifact(payload)
result, error = parse_composite_registration_response(response)

composite_registration: dict[str, Any] = {
Expand Down
9 changes: 8 additions & 1 deletion roar/application/publish/put_preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
detect_additional_publish_composite_roots,
infer_publish_dataset_identifiers,
)
from .remote_registry import coerce_remote_registry
from .runtime import PublishRuntime
from .session import prepare_publish_session

Expand Down Expand Up @@ -62,9 +63,15 @@ def prepare_put_execution(
logger=logger,
git_commit=git_commit,
)
publish_session = prepare_publish_session(
runtime_dict = getattr(runtime, "__dict__", {})
remote_registry = coerce_remote_registry(
remote_registry=runtime_dict.get("remote_registry"),
glaas_client=runtime.glaas_client,
session_service=runtime.session_service,
registration_coordinator=runtime_dict.get("registration_coordinator"),
)
publish_session = prepare_publish_session(
remote_registry=remote_registry,
roar_dir=roar_dir,
session_id=session_id,
git_context=git_context,
Expand Down
11 changes: 9 additions & 2 deletions roar/application/publish/register_preparation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from ...core.interfaces.registration import GitContext
from ...publish_auth import resolve_publish_creator_identity
from ..git import build_roar_git_tag_name, ensure_clean_git_repo, resolve_roar_git_context
from .remote_registry import coerce_remote_registry
from .runtime import PublishRuntime
from .session import prepare_publish_session

Expand Down Expand Up @@ -59,10 +60,16 @@ def prepare_register_execution(
git_tag_name = build_roar_git_tag_name(git_context.commit, short=True)
git_tag_repo_root = git_state.repo_root

creator_identity = resolve_publish_creator_identity(runtime.glaas_client.publish_auth)
publish_session = prepare_publish_session(
runtime_dict = getattr(runtime, "__dict__", {})
remote_registry = coerce_remote_registry(
remote_registry=runtime_dict.get("remote_registry"),
glaas_client=runtime.glaas_client,
session_service=runtime.session_service,
registration_coordinator=runtime_dict.get("registration_coordinator"),
)
creator_identity = resolve_publish_creator_identity(remote_registry.publish_auth)
publish_session = prepare_publish_session(
remote_registry=remote_registry,
roar_dir=roar_dir,
session_id=session_id,
git_context=git_context,
Expand Down
31 changes: 24 additions & 7 deletions roar/application/publish/registration.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

from ...core.interfaces.logger import ILogger
from ...core.interfaces.registration import BatchRegistrationResult, GitContext
from ...integrations.glaas import GlaasClient
from ..labels import collect_label_sync_payloads
from .remote_registry import RemoteRegistryTransport, coerce_remote_registry

_VALID_REMOTE_SOURCE_TYPES = {"s3", "gs", "https"}

Expand Down Expand Up @@ -328,7 +328,8 @@ def prepare_batch_registration_artifacts(
def register_publish_lineage(
*,
coordinator: Any,
glaas_client: GlaasClient,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
session_hash: str,
git_context: GitContext,
jobs: list[dict[str, Any]],
Expand All @@ -349,11 +350,16 @@ def register_publish_lineage(
if pre_registration_errors:
batch_result.errors = [*pre_registration_errors, *batch_result.errors]

resolved_remote_registry = coerce_remote_registry(
remote_registry=remote_registry,
glaas_client=glaas_client,
)

labels_are_safe_to_sync = batch_result.jobs_failed == 0 and batch_result.artifacts_failed == 0

if session_id is not None and db_ctx is not None and labels_are_safe_to_sync:
sync_publish_labels(
glaas_client=glaas_client,
remote_registry=resolved_remote_registry,
db_ctx=db_ctx,
session_id=session_id,
session_hash=session_hash,
Expand All @@ -367,16 +373,21 @@ def register_publish_lineage(

def preregister_lineage_composites(
*,
glaas_client: GlaasClient,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
payloads: list[CompositeRegistrationCandidate],
registration_errors: list[str],
logger: ILogger,
) -> list[dict[str, Any]]:
"""Register lineage composites before the main link phase."""
registrations: list[dict[str, Any]] = []
resolved_remote_registry = coerce_remote_registry(
remote_registry=remote_registry,
glaas_client=glaas_client,
)

for item in payloads:
response = glaas_client.register_composite_artifact(item.payload)
response = resolved_remote_registry.register_composite_artifact(item.payload)
result, error = parse_composite_registration_response(response)

registration: dict[str, Any] = {
Expand Down Expand Up @@ -413,7 +424,8 @@ def preregister_lineage_composites(

def sync_publish_labels(
*,
glaas_client: GlaasClient,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
db_ctx: Any,
session_id: int | None,
session_hash: str,
Expand All @@ -432,7 +444,12 @@ def sync_publish_labels(
if not payloads:
return

_label_result, label_error = glaas_client.sync_labels(payloads)
resolved_remote_registry = coerce_remote_registry(
remote_registry=remote_registry,
glaas_client=glaas_client,
)

_label_result, label_error = resolved_remote_registry.sync_labels(payloads)
if label_error and errors is not None:
errors.append(f"Label sync failed: {label_error}")

Expand Down
85 changes: 85 additions & 0 deletions roar/application/publish/remote_registry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
"""Remote lineage registry transport contracts for publish workflows."""

from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Protocol


class RemoteRegistryTransport(Protocol):
"""Narrow transport contract for remote lineage registration workflows."""

@property
def name(self) -> str: ...

@property
def client(self) -> Any: ...

@property
def session_service(self) -> Any | None: ...

@property
def registration_coordinator(self) -> Any | None: ...

@property
def publish_auth(self) -> Any: ...

def is_configured(self) -> bool: ...

def health_check(self) -> Any: ...

def register_composite_artifact(self, payload: dict[str, Any]) -> Any: ...

def sync_labels(self, payloads: list[dict[str, Any]]) -> Any: ...


@dataclass(frozen=True)
class GlaasRemoteRegistryTransport:
"""GLaaS-backed implementation of the remote registry transport contract."""

client: Any
session_service: Any | None = None
registration_coordinator: Any | None = None
name: str = "glaas"

@property
def publish_auth(self) -> Any:
return getattr(self.client, "publish_auth", None)

def is_configured(self) -> bool:
return bool(self.client.is_configured())

def health_check(self) -> Any:
return self.client.health_check()

def register_composite_artifact(self, payload: dict[str, Any]) -> Any:
return self.client.register_composite_artifact(payload)

def sync_labels(self, payloads: list[dict[str, Any]]) -> Any:
return self.client.sync_labels(payloads)


def coerce_remote_registry(
*,
remote_registry: RemoteRegistryTransport | None = None,
glaas_client: Any | None = None,
session_service: Any | None = None,
registration_coordinator: Any | None = None,
) -> RemoteRegistryTransport:
"""Resolve the concrete remote registry transport from new or legacy inputs."""
if remote_registry is not None:
return remote_registry
if glaas_client is None:
raise ValueError("remote registry transport requires a client")
return GlaasRemoteRegistryTransport(
client=glaas_client,
session_service=session_service,
registration_coordinator=registration_coordinator,
)


__all__ = [
"GlaasRemoteRegistryTransport",
"RemoteRegistryTransport",
"coerce_remote_registry",
]
33 changes: 25 additions & 8 deletions roar/application/publish/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,31 @@
SessionRegistrationService,
)
from .lineage import LineageCollector
from .remote_registry import GlaasRemoteRegistryTransport, RemoteRegistryTransport


@dataclass(frozen=True)
class PublishRuntime:
"""Concrete dependency set for publish workflows."""

glaas_client: GlaasClient
session_service: SessionRegistrationService
registration_coordinator: RegistrationCoordinator
remote_registry: RemoteRegistryTransport
lineage_collector: LineageCollector

@property
def glaas_client(self):
"""Backward-compatible access to the underlying GLaaS client."""
return self.remote_registry.client

@property
def session_service(self):
"""Backward-compatible access to the publish session service."""
return self.remote_registry.session_service

@property
def registration_coordinator(self):
"""Backward-compatible access to the registration coordinator."""
return self.remote_registry.registration_coordinator


def build_publish_runtime(
*,
Expand All @@ -39,13 +53,16 @@ def build_publish_runtime(
session_service = SessionRegistrationService(glaas_client)
artifact_service = ArtifactRegistrationService(glaas_client)
job_service = JobRegistrationService(glaas_client)
return PublishRuntime(
glaas_client=glaas_client,
registration_coordinator = RegistrationCoordinator(
session_service=session_service,
registration_coordinator=RegistrationCoordinator(
artifact_service=artifact_service,
job_service=job_service,
)
return PublishRuntime(
remote_registry=GlaasRemoteRegistryTransport(
client=glaas_client,
session_service=session_service,
artifact_service=artifact_service,
job_service=job_service,
registration_coordinator=registration_coordinator,
),
lineage_collector=LineageCollector(),
)
21 changes: 16 additions & 5 deletions roar/application/publish/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,10 +164,17 @@ class _PutPlanResult:
class _RegisterPreviewRuntime:
"""Minimal runtime surface for local `roar register --dry-run` flows."""

glaas_client: Any
session_service: Any
remote_registry: Any
lineage_collector: Any

@property
def glaas_client(self) -> Any:
return self.remote_registry.client

@property
def session_service(self) -> Any:
return self.remote_registry.session_service


@dataclass(frozen=True)
class _PreparedRegisterPreviewExecution:
Expand All @@ -191,6 +198,7 @@ def build_register_preview_runtime(
from ...integrations.glaas.registration.session import SessionRegistrationService
from ...publish_auth import PublishAuthContext
from .lineage import LineageCollector
from .remote_registry import GlaasRemoteRegistryTransport

publish_auth = None
if not allow_public_without_binding:
Expand All @@ -208,9 +216,12 @@ def build_register_preview_runtime(
publish_auth=publish_auth,
allow_public_without_binding=allow_public_without_binding,
)
session_service = SessionRegistrationService(glaas_client)
return _RegisterPreviewRuntime(
glaas_client=glaas_client,
session_service=SessionRegistrationService(glaas_client),
remote_registry=GlaasRemoteRegistryTransport(
client=glaas_client,
session_service=session_service,
),
lineage_collector=LineageCollector(),
)

Expand All @@ -234,7 +245,7 @@ def prepare_register_preview_execution(
if session_hash_override:
session_hash = session_hash_override
elif lineage is not None:
creator_identity = resolve_publish_creator_identity(runtime.glaas_client.publish_auth)
creator_identity = resolve_publish_creator_identity(runtime.remote_registry.publish_auth)
session_hash = compute_canonical_session_hash(
build_canonical_session_payload(
lineage=lineage,
Expand Down
Loading
Loading