Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 153 additions & 1 deletion python/lib/sift_client/_tests/resources/test_data_imports.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,31 @@
"""Unit tests for data import config models and helpers."""

from __future__ import annotations

from datetime import datetime, timezone
from typing import TYPE_CHECKING, cast

import pytest
from sift.common.type.v1.channel_config_pb2 import ChannelConfig as ChannelConfigProto
from sift.data_imports.v2.data_imports_pb2 import (
ParquetColumn,
ParquetConfig,
ParquetFlatDatasetConfig,
ParquetSingleChannelPerRowConfig,
)
from sift.data_imports.v2.data_imports_pb2 import (
ParquetDataColumn as ParquetDataColumnProto,
)
from sift.data_imports.v2.data_imports_pb2 import (
ParquetTimeColumn as ParquetTimeColumnProto,
)

from sift_client.resources import DataImportAPI, DataImportAPIAsync
from sift_client.resources.data_imports import _resolve_data_type_key
from sift_client.resources.data_imports import (
_infer_time_column,
_parse_parquet_detect_response,
_resolve_data_type_key,
)
from sift_client.sift_types.channel import ChannelDataType
from sift_client.sift_types.data_import import (
CsvDataColumn,
Expand All @@ -22,6 +42,11 @@
TimeFormat,
)

if TYPE_CHECKING:
from sift.common.type.v1.channel_data_type_pb2 import (
ChannelDataType as ChannelDataTypeProto,
)


@pytest.mark.integration
def test_client_binding(sift_client):
Expand Down Expand Up @@ -362,3 +387,130 @@ def test_explicit_data_type_overrides_extension(self):
def test_unknown_extension_raises(self):
with pytest.raises(ValueError, match="Unsupported file extension"):
_resolve_data_type_key(".xyz", None)


class TestInferTimeColumn:
    """Unit tests for ``_infer_time_column``.

    Every input entry is a ``(name, data_type, path)`` tuple.
    """

    def test_picks_canonical_skips_other_columns(self):
        # "delta_time" has an integer type but a non-canonical name, and
        # "voltage" has neither; only "timestamp" qualifies.
        columns = [
            ("delta_time", ChannelDataType.INT_64, "delta_time"),
            ("voltage", ChannelDataType.DOUBLE, "voltage"),
            ("timestamp", ChannelDataType.INT_64, "timestamp"),
        ]
        assert _infer_time_column(columns) == "timestamp"

    def test_accepts_uint64(self):
        columns = [("time", ChannelDataType.UINT_64, "time")]
        assert _infer_time_column(columns) == "time"

    def test_case_insensitive(self):
        # The name is matched case-insensitively; the returned path keeps
        # its original casing.
        columns = [("TimeStamp", ChannelDataType.INT_64, "TimeStamp")]
        assert _infer_time_column(columns) == "TimeStamp"

    def test_multiple_candidates_sorted_alphabetically(self):
        # All three qualify; "time" sorts before "timestamp" and "ts".
        columns = [
            ("timestamp", ChannelDataType.INT_64, "timestamp"),
            ("time", ChannelDataType.INT_64, "time"),
            ("ts", ChannelDataType.INT_64, "ts"),
        ]
        assert _infer_time_column(columns) == "time"

    def test_returns_none_when_no_canonical_int_column(self):
        # "timestamp" has the wrong type and "event_time" the wrong name.
        columns = [
            ("timestamp", ChannelDataType.DOUBLE, "timestamp"),
            ("event_time", ChannelDataType.INT_64, "event_time"),
        ]
        assert _infer_time_column(columns) is None


def _make_flat_dataset_response(
    time_path: str, data_columns: list[tuple[str, int]]
) -> ParquetConfig:
    """Build a ``ParquetConfig`` proto with the ``flat_dataset`` field populated.

    Args:
        time_path: Path assigned to the time column.
        data_columns: ``(path, data_type)`` pairs; each becomes one data
            column whose channel name is the same as its path.

    Returns:
        The assembled ``ParquetConfig`` message.
    """
    column_protos = []
    for column_path, type_value in data_columns:
        channel = ChannelConfigProto(
            name=column_path,
            # The cast target is spelled as a string because the proto enum
            # type is only imported under TYPE_CHECKING.
            data_type=cast("ChannelDataTypeProto.ValueType", type_value),
        )
        column_protos.append(
            ParquetDataColumnProto(path=column_path, channel_config=channel)
        )

    flat_dataset = ParquetFlatDatasetConfig(
        time_column=ParquetTimeColumnProto(path=time_path),
        data_columns=column_protos,
    )
    return ParquetConfig(flat_dataset=flat_dataset)


def _make_scpr_response(time_path: str, columns: list[tuple[str, int]]) -> ParquetConfig:
    """Build a ``ParquetConfig`` proto with ``single_channel_per_row`` populated.

    Args:
        time_path: Path assigned to the time column.
        columns: ``(path, data_type)`` pairs; each becomes one column whose
            config name is the same as its path.

    Returns:
        The assembled ``ParquetConfig`` message.
    """
    column_protos = []
    for column_path, type_value in columns:
        config = ChannelConfigProto(
            name=column_path,
            # The cast target is spelled as a string because the proto enum
            # type is only imported under TYPE_CHECKING.
            data_type=cast("ChannelDataTypeProto.ValueType", type_value),
        )
        column_protos.append(ParquetColumn(path=column_path, column_config=config))

    single_channel_per_row = ParquetSingleChannelPerRowConfig(
        time_column=ParquetTimeColumnProto(path=time_path),
        columns=column_protos,
    )
    return ParquetConfig(single_channel_per_row=single_channel_per_row)


class TestParseParquetDetectResponseTimeFallback:
    """Tests for the time-column fallback in ``_parse_parquet_detect_response``.

    An empty ``time_path`` stands for a response in which no time column
    was identified.
    """

    def test_flat_dataset_infers_int64_time_column(self):
        columns = [
            ("voltage", ChannelDataType.DOUBLE.value),
            ("timestamp", ChannelDataType.INT_64.value),
            ("status", ChannelDataType.INT_32.value),
        ]
        response = _make_flat_dataset_response(time_path="", data_columns=columns)

        parsed = _parse_parquet_detect_response(response, "file.parquet", 0, 0)

        assert isinstance(parsed, ParquetFlatDatasetImportConfig)
        assert parsed.time_column.path == "timestamp"
        # The inferred time column must no longer be listed as a data column.
        remaining_paths = [dc.path for dc in parsed.data_columns]
        assert remaining_paths == ["voltage", "status"]

    def test_flat_dataset_keeps_server_time_column_when_set(self):
        columns = [
            ("server_ts", ChannelDataType.INT_64.value),
            ("timestamp", ChannelDataType.INT_64.value),
            ("voltage", ChannelDataType.DOUBLE.value),
        ]
        response = _make_flat_dataset_response(time_path="server_ts", data_columns=columns)

        parsed = _parse_parquet_detect_response(response, "file.parquet", 0, 0)

        # A time column already present in the response wins over inference,
        # so "timestamp" stays an ordinary data column.
        assert parsed.time_column.path == "server_ts"
        remaining_paths = [dc.path for dc in parsed.data_columns]
        assert remaining_paths == ["timestamp", "voltage"]

    def test_flat_dataset_no_int64_match_leaves_time_empty(self):
        columns = [("voltage", ChannelDataType.DOUBLE.value)]
        response = _make_flat_dataset_response(time_path="", data_columns=columns)

        parsed = _parse_parquet_detect_response(response, "file.parquet", 0, 0)

        assert parsed.time_column.path == ""
        remaining_paths = [dc.path for dc in parsed.data_columns]
        assert remaining_paths == ["voltage"]

    def test_scpr_infers_int64_time_column(self):
        columns = [
            ("voltage", ChannelDataType.DOUBLE.value),
            ("timestamp", ChannelDataType.INT_64.value),
        ]
        response = _make_scpr_response(time_path="", columns=columns)

        parsed = _parse_parquet_detect_response(response, "file.parquet", 0, 0)

        assert isinstance(parsed, ParquetSingleChannelPerRowImportConfig)
        assert parsed.time_column.path == "timestamp"
44 changes: 42 additions & 2 deletions python/lib/sift_client/resources/data_imports.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,17 +8,21 @@
from sift_client._internal.util.file import extract_parquet_footer, upload_file
from sift_client.resources._base import ResourceBase
from sift_client.sift_types.asset import Asset
from sift_client.sift_types.channel import ChannelDataType
from sift_client.sift_types.data_import import (
EXTENSION_TO_DATA_TYPE_KEY,
CsvImportConfig,
DataTypeKey,
ImportConfig,
ParquetFlatDatasetImportConfig,
ParquetSingleChannelPerRowImportConfig,
ParquetTimeColumn,
)
from sift_client.sift_types.run import Run

if TYPE_CHECKING:
from collections.abc import Iterable

from sift_client.client import SiftClient
from sift_client.sift_types.job import Job

Expand Down Expand Up @@ -320,6 +324,28 @@ def _parse_csv_detect_response(proto) -> CsvImportConfig:
return csv_config


# Column names that mark a likely time column; candidates are lower-cased
# before being looked up here.
_TIME_COLUMN_NAMES: frozenset[str] = frozenset(("time", "timestamp", "ts"))
# Channel data types a column must have to be considered as a time column.
_TIME_COLUMN_TYPES: frozenset[ChannelDataType] = frozenset(
    (ChannelDataType.INT_64, ChannelDataType.UINT_64)
)


def _infer_time_column(
    columns: Iterable[tuple[str, ChannelDataType, str]],
) -> str | None:
    """Guess which column holds timestamps when none was identified.

    Each entry of ``columns`` is a ``(name, data_type, path)`` tuple. An entry
    qualifies when its data type is in ``_TIME_COLUMN_TYPES`` and its
    lower-cased name is in ``_TIME_COLUMN_NAMES``. When several entries
    qualify, the one whose lower-cased name sorts first is chosen; entries
    with equal lower-cased names keep their input order.

    Returns:
        The path of the chosen entry, or None when no entry qualifies.
    """
    ordered = list(columns)
    # list.sort is stable, so ties on the lower-cased name keep input order.
    ordered.sort(key=lambda entry: entry[0].lower())
    matching_paths = (
        path
        for name, data_type, path in ordered
        if data_type in _TIME_COLUMN_TYPES and name.lower() in _TIME_COLUMN_NAMES
    )
    return next(matching_paths, None)


def _parse_parquet_detect_response(
proto, filename: str, footer_offset: int, footer_length: int
) -> ParquetFlatDatasetImportConfig | ParquetSingleChannelPerRowImportConfig:
Expand All @@ -328,16 +354,30 @@ def _parse_parquet_detect_response(
parquet_config = ParquetFlatDatasetImportConfig._from_proto(
proto, footer_offset=footer_offset, footer_length=footer_length
)
time_path = parquet_config.time_column.path
time_path: str | None = parquet_config.time_column.path
if not time_path:
time_path = _infer_time_column(
(dc.name, dc.data_type, dc.path) for dc in parquet_config.data_columns
)
if time_path:
parquet_config.time_column = ParquetTimeColumn(path=time_path)
if time_path:
parquet_config.data_columns = [
dc for dc in parquet_config.data_columns if dc.path != time_path
]
return parquet_config
elif proto.HasField("single_channel_per_row"):
return ParquetSingleChannelPerRowImportConfig._from_proto(
scpr_config = ParquetSingleChannelPerRowImportConfig._from_proto(
proto, footer_offset=footer_offset, footer_length=footer_length
)
if not scpr_config.time_column.path:
time_path = _infer_time_column(
(col.column_config.name, ChannelDataType(col.column_config.data_type), col.path)
for col in proto.single_channel_per_row.columns
)
if time_path is not None:
scpr_config.time_column = ParquetTimeColumn(path=time_path)
return scpr_config
raise ValueError(f"Unsupported parquet layout in DetectConfig response for '{filename}'.")


Expand Down
Loading