diff --git a/.gitignore b/.gitignore index 7962c07..ab28c09 100644 --- a/.gitignore +++ b/.gitignore @@ -2,6 +2,9 @@ data sample_data +# local environment overrides — see dev.env.example for the schema +dev.env + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/dev.env b/dev.env deleted file mode 100644 index 8d97a49..0000000 --- a/dev.env +++ /dev/null @@ -1,6 +0,0 @@ -INPUT_DIR=/data/input -OUTPUT_DIR=/data/output -ENVIRONMENT=local -PENNSIEVE_API_HOST=https://api.pennsieve.net -PENNSIEVE_API_HOST2=https://api2.pennsieve.net -IMPORTER_ENABLED=false diff --git a/dev.env.example b/dev.env.example new file mode 100644 index 0000000..c8a1eb1 --- /dev/null +++ b/dev.env.example @@ -0,0 +1,24 @@ +# Copy this file to dev.env and fill in the secrets locally. +# dev.env is gitignored; never commit real keys. + +INPUT_DIR=/data/input +OUTPUT_DIR=/data/output +ENVIRONMENT=local +PENNSIEVE_API_HOST=https://api.pennsieve.net +PENNSIEVE_API_HOST2=https://api2.pennsieve.net +IMPORTER_ENABLED=true + +# Pennsieve API credentials (generated in the Pennsieve UI under Personal API Tokens). +PENNSIEVE_API_KEY= +PENNSIEVE_API_SECRET= + +# Workflow instance / integration UUID. Leave blank for ad-hoc local runs; +# a UUID is generated on the fly. Set to a real id when triggered by the runner. +INTEGRATION_ID= + +# Per-converter pipeline name (mef-asset, edf-asset, etc.) recorded +# on viewer_asset rows. Each deployment overrides this. +ASSET_NAME=mef-asset + +# Set true to fall back to the pre-viewer-asset upload flow. +LEGACY_IMPORT_FLOW=false diff --git a/processor/asset_uploader.py b/processor/asset_uploader.py new file mode 100644 index 0000000..1c83d60 --- /dev/null +++ b/processor/asset_uploader.py @@ -0,0 +1,109 @@ +"""S3 chunk uploader using STS credentials returned by packages-service. + +The credentials are short-lived (~1 hour). For very long uploads, +this will need a refresh mechanism (see packages-service POST +/assets/{id}/upload-credentials, not yet implemented). +""" + +import logging +import os +from concurrent.futures import ThreadPoolExecutor +from dataclasses import dataclass + +import boto3 +import botocore.exceptions +from clients.packages_assets_client import UploadCredentials + +log = logging.getLogger() + + +@dataclass +class ChunkUploadResult: + """Per-file upload outcome — used downstream to register ranges.""" + + relative_key: str # the chunk's path relative to the asset's prefix + full_key: str # bucket-relative S3 key (key_prefix + relative_key) + + +class AssetUploader: + """Uploads files to a viewer-asset's S3 prefix using STS credentials.""" + + def __init__(self, credentials: UploadCredentials, max_workers: int = 4): + self._creds = credentials + self._max_workers = max_workers + + def _build_client(self): + """Create a fresh boto3 S3 client bound to the STS session.""" + return boto3.client( + "s3", + aws_access_key_id=self._creds.access_key_id, + aws_secret_access_key=self._creds.secret_access_key, + aws_session_token=self._creds.session_token, + region_name=self._creds.region, + ) + + def upload_files( + self, + files: list[tuple[str, str]], + ) -> list[ChunkUploadResult]: + """Upload local files in parallel. + + Args: + files: list of (local_path, relative_key) tuples. The relative + key is the path UNDER the asset's prefix; this method + prepends key_prefix when forming the full S3 key. + + Returns: + list of ChunkUploadResult in input order. + + Raises: + botocore.exceptions.ClientError on any failure (including + ExpiredToken). 
No partial-success aggregation: if any file + fails, the exception propagates and the orchestrator should + treat the asset as failed. + """ + s3 = self._build_client() + results: list[ChunkUploadResult] = [None] * len(files) # type: ignore[list-item] + + def _put(index: int, local_path: str, relative_key: str) -> None: + full_key = self._creds.key_prefix + relative_key + try: + s3.upload_file(local_path, self._creds.bucket, full_key) + except botocore.exceptions.ClientError as e: + log.error( + "failed to upload %s to s3://%s/%s: %s", + local_path, + self._creds.bucket, + full_key, + e, + ) + raise + results[index] = ChunkUploadResult(relative_key=relative_key, full_key=full_key) + + with ThreadPoolExecutor(max_workers=self._max_workers) as executor: + futures = [executor.submit(_put, i, local, relative) for i, (local, relative) in enumerate(files)] + # Wait for all and surface the first exception. + for future in futures: + future.result() + + log.info( + "uploaded %d files to s3://%s/%s", + len(files), + self._creds.bucket, + self._creds.key_prefix, + ) + return results + + +def relative_key_for_chunk(chunk_filename: str) -> str: + """Map a chunk file in the local output dir to its S3 relative key. + + Today's writer produces files named: + channel-{idx:05d}_{start_us}_{end_us}.bin.gz + or after channel-id substitution in importer: + {channel_node_id}_{start_us}_{end_us}.bin.gz + + The relative key is just the basename — no nested directories. + timeseries-service validates this is a "safe relative path". + """ + return os.path.basename(chunk_filename) diff --git a/processor/clients/__init__.py b/processor/clients/__init__.py index 350c3a3..db91e52 100644 --- a/processor/clients/__init__.py +++ b/processor/clients/__init__.py @@ -5,7 +5,14 @@ from .base_client import SessionManager as SessionManager from .import_client import ImportClient as ImportClient from .import_client import ImportFile as ImportFile +from .packages_assets_client import CreatedAsset as CreatedAsset +from .packages_assets_client import PackagesAssetsClient as PackagesAssetsClient +from .packages_assets_client import UploadCredentials as UploadCredentials +from .packages_assets_client import ViewerAsset as ViewerAsset from .packages_client import PackagesClient as PackagesClient from .timeseries_client import TimeSeriesClient as TimeSeriesClient +from .timeseries_ranges_client import CreateRangesResult as CreateRangesResult +from .timeseries_ranges_client import RangeChunk as RangeChunk +from .timeseries_ranges_client import TimeSeriesRangesClient as TimeSeriesRangesClient from .workflow_client import WorkflowClient as WorkflowClient from .workflow_client import WorkflowInstance as WorkflowInstance diff --git a/processor/clients/authentication_client.py b/processor/clients/authentication_client.py index 6266b3b..a5b53e1 100644 --- a/processor/clients/authentication_client.py +++ b/processor/clients/authentication_client.py @@ -154,7 +154,5 @@ def refresh(self) -> str: self._session_token = self._cognito.refresh_token(self._refresh_token, self._session_token) else: log.info("no refresh token, re-authenticating with API key/secret") - self._session_token, self._refresh_token = self._cognito.authenticate( - self._api_key, self._api_secret - ) + self._session_token, self._refresh_token = self._cognito.authenticate(self._api_key, self._api_secret) return self._session_token diff --git a/processor/clients/base_client.py b/processor/clients/base_client.py index d93d59d..861787f 100644 --- a/processor/clients/base_client.py 
+++ b/processor/clients/base_client.py @@ -5,6 +5,23 @@ log = logging.getLogger() +# (connect_timeout_seconds, read_timeout_seconds) +DEFAULT_TIMEOUT = (5, 30) + + +def _is_client_error(exc: requests.RequestException) -> bool: + """Tells the backoff library when to give up. + + Give up on 4xx HTTP responses (the request itself is wrong; a retry + won't change the answer). Keep retrying everything else: 5xx, plus + connection-level errors with no response at all (timeouts, refused + connections, DNS failures). + """ + if isinstance(exc, requests.HTTPError) and exc.response is not None: + return 400 <= exc.response.status_code < 500 + return False + + # encapsulates a shared API session and token refresh class SessionManager: def __init__(self, auth_provider): diff --git a/processor/clients/packages_assets_client.py b/processor/clients/packages_assets_client.py new file mode 100644 index 0000000..f962cfa --- /dev/null +++ b/processor/clients/packages_assets_client.py @@ -0,0 +1,217 @@ +import json +import logging +from dataclasses import dataclass +from typing import Optional + +import backoff +import requests + +from .base_client import DEFAULT_TIMEOUT, BaseClient, _is_client_error + +log = logging.getLogger() + + +@dataclass +class UploadCredentials: + """Short-lived STS credentials returned alongside a created asset.""" + + access_key_id: str + secret_access_key: str + session_token: str + expiration: str + bucket: str + region: str + key_prefix: str + + @classmethod + def from_dict(cls, data: dict) -> "UploadCredentials": + return cls( + access_key_id=data["access_key_id"], + secret_access_key=data["secret_access_key"], + session_token=data["session_token"], + expiration=data["expiration"], + bucket=data["bucket"], + region=data["region"], + key_prefix=data["key_prefix"], + ) + + +@dataclass +class ViewerAsset: + id: str + dataset_id: str + name: str + asset_type: str + status: str + package_ids: list[str] + + @classmethod + def from_dict(cls, data: dict) -> "ViewerAsset": + return cls( + id=data["id"], + dataset_id=data["dataset_id"], + name=data["name"], + asset_type=data["asset_type"], + status=data["status"], + package_ids=data.get("package_ids", []), + ) + + +@dataclass +class CreatedAsset: + """Result of POST /assets: an asset row plus STS upload credentials.""" + + asset: ViewerAsset + upload_credentials: UploadCredentials + + +class PackagesAssetsClient(BaseClient): + """Client for packages-service viewer-asset endpoints.""" + + def __init__(self, api_host2, session_manager): + super().__init__(session_manager) + self.base_url = f"{api_host2}/packages" + + def _auth_headers(self) -> dict: + return { + "accept": "application/json", + "content-type": "application/json", + "Authorization": f"Bearer {self.session_manager.session_token}", + } + + @backoff.on_exception( + backoff.expo, + requests.RequestException, + max_tries=3, + giveup=_is_client_error, + ) + @BaseClient.retry_with_refresh + def create_asset( + self, + dataset_id: str, + package_ids: list[str], + name: str, + asset_type: str, + properties: Optional[dict] = None, + ) -> CreatedAsset: + """Create a viewer_asset, link the given packages, and return STS upload creds.""" + url = f"{self.base_url}/assets" + params = {"dataset_id": dataset_id} + body: dict = { + "name": name, + "asset_type": asset_type, + "package_ids": package_ids, + } + if properties is not None: + body["properties"] = properties + + try: + response = requests.post( + url, + params=params, + headers=self._auth_headers(), + json=body, + 
+                timeout=DEFAULT_TIMEOUT,
+            )
+            response.raise_for_status()
+            data = response.json()
+            return CreatedAsset(
+                asset=ViewerAsset.from_dict(data["asset"]),
+                upload_credentials=UploadCredentials.from_dict(data["upload_credentials"]),
+            )
+        except requests.HTTPError as e:
+            log.error("failed to create viewer asset: %s", e)
+            raise
+        except (KeyError, json.JSONDecodeError) as e:
+            log.error("malformed create-asset response: %s", e)
+            raise
+
+    @backoff.on_exception(
+        backoff.expo,
+        requests.RequestException,
+        max_tries=3,
+        giveup=_is_client_error,
+    )
+    @BaseClient.retry_with_refresh
+    def list_assets_for_package(self, dataset_id: str, package_id: str) -> list[ViewerAsset]:
+        """List viewer_assets attached to a package. Used for idempotent re-runs."""
+        url = f"{self.base_url}/assets"
+        params = {"dataset_id": dataset_id, "package_id": package_id}
+
+        try:
+            response = requests.get(
+                url,
+                params=params,
+                headers=self._auth_headers(),
+                timeout=DEFAULT_TIMEOUT,
+            )
+            response.raise_for_status()
+            data = response.json()
+            return [ViewerAsset.from_dict(a) for a in data.get("assets", [])]
+        except requests.HTTPError as e:
+            log.error("failed to list viewer assets for package %s: %s", package_id, e)
+            raise
+
+    @backoff.on_exception(
+        backoff.expo,
+        requests.RequestException,
+        max_tries=3,
+        giveup=_is_client_error,
+    )
+    @BaseClient.retry_with_refresh
+    def update_asset(
+        self,
+        asset_id: str,
+        dataset_id: str,
+        status: Optional[str] = None,
+        properties: Optional[dict] = None,
+        package_ids: Optional[list[str]] = None,
+    ) -> ViewerAsset:
+        """Patch an asset. Common case: flip status to 'ready' once ingest succeeds."""
+        url = f"{self.base_url}/assets/{asset_id}"
+        params = {"dataset_id": dataset_id}
+        body: dict = {}
+        if status is not None:
+            body["status"] = status
+        if properties is not None:
+            body["properties"] = properties
+        if package_ids is not None:
+            body["package_ids"] = package_ids
+
+        try:
+            response = requests.patch(
+                url,
+                params=params,
+                headers=self._auth_headers(),
+                json=body,
+                timeout=DEFAULT_TIMEOUT,
+            )
+            response.raise_for_status()
+            return ViewerAsset.from_dict(response.json())
+        except requests.HTTPError as e:
+            log.error("failed to update viewer asset %s: %s", asset_id, e)
+            raise
+
+    @backoff.on_exception(
+        backoff.expo,
+        requests.RequestException,
+        max_tries=3,
+        giveup=_is_client_error,
+    )
+    @BaseClient.retry_with_refresh
+    def delete_asset(self, asset_id: str, dataset_id: str) -> None:
+        """Delete an asset. Triggers async S3 cleanup via the cleanup-queue lambda."""
+        url = f"{self.base_url}/assets/{asset_id}"
+        params = {"dataset_id": dataset_id}
+
+        try:
+            response = requests.delete(
+                url,
+                params=params,
+                headers=self._auth_headers(),
+                timeout=DEFAULT_TIMEOUT,
+            )
+            response.raise_for_status()
+        except requests.HTTPError as e:
+            log.error("failed to delete viewer asset %s: %s", asset_id, e)
+            raise
diff --git a/processor/clients/timeseries_client.py b/processor/clients/timeseries_client.py
index c46cddd..2de4800 100644
--- a/processor/clients/timeseries_client.py
+++ b/processor/clients/timeseries_client.py
@@ -43,6 +43,35 @@ def create_channel(self, package_id, channel):
             log.error("failed to create time series channel: %s", e)
             raise e
 
+    @BaseClient.retry_with_refresh
+    def delete_channel(self, package_id, channel_id):
+        """Delete a single channel by node id. Best-effort; logs and
+        re-raises on failure so callers can decide how to handle it.
+ + Used during asset-flow ingest cleanup: when an ingest fails + after channels were created, we delete those channels (along + with deleting the asset itself) so the next run starts clean + rather than tripping over orphaned channels still pointing at + the deleted asset's id. + """ + url = f"{self.api_host}/timeseries/{package_id}/channels/{channel_id}" + headers = { + "Content-type": "application/json", + "Authorization": f"Bearer {self.session_manager.session_token}", + } + + try: + response = requests.delete(url, headers=headers) + response.raise_for_status() + except requests.HTTPError as e: + log.error( + "failed to delete channel %s on package %s: %s", + channel_id, + package_id, + e, + ) + raise + @BaseClient.retry_with_refresh def get_package_channels(self, package_id): url = f"{self.api_host}/timeseries/{package_id}/channels" diff --git a/processor/clients/timeseries_ranges_client.py b/processor/clients/timeseries_ranges_client.py new file mode 100644 index 0000000..7548c05 --- /dev/null +++ b/processor/clients/timeseries_ranges_client.py @@ -0,0 +1,148 @@ +import json +import logging +from dataclasses import dataclass +from typing import Optional + +import backoff +import requests + +from .base_client import DEFAULT_TIMEOUT, BaseClient, _is_client_error + +log = logging.getLogger() + + +@dataclass +class RangeChunk: + """One time-series chunk to register. + + s3_key is RELATIVE to the asset's prefix (no leading slash, no '..', + no 's3://...'). timeseries-service prepends the prefix server-side. + """ + + channel_node_id: str + start: int + end: int + s3_key: str + + def as_dict(self) -> dict: + # JSON keys must be snake_case to match the timeseries-service DTO. + return { + "channel_node_id": self.channel_node_id, + "start": self.start, + "end": self.end, + "s3_key": self.s3_key, + } + + +@dataclass +class CreateRangesResult: + requested: int + created: int + skipped: int + + @classmethod + def from_dict(cls, data: dict) -> "CreateRangesResult": + return cls( + requested=data["requested"], + created=data["created"], + skipped=data["skipped"], + ) + + +class TimeSeriesRangesClient(BaseClient): + """Client for timeseries-service range-registration endpoint. + + Base URL: {api_host2}/timeseries + """ + + # timeseries-service caps request size; chunk client-side if needed. + # Mirrors dto.MaxChunksPerCreateRangeRequest in timeseries-service. + MAX_CHUNKS_PER_REQUEST = 10_000 + + def __init__(self, api_host2, session_manager): + super().__init__(session_manager) + self.base_url = f"{api_host2}/timeseries" + + def _auth_headers(self) -> dict: + return { + "accept": "application/json", + "content-type": "application/json", + "Authorization": f"Bearer {self.session_manager.session_token}", + } + + @backoff.on_exception( + backoff.expo, + requests.RequestException, + max_tries=3, + giveup=_is_client_error, + ) + @BaseClient.retry_with_refresh + def create_ranges( + self, + package_node_id: str, + viewer_asset_id: str, + chunks: list[RangeChunk], + ) -> CreateRangesResult: + """POST a single batch of range chunks. Caller is responsible for + splitting > MAX_CHUNKS_PER_REQUEST into multiple calls via + create_ranges_batched. 
+ """ + if len(chunks) == 0: + return CreateRangesResult(requested=0, created=0, skipped=0) + if len(chunks) > self.MAX_CHUNKS_PER_REQUEST: + raise ValueError(f"too many chunks ({len(chunks)}); max {self.MAX_CHUNKS_PER_REQUEST}") + + url = f"{self.base_url}/package/{package_node_id}/ranges" + body = { + "viewer_asset_id": viewer_asset_id, + "chunks": [c.as_dict() for c in chunks], + } + + try: + response = requests.post( + url, + headers=self._auth_headers(), + json=body, + timeout=DEFAULT_TIMEOUT, + ) + response.raise_for_status() + return CreateRangesResult.from_dict(response.json()) + except requests.HTTPError as e: + # 400 with InvalidChunks payload is the most useful failure to surface + log.error( + "failed to register ranges for package %s asset %s: %s", + package_node_id, + viewer_asset_id, + _format_error_response(e.response), + ) + raise + + def create_ranges_batched( + self, + package_node_id: str, + viewer_asset_id: str, + chunks: list[RangeChunk], + ) -> CreateRangesResult: + """Split chunks into MAX_CHUNKS_PER_REQUEST batches and POST each. + + Returns aggregated counts. Stops on first failure (no rollback — + each successful batch's ranges remain inserted). + """ + total = CreateRangesResult(requested=0, created=0, skipped=0) + for i in range(0, len(chunks), self.MAX_CHUNKS_PER_REQUEST): + batch = chunks[i : i + self.MAX_CHUNKS_PER_REQUEST] + result = self.create_ranges(package_node_id, viewer_asset_id, batch) + total.requested += result.requested + total.created += result.created + total.skipped += result.skipped + return total + + +def _format_error_response(response: Optional[requests.Response]) -> str: + if response is None: + return "" + try: + body = response.json() + return json.dumps(body) + except (ValueError, json.JSONDecodeError): + return response.text diff --git a/processor/config.py b/processor/config.py index 42a16b0..79ece6d 100644 --- a/processor/config.py +++ b/processor/config.py @@ -27,6 +27,17 @@ def __init__(self): self.IMPORTER_ENABLED = getboolenv("IMPORTER_ENABLED", self.ENVIRONMENT != "local") + # Per-converter pipeline name embedded in the viewer_asset row, + # e.g. "mef-asset", "edf-asset". Each deployment of post-timeseries + # sets its own value. + self.ASSET_NAME = os.getenv("ASSET_NAME", "timeseries-asset") + + # When true, fall back to the pre-viewer-asset flow that uploads + # via the Pennsieve import-manifest API. Default false → new flow + # that creates a viewer_asset, uploads via packages-service STS + # creds, and registers ranges via timeseries-service. + self.LEGACY_IMPORT_FLOW = getboolenv("LEGACY_IMPORT_FLOW", False) + def getboolenv(key, default=False): return os.getenv(key, str(default)).lower() in ("true", "1") diff --git a/processor/importer.py b/processor/importer.py index 6107ebd..fb9f041 100644 --- a/processor/importer.py +++ b/processor/importer.py @@ -1,3 +1,15 @@ +"""Pennsieve time-series ingest orchestration. + +Two flows live here, selected by `config.LEGACY_IMPORT_FLOW`: + + - `import_timeseries_via_assets` (default) — the viewer-asset flow. + Creates a viewer_asset via packages-service, uploads chunks to the + asset's S3 prefix using STS credentials, and registers ranges via + timeseries-service's POST /package/{id}/ranges. + + - `import_timeseries_legacy` — the original Pennsieve import-manifest + path. 
+""" import json import logging import os @@ -10,43 +22,427 @@ import backoff import requests from clients import ( + CreatedAsset, ImportClient, ImportFile, + PackagesAssetsClient, PackagesClient, + RangeChunk, TimeSeriesClient, + TimeSeriesRangesClient, WorkflowClient, ) from constants import TIME_SERIES_BINARY_FILE_EXTENSION, TIME_SERIES_METADATA_FILE_EXTENSION from timeseries_channel import TimeSeriesChannel +from processor.asset_uploader import AssetUploader, relative_key_for_chunk + logging.basicConfig(level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s") log = logging.getLogger() -""" -Uses the Pennsieve API to initialize and upload time series files -for import into Pennsieve data ecosystem. +# Pattern for files written by writer.py before channel-id substitution. +_CHANNEL_INDEX_PATTERN = re.compile(r"(channel-\d+)") -# note: this will be moved to a separated post-processor once the analysis pipeline is more -# easily able to handle > 3 processors -""" +# Pattern to pull start/end microsecond timestamps out of a chunk filename: +# channel-00007_1549968912000000_1549968926998750.bin.gz +# N:channel:abc..._1549968912000000_1549968926998750.bin.gz +_CHUNK_TIMESTAMP_PATTERN = re.compile(r"_(\d+)_(\d+)\.bin\.gz$") def import_timeseries( - api_host, api2_host, session_manager, workflow_instance_id, file_directory + config, + session_manager, ): - # gather all the time series files from the output directory - timeseries_data_files = [] - timeseries_channel_files = [] + """Top-level entry point. Dispatches to legacy or asset-aware flow.""" + if config.LEGACY_IMPORT_FLOW: + log.info("LEGACY_IMPORT_FLOW=true; using import-manifest flow") + return import_timeseries_legacy( + config.API_HOST, + config.API_HOST2, + session_manager, + config.WORKFLOW_INSTANCE_ID, + config.OUTPUT_DIR, + ) + log.info("using viewer-asset flow") + return import_timeseries_via_assets( + config.API_HOST, + config.API_HOST2, + session_manager, + config.WORKFLOW_INSTANCE_ID, + config.OUTPUT_DIR, + config.ASSET_NAME, + ) - for root, _, files in os.walk(file_directory): - for file in files: - if file.endswith(TIME_SERIES_METADATA_FILE_EXTENSION): - timeseries_channel_files.append(os.path.join(root, file)) - elif file.endswith(TIME_SERIES_BINARY_FILE_EXTENSION): - timeseries_data_files.append(os.path.join(root, file)) - if len(timeseries_channel_files) == 0 or len(timeseries_data_files) == 0: +# --------------------------------------------------------------------------- +# New flow: viewer_asset + STS upload + timeseries-service POST /ranges +# --------------------------------------------------------------------------- + + +def import_timeseries_via_assets( + api_host, + api2_host, + session_manager, + workflow_instance_id, + file_directory, + asset_name, + asset_type: str = "timeseries", +): + """Asset-aware ingest. + + 1. Resolve target package from the workflow. + 2. Find or create a viewer_asset linked to all workflow packages. + 3. Create channels with viewer_asset_id set. + 4. Rename chunk files to use channel node ids (matching the legacy + naming convention). + 5. Upload chunks to S3 using the asset's STS credentials. + 6. Register ranges via timeseries-service. + 7. Mark the asset 'ready'. + 8. On any failure, delete the asset (cleanup-queue lambda purges S3). 
+ """ + timeseries_data_files, timeseries_channel_files = _collect_timeseries_files(file_directory) + if not timeseries_data_files or not timeseries_channel_files: + log.info("no time series channels or data") + return None + + workflow_client = WorkflowClient(api2_host, session_manager) + workflow_instance = workflow_client.get_workflow_instance(workflow_instance_id) + + packages_client = PackagesClient(api_host, session_manager) + target_package_id = determine_target_package(packages_client, workflow_instance.package_ids) + if not target_package_id: + log.error( + "dataset_id=%s could not determine target time series package", + workflow_instance.dataset_id, + ) + return None + + packages_client.set_timeseries_properties(target_package_id) + log.info("updated package %s with time series properties", target_package_id) + + log.info( + "dataset_id=%s target_package_id=%s asset_name=%s starting asset-flow ingest", + workflow_instance.dataset_id, + target_package_id, + asset_name, + ) + + assets_client = PackagesAssetsClient(api2_host, session_manager) + + asset, upload_credentials = _find_or_create_asset( + assets_client, + dataset_id=workflow_instance.dataset_id, + package_ids=workflow_instance.package_ids, + asset_name=asset_name, + asset_type=asset_type, + ) + + # upload_credentials is None when an already-ready asset was found; + # treat as a successful no-op so workflow re-runs are idempotent. + if upload_credentials is None: + return asset.id + + created_channel_node_ids: list[str] = [] + timeseries_client = TimeSeriesClient(api_host, session_manager) + + try: + + existing_channels = timeseries_client.get_package_channels(target_package_id) + + channels_by_index, created_channel_node_ids = _create_or_resolve_channels( + timeseries_client, + target_package_id, + timeseries_channel_files, + existing_channels, + viewer_asset_id=asset.id, + ) + if not channels_by_index: + raise RuntimeError( + "no channels were resolved from staged metadata files; refusing to mark asset ready with empty data" + ) + + # Rename data files to use channel node ids in their basenames + # (matching the legacy naming convention so timeseries.ranges.location + # aligns with what streaming expects to fetch from S3). + renamed_data_files = _rename_data_files_to_node_ids(timeseries_data_files, channels_by_index) + if not renamed_data_files: + raise RuntimeError( + "no chunk files were resolved from the output directory; refusing to mark asset ready with empty data" + ) + + # Upload to S3 using the STS creds returned by create_asset. + uploader = AssetUploader(upload_credentials) + uploads = uploader.upload_files( + [(local_path, relative_key_for_chunk(local_path)) for local_path in renamed_data_files] + ) + log.info("uploaded %d chunk files for asset %s", len(uploads), asset.id) + + # Register ranges. Build chunks from filenames + channel map. + ranges_client = TimeSeriesRangesClient(api2_host, session_manager) + chunks = _build_range_chunks(uploads, channels_by_index) + result = ranges_client.create_ranges_batched(target_package_id, asset.id, chunks) + log.info( + "registered ranges for asset %s: requested=%d created=%d skipped=%d", + asset.id, + result.requested, + result.created, + result.skipped, + ) + + # status='ready' gates re-run idempotency; let failures propagate + # so cleanup runs and the next attempt starts fresh. 
+ assets_client.update_asset(asset.id, dataset_id=workflow_instance.dataset_id, status="ready") + + except Exception as e: + log.error("asset-flow ingest failed for asset %s: %s", asset.id, e) + # Delete channels before the asset: viewer_asset_id has no FK, so + # the asset delete alone would orphan them and break the next run. + for channel_node_id in created_channel_node_ids: + try: + timeseries_client.delete_channel(target_package_id, channel_node_id) + log.info("deleted channel %s during cleanup", channel_node_id) + except Exception as channel_cleanup_err: + log.error( + "failed to delete channel %s during cleanup: %s", + channel_node_id, + channel_cleanup_err, + ) + + # Now delete the asset row → triggers the S3 cleanup queue. + try: + assets_client.delete_asset(asset.id, dataset_id=workflow_instance.dataset_id) + log.info("queued asset %s for cleanup", asset.id) + except Exception as cleanup_err: + log.error( + "failed to delete failed asset %s — will need manual cleanup: %s", + asset.id, + cleanup_err, + ) + raise + + return asset.id + + +def _find_or_create_asset( + assets_client: PackagesAssetsClient, + dataset_id: str, + package_ids: list[str], + asset_name: str, + asset_type: str, +): + """Find an existing asset for this workflow or create a new one. + + Returns (asset, upload_credentials). upload_credentials is None when + a ready asset already exists (caller skips ingest); otherwise the + asset was just created and the creds are usable for upload. + Non-ready existing assets are deleted and recreated. + """ + match = _find_asset_by_workflow_packages(assets_client, dataset_id, package_ids, asset_name, asset_type) + + if match is not None and match.status == "ready": + log.info( + "asset %s already ready for workflow packages %s; idempotent re-run, skipping ingest", + match.id, + package_ids, + ) + return match, None + + if match is not None: + log.info( + "asset %s in status %r for workflow packages %s; assuming prior run failed, deleting and recreating", + match.id, + match.status, + package_ids, + ) + assets_client.delete_asset(match.id, dataset_id) + + log.info( + "creating new asset %s/%s for dataset %s linking %d package(s)", + asset_name, + asset_type, + dataset_id, + len(package_ids), + ) + created: CreatedAsset = assets_client.create_asset( + dataset_id=dataset_id, + package_ids=package_ids, + name=asset_name, + asset_type=asset_type, + ) + return created.asset, created.upload_credentials + + +def _find_asset_by_workflow_packages( + assets_client: PackagesAssetsClient, + dataset_id: str, + package_ids: list[str], + asset_name: str, + asset_type: str, +): + """Search each workflow package for an asset matching name+type. + + Stops on first hit. Returns None if no package surfaces a match. + """ + for package_id in package_ids: + existing = assets_client.list_assets_for_package(dataset_id, package_id) + match = next( + (asset for asset in existing if asset.name == asset_name and asset.asset_type == asset_type), + None, + ) + if match is not None: + return match + return None + + +def _create_or_resolve_channels( + timeseries_client: TimeSeriesClient, + package_id: str, + timeseries_channel_files: list[str], + existing_channels: list[TimeSeriesChannel], + viewer_asset_id: str, +) -> tuple[dict[str, TimeSeriesChannel], list[str]]: + """Resolve a TimeSeriesChannel for each channel metadata file. + + Reuses an existing channel when name/type/rate match; otherwise + creates one with viewer_asset_id set. + + Returns (channels_by_index, created_channel_node_ids). 
The second + list holds only node ids created by this run, so failure cleanup + can delete them without touching reused channels. + """ + channels: dict[str, TimeSeriesChannel] = {} + created_channel_node_ids: list[str] = [] + for file_path in timeseries_channel_files: + match = _CHANNEL_INDEX_PATTERN.search(os.path.basename(file_path)) + if match is None: + raise RuntimeError(f"channel metadata filename does not match expected channel-NNNNN pattern: {file_path}") + channel_index = match.group(1) + + with open(file_path, "r") as f: + local_channel = TimeSeriesChannel.from_dict(json.load(f)) + local_channel.viewer_asset_id = viewer_asset_id + + existing = next((ec for ec in existing_channels if ec == local_channel), None) + if existing is not None: + if existing.viewer_asset_id != viewer_asset_id: + raise RuntimeError( + f"channel {existing.id} ({existing.name}) on package " + f"{package_id} is linked to viewer_asset_id=" + f"{existing.viewer_asset_id!r} but the current ingest " + f"expects {viewer_asset_id!r}. Resolve manually before " + "re-running." + ) + log.info( + "package_id=%s channel_id=%s reusing existing channel: %s", + package_id, + existing.id, + existing.name, + ) + channel = existing + else: + channel = timeseries_client.create_channel(package_id, local_channel) + created_channel_node_ids.append(channel.id) + log.info( + "package_id=%s channel_id=%s created new channel: %s", + package_id, + channel.id, + channel.name, + ) + + channel.index = channel_index + channels[channel_index] = channel + + return channels, created_channel_node_ids + + +def _rename_data_files_to_node_ids( + timeseries_data_files: list[str], + channels_by_index: dict[str, TimeSeriesChannel], +) -> list[str]: + """Rename chunk binary files in place: channel-{idx} → {channel.node_id}. + + Mirrors the legacy importer's substitution. Returns the list of new + paths (original list is invalidated). + """ + renamed: list[str] = [] + for file_path in timeseries_data_files: + match = _CHANNEL_INDEX_PATTERN.search(os.path.basename(file_path)) + if match is None: + raise RuntimeError(f"chunk filename does not match expected channel-NNNNN_*_*.bin.gz pattern: {file_path}") + channel_index = match.group(1) + channel = channels_by_index.get(channel_index) + if channel is None: + raise RuntimeError( + f"chunk file {file_path} references channel index " + f"{channel_index!r} for which no channel metadata was resolved; " + "every chunk must map to a known channel" + ) + + new_basename = re.sub(_CHANNEL_INDEX_PATTERN, channel.id, os.path.basename(file_path)) + new_path = os.path.join(os.path.dirname(file_path), new_basename) + os.rename(file_path, new_path) + renamed.append(new_path) + return renamed + + +def _build_range_chunks( + uploads, + channels_by_index: dict[str, TimeSeriesChannel], +) -> list[RangeChunk]: + """Convert each upload result into a RangeChunk for POST /ranges. + + The basename embeds {channel.node_id}_{start_us}_{end_us}.bin.gz — + we pull start/end from the filename and look up the channel by + matching the node_id prefix. + """ + # Build a lookup from channel.id (node id) to channel object. + channels_by_node_id = {ch.id: ch for ch in channels_by_index.values()} + chunks: list[RangeChunk] = [] + for upload in uploads: + basename = upload.relative_key + # Basenames are node-id-prefixed at this point (post-rename). 
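+        # e.g. "N:channel:abc_1549968912000000_1549968926998750.bin.gz"
+        # parses to node_id "N:channel:abc" plus start/end from the trailing timestamps.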
+ ts_match = _CHUNK_TIMESTAMP_PATTERN.search(basename) + if ts_match is None: + raise ValueError(f"chunk filename does not contain start/end timestamps: {basename}") + start = int(ts_match.group(1)) + end = int(ts_match.group(2)) + + node_id = basename[: ts_match.start()] + channel = channels_by_node_id.get(node_id) + if channel is None: + raise ValueError(f"chunk basename {basename} has no matching channel (node_id={node_id})") + + chunks.append( + RangeChunk( + channel_node_id=channel.id, + start=start, + end=end, + s3_key=basename, + ) + ) + return chunks + + +# --------------------------------------------------------------------------- +# Legacy flow: kept for rollback via LEGACY_IMPORT_FLOW=true. +# --------------------------------------------------------------------------- + + +def import_timeseries_legacy( + api_host, + api2_host, + session_manager, + workflow_instance_id, + file_directory, +): + """Original pre-viewer-asset flow: import-manifest + S3 staging bucket. + + Preserved verbatim for rollback. New ingests should not hit this path. + """ + # gather all the time series files from the output directory + timeseries_data_files, timeseries_channel_files = _collect_timeseries_files(file_directory) + if not timeseries_channel_files or not timeseries_data_files: log.info("no time series channels or data") return None @@ -58,7 +454,7 @@ def import_timeseries( packages_client = PackagesClient(api_host, session_manager) package_id = determine_target_package(packages_client, workflow_instance.package_ids) if not package_id: - log.error("dataset_id={workflow_instance.dataset_id} could not determine target time series package") + log.error("dataset_id=%s could not determine target time series package", workflow_instance.dataset_id) return None packages_client.set_timeseries_properties(package_id) @@ -66,15 +462,13 @@ def import_timeseries( log.info(f"dataset_id={workflow_instance.dataset_id} package_id={package_id} starting import of time series files") - # used to strip the channel index (intra-processor channel identifier) off both data and metadata time series files - channel_index_pattern = re.compile(r"(channel-\d+)") - timeseries_client = TimeSeriesClient(api_host, session_manager) existing_channels = timeseries_client.get_package_channels(package_id) + # used to strip the channel index (intra-processor channel identifier) off both data and metadata time series files channels = {} for file_path in timeseries_channel_files: - channel_index = channel_index_pattern.search(os.path.basename(file_path)).group(1) + channel_index = _CHANNEL_INDEX_PATTERN.search(os.path.basename(file_path)).group(1) with open(file_path, "r") as file: local_channel = TimeSeriesChannel.from_dict(json.load(file)) @@ -96,11 +490,11 @@ def import_timeseries( # => N:channel:c957d73f-84ca-41d9-83b0-d23c2000a6e6_1549968912000000_1549968926998750.bin.gz import_files = [] for file_path in timeseries_data_files: - channel_index = channel_index_pattern.search(os.path.basename(file_path)).group(1) + channel_index = _CHANNEL_INDEX_PATTERN.search(os.path.basename(file_path)).group(1) channel = channels[channel_index] import_file = ImportFile( upload_key=uuid.uuid4(), - file_path=re.sub(channel_index_pattern, channel.id, os.path.basename(file_path)), + file_path=re.sub(_CHANNEL_INDEX_PATTERN, channel.id, os.path.basename(file_path)), local_path=file_path, ) import_files.append(import_file) @@ -131,7 +525,7 @@ def upload_timeseries_file(timeseries_file): ) with open(timeseries_file.local_path, "rb") as f: response = 
requests.put(upload_url, data=f) - response.raise_for_status() # raise an error if the request failed + response.raise_for_status() return True except Exception as e: with upload_counter_lock: @@ -152,6 +546,25 @@ def upload_timeseries_file(timeseries_file): assert sum(successful_uploads) == len(import_files), "Failed to upload all time series files" +# --------------------------------------------------------------------------- +# Shared helpers +# --------------------------------------------------------------------------- + + +def _collect_timeseries_files(file_directory): + timeseries_data_files = [] + timeseries_channel_files = [] + + for root, _, files in os.walk(file_directory): + for file in files: + if file.endswith(TIME_SERIES_METADATA_FILE_EXTENSION): + timeseries_channel_files.append(os.path.join(root, file)) + elif file.endswith(TIME_SERIES_BINARY_FILE_EXTENSION): + timeseries_data_files.append(os.path.join(root, file)) + + return timeseries_data_files, timeseries_channel_files + + def determine_target_package(packages_client: PackagesClient, package_ids: list[str]) -> Optional[str]: """ Determine which package should receive the time series data and properties. diff --git a/processor/main.py b/processor/main.py index 3865c0f..81713f7 100644 --- a/processor/main.py +++ b/processor/main.py @@ -57,10 +57,4 @@ session_manager = SessionManager(auth_provider) - import_timeseries( - config.API_HOST, - config.API_HOST2, - session_manager, - config.WORKFLOW_INSTANCE_ID, - config.OUTPUT_DIR, - ) + import_timeseries(config, session_manager) diff --git a/processor/reader.py b/processor/reader.py index 061b424..a2fe75a 100644 --- a/processor/reader.py +++ b/processor/reader.py @@ -40,9 +40,9 @@ def __init__(self, electrical_series, session_start_time): self.num_samples, self.num_channels = self.electrical_series.data.shape assert self.num_samples > 0, "Electrical series has no sample data" - assert ( - len(self.electrical_series.electrodes.table) == self.num_channels - ), "Electrode channels do not align with data shape" + assert len(self.electrical_series.electrodes.table) == self.num_channels, ( + "Electrode channels do not align with data shape" + ) log.info(f"NWB file has {self.num_samples} samples") self._sampling_rate = None @@ -51,9 +51,9 @@ def __init__(self, electrical_series, session_start_time): if self.has_explicit_timestamps: log.info("NWB file has explicit timestamps") - assert self.num_samples == len( - self.electrical_series.timestamps - ), "Differing number of sample and timestamp value" + assert self.num_samples == len(self.electrical_series.timestamps), ( + "Differing number of sample and timestamp value" + ) else: log.info("NWB file has implicit timestamps") diff --git a/processor/timeseries_channel.py b/processor/timeseries_channel.py index ff72fa1..3386dbb 100644 --- a/processor/timeseries_channel.py +++ b/processor/timeseries_channel.py @@ -14,6 +14,7 @@ def __init__( last_annotation=0, properties=None, id=None, + viewer_asset_id=None, ): if properties is None: properties = [] @@ -33,6 +34,7 @@ def __init__( self.group = group.strip() self.last_annotation = last_annotation self.properties = properties + self.viewer_asset_id = viewer_asset_id def as_dict(self): resp = { @@ -50,6 +52,9 @@ def as_dict(self): if self.id is not None: resp["id"] = self.id + if self.viewer_asset_id is not None: + resp["viewerAssetId"] = str(self.viewer_asset_id) + return resp @staticmethod @@ -65,6 +70,7 @@ def from_dict(channel, properties=None): 
last_annotation=int(channel.get("lastAnnotation", 0)), properties=channel.get("properties", properties), id=channel.get("id"), + viewer_asset_id=channel.get("viewerAssetId"), index=-1, ) diff --git a/scripts/generate_test_nwb.py b/scripts/generate_test_nwb.py index 502a0ba..3c8c170 100644 --- a/scripts/generate_test_nwb.py +++ b/scripts/generate_test_nwb.py @@ -104,7 +104,7 @@ def create_nwb_file( print("Generating NWB file with:") print(f" Target size: {target_size_bytes / (1024**2):.2f} MB") print(f" Samples: {num_samples:,}") - print(f" Duration: {duration_seconds:.2f} seconds ({duration_seconds/3600:.2f} hours)") + print(f" Duration: {duration_seconds:.2f} seconds ({duration_seconds / 3600:.2f} hours)") print(f" Sampling rate: {sampling_rate} Hz") print(f" Channel 1 frequency: {freq1} Hz") print(f" Channel 2 frequency: {freq2} Hz") @@ -273,7 +273,7 @@ def main(): print(f" Path: {result['output_path']}") print(f" Channels: {result['num_channels']}") print(f" Samples per channel: {result['num_samples']:,}") - print(f" Duration: {result['duration_seconds']:.2f}s ({result['duration_seconds']/3600:.2f}h)") + print(f" Duration: {result['duration_seconds']:.2f}s ({result['duration_seconds'] / 3600:.2f}h)") print(f" Sampling rate: {result['sampling_rate']} Hz") print(f" Channel frequencies: {result['channel_frequencies']} Hz") diff --git a/tests/test_asset_uploader.py b/tests/test_asset_uploader.py new file mode 100644 index 0000000..d8cbdb3 --- /dev/null +++ b/tests/test_asset_uploader.py @@ -0,0 +1,137 @@ +from unittest.mock import MagicMock, patch + +import botocore.exceptions +import pytest +from asset_uploader import AssetUploader, ChunkUploadResult, relative_key_for_chunk +from clients.packages_assets_client import UploadCredentials + + +@pytest.fixture +def credentials(): + return UploadCredentials( + access_key_id="AKIATEST", + secret_access_key="SECRET", + session_token="TOKEN", + expiration="2026-04-30T13:00:00Z", + bucket="pennsieve-viewer-assets", + region="us-east-1", + key_prefix="viewer-assets/O19/D2049/asset-uuid/", + ) + + +class TestRelativeKeyForChunk: + def test_strips_directory(self): + assert relative_key_for_chunk("/data/output/N:channel:abc_0_1000.bin.gz") == "N:channel:abc_0_1000.bin.gz" + + def test_no_directory_passthrough(self): + assert relative_key_for_chunk("foo.bin.gz") == "foo.bin.gz" + + +class TestAssetUploaderClientConstruction: + @patch("asset_uploader.boto3.client") + def test_build_client_uses_credentials(self, mock_boto, credentials, tmp_path): + # Trigger _build_client via upload_files (which is the only public path) + local = tmp_path / "f.bin.gz" + local.write_bytes(b"x") + uploader = AssetUploader(credentials, max_workers=1) + uploader.upload_files([(str(local), "f.bin.gz")]) + + mock_boto.assert_called_once_with( + "s3", + aws_access_key_id="AKIATEST", + aws_secret_access_key="SECRET", + aws_session_token="TOKEN", + region_name="us-east-1", + ) + + +class TestAssetUploaderUploadFiles: + @patch("asset_uploader.boto3.client") + def test_uploads_each_file_with_prefixed_key(self, mock_boto, credentials, tmp_path): + s3 = MagicMock() + mock_boto.return_value = s3 + + files = [] + for i in range(3): + p = tmp_path / f"chunk_{i}.bin.gz" + p.write_bytes(b"x") + files.append((str(p), f"chunk_{i}.bin.gz")) + + uploader = AssetUploader(credentials, max_workers=1) + results = uploader.upload_files(files) + + # Each file's key is the credentials.key_prefix + relative_key + expected_calls = [ + ( + ( + str(tmp_path / "chunk_0.bin.gz"), + 
"pennsieve-viewer-assets", + "viewer-assets/O19/D2049/asset-uuid/chunk_0.bin.gz", + ), + ), + ( + ( + str(tmp_path / "chunk_1.bin.gz"), + "pennsieve-viewer-assets", + "viewer-assets/O19/D2049/asset-uuid/chunk_1.bin.gz", + ), + ), + ( + ( + str(tmp_path / "chunk_2.bin.gz"), + "pennsieve-viewer-assets", + "viewer-assets/O19/D2049/asset-uuid/chunk_2.bin.gz", + ), + ), + ] + # Order may be parallel-shuffled with workers > 1; verify set equality + assert s3.upload_file.call_count == 3 + actual_args_set = {c.args for c in s3.upload_file.call_args_list} + expected_args_set = {ec[0] for ec in expected_calls} + assert actual_args_set == expected_args_set + + # Results preserve input order regardless of upload completion order + assert [r.relative_key for r in results] == [ + "chunk_0.bin.gz", + "chunk_1.bin.gz", + "chunk_2.bin.gz", + ] + assert all(r.full_key.startswith("viewer-assets/O19/D2049/asset-uuid/") for r in results) + + @patch("asset_uploader.boto3.client") + def test_returns_chunk_upload_result_per_file(self, mock_boto, credentials, tmp_path): + mock_boto.return_value = MagicMock() + local = tmp_path / "f.bin.gz" + local.write_bytes(b"x") + + uploader = AssetUploader(credentials, max_workers=1) + results = uploader.upload_files([(str(local), "rel.bin.gz")]) + + assert len(results) == 1 + assert isinstance(results[0], ChunkUploadResult) + assert results[0].relative_key == "rel.bin.gz" + assert results[0].full_key == "viewer-assets/O19/D2049/asset-uuid/rel.bin.gz" + + @patch("asset_uploader.boto3.client") + def test_empty_input_does_not_upload(self, mock_boto, credentials): + s3 = MagicMock() + mock_boto.return_value = s3 + uploader = AssetUploader(credentials, max_workers=1) + assert uploader.upload_files([]) == [] + s3.upload_file.assert_not_called() + + @patch("asset_uploader.boto3.client") + def test_propagates_client_error(self, mock_boto, credentials, tmp_path): + s3 = MagicMock() + s3.upload_file.side_effect = botocore.exceptions.ClientError( + {"Error": {"Code": "ExpiredToken", "Message": "creds expired"}}, + "PutObject", + ) + mock_boto.return_value = s3 + + local = tmp_path / "f.bin.gz" + local.write_bytes(b"x") + uploader = AssetUploader(credentials, max_workers=1) + + with pytest.raises(botocore.exceptions.ClientError): + uploader.upload_files([(str(local), "f.bin.gz")]) diff --git a/tests/test_config.py b/tests/test_config.py index 324d8c3..ba1354c 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -258,3 +258,53 @@ def test_chunk_size_conversion_to_int(self, tmp_path): config = Config() assert config.CHUNK_SIZE_MB == 10 assert isinstance(config.CHUNK_SIZE_MB, int) + + +class TestConfigAssetFlow: + """Tests for the viewer-asset-flow config attributes.""" + + def test_asset_name_default(self, tmp_path): + """ASSET_NAME defaults to 'timeseries-asset' when not set.""" + env_vars = { + "ENVIRONMENT": "local", + "INPUT_DIR": str(tmp_path), + "OUTPUT_DIR": str(tmp_path), + } + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + assert config.ASSET_NAME == "timeseries-asset" + + def test_asset_name_override(self, tmp_path): + """ASSET_NAME picks up the env var when set.""" + env_vars = { + "ENVIRONMENT": "local", + "INPUT_DIR": str(tmp_path), + "OUTPUT_DIR": str(tmp_path), + "ASSET_NAME": "mef-asset", + } + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + assert config.ASSET_NAME == "mef-asset" + + def test_legacy_import_flow_default_false(self, tmp_path): + """LEGACY_IMPORT_FLOW defaults to false (use new viewer-asset 
path).""" + env_vars = { + "ENVIRONMENT": "local", + "INPUT_DIR": str(tmp_path), + "OUTPUT_DIR": str(tmp_path), + } + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + assert config.LEGACY_IMPORT_FLOW is False + + def test_legacy_import_flow_opt_in(self, tmp_path): + """LEGACY_IMPORT_FLOW=true switches back to the import-manifest path.""" + env_vars = { + "ENVIRONMENT": "local", + "INPUT_DIR": str(tmp_path), + "OUTPUT_DIR": str(tmp_path), + "LEGACY_IMPORT_FLOW": "true", + } + with patch.dict(os.environ, env_vars, clear=True): + config = Config() + assert config.LEGACY_IMPORT_FLOW is True diff --git a/tests/test_importer_asset_flow.py b/tests/test_importer_asset_flow.py new file mode 100644 index 0000000..e089cc3 --- /dev/null +++ b/tests/test_importer_asset_flow.py @@ -0,0 +1,829 @@ +"""Integration-style tests for import_timeseries_via_assets. + +Exercises the full orchestration end-to-end with all four external +services mocked: workflows, packages-service (regular + assets), +pennsieve-api timeseries channels, timeseries-service ranges, and S3. +""" + +import gzip +import json +import os +from unittest.mock import MagicMock, patch +from urllib.parse import urlsplit + +import pytest +import responses +from clients.base_client import SessionManager +from importer import import_timeseries_via_assets + + +def _path_only(url: str) -> str: + """Strip query string for path-based assertions.""" + return urlsplit(url).path + + +# ---- canonical responses -------------------------------------------------- + + +WORKFLOW_INSTANCE_ID = "wf-123" +DATASET_NODE_ID = "N:dataset:abc" +PACKAGE_NODE_ID = "N:package:p1" +PARENT_NODE_ID = "N:collection:parent" +CHANNEL_NODE_ID = "N:channel:ch1" +ASSET_ID = "00000000-0000-0000-0000-000000000001" + +API_HOST = "https://api.test" +API_HOST2 = "https://api2.test" + + +def _workflow_response(): + return { + "uuid": WORKFLOW_INSTANCE_ID, + "datasetId": DATASET_NODE_ID, + "dataSources": {"src": {"packageIds": [PACKAGE_NODE_ID]}}, + } + + +def _channel_create_response(): + return { + "content": { + "id": CHANNEL_NODE_ID, + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "channelType": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + }, + "properties": [], + } + + +def _asset_create_response(status="created"): + return { + "asset": { + "id": ASSET_ID, + "dataset_id": DATASET_NODE_ID, + "name": "mef-asset", + "asset_type": "timeseries", + "asset_url": "", + "properties": {}, + "status": status, + "package_ids": [PACKAGE_NODE_ID], + "created_at": "2026-04-30T12:00:00Z", + }, + "upload_credentials": { + "access_key_id": "AKIA", + "secret_access_key": "SECRET", + "session_token": "TOKEN", + "expiration": "2026-04-30T13:00:00Z", + "bucket": "pennsieve-viewer-assets", + "region": "us-east-1", + "key_prefix": f"viewer-assets/O19/D2049/{ASSET_ID}/", + }, + } + + +# ---- fixtures ------------------------------------------------------------- + + +@pytest.fixture +def session_manager(): + auth = MagicMock() + auth.get_session_token.return_value = "test-token" + return SessionManager(auth) + + +@pytest.fixture +def staged_files(tmp_path): + """Create a single channel's staged files (.bin.gz + .metadata.json).""" + output = tmp_path / "output" + output.mkdir() + + # Channel metadata file (matches what writer.py emits) + meta = { + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "type": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + "properties": [], + } + (output / 
"channel-00000.metadata.json").write_text(json.dumps(meta)) + + # Chunk binary file (named with channel-{idx} prefix; importer renames it) + chunk = output / "channel-00000_0_1000.bin.gz" + with gzip.open(chunk, "wb") as f: + f.write(b"\x00" * 8) + return str(output) + + +@pytest.fixture +def all_responses(): + """Wires up the canonical happy-path response set across all services.""" + rsps = responses.RequestsMock() + rsps.start() + yield rsps + rsps.stop() + rsps.reset() + + +def _wire_happy_path(rsps, *, list_assets_returns=None, asset_status_after_patch="ready"): + """Register all happy-path responses against the given RequestsMock.""" + # WorkflowClient + rsps.add( + responses.GET, + f"{API_HOST2}/compute/workflows/runs/{WORKFLOW_INSTANCE_ID}", + json=_workflow_response(), + status=200, + ) + + # PackagesClient.set_timeseries_properties → PUT /packages/{id}?updateStorage=true + rsps.add( + responses.PUT, + f"{API_HOST}/packages/{PACKAGE_NODE_ID}", + json={}, + status=200, + ) + + # PackagesAssetsClient.list_assets_for_package → GET /packages/assets + rsps.add( + responses.GET, + f"{API_HOST2}/packages/assets", + json={"assets": list_assets_returns or []}, + status=200, + ) + + # PackagesAssetsClient.create_asset → POST /packages/assets + rsps.add( + responses.POST, + f"{API_HOST2}/packages/assets", + json=_asset_create_response(), + status=201, + ) + + # TimeSeriesClient.get_package_channels → GET /timeseries/{pkg}/channels + rsps.add( + responses.GET, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels", + json=[], + status=200, + ) + + # TimeSeriesClient.create_channel → POST /timeseries/{pkg}/channels + rsps.add( + responses.POST, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels", + json=_channel_create_response(), + status=201, + ) + + # TimeSeriesRangesClient.create_ranges → POST /timeseries/package/{pkg}/ranges + rsps.add( + responses.POST, + f"{API_HOST2}/timeseries/package/{PACKAGE_NODE_ID}/ranges", + json={"requested": 1, "created": 1, "skipped": 0}, + status=201, + ) + + # PackagesAssetsClient.update_asset → PATCH /packages/assets/{id} + rsps.add( + responses.PATCH, + f"{API_HOST2}/packages/assets/{ASSET_ID}", + json={ + "id": ASSET_ID, + "dataset_id": DATASET_NODE_ID, + "name": "mef-asset", + "asset_type": "timeseries", + "status": asset_status_after_patch, + "package_ids": [PACKAGE_NODE_ID], + "created_at": "2026-04-30T12:00:00Z", + }, + status=200, + ) + + +# ---- tests ---------------------------------------------------------------- + + +class TestHappyPath: + @patch("asset_uploader.boto3.client") + def test_full_flow(self, mock_boto, session_manager, staged_files, all_responses): + s3 = MagicMock() + mock_boto.return_value = s3 + _wire_happy_path(all_responses) + + result = import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=staged_files, + asset_name="mef-asset", + asset_type="timeseries", + ) + + assert result == ASSET_ID + # S3 upload happened with the asset's prefix + s3.upload_file.assert_called_once() + call = s3.upload_file.call_args + # boto3 upload_file(local_path, bucket, key) + local_path, bucket, key = call.args + assert bucket == "pennsieve-viewer-assets" + assert key.startswith(f"viewer-assets/O19/D2049/{ASSET_ID}/") + # Local file was renamed to use the channel node id + assert os.path.basename(local_path).startswith(CHANNEL_NODE_ID) + + +class TestIdempotentSkip: + @patch("asset_uploader.boto3.client") + def 
+        """When list_assets returns an existing 'ready' asset, the function
+        returns its id without uploading or registering ranges."""
+        rsps = responses.RequestsMock()
+        rsps.start()
+        try:
+            # Workflow + properties update still run
+            rsps.add(
+                responses.GET,
+                f"{API_HOST2}/compute/workflows/runs/{WORKFLOW_INSTANCE_ID}",
+                json=_workflow_response(),
+                status=200,
+            )
+            rsps.add(
+                responses.PUT,
+                f"{API_HOST}/packages/{PACKAGE_NODE_ID}",
+                json={},
+                status=200,
+            )
+
+            # List assets returns one already-ready asset matching name+type
+            rsps.add(
+                responses.GET,
+                f"{API_HOST2}/packages/assets",
+                json={
+                    "assets": [
+                        {
+                            "id": ASSET_ID,
+                            "dataset_id": DATASET_NODE_ID,
+                            "name": "mef-asset",
+                            "asset_type": "timeseries",
+                            "asset_url": "",
+                            "properties": {},
+                            "status": "ready",
+                            "package_ids": [PACKAGE_NODE_ID],
+                            "created_at": "2026-04-30T12:00:00Z",
+                        }
+                    ]
+                },
+                status=200,
+            )
+
+            result = import_timeseries_via_assets(
+                api_host=API_HOST,
+                api2_host=API_HOST2,
+                session_manager=session_manager,
+                workflow_instance_id=WORKFLOW_INSTANCE_ID,
+                file_directory=staged_files,
+                asset_name="mef-asset",
+                asset_type="timeseries",
+            )
+
+            assert result == ASSET_ID
+            # No POST /assets, no upload, no ranges call
+            mock_boto.assert_not_called()
+        finally:
+            rsps.stop()
+            rsps.reset()
+
+
+class TestStaleAssetReplaced:
+    @patch("asset_uploader.boto3.client")
+    def test_non_ready_asset_is_deleted_and_recreated(self, mock_boto, session_manager, staged_files):
+        """Existing asset in any non-ready status → deleted and recreated."""
+        s3 = MagicMock()
+        mock_boto.return_value = s3
+        rsps = responses.RequestsMock()
+        rsps.start()
+        try:
+            stale_asset_id = "stale-uuid"
+
+            _wire_happy_path(
+                rsps,
+                list_assets_returns=[
+                    {
+                        "id": stale_asset_id,
+                        "dataset_id": DATASET_NODE_ID,
+                        "name": "mef-asset",
+                        "asset_type": "timeseries",
+                        "asset_url": "",
+                        "properties": {},
+                        "status": "created",  # ← prior failed run
+                        "package_ids": [PACKAGE_NODE_ID],
+                        "created_at": "2026-04-30T11:00:00Z",
+                    }
+                ],
+            )
+            # DELETE on the stale asset
+            rsps.add(
+                responses.DELETE,
+                f"{API_HOST2}/packages/assets/{stale_asset_id}",
+                status=204,
+            )
+
+            result = import_timeseries_via_assets(
+                api_host=API_HOST,
+                api2_host=API_HOST2,
+                session_manager=session_manager,
+                workflow_instance_id=WORKFLOW_INSTANCE_ID,
+                file_directory=staged_files,
+                asset_name="mef-asset",
+                asset_type="timeseries",
+            )
+
+            assert result == ASSET_ID  # the freshly created one
+            # Confirm DELETE for stale id and POST for new one both happened
+            assert any(c.request.method == "DELETE" and stale_asset_id in c.request.url for c in rsps.calls)
+            assert any(
+                c.request.method == "POST" and _path_only(c.request.url) == "/packages/assets" for c in rsps.calls
+            )
+        finally:
+            rsps.stop()
+            rsps.reset()
+
+
+class TestFailureCleanup:
+    @patch("asset_uploader.boto3.client")
+    def test_upload_failure_deletes_channel_then_asset(self, mock_boto, session_manager, staged_files):
+        """If upload fails *after* channel creation, the cleanup path
+        must delete the just-created channel(s) AND the asset, in that
+        order.
Otherwise the channels survive with viewer_asset_id + pointing at the now-deleted asset, breaking the next re-run.""" + import botocore.exceptions + + s3 = MagicMock() + s3.upload_file.side_effect = botocore.exceptions.ClientError( + {"Error": {"Code": "ExpiredToken", "Message": "expired"}}, + "PutObject", + ) + mock_boto.return_value = s3 + + # Upload fails after channel create; ranges/patch responses get + # registered by the happy-path setup but never fire. + rsps = responses.RequestsMock(assert_all_requests_are_fired=False) + rsps.start() + try: + _wire_happy_path(rsps) + # DELETE for the channel created during this run + rsps.add( + responses.DELETE, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels/{CHANNEL_NODE_ID}", + status=204, + ) + # DELETE for the asset cleanup + rsps.add( + responses.DELETE, + f"{API_HOST2}/packages/assets/{ASSET_ID}", + status=204, + ) + + with pytest.raises(botocore.exceptions.ClientError): + import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=staged_files, + asset_name="mef-asset", + asset_type="timeseries", + ) + + # Both DELETEs must have fired, in order: channel first, asset second. + delete_calls = [c for c in rsps.calls if c.request.method == "DELETE"] + assert len(delete_calls) == 2 + assert CHANNEL_NODE_ID in delete_calls[0].request.url + assert ASSET_ID in delete_calls[1].request.url + finally: + rsps.stop() + rsps.reset() + + def test_reused_channels_are_not_deleted_on_cleanup(self, session_manager, staged_files): + """If we reused an existing channel (didn't create it this run), + the cleanup path must NOT delete it. Only channels created in + this ingest are owned by us.""" + import botocore.exceptions + + with patch("asset_uploader.boto3.client") as mock_boto: + s3 = MagicMock() + s3.upload_file.side_effect = botocore.exceptions.ClientError( + {"Error": {"Code": "ExpiredToken"}}, + "PutObject", + ) + mock_boto.return_value = s3 + + rsps = responses.RequestsMock(assert_all_requests_are_fired=False) + rsps.start() + try: + # Same as happy path but with a pre-existing channel that + # we'll reuse rather than create. Note: matches by + # name+type+rate per TimeSeriesChannel.__eq__. + rsps.add( + responses.GET, + f"{API_HOST2}/compute/workflows/runs/{WORKFLOW_INSTANCE_ID}", + json=_workflow_response(), + status=200, + ) + rsps.add( + responses.PUT, + f"{API_HOST}/packages/{PACKAGE_NODE_ID}", + json={}, + status=200, + ) + rsps.add( + responses.GET, + f"{API_HOST2}/packages/assets", + json={"assets": []}, + status=200, + ) + rsps.add( + responses.POST, + f"{API_HOST2}/packages/assets", + json=_asset_create_response(), + status=201, + ) + # Existing channel matching the staged metadata file — + # already linked to the asset we're about to use. + rsps.add( + responses.GET, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels", + json=[ + { + "content": { + "id": CHANNEL_NODE_ID, + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "channelType": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + "viewerAssetId": ASSET_ID, + }, + "properties": [], + } + ], + status=200, + ) + # Asset cleanup DELETE; explicitly DO NOT register a + # channel DELETE — if the orchestrator tries to delete + # the reused channel, this test fails with a + # ConnectionError on the unmatched URL. 
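+                # (responses raises requests.exceptions.ConnectionError for
+                # any request that matches no registered response, so an
+                # unexpected channel DELETE fails loudly instead of silently
+                # passing.)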
+ rsps.add( + responses.DELETE, + f"{API_HOST2}/packages/assets/{ASSET_ID}", + status=204, + ) + + with pytest.raises(botocore.exceptions.ClientError): + import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=staged_files, + asset_name="mef-asset", + asset_type="timeseries", + ) + + # No channel DELETE was attempted; only the asset DELETE. + delete_calls = [c for c in rsps.calls if c.request.method == "DELETE"] + assert len(delete_calls) == 1 + assert ASSET_ID in delete_calls[0].request.url + finally: + rsps.stop() + rsps.reset() + + +class TestMultiPackageIdempotency: + """Re-run on a multi-package workflow finds the existing asset by + iterating the workflow packages — not by the parent collection + that determine_target_package walks up to. + + This is the bug the reviewer caught: lookup-by-parent vs. + create-with-children produces orphan duplicate assets on re-run. + """ + + @patch("asset_uploader.boto3.client") + def test_ready_asset_found_via_second_child_package(self, mock_boto, session_manager, staged_files): + # Workflow has 3 children; the existing asset is linked to all of + # them, but list_assets_for_package only returns it for ones + # actually in viewer_asset_packages. + child_a = "N:package:mef-a" + child_b = "N:package:mef-b" + child_c = "N:package:mef-c" + parent = "N:collection:recording-parent" + + rsps = responses.RequestsMock() + rsps.start() + try: + # Workflow returns the children, not the parent + rsps.add( + responses.GET, + f"{API_HOST2}/compute/workflows/runs/{WORKFLOW_INSTANCE_ID}", + json={ + "uuid": WORKFLOW_INSTANCE_ID, + "datasetId": DATASET_NODE_ID, + "dataSources": {"src": {"packageIds": [child_a, child_b, child_c]}}, + }, + status=200, + ) + # determine_target_package walks first child to its parent + rsps.add( + responses.GET, + f"{API_HOST}/packages/{child_a}", + json={ + "parent": {"content": {"nodeId": parent}}, + "content": {"nodeId": child_a}, + }, + status=200, + ) + # Properties are set on the parent (legacy aggregation) + rsps.add( + responses.PUT, + f"{API_HOST}/packages/{parent}", + json={}, + status=200, + ) + # Lookup by first child returns empty (this is the bug-trap: + # if we'd looked up by parent we'd also get empty here, then + # create a duplicate asset). 
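+            # The two GET /packages/assets stubs below share one URL and are
+            # told apart only by their query_param_matcher, so each child's
+            # lookup can match nothing but its own stub; the recorded call
+            # order therefore mirrors the orchestrator's package iteration.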
+ rsps.add( + responses.GET, + f"{API_HOST2}/packages/assets", + match=[responses.matchers.query_param_matcher({"dataset_id": DATASET_NODE_ID, "package_id": child_a})], + json={"assets": []}, + status=200, + ) + # Lookup by second child returns the ready asset + rsps.add( + responses.GET, + f"{API_HOST2}/packages/assets", + match=[responses.matchers.query_param_matcher({"dataset_id": DATASET_NODE_ID, "package_id": child_b})], + json={ + "assets": [ + { + "id": ASSET_ID, + "dataset_id": DATASET_NODE_ID, + "name": "mef-asset", + "asset_type": "timeseries", + "asset_url": "", + "properties": {}, + "status": "ready", + "package_ids": [child_a, child_b, child_c], + "created_at": "2026-04-30T12:00:00Z", + } + ] + }, + status=200, + ) + + result = import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=staged_files, + asset_name="mef-asset", + asset_type="timeseries", + ) + + assert result == ASSET_ID + # No upload, no POST /assets — purely idempotent skip + mock_boto.assert_not_called() + posts = [c for c in rsps.calls if c.request.method == "POST"] + assert posts == [] + # Iteration stopped after second child — third was never queried + assets_lookups = [ + c for c in rsps.calls if c.request.method == "GET" and "/packages/assets" in c.request.url + ] + assert len(assets_lookups) == 2 + finally: + rsps.stop() + rsps.reset() + + +class TestFailFastOnMalformedFiles: + """Unparseable channel/chunk filenames or chunks with no matching + channel must abort the ingest after asset creation, triggering the + cleanup DELETE so the asset never reaches 'active' with partial data. + """ + + def _malformed_metadata_dir(self, tmp_path): + """Output dir with one chunk file (will fail filename parse).""" + output = tmp_path / "output" + output.mkdir() + # Metadata filename does NOT contain channel-NNNNN + meta = { + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "type": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + "properties": [], + } + (output / "weird-name.metadata.json").write_text(json.dumps(meta)) + chunk = output / "channel-00000_0_1000.bin.gz" + with gzip.open(chunk, "wb") as f: + f.write(b"\x00" * 8) + return str(output) + + def _malformed_chunk_dir(self, tmp_path): + """Output dir with chunk filename that doesn't match the pattern.""" + output = tmp_path / "output" + output.mkdir() + meta = { + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "type": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + "properties": [], + } + (output / "channel-00000.metadata.json").write_text(json.dumps(meta)) + chunk = output / "wrong-prefix_0_1000.bin.gz" + with gzip.open(chunk, "wb") as f: + f.write(b"\x00" * 8) + return str(output) + + def _orphan_chunk_dir(self, tmp_path): + """Chunk references channel-00001 but only channel-00000 metadata exists.""" + output = tmp_path / "output" + output.mkdir() + meta = { + "name": "ch-0", + "start": 0, + "end": 1000, + "unit": "uV", + "rate": 1000.0, + "type": "CONTINUOUS", + "group": "default", + "lastAnnotation": 0, + "properties": [], + } + (output / "channel-00000.metadata.json").write_text(json.dumps(meta)) + # Only metadata for index 0; chunk references index 1 + chunk = output / "channel-00001_0_1000.bin.gz" + with gzip.open(chunk, "wb") as f: + f.write(b"\x00" * 8) + return str(output) + + def _wire_through_asset_create(self, rsps): + """Register everything up to 
and including asset creation. Tests + below verify the next step raises and triggers cleanup DELETE.""" + rsps.add( + responses.GET, + f"{API_HOST2}/compute/workflows/runs/{WORKFLOW_INSTANCE_ID}", + json=_workflow_response(), + status=200, + ) + rsps.add( + responses.PUT, + f"{API_HOST}/packages/{PACKAGE_NODE_ID}", + json={}, + status=200, + ) + rsps.add( + responses.GET, + f"{API_HOST2}/packages/assets", + json={"assets": []}, + status=200, + ) + rsps.add( + responses.POST, + f"{API_HOST2}/packages/assets", + json=_asset_create_response(), + status=201, + ) + rsps.add( + responses.GET, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels", + json=[], + status=200, + ) + rsps.add( + responses.POST, + f"{API_HOST}/timeseries/{PACKAGE_NODE_ID}/channels", + json=_channel_create_response(), + status=201, + ) + # Asset cleanup DELETE on any failure + rsps.add( + responses.DELETE, + f"{API_HOST2}/packages/assets/{ASSET_ID}", + status=204, + ) + + @patch("asset_uploader.boto3.client") + def test_unparseable_metadata_filename_raises_and_cleans_up(self, mock_boto, session_manager, tmp_path): + rsps = responses.RequestsMock(assert_all_requests_are_fired=False) + rsps.start() + try: + self._wire_through_asset_create(rsps) + + with pytest.raises(RuntimeError, match="metadata filename"): + import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=self._malformed_metadata_dir(tmp_path), + asset_name="mef-asset", + asset_type="timeseries", + ) + + # No upload happened; cleanup DELETE fired + mock_boto.assert_not_called() + assert any(c.request.method == "DELETE" and ASSET_ID in c.request.url for c in rsps.calls) + finally: + rsps.stop() + rsps.reset() + + @patch("asset_uploader.boto3.client") + def test_unparseable_chunk_filename_raises_and_cleans_up(self, mock_boto, session_manager, tmp_path): + rsps = responses.RequestsMock(assert_all_requests_are_fired=False) + rsps.start() + try: + self._wire_through_asset_create(rsps) + + with pytest.raises(RuntimeError, match="chunk filename"): + import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=self._malformed_chunk_dir(tmp_path), + asset_name="mef-asset", + asset_type="timeseries", + ) + + mock_boto.assert_not_called() + assert any(c.request.method == "DELETE" and ASSET_ID in c.request.url for c in rsps.calls) + finally: + rsps.stop() + rsps.reset() + + @patch("asset_uploader.boto3.client") + def test_chunk_with_no_resolved_channel_raises_and_cleans_up(self, mock_boto, session_manager, tmp_path): + rsps = responses.RequestsMock(assert_all_requests_are_fired=False) + rsps.start() + try: + self._wire_through_asset_create(rsps) + + with pytest.raises(RuntimeError, match="no channel metadata was resolved"): + import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=self._orphan_chunk_dir(tmp_path), + asset_name="mef-asset", + asset_type="timeseries", + ) + + mock_boto.assert_not_called() + assert any(c.request.method == "DELETE" and ASSET_ID in c.request.url for c in rsps.calls) + finally: + rsps.stop() + rsps.reset() + + +class TestEmptyDirectory: + def test_no_files_returns_none_without_calling_services(self, session_manager, tmp_path): + empty = tmp_path / "empty" + empty.mkdir() + + # No responses registered — if any HTTP 
call fires, responses will raise + with responses.RequestsMock() as rsps: + result = import_timeseries_via_assets( + api_host=API_HOST, + api2_host=API_HOST2, + session_manager=session_manager, + workflow_instance_id=WORKFLOW_INSTANCE_ID, + file_directory=str(empty), + asset_name="mef-asset", + asset_type="timeseries", + ) + assert result is None + assert len(rsps.calls) == 0 diff --git a/tests/test_packages_assets_client.py b/tests/test_packages_assets_client.py new file mode 100644 index 0000000..1886560 --- /dev/null +++ b/tests/test_packages_assets_client.py @@ -0,0 +1,264 @@ +import json +from unittest.mock import patch + +import pytest +import responses +from clients.packages_assets_client import ( + CreatedAsset, + PackagesAssetsClient, + ViewerAsset, +) + + +def _create_response_body(): + """A canonical POST /assets response body.""" + return { + "asset": { + "id": "00000000-0000-0000-0000-000000000001", + "dataset_id": "N:dataset:abc", + "name": "mef-asset", + "asset_type": "timeseries", + "asset_url": "", + "properties": {}, + "status": "created", + "package_ids": ["N:package:p1"], + "created_at": "2026-04-20T12:00:00Z", + }, + "upload_credentials": { + "access_key_id": "AKIATEST", + "secret_access_key": "SECRET", + "session_token": "TOKEN", + "expiration": "2026-04-20T13:00:00Z", + "bucket": "pennsieve-viewer-assets", + "region": "us-east-1", + "key_prefix": "viewer-assets/O19/D2049/00000000-0000-0000-0000-000000000001/", + }, + } + + +class TestPackagesAssetsClientInit: + def test_initialization_appends_packages_path(self, mock_session_manager): + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + assert client.base_url == "https://api2.test.com/packages" + + +class TestPackagesAssetsClientCreate: + @responses.activate + def test_create_asset_success(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/packages/assets", + json=_create_response_body(), + status=201, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + result = client.create_asset( + dataset_id="N:dataset:abc", + package_ids=["N:package:p1"], + name="mef-asset", + asset_type="timeseries", + ) + + assert isinstance(result, CreatedAsset) + assert result.asset.id == "00000000-0000-0000-0000-000000000001" + assert result.asset.name == "mef-asset" + assert result.upload_credentials.bucket == "pennsieve-viewer-assets" + assert result.upload_credentials.key_prefix.startswith("viewer-assets/") + + @responses.activate + def test_create_asset_sends_dataset_id_as_query_param(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/packages/assets", + json=_create_response_body(), + status=201, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + client.create_asset( + dataset_id="N:dataset:abc", + package_ids=["N:package:p1"], + name="mef-asset", + asset_type="timeseries", + ) + + request = responses.calls[0].request + assert "dataset_id=N%3Adataset%3Aabc" in request.url + body = json.loads(request.body) + assert body == { + "name": "mef-asset", + "asset_type": "timeseries", + "package_ids": ["N:package:p1"], + } + + @responses.activate + def test_create_asset_includes_properties_when_provided(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/packages/assets", + json=_create_response_body(), + status=201, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + client.create_asset( + 
dataset_id="N:dataset:abc", + package_ids=["N:package:p1"], + name="mef-asset", + asset_type="timeseries", + properties={"source": "mef"}, + ) + + body = json.loads(responses.calls[0].request.body) + assert body["properties"] == {"source": "mef"} + + @responses.activate + def test_create_asset_raises_on_4xx(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/packages/assets", + json={"error": "bad request"}, + status=400, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + with pytest.raises(Exception): + client.create_asset( + dataset_id="N:dataset:abc", + package_ids=["N:package:p1"], + name="mef-asset", + asset_type="timeseries", + ) + + +class TestPackagesAssetsClientList: + @responses.activate + def test_list_assets_success(self, mock_session_manager): + responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={ + "assets": [ + { + "id": "uuid-1", + "dataset_id": "N:dataset:abc", + "name": "mef-asset", + "asset_type": "timeseries", + "asset_url": "", + "properties": {}, + "status": "active", + "package_ids": ["N:package:p1"], + "created_at": "2026-04-20T12:00:00Z", + } + ] + }, + status=200, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + result = client.list_assets_for_package(dataset_id="N:dataset:abc", package_id="N:package:p1") + + assert len(result) == 1 + assert isinstance(result[0], ViewerAsset) + assert result[0].id == "uuid-1" + assert result[0].status == "active" + + @responses.activate + def test_list_assets_returns_empty_when_no_assets(self, mock_session_manager): + responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={"assets": []}, + status=200, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + assert client.list_assets_for_package("N:dataset:abc", "N:package:p1") == [] + + +class TestPackagesAssetsClientUpdate: + @responses.activate + def test_update_asset_status_only(self, mock_session_manager): + responses.add( + responses.PATCH, + "https://api2.test.com/packages/assets/uuid-1", + json={ + "id": "uuid-1", + "dataset_id": "N:dataset:abc", + "name": "mef-asset", + "asset_type": "timeseries", + "status": "active", + "package_ids": ["N:package:p1"], + "created_at": "2026-04-20T12:00:00Z", + }, + status=200, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + result = client.update_asset(asset_id="uuid-1", dataset_id="N:dataset:abc", status="active") + + assert result.status == "active" + body = json.loads(responses.calls[0].request.body) + assert body == {"status": "active"} + + +class TestPackagesAssetsClientDelete: + @responses.activate + def test_delete_asset_returns_none(self, mock_session_manager): + responses.add( + responses.DELETE, + "https://api2.test.com/packages/assets/uuid-1", + status=204, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + assert client.delete_asset(asset_id="uuid-1", dataset_id="N:dataset:abc") is None + assert "dataset_id=N%3Adataset%3Aabc" in responses.calls[0].request.url + + +class TestPackagesAssetsClientRetries: + @patch("time.sleep") # skip the backoff delays + @responses.activate + def test_retries_on_5xx_then_succeeds(self, _sleep, mock_session_manager): + # Two transient failures, then success. 
+ responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={"error": "transient"}, + status=503, + ) + responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={"error": "transient"}, + status=503, + ) + responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={"assets": []}, + status=200, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + result = client.list_assets_for_package("N:dataset:abc", "N:package:p1") + assert result == [] + assert len(responses.calls) == 3 + + @responses.activate + def test_does_not_retry_on_4xx(self, mock_session_manager): + responses.add( + responses.GET, + "https://api2.test.com/packages/assets", + json={"error": "not found"}, + status=404, + ) + + client = PackagesAssetsClient("https://api2.test.com", mock_session_manager) + with pytest.raises(Exception): + client.list_assets_for_package("N:dataset:abc", "N:package:p1") + + # only 1 call — backoff gave up on 4xx (note: 401/403 path would be + # one extra attempt via retry_with_refresh, but plain 404 should not retry) + assert len(responses.calls) == 1 diff --git a/tests/test_timeseries_channel.py b/tests/test_timeseries_channel.py index 2ef7eb0..b36f519 100644 --- a/tests/test_timeseries_channel.py +++ b/tests/test_timeseries_channel.py @@ -112,6 +112,26 @@ def test_as_dict_with_custom_properties(self): result = channel.as_dict() assert result["properties"] == [{"key1": "value1"}, {"key2": "value2"}] + def test_as_dict_omits_viewer_asset_id_when_none(self): + """viewerAssetId is excluded from the body when not set.""" + channel = TimeSeriesChannel(index=0, name="Test", rate=1000.0, start=0, end=1000) + result = channel.as_dict() + assert "viewerAssetId" not in result + + def test_as_dict_includes_viewer_asset_id_when_set(self): + """viewerAssetId is serialized as a string when set.""" + asset_id = "00000000-0000-0000-0000-0000000000a1" + channel = TimeSeriesChannel( + index=0, + name="Test", + rate=1000.0, + start=0, + end=1000, + viewer_asset_id=asset_id, + ) + result = channel.as_dict() + assert result["viewerAssetId"] == asset_id + class TestTimeSeriesChannelFromDict: """Tests for TimeSeriesChannel.from_dict() static method.""" diff --git a/tests/test_timeseries_ranges_client.py b/tests/test_timeseries_ranges_client.py new file mode 100644 index 0000000..a0ef4af --- /dev/null +++ b/tests/test_timeseries_ranges_client.py @@ -0,0 +1,139 @@ +import json + +import pytest +import responses +from clients.timeseries_ranges_client import ( + CreateRangesResult, + RangeChunk, + TimeSeriesRangesClient, +) + + +def _ok_response(requested=2, created=2, skipped=0): + return {"requested": requested, "created": created, "skipped": skipped} + + +class TestTimeSeriesRangesClientInit: + def test_initialization_appends_timeseries_path(self, mock_session_manager): + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + assert client.base_url == "https://api2.test.com/timeseries" + + +class TestTimeSeriesRangesClientCreate: + @responses.activate + def test_create_ranges_success(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/timeseries/package/N:package:p1/ranges", + json=_ok_response(requested=2, created=2, skipped=0), + status=201, + ) + + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + chunks = [ + RangeChunk("N:channel:c1", 0, 1000, "N:channel:c1_0_1000.bin.gz"), + RangeChunk("N:channel:c2", 0, 1000, 
"N:channel:c2_0_1000.bin.gz"), + ] + + result = client.create_ranges("N:package:p1", "asset-uuid", chunks) + assert isinstance(result, CreateRangesResult) + assert result.requested == 2 + assert result.created == 2 + assert result.skipped == 0 + + @responses.activate + def test_create_ranges_sends_snake_case_body(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/timeseries/package/N:package:p1/ranges", + json=_ok_response(requested=1, created=1), + status=201, + ) + + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + chunks = [RangeChunk("N:channel:c1", 0, 1000, "rel/key.bin.gz")] + client.create_ranges("N:package:p1", "asset-uuid", chunks) + + body = json.loads(responses.calls[0].request.body) + assert body["viewer_asset_id"] == "asset-uuid" + assert len(body["chunks"]) == 1 + assert body["chunks"][0] == { + "channel_node_id": "N:channel:c1", + "start": 0, + "end": 1000, + "s3_key": "rel/key.bin.gz", + } + + def test_create_ranges_short_circuits_on_empty(self, mock_session_manager): + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + result = client.create_ranges("N:package:p1", "asset-uuid", []) + assert result == CreateRangesResult(requested=0, created=0, skipped=0) + + def test_create_ranges_rejects_oversized_batch(self, mock_session_manager): + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + oversize = [ + RangeChunk("N:channel:c", i, i + 1, f"k_{i}.bin.gz") for i in range(client.MAX_CHUNKS_PER_REQUEST + 1) + ] + with pytest.raises(ValueError, match="too many chunks"): + client.create_ranges("N:package:p1", "asset-uuid", oversize) + + @responses.activate + def test_create_ranges_raises_on_4xx_with_invalid_chunks(self, mock_session_manager): + responses.add( + responses.POST, + "https://api2.test.com/timeseries/package/N:package:p1/ranges", + json={ + "message": "one or more chunks are invalid", + "invalid_chunks": [{"index": 0, "reason": "bad start/end"}], + }, + status=400, + ) + + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + chunks = [RangeChunk("N:channel:c1", 100, 0, "rel/key.bin.gz")] + + with pytest.raises(Exception): + client.create_ranges("N:package:p1", "asset-uuid", chunks) + + +class TestTimeSeriesRangesClientBatched: + @responses.activate + def test_batched_splits_into_max_chunks(self, mock_session_manager): + # Send more chunks than the server allows in one call. The client + # should split them up and make multiple trips to the server. + max_n = TimeSeriesRangesClient.MAX_CHUNKS_PER_REQUEST + for size in (max_n, max_n, 500): + responses.add( + responses.POST, + "https://api2.test.com/timeseries/package/N:package:p1/ranges", + json=_ok_response(requested=size, created=size, skipped=0), + status=201, + ) + + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + chunks = [RangeChunk("N:channel:c", i, i + 1, f"k_{i}.bin.gz") for i in range(2 * max_n + 500)] + + result = client.create_ranges_batched("N:package:p1", "asset-uuid", chunks) + assert result.requested == 2 * max_n + 500 + assert result.created == 2 * max_n + 500 + assert len(responses.calls) == 3 + + @responses.activate + def test_batched_aborts_on_first_failure(self, mock_session_manager): + # First batch fails — second batch should not be sent. 
+ responses.add( + responses.POST, + "https://api2.test.com/timeseries/package/N:package:p1/ranges", + json={"error": "bad chunk"}, + status=400, + ) + + client = TimeSeriesRangesClient("https://api2.test.com", mock_session_manager) + chunks = [ + RangeChunk("N:channel:c", i, i + 1, f"k_{i}.bin.gz") for i in range(client.MAX_CHUNKS_PER_REQUEST + 1) + ] + + with pytest.raises(Exception): + client.create_ranges_batched("N:package:p1", "asset-uuid", chunks) + + assert len(responses.calls) == 1 diff --git a/tests/test_workflow_client.py b/tests/test_workflow_client.py index 6be87c0..256f4ee 100644 --- a/tests/test_workflow_client.py +++ b/tests/test_workflow_client.py @@ -50,7 +50,11 @@ def test_get_workflow_instance_success(self, mock_session_manager): responses.add( responses.GET, "https://api.test.com/compute/workflows/runs/wf-instance-123", - json={"uuid": "wf-instance-123", "datasetId": "dataset-456", "dataSources": {"source-1": {"packageIds": ["pkg-1", "pkg-2", "pkg-3"]}}}, + json={ + "uuid": "wf-instance-123", + "datasetId": "dataset-456", + "dataSources": {"source-1": {"packageIds": ["pkg-1", "pkg-2", "pkg-3"]}}, + }, status=200, ) @@ -125,7 +129,10 @@ def test_get_workflow_instance_retries_on_401(self, mock_session_manager): """Test that get_workflow_instance retries after 401.""" # First call returns 401 responses.add( - responses.GET, "https://api.test.com/compute/workflows/runs/wf-123", json={"error": "Unauthorized"}, status=401 + responses.GET, + "https://api.test.com/compute/workflows/runs/wf-123", + json={"error": "Unauthorized"}, + status=401, ) # Second call succeeds responses.add(