From 5e20bd8360475be87f00b05a18cd3e869d237375 Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Thu, 21 May 2026 16:49:29 -0700 Subject: [PATCH 1/4] infer parquet time column for int64/uint64 timestamps --- .../_tests/resources/test_data_imports.py | 132 +++++++++++++++++- .../lib/sift_client/resources/data_imports.py | 41 +++++- 2 files changed, 171 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_data_imports.py b/python/lib/sift_client/_tests/resources/test_data_imports.py index 6b8dcf03e..ff6831bee 100644 --- a/python/lib/sift_client/_tests/resources/test_data_imports.py +++ b/python/lib/sift_client/_tests/resources/test_data_imports.py @@ -1,11 +1,30 @@ """Unit tests for data import config models and helpers.""" +from __future__ import annotations + from datetime import datetime, timezone import pytest +from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetColumn, + ParquetConfig, + ParquetFlatDatasetConfig, + ParquetSingleChannelPerRowConfig, +) +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetDataColumn as ParquetDataColumnProto, +) +from sift.data_imports.v2.data_imports_pb2 import ( + ParquetTimeColumn as ParquetTimeColumnProto, +) from sift_client.resources import DataImportAPI, DataImportAPIAsync -from sift_client.resources.data_imports import _resolve_data_type_key +from sift_client.resources.data_imports import ( + _infer_time_column, + _parse_parquet_detect_response, + _resolve_data_type_key, +) from sift_client.sift_types.channel import ChannelDataType from sift_client.sift_types.data_import import ( CsvDataColumn, @@ -362,3 +381,114 @@ def test_explicit_data_type_overrides_extension(self): def test_unknown_extension_raises(self): with pytest.raises(ValueError, match="Unsupported file extension"): _resolve_data_type_key(".xyz", None) + + +class TestInferTimeColumn: + def test_picks_canonical_skips_other_columns(self): + path = _infer_time_column( + [ + ("delta_time", ChannelDataType.INT_64, "delta_time"), + ("voltage", ChannelDataType.DOUBLE, "voltage"), + ("timestamp", ChannelDataType.INT_64, "timestamp"), + ] + ) + assert path == "timestamp" + + def test_accepts_uint64(self): + path = _infer_time_column([("time", ChannelDataType.UINT_64, "time")]) + assert path == "time" + + def test_case_insensitive(self): + path = _infer_time_column([("TimeStamp", ChannelDataType.INT_64, "TimeStamp")]) + assert path == "TimeStamp" + + def test_returns_none_when_no_canonical_int_column(self): + path = _infer_time_column( + [ + ("timestamp", ChannelDataType.DOUBLE, "timestamp"), + ("event_time", ChannelDataType.INT_64, "event_time"), + ] + ) + assert path is None + + +def _make_flat_dataset_response( + time_path: str, data_columns: list[tuple[str, int]] +) -> ParquetConfig: + return ParquetConfig( + flat_dataset=ParquetFlatDatasetConfig( + time_column=ParquetTimeColumnProto(path=time_path), + data_columns=[ + ParquetDataColumnProto( + path=path, + channel_config=ChannelConfigProto(name=path, data_type=data_type), + ) + for path, data_type in data_columns + ], + ) + ) + + +def _make_scpr_response(time_path: str, columns: list[tuple[str, int]]) -> ParquetConfig: + return ParquetConfig( + single_channel_per_row=ParquetSingleChannelPerRowConfig( + time_column=ParquetTimeColumnProto(path=time_path), + columns=[ + ParquetColumn( + path=path, + column_config=ChannelConfigProto(name=path, data_type=data_type), + ) + for path, data_type in columns + ], + ) + ) + + +class TestParseParquetDetectResponseTimeFallback: + def test_flat_dataset_infers_int64_time_column(self): + proto = _make_flat_dataset_response( + time_path="", + data_columns=[ + ("voltage", ChannelDataType.DOUBLE.value), + ("timestamp", ChannelDataType.INT_64.value), + ("status", ChannelDataType.INT_32.value), + ], + ) + config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0) + assert isinstance(config, ParquetFlatDatasetImportConfig) + assert config.time_column.path == "timestamp" + assert [dc.path for dc in config.data_columns] == ["voltage", "status"] + + def test_flat_dataset_keeps_server_time_column_when_set(self): + proto = _make_flat_dataset_response( + time_path="server_ts", + data_columns=[ + ("server_ts", ChannelDataType.INT_64.value), + ("timestamp", ChannelDataType.INT_64.value), + ("voltage", ChannelDataType.DOUBLE.value), + ], + ) + config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0) + assert config.time_column.path == "server_ts" + assert [dc.path for dc in config.data_columns] == ["timestamp", "voltage"] + + def test_flat_dataset_no_int64_match_leaves_time_empty(self): + proto = _make_flat_dataset_response( + time_path="", + data_columns=[("voltage", ChannelDataType.DOUBLE.value)], + ) + config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0) + assert config.time_column.path == "" + assert [dc.path for dc in config.data_columns] == ["voltage"] + + def test_scpr_infers_int64_time_column(self): + proto = _make_scpr_response( + time_path="", + columns=[ + ("voltage", ChannelDataType.DOUBLE.value), + ("timestamp", ChannelDataType.INT_64.value), + ], + ) + config = _parse_parquet_detect_response(proto, "file.parquet", 0, 0) + assert isinstance(config, ParquetSingleChannelPerRowImportConfig) + assert config.time_column.path == "timestamp" diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py index 401b77e62..f2a2fea7b 100644 --- a/python/lib/sift_client/resources/data_imports.py +++ b/python/lib/sift_client/resources/data_imports.py @@ -8,6 +8,7 @@ from sift_client._internal.util.file import extract_parquet_footer, upload_file from sift_client.resources._base import ResourceBase from sift_client.sift_types.asset import Asset +from sift_client.sift_types.channel import ChannelDataType from sift_client.sift_types.data_import import ( EXTENSION_TO_DATA_TYPE_KEY, CsvImportConfig, @@ -15,10 +16,13 @@ ImportConfig, ParquetFlatDatasetImportConfig, ParquetSingleChannelPerRowImportConfig, + ParquetTimeColumn, ) from sift_client.sift_types.run import Run if TYPE_CHECKING: + from collections.abc import Iterable + from sift_client.client import SiftClient from sift_client.sift_types.job import Job @@ -320,6 +324,27 @@ def _parse_csv_detect_response(proto) -> CsvImportConfig: return csv_config +_TIME_COLUMN_NAMES: frozenset[str] = frozenset({"ts", "timestamp", "time"}) +_TIME_COLUMN_TYPES: frozenset[ChannelDataType] = frozenset( + {ChannelDataType.INT_64, ChannelDataType.UINT_64} +) + + +def _infer_time_column( + columns: Iterable[tuple[str, ChannelDataType, str]], +) -> str | None: + """Pick a likely time column when the server couldn't identify one. + + Returns the path of the first INT64 or UINT64 column whose name + (case-insensitive) matches one of ``ts``, ``timestamp``, or ``time``. + Returns None otherwise. + """ + for name, data_type, path in columns: + if data_type in _TIME_COLUMN_TYPES and name.lower() in _TIME_COLUMN_NAMES: + return path + return None + + def _parse_parquet_detect_response( proto, filename: str, footer_offset: int, footer_length: int ) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig: @@ -329,15 +354,29 @@ def _parse_parquet_detect_response( proto, footer_offset=footer_offset, footer_length=footer_length ) time_path = parquet_config.time_column.path + if not time_path: + time_path = _infer_time_column( + (dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns + ) + if time_path: + parquet_config.time_column = ParquetTimeColumn(path=time_path) if time_path: parquet_config.data_columns = [ dc for dc in parquet_config.data_columns if dc.path != time_path ] return parquet_config elif proto.HasField("single_channel_per_row"): - return ParquetSingleChannelPerRowImportConfig._from_proto( + scpr_config = ParquetSingleChannelPerRowImportConfig._from_proto( proto, footer_offset=footer_offset, footer_length=footer_length ) + if not scpr_config.time_column.path: + inferred = _infer_time_column( + (col.column_config.name, ChannelDataType(col.column_config.data_type), col.path) + for col in proto.single_channel_per_row.columns + ) + if inferred is not None: + scpr_config.time_column = ParquetTimeColumn(path=inferred) + return scpr_config raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.") From d530384621828678440a606a8e50e343cdd4f18f Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Thu, 21 May 2026 16:54:46 -0700 Subject: [PATCH 2/4] mypy fix --- python/lib/sift_client/resources/data_imports.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py index f2a2fea7b..d8139eb2b 100644 --- a/python/lib/sift_client/resources/data_imports.py +++ b/python/lib/sift_client/resources/data_imports.py @@ -353,7 +353,7 @@ def _parse_parquet_detect_response( parquet_config = ParquetFlatDatasetImportConfig._from_proto( proto, footer_offset=footer_offset, footer_length=footer_length ) - time_path = parquet_config.time_column.path + time_path: str | None = parquet_config.time_column.path if not time_path: time_path = _infer_time_column( (dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns From 9d308b84072351b027b65af3612e6ae1406bb948 Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Thu, 21 May 2026 17:04:18 -0700 Subject: [PATCH 3/4] cast proto enums --- .../_tests/resources/test_data_imports.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_data_imports.py b/python/lib/sift_client/_tests/resources/test_data_imports.py index ff6831bee..cc7de8f1d 100644 --- a/python/lib/sift_client/_tests/resources/test_data_imports.py +++ b/python/lib/sift_client/_tests/resources/test_data_imports.py @@ -3,6 +3,7 @@ from __future__ import annotations from datetime import datetime, timezone +from typing import TYPE_CHECKING, cast import pytest from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto @@ -41,6 +42,11 @@ TimeFormat, ) +if TYPE_CHECKING: + from sift.common.type.v1.channel_data_type_pb2 import ( + ChannelDataType as ChannelDataTypeProto, + ) + @pytest.mark.integration def test_client_binding(sift_client): @@ -421,7 +427,10 @@ def _make_flat_dataset_response( data_columns=[ ParquetDataColumnProto( path=path, - channel_config=ChannelConfigProto(name=path, data_type=data_type), + channel_config=ChannelConfigProto( + name=path, + data_type=cast("ChannelDataTypeProto.ValueType", data_type), + ), ) for path, data_type in data_columns ], @@ -436,7 +445,10 @@ def _make_scpr_response(time_path: str, columns: list[tuple[str, int]]) -> Parqu columns=[ ParquetColumn( path=path, - column_config=ChannelConfigProto(name=path, data_type=data_type), + column_config=ChannelConfigProto( + name=path, + data_type=cast("ChannelDataTypeProto.ValueType", data_type), + ), ) for path, data_type in columns ], From 78ab9b14a09857450b37327de10f8d0a79af53d0 Mon Sep 17 00:00:00 2001 From: Wei Qi Lu Date: Thu, 21 May 2026 17:08:41 -0700 Subject: [PATCH 4/4] rename and sort data columns --- .../sift_client/_tests/resources/test_data_imports.py | 10 ++++++++++ python/lib/sift_client/resources/data_imports.py | 11 ++++++----- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/python/lib/sift_client/_tests/resources/test_data_imports.py b/python/lib/sift_client/_tests/resources/test_data_imports.py index cc7de8f1d..18e9c1ca5 100644 --- a/python/lib/sift_client/_tests/resources/test_data_imports.py +++ b/python/lib/sift_client/_tests/resources/test_data_imports.py @@ -408,6 +408,16 @@ def test_case_insensitive(self): path = _infer_time_column([("TimeStamp", ChannelDataType.INT_64, "TimeStamp")]) assert path == "TimeStamp" + def test_multiple_candidates_sorted_alphabetically(self): + path = _infer_time_column( + [ + ("timestamp", ChannelDataType.INT_64, "timestamp"), + ("time", ChannelDataType.INT_64, "time"), + ("ts", ChannelDataType.INT_64, "ts"), + ] + ) + assert path == "time" + def test_returns_none_when_no_canonical_int_column(self): path = _infer_time_column( [ diff --git a/python/lib/sift_client/resources/data_imports.py b/python/lib/sift_client/resources/data_imports.py index d8139eb2b..045c0537f 100644 --- a/python/lib/sift_client/resources/data_imports.py +++ b/python/lib/sift_client/resources/data_imports.py @@ -335,11 +335,12 @@ def _infer_time_column( ) -> str | None: """Pick a likely time column when the server couldn't identify one. - Returns the path of the first INT64 or UINT64 column whose name + Returns the path of an INT64 or UINT64 column whose name (case-insensitive) matches one of ``ts``, ``timestamp``, or ``time``. Returns None otherwise. """ - for name, data_type, path in columns: + data_columns = sorted(columns, key=lambda c: c[0].lower()) + for name, data_type, path in data_columns: if data_type in _TIME_COLUMN_TYPES and name.lower() in _TIME_COLUMN_NAMES: return path return None @@ -370,12 +371,12 @@ def _parse_parquet_detect_response( proto, footer_offset=footer_offset, footer_length=footer_length ) if not scpr_config.time_column.path: - inferred = _infer_time_column( + time_path = _infer_time_column( (col.column_config.name, ChannelDataType(col.column_config.data_type), col.path) for col in proto.single_channel_per_row.columns ) - if inferred is not None: - scpr_config.time_column = ParquetTimeColumn(path=inferred) + if time_path is not None: + scpr_config.time_column = ParquetTimeColumn(path=time_path) return scpr_config raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.")