diff --git a/scripts/aws_dual_node_outage_soak_mixed_churn.py b/scripts/aws_dual_node_outage_soak_mixed_churn.py index 792badc34..229da3947 100644 --- a/scripts/aws_dual_node_outage_soak_mixed_churn.py +++ b/scripts/aws_dual_node_outage_soak_mixed_churn.py @@ -122,7 +122,42 @@ def parse_args(): "--shutdown-gap", type=int, default=0, - help="Optional delay between shutting down the two selected nodes.", + help=( + "Legacy fixed delay between the two outages within an iteration. " + "When > 0 it overrides --outage-gap-min/--outage-gap-max with a " + "constant. Default 0 = use the random range below." + ), + ) + parser.add_argument( + "--outage-gap-min", + type=int, + default=15, + help=( + "Minimum seconds between applying outage 1 and outage 2 of an " + "iteration. The actual gap is drawn uniformly from " + "[--outage-gap-min, --outage-gap-max] and then capped per " + "method 1 so the requested --min-outage-overlap is guaranteed." + ), + ) + parser.add_argument( + "--outage-gap-max", + type=int, + default=180, + help=( + "Maximum seconds between applying outage 1 and outage 2 of an " + "iteration. Default 3 min." + ), + ) + parser.add_argument( + "--min-outage-overlap", + type=int, + default=10, + help=( + "Minimum seconds both outage targets must be simultaneously " + "not-online inside an iteration. Used to cap the inter-outage " + "gap when method 1's recovery window is short (e.g. " + "network_outage_20)." + ), ) parser.add_argument( "--log-file", @@ -844,32 +879,28 @@ def sbctl_allow_failure(self, args, timeout=600): ) return rc, stdout_text, stderr_text - def shutdown_with_migration_retry(self, node_id): - while True: - rc, stdout_text, stderr_text = self.sbctl_allow_failure( - f"sn shutdown {node_id}", - timeout=300, - ) - if rc == 0: - return - output = f"{stdout_text}\n{stderr_text}".lower() - retry_markers = ( - "migration", - "migrat", - "rebalanc", - "active task", - "running task", - "in_progress", - "in progress", - ) - if any(marker in output for marker in retry_markers): - self.logger.log( - f"Shutdown of {node_id} blocked by migration/rebalance/task; retrying in 15s" - ) - time.sleep(15) - continue + def _graceful_shutdown(self, node_id): + """Single-shot `sbctl sn shutdown`. Raises on failure. + + Previously this method looped on output markers like 'migration' + / 'rebalanc' / 'active task' and slept 15s between retries — the + soak would sit and wait for CP-side rebalancing to drain before + the outage could proceed. That retry was needed because + shutdown_storage_node used to call _check_ftt_allows_node_removal + (added in commit fbdffea3, 2026-03-28) which refused the + shutdown during rebalance with a stdout containing 'rebalanc'. + That call has been removed from the CP — shutdown is supposed to + proceed under the cluster's FTT contract, regardless of in-flight + rebalance, so we no longer need to wait here either. + """ + rc, stdout_text, stderr_text = self.sbctl_allow_failure( + f"sn shutdown {node_id}", + timeout=300, + ) + if rc != 0: raise RemoteCommandError( f"mgmt: command failed with rc={rc}: sbctl sn shutdown {node_id}" + f" | stdout={stdout_text.strip()} | stderr={stderr_text.strip()}" ) def prepare_client(self): @@ -1520,28 +1551,21 @@ def reraise_churn_error(self): # ----- outage methods --------------------------------------------------- def _forced_shutdown(self, node_id): - """Shutdown with --force; still retry if blocked by migration.""" - while True: - rc, stdout_text, stderr_text = self.sbctl_allow_failure( - f"sn shutdown {node_id} --force", - timeout=300, - ) - if rc == 0: - return - output = f"{stdout_text}\n{stderr_text}".lower() - retry_markers = ( - "migration", "migrat", "rebalanc", - "active task", "running task", - "in_progress", "in progress", - ) - if any(m in output for m in retry_markers): - self.logger.log( - f"Forced shutdown of {node_id} blocked by migration/task; retrying in 15s" - ) - time.sleep(15) - continue + """Single-shot `sbctl sn shutdown --force`. Raises on failure. + + --force bypasses every shutdown guard inside the CP, so the + retry-on-migration-markers loop that used to live here never + actually fired — sbctl --force does not return 'migration' / + 'rebalanc' / 'active task' strings. Removed for clarity. + """ + rc, stdout_text, stderr_text = self.sbctl_allow_failure( + f"sn shutdown {node_id} --force", + timeout=300, + ) + if rc != 0: raise RemoteCommandError( f"mgmt: command failed with rc={rc}: sbctl sn shutdown {node_id} --force" + f" | stdout={stdout_text.strip()} | stderr={stderr_text.strip()}" ) def _container_kill(self, node_id): @@ -1672,14 +1696,30 @@ def _inter_iteration_nic_chaos(self): self.wait_for_cluster_stable(require_no_rebalance=self.args.wait_for_rebalance) def _network_outage(self, node_id, duration): - """Take all data NICs down on one storage node for *duration* seconds, - then bring them back up. Simulates a transient network partition of - a single node. Node is expected to auto-recover once the NICs return - — no sbctl restart is issued.""" + """Drop data NICs on one storage node; schedule the NIC bring-up + ``duration`` seconds later on a background daemon thread, then + return. + + Previously this method blocked for the full ``duration`` (the + sleep ran inline before bringing NICs back up). That made it + impossible to overlap a network_outage_N outage with a second + outage applied within the same iteration — by the time + run_outage_pair called _apply_outage for node 2, node 1's NICs + were already up and the CP was already healing it. Decoupling + the bring-up from the call site lets the second outage land + while the first node is still partitioned. + + The bring-up thread is daemonized so the soak's exit (atexit / + unhandled exception) does not block on it. We do NOT join the + thread anywhere in the iteration: the only thing that depends + on the NICs being back up is the next iteration's + wait_for_all_online, which polls anyway. + """ host = self._node_host(node_id) nics = self._get_data_nics() or ["eth1"] self.logger.log( - f"network_outage on {node_id}: dropping {nics} for {duration}s" + f"network_outage on {node_id}: dropping {nics} for {duration}s " + "(async bring-up)" ) for nic in nics: try: @@ -1687,20 +1727,26 @@ def _network_outage(self, node_id, duration): label=f"netout down {nic} on {node_id}") except Exception as e: self.logger.log(f"WARNING: failed to down {nic} on {node_id}: {e}") - try: - time.sleep(duration) - finally: - for nic in nics: - try: - host.run(f"sudo ip link set {nic} up", timeout=10, check=False, - label=f"netout up {nic} on {node_id}") - except Exception as e: - self.logger.log(f"WARNING: failed to up {nic} on {node_id}: {e}") + + def _bring_up_later(): + try: + time.sleep(duration) + finally: + for nic in nics: + try: + host.run(f"sudo ip link set {nic} up", timeout=10, check=False, + label=f"netout up {nic} on {node_id}") + except Exception as e: + self.logger.log(f"WARNING: failed to up {nic} on {node_id}: {e}") + + t = threading.Thread(target=_bring_up_later, daemon=True, + name=f"netout-bringup-{node_id[:8]}") + t.start() def _apply_outage(self, node_id, method): self.logger.log(f"Applying outage '{method}' on {node_id}") if method == "graceful": - self.shutdown_with_migration_retry(node_id) + self._graceful_shutdown(node_id) elif method == "forced": self._forced_shutdown(node_id) elif method == "container_kill": @@ -1751,15 +1797,89 @@ def wait_node_leaves_online(self, node_id, timeout=90, poll=2): time.sleep(poll) return False + # Conservative lower bound on how long node stays not-online for each + # outage method. Used to cap the inter-outage gap so that the + # configured --min-outage-overlap is guaranteed (the gap can never + # eat the entire recovery window of outage 1). Real recovery is + # usually longer; underestimating keeps the overlap invariant safe. + # + # graceful / forced: the node stays in OFFLINE until run_outage_pair + # issues `sn restart` later in the iteration — so the unavailability + # window is effectively unbounded from the gap's perspective. Use a + # very large sentinel. + _METHOD_MIN_UNAVAIL_S = { + "graceful": 10_000, + "forced": 10_000, + # CP detection + auto-restart takes at least this long in practice. + "container_kill": 30, + # Reboot itself, BIOS, boot, SPDK start. Floor is generous. + "host_reboot": 90, + # network_outage_N handled by name parsing below. + } + + def _expected_min_unavail_seconds(self, method): + if method.startswith("network_outage_"): + try: + return int(method.rsplit("_", 1)[-1]) + except ValueError: + return 30 + return self._METHOD_MIN_UNAVAIL_S.get(method, 30) + + def _pick_outage_gap(self, method1): + """Random gap in [outage_gap_min, outage_gap_max], capped per + method1 so --min-outage-overlap is guaranteed. + + --shutdown-gap > 0 overrides everything with a fixed constant + (legacy behaviour; emit a warning if the constant would violate + the overlap invariant for method1). + """ + overlap = max(0, self.args.min_outage_overlap) + unavail = self._expected_min_unavail_seconds(method1) + # Hard upper bound: gap + overlap <= unavail => gap <= unavail - overlap + cap = max(1, unavail - overlap) + + if self.args.shutdown_gap and self.args.shutdown_gap > 0: + gap = self.args.shutdown_gap + if gap > cap: + self.logger.log( + f"WARNING: --shutdown-gap={gap}s exceeds method1={method1}'s " + f"safe cap {cap}s; overlap of {overlap}s is NOT guaranteed" + ) + return gap + + lo = max(1, self.args.outage_gap_min) + hi = max(lo, self.args.outage_gap_max) + # Clamp the upper bound to the cap; clamp the lower bound to + # respect the cap too (otherwise random.randint would raise). + hi = min(hi, cap) + lo = min(lo, hi) + gap = random.randint(lo, hi) + self.logger.log( + f"Outage gap chosen: {gap}s " + f"(range=[{lo},{hi}], cap={cap}s for method1={method1}, " + f"min-overlap={overlap}s)" + ) + return gap + def run_outage_pair(self, node1, node2, method1, method2): self.logger.log( f"Outage pair: {node1}={method1} and {node2}={method2}" ) - # Apply first outage, then optional gap, then second outage. + # Apply first outage, then a method1-aware gap, then second outage. + # The gap is bounded so node 1's recovery window is guaranteed to + # span at least --min-outage-overlap seconds after node 2 goes + # down — i.e., both nodes are simultaneously not-online for the + # configured minimum. + gap = self._pick_outage_gap(method1) + t_outage1 = time.time() self._apply_outage(node1, method1) - if self.args.shutdown_gap: - time.sleep(self.args.shutdown_gap) + time.sleep(gap) + t_outage2 = time.time() self._apply_outage(node2, method2) + self.logger.log( + f"Outage pair applied: outage1 at t=0, outage2 at " + f"t={t_outage2 - t_outage1:.1f}s (gap={gap}s)" + ) # Issue sbctl restart only for methods that leave the node in a # "shutdown" state that the CP won't recover on its own. diff --git a/scripts/lab_dual_node_outage_soak_mixed_churn.py b/scripts/lab_dual_node_outage_soak_mixed_churn.py new file mode 100644 index 000000000..fdb6b6e02 --- /dev/null +++ b/scripts/lab_dual_node_outage_soak_mixed_churn.py @@ -0,0 +1,1912 @@ +#!/usr/bin/env python3 +"""Lab variant of aws_dual_node_outage_soak_mixed_churn.py. + +Designed to run from the simplyblock jump host against the bare-metal lab +cluster (mgmt 192.168.10.210, storage .201-.204) deployed by +setup_lab_perf_test1.py. + +Iteration pattern (load during outage, unload during settle): + 1. start fio on every volume + 2. apply the dual-node outage pair (fio takes the hit) + 3. wait for both nodes back online + post-outage check_fio (fault gate) + 4. stop fio + 5. optionally rebuild one randomly-selected volume + (every --churn-every-n-iters iterations) + 6. wait_for_cluster_stable + wait_for_data_migration_complete -- now + UNLOADED, so migration drains fast + 7. next iteration + +This trades the AWS-variant's "fio runs continuously across outages plus +a 3-20 minute background churn timer" for a deterministic per-iteration +pattern: settling never happens under fio load, so iteration time drops +to whatever rebalance takes on a quiet cluster. There is no inter- +iteration NIC chaos. + +Differences vs. the AWS variant beyond the iteration pattern: + - SSH uses a single shared root password (CLI flag, env var, or prompt), + not an EC2 keypair. Install paramiko if you can ("pip3 install --user + paramiko" on the jump host); otherwise the script falls back to + `sshpass + ssh`, which is slower because every command opens a fresh + connection. + - Defaults match the lab topology: --expected-node-count 4, metadata + file cluster_metadata_base.json (the file setup_lab_perf_test1.py + writes), mount root /root/lab_outage_soak_*. + +Typical invocation (from the jump host, after setup_lab_perf_test1.py has +created the cluster and written cluster_metadata_base.json next to this +script): + + python3 ~/lab_dual_node_outage_soak_mixed_churn.py # prompts for password + SBCLI_ROOT_PASSWORD='...' python3 ~/lab_dual_node_outage_soak_mixed_churn.py + python3 ~/lab_dual_node_outage_soak_mixed_churn.py --password '...' +""" +import argparse +import getpass +import itertools +import json +import logging +import os +import posixpath +import random +import re +import shlex +import subprocess +import sys +import threading +import time +from dataclasses import dataclass +from pathlib import Path + +try: + import paramiko +except ImportError: + paramiko = None + +# Silence paramiko's Transport-thread "Socket exception: Connection +# reset by peer (104)" prints. They fire whenever an open SSH +# connection to a storage node gets RST'd by a planned event — +# host_reboot outage tearing down sshd, NIC down/up flapping, etc. +# The retry/reconnect logic handles it cleanly; the stack-trace-less +# stderr lines just clutter the soak output. +logging.getLogger("paramiko").setLevel(logging.CRITICAL) +logging.getLogger("paramiko.transport").setLevel(logging.CRITICAL) + + +UUID_RE = re.compile(r"[a-f0-9]{8}(?:-[a-f0-9]{4}){3}-[a-f0-9]{12}") +# `sbctl lvol connect` emits `sudo nvme connect ... --nqn= ...` +# (long form with `=`, see lvol_controller.py:1737). Tolerate the legacy +# short form `-n ` as well so older sbctl deployments still parse. +NQN_RE = re.compile(r"(?:--nqn[=\s]+|-n\s+)(\S+)") + + +OUTAGE_METHODS = ( + "graceful", "forced", "container_kill", "host_reboot", + "network_outage_20", "network_outage_50", +) +# Methods that leave the node in a state where it recovers on its own +# (no sbctl restart required from the soak driver). +AUTO_RECOVER_METHODS = ( + "container_kill", "host_reboot", + "network_outage_20", "network_outage_50", +) + +# Scenario enumeration: +# 3 role categories × P(M,2) ordered distinct-method pairs +# = 3 × M·(M-1) scenarios per cycle. +# Examples: +# M=5 → 3 × 20 = 60 +# M=6 → 3 × 30 = 90 +# Role categories (relative ring-distance preserved; the actual node pair +# is re-rolled randomly per scenario at execution time so the soak hits +# many different concrete pairs while keeping the topological distance +# fixed for each category). +# Order matters: the soak walks the full method permutation for one +# category before moving on. "unrelated" runs first so the outage with +# the widest blast-radius coverage (two nodes from different LVS rings) +# exercises the cluster before the within-ring categories. +# - unrelated : pair sharing no LVS in any role — ring-distance +# ≥ 3 (≥ 2 nodes between). +# - primary_tertiary : primary + tertiary of same LVS — ring-distance +# 2 (exactly one node between); no replication +# edge connects them (jumps over the secondary). +# - primary_secondary : primary + secondary of same LVS — ring-distance +# 1 (direct successor). Represents both (P,S) and +# (S,T): two adjacent replicas of the same LVS +# going down is structurally symmetric regardless +# of which end. +# Same-method pairs (graceful,graceful etc.) are not enumerated — the +# user-agreed count 30 for 6 methods equals 6·5, not 6². +ROLE_CATEGORIES = ("unrelated", "primary_tertiary", "primary_secondary") + + +def parse_args(): + default_metadata = Path(__file__).with_name("cluster_metadata_base.json") + default_log_dir = Path(__file__).parent + + parser = argparse.ArgumentParser( + description=( + "Run a long fio soak against the lab cluster while cycling random " + "two-node outages with mixed outage methods. Each iteration: start " + "fio on every volume, apply the outage pair, fault-check fio, " + "stop fio, optionally rebuild one volume, then wait for the cluster " + "to settle UNLOADED (no IO pressure on rebalance/data-migration). " + "Trades 'fio always running' for fast iteration time." + ) + ) + parser.add_argument("--metadata", default=str(default_metadata), help="Path to cluster metadata JSON.") + parser.add_argument("--pool", default="pool01", help="Pool name for volume creation.") + parser.add_argument("--expected-node-count", type=int, default=4, help="Required storage node count.") + parser.add_argument("--volume-size", default="25G", help="Volume size to create per storage node.") + parser.add_argument("--runtime", type=int, default=72000, help="fio runtime in seconds.") + parser.add_argument("--restart-timeout", type=int, default=900, help="Seconds to wait for restarted nodes.") + parser.add_argument("--rebalance-timeout", type=int, default=7200, help="Seconds to wait for rebalancing.") + parser.add_argument("--poll-interval", type=int, default=10, help="Poll interval for health checks.") + parser.add_argument( + "--shutdown-gap", + type=int, + default=0, + help="Optional delay between shutting down the two selected nodes.", + ) + parser.add_argument( + "--log-file", + default=str(default_log_dir / f"aws_dual_node_outage_soak_{time.strftime('%Y%m%d_%H%M%S')}.log"), + help="Single log file for script and CLI output.", + ) + parser.add_argument( + "--run-on-mgmt", + action="store_true", + help="Run management-node commands locally instead of over SSH.", + ) + parser.add_argument( + "--password", + default=None, + help=( + "Root password shared by mgmt+storage nodes. If omitted, falls " + "back to the SBCLI_ROOT_PASSWORD env var, then to an interactive " + "prompt. Avoid the flag form on shared hosts (visible in `ps`)." + ), + ) + parser.add_argument( + "--methods", + default=",".join(OUTAGE_METHODS), + help=( + "Comma-separated subset of outage methods to pick from per iteration. " + f"Choices: {','.join(OUTAGE_METHODS)}. " + "Each iteration picks 2 distinct methods at random." + ), + ) + parser.add_argument( + "--auto-recover-wait", + type=int, + default=900, + help=( + "Seconds to wait for a node to return online after a container_kill " + "or host_reboot outage (no sbctl restart is issued)." + ), + ) + parser.add_argument( + "--cycles", + type=int, + default=1, + help=( + "Number of passes through the full deterministic scenario list. " + "Each pass covers C(N,2)*M² scenarios (250 for 5 nodes × 5 methods; " + "540 for 6 × 6). 0 means loop forever." + ), + ) + parser.add_argument( + "--shuffle-scenarios", + action="store_true", + help=( + "Shuffle scenario order per cycle (seeded deterministically off " + "the cycle index). Useful when a full cycle is too long to finish " + "and you want even coverage across early/mid/late pairs." + ), + ) + parser.add_argument( + "--start-at", + type=int, + default=1, + help=( + "Start the first cycle at scenario N (1-indexed). Scenarios " + "1..N-1 are skipped in the first cycle only; subsequent cycles " + "run from scenario 1 as normal. Use to resume after a failure — " + "e.g. --start-at 60 if scenario 60 is the one that failed." + ), + ) + parser.add_argument( + "--churn-every-n-iters", + type=int, + default=3, + help=( + "Rebuild one randomly-chosen volume every N outage iterations, " + "in the unloaded window between fio-stop and rebalance-wait. " + "Default 3. Set to 0 to disable; --no-churn is an explicit alias." + ), + ) + parser.add_argument( + "--no-churn", + action="store_true", + help="Disable per-iteration volume churn entirely.", + ) + args = parser.parse_args() + methods = [m.strip() for m in args.methods.split(",") if m.strip()] + bad = [m for m in methods if m not in OUTAGE_METHODS] + if bad: + parser.error(f"Unknown outage method(s): {bad}. Choices: {list(OUTAGE_METHODS)}") + if not methods: + parser.error("At least one outage method must be enabled") + args.methods = methods + if args.churn_every_n_iters < 0: + parser.error("--churn-every-n-iters must be >= 0") + args.password = resolve_password(args.password) + if not args.password: + parser.error("Empty root password; supply --password, SBCLI_ROOT_PASSWORD, or answer the prompt.") + if subprocess.run(["which", "sshpass"], capture_output=True).returncode != 0 and paramiko is None: + parser.error( + "Neither paramiko nor sshpass is available on this host. " + "Install one ('pip3 install --user paramiko' or 'sudo dnf install sshpass') " + "before running." + ) + return args + + +def resolve_password(cli_value): + if cli_value: + return cli_value + env_value = os.environ.get("SBCLI_ROOT_PASSWORD") + if env_value: + return env_value + return getpass.getpass("Root password for lab nodes (.210, .201-.204): ") + + +def load_metadata(path): + with open(path, "r", encoding="utf-8") as handle: + return json.load(handle) + + +class Logger: + def __init__(self, path): + self.path = path + self.lock = threading.Lock() + Path(path).parent.mkdir(parents=True, exist_ok=True) + + def log(self, message): + line = f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {message}" + with self.lock: + print(line, flush=True) + with open(self.path, "a", encoding="utf-8") as handle: + handle.write(line + "\n") + + def block(self, header, content): + if content is None: + return + text = content.rstrip() + if not text: + return + with self.lock: + with open(self.path, "a", encoding="utf-8") as handle: + handle.write(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] {header}\n") + handle.write(text + "\n") + + +class RemoteCommandError(RuntimeError): + pass + + +class RemoteHost: + def __init__(self, hostname, user, password, logger, name): + self.hostname = hostname + self.user = user + self.password = password + self.logger = logger + self.name = name + self.client = None + self.connect() + + def connect(self): + if paramiko is None: + return + self.close() + last_error = None + for attempt in range(1, 16): + try: + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + client.connect( + hostname=self.hostname, + username=self.user, + password=self.password, + timeout=15, + banner_timeout=15, + auth_timeout=15, + allow_agent=False, + look_for_keys=False, + ) + transport = client.get_transport() + if transport is not None: + transport.set_keepalive(30) + self.client = client + return + except Exception as exc: + last_error = exc + self.logger.log( + f"{self.name}: SSH attempt {attempt}/15 failed to {self.hostname}: {exc}" + ) + time.sleep(5) + raise RemoteCommandError(f"{self.name}: failed to connect to {self.hostname}: {last_error}") + + def run(self, command, timeout=600, check=True, label=None): + if paramiko is None: + return self._run_via_ssh_cli(command, timeout=timeout, check=check, label=label) + if self.client is None: + self.connect() + label = label or command + self.logger.log(f"{self.name}: RUN {label}") + try: + stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout) + stdout_text = stdout.read().decode("utf-8", errors="replace") + stderr_text = stderr.read().decode("utf-8", errors="replace") + rc = stdout.channel.recv_exit_status() + except Exception as exc: + self.logger.log(f"{self.name}: command transport failure for {label}: {exc}; reconnecting once") + self.connect() + stdin, stdout, stderr = self.client.exec_command(command, timeout=timeout) + stdout_text = stdout.read().decode("utf-8", errors="replace") + stderr_text = stderr.read().decode("utf-8", errors="replace") + rc = stdout.channel.recv_exit_status() + self.logger.block(f"{self.name}: STDOUT for {label}", stdout_text) + self.logger.block(f"{self.name}: STDERR for {label}", stderr_text) + if check and rc != 0: + raise RemoteCommandError( + f"{self.name}: command failed with rc={rc}: {label}" + ) + return rc, stdout_text, stderr_text + + def _run_via_ssh_cli(self, command, timeout=600, check=True, label=None): + label = label or command + self.logger.log(f"{self.name}: RUN {label}") + ssh_cmd = [ + "sshpass", "-e", + "ssh", + "-o", "StrictHostKeyChecking=no", + "-o", "UserKnownHostsFile=/dev/null", + "-o", "LogLevel=ERROR", + "-o", "ConnectTimeout=15", + "-o", "ServerAliveInterval=30", + f"{self.user}@{self.hostname}", + command, + ] + env = os.environ.copy() + env["SSHPASS"] = self.password + try: + completed = subprocess.run( + ssh_cmd, + capture_output=True, + text=True, + timeout=timeout, + check=False, + env=env, + ) + except subprocess.TimeoutExpired as exc: + stdout_text = exc.stdout or "" + stderr_text = exc.stderr or "" + self.logger.block(f"{self.name}: STDOUT for {label}", stdout_text) + self.logger.block(f"{self.name}: STDERR for {label}", stderr_text) + raise RemoteCommandError(f"{self.name}: command timed out: {label}") from exc + stdout_text = completed.stdout or "" + stderr_text = completed.stderr or "" + rc = completed.returncode + self.logger.block(f"{self.name}: STDOUT for {label}", stdout_text) + self.logger.block(f"{self.name}: STDERR for {label}", stderr_text) + if check and rc != 0: + raise RemoteCommandError(f"{self.name}: command failed with rc={rc}: {label}") + return rc, stdout_text, stderr_text + + def close(self): + if self.client is not None: + self.client.close() + self.client = None + + +class LocalHost: + def __init__(self, logger, name): + self.logger = logger + self.name = name + + def run(self, command, timeout=600, check=True, label=None): + label = label or command + self.logger.log(f"{self.name}: RUN {label}") + try: + completed = subprocess.run( + ["/bin/bash", "-lc", command], + capture_output=True, + text=True, + timeout=timeout, + check=False, + ) + except subprocess.TimeoutExpired as exc: + stdout_text = exc.stdout or "" + stderr_text = exc.stderr or "" + self.logger.block(f"{self.name}: STDOUT for {label}", stdout_text) + self.logger.block(f"{self.name}: STDERR for {label}", stderr_text) + raise RemoteCommandError(f"{self.name}: command timed out: {label}") from exc + stdout_text = completed.stdout or "" + stderr_text = completed.stderr or "" + rc = completed.returncode + self.logger.block(f"{self.name}: STDOUT for {label}", stdout_text) + self.logger.block(f"{self.name}: STDERR for {label}", stderr_text) + if check and rc != 0: + raise RemoteCommandError(f"{self.name}: command failed with rc={rc}: {label}") + return rc, stdout_text, stderr_text + + def close(self): + return + + +# Number of fio worker processes per --name. Must match --numjobs in +# start_fio(). --group_reporting aggregates all workers into one report, +# so a single fio summary + per-run stderr stream is sufficient to +# diagnose any fio fault. +FIO_NUMJOBS = 4 + + +@dataclass +class FioJob: + volume_id: str + volume_name: str + mount_point: str + fio_log: str # fio's --output summary file (written on exit) + fio_stderr: str # captured stdout+stderr during the run (progress, + # errors, "max_latency exceeded" messages). This is + # the primary source of ground truth for fio faults. + rc_file: str + pid: int + fio_name: str # matches --name= in the fio command line + + +class TestRunError(RuntimeError): + pass + + +class SoakRunner: + def __init__(self, args, metadata, logger): + self.args = args + self.metadata = metadata + self.logger = logger + self.user = metadata["user"] + self.password = args.password + self.run_id = time.strftime("%Y%m%d_%H%M%S") + if args.run_on_mgmt: + self.mgmt = LocalHost(logger, "mgmt") + else: + self.mgmt = RemoteHost(metadata["mgmt"]["public_ip"], self.user, self.password, logger, "mgmt") + client_entry = metadata["clients"][0] + if args.run_on_mgmt: + client_addr = client_entry.get("private_ip") or client_entry["public_ip"] + else: + client_addr = client_entry["public_ip"] + self.client = RemoteHost(client_addr, self.user, self.password, logger, "client") + self.cluster_id = metadata.get("cluster_uuid") or "" + self.fio_jobs = [] + # Stored so the churn cycle can pick a random volume to rebuild. + self.volumes = [] + self.created_volume_ids = [] + # Mixed-outage state + self.methods = list(args.methods) + # On multipath clusters, network-layer coverage is provided by the + # inter-iteration single-NIC chaos. Dropping all data NICs on a node + # (network_outage_*) is a simple-cluster-only scenario. + if self._is_multipath(): + filtered = [m for m in self.methods if not m.startswith("network_outage_")] + dropped = [m for m in self.methods if m not in filtered] + if dropped: + self.logger.log( + f"multipath cluster detected: excluding {dropped} from outage methods" + ) + if not filtered: + raise TestRunError( + "No outage methods remain after excluding network_outage_* " + "on multipath cluster; pass --methods with at least one " + "non-network_outage method" + ) + self.methods = filtered + self.node_hosts = {} # uuid -> RemoteHost (private_ip of storage node) + self.node_ip_map = self._build_node_ip_map() + # Counter incremented by every churn cycle; embedded in fio --name and + # the rebuilt volume name so logs/pkill targets remain unique across + # iterations even when the same volume index is rebuilt repeatedly. + self.churn_counter = 0 + self.mount_root = None + + def close(self): + self.client.close() + self.mgmt.close() + for host in self.node_hosts.values(): + try: + host.close() + except Exception: + pass + + def _build_node_ip_map(self): + """Return {uuid: private_ip} for every storage node we know about.""" + ip_map = {} + topology = self.metadata.get("topology") or {} + for node in topology.get("nodes", []): + uuid = node.get("uuid") + ip = node.get("management_ip") or node.get("private_ip") + if uuid and ip: + ip_map[uuid] = ip + # Fallback: pair storage_nodes list with sbctl-returned UUIDs by mgmt IP, + # which is done lazily in _resolve_node_ip below. + return ip_map + + def _resolve_node_ip(self, uuid): + """Return the private/mgmt IP for a storage node UUID, refreshing via + sbctl if we haven't seen it in metadata.""" + ip = self.node_ip_map.get(uuid) + if ip: + return ip + # Try fetching via sbctl sn list JSON. + nodes = self.sbctl("sn list --json", json_output=True) + for node in nodes: + candidate_ip = ( + node.get("Management IP") + or node.get("Mgmt IP") + or node.get("mgmt_ip") + or node.get("management_ip") + ) + if node.get("UUID") == uuid and candidate_ip: + self.node_ip_map[uuid] = candidate_ip + return candidate_ip + raise TestRunError(f"Cannot resolve storage-node IP for UUID {uuid}") + + def _node_host(self, uuid): + """Lazily create a RemoteHost for a storage node identified by UUID.""" + if uuid in self.node_hosts: + return self.node_hosts[uuid] + ip = self._resolve_node_ip(uuid) + host = RemoteHost(ip, self.user, self.password, self.logger, f"sn[{ip}]") + self.node_hosts[uuid] = host + return host + + def sbctl(self, args, timeout=600, json_output=False): + command = "sudo /usr/local/bin/sbctl -d " + args + _, stdout_text, stderr_text = self.mgmt.run( + command, + timeout=timeout, + check=True, + label=f"sbctl {args}", + ) + if not json_output: + return stdout_text + for candidate in (stdout_text, stderr_text, stdout_text + "\n" + stderr_text): + candidate = candidate.strip() + if not candidate: + continue + try: + return json.loads(candidate) + except json.JSONDecodeError: + pass + decoder = json.JSONDecoder() + final_payloads = [] + list_payloads = [] + dict_payloads = [] + for start, char in enumerate(candidate): + if char not in "[{": + continue + try: + obj, end = decoder.raw_decode(candidate[start:]) + except json.JSONDecodeError: + continue + if not isinstance(obj, (dict, list)): + continue + if not candidate[start + end:].strip(): + final_payloads.append(obj) + elif isinstance(obj, list): + list_payloads.append(obj) + else: + dict_payloads.append(obj) + if final_payloads: + return final_payloads[-1] + if list_payloads: + return list_payloads[-1] + if dict_payloads: + return dict_payloads[-1] + raise TestRunError(f"Failed to parse JSON from sbctl {args}") + + def ensure_prerequisites(self): + self.logger.log(f"Using password auth as {self.user}; paramiko={'yes' if paramiko else 'no (sshpass fallback)'}") + self.client.run( + "if command -v dnf >/dev/null 2>&1; then " + "sudo dnf install -y nvme-cli fio xfsprogs; " + "else sudo apt-get update && sudo apt-get install -y nvme-cli fio xfsprogs; fi", + timeout=1800, + label="install client packages", + ) + self.client.run("sudo modprobe nvme_tcp", timeout=60, label="load nvme_tcp") + + def get_cluster_id(self): + if self.cluster_id: + return self.cluster_id + clusters = self.sbctl("cluster list --json", json_output=True) + if not clusters: + raise TestRunError("No clusters returned by sbctl cluster list") + self.cluster_id = clusters[0]["UUID"] + return self.cluster_id + + def get_nodes(self): + nodes = self.sbctl("sn list --json", json_output=True) + parsed = [] + for node in nodes: + parsed.append( + { + "uuid": node["UUID"], + "status": str(node.get("Status", "")).lower(), + "mgmt_ip": node.get("Mgmt IP") or node.get("mgmt_ip") or "", + "hostname": node.get("Hostname") or "", + } + ) + return parsed + + def ensure_expected_nodes(self): + nodes = self.get_nodes() + if len(nodes) != self.args.expected_node_count: + raise TestRunError( + f"Expected {self.args.expected_node_count} storage nodes, found {len(nodes)}. " + f"Update metadata or pass --expected-node-count." + ) + return nodes + + def assert_cluster_not_suspended(self): + clusters = self.sbctl("cluster list --json", json_output=True) + if not clusters: + raise TestRunError("Cluster list returned no rows") + status = str(clusters[0].get("Status", "")).lower() + if status == "suspended": + raise TestRunError("Cluster is suspended") + return status + + def wait_for_all_online(self, target_nodes=None, timeout=None): + timeout = timeout or self.args.restart_timeout + expected = self.args.expected_node_count + target_nodes = set(target_nodes or []) + started = time.time() + while time.time() - started < timeout: + self.assert_cluster_not_suspended() + nodes = self.ensure_expected_nodes() + statuses = {node["uuid"]: node["status"] for node in nodes} + offline = [uuid for uuid, status in statuses.items() if status != "online"] + unaffected_bad = [ + uuid for uuid, status in statuses.items() + if uuid not in target_nodes and status != "online" + ] + if unaffected_bad: + raise TestRunError( + "Unaffected nodes are not online: " + + ", ".join(f"{uuid}:{statuses[uuid]}" for uuid in unaffected_bad) + ) + if not offline and len(statuses) == expected: + return nodes + self.logger.log( + "Waiting for all nodes online: " + + ", ".join(f"{uuid}:{status}" for uuid, status in statuses.items()) + ) + time.sleep(self.args.poll_interval) + raise TestRunError("Timed out waiting for nodes to return online") + + def wait_for_cluster_stable(self): + cluster_id = self.get_cluster_id() + started = time.time() + while time.time() - started < self.args.rebalance_timeout: + cluster_list = self.sbctl("cluster list --json", json_output=True) + status = str(cluster_list[0].get("Status", "")).lower() + if status == "suspended": + raise TestRunError("Cluster entered suspended state") + cluster_info = self.sbctl(f"cluster get {cluster_id}", json_output=True) + rebalancing = bool(cluster_info.get("is_re_balancing", False)) + nodes = self.ensure_expected_nodes() + node_statuses = {node["uuid"]: node["status"] for node in nodes} + if status == "active" and not rebalancing and all( + state == "online" for state in node_statuses.values() + ): + self.logger.log("Cluster stable: ACTIVE, online, not rebalancing") + return + self.logger.log( + "Waiting for cluster stability: " + f"status={status}, rebalancing={rebalancing}, " + + ", ".join(f"{uuid}:{state}" for uuid, state in node_statuses.items()) + ) + time.sleep(self.args.poll_interval) + raise TestRunError("Timed out waiting for cluster rebalancing to finish") + + def get_active_tasks(self): + cluster_id = self.get_cluster_id() + script = ( + "import json; " + "from simplyblock_core import db_controller; " + "from simplyblock_core.models.job_schedule import JobSchedule; " + "db = db_controller.DBController(); " + f"tasks = db.get_job_tasks({cluster_id!r}, reverse=False); " + "out = [t.get_clean_dict() for t in tasks " + "if t.status != JobSchedule.STATUS_DONE and not getattr(t, 'canceled', False)]; " + "print(json.dumps(out))" + ) + out = self.mgmt.run( + f"sudo python3 -c {shlex.quote(script)}", + timeout=60, + label="list active tasks", + )[1].strip() + return json.loads(out or "[]") + + def wait_for_no_active_tasks(self, reason): + started = time.time() + while time.time() - started < self.args.rebalance_timeout: + self.assert_cluster_not_suspended() + active_tasks = self.get_active_tasks() + if not active_tasks: + return + details = ", ".join( + f"{task.get('function_name')}:{task.get('status')}:{task.get('node_id') or task.get('device_id')}" + for task in active_tasks + ) + self.logger.log(f"Waiting before {reason}; active tasks: {details}") + time.sleep(self.args.poll_interval) + raise TestRunError(f"Timed out waiting for active tasks to finish before {reason}") + + @staticmethod + def _is_data_migration_task(task): + function_name = str(task.get("function_name", "")).lower() + task_name = str(task.get("task_name", "")).lower() + task_type = str(task.get("task_type", "")).lower() + haystack = " ".join([function_name, task_name, task_type]) + markers = ( + "migration", + "rebalanc", + "sync", + ) + return any(marker in haystack for marker in markers) + + def wait_for_data_migration_complete(self, reason): + started = time.time() + while time.time() - started < self.args.rebalance_timeout: + self.assert_cluster_not_suspended() + active_tasks = self.get_active_tasks() + migration_tasks = [task for task in active_tasks if self._is_data_migration_task(task)] + if not migration_tasks: + return + details = ", ".join( + f"{task.get('function_name')}:{task.get('status')}:{task.get('node_id') or task.get('device_id')}" + for task in migration_tasks + ) + self.logger.log(f"Waiting before {reason}; data migration tasks: {details}") + time.sleep(self.args.poll_interval) + raise TestRunError( + f"Timed out waiting for data migration tasks to finish before {reason}" + ) + + def sbctl_allow_failure(self, args, timeout=600): + command = "sudo /usr/local/bin/sbctl -d " + args + rc, stdout_text, stderr_text = self.mgmt.run( + command, + timeout=timeout, + check=False, + label=f"sbctl {args}", + ) + return rc, stdout_text, stderr_text + + def shutdown_with_migration_retry(self, node_id): + while True: + rc, stdout_text, stderr_text = self.sbctl_allow_failure( + f"sn shutdown {node_id}", + timeout=300, + ) + if rc == 0: + return + output = f"{stdout_text}\n{stderr_text}".lower() + retry_markers = ( + "migration", + "migrat", + "rebalanc", + "active task", + "running task", + "in_progress", + "in progress", + ) + if any(marker in output for marker in retry_markers): + self.logger.log( + f"Shutdown of {node_id} blocked by migration/rebalance/task; retrying in 15s" + ) + time.sleep(15) + continue + raise RemoteCommandError( + f"mgmt: command failed with rc={rc}: sbctl sn shutdown {node_id}" + ) + + def prepare_client(self): + home = "/root" if self.user == "root" else posixpath.join("/home", self.user) + mount_root = posixpath.join(home, f"lab_outage_soak_{self.run_id}") + command = ( + "sudo pkill -f '[f]io --name=aws_dual_soak_' || true\n" + f"sudo mkdir -p {shlex.quote(mount_root)}\n" + f"sudo chown {shlex.quote(self.user)}:{shlex.quote(self.user)} {shlex.quote(mount_root)}\n" + ) + self.client.run(f"bash -lc {shlex.quote(command)}", timeout=120, label="prepare client workspace") + return mount_root + + def extract_uuid(self, text): + for line in reversed(text.splitlines()): + stripped = line.strip() + if UUID_RE.fullmatch(stripped): + return stripped + raise TestRunError(f"Failed to extract standalone UUID from output: {text}") + + def _create_one_volume(self, volume_name, node_uuid, index): + """Create one lvol bound to ``node_uuid`` and return its volume dict. + + Retries inside the rebalance window if the LVStore is being recreated + or while a rebalance / data migration is in flight, matching the + behaviour of the bulk ``create_volumes`` path. + """ + volume_id = None + started = time.time() + while time.time() - started < self.args.rebalance_timeout: + self.wait_for_all_online(timeout=self.args.restart_timeout) + self.wait_for_cluster_stable() + output = self.sbctl( + f"lvol add {volume_name} {self.args.volume_size} {self.args.pool} --host-id {node_uuid}" + ) + if "ERROR:" in output or "LVStore is being recreated" in output: + self.logger.log(f"Volume create for {volume_name} deferred: {output.strip()}") + time.sleep(self.args.poll_interval) + continue + volume_id = self.extract_uuid(output) + break + if volume_id is None: + raise TestRunError(f"Timed out creating volume {volume_name} on node {node_uuid}") + self.created_volume_ids.append(volume_id) + self.logger.log(f"Created volume {volume_name} ({volume_id}) on node {node_uuid}") + return { + "index": index, + "volume_name": volume_name, + "volume_id": volume_id, + "node_uuid": node_uuid, + } + + def create_volumes(self, nodes): + self.logger.log( + f"Creating {len(nodes)} volumes of size {self.args.volume_size}, one per storage node" + ) + volumes = [] + for index, node in enumerate(nodes, start=1): + volume_name = f"aws_dual_soak_{self.run_id}_v{index}" + volumes.append(self._create_one_volume(volume_name, node["uuid"], index)) + return volumes + + def connect_and_mount_volumes(self, volumes, mount_root): + self.logger.log("Connecting volumes to client and preparing filesystems") + for volume in volumes: + self._connect_and_mount_one(volume, mount_root) + + def _connect_and_mount_one(self, volume, mount_root): + """Connect, mkfs, mount a single volume. Mutates ``volume`` to add + mount_point / fio_log / fio_stderr / rc_file / nqn keys. + + Saving ``nqn`` lets the churn cycle disconnect via ``nvme disconnect + -n `` without having to re-derive it from the device path. + """ + connect_output = self.sbctl(f"lvol connect {volume['volume_id']}") + connect_commands = [] + for line in connect_output.splitlines(): + stripped = line.strip() + if stripped.startswith("sudo nvme connect"): + connect_commands.append(stripped) + if not connect_commands: + raise TestRunError(f"No nvme connect command returned for {volume['volume_id']}") + nqn = None + for cmd in connect_commands: + m = NQN_RE.search(cmd) + if m: + nqn = m.group(1) + break + if nqn is None: + raise TestRunError( + f"Failed to parse NQN from lvol connect output for {volume['volume_id']}" + ) + volume["nqn"] = nqn + successful_connects = 0 + failed_connects = [] + for connect_cmd in connect_commands: + try: + self.client.run(connect_cmd, timeout=120, label=f"connect {volume['volume_id']}") + successful_connects += 1 + except TestRunError as exc: + failed_connects.append(str(exc)) + self.logger.log(f"Path connect failed for {volume['volume_id']}: {exc}") + if successful_connects == 0: + raise TestRunError( + f"No nvme paths connected for {volume['volume_id']}: {'; '.join(failed_connects)}" + ) + if failed_connects: + self.logger.log( + f"Continuing with {successful_connects}/{len(connect_commands)} connected paths " + f"for {volume['volume_id']}" + ) + volume["mount_point"] = posixpath.join(mount_root, f"vol{volume['index']}") + volume["fio_log"] = posixpath.join(mount_root, f"fio_vol{volume['index']}.log") + volume["fio_stderr"] = posixpath.join(mount_root, f"fio_vol{volume['index']}.stderr") + volume["rc_file"] = posixpath.join(mount_root, f"fio_vol{volume['index']}.rc") + find_and_mount = ( + "set -euo pipefail\n" + f"dev=$(readlink -f /dev/disk/by-id/*{volume['volume_id']}* | head -n 1)\n" + "if [ -z \"$dev\" ]; then\n" + f" echo 'Failed to locate NVMe device for {volume['volume_id']}' >&2\n" + " exit 1\n" + "fi\n" + f"sudo mkfs.xfs -f \"$dev\"\n" + f"sudo mkdir -p {shlex.quote(volume['mount_point'])}\n" + f"sudo mount \"$dev\" {shlex.quote(volume['mount_point'])}\n" + f"sudo chown {shlex.quote(self.user)}:{shlex.quote(self.user)} {shlex.quote(volume['mount_point'])}\n" + ) + self.client.run( + f"bash -lc {shlex.quote(find_and_mount)}", + timeout=600, + label=f"format and mount {volume['volume_id']}", + ) + + def _build_fio_name(self, index, churn_id): + # Names embed both the volume index and the churn counter so the name + # is unique even after a churn replaces a volume — avoids prefix + # collisions when pkill -f matches by --name=. + return f"aws_dual_soak_v{index}_c{churn_id}" + + def _start_fio_for_volume(self, volume, fio_name): + # Capture fio's stdout+stderr to a dedicated file. --output only + # writes the aggregate summary on exit; progress lines and error + # messages ("fio: max_latency exceeded", IO error details, etc.) + # go to stderr during the run. That stream is the authoritative + # source for "what went wrong" — surface it on every fault. + start_script = ( + "set -euo pipefail\n" + f"rm -f {shlex.quote(volume['rc_file'])} {shlex.quote(volume['fio_stderr'])}\n" + "nohup bash -lc " + + shlex.quote( + f"cd {shlex.quote(volume['mount_point'])} && " + f"fio --name={fio_name} --directory={shlex.quote(volume['mount_point'])} " + "--direct=1 --rw=randrw --bs=4K --group_reporting --time_based " + f"--numjobs={FIO_NUMJOBS} --iodepth=4 --size=4G --runtime={self.args.runtime} " + "--ioengine=aiolib --max_latency=20s --exitall_on_error=1 " + "--verify=md5 --do_verify=1 --verify_fatal=1 " + f"--output={shlex.quote(volume['fio_log'])}; " + "rc=$?; " + f"echo $rc > {shlex.quote(volume['rc_file'])}" + ) + + f" > {shlex.quote(volume['fio_stderr'])} 2>&1 & echo $!" + ) + _, stdout_text, _ = self.client.run( + f"bash -lc {shlex.quote(start_script)}", + timeout=60, + label=f"start fio {volume['volume_id']}", + ) + pid_text = stdout_text.strip().splitlines()[-1] + pid = int(pid_text) + job = FioJob( + volume_id=volume["volume_id"], + volume_name=volume["volume_name"], + mount_point=volume["mount_point"], + fio_log=volume["fio_log"], + fio_stderr=volume["fio_stderr"], + rc_file=volume["rc_file"], + pid=pid, + fio_name=fio_name, + ) + self.logger.log(f"Started fio for {volume['volume_name']} with pid {pid} (name={fio_name})") + return job + + def start_fio(self, volumes): + self.logger.log("Starting fio on all mounted volumes in parallel") + fio_jobs = [] + for volume in volumes: + fio_name = self._build_fio_name(volume["index"], 0) + fio_jobs.append(self._start_fio_for_volume(volume, fio_name)) + self.fio_jobs = fio_jobs + # Give fio a few seconds to begin; don't block on worker fork — the + # authoritative "fio is in trouble" signal is rc_file / stderr, not + # process counts. If fio never issues IO the pre-stop check and + # outage cluster-health checks will surface that. + time.sleep(5) + + def read_remote_file(self, path): + rc, stdout_text, _ = self.client.run( + f"bash -lc {shlex.quote(f'cat {shlex.quote(path)}')}", + timeout=30, + check=False, + label=f"read {path}", + ) + if rc != 0: + return "" + return stdout_text + + # ----- fio fault detection -------------------------------------------- + + # Any line in fio_stderr matching one of these (case-sensitive, fixed + # string) is treated as a fio fault — even if fio is still running. + # ``--max_latency`` violations in particular log "fio: latency of … + # exceeds specified max" and do NOT always terminate fio when run + # with --group_reporting + --numjobs>1, so a process-still-alive + # check alone misses them. + FIO_STDERR_ERROR_MARKERS = ( + "fio: latency of", # --max_latency violation + "fio: io_u error", # io_u submission/completion error + "fio: pid=", # generic fio per-job error dump + "io_u error on file", # alternate io_u error format + "verify failed", # data verification fault + "fio: verify", # alternate verify error + "fio: error", # generic fio error + "Killed", # bash reports fio got SIGKILL + "Terminated", # bash reports fio got SIGTERM (no churn here) + ) + + def _read_rc_file(self, job): + """Return the rc string if fio's wrapping bash wrote rc_file, else None. + + ``rc_file`` is one of three independent fault signals; see + ``_check_fio_fault``. + """ + probe = ( + f"if [ -f {shlex.quote(job.rc_file)} ]; then " + f"cat {shlex.quote(job.rc_file)}; fi" + ) + _, stdout_text, _ = self.client.run( + f"bash -lc {shlex.quote(probe)}", + timeout=15, + check=False, + label=f"check rc_file {job.volume_name}", + ) + rc = (stdout_text or "").strip() + return rc or None + + def _wrapper_alive(self, job): + """Return True iff the wrapping bash that runs fio is still alive. + + ``job.pid`` is the pid printed by ``echo $!`` at start_fio time + (the nohup'd bash, parent of fio). If that pid is gone AND no + rc_file was written, fio was signalled away and bash never got + to record an exit code — that case is a fault, not "still running". + """ + probe = ( + f"if kill -0 {int(job.pid)} 2>/dev/null; then echo alive; fi" + ) + _, stdout_text, _ = self.client.run( + f"bash -lc {shlex.quote(probe)}", + timeout=15, + check=False, + label=f"check wrapper pid {job.volume_name}", + ) + return stdout_text.strip() == "alive" + + def _scan_fio_stderr_for_errors(self, job): + """Return matching error lines from fio_stderr (up to 20), or "". + + See FIO_STDERR_ERROR_MARKERS for the list. ``--max_latency`` + violations in particular are reported here even while fio + continues running, so this catches faults the rc_file / pid + checks would miss. + """ + if not self.FIO_STDERR_ERROR_MARKERS: + return "" + grep_args = " ".join( + f"-e {shlex.quote(p)}" for p in self.FIO_STDERR_ERROR_MARKERS + ) + grep_cmd = ( + f"grep -F -m 20 {grep_args} " + f"{shlex.quote(job.fio_stderr)} 2>/dev/null || true" + ) + _, stdout_text, _ = self.client.run( + f"bash -lc {shlex.quote(grep_cmd)}", + timeout=15, + check=False, + label=f"scan stderr {job.volume_name}", + ) + return stdout_text.strip() + + def _check_fio_fault(self, job): + """Detect any fio fault for ``job``. Returns ``(kind, detail)`` or None. + + Three independent signals — ANY one is a fault: + * ``exited``: fio's wrapping bash wrote rc_file (any rc, including 0, + is a fault mid-run because fio's --runtime is orders of magnitude + longer than an outage iteration). + * ``missing``: the wrapping bash pid is gone and no rc_file was + written — fio was signalled away (or its wrapper died) without + recording an exit code. + * ``stderr_error``: fio_stderr contains a known fio error marker + (max_latency violation, io_u error, verify failure, etc.) — fio + may still be running but is degraded; treat it as a fault. + + ``detail`` is a human-readable one-liner. The full stderr/output + is dumped via ``_dump_fio_streams`` by the callers. + """ + rc = self._read_rc_file(job) + if rc is not None: + return ("exited", f"fio exited rc={rc}") + + if not self._wrapper_alive(job): + return ( + "missing", + f"fio wrapper pid {job.pid} is gone and no rc_file was written", + ) + + err = self._scan_fio_stderr_for_errors(job) + if err: + first_line = err.splitlines()[0][:240] + return ("stderr_error", f"stderr error marker: {first_line}") + + return None + + def _dump_fio_streams(self, job, context): + """Write fio's captured stderr and --output summary into the soak + log so the actual fio error text (max_latency violations, IO + errors, "fio: pid=…, err=…, func=…" lines) is visible next to + the outage scenario that triggered it.""" + for label, path, lines in [ + ("fio stderr", job.fio_stderr, 200), + ("fio summary", job.fio_log, 60), + ]: + _, body, _ = self.client.run( + f"bash -lc {shlex.quote(f'tail -{lines} {shlex.quote(path)} 2>/dev/null || true')}", + timeout=30, + check=False, + label=f"dump {label} {job.volume_name}", + ) + if body.strip(): + self.logger.block( + f"[{context}] {job.volume_name} {label} ({path}):", + body, + ) + else: + self.logger.log( + f"[{context}] {job.volume_name} {label} ({path}): (empty)" + ) + + def check_fio(self): + """Raise if any tracked fio shows a fault. + + Three independent signals are evaluated per ``_check_fio_fault``: + rc_file written (fio exited), wrapper pid gone with no rc_file + (signalled away), or a fio error marker in stderr (max_latency + violation, io_u/verify error, etc.). ANY of these is a fault — + fio's ``--runtime`` is orders of magnitude longer than a single + outage iteration, and a degraded-but-running fio is just as + invalid a result as a dead one. + + On fault, every faulting job's captured stderr and --output + summary are dumped into the soak log so the exact fio error + lines are visible next to the iteration that triggered them. + """ + faulted = [] + for job in self.fio_jobs: + fault = self._check_fio_fault(job) + if fault is not None: + faulted.append((job, fault)) + if not faulted: + return + for job, (kind, detail) in faulted: + self._dump_fio_streams(job, context=f"fio fault [{kind}] {detail}") + details = ", ".join( + f"{j.volume_name}={kind}:{detail}" for j, (kind, detail) in faulted + ) + raise TestRunError(f"fio fault detected: {details}") + + def ensure_fio_running(self): + self.check_fio() + + def stop_fio(self): + """Stop every fio process launched by this soak on the client host. + + Called between outage iterations so rebalancing runs unloaded. + Before killing, calls ``check_fio`` — any fio that wrote its + rc_file is a mid-run exit, which is a fault. Dumps the captured + fio stderr/summary into the soak log so the actual fio error + text is side-by-side with the outage scenario that triggered it. + After the check passes we SIGTERM (short grace window) then + SIGKILL; matching by ``fio --name=aws_dual_soak_*`` catches both + the bash wrapper and any fio workers. + """ + if not self.fio_jobs: + return + + # Pre-kill verification: any fio having exited is a fault. + self.check_fio() + + self.logger.log("All fio still running; stopping them between iterations") + kill_script = ( + "set +e\n" + "sudo pkill -TERM -f 'fio --name=aws_dual_soak_' 2>/dev/null || true\n" + "for i in $(seq 1 15); do\n" + " if ! pgrep -f 'fio --name=aws_dual_soak_' >/dev/null; then\n" + " exit 0\n" + " fi\n" + " sleep 2\n" + "done\n" + "sudo pkill -KILL -f 'fio --name=aws_dual_soak_' 2>/dev/null || true\n" + ) + self.client.run( + f"bash -lc {shlex.quote(kill_script)}", + timeout=90, + check=False, + label="stop fio", + ) + # Drop the job list; start_fio will rebuild it from self.volumes. + self.fio_jobs = [] + + # ----- single-volume fio + churn --------------------------------------- + + def _disconnect_one_volume(self, volume): + nqn = volume.get("nqn") + if not nqn: + self.logger.log( + f"WARNING: no NQN saved for {volume['volume_name']}; skipping nvme disconnect" + ) + return + # ``nvme disconnect -n `` tears down every controller (path) for + # that subsystem in one call, so multipath connections are handled + # without a per-path teardown loop. + self.client.run( + f"sudo nvme disconnect -n {shlex.quote(nqn)}", + timeout=60, + check=False, + label=f"nvme disconnect {volume['volume_name']}", + ) + + def _unmount_one_volume(self, volume): + mount_point = volume.get("mount_point") + if not mount_point: + return + # Try plain unmount first, then -f, then lazy as last resort. We've + # already SIGKILLed the fio holding the mount, so plain umount + # should succeed on the happy path; the fallbacks only matter if + # buffered IO is still draining. + umount_script = ( + f"sudo umount {shlex.quote(mount_point)} 2>/dev/null || " + f"sudo umount -f {shlex.quote(mount_point)} 2>/dev/null || " + f"sudo umount -l {shlex.quote(mount_point)} 2>/dev/null || true" + ) + self.client.run( + f"bash -lc {shlex.quote(umount_script)}", + timeout=60, + check=False, + label=f"umount {volume['volume_name']}", + ) + + def _delete_one_lvol(self, volume): + rc, stdout_text, stderr_text = self.sbctl_allow_failure( + f"lvol delete {volume['volume_id']} --force", + timeout=600, + ) + if rc != 0: + raise TestRunError( + f"lvol delete failed for {volume['volume_name']} ({volume['volume_id']}): " + f"{stdout_text.strip()} | {stderr_text.strip()}" + ) + if volume["volume_id"] in self.created_volume_ids: + self.created_volume_ids.remove(volume["volume_id"]) + + def _churn_one_volume(self): + """Rebuild one randomly-selected volume. + + Called between iterations, AFTER stop_fio. fio is not running, so + no per-job teardown is needed — we just delete + recreate + + remount the volume. The next iteration's start_fio() will pick up + the fresh volume and start a new fio job for it. + """ + if not self.volumes: + return + idx = random.randrange(len(self.volumes)) + old_volume = self.volumes[idx] + + self.churn_counter += 1 + churn_id = self.churn_counter + new_name = f"aws_dual_soak_{self.run_id}_v{old_volume['index']}_c{churn_id}" + self.logger.log( + f"churn {churn_id}: rebuilding {old_volume['volume_name']} " + f"({old_volume['volume_id']}) on node {old_volume['node_uuid']} " + f"-> {new_name}" + ) + + self._unmount_one_volume(old_volume) + self._disconnect_one_volume(old_volume) + self._delete_one_lvol(old_volume) + + # Recreate on the SAME storage node so the topology used by the + # outage scenario list (pinned at startup off role-representative + # pairs) doesn't drift. + new_volume = self._create_one_volume( + new_name, + old_volume["node_uuid"], + old_volume["index"], + ) + self._connect_and_mount_one(new_volume, self.mount_root) + + self.volumes[idx] = new_volume + self.logger.log( + f"churn {churn_id}: complete; {new_name} ({new_volume['volume_id']}) " + f"will be picked up by next start_fio" + ) + + # ----- outage methods --------------------------------------------------- + + def _forced_shutdown(self, node_id): + """Shutdown with --force; still retry if blocked by migration.""" + while True: + rc, stdout_text, stderr_text = self.sbctl_allow_failure( + f"sn shutdown {node_id} --force", + timeout=300, + ) + if rc == 0: + return + output = f"{stdout_text}\n{stderr_text}".lower() + retry_markers = ( + "migration", "migrat", "rebalanc", + "active task", "running task", + "in_progress", "in progress", + ) + if any(m in output for m in retry_markers): + self.logger.log( + f"Forced shutdown of {node_id} blocked by migration/task; retrying in 15s" + ) + time.sleep(15) + continue + raise RemoteCommandError( + f"mgmt: command failed with rc={rc}: sbctl sn shutdown {node_id} --force" + ) + + def _container_kill(self, node_id): + """Kill the SPDK container on the storage node's host. Node is expected + to auto-recover; no sbctl restart is issued.""" + host = self._node_host(node_id) + cmd = ( + "set -euo pipefail; " + "cns=$(sudo docker ps --format '{{.Names}}' | grep -E '^spdk_[0-9]+$' || true); " + "if [ -z \"$cns\" ]; then echo 'no spdk_* container found' >&2; exit 0; fi; " + "for cn in $cns; do echo \"killing $cn\"; sudo docker kill \"$cn\" || true; done" + ) + host.run( + f"bash -lc {shlex.quote(cmd)}", + timeout=120, + check=False, + label=f"container_kill {node_id}", + ) + + def _host_reboot(self, node_id): + """Reboot the storage node's host. Node is expected to auto-recover; + no sbctl restart is issued.""" + host = self._node_host(node_id) + # nohup + background + sleep so the shell exit beats reboot cleanly + cmd = "sudo nohup bash -c 'sleep 2; reboot -f' >/dev/null 2>&1 &" + try: + host.run( + f"bash -lc {shlex.quote(cmd)}", + timeout=30, + check=False, + label=f"host_reboot {node_id}", + ) + except RemoteCommandError as exc: + # SSH may drop as the host goes down — not fatal. + self.logger.log(f"host_reboot {node_id}: ssh terminated as expected: {exc}") + # Drop the cached SSH client; it's going to die anyway. + cached = self.node_hosts.pop(node_id, None) + if cached is not None: + try: + cached.close() + except Exception: + pass + + # --- Multipath NIC chaos --- + + def _is_multipath(self): + return bool(self.metadata.get("multipath")) + + def _get_data_nics(self): + """Return the list of data NIC names (e.g. ['eth1', 'eth2']).""" + nics = self.metadata.get("data_nics") + if nics: + return nics + iface = self.metadata.get("data_iface") + if iface: + return [iface] + return [] + + def _network_outage(self, node_id, duration): + """Take all data NICs down on one storage node for *duration* seconds, + then bring them back up. Simulates a transient network partition of + a single node. Node is expected to auto-recover once the NICs return + — no sbctl restart is issued.""" + host = self._node_host(node_id) + nics = self._get_data_nics() or ["eth1"] + self.logger.log( + f"network_outage on {node_id}: dropping {nics} for {duration}s" + ) + for nic in nics: + try: + host.run(f"sudo ip link set {nic} down", timeout=10, check=False, + label=f"netout down {nic} on {node_id}") + except Exception as e: + self.logger.log(f"WARNING: failed to down {nic} on {node_id}: {e}") + try: + time.sleep(duration) + finally: + for nic in nics: + try: + host.run(f"sudo ip link set {nic} up", timeout=10, check=False, + label=f"netout up {nic} on {node_id}") + except Exception as e: + self.logger.log(f"WARNING: failed to up {nic} on {node_id}: {e}") + + def _apply_outage(self, node_id, method): + self.logger.log(f"Applying outage '{method}' on {node_id}") + if method == "graceful": + self.shutdown_with_migration_retry(node_id) + elif method == "forced": + self._forced_shutdown(node_id) + elif method == "container_kill": + self._container_kill(node_id) + elif method == "host_reboot": + self._host_reboot(node_id) + elif method.startswith("network_outage_"): + try: + duration = int(method.rsplit("_", 1)[-1]) + except ValueError: + raise TestRunError(f"Unknown outage method: {method}") + self._network_outage(node_id, duration) + else: + raise TestRunError(f"Unknown outage method: {method}") + + def _needs_manual_restart(self, method): + return method not in AUTO_RECOVER_METHODS + + def wait_node_leaves_online(self, node_id, timeout=90, poll=2): + """Poll sbctl until the control plane observes node_id leaving 'online'. + Returns True once any non-online status is seen, False on timeout. + + Why this exists: the CP's health-check loop updates status on its own + cadence. If the soak polls wait_for_all_online *before* the CP has + noticed the outage, the first poll reports all-online and we return + while the target is actually still down. The next iteration then + stacks extra outages on a silently-offline node and breaks the FTT + budget (see incident: 2026-04-20 iter 17 container_kill on 2870dfa5, + CP status transition lagged the soak's first sn-list by ~1 s). + """ + deadline = time.time() + timeout + while time.time() < deadline: + try: + nodes = self.get_nodes() + except Exception as exc: + self.logger.log(f"wait_node_leaves_online: sn list failed ({exc})") + time.sleep(poll) + continue + status = next( + (n["status"] for n in nodes if n["uuid"] == node_id), + "unknown", + ) + if status != "online": + self.logger.log( + f"CP observed {node_id[:8]} leaving online (now {status})" + ) + return True + time.sleep(poll) + return False + + def run_outage_pair(self, node1, node2, method1, method2): + self.logger.log( + f"Outage pair: {node1}={method1} and {node2}={method2}" + ) + # Apply first outage, then optional gap, then second outage. + self._apply_outage(node1, method1) + if self.args.shutdown_gap: + time.sleep(self.args.shutdown_gap) + self._apply_outage(node2, method2) + + # Issue sbctl restart only for methods that leave the node in a + # "shutdown" state that the CP won't recover on its own. + # Retry with backoff: when the other node in the pair used an + # auto-recover method (container_kill / host_reboot), it may + # still be in_shutdown or in_restart when we try to restart the + # manually-recovered peer — the per-cluster guard rejects + # concurrent restarts. Retrying gives the auto-recovering node + # time to come back. + for node_id, method in [(node1, method1), (node2, method2)]: + if not self._needs_manual_restart(method): + continue + deadline = time.time() + self.args.restart_timeout + while True: + try: + # Emit a RESTART header with the wall-clock timestamp, + # then dump the raw sbctl -d restart stdout below it + # (without per-line timestamp prefix) so the CP trace + # produced by -d lines up with a single moment in time. + self.logger.log( + f"RESTART: {time.strftime('%Y-%m-%d %H:%M:%S')} {node_id}" + ) + stdout_text = self.sbctl(f"sn restart {node_id}", timeout=300) + with self.logger.lock: + print(stdout_text, flush=True, end="" + if stdout_text.endswith("\n") else "\n") + with open(self.logger.path, "a", encoding="utf-8") as handle: + handle.write(stdout_text) + if not stdout_text.endswith("\n"): + handle.write("\n") + break + except Exception as e: + if time.time() >= deadline: + raise + self.logger.log( + f"Restart of {node_id} failed ({e}), " + f"retrying in 15s (peer may still be recovering)") + time.sleep(15) + + # Before we call wait_for_all_online, make sure the control plane has + # actually observed each auto-recover target leaving 'online' state. + # Otherwise wait_for_all_online can race the CP: the first sn-list + # poll may still report the just-killed node as 'online' (stale), + # all statuses look good, and we return immediately — the node is + # then in a silent offline state when the next iteration stacks + # more outages on top, crossing the FTT budget. + # network_outage_* methods can finish before the CP notices; that's + # fine (short outages often recover from HA multipath without CP + # involvement), so we don't fail if the observation window expires. + for node_id, method in [(node1, method1), (node2, method2)]: + if method not in AUTO_RECOVER_METHODS: + continue + if method.startswith("network_outage_"): + observed = self.wait_node_leaves_online(node_id, timeout=30) + if not observed: + self.logger.log( + f"CP did not observe {node_id[:8]} offline for " + f"{method} within 30s (expected for short NIC drops)" + ) + else: + # container_kill, host_reboot: the node IS down; we must see it. + observed = self.wait_node_leaves_online(node_id, timeout=90) + if not observed: + self.logger.log( + f"WARN: CP never observed {node_id[:8]} offline after " + f"{method} within 90s; sn-list may be stale" + ) + + # For auto-recovery methods, allow a longer wait window since the host + # has to reboot / the container has to come back under its supervisor. + wait_timeout = self.args.restart_timeout + if any( + m in AUTO_RECOVER_METHODS for m in (method1, method2) + ): + wait_timeout = max(wait_timeout, self.args.auto_recover_wait) + + self.wait_for_all_online( + target_nodes={node1, node2}, timeout=wait_timeout + ) + # Intentionally no check_fio / wait_for_cluster_stable here: the + # outer loop calls check_fio right after this returns, then + # stop_fio, then waits for cluster stability unloaded. + + # ----- topology & scenario enumeration --------------------------------- + + def discover_topology(self): + """Return {lvs_name: {'primary': uuid, 'secondary': uuid, 'tertiary': uuid}}. + + Queried once at soak startup to identify the 4 role-representative + node pairs. Leader takeover mid-soak may shift role assignments; + the scenario list is pinned at startup so the 4 chosen pairs stay + fixed across retries even if the CP has re-promoted since. + """ + script = ( + "import json; " + "from simplyblock_core import db_controller; " + "db = db_controller.DBController(); " + "nodes = db.get_storage_nodes(); " + "out = {n.lvstore: {" + "'primary': n.get_id(), " + "'secondary': getattr(n, 'secondary_node_id', '') or '', " + "'tertiary': getattr(n, 'tertiary_node_id', '') or ''" + "} for n in nodes " + "if getattr(n, 'lvstore', '') " + "and not getattr(n, 'is_secondary_node', False)}; " + "print(json.dumps(out))" + ) + _, stdout_text, _ = self.mgmt.run( + f"sudo python3 -c {shlex.quote(script)}", + timeout=60, + label="discover topology", + ) + for line in reversed((stdout_text or "").strip().splitlines()): + line = line.strip() + if line.startswith("{"): + try: + return json.loads(line) + except json.JSONDecodeError: + continue + raise TestRunError( + f"Failed to parse topology JSON from mgmt; stdout was:\n{stdout_text}" + ) + + def _validate_topology_for_categories(self): + """Verify the pinned topology can supply at least one pair per category. + + Raises TestRunError if: + * the pinned topology has no LVS (empty cluster) + * no LVS has both primary and secondary (primary_secondary unservable) + * no LVS has both primary and tertiary (primary_tertiary unservable) + + The "unrelated" category is soft: in a dense FT=2 ring with N ≤ 4 + every pair shares at least one LVS, so no unrelated pair exists. + Log a warning so the gap is visible; pick_pair_for_category will + fall back to the next-widest distance at iteration time. + """ + if not self.topology: + raise TestRunError("Empty topology; cannot pick representative pairs") + + if not self._candidate_pairs_for_role("secondary"): + raise TestRunError( + "No LVS in topology has both primary and secondary; " + "primary_secondary category is unservable" + ) + if not self._candidate_pairs_for_role("tertiary"): + raise TestRunError( + "No LVS in topology has both primary and tertiary; " + "primary_tertiary category is unservable" + ) + + all_nodes, lvs_members = self._lvs_membership() + if not self._unrelated_pairs(all_nodes, lvs_members): + self.logger.log( + "WARN: No unrelated node pair available in topology " + f"({len(all_nodes)} nodes across {len(lvs_members)} LVSs); " + "'unrelated' scenarios will fall back to primary_tertiary, " + "then primary_secondary." + ) + + def _candidate_pairs_for_role(self, role_b): + """All (primary, role_b) pairs across the pinned topology.""" + pairs = [] + for roles in self.topology.values(): + a = roles.get("primary") + b = roles.get(role_b) + if a and b: + pairs.append((a, b)) + return pairs + + def _lvs_membership(self): + """Return (all_nodes, lvs_members) derived from the pinned topology.""" + all_nodes = set() + lvs_members = [] + for r in self.topology.values(): + members = {v for v in r.values() if v} + lvs_members.append(members) + all_nodes.update(members) + return all_nodes, lvs_members + + def _unrelated_pairs(self, all_nodes, lvs_members): + """All node pairs that share no LVS in any role.""" + pairs = [] + for a, b in itertools.combinations(sorted(all_nodes), 2): + if not any(a in m and b in m for m in lvs_members): + pairs.append((a, b)) + return pairs + + def pick_pair_for_category(self, category): + """Randomly pick a (node_a, node_b) pair for the given role category. + + Distance preserved per category (so each scenario in a "group" hits + the same topological relationship, just on different concrete nodes): + - primary_secondary: ring-distance 1 (direct successor) + - primary_tertiary: ring-distance 2 (exactly one node between) + - unrelated: ring-distance ≥ 3 (≥ 2 nodes between) + + "unrelated" is best-effort: dense FT=2 rings with N ≤ 4 cannot + supply any unrelated pair. In that case fall back to the next- + widest distance (primary_tertiary, then primary_secondary) so the + scenario still runs. + """ + if category == "unrelated": + all_nodes, lvs_members = self._lvs_membership() + candidates = self._unrelated_pairs(all_nodes, lvs_members) + if not candidates: + for fallback in ("tertiary", "secondary"): + candidates = self._candidate_pairs_for_role(fallback) + if candidates: + self.logger.log( + f"unrelated: no unrelated pair available; " + f"falling back to primary_{fallback} pair" + ) + break + elif category == "primary_secondary": + candidates = self._candidate_pairs_for_role("secondary") + elif category == "primary_tertiary": + candidates = self._candidate_pairs_for_role("tertiary") + else: + raise TestRunError(f"Unknown role category: {category}") + + if not candidates: + raise TestRunError( + f"No candidate pairs available for category {category}" + ) + return random.choice(candidates) + + def build_scenarios(self, nodes): + """Enumerate role categories × P(M,2) ordered method pairs. + + Returns a list of dicts with keys: method_a, method_b, category. + The actual (a, b) node pair is rolled at iteration time via + ``pick_pair_for_category`` so the soak hits many concrete pairs per + group while keeping the relative ring-distance fixed per category. + Same-method method pairs are NOT included — ordered distinct pairs + only, per itertools.permutations(methods, 2). + """ + _ = nodes # unused: pair picking happens at iteration time + scenarios = [] + for category in ROLE_CATEGORIES: + for m_a, m_b in itertools.permutations(self.methods, 2): + scenarios.append({ + "method_a": m_a, + "method_b": m_b, + "category": category, + }) + method_pair_count = len(self.methods) * (len(self.methods) - 1) + self.logger.log( + f"Built {len(scenarios)} scenarios: " + f"{len(ROLE_CATEGORIES)} role categories × " + f"P({len(self.methods)},2)={method_pair_count} ordered method pairs " + f"(node pair rolled randomly per scenario)" + ) + return scenarios + + def run(self): + self.ensure_prerequisites() + nodes = self.ensure_expected_nodes() + self.wait_for_all_online(timeout=self.args.restart_timeout) + # Wait for the cluster to be fully stable (no in-flight rebalance + # or data migration) before starting iterations. + self.wait_for_cluster_stable() + self.wait_for_data_migration_complete("test start") + mount_root = self.prepare_client() + # Saved so the churn cycle can mount its newly-created volume back + # into the same workspace tree. + self.mount_root = mount_root + volumes = self.create_volumes(nodes) + # Stored so the churn cycle can drive per-volume teardown/rebuild + # without re-creating / re-mounting the underlying soak workspace. + self.volumes = volumes + self.connect_and_mount_volumes(volumes, mount_root) + + # Pin the topology once, before any outages. Leader takeover during + # the soak can permanently shift role assignments, but the + # representative pairs are fixed at startup so each cycle targets + # the same pairs for the same role categories. + self.topology = self.discover_topology() + self.logger.log(f"Pinned topology: {json.dumps(self.topology, sort_keys=True)}") + self._validate_topology_for_categories() + self.scenarios = self.build_scenarios(nodes) + if not self.scenarios: + raise TestRunError("No outage scenarios built; method/node list empty") + + start_at = max(1, self.args.start_at) + if start_at > len(self.scenarios): + raise TestRunError( + f"--start-at {start_at} exceeds scenario count " + f"{len(self.scenarios)}; nothing to run" + ) + + churn_every = 0 if self.args.no_churn else self.args.churn_every_n_iters + if churn_every > 0: + self.logger.log( + f"Volume churn enabled: rebuild one random volume every " + f"{churn_every} iteration(s) in the unloaded settle window" + ) + else: + self.logger.log("Volume churn disabled") + + # iteration counter is aligned to scenario_idx: when --start-at N is + # used, the first executed scenario logs as iteration=N so post-hoc + # grep for "iteration 60" finds the resumed scenario and its prior + # failure side by side. + iteration = start_at - 1 + cycle = 0 + while True: + cycle += 1 + if self.args.cycles and cycle > self.args.cycles: + self.logger.log( + f"Completed {cycle - 1} full cycle(s) of {len(self.scenarios)} " + f"scenarios; exiting" + ) + return + + cycle_scenarios = list(self.scenarios) + if self.args.shuffle_scenarios: + # Seed off cycle number so two soaks with the same --cycles + # walk identical sequences, but successive cycles rotate + # through different orderings. + random.Random(cycle).shuffle(cycle_scenarios) + + cycle_start_at = start_at if cycle == 1 else 1 + self.logger.log( + f"Starting cycle {cycle} ({len(cycle_scenarios)} scenarios" + f"{', shuffled' if self.args.shuffle_scenarios else ''}" + f"{f', starting at scenario {cycle_start_at}' if cycle_start_at > 1 else ''})" + ) + + for scenario_idx, scenario in enumerate(cycle_scenarios, 1): + if scenario_idx < cycle_start_at: + continue + iteration += 1 + + node1, node2 = self.pick_pair_for_category(scenario["category"]) + method1 = scenario["method_a"] + method2 = scenario["method_b"] + + self.logger.log( + f"Starting outage iteration {iteration} " + f"(cycle {cycle} scenario {scenario_idx}/{len(cycle_scenarios)}): " + f"category={scenario['category']} " + f"pair=({node1[:8]},{node2[:8]}) " + f"methods=({method1},{method2})" + ) + + # Skip scenarios whose nodes are not currently in the + # expected-node set (e.g. one has been removed from the + # cluster mid-soak). Better to log-and-skip than to try to + # restart a ghost. + current_uuids = {n["uuid"] for n in self.ensure_expected_nodes()} + missing = [uid for uid in (node1, node2) if uid not in current_uuids] + if missing: + self.logger.log( + f"Scenario {iteration} skipped: nodes {missing} not in " + f"current cluster set {sorted(current_uuids)}" + ) + continue + + # Load during outage: start fresh fio so the outage hits + # live IO. This is the only window where fio runs. + self.start_fio(self.volumes) + + self.run_outage_pair(node1, node2, method1, method2) + + # Fault gate: any fio that exited / faulted during the + # outage is a real failure. check_fio raises on faults. + self.check_fio() + + # Unload before settle: stop fio so the rebalance / data- + # migration drain runs without IO pressure. This is what + # makes the iteration cycle short. + self.stop_fio() + + # Optional volume rebuild. fio is already stopped, so we + # don't need any per-job teardown — just delete + recreate + # + remount; the next iteration's start_fio picks it up. + if churn_every > 0 and iteration % churn_every == 0: + self._churn_one_volume() + + self.wait_for_all_online(timeout=self.args.restart_timeout) + self.wait_for_cluster_stable() + self.wait_for_data_migration_complete("next iteration") + + +def main(): + args = parse_args() + logger = Logger(args.log_file) + logger.log(f"Logging to {args.log_file}") + metadata = load_metadata(args.metadata) + if not metadata.get("clients"): + raise SystemExit("Metadata file does not contain a client host") + + runner = SoakRunner(args, metadata, logger) + try: + runner.run() + except (RemoteCommandError, TestRunError, ValueError) as exc: + logger.log(f"ERROR: {exc}") + sys.exit(1) + finally: + runner.close() + + +if __name__ == "__main__": + main() diff --git a/scripts/setup_lab_perf_test1.py b/scripts/setup_lab_perf_test1.py index 305d836fd..d22aafd07 100644 --- a/scripts/setup_lab_perf_test1.py +++ b/scripts/setup_lab_perf_test1.py @@ -99,7 +99,7 @@ def wait_for_ssh(ip, timeout=300): return False -def ssh_exec(ip, cmds, get_output=False, check=False): +def ssh_exec(ip, cmds, get_output=False, check=False, timeout=600): results = [] for cmd in cmds: print(f" [{ip}] $ {cmd}") @@ -108,7 +108,7 @@ def ssh_exec(ip, cmds, get_output=False, check=False): env=_ssh_env(), capture_output=True, text=True, - timeout=600, + timeout=timeout, ) out = proc.stdout err = proc.stderr diff --git a/scripts/setup_perf_test1.py b/scripts/setup_perf_test1.py index 2f35015a4..f539b149d 100644 --- a/scripts/setup_perf_test1.py +++ b/scripts/setup_perf_test1.py @@ -14,7 +14,7 @@ KEY_PATH = os.path.expanduser("~/.ssh/mtes01.pem") AZ = "us-east-1a" SG_NAME = "default" -BRANCH = "main" +BRANCH = "inline-checksum-validation" MAX_LVOL = "100" # --- Manual Network Config --- # Replace this with your actual Subnet ID (e.g., "subnet-0593459d6b931ee4c") @@ -403,7 +403,7 @@ def main(): "sudo dnf install git python3-pip nvme-cli -y", "sudo /usr/bin/python3 -m pip install --upgrade pip setuptools wheel", "sudo /usr/bin/python3 -m pip install ruamel.yaml", - "sudo pip install git+https://github.com/simplyblock-io/sbcli@main --upgrade --force --ignore-installed requests", + "sudo pip install git+https://github.com/simplyblock-io/sbcli@inline-checksum-validation --upgrade --force --ignore-installed requests", "echo 'export PATH=/usr/local/bin:$PATH' >> ~/.bashrc" ] @@ -417,17 +417,29 @@ def main(): # --- 5. Cluster Configuration (Phase 2) --- # Step 5a: Create cluster on mgmt (sequential, must complete first) print("Phase 2a: Creating cluster on management node...") + # --enable-inline-checksum is frozen at cluster create time (no + # mutator, no upgrade path). Pair with --enable-inline-checksum on + # `sn configure` below so the drives format to an LBAF with NVMe + # metadata (ms>=8) and alceml can run in md-on-device mode. ssh_exec(mgmt_ip, [ - "sudo /usr/local/bin/sbctl -d cluster create --enable-node-affinity" + "sudo /usr/local/bin/sbctl -d cluster create" " --data-chunks-per-stripe 2 --parity-chunks-per-stripe 2" + " --enable-inline-checksum" ], check=True) print("Phase 2a: DONE - cluster created.") # Step 5b: Configure and deploy storage nodes in parallel print("Phase 2b: Configuring storage nodes...") + # --enable-inline-checksum here drives `sn configure`'s formatter to + # pick the smallest LBAF with ds=12, ms>=8 (8B+ NVMe metadata per + # 4K block) and force-reformat through the existing 4K-already-set + # early-out — required for alceml's md-on-device checksum_method=1. + # Devices with no md-capable LBAF format to plain 4K and fall back + # to checksum_method=2 (cv_fallback, ~1.17% capacity overhead). with ThreadPoolExecutor(max_workers=len(sn_ips)) as executor: tasks = [executor.submit(ssh_exec, ip, [ f"sudo /usr/local/bin/sbctl -d sn configure --max-lvol {MAX_LVOL}" + " --enable-inline-checksum" ], check=True) for ip in sn_ips] for t in tasks: t.result() diff --git a/scripts/stop_lab_after_soak.py b/scripts/stop_lab_after_soak.py new file mode 100644 index 000000000..1b6ec037d --- /dev/null +++ b/scripts/stop_lab_after_soak.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +"""Wait for an outage soak process to exit, then stop the lab's storage +nodes and clients via AWS EC2 to stop accruing instance hours. + +Usage (typical): + nohup python3 scripts/aws_dual_node_outage_soak_mixed_churn.py \\ + --metadata cluster_metadata_mp.json ... > soak.out 2>&1 & + SOAK_PID=$! + nohup python3 scripts/stop_lab_after_soak.py \\ + --metadata cluster_metadata_mp.json \\ + --soak-pid "$SOAK_PID" > stop_lab.out 2>&1 & + +The monitor polls the PID until it exits. Exit code 0 -> the soak +finished its configured iteration budget; non-zero -> the soak failed +permanently (fio fault, RemoteCommandError, KeyboardInterrupt, etc.). +Either outcome triggers the same shutdown: every instance listed under +``storage_nodes[*].instance_id`` and ``clients[*].instance_id`` in the +metadata file is sent ``StopInstances``. ``mgmt.instance_id`` is +deliberately left running so the user can SSH back in to retrieve logs +without paying for the storage / client tier. + +AWS calls are best-effort: each instance is attempted independently, +errors are logged, and the monitor exits non-zero if any stop failed +while still attempting the rest. +""" + +import argparse +import errno +import json +import os +import signal +import sys +import time +from pathlib import Path +from typing import List, Tuple + + +def log(msg: str) -> None: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"{ts} {msg}", flush=True) + + +def parse_args() -> argparse.Namespace: + p = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawDescriptionHelpFormatter) + p.add_argument("--metadata", required=True, + help="Path to cluster_metadata*.json (same file the soak script uses).") + p.add_argument("--soak-pid", type=int, required=True, + help="PID of the soak process to wait on. Cross-process " + "(does NOT have to be a direct child of this monitor).") + p.add_argument("--region", default="us-east-1", + help="AWS region for boto3 / ec2 calls. Default: us-east-1.") + p.add_argument("--poll-interval", type=float, default=5.0, + help="Seconds between PID liveness checks. Default 5s. " + "Lower = quicker stop after soak exit; higher = less load.") + p.add_argument("--dry-run", action="store_true", + help="Log every action but do not call AWS StopInstances. " + "Useful for testing in a non-AWS environment.") + p.add_argument("--stop-mgmt", action="store_true", + help="Also stop the mgmt instance. Default: leave it running " + "so the user can SSH back to retrieve soak logs.") + return p.parse_args() + + +def _pid_is_running(pid: int) -> bool: + """True iff process ``pid`` exists. Works for non-child processes — + sends signal 0 (no-op) and reads ESRCH/EPERM to classify.""" + try: + os.kill(pid, 0) + except OSError as e: + if e.errno == errno.ESRCH: + return False + if e.errno == errno.EPERM: + # Process exists but is owned by someone else. Still "running" + # for our purposes — the soak might have been started by root + # while this monitor runs as a user. + return True + raise + return True + + +def _pid_exit_code(pid: int) -> int: + """Reap a child PID and return its exit code; return -1 if the PID is + not our child (signal-0 poll is the best we can do, but we cannot + read the exit code of a non-descendant). The monitor still triggers + the stop sequence regardless — for sibling/foreign PIDs we just + can't distinguish "completed" vs "failed permanently" in the log.""" + try: + _, status = os.waitpid(pid, os.WNOHANG) + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + if os.WIFSIGNALED(status): + return 128 + os.WTERMSIG(status) + return -1 + except ChildProcessError: + # Not our child — exit code is unavailable on POSIX without + # ptrace / process accounting. Caller treats -1 as "unknown". + return -1 + except OSError: + return -1 + + +def wait_for_soak_exit(pid: int, poll_interval: float) -> int: + log(f"Waiting for soak PID {pid} to exit (poll every {poll_interval}s)…") + while _pid_is_running(pid): + time.sleep(poll_interval) + rc = _pid_exit_code(pid) + if rc == -1: + log(f"Soak PID {pid} has exited. Exit code is unknown (not a child of this monitor).") + else: + log(f"Soak PID {pid} has exited with code {rc}.") + return rc + + +def _collect_instance_ids(metadata: dict, stop_mgmt: bool) -> Tuple[List[str], List[str], List[str]]: + """Return (storage_node_ids, client_ids, mgmt_ids). + + mgmt_ids is non-empty only when --stop-mgmt was passed. + """ + sn_ids = [ + sn["instance_id"] + for sn in metadata.get("storage_nodes", []) + if sn.get("instance_id") + ] + client_ids = [ + c["instance_id"] + for c in metadata.get("clients", []) + if c.get("instance_id") + ] + mgmt_block = metadata.get("mgmt") or {} + mgmt_id = mgmt_block.get("instance_id") + mgmt_ids = [mgmt_id] if (stop_mgmt and mgmt_id) else [] + return sn_ids, client_ids, mgmt_ids + + +def _stop_one(ec2_client, instance_id: str, dry_run: bool) -> Tuple[str, bool, str]: + """Stop a single instance. Returns (instance_id, ok, message). The + monitor catches every exception per-instance so one ENI tied up by + a stuck VPC endpoint or a misconfigured IAM doesn't prevent the + other instances from stopping.""" + if dry_run: + return instance_id, True, "dry-run (no API call)" + try: + resp = ec2_client.stop_instances(InstanceIds=[instance_id]) + states = [s.get("CurrentState", {}).get("Name", "?") + for s in resp.get("StoppingInstances", [])] + return instance_id, True, f"requested stop; current state: {','.join(states) or '?'}" + except Exception as e: # noqa: BLE001 — every failure mode reported uniformly + return instance_id, False, f"{type(e).__name__}: {e}" + + +def stop_instances(region: str, ids: List[str], label: str, dry_run: bool) -> Tuple[int, int]: + """Stop every id in ``ids``. Returns (ok_count, fail_count). Each + instance is attempted independently — a failure on one does not + short-circuit the rest.""" + if not ids: + log(f"No {label} instance ids in metadata; skipping.") + return 0, 0 + + log(f"Stopping {len(ids)} {label} instance(s): {', '.join(ids)}" + + (" [DRY-RUN]" if dry_run else "")) + + if dry_run: + for inst in ids: + log(f" [dry-run] would stop {inst}") + return len(ids), 0 + + # Defer the boto3 import until we actually need it so --dry-run works + # on hosts where boto3 isn't installed. + try: + import boto3 # type: ignore + except ImportError: + log("ERROR: boto3 is required to call AWS EC2. Install with: " + "`pip install boto3`. To preview without AWS, re-run with --dry-run.") + return 0, len(ids) + + ec2 = boto3.client("ec2", region_name=region) + ok = 0 + fail = 0 + for inst in ids: + _, success, msg = _stop_one(ec2, inst, dry_run=False) + if success: + log(f" OK {inst}: {msg}") + ok += 1 + else: + log(f" FAIL {inst}: {msg}") + fail += 1 + return ok, fail + + +def main() -> int: + args = parse_args() + + metadata_path = Path(args.metadata) + if not metadata_path.is_file(): + log(f"ERROR: metadata file not found: {metadata_path}") + return 2 + + with open(metadata_path) as fh: + metadata = json.load(fh) + + sn_ids, client_ids, mgmt_ids = _collect_instance_ids(metadata, args.stop_mgmt) + if not sn_ids and not client_ids and not mgmt_ids: + log(f"ERROR: metadata at {metadata_path} has no instance ids under " + f"storage_nodes / clients / mgmt; nothing to stop.") + return 2 + + log(f"Lab inventory: storage_nodes={len(sn_ids)}, clients={len(client_ids)}, " + f"mgmt={'KEEP RUNNING' if not args.stop_mgmt else 'STOP'} " + f"(mgmt.instance_id={(metadata.get('mgmt') or {}).get('instance_id', '?')})") + + # Block until the soak's PID exits. Exit code is informational only — + # both success and permanent failure trigger the same shutdown. + soak_rc = wait_for_soak_exit(args.soak_pid, args.poll_interval) + log("Proceeding to stop instances regardless of soak outcome " + "(both completion and permanent failure should leave the cluster idle).") + + total_ok = 0 + total_fail = 0 + for ids, label in ( + (client_ids, "client"), + (sn_ids, "storage_node"), + (mgmt_ids, "mgmt"), # only populated when --stop-mgmt passed + ): + ok, fail = stop_instances(args.region, ids, label, dry_run=args.dry_run) + total_ok += ok + total_fail += fail + + log(f"Done. Stopped OK: {total_ok}, failed: {total_fail}. " + f"Soak exit code was {soak_rc}.") + + # Non-zero exit if any individual stop failed (best-effort policy + # already attempted every instance, but the user should see the + # signal in their wrapping shell / monitoring). + if total_fail > 0: + return 1 + return 0 + + +if __name__ == "__main__": + try: + sys.exit(main()) + except KeyboardInterrupt: + log("Interrupted before completion; instances may still be running.") + sys.exit(130) diff --git a/simplyblock_cli/cli-reference.yaml b/simplyblock_cli/cli-reference.yaml index 5a0a3bb04..ef570382d 100644 --- a/simplyblock_cli/cli-reference.yaml +++ b/simplyblock_cli/cli-reference.yaml @@ -111,6 +111,14 @@ commands: dest: force type: bool action: store_true + - name: "--enable-inline-checksum" + help: > + When formatting (with --force), prefer an LBAF that supports >=8 bytes of NVMe metadata per block, + so alceml can run inline checksum validation in md-on-device mode. Drives with no md-capable LBAF + still format to plain 4K and will use the fallback layout. + dest: inline_checksum + type: bool + action: store_true - name: "--calculate-hp-only" help: "Calculate the minimum required huge pages, it depends on the following params: --cores-percentage, --sockets-to-use, --max-lvol, --nodes-per-socket, --number-of-devices." dest: calculate_hp_only @@ -980,6 +988,14 @@ commands: dest: strict_node_anti_affinity type: bool action: store_true + - name: "--enable-inline-checksum" + help: > + Enable inline CRC checksum validation on every IO for silent-data-error protection. Cannot be enabled or + disabled after cluster creation. Per-device alceml mode (md-on-device vs fallback) is auto-detected at + add-node. + dest: inline_checksum + type: bool + action: store_true - name: "--name" help: > Assigns a name to the newly created cluster. @@ -1157,6 +1173,13 @@ commands: dest: strict_node_anti_affinity type: bool action: store_true + - name: "--enable-inline-checksum" + help: > + Enable inline CRC checksum validation on every IO for silent-data-error protection. Cannot be enabled or + disabled after cluster creation. + dest: inline_checksum + type: bool + action: store_true - name: "--name" help: > Assigns a name to the newly created cluster. diff --git a/simplyblock_cli/cli.py b/simplyblock_cli/cli.py index abfbbbbad..507e27304 100755 --- a/simplyblock_cli/cli.py +++ b/simplyblock_cli/cli.py @@ -104,6 +104,7 @@ def init_storage_node__configure(self, subparser): argument = subcommand.add_argument('--size-range', help='NVMe SSD device size range separated by -, can be X(m,g,t) or bytes as integer, example: --size-range 50G-1T or --size-range 1232345-67823987, --device-model and --size-range must be set together.', type=str, default='', dest='size_range', required=False) argument = subcommand.add_argument('--nvme-names', help='Comma separated list of nvme namespace names like nvme0n1,nvme1n1.', type=str, default='', dest='nvme_names', required=False) argument = subcommand.add_argument('--force', help='Force format detected or passed nvme pci address to 4K and clean partitions.', dest='force', action='store_true') + argument = subcommand.add_argument('--enable-inline-checksum', help='When formatting (with --force), prefer an LBAF that supports >=8 bytes of NVMe metadata per block, so alceml can run inline checksum validation in md-on-device mode. Drives with no md-capable LBAF still format to plain 4K and will use the fallback layout.', dest='inline_checksum', action='store_true') argument = subcommand.add_argument('--calculate-hp-only', help='Calculate the minimum required huge pages, it depends on the following params: --cores-percentage, --sockets-to-use, --max-lvol, --nodes-per-socket, --number-of-devices.', dest='calculate_hp_only', action='store_true') argument = subcommand.add_argument('--number-of-devices', help='Number of devices that will be used on this host. For calculating huge pages memory only.', type=int, dest='number_of_devices') @@ -418,6 +419,7 @@ def init_cluster__create(self, subparser): if self.developer_mode: argument = subcommand.add_argument('--disable-monitoring', help='Disable monitoring stack, false by default. Default: `false`.', dest='disable_monitoring', action='store_true') argument = subcommand.add_argument('--strict-node-anti-affinity', help='Enable strict node anti affinity for storage nodes. Never more than one chunk is placed on a node. This requires a minimum of _data-chunks-in-stripe + parity-chunks-in-stripe + 1_ nodes in the cluster.', dest='strict_node_anti_affinity', action='store_true') + argument = subcommand.add_argument('--enable-inline-checksum', help='Enable inline CRC checksum validation on every IO for silent-data-error protection. Cannot be enabled or disabled after cluster creation. Per-device alceml mode (md-on-device vs fallback) is auto-detected at add-node.', dest='inline_checksum', action='store_true') argument = subcommand.add_argument('--name', '-n', help='Assigns a name to the newly created cluster.', type=str, dest='name') argument = subcommand.add_argument('--qpair-count', help='The NVMe/TCP transport qpair count per logical volume. Default: `32`.', type=range_type(0, 128), default=32, dest='qpair_count') argument = subcommand.add_argument('--client-qpair-count', help='The default NVMe/TCP transport qpair count per logical volume for client. Default: `3`.', type=range_type(0, 128), default=3, dest='client_qpair_count') @@ -453,6 +455,7 @@ def init_cluster__add(self, subparser): if self.developer_mode: argument = subcommand.add_argument('--inflight-io-threshold', help='The number of inflight IOs allowed before the IO queuing starts. Default: `4`.', type=int, default=4, dest='inflight_io_threshold') argument = subcommand.add_argument('--strict-node-anti-affinity', help='Enable strict node anti affinity for storage nodes. Never more than one chunk is placed on a node. This requires a minimum of _data-chunks-in-stripe + parity-chunks-in-stripe + 1_ nodes in the cluster."', dest='strict_node_anti_affinity', action='store_true') + argument = subcommand.add_argument('--enable-inline-checksum', help='Enable inline CRC checksum validation on every IO for silent-data-error protection. Cannot be enabled or disabled after cluster creation.', dest='inline_checksum', action='store_true') argument = subcommand.add_argument('--name', '-n', help='Assigns a name to the newly created cluster.', type=str, dest='name') argument = subcommand.add_argument('--client-data-nic', help='Network interface name from client to use for logical volume connection.', type=str, dest='client_data_nic') argument = subcommand.add_argument('--use-backup', help='The path to JSON file with S3/MinIO backup configuration.', type=str, dest='use_backup') diff --git a/simplyblock_cli/clibase.py b/simplyblock_cli/clibase.py index f8f8af201..4787e489d 100755 --- a/simplyblock_cli/clibase.py +++ b/simplyblock_cli/clibase.py @@ -140,7 +140,8 @@ def storage_node__configure(self, sub_command, args): args.max_lvol, max_prov, sockets_to_use,args.nodes_per_socket, pci_allowed, pci_blocked, force=args.force, device_model=args.device_model, size_range=args.size_range, cores_percentage=cores_percentage, nvme_names=nvme_names, - calculate_hp_only=args.calculate_hp_only, number_of_devices=number_of_devices) + calculate_hp_only=args.calculate_hp_only, number_of_devices=number_of_devices, + inline_checksum=args.inline_checksum) def storage_node__deploy_cleaner(self, sub_command, args): storage_ops.deploy_cleaner() @@ -996,6 +997,7 @@ def cluster_add(self, args): with open(args.use_backup, 'r') as f: backup_config = _json.load(f) + inline_checksum = getattr(args, 'inline_checksum', False) return cluster_ops.add_cluster( blk_size, page_size_in_blocks, cap_warn, cap_crit, prov_cap_warn, prov_cap_crit, distr_ndcs, distr_npcs, distr_bs, distr_chunk_bs, ha_type, enable_node_affinity, @@ -1003,6 +1005,7 @@ def cluster_add(self, args): client_data_nic, max_fault_tolerance=max_fault_tolerance, backup_config=backup_config, nvmf_base_port=args.nvmf_base_port, rpc_base_port=args.rpc_base_port, snode_api_port=args.snode_api_port, hashicorp_vault_settings=HashicorpVaultSettings({"base_url": args.hashicorp_vault_url}) if args.hashicorp_vault_url else None, + inline_checksum=inline_checksum, ) def cluster_create(self, args): @@ -1040,6 +1043,7 @@ def cluster_create(self, args): is_single_node = args.is_single_node fabric = args.fabric client_data_nic = args.client_data_nic + inline_checksum = getattr(args, 'inline_checksum', False) max_fault_tolerance = min(distr_npcs, 2) if distr_npcs >= 1 else 1 @@ -1059,6 +1063,7 @@ def cluster_create(self, args): backup_config=backup_config, nvmf_base_port=args.nvmf_base_port, rpc_base_port=args.rpc_base_port, snode_api_port=args.snode_api_port, hashicorp_vault_settings=HashicorpVaultSettings({"base_url": args.hashicorp_vault_url}) if args.hashicorp_vault_url else None, + inline_checksum=inline_checksum, ) def query_yes_no(self, question, default="yes"): diff --git a/simplyblock_core/cluster_ops.py b/simplyblock_core/cluster_ops.py index dc97c744c..ebe06be17 100755 --- a/simplyblock_core/cluster_ops.py +++ b/simplyblock_core/cluster_ops.py @@ -227,7 +227,7 @@ def create_cluster(blk_size, page_size_in_blocks, cli_pass, nvmeof_tls_config=None, max_fault_tolerance=1, backup_config=None, nvmf_base_port=4420, rpc_base_port=8080, snode_api_port=50001, container_image_prefix=None, hashicorp_vault_settings : t.Optional[HashicorpVaultSettings] = None, -) -> str: + inline_checksum=False) -> str: if distr_ndcs == 0 and distr_npcs == 0: raise ValueError("both distr_ndcs and distr_npcs cannot be 0") @@ -354,6 +354,7 @@ def create_cluster(blk_size, page_size_in_blocks, cli_pass, cluster.disable_monitoring = disable_monitoring cluster.mode = mode cluster.full_page_unmap = False + cluster.inline_checksum = bool(inline_checksum) cluster.client_data_nic = client_data_nic or "" cluster.max_fault_tolerance = max_fault_tolerance cluster.nvmf_base_port = nvmf_base_port @@ -458,7 +459,7 @@ def add_cluster(blk_size, page_size_in_blocks, cap_warn, cap_crit, prov_cap_warn client_data_nic="", max_fault_tolerance=1, backup_config=None, nvmf_base_port=4420, rpc_base_port=8080, snode_api_port=50001, hashicorp_vault_settings : t.Optional[HashicorpVaultSettings] = None, -) -> str: + inline_checksum=False) -> str: default_cluster = None @@ -558,6 +559,7 @@ def add_cluster(blk_size, page_size_in_blocks, cap_warn, cap_crit, prov_cap_warn cluster.fabric_tcp = protocols["tcp"] cluster.fabric_rdma = protocols["rdma"] cluster.full_page_unmap = False + cluster.inline_checksum = bool(inline_checksum) cluster.client_data_nic = client_data_nic or "" cluster.max_fault_tolerance = max_fault_tolerance cluster.nvmf_base_port = nvmf_base_port diff --git a/simplyblock_core/controllers/device_controller.py b/simplyblock_core/controllers/device_controller.py index b264c689c..b6fc94fd2 100644 --- a/simplyblock_core/controllers/device_controller.py +++ b/simplyblock_core/controllers/device_controller.py @@ -297,12 +297,21 @@ def _def_create_device_stack(device_obj, snode, force=False, clear_data=False): cluster = db_controller.get_cluster_by_id(snode.cluster_id) if alceml_name not in bdev_names: + checksum_method, cache_size, cache_eviction_threshold = utils.alceml_checksum_params(cluster, device_obj) + if cluster.inline_checksum and not device_obj.md_supported: + logger.warning( + f"Inline checksum: device {device_obj.get_id()} ({device_obj.pcie_address}) has no NVMe metadata; " + f"alceml will run in fallback mode (extra md page, ~1.17%% capacity overhead)." + ) ret = snode.create_alceml( alceml_name, nvme_bdev, alceml_id, pba_init_mode=3 if clear_data else 2, write_protection=cluster.distr_ndcs > 1, pba_page_size=cluster.page_size_in_blocks, - full_page_unmap=cluster.full_page_unmap + full_page_unmap=cluster.full_page_unmap, + checksum_method=checksum_method, + cache_size=cache_size, + cache_eviction_threshold=cache_eviction_threshold, ) if not ret: diff --git a/simplyblock_core/controllers/lvol_controller.py b/simplyblock_core/controllers/lvol_controller.py index 50bd2c0a1..7f66c5cc2 100755 --- a/simplyblock_core/controllers/lvol_controller.py +++ b/simplyblock_core/controllers/lvol_controller.py @@ -471,6 +471,11 @@ def add_lvol_ha(name, size, host_id_or_name, ha_type, pool_id_or_name, use_comp= if dev.status == dev.STATUS_ONLINE: dev_count += 1 cluster_size_total += dev.size + # Inline-checksum fallback layout reserves 6 of every 510 data blocks + # per 2 MiB extent for the extended md page + filler. Charge that as + # initial utilization rather than reducing reported raw capacity. + if cl.inline_checksum and not dev.md_supported: + cluster_size_prov += utils.alceml_fallback_overhead_bytes(cl, dev.size) if len(online_nodes) == 0: logger.error("No online Storage nodes found") diff --git a/simplyblock_core/env_var b/simplyblock_core/env_var index 7f021db8c..60edd1ab0 100644 --- a/simplyblock_core/env_var +++ b/simplyblock_core/env_var @@ -1,5 +1,5 @@ SIMPLY_BLOCK_COMMAND_NAME=sbcli-dev SIMPLY_BLOCK_VERSION=19.2.34 -SIMPLY_BLOCK_DOCKER_IMAGE=simplyblock/simplyblock:main -SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=simplyblock/spdk:main-latest +SIMPLY_BLOCK_DOCKER_IMAGE=simplyblock/simplyblock:inline-checksum-validation +SIMPLY_BLOCK_SPDK_ULTRA_IMAGE=simplyblock/spdk:checksum-validation-latest diff --git a/simplyblock_core/models/cluster.py b/simplyblock_core/models/cluster.py index 10597fc62..4423d46dd 100644 --- a/simplyblock_core/models/cluster.py +++ b/simplyblock_core/models/cluster.py @@ -98,6 +98,9 @@ class Cluster(BaseModel): shared_placement_migration_pending: bool = False full_page_unmap: bool = True is_single_node: bool = False + # Inline CRC checksum validation for silent-data-error protection. + # Frozen at cluster create time; no upgrade path for existing clusters. + inline_checksum: bool = False snapshot_replication_target_cluster: str = "" snapshot_replication_target_pool: str = "" snapshot_replication_timeout: int = 60*10 diff --git a/simplyblock_core/models/nvme_device.py b/simplyblock_core/models/nvme_device.py index 1badd5942..b1e7299ee 100644 --- a/simplyblock_core/models/nvme_device.py +++ b/simplyblock_core/models/nvme_device.py @@ -66,6 +66,11 @@ class NVMeDevice(BaseModel): last_flap_tsc: float = 0.0 serial_number: str = "" size: int = -1 + # NVMe per-block metadata size in bytes, as reported by the bound SPDK bdev. + # >=8 means alceml can run in cv_md_method (no read/write amplification). + # 0 means alceml must use cv_fallback_method (extra md page per 2 MiB extent). + md_size: int = 0 + md_supported: bool = False testing_bdev: str = "" connecting_from_node: str = "" previous_status: str = "" diff --git a/simplyblock_core/rpc_client.py b/simplyblock_core/rpc_client.py index b34ad34bf..1e3861808 100755 --- a/simplyblock_core/rpc_client.py +++ b/simplyblock_core/rpc_client.py @@ -614,7 +614,8 @@ def qos_vbdev_delete(self, name): def bdev_alceml_create(self, alceml_name, nvme_name, uuid, pba_init_mode=3, alceml_cpu_mask="", alceml_worker_cpu_mask="", pba_page_size=2097152, - write_protection=False, full_page_unmap=False): + write_protection=False, full_page_unmap=False, + checksum_method=0, cache_size=0, cache_eviction_threshold=0): params = { "name": alceml_name, "cntr_path": nvme_name, @@ -638,6 +639,15 @@ def bdev_alceml_create(self, alceml_name, nvme_name, uuid, pba_init_mode=3, params["write_protection"] = True if full_page_unmap: params["use_map_whole_page_on_1st_write"] = True + # Inline CRC checksum validation. method: 0=off, 1=md-on-device, 2=fallback (extra md page). + # The data plane reads md_size from spdk_bdev_get_md_size and refuses method=1 when md_size==0, + # so the caller must pick method=2 for devices without NVMe metadata support. + if checksum_method: + params["checksum_validation_method"] = int(checksum_method) + if cache_size: + params["cache_size"] = int(cache_size) + if cache_eviction_threshold: + params["cache_eviction_threshold"] = int(cache_eviction_threshold) return self._request("bdev_alceml_create", params) def bdev_distrib_create(self, name, vuid, ndcs, npcs, num_blocks, block_size, jm_names, diff --git a/simplyblock_core/storage_node_ops.py b/simplyblock_core/storage_node_ops.py index 39ddeb73c..18868960e 100755 --- a/simplyblock_core/storage_node_ops.py +++ b/simplyblock_core/storage_node_ops.py @@ -823,12 +823,22 @@ def _create_storage_device_stack(rpc_client, nvme, snode, after_restart): cluster = db_controller.get_cluster_by_id(snode.cluster_id) + checksum_method, cache_size, cache_eviction_threshold = utils.alceml_checksum_params(cluster, nvme) + if cluster.inline_checksum and not nvme.md_supported: + logger.warning( + f"Inline checksum: device {nvme.get_id()} ({nvme.pcie_address}) has no NVMe metadata; " + f"alceml will run in fallback mode (extra md page, ~1.17%% capacity overhead)." + ) + ret = snode.create_alceml( alceml_name, nvme_bdev, alceml_id, pba_init_mode=1 if (after_restart and nvme.status != NVMeDevice.STATUS_NEW) else 3, write_protection=cluster.distr_ndcs > 1, pba_page_size=cluster.page_size_in_blocks, full_page_unmap=cluster.full_page_unmap, + checksum_method=checksum_method, + cache_size=cache_size, + cache_eviction_threshold=cache_eviction_threshold, ) if not ret: @@ -2776,6 +2786,9 @@ def _restart_storage_node_impl( db_dev.nvme_bdev = found_dev.nvme_bdev db_dev.nvme_controller = found_dev.nvme_controller db_dev.pcie_address = found_dev.pcie_address + # Refresh md detection so a re-format between restarts is reflected + db_dev.md_size = found_dev.md_size + db_dev.md_supported = found_dev.md_supported # if db_dev.status in [ NVMeDevice.STATUS_ONLINE]: # db_dev.status = NVMeDevice.STATUS_UNAVAILABLE @@ -3543,11 +3556,17 @@ def shutdown_storage_node(node_id, force=False): logger.info("Node found: %s in state: %s", snode.hostname, snode.status) - if not force: - allowed, reason = _check_ftt_allows_node_removal(node_id, db_controller) - if not allowed: - logger.error(f"Cannot shutdown node: {reason}") - return False, reason + # NOTE: shutdown does not consult _check_ftt_allows_node_removal. + # Removal and shutdown are different operations: removing a node + # permanently changes the cluster's storage budget, while shutting one + # down is a transient state that the cluster is meant to absorb under + # its FTT contract. Conflating the two was added in commit fbdffea3 + # (2026-03-28) and caused soak/operator workflows to wait for + # rebalancing to drain — the wrong policy for an operation whose + # whole point is to disrupt the cluster on purpose. The web API + # layer (simplyblock_web/api/v{1,2}/storage_node.py) still gates on + # this for its own non-force shutdown endpoint, where the policy + # decision belongs. # Guard: no concurrent shutdown + restart (design: mutual exclusion) for peer in db_controller.get_storage_nodes_by_cluster_id(snode.cluster_id): @@ -3937,7 +3956,7 @@ def upgrade_automated_deployment_config(): def generate_automated_deployment_config(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_allowed, pci_blocked, cores_percentage=0, force=False, device_model="", size_range="", nvme_names=None, k8s=False, - calculate_hp_only=False, number_of_devices=0): + calculate_hp_only=False, number_of_devices=0, inline_checksum=False): if calculate_hp_only: minimum_hp_memory = utils.calculate_hp_only(max_lvol, number_of_devices, sockets_to_use, nodes_per_socket, cores_percentage) hp_number = math.ceil(minimum_hp_memory / 2) @@ -3955,7 +3974,8 @@ def generate_automated_deployment_config(max_lvol, max_prov, sockets_to_use, nod nodes_config, system_info = utils.generate_configs(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_allowed, pci_blocked, cores_percentage, force=force, - device_model=device_model, size_range=size_range, nvme_names=nvme_names) + device_model=device_model, size_range=size_range, nvme_names=nvme_names, + inline_checksum=inline_checksum) if not nodes_config or not nodes_config.get("nodes"): return False utils.store_config_file(nodes_config, constants.NODES_CONFIG_FILE, create_read_only_file=True) diff --git a/simplyblock_core/utils/__init__.py b/simplyblock_core/utils/__init__.py index 55ab7dc98..4d2b47a33 100644 --- a/simplyblock_core/utils/__init__.py +++ b/simplyblock_core/utils/__init__.py @@ -1221,6 +1221,41 @@ def validate_sec_options(sec_options): return True, None +def alceml_fallback_overhead_bytes(cluster, device_size_bytes): + """Bytes of capacity charged as initial utilization when alceml runs in + cv_fallback_method on a device. Per 2 MiB extent the layout shrinks from + 510 to 504 data blocks (1 extended-md block + 5 filler), so we lose 6 + blocks per page. Returns 0 when inline_checksum is off, when device size + is unknown, or when the device runs in md-on-device mode (caller must + have already filtered for md_supported=False). + """ + if not getattr(cluster, 'inline_checksum', False): + return 0 + if not device_size_bytes or device_size_bytes <= 0: + return 0 + blk_size = cluster.blk_size or 4096 + page_size = cluster.page_size_in_blocks or (2 * 1024 * 1024) + pages = device_size_bytes // page_size + return int(pages * 6 * blk_size) + + +def alceml_checksum_params(cluster, nvme_device): + """Pick the inline-checksum method and tunables for bdev_alceml_create. + + Returns (method, cache_size, cache_eviction_threshold). method: + 0 = off (cluster.inline_checksum False) + 1 = md-on-device (cv_md_method, no read/write amplification) + 2 = fallback (cv_fallback_method, extra md page per 2 MiB extent) + cache_size and cache_eviction_threshold default to 0 so the data plane + keeps its built-in defaults (2000 entries, 90% eviction trigger). + """ + if not getattr(cluster, 'inline_checksum', False): + return 0, 0, 0 + if getattr(nvme_device, 'md_supported', False): + return 1, 0, 0 + return 2, 0, 0 + + def addNvmeDevices(rpc_client, snode, devs): devices = [] ret = rpc_client.bdev_nvme_controller_list() @@ -1283,6 +1318,12 @@ def addNvmeDevices(rpc_client, snode, devs): else: logger.error(f"No subsystem nqn found for device: {nvme_driver_data['pci_address']}") + # SPDK exposes per-namespace metadata size as a top-level uint32 in bdev_get_bdevs JSON + # (lib/bdev/bdev_rpc.c writes "md_size" via spdk_bdev_get_md_size). >=8 means alceml can run + # in cv_md_method on this device; 0 means it must run in cv_fallback_method. + md_size = int(nvme_dict.get('md_size', 0) or 0) + md_supported = md_size >= 8 + devices.append( NVMeDevice({ 'uuid': str(uuid.uuid4()), @@ -1296,7 +1337,9 @@ def addNvmeDevices(rpc_client, snode, devs): 'nvme_controller': nvme_controller, 'node_id': snode.get_id(), 'cluster_id': snode.cluster_id, - 'status': NVMeDevice.STATUS_ONLINE + 'status': NVMeDevice.STATUS_ONLINE, + 'md_size': md_size, + 'md_supported': md_supported, })) return devices @@ -1786,7 +1829,8 @@ def regenerate_config(new_config, old_config, force=False): def generate_configs(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_allowed, pci_blocked, - cores_percentage=0, force=False, device_model="", size_range="", nvme_names=None): + cores_percentage=0, force=False, device_model="", size_range="", nvme_names=None, + inline_checksum=False): system_info = {} nodes_config: dict = {"nodes": []} @@ -1811,8 +1855,24 @@ def generate_configs(max_lvol, max_prov, sockets_to_use, nodes_per_socket, pci_a nvme_device_path = f"/dev/{nvme_device}n1" clean_partitions(nvme_device_path) nvme_json_string = get_idns(nvme_device_path) - lbaf_id = find_lbaf_id(nvme_json_string, 0, 12) - format_nvme_device(nvme_device_path, lbaf_id) + lbaf_id = None + md_lbaf = False + if inline_checksum: + # Prefer an LBAF with metadata so alceml can run in cv_md_method on this drive. + lbaf_id = find_md_lbaf_id(nvme_json_string, target_ds=12, min_ms=8) + if lbaf_id is None: + logger.warning( + f"--enable-inline-checksum: device {nvme_device_path} exposes no 4K LBAF with >=8B metadata; " + f"formatting plain 4K. alceml will run in fallback mode on this drive." + ) + else: + md_lbaf = True + logger.info(f"Formatting {nvme_device_path} with md-capable LBAF index {lbaf_id}") + if lbaf_id is None: + lbaf_id = find_lbaf_id(nvme_json_string, 0, 12) + # When switching to an md-capable LBAF, the namespace SectorSize stays 4096, + # so the in-list 4K early-out would skip the reformat. Force it. + format_nvme_device(nvme_device_path, lbaf_id, force_reformat=md_lbaf) for nid in sockets_to_use: if nid in cores_by_numa: @@ -2838,6 +2898,26 @@ def find_lbaf_id(json_data: str, target_ms: int, target_ds: int) -> int: return 0 +def find_md_lbaf_id(json_data: str, target_ds: int = 12, min_ms: int = 8): + """Return the LBAF index for a format with data-size==target_ds (log2, 12=4K) + and metadata-size>=min_ms. Among matches, prefer the smallest ms to avoid + wasting space on 64B-md formats. Returns None if no such LBAF exists. + """ + try: + data = json.loads(json_data) + except (json.JSONDecodeError, TypeError): + return None + candidates = [] + for index, lbaf in enumerate(data.get('lbafs', [])): + ms = lbaf.get('ms', 0) + if lbaf.get('ds') == target_ds and ms >= min_ms: + candidates.append((ms, index)) + if not candidates: + return None + candidates.sort() + return candidates[0][1] + + def get_idns(nvme_device: str): command = ['nvme', 'id-ns', nvme_device, '--output-format', 'json'] try: @@ -2909,8 +2989,11 @@ def is_namespace_4k_from_nvme_list(device_path: str) -> bool: return False -def format_nvme_device(nvme_device: str, lbaf_id: int): - if is_namespace_4k_from_nvme_list(nvme_device): +def format_nvme_device(nvme_device: str, lbaf_id: int, force_reformat: bool = False): + # The 4K early-out only checks SectorSize, not metadata size, so it would + # silently skip a reformat needed to switch a 4K-no-md namespace to 4K-with-md. + # Callers that need a specific LBAF (e.g. md-capable) pass force_reformat=True. + if not force_reformat and is_namespace_4k_from_nvme_list(nvme_device): logger.debug(f"Device {nvme_device} already formatted with 4K...skipping") return command = ['nvme', 'format', nvme_device, f"--lbaf={lbaf_id}", '--force'] diff --git a/tests/test_inline_checksum.py b/tests/test_inline_checksum.py new file mode 100644 index 000000000..f83bc6644 --- /dev/null +++ b/tests/test_inline_checksum.py @@ -0,0 +1,308 @@ +# coding=utf-8 +""" +test_inline_checksum.py – unit tests for the per-cluster inline CRC checksum +validation feature (TD.100226.1). + +Covers: + * Cluster.inline_checksum / NVMeDevice.md_size / md_supported model defaults. + * find_md_lbaf_id helper – LBAF selection from `nvme id-ns` JSON. + * alceml_checksum_params helper – cluster flag + per-device md combo logic. + * alceml_fallback_overhead_bytes helper – capacity-overhead math. + * bdev_alceml_create RPC – correct param wire-up for each method. + * addNvmeDevices – md_size flowing from SPDK bdev JSON onto NVMeDevice. +""" + +import json +import unittest +from unittest.mock import patch, MagicMock + +from simplyblock_core import utils +from simplyblock_core.models.cluster import Cluster +from simplyblock_core.models.nvme_device import NVMeDevice +from simplyblock_core.rpc_client import RPCClient + + +def _make_rpc_client(): + with patch("requests.session"): + return RPCClient("127.0.0.1", 8081, "user", "pass", timeout=1, retry=0) + + +# --------------------------------------------------------------------------- +# Model defaults & persistence +# --------------------------------------------------------------------------- +class TestModelDefaults(unittest.TestCase): + def test_cluster_inline_checksum_defaults_off(self): + c = Cluster() + self.assertFalse(c.inline_checksum) + + def test_cluster_inline_checksum_can_be_set(self): + c = Cluster() + c.inline_checksum = True + self.assertTrue(c.inline_checksum) + + def test_nvme_device_md_fields_default_off(self): + d = NVMeDevice() + self.assertEqual(d.md_size, 0) + self.assertFalse(d.md_supported) + + def test_nvme_device_md_fields_round_trip(self): + d = NVMeDevice({'md_size': 16, 'md_supported': True}) + self.assertEqual(d.md_size, 16) + self.assertTrue(d.md_supported) + + +# --------------------------------------------------------------------------- +# find_md_lbaf_id +# --------------------------------------------------------------------------- +class TestFindMdLbafId(unittest.TestCase): + def _idns(self, lbafs): + return json.dumps({'lbafs': lbafs}) + + def test_returns_none_when_no_md_lbaf(self): + # Only 4K-no-md available. + s = self._idns([ + {'ms': 0, 'ds': 9}, + {'ms': 0, 'ds': 12}, + ]) + self.assertIsNone(utils.find_md_lbaf_id(s)) + + def test_picks_smallest_ms_above_min(self): + # 4K-with-8B is preferred over 4K-with-64B (waste less space). + s = self._idns([ + {'ms': 0, 'ds': 12}, # idx 0 – no md + {'ms': 64, 'ds': 12}, # idx 1 + {'ms': 8, 'ds': 12}, # idx 2 – preferred + {'ms': 16, 'ds': 12}, # idx 3 + ]) + self.assertEqual(utils.find_md_lbaf_id(s), 2) + + def test_skips_non_matching_ds(self): + # An LBAF with ms>=8 but ds!=12 must not be selected. + s = self._idns([ + {'ms': 8, 'ds': 9}, # 512B with md – ignore + {'ms': 0, 'ds': 12}, + ]) + self.assertIsNone(utils.find_md_lbaf_id(s)) + + def test_below_min_ms_excluded(self): + s = self._idns([ + {'ms': 4, 'ds': 12}, # below 8B threshold + {'ms': 0, 'ds': 12}, + ]) + self.assertIsNone(utils.find_md_lbaf_id(s)) + + def test_invalid_json_returns_none(self): + self.assertIsNone(utils.find_md_lbaf_id("not json")) + self.assertIsNone(utils.find_md_lbaf_id(None)) + + def test_empty_lbafs_returns_none(self): + self.assertIsNone(utils.find_md_lbaf_id(self._idns([]))) + + +# --------------------------------------------------------------------------- +# alceml_checksum_params +# --------------------------------------------------------------------------- +class TestAlcemlChecksumParams(unittest.TestCase): + def test_off_when_cluster_flag_off(self): + c = Cluster({'inline_checksum': False}) + d = NVMeDevice({'md_supported': True}) + self.assertEqual(utils.alceml_checksum_params(c, d), (0, 0, 0)) + + def test_method_1_when_md_supported(self): + c = Cluster({'inline_checksum': True}) + d = NVMeDevice({'md_supported': True, 'md_size': 8}) + self.assertEqual(utils.alceml_checksum_params(c, d), (1, 0, 0)) + + def test_method_2_when_md_unsupported(self): + c = Cluster({'inline_checksum': True}) + d = NVMeDevice({'md_supported': False, 'md_size': 0}) + self.assertEqual(utils.alceml_checksum_params(c, d), (2, 0, 0)) + + def test_off_for_cluster_without_attribute(self): + # Old DB record (no inline_checksum field) must behave as off. + class _Old: + pass + d = NVMeDevice({'md_supported': True}) + self.assertEqual(utils.alceml_checksum_params(_Old(), d), (0, 0, 0)) + + +# --------------------------------------------------------------------------- +# alceml_fallback_overhead_bytes +# --------------------------------------------------------------------------- +class TestFallbackOverhead(unittest.TestCase): + def test_zero_when_flag_off(self): + c = Cluster({'inline_checksum': False, 'blk_size': 4096, 'page_size_in_blocks': 2 * 1024 * 1024}) + self.assertEqual(utils.alceml_fallback_overhead_bytes(c, 100 * 2 * 1024 * 1024), 0) + + def test_zero_for_zero_or_negative_size(self): + c = Cluster({'inline_checksum': True, 'blk_size': 4096, 'page_size_in_blocks': 2 * 1024 * 1024}) + self.assertEqual(utils.alceml_fallback_overhead_bytes(c, 0), 0) + self.assertEqual(utils.alceml_fallback_overhead_bytes(c, -1), 0) + + def test_six_blocks_per_page(self): + # 100 pages × 2 MiB = 200 MiB device. Overhead = 100 × 6 × 4 KiB = 2400 KiB. + c = Cluster({'inline_checksum': True, 'blk_size': 4096, 'page_size_in_blocks': 2 * 1024 * 1024}) + device_size = 100 * 2 * 1024 * 1024 + expected = 100 * 6 * 4096 + self.assertEqual(utils.alceml_fallback_overhead_bytes(c, device_size), expected) + + def test_partial_page_floored(self): + # 1.5 pages → only 1 full page counts (page-granular accounting). + c = Cluster({'inline_checksum': True, 'blk_size': 4096, 'page_size_in_blocks': 2 * 1024 * 1024}) + partial = (1 * 2 * 1024 * 1024) + (1 * 1024 * 1024) + self.assertEqual(utils.alceml_fallback_overhead_bytes(c, partial), 1 * 6 * 4096) + + def test_overhead_is_about_1_17_percent(self): + # Sanity: the design doc cites ~1.17% overhead in fallback mode. + c = Cluster({'inline_checksum': True, 'blk_size': 4096, 'page_size_in_blocks': 2 * 1024 * 1024}) + size = 1024 * 2 * 1024 * 1024 # 2 GiB, 1024 pages + ratio = utils.alceml_fallback_overhead_bytes(c, size) / size + self.assertAlmostEqual(ratio, 6 / 512, places=6) + + +# --------------------------------------------------------------------------- +# bdev_alceml_create RPC params +# --------------------------------------------------------------------------- +class TestBdevAlcemlCreateRPC(unittest.TestCase): + @patch.object(RPCClient, "_request") + def test_no_checksum_params_when_method_zero(self, mock_req): + mock_req.return_value = True + client = _make_rpc_client() + client.bdev_alceml_create("alc_x", "nvme0", "uuid-1") + params = mock_req.call_args[0][1] + self.assertNotIn("checksum_validation_method", params) + self.assertNotIn("cache_size", params) + self.assertNotIn("cache_eviction_threshold", params) + + @patch.object(RPCClient, "_request") + def test_method_1_only_emits_method_field(self, mock_req): + mock_req.return_value = True + client = _make_rpc_client() + client.bdev_alceml_create("alc_x", "nvme0", "uuid-1", checksum_method=1) + params = mock_req.call_args[0][1] + self.assertEqual(params["checksum_validation_method"], 1) + # Defaults of 0 must not be sent so the data plane uses its own defaults. + self.assertNotIn("cache_size", params) + self.assertNotIn("cache_eviction_threshold", params) + + @patch.object(RPCClient, "_request") + def test_method_2_with_explicit_cache_overrides(self, mock_req): + mock_req.return_value = True + client = _make_rpc_client() + client.bdev_alceml_create( + "alc_x", "nvme0", "uuid-1", + checksum_method=2, cache_size=1500, cache_eviction_threshold=85, + ) + params = mock_req.call_args[0][1] + self.assertEqual(params["checksum_validation_method"], 2) + self.assertEqual(params["cache_size"], 1500) + self.assertEqual(params["cache_eviction_threshold"], 85) + + @patch.object(RPCClient, "_request") + def test_existing_params_unchanged(self, mock_req): + # Regression guard: the new kwargs must not perturb the well-known params + # the data plane expects. + mock_req.return_value = True + client = _make_rpc_client() + client.bdev_alceml_create( + "alc_x", "nvme0", "uuid-1", + pba_init_mode=2, pba_page_size=2 * 1024 * 1024, + write_protection=True, full_page_unmap=True, checksum_method=1, + ) + params = mock_req.call_args[0][1] + self.assertEqual(params["name"], "alc_x") + self.assertEqual(params["cntr_path"], "nvme0") + self.assertEqual(params["uuid"], "uuid-1") + self.assertEqual(params["pba_init_mode"], 2) + self.assertEqual(params["pba_page_size"], 2 * 1024 * 1024) + self.assertTrue(params["write_protection"]) + self.assertTrue(params["use_map_whole_page_on_1st_write"]) + self.assertEqual(params["checksum_validation_method"], 1) + + +# --------------------------------------------------------------------------- +# addNvmeDevices md detection +# --------------------------------------------------------------------------- +class TestAddNvmeDevicesMd(unittest.TestCase): + def _make_rpc_with_bdev(self, *, md_size): + rpc = MagicMock() + rpc.bdev_nvme_controller_list.return_value = [] + rpc.bdev_nvme_controller_attach.return_value = ["nvmeX_n1"] + rpc.bdev_examine.return_value = True + rpc.bdev_wait_for_examine.return_value = True + # SPDK bdev_get_bdevs payload – the only md-relevant field is the + # top-level uint32 md_size set by spdk_bdev_get_md_size. + bdev_payload = [{ + 'name': 'nvmeX_n1', + 'block_size': 4096, + 'num_blocks': 100 * 1024 * 1024 // 4096, # 100 MiB + 'md_size': md_size, + 'driver_specific': { + 'nvme': [{ + 'pci_address': '0000:00:01.0', + 'ctrlr_data': { + 'model_number': 'TEST_NVME', + 'serial_number': 'SN-TEST-1', + }, + }], + }, + }] + rpc.get_bdevs.return_value = bdev_payload + return rpc + + def _make_snode(self): + snode = MagicMock() + snode.physical_label = 0 + snode.id_device_by_nqn = False + snode.get_id.return_value = "snode-1" + snode.cluster_id = "cluster-1" + return snode + + def test_md_size_zero_marks_unsupported(self): + rpc = self._make_rpc_with_bdev(md_size=0) + snode = self._make_snode() + devs = utils.addNvmeDevices(rpc, snode, ["0000:00:01.0"]) + self.assertEqual(len(devs), 1) + self.assertEqual(devs[0].md_size, 0) + self.assertFalse(devs[0].md_supported) + + def test_md_size_8_marks_supported(self): + rpc = self._make_rpc_with_bdev(md_size=8) + snode = self._make_snode() + devs = utils.addNvmeDevices(rpc, snode, ["0000:00:01.0"]) + self.assertEqual(devs[0].md_size, 8) + self.assertTrue(devs[0].md_supported) + + def test_md_size_below_threshold_marks_unsupported(self): + # Pre-existing PI-only formats expose ms=4 (T10 PI's reftag field + # alone). That's < 8 bytes so checksums won't fit – treat as no-md. + rpc = self._make_rpc_with_bdev(md_size=4) + snode = self._make_snode() + devs = utils.addNvmeDevices(rpc, snode, ["0000:00:01.0"]) + self.assertEqual(devs[0].md_size, 4) + self.assertFalse(devs[0].md_supported) + + def test_missing_md_size_field_treated_as_zero(self): + # Older SPDK builds that omit the field entirely must not crash. + rpc = MagicMock() + rpc.bdev_nvme_controller_list.return_value = [] + rpc.bdev_nvme_controller_attach.return_value = ["nvmeX_n1"] + rpc.get_bdevs.return_value = [{ + 'name': 'nvmeX_n1', + 'block_size': 4096, + 'num_blocks': 100 * 1024 * 1024 // 4096, + 'driver_specific': { + 'nvme': [{ + 'pci_address': '0000:00:01.0', + 'ctrlr_data': {'model_number': 'M', 'serial_number': 'S'}, + }], + }, + }] + snode = self._make_snode() + devs = utils.addNvmeDevices(rpc, snode, ["0000:00:01.0"]) + self.assertEqual(devs[0].md_size, 0) + self.assertFalse(devs[0].md_supported) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_shutdown_no_suspension.py b/tests/test_shutdown_no_suspension.py index 5ef9dae19..fc2c09ad2 100644 --- a/tests/test_shutdown_no_suspension.py +++ b/tests/test_shutdown_no_suspension.py @@ -348,7 +348,7 @@ class TestShutdownStorageNodeGraceful(unittest.TestCase): Loop 2 (detach), does NOT call FirewallClient, and finishes with a SPDK kill + offline transition.""" - def _patch(self, force=False, allow_ftt=True): + def _patch(self, force=False): from simplyblock_core import storage_node_ops as sno snode = _make_node("dying", status=StorageNode.STATUS_ONLINE, @@ -402,8 +402,9 @@ def _set_status(nid, status, caused_by="monitor"): patches = [ patch.object(sno, "DBController", return_value=db), - patch.object(sno, "_check_ftt_allows_node_removal", - return_value=(allow_ftt, "" if allow_ftt else "ftt")), + # _check_ftt_allows_node_removal is no longer called from + # shutdown_storage_node — shutdown does not consult the + # removal-budget guard. The web API layer still calls it. patch.object(sno.tasks_controller, "get_active_node_restart_task", return_value=None), patch.object(sno.tasks_controller, "get_active_node_tasks", @@ -476,14 +477,18 @@ def test_force_skips_loops(self): # Kill still happens. self.assertEqual(len(env["kill_calls"]), 1) - def test_ftt_block_aborts(self): + def test_shutdown_does_not_consult_ftt_removal_guard(self): + # Regression guard: shutdown_storage_node must not call + # _check_ftt_allows_node_removal. Removal and shutdown are + # different operations; shutdown is supposed to proceed under + # the FTT contract, not be blocked by a removal-budget check. + # See commit removing the fbdffea3 guard from this path. from simplyblock_core import storage_node_ops as sno - env = self._patch(allow_ftt=False) - ret = sno.shutdown_storage_node(env["snode"].get_id(), force=False) - # Returns (False, reason) on FTT block; no kill, no detach. - self.assertEqual(ret, (False, "ftt")) - self.assertEqual(env["kill_calls"], []) - self.assertEqual(env["detach_calls"], []) + env = self._patch() + with patch.object(sno, "_check_ftt_allows_node_removal") as guard: + self.assertTrue( + sno.shutdown_storage_node(env["snode"].get_id(), force=False)) + guard.assert_not_called() if __name__ == '__main__': diff --git a/tests/test_soak_outage_gap.py b/tests/test_soak_outage_gap.py new file mode 100644 index 000000000..4abe51673 --- /dev/null +++ b/tests/test_soak_outage_gap.py @@ -0,0 +1,275 @@ +# coding=utf-8 +""" +test_soak_outage_gap.py — unit tests for the mixed-churn soak's +inter-outage gap policy. + +Targets: +- ``SoakRunner._expected_min_unavail_seconds`` — lower bound on how long + a node is observably not-online after each outage method. Used to cap + the gap. +- ``SoakRunner._pick_outage_gap`` — random gap in + [--outage-gap-min, --outage-gap-max], capped per method 1 so the + caller-requested --min-outage-overlap is guaranteed (both nodes are + simultaneously not-online for at least that many seconds). + +These tests exist because the gap policy is the only mechanism guarding +the "two nodes are not-online for >= 10s" invariant that the soak is +contracted to exercise. If the cap math drifts, the soak silently stops +covering the dual-outage path it was built for. +""" + +import importlib.util +import os +import random +import sys +import types +import unittest +from types import SimpleNamespace +from unittest.mock import MagicMock + + +def _load_soak_module(): + """Load the soak script as a module without invoking its main(). + + The script lives under scripts/ and is intended to be run with + `python scripts/aws_dual_node_outage_soak_mixed_churn.py`. It does + not run anything at import time (only argparse + class defs), so a + plain importlib load gives us the SoakRunner class. + """ + repo_root = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) + path = os.path.join(repo_root, "scripts", + "aws_dual_node_outage_soak_mixed_churn.py") + spec = importlib.util.spec_from_file_location("_soak_mixed_churn", path) + mod = importlib.util.module_from_spec(spec) + sys.modules["_soak_mixed_churn"] = mod + spec.loader.exec_module(mod) + return mod + + +def _runner_stub(**arg_overrides): + """Bare SoakRunner instance with .args populated and .logger stubbed. + + Bypasses SoakRunner.__init__ (which connects to remote hosts) via + object.__new__. Only the fields touched by the methods under test + need to be set. + """ + mod = _load_soak_module() + runner = object.__new__(mod.SoakRunner) + default_args = dict( + shutdown_gap=0, + outage_gap_min=15, + outage_gap_max=180, + min_outage_overlap=10, + ) + default_args.update(arg_overrides) + runner.args = SimpleNamespace(**default_args) + runner.logger = MagicMock() + return mod, runner + + +class TestExpectedMinUnavail(unittest.TestCase): + + def setUp(self): + self.mod, self.runner = _runner_stub() + + def test_network_outage_parses_duration(self): + # The method name encodes the NIC-down duration; that is the + # conservative floor on node-not-online time. + self.assertEqual( + self.runner._expected_min_unavail_seconds("network_outage_20"), 20) + self.assertEqual( + self.runner._expected_min_unavail_seconds("network_outage_50"), 50) + self.assertEqual( + self.runner._expected_min_unavail_seconds("network_outage_120"), 120) + + def test_network_outage_malformed_falls_back(self): + # Defensive: if the trailing token isn't a number, fall back to + # the generic 30s floor rather than raising. + self.assertEqual( + self.runner._expected_min_unavail_seconds("network_outage_xxx"), 30) + + def test_container_kill_and_host_reboot_floors(self): + self.assertEqual( + self.runner._expected_min_unavail_seconds("container_kill"), 30) + self.assertEqual( + self.runner._expected_min_unavail_seconds("host_reboot"), 90) + + def test_graceful_and_forced_are_effectively_unbounded(self): + # graceful/forced leave the node OFFLINE until run_outage_pair + # later issues `sn restart`, so from the gap's perspective the + # unavailability window is effectively unbounded. The sentinel + # has to be large enough that `cap = unavail - min_overlap` will + # never clip the user-configured max gap (default 180s). + for m in ("graceful", "forced"): + with self.subTest(method=m): + v = self.runner._expected_min_unavail_seconds(m) + self.assertGreaterEqual(v, 1000, + "graceful/forced must not be clipped by the gap cap") + + def test_unknown_method_defaults(self): + self.assertEqual( + self.runner._expected_min_unavail_seconds("future_chaos"), 30) + + +class TestPickOutageGap(unittest.TestCase): + + def setUp(self): + self.mod, self.runner = _runner_stub() + # Determinism: lock random.randint for repeatable assertions on + # bounds. random.uniform is not used by _pick_outage_gap. + self._orig_randint = random.randint + self.calls = [] + random.randint = lambda a, b: (self.calls.append((a, b)) or a) + + def tearDown(self): + random.randint = self._orig_randint + + def _last_bounds(self): + self.assertTrue(self.calls, "random.randint was not called") + return self.calls[-1] + + # --- graceful / forced: no clipping -------------------------------- + + def test_graceful_uses_full_configured_range(self): + self.runner._pick_outage_gap("graceful") + lo, hi = self._last_bounds() + self.assertEqual(lo, 15) + self.assertEqual(hi, 180) + + def test_forced_uses_full_configured_range(self): + self.runner._pick_outage_gap("forced") + lo, hi = self._last_bounds() + self.assertEqual(lo, 15) + self.assertEqual(hi, 180) + + # --- network_outage_N: cap = N - min_overlap ----------------------- + + def test_network_outage_50_caps_at_40_with_default_overlap(self): + # N=50, overlap=10 -> cap=40 < max=180 -> hi clipped to 40, + # lo stays at min=15. + self.runner._pick_outage_gap("network_outage_50") + lo, hi = self._last_bounds() + self.assertEqual(hi, 40) + self.assertEqual(lo, 15) + + def test_network_outage_20_clamps_both_lo_and_hi(self): + # N=20, overlap=10 -> cap=10. That's below min=15 too, so lo is + # clamped to hi (10). Result: deterministic 10s gap on this + # method. This is correct: a 20s NIC drop cannot also satisfy + # a 15s gap + 10s overlap; the overlap invariant wins. + self.runner._pick_outage_gap("network_outage_20") + lo, hi = self._last_bounds() + self.assertEqual(hi, 10) + self.assertEqual(lo, 10) + + def test_container_kill_caps_at_20(self): + # unavail=30, overlap=10 -> cap=20. + self.runner._pick_outage_gap("container_kill") + lo, hi = self._last_bounds() + self.assertEqual(hi, 20) + self.assertEqual(lo, 15) + + def test_host_reboot_caps_at_80(self): + # unavail=90, overlap=10 -> cap=80. + self.runner._pick_outage_gap("host_reboot") + lo, hi = self._last_bounds() + self.assertEqual(hi, 80) + self.assertEqual(lo, 15) + + # --- min-outage-overlap edge cases --------------------------------- + + def test_zero_overlap_disables_cap_for_short_methods(self): + # With overlap=0 the cap equals unavail itself. A network_outage_20 + # then permits up to a 20s gap. + self.mod, self.runner = _runner_stub(min_outage_overlap=0) + self.runner._pick_outage_gap("network_outage_20") + lo, hi = self._last_bounds() + self.assertEqual(hi, 20) + self.assertEqual(lo, 15) + + def test_overlap_larger_than_unavail_clamps_to_one(self): + # Pathological: ask for 60s overlap on a network_outage_20. + # cap = max(1, 20 - 60) = 1. The gap collapses to 1s — the + # overlap invariant cannot be met by any positive gap, so the + # implementation chooses the smallest legal gap (1s) and lets + # the caller observe the violation via the warning emitted at + # a higher layer (or via the runtime overlap check). + self.mod, self.runner = _runner_stub(min_outage_overlap=60) + self.runner._pick_outage_gap("network_outage_20") + lo, hi = self._last_bounds() + self.assertEqual((lo, hi), (1, 1)) + + # --- legacy --shutdown-gap takes precedence ------------------------ + + def test_shutdown_gap_legacy_overrides_random(self): + self.mod, self.runner = _runner_stub(shutdown_gap=45) + gap = self.runner._pick_outage_gap("network_outage_50") + self.assertEqual(gap, 45) + # random.randint must NOT have been consulted. + self.assertEqual(self.calls, []) + + def test_shutdown_gap_warns_when_exceeding_safe_cap(self): + # --shutdown-gap=200 on network_outage_20 is unsafe (cap=10). + # The method must still return the legacy value but emit a + # warning so operators notice. + self.mod, self.runner = _runner_stub(shutdown_gap=200) + gap = self.runner._pick_outage_gap("network_outage_20") + self.assertEqual(gap, 200) + warned = any( + "exceeds" in str(call) and "safe cap" in str(call) + for call in self.runner.logger.log.call_args_list + ) + self.assertTrue(warned, + f"expected a 'safe cap' warning log; got {self.runner.logger.log.call_args_list}") + + +class TestPickOutageGapDistribution(unittest.TestCase): + """Sanity check that the random gap stays within the documented + bounds across many draws, for a representative selection of + method 1 values. No mocking of random — real distribution test. + """ + + def setUp(self): + self.mod, self.runner = _runner_stub() + + def _bounds_for(self, method): + """Replicate the cap math the implementation uses.""" + unavail = self.runner._expected_min_unavail_seconds(method) + overlap = self.runner.args.min_outage_overlap + cap = max(1, unavail - overlap) + lo = max(1, self.runner.args.outage_gap_min) + hi = max(lo, self.runner.args.outage_gap_max) + hi = min(hi, cap) + lo = min(lo, hi) + return lo, hi + + def test_all_draws_inside_bounds(self): + for method in ("graceful", "forced", "container_kill", "host_reboot", + "network_outage_20", "network_outage_50"): + lo, hi = self._bounds_for(method) + for _ in range(200): + g = self.runner._pick_outage_gap(method) + self.assertGreaterEqual(g, lo, + f"{method}: gap {g} below lo {lo}") + self.assertLessEqual(g, hi, + f"{method}: gap {g} above hi {hi}") + + def test_overlap_invariant_holds(self): + # The whole point: gap + min_overlap <= expected_min_unavail. + # For graceful/forced this is trivially true (huge sentinel). + # For network_outage_N this is the tight constraint. + for method in ("container_kill", "host_reboot", + "network_outage_20", "network_outage_50"): + unavail = self.runner._expected_min_unavail_seconds(method) + overlap = self.runner.args.min_outage_overlap + for _ in range(200): + g = self.runner._pick_outage_gap(method) + self.assertLessEqual( + g + overlap, unavail, + f"{method}: gap {g} + overlap {overlap} > unavail {unavail}" + " — the overlap invariant the cap exists to preserve has" + " been violated") + + +if __name__ == "__main__": + unittest.main()