Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,237 @@
import argparse
import json
import os
import subprocess
import sys

# Key order for each finding entry written to positive_expected_result.json.
FIELD_ORDER = [
    "queryName", "severity", "line", "fileName",
    "resourceType", "resourceName", "searchKey", "searchValue",
    "expectedValue", "actualValue", "issueType", "similarityID", "search_line",
]

# Exit codes that this script treats as a completed (non-failed) KICS scan;
# anything else is reported as a scan failure.
KICS_RESULT_CODES = {0, 1, 20, 30, 40, 50, 60}

# Paths resolved relative to this script's own location: the script is assumed
# to live three directories below the repository root.
SCRIPT_DIR = os.path.dirname(os.path.abspath(__file__))
REPO_ROOT = os.path.normpath(os.path.join(SCRIPT_DIR, "../../.."))
QUERIES_DIR = os.path.join(REPO_ROOT, "assets", "queries")


def parse_args():
    """Parse command-line arguments: either --run-all, or --queryID/--queryPath."""
    parser = argparse.ArgumentParser(description="Run a KICS scan for a given query.")
    # --run-all and --queryID are mutually exclusive; one of them must be given.
    mode = parser.add_mutually_exclusive_group(required=True)
    mode.add_argument(
        "--run-all",
        action="store_true",
        help="Run scans for all queries under assets/queries.",
    )
    mode.add_argument("--queryID", help="The query ID to scan.")
    # Enforced in main() rather than by argparse, since it only applies
    # when --run-all is absent.
    parser.add_argument(
        "--queryPath",
        help="The base path of the query (required without --run-all).",
    )
    return parser.parse_args()


def build_command(query_id: str, scan_path: str, payload_path: str, output_path: str, output_name: str) -> list[str]:
    """Assemble the `go run ... scan` command line for a single KICS scan."""
    console_main = os.path.join(REPO_ROOT, "cmd", "console", "main.go")

    command = ["go", "run", console_main, "scan"]
    # Flag/value pairs, in the same order the scan expects them.
    for flag, value in (
        ("-p", scan_path),
        ("-o", output_path),
        ("--output-name", output_name),
        ("-i", query_id),
        ("-d", payload_path),
    ):
        command += [flag, value]
    # Bare switches appended last.
    command += [
        "-v",
        "--experimental-queries",
        "--bom",
        "--enable-openapi-refs",
        "--kics_compute_new_simid",
    ]
    return command


def run_scan(query_id: str, scan_path: str, payload_path: str, output_path: str, output_name: str) -> int:
    """Run one KICS scan via `go run` and return its exit code.

    Prints the command being executed, runs it from REPO_ROOT, and reports
    unexpected exit codes (anything outside KICS_RESULT_CODES) on stderr.

    Returns:
        The subprocess's return code, or 1 if the `go` binary is not found.
    """
    command = build_command(query_id, scan_path, payload_path, output_path, output_name)

    print("Running command:")
    print(" ".join(command))
    print("-" * 60)

    try:
        result = subprocess.run(command, cwd=REPO_ROOT)
    except FileNotFoundError:
        print("\n[ERROR] 'go' not found. Make sure Go is installed and in your PATH.", file=sys.stderr)
        return 1

    if result.returncode not in KICS_RESULT_CODES:
        print(f"\n[ERROR] Scan failed with return code {result.returncode}.", file=sys.stderr)
    # BUG FIX: the original only returned on the failure path and implicitly
    # returned None on success; callers test `rc not in KICS_RESULT_CODES`,
    # and None is never in that set, so every successful scan was recorded
    # as a failure. Always return the actual return code.
    return result.returncode


def find_positive_tests(query_path: str) -> list[tuple[str, str]]:
    """
    Return a sorted list of (label, scan_path) for each positive test in test/.

    Handles two layouts:
    - File: test/positiveX.<ext> → label='positiveX', scan_path=the file
    - Directory: test/positiveX/ → for each positiveX_Y.<ext> inside,
      label='positiveX_Y', scan_path=the file
    """
    test_dir = os.path.join(query_path, "test")
    if not os.path.isdir(test_dir):
        return []

    prefix_len = len("positive")
    found = []
    for name in os.listdir(test_dir):
        if not name.startswith("positive"):
            continue
        path = os.path.join(test_dir, name)

        if not os.path.isdir(path):
            # Plain file layout: positive.<ext> or positiveN.<ext>.
            # A non-numeric stem (e.g. positive_expected_result.json) is skipped.
            stem = name[prefix_len:].split(".")[0]
            if not stem or stem.isdigit():
                found.append((f"positive{stem}", path))
            continue

        # Directory layout: positiveX/ — collect each regular file inside.
        for inner in os.listdir(path):
            inner_path = os.path.join(path, inner)
            if not os.path.isfile(inner_path):
                continue
            label = os.path.splitext(inner)[0]  # e.g. 'positive2_1'
            tail = label[prefix_len:]
            # Keep only labels whose suffix starts with a digit
            # (skips positive_expected_result and similar).
            if tail and tail[0].isdigit():
                found.append((label, inner_path))

    return sorted(found, key=lambda item: item[0])


def run_query_scans(query_id: str, query_path: str) -> tuple[list[tuple[str, str, int]], bool]:
    """Scan every positive test of one query and regenerate its expected results.

    Returns:
        (failed, written): `failed` lists (scan_path, payload_path, rc) for
        scans whose exit code is not in KICS_RESULT_CODES; `written` is True
        when positive_expected_result.json was (re)generated.
    """
    tests = find_positive_tests(query_path)
    if not tests:
        print(f"[WARN] No positive tests found in {query_path}/test, skipping.", file=sys.stderr)
        return [], False

    payloads_dir = os.path.join(query_path, "payloads")
    results_dir = os.path.join(query_path, "results") + os.sep
    os.makedirs(payloads_dir, exist_ok=True)
    os.makedirs(results_dir, exist_ok=True)

    failures = []
    for label, scan_path in tests:
        print(f"\n -> {label}: {os.path.relpath(scan_path, REPO_ROOT)}")
        payload = os.path.join(payloads_dir, f"{label}.json")
        rc = run_scan(query_id, scan_path, payload, results_dir, f"{label}.json")
        if rc not in KICS_RESULT_CODES:
            failures.append((scan_path, payload, rc))

    return failures, collect_and_write_expected_results(query_path)


def collect_and_write_expected_results(query_path: str) -> bool:
    """
    Read all positive*.json result files from results/, extract findings,
    sort by (fileName, line, issueType, searchKey, similarityID), and write
    test/positive_expected_result.json. Returns True if the file was written.
    """
    results_dir = os.path.join(query_path, "results")
    if not os.path.isdir(results_dir):
        return False

    findings = []
    for name in sorted(os.listdir(results_dir)):
        if not (name.startswith("positive") and name.endswith(".json")):
            continue
        with open(os.path.join(results_dir, name), encoding="utf-8") as fh:
            report = json.load(fh)

        # Merge regular query findings with bill-of-materials findings.
        for query in report.get("queries", []) + report.get("bill_of_materials", []):
            for hit in query.get("files", []):
                # Keys emitted in FIELD_ORDER; fileName is reduced to its
                # basename so results are path-independent.
                findings.append({
                    "queryName": query.get("query_name", ""),
                    "severity": query.get("severity", ""),
                    "line": hit.get("line", 0),
                    "fileName": os.path.basename(hit.get("file_name", "")),
                    "resourceType": hit.get("resource_type", ""),
                    "resourceName": hit.get("resource_name", ""),
                    "searchKey": hit.get("search_key", ""),
                    "searchValue": hit.get("search_value", ""),
                    "expectedValue": hit.get("expected_value", ""),
                    "actualValue": hit.get("actual_value", ""),
                    "issueType": hit.get("issue_type", ""),
                    "similarityID": hit.get("similarity_id", ""),
                    "search_line": hit.get("search_line", 0),
                })

    if not findings:
        return False

    def sort_key(finding):
        return (
            finding["fileName"], finding["line"], finding["issueType"],
            finding["searchKey"], finding["similarityID"],
        )

    findings.sort(key=sort_key)

    out_path = os.path.join(query_path, "test", "positive_expected_result.json")
    with open(out_path, "w", encoding="utf-8") as fh:
        json.dump(findings, fh, indent=2)
        fh.write("\n")

    print(f" -> Written {len(findings)} entries to {os.path.relpath(out_path, REPO_ROOT)}")
    return True


def iter_queries():
    """Yield (query_id, query_path) for every query found under assets/queries."""
    for dirpath, _dirs, filenames in os.walk(QUERIES_DIR):
        # A query directory is recognized by the presence of metadata.json.
        if "metadata.json" not in filenames:
            continue
        metadata_path = os.path.join(dirpath, "metadata.json")
        with open(metadata_path, encoding="utf-8") as fh:
            query_id = json.load(fh).get("id")
        if query_id:
            yield query_id, dirpath
        else:
            print(f"[WARN] No 'id' field in {metadata_path}, skipping.", file=sys.stderr)


def main():
    """CLI entry point: scan one query or every query, then exit 0/1."""
    args = parse_args()

    if not args.run_all:
        # Single-query mode needs an explicit path to the query directory.
        if not args.queryPath:
            print("[ERROR] --queryPath is required when not using --run-all.", file=sys.stderr)
            sys.exit(1)
        query_path = os.path.normpath(os.path.join(REPO_ROOT, args.queryPath))
        failed, _ = run_query_scans(args.queryID, query_path)
        sys.exit(1 if failed else 0)

    # --run-all: iterate every query discovered under assets/queries.
    queries = list(iter_queries())
    total = len(queries)
    width = len(str(total))  # pad indices so the progress counter aligns
    print(f"Found {total} queries. Starting scans...\n")

    all_failed = []
    written_count = 0
    for idx, (query_id, query_path) in enumerate(queries, start=1):
        print(f"\n[{idx:{width}d}/{total}] {os.path.relpath(query_path, REPO_ROOT)}")
        failed, written = run_query_scans(query_id, query_path)
        all_failed.extend(failed)
        if written:
            written_count += 1

    print("\n" + "=" * 60)
    print(f"[SUMMARY] {written_count}/{total} positive_expected_result.json written")
    if not all_failed:
        print(" All scans completed successfully.")
        sys.exit(0)
    print(f" {len(all_failed)} scan(s) failed:")
    for scan_path, _payload, rc in all_failed:
        print(f" - {os.path.relpath(scan_path, REPO_ROOT)} → exit {rc}")
    sys.exit(1)


if __name__ == "__main__":
main()
94 changes: 94 additions & 0 deletions docs/creating-queries.md
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,100 @@ If the **query.rego** file implements more than one query, the **metadata.json**
}
```

Filling `positive_expected_result.json`:

The `positive_expected_result.json` file is a JSON array where each entry represents a single expected finding from a positive test file. Each entry supports the following fields:

- `queryName` the name of the query as defined in `metadata.json`
- `severity` the severity level of the finding (`CRITICAL`, `HIGH`, `MEDIUM`, `LOW`, or `INFO`)
- `line` the line number in the positive test file where the vulnerability is detected
- `fileName` the name of the positive test file (e.g., `positive1.tf`, `positive.yaml`)
- `resourceType` the type of the resource flagged by the finding (e.g., `aws_cloudtrail`, `community.aws.elb_application_lb`)
- `resourceName` the name or label of the specific resource instance
- `searchKey` the search key path used by KICS to locate the vulnerability in the original document
- `searchValue` an additional value used to distinguish findings when multiple results point to the same line
- `expectedValue` a description of the expected (secure) value
- `actualValue` a description of the actual (insecure) value detected
- `issueType` the type of issue: `IncorrectValue`, `MissingAttribute`, or `RedundantAttribute`
- `similarityID` a hash that uniquely identifies the finding, used for deduplication and tracking
- `search_line` the search line path used by KICS for line detection; set to `-1` when not applicable

Example:

```json
[
{
"queryName": "Authentication Without MFA",
"severity": "LOW",
"line": 2,
"fileName": "positive.yaml",
"resourceType": "community.aws.sts_assume_role",
"resourceName": "Assume an existing role",
"searchKey": "name={{Assume an existing role}}.{{community.aws.sts_assume_role}}",
"searchValue": "mfa_token",
"expectedValue": "sts_assume_role.mfa_token should be set",
"actualValue": "sts_assume_role.mfa_token is undefined",
"issueType": "MissingAttribute",
"similarityID": "0863129177e5f7d0f0fc55d63426f810f58f35c1270b64f4b57fbd1d8a3639cc",
"search_line": 2
},
{
"queryName": "Authentication Without MFA",
"severity": "LOW",
"line": 9,
"fileName": "positive.yaml",
"resourceType": "sts_assume_role",
"resourceName": "Hello",
"searchKey": "name={{Hello}}.{{sts_assume_role}}",
"searchValue": "mfa_serial_number",
"expectedValue": "sts_assume_role.mfa_serial_number should be set",
"actualValue": "sts_assume_role.mfa_serial_number is undefined",
"issueType": "MissingAttribute",
"similarityID": "89628f77eee62d856d5523656cdcbc1be1bfca9a1aaed79ffa9871979c947202",
"search_line": 9
}
]
```

Instead of filling this file manually, you can use the helper script provided at `.github/scripts/generate-positive-expected-results/generate_positive_expected_result.py`. The script runs a KICS scan against each positive test file, collects the findings, and produces a correctly formatted `positive_expected_result.json`.

**Important:** The script must be run from the **script's own directory** (`.github/scripts/generate-positive-expected-results/`), since it resolves the repository root and all other paths relative to its own location. It also requires **Go** to be installed and available in your `PATH`.

The script supports two modes of operation:

**Single query mode** — requires both `--queryID` and `--queryPath`:

```bash
cd .github/scripts/generate-positive-expected-results/
python generate_positive_expected_result.py \
--queryID <query-uuid> \
--queryPath <relative-path-to-query>
```

For example:

```bash
cd .github/scripts/generate-positive-expected-results/
python generate_positive_expected_result.py \
--queryID "8173d5eb-96b5-4aa6-a71b-ecfa153c123d" \
--queryPath "assets/queries/terraform/aws/cloudtrail_multi_region_disabled"
```

**All queries mode** — scans every query under `assets/queries/`:

```bash
cd .github/scripts/generate-positive-expected-results/
python generate_positive_expected_result.py --run-all
```

| Flag | Required | Description |
|---|---|---|
| `--queryID` | Yes (unless `--run-all`) | The UUID of the query to scan, found in the query's `metadata.json` under the `id` field. |
| `--queryPath` | Yes (unless `--run-all`) | The relative path (from the repository root) to the query directory containing `query.rego` and `metadata.json`. |
| `--run-all` | No | Iterates over all queries under `assets/queries/`, reading each `metadata.json` to obtain the query ID automatically. Mutually exclusive with `--queryID`. |

The script discovers positive test files in the query's `test/` directory, runs a KICS scan for each one, collects and merges the findings, sorts them by file name, line number, issue type, search key, and similarity ID, and writes the result to `test/positive_expected_result.json`.

Filling `query.rego`:

- `documentId` id of the sample where the vulnerability occurs
Expand Down
Loading