From 6bda176f0915c6e9c439c57d7d4cb75c5e80b00f Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 16 Apr 2025 18:48:29 +0100 Subject: [PATCH 1/5] Skip all prometheus data deletion errors since not all setups will have it --- src/murfey/server/api/__init__.py | 38 +++++++++++++++---------------- 1 file changed, 18 insertions(+), 20 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index f39473eb5..57a628284 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1616,33 +1616,31 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db): sessions_for_visit = db.exec( select(Session).where(Session.visit == session.visit) ).all() - if len(sessions_for_visit) == 1: - # Don't remove prometheus metrics if there are other sessions using them - try: - prom.monitoring_switch.remove(session.visit) - except KeyError: - pass - rsync_instances = db.exec( - select(RsyncInstance).where(RsyncInstance.session_id == session_id) - ).all() - for ri in rsync_instances: - prom.seen_files.remove(ri.source, session.visit) - prom.transferred_files.remove(ri.source, session.visit) - prom.transferred_files_bytes.remove(ri.source, session.visit) - prom.seen_data_files.remove(ri.source, session.visit) - prom.transferred_data_files.remove(ri.source, session.visit) - prom.transferred_data_files_bytes.remove(ri.source, session.visit) collected_ids = db.exec( select(DataCollectionGroup, DataCollection, ProcessingJob) .where(DataCollectionGroup.session_id == session_id) .where(DataCollection.dcg_id == DataCollectionGroup.id) .where(ProcessingJob.dc_id == DataCollection.id) ).all() - for c in collected_ids: - try: + # Ignore key errors when deleting Prometheus entries (it might not be set up) + try: + # Don't remove prometheus metrics if there are other sessions using them + if len(sessions_for_visit) == 1: + rsync_instances = db.exec( + select(RsyncInstance).where(RsyncInstance.session_id == session_id) + ).all() + prom.monitoring_switch.remove(session.visit) + for ri in rsync_instances: + prom.seen_files.remove(ri.source, session.visit) + prom.transferred_files.remove(ri.source, session.visit) + prom.transferred_files_bytes.remove(ri.source, session.visit) + prom.seen_data_files.remove(ri.source, session.visit) + prom.transferred_data_files.remove(ri.source, session.visit) + prom.transferred_data_files_bytes.remove(ri.source, session.visit) + for c in collected_ids: prom.preprocessed_movies.remove(c[2].id) - except KeyError: - continue + except KeyError: + pass db.delete(session) db.commit() return From a44ec98d35e3758fdbc086f0b5d51a05986e7567 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Wed, 16 Apr 2025 19:06:02 +0100 Subject: [PATCH 2/5] More granular try-except blocks --- src/murfey/server/api/__init__.py | 39 +++++++++++++++++-------------- 1 file changed, 22 insertions(+), 17 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 57a628284..32c4a1eb9 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1616,31 +1616,36 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db): sessions_for_visit = db.exec( select(Session).where(Session.visit == session.visit) ).all() - collected_ids = db.exec( - select(DataCollectionGroup, DataCollection, ProcessingJob) - .where(DataCollectionGroup.session_id == session_id) - .where(DataCollection.dcg_id == DataCollectionGroup.id) - .where(ProcessingJob.dc_id == DataCollection.id) - ).all() - # Ignore key errors when deleting Prometheus entries (it might not be set up) - try: - # Don't remove prometheus metrics if there are other sessions using them - if len(sessions_for_visit) == 1: - rsync_instances = db.exec( - select(RsyncInstance).where(RsyncInstance.session_id == session_id) - ).all() + # Don't remove prometheus metrics if there are other sessions using them + if len(sessions_for_visit) == 1: + try: prom.monitoring_switch.remove(session.visit) - for ri in rsync_instances: + except KeyError: + pass + rsync_instances = db.exec( + select(RsyncInstance).where(RsyncInstance.session_id == session_id) + ).all() + for ri in rsync_instances: + try: prom.seen_files.remove(ri.source, session.visit) prom.transferred_files.remove(ri.source, session.visit) prom.transferred_files_bytes.remove(ri.source, session.visit) prom.seen_data_files.remove(ri.source, session.visit) prom.transferred_data_files.remove(ri.source, session.visit) prom.transferred_data_files_bytes.remove(ri.source, session.visit) - for c in collected_ids: + except KeyError: + pass + collected_ids = db.exec( + select(DataCollectionGroup, DataCollection, ProcessingJob) + .where(DataCollectionGroup.session_id == session_id) + .where(DataCollection.dcg_id == DataCollectionGroup.id) + .where(ProcessingJob.dc_id == DataCollection.id) + ).all() + for c in collected_ids: + try: prom.preprocessed_movies.remove(c[2].id) - except KeyError: - pass + except KeyError: + continue db.delete(session) db.commit() return From e3ec1ae31fc236a42d14750fa8ff67134888d825 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 17 Apr 2025 13:39:05 +0100 Subject: [PATCH 3/5] Encased prometheus metric deletion calls in utility function to notify about failures --- src/murfey/server/api/__init__.py | 82 ++++++++++++++++++++++++------- src/murfey/util/__init__.py | 21 +++++++- 2 files changed, 84 insertions(+), 19 deletions(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index 32c4a1eb9..fe21ded27 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -22,6 +22,7 @@ Proposal, ) from PIL import Image +from prometheus_client import Counter, Gauge from pydantic import BaseModel from sqlalchemy import func from sqlalchemy.exc import OperationalError @@ -50,7 +51,7 @@ from murfey.server.api.spa import _cryolo_model_path from murfey.server.gain import Camera, prepare_eer_gain, prepare_gain from murfey.server.murfey_db import murfey_db -from murfey.util import secure_path +from murfey.util import safe_run, secure_path from murfey.util.config import MachineConfig, from_file, settings from murfey.util.db import ( AutoProcProgram, @@ -1618,23 +1619,45 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db): ).all() # Don't remove prometheus metrics if there are other sessions using them if len(sessions_for_visit) == 1: - try: - prom.monitoring_switch.remove(session.visit) - except KeyError: - pass + safe_run( + prom.monitoring_switch.remove, + args=(session.visit,), + label="monitoring_switch", + ) rsync_instances = db.exec( select(RsyncInstance).where(RsyncInstance.session_id == session_id) ).all() for ri in rsync_instances: - try: - prom.seen_files.remove(ri.source, session.visit) - prom.transferred_files.remove(ri.source, session.visit) - prom.transferred_files_bytes.remove(ri.source, session.visit) - prom.seen_data_files.remove(ri.source, session.visit) - prom.transferred_data_files.remove(ri.source, session.visit) - prom.transferred_data_files_bytes.remove(ri.source, session.visit) - except KeyError: - pass + safe_run( + prom.seen_files.remove, + args=(ri.source, session.visit), + label="seen_files", + ) + safe_run( + prom.transferred_files.remove, + args=(ri.source, session.visit), + label="transferred_files", + ) + safe_run( + prom.transferred_files_bytes.remove, + args=(ri.source, session.visit), + label="transferred_files_bytes", + ) + safe_run( + prom.seen_data_files.remove, + args=(ri.source, session.visit), + label="seen_data_files", + ) + safe_run( + prom.transferred_data_files.remove, + args=(ri.source, session.visit), + label="transferred_data_files", + ) + safe_run( + prom.transferred_data_files_bytes.remove, + args=(ri.source, session.visit), + label="transferred_data_file_bytes", + ) collected_ids = db.exec( select(DataCollectionGroup, DataCollection, ProcessingJob) .where(DataCollectionGroup.session_id == session_id) @@ -1642,10 +1665,11 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db): .where(ProcessingJob.dc_id == DataCollection.id) ).all() for c in collected_ids: - try: - prom.preprocessed_movies.remove(c[2].id) - except KeyError: - continue + safe_run( + prom.preprocessed_movies.remove, + args=(c[2].id,), + label="preprocessed_movies", + ) db.delete(session) db.commit() return @@ -1957,3 +1981,25 @@ def update_current_gain_ref( session.current_gain_ref = new_gain_ref.path db.add(session) db.commit() + + +@router.get("/prometheus/{metric_name}") +def inspect_prometheus_metrics( + metric_name: str, +): + # Extract the Prometheus metric defined in the Prometheus module + metric: Optional[Counter | Gauge] = getattr(prom, metric_name, None) + if metric is None or not isinstance(metric, (Counter, Gauge)): + raise LookupError("No matching metric was found") + + # Print out contents + results = {} + if hasattr(metric, "_metrics"): + for i, (label_tuple, sub_metric) in enumerate(metric._metrics.items()): + labels = dict(zip(metric._labelnames, label_tuple)) + labels["value"] = sub_metric._value.get() + results[i] = labels + return results + else: + value = metric._value.get() + return {"value": value} diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py index 994043292..67aced69e 100644 --- a/src/murfey/util/__init__.py +++ b/src/murfey/util/__init__.py @@ -4,7 +4,7 @@ from pathlib import Path from queue import Queue from threading import Thread -from typing import Optional +from typing import Any, Callable, Optional from uuid import uuid4 from werkzeug.utils import secure_filename @@ -132,3 +132,22 @@ def filter(self, record: logging.LogRecord) -> bool: if "." not in logger_name: return False logger_name = logger_name.rsplit(".", maxsplit=1)[0] + + +def safe_run( + func: Callable, + args: list | tuple = [], + kwargs: dict[str, Any] = {}, + label: str = "", +): + """ + A wrapper to encase individual functions in try-except blocks so that a warning + is raised if the function fails, but the process continues as normal otherwise. + """ + try: + return func(*args, **kwargs) + except Exception: + logger.warning( + f"Function {func.__name__!r} failed to run for object {label!r}", + exc_info=True, + ) From 6e91a65bfa1f1ba3b0fe280f958fb076b2290175 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 17 Apr 2025 13:45:10 +0100 Subject: [PATCH 4/5] Added function to inspect contents of the prometheus gauges and counters that have been set up --- src/murfey/server/api/__init__.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index fe21ded27..f63ca27aa 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -1987,12 +1987,17 @@ def update_current_gain_ref( def inspect_prometheus_metrics( metric_name: str, ): + """ + A debugging endpoint that returns the current contents of any Prometheus + gauges and counters that have been set up thus far. + """ + # Extract the Prometheus metric defined in the Prometheus module metric: Optional[Counter | Gauge] = getattr(prom, metric_name, None) if metric is None or not isinstance(metric, (Counter, Gauge)): raise LookupError("No matching metric was found") - # Print out contents + # Package contents into dict and return results = {} if hasattr(metric, "_metrics"): for i, (label_tuple, sub_metric) in enumerate(metric._metrics.items()): From 2a9ed08dde95a2a147b87bc5bfa2feeaf0ba209c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Thu, 17 Apr 2025 13:48:38 +0100 Subject: [PATCH 5/5] Added explicit return --- src/murfey/util/__init__.py | 1 + 1 file changed, 1 insertion(+) diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py index 67aced69e..71613424f 100644 --- a/src/murfey/util/__init__.py +++ b/src/murfey/util/__init__.py @@ -151,3 +151,4 @@ def safe_run( f"Function {func.__name__!r} failed to run for object {label!r}", exc_info=True, ) + return None