From 54d35c8e1f826be921d363850aa3e6fc7d5cad65 Mon Sep 17 00:00:00 2001 From: Ignace Mouzannar Date: Thu, 2 Apr 2026 21:46:25 -0400 Subject: [PATCH] Security: harden command execution against shell-path hijacking Use trusted absolute shell interpreters for non-sudo/su execution, add breakout regression coverage, and keep compatibility/lint coverage aligned. --- lshell/builtincmd.py | 22 +- lshell/engine/authorizer.py | 60 +- lshell/expansion_inspector.py | 512 ++++++++++++++++++ lshell/sec.py | 453 +++++++++------- lshell/utils.py | 87 ++- test/test_breakout_regressions.py | 179 ++++++ test/test_engine_pipeline_unit.py | 230 ++++++++ ...test_security_attack_surface_unit_part2.py | 314 +++++++++++ ...test_security_attack_surface_unit_part3.py | 77 +++ test/test_source_command_unit.py | 19 + test/test_unit.py | 6 + 11 files changed, 1713 insertions(+), 246 deletions(-) create mode 100644 lshell/expansion_inspector.py create mode 100644 test/test_breakout_regressions.py create mode 100644 test/test_security_attack_surface_unit_part3.py diff --git a/lshell/builtincmd.py b/lshell/builtincmd.py index 2d5910a..f60f73c 100644 --- a/lshell/builtincmd.py +++ b/lshell/builtincmd.py @@ -6,6 +6,7 @@ import shlex import readline import signal +import re # import lshell specifics from lshell import variables @@ -35,6 +36,15 @@ "source", ] +_BASH_FUNCTION_ENV_RE = re.compile(r"^BASH_FUNC_.*$") + + +def _is_forbidden_env_name(name): + """Return True when env var name is blocked for security reasons.""" + if name in variables.FORBIDDEN_ENVIRON: + return True + return bool(_BASH_FUNCTION_ENV_RE.match(name)) + def _cancel_job_timeout(job): """Cancel a watchdog timer attached to a background job, if any.""" @@ -126,7 +136,7 @@ def cmd_export(args): if len(tokens) >= 2 and "=" in tokens[1]: var, value = tokens[1].split("=", 1) # disallow dangerous variable - if var in variables.FORBIDDEN_ENVIRON: + if _is_forbidden_env_name(var): return 1, var os.environ.update({var: value}) return 0, None @@ -136,6 +146,7 @@ def cmd_source(envfile): """Source a file in the current shell context""" envfile = envfile.strip().strip("'").strip('"') envfile = os.path.expanduser(os.path.expandvars(envfile)) + retcode = 0 try: with open(envfile, encoding="utf-8") as env_vars: for env_var in env_vars.readlines(): @@ -143,11 +154,16 @@ def cmd_source(envfile): if not line or line.startswith("#"): continue if line.startswith("export "): - cmd_export(line) + export_ret, var = cmd_export(line) + if export_ret == 1 and var: + sys.stderr.write( + f"lshell: forbidden environment variable: {var}\n" + ) + retcode = 1 except (OSError, IOError): sys.stderr.write(f"lshell: unable to read environment file: {envfile}\n") return 1 - return 0 + return retcode def cmd_cd(directory, conf): diff --git a/lshell/engine/authorizer.py b/lshell/engine/authorizer.py index bee3bac..0d87fa2 100644 --- a/lshell/engine/authorizer.py +++ b/lshell/engine/authorizer.py @@ -109,7 +109,7 @@ def authorize( if check_current_dir is None: check_current_dir = mode == "runtime" - if depth > 8: + if depth > sec.MAX_EXPANSION_RECURSION: return _deny( reasons.UNKNOWN_SYNTAX, canonical_ast, @@ -155,39 +155,16 @@ def authorize( line=oline, ) - expansions = sec._scan_shell_expansions(line) - - for expansion in expansions: - if expansion.kind != "command_substitution": - continue - inner = expansion.body.strip() - violation = _first_path_violation(inner, policy, check_current_dir=False) - if violation: - return _deny( - reasons.FORBIDDEN_PATH, - canonical_ast, - path=violation, - line=oline, - ) - - nested_decision = _authorize_nested(inner, policy, mode, ssh, depth) - if not nested_decision.allowed: - return nested_decision - - for expansion in expansions: - if expansion.kind != "backtick": - continue - nested_decision = _authorize_nested( - expansion.body.strip(), policy, mode, ssh, depth + expansion_inspection = sec.inspect_shell_expansions(line) + if expansion_inspection.malformed: + return _deny( + reasons.UNKNOWN_SYNTAX, + canonical_ast, + command=oline, + line=oline, ) - if not nested_decision.allowed: - return nested_decision - - for expansion in expansions: - if expansion.kind != "parameter_expansion": - continue - variable_text = sec._parameter_expansion_path_probe(expansion.body).strip() + for variable_text in expansion_inspection.parameter_path_probes: violation = _first_path_violation( variable_text, policy, check_current_dir=False ) @@ -199,6 +176,25 @@ def authorize( line=oline, ) + for expansion in expansion_inspection.executable_expansions: + inner = expansion.body.strip() + if not inner: + continue + + if expansion.kind in {"command_substitution", "process_substitution"}: + violation = _first_path_violation(inner, policy, check_current_dir=False) + if violation: + return _deny( + reasons.FORBIDDEN_PATH, + canonical_ast, + path=violation, + line=oline, + ) + + nested_decision = _authorize_nested(inner, policy, mode, ssh, depth) + if not nested_decision.allowed: + return nested_decision + allowed_commands = _allowed_commands(policy, ssh=ssh) for command_node in canonical_ast.commands: diff --git a/lshell/expansion_inspector.py b/lshell/expansion_inspector.py new file mode 100644 index 0000000..c5baa0d --- /dev/null +++ b/lshell/expansion_inspector.py @@ -0,0 +1,512 @@ +"""Shared expansion scanner/inspector used by sec and authorizer.""" + +import re +from typing import NamedTuple + + +MAX_EXPANSION_RECURSION = 8 +MAX_EXPANSION_SCAN_CHARS = 32768 +MAX_EXPANSION_TOKENS = 512 + +_PARAMETER_NAME_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$") +_PARAMETER_LENGTH_RE = re.compile(r"^#[A-Za-z_][A-Za-z0-9_]*$") +_PARAMETER_OPERATOR_RE = re.compile( + r"^([A-Za-z_][A-Za-z0-9_]*)(:?[-+=?])(.*)$", re.DOTALL +) + + +class _ShellExpansion(NamedTuple): + """Parsed shell expansion from an input line.""" + + kind: str + body: str + + +class _ExpansionInspection(NamedTuple): + """Security-oriented view of shell expansions in one command line.""" + + executable_expansions: tuple + parameter_path_probes: tuple + malformed: bool + + +def _read_backtick_expansion(line, start): + """Read a backtick expansion beginning at start and return (end, body).""" + i = start + 1 + escaped = False + while i < len(line): + char = line[i] + if escaped: + escaped = False + i += 1 + continue + if char == "\\": + escaped = True + i += 1 + continue + if char == "`": + return i + 1, line[start + 1 : i] + i += 1 + return None, None + + +def _read_parenthesized_expansion(line, start, prefix_len=2): + """Read a balanced (...) expansion body for $(), <(), >().""" + i = start + prefix_len + in_single = False + in_double = False + in_backtick = False + escaped = False + paren_depth = 1 + parameter_depth = 0 + + while i < len(line): + char = line[i] + next_char = line[i + 1] if i + 1 < len(line) else "" + + if escaped: + escaped = False + i += 1 + continue + + if char == "\\" and not in_single: + escaped = True + i += 1 + continue + + if char == "'" and not in_double and not in_backtick: + in_single = not in_single + i += 1 + continue + + if char == '"' and not in_single and not in_backtick: + in_double = not in_double + i += 1 + continue + + if char == "`" and not in_single: + in_backtick = not in_backtick + i += 1 + continue + + if in_single or in_double or in_backtick: + i += 1 + continue + + if char == "$" and next_char == "{": + parameter_depth += 1 + i += 2 + continue + + if char == "}" and parameter_depth > 0: + parameter_depth -= 1 + i += 1 + continue + + if parameter_depth > 0: + i += 1 + continue + + if char == "(": + paren_depth += 1 + i += 1 + continue + + if char == ")": + paren_depth -= 1 + if paren_depth == 0: + return i + 1, line[start + prefix_len : i] + if paren_depth < 0: + return None, None + i += 1 + continue + + i += 1 + + return None, None + + +def _read_parameter_expansion(line, start): + """Read a balanced ${...} expansion body from start.""" + i = start + 2 + in_single = False + in_double = False + in_backtick = False + escaped = False + parameter_depth = 1 + + while i < len(line): + char = line[i] + next_char = line[i + 1] if i + 1 < len(line) else "" + + if escaped: + escaped = False + i += 1 + continue + + if char == "\\" and not in_single: + escaped = True + i += 1 + continue + + if char == "'" and not in_double and not in_backtick: + in_single = not in_single + i += 1 + continue + + if char == '"' and not in_single and not in_backtick: + in_double = not in_double + i += 1 + continue + + if char == "`" and not in_single: + in_backtick = not in_backtick + i += 1 + continue + + if in_single or in_double or in_backtick: + i += 1 + continue + + if char == "$" and next_char == "{": + parameter_depth += 1 + i += 2 + continue + + if char == "}": + parameter_depth -= 1 + if parameter_depth == 0: + return i + 1, line[start + 2 : i] + if parameter_depth < 0: + return None, None + i += 1 + continue + + i += 1 + + return None, None + + +def _read_arithmetic_expansion(line, start): + """Read a balanced $((...)) arithmetic expansion body from start.""" + i = start + 3 + in_single = False + in_double = False + in_backtick = False + escaped = False + paren_depth = 1 + parameter_depth = 0 + + while i < len(line): + char = line[i] + next_char = line[i + 1] if i + 1 < len(line) else "" + + if escaped: + escaped = False + i += 1 + continue + + if char == "\\" and not in_single: + escaped = True + i += 1 + continue + + if char == "'" and not in_double and not in_backtick: + in_single = not in_single + i += 1 + continue + + if char == '"' and not in_single and not in_backtick: + in_double = not in_double + i += 1 + continue + + if char == "`" and not in_single: + in_backtick = not in_backtick + i += 1 + continue + + if in_single or in_double or in_backtick: + i += 1 + continue + + if char == "$" and next_char == "{": + parameter_depth += 1 + i += 2 + continue + + if char == "}" and parameter_depth > 0: + parameter_depth -= 1 + i += 1 + continue + + if parameter_depth > 0: + i += 1 + continue + + if char == "(": + paren_depth += 1 + i += 1 + continue + + if char == ")": + paren_depth -= 1 + if paren_depth == 0: + if next_char != ")": + return None, None + return i + 2, line[start + 3 : i] + if paren_depth < 0: + return None, None + i += 1 + continue + + i += 1 + + return None, None + + +def _scan_shell_expansions_with_status( + line, + command_context=True, + allow_process_substitution=True, +): + """Parse shell expansions and report malformed/unsupported markers.""" + if len(line) > MAX_EXPANSION_SCAN_CHARS: + return [], True + + expansions = [] + malformed = False + i = 0 + in_single = False + in_double = False + escaped = False + + while i < len(line): + if len(expansions) > MAX_EXPANSION_TOKENS: + malformed = True + break + + char = line[i] + next_char = line[i + 1] if i + 1 < len(line) else "" + + if escaped: + escaped = False + i += 1 + continue + + if char == "\\" and not in_single: + escaped = True + i += 1 + continue + + if char == "'" and not in_double: + in_single = not in_single + i += 1 + continue + + if char == '"' and not in_single: + in_double = not in_double + i += 1 + continue + + if in_single: + i += 1 + continue + + if char == "$" and next_char in {"'", '"'}: + malformed = True + break + + if command_context and not in_double: + if line.startswith("<<<", i) or line.startswith("<<-", i) or line.startswith( + "<<", i + ): + malformed = True + break + + if char == "$" and next_char == "(" and i + 2 < len(line) and line[i + 2] == "(": + end, body = _read_arithmetic_expansion(line, i) + if end is None: + malformed = True + break + if body: + expansions.append(_ShellExpansion("arithmetic_expansion", body)) + i = end + continue + + if char == "$" and next_char == "(": + end, body = _read_parenthesized_expansion(line, i, prefix_len=2) + if end is None: + malformed = True + break + if body: + expansions.append(_ShellExpansion("command_substitution", body)) + i = end + continue + + if char == "$" and next_char == "{": + end, body = _read_parameter_expansion(line, i) + if end is None: + malformed = True + break + if body: + expansions.append(_ShellExpansion("parameter_expansion", body)) + i = end + continue + + if ( + allow_process_substitution + and not in_double + and char in {"<", ">"} + and next_char == "(" + ): + end, body = _read_parenthesized_expansion(line, i, prefix_len=2) + if end is None: + malformed = True + break + if body: + expansions.append(_ShellExpansion("process_substitution", body)) + i = end + continue + + if char == "`": + end, body = _read_backtick_expansion(line, i) + if end is None: + malformed = True + break + if body: + expansions.append(_ShellExpansion("backtick", body)) + i = end + continue + + i += 1 + + return expansions, malformed + + +def _scan_shell_expansions(line): + """Parse shell expansions in-order while honoring quotes/escapes.""" + expansions, _malformed = _scan_shell_expansions_with_status(line) + return expansions + + +def inspect_shell_expansions(line, max_parameter_depth=MAX_EXPANSION_RECURSION): + """Collect recursive expansion checks for security policy evaluation.""" + executable_expansions = [] + parameter_path_probes = [] + malformed = False + scanned_expansions = 0 + + def _walk(current_line, depth, command_context, allow_process_substitution): + nonlocal malformed, scanned_expansions + + if depth > max_parameter_depth: + malformed = True + return + + expansions, scan_malformed = _scan_shell_expansions_with_status( + current_line, + command_context=command_context, + allow_process_substitution=allow_process_substitution, + ) + if scan_malformed: + malformed = True + + for expansion in expansions: + scanned_expansions += 1 + if scanned_expansions > MAX_EXPANSION_TOKENS: + malformed = True + return + + if expansion.kind in { + "command_substitution", + "backtick", + "process_substitution", + }: + body = expansion.body.strip() + if body: + executable_expansions.append( + _ShellExpansion(expansion.kind, body) + ) + continue + + if expansion.kind == "arithmetic_expansion": + body = expansion.body.strip() + if body: + _walk( + body, + depth + 1, + command_context=False, + allow_process_substitution=False, + ) + continue + + if expansion.kind != "parameter_expansion": + continue + + supported, probe = _analyze_parameter_expansion(expansion.body) + if not supported: + malformed = True + continue + + probe = probe.strip() + if probe: + parameter_path_probes.append(probe) + + body = expansion.body.strip() + if body: + _walk( + body, + depth + 1, + command_context=False, + allow_process_substitution=True, + ) + + _walk( + line, + depth=0, + command_context=True, + allow_process_substitution=True, + ) + return _ExpansionInspection( + executable_expansions=tuple(executable_expansions), + parameter_path_probes=tuple(parameter_path_probes), + malformed=malformed, + ) + + +def _parameter_expansion_path_probe(expression): + """Return the value-side text from supported ${...} forms.""" + supported, probe = _analyze_parameter_expansion(expression) + if not supported: + return "" + return probe + + +def _analyze_parameter_expansion(expression): + """Return (supported, probe_text) for parameter-expansion expressions.""" + stripped = expression.strip() + if not stripped: + return False, "" + + if _PARAMETER_NAME_RE.fullmatch(stripped): + return True, "" + + if _PARAMETER_LENGTH_RE.fullmatch(stripped): + return True, "" + + match = _PARAMETER_OPERATOR_RE.fullmatch(stripped) + if match: + _name, _operator, operand = match.groups() + return True, operand + + return False, "" + + +__all__ = [ + "MAX_EXPANSION_RECURSION", + "_ShellExpansion", + "_scan_shell_expansions", + "inspect_shell_expansions", + "_parameter_expansion_path_probe", +] diff --git a/lshell/sec.py b/lshell/sec.py index d57f9d1..16efa99 100644 --- a/lshell/sec.py +++ b/lshell/sec.py @@ -8,22 +8,24 @@ import os import shlex import glob -from typing import NamedTuple # import lshell specifics from lshell import messages from lshell import utils from lshell import audit +from lshell import expansion_inspector EXTENSION_RESTRICTION_EXEMPT_COMMANDS = {"cd", "clear", "fg", "bg", "ls"} MAX_WILDCARD_MATCHES = 4096 +MAX_BRACE_EXPANSIONS = 256 +MAX_EXPANSION_RECURSION = expansion_inspector.MAX_EXPANSION_RECURSION +# Backward-compatible exports used by tests and internal callers. +_ShellExpansion = expansion_inspector._ShellExpansion +_scan_shell_expansions = expansion_inspector._scan_shell_expansions +inspect_shell_expansions = expansion_inspector.inspect_shell_expansions -class _ShellExpansion(NamedTuple): - """Parsed shell expansion from an input line.""" - - kind: str - body: str +_EXTGLOB_OPENERS = ("@(", "!(", "+(", "*(", "?(") def _is_assignment_word(word): @@ -74,160 +76,6 @@ def _quoted_literals_without_assignment(line): return literals -def _read_backtick_expansion(line, start): - """Read a backtick expansion beginning at start and return (end, body).""" - i = start + 1 - escaped = False - while i < len(line): - char = line[i] - if escaped: - escaped = False - i += 1 - continue - if char == "\\": - escaped = True - i += 1 - continue - if char == "`": - return i + 1, line[start + 1 : i] - i += 1 - return None, None - - -def _read_dollar_expansion(line, start, closing): - """Read a balanced $()- or ${}-style expansion from start.""" - i = start + 2 - in_single = False - in_double = False - in_backtick = False - escaped = False - closers = [closing] - - while i < len(line): - char = line[i] - next_char = line[i + 1] if i + 1 < len(line) else "" - - if escaped: - escaped = False - i += 1 - continue - - if char == "\\" and not in_single: - escaped = True - i += 1 - continue - - if char == "'" and not in_double and not in_backtick: - in_single = not in_single - i += 1 - continue - - if char == '"' and not in_single and not in_backtick: - in_double = not in_double - i += 1 - continue - - if char == "`" and not in_single: - in_backtick = not in_backtick - i += 1 - continue - - if in_single or in_double or in_backtick: - i += 1 - continue - - if char == "$" and next_char == "(": - closers.append(")") - i += 2 - continue - - if char == "$" and next_char == "{": - closers.append("}") - i += 2 - continue - - if closers and char == closers[-1]: - closers.pop() - if not closers: - return i + 1, line[start + 2 : i] - i += 1 - continue - - i += 1 - - return None, None - - -def _scan_shell_expansions(line): - """Parse shell expansions in-order while honoring quotes/escapes.""" - expansions = [] - i = 0 - in_single = False - in_double = False - escaped = False - - while i < len(line): - char = line[i] - next_char = line[i + 1] if i + 1 < len(line) else "" - - if escaped: - escaped = False - i += 1 - continue - - if char == "\\" and not in_single: - escaped = True - i += 1 - continue - - if char == "'" and not in_double: - in_single = not in_single - i += 1 - continue - - if char == '"' and not in_single: - in_double = not in_double - i += 1 - continue - - if in_single: - i += 1 - continue - - if char == "$" and next_char == "(": - end, body = _read_dollar_expansion(line, i, ")") - if end is not None and body: - expansions.append(_ShellExpansion("command_substitution", body)) - i = end - continue - - if char == "$" and next_char == "{": - end, body = _read_dollar_expansion(line, i, "}") - if end is not None and body: - expansions.append(_ShellExpansion("parameter_expansion", body)) - i = end - continue - - if char == "`": - end, body = _read_backtick_expansion(line, i) - if end is not None and body: - expansions.append(_ShellExpansion("backtick", body)) - i = end - continue - - i += 1 - - return expansions - - -def _parameter_expansion_path_probe(expression): - """Return the value-side text from ${...} forms, or the expression itself.""" - for index, char in enumerate(expression): - if char in {"=", "+", "?", "-"}: - return expression[index + 1 :] - return expression - - def should_enforce_file_extensions(command): """Return True when extension restrictions should apply to this command.""" return command not in EXTENSION_RESTRICTION_EXEMPT_COMMANDS @@ -338,6 +186,154 @@ def _safe_expand_path(path): return None +def _contains_unescaped_extglob(pattern): + """Return True when extglob operators are present in an unescaped form.""" + escaped = False + for index, char in enumerate(pattern[:-1]): + if escaped: + escaped = False + continue + if char == "\\": + escaped = True + continue + if pattern[index : index + 2] in _EXTGLOB_OPENERS: + return True + return False + + +def _find_matching_brace(text, start): + """Return index of matching '}' for text[start] == '{', else None.""" + escaped = False + depth = 0 + for index in range(start, len(text)): + char = text[index] + if escaped: + escaped = False + continue + if char == "\\": + escaped = True + continue + if char == "{": + depth += 1 + continue + if char == "}": + depth -= 1 + if depth == 0: + return index + if depth < 0: + return None + return None + + +def _split_brace_options(body): + """Split one brace body ('a,b,{c,d}') into top-level options.""" + options = [] + current = [] + escaped = False + depth = 0 + + for char in body: + if escaped: + current.append(char) + escaped = False + continue + if char == "\\": + current.append(char) + escaped = True + continue + if char == "{": + current.append(char) + depth += 1 + continue + if char == "}": + if depth == 0: + return None + current.append(char) + depth -= 1 + continue + if char == "," and depth == 0: + options.append("".join(current)) + current = [] + continue + current.append(char) + + if escaped or depth != 0: + return None + + options.append("".join(current)) + if len(options) <= 1: + return [] + return options + + +def _find_expandable_brace_group(pattern): + """Return first expandable brace group as (start, end, options, malformed).""" + escaped = False + index = 0 + while index < len(pattern): + char = pattern[index] + if escaped: + escaped = False + index += 1 + continue + if char == "\\": + escaped = True + index += 1 + continue + if char != "{": + index += 1 + continue + + end = _find_matching_brace(pattern, index) + if end is None: + return None, None, None, True + + options = _split_brace_options(pattern[index + 1 : end]) + if options is None: + return None, None, None, True + if options: + return index, end, options, False + + index = end + 1 + + return None, None, None, False + + +def _expand_brace_patterns(pattern, limit=MAX_BRACE_EXPANSIONS): + """Perform shell-style brace expansion for one pattern, with fan-out limits.""" + expanded = [] + malformed = False + + def _walk(current): + nonlocal malformed + if malformed: + return + if len(expanded) >= limit: + malformed = True + return + + start, end, options, group_malformed = _find_expandable_brace_group(current) + if group_malformed: + malformed = True + return + + if start is None: + expanded.append(current) + return + + prefix = current[:start] + suffix = current[end + 1 :] + for option in options: + _walk(prefix + option + suffix) + if malformed: + return + + _walk(pattern) + if malformed: + return None + return expanded + + def expand_shell_wildcards(item): """Expand shell wildcards and return all candidate filesystem paths.""" @@ -346,25 +342,40 @@ def expand_shell_wildcards(item): if expanded_item is None: return [] + expanded_patterns = _expand_brace_patterns(expanded_item) + if expanded_patterns is None: + return [] + # Expand wildcard patterns against the filesystem and validate all matches. # Fail closed if expansion fans out too much to avoid memory abuse. try: expanded_items = [] - for match in glob.iglob(expanded_item, recursive=True): - resolved = _safe_realpath(match) - if resolved: - expanded_items.append(resolved) - if len(expanded_items) > MAX_WILDCARD_MATCHES: + seen_candidates = set() + for pattern in expanded_patterns: + if _contains_unescaped_extglob(pattern): return [] + + matched = False + for match in glob.iglob(pattern, recursive=True): + matched = True + resolved = _safe_realpath(match) + if resolved and resolved not in seen_candidates: + seen_candidates.add(resolved) + expanded_items.append(resolved) + if len(expanded_items) > MAX_WILDCARD_MATCHES: + return [] + + if not matched: + resolved = _safe_realpath(pattern) + if resolved and resolved not in seen_candidates: + seen_candidates.add(resolved) + expanded_items.append(resolved) + if len(expanded_items) > MAX_WILDCARD_MATCHES: + return [] except (OSError, RuntimeError, ValueError, re.error): return [] - if expanded_items: - return expanded_items - - # If no glob match exists, still validate the canonical target path. - resolved_item = _safe_realpath(expanded_item) - return [resolved_item] if resolved_item else [] + return expanded_items def _split_path_acl_entries(path_acl): @@ -445,6 +456,46 @@ def _looks_like_path_token(token): return False +def _grep_implicit_pattern_index(args): + """Return the grep implicit PATTERN arg index, or None when explicit patterns are used.""" + has_explicit_pattern = False + index = 0 + + while index < len(args): + token = args[index] + + if token == "--": + if not has_explicit_pattern and index + 1 < len(args): + return index + 1 + return None + + if token in {"-e", "--regexp", "-f", "--file"}: + has_explicit_pattern = True + index += 2 + continue + + if token.startswith("--regexp=") or token.startswith("--file="): + has_explicit_pattern = True + index += 1 + continue + + # Handle compact short forms like -ePATTERN / -fFILE. + if len(token) > 2 and token.startswith("-") and token[1] in {"e", "f"}: + has_explicit_pattern = True + index += 1 + continue + + if token.startswith("-") and token != "-": + index += 1 + continue + + if not has_explicit_pattern: + return index + return None + + return None + + def _path_tokens_from_line(line): """Extract path-like tokens from command segments, excluding bare command names.""" segments = utils.split_commands(line) @@ -475,7 +526,17 @@ def _path_tokens_from_line(line): continue if args: - path_tokens.extend(token for token in args if _looks_like_path_token(token)) + skip_indices = set() + if command in {"grep", "egrep", "fgrep", "rgrep"}: + implicit_pattern_index = _grep_implicit_pattern_index(args) + if implicit_pattern_index is not None: + skip_indices.add(implicit_pattern_index) + + path_tokens.extend( + token + for idx, token in enumerate(args) + if idx not in skip_indices and _looks_like_path_token(token) + ) continue # Single token mode (used by completion/policy path checks): @@ -546,7 +607,7 @@ def check_forbidden_chars(line, conf, strict=None, ssh=None): return 0, conf -def check_secure(line, conf, strict=None, ssh=None): +def check_secure(line, conf, strict=None, ssh=None, _depth=0): """This method is used to check the content on the typed command. Its purpose is to forbid the user to user to override the lshell command restrictions. @@ -566,9 +627,12 @@ def check_secure(line, conf, strict=None, ssh=None): # init return code returncode = 0 + if _depth > MAX_EXPANSION_RECURSION: + return warn_unknown_syntax(oline, conf, strict=strict, ssh=ssh) + for item in _quoted_literals_without_assignment(line): if os.path.exists(item): - ret_check_path, conf = check_path(item, conf, strict=strict) + ret_check_path, conf = check_path(item, conf, strict=strict, ssh=ssh) returncode += ret_check_path # parse command line for control characters, and warn user @@ -580,38 +644,31 @@ def check_secure(line, conf, strict=None, ssh=None): if ret_forbidden: return ret_forbidden, conf - expansions = _scan_shell_expansions(line) + expansion_inspection = inspect_shell_expansions(line) + if expansion_inspection.malformed: + return warn_unknown_syntax(oline, conf, strict=strict, ssh=ssh) - # check if the line contains $(foo) executions, and check them - for expansion in expansions: - if expansion.kind != "command_substitution": - continue - inner = expansion.body.strip() - # recurse on check_path - ret_check_path, conf = check_path(inner, conf, strict=strict) + for variable in expansion_inspection.parameter_path_probes: + ret_check_path, conf = check_path(variable, conf, strict=strict, ssh=ssh) returncode += ret_check_path - # recurse on check_secure - ret_check_secure, conf = check_secure(inner, conf, strict=strict) - returncode += ret_check_secure - - # check for executions using back quotes '`' - for expansion in expansions: - if expansion.kind != "backtick": + for expansion in expansion_inspection.executable_expansions: + inner = expansion.body.strip() + if not inner: continue + if expansion.kind in {"command_substitution", "process_substitution"}: + ret_check_path, conf = check_path(inner, conf, strict=strict, ssh=ssh) + returncode += ret_check_path + ret_check_secure, conf = check_secure( - expansion.body.strip(), conf, strict=strict + inner, + conf, + strict=strict, + ssh=ssh, + _depth=_depth + 1, ) returncode += ret_check_secure - # check if the line contains ${foo=bar}, and check them - for expansion in expansions: - if expansion.kind != "parameter_expansion": - continue - variable = _parameter_expansion_path_probe(expansion.body).strip() - ret_check_path, conf = check_path(variable, conf, strict=strict) - returncode += ret_check_path - # if unknown commands where found, return 1 and don't execute the line if returncode > 0: return 1, conf diff --git a/lshell/utils.py b/lshell/utils.py index 7b4e23d..0c1017f 100644 --- a/lshell/utils.py +++ b/lshell/utils.py @@ -1,4 +1,5 @@ -""" Utils for lshell """ +"""Utils for lshell""" + # pylint: disable=too-many-lines import re @@ -19,6 +20,7 @@ from lshell import builtincmd from lshell import audit from lshell import containment +from lshell.engine import executor as engine_executor def usage(exitcode=1): @@ -97,6 +99,7 @@ def split_command_sequence(line): escaped = False cmd_subst_depth = 0 var_brace_depth = 0 + proc_subst_depth = 0 i = 0 def flush_current(): @@ -153,12 +156,31 @@ def flush_current(): i += 2 continue + if ( + not in_single + and not in_double + and not in_backtick + and char in {"<", ">"} + and next_char == "(" + ): + proc_subst_depth += 1 + current.append(char) + current.append(next_char) + i += 2 + continue + if cmd_subst_depth > 0 and not in_single and not in_backtick and char == ")": cmd_subst_depth -= 1 current.append(char) i += 1 continue + if proc_subst_depth > 0 and not in_single and not in_backtick and char == ")": + proc_subst_depth -= 1 + current.append(char) + i += 1 + continue + if var_brace_depth > 0 and not in_single and not in_backtick and char == "}": var_brace_depth -= 1 current.append(char) @@ -171,6 +193,7 @@ def flush_current(): and not in_backtick and cmd_subst_depth == 0 and var_brace_depth == 0 + and proc_subst_depth == 0 ) if is_top_level: @@ -198,7 +221,14 @@ def flush_current(): current.append(char) i += 1 - if in_single or in_double or in_backtick or cmd_subst_depth or var_brace_depth: + if ( + in_single + or in_double + or in_backtick + or cmd_subst_depth + or var_brace_depth + or proc_subst_depth + ): return None flush_current() @@ -312,6 +342,33 @@ def replace_exit_code(line, retcode): } +_TRUSTED_SHELL_PATHS = ( + "/opt/homebrew/bin/bash", + "/usr/local/bin/bash", + "/bin/bash", + "/usr/bin/bash", + "/opt/homebrew/bin/dash", + "/usr/local/bin/dash", + "/bin/dash", + "/usr/bin/dash", + "/bin/sh", + "/usr/bin/sh", +) + + +def _resolve_trusted_shell(): + """Return an absolute trusted shell interpreter path, or None.""" + for candidate in _TRUSTED_SHELL_PATHS: + if os.path.isfile(candidate) and os.access(candidate, os.X_OK): + return candidate + return None + + +def _is_bash_function_env_name(name): + """Return True for env vars used by Bash function import.""" + return name.startswith("BASH_FUNC_") + + def _expand_braced_parameter(expr, support_advanced=True): """Expand ${...} expressions for the supported shell parameter forms.""" if not expr: @@ -502,9 +559,7 @@ def handle_builtin_command(full_command, executable, argument, shell_context): elif executable == "export": retcode, var = builtincmd.cmd_export(full_command) if retcode == 1: - shell_context.log.critical( - f"lshell: forbidden environment variable: {var}" - ) + shell_context.log.critical(f"lshell: forbidden environment variable: {var}") elif executable == "source": retcode = builtincmd.cmd_source(argument) elif executable == "fg": @@ -523,8 +578,6 @@ def cmd_parse_execute(command_line, shell_context=None, trusted_protocol=False): trusted_protocol is only for protocol commands (scp/sftp-server) that were already validated in run_overssh. """ - from lshell.engine import executor as engine_executor # pylint: disable=import-outside-toplevel - return engine_executor.execute_for_shell( command_line, shell_context=shell_context, @@ -555,6 +608,10 @@ def exec_cmd(cmd, background=False, extra_env=None, conf=None, log=None): # Prevent non-interactive shell startup file injection. exec_env.pop("BASH_ENV", None) exec_env.pop("ENV", None) + # Prevent function import from environment (e.g. BASH_FUNC_* poisoning). + for key in list(exec_env): + if _is_bash_function_env_name(key): + exec_env.pop(key, None) class CtrlZException(Exception): """Custom exception to handle Ctrl+Z (SIGTSTP).""" @@ -617,9 +674,7 @@ def _emit_timeout_event(): "lshell: runtime containment timed out command: " f'timeout={command_timeout}s, command="{cmd}"' ) - sys.stderr.write( - f"lshell: command timed out after {command_timeout}s: {cmd}\n" - ) + sys.stderr.write(f"lshell: command timed out after {command_timeout}s: {cmd}\n") previous_sigtstp_handler = signal.getsignal(signal.SIGTSTP) previous_sigcont_handler = signal.getsignal(signal.SIGCONT) @@ -628,7 +683,6 @@ def _emit_timeout_event(): # Register SIGTSTP (Ctrl+Z) and SIGCONT (resume) signal handlers signal.signal(signal.SIGTSTP, handle_sigtstp) signal.signal(signal.SIGCONT, handle_sigcont) - cmd_args = ["bash", "-c", cmd] try: split_cmd = shlex.split(cmd, posix=True) except ValueError: @@ -637,6 +691,14 @@ def _emit_timeout_event(): cmd_args = split_cmd if not background: detached_session = False + else: + shell_path = _resolve_trusted_shell() + if not shell_path: + sys.stderr.write( + "Command execution failed: trusted system shell interpreter not found.\n" + ) + return 127 + cmd_args = [shell_path, "-c", cmd] preexec_fn = None needs_resource_limits = runtime_limits.max_processes > 0 if os.name == "posix" and (detached_session or needs_resource_limits): @@ -709,8 +771,7 @@ def _background_timeout(): ) if log: log.critical( - "lshell: runtime containment denied command execution: " - f"{reason}" + "lshell: runtime containment denied command execution: " f"{reason}" ) sys.stderr.write( "lshell: command denied: unable to apply runtime containment limits\n" diff --git a/test/test_breakout_regressions.py b/test/test_breakout_regressions.py new file mode 100644 index 0000000..aa2936b --- /dev/null +++ b/test/test_breakout_regressions.py @@ -0,0 +1,179 @@ +"""Breakout regression coverage for command-policy bypass chains.""" + +import os +import tempfile +import unittest +from getpass import getuser + +import pexpect + +from lshell.engine import authorizer +from lshell.engine import reasons + + +TOPDIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) +CONFIG = f"{TOPDIR}/test/testfiles/test.conf" +LSHELL = f"{TOPDIR}/bin/lshell" +USER = getuser() +PROMPT = f"{USER}:~\\$" + + +class TestBreakoutRegressions(unittest.TestCase): + """Security expectations that should fail while breakout remains unfixed.""" + + def _policy(self): + return { + "allowed": ["echo"], + "overssh": [], + "forbidden": [], + "strict": 1, + "sudo_commands": [], + "allowed_file_extensions": [], + "path": ["/|", ""], + } + + def _spawn_shell(self, extra_args=None, env=None): + args = [ + "--config", + CONFIG, + "--strict", + "1", + "--forbidden", + "[]", + "--allowed", + "['echo']", + "--path_noexec", + "''", + ] + if extra_args: + args.extend(extra_args) + child = pexpect.spawn( + LSHELL, + args=args, + env=env, + encoding="utf-8", + timeout=10, + ) + child.expect(PROMPT) + return child + + def _run_command(self, child, command): + child.sendline(command) + child.expect(PROMPT) + return child.before.split("\n", 1)[1] + + def _safe_exit(self, child): + if not child.isalive(): + return + child.sendline("exit") + child.expect(pexpect.EOF) + + def test_authorizer_misses_nested_command_substitution_in_parameter_expansion( + self, + ): + """Nested `$(...)` inside `${...}` must be denied by allowlist checks.""" + policy = self._policy() + + direct = authorizer.authorize_line( + "echo $(id)", + policy, + mode="policy", + check_current_dir=False, + ) + nested = authorizer.authorize_line( + "echo ${LSHELL_BREAKOUT_TEST:-$(id)}", + policy, + mode="policy", + check_current_dir=False, + ) + + self.assertFalse(direct.allowed) + self.assertEqual(direct.reason.code, reasons.FORBIDDEN_COMMAND) + self.assertFalse(nested.allowed) + self.assertEqual(nested.reason.code, reasons.FORBIDDEN_COMMAND) + + def test_interactive_breakout_executes_disallowed_command_via_parameter_expansion( + self, + ): + """Interactive shell must block nested breakout payloads.""" + child = self._spawn_shell() + try: + denied = self._run_command(child, "id") + self.assertIn('lshell: forbidden command: "id"', denied) + + breakout = self._run_command( + child, + "echo ${LSHELL_BREAKOUT_TEST:-$(id)}", + ) + self.assertIn('lshell: forbidden command: "id"', breakout) + self.assertNotRegex(breakout, r"uid=[0-9]+") + finally: + self._safe_exit(child) + child.close(force=True) + + def test_interactive_blocks_source_function_import_breakout( + self, + ): + """`source` must not allow env poisoning that triggers disallowed binaries.""" + envfile = None + child = None + try: + with tempfile.NamedTemporaryFile( + mode="w", + encoding="utf-8", + prefix="lshell-breakout-", + dir="/tmp", + delete=False, + ) as handle: + handle.write("export BASH_FUNC_echo%%='() { id; }'\n") + envfile = handle.name + + child = self._spawn_shell(extra_args=["--path", "['/tmp']"]) + + denied = self._run_command(child, "id") + self.assertIn('lshell: forbidden command: "id"', denied) + + self._run_command(child, f"source {envfile}") + + breakout = self._run_command(child, "echo harmless") + self.assertNotRegex(breakout, r"uid=[0-9]+") + self.assertIn("harmless", breakout) + finally: + if child is not None: + self._safe_exit(child) + child.close(force=True) + if envfile and os.path.exists(envfile): + os.unlink(envfile) + + def test_interactive_blocks_path_hijack_of_shell_interpreter( + self, + ): + """Allowed commands must not execute attacker-controlled PATH `bash` binaries.""" + child = None + with tempfile.TemporaryDirectory(prefix="lshell-path-hijack-", dir="/tmp") as tmpdir: + bindir = os.path.join(tmpdir, "bin") + os.makedirs(bindir, exist_ok=True) + fake_bash = os.path.join(bindir, "bash") + with open(fake_bash, "w", encoding="utf-8") as handle: + handle.write("#!/bin/sh\n") + handle.write("command -p id\n") + handle.write('exec /bin/bash "$@"\n') + os.chmod(fake_bash, 0o700) + + env = dict(os.environ) + env["PATH"] = f"{bindir}:{env.get('PATH', '')}" + child = self._spawn_shell(env=env) + try: + denied = self._run_command(child, "id") + self.assertIn('lshell: forbidden command: "id"', denied) + + breakout = self._run_command(child, "echo harmless") + self.assertNotRegex(breakout, r"uid=[0-9]+") + self.assertIn("harmless", breakout) + finally: + self._safe_exit(child) + child.close(force=True) + + +if __name__ == "__main__": + unittest.main() diff --git a/test/test_engine_pipeline_unit.py b/test/test_engine_pipeline_unit.py index 21040d0..783e6a2 100644 --- a/test/test_engine_pipeline_unit.py +++ b/test/test_engine_pipeline_unit.py @@ -176,6 +176,23 @@ def test_authorizer_parses_nested_command_substitutions(self): ) self.assertTrue(decision.allowed) + def test_authorizer_enforces_overssh_allowlist_inside_nested_expansions(self): + """SSH-mode nested expansions must use overssh allow-list decisions.""" + decision = authorizer.authorize_line( + "echo ${LSHELL_WORD:-$(id)}", + _policy( + allowed=["echo", "id"], + overssh=["echo"], + forbidden=[], + strict=1, + ), + mode="policy", + ssh=True, + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.FORBIDDEN_COMMAND) + def test_authorizer_handles_parameter_expansion_with_logical_operators(self): """${...} operands containing ||/&& should parse as a single expansion body.""" decision = authorizer.authorize_line( @@ -186,6 +203,219 @@ def test_authorizer_handles_parameter_expansion_with_logical_operators(self): ) self.assertTrue(decision.allowed) + def test_authorizer_rejects_parameter_expansion_with_nested_backtick_substitution( + self, + ): + """Nested backticks in ${...} should enforce inner allow-list checks.""" + decision = authorizer.authorize_line( + "echo ${LSHELL_WORD:-`id`}", + _policy(allowed=["echo"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.FORBIDDEN_COMMAND) + + def test_authorizer_allows_parameter_expansion_with_nested_backtick_when_allowlisted( + self, + ): + """Allow nested backticks in ${...} only when inner command is allowlisted.""" + decision = authorizer.authorize_line( + "echo ${LSHELL_WORD:-`printf ok`}", + _policy(allowed=["echo", "printf"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertTrue(decision.allowed) + + def test_authorizer_fails_closed_on_malformed_nested_parameter_substitution( + self, + ): + """Malformed nested expansion markers in ${...} should be denied.""" + decision = authorizer.authorize_line( + "echo ${LSHELL_WORD:-$(printf ok}", + _policy(allowed=["echo", "printf"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.UNKNOWN_SYNTAX) + + def test_authorizer_rejects_process_substitution_with_disallowed_inner_command( + self, + ): + """Process substitutions must recurse into nested allow-list checks.""" + decision = authorizer.authorize_line( + "echo <(id)", + _policy(allowed=["echo"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.FORBIDDEN_COMMAND) + + def test_authorizer_allows_process_substitution_when_inner_command_allowlisted( + self, + ): + """Allow process substitution only when the nested command is allowlisted.""" + decision = authorizer.authorize_line( + "echo <(printf ok)", + _policy(allowed=["echo", "printf"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertTrue(decision.allowed) + + def test_authorizer_fails_closed_on_malformed_process_substitution(self): + """Unbalanced process substitutions should be denied.""" + decision = authorizer.authorize_line( + "echo <(printf ok", + _policy(allowed=["echo", "printf"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.UNKNOWN_SYNTAX) + + def test_authorizer_rejects_disallowed_command_in_arithmetic_expansion(self): + """$((...)) must recurse into nested substitutions for allow-list checks.""" + decision = authorizer.authorize_line( + "echo $(( $(id) + 1 ))", + _policy(allowed=["echo"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.FORBIDDEN_COMMAND) + + def test_authorizer_allows_arithmetic_expansion_when_nested_command_allowlisted( + self, + ): + """Allow arithmetic nested substitution only when inner command is allowed.""" + decision = authorizer.authorize_line( + "echo $(( $(printf 1) + 1 ))", + _policy(allowed=["echo", "printf"], forbidden=[], strict=1), + mode="policy", + check_current_dir=False, + ) + self.assertTrue(decision.allowed) + + def test_authorizer_rejects_unsupported_here_string_syntax(self): + """Fail closed on unsupported here-string syntax.""" + decision = authorizer.authorize_line( + "echo <<< ok", + _policy(allowed=["echo"], forbidden=[], strict=0), + mode="policy", + check_current_dir=False, + ) + self.assertFalse(decision.allowed) + self.assertEqual(decision.reason.code, reasons.UNKNOWN_SYNTAX) + + def test_authorizer_rejects_unsupported_here_doc_syntax(self): + """Fail closed on unsupported here-doc syntax.""" + decision = authorizer.authorize_line( + "echo <