-
Notifications
You must be signed in to change notification settings - Fork 1
🛡️ Sentinel: [MEDIUM] Sanitize exceptions in logs #180
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
38a757f
9fd79e4
f0346cc
01b3565
1e697af
f2eb41d
8879fc2
fa5152f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -407,7 +407,7 @@ | |
| during warm-up and sync phases. | ||
| """ | ||
| if not url.startswith("https://"): | ||
| log.warning( | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| f"Skipping unsafe or invalid URL (must be https): {sanitize_for_log(url)}" | ||
| ) | ||
| return False | ||
|
|
@@ -420,7 +420,7 @@ | |
|
|
||
| # Check for potentially malicious hostnames | ||
| if hostname.lower() in ("localhost", "127.0.0.1", "::1"): | ||
| log.warning( | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| f"Skipping unsafe URL (localhost detected): {sanitize_for_log(url)}" | ||
| ) | ||
| return False | ||
|
|
@@ -428,7 +428,7 @@ | |
| try: | ||
| ip = ipaddress.ip_address(hostname) | ||
| if not ip.is_global or ip.is_multicast: | ||
| log.warning( | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| f"Skipping unsafe URL (non-global/multicast IP): {sanitize_for_log(url)}" | ||
| ) | ||
| return False | ||
|
|
@@ -449,11 +449,15 @@ | |
| ) | ||
| return False | ||
| except (socket.gaierror, ValueError, OSError) as e: | ||
| log.warning(f"Failed to resolve/validate domain {hostname}: {e}") | ||
| log.warning( | ||
| f"Failed to resolve/validate domain {hostname}: {sanitize_for_log(e)}" | ||
| ) | ||
| return False | ||
|
|
||
| except Exception as e: | ||
| log.warning(f"Failed to validate URL {sanitize_for_log(url)}: {e}") | ||
| log.warning( | ||
| f"Failed to validate URL {sanitize_for_log(url)}: {sanitize_for_log(e)}" | ||
| ) | ||
|
Comment on lines
+452
to
+460
|
||
| return False | ||
|
|
||
| return True | ||
|
|
@@ -602,7 +606,8 @@ | |
| raise | ||
| wait_time = delay * (2**attempt) | ||
| log.warning( | ||
| f"Request failed (attempt {attempt + 1}/{max_retries}): {e}. Retrying in {wait_time}s..." | ||
| f"Request failed (attempt {attempt + 1}/{max_retries}): " | ||
| f"{sanitize_for_log(e)}. Retrying in {wait_time}s..." | ||
| ) | ||
| time.sleep(wait_time) | ||
|
|
||
|
|
@@ -697,10 +702,10 @@ | |
| f"{Colors.FAIL} Please verify the Profile ID from your Control D Dashboard URL.{Colors.ENDC}" | ||
| ) | ||
| else: | ||
| log.error(f"API Access Check Failed ({code}): {e}") | ||
| log.error(f"API Access Check Failed ({code}): {sanitize_for_log(e)}") | ||
| return False | ||
| except httpx.RequestError as e: | ||
| log.error(f"Network Error during access check: {e}") | ||
| log.error(f"Network Error during access check: {sanitize_for_log(e)}") | ||
|
||
| return False | ||
|
Comment on lines
+705
to
709
|
||
|
|
||
|
|
||
|
|
@@ -852,7 +857,7 @@ | |
| except Exception as e: | ||
| # We log error but don't stop the whole process; | ||
| # individual folder failure shouldn't crash the sync | ||
| log.warning(f"Error fetching rules for folder {folder_id}: {e}") | ||
| log.warning(f"Error fetching rules for folder {folder_id}: {sanitize_for_log(e)}") | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| return [] | ||
|
|
||
| try: | ||
|
|
@@ -888,7 +893,9 @@ | |
| all_rules.update(result) | ||
| except Exception as e: | ||
| folder_id = future_to_folder[future] | ||
| log.warning(f"Failed to fetch rules for folder ID {folder_id}: {e}") | ||
| log.warning( | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| f"Failed to fetch rules for folder ID {folder_id}: {sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| log.info(f"Total existing rules across all folders: {len(all_rules)}") | ||
| return all_rules | ||
|
|
@@ -934,7 +941,7 @@ | |
| completed += 1 | ||
| render_progress_bar(completed, total, "Warming up cache", prefix="⏳") | ||
| try: | ||
| future.result() | ||
Check noticeCode scanning / Pylintpython3 (reported by Codacy) Use lazy % formatting in logging functions Note
Use lazy % formatting in logging functions
|
||
| except Exception as e: | ||
| if USE_COLORS: | ||
| # Clear line to print warning cleanly | ||
|
|
@@ -942,7 +949,8 @@ | |
| sys.stderr.flush() | ||
|
|
||
| log.warning( | ||
| f"Failed to pre-fetch {sanitize_for_log(futures[future])}: {e}" | ||
| f"Failed to pre-fetch {sanitize_for_log(futures[future])}: " | ||
| f"{sanitize_for_log(e)}" | ||
| ) | ||
| # Restore progress bar after warning | ||
| render_progress_bar(completed, total, "Warming up cache", prefix="⏳") | ||
|
|
@@ -1007,7 +1015,10 @@ | |
| ) | ||
| return str(grp["PK"]) | ||
| except Exception as e: | ||
| log.debug(f"Could not extract ID from POST response: {e}") | ||
| log.debug( | ||
|
||
| f"Could not extract ID from POST response: " | ||
| f"{sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| # 2. Fallback: Poll for the new folder (The Robust Retry Logic) | ||
| for attempt in range(MAX_RETRIES + 1): | ||
|
|
@@ -1024,7 +1035,9 @@ | |
| ) | ||
| return str(grp["PK"]) | ||
| except Exception as e: | ||
| log.warning(f"Error fetching groups on attempt {attempt}: {e}") | ||
| log.warning( | ||
| f"Error fetching groups on attempt {attempt}: {sanitize_for_log(e)}" | ||
| ) | ||
|
|
||
| if attempt < MAX_RETRIES: | ||
| wait_time = FOLDER_CREATION_DELAY * (attempt + 1) | ||
|
|
@@ -1056,11 +1069,11 @@ | |
| client: httpx.Client, | ||
| ) -> bool: | ||
| if not hostnames: | ||
| log.info("Folder %s - no rules to push", sanitize_for_log(folder_name)) | ||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| return True | ||
|
|
||
| original_count = len(hostnames) | ||
|
|
||
Check warningCode scanning / Prospector (reported by Codacy) Use lazy % formatting in logging functions (logging-fstring-interpolation) Warning
Use lazy % formatting in logging functions (logging-fstring-interpolation)
|
||
| # Optimization 1: Deduplicate input list while preserving order | ||
| # Optimization 2: Check directly against existing_rules to avoid O(N) copy. | ||
| seen = set() | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,255 @@ | ||
| """Tests for exception logging and sanitization. | ||
|
|
||
| This module verifies that sensitive data (tokens, secrets, credentials) in | ||
| exception messages are properly redacted before being written to logs. | ||
| """ | ||
|
|
||
| import unittest | ||
|
||
| from unittest.mock import MagicMock, patch | ||
| import httpx | ||
|
||
| import main | ||
|
|
||
|
|
||
| class TestExceptionLogging(unittest.TestCase): | ||
| """Test suite for exception logging with sanitization.""" | ||
|
|
||
| @patch('main.log') | ||
| @patch('main.TOKEN', 'TEST_TOKEN_XYZ') | ||
| def test_check_api_access_redacts_exception(self, mock_log): | ||
|
||
| """Test that check_api_access redacts tokens in exception messages.""" | ||
| # Setup | ||
| client = MagicMock() | ||
| profile_id = "test_profile" | ||
|
|
||
| # Simulate a RequestError containing the test token | ||
| # This simulates a situation where the exception message might | ||
| # contain the URL with token | ||
| error_message = ( | ||
| "Connection error to " | ||
| "https://api.controld.com?token=TEST_TOKEN_XYZ" | ||
| ) | ||
| client.get.side_effect = httpx.RequestError( | ||
| error_message, request=MagicMock() | ||
| ) | ||
|
|
||
| # Action | ||
| main.check_api_access(client, profile_id) | ||
|
|
||
| # Assertion | ||
| # We expect log.error to be called | ||
| self.assertTrue( | ||
| mock_log.error.called, "log.error should have been called" | ||
| ) | ||
|
|
||
| # Check the arguments passed to log.error | ||
| # The code is: log.error(f"Network Error during access check: {e}") | ||
| # So the first argument should contain the redacted message | ||
| # Since it is an f-string, the redaction must happen BEFORE | ||
| # formatting or inside the f-string expression | ||
| args = mock_log.error.call_args[0] | ||
| logged_message = args[0] | ||
|
|
||
| # Verify secret is NOT present | ||
| self.assertNotIn( | ||
| "TEST_TOKEN_XYZ", logged_message, "Secret token leaked in logs!" | ||
| ) | ||
|
|
||
| # Verify redaction placeholder IS present | ||
| self.assertIn( | ||
| "[REDACTED]", logged_message, "Redaction placeholder missing!" | ||
| ) | ||
|
|
||
| @patch('main.log') | ||
| @patch('main.socket.getaddrinfo') | ||
| def test_validate_folder_url_redacts_exception( | ||
| self, mock_getaddrinfo, mock_log | ||
Check warningCode scanning / Pylint (reported by Codacy) Wrong hanging indentation before block (add 4 spaces). Warning test
Wrong hanging indentation before block (add 4 spaces).
|
||
| ): | ||
| """Test validate_folder_url redacts sensitive data in exceptions.""" | ||
| # Setup - simulate an exception during DNS resolution that | ||
| # contains a sensitive URL | ||
| test_url = "https://example.com/list?api_key=SECRET_API_KEY_456" | ||
|
|
||
| # Create an exception that might contain the URL | ||
| mock_getaddrinfo.side_effect = OSError( | ||
| f"Failed to resolve {test_url}" | ||
| ) | ||
|
|
||
| # Action | ||
| result = main.validate_folder_url(test_url) | ||
|
|
||
| # Assertions | ||
| self.assertFalse(result, "URL validation should fail") | ||
| self.assertTrue( | ||
| mock_log.warning.called, "log.warning should have been called" | ||
| ) | ||
|
|
||
| # Get all warning calls | ||
| warning_calls = mock_log.warning.call_args_list | ||
|
|
||
| # Check that none of the logged messages contain the secret | ||
| for call in warning_calls: | ||
| logged_message = str(call[0][0]) | ||
| self.assertNotIn( | ||
| "SECRET_API_KEY_456", | ||
| logged_message, | ||
| "Secret API key leaked in logs!" | ||
| ) | ||
|
|
||
| @patch('main.log') | ||
| def test_fetch_folder_rules_redacts_exception(self, mock_log): | ||
| """Test exception handlers in get_all_existing_rules redact.""" | ||
| # Setup | ||
| client = MagicMock() | ||
| profile_id = "test_profile" | ||
| folder_id = "folder_123" | ||
|
|
||
| # Create the exception with sensitive data | ||
| error_with_token = ValueError( | ||
| "API error at https://api.controld.com?token=TEST_SECRET_789" | ||
| ) | ||
|
|
||
| # Mock _api_get to succeed for root but fail for folder rules | ||
| with patch('main._api_get') as mock_api_get: | ||
| def api_get_side_effect(client_arg, url): | ||
Check warningCode scanning / Pylint (reported by Codacy) Missing function docstring Warning test
Missing function docstring
Check noticeCode scanning / Pylint (reported by Codacy) Unused argument 'client_arg' Note test
Unused argument 'client_arg'
|
||
| if url.endswith("/rules"): | ||
Check warningCode scanning / Pylintpython3 (reported by Codacy) Unnecessary "else" after "return" Warning test
Unnecessary "else" after "return"
|
||
| # Root rules succeed | ||
| mock_response = MagicMock() | ||
| mock_response.json.return_value = {"body": {"rules": []}} | ||
| return mock_response | ||
| else: | ||
| # Folder rules fail with our error | ||
| raise error_with_token | ||
|
|
||
| mock_api_get.side_effect = api_get_side_effect | ||
|
|
||
| # Provide known_folders to ensure _fetch_folder_rules is called | ||
| main.get_all_existing_rules( | ||
| client, profile_id, known_folders={"Test Folder": folder_id} | ||
| ) | ||
|
|
||
| # Assertion - verify the exception was sanitized | ||
| # Either log.warning was called from _fetch_folder_rules (line 859) | ||
| # or from the outer exception handler (line 895-896) | ||
| self.assertTrue( | ||
| mock_log.warning.called, "log.warning should have been called" | ||
| ) | ||
|
|
||
| # Get warning calls | ||
| warning_calls = mock_log.warning.call_args_list | ||
|
|
||
| # Check that the secret is redacted in all warnings | ||
| for call in warning_calls: | ||
| logged_message = str(call[0][0]) | ||
| self.assertNotIn( | ||
| "TEST_SECRET_789", | ||
| logged_message, | ||
| "Secret token leaked in logs!" | ||
| ) | ||
|
|
||
| @patch('main.log') | ||
| @patch('main._api_post') | ||
| @patch('main._api_get') | ||
| def test_create_folder_redacts_exception( | ||
Check warningCode scanning / Pylint (reported by Codacy) Too many local variables (19/15) Warning test
Too many local variables (19/15)
Check warningCode scanning / Pylintpython3 (reported by Codacy) Too many local variables (18/15) Warning test
Too many local variables (18/15)
|
||
| self, mock_api_get, mock_api_post, mock_log | ||
Check warningCode scanning / Pylint (reported by Codacy) Wrong hanging indentation before block (add 4 spaces). Warning test
Wrong hanging indentation before block (add 4 spaces).
|
||
| ): | ||
| """Test that create_folder redacts tokens in debug log messages.""" | ||
| # Setup | ||
| client = MagicMock() | ||
| profile_id = "test_profile" | ||
| folder_name = "Test Folder" | ||
|
|
||
| # Create an exception with a sensitive token in the message | ||
| sensitive_url = "https://api.controld.com/create?token=SECRET_XYZ_789" | ||
| error_with_token = ValueError( | ||
| f"Failed to parse response from {sensitive_url}" | ||
| ) | ||
|
|
||
| # Mock json() to raise an exception with the sensitive URL | ||
| mock_response = MagicMock() | ||
| mock_response.json.side_effect = error_with_token | ||
| mock_api_post.return_value = mock_response | ||
|
|
||
| # Mock the fallback polling to return empty results quickly | ||
| mock_get_response = MagicMock() | ||
| mock_get_response.json.return_value = {"body": {"groups": []}} | ||
| mock_api_get.return_value = mock_get_response | ||
|
|
||
| # Mock time.sleep to avoid actual delays | ||
| with patch('main.time.sleep'): | ||
| # Action - this will try to call json(), catch exception, and log | ||
| result = main.create_folder(client, profile_id, folder_name, 1, 1) | ||
|
|
||
| # Should return None after retries fail | ||
| self.assertIsNone(result) | ||
|
|
||
| # Assertion - verify debug was called and sanitization worked | ||
| self.assertTrue( | ||
| mock_log.debug.called, | ||
| "log.debug should have been called for exception" | ||
| ) | ||
|
|
||
| debug_calls = mock_log.debug.call_args_list | ||
| logged_messages = [str(call[0][0]) for call in debug_calls] | ||
|
|
||
| # At least one debug message should be about the POST response error | ||
| found_relevant_log = False | ||
| for logged_message in logged_messages: | ||
| if "Could not extract ID from POST response" in logged_message: | ||
| found_relevant_log = True | ||
| # Verify the secret token is redacted | ||
| self.assertNotIn( | ||
| "SECRET_XYZ_789", | ||
| logged_message, | ||
| "Secret token leaked in debug logs!" | ||
| ) | ||
| # Verify redaction placeholder is present | ||
| self.assertIn( | ||
| "[REDACTED]", | ||
| logged_message, | ||
| "Redaction placeholder missing in debug logs!" | ||
| ) | ||
|
|
||
| self.assertTrue( | ||
| found_relevant_log, | ||
| "Expected debug log about POST response error not found" | ||
| ) | ||
|
|
||
| @patch('main.log') | ||
| def test_retry_request_redacts_exception(self, mock_log): | ||
| """Test that _retry_request redacts tokens in warning messages.""" | ||
| # Setup - create an exception with sensitive data | ||
| error_with_token = httpx.RequestError( | ||
| "Connection to https://api.example.com?secret=HIDDEN_KEY_999", | ||
| request=MagicMock() | ||
| ) | ||
|
|
||
| # Create a failing request function | ||
| def failing_request(): | ||
Check warningCode scanning / Pylint (reported by Codacy) Missing function docstring Warning test
Missing function docstring
|
||
| raise error_with_token | ||
|
|
||
| # Action | ||
| try: | ||
| main._retry_request(failing_request, max_retries=2, delay=0.01) | ||
Check noticeCode scanning / Pylint (reported by Codacy) Access to a protected member _retry_request of a client class Note test
Access to a protected member _retry_request of a client class
|
||
| except httpx.RequestError: | ||
| pass # Expected to fail after retries | ||
|
|
||
| # Assertion | ||
| self.assertTrue( | ||
| mock_log.warning.called, "log.warning should have been called" | ||
| ) | ||
|
|
||
| # Get all warning calls | ||
| warning_calls = mock_log.warning.call_args_list | ||
|
|
||
| # Check that the secret is redacted | ||
| for call in warning_calls: | ||
| logged_message = str(call[0][0]) | ||
| self.assertNotIn( | ||
| "HIDDEN_KEY_999", | ||
| logged_message, | ||
| "Secret key leaked in logs!" | ||
| ) | ||
|
|
||
|
|
||
| if __name__ == '__main__': | ||
|
||
| unittest.main() | ||
Check notice
Code scanning / Pylintpython3 (reported by Codacy)
Use lazy % formatting in logging functions Note