Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ Using the environment variable `DR_BYPASS_TAGS_VALIDATION` will bypass the Detec

Using the environment variable `DR_BYPASS_TIMELINE_TEMPLATE_VALIDATION` will bypass the timeline template id and title validation for rules.

Using the environment variable `DR_BYPASS_ESQL_KEEP_VALIDATION` will bypass local validation that ES|QL rules include a `keep` command and that non-aggregate queries list `_id`, `_version`, and `_index` in `keep` (other ES|QL checks are unchanged).

Using the environment variable `DR_BYPASS_ESQL_METADATA_VALIDATION` will bypass local validation that non-aggregate ES|QL queries use `FROM ... METADATA _id, _version, _index` or an aggregate `STATS ... BY` pattern (other ES|QL checks are unchanged).

In `_config.yaml`, `bypass_optional_elastic_validation: true` enables all of the above at load time. Alternatively, set any of the top-level booleans `bypass_note_validation_and_parse`, `bypass_bbr_lookback_validation`, `bypass_tags_validation`, `bypass_timeline_template_validation`, `bypass_esql_keep_validation`, or `bypass_esql_metadata_validation` to `true` (see comments in `detection_rules/etc/_config.yaml`).

Using the environment variable `DR_CLI_MAX_WIDTH` will set a custom max width for the click CLI.
For instance, some users may want to increase the default value in cases where help messages are cut off.

Expand Down
31 changes: 29 additions & 2 deletions detection_rules/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,13 @@
from eql.utils import load_dump # type: ignore[reportMissingTypeStubs]

from .misc import discover_tests
from .utils import cached, get_etc_path, load_etc_dump, set_all_validation_bypass
from .utils import (
OPTIONAL_ELASTIC_VALIDATION_BYPASS_ENV,
cached,
get_etc_path,
load_etc_dump,
set_all_validation_bypass,
)

ROOT_DIR = Path(__file__).parent.parent
CUSTOM_RULES_DIR = os.getenv("CUSTOM_RULES_DIR", None)
Expand Down Expand Up @@ -208,6 +214,12 @@ class RulesConfig:
exception_dir: Path | None = None
normalize_kql_keywords: bool = True
bypass_optional_elastic_validation: bool = False
bypass_note_validation_and_parse: bool = False
bypass_bbr_lookback_validation: bool = False
bypass_tags_validation: bool = False
bypass_timeline_template_validation: bool = False
bypass_esql_keep_validation: bool = False
bypass_esql_metadata_validation: bool = False
no_tactic_filename: bool = False

def __post_init__(self) -> None:
Expand Down Expand Up @@ -323,7 +335,22 @@ def parse_rules_config(path: Path | None = None) -> RulesConfig: # noqa: PLR091
# bypass_optional_elastic_validation
contents["bypass_optional_elastic_validation"] = loaded.get("bypass_optional_elastic_validation", False)
if contents["bypass_optional_elastic_validation"]:
set_all_validation_bypass(contents["bypass_optional_elastic_validation"])
set_all_validation_bypass(True)
for yaml_key in OPTIONAL_ELASTIC_VALIDATION_BYPASS_ENV:
contents[yaml_key] = True
else:
for yaml_key, env_var in OPTIONAL_ELASTIC_VALIDATION_BYPASS_ENV.items():
if yaml_key in loaded:
val = loaded[yaml_key]
if not isinstance(val, bool):
raise SystemExit(
f"`{yaml_key}` in _config.yaml must be a boolean (true/false), not {type(val).__name__}"
)
else:
val = False
contents[yaml_key] = val
if val:
os.environ[env_var] = str(True)

# no_tactic_filename
contents["no_tactic_filename"] = loaded.get("no_tactic_filename", False)
Expand Down
16 changes: 14 additions & 2 deletions detection_rules/etc/_config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,20 @@ normalize_kql_keywords: False
# stack-schema-map.yaml file when using a custom rules directory and config.
# auto_gen_schema_file: "etc/auto-gen-schema.json"

# To on bulk disable elastic validation for optional fields, use the following line
# bypass_optional_elastic_validation: True
# Optional Elastic validation bypasses (each true value sets the matching DR_BYPASS_* env var at load time).
#
# 1) Enable every bypass at once:
# bypass_optional_elastic_validation: true
#
# 2) Or set only the bypasses you need (ignored if bypass_optional_elastic_validation is true):
# bypass_note_validation_and_parse: true # DR_BYPASS_NOTE_VALIDATION_AND_PARSE
# bypass_bbr_lookback_validation: true # DR_BYPASS_BBR_LOOKBACK_VALIDATION
# bypass_tags_validation: true # DR_BYPASS_TAGS_VALIDATION
# bypass_timeline_template_validation: true # DR_BYPASS_TIMELINE_TEMPLATE_VALIDATION
# bypass_esql_keep_validation: true # DR_BYPASS_ESQL_KEEP_VALIDATION
# bypass_esql_metadata_validation: true # DR_BYPASS_ESQL_METADATA_VALIDATION
#
# Each must be true or false if present; omitted keys default to false.

# This points to the testing config file (see example under detection_rules/etc/example_test_config.yaml)
# This can either be set here or as the environment variable `DETECTION_RULES_TEST_CONFIG`, with precedence
Expand Down
63 changes: 38 additions & 25 deletions detection_rules/rule.py
Original file line number Diff line number Diff line change
Expand Up @@ -981,36 +981,49 @@ def validates_esql_data(self, data: dict[str, Any], **_: Any) -> None:
)

# Ensure that non-aggregate queries have metadata
if not combined_pattern.search(query_lower):
raise EsqlSemanticError(
f"Rule: {data['name']} contains a non-aggregate query without"
f" metadata fields '_id', '_version', and '_index' ->"
f" Add 'metadata _id, _version, _index' to the from command or add an aggregate function."
if os.environ.get("DR_BYPASS_ESQL_METADATA_VALIDATION") is None:
bypass_metadata_hint = (
" To bypass ES|QL `FROM` metadata validation, set the environment variable "
"`DR_BYPASS_ESQL_METADATA_VALIDATION`."
)
if not combined_pattern.search(query_lower):
raise EsqlSemanticError(
f"Rule: {data['name']} contains a non-aggregate query without"
f" metadata fields '_id', '_version', and '_index' ->"
f" Add 'metadata _id, _version, _index' to the from command or add an aggregate function."
+ bypass_metadata_hint
)

# Enforce KEEP command for ESQL rules and that METADATA fields are present in non-aggregate queries
# Match | followed by optional whitespace/newlines and then 'keep'
keep_pattern = re.compile(r"\|\s*keep\b\s+([^\|]+)", re.IGNORECASE | re.DOTALL)
keep_matches = list(keep_pattern.finditer(query_lower))
if not keep_matches:
raise EsqlSemanticError(
f"Rule: {data['name']} does not contain a 'keep' command -> Add a 'keep' command to the query."
if os.environ.get("DR_BYPASS_ESQL_KEEP_VALIDATION") is None:
bypass_keep_hint = (
" To bypass ES|QL `keep` validation, set the environment variable `DR_BYPASS_ESQL_KEEP_VALIDATION`."
)
# Match | followed by optional whitespace/newlines and then 'keep'
keep_pattern = re.compile(r"\|\s*keep\b\s+([^\|]+)", re.IGNORECASE | re.DOTALL)
keep_matches = list(keep_pattern.finditer(query_lower))
if not keep_matches:
raise EsqlSemanticError(
f"Rule: {data['name']} does not contain a 'keep' command -> Add a 'keep' command to the query."
+ bypass_keep_hint
)

# Ensure that keep clause includes metadata fields on non-aggregate queries
aggregate_pattern = re.compile(r"\|\s*stats\b(?:\s+([^\|]+?))?(?:\s+by\s+([^\|]+))?", re.IGNORECASE | re.DOTALL)
if not aggregate_pattern.search(query_lower):
for keep_match in keep_matches:
raw_keep = re.sub(r"//.*", "", keep_match.group(1))
keep_fields = [field.strip() for field in raw_keep.split(",") if field.strip()]
if "*" not in keep_fields:
required_metadata = {"_id", "_version", "_index"}
if not required_metadata.issubset(set(map(str.strip, keep_fields))):
raise EsqlSemanticError(
f"Rule: {data['name']} contains a keep clause without"
f" metadata fields '_id', '_version', and '_index' ->"
f" Add '_id', '_version', '_index' to the keep command."
)
# Ensure that keep clause includes metadata fields on non-aggregate queries
aggregate_pattern = re.compile(
r"\|\s*stats\b(?:\s+([^\|]+?))?(?:\s+by\s+([^\|]+))?", re.IGNORECASE | re.DOTALL
)
if not aggregate_pattern.search(query_lower):
for keep_match in keep_matches:
raw_keep = re.sub(r"//.*", "", keep_match.group(1))
keep_fields = [field.strip() for field in raw_keep.split(",") if field.strip()]
if "*" not in keep_fields:
required_metadata = {"_id", "_version", "_index"}
if not required_metadata.issubset(set(map(str.strip, keep_fields))):
raise EsqlSemanticError(
f"Rule: {data['name']} contains a keep clause without"
f" metadata fields '_id', '_version', and '_index' ->"
f" Add '_id', '_version', '_index' to the keep command." + bypass_keep_hint
)


@dataclass(frozen=True, kw_only=True)
Expand Down
17 changes: 13 additions & 4 deletions detection_rules/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,12 +136,21 @@ def save_etc_dump(contents: dict[str, Any], path: list[str], sort_keys: bool = T
eql.utils.save_dump(contents, path) # type: ignore[reportUnknownVariableType]


# Top-level _config.yaml key -> DR_BYPASS_* env var set when true at load time
OPTIONAL_ELASTIC_VALIDATION_BYPASS_ENV: dict[str, str] = {
"bypass_note_validation_and_parse": "DR_BYPASS_NOTE_VALIDATION_AND_PARSE",
"bypass_bbr_lookback_validation": "DR_BYPASS_BBR_LOOKBACK_VALIDATION",
"bypass_tags_validation": "DR_BYPASS_TAGS_VALIDATION",
"bypass_timeline_template_validation": "DR_BYPASS_TIMELINE_TEMPLATE_VALIDATION",
"bypass_esql_keep_validation": "DR_BYPASS_ESQL_KEEP_VALIDATION",
"bypass_esql_metadata_validation": "DR_BYPASS_ESQL_METADATA_VALIDATION",
}


def set_all_validation_bypass(env_value: bool = False) -> None:
"""Set all validation bypass environment variables."""
os.environ["DR_BYPASS_NOTE_VALIDATION_AND_PARSE"] = str(env_value)
os.environ["DR_BYPASS_BBR_LOOKBACK_VALIDATION"] = str(env_value)
os.environ["DR_BYPASS_TAGS_VALIDATION"] = str(env_value)
os.environ["DR_BYPASS_TIMELINE_TEMPLATE_VALIDATION"] = str(env_value)
for env_var in OPTIONAL_ELASTIC_VALIDATION_BYPASS_ENV.values():
os.environ[env_var] = str(env_value)


def set_nested_value(obj: dict[str, Any], compound_key: str, value: Any) -> None:
Expand Down
9 changes: 8 additions & 1 deletion docs-dev/custom-rules-management.md
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,8 @@ Some notes:
* To manage action-connectors tied to rules one can set an action-connectors directory using the optional `action_connector_dir` value (included above) set to be the desired path. If an actions_connector directory is explicitly specified in a CLI command, the config value will be ignored.
* To turn on automatic schema generation for non-ecs fields via custom schemas add `auto_gen_schema_file: <path_to_your_json_file>`. This will generate a schema file in the specified location that will be used to add entries for each field and index combination that is not already in a known schema. This will also automatically add it to your stack-schema-map.yaml file when using a custom rules directory and config.
* For Kibana action items, currently these are included in the rule toml files themselves. At a later date, we may allow for bulk editing of rule action items through separate action toml files. The action_dir config key is left available for this later implementation. For now to bulk update, use the bulk actions add rule actions UI in Kibana.
* To on bulk disable elastic validation for optional fields, use the following line `bypass_optional_elastic_validation: True`.
* To disable optional Elastic validation in bulk, set `bypass_optional_elastic_validation: true` in `_config.yaml`. That sets every `DR_BYPASS_*` environment variable that `set_all_validation_bypass()` controls (note parsing, BBR lookback, tags unit tests, timeline template, ES|QL `keep`, ES|QL `FROM` metadata).
* To enable only some of those bypasses, set the matching top-level booleans in `_config.yaml` (omit `bypass_optional_elastic_validation` or set it to `false`): `bypass_note_validation_and_parse`, `bypass_bbr_lookback_validation`, `bypass_tags_validation`, `bypass_timeline_template_validation`, `bypass_esql_keep_validation`, `bypass_esql_metadata_validation`. Each `true` sets the corresponding `DR_BYPASS_*` variable when the config is loaded. If `bypass_optional_elastic_validation` is `true`, those individual flags are all treated as enabled (the bulk flag wins).


When using the repo, set the environment variable `CUSTOM_RULES_DIR=<directory-with-_config.yaml>`
Expand Down Expand Up @@ -132,6 +133,12 @@ class RulesConfig:
exception_dir: Optional[Path] = None
normalize_kql_keywords: bool = True
bypass_optional_elastic_validation: bool = False
bypass_note_validation_and_parse: bool = False
bypass_bbr_lookback_validation: bool = False
bypass_tags_validation: bool = False
bypass_timeline_template_validation: bool = False
bypass_esql_keep_validation: bool = False
bypass_esql_metadata_validation: bool = False

# using the stack_schema_map
RULES_CONFIG.stack_schema_map
Expand Down
6 changes: 6 additions & 0 deletions docs-dev/developing.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ Using the environment variable `DR_BYPASS_TAGS_VALIDATION` will bypass the Detec

Using the environment variable `DR_BYPASS_TIMELINE_TEMPLATE_VALIDATION` will bypass the timeline template id and title validation for rules.

Using the environment variable `DR_BYPASS_ESQL_KEEP_VALIDATION` will bypass local validation that ES|QL rules include a `keep` command and that non-aggregate queries list `_id`, `_version`, and `_index` in `keep` (other ES|QL checks are unchanged).

Using the environment variable `DR_BYPASS_ESQL_METADATA_VALIDATION` will bypass local validation that non-aggregate ES|QL queries use `FROM ... METADATA _id, _version, _index` or an aggregate `STATS ... BY` pattern (other ES|QL checks are unchanged).

In `_config.yaml`, `bypass_optional_elastic_validation: true` enables all of these bypass env vars when config is loaded. You can instead set individual top-level flags (`bypass_note_validation_and_parse`, `bypass_bbr_lookback_validation`, `bypass_tags_validation`, `bypass_timeline_template_validation`, `bypass_esql_keep_validation`, `bypass_esql_metadata_validation`); the bulk flag takes precedence if it is true. See `detection_rules/etc/_config.yaml` for an example.


## Using the `RuleResource` methods built on detections `_bulk_action` APIs

Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "detection_rules"
version = "1.6.7"
version = "1.6.8"
description = "Detection Rules is the home for rules used by Elastic Security. This repository is used for the development, maintenance, testing, validation, and release of rules for Elastic Security’s Detection Engine."
readme = "README.md"
requires-python = ">=3.12"
Expand Down
83 changes: 81 additions & 2 deletions tests/test_schemas.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@
"""Test stack versioned schemas."""

import copy
import os
import unittest
import unittest.mock
import uuid
from pathlib import Path

import eql
import pytest
import pytoml
from marshmallow import ValidationError
from semver import Version

Expand Down Expand Up @@ -318,7 +319,7 @@ def test_esql_data_validation(self):
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = pytoml.loads(rule_body)
rule_dict = RuleCollection.deserialize_toml_string(rule_body)

# Most used order of the metadata fields
query = """
Expand Down Expand Up @@ -357,3 +358,81 @@ def test_esql_data_validation(self):
"""
rule_dict["rule"]["query"] = query
_ = RuleCollection().load_dict(rule_dict, path=rule_path)

def test_esql_keep_validation_bypass_missing_keep(self):
"""ES|QL keep checks are skipped when DR_BYPASS_ESQL_KEEP_VALIDATION is set."""
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = RuleCollection.deserialize_toml_string(rule_body)
query = """
FROM logs-windows.powershell_operational* METADATA _id, _index, _version
| WHERE event.code == "4104"
"""
rule_dict["rule"]["query"] = query
with unittest.mock.patch.dict(os.environ, {"DR_BYPASS_ESQL_KEEP_VALIDATION": "1"}):
_ = RuleCollection().load_dict(rule_dict, path=rule_path)

def test_esql_keep_bypass_does_not_skip_from_metadata_validation(self):
"""FROM METADATA requirement still applies when only keep validation is bypassed."""
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = RuleCollection.deserialize_toml_string(rule_body)
query = """
FROM logs-windows.powershell_operational*
| WHERE event.code == "4104"
"""
rule_dict["rule"]["query"] = query
with (
unittest.mock.patch.dict(os.environ, {"DR_BYPASS_ESQL_KEEP_VALIDATION": "1"}),
pytest.raises(EsqlSemanticError),
):
_ = RuleCollection().load_dict(rule_dict, path=rule_path)

def test_esql_metadata_validation_bypass_missing_from_metadata(self):
"""ES|QL FROM METADATA checks are skipped when DR_BYPASS_ESQL_METADATA_VALIDATION is set."""
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = RuleCollection.deserialize_toml_string(rule_body)
query = """
FROM logs-windows.powershell_operational*
| WHERE event.code == "4104"
| KEEP event.code, _id, _version, _index
"""
rule_dict["rule"]["query"] = query
with unittest.mock.patch.dict(os.environ, {"DR_BYPASS_ESQL_METADATA_VALIDATION": "1"}):
_ = RuleCollection().load_dict(rule_dict, path=rule_path)

def test_esql_metadata_bypass_does_not_skip_keep_validation(self):
"""`keep` validation still applies when only FROM metadata validation is bypassed."""
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = RuleCollection.deserialize_toml_string(rule_body)
query = """
FROM logs-windows.powershell_operational*
| WHERE event.code == "4104"
"""
rule_dict["rule"]["query"] = query
with (
unittest.mock.patch.dict(os.environ, {"DR_BYPASS_ESQL_METADATA_VALIDATION": "1"}),
pytest.raises(EsqlSemanticError),
):
_ = RuleCollection().load_dict(rule_dict, path=rule_path)

def test_esql_keep_validation_bypass_missing_metadata_in_keep(self):
"""ES|QL metadata-in-keep checks are skipped when DR_BYPASS_ESQL_KEEP_VALIDATION is set."""
# A random ESQL rule to deliver a test query
rule_path = Path("tests/data/command_control_dummy_production_rule.toml")
rule_body = rule_path.read_text()
rule_dict = RuleCollection.deserialize_toml_string(rule_body)
query = """
FROM logs-windows.powershell_operational* METADATA _id, _version, _index
| WHERE event.code == "4104"
| KEEP event.code
"""
rule_dict["rule"]["query"] = query
with unittest.mock.patch.dict(os.environ, {"DR_BYPASS_ESQL_KEEP_VALIDATION": "1"}):
_ = RuleCollection().load_dict(rule_dict, path=rule_path)
Loading