From 50820ec1e523fa253ebc798aeeb5aa748463e483 Mon Sep 17 00:00:00 2001 From: Satoshi Kido Date: Fri, 26 Jun 2026 11:29:46 -0700 Subject: [PATCH] feat: model TM1 12.6.1 Arrow/Parquet/Flight TI datasources Add first-class modelling for the three new TurboIntegrator input datasource types introduced in TM1 v12 build 12.6.1 (Apache Arrow IPC/Feather, Parquet, and Arrow Flight), verified end-to-end against a live PA 12.6.1 server. Process: - four new datasource_flight_* scalars (constructor, from_dict, properties) - ARROW/PARQUET branch reuses dataSourceNameForServer/Client and optional jsonRootPointer / jsonVariableMapping (no ascii* fields) - FLIGHT branch emits the server's verified wire fields: flightLocation / flightDescriptorType / flightDescriptor / flightAuth - columnar/Flight branches match Type case-insensitively, because the server canonicalizes Type to title-case on read-back ("Arrow"/"Parquet"/ "Flight"); this keeps from_dict(server_response) round-trips from collapsing to an empty DataSource block ProcessService.get/get_all select the four flight fields. datasource_type stays a free string (no Type whitelist) and no version gating is added, since ProcessDataSource is an OData open type validated by the engine at execute time. Tests: offline unit tests for the body shapes, from_dict round-trips, and the title-case Type round-trip (17 total), plus an optional live round-trip test gated to TM1 >= 12.6.1. Offline suite and the live 12.6.1 round-trip both pass. Co-Authored-By: Claude Opus 4.8 (1M context) --- TM1py/Objects/Process.py | 77 ++++++++++++++ TM1py/Services/ProcessService.py | 12 ++- Tests/ProcessService_test.py | 50 +++++++++ Tests/Process_test.py | 168 ++++++++++++++++++++++++++++++- 4 files changed, 304 insertions(+), 3 deletions(-) diff --git a/TM1py/Objects/Process.py b/TM1py/Objects/Process.py index 348bc04a..a59f0e38 100644 --- a/TM1py/Objects/Process.py +++ b/TM1py/Objects/Process.py @@ -65,6 +65,11 @@ def __init__( datasource_subset: str = "", datasource_json_root_pointer: str = "", datasource_json_variable_mapping: str = "", + # --- NEW: Apache Arrow Flight (TM1 12.6.1+) --- + datasource_flight_location: str = "", + datasource_flight_descriptor_type: str = "", + datasource_flight_descriptor: str = "", + datasource_flight_auth: str = "", ): """Default construcor @@ -95,6 +100,10 @@ def __init__( :param datasource_subset: :param datasource_json_root_pointer: :param datasource_json_variable_mapping: + :param datasource_flight_location: Arrow Flight gRPC URI, e.g. "grpc://host:8443" (TM1 12.6.1+) + :param datasource_flight_descriptor_type: Arrow Flight descriptor type: "PATH" or "COMMAND" (TM1 12.6.1+) + :param datasource_flight_descriptor: Arrow Flight descriptor, e.g. "dataset/path" or "SELECT * FROM t" (TM1 12.6.1+) + :param datasource_flight_auth: Arrow Flight auth header, e.g. "Bearer " or "Basic user:pass" (TM1 12.6.1+) """ self._name = name self._has_security_access = has_security_access @@ -127,6 +136,10 @@ def __init__( self._datasource_subset = datasource_subset self._datasource_json_root_pointer = datasource_json_root_pointer self._datasource_json_variable_mapping = datasource_json_variable_mapping + self._datasource_flight_location = datasource_flight_location + self._datasource_flight_descriptor_type = datasource_flight_descriptor_type + self._datasource_flight_descriptor = datasource_flight_descriptor + self._datasource_flight_auth = datasource_flight_auth @classmethod def from_json(cls, process_as_json: str) -> "Process": @@ -171,6 +184,10 @@ def from_dict(cls, process_as_dict: Dict) -> "Process": datasource_subset=process_as_dict["DataSource"].get("subset", ""), datasource_json_root_pointer=process_as_dict["DataSource"].get("jsonRootPointer", ""), datasource_json_variable_mapping=process_as_dict["DataSource"].get("jsonVariableMapping", ""), + datasource_flight_location=process_as_dict["DataSource"].get("flightLocation", ""), + datasource_flight_descriptor_type=process_as_dict["DataSource"].get("flightDescriptorType", ""), + datasource_flight_descriptor=process_as_dict["DataSource"].get("flightDescriptor", ""), + datasource_flight_auth=process_as_dict["DataSource"].get("flightAuth", ""), ) @property @@ -377,6 +394,38 @@ def datasource_json_variable_mapping(self) -> str: def datasource_json_variable_mapping(self, value: str): self._datasource_json_variable_mapping = value + @property + def datasource_flight_location(self) -> str: + return self._datasource_flight_location + + @datasource_flight_location.setter + def datasource_flight_location(self, value: str): + self._datasource_flight_location = value + + @property + def datasource_flight_descriptor_type(self) -> str: + return self._datasource_flight_descriptor_type + + @datasource_flight_descriptor_type.setter + def datasource_flight_descriptor_type(self, value: str): + self._datasource_flight_descriptor_type = value + + @property + def datasource_flight_descriptor(self) -> str: + return self._datasource_flight_descriptor + + @datasource_flight_descriptor.setter + def datasource_flight_descriptor(self, value: str): + self._datasource_flight_descriptor = value + + @property + def datasource_flight_auth(self) -> str: + return self._datasource_flight_auth + + @datasource_flight_auth.setter + def datasource_flight_auth(self, value: str): + self._datasource_flight_auth = value + def add_variable(self, name: str, variable_type: str): """add variable to the process @@ -509,4 +558,32 @@ def _construct_body_as_dict(self) -> Dict: "jsonRootPointer": self._datasource_json_root_pointer, "jsonVariableMapping": self._datasource_json_variable_mapping, } + elif self._datasource_type.upper() in ("ARROW", "PARQUET"): + # Apache Arrow IPC/Feather or Parquet file datasource (TM1 12.6.1+). + # File datasources: just the (file name OR http/https URL) — no ascii* + # delimiter/quote/header keys. Optional JSON-treatment for nested columns + # reuses the JSON datasource's jsonRootPointer / jsonVariableMapping. + # NB: the server canonicalizes Type to title-case ("Arrow"/"Parquet") on + # read-back, so match case-insensitively to keep from_dict round-trips intact. + body_as_dict["DataSource"] = { + "Type": self._datasource_type, + "dataSourceNameForClient": self._datasource_data_source_name_for_client, + "dataSourceNameForServer": self._datasource_data_source_name_for_server, + } + if self._datasource_json_root_pointer: + body_as_dict["DataSource"]["jsonRootPointer"] = self._datasource_json_root_pointer + if self._datasource_json_variable_mapping: + body_as_dict["DataSource"]["jsonVariableMapping"] = self._datasource_json_variable_mapping + elif self._datasource_type.upper() == "FLIGHT": + # Apache Arrow Flight datasource (TM1 12.6.1+): TM1 is a Flight CLIENT that + # streams record batches from a remote Flight server into TI variables. + # Wire field names verified against a live 12.6.1 server: flightLocation / + # flightDescriptorType / flightDescriptor / flightAuth (NOT dataSourceFlight*). + body_as_dict["DataSource"] = { + "Type": self._datasource_type, + "flightLocation": self._datasource_flight_location, + "flightDescriptorType": self._datasource_flight_descriptor_type, + "flightDescriptor": self._datasource_flight_descriptor, + "flightAuth": self._datasource_flight_auth, + } return body_as_dict diff --git a/TM1py/Services/ProcessService.py b/TM1py/Services/ProcessService.py index 01d22ab1..aaf3e1d5 100644 --- a/TM1py/Services/ProcessService.py +++ b/TM1py/Services/ProcessService.py @@ -46,7 +46,11 @@ def get(self, name_process: str, **kwargs) -> Process: "DataSource/usesUnicode," "DataSource/subset," "DataSource/jsonRootPointer," - "DataSource/jsonVariableMapping", + "DataSource/jsonVariableMapping," + "DataSource/flightLocation," + "DataSource/flightDescriptorType," + "DataSource/flightDescriptor," + "DataSource/flightAuth", name_process, ) @@ -78,7 +82,11 @@ def get_all(self, skip_control_processes: bool = False, **kwargs) -> List[Proces "DataSource/usesUnicode," "DataSource/subset," "DataSource/jsonRootPointer," - "DataSource/jsonVariableMapping{}".format(model_process_filter if skip_control_processes else "") + "DataSource/jsonVariableMapping," + "DataSource/flightLocation," + "DataSource/flightDescriptorType," + "DataSource/flightDescriptor," + "DataSource/flightAuth{}".format(model_process_filter if skip_control_processes else "") ) response = self._rest.GET(url, **kwargs) diff --git a/Tests/ProcessService_test.py b/Tests/ProcessService_test.py index e790cc50..a2814ce1 100644 --- a/Tests/ProcessService_test.py +++ b/Tests/ProcessService_test.py @@ -166,6 +166,56 @@ def test_update_or_create(self): self.p_bedrock_server_wait.prolog_procedure = temp_prolog self.tm1.processes.delete(self.p_bedrock_server_wait.name) + @skip_if_version_lower_than(version="12.6.1") + def test_datasource_arrow_parquet_flight_roundtrip(self): + """Live round-trip of the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasources. + + Skipped automatically on servers older than 12.6.1. + """ + arrow = Process( + name="TM1py_Tests_arrow_" + str(uuid.uuid4()), + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + ) + parquet = Process( + name="TM1py_Tests_parquet_" + str(uuid.uuid4()), + datasource_type="PARQUET", + datasource_data_source_name_for_server="data.parquet", + datasource_data_source_name_for_client="data.parquet", + ) + flight = Process( + name="TM1py_Tests_flight_" + str(uuid.uuid4()), + datasource_type="FLIGHT", + datasource_flight_location="grpc+tls://localhost:443", + datasource_flight_descriptor_type="COMMAND", + datasource_flight_descriptor="SELECT 1", + datasource_flight_auth="Bearer token", + ) + try: + for process in (arrow, parquet, flight): + self.tm1.processes.update_or_create(process) + + # the server canonicalizes Type to title-case ("Arrow"/"Parquet"/"Flight"), so compare case-insensitively + arrow_back = self.tm1.processes.get(arrow.name) + self.assertEqual(arrow_back.datasource_type.upper(), "ARROW") + self.assertEqual(arrow_back.datasource_data_source_name_for_server, "data.arrow") + + parquet_back = self.tm1.processes.get(parquet.name) + self.assertEqual(parquet_back.datasource_type.upper(), "PARQUET") + self.assertEqual(parquet_back.datasource_data_source_name_for_server, "data.parquet") + + flight_back = self.tm1.processes.get(flight.name) + self.assertEqual(flight_back.datasource_type.upper(), "FLIGHT") + self.assertEqual(flight_back.datasource_flight_location, "grpc+tls://localhost:443") + self.assertEqual(flight_back.datasource_flight_descriptor_type, "COMMAND") + self.assertEqual(flight_back.datasource_flight_descriptor, "SELECT 1") + self.assertEqual(flight_back.datasource_flight_auth, "Bearer token") + finally: + for process in (arrow, parquet, flight): + if self.tm1.processes.exists(process.name): + self.tm1.processes.delete(process.name) + def test_execute_process(self): if not self.tm1.processes.exists(self.p_bedrock_server_wait.name): self.tm1.processes.create(self.p_bedrock_server_wait) diff --git a/Tests/Process_test.py b/Tests/Process_test.py index fe4ce827..7765f6a7 100644 --- a/Tests/Process_test.py +++ b/Tests/Process_test.py @@ -1,6 +1,6 @@ import unittest -from TM1py.Objects import BreakPointType, HitMode +from TM1py.Objects import BreakPointType, HitMode, Process class TestBreakPointType(unittest.TestCase): @@ -31,3 +31,169 @@ def test_BreakPointType_init_case(self): def test_BreakPointType_str(self): hit_mode = HitMode.BREAK_ALWAYS self.assertEqual("BreakAlways", str(hit_mode)) + + +class TestProcessDataSource(unittest.TestCase): + """Offline unit tests for the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasource body shapes.""" + + @staticmethod + def _process_dict(datasource: dict) -> dict: + """Minimal Process payload wrapping a given DataSource block (for Process.from_dict).""" + return { + "Name": "p_test", + "HasSecurityAccess": False, + "Parameters": [], + "Variables": [], + "PrologProcedure": "", + "MetadataProcedure": "", + "DataProcedure": "", + "EpilogProcedure": "", + "DataSource": datasource, + } + + def test_arrow_body(self): + process = Process( + name="p_arrow", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual( + datasource, + { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + }, + ) + # columnar files carry none of the ascii* delimiter/quote/header keys + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + + def test_parquet_body(self): + process = Process( + name="p_parquet", + datasource_type="PARQUET", + datasource_data_source_name_for_server="data.parquet", + datasource_data_source_name_for_client="data.parquet", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual( + datasource, + { + "Type": "PARQUET", + "dataSourceNameForClient": "data.parquet", + "dataSourceNameForServer": "data.parquet", + }, + ) + self.assertFalse(any(key.startswith("ascii") for key in datasource)) + + def test_arrow_body_with_json_treatment(self): + process = Process( + name="p_arrow_json", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + datasource_data_source_name_for_client="data.arrow", + datasource_json_root_pointer="data", + datasource_json_variable_mapping="{}", + ) + datasource = process.body_as_dict["DataSource"] + self.assertEqual(datasource["jsonRootPointer"], "data") + self.assertEqual(datasource["jsonVariableMapping"], "{}") + + def test_arrow_body_omits_empty_json_treatment(self): + process = Process( + name="p_arrow_plain", + datasource_type="ARROW", + datasource_data_source_name_for_server="data.arrow", + ) + datasource = process.body_as_dict["DataSource"] + self.assertNotIn("jsonRootPointer", datasource) + self.assertNotIn("jsonVariableMapping", datasource) + + def test_flight_body(self): + process = Process( + name="p_flight", + datasource_type="FLIGHT", + datasource_flight_location="grpc+tls://host:443", + datasource_flight_descriptor_type="COMMAND", + datasource_flight_descriptor="SELECT * FROM t", + datasource_flight_auth="Bearer token", + ) + self.assertEqual( + process.body_as_dict["DataSource"], + { + "Type": "FLIGHT", + "flightLocation": "grpc+tls://host:443", + "flightDescriptorType": "COMMAND", + "flightDescriptor": "SELECT * FROM t", + "flightAuth": "Bearer token", + }, + ) + + def test_arrow_roundtrip(self): + datasource = { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_parquet_roundtrip(self): + datasource = { + "Type": "PARQUET", + "dataSourceNameForClient": "data.parquet", + "dataSourceNameForServer": "data.parquet", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_arrow_roundtrip_with_json_treatment(self): + datasource = { + "Type": "ARROW", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + "jsonRootPointer": "data", + "jsonVariableMapping": "{}", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_flight_roundtrip(self): + datasource = { + "Type": "FLIGHT", + "flightLocation": "grpc://host:8443", + "flightDescriptorType": "PATH", + "flightDescriptor": "dataset/path", + "flightAuth": "Basic user:pass", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_arrow_from_dict_title_case_type(self): + # The server canonicalizes Type to title-case ("Arrow") on read-back; from_dict + # of such a response must still produce a proper Arrow body, not an empty {}. + datasource = { + "Type": "Arrow", + "dataSourceNameForClient": "data.arrow", + "dataSourceNameForServer": "data.arrow", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + def test_flight_from_dict_title_case_type(self): + # Title-case "Flight" (as returned by the server) must round-trip too. + datasource = { + "Type": "Flight", + "flightLocation": "grpc://host:8443", + "flightDescriptorType": "PATH", + "flightDescriptor": "dataset/path", + "flightAuth": "Basic user:pass", + } + process = Process.from_dict(self._process_dict(datasource)) + self.assertEqual(process.body_as_dict["DataSource"], datasource) + + +if __name__ == "__main__": + unittest.main()