diff --git a/main.py b/main.py index e1e3ced4..3a75f9b6 100644 --- a/main.py +++ b/main.py @@ -263,13 +263,27 @@ def print_plan_details(plan_entry: Dict[str, Any]) -> None: print(" No folders to sync.") return - for folder in sorted(folders, key=lambda f: f.get("name", "")): + # Calculate max width for alignment + max_name_len = max( + # Use the same default ("Unknown") as when printing, so alignment is accurate + (len(sanitize_for_log(f.get("name", "Unknown"))) for f in folders), + default=0, + ) + max_rules_len = max((len(f"{f.get('rules', 0):,}") for f in folders), default=0) + + for folder in sorted(folders, key=lambda f: f.get("name", "Unknown")): name = sanitize_for_log(folder.get("name", "Unknown")) rules_count = folder.get("rules", 0) + formatted_rules = f"{rules_count:,}" + if USE_COLORS: - print(f" • {Colors.BOLD}{name}{Colors.ENDC}: {rules_count} rules") + print( + f" • {Colors.BOLD}{name:<{max_name_len}}{Colors.ENDC} : {formatted_rules:>{max_rules_len}} rules" + ) else: - print(f" - {name}: {rules_count} rules") + print( + f" - {name:<{max_name_len}} : {formatted_rules:>{max_rules_len}} rules" + ) print("") @@ -343,15 +357,34 @@ def get_validated_input( prompt: str, validator: Callable[[str], bool], error_msg: str, - is_password: bool = False, ) -> str: """Prompts for input until the validator returns True.""" while True: try: - if is_password: - value = getpass.getpass(prompt).strip() - else: - value = input(prompt).strip() + value = input(prompt).strip() + except (KeyboardInterrupt, EOFError): + print(f"\n{Colors.WARNING}⚠️ Input cancelled.{Colors.ENDC}") + sys.exit(130) + + if not value: + print(f"{Colors.FAIL}❌ Value cannot be empty{Colors.ENDC}") + continue + + if validator(value): + return value + + print(f"{Colors.FAIL}❌ {error_msg}{Colors.ENDC}") + + +def get_password( + prompt: str, + validator: Callable[[str], bool], + error_msg: str, +) -> str: + """Prompts for password input until the validator returns True.""" + while True: + try: + value = getpass.getpass(prompt).strip() except (KeyboardInterrupt, EOFError): print(f"\n{Colors.WARNING}⚠️ Input cancelled.{Colors.ENDC}") sys.exit(130) @@ -1224,7 +1257,7 @@ def _fetch_folder_rules(folder_id: str) -> List[str]: f"Failed to fetch rules for folder ID {folder_id}: {sanitize_for_log(e)}" ) - log.info(f"Total existing rules across all folders: {len(all_rules)}") + log.info(f"Total existing rules across all folders: {len(all_rules):,}") return all_rules except Exception as e: log.error(f"Failed to get existing rules: {sanitize_for_log(e)}") @@ -1247,7 +1280,7 @@ def warm_up_cache(urls: Sequence[str]) -> None: total = len(urls_to_process) if not USE_COLORS: - log.info(f"Warming up cache for {total} URLs...") + log.info(f"Warming up cache for {total:,} URLs...") # OPTIMIZATION: Combine validation (DNS) and fetching (HTTP) in one task # to allow validation latency to be parallelized. @@ -1463,10 +1496,7 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: _api_post_form(client, f"{API_BASE}/{profile_id}/rules", data=data) if not USE_COLORS: log.info( - "Folder %s – batch %d: added %d rules", - sanitize_for_log(folder_name), - batch_idx, - len(batch_data), + f"Folder {sanitize_for_log(folder_name)} – batch {batch_idx}: added {len(batch_data):,} rules" ) return batch_data except httpx.HTTPError as e: @@ -1515,14 +1545,12 @@ def process_batch(batch_idx: int, batch_data: List[str]) -> Optional[List[str]]: if successful_batches == total_batches: if USE_COLORS: sys.stderr.write( - f"\r\033[K{Colors.GREEN}✅ Folder {sanitize_for_log(folder_name)}: Finished ({len(filtered_hostnames)} rules){Colors.ENDC}\n" + f"\r\033[K{Colors.GREEN}✅ Folder {sanitize_for_log(folder_name)}: Finished ({len(filtered_hostnames):,} rules){Colors.ENDC}\n" ) sys.stderr.flush() else: log.info( - "Folder %s – finished (%d new rules added)", - sanitize_for_log(folder_name), - len(filtered_hostnames), + f"Folder {sanitize_for_log(folder_name)} – finished ({len(filtered_hostnames):,} new rules added)" ) return True else: @@ -1854,11 +1882,10 @@ def validate_profile_input(value: str) -> bool: f"{Colors.CYAN} You can generate one at: https://controld.com/account/manage-account{Colors.ENDC}" ) - t_input = get_validated_input( + t_input = get_password( f"{Colors.BOLD}Enter Control D API Token:{Colors.ENDC} ", lambda x: len(x) > 8, "Token seems too short. Please check your API token.", - is_password=True, ) TOKEN = t_input diff --git a/test_main.py b/test_main.py index c926e0cf..e8c19321 100644 --- a/test_main.py +++ b/test_main.py @@ -495,8 +495,8 @@ def test_get_validated_input_retry(monkeypatch, capsys): assert "Error message" in captured.out -# Case 12: get_validated_input works with getpass -def test_get_validated_input_password(monkeypatch): +# Case 12: get_password works with getpass +def test_get_password(monkeypatch): m = reload_main_with_env(monkeypatch) getpass_mock = MagicMock(return_value="secret") @@ -504,7 +504,7 @@ def test_get_validated_input_password(monkeypatch): validator = lambda x: True - result = m.get_validated_input("Password: ", validator, "Error", is_password=True) + result = m.get_password("Password: ", validator, "Error") assert result == "secret" getpass_mock.assert_called_once() @@ -552,25 +552,53 @@ def test_get_validated_input_interrupt(monkeypatch, capsys): assert "Input cancelled" in captured.out -# Case 15: get_validated_input handles both KeyboardInterrupt and EOFError for regular and password inputs +def test_get_password_interrupt(monkeypatch, capsys): + m = reload_main_with_env(monkeypatch) + + # Mock input to raise KeyboardInterrupt + monkeypatch.setattr("getpass.getpass", MagicMock(side_effect=KeyboardInterrupt)) + + with pytest.raises(SystemExit) as e: + m.get_password("Prompt: ", lambda x: True, "Error") + + # Check exit code is 130 + assert e.value.code == 130 + + # Check friendly message + captured = capsys.readouterr() + assert "Input cancelled" in captured.out + + +# Case 15: get_validated_input and get_password handle both KeyboardInterrupt and EOFError +@pytest.mark.parametrize("exception", [KeyboardInterrupt, EOFError]) +def test_get_validated_input_graceful_exit(monkeypatch, capsys, exception): + """Test graceful exit on user cancellation (Ctrl+C/Ctrl+D) for regular inputs.""" + m = reload_main_with_env(monkeypatch) + + # Mock input to raise the specified exception + monkeypatch.setattr("builtins.input", MagicMock(side_effect=exception)) + + with pytest.raises(SystemExit) as e: + m.get_validated_input("Prompt: ", lambda x: True, "Error") + + # Check exit code is 130 (standard for SIGINT) + assert e.value.code == 130 + + # Check friendly cancellation message is displayed + captured = capsys.readouterr() + assert "Input cancelled" in captured.out + + @pytest.mark.parametrize("exception", [KeyboardInterrupt, EOFError]) -@pytest.mark.parametrize( - "is_password,mock_path", - [(False, "builtins.input"), (True, "getpass.getpass")], -) -def test_get_validated_input_graceful_exit_comprehensive( - monkeypatch, capsys, exception, is_password, mock_path -): - """Test graceful exit on user cancellation (Ctrl+C/Ctrl+D) for both regular and password inputs.""" +def test_get_password_graceful_exit(monkeypatch, capsys, exception): + """Test graceful exit on user cancellation (Ctrl+C/Ctrl+D) for password inputs.""" m = reload_main_with_env(monkeypatch) # Mock input to raise the specified exception - monkeypatch.setattr(mock_path, MagicMock(side_effect=exception)) + monkeypatch.setattr("getpass.getpass", MagicMock(side_effect=exception)) with pytest.raises(SystemExit) as e: - m.get_validated_input( - "Prompt: ", lambda x: True, "Error", is_password=is_password - ) + m.get_password("Prompt: ", lambda x: True, "Error") # Check exit code is 130 (standard for SIGINT) assert e.value.code == 130