diff --git a/main.py b/main.py index 4f4a2e6..6222b88 100644 --- a/main.py +++ b/main.py @@ -11,9 +11,46 @@ import shutil from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime -import msvcrt import re +# --- Cross-Platform Input Handling --- + +if os.name == 'nt': + try: + import msvcrt + except ImportError: + pass +else: + import tty + import termios + import select + +def get_char(): + """Reads a single character from input (blocking). Returns string.""" + if os.name == 'nt': + try: + return msvcrt.getch().decode('utf-8', errors='ignore') + except: + return '' + else: + # Linux: Single char read for pause listener + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(fd) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + +def kb_hit(): + """Returns True if a key is waiting to be read (non-blocking).""" + if os.name == 'nt': + return msvcrt.kbhit() + else: + dr, dw, de = select.select([sys.stdin], [], [], 0) + return dr != [] + # --- Colors --- class Colors: HEADER = '\033[96m' # Cyan @@ -34,35 +71,90 @@ class Colors: OUTPUT_FINAL_DIR = os.path.join("Output", "Final") CHECKPOINTS_DIR = "Checkpoints" -# Enable VT100 for Windows 10/11 -os.system('color') +if os.name == 'nt': + os.system('color') # --- Utils --- def clear_screen(): - os.system('cls' if os.name == 'nt' else 'clear') + # ANSI escape code to clear screen (works on Termux & Linux without spawning subshell) + print("\033[H\033[J", end="") def get_key(): """Reads a key press and returns a unified key code.""" - key = msvcrt.getch() - if key in (b'\x00', b'\xe0'): - # Arrow keys or special function keys + + if os.name == 'nt': + # --- WINDOWS LOGIC --- key = msvcrt.getch() - if key == b'H': return 'UP' - if key == b'P': return 'DOWN' - if key == b'M': return 'RIGHT' - if key == b'K': return 'LEFT' - elif key == b'\r': return 'ENTER' - elif key == b' ': return 'SPACE' - elif key == b'\x08': return 'BACKSPACE' - elif key == b'\x03': return 'CTRL_C' - elif key == b'\x1b': return 'ESC' - return None + if key in (b'\x00', b'\xe0'): + try: + key = msvcrt.getch() + if key == b'H': return 'UP' + if key == b'P': return 'DOWN' + if key == b'M': return 'RIGHT' + if key == b'K': return 'LEFT' + except: pass + elif key == b'\r': return 'ENTER' + elif key == b' ': return 'SPACE' + elif key == b'\x08': return 'BACKSPACE' + elif key == b'\x03': return 'CTRL_C' + elif key == b'\x1b': return 'ESC' + try: return key.decode() + except: return None + + else: + # --- LINUX / TERMUX LOGIC --- + # We must enter raw mode ONCE to capture the full sequence (ESC [ A) + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(fd) + + # Read first byte directly from FD to avoid python buffering issues + ch_bytes = os.read(fd, 1) + if not ch_bytes: return None + ch = ch_bytes.decode('utf-8', errors='ignore') + + if ch == '\x1b': + # It's ESC. Check if a sequence follows immediately. + dr, _, _ = select.select([fd], [], [], 0.1) + if dr: + # Sequence detected, read next byte + seq1_bytes = os.read(fd, 1) + seq1 = seq1_bytes.decode('utf-8', errors='ignore') + + if seq1 == '[': # CSI (Common Arrows) + seq2_bytes = os.read(fd, 1) + seq2 = seq2_bytes.decode('utf-8', errors='ignore') + if seq2 == 'A': return 'UP' + if seq2 == 'B': return 'DOWN' + if seq2 == 'C': return 'RIGHT' + if seq2 == 'D': return 'LEFT' + + elif seq1 == 'O': # SS3 (Application Cursor Keys) + seq2_bytes = os.read(fd, 1) + seq2 = seq2_bytes.decode('utf-8', errors='ignore') + if seq2 == 'A': return 'UP' + if seq2 == 'B': return 'DOWN' + if seq2 == 'C': return 'RIGHT' + if seq2 == 'D': return 'LEFT' + + return 'ESC' # Captured sequence but unknown + else: + return 'ESC' # Genuine ESC key + + elif ch == '\r' or ch == '\n': return 'ENTER' + elif ch == ' ': return 'SPACE' + elif ch == '\x7f' or ch == '\x08': return 'BACKSPACE' + elif ch == '\x03': return 'CTRL_C' + + return ch + + finally: + # Always restore settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) def terminal_file_selector(base_dir=".", extensions=None): - """ - Interactive TUI file explorer. - """ current_dir = os.path.abspath(base_dir) selected_files = [] cursor_idx = 0 @@ -157,48 +249,25 @@ def terminal_file_selector(base_dir=".", extensions=None): return [] def extract_ips_from_text(text): - """ - Extracts IPv4 and IPv6 addresses/CIDRs from any text using regex. - """ - # IPv4 CIDR or IP: x.x.x.x or x.x.x.x/xx ipv4_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b' - - # IPv6 CIDR or IP (simplified but practical) ipv6_pattern = r'(?:[0-9a-fA-F]{1,4}:){2,}(?:[0-9a-fA-F]{1,4}:?)(?:/\d{1,3})?' - ips = [] - - # Find all IPv4 - for match in re.findall(ipv4_pattern, text): - ips.append(match) - - # Find all IPv6 - for match in re.findall(ipv6_pattern, text): - ips.append(match) - - return list(set(ips)) # Dedup + for match in re.findall(ipv4_pattern, text): ips.append(match) + for match in re.findall(ipv6_pattern, text): ips.append(match) + return list(set(ips)) def load_file(filepath): - """ - Loads IPs/CIDRs from a file (JSON, CSV, or TXT) using robust regex extraction. - Supports nested JSON, messy TXT, etc. - Returns: List of dicts {'ip': str, ...} or just strings if lazy - """ if not os.path.exists(filepath): print(f"File not found: {filepath}") return [] data = [] - try: with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() - # 1. Try JSON First (for structured data preservation like latency/provider) try: json_content = json.loads(content) - - # Recursive helper to find objects with 'ip' key def extract_from_json(obj): if isinstance(obj, dict): if 'ip' in obj: @@ -210,28 +279,18 @@ def extract_from_json(obj): for k, v in obj.items(): extract_from_json(v) elif isinstance(obj, list): for item in obj: extract_from_json(item) - extract_from_json(json_content) + if data: return data + except json.JSONDecodeError: pass - if data: return data # If structured data found, return it - - # If JSON parsed but no 'ip' keys found, fall back to regex on string dump - - except json.JSONDecodeError: - pass # Not JSON, proceed to regex - - # 2. Regex Extraction (Fallback or for TXT/CSV) extracted_ips = extract_ips_from_text(content) for ip in extracted_ips: - # Basic validation try: ipaddress.ip_network(ip, strict=False) data.append({"ip": ip}) except: pass - except Exception as e: print(f"Error loading {filepath}: {e}") - return data def print_header(title="IP Range Generator"): @@ -242,14 +301,8 @@ def print_header(title="IP Range Generator"): print() def terminal_menu(options, title=None): - """ - Renders a menu with arrow key navigation. - options: list of strings or tuples (key, display_text) - """ cursor_idx = 0 - if title: print_header(title) - # Normalize options to list of strings for display display_options = [] for opt in options: if isinstance(opt, tuple): display_options.append(opt[1]) @@ -260,9 +313,8 @@ def terminal_menu(options, title=None): clear_screen() print_header(title) else: - print("\033[H", end="") # Move to home + print("\033[H", end="") - # Print Menu for i, opt in enumerate(display_options): if i == cursor_idx: print(f"{Colors.CYAN} > {opt} {Colors.ENDC}") @@ -279,18 +331,12 @@ def terminal_menu(options, title=None): elif key == 'ENTER': return cursor_idx elif key == 'ESC': - return -1 # Cancel/Back convention + return -1 def terminal_multiselect(options, title="Select Items"): - """ - Renders a multiselect menu. - options: list of strings or tuples (key, display_text) - Returns: list of selected indices - """ - selected_indices = set(range(len(options))) # Default all selected + selected_indices = set(range(len(options))) cursor_idx = 0 - # Normalize display_options = [] for opt in options: if isinstance(opt, tuple): display_options.append(opt[1]) @@ -311,7 +357,6 @@ def terminal_multiselect(options, title="Select Items"): prefix = " > " if i == cursor_idx else " " mark = "[*]" if i in selected_indices else "[ ]" - # Highlight cursor row if i == cursor_idx: print(f"{Colors.CYAN}{prefix}{mark} {opt}{Colors.ENDC}") else: @@ -339,11 +384,7 @@ def terminal_multiselect(options, title="Select Items"): elif key == 'ESC': return [] - def pause_menu(state, cfg, current_settings): - """ - Displays the Pause Menu and handles interaction. - """ while True: clear_screen() print_header("PAUSE MENU") @@ -359,34 +400,22 @@ def pause_menu(state, cfg, current_settings): idx = terminal_menu(options) - if idx == 0: # Resume + if idx == 0: state.paused = False return - elif idx == 1: # Settings - # Reuse menu_settings but we need to know if we apply to Global or just Current - # menu_settings modifies 'cfg' (global). - # We also want to modify 'current_settings' (runtime). - + elif idx == 1: s_opts = ["1. Edit Global Config (Permanent)", "2. Edit Current Scan Settings (Runtime only)", "3. Back"] s_idx = terminal_menu(s_opts, "Settings Mode") if s_idx == 0: menu_settings(cfg) - # Re-read defaults to apply to current if desired? - # Usually user expects global change to apply now. - # Let's update current_settings from cfg new_defs = cfg.get_defaults() current_settings.update(new_defs) print("Global settings applied to current scan.") time.sleep(1) elif s_idx == 1: - # We need a temp config manager wrapper to edit 'current_settings' - # Hack: Just use menu_settings logic but pass a mocked cfg? - # Or just manually edit key values here? - # Re-using menu_settings is best if possible. - # Let's create a dummy ConfigManager that wraps current_settings class RuntimeConfig: def get_defaults(self): return current_settings def update_default(self, k, v): current_settings[k] = v @@ -395,18 +424,16 @@ def update_default(self, k, v): current_settings[k] = v print("Runtime settings updated.") time.sleep(1) - elif idx == 2: # Stop & Save + elif idx == 2: state.stop_save() - state.paused = False # Break pause loop to let main loop exit + state.paused = False return - elif idx == 3: # Quit + elif idx == 3: state.stop_no_save() state.paused = False return -# --- Core Logic --- - class ScanState: def __init__(self): self.paused = False @@ -535,21 +562,12 @@ def run_test(self, ip, settings): return None def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, sources_info=None, source_files=None, interactive_confirm_save=False): - """ - ips: list of IP strings - settings: dict - resume_data: dict containing previous results if resuming - sources_info: list of strings describing sources (e.g. ['CloudFlare', 'Fastly']) or dict {ip: provider} - source_files: list of file paths used as input (for backup) - interactive_confirm_save: if True, prints results table after scan and confirmation to keep file - """ print(f"\n{Colors.HEADER}Starting Scan on {len(ips)} IPs...{Colors.ENDC}") print(f"{Colors.CYAN}Controls: {Colors.WARNING}[P]{Colors.CYAN}ause | {Colors.FAIL}[S]{Colors.CYAN}top & Save | {Colors.FAIL}[Q]{Colors.CYAN}uit (No Save){Colors.ENDC}") max_threads = settings.get('threads', 100) output_format = settings.get('output_format', 'txt') - # Setup File Path if resume_data and 'filename' in resume_data: filename = resume_data['filename'] results = resume_data.get('results', []) @@ -570,7 +588,6 @@ def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, elif isinstance(sources_info, list): scan_sources = set(sources_info) - # Add Sources to settings/metadata for output settings['sources'] = list(scan_sources) state = ScanState() @@ -578,16 +595,16 @@ def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, def input_listener(): while not state.stopped: if state.paused: - time.sleep(0.5) # Yield control to pause menu in main thread + time.sleep(0.5) continue - if msvcrt.kbhit(): + if kb_hit(): try: - key = msvcrt.getch().lower() - if key == b'p': + key = get_char().lower() + if key == 'p': state.paused = True - elif key == b's': state.stop_save() - elif key == b'q': state.stop_no_save() + elif key == 's': state.stop_save() + elif key == 'q': state.stop_no_save() except: pass time.sleep(0.1) @@ -608,12 +625,11 @@ def input_listener(): elif output_format == 'json': json_freq = settings.get('json_update_interval', 10000) print(f"{Colors.CYAN} [i] JSON format selected. File will be updated every {json_freq} IPs.{Colors.ENDC}") - print() # Spacing + print() completed = 0 success_count = 0 - # If resuming, update counts if resume_data: for r in results: if r['status'] == 'SUCCESS': success_count += 1 @@ -632,27 +648,20 @@ def task(ip): res = {'ip': ip, 'latency_ms': ping if ping is not None else 0, 'status': status} - # Tag Provider if ip in ip_provider_map: res['provider'] = ip_provider_map[ip] return res - futures = {} # Future -> IP + futures = {} with ThreadPoolExecutor(max_workers=max_threads) as executor: for ip in ips: if state.stopped: break - # Pause Loop with Menu while state.paused: - # Entering Pause Menu (Blocking Main Thread) - # Listener thread is yielding because state.paused is True pause_menu(state, self.cfg, settings) - # When pause_menu returns, state.paused might be False (Resume) or True (Stop) if state.stopped: break - - # Clear screen resume message print(f"\n{Colors.GREEN} [RESUMED] Continuing scan...{Colors.ENDC}") if state.stopped: break @@ -660,7 +669,6 @@ def task(ip): ft = executor.submit(task, ip) futures[ft] = ip - # As Completed for future in as_completed(futures): ip_processed = futures[future] @@ -668,7 +676,7 @@ def task(ip): res = future.result() except: res = None - if not res: continue # Task was stopped or failed + if not res: continue with lock: completed += 1 @@ -688,7 +696,6 @@ def task(ip): prov = f" | {res.get('provider','')}" if 'provider' in res else "" f_handle.write(f"{res['ip']} | {res['latency_ms']}ms | {res['status']}{prov}\n") - # Check settings & periodic save logic check_interval = settings.get('settings_check_interval', 1000) if completed % check_interval == 0: diff, new_defaults = self.cfg.check_for_changes(settings) @@ -703,7 +710,6 @@ def task(ip): interval = settings.get('json_update_interval', 10000) if completed % interval == 0: try: - # Filter and Clean Results for Dump save_all = settings.get('save_failed', False) dump_results = [] for r in results: @@ -719,11 +725,9 @@ def task(ip): elif output_format == 'txt' and f_handle: if completed % settings.get('txt_update_interval', 1000) == 0: f_handle.flush() - # End of loop print(f"\n{Colors.BOLD}Scan Finished or Stopped.{Colors.ENDC}") if f_handle: f_handle.close() - # Interactive Small Batch Confirmation if interactive_confirm_save and not state.stopped: print(f"\n{Colors.HEADER}--- Scan Results ---{Colors.ENDC}") print(f"{'IP':<20} | {'Ping':<8} | {'Status'}") @@ -738,20 +742,15 @@ def task(ip): print("Results discarded.") return - # --- Checkpoint & Final Save --- - if state.save_progress and state.stopped: - # Calculate remaining IPs scanned_set = set(r['ip'] for r in results) remaining = [ip for ip in ips if ip not in scanned_set] print(f"Creating Checkpoint... ({len(remaining)} IPs remaining)") if not os.path.exists(CHECKPOINTS_DIR): os.makedirs(CHECKPOINTS_DIR) - # Backup Logic backup_files_paths = [] if source_files: - # Create a backup folder for this checkpoint safe_ts = os.path.basename(filename).replace('.', '_').replace(':', '') backup_dir = os.path.join(CHECKPOINTS_DIR, f"Backup_{safe_ts}") if not os.path.exists(backup_dir): os.makedirs(backup_dir) @@ -776,7 +775,7 @@ def task(ip): "sources_info": sources_info, "results": results, "backup_files": backup_files_paths, - "remaining_ips": remaining if not backup_files_paths else [] # Optimize JSON if backed up + "remaining_ips": remaining if not backup_files_paths else [] } with open(cp_path, 'w') as f: @@ -785,10 +784,8 @@ def task(ip): if backup_files_paths: print(f"{Colors.CYAN}Source files backed up to: {backup_dir}{Colors.ENDC}") - # Final JSON dump if output_format == 'json' and results: print("Saving Final JSON report...") - # Filter and Clean Results for Dump save_all = settings.get('save_failed', False) dump_results = [] for r in results: @@ -813,15 +810,10 @@ def __init__(self, tester): self.tester = tester def expand_cidr(self, cidr, range_level='Short'): - """ - Expands a CIDR. If input is a single IP (no slash), applies range_level. - Short=/24 (256), Medium=/20 (4096), Full=/16 (65536). - """ if '/' not in cidr: - # Single IP -> Range if range_level == 'Medium': cidr += '/20' elif range_level == 'Full': cidr += '/16' - else: cidr += '/24' # Default Short + else: cidr += '/24' try: return [str(ip) for ip in ipaddress.ip_network(cidr.strip(), strict=False)] @@ -839,7 +831,6 @@ def generate_and_save(self, cidrs_data, settings, output_dir=OUTPUT_RANGES_DIR): print(f"Generating IPs for {Colors.BOLD}{cidr}{Colors.ENDC} ({prefix})...") - # Warn if IPv6 range is too big try: network = ipaddress.ip_network(cidr.strip(), strict=False) if network.version == 6 and network.num_addresses > 1000000: @@ -865,15 +856,6 @@ def generate_and_save(self, cidrs_data, settings, output_dir=OUTPUT_RANGES_DIR): return generated_files -# --- Menu Functions --- - -def get_user_settings_override(current_defaults): - print("Use default settings? (Enter=Yes, 'n'=Edit)") - if input("Diff: ").strip().lower() == 'n': - print(f"Current: {current_defaults}") - # Simplification: just return defaults for now or implement edit loop - return current_defaults - def menu_scan_ip_ranges(cfg, tester, generator): settings = cfg.get_defaults() templates = cfg.get_templates() @@ -888,7 +870,7 @@ def menu_scan_ip_ranges(cfg, tester, generator): idx = terminal_menu(options, "Generate & Scan IP Ranges") if idx == 3 or idx == -1: return - targets = [] # List of {'cidr': str, 'prefix': str} + targets = [] if idx == 0: t_keys = list(templates.keys()) @@ -897,9 +879,9 @@ def menu_scan_ip_ranges(cfg, tester, generator): sel_idx = terminal_menu(templ_options, "Select Template") - if sel_idx == -1: return # Back + if sel_idx == -1: return - if sel_idx == len(t_keys): # All Templates (Last option) + if sel_idx == len(t_keys): selected_keys = t_keys else: selected_keys = [t_keys[sel_idx]] @@ -909,14 +891,12 @@ def menu_scan_ip_ranges(cfg, tester, generator): for key in selected_keys: tmpl_data = templates[key] - # Handle potential dictionary structure (ipv4/ipv6 keys) if isinstance(tmpl_data, dict): for ip_type, cidr_list in tmpl_data.items(): if ip_type == 'ipv6' and not ipv6_enabled: continue if isinstance(cidr_list, list): for c in cidr_list: targets.append({'cidr': c, 'prefix': key}) elif isinstance(tmpl_data, list): - # Legacy or simple list support for c in tmpl_data: targets.append({'cidr': c, 'prefix': key}) elif idx == 1: @@ -926,15 +906,12 @@ def menu_scan_ip_ranges(cfg, tester, generator): if not provider_name: provider_name = "CustomFile" for f in files: - # Use robust load_file which handles JSON/TXT/CSV/Regex loaded_data = load_file(f) for entry in loaded_data: - # entry is {'ip': '...'} - # We need to adapt it to targets: {'cidr': ..., 'prefix': ...} if 'ip' in entry: targets.append({'cidr': entry['ip'], 'prefix': provider_name}) - elif idx == 2: # Terminal Input + elif idx == 2: inp = input("\nEnter IPs/CIDRs (comma separated): ") provider_name = input(f"{Colors.CYAN}Enter Custom Provider Name (Optional, Press Enter for 'Manual'): {Colors.ENDC}").strip() @@ -951,7 +928,6 @@ def menu_scan_ip_ranges(cfg, tester, generator): working_targets = targets - # --- Pre-Generation Workflow --- print(f"\nLoaded {len(working_targets)} target ranges.") pre_opts = [ "1. Generate All (Default)", @@ -960,20 +936,17 @@ def menu_scan_ip_ranges(cfg, tester, generator): ] pre_idx = terminal_menu(pre_opts, "Pre-Generation Options") - if pre_idx == 1: # Ping Check + if pre_idx == 1: print(f"\n{Colors.CYAN}Pinging gateway IPs to filter unreachable ranges...{Colors.ENDC}") filtered = [] for t in working_targets: - # Simple check: Try to connect to network address (or +1) - # For this simple tool, let's just use the first IP in range try: net = ipaddress.ip_network(t['cidr'], strict=False) test_ip = str(net[1]) if net.num_addresses > 1 else str(net[0]) - # Quick TCP connect to 80 or 443 is_alive = False for p in [80, 443]: - if tester.test_tcp(test_ip, p, 0.5): # 500ms timeout + if tester.test_tcp(test_ip, p, 0.5): is_alive = True break @@ -992,29 +965,26 @@ def menu_scan_ip_ranges(cfg, tester, generator): print(f"Filtered down to {len(filtered)} ranges.") working_targets = filtered time.sleep(1) - print() # Add spacing before generation + print() - elif pre_idx == 2: # Manual Selection + elif pre_idx == 2: opts = [f"{t['prefix']} - {t['cidr']}" for t in working_targets] sel_indices = terminal_multiselect(opts, "Select Ranges to Generate") if not sel_indices: return working_targets = [working_targets[i] for i in sel_indices] - # Generate generated_files = generator.generate_and_save(working_targets, settings) if generated_files: print("\nRange Generation Complete.") input("Press Enter to continue...") - # --- Post-Generation Workflow --- - scan_opts = ["1. Scan All Generated Files (Default)", "2. Select Files to Scan", "3. Return to Menu"] s_idx = terminal_menu(scan_opts, "Scan Generated Ranges?") files_to_scan = generated_files - if s_idx == 1: # Select Files + if s_idx == 1: f_opts = [os.path.basename(f) for f in generated_files] sel_indices = terminal_multiselect(f_opts, "Select Files to Scan") if not sel_indices: return @@ -1026,12 +996,10 @@ def menu_scan_ip_ranges(cfg, tester, generator): ip_provider_map = {} for gf in files_to_scan: - # Parse provider from filename prefix fname = os.path.basename(gf) parts = fname.split('_') provider = parts[0] if len(parts) > 1 else "Unknown" - # Colorize filename print print(f"Loading {Colors.CYAN}{fname}{Colors.ENDC}...") try: @@ -1041,7 +1009,6 @@ def menu_scan_ip_ranges(cfg, tester, generator): for ip in lines: ip_provider_map[ip] = provider except: pass - # Pass files_to_scan as source_files for backup tester.scan_ips(all_ips, settings, sources_info=ip_provider_map, source_files=files_to_scan) def menu_scan_ips(cfg, tester): @@ -1056,7 +1023,6 @@ def menu_scan_ips(cfg, tester): if idx == 3 or idx == -1: return if idx == 2: - # Resume Logic if not os.path.exists(CHECKPOINTS_DIR): print("No Checkpoints found.") time.sleep(1) @@ -1065,7 +1031,6 @@ def menu_scan_ips(cfg, tester): files = terminal_file_selector(CHECKPOINTS_DIR, extensions=['.json']) if not files: return - # Load first selected cp_path = files[0] try: with open(cp_path, 'r') as f: cp = json.load(f) @@ -1078,14 +1043,11 @@ def menu_scan_ips(cfg, tester): all_backed_up_ips = [] for bf in backup_files: if os.path.exists(bf): - # Use load_file logic (needs to be accessible or duplicated simpler) - # load_file is global. for d in load_file(bf): if 'ip' in d: all_backed_up_ips.append(d['ip']) else: print(f"Warning: Backup file missing: {bf}") - # Filter results scanned_set = set(r['ip'] for r in cp.get('results', [])) resume_ips = [ip for ip in all_backed_up_ips if ip not in scanned_set] print(f"Reconstructed {len(resume_ips)} remaining IPs from backups.") @@ -1100,7 +1062,6 @@ def menu_scan_ips(cfg, tester): print(f"Resuming {cp['filename']} ({cp['timestamp']})") - # Resume directly without asking for settings override tester.scan_ips(resume_ips, cp['settings'], resume_data=cp, sources_info=cp.get('sources_info'), source_files=backup_files) except Exception as e: @@ -1116,7 +1077,7 @@ def menu_scan_ips(cfg, tester): ips_to_scan = [i.strip() for i in inp.split(',') if i.strip()] elif idx == 1: files = terminal_file_selector(INPUT_DIR) - selected_files = files # Store for passing to scan_ips + selected_files = files for f in files: data = load_file(f) for d in data: @@ -1124,10 +1085,8 @@ def menu_scan_ips(cfg, tester): if not ips_to_scan: return - # No settings override prompt settings = cfg.get_defaults() - # Check for interactive small batch is_interactive = (idx == 0 and len(ips_to_scan) < 10) tester.scan_ips(ips_to_scan, settings, source_files=selected_files, interactive_confirm_save=is_interactive) @@ -1135,26 +1094,22 @@ def menu_scan_ips(cfg, tester): def menu_settings(cfg): while True: defaults = cfg.get_defaults() - # Respect JSON order (Python 3.7+ preserves insertion order) keys = list(defaults.keys()) - - # Hide internal/managed keys keys = [k for k in keys if k not in ['sources']] options = [] - options.append(("", f"{Colors.FAIL}Back{Colors.ENDC}")) # Back on Top + options.append(("", f"{Colors.FAIL}Back{Colors.ENDC}")) for k in keys: val = defaults[k] - # Colorize: Key (Cyan), Value (Green) disp = f"{Colors.CYAN}{k}{Colors.ENDC}: {Colors.GREEN}{val}{Colors.ENDC}" options.append((k, disp)) idx = terminal_menu(options, "Global Settings (Select to Edit)") - if idx == 0 or idx == -1: return # Back + if idx == 0 or idx == -1: return - key = keys[idx - 1] # Adjust for Back being at 0 + key = keys[idx - 1] current_val = defaults[key] print(f"\nEditing {Colors.BOLD}{key}{Colors.ENDC}") @@ -1162,8 +1117,7 @@ def menu_settings(cfg): final_val = None - # ComboBox Logic for specific keys - if isinstance(current_val, bool): # Boolean Toggle + if isinstance(current_val, bool): bool_opts = ["True", "False"] b_idx = terminal_menu(bool_opts, f"Select Value for {key} (Current: {current_val})") if b_idx == 0: final_val = True @@ -1175,7 +1129,7 @@ def menu_settings(cfg): opts.append("Custom") p_idx = terminal_menu(opts, f"Select Port (Current: {current_val})") if p_idx == -1: continue - if p_idx == len(opts) - 1: # Custom + if p_idx == len(opts) - 1: val = input("Enter Custom Port (or Enter to cancel): ").strip() if val.isdigit(): final_val = int(val) else: @@ -1198,10 +1152,8 @@ def menu_settings(cfg): if f_idx != -1: final_val = formats[f_idx] else: - # Standard Text Input new_val = input("Enter New Value (or Enter to cancel): ").strip() if new_val: - # Type inference if isinstance(current_val, int): if new_val.isdigit(): final_val = int(new_val) else: @@ -1217,7 +1169,6 @@ def main_menu(): tester = IPTester(cfg) generator = IPGenerator(tester) - # Ensure Dirs for d in [INPUT_DIR, TEMP_DIR, OUTPUT_RANGES_DIR, OUTPUT_FINAL_DIR]: if not os.path.exists(d): os.makedirs(d) @@ -1245,4 +1196,4 @@ def main_menu(): try: main_menu() except KeyboardInterrupt: - print("\nForce Quit.") + print("\nForce Quit.") \ No newline at end of file