From 686d0e15eeb4ff6dafa741ceac7f360f6c8df930 Mon Sep 17 00:00:00 2001 From: Benny Zlotnik Date: Tue, 14 Apr 2026 10:48:35 +0300 Subject: [PATCH 1/3] qemu: add OCI flashing to qemu driver Signed-off-by: Benny Zlotnik Assisted-by: claude-opus-4.6 --- .../jumpstarter_driver_qemu/client.py | 31 ++ .../jumpstarter_driver_qemu/driver.py | 145 +++++- .../jumpstarter_driver_qemu/driver_test.py | 434 +++++++++++++++++- 3 files changed, 605 insertions(+), 5 deletions(-) diff --git a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/client.py b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/client.py index 92adcfea5..9bfd84ade 100644 --- a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/client.py +++ b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/client.py @@ -1,8 +1,28 @@ +import sys from contextlib import contextmanager import click from jumpstarter_driver_composite.client import CompositeClient from jumpstarter_driver_network.adapters import FabricAdapter, NovncAdapter +from jumpstarter_driver_opendal.client import FlasherClient + + +class QemuFlasherClient(FlasherClient): + """Flasher client for QEMU with OCI support via fls.""" + + def flash(self, path, *, target=None, operator=None, compression=None): + if isinstance(path, str) and path.startswith("oci://"): + returncode = 0 + for stdout, stderr, code in self.streamingcall("flash_oci", path, target): + if stdout: + print(stdout, end="", flush=True) + if stderr: + print(stderr, end="", file=sys.stderr, flush=True) + if code is not None: + returncode = code + return returncode + + return super().flash(path, target=target, operator=operator, compression=compression) class QemuClient(CompositeClient): @@ -26,6 +46,17 @@ def set_memory_size(self, size: str) -> None: """Set the memory size for next boot.""" self.call("set_memory_size", size) + def flash_oci(self, oci_url: str, partition: str | None = None): + """Flash an OCI image to the specified partition using fls. + + Convenience method that delegates to self.flasher.flash(). + + Args: + oci_url: OCI image reference (must start with oci://) + partition: Target partition name (default: root) + """ + return self.flasher.flash(oci_url, target=partition) + @contextmanager def novnc(self): with NovncAdapter(client=self.vnc) as url: diff --git a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py index 833f7209c..8cd940a11 100644 --- a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py +++ b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver.py @@ -1,5 +1,6 @@ from __future__ import annotations +import asyncio import json import logging import os @@ -25,6 +26,7 @@ from qemu.qmp import QMPClient from qemu.qmp.protocol import ConnectError, Runstate +from jumpstarter.common.fls import get_fls_binary from jumpstarter.driver import Driver, export from jumpstarter.streams.encoding import AutoDecompressIterator @@ -44,23 +46,158 @@ def filter(self, record): return False +async def _read_pipe(stream: asyncio.StreamReader, name: str, queue: asyncio.Queue): + while True: + line = await stream.readline() + if not line: + break + await queue.put((name, line.decode("utf-8", errors="replace"))) + await queue.put((name, None)) + + @dataclass(kw_only=True) class QemuFlasher(FlasherInterface, Driver): parent: Qemu + @classmethod + def client(cls) -> str: + return "jumpstarter_driver_qemu.client.QemuFlasherClient" + @export async def flash(self, source, partition: str | None = None): """Flash an image to the specified partition. + Accepts OCI image references (oci://...) or streamed image data. Supports transparent decompression of gzip, xz, bz2, and zstd compressed images. Compression format is auto-detected from file signature. """ + if isinstance(source, str) and source.startswith("oci://"): + async for _ in self.flash_oci(source, partition): + pass + return + async with await FileWriteStream.from_path(self.parent.validate_partition(partition)) as stream: async with self.resource(source) as res: # Wrap with auto-decompression to handle .gz, .xz, .bz2, .zstd files async for chunk in AutoDecompressIterator(source=res): await stream.send(chunk) + @export + async def flash_oci( + self, + oci_url: str, + partition: str | None = None, + oci_username: str | None = None, + oci_password: str | None = None, + ) -> AsyncGenerator[tuple[str, str, int | None], None]: + """Flash an OCI image to the specified partition using fls. + + Streams subprocess output back to the caller as it arrives. + Yields (stdout_chunk, stderr_chunk, returncode) tuples. + returncode is None until the process completes. + + Args: + oci_url: OCI image reference (must start with oci://) + partition: Target partition name (default: root) + oci_username: Registry username for OCI authentication + oci_password: Registry password for OCI authentication + """ + if not oci_url.startswith("oci://"): + raise ValueError(f"OCI URL must start with oci://, got: {oci_url}") + + # Fall back to environment variables for credentials + if not oci_username: + oci_username = os.environ.get("OCI_USERNAME") + if not oci_password: + oci_password = os.environ.get("OCI_PASSWORD") + + if bool(oci_username) != bool(oci_password): + raise ValueError("OCI authentication requires both username and password") + + target_path = str(self.parent.validate_partition(partition)) + + fls_binary = get_fls_binary( + fls_version=self.parent.fls_version, + fls_binary_url=self.parent.fls_custom_binary_url, + allow_custom_binaries=self.parent.fls_allow_custom_binaries, + ) + + fls_cmd = [fls_binary, "from-url", oci_url, target_path] + + fls_env = None + if oci_username and oci_password: + fls_env = os.environ.copy() + fls_env["FLS_REGISTRY_USERNAME"] = oci_username + fls_env["FLS_REGISTRY_PASSWORD"] = oci_password + + self.logger.info(f"Running fls: {' '.join(fls_cmd)}") + + try: + async for chunk in self._stream_subprocess(fls_cmd, fls_env): + yield chunk + except FileNotFoundError: + raise RuntimeError("fls command not found. Install fls or configure fls_version in the driver.") from None + + async def _stream_subprocess( + self, cmd: list[str], env: dict[str, str] | None + ) -> AsyncGenerator[tuple[str, str, int | None], None]: + """Run a subprocess and yield (stdout, stderr, returncode) tuples as output arrives.""" + process = await asyncio.create_subprocess_exec( # ty: ignore[missing-argument] + *cmd, + stdout=asyncio.subprocess.PIPE, # ty: ignore[unresolved-attribute] + stderr=asyncio.subprocess.PIPE, # ty: ignore[unresolved-attribute] + env=env, + ) + + output_queue: asyncio.Queue[tuple[str, str | None]] = asyncio.Queue() + + tasks = [ + asyncio.create_task(_read_pipe(process.stdout, "stdout", output_queue)), + asyncio.create_task(_read_pipe(process.stderr, "stderr", output_queue)), + ] + + finished_streams = 0 + start_time = asyncio.get_running_loop().time() + + try: + while finished_streams < 2: + elapsed = asyncio.get_running_loop().time() - start_time + if elapsed >= self.parent.flash_timeout: + process.kill() + await process.wait() + raise RuntimeError(f"fls flash timed out after {self.parent.flash_timeout}s") + + remaining = self.parent.flash_timeout - elapsed + try: + name, text = await asyncio.wait_for(output_queue.get(), timeout=min(remaining, 30)) + except asyncio.TimeoutError: + continue + + if text is None: + finished_streams += 1 + continue + + stdout_chunk = text if name == "stdout" else "" + stderr_chunk = text if name == "stderr" else "" + yield stdout_chunk, stderr_chunk, None + + await process.wait() + returncode = process.returncode + + if returncode != 0: + self.logger.error(f"fls failed - return code: {returncode}") + raise RuntimeError(f"fls flash failed (return code {returncode})") + + self.logger.info("OCI flash completed successfully") + yield "", "", returncode + finally: + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + if process.returncode is None: + process.kill() + await process.wait() + @export async def dump(self, target, partition: str | None = None): async with await FileReadStream.from_path( @@ -300,6 +437,12 @@ class Qemu(Driver): hostfwd: dict[str, Hostfwd] = field(default_factory=dict) + # FLS configuration for OCI flashing + fls_version: str | None = field(default=None) + fls_allow_custom_binaries: bool = field(default=False) + fls_custom_binary_url: str | None = field(default=None) + flash_timeout: int = field(default=30 * 60) # 30 minutes + _tmp_dir: TemporaryDirectory = field(init=False, default_factory=TemporaryDirectory) @classmethod @@ -357,7 +500,7 @@ def validate_partition( case "bios": path = Path(self._tmp_dir.name) / "bios" case _: - raise ValueError(f"invalida partition name: {partition}") + raise ValueError(f"invalid partition name: {partition}") if not path.exists() and partition in self.default_partitions and use_default_partitions: return self.default_partitions[partition] diff --git a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py index c9246e504..d3d1fb2c9 100644 --- a/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py +++ b/python/packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py @@ -1,3 +1,4 @@ +import asyncio import json import os import platform @@ -5,13 +6,13 @@ import tarfile from pathlib import Path from types import SimpleNamespace -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest import requests from opendal import Operator -from jumpstarter_driver_qemu.driver import Qemu +from jumpstarter_driver_qemu.driver import Qemu, QemuFlasher from jumpstarter.common.utils import serve @@ -48,12 +49,12 @@ def get_native_arch_config(): elif native_arch == "aarch64": return "aarch64", "aarch64" else: - pytest.skip(f"Unsupported architecture: {native_arch}") # ty: ignore[call-non-callable] + pytest.skip(f"Unsupported architecture: {native_arch}") # ty: ignore[call-non-callable] @pytest.mark.xfail( platform.system() == "Darwin" and os.getenv("GITHUB_ACTIONS") == "true", - reason="QEMU tests are flaky on macOS in GitHub CI" + reason="QEMU tests are flaky on macOS in GitHub CI", ) def test_driver_qemu(tmp_path, ovmf): arch, ovmf_arch = get_native_arch_config() @@ -123,12 +124,14 @@ def _create(disk_size, current_size_gb): def _mock_qemu_img_info(virtual_size): """Return a mock for run_process that simulates qemu-img info.""" + async def mock(cmd, **kwargs): result = AsyncMock() result.returncode = 0 result.stdout = json.dumps({"format": "raw", "virtual-size": virtual_size}).encode() result.check_returncode = lambda: None return result + return mock @@ -201,3 +204,426 @@ def test_set_memory_size_invalid(): driver = Qemu() with pytest.raises(ValueError, match="Invalid size"): driver.set_memory_size("invalid") + + +# OCI Flash Tests + + +def _create_mock_process(stdout_lines=None, stderr_lines=None, returncode=0): + """Create a mock asyncio subprocess process for testing flash_oci.""" + if stdout_lines is None: + stdout_lines = [] + if stderr_lines is None: + stderr_lines = [] + + process = MagicMock() + process.returncode = returncode + process.wait = AsyncMock(return_value=returncode) + process.kill = MagicMock() + + stdout_data = [line.encode() if isinstance(line, str) else line for line in stdout_lines] + [b""] + stdout_stream = MagicMock() + stdout_stream.readline = AsyncMock(side_effect=stdout_data) + process.stdout = stdout_stream + + stderr_data = [line.encode() if isinstance(line, str) else line for line in stderr_lines] + [b""] + stderr_stream = MagicMock() + stderr_stream.readline = AsyncMock(side_effect=stderr_data) + process.stderr = stderr_stream + + return process + + +async def _collect_flash_oci(flasher, *args, **kwargs): + """Collect all output from flash_oci async generator.""" + results = [] + async for chunk in flasher.flash_oci(*args, **kwargs): + results.append(chunk) + return results + + +@pytest.mark.anyio +async def test_flash_oci_success(): + """flash_oci should invoke fls from-url with the correct arguments.""" + driver = Qemu() + flasher = driver.children["flasher"] + expected_target = str(Path(driver._tmp_dir.name) / "root") + mock_process = _create_mock_process(stdout_lines=["Flashing complete\n"]) + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="/usr/local/bin/fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process) as mock_exec: + results = await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + # Verify final chunk has returncode 0 + assert any(r[2] == 0 for r in results) + + mock_exec.assert_called_once() + call_args = mock_exec.call_args + assert call_args.args[0] == "/usr/local/bin/fls" + assert call_args.args[1] == "from-url" + assert call_args.args[2] == "oci://quay.io/org/image:tag" + assert call_args.args[3] == expected_target + + +@pytest.mark.anyio +async def test_flash_oci_with_partition(): + """flash_oci should write to the correct partition path.""" + driver = Qemu() + flasher = driver.children["flasher"] + expected_target = str(Path(driver._tmp_dir.name) / "bios") + mock_process = _create_mock_process() + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process) as mock_exec: + await _collect_flash_oci(flasher, "oci://quay.io/org/bios:v1", partition="bios") + + assert mock_exec.call_args.args[3] == expected_target + + +@pytest.mark.anyio +async def test_flash_oci_with_credentials(): + """OCI credentials should be passed via env vars, not command args.""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process() + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process) as mock_exec: + await _collect_flash_oci( + flasher, + "oci://quay.io/private/image:tag", + oci_username="myuser", + oci_password="mypass", + ) + + # Credentials should NOT appear in command args + assert "myuser" not in mock_exec.call_args.args + assert "mypass" not in mock_exec.call_args.args + + # Credentials should be in env vars + env = mock_exec.call_args.kwargs["env"] + assert env["FLS_REGISTRY_USERNAME"] == "myuser" + assert env["FLS_REGISTRY_PASSWORD"] == "mypass" + + +@pytest.mark.anyio +async def test_flash_oci_no_credentials(): + """Without credentials, env should be None (inherit parent env).""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process() + + # Ensure OCI env vars are not set so driver doesn't pick them up + env_clean = {k: v for k, v in os.environ.items() if k not in ("OCI_USERNAME", "OCI_PASSWORD")} + with patch.dict(os.environ, env_clean, clear=True): + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch( + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process + ) as mock_exec: + await _collect_flash_oci(flasher, "oci://quay.io/public/image:tag") + + env = mock_exec.call_args.kwargs["env"] + assert env is None + + +@pytest.mark.anyio +async def test_flash_oci_credentials_from_env(): + """flash_oci should read OCI_USERNAME/OCI_PASSWORD from env when not explicitly provided.""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process() + + with patch.dict(os.environ, {"OCI_USERNAME": "envuser", "OCI_PASSWORD": "envpass"}): + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch( + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process + ) as mock_exec: + await _collect_flash_oci(flasher, "oci://quay.io/private/image:tag") + + env = mock_exec.call_args.kwargs["env"] + assert env["FLS_REGISTRY_USERNAME"] == "envuser" + assert env["FLS_REGISTRY_PASSWORD"] == "envpass" + + +@pytest.mark.anyio +async def test_flash_oci_explicit_credentials_override_env(): + """Explicit credentials should take precedence over env vars.""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process() + + with patch.dict(os.environ, {"OCI_USERNAME": "envuser", "OCI_PASSWORD": "envpass"}): + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch( + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process + ) as mock_exec: + await _collect_flash_oci( + flasher, + "oci://quay.io/private/image:tag", + oci_username="explicit_user", + oci_password="explicit_pass", + ) + + env = mock_exec.call_args.kwargs["env"] + assert env["FLS_REGISTRY_USERNAME"] == "explicit_user" + assert env["FLS_REGISTRY_PASSWORD"] == "explicit_pass" + + +@pytest.mark.anyio +async def test_flash_oci_streams_output(): + """flash_oci should yield stdout and stderr chunks as they arrive.""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process( + stdout_lines=["downloading layer 1\n", "downloading layer 2\n"], + stderr_lines=["progress: 50%\n", "progress: 100%\n"], + ) + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + results = await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + # Should have received streaming output plus the final returncode chunk + stdout_chunks = [r[0] for r in results if r[0]] + stderr_chunks = [r[1] for r in results if r[1]] + assert len(stdout_chunks) > 0 + assert len(stderr_chunks) > 0 + assert any(r[2] == 0 for r in results) + + +@pytest.mark.anyio +async def test_flash_oci_rejects_non_oci_url(): + """URLs without oci:// prefix should be rejected.""" + driver = Qemu() + flasher = driver.children["flasher"] + assert isinstance(flasher, QemuFlasher) + + with pytest.raises(ValueError, match="OCI URL must start with oci://"): + async for _ in flasher.flash_oci("docker://image:tag"): + pass + + with pytest.raises(ValueError, match="OCI URL must start with oci://"): + async for _ in flasher.flash_oci("quay.io/org/image:tag"): + pass + + +@pytest.mark.anyio +async def test_flash_oci_partial_credentials_rejected(): + """Providing only username or only password should be rejected.""" + driver = Qemu() + flasher = driver.children["flasher"] + assert isinstance(flasher, QemuFlasher) + + with pytest.raises(ValueError, match="OCI authentication requires both"): + async for _ in flasher.flash_oci("oci://image:tag", oci_username="user", oci_password=None): + pass + + with pytest.raises(ValueError, match="OCI authentication requires both"): + async for _ in flasher.flash_oci("oci://image:tag", oci_username=None, oci_password="pass"): + pass + + +@pytest.mark.anyio +async def test_flash_oci_fls_failure(): + """Non-zero return code from fls should raise RuntimeError.""" + driver = Qemu() + flasher = driver.children["flasher"] + mock_process = _create_mock_process(returncode=1) + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + with pytest.raises(RuntimeError, match="fls flash failed"): + await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + +@pytest.mark.anyio +async def test_flash_oci_fls_timeout(): + """Flash should raise RuntimeError when timeout is exceeded.""" + driver = Qemu(flash_timeout=0) # Immediate timeout + flasher = driver.children["flasher"] + + async def hanging_readline(): + await asyncio.sleep(10) + return b"" + + mock_process = MagicMock() + mock_process.returncode = None + + async def mock_wait(): + mock_process.returncode = -9 + return -9 + + mock_process.wait = mock_wait + mock_process.kill = MagicMock() + + stdout_stream = MagicMock() + stdout_stream.readline = hanging_readline + mock_process.stdout = stdout_stream + + stderr_stream = MagicMock() + stderr_stream.readline = hanging_readline + mock_process.stderr = stderr_stream + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + with pytest.raises(RuntimeError, match="fls flash timed out"): + await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + mock_process.kill.assert_called_once() + + +@pytest.mark.anyio +async def test_flash_oci_inner_wait_timeout(): + """Inner wait_for timeout should continue the loop without raising.""" + driver = Qemu(flash_timeout=600) + flasher = driver.children["flasher"] + mock_process = _create_mock_process(stdout_lines=["output\n"]) + + original_wait_for = asyncio.wait_for + timeout_fired = False + + async def mock_wait_for(awaitable, *, timeout): + nonlocal timeout_fired + if not timeout_fired: # ty: ignore[unresolved-reference] + timeout_fired = True + if hasattr(awaitable, "close"): + awaitable.close() + raise asyncio.TimeoutError() + return await original_wait_for(awaitable, timeout=timeout) + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + with patch("asyncio.wait_for", mock_wait_for): + results = await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + assert timeout_fired + assert any(r[2] == 0 for r in results) + + +@pytest.mark.anyio +async def test_flash_oci_process_cleanup_on_early_exit(): + """Finally block should kill process when generator is abandoned early.""" + driver = Qemu() + flasher = driver.children["flasher"] + + mock_process = MagicMock() + mock_process.returncode = None + + async def mock_wait(): + mock_process.returncode = 0 + return 0 + + mock_process.wait = mock_wait + mock_process.kill = MagicMock() + + stdout_stream = MagicMock() + stdout_stream.readline = AsyncMock(side_effect=[b"line1\n", b"line2\n", b""]) + mock_process.stdout = stdout_stream + + stderr_stream = MagicMock() + stderr_stream.readline = AsyncMock(side_effect=[b""]) + mock_process.stderr = stderr_stream + + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + gen = flasher._stream_subprocess(["fls", "from-url", "oci://img", "/tmp/root"], None) # ty: ignore[unresolved-attribute] + async for _ in gen: + break + await gen.aclose() + + mock_process.kill.assert_called() + + +@pytest.mark.anyio +async def test_flash_oci_fls_not_found(): + """FileNotFoundError should raise RuntimeError with install hint.""" + driver = Qemu() + flasher = driver.children["flasher"] + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, side_effect=FileNotFoundError): + with pytest.raises(RuntimeError, match="fls command not found"): + await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + +@pytest.mark.anyio +async def test_flash_oci_uses_fls_config(): + """flash_oci should pass fls config from parent Qemu driver.""" + driver = Qemu(fls_version="0.2.0") + flasher = driver.children["flasher"] + mock_process = _create_mock_process() + + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls") as mock_get: + with patch("asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process): + await _collect_flash_oci(flasher, "oci://quay.io/org/image:tag") + + mock_get.assert_called_once_with( + fls_version="0.2.0", + fls_binary_url=None, + allow_custom_binaries=False, + ) + + +@pytest.mark.anyio +async def test_flash_oci_invalid_partition(): + """Invalid partition names should raise ValueError.""" + driver = Qemu() + flasher = driver.children["flasher"] + assert isinstance(flasher, QemuFlasher) + + with pytest.raises(ValueError, match="invalid partition name"): + async for _ in flasher.flash_oci("oci://image:tag", partition="nonexistent"): + pass + + +# OCI Client Integration Tests + + +def test_flash_oci_via_flasher_client(): + """flasher.flash('oci://...') should route through flash_oci on the driver.""" + mock_process = _create_mock_process(stdout_lines=["done\n"]) + + with serve(Qemu()) as qemu: + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch( + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process + ) as mock_exec: + qemu.flasher.flash("oci://quay.io/org/image:tag") + + mock_exec.assert_called_once() + assert mock_exec.call_args.args[1] == "from-url" + assert mock_exec.call_args.args[2] == "oci://quay.io/org/image:tag" + + +def test_flash_oci_convenience_method(): + """qemu.flash_oci() should delegate to flasher.flash().""" + mock_process = _create_mock_process() + + with serve(Qemu()) as qemu: + with patch("jumpstarter_driver_qemu.driver.get_fls_binary", return_value="fls"): + with patch( + "asyncio.create_subprocess_exec", new_callable=AsyncMock, return_value=mock_process + ) as mock_exec: + qemu.flash_oci("oci://quay.io/org/image:tag", partition="bios") + + mock_exec.assert_called_once() + assert mock_exec.call_args.args[1] == "from-url" + assert mock_exec.call_args.args[2] == "oci://quay.io/org/image:tag" + assert Path(mock_exec.call_args.args[3]).name == "bios" + + +@pytest.mark.anyio +async def test_flash_routes_oci_to_flash_oci(): + """Driver-side flash() should detect oci:// URLs and route to flash_oci.""" + driver = Qemu() + flasher = driver.children["flasher"] + assert isinstance(flasher, QemuFlasher) + + async def mock_generator(*args, **kwargs): + yield "", "", 0 + + mock = MagicMock(side_effect=mock_generator) + + with patch.object(flasher, "flash_oci", mock): + await flasher.flash("oci://quay.io/org/image:tag", partition="root") + + mock.assert_called_once_with("oci://quay.io/org/image:tag", "root") From cc545a3ef5738a970017033d12ed2643126ed83e Mon Sep 17 00:00:00 2001 From: Benny Zlotnik Date: Fri, 17 Apr 2026 09:22:24 +0300 Subject: [PATCH 2/3] fix: resolve diff-cover path resolution and add OCI flash edge-case tests Change --cov to --cov=. in pytest addopts so coverage.xml includes an absolute source root, allowing diff-cover to correctly match coverage data to git diff paths in the monorepo. Without this, diff-cover fell back to low-coverage data from dependent packages that only import the module. Remove bare --cov from CI PYTEST_ADDOPTS to prevent cross-package coverage contamination. The bare flag caused every package test run to record coverage for all imported modules, polluting diff-cover results. Add tests for the inner wait_for timeout handler and process cleanup on early generator exit in _stream_subprocess. Signed-off-by: Benny Zlotnik Assisted-by: claude-opus-4.6 --- .github/workflows/python-tests.yaml | 16 ++++++++-------- python/pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 9 deletions(-) diff --git a/.github/workflows/python-tests.yaml b/.github/workflows/python-tests.yaml index ef127eb79..6d4e15418 100644 --- a/.github/workflows/python-tests.yaml +++ b/.github/workflows/python-tests.yaml @@ -118,20 +118,20 @@ jobs: - name: Run pytest working-directory: python env: - PYTEST_ADDOPTS: "--cov --cov-report=xml" + PYTEST_ADDOPTS: "--cov-report=xml" run: | - make test + make test - name: Check coverage on changed lines if: github.event_name == 'pull_request' working-directory: python run: | - coverage_files=$(find packages -name coverage.xml 2>/dev/null | sort) - if [ -z "$coverage_files" ]; then - echo "::error::No coverage.xml files found" - exit 1 - fi - uv run diff-cover $coverage_files --compare-branch=origin/${{ github.base_ref }} --fail-under=80 + coverage_files=$(find packages -name coverage.xml 2>/dev/null | sort) + if [ -z "$coverage_files" ]; then + echo "::error::No coverage.xml files found" + exit 1 + fi + uv run diff-cover $coverage_files --compare-branch=origin/${{ github.base_ref }} --fail-under=80 # https://github.com/orgs/community/discussions/26822 pytest: diff --git a/python/pyproject.toml b/python/pyproject.toml index d09b7726d..feb4c47b9 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -106,7 +106,7 @@ omit = ["conftest.py", "test_*.py", "*_test.py", "*_pb2.py", "*_pb2_grpc.py"] skip_empty = true [tool.pytest.ini_options] -addopts = "--capture=no --doctest-modules --cov --cov-report=html --cov-report=xml" +addopts = "--capture=no --doctest-modules --cov=. --cov-report=html --cov-report=xml" [tool.hatch.version] source = "vcs" From 71960a96974e7becf50d2e9f4219c0c4853c0525 Mon Sep 17 00:00:00 2001 From: Benny Zlotnik Date: Sun, 19 Apr 2026 13:37:07 +0300 Subject: [PATCH 3/3] use musl on aarch64 Update Dockerfile and get_fls_github_url() to use fls-aarch64-linux-musl binary name, matching the actual fls release asset naming convention. Signed-off-by: Benny Zlotnik --- python/Dockerfile | 6 ++++-- python/packages/jumpstarter/jumpstarter/common/fls.py | 4 ++-- .../jumpstarter/jumpstarter/common/fls_test.py | 10 +++++----- 3 files changed, 11 insertions(+), 9 deletions(-) diff --git a/python/Dockerfile b/python/Dockerfile index f8bc5f0c9..25111c8f6 100644 --- a/python/Dockerfile +++ b/python/Dockerfile @@ -14,8 +14,10 @@ COPY --from=ghcr.io/astral-sh/uv:latest /uv /uvx /bin/ ARG FLS_VERSION=0.2.0 RUN ARCH=$(uname -m) && \ - URL="https://github.com/jumpstarter-dev/fls/releases/download/${FLS_VERSION}/fls-${ARCH}-linux" && \ - TEMP_FILE="/tmp/fls-${ARCH}-linux.tmp" && \ + SUFFIX="linux" && \ + if [ "$ARCH" = "aarch64" ]; then SUFFIX="linux-musl"; fi && \ + URL="https://github.com/jumpstarter-dev/fls/releases/download/${FLS_VERSION}/fls-${ARCH}-${SUFFIX}" && \ + TEMP_FILE="/tmp/fls-${ARCH}-${SUFFIX}.tmp" && \ curl -fsSL "${URL}" -o "${TEMP_FILE}" && \ mv "${TEMP_FILE}" /usr/local/bin/fls && \ chmod +x /usr/local/bin/fls diff --git a/python/packages/jumpstarter/jumpstarter/common/fls.py b/python/packages/jumpstarter/jumpstarter/common/fls.py index 0f61f7885..5dce0ea45 100644 --- a/python/packages/jumpstarter/jumpstarter/common/fls.py +++ b/python/packages/jumpstarter/jumpstarter/common/fls.py @@ -35,11 +35,11 @@ def get_fls_github_url(version: str, arch: str | None = None) -> str: else: arch = arch.lower() if arch in ("aarch64", "arm64"): - binary_name = "fls-aarch64-linux" + binary_name = "fls-aarch64-linux-musl" elif arch in ("x86_64", "amd64"): binary_name = "fls-x86_64-linux" else: - binary_name = "fls-aarch64-linux" # Default to aarch64 + binary_name = "fls-aarch64-linux-musl" return f"https://github.com/{FLS_GITHUB_REPO}/releases/download/{version}/{binary_name}" diff --git a/python/packages/jumpstarter/jumpstarter/common/fls_test.py b/python/packages/jumpstarter/jumpstarter/common/fls_test.py index 5e5dd9022..d00173195 100644 --- a/python/packages/jumpstarter/jumpstarter/common/fls_test.py +++ b/python/packages/jumpstarter/jumpstarter/common/fls_test.py @@ -8,11 +8,11 @@ @pytest.mark.parametrize( "arch,version,expected_binary", [ - ("aarch64", "0.1.9", "fls-aarch64-linux"), - ("arm64", "0.1.9", "fls-aarch64-linux"), + ("aarch64", "0.1.9", "fls-aarch64-linux-musl"), + ("arm64", "0.1.9", "fls-aarch64-linux-musl"), ("x86_64", "0.2.0", "fls-x86_64-linux"), ("amd64", "0.2.0", "fls-x86_64-linux"), - ("unknown", "0.1.9", "fls-aarch64-linux"), # defaults to aarch64 + ("unknown", "0.1.9", "fls-aarch64-linux-musl"), # defaults to aarch64 ], ) def test_get_fls_github_url_auto_detect(arch, version, expected_binary): @@ -25,8 +25,8 @@ def test_get_fls_github_url_auto_detect(arch, version, expected_binary): @pytest.mark.parametrize( "arch,version,expected_binary", [ - ("aarch64", "0.1.9", "fls-aarch64-linux"), - ("AARCH64", "0.1.9", "fls-aarch64-linux"), # case insensitive + ("aarch64", "0.1.9", "fls-aarch64-linux-musl"), + ("AARCH64", "0.1.9", "fls-aarch64-linux-musl"), # case insensitive ("x86_64", "0.2.0", "fls-x86_64-linux"), ], )