From 064be3dd3836b3b866d4bf82c32638f4e82b1864 Mon Sep 17 00:00:00 2001 From: Trevor Basinger Date: Fri, 17 Apr 2026 14:31:52 +0000 Subject: [PATCH 1/5] feat(labels): derive reserved job system labels --- roar/application/labels.py | 32 +- roar/application/query/show.py | 5 +- roar/application/system_labels.py | 608 +++++++++++++++++++ roar/backends/osmo/host_execution.py | 6 + roar/core/label_constants.py | 15 + roar/db/services/job_recording.py | 17 + roar/execution/recording/dataset_metadata.py | 18 +- roar/execution/recording/job_recording.py | 12 + roar/presenters/dag_data_builder.py | 6 +- tests/happy_path/test_label_command.py | 13 +- tests/unit/test_label_service.py | 13 + tests/unit/test_system_labels.py | 234 +++++++ 12 files changed, 935 insertions(+), 44 deletions(-) create mode 100644 roar/application/system_labels.py create mode 100644 roar/core/label_constants.py create mode 100644 tests/unit/test_system_labels.py diff --git a/roar/application/labels.py b/roar/application/labels.py index b5af310c..398d1924 100644 --- a/roar/application/labels.py +++ b/roar/application/labels.py @@ -15,10 +15,8 @@ from ..core.label_origins import LABEL_ORIGIN_USER, build_current_key_origins from ..db.context import DatabaseContext -from ..execution.recording.dataset_metadata import AUTO_DATASET_LABEL_KEYS from .label_rendering import flatten_label_metadata - -RESERVED_LABEL_KEYS = set(AUTO_DATASET_LABEL_KEYS) +from .system_labels import is_reserved_system_label_path, strip_reserved_system_labels @dataclass(frozen=True) @@ -195,7 +193,7 @@ def copy_metadata( destination: LabelTargetRef, ) -> LabelWriteResult: """Copy current source metadata into the destination as a patch.""" - source_metadata = _remove_reserved_paths(self.current_metadata(source), RESERVED_LABEL_KEYS) + source_metadata = strip_reserved_system_labels(self.current_metadata(source)) destination_metadata = self.current_metadata(destination) merged = _deep_merge(destination_metadata, source_metadata) if merged == destination_metadata: @@ -229,7 +227,7 @@ def copy_metadata( @staticmethod def _reject_reserved_keys(metadata: dict[str, Any]) -> None: keys = {key for key, _value in flatten_label_metadata(metadata)} - reserved = sorted(keys.intersection(RESERVED_LABEL_KEYS)) + reserved = sorted(key for key in keys if is_reserved_system_label_path(key)) if reserved: joined = ", ".join(reserved) raise ValueError(f"Reserved label keys cannot be set manually: {joined}") @@ -351,27 +349,3 @@ def _deep_merge(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any else: merged[key] = value return merged - - -def _remove_reserved_paths(metadata: dict[str, Any], reserved_paths: set[str]) -> dict[str, Any]: - cleaned = json.loads(json.dumps(metadata)) - for path in reserved_paths: - _remove_nested(cleaned, path.split(".")) - return cleaned - - -def _remove_nested(root: dict[str, Any], path: list[str]) -> None: - if not path: - return - key = path[0] - if key not in root: - return - if len(path) == 1: - root.pop(key, None) - return - child = root.get(key) - if not isinstance(child, dict): - return - _remove_nested(child, path[1:]) - if not child: - root.pop(key, None) diff --git a/roar/application/query/show.py b/roar/application/query/show.py index 2b8a1afd..3552fa98 100644 --- a/roar/application/query/show.py +++ b/roar/application/query/show.py @@ -22,6 +22,7 @@ remote_artifact_fallback_enabled, run_local_then_remote_lookup, ) +from ..system_labels import omit_display_system_labels from .requests import ShowQueryRequest from .results import ( ShowArtifactComponentSummary, @@ -450,4 +451,6 @@ def _current_label_metadata( return None metadata = current.get("metadata") - return metadata if isinstance(metadata, dict) else None + if not isinstance(metadata, dict): + return None + return cast(dict[str, Any] | None, omit_display_system_labels(metadata)) diff --git a/roar/application/system_labels.py b/roar/application/system_labels.py new file mode 100644 index 00000000..5741bbce --- /dev/null +++ b/roar/application/system_labels.py @@ -0,0 +1,608 @@ +"""System-managed label derivation and refresh helpers.""" + +from __future__ import annotations + +import json +from collections.abc import Mapping +from typing import Any, cast + +from ..core.label_constants import AUTO_DATASET_LABEL_KEYS +from ..core.label_origins import LABEL_ORIGIN_SYSTEM +from ..db.context import optional_repo + +JOB_SYSTEM_LABEL_ROOT = "roar" +SYSTEM_LABEL_ROOT_PREFIXES = frozenset({JOB_SYSTEM_LABEL_ROOT}) +SYSTEM_LABEL_EXACT_PATHS = frozenset(AUTO_DATASET_LABEL_KEYS) +DISPLAY_FILTER_ROOT_PREFIXES = frozenset({JOB_SYSTEM_LABEL_ROOT}) + +_OPERATION_KEYS = ("get", "put", "osmo_submit", "osmo_attach") + + +def is_reserved_system_label_path(path: str) -> bool: + """Return True when a flattened label path is system-managed.""" + normalized = str(path or "").strip() + if not normalized: + return False + if normalized in SYSTEM_LABEL_EXACT_PATHS: + return True + return any( + normalized == prefix or normalized.startswith(prefix + ".") + for prefix in SYSTEM_LABEL_ROOT_PREFIXES + ) + + +def strip_reserved_system_labels(metadata: dict[str, Any]) -> dict[str, Any]: + """Return a metadata copy with all reserved system label keys removed.""" + cleaned = json.loads(json.dumps(metadata)) + for path in SYSTEM_LABEL_EXACT_PATHS: + _remove_nested(cleaned, path.split(".")) + for prefix in SYSTEM_LABEL_ROOT_PREFIXES: + _remove_nested(cleaned, [prefix]) + return cleaned + + +def omit_display_system_labels(metadata: dict[str, Any] | None) -> dict[str, Any] | None: + """Hide noisy system label roots from default user-facing renderers.""" + if not metadata: + return metadata + filtered = json.loads(json.dumps(metadata)) + for prefix in DISPLAY_FILTER_ROOT_PREFIXES: + _remove_nested(filtered, [prefix]) + return filtered or None + + +def refresh_job_system_labels( + db_ctx: object, *, job_id: int, job: Mapping[str, Any] | None = None +) -> None: + """Refresh the reserved system label portion for one local job.""" + labels_repo = optional_repo(db_ctx, "labels") + jobs_repo = optional_repo(db_ctx, "jobs") + if labels_repo is None or jobs_repo is None: + return + + job_row = dict(job) if job is not None else cast(Any, jobs_repo).get(job_id) + if not isinstance(job_row, dict): + return + + current = cast(Any, labels_repo).get_current("job", job_id=job_id) + current_metadata = current.get("metadata") if isinstance(current, dict) else {} + if not isinstance(current_metadata, dict): + current_metadata = {} + + user_metadata = strip_reserved_system_labels(current_metadata) + system_metadata = build_job_system_labels(job_row) + merged = _deep_merge(user_metadata, system_metadata) + if merged == current_metadata: + return + + cast(Any, labels_repo).create_version( + "job", + merged, + job_id=job_id, + write_origin=LABEL_ORIGIN_SYSTEM, + ) + + +def build_job_system_labels(job: Mapping[str, Any]) -> dict[str, Any]: + """Derive the system-managed job label document from a local job row.""" + metadata = _normalize_metadata(job.get("metadata")) + kind = _determine_operation_kind(job, metadata) + + roar: dict[str, Any] = { + "schema_version": 1, + "operation": {"kind": kind}, + } + + _populate_run_build_common_labels(roar, metadata) + _populate_task_labels(roar, job, metadata) + + if kind == "get": + _populate_get_labels(roar, metadata) + elif kind == "put": + _populate_put_labels(roar, metadata) + elif kind in {"osmo_submit", "osmo_attach"}: + _populate_osmo_labels(roar, metadata, kind=kind) + + return {JOB_SYSTEM_LABEL_ROOT: roar} + + +def _normalize_metadata(metadata: Any) -> dict[str, Any]: + if isinstance(metadata, dict): + return metadata + if isinstance(metadata, str): + try: + parsed = json.loads(metadata) + except (TypeError, ValueError): + return {} + return parsed if isinstance(parsed, dict) else {} + return {} + + +def _determine_operation_kind(job: Mapping[str, Any], metadata: dict[str, Any]) -> str: + for key in _OPERATION_KEYS: + if isinstance(metadata.get(key), dict): + return key + + job_type = str(job.get("job_type") or "").strip().lower() + if job_type == "build": + return "build" + if job_type in {"get", "put", "ray_task", "osmo_task"}: + return job_type + return "run" + + +def _populate_run_build_common_labels(roar: dict[str, Any], metadata: dict[str, Any]) -> None: + git = metadata.get("git") + if isinstance(git, dict): + _set_nested_value(roar, ["git", "commit"], git.get("commit")) + _set_nested_value(roar, ["git", "branch"], git.get("branch")) + _set_nested_value(roar, ["git", "remote_url"], git.get("remote_url")) + _set_nested_value(roar, ["git", "clean"], git.get("clean")) + _set_nested_value(roar, ["git", "commit_timestamp"], git.get("commit_timestamp")) + + _set_nested_value(roar, ["cwd"], metadata.get("cwd")) + + runtime = metadata.get("runtime") + if isinstance(runtime, dict): + _set_nested_value(roar, ["runtime", "hostname"], runtime.get("hostname")) + _copy_scalar_map(runtime.get("os"), roar, ["runtime", "os"]) + _copy_scalar_map(runtime.get("python"), roar, ["runtime", "python"]) + _copy_scalar_map(runtime.get("container"), roar, ["runtime", "container"]) + _copy_scalar_map(runtime.get("vm"), roar, ["runtime", "vm"]) + _copy_scalar_map(runtime.get("cuda"), roar, ["runtime", "cuda"]) + _copy_scalar_map(runtime.get("cpu"), roar, ["runtime", "cpu"]) + _copy_scalar_map(runtime.get("memory"), roar, ["runtime", "memory"]) + + gpu = runtime.get("gpu") + if isinstance(gpu, list) and gpu: + _set_nested_value(roar, ["runtime", "gpu", "count"], len(gpu)) + for index, row in enumerate(gpu): + if not isinstance(row, dict): + continue + for key in ("name", "memory_mb", "compute_cap"): + _set_nested_value( + roar, + ["runtime", "gpu", str(index), key], + row.get(key), + ) + + packages = metadata.get("packages") + if isinstance(packages, dict): + for manager, package_map in packages.items(): + if not isinstance(manager, str) or not isinstance(package_map, dict): + continue + for package_name, version in sorted(package_map.items()): + if not package_name: + continue + _set_nested_value( + roar, + ["packages", manager, str(package_name)], + version, + ) + + _populate_tracker_labels(roar, metadata) + _populate_dataset_identifier_labels(roar, metadata.get("dataset_identifiers")) + _populate_composite_labels(roar, metadata.get("composites"), prefix=["composites"]) + + +def _populate_tracker_labels(roar: dict[str, Any], metadata: dict[str, Any]) -> None: + analysis = metadata.get("analysis") + if not isinstance(analysis, dict): + return + experiment_tracking = analysis.get("experiment_tracking") + if not isinstance(experiment_tracking, dict): + return + + trackers_detected = experiment_tracking.get("trackers_detected") + trackers = ( + [str(item).strip() for item in trackers_detected if str(item).strip()] + if isinstance(trackers_detected, list) + else [] + ) + if trackers: + _set_nested_value(roar, ["tracker", "count"], len(trackers)) + for tracker in sorted(set(trackers)): + _set_nested_value(roar, ["tracker", "used", tracker], True) + + raw_runs = experiment_tracking.get("runs") + runs = ( + [item for item in raw_runs if isinstance(item, dict)] if isinstance(raw_runs, list) else [] + ) + if not runs: + return + + runs = sorted(runs, key=_tracker_run_sort_key) + for index, run in enumerate(runs): + for source_key, label_key in ( + ("tracker", "name"), + ("url", "url"), + ("run_id", "run_id"), + ("project", "project"), + ("entity", "entity"), + ("experiment_id", "experiment_id"), + ("run_name", "run_name"), + ("status", "status"), + ("start_time", "start_time"), + ("tracking_uri", "tracking_uri"), + ("runtime_seconds", "runtime_seconds"), + ): + _set_nested_value(roar, ["tracker", str(index), label_key], run.get(source_key)) + + by_name: dict[str, list[dict[str, Any]]] = {} + for run in runs: + tracker_name = str(run.get("tracker") or "").strip() + if not tracker_name: + continue + by_name.setdefault(tracker_name, []).append(run) + + for tracker_name, tracker_runs in sorted(by_name.items()): + _set_nested_value(roar, ["tracker", "by_name", tracker_name, "count"], len(tracker_runs)) + if len(tracker_runs) != 1: + continue + tracker_run = tracker_runs[0] + for source_key, label_key in ( + ("url", "url"), + ("project", "project"), + ("entity", "entity"), + ("run_id", "run_id"), + ): + _set_nested_value( + roar, + ["tracker", "by_name", tracker_name, label_key], + tracker_run.get(source_key), + ) + + +def _populate_dataset_identifier_labels(roar: dict[str, Any], raw: Any) -> None: + identifiers = [item for item in raw if isinstance(item, dict)] if isinstance(raw, list) else [] + if not identifiers: + return + + identifiers = sorted( + identifiers, + key=lambda item: ( + str(item.get("dataset_id") or ""), + str(item.get("split") or ""), + str(item.get("version_hint") or ""), + ), + ) + _set_nested_value(roar, ["datasets", "count"], len(identifiers)) + for index, identifier in enumerate(identifiers): + for source_key, label_key in ( + ("dataset_id", "id"), + ("dataset_fingerprint", "fingerprint"), + ("dataset_fingerprint_algorithm", "fingerprint_algorithm"), + ("confidence", "confidence"), + ("observed_paths", "observed_paths"), + ("split", "split"), + ("version_hint", "version_hint"), + ): + _set_nested_value(roar, ["datasets", str(index), label_key], identifier.get(source_key)) + + evidence = identifier.get("evidence") + evidence_values = ( + [str(item).strip() for item in evidence if str(item).strip()] + if isinstance(evidence, list) + else [] + ) + if evidence_values: + _set_nested_value( + roar, ["datasets", str(index), "evidence_count"], len(evidence_values) + ) + for evidence_index, value in enumerate(sorted(evidence_values)): + _set_nested_value( + roar, + ["datasets", str(index), "evidence", str(evidence_index)], + value, + ) + + +def _populate_composite_labels(roar: dict[str, Any], raw: Any, *, prefix: list[str]) -> None: + composites = [item for item in raw if isinstance(item, dict)] if isinstance(raw, list) else [] + if not composites: + return + + composites = sorted( + composites, + key=lambda item: (str(item.get("root_path") or ""), str(item.get("hash") or "")), + ) + _set_nested_value(roar, [*prefix, "count"], len(composites)) + for index, composite in enumerate(composites): + for source_key, label_key in ( + ("hash", "hash"), + ("root_path", "root_path"), + ("component_count_total", "component_count_total"), + ("component_count_stored", "component_count_stored"), + ("registered", "registered"), + ): + _set_nested_value(roar, [*prefix, str(index), label_key], composite.get(source_key)) + + +def _populate_task_labels( + roar: dict[str, Any], job: Mapping[str, Any], metadata: dict[str, Any] +) -> None: + kind = str(job.get("job_type") or "").strip().lower() + if kind not in {"ray_task", "osmo_task"} and not any( + key in metadata for key in ("task_identity", "task_id", "ray_task_id", "osmo_task_id") + ): + return + + _set_nested_value(roar, ["task", "identity"], metadata.get("task_identity")) + _set_nested_value( + roar, ["task", "backend"], metadata.get("backend") or job.get("execution_backend") + ) + _set_nested_value( + roar, + ["task", "id"], + metadata.get("task_id") or metadata.get("ray_task_id") or metadata.get("osmo_task_id"), + ) + _set_nested_value(roar, ["task", "name"], metadata.get("task_name")) + _set_nested_value( + roar, + ["task", "worker_id"], + metadata.get("worker_id") + or metadata.get("ray_worker_id") + or metadata.get("osmo_worker_id"), + ) + _set_nested_value( + roar, + ["task", "node_id"], + metadata.get("node_id") or metadata.get("ray_node_id") or metadata.get("osmo_node_id"), + ) + _set_nested_value( + roar, + ["task", "actor_id"], + metadata.get("actor_id") or metadata.get("ray_actor_id") or metadata.get("osmo_actor_id"), + ) + _set_nested_value(roar, ["task", "parent_job_uid"], metadata.get("parent_job_uid")) + + backend_metadata = metadata.get("backend_metadata") + if isinstance(backend_metadata, dict): + for source_key, label_key in ( + ("source_job_uid", "job_uid"), + ("source_execution_backend", "execution_backend"), + ("source_execution_role", "execution_role"), + ("source_job_type", "job_type"), + ): + _set_nested_value( + roar, + ["task", "source", label_key], + backend_metadata.get(source_key), + ) + + +def _populate_get_labels(roar: dict[str, Any], metadata: dict[str, Any]) -> None: + payload = metadata.get("get") + if not isinstance(payload, dict): + return + _set_nested_value(roar, ["get", "source_type"], payload.get("source_type")) + artifacts = payload.get("artifacts") + if isinstance(artifacts, dict): + _set_nested_value(roar, ["get", "artifact_count"], len(artifacts)) + _set_nested_value(roar, ["git", "commit"], payload.get("git_commit")) + _set_nested_value(roar, ["git", "tag"], payload.get("git_tag")) + + +def _populate_put_labels(roar: dict[str, Any], metadata: dict[str, Any]) -> None: + payload = metadata.get("put") + if not isinstance(payload, dict): + return + _set_nested_value(roar, ["put", "destination_type"], payload.get("destination_type")) + artifacts = payload.get("artifacts") + if isinstance(artifacts, dict): + _set_nested_value(roar, ["put", "artifact_count"], len(artifacts)) + _set_nested_value(roar, ["git", "commit"], payload.get("git_commit")) + _set_nested_value(roar, ["git", "tag"], payload.get("git_tag")) + _populate_dataset_identifier_labels(roar, payload.get("dataset_identifiers")) + _populate_composite_labels(roar, payload.get("composites"), prefix=["put", "composites"]) + _populate_composite_labels( + roar, + payload.get("lineage_composites"), + prefix=["put", "lineage_composites"], + ) + _set_nested_value( + roar, + ["put", "composites", "registered_count"], + _count_truthy(payload.get("composites"), key="registered"), + ) + _set_nested_value( + roar, + ["put", "lineage_composites", "registered_count"], + _count_truthy(payload.get("lineage_composites"), key="registered"), + ) + + +def _populate_osmo_labels(roar: dict[str, Any], metadata: dict[str, Any], *, kind: str) -> None: + payload = metadata.get(kind) + if not isinstance(payload, dict): + return + + operation = "submit" if kind == "osmo_submit" else "attach" + _set_nested_value(roar, ["osmo", "operation"], operation) + _set_nested_value(roar, ["osmo", "workflow_id"], payload.get("workflow_id")) + _set_nested_value(roar, ["osmo", "workflow_status"], payload.get("workflow_status")) + _set_nested_value(roar, ["osmo", "wait_for_completion"], payload.get("wait_for_completion")) + _set_nested_value( + roar, + ["osmo", "download_declared_outputs"], + payload.get("download_declared_outputs"), + ) + _set_nested_value( + roar, + ["osmo", "workflow_query_timed_out"], + payload.get("workflow_query_timed_out"), + ) + _set_nested_value(roar, ["git", "commit"], payload.get("git_commit")) + + submit_key = "submit" if kind == "osmo_submit" else "attach" + submit_payload = payload.get(submit_key) + if isinstance(submit_payload, dict): + _set_nested_value(roar, ["osmo", "pool"], submit_payload.get("pool")) + _set_nested_value(roar, ["osmo", "format_type"], submit_payload.get("format_type")) + _set_nested_value( + roar, + ["osmo", "dataset_hint_count"], + _safe_list_count(submit_payload.get("dataset_hints")), + ) + _set_nested_value( + roar, + ["osmo", "task_hint_count"], + _safe_list_count(submit_payload.get("task_name_hints")), + ) + prepared = submit_payload.get("prepared_workflow") + if isinstance(prepared, dict): + _set_nested_value( + roar, + ["osmo", "prepared_workflow", "wrapped_task_count"], + _safe_list_count(prepared.get("wrapped_tasks")), + ) + + downloaded_outputs = payload.get("downloaded_outputs") + outputs = ( + [item for item in downloaded_outputs if isinstance(item, dict)] + if isinstance(downloaded_outputs, list) + else [] + ) + if outputs: + outputs = sorted( + outputs, + key=lambda item: ( + str(item.get("dataset_name") or ""), + str(item.get("declared_path") or ""), + str(item.get("task_name") or ""), + ), + ) + _set_nested_value(roar, ["osmo", "download_count"], len(outputs)) + _set_nested_value(roar, ["osmo", "downloaded_outputs", "count"], len(outputs)) + for index, output in enumerate(outputs): + for source_key, label_key in ( + ("dataset_name", "dataset_name"), + ("declared_path", "declared_path"), + ("task_name", "task_name"), + ("file_count", "file_count"), + ): + _set_nested_value( + roar, + ["osmo", "downloaded_outputs", str(index), label_key], + output.get(source_key), + ) + + diagnostics = payload.get("workflow_diagnostics") + if isinstance(diagnostics, dict): + task_logs = diagnostics.get("task_logs") + _set_nested_value( + roar, + ["osmo", "diagnostics", "task_log_count"], + _safe_list_count(task_logs), + ) + + lineage = payload.get("lineage_reconstitution") + if isinstance(lineage, dict): + for source_key, label_key in ( + ("bundle_count", "bundle_count"), + ("fragments_processed", "fragments_processed"), + ("jobs_merged", "jobs_merged"), + ("artifacts_merged", "artifacts_merged"), + ): + _set_nested_value(roar, ["osmo", "lineage", label_key], lineage.get(source_key)) + + bundles = lineage.get("bundles") + bundle_rows = ( + [item for item in bundles if isinstance(item, dict)] + if isinstance(bundles, list) + else [] + ) + if bundle_rows: + bundle_rows = sorted( + bundle_rows, + key=lambda item: ( + str(item.get("dataset_name") or ""), + str(item.get("declared_path") or ""), + str(item.get("path") or ""), + ), + ) + _set_nested_value(roar, ["osmo", "lineage", "bundles", "count"], len(bundle_rows)) + for index, bundle in enumerate(bundle_rows): + for source_key, label_key in ( + ("dataset_name", "dataset_name"), + ("declared_path", "declared_path"), + ("task_name", "task_name"), + ("fragments", "fragments"), + ): + _set_nested_value( + roar, + ["osmo", "lineage", "bundles", str(index), label_key], + bundle.get(source_key), + ) + + +def _tracker_run_sort_key(run: dict[str, Any]) -> tuple[str, str, str, str]: + return ( + str(run.get("tracker") or ""), + str(run.get("url") or ""), + str(run.get("run_id") or ""), + str(run.get("project") or ""), + ) + + +def _safe_list_count(value: Any) -> int | None: + if not isinstance(value, list): + return None + return len(value) + + +def _count_truthy(value: Any, *, key: str) -> int | None: + if not isinstance(value, list): + return None + rows = [item for item in value if isinstance(item, dict)] + if not rows: + return 0 + return sum(1 for item in rows if item.get(key) is True) + + +def _copy_scalar_map(raw: Any, target: dict[str, Any], prefix: list[str]) -> None: + if not isinstance(raw, dict): + return + for key, value in raw.items(): + _set_nested_value(target, [*prefix, str(key)], value) + + +def _set_nested_value(target: dict[str, Any], path: list[str], value: Any) -> None: + if value is None: + return + cursor = target + for key in path[:-1]: + existing = cursor.get(key) + if not isinstance(existing, dict): + existing = {} + cursor[key] = existing + cursor = existing + cursor[path[-1]] = value + + +def _remove_nested(root: dict[str, Any], path: list[str]) -> None: + if not path: + return + key = path[0] + if key not in root: + return + if len(path) == 1: + root.pop(key, None) + return + child = root.get(key) + if not isinstance(child, dict): + return + _remove_nested(child, path[1:]) + if not child: + root.pop(key, None) + + +def _deep_merge(current: dict[str, Any], patch: dict[str, Any]) -> dict[str, Any]: + merged = json.loads(json.dumps(current)) + for key, value in patch.items(): + existing = merged.get(key) + if isinstance(existing, dict) and isinstance(value, dict): + merged[key] = _deep_merge(existing, value) + else: + merged[key] = value + return merged diff --git a/roar/backends/osmo/host_execution.py b/roar/backends/osmo/host_execution.py index 6723ef1d..0c0e72fd 100644 --- a/roar/backends/osmo/host_execution.py +++ b/roar/backends/osmo/host_execution.py @@ -14,6 +14,7 @@ import yaml # type: ignore[import-untyped] +from roar.application.system_labels import refresh_job_system_labels from roar.backends.osmo.config import load_osmo_backend_config from roar.backends.osmo.lineage import ( OsmoLineageReconstitutionResult, @@ -1425,6 +1426,11 @@ def _update_recorded_osmo_submit( ) -> None: if metadata is not None: db_ctx.jobs.update_metadata(job_id, metadata) + refresh_job_system_labels( + db_ctx, + job_id=job_id, + job=db_ctx.jobs.get(job_id), + ) artifact_id, _created = db_ctx.artifacts.register( hashes=receipt_artifact.hashes, diff --git a/roar/core/label_constants.py b/roar/core/label_constants.py new file mode 100644 index 00000000..3611c434 --- /dev/null +++ b/roar/core/label_constants.py @@ -0,0 +1,15 @@ +"""Shared constants for system-managed label paths.""" + +from __future__ import annotations + +AUTO_DATASET_LABEL_KEYS = frozenset( + { + "dataset.type", + "dataset.id", + "dataset.fingerprint", + "dataset.fingerprint_algorithm", + "dataset.split", + "dataset.version_hint", + "dataset.modality", + } +) diff --git a/roar/db/services/job_recording.py b/roar/db/services/job_recording.py index b321ea0a..131bb4e7 100644 --- a/roar/db/services/job_recording.py +++ b/roar/db/services/job_recording.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import Session as SASession +from ...application.system_labels import refresh_job_system_labels from ...core.label_origins import LABEL_ORIGIN_USER from ...core.step_name import STEP_NAME_LABEL_KEY, get_step_name_label from ..repositories import ( @@ -63,6 +64,16 @@ def __init__( self._hashing_service = hashing_service self._session_service = session_service + @property + def labels(self) -> SQLAlchemyLabelRepository: + """Expose label repo for shared system-label refresh helpers.""" + return self._label_repo + + @property + def jobs(self) -> SQLAlchemyJobRepository: + """Expose job repo for shared system-label refresh helpers.""" + return self._job_repo + def record_job( self, command: str, @@ -166,6 +177,12 @@ def record_job( telemetry=telemetry, ) + refresh_job_system_labels( + self, + job_id=job_id, + job=self._job_repo.get(job_id), + ) + if step_name: self._record_step_name_label(job_id, step_name) diff --git a/roar/execution/recording/dataset_metadata.py b/roar/execution/recording/dataset_metadata.py index 8fbca5f8..23dfb040 100644 --- a/roar/execution/recording/dataset_metadata.py +++ b/roar/execution/recording/dataset_metadata.py @@ -5,19 +5,15 @@ from typing import Any from urllib.parse import urlparse +from ...core.label_constants import AUTO_DATASET_LABEL_KEYS from .dataset_profile import build_dataset_profile -AUTO_DATASET_LABEL_KEYS = frozenset( - { - "dataset.type", - "dataset.id", - "dataset.fingerprint", - "dataset.fingerprint_algorithm", - "dataset.split", - "dataset.version_hint", - "dataset.modality", - } -) +__all__ = [ + "AUTO_DATASET_LABEL_KEYS", + "build_dataset_label_metadata", + "build_dataset_metadata", + "find_matching_identifier", +] def find_matching_identifier( diff --git a/roar/execution/recording/job_recording.py b/roar/execution/recording/job_recording.py index 4e360e7c..5da21238 100644 --- a/roar/execution/recording/job_recording.py +++ b/roar/execution/recording/job_recording.py @@ -24,6 +24,7 @@ from typing import TYPE_CHECKING, Any, ClassVar, cast from urllib.parse import urlparse +from ...application.system_labels import refresh_job_system_labels from ...core.label_origins import LABEL_ORIGIN_SYSTEM from ...db.context import optional_repo from ...db.hashing import hash_files_blake3 @@ -251,6 +252,12 @@ def record( is_input=False, ) + refresh_job_system_labels( + db_ctx, + job_id=job_id, + job=cast(Any, db_ctx.jobs).get(job_id), + ) + return job_id, recorded_job_uid @staticmethod @@ -905,3 +912,8 @@ def _update_job_metadata(db_ctx: Any, job_id: int, metadata_json: str | None) -> return with suppress(Exception): jobs_repo.update_metadata(job_id, metadata_json) + refresh_job_system_labels( + db_ctx, + job_id=job_id, + job=cast(Any, jobs_repo).get(job_id), + ) diff --git a/roar/presenters/dag_data_builder.py b/roar/presenters/dag_data_builder.py index 0a51b4f3..aec1099b 100644 --- a/roar/presenters/dag_data_builder.py +++ b/roar/presenters/dag_data_builder.py @@ -4,6 +4,7 @@ from typing import Any, cast +from ..application.system_labels import omit_display_system_labels from ..core.step_name import resolve_step_name from ..db.context import optional_repo from ..execution.framework.registry import ( @@ -422,4 +423,7 @@ def _current_labels( if not isinstance(current, dict): return {} metadata = current.get("metadata") - return metadata if isinstance(metadata, dict) else {} + if not isinstance(metadata, dict): + return {} + filtered = omit_display_system_labels(metadata) + return filtered if isinstance(filtered, dict) else {} diff --git a/tests/happy_path/test_label_command.py b/tests/happy_path/test_label_command.py index 17d692d2..08e51f16 100644 --- a/tests/happy_path/test_label_command.py +++ b/tests/happy_path/test_label_command.py @@ -372,8 +372,17 @@ def test_run_name_creates_job_name_label_without_persisting_legacy_step_name( assert preprocess_node["step_name"] == "preprocess" assert preprocess_node["labels"] == {"name": "preprocess"} - assert _job_label_rows(temp_git_repo, 1) == [(1, {"name": "preprocess"})] - assert _job_label_write_origins(temp_git_repo, 1) == [(1, "user")] + rows = _job_label_rows(temp_git_repo, 1) + assert rows[0][0] == 1 + assert rows[0][1]["roar"]["operation"]["kind"] == "run" + assert rows[1] == ( + 2, + { + **rows[0][1], + "name": "preprocess", + }, + ) + assert _job_label_write_origins(temp_git_repo, 1) == [(1, "system"), (2, "user")] db_path = temp_git_repo / ".roar" / "roar.db" with sqlite3.connect(db_path) as conn: diff --git a/tests/unit/test_label_service.py b/tests/unit/test_label_service.py index 37d8a10c..817e293f 100644 --- a/tests/unit/test_label_service.py +++ b/tests/unit/test_label_service.py @@ -20,6 +20,19 @@ def test_reject_reserved_keys_blocks_system_managed_dataset_labels() -> None: ) +def test_reject_reserved_keys_blocks_system_managed_roar_labels() -> None: + with pytest.raises(ValueError, match="Reserved label keys cannot be set manually"): + LabelService._reject_reserved_keys( + { + "roar": { + "git": { + "commit": "deadbeef", + } + } + } + ) + + def test_build_current_key_origins_replays_user_and_system_versions() -> None: history = [ { diff --git a/tests/unit/test_system_labels.py b/tests/unit/test_system_labels.py new file mode 100644 index 00000000..a6334b0a --- /dev/null +++ b/tests/unit/test_system_labels.py @@ -0,0 +1,234 @@ +from __future__ import annotations + +import json +from pathlib import Path + +from roar.application.system_labels import ( + build_job_system_labels, + omit_display_system_labels, + refresh_job_system_labels, +) +from roar.db.context import create_database_context +from roar.execution.recording import LocalJobRecorder, LocalRecordedArtifact + + +def test_build_job_system_labels_derives_common_runtime_tracker_and_dataset_fields() -> None: + labels = build_job_system_labels( + { + "job_type": "run", + "metadata": { + "git": { + "commit": "deadbeef", + "branch": "main", + "remote_url": "https://github.com/treqs/roar.git", + "clean": True, + "commit_timestamp": "2026-04-16T00:00:00+00:00", + }, + "cwd": "experiments/demo", + "runtime": { + "hostname": "host-1", + "os": {"system": "Linux", "machine": "x86_64"}, + "python": {"version": "3.12.2", "implementation": "CPython"}, + "gpu": [{"name": "NVIDIA A100", "memory_mb": 81920, "compute_cap": "8.0"}], + "cpu": {"count": 32, "model": "AMD EPYC"}, + "memory": {"total_mb": 262144}, + }, + "packages": {"pip": {"numpy": "1.26.4"}, "dpkg": {"libc6": "2.35"}}, + "analysis": { + "experiment_tracking": { + "trackers_detected": ["wandb"], + "runs": [ + { + "tracker": "wandb", + "entity": "acme", + "project": "forecast", + "run_id": "abc123", + "url": "https://wandb.ai/acme/forecast/runs/abc123", + } + ], + } + }, + "dataset_identifiers": [ + { + "dataset_id": "file:///data/train", + "dataset_fingerprint": "a1b2c3d4", + "dataset_fingerprint_algorithm": "blake3", + "confidence": 0.91, + "observed_paths": 3, + "split": "train", + "version_hint": "v2", + "evidence": ["explicit_root_hint", "partition_collapse"], + } + ], + "composites": [ + { + "hash": "f" * 64, + "root_path": "/repo/outputs/dataset", + "component_count_total": 2, + "component_count_stored": 2, + } + ], + }, + } + ) + + roar = labels["roar"] + assert roar["schema_version"] == 1 + assert roar["operation"]["kind"] == "run" + assert roar["git"]["commit"] == "deadbeef" + assert roar["cwd"] == "experiments/demo" + assert roar["runtime"]["python"]["version"] == "3.12.2" + assert roar["runtime"]["gpu"]["count"] == 1 + assert roar["runtime"]["gpu"]["0"]["name"] == "NVIDIA A100" + assert roar["packages"]["pip"]["numpy"] == "1.26.4" + assert roar["tracker"]["0"]["url"] == "https://wandb.ai/acme/forecast/runs/abc123" + assert roar["tracker"]["by_name"]["wandb"]["url"] == ( + "https://wandb.ai/acme/forecast/runs/abc123" + ) + assert roar["datasets"]["0"]["id"] == "file:///data/train" + assert roar["composites"]["0"]["root_path"] == "/repo/outputs/dataset" + + +def test_omit_display_system_labels_removes_roar_root_only() -> None: + metadata = { + "name": "preprocess", + "phase": "train", + "roar": {"operation": {"kind": "run"}}, + "dataset": {"type": "dataset"}, + } + + filtered = omit_display_system_labels(metadata) + + assert filtered == { + "name": "preprocess", + "phase": "train", + "dataset": {"type": "dataset"}, + } + + +def test_refresh_job_system_labels_preserves_user_labels_and_replaces_reserved_prefix( + tmp_path: Path, +) -> None: + roar_dir = tmp_path / ".roar" + roar_dir.mkdir() + + with create_database_context(roar_dir) as db_ctx: + session_id = db_ctx.sessions.create(make_active=True) + job_id, _job_uid = db_ctx.jobs.create( + command="roar get s3://bucket/model.pt", + timestamp=1700000000.0, + session_id=session_id, + step_number=1, + metadata=json.dumps( + { + "get": { + "source": "s3://bucket/model.pt", + "source_type": "s3", + "artifacts": {"model.pt": "s3://bucket/model.pt"}, + "git_commit": "deadbeef", + "git_tag": None, + "timestamp": 123.0, + } + } + ), + execution_backend="local", + execution_role="host", + job_type="get", + exit_code=0, + ) + db_ctx.labels.create_version( + "job", + { + "phase": "download", + "roar": {"operation": {"kind": "stale"}, "get": {"source_type": "old"}}, + }, + job_id=job_id, + write_origin="user", + ) + + db_ctx.jobs.update_metadata( + job_id, + json.dumps( + { + "put": { + "destination": "s3://bucket/out", + "destination_type": "s3", + "artifacts": {"artifact-1": "s3://bucket/out"}, + "composites": [ + {"hash": "a" * 64, "root_path": "/repo/out", "registered": True} + ], + "lineage_composites": [], + "dataset_identifiers": [], + "git_commit": "feedface", + "git_tag": "put/feedface", + "timestamp": 456.0, + } + } + ), + ) + refresh_job_system_labels(db_ctx, job_id=job_id) + + current = db_ctx.labels.get_current("job", job_id=job_id) + + assert current is not None + metadata = current["metadata"] + assert metadata["phase"] == "download" + assert metadata["roar"]["operation"]["kind"] == "put" + assert metadata["roar"]["put"]["destination_type"] == "s3" + assert metadata["roar"]["put"]["artifact_count"] == 1 + assert metadata["roar"]["git"]["commit"] == "feedface" + assert metadata["roar"]["git"]["tag"] == "put/feedface" + assert metadata["roar"]["put"]["composites"]["registered_count"] == 1 + assert metadata["roar"]["put"]["composites"]["0"]["root_path"] == "/repo/out" + + +def test_local_job_recorder_records_get_system_labels(tmp_path: Path) -> None: + repo_root = tmp_path / "repo" + repo_root.mkdir() + roar_dir = repo_root / ".roar" + roar_dir.mkdir() + downloaded_file = repo_root / "downloaded.bin" + downloaded_file.write_bytes(b"payload") + + with create_database_context(roar_dir) as db_ctx: + session_id = db_ctx.sessions.create(make_active=True) + recorder = LocalJobRecorder() + job_id, _job_uid = recorder.record( + db_ctx, + command="roar get s3://bucket/downloaded.bin", + timestamp=1700000000.0, + metadata=json.dumps( + { + "get": { + "source": "s3://bucket/downloaded.bin", + "source_type": "s3", + "message": "download", + "artifacts": {str(downloaded_file): "s3://bucket/downloaded.bin"}, + "git_commit": "deadbeef", + "git_tag": None, + "timestamp": 123.0, + } + } + ), + execution_backend="local", + execution_role="host", + job_type="get", + session_id=session_id, + output_artifacts=[ + LocalRecordedArtifact( + path=str(downloaded_file), + hashes={"blake3": "abc123"}, + size=downloaded_file.stat().st_size, + ) + ], + exit_code=0, + ) + + current = db_ctx.labels.get_current("job", job_id=job_id) + + assert current is not None + metadata = current["metadata"] + assert metadata["roar"]["operation"]["kind"] == "get" + assert metadata["roar"]["get"]["source_type"] == "s3" + assert metadata["roar"]["get"]["artifact_count"] == 1 + assert metadata["roar"]["git"]["commit"] == "deadbeef" From d0f897e4a377a6740351b5d5b2b1a2fd5c02909e Mon Sep 17 00:00:00 2001 From: Trevor Basinger Date: Fri, 17 Apr 2026 14:47:37 +0000 Subject: [PATCH 2/5] fix(labels): refresh task labels after fragment merge --- roar/application/system_labels.py | 6 ++- roar/execution/fragments/lineage.py | 36 +++++++++++++++++ .../ray/unit/test_collector_fragments.py | 39 +++++++++++++++++++ tests/backends/test_osmo_host_execution.py | 8 ++++ tests/unit/test_system_labels.py | 22 +++++++++++ 5 files changed, 110 insertions(+), 1 deletion(-) diff --git a/roar/application/system_labels.py b/roar/application/system_labels.py index 5741bbce..4cf01b91 100644 --- a/roar/application/system_labels.py +++ b/roar/application/system_labels.py @@ -354,7 +354,11 @@ def _populate_task_labels( ["task", "actor_id"], metadata.get("actor_id") or metadata.get("ray_actor_id") or metadata.get("osmo_actor_id"), ) - _set_nested_value(roar, ["task", "parent_job_uid"], metadata.get("parent_job_uid")) + _set_nested_value( + roar, + ["task", "parent_job_uid"], + metadata.get("parent_job_uid") or job.get("parent_job_uid"), + ) backend_metadata = metadata.get("backend_metadata") if isinstance(backend_metadata, dict): diff --git a/roar/execution/fragments/lineage.py b/roar/execution/fragments/lineage.py index e6a19075..535d1c9f 100644 --- a/roar/execution/fragments/lineage.py +++ b/roar/execution/fragments/lineage.py @@ -8,6 +8,7 @@ from collections import deque from collections.abc import Callable, Mapping from dataclasses import dataclass +from pathlib import Path from typing import Any from roar.execution.fragments.models import ( @@ -31,6 +32,12 @@ class FragmentLineageBackend: task_identity_from_metadata: Callable[[str, str, Mapping[str, Any]], str] +def _get_logger(): + from roar.core.logging import get_logger + + return get_logger() + + def merge_execution_fragments( *, fragments: list[ExecutionFragment], @@ -49,6 +56,8 @@ def merge_execution_fragments( conn.execute("PRAGMA foreign_keys = ON") conn.execute("PRAGMA journal_mode = WAL") + touched_job_ids: set[int] = set() + committed = False try: artifact_columns = { row["name"] for row in conn.execute("PRAGMA table_info(artifacts)").fetchall() @@ -92,6 +101,7 @@ def merge_execution_fragments( if row is None: continue job_id = int(row["id"]) + touched_job_ids.add(job_id) written_artifact_ids: dict[str, str] = {} @@ -131,9 +141,35 @@ def merge_execution_fragments( ) conn.commit() + committed = True finally: conn.close() + if committed: + _refresh_fragment_job_system_labels(project_dir=project_dir, job_ids=touched_job_ids) + + +def _refresh_fragment_job_system_labels(*, project_dir: str, job_ids: set[int]) -> None: + if not job_ids: + return + + try: + from roar.application.system_labels import refresh_job_system_labels + from roar.db.context import create_database_context + except Exception: + return + + try: + with create_database_context(Path(project_dir) / ".roar") as db_ctx: + for job_id in sorted(job_ids): + refresh_job_system_labels(db_ctx, job_id=job_id) + except Exception as exc: + _get_logger().warning( + "Failed to refresh fragment job system labels in %s: %s", + project_dir, + exc, + ) + def assign_execution_fragment_step_numbers( fragments: list[ExecutionFragment], diff --git a/tests/backends/ray/unit/test_collector_fragments.py b/tests/backends/ray/unit/test_collector_fragments.py index caccae15..8821b6d8 100644 --- a/tests/backends/ray/unit/test_collector_fragments.py +++ b/tests/backends/ray/unit/test_collector_fragments.py @@ -6,6 +6,7 @@ from roar.backends.ray import collector as ray_collector from roar.backends.ray.collector import collect_fragments from roar.backends.ray.fragment import ArtifactRef, TaskFragment +from roar.db.context import create_database_context from roar.db.schema import SCHEMA @@ -281,6 +282,44 @@ def test_collect_fragments_persists_artifact_size_from_fragment_refs(tmp_path: P conn.close() +def test_collect_fragments_refreshes_ray_task_system_labels(tmp_path: Path) -> None: + project_dir = tmp_path / "project" + _init_db(project_dir) + + fragment = TaskFragment( + job_uid="raylabels1", + parent_job_uid="driver-main", + ray_task_id="task-labels", + ray_worker_id="worker-7", + ray_node_id="node-2", + ray_actor_id=None, + function_name="process", + started_at=1.0, + ended_at=2.0, + exit_code=0, + writes=[_ref("a" * 64)], + ) + + collect_fragments( + fragments=[fragment.to_dict()], + project_dir=str(project_dir), + driver_job_uid="driver-main", + ) + + with create_database_context(project_dir / ".roar") as db_ctx: + job = db_ctx.jobs.get_by_uid(fragment.job_uid) + assert job is not None + current = db_ctx.labels.get_current("job", job_id=int(job["id"])) + + assert current is not None + assert current["write_origin"] == "system" + metadata = current["metadata"] + assert metadata["roar"]["operation"]["kind"] == "ray_task" + assert metadata["roar"]["task"]["backend"] == "ray" + assert metadata["roar"]["task"]["id"] == "task-labels" + assert metadata["roar"]["task"]["parent_job_uid"] == "driver-main" + + def test_collect_fragments_shortens_command_to_task_family_and_keeps_full_script( tmp_path: Path, ) -> None: diff --git a/tests/backends/test_osmo_host_execution.py b/tests/backends/test_osmo_host_execution.py index 7fc9d3d8..889e40e2 100644 --- a/tests/backends/test_osmo_host_execution.py +++ b/tests/backends/test_osmo_host_execution.py @@ -551,11 +551,19 @@ def _run(command, *args, **kwargs): with create_database_context(roar_dir) as db_ctx: child_outputs = db_ctx.jobs.get_outputs(int(child_job["id"])) child_inputs = db_ctx.jobs.get_inputs(int(child_job["id"])) + child_label = db_ctx.labels.get_current("job", job_id=int(child_job["id"])) assert [entry["path"] for entry in child_inputs] == [str(workflow_path)] assert [entry["path"] for entry in child_outputs] == [ str(repo_root / "outputs" / "worker-output.txt") ] + assert child_label is not None + assert child_label["write_origin"] == "system" + child_label_metadata = child_label["metadata"] + assert child_label_metadata["roar"]["operation"]["kind"] == "osmo_task" + assert child_label_metadata["roar"]["task"]["backend"] == "osmo" + assert child_label_metadata["roar"]["task"]["id"] == "basic-task" + assert child_label_metadata["roar"]["task"]["parent_job_uid"] == result.job_uid output_paths = {Path(str(entry["path"])) for entry in result.outputs} assert any(path.name == "roar-fragments.json" for path in output_paths) diff --git a/tests/unit/test_system_labels.py b/tests/unit/test_system_labels.py index a6334b0a..76a24d6a 100644 --- a/tests/unit/test_system_labels.py +++ b/tests/unit/test_system_labels.py @@ -106,6 +106,28 @@ def test_omit_display_system_labels_removes_roar_root_only() -> None: } +def test_build_job_system_labels_uses_job_row_parent_for_task_labels() -> None: + labels = build_job_system_labels( + { + "job_type": "ray_task", + "execution_backend": "ray", + "parent_job_uid": "driver-main", + "metadata": { + "task_identity": "identity-1", + "ray_task_id": "task-1", + "ray_worker_id": "worker-1", + "ray_node_id": "node-1", + }, + } + ) + + roar = labels["roar"] + assert roar["operation"]["kind"] == "ray_task" + assert roar["task"]["backend"] == "ray" + assert roar["task"]["id"] == "task-1" + assert roar["task"]["parent_job_uid"] == "driver-main" + + def test_refresh_job_system_labels_preserves_user_labels_and_replaces_reserved_prefix( tmp_path: Path, ) -> None: From 9f6ef2080e84f158285a3f2e444e1704dc6a6359 Mon Sep 17 00:00:00 2001 From: Trevor Basinger Date: Fri, 17 Apr 2026 14:59:20 +0000 Subject: [PATCH 3/5] fix(ray): refresh labels after composite reconstitution --- roar/backends/ray/fragment_reconstituter.py | 2 + .../ray/unit/test_fragment_reconstituter.py | 90 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/roar/backends/ray/fragment_reconstituter.py b/roar/backends/ray/fragment_reconstituter.py index cd42e375..a77adf32 100644 --- a/roar/backends/ray/fragment_reconstituter.py +++ b/roar/backends/ray/fragment_reconstituter.py @@ -12,6 +12,7 @@ from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from roar.application.system_labels import refresh_job_system_labels from roar.db.context import create_database_context from .collector import _resolve_active_session_context, collect_fragments @@ -507,6 +508,7 @@ def _materialize_reconstituted_composites(self, fragments: list[dict[str, Any]]) materialized, ) db_ctx.jobs.update_metadata(job_id, metadata_json) + refresh_job_system_labels(db_ctx, job_id=job_id) except Exception as exc: _get_logger().warning( "Failed to materialize composite artifacts during fragment reconstitution for session %s: %s", diff --git a/tests/backends/ray/unit/test_fragment_reconstituter.py b/tests/backends/ray/unit/test_fragment_reconstituter.py index f13ee891..1b94df54 100644 --- a/tests/backends/ray/unit/test_fragment_reconstituter.py +++ b/tests/backends/ray/unit/test_fragment_reconstituter.py @@ -9,6 +9,8 @@ import pytest from cryptography.hazmat.primitives.ciphers.aead import AESGCM +from roar.application.system_labels import refresh_job_system_labels +from roar.db.context import create_database_context from roar.execution.fragments.sessions import ( generate_fragment_session, load_fragment_session, @@ -598,3 +600,91 @@ def _fake_urlopen(request: urllib.request.Request, timeout: int = 0): ).reconstitute() assert key_path.exists() + + +def test_materialized_reconstituted_composites_refresh_job_system_labels( + monkeypatch: pytest.MonkeyPatch, + tmp_path: Path, +) -> None: + import roar.execution.recording as execution_recording + + module = _module() + repo_root = tmp_path / "repo" + repo_root.mkdir() + roar_dir = repo_root / ".roar" + roar_dir.mkdir() + output_dir = repo_root / "outputs" + output_dir.mkdir() + output_path = output_dir / "result.txt" + output_path.write_text("ok\n", encoding="utf-8") + + with create_database_context(roar_dir) as db_ctx: + session_id = db_ctx.sessions.create(make_active=True) + job_id, job_uid = db_ctx.jobs.create( + command="ray_task:basic", + timestamp=1.0, + job_uid="ray-task-1", + session_id=session_id, + step_number=2, + execution_backend="ray", + execution_role="task", + job_type="ray_task", + metadata=json.dumps( + { + "task_identity": "identity-1", + "backend": "ray", + "ray_task_id": "task-1", + "ray_worker_id": "worker-1", + "ray_node_id": "node-1", + "task_name": "basic", + } + ), + ) + artifact_id, _created = db_ctx.artifacts.register( + hashes={"blake3": "a" * 64}, + size=output_path.stat().st_size, + path=str(output_path), + ) + db_ctx.jobs.add_output(job_id, artifact_id, str(output_path)) + refresh_job_system_labels(db_ctx, job_id=job_id) + before = db_ctx.labels.get_current("job", job_id=job_id) + + assert job_uid == "ray-task-1" + assert before is not None + assert "composites" not in before["metadata"]["roar"] + + monkeypatch.setattr( + execution_recording.RunCompositeMaterializationConfig, + "from_repo_root", + classmethod( + lambda cls, repo_root: cls( + enabled=True, + min_confidence=0.0, + min_components=1, + max_roots_per_job=1, + ) + ), + ) + monkeypatch.setattr( + execution_recording.DatasetIdentifierInferer, + "infer", + lambda self, paths, repo_root, min_confidence=0.0: [ + {"dataset_id": str(output_dir), "confidence": 1.0} + ], + ) + + reconstituter = module.FragmentReconstituter( + session_id="session-composites", + token="ab" * 32, + glaas_url="http://localhost:3001", + roar_db_path=roar_dir / "roar.db", + ) + reconstituter._materialize_reconstituted_composites([{"job_uid": job_uid}]) + + with create_database_context(roar_dir) as db_ctx: + after = db_ctx.labels.get_current("job", job_id=job_id) + + assert after is not None + composites = after["metadata"]["roar"]["composites"] + assert composites["count"] == 1 + assert composites["0"]["root_path"] == str(output_dir) From 98b9661e30208dbd4461e07096edc9aa6d8cc52a Mon Sep 17 00:00:00 2001 From: Trevor Basinger Date: Fri, 17 Apr 2026 15:38:22 +0000 Subject: [PATCH 4/5] fix(put): sync publish job system labels --- roar/application/publish/put_execution.py | 38 +++++++++++++++- roar/application/publish/registration.py | 4 +- tests/integration/test_put_cli_integration.py | 15 ++++++- tests/unit/put/test_put_service.py | 44 +++++++++++++++++++ 4 files changed, 96 insertions(+), 5 deletions(-) diff --git a/roar/application/publish/put_execution.py b/roar/application/publish/put_execution.py index 0eb4e266..60c3ace7 100644 --- a/roar/application/publish/put_execution.py +++ b/roar/application/publish/put_execution.py @@ -23,7 +23,9 @@ normalize_registration_source_type, prepare_batch_registration_artifacts, register_publish_lineage, + sync_publish_labels, ) +from ...application.system_labels import refresh_job_system_labels from ...core.interfaces.registration import GitContext from ...core.logging import get_logger from ...db.context import DatabaseContext @@ -398,6 +400,7 @@ def put_prepared( job_type="put", exit_code=0, ) + refresh_job_system_labels(self._db, job_id=job_id) self._logger.debug( "Job created: id=%s, uid=%s, step=%d", job_id, @@ -414,7 +417,7 @@ def put_prepared( ) with Spinner("Finalizing lineage links...") as spin: spin.update("Registering put job...") - self._register_put_job_with_glaas( + put_job_registered = self._register_put_job_with_glaas( coordinator=coordinator, command=command, session_hash=session_hash_value, @@ -434,6 +437,15 @@ def put_prepared( composite_registrations=composite_registrations, registration_errors=registration_result.errors, ) + if put_job_registered: + spin.update("Syncing put job labels...") + self._sync_put_job_labels_with_glaas( + glaas_client=client, + session_hash=session_hash_value, + job_id=job_id, + job_uid=job_uid, + registration_errors=registration_result.errors, + ) registration_error = ( "; ".join(registration_result.errors) if registration_result.errors else None @@ -552,7 +564,7 @@ def _register_put_job_with_glaas( step_number: int, metadata_json: str, registration_errors: list[str], - ) -> None: + ) -> bool: """Create the put sink node in GLaaS.""" self._logger.debug("Registering put job with GLaaS: job_uid=%s, job_type=put", job_uid) put_job_result = coordinator.job_service.create_job( @@ -572,6 +584,28 @@ def _register_put_job_with_glaas( self._logger.debug("Put job GLaaS registration failed: %s", put_job_result.error) if put_job_result.error: registration_errors.append(f"Put job: {put_job_result.error}") + return False + return True + + def _sync_put_job_labels_with_glaas( + self, + *, + glaas_client: Any, + session_hash: str, + job_id: int, + job_uid: str, + registration_errors: list[str], + ) -> None: + """Sync the local current label document for the publish-time put job.""" + sync_publish_labels( + glaas_client=glaas_client, + db_ctx=self._db, + session_id=None, + session_hash=session_hash, + jobs=[{"id": job_id, "job_uid": job_uid}], + artifacts=[], + errors=registration_errors, + ) def _link_put_job_artifacts_with_glaas( self, diff --git a/roar/application/publish/registration.py b/roar/application/publish/registration.py index 12ba0d8b..1c837f8c 100644 --- a/roar/application/publish/registration.py +++ b/roar/application/publish/registration.py @@ -415,13 +415,13 @@ def sync_publish_labels( *, glaas_client: GlaasClient, db_ctx: Any, - session_id: int, + session_id: int | None, session_hash: str, jobs: list[dict[str, Any]], artifacts: list[dict[str, Any]], errors: list[str] | None = None, ) -> None: - """Sync publish labels to GLaaS and record any error on the supplied list.""" + """Sync current local labels for published entities to GLaaS.""" payloads = collect_label_sync_payloads( db_ctx, session_id=session_id, diff --git a/tests/integration/test_put_cli_integration.py b/tests/integration/test_put_cli_integration.py index 38ec0787..6b3a2795 100644 --- a/tests/integration/test_put_cli_integration.py +++ b/tests/integration/test_put_cli_integration.py @@ -141,7 +141,20 @@ def test_put_registers_lineage_with_fake_glaas_and_updates_local_dag( batch_jobs = fake_glaas_publish_server.job_batches[0]["jobs"] assert any(job.get("job_type") == "run" for job in batch_jobs) - assert fake_glaas_publish_server.job_creates[0]["job"]["job_type"] == "put" + put_job = fake_glaas_publish_server.job_creates[0]["job"] + assert put_job["job_type"] == "put" + + synced_labels = [ + label + for batch in fake_glaas_publish_server.label_syncs + for label in batch + if label.get("entity_type") == "job" and label.get("job_uid") == put_job["job_uid"] + ] + assert len(synced_labels) == 1 + put_job_label = synced_labels[0] + assert put_job_label["metadata"]["roar"]["operation"]["kind"] == "put" + assert put_job_label["metadata"]["roar"]["put"]["destination_type"] == "s3" + assert put_job_label["key_origins"]["roar.operation.kind"] == "system" def test_put_dry_run_does_not_create_local_or_remote_publish_jobs( diff --git a/tests/unit/put/test_put_service.py b/tests/unit/put/test_put_service.py index 48759adb..cbf92b92 100644 --- a/tests/unit/put/test_put_service.py +++ b/tests/unit/put/test_put_service.py @@ -161,6 +161,50 @@ def test_put_prepared_single_file_creates_job(self, tmp_path: Path) -> None: assert call_kwargs["job_type"] == "put" service._db.jobs.add_input.assert_called_once() + def test_put_prepared_refreshes_and_syncs_put_job_labels(self, tmp_path: Path) -> None: + model_file = tmp_path / "model.pt" + model_file.write_bytes(b"model data") + + db = _create_mock_db() + client = _create_mock_glaas_client() + service = PutService( + db_context=db, + backend=MemoryBackend(bucket="test-bucket", prefix="models"), + destination="memory://test-bucket/models", + repo_root=tmp_path, + lineage_collector=MagicMock(return_value=LineageData()), + registration_coordinator=_create_mock_coordinator(), + ) + service._lineage_collector.collect.return_value = LineageData( + jobs=[], + artifacts=[], + artifact_hashes=set(), + pipeline={"id": 1}, + ) + + with ( + patch( + "roar.application.publish.put_execution.refresh_job_system_labels" + ) as refresh_labels, + patch("roar.application.publish.put_execution.sync_publish_labels") as sync_labels, + ): + result = service.put_prepared( + prepared=_prepared_put(tmp_path, sources=[model_file], glaas_client=client), + sources=[str(model_file)], + message="publish model", + ) + + assert result.success is True + refresh_labels.assert_called_once_with(db, job_id=42) + sync_labels.assert_called_once() + sync_kwargs = sync_labels.call_args.kwargs + assert sync_kwargs["glaas_client"] is client + assert sync_kwargs["db_ctx"] is db + assert sync_kwargs["session_id"] is None + assert sync_kwargs["session_hash"] == "session_hash_abc123" + assert sync_kwargs["jobs"] == [{"id": 42, "job_uid": "job-uid-1"}] + assert sync_kwargs["artifacts"] == [] + def test_put_prepared_returns_registered_session_info(self, tmp_path: Path) -> None: model_file = tmp_path / "model.pt" model_file.write_bytes(b"model data") From 9cce389b75447541725b859c6ddd985e91d4222b Mon Sep 17 00:00:00 2001 From: Trevor Basinger Date: Fri, 17 Apr 2026 15:48:12 +0000 Subject: [PATCH 5/5] test(labels): expect synced job system fields --- tests/live_glaas/test_labels_live.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/tests/live_glaas/test_labels_live.py b/tests/live_glaas/test_labels_live.py index 1066cbcf..4522c56d 100644 --- a/tests/live_glaas/test_labels_live.py +++ b/tests/live_glaas/test_labels_live.py @@ -274,6 +274,20 @@ def _remote_session_label_rows( ) +def _assert_synced_run_job_label_metadata( + metadata: dict[str, object], + *, + phase: str, +) -> None: + assert metadata.get("phase") == phase + roar = metadata.get("roar") + assert isinstance(roar, dict), metadata + assert roar.get("schema_version") == 1 + operation = roar.get("operation") + assert isinstance(operation, dict), roar + assert operation.get("kind") == "run" + + def test_register_syncs_current_local_labels_only_when_register_called( glaas_configured: Path, glaas_db_queryable, @@ -311,9 +325,11 @@ def test_register_syncs_current_local_labels_only_when_register_called( assert _remote_session_label_rows(glaas_url, session_hash) == [ (1, {"experiment": "ablation-7", "project": "forecasting"}) ] - assert _remote_job_label_rows(glaas_url, session_hash, job_uid) == [ - (1, {"phase": "preprocess"}) - ] + job_rows = _remote_job_label_rows(glaas_url, session_hash, job_uid) + assert len(job_rows) == 1 + version, job_metadata = job_rows[0] + assert version == 1 + _assert_synced_run_job_label_metadata(job_metadata, phase="preprocess") assert _remote_artifact_label_rows(glaas_url, artifact_hash) == [ (1, {"owner": "ml", "stage": "gold"}) ] @@ -398,11 +414,13 @@ def test_register_exposes_current_labels_via_label_api( "id": job_label["id"], "entityType": "job", "version": 1, - "metadata": {"phase": "preprocess"}, + "metadata": job_label["metadata"], "createdAt": job_label["createdAt"], "sessionHash": session_hash, "jobUid": job_uid, } + assert isinstance(job_label["metadata"], dict) + _assert_synced_run_job_label_metadata(job_label["metadata"], phase="preprocess") artifact_label = _label_api_get( glaas_url,