diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index 07595a261..f15a15ad8 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -183,11 +183,17 @@ jobs: with: go-version: 1.25.x - - name: Run unit-tests + - name: Run Go unit-tests run: | cd microceph make check-unit + - name: Run orchestrator Python unit-tests + run: | + python3 -m pip install ./microceph-orch pytest + cd microceph-orch + PYTHONPATH=src:tests python3 -m pytest tests/ -v + single-system-tests: name: Single node with encryption runs-on: ubuntu-22.04 @@ -237,7 +243,7 @@ jobs: sudo microceph.ceph mgr module enable microceph sudo microceph.ceph orch set backend microceph - - name: Verify Orch module + - name: Verify Orch module (basic) run: | set -eux hn=$(hostname) @@ -333,6 +339,11 @@ jobs: - name: Disable RGW run: ~/actionutils.sh disable_rgw + - name: Run orchestrator integration tests + run: | + set -eux + bash tests/scripts/test-orch.sh microceph.ceph + - name: Enable RGW with SSL enabled run: ~/actionutils.sh enable_rgw_ssl @@ -523,6 +534,16 @@ jobs: - name: Test RGW certificate rotation with --target (cross-node) run: ~/actionutils.sh headexec test_certificate_set_rgw_target node-wrk1 + - name: Enable orchestrator and run integration tests + run: | + set -eux + lxc exec node-wrk0 -- sh -c "microceph.ceph mgr module enable microceph" + lxc exec node-wrk0 -- sh -c "microceph.ceph orch set backend microceph" + sleep 10 + lxc exec node-wrk0 -- sh -c "microceph.ceph orch status" + lxc file push tests/scripts/test-orch.sh node-wrk0/tmp/test-orch.sh + lxc exec node-wrk0 -- bash /tmp/test-orch.sh microceph.ceph + - name: Print logs for failure if: failure() run: | diff --git a/.gitignore b/.gitignore index c0dcfe174..ad99d0017 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ _build .vscode microceph/coverage.out microceph/coverage.html +__pycache__/ +*.pyc diff --git a/microceph-orch/pyproject.toml b/microceph-orch/pyproject.toml index 
41e2e24bb..e0a9d0181 100644 --- a/microceph-orch/pyproject.toml +++ b/microceph-orch/pyproject.toml @@ -12,3 +12,8 @@ dependencies = [ [tool.uv.sources] snap-helpers = { git = "https://github.com/albertodonato/snap-helpers" } + +[dependency-groups] +dev = [ + "pytest>=8.0", +] diff --git a/microceph-orch/src/microceph/client/cluster.py b/microceph-orch/src/microceph/client/cluster.py index 6601e999a..ed61cda85 100644 --- a/microceph-orch/src/microceph/client/cluster.py +++ b/microceph-orch/src/microceph/client/cluster.py @@ -18,7 +18,7 @@ def get_cluster_members(self) -> list: """ result = [] cluster = self._get("/core/1.0/cluster") - members = cluster.get("metadata", {}) + members = cluster.get("metadata") or [] keys = ["name", "address", "status"] for member in members: result.append({k: v for k, v in member.items() if k in keys}) @@ -55,22 +55,17 @@ class ExtendedAPIService(service.BaseService): def list_services(self) -> list[dict]: """List all services.""" services = self._get("/1.0/services") - return services.get("metadata") - - def list_resources(self) -> list[dict]: - """List all resources.""" - nodes = self._get("/1.0/resources") - return nodes.get("metadata") + return services.get("metadata") or [] def list_disks(self) -> list[dict]: """List all disks""" disks = self._get("/1.0/disks") - return disks.get("metadata") + return disks.get("metadata") or [] def get_status(self) -> dict[str, dict]: """Get status of the cluster.""" cluster = self._get("/1.0/status") - members = cluster.get("metadata", {}) + members = cluster.get("metadata") or [] return { member["name"]: { "status": member["status"], @@ -78,3 +73,67 @@ def get_status(self) -> dict[str, dict]: } for member in members } + + def enable_service(self, name: str, payload: str = "", wait: bool = True, + target: str | None = None) -> None: + """Enable a service on the cluster. + + Sends a PUT request to /1.0/services/{name} with an EnableService payload.
+ The Go API dispatches this to ServicePlacementHandler which runs the full + placement lifecycle: PopulateParams, HospitalityCheck, ServiceInit, + PostPlacementCheck, DbUpdate. + + :param name: service name (e.g. 'rgw', 'nfs', 'rbd-mirror') + :param payload: JSON string with service-specific parameters + :param wait: if True, the server waits for the service to be fully enabled + :param target: optional cluster member name. When set, the + server-side proxyTarget middleware (microcluster) forwards + the request over mTLS to that node, allowing per-host + service enablement from the local unix socket. When None, + no target is forwarded and the server handles the request + on the node receiving the unix-socket connection. + """ + # Note: The "bool" key maps to Go's EnableService.Wait field which has + # the struct tag `json:"bool"` (upstream naming quirk in MicroCeph). + params = {"target": target} if target is not None else None + self._put(f"/1.0/services/{name}", json={ + "name": name, + "bool": wait, + "payload": payload, + }, params=params) + + def delete_service(self, name: str, target: str | None = None) -> None: + """Delete/disable a service on the cluster. + + :param name: service name (e.g. 'rgw', 'nfs', 'rbd-mirror') + :param target: optional cluster member name; see enable_service. + """ + params = {"target": target} if target is not None else None + self._delete(f"/1.0/services/{name}", params=params) + + def restart_services(self, services: list[str], + target: str | None = None) -> None: + """Restart one or more services on the cluster. + + Sends a POST to /1.0/services/restart with a list of service objects. + + :param services: list of service names (e.g. ['mon', 'rgw']) + :param target: optional cluster member name; see enable_service. 
+ """ + params = {"target": target} if target is not None else None + payload = [{"service": svc} for svc in services] + self._post("/1.0/services/restart", json=payload, params=params) + + def delete_nfs_service(self, cluster_id: str, + target: str | None = None) -> None: + """Delete/disable an NFS service by cluster ID. + + NFS deletion requires the cluster_id in the request body, unlike + other services which are identified by URL path alone. + + :param cluster_id: NFS cluster identifier + :param target: optional cluster member name; see enable_service. + """ + params = {"target": target} if target is not None else None + self._delete("/1.0/services/nfs", + json={"cluster_id": cluster_id}, params=params) diff --git a/microceph-orch/src/microceph/client/service.py b/microceph-orch/src/microceph/client/service.py index 4eac0f4bc..e1965d841 100644 --- a/microceph-orch/src/microceph/client/service.py +++ b/microceph-orch/src/microceph/client/service.py @@ -145,8 +145,13 @@ def _request(self, method, path, **kwargs): # noqa: C901 too complex try: response.raise_for_status() except HTTPError as e: - # Do some nice translating to microclusterdexceptions - error = response.json().get("error") + # Do some nice translating to microclusterd exceptions. + # The error body is normally JSON with an "error" key, but a + # non-JSON body or a missing key must not mask the real HTTPError.
+ try: + error = response.json().get("error") or "" + except ValueError: + error = "" if "remote with name" in error: raise NodeAlreadyExistsException( "Already node exists in the microcluster" diff --git a/microceph-orch/src/microceph/module.py b/microceph-orch/src/microceph/module.py index 403c7f028..f2bc7babc 100644 --- a/microceph-orch/src/microceph/module.py +++ b/microceph-orch/src/microceph/module.py @@ -31,6 +31,8 @@ CLICommandMeta, handle_orch_error, OrchResult, + OrchestratorError, + OrchestratorValidationError, ) from .client.client import Client @@ -45,6 +47,37 @@ 'nfs': NFSServiceSpec, } +# Services for which MicroCeph's backend supports restart. The Go +# serviceWorkerTable (microceph/ceph/services.go) only registers handlers +# for osd, mon and rgw; restarting any other service raises +# "no handler defined for service X" at the backend. +RESTART_SUPPORTED_SERVICES = frozenset({"osd", "mon", "rgw"}) + +# Services for which MicroCeph distinguishes individual instances via a +# `service_id` (the `type.id` dotted form). Currently only NFS, where +# the id is the NFS cluster_id. Any other service is deployed as a single +# bare instance per node; using a dotted name on them is rejected up front +# so operators get a clear error instead of an apparently successful +# operation that silently targets the wrong scope. +DOTTED_NAME_SUPPORTED_SERVICES = frozenset({"nfs"}) + + +def _require_bare_service_name(svc_type: str, svc_id: str) -> None: + """Reject dotted service names for services that don't support them. + + Raises ValueError if `svc_id` is non-empty and `svc_type` is not in + DOTTED_NAME_SUPPORTED_SERVICES. + """ + if svc_id and svc_type not in DOTTED_NAME_SUPPORTED_SERVICES: + supported = ", ".join(sorted(DOTTED_NAME_SUPPORTED_SERVICES)) + raise ValueError( + f"Service type '{svc_type}' does not support a service_id; " + f"MicroCeph deploys a single bare '{svc_type}' per node. " + f"Use '{svc_type}' (without an id) instead of " + f"'{svc_type}.{svc_id}'.
Dotted names are only valid for: " + f"{supported}." + ) + class MicroCephOrchestrator(Orchestrator, MgrModule, metaclass=CLICommandMeta): @@ -109,6 +142,8 @@ def available(self) -> Tuple[bool, str, Dict[str, Any]]: self.microceph.status.is_available() except RemoteException as e: return False, f"Cannot reach the MicroCeph API: {e}", {} + except Exception as e: + return False, f"Unexpected error reaching MicroCeph API: {e}", {} return True, "", {} @@ -130,8 +165,22 @@ def get_hosts(self) -> List[HostSpec]: """ specs = [] for m in self.microceph.cluster.get_cluster_members(): - addr, _, _ = m['address'].rpartition(":") - specs.append(HostSpec(m['name'], addr, status=m['status'])) + # microcluster addresses are "host:port"; IPv6 uses the + # bracketed "[addr]:port" form. Strip the port without + # corrupting a bare IPv6 literal (which also contains ":"). + address = m.get('address', '') + if address.startswith('['): + addr = address[1:].partition(']')[0] + elif address.count(':') == 1: + addr = address.rsplit(':', 1)[0] + else: + # No port present (hostname-only or bare IPv6 literal). 
+ addr = address + specs.append(HostSpec( + m.get('name', ''), + addr, + status=m.get('status', 'unknown'), + )) return specs @@ -139,19 +188,28 @@ def _get_service_hostlist(self, recorded_services: list) -> dict: """Get a dict describing the distribution of services""" service_hostlist = defaultdict(list) for record in recorded_services: - service_name = record['service'] if not record['group_id'] else f"{record['service']}.{record['group_id']}" - service_host = record['location'] + service = record.get('service', '') + group_id = record.get('group_id', '') + service_name = service if not group_id else f"{service}.{group_id}" + service_host = record.get('location', '') service_hostlist[service_name].append(service_host) - logger.info(f"microcephs record service({service_name}) at ({service_host}) configured({record['info']})") + logger.debug( + f"microceph record service({service_name}) at " + f"({service_host}) configured({record.get('info', '')})" + ) return service_hostlist - def _elaborate_service(self, service: str): - """Elaborate a service into id and type""" - if '.' in service: - segments = service.split('.') - return segments[0], segments[1] - else: - return service, "" + @staticmethod + def _parse_service_name(service_name: str) -> Tuple[str, str]: + """Split a service name into (type, id). + + Handles dotted names like 'nfs.my.cluster' correctly by splitting + on the first dot only. + + :return: (service_type, service_id); service_id is '' if no dot. 
+ """ + svc_type, _, svc_id = service_name.partition('.') + return svc_type, svc_id @handle_orch_error def describe_service(self, @@ -169,12 +227,14 @@ def describe_service(self, service_descs = [] for svc_name, hostlist in service_hostlist.items(): spec = None - svc_type, svc_id = self._elaborate_service(svc_name) - logger.info(f"{svc_name} under description for filter {service_type}") + svc_type, svc_id = self._parse_service_name(svc_name) + logger.debug(f"{svc_name} under description for filter {service_type}") - # skip unrelated services if a specific daemon type is requested. + # Apply filters if service_type and svc_type != service_type: continue + if service_name and svc_name != service_name: + continue if svc_type in daemon_spec_map: spec = daemon_spec_map[svc_type]( @@ -185,9 +245,15 @@ def describe_service(self, service_id=svc_id, service_type=svc_type, placement=PlacementSpec(hosts=hostlist, count=len(hostlist)) ) + # `size` is the desired daemon count; `running` is the actual. + # MicroCeph's desired state IS the current set of hosts running + # the service (no separate spec store), so they match. Setting + # `size` explicitly avoids `ceph orch ls` rendering `RUNNING/-`, + # which looks like a degraded/incomplete deployment. 
service_descs.append(ServiceDescription( spec=spec, - running=len(hostlist) + running=len(hostlist), + size=len(hostlist), )) return service_descs @@ -207,21 +273,33 @@ def list_daemons(self, services = self.microceph.services.list_services() descriptions = [] for svc in services: - svc_daemon_type = svc['service'] - svc_hostname = svc['location'] - svc_group_ip = svc['group_id'] + svc_daemon_type = svc.get('service', '') + svc_hostname = svc.get('location', '') + svc_group_id = svc.get('group_id', '') svc_ip = None svc_ports = None - svc_name = f"{svc_daemon_type}.{svc_group_ip}" if svc_group_ip else svc_daemon_type - if daemon_type: - if svc_daemon_type != daemon_type: - continue + svc_name = f"{svc_daemon_type}.{svc_group_id}" if svc_group_id else svc_daemon_type + + # Apply filters + if daemon_type and svc_daemon_type != daemon_type: + continue + if host and svc_hostname != host: + continue + if daemon_id and svc_hostname != daemon_id: + continue + if service_name and svc_name != service_name: + continue + + # Extract NFS-specific info (bind address and port) + if svc_daemon_type == 'nfs' and svc.get('info'): + try: + info = json.loads(svc['info']) + bind_addr = info.get('bind_address', '') + svc_ip = None if '0.0.0.0' in bind_addr else bind_addr or None + svc_ports = [info['bind_port']] if 'bind_port' in info else None + except (json.JSONDecodeError, KeyError) as e: + logger.warning(f"Failed to parse NFS service info for {svc_hostname}: {e}") - if svc_daemon_type == 'nfs': - info = json.loads(svc['info']) - svc_ip = None if "0.0.0.0" in info['bind_address'] else info['bind_address'] - svc_ports = [info['bind_port']] - descriptions.append(DaemonDescription( service_name=svc_name, daemon_type=svc_daemon_type, @@ -231,7 +309,7 @@ def list_daemons(self, ports=svc_ports )) - logger.info(descriptions) + logger.debug(f"list_daemons returning {len(descriptions)} daemons") return descriptions @handle_orch_error @@ -239,39 +317,480 @@ def get_inventory(self, 
host_filter: Optional[InventoryFilter] = None, refresh: bool = False ) -> List[InventoryHost]: + """Report storage device inventory per host. + + Devices are sourced from the cluster-wide OSD list (/1.0/disks), so + every reported device backs an existing OSD and is marked + unavailable. Discovery of free/unused disks is not exposed here: + /1.0/resources reports only the local node and the socket client + cannot proxy to peers (see the _apply_service note on targeting). + Hosts with no OSD disks are still listed so callers see every member. + + Only `host_filter.hosts` is honored; `host_filter.labels` raises + OrchestratorValidationError because MicroCeph has no host-label concept. + """ + + # Resolve which hosts to include based on the filter. + filter_hosts = None + if host_filter: + if host_filter.labels: + raise OrchestratorValidationError( + "MicroCeph orchestrator does not support host labels" + ) + if host_filter.hosts: + filter_hosts = set(host_filter.hosts) + + osd_disks = self.microceph.services.list_disks() - disks = self.microceph.services.list_disks() - disks_by_host = defaultdict(list) - for d in disks: - disks_by_host[d['location']].append( - Device(path=d['path']) + # Build inventory from OSD disk list (cluster-wide source of truth + # for which hosts exist and what disks they have). + devices_by_host: Dict[str, List[Device]] = defaultdict(list) + for d in osd_disks: + host = d.get('location', '') + if filter_hosts and host not in filter_hosts: + continue + devices_by_host[host].append( + Device(path=d.get('path', ''), available=False) ) + # Ensure all cluster members appear even if they have no OSD disks.
+ try: + for member in self.microceph.cluster.get_cluster_members(): + host = member.get('name', '') + if filter_hosts and host not in filter_hosts: + continue + if host not in devices_by_host: + devices_by_host[host] = [] + except RemoteException: + pass + inventory = [] - for host, diskettes in disks_by_host.items(): + for host, devs in devices_by_host.items(): inventory.append(InventoryHost( name=host, - devices=Devices(diskettes) + devices=Devices(devs) )) return inventory + def _get_placement_hosts(self, spec: ServiceSpec) -> List[str]: + """Extract target hosts from a ServiceSpec's placement. + + Returns the list of hostnames from the placement spec. + Raises ValueError if no hosts are specified; callers must + provide explicit placement. + """ + hosts = [] + if spec.placement and spec.placement.hosts: + hosts = [h.hostname if hasattr(h, 'hostname') else str(h) for h in spec.placement.hosts] + + if not hosts: + raise ValueError( + f"No placement hosts specified for {spec.service_type}. " + "Explicit host placement is required." + ) + + return hosts + + def _get_existing_service_hosts(self, service_type: str, + group_id: Optional[str] = None) -> set: + """Return the set of hostnames that already run the given service. + + When group_id is provided (e.g. an NFS cluster id), only services + matching both the type and that group are counted, so distinct NFS + clusters are not conflated. + + A failure to list services from the backend is propagated to the + caller rather than swallowed: returning an empty set on error + would otherwise produce false "no-op" successes and re-apply + services that are in fact already running. 
+ """ + services = self.microceph.services.list_services() + return { + svc.get('location', '') for svc in services + if svc.get('service') == service_type + and (group_id is None or svc.get('group_id', '') == group_id) + } + + def _apply_service(self, service_type: str, spec: ServiceSpec, + payload: str, group_id: Optional[str] = None) -> str: + """Common logic for applying (enabling) a service. + + Per-host targeting: MicroCeph's HTTP API endpoints for service + enable/delete/restart all declare ProxyTarget=true in the Go + rest layer. The microcluster proxyTarget middleware inspects the + `?target=` query parameter and forwards the request over + mTLS to that member's HTTPS endpoint. The Python client uses the + local unix socket; the server transparently proxies per-host + calls from there, so no direct HTTPS connectivity from the orch + module is required. + + We iterate requested placement hosts, skip any host that already + runs the service, and call enable_service once per remaining + host with `target=host`. Any per-host failure raises so the + operator gets a visible error rather than a partial-success + result that the Ceph orchestrator framework would render as + green. + + Note: `service_type` is the bare service type (e.g. "rgw", + "nfs"), never the dotted form. Callers that accept dotted names + are expected to parse and pass `group_id` separately. + + Returns a summary string on success. + Raises OrchestratorError if any host fails to enable. + """ + hosts = self._get_placement_hosts(spec) + existing = self._get_existing_service_hosts(service_type, group_id) + + # If every requested host already runs the service, there is + # nothing to do.
+ if existing.issuperset(hosts): + skipped_str = ', '.join(hosts) + logger.info(f"Skipping {service_type}: already active on {skipped_str}") + return f"{service_type}: already active on {skipped_str}" + + to_enable = [h for h in hosts if h not in existing] + already_active = sorted(existing & set(hosts)) + + enabled: List[str] = [] + failures: List[str] = [] + for host in to_enable: + logger.info(f"Enabling {service_type} on {host}") + try: + self.microceph.services.enable_service( + name=service_type, + payload=payload, + wait=True, + target=host, + ) + enabled.append(host) + except Exception as e: + # Tolerate the TOCTOU window between the snapshot taken + # by _get_existing_service_hosts and the per-host enable + # call. The backend's genericHospitalityCheck + # (microceph/ceph/services_placement_generic.go) returns + # " service already active on host" if the service + # raced into existence between snapshot and enable; for + # an apply that is the desired end-state, not a failure. + if "already active on host" in str(e): + logger.info( + f"{service_type} became active on {host} after " + "snapshot; treating as no-op." + ) + enabled.append(host) + continue + logger.error(f"Failed to enable {service_type} on {host}: {e}") + failures.append(f"{host}: {e}") + + # Any failure is surfaced as an exception so the orchestrator + # framework reports the operation as failed. Partial-success + # context is included in the message to aid debugging. 
+ if failures: + ctx = [] + if enabled: + ctx.append(f"enabled on {', '.join(enabled)}") + if already_active: + ctx.append(f"already active on {', '.join(already_active)}") + ctx_str = f" ({'; '.join(ctx)})" if ctx else "" + raise OrchestratorError( + f"Failed to enable {service_type}: " + + "; ".join(failures) + + ctx_str + ) + + parts = [] + if enabled: + parts.append(f"enabled on {', '.join(enabled)}") + if already_active: + parts.append(f"already active on {', '.join(already_active)}") + + summary = "; ".join(parts) if parts else "no-op" + return f"{service_type}: {summary}" + + @handle_orch_error def apply_rbd_mirror(self, spec: ServiceSpec) -> OrchResult[str]: - logger.info(f"Received Apply Request for RBD Mirror: Spec: {vars(spec).items()}") - raise NotImplementedError() + """Enable the rbd-mirror service on the target hosts. - def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: + rbd-mirror is a client-like service with no additional parameters. + The Go API's ClientServicePlacement handler takes no payload. + Dotted names are rejected (see `_require_bare_service_name`). """ + logger.debug("Applying rbd-mirror service") + _require_bare_service_name("rbd-mirror", spec.service_id or "") + return self._apply_service("rbd-mirror", spec, "{}") - :param spec: - :return: + @handle_orch_error + def apply_rgw(self, spec: RGWSpec) -> OrchResult[str]: + """Enable the RGW service on the target hosts. + + Extracts port and SSL configuration from the RGWSpec and passes + them as the JSON payload to the MicroCeph enable_service API. + + Note on SSL: The Ceph RGWSpec provides rgw_frontend_ssl_certificate + (a list of PEM cert strings) but has no private key field. MicroCeph's + Go API (RgwServicePlacement) requires both SSLCertificate and + SSLPrivateKey to enable SSL. Until the spec is extended or a separate + key source is added, SSL cannot be fully configured via the + orchestrator interface. 
+ + Note on service_id (realms/zones): MicroCeph deploys a single bare + "rgw" service per node and does not support multiple RGW instances + with distinct service_ids on the same cluster. Supplying a + service_id raises ValueError so the operator is not surprised + by a silently-misscoped subsequent remove_service call. """ - raise NotImplementedError() + logger.debug("Applying RGW service") + _require_bare_service_name("rgw", spec.service_id or "") + + # Build RGW-specific payload from the spec. + # Go's RgwServicePlacement expects: Port, SSLPort, SSLCertificate, SSLPrivateKey + # Field names match Go exported field names (no json tags defined, + # so encoding/json matches case-insensitively). + rgw_params: Dict[str, Any] = {} + if spec.rgw_frontend_port: + rgw_params['Port'] = spec.rgw_frontend_port + + # TODO: SSL support for RGW via orchestrator interface. + # + # SSL cannot be configured through this path. MicroCeph's Go API requires + # both SSLCertificate and SSLPrivateKey as raw PEM material; if either is + # missing, SSL is skipped and RGW falls back to plain HTTP (see rgw.go). + # The Ceph RGWSpec only carries the cert chain (rgw_frontend_ssl_certificate) + # with no private key field, so we cannot supply both. + if spec.rgw_frontend_ssl_certificate: + logger.warning( + "RGWSpec provides SSL certificate but the Ceph orchestrator spec " + "has no private key field. MicroCeph requires both certificate and " + "private key for SSL. SSL will be skipped; RGW will be deployed " + "in non-SSL mode." + ) + payload = json.dumps(rgw_params) if rgw_params else "{}" + return self._apply_service("rgw", spec, payload) + + @handle_orch_error def apply_nfs(self, spec: NFSServiceSpec) -> OrchResult[str]: + """Enable the NFS service on the target hosts. + + Extracts the NFS cluster ID and optional port from the + NFSServiceSpec and passes them as the JSON payload. 
""" + logger.debug("Applying NFS service") - :param spec: - :return: + # Go's NFSServicePlacement expects: cluster_id, v4_min_version, bind_address, bind_port + # The Ceph NFSServiceSpec uses service_id as the NFS cluster identifier. + if not spec.service_id: + raise ValueError("NFS service_id (cluster_id) is required") + + nfs_params: Dict[str, Any] = { + 'cluster_id': spec.service_id, + } + if spec.port: + nfs_params['bind_port'] = spec.port + if spec.virtual_ip: + nfs_params['bind_address'] = spec.virtual_ip + + payload = json.dumps(nfs_params) + return self._apply_service("nfs", spec, payload, group_id=spec.service_id) + + @handle_orch_error + def apply_mon(self, spec: ServiceSpec) -> OrchResult[str]: + """Enable the MON service on the target hosts. + + Dotted names are rejected (see `_require_bare_service_name`). """ - raise NotImplementedError() + logger.debug("Applying MON service") + _require_bare_service_name("mon", spec.service_id or "") + return self._apply_service("mon", spec, "{}") + + @handle_orch_error + def apply_mgr(self, spec: ServiceSpec) -> OrchResult[str]: + """Enable the MGR service on the target hosts. + + Dotted names are rejected (see `_require_bare_service_name`). + """ + logger.debug("Applying MGR service") + _require_bare_service_name("mgr", spec.service_id or "") + return self._apply_service("mgr", spec, "{}") + + @handle_orch_error + def apply_mds(self, spec: ServiceSpec) -> OrchResult[str]: + """Enable the MDS service on the target hosts. + + Note on service_id (filesystem name): MicroCeph deploys a single + bare "mds" service and does not support per-filesystem MDS + placement. Supplying a service_id raises ValueError. + """ + logger.debug("Applying MDS service") + _require_bare_service_name("mds", spec.service_id or "") + return self._apply_service("mds", spec, "{}") + + @handle_orch_error + def apply_cephfs_mirror(self, spec: ServiceSpec) -> OrchResult[str]: + """Enable the cephfs-mirror service on the target hosts. 
+ + cephfs-mirror is a client-like service (same as rbd-mirror) + with no additional parameters. Dotted names are rejected + (see `_require_bare_service_name`). + """ + logger.debug("Applying cephfs-mirror service") + _require_bare_service_name("cephfs-mirror", spec.service_id or "") + return self._apply_service("cephfs-mirror", spec, "{}") + + @handle_orch_error + def remove_service(self, service_name: str, force: bool = False) -> OrchResult[str]: + """Remove a service cluster-wide. + + Discovers all hosts currently running the requested service via + list_services() and issues a DELETE per host using the server-side + proxyTarget middleware (see _apply_service for transport notes). + Errors from individual hosts are collected so a partial failure + does not mask successful removals on other nodes. + + :param service_name: service type or type.id (e.g. "rgw", + "nfs.mycluster") + :param force: unused, kept for interface compatibility + """ + logger.info(f"Removing service: {service_name}, force={force}") + + svc_type, svc_id = self._parse_service_name(service_name) + + if svc_type == 'nfs' and not svc_id: + raise ValueError( + "NFS removal requires service name in 'nfs.' format" + ) + # For non-NFS services, a dotted name is rejected up front: + # MicroCeph deploys a single bare instance per node and silently + # dropping the id would let an operator believe they were + # removing a specific realm/filesystem while actually wiping the + # bare service from every host. + _require_bare_service_name(svc_type, svc_id) + + # Discover hosts currently running this service. For non-nfs + # services group_id is unused; for nfs it filters to the specific + # cluster_id. + group_id = svc_id if svc_type == 'nfs' else None + hosts = sorted(self._get_existing_service_hosts(svc_type, group_id)) + + if not hosts: + # Match cephadm: removing a service that is not deployed is + # surfaced as an error so the operator notices a typo or + # stale state rather than seeing a green no-op. 
+ raise OrchestratorError( + f"Service {service_name!r} is not running on any host" + ) + + removed: List[str] = [] + failures: List[str] = [] + for host in hosts: + try: + if svc_type == 'nfs': + self.microceph.services.delete_nfs_service( + svc_id, target=host, + ) + else: + self.microceph.services.delete_service( + svc_type, target=host, + ) + removed.append(host) + except Exception as e: + logger.error(f"Failed to remove {service_name} from {host}: {e}") + failures.append(f"{host}: {e}") + + # Any failure raises so the operator gets a visible error; + # partial-success context is included in the message. + if failures: + ctx = f" (removed from {', '.join(removed)})" if removed else "" + raise OrchestratorError( + f"Failed to remove {service_name}: " + + "; ".join(failures) + + ctx + ) + + return f"{service_name}: removed from {', '.join(removed)}" + + @handle_orch_error + def remove_host(self, host: str, force: bool = False, + offline: bool = False, rm_crush_entry: bool = False) -> OrchResult[str]: + """Remove a host from the MicroCeph cluster. + + :param host: hostname to remove + :param force: unused, kept for interface compatibility + :param offline: unused, kept for interface compatibility + :param rm_crush_entry: unused, kept for interface compatibility + """ + logger.info(f"Removing host: {host}, force={force}") + self.microceph.cluster.remove(host) + return f"Removed host {host}" + + @handle_orch_error + def service_action(self, action: str, service_name: str) -> OrchResult[List[str]]: + """Perform an action (restart) on a service. + + Currently only 'restart' is supported via the MicroCeph API. The + restart is fanned out per host running the service, using the + server-side proxyTarget middleware (see _apply_service). + + :param action: one of "start", "stop", "restart", "redeploy", "reconfig" + :param service_name: service type (e.g. 
"mon", "rgw") + """ + logger.info(f"Service action: {action} on {service_name}") + + if action != "restart": + raise OrchestratorValidationError( + f"Service action '{action}' is not supported by MicroCeph. " + "Only 'restart' is currently available." + ) + + svc_type, svc_id = self._parse_service_name(service_name) + + # All currently-supported restart services are bare; reject + # dotted names so an operator does not silently restart all + # NFS clusters (etc.) when intending to target one. + _require_bare_service_name(svc_type, svc_id) + + if svc_type not in RESTART_SUPPORTED_SERVICES: + raise OrchestratorValidationError( + f"Restart of service type {svc_type!r} is not supported by " + f"MicroCeph. Supported services: " + f"{sorted(RESTART_SUPPORTED_SERVICES)}." + ) + + # group_id is threaded through for future-proofing: if NFS + # were ever added to RESTART_SUPPORTED_SERVICES, _require_bare + # above would already have rejected the dotted-name path, so + # svc_id is always empty here; keeping the call shape future- + # compatible avoids a silent fan-out across all clusters. + group_id = svc_id if svc_type in DOTTED_NAME_SUPPORTED_SERVICES else None + hosts = sorted(self._get_existing_service_hosts(svc_type, group_id)) + if not hosts: + # Restarting a service that is not deployed is an error: the + # operator either targeted the wrong name or expected the + # service to be running. Match cephadm semantics. 
+ raise OrchestratorError( + f"Service {svc_type!r} is not running on any host" + ) + + restarted: List[str] = [] + failures: List[str] = [] + for host in hosts: + try: + self.microceph.services.restart_services( + [svc_type], target=host, + ) + restarted.append(host) + except Exception as e: + logger.error(f"Failed to restart {svc_type} on {host}: {e}") + failures.append(f"{host}: {e}") + + if failures: + ctx = ( + f" (restarted on {', '.join(restarted)})" if restarted else "" + ) + raise OrchestratorError( + f"Failed to restart {svc_type}: " + + "; ".join(failures) + + ctx + ) + + return [f"Restarted {svc_type} on {h}" for h in restarted] diff --git a/microceph-orch/tests/conftest.py b/microceph-orch/tests/conftest.py new file mode 100644 index 000000000..94b566fba --- /dev/null +++ b/microceph-orch/tests/conftest.py @@ -0,0 +1,149 @@ +""" +Test fixtures and mock setup for microceph-orch tests. + +Installs mock modules into sys.modules so that Ceph-internal and +snap-only imports work outside the snap environment. +""" + +import sys +from unittest.mock import MagicMock, patch + +import pytest + +from stubs import ( + OrchResult, + handle_orch_error, + CLICommandMeta, + HostSpec, + PlacementSpec, + ServiceSpec, + RGWSpec, + MONSpec, + MDSSpec, + NFSServiceSpec, + Device, + Devices, + InventoryFilter, + InventoryHost, + ServiceDescription, + DaemonDescription, + Orchestrator, + OrchestratorError, + OrchestratorValidationError, + MgrModule, + NotifyType, +) + + +# --------------------------------------------------------------------------- +# Install mocks into sys.modules BEFORE any microceph imports +# --------------------------------------------------------------------------- + +def _install_mocks(): + """Inject mock modules so `from ceph.deployment...` etc. work. + + Also mocks snap-specific dependencies (requests_unixsocket, snaphelpers) + that are only available inside the snap environment. 
+ """ + + # ceph.deployment.inventory + inv_mod = MagicMock() + inv_mod.Device = Device + inv_mod.Devices = Devices + + # ceph.deployment.service_spec + spec_mod = MagicMock() + spec_mod.ServiceSpec = ServiceSpec + spec_mod.PlacementSpec = PlacementSpec + spec_mod.RGWSpec = RGWSpec + spec_mod.MONSpec = MONSpec + spec_mod.MDSSpec = MDSSpec + spec_mod.NFSServiceSpec = NFSServiceSpec + + # ceph.deployment + deployment_mod = MagicMock() + deployment_mod.inventory = inv_mod + deployment_mod.service_spec = spec_mod + + # ceph + ceph_mod = MagicMock() + ceph_mod.deployment = deployment_mod + + # mgr_module + mgr_mod = MagicMock() + mgr_mod.MgrModule = MgrModule + mgr_mod.NotifyType = NotifyType + + # orchestrator + orch_mod = MagicMock() + orch_mod.Orchestrator = Orchestrator + orch_mod.HostSpec = HostSpec + orch_mod.InventoryFilter = InventoryFilter + orch_mod.InventoryHost = InventoryHost + orch_mod.ServiceDescription = ServiceDescription + orch_mod.DaemonDescription = DaemonDescription + orch_mod.CLICommandMeta = CLICommandMeta + orch_mod.handle_orch_error = handle_orch_error + orch_mod.OrchResult = OrchResult + orch_mod.OrchestratorError = OrchestratorError + orch_mod.OrchestratorValidationError = OrchestratorValidationError + + # snap-only deps + requests_unixsocket_mod = MagicMock() + requests_unixsocket_mod.DEFAULT_SCHEME = "http+unix://" + snaphelpers_mod = MagicMock() + + sys.modules.update({ + "ceph": ceph_mod, + "ceph.deployment": deployment_mod, + "ceph.deployment.inventory": inv_mod, + "ceph.deployment.service_spec": spec_mod, + "mgr_module": mgr_mod, + "orchestrator": orch_mod, + "requests_unixsocket": requests_unixsocket_mod, + "snaphelpers": snaphelpers_mod, + }) + + +# Install mocks before pytest collects test modules +_install_mocks() + + +# --------------------------------------------------------------------------- +# Fixtures +# --------------------------------------------------------------------------- + +@pytest.fixture +def mock_client(): + 
"""Return a mock Client with pre-wired service stubs.""" + client = MagicMock() + + # Default: cluster with 3 members + client.cluster.get_cluster_members.return_value = [ + {"name": "node1", "address": "10.0.0.1:7443", "status": "ONLINE"}, + {"name": "node2", "address": "10.0.0.2:7443", "status": "ONLINE"}, + {"name": "node3", "address": "10.0.0.3:7443", "status": "ONLINE"}, + ] + + # Default: no services running + client.services.list_services.return_value = [] + + # Default: no disks + client.services.list_disks.return_value = [] + + # Default: status available + client.status.is_available.return_value = None + + return client + + +@pytest.fixture +def orchestrator(mock_client): + """Return a MicroCephOrchestrator with a mocked client.""" + from microceph.module import MicroCephOrchestrator + + with patch.object(MicroCephOrchestrator, "__init__", lambda self, *a, **kw: None): + orch = MicroCephOrchestrator.__new__(MicroCephOrchestrator) + orch.microceph = mock_client + orch.run = True + return orch diff --git a/microceph-orch/tests/stubs.py b/microceph-orch/tests/stubs.py new file mode 100644 index 000000000..b2c61b023 --- /dev/null +++ b/microceph-orch/tests/stubs.py @@ -0,0 +1,149 @@ +""" +Minimal stubs for Ceph types used by the orchestrator module. + +The real types live in ceph.deployment.*, mgr_module, and orchestrator; +which are only importable inside the Ceph snap environment. These stubs +replicate just enough behaviour to test our code outside the snap. + +NOTE: the stubs are intentionally plain classes. They do NOT reproduce +the real Ceph enum / NamedTuple machinery (e.g. PlacementSpec with +HostPlacementSpec entries), so bugs that depend on that behaviour are +only caught by the integration suite (tests/scripts/test-orch.sh). 
+""" + +from typing import Optional, List, Dict, Any, Generic, TypeVar +from functools import wraps + +T = TypeVar("T") + + +class OrchResult(Generic[T]): + """Minimal OrchResult stub.""" + + def __init__(self, result: Optional[T] = None, exception: Optional[Exception] = None): + self.result = result + self.exception = exception + self.exception_str = str(exception) if exception else "" + + +def handle_orch_error(f): + """Stub decorator that wraps return value in OrchResult and catches exceptions.""" + @wraps(f) + def wrapper(*args, **kwargs): + try: + return OrchResult(f(*args, **kwargs)) + except Exception as e: + return OrchResult(None, exception=e) + return wrapper + + +class CLICommandMeta(type): + """No-op metaclass stub.""" + pass + + +class HostSpec: + def __init__(self, hostname, addr=None, labels=None, status=None, **kwargs): + self.hostname = hostname + self.addr = addr or hostname + self.labels = labels or [] + self.status = status or "" + + +class PlacementSpec: + def __init__(self, hosts=None, count=None, **kwargs): + self.hosts = hosts or [] + self.count = count + + +class ServiceSpec: + def __init__(self, service_type="", service_id=None, placement=None, **kwargs): + self.service_type = service_type + self.service_id = service_id + self.placement = placement + + +class RGWSpec(ServiceSpec): + def __init__(self, rgw_frontend_port=None, rgw_frontend_ssl_certificate=None, + ssl=False, **kwargs): + super().__init__(**kwargs) + self.rgw_frontend_port = rgw_frontend_port + self.rgw_frontend_ssl_certificate = rgw_frontend_ssl_certificate + self.ssl = ssl + + +class MONSpec(ServiceSpec): + pass + + +class MDSSpec(ServiceSpec): + pass + + +class NFSServiceSpec(ServiceSpec): + def __init__(self, port=None, virtual_ip=None, **kwargs): + super().__init__(**kwargs) + self.port = port + self.virtual_ip = virtual_ip + + +class Device: + def __init__(self, path="", available=None, **kwargs): + self.path = path + self.available = available + + +class Devices: + def 
__init__(self, devices=None): + self.devices = devices or [] + + +class InventoryFilter: + def __init__(self, labels=None, hosts=None): + self.labels = labels + self.hosts = hosts + + +class InventoryHost: + def __init__(self, name="", devices=None): + self.name = name + self.devices = devices + + +class ServiceDescription: + def __init__(self, spec=None, running=0, size=0, **kwargs): + self.spec = spec + self.running = running + self.size = size + + +class DaemonDescription: + def __init__(self, service_name="", daemon_type="", daemon_id="", + hostname="", ip=None, ports=None, **kwargs): + self.service_name = service_name + self.daemon_type = daemon_type + self.daemon_id = daemon_id + self.hostname = hostname + self.ip = ip + self.ports = ports + + +class OrchestratorError(Exception): + pass + + +class OrchestratorValidationError(OrchestratorError): + pass + + +class Orchestrator: + pass + + +class MgrModule: + def __init__(self, *args, **kwargs): + pass + + +class NotifyType: + pass diff --git a/microceph-orch/tests/test_client.py b/microceph-orch/tests/test_client.py new file mode 100644 index 000000000..27b8514ab --- /dev/null +++ b/microceph-orch/tests/test_client.py @@ -0,0 +1,178 @@ +""" +Tests for the MicroCeph client layer (cluster.py). 
+""" + +import json +import pytest +from unittest.mock import MagicMock, patch + +from microceph.client.cluster import ( + MicroClusterService, + StatusService, + ExtendedAPIService, +) +from microceph.client.service import RemoteException + + +@pytest.fixture +def mock_session(): + return MagicMock() + + +@pytest.fixture +def endpoint(): + return "http+unix://%2Ftmp%2Ftest.socket" + + +# =================================================================== +# MicroClusterService +# =================================================================== + +class TestMicroClusterService: + def test_get_cluster_members(self, mock_session, endpoint): + svc = MicroClusterService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: { + "metadata": [ + {"name": "n1", "address": "10.0.0.1:7443", "status": "ONLINE", "extra": "ignored"}, + {"name": "n2", "address": "10.0.0.2:7443", "status": "ONLINE"}, + ] + }, + ) + members = svc.get_cluster_members() + assert len(members) == 2 + assert members[0] == {"name": "n1", "address": "10.0.0.1:7443", "status": "ONLINE"} + # "extra" key should be filtered out + assert "extra" not in members[0] + + def test_get_cluster_members_null_metadata(self, mock_session, endpoint): + svc = MicroClusterService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + members = svc.get_cluster_members() + assert members == [] + + def test_get_cluster_members_missing_metadata(self, mock_session, endpoint): + svc = MicroClusterService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {}, + ) + members = svc.get_cluster_members() + assert members == [] + + +# =================================================================== +# ExtendedAPIService +# =================================================================== + +class 
TestExtendedAPIService: + def test_list_services_returns_list(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": [{"service": "mon", "location": "n1"}]}, + ) + result = svc.list_services() + assert isinstance(result, list) + assert len(result) == 1 + + def test_list_services_null_metadata(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + result = svc.list_services() + assert result == [] + + def test_list_disks_null_metadata(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + result = svc.list_disks() + assert result == [] + + def test_enable_service_payload(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + svc.enable_service(name="rgw", payload='{"Port": 8080}', wait=True) + + call_args = mock_session.request.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body["name"] == "rgw" + assert body["bool"] is True + assert body["payload"] == '{"Port": 8080}' + + def test_enable_service_wait_false(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + svc.enable_service(name="mon", wait=False) + + call_args = mock_session.request.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body["bool"] is False + + def test_delete_service(self, mock_session, endpoint): + svc = 
ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {}, + ) + svc.delete_service("rgw") + call_args = mock_session.request.call_args + assert call_args.kwargs.get("method") == "delete" or call_args[0][0] == "delete" + + def test_delete_nfs_service(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {}, + ) + svc.delete_nfs_service("mycluster") + call_args = mock_session.request.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body == {"cluster_id": "mycluster"} + + def test_restart_services(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {}, + ) + svc.restart_services(["mon", "rgw"]) + call_args = mock_session.request.call_args + body = call_args.kwargs.get("json") or call_args[1].get("json") + assert body == [{"service": "mon"}, {"service": "rgw"}] + + def test_get_status_null_metadata(self, mock_session, endpoint): + svc = ExtendedAPIService(mock_session, endpoint) + mock_session.request.return_value = MagicMock( + status_code=200, + text='{}', + json=lambda: {"metadata": None}, + ) + result = svc.get_status() + assert result == {} diff --git a/microceph-orch/tests/test_module.py b/microceph-orch/tests/test_module.py new file mode 100644 index 000000000..84707611e --- /dev/null +++ b/microceph-orch/tests/test_module.py @@ -0,0 +1,964 @@ +""" +Tests for MicroCephOrchestrator methods. 
+""" + +import json +import pytest +from unittest.mock import MagicMock + +from stubs import ( + HostSpec, + PlacementSpec, + ServiceSpec, + RGWSpec, + NFSServiceSpec, + InventoryFilter, + OrchestratorError, + OrchestratorValidationError, +) + +from microceph.client.service import RemoteException + + +def _svc(service, location, group_id="", info=""): + """Build a MicroCeph service record as returned by list_services().""" + return {"service": service, "location": location, + "group_id": group_id, "info": info} + + +# =================================================================== +# available() +# =================================================================== + +class TestAvailable: + def test_available_success(self, orchestrator, mock_client): + ok, msg, info = orchestrator.available() + assert ok is True + assert msg == "" + mock_client.status.is_available.assert_called_once() + + def test_available_remote_error(self, orchestrator, mock_client): + mock_client.status.is_available.side_effect = RemoteException("socket gone") + ok, msg, _ = orchestrator.available() + assert ok is False + assert "Cannot reach" in msg + + def test_available_unexpected_error(self, orchestrator, mock_client): + mock_client.status.is_available.side_effect = OSError("permission denied") + ok, msg, _ = orchestrator.available() + assert ok is False + assert "Unexpected error" in msg + + +# =================================================================== +# get_hosts() +# =================================================================== + +class TestGetHosts: + def test_get_hosts_basic(self, orchestrator, mock_client): + result = orchestrator.get_hosts() + hosts = result.result + assert len(hosts) == 3 + assert hosts[0].hostname == "node1" + assert hosts[0].addr == "10.0.0.1" + assert hosts[0].status == "ONLINE" + + def test_get_hosts_strips_port(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.return_value = [ + {"name": "h1", "address": 
"192.168.1.100:9443", "status": "ONLINE"}, + ] + result = orchestrator.get_hosts() + assert result.result[0].addr == "192.168.1.100" + + def test_get_hosts_no_port_in_address(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.return_value = [ + {"name": "h1", "address": "192.168.1.100", "status": "ONLINE"}, + ] + result = orchestrator.get_hosts() + # Should fall back to raw address when no ":" present + assert result.result[0].addr == "192.168.1.100" + + def test_get_hosts_ipv6_address(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.return_value = [ + # bracketed IPv6 with port + {"name": "h1", "address": "[fe80::1]:7443", "status": "ONLINE"}, + # bare IPv6 literal, no port + {"name": "h2", "address": "fe80::2", "status": "ONLINE"}, + ] + result = orchestrator.get_hosts() + assert result.result[0].addr == "fe80::1" + assert result.result[1].addr == "fe80::2" + + def test_get_hosts_missing_address(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.return_value = [ + {"name": "h1", "status": "ONLINE"}, + ] + result = orchestrator.get_hosts() + # Missing address should not crash + assert result.result[0].hostname == "h1" + + def test_get_hosts_empty_cluster(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.return_value = [] + result = orchestrator.get_hosts() + assert result.result == [] + + def test_get_hosts_api_error(self, orchestrator, mock_client): + mock_client.cluster.get_cluster_members.side_effect = RemoteException("fail") + result = orchestrator.get_hosts() + assert result.exception is not None + + +# =================================================================== +# describe_service() +# =================================================================== + +class TestDescribeService: + def test_describe_service_all(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", 
"group_id": "", "info": ""}, + {"service": "mon", "location": "node2", "group_id": "", "info": ""}, + {"service": "rgw", "location": "node1", "group_id": "", "info": ""}, + ] + result = orchestrator.describe_service() + descs = result.result + assert len(descs) == 2 # mon and rgw grouped + types = {d.spec.service_type for d in descs} + assert types == {"mon", "rgw"} + + def test_describe_service_filter_type(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "rgw", "location": "node1", "group_id": "", "info": ""}, + ] + result = orchestrator.describe_service(service_type="rgw") + descs = result.result + assert len(descs) == 1 + assert descs[0].spec.service_type == "rgw" + + def test_describe_service_with_group_id(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "mycluster", "info": "{}"}, + ] + result = orchestrator.describe_service() + descs = result.result + assert len(descs) == 1 + assert descs[0].spec.service_id == "mycluster" + + def test_describe_service_empty(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [] + result = orchestrator.describe_service() + assert result.result == [] + + def test_describe_service_running_count(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "mon", "location": "node2", "group_id": "", "info": ""}, + {"service": "mon", "location": "node3", "group_id": "", "info": ""}, + ] + result = orchestrator.describe_service() + assert result.result[0].running == 3 + + +# =================================================================== +# list_daemons() +# =================================================================== + +class TestListDaemons: + def 
test_list_daemons_basic(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "rgw", "location": "node2", "group_id": "", "info": ""}, + ] + result = orchestrator.list_daemons() + daemons = result.result + assert len(daemons) == 2 + + def test_list_daemons_filter_daemon_type(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "rgw", "location": "node2", "group_id": "", "info": ""}, + ] + result = orchestrator.list_daemons(daemon_type="mon") + assert len(result.result) == 1 + assert result.result[0].daemon_type == "mon" + + def test_list_daemons_filter_host(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "mon", "location": "node2", "group_id": "", "info": ""}, + ] + result = orchestrator.list_daemons(host="node1") + assert len(result.result) == 1 + assert result.result[0].hostname == "node1" + + def test_list_daemons_filter_service_name(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "cluster1", "info": "{}"}, + {"service": "nfs", "location": "node1", "group_id": "cluster2", "info": "{}"}, + ] + result = orchestrator.list_daemons(service_name="nfs.cluster1") + assert len(result.result) == 1 + + def test_list_daemons_filter_daemon_id(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "node1", "group_id": "", "info": ""}, + {"service": "mon", "location": "node2", "group_id": "", "info": ""}, + ] + result = orchestrator.list_daemons(daemon_id="node2") + assert len(result.result) == 1 + assert result.result[0].hostname == "node2" + + def 
test_list_daemons_nfs_with_info(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + { + "service": "nfs", + "location": "node1", + "group_id": "mycluster", + "info": json.dumps({"bind_address": "10.0.0.5", "bind_port": 2049}), + }, + ] + result = orchestrator.list_daemons() + daemon = result.result[0] + assert daemon.ip == "10.0.0.5" + assert daemon.ports == [2049] + + def test_list_daemons_nfs_wildcard_address(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + { + "service": "nfs", + "location": "node1", + "group_id": "mycluster", + "info": json.dumps({"bind_address": "0.0.0.0", "bind_port": 2049}), + }, + ] + result = orchestrator.list_daemons() + assert result.result[0].ip is None # 0.0.0.0 should be None + + def test_list_daemons_nfs_malformed_info(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "c1", "info": "not-json"}, + ] + # Should not crash, just skip the NFS info parsing + result = orchestrator.list_daemons() + assert len(result.result) == 1 + assert result.result[0].ip is None + assert result.result[0].ports is None + + def test_list_daemons_nfs_empty_info(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "c1", "info": ""}, + ] + result = orchestrator.list_daemons() + assert len(result.result) == 1 + + def test_list_daemons_nfs_null_info(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "c1", "info": None}, + ] + result = orchestrator.list_daemons() + assert len(result.result) == 1 + + def test_list_daemons_metadata_missing_optional_keys(self, orchestrator, mock_client): + # ExtendedAPIService.list_services() may omit group_id (and other + # optional keys); list_daemons must not raise KeyError. 
+ mock_client.services.list_services.return_value = [ + {"service": "mon", "location": "n1"}, + ] + result = orchestrator.list_daemons() + assert result.exception is None + assert len(result.result) == 1 + daemon = result.result[0] + assert daemon.daemon_type == "mon" + assert daemon.hostname == "n1" + assert daemon.service_name == "mon" + + +# =================================================================== +# get_inventory() +# =================================================================== + +class TestGetInventory: + def test_get_inventory_shows_osd_disks(self, orchestrator, mock_client): + mock_client.services.list_disks.return_value = [ + {"location": "node1", "path": "/dev/sdb"}, + {"location": "node1", "path": "/dev/sdc"}, + ] + result = orchestrator.get_inventory() + inv = result.result + # 3 hosts (from cluster members), node1 has 2 OSD disks + hosts = {h.name for h in inv} + assert hosts == {"node1", "node2", "node3"} + node1 = [h for h in inv if h.name == "node1"][0] + assert len(node1.devices.devices) == 2 + assert all(d.available is False for d in node1.devices.devices) + + def test_get_inventory_multi_host(self, orchestrator, mock_client): + mock_client.services.list_disks.return_value = [ + {"location": "node1", "path": "/dev/sda"}, + {"location": "node2", "path": "/dev/sda"}, + ] + result = orchestrator.get_inventory() + hosts = {h.name for h in result.result} + assert hosts == {"node1", "node2", "node3"} + + def test_get_inventory_with_host_filter(self, orchestrator, mock_client): + mock_client.services.list_disks.return_value = [ + {"location": "node1", "path": "/dev/sda"}, + {"location": "node2", "path": "/dev/sda"}, + {"location": "node3", "path": "/dev/sda"}, + ] + filt = InventoryFilter(hosts=["node1", "node3"]) + result = orchestrator.get_inventory(host_filter=filt) + hosts = {h.name for h in result.result} + assert hosts == {"node1", "node3"} + + def test_get_inventory_label_filter_rejected(self, orchestrator, mock_client): + filt = 
InventoryFilter(labels=["role=osd"]) + result = orchestrator.get_inventory(host_filter=filt) + assert isinstance(result.exception, OrchestratorValidationError) + mock_client.services.list_disks.assert_not_called() + + def test_get_inventory_empty(self, orchestrator, mock_client): + mock_client.services.list_disks.return_value = [] + mock_client.cluster.get_cluster_members.return_value = [] + result = orchestrator.get_inventory() + assert result.result == [] + + def test_get_inventory_includes_members_without_disks(self, orchestrator, mock_client): + mock_client.services.list_disks.return_value = [] + # Default mock has 3 cluster members + result = orchestrator.get_inventory() + hosts = {h.name for h in result.result} + assert hosts == {"node1", "node2", "node3"} + # No devices on any host + for h in result.result: + assert len(h.devices.devices) == 0 + + +# =================================================================== +# apply_rbd_mirror() +# =================================================================== + +class TestApplyRbdMirror: + def test_apply_rbd_mirror_success(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1", "node2"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert result.exception is None + # Per-host fan-out via the proxyTarget middleware: one call per + # requested host with target=. 
+ assert "enabled on node1, node2" in result.result + assert mock_client.services.enable_service.call_count == 2 + targets = sorted( + c.kwargs["target"] + for c in mock_client.services.enable_service.call_args_list + ) + assert targets == ["node1", "node2"] + + def test_apply_rbd_mirror_no_placement(self, orchestrator, mock_client): + spec = ServiceSpec(service_type="rbd-mirror") + result = orchestrator.apply_rbd_mirror(spec) + assert result.exception is not None + assert "No placement hosts" in str(result.exception) + + def test_apply_rbd_mirror_service_id_rejected(self, orchestrator, mock_client): + # Bare-name guard: rbd-mirror does not support dotted names. + spec = ServiceSpec( + service_type="rbd-mirror", + service_id="zone1", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert isinstance(result.exception, ValueError) + mock_client.services.enable_service.assert_not_called() + + def test_apply_rbd_mirror_skips_existing(self, orchestrator, mock_client): + # node1 already runs the service; node2 is also requested. + mock_client.services.list_services.return_value = [ + _svc("rbd-mirror", "node1"), + ] + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1", "node2"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert result.exception is None + assert "already active on node1" in result.result + assert "enabled on node2" in result.result + # Only the not-yet-active host is targeted. + mock_client.services.enable_service.assert_called_once() + assert ( + mock_client.services.enable_service.call_args.kwargs["target"] + == "node2" + ) + + def test_apply_list_services_failure_propagates(self, orchestrator, mock_client): + # If list_services itself fails we must NOT silently treat the + # set of existing hosts as empty and re-apply everywhere; the + # error has to surface. 
+ mock_client.services.list_services.side_effect = RemoteException( + "list failed" + ) + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert isinstance(result.exception, RemoteException) + mock_client.services.enable_service.assert_not_called() + + def test_apply_rbd_mirror_partial_failure(self, orchestrator, mock_client): + # node1 enables, node2 fails — any failure raises with + # partial-success context in the message. + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1", "node2"]), + ) + + def side_effect(*args, **kwargs): + if kwargs.get("target") == "node2": + raise RemoteException("node2 boom") + + mock_client.services.enable_service.side_effect = side_effect + result = orchestrator.apply_rbd_mirror(spec) + assert isinstance(result.exception, OrchestratorError) + msg = str(result.exception) + assert "Failed to enable rbd-mirror" in msg + assert "node2: node2 boom" in msg + assert "enabled on node1" in msg + + def test_apply_rbd_mirror_all_existing(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("rbd-mirror", "node1"), + ] + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert result.exception is None + assert "already active on node1" in result.result + mock_client.services.enable_service.assert_not_called() + + def test_apply_rbd_mirror_api_error(self, orchestrator, mock_client): + mock_client.services.enable_service.side_effect = RemoteException("fail") + spec = ServiceSpec( + service_type="rbd-mirror", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rbd_mirror(spec) + assert result.exception is not None + + +# =================================================================== +# apply_rgw() +# 
=================================================================== + +class TestApplyRgw: + def test_apply_rgw_basic(self, orchestrator, mock_client): + spec = RGWSpec( + service_type="rgw", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rgw(spec) + assert result.exception is None + assert "enabled on node1" in result.result + + call_kwargs = mock_client.services.enable_service.call_args + assert call_kwargs.kwargs["name"] == "rgw" + assert call_kwargs.kwargs["target"] == "node1" + + def test_apply_rgw_with_port(self, orchestrator, mock_client): + spec = RGWSpec( + service_type="rgw", + rgw_frontend_port=8080, + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rgw(spec) + assert result.exception is None + + call_kwargs = mock_client.services.enable_service.call_args + payload = json.loads(call_kwargs.kwargs["payload"]) + assert payload["Port"] == 8080 + assert call_kwargs.kwargs["target"] == "node1" + + def test_apply_rgw_ssl_cert_without_key_warns(self, orchestrator, mock_client, caplog): + """SSL cert is present but no key; should warn and not send cert.""" + spec = RGWSpec( + service_type="rgw", + rgw_frontend_ssl_certificate=["-----BEGIN CERT-----"], + placement=PlacementSpec(hosts=["node1"]), + ) + import logging + with caplog.at_level(logging.WARNING): + result = orchestrator.apply_rgw(spec) + + assert result.exception is None + # Cert should NOT be in payload (useless without key) + call_kwargs = mock_client.services.enable_service.call_args + payload = json.loads(call_kwargs.kwargs["payload"]) + assert "SSLCertificate" not in payload + # Warning should be logged + assert any("private key" in r.message for r in caplog.records) + + def test_apply_rgw_no_placement(self, orchestrator, mock_client): + spec = RGWSpec(service_type="rgw") + result = orchestrator.apply_rgw(spec) + assert result.exception is not None + + def test_apply_rgw_service_id_rejected(self, orchestrator, mock_client): + # MicroCeph 
deploys a single bare 'rgw' service; per-realm + # service_id is not supported and is rejected up front so the + # operator does not believe a realm was provisioned. + spec = RGWSpec( + service_type="rgw", + service_id="realm1", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_rgw(spec) + assert isinstance(result.exception, ValueError) + assert "does not support a service_id" in str(result.exception) + mock_client.services.enable_service.assert_not_called() + + +# =================================================================== +# apply_nfs() +# =================================================================== + +class TestApplyNfs: + def test_apply_nfs_basic(self, orchestrator, mock_client): + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_nfs(spec) + assert result.exception is None + assert "enabled on node1" in result.result + + call_kwargs = mock_client.services.enable_service.call_args + payload = json.loads(call_kwargs.kwargs["payload"]) + assert payload["cluster_id"] == "mycluster" + assert call_kwargs.kwargs["target"] == "node1" + + def test_apply_nfs_with_port(self, orchestrator, mock_client): + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + port=12049, + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_nfs(spec) + call_kwargs = mock_client.services.enable_service.call_args + payload = json.loads(call_kwargs.kwargs["payload"]) + assert payload["bind_port"] == 12049 + + def test_apply_nfs_with_virtual_ip(self, orchestrator, mock_client): + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + virtual_ip="10.0.0.100", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_nfs(spec) + call_kwargs = mock_client.services.enable_service.call_args + payload = json.loads(call_kwargs.kwargs["payload"]) + assert payload["bind_address"] == 
"10.0.0.100" + + def test_apply_nfs_missing_service_id(self, orchestrator, mock_client): + spec = NFSServiceSpec( + service_type="nfs", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_nfs(spec) + assert result.exception is not None + assert "cluster_id" in str(result.exception) + + def test_apply_nfs_no_placement(self, orchestrator, mock_client): + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + ) + result = orchestrator.apply_nfs(spec) + assert result.exception is not None + assert "No placement hosts" in str(result.exception) + + def test_apply_nfs_skips_existing(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("nfs", "node1", group_id="mycluster", info="{}"), + ] + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + placement=PlacementSpec(hosts=["node1", "node2"]), + ) + result = orchestrator.apply_nfs(spec) + assert "already active on node1" in result.result + assert "enabled on node2" in result.result + mock_client.services.enable_service.assert_called_once() + # Only the host that did not yet run the cluster is targeted. 
+ assert ( + mock_client.services.enable_service.call_args.kwargs["target"] + == "node2" + ) + + def test_apply_nfs_distinct_cluster_not_conflated(self, orchestrator, mock_client): + """A host running nfs.other must not suppress enabling nfs.mycluster.""" + mock_client.services.list_services.return_value = [ + _svc("nfs", "node1", group_id="other", info="{}"), + ] + spec = NFSServiceSpec( + service_type="nfs", + service_id="mycluster", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_nfs(spec) + assert result.exception is None + # nfs.other on node1 must not be mistaken for nfs.mycluster + assert "already active" not in result.result + assert "enabled" in result.result + mock_client.services.enable_service.assert_called_once() + + +# =================================================================== +# apply_mon() / apply_mgr() / apply_mds() +# =================================================================== + +# =================================================================== +# apply_cephfs_mirror() +# =================================================================== + +class TestApplyCephfsMirror: + def test_apply_cephfs_mirror_success(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="cephfs-mirror", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_cephfs_mirror(spec) + assert result.exception is None + assert "enabled on node1" in result.result + mock_client.services.enable_service.assert_called_once_with( + name="cephfs-mirror", payload="{}", wait=True, target="node1", + ) + + def test_apply_cephfs_mirror_service_id_rejected(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="cephfs-mirror", + service_id="x", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_cephfs_mirror(spec) + assert isinstance(result.exception, ValueError) + mock_client.services.enable_service.assert_not_called() + + +# 
=================================================================== +# _parse_service_name() +# =================================================================== + +class TestParseServiceName: + def test_simple_name(self, orchestrator): + assert orchestrator._parse_service_name("rgw") == ("rgw", "") + + def test_dotted_name(self, orchestrator): + assert orchestrator._parse_service_name("nfs.mycluster") == ("nfs", "mycluster") + + def test_multi_dot_name(self, orchestrator): + """Ensure dotted names like nfs.my.cluster split on first dot only.""" + assert orchestrator._parse_service_name("nfs.my.cluster") == ("nfs", "my.cluster") + + def test_empty_string(self, orchestrator): + assert orchestrator._parse_service_name("") == ("", "") + + +# =================================================================== +# describe_service() - service_name filter +# =================================================================== + +class TestDescribeServiceNameFilter: + def test_filter_by_service_name(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + {"service": "nfs", "location": "node1", "group_id": "c1", "info": "{}"}, + {"service": "nfs", "location": "node1", "group_id": "c2", "info": "{}"}, + ] + result = orchestrator.describe_service(service_name="nfs.c1") + assert len(result.result) == 1 + assert result.result[0].spec.service_id == "c1" + + +# =================================================================== +# remove_service() - dotted non-NFS +# =================================================================== + +class TestRemoveServiceDotted: + def test_remove_dotted_non_nfs_rejected(self, orchestrator, mock_client): + # Removing 'mds.myfs' (or any dotted non-NFS name) must be + # rejected: silently dropping the id would otherwise wipe the + # bare MDS service across every host while the operator + # believed only a specific filesystem was being removed. 
+ result = orchestrator.remove_service("mds.myfs") + assert isinstance(result.exception, ValueError) + assert "does not support a service_id" in str(result.exception) + mock_client.services.delete_service.assert_not_called() + + +# =================================================================== +# apply_mon() / apply_mgr() / apply_mds() / apply_cephfs_mirror() +# =================================================================== + +class TestApplyGenericServices: + def test_apply_mon(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="mon", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mon(spec) + assert result.exception is None + assert "enabled on node1" in result.result + mock_client.services.enable_service.assert_called_once_with( + name="mon", payload="{}", wait=True, target="node1", + ) + + def test_apply_mgr(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="mgr", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mgr(spec) + assert result.exception is None + mock_client.services.enable_service.assert_called_once_with( + name="mgr", payload="{}", wait=True, target="node1", + ) + + def test_apply_mon_service_id_rejected(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="mon", + service_id="x", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mon(spec) + assert isinstance(result.exception, ValueError) + mock_client.services.enable_service.assert_not_called() + + def test_apply_mgr_service_id_rejected(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="mgr", + service_id="x", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mgr(spec) + assert isinstance(result.exception, ValueError) + mock_client.services.enable_service.assert_not_called() + + def test_apply_mds(self, orchestrator, mock_client): + spec = ServiceSpec( + service_type="mds", + 
placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mds(spec) + assert result.exception is None + mock_client.services.enable_service.assert_called_once_with( + name="mds", payload="{}", wait=True, target="node1", + ) + + def test_apply_mds_service_id_rejected(self, orchestrator, mock_client): + # Per-filesystem MDS placement is not supported by MicroCeph; + # a service_id (filesystem name) is rejected up front. + spec = ServiceSpec( + service_type="mds", + service_id="fs1", + placement=PlacementSpec(hosts=["node1"]), + ) + result = orchestrator.apply_mds(spec) + assert isinstance(result.exception, ValueError) + assert "does not support a service_id" in str(result.exception) + mock_client.services.enable_service.assert_not_called() + + +# =================================================================== +# remove_service() +# =================================================================== + +class TestRemoveService: + def test_remove_service_basic(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("rgw", "node1"), + _svc("rgw", "node2"), + ] + result = orchestrator.remove_service("rgw") + assert result.exception is None + assert "removed from node1, node2" in result.result + assert mock_client.services.delete_service.call_count == 2 + + def test_remove_service_no_hosts(self, orchestrator, mock_client): + # list_services returns [] (default) — removing a service that + # isn't deployed must surface as an error (cephadm parity), not + # a green no-op. 
+ result = orchestrator.remove_service("rgw") + assert isinstance(result.exception, OrchestratorError) + assert "not running on any host" in str(result.exception) + mock_client.services.delete_service.assert_not_called() + + def test_remove_service_nfs_with_cluster_id(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("nfs", "node1", group_id="mycluster"), + ] + result = orchestrator.remove_service("nfs.mycluster") + assert result.exception is None + assert "removed from node1" in result.result + mock_client.services.delete_nfs_service.assert_called_once() + c = mock_client.services.delete_nfs_service.call_args + assert c.args == ("mycluster",) + assert c.kwargs["target"] == "node1" + + def test_remove_service_nfs_without_cluster_id(self, orchestrator, mock_client): + result = orchestrator.remove_service("nfs") + assert result.exception is not None + assert "cluster_id" in str(result.exception) + + def test_remove_service_nfs_group_id_no_match(self, orchestrator, mock_client): + # NFS cluster 'other' exists on node1, but we're removing + # 'mycluster' — there is nothing to do for this cluster_id; + # surface as an error and do NOT cascade into deleting the + # unrelated cluster. 
+ mock_client.services.list_services.return_value = [ + _svc("nfs", "node1", group_id="other"), + ] + result = orchestrator.remove_service("nfs.mycluster") + assert isinstance(result.exception, OrchestratorError) + assert "not running on any host" in str(result.exception) + mock_client.services.delete_nfs_service.assert_not_called() + + def test_remove_service_api_error(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("rgw", "node1"), + ] + mock_client.services.delete_service.side_effect = RemoteException("fail") + result = orchestrator.remove_service("rgw") + assert result.exception is not None + assert "Failed to remove" in str(result.exception) + + def test_remove_service_partial_failure(self, orchestrator, mock_client): + # node1 succeeds, node2 fails — any failure raises so the + # operator sees a visible error, and the partial-success + # context is included in the message. + mock_client.services.list_services.return_value = [ + _svc("rgw", "node1"), + _svc("rgw", "node2"), + ] + + def side_effect(*args, **kwargs): + if kwargs.get("target") == "node2": + raise RemoteException("node2 boom") + + mock_client.services.delete_service.side_effect = side_effect + result = orchestrator.remove_service("rgw") + assert isinstance(result.exception, OrchestratorError) + msg = str(result.exception) + assert "Failed to remove rgw" in msg + assert "node2: node2 boom" in msg + assert "removed from node1" in msg + + +# =================================================================== +# remove_host() +# =================================================================== + +class TestRemoveHost: + def test_remove_host_success(self, orchestrator, mock_client): + result = orchestrator.remove_host("node2") + assert result.exception is None + assert "Removed host node2" in result.result + mock_client.cluster.remove.assert_called_once_with("node2") + + def test_remove_host_api_error(self, orchestrator, mock_client): + 
mock_client.cluster.remove.side_effect = RemoteException("node not found") + result = orchestrator.remove_host("badhost") + assert result.exception is not None + + +# =================================================================== +# service_action() +# =================================================================== + +class TestServiceAction: + def test_restart_service(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("mon", "node1"), + _svc("mon", "node2"), + ] + result = orchestrator.service_action("restart", "mon") + assert result.exception is None + # One restart call per host carrying target=. + assert mock_client.services.restart_services.call_count == 2 + targets = sorted( + c.kwargs["target"] + for c in mock_client.services.restart_services.call_args_list + ) + assert targets == ["node1", "node2"] + assert any("Restarted mon on node1" in r for r in result.result) + assert any("Restarted mon on node2" in r for r in result.result) + + def test_restart_no_hosts(self, orchestrator, mock_client): + # Restarting a service that isn't deployed surfaces as an + # error so the operator notices the typo or stale assumption + # (cephadm parity). + result = orchestrator.service_action("restart", "mon") + assert isinstance(result.exception, OrchestratorError) + assert "not running on any host" in str(result.exception) + mock_client.services.restart_services.assert_not_called() + + def test_restart_dotted_non_nfs_rejected(self, orchestrator, mock_client): + # Dotted non-NFS names are rejected: restarting 'rgw.realm1' + # would have to silently drop the id and restart every rgw, + # which is a surprising operation. 
+ result = orchestrator.service_action("restart", "rgw.realm1") + assert isinstance(result.exception, ValueError) + assert "does not support a service_id" in str(result.exception) + mock_client.services.restart_services.assert_not_called() + + def test_restart_unsupported_service_type(self, orchestrator, mock_client): + # MicroCeph's backend serviceWorkerTable only handles osd/mon/rgw. + # The orchestrator must reject restart for other service types up + # front rather than surfacing an opaque RemoteException. + for svc in ["nfs.mycluster", "mds", "mgr", "rbd-mirror"]: + mock_client.services.restart_services.reset_mock() + result = orchestrator.service_action("restart", svc) + assert isinstance(result.exception, OrchestratorValidationError), svc + mock_client.services.restart_services.assert_not_called() + + def test_restart_partial_failure(self, orchestrator, mock_client): + # node1 restarts, node2 fails — any failure raises with + # partial-success context in the message. + mock_client.services.list_services.return_value = [ + _svc("mon", "node1"), + _svc("mon", "node2"), + ] + + def side_effect(*args, **kwargs): + if kwargs.get("target") == "node2": + raise RemoteException("node2 boom") + + mock_client.services.restart_services.side_effect = side_effect + result = orchestrator.service_action("restart", "mon") + assert isinstance(result.exception, OrchestratorError) + msg = str(result.exception) + assert "Failed to restart mon" in msg + assert "node2: node2 boom" in msg + assert "restarted on node1" in msg + + def test_unsupported_action(self, orchestrator, mock_client): + result = orchestrator.service_action("stop", "mon") + assert result.exception is not None + assert "not supported" in str(result.exception) + + def test_restart_api_error(self, orchestrator, mock_client): + mock_client.services.list_services.return_value = [ + _svc("mon", "node1"), + ] + mock_client.services.restart_services.side_effect = RemoteException("fail") + result = 
orchestrator.service_action("restart", "mon") + assert result.exception is not None + assert "Failed to restart" in str(result.exception) diff --git a/tests/scripts/test-orch.sh b/tests/scripts/test-orch.sh new file mode 100755 index 000000000..a533fa7f0 --- /dev/null +++ b/tests/scripts/test-orch.sh @@ -0,0 +1,264 @@ +#!/bin/bash +# Integration tests for the MicroCeph orchestrator module. +# +# Can be run standalone against a pre-configured cluster: +# ./test-orch.sh [ceph-command-prefix] +# +# Or sourced from actionutils.sh for CI. +# +# Expects a bootstrapped MicroCeph cluster with the orchestrator enabled. +# The first argument is the prefix for ceph commands (default: "microceph.ceph"). + +set -uo pipefail + +CEPH="${1:-microceph.ceph}" +PASS=0 +FAIL=0 +ERRORS=() + +run_ceph() { sudo "$CEPH" "$@" 2>&1; } + +# --- Test helpers --- + +assert_contains() { + local desc="$1" needle="$2" haystack="$3" + if echo "$haystack" | grep -qE "$needle"; then + echo " PASS $desc" + PASS=$((PASS + 1)) + else + echo " FAIL $desc (expected to contain: '$needle')" + echo " ----- actual output -----" + echo "$haystack" | sed 's/^/ | /' + echo " -------------------------" + FAIL=$((FAIL + 1)) + ERRORS+=("$desc") + fi +} + +assert_not_contains() { + local desc="$1" needle="$2" haystack="$3" + if echo "$haystack" | grep -qE "$needle"; then + echo " FAIL $desc (should NOT contain: '$needle')" + echo " ----- actual output -----" + echo "$haystack" | sed 's/^/ | /' + echo " -------------------------" + FAIL=$((FAIL + 1)) + ERRORS+=("$desc") + else + echo " PASS $desc" + PASS=$((PASS + 1)) + fi +} + +assert_exit_ok() { + local desc="$1" + shift + if "$@" >/dev/null 2>&1; then + echo " PASS $desc" + PASS=$((PASS + 1)) + else + echo " FAIL $desc (non-zero exit)" + FAIL=$((FAIL + 1)) + ERRORS+=("$desc") + fi +} + +# =================================================================== +echo "=== 1. 
Orchestrator status ===" +# =================================================================== + +output=$(run_ceph orch status) +assert_contains "backend is microceph" "microceph" "$output" +assert_contains "available" "Available: Yes" "$output" + +# =================================================================== +echo "=== 2. Host listing ===" +# =================================================================== + +output=$(run_ceph orch host ls) +assert_contains "hosts listed" "hosts in cluster" "$output" + +# =================================================================== +echo "=== 3. Service listing (baseline) ===" +# =================================================================== + +output=$(run_ceph orch ls) +assert_contains "mon service" "mon" "$output" +assert_contains "mgr service" "mgr" "$output" + +# =================================================================== +echo "=== 4. Daemon listing ===" +# =================================================================== + +output=$(run_ceph orch ps) +assert_contains "has daemons" "mon" "$output" + +# Get first hostname for filter tests +first_host=$(run_ceph orch host ls | awk 'NR==2{print $1}') +if [ -n "$first_host" ]; then + output=$(run_ceph orch ps --hostname "$first_host") + assert_contains "ps filtered to host" "$first_host" "$output" + + output=$(run_ceph orch ps --daemon-type mon) + assert_contains "ps filtered to mon type" "mon" "$output" +fi + +# =================================================================== +echo "=== 5. Device listing ===" +# =================================================================== + +output=$(run_ceph orch device ls) +# Just verify it doesn't error out +assert_exit_ok "device ls succeeds" run_ceph orch device ls + +# =================================================================== +echo "=== 6. 
Apply RGW ===" +# =================================================================== + +# Clean up RGW if it was already enabled by prior test steps. +# `orch rm rgw` is now cluster-wide (fans out to every host running rgw). +run_ceph orch rm rgw >/dev/null 2>&1 || true +sleep 3 + +# Use first host for placement +placement="${first_host:-$(hostname)}" +output=$(run_ceph orch apply rgw default --placement="$placement") +assert_contains "rgw applied" "enabled|already active" "$output" +sleep 5 + +output=$(run_ceph orch ls) +assert_contains "rgw in service list" "rgw" "$output" + +output=$(run_ceph orch ps --daemon-type rgw) +assert_contains "rgw daemon visible" "rgw" "$output" + +# Per-host targeting check: the RGW daemon must end up on the +# requested host (first_host), not on whichever node served the unix +# socket. This is what the ?target= proxyTarget mechanism is for. +output=$(run_ceph orch ps --hostname "$placement" --daemon-type rgw) +assert_contains "rgw daemon on requested host $placement" "rgw" "$output" + +# =================================================================== +echo "=== 6b. Apply RGW on a second host (per-host targeting) ===" +# =================================================================== + +# Find a host that is NOT the first one to verify cross-node enablement +# from the local socket. Only run this leg when the cluster has more +# than one host (single-node deployments skip). +# Filter to rows whose first column looks like a hostname (starts with +# a letter) so the trailing "N hosts in cluster" footer line is not +# mistaken for a host name on single-node CI. 
+second_host=$(run_ceph orch host ls | awk 'NR>1 && NF>0 && $1 ~ /^[A-Za-z]/ && $1 != "'"$first_host"'" {print $1; exit}') +if [ -n "$second_host" ] && [ "$second_host" != "$first_host" ]; then + output=$(run_ceph orch apply rgw default --placement="$first_host,$second_host") + assert_contains "rgw applied to two hosts" "enabled|already active" "$output" + sleep 5 + + # Verify both hosts now run rgw. The local node (running the orch + # client) is the unix-socket endpoint; the second_host enablement + # exercises the unix-socket -> proxyTarget -> remote mTLS path. + output=$(run_ceph orch ps --hostname "$second_host" --daemon-type rgw) + assert_contains "rgw daemon on $second_host (cross-node target)" "rgw" "$output" +else + echo " SKIP per-host targeting RGW test (only one host in cluster)" +fi + +# =================================================================== +echo "=== 7. Apply NFS ===" +# =================================================================== + +output=$(run_ceph orch apply nfs testcluster --placement="$placement") +assert_contains "nfs applied" "enabled|already active" "$output" +sleep 5 + +output=$(run_ceph orch ls) +# NFS may not appear in service list if the service failed to start +# on the backend (e.g. missing kernel modules on CI runners). +if echo "$output" | grep -q "nfs.testcluster"; then + echo " PASS nfs in service list" + PASS=$((PASS + 1)) + NFS_RUNNING=true +else + echo " WARN nfs not in service list (service may have failed to start; skipping daemon check)" + NFS_RUNNING=false +fi + +if [ "$NFS_RUNNING" = true ]; then + output=$(run_ceph orch ps --daemon-type nfs) + assert_contains "nfs daemon visible" "nfs" "$output" +fi + +# =================================================================== +echo "=== 8. 
Restart service ===" +# =================================================================== + +output=$(run_ceph orch restart mon) +assert_contains "mon restarted" "Restarted mon on" "$output" + +# Per-host fan-out: in a multi-host cluster the restart output should +# enumerate each host running mon, not a single "Restarted mon". +if [ -n "${second_host:-}" ] && [ "$second_host" != "$first_host" ]; then + assert_contains "mon restart enumerates first host" "$first_host" "$output" +fi + +# Dotted-name guard: orch restart rgw.realm1 must fail with a clear +# "does not support a service_id" error rather than silently restart +# every rgw daemon. +output=$(run_ceph orch restart rgw.realm1 2>&1 || true) +assert_contains "dotted restart rejected" "does not support a service_id" "$output" + +# =================================================================== +echo "=== 9. Remove RGW ===" +# =================================================================== + +output=$(run_ceph orch rm rgw) +assert_contains "rgw removed" "removed from" "$output" +sleep 3 + +# remove_service fans out across every host running the service, so +# RGW must be gone from BOTH first_host and second_host (when present). +local_host="${first_host:-$(hostname)}" +output=$(run_ceph orch ps --hostname "$local_host" --daemon-type rgw) +assert_not_contains "rgw gone from first host" "rgw" "$output" + +if [ -n "${second_host:-}" ] && [ "$second_host" != "$first_host" ]; then + output=$(run_ceph orch ps --hostname "$second_host" --daemon-type rgw) + assert_not_contains "rgw gone from second host (cluster-wide remove)" "rgw" "$output" +fi + +# =================================================================== +echo "=== 10. Remove NFS ===" +# =================================================================== + +output=$(run_ceph orch rm nfs.testcluster 2>&1 || true) +# NFS removal may fail if the service never fully started. 
In that +# case remove_service now raises OrchestratorError("not running on +# any host"); accept either the success ("removed from") shape or +# that specific error. +if echo "$output" | grep -qE "removed from|not running on any host"; then + echo " PASS nfs removal completed (either removed or never started)" + PASS=$((PASS + 1)) +else + echo " WARN nfs removal returned unexpected output: $output" +fi +sleep 3 + +output=$(run_ceph orch ls) +assert_not_contains "nfs gone from list" "nfs" "$output" + +# =================================================================== +echo "" +echo "===========================================" +echo " Results: $PASS passed, $FAIL failed" +echo "===========================================" + +if [ ${#ERRORS[@]} -gt 0 ]; then + echo "" + echo "Failed tests:" + for e in "${ERRORS[@]}"; do + echo " - $e" + done + exit 1 +fi + +exit 0