class TestInferTimeColumn:
    """Tests for the ``_infer_time_column`` fallback heuristic.

    Every input item is a ``(name, data_type, path)`` triple; the helper is
    expected to return the ``path`` of the selected column, or ``None``.
    """

    def test_picks_canonical_skips_other_columns(self):
        # "delta_time" has an integer type but a non-canonical name, and
        # "voltage" has neither; only "timestamp" satisfies both conditions.
        path = _infer_time_column(
            [
                ("delta_time", ChannelDataType.INT_64, "delta_time"),
                ("voltage", ChannelDataType.DOUBLE, "voltage"),
                ("timestamp", ChannelDataType.INT_64, "timestamp"),
            ]
        )
        assert path == "timestamp"

    def test_accepts_uint64(self):
        path = _infer_time_column([("time", ChannelDataType.UINT_64, "time")])
        assert path == "time"

    def test_case_insensitive(self):
        # The name is matched case-insensitively, but the returned path keeps
        # its original casing.
        path = _infer_time_column([("TimeStamp", ChannelDataType.INT_64, "TimeStamp")])
        assert path == "TimeStamp"

    def test_multiple_candidates_sorted_alphabetically(self):
        # With several valid candidates the alphabetically first name wins,
        # independent of input order ("time" < "timestamp" < "ts").
        path = _infer_time_column(
            [
                ("timestamp", ChannelDataType.INT_64, "timestamp"),
                ("time", ChannelDataType.INT_64, "time"),
                ("ts", ChannelDataType.INT_64, "ts"),
            ]
        )
        assert path == "time"

    def test_returns_none_when_no_canonical_int_column(self):
        # "timestamp" has the wrong type; "event_time" has the wrong name.
        path = _infer_time_column(
            [
                ("timestamp", ChannelDataType.DOUBLE, "timestamp"),
                ("event_time", ChannelDataType.INT_64, "event_time"),
            ]
        )
        assert path is None

    def test_empty_input_returns_none(self):
        assert _infer_time_column([]) is None


def _make_channel_config(path: str, data_type: int) -> ChannelConfigProto:
    """Build a channel config proto named after ``path`` with the given raw enum value."""
    return ChannelConfigProto(
        name=path,
        # The proto stubs type this field as the enum's ValueType; the tests
        # pass plain ints, so the cast only satisfies the type checker.
        data_type=cast("ChannelDataTypeProto.ValueType", data_type),
    )


def _make_flat_dataset_response(
    time_path: str, data_columns: list[tuple[str, int]]
) -> ParquetConfig:
    """Build a flat-dataset ``ParquetConfig`` like a DetectConfig response.

    Args:
        time_path: Path stored on the time column ("" to leave it unset).
        data_columns: ``(path, data_type_value)`` pairs, one per data column.
    """
    return ParquetConfig(
        flat_dataset=ParquetFlatDatasetConfig(
            time_column=ParquetTimeColumnProto(path=time_path),
            data_columns=[
                ParquetDataColumnProto(
                    path=path,
                    channel_config=_make_channel_config(path, data_type),
                )
                for path, data_type in data_columns
            ],
        )
    )


def _make_scpr_response(time_path: str, columns: list[tuple[str, int]]) -> ParquetConfig:
    """Build a single-channel-per-row ``ParquetConfig`` like a DetectConfig response.

    Args:
        time_path: Path stored on the time column ("" to leave it unset).
        columns: ``(path, data_type_value)`` pairs, one per column.
    """
    return ParquetConfig(
        single_channel_per_row=ParquetSingleChannelPerRowConfig(
            time_column=ParquetTimeColumnProto(path=time_path),
            columns=[
                ParquetColumn(
                    path=path,
                    column_config=_make_channel_config(path, data_type),
                )
                for path, data_type in columns
            ],
        )
    )


class TestParseParquetDetectResponseTimeFallback:
    """Tests for the time-column fallback in ``_parse_parquet_detect_response``."""

    def test_flat_dataset_infers_int64_time_column(self):
        proto = _make_flat_dataset_response(
            time_path="",
            data_columns=[
                ("voltage", ChannelDataType.DOUBLE.value),
                ("timestamp", ChannelDataType.INT_64.value),
                ("status", ChannelDataType.INT_32.value),
            ],
        )
        config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
        assert isinstance(config, ParquetFlatDatasetImportConfig)
        assert config.time_column.path == "timestamp"
        # The inferred time column must no longer be listed as a data column.
        assert [dc.path for dc in config.data_columns] == ["voltage", "status"]

    def test_flat_dataset_keeps_server_time_column_when_set(self):
        proto = _make_flat_dataset_response(
            time_path="server_ts",
            data_columns=[
                ("server_ts", ChannelDataType.INT_64.value),
                ("timestamp", ChannelDataType.INT_64.value),
                ("voltage", ChannelDataType.DOUBLE.value),
            ],
        )
        config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
        # Narrow the union return type before touching flat-dataset-only fields.
        assert isinstance(config, ParquetFlatDatasetImportConfig)
        assert config.time_column.path == "server_ts"
        # "timestamp" stays a data column: inference is skipped when the
        # server already supplied a time column.
        assert [dc.path for dc in config.data_columns] == ["timestamp", "voltage"]

    def test_flat_dataset_no_int64_match_leaves_time_empty(self):
        proto = _make_flat_dataset_response(
            time_path="",
            data_columns=[("voltage", ChannelDataType.DOUBLE.value)],
        )
        config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
        # Narrow the union return type before touching flat-dataset-only fields.
        assert isinstance(config, ParquetFlatDatasetImportConfig)
        assert config.time_column.path == ""
        assert [dc.path for dc in config.data_columns] == ["voltage"]

    def test_scpr_infers_int64_time_column(self):
        proto = _make_scpr_response(
            time_path="",
            columns=[
                ("voltage", ChannelDataType.DOUBLE.value),
                ("timestamp", ChannelDataType.INT_64.value),
            ],
        )
        config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0)
        assert isinstance(config, ParquetSingleChannelPerRowImportConfig)
        assert config.time_column.path == "timestamp"
# Column names (compared case-insensitively) accepted as a time column.
_TIME_COLUMN_NAMES: frozenset[str] = frozenset({"ts", "timestamp", "time"})
# Only 64-bit integer columns are considered time-column candidates.
_TIME_COLUMN_TYPES: frozenset[ChannelDataType] = frozenset(
    {ChannelDataType.INT_64, ChannelDataType.UINT_64}
)


def _infer_time_column(
    columns: Iterable[tuple[str, ChannelDataType, str]],
) -> str | None:
    """Pick a likely time column when the server couldn't identify one.

    A column is a candidate when its data type is INT64 or UINT64 and its
    name (case-insensitive) is one of ``ts``, ``timestamp``, or ``time``.

    Args:
        columns: ``(name, data_type, path)`` triples describing the detected
            columns. The iterable is consumed once.

    Returns:
        The path of the candidate whose lowercased name sorts first
        alphabetically; candidates with equal lowercased names resolve to the
        one that appears first in ``columns``. ``None`` when there is no
        candidate (including empty input).
    """
    # Filter first, then take the minimum: this selects the same column as
    # sorting every column and scanning for the first match, without ordering
    # columns that can never be chosen.
    candidates = (
        (name.lower(), path)
        for name, data_type, path in columns
        if data_type in _TIME_COLUMN_TYPES and name.lower() in _TIME_COLUMN_NAMES
    )
    # min() returns the first minimal element it encounters, which preserves
    # input order among ties exactly like a stable sort would.
    best = min(candidates, key=lambda candidate: candidate[0], default=None)
    return None if best is None else best[1]