diff --git a/cognite_toolkit/_cdf_tk/client/_toolkit_client.py b/cognite_toolkit/_cdf_tk/client/_toolkit_client.py index 1218a0509f..39255a6696 100644 --- a/cognite_toolkit/_cdf_tk/client/_toolkit_client.py +++ b/cognite_toolkit/_cdf_tk/client/_toolkit_client.py @@ -10,6 +10,7 @@ from .api.agents import AgentsAPI from .api.alerts import AlertsAPI from .api.annotations import AnnotationsAPI +from .api.apps import AppsAPI from .api.assets import AssetsAPI from .api.canvas import IndustrialCanvasAPI from .api.cognite_files import CogniteFilesAPI @@ -64,6 +65,7 @@ class ToolAPI: def __init__(self, http_client: HTTPClient, console: Console) -> None: self.http_client = http_client self.agents = AgentsAPI(http_client) + self.apps = AppsAPI(http_client) self.annotations = AnnotationsAPI(http_client) self.assets = AssetsAPI(http_client) self.cognite_files = CogniteFilesAPI(http_client) diff --git a/cognite_toolkit/_cdf_tk/client/api/apps.py b/cognite_toolkit/_cdf_tk/client/api/apps.py new file mode 100644 index 0000000000..ca1dcb36c1 --- /dev/null +++ b/cognite_toolkit/_cdf_tk/client/api/apps.py @@ -0,0 +1,149 @@ +"""AppsAPI: Custom apps deployed via the CDF App Hosting API.""" + +import json +from collections.abc import Iterable, Sequence +from pathlib import Path + +from cognite_toolkit._cdf_tk.client.http_client import HTTPClient, RequestMessage +from cognite_toolkit._cdf_tk.client.http_client._data_classes import FailedResponse, SuccessResponse +from cognite_toolkit._cdf_tk.client.http_client._exception import ToolkitAPIError +from cognite_toolkit._cdf_tk.client.identifiers import AppVersionId +from cognite_toolkit._cdf_tk.client.resource_classes.app import AppRequest, AppResponse + + +class AppsAPI: + """Client for the CDF App Hosting API (POST /apphosting/...).""" + + def __init__(self, http_client: HTTPClient) -> None: + self._http_client = http_client + + def _url(self, path: str) -> str: + return self._http_client.config.create_api_url(path) + + def ensure_app(self, item: AppRequest) -> None: + """POST /apphosting/apps — create the app if it does not exist; 409 = already exists (idempotent).""" + request = RequestMessage( + endpoint_url=self._url("/apphosting/apps"), + method="POST", + body_content={"items": [item.dump()]}, + ) + result = self._http_client.request_single_retries(request) + if isinstance(result, SuccessResponse) or (isinstance(result, FailedResponse) and result.status_code == 409): + return + result.get_success_or_raise(request) + + def upload_version( + self, + external_id: str, + version: str, + entrypoint: str, + zip_path: Path, + ) -> None: + """POST /apphosting/apps/{externalId}/versions — multipart upload of the zipped app.""" + result = self._http_client.request_raw_retries( + method="POST", + url=self._url(f"/apphosting/apps/{external_id}/versions"), + files={"file": ("app.zip", zip_path, "application/zip")}, + data={"version": version, "entryPath": entrypoint}, + add_auth=True, + ) + # 409 means this exact version already exists — treat as success (idempotent). + if isinstance(result, SuccessResponse) or (isinstance(result, FailedResponse) and result.status_code == 409): + return + raise ToolkitAPIError(message=result.body, code=result.status_code) + + def update_version(self, external_id: str, version: str, update: dict) -> None: + """POST /apphosting/apps/{externalId}/versions/update — apply one or more field updates to a version.""" + request = RequestMessage( + endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/update"), + method="POST", + body_content={"items": [{"version": version, "update": update}]}, + ) + self._http_client.request_single_retries(request).get_success_or_raise(request) + + def retrieve_version(self, external_id: str, version: str, ignore_unknown_ids: bool = False) -> AppResponse | None: + """Retrieve version metadata + app-level name/description in two calls.""" + version_request = RequestMessage( + endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/{version}"), + method="GET", + ) + version_result = self._http_client.request_single_retries(version_request) + if not isinstance(version_result, SuccessResponse): + if ( + isinstance(version_result, FailedResponse) + and version_result.status_code in (400, 404) + and ignore_unknown_ids + ): + return None + version_result.get_success_or_raise(version_request) + return None + + version_data = json.loads(version_result.body) + + app_request = RequestMessage( + endpoint_url=self._url(f"/apphosting/apps/{external_id}"), + method="GET", + ) + app_result = self._http_client.request_single_retries(app_request) + app_data = json.loads(app_result.body) if isinstance(app_result, SuccessResponse) else {} + + return AppResponse( + external_id=version_data.get("appExternalId", external_id), + version=version_data.get("version", version), + name=app_data.get("name", ""), + description=app_data.get("description"), + lifecycle_state=version_data.get("lifecycleState", "DRAFT"), + alias=version_data.get("alias"), + entrypoint=version_data.get("entrypoint", "index.html"), + ) + + def iterate(self, limit: int | None = 100) -> Iterable[list[AppResponse]]: + """POST /apphosting/versions/list — paginated list of all versions across all apps.""" + cursor: str | None = None + page_limit = min(limit, 1000) if limit is not None else 1000 + fetched = 0 + while True: + body: dict = {"limit": page_limit} + if cursor: + body["cursor"] = cursor + request = RequestMessage( + endpoint_url=self._url("/apphosting/versions/list"), + method="POST", + body_content=body, + ) + result = self._http_client.request_single_retries(request) + if not isinstance(result, SuccessResponse): + result.get_success_or_raise(request) + break + + data = json.loads(result.body) + page_items = [ + AppResponse( + external_id=item["appExternalId"], + version=item["version"], + name="", + description=None, + lifecycle_state=item.get("lifecycleState", "DRAFT"), + alias=item.get("alias"), + entrypoint=item.get("entrypoint", "index.html"), + ) + for item in data.get("items", []) + ] + if page_items: + yield page_items + fetched += len(page_items) + + cursor = data.get("nextCursor") + if not cursor or (limit is not None and fetched >= limit): + break + + def delete_version(self, external_id: str, versions: Sequence[AppVersionId]) -> None: + """POST /apphosting/apps/{externalId}/versions/delete — delete specific versions of an app.""" + if not versions: + return + request = RequestMessage( + endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/delete"), + method="POST", + body_content={"items": [{"version": v.version} for v in versions]}, + ) + self._http_client.request_single_retries(request).get_success_or_raise(request) diff --git a/cognite_toolkit/_cdf_tk/client/http_client/_client.py b/cognite_toolkit/_cdf_tk/client/http_client/_client.py index d7d70f7970..18cd4712bf 100644 --- a/cognite_toolkit/_cdf_tk/client/http_client/_client.py +++ b/cognite_toolkit/_cdf_tk/client/http_client/_client.py @@ -3,6 +3,7 @@ import time from collections import deque from collections.abc import Iterable, MutableMapping, Sequence, Set +from pathlib import Path from typing import Literal, TypeVar import httpx @@ -263,37 +264,70 @@ def request_raw_retries( self, method: Literal["GET", "POST", "PUT", "DELETE"], url: str, - content: bytes | Iterable[bytes], + content: bytes | Iterable[bytes] | None = None, headers: dict[str, str] | None = None, max_retries: int | None = None, + files: dict[str, tuple[str, Path, str]] | None = None, + data: dict[str, str] | None = None, + add_auth: bool = False, ) -> SuccessResponse | FailedResponse: - """Send a raw HTTP request with retry logic but without authentication headers. + """Send a raw HTTP request with retry logic. - This is useful for uploading to signed URLs (e.g., GCS signed URLs) where - authentication is embedded in the URL and adding auth headers would cause errors. + By default does not add authentication headers, which makes it suitable for + uploading to signed URLs (e.g., GCS signed URLs) where authentication is + embedded in the URL. Set add_auth=True for authenticated CDF endpoints. + + Pass either content (raw bytes/stream) or files+data (multipart form upload). + When files is provided, each file is re-opened on every retry attempt. Args: method: HTTP method to use. url: The URL to send the request to. - content: The content to send. Can be bytes or an iterable of bytes for streaming. - headers: Optional headers to include in the request. + content: Raw bytes or streaming content. Mutually exclusive with files. + headers: Optional extra headers to include in the request. max_retries: Maximum number of retries. Defaults to the client's max_retries setting. + files: Multipart file parts as {field_name: (filename, path, mime_type)}. + data: Multipart text fields, sent alongside files. + add_auth: When True, adds CDF authentication and SDK headers. Returns: HTTPResult: The result of the HTTP request, either SuccessResponse or FailedResponse. """ retries = max_retries if max_retries is not None else self._max_retries + if add_auth: + merged_headers = dict(self._create_headers(disable_gzip=True)) + del merged_headers["Content-Type"] # httpx sets this for multipart/form-data + if headers: + merged_headers.update(headers) + request_headers: dict[str, str] | None = merged_headers + else: + request_headers = headers attempt = 0 last_error_code: int = -1 while attempt <= retries: try: - response = self.session.request( - method=method, - url=url, - content=content, - headers=headers, - follow_redirects=False, - ) + if files is not None: + open_files = {name: (fname, path.open("rb"), mime) for name, (fname, path, mime) in files.items()} + try: + response = self.session.request( + method=method, + url=url, + files=open_files, + data=data, + headers=request_headers, + follow_redirects=False, + ) + finally: + for _name, (_fname, file_obj, _mime) in open_files.items(): + file_obj.close() + else: + response = self.session.request( + method=method, + url=url, + content=content, + headers=request_headers, + follow_redirects=False, + ) if 200 <= response.status_code < 300: return SuccessResponse( status_code=response.status_code, @@ -331,10 +365,10 @@ def request_raw_retries( return FailedResponse( status_code=last_error_code, body=f"Request failed after {attempt} attempts: {e!s}", - error=ErrorDetails(code=last_error_code, message=f"Request failed after {attempt} attempts: {e!s}"), + error=ErrorDetails( + code=last_error_code, message=f"Request failed after {attempt} attempts: {e!s}" + ), ) - - # Should not reach here, but just in case return FailedResponse( status_code=last_error_code, body=f"Request failed after {attempt} attempts.", diff --git a/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py b/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py index a50cad8183..3317719e49 100644 --- a/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py +++ b/tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py @@ -1,5 +1,6 @@ import gzip import json +from pathlib import Path from typing import Any from unittest.mock import MagicMock @@ -12,6 +13,7 @@ from cognite_toolkit._cdf_tk.client._resource_base import ResponseResource from cognite_toolkit._cdf_tk.client.api.alert_channels import AlertChannelsAPI from cognite_toolkit._cdf_tk.client.api.annotations import AnnotationsAPI +from cognite_toolkit._cdf_tk.client.api.apps import AppsAPI from cognite_toolkit._cdf_tk.client.api.chart_scheduled_calculations import ChartScheduledCalculationsAPI from cognite_toolkit._cdf_tk.client.api.charts_folders import ChartFoldersAPI from cognite_toolkit._cdf_tk.client.api.charts_monitoring_job import ChartMonitoringJobsAPI @@ -34,10 +36,11 @@ from cognite_toolkit._cdf_tk.client.cdf_client import CDFResourceAPI, PagedResponse from cognite_toolkit._cdf_tk.client.cdf_client.api import APIMethod from cognite_toolkit._cdf_tk.client.http_client import HTTPClient -from cognite_toolkit._cdf_tk.client.identifiers import ExternalId, PrincipalId +from cognite_toolkit._cdf_tk.client.identifiers import AppVersionId, ExternalId, PrincipalId from cognite_toolkit._cdf_tk.client.request_classes.filters import AnnotationFilter from cognite_toolkit._cdf_tk.client.resource_classes.alert_channel import AlertChannelResponse from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse +from cognite_toolkit._cdf_tk.client.resource_classes.app import AppRequest from cognite_toolkit._cdf_tk.client.resource_classes.chart_folder import ( ChartFolderRequest, ChartFolderResponse, @@ -1226,6 +1229,78 @@ def test_alert_channels_api_list_method( assert len(listed) == 1 assert listed[0].dump() == resource + def test_apps_api_methods(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter, tmp_path: Path) -> None: + config = toolkit_config + api = AppsAPI(HTTPClient(config)) + app_external_id = "my-app" + version = "1.0.0" + app_request = AppRequest(external_id=app_external_id, version=version, name="My App") + zip_path = tmp_path / "app.zip" + zip_path.write_bytes(b"fake-zip") + version_json = { + "appExternalId": app_external_id, + "version": version, + "lifecycleState": "DRAFT", + "entrypoint": "index.html", + } + app_json = {"externalId": app_external_id, "name": "My App"} + + # Test ensure_app (200 and 409 both succeed) + respx_mock.post(config.create_api_url("/apphosting/apps")).mock(return_value=httpx.Response(status_code=200)) + api.ensure_app(app_request) + respx_mock.post(config.create_api_url("/apphosting/apps")).mock(return_value=httpx.Response(status_code=409)) + api.ensure_app(app_request) + + # Test upload_version (200 and 409 both succeed) + respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions")).mock( + return_value=httpx.Response(status_code=200) + ) + api.upload_version(app_external_id, version, "index.html", zip_path) + respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions")).mock( + return_value=httpx.Response(status_code=409) + ) + api.upload_version(app_external_id, version, "index.html", zip_path) + + # Test update_version + respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/update")).mock( + return_value=httpx.Response(status_code=200, json={"items": [version_json]}) + ) + api.update_version(app_external_id, version, {"lifecycleState": {"set": "PUBLISHED"}}) + + # Test retrieve_version (two calls merged into one response) + respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/{version}")).mock( + return_value=httpx.Response(status_code=200, json=version_json) + ) + respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}")).mock( + return_value=httpx.Response(status_code=200, json=app_json) + ) + retrieved = api.retrieve_version(app_external_id, version) + assert retrieved is not None + assert retrieved.version == version + assert retrieved.name == "My App" + assert retrieved.lifecycle_state == "DRAFT" + + # Test retrieve_version with 404 and ignore_unknown_ids + respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/{version}")).mock( + return_value=httpx.Response(status_code=404) + ) + assert api.retrieve_version(app_external_id, version, ignore_unknown_ids=True) is None + + # Test iterate + respx_mock.post(config.create_api_url("/apphosting/versions/list")).mock( + return_value=httpx.Response(status_code=200, json={"items": [version_json]}) + ) + batches = list(api.iterate(limit=10)) + assert len(batches) == 1 + assert batches[0][0].version == version + + # Test delete_version + respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/delete")).mock( + return_value=httpx.Response(status_code=200) + ) + api.delete_version(app_external_id, [AppVersionId(app_external_id=app_external_id, version=version)]) + assert len(respx_mock.calls) >= 1 + def test_task_move_type_to_field_handles_none_validation_data() -> None: """Pydantic may supply ValidationInfo.data as None; avoid 'in' on None (deploy dry-run)."""