From 50820ec1e523fa253ebc798aeeb5aa748463e483 Mon Sep 17 00:00:00 2001 From: Satoshi Kido Date: Fri, 26 Jun 2026 11:29:46 -0700 Subject: [PATCH 1/2] feat: model TM1 12.6.1 Arrow/Parquet/Flight TI datasources Add first-class modelling for the three new TurboIntegrator input datasource types introduced in TM1 v12 build 12.6.1 (Apache Arrow IPC/Feather, Parquet, and Arrow Flight), verified end-to-end against a live PA 12.6.1 server. Process: - four new datasource_flight_* scalars (constructor, from_dict, properties) - ARROW/PARQUET branch reuses dataSourceNameForServer/Client and optional jsonRootPointer / jsonVariableMapping (no ascii* fields) - FLIGHT branch emits the server's verified wire fields: flightLocation / flightDescriptorType / flightDescriptor / flightAuth - columnar/Flight branches match Type case-insensitively, because the server canonicalizes Type to title-case on read-back ("Arrow"/"Parquet"/ "Flight"); this keeps from_dict(server_response) round-trips from collapsing to an empty DataSource block ProcessService.get/get_all select the four flight fields. datasource_type stays a free string (no Type whitelist) and no version gating is added, since ProcessDataSource is an OData open type validated by the engine at execute time. Tests: offline unit tests for the body shapes, from_dict round-trips, and the title-case Type round-trip (17 total), plus an optional live round-trip test gated to TM1 >= 12.6.1. Offline suite and the live 12.6.1 round-trip both pass. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- TM1py/Objects/Process.py | 77 ++++++++++++++ TM1py/Services/ProcessService.py | 12 ++- Tests/ProcessService_test.py | 50 +++++++++ Tests/Process_test.py | 168 ++++++++++++++++++++++++++++++- 4 files changed, 304 insertions(+), 3 deletions(-) diff --git a/TM1py/Objects/Process.py b/TM1py/Objects/Process.py index 348bc04a..a59f0e38 100644 --- a/TM1py/Objects/Process.py +++ b/TM1py/Objects/Process.py @@ -65,6 +65,11 @@ def __init__( datasource_subset: str = "", datasource_json_root_pointer: str = "", datasource_json_variable_mapping: str = "", + # --- NEW: Apache Arrow Flight (TM1 12.6.1+) --- + datasource_flight_location: str = "", + datasource_flight_descriptor_type: str = "", + datasource_flight_descriptor: str = "", + datasource_flight_auth: str = "", ): """Default construcor @@ -95,6 +100,10 @@ def __init__( :param datasource_subset: :param datasource_json_root_pointer: :param datasource_json_variable_mapping: + :param datasource_flight_location: Arrow Flight gRPC URI, e.g. "grpc://host:8443" (TM1 12.6.1+) + :param datasource_flight_descriptor_type: Arrow Flight descriptor type: "PATH" or "COMMAND" (TM1 12.6.1+) + :param datasource_flight_descriptor: Arrow Flight descriptor, e.g. "dataset/path" or "SELECT * FROM t" (TM1 12.6.1+) + :param datasource_flight_auth: Arrow Flight auth header, e.g. 
"Bearer <token>" or "Basic user:pass" (TM1 12.6.1+) """ self._name = name self._has_security_access = has_security_access @@ -127,6 +136,10 @@ def __init__( self._datasource_subset = datasource_subset self._datasource_json_root_pointer = datasource_json_root_pointer self._datasource_json_variable_mapping = datasource_json_variable_mapping + self._datasource_flight_location = datasource_flight_location + self._datasource_flight_descriptor_type = datasource_flight_descriptor_type + self._datasource_flight_descriptor = datasource_flight_descriptor + self._datasource_flight_auth = datasource_flight_auth @classmethod def from_json(cls, process_as_json: str) -> "Process": @@ -171,6 +184,10 @@ def from_dict(cls, process_as_dict: Dict) -> "Process": datasource_subset=process_as_dict["DataSource"].get("subset", ""), datasource_json_root_pointer=process_as_dict["DataSource"].get("jsonRootPointer", ""), datasource_json_variable_mapping=process_as_dict["DataSource"].get("jsonVariableMapping", ""), + datasource_flight_location=process_as_dict["DataSource"].get("flightLocation", ""), + datasource_flight_descriptor_type=process_as_dict["DataSource"].get("flightDescriptorType", ""), + datasource_flight_descriptor=process_as_dict["DataSource"].get("flightDescriptor", ""), + datasource_flight_auth=process_as_dict["DataSource"].get("flightAuth", ""), ) @property @@ -377,6 +394,38 @@ def datasource_json_variable_mapping(self) -> str: def datasource_json_variable_mapping(self, value: str): self._datasource_json_variable_mapping = value + @property + def datasource_flight_location(self) -> str: + return self._datasource_flight_location + + @datasource_flight_location.setter + def datasource_flight_location(self, value: str): + self._datasource_flight_location = value + + @property + def datasource_flight_descriptor_type(self) -> str: + return self._datasource_flight_descriptor_type + + @datasource_flight_descriptor_type.setter + def datasource_flight_descriptor_type(self, value: str): + 
self._datasource_flight_descriptor_type = value + + @property + def datasource_flight_descriptor(self) -> str: + return self._datasource_flight_descriptor + + @datasource_flight_descriptor.setter + def datasource_flight_descriptor(self, value: str): + self._datasource_flight_descriptor = value + + @property + def datasource_flight_auth(self) -> str: + return self._datasource_flight_auth + + @datasource_flight_auth.setter + def datasource_flight_auth(self, value: str): + self._datasource_flight_auth = value + def add_variable(self, name: str, variable_type: str): """add variable to the process @@ -509,4 +558,32 @@ def _construct_body_as_dict(self) -> Dict: "jsonRootPointer": self._datasource_json_root_pointer, "jsonVariableMapping": self._datasource_json_variable_mapping, } + elif self._datasource_type.upper() in ("ARROW", "PARQUET"): + # Apache Arrow IPC/Feather or Parquet file datasource (TM1 12.6.1+). + # File datasources: just the (file name OR http/https URL) — no ascii* + # delimiter/quote/header keys. Optional JSON-treatment for nested columns + # reuses the JSON datasource's jsonRootPointer / jsonVariableMapping. + # NB: the server canonicalizes Type to title-case ("Arrow"/"Parquet") on + # read-back, so match case-insensitively to keep from_dict round-trips intact. + body_as_dict["DataSource"] = { + "Type": self._datasource_type, + "dataSourceNameForClient": self._datasource_data_source_name_for_client, + "dataSourceNameForServer": self._datasource_data_source_name_for_server, + } + if self._datasource_json_root_pointer: + body_as_dict["DataSource"]["jsonRootPointer"] = self._datasource_json_root_pointer + if self._datasource_json_variable_mapping: + body_as_dict["DataSource"]["jsonVariableMapping"] = self._datasource_json_variable_mapping + elif self._datasource_type.upper() == "FLIGHT": + # Apache Arrow Flight datasource (TM1 12.6.1+): TM1 is a Flight CLIENT that + # streams record batches from a remote Flight server into TI variables. 
+ # Wire field names verified against a live 12.6.1 server: flightLocation / + # flightDescriptorType / flightDescriptor / flightAuth (NOT dataSourceFlight*). + body_as_dict["DataSource"] = { + "Type": self._datasource_type, + "flightLocation": self._datasource_flight_location, + "flightDescriptorType": self._datasource_flight_descriptor_type, + "flightDescriptor": self._datasource_flight_descriptor, + "flightAuth": self._datasource_flight_auth, + } return body_as_dict diff --git a/TM1py/Services/ProcessService.py b/TM1py/Services/ProcessService.py index 01d22ab1..aaf3e1d5 100644 --- a/TM1py/Services/ProcessService.py +++ b/TM1py/Services/ProcessService.py @@ -46,7 +46,11 @@ def get(self, name_process: str, **kwargs) -> Process: "DataSource/usesUnicode," "DataSource/subset," "DataSource/jsonRootPointer," - "DataSource/jsonVariableMapping", + "DataSource/jsonVariableMapping," + "DataSource/flightLocation," + "DataSource/flightDescriptorType," + "DataSource/flightDescriptor," + "DataSource/flightAuth", name_process, ) @@ -78,7 +82,11 @@ def get_all(self, skip_control_processes: bool = False, **kwargs) -> List[Proces "DataSource/usesUnicode," "DataSource/subset," "DataSource/jsonRootPointer," - "DataSource/jsonVariableMapping{}".format(model_process_filter if skip_control_processes else "") + "DataSource/jsonVariableMapping," + "DataSource/flightLocation," + "DataSource/flightDescriptorType," + "DataSource/flightDescriptor," + "DataSource/flightAuth{}".format(model_process_filter if skip_control_processes else "") ) response = self._rest.GET(url, **kwargs) diff --git a/Tests/ProcessService_test.py b/Tests/ProcessService_test.py index e790cc50..a2814ce1 100644 --- a/Tests/ProcessService_test.py +++ b/Tests/ProcessService_test.py @@ -166,6 +166,56 @@ def test_update_or_create(self): self.p_bedrock_server_wait.prolog_procedure = temp_prolog self.tm1.processes.delete(self.p_bedrock_server_wait.name) + @skip_if_version_lower_than(version="12.6.1") + def 
test_datasource_arrow_parquet_flight_roundtrip(self): + """Live round-trip of the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasources. + + Skipped automatically on servers older than 12.6.1. + """ + arrow = Process( + name="TM1py_Tests_arrow_" + str(uuid.uuid4()), + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + ) + parquet = Process( + name="TM1py_Tests_parquet_" + str(uuid.uuid4()), + datasource_type="PARQUET", + datasource_data_source_name_for_server="data.parquet", + datasource_data_source_name_for_client="data.parquet", + ) + flight = Process( + name="TM1py_Tests_flight_" + str(uuid.uuid4()), + datasource_type="FLIGHT", + datasource_flight_location="grpc+tls://localhost:443", + datasource_flight_descriptor_type="COMMAND", + datasource_flight_descriptor="SELECT 1", + datasource_flight_auth="Bearer token", + ) + try: + for process in (arrow, parquet, flight): + self.tm1.processes.update_or_create(process) + + # the server canonicalizes Type to title-case ("Arrow"/"Parquet"/"Flight"), so compare case-insensitively + arrow_back = self.tm1.processes.get(arrow.name) + self.assertEqual(arrow_back.datasource_type.upper(), "ARROW") + self.assertEqual(arrow_back.datasource_data_source_name_for_server, "data.arrow") + + parquet_back = self.tm1.processes.get(parquet.name) + self.assertEqual(parquet_back.datasource_type.upper(), "PARQUET") + self.assertEqual(parquet_back.datasource_data_source_name_for_server, "data.parquet") + + flight_back = self.tm1.processes.get(flight.name) + self.assertEqual(flight_back.datasource_type.upper(), "FLIGHT") + self.assertEqual(flight_back.datasource_flight_location, "grpc+tls://localhost:443") + self.assertEqual(flight_back.datasource_flight_descriptor_type, "COMMAND") + self.assertEqual(flight_back.datasource_flight_descriptor, "SELECT 1") + self.assertEqual(flight_back.datasource_flight_auth, "Bearer token") + finally: + for process 
in (arrow, parquet, flight): + if self.tm1.processes.exists(process.name): + self.tm1.processes.delete(process.name) + def test_execute_process(self): if not self.tm1.processes.exists(self.p_bedrock_server_wait.name): self.tm1.processes.create(self.p_bedrock_server_wait) diff --git a/Tests/Process_test.py b/Tests/Process_test.py index fe4ce827..7765f6a7 100644 --- a/Tests/Process_test.py +++ b/Tests/Process_test.py @@ -1,6 +1,6 @@ import unittest -from TM1py.Objects import BreakPointType, HitMode +from TM1py.Objects import BreakPointType, HitMode, Process class TestBreakPointType(unittest.TestCase): @@ -31,3 +31,169 @@ def test_BreakPointType_init_case(self): def test_BreakPointType_str(self): hit_mode = HitMode.BREAK_ALWAYS self.assertEqual("BreakAlways", str(hit_mode)) + + +class TestProcessDataSource(unittest.TestCase): + """Offline unit tests for the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasource body shapes.""" + + @staticmethod + def _process_dict(datasource: dict) -> dict: + """Minimal Process payload wrapping a given DataSource block (for Process.from_dict).""" + return { + "Name": "p_test", + "HasSecurityAccess": False, + "Parameters": [], + "Variables": [], + "PrologProcedure": "", + "MetadataProcedure": "", + "DataProcedure": "", + "EpilogProcedure": "", + "DataSource": datasource, + } + + def test_arrow_body(self): + process = Process( + name="p_arrow", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual( + datasource, + { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + }, + ) + # columnar files carry none of the ascii* delimiter/quote/header keys + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + + def test_parquet_body(self): + process = Process( + name="p_parquet", + datasource_type="PARQUET", + 
datasource_data_source_name_for_server="data.parquet", + datasource_data_source_name_for_client="data.parquet", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual( + datasource, + { + "Type": "PARQUET", + "dataSourceNameForClient": "data.parquet", + "dataSourceNameForServer": "data.parquet", + }, + ) + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + + def test_arrow_body_with_json_treatment(self): + process = Process( + name="p_arrow_json", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + datasource_json_root_pointer="data", + datasource_json_variable_mapping="{}", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual(datasource["jsonRootPointer"], "data") + self.assertEqual(datasource["jsonVariableMapping"], "{}") + + def test_arrow_body_omits_empty_json_treatment(self): + process = Process( + name="p_arrow_plain", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + ) + datasource = process.body_as_dict["DataSource"] + self.assertNotIn("jsonRootPointer", datasource) + self.assertNotIn("jsonVariableMapping", datasource) + + def test_flight_body(self): + process = Process( + name="p_flight", + datasource_type="FLIGHT", + datasource_flight_location="grpc+tls://host:443", + datasource_flight_descriptor_type="COMMAND", + datasource_flight_descriptor="SELECT * FROM t", + datasource_flight_auth="Bearer token", + ) + self.assertEqual( + process.body_as_dict["DataSource"], + { + "Type": "FLIGHT", + "flightLocation": "grpc+tls://host:443", + "flightDescriptorType": "COMMAND", + "flightDescriptor": "SELECT * FROM t", + "flightAuth": "Bearer token", + }, + ) + + def test_arrow_roundtrip(self): + datasource = { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + } + process = Process.from_dict(self._process_dict(datasource)) + 
self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_parquet_roundtrip(self): + datasource = { + "Type": "PARQUET", + "dataSourceNameForClient": "data.parquet", + "dataSourceNameForServer": "data.parquet", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_arrow_roundtrip_with_json_treatment(self): + datasource = { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + "jsonRootPointer": "data", + "jsonVariableMapping": "{}", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_flight_roundtrip(self): + datasource = { + "Type": "FLIGHT", + "flightLocation": "grpc://host:8443", + "flightDescriptorType": "PATH", + "flightDescriptor": "dataset/path", + "flightAuth": "Basic user:pass", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_arrow_from_dict_title_case_type(self): + # The server canonicalizes Type to title-case ("Arrow") on read-back; from_dict + # of such a response must still produce a proper Arrow body, not an empty {}. + datasource = { + "Type": "Arrow", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_flight_from_dict_title_case_type(self): + # Title-case "Flight" (as returned by the server) must round-trip too. 
+ datasource = { + "Type": "Flight", + "flightLocation": "grpc://host:8443", + "flightDescriptorType": "PATH", + "flightDescriptor": "dataset/path", + "flightAuth": "Basic user:pass", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + +if __name__ == "__main__": + unittest.main() From 5f6ead9dd308ae6aea9c4cdae32ee07d0e267e4c Mon Sep 17 00:00:00 2001 From: Satoshi Kido Date: Fri, 26 Jun 2026 18:06:41 -0700 Subject: [PATCH 2/2] feat: write_dataframe(use_arrow/use_parquet) columnar blob write (TM1 12.6.1+) Add a pure-Python columnar write path to cells.write_dataframe, mirroring the calling convention of tm1py-rust: pass use_parquet=True (with optional parquet_compression) or use_arrow=True to load a DataFrame through a TM1 12.6.1 ARROW / PARQUET TI datasource instead of the CSV/ASCII blob. - Encodes the aligned DataFrame to an Arrow-IPC stream or a Parquet file via pyarrow (fields v1..vN + vValue; a numeric value column -> float64 -> a Numeric TI variable, no client-side CSV serialization/escaping). - Uploads via FileService and runs an unbound ARROW/PARQUET-datasource process (_build_columnar_blob_to_cube_process mirrors _build_blob_to_cube_process), then deletes the blob. Gated to server >= 12.6.1. - pyarrow is an optional extra (pip install TM1py[arrow]); a guarded import raises a clear error if use_arrow/use_parquet is used without it. Builds on the ARROW/PARQUET Process datasource modelling. Offline unit tests cover the encoder (Arrow/Parquet bytes, field names/types) and the generated TI process body. Validated end-to-end against a live PA 12.6.1 server. Precision note: contrary to the tm1py-rust comments, on the live 12.6.1 server the columnar numeric path is NOT bit-exact for values at the edge of double precision (read-back rounds to ~16 significant digits) whereas the ASCII/CSV path round-trips exactly. Docstrings state precision is ~float64, not guaranteed bit-exact. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- TM1py/Services/CellService.py | 261 +++++++++++++++++++++++++++++ Tests/CellService_columnar_test.py | 147 ++++++++++++++++ pyproject.toml | 1 + 3 files changed, 409 insertions(+) create mode 100644 Tests/CellService_columnar_test.py diff --git a/TM1py/Services/CellService.py b/TM1py/Services/CellService.py index 90515ed9..5db01f5c 100644 --- a/TM1py/Services/CellService.py +++ b/TM1py/Services/CellService.py @@ -76,6 +76,14 @@ except ImportError: _has_pandas = False +try: + import pyarrow as pa + import pyarrow.parquet as pq + + _has_pyarrow = True +except ImportError: + _has_pyarrow = False + @decohints def tidy_cellset(func): @@ -878,6 +886,9 @@ def write_dataframe( clear_view: str = None, static_dimension_elements: Dict = None, infer_column_order: bool = False, + use_arrow: bool = False, + use_parquet: bool = False, + parquet_compression: str = None, **kwargs, ) -> str: """ @@ -907,6 +918,12 @@ def write_dataframe( :param static_dimension_elements: Dict of fixed dimension element pairs. Column is created for you. :param infer_column_order: bool indicating whether the column order of the dataframe should automatically be inferred and mapped to the dimension order in the cube. + :param use_arrow: write through an Apache Arrow IPC blob + ARROW TI datasource (TM1 server >= 12.6.1). + Requires the optional 'pyarrow' package. Numeric measures bind directly to a Numeric TI variable. + :param use_parquet: write through an Apache Parquet blob + PARQUET TI datasource (TM1 server >= 12.6.1). + Requires the optional 'pyarrow' package. Mutually exclusive with 'use_arrow'. + :param parquet_compression: parquet codec when use_parquet=True (e.g. 'snappy', 'zstd', 'gzip', 'none'). + Defaults to 'snappy'. Ignored for use_arrow. 
:return: changeset or None """ if not isinstance(data, pd.DataFrame): @@ -944,6 +961,25 @@ def write_dataframe( if not len(data.columns) == len(dimensions) + 1: raise ValueError("Number of columns in 'data' DataFrame must be number of dimensions in cube + 1") + if use_arrow or use_parquet: + if use_arrow and use_parquet: + raise ValueError("'use_arrow' and 'use_parquet' must not be used together") + return self._write_dataframe_through_columnar_blob( + cube_name=cube_name, + data=data, + dimensions=dimensions, + columnar_format="PARQUET" if use_parquet else "ARROW", + parquet_compression=parquet_compression, + increment=increment, + sandbox_name=sandbox_name, + skip_non_updateable=skip_non_updateable, + sum_numeric_duplicates=sum_numeric_duplicates, + remove_blob=remove_blob, + allow_spread=allow_spread, + clear_view=clear_view, + **kwargs, + ) + cells = build_cellset_from_pandas_dataframe(data, sum_numeric_duplicates=sum_numeric_duplicates) return self.write( @@ -1607,6 +1643,231 @@ def _build_blob_to_cube_process( return dataload_process + def _write_dataframe_through_columnar_blob( + self, + cube_name: str, + data: "pd.DataFrame", + dimensions: List[str], + columnar_format: str, + parquet_compression: str = None, + increment: bool = False, + sandbox_name: str = None, + skip_non_updateable: bool = False, + sum_numeric_duplicates: bool = True, + remove_blob: bool = True, + allow_spread: bool = False, + clear_view: str = None, + **kwargs, + ) -> None: + """Write a DataFrame to a cube via a TM1 12.6.1+ columnar (ARROW/PARQUET) datasource blob. + + Mirrors `write_through_blob`, but uploads an Apache Arrow IPC stream or a Parquet file + instead of CSV and loads it through an ARROW/PARQUET TI datasource (no client-side CSV + serialization/escaping; Parquet additionally compresses the upload). A numeric value column + is already typed in the file, so it binds directly to a Numeric TI variable rather than being + re-parsed with StringToNumber. 
Note: numeric precision on read-back is governed by the TM1 + server's columnar ingestion (~float64); it is not guaranteed bit-exact for values at the + edge of double precision. Requires a TM1 v12 server, build 12.6.1 or later, and 'pyarrow'. + """ + if not _has_pyarrow: + raise ImportError( + "'use_arrow'/'use_parquet' require the optional 'pyarrow' package. " + "Install it with: pip install TM1py[arrow]" + ) + if not verify_version(required_version="12.6.1", version=self.version): + raise ValueError( + "Columnar ('use_arrow'/'use_parquet') write requires a TM1 server >= 12.6.1, " + f"but the server version is '{self.version}'" + ) + + if sum_numeric_duplicates: + data = build_dataframe_aggregate_intersections(data, sum_numeric_duplicates=True) + + blob_bytes, value_is_numeric, extension = self._encode_dataframe_columnar( + data, columnar_format, parquet_compression + ) + + process_service = ProcessService(self._rest) + file_service = FileService(self._rest) + + unique_name = self.suggest_unique_object_name() + file_name = f"{unique_name}{extension}" + file_service.create(file_name=file_name, file_content=blob_bytes, **kwargs) + + try: + process = self._build_columnar_blob_to_cube_process( + cube_name=cube_name, + process_name=unique_name, + blob_filename=file_name, + dimensions=dimensions, + datasource_type=columnar_format, + value_is_numeric=value_is_numeric, + increment=increment, + skip_non_updateable=skip_non_updateable, + sandbox_name=sandbox_name, + allow_spread=allow_spread, + clear_view=clear_view, + ) + + success, status, log_file = process_service.execute_process_with_return(process=process, **kwargs) + if not success: + if status in ["HasMinorErrors"]: + raise TM1pyWritePartialFailureException([status], [log_file], 1) + else: + raise TM1pyWriteFailureException([status], [log_file]) + finally: + if remove_blob: + file_service.delete(file_name=file_name) + + @staticmethod + def _encode_dataframe_columnar( + data: "pd.DataFrame", columnar_format: str, 
parquet_compression: str = None + ) -> Tuple[bytes, bool, str]: + """Encode an aligned DataFrame (dimension columns first, value column last) into an Arrow-IPC + stream or a Parquet file for a TM1 12.6.1 columnar datasource. + + Columnar fields are named v1..vN (coordinates, String) and vValue (the measure). A numeric + value column is written as float64 (round-trips exactly); a non-numeric column is written as + string (re-parsed by StringToNumber in the TI, matching the CSV path). + :return: tuple of (blob bytes, value_is_numeric, file extension) + """ + coord_columns = list(data.columns[:-1]) + value_series = data[data.columns[-1]] + value_is_numeric = bool(pd.api.types.is_numeric_dtype(value_series)) + + encoded = pd.DataFrame({f"v{i}": data[col].astype(str) for i, col in enumerate(coord_columns, start=1)}) + if value_is_numeric: + encoded["vValue"] = pd.to_numeric(value_series).astype("float64") + else: + encoded["vValue"] = value_series.where(value_series.notna(), None).astype(object) + + table = pa.Table.from_pandas(encoded, preserve_index=False) + + sink = pa.BufferOutputStream() + if columnar_format == "PARQUET": + pq.write_table(table, sink, compression=parquet_compression or "snappy") + extension = ".parquet" + else: # ARROW IPC stream (read by the TM1 ARROW / Feather datasource) + with pa.ipc.new_stream(sink, table.schema) as writer: + writer.write_table(table) + extension = ".arrow" + + return sink.getvalue().to_pybytes(), value_is_numeric, extension + + def _build_columnar_blob_to_cube_process( + self, + cube_name: str, + process_name: str, + blob_filename: str, + dimensions: List[str], + datasource_type: str, + value_is_numeric: bool, + increment: bool, + skip_non_updateable: bool, + sandbox_name: str, + allow_spread: bool, + clear_view: str, + ) -> Process: + """Build the unbound ARROW/PARQUET-datasource TI process that loads a columnar blob into `cube_name`. 
+ + Mirrors `_build_blob_to_cube_process`, but targets a TM1 12.6.1 columnar datasource (no ascii* + keys) whose fields bind to TI variables by name (v1..vN, vValue). When the value column is + numeric, vValue is a Numeric variable used directly (no StringToNumber); otherwise it is a + String variable re-parsed with StringToNumber, matching the ASCII/CSV path. + """ + dataload_process = Process( + name=process_name, + datasource_type=datasource_type, + datasource_data_source_name_for_server=blob_filename, + datasource_data_source_name_for_client=blob_filename, + ) + + # Prolog: input charset + optional enable-sandbox (+ optional clear_view), as in the ASCII path + dataload_process.prolog_procedure = f""" + SetInputCharacterSet('TM1CS_UTF8'); + {self.generate_enable_sandbox_ti(sandbox_name)} + """ + if clear_view: + dataload_process.prolog_procedure = ( + dataload_process.prolog_procedure + f"\rViewZeroOut('{cube_name}', '{clear_view}');\r" + ) + + # Variables bind to the columnar fields by name: v1..vN (coordinates, String), then vValue. + dimension_variables = [f"v{n}" for n in range(1, len(dimensions) + 1)] + for variable in dimension_variables: + dataload_process.add_variable(name=variable, variable_type="String") + value_variable = "vValue" + dataload_process.add_variable(name=value_variable, variable_type="Numeric" if value_is_numeric else "String") + + cube_measure = dimensions[-1] + variable_cube_measure = dimension_variables[-1] + comma_sep_var_elements = ",".join(dimension_variables) + + if cube_name.lower().startswith("}elementattributes_"): + numeric_function_str = "CellPutN" + else: + numeric_function_str = "CellIncrementN" if increment else "CellPutN" + + # A numeric columnar column binds to a Numeric variable (used directly, no StringToNumber); a + # string column is re-parsed with StringToNumber. A numeric value written to a string measure + # is stringified (its variable is Numeric). 
+ numeric_value_assignment = ( + f"nValue = {value_variable};" if value_is_numeric else f"nValue = StringToNumber({value_variable});" + ) + string_value_expression = f"NumberToString({value_variable})" if value_is_numeric else value_variable + + measure_type_equal = f"ElementType('{cube_measure}', '', {variable_cube_measure}) @= " + n_write_types = ["'N'", "'AN'", "'C'", "''"] + numeric_write_condition = "% \n".join([measure_type_equal + possible_type for possible_type in n_write_types]) + + any_c_element_in_write = "% \n".join( + [f"ElementType('{dim}', '', {ele}) @= 'C'" for dim, ele in zip(dimensions, dimension_variables)] + ) + + numeric_write_statement_with_spread = f""" + {numeric_value_assignment} + IF({any_c_element_in_write}); + CellPutProportionalSpread(nValue,'{cube_name}',{comma_sep_var_elements}); + ELSE; + {numeric_function_str}(nValue,'{cube_name}',{comma_sep_var_elements}); + ENDIF; + """ + + numeric_write_statement_without_spread = f""" + {numeric_value_assignment} + {numeric_function_str}(nValue,'{cube_name}',{comma_sep_var_elements}); + """ + + string_write_condition = f""" + ElementType('{cube_measure}', '', {variable_cube_measure}) @= 'S' % + ElementType('{cube_measure}', '', {variable_cube_measure}) @= 'AS' % + ElementType('{cube_measure}', '', {variable_cube_measure}) @= 'AA' + """ + + string_write_statement = f""" + sValue = {string_value_expression}; + CellPutS(sValue,'{cube_name}',{comma_sep_var_elements}); + """ + + input_statement = f""" + If({numeric_write_condition}); + {numeric_write_statement_with_spread if allow_spread else numeric_write_statement_without_spread} + ElseIf({string_write_condition}); + {string_write_statement} + EndIf;""" + + if skip_non_updateable: + cell_is_updateable_pre = f"If( CellIsUpdateable('{cube_name}',{comma_sep_var_elements}) = 1 );" + cell_is_updateable_post = "\rElse;\r" " ItemSkip;\r" "EndIf;\r" + else: + cell_is_updateable_pre = "" + cell_is_updateable_post = "" + + data_statement = 
cell_is_updateable_pre + input_statement + cell_is_updateable_post + dataload_process.data_procedure = data_statement + + return dataload_process + def _build_cube_to_blob_process( self, cube: str, diff --git a/Tests/CellService_columnar_test.py b/Tests/CellService_columnar_test.py new file mode 100644 index 00000000..c9a9318c --- /dev/null +++ b/Tests/CellService_columnar_test.py @@ -0,0 +1,147 @@ +"""Offline unit tests for the TM1 12.6.1 columnar (ARROW/PARQUET) write path on CellService. + +Covers the two pure pieces that need no TM1 server: + - CellService._encode_dataframe_columnar (DataFrame -> Arrow-IPC / Parquet bytes) + - CellService._build_columnar_blob_to_cube_process (the ARROW/PARQUET TI process body) +""" + +import unittest +from types import SimpleNamespace + +import pandas as pd +import pyarrow as pa +import pyarrow.ipc +import pyarrow.parquet as pq + +from TM1py.Services.CellService import CellService + + +class TestEncodeDataframeColumnar(unittest.TestCase): + + @staticmethod + def _numeric_df(): + return pd.DataFrame( + [["2024", "Revenue", 123456789.12345678], ["2025", "Revenue", 0.1]], + columns=["Year", "Measure", "Value"], + ) + + def test_parquet_numeric_fields_types_and_exact_roundtrip(self): + blob, value_is_numeric, extension = CellService._encode_dataframe_columnar( + self._numeric_df(), "PARQUET", "zstd" + ) + self.assertTrue(value_is_numeric) + self.assertEqual(extension, ".parquet") + self.assertEqual(blob[:4], b"PAR1") # parquet magic + table = pq.read_table(pa.BufferReader(blob)) + self.assertEqual(table.schema.names, ["v1", "v2", "vValue"]) + self.assertEqual(table.schema.field("vValue").type, pa.float64()) + # exact f64 round-trip (the whole point of the numeric columnar path) + self.assertEqual(table.column("vValue").to_pylist()[0], 123456789.12345678) + + def test_arrow_ipc_numeric_is_readable_stream(self): + blob, value_is_numeric, extension = CellService._encode_dataframe_columnar(self._numeric_df(), "ARROW", None) + 
self.assertTrue(value_is_numeric) + self.assertEqual(extension, ".arrow") + table = pa.ipc.open_stream(pa.BufferReader(blob)).read_all() + self.assertEqual(table.schema.names, ["v1", "v2", "vValue"]) + self.assertEqual(table.schema.field("vValue").type, pa.float64()) + self.assertEqual(table.num_rows, 2) + + def test_string_measure_encodes_as_string_column(self): + df = pd.DataFrame([["2024", "Comment", "hello"]], columns=["Year", "Measure", "Value"]) + blob, value_is_numeric, extension = CellService._encode_dataframe_columnar(df, "PARQUET", None) + self.assertFalse(value_is_numeric) + table = pq.read_table(pa.BufferReader(blob)) + self.assertEqual(table.schema.field("vValue").type, pa.string()) + self.assertEqual(table.column("vValue").to_pylist(), ["hello"]) + + +class TestBuildColumnarBlobToCubeProcess(unittest.TestCase): + + @staticmethod + def _cell_service(): + # Build a CellService without connecting; the process builder only touches + # self._rest.sandboxing_disabled (via generate_enable_sandbox_ti). 
+ cs = CellService.__new__(CellService) + cs._rest = SimpleNamespace(sandboxing_disabled=True) + return cs + + def _build(self, datasource_type, value_is_numeric, **overrides): + kwargs = dict( + cube_name="Sales", + process_name="p_test", + blob_filename="data" + (".parquet" if datasource_type == "PARQUET" else ".arrow"), + dimensions=["Year", "Measure"], + datasource_type=datasource_type, + value_is_numeric=value_is_numeric, + increment=False, + skip_non_updateable=False, + sandbox_name=None, + allow_spread=False, + clear_view=None, + ) + kwargs.update(overrides) + return self._cell_service()._build_columnar_blob_to_cube_process(**kwargs) + + def test_parquet_numeric_datasource_and_exact_numeric_binding(self): + process = self._build("PARQUET", value_is_numeric=True) + datasource = process.body_as_dict["DataSource"] + self.assertEqual(datasource["Type"], "PARQUET") + self.assertEqual(datasource["dataSourceNameForServer"], "data.parquet") + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + # numeric column -> Numeric vValue used directly (exact f64), never StringToNumber + self.assertIn("nValue = vValue;", process.data_procedure) + self.assertNotIn("StringToNumber", process.data_procedure) + self.assertIn("CellPutN(nValue,'Sales',v1,v2)", process.data_procedure) + value_variable = next(v for v in process.variables if v["Name"] == "vValue") + self.assertEqual(value_variable["Type"], "Numeric") + # coordinate variables stay String and are named to match the columnar fields + self.assertEqual([v["Name"] for v in process.variables], ["v1", "v2", "vValue"]) + self.assertEqual(process.variables[0]["Type"], "String") + + def test_arrow_string_measure_uses_stringtonumber_and_string_var(self): + process = self._build("ARROW", value_is_numeric=False) + datasource = process.body_as_dict["DataSource"] + self.assertEqual(datasource["Type"], "ARROW") + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + self.assertIn("nValue = 
StringToNumber(vValue);", process.data_procedure) + value_variable = next(v for v in process.variables if v["Name"] == "vValue") + self.assertEqual(value_variable["Type"], "String") + + def test_numeric_value_into_string_measure_is_stringified(self): + process = self._build("PARQUET", value_is_numeric=True) + # a numeric column written into a string measure must be stringified (var is Numeric) + self.assertIn("CellPutS(sValue,'Sales',v1,v2)", process.data_procedure) + self.assertIn("NumberToString(vValue)", process.data_procedure) + + def test_increment_uses_cellincrementn(self): + process = self._build("PARQUET", value_is_numeric=True, increment=True) + self.assertIn("CellIncrementN(nValue,'Sales',v1,v2)", process.data_procedure) + + def test_allow_spread_emits_proportional_spread(self): + process = self._build("PARQUET", value_is_numeric=True, allow_spread=True) + self.assertIn("CellPutProportionalSpread(nValue,'Sales',v1,v2)", process.data_procedure) + + def test_skip_non_updateable_wraps_in_cell_is_updateable(self): + process = self._build("PARQUET", value_is_numeric=True, skip_non_updateable=True) + self.assertIn("CellIsUpdateable('Sales',v1,v2)", process.data_procedure) + self.assertIn("ItemSkip", process.data_procedure) + + def test_clear_view_adds_viewzeroout_to_prolog(self): + process = self._build("PARQUET", value_is_numeric=True, clear_view="ToClear") + self.assertIn("ViewZeroOut('Sales', 'ToClear')", process.prolog_procedure) + + def test_attribute_cube_forces_cellputn_even_with_increment(self): + process = self._build( + "PARQUET", + value_is_numeric=True, + cube_name="}ElementAttributes_Region", + dimensions=["Region", "}ElementAttributes_Region"], + increment=True, + ) + self.assertIn("CellPutN(nValue,", process.data_procedure) + self.assertNotIn("CellIncrementN", process.data_procedure) + + +if __name__ == "__main__": + unittest.main() diff --git a/pyproject.toml b/pyproject.toml index 4aaf631f..3868b9b3 100644 --- a/pyproject.toml +++ 
b/pyproject.toml @@ -37,6 +37,7 @@ dependencies = [ [project.optional-dependencies] pandas = ["pandas"] +arrow = ["pandas", "pyarrow"] dev = [ "pytest", "pytest-xdist",