diff --git a/src/murfey/server/api/__init__.py b/src/murfey/server/api/__init__.py index f39473eb5..f63ca27aa 100644 --- a/src/murfey/server/api/__init__.py +++ b/src/murfey/server/api/__init__.py @@ -22,6 +22,7 @@ Proposal, ) from PIL import Image +from prometheus_client import Counter, Gauge from pydantic import BaseModel from sqlalchemy import func from sqlalchemy.exc import OperationalError @@ -50,7 +51,7 @@ from murfey.server.api.spa import _cryolo_model_path from murfey.server.gain import Camera, prepare_eer_gain, prepare_gain from murfey.server.murfey_db import murfey_db -from murfey.util import secure_path +from murfey.util import safe_run, secure_path from murfey.util.config import MachineConfig, from_file, settings from murfey.util.db import ( AutoProcProgram, @@ -1616,22 +1617,47 @@ def remove_session_by_id(session_id: MurfeySessionID, db=murfey_db): sessions_for_visit = db.exec( select(Session).where(Session.visit == session.visit) ).all() + # Don't remove prometheus metrics if there are other sessions using them if len(sessions_for_visit) == 1: - # Don't remove prometheus metrics if there are other sessions using them - try: - prom.monitoring_switch.remove(session.visit) - except KeyError: - pass + safe_run( + prom.monitoring_switch.remove, + args=(session.visit,), + label="monitoring_switch", + ) rsync_instances = db.exec( select(RsyncInstance).where(RsyncInstance.session_id == session_id) ).all() for ri in rsync_instances: - prom.seen_files.remove(ri.source, session.visit) - prom.transferred_files.remove(ri.source, session.visit) - prom.transferred_files_bytes.remove(ri.source, session.visit) - prom.seen_data_files.remove(ri.source, session.visit) - prom.transferred_data_files.remove(ri.source, session.visit) - prom.transferred_data_files_bytes.remove(ri.source, session.visit) + safe_run( + prom.seen_files.remove, + args=(ri.source, session.visit), + label="seen_files", + ) + safe_run( + prom.transferred_files.remove, + args=(ri.source, 
@router.get("/prometheus/{metric_name}")
def inspect_prometheus_metrics(
    metric_name: str,
):
    """
    Debugging endpoint that reports what a single Prometheus counter or gauge
    currently holds.

    The metric is looked up by attribute name on the 'prom' module. Only
    objects that are instances of Counter or Gauge are accepted; anything else
    (including a missing attribute) raises LookupError.

    Returns:
        - For a metric that has a '_metrics' attribute: a dict keyed by
          enumeration index, where each entry maps the metric's label names to
          the label values of one child, plus a "value" key holding that
          child's current value.
        - Otherwise: a dict with the single key "value".

    NOTE(review): the LookupError is not converted into an HTTP error in this
    function; unless an exception handler is registered elsewhere, a bad
    metric name presumably surfaces as a server error rather than a 404 --
    confirm.
    """

    # None (attribute absent) is never a Counter/Gauge, so a single isinstance
    # check covers both the "missing" and the "wrong type" cases
    found = getattr(prom, metric_name, None)
    if not isinstance(found, (Counter, Gauge)):
        raise LookupError("No matching metric was found")

    # Metric without children: report its own value directly
    if not hasattr(found, "_metrics"):
        return {"value": found._value.get()}

    # Metric with children: one entry per label combination
    return {
        index: {
            **dict(zip(found._labelnames, label_values)),
            "value": child._value.get(),
        }
        for index, (label_values, child) in enumerate(found._metrics.items())
    }
+ """ + + # Extract the Prometheus metric defined in the Prometheus module + metric: Optional[Counter | Gauge] = getattr(prom, metric_name, None) + if metric is None or not isinstance(metric, (Counter, Gauge)): + raise LookupError("No matching metric was found") + + # Package contents into dict and return + results = {} + if hasattr(metric, "_metrics"): + for i, (label_tuple, sub_metric) in enumerate(metric._metrics.items()): + labels = dict(zip(metric._labelnames, label_tuple)) + labels["value"] = sub_metric._value.get() + results[i] = labels + return results + else: + value = metric._value.get() + return {"value": value} diff --git a/src/murfey/util/__init__.py b/src/murfey/util/__init__.py index 994043292..71613424f 100644 --- a/src/murfey/util/__init__.py +++ b/src/murfey/util/__init__.py @@ -4,7 +4,7 @@ from pathlib import Path from queue import Queue from threading import Thread -from typing import Optional +from typing import Any, Callable, Optional from uuid import uuid4 from werkzeug.utils import secure_filename @@ -132,3 +132,23 @@ def filter(self, record: logging.LogRecord) -> bool: if "." not in logger_name: return False logger_name = logger_name.rsplit(".", maxsplit=1)[0] + + +def safe_run( + func: Callable, + args: list | tuple = [], + kwargs: dict[str, Any] = {}, + label: str = "", +): + """ + A wrapper to encase individual functions in try-except blocks so that a warning + is raised if the function fails, but the process continues as normal otherwise. + """ + try: + return func(*args, **kwargs) + except Exception: + logger.warning( + f"Function {func.__name__!r} failed to run for object {label!r}", + exc_info=True, + ) + return None