Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions cognite_toolkit/_cdf_tk/client/_toolkit_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .api.agents import AgentsAPI
from .api.alerts import AlertsAPI
from .api.annotations import AnnotationsAPI
from .api.apps import AppsAPI
from .api.assets import AssetsAPI
from .api.canvas import IndustrialCanvasAPI
from .api.cognite_files import CogniteFilesAPI
Expand Down Expand Up @@ -64,6 +65,7 @@ class ToolAPI:
def __init__(self, http_client: HTTPClient, console: Console) -> None:
self.http_client = http_client
self.agents = AgentsAPI(http_client)
self.apps = AppsAPI(http_client)
self.annotations = AnnotationsAPI(http_client)
self.assets = AssetsAPI(http_client)
self.cognite_files = CogniteFilesAPI(http_client)
Expand Down
149 changes: 149 additions & 0 deletions cognite_toolkit/_cdf_tk/client/api/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
"""AppsAPI: Custom apps deployed via the CDF App Hosting API."""

import json
from collections.abc import Iterable, Sequence
from pathlib import Path

from cognite_toolkit._cdf_tk.client.http_client import HTTPClient, RequestMessage
from cognite_toolkit._cdf_tk.client.http_client._data_classes import FailedResponse, SuccessResponse
from cognite_toolkit._cdf_tk.client.http_client._exception import ToolkitAPIError
from cognite_toolkit._cdf_tk.client.identifiers import AppVersionId
from cognite_toolkit._cdf_tk.client.resource_classes.app import AppRequest, AppResponse


class AppsAPI:
"""Client for the CDF App Hosting API (POST /apphosting/...)."""

def __init__(self, http_client: HTTPClient) -> None:
self._http_client = http_client

def _url(self, path: str) -> str:
return self._http_client.config.create_api_url(path)

def ensure_app(self, item: AppRequest) -> None:
"""POST /apphosting/apps — create the app if it does not exist; 409 = already exists (idempotent)."""
Comment thread
Magssch marked this conversation as resolved.
request = RequestMessage(
endpoint_url=self._url("/apphosting/apps"),
method="POST",
body_content={"items": [item.dump()]},
)
result = self._http_client.request_single_retries(request)
if isinstance(result, SuccessResponse) or (isinstance(result, FailedResponse) and result.status_code == 409):
return
result.get_success_or_raise(request)

def upload_version(
self,
external_id: str,
version: str,
entrypoint: str,
zip_path: Path,
) -> None:
"""POST /apphosting/apps/{externalId}/versions — multipart upload of the zipped app."""
result = self._http_client.request_raw_retries(
method="POST",
url=self._url(f"/apphosting/apps/{external_id}/versions"),
files={"file": ("app.zip", zip_path, "application/zip")},
data={"version": version, "entryPath": entrypoint},
add_auth=True,
)
# 409 means this exact version already exists — treat as success (idempotent).
if isinstance(result, SuccessResponse) or (isinstance(result, FailedResponse) and result.status_code == 409):
return
raise ToolkitAPIError(message=result.body, code=result.status_code)

def update_version(self, external_id: str, version: str, update: dict) -> None:
Comment thread
Magssch marked this conversation as resolved.
"""POST /apphosting/apps/{externalId}/versions/update — apply one or more field updates to a version."""
request = RequestMessage(
endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/update"),
method="POST",
body_content={"items": [{"version": version, "update": update}]},
)
self._http_client.request_single_retries(request).get_success_or_raise(request)

def retrieve_version(self, external_id: str, version: str, ignore_unknown_ids: bool = False) -> AppResponse | None:
"""Retrieve version metadata + app-level name/description in two calls."""
version_request = RequestMessage(
endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/{version}"),
method="GET",
)
version_result = self._http_client.request_single_retries(version_request)
if not isinstance(version_result, SuccessResponse):
if (
isinstance(version_result, FailedResponse)
and version_result.status_code in (400, 404)
and ignore_unknown_ids
):
return None
version_result.get_success_or_raise(version_request)
return None

version_data = json.loads(version_result.body)

app_request = RequestMessage(
endpoint_url=self._url(f"/apphosting/apps/{external_id}"),
method="GET",
)
app_result = self._http_client.request_single_retries(app_request)
app_data = json.loads(app_result.body) if isinstance(app_result, SuccessResponse) else {}

return AppResponse(
external_id=version_data.get("appExternalId", external_id),
version=version_data.get("version", version),
name=app_data.get("name", ""),
description=app_data.get("description"),
lifecycle_state=version_data.get("lifecycleState", "DRAFT"),
alias=version_data.get("alias"),
entrypoint=version_data.get("entrypoint", "index.html"),
)

def iterate(self, limit: int | None = 100) -> Iterable[list[AppResponse]]:
"""POST /apphosting/versions/list — paginated list of all versions across all apps."""
cursor: str | None = None
page_limit = min(limit, 1000) if limit is not None else 1000
fetched = 0
while True:
body: dict = {"limit": page_limit}
if cursor:
body["cursor"] = cursor
request = RequestMessage(
endpoint_url=self._url("/apphosting/versions/list"),
method="POST",
body_content=body,
)
result = self._http_client.request_single_retries(request)
if not isinstance(result, SuccessResponse):
result.get_success_or_raise(request)
break

data = json.loads(result.body)
page_items = [
AppResponse(
external_id=item["appExternalId"],
version=item["version"],
name="",
description=None,
lifecycle_state=item.get("lifecycleState", "DRAFT"),
alias=item.get("alias"),
entrypoint=item.get("entrypoint", "index.html"),
)
for item in data.get("items", [])
]
if page_items:
yield page_items
fetched += len(page_items)

cursor = data.get("nextCursor")
if not cursor or (limit is not None and fetched >= limit):
break

def delete_version(self, external_id: str, versions: Sequence[AppVersionId]) -> None:
"""POST /apphosting/apps/{externalId}/versions/delete — delete specific versions of an app."""
if not versions:
return
request = RequestMessage(
endpoint_url=self._url(f"/apphosting/apps/{external_id}/versions/delete"),
method="POST",
body_content={"items": [{"version": v.version} for v in versions]},
)
self._http_client.request_single_retries(request).get_success_or_raise(request)
66 changes: 50 additions & 16 deletions cognite_toolkit/_cdf_tk/client/http_client/_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import time
from collections import deque
from collections.abc import Iterable, MutableMapping, Sequence, Set
from pathlib import Path
from typing import Literal, TypeVar

import httpx
Expand Down Expand Up @@ -263,37 +264,70 @@ def request_raw_retries(
self,
method: Literal["GET", "POST", "PUT", "DELETE"],
url: str,
content: bytes | Iterable[bytes],
content: bytes | Iterable[bytes] | None = None,
headers: dict[str, str] | None = None,
max_retries: int | None = None,
files: dict[str, tuple[str, Path, str]] | None = None,
data: dict[str, str] | None = None,
add_auth: bool = False,
) -> SuccessResponse | FailedResponse:
"""Send a raw HTTP request with retry logic but without authentication headers.
"""Send a raw HTTP request with retry logic.

This is useful for uploading to signed URLs (e.g., GCS signed URLs) where
authentication is embedded in the URL and adding auth headers would cause errors.
By default does not add authentication headers, which makes it suitable for
uploading to signed URLs (e.g., GCS signed URLs) where authentication is
embedded in the URL. Set add_auth=True for authenticated CDF endpoints.

Pass either content (raw bytes/stream) or files+data (multipart form upload).
When files is provided, each file is re-opened on every retry attempt.

Args:
method: HTTP method to use.
url: The URL to send the request to.
content: The content to send. Can be bytes or an iterable of bytes for streaming.
headers: Optional headers to include in the request.
content: Raw bytes or streaming content. Mutually exclusive with files.
headers: Optional extra headers to include in the request.
max_retries: Maximum number of retries. Defaults to the client's max_retries setting.
files: Multipart file parts as {field_name: (filename, path, mime_type)}.
data: Multipart text fields, sent alongside files.
add_auth: When True, adds CDF authentication and SDK headers.

Returns:
HTTPResult: The result of the HTTP request, either SuccessResponse or FailedResponse.
"""
retries = max_retries if max_retries is not None else self._max_retries
if add_auth:
merged_headers = dict(self._create_headers(disable_gzip=True))
del merged_headers["Content-Type"] # httpx sets this for multipart/form-data
if headers:
merged_headers.update(headers)
request_headers: dict[str, str] | None = merged_headers
else:
request_headers = headers
attempt = 0
last_error_code: int = -1
while attempt <= retries:
try:
response = self.session.request(
method=method,
url=url,
content=content,
headers=headers,
follow_redirects=False,
)
if files is not None:
open_files = {name: (fname, path.open("rb"), mime) for name, (fname, path, mime) in files.items()}
try:
response = self.session.request(
method=method,
url=url,
files=open_files,
data=data,
headers=request_headers,
follow_redirects=False,
)
finally:
for _name, (_fname, file_obj, _mime) in open_files.items():
file_obj.close()
else:
response = self.session.request(
method=method,
url=url,
content=content,
headers=request_headers,
follow_redirects=False,
)
if 200 <= response.status_code < 300:
return SuccessResponse(
status_code=response.status_code,
Expand Down Expand Up @@ -331,10 +365,10 @@ def request_raw_retries(
return FailedResponse(
status_code=last_error_code,
body=f"Request failed after {attempt} attempts: {e!s}",
error=ErrorDetails(code=last_error_code, message=f"Request failed after {attempt} attempts: {e!s}"),
error=ErrorDetails(
code=last_error_code, message=f"Request failed after {attempt} attempts: {e!s}"
),
)

# Should not reach here, but just in case
return FailedResponse(
status_code=last_error_code,
body=f"Request failed after {attempt} attempts.",
Expand Down
77 changes: 76 additions & 1 deletion tests/test_unit/test_cdf_tk/test_client/test_cdf_apis.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import gzip
import json
from pathlib import Path
from typing import Any
from unittest.mock import MagicMock

Expand All @@ -12,6 +13,7 @@
from cognite_toolkit._cdf_tk.client._resource_base import ResponseResource
from cognite_toolkit._cdf_tk.client.api.alert_channels import AlertChannelsAPI
from cognite_toolkit._cdf_tk.client.api.annotations import AnnotationsAPI
from cognite_toolkit._cdf_tk.client.api.apps import AppsAPI
from cognite_toolkit._cdf_tk.client.api.chart_scheduled_calculations import ChartScheduledCalculationsAPI
from cognite_toolkit._cdf_tk.client.api.charts_folders import ChartFoldersAPI
from cognite_toolkit._cdf_tk.client.api.charts_monitoring_job import ChartMonitoringJobsAPI
Expand All @@ -34,10 +36,11 @@
from cognite_toolkit._cdf_tk.client.cdf_client import CDFResourceAPI, PagedResponse
from cognite_toolkit._cdf_tk.client.cdf_client.api import APIMethod
from cognite_toolkit._cdf_tk.client.http_client import HTTPClient
from cognite_toolkit._cdf_tk.client.identifiers import ExternalId, PrincipalId
from cognite_toolkit._cdf_tk.client.identifiers import AppVersionId, ExternalId, PrincipalId
from cognite_toolkit._cdf_tk.client.request_classes.filters import AnnotationFilter
from cognite_toolkit._cdf_tk.client.resource_classes.alert_channel import AlertChannelResponse
from cognite_toolkit._cdf_tk.client.resource_classes.annotation import AnnotationResponse
from cognite_toolkit._cdf_tk.client.resource_classes.app import AppRequest
from cognite_toolkit._cdf_tk.client.resource_classes.chart_folder import (
ChartFolderRequest,
ChartFolderResponse,
Expand Down Expand Up @@ -1226,6 +1229,78 @@ def test_alert_channels_api_list_method(
assert len(listed) == 1
assert listed[0].dump() == resource

def test_apps_api_methods(self, toolkit_config: ToolkitClientConfig, respx_mock: respx.MockRouter, tmp_path: Path) -> None:
config = toolkit_config
api = AppsAPI(HTTPClient(config))
app_external_id = "my-app"
version = "1.0.0"
app_request = AppRequest(external_id=app_external_id, version=version, name="My App")
zip_path = tmp_path / "app.zip"
zip_path.write_bytes(b"fake-zip")
version_json = {
"appExternalId": app_external_id,
"version": version,
"lifecycleState": "DRAFT",
"entrypoint": "index.html",
}
app_json = {"externalId": app_external_id, "name": "My App"}

# Test ensure_app (200 and 409 both succeed)
respx_mock.post(config.create_api_url("/apphosting/apps")).mock(return_value=httpx.Response(status_code=200))
api.ensure_app(app_request)
respx_mock.post(config.create_api_url("/apphosting/apps")).mock(return_value=httpx.Response(status_code=409))
api.ensure_app(app_request)

# Test upload_version (200 and 409 both succeed)
respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions")).mock(
return_value=httpx.Response(status_code=200)
)
api.upload_version(app_external_id, version, "index.html", zip_path)
respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions")).mock(
return_value=httpx.Response(status_code=409)
)
api.upload_version(app_external_id, version, "index.html", zip_path)

# Test update_version
respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/update")).mock(
return_value=httpx.Response(status_code=200, json={"items": [version_json]})
)
api.update_version(app_external_id, version, {"lifecycleState": {"set": "PUBLISHED"}})

# Test retrieve_version (two calls merged into one response)
respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/{version}")).mock(
return_value=httpx.Response(status_code=200, json=version_json)
)
respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}")).mock(
return_value=httpx.Response(status_code=200, json=app_json)
)
retrieved = api.retrieve_version(app_external_id, version)
assert retrieved is not None
assert retrieved.version == version
assert retrieved.name == "My App"
assert retrieved.lifecycle_state == "DRAFT"

# Test retrieve_version with 404 and ignore_unknown_ids
respx_mock.get(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/{version}")).mock(
return_value=httpx.Response(status_code=404)
)
assert api.retrieve_version(app_external_id, version, ignore_unknown_ids=True) is None

# Test iterate
respx_mock.post(config.create_api_url("/apphosting/versions/list")).mock(
return_value=httpx.Response(status_code=200, json={"items": [version_json]})
)
batches = list(api.iterate(limit=10))
assert len(batches) == 1
assert batches[0][0].version == version

# Test delete_version
respx_mock.post(config.create_api_url(f"/apphosting/apps/{app_external_id}/versions/delete")).mock(
return_value=httpx.Response(status_code=200)
)
api.delete_version(app_external_id, [AppVersionId(app_external_id=app_external_id, version=version)])
assert len(respx_mock.calls) >= 1


def test_task_move_type_to_field_handles_none_validation_data() -> None:
"""Pydantic may supply ValidationInfo.data as None; avoid 'in' on None (deploy dry-run)."""
Expand Down
Loading