From cb820b9150cbb1194a746cdaaec0c361c760dc29 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Mon, 18 May 2026 17:09:19 +0100 Subject: [PATCH 1/2] Added cancel job command to interface with new RPC call on NLDS server --- nlds_admin/nlds_admin.py | 117 ++++++++++++++++++++++++++++++ nlds_admin/prints.py | 35 ++++++--- nlds_admin/rabbit/publisher.py | 13 ++-- nlds_admin/rabbit/routing_keys.py | 4 +- 4 files changed, 153 insertions(+), 16 deletions(-) diff --git a/nlds_admin/nlds_admin.py b/nlds_admin/nlds_admin.py index 2400ff7..0c61c29 100644 --- a/nlds_admin/nlds_admin.py +++ b/nlds_admin/nlds_admin.py @@ -1,12 +1,20 @@ import click from nlds_admin.rabbit.rpc_publisher import RabbitMQRPCPublisher +from nlds_admin.rabbit.publisher import RabbitMQPublisher from nlds_admin.publishers.list import list_holdings from nlds_admin.publishers.find import find_files from nlds_admin.publishers.status import get_request_status +from nlds_admin.publishers.cancel import cancel_transaction + +import nlds_admin.rabbit.routing_keys as RK +import nlds_admin.rabbit.message_keys as MSG from nlds_admin import prints from nlds_admin.deserialize import deserialize +from uuid import uuid4 +import json + @click.group() @click.pass_context @@ -422,12 +430,121 @@ def stat( response_meta = json_response["meta"] response_data = response_data["records"] + if "failure" in response_details and len(response_details["failure"]) > 0: + fail_string = "Failed to stat transactions " + fail_string += prints.construct_header_string( + response_details, + response_meta, + time, + ) + if response_details["failure"]: + fail_string += "\n" + response_details["failure"] + raise click.UsageError(fail_string) + if json: click.echo(json_response) else: prints.print_action(response_data, response_details, response_meta, time) +@nlds_admin.command( + "cancel", help="Cancel a transaction that is still in the QUEUING stage." +) +@click.pass_context +@click.option( + "-u", + "--user", + # default=None, + type=str, + help="The username to cancel the transaction for.", +) +@click.option( + "-g", + "--group", + default=None, + type=str, + help="The group to cancel the transactions for.", +) +@click.option( + "-i", + "--id", + default=None, + type=int, + help="The numeric id of the transaction to cancel.", +) +@click.option( + "-n", + "--transaction_id", + default=None, + type=str, + help="The UUID transaction id of the transaction to cancel.", +) +@click.option( + "-b", + "--job_label", + default=None, + type=str, + help="The job label of the transaction(s) to cancel.", +) +@click.option( + "-j", + "--json", + default=False, + type=bool, + is_flag=True, + help="Output the result as JSON.", +) +def cancel(ctx, user, group, id, transaction_id, job_label, json): + # we do want to use the RPC publisher for this, so that we can have interaction + # with the user + rpc_publisher = ctx.obj + try: + ret = cancel_transaction( + rpc_publisher=rpc_publisher, + user=user, + group=group, + id=id, + transaction_id=transaction_id, + job_label=job_label, + ) + finally: + rpc_publisher.close_connection() + json_response = deserialize(ret) + response_details = json_response["details"] + if "meta" in json_response: + response_meta = json_response["meta"] + else: + response_meta = {} + + if "failure" in response_details and len(response_details["failure"]) > 0: + fail_string = "Failed to cancel transaction " + fail_string += prints.construct_header_string(response_details, response_meta) + if response_details["failure"]: + fail_string += "\n" + response_details["failure"] + raise click.UsageError(fail_string) + + response_details = json_response["details"] + response_data = json_response["data"] + response_meta = json_response["meta"] + if "records" in response_data: + response_data = response_data["records"] + + if "failure" in response_details and len(response_details["failure"]) > 0: + fail_string = "Failed to cancel transaction " + fail_string += prints.construct_header_string(response_details, response_meta) + if response_details["failure"]: + fail_string += "\n" + response_details["failure"] + raise click.UsageError(fail_string) + + if json: + click.echo(json_response) + else: + prints.print_action(response_data, response_details, response_meta) + + if json: + click.echo(json_response) + + def main(): nlds_admin(prog_name="nlds-admin") diff --git a/nlds_admin/prints.py b/nlds_admin/prints.py index 9605d32..a887728 100644 --- a/nlds_admin/prints.py +++ b/nlds_admin/prints.py @@ -290,7 +290,7 @@ def print_single_stat(response: dict): f"{'':<9}{'last update':<13}: {(sr['last_updated']).replace('T',' ')}" ) - if len(sr["failed_files"]) > 0: + if "failed_files" in sr and len(sr["failed_files"]) > 0: click.echo(f"{'':<9}{'failed files':<13}->") for ff in sr["failed_files"]: click.echo(f"{'':<9}{'+':<4} {'filepath':<8} : {ff['filepath']}") @@ -319,7 +319,13 @@ def print_multi_stat(response: dict): ) -def construct_header_string(details, meta, time, simple=False, url=False): +def construct_header_string( + details, + meta, + time=None, + simple=False, + url=False, +): """ Constructs a string based on the inputs and prints the response. """ @@ -328,7 +334,7 @@ def construct_header_string(details, meta, time, simple=False, url=False): if details.get("user_query"): header.append(f"user: {details['user_query']}") elif details.get("groupall") or details["user"] == "nlds": - header.append("All users") + header.append("users: **all**") else: header.append(f"user: {details['user']}") @@ -360,16 +366,17 @@ def construct_header_string(details, meta, time, simple=False, url=False): if details.get("path"): header.append(f"path: {details['path']}") if meta.get("api_action"): - api_action = ", ".join(meta['api_action']) + api_action = ", ".join(meta["api_action"]) header.append(f"api action: {api_action}") req_details = ", ".join(header) # Add order to the request details - if time: - req_details += ", order: descending" - else: - req_details += ", order: ascending" + if time is not None: + if time: + req_details += ", order: descending" + else: + req_details += ", order: ascending" return req_details @@ -390,7 +397,14 @@ def print_table_headers(api_action): click.echo(headers.get(api_action, "")) -def print_action(response: dict, req_details, req_meta, time, simple=False, url=None): +def print_action( + response: dict, + req_details, + req_meta, + time=None, + simple=False, + url=None, +): header = construct_header_string(req_details, req_meta, time) api_action = req_details["api_action"] @@ -408,6 +422,7 @@ def print_action(response: dict, req_details, req_meta, time, simple=False, url= "list": "Listing holding for", "find": "Listing files for holding", "stat": "State of transaction for", + "cancel": "Cancel transaction for", } click.echo(f"{action_messages.get(api_action)} {header}") @@ -415,6 +430,7 @@ def print_action(response: dict, req_details, req_meta, time, simple=False, url= "list": print_single_list, "find": lambda res: print_single_file(res, url), "stat": print_single_stat, + "cancel": print_single_stat, } action_functions[api_action](response) return @@ -423,6 +439,7 @@ def print_action(response: dict, req_details, req_meta, time, simple=False, url= "list": "Listing holdings for", "find": "Listing files for holdings", "stat": "State of transactions for", + "cancel": "Cancel transactions for", } click.echo(f"{action_messages.get(api_action)} {header}") diff --git a/nlds_admin/rabbit/publisher.py b/nlds_admin/rabbit/publisher.py index 592d9fd..8c27664 100644 --- a/nlds_admin/rabbit/publisher.py +++ b/nlds_admin/rabbit/publisher.py @@ -64,17 +64,17 @@ def _verify_exchange(exchange: str): Throws a ValueError if not. """ - if ( - "name" not in exchange - or "type" not in exchange - or "delayed" not in exchange - ): + if "name" not in exchange or "type" not in exchange: raise ValueError( "Exchange in config file incomplete, cannot " "be declared." ) def declare_bindings(self): - raise NotImplementedError + """Go through list of exchanges from config file and declare each.""" + for exchange in self.exchanges: + self.channel.exchange_declare( + exchange=exchange["name"], exchange_type=exchange["type"] + ) @retry(RabbitRetryError, tries=-1, delay=1, backoff=2, max_delay=60, logger=logger) def get_connection(self): @@ -187,6 +187,7 @@ def publish_message( f"properly (rk = {routing_key})." ) logger.debug(f"{type(e).__name__}: {e}") + raise e # NOTE: don't reraise in this case, can cause an infinite loop as # the message will never be sent. # raise RabbitRetryError(str(e), ampq_exception=e) diff --git a/nlds_admin/rabbit/routing_keys.py b/nlds_admin/rabbit/routing_keys.py index 5837cec..378f4a6 100644 --- a/nlds_admin/rabbit/routing_keys.py +++ b/nlds_admin/rabbit/routing_keys.py @@ -16,8 +16,10 @@ STAT = "stat" FIND = "find" META = "meta" +CANCEL = "cancel" CATALOG_Q = "catalog_q_user" MONITOR_Q = "monitor_q_user" - +NLDS_Q = "nlds_q" +START = "start" # Exchange routing key parts – root ADMIN = "nlds-admin" From 4bf626adc9a1bbe1cd4d5884f76e0fb7e19ad171 Mon Sep 17 00:00:00 2001 From: Neil Massey Date: Tue, 19 May 2026 09:16:14 +0100 Subject: [PATCH 2/2] Upped to 1.0.2 --- nlds_admin/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nlds_admin/__init__.py b/nlds_admin/__init__.py index 6c4c011..7863915 100644 --- a/nlds_admin/__init__.py +++ b/nlds_admin/__init__.py @@ -1 +1 @@ -__version__ = '1.0.1' \ No newline at end of file +__version__ = "1.0.2"