Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 20 additions & 8 deletions roar/application/labels.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from ..db.context import DatabaseContext
from .label_rendering import flatten_label_metadata
from .publish.lineage import LineageCollector
from .publish.remote_job_uids import build_remote_publication_job_uid
from .system_labels import is_reserved_system_label_path, strip_reserved_system_labels


Expand Down Expand Up @@ -251,6 +252,7 @@ def build_remote_label_mutation_payload(
roar_dir: Path,
target: LabelTargetRef,
metadata: dict[str, Any],
prefer_remote_publication_uid: bool = True,
) -> dict[str, Any]:
"""Build a GLaaS label-mutation payload for one local target."""
if target.entity_type == "dag":
Expand Down Expand Up @@ -278,14 +280,20 @@ def build_remote_label_mutation_payload(
raise ValueError("Job target is missing a local session id.")
if not isinstance(job_uid, str) or not job_uid:
raise ValueError("Job target is missing a job UID.")
session_hash = _canonical_remote_session_hash(
db_ctx,
roar_dir=roar_dir,
session_id=session_id,
)
resolved_job_uid = (
build_remote_publication_job_uid(session_hash, job_uid)
if prefer_remote_publication_uid
else job_uid
)
return {
"entity_type": "job",
"session_hash": _canonical_remote_session_hash(
db_ctx,
roar_dir=roar_dir,
session_id=session_id,
),
"job_uid": job_uid,
"session_hash": session_hash,
"job_uid": resolved_job_uid,
"metadata": metadata,
}

Expand Down Expand Up @@ -359,9 +367,13 @@ def collect_label_sync_payloads(
for job in jobs:
job_id = job.get("id")
job_uid = job.get("job_uid")
remote_job_uid = job.get("remote_job_uid")
if not isinstance(job_id, int) or not isinstance(job_uid, str) or not job_uid:
continue
dedupe_key = ("job", job_uid)
resolved_remote_job_uid = (
remote_job_uid if isinstance(remote_job_uid, str) and remote_job_uid else job_uid
)
dedupe_key = ("job", resolved_remote_job_uid)
if dedupe_key in seen_jobs:
continue
seen_jobs.add(dedupe_key)
Expand All @@ -372,7 +384,7 @@ def collect_label_sync_payloads(
{
"entity_type": "job",
"session_hash": session_hash,
"job_uid": job_uid,
"job_uid": resolved_remote_job_uid,
"metadata": current["metadata"],
"key_origins": build_current_key_origins(history),
}
Expand Down
Loading
Loading