From 64a2d3fc57a20d7a469671c407ac27bf8d07b5dc Mon Sep 17 00:00:00 2001 From: "google-labs-jules[bot]" <161369871+google-labs-jules[bot]@users.noreply.github.com> Date: Sun, 1 Feb 2026 10:57:39 +0000 Subject: [PATCH] feat: harden input validation for IDs and folder names - Add validate_resource_id for strict ID checking - Switch is_valid_folder_name to whitelist - Integrate validation in list, create, delete workflows - Add tests/test_id_validation.py Co-authored-by: abhimehro <84992105+abhimehro@users.noreply.github.com> --- main.py | 116 ++++++++++++++++++++++++------------ tests/test_id_validation.py | 44 ++++++++++++++ 2 files changed, 123 insertions(+), 37 deletions(-) create mode 100644 tests/test_id_validation.py diff --git a/main.py b/main.py index 86792da4..a6c3d7a4 100644 --- a/main.py +++ b/main.py @@ -397,25 +397,34 @@ def extract_profile_id(text: str) -> str: return text -def is_valid_profile_id_format(profile_id: str) -> bool: - if not re.match(r"^[a-zA-Z0-9_-]+$", profile_id): - return False - if len(profile_id) > 64: +def validate_resource_id( + resource_id: str, type_name: str = "ID", log_errors: bool = True +) -> bool: + """ + Validates a resource ID (Profile ID, Folder ID/PK). + Allowed: Alphanumeric, hyphen, underscore. + Max Length: 64 + """ + if not resource_id: return False - return True + if len(resource_id) > 64: + if log_errors: + log.error(f"Invalid {type_name} length (max 64 chars)") + return False -def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: - if not is_valid_profile_id_format(profile_id): + if not re.match(r"^[a-zA-Z0-9_-]+$", resource_id): if log_errors: - if not re.match(r"^[a-zA-Z0-9_-]+$", profile_id): - log.error("Invalid profile ID format (contains unsafe characters)") - elif len(profile_id) > 64: - log.error("Invalid profile ID length (max 64 chars)") + log.error(f"Invalid {type_name} format (contains unsafe characters)") return False + return True +def validate_profile_id(profile_id: str, log_errors: bool = True) -> bool: + return validate_resource_id(profile_id, "profile ID", log_errors) + + def is_valid_rule(rule: str) -> bool: """ Validates that a rule is safe to use. @@ -436,15 +445,17 @@ def is_valid_rule(rule: str) -> bool: def is_valid_folder_name(name: str) -> bool: """ Validates folder name to prevent XSS and ensure printability. - Allowed: Anything printable except < > " ' ` + Allowed: Alphanumeric, space, hyphen, dot, underscore, parens, brackets, braces. + Max Length: 64 """ if not name or not name.strip() or not name.isprintable(): return False - # Block XSS and HTML injection characters - # Allow: ( ) [ ] { } for folder names (e.g. "Work (Private)") - dangerous_chars = set("<>\"'`") - if any(c in dangerous_chars for c in name): + if len(name) > 64: + return False + + # Whitelist: Alphanumeric, space, hyphen, dot, underscore, parens, brackets, braces + if not re.match(r"^[a-zA-Z0-9 \-_.()\[\]{}]+$", name): return False return True @@ -627,11 +638,18 @@ def list_existing_folders(client: httpx.Client, profile_id: str) -> Dict[str, st try: data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() folders = data.get("body", {}).get("groups", []) - return { - f["group"].strip(): f["PK"] - for f in folders - if f.get("group") and f.get("PK") - } + result = {} + for f in folders: + name = f.get("group") + pk = f.get("PK") + if name and pk: + if validate_resource_id(pk, "Folder PK", log_errors=False): + result[name.strip()] = pk + else: + log.warning( + f"Skipping folder '{sanitize_for_log(name)}' with unsafe PK: {sanitize_for_log(pk)}" + ) + return result except (httpx.HTTPError, KeyError) as e: log.error(f"Failed to list existing folders: {sanitize_for_log(e)}") return {} @@ -759,6 +777,10 @@ def _validate_and_fetch(url: str): def delete_folder( client: httpx.Client, profile_id: str, name: str, folder_id: str ) -> bool: + if not validate_resource_id(folder_id, "Folder ID to delete"): + log.error(f"Aborting deletion of {sanitize_for_log(name)}: Unsafe Folder ID") + return False + try: _api_delete(client, f"{API_BASE}/{profile_id}/groups/{folder_id}") log.info("Deleted folder %s (ID %s)", sanitize_for_log(name), folder_id) @@ -793,21 +815,34 @@ def create_folder( # Check if it returned a single group object if isinstance(body, dict) and "group" in body and "PK" in body["group"]: pk = body["group"]["PK"] - log.info( - "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), pk - ) - return str(pk) + if validate_resource_id(str(pk), "New Folder PK"): + log.info( + "Created folder %s (ID %s) [Direct]", sanitize_for_log(name), pk + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None # Check if it returned a list containing our group if isinstance(body, dict) and "groups" in body: for grp in body["groups"]: if grp.get("group") == name: - log.info( - "Created folder %s (ID %s) [Direct]", - sanitize_for_log(name), - grp["PK"], - ) - return str(grp["PK"]) + pk = grp["PK"] + if validate_resource_id(str(pk), "New Folder PK"): + log.info( + "Created folder %s (ID %s) [Direct]", + sanitize_for_log(name), + pk, + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None except Exception as e: log.debug(f"Could not extract ID from POST response: {e}") @@ -819,12 +854,19 @@ def create_folder( for grp in groups: if grp["group"].strip() == name.strip(): - log.info( - "Created folder %s (ID %s) [Polled]", - sanitize_for_log(name), - grp["PK"], - ) - return str(grp["PK"]) + pk = grp["PK"] + if validate_resource_id(str(pk), "Polled Folder PK"): + log.info( + "Created folder %s (ID %s) [Polled]", + sanitize_for_log(name), + pk, + ) + return str(pk) + else: + log.error( + f"API returned unsafe PK for folder {sanitize_for_log(name)}: {sanitize_for_log(pk)}" + ) + return None except Exception as e: log.warning(f"Error fetching groups on attempt {attempt}: {e}") diff --git a/tests/test_id_validation.py b/tests/test_id_validation.py new file mode 100644 index 00000000..c1f20842 --- /dev/null +++ b/tests/test_id_validation.py @@ -0,0 +1,44 @@ +from unittest.mock import MagicMock +import main + +def test_validate_resource_id_valid(): + assert main.validate_resource_id("12345", "Test ID") is True + assert main.validate_resource_id("abc_def-123", "Test ID") is True + assert main.validate_resource_id("test_id", "Test ID") is True + +def test_validate_resource_id_invalid_format(): + assert main.validate_resource_id("invalid/id", "Test ID", log_errors=False) is False + assert main.validate_resource_id("invalid id", "Test ID", log_errors=False) is False + assert main.validate_resource_id("invalid.id", "Test ID", log_errors=False) is False # dot not allowed in PK + assert main.validate_resource_id("