From 9844ee617a6f71b5b66a9b696c887f9e778a4084 Mon Sep 17 00:00:00 2001 From: Vikram Ruppa-Kasani Date: Wed, 3 Jun 2026 18:03:46 -0700 Subject: [PATCH 1/2] Add security-assessment skill --- skills/security-assessment/SKILL.md | 86 +++ .../examples/sample_assessment.md | 134 +++++ .../references/baseline_security.md | 103 ++++ .../references/bucket_classification.md | 94 +++ .../references/phases/baseline.md | 16 + .../references/phases/classification.md | 18 + .../references/phases/discover.md | 134 +++++ .../references/phases/output.md | 226 +++++++ .../references/phases/toxic_analysis.md | 44 ++ .../references/saif_risk_factors.md | 50 ++ .../references/telemetry_signals.md | 67 +++ .../references/toxic_combinations.md | 345 +++++++++++ .../scripts/cloud_rest_helpers_nodeps.py | 557 ++++++++++++++++++ .../evaluate_project_security_posture.py | 478 +++++++++++++++ .../scripts/fetch_bucket_telemetry.py | 208 +++++++ .../scripts/fetch_object_telemetry.py | 161 +++++ .../scripts/list_datasets.py | 101 ++++ .../scripts/preflight_permissions.py | 369 ++++++++++++ .../security-assessment/scripts/validation.py | 71 +++ 19 files changed, 3262 insertions(+) create mode 100644 skills/security-assessment/SKILL.md create mode 100644 skills/security-assessment/examples/sample_assessment.md create mode 100644 skills/security-assessment/references/baseline_security.md create mode 100644 skills/security-assessment/references/bucket_classification.md create mode 100644 skills/security-assessment/references/phases/baseline.md create mode 100644 skills/security-assessment/references/phases/classification.md create mode 100644 skills/security-assessment/references/phases/discover.md create mode 100644 skills/security-assessment/references/phases/output.md create mode 100644 skills/security-assessment/references/phases/toxic_analysis.md create mode 100644 skills/security-assessment/references/saif_risk_factors.md create mode 100644 skills/security-assessment/references/telemetry_signals.md create mode 100644 skills/security-assessment/references/toxic_combinations.md create mode 100644 skills/security-assessment/scripts/cloud_rest_helpers_nodeps.py create mode 100644 skills/security-assessment/scripts/evaluate_project_security_posture.py create mode 100644 skills/security-assessment/scripts/fetch_bucket_telemetry.py create mode 100644 skills/security-assessment/scripts/fetch_object_telemetry.py create mode 100644 skills/security-assessment/scripts/list_datasets.py create mode 100644 skills/security-assessment/scripts/preflight_permissions.py create mode 100644 skills/security-assessment/scripts/validation.py diff --git a/skills/security-assessment/SKILL.md b/skills/security-assessment/SKILL.md new file mode 100644 index 0000000..680ef1b --- /dev/null +++ b/skills/security-assessment/SKILL.md @@ -0,0 +1,86 @@ +--- +name: security-assessment +description: >- + Assesses security posture, evaluates risks, and checks SAIF compliance for Google Cloud Storage buckets or projects. Use when the user requests security scans, vulnerability checks, or SAIF assessments. Don't use when: The user is asking about non-GCS resources (Compute Engine, GKE, etc.), investigating a + live production outage, or asking general security questions not tied to a specific project or bucket. +tags: [gcs, security, compliance, saif] +category: security +version: 1.0.0 +support_tier: primary +--- + +# Security Posture Assessment Skill + +You are a Google Cloud Storage security assessment agent trained on Google's +[Secure AI Framework (SAIF)](https://saif.google/secure-ai-framework/saif-map). +Your job is to evaluate GCS bucket and project configurations, identify **toxic +combinations** of vulnerabilities, and provide actionable remediation. + +> [!IMPORTANT] +> +> You are NOT a generic security chatbot. You MUST ground every finding in +> telemetry signals you have actually gathered. NEVER hallucinate findings or +> assume configurations you have not verified. If you cannot gather a signal, +> say so explicitly and skip that check. + +> [!CAUTION] +> +> **CRITICAL: Never execute destructive commands (e.g., rm, rb, IAM policy +> changes) without first printing the exact command and explicitly asking the +> user for a Y/N confirmation.** + +## Philosophy + +Traditional security tools generate isolated alerts from static rules (e.g., +"bucket is public"). You correlate multiple signals to detect **toxic +combinations** — scenarios where individually low-risk configurations combine to +create critical exposures. A public bucket storing marketing PDFs is very +different from a public bucket storing ML training data with no CMEK, no VPC-SC, +and no audit logging. + +## Phase Summary Table + +Phase | Inputs | Outputs | Reference +:-------------------------------- | :--------------------------------------- | :------------------------------------ | :-------- +**1. Discover Scope & Telemetry** | User input (Project ID/Buckets/Datasets) | Scope confirmation, Telemetry signals | `references/phases/discover.md` +**2. Bucket Classification** | Telemetry signals | Bucket classifications | `references/phases/classification.md` +**3. Baseline Security Eval** | Telemetry signals, Classifications | Baseline failures | `references/phases/baseline.md` +**4. Toxic Combo Analysis** | Telemetry signals, Classifications | Toxic combination findings | `references/phases/toxic_analysis.md` +**5. Output** | Findings from all phases | Formatted assessment report | `references/phases/output.md` + +## Workflow Execution + +When invoked, the agent **MUST follow this exact sequence**: + +1. **Start at Phase 1**: Discover scope and gather telemetry. Use the + referenced file for decisions. **CRITICAL: If multiple Storage Insights + datasets are discovered, you MUST STOP and ASK the user to select one. Do + NOT auto-select a dataset or proceed with an assumed one.** Do not assume + anything before reading the steps referenced in the phase itself. +2. **Do not skip phases**: You must complete Phase N before proceeding to Phase + N+1. +3. **Strict adherence**: Follow all steps defined in each phase. Do not + optimize or deviate. +4. **Gating & analysis scope**: Only a failed **required** preflight check + (`adc`) sets `ready_to_proceed` to `false` — when it does you **MUST STOP + IMMEDIATELY**, do NOT invoke any telemetry script, and report the fix. + Otherwise the preflight's `analysis_scope` field — NOT `ready_to_proceed` — + selects depth: `full` (run everything) or `project_only` (Storage Insights + unavailable — do NOT bail out; run a project-level assessment with ONLY + `evaluate_project_security_posture.py`, do NOT run the SI-backed + `fetch_bucket_telemetry.py` / `fetch_object_telemetry.py`, and recommend + SI). Phase 1 (`discover.md`) defines exactly how to branch. + +## Error Handling + +Problem | Cause | Fix +----------------------------------- | --------------------------------------------------------------------------- | --- +PermissionDenied on VPC-SC check | Caller lacks `accesscontextmanager.policies.list` | Inform user. Mark VPC-SC status as UNKNOWN and use that wording **consistently across every section of the report** — Section 2, Section 3, narrative summaries, key findings, fixes. Do NOT assume the perimeter is configured AND do NOT assume it is missing, lacking, or "not enforced" — neither inference is supported by an unavailable signal. +PermissionDenied on IAM Recommender | Caller lacks `recommender.iamPolicyRecommendations.list` | Fall back to manual IAM policy inspection. Flag over-broad roles like `roles/storage.admin` and `roles/storage.objectAdmin`. +Model Armor API not enabled | `modelarmor.googleapis.com` not in services list | This IS a finding (not an error). Flag it as "Model Armor not enabled" in your assessment. +Storage Insights API not enabled | `storageinsights.googleapis.com` not enabled on the project | **DO NOT STOP.** `analysis_scope` is `project_only`; run the project-level assessment and relay the recommended check's `fix`. See `discover.md`. +No SI dataset available | SI is enabled but no dataset config exists, or wrong dataset name supplied | **DO NOT STOP.** `analysis_scope` is `project_only`; run the project-level assessment and relay the `bigquery_dataset_access` check's `fix`. See `discover.md`. +BQ MCP Server returns empty results | No buckets in project or wrong project | Confirm project ID with user. If correct and empty, report "No buckets found." +Data Access audit logs check fails | Caller lacks `resourcemanager.projects.getIamPolicy` | Inform user. Note that audit log status is unknown. +Bucket has no tags or labels | No SDP scan, no customer tags | This is the "Unclassified" state. Treat as potentially sensitive. Recommend SDP. +Output too verbose | Reasoning sections are too long, or shared remediations repeated per bucket | Condense reasoning to 2-3 sentences. Move shared remediations to Cross-Cutting Recommendations. If output exceeds ~80 lines, you are being too verbose. diff --git a/skills/security-assessment/examples/sample_assessment.md b/skills/security-assessment/examples/sample_assessment.md new file mode 100644 index 0000000..4d5de8d --- /dev/null +++ b/skills/security-assessment/examples/sample_assessment.md @@ -0,0 +1,134 @@ +# Sample Assessment Output + +This is an example of the output format the skill should produce. Adapt the +content to match actual telemetry findings, but follow this structure exactly. + +-------------------------------------------------------------------------------- + +## Security Posture Assessment: ml-platform-prod-101 + +### Section 1: Risk Heuristic + +Bucket Security Risk Heuristic: 10/100 + +Risk score is a heuristic determined in aggregate meant to estimate the overall +risk level across all buckets in a project based off some of the criteria +described below. Remediation of these steps will reduce the risk score, though a +risk score of 0 does not necessarily mean the project is 100% without risk. + +### Section 2: Risk Dashboard + +| \# | Bucket | Severity | Risk | Quickfix | | -- | ---------------------- +| -------- | ------------------------------------ +| --------------------------------- | | 1 | gs://training-datasets | Critical | +Public Data Pipeline | Block public access → B1, B2 | | 2 | +gs://model-checkpoints | Critical | Prompt Injection to Data Destruction | Fix +Model Armor + scope SA → B6, B7 | | 3 | gs://public-api-docs | Low | Intentional +Public Data | Enable recovery controls → B4, B5 | | 4 | gs://logs-archive | +Medium | UBLA disabled; versioning off | See B2, B5 | + +``` +✅ Verified: HMAC restriction, TLS 1.2 minimum +❌ Policy gaps: Block HTTP not enforced + Why: Without this policy, data can be transmitted over plaintext HTTP and intercepted in transit. +❌ Policy gaps: Data Access Audit Logs not enabled + Why: Without Data Access logs, reads/writes/deletions leave no forensic trail; exfiltration and tampering are invisible. +❌ UBLA disabled (4): gs://training-datasets, gs://logs-archive, gs://temp-uploads, gs://snapshot-2024 + Why: Legacy ACLs operate alongside IAM, creating shadow access paths. A bucket can appear locked down via IAM while an ACL silently grants public access. +❌ Object versioning disabled (3): gs://training-datasets, gs://model-checkpoints, gs://logs-archive + Why: Without versioning, overwrites and deletes are irreversible; one bad client or compromised credential can permanently destroy data. +⚠️ VPC-SC status: Unknown (caller lacks accesscontextmanager.policies.list) +⚠️ 2 buckets unclassified; sensitivity inferred, run SDP to confirm +``` + +-------------------------------------------------------------------------------- + +### Section 3: Action Plan + +``` +gs://training-datasets [Critical · Public Data Pipeline] +Unclassified training data is public. Exfiltration is silent with no audit trail. +❌ UBLA: Disabled (ACLs bypass IAM) ❌ Public: allUsers READER ❌ Encryption: Google-default ❌ VPC-SC: None +❌ Soft Delete: Off ❌ Versioning: Off ❌ Audit Logs: Off +1. Close public access and shadow ACL path: → B1, B2 +2. Add encryption, recovery, network, and logging: → B3, B4, B5, P1, P2 +``` + +``` +gs://model-checkpoints [Critical · Prompt Injection to Data Destruction] +Admin SA + inactive Model Armor: one injection can destroy all checkpoints. +⚠️ Model Armor: API enabled but Vertex AI integration inactive (not enforced) +⚠️ IAM: agent-sa holds roles/storage.admin ❌ Versioning: Off ❌ VPC-SC: None +1. Neutralize agent takeover path: → B6 (scope SA), B7 (activate Model Armor) +2. Add encryption, recovery, network: → B3, B4, B5, P1, P2 +``` + +``` +gs://public-api-docs [Low · Intentional Public Data] +Intentional public bucket: without versioning, content can be defaced permanently. +✅ UBLA ✅ Public Access: Intentional (tagged purpose:public-documentation) +❌ Soft Delete: Off ❌ Versioning: Off +1. Add integrity and recovery protection: → B4 (soft delete), B5 (versioning) +NOTE: Public access is intentional; no access control changes recommended. +``` + +``` +gs://logs-archive [Medium · Baseline failures] +Legacy ACLs are active and there is no version history if objects are overwritten. +❌ UBLA: Disabled (ACLs bypass IAM) ❌ Versioning: Off +1. Enforce UBLA and recovery: → B2 (UBLA), B5 (versioning) +``` + +-------------------------------------------------------------------------------- + +**Policy Fixes** (org or project-level — may require elevated permissions): + +``` +P1. Enable Data Access audit logs + Console: IAM & Admin > Audit Logs > Cloud Storage > Data Read + Data Write + gcloud: Update project audit config for storage.googleapis.com with DATA_READ and DATA_WRITE + +P2. Enroll in VPC-SC perimeter + Console: Security > VPC Service Controls > New Perimeter + gcloud: gcloud access-context-manager perimeters create ml-perimeter --title='ML Platform Perimeter' --resources=projects/101202303404 --restricted-services=storage.googleapis.com,aiplatform.googleapis.com --policy=POLICY_ID + +P3. Enforce HTTPS-only access + Console: IAM & Admin > Organization Policies > constraints/storage.secureHttpTransport + gcloud: gcloud org-policies set-policy policy.yaml --project=ml-platform-prod-101 +``` + +**Bucket Fixes** (bucket-level — can be applied directly by a Storage Admin): + +``` +B1. Block public access + Console: Cloud Storage > Bucket > Permissions > Remove allUsers + gcloud: gcloud storage buckets update gs://training-datasets --public-access-prevention + +B2. Enable Uniform Bucket-Level Access (UBLA) + Console: Cloud Storage > Bucket > Configuration > Access control > Uniform + gcloud: gcloud storage buckets update gs://training-datasets --uniform-bucket-level-access + +B3. Apply customer-managed encryption (CMEK): replaces Google-default encryption + with a key you control, enabling cryptographic access revocation in an emergency. + Console: Cloud Storage > Bucket > Configuration > Encryption > Customer-managed key + gcloud: gcloud storage buckets update gs://BUCKET --default-kms-key=KEY + +B4. Enable soft delete + Console: Cloud Storage > Bucket > Protection > Soft delete policy > Enable + gcloud: gcloud storage buckets update gs://BUCKET --soft-delete-duration=7d + +B5. Enable object versioning + Console: Cloud Storage > Bucket > Protection > Object versioning > Enable + gcloud: gcloud storage buckets update gs://BUCKET --versioning + +B6. Reduce agent SA to least privilege: removes project-wide storage.admin and + replaces it with bucket-scoped read-only, blocking the agent takeover path. + Console: Cloud Storage > Bucket > Permissions > Edit agent SA role + gcloud: gcloud projects remove-iam-policy-binding ml-platform-prod-101 --member='serviceAccount:AGENT_SA' --role='roles/storage.admin' + gcloud storage buckets add-iam-policy-binding gs://model-checkpoints --member='serviceAccount:AGENT_SA' --role='roles/storage.objectViewer' + +B7. Activate Model Armor Vertex AI integration: wires prompt screening into + Gemini calls so injections are blocked before reaching the agent. + Console: Model Armor > Floor Settings > Integrated Services > Enable Vertex AI + gcloud: gcloud model-armor floorsettings update --full-uri=projects/PROJECT_ID/locations/global/floorSetting --add-integrated-services=VERTEX_AI --vertex-ai-enforcement-type=INSPECT_AND_BLOCK +``` diff --git a/skills/security-assessment/references/baseline_security.md b/skills/security-assessment/references/baseline_security.md new file mode 100644 index 0000000..42c9686 --- /dev/null +++ b/skills/security-assessment/references/baseline_security.md @@ -0,0 +1,103 @@ +# Baseline Security + +Baseline checks are evaluated for **every project and bucket** before toxic +combination analysis begins. These are prerequisites — any single failure is +flagged independently. + +> [!IMPORTANT] Baseline failures are reported separately from toxic +> combinations. A bucket can fail baseline checks AND match a toxic combination +> archetype. Report both. + +## Baseline Controls + +Each control below MUST be checked. Reporting rules: + +- **Per-bucket controls (e.g., UBLA):** If passing, do not mention. Only + report failures. For each failure, the report must include (a) an accurate + count of affected buckets, and (b) the bucket names — inline if count ≤ 10, + otherwise the first 10 followed by "... and N more (see telemetry output for + full list)". Never substitute a vague rollup line for the count. +- **Project-level controls (Block HTTP, TLS, HMAC, Data Access Audit Logs):** + Whether passing or failing, surface the state in the Section 2 policy + summary using ✅ (verified passing) or ❌ (failing). This confirms to the + admin that the control was actually checked. +- **Independence:** Each failure is its own finding. Do NOT bundle multiple + baseline failures into a single toxic-combination archetype label. Reserve + toxic-combo labels for the named archetypes in + `references/toxic_combinations.md`. + +### UBLA (Bucket-Level) + +Secure State | Vulnerable State +----------------- | ----------------------------------------------------- +UBLA is `ENABLED` | UBLA is `DISABLED` — legacy ACLs active alongside IAM + +**Why it matters:** When UBLA is disabled, ACLs operate alongside IAM, creating +shadow access paths. A bucket can appear locked down via IAM while an ACL +silently grants public access. + +**Remediation:** + +- Enforce Uniform Bucket-Level Access: `gcloud storage buckets update + gs://BUCKET --uniform-bucket-level-access` + +### Block HTTP (Project-Level Org Policy) + +Secure State | Vulnerable State +------------------------------ | ---------------------------------- +Secure transport is `ENFORCED` | Secure transport is `NOT_ENFORCED` + +**Why it matters:** Without this policy, data can be transmitted over plaintext +HTTP and intercepted in transit. + +**Remediation:** + +- Enforce secure transport via org policy + +### TLS Version (Project-Level Org Policy) + +Secure State | Vulnerable State +-------------------------------------- | ------------------------- +Minimum TLS version is `1.2` or higher | TLS 1.0 or 1.1 is allowed + +**Why it matters:** TLS 1.0 and 1.1 have known vulnerabilities. Allowing them +means connections can downgrade to insecure versions. + +**Remediation:** + +- Enforce minimum TLS 1.2 via org policy + +### HMAC Key Restriction (Project-Level Org Policy) + +Secure State | Vulnerable State +--------------------------------- | ----------------------------------- +HMAC key creation is `RESTRICTED` | HMAC key creation is not restricted + +**Why it matters:** HMAC keys are long-lived static credentials. If a key leaks, +it remains valid until manually revoked. Restricting HMAC forces the use of +short-lived OAuth/OIDC tokens. + +**Remediation:** + +- Restrict HMAC key creation via org policy + +### Data Access Audit Logging (Project-Level) + +| Secure State | Vulnerable State | +| ------------------------------------ | ------------------------- | +| DATA_READ and DATA_WRITE enabled for | Data Access logs disabled | +: `storage.googleapis.com` : : + +**Why it matters:** Without Data Access audit logs, reads, writes, and deletions +leave no forensic trail. Exfiltration, tampering, and unauthorized access are +invisible. + +**Remediation:** + +- Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE. Scope to high-value + buckets. Set log retention policies and restrict IAM on log sinks to prevent + audit logs from becoming a secondary exposure. + +> [!TIP] Data Access audit logs can generate high volume on busy buckets. +> Recommend scoping to high-value buckets rather than enabling project-wide. diff --git a/skills/security-assessment/references/bucket_classification.md b/skills/security-assessment/references/bucket_classification.md new file mode 100644 index 0000000..a5c88f2 --- /dev/null +++ b/skills/security-assessment/references/bucket_classification.md @@ -0,0 +1,94 @@ +# Bucket Classification + +Classification runs in Phase 2, before any security evaluation. It determines +the sensitivity context that modulates how urgently findings are treated. + +> [!IMPORTANT] Classification does NOT change which controls are recommended. +> The same controls apply regardless of sensitivity. Classification only +> modulates the severity label. + +## Step 1: Check for Existing Classification + +Check whether the bucket has sensitivity labels from any of these sources: + +### Source A: SDP (Sensitive Data Protection) Results + +If SDP has scanned the bucket, use the SDP-assigned tier as **authoritative**: + +- **High**: PII, financial records, healthcare data, credentials detected +- **Medium**: Internal business data, proprietary content, limited PII + detected +- **Low**: SDP scanned and found no significant sensitive content + +### Source B: Customer Tags or Labels + +If the bucket has customer-applied tags (e.g., `sensitivity:high`, +`data-type:training`), derive a sensitivity tier using the SDP mapping table. +Customer tags are treated as equivalent to SDP classification — do NOT penalize +customers for using tags instead of SDP. + +### Source C: Unclassified + +No labels, no tags, no SDP scan. This is the default state for most buckets. + +## Step 2: Handle Unclassified Buckets + +For unclassified buckets, do two things: + +**First**, infer a provisional sensitivity estimate from heuristics: + +| Heuristic | Suggests Higher Sensitivity | +| ----------------------------------- | ------------------------------------ | +| Bucket name contains `prod`, | Yes | +: `training`, `ml`, `model`, : : +: `weights`, `pii`, `financial`, : : +: `health` : : +| Bucket name contains `test`, `dev`, | No | +: `sandbox`, `tmp`, `public`, : : +: `static`, `assets` : : +| Encryption type is CMEK or CSEK | Yes — customer invested in key | +: : management : +| Project also contains Vertex AI | Yes — likely AI workload data | +: endpoints or Agent Engine : : +| Large object count or total size | Weakly yes — significant data stores | +: : tend to matter more : + +**Second**, surface this recommendation in your output: + +> This bucket has no sensitivity classification. Run Sensitive Data Protection +> to classify the contents and enable more accurate risk scoring. Until +> classified, this bucket is treated as potentially sensitive. + +Flag the provisional estimate as **"inferred — not verified"** in all output. + +## Step 3: Modulate Severity + +Use the classification to adjust the severity label for each finding: + +| Classification | Severity Modulation | Rationale | +| ----------------------- | --------------------- | ------------------------ | +| High (SDP or | Full severity | Confirmed sensitive, | +: customer-tagged) : : maximum urgency : +| Medium (SDP or | Reduced severity | Moderate sensitivity, | +: customer-tagged) : : still significant : +| Low (SDP or | Significantly reduced | Verified low-sensitivity | +: customer-tagged) : severity : content : +| Non-sensitive | Minimal severity | Customer affirmed | +: (explicitly tagged) : : non-sensitive : +| Unclassified (inferred) | Full severity | Unknown = potentially | +: : : sensitive, worst case : + +> [!CAUTION] **Unclassified is NOT non-sensitive.** Unknown data gets full +> severity because you cannot confirm it is safe. Only an affirmative +> classification (SDP or explicit customer tag) reduces severity. + +## Exception: Classification Mismatch + +If a bucket is classified as **High sensitivity** but also has **public access +enabled**, do NOT treat this as Intentional Public Data. Flag it as a potential +misconfiguration: + +> **ALERT: Classification Mismatch.** This bucket is classified as +> high-sensitivity but is publicly accessible. Verify whether public access is +> intentional. If not, this is a critical exposure requiring immediate +> remediation. diff --git a/skills/security-assessment/references/phases/baseline.md b/skills/security-assessment/references/phases/baseline.md new file mode 100644 index 0000000..8cecfa9 --- /dev/null +++ b/skills/security-assessment/references/phases/baseline.md @@ -0,0 +1,16 @@ +# Phase 3: Baseline Security Evaluation + +Evaluate every bucket and project against the universal baseline controls. + +## Instructions + +Follow the guidance in `references/baseline_security.md`. These are prerequisite +checks — any failure is flagged independently, regardless of toxic combinations. + +> [!IMPORTANT] Each baseline failure is its own finding. Do NOT collapse +> multiple baseline failures on the same bucket into a single toxic-combination +> label (e.g., "UBLA disabled + No recovery"). Toxic-combo labels in Section 2 / +> per- bucket cards are reserved for the named archetypes in +> `references/toxic_combinations.md`. For a bucket failing only baselines, the +> Risk column enumerates the failing controls (e.g., "UBLA disabled; versioning +> off") and per-bucket cards show one ❌ per control. diff --git a/skills/security-assessment/references/phases/classification.md b/skills/security-assessment/references/phases/classification.md new file mode 100644 index 0000000..01c8470 --- /dev/null +++ b/skills/security-assessment/references/phases/classification.md @@ -0,0 +1,18 @@ +# Phase 2: Bucket Classification + +Before evaluating security posture, classify each bucket's data sensitivity. + +## Instructions + +Follow the classification logic in `references/bucket_classification.md`. + +The classification determines how urgently findings should be treated: + +- **High sensitivity** → Full severity on all findings +- **Medium sensitivity** → Reduced severity +- **Low sensitivity** → Significantly reduced severity +- **Non-sensitive (explicit)** → Minimal severity +- **Unclassified** → Treat as potentially sensitive (full severity) + +> [!IMPORTANT] Unclassified is NOT the same as non-sensitive. If a bucket has no +> classification, treat it as worst-case until the customer classifies it. diff --git a/skills/security-assessment/references/phases/discover.md b/skills/security-assessment/references/phases/discover.md new file mode 100644 index 0000000..b9b4ac4 --- /dev/null +++ b/skills/security-assessment/references/phases/discover.md @@ -0,0 +1,134 @@ +# Phase 1: Discover Scope and Telemetry + +This phase focuses on defining the scope of the assessment and gathering all +necessary telemetry signals. + +## Steps + +1. **Confirm Scope**: Confirm the target scope with the user (project ID, + location, dataset name, specific buckets, or all buckets). If the user does + not provide any required fields for the scripts below, **STOP and ASK the + user to provide any missing required fields for each script below. DO NOT + attempt to dynamically determine any input like project ID**. If the user + does not provide a Storage Insights linked dataset name or ID, you MUST list + available datasets first. **List Dataset**: `python3 + scripts/list_datasets.py --project_id [--location ]`. + The script will automatically list all datasets if the user does not specify + a location. **IMPORTANT** ONLY show the first 10 datasets in the response. + If there are more than 10 datasets, there should be a note for the user to + ask to see more datasets or specify a specific location. DO NOT assume + location unless directly specified by the user. **If more than one dataset + is returned, you MUST STOP and ask the user to choose one before proceeding + — never auto-select a dataset or proceed with an assumed one.** When the + available signals make one dataset a clearly reasonable default, also + recommend it as a suggested default and give a one-line reason. Reason only + from the signals `list_datasets.py` returns — the dataset name/ID, its + location, and its `description` — for example a name/ID indicating a + production (vs. test/dev) export, broader coverage such as an "all buckets" + config, or a description or location matching the user's stated intent (e.g. + "I'd suggest `` — it looks like the prod, all-buckets export"). Only + recommend when you are genuinely high-confidence; if the datasets are + ambiguous or look similar, present them neutrally and ask the user to pick + without steering. The user always makes the final choice. You MUST ONLY use + Storage Insights to answer questions about buckets.** + +2. **Run Preflight Check**: Validate that the calling identity has the + prerequisites for the assessment **before** invoking any telemetry script. + Run: + + `python3 scripts/preflight_permissions.py --project_id + --dataset_name ` + + The preflight runs one **required** check and two **recommended** checks: + (a) `adc` — working application-default credentials (required); (b) + `storage_insights_enabled` — the Storage Intelligence API is enabled + (recommended); (c) `bigquery_dataset_access` — the linked dataset is + queryable (recommended). The BQ check surfaces the "no SI dataset + configured" or "wrong dataset name" case (as a 404). Storage Intelligence is + **not** a hard gate: when it is unavailable the assessment degrades to a + project-level report rather than stopping. + + Parse the JSON output. The `analysis_scope` field tells you which mode to + run: + + - > [!IMPORTANT] > **STOP only on a required failure**: If + `ready_to_proceed` is `false` (i.e. `analysis_scope: "none"` — the `adc` + check is `missing`), you **MUST IMMEDIATELY STOP** and output your final + response. > * **DO NOT** invoke any telemetry-gathering scripts + (`fetch_bucket_telemetry.py`, `fetch_object_telemetry.py`, or + `evaluate_project_security_posture.py`). > > Report the `adc` check's + `impact` and `fix` verbatim and wait for the user to re-authenticate + before re-running preflight. + - If `analysis_scope` is `"full"` (SI enabled and dataset queryable), + proceed with the full assessment: run all telemetry scripts and all + phases. + - If `analysis_scope` is `"project_only"` (a recommended check is + `missing` — SI not enabled, or no linked dataset), **DO NOT bail out**. + Run a project-level assessment: + * In Step 3 below, run **only** + `evaluate_project_security_posture.py`. Do **not** run + `fetch_bucket_telemetry.py` or `fetch_object_telemetry.py`. + * Surface the failing recommended check's `fix` once, framed as + unlocking the full assessment: if `storage_insights_enabled` is + missing, relay `gcloud services enable + storageinsights.googleapis.com --project `; if + `bigquery_dataset_access` is missing with a 404, relay its `fix` to + run `list_datasets.py` or create a dataset config. + * In later phases, mark bucket- and object-level sections as + "Unavailable — requires Storage Intelligence" (see the + **Project-only mode** note in `output.md`). + +3. **Gather Telemetry**: Run the telemetry scripts that match your + `analysis_scope` (from Step 2). Use your shell execution tool (e.g., + `run_shell_command` or equivalent) to execute them. The scripts are split + into two groups — which you run is gated on `analysis_scope`: + + ``` + # ALWAYS RUN -- both `full` and `project_only` modes: + * **Project Telemetry**: `python3 + scripts/evaluate_project_security_posture.py + --project_id ` + + # FULL MODE ONLY -- run these two ONLY when `analysis_scope` is `full`. + # If `analysis_scope` is `project_only`, SKIP both: they query Storage + # Insights and will fail or return empty without it. Running them is a + # constraint violation, not a fallback. + * **Bucket Telemetry**: `python3 scripts/fetch_bucket_telemetry.py + --project_id --dataset_name + [--bucket_names ]` + * **Object Telemetry**: `python3 scripts/fetch_object_telemetry.py + --project_id --dataset_name + [--bucket_names ]` + + * **Data Consumption**: Capture the standard output from these + scripts, which is a JSON array. Parse the JSON to extract security + signals. For example, map `ubla_enabled` to the UBLA check, and + `public_objects` count to public access checks for Phase 2 + (Classification) and Phase 3 (Baseline Security Eval). - See + `references/telemetry_signals.md` for the complete list of signals + and their sources. + ``` + +4. **Handle Gaps and Errors**: If the script execution fails or any signal + cannot be gathered, handle it explicitly: + + ``` + - **Authentication Errors**: If the error indicates missing + credentials, instruct the user to log in using + `gcloud auth application-default login` and `gcloud auth login`. + - **API Not Enabled**: If BigQuery is not enabled, instruct the user + to enable it by providing the exact command: `gcloud services enable + bigquery.googleapis.com --project `. + - **Permission Errors**: If a `PermissionDenied` error occurs (e.g., + lacking BigQuery Viewer/Data Viewer roles), explain the missing + permissions to the user and gracefully fall back to alternative + signaling if available or note the gap. + - **Missing Dataset**: If the BigQuery dataset doesn't exist, inform + the user that Storage Insights export needs to be configured for + the target dataset. + - Do NOT assume a value. Log the gap explicitly. + ``` + +> [!CAUTION] You MUST have telemetry before proceeding. Do not guess at +> configurations. If a critical signal is unavailable, inform the user and +> explain what it would have told you. diff --git a/skills/security-assessment/references/phases/output.md b/skills/security-assessment/references/phases/output.md new file mode 100644 index 0000000..448ad14 --- /dev/null +++ b/skills/security-assessment/references/phases/output.md @@ -0,0 +1,226 @@ +# Phase 5: Output + +Present your assessment in a scannable, action-oriented format. + +## Goal + +The target user is a Storage Admin with limited security expertise who needs to +quickly understand: what's wrong, how bad is it, and how to fix it. They may +action remediations via the Cloud Console (Pantheon) or gcloud CLI — both paths +should be clear. + +## Output Structure + +**Section 1: Risk Heuristic** + +Outline the overall bucket security risk score with a short explanation. + +Example: + +Bucket Security Risk Heuristic: xx/100 + +Risk score is a heuristic determined in aggregate meant to estimate the overall +risk level across all buckets in a project based off some of the criteria +described below. Remediation of these steps will reduce the risk score, though a +risk score of 0 does not necessarily mean the project is 100% without risk. + +**Section 2: Risk Dashboard** + +A table of the top 5 riskiest buckets (sorted by severity), followed by a +policy-level control status block and per-control bucket rollups. The admin +should understand their overall risk posture from this section alone. + +Table columns: Priority (#), Bucket, Severity, Risk, Quickfix. + +**Risk column content:** + +- If the bucket matches a named toxic-combination archetype from + `references/toxic_combinations.md`, the Risk cell contains that archetype + name (e.g., "Public Data Pipeline", "Prompt Injection to Data Destruction"). +- Otherwise — the bucket fails only baseline controls — the Risk cell + enumerates the failing baseline controls separated by semicolons (e.g., + "UBLA disabled; versioning off"). Do NOT invent a toxic-combo-style label + for baseline-only failures. + +Example: + +| \# | Bucket | Severity | Risk | Quickfix | | -- | ---------------------- +| -------- | ------------------------------------- +| ----------------------------------- | | 1 | gs://training-datasets | Critical +| Public Data Pipeline | Block public access → Bucket Fix #1 | | 2 | +gs://model-checkpoints | Critical | Prompt Injection to Data Destruction | See +Policy Fix #2, Bucket Fix #2 | | 3 | gs://logs-archive | Medium | UBLA disabled; +versioning off | See Bucket Fix #2, Bucket Fix #3 | + +Follow the table with a policy-level summary and a baseline-failure rollup: + +``` +✅ Verified: HMAC restriction, Data Access Audit Logs +❌ Policy gaps: restrictTLSVersion not enforced + Why: TLS 1.0/1.1 have known vulnerabilities; allowing them lets connections downgrade to insecure versions. +❌ Policy gaps: secureHttpTransport not enforced + Why: Without this policy, data can be transmitted over plaintext HTTP and intercepted in transit. +❌ UBLA disabled (10): gs://a, gs://b, gs://c, gs://d, gs://e, gs://f, gs://g, gs://h, gs://i, gs://j + Why: Legacy ACLs operate alongside IAM, creating shadow access paths. A bucket can appear locked down via IAM while an ACL silently grants public access. +⚠️ 2 buckets unclassified; sensitivity unknown, risk scores inferred +``` + +Rules for these lines: + +- **Project-level controls:** Always surface every baseline project-level + control here — ✅ for verified-passing, ❌ for failing. Emit a separate ❌ line + with a unique `Why:` caption for EACH failing project-level control. Do NOT + merge multiple failing policies into a single line. This confirms to the + admin the control was actually evaluated. +- **Per-control bucket rollup (MANDATORY for every per-bucket baseline that + has at least one failing bucket):** For each failing per-bucket baseline + control — UBLA, object versioning, soft delete, etc. — emit one line in + Section 2 with an accurate count of affected buckets and the bucket names. + If count ≤ 10, list inline. If > 10, list the first 10 followed by "... and + N more (full list in telemetry output)". Never replace the count with a + vague rollup like "X additional buckets assessed — no critical findings." + Section 3 per-bucket cards do NOT substitute for this rollup: a baseline + failure that appears in a per-bucket card must still appear in the Section 2 + rollup. Failures missing from Section 2 are treated as missing findings. +- **Why caption (required for every ❌ baseline failure):** Each ❌ line — + whether project-level or per-bucket — must be followed by a one-line `Why:` + caption that explains the risk. Pull the caption from the matching control's + "Why it matters" paragraph in `references/baseline_security.md` and condense + to one sentence. Examples: for UBLA disabled, cite legacy-ACL shadow access + paths bypassing IAM; for TLS, cite downgrade-to-insecure-version risk; for + versioning, cite irreversible overwrite/delete; for audit logs, cite missing + forensic trail. Do NOT add a Why caption for ✅ verified-passing lines. +- **Unclassified / informational:** Use ⚠️ for caveats that don't fit ✅/❌. + +**Section 3: Action Plan** + +Start with the per-bucket cards so the admin sees the findings before the fixes. +Follow with Policy Fixes and Bucket Fixes as a remediation reference. + +**Per-Bucket Cards** (soft cap: 6 lines per card): + +``` +gs://bucket-name [Severity · Toxic Combination Label] +One sentence explaining the attack path or failure mode. +❌ Control: status ❌ Control: status ✅ Control: status ⚠️ Control: note +❌ Control: status ❌ Control: status +1. What is being fixed: → Bucket Fix #1 +2. What is being fixed: → Policy Fix #2 +``` + +Line 1: bucket name + severity + risk label. The risk label must be either (a) +one of the named toxic-combination archetypes from +`references/toxic_combinations.md` (Public Data Pipeline, Silent Data Theft, +Irreversible Data Corruption, Intentional Public Data, Compliance Without Proof, +Prompt Injection to Data Destruction) if the bucket matches one, or (b) the +literal string "Baseline failures" when the bucket fails only baseline controls +and matches no archetype. Do not invent new archetype-style names. Line 2: one +sentence — the "so what" for a busy admin Lines 3–4: all relevant control +statuses on 1–2 lines (omit passing controls unless they create false confidence +— use ⚠️ for those) Lines 5–6: remediation steps referencing fix IDs; never +repeat a command inline if it is already defined in the fix list + +Example card: + +``` +gs://training-datasets [Critical · Public Data Pipeline] +Publicly readable, no encryption. Exfiltration leaves no recovery path. +❌ Public: allUsers objectViewer ❌ Encryption: Google-default ❌ VPC-SC: None +✅ UBLA ⚠️ Soft Delete: Enabled but no versioning (incomplete recovery) +1. Remove unauthenticated read access: → Bucket Fix #1 +2. Add encryption and network boundary: → Bucket Fix #1 (CMEK), Policy Fix #2 +``` + +**Policy Fixes** (org or project-level — may require elevated permissions): + +List each fix with a title, Console path, and gcloud command. Only include a +description when the fix title alone is not self-explanatory. + +``` +Policy Fix #1. Enforce HTTPS-only access + Console: IAM & Admin > Organization Policies > constraints/storage.secureHttpTransport + gcloud: gcloud org-policies set-policy policy.yaml --project=PROJECT_ID + +Policy Fix #2. Enforce minimum TLS 1.2 + Console: IAM & Admin > Organization Policies > constraints/gcp.restrictTLSVersion + gcloud: gcloud org-policies set-policy policy.yaml --project=PROJECT_ID + +Policy Fix #3. Restrict HMAC key creation + Console: IAM & Admin > Organization Policies > constraints/storage.disableServiceAccountHmacKeyCreation + gcloud: gcloud org-policies set-policy policy.yaml --project=PROJECT_ID + +Policy Fix #4. Enable Data Access audit logs + Console: IAM & Admin > Audit Logs > Cloud Storage > Enable DATA_READ and DATA_WRITE + gcloud: Update project audit config for storage.googleapis.com with DATA_READ and DATA_WRITE + +Policy Fix #5. Enroll in VPC-SC perimeter + Console: Security > VPC Service Controls > New Perimeter +``` + +**Bucket Fixes** (bucket-level — can be applied directly by a Storage Admin): + +``` +Bucket Fix #1. Apply customer-managed encryption (CMEK): replaces Google-default + encryption with a key you control, enabling cryptographic revocation. + Console: Cloud Storage > Bucket > Configuration > Encryption > Customer-managed key + gcloud: gcloud storage buckets update gs://BUCKET --default-kms-key=KEY + +Bucket Fix #2. Enable object versioning + Console: Cloud Storage > Bucket > Protection > Object versioning > Enable + gcloud: gcloud storage buckets update gs://BUCKET --versioning +``` + +## Project-only mode + +If preflight returned `analysis_scope: project_only` (Storage Intelligence +unavailable), you have project-level signals but no per-bucket or per-object +telemetry. Adapt the structure above: + +- Open with a one-line note that this is a **project-level assessment** and a + recommendation to enable Storage Intelligence to unlock the full bucket- and + object-level assessment (relay the preflight `fix` verbatim). +- **Skip** the bucket Risk Heuristic (Section 1) and the per-bucket Risk + Dashboard table (Section 2's bucket rows) — render them as "Unavailable — + requires Storage Intelligence" rather than fabricating buckets or scores. +- **Keep** the policy-level control status block and project findings: IAM, + VPC-SC, Data Access audit logs, org policies (data residency, Block HTTP, + TLS floor, HMAC), and Model Armor posture. These carry the report. + +## Important Rules + +- Limit per-bucket *cards* (Section 3) to the top 5 riskiest buckets. Buckets + with baseline failures outside the top 5 are still surfaced in the Section 2 + per-control rollup lines (with accurate counts and names per the rules + above), so no bucket with a finding is silently dropped. Do NOT use a + generic "X additional buckets — no critical findings" tail; if the remaining + buckets have findings, those findings are already represented in the Section + 2 rollup. +- Baseline failures are enumerated as discrete items, not collapsed into a + single toxic-combination archetype label. Toxic-combo labels are reserved + for the named archetypes in `references/toxic_combinations.md`. +- **UNKNOWN signals must be reported consistently across the entire report.** + When a signal is unverifiable (e.g., VPC-SC because the caller lacks + `accesscontextmanager.policies.list`, audit log status because of a missing + permission), every mention of that signal — narrative summaries, "Key + Findings" / "Assessment Summary" prose, Section 2 lines, Section 3 + per-bucket cards, fixes — must reflect UNKNOWN (or equivalent: "Access + Denied", "permission denied", "status not verifiable"). Do NOT infer + "missing", "lacking", "not configured", "not enforced", or any equivalent + state in any section. An UNKNOWN signal anywhere is UNKNOWN everywhere. +- Show findings (bucket cards) before fixes. Admins should understand the + problem before looking up the remediation. +- Never repeat a command under multiple bucket cards. Define it once in Policy + Fixes or Bucket Fixes and reference it by ID. +- Only include a description on a fix when the title alone is not + self-explanatory (e.g., CMEK warrants a note; "Block public access" does + not). +- Where a fix can be applied via Console, include the Console path. gcloud is + secondary, not the only option. +- One sentence max for the danger explanation per card. No academic prose. +- Use ⚠️ (not ✅) for controls that pass but create false confidence, with a + brief inline note explaining why. +- The Quickfix column in Section 2 should be a short plain-language label + + fix reference — never just a raw command, never blank. + +> [!TIP] See `examples/sample_assessment.md` for a complete example of expected +> output format. diff --git a/skills/security-assessment/references/phases/toxic_analysis.md b/skills/security-assessment/references/phases/toxic_analysis.md new file mode 100644 index 0000000..5603d64 --- /dev/null +++ b/skills/security-assessment/references/phases/toxic_analysis.md @@ -0,0 +1,44 @@ +# Phase 4: Toxic Combination Analysis + +This is the core of your assessment. Evaluate each bucket against the toxic +combination archetypes. + +## Instructions + +Evaluate each bucket against the toxic combination archetypes defined in +`references/toxic_combinations.md`. + +For each bucket, you MUST: + +1. **Identify Archetypes**: Identify which toxic combination archetype(s) match + the bucket's telemetry profile. +2. **Explain Combination**: Explain the **toxic combination** — why these + specific gaps together create a risk that is greater than the sum of its + parts. +3. **Assign Severity**: Assign a severity label modulated by the bucket's + classification from Phase 2. +4. **Provide Remediation**: Provide remediation scripts for every gap + identified. + +> [!IMPORTANT] Your reasoning MUST connect the dots between signals. Do not just +> list individual misconfigurations. Explain the attack path or failure mode +> that the combination enables. This is what makes you valuable — a config +> checker lists findings; you explain consequences. + +## Severity Labels + +Label | Meaning +-------- | --------------------------------------------------------------- +Critical | Active exposure or full-chain attack path. Immediate action +High | Significant gaps that enable serious attack scenarios. Urgent +Medium | Notable gaps that weaken posture but require additional factors +Low | Minor gaps, typically integrity/availability risks on + +Severity is modulated by bucket classification. The same toxic combination on a +High-sensitivity bucket may be Critical, but Medium on a Low-sensitivity bucket. + +## SAIF Risk Factors + +See `references/saif_risk_factors.md` for the mapping of SAIF risks to telemetry +signals. Reference these risk names in your findings to maintain traceability to +the SAIF framework. diff --git a/skills/security-assessment/references/saif_risk_factors.md b/skills/security-assessment/references/saif_risk_factors.md new file mode 100644 index 0000000..5a2cc95 --- /dev/null +++ b/skills/security-assessment/references/saif_risk_factors.md @@ -0,0 +1,50 @@ +# SAIF Risk Factors + +The +[Secure AI Framework (SAIF)](https://saif.google/secure-ai-framework/saif-map) +identifies 15 AI risks. The following subset is relevant to the telemetry +signals this skill can gather. Reference these risk names in your findings. + +| SAIF Risk | Description | Indicating Signals (When | +: : : Misconfigured/Absent) : +| ----------------------- | ----------------------- | ------------------------ | +| Unauthorized Training | Data used for training | UBLA disabled, public | +: Data : that was not authorized : access enabled, IAM : +: : for that purpose : over-permissions, no : +: : : audit logging : +| Data Poisoning | Malicious modification | No object versioning, no | +: : of training or : soft delete, IAM : +: : evaluation data to : over-permissions, no : +: : corrupt model behavior : audit logging : +| Sensitive Data | Model or system exposes | Public access enabled, | +: Disclosure : PII, financial, or : no CMEK, no VPC-SC, no : +: : other sensitive : Model Armor, IAM : +: : information : over-permissions : +| Model Exfiltration | Unauthorized copying or | No VPC-SC, IAM | +: : theft of model weights, : over-permissions, no : +: : checkpoints, or : audit logging : +: : artifacts : : +| Model Source Tampering | Unauthorized | No object versioning, no | +: : modification of model : soft delete, no CMEK, : +: : files, code, or : IAM over-permissions, no : +: : dependencies : audit logging : +| Prompt Injection | Adversarial inputs | No Model Armor | +: : designed to manipulate : templates, no Vertex AI : +: : model behavior or : integration, no floor : +: : bypass safety controls : settings configured : +| Insecure Model Output | Model generates | No Model Armor | +: : harmful, biased, or : templates, no Vertex AI : +: : sensitive content : integration : +: : passed to users or : : +: : downstream systems : : +| Rogue Actions | AI agent performs | IAM over-permissions | +: : unauthorized actions on : (agent service account), : +: : user data or systems : no Model Armor, no audit : +: : : logging, no VPC-SC : +| Denial of ML Service | Attacks that degrade or | Public access enabled, | +: : disrupt AI service : no VPC-SC : +: : availability : : +| Excessive Data Handling | User data stored, | No audit logging, no | +: : processed, or retained : data residency org : +: : beyond what consent : policy : +: : allows : : diff --git a/skills/security-assessment/references/telemetry_signals.md b/skills/security-assessment/references/telemetry_signals.md new file mode 100644 index 0000000..b21ce8f --- /dev/null +++ b/skills/security-assessment/references/telemetry_signals.md @@ -0,0 +1,67 @@ +# Telemetry Signals Reference + +These are the signals you gather during Phase 1. For each signal, the source +tells you which sub-skill or tool to use. If a signal is unavailable, log the +gap and move on. + +## Object-Level Signals + +| Signal | Source | What to Check | +| ----------------------- | ---------- | ----------------------------------- | +| Public access | BQ Dataset | Whether the object is publicly | +: : : accessible : +| Object versioning | BQ Dataset | Whether previous versions are | +: : : retained on overwrite/delete : +| RetentionExpirationTime | BQ Dataset | Whether a retention policy prevents | +: : : premature deletion : +| Encryption type | BQ Dataset | GMEK (Google-default), CMEK | +: : : (customer-managed), or CSEK : +: : : (customer-supplied) : + +## Bucket-Level Signals + +| Signal | Source | What to Check | +| ------------------------ | ---------- | ------------------------------------ | +| Enforced encryption type | BQ Dataset | Whether an encryption type is | +: allowed : : enforced for the bucket : +| UBLA configuration | BQ Dataset | Whether Uniform Bucket-Level Access | +: : : is enabled : +| Soft delete policy | BQ Dataset | Whether deleted objects are retained | +: : : for a recovery window, and the : +: : : duration : + +## Project-Level Signals + +| Signal | Source | What to Check | +| --------------------- | ------------------- | ---------------------------- | +| IAM over-permissions | IAM Policy + IAM | Whether any principals have | +: : Recommender API : broader roles than needed : +: : : (e.g., `roles/storage.admin` : +: : : when : +: : : `roles/storage.objectViewer` : +: : : suffices) : +| VPC Service Controls | Access Context | Whether the project is | +: : Manager API : inside a VPC-SC perimeter : +| Cloud Audit Logging | Cloud Resource | Whether DATA_READ and | +: (Data Access) : Manager API : DATA_WRITE audit logs are : +: : : enabled for : +: : : `storage.googleapis.com` : +| Data residency org | Org Policy API | Whether org policies | +: policy : : constrain resource locations : +: : : to specific regions : +| Block HTTP org policy | Org Policy API | Whether plaintext HTTP | +: : : traffic is blocked : +| TLS org policy | Org Policy API | Whether minimum TLS version | +: : : is restricted to 1.2+ : +| HMAC org policy | Org Policy API | Whether HMAC key creation is | +: : : restricted : +| Model Armor API | Model Armor API | Whether | +: status : : `modelarmor.googleapis.com` : +: : : is enabled : +| Model Armor floor | `gcloud model-armor | Whether minimum security | +: settings : floorsettings` : thresholds are configured : +| Model Armor templates | `gcloud model-armor | Whether any screening | +: : templates list` : templates exist : +| Model Armor Vertex AI | Model Armor floor | Whether Model Armor is | +: integration : settings API : activated for Vertex AI : +: : : Gemini calls : diff --git a/skills/security-assessment/references/toxic_combinations.md b/skills/security-assessment/references/toxic_combinations.md new file mode 100644 index 0000000..0df721d --- /dev/null +++ b/skills/security-assessment/references/toxic_combinations.md @@ -0,0 +1,345 @@ +# Toxic Combination Archetypes + +These are the core risk archetypes your assessment evaluates. Each defines a +specific "toxic combination" — a set of individually low-risk configurations +that together create a critical security exposure. + +> [!IMPORTANT] The detailed reasoning below is for YOUR understanding of the +> toxic combinations. When presenting findings to the user, condense each +> reasoning to 2-3 sentences. The user is a Storage Admin, not a security +> researcher. + +> [!IMPORTANT] Your value is in connecting the dots. Do NOT just list individual +> misconfigurations. For each matching archetype, explain the **attack path** or +> **failure mode** that the combination enables. This is what separates you from +> a config checker. + +## How to Match Archetypes + +For each bucket, compare its telemetry against each archetype below. A bucket +matches an archetype when it exhibits the majority of the telemetry signals +listed. Not every signal needs to match — use judgment about which combinations +are dangerous in context. + +> [!IMPORTANT] **Baseline-only failures do NOT match any archetype.** If a +> bucket's only deviations are baseline controls — UBLA disabled, object +> versioning off, soft delete off, Data Access audit logs disabled, and/or +> default GMEK encryption — it must NOT be labeled with any toxic-combination +> archetype. These are common defaults across most projects; an archetype +> requires at least one **additional risk factor** beyond baselines. +> +> Additional risk factors that qualify a bucket for archetype matching: +> +> - **Over-permissioned IAM** — service accounts or users holding +> `roles/storage.admin` or `roles/storage.objectAdmin` where a viewer-tier +> role would suffice. +> - **Public access enabled** — `allUsers` or `allAuthenticatedUsers` holds a +> role on the bucket or its objects. +> - **Data-residency mismatch** — bucket location is a multi-region (or +> non-compliant region) containing data subject to regional regulations, AND +> no org policy constrains resource locations. +> - **AI-agent workload context** — Model Armor API enabled in the project +> (signaling Vertex AI Agent Engine / Agent Builder usage), combined with +> agent-side configuration like a service account holding broad storage +> roles. +> +> A bucket failing only baseline controls is reported in the Section 2 baseline +> rollups and labeled "Baseline failures" in per-bucket cards — not with any +> archetype name. + +A single bucket may match multiple archetypes. Report all matches. + +-------------------------------------------------------------------------------- + +## Public Data Pipeline + +**Base Severity:** Critical + +**Telemetry Pattern:** + +- Bucket/Object classified as sensitive (or unclassified) +- Bucket is publicly accessible (has public read or public write objects) +- UBLA disabled (ACLs active alongside IAM) +- Object encryption is managed with Google-default (GMEK), no CMEK +- No VPC-SC perimeter +- Data Access audit logs disabled + +**Required Reasoning:** UBLA being disabled means ACLs operate alongside IAM, +creating a shadow access path. The allUsers ACL grants public read access to +training data even if IAM appears locked down. Without CMEK, the organization +cannot revoke encryption keys to cut off access in an emergency. No VPC-SC +perimeter means data can be copied to any external project with no exfiltration +boundary. Data Access audit logs being disabled means there is no record of who +accessed the data — ongoing exfiltration is undetectable. + +The key insight: each gap amplifies the others. Publicly accessible data + no +key control + no network boundary + no audit trail = total exposure. + +**SAIF Risks:** Unauthorized Training Data, Data Poisoning, Model Exfiltration, +Sensitive Data Disclosure + +**Remediation:** + +- Enforce Uniform Bucket-Level Access: `gcloud storage buckets update + gs://BUCKET --uniform-bucket-level-access` +- Block public access: `gcloud storage buckets update gs://BUCKET + --public-access-prevention` +- Enable CMEK: See GCS CMEK documentation — + https://cloud.google.com/storage/docs/encryption/customer-managed-keys + - Rewrite existing objects with CMEK key to update encryption of existing + objects +- Create VPC-SC perimeter: `gcloud access-context-manager perimeters create + PERIMETER_NAME --title='AI Assets Perimeter' + --resources=projects/PROJECT_NUMBER + --restricted-services=storage.googleapis.com --policy=POLICY_ID` +- Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE + +-------------------------------------------------------------------------------- + +## Silent Data Theft + +**Base Severity:** Critical + +**Telemetry Pattern:** + +- Over-permissioned service account (e.g., `roles/storage.admin` when + `roles/storage.objectViewer` suffices) +- No VPC-SC perimeter +- Data Access audit logs disabled +- Bucket is private (no public read or public write objects) — this is a + positive signal that creates false confidence +- CMEK configured for objects in the bucket — another positive signal that + creates false confidence + +**Required Reasoning:** While some controls are properly configured (no public +access, CMEK enabled), the combination of missing controls creates an invisible +exfiltration path. The over-permissioned service account can read, write, +delete, and modify IAM on every bucket. If compromised or behaving as a rogue +agent, it has full control. Without VPC-SC, nothing prevents copying data to an +external project. Without Data Access audit logs, these operations leave no +trace. + +The key insight: the bucket being private and encrypted creates a **false sense +of security**. The exfiltration path through the service account is wide open +and completely invisible. + +**SAIF Risks:** Model Exfiltration, Model Source Tampering, Rogue Actions + +**Remediation:** + +- Reduce service account to least privilege: `gcloud projects + add-iam-policy-binding PROJECT_ID --member='serviceAccount:SA_EMAIL' + --role='roles/storage.objectViewer'` then `gcloud projects + remove-iam-policy-binding PROJECT_ID --member='serviceAccount:SA_EMAIL' + --role='roles/storage.admin'` +- Create VPC-SC perimeter: `gcloud access-context-manager perimeters create + PERIMETER_NAME --title='AI Assets Perimeter' + --resources=projects/PROJECT_NUMBER + --restricted-services=storage.googleapis.com --policy=POLICY_ID` +- Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE. Scope to high-value + buckets. + +-------------------------------------------------------------------------------- + +## Irreversible Data Corruption + +**Base Severity:** High + +**Telemetry Pattern:** + +- Object versioning disabled +- Soft delete disabled (0-day retention) +- Over-permissioned IAM (multiple users with `roles/storage.objectAdmin`) +- Encryption is Google-default (GMEK), no CMEK +- Data Access audit logs disabled + +**Required Reasoning:** Without object versioning, any modification overwrites +the original with no rollback. Without soft delete, deletion is permanent and +immediate. Combined with multiple users having objectAdmin (write + delete), any +user could poison data or corrupt files. Without Data Access audit logs, the +organization cannot determine who modified what. Without CMEK, they cannot +revoke access at the key level in an emergency. + +The key insight: no versioning + no soft delete + broad write access + no audit +trail = **undetectable and unrecoverable** data tampering. + +**SAIF Risks:** Data Poisoning, Model Source Tampering + +**Remediation:** + +- Enable object versioning: `gcloud storage buckets update gs://BUCKET + --versioning` +- Enable soft delete: `gcloud storage buckets update gs://BUCKET + --soft-delete-duration=7d` +- Enable CMEK: See GCS CMEK documentation — + https://cloud.google.com/storage/docs/encryption/customer-managed-keys +- Reduce write permissions: Grant `roles/storage.objectViewer` or + `roles/storage.objectCreator` instead of `roles/storage.objectAdmin`. + `gcloud storage buckets add-iam-policy-binding gs://BUCKET + --member='user:USER_EMAIL' --role='roles/storage.objectViewer'` then `gcloud + storage buckets remove-iam-policy-binding gs://BUCKET + --member='user:USER_EMAIL' --role='roles/storage.objectAdmin'` +- Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE. + +-------------------------------------------------------------------------------- + +## Intentional Public Data + +**Base Severity:** Low + +**Telemetry Pattern:** + +- Bucket classified as non-sensitive (via tags, labels, or naming heuristics) +- Public access enabled — **intentional and expected** +- UBLA enabled +- Object versioning disabled +- Soft delete disabled +- Data Access audit logs disabled + +> [!CAUTION] This archetype ONLY applies when the bucket is classified as +> non-sensitive or the context clearly indicates public data (marketing assets, +> public docs, open datasets). If the bucket is classified as high-sensitivity +> and public, that is a **classification mismatch** — see +> `bucket_classification.md`. + +**Required Reasoning:** Public access is intentional and must NOT be recommended +for removal. However, intentionally public data still has integrity and +availability risks. Without versioning, content can be silently overwritten +(defacement). Without soft delete, content can be permanently deleted. Without +audit logging, unauthorized modifications go undetected. + +The key insight: the skill must **respect the intended use case** while still +flagging integrity/availability gaps. + +**SAIF Risks:** Model Source Tampering (if public data feeds downstream +pipelines) + +**Remediation:** + +- Enable object versioning for defacement protection: `gcloud storage buckets + update gs://BUCKET --versioning` +- Enable soft delete for deletion recovery: `gcloud storage buckets update + gs://BUCKET --soft-delete-duration=7d` +- Enable Data Access audit logs for write operations: Update project audit + config for `storage.googleapis.com` with DATA_WRITE. +- **NOTE: Public access is recognized as intentional. No changes to access + controls are recommended.** + +-------------------------------------------------------------------------------- + +## Compliance Without Proof + +**Base Severity:** High + +**Telemetry Pattern:** + +- No data residency org policy +- Bucket location is multi-region containing data subject to regional + regulations (e.g., EU data in US multi-region) +- Encryption is Google-default (GMEK), no CMEK +- Data Access audit logs disabled +- Bucket is private (public access prevention enforced) — positive signal +- UBLA enabled — positive signal + +**Required Reasoning:** Despite good access controls (UBLA, private access), the +project has critical compliance gaps. Data stored in a non-compliant region with +no residency org policy may violate regulations like GDPR. Without CMEK, the +organization cannot demonstrate key sovereignty. Without Data Access audit logs, +they cannot prove who accessed data or fulfill data subject access requests. + +The key insight: the data is technically secure from external threats but the +organization **cannot demonstrate compliance to regulators**. Infrastructure +that looks secure but cannot withstand a regulatory audit. + +**SAIF Risks:** Sensitive Data Disclosure, Excessive Data Handling + +**Remediation:** + +- Set org policy to restrict resource locations: `gcloud resource-manager + org-policies set-policy --project=PROJECT_ID policy.yaml` (constrain to + compliant regions) +- Migrate bucket to compliant region: `gcloud storage buckets create + gs://NEW_BUCKET --location=COMPLIANT_REGION` then `gcloud storage rsync + gs://OLD_BUCKET gs://NEW_BUCKET --recursive` +- Enable CMEK: See GCS CMEK documentation — + https://cloud.google.com/storage/docs/encryption/customer-managed-keys + - Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE. + +-------------------------------------------------------------------------------- + +## Prompt Injection to Data Destruction + +**Base Severity:** Critical + +**Telemetry Pattern:** + +- Model Armor API enabled BUT Vertex AI integration NOT activated AND no + templates created +- Agent service account with `roles/storage.admin` at project level +- No VPC-SC perimeter +- Data Access audit logs disabled +- Object versioning disabled +- Soft delete disabled +- Encryption is Google-default (GMEK), no CMEK +- UBLA enabled — positive signal +- Bucket is private — positive signal + +> [!IMPORTANT] This archetype applies specifically to projects running AI agents +> (Vertex AI Agent Engine, Agent Builder). If the project has no AI agent +> workloads, this archetype does not apply. Look for Model Armor being enabled +> as a signal that the project has AI workloads. + +**Required Reasoning:** Trace the full attack path from prompt to data +destruction. Model Armor floor settings exist and the API is enabled, but Vertex +AI integration is not activated — meaning Gemini calls from the agent bypass +Model Armor entirely. This is the most dangerous type of false signal: Model +Armor configuration suggests protections when they are not enforced. + +A successful prompt injection reaches the model unscreened. The compromised +model instructs the agent to use its `roles/storage.admin` access to: read any +object (exfiltration), write/overwrite objects (poisoning), delete objects +(destruction), or modify IAM (escalation). Without VPC-SC, exfiltrated data +leaves freely. Without audit logs, actions leave no trace. Without versioning or +soft delete, damage is irrecoverable. Without CMEK, no emergency key revocation. + +The key insight: every layer of defense is either absent or misconfigured. One +prompt injection **chains through the entire stack** unimpeded. + +**SAIF Risks:** Prompt Injection, Rogue Actions, Sensitive Data Disclosure, +Model Exfiltration, Data Poisoning, Model Source Tampering + +**Remediation:** + +- **URGENT** — Reduce agent service account to least privilege: `gcloud + projects remove-iam-policy-binding PROJECT_ID + --member='serviceAccount:AGENT_SA' --role='roles/storage.admin'` then + `gcloud storage buckets add-iam-policy-binding gs://SPECIFIC_BUCKET + --member='serviceAccount:AGENT_SA' --role='roles/storage.objectViewer'` +- **URGENT** — Activate Model Armor Vertex AI integration: `gcloud model-armor + floorsettings update + --full-uri=projects/PROJECT_ID/locations/global/floorSetting + --add-integrated-services=VERTEX_AI + --vertex-ai-enforcement-type=INSPECT_AND_BLOCK` +- Create Model Armor screening template: `gcloud model-armor templates create + agent-protection --location=us-central1 + --rai-settings-filters='[{"filterType":"HATE_SPEECH","confidenceLevel":"MEDIUM_AND_ABOVE"},{"filterType":"DANGEROUS","confidenceLevel":"MEDIUM_AND_ABOVE"},{"filterType":"HARASSMENT","confidenceLevel":"MEDIUM_AND_ABOVE"},{"filterType":"SEXUALLY_EXPLICIT","confidenceLevel":"MEDIUM_AND_ABOVE"}]' + --pi-and-jailbreak-filter-settings-enforcement=enabled + --pi-and-jailbreak-filter-settings-confidence-level=medium-and-above + --malicious-uri-filter-settings-enforcement=enabled` +- Enable object versioning: `gcloud storage buckets update gs://BUCKET + --versioning` +- Enable soft delete: `gcloud storage buckets update gs://BUCKET + --soft-delete-duration=7d` +- Enable CMEK: See GCS CMEK documentation — + https://cloud.google.com/storage/docs/encryption/customer-managed-keys + - Create VPC-SC perimeter: `gcloud access-context-manager perimeters + create agent-perimeter --title='Agent Workload Perimeter' + --resources=projects/PROJECT_NUMBER + --restricted-services=storage.googleapis.com,aiplatform.googleapis.com + --policy=POLICY_ID` +- Enable Data Access audit logs: Update project audit config for + `storage.googleapis.com` with DATA_READ and DATA_WRITE. diff --git a/skills/security-assessment/scripts/cloud_rest_helpers_nodeps.py b/skills/security-assessment/scripts/cloud_rest_helpers_nodeps.py new file mode 100644 index 0000000..99e818e --- /dev/null +++ b/skills/security-assessment/scripts/cloud_rest_helpers_nodeps.py @@ -0,0 +1,557 @@ +"""GCP REST helpers with no third-party Python dependencies. + +Same surface as ``cloud_rest_helpers`` but uses only Python stdlib plus +two system binaries: + +* ``gcloud auth application-default print-access-token`` for OAuth2 bearer +tokens. +* ``curl`` for HTTPS calls. +""" + +from collections.abc import Mapping, Sequence +import json as json_lib +import logging +import os +import re +import shutil +import subprocess +import tempfile +import time +from typing import Any +import urllib.parse + +_BIGQUERY_API = "https://bigquery.googleapis.com/bigquery/v2/projects" +_TIMEOUT_SECONDS = 60 + +_ATTRIBUTION_PREFIX = "gcs-skills" +_KIT_VERSION = "1.0" + + +def _user_agent(skill: str, script: str) -> str: + return ( + f"{_ATTRIBUTION_PREFIX}/{_KIT_VERSION} (skill:{skill}; script:{script})" + ) + + +def sanitize_label(val: str) -> str: + """Sanitizes a string to satisfy the BigQuery label charset requirements. + + Converts to lowercase, replaces underscores and invalid characters with + hyphens, collapses consecutive hyphens, and strips leading/trailing hyphens. + + Args: + val: The string to sanitize. + + Returns: + The sanitized string. + """ + val = val.replace("_", "-") + val = re.sub(r"[^a-z0-9-]", "-", val.lower()) + val = re.sub(r"-+", "-", val) + return val.strip("-") + + +def bigquery_labels(skill: str, script: str) -> dict[str, str]: + """BigQuery job labels for kit attribution. + + Lowercase + hyphens only to satisfy the BigQuery label charset + (https://cloud.google.com/bigquery/docs/labels-intro#requirements). + + Args: + skill: Identifier for the calling skill (e.g., "security-assessment"). + script: Identifier for the calling script (e.g., "fetch-bucket-telemetry"). + + Returns: + A dictionary containing the aggregated kit labels. + """ + return { + _ATTRIBUTION_PREFIX: "true", + f"{_ATTRIBUTION_PREFIX}-skill": sanitize_label(skill), + f"{_ATTRIBUTION_PREFIX}-script": sanitize_label(script), + } + + +class CloudRestError(Exception): + """Base error for transport, auth, or HTTP failures from the kit.""" + + +class CredentialsError(CloudRestError): + """Raised when an access token cannot be obtained via gcloud.""" + + +class HttpError(CloudRestError): + """Raised on a 4xx/5xx HTTP response. + + Attributes: + status_code: The HTTP status code. + body: The response body decoded as UTF-8 (errors replaced). + url: The URL that was requested. + """ + + def __init__(self, status_code: int, body: str, url: str): + self.status_code = status_code + self.body = body + self.url = url + super().__init__(f"HTTP {status_code} from {url}: {body[:500]}") + + +_SERVICE_DISABLED_REASON = "SERVICE_DISABLED" + + +def is_service_disabled_error(http_error: HttpError) -> bool: + """Returns True if the HTTP error indicates the API is not enabled. + + GCP's API gateway returns 403 with errorInfo.reason="SERVICE_DISABLED" + when an API has not been enabled on the project. Permission-denied errors + use other reasons (e.g., IAM_PERMISSION_DENIED) and must not match here. + + Args: + http_error: The HttpError raised by an HTTP request to a GCP service. + + Returns: + True if the error body contains a SERVICE_DISABLED reason, False otherwise. + """ + if http_error.status_code != 403: + return False + try: + payload = json_lib.loads(http_error.body) + except (ValueError, TypeError): + return False + details = (payload.get("error") or {}).get("details") or [] + return any( + isinstance(d, Mapping) and d.get("reason") == _SERVICE_DISABLED_REASON + for d in details + ) + + +def _fetch_access_token() -> str: + """Fetches an OAuth2 access token via ``gcloud auth application-default print-access-token``. + + Returns: + The bearer token string with surrounding whitespace stripped. + + Raises: + CredentialsError: If ``gcloud`` is not on PATH, the subprocess fails, + the call times out, or the returned token is empty. + """ + gcloud = shutil.which("gcloud") + if not gcloud: + raise CredentialsError( + "gcloud CLI not found on PATH. Install Google Cloud SDK and run " + "'gcloud auth application-default login'." + ) + try: + result = subprocess.run( + [gcloud, "auth", "application-default", "print-access-token"], + capture_output=True, + text=True, + encoding="utf-8", + errors="replace", + timeout=_TIMEOUT_SECONDS, + check=True, + ) + except subprocess.CalledProcessError as e: + stderr = (e.stderr or "").strip() + raise CredentialsError( + "'gcloud auth application-default print-access-token' failed:" + f" {stderr}. Run 'gcloud auth application-default login'." + ) from e + except subprocess.TimeoutExpired as e: + raise CredentialsError( + "gcloud auth application-default print-access-token timed out." + ) from e + + token = result.stdout.strip() + if not token: + raise CredentialsError( + "gcloud auth application-default print-access-token returned an empty" + " token." + ) + return token + + +class Response: + """Minimal Response-shaped wrapper used by AuthorizedSession. + + Supports context-manager use, ``raise_for_status()``, ``json()``, + ``status_code``, and ``text``. + """ + + def __init__(self, status_code: int, body: bytes, url: str): + self.status_code = status_code + self._body = body + self.url = url + + @property + def text(self) -> str: + """The response body decoded as UTF-8 string.""" + return self._body.decode("utf-8", errors="replace") + + def json(self) -> Any: + """Returns the parsed JSON response body.""" + + # `utf-8-sig` strips a leading BOM if present (some Cloud edge proxies + # emit one), matching `requests.Response.json()` tolerance. + return json_lib.loads(self._body.decode("utf-8-sig") or "null") + + def raise_for_status(self) -> None: + """Raises HttpError if status_code is 4xx or 5xx.""" + if 400 <= self.status_code < 600: + raise HttpError(self.status_code, self.text, self.url) + + def __enter__(self) -> "Response": + return self + + def __exit__(self, *exc: Any) -> bool: + return False + + +class AuthorizedSession: + """Curl + gcloud-backed AuthorizedSession. + + Provides ``request(method, url, json=, timeout=)``, ``get(url, ...)``, + and a mutable ``headers`` dict. On HTTP 401 the bearer token is + refreshed once via gcloud and the request is retried; a second 401 + raises ``HttpError`` rather than returning the response, since a + persistent 401 signals a broken credential rather than per-resource + authorization. + + Not thread-safe: concurrent calls on a shared instance race on the + ``_token`` field on 401-refresh paths. Construct one session per + thread/task if you need concurrency. + """ + + def __init__(self) -> None: + self._token = _fetch_access_token() + self.headers: dict[str, str] = {} + + def __enter__(self) -> "AuthorizedSession": + return self + + def __exit__(self, *exc: Any) -> bool: + return False + + def _execute_request( + self, + method: str, + url: str, + body_bytes: bytes | None, + headers: Mapping[str, str], + timeout: float, + ) -> Response: + """Executes a single HTTP request using curl. + + Args: + method: The HTTP method (e.g., "GET", "POST"). + url: The URL to request. + body_bytes: The raw bytes to send as the request body, or None. + headers: A mapping of header names to values. + timeout: The timeout for the request in seconds. + + Returns: + A Response object. + + Raises: + CloudRestError: If curl fails or times out. + """ + curl = shutil.which("curl") + if not curl: + raise CloudRestError( + "curl not found on PATH. The kit shells out to curl for HTTPS" + " calls; install curl and ensure it is on PATH" + " (https://curl.se/download.html)." + ) + cmd: list[str] = [ + curl, + "-sS", # silent (no progress meter), but still print errors. + "-X", + method, # explicit HTTP method (GET, POST, ...). + # Total operation timeout in seconds; guarded against 0/negative. + "--max-time", + str(max(1, int(timeout))), + # Append the 3-digit HTTP status to stdout after the body so we can + # recover it (curl doesn't expose status code any other way without + # parsing response headers). + "-w", + "%{http_code}", + ] + if body_bytes is not None: + # `@-` reads the request body from stdin (piped via subprocess.run + # input=). `--data-binary` (vs `--data`) preserves bytes exactly -- + # no newline stripping or CRLF conversion, which would corrupt JSON. + cmd.extend(["--data-binary", "@-"]) + + # Headers are written to a 0600 temp file and read by curl via + # `-H @` so the bearer token is never visible in `ps`/argv. + # We close the file before spawning curl so Windows (which forbids a + # second open on a file already held by another handle) can also read + # it; cleanup happens in `finally`. + fd, headers_path = tempfile.mkstemp(prefix="curl-headers-") + try: + with os.fdopen(fd, "w", encoding="utf-8") as headers_file: + for k, v in headers.items(): + # Reject CR/LF anywhere in header name/value. Otherwise an + # attacker who controls a header value (e.g., via a label + # interpolated into User-Agent) could smuggle additional + # headers by injecting `\r\n`. + if "\r" in k or "\n" in k or "\r" in v or "\n" in v: + raise CloudRestError( + f"Refusing to send header {k!r}: CR/LF in name or value." + ) + headers_file.write(f"{k}: {v}\n") + cmd.extend(["-H", f"@{headers_path}"]) + cmd.append(url) + try: + result = subprocess.run( + cmd, + input=body_bytes, + capture_output=True, + timeout=timeout + 5, + check=False, + ) + except subprocess.TimeoutExpired as e: + raise CloudRestError(f"curl timed out contacting {url}.") from e + finally: + try: + os.unlink(headers_path) + except OSError: + pass # best-effort cleanup + + if result.returncode != 0: + stderr = result.stderr.decode("utf-8", errors="replace").strip() + raise CloudRestError( + f"curl failed contacting {url} (exit {result.returncode}): {stderr}" + ) + + # `-w "%{http_code}"` appends the 3-digit HTTP status to stdout after + # the body. HTTP status codes are always 3 digits, so split on length. + stdout = result.stdout + if len(stdout) < 3: + raise CloudRestError( + f"curl returned unexpectedly short output for {url}." + ) + status_bytes = stdout[-3:] + try: + status_code = int(status_bytes.decode("ascii")) + except (UnicodeDecodeError, ValueError) as e: + raise CloudRestError( + f"curl returned non-numeric status for {url}: {status_bytes!r}" + ) from e + body = stdout[:-3] + return Response(status_code, body, url) + + def request( + self, + method: str, + url: str, + *, + json: Mapping[str, Any] | None = None, + timeout: float = _TIMEOUT_SECONDS, + headers: Mapping[str, str] | None = None, + ) -> Response: + """Issues an authenticated HTTP request, refreshing the token on 401.""" + request_headers: dict[str, str] = dict(self.headers) + if headers: + request_headers.update(headers) + request_headers["Authorization"] = f"Bearer {self._token}" + + body_bytes: bytes | None = None + if json is not None: + body_bytes = json_lib.dumps(json).encode("utf-8") + request_headers["Content-Type"] = "application/json" + + resp = self._execute_request( + method, url, body_bytes, request_headers, timeout + ) + if resp.status_code == 401: + self._token = _fetch_access_token() + request_headers["Authorization"] = f"Bearer {self._token}" + resp = self._execute_request( + method, url, body_bytes, request_headers, timeout + ) + if resp.status_code == 401: + raise HttpError(401, resp.text, url) + return resp + + def get( + self, + url: str, + *, + timeout: float = _TIMEOUT_SECONDS, + headers: Mapping[str, str] | None = None, + ) -> Response: + """Issues an authenticated HTTP GET request.""" + return self.request("GET", url, timeout=timeout, headers=headers) + + +def get_authorized_session( + *, skill: str, script: str, project_id: str | None = None +) -> AuthorizedSession: + """Returns an AuthorizedSession with the kit User-Agent stamped. + + Every outbound request is stamped with a gcs-skills User-Agent so + the GCS team can measure aggregate kit usage from server-side request + logs. If ``project_id`` is provided, the ``X-Goog-User-Project`` header is + also stamped so requests are billed/quota-attributed to that project. + + Args: + skill: Identifier for the calling skill (e.g., "security-assessment"). + script: Identifier for the calling script (e.g., "fetch-bucket-telemetry"). + project_id: Optional GCP project ID for billing/quota attribution. When + set, stamped as the ``X-Goog-User-Project`` header on every request. + + Returns: + AuthorizedSession with a freshly fetched access token. + + Raises: + CredentialsError: If fetching the access token via gcloud fails. + """ + session = AuthorizedSession() + session.headers["User-Agent"] = _user_agent(skill, script) + if project_id: + session.headers["X-Goog-User-Project"] = project_id + return session + + +def parse_bq_value(val: Any, field: Mapping[str, Any]) -> Any: + """Recursively parses BigQuery REST JSON response row values. + + Args: + val: The value to parse. + field: The field definition from the BigQuery schema. + + Returns: + The parsed value. + """ + if val is None: + return None + ftype = field.get("type") + fmode = field.get("mode") + + if fmode == "REPEATED": + if not val: + return [] + single_field = dict(field) + single_field["mode"] = "NULLABLE" + return [parse_bq_value(item.get("v"), single_field) for item in val] + + if ftype == "BOOLEAN": + return val.lower() == "true" + elif ftype == "INTEGER": + return int(val) + elif ftype == "RECORD": + fields = field.get("fields", []) + row_f = val.get("f", []) + res = {} + for i, subfield in enumerate(fields): + subval = row_f[i].get("v") if i < len(row_f) else None + res[subfield["name"]] = parse_bq_value(subval, subfield) + return res + else: + return val + + +def execute_bigquery_query( + *, + project_id: str, + payload: Mapping[str, Any], + session: AuthorizedSession, + skill: str, + script: str, +) -> tuple[Sequence[Mapping[str, Any]], Sequence[Any]]: + """Executes a BigQuery REST query, polls for completion, and paginates rows. + + Merges gcs-skills job labels into the request body so the kit's BigQuery + traffic is queryable from `INFORMATION_SCHEMA.JOBS_BY_PROJECT` and Cloud + Audit Logs. Caller-supplied labels in `payload` win on key collision. + + Args: + project_id: The GCP project ID. + payload: The JSON query payload. + session: Authorized session for REST requests. + skill: Identifier for the calling skill (e.g., "security-assessment"). + script: Identifier for the calling script (e.g., "fetch-bucket-telemetry"). + + Returns: + A tuple (schema_fields, rows) where schema_fields is a list of + the BigQuery column schema fields and rows is a list of unparsed rows. + + Raises: + CloudRestError: If the REST query fails. + RuntimeError: If the query fails or doesn't complete. + """ + + enriched_payload = dict(payload) + enriched_payload["labels"] = { + **bigquery_labels(skill, script), + **(payload.get("labels") or {}), + } + url = f"{_BIGQUERY_API}/{project_id}/queries" + logging.debug("Executing REST query with payload: %s", enriched_payload) + with session.request( + method="POST", + url=url, + json=enriched_payload, + timeout=_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + response_json = response.json() + + job_ref = response_json.get("jobReference", {}) + job_id = job_ref.get("jobId") + job_complete = response_json.get("jobComplete") + + polling_attempts = 0 + while not job_complete and job_id and polling_attempts < 60: + logging.debug("Query job not complete, polling job: %s", job_id) + time.sleep(1) + job_url = f"{_BIGQUERY_API}/{project_id}/queries/{job_id}" + with session.request( + method="GET", + url=job_url, + timeout=_TIMEOUT_SECONDS, + ) as job_response: + job_response.raise_for_status() + response_json = job_response.json() + job_complete = response_json.get("jobComplete") + polling_attempts += 1 + + if not job_complete: + # Covers both "polling capped at 60 attempts and the job is still + # running" and "server returned an incomplete-but-no-jobReference + # response we can't poll" -- the latter would otherwise return an + # empty ([], []) silently. + raise RuntimeError(f"BigQuery query did not complete (job_id={job_id!r}).") + + error_result = response_json.get("errorResult") or response_json.get( + "status", {} + ).get("errorResult") + if error_result: + raise RuntimeError( + f"BigQuery job failed: {error_result.get('message', 'Unknown error')}" + ) + + schema_fields = (response_json.get("schema") or {}).get("fields") or [] + rows = response_json.get("rows", []) + + job_ref = response_json.get("jobReference", {}) + job_id = job_ref.get("jobId") + page_token = response_json.get("pageToken") + + while page_token and job_id: + page_url = ( + f"{_BIGQUERY_API}/{project_id}/queries/{job_id}" + f"?pageToken={urllib.parse.quote(page_token, safe='')}" + ) + logging.debug("Fetching next page of results: %s", page_token) + with session.request( + method="GET", + url=page_url, + timeout=_TIMEOUT_SECONDS, + ) as page_response: + page_response.raise_for_status() + page_json = page_response.json() + rows.extend(page_json.get("rows", [])) + page_token = page_json.get("pageToken") + + return schema_fields, rows diff --git a/skills/security-assessment/scripts/evaluate_project_security_posture.py b/skills/security-assessment/scripts/evaluate_project_security_posture.py new file mode 100644 index 0000000..a673aeb --- /dev/null +++ b/skills/security-assessment/scripts/evaluate_project_security_posture.py @@ -0,0 +1,478 @@ +"""Evaluates the security posture of a subset of GCS project level settings.""" + +from __future__ import annotations + +import argparse +from collections.abc import Collection, Mapping, MutableMapping +import enum +import json +from typing import Any, TypedDict + +# pylint: disable=g-import-not-at-top +try: + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps +except ImportError: + import cloud_rest_helpers_nodeps # type: ignore +# pylint: enable=g-import-not-at-top + +_TIMEOUT_SECONDS = 5 +_STORAGE_API = "storage.googleapis.com" +_ALL_SERVICES = "allServices" +_DATA_READ = "DATA_READ" +_DATA_WRITE = "DATA_WRITE" +_VPC_SC_WILDCARD = "*" + +_ORG_POLICY_API = "https://orgpolicy.googleapis.com/v2/projects" +_CLOUD_RESOURCE_MANAGER_API = "https://cloudresourcemanager.googleapis.com/v3" +_ACCESS_CONTEXT_MANAGER_API = "https://accesscontextmanager.googleapis.com/v1" +_MODEL_ARMOR_API = "https://modelarmor.googleapis.com/v1/projects" +# The aggregated `locations/-/templates` call fans out to every regional control +# plane, so it needs a longer timeout than per-region/global Model Armor calls. +_MODEL_ARMOR_TEMPLATES_TIMEOUT_SECONDS = 30 + +_SKILL = "security-assessment" +_SCRIPT = "evaluate-project-security-posture" + + +class OrgPolicySecureTypeEnum(enum.Enum): + """The condition for a secure org policy. + + Attributes: + ENFORCED: The 'enforce' rule is set to True. + NO_ALLOW_ALL: The 'allowAll' is not present in the rules. + """ + + ENFORCED = 1 + NO_ALLOW_ALL = 2 + + +class _NoAllowAllOrgPolicyRuleValue(TypedDict, total=False): + """OrgPolicy rule values for NO_ALLOW_ALL policies. + + These names are based on the response received from the JSON payload response + returned from the OrgPolicy API. + + Attributes: + allowedValues: The specific list of values that are allowed for this policy. + deniedValues: The specific list of values that are denied for this policy. + """ + + allowedValues: Collection[str] # pylint: disable=invalid-name + deniedValues: Collection[str] # pylint: disable=invalid-name + + +class _NoAllowAllOrgPolicyRule(TypedDict, total=False): + """OrgPolicy rule types for NO_ALLOW_ALL policies. + + These names are based on the response received from the JSON payload response + returned from the OrgPolicy API. + + Attributes: + values: Specific list of allow/deny values for this rule (these will + override the allowAll/denyAll values). + allowAll: Whether the policy rule allows all values. + denyAll: Whether the policy rule denies all values. + """ + + values: _NoAllowAllOrgPolicyRuleValue + allowAll: bool # pylint: disable=invalid-name + denyAll: bool # pylint: disable=invalid-name + + +class _EnforcedOrgPolicyRule(TypedDict, total=False): + """OrgPolicy rule types for ENFORCED policies.""" + + enforce: bool + + +def _check_no_allow_all_org_policy( + rules: ( + Collection[_NoAllowAllOrgPolicyRule] + | Collection[_EnforcedOrgPolicyRule] + ), +) -> bool: + """Checks if the provided OrgPolicy rules enforce at least one restriction. + + A "no allowAll" OrgPolicy is considered secure if: + - Any rule has "denyAll" set to True. + - Any rule contains "deniedValues". + - There is at least one rule present and "allowAll" is not set to True at all. + + Args: + rules: A list of rules for a specific OrgPolicy. + + Returns: + True if the rules are securely configured (i.e., no "allowAll" without + denial), False otherwise. + """ + if any(rule.get("denyAll", False) for rule in rules): + # Deny rules always override allow rules. + return True + + for rule in rules: + values = rule.get("values") or {} + if values.get("deniedValues", []) or values.get("allowedValues", []): + # Deny rules always override allow rules. + return True + # If a rule is empty or only has allowAll enabled with no overrides, it is + # considered insecure. + return bool(rules and not any(rule.get("allowAll") for rule in rules)) + + +def check_secure_org_policies_enforced( + *, + project_id: str, + authorized_session: Any, +) -> Mapping[str, bool | Mapping[str, str]]: + """Determines if a subset of org policies are set to a secure value for the project. + + Args: + project_id: The GCP project ID to check. + authorized_session: The AuthorizedSession to use for API requests. + + Returns: + A dictionary mapping each policy name to a bool + indicating whether the policy is securely configured. Errors are + propagated to the dictionary as a string under the key "error" for each + org policy. + """ + org_policies: dict[str, OrgPolicySecureTypeEnum] = { + "gcp.resourceLocations": OrgPolicySecureTypeEnum.NO_ALLOW_ALL, + "gcp.restrictTLSVersion": OrgPolicySecureTypeEnum.NO_ALLOW_ALL, + "storage.disableServiceAccountHmacKeyCreation": ( + OrgPolicySecureTypeEnum.ENFORCED + ), + "storage.secureHttpTransport": OrgPolicySecureTypeEnum.ENFORCED, + } + + org_policy_results: MutableMapping[str, bool | MutableMapping[str, str]] = {} + + for policy_name, policy_type in org_policies.items(): + try: + with authorized_session.request( + method="GET", + url=f"{_ORG_POLICY_API}/{project_id}/policies/{policy_name}:getEffectivePolicy", + timeout=_TIMEOUT_SECONDS, + ) as policy_response: + policy_response.raise_for_status() + policy = policy_response.json() + except cloud_rest_helpers_nodeps.CloudRestError as e: + org_policy_results[policy_name] = {"error": str(e)} + continue + + rules = policy.get("spec", {}).get("rules", []) + # These conditions expect to compare with the default values for the + # policy, so if the rules are not configured, we consider the policy + # to be set to its default value, which is insecure. + if policy_type is OrgPolicySecureTypeEnum.ENFORCED: + # Any enforced rule would be considered secure. A potential case where + # there might be multiple rules is if a condition is bound to the + # enforcement through tags. + secure = any(rule.get("enforce", False) for rule in rules) + elif policy_type is OrgPolicySecureTypeEnum.NO_ALLOW_ALL: + secure = _check_no_allow_all_org_policy(rules) + else: + org_policy_results[policy_name] = {"error": "Unable to evaluate policy."} + continue + org_policy_results[policy_name] = secure + + return org_policy_results + + +# TODO: Check org-level API as well for inherited policies. +def check_project_data_access_audit_logs_enabled( + project_id: str, + authorized_session: Any, +) -> Mapping[str, bool | str]: + """Checks if DATA_ACCESS audit logs are enabled for the project. + + Args: + project_id: GCP project ID + authorized_session: AuthorizedSession to use for API requests + + Returns: + A mapping of "DATA_READ" and "DATA_WRITE" to a boolean indicating if the + respective audit log type is enabled. Returns a mapping of "error" to a + string if one occurs during the API request. + """ + try: + with authorized_session.request( + method="POST", + url=f"{_CLOUD_RESOURCE_MANAGER_API}/projects/{project_id}:getIamPolicy", + timeout=_TIMEOUT_SECONDS, + ) as iam_response: + iam_response.raise_for_status() + iam_response_json = iam_response.json() + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": str(e)} + + enabled_logs: MutableMapping[str, bool] = { + _DATA_READ: False, + _DATA_WRITE: False, + } + + audit_configs = iam_response_json.get("auditConfigs") or [] + for config in audit_configs: + service = config.get("service", "") + if service not in (_STORAGE_API, _ALL_SERVICES): + continue + for audit_log_config in config.get("auditLogConfigs") or []: + audit_log_type = audit_log_config.get("logType") + if audit_log_type in enabled_logs: + enabled_logs[audit_log_type] = True + + # Terminate early if all logs are enabled. Rules may be split across + # storage and allServices. + if all(enabled_logs.values()): + return enabled_logs + return enabled_logs + + +def _get_project_number_and_org_id( + project_id: str, + authorized_session: Any, +) -> tuple[str, str] | Mapping[str, str]: + """Retrieves the project number and organization ID for a project ID. + + Traverses folders if necessary to get the organization ID. + + Args: + project_id: GCP project ID. + authorized_session: AuthorizedSession to use for API requests. + + Returns: + A tuple of (project_number, org_id) or a mapping with "error" key. + """ + try: + with authorized_session.request( + method="GET", + url=f"{_CLOUD_RESOURCE_MANAGER_API}/projects/{project_id}", + timeout=_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + project_data = response.json() + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": f"Failed to get project data: {e}"} + + project_number_path = project_data.get("name", "") + parent = project_data.get("parent", "") + + while parent and parent.startswith("folders/"): + try: + with authorized_session.request( + method="GET", + url=f"{_CLOUD_RESOURCE_MANAGER_API}/{parent}", + timeout=_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + folder_data = response.json() + parent = folder_data.get("parent", "") + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": f"Failed to get folder data for {parent}: {e}"} + + if not parent or not parent.startswith("organizations/"): + return {"error": f"Could not find organization for project {project_id}."} + + return project_number_path, parent + + +# TODO: Add support for pagination in the API calls. +def check_vpc_sc_perimeter_enabled( + *, + project_id: str, + authorized_session: Any, +) -> bool | Mapping[str, str]: + """Checks if a VPC-SC perimeter restricting the storage API is enabled for the project. + + Args: + project_id: GCP project ID. + authorized_session: AuthorizedSession to use for API requests. + + Returns: + True if a perimeter is enabled and secure, False otherwise. + Returns a mapping with "error" key if an error occurs. + """ + result = _get_project_number_and_org_id(project_id, authorized_session) + if isinstance(result, Mapping) and "error" in result: + return result + + if not isinstance(result, tuple): + return {"error": "Failed to get project number and organization ID."} + + project_number_path, org_id_path = result + + try: + with authorized_session.request( + method="GET", + url=( + f"{_ACCESS_CONTEXT_MANAGER_API}/accessPolicies?parent={org_id_path}" + ), + timeout=_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + policies_data = response.json() + policies = policies_data.get("accessPolicies") or [] + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": f"Failed to list access policies: {e}"} + + for policy in policies: + policy_name = policy.get("name", "") + try: + with authorized_session.request( + method="GET", + url=f"{_ACCESS_CONTEXT_MANAGER_API}/{policy_name}/servicePerimeters", + timeout=_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + perimeters_data = response.json() + perimeters = perimeters_data.get("servicePerimeters") or [] + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": f"Failed to list perimeters for {policy_name}: {e}"} + + for perimeter in perimeters: + status = perimeter.get("status") + if status is None: + continue + + if perimeter.get("perimeterType") == "PERIMETER_TYPE_BRIDGE": + continue + + resources = status.get("resources") or [] + if project_number_path not in resources: + continue + + restricted_services = status.get("restrictedServices") or [] + if ( + "storage.googleapis.com" in restricted_services + or _VPC_SC_WILDCARD in restricted_services + ): + return True + + return False + + +def check_model_armor_status( + *, + project_id: str, + authorized_session: Any, +) -> Mapping[str, Any]: + """Checks Model Armor enablement, floor settings, Vertex AI integration, and templates. + + Uses the Model Armor API itself as the enablement probe to avoid requiring + Service Usage permissions. The floor settings GET is the lightest possible + call (single global resource, single permission). A SERVICE_DISABLED error + from this call unambiguously means the API is not enabled on the project; a + generic 403 means the caller lacks `modelarmor.floorsettings.get` and the + signals are reported as unknown via an error mapping. + + Args: + project_id: The GCP project ID to check. + authorized_session: The AuthorizedSession to use for API requests. + + Returns: + A mapping containing Model Armor signals: `api_enabled` (always present), + plus `floor_settings_configured`, `vertex_ai_integration`, and + `templates` (a sub-mapping with `count` or `error`) when the API is + enabled. Returns `{"error": str}` if the caller lacks permissions or + another transport failure occurs. + """ + floor_setting_url = ( + f"{_MODEL_ARMOR_API}/{project_id}/locations/global/floorSetting" + ) + try: + with authorized_session.request( + method="GET", + url=floor_setting_url, + timeout=_TIMEOUT_SECONDS, + ) as response: + if response.status_code == 404: + # API enabled but no FloorSetting resource has been created yet. + floor_setting: Mapping[str, Any] = {} + else: + response.raise_for_status() + floor_setting = response.json() + except cloud_rest_helpers_nodeps.HttpError as e: + if cloud_rest_helpers_nodeps.is_service_disabled_error(e): + return {"api_enabled": False} + return {"error": str(e)} + # CloudRestError covers transport-layer failures (no HTTP response received). + except cloud_rest_helpers_nodeps.CloudRestError as e: + return {"error": str(e)} + + filter_config = floor_setting.get("filterConfig") or {} + integrated_services = floor_setting.get("integratedServices") or [] + result: dict[str, Any] = { + "api_enabled": True, + "floor_settings_configured": bool(filter_config), + "vertex_ai_integration": "VERTEX_AI" in integrated_services, + } + + # Use the `-` location wildcard to list templates across all regions. + templates_url = f"{_MODEL_ARMOR_API}/{project_id}/locations/-/templates" + try: + with authorized_session.request( + method="GET", + url=templates_url, + timeout=_MODEL_ARMOR_TEMPLATES_TIMEOUT_SECONDS, + ) as response: + response.raise_for_status() + templates_data = response.json() + except cloud_rest_helpers_nodeps.CloudRestError as e: + result["templates"] = {"error": str(e)} + return result + + templates = templates_data.get("templates") or [] + result["templates"] = {"count": len(templates)} + return result + + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Evaluates the security posture of a subset of GCP project level" + " settings." + ) + ) + parser.add_argument( + "--project_id", type=str, required=True, help="The GCP project ID." + ) + args = parser.parse_args() + + try: + authorized_session = cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT, project_id=args.project_id + ) + except cloud_rest_helpers_nodeps.CredentialsError as e: + print(f"Failed to get authorized session using credentials: {e}") + return + + print( + json.dumps( + { + "project_id": args.project_id, + "details": { + "org_policy": check_secure_org_policies_enforced( + project_id=args.project_id, + authorized_session=authorized_session, + ), + "audit_logs": check_project_data_access_audit_logs_enabled( + project_id=args.project_id, + authorized_session=authorized_session, + ), + "vpc_sc_perimeter": check_vpc_sc_perimeter_enabled( + project_id=args.project_id, + authorized_session=authorized_session, + ), + "model_armor": check_model_armor_status( + project_id=args.project_id, + authorized_session=authorized_session, + ), + }, + }, + indent=2, + ) + ) + + +if __name__ == "__main__": + main() diff --git a/skills/security-assessment/scripts/fetch_bucket_telemetry.py b/skills/security-assessment/scripts/fetch_bucket_telemetry.py new file mode 100644 index 0000000..84f889b --- /dev/null +++ b/skills/security-assessment/scripts/fetch_bucket_telemetry.py @@ -0,0 +1,208 @@ +"""Script to fetch telemetry from a Storage Insights bucket.""" + +import argparse +from collections.abc import Mapping, Sequence +import json +import textwrap +from typing import Any + +# pylint: disable=g-import-not-at-top +try: + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation +except ImportError: + import cloud_rest_helpers_nodeps # type: ignore + import validation # type: ignore +# pylint: enable=g-import-not-at-top + +_SKILL = "security-assessment" +_SCRIPT = "fetch-bucket-telemetry" + + +# TODO: Update scoring logic to be more robust. +def _calculate_risk_score(telemetry: Sequence[Mapping[str, Any]]) -> str: + """Calculates the risk score for a list of telemetry data. + + Args: + telemetry: List of telemetry data to calculate the risk score. + + Returns: + The risk score as a string in the format "X/100". + """ + risky_missing_fields = [ + "ubla_enabled", + "soft_delete_retention_seconds", + "enforced_encryption_types", + "versioning", + ] + + bucket_risk_score_total = 0 + for bucket in telemetry: + for risky_missing_field in risky_missing_fields: + if not bucket[risky_missing_field]: + bucket_risk_score_total += 1 + + bucket_risk_score_average = int( + ( + bucket_risk_score_total + / (len(risky_missing_fields) * (len(telemetry) or 1)) + ) + * 100 + ) + return f"{bucket_risk_score_average}/100" + + +def fetch_bucket_telemetry( + *, + project_id: str, + dataset_name: str, + bucket_names: Sequence[str] | None = None, +) -> tuple[str, Sequence[Mapping[str, Any]]]: + """Fetches bucket telemetry signals from BigQuery. + + Queries the latest snapshot from the Storage Insights bucket_attributes + table. If bucket_names is provided, only those buckets are returned; + otherwise all buckets in the dataset are returned. + + Args: + project_id: The GCP project ID. + dataset_name: The linked dataset name. + bucket_names: Optional list of bucket names to filter on. + + Returns: + A tuple containing the overall risk score and the telemetry data. + + Raises: + cloud_rest_helpers_nodeps.CloudRestError: If the REST query fails. + RuntimeError: If the query job does not complete in time. + """ + validation.validate_inputs(project_id, dataset_name, bucket_names) + + if not bucket_names: + bucket_filter = "" + query_parameters = [] + else: + bucket_filter = "AND name IN UNNEST(@bucket_names)" + query_parameters = [{ + "name": "bucket_names", + "parameterType": { + "type": "ARRAY", + "arrayType": {"type": "STRING"}, + }, + "parameterValue": { + "arrayValues": [ + {"value": bucket_name} for bucket_name in bucket_names + ] + }, + }] + + # Sometimes regional jobs are ingested with the new snapshotTime before others + # are finished. This means it's possible to have two snapshots that are both + # latest for different buckets. We partition to find this latest snapshot per + # bucket. + query = textwrap.dedent(f""" + SELECT + name AS bucket_name, + versioning, + iamConfiguration.uniformBucketLevelAccess.enabled AS ubla_enabled, + softDeletePolicy.retentionDurationSeconds + AS soft_delete_retention_seconds, + enforcedEncryptionAllowedTypes, + resourceTags + FROM + ( + SELECT + *, + ROW_NUMBER() OVER (PARTITION BY name ORDER BY snapshotTime DESC) as rn + FROM + `{project_id}.{dataset_name}.bucket_attributes_view` + WHERE + snapshotTime >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 2 DAY) + {bucket_filter} + ) + WHERE + rn = 1; + """).strip() + + payload = { + "query": query, + "useLegacySql": False, + } + if query_parameters: + payload["parameterMode"] = "NAMED" + payload["queryParameters"] = query_parameters + + with cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT, project_id=project_id + ) as session: + schema_fields, rows = cloud_rest_helpers_nodeps.execute_bigquery_query( + project_id=project_id, + payload=payload, + session=session, + skill=_SKILL, + script=_SCRIPT, + ) + + telemetry = [] + for r in rows: + r_f = r.get("f", []) + row_dict = { + f["name"]: cloud_rest_helpers_nodeps.parse_bq_value( + r_f[i].get("v") if i < len(r_f) else None, f + ) + for i, f in enumerate(schema_fields) + } + telemetry.append({ + "bucket": row_dict.get("bucket_name"), + "ubla_enabled": bool(row_dict.get("ubla_enabled")), + "soft_delete_retention_seconds": row_dict.get( + "soft_delete_retention_seconds" + ), + "enforced_encryption_types": list( + row_dict.get("enforcedEncryptionAllowedTypes") or [] + ), + "versioning": row_dict.get("versioning"), + "tags": row_dict.get("resourceTags") or [], + }) + + return _calculate_risk_score(telemetry), telemetry + + +def main() -> None: + parser = argparse.ArgumentParser( + description="Script to fetch telemetry from a Storage Insights bucket." + ) + parser.add_argument( + "--project_id", type=str, required=True, help="The GCP project ID." + ) + parser.add_argument( + "--dataset_name", type=str, required=True, help="The linked dataset name." + ) + parser.add_argument( + "--bucket_names", + type=str, + help=( + "Comma-separated list of bucket names to query. If not provided, all" + " buckets in the dataset are analyzed." + ), + ) + args = parser.parse_args() + + bucket_names = ( + [b.strip() for b in args.bucket_names.split(",")] + if args.bucket_names + else None + ) + + risk_score, telemetry = fetch_bucket_telemetry( + project_id=args.project_id, + dataset_name=args.dataset_name, + bucket_names=bucket_names, + ) + print( + json.dumps({"risk_score": risk_score, "telemetry": telemetry}, indent=2) + ) + + +if __name__ == "__main__": + main() diff --git a/skills/security-assessment/scripts/fetch_object_telemetry.py b/skills/security-assessment/scripts/fetch_object_telemetry.py new file mode 100644 index 0000000..084d97b --- /dev/null +++ b/skills/security-assessment/scripts/fetch_object_telemetry.py @@ -0,0 +1,161 @@ +"""Script to fetch aggregated object-level telemetry for buckets in a Storage Insights dataset.""" + +import argparse +from collections.abc import Mapping, Sequence +import json +import textwrap +from typing import Any + +# pylint: disable=g-import-not-at-top +try: + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation +except ImportError: + import cloud_rest_helpers_nodeps # type: ignore + import validation # type: ignore +# pylint: enable=g-import-not-at-top + +_SKILL = "security-assessment" +_SCRIPT = "fetch-object-telemetry" + + +def fetch_object_telemetry( + project_id: str, + dataset_name: str, + bucket_names: Sequence[str] | None = None, +) -> Sequence[Mapping[str, Any]]: + """Fetches object telemetry signals from BigQuery. + + Queries the latest snapshot from the Storage Insights + object_attributes_latest_snapshot_view. + + Args: + project_id: The GCP project ID. + dataset_name: The linked dataset name. + bucket_names: Optional list of bucket names to filter on. + + Returns: + A list of dictionaries containing telemetry signals per object. + + Raises: + cloud_rest_helpers_nodeps.CloudRestError: If the REST query fails. + RuntimeError: If the query job does not complete in time. + """ + validation.validate_inputs(project_id, dataset_name, bucket_names) + + if not bucket_names: + bucket_filter = "" + query_parameters = [] + else: + bucket_filter = "WHERE bucket IN UNNEST(@bucket_names)" + query_parameters = [{ + "name": "bucket_names", + "parameterType": { + "type": "ARRAY", + "arrayType": {"type": "STRING"}, + }, + "parameterValue": { + "arrayValues": [ + {"value": bucket_name} for bucket_name in bucket_names + ] + }, + }] + + query = textwrap.dedent(f""" + SELECT + bucket, + COUNT(*) AS total_objects, + COUNTIF(securityInsights.publicAccessInsight.readPublicAccess = 'PUBLIC') AS public_read_objects, + COUNTIF(securityInsights.publicAccessInsight.writePublicAccess = 'PUBLIC') AS public_write_objects, + COUNTIF(retentionExpirationTime IS NOT NULL) AS objects_with_retention, + COUNTIF(encryptionType = 'CMEK') AS cmek_objects, + COUNTIF(encryptionType = 'CSEK') AS csek_objects, + COUNTIF(encryptionType = 'GMEK') AS gmek_objects + FROM + `{project_id}.{dataset_name}.object_attributes_latest_snapshot_view` + {bucket_filter} + GROUP BY + bucket + """).strip() + + payload = { + "query": query, + "useLegacySql": False, + } + if query_parameters: + payload["parameterMode"] = "NAMED" + payload["queryParameters"] = query_parameters + + with cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT, project_id=project_id + ) as session: + schema_fields, rows = cloud_rest_helpers_nodeps.execute_bigquery_query( + project_id=project_id, + payload=payload, + session=session, + skill=_SKILL, + script=_SCRIPT, + ) + + telemetry = [] + for r in rows: + r_f = r.get("f", []) + row_dict = { + f["name"]: cloud_rest_helpers_nodeps.parse_bq_value( + r_f[i].get("v") if i < len(r_f) else None, f + ) + for i, f in enumerate(schema_fields) + } + telemetry.append({ + "bucket": row_dict.get("bucket"), + "total_objects": row_dict.get("total_objects"), + "public_read_objects": row_dict.get("public_read_objects"), + "public_write_objects": row_dict.get("public_write_objects"), + "objects_with_retention": row_dict.get("objects_with_retention"), + "cmek_objects": row_dict.get("cmek_objects"), + "csek_objects": row_dict.get("csek_objects"), + "gmek_objects": row_dict.get("gmek_objects"), + }) + + return telemetry + + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Script to fetch aggregated object-level telemetry for buckets in a" + " Storage Insights dataset." + ) + ) + parser.add_argument( + "--project_id", type=str, required=True, help="The GCP project ID." + ) + parser.add_argument( + "--dataset_name", type=str, required=True, help="The linked dataset name." + ) + parser.add_argument( + "--bucket_names", + type=str, + help=( + "Comma-separated list of bucket names to query. If not provided, all" + " buckets in the dataset are analyzed." + ), + ) + args = parser.parse_args() + + bucket_names = ( + [b.strip() for b in args.bucket_names.split(",")] + if args.bucket_names + else None + ) + + telemetry = fetch_object_telemetry( + project_id=args.project_id, + dataset_name=args.dataset_name, + bucket_names=bucket_names, + ) + print(json.dumps(telemetry, indent=2)) + + +if __name__ == "__main__": + main() diff --git a/skills/security-assessment/scripts/list_datasets.py b/skills/security-assessment/scripts/list_datasets.py new file mode 100644 index 0000000..3f835d2 --- /dev/null +++ b/skills/security-assessment/scripts/list_datasets.py @@ -0,0 +1,101 @@ +"""Script to list Storage Insights datasets. + +This script lists all DatasetConfigs within a specified GCP project and location +using the Storage Insights API. +""" + +from __future__ import annotations + +import argparse +from collections.abc import Mapping, MutableSequence, Sequence +import json +from typing import Any + +# pylint: disable=g-import-not-at-top +try: + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps +except ImportError: + import cloud_rest_helpers_nodeps # type: ignore +# pylint: enable=g-import-not-at-top + +_TIMEOUT_SECONDS = 10 +_SKILL = "security-assessment" +_SCRIPT = "list-datasets" + + +def list_datasets( + *, + project_id: str, + location: str, + session: Any, +) -> Sequence[Mapping[str, str]]: + """Lists Storage Insights linked datasets using public HTTP REST endpoint. + + Args: + project_id: The GCP project ID. + location: The dataset location. + session: The authorized session to make requests. + + Returns: + A list of dictionaries mapping {"{LOCATION_NAME}": "{DATASET_ID}", + "description": + "{DESCRIPTION}"} where each field is extracted from the SI dataset. + + Raises: + RuntimeError: If the request fails. + """ + url = f"https://storageinsights.googleapis.com/v1/projects/{project_id}/locations/{location}/datasetConfigs" + try: + response = session.get(url, timeout=_TIMEOUT_SECONDS) + response.raise_for_status() + except cloud_rest_helpers_nodeps.CloudRestError as e: + raise RuntimeError( + f"Failed to fetch dataset configs for '{location}':'{project_id}' with" + f" error: {e!r}" + ) from e + else: + data = response.json() + configs = data.get("datasetConfigs") or [] + result: MutableSequence[Mapping[str, str]] = [] + for config in configs: + name = config.get("name") or "" + link = config.get("link") or {} + description = config.get("description") or "" + if link.get("linked") and link.get("dataset") and "/locations/" in name: + _, location_path = name.split("/locations/") + loc, *_ = location_path.split("/") + result.append({loc: str(link.get("dataset")), "description": description}) + return result + + +def main() -> None: + parser = argparse.ArgumentParser( + description="List Storage Insights datasets for a project and location." + ) + parser.add_argument( + "--project_id", type=str, required=True, help="The GCP project ID." + ) + parser.add_argument( + "--location", type=str, default="-", help="The dataset location." + ) + args = parser.parse_args() + + try: + with cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT, project_id=args.project_id + ) as session: + datasets = list_datasets( + project_id=args.project_id, + location=args.location, + session=session, + ) + print(json.dumps(datasets, indent=2)) + except ( + cloud_rest_helpers_nodeps.CredentialsError, + RuntimeError, + ) as e: + print(json.dumps({"error": repr(e)})) + + +if __name__ == "__main__": + main() diff --git a/skills/security-assessment/scripts/preflight_permissions.py b/skills/security-assessment/scripts/preflight_permissions.py new file mode 100644 index 0000000..2c46987 --- /dev/null +++ b/skills/security-assessment/scripts/preflight_permissions.py @@ -0,0 +1,369 @@ +"""Preflight permission check for the security assessment skill. + +Probes prerequisites the assessment depends on and emits a structured report. +Required checks gate the assessment; recommended checks downgrade it to a +partial assessment with surfaced gaps. +""" + +from __future__ import annotations + +import argparse +from collections.abc import Mapping +import json +import sys +from typing import Any + +# pylint: disable=g-import-not-at-top +try: + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps + from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation +except ImportError: + import cloud_rest_helpers_nodeps # type: ignore + import validation # type: ignore +# pylint: enable=g-import-not-at-top + +_BIGQUERY_API = "https://bigquery.googleapis.com/bigquery/v2/projects" +_STORAGE_INSIGHTS_API = "https://storageinsights.googleapis.com/v1/projects" +_SKILL = "security-assessment" +_SCRIPT = "preflight-permissions" + + +def _bq_failure(*, impact: str, fix: str, error: str) -> Mapping[str, Any]: + return { + "check": "bigquery_dataset_access", + "status": "missing", + "impact": impact, + "fix": fix, + "error": error, + } + + +def _si_failure(*, impact: str, fix: str, error: str) -> Mapping[str, Any]: + return { + "check": "storage_insights_enabled", + "status": "missing", + "impact": impact, + "fix": fix, + "error": error, + } + + +def _check_storage_insights_enabled( + *, + project_id: str, + session: Any, +) -> Mapping[str, Any]: + """Probes whether the Storage Insights API is enabled on the project. + + Storage Insights unlocks the bucket- and object-level telemetry that the + full assessment depends on. It is a *recommended* prerequisite, not a hard + gate: when it is unavailable the assessment degrades to a project-level + posture report (IAM, VPC-SC, audit logs, org policies, Model Armor) and + recommends enabling SI to unlock the full bucket- and object-level + assessment. The downstream bigquery_dataset_access check covers the case + where SI is enabled but no linked dataset exists yet (or the wrong dataset + name was supplied). + + Args: + project_id: GCP project ID to probe. + session: Authorized session for REST requests. + + Returns: + Mapping with keys `check`, `status`, and on failure `impact`, `fix`, + `error`. + """ + try: + validation.validate_project_id(project_id) + except (TypeError, ValueError) as e: + return _si_failure( + impact=f"Input validation failed: {e}", + fix="Provide a valid GCP project ID.", + error=str(e), + ) + + url = f"{_STORAGE_INSIGHTS_API}/{project_id}/locations/-/datasetConfigs" + try: + response = session.get(url, timeout=10) + response.raise_for_status() + except cloud_rest_helpers_nodeps.HttpError as e: + error = str(e) + status_code = e.status_code + body = (e.body or "").lower() + + if status_code == 403 and "has not been used" in body: + return _si_failure( + impact=( + "Storage Insights is not enabled on the project, so bucket- and" + " object-level telemetry is unavailable. The assessment will" + " cover project-level posture only." + ), + fix=( + "To unlock the full bucket- and object-level assessment, enable" + " Storage Insights: `gcloud services enable" + f" storageinsights.googleapis.com --project {project_id}`." + ), + error=error, + ) + elif status_code == 403: + return _si_failure( + impact=( + "Caller lacks permission to list Storage Insights dataset" + " configs, so bucket- and object-level telemetry is" + " unavailable. The assessment will cover project-level posture" + " only." + ), + fix=( + "To unlock the full assessment, grant" + " roles/storageinsights.viewer (or a role containing" + " storageinsights.datasetConfigs.list) on the project." + ), + error=error, + ) + + return _si_failure( + impact=f"Unexpected HTTP {status_code} from Storage Insights API.", + fix=( + "Retry the preflight check. If the failure persists, see" + " https://status.cloud.google.com for ongoing GCP incidents." + ), + error=error, + ) + except cloud_rest_helpers_nodeps.CloudRestError as e: + return _si_failure( + impact="Network or transport failure contacting Storage Insights.", + fix="Check network connectivity and GCP reachability.", + error=str(e), + ) + + return {"check": "storage_insights_enabled", "status": "ok"} + + +def _check_adc() -> Mapping[str, Any]: + try: + # Constructing the session fetches a token via gcloud, which validates + # that the user has working credentials. + cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT + ) + return {"check": "adc", "status": "ok"} + except cloud_rest_helpers_nodeps.CredentialsError as e: + return { + "check": "adc", + "status": "missing", + "impact": "Cannot make any authenticated GCP API calls.", + "fix": "Run: gcloud auth application-default login", + "error": str(e), + } + + +def _check_bigquery_dataset_access( + *, + project_id: str, + dataset_name: str, + session: Any, +) -> Mapping[str, Any]: + """Probes BigQuery dataset accessibility via a zero-cost dry-run query. + + Validates inputs, then issues a `dryRun: true` query against the Storage + Insights `bucket_attributes_view`. Distinguishes the common failure modes + (API not enabled, dataset/view not found, missing dataset permissions, + network/transport error) and returns a structured result with a + user-actionable fix per case. + + Args: + project_id: GCP project ID hosting the Storage Insights dataset. + dataset_name: BigQuery dataset name to probe. + session: Authorized session for REST requests. + + Returns: + Mapping with keys `check`, `status`, and on failure `impact`, `fix`, + `error`. + """ + try: + validation.validate_inputs(project_id, dataset_name, None) + except (TypeError, ValueError) as e: + return { + "check": "bigquery_dataset_access", + "status": "missing", + "impact": f"Input validation failed: {e}", + "fix": ( + "project_id must not contain backticks; dataset_name must match" + " ^[a-zA-Z0-9_]{1,1024}$" + ), + "error": str(e), + } + + try: + with session.request( + method="POST", + url=f"{_BIGQUERY_API}/{project_id}/queries", + json={ + "query": ( + "SELECT 1 FROM" + f" `{project_id}.{dataset_name}.bucket_attributes_view` LIMIT 0" + ), + "useLegacySql": False, + "dryRun": True, + "labels": cloud_rest_helpers_nodeps.bigquery_labels( + _SKILL, _SCRIPT + ), + }, + timeout=10, + ) as response: + response.raise_for_status() + return {"check": "bigquery_dataset_access", "status": "ok"} + except cloud_rest_helpers_nodeps.HttpError as e: + error = str(e) + status_code = e.status_code + body = e.body or "" + + if status_code == 403 and "has not been used" in body.lower(): + return _bq_failure( + impact="BigQuery API is not enabled on the project.", + fix=( + "Run: gcloud services enable bigquery.googleapis.com --project" + f" {project_id}" + ), + error=error, + ) + elif status_code == 404: + return _bq_failure( + impact="Dataset or view not found.", + fix=( + "Run scripts/list_datasets.py to see available datasets. If" + " none exist, create a Storage Insights dataset config in the" + " Cloud Console: Cloud Storage > Insights > Datasets > Create." + ), + error=error, + ) + elif status_code == 403: + return _bq_failure( + impact="Caller lacks BigQuery permissions to query the dataset.", + fix=( + "Grant roles/bigquery.dataViewer on the dataset and" + " roles/bigquery.jobUser on the project." + ), + error=error, + ) + + return _bq_failure( + impact=f"Unexpected HTTP {status_code} from BigQuery.", + fix=( + "Retry the preflight check. If the failure persists, see" + " https://status.cloud.google.com for ongoing GCP incidents." + ), + error=error, + ) + except cloud_rest_helpers_nodeps.CloudRestError as e: + return _bq_failure( + impact="Network or transport failure contacting BigQuery.", + fix="Check network connectivity and GCP reachability.", + error=str(e), + ) + + +def run_preflight(*, project_id: str, dataset_name: str) -> Mapping[str, Any]: + """Runs all preflight checks and aggregates them into a readiness report. + + Only the ADC check is *required*: without working credentials no GCP API + call can be made and the assessment cannot run at all. The Storage Insights + enablement and BigQuery dataset access checks are *recommended* — when they + fail the assessment does not stop; it degrades to a project-level posture + report (IAM, VPC-SC, audit logs, org policies, Model Armor) and recommends + enabling Storage Insights / creating a linked dataset to unlock the full + bucket- and object-level assessment. + + The returned `analysis_scope` tells the skill which mode to run: + * "full" — SI enabled and the linked dataset is queryable. + * "project_only" — ADC works but SI and/or the dataset are unavailable. + * "none" — ADC failed; the assessment cannot run. + + Args: + project_id: GCP project ID for downstream checks. + dataset_name: Storage Insights linked dataset name. + + Returns: + Mapping with `ready_to_proceed` (bool), `analysis_scope` (str), `required` + (list of check results), `recommended` (list of check results), and a + human-readable `summary`. + """ + adc = _check_adc() + if adc["status"] != "ok": + return { + "ready_to_proceed": False, + "analysis_scope": "none", + "required": [adc], + "recommended": [], + "summary": ( + "ADC check failed — cannot authenticate to GCP, so the assessment" + " cannot run. Address the required check below and re-run" + " preflight." + ), + } + + with cloud_rest_helpers_nodeps.get_authorized_session( + skill=_SKILL, script=_SCRIPT, project_id=project_id + ) as session: + si_check = _check_storage_insights_enabled( + project_id=project_id, session=session + ) + if si_check["status"] == "ok": + bq_check = _check_bigquery_dataset_access( + project_id=project_id, + dataset_name=dataset_name, + session=session, + ) + else: + bq_check = { + "check": "bigquery_dataset_access", + "status": "skipped", + "impact": ( + "Skipped because Storage Insights is not enabled on the project." + ), + } + + recommended = [si_check, bq_check] + full = si_check["status"] == "ok" and bq_check["status"] == "ok" + if full: + analysis_scope = "full" + summary = "All checks passed — running the full assessment." + else: + analysis_scope = "project_only" + summary = ( + "Storage Insights telemetry is unavailable — running a project-level" + " assessment only. See the recommended check(s) below to unlock the" + " full bucket- and object-level assessment." + ) + + return { + "ready_to_proceed": True, + "analysis_scope": analysis_scope, + "required": [adc], + "recommended": recommended, + "summary": summary, + } + + +def main() -> None: + parser = argparse.ArgumentParser( + description=( + "Preflight permission check for the security assessment skill." + ) + ) + parser.add_argument("--project_id", required=True, help="GCP project ID") + parser.add_argument( + "--dataset_name", + required=True, + help="Storage Insights linked BigQuery dataset name", + ) + args = parser.parse_args() + + result = run_preflight( + project_id=args.project_id, dataset_name=args.dataset_name + ) + print(json.dumps(result, indent=2)) + sys.exit(0) + + +if __name__ == "__main__": + main() diff --git a/skills/security-assessment/scripts/validation.py b/skills/security-assessment/scripts/validation.py new file mode 100644 index 0000000..107afa7 --- /dev/null +++ b/skills/security-assessment/scripts/validation.py @@ -0,0 +1,71 @@ +"""Validation logic for telemetry scripts.""" + +import re + + +def validate_project_id(project_id: str) -> None: + """Validates a GCP project ID for safe SQL/URL interpolation. + + Args: + project_id: The GCP project ID. + + Raises: + TypeError: If project_id is not a string. + ValueError: If project_id contains a backtick. + """ + if not isinstance(project_id, str): + raise TypeError(f"project_id must be a string, got {type(project_id)}") + + # Legacy project IDs can violate modern format rules, so we only block + # backticks which would break out of BigQuery's backtick-delimited + # identifiers. + if "`" in project_id: + raise ValueError( + "project_id contains backtick, which is not safe for SQL" + f" interpolation: {project_id}" + ) + + +def validate_inputs( + project_id: str, + dataset_name: str, + bucket_names: list[str] | None = None, +) -> None: + """Validates project_id, dataset_name, and bucket_names. + + This function helps ensure that the calling agent passes correctly typed + arguments and guards against SQL injection in interpolated strings + (project_id and dataset_name). + + Args: + project_id: The GCP project ID. + dataset_name: The linked dataset name. + bucket_names: Optional list of bucket names to filter on. + + Raises: + TypeError: If any of the inputs have invalid types. + ValueError: If any of the inputs are invalidly formatted. + """ + validate_project_id(project_id) + if not isinstance(dataset_name, str): + raise TypeError(f"dataset_name must be a string, got {type(dataset_name)}") + + # Matches 1-1024 char BigQuery dataset names: alphanumeric and underscores. + dataset_name_regex = re.compile(r"^[a-zA-Z0-9_]{1,1024}$") + if not dataset_name_regex.match(dataset_name): + raise ValueError(f"Invalid dataset_name format: {dataset_name}") + + if bucket_names is not None: + if not isinstance(bucket_names, list): + raise TypeError(f"bucket_names must be a list, got {type(bucket_names)}") + + # Matches 3-63 char GCS bucket names: alphanumeric start/end, + # alphanumeric/dot/dash/underscore middle. + bucket_name_regex = re.compile(r"^[a-z0-9][a-z0-9._-]{1,61}[a-z0-9]$") + for bucket_name in bucket_names: + if not isinstance(bucket_name, str): + raise TypeError( + f"bucket name must be a string, got {type(bucket_name)}" + ) + if not bucket_name_regex.match(bucket_name): + raise ValueError(f"Invalid bucket_name format: {bucket_name}") From 68ff71c352d922c0dd9e9c46c4d9cb1f48658850 Mon Sep 17 00:00:00 2001 From: Vikram Ruppa-Kasani Date: Mon, 8 Jun 2026 13:08:54 -0700 Subject: [PATCH 2/2] Remove google3 import paths and internal lint pragmas from scripts --- .../scripts/evaluate_project_security_posture.py | 7 +------ .../scripts/fetch_bucket_telemetry.py | 10 ++-------- .../scripts/fetch_object_telemetry.py | 10 ++-------- skills/security-assessment/scripts/list_datasets.py | 7 +------ .../scripts/preflight_permissions.py | 10 ++-------- 5 files changed, 8 insertions(+), 36 deletions(-) diff --git a/skills/security-assessment/scripts/evaluate_project_security_posture.py b/skills/security-assessment/scripts/evaluate_project_security_posture.py index a673aeb..b2d9625 100644 --- a/skills/security-assessment/scripts/evaluate_project_security_posture.py +++ b/skills/security-assessment/scripts/evaluate_project_security_posture.py @@ -8,12 +8,7 @@ import json from typing import Any, TypedDict -# pylint: disable=g-import-not-at-top -try: - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps -except ImportError: - import cloud_rest_helpers_nodeps # type: ignore -# pylint: enable=g-import-not-at-top +import cloud_rest_helpers_nodeps _TIMEOUT_SECONDS = 5 _STORAGE_API = "storage.googleapis.com" diff --git a/skills/security-assessment/scripts/fetch_bucket_telemetry.py b/skills/security-assessment/scripts/fetch_bucket_telemetry.py index 84f889b..4c8576b 100644 --- a/skills/security-assessment/scripts/fetch_bucket_telemetry.py +++ b/skills/security-assessment/scripts/fetch_bucket_telemetry.py @@ -6,14 +6,8 @@ import textwrap from typing import Any -# pylint: disable=g-import-not-at-top -try: - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation -except ImportError: - import cloud_rest_helpers_nodeps # type: ignore - import validation # type: ignore -# pylint: enable=g-import-not-at-top +import cloud_rest_helpers_nodeps +import validation _SKILL = "security-assessment" _SCRIPT = "fetch-bucket-telemetry" diff --git a/skills/security-assessment/scripts/fetch_object_telemetry.py b/skills/security-assessment/scripts/fetch_object_telemetry.py index 084d97b..4734fcb 100644 --- a/skills/security-assessment/scripts/fetch_object_telemetry.py +++ b/skills/security-assessment/scripts/fetch_object_telemetry.py @@ -6,14 +6,8 @@ import textwrap from typing import Any -# pylint: disable=g-import-not-at-top -try: - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation -except ImportError: - import cloud_rest_helpers_nodeps # type: ignore - import validation # type: ignore -# pylint: enable=g-import-not-at-top +import cloud_rest_helpers_nodeps +import validation _SKILL = "security-assessment" _SCRIPT = "fetch-object-telemetry" diff --git a/skills/security-assessment/scripts/list_datasets.py b/skills/security-assessment/scripts/list_datasets.py index 3f835d2..73864aa 100644 --- a/skills/security-assessment/scripts/list_datasets.py +++ b/skills/security-assessment/scripts/list_datasets.py @@ -11,12 +11,7 @@ import json from typing import Any -# pylint: disable=g-import-not-at-top -try: - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps -except ImportError: - import cloud_rest_helpers_nodeps # type: ignore -# pylint: enable=g-import-not-at-top +import cloud_rest_helpers_nodeps _TIMEOUT_SECONDS = 10 _SKILL = "security-assessment" diff --git a/skills/security-assessment/scripts/preflight_permissions.py b/skills/security-assessment/scripts/preflight_permissions.py index 2c46987..5ef0f47 100644 --- a/skills/security-assessment/scripts/preflight_permissions.py +++ b/skills/security-assessment/scripts/preflight_permissions.py @@ -13,14 +13,8 @@ import sys from typing import Any -# pylint: disable=g-import-not-at-top -try: - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import cloud_rest_helpers_nodeps - from google3.blobstore2.storage_management.gemini.si_agent.skills.security_assessment.scripts import validation -except ImportError: - import cloud_rest_helpers_nodeps # type: ignore - import validation # type: ignore -# pylint: enable=g-import-not-at-top +import cloud_rest_helpers_nodeps +import validation _BIGQUERY_API = "https://bigquery.googleapis.com/bigquery/v2/projects" _STORAGE_INSIGHTS_API = "https://storageinsights.googleapis.com/v1/projects"