diff --git a/python/packages/jumpstarter/jumpstarter/client/core_test.py b/python/packages/jumpstarter/jumpstarter/client/core_test.py index 60b6fb622..f6fd5faee 100644 --- a/python/packages/jumpstarter/jumpstarter/client/core_test.py +++ b/python/packages/jumpstarter/jumpstarter/client/core_test.py @@ -1,14 +1,16 @@ """Tests for AsyncDriverClient async status methods.""" +import logging from dataclasses import dataclass from unittest.mock import AsyncMock, MagicMock from uuid import uuid4 +import anyio import pytest from grpc import StatusCode from grpc.aio import AioRpcError -from jumpstarter.common import ExporterStatus, Metadata +from jumpstarter.common import ExporterStatus, LogSource, Metadata pytestmark = pytest.mark.anyio @@ -293,3 +295,255 @@ async def test_wait_for_hook_status_unimplemented_returns_true(self) -> None: # Should return True (backward compatibility - assume hook complete) assert result is True + + +def create_log_stream_response(message: str, severity: str = "INFO", source=None): + """Create a mock LogStreamResponse.""" + response = MagicMock() + response.message = message + response.severity = severity + if source is not None: + response.HasField = lambda field: field == "source" + response.source = source.to_proto() + else: + response.HasField = lambda field: False + response.source = None + return response + + +class LogCapture(logging.Handler): + """Captures log records for assertion in tests.""" + + def __init__(self): + super().__init__() + self.records: list[logging.LogRecord] = [] + + def emit(self, record): + self.records.append(record) + + +SOURCE_LOGGER_NAMES = [ + "exporter:beforeLease", + "exporter:afterLease", + "exporter:driver", + "exporter:system", +] + + +def setup_log_stream_client(responses): + from jumpstarter.client.core import AsyncDriverClient + + all_delivered = anyio.Event() + + async def mock_log_stream(_): + for r in responses: + yield r + all_delivered.set() + await anyio.sleep_forever() + + stub = MagicMock() + stub.LogStream = mock_log_stream + + client = MagicMock(spec=AsyncDriverClient) + client.stub = stub + client.logger = logging.getLogger("test_log_stream_client") + client.log_stream_async = AsyncDriverClient.log_stream_async.__get__(client) + + captures = {} + for name in SOURCE_LOGGER_NAMES: + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + logger.handlers.clear() + capture = LogCapture() + logger.addHandler(capture) + captures[name] = capture + + return client, captures, all_delivered + + +@pytest.fixture(autouse=True) +def _clean_source_loggers(): + yield + for name in SOURCE_LOGGER_NAMES: + logger = logging.getLogger(name) + logger.handlers.clear() + + +class TestLogStreamResponseFactory: + def test_create_response_without_source(self) -> None: + response = create_log_stream_response("plain message") + assert response.message == "plain message" + assert response.severity == "INFO" + assert response.HasField("source") is False + assert response.source is None + + def test_create_response_with_source(self) -> None: + response = create_log_stream_response( + "hook message", severity="DEBUG", source=LogSource.BEFORE_LEASE_HOOK + ) + assert response.message == "hook message" + assert response.severity == "DEBUG" + assert response.HasField("source") is True + + +class TestLogStreamSourceTagPlacement: + async def test_hook_log_delegates_tagging_to_formatter(self) -> None: + responses = [ + create_log_stream_response( + "hook output line", + severity="INFO", + source=LogSource.BEFORE_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=True): + with anyio.fail_after(2): + await delivered.wait() + + records = captures["exporter:beforeLease"].records + assert len(records) == 1 + assert records[0].getMessage() == "hook output line" + assert records[0].name == "exporter:beforeLease" + + async def test_after_lease_hook_log_delegates_tagging_to_formatter(self) -> None: + responses = [ + create_log_stream_response( + "cleanup output", + severity="INFO", + source=LogSource.AFTER_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=True): + with anyio.fail_after(2): + await delivered.wait() + + records = captures["exporter:afterLease"].records + assert len(records) == 1 + assert records[0].getMessage() == "cleanup output" + assert records[0].name == "exporter:afterLease" + + async def test_logger_name_carries_source_for_formatter(self) -> None: + responses = [ + create_log_stream_response( + "line one", + severity="INFO", + source=LogSource.BEFORE_LEASE_HOOK, + ), + create_log_stream_response( + "line two", + severity="INFO", + source=LogSource.AFTER_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=True): + with anyio.fail_after(2): + await delivered.wait() + + before_records = captures["exporter:beforeLease"].records + after_records = captures["exporter:afterLease"].records + assert len(before_records) == 1 + assert before_records[0].name == "exporter:beforeLease" + assert before_records[0].getMessage() == "line one" + assert len(after_records) == 1 + assert after_records[0].name == "exporter:afterLease" + assert after_records[0].getMessage() == "line two" + + +class TestLogStreamFiltering: + async def test_show_all_logs_false_filters_system_logs(self) -> None: + responses = [ + create_log_stream_response( + "debug system message", + severity="DEBUG", + source=LogSource.SYSTEM, + ), + create_log_stream_response( + "hook output line", + severity="INFO", + source=LogSource.BEFORE_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=False): + with anyio.fail_after(2): + await delivered.wait() + + system_records = captures["exporter:system"].records + hook_records = captures["exporter:beforeLease"].records + assert len(system_records) == 0 + assert len(hook_records) == 1 + + async def test_show_all_logs_false_shows_hook_logs(self) -> None: + responses = [ + create_log_stream_response( + "before hook output", + severity="INFO", + source=LogSource.BEFORE_LEASE_HOOK, + ), + create_log_stream_response( + "after hook output", + severity="INFO", + source=LogSource.AFTER_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=False): + with anyio.fail_after(2): + await delivered.wait() + + before_records = captures["exporter:beforeLease"].records + after_records = captures["exporter:afterLease"].records + assert len(before_records) == 1 + assert len(after_records) == 1 + + async def test_show_all_logs_true_shows_system_logs(self) -> None: + responses = [ + create_log_stream_response( + "system message", + severity="INFO", + source=LogSource.SYSTEM, + ), + create_log_stream_response( + "hook output line", + severity="INFO", + source=LogSource.BEFORE_LEASE_HOOK, + ), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=True): + with anyio.fail_after(2): + await delivered.wait() + + system_records = captures["exporter:system"].records + hook_records = captures["exporter:beforeLease"].records + assert len(system_records) == 1 + assert len(hook_records) == 1 + + async def test_log_without_source_routes_to_system(self) -> None: + responses = [ + create_log_stream_response("no source message"), + ] + + client, captures, delivered = setup_log_stream_client(responses) + + async with client.log_stream_async(show_all_logs=True): + with anyio.fail_after(2): + await delivered.wait() + + system_records = captures["exporter:system"].records + assert len(system_records) == 1 + assert system_records[0].getMessage() == "no source message" diff --git a/python/packages/jumpstarter/jumpstarter/exporter/exporter.py b/python/packages/jumpstarter/jumpstarter/exporter/exporter.py index 236651766..2d6cf632d 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/exporter.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/exporter.py @@ -35,6 +35,9 @@ logger = logging.getLogger(__name__) +DEFAULT_SAFETY_TIMEOUT_SECONDS = 15 +HOOK_TIMEOUT_MARGIN_SECONDS = 30 + async def _standalone_shutdown_waiter(): """Wait forever; used so serve_standalone_tcp can be cancelled by stop().""" @@ -601,12 +604,12 @@ async def _cleanup_after_lease(self, lease_scope: LeaseContext) -> None: # was never set due to a race (e.g. conn_tg cancelled early). # Use the configured hook timeout (+ margin) when available so we # never interrupt a legitimately-running beforeLease hook. - safety_timeout = 15 # generous default for no-hook / unknown cases + safety_timeout = DEFAULT_SAFETY_TIMEOUT_SECONDS if ( self.hook_executor and self.hook_executor.config.before_lease ): - safety_timeout = self.hook_executor.config.before_lease.timeout + 30 + safety_timeout = self.hook_executor.config.before_lease.timeout + HOOK_TIMEOUT_MARGIN_SECONDS with move_on_after(safety_timeout) as timeout_scope: await lease_scope.before_lease_hook.wait() if timeout_scope.cancelled_caught: diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py index 9563506f6..3f9fabda6 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks.py @@ -103,7 +103,7 @@ def _create_hook_env(self, lease_scope: "LeaseContext") -> dict[str, str]: # Falls back to main socket if hook socket not available (backward compatibility) socket_path = lease_scope.hook_socket_path or lease_scope.socket_path if lease_scope.hook_socket_path: - logger.info( + logger.debug( "Using dedicated hook socket: %s (main socket: %s)", lease_scope.hook_socket_path, lease_scope.socket_path, diff --git a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py index ab31d81c5..83b268791 100644 --- a/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py +++ b/python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py @@ -1147,6 +1147,63 @@ async def mock_report_status(status, msg): f"Expected AVAILABLE status after warn+afterLease, got: {status_calls}" ) + async def test_hook_socket_message_at_debug_not_info(self, lease_scope) -> None: + """The 'Using dedicated hook socket' message must be at DEBUG, not INFO. + + This message is an internal detail about socket selection and should + not appear in client-visible hook output. + """ + lease_scope.hook_socket_path = "/tmp/hook_socket" + + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1(script="echo 'hello'", timeout=10), + ) + executor = HookExecutor(config=hook_config) + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + await executor.execute_before_lease_hook(lease_scope) + + debug_calls = [str(call) for call in mock_logger.debug.call_args_list] + info_calls = [str(call) for call in mock_logger.info.call_args_list] + + assert any("Using dedicated hook socket" in call for call in debug_calls), ( + "Expected 'Using dedicated hook socket' at DEBUG level" + ) + assert not any("Using dedicated hook socket" in call for call in info_calls), ( + "'Using dedicated hook socket' should NOT be at INFO level" + ) + + async def test_completion_messages_at_info(self, lease_scope) -> None: + """Hook completion messages remain at INFO for monitoring slow hooks.""" + hook_config = HookConfigV1Alpha1( + before_lease=HookInstanceConfigV1Alpha1(script="echo 'hello'", timeout=10), + ) + executor = HookExecutor(config=hook_config) + + status_calls = [] + + async def mock_report_status(status, msg): + status_calls.append((status, msg)) + + mock_shutdown = MagicMock() + + with patch("jumpstarter.exporter.hooks.logger") as mock_logger: + await executor.run_before_lease_hook( + lease_scope, + mock_report_status, + mock_shutdown, + ) + + info_calls = [str(call) for call in mock_logger.info.call_args_list] + + assert any("beforeLease hook completed successfully" in call for call in info_calls), ( + "Expected 'beforeLease hook completed successfully' at INFO level" + ) + + assert any("Executing before-lease hook for lease" in call for call in info_calls), ( + "Expected 'Executing before-lease hook' at INFO level" + ) + async def test_after_hook_warn_includes_warning_prefix(self, lease_scope) -> None: """Issue E5b: afterLease hook fail with warn should include HOOK_WARNING_PREFIX.