From a93c08e7c8b4c0fc6161ccc6625c6896b47a74f5 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Tue, 24 Feb 2026 15:04:42 +0100 Subject: [PATCH 1/8] feat: get apps from R1FS --- extensions/business/deeploy/deeploy_const.py | 1 + .../business/deeploy/deeploy_manager_api.py | 3 +- extensions/business/deeploy/deeploy_mixin.py | 127 ++++++++++++++++++ 3 files changed, 130 insertions(+), 1 deletion(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index 44f8ca84..f4271dd9 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -43,6 +43,7 @@ class DEEPLOY_KEYS: PIPELINE_PARAMS = "pipeline_params" PIPELINE = "pipeline" PIPELINE_CID = "pipeline_cid" + ONLINE = "online" JOB_CONFIG = "job_config" # App params keys APP_PARAMS = "app_params" diff --git a/extensions/business/deeploy/deeploy_manager_api.py b/extensions/business/deeploy/deeploy_manager_api.py index 063fee34..237e7110 100644 --- a/extensions/business/deeploy/deeploy_manager_api.py +++ b/extensions/business/deeploy/deeploy_manager_api.py @@ -184,7 +184,8 @@ def get_apps( sender, inputs = self.deeploy_verify_and_get_inputs(request) auth_result = self.deeploy_get_auth_result(inputs) - apps = self._get_online_apps( + apps = self._get_apps_by_escrow_active_jobs( + sender_escrow=auth_result[DEEPLOY_KEYS.SENDER_ESCROW], owner=auth_result[DEEPLOY_KEYS.ESCROW_OWNER], project_id=inputs.get(DEEPLOY_KEYS.PROJECT_ID, None) ) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 1462fcd3..d69c4811 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2791,6 +2791,133 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i app_data["node_alias"] = node_alias return result + def _pipeline_matches_project_id(self, pipeline, project_id): + """ + Validate whether a raw pipeline payload belongs to a given project. + """ + if project_id is None: + return True + if not isinstance(pipeline, dict): + return False + + deeploy_specs = ( + pipeline.get(ct.CONFIG_STREAM.DEEPLOY_SPECS, None) or + pipeline.get(NetMonCt.DEEPLOY_SPECS, None) or + pipeline.get("DEEPLOY_SPECS", None) or + pipeline.get("deeploy_specs", None) + ) + if not isinstance(deeploy_specs, dict): + return False + return deeploy_specs.get(DEEPLOY_KEYS.PROJECT_ID, None) == project_id + + def _normalize_active_job_ids(self, active_job_ids): + """ + Normalize and deduplicate active job IDs returned by blockchain calls. + """ + normalized_job_ids = [] + seen = set() + + if not isinstance(active_job_ids, list): + return normalized_job_ids + + for raw_job_id in active_job_ids: + try: + parsed_job_id = int(raw_job_id) + except Exception: + self.Pd(f"Skipping invalid active job id '{raw_job_id}' returned by blockchain.", color='y') + continue + + if parsed_job_id in seen: + continue + seen.add(parsed_job_id) + normalized_job_ids.append(parsed_job_id) + + return normalized_job_ids + + def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): + """ + Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. + + Response shape: + { + "": { + "job_id": , + "pipeline_cid": , + "pipeline": , # raw R1FS payload + "online": , # online apps snapshot keyed by node + } + } + """ + result = {} + if not sender_escrow: + return result + + raw_active_job_ids = self.bc.get_escrow_active_job_ids(sender_escrow) + active_job_ids = self._normalize_active_job_ids(raw_active_job_ids) + self.Pd(f"Escrow {sender_escrow} active job ids: {active_job_ids}") + + for job_id in active_job_ids: + online_apps = self._get_online_apps( + owner=owner, + job_id=job_id, + project_id=project_id + ) + if not isinstance(online_apps, dict): + online_apps = {} + else: + normalized_online_apps = {} + for node, apps in online_apps.items(): + if not isinstance(apps, dict): + self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') + continue + normalized_online_apps[node] = dict(apps) + online_apps = normalized_online_apps + + pipeline_cid = None + pipeline = None + try: + pipeline_cid = self._get_pipeline_from_cstore(job_id) + if pipeline_cid: + pipeline = self.get_pipeline_from_r1fs(pipeline_cid) + except Exception as exc: + self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') + pipeline = None + pipeline_cid = None + + if pipeline is not None and owner is not None: + pipeline_owner = ( + pipeline.get(ct.CONFIG_STREAM.K_OWNER, None) or + pipeline.get(NetMonCt.OWNER, None) or + pipeline.get("OWNER", None) or + pipeline.get("owner", None) + ) + if pipeline_owner is not None and pipeline_owner != owner: + self.Pd( + f"Skipping R1FS payload for job {job_id}: owner mismatch " + f"(expected {owner}, got {pipeline_owner}).", + color='y' + ) + pipeline = None + pipeline_cid = None + + if pipeline is not None and project_id is not None and not self._pipeline_matches_project_id(pipeline, project_id): + self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') + pipeline = None + pipeline_cid = None + + if pipeline is None and len(online_apps) == 0 and project_id is not None: + # If a project filter is requested and neither source matches, skip this job. + continue + + result[str(job_id)] = { + DEEPLOY_KEYS.JOB_ID: job_id, + DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, + DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, + DEEPLOY_KEYS.ONLINE: online_apps, + } + + return result + # TODO: REMOVE THIS, once instance_id is coming from ui for instances that have to be updated # Maybe add is_new_instance:bool for native apps, that want to add an extra plugin def _ensure_plugin_instance_ids(self, inputs, discovered_plugin_instances, owner=None, app_id=None, job_id=None): From 3bbd7c98ba2b099afda52a378f972a585313db23 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 14:52:31 +0100 Subject: [PATCH 2/8] feat: include chain job details in get_apps payload --- extensions/business/deeploy/deeploy_const.py | 1 + extensions/business/deeploy/deeploy_mixin.py | 91 ++++++++++++++++++-- 2 files changed, 87 insertions(+), 5 deletions(-) diff --git a/extensions/business/deeploy/deeploy_const.py b/extensions/business/deeploy/deeploy_const.py index f4271dd9..2afd71ef 100644 --- a/extensions/business/deeploy/deeploy_const.py +++ b/extensions/business/deeploy/deeploy_const.py @@ -44,6 +44,7 @@ class DEEPLOY_KEYS: PIPELINE = "pipeline" PIPELINE_CID = "pipeline_cid" ONLINE = "online" + CHAIN_JOB = "chain_job" JOB_CONFIG = "job_config" # App params keys APP_PARAMS = "app_params" diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index d69c4811..7ca18fe9 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2834,6 +2834,82 @@ def _normalize_active_job_ids(self, active_job_ids): return normalized_job_ids + def _normalize_active_jobs(self, active_jobs): + """ + Normalize and deduplicate active job details returned by blockchain calls. + """ + normalized_active_jobs = [] + seen = set() + + if not isinstance(active_jobs, list): + return normalized_active_jobs + + for raw_job in active_jobs: + if not isinstance(raw_job, dict): + self.Pd(f"Skipping invalid active job payload '{raw_job}'.", color='y') + continue + + raw_job_id = raw_job.get("jobId", raw_job.get("id", None)) + try: + parsed_job_id = int(raw_job_id) + except Exception: + self.Pd(f"Skipping active job with invalid id '{raw_job_id}'.", color='y') + continue + + if parsed_job_id in seen: + continue + seen.add(parsed_job_id) + + normalized_active_jobs.append({ + "job_id": parsed_job_id, + "raw": raw_job, + }) + + return normalized_active_jobs + + def _serialize_chain_job(self, raw_job): + """ + Serialize chain job details for JSON APIs, keeping bigint-like values as strings. + """ + if not isinstance(raw_job, dict): + return None + + active_nodes = raw_job.get("activeNodes", []) + if not isinstance(active_nodes, list): + active_nodes = [] + + serialized = { + "id": str(raw_job.get("jobId", raw_job.get("id", ""))), + "projectHash": raw_job.get("projectHash"), + "requestTimestamp": str(raw_job.get("requestTimestamp", 0)), + "startTimestamp": str(raw_job.get("startTimestamp", 0)), + "lastNodesChangeTimestamp": str(raw_job.get("lastNodesChangeTimestamp", 0)), + "jobType": str(raw_job.get("jobType", 0)), + "pricePerEpoch": str(raw_job.get("pricePerEpoch", 0)), + "lastExecutionEpoch": str(raw_job.get("lastExecutionEpoch", 0)), + "numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)), + "balance": str(raw_job.get("balance", 0)), + "lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)), + "activeNodes": [str(node) for node in active_nodes], + "network": raw_job.get("network"), + "escrowAddress": raw_job.get("escrowAddress"), + } + return serialized + + def _chain_job_matches_project_id(self, chain_job, project_id): + """ + Validate whether serialized chain job details belong to the provided project. + """ + if project_id is None: + return True + if not isinstance(chain_job, dict): + return False + + chain_project_id = chain_job.get("projectHash", None) + if chain_project_id is None: + return False + return str(chain_project_id).lower() == str(project_id).lower() + def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): """ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. @@ -2852,11 +2928,15 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec if not sender_escrow: return result - raw_active_job_ids = self.bc.get_escrow_active_job_ids(sender_escrow) - active_job_ids = self._normalize_active_job_ids(raw_active_job_ids) - self.Pd(f"Escrow {sender_escrow} active job ids: {active_job_ids}") + raw_active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) + active_jobs = self._normalize_active_jobs(raw_active_jobs) + self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") + + for active_job in active_jobs: + job_id = active_job["job_id"] + chain_job = self._serialize_chain_job(active_job.get("raw", {})) + chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - for job_id in active_job_ids: online_apps = self._get_online_apps( owner=owner, job_id=job_id, @@ -2905,7 +2985,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec pipeline = None pipeline_cid = None - if pipeline is None and len(online_apps) == 0 and project_id is not None: + if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: # If a project filter is requested and neither source matches, skip this job. continue @@ -2914,6 +2994,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, DEEPLOY_KEYS.ONLINE: online_apps, + DEEPLOY_KEYS.CHAIN_JOB: chain_job, } return result From ec88033a9fbfb3e6498698856512b193de3f0771 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 15:51:47 +0100 Subject: [PATCH 3/8] feat: enhance job ID handling in online apps retrieval --- extensions/business/deeploy/deeploy_mixin.py | 48 +++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 7ca18fe9..d220537c 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2769,10 +2769,16 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i filtered_result[node][app_name] = app_data result = filtered_result if job_id is not None: + if isinstance(job_id, int): + job_id = [job_id] + unique_job_ids = set() + for raw_value in job_id: + unique_job_ids.add(raw_value) filtered_result = self.defaultdict(dict) for node, apps in result.items(): for app_name, app_data in apps.items(): - if app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) != job_id: + app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) + if app_job_id not in unique_job_ids: continue filtered_result[node][app_name] = app_data result = filtered_result @@ -2932,26 +2938,36 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec active_jobs = self._normalize_active_jobs(raw_active_jobs) self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") + active_job_ids = [job["job_id"] for job in active_jobs] + + # Fetch online apps once, then reuse grouped entries per job_id. + all_online_apps = self._get_online_apps( + owner=owner, + job_id=active_job_ids, + project_id=project_id + ) + + online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict)) + if isinstance(all_online_apps, dict): + for node, apps in all_online_apps.items(): + if not isinstance(apps, dict): + self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') + continue + for app_name, app_data in apps.items(): + app_job_id = self._extract_app_job_id(app_data) + if app_job_id is None: + continue + online_apps_by_job_id[app_job_id][node][app_name] = app_data + for active_job in active_jobs: job_id = active_job["job_id"] chain_job = self._serialize_chain_job(active_job.get("raw", {})) chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - online_apps = self._get_online_apps( - owner=owner, - job_id=job_id, - project_id=project_id - ) - if not isinstance(online_apps, dict): - online_apps = {} - else: - normalized_online_apps = {} - for node, apps in online_apps.items(): - if not isinstance(apps, dict): - self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') - continue - normalized_online_apps[node] = dict(apps) - online_apps = normalized_online_apps + online_apps = {} + grouped_online_apps = online_apps_by_job_id.get(job_id, {}) + if isinstance(grouped_online_apps, dict): + online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items() if isinstance(apps, dict)} pipeline_cid = None pipeline = None From 0546a06e96c7ff3f2def069882b9bbca52a995ba Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 16:04:22 +0100 Subject: [PATCH 4/8] fix --- extensions/business/deeploy/deeploy_mixin.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index d220537c..8351b2fd 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2954,7 +2954,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') continue for app_name, app_data in apps.items(): - app_job_id = self._extract_app_job_id(app_data) + app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) if app_job_id is None: continue online_apps_by_job_id[app_job_id][node][app_name] = app_data From dba50542d30e9e03fb70d8724bc94c0f38e02d74 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Thu, 26 Feb 2026 17:55:50 +0100 Subject: [PATCH 5/8] feat: simplify job handling and improve pipeline validation in deeploy mixin --- extensions/business/deeploy/deeploy_mixin.py | 161 +++---------------- 1 file changed, 24 insertions(+), 137 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 8351b2fd..82fc891a 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2797,95 +2797,12 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i app_data["node_alias"] = node_alias return result - def _pipeline_matches_project_id(self, pipeline, project_id): - """ - Validate whether a raw pipeline payload belongs to a given project. - """ - if project_id is None: - return True - if not isinstance(pipeline, dict): - return False - - deeploy_specs = ( - pipeline.get(ct.CONFIG_STREAM.DEEPLOY_SPECS, None) or - pipeline.get(NetMonCt.DEEPLOY_SPECS, None) or - pipeline.get("DEEPLOY_SPECS", None) or - pipeline.get("deeploy_specs", None) - ) - if not isinstance(deeploy_specs, dict): - return False - return deeploy_specs.get(DEEPLOY_KEYS.PROJECT_ID, None) == project_id - - def _normalize_active_job_ids(self, active_job_ids): - """ - Normalize and deduplicate active job IDs returned by blockchain calls. - """ - normalized_job_ids = [] - seen = set() - - if not isinstance(active_job_ids, list): - return normalized_job_ids - - for raw_job_id in active_job_ids: - try: - parsed_job_id = int(raw_job_id) - except Exception: - self.Pd(f"Skipping invalid active job id '{raw_job_id}' returned by blockchain.", color='y') - continue - - if parsed_job_id in seen: - continue - seen.add(parsed_job_id) - normalized_job_ids.append(parsed_job_id) - - return normalized_job_ids - - def _normalize_active_jobs(self, active_jobs): - """ - Normalize and deduplicate active job details returned by blockchain calls. - """ - normalized_active_jobs = [] - seen = set() - - if not isinstance(active_jobs, list): - return normalized_active_jobs - - for raw_job in active_jobs: - if not isinstance(raw_job, dict): - self.Pd(f"Skipping invalid active job payload '{raw_job}'.", color='y') - continue - - raw_job_id = raw_job.get("jobId", raw_job.get("id", None)) - try: - parsed_job_id = int(raw_job_id) - except Exception: - self.Pd(f"Skipping active job with invalid id '{raw_job_id}'.", color='y') - continue - - if parsed_job_id in seen: - continue - seen.add(parsed_job_id) - - normalized_active_jobs.append({ - "job_id": parsed_job_id, - "raw": raw_job, - }) - - return normalized_active_jobs - def _serialize_chain_job(self, raw_job): """ Serialize chain job details for JSON APIs, keeping bigint-like values as strings. """ - if not isinstance(raw_job, dict): - return None - - active_nodes = raw_job.get("activeNodes", []) - if not isinstance(active_nodes, list): - active_nodes = [] - serialized = { - "id": str(raw_job.get("jobId", raw_job.get("id", ""))), + "id": str(raw_job.get("jobId", "")), "projectHash": raw_job.get("projectHash"), "requestTimestamp": str(raw_job.get("requestTimestamp", 0)), "startTimestamp": str(raw_job.get("startTimestamp", 0)), @@ -2896,7 +2813,7 @@ def _serialize_chain_job(self, raw_job): "numberOfNodesRequested": str(raw_job.get("numberOfNodesRequested", 0)), "balance": str(raw_job.get("balance", 0)), "lastAllocatedEpoch": str(raw_job.get("lastAllocatedEpoch", 0)), - "activeNodes": [str(node) for node in active_nodes], + "activeNodes": [str(node) for node in raw_job.get("activeNodes", [])], "network": raw_job.get("network"), "escrowAddress": raw_job.get("escrowAddress"), } @@ -2908,15 +2825,9 @@ def _chain_job_matches_project_id(self, chain_job, project_id): """ if project_id is None: return True - if not isinstance(chain_job, dict): - return False - - chain_project_id = chain_job.get("projectHash", None) - if chain_project_id is None: - return False - return str(chain_project_id).lower() == str(project_id).lower() + return str(chain_job["projectHash"]).lower() == str(project_id).lower() - def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, project_id=None): + def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None): """ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. @@ -2924,21 +2835,17 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec { "": { "job_id": , - "pipeline_cid": , - "pipeline": , # raw R1FS payload - "online": , # online apps snapshot keyed by node + "pipeline": , # raw R1FS payload + "chain_job": , # serialized chain job details + "online": , # online apps snapshot keyed by node } } """ result = {} - if not sender_escrow: - return result - - raw_active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) - active_jobs = self._normalize_active_jobs(raw_active_jobs) - self.Pd(f"Escrow {sender_escrow} active jobs: {[job['job_id'] for job in active_jobs]}") - active_job_ids = [job["job_id"] for job in active_jobs] + active_jobs = self.bc.get_escrow_active_jobs(sender_escrow) + active_job_ids = [int(job["jobId"]) for job in active_jobs] + self.Pd(f"Fetched {len(active_job_ids)} active jobs from escrow {sender_escrow}, fetching details") # Fetch online apps once, then reuse grouped entries per job_id. all_online_apps = self._get_online_apps( @@ -2948,58 +2855,39 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec ) online_apps_by_job_id = self.defaultdict(lambda: self.defaultdict(dict)) - if isinstance(all_online_apps, dict): - for node, apps in all_online_apps.items(): - if not isinstance(apps, dict): - self.Pd(f"Skipping malformed online apps payload for node {node}.", color='y') - continue - for app_name, app_data in apps.items(): - app_job_id = app_data.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.JOB_ID, None) - if app_job_id is None: - continue - online_apps_by_job_id[app_job_id][node][app_name] = app_data + for node, apps in all_online_apps.items(): + for app_name, app_data in apps.items(): + app_job_id = int(app_data[NetMonCt.DEEPLOY_SPECS][DEEPLOY_KEYS.JOB_ID]) + online_apps_by_job_id[app_job_id][node][app_name] = app_data for active_job in active_jobs: - job_id = active_job["job_id"] - chain_job = self._serialize_chain_job(active_job.get("raw", {})) + job_id = int(active_job["jobId"]) + chain_job = self._serialize_chain_job(active_job) chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - online_apps = {} grouped_online_apps = online_apps_by_job_id.get(job_id, {}) - if isinstance(grouped_online_apps, dict): - online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items() if isinstance(apps, dict)} + online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()} - pipeline_cid = None pipeline = None try: - pipeline_cid = self._get_pipeline_from_cstore(job_id) - if pipeline_cid: - pipeline = self.get_pipeline_from_r1fs(pipeline_cid) + pipeline = self.get_job_pipeline_from_cstore(job_id) except Exception as exc: self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') pipeline = None - pipeline_cid = None - - if pipeline is not None and owner is not None: - pipeline_owner = ( - pipeline.get(ct.CONFIG_STREAM.K_OWNER, None) or - pipeline.get(NetMonCt.OWNER, None) or - pipeline.get("OWNER", None) or - pipeline.get("owner", None) - ) - if pipeline_owner is not None and pipeline_owner != owner: + + if pipeline is not None: + pipeline_owner = pipeline[NetMonCt.OWNER] + if pipeline_owner != owner: self.Pd( f"Skipping R1FS payload for job {job_id}: owner mismatch " f"(expected {owner}, got {pipeline_owner}).", color='y' ) pipeline = None - pipeline_cid = None - if pipeline is not None and project_id is not None and not self._pipeline_matches_project_id(pipeline, project_id): + if pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') pipeline = None - pipeline_cid = None if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: # If a project filter is requested and neither source matches, skip this job. @@ -3007,8 +2895,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow=None, owner=None, projec result[str(job_id)] = { DEEPLOY_KEYS.JOB_ID: job_id, - DEEPLOY_KEYS.PIPELINE_CID: pipeline_cid if pipeline is not None else None, - DEEPLOY_KEYS.PIPELINE: pipeline if pipeline is not None else None, + DEEPLOY_KEYS.PIPELINE: pipeline, DEEPLOY_KEYS.ONLINE: online_apps, DEEPLOY_KEYS.CHAIN_JOB: chain_job, } From 240bfb5d372e38a35a993d071ac26d1495eeeac4 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Tue, 10 Mar 2026 18:27:35 +0100 Subject: [PATCH 6/8] fixes --- extensions/business/deeploy/deeploy_mixin.py | 25 +++++++++++--------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 82fc891a..185a0659 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2875,21 +2875,24 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None) self.Pd(f"Failed to load R1FS payload for job {job_id}: {exc}", color='y') pipeline = None - if pipeline is not None: - pipeline_owner = pipeline[NetMonCt.OWNER] - if pipeline_owner != owner: - self.Pd( - f"Skipping R1FS payload for job {job_id}: owner mismatch " - f"(expected {owner}, got {pipeline_owner}).", - color='y' - ) - pipeline = None + if pipeline is None: + # If we don't have the details in R1FS, we skip the job. It can happen before the deploy, or for legacy jobs. + continue + + pipeline_owner = pipeline[NetMonCt.OWNER.upper()] + if pipeline_owner != owner: + self.Pd( + f"Skipping R1FS payload for job {job_id}: owner mismatch " + f"(expected {owner}, got {pipeline_owner}).", + color='y' + ) + continue if pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') - pipeline = None + continue - if pipeline is None and len(online_apps) == 0 and project_id is not None and not chain_matches_project: + if len(online_apps) == 0 and project_id is not None and not chain_matches_project: # If a project filter is requested and neither source matches, skip this job. continue From dd961210452fd06aeef372329795d45977898e8a Mon Sep 17 00:00:00 2001 From: Alessandro Date: Tue, 10 Mar 2026 19:07:34 +0100 Subject: [PATCH 7/8] fixes --- extensions/business/deeploy/deeploy_mixin.py | 27 ++++++-------------- 1 file changed, 8 insertions(+), 19 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 185a0659..947dbbc5 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2770,10 +2770,13 @@ def _get_online_apps(self, owner=None, target_nodes=None, job_id=None, project_i result = filtered_result if job_id is not None: if isinstance(job_id, int): - job_id = [job_id] - unique_job_ids = set() - for raw_value in job_id: - unique_job_ids.add(raw_value) + unique_job_ids = {job_id} + elif isinstance(job_id, list): + if not all(isinstance(value, int) for value in job_id): + raise ValueError("job_id must be int or list of int") + unique_job_ids = set(job_id) + else: + raise ValueError("job_id must be int or list of int") filtered_result = self.defaultdict(dict) for node, apps in result.items(): for app_name, app_data in apps.items(): @@ -2819,14 +2822,6 @@ def _serialize_chain_job(self, raw_job): } return serialized - def _chain_job_matches_project_id(self, chain_job, project_id): - """ - Validate whether serialized chain job details belong to the provided project. - """ - if project_id is None: - return True - return str(chain_job["projectHash"]).lower() == str(project_id).lower() - def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None): """ Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. @@ -2835,7 +2830,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None) { "": { "job_id": , - "pipeline": , # raw R1FS payload + "pipeline": , # raw R1FS payload "chain_job": , # serialized chain job details "online": , # online apps snapshot keyed by node } @@ -2863,8 +2858,6 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None) for active_job in active_jobs: job_id = int(active_job["jobId"]) chain_job = self._serialize_chain_job(active_job) - chain_matches_project = self._chain_job_matches_project_id(chain_job, project_id) - grouped_online_apps = online_apps_by_job_id.get(job_id, {}) online_apps = {node: dict(apps) for node, apps in grouped_online_apps.items()} @@ -2892,10 +2885,6 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None) self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') continue - if len(online_apps) == 0 and project_id is not None and not chain_matches_project: - # If a project filter is requested and neither source matches, skip this job. - continue - result[str(job_id)] = { DEEPLOY_KEYS.JOB_ID: job_id, DEEPLOY_KEYS.PIPELINE: pipeline, From cc2db32ea47c4ea785089ebbe039a014d142c1a8 Mon Sep 17 00:00:00 2001 From: Alessandro Date: Fri, 13 Mar 2026 14:16:11 +0100 Subject: [PATCH 8/8] fix --- extensions/business/deeploy/deeploy_mixin.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/extensions/business/deeploy/deeploy_mixin.py b/extensions/business/deeploy/deeploy_mixin.py index 947dbbc5..5b577297 100644 --- a/extensions/business/deeploy/deeploy_mixin.py +++ b/extensions/business/deeploy/deeploy_mixin.py @@ -2824,7 +2824,8 @@ def _serialize_chain_job(self, raw_job): def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None): """ - Build get_apps payload using active SC job IDs as source of truth, with optional online snapshots. + Build get_apps payload using active SC job IDs as source of truth, + with snapshots from online nodes, if any. Response shape: { @@ -2881,7 +2882,7 @@ def _get_apps_by_escrow_active_jobs(self, sender_escrow, owner, project_id=None) ) continue - if pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: + if project_id is not None and pipeline.get(NetMonCt.DEEPLOY_SPECS, {}).get(DEEPLOY_KEYS.PROJECT_ID) != project_id: self.Pd(f"Skipping R1FS payload for job {job_id}: project_id mismatch.", color='y') continue