Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .jules/bolt.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,7 @@
## 2024-03-24 - Thread Pool Churn
**Learning:** Python's `ThreadPoolExecutor` incurs measurable overhead (thread creation/shutdown) when created/destroyed repeatedly inside loops, even with small worker counts.
**Action:** Lift `ThreadPoolExecutor` creation to the highest possible scope and pass it down as a dependency (using `contextlib.nullcontext` for flexible ownership).

## 2026-02-22 - [Stale Module References in Tests]

Check notice

Code scanning / Remark-lint (reported by Codacy)

Warn when references to undefined definitions are found. Note

[no-undefined-references] Found reference to undefined definition

Check notice

Code scanning / Remark-lint (reported by Codacy)

Warn when shortcut reference links are used. Note

[no-shortcut-reference-link] Use the trailing [] on reference links
**Learning:** When tests manually reload modules (e.g. `del sys.modules['main']`), other test files that imported the module at the top level will hold references to the *stale* module object. `patch('module.func')` patches the *new* module in `sys.modules`, so the old functions (called by stale tests) remain unpatched.
**Action:** In test suites where module reloading occurs, avoid top-level imports of the reloaded module. Import it inside `setUp` or test methods using `sys.modules.get('main')` or `import main` locally.
51 changes: 40 additions & 11 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -680,6 +680,27 @@
_rate_limit_lock = threading.Lock() # Protect _rate_limit_info updates


def _is_cache_fresh(url: str) -> bool:
"""
Checks if the URL is in the persistent cache and within TTL.

This optimization allows skipping expensive DNS validation for
content that is already known to be safe (validated at fetch time).
"""
# Check in-memory cache first
with _cache_lock:
if url in _cache:
return True

Comment on lines +683 to +694
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

_is_cache_fresh() currently returns True for any URL present in the in-memory _cache, even though the docstring says it checks the persistent cache and TTL. This mismatch can be misleading for future callers and makes it unclear whether “fresh” means “no network I/O needed” vs “within TTL”. Consider renaming (or updating the docstring) to reflect the actual semantics, and/or checking TTL for the in-memory entry if that’s the intended meaning.

Copilot uses AI. Check for mistakes.
# Check disk cache
entry = _disk_cache.get(url)
if entry:
last_validated = entry.get("last_validated", 0)
if time.time() - last_validated < CACHE_TTL_SECONDS:
return True
return False
Comment on lines +683 to +701

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The _is_cache_fresh function is a good optimization to avoid unnecessary DNS lookups for cached entries. This will improve performance, especially during warm-up.



def get_cache_dir() -> Path:
"""
Returns platform-specific cache directory for ctrld-sync.
Expand Down Expand Up @@ -1060,7 +1081,7 @@
if any(c in _DANGEROUS_FOLDER_CHARS or c in _BIDI_CONTROL_CHARS for c in name):
return False

# Security: Block path traversal attempts

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
# Check stripped name to prevent whitespace bypass (e.g. " . ")
clean_name = name.strip()
if clean_name in (".", ".."):
Expand All @@ -1073,14 +1094,14 @@
return True


def validate_folder_data(data: Dict[str, Any], url: str) -> bool:
"""
Validates folder JSON data structure and content.

Checks for required fields (name, action, rules), validates folder name
and action type, and ensures rules are valid. Logs specific validation errors.
"""
if not isinstance(data, dict):

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
log.error(
f"Invalid data from {sanitize_for_log(url)}: Root must be a JSON object."
)
Expand Down Expand Up @@ -1111,7 +1132,7 @@
f"Invalid data from {sanitize_for_log(url)}: Invalid folder name (empty, unsafe characters, or non-printable)."
)
return False

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (108/100) Warning

Line too long (108/100)
# Validate 'rules' if present (must be a list)
if "rules" in data and not isinstance(data["rules"], list):
log.error(f"Invalid data from {sanitize_for_log(url)}: 'rules' must be a list.")
Expand All @@ -1131,19 +1152,22 @@
)
return False
if "rules" in rg:
if not isinstance (rg["rules"], list):
log. error (
f"Invalid data from {sanitize_for_log(url)} : rule_groups[fil].rules must be a list."
if not isinstance(rg["rules"], list):
log.error(
f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules must be a list."
)
return False
# Ensure each rule within the group is an object (dict),
# because later code treats each rule as a mapping (e.g., rule.get(...)).
for j, rule in enumerate (rgi"rules"1):
if not isinstance (rule, dict):
log. error (
f"Invalid data from {sanitize_for_log(u rl)}: rule_groups[fiłl.rules[kił] must be an object."
)
return False
# Ensure each rule within the group is an object (dict),
# because later code treats each rule as a mapping (e.g., rule.get(...)).
for j, rule in enumerate(rg["rules"]):
if not isinstance(rule, dict):
log.error(
f"Invalid data from {sanitize_for_log(url)}: rule_groups[{i}].rules[{j}] must be an object."

Check warning

Code scanning / Pylint (reported by Codacy)

Line too long (120/100) Warning

Line too long (120/100)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Line too long (120/100) Warning

Line too long (120/100)
)
return False

return True

Check notice on line 1169 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1097-L1169

Complex Method


# Lock to protect updates to _api_stats in multi-threaded contexts.
# Without this, concurrent increments can lose updates because `+=` is not atomic.
Expand Down Expand Up @@ -1285,216 +1309,216 @@
time.sleep(wait_time)


def _gh_get(url: str) -> Dict:
"""
Fetch blocklist data from URL with HTTP cache header support.

CACHING STRATEGY:
1. Check in-memory cache first (fastest)
2. Check disk cache and send conditional request (If-None-Match/If-Modified-Since)
3. If 304 Not Modified: reuse cached data (cache validation)
4. If 200 OK: download new data and update cache

SECURITY: Validates data structure regardless of cache source
"""
global _cache_stats, _api_stats

# First check: Quick check without holding lock for long
with _cache_lock:
if url in _cache:
_cache_stats["hits"] += 1
return _cache[url]

# Track that we're about to make a blocklist fetch
with _cache_lock:
_api_stats["blocklist_fetches"] += 1

# Check disk cache for TTL-based hit or conditional request headers
headers = {}
cached_entry = _disk_cache.get(url)
if cached_entry:
last_validated = cached_entry.get("last_validated", 0)
if time.time() - last_validated < CACHE_TTL_SECONDS:
# Within TTL: return cached data directly without any HTTP request
data = cached_entry["data"]
with _cache_lock:
_cache[url] = data
_cache_stats["hits"] += 1
log.debug(f"Disk cache hit (within TTL) for {sanitize_for_log(url)}")
return data
# Beyond TTL: send conditional request using cached ETag/Last-Modified
# Server returns 304 if content hasn't changed
# NOTE: Cached values may be None if the server didn't send these headers.
# httpx requires header values to be str/bytes, so we only add headers
# when the cached value is truthy.
etag = cached_entry.get("etag")
if etag:
headers["If-None-Match"] = etag
last_modified = cached_entry.get("last_modified")
if last_modified:
headers["If-Modified-Since"] = last_modified

# Fetch data (or validate cache)
# Explicitly let HTTPError propagate (no need to catch just to re-raise)
try:
with _gh.stream("GET", url, headers=headers) as r:
# Handle 304 Not Modified - cached data is still valid
if r.status_code == 304:
if cached_entry and "data" in cached_entry:
log.debug(f"Cache validated (304) for {sanitize_for_log(url)}")
_cache_stats["validations"] += 1

# Update in-memory cache with validated data
data = cached_entry["data"]
with _cache_lock:
_cache[url] = data

# Update timestamp in disk cache to track last validation
cached_entry["last_validated"] = time.time()
return data
else:
# Shouldn't happen, but handle gracefully
log.warning(f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching")
_cache_stats["errors"] += 1
# Close the original streaming response before retrying
r.close()
# Retry without conditional headers using streaming again so that
# MAX_RESPONSE_SIZE and related protections still apply.
headers = {}
with _gh.stream("GET", url, headers=headers) as r_retry:
r_retry.raise_for_status()

# 1. Check Content-Length header if present
cl = r_retry.headers.get("Content-Length")
if cl:
try:
if int(cl) > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"({int(cl) / (1024 * 1024):.2f} MB)"
)
except ValueError as e:
# Only catch the conversion error, let the size error propagate
if "Response too large" in str(e):
raise e
log.warning(
f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
"Falling back to streaming size check."
)

# 2. Stream and check actual size
chunks = []
current_size = 0
for chunk in r_retry.iter_bytes():
current_size += len(chunk)
if current_size > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
)
chunks.append(chunk)

try:
data = json.loads(b"".join(chunks))
except json.JSONDecodeError as e:
raise ValueError(
f"Invalid JSON response from {sanitize_for_log(url)}"
) from e

# Store cache headers for future conditional requests
# ETag is preferred over Last-Modified (more reliable)
etag = r_retry.headers.get("ETag")
last_modified = r_retry.headers.get("Last-Modified")

# Update disk cache with new data and headers
_disk_cache[url] = {
"data": data,
"etag": etag,
"last_modified": last_modified,
"fetched_at": time.time(),
"last_validated": time.time(),
}

_cache_stats["misses"] += 1
return data

r.raise_for_status()

# Security: Validate Content-Type
# Prevent processing of unexpected content types (e.g., HTML/XML from captive portals or attack sites)
content_type = r.headers.get("Content-Type", "").lower()
allowed_types = ["application/json", "text/json", "text/plain"]
if not any(t in content_type for t in allowed_types):
raise ValueError(
f"Invalid Content-Type from {sanitize_for_log(url)}: {content_type}. "
f"Expected one of: {', '.join(allowed_types)}"
)

# 1. Check Content-Length header if present
cl = r.headers.get("Content-Length")
if cl:
try:
if int(cl) > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"({int(cl) / (1024 * 1024):.2f} MB)"
)
except ValueError as e:
# Only catch the conversion error, let the size error propagate
if "Response too large" in str(e):
raise e
log.warning(
f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
"Falling back to streaming size check."
)

# 2. Stream and check actual size
chunks = []
current_size = 0
# Optimization: Use 16KB chunks to reduce loop overhead/appends for large files
for chunk in r.iter_bytes(chunk_size=16 * 1024):
current_size += len(chunk)
if current_size > MAX_RESPONSE_SIZE:
raise ValueError(
f"Response too large from {sanitize_for_log(url)} "
f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
)
chunks.append(chunk)

try:
data = json.loads(b"".join(chunks))
except json.JSONDecodeError as e:
raise ValueError(
f"Invalid JSON response from {sanitize_for_log(url)}"
) from e

# Store cache headers for future conditional requests
# ETag is preferred over Last-Modified (more reliable)
etag = r.headers.get("ETag")
last_modified = r.headers.get("Last-Modified")

# Update disk cache with new data and headers
_disk_cache[url] = {
"data": data,
"etag": etag,
"last_modified": last_modified,
"fetched_at": time.time(),
"last_validated": time.time(),
}

_cache_stats["misses"] += 1

except httpx.HTTPStatusError as e:
# Re-raise with original exception (don't catch and re-raise)
raise

# Double-checked locking: Check again after fetch to avoid duplicate fetches
# If another thread already cached it while we were fetching, use theirs
# for consistency (return _cache[url] instead of data to ensure single source of truth)
with _cache_lock:
if url not in _cache:
_cache[url] = data
return _cache[url]

Check notice on line 1521 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1312-L1521

Complex Method


def check_api_access(client: httpx.Client, profile_id: str) -> bool:
Expand Down Expand Up @@ -1560,125 +1584,125 @@
return {}


def verify_access_and_get_folders(
client: httpx.Client, profile_id: str
) -> Optional[Dict[str, str]]:
"""Combine access check and folder listing into a single API request.

Returns:
Dict of {folder_name: folder_id} on success.
None if access is denied or the request fails after retries.
"""
url = f"{API_BASE}/{profile_id}/groups"

for attempt in range(MAX_RETRIES):
try:
resp = client.get(url)
resp.raise_for_status()

try:
data = resp.json()

# Ensure we got the expected top-level JSON structure.
# We defensively validate types here so that unexpected but valid
# JSON (e.g., a list or a scalar) doesn't cause AttributeError/TypeError
# and cause the operation to fail unexpectedly.
if not isinstance(data, dict):
log.error(
"Failed to parse folders data: expected JSON object at top level, "
f"got {type(data).__name__}"
)
return None

body = data.get("body")
if not isinstance(body, dict):
log.error(
"Failed to parse folders data: expected 'body' to be an object, "
f"got {type(body).__name__ if body is not None else 'None'}"
)
return None

folders = body.get("groups", [])
if not isinstance(folders, list):
log.error(
"Failed to parse folders data: expected 'body[\"groups\"]' to be a list, "
f"got {type(folders).__name__}"
)
return None

# Only process entries that are dicts and have the required keys.
result: Dict[str, str] = {}
for f in folders:
if not isinstance(f, dict):
# Skip non-dict entries instead of crashing; this protects
# against partial data corruption or unexpected API changes.
continue
name = f.get("group")
pk = f.get("PK")
# Skip entries with empty or None values for required fields
if not name or not pk:
continue

pk_str = str(pk)
if not validate_folder_id(pk_str):
continue

result[str(name).strip()] = pk_str

return result
except (ValueError, TypeError, AttributeError) as err:
# As a final safeguard, catch any remaining parsing/shape errors so
# that a malformed response cannot crash the caller.
log.error("Failed to parse folders data: %s", sanitize_for_log(err))
return None

except httpx.HTTPStatusError as e:
code = e.response.status_code
if code in (401, 403, 404):
if code == 401:
log.critical(
f"{Colors.FAIL}❌ Authentication Failed: The API Token is invalid.{Colors.ENDC}"
)
log.critical(
f"{Colors.FAIL} Please check your token at: https://controld.com/account/manage-account{Colors.ENDC}"
)
elif code == 403:
log.critical(
"%s🚫 Access Denied: Token lacks permission for "
"Profile %s.%s",
Colors.FAIL,
sanitize_for_log(profile_id),
Colors.ENDC,
)
elif code == 404:
log.critical(
f"{Colors.FAIL}🔍 Profile Not Found: The ID '{sanitize_for_log(profile_id)}' does not exist.{Colors.ENDC}"
)
log.critical(
f"{Colors.FAIL} Please verify the Profile ID from your Control D Dashboard URL.{Colors.ENDC}"
)
return None

if attempt == MAX_RETRIES - 1:
log.error(f"API Request Failed ({code}): {sanitize_for_log(e)}")
return None

except httpx.RequestError as err:
if attempt == MAX_RETRIES - 1:
log.error(
"Network error during access verification: %s",
sanitize_for_log(err),
)
return None

wait_time = RETRY_DELAY * (2**attempt)
log.warning(
"Request failed (attempt %d/%d). Retrying in %ds...",
attempt + 1,
MAX_RETRIES,
wait_time,
)
time.sleep(wait_time)

Check notice on line 1705 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1587-L1705

Complex Method


def get_all_existing_rules(
Expand Down Expand Up @@ -1788,6 +1812,11 @@
# OPTIMIZATION: Combine validation (DNS) and fetching (HTTP) in one task
# to allow validation latency to be parallelized.
def _validate_and_fetch(url: str):
# Optimization: Skip DNS validation if cache is fresh
# This saves blocking I/O for known-good content
if _is_cache_fresh(url):
return _gh_get(url)
Comment on lines +1817 to +1818

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

security-high high

While this optimization aims to improve startup performance by skipping DNS validation if the cache is fresh, it introduces a high-severity Server-Side Request Forgery (SSRF) vulnerability. The _is_cache_fresh function trusts the last_validated timestamp from a disk-based cache file (blocklists.json) to bypass the validate_folder_url security check. An attacker with local file system access could tamper with this cache file, leading to the application making requests to attacker-specified URLs. It is critical that validation is not skipped based on a potentially user-controllable cache file.


if validate_folder_url(url):
return _gh_get(url)
Comment on lines +1815 to 1821
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warm_up_cache currently warms the in-memory cache via _gh_get(), which does not validate the folder JSON schema. Since sync_profile’s _fetch_if_valid path returns cached entries directly (bypassing fetch_folder_data/validate_folder_data), a malformed disk cache entry (or unexpected remote JSON) can be propagated into the sync without structural validation. Consider warming using fetch_folder_data(url) (or explicitly calling validate_folder_data on the returned JSON) so cached entries are guaranteed to be schema-valid before sync_profile reuses them.

Suggested change
# Optimization: Skip DNS validation if cache is fresh
# This saves blocking I/O for known-good content
if _is_cache_fresh(url):
return _gh_get(url)
if validate_folder_url(url):
return _gh_get(url)
# Optimization: Skip DNS validation if cache is fresh.
# We still route through fetch_folder_data so that:
# - Cached entries are loaded via the same path as normal sync.
# - Schema validation (via validate_folder_data) is consistently applied
# whether data comes from disk cache or the network.
if _is_cache_fresh(url):
# This should load and validate from cache without forcing a re-download.
# Any validation errors will surface as exceptions and be logged by the
# outer warm_up_cache loop.
fetch_folder_data(url)
return None
# For non-fresh entries, validate the URL (DNS/format) first to avoid
# unnecessary network calls, then fetch+validate the folder data.
if validate_folder_url(url):
fetch_folder_data(url)

Copilot uses AI. Check for mistakes.
return None
Comment on lines +1815 to 1822
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

warm_up_cache now skips validate_folder_url entirely when _is_cache_fresh(url) is true. Since load_disk_cache only sanitizes structure (it does not validate that cache keys are https URLs with a hostname), a corrupted/hand-edited disk cache could cause an unsafe/non-https URL to be accepted from cache without any URL-level validation. Consider still performing lightweight URL/scheme/hostname validation (without DNS) before returning _gh_get(url), or splitting validate_folder_url into a cheap parse/scheme check plus an optional DNS-resolution check so only the DNS part is skipped.

Copilot uses AI. Check for mistakes.
Expand All @@ -1807,7 +1836,7 @@
future.result()
except Exception as e:
if USE_COLORS:
# Clear line to print warning cleanly

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
sys.stderr.write("\r\033[K")
sys.stderr.flush()

Expand Down Expand Up @@ -1848,273 +1877,273 @@
return False


def create_folder(
client: httpx.Client, profile_id: str, name: str, do: int, status: int
) -> Optional[str]:
"""
Create a new folder and return its ID.
Attempts to read ID from response first, then falls back to polling.
"""
try:
# 1. Send the Create Request
response = _api_post(
client,
f"{API_BASE}/{profile_id}/groups",
data={"name": name, "do": do, "status": status},
)

# OPTIMIZATION: Try to grab ID directly from response to avoid the wait loop
try:
resp_data = response.json()
body = resp_data.get("body", {})

# Check if it returned a single group object
if isinstance(body, dict) and "group" in body and "PK" in body["group"]:
pk = str(body["group"]["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
return None
log.info(
"Created folder %s (ID %s) [Direct]",
sanitize_for_log(name),
sanitize_for_log(pk),
)
return pk

# Check if it returned a list containing our group
if isinstance(body, dict) and "groups" in body:
for grp in body["groups"]:
if grp.get("group") == name:
pk = str(grp["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
continue
log.info(
"Created folder %s (ID %s) [Direct]",
sanitize_for_log(name),
sanitize_for_log(pk),
)
return pk
except Exception as e:
log.debug(
f"Could not extract ID from POST response: " f"{sanitize_for_log(e)}"
)

# 2. Fallback: Poll for the new folder (The Robust Retry Logic)
for attempt in range(MAX_RETRIES + 1):
try:
data = _api_get(client, f"{API_BASE}/{profile_id}/groups").json()
groups = data.get("body", {}).get("groups", [])

for grp in groups:
if grp["group"].strip() == name.strip():
pk = str(grp["PK"])
if not validate_folder_id(pk, log_errors=False):
log.error(f"API returned invalid folder ID: {sanitize_for_log(pk)}")
return None
log.info(
"Created folder %s (ID %s) [Polled]",
sanitize_for_log(name),
sanitize_for_log(pk),
)
return pk
except Exception as e:
log.warning(
f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}"
)

if attempt < MAX_RETRIES:
wait_time = FOLDER_CREATION_DELAY * (attempt + 1)
log.info(
f"Folder '{sanitize_for_log(name)}' not found yet. Retrying in {wait_time}s..."
)
time.sleep(wait_time)

log.error(
f"Folder {sanitize_for_log(name)} was not found after creation and retries."
)
return None

except (httpx.HTTPError, KeyError) as e:
log.error(
f"Failed to create folder {sanitize_for_log(name)}: {sanitize_for_log(e)}"
)
return None

Check notice on line 1971 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1880-L1971

Complex Method


def push_rules(
profile_id: str,
folder_name: str,
folder_id: str,
do: int,
status: int,
hostnames: List[str],
existing_rules: Set[str],
client: httpx.Client,
batch_executor: Optional[concurrent.futures.Executor] = None,
) -> bool:
"""
Pushes rules to a folder in batches, filtering duplicates and invalid rules.

Deduplicates input, validates rules against RULE_PATTERN, and sends batches
in parallel for optimal performance. Updates existing_rules set with newly
added rules. Returns True if all batches succeed.
"""
if not hostnames:
log.info("Folder %s - no rules to push", sanitize_for_log(folder_name))
return True

original_count = len(hostnames)

# Optimization 1: Deduplicate input list while preserving order using dict.fromkeys()
# This is significantly faster than using a 'seen' set in the loop for large lists.
# It also naturally deduplicates invalid rules, preventing log spam.
unique_hostnames = dict.fromkeys(hostnames)

filtered_hostnames = []
skipped_unsafe = 0

# Optimization 2: Inline regex match and check existence
# Using a local reference to the match method avoids function call overhead
# in the hot loop. This provides a measurable speedup for large lists.
match_rule = RULE_PATTERN.match

for h in unique_hostnames:
if h in existing_rules:
continue

if not match_rule(h):
log.warning(
f"Skipping unsafe rule in {sanitize_for_log(folder_name)}: {sanitize_for_log(h)}"
)
skipped_unsafe += 1
continue

filtered_hostnames.append(h)

if skipped_unsafe > 0:
log.warning(
f"Folder {sanitize_for_log(folder_name)}: skipped {skipped_unsafe} unsafe rules"
)

duplicates_count = original_count - len(filtered_hostnames) - skipped_unsafe

if duplicates_count > 0:
log.info(
f"Folder {sanitize_for_log(folder_name)}: skipping {duplicates_count} duplicate rules"
)

if not filtered_hostnames:
log.info(
f"Folder {sanitize_for_log(folder_name)} - no new rules to push after filtering duplicates"
)
return True

successful_batches = 0

# Prepare batches
batches = []
for start in range(0, len(filtered_hostnames), BATCH_SIZE):
batches.append(filtered_hostnames[start : start + BATCH_SIZE])

total_batches = len(batches)

# Optimization: Hoist loop invariants to avoid redundant computations
str_do = str(do)
str_status = str(status)
str_group = str(folder_id)
sanitized_folder_name = sanitize_for_log(folder_name)
progress_label = f"Folder {sanitized_folder_name}"

def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]:
"""Processes a single batch of rules by sending API request."""
data = {
"do": str_do,
"status": str_status,
"group": str_group,
}
# Optimization: Use pre-calculated keys and zip for faster dict update
# strict=False is intentional: batch_data may be shorter than BATCH_KEYS for final batch
data.update(zip(BATCH_KEYS, batch_data, strict=False))

try:
_api_post_form(client, f"{API_BASE}/{profile_id}/rules", data=data)
if not USE_COLORS:
log.info(
"Folder %s – batch %d: added %d rules",
sanitized_folder_name,
batch_idx,
len(batch_data),
)
return batch_data
except httpx.HTTPError as e:
if USE_COLORS:
sys.stderr.write("\n")
log.error(
f"Failed to push batch {batch_idx} for folder {sanitized_folder_name}: {sanitize_for_log(e)}"
)
if hasattr(e, "response") and e.response is not None:
log.debug(f"Response content: {sanitize_for_log(e.response.text)}")
return None

# Optimization 3: Parallelize batch processing
# Using 3 workers to speed up writes without hitting aggressive rate limits.
# If only 1 batch, run it synchronously to avoid ThreadPoolExecutor overhead.
if total_batches == 1:
result = process_batch(1, batches[0])
if result:
successful_batches += 1
existing_rules.update(result)

render_progress_bar(
successful_batches,
total_batches,
progress_label,
)
else:
# Use provided executor or create a local one (fallback)
if batch_executor:
executor_ctx = contextlib.nullcontext(batch_executor)
else:
executor_ctx = concurrent.futures.ThreadPoolExecutor(max_workers=3)

with executor_ctx as executor:
futures = {
executor.submit(process_batch, i, batch): i
for i, batch in enumerate(batches, 1)
}

for future in concurrent.futures.as_completed(futures):
result = future.result()
if result:
successful_batches += 1
existing_rules.update(result)

render_progress_bar(
successful_batches,
total_batches,
progress_label,
)

if successful_batches == total_batches:
if USE_COLORS:
sys.stderr.write(
f"\r\033[K{Colors.GREEN}✅ Folder {sanitize_for_log(folder_name)}: Finished ({len(filtered_hostnames):,} rules){Colors.ENDC}\n"
)
sys.stderr.flush()
else:
log.info(
f"Folder {sanitize_for_log(folder_name)} – finished ({len(filtered_hostnames):,} new rules added)"
)
return True
else:
log.error(
"Folder %s – only %d/%d batches succeeded",
sanitize_for_log(folder_name),
successful_batches,
total_batches,
)
return False

Check notice on line 2146 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L1974-L2146

Complex Method


def _process_single_folder(
Expand Down Expand Up @@ -2175,163 +2204,163 @@
# --------------------------------------------------------------------------- #
# 4. Main workflow
# --------------------------------------------------------------------------- #
def sync_profile(
profile_id: str,
folder_urls: Sequence[str],
dry_run: bool = False,
no_delete: bool = False,
plan_accumulator: Optional[List[Dict[str, Any]]] = None,
) -> bool:
"""
Synchronizes Control D folders from remote blocklist URLs.

Fetches folder data, optionally deletes existing folders with same names,
creates new folders, and pushes rules in batches. In dry-run mode, only
generates a plan without making API changes. Returns True if all folders
sync successfully.
"""
# SECURITY: Clear cached DNS validations at the start of each sync run.
# This prevents TOCTOU issues where a domain's IP could change between runs.
validate_folder_url.cache_clear()

try:
# Fetch all folder data first
folder_data_list = []

# OPTIMIZATION: Move validation inside the thread pool to parallelize DNS lookups.
# Previously, sequential validation blocked the main thread.
def _fetch_if_valid(url: str):
# Optimization: If we already have the content in cache, return it directly.
# The content was validated at the time of fetch (warm_up_cache).
# Read directly from cache to avoid calling fetch_folder_data while holding lock.
with _cache_lock:
if url in _cache:
return _cache[url]

if validate_folder_url(url):
return fetch_folder_data(url)
return None

with concurrent.futures.ThreadPoolExecutor() as executor:
future_to_url = {
executor.submit(_fetch_if_valid, url): url for url in folder_urls
}

for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
if result:
folder_data_list.append(result)
except (httpx.HTTPError, KeyError, ValueError) as e:
log.error(
f"Failed to fetch folder data from {sanitize_for_log(url)}: {sanitize_for_log(e)}"
)
continue

if not folder_data_list:
log.error("No valid folder data found")
return False

# Build plan entries
plan_entry = {"profile": profile_id, "folders": []}
for folder_data in folder_data_list:
grp = folder_data["group"]
name = grp["group"].strip()

if "rule_groups" in folder_data:
# Multi-action format
total_rules = sum(
len(rg.get("rules", [])) for rg in folder_data["rule_groups"]
)
plan_entry["folders"].append(
{
"name": name,
"rules": total_rules,
"rule_groups": [
{
"rules": len(rg.get("rules", [])),
"action": rg.get("action", {}).get("do"),
"status": rg.get("action", {}).get("status"),
}
for rg in folder_data["rule_groups"]
],
}
)
else:
# Legacy single-action format
hostnames = [
r["PK"] for r in folder_data.get("rules", []) if r.get("PK")
]
plan_entry["folders"].append(
{
"name": name,
"rules": len(hostnames),
"action": grp.get("action", {}).get("do"),
"status": grp.get("action", {}).get("status"),
}
)

if plan_accumulator is not None:
plan_accumulator.append(plan_entry)

if dry_run:
print_plan_details(plan_entry)
log.info("Dry-run complete: no API calls were made.")
return True

# Create new folders and push rules
success_count = 0

# CRITICAL FIX: Switch to Serial Processing (1 worker)
# This prevents API rate limits and ensures stability for large folders.
max_workers = 1

# Shared executor for rate-limited operations (DELETE, push_rules batches)
# Reusing this executor prevents thread churn and enforces global rate limits.
with concurrent.futures.ThreadPoolExecutor(
max_workers=DELETE_WORKERS
) as shared_executor, _api_client() as client:
# Verify access and list existing folders in one request
existing_folders = verify_access_and_get_folders(client, profile_id)
if existing_folders is None:
return False

if not no_delete:
deletion_occurred = False

# Identify folders to delete
folders_to_delete = []
for folder_data in folder_data_list:
name = folder_data["group"]["group"].strip()
if name in existing_folders:
folders_to_delete.append((name, existing_folders[name]))

if folders_to_delete:
# Parallel delete to speed up the "clean slate" phase
# Use shared_executor (3 workers)
future_to_name = {
shared_executor.submit(
delete_folder, client, profile_id, name, folder_id
): name
for name, folder_id in folders_to_delete
}

for future in concurrent.futures.as_completed(future_to_name):
name = future_to_name[future]
try:
if future.result():
del existing_folders[name]
deletion_occurred = True
except Exception as exc:
# Sanitize both name and exception to prevent log injection
log.error(
"Failed to delete folder %s: %s",
sanitize_for_log(name),
sanitize_for_log(exc),
)

# CRITICAL FIX: Increased wait time for massive folders to clear

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
if deletion_occurred:
if not USE_COLORS:
log.info(
Expand Down Expand Up @@ -2524,412 +2553,412 @@
return parser.parse_args()


def main():
"""
Main entry point for Control D Sync.

Loads environment configuration, validates inputs, warms up cache,
and syncs profiles. Supports interactive prompts for missing credentials
when running in a TTY. Prints summary statistics and exits with appropriate
status code.
"""
# SECURITY: Check .env permissions (after Colors is defined for NO_COLOR support)
# This must happen BEFORE load_dotenv() to prevent reading secrets from world-readable files
check_env_permissions()
load_dotenv()

global TOKEN
# Re-initialize TOKEN to pick up values from .env (since load_dotenv was delayed)
TOKEN = _clean_env_kv(os.getenv("TOKEN"), "TOKEN")

args = parse_args()

# Load persistent cache from disk (graceful degradation on any error)
# NOTE: Called only after successful argument parsing so that `--help` or
# argument errors do not perform unnecessary filesystem I/O or logging.
load_disk_cache()

# Handle --clear-cache: delete cache file and exit immediately
if args.clear_cache:
global _disk_cache
cache_file = get_cache_dir() / "blocklists.json"
if cache_file.exists():
try:
cache_file.unlink()
print(f"{Colors.GREEN}✓ Cleared blocklist cache: {cache_file}{Colors.ENDC}")
except OSError as e:
print(f"{Colors.FAIL}✗ Failed to clear cache: {e}{Colors.ENDC}")
exit(1)
else:
print(f"{Colors.CYAN}ℹ No cache file found, nothing to clear{Colors.ENDC}")
_disk_cache.clear()
exit(0)
profiles_arg = (
_clean_env_kv(args.profiles or os.getenv("PROFILE", ""), "PROFILE") or ""
)
profile_ids = [extract_profile_id(p) for p in profiles_arg.split(",") if p.strip()]
folder_urls = args.folder_url if args.folder_url else DEFAULT_FOLDER_URLS

# Interactive prompts for missing config
if not args.dry_run and sys.stdin.isatty():
if not profile_ids:
print(f"{Colors.CYAN}ℹ Profile ID is missing.{Colors.ENDC}")
print(
f"{Colors.CYAN} You can find this in the URL of your profile in the Control D Dashboard (or just paste the URL).{Colors.ENDC}"
)

def validate_profile_input(value: str) -> bool:
"""Validates one or more profile IDs from comma-separated input."""
ids = [extract_profile_id(p) for p in value.split(",") if p.strip()]
return bool(ids) and all(
validate_profile_id(pid, log_errors=False) for pid in ids
)

p_input = get_validated_input(
f"{Colors.BOLD}Enter Control D Profile ID:{Colors.ENDC} ",
validate_profile_input,
"Invalid ID(s) or URL(s). Must be a valid Profile ID or a Control D Profile URL. Comma-separate for multiple.",
)
profile_ids = [
extract_profile_id(p) for p in p_input.split(",") if p.strip()
]

if not TOKEN:
print(f"{Colors.CYAN}ℹ API Token is missing.{Colors.ENDC}")
print(
f"{Colors.CYAN} You can generate one at: https://controld.com/account/manage-account{Colors.ENDC}"
)

t_input = get_password(
f"{Colors.BOLD}Enter Control D API Token:{Colors.ENDC} ",
lambda x: len(x) > 8,
"Token seems too short. Please check your API token.",
)
TOKEN = t_input

if not profile_ids and not args.dry_run:
log.error(
"PROFILE missing and --dry-run not set. Provide --profiles or set PROFILE env."
)
exit(1)

if not TOKEN and not args.dry_run:
log.error("TOKEN missing and --dry-run not set. Set TOKEN env for live sync.")
exit(1)

warm_up_cache(folder_urls)

plan: List[Dict[str, Any]] = []
success_count = 0
sync_results = []

profile_id = "unknown"
start_time = time.time()

try:
for profile_id in profile_ids or ["dry-run-placeholder"]:
start_time = time.time()
# Skip validation for dry-run placeholder
if profile_id != "dry-run-placeholder" and not validate_profile_id(
profile_id
):
sync_results.append(
{
"profile": profile_id,
"folders": 0,
"rules": 0,
"status_label": "❌ Invalid Profile ID",
"success": False,
"duration": 0.0,
}
)
continue

log.info("Starting sync for profile %s", profile_id)
status = sync_profile(
profile_id,
folder_urls,
dry_run=args.dry_run,
no_delete=args.no_delete,
plan_accumulator=plan,
)
end_time = time.time()
duration = end_time - start_time

if status:
success_count += 1

# RESTORED STATS LOGIC: Calculate actual counts from the plan
entry = next((p for p in plan if p["profile"] == profile_id), None)
folder_count = len(entry["folders"]) if entry else 0
rule_count = sum(f["rules"] for f in entry["folders"]) if entry else 0

if args.dry_run:
status_text = "✅ Planned" if status else "❌ Failed (Dry)"
else:
status_text = "✅ Success" if status else "❌ Failed"

sync_results.append(
{
"profile": profile_id,
"folders": folder_count,
"rules": rule_count,
"status_label": status_text,
"success": status,
"duration": duration,
}
)
except KeyboardInterrupt:
duration = time.time() - start_time
print(
f"\n{Colors.WARNING}⚠️ Sync cancelled by user. Finishing current task...{Colors.ENDC}"
)

# Try to recover stats for the interrupted profile
entry = next((p for p in plan if p["profile"] == profile_id), None)
folder_count = len(entry["folders"]) if entry else 0
rule_count = sum(f["rules"] for f in entry["folders"]) if entry else 0

sync_results.append(
{
"profile": profile_id,
"folders": folder_count,
"rules": rule_count,
"status_label": "⛔ Cancelled",
"success": False,
"duration": duration,
}
)

if args.plan_json:
with open(args.plan_json, "w", encoding="utf-8") as f:
json.dump(plan, f, indent=2)
log.info("Plan written to %s", args.plan_json)

# Print Summary Table
# Determine the width for the Profile ID column (min 25)
max_profile_len = max((len(r["profile"]) for r in sync_results), default=25)
profile_col_width = max(25, max_profile_len)

# Column widths
w_profile = profile_col_width
w_folders = 10
w_rules = 12
w_duration = 10
w_status = 15

def make_col_separator(left, mid, right, horiz):
parts = [
horiz * (w_profile + 2),
horiz * (w_folders + 2),
horiz * (w_rules + 2),
horiz * (w_duration + 2),
horiz * (w_status + 2),
]
return left + mid.join(parts) + right

# Calculate table width using a dummy separator
dummy_sep = make_col_separator(Box.TL, Box.T, Box.TR, Box.H)
table_width = len(dummy_sep)

title_text = " DRY RUN SUMMARY " if args.dry_run else " SYNC SUMMARY "
title_color = Colors.CYAN if args.dry_run else Colors.HEADER

# Top Border (Single Cell for Title)
print("\n" + Box.TL + Box.H * (table_width - 2) + Box.TR)

# Title Row
visible_title = title_text.strip()
inner_width = table_width - 2
pad_left = (inner_width - len(visible_title)) // 2
pad_right = inner_width - len(visible_title) - pad_left
print(
f"{Box.V}{' ' * pad_left}{title_color}{visible_title}{Colors.ENDC}{' ' * pad_right}{Box.V}"
)

# Separator between Title and Headers (introduces columns)
print(make_col_separator(Box.L, Box.T, Box.R, Box.H))

# Header Row
print(
f"{Box.V} {Colors.BOLD}{'Profile ID':<{w_profile}}{Colors.ENDC} "
f"{Box.V} {Colors.BOLD}{'Folders':>{w_folders}}{Colors.ENDC} "
f"{Box.V} {Colors.BOLD}{'Rules':>{w_rules}}{Colors.ENDC} "
f"{Box.V} {Colors.BOLD}{'Duration':>{w_duration}}{Colors.ENDC} "
f"{Box.V} {Colors.BOLD}{'Status':<{w_status}}{Colors.ENDC} {Box.V}"
)

# Separator between Header and Body
print(make_col_separator(Box.L, Box.X, Box.R, Box.H))

# Rows
total_folders = 0
total_rules = 0
total_duration = 0.0

for res in sync_results:
# Use boolean success field for color logic
status_color = Colors.GREEN if res["success"] else Colors.FAIL

s_folders = f"{res['folders']:,}"
s_rules = f"{res['rules']:,}"
s_duration = f"{res['duration']:.1f}s"

print(
f"{Box.V} {res['profile']:<{w_profile}} "
f"{Box.V} {s_folders:>{w_folders}} "
f"{Box.V} {s_rules:>{w_rules}} "
f"{Box.V} {s_duration:>{w_duration}} "
f"{Box.V} {status_color}{res['status_label']:<{w_status}}{Colors.ENDC} {Box.V}"
)
total_folders += res["folders"]
total_rules += res["rules"]
total_duration += res["duration"]

# Separator between Body and Total
print(make_col_separator(Box.L, Box.X, Box.R, Box.H))

# Total Row
total = len(profile_ids or ["dry-run-placeholder"])
all_success = success_count == total

if args.dry_run:
if all_success:
total_status_text = "✅ Ready"
else:
total_status_text = "❌ Errors"
else:
if all_success:
total_status_text = "✅ All Good"
else:
total_status_text = "❌ Errors"

total_status_color = Colors.GREEN if all_success else Colors.FAIL

s_total_folders = f"{total_folders:,}"
s_total_rules = f"{total_rules:,}"
s_total_duration = f"{total_duration:.1f}s"

print(
f"{Box.V} {Colors.BOLD}{'TOTAL':<{w_profile}}{Colors.ENDC} "
f"{Box.V} {s_total_folders:>{w_folders}} "
f"{Box.V} {s_total_rules:>{w_rules}} "
f"{Box.V} {s_total_duration:>{w_duration}} "
f"{Box.V} {total_status_color}{total_status_text:<{w_status}}{Colors.ENDC} {Box.V}"
)
# Bottom Border
print(make_col_separator(Box.BL, Box.B, Box.BR, Box.H))

# Success Delight
if all_success and not args.dry_run:
print_success_message(profile_ids)

# Dry Run Next Steps
if args.dry_run:
print() # Spacer
if all_success:
# Build the suggested command once so it stays consistent between
# color and non-color output modes.
cmd_parts = ["python", "main.py"]
if profile_ids:
# Join multiple profiles if needed
p_str = ",".join(profile_ids)
else:
p_str = "<your-profile-id>"
cmd_parts.append(f"--profiles {p_str}")

# Reconstruct other args if they were used (optional but helpful)
if args.folder_url:
for url in args.folder_url:
cmd_parts.append(f"--folder-url {url}")

cmd_str = " ".join(cmd_parts)

if USE_COLORS:
print(f"{Colors.BOLD}👉 Ready to sync? Run the following command:{Colors.ENDC}")
print(f" {Colors.CYAN}{cmd_str}{Colors.ENDC}")
else:
print("👉 Ready to sync? Run the following command:")
print(f" {cmd_str}")

# Offer interactive restart if appropriate
prompt_for_interactive_restart(profile_ids)

else:
if USE_COLORS:
print(
f"{Colors.FAIL}⚠️ Dry run encountered errors. Please check the logs above.{Colors.ENDC}"
)
else:
print("⚠️ Dry run encountered errors. Please check the logs above.")

# Display API statistics
total_api_calls = _api_stats["control_d_api_calls"] + _api_stats["blocklist_fetches"]
if total_api_calls > 0:
print(f"{Colors.BOLD}API Statistics:{Colors.ENDC}")
print(f" • Control D API calls: {_api_stats['control_d_api_calls']:>7,}")
print(f" • Blocklist fetches: {_api_stats['blocklist_fetches']:>7,}")
print(f" • Total API requests: {total_api_calls:>7,}")
print()

# Display cache statistics if any cache activity occurred
if _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"] > 0:
print(f"{Colors.BOLD}Cache Statistics:{Colors.ENDC}")
print(f" • Hits (in-memory): {_cache_stats['hits']:>7,}")
print(f" • Misses (downloaded): {_cache_stats['misses']:>7,}")
print(f" • Validations (304): {_cache_stats['validations']:>7,}")
if _cache_stats["errors"] > 0:
print(f" • Errors (non-fatal): {_cache_stats['errors']:>7,}")

# Calculate cache effectiveness
total_requests = _cache_stats["hits"] + _cache_stats["misses"] + _cache_stats["validations"]
if total_requests > 0:
# Hits + validations = avoided full downloads
cache_effectiveness = (_cache_stats["hits"] + _cache_stats["validations"]) / total_requests * 100
print(f" • Cache effectiveness: {cache_effectiveness:>6.1f}%")
print()

# Display rate limit information if available
with _rate_limit_lock:
if any(v is not None for v in _rate_limit_info.values()):
print(f"{Colors.BOLD}API Rate Limit Status:{Colors.ENDC}")

if _rate_limit_info["limit"] is not None:
print(f" • Requests limit: {_rate_limit_info['limit']:>6,}")

if _rate_limit_info["remaining"] is not None:
remaining = _rate_limit_info["remaining"]
limit = _rate_limit_info["limit"]

# Color code based on remaining capacity
if limit and limit > 0:
pct = (remaining / limit) * 100
if pct < 20:
color = Colors.FAIL # Red for critical
elif pct < 50:
color = Colors.WARNING # Yellow for caution
else:
color = Colors.GREEN # Green for healthy
print(f" • Requests remaining: {color}{remaining:>6,} ({pct:>5.1f}%){Colors.ENDC}")
else:
print(f" • Requests remaining: {remaining:>6,}")

if _rate_limit_info["reset"] is not None:
reset_time = time.strftime(
"%H:%M:%S",
time.localtime(_rate_limit_info["reset"])
)
print(f" • Limit resets at: {reset_time}")

print()

# Save cache to disk after successful sync (non-fatal if it fails)
if not args.dry_run:
save_disk_cache()

total = len(profile_ids or ["dry-run-placeholder"])
log.info(f"All profiles processed: {success_count}/{total} successful")
exit(0 if success_count == total else 1)

Check notice on line 2961 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L2556-L2961

Complex Method


if __name__ == "__main__":
Expand Down
118 changes: 118 additions & 0 deletions tests/test_warm_up_cache_perf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
"""
Tests for the warm_up_cache optimization.

This module verifies that warm_up_cache skips DNS validation when the
URL is already fresh in the disk cache.
"""
import unittest
from unittest.mock import patch, MagicMock

Check warning

Code scanning / Prospector (reported by Codacy)

Unused MagicMock imported from unittest.mock (unused-import) Warning test

Unused MagicMock imported from unittest.mock (unused-import)

Check notice

Code scanning / Pylint (reported by Codacy)

Unused MagicMock imported from unittest.mock Note test

Unused MagicMock imported from unittest.mock

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Unused MagicMock imported from unittest.mock Note test

Unused MagicMock imported from unittest.mock
Copy link

Copilot AI Feb 22, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tests/test_warm_up_cache_perf.py imports MagicMock but never uses it, which adds noise and can hide real unused-import issues. Remove the unused import to keep the test focused.

Suggested change
from unittest.mock import patch, MagicMock
from unittest.mock import patch

Copilot uses AI. Check for mistakes.
import sys
import os
import time

# Add root to path to import main
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

# We do NOT import main at top level to avoid holding a stale reference
# if other tests reload the module.

class TestWarmUpCachePerf(unittest.TestCase):

Check warning

Code scanning / Pylint (reported by Codacy)

Missing class docstring Warning test

Missing class docstring

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Missing class docstring Warning test

Missing class docstring
def setUp(self):
"""Reset cache state before each test."""
# Get the current main module from sys.modules
# If not present, import it
if 'main' in sys.modules:
self.main = sys.modules['main']
else:
import main

Check warning

Code scanning / Prospector (reported by Codacy)

Import "import main" should be placed at the top of the module (wrong-import-position) Warning test

Import "import main" should be placed at the top of the module (wrong-import-position)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Import outside toplevel (main) Warning test

Import outside toplevel (main)
self.main = main

self.main._cache.clear()

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _cache of a client class (protected-access) Warning test

Access to a protected member _cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class
self.main._disk_cache.clear()

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _disk_cache of a client class (protected-access) Warning test

Access to a protected member _disk_cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class
self.main.validate_folder_url.cache_clear()

def tearDown(self):
"""Clean up after each test."""
self.main._cache.clear()

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _cache of a client class (protected-access) Warning test

Access to a protected member _cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _cache of a client class Note test

Access to a protected member _cache of a client class
self.main._disk_cache.clear()

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _disk_cache of a client class (protected-access) Warning test

Access to a protected member _disk_cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class
self.main.validate_folder_url.cache_clear()

def test_warm_up_skips_validation_for_fresh_cache(self):
"""
Test that warm_up_cache does NOT call validate_folder_url
if the URL is present in _disk_cache and fresh (within TTL).
"""
test_url = "https://example.com/fresh.json"
test_data = {"group": {"group": "Fresh Folder"}, "rules": []}

# Populate disk cache with a FRESH entry
with self.main._cache_lock:

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _cache_lock of a client class (protected-access) Warning test

Access to a protected member _cache_lock of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _cache_lock of a client class Note test

Access to a protected member _cache_lock of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _cache_lock of a client class Note test

Access to a protected member _cache_lock of a client class
self.main._disk_cache[test_url] = {

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _disk_cache of a client class (protected-access) Warning test

Access to a protected member _disk_cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class
"data": test_data,
"fetched_at": time.time(),
"last_validated": time.time(), # Just now
"etag": "123",
"last_modified": "Tue, 15 Nov 1994 12:45:26 GMT"
}

# Mock validate_folder_url to track calls
with patch('main.validate_folder_url') as mock_validate:
# Mock _gh_get to return data without network
with patch('main._gh_get', return_value=test_data) as mock_gh_get:

self.main.warm_up_cache([test_url])

# VERIFICATION: validate_folder_url should NOT be called
mock_validate.assert_not_called()

# _gh_get should still be called (it handles cache retrieval)
mock_gh_get.assert_called_with(test_url)

def test_warm_up_calls_validation_for_stale_cache(self):
"""
Test that warm_up_cache DOES call validate_folder_url
if the URL is present in _disk_cache but STALE (expired TTL).
"""
test_url = "https://example.com/stale.json"
test_data = {"group": {"group": "Stale Folder"}, "rules": []}

# Populate disk cache with a STALE entry (older than TTL)
stale_time = time.time() - (self.main.CACHE_TTL_SECONDS + 3600) # 1 hour past TTL
with self.main._cache_lock:

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _cache_lock of a client class (protected-access) Warning test

Access to a protected member _cache_lock of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _cache_lock of a client class Note test

Access to a protected member _cache_lock of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _cache_lock of a client class Note test

Access to a protected member _cache_lock of a client class
self.main._disk_cache[test_url] = {

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _disk_cache of a client class (protected-access) Warning test

Access to a protected member _disk_cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class
"data": test_data,
"fetched_at": stale_time,
"last_validated": stale_time,
"etag": "123",
"last_modified": "Tue, 15 Nov 1994 12:45:26 GMT"
}

with patch('main.validate_folder_url', return_value=True) as mock_validate:
with patch('main._gh_get', return_value=test_data):

self.main.warm_up_cache([test_url])

# VERIFICATION: validate_folder_url SHOULD be called for stale entry
mock_validate.assert_called_with(test_url)

def test_warm_up_calls_validation_for_missing_cache(self):
"""
Test that warm_up_cache DOES call validate_folder_url
if the URL is NOT present in _disk_cache.
"""
test_url = "https://example.com/missing.json"
test_data = {"group": {"group": "Missing Folder"}, "rules": []}

# Ensure cache is empty
self.assertNotIn(test_url, self.main._disk_cache)

Check warning

Code scanning / Prospector (reported by Codacy)

Access to a protected member _disk_cache of a client class (protected-access) Warning test

Access to a protected member _disk_cache of a client class (protected-access)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _disk_cache of a client class Note test

Access to a protected member _disk_cache of a client class

with patch('main.validate_folder_url', return_value=True) as mock_validate:
with patch('main._gh_get', return_value=test_data):

self.main.warm_up_cache([test_url])

# VERIFICATION: validate_folder_url SHOULD be called for missing entry
mock_validate.assert_called_with(test_url)

if __name__ == '__main__':

Check warning

Code scanning / Prospector (reported by Codacy)

expected 2 blank lines after class or function definition, found 1 (E305) Warning test

expected 2 blank lines after class or function definition, found 1 (E305)
unittest.main()
Comment on lines +1 to +118

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

This new test file effectively verifies the cache warm-up optimization. It covers fresh, stale, and missing cache scenarios, ensuring that validate_folder_url is called only when necessary. This is a good addition to the test suite, improving confidence in the new caching logic.

Loading