From ab9b1a9444fde100dfaf61634094fab30d3b8884 Mon Sep 17 00:00:00 2001 From: yxd92326 Date: Tue, 22 Jul 2025 11:34:01 +0100 Subject: [PATCH 1/9] A way to record skipped files in prometheus --- src/murfey/client/multigrid_control.py | 9 ++++++++- src/murfey/client/rsync.py | 8 ++++++-- src/murfey/server/api/prometheus.py | 9 +++++++++ src/murfey/server/prometheus.py | 6 ++++++ src/murfey/util/route_manifest.yaml | 7 +++++++ 5 files changed, 36 insertions(+), 3 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a016b49f4..22eeaa12c 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -607,8 +607,15 @@ def _increment_transferred_files_prometheus( requests.post(url, json=data) def _increment_transferred_files( - self, updates: List[RSyncerUpdate], source: str, destination: str + self, + updates: List[RSyncerUpdate], + num_skipped_files: int, + source: str, + destination: str, ): + skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit, increment_count=num_skipped_files)}" + requests.post(skip_url, json={}) + checked_updates = [ update for update in updates if update.outcome is TransferResult.SUCCESS ] diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index de97d96ee..52facd0db 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -309,7 +309,7 @@ def _fake_transfer(self, files: list[Path]) -> bool: self.notify(update) updates.append(update) time.sleep(0.01) - self.notify([update], secondary=True) + self.notify([update], num_skipped_files=0, secondary=True) # self.notify(updates, secondary=True) return True @@ -328,8 +328,10 @@ def _transfer(self, infiles: list[Path]) -> bool: if f.is_file() and f.stat().st_ctime < self._end_time.timestamp() ] self._skipped_files.extend(set(infiles).difference(set(files))) + num_skipped_files = len(set(infiles).difference(set(files))) else: files = [f for f in infiles if f.is_file()] + num_skipped_files = 0 previously_transferred = self._files_transferred transfer_success: set[Path] = set() @@ -528,7 +530,9 @@ def parse_stderr(line: str): if success: success = result.returncode == 0 - self.notify(successful_updates, secondary=True) + self.notify( + successful_updates, num_skipped_files=num_skipped_files, secondary=True + ) # Print out a summary message for each file transfer batch instead of individual messages # List out file paths as stored in memory to see if issue is due to file path mismatch diff --git a/src/murfey/server/api/prometheus.py b/src/murfey/server/api/prometheus.py index c93e49b27..ae9aa6268 100644 --- a/src/murfey/server/api/prometheus.py +++ b/src/murfey/server/api/prometheus.py @@ -90,6 +90,15 @@ def increment_rsync_transferred_files_prometheus( ).inc(rsyncer_info.data_bytes) +@router.post( + "/visits/{visit_name}/increment_rsync_skipped_files_prometheus/{increment_count}" +) +def increment_rsync_skipped_files_prometheus( + visit_name: str, increment_count: int, db=murfey_db +): + prom.skipped_files.labels(visit=visit_name).inc(increment_count) + + @router.post("/visits/{visit_name}/monitoring/{on}") def change_monitoring_status(visit_name: str, on: int): prom.monitoring_switch.labels(visit=visit_name) diff --git a/src/murfey/server/prometheus.py b/src/murfey/server/prometheus.py index 0217f6e26..7cf39d310 100644 --- a/src/murfey/server/prometheus.py +++ b/src/murfey/server/prometheus.py @@ -24,6 +24,12 @@ ["rsync_source", "visit"], ) +skipped_files = Gauge( + "skipped_files", + "Number of files not transferred due to end time", + ["visit"], +) + preprocessed_movies = Counter( "preprocessed_movies", "Number of movies that have been preprocessed", diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index a92821c19..5ffed293d 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -642,6 +642,13 @@ murfey.server.api.prometheus.router: type: str methods: - POST + - path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus/{increment_count} + function: increment_rsync_skipped_files_prometheus + path_params: + - name: visit_name + type: str + - name: increment_count + type: int - path: /prometheus/visits/{visit_name}/monitoring/{on} function: change_monitoring_status path_params: From bf5277787370cf7b9cb19e4c69527390511f6f7b Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Fri, 25 Jul 2025 19:05:15 +0100 Subject: [PATCH 2/9] Added logic to reset the Prometheus skipped files Gauge if a flush was triggered for that RSyncer --- src/murfey/server/api/instrument.py | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index d3dd95275..7d5ae6d1f 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -13,6 +13,7 @@ from sqlmodel import select from werkzeug.utils import secure_filename +import murfey.server.prometheus as prom from murfey.server.api.auth import MurfeyInstrumentNameFrontend as MurfeyInstrumentName from murfey.server.api.auth import MurfeySessionIDFrontend as MurfeySessionID from murfey.server.api.auth import ( @@ -555,8 +556,8 @@ async def flush_skipped_rsyncer( db.commit() # Send request to flush rsyncer - data: dict = {} update_result: dict = {} + flush_result: dict = {} machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -583,8 +584,14 @@ async def flush_skipped_rsyncer( "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, ) as resp: - data = await resp.json() - return data + flush_result = await resp.json() + if not flush_result.get("success", False): + return {"success": False} + # Reset the skipped file count for the specific Prometheus gauge to 0 + prom.skipped_files.labels( + rsync_source=rsyncer_source.source, visit=session_entry.visit + ).set(0) + return flush_result class RSyncerInfo(BaseModel): From 95c4b18bba70943190dca0fea80b116a9770d64e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 11:17:44 +0100 Subject: [PATCH 3/9] Added Pydantic model for skipped rsync files data --- src/murfey/util/models.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index db55d83ae..72b0b542c 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -78,6 +78,12 @@ class RsyncerInfo(BaseModel): tag: str = "" +class RsyncerSkippedFiles(BaseModel): + source: str + session_id: int + increment_count: int = 1 + + """ Single Particle Analysis ======================== From 67ab1e62976ee9cf03ab770ca7e6da6860408774 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 11:18:49 +0100 Subject: [PATCH 4/9] Updated skipped files Prometheus endpoint to use new Pydantic model and to group skipped file information by source --- src/murfey/server/api/prometheus.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/murfey/server/api/prometheus.py b/src/murfey/server/api/prometheus.py index ae9aa6268..90a54db16 100644 --- a/src/murfey/server/api/prometheus.py +++ b/src/murfey/server/api/prometheus.py @@ -12,7 +12,7 @@ from murfey.server.murfey_db import murfey_db from murfey.util import sanitise from murfey.util.db import RsyncInstance -from murfey.util.models import RsyncerInfo +from murfey.util.models import RsyncerInfo, RsyncerSkippedFiles logger = getLogger("murfey.server.api.prometheus") @@ -90,13 +90,13 @@ def increment_rsync_transferred_files_prometheus( ).inc(rsyncer_info.data_bytes) -@router.post( - "/visits/{visit_name}/increment_rsync_skipped_files_prometheus/{increment_count}" -) +@router.post("/visits/{visit_name}/increment_rsync_skipped_files_prometheus") def increment_rsync_skipped_files_prometheus( - visit_name: str, increment_count: int, db=murfey_db + visit_name: str, rsyncer_skipped_files: RsyncerSkippedFiles, db=murfey_db ): - prom.skipped_files.labels(visit=visit_name).inc(increment_count) + prom.skipped_files.labels( + rsync_source=rsyncer_skipped_files.source, visit=visit_name + ).inc(rsyncer_skipped_files.increment_count) @router.post("/visits/{visit_name}/monitoring/{on}") From afbe461309441756a9e5052bd3e8c9ff8f8feb7e Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 11:19:26 +0100 Subject: [PATCH 5/9] Updated date sent to the Prometheus endpoint so that source information is also included --- src/murfey/client/multigrid_control.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index a89c0ce08..c22524408 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -673,8 +673,15 @@ def _increment_transferred_files( source: str, destination: str, ): - skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit, increment_count=num_skipped_files)}" - requests.post(skip_url, json={}) + skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}" + requests.post( + skip_url, + json={ + "source": source, + "session_id": self.session_id, + "increment_count": num_skipped_files, + }, + ) checked_updates = [ update for update in updates if update.outcome is TransferResult.SUCCESS From 72a7a85db991d5c3d025fc5166eef7998b77926c Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 11:20:10 +0100 Subject: [PATCH 6/9] Updated route manifest to reflect modified skipped files endpoint URL --- src/murfey/util/route_manifest.yaml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index 34dbcf4d2..de173cbd7 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -642,13 +642,13 @@ murfey.server.api.prometheus.router: type: str methods: - POST - - path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus/{increment_count} + - path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus function: increment_rsync_skipped_files_prometheus path_params: - name: visit_name type: str - - name: increment_count - type: int + methods: + - POST - path: /prometheus/visits/{visit_name}/monitoring/{on} function: change_monitoring_status path_params: From 8c491ee11fd847970d8a1afadf63dfbbfb547183 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 11:44:21 +0100 Subject: [PATCH 7/9] Added 'rsync_source' as a label in the skipped files Prometheus gauge definition --- src/murfey/server/prometheus.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/murfey/server/prometheus.py b/src/murfey/server/prometheus.py index 7cf39d310..fff3ac0a0 100644 --- a/src/murfey/server/prometheus.py +++ b/src/murfey/server/prometheus.py @@ -27,7 +27,7 @@ skipped_files = Gauge( "skipped_files", "Number of files not transferred due to end time", - ["visit"], + ["rsync_source", "visit"], ) preprocessed_movies = Counter( From b368d272e2a490adfb64a33cf3cd49b01784c828 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 13:17:42 +0100 Subject: [PATCH 8/9] Added skipped files Prometheus gauge to the items to be created when registering an Rsyncer, and removed redundant lines --- src/murfey/server/api/session_control.py | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index f8966a5f2..1a5702904 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -224,18 +224,6 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db): db.add(rsync_instance) db.commit() db.close() - prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.seen_data_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.transferred_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.transferred_files_bytes.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) - prom.transferred_data_files.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) - prom.transferred_data_files_bytes.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0) prom.transferred_files.labels( rsync_source=rsyncer_info.source, visit=visit_name @@ -252,6 +240,7 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db): prom.transferred_data_files_bytes.labels( rsync_source=rsyncer_info.source, visit=visit_name ).set(0) + prom.skipped_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0) return rsyncer_info From a4c3c46c954892a1fff3ca3e698361efc1baebe1 Mon Sep 17 00:00:00 2001 From: Eu Pin Tien Date: Mon, 28 Jul 2025 13:19:11 +0100 Subject: [PATCH 9/9] Added skipped files Prometheus gauge to list of items to remove when cleaning up a session --- src/murfey/server/api/shared.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/murfey/server/api/shared.py b/src/murfey/server/api/shared.py index 1b8bf84ae..fd0cc353e 100644 --- a/src/murfey/server/api/shared.py +++ b/src/murfey/server/api/shared.py @@ -77,6 +77,11 @@ def remove_session_by_id(session_id: int, db): args=(ri.source, session.visit), label="transferred_data_file_bytes", ) + safe_run( + prom.skipped_files.remove, + args=(ri.source, session.visit), + label="skipped_files", + ) collected_ids = db.exec( select(DataCollectionGroup, DataCollection, ProcessingJob) .where(DataCollectionGroup.session_id == session_id)