diff --git a/sunbeam-python/tests/functional/feature/.gitignore b/sunbeam-python/tests/functional/feature/.gitignore new file mode 100644 index 000000000..db362e0b5 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/.gitignore @@ -0,0 +1,21 @@ +test_config.yaml +features/adminrc +__pycache__/ +*.py[cod] +*$py.class +*.so +.Python + +venv/ +env/ +ENV/ + +.vscode/ +.idea/ +*.swp +*.swo + +.pytest_cache/ +.coverage +htmlcov/ +*.log diff --git a/sunbeam-python/tests/functional/feature/README.md b/sunbeam-python/tests/functional/feature/README.md new file mode 100644 index 000000000..653e1380c --- /dev/null +++ b/sunbeam-python/tests/functional/feature/README.md @@ -0,0 +1,199 @@ +# Sunbeam Feature Functional Tests + +Functional tests for Sunbeam feature enablement/disablement. These tests +connect to an **existing Sunbeam deployment** and run the enable/verify +lifecycle for each feature, with an optional disable phase controlled by +configuration. + +The suite is designed to be run via `tox` from the `sunbeam-python` tree. + +## Prerequisites + +- **Existing Sunbeam deployment** already bootstrapped and reachable +- `sunbeam` CLI on `PATH` and configured to talk to that deployment + - e.g. `sunbeam deployment list` shows your deployment +- `openstack` CLI configured for that cloud + - e.g. `openstack endpoint list` works +- `juju` CLI installed and able to access the controller/model that backs the + Sunbeam deployment + +## Configuration + +Create a config file from the example: + +```bash +cd sunbeam-python +cp tests/functional/feature/test_config.yaml.example tests/functional/feature/test_config.yaml +``` + +Then edit `tests/functional/feature/test_config.yaml`: + +```yaml +sunbeam: + deployment_name: "ps6" # Name shown by `sunbeam deployment list` + +juju: + model: "openstack" # Juju model backing the cloud + # controller: "my-controller" # Optional; auto-detected if omitted +``` + +### Run the full feature functional suite + +```bash +tox -e functional-feature +``` + +### Run a single feature functional test + +You can pass standard `pytest` selectors through tox via `posargs`. For example: + +- **Instance Recovery**: + + ```bash + tox -e functional-feature -- tests/functional/feature/test_features.py::test_instance_recovery + ``` + +- **TLS CA**: + + ```bash + tox -e functional-feature -- tests/functional/feature/test_features.py::test_tls_ca + ``` + +- **Vault**: + + ```bash + tox -e functional-feature -- tests/functional/feature/test_features.py::test_vault + ``` + +You can also run any single feature test **directly with the virtualenv Python**, +which is handy when you are iterating locally: + +```bash +../.venv/bin/python -m pytest \ + tests/functional/feature/test_features.py::test_ \ + --config tests/functional/feature/test_config.yaml +``` + +For example: + +- TLS CA: + + ```bash + ../.venv/bin/python -m pytest \ + tests/functional/feature/test_features.py::test_tls_ca \ + --config tests/functional/feature/test_config.yaml + ``` + +- Vault: + + ```bash + ../.venv/bin/python -m pytest \ + tests/functional/feature/test_features.py::test_vault \ + --config tests/functional/feature/test_config.yaml + ``` + +### Control whether features are disabled after tests + +By default, features are **left enabled** after their tests complete. You can +enable the legacy "enable then disable" behaviour via `test_config.yaml`: + +```yaml +features: + disable_after: true # disable every feature after its test +``` + +You can also override this per feature: + +```yaml +features: + disable_after: false # default for all features + + tls: + disable_after: true # only TLS is disabled after test + + vault: + disable_after: false # explicitly keep Vault enabled + +You can also override this behaviour **from the command line** without editing +the config file, using the `--features-disable-after` pytest option. When +running via `tox`: + +```bash +tox -e functional-feature -- --features-disable-after true # force disable +tox -e functional-feature -- --features-disable-after false # force keep enabled +``` + +Or directly with the virtualenv Python: + +```bash +../.venv/bin/python -m pytest \ + tests/functional/feature/test_features.py::test_ \ + --config tests/functional/feature/test_config.yaml \ + --features-disable-after true # or false + +Concrete examples: + +- **Run TLS CA test and disable TLS after it completes**: + + ```bash + tox -e functional-feature -- \ + tests/functional/feature/test_features.py::test_tls_ca \ + --features-disable-after true + ``` + +- **Run Vault test and keep Vault enabled afterwards** (even if config sets + `disable_after: true`): + + ```bash + tox -e functional-feature -- \ + tests/functional/feature/test_features.py::test_vault \ + --features-disable-after false + ``` +``` + +## Feature coverage and dependencies + +### Features in this suite + +- **Enabled in current flow** + - `instance-recovery` + - `caas` (Containers as a Service) + - `dns` + - `images-sync` + - `loadbalancer` + - `resource-optimization` + - `shared-filesystem` + - `telemetry` + - `observability` + - `tls` (CA mode) + - `vault` + - `validation` + - `secrets` + +- **Present but intentionally disabled for now** + - `baremetal` + - `ldap` + - `maintenance` + - `pro` + +### Feature dependencies + +Some features have explicit dependencies: + +- **CaaS (`caas`)** + - Depends on: **`secrets`**, **`loadbalancer`** + - The CaaS test ensures these dependencies are enabled before running. + +- **Secrets as a Service (`secrets`)** + - Depends on: **`vault`** + - The Secrets test ensures the Vault feature is enabled before running. + +- **TLS (Vault-backed)** + - TLS can also be deployed in a Vault-backed mode which implicitly depends on + the **`vault`** feature. This suite currently exercises only the TLS CA + mode (`test_tls_ca`). + +## Notes + +- When `disable_after` is enabled (globally or per-feature), disable failures + are **logged and ignored** so that the suite continues to the next feature. diff --git a/sunbeam-python/tests/functional/feature/__init__.py b/sunbeam-python/tests/functional/feature/__init__.py new file mode 100644 index 000000000..f3adc6677 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/__init__.py @@ -0,0 +1,8 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Sunbeam feature functional test suite. + +These tests exercise `sunbeam enable/disable` for individual features +against an existing Sunbeam deployment. +""" diff --git a/sunbeam-python/tests/functional/feature/conftest.py b/sunbeam-python/tests/functional/feature/conftest.py new file mode 100644 index 000000000..56c2a35f0 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/conftest.py @@ -0,0 +1,88 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Pytest configuration and fixtures for Sunbeam feature functional tests.""" + +from pathlib import Path + +import pytest +import yaml + +from .utils.juju import JujuClient +from .utils.sunbeam import SunbeamClient + + +def pytest_addoption(parser): + """Add custom command-line options.""" + parser.addoption( + "--config", + action="store", + default="test_config.yaml", + help="Path to test configuration file", + ) + parser.addoption( + "--features-disable-after", + action="store", + choices=["true", "false"], + default=None, + help=( + "Override features.disable_after (true/false) from test_config.yaml " + "without editing the file." + ), + ) + + +@pytest.fixture(scope="session") +def test_config(request): + """Load test configuration from YAML file.""" + config_path = request.config.getoption("--config") + # Resolve relative to this feature functional directory + config_file = Path(__file__).parent / config_path + + if not config_file.exists(): + msg = ( + f"Configuration file not found: {config_file}. " + "Copy tests/functional/feature/test_config.yaml.example to " + "tests/functional/feature/test_config.yaml and set sunbeam.deployment_name, juju.model." + ) + pytest.skip(msg) + + with open(config_file, "r") as f: + config = yaml.safe_load(f) + + # Optional CLI override for disable-after behaviour. + cli_disable_after = request.config.getoption("--features-disable-after") + if cli_disable_after is not None: + features_cfg = config.setdefault("features", {}) + features_cfg["disable_after"] = cli_disable_after == "true" + + return config + + +@pytest.fixture(scope="session") +def sunbeam_client(test_config): + """Create Sunbeam client for test session.""" + deployment_name = test_config.get("sunbeam", {}).get("deployment_name") + if not deployment_name: + pytest.skip("deployment_name not configured in test_config.yaml") + + client = SunbeamClient(deployment_name) + + if not client.is_connected(): + pytest.skip(f"Cannot connect to Sunbeam deployment '{deployment_name}'.") + + return client + + +@pytest.fixture(scope="session") +def juju_client(test_config): + """Create Juju client for test session.""" + model = test_config.get("juju", {}).get("model", "openstack") + controller = test_config.get("juju", {}).get("controller") + + client = JujuClient(model=model, controller=controller) + + if not client.is_connected(): + pytest.skip(f"Cannot connect to Juju model '{model}'.") + + return client diff --git a/sunbeam-python/tests/functional/feature/features/__init__.py b/sunbeam-python/tests/functional/feature/features/__init__.py new file mode 100644 index 000000000..0f78a8aa4 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Feature test classes for Sunbeam feature functional tests.""" diff --git a/sunbeam-python/tests/functional/feature/features/baremetal.py b/sunbeam-python/tests/functional/feature/features/baremetal.py new file mode 100644 index 000000000..57d99bfa8 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/baremetal.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for baremetal feature. + +Baremetal provides Ironic-based bare metal provisioning. +Functionality is validated via the Ironic (baremetal) API. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class BaremetalTest(BaseFeatureTest): + """Test baremetal feature enablement/disablement.""" + + feature_name = "baremetal" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Baremetal (Ironic) API is reachable.""" + logger.info("Verifying Baremetal (Ironic) service is available...") + try: + subprocess.run( + ["openstack", "baremetal", "driver", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Baremetal service: %s", exc) + raise AssertionError( + f"Baremetal service verification failed: {exc}" + ) from exc + + logger.info("Baremetal service verified via `openstack baremetal driver list`") diff --git a/sunbeam-python/tests/functional/feature/features/base.py b/sunbeam-python/tests/functional/feature/features/base.py new file mode 100644 index 000000000..29557308f --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/base.py @@ -0,0 +1,261 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Base class for Sunbeam feature functional tests.""" + +import logging +import os +import subprocess +import tempfile +import time +from pathlib import Path +from typing import Dict, List, Optional + +from ..utils.juju import JujuClient +from ..utils.sunbeam import SunbeamClient + +logger = logging.getLogger(__name__) + + +class BaseFeatureTest: + """Base class for testing Sunbeam features.""" + + feature_name: str = "" + expected_applications: List[str] = [] + timeout_seconds: int = 300 + enable_args: List[str] = [] + disable_args: List[str] = [] + # If True, run full enable+disable lifecycle. If False, leave feature enabled. + disable_after: bool = False + + def __init__( + self, + sunbeam_client: SunbeamClient, + juju_client: JujuClient, + config: Optional[Dict] = None, + ): + self.sunbeam = sunbeam_client + self.juju = juju_client + self.config = config or {} + + features_cfg = self.config.get("features", {}) + feature_config = features_cfg.get(self.feature_name, {}) + self.expected_applications = feature_config.get( + "expected_applications", + self.expected_applications, + ) + self.timeout_seconds = feature_config.get( + "timeout_seconds", + self.timeout_seconds, + ) + self.enable_args = feature_config.get("enable_args", self.enable_args) + self.disable_args = feature_config.get("disable_args", self.disable_args) + # Global default can be overridden per-feature. + global_disable_after = features_cfg.get("disable_after", self.disable_after) + self.disable_after = feature_config.get( + "disable_after", + global_disable_after, + ) + + self._ensure_openstack_env() + + def enable(self) -> bool: + """Enable the feature.""" + logger.info("Enabling feature: '%s'", self.feature_name) + return self.sunbeam.enable_feature( + self.feature_name, + extra_args=self.enable_args, + ) + + def disable(self) -> bool: + """Disable the feature. + + Returns True if successful, False otherwise. + """ + logger.info("Disabling feature: '%s'", self.feature_name) + try: + return self.sunbeam.disable_feature( + self.feature_name, + extra_args=self.disable_args, + ) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Failed to disable feature '%s': %s", + self.feature_name, + exc, + ) + return False + + def run_full_lifecycle(self) -> bool: + """Run feature lifecycle: enable, validate, optionally disable. + + By default, features are left enabled after validation. If + ``disable_after`` is set (globally or per-feature via config), + a best-effort disable phase is run. Disable failures are logged + but do not fail the overall test. + """ + logger.info("Starting lifecycle test for feature: '%s'", self.feature_name) + + enable_start = time.time() + logger.info("[ENABLE] Starting enable for '%s'...", self.feature_name) + enable_success = self.enable() + enable_duration = time.time() - enable_start + if enable_success: + logger.info( + "[ENABLE] SUCCESS for '%s' - Time taken: %.2f seconds", + self.feature_name, + enable_duration, + ) + else: + logger.error( + "[ENABLE] FAILED for '%s' - Time taken: %.2f seconds", + self.feature_name, + enable_duration, + ) + return False + + try: + self.verify_validate_feature_behavior() + except Exception: # noqa: BLE001 + logger.exception( + "Validation failed for feature '%s' after enable", self.feature_name + ) + # Best-effort cleanup – if disable also fails, log and continue. + try: + self.disable() + except Exception: # noqa: BLE001 + logger.warning( + "Disable also failed while handling validation error for '%s'", + self.feature_name, + ) + return False + + if not self.disable_after: + logger.info( + "Leaving feature '%s' enabled (disable_after is False)", + self.feature_name, + ) + total_duration = time.time() - enable_start + logger.info( + "[SUMMARY] Feature '%s' - Enable: %.2fs, Disable: skipped, " + "Total: %.2fs", + self.feature_name, + enable_duration, + total_duration, + ) + return True + + disable_start = time.time() + logger.info("[DISABLE] Starting disable for '%s'...", self.feature_name) + disable_success = self.disable() + disable_duration = time.time() - disable_start + if disable_success: + logger.info( + "[DISABLE] SUCCESS for '%s' - Time taken: %.2f seconds", + self.feature_name, + disable_duration, + ) + else: + logger.warning( + "[DISABLE] FAILED for '%s' - Time taken: %.2f seconds (continuing anyway)", + self.feature_name, + disable_duration, + ) + + total_duration = time.time() - enable_start + logger.info( + "[SUMMARY] Feature '%s' - Enable: %.2fs, Disable: %.2fs (%s), Total: %.2fs", + self.feature_name, + enable_duration, + disable_duration, + "SUCCESS" if disable_success else "FAILED", + total_duration, + ) + return True + + def verify_enabled(self) -> None: + """Verify that expected applications and units are present. + + This is a boilerplate method for future use. Currently not called + by default, but can be overridden in subclasses to add verification. + """ + pass + + def validate_feature_behavior(self) -> None: + """Validate that the feature is working correctly. + + This is a boilerplate method for future use. Currently not called + by default, but can be overridden in subclasses to add functionality tests. + """ + pass + + def verify_validate_feature_behavior(self) -> None: + """Simple verification that feature is enabled and basic check passes. + + This is a simple method that can be called after enable to verify + the feature is working. Override in subclasses for feature-specific checks. + Subclasses can override validate_feature_behavior() for behavior checks; + that is invoked from here before the application presence checks. + """ + logger.info("Verifying feature '%s' is enabled...", self.feature_name) + self.validate_feature_behavior() + if self.expected_applications: + for app in self.expected_applications: + if self.juju.has_application(app): + logger.info("Application '%s' found", app) + else: + logger.warning( + "Application '%s' not found (may still be deploying)", app + ) + logger.info("Basic verification completed for feature '%s'", self.feature_name) + + def _load_openrc_file_into_env(self, path: Path) -> None: + """Load export KEY=value lines from an openrc file into os.environ.""" + for line in path.read_text().splitlines(): + line = line.strip() + if not line or line.startswith("#"): + continue + if not line.startswith("export "): + continue + _, rest = line.split("export ", 1) + if "=" not in rest: + continue + key, value = rest.split("=", 1) + key = key.strip() + value = value.strip().strip('"').strip("'") + os.environ.setdefault(key, value) + + def _ensure_openstack_env(self) -> None: + """Load OpenStack credentials for CLI access. + + Runs `sunbeam openrc` to generate credentials, stores them in a temp file + (e.g. /tmp/adminrc), and loads them into the environment so OpenStack CLI + subprocess calls use them. If OS_AUTH_URL is already set, this is a no-op. + If `sunbeam openrc` fails, falls back to the legacy features/adminrc file + if it exists. + """ + sunbeam_cmd = getattr(self.sunbeam, "_sunbeam_cmd", "sunbeam") + result = subprocess.run( + [sunbeam_cmd, "openrc"], + capture_output=True, + text=True, + check=False, + timeout=60, + ) + + if result.returncode == 0: + openrc_content = result.stdout or result.stderr or "" + if "export " in openrc_content: + adminrc_path = Path(tempfile.gettempdir()) / "adminrc" + try: + adminrc_path.write_text(openrc_content) + self._load_openrc_file_into_env(adminrc_path) + logger.info( + "Loaded OpenStack credentials from sunbeam openrc (%s)", + adminrc_path, + ) + return + except Exception: # noqa: BLE001 + logger.exception( + "Failed to write or load openrc from %s", adminrc_path + ) diff --git a/sunbeam-python/tests/functional/feature/features/caas.py b/sunbeam-python/tests/functional/feature/features/caas.py new file mode 100644 index 000000000..02dfa08f2 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/caas.py @@ -0,0 +1,83 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for caas feature. + +Container as a Service (Magnum) allows managing Kubernetes clusters via OpenStack. +Functionality is validated via the Magnum (COE) API. +""" + +import logging +import subprocess + +import pytest + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class CaaSTest(BaseFeatureTest): + """Test caas feature enablement/disablement.""" + + feature_name = "caas" + expected_units: list[str] = [] + expected_applications: list[str] = [] + timeout_seconds = 600 + + def _ensure_dependency_enabled(self, feature: str) -> bool: + """Best-effort enable a required dependency feature. + + If enabling the dependency fails (for example, missing Vault for + Secrets), we treat this as an unsatisfied dependency and skip. + """ + logger.info("Ensuring dependency feature '%s' is enabled for CaaS...", feature) + try: + self.sunbeam.enable_feature(feature) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Failed to enable dependency '%s' required by CaaS: %s", + feature, + exc, + ) + return False + return True + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Magnum (COE) API is reachable. + + We call `openstack coe cluster list` to confirm the API is up. + """ + logger.info("Verifying CaaS (Magnum) service is available...") + try: + subprocess.run( + ["openstack", "coe", "cluster", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except subprocess.CalledProcessError as exc: + logger.warning("Failed to list COE clusters: %s", exc.stderr) + raise AssertionError( + f"CaaS (Magnum) service not accessible: {exc.stderr}" + ) from exc + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying CaaS service: %s", exc) + raise AssertionError(f"CaaS service verification failed: {exc}") from exc + + logger.info("CaaS (Magnum) service verified via `openstack coe cluster list`") + + def run_full_lifecycle(self) -> bool: + """Ensure dependencies then run the standard enable/verify/disable flow. + + CaaS depends on the Secrets and Load Balancer features. + """ + for dep in ("secrets", "loadbalancer"): + if not self._ensure_dependency_enabled(dep): + pytest.skip( + f"Skipping CaaS feature test: dependency '{dep}' " + "could not be enabled" + ) + + return super().run_full_lifecycle() diff --git a/sunbeam-python/tests/functional/feature/features/dns.py b/sunbeam-python/tests/functional/feature/features/dns.py new file mode 100644 index 000000000..a811c1eee --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/dns.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for dns feature. + +DNS requires nameservers as arguments, so we use dummy nameservers for testing. +DNS is a simple feature with no direct feature dependencies (besides the required +nameservers argument). Functionality is validated via the Designate (DNS) API. +""" + +import logging + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class DnsTest(BaseFeatureTest): + """Test dns feature enablement/disablement.""" + + feature_name = "dns" + # DNS requires nameservers argument - using dummy values for testing + enable_args: list[str] = ["ns1.example.com."] + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that DNS as a Service is reachable. + + We call `sunbeam dns address` to confirm that the + Designate service is registered and accessible. + """ + logger.info("Verifying DNS service endpoints are available...") + try: + self.sunbeam.run(["dns", "address"]) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying DNS service: %s", exc) + raise AssertionError(f"DNS service verification failed: {exc}") from exc + + logger.info("DNS service endpoints verified via `sunbeam dns address`") diff --git a/sunbeam-python/tests/functional/feature/features/images_sync.py b/sunbeam-python/tests/functional/feature/features/images_sync.py new file mode 100644 index 000000000..288f8fece --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/images_sync.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for images-sync feature. + +Images-sync is a simple feature with no dependencies. +Functionality is validated via the OpenStack Image API. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class ImagesSyncTest(BaseFeatureTest): + """Test images-sync feature enablement/disablement.""" + + feature_name = "images-sync" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Image service is reachable. + + We call `openstack image list` to confirm that Glance is responding. + """ + logger.info("Verifying Image service (Glance) is available...") + try: + subprocess.run( + ["openstack", "image", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Image service: %s", exc) + raise AssertionError(f"Image service verification failed: {exc}") from exc + + logger.info("Image service verified via `openstack image list`") diff --git a/sunbeam-python/tests/functional/feature/features/instance_recovery.py b/sunbeam-python/tests/functional/feature/features/instance_recovery.py new file mode 100644 index 000000000..0f51f60ca --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/instance_recovery.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for instance-recovery feature.""" + +import subprocess + +from .base import BaseFeatureTest + + +class InstanceRecoveryTest(BaseFeatureTest): + """Test instance-recovery feature enablement/disablement.""" + + # CLI feature name + feature_name = "instance-recovery" + expected_applications = [ + "masakari", + "masakari-mysql-router", + "consul-management", + "consul-storage", + "consul-tenant", + ] + timeout_seconds = 900 + + def validate_feature_behavior(self) -> None: + """Run a small smoke test against the Masakari API. + + We call `openstack segment list` to confirm Masakari is responding + and that the CLI can talk to the Instance Recovery control plane. + """ + cmd = [ + "openstack", + "segment", + "list", + "-c", + "name", + "-c", + "service_type", + ] + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + if not result.stdout.strip(): + raise AssertionError("openstack segment list returned no data") diff --git a/sunbeam-python/tests/functional/feature/features/ldap.py b/sunbeam-python/tests/functional/feature/features/ldap.py new file mode 100644 index 000000000..21aae49d5 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/ldap.py @@ -0,0 +1,40 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for ldap feature. + +LDAP integration configures Keystone to authenticate against LDAP. +Functionality is minimally validated via the Identity API. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class LdapTest(BaseFeatureTest): + """Test ldap feature enablement/disablement.""" + + feature_name = "ldap" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Identity API is reachable.""" + logger.info("Verifying Identity (Keystone) service is available for LDAP...") + try: + subprocess.run( + ["openstack", "domain", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Identity service: %s", exc) + raise AssertionError(f"LDAP feature verification failed: {exc}") from exc + + logger.info("Identity service verified via `openstack domain list`") diff --git a/sunbeam-python/tests/functional/feature/features/loadbalancer.py b/sunbeam-python/tests/functional/feature/features/loadbalancer.py new file mode 100644 index 000000000..1ff3549b9 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/loadbalancer.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for loadbalancer feature. + +Loadbalancer is a simple feature with no dependencies. +Deploys Octavia, the OpenStack Load Balancer as a Service. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class LoadbalancerTest(BaseFeatureTest): + """Test loadbalancer feature enablement/disablement.""" + + feature_name = "loadbalancer" + expected_applications: list[str] = ["octavia"] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that loadbalancer service (Octavia) is working.""" + logger.info("Verifying loadbalancer service (Octavia) is available...") + + try: + result = subprocess.run( + ["openstack", "loadbalancer", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + logger.info("Loadbalancer service (Octavia) is accessible") + logger.debug("Loadbalancer list output: %s", result.stdout[:200]) + + except Exception as e: + logger.warning("Error checking loadbalancer service: %s", e) + raise AssertionError(f"Loadbalancer service verification failed: {e}") diff --git a/sunbeam-python/tests/functional/feature/features/maintenance.py b/sunbeam-python/tests/functional/feature/features/maintenance.py new file mode 100644 index 000000000..a2eeac6ee --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/maintenance.py @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for maintenance feature.""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class MaintenanceTest(BaseFeatureTest): + """Test maintenance feature enablement/disablement.""" + + feature_name = "maintenance" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Compute API is reachable.""" + logger.info("Verifying Compute service is available for maintenance...") + try: + subprocess.run( + ["openstack", "compute", "service", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Compute service: %s", exc) + raise AssertionError( + f"Maintenance feature verification failed: {exc}" + ) from exc + + logger.info("Compute service verified via `openstack compute service list`") diff --git a/sunbeam-python/tests/functional/feature/features/observability.py b/sunbeam-python/tests/functional/feature/features/observability.py new file mode 100644 index 000000000..9f6366cd3 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/observability.py @@ -0,0 +1,51 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for observability feature. + +Observability integrates Canonical OpenStack with COS. + +For this functional test we exercise the simple embedded workflow from the +documentation: + +1. `sunbeam enable observability embedded` +2. `sunbeam observability dashboard-url` +3. `sunbeam disable observability embedded` +""" + +import logging + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class ObservabilityTest(BaseFeatureTest): + """Test observability feature enablement/disablement.""" + + feature_name = "observability" + enable_args: list[str] = ["embedded"] + disable_args: list[str] = ["embedded"] + expected_applications: list[str] = [] + timeout_seconds = 900 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the observability dashboard URL is available. + + This uses `sunbeam observability dashboard-url` from the docs to + confirm that the embedded COS deployment is responding. + """ + logger.info("Fetching observability dashboard URL...") + try: + result = self.sunbeam.run(["observability", "dashboard-url"]) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Error while retrieving observability dashboard URL: %s", + exc, + ) + raise AssertionError( + f"Observability feature verification failed: {exc}" + ) from exc + + url = result.stdout.strip() + logger.info("Observability dashboard URL: %s", url) diff --git a/sunbeam-python/tests/functional/feature/features/orchestration.py b/sunbeam-python/tests/functional/feature/features/orchestration.py new file mode 100644 index 000000000..b09601f56 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/orchestration.py @@ -0,0 +1,42 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for orchestration feature. + +Orchestration is a simple feature with no dependencies. +Deploys Heat, the OpenStack Orchestration service. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class OrchestrationTest(BaseFeatureTest): + """Test orchestration feature enablement/disablement.""" + + feature_name = "orchestration" + expected_applications: list[str] = ["heat"] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that orchestration service (Heat) is working.""" + logger.info("Verifying orchestration service (Heat) is available...") + + try: + result = subprocess.run( + ["openstack", "stack", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + logger.info("Orchestration service (Heat) is accessible") + logger.debug("Stack list output: %s", result.stdout[:200]) + + except Exception as e: + logger.warning("Error checking orchestration service: %s", e) + raise AssertionError(f"Orchestration service verification failed: {e}") diff --git a/sunbeam-python/tests/functional/feature/features/pro.py b/sunbeam-python/tests/functional/feature/features/pro.py new file mode 100644 index 000000000..7948bde8f --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/pro.py @@ -0,0 +1,61 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for pro feature. + +Ubuntu Pro integrates subscription/entitlement with the deployment. +Functionality is minimally validated via a generic OpenStack service call. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class ProTest(BaseFeatureTest): + """Test pro feature enablement/disablement.""" + + feature_name = "pro" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def __init__(self, *args, **kwargs) -> None: + """Initialise Pro test with a token argument for enable. + + The token is taken from the functional test configuration, if present. + If no token is configured, a dummy placeholder is used. + """ + super().__init__(*args, **kwargs) + pro_cfg = self.config.get("pro", {}) if self.config is not None else {} + token = pro_cfg.get("token", "DUMMY-UBUNTU-PRO-TOKEN") + self.enable_args = ["--token", token] + + def verify_validate_feature_behavior(self) -> None: + """Validate that OpenStack APIs remain reachable under Pro.""" + logger.info("Verifying OpenStack service catalog for Ubuntu Pro...") + try: + result = subprocess.run( + ["openstack", "service", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except subprocess.CalledProcessError as exc: + logger.warning("Failed to list services: %s", exc.stderr) + raise AssertionError( + f"OpenStack service catalog not accessible: {exc.stderr}" + ) from exc + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying OpenStack services: %s", exc) + raise AssertionError( + f"Ubuntu Pro feature verification failed: {exc}" + ) from exc + + if not result.stdout.strip(): + raise AssertionError("Service list returned no data") + + logger.info("OpenStack service catalog verified via `openstack service list`") diff --git a/sunbeam-python/tests/functional/feature/features/resource_optimization.py b/sunbeam-python/tests/functional/feature/features/resource_optimization.py new file mode 100644 index 000000000..8db74bbb2 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/resource_optimization.py @@ -0,0 +1,50 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for resource-optimization feature. + +Resource Optimization provides Watcher as a service. +Functionality is validated via the Watcher (optimize) API. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class ResourceOptimizationTest(BaseFeatureTest): + """Test resource-optimization feature enablement/disablement.""" + + feature_name = "resource-optimization" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Watcher (resource optimization) API is reachable. + + We call `openstack optimize goal list` to confirm the API is up. + """ + logger.info("Verifying Resource Optimization (Watcher) service is available...") + try: + subprocess.run( + ["openstack", "optimize", "goal", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Error while verifying Resource Optimization service: %s", + exc, + ) + raise AssertionError( + f"Resource Optimization service verification failed: {exc}" + ) from exc + + logger.info( + "Resource Optimization service verified via `openstack optimize goal list`" + ) diff --git a/sunbeam-python/tests/functional/feature/features/secrets.py b/sunbeam-python/tests/functional/feature/features/secrets.py new file mode 100644 index 000000000..49fdf12da --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/secrets.py @@ -0,0 +1,57 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for secrets feature.""" + +import logging +import subprocess + +import pytest + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class SecretsTest(BaseFeatureTest): + """Test secrets feature enablement/disablement.""" + + feature_name = "secrets" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def _ensure_vault_enabled(self) -> bool: + """Ensure the Vault feature is enabled before Secrets.""" + logger.info("Ensuring 'vault' feature is enabled before 'secrets'...") + try: + self.sunbeam.enable_feature("vault") + except Exception as exc: # noqa: BLE001 + logger.warning("Failed to enable required dependency 'vault': %s", exc) + return False + return True + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Secrets (Barbican) API is reachable.""" + logger.info("Verifying Secrets (Barbican) service is available...") + try: + subprocess.run( + ["openstack", "secret", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Secrets service: %s", exc) + raise AssertionError(f"Secrets service verification failed: {exc}") from exc + + logger.info("Secrets service verified via `openstack secret list`") + + def run_full_lifecycle(self) -> bool: + """Enable Vault first, then run the Secrets lifecycle.""" + if not self._ensure_vault_enabled(): + pytest.skip( + "Skipping Secrets feature test: dependency 'vault' not available" + ) + + return super().run_full_lifecycle() diff --git a/sunbeam-python/tests/functional/feature/features/shared_filesystem.py b/sunbeam-python/tests/functional/feature/features/shared_filesystem.py new file mode 100644 index 000000000..086cdbe41 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/shared_filesystem.py @@ -0,0 +1,48 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for shared-filesystem feature. + +Shared Filesystems provides Manila-based file share services. +Functionality is validated via the Manila API. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class SharedFilesystemTest(BaseFeatureTest): + """Test shared-filesystem feature enablement/disablement.""" + + feature_name = "shared-filesystem" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the Shared Filesystems (Manila) API is reachable. + + We call `openstack share list` to confirm the API is up. + """ + logger.info("Verifying Shared Filesystems (Manila) service is available...") + try: + subprocess.run( + ["openstack", "share", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Error while verifying Shared Filesystems service: %s", + exc, + ) + raise AssertionError( + f"Shared Filesystems service verification failed: {exc}" + ) from exc + + logger.info("Shared Filesystems service verified via `openstack share list`") diff --git a/sunbeam-python/tests/functional/feature/features/telemetry.py b/sunbeam-python/tests/functional/feature/features/telemetry.py new file mode 100644 index 000000000..191867230 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/telemetry.py @@ -0,0 +1,43 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for telemetry feature. + +Telemetry is a simple feature with no dependencies. +Deploys Ceilometer, Aodh, Gnocchi, and OpenStack Exporter. +""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class TelemetryTest(BaseFeatureTest): + """Test telemetry feature enablement/disablement.""" + + feature_name = "telemetry" + expected_applications: list[str] = ["ceilometer", "gnocchi", "aodh"] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that telemetry services are working.""" + logger.info("Verifying telemetry services are available...") + + # Check if alarm service (Aodh) is accessible + try: + result = subprocess.run( + ["openstack", "alarm", "list"], + capture_output=True, + text=True, + timeout=30, + check=True, + ) + logger.info("Telemetry alarm service (Aodh) is accessible") + logger.debug("Alarm list output: %s", result.stdout[:200]) + + except Exception as e: + logger.warning("Error checking telemetry services: %s", e) + raise AssertionError(f"Telemetry service verification failed: {e}") diff --git a/sunbeam-python/tests/functional/feature/features/tls.py b/sunbeam-python/tests/functional/feature/features/tls.py new file mode 100644 index 000000000..051b12120 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/tls.py @@ -0,0 +1,922 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for TLS feature (CA and Vault modes).""" + +import base64 +import logging +import subprocess +import tempfile +import time +from pathlib import Path +from typing import Tuple + +import yaml +from cryptography import x509 +from cryptography.x509.oid import NameOID + +from sunbeam.features.interface.utils import get_subject_from_csr + +from .base import BaseFeatureTest +from .vault import ensure_vault_prerequisites + +# TLS Vault root+intermediate CA: (ca_cert_b64, ca_chain_b64, inter_pem, +# inter_key_pem, vault_ca_conf, certindex, certserial) +VaultCaMaterial = Tuple[str, str, str, str, str, str, str] + +logger = logging.getLogger(__name__) + + +def generate_self_signed_ca_certificate() -> Tuple[str, str]: + """Generate a self-signed CA certificate. + + Returns a tuple of (ca_cert_base64, ca_chain_base64). For a simple self-signed CA, + the chain is the same as the cert. TLS CA currently only uses the CA certificate. + """ + cert_b64, chain_b64, _, _ = generate_self_signed_ca_certificate_with_key() + return (cert_b64, chain_b64) + + +def generate_self_signed_ca_certificate_with_key() -> Tuple[str, str, str, str]: + """Generate a self-signed CA certificate and private key. + + Returns (ca_cert_base64, ca_chain_base64, ca_cert_pem, ca_key_pem). + Used when the test must sign CSRs (e.g. full TLS CA flow). + """ + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + + key_path = tmp_path / "ca.key" + subprocess.run( + ["openssl", "genrsa", "-out", str(key_path), "4096"], + check=True, + capture_output=True, + ) + + cert_path = tmp_path / "ca.crt" + subprocess.run( + [ + "openssl", + "req", + "-new", + "-x509", + "-days", + "365", + "-key", + str(key_path), + "-out", + str(cert_path), + "-subj", + "/C=US/ST=State/L=City/O=TestOrg/CN=TestCA", + "-extensions", + "v3_ca", + "-config", + "/dev/stdin", + ], + input=b"""[req] +distinguished_name = req_distinguished_name +[req_distinguished_name] +[v3_ca] +basicConstraints = critical,CA:TRUE +keyUsage = critical,keyCertSign,cRLSign +subjectKeyIdentifier = hash +authorityKeyIdentifier = keyid:always,issuer +""", + check=True, + capture_output=True, + ) + + ca_cert_pem = cert_path.read_text() + ca_key_pem = key_path.read_text() + ca_cert_base64 = base64.b64encode(ca_cert_pem.encode()).decode() + ca_chain_base64 = ca_cert_base64 + + return (ca_cert_base64, ca_chain_base64, ca_cert_pem, ca_key_pem) + + +def generate_root_and_intermediate_ca_for_vault() -> VaultCaMaterial: + """Generate root CA and intermediate CA for TLS Vault. + + Follows the canonical doc: + - Root CA: 8192-bit key, sha256, 3650 days, CA config with certindex/serial. + - Intermediate CA: 8192-bit key, CSR signed by root via openssl ca -config + ca.conf. + - CA chain: intermediate then root (interca1.crt + rootca.crt). + - Returns material so Vault CSR can be signed via openssl ca -config + vault-ca.conf. + + Returns: + (ca_cert_base64, ca_chain_base64, inter_cert_pem, inter_key_pem, + vault_ca_conf_content, certindex_content, certserial_content). + For enable use --ca=intermediate cert, --ca-chain=chain (inter+root). + """ + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + + # CA database and serial (per doc) + (tmp_path / "certindex").touch() + (tmp_path / "certserial").write_text("1000\n") + (tmp_path / "crlnumber").write_text("1000\n") + + # Root CA config (per doc) + ca_conf = """[ ca ] +default_ca = CA_default + +[ CA_default ] +dir = . +database = certindex +new_certs_dir = . +certificate = rootca.crt +private_key = rootca.key +serial = certserial +default_days = 375 +default_crl_days = 30 +default_md = sha256 +policy = policy_anything +x509_extensions = v3_ca + +[ v3_ca ] +basicConstraints = critical,CA:true + +[ policy_anything ] +countryName = optional +stateOrProvinceName = optional +organizationName = optional +organizationalUnitName = optional +commonName = supplied +""" + (tmp_path / "ca.conf").write_text(ca_conf) + + # Root CA key and cert (8192-bit, sha256, 3650 days) + subprocess.run( + ["openssl", "genrsa", "-out", "rootca.key", "8192"], + check=True, + capture_output=True, + cwd=tmp_path, + ) + subprocess.run( + [ + "openssl", + "req", + "-sha256", + "-new", + "-x509", + "-days", + "3650", + "-key", + "rootca.key", + "-out", + "rootca.crt", + "-subj", + "/C=US/ST=State/L=City/O=TestOrg/CN=TestRootCA", + ], + check=True, + capture_output=True, + cwd=tmp_path, + ) + + # Intermediate CA key and CSR + subprocess.run( + ["openssl", "genrsa", "-out", "interca1.key", "8192"], + check=True, + capture_output=True, + cwd=tmp_path, + ) + subprocess.run( + [ + "openssl", + "req", + "-sha256", + "-new", + "-key", + "interca1.key", + "-out", + "interca1.csr", + "-subj", + "/C=US/ST=State/L=City/O=TestOrg/CN=TestInterCA", + ], + check=True, + capture_output=True, + cwd=tmp_path, + ) + + # Sign intermediate with root (openssl ca -batch -config ca.conf) + subprocess.run( + [ + "openssl", + "ca", + "-batch", + "-config", + "ca.conf", + "-notext", + "-in", + "interca1.csr", + "-out", + "interca1.crt", + ], + check=True, + capture_output=True, + cwd=tmp_path, + ) + + # Chain: intermediate then root (per doc) + inter_pem = (tmp_path / "interca1.crt").read_text() + root_pem = (tmp_path / "rootca.crt").read_text() + chain_pem = inter_pem + root_pem + + inter_key_pem = (tmp_path / "interca1.key").read_text() + + # vault-ca.conf uses intermediate as the signing CA (per doc) + vault_ca_conf = """[ ca ] +default_ca = CA_default + +[ CA_default ] +dir = . +database = certindex +new_certs_dir = . +certificate = interca1.crt +private_key = interca1.key +serial = certserial +default_days = 375 +default_crl_days = 30 +default_md = sha256 +policy = policy_anything +x509_extensions = v3_ca + +[ v3_ca ] +basicConstraints = critical,CA:true + +[ policy_anything ] +countryName = optional +stateOrProvinceName = optional +organizationName = optional +organizationalUnitName = optional +commonName = supplied + +[alt_names] +DNS.1 = test.com +""" + certindex_content = (tmp_path / "certindex").read_text() + certserial_content = (tmp_path / "certserial").read_text() + + ca_cert_b64 = base64.b64encode(inter_pem.encode()).decode() + ca_chain_b64 = base64.b64encode(chain_pem.encode()).decode() + + return ( + ca_cert_b64, + ca_chain_b64, + inter_pem, + inter_key_pem, + vault_ca_conf, + certindex_content, + certserial_content, + ) + + +class TlsCaTest(BaseFeatureTest): + """Test TLS CA mode enablement/disablement (full flow). + + TLS CA mode uses Certificate Authority certificates for TLS. + This test runs the complete flow: + - Enable TLS CA with --ca and --endpoint public/internal/rgw + - List outstanding CSRs (with retry/backoff) + - Sign CSRs and update manifest + - Push certs via sunbeam tls ca unit_certs -m manifest + - Verify endpoints are HTTPS and basic OpenStack operations work. + """ + + feature_name = "tls" + enable_args: list[str] = [] + disable_args: list[str] = ["ca"] + expected_applications = [ + "manual-tls-certificates", + ] + timeout_seconds = 600 + + def __init__(self, *args, **kwargs): + """Initialise CA material (root+intermediate) for signing CSRs. + + For TLS CA we mirror the TLS Vault PKI model and the + generate-a-ca-certificate.rst guide by generating a root CA and an + intermediate CA, then using the intermediate as the issuing CA for + Traefik CSRs. The intermediate certificate (base64) is passed via + --ca, and its key is used to sign the outstanding CSRs; the chain + (intermediate + root) is currently not surfaced but can be used if + needed in future. + """ + super().__init__(*args, **kwargs) + ( + self.ca_cert_base64, + self.ca_chain_base64, + self._ca_cert_pem, + self._ca_key_pem, + _vault_ca_conf, + _certindex, + _certserial, + ) = generate_root_and_intermediate_ca_for_vault() + self.enable_args = [ + "ca", + "--ca", + self.ca_cert_base64, + "--ca-chain", + self.ca_chain_base64, + "--endpoint", + "public", + "--endpoint", + "internal", + "--endpoint", + "rgw", + ] + + def enable(self) -> bool: + """Enable TLS CA feature (without --accept-defaults flag).""" + logger.info("Enabling feature: '%s'", self.feature_name) + return self.sunbeam.enable_feature( + self.feature_name, + extra_args=self.enable_args, + ) + + def disable(self) -> bool: + """Disable TLS CA feature (without --accept-defaults flag).""" + logger.info("Disabling feature: '%s'", self.feature_name) + try: + return self.sunbeam.disable_feature( + self.feature_name, + extra_args=self.disable_args, + ) + except Exception as exc: # noqa: BLE001 + logger.warning( + "Failed to disable feature '%s': %s", + self.feature_name, + exc, + ) + return False + + def _sign_ca_csrs_and_install(self) -> None: + """List TLS CA CSRs (retry), sign with test CA, write manifest, unit_certs. + + Mirrors SQA enable_tls: list_outstanding_csrs -> sign -> manifest -> + unit_certs. Certificate dict keyed by subject (X500 id from CSR). + """ + sunbeam_cmd = getattr(self.sunbeam, "_sunbeam_cmd", "sunbeam") + max_attempts = 10 + backoff_seconds = 15 + csrs_list: list = [] + for attempt in range(1, max_attempts + 1): + logger.info( + "Listing outstanding TLS CA CSRs (attempt %d/%d)...", + attempt, + max_attempts, + ) + result = subprocess.run( + [ + sunbeam_cmd, + "tls", + "ca", + "list_outstanding_csrs", + "--format", + "yaml", + ], + check=True, + capture_output=True, + text=True, + ) + raw = yaml.safe_load(result.stdout or "") or [] + csrs_list = raw if isinstance(raw, list) else [] + if csrs_list: + break + if attempt < max_attempts: + logger.info( + "No CSRs yet (Traefik may still be coming up); retrying in %ds...", + backoff_seconds, + ) + time.sleep(backoff_seconds) + + if not csrs_list: + logger.info( + "No outstanding TLS CA CSRs after %d attempts; skipping unit_certs.", + max_attempts, + ) + return + + certificates: dict[str, dict[str, str]] = {} + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + ca_cert_path = tmp_path / "ca.crt" + ca_key_path = tmp_path / "ca.key" + ca_cert_path.write_text(self._ca_cert_pem) + ca_key_path.write_text(self._ca_key_pem) + + for record in csrs_list: + if not isinstance(record, dict): + continue + csr_pem = record.get("csr") + if not csr_pem: + continue + subject = get_subject_from_csr(str(csr_pem).strip()) + if not subject: + logger.warning("Could not get subject from CSR; skipping record") + continue + csr_path = tmp_path / f"req_{subject[:8]}.csr" + csr_path.write_text(str(csr_pem).strip() + "\n") + cert_path = tmp_path / f"out_{subject[:8]}.crt" + + req = x509.load_pem_x509_csr(csr_path.read_bytes()) + cn_attr = req.subject.get_attributes_for_oid(NameOID.COMMON_NAME)[0] + cn_raw = cn_attr.value + if isinstance(cn_raw, bytes): + cn_str: str = cn_raw.decode() + else: + cn_str = str(cn_raw) + san_txt = f"subjectAltName=DNS:{cn_str}\n" + + subprocess.run( + [ + "openssl", + "x509", + "-req", + "-in", + str(csr_path), + "-CA", + str(ca_cert_path), + "-CAkey", + str(ca_key_path), + "-CAcreateserial", + "-out", + str(cert_path), + "-days", + "365", + "-extfile", + "/dev/stdin", + ], + input=san_txt, + text=True, + check=True, + capture_output=True, + ) + cert_pem = cert_path.read_text() + cert_b64 = base64.b64encode(cert_pem.encode()).decode() + certificates[subject] = {"certificate": cert_b64} + + if not certificates: + logger.warning("No certificates signed; skipping unit_certs") + return + + with tempfile.NamedTemporaryFile( + mode="w", dir=str(Path.home()), suffix=".yaml", delete=False + ) as f: + manifest_data = { + "features": { + "tls": { + "ca": { + "config": {"certificates": certificates}, + }, + }, + }, + } + yaml.dump(manifest_data, f, default_flow_style=False, sort_keys=False) + manifest_path = f.name + + try: + logger.info( + "Pushing signed certificates via 'sunbeam tls ca unit_certs -m %s'...", + manifest_path, + ) + subprocess.run( + [sunbeam_cmd, "tls", "ca", "unit_certs", "-m", manifest_path], + check=True, + capture_output=True, + text=True, + ) + finally: + Path(manifest_path).unlink(missing_ok=True) + + def validate_feature_behavior(self) -> None: + """Check public/internal endpoints use HTTPS and image list works. + + This mirrors the verification logic used in the TLS Vault lifecycle + tests and the upstream documentation: + - Public endpoints must use HTTPS. + - Internal endpoints (when present) must use HTTPS. + - A basic OpenStack operation (image list) must succeed. + """ + self._ensure_openstack_env() + logger.info("Verifying public endpoints use HTTPS (TLS CA mode)...") + result = subprocess.run( + [ + "openstack", + "endpoint", + "list", + "--interface", + "public", + "-c", + "URL", + "-f", + "value", + ], + check=True, + capture_output=True, + text=True, + ) + public_urls = [ + u.strip() for u in (result.stdout or "").splitlines() if u.strip() + ] + if not public_urls: + raise AssertionError( + "openstack endpoint list --interface public returned no URLs " + "(TLS CA mode)" + ) + public_https = [u for u in public_urls if u.startswith("https://")] + if not public_https: + raise AssertionError( + "TLS CA feature appears inactive: no HTTPS endpoints found in " + "public interface. Sample URLs: " + ", ".join(public_urls[:5]) + ) + logger.info("Found %d HTTPS public endpoints (TLS CA mode)", len(public_https)) + + logger.info("Verifying internal endpoints use HTTPS (TLS CA mode)...") + result = subprocess.run( + [ + "openstack", + "endpoint", + "list", + "--interface", + "internal", + "-c", + "URL", + "-f", + "value", + ], + check=True, + capture_output=True, + text=True, + ) + internal_urls = [ + u.strip() for u in (result.stdout or "").splitlines() if u.strip() + ] + if internal_urls: + internal_https = [u for u in internal_urls if u.startswith("https://")] + if not internal_https: + raise AssertionError( + "TLS CA feature appears inactive: no HTTPS endpoints found in " + "internal interface. Sample URLs: " + ", ".join(internal_urls[:5]) + ) + logger.info( + "Found %d HTTPS internal endpoints (TLS CA mode)", + len(internal_https), + ) + else: + logger.warning( + "No internal endpoints found; this may be normal for some deployments" + ) + + logger.info( + "Verifying basic OpenStack operations work over TLS (TLS CA mode)..." + ) + result = subprocess.run( + ["openstack", "image", "list"], + check=True, + capture_output=True, + text=True, + ) + + def run_full_lifecycle(self) -> bool: + """Enable, sign CSRs, unit_certs, validate, optional disable.""" + logger.info("Starting full lifecycle test for TLS CA") + if not self.enable(): + return False + self._sign_ca_csrs_and_install() + try: + self.verify_validate_feature_behavior() + except Exception: # noqa: BLE001 + logger.exception("TLS CA validation failed") + try: + self.disable() + except Exception: # noqa: BLE001 + pass + return False + if not self.disable_after: + logger.info("Leaving TLS CA enabled (disable_after is False)") + return True + self.disable() + return True + + +class TlsVaultTest(BaseFeatureTest): + """Test TLS Vault mode enablement/disablement. + + TLS Vault mode uses Vault for certificate management. CA material is generated + per generate-a-ca-certificate.rst: root CA + intermediate CA, chain (intermediate + then root), and the Vault CSR is signed with the intermediate via + ``openssl ca -config vault-ca.conf``. + + Prerequisites (per docs): + - Traefik hostnames must be configured. + - Vault feature must be enabled. + - Vault charm must be initialised, unsealed and authorised. + + run_full_lifecycle() does: enable with --ca/--ca-chain (intermediate + chain), + list CSRs, sign Vault CSR with intermediate, unit_certs, then validate and disable. + """ + + feature_name = "tls" + enable_args: list[str] = [] + # Per docs, disable is done via `sunbeam disable tls vault`. + disable_args: list[str] = ["vault"] + expected_applications: list[str] = [] + timeout_seconds = 600 + + def __init__(self, *args, **kwargs): + """Initialise and generate CA material for TLS Vault enablement. + + Docs require providing a CA certificate and CA chain when enabling + TLS Vault, so we mirror the TLS CA test by generating a simple + self-signed CA on the fly and passing it via --ca / --ca-chain. + """ + super().__init__(*args, **kwargs) + self.ca_cert_base64, self.ca_chain_base64 = ( + generate_self_signed_ca_certificate() + ) + self.enable_args = [ + "vault", + "--ca", + self.ca_cert_base64, + "--ca-chain", + self.ca_chain_base64, + "--endpoint", + "public", + "--endpoint", + "internal", + "--endpoint", + "rgw", + ] + + def verify_enabled(self) -> None: + """Optionally verify Vault-related resources. + + Currently kept minimal; no specific applications are asserted beyond + successful enablement and behavioral checks. + """ + if not self.expected_applications: + return + + for app in self.expected_applications: + self.juju.wait_for_application(app, timeout=self.timeout_seconds) + + def validate_feature_behavior(self) -> None: + """High-level TLS Vault behavior checks. + + - Public and internal endpoints use HTTPS. + - A basic OpenStack CLI operation (image list) works. + """ + self._ensure_openstack_env() + logger.info("Verifying public endpoints use HTTPS (TLS Vault mode)...") + cmd = [ + "openstack", + "endpoint", + "list", + "--interface", + "public", + "-c", + "URL", + "-f", + "value", + ] + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + public_urls = [ + line.strip() for line in result.stdout.splitlines() if line.strip() + ] + + if not public_urls: + raise AssertionError( + "openstack endpoint list --interface public returned no URLs" + ) + + public_https_urls = [u for u in public_urls if u.startswith("https://")] + if not public_https_urls: + raise AssertionError( + "TLS Vault feature appears inactive: no HTTPS endpoints found in " + "public interface. Sample URLs: " + ", ".join(public_urls[:5]) + ) + + logger.info( + "Found %d HTTPS public endpoints (TLS Vault mode)", + len(public_https_urls), + ) + + logger.info("Verifying internal endpoints use HTTPS (TLS Vault mode)...") + cmd = [ + "openstack", + "endpoint", + "list", + "--interface", + "internal", + "-c", + "URL", + "-f", + "value", + ] + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + internal_urls = [ + line.strip() for line in result.stdout.splitlines() if line.strip() + ] + + if internal_urls: + internal_https_urls = [u for u in internal_urls if u.startswith("https://")] + if not internal_https_urls: + raise AssertionError( + "TLS Vault feature appears inactive: no HTTPS endpoints found in " + "internal interface. Sample URLs: " + ", ".join(internal_urls[:5]) + ) + logger.info( + "Found %d HTTPS internal endpoints (TLS Vault mode)", + len(internal_https_urls), + ) + else: + logger.warning( + "No internal endpoints found; this may be normal for some deployments" + ) + + logger.info( + "Verifying basic OpenStack operations work over TLS (TLS Vault mode)..." + ) + cmd = ["openstack", "image", "list"] + result = subprocess.run(cmd, check=True, capture_output=True, text=True) + if result.returncode != 0: + raise AssertionError( + "Basic OpenStack operation failed after TLS Vault enablement: " + f"openstack image list returned error: {result.stderr}" + ) + logger.info("Basic OpenStack operations verified (TLS Vault mode)") + + def _generate_vault_ca_material(self) -> VaultCaMaterial: + """Generate root + intermediate CA for TLS Vault. + + See generate-a-ca-certificate.rst. Returns (ca_cert_base64, ca_chain_base64, + inter_cert_pem, inter_key_pem, + vault_ca_conf, certindex_content, certserial_content). + """ + return generate_root_and_intermediate_ca_for_vault() + + def _sign_vault_csrs_and_install(self) -> None: + """Sign Vault CSR with intermediate CA, inject via sunbeam tls vault unit_certs. + + Per generate-a-ca-certificate.rst: sign Vault CSR with intermediate CA + via: openssl ca -batch -config vault-ca.conf -notext -in vault.csr + -out vault.crt + """ + sunbeam_cmd = getattr(self.sunbeam, "_sunbeam_cmd", "sunbeam") + + max_attempts = 10 + backoff_seconds = 15 + records: list = [] + for attempt in range(1, max_attempts + 1): + logger.info( + "Listing outstanding TLS Vault CSRs (attempt %d/%d)...", + attempt, + max_attempts, + ) + result = subprocess.run( + [ + sunbeam_cmd, + "tls", + "vault", + "list_outstanding_csrs", + "--format", + "yaml", + ], + check=True, + capture_output=True, + text=True, + ) + raw = yaml.safe_load(result.stdout or "") or [] + records = raw if isinstance(raw, list) else [] + if records: + break + if attempt < max_attempts: + logger.info( + "No CSRs yet (Traefik may still be coming up); retrying in %ds...", + backoff_seconds, + ) + time.sleep(backoff_seconds) + + if not records: + logger.info( + "No outstanding TLS Vault CSRs found after %d attempts; " + "skipping unit_certs step.", + max_attempts, + ) + return + + first = records[0] if isinstance(records[0], dict) else {} + csr_pem = first.get("csr") + unit_name = first.get("unit_name") or first.get("app_name") or "vault/0" + if not csr_pem: + logger.warning( + "First TLS Vault CSR record had no 'csr'; skipping unit_certs" + ) + return + if len(records) > 1: + logger.warning( + "Multiple TLS Vault CSRs found; signing the first only for this test." + ) + logger.info( + "Signing Vault CSR for unit %s with intermediate CA (openssl ca -config vault-ca.conf)", + unit_name, + ) + + with tempfile.TemporaryDirectory() as tmpdir: + tmp_path = Path(tmpdir) + # Recreate CA dir state for intermediate to sign the Vault CSR. + (tmp_path / "vault-ca.conf").write_text(self._vault_ca_conf) + (tmp_path / "interca1.crt").write_text(self._vault_ca_cert_pem) + (tmp_path / "interca1.key").write_text(self._vault_ca_key_pem) + (tmp_path / "certindex").write_text(self._vault_certindex) + (tmp_path / "certserial").write_text(self._vault_certserial) + (tmp_path / "crlnumber").write_text("1000\n") + (tmp_path / "vault.csr").write_text(str(csr_pem).strip() + "\n") + + subprocess.run( + [ + "openssl", + "ca", + "-batch", + "-config", + "vault-ca.conf", + "-notext", + "-in", + "vault.csr", + "-out", + "vault.crt", + ], + check=True, + capture_output=True, + cwd=tmp_path, + ) + + vault_cert_pem = (tmp_path / "vault.crt").read_text() + + vault_cert_b64 = base64.b64encode(vault_cert_pem.encode()).decode() + + logger.info( + "Injecting signed Vault certificate via 'sunbeam tls vault unit_certs'..." + ) + subprocess.run( + [sunbeam_cmd, "tls", "vault", "unit_certs"], + input=f"{vault_cert_b64}\n", + check=True, + capture_output=True, + text=True, + ) + + def run_full_lifecycle(self) -> bool: + """Enable TLS Vault, run minimal checks, then disable it. + + Focuses on correct, deterministic flow: + - Ensure Vault prerequisites are met. + - Enable TLS Vault. + - Run minimal behaviour checks. + - Disable TLS Vault (failure logged but not fatal). + """ + if not ensure_vault_prerequisites(self.sunbeam, self.juju): + logger.error("Failed to set up Vault prerequisites for TLS Vault") + return False + + # Generate root + intermediate CA. + ( + self.ca_cert_base64, + self.ca_chain_base64, + self._vault_ca_cert_pem, + self._vault_ca_key_pem, + self._vault_ca_conf, + self._vault_certindex, + self._vault_certserial, + ) = self._generate_vault_ca_material() + self.enable_args = [ + "vault", + "--ca", + self.ca_cert_base64, + "--ca-chain", + self.ca_chain_base64, + "--endpoint", + "public", + "--endpoint", + "internal", + "--endpoint", + "rgw", + ] + + self.enable() + + # Automate the external CA flow: list CSRs, sign with our CA, and + # provide the signed certificate back to Vault. + self._sign_vault_csrs_and_install() + + self.verify_enabled() + self.validate_feature_behavior() + + disable_success = self.disable() + if not disable_success: + logger.warning("TLS Vault disable failed, but continuing test sequence") + + return True diff --git a/sunbeam-python/tests/functional/feature/features/validation.py b/sunbeam-python/tests/functional/feature/features/validation.py new file mode 100644 index 000000000..c752af5f9 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/validation.py @@ -0,0 +1,38 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Test for validation feature.""" + +import logging +import subprocess + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +class ValidationTest(BaseFeatureTest): + """Test validation feature enablement/disablement.""" + + feature_name = "validation" + expected_applications: list[str] = [] + timeout_seconds = 900 + + def verify_validate_feature_behavior(self) -> None: + """Validate that the validation CLI is usable.""" + logger.info("Verifying validation feature via `sunbeam validation profiles`...") + try: + subprocess.run( + ["sunbeam", "validation", "profiles"], + capture_output=True, + text=True, + timeout=60, + check=True, + ) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying validation feature: %s", exc) + raise AssertionError( + f"Validation feature verification failed: {exc}" + ) from exc + + logger.info("Validation feature verified via `sunbeam validation profiles`") diff --git a/sunbeam-python/tests/functional/feature/features/vault.py b/sunbeam-python/tests/functional/feature/features/vault.py new file mode 100644 index 000000000..ca252bfb6 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/features/vault.py @@ -0,0 +1,229 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Tests for the Vault feature. + +Vault provides the HashiCorp Vault service used by other features. +Functionality is validated via the `sunbeam vault` commands. +""" + +import json +import logging +import subprocess +import time +from typing import Optional, Tuple + +from .base import BaseFeatureTest + +logger = logging.getLogger(__name__) + + +def _ensure_vault_enabled(sunbeam, juju) -> bool: + """Ensure the Vault feature is enabled and the app exists. + + After ``sunbeam enable vault`` the units are expected to be in a + blocked state until Vault is initialised, unsealed and authorised, + so we do *not* wait for an active workload state here. + """ + if juju.has_application("vault"): + logger.info("Vault is already enabled") + return True + + logger.info("Enabling Vault feature...") + # Use the feature helper so this goes through the same path as other tests. + sunbeam.enable_feature("vault") + logger.info("Vault feature enabled") + + logger.info( + "Waiting for Vault application to appear; units may stay blocked " + "until initialisation and unsealing are completed." + ) + juju.wait_for_application("vault", timeout=300) + return True + + +def _initialize_vault(sunbeam) -> Tuple[bool, Optional[str], Optional[str]]: + """Initialise Vault and extract keys per docs (KEY_SHARES KEY_THRESHOLD). + + Returns (success, unseal_key, root_token). If Vault is already + initialised, sunbeam returns empty JSON and we have no keys to unseal. + """ + logger.info("Initialising Vault (1 key share, 1 threshold)...") + sunbeam_cmd = getattr(sunbeam, "_sunbeam_cmd", "sunbeam") + + init_result = subprocess.run( + [sunbeam_cmd, "vault", "init", "-f", "json", "1", "1"], + capture_output=True, + text=True, + timeout=120, + ) + + if init_result.returncode != 0: + stderr_lower = (init_result.stderr or "").lower() + if ( + "already initialized" in stderr_lower + or "already been initialized" in stderr_lower + ): + logger.error( + "Vault is already initialised; sunbeam did not return keys. " + "Run 'sunbeam disable vault' then 'sunbeam enable vault', " + "then run this test again to perform a fresh init and unseal." + ) + return (False, None, None) + logger.error("Failed to initialise Vault: %s", init_result.stderr) + return (False, None, None) + + vault_data = json.loads(init_result.stdout or "{}") + unseal_keys = vault_data.get("unseal_keys_b64") or vault_data.get("unseal_keys", []) + unseal_key = unseal_keys[0] if unseal_keys else None + root_token = vault_data.get("root_token") + + if unseal_key and root_token: + logger.info("Vault initialised successfully; keys obtained") + return (True, unseal_key, root_token) + + logger.error( + "Vault init returned no keys (Vault may already be initialised). " + "Run 'sunbeam disable vault' then 'sunbeam enable vault', then run " + "this test again to perform a fresh init and unseal." + ) + return (False, None, None) + + +def _unseal_vault(sunbeam, unseal_key: Optional[str]) -> None: + """Unseal Vault per docs: KEY_THRESHOLD times for leader, then non-leaders. + + With 1 key and 1 threshold we run unseal twice (leader then non-leaders). + """ + if not unseal_key: + return + + sunbeam_cmd = getattr(sunbeam, "_sunbeam_cmd", "sunbeam") + # First run unseals leader; second run unseals non-leader units (docs). + for attempt in (1, 2): + logger.info("Unseal run %d (leader then non-leaders)...", attempt) + result = subprocess.run( + [sunbeam_cmd, "vault", "unseal", "-"], + input=unseal_key, + capture_output=True, + text=True, + timeout=300, + ) + if result.returncode != 0: + raise RuntimeError( + f"sunbeam vault unseal failed (exit {result.returncode}): " + f"{result.stderr or result.stdout}" + ) + if attempt == 1 and "Rerun" in (result.stdout or ""): + time.sleep(15) + logger.info("Vault unsealing completed") + + +def _authorise_charm(sunbeam, root_token: Optional[str]) -> None: + """Authorise Vault charm with root token (docs: authorize-charm -).""" + if not root_token: + return + + logger.info("Authorising Vault charm with root token...") + sunbeam_cmd = getattr(sunbeam, "_sunbeam_cmd", "sunbeam") + + result = subprocess.run( + [sunbeam_cmd, "vault", "authorize-charm", "-"], + input=root_token, + capture_output=True, + text=True, + timeout=300, + ) + + if result.returncode != 0: + raise RuntimeError( + f"sunbeam vault authorize-charm failed (exit {result.returncode}): " + f"{result.stderr or result.stdout}" + ) + logger.info("Vault charm authorised") + + +def _disable_vault_and_wait(sunbeam, juju, timeout: int = 300) -> bool: + """Disable Vault and wait until the vault app is gone. Returns True on success.""" + sunbeam_cmd = getattr(sunbeam, "_sunbeam_cmd", "sunbeam") + logger.info("Disabling Vault to allow a fresh init (no keys from previous run)...") + result = subprocess.run( + [sunbeam_cmd, "disable", "vault"], + capture_output=True, + text=True, + timeout=120, + ) + if result.returncode != 0: + logger.error("sunbeam disable vault failed: %s", result.stderr or result.stdout) + return False + deadline = time.time() + timeout + while time.time() < deadline: + if not juju.has_application("vault"): + logger.info("Vault application removed") + return True + time.sleep(5) + logger.error("Vault application did not disappear within %s seconds", timeout) + return False + + +def ensure_vault_prerequisites(sunbeam, juju) -> bool: + """Ensure Vault is ready (active). + + If Vault is already enabled (app exists), init/unseal/authorize were done + when it was first enabled, so we only wait for the app to be ready. + If Vault is not yet enabled, we enable it then run init, unseal, authorize, + and wait for active. If init fails (Vault already initialised), we try + disable/re-enable/retry once (only when no other feature depends on vault). + """ + already_enabled = juju.has_application("vault") + if not _ensure_vault_enabled(sunbeam, juju): + return False + + if already_enabled: + logger.info( + "Vault was already enabled; skipping init/unseal/authorize and " + "waiting for application ready." + ) + juju.wait_for_application_ready("vault", timeout=360) + return True + + success, unseal_key, root_token = _initialize_vault(sunbeam) + if not success and juju.has_application("vault"): + if not _disable_vault_and_wait(sunbeam, juju): + return False + if not _ensure_vault_enabled(sunbeam, juju): + return False + success, unseal_key, root_token = _initialize_vault(sunbeam) + if not success: + return False + + _unseal_vault(sunbeam, unseal_key) + _authorise_charm(sunbeam, root_token) + + logger.info( + "Waiting for Vault units to become active (docs: update-status-interval, e.g. 5 min)..." + ) + juju.wait_for_application_ready("vault", timeout=360) + return True + + +class VaultTest(BaseFeatureTest): + """Test Vault feature enablement and readiness.""" + + feature_name = "vault" + expected_applications: list[str] = [] + timeout_seconds = 600 + + def verify_validate_feature_behavior(self) -> None: + """Validate that Vault is fully set up and reachable via sunbeam.""" + if not ensure_vault_prerequisites(self.sunbeam, self.juju): + raise AssertionError("Failed to set up Vault prerequisites") + + logger.info("Verifying Vault status via `sunbeam vault status`...") + try: + self.sunbeam.run(["vault", "status"]) + except Exception as exc: # noqa: BLE001 + logger.warning("Error while verifying Vault service: %s", exc) + raise AssertionError(f"Vault service verification failed: {exc}") from exc + + logger.info("Vault service verified via `sunbeam vault status`") diff --git a/sunbeam-python/tests/functional/feature/pytest.ini b/sunbeam-python/tests/functional/feature/pytest.ini new file mode 100644 index 000000000..acba1481b --- /dev/null +++ b/sunbeam-python/tests/functional/feature/pytest.ini @@ -0,0 +1,22 @@ +[pytest] +# Pytest configuration for feature functional tests + +# Test discovery patterns +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +markers = + functional: marks tests as functional (deselect with '-m "not functional"') + slow: marks tests as slow (deselect with '-m "not slow"') + +addopts = + -v + --tb=short + --strict-markers + +timeout = 1800 + +log_cli = true +log_cli_level = INFO + diff --git a/sunbeam-python/tests/functional/feature/requirements.txt b/sunbeam-python/tests/functional/feature/requirements.txt new file mode 100644 index 000000000..7ef618ac2 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/requirements.txt @@ -0,0 +1,4 @@ +pytest>=7.4.0 +pytest-timeout>=2.1.0 +pyyaml>=6.0 +jubilant>=1.0.0 diff --git a/sunbeam-python/tests/functional/feature/test_config.yaml.example b/sunbeam-python/tests/functional/feature/test_config.yaml.example new file mode 100644 index 000000000..2507a8535 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/test_config.yaml.example @@ -0,0 +1,13 @@ +# Sunbeam Feature Functional Test Configuration +# Copy this file to test_config.yaml and fill in your values + +sunbeam: + # Deployment name in Sunbeam (from `sunbeam deployment list`) + deployment_name: "ps6" + +juju: + # Juju model name (default: "openstack") + model: "openstack" + # Juju controller (auto-detected from sunbeam if not specified) + # controller: "your-controller" + diff --git a/sunbeam-python/tests/functional/feature/test_features.py b/sunbeam-python/tests/functional/feature/test_features.py new file mode 100644 index 000000000..c9606edb2 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/test_features.py @@ -0,0 +1,193 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Functional tests for Sunbeam features. + +These tests connect to an existing Sunbeam cluster and test feature +enablement/disablement lifecycle. +""" + +import logging + +import pytest + +from .features.baremetal import BaremetalTest +from .features.caas import CaaSTest +from .features.dns import DnsTest +from .features.images_sync import ImagesSyncTest +from .features.instance_recovery import InstanceRecoveryTest +from .features.ldap import LdapTest +from .features.loadbalancer import LoadbalancerTest +from .features.maintenance import MaintenanceTest +from .features.observability import ObservabilityTest +from .features.orchestration import OrchestrationTest +from .features.pro import ProTest +from .features.resource_optimization import ResourceOptimizationTest +from .features.secrets import SecretsTest +from .features.shared_filesystem import SharedFilesystemTest +from .features.telemetry import TelemetryTest +from .features.tls import TlsCaTest, TlsVaultTest +from .features.validation import ValidationTest +from .features.vault import VaultTest + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +@pytest.mark.functional +def test_instance_recovery(sunbeam_client, juju_client, test_config): + """Test instance-recovery feature lifecycle (enable/disable with verification).""" + feature_test = InstanceRecoveryTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Instance recovery feature test failed" + + +@pytest.mark.functional +@pytest.mark.skip( + reason=( + "Baremetal feature test is present but intentionally disabled in the " + "current feature flow (enable later when ready)." + ) +) +def test_baremetal(sunbeam_client, juju_client, test_config): + """Test baremetal feature lifecycle (enable/disable only).""" + feature_test = BaremetalTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Baremetal feature test failed" + + +@pytest.mark.functional +def test_caas(sunbeam_client, juju_client, test_config): + """Test caas feature lifecycle (enable/disable only).""" + feature_test = CaaSTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "CaaS feature test failed" + + +@pytest.mark.functional +def test_dns(sunbeam_client, juju_client, test_config): + """Test dns feature lifecycle (enable/disable only).""" + feature_test = DnsTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "DNS feature test failed" + + +@pytest.mark.functional +def test_images_sync(sunbeam_client, juju_client, test_config): + """Test images-sync feature lifecycle (enable/disable only).""" + feature_test = ImagesSyncTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Images-sync feature test failed" + + +@pytest.mark.functional +@pytest.mark.skip( + reason=( + "LDAP feature test is present but intentionally disabled in the " + "current feature flow (enable later when ready)." + ) +) +def test_ldap(sunbeam_client, juju_client, test_config): + """Test ldap feature lifecycle (enable/disable only).""" + feature_test = LdapTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "LDAP feature test failed" + + +@pytest.mark.functional +def test_loadbalancer(sunbeam_client, juju_client, test_config): + """Test loadbalancer feature lifecycle (enable/disable only).""" + feature_test = LoadbalancerTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Loadbalancer feature test failed" + + +@pytest.mark.functional +def test_orchestration(sunbeam_client, juju_client, test_config): + """Test orchestration feature lifecycle (enable/disable only).""" + feature_test = OrchestrationTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Orchestration feature test failed" + + +@pytest.mark.functional +def test_resource_optimization(sunbeam_client, juju_client, test_config): + """Test resource-optimization feature lifecycle (enable/disable only).""" + feature_test = ResourceOptimizationTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), ( + "Resource-optimization feature test failed" + ) + + +@pytest.mark.functional +def test_shared_filesystem(sunbeam_client, juju_client, test_config): + """Test shared-filesystem feature lifecycle (enable/disable only).""" + feature_test = SharedFilesystemTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Shared-filesystem feature test failed" + + +@pytest.mark.functional +def test_secrets(sunbeam_client, juju_client, test_config): + """Test secrets feature lifecycle (enable/disable only).""" + feature_test = SecretsTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Secrets feature test failed" + + +@pytest.mark.functional +def test_telemetry(sunbeam_client, juju_client, test_config): + """Test telemetry feature lifecycle (enable/disable only).""" + feature_test = TelemetryTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Telemetry feature test failed" + + +@pytest.mark.functional +def test_observability(sunbeam_client, juju_client, test_config): + """Test observability feature lifecycle (enable/disable only).""" + feature_test = ObservabilityTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Observability feature test failed" + + +@pytest.mark.functional +@pytest.mark.skip( + reason=( + "Maintenance feature test is present but intentionally disabled in the " + "current feature flow (enable later when ready)." + ) +) +def test_maintenance(sunbeam_client, juju_client, test_config): + """Test maintenance feature lifecycle (enable/disable only).""" + feature_test = MaintenanceTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Maintenance feature test failed" + + +@pytest.mark.functional +@pytest.mark.skip( + reason=( + "Pro feature test is present but intentionally disabled in the " + "current feature flow (enable later when ready)." + ) +) +def test_pro(sunbeam_client, juju_client, test_config): + """Test pro feature lifecycle (enable/disable only).""" + feature_test = ProTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Pro feature test failed" + + +@pytest.mark.functional +def test_tls_ca(sunbeam_client, juju_client, test_config): + """Test TLS CA mode lifecycle (enable/disable with verification).""" + feature_test = TlsCaTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "TLS CA feature test failed" + + +@pytest.mark.functional +def test_tls_vault(sunbeam_client, juju_client, test_config): + """Test TLS Vault mode lifecycle (enable/disable with minimal verification).""" + feature_test = TlsVaultTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "TLS Vault feature test failed" + + +@pytest.mark.functional +def test_vault(sunbeam_client, juju_client, test_config): + """Test vault feature lifecycle (enable/disable only).""" + feature_test = VaultTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Vault feature test failed" + + +@pytest.mark.functional +def test_validation(sunbeam_client, juju_client, test_config): + """Test validation feature lifecycle (enable/disable only).""" + feature_test = ValidationTest(sunbeam_client, juju_client, test_config) + assert feature_test.run_full_lifecycle(), "Validation feature test failed" diff --git a/sunbeam-python/tests/functional/feature/utils/__init__.py b/sunbeam-python/tests/functional/feature/utils/__init__.py new file mode 100644 index 000000000..42e1068cd --- /dev/null +++ b/sunbeam-python/tests/functional/feature/utils/__init__.py @@ -0,0 +1,4 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Utility wrappers for Sunbeam feature functional tests.""" diff --git a/sunbeam-python/tests/functional/feature/utils/juju.py b/sunbeam-python/tests/functional/feature/utils/juju.py new file mode 100644 index 000000000..448e85da2 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/utils/juju.py @@ -0,0 +1,135 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Juju CLI wrapper using Jubilant library for feature functional tests.""" + +import json +import logging +from typing import Dict, List, Optional, Set + +from jubilant import Juju + +logger = logging.getLogger(__name__) + + +class JujuClient: + """Client for interacting with Juju using Jubilant.""" + + def __init__(self, model: str = "openstack", controller: Optional[str] = None): + self.model = model + self.controller = controller + self._juju: Optional[Juju] = None + + @property + def juju(self) -> Juju: + """Get or create Jubilant Juju instance.""" + if self._juju is None: + self._juju = Juju() + if self.model: + try: + self._juju.cli("switch", self.model) + except Exception as exc: # noqa: BLE001 + # Log but continue - the model might already be active + logger.debug("Could not switch to model %s: %s", self.model, exc) + return self._juju + + def is_connected(self) -> bool: + """Check if we can connect to Juju.""" + result = self.juju.cli("status", "--format", "json") + return bool(result) + + def get_applications(self) -> Set[str]: + """Get list of all applications in the model.""" + result_str = self.juju.cli("status", "--format", "json") + status = json.loads(result_str) + applications: Set[str] = set() + + if "applications" in status: + applications.update(status["applications"].keys()) + + return applications + + def get_units(self) -> Set[str]: + """Get list of all units in the model.""" + result_str = self.juju.cli("status", "--format", "json") + status = json.loads(result_str) + units: Set[str] = set() + + if "applications" in status: + for app_data in status["applications"].values(): + if "units" in app_data: + for unit_name in app_data["units"].keys(): + units.add(unit_name) + + return units + + def has_application(self, application_name: str) -> bool: + """Check if an application exists.""" + applications = self.get_applications() + return application_name in applications + + def has_unit(self, unit_name: str) -> bool: + """Check if a unit exists.""" + units = self.get_units() + return unit_name in units + + def wait_for_application(self, application_name: str, timeout: int = 300) -> bool: + """Wait for an application to appear using Jubilant's wait mechanism.""" + if self.has_application(application_name): + logger.info( + "Application '%s' already exists, skipping wait", + application_name, + ) + return True + + def app_exists(status) -> bool: + return hasattr(status, "apps") and application_name in status.apps + + self.juju.wait(app_exists, timeout=timeout, delay=1.0) + return True + + def wait_for_unit(self, unit_name: str, timeout: int = 300) -> bool: + """Wait for a unit to appear using Jubilant's wait mechanism.""" + if self.has_unit(unit_name): + logger.info("Unit '%s' already exists, skipping wait", unit_name) + return True + + def unit_exists(status) -> bool: + if not hasattr(status, "apps"): + return False + for app_data in status.apps.values(): + if hasattr(app_data, "units") and unit_name in app_data.units: + return True + return False + + self.juju.wait(unit_exists, timeout=timeout, delay=1.0) + return True + + def wait_for_application_ready( + self, + application_name: str, + timeout: int = 600, + ) -> bool: + """Wait for an application to be in 'active' state.""" + + def app_active(status) -> bool: + if not hasattr(status, "apps") or application_name not in status.apps: + return False + app = status.apps[application_name] + return hasattr(app, "app_status") and app.app_status.current == "active" + + self.juju.wait(app_active, timeout=timeout, delay=1.0) + return True + + def verify_applications_exist( + self, + expected_applications: List[str], + ) -> Dict[str, bool]: + """Verify that expected applications exist.""" + actual_applications = self.get_applications() + results: Dict[str, bool] = {} + + for app in expected_applications: + results[app] = app in actual_applications + + return results diff --git a/sunbeam-python/tests/functional/feature/utils/sunbeam.py b/sunbeam-python/tests/functional/feature/utils/sunbeam.py new file mode 100644 index 000000000..7982f3de4 --- /dev/null +++ b/sunbeam-python/tests/functional/feature/utils/sunbeam.py @@ -0,0 +1,87 @@ +# SPDX-FileCopyrightText: 2024 - Canonical Ltd +# SPDX-License-Identifier: Apache-2.0 + +"""Sunbeam CLI wrapper for feature functional tests.""" + +import logging +import subprocess +from typing import List, Optional + +logger = logging.getLogger(__name__) + + +class SunbeamClient: + """Client for interacting with Sunbeam CLI.""" + + def __init__(self, deployment_name: str): + self.deployment_name = deployment_name + self._sunbeam_cmd = "/snap/bin/sunbeam" + + def _run_command(self, command: List[str]) -> subprocess.CompletedProcess: + """Run a sunbeam command and return the result.""" + full_command = [self._sunbeam_cmd] + command + logger.debug("Running: %s", " ".join(full_command)) + + result = subprocess.run( + full_command, + capture_output=True, + text=True, + check=False, + timeout=3600, + ) + + if result.returncode != 0: + logger.error( + "Command failed with exit code %d: %s", + result.returncode, + " ".join(full_command), + ) + if result.stderr: + logger.error("stderr: %s", result.stderr) + if result.stdout: + logger.error("stdout: %s", result.stdout) + result.check_returncode() + + return result + + def run(self, command: List[str]) -> subprocess.CompletedProcess: + """Public helper to run arbitrary sunbeam subcommands.""" + return self._run_command(command) + + def is_connected(self) -> bool: + """Check if we can connect to the Sunbeam deployment.""" + result = subprocess.run( + ["sunbeam", "deployment", "list"], + capture_output=True, + text=True, + timeout=30, + ) + return result.returncode == 0 and self.deployment_name in result.stdout + + def enable_feature( + self, + feature_name: str, + extra_args: Optional[List[str]] = None, + ) -> bool: + """Enable a Sunbeam feature.""" + cmd: List[str] = ["enable", feature_name] + if extra_args: + cmd.extend(extra_args) + + self._run_command(cmd) + logger.info("Feature '%s' enabled successfully", feature_name) + return True + + def disable_feature( + self, + feature_name: str, + extra_args: Optional[List[str]] = None, + ) -> bool: + """Disable a Sunbeam feature.""" + cmd: List[str] = ["disable", feature_name] + if extra_args: + cmd.extend(extra_args) + + self._run_command(cmd) + logger.info("Feature '%s' disabled successfully", feature_name) + return True diff --git a/sunbeam-python/tox.ini b/sunbeam-python/tox.ini index 8dc81acfa..5666df3e7 100644 --- a/sunbeam-python/tox.ini +++ b/sunbeam-python/tox.ini @@ -1,9 +1,6 @@ [tox] envlist = unit,pep8,mypy skipsdist = True -# Automatic envs (pyXX) will only use the python version appropriate to that -# env and ignore basepython inherited from [testenv] if we set -# ignore_basepython_conflict. ignore_basepython_conflict = True [vars] @@ -25,14 +22,8 @@ setenv = OS_STDOUT_CAPTURE=1 description = Sunbeam unit tests commands = uv run {[vars]uv_flags} python -m pytest -vv tests/unit {posargs} -# The functional tests may have specific hardware requirements and are currently -# skipped by default. [testenv:functional] description = Sunbeam functional tests -# The snap can't access /tmp, we'll need to place manifests and other temporary -# files in the home directory. At the same time, we need to expose USER/LOGNAME, -# otherwise the Sunbeam group won't be initialized correctly and the Sunbeam -# commands will fail due to missing privileges. passenv = USER LOGNAME USERNAME @@ -42,6 +33,18 @@ commands = uv run {[vars]uv_flags} \ --basetemp={env:HOME}/.local/share/openstack/tmp \ {posargs} +[testenv:functional-feature] +description = Sunbeam feature functional tests (existing deployment) +passenv = USER + LOGNAME + USERNAME + HOME +commands = uv run {[vars]uv_flags} \ + python -m pytest -s -vv tests/functional/feature \ + --config=test_config.yaml \ + --basetemp={env:HOME}/.local/share/openstack/tmp \ + {posargs} + [testenv:fmt] description = Apply coding style standards to code deps = @@ -60,9 +63,6 @@ commands = [testenv:mypy] commands = uv run {[vars]uv_flags} mypy {[vars]src_path}/sunbeam - # TODO: consider uncommenting the following line once - # the unit tests pass the mypy check. - # uv run {[vars]uv_flags} mypy {[vars]tst_path}/unit uv run {[vars]uv_flags} mypy {[vars]tst_path}/functional [testenv:cover] @@ -87,7 +87,6 @@ deps = commands = sphinx-build -a -E -W -d doc/build/doctrees -b html doc/source doc/build/html sphinx-build -a -E -W -d doc/build/doctrees -b man doc/source doc/build/man - # Validate redirects (must be done after the docs build whereto doc/build/html/.htaccess doc/test/redirect-tests.txt [testenv:releasenotes]