Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
4da5850
Initial plan
Copilot Jun 25, 2025
6cd0b5f
Add restart mechanism to deployment status updater to fix stuck opera…
Copilot Jun 25, 2025
c20078a
Merge branch 'main' into copilot/fix-4464
marrobi Jun 25, 2025
c7072b9
Add heartbeat monitoring to supervisor function for stuck process det…
Copilot Jun 25, 2025
202e726
Move heartbeat monitoring from resource processor to deployment statu…
Copilot Jun 25, 2025
381bd9c
Fix linting issues and increment API version
Copilot Jun 25, 2025
7c5ff5d
Refactor service bus components to implement heartbeat monitoring and…
marrobi Jun 26, 2025
9329c94
update tests and fix issue.
marrobi Jun 26, 2025
96e39b2
Fix lint.
marrobi Jun 26, 2025
f81e9eb
Merge branch 'main' into copilot/fix-4464
marrobi Jun 26, 2025
75d77dd
remove duplicate tests.
marrobi Jun 26, 2025
b190ab3
Merge branch 'main' of https://github.com/microsoft/AzureTRE into cop…
marrobi Nov 7, 2025
7b78e99
Enhance Service Bus consumer with error handling and heartbeat manage…
marrobi Nov 7, 2025
32d8c75
Enhance Service Bus consumer with error handling and heartbeat manage…
marrobi Nov 7, 2025
681d5ad
Merge branch 'copilot/fix-4464' of https://github.com/microsoft/Azure…
marrobi Nov 7, 2025
b6f7e29
Update tests
marrobi Nov 7, 2025
ba8d1e9
update tests
marrobi Nov 7, 2025
49245fe
Update api_app/service_bus/deployment_status_updater.py
marrobi Nov 7, 2025
37291c3
Define format once for two instrumentors.
marrobi Nov 7, 2025
3840d8c
Merge branch 'copilot/fix-4464' of https://github.com/microsoft/Azure…
marrobi Nov 7, 2025
975ed29
Update api_app/service_bus/deployment_status_updater.py
marrobi Nov 7, 2025
7eac68b
Update api_app/tests_ma/test_service_bus/test_service_bus_edge_cases.py
marrobi Nov 7, 2025
ff963ac
Move tempfile import to top and add explanatory comment to except clause
Copilot Nov 7, 2025
dea9c2a
Merge branch 'main' into copilot/fix-4464
JC-wk Feb 6, 2026
b23220e
Merge branch 'main' into copilot/fix-4464
marrobi Feb 9, 2026
42bf9d0
Implement service bus consumer monitoring with heartbeat detection an…
marrobi Feb 9, 2026
211e699
Merge branch 'main' into copilot/fix-4464
marrobi Feb 9, 2026
d8ce5cf
Increment API version to 0.26.1 for bug fix
Copilot Feb 9, 2026
221e57e
Merge branch 'main' into copilot/fix-4464
JC-wk Feb 10, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ ENHANCEMENTS:
* Pass OIDC vars directly to the devcontainer ([#4871](https://github.com/microsoft/AzureTRE/issues/4871))

BUG FIXES:
* Implement service bus consumer monitoring with heartbeat detection, automatic recovery, and /health endpoint integration to prevent operations getting stuck indefinitely ([#4464](https://github.com/microsoft/AzureTRE/issues/4464))
* Fix property substitution not occuring where there is only a main step in the pipeline ([#4824](https://github.com/microsoft/AzureTRE/issues/4824))
* Fix Mysql template ignored storage_mb ([#4846](https://github.com/microsoft/AzureTRE/issues/4846))
* Fix duplicate `TOPIC_SUBSCRIPTION_NAME` in `core/terraform/airlock/airlock_processor.tf` ([#4847](https://github.com/microsoft/AzureTRE/pull/4847))
Expand Down
2 changes: 1 addition & 1 deletion api_app/_version.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.25.14"
__version__ = "0.26.1"
32 changes: 19 additions & 13 deletions api_app/api/routes/health.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from core import credentials
from models.schemas.status import HealthCheck, ServiceStatus, StatusEnum
from resources import strings
from services.health_checker import create_resource_processor_status, create_state_store_status, create_service_bus_status
from services.health_checker import create_airlock_consumer_status, create_deployment_consumer_status, create_resource_processor_status, create_state_store_status, create_service_bus_status
from services.logging import logger

router = APIRouter()
Expand All @@ -14,22 +14,28 @@ async def health_check(request: Request) -> HealthCheck:
# The health endpoint checks the status of key components of the system.
# Note that Resource Processor checks incur Azure management calls, so
# calling this endpoint frequently may result in API throttling.
deployment_consumer = getattr(request.app.state, 'deployment_status_updater', None)
airlock_consumer = getattr(request.app.state, 'airlock_status_updater', None)

async with credentials.get_credential_async_context() as credential:
cosmos, sb, rp = await asyncio.gather(
cosmos, sb, rp, deploy, airlock = await asyncio.gather(
create_state_store_status(),
create_service_bus_status(credential),
create_resource_processor_status(credential)
create_resource_processor_status(credential),
create_deployment_consumer_status(deployment_consumer),
create_airlock_consumer_status(airlock_consumer),
)
cosmos_status, cosmos_message = cosmos
sb_status, sb_message = sb
rp_status, rp_message = rp
if cosmos_status == StatusEnum.not_ok or sb_status == StatusEnum.not_ok or rp_status == StatusEnum.not_ok:
logger.error(f'Cosmos Status: {cosmos_status}, message: {cosmos_message}')
logger.error(f'Service Bus Status: {sb_status}, message: {sb_message}')
logger.error(f'Resource Processor Status: {rp_status}, message: {rp_message}')

services = [ServiceStatus(service=strings.COSMOS_DB, status=cosmos_status, message=cosmos_message),
ServiceStatus(service=strings.SERVICE_BUS, status=sb_status, message=sb_message),
ServiceStatus(service=strings.RESOURCE_PROCESSOR, status=rp_status, message=rp_message)]
services = [
ServiceStatus(service=strings.COSMOS_DB, status=cosmos[0], message=cosmos[1]),
ServiceStatus(service=strings.SERVICE_BUS, status=sb[0], message=sb[1]),
ServiceStatus(service=strings.RESOURCE_PROCESSOR, status=rp[0], message=rp[1]),
ServiceStatus(service=strings.DEPLOYMENT_STATUS_CONSUMER, status=deploy[0], message=deploy[1]),
ServiceStatus(service=strings.AIRLOCK_STATUS_CONSUMER, status=airlock[0], message=airlock[1]),
]

for svc in services:
if svc.status == StatusEnum.not_ok:
logger.error(f'{svc.service} Status: {svc.status}, message: {svc.message}')

return HealthCheck(services=services)
8 changes: 6 additions & 2 deletions api_app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,12 @@ async def lifespan(app: FastAPI):
airlockStatusUpdater = AirlockStatusUpdater()
await airlockStatusUpdater.init_repos()

asyncio.create_task(deploymentStatusUpdater.receive_messages())
asyncio.create_task(airlockStatusUpdater.receive_messages())
# Store consumer references on app.state so the /health endpoint can check their heartbeats
app.state.deployment_status_updater = deploymentStatusUpdater
app.state.airlock_status_updater = airlockStatusUpdater

asyncio.create_task(deploymentStatusUpdater.supervisor_with_heartbeat_check())
asyncio.create_task(airlockStatusUpdater.supervisor_with_heartbeat_check())
yield


Expand Down
6 changes: 6 additions & 0 deletions api_app/resources/strings.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,12 @@
RESOURCE_PROCESSOR_GENERAL_ERROR_MESSAGE = "Resource Processor is not responding"
RESOURCE_PROCESSOR_HEALTHY_MESSAGE = "HealthState/healthy"

# Service bus consumer status
DEPLOYMENT_STATUS_CONSUMER = "Deployment Status Consumer"
AIRLOCK_STATUS_CONSUMER = "Airlock Status Consumer"
CONSUMER_HEARTBEAT_STALE = "{} heartbeat is stale or missing"
CONSUMER_NOT_INITIALIZED = "{} has not been initialized"

# Error strings
ACCESS_APP_IS_MISSING_ROLE = "The App is missing role"
ACCESS_PLEASE_SUPPLY_CLIENT_ID = "Please supply the client_id for the AAD application"
Expand Down
17 changes: 11 additions & 6 deletions api_app/service_bus/airlock_request_status_update.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,13 @@
from models.domain.airlock_operations import StepResultStatusUpdateMessage
from core import config, credentials
from resources import strings
from service_bus.service_bus_consumer import ServiceBusConsumer


class AirlockStatusUpdater():
class AirlockStatusUpdater(ServiceBusConsumer):

def __init__(self):
pass
super().__init__("airlock_status_updater")

async def init_repos(self):
self.airlock_request_repo = await AirlockRequestRepository.create()
Expand All @@ -36,9 +37,13 @@ async def receive_messages(self):
try:
current_time = time.time()
polling_count += 1

# Update heartbeat for supervisor monitoring
self.update_heartbeat()

# Log a heartbeat message every 60 seconds to show the service is still working
if current_time - last_heartbeat_time >= 60:
logger.info(f"Queue reader heartbeat: Polled {config.SERVICE_BUS_STEP_RESULT_QUEUE} queue {polling_count} times in the last minute")
logger.info(f"{config.SERVICE_BUS_STEP_RESULT_QUEUE} queue polled {polling_count} times in the last minute")
last_heartbeat_time = current_time
polling_count = 0

Expand All @@ -64,13 +69,13 @@ async def receive_messages(self):
# Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available
logger.debug("No sessions for this process. Will look again...")

except ServiceBusConnectionError:
except ServiceBusConnectionError as e:
# Occasionally there will be a transient / network-level error in connecting to SB.
logger.info("Unknown Service Bus connection error. Will retry...")
logger.warning(f"Service Bus connection error (will retry): {e}")

except Exception as e:
# Catch all other exceptions, log them via .exception to get the stack trace, and reconnect
logger.exception(f"Unknown exception. Will retry - {e}")
logger.exception(f"Unexpected error in message processing: {type(e).__name__}: {e}")

async def process_message(self, msg):
with tracer.start_as_current_span("process_message") as current_span:
Expand Down
37 changes: 22 additions & 15 deletions api_app/service_bus/deployment_status_updater.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
import json
import uuid
import time
from typing import Dict, List, Any

from pydantic import ValidationError, parse_obj_as

Expand All @@ -21,21 +21,19 @@
from models.domain.operation import DeploymentStatusUpdateMessage, Operation, OperationStep, Status
from resources import strings
from services.logging import logger, tracer
from service_bus.service_bus_consumer import ServiceBusConsumer


class DeploymentStatusUpdater():
class DeploymentStatusUpdater(ServiceBusConsumer):
def __init__(self):
pass
super().__init__("deployment_status_updater")

async def init_repos(self):
self.operations_repo = await OperationRepository.create()
self.resource_repo = await ResourceRepository.create()
self.resource_template_repo = await ResourceTemplateRepository.create()
self.resource_history_repo = await ResourceHistoryRepository.create()

def run(self, *args, **kwargs):
asyncio.run(self.receive_messages())

async def receive_messages(self):
with tracer.start_as_current_span("deployment_status_receive_messages"):
last_heartbeat_time = 0
Expand All @@ -45,9 +43,12 @@ async def receive_messages(self):
try:
current_time = time.time()
polling_count += 1

# Update heartbeat for supervisor monitoring
self.update_heartbeat()
# Log a heartbeat message every 60 seconds to show the service is still working
if current_time - last_heartbeat_time >= 60:
logger.info(f"Queue reader heartbeat: Polled {config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE} queue {polling_count} times in the last minute")
logger.info(f"{config.SERVICE_BUS_DEPLOYMENT_STATUS_UPDATE_QUEUE} queue polled {polling_count} times in the last minute")
last_heartbeat_time = current_time
polling_count = 0

Expand All @@ -73,15 +74,15 @@ async def receive_messages(self):
# Timeout occurred whilst connecting to a session - this is expected and indicates no non-empty sessions are available
logger.debug("No sessions for this process. Will look again...")

except ServiceBusConnectionError:
except ServiceBusConnectionError as e:
# Occasionally there will be a transient / network-level error in connecting to SB.
logger.info("Unknown Service Bus connection error. Will retry...")
logger.warning(f"Service Bus connection error (will retry): {e}")

except Exception as e:
# Catch all other exceptions, log them via .exception to get the stack trace, and reconnect
logger.exception(f"Unknown exception. Will retry - {e}")
logger.exception(f"Unexpected error in message processing: {type(e).__name__}: {e}")

async def process_message(self, msg):
async def process_message(self, msg) -> bool:
complete_message = False
message = ""

Expand Down Expand Up @@ -115,6 +116,11 @@ async def update_status_in_database(self, message: DeploymentStatusUpdateMessage
try:
# update the op
operation = await self.operations_repo.get_operation_by_id(str(message.operationId))

# Add null safety for operation steps
if not operation.steps:
raise ValueError(f"Operation {message.operationId} has no steps")

step_to_update = None
is_last_step = False

Expand All @@ -128,7 +134,7 @@ async def update_status_in_database(self, message: DeploymentStatusUpdateMessage
is_last_step = True

if step_to_update is None:
raise f"Error finding step {message.stepId} in operation {message.operationId}"
raise ValueError(f"Step {message.stepId} not found in operation {message.operationId}")

# update the step status
step_to_update.status = message.status
Expand Down Expand Up @@ -159,7 +165,8 @@ async def update_status_in_database(self, message: DeploymentStatusUpdateMessage

# more steps in the op to do?
if is_last_step is False:
assert current_step_index < (len(operation.steps) - 1)
if current_step_index >= len(operation.steps) - 1:
raise ValueError(f"Step index {current_step_index} is the last step in operation (has {len(operation.steps)} steps), but more steps were expected")
next_step = operation.steps[current_step_index + 1]

# catch any errors in updating the resource - maybe Cosmos / schema invalid etc, and report them back to the op
Expand Down Expand Up @@ -255,7 +262,7 @@ def get_failure_status_for_action(self, action: RequestAction):

return status

def create_updated_resource_document(self, resource: dict, message: DeploymentStatusUpdateMessage):
def create_updated_resource_document(self, resource: Dict[str, Any], message: DeploymentStatusUpdateMessage) -> Dict[str, Any]:
"""
Merge the outputs with the resource document to persist
"""
Expand All @@ -268,7 +275,7 @@ def create_updated_resource_document(self, resource: dict, message: DeploymentSt

return resource

def convert_outputs_to_dict(self, outputs_list: [Output]):
def convert_outputs_to_dict(self, outputs_list: List[Output]) -> Dict[str, Any]:
"""
Convert a list of Porter outputs to a dictionary
"""
Expand Down
95 changes: 95 additions & 0 deletions api_app/service_bus/service_bus_consumer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
import asyncio
import time

from services.logging import logger

# Configuration constants for monitoring intervals
HEARTBEAT_CHECK_INTERVAL_SECONDS = 60
HEARTBEAT_STALENESS_THRESHOLD_SECONDS = 300
RESTART_DELAY_SECONDS = 5
MAX_RESTART_DELAY_SECONDS = 300
SUPERVISOR_ERROR_DELAY_SECONDS = 30


class ServiceBusConsumer:

def __init__(self, consumer_name: str):
self.service_name = consumer_name.replace('_', ' ').title()
self._last_heartbeat: float = time.monotonic()
self._restart_delay: float = RESTART_DELAY_SECONDS
logger.info(f"Initializing {self.service_name}")

def update_heartbeat(self):
self._last_heartbeat = time.monotonic()

def check_heartbeat(self, max_age_seconds: int = HEARTBEAT_STALENESS_THRESHOLD_SECONDS) -> bool:
age = time.monotonic() - self._last_heartbeat
if age > max_age_seconds:
logger.warning(f"{self.service_name} heartbeat is {age:.1f}s old (threshold: {max_age_seconds}s)")
return False
return True

async def _receive_messages_loop(self):
"""Run receive_messages() in a loop with exponential backoff on failure."""
while True:
try:
start_time = time.monotonic()
logger.info(f"Starting {self.service_name} receive_messages loop...")
await self.receive_messages()
logger.warning(f"{self.service_name} receive_messages() returned unexpectedly")
except asyncio.CancelledError:
raise
except Exception as e:
logger.exception(f"{self.service_name} receive_messages failed: {e}")

# Reset backoff if the consumer ran long enough to be considered healthy
elapsed = time.monotonic() - start_time
if elapsed > self._restart_delay:
self._restart_delay = RESTART_DELAY_SECONDS

logger.info(f"{self.service_name} restarting in {self._restart_delay:.0f}s...")
await asyncio.sleep(self._restart_delay)
self._restart_delay = min(self._restart_delay * 2, MAX_RESTART_DELAY_SECONDS)

async def supervisor_with_heartbeat_check(self):
task = None
try:
while True:
try:
if task is None or task.done():
if task and task.done():
try:
await task
except Exception as e:
logger.exception(f"{self.service_name} task failed unexpectedly: {e}")
await asyncio.sleep(RESTART_DELAY_SECONDS)

logger.info(f"Starting {self.service_name} task...")
task = asyncio.create_task(self._receive_messages_loop())
self.update_heartbeat()

await asyncio.sleep(HEARTBEAT_CHECK_INTERVAL_SECONDS)

if not self.check_heartbeat():
logger.warning(f"{self.service_name} heartbeat stale, restarting...")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
task = None
self._restart_delay = RESTART_DELAY_SECONDS
except Exception as e:
logger.exception(f"{self.service_name} supervisor error: {e}")
await asyncio.sleep(SUPERVISOR_ERROR_DELAY_SECONDS)
finally:
if task and not task.done():
logger.info(f"Cleaning up {self.service_name} task...")
task.cancel()
try:
await task
except asyncio.CancelledError:
pass

async def receive_messages(self):
raise NotImplementedError("Subclasses must implement receive_messages()")
19 changes: 18 additions & 1 deletion api_app/services/health_checker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Tuple
from typing import Optional, Tuple
from azure.core import exceptions
from azure.servicebus.aio import ServiceBusClient
from azure.mgmt.compute.aio import ComputeManagementClient
Expand All @@ -11,6 +11,7 @@
from core import config
from models.schemas.status import StatusEnum
from resources import strings
from service_bus.service_bus_consumer import ServiceBusConsumer
from services.logging import logger


Expand Down Expand Up @@ -55,6 +56,22 @@ async def create_service_bus_status(credential) -> Tuple[StatusEnum, str]:
return status, message


def create_consumer_status(consumer: Optional[ServiceBusConsumer], name: str) -> Tuple[StatusEnum, str]:
if consumer is None:
return StatusEnum.not_ok, strings.CONSUMER_NOT_INITIALIZED.format(name)
if consumer.check_heartbeat():
return StatusEnum.ok, ""
return StatusEnum.not_ok, strings.CONSUMER_HEARTBEAT_STALE.format(name)


async def create_deployment_consumer_status(consumer: Optional[ServiceBusConsumer]) -> Tuple[StatusEnum, str]:
return create_consumer_status(consumer, strings.DEPLOYMENT_STATUS_CONSUMER)


async def create_airlock_consumer_status(consumer: Optional[ServiceBusConsumer]) -> Tuple[StatusEnum, str]:
return create_consumer_status(consumer, strings.AIRLOCK_STATUS_CONSUMER)


async def create_resource_processor_status(credential) -> Tuple[StatusEnum, str]:
status = StatusEnum.ok
message = ""
Expand Down
Loading
Loading