Skip to content

Commit 7334205

Browse files
committed
Safer registration of data collections
1 parent 58b66ba commit 7334205

File tree

4 files changed

+22
-22
lines changed

4 files changed

+22
-22
lines changed

src/murfey/server/feedback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2226,7 +2226,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
22262226
murfey_db=_db,
22272227
)
22282228
if murfey.server._transport_object:
2229-
if result.get("success", False):
2229+
if result.get("success"):
22302230
murfey.server._transport_object.transport.ack(header)
22312231
else:
22322232
# Send it directly to DLQ without trying to rerun it

src/murfey/workflows/register_data_collection.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import time
23

34
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
45
from sqlmodel import select
@@ -39,6 +40,7 @@ def run(
3940
dcgid = dcg[0].id
4041
# flush_data_collections(message["source"], murfey_db)
4142
else:
43+
time.sleep(2)
4244
logger.warning(
4345
"No data collection group ID was found for image directory "
4446
f"{sanitise(message['image_directory'])} and source "
@@ -84,21 +86,20 @@ def run(
8486
else ""
8587
),
8688
).get("return_value", None)
89+
if dcid is None:
90+
time.sleep(2)
91+
logger.error(
92+
"Failed to register the following data collection: \n"
93+
f"{message} \n"
94+
"Requeueing message"
95+
)
96+
return {"success": False, "requeue": True}
8797
murfey_dc = MurfeyDB.DataCollection(
8898
id=dcid,
8999
tag=message.get("tag"),
90100
dcg_id=dcgid,
91101
)
92102
murfey_db.add(murfey_dc)
93103
murfey_db.commit()
94-
dcid = murfey_dc.id
95104
murfey_db.close()
96-
97-
if dcid is None:
98-
logger.error(
99-
"Failed to register the following data collection: \n"
100-
f"{message} \n"
101-
"Requeueing message"
102-
)
103-
return {"success": False, "requeue": True}
104105
return {"success": True}

src/murfey/workflows/register_data_collection_group.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,15 @@ def run(
5454
"return_value", None
5555
)
5656

57+
if dcgid is None:
58+
time.sleep(2)
59+
logger.error(
60+
"Failed to register the following data collection group: \n"
61+
f"{message} \n"
62+
"Requeuing message"
63+
)
64+
return {"success": False, "requeue": True}
65+
5766
atlas_record = ISPyBDB.Atlas(
5867
dataCollectionGroupId=dcgid,
5968
atlasImage=message.get("atlas", ""),
@@ -77,15 +86,6 @@ def run(
7786
murfey_db.commit()
7887
murfey_db.close()
7988

80-
if dcgid is None:
81-
time.sleep(2)
82-
logger.error(
83-
"Failed to register the following data collection group: \n"
84-
f"{message} \n"
85-
"Requeuing message"
86-
)
87-
return {"success": False, "requeue": True}
88-
8989
if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"):
9090
try:
9191
for hook in dcg_hooks:

src/murfey/workflows/register_processing_job.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
6363
pid = _transport_object.do_create_ispyb_job(record).get(
6464
"return_value", None
6565
)
66+
if pid is None:
67+
return {"success": False, "requeue": True}
6668
murfey_pj = MurfeyDB.ProcessingJob(
6769
id=pid, recipe=message["recipe"], dc_id=_dcid
6870
)
@@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession, demo: bool = False):
7173
pid = murfey_pj.id
7274
murfey_db.close()
7375

74-
if pid is None:
75-
return {"success": False, "requeue": True}
76-
7776
# Update Prometheus counter for preprocessed movies
7877
prom.preprocessed_movies.labels(processing_job=pid)
7978

0 commit comments

Comments
 (0)