diff --git a/python/setup.py b/python/setup.py index 759abab33..16fc7aa84 100644 --- a/python/setup.py +++ b/python/setup.py @@ -144,7 +144,7 @@ def finalize_options(self): setup( name="tsfile", version=version, - packages=["tsfile"], + packages=["tsfile", "tsfile.dataset"], package_dir={"": "."}, include_package_data=True, ext_modules=cythonize(exts, compiler_directives={"language_level": 3}), diff --git a/python/tests/test_tsfile_dataset.py b/python/tests/test_tsfile_dataset.py new file mode 100644 index 000000000..5a1173129 --- /dev/null +++ b/python/tests/test_tsfile_dataset.py @@ -0,0 +1,369 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +import numpy as np +import pandas as pd +import pytest + +from tsfile import ColumnCategory, ColumnSchema, TSDataType, TableSchema, TsFileTableWriter +from tsfile import AlignedTimeseries, Timeseries, TsFileDataFrame +from tsfile.dataset.formatting import format_timestamp +from tsfile.dataset.reader import TsFileSeriesReader + + +def _write_weather_file(path, start): + schema = TableSchema( + "weather", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [start, start + 1, start + 2], + "device": ["device_a", "device_a", "device_a"], + "temperature": [20.0, 21.5, 23.0], + "humidity": [50.0, 52.0, 55.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def _write_numeric_and_text_file(path): + schema = TableSchema( + "weather", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [0, 1, 2], + "device": ["device_a", "device_a", "device_a"], + "temperature": [20.0, np.nan, 23.5], + "status": ["ok", "warn", "ok"], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def _write_partial_numeric_rows_file(path): + schema = TableSchema( + "weather", + [ + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [0, 1], + "device": ["device_a", "device_a"], + "temperature": [np.nan, 21.0], + "humidity": [50.0, 51.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def _write_weather_with_extra_field_file(path, start): + schema = TableSchema( + "weather", + [ + ColumnSchema("device", 
TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("pressure", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [start, start + 1], + "device": ["device_a", "device_a"], + "temperature": [20.0, 21.0], + "humidity": [50.0, 51.0], + "pressure": [1000.0, 1001.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def _write_multi_tag_file(path): + schema = TableSchema( + "weather", + [ + ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("humidity", TSDataType.DOUBLE, ColumnCategory.FIELD), + ColumnSchema("status", TSDataType.STRING, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [0, 1, 0, 1], + "city": ["beijing", "beijing", "shanghai", "shanghai"], + "device": ["device_a", "device_a", "device_b", "device_b"], + "temperature": [20.0, 21.0, 24.0, 25.0], + "humidity": [50.0, 51.0, 60.0, 61.0], + "status": ["ok", "ok", "warn", "warn"], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def _write_special_tag_file(path): + schema = TableSchema( + "weather", + [ + ColumnSchema("city", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("device", TSDataType.STRING, ColumnCategory.TAG), + ColumnSchema("temperature", TSDataType.DOUBLE, ColumnCategory.FIELD), + ], + ) + df = pd.DataFrame( + { + "time": [0, 1], + "city": ["bei.jing", "bei.jing"], + "device": [r"dev\1", r"dev\1"], + "temperature": [20.0, 21.0], + } + ) + with TsFileTableWriter(str(path), schema) as writer: + writer.write_dataframe(df) + + +def test_dataset_top_level_imports(): + assert TsFileDataFrame.__module__ == "tsfile.dataset.dataframe" + assert Timeseries.__module__ == "tsfile.dataset.timeseries" + assert AlignedTimeseries.__module__ == "tsfile.dataset.timeseries" + + +def test_format_timestamp_preserves_millisecond_precision(): + assert "." 
not in format_timestamp(1000) + assert format_timestamp(1).endswith(".001") + + +def test_dataset_basic_access_patterns(tmp_path, capsys): + path1 = tmp_path / "part1.tsfile" + path2 = tmp_path / "part2.tsfile" + _write_weather_file(path1, 0) + _write_weather_file(path2, 3) + + with TsFileDataFrame([str(path1), str(path2)], show_progress=False) as tsdf: + assert len(tsdf) == 2 + + first = tsdf[0] + assert isinstance(first, Timeseries) + assert first.name in tsdf.list_timeseries() + assert len(first) == 6 + assert first[0] == 20.0 + assert first[-1] == 23.0 + assert "Timeseries(" in repr(first) + + by_name = tsdf[first.name] + assert isinstance(by_name, Timeseries) + assert by_name.name == first.name + + subset = tsdf[:1] + assert isinstance(subset, TsFileDataFrame) + assert len(subset) == 1 + + selected = tsdf[[0, 1]] + assert isinstance(selected, TsFileDataFrame) + assert len(selected) == 2 + + aligned = tsdf.loc[0:5, [0, 1]] + assert isinstance(aligned, AlignedTimeseries) + assert aligned.shape == (6, 2) + + aligned_negative = tsdf.loc[0:5, [-1]] + assert isinstance(aligned_negative, AlignedTimeseries) + assert aligned_negative.shape == (6, 1) + + assert list(tsdf["field"]) == ["temperature", "humidity"] + + assert "TsFileDataFrame(2 time series, 2 files)" in repr(tsdf) + aligned.show(2) + assert "AlignedTimeseries(6 rows, 2 series)" in capsys.readouterr().out + + +def test_dataset_exposes_only_numeric_fields_and_keeps_nan(tmp_path): + path = tmp_path / "numeric_and_text.tsfile" + _write_numeric_and_text_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + assert tsdf.list_timeseries() == ["weather.device_a.temperature"] + + series = tsdf[0] + assert series.name == "weather.device_a.temperature" + assert np.isnan(series[1]) + sliced = series[:3] + assert sliced.shape == (3,) + assert np.isnan(sliced[1]) + assert series[1:1].shape == (0,) + + +def test_dataset_timeseries_supports_negative_step_slices(tmp_path): + path = tmp_path / "weather.tsfile" + _write_weather_file(path, 0) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + series = tsdf[0] + np.testing.assert_array_equal(series[::-1], np.array([23.0, 21.5, 20.0])) + np.testing.assert_array_equal(series[::-2], np.array([23.0, 20.0])) + + +def test_dataset_metadata_discovery_uses_all_numeric_fields(tmp_path): + path = tmp_path / "partial_numeric_rows.tsfile" + _write_partial_numeric_rows_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + assert tsdf.list_timeseries() == [ + "weather.device_a.temperature", + "weather.device_a.humidity", + ] + + assert list(tsdf["count"]) == [2, 2] + assert list(tsdf["start_time"]) == [0, 0] + assert list(tsdf["end_time"]) == [1, 1] + + +def test_dataset_rejects_duplicate_timestamps_across_shards(tmp_path): + path1 = tmp_path / "part1.tsfile" + path2 = tmp_path / "part2.tsfile" + _write_weather_file(path1, 0) + _write_weather_file(path2, 2) + + with pytest.raises(ValueError, match="Duplicate timestamp"): + TsFileDataFrame([str(path1), str(path2)], show_progress=False) + + +def test_dataset_rejects_incompatible_table_schemas_across_shards(tmp_path): + path1 = tmp_path / "part1.tsfile" + path2 = tmp_path / "part2.tsfile" + _write_weather_file(path1, 0) + _write_weather_with_extra_field_file(path2, 2) + + with pytest.raises(ValueError, match="Incompatible schema for table 'weather'"): + TsFileDataFrame([str(path1), str(path2)], show_progress=False) + + +def test_dataset_multi_tag_metadata_discovery(tmp_path): + path = tmp_path / 
"multi_tag.tsfile" + _write_multi_tag_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + assert tsdf.list_timeseries() == [ + "weather.beijing.device_a.temperature", + "weather.beijing.device_a.humidity", + "weather.shanghai.device_b.temperature", + "weather.shanghai.device_b.humidity", + ] + + summary = pd.DataFrame( + { + "series_path": tsdf.list_timeseries(), + "table": tsdf["table"], + "city": tsdf["city"], + "device": tsdf["device"], + "field": tsdf["field"], + "start_time": tsdf["start_time"], + "end_time": tsdf["end_time"], + "count": tsdf["count"], + } + ).sort_values(["city", "device", "field"]).reset_index(drop=True) + assert list(summary.columns) == [ + "series_path", + "table", + "city", + "device", + "field", + "start_time", + "end_time", + "count", + ] + assert list(summary["city"]) == ["beijing", "beijing", "shanghai", "shanghai"] + assert list(summary["device"]) == ["device_a", "device_a", "device_b", "device_b"] + assert list(summary["field"]) == ["humidity", "temperature", "humidity", "temperature"] + assert list(summary["count"]) == [2, 2, 2, 2] + + +def test_dataset_series_paths_escape_special_tag_values(tmp_path): + path = tmp_path / "special_tag.tsfile" + _write_special_tag_file(path) + + with TsFileDataFrame(str(path), show_progress=False) as tsdf: + expected_path = r"weather.bei\.jing.dev\\1.temperature" + assert tsdf.list_timeseries() == [expected_path] + + series = tsdf[expected_path] + assert isinstance(series, Timeseries) + assert series.name == expected_path + assert list(tsdf["city"]) == ["bei.jing"] + assert list(tsdf["device"]) == [r"dev\1"] + + +def test_reader_series_paths_escape_special_tag_values(tmp_path): + path = tmp_path / "special_tag.tsfile" + _write_special_tag_file(path) + + reader = TsFileSeriesReader(str(path), show_progress=False) + try: + expected_path = r"weather.bei\.jing.dev\\1.temperature" + assert reader.series_paths == [expected_path] + info = reader.get_series_info(expected_path) + assert info["tag_values"] == {"city": "bei.jing", "device": r"dev\1"} + finally: + reader.close() + + +def test_reader_catalog_shares_device_metadata_and_resolves_paths(tmp_path): + path = tmp_path / "weather.tsfile" + _write_weather_file(path, 100) + + reader = TsFileSeriesReader(str(path), show_progress=False) + try: + assert reader.series_paths == [ + "weather.device_a.temperature", + "weather.device_a.humidity", + ] + assert len(reader.catalog.table_entries) == 1 + assert len(reader.catalog.device_entries) == 1 + assert reader.catalog.series_count == 2 + + by_path = reader.get_series_info("weather.device_a.temperature") + by_ref = reader.get_series_info_by_ref(0, 0) + assert by_ref == by_path + assert by_ref["tag_values"] == {"device": "device_a"} + + ts_by_path = reader.get_series_timestamps("weather.device_a.temperature") + ts_by_device = reader.get_device_timestamps(0) + np.testing.assert_array_equal(ts_by_path, ts_by_device) + finally: + reader.close() diff --git a/python/tsfile/__init__.py b/python/tsfile/__init__.py index 55fa3b9e4..84ca330e4 100644 --- a/python/tsfile/__init__.py +++ b/python/tsfile/__init__.py @@ -40,4 +40,5 @@ from .tsfile_writer import TsFileWriterPy as TsFileWriter from .tsfile_py_cpp import get_tsfile_config, set_tsfile_config from .tsfile_table_writer import TsFileTableWriter -from .utils import to_dataframe, dataframe_to_tsfile \ No newline at end of file +from .utils import to_dataframe, dataframe_to_tsfile +from .dataset import TsFileDataFrame, Timeseries, AlignedTimeseries diff --git 
a/python/tsfile/dataset/__init__.py b/python/tsfile/dataset/__init__.py new file mode 100644 index 000000000..4072bd4c1 --- /dev/null +++ b/python/tsfile/dataset/__init__.py @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Dataset-style TsFile accessors.""" + +from .dataframe import TsFileDataFrame +from .timeseries import AlignedTimeseries, Timeseries + +__all__ = ["TsFileDataFrame", "Timeseries", "AlignedTimeseries"] diff --git a/python/tsfile/dataset/dataframe.py b/python/tsfile/dataset/dataframe.py new file mode 100644 index 000000000..3ed8f74db --- /dev/null +++ b/python/tsfile/dataset/dataframe.py @@ -0,0 +1,614 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Top-level dataset accessors for TsFile shards.""" + +from collections import defaultdict +from dataclasses import dataclass, field +import os +import sys +from typing import Dict, List, Set, Tuple, Union + +import numpy as np + +from .formatting import format_dataframe_table +from .metadata import TableEntry, _coerce_path_component, build_logical_series_path, split_logical_series_path +from .merge import build_aligned_matrix, merge_time_value_parts, merge_timestamp_parts +from .timeseries import AlignedTimeseries, Timeseries + + +DeviceKey = Tuple[str, tuple] +SeriesRefKey = Tuple[int, int] +SeriesRef = Tuple[object, int, int] +DeviceRef = Tuple[object, int] + +_QUERY_START = np.iinfo(np.int64).min +_QUERY_END = np.iinfo(np.int64).max + + +@dataclass(slots=True) +class _LogicalIndex: + """Cross-reader logical mapping for devices and series.""" + + # Shared table schema references keyed by table name. + table_entries: Dict[str, TableEntry] = field(default_factory=dict) + + # Stable logical device order, each item is (table_name, tag_values). + device_order: List[DeviceKey] = field(default_factory=list) + # Map one logical device key to its dataframe-local device index. + device_index_by_key: Dict[DeviceKey, int] = field(default_factory=dict) + # For each logical device, keep the contributing reader-local device refs. 
+ device_refs: List[List[DeviceRef]] = field(default_factory=list) + + # Stable logical series order, each item is (device_idx, field_idx). + series_refs_ordered: List[SeriesRefKey] = field(default_factory=list) + # Map one logical series ref to the contributing reader-local series refs. + series_ref_map: Dict[SeriesRefKey, List[SeriesRef]] = field(default_factory=dict) + # Fast membership check for resolved series refs. + series_ref_set: Set[SeriesRefKey] = field(default_factory=set) + + +@dataclass(slots=True) +class _DerivedCache: + """Merged metadata derived from the logical index.""" + + devices: List[dict] = field(default_factory=list) + field_stats: Dict[SeriesRefKey, dict] = field(default_factory=dict) + + +def _expand_paths(paths: Union[str, List[str]]) -> List[str]: + """Normalize file/directory inputs into a validated list of absolute TsFile paths.""" + if isinstance(paths, str): + paths = [paths] + + expanded = [] + for path in paths: + if os.path.isdir(path): + tsfiles = sorted( + os.path.join(root, name) + for root, _, files in os.walk(path) + for name in files + if name.endswith(".tsfile") + ) + if not tsfiles: + raise FileNotFoundError(f"No .tsfile files found in directory: {path}") + expanded.extend(tsfiles) + else: + expanded.append(path) + + resolved = [] + for path in expanded: + if not os.path.exists(path): + raise FileNotFoundError(f"TsFile not found: {path}") + resolved.append(os.path.abspath(path)) + return resolved + + +def _series_lookup_hint(name: str) -> str: + return f"Series not found: '{name}'. Use df.list_timeseries() to inspect available series." + + +def _validate_table_schema(existing: TableEntry, incoming: TableEntry, file_path: str) -> None: + """Reject same-name tables whose tag/field layout differs across shards.""" + if ( + existing.tag_columns == incoming.tag_columns + and existing.tag_types == incoming.tag_types + and existing.field_columns == incoming.field_columns + ): + return + + raise ValueError( + f"Incompatible schema for table '{incoming.table_name}' in '{file_path}'. " + f"Expected tags={list(existing.tag_columns)}, tag_types={list(existing.tag_types)}, " + f"fields={list(existing.field_columns)} but found " + f"tags={list(incoming.tag_columns)}, tag_types={list(incoming.tag_types)}, " + f"fields={list(incoming.field_columns)}." 
+ ) + + +def _register_reader( + readers: Dict[str, object], + index: _LogicalIndex, + file_path: str, + reader, +) -> None: + """Merge one reader's catalog into the dataframe-wide logical index.""" + readers[file_path] = reader + catalog = reader.catalog + + for table_entry in catalog.table_entries: + existing_entry = index.table_entries.get(table_entry.table_name) + if existing_entry is None: + index.table_entries[table_entry.table_name] = table_entry + else: + _validate_table_schema(existing_entry, table_entry, file_path) + + for device_id, device_entry in enumerate(catalog.device_entries): + table_entry = catalog.table_entries[device_entry.table_id] + device_key = (table_entry.table_name, tuple(device_entry.tag_values)) + device_idx = index.device_index_by_key.get(device_key) + if device_idx is None: + device_idx = len(index.device_order) + index.device_index_by_key[device_key] = device_idx + index.device_order.append(device_key) + index.device_refs.append([]) + index.device_refs[device_idx].append((reader, device_id)) + + for field_idx in range(len(table_entry.field_columns)): + series_ref = (device_idx, field_idx) + if series_ref not in index.series_ref_map: + index.series_refs_ordered.append(series_ref) + index.series_ref_map[series_ref] = [] + index.series_ref_map[series_ref].append((reader, device_id, field_idx)) + + +def _build_device_entry(refs: List[DeviceRef]) -> dict: + """Compute per-device time bounds after merging all contributing shards.""" + # [Temporary] It will be replaced by query_by_row and metadata interface in TsFile + if len(refs) == 1: + merged_timestamps = refs[0][0].get_device_timestamps(refs[0][1]) + else: + merged_timestamps = merge_timestamp_parts( + [reader.get_device_timestamps(device_id) for reader, device_id in refs], + validate_unique=True, + ) + + return { + "min_time": int(merged_timestamps[0]) if len(merged_timestamps) > 0 else None, + "max_time": int(merged_timestamps[-1]) if len(merged_timestamps) > 0 else None, + } + + +def _merge_field_timestamps(series_name: str, refs: List[SeriesRef]) -> np.ndarray: + """Load and merge the full timestamp axis for one logical series on demand.""" + # [Temporary] It will be replaced by query_by_row interface in TsFile + time_parts = [] + for reader, device_id, field_idx in refs: + ts_arr, _ = reader.read_series_by_ref(device_id, field_idx, _QUERY_START, _QUERY_END) + if len(ts_arr) > 0: + time_parts.append(ts_arr) + + if not time_parts: + merged_timestamps = np.array([], dtype=np.int64) + elif len(time_parts) == 1: + merged_timestamps = time_parts[0] + else: + try: + merged_timestamps = merge_timestamp_parts(time_parts, validate_unique=True) + except ValueError as e: + message = str(e) + duplicate_suffix = message.removeprefix("Duplicate timestamp ") + duplicate_suffix = duplicate_suffix.removesuffix(" found across shards.") + raise ValueError( + f"Duplicate timestamp {duplicate_suffix} found for series '{series_name}' across shards. " + f"Cross-shard duplicate timestamps are not supported." 
+ ) from e + + return merged_timestamps + + +def _build_field_stats(refs: List[SeriesRef]) -> dict: + """Aggregate cheap per-shard stats without materializing full series values.""" + min_time = None + max_time = None + count = 0 + + for reader, device_id, field_idx in refs: + info = reader.get_series_info_by_ref(device_id, field_idx) + shard_min = info["min_time"] + shard_max = info["max_time"] + shard_count = info["length"] + + if shard_count == 0: + continue + + count += shard_count + min_time = shard_min if min_time is None else min(min_time, shard_min) + max_time = shard_max if max_time is None else max(max_time, shard_max) + + return { + "min_time": min_time, + "max_time": max_time, + "count": count, + } + + +class _LocIndexer: + """Implement ``.loc[start_time:end_time, series_list]`` for aligned reads.""" + + def __init__(self, dataframe: "TsFileDataFrame"): + self._df = dataframe + + def _parse_key(self, key): + if not isinstance(key, tuple) or len(key) != 2: + raise ValueError("loc requires exactly 2 arguments: tsdf.loc[start_time:end_time, series_list]") + + time_slice, series_spec = key + if isinstance(time_slice, slice): + start_time = _QUERY_START if time_slice.start is None else time_slice.start + end_time = _QUERY_END if time_slice.stop is None else time_slice.stop + elif isinstance(time_slice, (int, np.integer)): + start_time = end_time = int(time_slice) + else: + raise TypeError(f"Time index must be slice or int, got {type(time_slice)}") + + if isinstance(series_spec, (str, int, np.integer)): + series_spec = [series_spec] + + series_refs = [] + series_names = [] + for item in series_spec: + if isinstance(item, (int, np.integer)): + idx = int(item) + if idx < 0: + idx += len(self._df._index.series_refs_ordered) + if idx < 0 or idx >= len(self._df._index.series_refs_ordered): + raise IndexError(f"Series index {item} out of range") + series_ref = self._df._index.series_refs_ordered[idx] + elif isinstance(item, str): + series_ref = self._df._resolve_series_name(item) + else: + raise TypeError(f"Series specifier must be int or str, got {type(item)}") + series_refs.append(series_ref) + series_names.append(self._df._build_series_name(series_ref)) + + return start_time, end_time, series_refs, series_names + + def _query_aligned(self, start_time: int, end_time: int, series_refs: List[SeriesRefKey], series_names: List[str]): + """Batch aligned reads by reader/device, then merge per-series fragments.""" + groups = defaultdict(list) + for col_idx, series_ref in enumerate(series_refs): + device_idx, field_idx = series_ref + device_info = self._df._cache.devices[device_idx] + if device_info["max_time"] is None or device_info["max_time"] < start_time or device_info["min_time"] > end_time: + continue + + _, table_entry, _ = self._df._get_series_components(series_ref) + field_name = table_entry.field_columns[field_idx] + for reader, device_id, reader_field_idx in self._df._index.series_ref_map[series_ref]: + groups[(id(reader), device_id)].append( + (col_idx, reader_field_idx, field_name, series_names[col_idx], reader, device_id) + ) + + series_time_parts = defaultdict(list) + series_value_parts = defaultdict(list) + for entries in groups.values(): + reader = entries[0][4] + device_id = entries[0][5] + field_indices = list(dict.fromkeys(entry[1] for entry in entries)) + ts_arr, field_vals = reader.read_device_fields_by_time_range(device_id, field_indices, start_time, end_time) + for _, _, field_name, series_name, _, _ in entries: + if len(ts_arr) > 0: + 
series_time_parts[series_name].append(ts_arr) + series_value_parts[series_name].append(field_vals[field_name]) + + series_data = {} + for name in series_names: + series_data[name] = merge_time_value_parts(series_time_parts[name], series_value_parts[name]) + + return build_aligned_matrix(series_names, series_data) + + def __getitem__(self, key) -> AlignedTimeseries: + start_time, end_time, series_refs, series_names = self._parse_key(key) + timestamps, values = self._query_aligned(start_time, end_time, series_refs, series_names) + return AlignedTimeseries(timestamps, values, series_names) + + +class TsFileDataFrame: + """Lazy-loaded unified numeric dataset view over multiple TsFile shards.""" + + def __init__(self, paths: Union[str, List[str]], show_progress: bool = True): + self._paths = _expand_paths(paths) + self._show_progress = show_progress + self._readers: Dict[str, object] = {} + self._index = _LogicalIndex() + self._cache = _DerivedCache() + self._is_view = False + self._root = None + self._load_metadata() + + @classmethod + def _from_subset(cls, parent: "TsFileDataFrame", series_refs: List[SeriesRefKey]) -> "TsFileDataFrame": + """Create a lightweight view that reuses the parent's readers and caches.""" + obj = object.__new__(cls) + obj._root = parent._root if parent._is_view else parent + obj._is_view = True + obj._paths = parent._paths + obj._show_progress = parent._show_progress + obj._readers = parent._readers + obj._index = _LogicalIndex( + table_entries=parent._index.table_entries, + device_order=parent._index.device_order, + device_index_by_key=parent._index.device_index_by_key, + device_refs=parent._index.device_refs, + series_refs_ordered=list(series_refs), + series_ref_map=parent._index.series_ref_map, + series_ref_set=set(series_refs), + ) + obj._cache = _DerivedCache(devices=parent._cache.devices, field_stats=parent._cache.field_stats) + return obj + + def _load_metadata(self): + """Build the logical cross-file index and the derived per-series caches.""" + from .reader import TsFileSeriesReader + + if len(self._paths) >= 2: + self._load_metadata_parallel(TsFileSeriesReader) + else: + self._load_metadata_serial(TsFileSeriesReader) + + self._cache.devices = [_build_device_entry(refs) for refs in self._index.device_refs] + for series_ref in self._index.series_refs_ordered: + self._cache.field_stats[series_ref] = _build_field_stats(self._index.series_ref_map[series_ref]) + + self._index.series_ref_set = set(self._index.series_refs_ordered) + if not self._index.series_refs_ordered: + raise ValueError("No valid time series found in the provided TsFile files") + + def _load_metadata_serial(self, reader_class): + for file_path in self._paths: + _register_reader( + self._readers, + self._index, + file_path, + reader_class(file_path, show_progress=self._show_progress), + ) + + def _load_metadata_parallel(self, reader_class): + from concurrent.futures import ThreadPoolExecutor, as_completed + + def open_file(file_path): + return file_path, reader_class(file_path, show_progress=False) + + total = len(self._paths) + with ThreadPoolExecutor(max_workers=min(total, os.cpu_count() or 4)) as executor: + futures = {executor.submit(open_file, path): path for path in self._paths} + results = {} + done = 0 + for future in as_completed(futures): + file_path, reader = future.result() + results[file_path] = reader + done += 1 + if self._show_progress: + sys.stderr.write(f"\rLoading TsFile shards: {done}/{total}") + sys.stderr.flush() + + if self._show_progress and total > 0: + total_series = 
sum(reader.series_count for reader in results.values()) + sys.stderr.write(f"\rLoading TsFile shards: {total}/{total} ({total_series} series) ... done\n") + sys.stderr.flush() + + for file_path in self._paths: + _register_reader( + self._readers, + self._index, + file_path, + results[file_path], + ) + + def _get_series_components(self, series_ref: SeriesRefKey) -> Tuple[DeviceKey, TableEntry, int]: + device_idx, field_idx = series_ref + device_key = self._index.device_order[device_idx] + return device_key, self._index.table_entries[device_key[0]], field_idx + + def _build_series_name(self, series_ref: SeriesRefKey) -> str: + device_key, table_entry, field_idx = self._get_series_components(series_ref) + table_name, tag_values = device_key + field_name = table_entry.field_columns[field_idx] + return build_logical_series_path(table_name, tag_values, field_name) + + def _resolve_series_name(self, series_name: str) -> SeriesRefKey: + try: + parts = split_logical_series_path(series_name) + except ValueError as exc: + raise KeyError(_series_lookup_hint(series_name)) from exc + if len(parts) < 2: + raise KeyError(_series_lookup_hint(series_name)) + + table_name = parts[0] + if table_name not in self._index.table_entries: + raise KeyError(_series_lookup_hint(series_name)) + + table_entry = self._index.table_entries[table_name] + expected_parts = len(table_entry.tag_columns) + 2 + if len(parts) != expected_parts: + raise KeyError(_series_lookup_hint(series_name)) + + field_name = parts[-1] + try: + field_idx = table_entry.get_field_index(field_name) + except ValueError as exc: + raise KeyError(_series_lookup_hint(series_name)) from exc + + tag_values = tuple( + _coerce_path_component(raw_value, tag_type) + for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types) + ) + device_key = (table_name, tag_values) + device_idx = self._index.device_index_by_key.get(device_key) + if device_idx is None: + raise KeyError(_series_lookup_hint(series_name)) + + series_ref = (device_idx, field_idx) + if series_ref not in self._index.series_ref_set: + raise KeyError(_series_lookup_hint(series_name)) + return series_ref + + def _build_series_info(self, series_ref: SeriesRefKey) -> dict: + device_idx, field_idx = series_ref + device_key, table_entry, _ = self._get_series_components(series_ref) + field_stats = self._cache.field_stats[series_ref] + return { + "table_name": table_entry.table_name, + "field": table_entry.field_columns[field_idx], + "tag_columns": table_entry.tag_columns, + "tag_values": dict(zip(table_entry.tag_columns, device_key[1])), + "min_time": field_stats["min_time"], + "max_time": field_stats["max_time"], + "count": field_stats["count"], + } + + def __len__(self) -> int: + return len(self._index.series_refs_ordered) + + def list_timeseries(self, path_prefix: str = "") -> List[str]: + names = [self._build_series_name(series_ref) for series_ref in self._index.series_refs_ordered] + if not path_prefix: + return names + prefix = path_prefix if path_prefix.endswith(".") else path_prefix + "." 
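+ # Keep series whose dotted path falls under the prefix, or that match the given path exactly.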
+ return [name for name in names if name.startswith(prefix) or name == path_prefix] + + def _get_timeseries(self, series_ref: SeriesRefKey) -> Timeseries: + series_name = self._build_series_name(series_ref) + return Timeseries( + series_name, + self._index.series_ref_map[series_ref], + self._cache.field_stats[series_ref], + lambda: _merge_field_timestamps(series_name, self._index.series_ref_map[series_ref]), + ) + + def __getitem__(self, key): + try: + import pandas as pd + + if isinstance(key, pd.Series) and key.dtype == bool: + selected = [self._index.series_refs_ordered[idx] for idx in key.index[key]] + return TsFileDataFrame._from_subset(self, selected) + except ImportError: + pass + + if isinstance(key, (int, np.integer)): + idx = int(key) + if idx < 0: + idx += len(self._index.series_refs_ordered) + if idx < 0 or idx >= len(self._index.series_refs_ordered): + raise IndexError(f"Index {idx} out of range [0, {len(self._index.series_refs_ordered)})") + return self._get_timeseries(self._index.series_refs_ordered[idx]) + + if isinstance(key, str): + try: + return self._get_timeseries(self._resolve_series_name(key)) + except KeyError: + pass + + valid_columns = {"table", "field", "start_time", "end_time", "count"} + valid_columns.update(self._collect_tag_columns()) + if key not in valid_columns: + raise KeyError(_series_lookup_hint(key)) + + import pandas as pd + + values = [] + for series_ref in self._index.series_refs_ordered: + info = self._build_series_info(series_ref) + if key == "table": + values.append(info["table_name"]) + elif key == "field": + values.append(info["field"]) + elif key == "start_time": + values.append(info["min_time"]) + elif key == "end_time": + values.append(info["max_time"]) + elif key == "count": + values.append(info["count"]) + else: + values.append(info["tag_values"].get(key, "")) + return pd.Series(values, name=key) + + if isinstance(key, slice): + return TsFileDataFrame._from_subset( + self, + [self._index.series_refs_ordered[idx] for idx in range(*key.indices(len(self._index.series_refs_ordered)))], + ) + + if isinstance(key, list): + selected = [] + for item in key: + if not isinstance(item, (int, np.integer)): + raise TypeError(f"List index must contain integers, got {type(item)}") + idx = int(item) + if idx < 0: + idx += len(self._index.series_refs_ordered) + if idx < 0 or idx >= len(self._index.series_refs_ordered): + raise IndexError(f"Index {item} out of range [0, {len(self._index.series_refs_ordered)})") + selected.append(self._index.series_refs_ordered[idx]) + return TsFileDataFrame._from_subset(self, selected) + + raise TypeError(f"Unsupported key type: {type(key)}") + + @property + def loc(self): + return _LocIndexer(self) + + def _collect_tag_columns(self) -> List[str]: + seen = {} + for table_name, _ in self._index.device_order: + for column in self._index.table_entries[table_name].tag_columns: + seen.setdefault(column, True) + return list(seen.keys()) + + def _format_table(self, indices=None, max_rows: int = 20) -> str: + series_names = [] + merged_info = {} + for series_ref in self._index.series_refs_ordered: + series_name = self._build_series_name(series_ref) + series_names.append(series_name) + merged_info[series_name] = self._build_series_info(series_ref) + + return format_dataframe_table( + series_names, + merged_info, + self._collect_tag_columns(), + indices=indices, + max_rows=max_rows, + ) + + def _repr_header(self) -> str: + total = len(self._index.series_refs_ordered) + if self._is_view: + return f"TsFileDataFrame({total} time series, 
subset of {len(self._root._index.series_refs_ordered)})\n" + return f"TsFileDataFrame({total} time series, {len(self._paths)} files)\n" + + def __repr__(self): + return self._repr_header() + self._format_table() + + def __str__(self): + return self.__repr__() + + def show(self, max_rows: int = 20): + print(self._repr_header() + self._format_table(max_rows=max_rows)) + + def close(self): + if self._is_view: + return + for reader in self._readers.values(): + reader.close() + self._readers.clear() + + def __del__(self): + try: + if not getattr(self, "_is_view", False): + self.close() + except Exception: + pass + + def __enter__(self): + return self + + def __exit__(self, *args): + self.close() diff --git a/python/tsfile/dataset/formatting.py b/python/tsfile/dataset/formatting.py new file mode 100644 index 000000000..5e01bb39b --- /dev/null +++ b/python/tsfile/dataset/formatting.py @@ -0,0 +1,171 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""String formatting helpers for dataset objects.""" + +from datetime import datetime +from typing import Dict, List, Optional + +import numpy as np + + +def format_timestamp(ts_ms: int) -> str: + """Convert millisecond timestamp to human-readable string.""" + try: + dt = datetime.fromtimestamp(ts_ms / 1000) + if ts_ms % 1000 == 0: + return dt.strftime("%Y-%m-%d %H:%M:%S") + return dt.strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] + except (OSError, ValueError, TypeError): + return str(ts_ms) + + +def format_aligned_timeseries( + timestamps: np.ndarray, + values: np.ndarray, + series_names: List[str], + max_rows: Optional[int], +) -> str: + """Render a table-like string for aligned query results. + + Uses head/tail truncation with '...' when rows exceed *max_rows*, + consistent with format_dataframe_table. 
+ """ + n_rows, n_cols = values.shape + if n_rows == 0: + return f"AlignedTimeseries(0 rows, {n_cols} series)" + + # Determine which rows to render (head + tail when truncated) + if max_rows is not None and n_rows > max_rows: + head = max_rows // 2 + tail = max_rows - head + show_indices = list(range(head)) + list(range(n_rows - tail, n_rows)) + truncated = True + else: + show_indices = list(range(n_rows)) + truncated = False + + # Only format the rows we will actually display + ts_strs = {i: format_timestamp(int(timestamps[i])) for i in show_indices} + ts_width = max(max((len(s) for s in ts_strs.values()), default=0), len("time")) + + col_widths = [] + rendered_values = [] # list of dicts: row_idx -> cell string + for col_idx in range(n_cols): + col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}" + width = len(col_name) + column = {} + for row_idx in show_indices: + value = values[row_idx, col_idx] + cell = "NaN" if np.isnan(value) else f"{value:.2f}" + column[row_idx] = cell + width = max(width, len(cell)) + rendered_values.append(column) + col_widths.append(width) + + header = ["time".rjust(ts_width)] + for col_idx, width in enumerate(col_widths): + col_name = series_names[col_idx] if col_idx < len(series_names) else f"col_{col_idx}" + header.append(col_name.rjust(width)) + lines = [" ".join(header)] + + head_count = max_rows // 2 if truncated else len(show_indices) + for i, row_idx in enumerate(show_indices): + if truncated and i == head_count: + lines.append("...") + parts = [ts_strs[row_idx].rjust(ts_width)] + for col_idx, width in enumerate(col_widths): + parts.append(rendered_values[col_idx][row_idx].rjust(width)) + lines.append(" ".join(parts)) + + return f"AlignedTimeseries({n_rows} rows, {n_cols} series)\n" + "\n".join(lines) + + +def format_dataframe_table( + series_list: List[str], + merged_info: Dict[str, dict], + tag_columns: List[str], + indices: Optional[List[int]] = None, + max_rows: int = 20, +) -> str: + """Render the metadata table used by TsFileDataFrame.__repr__.""" + if indices is None: + indices = list(range(len(series_list))) + else: + indices = list(indices) + + total = len(indices) + if total > max_rows: + show_indices = list(indices[: max_rows // 2]) + list(indices[-max_rows // 2 :]) + truncated = True + else: + show_indices = indices + truncated = False + + rows = [] + for idx in show_indices: + name = series_list[idx] + info = merged_info[name] + row = { + "index": idx, + "table": info["table_name"], + "field": info["field"], + "start_time": format_timestamp(info["min_time"]), + "end_time": format_timestamp(info["max_time"]), + "count": info["count"], + } + for tag_col in tag_columns: + row[tag_col] = info["tag_values"].get(tag_col, "") + rows.append(row) + + if not rows: + return "Empty TsFileDataFrame" + + headers = ["", "table"] + tag_columns + ["field", "start_time", "end_time", "count"] + widths = {header: len(header) for header in headers} + widths[""] = max(len(str(row["index"])) for row in rows) + + for row in rows: + widths[""] = max(widths[""], len(str(row["index"]))) + widths["table"] = max(widths["table"], len(row["table"])) + widths["field"] = max(widths["field"], len(row["field"])) + widths["start_time"] = max(widths["start_time"], len(row["start_time"])) + widths["end_time"] = max(widths["end_time"], len(row["end_time"])) + widths["count"] = max(widths["count"], len(str(row["count"]))) + for tag_col in tag_columns: + widths[tag_col] = max(widths[tag_col], len(str(row[tag_col]))) + + lines = [" 
".join(header.rjust(widths[header]) for header in headers)] + split = len(rows) // 2 if truncated else len(rows) + for row_idx, row in enumerate(rows): + if truncated and row_idx == split: + lines.append("...") + parts = [str(row["index"]).rjust(widths[""]), row["table"].rjust(widths["table"])] + for tag_col in tag_columns: + parts.append(str(row[tag_col]).rjust(widths[tag_col])) + parts.extend( + [ + row["field"].rjust(widths["field"]), + row["start_time"].rjust(widths["start_time"]), + row["end_time"].rjust(widths["end_time"]), + str(row["count"]).rjust(widths["count"]), + ] + ) + lines.append(" ".join(parts)) + + return "\n".join(lines) diff --git a/python/tsfile/dataset/merge.py b/python/tsfile/dataset/merge.py new file mode 100644 index 000000000..1f72290ea --- /dev/null +++ b/python/tsfile/dataset/merge.py @@ -0,0 +1,154 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed with this work under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Merge helpers for dataset reads. + +The dataset package enforces a strict cross-shard merge policy: +- only numeric-compatible field columns are exposed, +- null numeric values are represented as ``NaN``, +- duplicate timestamps for the same logical series across shards are rejected. 
+""" + +import heapq +from typing import Dict, List, Tuple + +import numpy as np + + +def merge_timestamp_parts( + time_parts: List[np.ndarray], + *, + deduplicate: bool = False, + validate_unique: bool = False, +) -> np.ndarray: + """Merge sorted timestamp parts with optional deduplication or validation.""" + parts = [ts_part for ts_part in time_parts if len(ts_part) > 0] + if not parts: + return np.array([], dtype=np.int64) + if len(parts) == 1: + return parts[0] + + parts.sort(key=lambda ts_part: int(ts_part[0])) + if all(int(parts[idx - 1][-1]) < int(parts[idx][0]) for idx in range(1, len(parts))): + return np.concatenate(parts) + + total_length = sum(len(ts_part) for ts_part in parts) + merged = np.empty(total_length, dtype=np.int64) + + heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(parts)] + heapq.heapify(heap) + + out_idx = 0 + last_ts = None + while heap: + ts, part_idx, offset = heapq.heappop(heap) + + if last_ts is not None and ts == last_ts: + if validate_unique: + raise ValueError(f"Duplicate timestamp {ts} found across shards.") + if not deduplicate: + merged[out_idx] = ts + out_idx += 1 + else: + merged[out_idx] = ts + out_idx += 1 + last_ts = ts + + next_offset = offset + 1 + if next_offset < len(parts[part_idx]): + heapq.heappush(heap, (int(parts[part_idx][next_offset]), part_idx, next_offset)) + + if validate_unique: + return merged[:out_idx] + if deduplicate: + return merged[:out_idx] + return merged[:out_idx] + + +def merge_time_value_parts( + time_parts: List[np.ndarray], + value_parts: List[np.ndarray], +) -> Tuple[np.ndarray, np.ndarray]: + """Merge sorted time/value parts for one logical series. + + Duplicate timestamps are validated during metadata loading, so the query + path can assume each part is already sorted and conflict-free. + + Fast path: if shard ranges do not overlap in time, concatenate in shard + order after sorting parts by their first timestamp. + Fallback: use a k-way merge for overlapping-but-disjoint ranges. + """ + parts = [(ts_part, val_part) for ts_part, val_part in zip(time_parts, value_parts) if len(ts_part) > 0] + if not parts: + return np.array([], dtype=np.int64), np.array([], dtype=np.float64) + if len(parts) == 1: + return parts[0] + + parts.sort(key=lambda item: int(item[0][0])) + time_parts = [ts_part for ts_part, _ in parts] + value_parts = [val_part for _, val_part in parts] + + if all(int(time_parts[idx - 1][-1]) < int(time_parts[idx][0]) for idx in range(1, len(time_parts))): + return np.concatenate(time_parts), np.concatenate(value_parts) + + total_length = sum(len(ts_part) for ts_part in time_parts) + merged_ts = np.empty(total_length, dtype=np.int64) + merged_vals = np.empty(total_length, dtype=np.float64) + + heap = [(int(ts_part[0]), part_idx, 0) for part_idx, ts_part in enumerate(time_parts)] + heapq.heapify(heap) + + out_idx = 0 + while heap: + _, part_idx, offset = heapq.heappop(heap) + merged_ts[out_idx] = time_parts[part_idx][offset] + merged_vals[out_idx] = value_parts[part_idx][offset] + out_idx += 1 + + next_offset = offset + 1 + if next_offset < len(time_parts[part_idx]): + heapq.heappush(heap, (int(time_parts[part_idx][next_offset]), part_idx, next_offset)) + + return merged_ts, merged_vals + + +def build_aligned_matrix( + series_names: List[str], series_data: Dict[str, Tuple[np.ndarray, np.ndarray]] +) -> Tuple[np.ndarray, np.ndarray]: + """Build a timestamp union and aligned value matrix for multiple series. 
+ + Each input series is assumed to already satisfy the dataset merge policy, + meaning its timestamp array is unique within that logical series. + """ + all_ts_arrays = [ts for ts, _ in series_data.values() if len(ts) > 0] + if not all_ts_arrays: + return np.array([], dtype=np.int64), np.empty((0, len(series_names))) + + timestamps = merge_timestamp_parts(all_ts_arrays, deduplicate=True) + values = np.full((len(timestamps), len(series_names)), np.nan) + + for col_idx, name in enumerate(series_names): + if name not in series_data: + continue + ts_arr, val_arr = series_data[name] + if len(ts_arr) == 0: + continue + indices = np.searchsorted(timestamps, ts_arr) + values[indices, col_idx] = val_arr + + return timestamps, values diff --git a/python/tsfile/dataset/metadata.py b/python/tsfile/dataset/metadata.py new file mode 100644 index 000000000..344b4cf5e --- /dev/null +++ b/python/tsfile/dataset/metadata.py @@ -0,0 +1,224 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +"""Shared metadata models for dataset readers and views.""" + +from dataclasses import dataclass, field +from typing import Any, Dict, Iterable, Iterator, List, Tuple + +import numpy as np + +from ..constants import TSDataType + + +_PATH_SEPARATOR = "." +_PATH_ESCAPE = "\\" + + +@dataclass(slots=True) +class TableEntry: + """Schema-level metadata shared by every device in one table.""" + + table_name: str + tag_columns: Tuple[str, ...] + tag_types: Tuple[TSDataType, ...] + field_columns: Tuple[str, ...] + _field_index_by_name: Dict[str, int] = field(init=False, repr=False) + + def __post_init__(self): + self._field_index_by_name = {column: idx for idx, column in enumerate(self.field_columns)} + + def get_field_index(self, field_name: str) -> int: + if field_name not in self._field_index_by_name: + raise ValueError(f"Field not found in table '{self.table_name}': {field_name}") + return self._field_index_by_name[field_name] + + +@dataclass(slots=True) +class DeviceEntry: + """One logical device identified by table name + ordered tag values.""" + + table_id: int + tag_values: Tuple[Any, ...] 
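+ # Sorted timestamp axis for this device, plus derived length and time bounds cached at catalog build time.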
+ timestamps: np.ndarray + length: int + min_time: int + max_time: int + + +@dataclass(slots=True) +class MetadataCatalog: + """Canonical metadata store shared by dataset readers and dataframes.""" + + table_entries: List[TableEntry] = field(default_factory=list) + device_entries: List[DeviceEntry] = field(default_factory=list) + table_id_by_name: Dict[str, int] = field(default_factory=dict) + device_id_by_key: Dict[Tuple[int, tuple], int] = field(default_factory=dict) + + def add_table( + self, + table_name: str, + tag_columns: Iterable[str], + tag_types: Iterable[TSDataType], + field_columns: Iterable[str], + ) -> int: + table_id = len(self.table_entries) + self.table_entries.append( + TableEntry( + table_name=table_name, + tag_columns=tuple(tag_columns), + tag_types=tuple(tag_types), + field_columns=tuple(field_columns), + ) + ) + self.table_id_by_name[table_name] = table_id + return table_id + + def add_device(self, table_id: int, tag_values: tuple, timestamps: np.ndarray) -> int: + key = (table_id, tuple(tag_values)) + if key in self.device_id_by_key: + return self.device_id_by_key[key] + + timestamps = np.sort(timestamps) + if len(timestamps) == 0: + raise ValueError("Cannot register a device without timestamps.") + + device_id = len(self.device_entries) + self.device_entries.append( + DeviceEntry( + table_id=table_id, + tag_values=tuple(tag_values), + timestamps=timestamps, + length=len(timestamps), + min_time=int(timestamps[0]), + max_time=int(timestamps[-1]), + ) + ) + self.device_id_by_key[key] = device_id + return device_id + + @property + def series_count(self) -> int: + return sum(len(self.table_entries[device.table_id].field_columns) for device in self.device_entries) + + +def _escape_path_component(value: Any) -> str: + return str(value).replace(_PATH_ESCAPE, _PATH_ESCAPE * 2).replace(_PATH_SEPARATOR, _PATH_ESCAPE + _PATH_SEPARATOR) + + +def split_logical_series_path(series_path: str) -> List[str]: + parts = [] + current = [] + escaping = False + + for char in series_path: + if escaping: + current.append(char) + escaping = False + continue + if char == _PATH_ESCAPE: + escaping = True + continue + if char == _PATH_SEPARATOR: + parts.append("".join(current)) + current = [] + continue + current.append(char) + + if escaping: + raise ValueError(f"Invalid series path: {series_path}") + + parts.append("".join(current)) + return parts + + +def build_logical_series_path(table_name: str, tag_values: Iterable[Any], field_name: str) -> str: + components = [table_name, *tag_values, field_name] + return _PATH_SEPARATOR.join(_escape_path_component(component) for component in components) + + +def build_series_path(catalog: MetadataCatalog, device_id: int, field_idx: int) -> str: + """Return the external logical series name for one device field.""" + device_entry = catalog.device_entries[device_id] + table_entry = catalog.table_entries[device_entry.table_id] + field_name = table_entry.field_columns[field_idx] + return build_logical_series_path(table_entry.table_name, device_entry.tag_values, field_name) + + +def iter_series_refs(catalog: MetadataCatalog) -> Iterator[Tuple[int, int]]: + """Yield ``(device_id, field_idx)`` pairs in catalog order.""" + for device_id, device_entry in enumerate(catalog.device_entries): + table_entry = catalog.table_entries[device_entry.table_id] + for field_idx in range(len(table_entry.field_columns)): + yield device_id, field_idx + + +def iter_series_paths(catalog: MetadataCatalog) -> Iterator[str]: + """Yield logical series names in catalog order.""" + for 
device_id, field_idx in iter_series_refs(catalog): + yield build_series_path(catalog, device_id, field_idx) + + +def resolve_series_path(catalog: MetadataCatalog, series_path: str) -> Tuple[int, int, int]: + """Resolve an external path to ``(table_id, device_id, field_idx)``.""" + parts = split_logical_series_path(series_path) + if len(parts) < 2: + raise ValueError(f"Invalid series path: {series_path}") + + table_name = parts[0] + if table_name not in catalog.table_id_by_name: + raise ValueError(f"Series not found: {series_path}") + + table_id = catalog.table_id_by_name[table_name] + table_entry = catalog.table_entries[table_id] + expected_parts = len(table_entry.tag_columns) + 2 + if len(parts) != expected_parts: + raise ValueError(f"Series not found: {series_path}") + + field_name = parts[-1] + try: + field_idx = table_entry.get_field_index(field_name) + except ValueError as exc: + raise ValueError(f"Series not found: {series_path}") from exc + + tag_values = tuple( + _coerce_path_component(raw_value, tag_type) + for raw_value, tag_type in zip(parts[1:-1], table_entry.tag_types) + ) + key = (table_id, tag_values) + if key not in catalog.device_id_by_key: + raise ValueError(f"Series not found: {series_path}") + + return table_id, catalog.device_id_by_key[key], field_idx + + +def _coerce_path_component(value: str, data_type: TSDataType) -> Any: + if data_type in {TSDataType.STRING, TSDataType.TEXT, TSDataType.BLOB}: + return value + if data_type == TSDataType.BOOLEAN: + lowered = value.lower() + if lowered == "true": + return True + if lowered == "false": + return False + raise ValueError(f"Invalid boolean tag value: {value}") + if data_type in {TSDataType.INT32, TSDataType.INT64, TSDataType.TIMESTAMP, TSDataType.DATE}: + return int(value) + if data_type in {TSDataType.FLOAT, TSDataType.DOUBLE}: + return float(value) + return value diff --git a/python/tsfile/dataset/reader.py b/python/tsfile/dataset/reader.py new file mode 100644 index 000000000..365dc8842 --- /dev/null +++ b/python/tsfile/dataset/reader.py @@ -0,0 +1,354 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. 
+# + +"""Single-file reader backend used by TsFileDataFrame.""" + +import os +import sys +from typing import Dict, Iterator, List, Tuple + +import numpy as np +import pyarrow.compute as pc + +from ..constants import ColumnCategory, TSDataType +from ..tsfile_reader import TsFileReaderPy +from .metadata import MetadataCatalog, build_series_path, iter_series_refs, resolve_series_path + + +_NUMERIC_FIELD_TYPES = { + TSDataType.BOOLEAN, + TSDataType.INT32, + TSDataType.INT64, + TSDataType.FLOAT, + TSDataType.DOUBLE, + TSDataType.TIMESTAMP, +} + + +def _to_python_scalar(value): + return value.item() if hasattr(value, "item") else value + + +class TsFileSeriesReader: + """Wrap ``TsFileReaderPy`` with numeric dataset discovery and batch reads.""" + + def __init__(self, file_path: str, show_progress: bool = True): + if not os.path.exists(file_path): + raise FileNotFoundError(f"TsFile not found: {file_path}") + + self.file_path = file_path + self.show_progress = show_progress + + try: + self._reader = TsFileReaderPy(file_path) + except Exception as e: + raise ValueError(f"Failed to open TsFile: {e}") from e + + self._catalog = MetadataCatalog() + self._cache_metadata() + + def __del__(self): + self.close() + + @property + def catalog(self) -> MetadataCatalog: + return self._catalog + + @property + def series_paths(self) -> List[str]: + return list(self.iter_series_paths()) + + @property + def series_count(self) -> int: + return self._catalog.series_count + + def iter_series_paths(self) -> Iterator[str]: + for device_id, field_idx in iter_series_refs(self._catalog): + yield build_series_path(self._catalog, device_id, field_idx) + + def iter_series_refs(self) -> Iterator[Tuple[str, int, int]]: + for device_id, field_idx in iter_series_refs(self._catalog): + yield build_series_path(self._catalog, device_id, field_idx), device_id, field_idx + + def close(self): + if hasattr(self, "_reader"): + try: + self._reader.close() + except Exception: + pass + + def _cache_metadata(self): + """Wrap metadata discovery so reader construction surfaces one stable error shape.""" + try: + self._cache_metadata_table_model() + # todo: we should support tree model + except Exception as e: + raise ValueError( + f"Failed to read TsFile metadata. Please ensure the TsFile is valid and readable. Error: {e}" + ) from e + + def _cache_metadata_table_model(self): + """Build the in-memory catalog by scanning table batches from the file.""" + table_schemas = self._reader.get_all_table_schemas() + if not table_schemas: + raise ValueError("No tables found in TsFile") + + self._catalog = MetadataCatalog() + total_rows = 0 + table_names = list(table_schemas.keys()) + + for table_index, table_name in enumerate(table_names): + table_schema = table_schemas[table_name] + + tag_columns = [] + tag_types = [] + field_columns = [] + for column_schema in table_schema.get_columns(): + column_name = column_schema.get_column_name() + column_category = column_schema.get_category() + if column_category == ColumnCategory.TIME: + continue + if column_category == ColumnCategory.TAG: + tag_columns.append(column_name) + tag_types.append(column_schema.get_data_type()) + + # ignore fields which is not numeric, we won't use them currently. 
+    def _cache_metadata(self):
+        """Wrap metadata discovery so reader construction surfaces one stable error shape."""
+        try:
+            self._cache_metadata_table_model()
+            # TODO: we should also support the tree model
+        except Exception as e:
+            raise ValueError(
+                f"Failed to read TsFile metadata. Please ensure the TsFile is valid and readable. Error: {e}"
+            ) from e
+
+    def _cache_metadata_table_model(self):
+        """Build the in-memory catalog by scanning table batches from the file."""
+        table_schemas = self._reader.get_all_table_schemas()
+        if not table_schemas:
+            raise ValueError("No tables found in TsFile")
+
+        self._catalog = MetadataCatalog()
+        total_rows = 0
+        table_names = list(table_schemas.keys())
+
+        for table_index, table_name in enumerate(table_names):
+            table_schema = table_schemas[table_name]
+
+            tag_columns = []
+            tag_types = []
+            field_columns = []
+            for column_schema in table_schema.get_columns():
+                column_name = column_schema.get_column_name()
+                column_category = column_schema.get_category()
+                if column_category == ColumnCategory.TIME:
+                    continue
+                if column_category == ColumnCategory.TAG:
+                    tag_columns.append(column_name)
+                    tag_types.append(column_schema.get_data_type())
+
+                # Ignore fields that are not numeric; we do not use them currently.
+                elif (
+                    column_category == ColumnCategory.FIELD
+                    and column_schema.get_data_type() in _NUMERIC_FIELD_TYPES
+                ):
+                    field_columns.append(column_name)
+
+            if not field_columns:
+                continue
+
+            table_id = self._catalog.add_table(table_name, tag_columns, tag_types, field_columns)
+            time_arrays = []
+            tag_arrays = {tag_column: [] for tag_column in tag_columns}
+
+            # [Temporary] This will be replaced by a new TsFile API; then we will no longer need to query all the data.
+            query_columns = tag_columns + field_columns
+
+            with self._reader.query_table_batch(table_name, query_columns, batch_size=65536) as result_set:
+                while True:
+                    arrow_table = result_set.read_arrow_batch()
+                    if arrow_table is None:
+                        break
+                    batch_rows = arrow_table.num_rows
+                    total_rows += batch_rows
+                    time_arrays.append(arrow_table.column("time").to_numpy())
+                    for tag_column in tag_columns:
+                        tag_arrays[tag_column].append(arrow_table.column(tag_column).to_numpy())
+
+                    if self.show_progress:
+                        sys.stderr.write(
+                            f"\rReading TsFile metadata: table {table_index + 1}/{len(table_names)} "
+                            f"[{table_name}] ({total_rows:,} rows)"
+                        )
+                        sys.stderr.flush()
+
+            if not time_arrays:
+                continue
+
+            timestamps = np.concatenate(time_arrays).astype(np.int64)
+            if not tag_columns:
+                self._add_device(table_id, (), timestamps)
+                continue
+
+            for tag_values, device_timestamps in self._iter_device_groups(tag_columns, timestamps, tag_arrays):
+                self._add_device(table_id, tag_values, device_timestamps)
+
+        if self.show_progress and total_rows > 0:
+            sys.stderr.write(
+                f"\rReading TsFile metadata: {len(table_names)} table(s), {total_rows:,} rows, "
+                f"{self.series_count} series ... done\n"
+            )
+            sys.stderr.flush()
+
+        if self.series_count == 0:
+            raise ValueError("No valid numeric series found in TsFile")
+
+    def _iter_device_groups(
+        self,
+        tag_columns: List[str],
+        timestamps: np.ndarray,
+        tag_arrays: Dict[str, list],
+    ) -> Iterator[Tuple[tuple, np.ndarray]]:
+        """Group one table's rows by tag tuple while preserving original row membership."""
+        tag_values_by_column = {column: np.concatenate(tag_arrays[column]) for column in tag_columns}
+
+        n = len(timestamps)
+        arrays = [tag_values_by_column[col] for col in tag_columns]
+        dtype = np.dtype([(col, arrays[i].dtype) for i, col in enumerate(tag_columns)])
+        composite = np.empty(n, dtype=dtype)
+        for i, col in enumerate(tag_columns):
+            composite[col] = arrays[i]
+
+        _, inverse, counts = np.unique(composite, return_inverse=True, return_counts=True)
+        ordered_indices = np.argsort(inverse, kind="stable")
+        group_bounds = np.cumsum(counts)[:-1]
+        for group_indices in np.split(ordered_indices, group_bounds):
+            first = int(group_indices[0])
+            tag_tuple = tuple(_to_python_scalar(composite[col][first]) for col in tag_columns)
+            yield tag_tuple, timestamps[group_indices]
+
+    def _add_device(
+        self,
+        table_id: int,
+        tag_values: tuple,
+        timestamps: np.ndarray,
+    ):
+        """Add one device to the catalog."""
+        if len(timestamps) == 0:
+            return
+
+        self._catalog.add_device(table_id, tag_values, timestamps)
+
+    def _resolve_series_path(self, series_path: str) -> Tuple[int, int, int]:
+        return resolve_series_path(self._catalog, series_path)
+
+    def _resolve_series_ref(self, device_id: int, field_idx: int):
+        """Resolve a reader-local ref into the table/device metadata needed by read paths."""
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        field_name = table_entry.field_columns[field_idx]
+        return table_entry, device_entry, field_name
+
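Not part of the patch: a self-contained illustration of the grouping trick used by _iter_device_groups above; the tag columns are packed into one structured-array key, and a stable argsort of np.unique's inverse index yields per-device row groups that keep their original row order.

import numpy as np

tags = np.array(["b", "a", "b", "a"])
timestamps = np.array([10, 11, 12, 13], dtype=np.int64)

# Pack the tag column(s) into a single composite key array.
composite = np.empty(len(tags), dtype=np.dtype([("device", tags.dtype)]))
composite["device"] = tags

_, inverse, counts = np.unique(composite, return_inverse=True, return_counts=True)
ordered = np.argsort(inverse, kind="stable")
for group in np.split(ordered, np.cumsum(counts)[:-1]):
    print(composite["device"][group[0]], timestamps[group])
# prints: a [11 13]
#         b [10 12]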
+    def get_device_info(self, device_id: int) -> dict:
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        return {
+            "table_name": table_entry.table_name,
+            "tag_columns": table_entry.tag_columns,
+            "tag_values": dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+            "length": device_entry.length,
+            "min_time": device_entry.min_time,
+            "max_time": device_entry.max_time,
+        }
+
+    def get_device_timestamps(self, device_id: int) -> np.ndarray:
+        return self._catalog.device_entries[device_id].timestamps
+
+    def get_series_info_by_ref(self, device_id: int, field_idx: int) -> dict:
+        table_entry, device_entry, field_name = self._resolve_series_ref(device_id, field_idx)
+        return {
+            "length": device_entry.length,
+            "min_time": device_entry.min_time,
+            "max_time": device_entry.max_time,
+            "table_name": table_entry.table_name,
+            "column_name": field_name,
+            "device_id": device_id,
+            "field_idx": field_idx,
+            "tag_columns": table_entry.tag_columns,
+            "tag_values": dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+        }
+
+    def get_series_info(self, series_path: str) -> dict:
+        device_id, field_idx = self._resolve_series_path(series_path)[1:]
+        return self.get_series_info_by_ref(device_id, field_idx)
+
+    def get_series_timestamps(self, series_path: str) -> np.ndarray:
+        device_id = self._resolve_series_path(series_path)[1]
+        return self.get_device_timestamps(device_id)
+
+    def read_series_by_ref(self, device_id: int, field_idx: int, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        table_entry, _, field_name = self._resolve_series_ref(device_id, field_idx)
+        timestamps, field_values = self.read_device_fields_by_time_range(device_id, [field_idx], start_time, end_time)
+        if len(timestamps) == 0:
+            return np.array([], dtype=np.int64), np.array([], dtype=np.float64)
+        return timestamps, field_values[field_name]
+
+    def read_series_by_time_range(self, series_path: str, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        _, device_id, field_idx = self._resolve_series_path(series_path)
+        return self.read_series_by_ref(device_id, field_idx, start_time, end_time)
+
+    def read_device_fields_by_time_range(
+        self, device_id: int, field_indices: List[int], start_time: int, end_time: int
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Read one device slice and return the requested field columns keyed by field name."""
+        device_entry = self._catalog.device_entries[device_id]
+        table_entry = self._catalog.table_entries[device_entry.table_id]
+        requested_field_columns = [table_entry.field_columns[field_idx] for field_idx in field_indices]
+        timestamps, field_values = self._read_arrow(
+            table_entry.table_name,
+            requested_field_columns,
+            table_entry.tag_columns,
+            dict(zip(table_entry.tag_columns, device_entry.tag_values)),
+            start_time,
+            end_time,
+        )
+        return timestamps, field_values
+
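Not part of the patch: a sketch of the read path defined above; the file name is hypothetical, and values come back as float64 arrays aligned with the returned timestamps.

from tsfile.dataset.reader import TsFileSeriesReader

reader = TsFileSeriesReader("weather.tsfile", show_progress=False)
try:
    path = reader.series_paths[0]
    info = reader.get_series_info(path)
    timestamps, values = reader.read_series_by_time_range(path, info["min_time"], info["max_time"])
    print(len(timestamps), values.dtype)     # row count, float64
finally:
    reader.close()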
+    def _read_arrow(
+        self,
+        table_name: str,
+        field_columns: List[str],
+        tag_columns: Tuple[str, ...],
+        tag_values: Dict[str, object],
+        start_time: int,
+        end_time: int,
+    ) -> Tuple[np.ndarray, Dict[str, np.ndarray]]:
+        """Execute the underlying table query, then apply tag filtering client-side."""
+        tag_columns = list(tag_columns)
+        field_columns = list(field_columns)
+        query_columns = tag_columns + field_columns if tag_columns else list(field_columns)
+        timestamp_parts = []
+        field_parts = {field_column: [] for field_column in field_columns}
+
+        with self._reader.query_table_batch(
+            table_name,
+            query_columns,
+            start_time=start_time,
+            end_time=end_time,
+            batch_size=65536,
+        ) as result_set:
+            while True:
+                arrow_table = result_set.read_arrow_batch()
+                if arrow_table is None:
+                    break
+
+                if tag_values:
+                    mask = None
+                    for tag_column, tag_value in tag_values.items():
+                        column_mask = pc.equal(arrow_table.column(tag_column), tag_value)
+                        mask = column_mask if mask is None else pc.and_(mask, column_mask)
+                    arrow_table = arrow_table.filter(mask)
+
+                if arrow_table.num_rows == 0:
+                    continue
+
+                timestamp_parts.append(arrow_table.column("time").to_numpy())
+                for field_column in field_columns:
+                    raw_values = arrow_table.column(field_column).to_numpy()
+                    try:
+                        field_parts[field_column].append(np.asarray(raw_values, dtype=np.float64))
+                    except (TypeError, ValueError) as e:
+                        raise TypeError(
+                            f"Field column '{field_column}' in table '{table_name}' is not numeric-compatible."
+                        ) from e
+
+        if not timestamp_parts:
+            return (
+                np.array([], dtype=np.int64),
+                {field_column: np.array([], dtype=np.float64) for field_column in field_columns},
+            )
+
+        return (
+            np.concatenate(timestamp_parts).astype(np.int64),
+            {field_column: np.concatenate(field_parts[field_column]) for field_column in field_columns},
+        )
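Not part of the patch: a standalone illustration of the client-side tag filtering that _read_arrow performs with pyarrow.compute; one equality mask per tag column is AND-ed together and applied with Table.filter.

import pyarrow as pa
import pyarrow.compute as pc

table = pa.table({
    "device": ["device_a", "device_b", "device_a"],
    "temperature": [20.0, 24.0, 21.0],
})

mask = None
for tag_column, tag_value in {"device": "device_a"}.items():
    column_mask = pc.equal(table.column(tag_column), tag_value)
    mask = column_mask if mask is None else pc.and_(mask, column_mask)

print(table.filter(mask).to_pydict())
# {'device': ['device_a', 'device_a'], 'temperature': [20.0, 21.0]}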
diff --git a/python/tsfile/dataset/timeseries.py b/python/tsfile/dataset/timeseries.py
new file mode 100644
index 000000000..e4176de25
--- /dev/null
+++ b/python/tsfile/dataset/timeseries.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+"""Timeseries handles returned by the dataset package."""
+
+from typing import Callable, List, Optional, Tuple
+
+import numpy as np
+
+from .merge import merge_time_value_parts
+from .formatting import format_aligned_timeseries, format_timestamp
+
+
+class AlignedTimeseries:
+    """Time-aligned multi-series query result with timestamps.
+
+    Returned by ``TsFileDataFrame.loc[...]``. The values matrix is aligned on
+    the union of timestamps from the selected logical series.
+    """
+
+    def __init__(self, timestamps: np.ndarray, values: np.ndarray, series_names: List[str]):
+        self.timestamps = timestamps
+        self.values = values
+        self.series_names = series_names
+
+    @property
+    def shape(self):
+        return self.values.shape
+
+    def __len__(self):
+        return len(self.timestamps)
+
+    def __getitem__(self, key):
+        return self.values[key]
+
+    def __repr__(self):
+        return format_aligned_timeseries(self.timestamps, self.values, self.series_names, max_rows=20)
+
+    def show(self, max_rows: Optional[int] = None):
+        print(format_aligned_timeseries(self.timestamps, self.values, self.series_names, max_rows=max_rows))
+
+
+class Timeseries:
+    """Single logical numeric series with transparent cross-file merging.
+
+    Cross-shard reads follow the dataset merge policy defined in
+    :mod:`tsfile.dataset.merge`: duplicate timestamps across shards are treated
+    as an error rather than being merged implicitly.
+    """
+
+    def __init__(
+        self,
+        name: str,
+        series_refs: list,
+        stats: dict,
+        load_timestamps: Callable[[], np.ndarray],
+    ):
+        self._name = name
+        self._series_refs = series_refs
+        self._stats = dict(stats)
+        self._load_timestamps = load_timestamps
+        self._timestamps = None
+
+    @property
+    def name(self) -> str:
+        return self._name
+
+    @property
+    def timestamps(self) -> np.ndarray:
+        if self._timestamps is None:
+            self._timestamps = self._load_timestamps()
+        return self._timestamps
+
+    @property
+    def stats(self) -> dict:
+        return {
+            "start_time": self._stats.get("min_time"),
+            "end_time": self._stats.get("max_time"),
+            "count": self._stats["count"],
+        }
+
+    def __len__(self) -> int:
+        return self._stats["count"]
+
+    def __getitem__(self, key):
+        timestamps = self.timestamps
+        length = len(timestamps)
+
+        if isinstance(key, int):
+            if key < 0:
+                key += length
+            if key < 0 or key >= length:
+                raise IndexError(f"Index {key} out of range [0, {length})")
+            ts = int(timestamps[key])
+            _, values = self._query_time_range(ts, ts)
+            return float(values[0]) if len(values) > 0 else None
+
+        if isinstance(key, slice):
+            requested_ts = timestamps[key]
+            if len(requested_ts) == 0:
+                return np.array([], dtype=np.float64)
+
+            ts_arr, values = self._query_time_range(int(np.min(requested_ts)), int(np.max(requested_ts)))
+            result = np.full(len(requested_ts), np.nan)
+            if len(ts_arr) > 0:
+                indices = np.searchsorted(ts_arr, requested_ts)
+                valid = (indices < len(ts_arr)) & (
+                    ts_arr[np.minimum(indices, len(ts_arr) - 1)] == requested_ts
+                )
+                result[valid] = values[indices[valid]]
+            return result
+
+        raise TypeError(f"Unsupported key type: {type(key)}")
+
+    def _query_time_range(self, start_time: int, end_time: int) -> Tuple[np.ndarray, np.ndarray]:
+        time_parts = []
+        value_parts = []
+        for reader, device_id, field_idx in self._series_refs:
+            device_timestamps = reader.get_device_timestamps(device_id)
+            if device_timestamps[-1] < start_time or device_timestamps[0] > end_time:
+                continue
+            ts_arr, val_arr = reader.read_series_by_ref(device_id, field_idx, start_time, end_time)
+            if len(ts_arr) > 0:
+                time_parts.append(ts_arr)
+                value_parts.append(val_arr)
+        return merge_time_value_parts(time_parts, value_parts)
+
+    def __repr__(self):
+        stats = self.stats
+        if stats["count"] == 0:
+            return f"Timeseries('{self._name}', count=0)"
+        return (
+            f"Timeseries('{self._name}', count={stats['count']}, "
+            f"start={format_timestamp(stats['start_time'])}, "
+            f"end={format_timestamp(stats['end_time'])})"
+        )
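Not part of the patch: a self-contained sketch of the Timeseries indexing semantics above, using a stub reader in place of a real TsFileSeriesReader and assuming merge_time_value_parts passes a single shard through unchanged; integer indexing re-queries a single timestamp, while slicing returns a float64 array aligned to the sliced timestamps, with NaN where no value comes back.

import numpy as np
from tsfile.dataset.timeseries import Timeseries

class _StubReader:
    """Minimal stand-in exposing the two methods Timeseries calls on a reader."""

    def __init__(self, timestamps, values):
        self._ts, self._vals = timestamps, values

    def get_device_timestamps(self, device_id):
        return self._ts

    def read_series_by_ref(self, device_id, field_idx, start_time, end_time):
        mask = (self._ts >= start_time) & (self._ts <= end_time)
        return self._ts[mask], self._vals[mask]

ts = np.array([0, 1, 2, 3], dtype=np.int64)
vals = np.array([20.0, 21.0, 22.0, 23.0])
series = Timeseries(
    "weather.device_a.temperature",      # hypothetical logical path
    [(_StubReader(ts, vals), 0, 0)],
    {"min_time": 0, "max_time": 3, "count": 4},
    lambda: ts,
)
print(series[0])    # 20.0
print(series[1:3])  # [21. 22.]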