diff --git a/hyperglass/constants.py b/hyperglass/constants.py index 0139c741..a5176f99 100644 --- a/hyperglass/constants.py +++ b/hyperglass/constants.py @@ -87,3 +87,15 @@ "bird", "openbgpd", ) + +# Characters that must never appear in a query target. Used at two points in +# the request pipeline (once at the type/validation boundary in +# `models.api.query`, again at the device-transport boundary in +# `execution.drivers._construct`) so a regression in either layer cannot let +# a CLI/shell metacharacter reach a device. +FORBIDDEN_TARGET_CHARS = frozenset( + "\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\x0c\r\x0e\x0f" + "\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1a\x1b\x1c\x1d\x1e\x1f" + "\x7f" # DEL + ";|&`<>\"\\" +) diff --git a/hyperglass/execution/drivers/_construct.py b/hyperglass/execution/drivers/_construct.py index c1d1e2b5..835655c8 100644 --- a/hyperglass/execution/drivers/_construct.py +++ b/hyperglass/execution/drivers/_construct.py @@ -14,7 +14,11 @@ # Project from hyperglass.log import log from hyperglass.util import get_fmt_keys -from hyperglass.constants import TRANSPORT_REST, TARGET_FORMAT_SPACE +from hyperglass.constants import ( + TRANSPORT_REST, + TARGET_FORMAT_SPACE, + FORBIDDEN_TARGET_CHARS, +) from hyperglass.exceptions.public import InputInvalid from hyperglass.exceptions.private import ConfigError @@ -29,6 +33,12 @@ FormatterCallback = t.Callable[[str], t.Union[t.List[str], str]] +# Final, post-formatter defense before a target is interpolated into a device +# command. The same `FORBIDDEN_TARGET_CHARS` set is checked at the type-level +# boundary in `models.api.query`; running it again here means a regression in +# either the QueryTarget constraint or a custom directive's regex still +# cannot reach `send_command`. + class Construct: """Construct SSH commands/REST API parameters from validated query data.""" @@ -110,6 +120,13 @@ def format(self, command: str) -> str: except ValueError: pass + target_str = str(self.target) + if any(c in FORBIDDEN_TARGET_CHARS for c in target_str): + raise InputInvalid( + error="Target contains disallowed character(s)", + target=target_str, + ) + return command.format(target=self.target, mask=mask, **attrs) def queries(self): diff --git a/hyperglass/execution/drivers/tests/test_construct.py b/hyperglass/execution/drivers/tests/test_construct.py index 7e5adee2..6234adf9 100644 --- a/hyperglass/execution/drivers/tests/test_construct.py +++ b/hyperglass/execution/drivers/tests/test_construct.py @@ -11,6 +11,7 @@ from hyperglass.models.directive import Directives from hyperglass.models.config.params import Params from hyperglass.models.config.devices import Devices +from hyperglass.exceptions.public import InputInvalid # Local from .._construct import Construct @@ -88,3 +89,23 @@ def test_construct(state): ) constructor = Construct(device=state.devices["test1"], query=query) assert constructor.target == "192.0.2.0/24" + + +@pytest.mark.parametrize("metachar", [";", "|", "\n", "`", '"', "<", ">", "\\"]) +def test_construct_format_rejects_forbidden_post_formatter_target(state, metachar): + """Layer-3 defense in depth must reject a metachar in `self.target`. + + Even if Layers 1/2 are bypassed (e.g. by a plugin transform that injects + a metacharacter, or a `RuleWithoutValidation` directive), + `Construct.format()` must still refuse to build the command string. + Simulated by mutating `target` post-init to a forbidden value. + """ + query = Query( + queryLocation="test1", + queryTarget="192.0.2.0/24", + queryType="juniper_bgp_route", + ) + constructor = Construct(device=state.devices["test1"], query=query) + constructor.target = f"192.0.2.0/24{metachar}id" + with pytest.raises(InputInvalid): + constructor.format("show route {target}") diff --git a/hyperglass/models/api/query.py b/hyperglass/models/api/query.py index a8791bc0..f2f6526e 100644 --- a/hyperglass/models/api/query.py +++ b/hyperglass/models/api/query.py @@ -15,6 +15,7 @@ from hyperglass.util import snake_to_camel, repr_from_attrs from hyperglass.state import use_state from hyperglass.plugins import InputPluginManager +from hyperglass.constants import FORBIDDEN_TARGET_CHARS from hyperglass.exceptions.public import InputInvalid, QueryTypeNotFound, QueryLocationNotFound from hyperglass.exceptions.private import InputValidationError @@ -23,9 +24,26 @@ QueryLocation = Annotated[str, StringConstraints(strict=True, min_length=1, strip_whitespace=True)] -QueryTarget = Annotated[str, StringConstraints(min_length=1, strip_whitespace=True)] +QueryTarget = Annotated[ + str, + StringConstraints(min_length=1, max_length=255, strip_whitespace=True), +] QueryType = Annotated[str, StringConstraints(strict=True, min_length=1, strip_whitespace=True)] +def _check_query_target(value: str) -> str: + """Reject targets containing control characters or shell/CLI metacharacters. + + The forbidden set is the canonical `FORBIDDEN_TARGET_CHARS` from + `hyperglass.constants`; the same set is checked again at the device + transport boundary (`execution.drivers._construct`). + """ + if any(c in FORBIDDEN_TARGET_CHARS for c in value): + raise InputValidationError( + error="Target contains disallowed character(s)", + target=value, + ) + return value + class SimpleQuery(BaseModel): """A simple representation of a post-validated query.""" @@ -106,6 +124,14 @@ def random(self) -> str: def validate_query_target(self) -> None: """Validate a query target after all fields/relationships have been initialized.""" + # Reject control characters and CLI/shell metacharacters before any + # rule-based validation. Defense in depth: even with a permissive + # directive regex, these characters must never reach a device CLI. + if isinstance(self.query_target, list): + for item in self.query_target: + _check_query_target(item) + else: + _check_query_target(self.query_target) # Run config/rule-based validations. self.directive.validate_target(self.query_target) # Run plugin-based validations. diff --git a/hyperglass/models/directive.py b/hyperglass/models/directive.py index ab3daafe..cb99bbae 100644 --- a/hyperglass/models/directive.py +++ b/hyperglass/models/directive.py @@ -201,15 +201,33 @@ class RuleWithPattern(Rule): _type: RuleTypeAttr = "pattern" condition: str - def validate_target(self, target: str, *, multiple: bool) -> str: # noqa: C901 + # When `condition == "*"`, accept only characters that are legitimately part + # of BGP AS-path regex syntax (anchors, quantifiers, groups, character + # classes) or BGP community values. CLI metacharacters (`|`, `;`, `&`, + # `<`, `>`, backtick, quotes, whitespace control, etc.) are excluded so a + # user cannot break out of `command.format(target=...)` on the device. + # Regex alternation (`|`) is intentionally not allowed: most built-in + # vendor directives interpolate the target unquoted, where `|` would be + # parsed as a CLI pipe filter rather than regex alternation. Custom + # directives should set an explicit `condition` regex. + _WILDCARD_PATTERN = re.compile( + r"[A-Za-z0-9:_\-\^\$\.\*\+\?\(\)\[\] ]+" + ) + + def validate_target( # noqa: C901 + self, target: t.Union[str, t.List[str]], *, multiple: bool + ) -> bool: """Validate a string target against configured regex patterns.""" def validate_single_value(value: str) -> t.Union[bool, BaseException]: if self.condition == "*": - pattern = re.compile(".+", re.IGNORECASE) + pattern = self._WILDCARD_PATTERN else: pattern = re.compile(self.condition, re.IGNORECASE) - is_match = pattern.match(value) + # `fullmatch` (not `match`) ensures the entire target conforms to + # the pattern. With `match`, a pattern like `[0-9]+` would accept + # `123;reboot` because the match anchors only at the start. + is_match = pattern.fullmatch(value) if is_match and self.action == "permit": return True @@ -273,6 +291,23 @@ class Directive(HyperglassUniqueModel, unique_by=("id", "table_output")): multiple: bool = False multiple_separator: str = " " + @field_validator("multiple_separator") + @classmethod + def validate_multiple_separator(cls, value: str) -> str: + """Restrict the multi-target join character to safe values. + + The separator is interpolated unquoted between user-supplied targets + and then sent verbatim to the device CLI. Allowing arbitrary separators + would make multi-target queries an injection vector if an operator + misconfigured the value (e.g. `;` or `|`). + """ + allowed = {" ", ","} + if value not in allowed: + raise ValueError( + f"multiple_separator must be one of {sorted(allowed)!r}, got {value!r}" + ) + return value + @field_validator("rules", mode="before") @classmethod def validate_rules(cls, rules: t.List[t.Dict[str, t.Any]]): diff --git a/hyperglass/models/tests/test_input_hardening.py b/hyperglass/models/tests/test_input_hardening.py new file mode 100644 index 00000000..fbc0f320 --- /dev/null +++ b/hyperglass/models/tests/test_input_hardening.py @@ -0,0 +1,193 @@ +"""Regression tests for input-validation hardening. + +These tests pin the security guarantees of the input pipeline: + +1. CLI/shell metacharacters and control characters are rejected before any + target reaches `command.format(target=...)`. +2. The wildcard `condition: "*"` accepts only characters that legitimately + appear in BGP AS-path regex syntax or community values. +3. `RuleWithPattern` uses `fullmatch` semantics, so a custom regex without + end-of-string anchoring still cannot allow trailing payload. +4. `Directive.multiple_separator` is restricted to a small allow-list. +""" + +# Standard Library +import pytest + +# Project +from hyperglass.exceptions.private import InputValidationError +from hyperglass.models.api.query import _check_query_target + +# Local +from ..directive import Directive, RuleWithPattern + + +@pytest.mark.parametrize( + "target", + [ + "1.1.1.1; reboot", + "1.1.1.1 | section bgp", + "1.1.1.1 && uname", + "1.1.1.1\nshow running-config", + "1.1.1.1\rshow running-config", + "foo`id`", + 'foo" | section line "bar', + "foo\\bar", + "1.1.1.1 > /tmp/x", + "1.1.1.1 < /tmp/x", + ], +) +def test_wildcard_pattern_rejects_metachars(target): + """`condition: "*"` must not let CLI metacharacters through.""" + rule = RuleWithPattern(condition="*", action="permit", commands=["show {target}"]) + assert rule.validate_target(target, multiple=False) is False + + +@pytest.mark.parametrize( + "target", + [ + "65000", + "65000_65001", + "_65000_", + "^65000$", + "65000 .* 65001", + "(65000)(65001)", + "65000:100", + "no-export", + ], +) +def test_wildcard_pattern_accepts_legitimate_targets(target): + """`condition: "*"` must continue to accept normal AS-path / community values.""" + rule = RuleWithPattern(condition="*", action="permit", commands=["show {target}"]) + assert rule.validate_target(target, multiple=False) is True + + +def test_pattern_rule_uses_fullmatch(): + """A non-anchored custom regex must not allow trailing payload (fullmatch, not match).""" + rule = RuleWithPattern( + condition=r"[0-9]+", + action="permit", + commands=["show ip bgp regexp {target}"], + ) + # `match` would accept this because it anchors only at the start; `fullmatch` rejects. + assert rule.validate_target("12345abc", multiple=False) is False + assert rule.validate_target("12345", multiple=False) is True + + +def test_multiple_separator_allowlist_rejects_pipe(): + """`multiple_separator: "|"` would be a CLI-injection vector if accepted.""" + with pytest.raises(ValueError): + Directive( + id="x", + name="x", + field=None, + multiple=True, + multiple_separator="|", + rules=[{"condition": "*", "action": "permit", "command": "show {target}"}], + ) + + +def test_multiple_separator_allowlist_rejects_semicolon(): + with pytest.raises(ValueError): + Directive( + id="x", + name="x", + field=None, + multiple=True, + multiple_separator=";", + rules=[{"condition": "*", "action": "permit", "command": "show {target}"}], + ) + + +def test_multiple_separator_allowlist_accepts_space_and_comma(): + for sep in (" ", ","): + d = Directive( + id="x", + name="x", + field=None, + multiple=True, + multiple_separator=sep, + rules=[{"condition": "*", "action": "permit", "command": "show {target}"}], + ) + assert d.multiple_separator == sep + + +def test_pattern_rule_rejects_embedded_newline_with_strict_regex(): + """An IP-shaped regex must not match a target with an embedded newline.""" + rule = RuleWithPattern( + condition=r"\d+\.\d+\.\d+\.\d+", + action="permit", + commands=["ping {target}"], + ) + assert rule.validate_target("1.1.1.1\nshow run", multiple=False) is False + + +# Layer 1 (type-level) tests: `_check_query_target` is the first gate the +# request hits, before any directive rule runs. These tests pin the guarantee +# directly so the type-level check can't silently regress even if the +# directive-rule layer (Layer 2) is permissive. + + +@pytest.mark.parametrize( + "target", + [ + "1.1.1.1;reboot", + "1.1.1.1|cmd", + "1.1.1.1&cmd", + "1.1.1.1\nshow run", + "1.1.1.1\rshow run", + "1.1.1.1\tshow run", + "1.1.1.1\x00", + "foo`id`", + 'foo"bar', + "foobar", + "foo\\bar", + ], +) +def test_check_query_target_rejects_forbidden_chars(target): + """The type-level forbidden-character check must reject every metachar.""" + with pytest.raises(InputValidationError): + _check_query_target(target) + + +@pytest.mark.parametrize( + "target", + ["192.0.2.0/24", "65000:100", "_65000_", "^65000$", "(65000)(65001)"], +) +def test_check_query_target_accepts_normal_targets(target): + """Normal looking-glass targets must pass the type-level check.""" + assert _check_query_target(target) == target + + +def test_query_validates_each_list_target_elementwise(): + """List targets must be checked element-wise at the type-level. + + Even when a directive's regex permits anything (`condition: "*"`), the + type-level forbidden-character check must still reject a list whose any + element contains a metacharacter. + """ + # Simulate the loop in `Query.validate_query_target`: each element runs + # through `_check_query_target` before the directive rule sees it. + targets = ["1.1.1.1", "2.2.2.2;reboot"] + with pytest.raises(InputValidationError): + for item in targets: + _check_query_target(item) + + +def test_check_query_target_rejects_even_when_rule_would_permit(): + """Layer-1 must reject metacharacters even when Layer-2 would permit them. + + A rule with `condition='.*'` would permit anything at the directive-rule + layer, but `_check_query_target` runs first and must still reject. + """ + permissive_rule = RuleWithPattern( + condition=r".*", action="permit", commands=["show {target}"] + ) + target = "1.1.1.1;reboot" + # The rule itself would happily permit this if it ever saw it... + # (re.fullmatch(".*") matches any string, including ones with `;`). + assert permissive_rule.validate_target(target, multiple=False) is True + # ...but the Layer-1 check that runs *before* the rule rejects it first: + with pytest.raises(InputValidationError): + _check_query_target(target)