Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 47 additions & 20 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,13 +263,27 @@
print(" No folders to sync.")
return

for folder in sorted(folders, key=lambda f: f.get("name", "")):
# Calculate max width for alignment
max_name_len = max(
# Use the same default ("Unknown") as when printing, so alignment is accurate
(len(sanitize_for_log(f.get("name", "Unknown"))) for f in folders),
default=0,
)
max_rules_len = max((len(f"{f.get('rules', 0):,}") for f in folders), default=0)

for folder in sorted(folders, key=lambda f: f.get("name", "Unknown")):
name = sanitize_for_log(folder.get("name", "Unknown"))
rules_count = folder.get("rules", 0)
formatted_rules = f"{rules_count:,}"

if USE_COLORS:
print(f" • {Colors.BOLD}{name}{Colors.ENDC}: {rules_count} rules")
print(
f" • {Colors.BOLD}{name:<{max_name_len}}{Colors.ENDC} : {formatted_rules:>{max_rules_len}} rules"
)
else:
print(f" - {name}: {rules_count} rules")
print(
f" - {name:<{max_name_len}} : {formatted_rules:>{max_rules_len}} rules"
)

print("")

Expand Down Expand Up @@ -343,15 +357,34 @@
prompt: str,
validator: Callable[[str], bool],
error_msg: str,
is_password: bool = False,
) -> str:
"""Prompts for input until the validator returns True."""
while True:
try:
if is_password:
value = getpass.getpass(prompt).strip()
else:
value = input(prompt).strip()
value = input(prompt).strip()
except (KeyboardInterrupt, EOFError):
print(f"\n{Colors.WARNING}⚠️ Input cancelled.{Colors.ENDC}")
sys.exit(130)

if not value:
print(f"{Colors.FAIL}❌ Value cannot be empty{Colors.ENDC}")
continue

if validator(value):
return value

print(f"{Colors.FAIL}❌ {error_msg}{Colors.ENDC}")


def get_password(
prompt: str,
validator: Callable[[str], bool],
error_msg: str,
) -> str:
"""Prompts for password input until the validator returns True."""
while True:
try:
value = getpass.getpass(prompt).strip()
except (KeyboardInterrupt, EOFError):
print(f"\n{Colors.WARNING}⚠️ Input cancelled.{Colors.ENDC}")
sys.exit(130)
Expand Down Expand Up @@ -807,192 +840,192 @@
time.sleep(wait_time)


def _read_and_cache_json(r, url: str) -> Dict:
    """
    Consume a streamed HTTP response with size limits, parse it as JSON,
    and persist the result (plus cache validators) to the disk cache.

    Args:
        r: An open httpx streaming response (not yet read).
        url: The request URL, used for logging and as the disk-cache key.

    Returns:
        The parsed JSON payload.

    Raises:
        httpx.HTTPStatusError: For non-2xx responses.
        ValueError: If the body exceeds MAX_RESPONSE_SIZE or is not valid JSON.
    """
    r.raise_for_status()

    # 1. Fast-fail on the declared size if a Content-Length header is present.
    cl = r.headers.get("Content-Length")
    if cl:
        try:
            if int(cl) > MAX_RESPONSE_SIZE:
                raise ValueError(
                    f"Response too large from {sanitize_for_log(url)} "
                    f"({int(cl) / (1024 * 1024):.2f} MB)"
                )
        except ValueError as e:
            # Only swallow the int() conversion error; let the size error propagate.
            if "Response too large" in str(e):
                raise
            log.warning(
                f"Malformed Content-Length header from {sanitize_for_log(url)}: {cl!r}. "
                "Falling back to streaming size check."
            )

    # 2. Stream the body and enforce the limit on the actual byte count
    # (Content-Length may be absent or wrong).
    chunks = []
    current_size = 0
    for chunk in r.iter_bytes():
        current_size += len(chunk)
        if current_size > MAX_RESPONSE_SIZE:
            raise ValueError(
                f"Response too large from {sanitize_for_log(url)} "
                f"(> {MAX_RESPONSE_SIZE / (1024 * 1024):.2f} MB)"
            )
        chunks.append(chunk)

    try:
        data = json.loads(b"".join(chunks))
    except json.JSONDecodeError as e:
        raise ValueError(
            f"Invalid JSON response from {sanitize_for_log(url)}"
        ) from e

    # Store cache headers for future conditional requests.
    # ETag is preferred over Last-Modified (more reliable).
    _disk_cache[url] = {
        "data": data,
        "etag": r.headers.get("ETag"),
        "last_modified": r.headers.get("Last-Modified"),
        "fetched_at": time.time(),
        "last_validated": time.time(),
    }
    return data


def _gh_get(url: str) -> Dict:
    """
    Fetch blocklist data from URL with HTTP cache header support.

    CACHING STRATEGY:
    1. Check in-memory cache first (fastest)
    2. Check disk cache and send conditional request (If-None-Match/If-Modified-Since)
    3. If 304 Not Modified: reuse cached data (cache validation)
    4. If 200 OK: download new data and update cache

    SECURITY: Validates data structure regardless of cache source
    """
    global _cache_stats

    # First check: quick in-memory lookup without holding the lock for long.
    with _cache_lock:
        if url in _cache:
            _cache_stats["hits"] += 1
            return _cache[url]

    # Build conditional-request headers from the disk cache, if any.
    # Server returns 304 if content hasn't changed.
    headers = {}
    cached_entry = _disk_cache.get(url)
    if cached_entry:
        # NOTE: Cached values may be None if the server didn't send these headers.
        # httpx requires header values to be str/bytes, so we only add headers
        # when the cached value is truthy.
        etag = cached_entry.get("etag")
        if etag:
            headers["If-None-Match"] = etag
        last_modified = cached_entry.get("last_modified")
        if last_modified:
            headers["If-Modified-Since"] = last_modified

    # Fetch data (or validate cache). HTTP errors propagate to the caller.
    with _gh.stream("GET", url, headers=headers) as r:
        # Handle 304 Not Modified - cached data is still valid.
        if r.status_code == 304:
            if cached_entry and "data" in cached_entry:
                log.debug(f"Cache validated (304) for {sanitize_for_log(url)}")
                _cache_stats["validations"] += 1

                # Update in-memory cache with the validated data.
                data = cached_entry["data"]
                with _cache_lock:
                    _cache[url] = data

                # Update timestamp in disk cache to track last validation.
                cached_entry["last_validated"] = time.time()
                return data

            # Shouldn't happen, but handle gracefully: close the original
            # streaming response and retry WITHOUT conditional headers, still
            # streaming so MAX_RESPONSE_SIZE and related protections apply.
            log.warning(
                f"Got 304 but no cached data for {sanitize_for_log(url)}, re-fetching"
            )
            _cache_stats["errors"] += 1
            r.close()
            with _gh.stream("GET", url) as r_retry:
                data = _read_and_cache_json(r_retry, url)
        else:
            data = _read_and_cache_json(r, url)

    _cache_stats["misses"] += 1

    # Double-checked locking: if another thread cached the URL while we were
    # fetching, return the cached object instead of our copy so there is a
    # single source of truth.
    with _cache_lock:
        if url not in _cache:
            _cache[url] = data
        return _cache[url]

Check notice on line 1028 in main.py

View check run for this annotation

codefactor.io / CodeFactor

main.py#L843-L1028

Complex Method


def check_api_access(client: httpx.Client, profile_id: str) -> bool:
Expand Down Expand Up @@ -1224,7 +1257,7 @@
f"Failed to fetch rules for folder ID {folder_id}: {sanitize_for_log(e)}"
)

log.info(f"Total existing rules across all folders: {len(all_rules)}")
log.info(f"Total existing rules across all folders: {len(all_rules):,}")
return all_rules
except Exception as e:
log.error(f"Failed to get existing rules: {sanitize_for_log(e)}")
Expand All @@ -1247,7 +1280,7 @@

total = len(urls_to_process)
if not USE_COLORS:
log.info(f"Warming up cache for {total} URLs...")
log.info(f"Warming up cache for {total:,} URLs...")

# OPTIMIZATION: Combine validation (DNS) and fetching (HTTP) in one task
# to allow validation latency to be parallelized.
Expand Down Expand Up @@ -1463,10 +1496,7 @@
_api_post_form(client, f"{API_BASE}/{profile_id}/rules", data=data)
if not USE_COLORS:
log.info(
"Folder %s – batch %d: added %d rules",
sanitize_for_log(folder_name),
batch_idx,
len(batch_data),
f"Folder {sanitize_for_log(folder_name)} – batch {batch_idx}: added {len(batch_data):,} rules"
)
return batch_data
except httpx.HTTPError as e:
Expand Down Expand Up @@ -1515,14 +1545,12 @@
if successful_batches == total_batches:
if USE_COLORS:
sys.stderr.write(
f"\r\033[K{Colors.GREEN}✅ Folder {sanitize_for_log(folder_name)}: Finished ({len(filtered_hostnames)} rules){Colors.ENDC}\n"
f"\r\033[K{Colors.GREEN}✅ Folder {sanitize_for_log(folder_name)}: Finished ({len(filtered_hostnames):,} rules){Colors.ENDC}\n"
)
sys.stderr.flush()
else:
log.info(
"Folder %s – finished (%d new rules added)",
sanitize_for_log(folder_name),
len(filtered_hostnames),
f"Folder {sanitize_for_log(folder_name)} – finished ({len(filtered_hostnames):,} new rules added)"
)
return True
else:
Expand Down Expand Up @@ -1854,11 +1882,10 @@
f"{Colors.CYAN} You can generate one at: https://controld.com/account/manage-account{Colors.ENDC}"
)

t_input = get_validated_input(
t_input = get_password(
f"{Colors.BOLD}Enter Control D API Token:{Colors.ENDC} ",
lambda x: len(x) > 8,
"Token seems too short. Please check your API token.",
is_password=True,
)
TOKEN = t_input

Expand Down
60 changes: 44 additions & 16 deletions test_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -495,16 +495,16 @@
assert "Error message" in captured.out


# Case 12: get_validated_input works with getpass
def test_get_validated_input_password(monkeypatch):
# Case 12: get_password works with getpass
def test_get_password(monkeypatch):
    """get_password reads via getpass and returns the validated value."""
    m = reload_main_with_env(monkeypatch)

    getpass_mock = MagicMock(return_value="secret")
    monkeypatch.setattr("getpass.getpass", getpass_mock)

    validator = lambda x: True

    # Call only the new API; the stale get_validated_input(..., is_password=True)
    # call was removed along with that parameter.
    result = m.get_password("Password: ", validator, "Error")

    assert result == "secret"
    getpass_mock.assert_called_once()
Comment on lines +498 to 510
Copy link

Copilot AI Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The test test_get_password only covers the happy path where the validator returns True. Consider adding a test similar to test_get_validated_input_retry that tests the retry logic for get_password, including:

  • Empty password input (should show "Value cannot be empty")
  • Invalid password that fails validation (should show the error message)
  • Valid password on subsequent retry

This would ensure both functions have equivalent test coverage.

Copilot uses AI. Check for mistakes.
Expand Down Expand Up @@ -552,25 +552,53 @@
assert "Input cancelled" in captured.out


# Case 15: get_validated_input handles both KeyboardInterrupt and EOFError for regular and password inputs
def test_get_password_interrupt(monkeypatch, capsys):
    """get_password exits with code 130 and a friendly message on Ctrl+C."""
    m = reload_main_with_env(monkeypatch)

    # Mock input to raise KeyboardInterrupt
    monkeypatch.setattr("getpass.getpass", MagicMock(side_effect=KeyboardInterrupt))

    with pytest.raises(SystemExit) as e:
        m.get_password("Prompt: ", lambda x: True, "Error")

    # Check exit code is 130
    assert e.value.code == 130

    # Check friendly message
    captured = capsys.readouterr()
    assert "Input cancelled" in captured.out

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.


# Case 15: get_validated_input and get_password handle both KeyboardInterrupt and EOFError
@pytest.mark.parametrize("exception", [KeyboardInterrupt, EOFError])
def test_get_validated_input_graceful_exit(monkeypatch, capsys, exception):
    """Test graceful exit on user cancellation (Ctrl+C/Ctrl+D) for regular inputs."""
    m = reload_main_with_env(monkeypatch)

    # Mock input to raise the specified exception
    monkeypatch.setattr("builtins.input", MagicMock(side_effect=exception))

    with pytest.raises(SystemExit) as e:
        m.get_validated_input("Prompt: ", lambda x: True, "Error")

    # Check exit code is 130 (standard for SIGINT)
    assert e.value.code == 130

    # Check friendly cancellation message is displayed
    captured = capsys.readouterr()
    assert "Input cancelled" in captured.out

Check notice

Code scanning / Bandit

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code. Note test

Use of assert detected. The enclosed code will be removed when compiling to optimised byte code.


@pytest.mark.parametrize("exception", [KeyboardInterrupt, EOFError])
def test_get_password_graceful_exit(monkeypatch, capsys, exception):
    """Test graceful exit on user cancellation (Ctrl+C/Ctrl+D) for password inputs."""
    m = reload_main_with_env(monkeypatch)

    # Mock input to raise the specified exception
    monkeypatch.setattr("getpass.getpass", MagicMock(side_effect=exception))

    with pytest.raises(SystemExit) as e:
        m.get_password("Prompt: ", lambda x: True, "Error")

    # Check exit code is 130 (standard for SIGINT)
    assert e.value.code == 130
Expand Down
Loading