From 570777bffdba6ece5b6d7505d1908ad162ccf71f Mon Sep 17 00:00:00 2001 From: Tony Chow Date: Fri, 22 May 2026 14:40:47 +0100 Subject: [PATCH 1/4] Try async retries in function app errors --- .../active_caselink_ccd/cl_ccdFunctions.py | 39 ++-- .../active_caselink_ccd/function_app.py | 30 ++- .../active_caselink_ccd/requirements.txt | 2 +- .../active_caselink_ccd/retry_decorator.py | 70 ------- .../ACTIVE/active_ccd/ccdFunctions.py | 42 ++-- .../ACTIVE/active_ccd/function_app.py | 30 ++- .../ACTIVE/active_ccd/requirements.txt | 2 +- .../ACTIVE/active_ccd/retry_decorator.py | 70 ------- .../ACTIVE/active_cdam/cdamFunctions.py | 27 ++- .../ACTIVE/active_cdam/function_app.py | 30 ++- .../ACTIVE/active_cdam/requirements.txt | 2 +- .../ACTIVE/active_cdam/retry_decorator.py | 70 ------- .../unit_tests/active/caseUnderReview.yml | 17 ++ .../active/reasonForAppealSubmitted.yml | 17 ++ .../unit_tests/caseLinkFunctionApp.yml | 22 ++ .../unit_tests/cdamFunctionApp.yml | 22 ++ pr-pipeline.yml | 24 ++- .../cl_ccdFunctions_test.py | 9 +- .../cl_functionApp_test.py | 36 ++-- .../cl_retry_decorator_test.py | 198 ------------------ .../cur_hearingResponse_test.py | 46 +--- tests/active/cdamFunctionApp/__init__.py | 0 .../cdam_cdamFunctions_test.py | 9 +- .../cdamFunctionApp/cdam_function_app_test.py | 41 ++-- .../cdam_retry_decorator_test.py | 198 ------------------ tests/active/functionApp/ccdFunctions_test.py | 8 - tests/active/functionApp/functionApp_test.py | 8 - .../functionApp/retry_decorator_test.py | 198 ------------------ .../rfas_hearingResponse_test.py | 46 +--- tests/requirements.txt | 3 +- 30 files changed, 301 insertions(+), 1015 deletions(-) delete mode 100644 AzureFunctions/ACTIVE/active_caselink_ccd/retry_decorator.py delete mode 100644 AzureFunctions/ACTIVE/active_ccd/retry_decorator.py delete mode 100644 AzureFunctions/ACTIVE/active_cdam/retry_decorator.py create mode 100644 ci_cd_templates/unit_tests/active/caseUnderReview.yml create mode 100644 
ci_cd_templates/unit_tests/active/reasonForAppealSubmitted.yml create mode 100644 ci_cd_templates/unit_tests/caseLinkFunctionApp.yml create mode 100644 ci_cd_templates/unit_tests/cdamFunctionApp.yml delete mode 100644 tests/active/caseLinkFunctionApp/cl_retry_decorator_test.py delete mode 100644 tests/active/cdamFunctionApp/__init__.py delete mode 100644 tests/active/cdamFunctionApp/cdam_retry_decorator_test.py delete mode 100644 tests/active/functionApp/retry_decorator_test.py diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py index 29fb864b1..c8364d40f 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py @@ -1,10 +1,15 @@ import json import requests from datetime import datetime, timezone -try: - from .retry_decorator import retry_on_result -except ImportError: - from retry_decorator import retry_on_result + + +def _compact(value) -> str: + if isinstance(value, (dict, list)): + text = json.dumps(value, indent=2) + else: + text = str(value) + return text.replace("\r\n", "\\n").replace("\r", "\\n").replace("\n", "\\n") + # tokenManager lives in the same package. When this module is imported by the # Functions host the package root will be `AzureFunctions.ACTIVE.active_ccd`. 
@@ -34,7 +39,7 @@ def get_case_details(ccd_base_url, uid, jid, ctid, cid, idam_token, s2s_token): } try: response = requests.get(get_case_url, headers=headers) - print(f"🔢 Get Case Details Response status: {response.status_code}:{response.text}") + print(f"🔢 Get Case Details Response status: {response.status_code}:{_compact(response.text)}") return response except Exception as e: print(f"❌ Network error while calling {get_case_url}: {e}") @@ -53,7 +58,7 @@ def start_case_event(ccd_base_url, uid, jid, ctid, cid, etid, idam_token, s2s_to } try: response = requests.get(start_event_url, headers=headers) - print(f"🔢 Start Case Event Response status: {response.status_code}:{response.text}") + print(f"🔢 Start Case Event Response status: {response.status_code}:{_compact(response.text)}") return response except Exception as e: print(f"❌ Network error while calling {start_event_url}: {e}") @@ -85,11 +90,11 @@ def validate_case(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payloadD "ignore_warning": True, } - print(f"🔢 Validate posting payload for {cid}: validate_case_url = {validate_case_url} headers = {headers} json = {json_object}") + print(f"🔢 Validate posting payload for {cid}: validate_case_url = {validate_case_url} headers = {_compact(headers)} json = {_compact(json_object)}") response = requests.post(validate_case_url, headers=headers, json=json_object) - print(f"🔢 Validate Response for {cid} = {response.status_code}: {response.text}") + print(f"🔢 Validate Response for {cid} = {response.status_code}: {_compact(response.text)}") return response except Exception as e: @@ -124,11 +129,11 @@ def submit_case_event(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payl "ignore_warning": True, } - print(f"🔢 Submit payload for {cid}: submit_case_url = {submit_event_url} headers = {headers} json = {json_object}\n") + print(f"🔢 Submit payload for {cid}: submit_case_url = {submit_event_url} headers = {_compact(headers)} json = {_compact(json_object)}") response = 
requests.post(submit_event_url, headers=headers, json=json_object) - print(f"🔢 Submit Response status for {cid}: {response.status_code}:{response.text}\n") + print(f"🔢 Submit Response status for {cid}: {response.status_code}:{_compact(response.text)}") return response except Exception as e: @@ -136,12 +141,6 @@ def submit_case_event(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payl return None -@retry_on_result( - max_retries=2, - base_delay=30, - max_delay=60, - retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", -) def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overwrite=False): print(f"Starting processing case for {ccdReference}") @@ -244,10 +243,10 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw validate_case_response = validate_case(ccd_base_url, uid, jid, ctid, ccdReference, etid, event_token, caseLinkPayload, idam_token, s2s_token) try: - print(f"Validation response for case {ccdReference}: {json.dumps(validate_case_response.json(), indent=2)}") + print(f"Validation response for case {ccdReference}: {_compact(validate_case_response.json())}") except Exception: try: - print(validate_case_response.text) + print(_compact(validate_case_response.text)) except Exception: print(f"Unable to parse validate_case_response for case {ccdReference}") @@ -280,10 +279,10 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw submit_case_response = submit_case_event(ccd_base_url, uid, jid, ctid, ccdReference, etid, event_token, caseLinkPayload, idam_token, s2s_token) try: - print(f"Submit response for case {ccdReference}: {json.dumps(submit_case_response.json(), indent=2)}") + print(f"Submit response for case {ccdReference}: {_compact(submit_case_response.json())}") except Exception: try: - print(submit_case_response.text) + print(_compact(submit_case_response.text)) except Exception: print(f"Unable to parse submit_case_response for case {ccdReference}") 
diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py index d3929d9d3..20ab47dbd 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py @@ -4,6 +4,8 @@ import json import os +from tenacity import AsyncRetrying, retry_if_result, stop_after_attempt, wait_exponential + from azure.core.exceptions import ResourceExistsError from azure.storage.blob.aio import BlobServiceClient from azure.eventhub.aio import EventHubProducerClient @@ -32,6 +34,23 @@ app = func.FunctionApp() +def _log_retry(retry_state): + result = retry_state.outcome.result() if retry_state.outcome else {} + error = result.get("Error", "") if isinstance(result, dict) else "" + logger.warning( + f"Retrying process_event — attempt {retry_state.attempt_number} failed " + f"(sleeping {retry_state.next_action.sleep:.0f}s): {error}" + ) + + +def _is_retryable(result): + RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} + if not (isinstance(result, dict) and result.get("Status") == "ERROR"): + return False + error = result.get("Error", "") + return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + + @app.function_name("eventhub_trigger") @app.event_hub_message_trigger( arg_name="azeventhub", @@ -92,7 +111,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): logger.warning(f"[IDEMPOTENCY][CASELINK] Skipping in progress case {ccdReference}.") continue - result = await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, PR_REFERENCE, overwrite) + async for attempt in AsyncRetrying( + retry=retry_if_result(_is_retryable), + stop=stop_after_attempt(3), + wait=wait_exponential(min=30, max=60), + before_sleep=_log_retry, + retry_error_callback=lambda retry_state: retry_state.outcome.result(), + ): + with attempt: + result = await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, 
PR_REFERENCE, overwrite) + attempt.retry_state.set_result(result) # Skip if marked for SKIPPED if result.get("Status") == "SKIPPED": diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/requirements.txt b/AzureFunctions/ACTIVE/active_caselink_ccd/requirements.txt index 52a4cb69c..2e5828cba 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/requirements.txt +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/requirements.txt @@ -29,7 +29,7 @@ pycparser==2.22 PyJWT==2.10.1 requests==2.32.3 six==1.17.0 -tenacity==9.1.2 +tenacity==9.1.4 typing_extensions==4.13.2 urllib3==2.4.0 Werkzeug==3.1.3 diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/retry_decorator.py b/AzureFunctions/ACTIVE/active_caselink_ccd/retry_decorator.py deleted file mode 100644 index 40dd06bde..000000000 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/retry_decorator.py +++ /dev/null @@ -1,70 +0,0 @@ -import time -import random -from functools import wraps - - -def retry_on_result( - max_retries: int = 3, - base_delay: float = 1.0, - max_delay: float = 60.0, - jitter: bool = True, - retry_on=None, -): - """ - Retry decorator with exponential backoff for sync functions intended - for use with asyncio.to_thread. Retries only when retry_on - predicate returns True — exceptions are not caught. - - Args: - max_retries: Number of retry attempts after the first failure. - base_delay: Initial delay in seconds (doubles each attempt). - max_delay: Upper cap on delay in seconds. - jitter: Randomise delay to 50-100% of computed value. - retry_on: Optional callable(result) -> bool. If provided, a - return value for which this returns True is treated as - a retryable failure (the result is returned as-is - after all attempts are exhausted). - - Usage: - @retry_on_result(max_retries=3, retry_on=lambda r: r.get("Status") == "ERROR") - def my_blocking_call(): - ... 
- - result = await asyncio.to_thread(my_blocking_call) - """ - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - last_result = None - - for attempt in range(max_retries + 1): - result = func(*args, **kwargs) - - if retry_on is not None and retry_on(result): - last_result = result - - if attempt == max_retries: - print( - f"{func.__name__} returned a retryable result after " - f"{max_retries + 1} attempts. Returning last result." - ) - return last_result - - delay = min(base_delay * (2 ** attempt), max_delay) - if jitter: - delay *= 0.5 + random.random() * 0.5 - - print( - f"{func.__name__} returned a retryable result " - f"(attempt {attempt + 1}/{max_retries + 1}). " - f"Retrying in {delay:.2f}s..." - ) - time.sleep(delay) - continue - - return result - - return last_result - - return wrapper - return decorator diff --git a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py index 5164f6a3a..2223938a8 100644 --- a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py @@ -2,10 +2,6 @@ import logging import requests from datetime import datetime, timezone -try: - from .retry_decorator import retry_on_result -except ImportError: - from retry_decorator import retry_on_result # tokenManager lives in the same package. When this module is imported by the # Functions host the package root will be `AzureFunctions.ACTIVE.active_ccd`. @@ -20,6 +16,15 @@ logger = logging.getLogger(__name__) + +def _compact(value) -> str: + if isinstance(value, (dict, list)): + text = json.dumps(value, indent=2) + else: + text = str(value) + return text.replace("\r\n", "\\n").replace("\r", "\\n").replace("\n", "\\n") + + # Instantiate only one IDAMTokenManager instance per ccdFunctions import. 
idam_token_mgr = IDAMTokenManager(env="sbox") s2s_manager = S2S_Manager(env="sbox") @@ -39,7 +44,7 @@ def start_case_creation(ccd_base_url, uid, jid, ctid, etid, idam_token, s2s_toke } try: response = requests.get(start_case_creation_url, headers=headers) - print(f"🔢 Response status: {response.status_code}:{response.text}") + print(f"🔢 Response status: {response.status_code}:{_compact(response.text)}") return response except Exception as e: print(f"❌ Network error while calling {start_case_creation_url}: {e}") @@ -74,11 +79,11 @@ def validate_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, } caseNo = json_object.get("data", {}).get("appealReferenceNumber", "N/A") - print(f"🔢 Validate posting payload for {caseNo}: validate_case_url = {validate_case_url} headers = {headers} json = {json_object}") + print(f"🔢 Validate posting payload for {caseNo}: validate_case_url = {validate_case_url} headers = {_compact(headers)} json = {_compact(json_object)}") response = requests.post(validate_case_url, headers=headers, json=json_object) - print(f"🔢 Validate Response for {caseNo}= {response.status_code}: {response.text}") + print(f"🔢 Validate Response for {caseNo}= {response.status_code}: {_compact(response.text)}") return response except Exception as e: @@ -105,7 +110,7 @@ def submit_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, u except json.JSONDecodeError as e: print(f"❌ Error decoding payloadData JSON string: {e}") - print("🎁 payload recieved for submission:", type(payloadData)) + print("🎁 payload type recieved for submission:", type(payloadData)) try: json_object = { @@ -116,11 +121,11 @@ def submit_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, u } caseNo = json_object.get("data", {}).get("appealReferenceNumber", "N/A") - print(f"🔢 Submit payload for {caseNo}: submit_case_url = {submit_case_url} headers = {headers} json = {json_object}\n") + print(f"🔢 Submit payload for {caseNo}: submit_case_url = 
{submit_case_url} headers = {_compact(headers)} json = {_compact(json_object)}") response = requests.post(submit_case_url, headers=headers, json=json_object) - print(f"🔢 Submit Response status for {caseNo}: {response.status_code}:{response.text}\n") + print(f"🔢 Submit Response status for {caseNo}: {response.status_code}:{_compact(response.text)}") return response except Exception as e: @@ -128,13 +133,6 @@ def submit_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, u return None -# caseNo = event.key, payloadData = event.value -@retry_on_result( - max_retries=2, - base_delay=30, - max_delay=60, - retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", -) def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): print(f"Starting processing case for {caseNo}") @@ -186,7 +184,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): print("Starting case creation") start_response = start_case_creation(ccd_base_url, uid, jid, ctid, etid, idam_token, s2s_token) - print("Started case creation = {start_response}") + print(f"Started case creation = {_compact(start_response)}") if start_response is None or start_response.status_code != 200: if start_response is not None: @@ -218,10 +216,10 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): validate_case_response = validate_case(ccd_base_url, event_token, payloadData, jid, ctid, idam_token, uid, s2s_token) try: - print(f"Validation response for case {caseNo}: {json.dumps(validate_case_response.json(), indent=2)}") + print(f"Validation response for case {caseNo}: {_compact(validate_case_response.json())}") except Exception: try: - print(validate_case_response.text) + print(_compact(validate_case_response.text)) except Exception: print(f"Unable to parse validate_case_response for case {caseNo}") @@ -254,10 +252,10 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): submit_case_response = submit_case(ccd_base_url, 
event_token, payloadData, jid, ctid, idam_token, uid, s2s_token) try: - print(f"Submit response for case {caseNo}: {json.dumps(submit_case_response.json(), indent=2)}") + print(f"Submit response for case {caseNo}: {_compact(submit_case_response.json())}") except Exception: try: - print(submit_case_response.text) + print(_compact(submit_case_response.text)) except Exception: print(f"Unable to parse submit_case_response for case {caseNo}") diff --git a/AzureFunctions/ACTIVE/active_ccd/function_app.py b/AzureFunctions/ACTIVE/active_ccd/function_app.py index a039a1c7f..c7e4da0fb 100644 --- a/AzureFunctions/ACTIVE/active_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_ccd/function_app.py @@ -4,6 +4,8 @@ import json import os +from tenacity import AsyncRetrying, retry_if_result, stop_after_attempt, wait_exponential + from azure.storage.blob.aio import BlobServiceClient from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData @@ -35,6 +37,23 @@ app = func.FunctionApp() +def _log_retry(retry_state): + result = retry_state.outcome.result() if retry_state.outcome else {} + error = result.get("Error", "") if isinstance(result, dict) else "" + logger.warning( + f"Retrying process_case — attempt {retry_state.attempt_number} failed " + f"(sleeping {retry_state.next_action.sleep:.0f}s): {error}" + ) + + +def _is_retryable(result): + RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} + if not (isinstance(result, dict) and result.get("Status") == "ERROR"): + return False + error = result.get("Error", "") + return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + + @app.function_name("eventhub_trigger") @app.event_hub_message_trigger( arg_name="azeventhub", @@ -98,7 +117,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): continue # Process the file - result = await asyncio.to_thread(process_case, ENV, caseNo, data, run_id, state, PR_REFERENCE) + async for attempt in AsyncRetrying( + 
retry=retry_if_result(_is_retryable), + stop=stop_after_attempt(3), + wait=wait_exponential(min=30, max=60), + before_sleep=_log_retry, + retry_error_callback=lambda retry_state: retry_state.outcome.result(), + ): + with attempt: + result = await asyncio.to_thread(process_case, ENV, caseNo, data, run_id, state, PR_REFERENCE) + attempt.retry_state.set_result(result) result["StartDateTime"] = start_datetime # Mark processed if success diff --git a/AzureFunctions/ACTIVE/active_ccd/requirements.txt b/AzureFunctions/ACTIVE/active_ccd/requirements.txt index 52a4cb69c..2e5828cba 100644 --- a/AzureFunctions/ACTIVE/active_ccd/requirements.txt +++ b/AzureFunctions/ACTIVE/active_ccd/requirements.txt @@ -29,7 +29,7 @@ pycparser==2.22 PyJWT==2.10.1 requests==2.32.3 six==1.17.0 -tenacity==9.1.2 +tenacity==9.1.4 typing_extensions==4.13.2 urllib3==2.4.0 Werkzeug==3.1.3 diff --git a/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py b/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py deleted file mode 100644 index 40dd06bde..000000000 --- a/AzureFunctions/ACTIVE/active_ccd/retry_decorator.py +++ /dev/null @@ -1,70 +0,0 @@ -import time -import random -from functools import wraps - - -def retry_on_result( - max_retries: int = 3, - base_delay: float = 1.0, - max_delay: float = 60.0, - jitter: bool = True, - retry_on=None, -): - """ - Retry decorator with exponential backoff for sync functions intended - for use with asyncio.to_thread. Retries only when retry_on - predicate returns True — exceptions are not caught. - - Args: - max_retries: Number of retry attempts after the first failure. - base_delay: Initial delay in seconds (doubles each attempt). - max_delay: Upper cap on delay in seconds. - jitter: Randomise delay to 50-100% of computed value. - retry_on: Optional callable(result) -> bool. If provided, a - return value for which this returns True is treated as - a retryable failure (the result is returned as-is - after all attempts are exhausted). 
- - Usage: - @retry_on_result(max_retries=3, retry_on=lambda r: r.get("Status") == "ERROR") - def my_blocking_call(): - ... - - result = await asyncio.to_thread(my_blocking_call) - """ - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - last_result = None - - for attempt in range(max_retries + 1): - result = func(*args, **kwargs) - - if retry_on is not None and retry_on(result): - last_result = result - - if attempt == max_retries: - print( - f"{func.__name__} returned a retryable result after " - f"{max_retries + 1} attempts. Returning last result." - ) - return last_result - - delay = min(base_delay * (2 ** attempt), max_delay) - if jitter: - delay *= 0.5 + random.random() * 0.5 - - print( - f"{func.__name__} returned a retryable result " - f"(attempt {attempt + 1}/{max_retries + 1}). " - f"Retrying in {delay:.2f}s..." - ) - time.sleep(delay) - continue - - return result - - return last_result - - return wrapper - return decorator diff --git a/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py index ea94ac228..2c4a3a76e 100644 --- a/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py +++ b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py @@ -3,10 +3,15 @@ from azure.storage.blob import BlobServiceClient from datetime import datetime, timezone from urllib.parse import urlparse -try: - from .retry_decorator import retry_on_result -except ImportError: - from retry_decorator import retry_on_result + + +def _compact(value) -> str: + if isinstance(value, (dict, list)): + text = json.dumps(value, indent=2) + else: + text = str(value) + return text.replace("\r\n", "\\n").replace("\r", "\\n").replace("\n", "\\n") + # tokenManager lives in the same package. When this module is imported by the # Functions host the package root will be `AzureFunctions.ACTIVE.active_cdam`. 
@@ -45,11 +50,11 @@ def upload_document(cdam_base_url, jid, ctid, cid, file_name, doc_binary, conten ("files", (file_name, doc_binary, content_type)) ] - print(f"🔢 Uploading document for CaseNo {cid}: upload_document_url = {upload_document_url}, headers = {headers}, body = {body}\n") + print(f"🔢 Uploading document for CaseNo {cid}: upload_document_url = {upload_document_url}, headers = {_compact(headers)}, body = {_compact(body)}") response = requests.post(upload_document_url, headers=headers, data=body, files=files) - print(f"🔢 Upload document response status for {cid}: {response.status_code}:{response.text}\n") + print(f"🔢 Upload document response status for {cid}: {response.status_code}:{_compact(response.text)}") return response except Exception as e: @@ -57,12 +62,6 @@ def upload_document(cdam_base_url, jid, ctid, cid, file_name, doc_binary, conten return None -@retry_on_result( - max_retries=2, - base_delay=30, - max_delay=60, - retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR", -) def process_event(env, caseNo, runId, file_name, file_url, file_content_type, storage_credential): print(f"Starting document upload for {caseNo} for {file_name} using file path {file_url} with content type {file_content_type}") @@ -143,10 +142,10 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st upload_document_response = upload_document(cdam_base_url, jid, ctid, caseNo, file_name, file_binary_in_bytes, file_content_type, idam_token, s2s_token) try: - print(f"CDAM upload for case {caseNo}: {json.dumps(upload_document_response.json(), indent=2)}") + print(f"CDAM upload for case {caseNo}: {_compact(upload_document_response.json())}") except Exception: try: - print(upload_document_response.text) + print(_compact(upload_document_response.text)) except Exception: print(f"Unable to parse upload_document_response for case {caseNo}") diff --git a/AzureFunctions/ACTIVE/active_cdam/function_app.py 
b/AzureFunctions/ACTIVE/active_cdam/function_app.py index d763e1de3..b87ab435f 100644 --- a/AzureFunctions/ACTIVE/active_cdam/function_app.py +++ b/AzureFunctions/ACTIVE/active_cdam/function_app.py @@ -4,6 +4,8 @@ import json import os +from tenacity import AsyncRetrying, retry_if_result, stop_after_attempt, wait_exponential + from azure.core.exceptions import ResourceExistsError from azure.storage.blob.aio import BlobServiceClient from azure.eventhub.aio import EventHubProducerClient @@ -32,6 +34,23 @@ app = func.FunctionApp() +def _log_retry(retry_state): + result = retry_state.outcome.result() if retry_state.outcome else {} + error = result.get("Error", "") if isinstance(result, dict) else "" + logger.warning( + f"Retrying process_event — attempt {retry_state.attempt_number} failed " + f"(sleeping {retry_state.next_action.sleep:.0f}s): {error}" + ) + + +def _is_retryable(result): + RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} + if not (isinstance(result, dict) and result.get("Status") == "ERROR"): + return False + error = result.get("Error", "") + return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + + @app.function_name("eventhub_trigger") @app.event_hub_message_trigger( arg_name="azeventhub", @@ -103,7 +122,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): logger.warning(f"[IDEMPOTENCY][CDAM] Skipping in progress case {caseNo}.") continue - result = await asyncio.to_thread(process_event, ENV, caseNo, run_id, file_name, file_url, file_content_type, storage_credential) + async for attempt in AsyncRetrying( + retry=retry_if_result(_is_retryable), + stop=stop_after_attempt(3), + wait=wait_exponential(min=30, max=60), + before_sleep=_log_retry, + retry_error_callback=lambda retry_state: retry_state.outcome.result(), + ): + with attempt: + result = await asyncio.to_thread(process_event, ENV, caseNo, run_id, file_name, file_url, file_content_type, storage_credential) + 
attempt.retry_state.set_result(result) # Mark processed if success if result.get("Status") == "SUCCESS": diff --git a/AzureFunctions/ACTIVE/active_cdam/requirements.txt b/AzureFunctions/ACTIVE/active_cdam/requirements.txt index 52a4cb69c..2e5828cba 100644 --- a/AzureFunctions/ACTIVE/active_cdam/requirements.txt +++ b/AzureFunctions/ACTIVE/active_cdam/requirements.txt @@ -29,7 +29,7 @@ pycparser==2.22 PyJWT==2.10.1 requests==2.32.3 six==1.17.0 -tenacity==9.1.2 +tenacity==9.1.4 typing_extensions==4.13.2 urllib3==2.4.0 Werkzeug==3.1.3 diff --git a/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py b/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py deleted file mode 100644 index 40dd06bde..000000000 --- a/AzureFunctions/ACTIVE/active_cdam/retry_decorator.py +++ /dev/null @@ -1,70 +0,0 @@ -import time -import random -from functools import wraps - - -def retry_on_result( - max_retries: int = 3, - base_delay: float = 1.0, - max_delay: float = 60.0, - jitter: bool = True, - retry_on=None, -): - """ - Retry decorator with exponential backoff for sync functions intended - for use with asyncio.to_thread. Retries only when retry_on - predicate returns True — exceptions are not caught. - - Args: - max_retries: Number of retry attempts after the first failure. - base_delay: Initial delay in seconds (doubles each attempt). - max_delay: Upper cap on delay in seconds. - jitter: Randomise delay to 50-100% of computed value. - retry_on: Optional callable(result) -> bool. If provided, a - return value for which this returns True is treated as - a retryable failure (the result is returned as-is - after all attempts are exhausted). - - Usage: - @retry_on_result(max_retries=3, retry_on=lambda r: r.get("Status") == "ERROR") - def my_blocking_call(): - ... 
- - result = await asyncio.to_thread(my_blocking_call) - """ - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - last_result = None - - for attempt in range(max_retries + 1): - result = func(*args, **kwargs) - - if retry_on is not None and retry_on(result): - last_result = result - - if attempt == max_retries: - print( - f"{func.__name__} returned a retryable result after " - f"{max_retries + 1} attempts. Returning last result." - ) - return last_result - - delay = min(base_delay * (2 ** attempt), max_delay) - if jitter: - delay *= 0.5 + random.random() * 0.5 - - print( - f"{func.__name__} returned a retryable result " - f"(attempt {attempt + 1}/{max_retries + 1}). " - f"Retrying in {delay:.2f}s..." - ) - time.sleep(delay) - continue - - return result - - return last_result - - return wrapper - return decorator diff --git a/ci_cd_templates/unit_tests/active/caseUnderReview.yml b/ci_cd_templates/unit_tests/active/caseUnderReview.yml new file mode 100644 index 000000000..e178e3262 --- /dev/null +++ b/ci_cd_templates/unit_tests/active/caseUnderReview.yml @@ -0,0 +1,17 @@ +steps: + - task: UsePythonVersion@0 + displayName: 'Use Python 3.x' + inputs: + versionSpec: '3.x' + + - script: | + pip install uv + uv pip install -r $(Build.SourcesDirectory)/tests/requirements.txt --system + displayName: 'Install dependencies' + + - script: | + echo "Running case under review tests" + pytest $(Build.SourcesDirectory)/tests/active/caseUnderReview \ + --junitxml=$(Build.ArtifactStagingDirectory)/pytest-results.xml -v + displayName: 'Run case under review unit tests' + \ No newline at end of file diff --git a/ci_cd_templates/unit_tests/active/reasonForAppealSubmitted.yml b/ci_cd_templates/unit_tests/active/reasonForAppealSubmitted.yml new file mode 100644 index 000000000..440f5c9f5 --- /dev/null +++ b/ci_cd_templates/unit_tests/active/reasonForAppealSubmitted.yml @@ -0,0 +1,17 @@ +steps: + - task: UsePythonVersion@0 + displayName: 'Use Python 3.x' + inputs: + 
versionSpec: '3.x' + + - script: | + pip install uv + uv pip install -r $(Build.SourcesDirectory)/tests/requirements.txt --system + displayName: 'Install dependencies' + + - script: | + echo "Running reason for appeal submitted unit tests" + pytest $(Build.SourcesDirectory)/tests/active/reasonForAppealSubmitted \ + --junitxml=$(Build.ArtifactStagingDirectory)/pytest-results.xml -v + displayName: 'Run reason for appeal submitted state pytest unit tests' + \ No newline at end of file diff --git a/ci_cd_templates/unit_tests/caseLinkFunctionApp.yml b/ci_cd_templates/unit_tests/caseLinkFunctionApp.yml new file mode 100644 index 000000000..57aa7cbf4 --- /dev/null +++ b/ci_cd_templates/unit_tests/caseLinkFunctionApp.yml @@ -0,0 +1,22 @@ +parameters: + segment: 'caseLinkFunctionApp' + +steps: + - task: UsePythonVersion@0 + displayName: 'Use Python 3.x' + inputs: + versionSpec: '3.x' + + - script: | + pip install uv + uv pip install -r $(Build.SourcesDirectory)/tests/requirements.txt --system + displayName: 'Install dependencies' + + - script: | + echo "Running ${{ parameters.segment }} unit tests" + export ENVIRONMENT=sbox + export LZ_KEY=00 + export PR_NUMBER=$(System.PullRequest.PullRequestId) + export PYTHONPATH=$PYTHONPATH:$(Build.SourcesDirectory) + pytest $(Build.SourcesDirectory)/tests/active/${{ parameters.segment }} + displayName: 'Run pytest ${{ parameters.segment }} unit tests' \ No newline at end of file diff --git a/ci_cd_templates/unit_tests/cdamFunctionApp.yml b/ci_cd_templates/unit_tests/cdamFunctionApp.yml new file mode 100644 index 000000000..fc3fd7a39 --- /dev/null +++ b/ci_cd_templates/unit_tests/cdamFunctionApp.yml @@ -0,0 +1,22 @@ +parameters: + segment: 'cdamFunctionApp' + +steps: + - task: UsePythonVersion@0 + displayName: 'Use Python 3.x' + inputs: + versionSpec: '3.x' + + - script: | + pip install uv + uv pip install -r $(Build.SourcesDirectory)/tests/requirements.txt --system + displayName: 'Install dependencies' + + - script: | + echo "Running 
${{ parameters.segment }} unit tests" + export ENVIRONMENT=sbox + export LZ_KEY=00 + export PR_NUMBER=$(System.PullRequest.PullRequestId) + export PYTHONPATH=$PYTHONPATH:$(Build.SourcesDirectory) + pytest $(Build.SourcesDirectory)/tests/active/${{ parameters.segment }} + displayName: 'Run pytest ${{ parameters.segment }} unit tests' \ No newline at end of file diff --git a/pr-pipeline.yml b/pr-pipeline.yml index df58fc411..f1126d0af 100644 --- a/pr-pipeline.yml +++ b/pr-pipeline.yml @@ -17,6 +17,16 @@ stages: steps: - template: ci_cd_templates/unit_tests/functionapp.yml + - job: CdamFunctionAppTests + displayName: 'Run CDAM Function App Tests' + steps: + - template: ci_cd_templates/unit_tests/cdamFunctionApp.yml + + - job: CaseLinkFunctionAppTests + displayName: 'Run Case Link Function App Tests' + steps: + - template: ci_cd_templates/unit_tests/caseLinkFunctionApp.yml + - job: PaymentPendingTests displayName: 'Run Payment Pending Tests' steps: @@ -28,15 +38,25 @@ stages: - template: ci_cd_templates/unit_tests/active/appealSubmitted.yml - job: AwaitingRespondentEvidenceATests - displayName: 'Run AwaitingRespondentEvidenceATests Tests' + displayName: 'Run AwaitingRespondentEvidenceA Tests' steps: - template: ci_cd_templates/unit_tests/active/awaitingRespondentEvidenceA.yml - job: AwaitingRespondentEvidenceBTests - displayName: 'Run AwaitingRespondentEvidenceBTests Tests' + displayName: 'Run AwaitingRespondentEvidenceB Tests' steps: - template: ci_cd_templates/unit_tests/active/awaitingRespondentEvidenceB.yml + - job: CaseUnderReviewTests + displayName: 'Run CaseUnderReview Tests' + steps: + - template: ci_cd_templates/unit_tests/active/caseUnderReview.yml + + - job: ReasonForAppealSubmittedTests + displayName: 'Run ReasonForAppealSubmitted Tests' + steps: + - template: ci_cd_templates/unit_tests/active/reasonForAppealSubmitted.yml + - job: ListingTests displayName: 'Run Listing Tests' steps: diff --git a/tests/active/caseLinkFunctionApp/cl_ccdFunctions_test.py 
b/tests/active/caseLinkFunctionApp/cl_ccdFunctions_test.py index e3714553e..095cc7d7c 100644 --- a/tests/active/caseLinkFunctionApp/cl_ccdFunctions_test.py +++ b/tests/active/caseLinkFunctionApp/cl_ccdFunctions_test.py @@ -1,6 +1,6 @@ import json import pytest -from unittest.mock import patch, MagicMock, ANY +from unittest.mock import AsyncMock, patch, MagicMock, ANY # Patch Azure SDK clients before the module-level IDAMTokenManager(env="sbox") # instantiation runs, so that importing cl_ccdFunctions does not hit Key Vault. @@ -276,15 +276,8 @@ def test_submit_case_event_network_error(mock_post): ) MODULE = "AzureFunctions.ACTIVE.active_caselink_ccd.cl_ccdFunctions" -SLEEP_PATH = "AzureFunctions.ACTIVE.active_caselink_ccd.retry_decorator.time.sleep" -@pytest.fixture(autouse=True) -def no_retry_sleep(): - """Suppress retry backoff sleeps in all process_event tests.""" - with patch(SLEEP_PATH): - yield - @pytest.fixture def mock_token_managers_cl(): diff --git a/tests/active/caseLinkFunctionApp/cl_functionApp_test.py b/tests/active/caseLinkFunctionApp/cl_functionApp_test.py index 66563d7e0..03f682ab0 100644 --- a/tests/active/caseLinkFunctionApp/cl_functionApp_test.py +++ b/tests/active/caseLinkFunctionApp/cl_functionApp_test.py @@ -116,7 +116,7 @@ def patched(mocks, to_thread_mock=None, extra_patches=None): ), ] if to_thread_mock is not None: - patches.append(patch("asyncio.to_thread", new=to_thread_mock)) + patches.append(patch("AzureFunctions.ACTIVE.active_caselink_ccd.function_app.process_event", new=to_thread_mock)) if extra_patches: patches.extend(extra_patches) return patches @@ -149,7 +149,7 @@ def test_single_event_success_sends_final_batch(): payload = {"RunID": "run-001", "CaseLinkPayload": []} events = [make_mock_event(payload)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): 
run(eventhub_trigger_active(events)) @@ -166,7 +166,7 @@ def test_cleanup_always_called_on_success(): mocks = setup_mocks(batch_len=1) events = [make_mock_event({"RunID": "r1", "CaseLinkPayload": []})] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -180,7 +180,7 @@ def test_cleanup_called_even_when_individual_event_errors(): mocks = setup_mocks(batch_len=0) events = [make_mock_event({"RunID": "r1", "CaseLinkPayload": []})] - to_thread = AsyncMock(side_effect=Exception("processing failed")) + to_thread = MagicMock(side_effect=Exception("processing failed")) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -223,13 +223,12 @@ def test_process_event_called_with_correct_args(): partition_key = "9876543210987654" events = [make_mock_event(payload, partition_key=partition_key)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) - to_thread.assert_awaited_once_with( - app_module.process_event, + to_thread.assert_called_once_with( app_module.ENV, partition_key, payload["RunID"], @@ -244,7 +243,7 @@ def test_skip_result_is_not_added_to_batch(): mocks = setup_mocks(batch_len=0) events = [make_mock_event({"RunID": "run-skip", "CaseLinkPayload": []})] - to_thread = AsyncMock(return_value=dict(PROCESS_SKIP_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SKIP_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -262,13 +261,12 @@ def test_overwrite_flag_passed_from_payload(): partition_key = "1234567890123456" events = [make_mock_event(payload, 
partition_key=partition_key)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) - to_thread.assert_awaited_once_with( - app_module.process_event, + to_thread.assert_called_once_with( app_module.ENV, partition_key, payload["RunID"], @@ -296,7 +294,7 @@ def test_multiple_events_each_result_added_to_batch(): for i in range(3) ] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -322,7 +320,7 @@ def test_batch_overflow_flushes_old_batch_and_creates_new_one(): payload = {"RunID": "run-overflow", "CaseLinkPayload": []} events = [make_mock_event(payload)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -339,7 +337,7 @@ def test_individual_event_error_does_not_stop_other_events(): call_count = 0 - async def side_effect(*_): + def side_effect(*_): nonlocal call_count call_count += 1 if call_count == 1: @@ -364,7 +362,7 @@ def test_error_result_deletes_idempotency_blob_and_sends_to_batch(): mocks = setup_mocks(batch_len=1) events = [make_mock_event({"RunID": "run-err", "CaseLinkPayload": []})] - to_thread = AsyncMock(return_value=dict(PROCESS_ERROR_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_ERROR_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -400,7 +398,7 @@ def test_idempotency_blob_uploaded_atomically_on_success_and_kept(): mocks = setup_mocks(batch_len=1) events = [make_mock_event({"RunID": "run-chk", "CaseLinkPayload": 
[]}, partition_key="REF-CHK")] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -415,7 +413,7 @@ def test_idempotency_blob_not_deleted_when_process_event_raises(): mocks = setup_mocks(batch_len=0) events = [make_mock_event({"RunID": "run-exc", "CaseLinkPayload": []})] - to_thread = AsyncMock(side_effect=Exception("processing crashed")) + to_thread = MagicMock(side_effect=Exception("processing crashed")) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -431,7 +429,7 @@ def test_idempotency_blob_path_includes_ccd_reference(): partition_key = "9999000011112222" events = [make_mock_event({"RunID": "run-path", "CaseLinkPayload": []}, partition_key=partition_key)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -463,7 +461,7 @@ def test_error_result_sent_even_when_delete_blob_raises(): mocks["idempotency_blob"].delete_blob.side_effect = Exception("Storage error") events = [make_mock_event({"RunID": "run-del-fail", "CaseLinkPayload": []})] - to_thread = AsyncMock(return_value=dict(PROCESS_ERROR_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_ERROR_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) diff --git a/tests/active/caseLinkFunctionApp/cl_retry_decorator_test.py b/tests/active/caseLinkFunctionApp/cl_retry_decorator_test.py deleted file mode 100644 index d1877d8ff..000000000 --- a/tests/active/caseLinkFunctionApp/cl_retry_decorator_test.py +++ /dev/null @@ -1,198 +0,0 @@ -import pytest -from unittest.mock import patch - -from 
AzureFunctions.ACTIVE.active_caselink_ccd.retry_decorator import retry_on_result - -SLEEP_PATH = "AzureFunctions.ACTIVE.active_caselink_ccd.retry_decorator.time.sleep" - -RETRYABLE = lambda r: isinstance(r, dict) and r.get("Status") == "ERROR" - - -def make_results_fn(*results): - """Return a function that yields successive return values.""" - values = list(results) - - def fn(*args, **kwargs): - fn.call_count += 1 - return values.pop(0) if values else None - - fn.call_count = 0 - fn.__name__ = "fn" - fn.__doc__ = None - return fn - - -# --------------------------------------------------------------------------- -# Basic behaviour -# --------------------------------------------------------------------------- - -def test_success_on_first_attempt_no_sleep(): - """Function returns a non-retryable result first try — no retries, no sleep.""" - fn = make_results_fn("ok") - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == "ok" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_no_retry_when_retry_on_is_none(): - """With retry_on=None, an ERROR result is returned immediately without retrying.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=3, retry_on=None)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == {"Status": "ERROR"} - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_retries_on_error_result_then_succeeds(): - """Function returns ERROR twice then SUCCESS; called 3 times total.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "ERROR"}, {"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result == {"Status": "SUCCESS"} - assert fn.call_count == 3 - - -def 
test_returns_last_error_result_after_all_retries_exhausted(): - """After max_retries+1 attempts all returning ERROR, the last result is returned.""" - fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(3)]) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 3 - - -def test_total_call_count_equals_max_retries_plus_one(): - """With max_retries=N, function is called N+1 times total.""" - fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(5)]) - decorated = retry_on_result(max_retries=4, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - decorated() - - assert fn.call_count == 5 - - -def test_zero_retries_returns_error_result_immediately(): - """max_retries=0 means no retries; ERROR result returned after one attempt.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=0, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_exceptions_are_not_caught(): - """Exceptions raised inside the wrapped function propagate immediately.""" - def fn(): - raise ValueError("boom") - - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH): - with pytest.raises(ValueError, match="boom"): - decorated() - - -# --------------------------------------------------------------------------- -# Sleep / delay behaviour -# --------------------------------------------------------------------------- - -def test_no_sleep_after_final_failed_attempt(): - """sleep() is not called after the last failed attempt.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 3) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - 
decorated() - - # 3 attempts → sleep after attempt 0 and 1 only - assert mock_sleep.call_count == 2 - - -def test_exponential_backoff_delay_without_jitter(): - """Delay doubles each attempt: base, 2*base, 4*base...""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 4) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, max_delay=999, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delays = [c.args[0] for c in mock_sleep.call_args_list] - assert actual_delays == [1.0, 2.0, 4.0] - - -def test_delay_capped_at_max_delay(): - """Delay never exceeds max_delay.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 6) - decorated = retry_on_result(max_retries=5, retry_on=RETRYABLE, base_delay=10.0, max_delay=15.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - for c in mock_sleep.call_args_list: - assert c.args[0] <= 15.0 - - -def test_jitter_delay_within_expected_range(): - """With jitter=True, sleep delay is between 50% and 100% of computed value.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=1, retry_on=RETRYABLE, base_delay=4.0, max_delay=999, jitter=True)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delay = mock_sleep.call_args_list[0].args[0] - # base_delay * 2^0 = 4.0; jitter range: [2.0, 4.0] - assert 2.0 <= actual_delay <= 4.0 - - -# --------------------------------------------------------------------------- -# Argument forwarding & metadata -# --------------------------------------------------------------------------- - -def test_passes_args_and_kwargs_to_wrapped_function(): - """Positional and keyword arguments are forwarded correctly.""" - received = {} - - def fn(*args, **kwargs): - received["args"] = args - received["kwargs"] = kwargs - return "result" - - decorated = retry_on_result()(fn) - - with patch(SLEEP_PATH): - result = decorated(1, 2, key="val") - - assert result == 
"result" - assert received["args"] == (1, 2) - assert received["kwargs"] == {"key": "val"} - - -def test_preserves_function_name_and_docstring(): - """@wraps correctly preserves __name__ and __doc__.""" - @retry_on_result() - def my_func(): - """My docstring.""" - - assert my_func.__name__ == "my_func" - assert my_func.__doc__ == "My docstring." diff --git a/tests/active/caseUnderReview/cur_hearingResponse_test.py b/tests/active/caseUnderReview/cur_hearingResponse_test.py index 28fc08d4c..136d436b2 100644 --- a/tests/active/caseUnderReview/cur_hearingResponse_test.py +++ b/tests/active/caseUnderReview/cur_hearingResponse_test.py @@ -28,8 +28,8 @@ class TestCaseUnderReviewHearingResponse: StructField("StatusId", IntegerType()), StructField("CaseStatus", IntegerType()), StructField("Outcome", IntegerType()), - StructField("ListedCentre", StringType()), - StructField("KeyDate", StringType()), + StructField("HearingCentre", StringType()), + StructField("HearingDate", StringType()), StructField("HearingType", StringType()), StructField("CourtName", StringType()), StructField("ListType", StringType()), @@ -76,17 +76,17 @@ def test_additionalInstructionsTribunalResponse(self, spark): "List", "10:00", "10", "Notes", None, None, None, None, None, None, None, None, None, None, None, None ), - ( # CaseStatus 37 and StatusId 1 - skipped for condition CaseStatus in 37 or 38 and max StatusId + ( # CaseStatus 37 and StatusId 1 - skipped by window function (not max StatusId for CaseNo 2) "2", 1, 37, 1, "HearingCentre", "2000-01-01", "Type", "Court", "List", "10:00", "10", "Notes", None, None, None, None, None, None, None, None, None, None, None, None ), - ( # CaseStatus 38 and StatusId 2 with no judicial details - valid replaces above + ( # CaseStatus 38 and StatusId 2 - selected by window, but produces NULL response (only CaseStatus 26 generates output) "2", 2, 38, 1, "HearingCentre2", "2010-01-01", "Type2", "Court2", "List2", "11:00", "20", "Notes2", None, None, None, None, 
None, None, None, None, None, None, None, None ), - ( # CaseStatus 37 with Judicial details - valid + ( # CaseStatus 37 with judicial details - selected by window, but produces NULL response (only CaseStatus 26 generates output) "3", 1, 37, 0, "HearingCentre", "2000-01-01", "Type", "Court", "List", "10:00", "10", "Notes", "Judge1Last", "Judge1First", "Mr", @@ -94,7 +94,7 @@ def test_additionalInstructionsTribunalResponse(self, spark): "Judge3Last", "Judge3First", "Ms", "ClerkLast", "ClerkFirst", "Miss" ), - ( # CaseStatus 39 - skipped for above case as not in CaseStatus 37 or 38 + ( # CaseStatus 39 - skipped by df_stg filter (not in CaseStatus 26, 37, or 38) "3", 2, 39, 0, "HearingCentre2", "2010-01-01", "Type2", "Court2", "Lis2t", "11:00", "20", "Notes2", "Judge1Last2", "Judge1First2", "Mr2", @@ -154,36 +154,8 @@ def test_additionalInstructionsTribunalResponse(self, spark): Required/Incompatible Judicial Officers: Notes: Notes\ """).strip() - assert resultList[1][0] == dedent("""\ - Listed details from ARIA: - Hearing Centre: HearingCentre2 - Hearing Date: 2010-01-01 - Hearing Type: Type2 - Court: Court2 - List Type: List2 - List Start Time: 11:00 - Judge First Tier: - Court Clerk / Usher: N/A - Start Time: 11:00 - Estimated Duration: 20 - Required/Incompatible Judicial Officers: - Notes: Notes2 - """).strip() - assert resultList[2][0] == dedent("""\ - Listed details from ARIA: - Hearing Centre: HearingCentre - Hearing Date: 2000-01-01 - Hearing Type: Type - Court: Court - List Type: List - List Start Time: 10:00 - Judge First Tier: Judge1Last Judge1First (Mr) Judge2Last Judge2First (Mrs) Judge3Last Judge3First (Ms) - Court Clerk / Usher: ClerkLast ClerkFirst (Miss) - Start Time: 10:00 - Estimated Duration: 10 - Required/Incompatible Judicial Officers: - Notes: Notes\ - """).strip() + assert resultList[1][0] is None # CaseNo 2 - CaseStatus 38, only CaseStatus 26 generates a response + assert resultList[2][0] is None # CaseNo 3 - CaseStatus 37, only CaseStatus 26 
generates a response assert resultList[3][0] == dedent("""\ Listed details from ARIA: Hearing Centre: N/A @@ -233,4 +205,4 @@ def test_additionalInstructionsTribunalResponse(self, spark): JudgeLastName3 JudgeFirstName3 ( Judge3 ) : Not Required Notes: Notes\ """).strip() - assert len(resultList) == 6 # 2 non-matching conditions of the 8 cases provided, leaves 6. + assert len(resultList) == 6 # 2 excluded (case 5: Outcome != 0; case 8: AIP representation); 6 remain, 2 with NULL response (CaseNo 2, 3: CaseStatus 37/38) diff --git a/tests/active/cdamFunctionApp/__init__.py b/tests/active/cdamFunctionApp/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/tests/active/cdamFunctionApp/cdam_cdamFunctions_test.py b/tests/active/cdamFunctionApp/cdam_cdamFunctions_test.py index acb2dfdba..b37257c8a 100644 --- a/tests/active/cdamFunctionApp/cdam_cdamFunctions_test.py +++ b/tests/active/cdamFunctionApp/cdam_cdamFunctions_test.py @@ -1,5 +1,5 @@ import pytest -from unittest.mock import patch, MagicMock +from unittest.mock import AsyncMock, patch, MagicMock # Patch Azure SDK clients before the module-level IDAMTokenManager(env="sbox") # instantiation runs, so that importing cdamFunctions does not hit Key Vault. 
@@ -25,7 +25,6 @@ def mock_response(status_code, json_data=None, text=""): MODULE = "AzureFunctions.ACTIVE.active_cdam.cdamFunctions" -SLEEP_PATH = "AzureFunctions.ACTIVE.active_cdam.retry_decorator.time.sleep" UPLOAD_COMMON = dict( cdam_base_url="http://ccd-case-document-am-api-aat.service.core-compute-aat.internal", @@ -143,12 +142,6 @@ def test_upload_document_returns_non_201_response(mock_post): # process_event fixtures # --------------------------------------------------------------------------- -@pytest.fixture(autouse=True) -def no_retry_sleep(): - """Suppress retry backoff sleeps in all process_event tests.""" - with patch(SLEEP_PATH): - yield - @pytest.fixture def mock_token_managers(): diff --git a/tests/active/cdamFunctionApp/cdam_function_app_test.py b/tests/active/cdamFunctionApp/cdam_function_app_test.py index 39ee72763..319eb43f9 100644 --- a/tests/active/cdamFunctionApp/cdam_function_app_test.py +++ b/tests/active/cdamFunctionApp/cdam_function_app_test.py @@ -107,7 +107,7 @@ def patched(mocks, to_thread_mock=None): patch(f"{MODULE}.ClientSecretCredential", return_value=mocks["storage_credential"]), ] if to_thread_mock is not None: - patches.append(patch("asyncio.to_thread", new=to_thread_mock)) + patches.append(patch(f"{MODULE}.process_event", new=to_thread_mock)) return patches @@ -129,7 +129,7 @@ def test_idempotency_blob_uploaded_atomically_before_processing(): """upload_blob(b"", overwrite=False) is called before process_event runs.""" mocks = setup_mocks(batch_len=1) events = [make_mock_event()] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -142,7 +142,7 @@ def test_idempotency_blob_path_contains_case_number_and_idempotency(): mocks = setup_mocks(batch_len=1) case_no = "9999000011112222" events = [make_mock_event(case_no=case_no)] - to_thread = 
AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -155,7 +155,7 @@ def test_idempotency_blob_path_contains_case_number_and_idempotency(): def test_idempotency_container_is_af_idempotency(): """The BlobServiceClient is asked for the 'af-idempotency' container.""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -172,12 +172,12 @@ def test_in_progress_event_skipped_when_idempotency_blob_exists(): mocks = setup_mocks(batch_len=0) mocks["idempotency_blob"].upload_blob.side_effect = ResourceExistsError() events = [make_mock_event()] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) - to_thread.assert_not_awaited() + to_thread.assert_not_called() mocks["batch"].add.assert_not_called() mocks["producer"].send_batch.assert_not_called() @@ -195,12 +195,12 @@ def test_duplicate_skip_continues_to_next_event(): ] events = [make_mock_event(case_no="1111111111111111"), make_mock_event(case_no="2222222222222222")] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) - to_thread.assert_awaited_once() + to_thread.assert_called_once() # --------------------------------------------------------------------------- @@ -210,7 +210,7 @@ def test_duplicate_skip_continues_to_next_event(): def 
test_idempotency_blob_not_deleted_on_success(): """SUCCESS result: idempotency blob is kept to prevent future duplicate runs.""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -221,7 +221,7 @@ def test_idempotency_blob_not_deleted_on_success(): def test_idempotency_blob_deleted_on_error_result(): """ERROR result: idempotency blob is deleted so the event can be retried.""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_ERROR_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_ERROR_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -247,7 +247,7 @@ def test_idempotency_blob_not_touched_when_no_events(): def test_single_event_success_sends_final_batch(): """Happy path: one event processed, result batch sent.""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -265,13 +265,12 @@ def test_process_event_called_with_correct_args(): file_content_type = "application/pdf" events = [make_mock_event(case_no=case_no, run_id=run_id, file_name=file_name, file_url=file_url, file_content_type=file_content_type)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) - to_thread.assert_awaited_once_with( - app_module.process_event, + to_thread.assert_called_once_with( app_module.ENV, case_no, run_id, @@ 
-307,7 +306,7 @@ def test_producer_created_from_kv_secret_value(): def test_error_result_is_still_added_to_batch(): """ERROR results are sent to the results Event Hub (so downstream can handle them).""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_ERROR_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_ERROR_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -330,7 +329,7 @@ def test_multiple_events_all_results_added_to_batch(): """Three events → three add() calls and one final send_batch.""" mocks = setup_mocks(batch_len=3) events = [make_mock_event(case_no=f"REF{i:016d}") for i in range(3)] - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active(events)) @@ -349,7 +348,7 @@ def test_batch_overflow_flushes_old_batch_and_creates_new_one(): mocks["batch"].add.side_effect = ValueError("Batch is full") mocks["producer"].create_batch = AsyncMock(side_effect=[mocks["batch"], second_batch]) - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -364,7 +363,7 @@ def test_individual_event_error_does_not_stop_other_events(): mocks = setup_mocks(batch_len=1) call_count = 0 - async def side_effect(*_): + def side_effect(*_): nonlocal call_count call_count += 1 if call_count == 1: @@ -383,7 +382,7 @@ async def side_effect(*_): def test_cleanup_always_called_on_success(): """kv_client.close and credential.close are awaited in the finally block.""" mocks = setup_mocks(batch_len=1) - to_thread = AsyncMock(return_value=dict(PROCESS_SUCCESS_RESULT)) + to_thread = 
MagicMock(return_value=dict(PROCESS_SUCCESS_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -395,7 +394,7 @@ def test_cleanup_always_called_on_success(): def test_cleanup_called_even_when_event_processing_raises(): """Cleanup runs even when an event raises an exception.""" mocks = setup_mocks(batch_len=0) - to_thread = AsyncMock(side_effect=Exception("processing failed")) + to_thread = MagicMock(side_effect=Exception("processing failed")) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) @@ -408,7 +407,7 @@ def test_error_result_sent_even_when_delete_blob_raises(): """When delete_blob raises on ERROR, the result is still added to the batch and sent.""" mocks = setup_mocks(batch_len=1) mocks["idempotency_blob"].delete_blob.side_effect = Exception("Storage error") - to_thread = AsyncMock(return_value=dict(PROCESS_ERROR_RESULT)) + to_thread = MagicMock(return_value=dict(PROCESS_ERROR_RESULT)) with apply_patches(patched(mocks, to_thread_mock=to_thread), mocks): run(eventhub_trigger_active([make_mock_event()])) diff --git a/tests/active/cdamFunctionApp/cdam_retry_decorator_test.py b/tests/active/cdamFunctionApp/cdam_retry_decorator_test.py deleted file mode 100644 index 067f88455..000000000 --- a/tests/active/cdamFunctionApp/cdam_retry_decorator_test.py +++ /dev/null @@ -1,198 +0,0 @@ -import pytest -from unittest.mock import patch - -from AzureFunctions.ACTIVE.active_cdam.retry_decorator import retry_on_result - -SLEEP_PATH = "AzureFunctions.ACTIVE.active_cdam.retry_decorator.time.sleep" - -RETRYABLE = lambda r: isinstance(r, dict) and r.get("Status") == "ERROR" - - -def make_results_fn(*results): - """Return a function that yields successive return values.""" - values = list(results) - - def fn(*args, **kwargs): - fn.call_count += 1 - return values.pop(0) if values else None - - fn.call_count = 0 - fn.__name__ = "fn" - 
fn.__doc__ = None - return fn - - -# --------------------------------------------------------------------------- -# Basic behaviour -# --------------------------------------------------------------------------- - -def test_success_on_first_attempt_no_sleep(): - """Function returns a non-retryable result first try — no retries, no sleep.""" - fn = make_results_fn("ok") - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == "ok" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_no_retry_when_retry_on_is_none(): - """With retry_on=None, an ERROR result is returned immediately without retrying.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=3, retry_on=None)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == {"Status": "ERROR"} - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_retries_on_error_result_then_succeeds(): - """Function returns ERROR twice then SUCCESS; called 3 times total.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "ERROR"}, {"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result == {"Status": "SUCCESS"} - assert fn.call_count == 3 - - -def test_returns_last_error_result_after_all_retries_exhausted(): - """After max_retries+1 attempts all returning ERROR, the last result is returned.""" - fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(3)]) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 3 - - -def test_total_call_count_equals_max_retries_plus_one(): - """With max_retries=N, function is called N+1 times total.""" - fn = 
make_results_fn(*[{"Status": "ERROR"} for _ in range(5)]) - decorated = retry_on_result(max_retries=4, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - decorated() - - assert fn.call_count == 5 - - -def test_zero_retries_returns_error_result_immediately(): - """max_retries=0 means no retries; ERROR result returned after one attempt.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=0, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_exceptions_are_not_caught(): - """Exceptions raised inside the wrapped function propagate immediately.""" - def fn(): - raise ValueError("boom") - - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH): - with pytest.raises(ValueError, match="boom"): - decorated() - - -# --------------------------------------------------------------------------- -# Sleep / delay behaviour -# --------------------------------------------------------------------------- - -def test_no_sleep_after_final_failed_attempt(): - """sleep() is not called after the last failed attempt.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 3) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - # 3 attempts → sleep after attempt 0 and 1 only - assert mock_sleep.call_count == 2 - - -def test_exponential_backoff_delay_without_jitter(): - """Delay doubles each attempt: base, 2*base, 4*base...""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 4) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, max_delay=999, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delays = [c.args[0] for c in mock_sleep.call_args_list] - assert actual_delays == [1.0, 2.0, 4.0] - 
- -def test_delay_capped_at_max_delay(): - """Delay never exceeds max_delay.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 6) - decorated = retry_on_result(max_retries=5, retry_on=RETRYABLE, base_delay=10.0, max_delay=15.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - for c in mock_sleep.call_args_list: - assert c.args[0] <= 15.0 - - -def test_jitter_delay_within_expected_range(): - """With jitter=True, sleep delay is between 50% and 100% of computed value.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=1, retry_on=RETRYABLE, base_delay=4.0, max_delay=999, jitter=True)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delay = mock_sleep.call_args_list[0].args[0] - # base_delay * 2^0 = 4.0; jitter range: [2.0, 4.0] - assert 2.0 <= actual_delay <= 4.0 - - -# --------------------------------------------------------------------------- -# Argument forwarding & metadata -# --------------------------------------------------------------------------- - -def test_passes_args_and_kwargs_to_wrapped_function(): - """Positional and keyword arguments are forwarded correctly.""" - received = {} - - def fn(*args, **kwargs): - received["args"] = args - received["kwargs"] = kwargs - return "result" - - decorated = retry_on_result()(fn) - - with patch(SLEEP_PATH): - result = decorated(1, 2, key="val") - - assert result == "result" - assert received["args"] == (1, 2) - assert received["kwargs"] == {"key": "val"} - - -def test_preserves_function_name_and_docstring(): - """@wraps correctly preserves __name__ and __doc__.""" - @retry_on_result() - def my_func(): - """My docstring.""" - - assert my_func.__name__ == "my_func" - assert my_func.__doc__ == "My docstring." 
diff --git a/tests/active/functionApp/ccdFunctions_test.py b/tests/active/functionApp/ccdFunctions_test.py index 9ee7e7b63..6ba40e05f 100644 --- a/tests/active/functionApp/ccdFunctions_test.py +++ b/tests/active/functionApp/ccdFunctions_test.py @@ -2,14 +2,6 @@ import pytest from unittest.mock import Mock, patch -SLEEP_PATH = "AzureFunctions.ACTIVE.active_ccd.retry_decorator.time.sleep" - - -@pytest.fixture(autouse=True) -def no_retry_sleep(): - with patch(SLEEP_PATH): - yield - # --------------------------------------------------------------------------- # Helpers diff --git a/tests/active/functionApp/functionApp_test.py b/tests/active/functionApp/functionApp_test.py index 7e44f463a..d8efb23cc 100644 --- a/tests/active/functionApp/functionApp_test.py +++ b/tests/active/functionApp/functionApp_test.py @@ -4,14 +4,6 @@ import pytest from unittest.mock import patch, MagicMock, ANY, AsyncMock -SLEEP_PATH = "AzureFunctions.ACTIVE.active_ccd.retry_decorator.time.sleep" - - -@pytest.fixture(autouse=True) -def no_retry_sleep(): - with patch(SLEEP_PATH): - yield - # ccdFunctions instantiates IDAMTokenManager at module level, which calls Azure # Key Vault in __init__. Patch the Azure SDK before importing to prevent real # network calls during module load. 
diff --git a/tests/active/functionApp/retry_decorator_test.py b/tests/active/functionApp/retry_decorator_test.py deleted file mode 100644 index 3d9aecb3d..000000000 --- a/tests/active/functionApp/retry_decorator_test.py +++ /dev/null @@ -1,198 +0,0 @@ -import pytest -from unittest.mock import patch - -from AzureFunctions.ACTIVE.active_ccd.retry_decorator import retry_on_result - -SLEEP_PATH = "AzureFunctions.ACTIVE.active_ccd.retry_decorator.time.sleep" - -RETRYABLE = lambda r: isinstance(r, dict) and r.get("Status") == "ERROR" - - -def make_results_fn(*results): - """Return a function that yields successive return values.""" - values = list(results) - - def fn(*args, **kwargs): - fn.call_count += 1 - return values.pop(0) if values else None - - fn.call_count = 0 - fn.__name__ = "fn" - fn.__doc__ = None - return fn - - -# --------------------------------------------------------------------------- -# Basic behaviour -# --------------------------------------------------------------------------- - -def test_success_on_first_attempt_no_sleep(): - """Function returns a non-retryable result first try — no retries, no sleep.""" - fn = make_results_fn("ok") - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == "ok" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_no_retry_when_retry_on_is_none(): - """With retry_on=None, an ERROR result is returned immediately without retrying.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=3, retry_on=None)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result == {"Status": "ERROR"} - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_retries_on_error_result_then_succeeds(): - """Function returns ERROR twice then SUCCESS; called 3 times total.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "ERROR"}, 
{"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result == {"Status": "SUCCESS"} - assert fn.call_count == 3 - - -def test_returns_last_error_result_after_all_retries_exhausted(): - """After max_retries+1 attempts all returning ERROR, the last result is returned.""" - fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(3)]) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 3 - - -def test_total_call_count_equals_max_retries_plus_one(): - """With max_retries=N, function is called N+1 times total.""" - fn = make_results_fn(*[{"Status": "ERROR"} for _ in range(5)]) - decorated = retry_on_result(max_retries=4, retry_on=RETRYABLE, jitter=False)(fn) - - with patch(SLEEP_PATH): - decorated() - - assert fn.call_count == 5 - - -def test_zero_retries_returns_error_result_immediately(): - """max_retries=0 means no retries; ERROR result returned after one attempt.""" - fn = make_results_fn({"Status": "ERROR"}) - decorated = retry_on_result(max_retries=0, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - result = decorated() - - assert result["Status"] == "ERROR" - assert fn.call_count == 1 - mock_sleep.assert_not_called() - - -def test_exceptions_are_not_caught(): - """Exceptions raised inside the wrapped function propagate immediately.""" - def fn(): - raise ValueError("boom") - - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE)(fn) - - with patch(SLEEP_PATH): - with pytest.raises(ValueError, match="boom"): - decorated() - - -# --------------------------------------------------------------------------- -# Sleep / delay behaviour -# --------------------------------------------------------------------------- - -def test_no_sleep_after_final_failed_attempt(): - 
"""sleep() is not called after the last failed attempt.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 3) - decorated = retry_on_result(max_retries=2, retry_on=RETRYABLE, base_delay=1.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - # 3 attempts → sleep after attempt 0 and 1 only - assert mock_sleep.call_count == 2 - - -def test_exponential_backoff_delay_without_jitter(): - """Delay doubles each attempt: base, 2*base, 4*base...""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 4) - decorated = retry_on_result(max_retries=3, retry_on=RETRYABLE, base_delay=1.0, max_delay=999, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delays = [c.args[0] for c in mock_sleep.call_args_list] - assert actual_delays == [1.0, 2.0, 4.0] - - -def test_delay_capped_at_max_delay(): - """Delay never exceeds max_delay.""" - fn = make_results_fn(*[{"Status": "ERROR"}] * 6) - decorated = retry_on_result(max_retries=5, retry_on=RETRYABLE, base_delay=10.0, max_delay=15.0, jitter=False)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - for c in mock_sleep.call_args_list: - assert c.args[0] <= 15.0 - - -def test_jitter_delay_within_expected_range(): - """With jitter=True, sleep delay is between 50% and 100% of computed value.""" - fn = make_results_fn({"Status": "ERROR"}, {"Status": "SUCCESS"}) - decorated = retry_on_result(max_retries=1, retry_on=RETRYABLE, base_delay=4.0, max_delay=999, jitter=True)(fn) - - with patch(SLEEP_PATH) as mock_sleep: - decorated() - - actual_delay = mock_sleep.call_args_list[0].args[0] - # base_delay * 2^0 = 4.0; jitter range: [2.0, 4.0] - assert 2.0 <= actual_delay <= 4.0 - - -# --------------------------------------------------------------------------- -# Argument forwarding & metadata -# --------------------------------------------------------------------------- - -def test_passes_args_and_kwargs_to_wrapped_function(): - """Positional and keyword arguments are forwarded 
correctly.""" - received = {} - - def fn(*args, **kwargs): - received["args"] = args - received["kwargs"] = kwargs - return "result" - - decorated = retry_on_result()(fn) - - with patch(SLEEP_PATH): - result = decorated(1, 2, key="val") - - assert result == "result" - assert received["args"] == (1, 2) - assert received["kwargs"] == {"key": "val"} - - -def test_preserves_function_name_and_docstring(): - """@wraps correctly preserves __name__ and __doc__.""" - @retry_on_result() - def my_func(): - """My docstring.""" - - assert my_func.__name__ == "my_func" - assert my_func.__doc__ == "My docstring." diff --git a/tests/active/reasonForAppealSubmitted/rfas_hearingResponse_test.py b/tests/active/reasonForAppealSubmitted/rfas_hearingResponse_test.py index a33d0f601..3de3a8d7a 100644 --- a/tests/active/reasonForAppealSubmitted/rfas_hearingResponse_test.py +++ b/tests/active/reasonForAppealSubmitted/rfas_hearingResponse_test.py @@ -28,8 +28,8 @@ class TestReasonForAppealSubmittedHearingResponse: StructField("StatusId", IntegerType()), StructField("CaseStatus", IntegerType()), StructField("Outcome", IntegerType()), - StructField("ListedCentre", StringType()), - StructField("KeyDate", StringType()), + StructField("HearingCentre", StringType()), + StructField("HearingDate", StringType()), StructField("HearingType", StringType()), StructField("CourtName", StringType()), StructField("ListType", StringType()), @@ -76,17 +76,17 @@ def test_additionalInstructionsTribunalResponse(self, spark): "List", "10:00", "10", "Notes", None, None, None, None, None, None, None, None, None, None, None, None ), - ( # CaseStatus 37 and StatusId 1 - skipped for condition CaseStatus in 37 or 38 and max StatusId + ( # CaseStatus 37 and StatusId 1 - skipped by window function (not max StatusId for CaseNo 2) "2", 1, 37, 1, "HearingCentre", "2000-01-01", "Type", "Court", "List", "10:00", "10", "Notes", None, None, None, None, None, None, None, None, None, None, None, None ), - ( # CaseStatus 38 and 
StatusId 2 with no judicial details - valid replaces above + ( # CaseStatus 38 and StatusId 2 - selected by window, but produces NULL response (only CaseStatus 26 generates output) "2", 2, 38, 1, "HearingCentre2", "2010-01-01", "Type2", "Court2", "List2", "11:00", "20", "Notes2", None, None, None, None, None, None, None, None, None, None, None, None ), - ( # CaseStatus 37 with Judicial details - valid + ( # CaseStatus 37 with judicial details - selected by window, but produces NULL response (only CaseStatus 26 generates output) "3", 1, 37, 0, "HearingCentre", "2000-01-01", "Type", "Court", "List", "10:00", "10", "Notes", "Judge1Last", "Judge1First", "Mr", @@ -94,7 +94,7 @@ def test_additionalInstructionsTribunalResponse(self, spark): "Judge3Last", "Judge3First", "Ms", "ClerkLast", "ClerkFirst", "Miss" ), - ( # CaseStatus 39 - skipped for above case as not in CaseStatus 37 or 38 + ( # CaseStatus 39 - skipped by df_stg filter (not in CaseStatus 26, 37, or 38) "3", 2, 39, 0, "HearingCentre2", "2010-01-01", "Type2", "Court2", "Lis2t", "11:00", "20", "Notes2", "Judge1Last2", "Judge1First2", "Mr2", @@ -154,36 +154,8 @@ def test_additionalInstructionsTribunalResponse(self, spark): Required/Incompatible Judicial Officers: Notes: Notes\ """).strip() - assert resultList[1][0] == dedent("""\ - Listed details from ARIA: - Hearing Centre: HearingCentre2 - Hearing Date: 2010-01-01 - Hearing Type: Type2 - Court: Court2 - List Type: List2 - List Start Time: 11:00 - Judge First Tier: - Court Clerk / Usher: N/A - Start Time: 11:00 - Estimated Duration: 20 - Required/Incompatible Judicial Officers: - Notes: Notes2 - """).strip() - assert resultList[2][0] == dedent("""\ - Listed details from ARIA: - Hearing Centre: HearingCentre - Hearing Date: 2000-01-01 - Hearing Type: Type - Court: Court - List Type: List - List Start Time: 10:00 - Judge First Tier: Judge1Last Judge1First (Mr) Judge2Last Judge2First (Mrs) Judge3Last Judge3First (Ms) - Court Clerk / Usher: ClerkLast ClerkFirst 
(Miss) - Start Time: 10:00 - Estimated Duration: 10 - Required/Incompatible Judicial Officers: - Notes: Notes\ - """).strip() + assert resultList[1][0] is None # CaseNo 2 - CaseStatus 38, only CaseStatus 26 generates a response + assert resultList[2][0] is None # CaseNo 3 - CaseStatus 37, only CaseStatus 26 generates a response assert resultList[3][0] == dedent("""\ Listed details from ARIA: Hearing Centre: N/A @@ -233,4 +205,4 @@ def test_additionalInstructionsTribunalResponse(self, spark): JudgeLastName3 JudgeFirstName3 ( Judge3 ) : Not Required Notes: Notes\ """).strip() - assert len(resultList) == 6 # 2 non-matching conditions of the 8 cases provided, leaves 6. + assert len(resultList) == 6 # 2 excluded (case 5: Outcome != 0; case 8: LR representation); 6 remain, 2 with NULL response (CaseNo 2, 3: CaseStatus 37/38) diff --git a/tests/requirements.txt b/tests/requirements.txt index 798906386..a3a50430f 100644 --- a/tests/requirements.txt +++ b/tests/requirements.txt @@ -10,4 +10,5 @@ azure-storage-blob aiohttp pyspark pandas -azure.functions \ No newline at end of file +azure.functions +tenacity \ No newline at end of file From b34b6d7fc633e6578ab51aea5525083f42dbcff0 Mon Sep 17 00:00:00 2001 From: Tony Chow Date: Thu, 4 Jun 2026 00:34:09 +0100 Subject: [PATCH 2/4] Retry based on returned Status Code rather than logged error message --- .../ACTIVE/active_caselink_ccd/cl_ccdFunctions.py | 6 ++++++ .../ACTIVE/active_caselink_ccd/cl_tokenManager.py | 11 +++++++++-- .../ACTIVE/active_caselink_ccd/function_app.py | 4 ++-- AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py | 8 +++++++- AzureFunctions/ACTIVE/active_ccd/function_app.py | 4 ++-- AzureFunctions/ACTIVE/active_ccd/tokenManager.py | 10 ++++++++-- AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py | 5 +++++ .../ACTIVE/active_cdam/cdam_tokenManager.py | 11 +++++++++-- AzureFunctions/ACTIVE/active_cdam/function_app.py | 4 ++-- 9 files changed, 50 insertions(+), 13 deletions(-) diff --git 
a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py index c8364d40f..57a0d1a8d 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py @@ -156,6 +156,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather IDAM token: {e}" } return result @@ -170,6 +171,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather s2s token: {e}" } return result @@ -230,6 +232,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": start_response.status_code if start_response is not None else None, "Error": f"Case link event failed: {status_code} - {text}" } return result @@ -267,6 +270,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": validate_case_response.status_code if validate_case_response is not None else None, "Error": f"Case link validation failed: {status_code} - {text}", } return result @@ -303,6 +307,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": submit_case_response.status_code if submit_case_response is not None else None, "Error": f"Case 
link submission failed: {status_code} - {text}", } @@ -316,6 +321,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "SUCCESS", + "StatusCode": submit_case_response.status_code, "Error": None } diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_tokenManager.py b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_tokenManager.py index 67bb7f8b8..29a582fda 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/cl_tokenManager.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/cl_tokenManager.py @@ -5,6 +5,13 @@ from azure.keyvault.secrets import SecretClient from datetime import datetime, timezone, timedelta + +class TokenError(RuntimeError): + def __init__(self, message, status_code=None): + super().__init__(message) + self.status_code = status_code + + class IDAMTokenManager: def __init__(self, env: str, skew: int = 1800): self.env = env @@ -64,7 +71,7 @@ def _fetch_token(self): idam_response = requests.post(self.token_url, headers=headers, data=data) if idam_response.status_code != 200: - raise RuntimeError(f"Token request failed: {idam_response.status_code} {idam_response.text}") + raise TokenError(f"Token request failed: {idam_response.status_code} {idam_response.text}", status_code=idam_response.status_code) payload = idam_response.json() @@ -186,7 +193,7 @@ def _fetch_s2s_token(self): raise EOFError(f"Error reuesting service to service token: {e}") # Ensure you get a 200 response else raise an error if s2s_response.status_code != 200: - raise RuntimeError(f"Error requesting service to service token: {s2s_response.text}") + raise TokenError(f"Error requesting service to service token: {s2s_response.status_code} {s2s_response.text}", status_code=s2s_response.status_code) # Extract token from response try: diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py 
b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py index 20ab47dbd..427674f3a 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py @@ -47,8 +47,7 @@ def _is_retryable(result): RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} if not (isinstance(result, dict) and result.get("Status") == "ERROR"): return False - error = result.get("Error", "") - return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + return result.get("StatusCode") in RETRYABLE_STATUS_CODES @app.function_name("eventhub_trigger") @@ -137,6 +136,7 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): except Exception as delete_error: logger.warning(f"[IDEMPOTENCY][CASELINK] Failed to delete blob for {ccdReference}: {delete_error}") + result.pop("StatusCode", None) result_json = json.dumps(result) try: diff --git a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py index 2223938a8..127db692e 100644 --- a/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py +++ b/AzureFunctions/ACTIVE/active_ccd/ccdFunctions.py @@ -145,6 +145,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "CaseNo": caseNo, "State": state, "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather IDAM token: {e}", "EndDateTime": datetime.now(timezone.utc).isoformat(), } @@ -158,6 +159,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "CaseNo": caseNo, "State": state, "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather s2s token: {e}", "EndDateTime": datetime.now(timezone.utc).isoformat(), } @@ -201,6 +203,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "CaseNo": caseNo, "State": state, "Status": "ERROR", + "StatusCode": start_response.status_code if start_response is not None else None, 
"Error": f"Case creation failed: {status_code} - {text}", "EndDateTime": datetime.now(timezone.utc).isoformat() } @@ -237,7 +240,8 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "RunID": runId, "CaseNo": caseNo, "State": state, - "Status": "ERROR", # change this to the validate response code + "Status": "ERROR", + "StatusCode": validate_case_response.status_code if validate_case_response is not None else None, "Error": f"Case validation failed: {status_code} - {text}", "EndDateTime": datetime.now(timezone.utc).isoformat(), "StartResponse": start_response_data @@ -274,6 +278,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "CaseNo": caseNo, "State": state, "Status": "ERROR", + "StatusCode": submit_case_response.status_code if submit_case_response is not None else None, "Error": f"Case submission failed: {status_code} - {text}", "EndDateTime": datetime.now(timezone.utc).isoformat(), "StartResponse": start_response_data @@ -287,6 +292,7 @@ def process_case(env, caseNo, payloadData, runId, state, PR_REFERENCE): "CaseNo": caseNo, "State": state, "Status": "SUCCESS", + "StatusCode": submit_case_response.status_code, "Error": None, "EndDateTime": datetime.now(timezone.utc).isoformat(), "CCDCaseID": submit_case_response.json()["id"], diff --git a/AzureFunctions/ACTIVE/active_ccd/function_app.py b/AzureFunctions/ACTIVE/active_ccd/function_app.py index c7e4da0fb..0b9b389a5 100644 --- a/AzureFunctions/ACTIVE/active_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_ccd/function_app.py @@ -50,8 +50,7 @@ def _is_retryable(result): RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} if not (isinstance(result, dict) and result.get("Status") == "ERROR"): return False - error = result.get("Error", "") - return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + return result.get("StatusCode") in RETRYABLE_STATUS_CODES @app.function_name("eventhub_trigger") @@ -137,6 +136,7 @@ async def 
eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): except Exception as delete_error: logger.warning(f"[IDEMPOTENCY] Failed to delete blob for {caseNo}: {delete_error}") + result.pop("StatusCode", None) result_json = json.dumps(result) try: event_data_batch.add(EventData(result_json)) diff --git a/AzureFunctions/ACTIVE/active_ccd/tokenManager.py b/AzureFunctions/ACTIVE/active_ccd/tokenManager.py index 1b764bfc5..29a582fda 100644 --- a/AzureFunctions/ACTIVE/active_ccd/tokenManager.py +++ b/AzureFunctions/ACTIVE/active_ccd/tokenManager.py @@ -6,6 +6,12 @@ from datetime import datetime, timezone, timedelta +class TokenError(RuntimeError): + def __init__(self, message, status_code=None): + super().__init__(message) + self.status_code = status_code + + class IDAMTokenManager: def __init__(self, env: str, skew: int = 1800): self.env = env @@ -65,7 +71,7 @@ def _fetch_token(self): idam_response = requests.post(self.token_url, headers=headers, data=data) if idam_response.status_code != 200: - raise RuntimeError(f"Token request failed: {idam_response.status_code} {idam_response.text}") + raise TokenError(f"Token request failed: {idam_response.status_code} {idam_response.text}", status_code=idam_response.status_code) payload = idam_response.json() @@ -187,7 +193,7 @@ def _fetch_s2s_token(self): raise EOFError(f"Error reuesting service to service token: {e}") # Ensure you get a 200 response else raise an error if s2s_response.status_code != 200: - raise RuntimeError(f"Error requesting service to service token: {s2s_response.text}") + raise TokenError(f"Error requesting service to service token: {s2s_response.status_code} {s2s_response.text}", status_code=s2s_response.status_code) # Extract token from response try: diff --git a/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py index 2c4a3a76e..96df2b961 100644 --- a/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py +++ 
b/AzureFunctions/ACTIVE/active_cdam/cdamFunctions.py @@ -76,6 +76,7 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather IDAM token: {e}", "CDAMResponse": "" } @@ -90,6 +91,7 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": getattr(e, "status_code", None), "Error": f"failed to gather s2s token: {e}", "CDAMResponse": "" } @@ -130,6 +132,7 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": None, "Error": f"Failed to read given blob: {e}", "CDAMResponse": "" } @@ -165,6 +168,7 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "ERROR", + "StatusCode": upload_document_response.status_code if upload_document_response is not None else None, "Error": f"Document upload failed: {status_code} - {text}", "CDAMResponse": "" } @@ -178,6 +182,7 @@ def process_event(env, caseNo, runId, file_name, file_url, file_content_type, st "StartDateTime": startDateTime, "EndDateTime": datetime.now(timezone.utc).isoformat(), "Status": "SUCCESS", + "StatusCode": upload_document_response.status_code, "Error": None, "CDAMResponse": upload_document_response.json() } diff --git a/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py b/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py index 2763ab67f..af5ef10f3 100644 --- a/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py +++ b/AzureFunctions/ACTIVE/active_cdam/cdam_tokenManager.py @@ -5,6 
+5,13 @@ from azure.keyvault.secrets import SecretClient from datetime import datetime, timezone, timedelta + +class TokenError(RuntimeError): + def __init__(self, message, status_code=None): + super().__init__(message) + self.status_code = status_code + + class IDAMTokenManager: def __init__(self, env: str, skew: int = 900): self.env = env @@ -64,7 +71,7 @@ def _fetch_token(self): idam_response = requests.post(self.token_url, headers=headers, data=data) if idam_response.status_code != 200: - raise RuntimeError(f"Token request failed: {idam_response.status_code} {idam_response.text}") + raise TokenError(f"Token request failed: {idam_response.status_code} {idam_response.text}", status_code=idam_response.status_code) payload = idam_response.json() @@ -186,7 +193,7 @@ def _fetch_s2s_token(self): raise EOFError(f"Error reuesting service to service token: {e}") # Ensure you get a 200 response else raise an error if s2s_response.status_code != 200: - raise RuntimeError(f"Error requesting service to service token: {s2s_response.text}") + raise TokenError(f"Error requesting service to service token: {s2s_response.status_code} {s2s_response.text}", status_code=s2s_response.status_code) # Extract token from response try: diff --git a/AzureFunctions/ACTIVE/active_cdam/function_app.py b/AzureFunctions/ACTIVE/active_cdam/function_app.py index b87ab435f..66d29b906 100644 --- a/AzureFunctions/ACTIVE/active_cdam/function_app.py +++ b/AzureFunctions/ACTIVE/active_cdam/function_app.py @@ -47,8 +47,7 @@ def _is_retryable(result): RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504} if not (isinstance(result, dict) and result.get("Status") == "ERROR"): return False - error = result.get("Error", "") - return any(f"failed: {code}" in error for code in RETRYABLE_STATUS_CODES) + return result.get("StatusCode") in RETRYABLE_STATUS_CODES @app.function_name("eventhub_trigger") @@ -143,6 +142,7 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): except 
Exception as delete_error: logger.warning(f"[IDEMPOTENCY][CDAM] Failed to delete blob for {caseNo}: {delete_error}") + result.pop("StatusCode", None) result_json = json.dumps(result) try: From e823faaf5db0055d6d8367e91156a1f762cbe64a Mon Sep 17 00:00:00 2001 From: Tony Chow Date: Thu, 4 Jun 2026 09:37:09 +0100 Subject: [PATCH 3/4] Add retryable tests --- tests/active/functionApp/ccdFunctions_test.py | 11 ++- tests/active/functionApp/functionApp_test.py | 90 ++++++++++++++++++- 2 files changed, 98 insertions(+), 3 deletions(-) diff --git a/tests/active/functionApp/ccdFunctions_test.py b/tests/active/functionApp/ccdFunctions_test.py index 6ba40e05f..3b2d1b932 100644 --- a/tests/active/functionApp/ccdFunctions_test.py +++ b/tests/active/functionApp/ccdFunctions_test.py @@ -72,6 +72,7 @@ def test_success_result_contains_new_fields(self): assert result["Status"] == "SUCCESS" assert result["CCDCaseID"] == SUBMIT_CASE_ID + assert result["StatusCode"] == 201 assert json.loads(result["SuccessResponse"]) == {"id": SUBMIT_CASE_ID, "case_data": SUBMIT_CASE_DATA} assert json.loads(result["StartResponse"]) == START_TOKEN_DATA @@ -99,7 +100,7 @@ def test_success_result_base_fields_present(self): PR_REFERENCE="pr-123", ) - for key in ("RunID", "CaseNo", "State", "Status", "Error", "EndDateTime", "CCDCaseID", "SuccessResponse", "StartResponse"): + for key in ("RunID", "CaseNo", "State", "Status", "StatusCode", "Error", "EndDateTime", "CCDCaseID", "SuccessResponse", "StartResponse"): assert key in result, f"Missing key: {key}" assert result["Error"] is None @@ -133,6 +134,7 @@ def test_validate_failure_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] == 422 assert "Case validation failed" in result["Error"] assert json.loads(result["StartResponse"]) == START_TOKEN_DATA mock_submit.assert_not_called() @@ -161,6 +163,7 @@ def test_validate_response_none_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] is 
None assert "Case validation failed" in result["Error"] @@ -193,6 +196,7 @@ def test_submit_failure_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] == 500 assert "Case submission failed" in result["Error"] assert json.loads(result["StartResponse"]) == START_TOKEN_DATA assert "SuccessResponse" not in result @@ -228,6 +232,7 @@ def test_start_failure_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] == 503 assert "Case creation failed" in result["Error"] assert "StartResponse" not in result mock_validate.assert_not_called() @@ -258,6 +263,7 @@ def test_start_returns_none_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] is None assert "No response from API" in result["Error"] mock_validate.assert_not_called() mock_submit.assert_not_called() @@ -287,6 +293,7 @@ def test_idam_token_failure_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] is None assert "IDAM" in result["Error"] def test_s2s_token_failure_returns_error(self): @@ -311,6 +318,7 @@ def test_s2s_token_failure_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] is None assert "s2s" in result["Error"] @@ -368,4 +376,5 @@ def test_submit_returns_none_returns_error(self): ) assert result["Status"] == "ERROR" + assert result["StatusCode"] is None assert "No response from API" in result["Error"] diff --git a/tests/active/functionApp/functionApp_test.py b/tests/active/functionApp/functionApp_test.py index d8efb23cc..bbd171722 100644 --- a/tests/active/functionApp/functionApp_test.py +++ b/tests/active/functionApp/functionApp_test.py @@ -1,7 +1,6 @@ import asyncio import json import os -import pytest from unittest.mock import patch, MagicMock, ANY, AsyncMock # ccdFunctions instantiates IDAMTokenManager at module level, which calls Azure @@ -19,7 +18,7 @@ with patch.dict(os.environ, {"ENVIRONMENT": "sbox", "LZ_KEY": "test0", 
"PR_NUMBER": "9999"}), \ patch("azure.functions.FunctionApp", MagicMock(return_value=_mock_app)): - from AzureFunctions.ACTIVE.active_ccd.function_app import eventhub_trigger_active + from AzureFunctions.ACTIVE.active_ccd.function_app import eventhub_trigger_active, _is_retryable # FUNCTIONS - this is to be removed once we import the functions at the top of the script (This was only done because we could not merge) @@ -607,3 +606,90 @@ def test_error_result_sent_even_when_delete_blob_raises( mocks["idempotency_blob"].delete_blob.assert_awaited_once() mocks["batch"].add.assert_called_once() mocks["producer"].send_batch.assert_awaited_once_with(mocks["batch"]) + + +# --------------------------------------------------------------------------- +# _is_retryable unit tests +# --------------------------------------------------------------------------- + +def test_is_retryable_true_for_all_retryable_status_codes(): + for code in [408, 409, 429, 500, 502, 503, 504]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) is True, f"Expected retryable for {code}" + + +def test_is_retryable_false_for_non_retryable_status_codes(): + for code in [200, 201, 400, 401, 403, 404, 422]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) is False, f"Expected non-retryable for {code}" + + +def test_is_retryable_false_when_status_is_success(): + assert _is_retryable({"Status": "SUCCESS", "StatusCode": 500}) is False + + +def test_is_retryable_false_when_status_code_is_none(): + assert _is_retryable({"Status": "ERROR", "StatusCode": None}) is False + + +def test_is_retryable_false_for_non_dict_result(): + assert _is_retryable("error string") is False + assert _is_retryable(None) is False + + +# --------------------------------------------------------------------------- +# StatusCode stripping before event hub publish +# --------------------------------------------------------------------------- + 
+@patch("AzureFunctions.ACTIVE.active_ccd.function_app.EventHubProducerClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.BlobServiceClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.SecretClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.DefaultAzureCredential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.process_case") +def test_status_code_stripped_from_event_hub_payload( + mock_process_case, mock_credential, mock_secret_client, + mock_blob_service, mock_eh_producer): + """StatusCode is an internal routing field and must not appear in the published event.""" + mocks = _build_trigger_mocks() + mock_process_case.return_value = { + "Status": "SUCCESS", + "StatusCode": 201, + "CaseNo": "CASE_SC", + "CCDCaseID": "777", + "Error": None, + } + mock_credential.return_value = AsyncMock() + mock_secret_client.return_value = mocks["kv"] + mock_blob_service.return_value = mocks["blob_svc"] + mock_eh_producer.from_connection_string.return_value = mocks["producer"] + + asyncio.run(eventhub_trigger_active([_make_event("CASE_SC", "run_sc", "paymentPending", {"key": "val"})])) + + mocks["batch"].add.assert_called_once() + published_payload = mocks["batch"].add.call_args[0][0].body_as_json() + assert "StatusCode" not in published_payload + + +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.wait_exponential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.EventHubProducerClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.BlobServiceClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.SecretClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.DefaultAzureCredential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.process_case") +def test_non_retryable_error_not_retried( + mock_process_case, mock_credential, mock_secret_client, + mock_blob_service, mock_eh_producer, mock_wait_exp): + """process_case is NOT retried when StatusCode is non-retryable (e.g. 
422).""" + from tenacity import wait_none + mock_wait_exp.return_value = wait_none() + + mocks = _build_trigger_mocks() + mock_process_case.return_value = { + "Status": "ERROR", "StatusCode": 422, "CaseNo": "CASE_NO_RETRY", "Error": "Unprocessable" + } + mock_credential.return_value = AsyncMock() + mock_secret_client.return_value = mocks["kv"] + mock_blob_service.return_value = mocks["blob_svc"] + mock_eh_producer.from_connection_string.return_value = mocks["producer"] + + asyncio.run(eventhub_trigger_active([_make_event("CASE_NO_RETRY", "run_nr", "paymentPending", {"key": "val"})])) + + assert mock_process_case.call_count == 1 From f9913731cf4fd15fa5d38ab3adf2940be2c7859a Mon Sep 17 00:00:00 2001 From: Tony Chow Date: Thu, 4 Jun 2026 10:33:07 +0100 Subject: [PATCH 4/4] Updated retryable unit tests --- .../active_caselink_ccd/function_app.py | 10 +- .../ACTIVE/active_ccd/function_app.py | 113 +----------------- .../ACTIVE/active_cdam/function_app.py | 10 +- .../cl_functionApp_test.py | 79 +++++++++++- .../cdamFunctionApp/cdam_function_app_test.py | 76 +++++++++++- tests/active/functionApp/functionApp_test.py | 88 ++++++++++++++ 6 files changed, 256 insertions(+), 120 deletions(-) diff --git a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py index 427674f3a..d40de61ec 100644 --- a/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py @@ -110,16 +110,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): logger.warning(f"[IDEMPOTENCY][CASELINK] Skipping in progress case {ccdReference}.") continue - async for attempt in AsyncRetrying( + async def _process(): + return await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, PR_REFERENCE, overwrite) + + result = await AsyncRetrying( retry=retry_if_result(_is_retryable), stop=stop_after_attempt(3), wait=wait_exponential(min=30, max=60), 
before_sleep=_log_retry, retry_error_callback=lambda retry_state: retry_state.outcome.result(), - ): - with attempt: - result = await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, PR_REFERENCE, overwrite) - attempt.retry_state.set_result(result) + )(_process) # Skip if marked for SKIPPED if result.get("Status") == "SKIPPED": diff --git a/AzureFunctions/ACTIVE/active_ccd/function_app.py b/AzureFunctions/ACTIVE/active_ccd/function_app.py index 0b9b389a5..f69e9078e 100644 --- a/AzureFunctions/ACTIVE/active_ccd/function_app.py +++ b/AzureFunctions/ACTIVE/active_ccd/function_app.py @@ -116,16 +116,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): continue # Process the file - async for attempt in AsyncRetrying( + async def _process(): + return await asyncio.to_thread(process_case, ENV, caseNo, data, run_id, state, PR_REFERENCE) + + result = await AsyncRetrying( retry=retry_if_result(_is_retryable), stop=stop_after_attempt(3), wait=wait_exponential(min=30, max=60), before_sleep=_log_retry, retry_error_callback=lambda retry_state: retry_state.outcome.result(), - ): - with attempt: - result = await asyncio.to_thread(process_case, ENV, caseNo, data, run_id, state, PR_REFERENCE) - attempt.retry_state.set_result(result) + )(_process) result["StartDateTime"] = start_datetime # Mark processed if success @@ -162,106 +162,3 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): await idempotency_blob_service.close() await kv_client.close() await credential.close() - - -# async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): -# logging.info(f"Processing a batch of {len(azeventhub)} events") - -# # Retrieve credentials -# credential = DefaultAzureCredential() -# logging.info('Connected to Azure Credentials') - -# kv_url = f"https://ingest{LZ_KEY}-meta002-{ENV}.vault.azure.net" -# kv_client = SecretClient(vault_url=kv_url, credential=credential) -# logging.info(f'Connected to KeyVault: 
{kv_url}') - -# results_eh_name = f"evh-active-res-{ENV}-{LZ_KEY}-uks-dlrm-01" -# results_eh_key = await kv_client.get_secret(f"{results_eh_name}-key") -# result_eh_secret_key = results_eh_key.value -# logging.info('Acquired KV secret for Results Event Hub') - -# # Initialise the idempotent client outside of the loop / context manager -# idempotency_account_url = f"https://ingest{LZ_KEY}xcutting{ENV}.blob.core.windows.net" -# idempotency_container_name = "af-idempotency" -# idempotency_blob_service = BlobServiceClient(account_url=idempotency_account_url, credential=credential) -# idempotency_container = idempotency_blob_service.get_container_client(idempotency_container_name) - -# res_eh_producer = EventHubProducerClient.from_connection_string(conn_str=result_eh_secret_key) - -# async with res_eh_producer: -# event_data_batch = await res_eh_producer.create_batch() -# try: -# for event in azeventhub: -# try: -# logging.info(f'Event received with partition key: {event.partition_key}') - -# # Parse the payload -# start_datetime = datetime.now(timezone.utc).isoformat() -# caseNo = event.partition_key -# payload_str = event.get_body().decode('utf-8') -# payload = json.loads(payload_str) -# run_id = payload.get("RunID", None) -# state = payload.get("State", None) -# data = payload.get("Content", None) - -# ## Build idempotency blob reference -# idempotency_blob_path = f"active/{state}/processed/{caseNo}.flag" -# idempotency_blob = idempotency_container.get_blob_client(idempotency_blob_path) - -# if await idempotency_blob.exists(): -# logging.warning(f"[IDEMPOTENCY] Skipping duplicate message for {state}/{caseNo}") -# continue - -# ##Retrieve tokens for validation -# idam_manager = IDAMTokenManager(ENV, 18000) -# idam_token, uid = idam_manager.get_token() -# s2s_manager = S2S_Manager(ENV, 18000) -# s2s_token = s2s_manager.get_token() - -# validate_response = await asyncio.to_thread( -# validate_case, -# ccd_base_url = f"https://ccd-service-{ENV}.platform.hmcts.net", -# 
event_token = run_id, -# payloadData = data, -# jid = "IA", -# ctid = "Asylum", -# idam_token = idam_token, -# uid = uid, -# s2s_token = s2s_token -# ) - -# # If validation is a pass or fail return the idempotent value -# await idempotency_blob.upload_blob(b"", overwrite=True) - -# if validate_response is not None and validate_response.status_code == 200: -# result = await asyncio.to_thread(process_case, ENV, caseNo, data, run_id, state, PR_NUMBER) -# result["StartDateTime"] = start_datetime -# else: -# logging.warning(f"Validation failed for {caseNo}, skipping processing") - -# result_json = json.dumps(result) - -# try: -# event_data_batch.add(EventData(result_json)) -# except ValueError: -# # If the batch is full, send it and create a new one -# await res_eh_producer.send_batch(event_data_batch) -# logging.info(f'Sent a batch of events to Results Event Hub') -# event_data_batch = await res_eh_producer.create_batch() -# event_data_batch.add(EventData(result_json)) - -# except Exception as e: -# logging.error(f'Error processing event for caseNo {caseNo}: {e}') - -# # Send any remaining events in the batch -# if len(event_data_batch) > 0: -# await res_eh_producer.send_batch(event_data_batch) -# logging.info(f'Sent the final batch of events to Results Event Hub') - -# except Exception as e: -# logging.error(f'Error in event hub processing batch: {e}') -# finally: -# # Clean up all clients -# await idempotency_blob_service.close() -# await kv_client.close() -# await credential.close() diff --git a/AzureFunctions/ACTIVE/active_cdam/function_app.py b/AzureFunctions/ACTIVE/active_cdam/function_app.py index 66d29b906..36ed3e146 100644 --- a/AzureFunctions/ACTIVE/active_cdam/function_app.py +++ b/AzureFunctions/ACTIVE/active_cdam/function_app.py @@ -121,16 +121,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]): logger.warning(f"[IDEMPOTENCY][CDAM] Skipping in progress case {caseNo}.") continue - async for attempt in AsyncRetrying( + async def 
_process(): + return await asyncio.to_thread(process_event, ENV, caseNo, run_id, file_name, file_url, file_content_type, storage_credential) + + result = await AsyncRetrying( retry=retry_if_result(_is_retryable), stop=stop_after_attempt(3), wait=wait_exponential(min=30, max=60), before_sleep=_log_retry, retry_error_callback=lambda retry_state: retry_state.outcome.result(), - ): - with attempt: - result = await asyncio.to_thread(process_event, ENV, caseNo, run_id, file_name, file_url, file_content_type, storage_credential) - attempt.retry_state.set_result(result) + )(_process) # Mark processed if success if result.get("Status") == "SUCCESS": diff --git a/tests/active/caseLinkFunctionApp/cl_functionApp_test.py b/tests/active/caseLinkFunctionApp/cl_functionApp_test.py index 03f682ab0..441ed3449 100644 --- a/tests/active/caseLinkFunctionApp/cl_functionApp_test.py +++ b/tests/active/caseLinkFunctionApp/cl_functionApp_test.py @@ -11,7 +11,7 @@ with patch.dict(os.environ, {"ENVIRONMENT": "sbox", "LZ_KEY": "testlz", "PR_NUMBER": "9999"}), \ patch("azure.functions.FunctionApp", MagicMock(return_value=_mock_app)): - from AzureFunctions.ACTIVE.active_caselink_ccd.function_app import eventhub_trigger_active + from AzureFunctions.ACTIVE.active_caselink_ccd.function_app import eventhub_trigger_active, _is_retryable import AzureFunctions.ACTIVE.active_caselink_ccd.function_app as app_module @@ -469,3 +469,80 @@ def test_error_result_sent_even_when_delete_blob_raises(): mocks["idempotency_blob"].delete_blob.assert_awaited_once() mocks["batch"].add.assert_called_once() mocks["producer"].send_batch.assert_awaited_once() + + +# --------------------------------------------------------------------------- +# _is_retryable unit tests +# --------------------------------------------------------------------------- + +def test_cl_is_retryable_true_for_retryable_status_codes(): + for code in [408, 409, 429, 500, 502, 503, 504]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) 
is True, f"Expected retryable for {code}" + + +def test_cl_is_retryable_false_for_non_retryable_status_codes(): + for code in [200, 201, 400, 401, 403, 404, 422]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) is False, f"Expected non-retryable for {code}" + + +def test_cl_is_retryable_false_when_status_is_success(): + assert _is_retryable({"Status": "SUCCESS", "StatusCode": 500}) is False + + +def test_cl_is_retryable_false_when_status_code_is_none(): + assert _is_retryable({"Status": "ERROR", "StatusCode": None}) is False + + +def test_cl_is_retryable_false_for_non_dict_result(): + assert _is_retryable("error string") is False + assert _is_retryable(None) is False + + +# --------------------------------------------------------------------------- +# Retry integration tests +# --------------------------------------------------------------------------- + +def test_cl_non_retryable_error_not_retried(): + """process_event is NOT retried for a non-retryable status code (422).""" + from tenacity import wait_none + mocks = setup_mocks(batch_len=1) + to_thread = MagicMock(return_value={"Status": "ERROR", "StatusCode": 422, "Error": "Unprocessable"}) + events = [make_mock_event({"RunID": "run-nr", "CaseLinkPayload": []})] + + extra = [patch("AzureFunctions.ACTIVE.active_caselink_ccd.function_app.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread, extra_patches=extra), mocks): + run(eventhub_trigger_active(events)) + + assert to_thread.call_count == 1 + + +def test_cl_retryable_error_is_retried_3_times(): + """process_event is retried up to 3 times for a retryable status code (500).""" + from tenacity import wait_none + mocks = setup_mocks(batch_len=1) + to_thread = MagicMock(return_value={"Status": "ERROR", "StatusCode": 500, "Error": "Server Error"}) + events = [make_mock_event({"RunID": "run-r3", "CaseLinkPayload": []})] + + extra = 
[patch("AzureFunctions.ACTIVE.active_caselink_ccd.function_app.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread, extra_patches=extra), mocks): + run(eventhub_trigger_active(events)) + + assert to_thread.call_count == 3 + + +def test_cl_retryable_error_stops_retrying_on_success(): + """After a retryable error, success on the second attempt stops further retries.""" + from tenacity import wait_none + mocks = setup_mocks(batch_len=1) + to_thread = MagicMock(side_effect=[ + {"Status": "ERROR", "StatusCode": 502, "Error": "Bad Gateway"}, + dict(PROCESS_SUCCESS_RESULT), + ]) + events = [make_mock_event({"RunID": "run-ok2", "CaseLinkPayload": []})] + + extra = [patch("AzureFunctions.ACTIVE.active_caselink_ccd.function_app.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread, extra_patches=extra), mocks): + run(eventhub_trigger_active(events)) + + assert to_thread.call_count == 2 + mocks["idempotency_blob"].delete_blob.assert_not_called() diff --git a/tests/active/cdamFunctionApp/cdam_function_app_test.py b/tests/active/cdamFunctionApp/cdam_function_app_test.py index 319eb43f9..7a52e3d1f 100644 --- a/tests/active/cdamFunctionApp/cdam_function_app_test.py +++ b/tests/active/cdamFunctionApp/cdam_function_app_test.py @@ -13,7 +13,7 @@ with patch.dict(os.environ, {"ENVIRONMENT": "sbox", "LZ_KEY": "testlz"}), \ patch("azure.functions.FunctionApp", MagicMock(return_value=_mock_app)): - from AzureFunctions.ACTIVE.active_cdam.function_app import eventhub_trigger_active + from AzureFunctions.ACTIVE.active_cdam.function_app import eventhub_trigger_active, _is_retryable import AzureFunctions.ACTIVE.active_cdam.function_app as app_module @@ -415,3 +415,77 @@ def test_error_result_sent_even_when_delete_blob_raises(): mocks["idempotency_blob"].delete_blob.assert_awaited_once() mocks["batch"].add.assert_called_once() mocks["producer"].send_batch.assert_awaited_once() + + +# 
--------------------------------------------------------------------------- +# _is_retryable unit tests +# --------------------------------------------------------------------------- + +def test_cdam_is_retryable_true_for_retryable_status_codes(): + for code in [408, 409, 429, 500, 502, 503, 504]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) is True, f"Expected retryable for {code}" + + +def test_cdam_is_retryable_false_for_non_retryable_status_codes(): + for code in [200, 201, 400, 401, 403, 404, 422]: + assert _is_retryable({"Status": "ERROR", "StatusCode": code}) is False, f"Expected non-retryable for {code}" + + +def test_cdam_is_retryable_false_when_status_is_success(): + assert _is_retryable({"Status": "SUCCESS", "StatusCode": 500}) is False + + +def test_cdam_is_retryable_false_when_status_code_is_none(): + assert _is_retryable({"Status": "ERROR", "StatusCode": None}) is False + + +def test_cdam_is_retryable_false_for_non_dict_result(): + assert _is_retryable("error string") is False + assert _is_retryable(None) is False + + +# --------------------------------------------------------------------------- +# Retry integration tests +# --------------------------------------------------------------------------- + +def test_cdam_non_retryable_error_not_retried(): + """process_event is NOT retried for a non-retryable status code (422).""" + from tenacity import wait_none + mocks = setup_mocks(batch_len=1) + to_thread = MagicMock(return_value={"Status": "ERROR", "StatusCode": 422, "Error": "Unprocessable"}) + + extra = [patch(f"{MODULE}.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread) + extra, mocks): + run(eventhub_trigger_active([make_mock_event()])) + + assert to_thread.call_count == 1 + + +def test_cdam_retryable_error_is_retried_3_times(): + """process_event is retried up to 3 times for a retryable status code (500).""" + from tenacity import wait_none + mocks = 
setup_mocks(batch_len=1) + to_thread = MagicMock(return_value={"Status": "ERROR", "StatusCode": 500, "Error": "Server Error"}) + + extra = [patch(f"{MODULE}.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread) + extra, mocks): + run(eventhub_trigger_active([make_mock_event()])) + + assert to_thread.call_count == 3 + + +def test_cdam_retryable_error_stops_retrying_on_success(): + """After a retryable error, success on the second attempt stops further retries.""" + from tenacity import wait_none + mocks = setup_mocks(batch_len=1) + to_thread = MagicMock(side_effect=[ + {"Status": "ERROR", "StatusCode": 502, "Error": "Bad Gateway"}, + dict(PROCESS_SUCCESS_RESULT), + ]) + + extra = [patch(f"{MODULE}.wait_exponential", return_value=wait_none())] + with apply_patches(patched(mocks, to_thread_mock=to_thread) + extra, mocks): + run(eventhub_trigger_active([make_mock_event()])) + + assert to_thread.call_count == 2 + mocks["idempotency_blob"].delete_blob.assert_not_called() diff --git a/tests/active/functionApp/functionApp_test.py b/tests/active/functionApp/functionApp_test.py index bbd171722..2b8a1087b 100644 --- a/tests/active/functionApp/functionApp_test.py +++ b/tests/active/functionApp/functionApp_test.py @@ -693,3 +693,91 @@ def test_non_retryable_error_not_retried( asyncio.run(eventhub_trigger_active([_make_event("CASE_NO_RETRY", "run_nr", "paymentPending", {"key": "val"})])) assert mock_process_case.call_count == 1 + + +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.wait_exponential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.EventHubProducerClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.BlobServiceClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.SecretClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.DefaultAzureCredential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.process_case") +def test_retryable_error_is_retried_3_times( + 
mock_process_case, mock_credential, mock_secret_client, + mock_blob_service, mock_eh_producer, mock_wait_exp): + """process_case is retried up to stop_after_attempt(3) when StatusCode is retryable.""" + from tenacity import wait_none + mock_wait_exp.return_value = wait_none() + + mocks = _build_trigger_mocks() + mock_process_case.return_value = { + "Status": "ERROR", "StatusCode": 500, "CaseNo": "CASE_R3", "Error": "Server Error" + } + mock_credential.return_value = AsyncMock() + mock_secret_client.return_value = mocks["kv"] + mock_blob_service.return_value = mocks["blob_svc"] + mock_eh_producer.from_connection_string.return_value = mocks["producer"] + + asyncio.run(eventhub_trigger_active([_make_event("CASE_R3", "run_r3", "paymentPending", {"key": "val"})])) + + assert mock_process_case.call_count == 3 + + +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.wait_exponential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.EventHubProducerClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.BlobServiceClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.SecretClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.DefaultAzureCredential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.process_case") +def test_retryable_error_result_published_and_blob_deleted_after_all_retries_exhausted( + mock_process_case, mock_credential, mock_secret_client, + mock_blob_service, mock_eh_producer, mock_wait_exp): + """After all 3 retries with a retryable code, the error result is published and blob deleted.""" + from tenacity import wait_none + mock_wait_exp.return_value = wait_none() + + mocks = _build_trigger_mocks() + mock_process_case.return_value = { + "Status": "ERROR", "StatusCode": 503, "CaseNo": "CASE_EX", "Error": "Service Unavailable" + } + mock_credential.return_value = AsyncMock() + mock_secret_client.return_value = mocks["kv"] + mock_blob_service.return_value = mocks["blob_svc"] + 
mock_eh_producer.from_connection_string.return_value = mocks["producer"] + + asyncio.run(eventhub_trigger_active([_make_event("CASE_EX", "run_ex", "paymentPending", {"key": "val"})])) + + mocks["idempotency_blob"].delete_blob.assert_awaited_once() + mocks["batch"].add.assert_called_once() + published_payload = mocks["batch"].add.call_args[0][0].body_as_json() + assert "StatusCode" not in published_payload + assert published_payload["Status"] == "ERROR" + + +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.wait_exponential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.EventHubProducerClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.BlobServiceClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.SecretClient") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.DefaultAzureCredential") +@patch("AzureFunctions.ACTIVE.active_ccd.function_app.process_case") +def test_retryable_error_stops_retrying_on_success( + mock_process_case, mock_credential, mock_secret_client, + mock_blob_service, mock_eh_producer, mock_wait_exp): + """When a retryable error is followed by a success, process_case is called exactly twice.""" + from tenacity import wait_none + mock_wait_exp.return_value = wait_none() + + mocks = _build_trigger_mocks() + mock_process_case.side_effect = [ + {"Status": "ERROR", "StatusCode": 502, "CaseNo": "CASE_OK2", "Error": "Bad Gateway"}, + {"Status": "SUCCESS", "StatusCode": 201, "CaseNo": "CASE_OK2", "CCDCaseID": "999", "Error": None}, + ] + mock_credential.return_value = AsyncMock() + mock_secret_client.return_value = mocks["kv"] + mock_blob_service.return_value = mocks["blob_svc"] + mock_eh_producer.from_connection_string.return_value = mocks["producer"] + + asyncio.run(eventhub_trigger_active([_make_event("CASE_OK2", "run_ok2", "paymentPending", {"key": "val"})])) + + assert mock_process_case.call_count == 2 + mocks["idempotency_blob"].delete_blob.assert_not_called() + mocks["batch"].add.assert_called_once()