diff --git a/src/murfey/client/multigrid_control.py b/src/murfey/client/multigrid_control.py index 931126d42..c22524408 100644 --- a/src/murfey/client/multigrid_control.py +++ b/src/murfey/client/multigrid_control.py @@ -667,8 +667,22 @@ def _increment_transferred_files_prometheus( requests.post(url, json=data) def _increment_transferred_files( - self, updates: List[RSyncerUpdate], source: str, destination: str + self, + updates: List[RSyncerUpdate], + num_skipped_files: int, + source: str, + destination: str, ): + skip_url = f"{str(self._environment.url.geturl())}{url_path_for('prometheus.router', 'increment_rsync_skipped_files_prometheus', visit_name=self._environment.visit)}" + requests.post( + skip_url, + json={ + "source": source, + "session_id": self.session_id, + "increment_count": num_skipped_files, + }, + ) + checked_updates = [ update for update in updates if update.outcome is TransferResult.SUCCESS ] diff --git a/src/murfey/client/rsync.py b/src/murfey/client/rsync.py index de97d96ee..52facd0db 100644 --- a/src/murfey/client/rsync.py +++ b/src/murfey/client/rsync.py @@ -309,7 +309,7 @@ def _fake_transfer(self, files: list[Path]) -> bool: self.notify(update) updates.append(update) time.sleep(0.01) - self.notify([update], secondary=True) + self.notify([update], num_skipped_files=0, secondary=True) # self.notify(updates, secondary=True) return True @@ -328,8 +328,10 @@ def _transfer(self, infiles: list[Path]) -> bool: if f.is_file() and f.stat().st_ctime < self._end_time.timestamp() ] self._skipped_files.extend(set(infiles).difference(set(files))) + num_skipped_files = len(set(infiles).difference(set(files))) else: files = [f for f in infiles if f.is_file()] + num_skipped_files = 0 previously_transferred = self._files_transferred transfer_success: set[Path] = set() @@ -528,7 +530,9 @@ def parse_stderr(line: str): if success: success = result.returncode == 0 - self.notify(successful_updates, secondary=True) + self.notify( + successful_updates, num_skipped_files=num_skipped_files, secondary=True + ) # Print out a summary message for each file transfer batch instead of individual messages # List out file paths as stored in memory to see if issue is due to file path mismatch diff --git a/src/murfey/server/api/instrument.py b/src/murfey/server/api/instrument.py index d3dd95275..7d5ae6d1f 100644 --- a/src/murfey/server/api/instrument.py +++ b/src/murfey/server/api/instrument.py @@ -13,6 +13,7 @@ from sqlmodel import select from werkzeug.utils import secure_filename +import murfey.server.prometheus as prom from murfey.server.api.auth import MurfeyInstrumentNameFrontend as MurfeyInstrumentName from murfey.server.api.auth import MurfeySessionIDFrontend as MurfeySessionID from murfey.server.api.auth import ( @@ -555,8 +556,8 @@ async def flush_skipped_rsyncer( db.commit() # Send request to flush rsyncer - data: dict = {} update_result: dict = {} + flush_result: dict = {} machine_config = get_machine_config(instrument_name=instrument_name)[ instrument_name ] @@ -583,8 +584,14 @@ async def flush_skipped_rsyncer( "Authorization": f"Bearer {instrument_server_tokens[session_id]['access_token']}" }, ) as resp: - data = await resp.json() - return data + flush_result = await resp.json() + if not flush_result.get("success", False): + return {"success": False} + # Reset the skipped file count for the specific Prometheus gauge to 0 + prom.skipped_files.labels( + rsync_source=rsyncer_source.source, visit=session_entry.visit + ).set(0) + return flush_result class RSyncerInfo(BaseModel): diff --git a/src/murfey/server/api/prometheus.py b/src/murfey/server/api/prometheus.py index c93e49b27..90a54db16 100644 --- a/src/murfey/server/api/prometheus.py +++ b/src/murfey/server/api/prometheus.py @@ -12,7 +12,7 @@ from murfey.server.murfey_db import murfey_db from murfey.util import sanitise from murfey.util.db import RsyncInstance -from murfey.util.models import RsyncerInfo +from murfey.util.models import RsyncerInfo, RsyncerSkippedFiles logger = getLogger("murfey.server.api.prometheus") @@ -90,6 +90,15 @@ def increment_rsync_transferred_files_prometheus( ).inc(rsyncer_info.data_bytes) +@router.post("/visits/{visit_name}/increment_rsync_skipped_files_prometheus") +def increment_rsync_skipped_files_prometheus( + visit_name: str, rsyncer_skipped_files: RsyncerSkippedFiles, db=murfey_db +): + prom.skipped_files.labels( + rsync_source=rsyncer_skipped_files.source, visit=visit_name + ).inc(rsyncer_skipped_files.increment_count) + + @router.post("/visits/{visit_name}/monitoring/{on}") def change_monitoring_status(visit_name: str, on: int): prom.monitoring_switch.labels(visit=visit_name) diff --git a/src/murfey/server/api/session_control.py b/src/murfey/server/api/session_control.py index f8966a5f2..1a5702904 100644 --- a/src/murfey/server/api/session_control.py +++ b/src/murfey/server/api/session_control.py @@ -224,18 +224,6 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db): db.add(rsync_instance) db.commit() db.close() - prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.seen_data_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.transferred_files.labels(rsync_source=rsyncer_info.source, visit=visit_name) - prom.transferred_files_bytes.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) - prom.transferred_data_files.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) - prom.transferred_data_files_bytes.labels( - rsync_source=rsyncer_info.source, visit=visit_name - ) prom.seen_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0) prom.transferred_files.labels( rsync_source=rsyncer_info.source, visit=visit_name @@ -252,6 +240,7 @@ def register_rsyncer(session_id: int, rsyncer_info: RsyncerInfo, db=murfey_db): prom.transferred_data_files_bytes.labels( rsync_source=rsyncer_info.source, visit=visit_name ).set(0) + prom.skipped_files.labels(rsync_source=rsyncer_info.source, visit=visit_name).set(0) return rsyncer_info diff --git a/src/murfey/server/api/shared.py b/src/murfey/server/api/shared.py index 1b8bf84ae..fd0cc353e 100644 --- a/src/murfey/server/api/shared.py +++ b/src/murfey/server/api/shared.py @@ -77,6 +77,11 @@ def remove_session_by_id(session_id: int, db): args=(ri.source, session.visit), label="transferred_data_file_bytes", ) + safe_run( + prom.skipped_files.remove, + args=(ri.source, session.visit), + label="skipped_files", + ) collected_ids = db.exec( select(DataCollectionGroup, DataCollection, ProcessingJob) .where(DataCollectionGroup.session_id == session_id) diff --git a/src/murfey/server/prometheus.py b/src/murfey/server/prometheus.py index 0217f6e26..fff3ac0a0 100644 --- a/src/murfey/server/prometheus.py +++ b/src/murfey/server/prometheus.py @@ -24,6 +24,12 @@ ["rsync_source", "visit"], ) +skipped_files = Gauge( + "skipped_files", + "Number of files not transferred due to end time", + ["rsync_source", "visit"], +) + preprocessed_movies = Counter( "preprocessed_movies", "Number of movies that have been preprocessed", diff --git a/src/murfey/util/models.py b/src/murfey/util/models.py index db55d83ae..72b0b542c 100644 --- a/src/murfey/util/models.py +++ b/src/murfey/util/models.py @@ -78,6 +78,12 @@ class RsyncerInfo(BaseModel): tag: str = "" +class RsyncerSkippedFiles(BaseModel): + source: str + session_id: int + increment_count: int = 1 + + """ Single Particle Analysis ======================== diff --git a/src/murfey/util/route_manifest.yaml b/src/murfey/util/route_manifest.yaml index 237061d53..de173cbd7 100644 --- a/src/murfey/util/route_manifest.yaml +++ b/src/murfey/util/route_manifest.yaml @@ -642,6 +642,13 @@ murfey.server.api.prometheus.router: type: str methods: - POST + - path: /prometheus/visits/{visit_name}/increment_rsync_skipped_files_prometheus + function: increment_rsync_skipped_files_prometheus + path_params: + - name: visit_name + type: str + methods: + - POST - path: /prometheus/visits/{visit_name}/monitoring/{on} function: change_monitoring_status path_params: