Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cloudinit/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
# What u get if no config is provided
CFG_BUILTIN = {
"datasource_list": [
"QemuFwCfg",
"NoCloud",
"ConfigDrive",
"LXD",
Expand Down
144 changes: 144 additions & 0 deletions cloudinit/sources/DataSourceQemuFwCfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
# This file is part of cloud-init. See LICENSE file for license information.

"""DataSource for QEMU fw_cfg sysfs interface.

QEMU exposes arbitrary blobs to the guest via the fw_cfg mechanism. The
Linux ``qemu_fw_cfg`` kernel driver surfaces these blobs under
``/sys/firmware/qemu_fw_cfg/by_name/<entry-name>/raw``.

This datasource reads the following fw_cfg entries
(all under the ``opt/io.cloud-init/cloud-init/`` namespace),
in the order that cloud-init processes them:

``meta-data``
YAML dict with instance metadata (``instance-id``, ``local-hostname``, …)
``network-config``
Network configuration in v1 or v2 YAML format.
``user-data``
User-data payload (cloud-config YAML, shell script, etc.)
``vendor-data``
Vendor-data payload.

Both ``meta-data`` and ``user-data`` must be present for the datasource to
claim the instance.
"""

import logging
import os

from cloudinit import sources, util

LOG = logging.getLogger(__name__)

# Base sysfs path exposed by the qemu_fw_cfg kernel driver.
FWCFG_SYSFS = "/sys/firmware/qemu_fw_cfg/by_name"
# fw_cfg entry namespace agreed upon by the cloud-init project.
FWCFG_PREFIX = "opt/io.cloud-init/cloud-init"
FWCFG_PATH = os.path.join(FWCFG_SYSFS, FWCFG_PREFIX)

DEFAULT_IID = "iid-qemufwcfg"
DEFAULT_METADATA = {"instance-id": DEFAULT_IID}

# Slots in the order they are read and applied by cloud-init:
_SLOT_FILES = ("meta-data", "network-config", "user-data", "vendor-data")
_REQUIRED_SLOTS = frozenset({"meta-data", "user-data"})


def _read_fwcfg_slot(name: str):
"""Return raw bytes from a fw_cfg slot, or None if the slot is absent."""
raw_path = os.path.join(FWCFG_PATH, name, "raw")
try:
return util.load_binary_file(raw_path)
except FileNotFoundError:
return None
except OSError as exc:
LOG.warning("Failed to read fw_cfg slot %s: %s", name, exc)
return None


class DataSourceQemuFwCfg(sources.DataSource):
"""Read instance configuration from QEMU fw_cfg sysfs entries."""

dsname = "QemuFwCfg"

def __init__(self, sys_cfg, distro, paths):
super().__init__(sys_cfg, distro, paths)
self._network_config = None

def _unpickle(self, ci_pkl_version: int) -> None:
super()._unpickle(ci_pkl_version)
if not hasattr(self, "_network_config"):
self._network_config = None

def ds_detect(self) -> bool:
"""Return True when the fw_cfg namespace directory exists in sysfs."""
return os.path.isdir(FWCFG_PATH)

def _get_data(self) -> bool:
mydata: dict = {
"meta-data": {},
"network-config": None,
"user-data": "",
"vendor-data": "",
}
found: set = set()

for slot in _SLOT_FILES:
raw = _read_fwcfg_slot(slot)
if raw is None:
continue
found.add(slot)
text = raw.decode("utf-8", errors="replace")

if slot == "meta-data":
md = util.load_yaml(text)
if isinstance(md, dict):
mydata["meta-data"] = md
else:
LOG.warning(
"meta-data did not parse as a YAML dict: ignoring"
)
elif slot == "network-config":
nc = util.load_yaml(text)
if nc:
mydata["network-config"] = nc
else:
mydata[slot] = text

missing = _REQUIRED_SLOTS - found
if missing:
LOG.debug(
"QemuFwCfg: required slot(s) missing: %s",
", ".join(sorted(missing)),
)
return False

# DEFAULT_METADATA to fill gap if user did not specify
mydata["meta-data"] = util.mergemanydict(
[mydata["meta-data"], DEFAULT_METADATA]
)
self.metadata = mydata["meta-data"]
self._network_config = mydata["network-config"]
self.userdata_raw = mydata["user-data"]
self.vendordata_raw = mydata["vendor-data"]
return True

def _get_subplatform(self) -> str:
return "fw_cfg (%s)" % FWCFG_PATH

def _get_cloud_name(self) -> str:
return sources.METADATA_UNKNOWN

@property
def network_config(self):
return self._network_config


# QemuFwCfg only needs the filesystem (sysfs is available in the local stage).
datasources = [
(DataSourceQemuFwCfg, (sources.DEP_FILESYSTEM,)),
]


def get_datasource_list(depends):
return sources.list_from_depends(depends, datasources)
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ module = [
"cloudinit.sources.DataSourceOVF",
"cloudinit.sources.DataSourceOpenStack",
"cloudinit.sources.DataSourceOracle",
"cloudinit.sources.DataSourceQemuFwCfg",
"cloudinit.sources.DataSourceRbxCloud",
"cloudinit.sources.DataSourceScaleway",
"cloudinit.sources.DataSourceSmartOS",
Expand Down
174 changes: 174 additions & 0 deletions tests/unittests/sources/test_DataSourceQemuFwCfg.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
# This file is part of cloud-init. See LICENSE file for license information.

import logging

import pytest
import yaml

import cloudinit.sources.DataSourceQemuFwCfg as ds_mod
from cloudinit.sources.DataSourceQemuFwCfg import (
DEFAULT_IID,
DataSourceQemuFwCfg,
)


@pytest.fixture
def fwcfg_path(tmp_path, mocker):
"""Redirect FWCFG_PATH to a temporary directory and return it."""
mocker.patch.object(ds_mod, "FWCFG_PATH", str(tmp_path))
return tmp_path


def write_slot(fwcfg_path, name: str, content: bytes) -> None:
"""Write content into the ``<name>/raw`` file under fwcfg_path."""
slot_dir = fwcfg_path / name
slot_dir.mkdir(parents=True, exist_ok=True)
(slot_dir / "raw").write_bytes(content)


@pytest.fixture
def ds(paths):
return DataSourceQemuFwCfg(sys_cfg={}, distro=None, paths=paths)


class TestDsDetect:
def test_true_when_path_exists(self, fwcfg_path, ds):
assert ds.ds_detect() is True

def test_false_when_path_absent(self, tmp_path, ds, mocker):
mocker.patch.object(
ds_mod, "FWCFG_PATH", str(tmp_path / "nonexistent")
)
assert ds.ds_detect() is False


class TestGetDataRequired:
def test_both_required_slots_present(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: my-id\n")
write_slot(fwcfg_path, "user-data", b"#cloud-config\n{}")
assert ds.get_data() is True

def test_missing_meta_data_returns_false(self, fwcfg_path, ds):
write_slot(fwcfg_path, "user-data", b"#cloud-config\n{}")
assert ds.get_data() is False

def test_missing_user_data_returns_false(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: my-id\n")
assert ds.get_data() is False

def test_no_slots_returns_false(self, fwcfg_path, ds):
assert ds.get_data() is False


class TestGetDataMetadata:
def test_instance_id_from_meta_data(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: my-vm\n")
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.metadata["instance-id"] == "my-vm"

def test_get_instance_id(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: test-id-123\n")
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.get_instance_id() == "test-id-123"

def test_default_instance_id_when_absent_from_meta_data(
self, fwcfg_path, ds
):
write_slot(fwcfg_path, "meta-data", b"local-hostname: myhost\n")
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.metadata["instance-id"] == DEFAULT_IID

def test_invalid_yaml_meta_data_logs_warning_and_uses_default(
self, fwcfg_path, ds, caplog
):
# YAML list is not a dict: load_yaml returns None, slot still 'found'
write_slot(fwcfg_path, "meta-data", b"- item1\n- item2\n")
write_slot(fwcfg_path, "user-data", b"")
with caplog.at_level(logging.WARNING):
result = ds.get_data()
assert result is True
assert "did not parse as a YAML dict" in caplog.text
assert ds.metadata["instance-id"] == DEFAULT_IID

def test_extra_metadata_keys_preserved(self, fwcfg_path, ds):
md = {"instance-id": "i-1", "local-hostname": "myhost"}
write_slot(fwcfg_path, "meta-data", yaml.dump(md).encode())
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.metadata["local-hostname"] == "myhost"


class TestGetDataPayloads:
def test_user_data_stored(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"#cloud-config\npackages: [git]")
ds.get_data()
assert ds.userdata_raw == "#cloud-config\npackages: [git]"

def test_invalid_utf8_bytes_replaced(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"#cloud-config\n\xff\xfe")
ds.get_data()
assert "�" in ds.userdata_raw

def test_vendor_data_stored(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
write_slot(fwcfg_path, "vendor-data", b"#cloud-config\nruncmd: [true]")
ds.get_data()
assert ds.vendordata_raw == "#cloud-config\nruncmd: [true]"

def test_vendor_data_empty_string_when_slot_absent(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.vendordata_raw == ""


class TestGetDataNetworkConfig:
def test_network_config_parsed(self, fwcfg_path, ds):
netconf = {"version": 2, "ethernets": {"eth0": {"dhcp4": True}}}
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
write_slot(fwcfg_path, "network-config", yaml.dump(netconf).encode())
ds.get_data()
assert ds.network_config == netconf

def test_network_config_none_when_slot_absent(self, fwcfg_path, ds):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
ds.get_data()
assert ds.network_config is None

def test_network_config_invalid_yaml_treated_as_absent(
self, fwcfg_path, ds
):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
write_slot(fwcfg_path, "network-config", b"invalid: [unclosed")
ds.get_data()
assert ds.network_config is None


class TestGetDataErrors:
def test_oserror_on_slot_logs_warning_and_continues(
self, fwcfg_path, ds, caplog
):
write_slot(fwcfg_path, "meta-data", b"instance-id: i-1\n")
write_slot(fwcfg_path, "user-data", b"")
# raw as a directory triggers IsADirectoryError (OSError) on open()
(fwcfg_path / "vendor-data" / "raw").mkdir(parents=True)
with caplog.at_level(logging.WARNING):
assert ds.get_data() is True
assert "vendor-data" in caplog.text


class TestDataSourceMetadata:
def test_subplatform_format(self, ds):
assert ds.subplatform == "fw_cfg (%s)" % ds_mod.FWCFG_PATH

def test_cloud_name_is_unknown(self, ds):
assert ds.cloud_name == "unknown"
2 changes: 2 additions & 0 deletions tests/unittests/sources/test_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from cloudinit.sources import DataSourceOpenStack as OpenStack
from cloudinit.sources import DataSourceOracle as Oracle
from cloudinit.sources import DataSourceOVF as OVF
from cloudinit.sources import DataSourceQemuFwCfg as QemuFwCfg
from cloudinit.sources import DataSourceRbxCloud as RbxCloud
from cloudinit.sources import DataSourceScaleway as Scaleway
from cloudinit.sources import DataSourceSmartOS as SmartOS
Expand All @@ -51,6 +52,7 @@
OpenNebula.DataSourceOpenNebula,
Oracle.DataSourceOracle,
OVF.DataSourceOVF,
QemuFwCfg.DataSourceQemuFwCfg,
SmartOS.DataSourceSmartOS,
Vultr.DataSourceVultr,
Ec2.DataSourceEc2Local,
Expand Down
8 changes: 7 additions & 1 deletion tools/ds-identify
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ PATH_ROOT=${PATH_ROOT:-""}
PATH_SYS_CLASS_DMI_ID=${PATH_SYS_CLASS_DMI_ID:-${PATH_ROOT}/sys/class/dmi/id}
PATH_SYS_HYPERVISOR=${PATH_SYS_HYPERVISOR:-${PATH_ROOT}/sys/hypervisor}
PATH_SYS_CLASS_BLOCK=${PATH_SYS_CLASS_BLOCK:-${PATH_ROOT}/sys/class/block}
PATH_SYS_FWCFG=${PATH_SYS_FWCFG:-${PATH_ROOT}/sys/firmware/qemu_fw_cfg/by_name/opt/io.cloud-init/cloud-init}
PATH_DEV_DISK="${PATH_DEV_DISK:-${PATH_ROOT}/dev/disk}"
PATH_VAR_LIB_CLOUD="${PATH_VAR_LIB_CLOUD:-${PATH_ROOT}/var/lib/cloud}"
PATH_DI_CONFIG="${PATH_DI_CONFIG:-${PATH_ROOT}/etc/cloud/ds-identify.cfg}"
Expand Down Expand Up @@ -128,7 +129,7 @@ DI_SYSTEMD_VIRTUALIZATION=${SYSTEMD_VIRTUALIZATION:-}
DI_DSNAME=""
# this has to match the builtin list in cloud-init, it is what will
# be searched if there is no setting found in config.
DI_DSLIST_DEFAULT="MAAS ConfigDrive NoCloud AltCloud Azure Bigstep \
DI_DSLIST_DEFAULT="MAAS QemuFwCfg ConfigDrive NoCloud AltCloud Azure Bigstep \
CloudSigma CloudStack DigitalOcean Vultr AliYun Ec2 GCE OpenNebula OpenStack \
VMware OVF SmartOS Scaleway Hetzner IBMCloud Oracle Exoscale RbxCloud UpCloud \
LXD NWCS Akamai WSL CloudCIX"
Expand Down Expand Up @@ -1016,6 +1017,11 @@ dscheck_LXD() {
return ${DS_NOT_FOUND}
}

dscheck_QemuFwCfg() {
[ -d "${PATH_SYS_FWCFG}" ] && return ${DS_FOUND}
return ${DS_NOT_FOUND}
}

dscheck_NoCloud() {
local fslabel="cidata CIDATA" d=""
case " ${DI_DMI_PRODUCT_SERIAL} " in
Expand Down