From 19bf97d91017e227b28cf34d0d55f7dc98254cd4 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:08:42 +0000 Subject: [PATCH 1/7] Add dcid message extraction and span injection logic to dispatcher service --- src/zocalo/service/dispatcher.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index d6a30ee..5fb5ea7 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -12,7 +12,19 @@ import workflows.recipe from workflows.services.common_service import CommonService +from opentelemetry import trace +# Helper method to get dcid. Used for injecting it into current span +def _extract_dcid(params: dict) -> int | None: + if not isinstance(params, dict): + return None + + if dcid := params.get("ispyb_dcid"): + return dcid + if dcid := params.get("dcid"): + return dcid + + return None class Dispatcher(CommonService): """ @@ -205,6 +217,19 @@ def process(self, rw, header, message): recipe_id = parameters.get("guid") or str(uuid.uuid4()) parameters["guid"] = recipe_id + + # Extract DCID and set on trace span if OpenTelemetry is available + if trace is not None: + try: + span = trace.get_current_span() + if span and span.is_recording(): + dcid = _extract_dcid(parameters) + if dcid: + span.set_attribute("dcid", dcid) + self.log.debug(f"Set DCID {dcid} on trace span") + except Exception as e: + self.log.warning(f"Failed to set DCID on trace span: {e}") + if rw: # If we received a recipe wrapper then we already have a recipe_ID # attached to logs. Make a note of the downstream recipe ID so that From b6d1b5924ee6084b90938ad7e52cb2e14277cc81 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:29:30 +0000 Subject: [PATCH 2/7] Add dcid message extraction and span injection logic to dispatcher service --- pyproject.toml | 1 + 1 file changed, 1 insertion(+) diff --git a/pyproject.toml b/pyproject.toml index b0baa53..30b222e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,6 +16,7 @@ dependencies = [ "requests", "pydantic>=2,<3", "workflows>=3.0", + "opentelemetry==1.20" ] [dependency-groups] From 365a26c1623ffa4f8b7b67be901522ebf73b3128 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Mon, 26 Jan 2026 16:32:51 +0000 Subject: [PATCH 3/7] Fix dependency to use opentelemetry-api rather than opentelemetry --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 30b222e..58f47e3 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "requests", "pydantic>=2,<3", "workflows>=3.0", - "opentelemetry==1.20" + "opentelemetry-api==1.20" ] [dependency-groups] From 73e2021bbb88ccf840b17d143d14d26505fcdf99 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Mon, 26 Jan 2026 16:34:09 +0000 Subject: [PATCH 4/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/zocalo/service/dispatcher.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index 5fb5ea7..bbc3c32 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -11,14 +11,15 @@ from importlib.metadata import entry_points import workflows.recipe -from workflows.services.common_service import CommonService from opentelemetry import trace +from workflows.services.common_service import CommonService + # Helper method to get dcid. Used for injecting it into current span def _extract_dcid(params: dict) -> int | None: if not isinstance(params, dict): return None - + if dcid := params.get("ispyb_dcid"): return dcid if dcid := params.get("dcid"): @@ -26,6 +27,7 @@ def _extract_dcid(params: dict) -> int | None: return None + class Dispatcher(CommonService): """ Single point of contact service that takes in job meta-information @@ -217,8 +219,7 @@ def process(self, rw, header, message): recipe_id = parameters.get("guid") or str(uuid.uuid4()) parameters["guid"] = recipe_id - - # Extract DCID and set on trace span if OpenTelemetry is available + # Extract DCID and set on trace span if OpenTelemetry is available if trace is not None: try: span = trace.get_current_span() From b28101bb72f1ea816b2ae8f68baf880d72400cdf Mon Sep 17 00:00:00 2001 From: David Igandan Date: Wed, 11 Mar 2026 16:33:29 +0000 Subject: [PATCH 5/7] Remove unnecessary if guard on if OpenTelemetry is available --- pyproject.toml | 2 +- src/zocalo/service/dispatcher.py | 19 +++++++++---------- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 58f47e3..94f407d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -16,7 +16,7 @@ dependencies = [ "requests", "pydantic>=2,<3", "workflows>=3.0", - "opentelemetry-api==1.20" + "opentelemetry-api" ] [dependency-groups] diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index bbc3c32..edcc790 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -220,16 +220,15 @@ def process(self, rw, header, message): parameters["guid"] = recipe_id # Extract DCID and set on trace span if OpenTelemetry is available - if trace is not None: - try: - span = trace.get_current_span() - if span and span.is_recording(): - dcid = _extract_dcid(parameters) - if dcid: - span.set_attribute("dcid", dcid) - self.log.debug(f"Set DCID {dcid} on trace span") - except Exception as e: - self.log.warning(f"Failed to set DCID on trace span: {e}") + try: + span = trace.get_current_span() + if span and span.is_recording(): + dcid = _extract_dcid(parameters) + if dcid: + span.set_attribute("dcid", dcid) + self.log.debug(f"Set DCID {dcid} on trace span") + except Exception as e: + self.log.warning(f"Failed to set DCID on trace span: {e}") if rw: # If we received a recipe wrapper then we already have a recipe_ID From 7c4673d79ffe24a4d8524b290a5c8596c07aa3c1 Mon Sep 17 00:00:00 2001 From: David Igandan Date: Wed, 11 Mar 2026 22:29:38 +0000 Subject: [PATCH 6/7] Removed redundant guards; minor fixes --- src/zocalo/service/dispatcher.py | 29 ++++++++++------------------- 1 file changed, 10 insertions(+), 19 deletions(-) diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index edcc790..fb8eb6c 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -15,17 +15,11 @@ from workflows.services.common_service import CommonService -# Helper method to get dcid. Used for injecting it into current span -def _extract_dcid(params: dict) -> int | None: - if not isinstance(params, dict): - return None - if dcid := params.get("ispyb_dcid"): - return dcid - if dcid := params.get("dcid"): - return dcid +def _extract_dcid(params: dict) -> int | None: + """Helper method to get dcid. Used for injecting it into current span""" + return params.get("ispyb_dcid") or params.get("dcid") - return None class Dispatcher(CommonService): @@ -220,16 +214,13 @@ def process(self, rw, header, message): parameters["guid"] = recipe_id # Extract DCID and set on trace span if OpenTelemetry is available - try: - span = trace.get_current_span() - if span and span.is_recording(): - dcid = _extract_dcid(parameters) - if dcid: - span.set_attribute("dcid", dcid) - self.log.debug(f"Set DCID {dcid} on trace span") - except Exception as e: - self.log.warning(f"Failed to set DCID on trace span: {e}") - + span = trace.get_current_span() + if span.is_recording(): + dcid = _extract_dcid(parameters) + if dcid: + span.set_attribute("dcid", dcid) + self.log.debug(f"Set DCID {dcid} on trace span") + if rw: # If we received a recipe wrapper then we already have a recipe_ID # attached to logs. Make a note of the downstream recipe ID so that From 454165352eec6cfe9be47e170f0f534b486168b6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 22:30:33 +0000 Subject: [PATCH 7/7] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- src/zocalo/service/dispatcher.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/zocalo/service/dispatcher.py b/src/zocalo/service/dispatcher.py index fb8eb6c..465d1b7 100644 --- a/src/zocalo/service/dispatcher.py +++ b/src/zocalo/service/dispatcher.py @@ -15,13 +15,11 @@ from workflows.services.common_service import CommonService - def _extract_dcid(params: dict) -> int | None: """Helper method to get dcid. Used for injecting it into current span""" return params.get("ispyb_dcid") or params.get("dcid") - class Dispatcher(CommonService): """ Single point of contact service that takes in job meta-information @@ -220,7 +218,7 @@ def process(self, rw, header, message): if dcid: span.set_attribute("dcid", dcid) self.log.debug(f"Set DCID {dcid} on trace span") - + if rw: # If we received a recipe wrapper then we already have a recipe_ID # attached to logs. Make a note of the downstream recipe ID so that