Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 23 additions & 10 deletions main.py
Original file line number Diff line number Diff line change
Expand Up @@ -407,7 +407,7 @@
during warm-up and sync phases.
"""
if not url.startswith("https://"):
log.warning(

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
f"Skipping unsafe or invalid URL (must be https): {sanitize_for_log(url)}"
)
return False
Expand All @@ -420,7 +420,7 @@

# Check for potentially malicious hostnames
if hostname.lower() in ("localhost", "127.0.0.1", "::1"):
log.warning(

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
f"Skipping unsafe URL (localhost detected): {sanitize_for_log(url)}"
)
return False
Expand All @@ -428,7 +428,7 @@
try:
ip = ipaddress.ip_address(hostname)
if not ip.is_global or ip.is_multicast:
log.warning(

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
f"Skipping unsafe URL (non-global/multicast IP): {sanitize_for_log(url)}"
)
return False
Expand All @@ -449,11 +449,15 @@
)
return False
except (socket.gaierror, ValueError, OSError) as e:
log.warning(f"Failed to resolve/validate domain {hostname}: {e}")
log.warning(
f"Failed to resolve/validate domain {hostname}: {sanitize_for_log(e)}"
)
return False

except Exception as e:
log.warning(f"Failed to validate URL {sanitize_for_log(url)}: {e}")
log.warning(

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
f"Failed to validate URL {sanitize_for_log(url)}: {sanitize_for_log(e)}"
)
Comment on lines +452 to +460
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only check_api_access() has a regression test for exception-message redaction. The new/updated exception sanitization in validate_folder_url(), _fetch_folder_rules(), and create_folder() isn’t covered by tests (no tests assert these log messages are redacted), so it’s easy for future changes to reintroduce secret leakage in those paths. Add targeted tests that raise exceptions containing a secret/token and assert the logged message contains "[REDACTED]" and not the secret.

Copilot generated this review using guidance from repository custom instructions.
return False

return True
Expand Down Expand Up @@ -602,7 +606,8 @@
raise
wait_time = delay * (2**attempt)
log.warning(
f"Request failed (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {wait_time}s..."
f"Request failed (attempt {attempt + 1}/{max_retries}): "
f"{sanitize_for_log(e)}. Retrying in {wait_time}s..."
)
time.sleep(wait_time)

Expand Down Expand Up @@ -697,10 +702,10 @@
f"{Colors.FAIL} Please verify the Profile ID from your Control D Dashboard URL.{Colors.ENDC}"
)
else:
log.error(f"API Access Check Failed ({code}): {e}")
log.error(f"API Access Check Failed ({code}): {sanitize_for_log(e)}")
return False
except httpx.RequestError as e:
log.error(f"Network Error during access check: {e}")
log.error(f"Network Error during access check: {sanitize_for_log(e)}")

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
return False
Comment on lines +705 to 709
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change sanitizes exceptions in a few places, but main.py still has log statements that interpolate the raw exception ({e}) without sanitize_for_log, so sensitive URLs/tokens can still leak (e.g., the retry warning in _retry_request, the cache warmup Failed to pre-fetch ...: {e} warning, and the Could not extract ID ...: {e} debug log). To fully address the vulnerability described in the PR, sanitize those remaining exception logs as well (either by wrapping the exception with sanitize_for_log(...) or logging a sanitized str(e) via logger args).

Copilot uses AI. Check for mistakes.
Comment on lines +705 to 709
Copy link

Copilot AI Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PR description says all exceptions logged via log.error/log.warning are now sanitized, but main.py still logs raw exceptions in a few places (e.g., _retry_request() warning and warm_up_cache() warning, plus a create_folder() debug). That means sensitive URLs/tokens can still leak in logs; either extend the fix to those remaining log lines or narrow the PR description to match what’s actually covered.

Copilot uses AI. Check for mistakes.


Expand Down Expand Up @@ -852,7 +857,7 @@
except Exception as e:
# We log error but don't stop the whole process;
# individual folder failure shouldn't crash the sync
log.warning(f"Error fetching rules for folder {folder_id}: {e}")
log.warning(f"Error fetching rules for folder {folder_id}: {sanitize_for_log(e)}")

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
return []

try:
Expand Down Expand Up @@ -888,7 +893,9 @@
all_rules.update(result)
except Exception as e:
folder_id = future_to_folder[future]
log.warning(f"Failed to fetch rules for folder ID {folder_id}: {e}")
log.warning(

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
f"Failed to fetch rules for folder ID {folder_id}: {sanitize_for_log(e)}"
)

log.info(f"Total existing rules across all folders: {len(all_rules)}")
return all_rules
Expand Down Expand Up @@ -934,7 +941,7 @@
completed += 1
render_progress_bar(completed, total, "Warming up cache", prefix="⏳")
try:
future.result()

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
except Exception as e:
if USE_COLORS:
# Clear line to print warning cleanly
Expand All @@ -942,7 +949,8 @@
sys.stderr.flush()

log.warning(
f"Failed to pre-fetch {sanitize_for_log(futures[future])}: {e}"
f"Failed to pre-fetch {sanitize_for_log(futures[future])}: "
f"{sanitize_for_log(e)}"
)
# Restore progress bar after warning
render_progress_bar(completed, total, "Warming up cache", prefix="⏳")
Expand Down Expand Up @@ -1007,7 +1015,10 @@
)
return str(grp["PK"])
except Exception as e:
log.debug(f"Could not extract ID from POST response: {e}")
log.debug(

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Use lazy % formatting in logging functions Note

Use lazy % formatting in logging functions
f"Could not extract ID from POST response: "
f"{sanitize_for_log(e)}"
)

# 2. Fallback: Poll for the new folder (The Robust Retry Logic)
for attempt in range(MAX_RETRIES + 1):
Expand All @@ -1024,7 +1035,9 @@
)
return str(grp["PK"])
except Exception as e:
log.warning(f"Error fetching groups on attempt {attempt}: {e}")
log.warning(
f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}"
)

if attempt < MAX_RETRIES:
wait_time = FOLDER_CREATION_DELAY * (attempt + 1)
Expand Down Expand Up @@ -1056,11 +1069,11 @@
client: httpx.Client,
) -> bool:
if not hostnames:
log.info("Folder %s - no rules to push", sanitize_for_log(folder_name))

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
return True

original_count = len(hostnames)

Check warning

Code scanning / Prospector (reported by Codacy)

Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning

Use lazy % formatting in logging functions (logging-fstring-interpolation)
# Optimization 1: Deduplicate input list while preserving order
# Optimization 2: Check directly against existing_rules to avoid O(N) copy.
seen = set()
Expand Down
255 changes: 255 additions & 0 deletions tests/test_exception_logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,255 @@
"""Tests for exception logging and sanitization.

This module verifies that sensitive data (tokens, secrets, credentials) in
exception messages are properly redacted before being written to logs.
"""

import unittest
from unittest.mock import MagicMock, patch
import httpx
import main


class TestExceptionLogging(unittest.TestCase):
"""Test suite for exception logging with sanitization."""

@patch('main.log')
@patch('main.TOKEN', 'TEST_TOKEN_XYZ')
def test_check_api_access_redacts_exception(self, mock_log):
"""Test that check_api_access redacts tokens in exception messages."""
# Setup
client = MagicMock()
profile_id = "test_profile"

# Simulate a RequestError containing the test token
# This simulates a situation where the exception message might
# contain the URL with token
error_message = (
"Connection error to "
"https://api.controld.com?token=TEST_TOKEN_XYZ"
)
client.get.side_effect = httpx.RequestError(
error_message, request=MagicMock()
)

# Action
main.check_api_access(client, profile_id)

# Assertion
# We expect log.error to be called
self.assertTrue(
mock_log.error.called, "log.error should have been called"
)

# Check the arguments passed to log.error
# The code is: log.error(f"Network Error during access check: {e}")
# So the first argument should contain the redacted message
# Since it is an f-string, the redaction must happen BEFORE
# formatting or inside the f-string expression
args = mock_log.error.call_args[0]
logged_message = args[0]

# Verify secret is NOT present
self.assertNotIn(
"TEST_TOKEN_XYZ", logged_message, "Secret token leaked in logs!"
)

# Verify redaction placeholder IS present
self.assertIn(
"[REDACTED]", logged_message, "Redaction placeholder missing!"
)

@patch('main.log')
@patch('main.socket.getaddrinfo')
def test_validate_folder_url_redacts_exception(
self, mock_getaddrinfo, mock_log

Check warning

Code scanning / Pylint (reported by Codacy)

Wrong hanging indentation before block (add 4 spaces). Warning test

Wrong hanging indentation before block (add 4 spaces).
):
"""Test validate_folder_url redacts sensitive data in exceptions."""
# Setup - simulate an exception during DNS resolution that
# contains a sensitive URL
test_url = "https://example.com/list?api_key=SECRET_API_KEY_456"

# Create an exception that might contain the URL
mock_getaddrinfo.side_effect = OSError(
f"Failed to resolve {test_url}"
)

# Action
result = main.validate_folder_url(test_url)

# Assertions
self.assertFalse(result, "URL validation should fail")
self.assertTrue(
mock_log.warning.called, "log.warning should have been called"
)

# Get all warning calls
warning_calls = mock_log.warning.call_args_list

# Check that none of the logged messages contain the secret
for call in warning_calls:
logged_message = str(call[0][0])
self.assertNotIn(
"SECRET_API_KEY_456",
logged_message,
"Secret API key leaked in logs!"
)

@patch('main.log')
def test_fetch_folder_rules_redacts_exception(self, mock_log):
"""Test exception handlers in get_all_existing_rules redact."""
# Setup
client = MagicMock()
profile_id = "test_profile"
folder_id = "folder_123"

# Create the exception with sensitive data
error_with_token = ValueError(
"API error at https://api.controld.com?token=TEST_SECRET_789"
)

# Mock _api_get to succeed for root but fail for folder rules
with patch('main._api_get') as mock_api_get:
def api_get_side_effect(client_arg, url):

Check warning

Code scanning / Pylint (reported by Codacy)

Missing function docstring Warning test

Missing function docstring

Check notice

Code scanning / Pylint (reported by Codacy)

Unused argument 'client_arg' Note test

Unused argument 'client_arg'

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Unused argument 'client_arg' Note test

Unused argument 'client_arg'
if url.endswith("/rules"):

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Unnecessary "else" after "return" Warning test

Unnecessary "else" after "return"
# Root rules succeed
mock_response = MagicMock()
mock_response.json.return_value = {"body": {"rules": []}}
return mock_response
else:
# Folder rules fail with our error
raise error_with_token

mock_api_get.side_effect = api_get_side_effect

# Provide known_folders to ensure _fetch_folder_rules is called
main.get_all_existing_rules(
client, profile_id, known_folders={"Test Folder": folder_id}
)

# Assertion - verify the exception was sanitized
# Either log.warning was called from _fetch_folder_rules (line 859)
# or from the outer exception handler (line 895-896)
self.assertTrue(
mock_log.warning.called, "log.warning should have been called"
)

# Get warning calls
warning_calls = mock_log.warning.call_args_list

# Check that the secret is redacted in all warnings
for call in warning_calls:
logged_message = str(call[0][0])
self.assertNotIn(
"TEST_SECRET_789",
logged_message,
"Secret token leaked in logs!"
)

@patch('main.log')
@patch('main._api_post')
@patch('main._api_get')
def test_create_folder_redacts_exception(

Check warning

Code scanning / Pylint (reported by Codacy)

Too many local variables (19/15) Warning test

Too many local variables (19/15)

Check warning

Code scanning / Pylintpython3 (reported by Codacy)

Too many local variables (18/15) Warning test

Too many local variables (18/15)
self, mock_api_get, mock_api_post, mock_log

Check warning

Code scanning / Pylint (reported by Codacy)

Wrong hanging indentation before block (add 4 spaces). Warning test

Wrong hanging indentation before block (add 4 spaces).
):
"""Test that create_folder redacts tokens in debug log messages."""
# Setup
client = MagicMock()
profile_id = "test_profile"
folder_name = "Test Folder"

# Create an exception with a sensitive token in the message
sensitive_url = "https://api.controld.com/create?token=SECRET_XYZ_789"
error_with_token = ValueError(
f"Failed to parse response from {sensitive_url}"
)

# Mock json() to raise an exception with the sensitive URL
mock_response = MagicMock()
mock_response.json.side_effect = error_with_token
mock_api_post.return_value = mock_response

# Mock the fallback polling to return empty results quickly
mock_get_response = MagicMock()
mock_get_response.json.return_value = {"body": {"groups": []}}
mock_api_get.return_value = mock_get_response

# Mock time.sleep to avoid actual delays
with patch('main.time.sleep'):
# Action - this will try to call json(), catch exception, and log
result = main.create_folder(client, profile_id, folder_name, 1, 1)

# Should return None after retries fail
self.assertIsNone(result)

# Assertion - verify debug was called and sanitization worked
self.assertTrue(
mock_log.debug.called,
"log.debug should have been called for exception"
)

debug_calls = mock_log.debug.call_args_list
logged_messages = [str(call[0][0]) for call in debug_calls]

# At least one debug message should be about the POST response error
found_relevant_log = False
for logged_message in logged_messages:
if "Could not extract ID from POST response" in logged_message:
found_relevant_log = True
# Verify the secret token is redacted
self.assertNotIn(
"SECRET_XYZ_789",
logged_message,
"Secret token leaked in debug logs!"
)
# Verify redaction placeholder is present
self.assertIn(
"[REDACTED]",
logged_message,
"Redaction placeholder missing in debug logs!"
)

self.assertTrue(
found_relevant_log,
"Expected debug log about POST response error not found"
)

@patch('main.log')
def test_retry_request_redacts_exception(self, mock_log):
"""Test that _retry_request redacts tokens in warning messages."""
# Setup - create an exception with sensitive data
error_with_token = httpx.RequestError(
"Connection to https://api.example.com?secret=HIDDEN_KEY_999",
request=MagicMock()
)

# Create a failing request function
def failing_request():

Check warning

Code scanning / Pylint (reported by Codacy)

Missing function docstring Warning test

Missing function docstring
raise error_with_token

# Action
try:
main._retry_request(failing_request, max_retries=2, delay=0.01)

Check notice

Code scanning / Pylint (reported by Codacy)

Access to a protected member _retry_request of a client class Note test

Access to a protected member _retry_request of a client class

Check notice

Code scanning / Pylintpython3 (reported by Codacy)

Access to a protected member _retry_request of a client class Note test

Access to a protected member _retry_request of a client class
except httpx.RequestError:
pass # Expected to fail after retries

# Assertion
self.assertTrue(
mock_log.warning.called, "log.warning should have been called"
)

# Get all warning calls
warning_calls = mock_log.warning.call_args_list

# Check that the secret is redacted
for call in warning_calls:
logged_message = str(call[0][0])
self.assertNotIn(
"HIDDEN_KEY_999",
logged_message,
"Secret key leaked in logs!"
)


if __name__ == '__main__':
unittest.main()
Loading