Skip to content
Draft
256 changes: 255 additions & 1 deletion python/packages/jumpstarter/jumpstarter/client/core_test.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
"""Tests for AsyncDriverClient async status methods."""

import logging
from dataclasses import dataclass
from unittest.mock import AsyncMock, MagicMock
from uuid import uuid4

import anyio
import pytest
from grpc import StatusCode
from grpc.aio import AioRpcError

from jumpstarter.common import ExporterStatus, Metadata
from jumpstarter.common import ExporterStatus, LogSource, Metadata

pytestmark = pytest.mark.anyio

Expand Down Expand Up @@ -293,3 +295,255 @@ async def test_wait_for_hook_status_unimplemented_returns_true(self) -> None:

# Should return True (backward compatibility - assume hook complete)
assert result is True


def create_log_stream_response(message: str, severity: str = "INFO", source=None):
"""Create a mock LogStreamResponse."""
response = MagicMock()
response.message = message
response.severity = severity
if source is not None:
response.HasField = lambda field: field == "source"
response.source = source.to_proto()
else:
response.HasField = lambda field: False
response.source = None
return response


class LogCapture(logging.Handler):
"""Captures log records for assertion in tests."""

def __init__(self):
super().__init__()
self.records: list[logging.LogRecord] = []

def emit(self, record):
self.records.append(record)


SOURCE_LOGGER_NAMES = [
"exporter:beforeLease",
"exporter:afterLease",
"exporter:driver",
"exporter:system",
]


def setup_log_stream_client(responses):
from jumpstarter.client.core import AsyncDriverClient

all_delivered = anyio.Event()

async def mock_log_stream(_):
for r in responses:
yield r
all_delivered.set()
await anyio.sleep_forever()

stub = MagicMock()
stub.LogStream = mock_log_stream

client = MagicMock(spec=AsyncDriverClient)
client.stub = stub
client.logger = logging.getLogger("test_log_stream_client")
client.log_stream_async = AsyncDriverClient.log_stream_async.__get__(client)

captures = {}
for name in SOURCE_LOGGER_NAMES:
logger = logging.getLogger(name)
logger.setLevel(logging.DEBUG)
logger.handlers.clear()
capture = LogCapture()
logger.addHandler(capture)
captures[name] = capture

return client, captures, all_delivered


@pytest.fixture(autouse=True)
def _clean_source_loggers():
yield
for name in SOURCE_LOGGER_NAMES:
logger = logging.getLogger(name)
logger.handlers.clear()


class TestLogStreamResponseFactory:
def test_create_response_without_source(self) -> None:
response = create_log_stream_response("plain message")
assert response.message == "plain message"
assert response.severity == "INFO"
assert response.HasField("source") is False
assert response.source is None

def test_create_response_with_source(self) -> None:
response = create_log_stream_response(
"hook message", severity="DEBUG", source=LogSource.BEFORE_LEASE_HOOK
)
assert response.message == "hook message"
assert response.severity == "DEBUG"
assert response.HasField("source") is True


class TestLogStreamSourceTagPlacement:
async def test_hook_log_delegates_tagging_to_formatter(self) -> None:
responses = [
create_log_stream_response(
"hook output line",
severity="INFO",
source=LogSource.BEFORE_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=True):
with anyio.fail_after(2):
await delivered.wait()

records = captures["exporter:beforeLease"].records
assert len(records) == 1
assert records[0].getMessage() == "hook output line"
assert records[0].name == "exporter:beforeLease"

async def test_after_lease_hook_log_delegates_tagging_to_formatter(self) -> None:
responses = [
create_log_stream_response(
"cleanup output",
severity="INFO",
source=LogSource.AFTER_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=True):
with anyio.fail_after(2):
await delivered.wait()

records = captures["exporter:afterLease"].records
assert len(records) == 1
assert records[0].getMessage() == "cleanup output"
assert records[0].name == "exporter:afterLease"

async def test_logger_name_carries_source_for_formatter(self) -> None:
responses = [
create_log_stream_response(
"line one",
severity="INFO",
source=LogSource.BEFORE_LEASE_HOOK,
),
create_log_stream_response(
"line two",
severity="INFO",
source=LogSource.AFTER_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=True):
with anyio.fail_after(2):
await delivered.wait()

before_records = captures["exporter:beforeLease"].records
after_records = captures["exporter:afterLease"].records
assert len(before_records) == 1
assert before_records[0].name == "exporter:beforeLease"
assert before_records[0].getMessage() == "line one"
assert len(after_records) == 1
assert after_records[0].name == "exporter:afterLease"
assert after_records[0].getMessage() == "line two"


class TestLogStreamFiltering:
async def test_show_all_logs_false_filters_system_logs(self) -> None:
responses = [
create_log_stream_response(
"debug system message",
severity="DEBUG",
source=LogSource.SYSTEM,
),
create_log_stream_response(
"hook output line",
severity="INFO",
source=LogSource.BEFORE_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=False):
with anyio.fail_after(2):
await delivered.wait()

system_records = captures["exporter:system"].records
hook_records = captures["exporter:beforeLease"].records
assert len(system_records) == 0
assert len(hook_records) == 1

async def test_show_all_logs_false_shows_hook_logs(self) -> None:
responses = [
create_log_stream_response(
"before hook output",
severity="INFO",
source=LogSource.BEFORE_LEASE_HOOK,
),
create_log_stream_response(
"after hook output",
severity="INFO",
source=LogSource.AFTER_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=False):
with anyio.fail_after(2):
await delivered.wait()

before_records = captures["exporter:beforeLease"].records
after_records = captures["exporter:afterLease"].records
assert len(before_records) == 1
assert len(after_records) == 1

async def test_show_all_logs_true_shows_system_logs(self) -> None:
responses = [
create_log_stream_response(
"system message",
severity="INFO",
source=LogSource.SYSTEM,
),
create_log_stream_response(
"hook output line",
severity="INFO",
source=LogSource.BEFORE_LEASE_HOOK,
),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=True):
with anyio.fail_after(2):
await delivered.wait()

system_records = captures["exporter:system"].records
hook_records = captures["exporter:beforeLease"].records
assert len(system_records) == 1
assert len(hook_records) == 1

async def test_log_without_source_routes_to_system(self) -> None:
responses = [
create_log_stream_response("no source message"),
]

client, captures, delivered = setup_log_stream_client(responses)

async with client.log_stream_async(show_all_logs=True):
with anyio.fail_after(2):
await delivered.wait()

system_records = captures["exporter:system"].records
assert len(system_records) == 1
assert system_records[0].getMessage() == "no source message"
7 changes: 5 additions & 2 deletions python/packages/jumpstarter/jumpstarter/exporter/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@

logger = logging.getLogger(__name__)

DEFAULT_SAFETY_TIMEOUT_SECONDS = 15
HOOK_TIMEOUT_MARGIN_SECONDS = 30


async def _standalone_shutdown_waiter():
"""Wait forever; used so serve_standalone_tcp can be cancelled by stop()."""
Expand Down Expand Up @@ -601,12 +604,12 @@ async def _cleanup_after_lease(self, lease_scope: LeaseContext) -> None:
# was never set due to a race (e.g. conn_tg cancelled early).
# Use the configured hook timeout (+ margin) when available so we
# never interrupt a legitimately-running beforeLease hook.
safety_timeout = 15 # generous default for no-hook / unknown cases
safety_timeout = DEFAULT_SAFETY_TIMEOUT_SECONDS
if (
self.hook_executor
and self.hook_executor.config.before_lease
):
safety_timeout = self.hook_executor.config.before_lease.timeout + 30
safety_timeout = self.hook_executor.config.before_lease.timeout + HOOK_TIMEOUT_MARGIN_SECONDS
with move_on_after(safety_timeout) as timeout_scope:
await lease_scope.before_lease_hook.wait()
if timeout_scope.cancelled_caught:
Expand Down
2 changes: 1 addition & 1 deletion python/packages/jumpstarter/jumpstarter/exporter/hooks.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ def _create_hook_env(self, lease_scope: "LeaseContext") -> dict[str, str]:
# Falls back to main socket if hook socket not available (backward compatibility)
socket_path = lease_scope.hook_socket_path or lease_scope.socket_path
if lease_scope.hook_socket_path:
logger.info(
logger.debug(
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

test comment

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good -- demoting this socket-selection detail from INFO to DEBUG makes sense since it is purely internal plumbing and should not appear in client-visible output.

AI-generated, human reviewed

"Using dedicated hook socket: %s (main socket: %s)",
lease_scope.hook_socket_path,
lease_scope.socket_path,
Expand Down
57 changes: 57 additions & 0 deletions python/packages/jumpstarter/jumpstarter/exporter/hooks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -1147,6 +1147,63 @@ async def mock_report_status(status, msg):
f"Expected AVAILABLE status after warn+afterLease, got: {status_calls}"
)

async def test_hook_socket_message_at_debug_not_info(self, lease_scope) -> None:
"""The 'Using dedicated hook socket' message must be at DEBUG, not INFO.

This message is an internal detail about socket selection and should
not appear in client-visible hook output.
"""
lease_scope.hook_socket_path = "/tmp/hook_socket"

hook_config = HookConfigV1Alpha1(
before_lease=HookInstanceConfigV1Alpha1(script="echo 'hello'", timeout=10),
)
executor = HookExecutor(config=hook_config)

with patch("jumpstarter.exporter.hooks.logger") as mock_logger:
await executor.execute_before_lease_hook(lease_scope)

debug_calls = [str(call) for call in mock_logger.debug.call_args_list]
info_calls = [str(call) for call in mock_logger.info.call_args_list]

assert any("Using dedicated hook socket" in call for call in debug_calls), (
"Expected 'Using dedicated hook socket' at DEBUG level"
)
assert not any("Using dedicated hook socket" in call for call in info_calls), (
"'Using dedicated hook socket' should NOT be at INFO level"
)

async def test_completion_messages_at_info(self, lease_scope) -> None:
"""Hook completion messages remain at INFO for monitoring slow hooks."""
hook_config = HookConfigV1Alpha1(
before_lease=HookInstanceConfigV1Alpha1(script="echo 'hello'", timeout=10),
)
executor = HookExecutor(config=hook_config)

status_calls = []

async def mock_report_status(status, msg):
status_calls.append((status, msg))

mock_shutdown = MagicMock()

with patch("jumpstarter.exporter.hooks.logger") as mock_logger:
await executor.run_before_lease_hook(
lease_scope,
mock_report_status,
mock_shutdown,
)

info_calls = [str(call) for call in mock_logger.info.call_args_list]

assert any("beforeLease hook completed successfully" in call for call in info_calls), (
"Expected 'beforeLease hook completed successfully' at INFO level"
)

assert any("Executing before-lease hook for lease" in call for call in info_calls), (
"Expected 'Executing before-lease hook' at INFO level"
)

async def test_after_hook_warn_includes_warning_prefix(self, lease_scope) -> None:
"""Issue E5b: afterLease hook fail with warn should include HOOK_WARNING_PREFIX.

Expand Down
Loading