From 9f1606d7bb577cad19dfc0cdfeee8a2b6d04c728 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Fri, 13 Feb 2026 18:08:51 -0500 Subject: [PATCH 1/8] files download --- examples/cloud_browser.py | 26 ++ packages/narada/src/narada/__init__.py | 2 + packages/narada/src/narada/client.py | 19 +- packages/narada/src/narada/cloud_downloads.py | 397 ++++++++++++++++++ packages/narada/src/narada/config.py | 4 + packages/narada/src/narada/window.py | 181 +++++++- 6 files changed, 621 insertions(+), 8 deletions(-) create mode 100644 packages/narada/src/narada/cloud_downloads.py diff --git a/examples/cloud_browser.py b/examples/cloud_browser.py index f641ea6..fa28e1f 100644 --- a/examples/cloud_browser.py +++ b/examples/cloud_browser.py @@ -8,6 +8,8 @@ async def main() -> None: # Initialize the Narada client. async with Narada() as narada: # Open a cloud browser window and initialize the Narada UI agent. + # A CDP download handler is automatically set up behind the scenes so + # any file the agent downloads is tracked. window = await narada.open_and_initialize_cloud_browser_window( session_name="my-cloud-browser-session", # Optional: label the session session_timeout=3600, # Optional: session timeout in seconds @@ -23,6 +25,30 @@ async def main() -> None: print("Response:", response.model_dump_json(indent=2)) + # ── Download handling ──────────────────────────────────────── + # Transfer any files the agent downloaded during the task above. + # The CDP download handler captures download events automatically; + # transfer_all_downloads() waits for pending downloads and streams + # each file from the remote browser to the local directory. + downloaded_paths = await window.transfer_all_downloads() + if downloaded_paths: + print(f"Downloaded {len(downloaded_paths)} file(s):") + for path in downloaded_paths: + print(f" -> {path}") + + # Or wait for a single download and transfer it individually: + # + # download = await window.wait_for_download(timeout=60) + # if download is not None: + # local_path = await window.transfer_download(download, f"./downloads/{download.filename}") + # print(f"Saved: {local_path}") + # + # You can also inspect already-completed downloads without waiting: + # + # completed = await window.get_completed_downloads() + # print(f"{len(completed)} file(s) downloaded") + # ───────────────────────────────────────────────────────────── + # The cloud session is still running after exiting the context manager. # You can save the session ID for later reconnection or management. cloud_browser_session_id = window.cloud_browser_session_id diff --git a/packages/narada/src/narada/__init__.py b/packages/narada/src/narada/__init__.py index 3957d7b..4daaf08 100644 --- a/packages/narada/src/narada/__init__.py +++ b/packages/narada/src/narada/__init__.py @@ -9,6 +9,7 @@ from narada_core.models import Agent, File, Response, ResponseContent from narada.client import Narada +from narada.cloud_downloads import DownloadInfo from narada.config import BrowserConfig, ProxyConfig from narada.utils import download_file, render_html from narada.version import __version__ @@ -20,6 +21,7 @@ "BrowserConfig", "CloudBrowserWindow", "download_file", + "DownloadInfo", "File", "LocalBrowserWindow", "Narada", diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index ec99554..c0d61d5 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -261,19 +261,24 @@ async def _initialize_cloud_browser_window( logging.info("Waiting for Narada extension to be installed...") await asyncio.sleep(1) - # TODO: consider this - # Get side panel page - # side_panel_url = create_side_panel_url(config, browser_window_id) - # side_panel_page = next( - # (p for p in context.pages if p.url == side_panel_url), None - # ) - # await self._fix_download_behavior(side_panel_page) + # Set up browser-level CDP download handler so downloads from any tab are captured. + from narada.cloud_downloads import CDPDownloadHandler + + download_handler = CDPDownloadHandler( + session_id=session_id, + on_download_complete=config.on_download_complete, + ) + await download_handler.setup(browser) + print("[client] CDP download handler setup complete. browser=", browser) cloud_window = CloudBrowserWindow( browser_window_id=browser_window_id, session_id=session_id, auth_headers=self._auth_headers, + browser=browser, + download_handler=download_handler, ) + print("[client] cloud_window created. cloud_window=", cloud_window) if config.interactive: self._print_success_message(browser_window_id) diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py new file mode 100644 index 0000000..786f848 --- /dev/null +++ b/packages/narada/src/narada/cloud_downloads.py @@ -0,0 +1,397 @@ +"""CDP-based download handling and file transfer for cloud browser sessions. + +Uses a browser-level CDP session to capture downloads from all tabs via +Browser.downloadWillBegin / Browser.downloadProgress events, and CDP Fetch + IO.read +to stream remote files to local without loading the entire file into memory. + +Ported from agentcore-download-solutions/custom_agentcore_playwright_cdp_streaming.py. +""" + +from __future__ import annotations + +import asyncio +import base64 +import logging +from dataclasses import dataclass +from pathlib import Path +from typing import TYPE_CHECKING, Any, Callable + +if TYPE_CHECKING: + from playwright.async_api import Browser + +logger = logging.getLogger(__name__) + +DEFAULT_REMOTE_DOWNLOAD_DIR = "/tmp/remote_downloads" +CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB + + +@dataclass +class DownloadInfo: + """Metadata about a completed download on the remote browser.""" + + guid: str + filename: str + remote_path: str + size: int + + +class CDPDownloadHandler: + """Tracks downloads on a remote cloud browser via a browser-level CDP session. + + Using a *browser-level* CDP session (``browser.new_browser_cdp_session()``) + ensures that downloads triggered by **any** tab are captured, which is critical + for cloud browser sessions where the agent may open new tabs. + + If *on_download_complete* is set, each completed download triggers that sync + callable in a thread (run_in_executor), so the CDP event loop is not blocked. + Signature: (session_id: str, guid: str, filename: str) -> None. + """ + + def __init__( + self, + remote_download_dir: str = DEFAULT_REMOTE_DOWNLOAD_DIR, + session_id: str | None = None, + on_download_complete: Callable[[str, str, str], None] | None = None, + ) -> None: + self._remote_download_dir = remote_download_dir + self._session_id = session_id + self._on_download_complete = on_download_complete + # guid -> {filename, state, received} + self._downloads: dict[str, dict[str, Any]] = {} + # guid -> asyncio.Event (set when the download reaches a terminal state) + self._done_events: dict[str, asyncio.Event] = {} + self._cdp_session: Any | None = None # playwright CDPSession + self._browser: Browser | None = None + + # ------------------------------------------------------------------ + # Setup + # ------------------------------------------------------------------ + + async def setup(self, browser: Browser) -> None: + """Attach to the browser and start listening for download events.""" + self._browser = browser + self._cdp_session = await browser.new_browser_cdp_session() + + await self._cdp_session.send( + "Browser.setDownloadBehavior", + { + "behavior": "allowAndName", + "downloadPath": self._remote_download_dir, + "eventsEnabled": True, + }, + ) + + self._cdp_session.on( + "Browser.downloadWillBegin", + lambda event: asyncio.create_task(self._on_download_begin(event)), + ) + self._cdp_session.on( + "Browser.downloadProgress", + lambda event: asyncio.create_task(self._on_download_progress(event)), + ) + + print( + "[cloud_downloads] CDP download listeners attached (browser-level) " + f"session_id={self._session_id!r} on_download_complete={self._on_download_complete is not None}" + ) + + # ------------------------------------------------------------------ + # Internal event handlers + # ------------------------------------------------------------------ + + async def _on_download_begin(self, event: dict[str, Any]) -> None: + guid: str = event.get("guid", "") + filename: str = event.get("suggestedFilename", "download") + print(f"[cloud_downloads] Download started: {filename} (guid: {guid})") + self._downloads[guid] = { + "filename": filename, + "state": "inProgress", + "received": 0, + } + self._done_events[guid] = asyncio.Event() + + async def _on_download_progress(self, event: dict[str, Any]) -> None: + print("[cloud_downloads] _on_download_progress called") + guid: str = event.get("guid", "") + state: str = event.get("state", "") + received: int = event.get("receivedBytes", 0) + + if guid in self._downloads: + self._downloads[guid]["state"] = state + self._downloads[guid]["received"] = received + + if state == "completed": + filename = self._downloads.get(guid, {}).get("filename", guid) + print( + f"[cloud_downloads] Download completed: {filename} ({received:,} bytes)" + ) + if guid in self._done_events: + self._done_events[guid].set() + if self._on_download_complete and self._session_id: + print( + f"[cloud_downloads] Running transfer in executor (session_id={self._session_id}, filename={filename})" + ) + loop = asyncio.get_event_loop() + loop.run_in_executor( + None, + lambda: self._on_download_complete( + self._session_id, guid, filename + ), + ) + else: + print( + "[cloud_downloads] No transfer: on_download_complete or session_id not set" + ) + elif state in ("canceled", "interrupted"): + filename = self._downloads.get(guid, {}).get("filename", guid) + print(f"[cloud_downloads] Download {state}: {filename}") + if guid in self._done_events: + self._done_events[guid].set() + + # ------------------------------------------------------------------ + # Public query / wait helpers + # ------------------------------------------------------------------ + + @property + def has_pending_downloads(self) -> bool: + print("[cloud_downloads] has_pending_downloads called") + """Return ``True`` if any tracked download has not yet finished.""" + return any( + info["state"] == "inProgress" for info in self._downloads.values() + ) + + async def wait_for_download( + self, *, timeout: float | None = None + ) -> DownloadInfo | None: + """Wait for the next download to complete and return its info. + + If no download events have been received yet, this will block until one + arrives and finishes (or the *timeout* expires). + + Returns ``None`` on timeout or if the download was canceled/interrupted. + """ + print("[cloud_downloads] wait_for_download called") + # Find first download that hasn't finished yet, or the most recent completed one + # that hasn't been consumed. + target_guid: str | None = None + for guid, info in self._downloads.items(): + if info["state"] == "inProgress": + target_guid = guid + break + + if target_guid is None: + # All existing downloads are already done; wait for a new one by polling. + # We do a simple poll loop so we can detect newly arriving downloads. + deadline = ( + asyncio.get_event_loop().time() + timeout + if timeout is not None + else None + ) + while True: + for guid, info in self._downloads.items(): + if guid not in self._done_events or not self._done_events[guid].is_set(): + if info["state"] == "inProgress": + target_guid = guid + break + if target_guid is not None: + break + if deadline is not None and asyncio.get_event_loop().time() >= deadline: + return None + await asyncio.sleep(0.5) + + assert target_guid is not None + done_event = self._done_events[target_guid] + + try: + if timeout is not None: + await asyncio.wait_for(done_event.wait(), timeout=timeout) + else: + await done_event.wait() + except asyncio.TimeoutError: + logger.warning("Timeout waiting for download %s", target_guid) + return None + + info = self._downloads[target_guid] + if info["state"] != "completed": + return None + + return DownloadInfo( + guid=target_guid, + filename=info["filename"], + remote_path=f"{self._remote_download_dir}/{target_guid}", + size=info["received"], + ) + + async def wait_for_all( + self, *, timeout: float | None = None + ) -> list[DownloadInfo]: + """Wait for **all** tracked downloads to reach a terminal state. + + Returns a list of :class:`DownloadInfo` for every download that completed + successfully. Downloads that were canceled or interrupted are skipped. + """ + if not self._downloads: + print("[cloud_downloads] No downloads were captured") + return [] + + print( + f"[cloud_downloads] Waiting for {len(self._downloads)} download(s) to complete..." + ) + + # Gather all done-events with an optional timeout. + waiter = asyncio.gather(*(ev.wait() for ev in self._done_events.values())) + try: + if timeout is not None: + await asyncio.wait_for(waiter, timeout=timeout) + else: + await waiter + except asyncio.TimeoutError: + logger.warning("Timeout waiting for all downloads") + + results: list[DownloadInfo] = [] + for guid, info in self._downloads.items(): + if info["state"] == "completed": + results.append( + DownloadInfo( + guid=guid, + filename=info["filename"], + remote_path=f"{self._remote_download_dir}/{guid}", + size=info["received"], + ) + ) + else: + print( + f"[cloud_downloads] Skipping {info['filename']} -- ended with state: {info['state']}" + ) + + print( + f"[cloud_downloads] {len(results)}/{len(self._downloads)} download(s) succeeded" + ) + return results + + +# -------------------------------------------------------------------------- +# File transfer: remote browser -> local filesystem +# -------------------------------------------------------------------------- + + +async def download_remote_file_to_local( + browser: Browser, + remote_file_path: str, + local_path: str | Path, +) -> Path | None: + """Stream a file from the remote browser filesystem to a local path. + + Strategy: + 1. Create a **fresh** browser context (safe even if existing contexts are + corrupted by tab open/close activity that Playwright didn't track). + 2. Use the CDP *Fetch* domain to intercept the ``file://`` response. + 3. ``Fetch.takeResponseBodyAsStream`` + ``IO.read`` to stream large files + in 4 MB chunks over the CDP WebSocket. + + Args: + browser: Playwright browser connected via CDP. + remote_file_path: Absolute path on the remote browser container + (e.g. ``/tmp/remote_downloads/{guid}``). + local_path: Local destination. Parent directories are created + automatically. + + Returns: + The resolved local :class:`~pathlib.Path`, or ``None`` on failure. + """ + print("[cloud_downloads] download_remote_file_to_local called") + local_path = Path(local_path) + local_path.parent.mkdir(parents=True, exist_ok=True) + + # Always create a fresh context -- the original one may be corrupted by manual + # tab open/close activity that Playwright didn't track. + transfer_ctx = await browser.new_context() + transfer_page = await transfer_ctx.new_page() + cdp = await transfer_page.context.new_cdp_session(transfer_page) + + try: + print(f"[cloud_downloads] Reading remote file: {remote_file_path}") + + # Enable Fetch domain to intercept file:// responses + await cdp.send( + "Fetch.enable", + {"patterns": [{"urlPattern": "file://*", "requestStage": "Response"}]}, + ) + + stream_handle_holder: dict[str, str | None] = {} + fetch_done = asyncio.Event() + + async def _on_request_paused(event: dict[str, Any]) -> None: + request_id = event["requestId"] + try: + stream_result = await cdp.send( + "Fetch.takeResponseBodyAsStream", {"requestId": request_id} + ) + stream_handle_holder["handle"] = stream_result.get("stream") + except Exception as exc: + print(f"[cloud_downloads] takeResponseBodyAsStream failed: {exc}") + finally: + fetch_done.set() + + cdp.on( + "Fetch.requestPaused", + lambda ev: asyncio.create_task(_on_request_paused(ev)), + ) + + # Navigate to the file -- this triggers the Fetch intercept. + try: + await transfer_page.goto( + f"file://{remote_file_path}", wait_until="commit", timeout=30_000 + ) + except Exception: + pass # Navigation may abort for binary files, but Fetch still fires. + + # Wait for the Fetch intercept to fire. + try: + await asyncio.wait_for(fetch_done.wait(), timeout=30) + except asyncio.TimeoutError: + print("[cloud_downloads] Timeout waiting for Fetch intercept") + return None + + await cdp.send("Fetch.disable") + + stream_handle = stream_handle_holder.get("handle") + if not stream_handle: + print("[cloud_downloads] No stream handle obtained") + return None + + # Stream the file contents to local disk in chunks. + print( + f"[cloud_downloads] Streaming file from remote to local: {remote_file_path} -> {local_path}" + ) + downloaded = 0 + + with open(local_path, "wb") as f: + while True: + read_result = await cdp.send( + "IO.read", {"handle": stream_handle, "size": CHUNK_SIZE} + ) + data: str = read_result.get("data", "") + is_b64: bool = read_result.get("base64Encoded", False) + eof: bool = read_result.get("eof", False) + + if data: + chunk = ( + base64.b64decode(data) if is_b64 else data.encode("utf-8") + ) + f.write(chunk) + downloaded += len(chunk) + + if eof: + break + + await cdp.send("IO.close", {"handle": stream_handle}) + print(f"[cloud_downloads] Transfer complete: {local_path} ({downloaded:,} bytes)") + return local_path + + except Exception as exc: + print(f"[cloud_downloads] Transfer failed: {exc}") + return None + + finally: + await transfer_ctx.close() diff --git a/packages/narada/src/narada/config.py b/packages/narada/src/narada/config.py index 6204fcf..df85576 100644 --- a/packages/narada/src/narada/config.py +++ b/packages/narada/src/narada/config.py @@ -1,6 +1,7 @@ import sys from dataclasses import dataclass, field from pathlib import Path +from typing import Callable def _default_executable_path() -> str: @@ -75,6 +76,9 @@ class BrowserConfig: extension_id: str = "bhioaidlggjdkheaajakomifblpjmokn" interactive: bool = True proxy: ProxyConfig | None = None + # If set, each completed cloud-browser download runs this sync callable in a thread (run_in_executor). + # Signature: (session_id: str, guid: str, filename: str) -> None. + on_download_complete: Callable[[str, str, str], None] | None = None @property def cdp_url(self) -> str: diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index 4ea9050..3810ef2 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -1,3 +1,5 @@ +from __future__ import annotations + import asyncio import logging import os @@ -5,7 +7,7 @@ from abc import ABC from http import HTTPStatus from pathlib import Path -from typing import IO, Any, TypeVar, overload, override +from typing import IO, TYPE_CHECKING, Any, TypeVar, overload, override import aiohttp from narada_core.actions.models import ( @@ -56,6 +58,11 @@ from narada.config import BrowserConfig +if TYPE_CHECKING: + from playwright.async_api import Browser + + from narada.cloud_downloads import CDPDownloadHandler, DownloadInfo + logger = logging.getLogger(__name__) _StructuredOutput = TypeVar("_StructuredOutput", bound=BaseModel) @@ -648,6 +655,11 @@ class CloudBrowserWindow(BaseBrowserWindow): This class connects to a cloud browser session created by the backend API and provides the same interface as other browser window classes for agent operations. + + When created via :meth:`Narada.open_and_initialize_cloud_browser_window`, a + :class:`~narada.cloud_downloads.CDPDownloadHandler` is automatically set up so that + any files downloaded during agent actions can be transferred to local disk via the + :meth:`transfer_download` / :meth:`transfer_all_downloads` convenience methods. """ def __init__( @@ -657,6 +669,8 @@ def __init__( session_id: str, api_key: str | None = None, auth_headers: dict[str, str] | None = None, + browser: Browser | None = None, + download_handler: CDPDownloadHandler | None = None, ) -> None: base_url = os.getenv("NARADA_API_BASE_URL", "https://api.narada.ai/fast/v2") if auth_headers is None: @@ -668,11 +682,167 @@ def __init__( browser_window_id=browser_window_id, ) self._session_id = session_id + self._browser = browser + self._download_handler = download_handler @property def cloud_browser_session_id(self) -> str: return self._session_id + # ------------------------------------------------------------------ + # Download helpers + # ------------------------------------------------------------------ + + @property + def has_pending_downloads(self) -> bool: + """Return ``True`` if any tracked download is still in progress.""" + if self._download_handler is None: + return False + return self._download_handler.has_pending_downloads + + async def wait_for_download( + self, *, timeout: float | None = None + ) -> DownloadInfo | None: + """Wait for the next download to complete on the remote browser. + + Returns :class:`~narada.cloud_downloads.DownloadInfo` on success, or + ``None`` on timeout / cancellation. + """ + print("[cloud_browser_window] wait_for_download called") + if self._download_handler is None: + logger.warning("Download handler not available on this window") + return None + return await self._download_handler.wait_for_download(timeout=timeout) + + async def get_completed_downloads(self) -> list[DownloadInfo]: + """Return info for all downloads that have completed so far (non-blocking).""" + if self._download_handler is None: + return [] + # Collect completed downloads from the handler's internal state. + from narada.cloud_downloads import DownloadInfo as _DI + + results: list[DownloadInfo] = [] + handler = self._download_handler + for guid, info in handler._downloads.items(): + if info["state"] == "completed": + results.append( + _DI( + guid=guid, + filename=info["filename"], + remote_path=f"{handler._remote_download_dir}/{guid}", + size=info["received"], + ) + ) + return results + + async def transfer_download( + self, + download: DownloadInfo, + local_path: str | Path, + ) -> Path | None: + """Transfer a single completed download from the remote browser to local disk. + + Uses CDP Fetch + IO.read to stream the file in chunks over the WebSocket. + + Args: + download: A :class:`~narada.cloud_downloads.DownloadInfo` obtained from + :meth:`wait_for_download` or :meth:`get_completed_downloads`. + local_path: Where to save the file locally. + + Returns: + The resolved local path, or ``None`` on failure. + """ + print("[cloud_browser_window] transfer_download called") + if self._browser is None: + logger.warning("Browser reference not available -- cannot transfer file") + return None + + from narada.cloud_downloads import download_remote_file_to_local + + return await download_remote_file_to_local( + self._browser, download.remote_path, local_path + ) + + async def transfer_all_downloads( + self, + local_dir: str | Path = "/Users/Volodymyr/projects/narada/remote_downloads", + *, + timeout: float | None = None, + ) -> list[Path]: + """Wait for all tracked downloads, then transfer each to *local_dir*. + + This is a convenience wrapper that calls :meth:`wait_for_all` on the + download handler and then :meth:`transfer_download` for each result. + + Args: + local_dir: Directory to save files into. Created automatically. + timeout: Maximum seconds to wait for downloads to finish. + + Returns: + List of local paths for successfully transferred files. + """ + print("[cloud_browser_window] transfer_all_downloads called") + if self._download_handler is None or self._browser is None: + logger.warning( + "Download handler / browser not available -- cannot transfer files" + ) + return [] + + local_dir = Path(local_dir) + local_dir.mkdir(parents=True, exist_ok=True) + + completed = await self._download_handler.wait_for_all(timeout=timeout) + + transferred: list[Path] = [] + for dl in completed: + dest = local_dir / dl.filename + result = await self.transfer_download(dl, dest) + if result is not None: + transferred.append(result) + else: + logger.warning("Transfer failed for %s", dl.filename) + + return transferred + + # ------------------------------------------------------------------ + # Debug + # ------------------------------------------------------------------ + + async def pause(self) -> None: + """Pause execution so you can interact with the cloud browser manually. + + Prints the session ID and a prompt, then blocks until you press ENTER. + CDP download listeners remain active, so any downloads you trigger + during the pause will be captured. After resuming, all completed + downloads are automatically transferred to the default local directory. + """ + print( + f"\n{'=' * 60}" + f"\n Cloud browser session: {self._session_id}" + f"\n Browser window ID: {self._browser_window_id}" + f"\n{'=' * 60}" + f"\n" + f"\n >>> Session is PAUSED -- interact with the browser manually." + f"\n CDP download listeners are active; downloads will be captured." + f"\n" + f"\n Press ENTER to resume and transfer downloads..." + f"\n" + ) + await asyncio.get_event_loop().run_in_executor(None, input) + + # Auto-transfer any downloads that were captured during the pause. + paths = await self.transfer_all_downloads() + if paths: + print(f"\n[cloud_downloads] Transferred {len(paths)} file(s):") + for p in paths: + print(f" -> {p}") + else: + print("\n[cloud_downloads] No downloads to transfer.") + + # ------------------------------------------------------------------ + # Lifecycle + # ------------------------------------------------------------------ + @override async def close(self, *, timeout: int | None = None) -> None: """Stops the cloud browser session. @@ -680,6 +850,15 @@ async def close(self, *, timeout: int | None = None) -> None: Unlike local browser windows where close() closes a single window, this stops the entire cloud session since the serverless container manages the browser lifecycle. """ + # Disconnect Playwright from the browser (does not stop the remote session). + if self._browser is not None: + try: + self._browser.close() + except Exception: + pass + self._browser = None + self._download_handler = None + await _stop_cloud_browser_session( base_url=self._base_url, auth_headers=self._auth_headers, From 89a6126ec2daf37a16f52e0b1ca3231c48f62a12 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Mon, 16 Feb 2026 14:29:08 -0500 Subject: [PATCH 2/8] upd downloads --- examples/cloud_browser.py | 28 +--------- packages/narada/src/narada/client.py | 7 +-- packages/narada/src/narada/cloud_downloads.py | 48 +++++++--------- packages/narada/src/narada/config.py | 2 - packages/narada/src/narada/window.py | 56 +------------------ 5 files changed, 27 insertions(+), 114 deletions(-) diff --git a/examples/cloud_browser.py b/examples/cloud_browser.py index fa28e1f..59c1983 100644 --- a/examples/cloud_browser.py +++ b/examples/cloud_browser.py @@ -8,8 +8,6 @@ async def main() -> None: # Initialize the Narada client. async with Narada() as narada: # Open a cloud browser window and initialize the Narada UI agent. - # A CDP download handler is automatically set up behind the scenes so - # any file the agent downloads is tracked. window = await narada.open_and_initialize_cloud_browser_window( session_name="my-cloud-browser-session", # Optional: label the session session_timeout=3600, # Optional: session timeout in seconds @@ -24,31 +22,7 @@ async def main() -> None: ) print("Response:", response.model_dump_json(indent=2)) - - # ── Download handling ──────────────────────────────────────── - # Transfer any files the agent downloaded during the task above. - # The CDP download handler captures download events automatically; - # transfer_all_downloads() waits for pending downloads and streams - # each file from the remote browser to the local directory. - downloaded_paths = await window.transfer_all_downloads() - if downloaded_paths: - print(f"Downloaded {len(downloaded_paths)} file(s):") - for path in downloaded_paths: - print(f" -> {path}") - - # Or wait for a single download and transfer it individually: - # - # download = await window.wait_for_download(timeout=60) - # if download is not None: - # local_path = await window.transfer_download(download, f"./downloads/{download.filename}") - # print(f"Saved: {local_path}") - # - # You can also inspect already-completed downloads without waiting: - # - # completed = await window.get_completed_downloads() - # print(f"{len(completed)} file(s) downloaded") - # ───────────────────────────────────────────────────────────── - + # The cloud session is still running after exiting the context manager. # You can save the session ID for later reconnection or management. cloud_browser_session_id = window.cloud_browser_session_id diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index c0d61d5..4334cff 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -17,6 +17,7 @@ NaradaTimeoutError, NaradaUnsupportedBrowserError, ) +from narada.cloud_downloads import CDPDownloadHandler from narada_core.models import _SdkConfig from packaging.version import Version from playwright._impl._errors import Error as PlaywrightError @@ -261,15 +262,12 @@ async def _initialize_cloud_browser_window( logging.info("Waiting for Narada extension to be installed...") await asyncio.sleep(1) - # Set up browser-level CDP download handler so downloads from any tab are captured. - from narada.cloud_downloads import CDPDownloadHandler - + # Set up browser-level CDP download handler to capture downloads from any tab download_handler = CDPDownloadHandler( session_id=session_id, on_download_complete=config.on_download_complete, ) await download_handler.setup(browser) - print("[client] CDP download handler setup complete. browser=", browser) cloud_window = CloudBrowserWindow( browser_window_id=browser_window_id, @@ -278,7 +276,6 @@ async def _initialize_cloud_browser_window( browser=browser, download_handler=download_handler, ) - print("[client] cloud_window created. cloud_window=", cloud_window) if config.interactive: self._print_success_message(browser_window_id) diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index 786f848..9ed87ee 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -12,6 +12,7 @@ import asyncio import base64 import logging +import time from dataclasses import dataclass from pathlib import Path from typing import TYPE_CHECKING, Any, Callable @@ -63,10 +64,6 @@ def __init__( self._cdp_session: Any | None = None # playwright CDPSession self._browser: Browser | None = None - # ------------------------------------------------------------------ - # Setup - # ------------------------------------------------------------------ - async def setup(self, browser: Browser) -> None: """Attach to the browser and start listening for download events.""" self._browser = browser @@ -90,15 +87,6 @@ async def setup(self, browser: Browser) -> None: lambda event: asyncio.create_task(self._on_download_progress(event)), ) - print( - "[cloud_downloads] CDP download listeners attached (browser-level) " - f"session_id={self._session_id!r} on_download_complete={self._on_download_complete is not None}" - ) - - # ------------------------------------------------------------------ - # Internal event handlers - # ------------------------------------------------------------------ - async def _on_download_begin(self, event: dict[str, Any]) -> None: guid: str = event.get("guid", "") filename: str = event.get("suggestedFilename", "download") @@ -111,7 +99,6 @@ async def _on_download_begin(self, event: dict[str, Any]) -> None: self._done_events[guid] = asyncio.Event() async def _on_download_progress(self, event: dict[str, Any]) -> None: - print("[cloud_downloads] _on_download_progress called") guid: str = event.get("guid", "") state: str = event.get("state", "") received: int = event.get("receivedBytes", 0) @@ -148,10 +135,6 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: if guid in self._done_events: self._done_events[guid].set() - # ------------------------------------------------------------------ - # Public query / wait helpers - # ------------------------------------------------------------------ - @property def has_pending_downloads(self) -> bool: print("[cloud_downloads] has_pending_downloads called") @@ -270,11 +253,6 @@ async def wait_for_all( return results -# -------------------------------------------------------------------------- -# File transfer: remote browser -> local filesystem -# -------------------------------------------------------------------------- - - async def download_remote_file_to_local( browser: Browser, remote_file_path: str, @@ -299,7 +277,6 @@ async def download_remote_file_to_local( Returns: The resolved local :class:`~pathlib.Path`, or ``None`` on failure. """ - print("[cloud_downloads] download_remote_file_to_local called") local_path = Path(local_path) local_path.parent.mkdir(parents=True, exist_ok=True) @@ -310,7 +287,10 @@ async def download_remote_file_to_local( cdp = await transfer_page.context.new_cdp_session(transfer_page) try: - print(f"[cloud_downloads] Reading remote file: {remote_file_path}") + read_start_ts = time.strftime("%H:%M:%S", time.localtime()) + print( + f"[cloud_downloads] Reading remote file: {remote_file_path} -> {local_path.name} (started at {read_start_ts})" + ) # Enable Fetch domain to intercept file:// responses await cdp.send( @@ -361,8 +341,10 @@ async def _on_request_paused(event: dict[str, Any]) -> None: return None # Stream the file contents to local disk in chunks. + stream_start = time.perf_counter() + start_ts = time.strftime("%H:%M:%S", time.localtime()) print( - f"[cloud_downloads] Streaming file from remote to local: {remote_file_path} -> {local_path}" + f"[cloud_downloads] Streaming file from remote to local: {remote_file_path} -> {local_path} (started at {start_ts})" ) downloaded = 0 @@ -386,7 +368,19 @@ async def _on_request_paused(event: dict[str, Any]) -> None: break await cdp.send("IO.close", {"handle": stream_handle}) - print(f"[cloud_downloads] Transfer complete: {local_path} ({downloaded:,} bytes)") + stream_elapsed = time.perf_counter() - stream_start + end_ts = time.strftime("%H:%M:%S", time.localtime()) + print( + f"[cloud_downloads] Transfer complete: {local_path} ({downloaded:,} bytes) in {stream_elapsed:.2f}s (finished at {end_ts})" + ) + logger.info( + "Streamed file from AgentCore to local: %s -> %s (%s bytes, %.2fs, finished at %s)", + remote_file_path, + local_path, + f"{downloaded:,}", + stream_elapsed, + end_ts, + ) return local_path except Exception as exc: diff --git a/packages/narada/src/narada/config.py b/packages/narada/src/narada/config.py index df85576..8c8f30e 100644 --- a/packages/narada/src/narada/config.py +++ b/packages/narada/src/narada/config.py @@ -76,8 +76,6 @@ class BrowserConfig: extension_id: str = "bhioaidlggjdkheaajakomifblpjmokn" interactive: bool = True proxy: ProxyConfig | None = None - # If set, each completed cloud-browser download runs this sync callable in a thread (run_in_executor). - # Signature: (session_id: str, guid: str, filename: str) -> None. on_download_complete: Callable[[str, str, str], None] | None = None @property diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index 3810ef2..24d26ae 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -57,11 +57,9 @@ from pydantic import BaseModel from narada.config import BrowserConfig +from playwright.async_api import Browser +from narada.cloud_downloads import CDPDownloadHandler, DownloadInfo -if TYPE_CHECKING: - from playwright.async_api import Browser - - from narada.cloud_downloads import CDPDownloadHandler, DownloadInfo logger = logging.getLogger(__name__) @@ -655,11 +653,6 @@ class CloudBrowserWindow(BaseBrowserWindow): This class connects to a cloud browser session created by the backend API and provides the same interface as other browser window classes for agent operations. - - When created via :meth:`Narada.open_and_initialize_cloud_browser_window`, a - :class:`~narada.cloud_downloads.CDPDownloadHandler` is automatically set up so that - any files downloaded during agent actions can be transferred to local disk via the - :meth:`transfer_download` / :meth:`transfer_all_downloads` convenience methods. """ def __init__( @@ -689,10 +682,6 @@ def __init__( def cloud_browser_session_id(self) -> str: return self._session_id - # ------------------------------------------------------------------ - # Download helpers - # ------------------------------------------------------------------ - @property def has_pending_downloads(self) -> bool: """Return ``True`` if any tracked download is still in progress.""" @@ -752,7 +741,6 @@ async def transfer_download( Returns: The resolved local path, or ``None`` on failure. """ - print("[cloud_browser_window] transfer_download called") if self._browser is None: logger.warning("Browser reference not available -- cannot transfer file") return None @@ -804,44 +792,6 @@ async def transfer_all_downloads( return transferred - # ------------------------------------------------------------------ - # Debug - # ------------------------------------------------------------------ - - async def pause(self) -> None: - """Pause execution so you can interact with the cloud browser manually. - - Prints the session ID and a prompt, then blocks until you press ENTER. - CDP download listeners remain active, so any downloads you trigger - during the pause will be captured. After resuming, all completed - downloads are automatically transferred to the default local directory. - """ - print( - f"\n{'=' * 60}" - f"\n Cloud browser session: {self._session_id}" - f"\n Browser window ID: {self._browser_window_id}" - f"\n{'=' * 60}" - f"\n" - f"\n >>> Session is PAUSED -- interact with the browser manually." - f"\n CDP download listeners are active; downloads will be captured." - f"\n" - f"\n Press ENTER to resume and transfer downloads..." - f"\n" - ) - await asyncio.get_event_loop().run_in_executor(None, input) - - # Auto-transfer any downloads that were captured during the pause. - paths = await self.transfer_all_downloads() - if paths: - print(f"\n[cloud_downloads] Transferred {len(paths)} file(s):") - for p in paths: - print(f" -> {p}") - else: - print("\n[cloud_downloads] No downloads to transfer.") - - # ------------------------------------------------------------------ - # Lifecycle - # ------------------------------------------------------------------ @override async def close(self, *, timeout: int | None = None) -> None: @@ -850,7 +800,7 @@ async def close(self, *, timeout: int | None = None) -> None: Unlike local browser windows where close() closes a single window, this stops the entire cloud session since the serverless container manages the browser lifecycle. """ - # Disconnect Playwright from the browser (does not stop the remote session). + # Disconnect Playwright from the browser if self._browser is not None: try: self._browser.close() From 5e4089add4ab38be505b276799b6187aa65bdd25 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Mon, 16 Feb 2026 18:01:28 -0500 Subject: [PATCH 3/8] upd downloads --- packages/narada/src/narada/client.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index 4334cff..e8b6676 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -262,6 +262,17 @@ async def _initialize_cloud_browser_window( logging.info("Waiting for Narada extension to be installed...") await asyncio.sleep(1) + # TODO: This is a hack + await browser.close() + browser = await self._playwright.chromium.connect_over_cdp(cdp_websocket_url, headers=cdp_auth_headers) + context = browser.contexts[0] + # Get side panel page + side_panel_url = create_side_panel_url(config, browser_window_id) + side_panel_page = next( + (p for p in context.pages if p.url == side_panel_url), None + ) + await self._fix_download_behavior(side_panel_page) + # Set up browser-level CDP download handler to capture downloads from any tab download_handler = CDPDownloadHandler( session_id=session_id, From 8b132a6f5e9a5081ba4c4c74bc43f6eb8fd46692 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Mon, 16 Feb 2026 18:27:15 -0500 Subject: [PATCH 4/8] improve code quality --- packages/narada/src/narada/cloud_downloads.py | 8 ----- packages/narada/src/narada/window.py | 32 ++----------------- 2 files changed, 2 insertions(+), 38 deletions(-) diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index 9ed87ee..151a40d 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -135,14 +135,6 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: if guid in self._done_events: self._done_events[guid].set() - @property - def has_pending_downloads(self) -> bool: - print("[cloud_downloads] has_pending_downloads called") - """Return ``True`` if any tracked download has not yet finished.""" - return any( - info["state"] == "inProgress" for info in self._downloads.values() - ) - async def wait_for_download( self, *, timeout: float | None = None ) -> DownloadInfo | None: diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index 24d26ae..acbdb1f 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -682,13 +682,6 @@ def __init__( def cloud_browser_session_id(self) -> str: return self._session_id - @property - def has_pending_downloads(self) -> bool: - """Return ``True`` if any tracked download is still in progress.""" - if self._download_handler is None: - return False - return self._download_handler.has_pending_downloads - async def wait_for_download( self, *, timeout: float | None = None ) -> DownloadInfo | None: @@ -703,27 +696,6 @@ async def wait_for_download( return None return await self._download_handler.wait_for_download(timeout=timeout) - async def get_completed_downloads(self) -> list[DownloadInfo]: - """Return info for all downloads that have completed so far (non-blocking).""" - if self._download_handler is None: - return [] - # Collect completed downloads from the handler's internal state. - from narada.cloud_downloads import DownloadInfo as _DI - - results: list[DownloadInfo] = [] - handler = self._download_handler - for guid, info in handler._downloads.items(): - if info["state"] == "completed": - results.append( - _DI( - guid=guid, - filename=info["filename"], - remote_path=f"{handler._remote_download_dir}/{guid}", - size=info["received"], - ) - ) - return results - async def transfer_download( self, download: DownloadInfo, @@ -735,7 +707,7 @@ async def transfer_download( Args: download: A :class:`~narada.cloud_downloads.DownloadInfo` obtained from - :meth:`wait_for_download` or :meth:`get_completed_downloads`. + :meth:`wait_for_download`. local_path: Where to save the file locally. Returns: @@ -753,7 +725,7 @@ async def transfer_download( async def transfer_all_downloads( self, - local_dir: str | Path = "/Users/Volodymyr/projects/narada/remote_downloads", + local_dir: str | Path = "", *, timeout: float | None = None, ) -> list[Path]: From e6e648a9ba13d08f9f84192926a97244b280ce6a Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Mon, 16 Feb 2026 18:49:17 -0500 Subject: [PATCH 5/8] improve code quality --- packages/narada/src/narada/cloud_downloads.py | 99 +++++++------------ 1 file changed, 36 insertions(+), 63 deletions(-) diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index 151a40d..d22c843 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -21,10 +21,17 @@ from playwright.async_api import Browser logger = logging.getLogger(__name__) +logger.setLevel(logging.INFO) DEFAULT_REMOTE_DOWNLOAD_DIR = "/tmp/remote_downloads" CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB +# CDP Browser.downloadProgress state values +_STATE_IN_PROGRESS = "inProgress" +_STATE_COMPLETED = "completed" +_STATE_CANCELED = "canceled" +_STATE_INTERRUPTED = "interrupted" + @dataclass class DownloadInfo: @@ -57,11 +64,11 @@ def __init__( self._remote_download_dir = remote_download_dir self._session_id = session_id self._on_download_complete = on_download_complete - # guid -> {filename, state, received} + # guid -> dict with keys "filename", "state", "received" self._downloads: dict[str, dict[str, Any]] = {} - # guid -> asyncio.Event (set when the download reaches a terminal state) + # guid -> Event set when that download reaches a terminal state self._done_events: dict[str, asyncio.Event] = {} - self._cdp_session: Any | None = None # playwright CDPSession + self._cdp_session: Any | None = None self._browser: Browser | None = None async def setup(self, browser: Browser) -> None: @@ -90,10 +97,9 @@ async def setup(self, browser: Browser) -> None: async def _on_download_begin(self, event: dict[str, Any]) -> None: guid: str = event.get("guid", "") filename: str = event.get("suggestedFilename", "download") - print(f"[cloud_downloads] Download started: {filename} (guid: {guid})") self._downloads[guid] = { "filename": filename, - "state": "inProgress", + "state": _STATE_IN_PROGRESS, "received": 0, } self._done_events[guid] = asyncio.Event() @@ -107,33 +113,22 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: self._downloads[guid]["state"] = state self._downloads[guid]["received"] = received - if state == "completed": - filename = self._downloads.get(guid, {}).get("filename", guid) - print( - f"[cloud_downloads] Download completed: {filename} ({received:,} bytes)" - ) + if state in (_STATE_COMPLETED, _STATE_CANCELED, _STATE_INTERRUPTED): if guid in self._done_events: self._done_events[guid].set() + + if state == _STATE_COMPLETED: + filename = self._downloads.get(guid, {}).get("filename", guid) if self._on_download_complete and self._session_id: - print( - f"[cloud_downloads] Running transfer in executor (session_id={self._session_id}, filename={filename})" - ) - loop = asyncio.get_event_loop() + loop = asyncio.get_running_loop() loop.run_in_executor( None, lambda: self._on_download_complete( self._session_id, guid, filename ), ) - else: - print( - "[cloud_downloads] No transfer: on_download_complete or session_id not set" - ) - elif state in ("canceled", "interrupted"): - filename = self._downloads.get(guid, {}).get("filename", guid) - print(f"[cloud_downloads] Download {state}: {filename}") - if guid in self._done_events: - self._done_events[guid].set() + elif state in (_STATE_CANCELED, _STATE_INTERRUPTED): + pass async def wait_for_download( self, *, timeout: float | None = None @@ -145,32 +140,28 @@ async def wait_for_download( Returns ``None`` on timeout or if the download was canceled/interrupted. """ - print("[cloud_downloads] wait_for_download called") # Find first download that hasn't finished yet, or the most recent completed one # that hasn't been consumed. target_guid: str | None = None for guid, info in self._downloads.items(): - if info["state"] == "inProgress": + if info["state"] == _STATE_IN_PROGRESS: target_guid = guid break if target_guid is None: # All existing downloads are already done; wait for a new one by polling. # We do a simple poll loop so we can detect newly arriving downloads. - deadline = ( - asyncio.get_event_loop().time() + timeout - if timeout is not None - else None - ) + loop = asyncio.get_running_loop() + deadline = (loop.time() + timeout) if timeout is not None else None while True: for guid, info in self._downloads.items(): if guid not in self._done_events or not self._done_events[guid].is_set(): - if info["state"] == "inProgress": + if info["state"] == _STATE_IN_PROGRESS: target_guid = guid break if target_guid is not None: break - if deadline is not None and asyncio.get_event_loop().time() >= deadline: + if deadline is not None and loop.time() >= deadline: return None await asyncio.sleep(0.5) @@ -187,7 +178,7 @@ async def wait_for_download( return None info = self._downloads[target_guid] - if info["state"] != "completed": + if info["state"] != _STATE_COMPLETED: return None return DownloadInfo( @@ -206,13 +197,8 @@ async def wait_for_all( successfully. Downloads that were canceled or interrupted are skipped. """ if not self._downloads: - print("[cloud_downloads] No downloads were captured") return [] - print( - f"[cloud_downloads] Waiting for {len(self._downloads)} download(s) to complete..." - ) - # Gather all done-events with an optional timeout. waiter = asyncio.gather(*(ev.wait() for ev in self._done_events.values())) try: @@ -225,7 +211,7 @@ async def wait_for_all( results: list[DownloadInfo] = [] for guid, info in self._downloads.items(): - if info["state"] == "completed": + if info["state"] == _STATE_COMPLETED: results.append( DownloadInfo( guid=guid, @@ -234,14 +220,7 @@ async def wait_for_all( size=info["received"], ) ) - else: - print( - f"[cloud_downloads] Skipping {info['filename']} -- ended with state: {info['state']}" - ) - print( - f"[cloud_downloads] {len(results)}/{len(self._downloads)} download(s) succeeded" - ) return results @@ -280,9 +259,6 @@ async def download_remote_file_to_local( try: read_start_ts = time.strftime("%H:%M:%S", time.localtime()) - print( - f"[cloud_downloads] Reading remote file: {remote_file_path} -> {local_path.name} (started at {read_start_ts})" - ) # Enable Fetch domain to intercept file:// responses await cdp.send( @@ -300,8 +276,8 @@ async def _on_request_paused(event: dict[str, Any]) -> None: "Fetch.takeResponseBodyAsStream", {"requestId": request_id} ) stream_handle_holder["handle"] = stream_result.get("stream") - except Exception as exc: - print(f"[cloud_downloads] takeResponseBodyAsStream failed: {exc}") + except Exception: + pass finally: fetch_done.set() @@ -322,21 +298,24 @@ async def _on_request_paused(event: dict[str, Any]) -> None: try: await asyncio.wait_for(fetch_done.wait(), timeout=30) except asyncio.TimeoutError: - print("[cloud_downloads] Timeout waiting for Fetch intercept") + logger.warning("Timeout waiting for Fetch intercept") return None await cdp.send("Fetch.disable") stream_handle = stream_handle_holder.get("handle") if not stream_handle: - print("[cloud_downloads] No stream handle obtained") + logger.warning("No stream handle obtained") return None # Stream the file contents to local disk in chunks. stream_start = time.perf_counter() start_ts = time.strftime("%H:%M:%S", time.localtime()) - print( - f"[cloud_downloads] Streaming file from remote to local: {remote_file_path} -> {local_path} (started at {start_ts})" + logger.info( + "Streaming file from remote to local: %s -> %s (started at %s)", + remote_file_path, + local_path, + start_ts, ) downloaded = 0 @@ -361,22 +340,16 @@ async def _on_request_paused(event: dict[str, Any]) -> None: await cdp.send("IO.close", {"handle": stream_handle}) stream_elapsed = time.perf_counter() - stream_start - end_ts = time.strftime("%H:%M:%S", time.localtime()) - print( - f"[cloud_downloads] Transfer complete: {local_path} ({downloaded:,} bytes) in {stream_elapsed:.2f}s (finished at {end_ts})" - ) logger.info( - "Streamed file from AgentCore to local: %s -> %s (%s bytes, %.2fs, finished at %s)", - remote_file_path, + "Transfer complete: %s (%s bytes) in %.2fs", local_path, f"{downloaded:,}", stream_elapsed, - end_ts, ) return local_path except Exception as exc: - print(f"[cloud_downloads] Transfer failed: {exc}") + logger.exception("Transfer failed: %s", exc) return None finally: From 35e8d03f3b15ff075847d6ffd01a9811f8184329 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Tue, 17 Feb 2026 17:14:59 -0500 Subject: [PATCH 6/8] upd cloud downloads --- .gitignore | 3 ++ examples/cloud_browser.py | 4 +- packages/narada/src/narada/client.py | 34 +++++++++++-- packages/narada/src/narada/cloud_downloads.py | 49 +++++++++++++++++-- packages/narada/src/narada/window.py | 49 +++++++++++++++++-- 5 files changed, 127 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index f62f237..0ffd3c6 100644 --- a/.gitignore +++ b/.gitignore @@ -18,3 +18,6 @@ wheels/ .DS_Store *.*~ + +# Cloud downloads +cloud_downloads/ \ No newline at end of file diff --git a/examples/cloud_browser.py b/examples/cloud_browser.py index 59c1983..a90f133 100644 --- a/examples/cloud_browser.py +++ b/examples/cloud_browser.py @@ -10,7 +10,7 @@ async def main() -> None: # Open a cloud browser window and initialize the Narada UI agent. window = await narada.open_and_initialize_cloud_browser_window( session_name="my-cloud-browser-session", # Optional: label the session - session_timeout=3600, # Optional: session timeout in seconds + session_timeout=3600, # Optional: timeout in seconds, up to 8 hours, default is 30 minutes ) # Run a task in this browser window. @@ -21,7 +21,9 @@ async def main() -> None: ) ) + # All downloads are placed in the cloud_downloads directory. print("Response:", response.model_dump_json(indent=2)) + # The cloud session is still running after exiting the context manager. # You can save the session ID for later reconnection or management. diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index e8b6676..f691689 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import concurrent.futures import logging import os import subprocess @@ -17,7 +18,12 @@ NaradaTimeoutError, NaradaUnsupportedBrowserError, ) -from narada.cloud_downloads import CDPDownloadHandler +from pathlib import Path + +from narada.cloud_downloads import ( + CDPDownloadHandler, + make_default_on_download_complete_callback, +) from narada_core.models import _SdkConfig from packaging.version import Version from playwright._impl._errors import Error as PlaywrightError @@ -74,6 +80,7 @@ def __init__( api_key = api_key or os.environ["NARADA_API_KEY"] self._auth_headers = {"x-api-key": api_key} self._console = Console() + self._pending_download_futures: list[concurrent.futures.Future] = [] async def __aenter__(self) -> Narada: await self._validate_sdk_config() @@ -86,6 +93,16 @@ async def __aexit__(self, *args: Any) -> None: if self._playwright_context_manager is None: return + # Wait for any in-flight download transfers (from default callback) so the + # browser is not closed until transfers finish. Await so the event loop can + # run the transfer coroutines (they were scheduled via run_coroutine_threadsafe). + for fut in self._pending_download_futures: + try: + await asyncio.wait_for(asyncio.wrap_future(fut), timeout=300) + except (asyncio.TimeoutError, Exception): + pass + self._pending_download_futures.clear() + await self._playwright_context_manager.__aexit__(*args) self._playwright_context_manager = None self._playwright = None @@ -234,6 +251,7 @@ async def _initialize_cloud_browser_window( login_url: str, cdp_auth_headers: dict[str, str], ) -> CloudBrowserWindow: + print(f"\n\ninitialize_cloud_browser_window called") assert self._playwright is not None # Connect to browser via CDP with authentication headers @@ -273,10 +291,20 @@ async def _initialize_cloud_browser_window( ) await self._fix_download_behavior(side_panel_page) - # Set up browser-level CDP download handler to capture downloads from any tab + # Set up browser-level CDP download handler to capture downloads from any tab. + # When no callback is provided, use a default that transfers each completed + # download to ./cloud_downloads// so concurrent sessions do not overwrite. + on_download_complete = config.on_download_complete + if on_download_complete is None: + loop = asyncio.get_running_loop() + base_dir = Path.cwd() / "cloud_downloads" + on_download_complete = make_default_on_download_complete_callback( + browser, loop, base_dir, self._pending_download_futures + ) + download_handler = CDPDownloadHandler( session_id=session_id, - on_download_complete=config.on_download_complete, + on_download_complete=on_download_complete, ) await download_handler.setup(browser) diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index d22c843..0b2d892 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -43,6 +43,43 @@ class DownloadInfo: size: int +def local_path_for_session_download( + base_dir: str | Path, + session_id: str, + filename: str, +) -> Path: + """Build the local path for a session download so concurrent sessions do not overwrite. + + Files are placed under ``base_dir / session_id / filename``. + """ + return Path(base_dir) / session_id / filename + + +def make_default_on_download_complete_callback( + browser: "Browser", + loop: asyncio.AbstractEventLoop, + base_dir: str | Path, + pending_futures: list[Any], +) -> Callable[[str, str, str], None]: + """Return a sync callback (session_id, guid, filename) -> None that transfers each + completed download to ``base_dir / session_id / filename`` and appends the + transfer future to *pending_futures* so the caller can wait before closing the browser. + """ + base_dir = Path(base_dir) + + def on_download_complete(session_id: str, guid: str, filename: str) -> None: + local_path = local_path_for_session_download(base_dir, session_id, filename) + local_path.parent.mkdir(parents=True, exist_ok=True) + remote_path = f"{DEFAULT_REMOTE_DOWNLOAD_DIR}/{guid}" + future = asyncio.run_coroutine_threadsafe( + download_remote_file_to_local(browser, remote_path, local_path), + loop, + ) + pending_futures.append(future) + + return on_download_complete + + class CDPDownloadHandler: """Tracks downloads on a remote cloud browser via a browser-level CDP session. @@ -73,6 +110,7 @@ def __init__( async def setup(self, browser: Browser) -> None: """Attach to the browser and start listening for download events.""" + print(f"\n\nCDPDownloadHandler.setup called") self._browser = browser self._cdp_session = await browser.new_browser_cdp_session() @@ -84,7 +122,6 @@ async def setup(self, browser: Browser) -> None: "eventsEnabled": True, }, ) - self._cdp_session.on( "Browser.downloadWillBegin", lambda event: asyncio.create_task(self._on_download_begin(event)), @@ -95,6 +132,7 @@ async def setup(self, browser: Browser) -> None: ) async def _on_download_begin(self, event: dict[str, Any]) -> None: + print(f"\n\nCDPDownloadHandler._on_download_begin called") guid: str = event.get("guid", "") filename: str = event.get("suggestedFilename", "download") self._downloads[guid] = { @@ -108,6 +146,7 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: guid: str = event.get("guid", "") state: str = event.get("state", "") received: int = event.get("receivedBytes", 0) + print(f"\n\nCDPDownloadHandler._on_download_progress called: {guid} {state} {received}") if guid in self._downloads: self._downloads[guid]["state"] = state @@ -118,6 +157,7 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: self._done_events[guid].set() if state == _STATE_COMPLETED: + print(f"\n\nCDPDownloadHandler._on_download_progress completed: {guid}. _on_download_complete={self._on_download_complete}") filename = self._downloads.get(guid, {}).get("filename", guid) if self._on_download_complete and self._session_id: loop = asyncio.get_running_loop() @@ -133,6 +173,7 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: async def wait_for_download( self, *, timeout: float | None = None ) -> DownloadInfo | None: + print(f"\n\nCDPDownloadHandler.wait_for_download called") """Wait for the next download to complete and return its info. If no download events have been received yet, this will block until one @@ -196,6 +237,7 @@ async def wait_for_all( Returns a list of :class:`DownloadInfo` for every download that completed successfully. Downloads that were canceled or interrupted are skipped. """ + print(f"\n\nCDPDownloadHandler.wait_for_all called") if not self._downloads: return [] @@ -248,6 +290,7 @@ async def download_remote_file_to_local( Returns: The resolved local :class:`~pathlib.Path`, or ``None`` on failure. """ + print(f"\n\ndownload_remote_file_to_local called") local_path = Path(local_path) local_path.parent.mkdir(parents=True, exist_ok=True) @@ -311,7 +354,7 @@ async def _on_request_paused(event: dict[str, Any]) -> None: # Stream the file contents to local disk in chunks. stream_start = time.perf_counter() start_ts = time.strftime("%H:%M:%S", time.localtime()) - logger.info( + logger.warning( "Streaming file from remote to local: %s -> %s (started at %s)", remote_file_path, local_path, @@ -340,7 +383,7 @@ async def _on_request_paused(event: dict[str, Any]) -> None: await cdp.send("IO.close", {"handle": stream_handle}) stream_elapsed = time.perf_counter() - stream_start - logger.info( + logger.warning( "Transfer complete: %s (%s bytes) in %.2fs", local_path, f"{downloaded:,}", diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index acbdb1f..a0a0d9d 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -58,7 +58,11 @@ from narada.config import BrowserConfig from playwright.async_api import Browser -from narada.cloud_downloads import CDPDownloadHandler, DownloadInfo +from narada.cloud_downloads import ( + CDPDownloadHandler, + DownloadInfo, + local_path_for_session_download, +) logger = logging.getLogger(__name__) @@ -733,9 +737,11 @@ async def transfer_all_downloads( This is a convenience wrapper that calls :meth:`wait_for_all` on the download handler and then :meth:`transfer_download` for each result. + Files are saved under ``local_dir / session_id / filename`` so concurrent + sessions do not overwrite. Args: - local_dir: Directory to save files into. Created automatically. + local_dir: Base directory to save files into. Created automatically. timeout: Maximum seconds to wait for downloads to finish. Returns: @@ -755,7 +761,8 @@ async def transfer_all_downloads( transferred: list[Path] = [] for dl in completed: - dest = local_dir / dl.filename + dest = local_path_for_session_download(local_dir, self._session_id, dl.filename) + dest.parent.mkdir(parents=True, exist_ok=True) result = await self.transfer_download(dl, dest) if result is not None: transferred.append(result) @@ -766,12 +773,44 @@ async def transfer_all_downloads( @override - async def close(self, *, timeout: int | None = None) -> None: + async def close( + self, + *, + timeout: int | None = None, + local_download_dir: str | Path | None = None, + download_wait_timeout: float = 60.0, + ) -> None: """Stops the cloud browser session. Unlike local browser windows where close() closes a single window, this stops the entire cloud session since the serverless container manages the browser lifecycle. + + Before disconnecting, any downloads captured by the CDP handler are transferred + to *local_download_dir* (default: ``./cloud_downloads``), so file downloads + from the session are available locally when using the SDK without a backend + callback. """ + if self._browser is not None and self._download_handler is not None: + download_dir = ( + Path(local_download_dir) + if local_download_dir is not None + else Path.cwd() / "cloud_downloads" + ) + try: + paths = await self.transfer_all_downloads( + download_dir, timeout=download_wait_timeout + ) + if paths: + logger.info( + "Transferred %d download(s) to %s before closing", + len(paths), + download_dir, + ) + except Exception as exc: + logger.warning( + "Failed to transfer downloads before close: %s", exc + ) + # Disconnect Playwright from the browser if self._browser is not None: try: @@ -810,7 +849,7 @@ async def _stop_cloud_browser_session( f"{base_url}/cloud-browser/stop-cloud-browser-session", headers=auth_headers, json={"session_id": session_id}, - timeout=aiohttp.ClientTimeout(total=timeout or 10), + timeout=aiohttp.ClientTimeout(total=timeout or 80), ) as resp: if resp.ok: response_data = await resp.json() From 775e7a3e7e35ee9f4a87ca217ed7a778e1637723 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Tue, 17 Feb 2026 17:18:28 -0500 Subject: [PATCH 7/8] remove print logs --- packages/narada/src/narada/client.py | 1 - packages/narada/src/narada/cloud_downloads.py | 11 ++--------- packages/narada/src/narada/window.py | 2 -- 3 files changed, 2 insertions(+), 12 deletions(-) diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index f691689..66923e9 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -251,7 +251,6 @@ async def _initialize_cloud_browser_window( login_url: str, cdp_auth_headers: dict[str, str], ) -> CloudBrowserWindow: - print(f"\n\ninitialize_cloud_browser_window called") assert self._playwright is not None # Connect to browser via CDP with authentication headers diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index 0b2d892..bacbf56 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -110,7 +110,6 @@ def __init__( async def setup(self, browser: Browser) -> None: """Attach to the browser and start listening for download events.""" - print(f"\n\nCDPDownloadHandler.setup called") self._browser = browser self._cdp_session = await browser.new_browser_cdp_session() @@ -132,7 +131,6 @@ async def setup(self, browser: Browser) -> None: ) async def _on_download_begin(self, event: dict[str, Any]) -> None: - print(f"\n\nCDPDownloadHandler._on_download_begin called") guid: str = event.get("guid", "") filename: str = event.get("suggestedFilename", "download") self._downloads[guid] = { @@ -146,7 +144,6 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: guid: str = event.get("guid", "") state: str = event.get("state", "") received: int = event.get("receivedBytes", 0) - print(f"\n\nCDPDownloadHandler._on_download_progress called: {guid} {state} {received}") if guid in self._downloads: self._downloads[guid]["state"] = state @@ -157,7 +154,6 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: self._done_events[guid].set() if state == _STATE_COMPLETED: - print(f"\n\nCDPDownloadHandler._on_download_progress completed: {guid}. _on_download_complete={self._on_download_complete}") filename = self._downloads.get(guid, {}).get("filename", guid) if self._on_download_complete and self._session_id: loop = asyncio.get_running_loop() @@ -173,7 +169,6 @@ async def _on_download_progress(self, event: dict[str, Any]) -> None: async def wait_for_download( self, *, timeout: float | None = None ) -> DownloadInfo | None: - print(f"\n\nCDPDownloadHandler.wait_for_download called") """Wait for the next download to complete and return its info. If no download events have been received yet, this will block until one @@ -237,7 +232,6 @@ async def wait_for_all( Returns a list of :class:`DownloadInfo` for every download that completed successfully. Downloads that were canceled or interrupted are skipped. """ - print(f"\n\nCDPDownloadHandler.wait_for_all called") if not self._downloads: return [] @@ -290,7 +284,6 @@ async def download_remote_file_to_local( Returns: The resolved local :class:`~pathlib.Path`, or ``None`` on failure. """ - print(f"\n\ndownload_remote_file_to_local called") local_path = Path(local_path) local_path.parent.mkdir(parents=True, exist_ok=True) @@ -354,7 +347,7 @@ async def _on_request_paused(event: dict[str, Any]) -> None: # Stream the file contents to local disk in chunks. stream_start = time.perf_counter() start_ts = time.strftime("%H:%M:%S", time.localtime()) - logger.warning( + logger.info( "Streaming file from remote to local: %s -> %s (started at %s)", remote_file_path, local_path, @@ -383,7 +376,7 @@ async def _on_request_paused(event: dict[str, Any]) -> None: await cdp.send("IO.close", {"handle": stream_handle}) stream_elapsed = time.perf_counter() - stream_start - logger.warning( + logger.info( "Transfer complete: %s (%s bytes) in %.2fs", local_path, f"{downloaded:,}", diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index a0a0d9d..95bc9f4 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -694,7 +694,6 @@ async def wait_for_download( Returns :class:`~narada.cloud_downloads.DownloadInfo` on success, or ``None`` on timeout / cancellation. """ - print("[cloud_browser_window] wait_for_download called") if self._download_handler is None: logger.warning("Download handler not available on this window") return None @@ -747,7 +746,6 @@ async def transfer_all_downloads( Returns: List of local paths for successfully transferred files. """ - print("[cloud_browser_window] transfer_all_downloads called") if self._download_handler is None or self._browser is None: logger.warning( "Download handler / browser not available -- cannot transfer files" From 768bd1417b51eb49b265d740ff925096ed0cb382 Mon Sep 17 00:00:00 2001 From: Volodymyr Kasaraba Date: Tue, 17 Feb 2026 18:19:16 -0500 Subject: [PATCH 8/8] upd code quality --- examples/cloud_browser.py | 4 +- packages/narada/src/narada/client.py | 3 +- packages/narada/src/narada/cloud_downloads.py | 72 ++++++++++++++++--- packages/narada/src/narada/window.py | 24 +------ 4 files changed, 66 insertions(+), 37 deletions(-) diff --git a/examples/cloud_browser.py b/examples/cloud_browser.py index a90f133..57c5ea3 100644 --- a/examples/cloud_browser.py +++ b/examples/cloud_browser.py @@ -10,7 +10,7 @@ async def main() -> None: # Open a cloud browser window and initialize the Narada UI agent. window = await narada.open_and_initialize_cloud_browser_window( session_name="my-cloud-browser-session", # Optional: label the session - session_timeout=3600, # Optional: timeout in seconds, up to 8 hours, default is 30 minutes + session_timeout=1800, # Optional: timeout in seconds, up to 8 hours, default is 30 minutes ) # Run a task in this browser window. @@ -21,7 +21,7 @@ async def main() -> None: ) ) - # All downloads are placed in the cloud_downloads directory. + # All downloads are placed in the narada-python-sdk/cloud_downloads directory. print("Response:", response.model_dump_json(indent=2)) diff --git a/packages/narada/src/narada/client.py b/packages/narada/src/narada/client.py index 66923e9..db6bb7b 100644 --- a/packages/narada/src/narada/client.py +++ b/packages/narada/src/narada/client.py @@ -291,8 +291,7 @@ async def _initialize_cloud_browser_window( await self._fix_download_behavior(side_panel_page) # Set up browser-level CDP download handler to capture downloads from any tab. - # When no callback is provided, use a default that transfers each completed - # download to ./cloud_downloads// so concurrent sessions do not overwrite. + # When no callback is provided, use a default that transfers files to local disk. on_download_complete = config.on_download_complete if on_download_complete is None: loop = asyncio.get_running_loop() diff --git a/packages/narada/src/narada/cloud_downloads.py b/packages/narada/src/narada/cloud_downloads.py index bacbf56..7657076 100644 --- a/packages/narada/src/narada/cloud_downloads.py +++ b/packages/narada/src/narada/cloud_downloads.py @@ -26,6 +26,11 @@ DEFAULT_REMOTE_DOWNLOAD_DIR = "/tmp/remote_downloads" CHUNK_SIZE = 4 * 1024 * 1024 # 4 MB +# When wait_for_all is called with empty _downloads, poll this long for downloads +# to appear (avoids race where post-action transfer misses a download that +# starts moments later). +_WAIT_FOR_ALL_EMPTY_GRACE_SECONDS = 5.0 + # CDP Browser.downloadProgress state values _STATE_IN_PROGRESS = "inProgress" _STATE_COMPLETED = "completed" @@ -172,31 +177,55 @@ async def wait_for_download( """Wait for the next download to complete and return its info. If no download events have been received yet, this will block until one - arrives and finishes (or the *timeout* expires). + arrives and finishes (or the *timeout* expires). If a download already + completed before this call (e.g. post-action usage), returns that + completed download immediately. Returns ``None`` on timeout or if the download was canceled/interrupted. """ - # Find first download that hasn't finished yet, or the most recent completed one - # that hasn't been consumed. + # Prefer an in-progress download so we wait for it; otherwise use an + # already-completed one (post-action usage). target_guid: str | None = None + last_completed_guid: str | None = None for guid, info in self._downloads.items(): if info["state"] == _STATE_IN_PROGRESS: target_guid = guid break + if info["state"] == _STATE_COMPLETED: + last_completed_guid = guid + + if target_guid is None and last_completed_guid is not None: + # At least one download already completed; return the most recent. + info = self._downloads[last_completed_guid] + return DownloadInfo( + guid=last_completed_guid, + filename=info["filename"], + remote_path=f"{self._remote_download_dir}/{last_completed_guid}", + size=info["received"], + ) if target_guid is None: - # All existing downloads are already done; wait for a new one by polling. - # We do a simple poll loop so we can detect newly arriving downloads. + # No in-progress and no completed; wait for a new one by polling. loop = asyncio.get_running_loop() deadline = (loop.time() + timeout) if timeout is not None else None while True: for guid, info in self._downloads.items(): - if guid not in self._done_events or not self._done_events[guid].is_set(): - if info["state"] == _STATE_IN_PROGRESS: - target_guid = guid - break + if info["state"] == _STATE_IN_PROGRESS: + target_guid = guid + break + if info["state"] == _STATE_COMPLETED: + last_completed_guid = guid if target_guid is not None: break + if last_completed_guid is not None: + # New completed download appeared while polling + info = self._downloads[last_completed_guid] + return DownloadInfo( + guid=last_completed_guid, + filename=info["filename"], + remote_path=f"{self._remote_download_dir}/{last_completed_guid}", + size=info["received"], + ) if deadline is not None and loop.time() >= deadline: return None await asyncio.sleep(0.5) @@ -231,9 +260,30 @@ async def wait_for_all( Returns a list of :class:`DownloadInfo` for every download that completed successfully. Downloads that were canceled or interrupted are skipped. + + When no downloads are tracked yet (e.g. post-action call before + Browser.downloadWillBegin has fired), waits a short time for downloads + to appear so in-flight download events are not missed. """ - if not self._downloads: - return [] + loop = asyncio.get_running_loop() + deadline: float | None = None + if timeout is not None: + deadline = loop.time() + timeout + empty_wait_until: float = ( + loop.time() + _WAIT_FOR_ALL_EMPTY_GRACE_SECONDS + if timeout is None + else deadline + ) + + # Avoid race: if _downloads is empty, wait for at least one download to + # appear (post-action usage where download starts moments later). + while not self._downloads: + if loop.time() >= empty_wait_until: + return [] + await asyncio.sleep(0.5) + + if deadline is not None: + timeout = max(0, deadline - loop.time()) # Gather all done-events with an optional timeout. waiter = asyncio.gather(*(ev.wait() for ev in self._done_events.values())) diff --git a/packages/narada/src/narada/window.py b/packages/narada/src/narada/window.py index 95bc9f4..024a6b5 100644 --- a/packages/narada/src/narada/window.py +++ b/packages/narada/src/narada/window.py @@ -689,11 +689,7 @@ def cloud_browser_session_id(self) -> str: async def wait_for_download( self, *, timeout: float | None = None ) -> DownloadInfo | None: - """Wait for the next download to complete on the remote browser. - - Returns :class:`~narada.cloud_downloads.DownloadInfo` on success, or - ``None`` on timeout / cancellation. - """ + """Wait for the next download to complete on the remote browser.""" if self._download_handler is None: logger.warning("Download handler not available on this window") return None @@ -705,16 +701,7 @@ async def transfer_download( local_path: str | Path, ) -> Path | None: """Transfer a single completed download from the remote browser to local disk. - Uses CDP Fetch + IO.read to stream the file in chunks over the WebSocket. - - Args: - download: A :class:`~narada.cloud_downloads.DownloadInfo` obtained from - :meth:`wait_for_download`. - local_path: Where to save the file locally. - - Returns: - The resolved local path, or ``None`` on failure. """ if self._browser is None: logger.warning("Browser reference not available -- cannot transfer file") @@ -738,13 +725,6 @@ async def transfer_all_downloads( download handler and then :meth:`transfer_download` for each result. Files are saved under ``local_dir / session_id / filename`` so concurrent sessions do not overwrite. - - Args: - local_dir: Base directory to save files into. Created automatically. - timeout: Maximum seconds to wait for downloads to finish. - - Returns: - List of local paths for successfully transferred files. """ if self._download_handler is None or self._browser is None: logger.warning( @@ -812,7 +792,7 @@ async def close( # Disconnect Playwright from the browser if self._browser is not None: try: - self._browser.close() + await self._browser.close() except Exception: pass self._browser = None