diff --git a/.github/scripts/generate-positive-expected-results/generate_positive_expected_result.py b/.github/scripts/generate-positive-expected-results/generate_positive_expected_result.py
new file mode 100644
index 00000000000..8bed496ecff
--- /dev/null
+++ b/.github/scripts/generate-positive-expected-results/generate_positive_expected_result.py
@@ -0,0 +1,246 @@
+import argparse
+import json
+import os
+import subprocess
+import sys
+
+FIELD_ORDER = [
+    "queryName", "severity", "line", "fileName",
+    "resourceType", "resourceName", "searchKey", "searchValue",
+    "expectedValue", "actualValue", "issueType", "similarityID", "search_line",
+]
+
+KICS_RESULT_CODES = {0, 1, 20, 30, 40, 50, 60}
+
+SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
+REPO_ROOT = os.path.normpath(os.path.join(SCRIPT_DIR, "../../.."))
+QUERIES_DIR = os.path.join(REPO_ROOT, "assets", "queries")
+
+
+def parse_args():
+    """Parse CLI arguments: either --run-all, or --queryID (with --queryPath)."""
+    parser = argparse.ArgumentParser(description="Run a KICS scan for a given query.")
+    group = parser.add_mutually_exclusive_group(required=True)
+    group.add_argument("--run-all", action="store_true", help="Run scans for all queries under assets/queries.")
+    group.add_argument("--queryID", help="The query ID to scan.")
+    parser.add_argument("--queryPath", help="The base path of the query (required without --run-all).")
+    return parser.parse_args()
+
+
+def build_command(query_id: str, scan_path: str, payload_path: str, output_path: str, output_name: str) -> list[str]:
+    """Build the `go run ... scan` command line for a single KICS scan."""
+    main_go = os.path.join(REPO_ROOT, "cmd", "console", "main.go")
+
+    return [
+        "go", "run", main_go,
+        "scan",
+        "-p", scan_path,
+        "-o", output_path,
+        "--output-name", output_name,
+        "-i", query_id,
+        "-d", payload_path,
+        "-v",
+        "--experimental-queries",
+        "--bom",
+        "--enable-openapi-refs",
+        "--kics_compute_new_simid"
+    ]
+
+
+def run_scan(query_id: str, scan_path: str, payload_path: str, output_path: str, output_name: str) -> int:
+    """Run one KICS scan and return its exit code (127 if Go is not installed)."""
+    command = build_command(query_id, scan_path, payload_path, output_path, output_name)
+
+    print("Running command:")
+    print(" ".join(command))
+    print("-" * 60)
+
+    try:
+        result = subprocess.run(command, cwd=REPO_ROOT)
+        if result.returncode not in KICS_RESULT_CODES:
+            print(f"\n[ERROR] Scan failed with return code {result.returncode}.", file=sys.stderr)
+        # Always return the exit code: falling through on success would return
+        # None, which callers comparing against KICS_RESULT_CODES would count
+        # as a failure.
+        return result.returncode
+    except FileNotFoundError:
+        print("\n[ERROR] 'go' not found. Make sure Go is installed and in your PATH.", file=sys.stderr)
+        # 127 ("command not found") is deliberately outside KICS_RESULT_CODES
+        # so a missing Go toolchain is reported as a failure (1 is a valid
+        # KICS exit code and would be silently accepted).
+        return 127
+
+
+def find_positive_tests(query_path: str) -> list[tuple[str, str]]:
+    """
+    Return a sorted list of (label, scan_path) for each positive test in test/.
+
+    Handles two layouts:
+      - File:      test/positiveX.<ext>  → label='positiveX', scan_path=the file
+      - Directory: test/positiveX/       → for each positiveX_Y.<ext> inside,
+                   label='positiveX_Y', scan_path=the file
+    """
+    test_dir = os.path.join(query_path, "test")
+    if not os.path.isdir(test_dir):
+        return []
+
+    positives = []
+    for entry in os.listdir(test_dir):
+        if not entry.startswith("positive"):
+            continue
+        full_path = os.path.join(test_dir, entry)
+        if os.path.isdir(full_path):
+            # Directory: positiveX/ — scan each file inside individually
+            for file in os.listdir(full_path):
+                file_path = os.path.join(full_path, file)
+                if not os.path.isfile(file_path):
+                    continue
+                label = os.path.splitext(file)[0]  # e.g. 'positive2_1'
+                after = label[len("positive"):]
+                if not after or not after[0].isdigit():  # skip positive_expected_result etc.
+                    continue
+                positives.append((label, file_path))
+        else:
+            # File: positive.<ext> or positiveX.<ext>
+            suffix = entry[len("positive"):].split(".")[0]
+            if suffix and not suffix.isdigit():
+                continue  # skip positive_expected_result.json etc.
+            positives.append((f"positive{suffix}", full_path))
+
+    positives.sort(key=lambda x: x[0])  # lexicographic by label (positive10 sorts before positive2)
+    return positives
+
+
+def run_query_scans(query_id: str, query_path: str) -> tuple[list[tuple[str, str, int]], bool]:
+    positives = find_positive_tests(query_path)
+    if not positives:
+        print(f"[WARN] No positive tests found in {query_path}/test, skipping.", file=sys.stderr)
+        return [], False
+
+    payloads_dir = os.path.join(query_path, "payloads")
+    os.makedirs(payloads_dir, exist_ok=True)
+
+    output_path = os.path.join(query_path, "results") + os.sep  # trailing sep: KICS -o expects a directory
+    os.makedirs(output_path, exist_ok=True)
+
+    failed = []
+    for label, scan_path in positives:
+        payload_path = os.path.join(payloads_dir, f"{label}.json")
+        output_name = f"{label}.json"
+        print(f"\n -> {label}: {os.path.relpath(scan_path, REPO_ROOT)}")
+        rc = run_scan(query_id, scan_path, payload_path, output_path, output_name)
+        if rc not in KICS_RESULT_CODES:  # NOTE(review): run_scan returns None on success, so every scan is recorded here as failed — run_scan should return the exit code unconditionally
+            failed.append((scan_path, payload_path, rc))
+
+    written = collect_and_write_expected_results(query_path)
+    return failed, written
+
+
+def collect_and_write_expected_results(query_path: str) -> bool:
+    """
+    Read all positive*.json result files from results/, extract findings,
+    sort by (fileName, line, issueType, searchKey, similarityID), and write
+    test/positive_expected_result.json. Returns True if the file was written.
+    """
+    results_dir = os.path.join(query_path, "results")
+    if not os.path.isdir(results_dir):
+        return False
+
+    entries = []
+    for filename in sorted(os.listdir(results_dir)):
+        if not filename.startswith("positive") or not filename.endswith(".json"):
+            continue
+        with open(os.path.join(results_dir, filename), encoding="utf-8") as f:
+            data = json.load(f)
+
+        all_findings = data.get("queries", []) + data.get("bill_of_materials", [])  # --bom results live in a separate key
+        for query in all_findings:
+            query_name = query.get("query_name", "")
+            severity = query.get("severity", "")
+            for file_entry in query.get("files", []):
+                entry = {
+                    "queryName": query_name,
+                    "severity": severity,
+                    "line": file_entry.get("line", 0),
+                    "fileName": os.path.basename(file_entry.get("file_name", "")),
+                    "resourceType": file_entry.get("resource_type", ""),
+                    "resourceName": file_entry.get("resource_name", ""),
+                    "searchKey": file_entry.get("search_key", ""),
+                    "searchValue": file_entry.get("search_value", ""),
+                    "expectedValue":file_entry.get("expected_value", ""),
+                    "actualValue": file_entry.get("actual_value", ""),
+                    "issueType": file_entry.get("issue_type", ""),
+                    "similarityID": file_entry.get("similarity_id", ""),
+                    "search_line": file_entry.get("search_line", 0),
+                }
+                entries.append({k: entry[k] for k in FIELD_ORDER})  # enforce canonical key order
+
+    if not entries:
+        return False
+
+    entries.sort(key=lambda x: (
+        x["fileName"], x["line"], x["issueType"], x["searchKey"], x["similarityID"]
+    ))
+
+    out_path = os.path.join(query_path, "test", "positive_expected_result.json")
+    with open(out_path, "w", encoding="utf-8") as f:
+        json.dump(entries, f, indent=2)
+        f.write("\n")  # trailing newline so the file is POSIX-friendly
+
+    print(f" -> Written {len(entries)} entries to {os.path.relpath(out_path, REPO_ROOT)}")
+    return True
+
+
+def iter_queries():
+    """Yield (query_id, query_path) for every query found under assets/queries."""
+    for dirpath, _, filenames in os.walk(QUERIES_DIR):
+        if "metadata.json" not in filenames:
+            continue
+        metadata = os.path.join(dirpath, "metadata.json")
+        with open(metadata, encoding="utf-8") as f:
+            data = json.load(f)
+        query_id = data.get("id")
+        if not query_id:
+            print(f"[WARN] No 'id' field in {metadata}, skipping.", file=sys.stderr)
+            continue
+        yield query_id, dirpath
+
+
+def main():
+    args = parse_args()
+
+    if args.run_all:
+        all_failed = []
+        written_count = 0
+        queries = list(iter_queries())
+        total = len(queries)
+        width = len(str(total))  # pad indices so the [i/total] prefix aligns
+        print(f"Found {total} queries. Starting scans...\n")
+        for idx, (query_id, query_path) in enumerate(queries, start=1):
+            print(f"\n[{idx:{width}d}/{total}] {os.path.relpath(query_path, REPO_ROOT)}")
+            failed, written = run_query_scans(query_id, query_path)
+            all_failed.extend(failed)
+            if written:
+                written_count += 1
+
+        print("\n" + "=" * 60)
+        print(f"[SUMMARY] {written_count}/{total} positive_expected_result.json written")
+        if all_failed:
+            print(f"  {len(all_failed)} scan(s) failed:")
+            for scan_path, payload_path, rc in all_failed:
+                print(f"  - {os.path.relpath(scan_path, REPO_ROOT)} → exit {rc}")
+            sys.exit(1)
+        else:
+            print("  All scans completed successfully.")
+            sys.exit(0)
+    else:
+        if not args.queryPath:
+            print("[ERROR] --queryPath is required when not using --run-all.", file=sys.stderr)
+            sys.exit(1)
+        query_path = os.path.normpath(os.path.join(REPO_ROOT, args.queryPath))
+        failed, _ = run_query_scans(args.queryID, query_path)  # 'written' flag unused in single-query mode
+        sys.exit(1 if failed else 0)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/docs/creating-queries.md b/docs/creating-queries.md
index 5471685e7ba..cacdf8ab640
--- a/docs/creating-queries.md
+++ b/docs/creating-queries.md
@@ -297,6 +297,100 @@ If the **query.rego** file implements more than one query, the **metadata.json**
 }
 ```
 
+Filling positive_expected_result.json:
+
+The `positive_expected_result.json` file is a JSON array where each entry represents a single expected finding from a positive test file.
+Each entry supports the following fields:
+
+- `queryName`: the name of the query as defined in `metadata.json`
+- `severity`: the severity level of the finding (`CRITICAL`, `HIGH`, `MEDIUM`, `LOW`, or `INFO`)
+- `line`: the line number in the positive test file where the vulnerability is detected
+- `fileName`: the name of the positive test file (e.g., `positive1.tf`, `positive.yaml`)
+- `resourceType`: the type of the resource flagged by the finding (e.g., `aws_cloudtrail`, `community.aws.elb_application_lb`)
+- `resourceName`: the name or label of the specific resource instance
+- `searchKey`: the search key path used by KICS to locate the vulnerability in the original document
+- `searchValue`: an additional value used to distinguish findings when multiple results point to the same line
+- `expectedValue`: a description of the expected (secure) value
+- `actualValue`: a description of the actual (insecure) value detected
+- `issueType`: the type of issue: `IncorrectValue`, `MissingAttribute`, or `RedundantAttribute`
+- `similarityID`: a hash that uniquely identifies the finding, used for deduplication and tracking
+- `search_line`: the search line path used by KICS for line detection; set to `-1` when not applicable
+
+Example:
+
+```json
+[
+  {
+    "queryName": "Authentication Without MFA",
+    "severity": "LOW",
+    "line": 2,
+    "fileName": "positive.yaml",
+    "resourceType": "community.aws.sts_assume_role",
+    "resourceName": "Assume an existing role",
+    "searchKey": "name={{Assume an existing role}}.{{community.aws.sts_assume_role}}",
+    "searchValue": "mfa_token",
+    "expectedValue": "sts_assume_role.mfa_token should be set",
+    "actualValue": "sts_assume_role.mfa_token is undefined",
+    "issueType": "MissingAttribute",
+    "similarityID": "0863129177e5f7d0f0fc55d63426f810f58f35c1270b64f4b57fbd1d8a3639cc",
+    "search_line": 2
+  },
+  {
+    "queryName": "Authentication Without MFA",
+    "severity": "LOW",
+    "line": 9,
+    "fileName": "positive.yaml",
+    "resourceType": "sts_assume_role",
+    "resourceName": "Hello",
+    "searchKey": "name={{Hello}}.{{sts_assume_role}}",
+    "searchValue": "mfa_serial_number",
+    "expectedValue": "sts_assume_role.mfa_serial_number should be set",
+    "actualValue": "sts_assume_role.mfa_serial_number is undefined",
+    "issueType": "MissingAttribute",
+    "similarityID": "89628f77eee62d856d5523656cdcbc1be1bfca9a1aaed79ffa9871979c947202",
+    "search_line": 9
+  }
+]
+```
+
+Instead of filling this file manually, you can use the helper script provided at `.github/scripts/generate-positive-expected-results/generate_positive_expected_result.py`. The script runs a KICS scan against each positive test file, collects the findings, and produces a correctly formatted `positive_expected_result.json`.
+
+**Important:** The examples below run the script from its own directory (`.github/scripts/generate-positive-expected-results/`); since the script resolves the repository root and all other paths relative to its own location, it can be invoked from any working directory. It requires **Go** to be installed and available in your `PATH`.
+
+The script supports two modes of operation:
+
+**Single query mode** — requires both `--queryID` and `--queryPath`:
+
+```bash
+cd .github/scripts/generate-positive-expected-results/
+python generate_positive_expected_result.py \
+  --queryID "<query-id>" \
+  --queryPath "<query-path>"
+```
+
+For example:
+
+```bash
+cd .github/scripts/generate-positive-expected-results/
+python generate_positive_expected_result.py \
+  --queryID "8173d5eb-96b5-4aa6-a71b-ecfa153c123d" \
+  --queryPath "assets/queries/terraform/aws/cloudtrail_multi_region_disabled"
+```
+
+**All queries mode** — scans every query under `assets/queries/`:
+
+```bash
+cd .github/scripts/generate-positive-expected-results/
+python generate_positive_expected_result.py --run-all
+```
+
+| Flag | Required | Description |
+|---|---|---|
+| `--queryID` | Yes (unless `--run-all`) | The UUID of the query to scan, found in the query's `metadata.json` under the `id` field. |
+| `--queryPath` | Yes (unless `--run-all`) | The relative path (from the repository root) to the query directory containing `query.rego` and `metadata.json`. |
+| `--run-all` | No | Iterates over all queries under `assets/queries/`, reading each `metadata.json` to obtain the query ID automatically. Mutually exclusive with `--queryID`. |
+
+The script discovers positive test files in the query's `test/` directory, runs a KICS scan for each one, collects and merges the findings, sorts them by file name, line number, issue type, search key, and similarity ID, and writes the result to `test/positive_expected_result.json`.
+
 Filling query.rego:
 
 - `documentId` id of the sample where the vulnerability occurs