Skip to content
This repository was archived by the owner on Jan 23, 2026. It is now read-only.

Commit b884841

Browse files
committed
added tests
1 parent fd3c788 commit b884841

1 file changed

Lines changed: 71 additions & 0 deletions

File tree

  • packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu

packages/jumpstarter-driver-qemu/jumpstarter_driver_qemu/driver_test.py

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
1+
import json
12
import os
23
import platform
34
import sys
45
import tarfile
56
from pathlib import Path
7+
from unittest.mock import AsyncMock, patch
68

79
import pytest
810
import requests
@@ -13,6 +15,12 @@
1315
from jumpstarter.common.utils import serve
1416

1517

18+
@pytest.fixture
19+
def anyio_backend():
20+
"""Use only asyncio backend for anyio tests."""
21+
return "asyncio"
22+
23+
1624
@pytest.fixture(scope="session")
1725
def ovmf(tmpdir_factory):
1826
tmp_path = tmpdir_factory.mktemp("ovmf")
@@ -91,3 +99,66 @@ def test_driver_qemu(tmp_path, ovmf):
9199
assert s.run("uname -r").stdout.strip() == f"6.11.4-301.fc41.{arch}"
92100

93101
qemu.power.off()
102+
103+
104+
def _setup_resize_test(disk_size, current_size_gb):
105+
"""Create a Qemu driver with a sparse root disk."""
106+
driver = Qemu(disk_size=disk_size)
107+
root = Path(driver._tmp_dir.name) / "root"
108+
root.write_bytes(b"")
109+
os.truncate(root, current_size_gb * 1024**3)
110+
return driver, current_size_gb * 1024**3
111+
112+
113+
def _mock_qemu_img_info(virtual_size):
114+
"""Return a mock for run_process that simulates qemu-img info."""
115+
async def mock(cmd, **kwargs):
116+
result = AsyncMock()
117+
result.returncode = 0
118+
result.stdout = json.dumps({"format": "raw", "virtual-size": virtual_size}).encode()
119+
result.check_returncode = lambda: None
120+
return result
121+
return mock
122+
123+
124+
@pytest.mark.anyio
125+
async def test_resize_shrink_blocked():
126+
"""Shrinking disk should raise RuntimeError."""
127+
driver, current = _setup_resize_test("10G", 20) # requested: 10G, current: 20G
128+
129+
with patch("jumpstarter_driver_qemu.driver.run_process", side_effect=_mock_qemu_img_info(current)):
130+
with pytest.raises(RuntimeError, match="Shrinking disk is not supported"):
131+
await driver.children["power"].on()
132+
133+
134+
@pytest.mark.anyio
135+
async def test_resize_insufficient_space_blocked():
136+
"""Resize beyond available host space should raise RuntimeError."""
137+
driver, current = _setup_resize_test("100G", 10) # requested: 100G, current: 10G
138+
139+
mock_usage = type("U", (), {"free": 5 * 1024**3})() # only 5G free
140+
141+
with patch("jumpstarter_driver_qemu.driver.run_process", side_effect=_mock_qemu_img_info(current)):
142+
with patch("jumpstarter_driver_qemu.driver.shutil.disk_usage", return_value=mock_usage):
143+
with pytest.raises(RuntimeError, match="Not enough disk space"):
144+
await driver.children["power"].on()
145+
146+
147+
@pytest.mark.anyio
148+
async def test_resize_succeeds():
149+
"""Resize should call qemu-img resize with correct size."""
150+
driver, current = _setup_resize_test("20G", 10) # requested: 20G, current: 10G
151+
mock_usage = type("U", (), {"free": 50 * 1024**3})()
152+
153+
with patch("jumpstarter_driver_qemu.driver.run_process", side_effect=_mock_qemu_img_info(current)) as mock_run:
154+
with patch("jumpstarter_driver_qemu.driver.shutil.disk_usage", return_value=mock_usage):
155+
with patch("jumpstarter_driver_qemu.driver.Popen"):
156+
try:
157+
await driver.children["power"].on()
158+
except Exception:
159+
pass # Expected to fail at Popen
160+
161+
# Find the resize call
162+
resize_calls = [c for c in mock_run.call_args_list if "resize" in c[0][0]]
163+
assert resize_calls, "qemu-img resize should be called"
164+
assert resize_calls[0][0][0][-1] == str(20 * 1024**3)

0 commit comments

Comments
 (0)