Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions python/packages/jumpstarter/jumpstarter/client/lease.py
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,22 @@ async def handle_async(self, stream):
await sleep(delay)
attempt += 1
continue
if e.code() == grpc.StatusCode.UNAVAILABLE:
remaining = deadline - time.monotonic()
if remaining <= 0:
logger.warning(
"Exporter unavailable and dial timeout (%.1fs) exceeded after %d attempts",
self.dial_timeout, attempt + 1
)
raise
delay = min(base_delay * (2 ** attempt), max_delay, remaining)
logger.debug(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
logger.debug(
logger.warning(

? WDYT?

"Exporter unavailable, retrying Dial in %.1fs (attempt %d, %.1fs remaining)",
delay, attempt + 1, remaining
)
await sleep(delay)
attempt += 1
continue
# Exporter went offline or lease ended - log and exit gracefully
if "permission denied" in str(e.details()).lower():
self.lease_transferred = True
Expand Down
76 changes: 76 additions & 0 deletions python/packages/jumpstarter/jumpstarter/client/lease_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,28 @@
from datetime import datetime, timedelta, timezone
from unittest.mock import AsyncMock, Mock, patch

import grpc
import pytest
from grpc.aio import AioRpcError
from rich.console import Console

from jumpstarter.client.lease import Lease, LeaseAcquisitionSpinner


class MockAioRpcError(AioRpcError):
"""Mock gRPC error for testing that properly inherits from AioRpcError."""

def __init__(self, status_code, message=""):
self._status_code = status_code
self._message = message

def code(self):
return self._status_code

def details(self):
return self._message


class TestLeaseAcquisitionSpinner:
"""Test cases for LeaseAcquisitionSpinner class."""

Expand Down Expand Up @@ -522,3 +538,63 @@ async def get_then_fail():
callback.assert_called()
_, remain_arg = callback.call_args[0]
assert remain_arg == timedelta(0)


class TestHandleAsyncUnavailableRetry:
"""Tests for Lease.handle_async UNAVAILABLE retry behavior."""

def _make_lease_for_handle(self):
lease = object.__new__(Lease)
lease.name = "test-lease"
lease.dial_timeout = 5.0
lease.lease_transferred = False
lease.tls_config = Mock()
lease.grpc_options = {}
lease.controller = Mock()
return lease

@pytest.mark.anyio
async def test_handle_async_retries_unavailable_then_succeeds(self):
"""Dial returns UNAVAILABLE once then succeeds on retry."""
lease = self._make_lease_for_handle()
dial_call_count = 0

async def mock_dial(request):
nonlocal dial_call_count
dial_call_count += 1
if dial_call_count == 1:
raise MockAioRpcError(grpc.StatusCode.UNAVAILABLE, "temporarily unavailable")
return Mock(router_endpoint="endpoint", router_token="token")

lease.controller.Dial = mock_dial

with patch("jumpstarter.client.lease.connect_router_stream") as mock_connect:
mock_connect.return_value.__aenter__ = AsyncMock()
mock_connect.return_value.__aexit__ = AsyncMock(return_value=False)
stream = Mock()

await lease.handle_async(stream)

assert dial_call_count == 2
mock_connect.assert_called_once_with("endpoint", "token", stream, lease.tls_config, lease.grpc_options)

@pytest.mark.anyio
async def test_handle_async_unavailable_exceeds_dial_timeout(self):
"""Dial returns UNAVAILABLE until dial_timeout is exceeded, then raises."""
lease = self._make_lease_for_handle()
lease.dial_timeout = 0.5
dial_call_count = 0

async def mock_dial(request):
nonlocal dial_call_count
dial_call_count += 1
raise MockAioRpcError(grpc.StatusCode.UNAVAILABLE, "permanently unavailable")

lease.controller.Dial = mock_dial
stream = Mock()

with pytest.raises(AioRpcError) as exc_info:
await lease.handle_async(stream)

assert exc_info.value.code() == grpc.StatusCode.UNAVAILABLE
assert dial_call_count >= 2
26 changes: 18 additions & 8 deletions python/packages/jumpstarter/jumpstarter/client/status_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,8 @@ async def _poll_loop(self): # noqa: C901
return

deadline_retries = 0
unavailable_retries = 0
unavailable_max_retries = 10

while self._running:
try:
Expand All @@ -343,6 +345,7 @@ async def _poll_loop(self): # noqa: C901
logger.info("Connection recovered, resetting connection_lost flag")
self._connection_lost = False
deadline_retries = 0
unavailable_retries = 0

# Detect missed transitions
if self._status_version > 0 and new_version > self._status_version + 1:
Expand Down Expand Up @@ -388,14 +391,21 @@ async def _poll_loop(self): # noqa: C901
self._signal_unsupported()
break
elif e.code() == StatusCode.UNAVAILABLE:
# Connection lost - exporter closed or restarted
logger.info("Connection lost (UNAVAILABLE), signaling waiters")
self._connection_lost = True
self._running = False
# Fire the change event to wake up any waiters
self._any_change_event.set()
self._any_change_event = Event()
break
unavailable_retries += 1
if unavailable_retries >= unavailable_max_retries:
logger.warning(
"GetStatus UNAVAILABLE %d times consecutively, marking connection as lost",
unavailable_retries,
)
self._connection_lost = True
self._running = False
self._any_change_event.set()
self._any_change_event = Event()
break
elif unavailable_retries % 5 == 0:
logger.warning("GetStatus UNAVAILABLE %d times consecutively", unavailable_retries)
else:
logger.debug("GetStatus UNAVAILABLE (attempt %d), retrying...", unavailable_retries)
elif e.code() == StatusCode.DEADLINE_EXCEEDED:
# DEADLINE_EXCEEDED is a transient error (RPC timed out), not a
# permanent connection loss. Keep polling - the shell's own timeout
Expand Down
Loading
Loading