Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 25 additions & 20 deletions AzureFunctions/ACTIVE/active_caselink_ccd/cl_ccdFunctions.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import json
import requests
from datetime import datetime, timezone
try:
from .retry_decorator import retry_on_result
except ImportError:
from retry_decorator import retry_on_result


def _compact(value) -> str:
if isinstance(value, (dict, list)):
text = json.dumps(value, indent=2)
else:
text = str(value)
return text.replace("\r\n", "\\n").replace("\r", "\\n").replace("\n", "\\n")


# tokenManager lives in the same package. When this module is imported by the
# Functions host the package root will be `AzureFunctions.ACTIVE.active_ccd`.
Expand Down Expand Up @@ -34,7 +39,7 @@ def get_case_details(ccd_base_url, uid, jid, ctid, cid, idam_token, s2s_token):
}
try:
response = requests.get(get_case_url, headers=headers)
print(f"🔢 Get Case Details Response status: {response.status_code}:{response.text}")
print(f"🔢 Get Case Details Response status: {response.status_code}:{_compact(response.text)}")
return response
except Exception as e:
print(f"❌ Network error while calling {get_case_url}: {e}")
Expand All @@ -53,7 +58,7 @@ def start_case_event(ccd_base_url, uid, jid, ctid, cid, etid, idam_token, s2s_to
}
try:
response = requests.get(start_event_url, headers=headers)
print(f"🔢 Start Case Event Response status: {response.status_code}:{response.text}")
print(f"🔢 Start Case Event Response status: {response.status_code}:{_compact(response.text)}")
return response
except Exception as e:
print(f"❌ Network error while calling {start_event_url}: {e}")
Expand Down Expand Up @@ -85,11 +90,11 @@ def validate_case(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payloadD
"ignore_warning": True,
}

print(f"🔢 Validate posting payload for {cid}: validate_case_url = {validate_case_url} headers = {headers} json = {json_object}")
print(f"🔢 Validate posting payload for {cid}: validate_case_url = {validate_case_url} headers = {_compact(headers)} json = {_compact(json_object)}")

response = requests.post(validate_case_url, headers=headers, json=json_object)

print(f"🔢 Validate Response for {cid} = {response.status_code}: {response.text}")
print(f"🔢 Validate Response for {cid} = {response.status_code}: {_compact(response.text)}")
return response

except Exception as e:
Expand Down Expand Up @@ -124,24 +129,18 @@ def submit_case_event(ccd_base_url, uid, jid, ctid, cid, etid, event_token, payl
"ignore_warning": True,
}

print(f"🔢 Submit payload for {cid}: submit_case_url = {submit_event_url} headers = {headers} json = {json_object}\n")
print(f"🔢 Submit payload for {cid}: submit_case_url = {submit_event_url} headers = {_compact(headers)} json = {_compact(json_object)}")

response = requests.post(submit_event_url, headers=headers, json=json_object)

print(f"🔢 Submit Response status for {cid}: {response.status_code}:{response.text}\n")
print(f"🔢 Submit Response status for {cid}: {response.status_code}:{_compact(response.text)}")
return response

except Exception as e:
print(f"❌ Network error while calling {submit_event_url}: {e}")
return None


@retry_on_result(
max_retries=2,
base_delay=30,
max_delay=60,
retry_on=lambda r: isinstance(r, dict) and r.get("Status") == "ERROR",
)
def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overwrite=False):
print(f"Starting processing case for {ccdReference}")

Expand All @@ -157,6 +156,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "ERROR",
"StatusCode": getattr(e, "status_code", None),
"Error": f"failed to gather IDAM token: {e}"
}
return result
Expand All @@ -171,6 +171,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "ERROR",
"StatusCode": getattr(e, "status_code", None),
"Error": f"failed to gather s2s token: {e}"
}
return result
Expand Down Expand Up @@ -231,6 +232,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "ERROR",
"StatusCode": start_response.status_code if start_response is not None else None,
"Error": f"Case link event failed: {status_code} - {text}"
}
return result
Expand All @@ -244,10 +246,10 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
validate_case_response = validate_case(ccd_base_url, uid, jid, ctid, ccdReference, etid, event_token, caseLinkPayload, idam_token, s2s_token)

try:
print(f"Validation response for case {ccdReference}: {json.dumps(validate_case_response.json(), indent=2)}")
print(f"Validation response for case {ccdReference}: {_compact(validate_case_response.json())}")
except Exception:
try:
print(validate_case_response.text)
print(_compact(validate_case_response.text))
except Exception:
print(f"Unable to parse validate_case_response for case {ccdReference}")

Expand All @@ -268,6 +270,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "ERROR",
"StatusCode": validate_case_response.status_code if validate_case_response is not None else None,
"Error": f"Case link validation failed: {status_code} - {text}",
}
return result
Expand All @@ -280,10 +283,10 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
submit_case_response = submit_case_event(ccd_base_url, uid, jid, ctid, ccdReference, etid, event_token, caseLinkPayload, idam_token, s2s_token)

try:
print(f"Submit response for case {ccdReference}: {json.dumps(submit_case_response.json(), indent=2)}")
print(f"Submit response for case {ccdReference}: {_compact(submit_case_response.json())}")
except Exception:
try:
print(submit_case_response.text)
print(_compact(submit_case_response.text))
except Exception:
print(f"Unable to parse submit_case_response for case {ccdReference}")

Expand All @@ -304,6 +307,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "ERROR",
"StatusCode": submit_case_response.status_code if submit_case_response is not None else None,
"Error": f"Case link submission failed: {status_code} - {text}",
}

Expand All @@ -317,6 +321,7 @@ def process_event(env, ccdReference, runId, caseLinkPayload, PR_REFERENCE, overw
"StartDateTime": startDateTime,
"EndDateTime": datetime.now(timezone.utc).isoformat(),
"Status": "SUCCESS",
"StatusCode": submit_case_response.status_code,
"Error": None
}

Expand Down
11 changes: 9 additions & 2 deletions AzureFunctions/ACTIVE/active_caselink_ccd/cl_tokenManager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from azure.keyvault.secrets import SecretClient
from datetime import datetime, timezone, timedelta


class TokenError(RuntimeError):
def __init__(self, message, status_code=None):
super().__init__(message)
self.status_code = status_code


class IDAMTokenManager:
def __init__(self, env: str, skew: int = 1800):
self.env = env
Expand Down Expand Up @@ -64,7 +71,7 @@ def _fetch_token(self):
idam_response = requests.post(self.token_url, headers=headers, data=data)

if idam_response.status_code != 200:
raise RuntimeError(f"Token request failed: {idam_response.status_code} {idam_response.text}")
raise TokenError(f"Token request failed: {idam_response.status_code} {idam_response.text}", status_code=idam_response.status_code)

payload = idam_response.json()

Expand Down Expand Up @@ -186,7 +193,7 @@ def _fetch_s2s_token(self):
raise EOFError(f"Error reuesting service to service token: {e}")
# Ensure you get a 200 response else raise an error
if s2s_response.status_code != 200:
raise RuntimeError(f"Error requesting service to service token: {s2s_response.text}")
raise TokenError(f"Error requesting service to service token: {s2s_response.status_code} {s2s_response.text}", status_code=s2s_response.status_code)

# Extract token from response
try:
Expand Down
30 changes: 29 additions & 1 deletion AzureFunctions/ACTIVE/active_caselink_ccd/function_app.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import json
import os

from tenacity import AsyncRetrying, retry_if_result, stop_after_attempt, wait_exponential

from azure.core.exceptions import ResourceExistsError
from azure.storage.blob.aio import BlobServiceClient
from azure.eventhub.aio import EventHubProducerClient
Expand Down Expand Up @@ -32,6 +34,22 @@
app = func.FunctionApp()


def _log_retry(retry_state):
result = retry_state.outcome.result() if retry_state.outcome else {}
error = result.get("Error", "") if isinstance(result, dict) else ""
logger.warning(
f"Retrying process_event — attempt {retry_state.attempt_number} failed "
f"(sleeping {retry_state.next_action.sleep:.0f}s): {error}"
)


def _is_retryable(result):
RETRYABLE_STATUS_CODES = {408, 409, 429, 500, 502, 503, 504}
if not (isinstance(result, dict) and result.get("Status") == "ERROR"):
return False
return result.get("StatusCode") in RETRYABLE_STATUS_CODES


@app.function_name("eventhub_trigger")
@app.event_hub_message_trigger(
arg_name="azeventhub",
Expand Down Expand Up @@ -92,7 +110,16 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]):
logger.warning(f"[IDEMPOTENCY][CASELINK] Skipping in progress case {ccdReference}.")
continue

result = await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, PR_REFERENCE, overwrite)
async def _process():
return await asyncio.to_thread(process_event, ENV, ccdReference, run_id, data, PR_REFERENCE, overwrite)

result = await AsyncRetrying(
retry=retry_if_result(_is_retryable),
stop=stop_after_attempt(3),
wait=wait_exponential(min=30, max=60),
before_sleep=_log_retry,
retry_error_callback=lambda retry_state: retry_state.outcome.result(),
)(_process)

# Skip if marked for SKIPPED
if result.get("Status") == "SKIPPED":
Expand All @@ -109,6 +136,7 @@ async def eventhub_trigger_active(azeventhub: List[func.EventHubEvent]):
except Exception as delete_error:
logger.warning(f"[IDEMPOTENCY][CASELINK] Failed to delete blob for {ccdReference}: {delete_error}")

result.pop("StatusCode", None)
result_json = json.dumps(result)

try:
Expand Down
2 changes: 1 addition & 1 deletion AzureFunctions/ACTIVE/active_caselink_ccd/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ pycparser==2.22
PyJWT==2.10.1
requests==2.32.3
six==1.17.0
tenacity==9.1.2
tenacity==9.1.4
typing_extensions==4.13.2
urllib3==2.4.0
Werkzeug==3.1.3
Expand Down
70 changes: 0 additions & 70 deletions AzureFunctions/ACTIVE/active_caselink_ccd/retry_decorator.py

This file was deleted.

Loading