diff --git a/.gitignore b/.gitignore index f62f237..0ffd3c6 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ wheels/ .DS_Store *.*~ + +# Cloud downloads +cloud_downloads/ \ No newline at end of file diff --git a/examples/cloud_browser.py b/examples/cloud_browser.py index f641ea6..57c5ea3 100644 --- a/examples/cloud_browser.py +++ b/examples/cloud_browser.py @@ -10,7 +10,7 @@ async def main() -> None: # Open a cloud browser window and initialize the Narada UI agent. window = await narada.open_and_initialize_cloud_browser_window( session_name="my-cloud-browser-session", # Optional: label the session - session_timeout=3600, # Optional: session timeout in seconds + session_timeout=1800, # Optional: timeout in seconds, up to 8 hours, default is 30 minutes ) # Run a task in this browser window. @@ -21,8 +21,10 @@ async def main() -> None: ) ) + # By default, downloads are saved under ./cloud_downloads/<session_id>/, relative to the current working directory. print("Response:", response.model_dump_json(indent=2)) + # The cloud session is still running after exiting the context manager. # You can save the session ID for later reconnection or management. 
cloud_browser_session_id = window.cloud_browser_session_id diff --git a/packages/narada/src/narada/__init__.py b/packages/narada/src/narada/__init__.py index 3957d7b..4daaf08 100644 --- a/packages/narada/src/narada/__init__.py +++ b/packages/narada/src/narada/__init__.py @@ -9,6 +9,7 @@ from narada_core.models import Agent, File, Response, ResponseContent from narada.client import Narada +from narada.cloud_downloads import DownloadInfo from narada.config import BrowserConfig, ProxyConfig from narada.utils import download_file, render_html from narada.version import __version__ @@ -20,6 +21,7 @@ "BrowserConfig", "CloudBrowserWindow", "download_file", + "DownloadInfo", "File", "LocalBrowserWindow", "Narada", diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index ec99554..db6bb7b 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import logging import os import subprocess @@ -17,6 +18,12 @@ NaradaTimeoutError, NaradaUnsupportedBrowserError, ) +from pathlib import Path + +from narada.cloud_downloads import ( + CDPDownloadHandler, + make_default_on_download_complete_callback, +) from narada_core.models import _SdkConfig from packaging.version import Version from playwright._impl._errors import Error as PlaywrightError @@ -73,6 +80,7 @@ def __init__( api_key = api_key or os.environ["NARADA_API_KEY"] self._auth_headers = {"x-api-key": api_key} self._console = Console() + self._pending_download_futures: list[concurrent.futures.Future] = [] async def __aenter__(self) -> Narada: await self._validate_sdk_config() @@ -85,6 +93,16 @@ async def __aexit__(self, *args: Any) -> None: if self._playwright_context_manager is None: return + # Wait for any in-flight download transfers (from default callback) so the + # browser is not closed until transfers finish. 
Await so the event loop can + # run the transfer coroutines (they were scheduled via run_coroutine_threadsafe). + for fut in self._pending_download_futures: + try: + await asyncio.wait_for(asyncio.wrap_future(fut), timeout=300) + except (asyncio.TimeoutError, Exception): + pass + self._pending_download_futures.clear() + await self._playwright_context_manager.__aexit__(*args) self._playwright_context_manager = None self._playwright = None @@ -261,18 +279,39 @@ async def _initialize_cloud_browser_window( logging.info("Waiting for Narada extension to be installed...") await asyncio.sleep(1) - # TODO: consider this + # TODO: This is a hack + await browser.close() + browser = await self._playwright.chromium.connect_over_cdp(cdp_websocket_url, headers=cdp_auth_headers) + context = browser.contexts[0] # Get side panel page - # side_panel_url = create_side_panel_url(config, browser_window_id) - # side_panel_page = next( - # (p for p in context.pages if p.url == side_panel_url), None - # ) - # await self._fix_download_behavior(side_panel_page) + side_panel_url = create_side_panel_url(config, browser_window_id) + side_panel_page = next( + (p for p in context.pages if p.url == side_panel_url), None + ) + await self._fix_download_behavior(side_panel_page) + + # Set up browser-level CDP download handler to capture downloads from any tab. + # When no callback is provided, use a default that transfers files to local disk. 
"""CDP-based download handling and file transfer for cloud browser sessions.

Uses a browser-level CDP session to capture downloads from all tabs via
Browser.downloadWillBegin / Browser.downloadProgress events, and CDP Fetch + IO.read
to stream remote files to local without loading the entire file into memory.

Ported from agentcore-download-solutions/custom_agentcore_playwright_cdp_streaming.py.
"""

from __future__ import annotations

import asyncio
import base64
import logging
import time
from collections.abc import Coroutine
from dataclasses import dataclass
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable

if TYPE_CHECKING:
    from playwright.async_api import Browser

logger = logging.getLogger(__name__)
# NOTE(review): forcing a level on a library logger overrides whatever the
# application configured; kept unchanged so existing log output stays the same.
logger.setLevel(logging.INFO)

DEFAULT_REMOTE_DOWNLOAD_DIR = "/tmp/remote_downloads"
CHUNK_SIZE = 4 * 1024 * 1024  # 4 MB

# When wait_for_all is called with empty _downloads, poll this long for downloads
# to appear (avoids race where post-action transfer misses a download that
# starts moments later).
_WAIT_FOR_ALL_EMPTY_GRACE_SECONDS = 5.0

# Sleep between two looks at the download table while polling.
_POLL_INTERVAL_SECONDS = 0.5

# How long to wait for the file:// navigation and for the Fetch intercept.
_FETCH_TIMEOUT_SECONDS = 30

# Local file name used when the remote one is empty or is only "." / "..".
_FALLBACK_FILENAME = "download"

# CDP Browser.downloadProgress state values
_STATE_IN_PROGRESS = "inProgress"
_STATE_COMPLETED = "completed"
_STATE_CANCELED = "canceled"
_STATE_INTERRUPTED = "interrupted"
_TERMINAL_STATES = frozenset(
    {_STATE_COMPLETED, _STATE_CANCELED, _STATE_INTERRUPTED}
)


@dataclass
class DownloadInfo:
    """Metadata about a completed download on the remote browser."""

    guid: str  # CDP download GUID; the handler builds remote_path from it
    filename: str  # file name suggested by the browser for this download
    remote_path: str  # ``<remote download dir>/<guid>`` on the remote machine
    size: int  # last ``receivedBytes`` value reported for the download


def _safe_filename(filename: str) -> str:
    """Reduce *filename* to a bare file name that cannot escape its directory.

    The name originates from the remote browser (``suggestedFilename``), so it is
    treated as untrusted: directory components using either ``/`` or ``\\`` are
    dropped, and empty / ``.`` / ``..`` names fall back to a fixed placeholder.
    """
    name = filename.replace("\\", "/").rsplit("/", 1)[-1]
    return _FALLBACK_FILENAME if name in ("", ".", "..") else name


def local_path_for_session_download(
    base_dir: str | Path,
    session_id: str,
    filename: str,
) -> Path:
    """Build the local path for a session download so concurrent sessions do not overwrite.

    Files are placed under ``base_dir / session_id / filename``. Only the final
    path component of *filename* is used, so a name such as ``../../x`` or an
    absolute path cannot place the file outside the session directory.
    """
    return Path(base_dir) / session_id / _safe_filename(filename)


def make_default_on_download_complete_callback(
    browser: Browser,
    loop: asyncio.AbstractEventLoop,
    base_dir: str | Path,
    pending_futures: list[Any],
    remote_download_dir: str = DEFAULT_REMOTE_DOWNLOAD_DIR,
) -> Callable[[str, str, str], None]:
    """Return a sync callback (session_id, guid, filename) -> None that transfers each
    completed download to ``base_dir / session_id / filename`` and appends the
    transfer future to *pending_futures* so the caller can wait before closing the browser.

    Args:
        browser: Playwright browser the transfer coroutine runs against.
        loop: Event loop the transfer is scheduled on via
            ``asyncio.run_coroutine_threadsafe``.
        base_dir: Local root directory for transferred files.
        pending_futures: List that receives one future per scheduled transfer.
        remote_download_dir: Directory on the remote machine holding the files
            (named by GUID). Must match the directory the download handler was
            configured with; defaults to ``DEFAULT_REMOTE_DOWNLOAD_DIR``.
    """
    base_dir = Path(base_dir)

    def on_download_complete(session_id: str, guid: str, filename: str) -> None:
        local_path = local_path_for_session_download(base_dir, session_id, filename)
        local_path.parent.mkdir(parents=True, exist_ok=True)
        remote_path = f"{remote_download_dir}/{guid}"
        future = asyncio.run_coroutine_threadsafe(
            download_remote_file_to_local(browser, remote_path, local_path),
            loop,
        )
        pending_futures.append(future)

    return on_download_complete


def _run_download_callback(
    callback: Callable[[str, str, str], None],
    session_id: str,
    guid: str,
    filename: str,
) -> None:
    """Invoke a user callback in a worker thread, logging instead of losing errors."""
    try:
        callback(session_id, guid, filename)
    except Exception:
        logger.exception("on_download_complete callback failed for download %s", guid)


class CDPDownloadHandler:
    """Tracks downloads on a remote cloud browser via a browser-level CDP session.

    Using a *browser-level* CDP session (``browser.new_browser_cdp_session()``)
    ensures that downloads triggered by **any** tab are captured, which is critical
    for cloud browser sessions where the agent may open new tabs.

    If *on_download_complete* is set, each completed download triggers that sync
    callable in a thread (run_in_executor), so the CDP event loop is not blocked.
    Signature: (session_id: str, guid: str, filename: str) -> None. The callback
    is only invoked when a *session_id* was supplied.
    """

    def __init__(
        self,
        remote_download_dir: str = DEFAULT_REMOTE_DOWNLOAD_DIR,
        session_id: str | None = None,
        on_download_complete: Callable[[str, str, str], None] | None = None,
    ) -> None:
        self._remote_download_dir = remote_download_dir
        self._session_id = session_id
        self._on_download_complete = on_download_complete
        # guid -> dict with keys "filename", "state", "received"
        self._downloads: dict[str, dict[str, Any]] = {}
        # guid -> Event set when that download reaches a terminal state
        self._done_events: dict[str, asyncio.Event] = {}
        self._cdp_session: Any | None = None
        self._browser: Browser | None = None
        # Strong references to in-flight event-handler tasks: the event loop only
        # keeps weak references, so an unreferenced task may be collected early.
        self._event_tasks: set[asyncio.Task[None]] = set()

    def _spawn(self, coro: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        """Schedule *coro* as a task and hold a reference until it finishes."""
        task = asyncio.create_task(coro)
        self._event_tasks.add(task)
        task.add_done_callback(self._event_tasks.discard)
        return task

    def _build_info(self, guid: str) -> DownloadInfo:
        """Create the :class:`DownloadInfo` for a tracked download."""
        record = self._downloads[guid]
        return DownloadInfo(
            guid=guid,
            filename=record["filename"],
            remote_path=f"{self._remote_download_dir}/{guid}",
            size=record["received"],
        )

    def _scan_downloads(self) -> tuple[str | None, str | None]:
        """Return ``(first in-progress guid, last completed guid seen before it)``."""
        latest_completed: str | None = None
        for guid, record in self._downloads.items():
            if record["state"] == _STATE_IN_PROGRESS:
                return guid, latest_completed
            if record["state"] == _STATE_COMPLETED:
                latest_completed = guid
        return None, latest_completed

    async def setup(self, browser: Browser) -> None:
        """Attach to the browser and start listening for download events."""
        self._browser = browser
        self._cdp_session = await browser.new_browser_cdp_session()

        await self._cdp_session.send(
            "Browser.setDownloadBehavior",
            {
                "behavior": "allowAndName",
                "downloadPath": self._remote_download_dir,
                "eventsEnabled": True,
            },
        )
        self._cdp_session.on(
            "Browser.downloadWillBegin",
            lambda event: self._spawn(self._on_download_begin(event)),
        )
        self._cdp_session.on(
            "Browser.downloadProgress",
            lambda event: self._spawn(self._on_download_progress(event)),
        )

    async def _on_download_begin(self, event: dict[str, Any]) -> None:
        """Start tracking the download announced by ``Browser.downloadWillBegin``."""
        guid: str = event.get("guid", "")
        filename: str = event.get("suggestedFilename", "download")
        self._downloads[guid] = {
            "filename": filename,
            "state": _STATE_IN_PROGRESS,
            "received": 0,
        }
        self._done_events[guid] = asyncio.Event()

    async def _on_download_progress(self, event: dict[str, Any]) -> None:
        """Record a ``Browser.downloadProgress`` event and fire the completion callback."""
        guid: str = event.get("guid", "")
        state: str = event.get("state", "")
        received: int = event.get("receivedBytes", 0)

        record = self._downloads.get(guid)
        if record is not None:
            record["state"] = state
            record["received"] = received

        if state not in _TERMINAL_STATES:
            return

        done_event = self._done_events.get(guid)
        if done_event is not None:
            done_event.set()

        callback = self._on_download_complete
        session_id = self._session_id
        if state != _STATE_COMPLETED or not callback or not session_id:
            return

        # A progress event without a preceding begin event has no recorded name;
        # the GUID is used instead.
        filename = record["filename"] if record is not None else guid
        asyncio.get_running_loop().run_in_executor(
            None, _run_download_callback, callback, session_id, guid, filename
        )

    async def wait_for_download(
        self, *, timeout: float | None = None
    ) -> DownloadInfo | None:
        """Wait for the next download to complete and return its info.

        If no download events have been received yet, this will block until one
        arrives and finishes (or the *timeout* expires). If a download already
        completed before this call (e.g. post-action usage), returns that
        completed download immediately.

        *timeout* is a single budget covering both the wait for a download to
        appear and the wait for it to finish.

        Returns ``None`` on timeout or if the download was canceled/interrupted.
        """
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout

        # Prefer an in-progress download so we wait for it; otherwise use an
        # already-completed one; otherwise poll until something shows up.
        while True:
            active_guid, finished_guid = self._scan_downloads()
            if active_guid is not None:
                break
            if finished_guid is not None:
                return self._build_info(finished_guid)
            if deadline is not None and loop.time() >= deadline:
                return None
            delay = _POLL_INTERVAL_SECONDS
            if deadline is not None:
                delay = min(delay, max(0.0, deadline - loop.time()))
            await asyncio.sleep(delay)

        done_event = self._done_events[active_guid]
        remaining = None if deadline is None else max(0.0, deadline - loop.time())
        try:
            await asyncio.wait_for(done_event.wait(), timeout=remaining)
        except asyncio.TimeoutError:
            logger.warning("Timeout waiting for download %s", active_guid)
            return None

        if self._downloads[active_guid]["state"] != _STATE_COMPLETED:
            return None
        return self._build_info(active_guid)

    async def wait_for_all(
        self, *, timeout: float | None = None
    ) -> list[DownloadInfo]:
        """Wait for **all** tracked downloads to reach a terminal state.

        Returns a list of :class:`DownloadInfo` for every download that completed
        successfully. Downloads that were canceled or interrupted are skipped.

        When no downloads are tracked yet (e.g. post-action call before
        Browser.downloadWillBegin has fired), waits a short time for downloads
        to appear so in-flight download events are not missed.
        """
        loop = asyncio.get_running_loop()
        deadline = None if timeout is None else loop.time() + timeout
        appear_by = (
            loop.time() + _WAIT_FOR_ALL_EMPTY_GRACE_SECONDS
            if deadline is None
            else deadline
        )

        # Avoid race: if _downloads is empty, wait for at least one download to
        # appear (post-action usage where download starts moments later).
        while not self._downloads:
            now = loop.time()
            if now >= appear_by:
                return []
            await asyncio.sleep(min(_POLL_INTERVAL_SECONDS, appear_by - now))

        # Wait on a snapshot of the events one after another against the shared
        # deadline; downloads that start after this point are not waited for.
        for done_event in list(self._done_events.values()):
            if done_event.is_set():
                continue
            remaining = (
                None if deadline is None else max(0.0, deadline - loop.time())
            )
            try:
                await asyncio.wait_for(done_event.wait(), timeout=remaining)
            except asyncio.TimeoutError:
                logger.warning("Timeout waiting for all downloads")
                break

        return [
            self._build_info(guid)
            for guid, record in self._downloads.items()
            if record["state"] == _STATE_COMPLETED
        ]


async def _open_remote_stream(
    cdp: Any, page: Any, remote_file_path: str
) -> str | None:
    """Navigate *page* to the remote file and return a CDP stream handle for its body.

    Returns ``None`` when the Fetch intercept never fires or yields no stream.
    """
    # Enable Fetch domain to intercept file:// responses
    await cdp.send(
        "Fetch.enable",
        {"patterns": [{"urlPattern": "file://*", "requestStage": "Response"}]},
    )

    stream_handle_holder: dict[str, str | None] = {}
    fetch_done = asyncio.Event()
    handler_tasks: set[asyncio.Task[None]] = set()

    async def _on_request_paused(event: dict[str, Any]) -> None:
        try:
            stream_result = await cdp.send(
                "Fetch.takeResponseBodyAsStream",
                {"requestId": event["requestId"]},
            )
            stream_handle_holder["handle"] = stream_result.get("stream")
        except Exception as exc:
            logger.warning("Could not take response body as stream: %s", exc)
        finally:
            # Always release the waiter so a failure is reported immediately
            # instead of after the full intercept timeout.
            fetch_done.set()

    def _schedule(event: dict[str, Any]) -> asyncio.Task[None]:
        task = asyncio.create_task(_on_request_paused(event))
        handler_tasks.add(task)
        task.add_done_callback(handler_tasks.discard)
        return task

    cdp.on("Fetch.requestPaused", _schedule)

    # Navigate to the file -- this triggers the Fetch intercept.
    try:
        await page.goto(
            f"file://{remote_file_path}",
            wait_until="commit",
            timeout=_FETCH_TIMEOUT_SECONDS * 1000,
        )
    except Exception:
        pass  # Navigation may abort for binary files, but Fetch still fires.

    # Wait for the Fetch intercept to fire.
    try:
        await asyncio.wait_for(fetch_done.wait(), timeout=_FETCH_TIMEOUT_SECONDS)
    except asyncio.TimeoutError:
        logger.warning("Timeout waiting for Fetch intercept")
        return None

    await cdp.send("Fetch.disable")

    stream_handle = stream_handle_holder.get("handle")
    if not stream_handle:
        logger.warning("No stream handle obtained")
        return None
    return stream_handle


async def _stream_to_file(cdp: Any, stream_handle: str, destination: Path) -> int:
    """Copy a CDP IO stream into *destination* chunk by chunk; return bytes written."""
    written = 0
    with open(destination, "wb") as out:
        while True:
            read_result = await cdp.send(
                "IO.read", {"handle": stream_handle, "size": CHUNK_SIZE}
            )
            data: str = read_result.get("data", "")
            if data:
                chunk = (
                    base64.b64decode(data)
                    if read_result.get("base64Encoded", False)
                    else data.encode("utf-8")
                )
                out.write(chunk)
                written += len(chunk)
            if read_result.get("eof", False):
                break
    return written


async def download_remote_file_to_local(
    browser: Browser,
    remote_file_path: str,
    local_path: str | Path,
) -> Path | None:
    """Stream a file from the remote browser filesystem to a local path.

    Strategy:
    1. Create a **fresh** browser context (safe even if existing contexts are
       corrupted by tab open/close activity that Playwright didn't track).
    2. Use the CDP *Fetch* domain to intercept the ``file://`` response.
    3. ``Fetch.takeResponseBodyAsStream`` + ``IO.read`` to stream large files
       in 4 MB chunks over the CDP WebSocket.

    The data is written to ``<local_path>.part`` and renamed onto *local_path*
    only after the whole stream was read, so a failed transfer never leaves a
    truncated file at the destination.

    Args:
        browser: Playwright browser connected via CDP.
        remote_file_path: Absolute path on the remote browser container
            (e.g. ``/tmp/remote_downloads/{guid}``).
        local_path: Local destination. Parent directories are created
            automatically.

    Returns:
        The resolved local :class:`~pathlib.Path`, or ``None`` on failure.
    """
    local_path = Path(local_path)
    local_path.parent.mkdir(parents=True, exist_ok=True)
    partial_path = local_path.with_name(f"{local_path.name}.part")

    # Always create a fresh context -- the original one may be corrupted by manual
    # tab open/close activity that Playwright didn't track.
    transfer_ctx = await browser.new_context()
    cdp: Any | None = None
    stream_handle: str | None = None

    try:
        transfer_page = await transfer_ctx.new_page()
        cdp = await transfer_page.context.new_cdp_session(transfer_page)

        stream_handle = await _open_remote_stream(cdp, transfer_page, remote_file_path)
        if stream_handle is None:
            return None

        # Stream the file contents to local disk in chunks.
        stream_start = time.perf_counter()
        logger.info(
            "Streaming file from remote to local: %s -> %s (started at %s)",
            remote_file_path,
            local_path,
            time.strftime("%H:%M:%S", time.localtime()),
        )
        downloaded = await _stream_to_file(cdp, stream_handle, partial_path)
        partial_path.replace(local_path)

        logger.info(
            "Transfer complete: %s (%s bytes) in %.2fs",
            local_path,
            f"{downloaded:,}",
            time.perf_counter() - stream_start,
        )
        return local_path

    except Exception as exc:
        logger.exception("Transfer failed: %s", exc)
        return None

    finally:
        # After a successful rename the partial file no longer exists; after a
        # failure this removes the truncated data.
        try:
            partial_path.unlink(missing_ok=True)
        except OSError as exc:
            logger.warning("Could not remove partial file %s: %s", partial_path, exc)

        if cdp is not None and stream_handle is not None:
            try:
                await cdp.send("IO.close", {"handle": stream_handle})
            except Exception as exc:
                logger.warning("Could not close remote stream: %s", exc)

        try:
            await transfer_ctx.close()
        except Exception as exc:
            logger.warning("Could not close transfer context: %s", exc)
async def wait_for_download(
    self, *, timeout: float | None = None
) -> DownloadInfo | None:
    """Wait for the next download to complete on the remote browser.

    Delegates to the window's download handler. Returns ``None`` (after logging
    a warning) when the window was created without a download handler;
    otherwise returns whatever the handler's ``wait_for_download`` returns.
    """
    if self._download_handler is None:
        logger.warning("Download handler not available on this window")
        return None
    return await self._download_handler.wait_for_download(timeout=timeout)


async def transfer_download(
    self,
    download: DownloadInfo,
    local_path: str | Path,
) -> Path | None:
    """Transfer a single completed download from the remote browser to local disk.

    Uses CDP Fetch + IO.read to stream the file in chunks over the WebSocket.

    Args:
        download: The completed download; its ``remote_path`` is the source.
        local_path: Destination path on the local machine.

    Returns:
        The result of ``download_remote_file_to_local``, or ``None`` (after
        logging a warning) when this window holds no browser reference.
    """
    if self._browser is None:
        logger.warning("Browser reference not available -- cannot transfer file")
        return None

    from narada.cloud_downloads import download_remote_file_to_local

    return await download_remote_file_to_local(
        self._browser, download.remote_path, local_path
    )


async def transfer_all_downloads(
    self,
    local_dir: str | Path = "",
    *,
    timeout: float | None = None,
) -> list[Path]:
    """Wait for all tracked downloads, then transfer each to *local_dir*.

    This is a convenience wrapper that calls :meth:`wait_for_all` on the
    download handler and then :meth:`transfer_download` for each result.
    Files are saved under ``local_dir / session_id / filename`` so concurrent
    sessions do not overwrite.

    Only the final path component of each remote-supplied filename is used, and
    when two downloads in the same call share a filename the later one gets the
    download GUID appended to its stem so neither file is overwritten.

    Returns:
        The local paths of the downloads that were transferred successfully;
        an empty list when the handler or browser is unavailable.
    """
    if self._download_handler is None or self._browser is None:
        logger.warning(
            "Download handler / browser not available -- cannot transfer files"
        )
        return []

    target_dir = Path(local_dir)
    target_dir.mkdir(parents=True, exist_ok=True)

    completed = await self._download_handler.wait_for_all(timeout=timeout)

    transferred: list[Path] = []
    claimed: set[Path] = set()  # destinations already used during this call
    for dl in completed:
        # The filename comes from the remote browser: keep only its last path
        # component so it cannot point outside the session directory.
        safe_name = Path(dl.filename).name
        if safe_name in ("", ".", ".."):
            safe_name = dl.guid

        dest = local_path_for_session_download(target_dir, self._session_id, safe_name)
        if dest in claimed:
            dest = dest.with_name(f"{dest.stem}-{dl.guid}{dest.suffix}")
        claimed.add(dest)

        dest.parent.mkdir(parents=True, exist_ok=True)
        result = await self.transfer_download(dl, dest)
        if result is not None:
            transferred.append(result)
        else:
            logger.warning("Transfer failed for %s", dl.filename)

    return transferred
+ + Before disconnecting, any downloads captured by the CDP handler are transferred + to *local_download_dir* (default: ``./cloud_downloads``), so file downloads + from the session are available locally when using the SDK without a backend + callback. """ + if self._browser is not None and self._download_handler is not None: + download_dir = ( + Path(local_download_dir) + if local_download_dir is not None + else Path.cwd() / "cloud_downloads" + ) + try: + paths = await self.transfer_all_downloads( + download_dir, timeout=download_wait_timeout + ) + if paths: + logger.info( + "Transferred %d download(s) to %s before closing", + len(paths), + download_dir, + ) + except Exception as exc: + logger.warning( + "Failed to transfer downloads before close: %s", exc + ) + + # Disconnect Playwright from the browser + if self._browser is not None: + try: + await self._browser.close() + except Exception: + pass + self._browser = None + self._download_handler = None + await _stop_cloud_browser_session( base_url=self._base_url, auth_headers=self._auth_headers, @@ -709,7 +827,7 @@ async def _stop_cloud_browser_session( f"{base_url}/cloud-browser/stop-cloud-browser-session", headers=auth_headers, json={"session_id": session_id}, - timeout=aiohttp.ClientTimeout(total=timeout or 10), + timeout=aiohttp.ClientTimeout(total=timeout or 80), ) as resp: if resp.ok: response_data = await resp.json()