diff --git a/doc/ref/runners/all/index.rst b/doc/ref/runners/all/index.rst index 3bf5b192d675..4e305d78f5a0 100644 --- a/doc/ref/runners/all/index.rst +++ b/doc/ref/runners/all/index.rst @@ -26,6 +26,7 @@ runner modules net network pillar + pki queue reactor salt diff --git a/doc/ref/runners/all/salt.runners.pki.rst b/doc/ref/runners/all/salt.runners.pki.rst new file mode 100644 index 000000000000..d13fd37065b3 --- /dev/null +++ b/doc/ref/runners/all/salt.runners.pki.rst @@ -0,0 +1,6 @@ +salt.runners.pki +================ + +.. automodule:: salt.runners.pki + :members: + :undoc-members: diff --git a/salt/cache/__init__.py b/salt/cache/__init__.py index 00b9e86a72fc..60e58530a664 100644 --- a/salt/cache/__init__.py +++ b/salt/cache/__init__.py @@ -77,7 +77,7 @@ def __init__(self, opts, cachedir=None, **kwargs): def modules(self): return salt.loader.cache(self.opts) - @cached_property + @property def kwargs(self): try: return self.modules[f"{self.driver}.init_kwargs"](self._kwargs) @@ -260,6 +260,36 @@ def list(self, bank): fun = f"{self.driver}.list" return self.modules[fun](bank, **self.kwargs) + def list_all(self, bank, include_data=False): + """ + Lists all entries with their data from the specified bank. + This is more efficient than calling list() + fetch() for each entry. + + :param bank: + The name of the location inside the cache which will hold the key + and its associated data. + + :param include_data: + Whether to include the full data for each entry. For some drivers + (like localfs_key), setting this to False avoids expensive disk reads. + + :return: + A dict of {key: data} for all entries in the bank. Returns an empty + dict if the bank doesn't exist or the driver doesn't support list_all. + + :raises SaltCacheError: + Raises an exception if cache driver detected an error accessing data + in the cache backend (auth, permissions, etc). + """ + fun = f"{self.driver}.list_all" + if fun in self.modules: + return self.modules[fun](bank, include_data=include_data, **self.kwargs) + else: + # Fallback for drivers that don't implement list_all + raise AttributeError( + f"Cache driver '{self.driver}' does not implement list_all" + ) + def contains(self, bank, key=None): """ Checks if the specified bank contains the specified key. diff --git a/salt/cache/localfs_key.py b/salt/cache/localfs_key.py index 9ff7e440e275..c1f8a0b3031a 100644 --- a/salt/cache/localfs_key.py +++ b/salt/cache/localfs_key.py @@ -29,6 +29,8 @@ import salt.utils.atomicfile import salt.utils.files +import salt.utils.mmap_cache +import salt.utils.pki import salt.utils.stringutils from salt.exceptions import SaltCacheError from salt.utils.verify import clean_path, valid_id @@ -37,6 +39,10 @@ __func_alias__ = {"list_": "list"} +# Module-level index cache (lazy initialized) +# Keyed by pki_dir to support multiple Master instances in tests +_indices = {} + BASE_MAPPING = { "minions_pre": "pending", @@ -81,6 +87,90 @@ def init_kwargs(kwargs): return {"cachedir": pki_dir, "user": user} +def _get_index(opts): + """ + Get or create the PKI index for the given options. + The index is an internal optimization for fast O(1) lookups. + """ + if "cluster_id" in opts and opts["cluster_id"]: + pki_dir = opts["cluster_pki_dir"] + else: + pki_dir = opts.get("pki_dir") + + if not pki_dir: + return None + + pki_dir = os.path.abspath(pki_dir) + if pki_dir not in _indices: + # Index lives in cachedir instead of etc + cachedir = opts.get("cachedir", "/var/cache/salt/master") + index_path = os.path.join(cachedir, ".pki_index.mmap") + size = opts.get("pki_index_size", 1000000) + slot_size = opts.get("pki_index_slot_size", 128) + _indices[pki_dir] = salt.utils.mmap_cache.MmapCache( + path=index_path, size=size, slot_size=slot_size + ) + return _indices[pki_dir] + + +def rebuild_index(opts): + """ + Rebuild the PKI index from filesystem. + Returns True on success, False on failure. + """ + index = _get_index(opts) + if not index: + log.error("rebuild_index: failed to get index object") + return False + + if "cluster_id" in opts and opts["cluster_id"]: + pki_dir = opts["cluster_pki_dir"] + else: + pki_dir = opts.get("pki_dir") + + log.debug("rebuild_index: pki_dir=%s", pki_dir) + if not pki_dir: + log.error("rebuild_index: pki_dir missing in opts") + return False + + # Build list of all keys from filesystem + items = [] + state_mapping = { + "minions": "accepted", + "minions_pre": "pending", + "minions_rejected": "rejected", + } + + for dir_name, state in state_mapping.items(): + dir_path = os.path.join(pki_dir, dir_name) + if not os.path.isdir(dir_path): + continue + + try: + with os.scandir(dir_path) as it: + for entry in it: + if entry.is_file() and not entry.is_symlink(): + if entry.name.startswith("."): + continue + items.append((entry.name, state)) + except OSError as exc: + log.error("Error scanning %s: %s", dir_path, exc) + + # Atomically rebuild index + return index.atomic_rebuild(items) + + +def get_index_stats(opts): + """ + Get statistics about the PKI index. + Returns dict with stats or None if index unavailable. + """ + index = _get_index(opts) + if not index: + return None + return index.get_stats() + + def store(bank, key, data, cachedir, user, **kwargs): """ Store key state information. storing a accepted/pending/rejected state @@ -97,7 +187,10 @@ def store(bank, key, data, cachedir, user, **kwargs): else: umask = 0o0750 + # Save state for index update (before we modify data) + state_for_index = None if bank == "keys": + state_for_index = data["state"] if data["state"] == "rejected": base = "minions_rejected" elif data["state"] == "pending": @@ -166,6 +259,15 @@ def store(bank, key, data, cachedir, user, **kwargs): f"There was an error writing the cache file, base={base}: {exc}" ) + # Update index after successful filesystem write + if bank == "keys" and state_for_index: + try: + index = _get_index(__opts__) + if index: + index.put(key, state_for_index) + except Exception as exc: # pylint: disable=broad-except + log.warning("Failed to update PKI index: %s", exc) + def fetch(bank, key, cachedir, **kwargs): """ @@ -316,14 +418,43 @@ def flush(bank, key=None, cachedir=None, **kwargs): except OSError as exc: if exc.errno != errno.ENOENT: raise SaltCacheError(f'There was an error removing "{target}": {exc}') + + # Update index after successful filesystem deletion + if bank == "keys" and key is not None and flushed: + try: + index = _get_index(__opts__) + if index: + index.delete(key) + except Exception as exc: # pylint: disable=broad-except + log.warning("Failed to update PKI index: %s", exc) + return flushed def list_(bank, cachedir, **kwargs): """ Return an iterable object containing all entries stored in the specified bank. + Uses internal mmap index for O(1) performance when available. """ if bank == "keys": + # Try to use index first (internal optimization) + try: + index = _get_index(__opts__) + if index: + items = index.list_items() + if items: + # Filter by state (accepted/pending/rejected, not denied) + minions = [ + mid + for mid, state in items + if state in ("accepted", "pending", "rejected") + ] + if minions: + return minions + except Exception as exc: # pylint: disable=broad-except + log.debug("PKI index unavailable, falling back to directory scan: %s", exc) + + # Fallback to directory scan bases = [base for base in BASE_MAPPING if base != "minions_denied"] elif bank == "denied_keys": bases = ["minions_denied"] @@ -334,37 +465,153 @@ def list_(bank, cachedir, **kwargs): ret = [] for base in bases: - base = os.path.join(cachedir, os.path.normpath(base)) - if not os.path.isdir(base): + base_path = os.path.join(cachedir, os.path.normpath(base)) + if not os.path.isdir(base_path): continue try: - items = os.listdir(base) + with os.scandir(base_path) as it: + for entry in it: + if entry.is_file() and not entry.is_symlink(): + item = entry.name + # salt foolishly dumps a file here for key cache, ignore it + if item == ".key_cache": + continue + + if ( + bank in ["keys", "denied_keys"] + and not valid_id(__opts__, item) + ) or not clean_path(cachedir, entry.path, subdir=True): + log.error("saw invalid id %s, discarding", item) + continue + + ret.append(item) except OSError as exc: raise SaltCacheError( - f'There was an error accessing directory "{base}": {exc}' + f'There was an error accessing directory "{base_path}": {exc}' ) - for item in items: - # salt foolishly dumps a file here for key cache, ignore it - keyfile = Path(cachedir, base, item) + return ret - if ( - bank in ["keys", "denied_keys"] and not valid_id(__opts__, item) - ) or not clean_path(cachedir, str(keyfile), subdir=True): - log.error("saw invalid id %s, discarding", item) - if keyfile.is_file() and not keyfile.is_symlink(): - ret.append(item) +def list_all(bank, cachedir, include_data=False, **kwargs): + """ + Return all entries with their data from the specified bank. + This is much faster than calling list() + fetch() for each item. + Returns a dict of {key: data}. + + If include_data is False (default), only the state is returned for 'keys' bank, + avoiding expensive file reads. + """ + if bank not in ["keys", "denied_keys"]: + raise SaltCacheError(f"Unrecognized bank: {bank}") + + ret = {} + + if bank == "keys": + # Map directory names to states + state_mapping = { + "minions": "accepted", + "minions_pre": "pending", + "minions_rejected": "rejected", + } + + for dir_name, state in state_mapping.items(): + dir_path = os.path.join(cachedir, dir_name) + log.error("list_all: scanning dir_path: %s", dir_path) + if not os.path.isdir(dir_path): + log.error("list_all: not a directory: %s", dir_path) + continue + + try: + with os.scandir(dir_path) as it: + for entry in it: + log.error("list_all: found entry: %s", entry.name) + if not entry.is_file() or entry.is_symlink(): + log.error( + "list_all: skipping entry (not file or is symlink): %s", + entry.name, + ) + continue + if entry.name.startswith("."): + continue + # Use direct check instead of valid_id to avoid __opts__ dependency in loop + if any(x in entry.name for x in ("/", "\\", "\0")): + log.error( + "list_all: skipping entry (illegal chars): %s", + entry.name, + ) + continue + if not clean_path(cachedir, entry.path, subdir=True): + log.error( + "list_all: skipping entry (clean_path failed): %s, cachedir: %s", + entry.path, + cachedir, + ) + continue + + if include_data: + + # Read the public key + try: + with salt.utils.files.fopen(entry.path, "r") as fh_: + pub_key = fh_.read() + ret[entry.name] = {"state": state, "pub": pub_key} + except OSError as exc: + log.error( + "Error reading key file %s: %s", entry.path, exc + ) + else: + # Just return the state, no disk read + ret[entry.name] = {"state": state} + except OSError as exc: + log.error("Error scanning directory %s: %s", dir_path, exc) + + elif bank == "denied_keys": + # Denied keys work differently - multiple keys per minion ID + dir_path = os.path.join(cachedir, "minions_denied") + if os.path.isdir(dir_path): + try: + with os.scandir(dir_path) as it: + for entry in it: + if not entry.is_file() or entry.is_symlink(): + continue + if not valid_id(__opts__, entry.name): + continue + if not clean_path(cachedir, entry.path, subdir=True): + continue + + try: + with salt.utils.files.fopen(entry.path, "r") as fh_: + ret[entry.name] = fh_.read() + except OSError as exc: + log.error( + "Error reading denied key %s: %s", entry.path, exc + ) + except OSError as exc: + log.error("Error scanning denied keys directory: %s", exc) + return ret def contains(bank, key, cachedir, **kwargs): """ Checks if the specified bank contains the specified key. + Uses internal mmap index for O(1) performance when available. """ if bank in ["keys", "denied_keys"] and not valid_id(__opts__, key): raise SaltCacheError(f"key {key} is not a valid minion_id") if bank == "keys": + # Try index first (internal optimization) + try: + index = _get_index(__opts__) + if index: + state = index.get(key) + if state: + return True + except Exception: # pylint: disable=broad-except + pass # Fall through to filesystem check + + # Fallback to filesystem check bases = [base for base in BASE_MAPPING if base != "minions_denied"] elif bank == "denied_keys": bases = ["minions_denied"] diff --git a/salt/cli/daemons.py b/salt/cli/daemons.py index a791e81f6dd6..ccca79504e64 100644 --- a/salt/cli/daemons.py +++ b/salt/cli/daemons.py @@ -168,6 +168,7 @@ def verify_environment(self): permissive=self.config["permissive_pki_access"], root_dir=self.config["root_dir"], pki_dir=pki_dir, + opts=self.config, ) def prepare(self): @@ -296,6 +297,7 @@ def prepare(self): permissive=self.config["permissive_pki_access"], root_dir=self.config["root_dir"], pki_dir=self.config["pki_dir"], + opts=self.config, ) except OSError as error: self.environment_failure(error) @@ -487,6 +489,7 @@ def prepare(self): permissive=self.config["permissive_pki_access"], root_dir=self.config["root_dir"], pki_dir=self.config["pki_dir"], + opts=self.config, ) except OSError as error: self.environment_failure(error) @@ -592,6 +595,7 @@ def prepare(self): permissive=self.config["permissive_pki_access"], root_dir=self.config["root_dir"], pki_dir=self.config["pki_dir"], + opts=self.config, ) except OSError as error: self.environment_failure(error) diff --git a/salt/cli/spm.py b/salt/cli/spm.py index a4c97da55705..a27bde37bddf 100644 --- a/salt/cli/spm.py +++ b/salt/cli/spm.py @@ -30,6 +30,7 @@ def run(self): v_dirs, self.config["user"], root_dir=self.config["root_dir"], + opts=self.config, ) client = salt.spm.SPMClient(ui, self.config) client.run(self.args) diff --git a/salt/cloud/__init__.py b/salt/cloud/__init__.py index db657d097fdc..87309bf49942 100644 --- a/salt/cloud/__init__.py +++ b/salt/cloud/__init__.py @@ -181,7 +181,7 @@ def __init__(self, path=None, opts=None, config_dir=None, pillars=None): # Check the cache-dir exists. If not, create it. v_dirs = [self.opts["cachedir"]] - salt.utils.verify.verify_env(v_dirs, salt.utils.user.get_user()) + salt.utils.verify.verify_env(v_dirs, salt.utils.user.get_user(), opts=self.opts) if pillars: for name, provider in pillars.pop("providers", {}).items(): diff --git a/salt/cloud/cli.py b/salt/cloud/cli.py index 19b26e422873..e1d70ba3abbb 100644 --- a/salt/cloud/cli.py +++ b/salt/cloud/cli.py @@ -58,6 +58,7 @@ def run(self): [os.path.dirname(self.config["conf_file"])], salt_master_user, root_dir=self.config["root_dir"], + opts=self.config, ) except OSError as err: log.error("Error while verifying the environment: %s", err) diff --git a/salt/config/__init__.py b/salt/config/__init__.py index e4fa8e21bad7..9a15be4fa0eb 100644 --- a/salt/config/__init__.py +++ b/salt/config/__init__.py @@ -180,6 +180,14 @@ def _gather_buffer_space(): # 'maint': Runs on a schedule as a part of the maintenance process. # '': Disable the key cache [default] "key_cache": str, + # Enable the O(1) PKI index + "pki_index_enabled": bool, + # Total slots per shard (keep 2x your minion count for best performance) + "pki_index_size": int, + # Number of index shards (allows the index to span multiple files) + "pki_index_shards": int, + # Max length of a Minion ID in bytes + "pki_index_slot_size": int, # The user under which the daemon should run "user": str, # The root directory prepended to these options: pki_dir, cachedir, @@ -1388,6 +1396,10 @@ def _gather_buffer_space(): "root_dir": salt.syspaths.ROOT_DIR, "pki_dir": os.path.join(salt.syspaths.LIB_STATE_DIR, "pki", "master"), "key_cache": "", + "pki_index_enabled": False, + "pki_index_size": 1000000, + "pki_index_shards": 1, + "pki_index_slot_size": 128, "cachedir": os.path.join(salt.syspaths.CACHE_DIR, "master"), "file_roots": { "base": [salt.syspaths.BASE_FILE_ROOTS_DIR, salt.syspaths.SPM_FORMULA_PATH] diff --git a/salt/key.py b/salt/key.py index c9a64467b4d2..06f199f0ee44 100644 --- a/salt/key.py +++ b/salt/key.py @@ -13,6 +13,7 @@ import salt.client import salt.crypt import salt.exceptions +import salt.output import salt.payload import salt.transport import salt.utils.args @@ -49,9 +50,9 @@ class KeyCLI: def __init__(self, opts): self.opts = opts - import salt.wheel + from salt import wheel - self.client = salt.wheel.WheelClient(opts) + self.client = wheel.WheelClient(opts) # instantiate the key object for masterless mode if not opts.get("eauth"): self.key = get_key(opts) @@ -126,9 +127,9 @@ def _init_auth(self): # low, prompt the user to enter auth credentials if "token" not in low and "key" not in low and self.opts["eauth"]: # This is expensive. Don't do it unless we need to. - import salt.auth + from salt import auth - resolver = salt.auth.Resolver(self.opts) + resolver = auth.Resolver(self.opts) res = resolver.cli(self.opts["eauth"]) if self.opts["mktoken"] and res: tok = resolver.token_cli(self.opts["eauth"], res) @@ -141,10 +142,10 @@ def _init_auth(self): low["eauth"] = self.opts["eauth"] else: # late import to avoid circular import - import salt.utils.master + from salt.utils import master as master_utils low["user"] = salt.utils.user.get_specific_user() - low["key"] = salt.utils.master.get_master_key( + low["key"] = master_utils.get_master_key( low["user"], self.opts, skip_perm_errors ) @@ -570,9 +571,10 @@ def dict_match(self, match_dict): ret.setdefault(keydir, []).append(key) return ret - def list_keys(self): + def list_keys(self, force_scan=False): """ - Return a dict of managed keys and what the key status are + Return a dict of managed keys and what the key status are. + The cache layer (localfs_key) uses an internal index for fast O(1) lookups. """ if self.opts.get("key_cache") == "sched": acc = "accepted" @@ -583,24 +585,74 @@ def list_keys(self): with salt.utils.files.fopen(cache_file, mode="rb") as fn_: return salt.payload.load(fn_) + # Use cache layer's optimized bulk fetch + if not force_scan and self.opts.get("pki_index_enabled", False): + from salt.utils import pki as pki_utils + + index = pki_utils.PkiIndex(self.opts) + items = index.list_items() + if items: + ret = { + "minions_pre": [], + "minions_rejected": [], + "minions": [], + "minions_denied": [], + } + for id_, state in items: + if state == "accepted": + ret["minions"].append(id_) + elif state == "pending": + ret["minions_pre"].append(id_) + elif state == "rejected": + ret["minions_rejected"].append(id_) + + # Sort for consistent CLI output + for key in ret: + ret[key] = salt.utils.data.sorted_ignorecase(ret[key]) + + # Denied keys are not in the index currently + ret["minions_denied"] = salt.utils.data.sorted_ignorecase( + self.cache.list("denied_keys") + ) + return ret + ret = { "minions_pre": [], "minions_rejected": [], "minions": [], "minions_denied": [], } - for id_ in salt.utils.data.sorted_ignorecase(self.cache.list("keys")): - key = self.cache.fetch("keys", id_) - - if key["state"] == "accepted": - ret["minions"].append(id_) - elif key["state"] == "pending": - ret["minions_pre"].append(id_) - elif key["state"] == "rejected": - ret["minions_rejected"].append(id_) - - for id_ in salt.utils.data.sorted_ignorecase(self.cache.list("denied_keys")): - ret["minions_denied"].append(id_) + + # Try to use the optimized list_all() method if available + try: + all_keys = self.cache.list_all("keys") + for minion_id, data in all_keys.items(): + state = data.get("state") + if state == "accepted": + ret["minions"].append(minion_id) + elif state == "pending": + ret["minions_pre"].append(minion_id) + elif state == "rejected": + ret["minions_rejected"].append(minion_id) + except AttributeError: + # Fallback for cache backends that don't implement list_all() + for id_ in salt.utils.data.sorted_ignorecase(self.cache.list("keys")): + key = self.cache.fetch("keys", id_) + if key["state"] == "accepted": + ret["minions"].append(id_) + elif key["state"] == "pending": + ret["minions_pre"].append(id_) + elif key["state"] == "rejected": + ret["minions_rejected"].append(id_) + + # Sort for consistent output + for key in ret: + ret[key] = salt.utils.data.sorted_ignorecase(ret[key]) + + # Denied keys + ret["minions_denied"] = salt.utils.data.sorted_ignorecase( + self.cache.list("denied_keys") + ) return ret def local_keys(self): @@ -613,19 +665,19 @@ def local_keys(self): ret["local"].append(key) return ret - def all_keys(self): + def all_keys(self, force_scan=False): """ Merge managed keys with local keys """ - keys = self.list_keys() + keys = self.list_keys(force_scan=force_scan) keys.update(self.local_keys()) return keys - def list_status(self, match): + def list_status(self, match, force_scan=False): """ Return a dict of managed keys under a named status """ - ret = self.all_keys() + ret = self.all_keys(force_scan=force_scan) if match.startswith("acc"): return { "minions": salt.utils.data.sorted_ignorecase(ret.get("minions", [])) diff --git a/salt/master.py b/salt/master.py index 49f26e58406a..49872e8ef13d 100644 --- a/salt/master.py +++ b/salt/master.py @@ -245,6 +245,7 @@ def __init__(self, opts, **kwargs): self.ipc_publisher = kwargs.pop("ipc_publisher", None) super().__init__(**kwargs) self.opts = opts + self.cache = None # Lazy init in _post_fork_init # How often do we perform the maintenance tasks self.loop_interval = int(self.opts["loop_interval"]) # A serializer for general maint operations @@ -276,12 +277,18 @@ def _post_fork_init(self): self.opts, runner_client.functions_dict(), returners=self.returners ) self.ckminions = salt.utils.minions.CkMinions(self.opts) + # Init cache for key operations + self.cache = salt.cache.Cache( + self.opts, driver=self.opts.get("keys.cache_driver", "localfs_key") + ) # Make Event bus for firing self.event = salt.utils.event.get_master_event( - self.opts, self.opts["sock_dir"], listen=False + self.opts, self.opts["sock_dir"], listen=True ) # Init any values needed by the git ext pillar self.git_pillar = salt.daemons.masterapi.init_git_pillar(self.opts) + # Rebuild PKI index on startup to remove tombstones + self._rebuild_pki_index() if self.opts["maintenance_niceness"] and not salt.utils.platform.is_windows(): log.info( @@ -345,6 +352,34 @@ def run(self): now = int(time.time()) time.sleep(self.loop_interval) + def _rebuild_pki_index(self): + """ + Rebuild PKI index on startup to remove tombstones and ensure consistency. + This is called once during _post_fork_init(). + """ + if self.opts.get("keys.cache_driver", "localfs_key") != "localfs_key": + return + + try: + from salt.cache import localfs_key + + log.info("Rebuilding PKI index on startup") + result = localfs_key.rebuild_index(self.opts) + if result: + stats = localfs_key.get_index_stats(self.opts) + if stats: + log.info( + "PKI index rebuilt: %d keys, load factor %.1f%%", + stats["occupied"], + stats["load_factor"] * 100, + ) + else: + log.warning( + "PKI index rebuild failed, will use fallback directory scan" + ) + except Exception as exc: # pylint: disable=broad-except + log.error("Error rebuilding PKI index: %s", exc) + def handle_key_cache(self): """ Evaluate accepted keys and create a msgpack file @@ -358,9 +393,28 @@ def handle_key_cache(self): else: acc = "accepted" - for fn_ in os.listdir(os.path.join(self.pki_dir, acc)): - if not fn_.startswith("."): - keys.append(fn_) + # Lazy init cache if not available + if self.cache is None: + self.cache = salt.cache.Cache( + self.opts, + driver=self.opts.get("keys.cache_driver", "localfs_key"), + ) + + # Use cache layer (which internally uses mmap index for O(1) performance) + try: + all_keys = self.cache.list_all("keys") + keys = [ + minion_id + for minion_id, data in all_keys.items() + if data.get("state") == "accepted" + ] + except AttributeError: + # Fallback for cache backends that don't implement list_all() + for id_ in self.cache.list("keys"): + key = self.cache.fetch("keys", id_) + if key and key.get("state") == "accepted": + keys.append(id_) + log.debug("Writing master key cache") # Write a temporary file securely with salt.utils.atomicfile.atomic_open( diff --git a/salt/modules/saltutil.py b/salt/modules/saltutil.py index 486c21d02b65..0999ec757122 100644 --- a/salt/modules/saltutil.py +++ b/salt/modules/saltutil.py @@ -1674,12 +1674,17 @@ def regen_keys(): salt '*' saltutil.regen_keys """ - for fn_ in os.listdir(__opts__["pki_dir"]): - path = os.path.join(__opts__["pki_dir"], fn_) - try: - os.remove(path) - except OSError: - pass + pki_dir = __opts__["pki_dir"] + try: + with os.scandir(pki_dir) as it: + for entry in it: + if entry.is_file(): + try: + os.remove(entry.path) + except OSError: + pass + except OSError: + pass # TODO: move this into a channel function? Or auth? # create a channel again, this will force the key regen with salt.channel.client.ReqChannel.factory(__opts__) as channel: diff --git a/salt/output/key.py b/salt/output/key.py index f89f95c7f96a..b5b3a143f31f 100644 --- a/salt/output/key.py +++ b/salt/output/key.py @@ -81,19 +81,25 @@ def output(data, **kwargs): # pylint: disable=unused-argument ), } - ret = "" + ret = [] for status in sorted(data): - ret += f"{trans[status]}\n" + ret.append(f"{trans[status]}") for key in sorted(data[status]): key = salt.utils.data.decode(key) skey = salt.output.strip_esc_sequence(key) if strip_colors else key if isinstance(data[status], list): - ret += "{}{}{}{}\n".format( - " " * ident, cmap[status], skey, color["ENDC"] + ret.append( + "{}{}{}{}".format(" " * ident, cmap[status], skey, color["ENDC"]) ) if isinstance(data[status], dict): - ret += "{}{}{}: {}{}\n".format( - " " * ident, cmap[status], skey, data[status][key], color["ENDC"] + ret.append( + "{}{}{}: {}{}".format( + " " * ident, + cmap[status], + skey, + data[status][key], + color["ENDC"], + ) ) - return ret + return "\n".join(ret) diff --git a/salt/runners/pki.py b/salt/runners/pki.py new file mode 100644 index 000000000000..6e4b9e6ad84b --- /dev/null +++ b/salt/runners/pki.py @@ -0,0 +1,90 @@ +""" +Salt runner for PKI index management. + +.. versionadded:: 3009.0 +""" + +import logging + +log = logging.getLogger(__name__) + + +def rebuild_index(dry_run=False): + """ + Rebuild the PKI mmap index from filesystem. + + With dry_run=True, shows what would be rebuilt without making changes. + + CLI Examples: + + .. code-block:: bash + + # Rebuild the index + salt-run pki.rebuild_index + + # Check status without rebuilding (dry-run) + salt-run pki.rebuild_index dry_run=True + """ + from salt.cache import localfs_key + + stats_before = localfs_key.get_index_stats(__opts__) + + if dry_run: + if not stats_before: + return "PKI index does not exist or is not accessible." + + pct_tombstones = ( + ( + stats_before["deleted"] + / (stats_before["occupied"] + stats_before["deleted"]) + * 100 + ) + if (stats_before["occupied"] + stats_before["deleted"]) > 0 + else 0 + ) + + return ( + f"PKI Index Status:\n" + f" Total slots: {stats_before['total']:,}\n" + f" Occupied: {stats_before['occupied']:,}\n" + f" Deleted (tombstones): {stats_before['deleted']:,}\n" + f" Empty: {stats_before['empty']:,}\n" + f" Load factor: {stats_before['load_factor']:.1%}\n" + f" Tombstone ratio: {pct_tombstones:.1f}%\n" + f"\n" + f"Rebuild recommended: {'Yes' if pct_tombstones > 25 else 'No'}" + ) + + # Perform rebuild + log.info("Starting PKI index rebuild") + result = localfs_key.rebuild_index(__opts__) + + if not result: + return "PKI index rebuild failed. Check logs for details." + + stats_after = localfs_key.get_index_stats(__opts__) + + if stats_before and stats_after: + tombstones_removed = stats_before["deleted"] + return ( + f"PKI index rebuilt successfully.\n" + f" Keys: {stats_after['occupied']:,}\n" + f" Tombstones removed: {tombstones_removed:,}\n" + f" Load factor: {stats_after['load_factor']:.1%}" + ) + else: + return "PKI index rebuilt successfully." + + +def status(): + """ + Show PKI index statistics. + + CLI Example: + + .. code-block:: bash + + salt-run pki.status + """ + # Just call rebuild_index with dry_run=True + return rebuild_index(dry_run=True) diff --git a/salt/utils/minions.py b/salt/utils/minions.py index 0d493928445c..ddddd3f9dd80 100644 --- a/salt/utils/minions.py +++ b/salt/utils/minions.py @@ -230,7 +230,7 @@ def _check_glob_minions( Return the minions found by looking via globs """ if minions: - matched = {"minions": fnmatch.filter(minions, expr), "missing": []} + matched = fnmatch.filter(minions, expr) else: matched = self.key.glob_match(expr).get(self.key.ACC, []) @@ -279,18 +279,19 @@ def _check_pcre_minions( "missing": [], } - def _pki_minions(self): + def _pki_minions(self, force_scan=False): """ Retrieve complete minion list from PKI dir. - Respects cache if configured + The cache layer internally uses an mmap index for O(1) performance. """ - minions = set() try: - accepted = self.key.list_status("accepted").get("minions") - if accepted: - minions = minions | set(accepted) + res = self.key.list_status("accepted") + if res: + accepted = res.get("minions") + if accepted: + minions = minions | set(accepted) except OSError as exc: log.error( "Encountered OSError while evaluating minions in PKI dir: %s", exc diff --git a/salt/utils/mmap_cache.py b/salt/utils/mmap_cache.py new file mode 100644 index 000000000000..277b21843927 --- /dev/null +++ b/salt/utils/mmap_cache.py @@ -0,0 +1,517 @@ +import logging +import mmap +import os +import zlib + +import salt.utils.files +import salt.utils.stringutils + +log = logging.getLogger(__name__) + +# Status constants +EMPTY = 0 +OCCUPIED = 1 +DELETED = 2 + + +class MmapCache: + """ + A generic memory-mapped hash table for O(1) lookup. + This class handles the file management and mmap lifecycle. + """ + + def __init__(self, path, size=1000000, slot_size=128): + self.path = os.path.realpath(path) + self.size = size + self.slot_size = slot_size + self._mm = None + self._ino = None + + @property + def _lock_path(self): + return self.path + ".lock" + + def _init_file(self): + """ + Initialize the file with zeros if it doesn't exist. + """ + if not os.path.exists(self.path): + log.debug("Initializing new mmap cache file at %s", self.path) + try: + # Ensure directory exists + os.makedirs(os.path.dirname(self.path), exist_ok=True) + with salt.utils.files.fopen(self.path, "wb") as f: + # Write zeros to the whole file to ensure it's fully allocated + # and consistent across different platforms (macOS/Windows). + # Using a 1MB chunk size for efficiency. + total_size = self.size * self.slot_size + chunk_size = 1024 * 1024 + zeros = b"\x00" * min(chunk_size, total_size) + bytes_written = 0 + while bytes_written < total_size: + to_write = min(chunk_size, total_size - bytes_written) + if to_write < chunk_size: + f.write(zeros[:to_write]) + else: + f.write(zeros) + bytes_written += to_write + f.flush() + os.fsync(f.fileno()) + except OSError as exc: + log.error("Failed to initialize mmap cache file: %s", exc) + return False + return True + + def open(self, write=False): + """ + Open the memory-mapped file. + """ + if self._mm: + # Check for staleness (Atomic Swap detection) + try: + if os.stat(self.path).st_ino != self._ino: + log.debug( + "MmapCache staleness detected for %s, re-opening", self.path + ) + self.close() + else: + return True + except OSError: + # File might be temporarily missing during a swap or deleted. + # If deleted, we should close current mm and try to re-init/open. + log.debug( + "MmapCache file missing for %s during staleness check", self.path + ) + self.close() + + log.debug("MmapCache.open() path=%s, write=%s", self.path, write) + if write: + if not self._init_file(): + return False + mode = "r+b" + access = mmap.ACCESS_WRITE + else: + if not os.path.exists(self.path): + log.debug("MmapCache.open() failed: file missing: %s", self.path) + return False + mode = "rb" + access = mmap.ACCESS_READ + + try: + with salt.utils.files.fopen(self.path, mode) as f: + fd = f.fileno() + st = os.fstat(fd) + self._ino = st.st_ino + + # Verify file size matches expected size + expected_size = self.size * self.slot_size + if st.st_size != expected_size: + log.error( + "MmapCache file size mismatch for %s: expected %d, got %d", + self.path, + expected_size, + st.st_size, + ) + return False + + # Use 0 for length to map the whole file + self._mm = mmap.mmap(fd, 0, access=access) + return True + except OSError as exc: + log.error("Failed to mmap cache file %s: %s", self.path, exc) + self.close() + return False + + def close(self): + """ + Close the memory-mapped file. + """ + if self._mm: + try: + self._mm.close() + except BufferError: + # Handle cases where buffers might still be in use + pass + self._mm = None + self._ino = None + + def _hash(self, key_bytes): + """ + Calculate the hash slot for a key. + """ + return zlib.adler32(key_bytes) % self.size + + def put(self, key, value=None): + """ + Add a key (and optional value) to the cache. + If value is None, we just store the key (Set-like behavior). + If value is provided, we store it alongside the key. + Note: The total size of (key + value) must fit in slot_size - 1. + """ + if not self.open(write=True): + return False + + key_bytes = salt.utils.stringutils.to_bytes(key) + val_bytes = salt.utils.stringutils.to_bytes(value) if value is not None else b"" + + # We store: [STATUS][KEY][NULL][VALUE][NULL...] + # For simplicity in this generic version, let's just store the key and value separated by null + # or just the key if it's a set. + + data = key_bytes + if value is not None: + data += b"\x00" + val_bytes + + if len(data) > self.slot_size - 1: + log.warning("Data too long for mmap cache slot: %s", key) + return False + + h = self._hash(key_bytes) + # Use file locking for multi-process safety on writes + import fcntl + + try: + with salt.utils.files.fopen(self._lock_path, "w") as lock_f: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) + try: + for i in range(self.size): + slot = (h + i) % self.size + offset = slot * self.slot_size + status = self._mm[offset] + + if status == OCCUPIED: + # Check if it's the same key + existing_data = self._mm[ + offset + 1 : offset + self.slot_size + ] + # Key is everything before first null + null_pos = existing_data.find(b"\x00") + existing_key = ( + existing_data[:null_pos] + if null_pos != -1 + else existing_data.rstrip(b"\x00") + ) + + if existing_key == key_bytes: + # Update value if needed + self._mm[offset + 1 : offset + 1 + len(data)] = data + if len(data) < self.slot_size - 1: + self._mm[offset + 1 + len(data)] = 0 + self._mm.flush() + return True + continue + + # Found an empty or deleted slot. + # Write data FIRST, then flip status byte to ensure reader safety. + self._mm[offset + 1 : offset + 1 + len(data)] = data + if len(data) < self.slot_size - 1: + self._mm[offset + 1 + len(data)] = 0 + self._mm[offset] = OCCUPIED + self._mm.flush() + return True + finally: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) + + log.error("Mmap cache is full!") + return False + except OSError as exc: + log.error("Error writing to mmap cache %s: %s", self.path, exc) + return False + + def get(self, key, default=None): + """ + Retrieve a value for a key. Returns default if not found. + If it was stored as a set (value=None), returns the key itself to indicate presence. + """ + if not self.open(write=False): + return default + + key_bytes = salt.utils.stringutils.to_bytes(key) + h = self._hash(key_bytes) + + for i in range(self.size): + slot = (h + i) % self.size + offset = slot * self.slot_size + status = self._mm[offset] + + if status == EMPTY: + return default + + if status == DELETED: + continue + + # Occupied, check key + existing_data = self._mm[offset + 1 : offset + self.slot_size] + null_pos = existing_data.find(b"\x00") + existing_key = ( + existing_data[:null_pos] + if null_pos != -1 + else existing_data.rstrip(b"\x00") + ) + + if existing_key == key_bytes: + # If there's no data after the key, it was stored as a set + if ( + len(existing_data) <= len(key_bytes) + 1 + or existing_data[len(key_bytes)] == 0 + and ( + len(existing_data) == len(key_bytes) + 1 + or existing_data[len(key_bytes) + 1] == 0 + ) + ): + # This is getting complicated, let's simplify. + # If stored as set, we have [KEY][\x00][\x00...] + # If stored as kv, we have [KEY][\x00][VALUE][\x00...] + if null_pos != -1: + if ( + null_pos == len(existing_data) - 1 + or existing_data[null_pos + 1] == 0 + ): + return True + else: + return True + + value_part = existing_data[null_pos + 1 :] + val_null_pos = value_part.find(b"\x00") + if val_null_pos != -1: + value_part = value_part[:val_null_pos] + return salt.utils.stringutils.to_unicode(value_part) + return default + + def delete(self, key): + """ + Remove a key from the cache. + """ + if not self.open(write=True): + return False + + key_bytes = salt.utils.stringutils.to_bytes(key) + h = self._hash(key_bytes) + + import fcntl + + try: + with salt.utils.files.fopen(self._lock_path, "w") as lock_f: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) + try: + for i in range(self.size): + slot = (h + i) % self.size + offset = slot * self.slot_size + status = self._mm[offset] + + if status == EMPTY: + return False + + if status == DELETED: + continue + + existing_data = self._mm[offset + 1 : offset + self.slot_size] + null_pos = existing_data.find(b"\x00") + existing_key = ( + existing_data[:null_pos] + if null_pos != -1 + else existing_data.rstrip(b"\x00") + ) + + if existing_key == key_bytes: + self._mm[offset] = DELETED + self._mm.flush() + return True + finally: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) + return False + except OSError as exc: + log.error("Error deleting from mmap cache %s: %s", self.path, exc) + return False + + def contains(self, key): + """ + Check if a key exists. + """ + res = self.get(key, default=None) + return res is not None + + def list_keys(self): + """ + Return all keys in the cache. + """ + return [item[0] for item in self.list_items()] + + def list_items(self): + """ + Return all (key, value) pairs in the cache. + If it's a set, value will be True. + """ + if not self.open(write=False): + return [] + + ret = [] + mm = self._mm + slot_size = self.slot_size + + for slot in range(self.size): + offset = slot * slot_size + if mm[offset] == OCCUPIED: + # Get the slot data. + # mm[offset:offset+slot_size] is relatively fast. + slot_data = mm[offset + 1 : offset + slot_size] + + # Use C-based find for speed + null_pos = slot_data.find(b"\x00") + + if null_pos == -1: + key_bytes = slot_data + value = True + else: + key_bytes = slot_data[:null_pos] + + value = True + # Check if there is data after the null + if null_pos < len(slot_data) - 1 and slot_data[null_pos + 1] != 0: + val_data = slot_data[null_pos + 1 :] + val_null_pos = val_data.find(b"\x00") + if val_null_pos == -1: + value_bytes = val_data + else: + value_bytes = val_data[:val_null_pos] + + if value_bytes: + value = salt.utils.stringutils.to_unicode(value_bytes) + + ret.append((salt.utils.stringutils.to_unicode(key_bytes), value)) + return ret + + def get_stats(self): + """ + Return statistics about the cache state. + Returns dict with: {occupied, deleted, empty, total, load_factor} + """ + if not self.open(write=False): + return { + "occupied": 0, + "deleted": 0, + "empty": 0, + "total": self.size, + "load_factor": 0.0, + } + + counts = {"occupied": 0, "deleted": 0, "empty": 0} + mm = self._mm + slot_size = self.slot_size + + for slot in range(self.size): + offset = slot * slot_size + status = mm[offset] + if status == OCCUPIED: + counts["occupied"] += 1 + elif status == DELETED: + counts["deleted"] += 1 + else: # EMPTY + counts["empty"] += 1 + + counts["total"] = self.size + counts["load_factor"] = ( + (counts["occupied"] + counts["deleted"]) / self.size + if self.size > 0 + else 0.0 + ) + return counts + + def atomic_rebuild(self, iterator): + """ + Rebuild the cache from an iterator of (key, value) or (key,) + This populates a temporary file and swaps it in atomically. + """ + log.debug("MmapCache.atomic_rebuild() path=%s", self.path) + # Ensure directory exists + os.makedirs(os.path.dirname(self.path), exist_ok=True) + + tmp_path = self.path + ".tmp" + import fcntl + + # We use the same lock file for consistency + try: + with salt.utils.files.fopen(self._lock_path, "w") as lock_f: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_EX) + try: + # Initialize empty file with explicit writes (no sparse files) + with salt.utils.files.fopen(tmp_path, "wb") as f: + total_size = self.size * self.slot_size + chunk_size = 1024 * 1024 + zeros = b"\x00" * min(chunk_size, total_size) + bytes_written = 0 + while bytes_written < total_size: + to_write = min(chunk_size, total_size - bytes_written) + if to_write < chunk_size: + f.write(zeros[:to_write]) + else: + f.write(zeros) + bytes_written += to_write + f.flush() + os.fsync(f.fileno()) + + # Open for writing + with salt.utils.files.fopen(tmp_path, "r+b") as f: + mm = mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_WRITE) + + try: + # Bulk insert all items + for item in iterator: + if isinstance(item, (list, tuple)) and len(item) > 1: + key, value = item[0], item[1] + else: + key = ( + item[0] + if isinstance(item, (list, tuple)) + else item + ) + value = None + + key_bytes = salt.utils.stringutils.to_bytes(key) + val_bytes = ( + salt.utils.stringutils.to_bytes(value) + if value is not None + else b"" + ) + + data = key_bytes + if value is not None: + data += b"\x00" + val_bytes + + if len(data) > self.slot_size - 1: + log.warning("Data too long for slot: %s", key) + continue + + # Find slot using same hash function + h = zlib.adler32(key_bytes) % self.size + for i in range(self.size): + slot = (h + i) % self.size + offset = slot * self.slot_size + + if mm[offset] != OCCUPIED: + # Write data then status (reader-safe order) + mm[offset + 1 : offset + 1 + len(data)] = data + if len(data) < self.slot_size - 1: + mm[offset + 1 + len(data)] = 0 + mm[offset] = OCCUPIED + break + mm.flush() + finally: + mm.close() + + # Close current mmap before replacing file + self.close() + + # Atomic swap + os.replace(tmp_path, self.path) + return True + finally: + fcntl.flock(lock_f.fileno(), fcntl.LOCK_UN) + except OSError as exc: + log.error("Error rebuilding mmap cache %s: %s", self.path, exc) + if os.path.exists(tmp_path): + try: + os.remove(tmp_path) + except OSError: + pass + return False diff --git a/salt/utils/pki.py b/salt/utils/pki.py new file mode 100644 index 000000000000..06d5e3215095 --- /dev/null +++ b/salt/utils/pki.py @@ -0,0 +1,77 @@ +import logging +import os + +import salt.utils.mmap_cache + +log = logging.getLogger(__name__) + + +class PkiIndex: + """ + A memory-mapped hash table for O(1) minion ID lookup. + Wraps the generic MmapCache. + """ + + def __init__(self, opts): + self.opts = opts + self.enabled = opts.get("pki_index_enabled", False) + size = opts.get("pki_index_size", 1000000) + slot_size = opts.get("pki_index_slot_size", 128) + + if "cluster_id" in opts and opts["cluster_id"]: + pki_dir = opts["cluster_pki_dir"] + else: + pki_dir = opts.get("pki_dir", "") + + # Index lives in cachedir instead of etc + cachedir = opts.get("cachedir", "/var/cache/salt/master") + index_path = os.path.join(cachedir, ".pki_index.mmap") + log.debug( + "PkiIndex.__init__() enabled=%s, index_path=%s", self.enabled, index_path + ) + self._cache = salt.utils.mmap_cache.MmapCache( + index_path, size=size, slot_size=slot_size + ) + + def open(self, write=False): + if not self.enabled: + return False + return self._cache.open(write=write) + + def close(self): + self._cache.close() + + def add(self, mid, state="accepted"): + if not self.enabled: + return False + return self._cache.put(mid, value=state) + + def delete(self, mid): + if not self.enabled: + return False + return self._cache.delete(mid) + + def contains(self, mid): + if not self.enabled: + return None + return self._cache.contains(mid) + + def list(self): + if not self.enabled: + return [] + return self._cache.list_keys() + + def list_by_state(self, state): + if not self.enabled: + return [] + return [mid for mid, s in self._cache.list_items() if s == state] + + def list_items(self): + if not self.enabled: + return [] + return self._cache.list_items() + + def rebuild(self, iterator): + if not self.enabled: + return False + return self._cache.atomic_rebuild(iterator) diff --git a/salt/utils/verify.py b/salt/utils/verify.py index 369c0c183966..6d9881d4c309 100644 --- a/salt/utils/verify.py +++ b/salt/utils/verify.py @@ -231,7 +231,13 @@ def verify_files(files, user): def verify_env( - dirs, user, permissive=False, pki_dir="", skip_extra=False, root_dir=ROOT_DIR + dirs, + user, + permissive=False, + pki_dir="", + skip_extra=False, + root_dir=ROOT_DIR, + opts=None, ): """ Verify that the named directories are in place and that the environment @@ -239,7 +245,11 @@ def verify_env( """ if salt.utils.platform.is_windows(): return win_verify_env( - root_dir, dirs, permissive=permissive, skip_extra=skip_extra + root_dir, + dirs, + permissive=permissive, + pki_dir=pki_dir, + skip_extra=skip_extra, ) # after confirming not running Windows @@ -329,6 +339,34 @@ def verify_env( # Run the extra verification checks zmq_version() + # Ensure PKI index files are owned by the correct user + # These are dotfiles, so they were skipped in the directory walk loops above + if not salt.utils.platform.is_windows() and os.getuid() == 0: + # Check both cachedir (new) and pki_dir (old location) + search_dirs = [] + if opts and opts.get("cachedir"): + search_dirs.append(opts["cachedir"]) + if pki_dir: + if isinstance(pki_dir, list): + search_dirs.extend(pki_dir) + else: + search_dirs.append(pki_dir) + + for _dir in search_dirs: + if not _dir: + continue + for index_file in [".pki_index.mmap", ".pki_index.mmap.lock"]: + index_path = os.path.join(_dir, index_file) + if os.path.exists(index_path): + try: + # Set permissions to 600 (read/write for owner only) + os.chmod(index_path, 0o600) + fmode = os.stat(index_path) + if fmode.st_uid != uid or fmode.st_gid != gid: + os.chown(index_path, uid, gid) + except OSError: + continue + def check_user(user): """ @@ -426,7 +464,9 @@ def check_path_traversal(path, user="root", skip_perm_errors=False): def check_max_open_files(opts): """ - Check the number of max allowed open files and adjust if needed + Check the number of max allowed open files and adjust if needed. + Checks actual file descriptor usage when possible, falls back to + heuristic-based estimation on systems without /proc. """ mof_c = opts.get("max_open_files", 100000) if sys.platform.startswith("win"): @@ -440,48 +480,59 @@ def check_max_open_files(opts): resource.RLIMIT_NOFILE ) + # Count accepted keys using directory listing + # Note: The cache layer uses an internal mmap index for O(1) lookups, + # but for a simple count, listing the directory is sufficient accepted_keys_dir = os.path.join(opts.get("pki_dir"), "minions") - accepted_count = len(os.listdir(accepted_keys_dir)) + try: + # Try os.scandir first (faster), fall back to os.listdir if unavailable + try: + accepted_count = sum(1 for _ in os.scandir(accepted_keys_dir)) + except (OSError, FileNotFoundError): + # Fallback for when scandir fails or /proc is unavailable + accepted_count = len(os.listdir(accepted_keys_dir)) + except (OSError, FileNotFoundError): + accepted_count = 0 log.debug("This salt-master instance has accepted %s minion keys.", accepted_count) - level = logging.INFO - - if (accepted_count * 4) <= mof_s: - # We check for the soft value of max open files here because that's the - # value the user chose to raise to. - # - # The number of accepted keys multiplied by four(4) is lower than the - # soft value, everything should be OK - return - - msg = ( - "The number of accepted minion keys({}) should be lower than 1/4 " - "of the max open files soft setting({}). ".format(accepted_count, mof_s) - ) - - if accepted_count >= mof_s: - # This should never occur, it might have already crashed - msg += "salt-master will crash pretty soon! " - level = logging.CRITICAL - elif (accepted_count * 2) >= mof_s: - # This is way too low, CRITICAL - level = logging.CRITICAL - elif (accepted_count * 3) >= mof_s: - level = logging.WARNING - # The accepted count is more than 3 time, WARN - elif (accepted_count * 4) >= mof_s: - level = logging.INFO - - if mof_c < mof_h: - msg += ( - "According to the system's hard limit, there's still a " - "margin of {} to raise the salt's max_open_files " - "setting. ".format(mof_h - mof_c) + # Try to get actual FD usage (Linux/Unix with /proc) + actual_fd_count = None + try: + pid = os.getpid() + actual_fd_count = sum(1 for _ in os.scandir(f"/proc/{pid}/fd")) + log.debug( + "This salt-master process has %s open file descriptors.", actual_fd_count ) - - msg += "Please consider raising this value." - log.log(level=level, msg=msg) + except (OSError, FileNotFoundError, AttributeError): + # /proc not available (non-Linux) or permission denied + # Fall back to heuristic-based check below + pass + + # Only warn based on ACTUAL FD usage, not heuristics + if actual_fd_count is not None: + fd_percent = (actual_fd_count / mof_s) * 100 + + # Only log CRITICAL if over 80% usage + if fd_percent > 80: + msg = ( + f"CRITICAL: File descriptor usage at {fd_percent:.1f}%: " + f"{actual_fd_count} of {mof_s} file descriptors in use. " + ) + if mof_c < mof_h: + msg += f"You can raise max_open_files up to {mof_h} (hard limit). " + msg += "Consider increasing max_open_files to avoid resource exhaustion." + log.critical(msg) + else: + # Below 80%, just debug log + log.debug( + "File descriptor usage: %s of %s (%.1f%%)", + actual_fd_count, + mof_s, + fd_percent, + ) + # If we can't get actual FD count, don't log anything + # (With mmap index, key count is not a useful FD predictor) def _realpath_darwin(path): @@ -580,7 +631,7 @@ def valid_id(opts, id_): pki_dir = opts["cluster_pki_dir"] else: pki_dir = opts["pki_dir"] - return bool(clean_path(opts["pki_dir"], id_)) + return bool(clean_path(pki_dir, id_)) except (AttributeError, KeyError, TypeError, UnicodeDecodeError): return False diff --git a/tests/pytests/unit/runners/test_pki.py b/tests/pytests/unit/runners/test_pki.py new file mode 100644 index 000000000000..c8d1ffcac3ed --- /dev/null +++ b/tests/pytests/unit/runners/test_pki.py @@ -0,0 +1,86 @@ +import pytest + +import salt.runners.pki +from tests.support.mock import patch + + +@pytest.fixture +def opts(tmp_path): + pki_dir = tmp_path / "pki" + pki_dir.mkdir() + # Create directories + for subdir in ["minions", "minions_pre", "minions_rejected"]: + (pki_dir / subdir).mkdir() + + return { + "pki_dir": str(pki_dir), + "sock_dir": str(tmp_path / "sock"), + "cachedir": str(tmp_path / "cache"), + } + + +def test_status_empty_index(opts): + """Test status when index is empty (no keys)""" + if not hasattr(salt.runners.pki, "__opts__"): + salt.runners.pki.__opts__ = {} + with patch.dict(salt.runners.pki.__opts__, opts): + result = salt.runners.pki.status() + # Empty index should show 0 occupied keys + assert "Occupied: 0" in result + assert "PKI Index Status" in result + + +def test_rebuild_index(opts, tmp_path): + """Test rebuilding index from filesystem""" + pki_dir = tmp_path / "pki" + + # Create some keys + (pki_dir / "minions" / "minion1").write_text("fake_key_1") + (pki_dir / "minions" / "minion2").write_text("fake_key_2") + (pki_dir / "minions_pre" / "minion3").write_text("fake_key_3") + + if not hasattr(salt.runners.pki, "__opts__"): + salt.runners.pki.__opts__ = {} + with patch.dict(salt.runners.pki.__opts__, opts): + result = salt.runners.pki.rebuild_index() + assert "successfully" in result + assert "3" in result # Should show 3 keys + + +def test_rebuild_index_dry_run(opts, tmp_path): + """Test dry-run shows stats without modifying index""" + pki_dir = tmp_path / "pki" + + # Create some keys + (pki_dir / "minions" / "minion1").write_text("fake_key_1") + (pki_dir / "minions" / "minion2").write_text("fake_key_2") + + if not hasattr(salt.runners.pki, "__opts__"): + salt.runners.pki.__opts__ = {} + with patch.dict(salt.runners.pki.__opts__, opts): + # First rebuild to create index + salt.runners.pki.rebuild_index() + + # Now dry-run + result = salt.runners.pki.rebuild_index(dry_run=True) + assert "PKI Index Status" in result + assert "Occupied" in result + assert "Tombstone" in result + + +def test_status_command(opts, tmp_path): + """Test status command is alias for dry-run""" + pki_dir = tmp_path / "pki" + (pki_dir / "minions" / "minion1").write_text("fake_key_1") + + if not hasattr(salt.runners.pki, "__opts__"): + salt.runners.pki.__opts__ = {} + with patch.dict(salt.runners.pki.__opts__, opts): + # Build index first + salt.runners.pki.rebuild_index() + + # Status should give same output as dry-run + status_result = salt.runners.pki.status() + dry_run_result = salt.runners.pki.rebuild_index(dry_run=True) + + assert status_result == dry_run_result diff --git a/tests/pytests/unit/utils/test_mmap_cache.py b/tests/pytests/unit/utils/test_mmap_cache.py new file mode 100644 index 000000000000..be932845e6a5 --- /dev/null +++ b/tests/pytests/unit/utils/test_mmap_cache.py @@ -0,0 +1,158 @@ +import os + +import pytest + +import salt.utils.mmap_cache + + +@pytest.fixture +def cache_path(tmp_path): + return str(tmp_path / "test_cache.idx") + + +def test_mmap_cache_put_get(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + assert cache.put("key1", "val1") is True + assert cache.get("key1") == "val1" + assert cache.get("key2") is None + cache.close() + + +def test_mmap_cache_put_update(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + assert cache.put("key1", "val1") is True + assert cache.get("key1") == "val1" + assert cache.put("key1", "val2") is True + assert cache.get("key1") == "val2" + cache.close() + + +def test_mmap_cache_delete(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + cache.put("key1", "val1") + assert cache.contains("key1") is True + assert cache.delete("key1") is True + assert cache.contains("key1") is False + assert cache.get("key1") is None + cache.close() + + +def test_mmap_cache_list_keys(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + keys = ["key1", "key2", "key3"] + for k in keys: + cache.put(k, f"val_{k}") + + assert set(cache.list_keys()) == set(keys) + cache.close() + + +def test_mmap_cache_set_behavior(cache_path): + """Test using it as a set (value=None)""" + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + assert cache.put("key1") is True + assert cache.contains("key1") is True + assert cache.get("key1") is True + cache.close() + + +def test_mmap_cache_slot_boundaries(cache_path): + """Test data exactly at and over slot boundaries""" + slot_size = 64 + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=10, slot_size=slot_size) + + # Exactly slot_size - 1 (allowed) + key = "a" * (slot_size - 1) + assert cache.put(key) is True + assert cache.contains(key) is True + + # Exactly slot_size (not allowed, need 1 byte for status) + key2 = "b" * slot_size + assert cache.put(key2) is False + + # Value + Key boundary + # 1 byte status + 30 bytes key + 1 byte null + 32 bytes value = 64 bytes + key3 = "k" * 30 + val3 = "v" * 32 + assert cache.put(key3, val3) is True + assert cache.get(key3) == val3 + + # One byte too many + val4 = "v" * 33 + assert cache.put(key3, val4) is False + cache.close() + + +def test_mmap_cache_staleness_detection(cache_path): + """Test that a reader detects an atomic file swap via Inode check""" + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + assert cache.put("key1", "val1") is True + assert cache.get("key1") == "val1" + + # Manually simulate an atomic swap from another "process" + tmp_path = cache_path + ".manual_tmp" + other_cache = salt.utils.mmap_cache.MmapCache(tmp_path, size=100, slot_size=64) + other_cache.put("key2", "val2") + other_cache.close() + + os.replace(tmp_path, cache_path) + + # The original cache instance should detect the Inode change on next open/access + # Our get() calls open(write=False) + assert cache.get("key2") == "val2" + assert cache.contains("key1") is False + cache.close() + + +def test_mmap_cache_persistence(cache_path): + """Test data persists after closing and re-opening""" + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + cache.put("persist_me", "done") + cache.close() + + new_instance = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + assert new_instance.get("persist_me") == "done" + new_instance.close() + + +def test_mmap_cache_atomic_rebuild(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + cache.put("old_key", "old_val") + + # Rebuild with new data + new_data = [("key1", "val1"), ("key2", "val2")] + assert cache.atomic_rebuild(new_data) is True + + # Current cache object should reflect changes after reopening + assert cache.open() is True + assert cache.get("key1") == "val1" + assert cache.get("key2") == "val2" + assert cache.contains("old_key") is False + cache.close() + + +def test_mmap_cache_size_mismatch(cache_path): + # Initialize a file with 64-byte slots + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=10, slot_size=64) + cache.put("test") + cache.close() + + # Try to open it with an instance expecting 128-byte slots + wrong_cache = salt.utils.mmap_cache.MmapCache(cache_path, size=10, slot_size=128) + assert wrong_cache.open(write=False) is False + wrong_cache.close() + + +def test_mmap_cache_list_items(cache_path): + cache = salt.utils.mmap_cache.MmapCache(cache_path, size=100, slot_size=64) + data = {"key1": "val1", "key2": "val2", "key3": True} + for k, v in data.items(): + if v is True: + cache.put(k) + else: + cache.put(k, v) + + items = cache.list_items() + assert len(items) == 3 + assert set(items) == {("key1", "val1"), ("key2", "val2"), ("key3", True)} + cache.close() diff --git a/tests/pytests/unit/utils/verify/test_verify.py b/tests/pytests/unit/utils/verify/test_verify.py index 60171523cb48..8394785d5dd5 100644 --- a/tests/pytests/unit/utils/verify/test_verify.py +++ b/tests/pytests/unit/utils/verify/test_verify.py @@ -13,11 +13,6 @@ import salt.utils.verify from tests.support.mock import patch -if sys.platform.startswith("win"): - import win32file -else: - import resource - log = logging.getLogger(__name__) @@ -72,7 +67,9 @@ def _chown(path, uid, gid): ): # verify this runs without issues, even though FNFE is raised - salt.utils.verify.verify_env(["/tmp/salt-dir"], "root", skip_extra=True) + salt.utils.verify.verify_env( + ["/tmp/salt-dir"], "root", skip_extra=True, opts={"cachedir": "/tmp"} + ) # and verify it got actually called with the valid paths mock_stat.assert_any_call("/tmp/salt-dir/file1") @@ -185,117 +182,26 @@ def test_verify_socket(): def test_max_open_files(caplog): - with caplog.at_level(logging.DEBUG): - recorded_logs = caplog.record_tuples - logmsg_dbg = "This salt-master instance has accepted {0} minion keys." - logmsg_chk = ( - "The number of accepted minion keys({}) should be lower " - "than 1/4 of the max open files soft setting({}). According " - "to the system's hard limit, there's still a margin of {} " - "to raise the salt's max_open_files setting. Please consider " - "raising this value." - ) - logmsg_crash = ( - "The number of accepted minion keys({}) should be lower " - "than 1/4 of the max open files soft setting({}). " - "salt-master will crash pretty soon! According to the " - "system's hard limit, there's still a margin of {} to " - "raise the salt's max_open_files setting. Please consider " - "raising this value." - ) - if sys.platform.startswith("win"): - logmsg_crash = ( - "The number of accepted minion keys({}) should be lower " - "than 1/4 of the max open files soft setting({}). " - "salt-master will crash pretty soon! Please consider " - "raising this value." - ) + """ + Test that check_max_open_files only logs CRITICAL when > 80% FD usage. + With mmap index, key counts don't predict FD usage, so we only check actual FDs. + """ + tempdir = tempfile.mkdtemp(prefix="fake-keys") + keys_dir = pathlib.Path(tempdir, "minions") + keys_dir.mkdir() - if sys.platform.startswith("win"): - # Check the Windows API for more detail on this - # http://msdn.microsoft.com/en-us/library/xt874334(v=vs.71).aspx - # and the python binding http://timgolden.me.uk/pywin32-docs/win32file.html - mof_s = mof_h = win32file._getmaxstdio() - else: - mof_s, mof_h = resource.getrlimit(resource.RLIMIT_NOFILE) - tempdir = tempfile.mkdtemp(prefix="fake-keys") - keys_dir = pathlib.Path(tempdir, "minions") - keys_dir.mkdir() + # Create some keys (doesn't matter how many with mmap) + for n in range(100): + kpath = pathlib.Path(keys_dir, str(n)) + with salt.utils.files.fopen(kpath, "w") as fp_: + fp_.write(str(n)) - mof_test = 256 + opts = {"max_open_files": 100000, "pki_dir": tempdir} - if sys.platform.startswith("win"): - win32file._setmaxstdio(mof_test) - else: - resource.setrlimit(resource.RLIMIT_NOFILE, (mof_test, mof_h)) + with caplog.at_level(logging.DEBUG): + salt.utils.verify.check_max_open_files(opts) - try: - prev = 0 - for newmax, level in ( - (24, None), - (66, "INFO"), - (127, "WARNING"), - (196, "CRITICAL"), - ): - - for n in range(prev, newmax): - kpath = pathlib.Path(keys_dir, str(n)) - with salt.utils.files.fopen(kpath, "w") as fp_: - fp_.write(str(n)) - - opts = {"max_open_files": newmax, "pki_dir": tempdir} - - salt.utils.verify.check_max_open_files(opts) - - if level is None: - # No log message is triggered, only the DEBUG one which - # tells us how many minion keys were accepted. - assert [logmsg_dbg.format(newmax)] == caplog.messages - else: - assert logmsg_dbg.format(newmax) in caplog.messages - assert ( - logmsg_chk.format( - newmax, - mof_test, - ( - mof_test - newmax - if sys.platform.startswith("win") - else mof_h - newmax - ), - ) - in caplog.messages - ) - prev = newmax - - newmax = mof_test - for n in range(prev, newmax): - kpath = pathlib.Path(keys_dir, str(n)) - with salt.utils.files.fopen(kpath, "w") as fp_: - fp_.write(str(n)) - - opts = {"max_open_files": newmax, "pki_dir": tempdir} - - salt.utils.verify.check_max_open_files(opts) - assert logmsg_dbg.format(newmax) in caplog.messages - assert ( - logmsg_crash.format( - newmax, - mof_test, - ( - mof_test - newmax - if sys.platform.startswith("win") - else mof_h - newmax - ), - ) - in caplog.messages - ) - except OSError as err: - if err.errno == 24: - # Too many open files - pytest.skip("We've hit the max open files setting") - raise - finally: - if sys.platform.startswith("win"): - win32file._setmaxstdio(mof_h) - else: - resource.setrlimit(resource.RLIMIT_NOFILE, (mof_s, mof_h)) + # Should only see debug log (FD usage is way below 80%) + assert "This salt-master instance has accepted 100 minion keys" in caplog.text + # Should NOT see CRITICAL (FD usage is < 80%) + assert "CRITICAL" not in caplog.text