diff --git a/README.md b/README.md index cf29b0a..ee718f9 100755 --- a/README.md +++ b/README.md @@ -399,6 +399,27 @@ MCP Client can access the following tools to interact with Windows: - `Process`: List running processes or terminate them by PID or name. - `Notification`: Send a Windows toast notification with a title and message. - `Registry`: Read, write, delete, or list Windows Registry values and keys. +- `CursorPosition`: Get the current mouse cursor (x, y) coordinates. +- `PixelColor`: Get the RGB color value at screen coordinates with hex code and color name. +- `KeyHold`: Press or release keyboard keys independently for hold operations (e.g., hold Shift while clicking). +- `ScreenInfo`: Get information about all connected monitors (resolution, position, primary). +- `ScreenHighlight`: Highlight a screen region with a colored border for visual debugging. +- `MousePath`: Move the mouse cursor smoothly through a series of waypoints. +- `ScreenReader`: Read text from a screen region using OCR (Windows built-in or pytesseract fallback). +- `WaitForChange`: Wait until a screen region visually changes beyond a threshold. +- `FindImage`: Find a template image on screen using visual template matching (requires `pip install 'windows-mcp[vision]'`). +- `VolumeControl`: Get/set system volume (0-100), mute/unmute/toggle via Windows Core Audio COM API. +- `BrightnessControl`: Get/set display brightness (0-100) via WMI. +- `AppList`: List all running GUI applications with their PID and window title, or check if a specific app is running. +- `Dialog`: Display message boxes, input prompts, or file/folder picker dialogs. +- `SystemInfoExtended`: Detailed system info — OS version, CPU, RAM, disk, battery, network, uptime. +- `DarkMode`: Get or toggle Windows dark/light app mode via registry. +- `SayText`: Text-to-speech using Windows SAPI with optional voice and rate settings. +- `PortCheck`: Check if a TCP/UDP port is in use, find the owning process, or list all listening ports. 
+- `FileWatcher`: Watch a file or directory for changes (create, modify, delete) with timeout. +- `SearchFiles`: Search for files by name pattern or content text within a directory tree. +- `NetworkDiagnostics`: Ping a host, run a DNS lookup, check an HTTP endpoint, or list network interfaces. +- `AccessibilityInspector`: Read the UI element tree of an application window (control types, names, values, and enabled states) up to a maximum depth. ## 🤝 Connect with Us Stay updated and join our community: diff --git a/pyproject.toml b/pyproject.toml index 00fd9b7..b4f77a9 100755 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,6 +43,18 @@ dev = [ "pytest>=8.0.0", "pytest-asyncio>=0.24.0", ] +vision = [ + "opencv-python-headless>=4.8.0", + "numpy>=1.26.0", +] +ocr = [ + "pytesseract>=0.3.10", +] +all = [ + "opencv-python-headless>=4.8.0", + "numpy>=1.26.0", + "pytesseract>=0.3.10", +] [project.urls] Homepage = "https://github.com/CursorTouch" diff --git a/src/windows_mcp/__main__.py b/src/windows_mcp/__main__.py index 0141d83..64bcb5c 100755 --- a/src/windows_mcp/__main__.py +++ b/src/windows_mcp/__main__.py @@ -5,6 +5,7 @@ from windows_mcp.watchdog.service import WatchDog from contextlib import asynccontextmanager from fastmcp.utilities.types import Image +from PIL import Image as PILImage from dataclasses import dataclass, field from windows_mcp.auth import AuthClient from mcp.types import ToolAnnotations @@ -26,11 +27,13 @@ load_dotenv() + @dataclass class Config: mode: str - sandbox_id: str = field(default='') - api_key: str = field(default='') + sandbox_id: str = field(default="") + api_key: str = field(default="") + MAX_IMAGE_WIDTH, MAX_IMAGE_HEIGHT = 1920, 1080 @@ -48,7 +51,7 @@ class Config: @asynccontextmanager async def lifespan(app: FastMCP): """Runs initialization code before the server starts and cleanup code after it shuts down.""" - global desktop, watchdog, analytics,screen_size + global desktop, watchdog, analytics, screen_size # Initialize components here instead of at module level if 
os.getenv("ANONYMIZED_TELEMETRY", "true").lower() != "false": @@ -71,9 +74,59 @@ async def lifespan(app: FastMCP): mcp = FastMCP(name="windows-mcp", instructions=instructions, lifespan=lifespan) + +def _to_physical(loc: list[int], coordinate_system: str) -> list[int]: + """Convert coordinates to physical space if needed. + + Args: + loc: [x, y] coordinates. + coordinate_system: "physical" (no conversion) or "logical" (multiply by DPI scale). + + Returns: + [x, y] in physical coordinates ready for pyautogui. + + Raises: + ValueError: If loc does not have exactly 2 elements. + RuntimeError: If desktop service is not initialized in logical mode. + """ + if len(loc) != 2: + raise ValueError("loc must be [x, y]") + if coordinate_system == "logical": + if desktop is None: + raise RuntimeError("Desktop service is not initialized.") + scale = desktop.get_dpi_scaling() + return [round(loc[0] * scale), round(loc[1] * scale)] + return loc + + +def _region_to_physical(region: list[int], coordinate_system: str) -> list[int]: + """Convert a region [x, y, width, height] to physical space if needed.""" + if len(region) != 4: + raise ValueError("region must be [x, y, width, height]") + if coordinate_system == "logical": + if desktop is None: + raise RuntimeError("Desktop service is not initialized.") + scale = desktop.get_dpi_scaling() + return [round(v * scale) for v in region] + return region + + +def _path_to_physical(path: list[list[int]], coordinate_system: str) -> list[list[int]]: + """Convert a list of [x, y] waypoints to physical space if needed.""" + for i, p in enumerate(path): + if len(p) != 2: + raise ValueError(f"waypoint {i} must be [x, y], got {p}") + if coordinate_system == "logical": + if desktop is None: + raise RuntimeError("Desktop service is not initialized.") + scale = desktop.get_dpi_scaling() + return [[round(p[0] * scale), round(p[1] * scale)] for p in path] + return path + + @mcp.tool( name="App", - description="Manages Windows applications with three 
modes: 'launch' (opens the prescibed application), 'resize' (adjusts active window size/position), 'switch' (brings specific window into focus).", + description="Manages Windows applications: 'launch' (open app), 'resize' (adjust size/position), 'switch' (bring to focus), 'minimize'/'maximize'/'close'/'fullscreen'/'restore' (window control).", annotations=ToolAnnotations( title="App", readOnlyHint=False, @@ -83,9 +136,22 @@ async def lifespan(app: FastMCP): ), ) @with_analytics(analytics, "App-Tool") -def app_tool(mode:Literal['launch','resize','switch']='launch',name:str|None=None,window_loc:list[int]|None=None,window_size:list[int]|None=None, ctx: Context = None): - return desktop.app(mode,name,window_loc,window_size) - +def app_tool( + mode: Literal[ + "launch", "resize", "switch", "minimize", "maximize", "close", "fullscreen", "restore" + ] = "launch", + name: str | None = None, + window_loc: list[int] | None = None, + window_size: list[int] | None = None, + ctx: Context = None, +): + if mode in ("minimize", "maximize", "close", "fullscreen", "restore"): + if not name: + return "Error: name is required for window control actions" + return desktop.window_control(name, mode) + return desktop.app(mode, name, window_loc, window_size) + + @mcp.tool( name="PowerShell", description="A comprehensive system tool for executing any PowerShell commands. Use it to navigate the file system, manage files and processes, and execute system-level operations. Capable of accessing web content (e.g., via Invoke-WebRequest), interacting with network resources, and performing complex administrative tasks. 
This tool provides full access to the underlying operating system capabilities, making it the primary interface for system automation, scripting, and deep system interaction.", @@ -107,19 +173,19 @@ def powershell_tool(command: str, timeout: int = 30, ctx: Context = None) -> str @mcp.tool( - name='FileSystem', + name="FileSystem", description="Manages file system operations with eight modes: 'read' (read text file contents with optional line offset/limit), 'write' (create or overwrite a file, set append=True to append), 'copy' (copy file or directory to destination), 'move' (move or rename file/directory), 'delete' (delete file or directory, set recursive=True for non-empty dirs), 'list' (list directory contents with optional pattern filter), 'search' (find files matching a glob pattern), 'info' (get file/directory metadata like size, dates, type). Relative paths are resolved from the user's Desktop folder. Use absolute paths to access other locations.", annotations=ToolAnnotations( title="FileSystem", readOnlyHint=False, destructiveHint=True, idempotentHint=False, - openWorldHint=False - ) - ) + openWorldHint=False, + ), +) @with_analytics(analytics, "FileSystem-Tool") def file_system_tool( - mode: Literal['read', 'write', 'copy', 'move', 'delete', 'list', 'search', 'info'], + mode: Literal["read", "write", "copy", "move", "delete", "list", "search", "info"], path: str, destination: str | None = None, content: str | None = None, @@ -129,56 +195,66 @@ def file_system_tool( overwrite: bool | str = False, offset: int | None = None, limit: int | None = None, - encoding: str = 'utf-8', + encoding: str = "utf-8", show_hidden: bool | str = False, - ctx: Context = None + ctx: Context = None, ) -> str: try: from platformdirs import user_desktop_dir + default_dir = user_desktop_dir() if not os.path.isabs(path): path = os.path.join(default_dir, path) if destination and not os.path.isabs(destination): destination = os.path.join(default_dir, destination) - recursive = 
recursive is True or (isinstance(recursive, str) and recursive.lower() == 'true') - append = append is True or (isinstance(append, str) and append.lower() == 'true') - overwrite = overwrite is True or (isinstance(overwrite, str) and overwrite.lower() == 'true') - show_hidden = show_hidden is True or (isinstance(show_hidden, str) and show_hidden.lower() == 'true') + recursive = recursive is True or ( + isinstance(recursive, str) and recursive.lower() == "true" + ) + append = append is True or (isinstance(append, str) and append.lower() == "true") + overwrite = overwrite is True or ( + isinstance(overwrite, str) and overwrite.lower() == "true" + ) + show_hidden = show_hidden is True or ( + isinstance(show_hidden, str) and show_hidden.lower() == "true" + ) match mode: - case 'read': + case "read": return filesystem.read_file(path, offset=offset, limit=limit, encoding=encoding) - case 'write': + case "write": if content is None: - return 'Error: content parameter is required for write mode.' + return "Error: content parameter is required for write mode." return filesystem.write_file(path, content, append=append, encoding=encoding) - case 'copy': + case "copy": if destination is None: - return 'Error: destination parameter is required for copy mode.' + return "Error: destination parameter is required for copy mode." return filesystem.copy_path(path, destination, overwrite=overwrite) - case 'move': + case "move": if destination is None: - return 'Error: destination parameter is required for move mode.' + return "Error: destination parameter is required for move mode." 
return filesystem.move_path(path, destination, overwrite=overwrite) - case 'delete': + case "delete": return filesystem.delete_path(path, recursive=recursive) - case 'list': - return filesystem.list_directory(path, pattern=pattern, recursive=recursive, show_hidden=show_hidden) - case 'search': + case "list": + return filesystem.list_directory( + path, pattern=pattern, recursive=recursive, show_hidden=show_hidden + ) + case "search": if pattern is None: - return 'Error: pattern parameter is required for search mode.' + return "Error: pattern parameter is required for search mode." return filesystem.search_files(path, pattern, recursive=recursive) - case 'info': + case "info": return filesystem.get_file_info(path) case _: return f'Error: Unknown mode "{mode}". Use: read, write, copy, move, delete, list, search, info.' except Exception as e: - return f'Error in File tool: {str(e)}' + return f"Error in File tool: {str(e)}" + @mcp.tool( - name='Snapshot', - description='Captures complete desktop state including: system language, focused/opened windows, interactive elements (buttons, text fields, links, menus with coordinates), and scrollable areas. Set use_vision=True to include screenshot. Set use_dom=True for browser content to get web page elements instead of browser UI. Always call this first to understand the current desktop state before taking actions.', + name="Snapshot", + description="Captures complete desktop state including: system language, focused/opened windows, interactive elements (buttons, text fields, links, menus with coordinates), and scrollable areas. Set use_vision=True to include screenshot. Set use_dom=True for browser content to get web page elements instead of browser UI. 
Always call this first to understand the current desktop state before taking actions.", annotations=ToolAnnotations( title="Snapshot", readOnlyHint=True, @@ -188,25 +264,33 @@ def file_system_tool( ), ) @with_analytics(analytics, "State-Tool") -def state_tool(use_vision:bool|str=False,use_dom:bool|str=False, ctx: Context = None): +def state_tool(use_vision: bool | str = False, use_dom: bool | str = False, ctx: Context = None): try: - use_vision = use_vision is True or (isinstance(use_vision, str) and use_vision.lower() == 'true') - use_dom = use_dom is True or (isinstance(use_dom, str) and use_dom.lower() == 'true') - + use_vision = use_vision is True or ( + isinstance(use_vision, str) and use_vision.lower() == "true" + ) + use_dom = use_dom is True or (isinstance(use_dom, str) and use_dom.lower() == "true") + # Calculate scale factor to cap resolution at 1080p (1920x1080) - scale_width = MAX_IMAGE_WIDTH / screen_size.width if screen_size.width > MAX_IMAGE_WIDTH else 1.0 - scale_height = MAX_IMAGE_HEIGHT / screen_size.height if screen_size.height > MAX_IMAGE_HEIGHT else 1.0 + scale_width = ( + MAX_IMAGE_WIDTH / screen_size.width if screen_size.width > MAX_IMAGE_WIDTH else 1.0 + ) + scale_height = ( + MAX_IMAGE_HEIGHT / screen_size.height if screen_size.height > MAX_IMAGE_HEIGHT else 1.0 + ) scale = min(scale_width, scale_height) - - desktop_state=desktop.get_state(use_vision=use_vision,use_dom=use_dom,as_bytes=False,scale=scale) - - interactive_elements=desktop_state.tree_state.interactive_elements_to_string() - scrollable_elements=desktop_state.tree_state.scrollable_elements_to_string() - windows=desktop_state.windows_to_string() - active_window=desktop_state.active_window_to_string() - active_desktop=desktop_state.active_desktop_to_string() - all_desktops=desktop_state.desktops_to_string() - + + desktop_state = desktop.get_state( + use_vision=use_vision, use_dom=use_dom, as_bytes=False, scale=scale + ) + + interactive_elements = 
desktop_state.tree_state.interactive_elements_to_string() + scrollable_elements = desktop_state.tree_state.scrollable_elements_to_string() + windows = desktop_state.windows_to_string() + active_window = desktop_state.active_window_to_string() + active_desktop = desktop_state.active_desktop_to_string() + all_desktops = desktop_state.desktops_to_string() + # Convert screenshot to bytes for vision response screenshot_bytes = None if use_vision and desktop_state.screenshot is not None: @@ -215,9 +299,10 @@ def state_tool(use_vision:bool|str=False,use_dom:bool|str=False, ctx: Context = screenshot_bytes = buffered.getvalue() buffered.close() except Exception as e: - return [f'Error capturing desktop state: {str(e)}. Please try again.'] - - return [dedent(f''' + return [f"Error capturing desktop state: {str(e)}. Please try again."] + + return [ + dedent(f""" Active Desktop: {active_desktop} @@ -234,7 +319,9 @@ def state_tool(use_vision:bool|str=False,use_dom:bool|str=False, ctx: Context = {interactive_elements or "No interactive elements found."} List of Scrollable Elements: - {scrollable_elements or 'No scrollable elements found.'}''')]+([Image(data=screenshot_bytes,format='png')] if use_vision and screenshot_bytes else []) + {scrollable_elements or "No scrollable elements found."}""") + ] + ([Image(data=screenshot_bytes, format="png")] if use_vision and screenshot_bytes else []) + @mcp.tool( name="Click", @@ -485,7 +572,7 @@ def multi_select_tool( locs: list[list[int]] | None = None, labels: list[int] | None = None, press_ctrl: bool | str = True, - ctx: Context = None + ctx: Context = None, ) -> str: if locs is None and labels is None: raise ValueError("Either locs or labels must be provided.") @@ -498,7 +585,7 @@ def multi_select_tool( locs.append(list(desktop.get_coordinates_from_label(label))) except Exception as e: raise ValueError(f"Failed to find element with label {label}: {e}") - + press_ctrl = press_ctrl is True or ( isinstance(press_ctrl, str) and 
press_ctrl.lower() == "true" ) @@ -520,9 +607,7 @@ def multi_select_tool( ) @with_analytics(analytics, "Multi-Edit-Tool") def multi_edit_tool( - locs: list[list] | None = None, - labels: list[list] | None = None, - ctx: Context = None + locs: list[list] | None = None, labels: list[list] | None = None, ctx: Context = None ) -> str: if locs is None and labels is None: raise ValueError("Either locs or labels must be provided.") @@ -539,7 +624,7 @@ def multi_edit_tool( locs.append([loc[0], loc[1], text]) except Exception as e: raise ValueError(f"Failed to process label item {item}: {e}") - + desktop.multi_edit(locs) elements_str = ", ".join([f"({e[0]},{e[1]}) with text '{e[2]}'" for e in locs]) return f"Multi-edited elements at: {elements_str}" @@ -622,7 +707,6 @@ def process_tool( return f"Error managing processes: {str(e)}" - @mcp.tool( name="Notification", description="Sends a Windows toast notification with a title and message. Useful for alerting the user remotely.", @@ -642,59 +726,802 @@ def notification_tool(title: str, message: str, ctx: Context = None) -> str: return f"Error sending notification: {str(e)}" - @mcp.tool( - name='Registry', + name="Registry", description='Accesses the Windows Registry. Use mode="get" to read a value, mode="set" to create/update a value, mode="delete" to remove a value or key, mode="list" to list values and sub-keys under a path. Paths use PowerShell format (e.g. 
"HKCU:\\Software\\MyApp", "HKLM:\\SOFTWARE\\Microsoft\\Windows\\CurrentVersion").', annotations=ToolAnnotations( title="Registry", readOnlyHint=False, destructiveHint=True, idempotentHint=False, - openWorldHint=False - ) + openWorldHint=False, + ), ) @with_analytics(analytics, "Registry-Tool") -def registry_tool(mode: Literal['get', 'set', 'delete', 'list'], path: str, name: str | None = None, value: str | None = None, type: Literal['String', 'DWord', 'QWord', 'Binary', 'MultiString', 'ExpandString'] = 'String', ctx: Context = None) -> str: +def registry_tool( + mode: Literal["get", "set", "delete", "list"], + path: str, + name: str | None = None, + value: str | None = None, + type: Literal["String", "DWord", "QWord", "Binary", "MultiString", "ExpandString"] = "String", + ctx: Context = None, +) -> str: try: - if mode == 'get': + if mode == "get": if name is None: - return 'Error: name parameter is required for get mode.' + return "Error: name parameter is required for get mode." return desktop.registry_get(path=path, name=name) - elif mode == 'set': + elif mode == "set": if name is None: - return 'Error: name parameter is required for set mode.' + return "Error: name parameter is required for set mode." if value is None: - return 'Error: value parameter is required for set mode.' + return "Error: value parameter is required for set mode." return desktop.registry_set(path=path, name=name, value=value, reg_type=type) - elif mode == 'delete': + elif mode == "delete": return desktop.registry_delete(path=path, name=name) - elif mode == 'list': + elif mode == "list": return desktop.registry_list(path=path) else: return 'Error: mode must be "get", "set", "delete", or "list".' 
except Exception as e: - return f'Error accessing registry: {str(e)}' + return f"Error accessing registry: {str(e)}" + + +@mcp.tool( + name="CursorPosition", + description="Returns the current mouse cursor position as (x, y) coordinates.", + annotations=ToolAnnotations( + title="CursorPosition", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "CursorPosition-Tool") +def cursor_position_tool(ctx: Context = None) -> str: + try: + return desktop.get_cursor_position() + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="PixelColor", + description=( + "Gets the RGB color value at specified screen coordinates [x, y]. " + "Returns color as RGB values and hex code with approximate color name. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. " + "Default is 'physical' (no conversion)." + ), + annotations=ToolAnnotations( + title="PixelColor", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "PixelColor-Tool") +def pixel_color_tool( + loc: list[int], + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + loc = _to_physical(loc, coordinate_system) + return desktop.get_pixel_color(loc) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="KeyHold", + description="Presses or releases keyboard keys independently, enabling key hold operations. Use action='down' to press and hold, 'up' to release. Supports modifier keys (shift, ctrl, alt, win) and special keys (f1-f12, enter, tab, escape, etc.). 
Release keys after use to avoid stuck keys.", + annotations=ToolAnnotations( + title="KeyHold", + readOnlyHint=False, + destructiveHint=True, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "KeyHold-Tool") +def key_hold_tool(action: Literal["down", "up"], keys: list[str], ctx: Context = None) -> str: + try: + return desktop.key_hold(action, keys) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="ScreenInfo", + description="Returns information about all connected monitors including resolution, position, and which is the primary display. Useful for multi-monitor setups and coordinate targeting.", + annotations=ToolAnnotations( + title="ScreenInfo", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "ScreenInfo-Tool") +def screen_info_tool(ctx: Context = None) -> str: + try: + return desktop.get_screen_info() + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="ScreenHighlight", + description=( + "Highlights a rectangular region on screen with a colored border for visual identification. " + "Useful for debugging automation targets. The highlight appears briefly then disappears. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. " + "Default is 'physical' (no conversion)." 
+ ), + annotations=ToolAnnotations( + title="ScreenHighlight", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "ScreenHighlight-Tool") +def screen_highlight_tool( + loc: list[int], + size: list[int], + duration: float = 2.0, + color: Literal["red", "green", "blue", "yellow"] = "red", + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + loc = _to_physical(loc, coordinate_system) + size = _to_physical(size, coordinate_system) + return desktop.highlight_region(loc, size, duration, color) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="MousePath", + description=( + "Moves the mouse cursor smoothly through a series of waypoints. " + "Each waypoint is [x, y]. The movement is interpolated over the specified duration for smooth animation. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. " + "Default is 'physical' (no conversion)." + ), + annotations=ToolAnnotations( + title="MousePath", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "MousePath-Tool") +def mouse_path_tool( + path: list[list[int]], + duration: float = 0.5, + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + path = _path_to_physical(path, coordinate_system) + return desktop.mouse_path(path, duration) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="ScreenReader", + description=( + "Reads text from a screen region using OCR (Optical Character Recognition). " + "Uses Windows built-in OCR engine. Specify a region [x, y, width, height] to read from a specific area, " + "or omit for the full screen. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. 
" + "Default is 'physical' (no conversion)." + ), + annotations=ToolAnnotations( + title="ScreenReader", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "ScreenReader-Tool") +def screen_reader_tool( + region: list[int] | None = None, + language: str = "en", + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + if region is not None: + region = _region_to_physical(region, coordinate_system) + return desktop.read_screen_text(region, language) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="WaitForChange", + description=( + "Waits until a screen region visually changes beyond a threshold. " + "Useful for waiting for loading to complete, animations to finish, or content to update. " + "Compares pixel data between captures. Returns when change is detected or timeout is reached. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. " + "Default is 'physical' (no conversion)." + ), + annotations=ToolAnnotations( + title="WaitForChange", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "WaitForChange-Tool") +def wait_for_change_tool( + region: list[int], + timeout: float = 30.0, + threshold: float = 0.05, + poll_interval: float = 0.5, + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + region = _region_to_physical(region, coordinate_system) + return desktop.wait_for_change(region, timeout, threshold, poll_interval) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="FindImage", + description=( + "Finds a template image on screen using visual template matching. " + "Returns the center coordinates and confidence score of the best match. 
" + "Requires opencv-python-headless: pip install 'windows-mcp[vision]'. " + "Optionally restrict search to a region [x, y, width, height]. " + "Set coordinate_system='logical' to auto-convert from logical (DPI-scaled) coordinates to physical. " + "Default is 'physical' (no conversion)." + ), + annotations=ToolAnnotations( + title="FindImage", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "FindImage-Tool") +def find_image_tool( + template_path: str, + region: list[int] | None = None, + threshold: float = 0.8, + coordinate_system: Literal["physical", "logical"] = "physical", + ctx: Context = None, +) -> str: + try: + if region is not None: + region = _region_to_physical(region, coordinate_system) + return desktop.find_image(template_path, region, threshold) + except Exception as e: + return f"Error: {str(e)}" + + +# ============== SYSTEM CONTROL TOOLS ============== + + +@mcp.tool( + name="VolumeControl", + description="Control Windows system volume: get current level, set to specific value (0-100), mute, unmute, or toggle.", + annotations=ToolAnnotations(title="VolumeControl", readOnlyHint=False, destructiveHint=False), +) +@with_analytics(analytics, "VolumeControl-Tool") +def volume_control_tool( + action: Literal["get", "set", "mute", "unmute", "toggle"], + level: int | None = None, + ctx: Context = None, +) -> str: + try: + return desktop.volume_control(action, level) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="BrightnessControl", + description="Control display brightness: get current level or set to specific value (0-100). 
Works on laptops; may not be supported on desktop monitors.", + annotations=ToolAnnotations( + title="BrightnessControl", readOnlyHint=False, destructiveHint=False + ), +) +@with_analytics(analytics, "BrightnessControl-Tool") +def brightness_control_tool( + action: Literal["get", "set"], + level: int | None = None, + ctx: Context = None, +) -> str: + try: + return desktop.brightness_control(action, level) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="AppList", + description="List all running GUI applications with their PID and window title, or check if a specific application is running.", + annotations=ToolAnnotations(title="AppList", readOnlyHint=True, destructiveHint=False), +) +@with_analytics(analytics, "AppList-Tool") +def app_list_tool( + action: Literal["list", "isRunning"] = "list", + name: str | None = None, + ctx: Context = None, +) -> str: + try: + if action == "isRunning": + if not name: + return "Error: name is required for isRunning action" + return desktop.app_is_running(name) + return desktop.app_list() + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="Dialog", + description="Show a Windows dialog: alert (OK/Cancel), prompt (text input), choose (dropdown selection), or fileChoose (file picker). 
Returns the user's response.", + annotations=ToolAnnotations(title="Dialog", readOnlyHint=True, destructiveHint=False), +) +@with_analytics(analytics, "Dialog-Tool") +def dialog_tool( + dialog_type: Literal["alert", "prompt", "choose", "fileChoose"], + message: str | None = None, + title: str | None = None, + default_answer: str | None = None, + choices: list[str] | None = None, + ctx: Context = None, +) -> str: + try: + return desktop.show_dialog(dialog_type, message, title, default_answer, choices) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="SystemInfoExtended", + description="Get extended Windows system information: OS version, computer name, user, uptime, battery, dark mode, WiFi network.", + annotations=ToolAnnotations( + title="SystemInfoExtended", readOnlyHint=True, destructiveHint=False, idempotentHint=True + ), +) +@with_analytics(analytics, "SystemInfoExtended-Tool") +def system_info_extended_tool(ctx: Context = None) -> str: + try: + return desktop.system_info_extended() + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="DarkMode", + description="Control Windows dark/light mode: get current state, enable, disable, or toggle. Applies to both apps and system theme.", + annotations=ToolAnnotations(title="DarkMode", readOnlyHint=False, destructiveHint=False), +) +@with_analytics(analytics, "DarkMode-Tool") +def dark_mode_tool( + action: Literal["get", "enable", "disable", "toggle"], + ctx: Context = None, +) -> str: + try: + return desktop.dark_mode_control(action) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="SayText", + description="Speak text aloud using Windows text-to-speech (SAPI). 
Optionally specify a voice name and speaking rate (-10 to 10).", + annotations=ToolAnnotations(title="SayText", readOnlyHint=True, destructiveHint=False), +) +@with_analytics(analytics, "SayText-Tool") +def say_text_tool( + text: str, + voice: str | None = None, + rate: int | None = None, + ctx: Context = None, +) -> str: + try: + return desktop.say_text(text, voice, rate) + except Exception as e: + return f"Error: {str(e)}" + + +# ============== DEV WORKFLOW TOOLS ============== + + +@mcp.tool( + name="PortCheck", + description="Check if a network port is in use and what process owns it, or list all listening ports. Useful for dev server verification.", + annotations=ToolAnnotations( + title="PortCheck", readOnlyHint=True, destructiveHint=False, idempotentHint=True + ), +) +@with_analytics(analytics, "PortCheck-Tool") +def port_check_tool( + action: Literal["check", "list"], + port: int | None = None, + protocol: Literal["tcp", "udp", "both"] = "tcp", + ctx: Context = None, +) -> str: + try: + return desktop.port_check(action, port, protocol) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="FileWatcher", + description="Watch a file or directory for changes (create, modify, delete). Blocks until a change is detected or timeout expires.", + annotations=ToolAnnotations(title="FileWatcher", readOnlyHint=True, destructiveHint=False), +) +@with_analytics(analytics, "FileWatcher-Tool") +def file_watcher_tool( + path: str, + timeout_seconds: int = 30, + event: Literal["any", "create", "modify", "delete"] = "any", + ctx: Context = None, +) -> str: + try: + return desktop.file_watcher(path, min(timeout_seconds, 300), event) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="SearchFiles", + description="Search for files by name or content using PowerShell. 
Optionally limit search to a specific directory.", + annotations=ToolAnnotations( + title="SearchFiles", readOnlyHint=True, destructiveHint=False, idempotentHint=True + ), +) +@with_analytics(analytics, "SearchFiles-Tool") +def search_files_tool( + query: str, + search_type: Literal["name", "content"] = "name", + directory: str | None = None, + max_results: int = 20, + ctx: Context = None, +) -> str: + try: + return desktop.search_files(query, search_type, directory, min(max_results, 100)) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="NetworkDiagnostics", + description="Network diagnostics: ping a host, DNS lookup, HTTP endpoint check, or list network interfaces.", + annotations=ToolAnnotations( + title="NetworkDiagnostics", readOnlyHint=True, destructiveHint=False, idempotentHint=True + ), +) +@with_analytics(analytics, "NetworkDiagnostics-Tool") +def network_diagnostics_tool( + action: Literal["ping", "dns", "http", "interfaces"], + host: str | None = None, + count: int = 3, + timeout: int = 5, + ctx: Context = None, +) -> str: + try: + return desktop.network_diagnostics(action, host, min(count, 10), min(timeout, 30)) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="AccessibilityInspector", + description="Read the UI element tree of a Windows application window. 
Returns element hierarchy with control types, names, values, and enabled states.", + annotations=ToolAnnotations( + title="AccessibilityInspector", readOnlyHint=True, destructiveHint=False + ), +) +@with_analytics(analytics, "AccessibilityInspector-Tool") +def accessibility_inspector_tool( + app_name: str, + max_depth: int = 3, + ctx: Context = None, +) -> str: + try: + return desktop.accessibility_inspector(app_name, min(max_depth, 5)) + except Exception as e: + return f"Error: {str(e)}" + + +# ============== UI ELEMENT TOOLS ============== + + +@mcp.tool( + name="UIElement", + description=( + "Interact with UI elements in Windows applications using UIAutomation. " + "Modes: 'get' (element tree with depth/role filter), 'find' (search by name/role), " + "'click' (click by path or search), 'setValue' (set text/checkbox/slider value), " + "'typeInto' (focus element + type text), 'listWindows' (all windows with details), " + "'overview' (element role counts for app). " + "Path format: 'role index > role index' (e.g., 'pane 1 > button 2')." 
+ ), + annotations=ToolAnnotations( + title="UIElement", + readOnlyHint=False, + destructiveHint=True, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "UIElement-Tool") +def ui_element_tool( + mode: Literal["get", "find", "click", "setValue", "typeInto", "listWindows", "overview"], + app: str | None = None, + path: str | None = None, + search: str | None = None, + role: str | None = None, + value: str | None = None, + text: str | None = None, + depth: int = 2, + clear: bool = False, + ctx: Context = None, +) -> str: + try: + if mode == "get": + if not app: + return "Error: app is required for 'get' mode" + return desktop.ui_element_get(app, min(depth, 5), role) + elif mode == "find": + if not app: + return "Error: app is required for 'find' mode" + if not search: + return "Error: search is required for 'find' mode" + return desktop.ui_element_find(app, search, role) + elif mode == "click": + if not app: + return "Error: app is required for 'click' mode" + if not path and not search: + return "Error: path or search is required for 'click' mode" + return desktop.ui_element_click(app, path, search) + elif mode == "setValue": + if not app: + return "Error: app is required for 'setValue' mode" + if value is None: + return "Error: value is required for 'setValue' mode" + if not path and not search: + return "Error: path or search is required for 'setValue' mode" + return desktop.ui_element_set_value(app, value, path, search) + elif mode == "typeInto": + if not app: + return "Error: app is required for 'typeInto' mode" + if text is None: + return "Error: text is required for 'typeInto' mode" + if not path and not search: + return "Error: path or search is required for 'typeInto' mode" + return desktop.ui_element_type_into(app, text, path, search, clear) + elif mode == "listWindows": + return desktop.ui_element_list_windows() + elif mode == "overview": + if not app: + return "Error: app is required for 'overview' mode" + return 
desktop.ui_element_overview(app) + else: + return f"Error: Unknown mode: {mode}" + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="WindowScreenshot", + description=( + "Capture a screenshot of a specific window by app name or window handle. " + "Returns the window screenshot as an image." + ), + annotations=ToolAnnotations( + title="WindowScreenshot", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "WindowScreenshot-Tool") +def window_screenshot_tool( + app: str | None = None, + handle: int | None = None, + ctx: Context = None, +) -> list | str: + try: + if not app and not handle: + return "Error: app or handle is required" + img = desktop.capture_window_screenshot(app, handle) + if img is None: + return "Error: Could not capture window screenshot." + # Resize if larger than max + if img.width > MAX_IMAGE_WIDTH or img.height > MAX_IMAGE_HEIGHT: + ratio = min(MAX_IMAGE_WIDTH / img.width, MAX_IMAGE_HEIGHT / img.height) + img = img.resize((int(img.width * ratio), int(img.height * ratio)), PILImage.LANCZOS) + buffered = io.BytesIO() + img.save(buffered, format="PNG") + img_bytes = buffered.getvalue() + buffered.close() + return [Image(data=img_bytes, format="png")] + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="MultiMonitor", + description="Get information about all connected monitors including resolution, position, working area, and which is primary.", + annotations=ToolAnnotations( + title="MultiMonitor", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "MultiMonitor-Tool") +def multi_monitor_tool(ctx: Context = None) -> str: + try: + return desktop.get_multi_monitor_info() + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="ScreenRecord", + description="Control screen recording using ffmpeg: start recording, stop recording, or 
check status. Requires ffmpeg in PATH.", + annotations=ToolAnnotations( + title="ScreenRecord", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "ScreenRecord-Tool") +def screen_record_tool( + action: Literal["start", "stop", "status"] = "start", + output_path: str | None = None, + duration: int | None = None, + fps: int = 15, + ctx: Context = None, +) -> str: + try: + return desktop.screen_record(action, output_path, duration, min(fps, 60)) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="MenuClick", + description="Navigate and click menu items by path in a Windows application. Use '>' to separate menu levels (e.g., 'File > Save As').", + annotations=ToolAnnotations( + title="MenuClick", + readOnlyHint=False, + destructiveHint=True, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "MenuClick-Tool") +def menu_click_tool( + app: str, + menu_path: str, + ctx: Context = None, +) -> str: + try: + return desktop.menu_click(app, menu_path) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="QuickLook", + description="Open a file with its default Windows application. Similar to double-clicking the file in Explorer.", + annotations=ToolAnnotations( + title="QuickLook", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=True, + ), +) +@with_analytics(analytics, "QuickLook-Tool") +def quick_look_tool(path: str, ctx: Context = None) -> str: + try: + return desktop.quick_look(path) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="WindowTiling", + description=( + "Arrange windows in tiling layouts. " + "Modes: 'left'/'right'/'top'/'bottom' (half-screen tiling), " + "'maximize', 'minimize', 'restore', 'cascade' (cascade all windows). " + "Requires app name for single-window operations." 
+ ), + annotations=ToolAnnotations( + title="WindowTiling", + readOnlyHint=False, + destructiveHint=False, + idempotentHint=False, + openWorldHint=False, + ), +) +@with_analytics(analytics, "WindowTiling-Tool") +def window_tiling_tool( + mode: Literal["left", "right", "top", "bottom", "maximize", "minimize", "restore", "cascade"], + app: str | None = None, + ctx: Context = None, +) -> str: + try: + return desktop.window_tiling(mode, app) + except Exception as e: + return f"Error: {str(e)}" + + +@mcp.tool( + name="ClipboardInfo", + description="Get detailed clipboard format information: available formats, text preview, image dimensions. More detailed than Clipboard tool's 'get' mode.", + annotations=ToolAnnotations( + title="ClipboardInfo", + readOnlyHint=True, + destructiveHint=False, + idempotentHint=True, + openWorldHint=False, + ), +) +@with_analytics(analytics, "ClipboardInfo-Tool") +def clipboard_info_tool(ctx: Context = None) -> str: + try: + return desktop.get_clipboard_info() + except Exception as e: + return f"Error: {str(e)}" + class Transport(Enum): STDIO = "stdio" SSE = "sse" STREAMABLE_HTTP = "streamable-http" + def __str__(self): return self.value + class Mode(Enum): LOCAL = "local" REMOTE = "remote" + def __str__(self): return self.value + @click.command() @click.option( "--transport", help="The transport layer used by the MCP server.", - type=click.Choice([Transport.STDIO.value,Transport.SSE.value,Transport.STREAMABLE_HTTP.value]), - default='stdio' + type=click.Choice( + [Transport.STDIO.value, Transport.SSE.value, Transport.STREAMABLE_HTTP.value] + ), + default="stdio", ) @click.option( "--host", @@ -710,20 +1537,19 @@ def __str__(self): type=int, show_default=True, ) - def main(transport, host, port): - config=Config( - mode=os.getenv("MODE",Mode.LOCAL.value).lower(), - sandbox_id=os.getenv("SANDBOX_ID",''), - api_key=os.getenv("API_KEY",'') + config = Config( + mode=os.getenv("MODE", Mode.LOCAL.value).lower(), + 
sandbox_id=os.getenv("SANDBOX_ID", ""), + api_key=os.getenv("API_KEY", ""), ) match config.mode: case Mode.LOCAL.value: match transport: case Transport.STDIO.value: - mcp.run(transport=Transport.STDIO.value,show_banner=False) - case Transport.SSE.value|Transport.STREAMABLE_HTTP.value: - mcp.run(transport=transport,host=host,port=port,show_banner=False) + mcp.run(transport=Transport.STDIO.value, show_banner=False) + case Transport.SSE.value | Transport.STREAMABLE_HTTP.value: + mcp.run(transport=transport, host=host, port=port, show_banner=False) case _: raise ValueError(f"Invalid transport: {transport}") case Mode.REMOTE.value: @@ -731,19 +1557,20 @@ def main(transport, host, port): raise ValueError("SANDBOX_ID is required for MODE: remote") if not config.api_key: raise ValueError("API_KEY is required for MODE: remote") - client=AuthClient(api_key=config.api_key,sandbox_id=config.sandbox_id) + client = AuthClient(api_key=config.api_key, sandbox_id=config.sandbox_id) client.authenticate() - backend=StreamableHttpTransport(url=client.proxy_url,headers=client.proxy_headers) - proxy_mcp=FastMCP.as_proxy(ProxyClient(backend),name="windows-mcp") + backend = StreamableHttpTransport(url=client.proxy_url, headers=client.proxy_headers) + proxy_mcp = FastMCP.as_proxy(ProxyClient(backend), name="windows-mcp") match transport: case Transport.STDIO.value: - proxy_mcp.run(transport=Transport.STDIO.value,show_banner=False) - case Transport.SSE.value|Transport.STREAMABLE_HTTP.value: - proxy_mcp.run(transport=transport,host=host,port=port,show_banner=False) + proxy_mcp.run(transport=Transport.STDIO.value, show_banner=False) + case Transport.SSE.value | Transport.STREAMABLE_HTTP.value: + proxy_mcp.run(transport=transport, host=host, port=port, show_banner=False) case _: raise ValueError(f"Invalid transport: {transport}") case _: raise ValueError(f"Invalid mode: {config.mode}") + if __name__ == "__main__": main() diff --git a/src/windows_mcp/desktop/service.py 
b/src/windows_mcp/desktop/service.py index a0d40d2..c2d996b 100755 --- a/src/windows_mcp/desktop/service.py +++ b/src/windows_mcp/desktop/service.py @@ -1,4 +1,5 @@ -from windows_mcp.desktop.utils import ps_quote, ps_quote_for_xml +from windows_mcp.desktop.utils import ps_quote, ps_quote_for_xml, approximate_color_name +import pathlib from windows_mcp.vdm.core import ( get_all_desktops, get_current_desktop, @@ -30,6 +31,7 @@ import re import os import io +import tempfile logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) @@ -46,15 +48,82 @@ "option": "Alt", } +# Virtual key code mapping for KeyHold tool +_VK_MAP = { + "shift": uia.Keys.VK_SHIFT, + "ctrl": uia.Keys.VK_CONTROL, + "control": uia.Keys.VK_CONTROL, + "alt": uia.Keys.VK_MENU, + "win": uia.Keys.VK_LWIN, + "windows": uia.Keys.VK_LWIN, + "enter": uia.Keys.VK_RETURN, + "return": uia.Keys.VK_RETURN, + "tab": uia.Keys.VK_TAB, + "escape": uia.Keys.VK_ESCAPE, + "esc": uia.Keys.VK_ESCAPE, + "space": uia.Keys.VK_SPACE, + "backspace": uia.Keys.VK_BACK, + "delete": uia.Keys.VK_DELETE, + "insert": uia.Keys.VK_INSERT, + "home": uia.Keys.VK_HOME, + "end": uia.Keys.VK_END, + "pageup": uia.Keys.VK_PRIOR, + "pagedown": uia.Keys.VK_NEXT, + "up": uia.Keys.VK_UP, + "down": uia.Keys.VK_DOWN, + "left": uia.Keys.VK_LEFT, + "right": uia.Keys.VK_RIGHT, + "f1": uia.Keys.VK_F1, + "f2": uia.Keys.VK_F2, + "f3": uia.Keys.VK_F3, + "f4": uia.Keys.VK_F4, + "f5": uia.Keys.VK_F5, + "f6": uia.Keys.VK_F6, + "f7": uia.Keys.VK_F7, + "f8": uia.Keys.VK_F8, + "f9": uia.Keys.VK_F9, + "f10": uia.Keys.VK_F10, + "f11": uia.Keys.VK_F11, + "f12": uia.Keys.VK_F12, + "capslock": uia.Keys.VK_CAPITAL, + "numlock": uia.Keys.VK_NUMLOCK, + "scrolllock": uia.Keys.VK_SCROLL, + "printscreen": uia.Keys.VK_SNAPSHOT, +} + +# BGR color values for Win32 GDI highlight rendering +_HIGHLIGHT_COLORS = { + "red": 0x0000FF, + "green": 0x00FF00, + "blue": 0xFF0000, + "yellow": 0x00FFFF, +} + def _escape_text_for_sendkeys(text: str) -> str: - """Escape special 
characters so uia.SendKeys types them correctly.""" + """Escape ALL special characters so uia.SendKeys types them literally. + + SendKeys special chars: +^%~()[]{} + + = Shift, ^ = Ctrl, % = Alt, ~ = Enter + () = grouping, [] = reserved, {} = key names + All must be wrapped in {x} to be typed literally. + """ result = [] for ch in text: if ch == "{": result.append("{{}") elif ch == "}": result.append("{}}") + elif ch in "+^%~": + result.append("{" + ch + "}") + elif ch == "(": + result.append("{(}") + elif ch == ")": + result.append("{)}") + elif ch == "[": + result.append("{[}") + elif ch == "]": + result.append("{]}") elif ch == "\n": result.append("{Enter}") elif ch == "\t": @@ -225,23 +294,26 @@ def execute_command(self, command: str, timeout: int = 10) -> tuple[str, int]: if ".EXE" not in env.get("PATHEXT", ""): try: import winreg + with winreg.OpenKey( winreg.HKEY_LOCAL_MACHINE, r"SYSTEM\CurrentControlSet\Control\Session Manager\Environment", ) as key: env["PATHEXT"] = winreg.QueryValueEx(key, "PATHEXT")[0] except Exception: - env["PATHEXT"] = ".COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC;.CPL;.PY;.PYW" + env["PATHEXT"] = ( + ".COM;.EXE;.BAT;.CMD;.VBS;.VBE;.JS;.JSE;.WSF;.WSH;.MSC;.CPL;.PY;.PYW" + ) shell = "pwsh" if shutil.which("pwsh") else "powershell" - + args = [shell, "-NoProfile"] - # Only older Windows PowerShell (5.1) uses -OutputFormat Text successfully here + # Only older Windows PowerShell (5.1) uses -OutputFormat Text successfully here shell_name = os.path.basename(shell).lower().replace(".exe", "") if shell_name == "powershell": args.extend(["-OutputFormat", "Text"]) args.extend(["-EncodedCommand", encoded]) - + result = subprocess.run( args, capture_output=True, # No errors='ignore' - let subprocess return bytes @@ -369,10 +441,16 @@ def launch_app(self, name: str) -> tuple[str, int, int]: else: # Validate appid format (allow UWP IDs like Microsoft.WindowsNotepad_...!App) # Chars to ignore for validation: \ , _ , . , - , ! 
- validation_id = appid.replace("\\", "").replace("_", "").replace(".", "").replace("-", "").replace("!", "") + validation_id = ( + appid.replace("\\", "") + .replace("_", "") + .replace(".", "") + .replace("-", "") + .replace("!", "") + ) if not validation_id.isalnum(): return (f"Invalid app identifier: {appid}", 1, 0) - + safe = ps_quote(f"shell:AppsFolder\\{appid}") command = f"Start-Process {safe}" response, status = self.execute_command(command) @@ -479,7 +557,7 @@ def get_coordinates_from_label(self, label: int) -> tuple[int, int]: raise IndexError(f"Label {label} out of range") return element_node.center.x, element_node.center.y - def click(self, loc: tuple[int, int]|list[int], button: str = "left", clicks: int = 2): + def click(self, loc: tuple[int, int] | list[int], button: str = "left", clicks: int = 2): if isinstance(loc, list): x, y = loc[0], loc[1] else: @@ -561,7 +639,7 @@ def scroll( return 'Invalid type. Use "horizontal" or "vertical".' return None - def drag(self, loc: tuple[int, int]|list[int]): + def drag(self, loc: tuple[int, int] | list[int]): if isinstance(loc, list): x, y = loc[0], loc[1] else: @@ -586,17 +664,24 @@ def shortcut(self, shortcut: str): sendkeys_str += "{" + name + "}" uia.SendKeys(sendkeys_str, interval=0.01) - def multi_select(self, press_ctrl: bool | str = False, locs: list[tuple[int, int]] = []): + def multi_select( + self, press_ctrl: bool | str = False, locs: list[tuple[int, int]] | None = None + ): + if locs is None: + locs = [] press_ctrl = press_ctrl is True or ( isinstance(press_ctrl, str) and press_ctrl.lower() == "true" ) if press_ctrl: uia.PressKey(uia.Keys.VK_CONTROL, waitTime=0.05) - for loc in locs: - x, y = loc - uia.Click(x, y, waitTime=0.2) - sleep(0.5) - uia.ReleaseKey(uia.Keys.VK_CONTROL, waitTime=0.05) + try: + for loc in locs: + x, y = loc + uia.Click(x, y, waitTime=0.2) + sleep(0.5) + finally: + if press_ctrl: + uia.ReleaseKey(uia.Keys.VK_CONTROL, waitTime=0.05) def multi_edit(self, locs: list[tuple[int, 
int, str]]): for loc in locs: @@ -640,7 +725,7 @@ def is_window_visible(self, window: uia.Control) -> bool: def is_overlay_window(self, element: uia.Control) -> bool: no_children = len(element.GetChildren()) == 0 is_name = "Overlay" in element.Name.strip() - return no_children or is_name + return no_children and is_name def get_controls_handles(self, optimized: bool = False): handles = set() @@ -803,8 +888,6 @@ def get_xpath_from_element(self, element: uia.Control): xpath = "/".join(path_parts) return xpath - - def get_windows_version(self) -> str: response, status = self.execute_command("(Get-CimInstance Win32_OperatingSystem).Caption") if status == 0: @@ -899,9 +982,9 @@ def draw_annotation(label, node: TreeElementNode): font=font, ) - # Draw annotations in parallel - with ThreadPoolExecutor() as executor: - executor.map(draw_annotation, range(len(nodes)), nodes) + # Draw annotations sequentially (ImageDraw is not thread-safe) + for i, node in enumerate(nodes): + draw_annotation(i, node) return padded_screenshot def send_notification(self, title: str, message: str) -> str: @@ -933,7 +1016,7 @@ def send_notification(self, title: str, message: str) -> str: if status == 0: return f'Notification sent: "{title}" - {message}' else: - return f'Notification may have been sent. PowerShell output: {response[:200]}' + return f"Notification may have been sent. PowerShell output: {response[:200]}" def list_processes( self, @@ -1015,20 +1098,16 @@ def kill_process( return f'No process matching "{name}" found or access denied.' 
return f"{'Force killed' if force else 'Terminated'}: {', '.join(killed)}" - - - - def registry_get(self, path: str, name: str) -> str: q_path = ps_quote(path) q_name = ps_quote(name) command = f"Get-ItemProperty -Path {q_path} -Name {q_name} | Select-Object -ExpandProperty {q_name}" response, status = self.execute_command(command) if status != 0: - return f'Error reading registry: {response.strip()}' + return f"Error reading registry: {response.strip()}" return f'Registry value [{path}] "{name}" = {response.strip()}' - def registry_set(self, path: str, name: str, value: str, reg_type: str = 'String') -> str: + def registry_set(self, path: str, name: str, value: str, reg_type: str = "String") -> str: q_path = ps_quote(path) q_name = ps_quote(name) q_value = ps_quote(value) @@ -1041,7 +1120,7 @@ def registry_set(self, path: str, name: str, value: str, reg_type: str = 'String ) response, status = self.execute_command(command) if status != 0: - return f'Error writing registry: {response.strip()}' + return f"Error writing registry: {response.strip()}" return f'Registry value [{path}] "{name}" set to "{value}" (type: {reg_type}).' def registry_delete(self, path: str, name: str | None = None) -> str: @@ -1051,14 +1130,14 @@ def registry_delete(self, path: str, name: str | None = None) -> str: command = f"Remove-ItemProperty -Path {q_path} -Name {q_name} -Force" response, status = self.execute_command(command) if status != 0: - return f'Error deleting registry value: {response.strip()}' + return f"Error deleting registry value: {response.strip()}" return f'Registry value [{path}] "{name}" deleted.' else: command = f"Remove-Item -Path {q_path} -Recurse -Force" response, status = self.execute_command(command) if status != 0: - return f'Error deleting registry key: {response.strip()}' - return f'Registry key [{path}] deleted.' + return f"Error deleting registry key: {response.strip()}" + return f"Registry key [{path}] deleted." 
def registry_list(self, path: str) -> str: q_path = ps_quote(path) @@ -1066,15 +1145,15 @@ def registry_list(self, path: str) -> str: f"$values = (Get-ItemProperty -Path {q_path} -ErrorAction Stop | " f"Select-Object * -ExcludeProperty PS* | Format-List | Out-String).Trim(); " f"$subkeys = (Get-ChildItem -Path {q_path} -ErrorAction SilentlyContinue | " - f"Select-Object -ExpandProperty PSChildName) -join \"`n\"; " - f"if ($values) {{ Write-Output \"Values:`n$values\" }}; " - f"if ($subkeys) {{ Write-Output \"`nSub-Keys:`n$subkeys\" }}; " + f'Select-Object -ExpandProperty PSChildName) -join "`n"; ' + f'if ($values) {{ Write-Output "Values:`n$values" }}; ' + f'if ($subkeys) {{ Write-Output "`nSub-Keys:`n$subkeys" }}; ' f"if (-not $values -and -not $subkeys) {{ Write-Output 'No values or sub-keys found.' }}" ) response, status = self.execute_command(command) if status != 0: - return f'Error listing registry: {response.strip()}' - return f'Registry key [{path}]:\n{response.strip()}' + return f"Error listing registry: {response.strip()}" + return f"Registry key [{path}]:\n{response.strip()}" @contextmanager def auto_minimize(self): @@ -1084,3 +1163,1779 @@ def auto_minimize(self): yield finally: uia.ShowWindow(handle, win32con.SW_RESTORE) + + def get_cursor_position(self) -> str: + x, y = uia.GetCursorPos() + return f"Cursor position: ({x}, {y})" + + def get_pixel_color(self, loc: list[int]) -> str: + if len(loc) != 2: + return "Error: loc must be [x, y]" + x, y = loc[0], loc[1] + try: + img = ImageGrab.grab(bbox=(x, y, x + 1, y + 1), all_screens=True) + pixel = img.getpixel((0, 0)) + r, g, b = pixel[0], pixel[1], pixel[2] + hex_color = f"#{r:02X}{g:02X}{b:02X}" + name = approximate_color_name(r, g, b) + return f"Color at ({x}, {y}): R={r}, G={g}, B={b} ({hex_color}) - {name}" + except Exception as e: + return f"Error reading pixel at ({x}, {y}): {str(e)}" + + def key_hold(self, action: str, keys: list[str]) -> str: + if action not in ("down", "up"): + return f"Error: 
action must be 'down' or 'up', got '{action}'" + results = [] + for key_name in keys: + k = key_name.strip().lower() + vk = _VK_MAP.get(k) + if vk is None and len(k) == 1: + vk = ord(k.upper()) + if vk is None: + available = ", ".join(sorted(_VK_MAP.keys())) + return f"Error: Unknown key '{key_name}'. Available keys: {available}" + if action == "down": + uia.PressKey(vk, waitTime=0.05) + results.append(key_name) + elif action == "up": + uia.ReleaseKey(vk, waitTime=0.05) + results.append(key_name) + verb = "Pressed" if action == "down" else "Released" + return f"{verb} keys: {', '.join(results)}" + + def get_screen_info(self) -> str: + try: + ps_cmd = ( + "Add-Type -AssemblyName System.Windows.Forms; " + "[System.Windows.Forms.Screen]::AllScreens | ForEach-Object { " + "$_.DeviceName + '|' + $_.Bounds.Width + '|' + $_.Bounds.Height + '|' " + "+ $_.Bounds.X + '|' + $_.Bounds.Y + '|' + $_.Primary }" + ) + result, status = self.execute_command(ps_cmd, timeout=10) + except Exception: + size = self.get_screen_size() + return f"Monitors (1):\n[1] {size.width}x{size.height} (primary) at (0, 0)" + + if status != 0 or not result.strip(): + size = self.get_screen_size() + return f"Monitors (1):\n[1] {size.width}x{size.height} (primary) at (0, 0)" + + lines = [] + for i, line in enumerate(result.strip().split("\n"), 1): + parts = line.strip().split("|") + if len(parts) >= 6: + w, h, x, y = parts[1], parts[2], parts[3], parts[4] + primary_str = " (primary)" if parts[5].strip().lower() == "true" else "" + lines.append(f"[{i}] {w}x{h}{primary_str} at ({x}, {y})") + + if not lines: + size = self.get_screen_size() + return f"Monitors (1):\n[1] {size.width}x{size.height} (primary) at (0, 0)" + + try: + dpi_scale = self.get_dpi_scaling() + dpi_info = f"\nDPI scaling: {dpi_scale}x" + except Exception: + dpi_info = "" + + return f"Monitors ({len(lines)}):\n" + "\n".join(lines) + dpi_info + + def highlight_region( + self, loc: list[int], size: list[int], duration: float = 2.0, color: 
str = "red" + ) -> str: + if len(loc) != 2: + return "Error: loc must be [x, y]" + if len(size) != 2: + return "Error: size must be [width, height]" + x, y = loc[0], loc[1] + w, h = size[0], size[1] + if w <= 0 or h <= 0: + return "Error: width and height must be positive" + duration = min(max(duration, 0.1), 30.0) # Clamp between 100ms and 30s + color_val = _HIGHLIGHT_COLORS.get(color.lower(), 0x0000FF) + hdc = None + pen = None + try: + hdc = ctypes.windll.user32.GetDC(0) + if not hdc: + return "Error: Could not acquire screen device context" + pen = ctypes.windll.gdi32.CreatePen(0, 3, color_val) # PS_SOLID, 3px + if not pen: + return "Error: Could not create GDI pen" + old_pen = ctypes.windll.gdi32.SelectObject(hdc, pen) + brush = ctypes.windll.gdi32.GetStockObject(5) # NULL_BRUSH + old_brush = ctypes.windll.gdi32.SelectObject(hdc, brush) + ctypes.windll.gdi32.Rectangle(hdc, x, y, x + w, y + h) + ctypes.windll.gdi32.SelectObject(hdc, old_pen) + ctypes.windll.gdi32.SelectObject(hdc, old_brush) + sleep(duration) + # Invalidate the region to clear the highlight + ctypes.windll.user32.InvalidateRect(0, None, True) + return f"Highlighted region ({x}, {y}, {w}x{h}) in {color} for {duration}s." + except Exception as e: + return f"Error highlighting region: {str(e)}" + finally: + if pen: + ctypes.windll.gdi32.DeleteObject(pen) + if hdc: + ctypes.windll.user32.ReleaseDC(0, hdc) + + def mouse_path(self, path: list[list[int]], duration: float = 0.5) -> str: + if not path or len(path) < 2: + return "Error: path must contain at least 2 waypoints [[x1,y1], [x2,y2], ...]" + if duration < 0: + return "Error: duration must be non-negative" + for i, point in enumerate(path): + if len(point) != 2: + return f"Error: waypoint {i} must be [x, y], got {point}" + + if duration == 0: + x, y = path[-1] + uia.MoveTo(x, y, moveSpeed=0) + return f"Mouse moved through {len(path)} waypoints in 0s." 
+ + total_segments = len(path) - 1 + segment_duration = duration / total_segments if total_segments > 0 else 0 + steps_per_segment = max(1, int(segment_duration * 60)) # ~60 fps + + for seg in range(total_segments): + x1, y1 = path[seg] + x2, y2 = path[seg + 1] + step_delay = segment_duration / steps_per_segment if steps_per_segment > 0 else 0 + for step in range(steps_per_segment + 1): + t = step / steps_per_segment if steps_per_segment > 0 else 1.0 + ix = int(x1 + (x2 - x1) * t) + iy = int(y1 + (y2 - y1) * t) + uia.MoveTo(ix, iy, moveSpeed=0) + if step_delay > 0: + sleep(step_delay) + + return f"Mouse moved through {len(path)} waypoints in {duration}s." + + def read_screen_text(self, region: list[int] | None = None, language: str = "en") -> str: + tmp_path = None + try: + if region is not None: + if len(region) != 4: + return "Error: region must be [x, y, width, height]" + x, y, w, h = region + if w <= 0 or h <= 0: + return "Error: width and height must be positive" + img = ImageGrab.grab(bbox=(x, y, x + w, y + h), all_screens=True) + else: + img = ImageGrab.grab(all_screens=True) + + with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp: + tmp_path = tmp.name + img.save(tmp_path, format="PNG") + + # Primary: Windows built-in OCR via PowerShell + safe_path = ps_quote(tmp_path) + ps_script = ( + "Add-Type -AssemblyName 'System.Runtime.WindowsRuntime'\n" + "[void][Windows.Foundation.IAsyncOperation``1,Windows.Foundation,ContentType=WindowsRuntime]\n" + "[void][Windows.Media.Ocr.OcrEngine,Windows.Foundation,ContentType=WindowsRuntime]\n" + "[void][Windows.Graphics.Imaging.BitmapDecoder,Windows.Foundation,ContentType=WindowsRuntime]\n" + "$stream = [System.IO.File]::OpenRead(" + safe_path + ")\n" + "$raStream = [System.IO.WindowsRuntimeStreamExtensions]::AsRandomAccessStream($stream)\n" + "$decoder = [Windows.Graphics.Imaging.BitmapDecoder]::CreateAsync($raStream).GetAwaiter().GetResult()\n" + "$bitmap = 
$decoder.GetSoftwareBitmapAsync().GetAwaiter().GetResult()\n" + "$engine = [Windows.Media.Ocr.OcrEngine]::TryCreateFromUserProfileLanguages()\n" + "if ($engine) {\n" + " $result = $engine.RecognizeAsync($bitmap).GetAwaiter().GetResult()\n" + " Write-Output $result.Text\n" + "} else { Write-Output 'OCR_ENGINE_UNAVAILABLE' }\n" + "$stream.Dispose()" + ) + result, status = self.execute_command(ps_script, timeout=30) + + if status == 0 and "OCR_ENGINE_UNAVAILABLE" not in result: + text = result.strip() + if text: + return f"OCR text:\n{text}" + return "No text detected in the specified region." + + # Fallback: pytesseract + try: + import pytesseract + + text = pytesseract.image_to_string(img, lang=language).strip() + if text: + return f"OCR text (pytesseract):\n{text}" + return "No text detected in the specified region." + except ImportError: + return ( + "Error: Windows OCR unavailable and pytesseract not installed. " + "Install with: pip install 'windows-mcp[ocr]'" + ) + except Exception as e: + return f"Error reading screen text: {str(e)}" + finally: + if tmp_path: + try: + os.unlink(tmp_path) + except OSError: + pass + + def wait_for_change( + self, + region: list[int], + timeout: float = 30.0, + threshold: float = 0.05, + poll_interval: float = 0.5, + ) -> str: + if len(region) != 4: + return "Error: region must be [x, y, width, height]" + x, y, w, h = region + if w <= 0 or h <= 0: + return "Error: width and height must be positive" + if not 0.0 <= threshold <= 1.0: + return "Error: threshold must be between 0.0 and 1.0" + timeout = min(timeout, 60.0) # Hard cap at 60s + poll_interval = max(poll_interval, 0.1) # Prevent CPU spinning + bbox = (x, y, x + w, y + h) + + try: + baseline = list(ImageGrab.grab(bbox=bbox, all_screens=True).getdata()) + except Exception as e: + return f"Error capturing baseline: {str(e)}" + + total_pixels = len(baseline) + if total_pixels == 0: + return "Error: region has zero pixels." 
+ + start = time() + while (time() - start) < timeout: + sleep(poll_interval) + try: + current = list(ImageGrab.grab(bbox=bbox, all_screens=True).getdata()) + except Exception: + continue + + diff_count = sum(1 for a, b in zip(baseline, current) if a != b) + diff_ratio = diff_count / total_pixels + + if diff_ratio >= threshold: + elapsed = round(time() - start, 1) + pct = round(diff_ratio * 100, 1) + return ( + f"Change detected in region ({x}, {y}, {w}x{h}) after {elapsed}s. " + f"{pct}% of pixels changed." + ) + + return ( + f"Timeout: no significant change detected in region ({x}, {y}, {w}x{h}) " + f"after {timeout}s (threshold: {threshold * 100}%)." + ) + + def find_image( + self, + template_path: str, + region: list[int] | None = None, + threshold: float = 0.8, + ) -> str: + if not 0.0 <= threshold <= 1.0: + return "Error: threshold must be between 0.0 and 1.0" + + try: + import cv2 + import numpy as np + except ImportError: + return ( + "Error: opencv-python-headless and numpy are required. 
" + "Install with: pip install 'windows-mcp[vision]'" + ) + + # Resolve and validate path to prevent traversal attacks + import pathlib + + try: + resolved = pathlib.Path(template_path).resolve() + except (ValueError, OSError): + return f"Error: Invalid template path: {template_path}" + + if not resolved.is_file(): + return f"Error: Template file not found: {template_path}" + + # Only allow common image extensions + allowed_ext = {".png", ".jpg", ".jpeg", ".bmp", ".tiff", ".tif", ".webp"} + if resolved.suffix.lower() not in allowed_ext: + return ( + f"Error: Template must be an image file ({', '.join(sorted(allowed_ext))}), " + f"got '{resolved.suffix}'" + ) + + try: + template = cv2.imread(str(resolved), cv2.IMREAD_COLOR) + if template is None: + return f"Error: Could not read template image: {template_path}" + + if region is not None: + if len(region) != 4: + return "Error: region must be [x, y, width, height]" + x, y, w, h = region + if w <= 0 or h <= 0: + return "Error: width and height must be positive" + screen_img = ImageGrab.grab(bbox=(x, y, x + w, y + h), all_screens=True) + else: + x, y = 0, 0 + screen_img = ImageGrab.grab(all_screens=True) + + screen_rgb = np.array(screen_img) + screen_bgr = cv2.cvtColor(screen_rgb, cv2.COLOR_RGB2BGR) + + th, tw = template.shape[:2] + sh, sw = screen_bgr.shape[:2] + if th > sh or tw > sw: + return f"Error: Template ({tw}x{th}) is larger than search area ({sw}x{sh})." + + result = cv2.matchTemplate(screen_bgr, template, cv2.TM_CCOEFF_NORMED) + _, max_val, _, max_loc = cv2.minMaxLoc(result) + + if max_val >= threshold: + cx = x + max_loc[0] + tw // 2 + cy = y + max_loc[1] + th // 2 + confidence = round(max_val, 3) + return ( + f"Match found at ({cx}, {cy}) with confidence {confidence}. " + f"Template size: {tw}x{th}." + ) + else: + return ( + f"No match found (best confidence: {round(max_val, 3)}, " + f"threshold: {threshold}). Template: {tw}x{th}." 
+ ) + except Exception as e: + return f"Error during image matching: {str(e)}" + + # ============== SYSTEM CONTROL METHODS ============== + + def volume_control(self, action: str, level: int | None = None) -> str: + """Control system volume via PowerShell COM AudioEndpointVolume.""" + if action == "get": + ps = ( + "Add-Type -TypeDefinition @'\n" + "using System.Runtime.InteropServices;\n" + '[Guid("5CDF2C82-841E-4546-9722-0CF74078229A"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IAudioEndpointVolume {\n" + " int _0(); int _1(); int _2(); int _3(); int _4(); int _5(); int _6();\n" + " int SetMasterVolumeLevelScalar(float fLevel, System.Guid pguidEventContext);\n" + " int GetMasterVolumeLevelScalar(out float pfLevel);\n" + " int SetMute([MarshalAs(UnmanagedType.Bool)] bool bMute, System.Guid pguidEventContext);\n" + " int GetMute(out bool pbMute);\n" + "}\n" + '[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDevice { int Activate(ref System.Guid iid, int dwClsCtx, IntPtr pActivationParams, [MarshalAs(UnmanagedType.IUnknown)] out object ppInterface); }\n" + '[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDeviceEnumerator { int GetDefaultAudioEndpoint(int dataFlow, int role, out IMMDevice ppDevice); }\n" + '[ComImport, Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] class MMDeviceEnumeratorComObject { }\n' + "public class Audio {\n" + " static IAudioEndpointVolume GetVol() {\n" + " var enumerator = new MMDeviceEnumeratorComObject() as IMMDeviceEnumerator;\n" + " IMMDevice dev; enumerator.GetDefaultAudioEndpoint(0, 1, out dev);\n" + " var iid = typeof(IAudioEndpointVolume).GUID; object o;\n" + " dev.Activate(ref iid, 1, IntPtr.Zero, out o);\n" + " return (IAudioEndpointVolume)o;\n" + " }\n" + " public static float Volume { get { float v; GetVol().GetMasterVolumeLevelScalar(out v); return v; } set { 
GetVol().SetMasterVolumeLevelScalar(value, System.Guid.Empty); } }\n" + " public static bool Mute { get { bool m; GetVol().GetMute(out m); return m; } set { GetVol().SetMute(value, System.Guid.Empty); } }\n" + "}\n" + "'@ -ErrorAction SilentlyContinue\n" + ) + ps += 'Write-Output "Volume:$([Math]::Round([Audio]::Volume * 100)),Mute:$([Audio]::Mute)"' + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"System volume: {result.strip()}" + + if action == "set": + if level is None: + return "Error: level is required for 'set' action" + if level < 0 or level > 100: + return "Error: level must be 0-100" + # COM interop for volume set — intentionally omits SetMute/GetMute + # since they are unused (vtable position of SetMasterVolumeLevelScalar is stable) + ps = ( + "Add-Type -TypeDefinition @'\n" + "using System.Runtime.InteropServices;\n" + '[Guid("5CDF2C82-841E-4546-9722-0CF74078229A"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IAudioEndpointVolume {\n" + " int _0(); int _1(); int _2(); int _3(); int _4(); int _5(); int _6();\n" + " int SetMasterVolumeLevelScalar(float fLevel, System.Guid pguidEventContext);\n" + " int GetMasterVolumeLevelScalar(out float pfLevel);\n" + "}\n" + '[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDevice { int Activate(ref System.Guid iid, int dwClsCtx, IntPtr pActivationParams, [MarshalAs(UnmanagedType.IUnknown)] out object ppInterface); }\n" + '[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDeviceEnumerator { int GetDefaultAudioEndpoint(int dataFlow, int role, out IMMDevice ppDevice); }\n" + '[ComImport, Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] class MMDeviceEnumeratorComObject { }\n' + "public class Audio {\n" + " static IAudioEndpointVolume GetVol() {\n" + " var enumerator = new 
MMDeviceEnumeratorComObject() as IMMDeviceEnumerator;\n" + " IMMDevice dev; enumerator.GetDefaultAudioEndpoint(0, 1, out dev);\n" + " var iid = typeof(IAudioEndpointVolume).GUID; object o;\n" + " dev.Activate(ref iid, 1, IntPtr.Zero, out o);\n" + " return (IAudioEndpointVolume)o;\n" + " }\n" + " public static void SetVol(float v) { GetVol().SetMasterVolumeLevelScalar(v, System.Guid.Empty); }\n" + "}\n" + f"'@ -ErrorAction SilentlyContinue\n[Audio]::SetVol({level / 100.0})" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Volume set to {level}%" + + if action in ("mute", "unmute", "toggle"): + mute_val = ( + "true" + if action == "mute" + else "false" + if action == "unmute" + else "(-not [Audio]::Mute)" + ) + ps = ( + "Add-Type -TypeDefinition @'\n" + "using System.Runtime.InteropServices;\n" + '[Guid("5CDF2C82-841E-4546-9722-0CF74078229A"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IAudioEndpointVolume {\n" + " int _0(); int _1(); int _2(); int _3(); int _4(); int _5(); int _6();\n" + " int SetMasterVolumeLevelScalar(float fLevel, System.Guid pguidEventContext);\n" + " int GetMasterVolumeLevelScalar(out float pfLevel);\n" + " int SetMute([MarshalAs(UnmanagedType.Bool)] bool bMute, System.Guid pguidEventContext);\n" + " int GetMute(out bool pbMute);\n" + "}\n" + '[Guid("D666063F-1587-4E43-81F1-B948E807363F"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDevice { int Activate(ref System.Guid iid, int dwClsCtx, IntPtr pActivationParams, [MarshalAs(UnmanagedType.IUnknown)] out object ppInterface); }\n" + '[Guid("A95664D2-9614-4F35-A746-DE8DB63617E6"), InterfaceType(ComInterfaceType.InterfaceIsIUnknown)]\n' + "interface IMMDeviceEnumerator { int GetDefaultAudioEndpoint(int dataFlow, int role, out IMMDevice ppDevice); }\n" + '[ComImport, Guid("BCDE0395-E52F-467C-8E3D-C4579291692E")] class MMDeviceEnumeratorComObject { }\n' + "public class Audio {\n" 
+ " static IAudioEndpointVolume GetVol() {\n" + " var enumerator = new MMDeviceEnumeratorComObject() as IMMDeviceEnumerator;\n" + " IMMDevice dev; enumerator.GetDefaultAudioEndpoint(0, 1, out dev);\n" + " var iid = typeof(IAudioEndpointVolume).GUID; object o;\n" + " dev.Activate(ref iid, 1, IntPtr.Zero, out o);\n" + " return (IAudioEndpointVolume)o;\n" + " }\n" + " public static bool Mute { get { bool m; GetVol().GetMute(out m); return m; } set { GetVol().SetMute(value, System.Guid.Empty); } }\n" + "}\n" + f"'@ -ErrorAction SilentlyContinue\n[Audio]::Mute = {mute_val}" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Volume {action}d." + + return f"Error: Unknown action: {action}" + + def brightness_control(self, action: str, level: int | None = None) -> str: + """Control display brightness via WMI.""" + if action == "get": + ps = "(Get-CimInstance -Namespace root/WMI -ClassName WmiMonitorBrightness | Select-Object -First 1).CurrentBrightness" + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return "Error: Cannot read brightness (may not be supported on desktop monitors)." + return f"Display brightness: {result.strip()}%" + + if action == "set": + if level is None: + return "Error: level is required for 'set' action" + if level < 0 or level > 100: + return "Error: level must be 0-100" + ps = f"Get-CimInstance -Namespace root/WMI -ClassName WmiMonitorBrightnessMethods | ForEach-Object {{ $_.WmiSetBrightness(1, {level}) }}" + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: Cannot set brightness (may not be supported on desktop monitors). 
{result}" + return f"Brightness set to {level}%" + + return f"Error: Unknown action: {action}" + + def app_list(self) -> str: + """List all running GUI applications with window titles.""" + ps = "Get-Process | Where-Object {$_.MainWindowTitle -ne ''} | Select-Object Id, ProcessName, MainWindowTitle | Format-Table -AutoSize | Out-String -Width 200" + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Running applications:\n{result.strip()}" + + def app_is_running(self, name: str) -> str: + """Check if an application is running by process name.""" + # Strip .exe extension if provided — Get-Process expects name without extension + clean_name = name.removesuffix(".exe").removesuffix(".EXE") + safe_name = ps_quote(clean_name) + ps = f"if (Get-Process -Name {safe_name} -ErrorAction SilentlyContinue) {{ 'Running' }} else {{ 'Not running' }}" + result, status = self.execute_command(ps, timeout=5) + if status != 0: + return f"Error: {result}" + return f'"{name}" is {result.strip().lower()}.' 
+ + def show_dialog( + self, + dialog_type: str, + message: str | None = None, + title: str | None = None, + default_answer: str | None = None, + choices: list[str] | None = None, + ) -> str: + """Show a Windows dialog via PowerShell.""" + safe_msg = ps_quote(message or "Please respond") + safe_title = ps_quote(title or "Dialog") + + if dialog_type == "alert": + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + f"[System.Windows.Forms.MessageBox]::Show({safe_msg}, {safe_title}, " + "'OKCancel', 'Information')" + ) + result, status = self.execute_command(ps, timeout=120) + if status != 0: + return f"Error: {result}" + return f"Dialog result: {result.strip()}" + + if dialog_type == "prompt": + safe_default = ps_quote(default_answer or "") + ps = ( + "Add-Type -AssemblyName Microsoft.VisualBasic\n" + f"[Microsoft.VisualBasic.Interaction]::InputBox({safe_msg}, {safe_title}, {safe_default})" + ) + result, status = self.execute_command(ps, timeout=120) + if status != 0: + return f"Error: {result}" + text = result.strip() + if not text: + return "User canceled the prompt (or submitted empty text)." 
+ return f"User entered: {text}" + + if dialog_type == "choose": + if not choices: + return "Error: choices list is required for 'choose' type" + items_str = ", ".join(ps_quote(c) for c in choices) + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + f"$form = New-Object System.Windows.Forms.Form -Property @{{Text={safe_title}; Width=350; Height=200; StartPosition='CenterScreen'; TopMost=$true}}\n" + f"$combo = New-Object System.Windows.Forms.ComboBox -Property @{{Left=10; Top=50; Width=310; DropDownStyle='DropDownList'}}\n" + f"@({items_str}) | ForEach-Object {{ $combo.Items.Add($_) | Out-Null }}\n" + "$combo.SelectedIndex = 0\n" + f"$label = New-Object System.Windows.Forms.Label -Property @{{Text={safe_msg}; Left=10; Top=10; Width=310; Height=30}}\n" + "$ok = New-Object System.Windows.Forms.Button -Property @{Text='OK'; Left=120; Top=120; Width=80; DialogResult='OK'}\n" + "$form.Controls.AddRange(@($label, $combo, $ok))\n" + "$form.AcceptButton = $ok\n" + "if ($form.ShowDialog() -eq 'OK') { $combo.SelectedItem } else { 'CANCELED' }" + ) + result, status = self.execute_command(ps, timeout=120) + if status != 0: + return f"Error: {result}" + text = result.strip() + if text == "CANCELED": + return "User canceled the selection." + return f"Selected: {text}" + + if dialog_type == "fileChoose": + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + "$d = New-Object System.Windows.Forms.OpenFileDialog -Property @{Title=" + + safe_title + + "}\n" + "if ($d.ShowDialog() -eq 'OK') { $d.FileName } else { 'CANCELED' }" + ) + result, status = self.execute_command(ps, timeout=120) + if status != 0: + return f"Error: {result}" + text = result.strip() + if text == "CANCELED": + return "User canceled file selection." 
+ return f"Selected file: {text}" + + return f"Error: Unknown dialog type: {dialog_type}" + + def system_info_extended(self) -> str: + """Get extended system information via PowerShell and WMI.""" + ps = ( + "$info = @()\n" + "$os = Get-CimInstance Win32_OperatingSystem\n" + '$info += "Windows: $($os.Caption) $($os.Version) (Build $($os.BuildNumber))"\n' + '$info += "Computer: $($env:COMPUTERNAME)"\n' + '$info += "User: $($env:USERNAME)"\n' + "$uptime = (Get-Date) - $os.LastBootUpTime\n" + '$info += "Uptime: $($uptime.Days)d $($uptime.Hours)h $($uptime.Minutes)m"\n' + "try {\n" + " $bat = Get-CimInstance Win32_Battery -ErrorAction Stop\n" + " $charging = if ($bat.BatteryStatus -eq 2) { '(charging)' } else { '(battery)' }\n" + ' $info += "Battery: $($bat.EstimatedChargeRemaining)% $charging"\n' + "} catch { $info += 'Battery: N/A (desktop)' }\n" + "try {\n" + " $theme = Get-ItemPropertyValue -Path 'HKCU:\\Software\\Microsoft\\Windows\\CurrentVersion\\Themes\\Personalize' -Name 'AppsUseLightTheme' -ErrorAction Stop\n" + " $info += \"Dark mode: $(if ($theme -eq 0) { 'on' } else { 'off' })\"\n" + "} catch { $info += 'Dark mode: unknown' }\n" + "try {\n" + " $wifi = (Get-NetConnectionProfile -ErrorAction Stop | Where-Object { $_.InterfaceAlias -like '*Wi-Fi*' }).Name\n" + " if ($wifi) { $info += \"WiFi: $wifi\" } else { $info += 'WiFi: not connected' }\n" + "} catch { $info += 'WiFi: not available' }\n" + '$info -join "`n"' + ) + result, status = self.execute_command(ps, timeout=15) + if status != 0: + return f"Error: {result}" + return f"System Information:\n{result.strip()}" + + def dark_mode_control(self, action: str) -> str: + """Control Windows dark/light mode via registry.""" + reg_path = r"HKCU:\Software\Microsoft\Windows\CurrentVersion\Themes\Personalize" + + if action == "get": + ps = f"Get-ItemPropertyValue -Path '{reg_path}' -Name 'AppsUseLightTheme'" + result, status = self.execute_command(ps, timeout=5) + if status != 0: + return f"Error: {result}" + 
is_dark = result.strip() == "0" + return f"Dark mode is {'enabled' if is_dark else 'disabled'}." + + if action in ("enable", "disable", "toggle"): + if action == "toggle": + ps_get = f"Get-ItemPropertyValue -Path '{reg_path}' -Name 'AppsUseLightTheme'" + result, status = self.execute_command(ps_get, timeout=5) + if status != 0: + return f"Error: {result}" + new_val = 1 if result.strip() == "0" else 0 + else: + new_val = 0 if action == "enable" else 1 + + ps = ( + f"Set-ItemProperty -Path '{reg_path}' -Name 'AppsUseLightTheme' -Value {new_val} -Type DWord\n" + f"Set-ItemProperty -Path '{reg_path}' -Name 'SystemUsesLightTheme' -Value {new_val} -Type DWord" + ) + result, status = self.execute_command(ps, timeout=5) + if status != 0: + return f"Error: {result}" + mode = "enabled" if new_val == 0 else "disabled" + return f"Dark mode {mode}." + + return f"Error: Unknown action: {action}" + + def say_text(self, text: str, voice: str | None = None, rate: int | None = None) -> str: + """Text-to-speech via PowerShell SAPI.""" + safe_text = ps_quote(text) + ps = "Add-Type -AssemblyName System.Speech\n$s = New-Object System.Speech.Synthesis.SpeechSynthesizer\n" + if voice: + safe_voice = ps_quote(voice) + ps += f"try {{ $s.SelectVoice({safe_voice}) }} catch {{ Write-Error ('Voice not found: ' + {safe_voice}) }}\n" + if rate is not None: + clamped = max(-10, min(10, rate)) + ps += f"$s.Rate = {clamped}\n" + ps += f"$s.SpeakAsync({safe_text}) | Out-Null\nwhile ($s.State -ne 'Ready') {{ Start-Sleep -Milliseconds 100 }}\nWrite-Output 'OK'" + result, status = self.execute_command(ps, timeout=60) + if status != 0: + return f"Error: {result}" + return f"Spoke {len(text)} characters{f' with voice {voice}' if voice else ''}{f' at rate {rate}' if rate else ''}." 
+ + def port_check(self, action: str, port: int | None = None, protocol: str = "tcp") -> str: + """Check port usage via PowerShell Get-NetTCPConnection.""" + if action == "check": + if port is None: + return "Error: port is required for 'check' action" + if protocol in ("tcp", "both"): + ps = f"Get-NetTCPConnection -LocalPort {port} -ErrorAction SilentlyContinue | Select-Object LocalPort, RemoteAddress, State, OwningProcess | Format-Table -AutoSize | Out-String" + result, status = self.execute_command(ps, timeout=10) + tcp_info = result.strip() if status == 0 and result.strip() else "" + else: + tcp_info = "" + + if protocol in ("udp", "both"): + ps = f"Get-NetUDPEndpoint -LocalPort {port} -ErrorAction SilentlyContinue | Select-Object LocalPort, OwningProcess | Format-Table -AutoSize | Out-String" + result, status = self.execute_command(ps, timeout=10) + udp_info = result.strip() if status == 0 and result.strip() else "" + else: + udp_info = "" + + if tcp_info or udp_info: + parts = [] + if tcp_info: + parts.append(f"TCP:\n{tcp_info}") + if udp_info: + parts.append(f"UDP:\n{udp_info}") + return f"Port {port} is IN USE:\n" + "\n".join(parts) + return f"Port {port} is free (not in use)." 
+ + if action == "list": + ps = "Get-NetTCPConnection -State Listen -ErrorAction SilentlyContinue | Select-Object LocalPort, OwningProcess | Sort-Object LocalPort | Format-Table -AutoSize | Out-String -Width 200" + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Listening ports:\n{result.strip()}" + + return f"Error: Unknown action: {action}" + + def file_watcher( + self, + path: str, + timeout_seconds: int = 30, + event: str = "any", + ) -> str: + """Watch a file for changes by polling stat.""" + resolved = pathlib.Path(path).resolve() + watch_target = resolved.parent if event == "create" and not resolved.exists() else resolved + + if not watch_target.exists(): + return f"Error: Path does not exist: {watch_target}" + + def get_state(p: pathlib.Path): + try: + stat = p.stat() + return {"exists": True, "mtime": stat.st_mtime, "size": stat.st_size} + except (FileNotFoundError, OSError): + return {"exists": False, "mtime": 0, "size": 0} + + last_state = get_state(resolved) + start = time() + saw_delete = False + + while (time() - start) < timeout_seconds: + sleep(0.25) + current = get_state(resolved) + + if not current["exists"] and last_state["exists"]: + saw_delete = True + + changed = False + change_type = "" + + if event in ("create", "any"): + if (not last_state["exists"] or saw_delete) and current["exists"]: + changed = True + change_type = "created" + saw_delete = False + + if event in ("delete", "any") and not changed: + if last_state["exists"] and not current["exists"]: + changed = True + change_type = "deleted" + + if event in ("modify", "any") and not changed: + if ( + current["exists"] + and last_state["exists"] + and ( + current["mtime"] != last_state["mtime"] + or current["size"] != last_state["size"] + ) + ): + changed = True + change_type = "modified" + + if changed: + elapsed = round(time() - start, 1) + return f"File {change_type}: {resolved} (detected in {elapsed}s). 
Size: {current['size']} bytes." + + last_state = current + + return f"Timeout after {timeout_seconds}s — no {event} changes detected on: {resolved}" + + def search_files( + self, + query: str, + search_type: str = "name", + directory: str | None = None, + max_results: int = 20, + ) -> str: + """Search for files using PowerShell Get-ChildItem or Windows Search.""" + if search_type == "name": + # Escape filesystem wildcard special chars before wrapping + sanitized = query.replace("[", "`[").replace("]", "`]") + safe_query = ps_quote(f"*{sanitized}*") + search_dir = ( + ps_quote(str(pathlib.Path(directory).resolve())) + if directory + else '"$env:USERPROFILE"' + ) + ps = f"Get-ChildItem -Path {search_dir} -Recurse -Filter {safe_query} -ErrorAction SilentlyContinue | Select-Object -First {max_results} -ExpandProperty FullName" + elif search_type == "content": + safe_query = ps_quote(query) + search_dir = ( + ps_quote(str(pathlib.Path(directory).resolve())) + if directory + else '"$env:USERPROFILE"' + ) + ps = f"Get-ChildItem -Path {search_dir} -Recurse -File -ErrorAction SilentlyContinue | Select-String -Pattern {safe_query} -SimpleMatch -List -ErrorAction SilentlyContinue | Select-Object -First {max_results} -ExpandProperty Path" + else: + return f"Error: Unknown search_type: {search_type}" + + result, status = self.execute_command(ps, timeout=30) + if status != 0: + return f"Error: {result}" + results = result.strip() + if not results: + return f'No results found for "{query}".' 
+ lines = results.split("\n") + return f"Found {len(lines)} result(s):\n{results}" + + def network_diagnostics( + self, + action: str, + host: str | None = None, + count: int = 3, + timeout: int = 5, + ) -> str: + """Network diagnostic utilities via PowerShell.""" + if action == "ping": + if not host: + return "Error: host is required for ping" + safe_host = ps_quote(host) + ps = f"Test-Connection -ComputerName {safe_host} -Count {count} -TimeoutSeconds {timeout} | Format-Table -AutoSize | Out-String -Width 200" + result, status = self.execute_command(ps, timeout=timeout + 10) + if status != 0: + return f"Ping {host} failed: {result}" + return f"Ping {host}:\n{result.strip()}" + + if action == "dns": + if not host: + return "Error: host is required for dns" + safe_host = ps_quote(host) + ps = f"Resolve-DnsName {safe_host} -ErrorAction Stop | Format-Table -AutoSize | Out-String -Width 200" + result, status = self.execute_command(ps, timeout=timeout + 5) + if status != 0: + return f"DNS lookup failed for {host}: {result}" + return f"DNS lookup {host}:\n{result.strip()}" + + if action == "http": + if not host: + return "Error: host is required for http" + url = host if host.startswith("http") else f"https://{host}" + safe_url = ps_quote(url) + ps = ( + f"$r = Invoke-WebRequest -Uri {safe_url} -UseBasicParsing -TimeoutSec {timeout} -Method GET\n" + '"HTTP $($r.StatusCode) | Content-Length: $($r.RawContentLength) bytes"' + ) + result, status = self.execute_command(ps, timeout=timeout + 10) + if status != 0: + return f"HTTP check {url} failed: {result}" + return f"HTTP check {url}:\n{result.strip()}" + + if action == "interfaces": + ps = "Get-NetIPAddress -AddressFamily IPv4 | Where-Object { $_.IPAddress -ne '127.0.0.1' } | Select-Object InterfaceAlias, IPAddress | Format-Table -AutoSize | Out-String" + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Network interfaces:\n{result.strip()}" + + return f"Error: 
Unknown action: {action}" + + def accessibility_inspector( + self, + app_name: str, + max_depth: int = 3, + ) -> str: + """Read UI element tree using UIAutomation library.""" + try: + # Find the app window + windows = uia.WindowControl(searchDepth=1, Name=app_name) + if not windows.Exists(maxSearchSeconds=3): + # Try partial match + all_windows = uia.GetRootControl().GetChildren() + target = None + for w in all_windows: + if app_name.lower() in (w.Name or "").lower(): + target = w + break + if not target: + return f'No window found matching "{app_name}".' + windows = target + + lines = [f"Window: {windows.Name} [{windows.ControlTypeName}]"] + + def walk(element, depth, max_d): + if depth >= max_d: + return + try: + children = element.GetChildren() + except Exception: + return + for child in children: + indent = " " * (depth + 1) + name = child.Name or "" + role = child.ControlTypeName or "" + val = "" + try: + val = ( + child.GetValuePattern().Value + if hasattr(child, "GetValuePattern") + else "" + ) + except Exception: + pass + enabled = child.IsEnabled + line = f"{indent}[{role}] {name}" + if val and val != name: + line += f" = {val}" + if not enabled: + line += " (disabled)" + lines.append(line) + walk(child, depth + 1, max_d) + + walk(windows, 0, max_depth) + return "\n".join(lines[:500]) # Cap at 500 lines + + except Exception as e: + return f"Error: Accessibility inspection failed: {str(e)}" + + # ============== UI ELEMENT OPERATIONS ============== + + def _find_app_window(self, app_name: str) -> "uia.Control | None": + """Find a window by exact or partial name match. 
Returns None if not found.""" + window = uia.WindowControl(searchDepth=1, Name=app_name) + if window.Exists(maxSearchSeconds=2): + return window + # Partial match fallback + all_windows = uia.GetRootControl().GetChildren() + for w in all_windows: + if app_name.lower() in (w.Name or "").lower(): + return w + return None + + def _navigate_to_element(self, root: "uia.Control", path: str) -> "uia.Control | None": + """Navigate to element by path like 'pane 2 > button 3'. + + Path segments: 'role index' where index is 1-based. + """ + import re as _re + + current = root + segments = [s.strip() for s in path.split(">")] + for seg in segments: + match = _re.match(r"^(\w+)\s*(\d+)?$", seg.strip()) + if not match: + return None + role_name = match.group(1).lower() + index = int(match.group(2) or "1") + children = current.GetChildren() + count = 0 + found = False + for child in children: + child_role = (child.ControlTypeName or "").lower() + if child_role == role_name: + count += 1 + if count == index: + current = child + found = True + break + if not found: + return None + return current + + def _search_element( + self, + root: "uia.Control", + search: str, + role: str | None = None, + max_depth: int = 5, + ) -> "uia.Control | None": + """Search for element by name (fuzzy) and optional role filter.""" + + def walk(el, depth): + if depth > max_depth: + return None + try: + children = el.GetChildren() + except Exception: + return None + for child in children: + name = (child.Name or "").strip() + child_role = (child.ControlTypeName or "").lower() + if role and child_role != role.lower(): + # Role mismatch: still recurse deeper but skip name check + result = walk(child, depth + 1) + if result: + return result + continue + if search.lower() in name.lower(): + return child + result = walk(child, depth + 1) + if result: + return result + return None + + return walk(root, 0) + + def ui_element_get(self, app_name: str, depth: int = 1, role: str | None = None) -> str: + """Get UI 
element tree for an application with depth and role filtering.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + lines = [f"Window: {window.Name} [{window.ControlTypeName}]"] + + def walk(element, d, max_d, idx_path): + if d >= max_d: + return + try: + children = element.GetChildren() + except Exception: + return + role_counts: dict[str, int] = {} + for child in children: + child_role = (child.ControlTypeName or "").lower() + role_counts[child_role] = role_counts.get(child_role, 0) + 1 + child_index = role_counts[child_role] + + if role and child_role != role.lower(): + continue + + indent = " " * (d + 1) + name = (child.Name or "").replace("\n", " ").replace("\r", "") + path_str = ( + f"{idx_path} > {child_role} {child_index}" + if idx_path + else f"{child_role} {child_index}" + ) + val = "" + try: + val = child.GetValuePattern().Value + except Exception: + pass + enabled = child.IsEnabled + rect = child.BoundingRectangle + pos = "" + if rect.width() > 0: + pos = f" @({rect.left},{rect.top},{rect.width()},{rect.height()})" + line = f"{indent}[{child_role}] {name}" + if val and val != name: + line += f" = {val}" + if not enabled: + line += " (disabled)" + line += pos + line += f" path: {path_str}" + lines.append(line) + walk(child, d + 1, max_d, path_str) + + walk(window, 0, depth, "") + return "\n".join(lines[:500]) + + def ui_element_find(self, app_name: str, search: str, role: str | None = None) -> str: + """Find a specific UI element by name search.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + element = self._search_element(window, search, role) + if not element: + return f'No element found matching "{search}"{f" with role {role}" if role else ""}.' 
+ + name = (element.Name or "").replace("\n", " ") + el_role = element.ControlTypeName or "" + enabled = element.IsEnabled + rect = element.BoundingRectangle + val = "" + try: + val = element.GetValuePattern().Value + except Exception: + pass + + result = f"Found: [{el_role}] {name}" + if val: + result += f" = {val}" + if not enabled: + result += " (disabled)" + if rect.width() > 0: + result += f" @({rect.left},{rect.top},{rect.width()},{rect.height()})" + return result + + def ui_element_click( + self, app_name: str, path: str | None = None, search: str | None = None + ) -> str: + """Click a UI element by path or search.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + element = None + if path: + element = self._navigate_to_element(window, path) + elif search: + element = self._search_element(window, search) + + if not element: + target = path or search + return f'Element not found: "{target}".' + + name = (element.Name or "").replace("\n", " ") + el_role = element.ControlTypeName or "" + + # Try InvokePattern first + try: + element.GetInvokePattern().Invoke() + return f"Clicked [{el_role}] {name} via InvokePattern." + except Exception: + pass + + # Try ExpandCollapsePattern + try: + pattern = element.GetExpandCollapsePattern() + state = pattern.ExpandCollapseState + if state == 0: # Collapsed + pattern.Expand() + else: + pattern.Collapse() + return f"Toggled [{el_role}] {name} via ExpandCollapsePattern." + except Exception: + pass + + # Fallback: click at center of bounds + try: + rect = element.BoundingRectangle + if rect.width() > 0: + cx = rect.left + rect.width() // 2 + cy = rect.top + rect.height() // 2 + self.click((cx, cy), button="left", clicks=1) + return f"Clicked [{el_role}] {name} at ({cx}, {cy})." + except Exception: + pass + + return f"Failed to click [{el_role}] {name}: no supported interaction pattern." 
+ + def ui_element_set_value( + self, + app_name: str, + value: str, + path: str | None = None, + search: str | None = None, + ) -> str: + """Set value on a UI element (text field, checkbox, etc.).""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + element = None + if path: + element = self._navigate_to_element(window, path) + elif search: + element = self._search_element(window, search) + + if not element: + target = path or search + return f'Element not found: "{target}".' + + name = (element.Name or "").replace("\n", " ") + el_role = element.ControlTypeName or "" + + # Try ValuePattern (text fields, combo boxes) + try: + element.GetValuePattern().SetValue(value) + return f"Set [{el_role}] {name} = {value} via ValuePattern." + except Exception: + pass + + # Try TogglePattern (checkboxes) + try: + toggle = element.GetTogglePattern() + target_on = value.lower() in ("true", "on", "1", "yes", "checked") + current = toggle.ToggleState + if (target_on and current != 1) or (not target_on and current == 1): + toggle.Toggle() + return f"Toggled [{el_role}] {name} to {value} via TogglePattern." + except Exception: + pass + + # Try SelectionItemPattern (radio buttons, list items) + # Only select if element name/value matches the requested value + try: + sip = element.GetSelectionItemPattern() + el_name_lower = name.lower().strip() + if el_name_lower == value.lower().strip(): + sip.Select() + return f"Selected [{el_role}] {name} via SelectionItemPattern." + else: + # Element found but name doesn't match requested value — skip + pass + except Exception: + pass + + # Try RangeValuePattern (sliders, spinners) + try: + rv = element.GetRangeValuePattern() + rv.SetValue(float(value)) + return f"Set [{el_role}] {name} = {value} via RangeValuePattern." + except Exception: + pass + + return f"Failed to set value on [{el_role}] {name}: no supported value pattern." 
+ + def ui_element_type_into( + self, + app_name: str, + text: str, + path: str | None = None, + search: str | None = None, + clear: bool = False, + ) -> str: + """Type text into a UI element by focusing it first.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + element = None + if path: + element = self._navigate_to_element(window, path) + elif search: + element = self._search_element(window, search) + + if not element: + target = path or search + return f'Element not found: "{target}".' + + name = (element.Name or "").replace("\n", " ") + el_role = element.ControlTypeName or "" + + try: + element.SetFocus() + sleep(0.1) + except Exception as e: + return f"Error: Could not focus [{el_role}] {name}: {e}. Aborting to prevent typing into wrong window." + + if clear: + # Select all then delete + uia.SendKeys("{Ctrl}a", waitTime=0.05) + uia.SendKeys("{Delete}", waitTime=0.05) + + escaped = _escape_text_for_sendkeys(text) + uia.SendKeys(escaped, waitTime=0.05) + return f"Typed {len(text)} chars into [{el_role}] {name}." 
+ + def ui_element_list_windows(self) -> str: + """List all visible windows with details.""" + ps = ( + "Get-Process | Where-Object {$_.MainWindowTitle -ne ''} | " + "ForEach-Object { " + " $h = $_.MainWindowHandle; " + " $r = New-Object 'System.Drawing.Rectangle'; " + " try { " + " Add-Type -AssemblyName System.Windows.Forms -ErrorAction SilentlyContinue; " + " } catch {} " + ' "$($_.Id)|$($_.ProcessName)|$($_.MainWindowTitle)|$h" ' + "} | Out-String -Width 500" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + + lines = ["PID | Process | Title | Handle"] + lines.append("-" * 60) + for line in result.strip().split("\n"): + line = line.strip() + if line: + lines.append(line.replace("|", " | ")) + return "\n".join(lines) + + def ui_element_overview(self, app_name: str) -> str: + """Get element role counts for an application.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' 
+ + role_counts: dict[str, int] = {} + total = 0 + + def count_roles(element, depth, max_depth=4): + nonlocal total + if depth >= max_depth: + return + try: + children = element.GetChildren() + except Exception: + return + for child in children: + role = child.ControlTypeName or "Unknown" + role_counts[role] = role_counts.get(role, 0) + 1 + total += 1 + count_roles(child, depth + 1, max_depth) + + count_roles(window, 0) + + lines = [f"App Overview: {window.Name}", f"Total elements: {total}", ""] + for role, count in sorted(role_counts.items(), key=lambda x: -x[1]): + lines.append(f" {role}: {count}") + return "\n".join(lines) + + # ============== WINDOW SCREENSHOT ============== + + def capture_window_screenshot( + self, app_name: str | None = None, handle: int | None = None + ) -> "Image.Image | None": + """Capture screenshot of a specific window.""" + if handle: + try: + rect_tuple = win32gui.GetWindowRect(handle) + # rect_tuple is (left, top, right, bottom) + bbox = (rect_tuple[0], rect_tuple[1], rect_tuple[2], rect_tuple[3]) + img = ImageGrab.grab(bbox=bbox) + return img + except Exception as e: + logger.error(f"Screenshot by handle failed: {e}") + return None + + if app_name: + window = self._find_app_window(app_name) + if not window: + return None + try: + hwnd = window.NativeWindowHandle + rect_tuple = win32gui.GetWindowRect(hwnd) + bbox = (rect_tuple[0], rect_tuple[1], rect_tuple[2], rect_tuple[3]) + img = ImageGrab.grab(bbox=bbox) + return img + except Exception as e: + logger.error(f"Screenshot by app name failed: {e}") + return None + + return None + + # ============== MULTI MONITOR ============== + + def get_multi_monitor_info(self) -> str: + """Get information about all connected monitors.""" + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + "[System.Windows.Forms.Screen]::AllScreens | ForEach-Object {\n" + " $b = $_.Bounds\n" + " $w = $_.WorkingArea\n" + ' "Name: $($_.DeviceName) | Primary: $($_.Primary) | "\n' + ' + "Bounds: 
$($b.X),$($b.Y) $($b.Width)x$($b.Height) | "\n' + ' + "WorkArea: $($w.X),$($w.Y) $($w.Width)x$($w.Height) | "\n' + ' + "BPP: $($_.BitsPerPixel)"\n' + "}" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return f"Monitors:\n{result.strip()}" + + # ============== SCREEN RECORDING ============== + + def screen_record( + self, + action: str, + output_path: str | None = None, + duration: int | None = None, + fps: int = 15, + ) -> str: + """Control screen recording using ffmpeg.""" + if not shutil.which("ffmpeg"): + return "Error: ffmpeg not found in PATH. Install ffmpeg first." + + state_file = os.path.join(tempfile.gettempdir(), "wmcp_screen_record.pid") + + if action == "start": + # Validate output_path to prevent path traversal / ffmpeg option injection + if output_path: + resolved_out = pathlib.Path(output_path).resolve() + if resolved_out.suffix.lower() not in {".mp4", ".mkv", ".avi"}: + return "Error: output_path must have .mp4, .mkv, or .avi extension" + if str(resolved_out).startswith("-"): + return "Error: output_path must not start with '-'" + out = str(resolved_out) + else: + out = os.path.join( + os.path.expanduser("~"), + "Desktop", + f"recording_{int(time())}.mp4", + ) + + # Atomic check-and-create to prevent TOCTOU race + try: + fd = os.open(state_file, os.O_CREAT | os.O_EXCL | os.O_WRONLY) + except FileExistsError: + return "Error: Recording already in progress. Stop it first." 
+ + cmd = [ + "ffmpeg", + "-y", + "-f", + "gdigrab", + "-framerate", + str(fps), + "-i", + "desktop", + "-c:v", + "libx264", + "-preset", + "ultrafast", + ] + if duration: + cmd += ["-t", str(duration)] + cmd.append(out) + + # Use CREATE_NEW_PROCESS_GROUP so we can send CTRL_BREAK_EVENT + # to gracefully stop ffmpeg (allows it to finalize the video file) + create_flags = getattr(subprocess, "CREATE_NO_WINDOW", 0) | getattr( + subprocess, "CREATE_NEW_PROCESS_GROUP", 0 + ) + try: + proc = subprocess.Popen( + cmd, + stdout=subprocess.DEVNULL, + stderr=subprocess.DEVNULL, + creationflags=create_flags, + ) + except Exception as e: + os.close(fd) + try: + os.remove(state_file) + except OSError: + pass + return f"Error starting ffmpeg: {e}" + os.write(fd, f"{proc.pid}\n{out}".encode()) + os.close(fd) + return f"Recording started (PID {proc.pid}). Output: {out}" + + if action == "stop": + if not os.path.exists(state_file): + return "No recording in progress." + with open(state_file, "r") as f: + lines = f.read().strip().split("\n") + pid = int(lines[0]) + out = lines[1] if len(lines) > 1 else "unknown" + try: + # Verify the PID is actually ffmpeg before sending signal + p = Process(pid) + if "ffmpeg" not in p.name().lower(): + try: + os.remove(state_file) + except OSError: + pass + return f"PID {pid} is not ffmpeg (is {p.name()}). State file cleaned up." + except Exception: + try: + os.remove(state_file) + except OSError: + pass + return "Recording process not found. State file cleaned up." + try: + import signal + + # Send CTRL_BREAK_EVENT for graceful ffmpeg shutdown + # (allows finalization of the MP4 container) + os.kill(pid, getattr(signal, "CTRL_BREAK_EVENT", signal.SIGINT)) + # Wait up to 5s for ffmpeg to finalize the output file + try: + p.wait(timeout=5) + except Exception: + pass # timeout or already exited + except (OSError, ProcessLookupError): + pass + try: + os.remove(state_file) + except OSError: + pass + return f"Recording stopped. 
Output: {out}" + + if action == "status": + if not os.path.exists(state_file): + return "No recording in progress." + with open(state_file, "r") as f: + lines = f.read().strip().split("\n") + pid = int(lines[0]) + out = lines[1] if len(lines) > 1 else "unknown" + try: + p = Process(pid) + if "ffmpeg" not in p.name().lower(): + try: + os.remove(state_file) + except OSError: + pass + return "Recording process not found (PID recycled). State file cleaned up." + return f"Recording in progress (PID {pid}). Output: {out}" + except Exception: + try: + os.remove(state_file) + except OSError: + pass + return "Recording process not found (may have finished)." + + return f"Error: Unknown action: {action}" + + # ============== MENU CLICK ============== + + def menu_click(self, app_name: str, menu_path: str) -> str: + """Navigate and click menu items by path (e.g., 'File > Save As').""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + + try: + window.SetFocus() + sleep(0.2) + except Exception: + pass + + segments = [s.strip() for s in menu_path.split(">")] + current = window + + for i, menu_name in enumerate(segments): + # Search for menu item + found = None + try: + children = current.GetChildren() + for child in children: + child_role = (child.ControlTypeName or "").lower() + child_name = (child.Name or "").strip() + if child_role in ("menubar", "menu", "menuitem"): + if menu_name.lower() in child_name.lower(): + found = child + break + # Check children of menu bar + if child_role == "menubar": + bar_children = child.GetChildren() + for bar_child in bar_children: + bar_name = (bar_child.Name or "").strip() + if menu_name.lower() in bar_name.lower(): + found = bar_child + break + if found: + break + except Exception: + pass + + if not found: + return f'Menu item "{menu_name}" not found at level {i + 1}.' 
+ + # Click/expand the menu item + try: + found.GetInvokePattern().Invoke() + sleep(0.3) + except Exception: + try: + found.GetExpandCollapsePattern().Expand() + sleep(0.3) + except Exception: + try: + rect = found.BoundingRectangle + if rect.width() > 0: + cx = rect.left + rect.width() // 2 + cy = rect.top + rect.height() // 2 + self.click((cx, cy), button="left", clicks=1) + sleep(0.3) + except Exception: + return f'Failed to activate menu item "{menu_name}".' + + current = found + + return f"Clicked menu path: {menu_path}" + + # ============== QUICK LOOK ============== + + # File extensions blocked from os.startfile (executable/script types) + _BLOCKED_EXTENSIONS = { + ".exe", + ".bat", + ".cmd", + ".com", + ".scr", + ".pif", + ".msi", + ".msp", + ".ps1", + ".psm1", + ".psd1", + ".vbs", + ".vbe", + ".js", + ".jse", + ".wsf", + ".wsh", + ".ws", + ".hta", + ".cpl", + ".inf", + ".reg", + ".rgs", + ".sct", + ".shb", + ".shs", + ".lnk", + ".url", + ".application", + ".gadget", + ".msc", + ".jar", + ".py", + ".pyw", + ".rb", + ".sh", + ".bash", + } + + def quick_look(self, path: str) -> str: + """Open a file with its default application (blocks executables/scripts).""" + resolved = pathlib.Path(path).resolve() + if not resolved.exists(): + return f"Error: File not found: {resolved}" + ext = resolved.suffix.lower() + if ext in self._BLOCKED_EXTENSIONS: + return ( + f"Error: Blocked file type '{ext}' — cannot open executables or " + f"scripts via QuickLook for security. Use PowerShell tool instead." 
+ ) + try: + os.startfile(str(resolved)) + return f"Opened: {resolved}" + except Exception as e: + return f"Error opening file: {e}" + + # ============== WINDOW TILING ============== + + def window_tiling(self, mode: str, app_name: str | None = None) -> str: + """Arrange windows in various tiling layouts.""" + SWP_SHOWWINDOW = 0x0040 + + if mode in ("maximize", "restore", "minimize"): + if not app_name: + return "Error: app_name is required for maximize/restore/minimize" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' + hwnd = window.NativeWindowHandle + if mode == "maximize": + win32gui.ShowWindow(hwnd, win32con.SW_MAXIMIZE) + elif mode == "minimize": + win32gui.ShowWindow(hwnd, win32con.SW_MINIMIZE) + elif mode == "restore": + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) + return f"Window {app_name} {mode}d." + + if mode in ("left", "right", "top", "bottom"): + if not app_name: + return "Error: app_name is required for tiling" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' 
+ hwnd = window.NativeWindowHandle + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) + + # Get work area + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + "$w = [System.Windows.Forms.Screen]::PrimaryScreen.WorkingArea\n" + '"$($w.X),$($w.Y),$($w.Width),$($w.Height)"' + ) + result, status = self.execute_command(ps, timeout=5) + if status != 0: + return f"Error getting screen info: {result}" + parts = result.strip().split(",") + sx, sy, sw, sh = int(parts[0]), int(parts[1]), int(parts[2]), int(parts[3]) + + if mode == "left": + x, y, w, h = sx, sy, sw // 2, sh + elif mode == "right": + x, y, w, h = sx + sw // 2, sy, sw // 2, sh + elif mode == "top": + x, y, w, h = sx, sy, sw, sh // 2 + elif mode == "bottom": + x, y, w, h = sx, sy + sh // 2, sw, sh // 2 + + ctypes.windll.user32.SetWindowPos(hwnd, 0, x, y, w, h, SWP_SHOWWINDOW) + return f"Tiled {app_name} to {mode} half." + + if mode == "cascade": + ps = ( + "Get-Process | Where-Object {$_.MainWindowTitle -ne ''} | " + "ForEach-Object { $_.MainWindowHandle } | Out-String" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + handles = [] + for h in result.strip().split("\n"): + h = h.strip() + if h: + try: + handles.append(int(h)) + except ValueError: + pass # skip non-numeric lines (headers, errors) + offset = 30 + for i, hwnd in enumerate(handles): + try: + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) + ctypes.windll.user32.SetWindowPos( + hwnd, 0, offset * i, offset * i, 800, 600, SWP_SHOWWINDOW + ) + except Exception: + pass + return f"Cascaded {len(handles)} windows." 
+ + return f"Error: Unknown tiling mode: {mode}" + + # ============== CLIPBOARD INFO ============== + + def get_clipboard_info(self) -> str: + """Get clipboard format details.""" + ps = ( + "Add-Type -AssemblyName System.Windows.Forms\n" + "$d = $null\n" + "for ($i = 0; $i -lt 3; $i++) {\n" + " try { $d = [System.Windows.Forms.Clipboard]::GetDataObject(); break }\n" + " catch { Start-Sleep -Milliseconds 100 }\n" + "}\n" + "if ($d -eq $null) { 'Clipboard is empty or locked' } else {\n" + " $formats = $d.GetFormats()\n" + ' $info = @("Clipboard formats (" + $formats.Count + "):")\n' + " foreach ($f in $formats) {\n" + " $hasData = $d.GetDataPresent($f)\n" + ' $info += " $f (present: $hasData)"\n' + " }\n" + " if ($d.ContainsText()) {\n" + " $text = $d.GetText()\n" + " $preview = if ($text.Length -gt 100) { $text.Substring(0, 100) + '...' } else { $text }\n" + ' $info += ""\n' + ' $info += "Text preview: $preview"\n' + ' $info += "Text length: $($text.Length) chars"\n' + " }\n" + " if ($d.ContainsImage()) {\n" + " $img = $d.GetImage()\n" + ' $info += "Image: $($img.Width)x$($img.Height)"\n' + " }\n" + ' $info -join "`n"\n' + "}" + ) + result, status = self.execute_command(ps, timeout=10) + if status != 0: + return f"Error: {result}" + return result.strip() + + # ============== APP CONTROL ENHANCEMENTS ============== + + def window_control(self, app_name: str, action: str) -> str: + """Control window state: minimize, maximize, close, fullscreen, restore.""" + window = self._find_app_window(app_name) + if not window: + return f'No window found matching "{app_name}".' 
+ + hwnd = window.NativeWindowHandle + + if action == "minimize": + win32gui.ShowWindow(hwnd, win32con.SW_MINIMIZE) + return f"Minimized: {app_name}" + elif action == "maximize": + win32gui.ShowWindow(hwnd, win32con.SW_MAXIMIZE) + return f"Maximized: {app_name}" + elif action == "restore": + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) + # Clear TOPMOST flag in case window was set fullscreen + HWND_NOTOPMOST = -2 + SWP_NOMOVE = 0x0002 + SWP_NOSIZE = 0x0001 + ctypes.windll.user32.SetWindowPos( + hwnd, HWND_NOTOPMOST, 0, 0, 0, 0, SWP_NOMOVE | SWP_NOSIZE + ) + return f"Restored: {app_name}" + elif action == "close": + win32gui.PostMessage(hwnd, win32con.WM_CLOSE, 0, 0) + return f"Sent close to: {app_name}" + elif action == "fullscreen": + screen_size = self.get_screen_size() + win32gui.ShowWindow(hwnd, win32con.SW_RESTORE) + HWND_TOPMOST = -1 + ctypes.windll.user32.SetWindowPos( + hwnd, HWND_TOPMOST, 0, 0, screen_size.width, screen_size.height, 0x0040 + ) + return f"Fullscreen: {app_name}" + else: + return f"Error: Unknown action: {action}" diff --git a/src/windows_mcp/desktop/utils.py b/src/windows_mcp/desktop/utils.py index 7431b0f..993fc2c 100644 --- a/src/windows_mcp/desktop/utils.py +++ b/src/windows_mcp/desktop/utils.py @@ -12,3 +12,38 @@ def ps_quote_for_xml(value: str) -> str: """XML-escape then ps_quote. 
Use for values in XML passed to PowerShell.""" escaped = xml_escape(value, {'"': '"', "'": '''}) return ps_quote(escaped) + + +_NAMED_COLORS = { + "black": (0, 0, 0), + "white": (255, 255, 255), + "red": (255, 0, 0), + "green": (0, 128, 0), + "blue": (0, 0, 255), + "yellow": (255, 255, 0), + "cyan": (0, 255, 255), + "magenta": (255, 0, 255), + "orange": (255, 165, 0), + "purple": (128, 0, 128), + "pink": (255, 192, 203), + "brown": (139, 69, 19), + "gray": (128, 128, 128), + "silver": (192, 192, 192), + "navy": (0, 0, 128), + "teal": (0, 128, 128), + "maroon": (128, 0, 0), + "olive": (128, 128, 0), + "lime": (0, 255, 0), +} + + +def approximate_color_name(r: int, g: int, b: int) -> str: + """Find the closest named color using Euclidean distance.""" + best_name = "unknown" + best_dist = float("inf") + for name, (nr, ng, nb) in _NAMED_COLORS.items(): + dist = (r - nr) ** 2 + (g - ng) ** 2 + (b - nb) ** 2 + if dist < best_dist: + best_dist = dist + best_name = name + return best_name diff --git a/tests/test_coordinate_system.py b/tests/test_coordinate_system.py new file mode 100644 index 0000000..6d346cb --- /dev/null +++ b/tests/test_coordinate_system.py @@ -0,0 +1,180 @@ +"""Tests for DPI coordinate_system conversion helpers in __main__.py.""" + +import pytest +from unittest.mock import patch + + +class TestToPhysical: + """Test _to_physical helper for [x, y] coordinate conversion.""" + + def test_physical_passthrough(self): + """Physical coordinates should not be modified.""" + from windows_mcp.__main__ import _to_physical + + loc = [100, 200] + result = _to_physical(loc, "physical") + assert result == [100, 200] + + def test_physical_returns_same_list(self): + """Physical mode should return the same list object.""" + from windows_mcp.__main__ import _to_physical + + loc = [50, 75] + result = _to_physical(loc, "physical") + assert result is loc + + @patch("windows_mcp.__main__.desktop") + def test_logical_scales_by_dpi(self, mock_desktop): + """Logical coordinates 
should be multiplied by DPI scale factor.""" + from windows_mcp.__main__ import _to_physical + + mock_desktop.get_dpi_scaling.return_value = 1.5 + result = _to_physical([100, 200], "logical") + assert result == [150, 300] + + @patch("windows_mcp.__main__.desktop") + def test_logical_150_percent(self, mock_desktop): + """Test 150% DPI scaling (common on laptops).""" + from windows_mcp.__main__ import _to_physical + + mock_desktop.get_dpi_scaling.return_value = 1.5 + result = _to_physical([960, 540], "logical") + assert result == [1440, 810] + + @patch("windows_mcp.__main__.desktop") + def test_logical_200_percent(self, mock_desktop): + """Test 200% DPI scaling (4K displays).""" + from windows_mcp.__main__ import _to_physical + + mock_desktop.get_dpi_scaling.return_value = 2.0 + result = _to_physical([500, 300], "logical") + assert result == [1000, 600] + + @patch("windows_mcp.__main__.desktop") + def test_logical_100_percent_no_change(self, mock_desktop): + """100% DPI (scale=1.0) should not change values.""" + from windows_mcp.__main__ import _to_physical + + mock_desktop.get_dpi_scaling.return_value = 1.0 + result = _to_physical([100, 200], "logical") + assert result == [100, 200] + + @patch("windows_mcp.__main__.desktop") + def test_logical_rounds_to_int(self, mock_desktop): + """Scaled values should be rounded to nearest int.""" + from windows_mcp.__main__ import _to_physical + + mock_desktop.get_dpi_scaling.return_value = 1.25 + result = _to_physical([100, 100], "logical") + assert result == [125, 125] + assert all(isinstance(v, int) for v in result) + + @patch("windows_mcp.__main__.desktop") + def test_logical_rounds_up_at_midpoint(self, mock_desktop): + """round() should round 0.5 up for correct pixel targeting.""" + from windows_mcp.__main__ import _to_physical + + # 99 * 1.25 = 123.75 -> should round to 124, not truncate to 123 + mock_desktop.get_dpi_scaling.return_value = 1.25 + result = _to_physical([99, 99], "logical") + assert result == [124, 124] + + 
def test_logical_raises_when_desktop_none(self): + """Should raise RuntimeError when desktop is not initialized.""" + from windows_mcp.__main__ import _to_physical + + with patch("windows_mcp.__main__.desktop", None): + try: + _to_physical([100, 200], "logical") + assert False, "Should have raised RuntimeError" + except RuntimeError as e: + assert "not initialized" in str(e) + + +class TestRegionToPhysical: + """Test _region_to_physical helper for [x, y, w, h] conversion.""" + + def test_physical_passthrough(self): + from windows_mcp.__main__ import _region_to_physical + + region = [100, 200, 300, 400] + result = _region_to_physical(region, "physical") + assert result == [100, 200, 300, 400] + + def test_physical_returns_same_list(self): + from windows_mcp.__main__ import _region_to_physical + + region = [10, 20, 30, 40] + result = _region_to_physical(region, "physical") + assert result is region + + @patch("windows_mcp.__main__.desktop") + def test_logical_scales_all_values(self, mock_desktop): + """All 4 values (x, y, w, h) should be scaled.""" + from windows_mcp.__main__ import _region_to_physical + + mock_desktop.get_dpi_scaling.return_value = 2.0 + result = _region_to_physical([100, 200, 300, 400], "logical") + assert result == [200, 400, 600, 800] + + +class TestPathToPhysical: + """Test _path_to_physical helper for [[x,y], ...] 
conversion.""" + + def test_physical_passthrough(self): + from windows_mcp.__main__ import _path_to_physical + + path = [[0, 0], [100, 100], [200, 200]] + result = _path_to_physical(path, "physical") + assert result == [[0, 0], [100, 100], [200, 200]] + + def test_physical_returns_same_list(self): + from windows_mcp.__main__ import _path_to_physical + + path = [[10, 20], [30, 40]] + result = _path_to_physical(path, "physical") + assert result is path + + @patch("windows_mcp.__main__.desktop") + def test_logical_scales_all_waypoints(self, mock_desktop): + from windows_mcp.__main__ import _path_to_physical + + mock_desktop.get_dpi_scaling.return_value = 1.5 + result = _path_to_physical([[100, 200], [300, 400]], "logical") + assert result == [[150, 300], [450, 600]] + + @patch("windows_mcp.__main__.desktop") + def test_logical_empty_path(self, mock_desktop): + from windows_mcp.__main__ import _path_to_physical + + mock_desktop.get_dpi_scaling.return_value = 2.0 + result = _path_to_physical([], "logical") + assert result == [] + + +class TestInputValidation: + """Test input shape validation in DPI helpers.""" + + def test_to_physical_rejects_single_element(self): + from windows_mcp.__main__ import _to_physical + + with pytest.raises(ValueError, match="loc must be"): + _to_physical([100], "physical") + + def test_to_physical_rejects_three_elements(self): + from windows_mcp.__main__ import _to_physical + + with pytest.raises(ValueError, match="loc must be"): + _to_physical([1, 2, 3], "physical") + + def test_region_to_physical_rejects_wrong_length(self): + from windows_mcp.__main__ import _region_to_physical + + with pytest.raises(ValueError, match="region must be"): + _region_to_physical([1, 2], "physical") + + def test_path_to_physical_rejects_malformed_waypoint(self): + from windows_mcp.__main__ import _path_to_physical + + with pytest.raises(ValueError, match="waypoint 1 must be"): + _path_to_physical([[0, 0], [100]], "physical") diff --git 
a/tests/test_cursor_position.py b/tests/test_cursor_position.py new file mode 100644 index 0000000..07988f0 --- /dev/null +++ b/tests/test_cursor_position.py @@ -0,0 +1,43 @@ +from unittest.mock import patch + +import pytest + +from windows_mcp.desktop.service import Desktop + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestCursorPosition: + @patch("windows_mcp.desktop.service.uia") + def test_returns_coordinates(self, mock_uia, desktop): + mock_uia.GetCursorPos.return_value = (150, 300) + result = desktop.get_cursor_position() + assert "150" in result + assert "300" in result + assert "Cursor position" in result + + @patch("windows_mcp.desktop.service.uia") + def test_origin_coordinates(self, mock_uia, desktop): + mock_uia.GetCursorPos.return_value = (0, 0) + result = desktop.get_cursor_position() + assert "(0, 0)" in result + + @patch("windows_mcp.desktop.service.uia") + def test_large_coordinates(self, mock_uia, desktop): + mock_uia.GetCursorPos.return_value = (3840, 2160) + result = desktop.get_cursor_position() + assert "3840" in result + assert "2160" in result + + @patch("windows_mcp.desktop.service.uia") + def test_negative_coordinates(self, mock_uia, desktop): + """Multi-monitor setups can have negative coordinates.""" + mock_uia.GetCursorPos.return_value = (-500, 200) + result = desktop.get_cursor_position() + assert "-500" in result + assert "200" in result diff --git a/tests/test_find_image.py b/tests/test_find_image.py new file mode 100644 index 0000000..02d2df2 --- /dev/null +++ b/tests/test_find_image.py @@ -0,0 +1,154 @@ +from unittest.mock import MagicMock, patch +import sys + +import pytest + +from windows_mcp.desktop.service import Desktop + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestFindImage: + def test_missing_deps(self, desktop): + """Should return install 
instructions when opencv is not installed.""" + with patch.dict(sys.modules, {"cv2": None, "numpy": None}): + original_import = ( + __builtins__.__import__ if hasattr(__builtins__, "__import__") else __import__ + ) + + def mock_import(name, *args, **kwargs): + if name in ("cv2", "numpy"): + raise ImportError(f"No module named '{name}'") + return original_import(name, *args, **kwargs) + + with patch("builtins.__import__", side_effect=mock_import): + result = desktop.find_image("template.png") + assert "opencv" in result.lower() or "Error" in result + + @patch("pathlib.Path.resolve") + def test_file_not_found(self, mock_resolve, desktop): + """Should error when template file doesn't exist.""" + mock_path = MagicMock() + mock_path.is_file.return_value = False + mock_resolve.return_value = mock_path + + mock_cv2 = MagicMock() + mock_np = MagicMock() + with patch.dict(sys.modules, {"cv2": mock_cv2, "numpy": mock_np}): + result = desktop.find_image("/nonexistent/template.png") + assert "Error" in result + assert "not found" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + @patch("pathlib.Path.resolve") + def test_match_found(self, mock_resolve, mock_grab, desktop): + """Should return coordinates when match exceeds threshold.""" + mock_path = MagicMock() + mock_path.is_file.return_value = True + mock_path.suffix = ".png" + mock_path.__str__ = lambda self: "/fake/template.png" + mock_resolve.return_value = mock_path + + mock_cv2 = MagicMock() + mock_np = MagicMock() + + # Template is 20x10 + mock_template = MagicMock() + mock_template.shape = (10, 20, 3) + mock_cv2.imread.return_value = mock_template + + # Screen is 1920x1080 + mock_screen_bgr = MagicMock() + mock_screen_bgr.shape = (1080, 1920, 3) + mock_cv2.cvtColor.return_value = mock_screen_bgr + + # Match at (100, 200) with confidence 0.95 + mock_cv2.matchTemplate.return_value = MagicMock() + mock_cv2.minMaxLoc.return_value = (0, 0.95, (0, 0), (100, 200)) + mock_cv2.TM_CCOEFF_NORMED = 5 + + 
mock_screen_img = MagicMock() + mock_grab.grab.return_value = mock_screen_img + mock_np.array.return_value = MagicMock() + mock_cv2.COLOR_RGB2BGR = 4 + + with patch.dict(sys.modules, {"cv2": mock_cv2, "numpy": mock_np}): + result = desktop.find_image("template.png", threshold=0.8) + assert "Match found" in result + assert "0.95" in result + # Center should be x=100+10, y=200+5 + assert "110" in result + assert "205" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + @patch("pathlib.Path.resolve") + def test_no_match(self, mock_resolve, mock_grab, desktop): + """Should report no match when confidence is below threshold.""" + mock_path = MagicMock() + mock_path.is_file.return_value = True + mock_path.suffix = ".png" + mock_path.__str__ = lambda self: "/fake/template.png" + mock_resolve.return_value = mock_path + + mock_cv2 = MagicMock() + mock_np = MagicMock() + + mock_template = MagicMock() + mock_template.shape = (10, 20, 3) + mock_cv2.imread.return_value = mock_template + + mock_screen_bgr = MagicMock() + mock_screen_bgr.shape = (1080, 1920, 3) + mock_cv2.cvtColor.return_value = mock_screen_bgr + + mock_cv2.matchTemplate.return_value = MagicMock() + mock_cv2.minMaxLoc.return_value = (0, 0.3, (0, 0), (50, 50)) + mock_cv2.TM_CCOEFF_NORMED = 5 + + mock_screen_img = MagicMock() + mock_grab.grab.return_value = mock_screen_img + mock_np.array.return_value = MagicMock() + mock_cv2.COLOR_RGB2BGR = 4 + + with patch.dict(sys.modules, {"cv2": mock_cv2, "numpy": mock_np}): + result = desktop.find_image("template.png", threshold=0.8) + assert "No match" in result + assert "0.3" in result + + def test_invalid_region(self, desktop): + """Should error when region has wrong number of elements.""" + mock_cv2 = MagicMock() + mock_np = MagicMock() + mock_cv2.imread.return_value = MagicMock() + + mock_path = MagicMock() + mock_path.is_file.return_value = True + mock_path.suffix = ".png" + mock_path.__str__ = lambda self: "/fake/template.png" + + with 
patch.dict(sys.modules, {"cv2": mock_cv2, "numpy": mock_np}): + with patch("pathlib.Path.resolve", return_value=mock_path): + result = desktop.find_image("template.png", region=[10, 20]) + assert "Error" in result + assert "region" in result + + def test_invalid_extension(self, desktop): + """Should reject non-image file extensions.""" + mock_cv2 = MagicMock() + mock_np = MagicMock() + + mock_path = MagicMock() + mock_path.is_file.return_value = True + mock_path.suffix = ".exe" + mock_path.__str__ = lambda self: "/fake/malware.exe" + + with patch.dict(sys.modules, {"cv2": mock_cv2, "numpy": mock_np}): + with patch("pathlib.Path.resolve", return_value=mock_path): + result = desktop.find_image("malware.exe") + assert "Error" in result + assert "image file" in result diff --git a/tests/test_highlight.py b/tests/test_highlight.py new file mode 100644 index 0000000..4bee6f8 --- /dev/null +++ b/tests/test_highlight.py @@ -0,0 +1,48 @@ +from unittest.mock import patch + +import pytest + +from windows_mcp.desktop.service import Desktop, _HIGHLIGHT_COLORS + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestHighlightRegion: + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.ctypes") + def test_success(self, mock_ctypes, mock_sleep, desktop): + result = desktop.highlight_region([100, 200], [300, 400], duration=1.0, color="red") + assert "Highlighted" in result + assert "100" in result + assert "200" in result + assert "300x400" in result + assert "red" in result + mock_sleep.assert_called_once_with(1.0) + + def test_invalid_loc(self, desktop): + result = desktop.highlight_region([100], [300, 400]) + assert "Error" in result + assert "loc" in result + + def test_invalid_size(self, desktop): + result = desktop.highlight_region([100, 200], [300]) + assert "Error" in result + assert "size" in result + + @patch("windows_mcp.desktop.service.sleep") + 
@patch("windows_mcp.desktop.service.ctypes") + def test_all_colors(self, mock_ctypes, mock_sleep, desktop): + for color in ("red", "green", "blue", "yellow"): + result = desktop.highlight_region([0, 0], [100, 100], color=color) + assert "Error" not in result + + def test_highlight_colors_map(self): + assert "red" in _HIGHLIGHT_COLORS + assert "green" in _HIGHLIGHT_COLORS + assert "blue" in _HIGHLIGHT_COLORS + assert "yellow" in _HIGHLIGHT_COLORS diff --git a/tests/test_key_hold.py b/tests/test_key_hold.py new file mode 100644 index 0000000..9d30283 --- /dev/null +++ b/tests/test_key_hold.py @@ -0,0 +1,72 @@ +from unittest.mock import patch + +import pytest + +from windows_mcp.desktop.service import Desktop, _VK_MAP + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestKeyHold: + @patch("windows_mcp.desktop.service.uia") + def test_press_single_key(self, mock_uia, desktop): + result = desktop.key_hold("down", ["shift"]) + assert "Pressed" in result + assert "shift" in result + mock_uia.PressKey.assert_called_once() + + @patch("windows_mcp.desktop.service.uia") + def test_release_single_key(self, mock_uia, desktop): + result = desktop.key_hold("up", ["ctrl"]) + assert "Released" in result + assert "ctrl" in result + mock_uia.ReleaseKey.assert_called_once() + + @patch("windows_mcp.desktop.service.uia") + def test_press_multiple_keys(self, mock_uia, desktop): + result = desktop.key_hold("down", ["shift", "ctrl", "alt"]) + assert "Pressed" in result + assert mock_uia.PressKey.call_count == 3 + + @patch("windows_mcp.desktop.service.uia") + def test_single_character_key(self, mock_uia, desktop): + result = desktop.key_hold("down", ["a"]) + assert "Pressed" in result + assert "a" in result + call_args = mock_uia.PressKey.call_args + assert call_args[0][0] == ord("A") + + def test_unknown_key_returns_error(self, desktop): + result = desktop.key_hold("down", ["nonexistent_key_xyz"]) + 
assert "Error" in result + assert "Unknown key" in result + assert "nonexistent_key_xyz" in result + + def test_unknown_key_lists_available(self, desktop): + result = desktop.key_hold("down", ["invalidkey"]) + assert "Available keys" in result + assert "shift" in result + + @patch("windows_mcp.desktop.service.uia") + def test_key_aliases(self, mock_uia, desktop): + """ctrl and control should both work.""" + result1 = desktop.key_hold("down", ["ctrl"]) + result2 = desktop.key_hold("down", ["control"]) + assert "Error" not in result1 + assert "Error" not in result2 + + @patch("windows_mcp.desktop.service.uia") + def test_case_insensitive(self, mock_uia, desktop): + result = desktop.key_hold("down", ["SHIFT"]) + assert "Pressed" in result + assert "Error" not in result + + def test_vk_map_has_essential_keys(self): + essential = ["shift", "ctrl", "alt", "enter", "tab", "escape", "space", "f1", "f12"] + for key in essential: + assert key in _VK_MAP, f"Missing essential key: {key}" diff --git a/tests/test_mouse_path.py b/tests/test_mouse_path.py new file mode 100644 index 0000000..142a1d1 --- /dev/null +++ b/tests/test_mouse_path.py @@ -0,0 +1,52 @@ +from unittest.mock import patch + +import pytest + +from windows_mcp.desktop.service import Desktop + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestMousePath: + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.uia") + def test_two_waypoints(self, mock_uia, mock_sleep, desktop): + result = desktop.mouse_path([[0, 0], [100, 100]], duration=0.1) + assert "2 waypoints" in result + assert mock_uia.MoveTo.called + + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.uia") + def test_multiple_waypoints(self, mock_uia, mock_sleep, desktop): + path = [[0, 0], [50, 50], [100, 0], [150, 50]] + result = desktop.mouse_path(path, duration=0.2) + assert "4 waypoints" in result + + def 
test_single_waypoint_error(self, desktop): + result = desktop.mouse_path([[100, 200]]) + assert "Error" in result + assert "at least 2" in result + + def test_empty_path_error(self, desktop): + result = desktop.mouse_path([]) + assert "Error" in result + + def test_invalid_waypoint_shape(self, desktop): + result = desktop.mouse_path([[0, 0], [100]]) + assert "Error" in result + assert "waypoint" in result + + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.uia") + def test_endpoints_visited(self, mock_uia, mock_sleep, desktop): + desktop.mouse_path([[10, 20], [30, 40]], duration=0.01) + calls = [call[0] for call in mock_uia.MoveTo.call_args_list] + # First point + assert calls[0] == (10, 20) + # Last point + assert calls[-1] == (30, 40) diff --git a/tests/test_pixel_color.py b/tests/test_pixel_color.py new file mode 100644 index 0000000..85201e2 --- /dev/null +++ b/tests/test_pixel_color.py @@ -0,0 +1,86 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from windows_mcp.desktop.service import Desktop +from windows_mcp.desktop.utils import approximate_color_name + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestApproximateColorName: + def test_exact_red(self): + assert approximate_color_name(255, 0, 0) == "red" + + def test_exact_green(self): + assert approximate_color_name(0, 128, 0) == "green" + + def test_exact_blue(self): + assert approximate_color_name(0, 0, 255) == "blue" + + def test_exact_white(self): + assert approximate_color_name(255, 255, 255) == "white" + + def test_exact_black(self): + assert approximate_color_name(0, 0, 0) == "black" + + def test_near_red(self): + assert approximate_color_name(250, 5, 5) == "red" + + def test_near_yellow(self): + assert approximate_color_name(250, 250, 10) == "yellow" + + def test_returns_string(self): + result = approximate_color_name(100, 100, 100) + assert isinstance(result, 
str) + assert len(result) > 0 + + +class TestPixelColor: + @patch("windows_mcp.desktop.service.ImageGrab") + def test_success(self, mock_grab, desktop): + mock_img = MagicMock() + mock_img.getpixel.return_value = (255, 0, 0) + mock_grab.grab.return_value = mock_img + result = desktop.get_pixel_color([100, 200]) + assert "R=255" in result + assert "G=0" in result + assert "B=0" in result + assert "#FF0000" in result + assert "red" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + def test_white_pixel(self, mock_grab, desktop): + mock_img = MagicMock() + mock_img.getpixel.return_value = (255, 255, 255) + mock_grab.grab.return_value = mock_img + result = desktop.get_pixel_color([0, 0]) + assert "#FFFFFF" in result + assert "white" in result + + def test_invalid_loc_length(self, desktop): + result = desktop.get_pixel_color([100]) + assert "Error" in result + + def test_invalid_loc_too_many(self, desktop): + result = desktop.get_pixel_color([1, 2, 3]) + assert "Error" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + def test_grab_exception(self, mock_grab, desktop): + mock_grab.grab.side_effect = OSError("Screen capture failed") + result = desktop.get_pixel_color([100, 200]) + assert "Error" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + def test_hex_format(self, mock_grab, desktop): + mock_img = MagicMock() + mock_img.getpixel.return_value = (10, 20, 30) + mock_grab.grab.return_value = mock_img + result = desktop.get_pixel_color([50, 50]) + assert "#0A141E" in result diff --git a/tests/test_screen_info.py b/tests/test_screen_info.py new file mode 100644 index 0000000..54c86a5 --- /dev/null +++ b/tests/test_screen_info.py @@ -0,0 +1,55 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from windows_mcp.desktop.service import Desktop +from windows_mcp.desktop.views import Size + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + d.execute_command = 
MagicMock() + d.get_screen_size = MagicMock(return_value=Size(width=1920, height=1080)) + return d + + +class TestScreenInfo: + def test_single_monitor(self, desktop): + desktop.execute_command.return_value = ( + "\\\\.\\DISPLAY1|1920|1080|0|0|True\n", + 0, + ) + result = desktop.get_screen_info() + assert "Monitors (1)" in result + assert "1920x1080" in result + assert "(primary)" in result + + def test_dual_monitors(self, desktop): + desktop.execute_command.return_value = ( + "\\\\.\\DISPLAY1|1920|1080|0|0|True\n\\\\.\\DISPLAY2|2560|1440|1920|0|False\n", + 0, + ) + result = desktop.get_screen_info() + assert "Monitors (2)" in result + assert "1920x1080" in result + assert "2560x1440" in result + assert "(primary)" in result + + def test_command_failure_fallback(self, desktop): + desktop.execute_command.return_value = ("Error", 1) + result = desktop.get_screen_info() + assert "Monitors (1)" in result + assert "1920x1080" in result + + def test_empty_output_fallback(self, desktop): + desktop.execute_command.return_value = ("", 0) + result = desktop.get_screen_info() + assert "Monitors (1)" in result + + def test_exception_fallback(self, desktop): + desktop.execute_command.side_effect = RuntimeError("PowerShell not found") + result = desktop.get_screen_info() + assert "Monitors (1)" in result + assert "1920x1080" in result diff --git a/tests/test_screen_reader.py b/tests/test_screen_reader.py new file mode 100644 index 0000000..09a0021 --- /dev/null +++ b/tests/test_screen_reader.py @@ -0,0 +1,76 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from windows_mcp.desktop.service import Desktop + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + d.execute_command = MagicMock() + return d + + +class TestScreenReader: + @patch("windows_mcp.desktop.service.os") + @patch("windows_mcp.desktop.service.tempfile") + @patch("windows_mcp.desktop.service.ImageGrab") + def 
test_success_windows_ocr(self, mock_grab, mock_tempfile, mock_os, desktop): + mock_img = MagicMock() + mock_grab.grab.return_value = mock_img + mock_tmp = MagicMock() + mock_tmp.__enter__ = MagicMock(return_value=mock_tmp) + mock_tmp.__exit__ = MagicMock(return_value=False) + mock_tmp.name = "C:\\temp\\ocr.png" + mock_tempfile.NamedTemporaryFile.return_value = mock_tmp + desktop.execute_command.return_value = ("Hello World\n", 0) + + result = desktop.read_screen_text() + assert "OCR text" in result + assert "Hello World" in result + + @patch("windows_mcp.desktop.service.os") + @patch("windows_mcp.desktop.service.tempfile") + @patch("windows_mcp.desktop.service.ImageGrab") + def test_no_text_detected(self, mock_grab, mock_tempfile, mock_os, desktop): + mock_img = MagicMock() + mock_grab.grab.return_value = mock_img + mock_tmp = MagicMock() + mock_tmp.__enter__ = MagicMock(return_value=mock_tmp) + mock_tmp.__exit__ = MagicMock(return_value=False) + mock_tmp.name = "C:\\temp\\ocr.png" + mock_tempfile.NamedTemporaryFile.return_value = mock_tmp + desktop.execute_command.return_value = ("\n", 0) + + result = desktop.read_screen_text() + assert "No text detected" in result + + def test_invalid_region(self, desktop): + result = desktop.read_screen_text(region=[100, 200]) + assert "Error" in result + assert "region" in result + + @patch("windows_mcp.desktop.service.os") + @patch("windows_mcp.desktop.service.tempfile") + @patch("windows_mcp.desktop.service.ImageGrab") + def test_region_capture(self, mock_grab, mock_tempfile, mock_os, desktop): + mock_img = MagicMock() + mock_grab.grab.return_value = mock_img + mock_tmp = MagicMock() + mock_tmp.__enter__ = MagicMock(return_value=mock_tmp) + mock_tmp.__exit__ = MagicMock(return_value=False) + mock_tmp.name = "C:\\temp\\ocr.png" + mock_tempfile.NamedTemporaryFile.return_value = mock_tmp + desktop.execute_command.return_value = ("Some text", 0) + + result = desktop.read_screen_text(region=[10, 20, 300, 200]) + assert "Error" not 
in result + mock_grab.grab.assert_called_once_with(bbox=(10, 20, 310, 220)) + + @patch("windows_mcp.desktop.service.ImageGrab") + def test_capture_exception(self, mock_grab, desktop): + mock_grab.grab.side_effect = OSError("No display") + result = desktop.read_screen_text() + assert "Error" in result diff --git a/tests/test_wait_for_change.py b/tests/test_wait_for_change.py new file mode 100644 index 0000000..1f27eb2 --- /dev/null +++ b/tests/test_wait_for_change.py @@ -0,0 +1,62 @@ +from unittest.mock import MagicMock, patch + +import pytest + +from windows_mcp.desktop.service import Desktop + + +@pytest.fixture +def desktop(): + with patch.object(Desktop, "__init__", lambda self: None): + d = Desktop() + return d + + +class TestWaitForChange: + def test_invalid_region(self, desktop): + result = desktop.wait_for_change([100, 200]) + assert "Error" in result + assert "region" in result + + @patch("windows_mcp.desktop.service.time") + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.ImageGrab") + def test_change_detected(self, mock_grab, mock_sleep, mock_time, desktop): + """Should detect change when pixels differ beyond threshold.""" + baseline_img = MagicMock() + baseline_img.getdata.return_value = [(0, 0, 0)] * 100 + + changed_img = MagicMock() + # Change 50% of pixels + changed_img.getdata.return_value = [(255, 255, 255)] * 50 + [(0, 0, 0)] * 50 + + mock_grab.grab.side_effect = [baseline_img, changed_img] + mock_time.side_effect = [0.0, 0.0, 0.6] + + result = desktop.wait_for_change([0, 0, 10, 10], timeout=5.0, threshold=0.05) + assert "Change detected" in result + assert "50.0%" in result + + @patch("windows_mcp.desktop.service.time") + @patch("windows_mcp.desktop.service.sleep") + @patch("windows_mcp.desktop.service.ImageGrab") + def test_timeout(self, mock_grab, mock_sleep, mock_time, desktop): + """Should timeout when no significant change occurs.""" + same_img = MagicMock() + same_img.getdata.return_value = [(100, 100, 100)] 
* 100 + + mock_grab.grab.return_value = same_img + # baseline capture at t=0, then poll at t=0.5, t=1.0, ... until timeout + mock_time.side_effect = [0.0, 0.0, 0.5, 0.5, 1.0, 1.0, 1.5, 1.5, 2.1] + + result = desktop.wait_for_change( + [0, 0, 10, 10], timeout=2.0, threshold=0.05, poll_interval=0.5 + ) + assert "Timeout" in result + + @patch("windows_mcp.desktop.service.ImageGrab") + def test_capture_failure(self, mock_grab, desktop): + mock_grab.grab.side_effect = OSError("No display") + result = desktop.wait_for_change([0, 0, 100, 100]) + assert "Error" in result + assert "baseline" in result