-
Notifications
You must be signed in to change notification settings - Fork 1
🛡️ Sentinel: Fix broken validation and add length limits #425
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We'll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1019,6 +1019,10 @@ | |
| """Validates folder ID (PK) format to prevent path traversal.""" | ||
| if not folder_id: | ||
| return False | ||
| if len(folder_id) > 64: | ||
| if log_errors: | ||
| log.error(f"Invalid folder ID length (max 64): {len(folder_id)}") | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| return False | ||
| if folder_id in (".", "..") or not FOLDER_ID_PATTERN.match(folder_id): | ||
| if log_errors: | ||
| log.error(f"Invalid folder ID format: {sanitize_for_log(folder_id)}") | ||
|
|
@@ -1035,6 +1039,10 @@ | |
| if not rule: | ||
| return False | ||
|
|
||
| # Enforce max length to prevent DoS | ||
| if len(rule) > 255: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The maximum rule length |
||
| return False | ||
|
|
||
| # Strict whitelist to prevent injection | ||
| if not RULE_PATTERN.match(rule): | ||
| return False | ||
|
|
@@ -1056,6 +1064,10 @@ | |
| if not name or not name.strip() or not name.isprintable(): | ||
| return False | ||
|
|
||
| # Enforce max length to prevent DoS (matching profile ID limit) | ||
| if len(name) > 64: | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| return False | ||
|
|
||
| # Check for dangerous characters (pre-compiled at module level for performance) | ||
| if any(c in _DANGEROUS_FOLDER_CHARS or c in _BIDI_CONTROL_CHARS for c in name): | ||
| return False | ||
|
|
@@ -1073,54 +1085,54 @@ | |
| return True | ||
|
|
||
|
|
||
| def validate_folder_data(data: Dict[str, Any], url: str) -> bool: | ||
| """ | ||
| Validates folder JSON data structure and content. | ||
|
|
||
| Checks for required fields (name, action, rules), validates folder name | ||
| and action type, and ensures rules are valid. Logs specific validation errors. | ||
| """ | ||
| if not isinstance(data, dict): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: Root must be a JSON object." | ||
| ) | ||
| return False | ||
| if "group" not in data: | ||
| log.error(f"Invalid data from {sanitize_for_log(url)}: Missing 'group' key.") | ||
| return False | ||
| if not isinstance(data["group"], dict): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: 'group' must be an object." | ||
| ) | ||
| return False | ||
| if "group" not in data["group"]: | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: Missing 'group.group' (folder name)." | ||
| ) | ||
| return False | ||
|
|
||
| folder_name = data["group"]["group"] | ||
| if not isinstance(folder_name, str): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: Folder name must be a string." | ||
| ) | ||
| return False | ||
|
|
||
| if not is_valid_folder_name(folder_name): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: Invalid folder name (empty, unsafe characters, or non-printable)." | ||
| ) | ||
| return False | ||
|
|
||
| # Validate 'rules' if present (must be a list) | ||
| if "rules" in data and not isinstance(data["rules"], list): | ||
| log.error(f"Invalid data from {sanitize_for_log(url)}: 'rules' must be a list.") | ||
| return False | ||
|
|
||
| # Validate 'rule_groups' if present (must be a list of dicts) | ||
| if "rule_groups" in data: | ||
| if not isinstance(data["rule_groups"], list): | ||
| log.error( | ||
Check warningCode scanning / Pylintpython3 (reported by Codacy) Line too long (108/100) Warning
Line too long (108/100)
Check warningCode scanning / Pylint (reported by Codacy) Line too long (108/100) Warning
Line too long (108/100)
|
||
| f"Invalid data from {sanitize_for_log(url)}: 'rule_groups' must be a list." | ||
| ) | ||
| return False | ||
|
|
@@ -1131,19 +1143,22 @@ | |
| ) | ||
| return False | ||
| if "rules" in rg: | ||
| if not isinstance (rg["rules"], list): | ||
| log. error ( | ||
| f"Invalid data from {sanitize_for_log(url)} : rule_groups[fil].rules must be a list." | ||
| if not isinstance(rg["rules"], list): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules must be a list." | ||
| ) | ||
| return False | ||
| # Ensure each rule within the group is an object (dict), | ||
| # because later code treats each rule as a mapping (e.g., rule.get(...)). | ||
| for j, rule in enumerate (rgi"rules"1): | ||
| if not isinstance (rule, dict): | ||
| log. error ( | ||
| f"Invalid data from {sanitize_for_log(u rl)}: rule_groups[fiΕl.rules[kiΕ] must be an object." | ||
| ) | ||
| return False | ||
| # Ensure each rule within the group is an object (dict), | ||
| # because later code treats each rule as a mapping (e.g., rule.get(...)). | ||
| for j, rule in enumerate(rg["rules"]): | ||
| if not isinstance(rule, dict): | ||
| log.error( | ||
| f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules[{j}] must be an object." | ||
|
||
| ) | ||
| return False | ||
|
|
||
| return True | ||
|
|
||
|
|
||
| # Lock to protect updates to _api_stats in multi-threaded contexts. | ||
| # Without this, concurrent increments can lose updates because `+=` is not atomic. | ||
|
|
@@ -1285,216 +1300,216 @@ | |
| time.sleep(wait_time) | ||
|
|
||
|
|
||
| def _gh_get(url: str) -> Dict: | ||
| """ | ||
| Fetch blocklist data from URL with HTTP cache header support. | ||
|
|
||
| CACHING STRATEGY: | ||
| 1. Check in-memory cache first (fastest) | ||
| 2. Check disk cache and send conditional request (If-None-Match/If-Modified-Since) | ||
| 3. If 304 Not Modified: reuse cached data (cache validation) | ||
| 4. If 200 OK: download new data and update cache | ||
|
|
||
| SECURITY: Validates data structure regardless of cache source | ||
| """ | ||
| global _cache_stats, _api_stats | ||
|
|
||
| # First check: Quick check without holding lock for long | ||
| with _cache_lock: | ||
| if url in _cache: | ||
| _cache_stats["hits"] += 1 | ||
| return _cache[url] | ||
|
|
||
| # Track that we're about to make a blocklist fetch | ||
| with _cache_lock: | ||
| _api_stats["blocklist_fetches"] += 1 | ||
|
|
||
| # Check disk cache for TTL-based hit or conditional request headers | ||
| headers = {} | ||
| cached_entry = _disk_cache.get(url) | ||
| if cached_entry: | ||
| last_validated = cached_entry.get("last_validated", 0) | ||
| if time.time() - last_validated < CACHE_TTL_SECONDS: | ||
| # Within TTL: return cached data directly without any HTTP request | ||
| data = cached_entry["data"] | ||
| with _cache_lock: | ||
| _cache[url] = data | ||
| _cache_stats["hits"] += 1 | ||
| log.debug(f"Disk cache hit (within TTL) for {sanitize_for_log(url)}") | ||
| return data | ||
| # Beyond TTL: send conditional request using cached ETag/Last-Modified | ||
| # Server returns 304 if content hasn't changed | ||
| # NOTE: Cached values may be None if the server didn't send these headers. | ||
| # httpx requires header values to be str/bytes, so we only add headers | ||
| # when the cached value is truthy. | ||
| etag = cached_entry.get("etag") | ||
| if etag: | ||
| headers["If-None-Match"] = etag | ||
| last_modified = cached_entry.get("last_modified") | ||
| if last_modified: | ||
| headers["If-Modified-Since"] = last_modified | ||
|
|
||
| # Fetch data (or validate cache) | ||
| # Explicitly let HTTPError propagate (no need to catch just to re-raise) | ||
| try: | ||
| with _gh.stream("GET", url, headers=headers) as r: | ||
| # Handle 304 Not Modified - cached data is still valid | ||
| if r.status_code == 304: | ||
| if cached_entry and "data" in cached_entry: | ||
| log.debug(f"Cache validated (304) for {sanitize_for_log(url)}") | ||
| _cache_stats["validations"] += 1 | ||
|
|
||
| # Update in-memory cache with validated data | ||
| data = cached_entry["data"] | ||
| with _cache_lock: | ||
| _cache[url] = data | ||
|
|
||
| # Update timestamp in disk cache to track last validation | ||
| cached_entry["last_validated"] = time.time() | ||
| return data | ||
| else: | ||
| # Shouldn't happen, but handle gracefully | ||
| log.warning(f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching") | ||
| _cache_stats["errors"] += 1 | ||
| # Close the original streaming response before retrying | ||
| r.close() | ||
| # Retry without conditional headers using streaming again so that | ||
| # MAX_RESPONSE_SIZE and related protections still apply. | ||
| headers = {} | ||
| with _gh.stream("GET", url, headers=headers) as r_retry: | ||
| r_retry.raise_for_status() | ||
|
|
||
| # 1. Check Content-Length header if present | ||
| cl = r_retry.headers.get("Content-Length") | ||
| if cl: | ||
| try: | ||
| if int(cl) > MAX_RESPONSE_SIZE: | ||
| raise ValueError( | ||
| f"Response too large from {sanitize_for_log(url)} " | ||
| f"({int(cl) / (1024 * 1024):.2f} MB)" | ||
| ) | ||
| except ValueError as e: | ||
| # Only catch the conversion error, let the size error propagate | ||
| if "Response too large" in str(e): | ||
| raise e | ||
| log.warning( | ||
| f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. " | ||
| "Falling back to streaming size check." | ||
| ) | ||
|
|
||
| # 2. Stream and check actual size | ||
| chunks = [] | ||
| current_size = 0 | ||
| for chunk in r_retry.iter_bytes(): | ||
| current_size += len(chunk) | ||
| if current_size > MAX_RESPONSE_SIZE: | ||
| raise ValueError( | ||
| f"Response too large from {sanitize_for_log(url)} " | ||
| f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)" | ||
| ) | ||
| chunks.append(chunk) | ||
|
|
||
| try: | ||
| data = json.loads(b"".join(chunks)) | ||
| except json.JSONDecodeError as e: | ||
| raise ValueError( | ||
| f"Invalid JSON response from {sanitize_for_log(url)}" | ||
| ) from e | ||
|
|
||
| # Store cache headers for future conditional requests | ||
| # ETag is preferred over Last-Modified (more reliable) | ||
| etag = r_retry.headers.get("ETag") | ||
| last_modified = r_retry.headers.get("Last-Modified") | ||
|
|
||
| # Update disk cache with new data and headers | ||
| _disk_cache[url] = { | ||
| "data": data, | ||
| "etag": etag, | ||
| "last_modified": last_modified, | ||
| "fetched_at": time.time(), | ||
| "last_validated": time.time(), | ||
| } | ||
|
|
||
| _cache_stats["misses"] += 1 | ||
| return data | ||
|
|
||
| r.raise_for_status() | ||
|
|
||
| # Security: Validate Content-Type | ||
| # Prevent processing of unexpected content types (e.g., HTML/XML from captive portals or attack sites) | ||
| content_type = r.headers.get("Content-Type", "").lower() | ||
| allowed_types = ["application/json", "text/json", "text/plain"] | ||
| if not any(t in content_type for t in allowed_types): | ||
| raise ValueError( | ||
| f"Invalid Content-Type from {sanitize_for_log(url)}: {content_type}. " | ||
| f"Expected one of: {', '.join(allowed_types)}" | ||
| ) | ||
|
|
||
| # 1. Check Content-Length header if present | ||
| cl = r.headers.get("Content-Length") | ||
| if cl: | ||
| try: | ||
| if int(cl) > MAX_RESPONSE_SIZE: | ||
| raise ValueError( | ||
| f"Response too large from {sanitize_for_log(url)} " | ||
| f"({int(cl) / (1024 * 1024):.2f} MB)" | ||
| ) | ||
| except ValueError as e: | ||
| # Only catch the conversion error, let the size error propagate | ||
| if "Response too large" in str(e): | ||
| raise e | ||
| log.warning( | ||
| f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. " | ||
| "Falling back to streaming size check." | ||
| ) | ||
|
|
||
| # 2. Stream and check actual size | ||
| chunks = [] | ||
| current_size = 0 | ||
| # Optimization: Use 16KB chunks to reduce loop overhead/appends for large files | ||
| for chunk in r.iter_bytes(chunk_size=16 * 1024): | ||
| current_size += len(chunk) | ||
| if current_size > MAX_RESPONSE_SIZE: | ||
| raise ValueError( | ||
| f"Response too large from {sanitize_for_log(url)} " | ||
| f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)" | ||
| ) | ||
| chunks.append(chunk) | ||
|
|
||
| try: | ||
| data = json.loads(b"".join(chunks)) | ||
| except json.JSONDecodeError as e: | ||
| raise ValueError( | ||
| f"Invalid JSON response from {sanitize_for_log(url)}" | ||
| ) from e | ||
|
|
||
| # Store cache headers for future conditional requests | ||
| # ETag is preferred over Last-Modified (more reliable) | ||
| etag = r.headers.get("ETag") | ||
| last_modified = r.headers.get("Last-Modified") | ||
|
|
||
| # Update disk cache with new data and headers | ||
| _disk_cache[url] = { | ||
| "data": data, | ||
| "etag": etag, | ||
| "last_modified": last_modified, | ||
| "fetched_at": time.time(), | ||
| "last_validated": time.time(), | ||
| } | ||
|
|
||
| _cache_stats["misses"] += 1 | ||
|
|
||
| except httpx.HTTPStatusError as e: | ||
| # Re-raise with original exception (don't catch and re-raise) | ||
| raise | ||
|
|
||
| # Double-checked locking: Check again after fetch to avoid duplicate fetches | ||
| # If another thread already cached it while we were fetching, use theirs | ||
| # for consistency (return _cache[url] instead of data to ensure single source of truth) | ||
| with _cache_lock: | ||
| if url not in _cache: | ||
| _cache[url] = data | ||
| return _cache[url] | ||
|
|
||
|
|
||
| def check_api_access(client: httpx.Client, profile_id: str) -> bool: | ||
|
|
@@ -1560,26 +1575,26 @@ | |
| return {} | ||
|
|
||
|
|
||
| def verify_access_and_get_folders( | ||
| client: httpx.Client, profile_id: str | ||
| ) -> Optional[Dict[str, str]]: | ||
| """Combine access check and folder listing into a single API request. | ||
|
|
||
| Returns: | ||
| Dict of {folder_name: folder_id} on success. | ||
| None if access is denied or the request fails after retries. | ||
| """ | ||
| url = f"{API_BASE}/{profile_id}/groups" | ||
|
|
||
| for attempt in range(MAX_RETRIES): | ||
| try: | ||
| resp = client.get(url) | ||
| resp.raise_for_status() | ||
|
|
||
| try: | ||
| data = resp.json() | ||
|
|
||
| # Ensure we got the expected top-level JSON structure. | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| # We defensively validate types here so that unexpected but valid | ||
| # JSON (e.g., a list or a scalar) doesn't cause AttributeError/TypeError | ||
| # and cause the operation to fail unexpectedly. | ||
|
|
@@ -1821,7 +1836,7 @@ | |
| if USE_COLORS: | ||
| sys.stderr.write( | ||
|                f"\r\033[K{Colors.GREEN}✓ Warming up cache: Done!{Colors.ENDC}\n" | ||
| ) | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| sys.stderr.flush() | ||
|
|
||
|
|
||
|
|
@@ -1848,74 +1863,74 @@ | |
| return False | ||
|
|
||
|
|
||
| def create_folder( | ||
| client: httpx.Client, profile_id: str, name: str, do: int, status: int | ||
| ) -> Optional[str]: | ||
| """ | ||
| Create a new folder and return its ID. | ||
| Attempts to read ID from response first, then falls back to polling. | ||
| """ | ||
| try: | ||
| # 1. Send the Create Request | ||
| response = _api_post( | ||
| client, | ||
| f"{API_BASE}/{profile_id}/groups", | ||
| data={"name": name, "do": do, "status": status}, | ||
| ) | ||
|
|
||
| # OPTIMIZATION: Try to grab ID directly from response to avoid the wait loop | ||
| try: | ||
| resp_data = response.json() | ||
| body = resp_data.get("body", {}) | ||
|
|
||
| # Check if it returned a single group object | ||
| if isinstance(body, dict) and "group" in body and "PK" in body["group"]: | ||
| pk = str(body["group"]["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| return None | ||
| log.info( | ||
| "Created folder %s (ID %s) [Direct]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(pk), | ||
| ) | ||
| return pk | ||
|
|
||
| # Check if it returned a list containing our group | ||
| if isinstance(body, dict) and "groups" in body: | ||
| for grp in body["groups"]: | ||
| if grp.get("group") == name: | ||
| pk = str(grp["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| continue | ||
| log.info( | ||
| "Created folder %s (ID %s) [Direct]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(pk), | ||
| ) | ||
| return pk | ||
| except Exception as e: | ||
| log.debug( | ||
| f"Could not extract ID from POST response: " f"{sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| # 2. Fallback: Poll for the new folder (The Robust Retry Logic) | ||
| for attempt in range(MAX_RETRIES + 1): | ||
| try: | ||
| data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json() | ||
| groups = data.get("body", {}).get("groups", []) | ||
|
|
||
| for grp in groups: | ||
| if grp["group"].strip() == name.strip(): | ||
| pk = str(grp["PK"]) | ||
| if not validate_folder_id(pk, log_errors=False): | ||
| log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}") | ||
| return None | ||
| log.info( | ||
| "Created folder %s (ID %s) [Polled]", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(pk), | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| ) | ||
| return pk | ||
| except Exception as e: | ||
|
|
@@ -1942,93 +1957,94 @@ | |
| return None | ||
|
|
||
|
|
||
| def push_rules( | ||
| profile_id: str, | ||
| folder_name: str, | ||
| folder_id: str, | ||
| do: int, | ||
| status: int, | ||
| hostnames: List[str], | ||
| existing_rules: Set[str], | ||
| client: httpx.Client, | ||
| batch_executor: Optional[concurrent.futures.Executor] = None, | ||
| ) -> bool: | ||
| """ | ||
| Pushes rules to a folder in batches, filtering duplicates and invalid rules. | ||
|
|
||
| Deduplicates input, validates rules against RULE_PATTERN, and sends batches | ||
| in parallel for optimal performance. Updates existing_rules set with newly | ||
| added rules. Returns True if all batches succeed. | ||
| """ | ||
| if not hostnames: | ||
| log.info("Folder %s - no rules to push", sanitize_for_log(folder_name)) | ||
| return True | ||
|
|
||
| original_count = len(hostnames) | ||
|
|
||
| # Optimization 1: Deduplicate input list while preserving order using dict.fromkeys() | ||
| # This is significantly faster than using a 'seen' set in the loop for large lists. | ||
| # It also naturally deduplicates invalid rules, preventing log spam. | ||
| unique_hostnames = dict.fromkeys(hostnames) | ||
|
|
||
| filtered_hostnames = [] | ||
| skipped_unsafe = 0 | ||
|
|
||
| # Optimization 2: Inline regex match and check existence | ||
| # Using a local reference to the match method avoids function call overhead | ||
| # in the hot loop. This provides a measurable speedup for large lists. | ||
| match_rule = RULE_PATTERN.match | ||
|
|
||
| for h in unique_hostnames: | ||
| if h in existing_rules: | ||
| continue | ||
|
|
||
| if not match_rule(h): | ||
| # Enforce max length (255) and regex pattern | ||
| if len(h) > 255 or not match_rule(h): | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The maximum rule length |
||
| log.warning( | ||
| f"Skipping unsafe rule in {sanitize_for_log(folder_name)}: {sanitize_for_log(h)}" | ||
| ) | ||
| skipped_unsafe += 1 | ||
| continue | ||
|
|
||
| filtered_hostnames.append(h) | ||
|
|
||
| if skipped_unsafe > 0: | ||
| log.warning( | ||
| f"Folder {sanitize_for_log(folder_name)}: skipped {skipped_unsafe} unsafe rules" | ||
| ) | ||
|
|
||
| duplicates_count = original_count - len(filtered_hostnames) - skipped_unsafe | ||
|
|
||
| if duplicates_count > 0: | ||
| log.info( | ||
| f"Folder {sanitize_for_log(folder_name)}: skipping {duplicates_count} duplicate rules" | ||
| ) | ||
|
|
||
| if not filtered_hostnames: | ||
| log.info( | ||
| f"Folder {sanitize_for_log(folder_name)} - no new rules to push after filtering duplicates" | ||
| ) | ||
| return True | ||
|
|
||
| successful_batches = 0 | ||
|
|
||
| # Prepare batches | ||
| batches = [] | ||
| for start in range(0, len(filtered_hostnames), BATCH_SIZE): | ||
| batches.append(filtered_hostnames[start : start + BATCH_SIZE]) | ||
|
|
||
| total_batches = len(batches) | ||
|
|
||
| # Optimization: Hoist loop invariants to avoid redundant computations | ||
| str_do = str(do) | ||
| str_status = str(status) | ||
| str_group = str(folder_id) | ||
| sanitized_folder_name = sanitize_for_log(folder_name) | ||
| progress_label = f"Folder {sanitized_folder_name}" | ||
|
|
||
| def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: | ||
| """Processes a single batch of rules by sending API request.""" | ||
| data = { | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| "do": str_do, | ||
| "status": str_status, | ||
| "group": str_group, | ||
|
|
@@ -2175,211 +2191,211 @@ | |
| # --------------------------------------------------------------------------- # | ||
| # 4. Main workflow | ||
| # --------------------------------------------------------------------------- # | ||
| def sync_profile( | ||
| profile_id: str, | ||
| folder_urls: Sequence[str], | ||
| dry_run: bool = False, | ||
| no_delete: bool = False, | ||
| plan_accumulator: Optional[List[Dict[str, Any]]] = None, | ||
| ) -> bool: | ||
| """ | ||
| Synchronizes Control D folders from remote blocklist URLs. | ||
|
|
||
| Fetches folder data, optionally deletes existing folders with same names, | ||
| creates new folders, and pushes rules in batches. In dry-run mode, only | ||
| generates a plan without making API changes. Returns True if all folders | ||
| sync successfully. | ||
| """ | ||
| # SECURITY: Clear cached DNS validations at the start of each sync run. | ||
| # This prevents TOCTOU issues where a domain's IP could change between runs. | ||
| validate_folder_url.cache_clear() | ||
|
|
||
| try: | ||
| # Fetch all folder data first | ||
| folder_data_list = [] | ||
|
|
||
| # OPTIMIZATION: Move validation inside the thread pool to parallelize DNS lookups. | ||
| # Previously, sequential validation blocked the main thread. | ||
| def _fetch_if_valid(url: str): | ||
| # Optimization: If we already have the content in cache, return it directly. | ||
| # The content was validated at the time of fetch (warm_up_cache). | ||
| # Read directly from cache to avoid calling fetch_folder_data while holding lock. | ||
| with _cache_lock: | ||
| if url in _cache: | ||
| return _cache[url] | ||
|
|
||
| if validate_folder_url(url): | ||
| return fetch_folder_data(url) | ||
| return None | ||
|
|
||
| with concurrent.futures.ThreadPoolExecutor() as executor: | ||
| future_to_url = { | ||
| executor.submit(_fetch_if_valid, url): url for url in folder_urls | ||
| } | ||
|
|
||
| for future in concurrent.futures.as_completed(future_to_url): | ||
| url = future_to_url[future] | ||
| try: | ||
| result = future.result() | ||
| if result: | ||
| folder_data_list.append(result) | ||
| except (httpx.HTTPError, KeyError, ValueError) as e: | ||
| log.error( | ||
| f"Failed to fetch folder data from {sanitize_for_log(url)}: {sanitize_for_log(e)}" | ||
| ) | ||
| continue | ||
|
|
||
| if not folder_data_list: | ||
| log.error("No valid folder data found") | ||
| return False | ||
|
|
||
| # Build plan entries | ||
| plan_entry = {"profile": profile_id, "folders": []} | ||
| for folder_data in folder_data_list: | ||
| grp = folder_data["group"] | ||
| name = grp["group"].strip() | ||
|
|
||
| if "rule_groups" in folder_data: | ||
| # Multi-action format | ||
| total_rules = sum( | ||
| len(rg.get("rules", [])) for rg in folder_data["rule_groups"] | ||
| ) | ||
| plan_entry["folders"].append( | ||
| { | ||
| "name": name, | ||
| "rules": total_rules, | ||
| "rule_groups": [ | ||
| { | ||
| "rules": len(rg.get("rules", [])), | ||
| "action": rg.get("action", {}).get("do"), | ||
| "status": rg.get("action", {}).get("status"), | ||
| } | ||
| for rg in folder_data["rule_groups"] | ||
| ], | ||
| } | ||
| ) | ||
| else: | ||
| # Legacy single-action format | ||
| hostnames = [ | ||
| r["PK"] for r in folder_data.get("rules", []) if r.get("PK") | ||
| ] | ||
| plan_entry["folders"].append( | ||
| { | ||
| "name": name, | ||
| "rules": len(hostnames), | ||
| "action": grp.get("action", {}).get("do"), | ||
| "status": grp.get("action", {}).get("status"), | ||
| } | ||
| ) | ||
|
|
||
| if plan_accumulator is not None: | ||
| plan_accumulator.append(plan_entry) | ||
|
|
||
| if dry_run: | ||
| print_plan_details(plan_entry) | ||
| log.info("Dry-run complete: no API calls were made.") | ||
| return True | ||
|
|
||
| # Create new folders and push rules | ||
| success_count = 0 | ||
|
|
||
| # CRITICAL FIX: Switch to Serial Processing (1 worker) | ||
| # This prevents API rate limits and ensures stability for large folders. | ||
| max_workers = 1 | ||
|
|
||
| # Shared executor for rate-limited operations (DELETE, push_rules batches) | ||
| # Reusing this executor prevents thread churn and enforces global rate limits. | ||
| with concurrent.futures.ThreadPoolExecutor( | ||
| max_workers=DELETE_WORKERS | ||
| ) as shared_executor, _api_client() as client: | ||
| # Verify access and list existing folders in one request | ||
| existing_folders = verify_access_and_get_folders(client, profile_id) | ||
| if existing_folders is None: | ||
| return False | ||
|
|
||
| if not no_delete: | ||
| deletion_occurred = False | ||
|
|
||
| # Identify folders to delete | ||
| folders_to_delete = [] | ||
| for folder_data in folder_data_list: | ||
| name = folder_data["group"]["group"].strip() | ||
| if name in existing_folders: | ||
| folders_to_delete.append((name, existing_folders[name])) | ||
|
|
||
| if folders_to_delete: | ||
| # Parallel delete to speed up the "clean slate" phase | ||
| # Use shared_executor (3 workers) | ||
| future_to_name = { | ||
| shared_executor.submit( | ||
| delete_folder, client, profile_id, name, folder_id | ||
| ): name | ||
| for name, folder_id in folders_to_delete | ||
| } | ||
|
|
||
| for future in concurrent.futures.as_completed(future_to_name): | ||
| name = future_to_name[future] | ||
| try: | ||
| if future.result(): | ||
| del existing_folders[name] | ||
| deletion_occurred = True | ||
| except Exception as exc: | ||
| # Sanitize both name and exception to prevent log injection | ||
| log.error( | ||
| "Failed to delete folder %s: %s", | ||
| sanitize_for_log(name), | ||
| sanitize_for_log(exc), | ||
| ) | ||
|
|
||
| # CRITICAL FIX: Increased wait time for massive folders to clear | ||
| if deletion_occurred: | ||
| if not USE_COLORS: | ||
| log.info( | ||
| "Waiting 60s for deletions to propagate (prevents 'Badware Hoster' zombie state)..." | ||
| ) | ||
| countdown_timer(60, "Waiting for deletions to propagate") | ||
|
|
||
| # Optimization: Pass the updated existing_folders to avoid redundant API call | ||
| existing_rules = get_all_existing_rules( | ||
| client, profile_id, known_folders=existing_folders | ||
| ) | ||
|
|
||
| with concurrent.futures.ThreadPoolExecutor( | ||
| max_workers=max_workers | ||
| ) as executor: | ||
| future_to_folder = { | ||
| executor.submit( | ||
| _process_single_folder, | ||
| folder_data, | ||
| profile_id, | ||
| existing_rules, | ||
| client, # Pass the persistent client | ||
| batch_executor=shared_executor, | ||
| ): folder_data | ||
| for folder_data in folder_data_list | ||
| } | ||
|
|
||
| for future in concurrent.futures.as_completed(future_to_folder): | ||
| folder_data = future_to_folder[future] | ||
| folder_name = folder_data["group"]["group"].strip() | ||
| try: | ||
| if future.result(): | ||
| success_count += 1 | ||
| except Exception as e: | ||
| log.error( | ||
| f"Failed to process folder '{sanitize_for_log(folder_name)}': {sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| log.info( | ||
| f"Sync complete: {success_count}/{len(folder_data_list)} folders processed successfully" | ||
| ) | ||
| return success_count == len(folder_data_list) | ||
|
|
||
| except Exception as e: | ||
| log.error( | ||
| f"Unexpected error during sync for profile {profile_id}: {sanitize_for_log(e)}" | ||
| ) | ||
| return False | ||
|
|
||
|
|
||
| # --------------------------------------------------------------------------- # | ||
|
|
@@ -2524,412 +2540,412 @@ | |
| return parser.parse_args() | ||
|
|
||
|
|
||
def main():
    """
    Main entry point for Control D Sync.

    Loads environment configuration, validates inputs, warms up cache,
    and syncs profiles. Supports interactive prompts for missing credentials
    when running in a TTY. Prints summary statistics and exits with appropriate
    status code (0 when every profile synced, 1 otherwise).

    Side effects: reads ``.env`` and environment variables, mutates the
    module-global ``TOKEN``, reads/writes the on-disk blocklist cache, and
    prints a summary table to stdout.

    NOTE(review): several user-facing status glyphs below appear mojibake'd
    in this view (e.g. "β", "β’") — verify the intended Unicode symbols
    against the original file before shipping.
    """
    # SECURITY: Check .env permissions (after Colors is defined for NO_COLOR support)
    # This must happen BEFORE load_dotenv() to prevent reading secrets from world-readable files
    check_env_permissions()
    load_dotenv()

    global TOKEN
    # Re-initialize TOKEN to pick up values from .env (since load_dotenv was delayed)
    TOKEN = _clean_env_kv(os.getenv("TOKEN"), "TOKEN")

    args = parse_args()

    # Load persistent cache from disk (graceful degradation on any error)
    # NOTE: Called only after successful argument parsing so that `--help` or
    # argument errors do not perform unnecessary filesystem I/O or logging.
    load_disk_cache()

    # Handle --clear-cache: delete cache file and exit immediately
    if args.clear_cache:
        global _disk_cache
        cache_file = get_cache_dir() / "blocklists.json"
        if cache_file.exists():
            try:
                cache_file.unlink()
                print(f"{Colors.GREEN}β Cleared blocklist cache: {cache_file}{Colors.ENDC}")
            except OSError as e:
                print(f"{Colors.FAIL}β Failed to clear cache: {e}{Colors.ENDC}")
                # FIX: use sys.exit() rather than the site-injected exit() builtin,
                # which is not guaranteed to exist (e.g. frozen apps, -S interpreter).
                sys.exit(1)
        else:
            print(f"{Colors.CYAN}βΉ No cache file found, nothing to clear{Colors.ENDC}")
        _disk_cache.clear()
        sys.exit(0)
    profiles_arg = (
        _clean_env_kv(args.profiles or os.getenv("PROFILE", ""), "PROFILE") or ""
    )
    profile_ids = [extract_profile_id(p) for p in profiles_arg.split(",") if p.strip()]
    folder_urls = args.folder_url if args.folder_url else DEFAULT_FOLDER_URLS

    # Interactive prompts for missing config (only in live mode on a real TTY)
    if not args.dry_run and sys.stdin.isatty():
        if not profile_ids:
            print(f"{Colors.CYAN}βΉ Profile ID is missing.{Colors.ENDC}")
            print(
                f"{Colors.CYAN} You can find this in the URL of your profile in the Control D Dashboard (or just paste the URL).{Colors.ENDC}"
            )

            def validate_profile_input(value: str) -> bool:
                """Validates one or more profile IDs from comma-separated input."""
                ids = [extract_profile_id(p) for p in value.split(",") if p.strip()]
                return bool(ids) and all(
                    validate_profile_id(pid, log_errors=False) for pid in ids
                )

            p_input = get_validated_input(
                f"{Colors.BOLD}Enter Control D Profile ID:{Colors.ENDC} ",
                validate_profile_input,
                "Invalid ID(s) or URL(s). Must be a valid Profile ID or a Control D Profile URL. Comma-separate for multiple.",
            )
            profile_ids = [
                extract_profile_id(p) for p in p_input.split(",") if p.strip()
            ]

        if not TOKEN:
            print(f"{Colors.CYAN}βΉ API Token is missing.{Colors.ENDC}")
            print(
                f"{Colors.CYAN} You can generate one at: https://controld.com/account/manage-account{Colors.ENDC}"
            )

            t_input = get_password(
                f"{Colors.BOLD}Enter Control D API Token:{Colors.ENDC} ",
                lambda x: len(x) > 8,
                "Token seems too short. Please check your API token.",
            )
            TOKEN = t_input

    if not profile_ids and not args.dry_run:
        log.error(
            "PROFILE missing and --dry-run not set. Provide --profiles or set PROFILE env."
        )
        sys.exit(1)

    if not TOKEN and not args.dry_run:
        log.error("TOKEN missing and --dry-run not set. Set TOKEN env for live sync.")
        sys.exit(1)

    warm_up_cache(folder_urls)

    plan: List[Dict[str, Any]] = []
    success_count = 0
    sync_results = []

    # Pre-bind so the KeyboardInterrupt handler can reference the last
    # profile/start even if the interrupt fires before the loop body runs.
    profile_id = "unknown"
    start_time = time.time()

    try:
        for profile_id in profile_ids or ["dry-run-placeholder"]:
            start_time = time.time()
            # Skip validation for dry-run placeholder
            if profile_id != "dry-run-placeholder" and not validate_profile_id(
                profile_id
            ):
                sync_results.append(
                    {
                        "profile": profile_id,
                        "folders": 0,
                        "rules": 0,
                        "status_label": "β Invalid Profile ID",
                        "success": False,
                        "duration": 0.0,
                    }
                )
                continue

            log.info("Starting sync for profile %s", profile_id)
            status = sync_profile(
                profile_id,
                folder_urls,
                dry_run=args.dry_run,
                no_delete=args.no_delete,
                plan_accumulator=plan,
            )
            end_time = time.time()
            duration = end_time - start_time

            if status:
                success_count += 1

            # RESTORED STATS LOGIC: Calculate actual counts from the plan
            entry = next((p for p in plan if p["profile"] == profile_id), None)
            folder_count = len(entry["folders"]) if entry else 0
            rule_count = sum(f["rules"] for f in entry["folders"]) if entry else 0

            if args.dry_run:
                status_text = "β Planned" if status else "β Failed (Dry)"
            else:
                status_text = "β Success" if status else "β Failed"

            sync_results.append(
                {
                    "profile": profile_id,
                    "folders": folder_count,
                    "rules": rule_count,
                    "status_label": status_text,
                    "success": status,
                    "duration": duration,
                }
            )
    except KeyboardInterrupt:
        duration = time.time() - start_time
        print(
            f"\n{Colors.WARNING}β οΈ Sync cancelled by user. Finishing current task...{Colors.ENDC}"
        )

        # Try to recover stats for the interrupted profile
        entry = next((p for p in plan if p["profile"] == profile_id), None)
        folder_count = len(entry["folders"]) if entry else 0
        rule_count = sum(f["rules"] for f in entry["folders"]) if entry else 0

        sync_results.append(
            {
                "profile": profile_id,
                "folders": folder_count,
                "rules": rule_count,
                "status_label": "β Cancelled",
                "success": False,
                "duration": duration,
            }
        )

    if args.plan_json:
        with open(args.plan_json, "w", encoding="utf-8") as f:
            json.dump(plan, f, indent=2)
        log.info("Plan written to %s", args.plan_json)

    # Print Summary Table
    # Determine the width for the Profile ID column (min 25)
    max_profile_len = max((len(r["profile"]) for r in sync_results), default=25)
    profile_col_width = max(25, max_profile_len)

    # Column widths
    w_profile = profile_col_width
    w_folders = 10
    w_rules = 12
    w_duration = 10
    w_status = 15

    def make_col_separator(left, mid, right, horiz):
        """Builds a horizontal table rule from the given box-drawing pieces."""
        parts = [
            horiz * (w_profile + 2),
            horiz * (w_folders + 2),
            horiz * (w_rules + 2),
            horiz * (w_duration + 2),
            horiz * (w_status + 2),
        ]
        return left + mid.join(parts) + right

    # Calculate table width using a dummy separator
    dummy_sep = make_col_separator(Box.TL, Box.T, Box.TR, Box.H)
    table_width = len(dummy_sep)

    title_text = " DRY RUN SUMMARY " if args.dry_run else " SYNC SUMMARY "
    title_color = Colors.CYAN if args.dry_run else Colors.HEADER

    # Top Border (Single Cell for Title)
    print("\n" + Box.TL + Box.H * (table_width - 2) + Box.TR)

    # Title Row (centered; uneven padding goes to the right)
    visible_title = title_text.strip()
    inner_width = table_width - 2
    pad_left = (inner_width - len(visible_title)) // 2
    pad_right = inner_width - len(visible_title) - pad_left
    print(
        f"{Box.V}{' ' * pad_left}{title_color}{visible_title}{Colors.ENDC}{' ' * pad_right}{Box.V}"
    )

    # Separator between Title and Headers (introduces columns)
    print(make_col_separator(Box.L, Box.T, Box.R, Box.H))

    # Header Row
    print(
        f"{Box.V} {Colors.BOLD}{'Profile ID':<{w_profile}}{Colors.ENDC} "
        f"{Box.V} {Colors.BOLD}{'Folders':>{w_folders}}{Colors.ENDC} "
        f"{Box.V} {Colors.BOLD}{'Rules':>{w_rules}}{Colors.ENDC} "
        f"{Box.V} {Colors.BOLD}{'Duration':>{w_duration}}{Colors.ENDC} "
        f"{Box.V} {Colors.BOLD}{'Status':<{w_status}}{Colors.ENDC} {Box.V}"
    )

    # Separator between Header and Body
    print(make_col_separator(Box.L, Box.X, Box.R, Box.H))

    # Rows
    total_folders = 0
    total_rules = 0
    total_duration = 0.0

    for res in sync_results:
        # Use boolean success field for color logic
        status_color = Colors.GREEN if res["success"] else Colors.FAIL

        s_folders = f"{res['folders']:,}"
        s_rules = f"{res['rules']:,}"
        s_duration = f"{res['duration']:.1f}s"

        print(
            f"{Box.V} {res['profile']:<{w_profile}} "
            f"{Box.V} {s_folders:>{w_folders}} "
            f"{Box.V} {s_rules:>{w_rules}} "
            f"{Box.V} {s_duration:>{w_duration}} "
            f"{Box.V} {status_color}{res['status_label']:<{w_status}}{Colors.ENDC} {Box.V}"
        )
        total_folders += res["folders"]
        total_rules += res["rules"]
        total_duration += res["duration"]

    # Separator between Body and Total
    print(make_col_separator(Box.L, Box.X, Box.R, Box.H))

    # Total Row
    total = len(profile_ids or ["dry-run-placeholder"])
    all_success = success_count == total

    if args.dry_run:
        if all_success:
            total_status_text = "β Ready"
        else:
            total_status_text = "β Errors"
    else:
        if all_success:
            total_status_text = "β All Good"
        else:
            total_status_text = "β Errors"

    total_status_color = Colors.GREEN if all_success else Colors.FAIL

    s_total_folders = f"{total_folders:,}"
    s_total_rules = f"{total_rules:,}"
    s_total_duration = f"{total_duration:.1f}s"

    print(
        f"{Box.V} {Colors.BOLD}{'TOTAL':<{w_profile}}{Colors.ENDC} "
        f"{Box.V} {s_total_folders:>{w_folders}} "
        f"{Box.V} {s_total_rules:>{w_rules}} "
        f"{Box.V} {s_total_duration:>{w_duration}} "
        f"{Box.V} {total_status_color}{total_status_text:<{w_status}}{Colors.ENDC} {Box.V}"
    )
    # Bottom Border
    print(make_col_separator(Box.BL, Box.B, Box.BR, Box.H))

    # Success Delight
    if all_success and not args.dry_run:
        print_success_message(profile_ids)

    # Dry Run Next Steps
    if args.dry_run:
        print()  # Spacer
        if all_success:
            # Build the suggested command once so it stays consistent between
            # color and non-color output modes.
            cmd_parts = ["python", "main.py"]
            if profile_ids:
                # Join multiple profiles if needed
                p_str = ",".join(profile_ids)
            else:
                p_str = "<your-profile-id>"
            cmd_parts.append(f"--profiles {p_str}")

            # Reconstruct other args if they were used (optional but helpful)
            if args.folder_url:
                for url in args.folder_url:
                    cmd_parts.append(f"--folder-url {url}")

            cmd_str = " ".join(cmd_parts)

            if USE_COLORS:
                print(f"{Colors.BOLD}π Ready to sync? Run the following command:{Colors.ENDC}")
                print(f" {Colors.CYAN}{cmd_str}{Colors.ENDC}")
            else:
                print("π Ready to sync? Run the following command:")
                print(f" {cmd_str}")

            # Offer interactive restart if appropriate
            prompt_for_interactive_restart(profile_ids)

        else:
            if USE_COLORS:
                print(
                    f"{Colors.FAIL}β οΈ Dry run encountered errors. Please check the logs above.{Colors.ENDC}"
                )
            else:
                print("β οΈ Dry run encountered errors. Please check the logs above.")

    # Display API statistics
    total_api_calls = _api_stats["control_d_api_calls"] + _api_stats["blocklist_fetches"]
    if total_api_calls > 0:
        print(f"{Colors.BOLD}API Statistics:{Colors.ENDC}")
        print(f" β’ Control D API calls: {_api_stats['control_d_api_calls']:>7,}")
        print(f" β’ Blocklist fetches: {_api_stats['blocklist_fetches']:>7,}")
        print(f" β’ Total API requests: {total_api_calls:>7,}")
        print()

    # Display cache statistics if any cache activity occurred
    if _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"] > 0:
        print(f"{Colors.BOLD}Cache Statistics:{Colors.ENDC}")
        print(f" β’ Hits (in-memory): {_cache_stats['hits']:>7,}")
        print(f" β’ Misses (downloaded): {_cache_stats['misses']:>7,}")
        print(f" β’ Validations (304): {_cache_stats['validations']:>7,}")
        if _cache_stats["errors"] > 0:
            print(f" β’ Errors (non-fatal): {_cache_stats['errors']:>7,}")

        # Calculate cache effectiveness
        total_requests = _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"]
        if total_requests > 0:
            # Hits + validations = avoided full downloads
            cache_effectiveness = (_cache_stats["hits"] + _cache_stats["validations"]) / total_requests * 100
            print(f" β’ Cache effectiveness: {cache_effectiveness:>6.1f}%")
        print()

    # Display rate limit information if available
    with _rate_limit_lock:
        if any(v is not None for v in _rate_limit_info.values()):
            print(f"{Colors.BOLD}API Rate Limit Status:{Colors.ENDC}")

            if _rate_limit_info["limit"] is not None:
                print(f" β’ Requests limit: {_rate_limit_info['limit']:>6,}")

            if _rate_limit_info["remaining"] is not None:
                remaining = _rate_limit_info["remaining"]
                limit = _rate_limit_info["limit"]

                # Color code based on remaining capacity
                if limit and limit > 0:
                    pct = (remaining / limit) * 100
                    if pct < 20:
                        color = Colors.FAIL  # Red for critical
                    elif pct < 50:
                        color = Colors.WARNING  # Yellow for caution
                    else:
                        color = Colors.GREEN  # Green for healthy
                    print(f" β’ Requests remaining: {color}{remaining:>6,} ({pct:>5.1f}%){Colors.ENDC}")
                else:
                    print(f" β’ Requests remaining: {remaining:>6,}")

            if _rate_limit_info["reset"] is not None:
                reset_time = time.strftime(
                    "%H:%M:%S",
                    time.localtime(_rate_limit_info["reset"])
                )
                print(f" β’ Limit resets at: {reset_time}")

            print()

    # Save cache to disk after successful sync (non-fatal if it fails)
    if not args.dry_run:
        save_disk_cache()

    total = len(profile_ids or ["dry-run-placeholder"])
    # FIX: lazy %-style logging args (matches the file's own convention and
    # the linter findings in this PR) instead of an eager f-string.
    log.info("All profiles processed: %d/%d successful", success_count, total)
    sys.exit(0 if success_count == total else 1)
|
|
||
|
|
||
| if __name__ == "__main__": | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
| @@ -0,0 +1,27 @@ | ||||||
| import pytest | ||||||
Check warningCode scanning / Prospector (reported by Codacy) Unable to import 'pytest' (import-error) Warning test
Unable to import 'pytest' (import-error)
Check warningCode scanning / Prospector (reported by Codacy) Unused import pytest (unused-import) Warning test
Unused import pytest (unused-import)
Check warningCode scanning / Pylint (reported by Codacy) Missing module docstring Warning test
Missing module docstring
Check warningCode scanning / Pylintpython3 (reported by Codacy) Missing module docstring Warning test
Missing module docstring
Check noticeCode scanning / Pylint (reported by Codacy) Unused import pytest Note test
Unused import pytest
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Unused import pytest Note test
Unused import pytest
|
||||||
| from unittest.mock import MagicMock, patch | ||||||
Check warningCode scanning / Prospector (reported by Codacy) Unused MagicMock imported from unittest.mock (unused-import) Warning test
Unused MagicMock imported from unittest.mock (unused-import)
Check warningCode scanning / Pylint (reported by Codacy) standard import "from unittest.mock import MagicMock, patch" should be placed before "import pytest" Warning test
standard import "from unittest.mock import MagicMock, patch" should be placed before "import pytest"
Check warningCode scanning / Pylintpython3 (reported by Codacy) standard import "from unittest.mock import MagicMock, patch" should be placed before "import pytest" Warning test
standard import "from unittest.mock import MagicMock, patch" should be placed before "import pytest"
Check noticeCode scanning / Pylint (reported by Codacy) Unused MagicMock imported from unittest.mock Note test
Unused MagicMock imported from unittest.mock
Check noticeCode scanning / Pylint (reported by Codacy) Unused patch imported from unittest.mock Note test
Unused patch imported from unittest.mock
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Unused MagicMock imported from unittest.mock Note test
Unused MagicMock imported from unittest.mock
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Unused patch imported from unittest.mock Note test
Unused patch imported from unittest.mock
Comment on lines
+1
to
+2
|
||||||
| import pytest | |
| from unittest.mock import MagicMock, patch |
Check warning
Code scanning / Pylint (reported by Codacy)
Missing function docstring Warning test
Check warning
Code scanning / Pylintpython3 (reported by Codacy)
Missing function or method docstring Warning test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check warning
Code scanning / Pylint (reported by Codacy)
Missing function docstring Warning test
Check warning
Code scanning / Pylintpython3 (reported by Codacy)
Missing function or method docstring Warning test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check warning
Code scanning / Pylint (reported by Codacy)
Missing function docstring Warning test
Check warning
Code scanning / Pylintpython3 (reported by Codacy)
Missing function or method docstring Warning test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
Check notice
Code scanning / Bandit (reported by Codacy)
Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The value
64is used as a maximum length for folder IDs here, and also for folder names (is_valid_folder_name) and profile IDs (is_valid_profile_id_format). To improve maintainability and ensure these security-related limits are consistent, consider defining a module-level constant (e.g.,MAX_ID_LENGTH = 64) and using it in all relevant validation functions.