From 4d7b722467ea0b1ef3338b8b45829590c1c6727d Mon Sep 17 00:00:00 2001 From: Mahmudul Alam Date: Tue, 10 Feb 2026 12:03:09 +0600 Subject: [PATCH 1/3] Updated integrated installer for android to use localhost --- .../install_handler/android/android_sdk.py | 211 +++--- Framework/install_handler/android/emulator.py | 675 ++++-------------- Framework/install_handler/utils.py | 133 ++++ server/installers.py | 623 ++++++++++++++++ server/main.py | 2 + 5 files changed, 999 insertions(+), 645 deletions(-) create mode 100644 server/installers.py diff --git a/Framework/install_handler/android/android_sdk.py b/Framework/install_handler/android/android_sdk.py index 28d11d1a..31f6ff92 100644 --- a/Framework/install_handler/android/android_sdk.py +++ b/Framework/install_handler/android/android_sdk.py @@ -6,7 +6,7 @@ import subprocess from pathlib import Path import httpx -from Framework.install_handler.utils import send_response +from Framework.install_handler.utils import send_response, pty_stream from settings import ZEUZ_NODE_DOWNLOADS_DIR @@ -289,116 +289,110 @@ def _find_sdkmanager(sdk_root: Path) -> Path | None: -async def _run_sdkmanager(sdk_root: Path, args: list[str]) -> bool: +async def _run_sdkmanager(sdk_root: Path, args: list[str], component_label: str | None = None) -> bool: + """Run sdkmanager with the given args, streaming output lines to send_response in real-time.""" try: sdkmanager = _find_sdkmanager(sdk_root) if not sdkmanager: print("[installer][android-sdk] sdkmanager not found") return False - + import asyncio - import subprocess - + + label = component_label or " ".join(args) system = platform.system() - output = None # Initialize for later use - + + async def _send_line(line: str): + """Send a single output line as a progress event.""" + stripped = line.strip() + if not stripped: + return + print(stripped) + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": f"[{label}] {stripped}", + } + }) + + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": f"Installing {label}...", + } + }) + if system == "Windows": - # Windows - use PowerShell to pipe 'y' responses for auto-accepting licenses - # Quote each argument individually to prevent PowerShell from interpreting semicolons yes_responses = ";".join(["echo y"] * 20) - # Wrap each arg in single quotes to preserve semicolons in package names like "platforms;android-36" quoted_args = " ".join([f"'{arg}'" for arg in args]) shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_args}"' print(f"[installer][android-sdk] Running: sdkmanager {' '.join(args)}") - print(f"[installer][android-sdk] This may take 5-15 minutes to download ~450MB of components...") - - loop = asyncio.get_event_loop() - - # Use Popen and print output in real-time - def run_sdkmanager(): - process = subprocess.Popen( - shell_cmd, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - # Close stdin - PowerShell piping will provide input - process.stdin.close() - - # Print output in real-time as it comes - output_lines = [] - try: - for line in iter(process.stdout.readline, ''): - if line: - print(line.rstrip()) # Print immediately - output_lines.append(line.strip()) - except Exception as e: - print(f"[installer][android-sdk] Output reading error: {e}") - - process.stdout.close() - returncode = process.wait(timeout=1800) - - # Return last 50 lines for debugging - return returncode, "\n".join(output_lines[-50:]) if output_lines else "" - - returncode, output = await loop.run_in_executor(None, run_sdkmanager) - - class Result: - pass - result = Result() - result.returncode = returncode - elif system == "Linux": - # Linux can execute directly - cmd = [str(sdkmanager), f"--sdk_root={sdk_root}"] + args - print(f"[installer][android-sdk] Running: {' '.join(cmd)}") - - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - lambda: subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=1800 # 30 minutes timeout - ) + + output_lines: list[str] = [] + + async def _win_stream(proc: asyncio.subprocess.Process) -> int: + while True: + raw = await proc.stdout.readline() + if not raw: + break + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + output_lines.append(line) + await _send_line(line) + return await proc.wait() + + proc = await asyncio.create_subprocess_shell( + shell_cmd, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, ) - output = (result.stdout or "") + (result.stderr or "") - elif system == "Darwin": - # macOS can execute directly + returncode = await _win_stream(proc) + elif system in ("Linux", "Darwin"): cmd = [str(sdkmanager), f"--sdk_root={sdk_root}"] + args print(f"[installer][android-sdk] Running: {' '.join(cmd)}") - - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - lambda: subprocess.run( - cmd, - capture_output=True, - text=True, - timeout=1800 # 30 minutes timeout - ) + + returncode, output_lines = await pty_stream( + cmd, + stdin_data="y\n" * 20, + on_line=_send_line, + timeout_s=1800, ) - output = (result.stdout or "") + (result.stderr or "") else: print(f"[installer][android-sdk] Unsupported platform: {system}") return False - - if result.returncode != 0: - print(f"[installer][android-sdk] sdkmanager failed (returncode={result.returncode})") - if output: - print(f"[installer][android-sdk] Last output:\n{output}") + + if returncode != 0: + print(f"[installer][android-sdk] sdkmanager failed (returncode={returncode})") + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": f"Failed to install {label} (exit code {returncode})", + } + }) return False - - print(f"[installer][android-sdk] sdkmanager completed successfully") - if output: - print(f"[installer][android-sdk] Final output:\n{output[-500:]}") # Last 500 chars + + print(f"[installer][android-sdk] sdkmanager completed successfully for {label}") + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": f"{label} installed successfully", + } + }) return True - except subprocess.TimeoutExpired: - print("[installer][android-sdk] sdkmanager timed out after 30 minutes") - return False except Exception as e: print(f"[installer][android-sdk] sdkmanager error: {e}") import traceback @@ -524,26 +518,27 @@ async def install() -> bool: # Continue; some environments prompt-less acceptance may not be required - # Install core components + # Install core components one at a time for per-component progress core_components = [ - "platform-tools", - "emulator", - # A recent platform and build-tools; adjust if needed - "platforms;android-36", - "build-tools;34.0.0", + ("platform-tools", "platform-tools"), + ("emulator", "emulator"), + ("platforms;android-36", "platforms (Android 36)"), + ("build-tools;34.0.0", "build-tools 34.0.0"), ] - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": "Installing SDK components (platform-tools, emulator, platforms, build-tools)...", - } - }) - if not await _run_sdkmanager(sdk_root, core_components): - print("[installer][android-sdk] Failed installing one or more SDK components") - return False + + for i, (pkg, label) in enumerate(core_components, 1): + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": f"Installing component {i}/{len(core_components)}: {label}...", + } + }) + if not await _run_sdkmanager(sdk_root, [pkg], component_label=label): + print(f"[installer][android-sdk] Failed installing {label}") + return False # Update PATH after successful installation diff --git a/Framework/install_handler/android/emulator.py b/Framework/install_handler/android/emulator.py index 690612e7..807a3f6b 100644 --- a/Framework/install_handler/android/emulator.py +++ b/Framework/install_handler/android/emulator.py @@ -6,7 +6,7 @@ import random from pathlib import Path from settings import ZEUZ_NODE_DOWNLOADS_DIR -from Framework.install_handler.utils import send_response, debug +from Framework.install_handler.utils import send_response, debug, pty_stream from Framework.install_handler.android.android_sdk import _get_sdk_root @@ -805,483 +805,161 @@ def _generate_avd_name(android_version: str, existing_avds: list[str]) -> str: return f"{android_version}-{word1}-{word2}-{random_num}" -def _run_sdkmanager_install_windows(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: - """Install system image on Windows with real-time output""" +async def _run_sdkmanager_install(sdkmanager: Path, sdk_root: Path, system_image: str, device_id: str | None = None) -> tuple[bool, str]: + """Install system image with real-time output streamed via send_response.""" try: - # Use PowerShell to handle the command properly and auto-accept licenses - # This approach pipes 'y' responses to automatically accept licenses - yes_responses = ";".join(["echo y"] * 20) - quoted_image = f"'{system_image}'" - shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_image}"' - - if debug: - print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") - print(f"[installer][emulator] This may take 10-30 minutes to download system image...") - - process = subprocess.Popen( - shell_cmd, - shell=True, - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - # Close stdin - PowerShell piping will provide input - process.stdin.close() - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - progress_count = [] - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = int(progress_match.group(1)) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}% {status}", - } - }), - loop - ) - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - percent_match = re.search(r'(\d+)%', stripped) - if percent_match: - percent = int(percent_match.group(1)) - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}%", - } - }), - loop - ) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads - - output = "\n".join(output_lines) - if returncode == 0: - return True, output - else: - return False, output - except subprocess.TimeoutExpired: - return False, "Installation timed out after 30 minutes" - except Exception as e: - return False, str(e) + system = platform.system() + pkg_label = device_id or system_image + print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") + print(f"[installer][emulator] This may take 10-30 minutes to download system image...") -def _run_sdkmanager_install_linux(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: - """Install system image on Linux with real-time output""" - try: - if debug: - print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") - print(f"[installer][emulator] This may take 10-30 minutes to download system image...") - - process = subprocess.Popen( - [str(sdkmanager), f"--sdk_root={sdk_root}", system_image], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - progress_count = [] - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = int(progress_match.group(1)) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}% {status}", - } - }), - loop - ) - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - percent_match = re.search(r'(\d+)%', stripped) - if percent_match: - percent = int(percent_match.group(1)) - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}%", - } - }), - loop - ) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads - - output = "\n".join(output_lines) - if returncode == 0: - return True, output - else: - return False, output - except subprocess.TimeoutExpired: - return False, "Installation timed out after 30 minutes" - except Exception as e: - return False, str(e) + await send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": pkg_label, + "status": "installing", + "comment": f"Downloading system image {system_image}...", + } + }) + async def _on_line(stripped: str): + progress_match = re.search(r'(\d+)%', stripped) + if progress_match: + percent = int(progress_match.group(1)) + comment = f"Downloading system image... {percent}% {stripped}" + else: + comment = stripped + print(f"[installer][emulator] {stripped}") + await send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": pkg_label, + "status": "installing", + "comment": comment, + } + }) -def _run_sdkmanager_install_darwin(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: - """Install system image on macOS with real-time output""" - try: - if debug: - print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") - print(f"[installer][emulator] This may take 10-30 minutes to download system image...") - - process = subprocess.Popen( - [str(sdkmanager), f"--sdk_root={sdk_root}", system_image], - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - progress_count = [] - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = int(progress_match.group(1)) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}% {status}", - } - }), - loop - ) - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - percent_match = re.search(r'(\d+)%', stripped) - if percent_match: - percent = int(percent_match.group(1)) - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - - if loop and device_id: - rounded_percent = round(percent / 10) * 10 - if rounded_percent not in progress_count: - progress_count.append(rounded_percent) - asyncio.run_coroutine_threadsafe( - send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Downloading system image... {percent}%", - } - }), - loop - ) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads - - output = "\n".join(output_lines) - if returncode == 0: - return True, output - else: - return False, output - except subprocess.TimeoutExpired: - return False, "Installation timed out after 30 minutes" - except Exception as e: - return False, str(e) + if system == "Windows": + # Windows: use PowerShell piping (PTY not available) + yes_responses = ";".join(["echo y"] * 20) + quoted_image = f"'{system_image}'" + shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_image}"' + output_lines: list[str] = [] -def _run_avdmanager_create_windows(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: - """Create AVD on Windows with real-time output""" - try: - # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} - # Answer "no" to custom hardware profile prompt - process = subprocess.Popen( - [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - - # Send "no" to custom hardware profile prompt - process.stdin.write("no\n") - process.stdin.close() - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = progress_match.group(1) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=120) - - output = "\n".join(output_lines) - if returncode == 0: - return True, output + async def _win_stream(proc: asyncio.subprocess.Process) -> int: + while True: + raw = await proc.stdout.readline() + if not raw: + break + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + output_lines.append(line) + await _on_line(line.strip()) + return await proc.wait() + + proc = await asyncio.create_subprocess_shell( + shell_cmd, + stdin=asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + returncode = await _win_stream(proc) else: - return False, output - except subprocess.TimeoutExpired: - return False, "AVD creation timed out" - except Exception as e: - return False, str(e) - + # Linux / macOS: use PTY for unbuffered, \r-aware output + cmd = [str(sdkmanager), f"--sdk_root={sdk_root}", system_image] + returncode, output_lines = await pty_stream( + cmd, + stdin_data="y\n" * 20, # auto-accept licenses + on_line=_on_line, + timeout_s=1800, + ) -def _run_avdmanager_create_linux(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: - """Create AVD on Linux with real-time output""" - try: - # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} - # Answer "no" to custom hardware profile prompt - process = subprocess.Popen( - [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - - # Send "no" to custom hardware profile prompt - process.stdin.write("no\n") - process.stdin.close() - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = progress_match.group(1) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=120) - output = "\n".join(output_lines) if returncode == 0: return True, output else: return False, output - except subprocess.TimeoutExpired: - return False, "AVD creation timed out" except Exception as e: return False, str(e) -def _run_avdmanager_create_darwin(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: - """Create AVD on macOS with real-time output""" +async def _run_avdmanager_create(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: + """Create AVD with real-time output streamed via send_response.""" try: - # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} - # Answer "no" to custom hardware profile prompt - process = subprocess.Popen( - [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], - stdin=subprocess.PIPE, - stdout=subprocess.PIPE, - stderr=subprocess.STDOUT, - text=True, - bufsize=1 # Line buffered - ) - - # Send "no" to custom hardware profile prompt - process.stdin.write("no\n") - process.stdin.close() - - # Print output in real-time as it comes, showing progress on single line - output_lines = [] - last_progress = "" - try: - for line in iter(process.stdout.readline, ''): - if line: - stripped = line.strip() - output_lines.append(stripped) - - # Extract progress percentage from lines like "[====] 25% Loading..." - progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) - if progress_match: - percent = progress_match.group(1) - status = progress_match.group(2).strip() - current_progress = f"{percent}% {status}" - if current_progress != last_progress: - print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) - last_progress = current_progress - elif stripped and not stripped.startswith('[') and '%' not in stripped: - # Print important non-progress messages on new line - print(f"\n[installer][emulator] {stripped}") - elif stripped.endswith('%'): - # Handle lines that end with just percentage - print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) - except Exception as e: - print(f"\n[installer][emulator] Output reading error: {e}") - finally: - print() # New line after progress completes - - process.stdout.close() - returncode = process.wait(timeout=120) - + cmd = [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id] + + print(f"[installer][emulator] Running: {' '.join(cmd)}") + + await send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Creating AVD '{avd_name}'...", + } + }) + + async def _on_line(stripped: str): + print(f"[installer][emulator] {stripped}") + await send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": stripped, + } + }) + + if platform.system() == "Windows": + # Windows: no PTY + output_lines: list[str] = [] + + async def _win_stream(proc: asyncio.subprocess.Process) -> int: + while True: + raw = await proc.stdout.readline() + if not raw: + break + line = raw.decode("utf-8", errors="replace").rstrip() + if not line: + continue + output_lines.append(line) + await _on_line(line.strip()) + return await proc.wait() + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + try: + proc.stdin.write(b"no\n") + await proc.stdin.drain() + proc.stdin.close() + except Exception: + pass + returncode = await _win_stream(proc) + else: + # Linux / macOS: use PTY + returncode, output_lines = await pty_stream( + cmd, + stdin_data="no\n", + on_line=_on_line, + timeout_s=120, + ) + output = "\n".join(output_lines) if returncode == 0: return True, output else: return False, output - except subprocess.TimeoutExpired: - return False, "AVD creation timed out" except Exception as e: return False, str(e) @@ -1595,48 +1273,9 @@ async def create_avd_from_system_image(device_param: str) -> bool: # Step 1: Install system image print(f"[installer][emulator] Installing system image: {system_image_name}") - loop = asyncio.get_event_loop() - if _is_windows(): - success, output = await loop.run_in_executor( - None, - _run_sdkmanager_install_windows, - sdkmanager, - sdk_root, - system_image_name, - loop, - device_id - ) - elif _is_linux(): - success, output = await loop.run_in_executor( - None, - _run_sdkmanager_install_linux, - sdkmanager, - sdk_root, - system_image_name, - loop, - device_id - ) - elif _is_darwin(): - success, output = await loop.run_in_executor( - None, - _run_sdkmanager_install_darwin, - sdkmanager, - sdk_root, - system_image_name, - loop, - device_id - ) - else: - # Fallback to Linux for unknown platforms - success, output = await loop.run_in_executor( - None, - _run_sdkmanager_install_linux, - sdkmanager, - sdk_root, - system_image_name, - loop, - device_id - ) + success, output = await _run_sdkmanager_install( + sdkmanager, sdk_root, system_image_name, device_id=device_id + ) if not success: error_msg = f"Failed to install Android Version 16: {output}" @@ -1666,47 +1305,9 @@ async def create_avd_from_system_image(device_param: str) -> bool: } }) - if _is_windows(): - success, output = await loop.run_in_executor( - None, - _run_avdmanager_create_windows, - avdmanager, - sdk_root, - avd_name, - system_image_name, - device_id - ) - elif _is_linux(): - success, output = await loop.run_in_executor( - None, - _run_avdmanager_create_linux, - avdmanager, - sdk_root, - avd_name, - system_image_name, - device_id - ) - elif _is_darwin(): - success, output = await loop.run_in_executor( - None, - _run_avdmanager_create_darwin, - avdmanager, - sdk_root, - avd_name, - system_image_name, - device_id - ) - else: - # Fallback to Linux for unknown platforms - success, output = await loop.run_in_executor( - None, - _run_avdmanager_create_linux, - avdmanager, - sdk_root, - avd_name, - system_image_name, - device_id - ) + success, output = await _run_avdmanager_create( + avdmanager, sdk_root, avd_name, system_image_name, device_id + ) if not success: error_msg = f"Failed to create AVD: {output}" diff --git a/Framework/install_handler/utils.py b/Framework/install_handler/utils.py index b268404a..08dd8743 100644 --- a/Framework/install_handler/utils.py +++ b/Framework/install_handler/utils.py @@ -1,6 +1,9 @@ import datetime import asyncio +import os +import re import platform +from typing import Callable, Awaitable from Framework.Utilities import RequestFormatter, ConfigModule, CommonUtil debug = False @@ -75,3 +78,133 @@ async def send_response(data=None) -> None: await asyncio.sleep(3,5) except Exception as e: print(f"[installer] Error sending response: {e}") + + +async def pty_stream( + cmd: list[str], + stdin_data: str | None = None, + on_line: Callable[[str], Awaitable[None]] | None = None, + timeout_s: int = 1800, +) -> tuple[int, list[str]]: + """ + Spawn *cmd* under a pseudo-terminal so the child never block-buffers + its stdout. Read output in chunks and split on ``\\r`` / ``\\n`` so + sdkmanager-style progress lines (``\\r[===] 34% Downloading...``) + arrive immediately instead of being held until the 8 KB pipe buffer fills. + + Falls back to a regular pipe on Windows (no PTY support). + + Returns ``(returncode, output_lines)``. + """ + if platform.system() == "Windows": + return await _pipe_stream(cmd, stdin_data, on_line, timeout_s) + + import pty as pty_mod + + master_fd, slave_fd = pty_mod.openpty() + + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE if stdin_data else asyncio.subprocess.DEVNULL, + stdout=slave_fd, + stderr=slave_fd, + ) + os.close(slave_fd) # parent only needs the master side + + if stdin_data: + try: + proc.stdin.write(stdin_data.encode()) + await proc.stdin.drain() + proc.stdin.close() + except Exception: + pass + + output_lines: list[str] = [] + loop = asyncio.get_event_loop() + buf = "" + + while True: + try: + data: bytes = await asyncio.wait_for( + loop.run_in_executor(None, os.read, master_fd, 4096), + timeout=timeout_s, + ) + except OSError: + # EIO when slave side closes (child exited) + break + except asyncio.TimeoutError: + break + if not data: + break + + buf += data.decode("utf-8", errors="replace") + + # Split on any line-ending (\r\n, \r, or \n) + parts = re.split(r"\r\n|\r|\n", buf) + buf = parts[-1] # keep the incomplete tail + for part in parts[:-1]: + line = part.strip() + if not line: + continue + # strip ANSI escape codes the PTY may inject + line = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", line) + if not line: + continue + output_lines.append(line) + if on_line: + await on_line(line) + + # flush any remaining partial line + remaining = buf.strip() + if remaining: + remaining = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", remaining) + if remaining: + output_lines.append(remaining) + if on_line: + await on_line(remaining) + + try: + os.close(master_fd) + except OSError: + pass + + returncode = await proc.wait() + return returncode, output_lines + + +async def _pipe_stream( + cmd: list[str], + stdin_data: str | None = None, + on_line: Callable[[str], Awaitable[None]] | None = None, + timeout_s: int = 1800, +) -> tuple[int, list[str]]: + """Fallback for Windows: plain pipe + readline.""" + proc = await asyncio.create_subprocess_exec( + *cmd, + stdin=asyncio.subprocess.PIPE if stdin_data else asyncio.subprocess.DEVNULL, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.STDOUT, + ) + + if stdin_data: + try: + proc.stdin.write(stdin_data.encode()) + await proc.stdin.drain() + proc.stdin.close() + except Exception: + pass + + output_lines: list[str] = [] + while True: + raw = await proc.stdout.readline() + if not raw: + break + line = raw.decode("utf-8", errors="replace").strip() + if not line: + continue + output_lines.append(line) + if on_line: + await on_line(line) + + returncode = await proc.wait() + return returncode, output_lines diff --git a/server/installers.py b/server/installers.py new file mode 100644 index 00000000..b5925457 --- /dev/null +++ b/server/installers.py @@ -0,0 +1,623 @@ +import asyncio +import importlib +import inspect +import json +import os +import threading +import time +import uuid +from collections import defaultdict, deque +from contextvars import ContextVar +from dataclasses import dataclass, field +from typing import Any, Literal + +from fastapi import APIRouter, HTTPException +from fastapi.responses import StreamingResponse +from pydantic import BaseModel + +from Framework.install_handler import utils as install_utils +from Framework.install_handler.route import services as INSTALLER_SERVICES +from Framework.install_handler.system_info.system_info import get_formatted_system_info +from Framework.install_handler.android.emulator import ( + android_emulator_install, + check_emulator_list, + create_avd_from_system_image, + launch_avd, +) + + +router = APIRouter(prefix="/installer", tags=["installer"]) + +# --- Configuration --- # + +MAX_CONCURRENCY = int(os.getenv("INSTALLER_MAX_CONCURRENCY", "2")) +EVENT_HISTORY = int(os.getenv("INSTALLER_EVENT_HISTORY", "200")) +FORWARD_TO_REMOTE = os.getenv("INSTALLER_FORWARD_REMOTE", "").lower() in ( + "1", + "true", + "yes", +) +ANDROID_CATEGORIES = {"Android", "AndroidEmulator"} + +# --- Models --- # + +JobStatus = Literal["queued", "running", "succeeded", "failed"] + + +class ServiceRequest(BaseModel): + category: str + name: str + user_password: str | None = None + request_id: str | None = None + +class AndroidEmulatorCreateRequest(BaseModel): + device_id: str + device_name: str + request_id: str | None = None + + +class AndroidEmulatorLaunchRequest(BaseModel): + name: str + request_id: str | None = None + + +class JobCreateResponse(BaseModel): + job_id: str + status: JobStatus + action: str + category: str | None + name: str | None + request_id: str | None + submitted_at: float + + +class JobStatusResponse(BaseModel): + job_id: str + status: JobStatus + action: str + category: str | None + name: str | None + request_id: str | None + created_at: float + updated_at: float + error: str | None + result: Any | None + last_event: dict | None + + +class ServicesResponse(BaseModel): + node_id: str + generated_at: float + services: list[dict] + + +class SystemInfoResponse(BaseModel): + node_id: str + generated_at: float + data: dict + + +# --- Event Bus --- # + + +class EventBus: + def __init__(self) -> None: + self._subscribers: dict[str, set[asyncio.Queue]] = defaultdict(set) + self._lock = threading.Lock() + + def subscribe(self, job_id: str) -> asyncio.Queue: + queue: asyncio.Queue = asyncio.Queue(maxsize=EVENT_HISTORY) + with self._lock: + self._subscribers[job_id].add(queue) + return queue + + def unsubscribe(self, job_id: str, queue: asyncio.Queue) -> None: + with self._lock: + queues = self._subscribers.get(job_id) + if not queues: + return + queues.discard(queue) + if not queues: + self._subscribers.pop(job_id, None) + + def publish(self, event: dict) -> None: + job_id = event.get("job_id") + with self._lock: + targets = list(self._subscribers.get(job_id, set())) + targets += list(self._subscribers.get("*", set())) + for queue in targets: + try: + queue.put_nowait(event) + except asyncio.QueueFull: + try: + _ = queue.get_nowait() + except asyncio.QueueEmpty: + pass + try: + queue.put_nowait(event) + except asyncio.QueueFull: + pass + + +# --- Job Store --- # + + +@dataclass +class Job: + id: str + action: str + category: str | None + name: str | None + request_id: str | None + payload: dict | None + status: JobStatus = "queued" + created_at: float = field(default_factory=time.time) + updated_at: float = field(default_factory=time.time) + error: str | None = None + result: Any | None = None + last_event: dict | None = None + events: deque = field(default_factory=lambda: deque(maxlen=EVENT_HISTORY)) + + +class JobStore: + def __init__(self) -> None: + self._jobs: dict[str, Job] = {} + self._lock = threading.Lock() + + def add(self, job: Job) -> None: + with self._lock: + self._jobs[job.id] = job + + def get(self, job_id: str) -> Job | None: + with self._lock: + return self._jobs.get(job_id) + + def update(self, job_id: str, **kwargs: Any) -> None: + with self._lock: + job = self._jobs.get(job_id) + if not job: + return + for key, value in kwargs.items(): + setattr(job, key, value) + job.updated_at = time.time() + + def add_event(self, job_id: str, event: dict) -> None: + with self._lock: + job = self._jobs.get(job_id) + if not job: + return + job.last_event = event + job.events.append(event) + + def find_active( + self, action: str, category: str | None, name: str | None + ) -> Job | None: + """Return an existing queued/running job for the same action+category+name.""" + with self._lock: + for job in self._jobs.values(): + if ( + job.status in ("queued", "running") + and job.action == action + and job.category == category + and job.name == name + ): + return job + return None + + +EVENT_BUS = EventBus() +JOB_STORE = JobStore() +SEM = asyncio.Semaphore(MAX_CONCURRENCY) + +_event_context: ContextVar[str | None] = ContextVar("installer_job_id", default=None) +ORIGINAL_SEND_RESPONSE = install_utils.send_response + + +def _make_event(job_id: str, event_type: str, payload: dict | None) -> dict: + return { + "event_id": str(uuid.uuid4()), + "job_id": job_id, + "timestamp": time.time(), + "type": event_type, + "payload": payload, + "node_id": install_utils.read_node_id(), + "version": install_utils.version, + } + + +async def send_response_proxy(data: dict | None = None) -> None: + job_id = _event_context.get() + if job_id: + event = _make_event(job_id, "installer.update", data or {}) + JOB_STORE.add_event(job_id, event) + EVENT_BUS.publish(event) + + if job_id is None or FORWARD_TO_REMOTE: + await ORIGINAL_SEND_RESPONSE(data) + + +def _patch_send_response_targets() -> None: + modules: set[str] = set() + + # Collect modules from Android-only service registry + for category in INSTALLER_SERVICES: + if category.get("category") not in ANDROID_CATEGORIES: + continue + for key in ("install_function", "status_function"): + func = category.get(key) + if func: + modules.add(func.__module__) + for service in category.get("services", []): + for key in ("install_function", "status_function"): + func = service.get(key) + if func: + modules.add(func.__module__) + + # Explicitly add emulator module + modules.add("Framework.install_handler.android.emulator") + + # Patch modules that use send_response + for mod_name in modules: + mod = importlib.import_module(mod_name) + if hasattr(mod, "send_response"): + setattr(mod, "send_response", send_response_proxy) + + # Patch utils as well (for any late imports) + install_utils.send_response = send_response_proxy + + +_patch_send_response_targets() + + +# --- Helpers --- # + + +def _find_category(category_name: str) -> dict: + if category_name not in ANDROID_CATEGORIES: + raise KeyError(f"Unsupported category: {category_name}") + for category in INSTALLER_SERVICES: + if category.get("category") == category_name: + return category + raise KeyError(f"Unknown category: {category_name}") + + +def _find_service(category: dict, service_name: str) -> dict: + for service in category.get("services", []): + if service.get("name") == service_name: + return service + raise KeyError(f"Unknown service: {service_name}") + + +async def _call_install_function(func, user_password: str | None = None): + if func is None: + raise RuntimeError("Install function not defined") + + sig = inspect.signature(func) + if len(sig.parameters) > 0: + return await _maybe_await(func, user_password or "") + return await _maybe_await(func) + + +async def _call_status_function(func): + if func is None: + raise RuntimeError("Status function not defined") + return await _maybe_await(func) + + +async def _maybe_await(func, *args, **kwargs): + if inspect.iscoroutinefunction(func): + return await func(*args, **kwargs) + return await asyncio.to_thread(func, *args, **kwargs) + + +async def _run_job(job: Job) -> None: + async with SEM: + JOB_STORE.update(job.id, status="running") + EVENT_BUS.publish(_make_event(job.id, "job.started", None)) + token = _event_context.set(job.id) + try: + result = await _dispatch_job(job) + JOB_STORE.update(job.id, status="succeeded", result=result) + EVENT_BUS.publish(_make_event(job.id, "job.completed", {"result": result})) + except Exception as exc: + JOB_STORE.update(job.id, status="failed", error=str(exc)) + EVENT_BUS.publish( + _make_event(job.id, "job.failed", {"error": str(exc)}) + ) + finally: + _event_context.reset(token) + + +async def _dispatch_job(job: Job) -> Any: + action = job.action + + if action == "install": + category = _find_category(job.category or "") + if category.get("category") == "AndroidEmulator": + raise RuntimeError("Use the emulator endpoints for AndroidEmulator") + service = _find_service(category, job.name or "") + if install_utils.current_os not in service.get("os", []): + raise RuntimeError("Service not supported on current OS") + return await _call_install_function( + service.get("install_function"), + job.payload.get("user_password") if job.payload else None, + ) + + if action == "status": + category = _find_category(job.category or "") + if category.get("category") == "AndroidEmulator": + raise RuntimeError("Use the emulator endpoints for AndroidEmulator") + service = _find_service(category, job.name or "") + if install_utils.current_os not in service.get("os", []): + raise RuntimeError("Service not supported on current OS") + return await _call_status_function(service.get("status_function")) + + if action == "android_emulator_refresh": + return await _maybe_await(check_emulator_list) + + if action == "android_emulator_list_installables": + return await _maybe_await(android_emulator_install) + + if action == "android_emulator_create": + device_id = job.payload.get("device_id") if job.payload else None + device_name = job.payload.get("device_name") if job.payload else None + if not device_id or not device_name: + raise RuntimeError("device_id and device_name are required") + device_param = f"install device;{device_id};{device_name}" + return await _maybe_await(create_avd_from_system_image, device_param) + + if action == "android_emulator_launch": + name = job.payload.get("name") if job.payload else None + if not name: + raise RuntimeError("name is required") + return await _maybe_await(launch_avd, name) + + raise RuntimeError(f"Unsupported action: {action}") + + +def _submit_job( + action: str, + category: str | None = None, + name: str | None = None, + payload: dict | None = None, + request_id: str | None = None, +) -> Job: + # Deduplicate: if an identical job is already queued/running, return it. + existing = JOB_STORE.find_active(action, category, name) + if existing: + return existing + + job = Job( + id=str(uuid.uuid4()), + action=action, + category=category, + name=name, + request_id=request_id, + payload=payload or {}, + ) + JOB_STORE.add(job) + EVENT_BUS.publish(_make_event(job.id, "job.queued", None)) + asyncio.create_task(_run_job(job)) + return job + + +# --- Routes --- # + + +@router.get("/services", response_model=ServicesResponse) +async def services_list(): + services = [ + svc + for svc in install_utils.generate_services_list(INSTALLER_SERVICES) + if svc.get("category") in ANDROID_CATEGORIES + ] + return ServicesResponse( + node_id=install_utils.read_node_id(), + generated_at=time.time(), + services=services, + ) + + +@router.get("/system-info", response_model=SystemInfoResponse) +async def system_info(): + info = await get_formatted_system_info() + return SystemInfoResponse( + node_id=install_utils.read_node_id(), + generated_at=time.time(), + data=info, + ) + + +@router.post("/jobs/install", response_model=JobCreateResponse) +async def install_service(req: ServiceRequest): + try: + category = _find_category(req.category) + if category.get("category") == "AndroidEmulator": + raise HTTPException( + status_code=400, + detail="Use the emulator endpoints for AndroidEmulator", + ) + service = _find_service(category, req.name) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + + if install_utils.current_os not in service.get("os", []): + raise HTTPException(status_code=400, detail="Service not supported on current OS") + + job = _submit_job( + action="install", + category=req.category, + name=req.name, + payload={"user_password": req.user_password}, + request_id=req.request_id, + ) + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=job.category, + name=job.name, + request_id=job.request_id, + submitted_at=job.created_at, + ) + + +@router.post("/jobs/status", response_model=JobCreateResponse) +async def status_service(req: ServiceRequest): + try: + category = _find_category(req.category) + if category.get("category") == "AndroidEmulator": + raise HTTPException( + status_code=400, + detail="Use the emulator endpoints for AndroidEmulator", + ) + service = _find_service(category, req.name) + except KeyError as exc: + raise HTTPException(status_code=404, detail=str(exc)) + + if install_utils.current_os not in service.get("os", []): + raise HTTPException(status_code=400, detail="Service not supported on current OS") + + job = _submit_job( + action="status", + category=req.category, + name=req.name, + payload={}, + request_id=req.request_id, + ) + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=job.category, + name=job.name, + request_id=job.request_id, + submitted_at=job.created_at, + ) + + + + +@router.post("/jobs/android-emulator/refresh-installed", response_model=JobCreateResponse) +async def android_emulator_refresh(): + job = _submit_job(action="android_emulator_refresh") + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=None, + name=None, + request_id=None, + submitted_at=job.created_at, + ) + + +@router.post("/jobs/android-emulator/list-installables", response_model=JobCreateResponse) +async def android_emulator_list_installables(): + job = _submit_job(action="android_emulator_list_installables") + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=None, + name=None, + request_id=None, + submitted_at=job.created_at, + ) + + +@router.post("/jobs/android-emulator/create", response_model=JobCreateResponse) +async def android_emulator_create(req: AndroidEmulatorCreateRequest): + job = _submit_job( + action="android_emulator_create", + payload={"device_id": req.device_id, "device_name": req.device_name}, + request_id=req.request_id, + ) + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=None, + name=None, + request_id=job.request_id, + submitted_at=job.created_at, + ) + + +@router.post("/jobs/android-emulator/launch", response_model=JobCreateResponse) +async def android_emulator_launch(req: AndroidEmulatorLaunchRequest): + job = _submit_job( + action="android_emulator_launch", + payload={"name": req.name}, + request_id=req.request_id, + ) + return JobCreateResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=None, + name=None, + request_id=job.request_id, + submitted_at=job.created_at, + ) + + +@router.get("/jobs/{job_id}", response_model=JobStatusResponse) +async def job_status(job_id: str): + job = JOB_STORE.get(job_id) + if not job: + raise HTTPException(status_code=404, detail="Job not found") + return JobStatusResponse( + job_id=job.id, + status=job.status, + action=job.action, + category=job.category, + name=job.name, + request_id=job.request_id, + created_at=job.created_at, + updated_at=job.updated_at, + error=job.error, + result=job.result, + last_event=job.last_event, + ) + + +@router.get("/jobs/{job_id}/events") +async def job_events(job_id: str): + if not JOB_STORE.get(job_id): + raise HTTPException(status_code=404, detail="Job not found") + + queue = EVENT_BUS.subscribe(job_id) + + async def event_stream(): + try: + yield ": connected\n\n" + while True: + event = await queue.get() + yield f"event: {event.get('type', 'message')}\n" + yield f"data: {json.dumps(event)}\n\n" + except asyncio.CancelledError: + pass + finally: + EVENT_BUS.unsubscribe(job_id, queue) + + return StreamingResponse(event_stream(), media_type="text/event-stream") + + +@router.get("/events") +async def all_events(): + queue = EVENT_BUS.subscribe("*") + + async def event_stream(): + try: + yield ": connected\n\n" + while True: + event = await queue.get() + yield f"event: {event.get('type', 'message')}\n" + yield f"data: {json.dumps(event)}\n\n" + except asyncio.CancelledError: + pass + finally: + EVENT_BUS.unsubscribe("*", queue) + + return StreamingResponse(event_stream(), media_type="text/event-stream") diff --git a/server/main.py b/server/main.py index 96213cfd..8098c5ae 100644 --- a/server/main.py +++ b/server/main.py @@ -11,6 +11,7 @@ from server.mobile import router as mobile_router, upload_android_ui_dump from server.mac import router as mac_router from server.linux import router as linux_router +from server.installers import router as installers_router import asyncio class EndpointFilter(logging.Filter): @@ -42,6 +43,7 @@ def main() -> FastAPI: v1router.include_router(mobile_router) v1router.include_router(mac_router) v1router.include_router(linux_router) + v1router.include_router(installers_router) app = FastAPI() app.include_router(v1router) From 6b20ce328f83b647687dcf2f63524740c72f7b16 Mon Sep 17 00:00:00 2001 From: Mahmudul Alam Date: Wed, 11 Feb 2026 06:26:12 +0600 Subject: [PATCH 2/3] Revert android_sdk and emulator update style to original --- .../install_handler/android/android_sdk.py | 211 +++--- Framework/install_handler/android/emulator.py | 675 ++++++++++++++---- Framework/install_handler/utils.py | 135 +--- 3 files changed, 646 insertions(+), 375 deletions(-) diff --git a/Framework/install_handler/android/android_sdk.py b/Framework/install_handler/android/android_sdk.py index 31f6ff92..28d11d1a 100644 --- a/Framework/install_handler/android/android_sdk.py +++ b/Framework/install_handler/android/android_sdk.py @@ -6,7 +6,7 @@ import subprocess from pathlib import Path import httpx -from Framework.install_handler.utils import send_response, pty_stream +from Framework.install_handler.utils import send_response from settings import ZEUZ_NODE_DOWNLOADS_DIR @@ -289,110 +289,116 @@ def _find_sdkmanager(sdk_root: Path) -> Path | None: -async def _run_sdkmanager(sdk_root: Path, args: list[str], component_label: str | None = None) -> bool: - """Run sdkmanager with the given args, streaming output lines to send_response in real-time.""" +async def _run_sdkmanager(sdk_root: Path, args: list[str]) -> bool: try: sdkmanager = _find_sdkmanager(sdk_root) if not sdkmanager: print("[installer][android-sdk] sdkmanager not found") return False - + import asyncio - - label = component_label or " ".join(args) + import subprocess + system = platform.system() - - async def _send_line(line: str): - """Send a single output line as a progress event.""" - stripped = line.strip() - if not stripped: - return - print(stripped) - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": f"[{label}] {stripped}", - } - }) - - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": f"Installing {label}...", - } - }) - + output = None # Initialize for later use + if system == "Windows": + # Windows - use PowerShell to pipe 'y' responses for auto-accepting licenses + # Quote each argument individually to prevent PowerShell from interpreting semicolons yes_responses = ";".join(["echo y"] * 20) + # Wrap each arg in single quotes to preserve semicolons in package names like "platforms;android-36" quoted_args = " ".join([f"'{arg}'" for arg in args]) shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_args}"' print(f"[installer][android-sdk] Running: sdkmanager {' '.join(args)}") - - output_lines: list[str] = [] - - async def _win_stream(proc: asyncio.subprocess.Process) -> int: - while True: - raw = await proc.stdout.readline() - if not raw: - break - line = raw.decode("utf-8", errors="replace").rstrip() - if not line: - continue - output_lines.append(line) - await _send_line(line) - return await proc.wait() - - proc = await asyncio.create_subprocess_shell( - shell_cmd, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, + print(f"[installer][android-sdk] This may take 5-15 minutes to download ~450MB of components...") + + loop = asyncio.get_event_loop() + + # Use Popen and print output in real-time + def run_sdkmanager(): + process = subprocess.Popen( + shell_cmd, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + # Close stdin - PowerShell piping will provide input + process.stdin.close() + + # Print output in real-time as it comes + output_lines = [] + try: + for line in iter(process.stdout.readline, ''): + if line: + print(line.rstrip()) # Print immediately + output_lines.append(line.strip()) + except Exception as e: + print(f"[installer][android-sdk] Output reading error: {e}") + + process.stdout.close() + returncode = process.wait(timeout=1800) + + # Return last 50 lines for debugging + return returncode, "\n".join(output_lines[-50:]) if output_lines else "" + + returncode, output = await loop.run_in_executor(None, run_sdkmanager) + + class Result: + pass + result = Result() + result.returncode = returncode + elif system == "Linux": + # Linux can execute directly + cmd = [str(sdkmanager), f"--sdk_root={sdk_root}"] + args + print(f"[installer][android-sdk] Running: {' '.join(cmd)}") + + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + lambda: subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=1800 # 30 minutes timeout + ) ) - returncode = await _win_stream(proc) - elif system in ("Linux", "Darwin"): + output = (result.stdout or "") + (result.stderr or "") + elif system == "Darwin": + # macOS can execute directly cmd = [str(sdkmanager), f"--sdk_root={sdk_root}"] + args print(f"[installer][android-sdk] Running: {' '.join(cmd)}") - - returncode, output_lines = await pty_stream( - cmd, - stdin_data="y\n" * 20, - on_line=_send_line, - timeout_s=1800, + + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + lambda: subprocess.run( + cmd, + capture_output=True, + text=True, + timeout=1800 # 30 minutes timeout + ) ) + output = (result.stdout or "") + (result.stderr or "") else: print(f"[installer][android-sdk] Unsupported platform: {system}") return False - - if returncode != 0: - print(f"[installer][android-sdk] sdkmanager failed (returncode={returncode})") - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": f"Failed to install {label} (exit code {returncode})", - } - }) + + if result.returncode != 0: + print(f"[installer][android-sdk] sdkmanager failed (returncode={result.returncode})") + if output: + print(f"[installer][android-sdk] Last output:\n{output}") return False - - print(f"[installer][android-sdk] sdkmanager completed successfully for {label}") - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": f"{label} installed successfully", - } - }) + + print(f"[installer][android-sdk] sdkmanager completed successfully") + if output: + print(f"[installer][android-sdk] Final output:\n{output[-500:]}") # Last 500 chars return True + except subprocess.TimeoutExpired: + print("[installer][android-sdk] sdkmanager timed out after 30 minutes") + return False except Exception as e: print(f"[installer][android-sdk] sdkmanager error: {e}") import traceback @@ -518,27 +524,26 @@ async def install() -> bool: # Continue; some environments prompt-less acceptance may not be required - # Install core components one at a time for per-component progress + # Install core components core_components = [ - ("platform-tools", "platform-tools"), - ("emulator", "emulator"), - ("platforms;android-36", "platforms (Android 36)"), - ("build-tools;34.0.0", "build-tools 34.0.0"), + "platform-tools", + "emulator", + # A recent platform and build-tools; adjust if needed + "platforms;android-36", + "build-tools;34.0.0", ] - - for i, (pkg, label) in enumerate(core_components, 1): - await send_response({ - "action": "status", - "data": { - "category": "Android", - "name": "Android SDK", - "status": "installing", - "comment": f"Installing component {i}/{len(core_components)}: {label}...", - } - }) - if not await _run_sdkmanager(sdk_root, [pkg], component_label=label): - print(f"[installer][android-sdk] Failed installing {label}") - return False + await send_response({ + "action": "status", + "data": { + "category": "Android", + "name": "Android SDK", + "status": "installing", + "comment": "Installing SDK components (platform-tools, emulator, platforms, build-tools)...", + } + }) + if not await _run_sdkmanager(sdk_root, core_components): + print("[installer][android-sdk] Failed installing one or more SDK components") + return False # Update PATH after successful installation diff --git a/Framework/install_handler/android/emulator.py b/Framework/install_handler/android/emulator.py index 807a3f6b..690612e7 100644 --- a/Framework/install_handler/android/emulator.py +++ b/Framework/install_handler/android/emulator.py @@ -6,7 +6,7 @@ import random from pathlib import Path from settings import ZEUZ_NODE_DOWNLOADS_DIR -from Framework.install_handler.utils import send_response, debug, pty_stream +from Framework.install_handler.utils import send_response, debug from Framework.install_handler.android.android_sdk import _get_sdk_root @@ -805,161 +805,483 @@ def _generate_avd_name(android_version: str, existing_avds: list[str]) -> str: return f"{android_version}-{word1}-{word2}-{random_num}" -async def _run_sdkmanager_install(sdkmanager: Path, sdk_root: Path, system_image: str, device_id: str | None = None) -> tuple[bool, str]: - """Install system image with real-time output streamed via send_response.""" +def _run_sdkmanager_install_windows(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: + """Install system image on Windows with real-time output""" try: - system = platform.system() - pkg_label = device_id or system_image - - print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") - print(f"[installer][emulator] This may take 10-30 minutes to download system image...") - - await send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": pkg_label, - "status": "installing", - "comment": f"Downloading system image {system_image}...", - } - }) - - async def _on_line(stripped: str): - progress_match = re.search(r'(\d+)%', stripped) - if progress_match: - percent = int(progress_match.group(1)) - comment = f"Downloading system image... {percent}% {stripped}" - else: - comment = stripped - print(f"[installer][emulator] {stripped}") - await send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": pkg_label, - "status": "installing", - "comment": comment, - } - }) - - if system == "Windows": - # Windows: use PowerShell piping (PTY not available) - yes_responses = ";".join(["echo y"] * 20) - quoted_image = f"'{system_image}'" - shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_image}"' - - output_lines: list[str] = [] - - async def _win_stream(proc: asyncio.subprocess.Process) -> int: - while True: - raw = await proc.stdout.readline() - if not raw: - break - line = raw.decode("utf-8", errors="replace").rstrip() - if not line: - continue - output_lines.append(line) - await _on_line(line.strip()) - return await proc.wait() - - proc = await asyncio.create_subprocess_shell( - shell_cmd, - stdin=asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - returncode = await _win_stream(proc) + # Use PowerShell to handle the command properly and auto-accept licenses + # This approach pipes 'y' responses to automatically accept licenses + yes_responses = ";".join(["echo y"] * 20) + quoted_image = f"'{system_image}'" + shell_cmd = f'powershell -Command "{yes_responses} | &\\"{str(sdkmanager)}\\" --sdk_root={sdk_root} {quoted_image}"' + + if debug: + print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") + print(f"[installer][emulator] This may take 10-30 minutes to download system image...") + + process = subprocess.Popen( + shell_cmd, + shell=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + # Close stdin - PowerShell piping will provide input + process.stdin.close() + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + progress_count = [] + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = int(progress_match.group(1)) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}% {status}", + } + }), + loop + ) + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + percent_match = re.search(r'(\d+)%', stripped) + if percent_match: + percent = int(percent_match.group(1)) + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}%", + } + }), + loop + ) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads + + output = "\n".join(output_lines) + if returncode == 0: + return True, output else: - # Linux / macOS: use PTY for unbuffered, \r-aware output - cmd = [str(sdkmanager), f"--sdk_root={sdk_root}", system_image] - returncode, output_lines = await pty_stream( - cmd, - stdin_data="y\n" * 20, # auto-accept licenses - on_line=_on_line, - timeout_s=1800, - ) + return False, output + except subprocess.TimeoutExpired: + return False, "Installation timed out after 30 minutes" + except Exception as e: + return False, str(e) + +def _run_sdkmanager_install_linux(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: + """Install system image on Linux with real-time output""" + try: + if debug: + print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") + print(f"[installer][emulator] This may take 10-30 minutes to download system image...") + + process = subprocess.Popen( + [str(sdkmanager), f"--sdk_root={sdk_root}", system_image], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + progress_count = [] + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = int(progress_match.group(1)) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}% {status}", + } + }), + loop + ) + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + percent_match = re.search(r'(\d+)%', stripped) + if percent_match: + percent = int(percent_match.group(1)) + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}%", + } + }), + loop + ) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads + output = "\n".join(output_lines) if returncode == 0: return True, output else: return False, output + except subprocess.TimeoutExpired: + return False, "Installation timed out after 30 minutes" except Exception as e: return False, str(e) -async def _run_avdmanager_create(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: - """Create AVD with real-time output streamed via send_response.""" +def _run_sdkmanager_install_darwin(sdkmanager: Path, sdk_root: Path, system_image: str, loop=None, device_id: str = None) -> tuple[bool, str]: + """Install system image on macOS with real-time output""" try: - cmd = [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id] - - print(f"[installer][emulator] Running: {' '.join(cmd)}") + if debug: + print(f"[installer][emulator] Running: sdkmanager --sdk_root={sdk_root} {system_image}") + print(f"[installer][emulator] This may take 10-30 minutes to download system image...") + + process = subprocess.Popen( + [str(sdkmanager), f"--sdk_root={sdk_root}", system_image], + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + progress_count = [] + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = int(progress_match.group(1)) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}% {status}", + } + }), + loop + ) + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + percent_match = re.search(r'(\d+)%', stripped) + if percent_match: + percent = int(percent_match.group(1)) + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + + if loop and device_id: + rounded_percent = round(percent / 10) * 10 + if rounded_percent not in progress_count: + progress_count.append(rounded_percent) + asyncio.run_coroutine_threadsafe( + send_response({ + "action": "status", + "data": { + "category": "AndroidEmulator", + "package": device_id, + "status": "installing", + "comment": f"Downloading system image... {percent}%", + } + }), + loop + ) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=1800) # 30 minutes for large system image downloads + + output = "\n".join(output_lines) + if returncode == 0: + return True, output + else: + return False, output + except subprocess.TimeoutExpired: + return False, "Installation timed out after 30 minutes" + except Exception as e: + return False, str(e) - await send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": f"Creating AVD '{avd_name}'...", - } - }) - async def _on_line(stripped: str): - print(f"[installer][emulator] {stripped}") - await send_response({ - "action": "status", - "data": { - "category": "AndroidEmulator", - "package": device_id, - "status": "installing", - "comment": stripped, - } - }) +def _run_avdmanager_create_windows(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: + """Create AVD on Windows with real-time output""" + try: + # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} + # Answer "no" to custom hardware profile prompt + process = subprocess.Popen( + [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + + # Send "no" to custom hardware profile prompt + process.stdin.write("no\n") + process.stdin.close() + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = progress_match.group(1) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=120) + + output = "\n".join(output_lines) + if returncode == 0: + return True, output + else: + return False, output + except subprocess.TimeoutExpired: + return False, "AVD creation timed out" + except Exception as e: + return False, str(e) - if platform.system() == "Windows": - # Windows: no PTY - output_lines: list[str] = [] - async def _win_stream(proc: asyncio.subprocess.Process) -> int: - while True: - raw = await proc.stdout.readline() - if not raw: - break - line = raw.decode("utf-8", errors="replace").rstrip() - if not line: - continue - output_lines.append(line) - await _on_line(line.strip()) - return await proc.wait() - - proc = await asyncio.create_subprocess_exec( - *cmd, - stdin=asyncio.subprocess.PIPE, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - try: - proc.stdin.write(b"no\n") - await proc.stdin.drain() - proc.stdin.close() - except Exception: - pass - returncode = await _win_stream(proc) +def _run_avdmanager_create_linux(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: + """Create AVD on Linux with real-time output""" + try: + # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} + # Answer "no" to custom hardware profile prompt + process = subprocess.Popen( + [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + + # Send "no" to custom hardware profile prompt + process.stdin.write("no\n") + process.stdin.close() + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = progress_match.group(1) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=120) + + output = "\n".join(output_lines) + if returncode == 0: + return True, output else: - # Linux / macOS: use PTY - returncode, output_lines = await pty_stream( - cmd, - stdin_data="no\n", - on_line=_on_line, - timeout_s=120, - ) + return False, output + except subprocess.TimeoutExpired: + return False, "AVD creation timed out" + except Exception as e: + return False, str(e) + +def _run_avdmanager_create_darwin(avdmanager: Path, sdk_root: Path, avd_name: str, system_image: str, device_id: str) -> tuple[bool, str]: + """Create AVD on macOS with real-time output""" + try: + # Create AVD: avdmanager create avd -n {avd_name} -k {system_image} -d {device_id} + # Answer "no" to custom hardware profile prompt + process = subprocess.Popen( + [str(avdmanager), "create", "avd", "-n", avd_name, "-k", system_image, "-d", device_id], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + text=True, + bufsize=1 # Line buffered + ) + + # Send "no" to custom hardware profile prompt + process.stdin.write("no\n") + process.stdin.close() + + # Print output in real-time as it comes, showing progress on single line + output_lines = [] + last_progress = "" + try: + for line in iter(process.stdout.readline, ''): + if line: + stripped = line.strip() + output_lines.append(stripped) + + # Extract progress percentage from lines like "[====] 25% Loading..." + progress_match = re.search(r'\[.*?\]\s*(\d+)%\s*(.+)', stripped) + if progress_match: + percent = progress_match.group(1) + status = progress_match.group(2).strip() + current_progress = f"{percent}% {status}" + if current_progress != last_progress: + print(f"\r[installer][emulator] Download progress: {current_progress}", end='', flush=True) + last_progress = current_progress + elif stripped and not stripped.startswith('[') and '%' not in stripped: + # Print important non-progress messages on new line + print(f"\n[installer][emulator] {stripped}") + elif stripped.endswith('%'): + # Handle lines that end with just percentage + print(f"\r[installer][emulator] Download progress: {stripped}", end='', flush=True) + except Exception as e: + print(f"\n[installer][emulator] Output reading error: {e}") + finally: + print() # New line after progress completes + + process.stdout.close() + returncode = process.wait(timeout=120) + output = "\n".join(output_lines) if returncode == 0: return True, output else: return False, output + except subprocess.TimeoutExpired: + return False, "AVD creation timed out" except Exception as e: return False, str(e) @@ -1273,9 +1595,48 @@ async def create_avd_from_system_image(device_param: str) -> bool: # Step 1: Install system image print(f"[installer][emulator] Installing system image: {system_image_name}") - success, output = await _run_sdkmanager_install( - sdkmanager, sdk_root, system_image_name, device_id=device_id - ) + loop = asyncio.get_event_loop() + if _is_windows(): + success, output = await loop.run_in_executor( + None, + _run_sdkmanager_install_windows, + sdkmanager, + sdk_root, + system_image_name, + loop, + device_id + ) + elif _is_linux(): + success, output = await loop.run_in_executor( + None, + _run_sdkmanager_install_linux, + sdkmanager, + sdk_root, + system_image_name, + loop, + device_id + ) + elif _is_darwin(): + success, output = await loop.run_in_executor( + None, + _run_sdkmanager_install_darwin, + sdkmanager, + sdk_root, + system_image_name, + loop, + device_id + ) + else: + # Fallback to Linux for unknown platforms + success, output = await loop.run_in_executor( + None, + _run_sdkmanager_install_linux, + sdkmanager, + sdk_root, + system_image_name, + loop, + device_id + ) if not success: error_msg = f"Failed to install Android Version 16: {output}" @@ -1305,9 +1666,47 @@ async def create_avd_from_system_image(device_param: str) -> bool: } }) - success, output = await _run_avdmanager_create( - avdmanager, sdk_root, avd_name, system_image_name, device_id - ) + if _is_windows(): + success, output = await loop.run_in_executor( + None, + _run_avdmanager_create_windows, + avdmanager, + sdk_root, + avd_name, + system_image_name, + device_id + ) + elif _is_linux(): + success, output = await loop.run_in_executor( + None, + _run_avdmanager_create_linux, + avdmanager, + sdk_root, + avd_name, + system_image_name, + device_id + ) + elif _is_darwin(): + success, output = await loop.run_in_executor( + None, + _run_avdmanager_create_darwin, + avdmanager, + sdk_root, + avd_name, + system_image_name, + device_id + ) + else: + # Fallback to Linux for unknown platforms + success, output = await loop.run_in_executor( + None, + _run_avdmanager_create_linux, + avdmanager, + sdk_root, + avd_name, + system_image_name, + device_id + ) if not success: error_msg = f"Failed to create AVD: {output}" diff --git a/Framework/install_handler/utils.py b/Framework/install_handler/utils.py index 08dd8743..151c99c7 100644 --- a/Framework/install_handler/utils.py +++ b/Framework/install_handler/utils.py @@ -1,9 +1,6 @@ import datetime import asyncio -import os -import re import platform -from typing import Callable, Awaitable from Framework.Utilities import RequestFormatter, ConfigModule, CommonUtil debug = False @@ -77,134 +74,4 @@ async def send_response(data=None) -> None: if debug: print(e) await asyncio.sleep(3,5) except Exception as e: - print(f"[installer] Error sending response: {e}") - - -async def pty_stream( - cmd: list[str], - stdin_data: str | None = None, - on_line: Callable[[str], Awaitable[None]] | None = None, - timeout_s: int = 1800, -) -> tuple[int, list[str]]: - """ - Spawn *cmd* under a pseudo-terminal so the child never block-buffers - its stdout. Read output in chunks and split on ``\\r`` / ``\\n`` so - sdkmanager-style progress lines (``\\r[===] 34% Downloading...``) - arrive immediately instead of being held until the 8 KB pipe buffer fills. - - Falls back to a regular pipe on Windows (no PTY support). - - Returns ``(returncode, output_lines)``. - """ - if platform.system() == "Windows": - return await _pipe_stream(cmd, stdin_data, on_line, timeout_s) - - import pty as pty_mod - - master_fd, slave_fd = pty_mod.openpty() - - proc = await asyncio.create_subprocess_exec( - *cmd, - stdin=asyncio.subprocess.PIPE if stdin_data else asyncio.subprocess.DEVNULL, - stdout=slave_fd, - stderr=slave_fd, - ) - os.close(slave_fd) # parent only needs the master side - - if stdin_data: - try: - proc.stdin.write(stdin_data.encode()) - await proc.stdin.drain() - proc.stdin.close() - except Exception: - pass - - output_lines: list[str] = [] - loop = asyncio.get_event_loop() - buf = "" - - while True: - try: - data: bytes = await asyncio.wait_for( - loop.run_in_executor(None, os.read, master_fd, 4096), - timeout=timeout_s, - ) - except OSError: - # EIO when slave side closes (child exited) - break - except asyncio.TimeoutError: - break - if not data: - break - - buf += data.decode("utf-8", errors="replace") - - # Split on any line-ending (\r\n, \r, or \n) - parts = re.split(r"\r\n|\r|\n", buf) - buf = parts[-1] # keep the incomplete tail - for part in parts[:-1]: - line = part.strip() - if not line: - continue - # strip ANSI escape codes the PTY may inject - line = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", line) - if not line: - continue - output_lines.append(line) - if on_line: - await on_line(line) - - # flush any remaining partial line - remaining = buf.strip() - if remaining: - remaining = re.sub(r"\x1b\[[0-9;]*[A-Za-z]", "", remaining) - if remaining: - output_lines.append(remaining) - if on_line: - await on_line(remaining) - - try: - os.close(master_fd) - except OSError: - pass - - returncode = await proc.wait() - return returncode, output_lines - - -async def _pipe_stream( - cmd: list[str], - stdin_data: str | None = None, - on_line: Callable[[str], Awaitable[None]] | None = None, - timeout_s: int = 1800, -) -> tuple[int, list[str]]: - """Fallback for Windows: plain pipe + readline.""" - proc = await asyncio.create_subprocess_exec( - *cmd, - stdin=asyncio.subprocess.PIPE if stdin_data else asyncio.subprocess.DEVNULL, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.STDOUT, - ) - - if stdin_data: - try: - proc.stdin.write(stdin_data.encode()) - await proc.stdin.drain() - proc.stdin.close() - except Exception: - pass - - output_lines: list[str] = [] - while True: - raw = await proc.stdout.readline() - if not raw: - break - line = raw.decode("utf-8", errors="replace").strip() - if not line: - continue - output_lines.append(line) - if on_line: - await on_line(line) - - returncode = await proc.wait() - return returncode, output_lines + print(f"[installer] Error sending response: {e}") \ No newline at end of file From 9e96eaf986b80fd223c1fdb09ef92f0e227ea44e Mon Sep 17 00:00:00 2001 From: Mahmudul Alam Date: Sun, 15 Feb 2026 03:24:05 +0600 Subject: [PATCH 3/3] Added web post request made asynchronous to improve performance --- Framework/install_handler/utils.py | 13 +++++---- server/installers.py | 44 ++++++++++++++++++------------ 2 files changed, 34 insertions(+), 23 deletions(-) diff --git a/Framework/install_handler/utils.py b/Framework/install_handler/utils.py index 151c99c7..4b0cbcae 100644 --- a/Framework/install_handler/utils.py +++ b/Framework/install_handler/utils.py @@ -1,7 +1,7 @@ import datetime import asyncio import platform -from Framework.Utilities import RequestFormatter, ConfigModule, CommonUtil +from Framework.Utilities import RequestFormatter, CommonUtil debug = False version = "2.0.0" @@ -40,6 +40,8 @@ def generate_services_list(services): async def send_response(data=None) -> None: + if data is None: + data = {} try: from Framework.install_handler.route import services host = RequestFormatter.form_uri("d/nodes/install/server/push") @@ -60,18 +62,19 @@ async def send_response(data=None) -> None: for _ in range(3): try: - resp = await RequestFormatter.request("post", host, json=data, timeout=70) + resp = await asyncio.to_thread(RequestFormatter.request, "post", host, json=data, timeout=70) if debug: print(f"[installer] Response status: {resp.status_code}") print(f"[installer] Response content: {resp.content}") if not resp.ok: if debug: print(f"[installer] Failed to send response: {resp.status_code}") - await asyncio.sleep(3,5) + await asyncio.sleep(3.5) else: break except Exception as e: - if debug: print(e) - await asyncio.sleep(3,5) + if debug: + print(e) + await asyncio.sleep(3.5) except Exception as e: print(f"[installer] Error sending response: {e}") \ No newline at end of file diff --git a/server/installers.py b/server/installers.py index b5925457..d7ba279c 100644 --- a/server/installers.py +++ b/server/installers.py @@ -17,7 +17,6 @@ from Framework.install_handler import utils as install_utils from Framework.install_handler.route import services as INSTALLER_SERVICES -from Framework.install_handler.system_info.system_info import get_formatted_system_info from Framework.install_handler.android.emulator import ( android_emulator_install, check_emulator_list, @@ -38,6 +37,10 @@ "yes", ) ANDROID_CATEGORIES = {"Android", "AndroidEmulator"} +WEB_CATEGORIES = {"Web"} + +# Combined categories for patching +PATCH_CATEGORIES = ANDROID_CATEGORIES | WEB_CATEGORIES # --- Models --- # @@ -61,6 +64,23 @@ class AndroidEmulatorLaunchRequest(BaseModel): request_id: str | None = None +class IOSSimulatorCreateRequest(BaseModel): + device_type: str + runtime: str | None = None + name: str | None = None + request_id: str | None = None + + +class IOSSimulatorLaunchRequest(BaseModel): + udid: str + request_id: str | None = None + + +class IOSSimulatorDeleteRequest(BaseModel): + udid: str + request_id: str | None = None + + class JobCreateResponse(BaseModel): job_id: str status: JobStatus @@ -123,7 +143,7 @@ def unsubscribe(self, job_id: str, queue: asyncio.Queue) -> None: def publish(self, event: dict) -> None: job_id = event.get("job_id") with self._lock: - targets = list(self._subscribers.get(job_id, set())) + targets = list(self._subscribers.get(job_id, set())) if job_id else [] targets += list(self._subscribers.get("*", set())) for queue in targets: try: @@ -239,9 +259,9 @@ async def send_response_proxy(data: dict | None = None) -> None: def _patch_send_response_targets() -> None: modules: set[str] = set() - # Collect modules from Android-only service registry + # Collect modules from Android and Web service registry for category in INSTALLER_SERVICES: - if category.get("category") not in ANDROID_CATEGORIES: + if category.get("category") not in PATCH_CATEGORIES: continue for key in ("install_function", "status_function"): func = category.get(key) @@ -273,7 +293,7 @@ def _patch_send_response_targets() -> None: def _find_category(category_name: str) -> dict: - if category_name not in ANDROID_CATEGORIES: + if category_name not in PATCH_CATEGORIES: raise KeyError(f"Unsupported category: {category_name}") for category in INSTALLER_SERVICES: if category.get("category") == category_name: @@ -409,7 +429,7 @@ async def services_list(): services = [ svc for svc in install_utils.generate_services_list(INSTALLER_SERVICES) - if svc.get("category") in ANDROID_CATEGORIES + if svc.get("category") in PATCH_CATEGORIES ] return ServicesResponse( node_id=install_utils.read_node_id(), @@ -418,16 +438,6 @@ async def services_list(): ) -@router.get("/system-info", response_model=SystemInfoResponse) -async def system_info(): - info = await get_formatted_system_info() - return SystemInfoResponse( - node_id=install_utils.read_node_id(), - generated_at=time.time(), - data=info, - ) - - @router.post("/jobs/install", response_model=JobCreateResponse) async def install_service(req: ServiceRequest): try: @@ -496,8 +506,6 @@ async def status_service(req: ServiceRequest): ) - - @router.post("/jobs/android-emulator/refresh-installed", response_model=JobCreateResponse) async def android_emulator_refresh(): job = _submit_job(action="android_emulator_refresh")