Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions lshell/builtincmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import shlex
import readline
import signal
import re

# import lshell specifics
from lshell import variables
Expand Down Expand Up @@ -35,6 +36,15 @@
"source",
]

_BASH_FUNCTION_ENV_RE = re.compile(r"^BASH_FUNC_.*$")


def _is_forbidden_env_name(name):
"""Return True when env var name is blocked for security reasons."""
if name in variables.FORBIDDEN_ENVIRON:
return True
return bool(_BASH_FUNCTION_ENV_RE.match(name))


def _cancel_job_timeout(job):
"""Cancel a watchdog timer attached to a background job, if any."""
Expand Down Expand Up @@ -126,7 +136,7 @@ def cmd_export(args):
if len(tokens) >= 2 and "=" in tokens[1]:
var, value = tokens[1].split("=", 1)
# disallow dangerous variable
if var in variables.FORBIDDEN_ENVIRON:
if _is_forbidden_env_name(var):
return 1, var
os.environ.update({var: value})
return 0, None
Expand All @@ -136,18 +146,24 @@ def cmd_source(envfile):
"""Source a file in the current shell context"""
envfile = envfile.strip().strip("'").strip('"')
envfile = os.path.expanduser(os.path.expandvars(envfile))
retcode = 0
try:
with open(envfile, encoding="utf-8") as env_vars:
for env_var in env_vars.readlines():
line = env_var.strip()
if not line or line.startswith("#"):
continue
if line.startswith("export "):
cmd_export(line)
export_ret, var = cmd_export(line)
if export_ret == 1 and var:
sys.stderr.write(
f"lshell: forbidden environment variable: {var}\n"
)
retcode = 1
except (OSError, IOError):
sys.stderr.write(f"lshell: unable to read environment file: {envfile}\n")
return 1
return 0
return retcode


def cmd_cd(directory, conf):
Expand Down
60 changes: 28 additions & 32 deletions lshell/engine/authorizer.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def authorize(
if check_current_dir is None:
check_current_dir = mode == "runtime"

if depth > 8:
if depth > sec.MAX_EXPANSION_RECURSION:
return _deny(
reasons.UNKNOWN_SYNTAX,
canonical_ast,
Expand Down Expand Up @@ -155,39 +155,16 @@ def authorize(
line=oline,
)

expansions = sec._scan_shell_expansions(line)

for expansion in expansions:
if expansion.kind != "command_substitution":
continue
inner = expansion.body.strip()
violation = _first_path_violation(inner, policy, check_current_dir=False)
if violation:
return _deny(
reasons.FORBIDDEN_PATH,
canonical_ast,
path=violation,
line=oline,
)

nested_decision = _authorize_nested(inner, policy, mode, ssh, depth)
if not nested_decision.allowed:
return nested_decision

for expansion in expansions:
if expansion.kind != "backtick":
continue
nested_decision = _authorize_nested(
expansion.body.strip(), policy, mode, ssh, depth
expansion_inspection = sec.inspect_shell_expansions(line)
if expansion_inspection.malformed:
return _deny(
reasons.UNKNOWN_SYNTAX,
canonical_ast,
command=oline,
line=oline,
)
if not nested_decision.allowed:
return nested_decision

for expansion in expansions:
if expansion.kind != "parameter_expansion":
continue
variable_text = sec._parameter_expansion_path_probe(expansion.body).strip()

for variable_text in expansion_inspection.parameter_path_probes:
violation = _first_path_violation(
variable_text, policy, check_current_dir=False
)
Expand All @@ -199,6 +176,25 @@ def authorize(
line=oline,
)

for expansion in expansion_inspection.executable_expansions:
inner = expansion.body.strip()
if not inner:
continue

if expansion.kind in {"command_substitution", "process_substitution"}:
violation = _first_path_violation(inner, policy, check_current_dir=False)
if violation:
return _deny(
reasons.FORBIDDEN_PATH,
canonical_ast,
path=violation,
line=oline,
)

nested_decision = _authorize_nested(inner, policy, mode, ssh, depth)
if not nested_decision.allowed:
return nested_decision

allowed_commands = _allowed_commands(policy, ssh=ssh)

for command_node in canonical_ast.commands:
Expand Down
Loading
Loading