diff --git a/python/packages/jumpstarter/jumpstarter/exporter/exporter.py b/python/packages/jumpstarter/jumpstarter/exporter/exporter.py index 236651766..f5f55985a 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/exporter.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/exporter.py @@ -9,7 +9,6 @@ from anyio import ( AsyncContextManagerMixin, CancelScope, - Event, connect_unix, create_memory_object_stream, create_task_group, @@ -28,6 +27,7 @@ from jumpstarter.config.tls import TLSConfigV1Alpha1 from jumpstarter.exporter.hooks import HookExecutor from jumpstarter.exporter.lease_context import LeaseContext +from jumpstarter.exporter.lease_lifecycle import InvalidTransitionError, LeasePhase from jumpstarter.exporter.session import Session if TYPE_CHECKING: @@ -371,15 +371,17 @@ async def _request_lease_release(self): logger.debug("No active lease to release") return + lc = self._lease_context.lifecycle + # If the lease has already ended (controller sent leased=false, or a previous # call already released it), skip the release RPC. A stale release_lease=true # would release a subsequently-assigned lease on the controller. - if self._lease_context.lease_ended.is_set(): + if lc.is_end_requested(): logger.debug("Lease already ended, skipping release request") return if self._standalone: - self._lease_context.lease_ended.set() + lc.request_end() return try: @@ -394,14 +396,12 @@ async def _request_lease_release(self): logger.info("Requested controller to release lease %s", self._lease_context.lease_name) except Exception as e: logger.error("Failed to request lease release: %s", e) - # Fall through - the client can still release the lease as a fallback, - # or the lease will eventually expire # Directly signal lease ended so handle_lease can exit. # The controller may not send another leased=False after our release request, # so we signal it ourselves as a fallback. - if self._lease_context and not self._lease_context.lease_ended.is_set(): - self._lease_context.lease_ended.set() + if self._lease_context and not lc.is_end_requested(): + lc.request_end() async def _unregister_with_controller(self): """Safely unregister from controller with timeout and error handling.""" @@ -471,15 +471,15 @@ async def _handle_client_conn( async def _handle_end_session(self, lease_context: LeaseContext) -> None: """Handle EndSession requests from client. - Waits for the end_session_requested event, runs the afterLease hook, - and signals after_lease_hook_done when complete. This allows clients - to receive afterLease hook logs before the connection is closed. + Waits for the end_session_requested event, runs the afterLease hook via + lifecycle transitions, and completes the lifecycle when done. This allows + clients to receive afterLease hook logs before the connection is closed. Args: lease_context: The LeaseContext for the current lease. """ - logger.debug("_handle_end_session task started, waiting for end_session_requested or lease_ended event") - # Wait for EndSession or lease end, whichever happens first + lc = lease_context.lifecycle + logger.debug("_handle_end_session task started, waiting for end_session_requested or lease end") async with create_task_group() as wait_tg: async def _wait_end_session(): @@ -487,14 +487,13 @@ async def _wait_end_session(): wait_tg.cancel_scope.cancel() async def _wait_lease_end(): - await lease_context.lease_ended.wait() + await lc.wait_end_requested() wait_tg.cancel_scope.cancel() wait_tg.start_soon(_wait_end_session) wait_tg.start_soon(_wait_lease_end) - # If lease ended without EndSession, exit cleanly (handle_lease finally block handles cleanup) - if lease_context.lease_ended.is_set() and not lease_context.end_session_requested.is_set(): + if lc.is_end_requested() and not lease_context.end_session_requested.is_set(): logger.debug("Lease ended without EndSession; exiting EndSession handler") return @@ -502,37 +501,18 @@ async def _wait_lease_end(): logger.info("EndSession requested, running afterLease hook") try: - # Check if hook already started (via lease state transition) - if lease_context.after_lease_hook_started.is_set(): - logger.debug("afterLease hook already started, waiting for completion") - await lease_context.after_lease_hook_done.wait() - return - - # Mark hook as started to prevent duplicate execution - logger.debug("Marking afterLease hook as started") - lease_context.after_lease_hook_started.set() - - if self.hook_executor and lease_context.has_client() and not lease_context.skip_after_lease_hook: - logger.debug("Calling run_after_lease_hook") - with CancelScope(shield=True): - await self.hook_executor.run_after_lease_hook( - lease_context, - self._report_status, - self.stop, - self._request_lease_release, - ) - logger.info("afterLease hook completed via EndSession") - else: - if lease_context.skip_after_lease_hook: - logger.info("Skipping afterLease hook: beforeLease hook failed") - else: - logger.debug("No afterLease hook configured or no client, transitioning to AVAILABLE") - await self._report_status(ExporterStatus.AVAILABLE, "Available for new lease") + await self._run_ending_phase(lease_context) + except InvalidTransitionError: + if not lc.is_complete(): + logger.debug("Another task owns the ending phase, waiting for completion") + await lc.wait_complete() except Exception as e: logger.error("Error running afterLease hook via EndSession: %s", e) - finally: - # Signal that the hook is done (whether it ran or not) - lease_context.after_lease_hook_done.set() + try: + if not lc.is_complete(): + lc.transition(LeasePhase.FAILED) + except InvalidTransitionError: + pass @asynccontextmanager async def session(self): @@ -587,63 +567,105 @@ async def session_for_lease(self): yield session, main_path, hook_path logger.info("Session closed") + async def _run_ending_phase(self, lease_scope: LeaseContext) -> None: + """Transition through ENDING → AFTER_LEASE → RELEASING → DONE. + + Tries to transition to ENDING (if in READY), then runs the after-hook + if applicable. Raises InvalidTransitionError if another task already + owns the ending phase. + """ + lc = lease_scope.lifecycle + + if lc.phase == LeasePhase.READY: + lc.transition(LeasePhase.ENDING) + elif lc.phase != LeasePhase.ENDING: + raise InvalidTransitionError(lc.phase, LeasePhase.ENDING) + + should_run_after = ( + self.hook_executor + and (lease_scope.has_client() or self._standalone) + and not lc.skip_after_lease + ) + + if should_run_after: + lc.transition(LeasePhase.AFTER_LEASE) + with CancelScope(shield=True): + await self.hook_executor.run_after_lease_hook( + lease_scope, + self._report_status, + self.stop, + self._request_lease_release, + ) + lc.transition(LeasePhase.RELEASING) + else: + if lc.skip_after_lease: + logger.info("Skipping afterLease hook: beforeLease hook failed") + lc.transition(LeasePhase.RELEASING) + if not self._stop_requested: + await self._report_status(ExporterStatus.AVAILABLE, "Available for new lease") + + lc.transition(LeasePhase.DONE) + + async def _run_before_hook_lifecycle(self, lease_scope: LeaseContext) -> None: + """Wrap run_before_lease_hook with lifecycle transitions. + + Transitions to BEFORE_LEASE before calling the hook, then transitions + to READY (or ENDING if end was requested during hook execution). + """ + lc = lease_scope.lifecycle + lc.transition(LeasePhase.BEFORE_LEASE) + try: + await self.hook_executor.run_before_lease_hook( + lease_scope, + self._report_status, + self.stop, + self._request_lease_release, + ) + finally: + if lc.phase == LeasePhase.BEFORE_LEASE: + if lc.end_requested: + lc.transition(LeasePhase.ENDING) + else: + lc.transition(LeasePhase.READY) + async def _cleanup_after_lease(self, lease_scope: LeaseContext) -> None: """Run afterLease hook cleanup when handle_lease exits. This handles the finally-block logic: shielding from cancellation, - running the afterLease hook if appropriate, and transitioning to AVAILABLE. + waiting for the before-hook to complete via lifecycle, then running the + ending phase (after-hook → release → done) through the lifecycle FSM. """ + lc = lease_scope.lifecycle with CancelScope(shield=True): - # Wait for beforeLease hook to complete before running afterLease. + # Wait for lifecycle to reach at least READY before running afterLease. # When a lease ends during hook execution, the hook must finish # (subject to its configured timeout) before cleanup proceeds. - # Safety timeout: prevent permanent deadlock if before_lease_hook - # was never set due to a race (e.g. conn_tg cancelled early). - # Use the configured hook timeout (+ margin) when available so we - # never interrupt a legitimately-running beforeLease hook. - safety_timeout = 15 # generous default for no-hook / unknown cases - if ( - self.hook_executor - and self.hook_executor.config.before_lease - ): + safety_timeout = 15 + if self.hook_executor and self.hook_executor.config.before_lease: safety_timeout = self.hook_executor.config.before_lease.timeout + 30 with move_on_after(safety_timeout) as timeout_scope: - await lease_scope.before_lease_hook.wait() + await lc.wait_ready() if timeout_scope.cancelled_caught: - logger.warning( - "Timed out waiting for before_lease_hook; forcing it set to avoid deadlock" - ) - lease_scope.before_lease_hook.set() - - if not lease_scope.after_lease_hook_started.is_set(): - lease_scope.after_lease_hook_started.set() - if (self.hook_executor - and (lease_scope.has_client() or self._standalone) - and not lease_scope.skip_after_lease_hook): - logger.info("Running afterLease hook on session close") - await self.hook_executor.run_after_lease_hook( - lease_scope, - self._report_status, - self.stop, - self._request_lease_release, - ) - else: - if lease_scope.skip_after_lease_hook: - logger.info("Skipping afterLease hook: beforeLease hook failed") - if not self._stop_requested: - logger.debug( - "No afterLease hook or no client on session close," - " transitioning to AVAILABLE" - ) - await self._report_status(ExporterStatus.AVAILABLE, "Available for new lease") - else: - logger.debug("Exporter is shutting down, skipping AVAILABLE status report") - if not lease_scope.after_lease_hook_done.is_set(): - lease_scope.after_lease_hook_done.set() - else: - logger.debug("Waiting for afterLease hook to complete before closing session") - await lease_scope.after_lease_hook_done.wait() - logger.debug("afterLease hook completed, closing session") + logger.warning("Timed out waiting for lifecycle to reach READY; forcing FAILED") + try: + lc.transition(LeasePhase.FAILED) + except InvalidTransitionError: + pass + return + + try: + await self._run_ending_phase(lease_scope) + except InvalidTransitionError: + if not lc.is_complete(): + logger.debug("Another task owns the ending phase, waiting for completion") + await lc.wait_complete() + except Exception as e: + logger.error("Error during lease cleanup: %s", e) + try: + if not lc.is_complete(): + lc.transition(LeasePhase.FAILED) + except InvalidTransitionError: + pass async def handle_lease(self, lease_name: str, tg: TaskGroup, lease_scope: LeaseContext) -> None: """Handle all incoming client connections for a lease. @@ -669,48 +691,26 @@ async def handle_lease(self, lease_name: str, tg: TaskGroup, lease_scope: LeaseC the serve() method when a lease is assigned. It terminates when the lease ends or the exporter stops. """ + lc = lease_scope.lifecycle logger.info("Listening for incoming connection requests on lease %s", lease_name) - # Buffer Listen responses to avoid blocking when responses arrive before - # process_connections starts iterating. This prevents a race condition where - # the client dials immediately after lease acquisition but before the session is ready. listen_tx, listen_rx = create_memory_object_stream[jumpstarter_pb2.ListenResponse](max_buffer_size=10) - # Create session for the lease duration and populate lease_scope - # Uses dual sockets: main socket for clients, hook socket for j commands async with self.session_for_lease() as (session, main_path, hook_path): - # Populate the lease scope with session and socket paths lease_scope.session = session lease_scope.socket_path = main_path - lease_scope.hook_socket_path = hook_path # Isolated socket for hook j commands - # Link session to lease context for EndSession RPC + lease_scope.hook_socket_path = hook_path session.lease_context = lease_scope - # Sync status from LeaseContext to Session (status may have been updated - # before session was created, e.g., BEFORE_LEASE_HOOK when hooks are configured) session.update_status(lease_scope.current_status, lease_scope.status_message) logger.debug("Session sockets: main=%s, hook=%s", main_path, hook_path) - # Accept connections immediately - driver calls will be gated internally - # until the beforeLease hook completes. This allows LogStream to work - # during hook execution for real-time log streaming. - logger.info("Accepting connections (driver calls gated until beforeLease hook completes)") - - # Note: Status is managed by _report_status() which updates both LeaseContext - # and Session. The sync above handles the case where status was updated before - # session creation (e.g., BEFORE_LEASE_HOOK when hooks are configured). + lc.transition(LeasePhase.STARTING) + logger.info("Accepting connections (driver calls gated until lifecycle reaches READY)") - # Start task to handle EndSession requests (runs afterLease hook when client signals done) tg.start_soon(self._handle_end_session, lease_scope) - # Process client connections until lease ends - # The lease can end via: - # 1. listen_rx stream closing (controller stops sending) - # 2. lease_ended event being set (serve() detected lease status change) - # Type: request is jumpstarter_pb2.ListenResponse with router_endpoint and router_token fields try: async with create_task_group() as conn_tg: - # Start listening for connection requests with retry logic - # This is inside conn_tg so it gets cancelled when the lease ends conn_tg.start_soon( self._retry_stream, "Listen", @@ -719,16 +719,12 @@ async def handle_lease(self, lease_name: str, tg: TaskGroup, lease_scope: LeaseC ) async def wait_for_lease_end(): - """Wait for lease_ended event and cancel the connection loop.""" - await lease_scope.lease_ended.wait() - logger.info("Lease ended event received, stopping connection handling") + await lc.wait_end_requested() + logger.info("Lease end requested, stopping connection handling") conn_tg.cancel_scope.cancel() async def process_connections(): - """Process incoming connection requests.""" - # Wait for beforeLease hook to complete before routing connections. - # The Listen buffer holds early Dials; we process them after ready. - await lease_scope.before_lease_hook.wait() + await lc.wait_ready() logger.debug("Starting to process connection requests from Listen stream") async for request in listen_rx: logger.info( @@ -748,24 +744,11 @@ async def process_connections(): conn_tg.start_soon(wait_for_lease_end) conn_tg.start_soon(process_connections) - # Report LEASE_READY if no beforeLease hook is configured. - # This MUST happen after Listen stream is started so the - # controller can forward client Dial requests. if not self.hook_executor: await self._report_status(ExporterStatus.LEASE_READY, "Ready for commands") - lease_scope.before_lease_hook.set() + lc.transition(LeasePhase.READY) finally: - # Ensure before_lease_hook is set so _cleanup_after_lease never - # blocks forever. When conn_tg is cancelled before the no-hook - # path reaches lease_scope.before_lease_hook.set(), this flag - # remains unset and _cleanup_after_lease (shielded) deadlocks. - if not lease_scope.before_lease_hook.is_set(): - lease_scope.before_lease_hook.set() - # Close the listen stream to signal termination to listen_rx await listen_tx.aclose() - # Run afterLease hook before closing the session - # This ensures the socket is still available for driver calls within the hook - # Shield from cancellation so the hook can complete even during shutdown await self._cleanup_after_lease(lease_scope) # Fallback: clear _lease_context if leased→unleased handler didn't fire @@ -797,17 +780,10 @@ async def serve(self): # noqa: C901 previous_leased = self._previous_leased current_leased = status.leased - # Check if this is a new lease assignment (no active lease context and we have a lease name) - # This handles both first lease and subsequent leases after the previous one ended if self._lease_context is None and status.lease_name != "" and current_leased: self._started = True logger.info("Starting new lease: %s", status.lease_name) - # Create lease scope and start handling the lease - # The session will be created inside handle_lease and stay open for the lease duration - lease_scope = LeaseContext( - lease_name=status.lease_name, - before_lease_hook=Event(), - ) + lease_scope = LeaseContext(lease_name=status.lease_name) self._lease_context = lease_scope tg.start_soon(self.handle_lease, status.lease_name, tg, lease_scope) @@ -816,37 +792,25 @@ async def serve(self): # noqa: C901 if self._lease_context: self._lease_context.update_client(status.client_name) - # Before-lease hook when transitioning from unleased to leased if not previous_leased: if self.hook_executor and self._lease_context: tg.start_soon( - self.hook_executor.run_before_lease_hook, + self._run_before_hook_lifecycle, self._lease_context, - self._report_status, - self.stop, # Pass shutdown callback - self._request_lease_release, # Pass lease release callback ) - # else: No hook configured — LEASE_READY is set inside handle_lease() - # after session and Listen stream are established else: logger.info("Currently not leased") - # Lease ended: signal handle_lease() so it can exit its loop and run - # cleanup/afterLease hook in its finally block (where session is still open) if previous_leased and self._lease_context: lease_ctx = self._lease_context - logger.info("Lease ended, signaling handle_lease to run afterLease hook") - lease_ctx.lease_ended.set() + logger.info("Lease ended, signaling lifecycle") + lease_ctx.lifecycle.request_end() - # Wait for the hook to complete with CancelScope(shield=True): - await lease_ctx.after_lease_hook_done.wait() - logger.info("afterLease hook completed") + await lease_ctx.lifecycle.wait_complete() + logger.info("Lease lifecycle completed") - # Clear lease scope for next lease self._lease_context = None - # Brief delay to ensure session is fully closed before next lease - # This prevents SSL corruption from overlapping connections await sleep(0.2) logger.debug("Ready for next lease") @@ -872,7 +836,7 @@ async def serve_standalone_tcp( is set to LEASE_READY. Runs until stop() cancels the task group. """ self._standalone = True - lease_scope = LeaseContext(lease_name="standalone", before_lease_hook=Event()) + lease_scope = LeaseContext(lease_name="standalone") self._lease_context = lease_scope with TemporarySocket() as hook_path: @@ -887,6 +851,9 @@ async def serve_standalone_tcp( lease_scope.socket_path = hook_path_str lease_scope.hook_socket_path = hook_path_str + lc = lease_scope.lifecycle + lc.transition(LeasePhase.STARTING) + async with session.serve_tcp_and_unix_async( host, port, hook_path_str, tls_credentials=tls_credentials, @@ -898,15 +865,18 @@ async def serve_standalone_tcp( tg.start_soon(self._handle_end_session, lease_scope) if self.hook_executor: + lc.transition(LeasePhase.BEFORE_LEASE) await self.hook_executor.run_before_lease_hook( lease_scope, self._report_status, self.stop, self._request_lease_release, ) + if lc.phase == LeasePhase.BEFORE_LEASE: + lc.transition(LeasePhase.READY) else: await self._report_status(ExporterStatus.LEASE_READY, "Ready for commands") - lease_scope.before_lease_hook.set() + lc.transition(LeasePhase.READY) await _standalone_shutdown_waiter() finally: diff --git a/python/packages/jumpstarter/jumpstarter/exporter/exporter_test.py b/python/packages/jumpstarter/jumpstarter/exporter/exporter_test.py index a25c496e2..6f4456b68 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/exporter_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/exporter_test.py @@ -2,7 +2,7 @@ These tests verify the exporter correctly handles lease lifecycle edge cases including premature lease-end during hooks, unused lease timeouts, -consecutive leases, and idempotent lease-end signals. +consecutive leases, and idempotent lifecycle completion. """ from contextlib import nullcontext @@ -10,10 +10,11 @@ import anyio import pytest -from anyio import Event, create_task_group +from anyio import create_task_group from jumpstarter.common import ExporterStatus from jumpstarter.exporter.lease_context import LeaseContext +from jumpstarter.exporter.lease_lifecycle import LeasePhase pytestmark = pytest.mark.anyio @@ -21,7 +22,6 @@ def make_lease_context(lease_name="test-lease", client_name="test-client"): ctx = LeaseContext( lease_name=lease_name, - before_lease_hook=Event(), client_name=client_name, ) mock_session = MagicMock() @@ -29,6 +29,23 @@ def make_lease_context(lease_name="test-lease", client_name="test-client"): ctx.session = mock_session ctx.socket_path = "/tmp/test_socket" ctx.hook_socket_path = "/tmp/test_hook_socket" + ctx.lifecycle.transition(LeasePhase.STARTING) + ctx.lifecycle.transition(LeasePhase.READY) + return ctx + + +def make_lease_context_before_ready(lease_name="test-lease", client_name="test-client"): + """Create a lease context in STARTING phase (before READY).""" + ctx = LeaseContext( + lease_name=lease_name, + client_name=client_name, + ) + mock_session = MagicMock() + mock_session.context_log_source.return_value = nullcontext() + ctx.session = mock_session + ctx.socket_path = "/tmp/test_socket" + ctx.hook_socket_path = "/tmp/test_hook_socket" + ctx.lifecycle.transition(LeasePhase.STARTING) return ctx @@ -47,13 +64,13 @@ def make_exporter(lease_ctx, hook_executor=None): class TestLeaseEndDuringHook: - async def test_cleanup_waits_for_before_lease_hook_before_running_after_lease(self): - """_cleanup_after_lease must wait for the beforeLease hook to - complete before starting the afterLease hook. This prevents - running afterLease while beforeLease is still in progress.""" - lease_ctx = make_lease_context() + async def test_cleanup_waits_for_ready_before_running_ending_phase(self): + """_cleanup_after_lease must wait for lifecycle to reach READY + before starting the ending phase.""" + lease_ctx = make_lease_context_before_ready() + lc = lease_ctx.lifecycle - after_lease_started_before_hook_done = False + ending_started_before_ready = False from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 from jumpstarter.exporter.hooks import HookExecutor @@ -66,34 +83,29 @@ async def test_cleanup_waits_for_before_lease_hook_before_running_after_lease(se original_run_after = hook_executor.run_after_lease_hook async def tracking_run_after(*args, **kwargs): - nonlocal after_lease_started_before_hook_done - if not lease_ctx.before_lease_hook.is_set(): - after_lease_started_before_hook_done = True + nonlocal ending_started_before_ready + if lc.phase == LeasePhase.STARTING: + ending_started_before_ready = True return await original_run_after(*args, **kwargs) hook_executor.run_after_lease_hook = tracking_run_after - exporter = make_exporter(lease_ctx, hook_executor) async with create_task_group() as tg: - - async def delayed_hook_complete(): + async def delayed_ready(): await anyio.sleep(0.2) - lease_ctx.before_lease_hook.set() + lc.transition(LeasePhase.READY) - tg.start_soon(delayed_hook_complete) + tg.start_soon(delayed_ready) await exporter._cleanup_after_lease(lease_ctx) - assert not after_lease_started_before_hook_done, ( - "afterLease hook started before beforeLease hook completed" - ) - assert lease_ctx.after_lease_hook_done.is_set() + assert not ending_started_before_ready + assert lc.is_complete() async def test_exporter_returns_to_available_after_premature_lease_end(self): """After a lease ends during beforeLease hook execution, exporter must transition to AVAILABLE once hooks complete.""" lease_ctx = make_lease_context() - lease_ctx.before_lease_hook.set() statuses = [] @@ -106,13 +118,12 @@ async def track_status(status, message=""): await exporter._cleanup_after_lease(lease_ctx) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx.after_lease_hook_done.is_set() + assert lease_ctx.lifecycle.is_complete() async def test_new_lease_accepted_after_recovery_from_premature_end(self): """After recovering from a premature lease-end, a new LeaseContext can be created and the exporter processes it normally.""" lease_ctx_1 = make_lease_context(lease_name="lease-1") - lease_ctx_1.before_lease_hook.set() statuses = [] @@ -124,16 +135,15 @@ async def track_status(status, message=""): await exporter._cleanup_after_lease(lease_ctx_1) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx_1.after_lease_hook_done.is_set() + assert lease_ctx_1.lifecycle.is_complete() lease_ctx_2 = make_lease_context(lease_name="lease-2") - lease_ctx_2.before_lease_hook.set() exporter._lease_context = lease_ctx_2 statuses.clear() await exporter._cleanup_after_lease(lease_ctx_2) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx_2.after_lease_hook_done.is_set() + assert lease_ctx_2.lifecycle.is_complete() class TestUnusedLeaseTimeout: @@ -141,7 +151,6 @@ async def test_unused_lease_timeout_transitions_to_available(self): """When a lease ends with no client session (unused lease timeout), the exporter must transition to AVAILABLE.""" lease_ctx = make_lease_context(client_name="") - lease_ctx.before_lease_hook.set() statuses = [] @@ -154,7 +163,7 @@ async def track_status(status, message=""): await exporter._cleanup_after_lease(lease_ctx) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx.after_lease_hook_done.is_set() + assert lease_ctx.lifecycle.is_complete() async def test_unused_lease_with_hooks_runs_after_lease_when_client_present(self): """When a lease ends with a client (normal end or timeout after @@ -163,7 +172,6 @@ async def test_unused_lease_with_hooks_runs_after_lease_when_client_present(self from jumpstarter.exporter.hooks import HookExecutor lease_ctx = make_lease_context(client_name="some-client") - lease_ctx.before_lease_hook.set() hook_config = HookConfigV1Alpha1( after_lease=HookInstanceConfigV1Alpha1(script="echo cleanup", timeout=10), @@ -182,13 +190,12 @@ async def track_status(status, message=""): assert ExporterStatus.AFTER_LEASE_HOOK in statuses assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx.after_lease_hook_done.is_set() + assert lease_ctx.lifecycle.is_complete() async def test_new_lease_after_unused_timeout_recovery(self): """After recovering from unused lease timeout, a new lease can be accepted and processed.""" lease_ctx_1 = make_lease_context(lease_name="unused-lease", client_name="") - lease_ctx_1.before_lease_hook.set() statuses = [] @@ -200,39 +207,35 @@ async def track_status(status, message=""): await exporter._cleanup_after_lease(lease_ctx_1) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx_1.after_lease_hook_done.is_set() + assert lease_ctx_1.lifecycle.is_complete() lease_ctx_2 = make_lease_context(lease_name="new-lease", client_name="real-client") - lease_ctx_2.before_lease_hook.set() exporter._lease_context = lease_ctx_2 statuses.clear() await exporter._cleanup_after_lease(lease_ctx_2) assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx_2.after_lease_hook_done.is_set() + assert lease_ctx_2.lifecycle.is_complete() class TestConsecutiveLeaseOrdering: - async def test_after_lease_done_before_new_lease_context_created(self): + async def test_lifecycle_completes_before_new_lease_context_created(self): """The serve() loop must not create a new LeaseContext until the - previous lease's after_lease_hook_done is set.""" + previous lease's lifecycle is complete.""" lease_ctx_1 = make_lease_context(lease_name="lease-1") - lease_ctx_1.before_lease_hook.set() - exporter = make_exporter(lease_ctx_1) exporter._report_status = AsyncMock() await exporter._cleanup_after_lease(lease_ctx_1) - assert lease_ctx_1.after_lease_hook_done.is_set() + assert lease_ctx_1.lifecycle.is_complete() exporter._lease_context = None lease_ctx_2 = make_lease_context(lease_name="lease-2") exporter._lease_context = lease_ctx_2 - lease_ctx_2.before_lease_hook.set() await exporter._cleanup_after_lease(lease_ctx_2) - assert lease_ctx_2.after_lease_hook_done.is_set() + assert lease_ctx_2.lifecycle.is_complete() async def test_consecutive_leases_run_hooks_in_strict_order(self): """For two consecutive leases, afterLease(1) must complete before @@ -247,7 +250,6 @@ async def test_consecutive_leases_run_hooks_in_strict_order(self): hook_executor = HookExecutor(config=hook_config) events = [] - original_run_before = hook_executor.run_before_lease_hook original_run_after = hook_executor.run_after_lease_hook @@ -285,21 +287,17 @@ async def tracking_after(*args, **kwargs): after1_end = events.index("after_end", events.index("after_start")) before2_start = events.index("before_start", after1_end) - assert after1_end < before2_start, ( - f"afterLease(1) end at {after1_end} must be before " - f"beforeLease(2) start at {before2_start}. Events: {events}" - ) + assert after1_end < before2_start -class TestBeforeLeaseHookSafetyTimeout: - async def test_cleanup_forces_hook_set_on_safety_timeout(self): - """When before_lease_hook is never set (race condition), +class TestLifecycleSafetyTimeout: + async def test_cleanup_handles_stuck_before_hook_via_timeout(self): + """When lifecycle never reaches READY (race condition), _cleanup_after_lease must not deadlock. The safety timeout - forces the event set and cleanup proceeds normally.""" + forces FAILED and cleanup proceeds.""" from unittest.mock import patch - lease_ctx = make_lease_context() - # Deliberately do NOT set before_lease_hook to simulate the race condition + lease_ctx = make_lease_context_before_ready() exporter = make_exporter(lease_ctx) statuses = [] @@ -309,28 +307,19 @@ async def track_status(status, message=""): exporter._report_status = AsyncMock(side_effect=track_status) - # Patch move_on_after to use a tiny timeout so the test runs fast original_move_on_after = anyio.move_on_after def fast_move_on_after(delay, *args, **kwargs): - # Replace any safety timeout with 0.1s for fast testing return original_move_on_after(0.1, *args, **kwargs) with patch("jumpstarter.exporter.exporter.move_on_after", side_effect=fast_move_on_after): await exporter._cleanup_after_lease(lease_ctx) - # The event should be force-set by the timeout handler - assert lease_ctx.before_lease_hook.is_set(), ( - "before_lease_hook should be force-set after safety timeout" - ) - # Cleanup should have completed normally - assert ExporterStatus.AVAILABLE in statuses - assert lease_ctx.after_lease_hook_done.is_set() + assert lease_ctx.lifecycle.phase == LeasePhase.FAILED async def test_safety_timeout_uses_hook_config_when_available(self): """When a hook executor with before_lease config is present, - the safety timeout should use the configured hook timeout + 30s - margin rather than the default 15s.""" + the safety timeout should use the configured hook timeout + 30s.""" from unittest.mock import patch from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 @@ -342,8 +331,6 @@ async def test_safety_timeout_uses_hook_config_when_available(self): hook_executor = HookExecutor(config=hook_config) lease_ctx = make_lease_context() - lease_ctx.before_lease_hook.set() # Set so we don't actually timeout - exporter = make_exporter(lease_ctx, hook_executor) captured_timeouts = [] @@ -356,42 +343,10 @@ def tracking_move_on_after(delay, *args, **kwargs): with patch("jumpstarter.exporter.exporter.move_on_after", side_effect=tracking_move_on_after): await exporter._cleanup_after_lease(lease_ctx) - # The safety timeout should be hook timeout (60) + margin (30) = 90 - assert 90 in captured_timeouts, ( - f"Expected safety timeout of 90s (60 + 30), got timeouts: {captured_timeouts}" - ) - + assert 90 in captured_timeouts -class TestHandleLeaseFinally: - async def test_finally_sets_before_lease_hook_on_early_cancel(self): - """When conn_tg is cancelled before before_lease_hook.set() is - reached (no hook executor path), the finally block must ensure - the event is set so _cleanup_after_lease can proceed.""" - lease_ctx = make_lease_context() - # Verify the event starts unset - assert not lease_ctx.before_lease_hook.is_set() - exporter = make_exporter(lease_ctx) - # Mock methods needed by handle_lease - exporter.uuid = "test-uuid" - exporter.labels = {} - exporter.tls = None - exporter.grpc_options = None - - # We test just the finally-block behavior by calling - # _cleanup_after_lease with an unset event: the primary fix is - # in handle_lease's finally, but we can verify _cleanup_after_lease - # handles the unset event via the safety timeout. - # A more direct test: simulate what the finally block does. - if not lease_ctx.before_lease_hook.is_set(): - lease_ctx.before_lease_hook.set() - - assert lease_ctx.before_lease_hook.is_set(), ( - "before_lease_hook must be set after the finally-block logic" - ) - - -class TestIdempotentLeaseEnd: +class TestIdempotentCleanup: async def test_duplicate_cleanup_is_noop(self): """Calling _cleanup_after_lease twice for the same LeaseContext must not run afterLease hook twice. The second call waits for the @@ -415,14 +370,105 @@ async def counting_run_after(*args, **kwargs): hook_executor.run_after_lease_hook = counting_run_after lease_ctx = make_lease_context() - lease_ctx.before_lease_hook.set() exporter = make_exporter(lease_ctx, hook_executor) exporter._report_status = AsyncMock() await exporter._cleanup_after_lease(lease_ctx) await exporter._cleanup_after_lease(lease_ctx) - assert after_hook_call_count == 1, ( - f"afterLease hook ran {after_hook_call_count} times, expected exactly 1" + assert after_hook_call_count == 1 + assert lease_ctx.lifecycle.is_complete() + + +class TestBeforeHookLifecycleWrapper: + async def test_before_hook_transitions_to_ready_on_success(self): + """_run_before_hook_lifecycle transitions BEFORE_LEASE → READY.""" + from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 + from jumpstarter.exporter.hooks import HookExecutor + + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1(script="echo setup", timeout=10), ) - assert lease_ctx.after_lease_hook_done.is_set() + hook_executor = HookExecutor(config=hook_config) + + lease_ctx = make_lease_context_before_ready() + lease_ctx.lifecycle.transition(LeasePhase.READY) + lease_ctx_new = make_lease_context_before_ready() + + exporter = make_exporter(lease_ctx_new, hook_executor) + exporter._report_status = AsyncMock() + + await exporter._run_before_hook_lifecycle(lease_ctx_new) + assert lease_ctx_new.lifecycle.phase == LeasePhase.READY + + async def test_before_hook_transitions_to_ending_when_end_requested(self): + """When end was requested during BEFORE_LEASE, transitions to ENDING.""" + from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 + from jumpstarter.exporter.hooks import HookExecutor + + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1(script="echo setup", timeout=10), + ) + hook_executor = HookExecutor(config=hook_config) + + lease_ctx = make_lease_context_before_ready() + exporter = make_exporter(lease_ctx, hook_executor) + exporter._report_status = AsyncMock() + + original_run = hook_executor.run_before_lease_hook + + async def run_and_request_end(*args, **kwargs): + await original_run(*args, **kwargs) + lease_ctx.lifecycle.request_end() + + hook_executor.run_before_lease_hook = run_and_request_end + + await exporter._run_before_hook_lifecycle(lease_ctx) + assert lease_ctx.lifecycle.phase == LeasePhase.ENDING + + +class TestRunEndingPhase: + async def test_ending_phase_with_after_hook(self): + """_run_ending_phase runs afterLease hook and transitions to DONE.""" + from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 + from jumpstarter.exporter.hooks import HookExecutor + + hook_config = HookConfigV1Alpha1( + after_lease=HookInstanceConfigV1Alpha1(script="echo cleanup", timeout=10), + ) + hook_executor = HookExecutor(config=hook_config) + + lease_ctx = make_lease_context() + exporter = make_exporter(lease_ctx, hook_executor) + exporter._report_status = AsyncMock() + + await exporter._run_ending_phase(lease_ctx) + assert lease_ctx.lifecycle.phase == LeasePhase.DONE + + async def test_ending_phase_without_hook(self): + """_run_ending_phase transitions to DONE via RELEASING when no hook.""" + lease_ctx = make_lease_context() + exporter = make_exporter(lease_ctx) + exporter._report_status = AsyncMock() + + await exporter._run_ending_phase(lease_ctx) + assert lease_ctx.lifecycle.phase == LeasePhase.DONE + + async def test_ending_phase_skips_when_skip_flag_set(self): + """_run_ending_phase skips afterLease when skip_after_lease is True.""" + from jumpstarter.config.exporter import HookConfigV1Alpha1, HookInstanceConfigV1Alpha1 + from jumpstarter.exporter.hooks import HookExecutor + + hook_config = HookConfigV1Alpha1( + after_lease=HookInstanceConfigV1Alpha1(script="echo cleanup", timeout=10), + ) + hook_executor = HookExecutor(config=hook_config) + + lease_ctx = make_lease_context() + lease_ctx.lifecycle.skip_after_lease = True + exporter = make_exporter(lease_ctx, hook_executor) + exporter._report_status = AsyncMock() + + await exporter._run_ending_phase(lease_ctx) + assert lease_ctx.lifecycle.phase == LeasePhase.DONE + assert lease_ctx.lifecycle.phase != LeasePhase.AFTER_LEASE diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 9563506f6..d0a997015 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -594,7 +594,6 @@ async def run_before_lease_hook( error_msg = "Timeout waiting for lease scope to be ready" logger.error(error_msg) await report_status(ExporterStatus.BEFORE_LEASE_HOOK_FAILED, error_msg) - lease_scope.before_lease_hook.set() return await anyio.sleep(interval) elapsed += interval @@ -652,11 +651,6 @@ async def run_before_lease_hook( ExporterStatus.BEFORE_LEASE_HOOK_FAILED, f"beforeLease hook failed: {e}", ) - # Unexpected errors don't trigger shutdown - just block the lease - - finally: - # Always set the event to unblock connections - lease_scope.before_lease_hook.set() async def run_after_lease_hook( self, diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index ab31d81c5..18669a75e 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -59,18 +59,13 @@ def hook_config() -> HookConfigV1Alpha1: @pytest.fixture def lease_scope(): - from anyio import Event - from jumpstarter.exporter.lease_context import LeaseContext lease_scope = LeaseContext( lease_name="test-lease-123", - before_lease_hook=Event(), client_name="test-client", ) - # Add mock session to lease_scope mock_session = MagicMock() - # Return a no-op context manager for context_log_source mock_session.context_log_source.return_value = nullcontext() lease_scope.session = mock_session lease_scope.socket_path = "/tmp/test_socket" @@ -206,8 +201,6 @@ async def test_failed_hook_with_warn_logs_warning_inside_log_source_context(self """ from contextlib import contextmanager - from anyio import Event - from jumpstarter.exporter.lease_context import LeaseContext hook_config = HookConfigV1Alpha1( @@ -215,7 +208,6 @@ async def test_failed_hook_with_warn_logs_warning_inside_log_source_context(self ) executor = HookExecutor(config=hook_config) - # Track whether context_log_source is active when warning is logged context_active = False warning_logged_in_context = False @@ -230,7 +222,6 @@ def tracking_context_log_source(logger_name, source): lease_scope = LeaseContext( lease_name="test-lease-ctx", - before_lease_hook=Event(), client_name="test-client", ) mock_session = MagicMock() @@ -432,6 +423,7 @@ async def test_before_lease_hook_exit_sets_skip_flag(self, lease_scope) -> None: ) assert lease_scope.skip_after_lease_hook is True + assert lease_scope.lifecycle.skip_after_lease is True mock_shutdown.assert_called_once_with(exit_code=1, wait_for_lease_exit=True, should_unregister=True) async def test_before_lease_hook_endlease_does_not_set_skip_flag(self, lease_scope) -> None: @@ -778,22 +770,21 @@ async def test_infrastructure_messages_at_debug_not_info(self, lease_scope) -> N # User output should be at INFO level assert any("user output" in call for call in info_calls) - async def test_before_lease_hook_always_sets_event_on_failure(self, lease_scope) -> None: - """Issue C3: before_lease_hook event must be set even when hook fails. - - When the beforeLease hook fails with on_failure=endLease, the event must - still be set to unblock process_connections in handle_lease. Otherwise - the lease hangs indefinitely. - """ + async def test_before_lease_hook_reports_failure_status(self, lease_scope) -> None: + """When the beforeLease hook fails with on_failure=endLease, the hook must + report BEFORE_LEASE_HOOK_FAILED status. The lifecycle transition to READY + is now handled by the exporter wrapper, not the hook executor.""" hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1(script="exit 1", timeout=10, on_failure="endLease"), ) executor = HookExecutor(config=hook_config) - mock_report_status = AsyncMock() - mock_shutdown = MagicMock() + status_calls = [] - assert not lease_scope.before_lease_hook.is_set() + async def mock_report_status(status, msg): + status_calls.append((status, msg)) + + mock_shutdown = MagicMock() await executor.run_before_lease_hook( lease_scope, @@ -801,15 +792,12 @@ async def test_before_lease_hook_always_sets_event_on_failure(self, lease_scope) mock_shutdown, ) - # Event must always be set to unblock connections - assert lease_scope.before_lease_hook.is_set() - - async def test_before_lease_hook_always_sets_event_on_exit(self, lease_scope) -> None: - """Issue C3b: before_lease_hook event must be set when hook fails with exit. + failed = [s for s, _ in status_calls if s == ExporterStatus.BEFORE_LEASE_HOOK_FAILED] + assert len(failed) > 0 - Same as C3 but for on_failure=exit. The event must be set, shutdown called, - and skip_after_lease_hook set to True. - """ + async def test_before_lease_hook_exit_sets_skip_and_shutdown(self, lease_scope) -> None: + """When hook fails with on_failure=exit, shutdown is called + and skip_after_lease_hook is set.""" hook_config = HookConfigV1Alpha1( before_lease=HookInstanceConfigV1Alpha1(script="exit 1", timeout=10, on_failure="exit"), ) @@ -824,7 +812,6 @@ async def test_before_lease_hook_always_sets_event_on_exit(self, lease_scope) -> mock_shutdown, ) - assert lease_scope.before_lease_hook.is_set() assert lease_scope.skip_after_lease_hook is True mock_shutdown.assert_called_once() diff --git a/python/packages/jumpstarter/jumpstarter/exporter/lease_context.py b/python/packages/jumpstarter/jumpstarter/exporter/lease_context.py index ece434430..bbb1362f4 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/lease_context.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/lease_context.py @@ -10,6 +10,7 @@ from anyio import Event from jumpstarter.common import ExporterStatus +from jumpstarter.exporter.lease_lifecycle import LeaseLifecycle if TYPE_CHECKING: from jumpstarter.exporter.session import Session @@ -19,42 +20,42 @@ class LeaseContext: """Encapsulates all resources associated with an active lease. - This class bundles together the session, socket path, synchronization event, + This class bundles together the session, socket path, lifecycle controller, and lease identity information that are needed throughout the lease lifecycle. By grouping these resources, we make their relationships and lifecycles explicit. Attributes: lease_name: Name of the current lease assigned by the controller + lifecycle: LeaseLifecycle FSM that coordinates all lease phase transitions + end_session_requested: Event that signals when client requests end session (gRPC layer) session: The Session object managing the device and gRPC services (set in handle_lease) socket_path: Unix socket path where the session is serving (set in handle_lease) hook_socket_path: Separate Unix socket for hook j commands to avoid SSL frame corruption - before_lease_hook: Event that signals when before-lease hook completes - end_session_requested: Event that signals when client requests end session (to run afterLease hook) - after_lease_hook_started: Event that signals when afterLease hook has started (prevents double execution) - after_lease_hook_done: Event that signals when afterLease hook has completed - lease_ended: Event that signals when the lease has ended (from controller status update) client_name: Name of the client currently holding the lease (empty if unleased) current_status: Current exporter status (stored here for access before session is created) status_message: Message describing the current status """ lease_name: str - before_lease_hook: Event + lifecycle: LeaseLifecycle = field(default_factory=LeaseLifecycle) end_session_requested: Event = field(default_factory=Event) - after_lease_hook_started: Event = field(default_factory=Event) - after_lease_hook_done: Event = field(default_factory=Event) - lease_ended: Event = field(default_factory=Event) # Signals lease has ended (from controller) session: "Session | None" = None socket_path: str = "" - hook_socket_path: str = "" # Separate socket for hook j commands to avoid SSL corruption + hook_socket_path: str = "" client_name: str = field(default="") current_status: ExporterStatus = field(default=ExporterStatus.AVAILABLE) status_message: str = field(default="") - skip_after_lease_hook: bool = False + + @property + def skip_after_lease_hook(self) -> bool: + return self.lifecycle.skip_after_lease + + @skip_after_lease_hook.setter + def skip_after_lease_hook(self, value: bool) -> None: + self.lifecycle.skip_after_lease = value def __post_init__(self): """Validate that required resources are present.""" - assert self.before_lease_hook is not None, "LeaseScope requires a before_lease_hook event" assert self.lease_name, "LeaseScope requires a non-empty lease_name" def is_ready(self) -> bool: @@ -90,22 +91,22 @@ def update_status(self, status: ExporterStatus, message: str = ""): """ self.current_status = status self.status_message = message - # Also update session if it exists if self.session: self.session.update_status(status, message) def drivers_ready(self) -> bool: - """Check if drivers are ready for use (beforeLease hook completed). + """Check if drivers are ready for use (lifecycle has reached READY or later). - Returns True if the beforeLease hook has completed and drivers can be accessed. - Used by Session to gate driver calls during hook execution. + Returns True if the lease lifecycle has passed the READY gate and drivers + can be accessed. Used by Session to gate driver calls during hook execution. """ - return self.before_lease_hook.is_set() + return self.lifecycle.drivers_ready() async def wait_for_drivers(self) -> None: - """Wait for drivers to be ready (beforeLease hook to complete). + """Wait for drivers to be ready (lifecycle reaches READY phase). - This method blocks until the beforeLease hook completes, allowing - clients to connect early but wait for driver access. + This method blocks until the beforeLease hook completes and the lifecycle + transitions to READY, allowing clients to connect early but wait for + driver access. """ - await self.before_lease_hook.wait() + await self.lifecycle.wait_ready() diff --git a/python/packages/jumpstarter/jumpstarter/exporter/lease_context_test.py b/python/packages/jumpstarter/jumpstarter/exporter/lease_context_test.py index 143a54a94..a616b7210 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/lease_context_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/lease_context_test.py @@ -2,23 +2,23 @@ from unittest.mock import MagicMock +import anyio import pytest -from anyio import Event from jumpstarter.common import ExporterStatus from jumpstarter.exporter.lease_context import LeaseContext +from jumpstarter.exporter.lease_lifecycle import LeaseLifecycle, LeasePhase pytestmark = pytest.mark.anyio class TestLeaseContextInitialization: def test_init_with_required_fields(self) -> None: - """Test that LeaseContext can be created with required fields.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + ctx = LeaseContext(lease_name="test-lease") assert ctx.lease_name == "test-lease" - assert ctx.before_lease_hook is before_hook + assert isinstance(ctx.lifecycle, LeaseLifecycle) + assert ctx.lifecycle.phase == LeasePhase.CREATED assert ctx.session is None assert ctx.socket_path == "" assert ctx.hook_socket_path == "" @@ -27,175 +27,111 @@ def test_init_with_required_fields(self) -> None: assert ctx.status_message == "" def test_init_validates_lease_name_non_empty(self) -> None: - """Test that LeaseContext requires a non-empty lease_name.""" - before_hook = Event() with pytest.raises(AssertionError, match="non-empty lease_name"): - LeaseContext(lease_name="", before_lease_hook=before_hook) - - def test_init_validates_before_lease_hook_present(self) -> None: - """Test that LeaseContext requires before_lease_hook to be non-None.""" - with pytest.raises(AssertionError, match="before_lease_hook"): - LeaseContext(lease_name="test-lease", before_lease_hook=None) # type: ignore - - def test_default_events_created(self) -> None: - """Test that default events are created properly.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + LeaseContext(lease_name="") + def test_end_session_requested_event_created(self) -> None: + ctx = LeaseContext(lease_name="test-lease") assert ctx.end_session_requested is not None - assert ctx.after_lease_hook_started is not None - assert ctx.after_lease_hook_done is not None - assert ctx.lease_ended is not None - # Events should not be set by default assert not ctx.end_session_requested.is_set() - assert not ctx.after_lease_hook_started.is_set() - assert not ctx.after_lease_hook_done.is_set() - assert not ctx.lease_ended.is_set() class TestLeaseContextStateQueries: def test_is_ready_false_without_session(self) -> None: - """Test that is_ready() returns False when session is None.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + ctx = LeaseContext(lease_name="test-lease") ctx.socket_path = "/tmp/socket" - assert not ctx.is_ready() def test_is_ready_false_without_socket_path(self) -> None: - """Test that is_ready() returns False when socket_path is empty.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + ctx = LeaseContext(lease_name="test-lease") ctx.session = MagicMock() - assert not ctx.is_ready() def test_is_ready_true_with_both(self) -> None: - """Test that is_ready() returns True when both session and socket_path are set.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + ctx = LeaseContext(lease_name="test-lease") ctx.session = MagicMock() ctx.socket_path = "/tmp/socket" - assert ctx.is_ready() def test_is_active_true_with_lease_name(self) -> None: - """Test that is_active() returns True when lease_name is non-empty.""" - before_hook = Event() - ctx = LeaseContext(lease_name="active-lease", before_lease_hook=before_hook) - + ctx = LeaseContext(lease_name="active-lease") assert ctx.is_active() def test_has_client_true_with_client_name(self) -> None: - """Test that has_client() returns True when client_name is set.""" - before_hook = Event() - ctx = LeaseContext( - lease_name="test-lease", - before_lease_hook=before_hook, - client_name="my-client", - ) - + ctx = LeaseContext(lease_name="test-lease", client_name="my-client") assert ctx.has_client() def test_has_client_false_without_client_name(self) -> None: - """Test that has_client() returns False when client_name is empty.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - + ctx = LeaseContext(lease_name="test-lease") assert not ctx.has_client() class TestLeaseContextClientManagement: def test_update_client_sets_name(self) -> None: - """Test that update_client() sets the client name.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - + ctx = LeaseContext(lease_name="test-lease") ctx.update_client("new-client") - assert ctx.client_name == "new-client" assert ctx.has_client() def test_clear_client_removes_name(self) -> None: - """Test that clear_client() removes the client name.""" - before_hook = Event() - ctx = LeaseContext( - lease_name="test-lease", - before_lease_hook=before_hook, - client_name="my-client", - ) - + ctx = LeaseContext(lease_name="test-lease", client_name="my-client") ctx.clear_client() - assert ctx.client_name == "" assert not ctx.has_client() class TestLeaseContextStatusUpdates: def test_update_status_stores_status(self) -> None: - """Test that update_status() stores the status in the context.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - + ctx = LeaseContext(lease_name="test-lease") ctx.update_status(ExporterStatus.LEASE_READY, "ready to go") - assert ctx.current_status == ExporterStatus.LEASE_READY assert ctx.status_message == "ready to go" def test_update_status_propagates_to_session(self) -> None: - """Test that update_status() propagates status to session when present.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) + ctx = LeaseContext(lease_name="test-lease") mock_session = MagicMock() ctx.session = mock_session - ctx.update_status(ExporterStatus.BEFORE_LEASE_HOOK, "running hook") - mock_session.update_status.assert_called_once_with( ExporterStatus.BEFORE_LEASE_HOOK, "running hook" ) def test_update_status_without_session_no_error(self) -> None: - """Test that update_status() works without session (no error).""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - - # Should not raise any exception + ctx = LeaseContext(lease_name="test-lease") ctx.update_status(ExporterStatus.AVAILABLE, "available") - assert ctx.current_status == ExporterStatus.AVAILABLE class TestLeaseContextDriversReady: - def test_drivers_ready_false_when_hook_not_set(self) -> None: - """Test that drivers_ready() returns False when hook event is not set.""" - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - + def test_drivers_ready_false_before_ready_phase(self) -> None: + ctx = LeaseContext(lease_name="test-lease") assert not ctx.drivers_ready() - def test_drivers_ready_true_when_hook_set(self) -> None: - """Test that drivers_ready() returns True when hook event is set.""" - before_hook = Event() - before_hook.set() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - + def test_drivers_ready_true_after_ready_phase(self) -> None: + ctx = LeaseContext(lease_name="test-lease") + ctx.lifecycle.transition(LeasePhase.STARTING) + ctx.lifecycle.transition(LeasePhase.READY) assert ctx.drivers_ready() - async def test_wait_for_drivers_blocks_until_set(self) -> None: - """Test that wait_for_drivers() blocks until hook event is set.""" - import anyio + async def test_wait_for_drivers_blocks_until_ready(self) -> None: + ctx = LeaseContext(lease_name="test-lease") - before_hook = Event() - ctx = LeaseContext(lease_name="test-lease", before_lease_hook=before_hook) - - # Set the event after a short delay - async def set_after_delay(): + async def transition_after_delay(): await anyio.sleep(0.05) - before_hook.set() + ctx.lifecycle.transition(LeasePhase.STARTING) + ctx.lifecycle.transition(LeasePhase.READY) async with anyio.create_task_group() as tg: - tg.start_soon(set_after_delay) + tg.start_soon(transition_after_delay) await ctx.wait_for_drivers() assert ctx.drivers_ready() + + +class TestLeaseContextSkipAfterLeaseHook: + def test_skip_after_lease_hook_delegates_to_lifecycle(self) -> None: + ctx = LeaseContext(lease_name="test-lease") + assert ctx.skip_after_lease_hook is False + ctx.skip_after_lease_hook = True + assert ctx.skip_after_lease_hook is True + assert ctx.lifecycle.skip_after_lease is True diff --git a/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle.py b/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle.py new file mode 100644 index 000000000..45a0402bd --- /dev/null +++ b/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle.py @@ -0,0 +1,116 @@ +from __future__ import annotations + +import logging +from enum import Enum + +from anyio import Event + +logger = logging.getLogger(__name__) + + +class LeasePhase(Enum): + CREATED = "created" + STARTING = "starting" + BEFORE_LEASE = "before_lease" + READY = "ready" + ENDING = "ending" + AFTER_LEASE = "after_lease" + RELEASING = "releasing" + DONE = "done" + FAILED = "failed" + + +class InvalidTransitionError(Exception): + def __init__(self, current: LeasePhase, target: LeasePhase) -> None: + self.current = current + self.target = target + super().__init__(f"Invalid transition {current.name} -> {target.name}") + + +_VALID_TRANSITIONS: dict[LeasePhase, frozenset[LeasePhase]] = { + LeasePhase.CREATED: frozenset({LeasePhase.STARTING, LeasePhase.FAILED}), + LeasePhase.STARTING: frozenset( + {LeasePhase.BEFORE_LEASE, LeasePhase.READY, LeasePhase.ENDING, LeasePhase.FAILED} + ), + LeasePhase.BEFORE_LEASE: frozenset({LeasePhase.READY, LeasePhase.ENDING, LeasePhase.FAILED}), + LeasePhase.READY: frozenset({LeasePhase.ENDING, LeasePhase.FAILED}), + LeasePhase.ENDING: frozenset( + {LeasePhase.AFTER_LEASE, LeasePhase.RELEASING, LeasePhase.DONE, LeasePhase.FAILED} + ), + LeasePhase.AFTER_LEASE: frozenset({LeasePhase.RELEASING, LeasePhase.FAILED}), + LeasePhase.RELEASING: frozenset({LeasePhase.DONE, LeasePhase.FAILED}), + LeasePhase.DONE: frozenset(), + LeasePhase.FAILED: frozenset(), +} + + +class LeaseLifecycle: + def __init__(self) -> None: + self._phase = LeasePhase.CREATED + self._end_requested = False + self._skip_after_lease = False + self._ready_event = Event() + self._complete_event = Event() + self._end_event = Event() + + @property + def phase(self) -> LeasePhase: + return self._phase + + @property + def end_requested(self) -> bool: + return self._end_requested + + @property + def skip_after_lease(self) -> bool: + return self._skip_after_lease + + @skip_after_lease.setter + def skip_after_lease(self, value: bool) -> None: + self._skip_after_lease = value + + def transition(self, target: LeasePhase) -> None: + if target not in _VALID_TRANSITIONS[self._phase]: + raise InvalidTransitionError(self._phase, target) + old = self._phase + self._phase = target + logger.debug("Lease lifecycle transition: %s -> %s", old.name, target.name) + if target in (LeasePhase.READY, LeasePhase.DONE, LeasePhase.FAILED): + self._ready_event.set() + if target in (LeasePhase.DONE, LeasePhase.FAILED): + self._complete_event.set() + if target == LeasePhase.ENDING: + self._end_event.set() + + def request_end(self) -> None: + self._end_event.set() + if self._phase == LeasePhase.READY: + self.transition(LeasePhase.ENDING) + elif self._phase in (LeasePhase.BEFORE_LEASE, LeasePhase.STARTING): + self._end_requested = True + + async def wait_ready(self) -> None: + await self._ready_event.wait() + + async def wait_complete(self) -> None: + await self._complete_event.wait() + + def is_ready(self) -> bool: + return self._phase == LeasePhase.READY + + def is_complete(self) -> bool: + return self._phase in (LeasePhase.DONE, LeasePhase.FAILED) + + def is_end_requested(self) -> bool: + return self._end_event.is_set() + + async def wait_end_requested(self) -> None: + await self._end_event.wait() + + def drivers_ready(self) -> bool: + return self._phase in ( + LeasePhase.READY, + LeasePhase.ENDING, + LeasePhase.AFTER_LEASE, + LeasePhase.RELEASING, + ) diff --git a/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle_test.py b/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle_test.py new file mode 100644 index 000000000..44946c683 --- /dev/null +++ b/python/packages/jumpstarter/jumpstarter/exporter/lease_lifecycle_test.py @@ -0,0 +1,367 @@ +import anyio +import pytest +from anyio import create_task_group + +from jumpstarter.exporter.lease_lifecycle import ( + InvalidTransitionError, + LeaseLifecycle, + LeasePhase, +) + +pytestmark = pytest.mark.anyio + + +def test_initial_state_created() -> None: + lc = LeaseLifecycle() + assert lc.phase == LeasePhase.CREATED + assert not lc.end_requested + assert not lc.skip_after_lease + + +def test_all_valid_transitions_succeed() -> None: + paths = [ + ( + LeasePhase.CREATED, + LeasePhase.STARTING, + LeasePhase.BEFORE_LEASE, + LeasePhase.READY, + LeasePhase.ENDING, + LeasePhase.AFTER_LEASE, + LeasePhase.RELEASING, + LeasePhase.DONE, + ), + ( + LeasePhase.CREATED, + LeasePhase.STARTING, + LeasePhase.READY, + LeasePhase.ENDING, + LeasePhase.RELEASING, + LeasePhase.DONE, + ), + ( + LeasePhase.CREATED, + LeasePhase.FAILED, + ), + ( + LeasePhase.CREATED, + LeasePhase.STARTING, + LeasePhase.ENDING, + LeasePhase.DONE, + ), + ( + LeasePhase.CREATED, + LeasePhase.STARTING, + LeasePhase.BEFORE_LEASE, + LeasePhase.ENDING, + LeasePhase.AFTER_LEASE, + LeasePhase.FAILED, + ), + ] + for sequence in paths: + lc = LeaseLifecycle() + for i, target in enumerate(sequence[1:], start=1): + lc.transition(target) + assert lc.phase == target, f"step {i} to {target}" + + +@pytest.mark.parametrize( + ("current", "target"), + [ + (LeasePhase.CREATED, LeasePhase.READY), + (LeasePhase.READY, LeasePhase.STARTING), + (LeasePhase.DONE, LeasePhase.CREATED), + (LeasePhase.DONE, LeasePhase.READY), + (LeasePhase.FAILED, LeasePhase.CREATED), + (LeasePhase.FAILED, LeasePhase.DONE), + ], +) +def test_invalid_transitions_raise(current: LeasePhase, target: LeasePhase) -> None: + lc = LeaseLifecycle() + if current == LeasePhase.READY: + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + elif current == LeasePhase.DONE: + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + elif current == LeasePhase.FAILED: + lc.transition(LeasePhase.FAILED) + + with pytest.raises(InvalidTransitionError) as exc_info: + lc.transition(target) + assert exc_info.value.current == lc.phase + + +def test_request_end_in_ready_transitions_to_ending() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.request_end() + assert lc.phase == LeasePhase.ENDING + assert not lc.end_requested + + +def test_request_end_in_before_lease_records_intent_only() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.BEFORE_LEASE) + lc.request_end() + assert lc.phase == LeasePhase.BEFORE_LEASE + assert lc.end_requested + + +def test_request_end_in_starting_records_intent_only() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.request_end() + assert lc.phase == LeasePhase.STARTING + assert lc.end_requested + + +async def test_wait_ready_unblocks_on_ready() -> None: + lc = LeaseLifecycle() + seen = [] + + async def waiter() -> None: + await lc.wait_ready() + seen.append("ready") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["ready"] + + +async def test_wait_ready_unblocks_on_done() -> None: + lc = LeaseLifecycle() + seen = [] + + async def waiter() -> None: + await lc.wait_ready() + seen.append("unblocked") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["unblocked"] + + +async def test_wait_ready_unblocks_on_failed() -> None: + lc = LeaseLifecycle() + seen = [] + + async def waiter() -> None: + await lc.wait_ready() + seen.append("unblocked") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.FAILED) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["unblocked"] + + +async def test_wait_complete_unblocks_on_done() -> None: + lc = LeaseLifecycle() + seen = [] + + async def waiter() -> None: + await lc.wait_complete() + seen.append("done") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["done"] + + +async def test_wait_complete_unblocks_on_failed() -> None: + lc = LeaseLifecycle() + seen = [] + + async def waiter() -> None: + await lc.wait_complete() + seen.append("failed") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.FAILED) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["failed"] + + +def test_is_ready_after_ready_transition() -> None: + lc = LeaseLifecycle() + assert not lc.is_ready() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + assert lc.is_ready() + + +def test_is_complete_after_done_transition() -> None: + lc = LeaseLifecycle() + assert not lc.is_complete() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + assert lc.is_complete() + + +def test_drivers_ready_false_early_phases() -> None: + lc = LeaseLifecycle() + assert not lc.drivers_ready() + lc.transition(LeasePhase.STARTING) + assert not lc.drivers_ready() + lc.transition(LeasePhase.BEFORE_LEASE) + assert not lc.drivers_ready() + + +def test_drivers_ready_true_when_gating_phases() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + assert lc.drivers_ready() + lc.transition(LeasePhase.ENDING) + assert lc.drivers_ready() + lc.transition(LeasePhase.AFTER_LEASE) + assert lc.drivers_ready() + lc.transition(LeasePhase.RELEASING) + assert lc.drivers_ready() + + +def test_skip_after_lease_default_and_setter() -> None: + lc = LeaseLifecycle() + assert lc.skip_after_lease is False + lc.skip_after_lease = True + assert lc.skip_after_lease is True + + +def test_happy_path_full_sequence() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.BEFORE_LEASE) + lc.transition(LeasePhase.READY) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.AFTER_LEASE) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + assert lc.phase == LeasePhase.DONE + + +def test_no_hook_path() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.transition(LeasePhase.ENDING) + lc.transition(LeasePhase.RELEASING) + lc.transition(LeasePhase.DONE) + assert lc.phase == LeasePhase.DONE + + +def test_early_end_during_before_lease() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.BEFORE_LEASE) + lc.request_end() + assert lc.end_requested + assert lc.is_end_requested() + assert lc.phase == LeasePhase.BEFORE_LEASE + lc.transition(LeasePhase.ENDING) + assert lc.phase == LeasePhase.ENDING + + +def test_is_end_requested_false_initially() -> None: + lc = LeaseLifecycle() + assert not lc.is_end_requested() + + +def test_is_end_requested_after_request_end_in_ready() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.READY) + lc.request_end() + assert lc.is_end_requested() + assert lc.phase == LeasePhase.ENDING + + +def test_is_end_requested_after_transition_to_ending() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.ENDING) + assert lc.is_end_requested() + + +async def test_wait_end_requested_unblocks_on_request_end() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + lc.transition(LeasePhase.BEFORE_LEASE) + seen = [] + + async def waiter() -> None: + await lc.wait_end_requested() + seen.append("end_requested") + + async def actor() -> None: + await anyio.sleep(0) + lc.request_end() + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["end_requested"] + + +async def test_wait_end_requested_unblocks_on_ending_transition() -> None: + lc = LeaseLifecycle() + lc.transition(LeasePhase.STARTING) + seen = [] + + async def waiter() -> None: + await lc.wait_end_requested() + seen.append("ending") + + async def actor() -> None: + await anyio.sleep(0) + lc.transition(LeasePhase.ENDING) + + async with create_task_group() as tg: + tg.start_soon(waiter) + tg.start_soon(actor) + + assert seen == ["ending"]