diff --git a/python/lib/sift_py/_internal/metadata.py b/python/lib/sift_py/_internal/metadata.py index f01626d5f..40c62c3b7 100644 --- a/python/lib/sift_py/_internal/metadata.py +++ b/python/lib/sift_py/_internal/metadata.py @@ -1,15 +1,20 @@ -from typing import Dict, List, Union +from typing import Any, Callable, Dict, List, Optional, Union from sift.metadata.v1.metadata_pb2 import MetadataKey, MetadataKeyType, MetadataValue -def metadata_dict_to_pb(_metadata: Dict[str, Union[str, float, bool]]) -> List[MetadataValue]: +def metadata_dict_to_pb( + _metadata: Dict[str, Union[str, float, bool, int]], + parse: Optional[Callable[[Any], Optional[Union[str, float, bool, int]]]] = None, +) -> List[MetadataValue]: """ Wraps metadata dictionary into a list of MetadataValue objects. Args: _metadata: Dictionary of metadata key-value pairs. - + parse: Optional function to parse complex types into a compatible + metadata type (i.e., str, float, int, or bool). Function should raise an + Exception if it can't parse the value. Returns: List of MetadataValue objects. """ @@ -21,6 +26,12 @@ def metadata_dict_to_pb(_metadata: Dict[str, Union[str, float, bool]]) -> List[M boolean_value = None number_value = None + if not isinstance(value, (str, float, bool, int)): + if parse: + value = parse(value) + else: + raise ValueError(f"Unsupported metadata value type for key '{key}': {value}") + if isinstance(value, str): string_value = value type = MetadataKeyType.METADATA_KEY_TYPE_STRING @@ -46,7 +57,7 @@ def metadata_dict_to_pb(_metadata: Dict[str, Union[str, float, bool]]) -> List[M return metadata -def metadata_pb_to_dict(metadata: List[MetadataValue]) -> Dict[str, Union[str, float, bool]]: +def metadata_pb_to_dict(metadata: List[MetadataValue]) -> Dict[str, Union[str, float, bool, int]]: """ Unwraps a list of MetadataValue objects into a dictionary. 
@@ -56,7 +67,7 @@ def metadata_pb_to_dict(metadata: List[MetadataValue]) -> Dict[str, Union[str, f Returns: Dictionary of metadata key-value pairs. """ - unwrapped_metadata: Dict[str, Union[str, float, bool]] = {} + unwrapped_metadata: Dict[str, Union[str, float, bool, int]] = {} for md in metadata: if md.key.name in unwrapped_metadata: raise ValueError(f"Key already exists: {md.key.name}") @@ -68,3 +79,31 @@ def metadata_pb_to_dict(metadata: List[MetadataValue]) -> Dict[str, Union[str, f unwrapped_metadata[md.key.name] = md.number_value return unwrapped_metadata + + +def metadata_pb_to_dict_api(metadata: List[MetadataValue]) -> List[Dict[str, Any]]: + """ + Serializes a list of MetadataValue objects to an API compatible list of dicts, + preserving the proto structure. + + Args: + metadata: List of MetadataValue objects. + + Returns: + List of dicts representing the metadata with proto structure. + """ + + def metadata_value_to_dict(md: MetadataValue) -> Dict[str, Any]: + value_dict: Dict[str, Any] = {"key": {"name": md.key.name, "type": md.key.type}} + if md.key.type == MetadataKeyType.METADATA_KEY_TYPE_STRING: + value_dict["string_value"] = md.string_value + elif md.key.type == MetadataKeyType.METADATA_KEY_TYPE_BOOLEAN: + value_dict["boolean_value"] = md.boolean_value + elif md.key.type == MetadataKeyType.METADATA_KEY_TYPE_NUMBER: + value_dict["number_value"] = md.number_value + else: + raise ValueError(f"{md.key.name} set to invalid type: {md.key.type}") + return value_dict + + metadata_list = [metadata_value_to_dict(md) for md in metadata] + return metadata_list diff --git a/python/lib/sift_py/data_import/_tdms_test.py b/python/lib/sift_py/data_import/_tdms_test.py index a80fd1ef4..45861fd87 100644 --- a/python/lib/sift_py/data_import/_tdms_test.py +++ b/python/lib/sift_py/data_import/_tdms_test.py @@ -5,6 +5,7 @@ import pytest from nptdms import TdmsFile, types # type: ignore from pytest_mock import MockFixture +from sift.metadata.v1.metadata_pb2 import MetadataKeyType from 
sift_py.data_import.tdms import TdmsTimeFormat, TdmsUploadService, sanitize_string from sift_py.rest import SiftRestConfig @@ -40,7 +41,14 @@ def channels(self) -> List[MockTdmsChannel]: class MockTdmsFile: def __init__(self, groups: List[MockTdmsGroup]): self._groups: List[MockTdmsGroup] = groups - self.properties: Dict[str, str] = {} + # Example properties for each type + self.properties: Dict[str, Any] = { + "string_prop": "example", + "int_prop": 42, + "float_prop": 3.14, + "bool_prop": True, + "datetime_prop": pd.Timestamp("2024-01-01T12:00:00"), + } def groups(self) -> List[MockTdmsGroup]: return self._groups @@ -50,9 +58,9 @@ def as_dataframe(self, *_, **__): class MockResponse: - def __init__(self): - self.status_code = 200 - self.text = json.dumps({"uploadUrl": "some_url.com", "dataImportId": "123-123-123"}) + def __init__(self, status_code=None, text=None): + self.status_code = status_code or 200 + self.text = text or json.dumps({"uploadUrl": "some_url.com", "dataImportId": "123-123-123"}) def json(self) -> dict: return json.loads(self.text) @@ -730,3 +738,110 @@ def mock_tdms_file_constructor2(path): tdms_time_format=TdmsTimeFormat.TIME_CHANNEL, ignore_errors=True, ) + + +def test_tdms_upload_service_upload_with_metadata( + mocker: MockFixture, mock_waveform_tdms_file: MockTdmsFile +): + mock_path_is_file = mocker.patch("sift_py.data_import.tdms.Path.is_file") + mock_path_is_file.return_value = True + + mock_path_getsize = mocker.patch("sift_py.data_import.csv.os.path.getsize") + mock_path_getsize.return_value = 10 + + # Patch TdmsFile to return our mock file + mocker.patch("sift_py.data_import.tdms.TdmsFile", return_value=mock_waveform_tdms_file) + + # Patch requests.Session.post to simulate both run creation and data import + mock_requests_post = mocker.patch("sift_py.rest.requests.Session.post") + + # The first call is for _create_run, second for config upload, third for file upload + def post_side_effect(*args, **kwargs): + url = kwargs.get("url") or 
(args[1] if len(args) > 1 else "") + if "run" in url: + # Simulate run creation response + return MockResponse( + status_code=200, + text=json.dumps({"run": {"runId": "new_run_id"}}), + ) + elif "data-imports:upload" in url: + # Simulate config upload response + return MockResponse() + elif "some_url.com" in url: + # Simulate file upload response + return MockResponse() + else: + return MockResponse() + + mock_requests_post.side_effect = post_side_effect + + svc = TdmsUploadService(rest_config) + + # Should raise if run_id is provided + with pytest.raises(ValueError, match="Metadata can only be included in new runs"): + svc.upload( + "some_tdms.tdms", + "asset_name", + include_metadata=True, + run_id="existing_run_id", + run_name="Run Name", + ) + + # Should raise if run_name is not provided + with pytest.raises(ValueError, match="Must provide a run_name to include metadata"): + svc.upload( + "some_tdms.tdms", + "asset_name", + include_metadata=True, + run_name=None, + ) + + # Should succeed and call _create_run via POST with metadata + svc.upload( + "some_tdms.tdms", + "asset_name", + include_metadata=True, + run_name="Run Name", + ) + + # Check that the first POST call was for run creation and included metadata + create_run_post_call = mock_requests_post.call_args_list[0] + create_run_post_data = json.loads(create_run_post_call.kwargs["data"]) + assert create_run_post_data["name"] == "Run Name" + + # Metadata should be present and contain expected keys + assert "metadata" in create_run_post_data + assert create_run_post_data["metadata"][0]["key"]["name"] == "string_prop" + assert ( + create_run_post_data["metadata"][0]["key"]["type"] + == MetadataKeyType.METADATA_KEY_TYPE_STRING + ) + assert create_run_post_data["metadata"][0]["string_value"] == "example" + + assert create_run_post_data["metadata"][1]["key"]["name"] == "int_prop" + assert ( + create_run_post_data["metadata"][1]["key"]["type"] + == MetadataKeyType.METADATA_KEY_TYPE_NUMBER + ) + assert 
create_run_post_data["metadata"][1]["number_value"] == 42 + + assert create_run_post_data["metadata"][2]["key"]["name"] == "float_prop" + assert ( + create_run_post_data["metadata"][2]["key"]["type"] + == MetadataKeyType.METADATA_KEY_TYPE_NUMBER + ) + assert create_run_post_data["metadata"][2]["number_value"] == 3.14 + + assert create_run_post_data["metadata"][3]["key"]["name"] == "bool_prop" + assert ( + create_run_post_data["metadata"][3]["key"]["type"] + == MetadataKeyType.METADATA_KEY_TYPE_BOOLEAN + ) + assert create_run_post_data["metadata"][3]["boolean_value"] is True + + assert create_run_post_data["metadata"][4]["key"]["name"] == "datetime_prop" + assert ( + create_run_post_data["metadata"][4]["key"]["type"] + == MetadataKeyType.METADATA_KEY_TYPE_STRING + ) + assert create_run_post_data["metadata"][4]["string_value"].startswith("2024-01-01T12:00:00") diff --git a/python/lib/sift_py/data_import/csv.py b/python/lib/sift_py/data_import/csv.py index 4cf7bb4e2..dee750783 100644 --- a/python/lib/sift_py/data_import/csv.py +++ b/python/lib/sift_py/data_import/csv.py @@ -7,7 +7,9 @@ import pandas as pd from alive_progress import alive_bar # type: ignore +from sift.metadata.v1.metadata_pb2 import MetadataValue +from sift_py._internal.metadata import metadata_pb_to_dict_api from sift_py.data_import.config import CsvConfig from sift_py.data_import.status import DataImportService from sift_py.data_import.time_format import TimeFormatType @@ -18,6 +20,7 @@ class CsvUploadService(_RestService): UPLOAD_PATH = "/api/v1/data-imports:upload" URL_PATH = "/api/v1/data-imports:url" + RUN_PATH = "/api/v2/runs" _rest_conf: SiftRestConfig _upload_uri: str @@ -258,6 +261,50 @@ def _mime_and_content_type_from_path(path: Path) -> Tuple[str, Optional[str], Op mime, encoding = mimetypes.guess_type(path) return file_name, mime, encoding + def _create_run(self, run_name: str, metadata: Optional[List[MetadataValue]] = None) -> str: + """Create a new run using the REST service, and return 
a run_id. + + Args: + run_name: The name of the Run. + metadata: Optional metadata fields to add to the run. + + Returns: + The run id. + """ + run_uri = urljoin(self._base_uri, self.RUN_PATH) + + req: Dict[str, Any] = { + "name": run_name, + "description": "", + } + + if metadata: + req["metadata"] = metadata_pb_to_dict_api(metadata) + + response = self._session.post( + url=run_uri, + headers={ + "Content-Encoding": "application/json", + }, + data=json.dumps(req), + ) + if response.status_code != 200: + raise Exception( + f"Run creation failed with status code {response.status_code}. {response.text}" + ) + + try: + run_info = response.json() + except (json.decoder.JSONDecodeError, KeyError): + raise Exception(f"Invalid response: {response.text}") + + if "run" not in run_info: + raise Exception("Response missing key: run") + if "runId" not in run_info["run"]: + raise Exception("Response missing key: runId") + + return run_info["run"]["runId"] + class _ProgressFile: """Displays the status with alive_bar while reading the file.""" diff --git a/python/lib/sift_py/data_import/hdf5.py b/python/lib/sift_py/data_import/hdf5.py index eea9cb54b..7b575c1f7 100644 --- a/python/lib/sift_py/data_import/hdf5.py +++ b/python/lib/sift_py/data_import/hdf5.py @@ -1,10 +1,8 @@ -import json import uuid from collections import defaultdict from contextlib import ExitStack from pathlib import Path from typing import Dict, List, Tuple, Union, cast -from urllib.parse import urljoin import numpy as np @@ -37,7 +35,6 @@ class Hdf5UploadService: Service to upload HDF5 files. 
""" - _RUN_PATH = "/api/v2/runs" _csv_upload_service: CsvUploadService _prev_run_id: str @@ -96,7 +93,7 @@ def upload( # Perform now instead of before the config split to avoid creating a run any problems arise before ready to upload # Active run_id copied to _prev_run_id for user reference if hdf5_config._hdf5_config.run_name != "": - run_id = self._create_run(hdf5_config._hdf5_config.run_name) + run_id = self._csv_upload_service._create_run(hdf5_config._hdf5_config.run_name) for _, csv_config in csv_items: csv_config._csv_config.run_name = "" csv_config._csv_config.run_id = run_id @@ -127,40 +124,6 @@ def get_previous_upload_run_id(self) -> str: """Return the run_id used in the previous upload""" return self._prev_run_id - def _create_run(self, run_name: str) -> str: - """Create a new run using the REST service, and return a run_id""" - run_uri = urljoin(self._csv_upload_service._base_uri, self._RUN_PATH) - - # Since CSVUploadService is already a RestService, we can utilize that - response = self._csv_upload_service._session.post( - url=run_uri, - headers={ - "Content-Encoding": "application/json", - }, - data=json.dumps( - { - "name": run_name, - "description": "", - } - ), - ) - if response.status_code != 200: - raise Exception( - f"Run creation failed with status code {response.status_code}. 
{response.text}" - ) - - try: - run_info = response.json() - except (json.decoder.JSONDecodeError, KeyError): - raise Exception(f"Invalid response: {response.text}") - - if "run" not in run_info: - raise Exception("Response missing key: run") - if "runId" not in run_info["run"]: - raise Exception("Response missing key: runId") - - return run_info["run"]["runId"] - def _convert_to_csv_file( src_path: Union[str, Path], diff --git a/python/lib/sift_py/data_import/tdms.py b/python/lib/sift_py/data_import/tdms.py index ccb894fb8..cfef5e411 100644 --- a/python/lib/sift_py/data_import/tdms.py +++ b/python/lib/sift_py/data_import/tdms.py @@ -1,10 +1,12 @@ import warnings from collections import namedtuple from csv import DictWriter +from datetime import datetime from enum import Enum from pathlib import Path from typing import Dict, List, Optional, Sequence, TextIO, Union +import numpy as np from pandas import to_datetime try: @@ -24,6 +26,7 @@ ) from e from sift_py._internal.channel import channel_fqn as _channel_fqn +from sift_py._internal.metadata import metadata_dict_to_pb from sift_py.data_import._config import DataColumn, TimeColumn from sift_py.data_import.config import CsvConfig from sift_py.data_import.csv import CsvUploadService @@ -109,6 +112,7 @@ def upload( run_name: Optional[str] = None, run_id: Optional[str] = None, tdms_time_format: TdmsTimeFormat = TdmsTimeFormat.WAVEFORM, + include_metadata: bool = False, ) -> DataImportService: """ Uploads the TDMS file pointed to by `path` to the specified asset. @@ -124,6 +128,7 @@ def upload( tdms_time_format: Specify how timing information is encoded in the file. Default is WAVEFORM. If using the TIME_CHANNEL format, timestamps should use the LabVIEW/TDMS epoch (number of seconds since 01/01/1904 00:00:00.00 UTC). + include_metadata: Whether to include TDMS file metadata as Run metadata. Returns: The DataImportService used to get the status of the import. 
@@ -142,6 +147,31 @@ def upload( if not posix_path.is_file(): raise Exception(f"Provided path, '{path}', does not point to a regular file.") + # If metadata should be included, create the run first. + if include_metadata: + # Do not allow including metadata in existing runs since it could lead + # to overwriting metadata fields. + if run_id: + raise ValueError("Metadata can only be included in new runs") + + if not run_name: + raise ValueError("Must provide a run_name to include metadata") + + def parse_datetime(value): + """Convert datetime metadata to strings.""" + if isinstance(value, np.datetime64): + return str(value) + elif isinstance(value, datetime): + return value.isoformat() + else: + raise ValueError(f"Unable to parse value as metadata: {value}") + + tdms_file = TdmsFile(path) + metadata = metadata_dict_to_pb(tdms_file.properties, parse_datetime) + run_id = self._csv_upload_service._create_run(run_name, metadata) + # Clear the run name since we are using run_id now. + run_name = None + with NamedTemporaryFile(mode="wt", suffix=".csv.gz") as temp_file: csv_config = self._convert_to_csv( path, @@ -153,7 +183,9 @@ def upload( run_id, tdms_time_format, ) - return self._csv_upload_service.upload(temp_file.name, csv_config) + data_import = self._csv_upload_service.upload(temp_file.name, csv_config) + + return data_import def _convert_to_csv( self,