diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py b/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py new file mode 100644 index 000000000..b0219124b --- /dev/null +++ b/python/lib/sift_client/_internal/low_level_wrappers/data_imports.py @@ -0,0 +1,104 @@ +from __future__ import annotations + +from typing import TYPE_CHECKING, cast + +from sift.data_imports.v2.data_imports_pb2 import ( + CreateDataImportFromUploadRequest, + CreateDataImportFromUploadResponse, + DetectConfigRequest, + DetectConfigResponse, + GetDataImportRequest, + GetDataImportResponse, +) +from sift.data_imports.v2.data_imports_pb2_grpc import DataImportServiceStub + +from sift_client._internal.low_level_wrappers.base import LowLevelClientBase +from sift_client.sift_types.data_import import ( + CsvImportConfig, + Hdf5ImportConfig, + ImportConfig, + ParquetFlatDatasetImportConfig, + ParquetSingleChannelPerRowImportConfig, + TdmsImportConfig, +) +from sift_client.transport import WithGrpcClient + +if TYPE_CHECKING: + from sift.data_imports.v2.data_imports_pb2 import DataTypeKey + + from sift_client.transport.grpc_transport import GrpcClient + + +def _set_config_on_request( + request: CreateDataImportFromUploadRequest, + config: ImportConfig, +) -> None: + """Set the appropriate config field on a proto request based on the config type.""" + if isinstance(config, CsvImportConfig): + request.csv_config.CopyFrom(config._to_proto()) + elif isinstance( + config, (ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig) + ): + request.parquet_config.CopyFrom(config._to_proto()) + elif isinstance(config, TdmsImportConfig): + request.tdms_config.CopyFrom(config._to_proto()) + elif isinstance(config, Hdf5ImportConfig): + request.hdf5_config.CopyFrom(config._to_proto()) + else: + raise TypeError(f"Unsupported import config type: {type(config).__name__}") + + +class DataImportsLowLevelClient(LowLevelClientBase, WithGrpcClient): + """Low-level client for the DataImportService. + + This class provides a thin wrapper around the autogenerated bindings for the DataImportsAPI. + """ + + def __init__(self, grpc_client: GrpcClient): + WithGrpcClient.__init__(self, grpc_client=grpc_client) + + async def create_from_upload(self, config: ImportConfig) -> tuple[str, str]: + """Create a data import and get back a presigned upload URL. + + Args: + config: The import configuration. + + Returns: + A tuple of (data_import_id, upload_url). + """ + request = CreateDataImportFromUploadRequest() + _set_config_on_request(request, config) + response = await self._grpc_client.get_stub( + DataImportServiceStub + ).CreateDataImportFromUpload(request) + response = cast("CreateDataImportFromUploadResponse", response) + return response.data_import_id, response.upload_url + + async def get(self, data_import_id: str) -> GetDataImportResponse: + """Get a data import by ID. + + Args: + data_import_id: The ID of the data import. + + Returns: + The GetDataImportResponse proto. + """ + request = GetDataImportRequest(data_import_id=data_import_id) + response = await self._grpc_client.get_stub(DataImportServiceStub).GetDataImport(request) + return cast("GetDataImportResponse", response) + + async def detect_config( + self, data: bytes, data_type_key: DataTypeKey.ValueType + ) -> DetectConfigResponse: + """Call the DetectConfig RPC to auto-detect import configuration. + + Args: + data: A sample of the file content (e.g. the first 64 KiB). + data_type_key: The file type hint. + + Returns: + The raw DetectConfigResponse proto. + """ + request = DetectConfigRequest(data=data, type=data_type_key) + response = await self._grpc_client.get_stub(DataImportServiceStub).DetectConfig(request) + return cast("DetectConfigResponse", response) diff --git a/python/lib/sift_client/_internal/util/file.py b/python/lib/sift_client/_internal/util/file.py index 518bce847..5e0269110 100644 --- a/python/lib/sift_client/_internal/util/file.py +++ b/python/lib/sift_client/_internal/util/file.py @@ -1,11 +1,14 @@ from __future__ import annotations +import os +import struct import warnings import zipfile from typing import TYPE_CHECKING from alive_progress import alive_bar # type: ignore[import-untyped] +import sift_client as _sift_client_module from sift_client.errors import SiftWarning if TYPE_CHECKING: @@ -14,6 +17,78 @@ from sift_client.transport.rest_transport import RestClient +class _ProgressReader: + """Wraps a file object to report read progress to an alive_bar callback.""" + + def __init__(self, file_object, progress_bar): + self._file_object = file_object + self._progress_bar = progress_bar + + def read(self, size=-1): + chunk = self._file_object.read(size) + if chunk: + self._progress_bar(len(chunk)) + return chunk + + def __getattr__(self, name): + return getattr(self._file_object, name) + + +def resolve_show_progress(*, is_sync: bool) -> bool: + """Resolve the show_progress setting from the global config. + + Returns the global ``sift_client.config.show_progress`` value when set, + otherwise defaults to ``is_sync``. + """ + global_setting = _sift_client_module.config.show_progress + if global_setting is not None: + return global_setting + return is_sync + + +def upload_file( + signed_url: str, + file_path: Path, + *, + rest_client: RestClient, + show_progress: bool = False, +) -> dict: + """Upload a file to a presigned URL. + + Args: + signed_url: The presigned URL to upload to. + file_path: Path to the file to upload. + rest_client: The SDK rest client to use for the upload. + show_progress: If True, display a progress spinner during upload. + + Returns: + The parsed JSON response from the server. + + Raises: + ValueError: If the upload request fails. + """ + file_size = file_path.stat().st_size + + with alive_bar( + file_size, + title=f"Upload [{file_path.name}]", + spinner="dots_waves", + spinner_length=7, + unit="B", + scale="SI", + disable=not show_progress, + ) as bar: + with open(file_path, "rb") as file: + wrapped = _ProgressReader(file, bar) + response = rest_client.post( + signed_url, + data=wrapped, + headers={"Content-Disposition": f'attachment; filename="{file_path.name}"'}, + ) + response.raise_for_status() + return response.json() + + def download_file( signed_url: str, output_path: Path, @@ -82,3 +157,27 @@ def extract_zip(zip_path: Path, output_dir: Path, *, delete_zip: bool = True) -> except OSError: warnings.warn(f"Failed to delete zip file '{zip_path}'", SiftWarning, stacklevel=2) return [output_dir / name for name in names if not name.endswith("/")] + + +def extract_parquet_footer(path: Path) -> tuple[bytes, int]: + """Extract the Parquet footer bytes and compute the footer offset. + + Args: + path: Path to the Parquet file. + + Returns: + A tuple of (footer_bytes, footer_offset). + + Raises: + ValueError: If the file is not a valid Parquet file. + """ + with open(path, "rb") as f: + f.seek(-8, 2) + footer_tail = f.read(8) + footer_len = struct.unpack(" GrpcClient: def rest_client(self) -> RestClient: return self.client.rest_client + def _show_progress(self) -> bool: + return resolve_show_progress(is_sync=getattr(self, "_is_sync", False)) + def _apply_client_to_instance(self, instance: T) -> T: instance._apply_client_to_instance(self.client) return instance diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py new file mode 100644 index 000000000..f40876234 --- /dev/null +++ b/python/lib/sift_client/resources/data_imports.py @@ -0,0 +1,375 @@ +from __future__ import annotations + +from pathlib import Path +from typing import TYPE_CHECKING + +from sift_client._internal.low_level_wrappers.data_imports import DataImportsLowLevelClient +from sift_client._internal.util.executor import run_sync_function +from sift_client._internal.util.file import extract_parquet_footer, upload_file +from sift_client.resources._base import ResourceBase +from sift_client.sift_types.asset import Asset +from sift_client.sift_types.channel import ChannelDataType +from sift_client.sift_types.data_import import ( + EXTENSION_TO_DATA_TYPE_KEY, + CsvImportConfig, + DataTypeKey, + ImportConfig, + ParquetFlatDatasetImportConfig, + ParquetSingleChannelPerRowImportConfig, + ParquetTimeColumn, +) +from sift_client.sift_types.run import Run + +if TYPE_CHECKING: + from collections.abc import Iterable + + from sift_client.client import SiftClient + from sift_client.sift_types.job import Job + + +class DataImportAPIAsync(ResourceBase): + """High-level API for importing data into Sift.""" + + def __init__(self, sift_client: SiftClient): + """Initialize the DataImportAPI. + + Args: + sift_client: The Sift client to use. + """ + super().__init__(sift_client) + self._low_level_client = DataImportsLowLevelClient( + grpc_client=self.client.grpc_client, + ) + + async def import_from_path( + self, + file_path: str | Path, + *, + asset: Asset | str | None = None, + config: ImportConfig | None = None, + data_type: DataTypeKey | None = None, + run: Run | str | None = None, + run_name: str | None = None, + show_progress: bool | None = None, + ) -> Job: + """Import data from a local file. + + Creates a data import on the server, uploads the file, and returns + a ``Job`` handle after uploading the file. The import processes + server-side and typically completes shortly after upload. Use + ``job.wait_until_complete()`` only if you need to confirm + completion before proceeding. + + When ``config`` is omitted the file format is auto-detected via + ``detect_config`` (CSV and Parquet only). For other formats + (TDMS and HDF5), ``config`` must be provided. + When ``asset`` is provided it overrides the config value; + otherwise the config's ``asset_name`` is used. + If neither ``run`` nor ``run_name`` is provided (and none is + set on the config), ``run_name`` defaults to the filename. + + Examples: + Import a CSV file with auto-detected config: + + job = client.data_imports.import_from_path( + "data.csv", + asset=my_asset, + ) + + Auto-detect config, inspect and patch before importing: + + config = client.data_imports.detect_config("data.csv") + + # Fix a column data type + config["temperature"].data_type = ChannelDataType.FLOAT + + # Remove an unwanted column + config.data_columns = [ + dc for dc in config.data_columns if dc.name != "internal_id" + ] + + job = client.data_imports.import_from_path( + "data.csv", + asset=my_asset, + config=config, + ) + + Args: + file_path: Path to the local file to import. + asset: Asset object or asset name to import data into. Optional + when ``config`` already has ``asset_name`` set. + config: Import configuration describing the file format and column + mapping. When provided, ``data_type`` is ignored. If omitted, + the config is auto-detected via ``detect_config``. You can + call ``detect_config`` yourself to inspect and modify the + config before passing it here. + data_type: Explicit data type key. Required for formats like + Parquet where the extension alone is ambiguous. Only used + when ``config`` is not provided. + run: ``Run`` object or run ID string to import into an existing + run. Mutually exclusive with ``run_name``. + run_name: Name for a new run. Defaults to the filename if + neither ``run`` nor ``run_name`` is set. + show_progress: If True, display a progress spinner during upload. + Defaults to True for sync, False for async. + + Returns: + A ``Job`` handle for the pending import. + + Raises: + FileNotFoundError: If the file does not exist. + """ + path = Path(file_path) + if not path.is_file(): + raise FileNotFoundError(f"File not found: {file_path}") + + if config is None: + config = await self.detect_config(file_path, data_type=data_type) + + if asset is not None: + config.asset_name = asset.name if isinstance(asset, Asset) else asset + elif not config.asset_name: + raise ValueError("'asset' is required when not set on the config.") + if run is not None and run_name is not None: + raise ValueError("'run' and 'run_name' are mutually exclusive.") + if run is not None: + config.run_id = run._id_or_error if isinstance(run, Run) else run + elif run_name is not None: + config.run_name = run_name + elif not config.run_name and not config.run_id: + config.run_name = path.name + + if isinstance( + config, (ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig) + ): + await _prepare_parquet_config(config, path) + + if show_progress is None: + show_progress = self._show_progress() + + _, upload_url = await self._low_level_client.create_from_upload(config) + + response = await run_sync_function( + lambda: upload_file( + upload_url, + path, + rest_client=self.client.rest_client, + show_progress=show_progress, + ) + ) + job_id = response.get("jobId") + if not job_id: + raise RuntimeError("Upload succeeded but server response did not include a job ID.") + + return await self.client.async_.jobs.get(job_id=job_id) + + async def get_run(self, data_import_id: str) -> Run: + """Get the run associated with a data import. + + The ``data_import_id`` is available on the job returned by + ``import_from_path`` via ``job.job_details.data_import_id``. + For a more ergonomic approach, use ``job.get_import_run()`` + which calls this method internally. + + Args: + data_import_id: The ID of the data import. + + Returns: + The Run created by or associated with the import. + + Raises: + ValueError: If the data import has no associated run. + """ + response = await self._low_level_client.get(data_import_id) + run_id = response.data_import.run_id + if not run_id: + raise ValueError("Data import does not have an associated run.") + return await self.client.async_.runs.get(run_id=run_id) + + async def detect_config( + self, + file_path: str | Path, + data_type: DataTypeKey | None = None, + ) -> ImportConfig: + """Auto-detect import configuration from a file. + + Reads a sample of the file, sends it to the server's DetectConfig + endpoint, and returns the detected configuration. The file format + is inferred from the file extension when ``data_type`` is not + provided. + + Only CSV and Parquet files are currently supported for auto-detection. + For other formats (TDMS, HDF5), create the config manually + using ``TdmsImportConfig`` or ``Hdf5ImportConfig``. + + For CSV files, the server scans the first two rows for an optional + JSON metadata row. Row 1 is checked first; row 2 is checked only + if row 1 is not valid metadata. A row qualifies as metadata when + every cell contains valid JSON that describes either a time column + or a data column. When present, ``first_data_row`` in the returned + config is set to the row after the metadata row. + + Each data column cell is a JSON ``ChannelConfig``:: + + {"name": "speed", "units": "m/s", "dataType": "CHANNEL_DATA_TYPE_DOUBLE"} + + The time column cell is a JSON ``CsvTimeColumn``:: + + {"format": "TIME_FORMAT_ABSOLUTE_RFC3339"} + + Enum type definitions and bit field elements can also be specified + in the metadata row; they are applied server-side during import + but are not included in the returned config. + + For file types with multiple layouts (e.g. Parquet), ``data_type`` + must be specified explicitly. + + Args: + file_path: Path to the file to analyze. + data_type: Explicit data type key. Required for formats like + Parquet where the extension alone is ambiguous. + + Returns: + The detected import config. + + Raises: + FileNotFoundError: If the file does not exist. + ValueError: If the file extension is unsupported or no + supported configuration could be detected. + """ + path = Path(file_path) + if not path.is_file(): + raise FileNotFoundError(f"File not found: {file_path}") + + data_type_key = _resolve_data_type_key(path.suffix.lower(), data_type) + + is_parquet = data_type_key in ( + DataTypeKey.PARQUET_FLATDATASET, + DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW, + ) + + footer_offset = 0 + footer_length = 0 + + if is_parquet: + footer_bytes, footer_offset = await run_sync_function( + lambda: extract_parquet_footer(path) + ) + sample = footer_bytes + footer_length = len(footer_bytes) + else: + + def _read_sample() -> bytes: + with open(path, "rb") as f: + return f.read(1048576) # 1MiB + + sample = await run_sync_function(_read_sample) + + response = await self._low_level_client.detect_config(sample, data_type_key.value) + + if response.HasField("csv_config"): + return _parse_csv_detect_response(response.csv_config) + + if response.HasField("parquet_config"): + return _parse_parquet_detect_response( + response.parquet_config, path.name, footer_offset, footer_length + ) + + raise ValueError( + f"No supported configuration detected for '{path.name}'. " + "Auto-detection supports CSV and Parquet files. " + "For other formats, provide a config manually." + ) + + +def _resolve_data_type_key(ext: str, data_type: DataTypeKey | None) -> DataTypeKey: + """Resolve the data type key from file extension and explicit override.""" + if data_type is not None: + return data_type + if ext in (".parquet", ".pqt"): + raise ValueError( + "Parquet files require 'data_type' to be specified. " + "Use DataTypeKey.PARQUET_FLATDATASET or DataTypeKey.PARQUET_SINGLE_CHANNEL_PER_ROW." + ) + if ext not in EXTENSION_TO_DATA_TYPE_KEY: + raise ValueError( + f"Unsupported file extension '{ext}'. " + f"Supported: {', '.join(sorted(EXTENSION_TO_DATA_TYPE_KEY))}. " + "You can also specify 'data_type' explicitly using a DataTypeKey value." + ) + return EXTENSION_TO_DATA_TYPE_KEY[ext] + + +def _parse_csv_detect_response(proto) -> CsvImportConfig: + """Parse a CSV DetectConfig response into a config.""" + csv_config = CsvImportConfig._from_proto(proto) + time_col = csv_config.time_column.column + csv_config.data_columns = [dc for dc in csv_config.data_columns if dc.column != time_col] + return csv_config + + +def _infer_time_column(columns: Iterable[tuple[str, ChannelDataType, str]]) -> str | None: + """Find a likely time column from a sequence of (name, data_type, path) tuples. + + The backend only detects arrow timestamp types. This falls back to the first + integer column whose name starts with "time". + """ + _integer_types = { + ChannelDataType.INT_32, + ChannelDataType.INT_64, + ChannelDataType.UINT_32, + ChannelDataType.UINT_64, + } + for name, data_type, path in columns: + if data_type in _integer_types and name.lower().startswith("time"): + return path + return None + + +def _parse_parquet_detect_response( + proto, filename: str, footer_offset: int, footer_length: int +) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig: + """Parse a Parquet DetectConfig response into a config.""" + if proto.HasField("flat_dataset"): + parquet_config = ParquetFlatDatasetImportConfig._from_proto( + proto, footer_offset=footer_offset, footer_length=footer_length + ) + time_path = parquet_config.time_column.path + if time_path: + parquet_config.data_columns = [ + dc for dc in parquet_config.data_columns if dc.path != time_path + ] + else: + inferred = _infer_time_column( + (dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns + ) + if inferred is not None: + parquet_config.time_column = ParquetTimeColumn(path=inferred) + parquet_config.data_columns = [ + c for c in parquet_config.data_columns if c.path != inferred + ] + return parquet_config + elif proto.HasField("single_channel_per_row"): + parquet_scpr_config = ParquetSingleChannelPerRowImportConfig._from_proto( + proto, footer_offset=footer_offset, footer_length=footer_length + ) + if not parquet_scpr_config.time_column.path: + inferred = _infer_time_column( + (col.column_config.name, ChannelDataType(col.column_config.data_type), col.path) + for col in proto.single_channel_per_row.columns + ) + if inferred is not None: + parquet_scpr_config.time_column = ParquetTimeColumn(path=inferred) + return parquet_scpr_config + raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.") + + +async def _prepare_parquet_config( + config: ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig, + path: Path, +) -> None: + """Populate parquet footer fields on the config if not already set.""" + if config.footer_offset == 0 and config.footer_length == 0: + footer_bytes, footer_offset = await run_sync_function(lambda: extract_parquet_footer(path)) + config.footer_offset = footer_offset + config.footer_length = len(footer_bytes) diff --git a/python/lib/sift_client/resources/jobs.py b/python/lib/sift_client/resources/jobs.py index 6ddaec6ca..5a9eb38c6 100644 --- a/python/lib/sift_client/resources/jobs.py +++ b/python/lib/sift_client/resources/jobs.py @@ -9,7 +9,6 @@ from alive_progress import alive_bar # type: ignore[import-untyped] -import sift_client as _sift_client_module from sift_client._internal.low_level_wrappers.jobs import JobsLowLevelClient from sift_client._internal.util.executor import run_sync_function from sift_client._internal.util.file import download_file, extract_zip @@ -194,13 +193,7 @@ async def wait_until_complete( """ job_id = job._id_or_error if isinstance(job, Job) else job if show_progress is None: - global_setting = _sift_client_module.config.show_progress - if global_setting is not None: - show_progress = global_setting - elif getattr(self, "_is_sync", False): - show_progress = True - else: - show_progress = False + show_progress = self._show_progress() start = time.monotonic() with alive_bar( @@ -263,13 +256,7 @@ async def wait_and_download( """ job_id = job._id_or_error if isinstance(job, Job) else job if show_progress is None: - global_setting = _sift_client_module.config.show_progress - if global_setting is not None: - show_progress = global_setting - elif getattr(self, "_is_sync", False): - show_progress = True - else: - show_progress = False + show_progress = self._show_progress() completed_job = await self.wait_until_complete( job=job_id, diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.py b/python/lib/sift_client/resources/sync_stubs/__init__.py index acd73755e..982a028c6 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.py +++ b/python/lib/sift_client/resources/sync_stubs/__init__.py @@ -8,6 +8,7 @@ CalculatedChannelsAPIAsync, ChannelsAPIAsync, DataExportAPIAsync, + DataImportAPIAsync, FileAttachmentsAPIAsync, JobsAPIAsync, PingAPIAsync, @@ -30,12 +31,14 @@ TagsAPI = generate_sync_api(TagsAPIAsync, "TagsAPI") TestResultsAPI = generate_sync_api(TestResultsAPIAsync, "TestResultsAPI") DataExportAPI = generate_sync_api(DataExportAPIAsync, "DataExportAPI") +DataImportAPI = generate_sync_api(DataImportAPIAsync, "DataImportAPI") __all__ = [ "AssetsAPI", "CalculatedChannelsAPI", "ChannelsAPI", "DataExportAPI", + "DataImportAPI", "FileAttachmentsAPI", "JobsAPI", "PingAPI", diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index fe87809cd..0e9d18b76 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -21,6 +21,10 @@ if TYPE_CHECKING: CalculatedChannelUpdate, ) from sift_client.sift_types.channel import Channel + from sift_client.sift_types.data_import import ( + DataTypeKey, + ImportConfig, + ) from sift_client.sift_types.export import ExportOutputFormat from sift_client.sift_types.file_attachment import ( FileAttachment, @@ -621,6 +625,171 @@ class DataExportAPI: """ ... +class DataImportAPI: + """Sync counterpart to `DataImportAPIAsync`. + + High-level API for importing data into Sift. + """ + + def __init__(self, sift_client: SiftClient): + """Initialize the DataImportAPI. + + Args: + sift_client: The Sift client to use. + """ + ... + + def _run(self, coro): ... + def detect_config( + self, file_path: str | Path, data_type: DataTypeKey | None = None + ) -> ImportConfig: + """Auto-detect import configuration from a file. + + Reads a sample of the file, sends it to the server's DetectConfig + endpoint, and returns the detected configuration. The file format + is inferred from the file extension when ``data_type`` is not + provided. + + Only CSV and Parquet files are currently supported for auto-detection. + For other formats (TDMS, HDF5), create the config manually + using ``TdmsImportConfig`` or ``Hdf5ImportConfig``. + + For CSV files, the server scans the first two rows for an optional + JSON metadata row. Row 1 is checked first; row 2 is checked only + if row 1 is not valid metadata. A row qualifies as metadata when + every cell contains valid JSON that describes either a time column + or a data column. When present, ``first_data_row`` in the returned + config is set to the row after the metadata row. + + Each data column cell is a JSON ``ChannelConfig``:: + + {"name": "speed", "units": "m/s", "dataType": "CHANNEL_DATA_TYPE_DOUBLE"} + + The time column cell is a JSON ``CsvTimeColumn``:: + + {"format": "TIME_FORMAT_ABSOLUTE_RFC3339"} + + Enum type definitions and bit field elements can also be specified + in the metadata row; they are applied server-side during import + but are not included in the returned config. + + For file types with multiple layouts (e.g. Parquet), ``data_type`` + must be specified explicitly. + + Args: + file_path: Path to the file to analyze. + data_type: Explicit data type key. Required for formats like + Parquet where the extension alone is ambiguous. + + Returns: + The detected import config. + + Raises: + FileNotFoundError: If the file does not exist. + ValueError: If the file extension is unsupported or no + supported configuration could be detected. + """ + ... + + def get_run(self, data_import_id: str) -> Run: + """Get the run associated with a data import. + + The ``data_import_id`` is available on the job returned by + ``import_from_path`` via ``job.job_details.data_import_id``. + For a more ergonomic approach, use ``job.get_import_run()`` + which calls this method internally. + + Args: + data_import_id: The ID of the data import. + + Returns: + The Run created by or associated with the import. + + Raises: + ValueError: If the data import has no associated run. + """ + ... + + def import_from_path( + self, + file_path: str | Path, + *, + asset: Asset | str | None = None, + config: ImportConfig | None = None, + data_type: DataTypeKey | None = None, + run: Run | str | None = None, + run_name: str | None = None, + show_progress: bool | None = None, + ) -> Job: + """Import data from a local file. + + Creates a data import on the server, uploads the file, and returns + a ``Job`` handle after uploading the file. The import processes + server-side and typically completes shortly after upload. Use + ``job.wait_until_complete()`` only if you need to confirm + completion before proceeding. + + When ``config`` is omitted the file format is auto-detected via + ``detect_config`` (CSV and Parquet only). For other formats + (TDMS and HDF5), ``config`` must be provided. + When ``asset`` is provided it overrides the config value; + otherwise the config's ``asset_name`` is used. + If neither ``run`` nor ``run_name`` is provided (and none is + set on the config), ``run_name`` defaults to the filename. + + Examples: + Import a CSV file with auto-detected config: + + job = client.data_imports.import_from_path( + "data.csv", + asset=my_asset, + ) + + Auto-detect config, inspect and patch before importing: + + config = client.data_imports.detect_config("data.csv") + + # Fix a column data type + config["temperature"].data_type = ChannelDataType.FLOAT + + # Remove an unwanted column + config.data_columns = [ + dc for dc in config.data_columns if dc.name != "internal_id" + ] + + job = client.data_imports.import_from_path( + "data.csv", + asset=my_asset, + config=config, + ) + + Args: + file_path: Path to the local file to import. + asset: Asset object or asset name to import data into. Optional + when ``config`` already has ``asset_name`` set. + config: Import configuration describing the file format and column + mapping. When provided, ``data_type`` is ignored. If omitted, + the config is auto-detected via ``detect_config``. You can + call ``detect_config`` yourself to inspect and modify the + config before passing it here. + data_type: Explicit data type key. Required for formats like + Parquet where the extension alone is ambiguous. Only used + when ``config`` is not provided. + run: ``Run`` object or run ID string to import into an existing + run. Mutually exclusive with ``run_name``. + run_name: Name for a new run. Defaults to the filename if + neither ``run`` nor ``run_name`` is set. + show_progress: If True, display a progress spinner during upload. + Defaults to True for sync, False for async. + + Returns: + A ``Job`` handle for the pending import. + + Raises: + FileNotFoundError: If the file does not exist. + """ + ... + class FileAttachmentsAPI: """Sync counterpart to `FileAttachmentsAPIAsync`. diff --git a/python/lib/sift_client/sift_types/data_import.py b/python/lib/sift_client/sift_types/data_import.py new file mode 100644 index 000000000..62208a678 --- /dev/null +++ b/python/lib/sift_client/sift_types/data_import.py @@ -0,0 +1,639 @@ +from __future__ import annotations + +from abc import ABC +from datetime import datetime # noqa: TC003 +from enum import Enum +from typing import Union + +from pydantic import BaseModel, model_validator +from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto +from sift.data_imports.v2.data_imports_pb2 import ( + DATA_TYPE_KEY_CSV, + DATA_TYPE_KEY_HDF5, + DATA_TYPE_KEY_PARQUET_FLATDATASET, + DATA_TYPE_KEY_PARQUET_SINGLE_CHANNEL_PER_ROW, + DATA_TYPE_KEY_TDMS, + PARQUET_COMPLEX_TYPES_IMPORT_MODE_BOTH, + PARQUET_COMPLEX_TYPES_IMPORT_MODE_BYTES, + PARQUET_COMPLEX_TYPES_IMPORT_MODE_IGNORE, + PARQUET_COMPLEX_TYPES_IMPORT_MODE_STRING, +) +from sift.data_imports.v2.data_imports_pb2 import CsvConfig as CsvConfigProto +from sift.data_imports.v2.data_imports_pb2 import CsvTimeColumn as CsvTimeColumnProto +from sift.data_imports.v2.data_imports_pb2 import Hdf5Config as Hdf5ConfigProto +from sift.data_imports.v2.data_imports_pb2 import Hdf5DataConfig as Hdf5DataConfigProto +from sift.data_imports.v2.data_imports_pb2 import ParquetConfig as ParquetConfigProto +from sift.data_imports.v2.data_imports_pb2 import ParquetDataColumn as ParquetDataColumnProto +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetFlatDatasetConfig as ParquetFlatDatasetConfigProto, +) +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetSingleChannelPerRowConfig as ParquetSingleChannelPerRowConfigProto, +) +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetSingleChannelPerRowMultiChannelConfig as ParquetSingleChannelPerRowMultiChannelConfigProto, +) +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetSingleChannelPerRowSingleChannelConfig as ParquetSingleChannelPerRowSingleChannelConfigProto, +) +from sift.data_imports.v2.data_imports_pb2 import ParquetTimeColumn as ParquetTimeColumnProto +from sift.data_imports.v2.data_imports_pb2 import TDMSConfig as TDMSConfigProto +from sift.data_imports.v2.data_imports_pb2 import TimeFormat as TimeFormatProto + +from sift_client._internal.util.timestamp import to_pb_timestamp +from sift_client.sift_types.channel import ChannelDataType + + +class TimeFormat(Enum): + """Supported time formats for data import columns.""" + + RELATIVE_NANOSECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_NANOSECONDS + RELATIVE_MICROSECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_MICROSECONDS + RELATIVE_MILLISECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_MILLISECONDS + RELATIVE_SECONDS = TimeFormatProto.TIME_FORMAT_RELATIVE_SECONDS + RELATIVE_MINUTES = TimeFormatProto.TIME_FORMAT_RELATIVE_MINUTES + RELATIVE_HOURS = TimeFormatProto.TIME_FORMAT_RELATIVE_HOURS + ABSOLUTE_RFC3339 = TimeFormatProto.TIME_FORMAT_ABSOLUTE_RFC3339 + ABSOLUTE_DATETIME = TimeFormatProto.TIME_FORMAT_ABSOLUTE_DATETIME + ABSOLUTE_UNIX_SECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_SECONDS + ABSOLUTE_UNIX_MILLISECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_MILLISECONDS + ABSOLUTE_UNIX_MICROSECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_MICROSECONDS + ABSOLUTE_UNIX_NANOSECONDS = TimeFormatProto.TIME_FORMAT_ABSOLUTE_UNIX_NANOSECONDS + + +class DataTypeKey(Enum): + """Supported file types for data import detection.""" + + CSV = DATA_TYPE_KEY_CSV + PARQUET_FLATDATASET = DATA_TYPE_KEY_PARQUET_FLATDATASET + PARQUET_SINGLE_CHANNEL_PER_ROW = DATA_TYPE_KEY_PARQUET_SINGLE_CHANNEL_PER_ROW + TDMS = DATA_TYPE_KEY_TDMS + HDF5 = DATA_TYPE_KEY_HDF5 + + +EXTENSION_TO_DATA_TYPE_KEY: dict[str, DataTypeKey] = { + ".csv": DataTypeKey.CSV, + ".tdms": DataTypeKey.TDMS, + ".h5": DataTypeKey.HDF5, + ".hdf5": DataTypeKey.HDF5, +} + + +class TimeColumnBase(BaseModel, ABC): + """Base class for time column configurations. + + Attributes: + format: The time format used in this column. + relative_start_time: Required when using a relative time format. + """ + + format: TimeFormat + relative_start_time: datetime | None = None + + @model_validator(mode="after") + def _check_relative_start_time(self) -> TimeColumnBase: + if self.format.name.startswith("RELATIVE_") and self.relative_start_time is None: + raise ValueError( + f"'relative_start_time' is required when using a relative time format ({self.format.name})." + ) + return self + + +class DataColumnBase(BaseModel, ABC): + """Base class for data column definitions. + + Attributes: + name: Channel name. + data_type: The data type of the channel values. + units: Optional units string. + description: Optional channel description. + """ + + name: str + data_type: ChannelDataType + units: str = "" + description: str = "" + + +class ImportConfigBase(BaseModel, ABC): + """Base class for all import configurations. + + Attributes: + asset_name: Name of the asset to import data into. + run_name: Name for the run. Ignored if ``run_id`` is set. + run_id: ID of an existing run to append data to. + """ + + asset_name: str + run_name: str | None = None + run_id: str | None = None + + +class CsvTimeColumn(TimeColumnBase): + """Time column configuration for CSV imports. + + Attributes: + column: The 1-indexed column number of the time column. + """ + + column: int + + def _to_proto(self) -> CsvTimeColumnProto: + proto = CsvTimeColumnProto( + column_number=self.column, + format=self.format.value, + ) + if self.relative_start_time is not None: + proto.relative_start_time.CopyFrom(to_pb_timestamp(self.relative_start_time)) + return proto + + +class CsvDataColumn(DataColumnBase): + """A data column definition for CSV imports. + + Attributes: + column: The 1-indexed column number. + """ + + column: int + + +class CsvImportConfig(ImportConfigBase): + """Configuration for importing a CSV file. + + Attributes: + first_data_row: The first row containing data (1-indexed). Defaults to 2 to skip a header row. + time_column: Time column configuration. + data_columns: List of data column definitions. + """ + + first_data_row: int = 2 + time_column: CsvTimeColumn + data_columns: list[CsvDataColumn] + + def __getitem__(self, name: str) -> CsvDataColumn: + """Look up a data column by channel name. + + Example:: + + config["temperature"].data_type = ChannelDataType.FLOAT + """ + for dc in self.data_columns: + if dc.name == name: + return dc + raise KeyError(f"No data column named '{name}'") + + def _to_proto(self) -> CsvConfigProto: + if not self.data_columns: + raise ValueError("Config has no data columns. Add at least one before importing.") + return CsvConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + first_data_row=self.first_data_row, + time_column=self.time_column._to_proto(), + data_columns={ + dc.column: ChannelConfigProto( + name=dc.name, + data_type=dc.data_type.value, + units=dc.units, + description=dc.description, + ) + for dc in self.data_columns + }, + ) + + @classmethod + def _from_proto(cls, proto: CsvConfigProto) -> CsvImportConfig: + """Create from a proto CsvConfig (e.g. from DetectConfig response).""" + relative_start_time = None + if proto.time_column.HasField("relative_start_time"): + from datetime import timezone + + relative_start_time = proto.time_column.relative_start_time.ToDatetime( + tzinfo=timezone.utc + ) + time_column = CsvTimeColumn( + column=proto.time_column.column_number, + format=TimeFormat(proto.time_column.format), + relative_start_time=relative_start_time, + ) + data_columns = [ + CsvDataColumn( + column=col_num, + name=ch_cfg.name, + data_type=ChannelDataType(ch_cfg.data_type), + units=ch_cfg.units, + description=ch_cfg.description, + ) + for col_num, ch_cfg in proto.data_columns.items() + ] + return cls( + asset_name=proto.asset_name, + run_name=proto.run_name or None, + run_id=proto.run_id or None, + first_data_row=proto.first_data_row or 2, + time_column=time_column, + data_columns=data_columns, + ) + + +class ParquetComplexTypesImportMode(Enum): + """Controls how complex Parquet types (maps, lists, structs) are imported.""" + + IGNORE = PARQUET_COMPLEX_TYPES_IMPORT_MODE_IGNORE + BOTH = PARQUET_COMPLEX_TYPES_IMPORT_MODE_BOTH + STRING = PARQUET_COMPLEX_TYPES_IMPORT_MODE_STRING + BYTES = PARQUET_COMPLEX_TYPES_IMPORT_MODE_BYTES + + +class ParquetTimeColumn(TimeColumnBase): + """Time column configuration for Parquet imports. + + Attributes: + path: The column path in the Parquet schema (e.g. ``"timestamp"``). + format: The time format. Defaults to ``ABSOLUTE_UNIX_NANOSECONDS``. + """ + + path: str + format: TimeFormat = TimeFormat.ABSOLUTE_UNIX_NANOSECONDS + + def _to_proto(self) -> ParquetTimeColumnProto: + if not self.path: + raise ValueError("ParquetTimeColumn.path must be set before importing.") + proto = ParquetTimeColumnProto( + path=self.path, + format=self.format.value, + ) + if self.relative_start_time is not None: + proto.relative_start_time.CopyFrom(to_pb_timestamp(self.relative_start_time)) + return proto + + @classmethod + def _from_proto(cls, proto: ParquetTimeColumnProto) -> ParquetTimeColumn: + relative_start_time = None + if proto.HasField("relative_start_time"): + from datetime import timezone + + relative_start_time = proto.relative_start_time.ToDatetime(tzinfo=timezone.utc) + + fmt = TimeFormat(proto.format) if proto.format else TimeFormat.ABSOLUTE_UNIX_NANOSECONDS + return cls( + path=proto.path or "", + format=fmt, + relative_start_time=relative_start_time, + ) + + +class ParquetDataColumn(DataColumnBase): + """A data column definition for Parquet flat dataset imports. + + Attributes: + path: The column path in the Parquet schema. + """ + + path: str + + +class ParquetFlatDatasetImportConfig(ImportConfigBase): + """Configuration for importing a Parquet file with a flat dataset layout. + + Each column in the file maps to a separate channel. + + Attributes: + time_column: Time column configuration. + data_columns: List of data column definitions. + footer_offset: Byte offset where the Parquet footer begins. Populated + automatically when using ``detect_config``. + footer_length: Length of the Parquet footer in bytes. Populated + automatically when using ``detect_config``. + complex_types_import_mode: How to handle complex Parquet types. + """ + + time_column: ParquetTimeColumn + data_columns: list[ParquetDataColumn] + footer_offset: int = 0 + footer_length: int = 0 + complex_types_import_mode: ParquetComplexTypesImportMode = ParquetComplexTypesImportMode.IGNORE + + def __getitem__(self, name: str) -> ParquetDataColumn: + """Look up a data column by channel name. + + Example:: + + config["temperature"].data_type = ChannelDataType.FLOAT + """ + for dc in self.data_columns: + if dc.name == name: + return dc + raise KeyError(f"No data column named '{name}'") + + def _to_proto(self) -> ParquetConfigProto: + if not self.data_columns: + raise ValueError("Config has no data columns. Add at least one before importing.") + flat_dataset = ParquetFlatDatasetConfigProto( + time_column=self.time_column._to_proto(), + data_columns=[ + ParquetDataColumnProto( + path=dc.path, + channel_config=ChannelConfigProto( + name=dc.name, + data_type=dc.data_type.value, + units=dc.units, + description=dc.description, + ), + ) + for dc in self.data_columns + ], + ) + return ParquetConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + flat_dataset=flat_dataset, + footer_offset=self.footer_offset, + footer_length=self.footer_length, + complex_types_import_mode=self.complex_types_import_mode.value, + ) + + @classmethod + def _from_proto( + cls, + proto: ParquetConfigProto, + footer_offset: int = 0, + footer_length: int = 0, + ) -> ParquetFlatDatasetImportConfig: + """Create from a proto ParquetConfig with a flat_dataset config.""" + fd = proto.flat_dataset + time_column = ParquetTimeColumn._from_proto(fd.time_column) + data_columns = [ + ParquetDataColumn( + path=dc.path, + name=dc.channel_config.name, + data_type=ChannelDataType(dc.channel_config.data_type), + units=dc.channel_config.units, + description=dc.channel_config.description, + ) + for dc in fd.data_columns + ] + mode = proto.complex_types_import_mode + return cls( + asset_name=proto.asset_name, + run_name=proto.run_name or None, + run_id=proto.run_id or None, + time_column=time_column, + data_columns=data_columns, + footer_offset=footer_offset or proto.footer_offset, + footer_length=footer_length or proto.footer_length, + complex_types_import_mode=ParquetComplexTypesImportMode(mode) + if mode + else ParquetComplexTypesImportMode.IGNORE, + ) + + +class ParquetSingleChannelConfig(DataColumnBase): + """Configuration for a single-channel Parquet single-channel-per-row import. + + Attributes: + data_path: The column path containing channel data. + """ + + data_path: str + + +class ParquetMultiChannelConfig(BaseModel): + """Configuration for a multi-channel Parquet single-channel-per-row import. + + Attributes: + name_path: The column path that identifies the channel name per row. + data_path: The column path containing channel data. + """ + + name_path: str + data_path: str + + +class ParquetSingleChannelPerRowImportConfig(ImportConfigBase): + """Configuration for importing a Parquet file where each row represents + a single channel's data point. + + Exactly one of ``single_channel`` or ``multi_channel`` must be set before + importing. When returned by ``detect_config()``, neither field is populated + and must be filled in before passing the config to ``import_from_path()``. + + Attributes: + time_column: Time column configuration. + single_channel: Set when the entire file contains data for one channel. + multi_channel: Set when each row identifies its channel via a name column. + footer_offset: Byte offset where the Parquet footer begins. Populated + automatically when using ``detect_config``. + footer_length: Length of the Parquet footer in bytes. Populated + automatically when using ``detect_config``. + complex_types_import_mode: How to handle complex Parquet types. + """ + + time_column: ParquetTimeColumn + single_channel: ParquetSingleChannelConfig | None = None + multi_channel: ParquetMultiChannelConfig | None = None + footer_offset: int = 0 + footer_length: int = 0 + complex_types_import_mode: ParquetComplexTypesImportMode = ParquetComplexTypesImportMode.IGNORE + + @model_validator(mode="after") + def _check_channel_config(self) -> ParquetSingleChannelPerRowImportConfig: + if self.single_channel is not None and self.multi_channel is not None: + raise ValueError( + "Exactly one of 'single_channel' or 'multi_channel' must be set, not both." + ) + return self + + def _to_proto(self) -> ParquetConfigProto: + if self.single_channel is None and self.multi_channel is None: + raise ValueError( + "Either 'single_channel' or 'multi_channel' must be set before importing. " + "If this config was returned by detect_config(), set one of these fields " + "to specify the channel layout." + ) + scpr = ParquetSingleChannelPerRowConfigProto( + time_column=self.time_column._to_proto(), + ) + if self.single_channel is not None: + sc = self.single_channel + scpr.single_channel.CopyFrom( + ParquetSingleChannelPerRowSingleChannelConfigProto( + data_path=sc.data_path, + channel=ChannelConfigProto( + name=sc.name, + data_type=sc.data_type.value, + units=sc.units, + description=sc.description, + ), + ) + ) + elif self.multi_channel is not None: + scpr.multi_channel.CopyFrom( + ParquetSingleChannelPerRowMultiChannelConfigProto( + name_path=self.multi_channel.name_path, + data_path=self.multi_channel.data_path, + ) + ) + return ParquetConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + single_channel_per_row=scpr, + footer_offset=self.footer_offset, + footer_length=self.footer_length, + complex_types_import_mode=self.complex_types_import_mode.value, + ) + + @classmethod + def _from_proto( + cls, + proto: ParquetConfigProto, + footer_offset: int = 0, + footer_length: int = 0, + ) -> ParquetSingleChannelPerRowImportConfig: + """Create from a proto ParquetConfig with a single_channel_per_row config.""" + scpr = proto.single_channel_per_row + + time_column = ParquetTimeColumn._from_proto(scpr.time_column) + + single_channel = None + multi_channel = None + if scpr.HasField("single_channel"): + sc = scpr.single_channel + single_channel = ParquetSingleChannelConfig( + data_path=sc.data_path, + name=sc.channel.name, + data_type=ChannelDataType(sc.channel.data_type), + units=sc.channel.units, + description=sc.channel.description, + ) + elif scpr.HasField("multi_channel"): + mc = scpr.multi_channel + multi_channel = ParquetMultiChannelConfig( + name_path=mc.name_path, + data_path=mc.data_path, + ) + + mode = proto.complex_types_import_mode + return cls( + asset_name=proto.asset_name, + run_name=proto.run_name or None, + run_id=proto.run_id or None, + time_column=time_column, + single_channel=single_channel, + multi_channel=multi_channel, + footer_offset=footer_offset or proto.footer_offset, + footer_length=footer_length or proto.footer_length, + complex_types_import_mode=ParquetComplexTypesImportMode(mode) + if mode + else ParquetComplexTypesImportMode.IGNORE, + ) + + +class TdmsImportConfig(ImportConfigBase): + """Configuration for importing a TDMS file. + + Attributes: + start_time_override: Override the ``wf_start_time`` metadata field for all channels. + Useful when waveform channels have ``wf_increment`` but no ``wf_start_time``. + file_size: The file size in bytes. Required if the file has truncated chunks. + """ + + start_time_override: datetime | None = None + file_size: int | None = None + + def _to_proto(self) -> TDMSConfigProto: + proto = TDMSConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + ) + if self.start_time_override is not None: + proto.start_time_override.CopyFrom(to_pb_timestamp(self.start_time_override)) + if self.file_size is not None: + proto.file_size = self.file_size + return proto + + +class Hdf5DataColumn(DataColumnBase): + """A dataset mapping for HDF5 imports. + + Each entry maps a time/value dataset pair to a channel. + + Attributes: + time_dataset: HDF5 path to the time dataset. + time_index: Column index within the time dataset. Defaults to 0. + value_dataset: HDF5 path to the value dataset. + value_index: Column index within the value dataset. Defaults to 0. + time_field: For compound dataset types, the field name to use for time. + value_field: For compound dataset types, the field name to use for value. + """ + + time_dataset: str + time_index: int = 0 + value_dataset: str + value_index: int = 0 + time_field: str | None = None + value_field: str | None = None + + +class Hdf5ImportConfig(ImportConfigBase): + """Configuration for importing an HDF5 file. + + Attributes: + data: List of dataset mappings, each pairing a time and value dataset to a channel. + time_format: The time format used across all time datasets. + relative_start_time: Required when using a relative time format. + """ + + data: list[Hdf5DataColumn] + time_format: TimeFormat + relative_start_time: datetime | None = None + + @model_validator(mode="after") + def _check_relative_start_time(self) -> Hdf5ImportConfig: + if self.time_format.name.startswith("RELATIVE_") and self.relative_start_time is None: + raise ValueError( + f"'relative_start_time' is required when using a relative time format ({self.time_format.name})." + ) + return self + + def _to_proto(self) -> Hdf5ConfigProto: + proto = Hdf5ConfigProto( + asset_name=self.asset_name, + run_name=self.run_name or "", + run_id=self.run_id or "", + time_format=self.time_format.value, + data=[ + Hdf5DataConfigProto( + time_dataset=d.time_dataset, + time_index=d.time_index, + value_dataset=d.value_dataset, + value_index=d.value_index, + channel_config=ChannelConfigProto( + name=d.name, + data_type=d.data_type.value, + units=d.units, + description=d.description, + ), + time_field=d.time_field, + value_field=d.value_field, + ) + for d in self.data + ], + ) + if self.relative_start_time is not None: + proto.relative_start_time.CopyFrom(to_pb_timestamp(self.relative_start_time)) + return proto + + +ImportConfig = Union[ + CsvImportConfig, + ParquetFlatDatasetImportConfig, + ParquetSingleChannelPerRowImportConfig, + TdmsImportConfig, + Hdf5ImportConfig, +] diff --git a/python/lib/sift_client/sift_types/job.py b/python/lib/sift_client/sift_types/job.py index 6d3adbe2d..676cdc8c2 100644 --- a/python/lib/sift_client/sift_types/job.py +++ b/python/lib/sift_client/sift_types/job.py @@ -19,6 +19,7 @@ from pathlib import Path from sift_client.client import SiftClient + from sift_client.sift_types.run import Run class JobType(str, Enum): @@ -315,6 +316,22 @@ def wait_until_complete( self._update(completed_job) return self + def get_import_run(self) -> Run: + """Get the run created by this data import job. + + Returns: + The Run associated with this import. + + Raises: + ValueError: If this is not a data import job or the import + has no associated run. + """ + if self.job_type != JobType.DATA_IMPORT: + raise ValueError("get_import_run() is only valid for data import jobs.") + if not isinstance(self.job_details, DataImportDetails): + raise ValueError("Job does not have data import details.") + return self.client.data_import.get_run(self.job_details.data_import_id) + def wait_and_download( self, *, diff --git a/python/lib/sift_client/sift_types/run.py b/python/lib/sift_client/sift_types/run.py index acfb59a92..ec6690896 100644 --- a/python/lib/sift_client/sift_types/run.py +++ b/python/lib/sift_client/sift_types/run.py @@ -19,10 +19,14 @@ from sift_client.util.metadata import metadata_dict_to_proto, metadata_proto_to_dict if TYPE_CHECKING: + from pathlib import Path + from sift_stream_bindings import RunFormPy from sift_client.client import SiftClient from sift_client.sift_types.asset import Asset + from sift_client.sift_types.data_import import DataTypeKey, ImportConfig + from sift_client.sift_types.job import Job class Run(BaseType[RunProto, "Run"], FileAttachmentsMixin): @@ -127,6 +131,43 @@ def stop(self) -> Run: self._update(updated_run) return self + def import_data( + self, + file_path: str | Path, + *, + asset: Asset | str | None = None, + config: ImportConfig | None = None, + data_type: DataTypeKey | None = None, + show_progress: bool | None = None, + ) -> Job: + """Import data from a file into this run. + + Convenience method that calls ``client.data_imports.import_from_path`` + with this run pre-filled. If the run has exactly one asset, + ``asset`` is inferred automatically. + + Args: + file_path: Path to the local file to import. + asset: Asset object or asset name to import data into. + config: Import configuration. Auto-detected if omitted. + data_type: Explicit data type key for ambiguous formats. + show_progress: Display a progress spinner during upload. + + Returns: + A ``Job`` handle for the pending import. + """ + if asset is None and len(self.asset_ids) == 1: + asset = self.client.assets.get(asset_id=self.asset_ids[0]) + + return self.client.data_import.import_from_path( + file_path, + asset=asset, + config=config, + data_type=data_type, + run=self, + show_progress=show_progress, + ) + class RunBase(ModelCreateUpdateBase): """Base class for Run create and update models with shared fields and validation.""" diff --git a/python/lib/sift_client/util/util.py b/python/lib/sift_client/util/util.py index e82a8ccfe..98719cfdd 100644 --- a/python/lib/sift_client/util/util.py +++ b/python/lib/sift_client/util/util.py @@ -8,6 +8,7 @@ CalculatedChannelsAPIAsync, ChannelsAPIAsync, DataExportAPIAsync, + DataImportAPIAsync, FileAttachmentsAPIAsync, IngestionAPIAsync, JobsAPIAsync, @@ -62,6 +63,9 @@ class AsyncAPIs(NamedTuple): data_export: DataExportAPIAsync """Instance of the Data Export API for making asynchronous requests.""" + data_import: DataImportAPIAsync + """Instance of the Data Import API for making asynchronous requests.""" + def count_non_none(*args: Any) -> int: """Count the number of non-none arguments."""