From 23ea2e31de6bf223786d314dd46d1715763b13d1 Mon Sep 17 00:00:00 2001 From: "R.O.V.O.I.D" <141726490+IMROVOID@users.noreply.github.com> Date: Sun, 15 Feb 2026 08:36:47 +0330 Subject: [PATCH 1/3] Update main.py --- main.py | 119 +++++++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 97 insertions(+), 22 deletions(-) diff --git a/main.py b/main.py index 4f4a2e6..10990ea 100644 --- a/main.py +++ b/main.py @@ -11,9 +11,49 @@ import shutil from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime -import msvcrt import re +# --- Cross-Platform Input Handling --- +# This block replaces the direct import of msvcrt + +if os.name == 'nt': + try: + import msvcrt + except ImportError: + pass # Should not happen on Windows +else: + import tty + import termios + import select + +def get_char(): + """Reads a single character from input (blocking). Returns string.""" + if os.name == 'nt': + # Windows + try: + return msvcrt.getch().decode('utf-8', errors='ignore') + except: + return '' + else: + # Linux / Termux + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(sys.stdin.fileno()) + ch = sys.stdin.read(1) + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + return ch + +def kb_hit(): + """Returns True if a key is waiting to be read (non-blocking).""" + if os.name == 'nt': + return msvcrt.kbhit() + else: + # Linux: Check if data is available on stdin + dr, dw, de = select.select([sys.stdin], [], [], 0) + return dr != [] + # --- Colors --- class Colors: HEADER = '\033[96m' # Cyan @@ -35,7 +75,8 @@ class Colors: CHECKPOINTS_DIR = "Checkpoints" # Enable VT100 for Windows 10/11 -os.system('color') +if os.name == 'nt': + os.system('color') # --- Utils --- @@ -43,21 +84,54 @@ def clear_screen(): os.system('cls' if os.name == 'nt' else 'clear') def get_key(): - """Reads a key press and returns a unified key code.""" - key = msvcrt.getch() - if key in (b'\x00', b'\xe0'): - # Arrow keys or special function keys + """Reads a key press and returns a unified key code (UP, DOWN, ENTER, etc).""" + + if os.name == 'nt': + # --- WINDOWS LOGIC --- key = msvcrt.getch() - if key == b'H': return 'UP' - if key == b'P': return 'DOWN' - if key == b'M': return 'RIGHT' - if key == b'K': return 'LEFT' - elif key == b'\r': return 'ENTER' - elif key == b' ': return 'SPACE' - elif key == b'\x08': return 'BACKSPACE' - elif key == b'\x03': return 'CTRL_C' - elif key == b'\x1b': return 'ESC' - return None + if key in (b'\x00', b'\xe0'): + # Arrow keys or special function keys + try: + key = msvcrt.getch() + if key == b'H': return 'UP' + if key == b'P': return 'DOWN' + if key == b'M': return 'RIGHT' + if key == b'K': return 'LEFT' + except: pass + elif key == b'\r': return 'ENTER' + elif key == b' ': return 'SPACE' + elif key == b'\x08': return 'BACKSPACE' + elif key == b'\x03': return 'CTRL_C' + elif key == b'\x1b': return 'ESC' + # Return character if standard + try: return key.decode() + except: return None + + else: + # --- LINUX / TERMUX LOGIC --- + k1 = get_char() + + if k1 == '\x1b': # ESC or ANSI sequence start + # Check if there are more characters immediately following (Escape Sequence) + if kb_hit(): + k2 = get_char() + if k2 == '[': + k3 = get_char() + if k3 == 'A': return 'UP' + if k3 == 'B': return 'DOWN' + if k3 == 'C': return 'RIGHT' + if k3 == 'D': return 'LEFT' + return 'ESC' # Captured ESC sequence but didn't match arrows + else: + return 'ESC' # Just the ESC key + elif k1 == '\r': return 'ENTER' + elif k1 == '\n': return 'ENTER' + elif k1 == ' ': return 'SPACE' + elif k1 == '\x7f': return 'BACKSPACE' # Common backspace on Linux + elif k1 == '\x08': return 'BACKSPACE' # Alternative backspace + elif k1 == '\x03': return 'CTRL_C' + + return k1 def terminal_file_selector(base_dir=".", extensions=None): """ @@ -581,13 +655,14 @@ def input_listener(): time.sleep(0.5) # Yield control to pause menu in main thread continue - if msvcrt.kbhit(): + if kb_hit(): try: - key = msvcrt.getch().lower() - if key == b'p': + # Use get_char which is cross-platform now (returns string) + key = get_char().lower() + if key == 'p': state.paused = True - elif key == b's': state.stop_save() - elif key == b'q': state.stop_no_save() + elif key == 's': state.stop_save() + elif key == 'q': state.stop_no_save() except: pass time.sleep(0.1) @@ -1245,4 +1320,4 @@ def main_menu(): try: main_menu() except KeyboardInterrupt: - print("\nForce Quit.") + print("\nForce Quit.") \ No newline at end of file From 3589705ea49754049b82cfbe893e81c55d41e5bf Mon Sep 17 00:00:00 2001 From: "R.O.V.O.I.D" <141726490+IMROVOID@users.noreply.github.com> Date: Sun, 15 Feb 2026 08:48:37 +0330 Subject: [PATCH 2/3] Update main.py --- main.py | 244 +++++++++++--------------------------------------------- 1 file changed, 48 insertions(+), 196 deletions(-) diff --git a/main.py b/main.py index 10990ea..8308079 100644 --- a/main.py +++ b/main.py @@ -14,13 +14,12 @@ import re # --- Cross-Platform Input Handling --- -# This block replaces the direct import of msvcrt if os.name == 'nt': try: import msvcrt except ImportError: - pass # Should not happen on Windows + pass else: import tty import termios @@ -29,13 +28,11 @@ def get_char(): """Reads a single character from input (blocking). Returns string.""" if os.name == 'nt': - # Windows try: return msvcrt.getch().decode('utf-8', errors='ignore') except: return '' else: - # Linux / Termux fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: @@ -50,7 +47,6 @@ def kb_hit(): if os.name == 'nt': return msvcrt.kbhit() else: - # Linux: Check if data is available on stdin dr, dw, de = select.select([sys.stdin], [], [], 0) return dr != [] @@ -74,23 +70,22 @@ class Colors: OUTPUT_FINAL_DIR = os.path.join("Output", "Final") CHECKPOINTS_DIR = "Checkpoints" -# Enable VT100 for Windows 10/11 if os.name == 'nt': os.system('color') # --- Utils --- def clear_screen(): - os.system('cls' if os.name == 'nt' else 'clear') + # Use ANSI escape sequences instead of os.system('clear') to fix Termux linker errors + print("\033[H\033[J", end="") def get_key(): - """Reads a key press and returns a unified key code (UP, DOWN, ENTER, etc).""" + """Reads a key press and returns a unified key code.""" if os.name == 'nt': # --- WINDOWS LOGIC --- key = msvcrt.getch() if key in (b'\x00', b'\xe0'): - # Arrow keys or special function keys try: key = msvcrt.getch() if key == b'H': return 'UP' @@ -103,7 +98,6 @@ def get_key(): elif key == b'\x08': return 'BACKSPACE' elif key == b'\x03': return 'CTRL_C' elif key == b'\x1b': return 'ESC' - # Return character if standard try: return key.decode() except: return None @@ -111,9 +105,11 @@ def get_key(): # --- LINUX / TERMUX LOGIC --- k1 = get_char() - if k1 == '\x1b': # ESC or ANSI sequence start - # Check if there are more characters immediately following (Escape Sequence) - if kb_hit(): + if k1 == '\x1b': + # It is an escape sequence (Arrow keys send ESC + [ + A) + # We must wait slightly to see if more characters follow + dr, _, _ = select.select([sys.stdin], [], [], 0.05) + if dr: k2 = get_char() if k2 == '[': k3 = get_char() @@ -121,22 +117,20 @@ def get_key(): if k3 == 'B': return 'DOWN' if k3 == 'C': return 'RIGHT' if k3 == 'D': return 'LEFT' - return 'ESC' # Captured ESC sequence but didn't match arrows + return 'ESC' else: - return 'ESC' # Just the ESC key + return 'ESC' + elif k1 == '\r': return 'ENTER' elif k1 == '\n': return 'ENTER' elif k1 == ' ': return 'SPACE' - elif k1 == '\x7f': return 'BACKSPACE' # Common backspace on Linux - elif k1 == '\x08': return 'BACKSPACE' # Alternative backspace + elif k1 == '\x7f': return 'BACKSPACE' + elif k1 == '\x08': return 'BACKSPACE' elif k1 == '\x03': return 'CTRL_C' return k1 def terminal_file_selector(base_dir=".", extensions=None): - """ - Interactive TUI file explorer. - """ current_dir = os.path.abspath(base_dir) selected_files = [] cursor_idx = 0 @@ -231,48 +225,25 @@ def terminal_file_selector(base_dir=".", extensions=None): return [] def extract_ips_from_text(text): - """ - Extracts IPv4 and IPv6 addresses/CIDRs from any text using regex. - """ - # IPv4 CIDR or IP: x.x.x.x or x.x.x.x/xx ipv4_pattern = r'\b(?:\d{1,3}\.){3}\d{1,3}(?:/\d{1,2})?\b' - - # IPv6 CIDR or IP (simplified but practical) ipv6_pattern = r'(?:[0-9a-fA-F]{1,4}:){2,}(?:[0-9a-fA-F]{1,4}:?)(?:/\d{1,3})?' - ips = [] - - # Find all IPv4 - for match in re.findall(ipv4_pattern, text): - ips.append(match) - - # Find all IPv6 - for match in re.findall(ipv6_pattern, text): - ips.append(match) - - return list(set(ips)) # Dedup + for match in re.findall(ipv4_pattern, text): ips.append(match) + for match in re.findall(ipv6_pattern, text): ips.append(match) + return list(set(ips)) def load_file(filepath): - """ - Loads IPs/CIDRs from a file (JSON, CSV, or TXT) using robust regex extraction. - Supports nested JSON, messy TXT, etc. - Returns: List of dicts {'ip': str, ...} or just strings if lazy - """ if not os.path.exists(filepath): print(f"File not found: {filepath}") return [] data = [] - try: with open(filepath, 'r', encoding='utf-8', errors='ignore') as f: content = f.read() - # 1. Try JSON First (for structured data preservation like latency/provider) try: json_content = json.loads(content) - - # Recursive helper to find objects with 'ip' key def extract_from_json(obj): if isinstance(obj, dict): if 'ip' in obj: @@ -284,28 +255,18 @@ def extract_from_json(obj): for k, v in obj.items(): extract_from_json(v) elif isinstance(obj, list): for item in obj: extract_from_json(item) - extract_from_json(json_content) + if data: return data + except json.JSONDecodeError: pass - if data: return data # If structured data found, return it - - # If JSON parsed but no 'ip' keys found, fall back to regex on string dump - - except json.JSONDecodeError: - pass # Not JSON, proceed to regex - - # 2. Regex Extraction (Fallback or for TXT/CSV) extracted_ips = extract_ips_from_text(content) for ip in extracted_ips: - # Basic validation try: ipaddress.ip_network(ip, strict=False) data.append({"ip": ip}) except: pass - except Exception as e: print(f"Error loading {filepath}: {e}") - return data def print_header(title="IP Range Generator"): @@ -316,14 +277,8 @@ def print_header(title="IP Range Generator"): print() def terminal_menu(options, title=None): - """ - Renders a menu with arrow key navigation. - options: list of strings or tuples (key, display_text) - """ cursor_idx = 0 - if title: print_header(title) - # Normalize options to list of strings for display display_options = [] for opt in options: if isinstance(opt, tuple): display_options.append(opt[1]) @@ -334,9 +289,8 @@ def terminal_menu(options, title=None): clear_screen() print_header(title) else: - print("\033[H", end="") # Move to home + print("\033[H", end="") - # Print Menu for i, opt in enumerate(display_options): if i == cursor_idx: print(f"{Colors.CYAN} > {opt} {Colors.ENDC}") @@ -353,18 +307,12 @@ def terminal_menu(options, title=None): elif key == 'ENTER': return cursor_idx elif key == 'ESC': - return -1 # Cancel/Back convention + return -1 def terminal_multiselect(options, title="Select Items"): - """ - Renders a multiselect menu. - options: list of strings or tuples (key, display_text) - Returns: list of selected indices - """ - selected_indices = set(range(len(options))) # Default all selected + selected_indices = set(range(len(options))) cursor_idx = 0 - # Normalize display_options = [] for opt in options: if isinstance(opt, tuple): display_options.append(opt[1]) @@ -385,7 +333,6 @@ def terminal_multiselect(options, title="Select Items"): prefix = " > " if i == cursor_idx else " " mark = "[*]" if i in selected_indices else "[ ]" - # Highlight cursor row if i == cursor_idx: print(f"{Colors.CYAN}{prefix}{mark} {opt}{Colors.ENDC}") else: @@ -413,11 +360,7 @@ def terminal_multiselect(options, title="Select Items"): elif key == 'ESC': return [] - def pause_menu(state, cfg, current_settings): - """ - Displays the Pause Menu and handles interaction. - """ while True: clear_screen() print_header("PAUSE MENU") @@ -433,34 +376,22 @@ def pause_menu(state, cfg, current_settings): idx = terminal_menu(options) - if idx == 0: # Resume + if idx == 0: state.paused = False return - elif idx == 1: # Settings - # Reuse menu_settings but we need to know if we apply to Global or just Current - # menu_settings modifies 'cfg' (global). - # We also want to modify 'current_settings' (runtime). - + elif idx == 1: s_opts = ["1. Edit Global Config (Permanent)", "2. Edit Current Scan Settings (Runtime only)", "3. Back"] s_idx = terminal_menu(s_opts, "Settings Mode") if s_idx == 0: menu_settings(cfg) - # Re-read defaults to apply to current if desired? - # Usually user expects global change to apply now. - # Let's update current_settings from cfg new_defs = cfg.get_defaults() current_settings.update(new_defs) print("Global settings applied to current scan.") time.sleep(1) elif s_idx == 1: - # We need a temp config manager wrapper to edit 'current_settings' - # Hack: Just use menu_settings logic but pass a mocked cfg? - # Or just manually edit key values here? - # Re-using menu_settings is best if possible. - # Let's create a dummy ConfigManager that wraps current_settings class RuntimeConfig: def get_defaults(self): return current_settings def update_default(self, k, v): current_settings[k] = v @@ -469,18 +400,16 @@ def update_default(self, k, v): current_settings[k] = v print("Runtime settings updated.") time.sleep(1) - elif idx == 2: # Stop & Save + elif idx == 2: state.stop_save() - state.paused = False # Break pause loop to let main loop exit + state.paused = False return - elif idx == 3: # Quit + elif idx == 3: state.stop_no_save() state.paused = False return -# --- Core Logic --- - class ScanState: def __init__(self): self.paused = False @@ -609,21 +538,12 @@ def run_test(self, ip, settings): return None def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, sources_info=None, source_files=None, interactive_confirm_save=False): - """ - ips: list of IP strings - settings: dict - resume_data: dict containing previous results if resuming - sources_info: list of strings describing sources (e.g. ['CloudFlare', 'Fastly']) or dict {ip: provider} - source_files: list of file paths used as input (for backup) - interactive_confirm_save: if True, prints results table after scan and confirmation to keep file - """ print(f"\n{Colors.HEADER}Starting Scan on {len(ips)} IPs...{Colors.ENDC}") print(f"{Colors.CYAN}Controls: {Colors.WARNING}[P]{Colors.CYAN}ause | {Colors.FAIL}[S]{Colors.CYAN}top & Save | {Colors.FAIL}[Q]{Colors.CYAN}uit (No Save){Colors.ENDC}") max_threads = settings.get('threads', 100) output_format = settings.get('output_format', 'txt') - # Setup File Path if resume_data and 'filename' in resume_data: filename = resume_data['filename'] results = resume_data.get('results', []) @@ -644,7 +564,6 @@ def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, elif isinstance(sources_info, list): scan_sources = set(sources_info) - # Add Sources to settings/metadata for output settings['sources'] = list(scan_sources) state = ScanState() @@ -652,12 +571,11 @@ def scan_ips(self, ips, settings, output_dir=OUTPUT_FINAL_DIR, resume_data=None, def input_listener(): while not state.stopped: if state.paused: - time.sleep(0.5) # Yield control to pause menu in main thread + time.sleep(0.5) continue if kb_hit(): try: - # Use get_char which is cross-platform now (returns string) key = get_char().lower() if key == 'p': state.paused = True @@ -683,12 +601,11 @@ def input_listener(): elif output_format == 'json': json_freq = settings.get('json_update_interval', 10000) print(f"{Colors.CYAN} [i] JSON format selected. File will be updated every {json_freq} IPs.{Colors.ENDC}") - print() # Spacing + print() completed = 0 success_count = 0 - # If resuming, update counts if resume_data: for r in results: if r['status'] == 'SUCCESS': success_count += 1 @@ -707,27 +624,20 @@ def task(ip): res = {'ip': ip, 'latency_ms': ping if ping is not None else 0, 'status': status} - # Tag Provider if ip in ip_provider_map: res['provider'] = ip_provider_map[ip] return res - futures = {} # Future -> IP + futures = {} with ThreadPoolExecutor(max_workers=max_threads) as executor: for ip in ips: if state.stopped: break - # Pause Loop with Menu while state.paused: - # Entering Pause Menu (Blocking Main Thread) - # Listener thread is yielding because state.paused is True pause_menu(state, self.cfg, settings) - # When pause_menu returns, state.paused might be False (Resume) or True (Stop) if state.stopped: break - - # Clear screen resume message print(f"\n{Colors.GREEN} [RESUMED] Continuing scan...{Colors.ENDC}") if state.stopped: break @@ -735,7 +645,6 @@ def task(ip): ft = executor.submit(task, ip) futures[ft] = ip - # As Completed for future in as_completed(futures): ip_processed = futures[future] @@ -743,7 +652,7 @@ def task(ip): res = future.result() except: res = None - if not res: continue # Task was stopped or failed + if not res: continue with lock: completed += 1 @@ -763,7 +672,6 @@ def task(ip): prov = f" | {res.get('provider','')}" if 'provider' in res else "" f_handle.write(f"{res['ip']} | {res['latency_ms']}ms | {res['status']}{prov}\n") - # Check settings & periodic save logic check_interval = settings.get('settings_check_interval', 1000) if completed % check_interval == 0: diff, new_defaults = self.cfg.check_for_changes(settings) @@ -778,7 +686,6 @@ def task(ip): interval = settings.get('json_update_interval', 10000) if completed % interval == 0: try: - # Filter and Clean Results for Dump save_all = settings.get('save_failed', False) dump_results = [] for r in results: @@ -794,11 +701,9 @@ def task(ip): elif output_format == 'txt' and f_handle: if completed % settings.get('txt_update_interval', 1000) == 0: f_handle.flush() - # End of loop print(f"\n{Colors.BOLD}Scan Finished or Stopped.{Colors.ENDC}") if f_handle: f_handle.close() - # Interactive Small Batch Confirmation if interactive_confirm_save and not state.stopped: print(f"\n{Colors.HEADER}--- Scan Results ---{Colors.ENDC}") print(f"{'IP':<20} | {'Ping':<8} | {'Status'}") @@ -813,20 +718,15 @@ def task(ip): print("Results discarded.") return - # --- Checkpoint & Final Save --- - if state.save_progress and state.stopped: - # Calculate remaining IPs scanned_set = set(r['ip'] for r in results) remaining = [ip for ip in ips if ip not in scanned_set] print(f"Creating Checkpoint... ({len(remaining)} IPs remaining)") if not os.path.exists(CHECKPOINTS_DIR): os.makedirs(CHECKPOINTS_DIR) - # Backup Logic backup_files_paths = [] if source_files: - # Create a backup folder for this checkpoint safe_ts = os.path.basename(filename).replace('.', '_').replace(':', '') backup_dir = os.path.join(CHECKPOINTS_DIR, f"Backup_{safe_ts}") if not os.path.exists(backup_dir): os.makedirs(backup_dir) @@ -851,7 +751,7 @@ def task(ip): "sources_info": sources_info, "results": results, "backup_files": backup_files_paths, - "remaining_ips": remaining if not backup_files_paths else [] # Optimize JSON if backed up + "remaining_ips": remaining if not backup_files_paths else [] } with open(cp_path, 'w') as f: @@ -860,10 +760,8 @@ def task(ip): if backup_files_paths: print(f"{Colors.CYAN}Source files backed up to: {backup_dir}{Colors.ENDC}") - # Final JSON dump if output_format == 'json' and results: print("Saving Final JSON report...") - # Filter and Clean Results for Dump save_all = settings.get('save_failed', False) dump_results = [] for r in results: @@ -888,15 +786,10 @@ def __init__(self, tester): self.tester = tester def expand_cidr(self, cidr, range_level='Short'): - """ - Expands a CIDR. If input is a single IP (no slash), applies range_level. - Short=/24 (256), Medium=/20 (4096), Full=/16 (65536). - """ if '/' not in cidr: - # Single IP -> Range if range_level == 'Medium': cidr += '/20' elif range_level == 'Full': cidr += '/16' - else: cidr += '/24' # Default Short + else: cidr += '/24' try: return [str(ip) for ip in ipaddress.ip_network(cidr.strip(), strict=False)] @@ -914,7 +807,6 @@ def generate_and_save(self, cidrs_data, settings, output_dir=OUTPUT_RANGES_DIR): print(f"Generating IPs for {Colors.BOLD}{cidr}{Colors.ENDC} ({prefix})...") - # Warn if IPv6 range is too big try: network = ipaddress.ip_network(cidr.strip(), strict=False) if network.version == 6 and network.num_addresses > 1000000: @@ -940,15 +832,6 @@ def generate_and_save(self, cidrs_data, settings, output_dir=OUTPUT_RANGES_DIR): return generated_files -# --- Menu Functions --- - -def get_user_settings_override(current_defaults): - print("Use default settings? (Enter=Yes, 'n'=Edit)") - if input("Diff: ").strip().lower() == 'n': - print(f"Current: {current_defaults}") - # Simplification: just return defaults for now or implement edit loop - return current_defaults - def menu_scan_ip_ranges(cfg, tester, generator): settings = cfg.get_defaults() templates = cfg.get_templates() @@ -963,7 +846,7 @@ def menu_scan_ip_ranges(cfg, tester, generator): idx = terminal_menu(options, "Generate & Scan IP Ranges") if idx == 3 or idx == -1: return - targets = [] # List of {'cidr': str, 'prefix': str} + targets = [] if idx == 0: t_keys = list(templates.keys()) @@ -972,9 +855,9 @@ def menu_scan_ip_ranges(cfg, tester, generator): sel_idx = terminal_menu(templ_options, "Select Template") - if sel_idx == -1: return # Back + if sel_idx == -1: return - if sel_idx == len(t_keys): # All Templates (Last option) + if sel_idx == len(t_keys): selected_keys = t_keys else: selected_keys = [t_keys[sel_idx]] @@ -984,14 +867,12 @@ def menu_scan_ip_ranges(cfg, tester, generator): for key in selected_keys: tmpl_data = templates[key] - # Handle potential dictionary structure (ipv4/ipv6 keys) if isinstance(tmpl_data, dict): for ip_type, cidr_list in tmpl_data.items(): if ip_type == 'ipv6' and not ipv6_enabled: continue if isinstance(cidr_list, list): for c in cidr_list: targets.append({'cidr': c, 'prefix': key}) elif isinstance(tmpl_data, list): - # Legacy or simple list support for c in tmpl_data: targets.append({'cidr': c, 'prefix': key}) elif idx == 1: @@ -1001,15 +882,12 @@ def menu_scan_ip_ranges(cfg, tester, generator): if not provider_name: provider_name = "CustomFile" for f in files: - # Use robust load_file which handles JSON/TXT/CSV/Regex loaded_data = load_file(f) for entry in loaded_data: - # entry is {'ip': '...'} - # We need to adapt it to targets: {'cidr': ..., 'prefix': ...} if 'ip' in entry: targets.append({'cidr': entry['ip'], 'prefix': provider_name}) - elif idx == 2: # Terminal Input + elif idx == 2: inp = input("\nEnter IPs/CIDRs (comma separated): ") provider_name = input(f"{Colors.CYAN}Enter Custom Provider Name (Optional, Press Enter for 'Manual'): {Colors.ENDC}").strip() @@ -1026,7 +904,6 @@ def menu_scan_ip_ranges(cfg, tester, generator): working_targets = targets - # --- Pre-Generation Workflow --- print(f"\nLoaded {len(working_targets)} target ranges.") pre_opts = [ "1. Generate All (Default)", @@ -1035,20 +912,17 @@ def menu_scan_ip_ranges(cfg, tester, generator): ] pre_idx = terminal_menu(pre_opts, "Pre-Generation Options") - if pre_idx == 1: # Ping Check + if pre_idx == 1: print(f"\n{Colors.CYAN}Pinging gateway IPs to filter unreachable ranges...{Colors.ENDC}") filtered = [] for t in working_targets: - # Simple check: Try to connect to network address (or +1) - # For this simple tool, let's just use the first IP in range try: net = ipaddress.ip_network(t['cidr'], strict=False) test_ip = str(net[1]) if net.num_addresses > 1 else str(net[0]) - # Quick TCP connect to 80 or 443 is_alive = False for p in [80, 443]: - if tester.test_tcp(test_ip, p, 0.5): # 500ms timeout + if tester.test_tcp(test_ip, p, 0.5): is_alive = True break @@ -1067,29 +941,26 @@ def menu_scan_ip_ranges(cfg, tester, generator): print(f"Filtered down to {len(filtered)} ranges.") working_targets = filtered time.sleep(1) - print() # Add spacing before generation + print() - elif pre_idx == 2: # Manual Selection + elif pre_idx == 2: opts = [f"{t['prefix']} - {t['cidr']}" for t in working_targets] sel_indices = terminal_multiselect(opts, "Select Ranges to Generate") if not sel_indices: return working_targets = [working_targets[i] for i in sel_indices] - # Generate generated_files = generator.generate_and_save(working_targets, settings) if generated_files: print("\nRange Generation Complete.") input("Press Enter to continue...") - # --- Post-Generation Workflow --- - scan_opts = ["1. Scan All Generated Files (Default)", "2. Select Files to Scan", "3. Return to Menu"] s_idx = terminal_menu(scan_opts, "Scan Generated Ranges?") files_to_scan = generated_files - if s_idx == 1: # Select Files + if s_idx == 1: f_opts = [os.path.basename(f) for f in generated_files] sel_indices = terminal_multiselect(f_opts, "Select Files to Scan") if not sel_indices: return @@ -1101,12 +972,10 @@ def menu_scan_ip_ranges(cfg, tester, generator): ip_provider_map = {} for gf in files_to_scan: - # Parse provider from filename prefix fname = os.path.basename(gf) parts = fname.split('_') provider = parts[0] if len(parts) > 1 else "Unknown" - # Colorize filename print print(f"Loading {Colors.CYAN}{fname}{Colors.ENDC}...") try: @@ -1116,7 +985,6 @@ def menu_scan_ip_ranges(cfg, tester, generator): for ip in lines: ip_provider_map[ip] = provider except: pass - # Pass files_to_scan as source_files for backup tester.scan_ips(all_ips, settings, sources_info=ip_provider_map, source_files=files_to_scan) def menu_scan_ips(cfg, tester): @@ -1131,7 +999,6 @@ def menu_scan_ips(cfg, tester): if idx == 3 or idx == -1: return if idx == 2: - # Resume Logic if not os.path.exists(CHECKPOINTS_DIR): print("No Checkpoints found.") time.sleep(1) @@ -1140,7 +1007,6 @@ def menu_scan_ips(cfg, tester): files = terminal_file_selector(CHECKPOINTS_DIR, extensions=['.json']) if not files: return - # Load first selected cp_path = files[0] try: with open(cp_path, 'r') as f: cp = json.load(f) @@ -1153,14 +1019,11 @@ def menu_scan_ips(cfg, tester): all_backed_up_ips = [] for bf in backup_files: if os.path.exists(bf): - # Use load_file logic (needs to be accessible or duplicated simpler) - # load_file is global. for d in load_file(bf): if 'ip' in d: all_backed_up_ips.append(d['ip']) else: print(f"Warning: Backup file missing: {bf}") - # Filter results scanned_set = set(r['ip'] for r in cp.get('results', [])) resume_ips = [ip for ip in all_backed_up_ips if ip not in scanned_set] print(f"Reconstructed {len(resume_ips)} remaining IPs from backups.") @@ -1175,7 +1038,6 @@ def menu_scan_ips(cfg, tester): print(f"Resuming {cp['filename']} ({cp['timestamp']})") - # Resume directly without asking for settings override tester.scan_ips(resume_ips, cp['settings'], resume_data=cp, sources_info=cp.get('sources_info'), source_files=backup_files) except Exception as e: @@ -1191,7 +1053,7 @@ def menu_scan_ips(cfg, tester): ips_to_scan = [i.strip() for i in inp.split(',') if i.strip()] elif idx == 1: files = terminal_file_selector(INPUT_DIR) - selected_files = files # Store for passing to scan_ips + selected_files = files for f in files: data = load_file(f) for d in data: @@ -1199,10 +1061,8 @@ def menu_scan_ips(cfg, tester): if not ips_to_scan: return - # No settings override prompt settings = cfg.get_defaults() - # Check for interactive small batch is_interactive = (idx == 0 and len(ips_to_scan) < 10) tester.scan_ips(ips_to_scan, settings, source_files=selected_files, interactive_confirm_save=is_interactive) @@ -1210,26 +1070,22 @@ def menu_scan_ips(cfg, tester): def menu_settings(cfg): while True: defaults = cfg.get_defaults() - # Respect JSON order (Python 3.7+ preserves insertion order) keys = list(defaults.keys()) - - # Hide internal/managed keys keys = [k for k in keys if k not in ['sources']] options = [] - options.append(("", f"{Colors.FAIL}Back{Colors.ENDC}")) # Back on Top + options.append(("", f"{Colors.FAIL}Back{Colors.ENDC}")) for k in keys: val = defaults[k] - # Colorize: Key (Cyan), Value (Green) disp = f"{Colors.CYAN}{k}{Colors.ENDC}: {Colors.GREEN}{val}{Colors.ENDC}" options.append((k, disp)) idx = terminal_menu(options, "Global Settings (Select to Edit)") - if idx == 0 or idx == -1: return # Back + if idx == 0 or idx == -1: return - key = keys[idx - 1] # Adjust for Back being at 0 + key = keys[idx - 1] current_val = defaults[key] print(f"\nEditing {Colors.BOLD}{key}{Colors.ENDC}") @@ -1237,8 +1093,7 @@ def menu_settings(cfg): final_val = None - # ComboBox Logic for specific keys - if isinstance(current_val, bool): # Boolean Toggle + if isinstance(current_val, bool): bool_opts = ["True", "False"] b_idx = terminal_menu(bool_opts, f"Select Value for {key} (Current: {current_val})") if b_idx == 0: final_val = True @@ -1250,7 +1105,7 @@ def menu_settings(cfg): opts.append("Custom") p_idx = terminal_menu(opts, f"Select Port (Current: {current_val})") if p_idx == -1: continue - if p_idx == len(opts) - 1: # Custom + if p_idx == len(opts) - 1: val = input("Enter Custom Port (or Enter to cancel): ").strip() if val.isdigit(): final_val = int(val) else: @@ -1273,10 +1128,8 @@ def menu_settings(cfg): if f_idx != -1: final_val = formats[f_idx] else: - # Standard Text Input new_val = input("Enter New Value (or Enter to cancel): ").strip() if new_val: - # Type inference if isinstance(current_val, int): if new_val.isdigit(): final_val = int(new_val) else: @@ -1292,7 +1145,6 @@ def main_menu(): tester = IPTester(cfg) generator = IPGenerator(tester) - # Ensure Dirs for d in [INPUT_DIR, TEMP_DIR, OUTPUT_RANGES_DIR, OUTPUT_FINAL_DIR]: if not os.path.exists(d): os.makedirs(d) From c8d6485799b315e99fa7498691173e2321026005 Mon Sep 17 00:00:00 2001 From: "R.O.V.O.I.D" <141726490+IMROVOID@users.noreply.github.com> Date: Sun, 15 Feb 2026 08:53:23 +0330 Subject: [PATCH 3/3] Update main.py --- main.py | 80 +++++++++++++++++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 28 deletions(-) diff --git a/main.py b/main.py index 8308079..6222b88 100644 --- a/main.py +++ b/main.py @@ -33,10 +33,11 @@ def get_char(): except: return '' else: + # Linux: Single char read for pause listener fd = sys.stdin.fileno() old_settings = termios.tcgetattr(fd) try: - tty.setraw(sys.stdin.fileno()) + tty.setraw(fd) ch = sys.stdin.read(1) finally: termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) @@ -76,7 +77,7 @@ class Colors: # --- Utils --- def clear_screen(): - # Use ANSI escape sequences instead of os.system('clear') to fix Termux linker errors + # ANSI escape code to clear screen (works on Termux & Linux without spawning subshell) print("\033[H\033[J", end="") def get_key(): @@ -103,32 +104,55 @@ def get_key(): else: # --- LINUX / TERMUX LOGIC --- - k1 = get_char() - - if k1 == '\x1b': - # It is an escape sequence (Arrow keys send ESC + [ + A) - # We must wait slightly to see if more characters follow - dr, _, _ = select.select([sys.stdin], [], [], 0.05) - if dr: - k2 = get_char() - if k2 == '[': - k3 = get_char() - if k3 == 'A': return 'UP' - if k3 == 'B': return 'DOWN' - if k3 == 'C': return 'RIGHT' - if k3 == 'D': return 'LEFT' - return 'ESC' - else: - return 'ESC' - - elif k1 == '\r': return 'ENTER' - elif k1 == '\n': return 'ENTER' - elif k1 == ' ': return 'SPACE' - elif k1 == '\x7f': return 'BACKSPACE' - elif k1 == '\x08': return 'BACKSPACE' - elif k1 == '\x03': return 'CTRL_C' - - return k1 + # We must enter raw mode ONCE to capture the full sequence (ESC [ A) + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setraw(fd) + + # Read first byte directly from FD to avoid python buffering issues + ch_bytes = os.read(fd, 1) + if not ch_bytes: return None + ch = ch_bytes.decode('utf-8', errors='ignore') + + if ch == '\x1b': + # It's ESC. Check if a sequence follows immediately. + dr, _, _ = select.select([fd], [], [], 0.1) + if dr: + # Sequence detected, read next byte + seq1_bytes = os.read(fd, 1) + seq1 = seq1_bytes.decode('utf-8', errors='ignore') + + if seq1 == '[': # CSI (Common Arrows) + seq2_bytes = os.read(fd, 1) + seq2 = seq2_bytes.decode('utf-8', errors='ignore') + if seq2 == 'A': return 'UP' + if seq2 == 'B': return 'DOWN' + if seq2 == 'C': return 'RIGHT' + if seq2 == 'D': return 'LEFT' + + elif seq1 == 'O': # SS3 (Application Cursor Keys) + seq2_bytes = os.read(fd, 1) + seq2 = seq2_bytes.decode('utf-8', errors='ignore') + if seq2 == 'A': return 'UP' + if seq2 == 'B': return 'DOWN' + if seq2 == 'C': return 'RIGHT' + if seq2 == 'D': return 'LEFT' + + return 'ESC' # Captured sequence but unknown + else: + return 'ESC' # Genuine ESC key + + elif ch == '\r' or ch == '\n': return 'ENTER' + elif ch == ' ': return 'SPACE' + elif ch == '\x7f' or ch == '\x08': return 'BACKSPACE' + elif ch == '\x03': return 'CTRL_C' + + return ch + + finally: + # Always restore settings + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) def terminal_file_selector(base_dir=".", extensions=None): current_dir = os.path.abspath(base_dir)