Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions TM1py/Objects/Process.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,11 @@ def __init__(
datasource_subset: str = "",
datasource_json_root_pointer: str = "",
datasource_json_variable_mapping: str = "",
# --- NEW: Apache Arrow Flight (TM1 12.6.1+) ---
datasource_flight_location: str = "",
datasource_flight_descriptor_type: str = "",
datasource_flight_descriptor: str = "",
datasource_flight_auth: str = "",
):
"""Default construcor

Expand Down Expand Up @@ -95,6 +100,10 @@ def __init__(
:param datasource_subset:
:param datasource_json_root_pointer:
:param datasource_json_variable_mapping:
:param datasource_flight_location: Arrow Flight gRPC URI, e.g. "grpc://host:8443" (TM1 12.6.1+)
:param datasource_flight_descriptor_type: Arrow Flight descriptor type: "PATH" or "COMMAND" (TM1 12.6.1+)
:param datasource_flight_descriptor: Arrow Flight descriptor, e.g. "dataset/path" or "SELECT * FROM t" (TM1 12.6.1+)
:param datasource_flight_auth: Arrow Flight auth header, e.g. "Bearer <token>" or "Basic user:pass" (TM1 12.6.1+)
"""
self._name = name
self._has_security_access = has_security_access
Expand Down Expand Up @@ -127,6 +136,10 @@ def __init__(
self._datasource_subset = datasource_subset
self._datasource_json_root_pointer = datasource_json_root_pointer
self._datasource_json_variable_mapping = datasource_json_variable_mapping
self._datasource_flight_location = datasource_flight_location
self._datasource_flight_descriptor_type = datasource_flight_descriptor_type
self._datasource_flight_descriptor = datasource_flight_descriptor
self._datasource_flight_auth = datasource_flight_auth

@classmethod
def from_json(cls, process_as_json: str) -> "Process":
Expand Down Expand Up @@ -171,6 +184,10 @@ def from_dict(cls, process_as_dict: Dict) -> "Process":
datasource_subset=process_as_dict["DataSource"].get("subset", ""),
datasource_json_root_pointer=process_as_dict["DataSource"].get("jsonRootPointer", ""),
datasource_json_variable_mapping=process_as_dict["DataSource"].get("jsonVariableMapping", ""),
datasource_flight_location=process_as_dict["DataSource"].get("flightLocation", ""),
datasource_flight_descriptor_type=process_as_dict["DataSource"].get("flightDescriptorType", ""),
datasource_flight_descriptor=process_as_dict["DataSource"].get("flightDescriptor", ""),
datasource_flight_auth=process_as_dict["DataSource"].get("flightAuth", ""),
)

@property
Expand Down Expand Up @@ -377,6 +394,38 @@ def datasource_json_variable_mapping(self) -> str:
def datasource_json_variable_mapping(self, value: str):
self._datasource_json_variable_mapping = value

@property
def datasource_flight_location(self) -> str:
return self._datasource_flight_location

@datasource_flight_location.setter
def datasource_flight_location(self, value: str):
self._datasource_flight_location = value

@property
def datasource_flight_descriptor_type(self) -> str:
return self._datasource_flight_descriptor_type

@datasource_flight_descriptor_type.setter
def datasource_flight_descriptor_type(self, value: str):
self._datasource_flight_descriptor_type = value

@property
def datasource_flight_descriptor(self) -> str:
return self._datasource_flight_descriptor

@datasource_flight_descriptor.setter
def datasource_flight_descriptor(self, value: str):
self._datasource_flight_descriptor = value

@property
def datasource_flight_auth(self) -> str:
return self._datasource_flight_auth

@datasource_flight_auth.setter
def datasource_flight_auth(self, value: str):
self._datasource_flight_auth = value

def add_variable(self, name: str, variable_type: str):
"""add variable to the process

Expand Down Expand Up @@ -509,4 +558,32 @@ def _construct_body_as_dict(self) -> Dict:
"jsonRootPointer": self._datasource_json_root_pointer,
"jsonVariableMapping": self._datasource_json_variable_mapping,
}
elif self._datasource_type.upper() in ("ARROW", "PARQUET"):
# Apache Arrow IPC/Feather or Parquet file datasource (TM1 12.6.1+).
# File datasources: just the (file name OR http/https URL) — no ascii*
# delimiter/quote/header keys. Optional JSON-treatment for nested columns
# reuses the JSON datasource's jsonRootPointer / jsonVariableMapping.
# NB: the server canonicalizes Type to title-case ("Arrow"/"Parquet") on
# read-back, so match case-insensitively to keep from_dict round-trips intact.
body_as_dict["DataSource"] = {
"Type": self._datasource_type,
"dataSourceNameForClient": self._datasource_data_source_name_for_client,
"dataSourceNameForServer": self._datasource_data_source_name_for_server,
}
if self._datasource_json_root_pointer:
body_as_dict["DataSource"]["jsonRootPointer"] = self._datasource_json_root_pointer
if self._datasource_json_variable_mapping:
body_as_dict["DataSource"]["jsonVariableMapping"] = self._datasource_json_variable_mapping
elif self._datasource_type.upper() == "FLIGHT":
# Apache Arrow Flight datasource (TM1 12.6.1+): TM1 is a Flight CLIENT that
# streams record batches from a remote Flight server into TI variables.
# Wire field names verified against a live 12.6.1 server: flightLocation /
# flightDescriptorType / flightDescriptor / flightAuth (NOT dataSourceFlight*).
body_as_dict["DataSource"] = {
"Type": self._datasource_type,
"flightLocation": self._datasource_flight_location,
"flightDescriptorType": self._datasource_flight_descriptor_type,
"flightDescriptor": self._datasource_flight_descriptor,
"flightAuth": self._datasource_flight_auth,
}
return body_as_dict
12 changes: 10 additions & 2 deletions TM1py/Services/ProcessService.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,11 @@ def get(self, name_process: str, **kwargs) -> Process:
"DataSource/usesUnicode,"
"DataSource/subset,"
"DataSource/jsonRootPointer,"
"DataSource/jsonVariableMapping",
"DataSource/jsonVariableMapping,"
"DataSource/flightLocation,"
"DataSource/flightDescriptorType,"
"DataSource/flightDescriptor,"
"DataSource/flightAuth",
name_process,
)

Expand Down Expand Up @@ -78,7 +82,11 @@ def get_all(self, skip_control_processes: bool = False, **kwargs) -> List[Proces
"DataSource/usesUnicode,"
"DataSource/subset,"
"DataSource/jsonRootPointer,"
"DataSource/jsonVariableMapping{}".format(model_process_filter if skip_control_processes else "")
"DataSource/jsonVariableMapping,"
"DataSource/flightLocation,"
"DataSource/flightDescriptorType,"
"DataSource/flightDescriptor,"
"DataSource/flightAuth{}".format(model_process_filter if skip_control_processes else "")
)

response = self._rest.GET(url, **kwargs)
Expand Down
50 changes: 50 additions & 0 deletions Tests/ProcessService_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,56 @@ def test_update_or_create(self):
self.p_bedrock_server_wait.prolog_procedure = temp_prolog
self.tm1.processes.delete(self.p_bedrock_server_wait.name)

@skip_if_version_lower_than(version="12.6.1")
def test_datasource_arrow_parquet_flight_roundtrip(self):
"""Live round-trip of the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasources.

Skipped automatically on servers older than 12.6.1.
"""
arrow = Process(
name="TM1py_Tests_arrow_" + str(uuid.uuid4()),
datasource_type="ARROW",
datasource_data_source_name_for_server="data.arrow",
datasource_data_source_name_for_client="data.arrow",
)
parquet = Process(
name="TM1py_Tests_parquet_" + str(uuid.uuid4()),
datasource_type="PARQUET",
datasource_data_source_name_for_server="data.parquet",
datasource_data_source_name_for_client="data.parquet",
)
flight = Process(
name="TM1py_Tests_flight_" + str(uuid.uuid4()),
datasource_type="FLIGHT",
datasource_flight_location="grpc+tls://localhost:443",
datasource_flight_descriptor_type="COMMAND",
datasource_flight_descriptor="SELECT 1",
datasource_flight_auth="Bearer token",
)
try:
for process in (arrow, parquet, flight):
self.tm1.processes.update_or_create(process)

# the server canonicalizes Type to title-case ("Arrow"/"Parquet"/"Flight"), so compare case-insensitively
arrow_back = self.tm1.processes.get(arrow.name)
self.assertEqual(arrow_back.datasource_type.upper(), "ARROW")
self.assertEqual(arrow_back.datasource_data_source_name_for_server, "data.arrow")

parquet_back = self.tm1.processes.get(parquet.name)
self.assertEqual(parquet_back.datasource_type.upper(), "PARQUET")
self.assertEqual(parquet_back.datasource_data_source_name_for_server, "data.parquet")

flight_back = self.tm1.processes.get(flight.name)
self.assertEqual(flight_back.datasource_type.upper(), "FLIGHT")
self.assertEqual(flight_back.datasource_flight_location, "grpc+tls://localhost:443")
self.assertEqual(flight_back.datasource_flight_descriptor_type, "COMMAND")
self.assertEqual(flight_back.datasource_flight_descriptor, "SELECT 1")
self.assertEqual(flight_back.datasource_flight_auth, "Bearer token")
finally:
for process in (arrow, parquet, flight):
if self.tm1.processes.exists(process.name):
self.tm1.processes.delete(process.name)

def test_execute_process(self):
if not self.tm1.processes.exists(self.p_bedrock_server_wait.name):
self.tm1.processes.create(self.p_bedrock_server_wait)
Expand Down
168 changes: 167 additions & 1 deletion Tests/Process_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import unittest

from TM1py.Objects import BreakPointType, HitMode
from TM1py.Objects import BreakPointType, HitMode, Process


class TestBreakPointType(unittest.TestCase):
Expand Down Expand Up @@ -31,3 +31,169 @@ def test_BreakPointType_init_case(self):
def test_BreakPointType_str(self):
hit_mode = HitMode.BREAK_ALWAYS
self.assertEqual("BreakAlways", str(hit_mode))


class TestProcessDataSource(unittest.TestCase):
"""Offline unit tests for the TM1 12.6.1 columnar (ARROW/PARQUET) and Arrow Flight datasource body shapes."""

@staticmethod
def _process_dict(datasource: dict) -> dict:
"""Minimal Process payload wrapping a given DataSource block (for Process.from_dict)."""
return {
"Name": "p_test",
"HasSecurityAccess": False,
"Parameters": [],
"Variables": [],
"PrologProcedure": "",
"MetadataProcedure": "",
"DataProcedure": "",
"EpilogProcedure": "",
"DataSource": datasource,
}

def test_arrow_body(self):
process = Process(
name="p_arrow",
datasource_type="ARROW",
datasource_data_source_name_for_server="data.arrow",
datasource_data_source_name_for_client="data.arrow",
)
datasource = process.body_as_dict["DataSource"]
self.assertEqual(
datasource,
{
"Type": "ARROW",
"dataSourceNameForClient": "data.arrow",
"dataSourceNameForServer": "data.arrow",
},
)
# columnar files carry none of the ascii* delimiter/quote/header keys
self.assertFalse(any(key.startswith("ascii") for key in datasource))

def test_parquet_body(self):
process = Process(
name="p_parquet",
datasource_type="PARQUET",
datasource_data_source_name_for_server="data.parquet",
datasource_data_source_name_for_client="data.parquet",
)
datasource = process.body_as_dict["DataSource"]
self.assertEqual(
datasource,
{
"Type": "PARQUET",
"dataSourceNameForClient": "data.parquet",
"dataSourceNameForServer": "data.parquet",
},
)
self.assertFalse(any(key.startswith("ascii") for key in datasource))

def test_arrow_body_with_json_treatment(self):
process = Process(
name="p_arrow_json",
datasource_type="ARROW",
datasource_data_source_name_for_server="data.arrow",
datasource_data_source_name_for_client="data.arrow",
datasource_json_root_pointer="data",
datasource_json_variable_mapping="{}",
)
datasource = process.body_as_dict["DataSource"]
self.assertEqual(datasource["jsonRootPointer"], "data")
self.assertEqual(datasource["jsonVariableMapping"], "{}")

def test_arrow_body_omits_empty_json_treatment(self):
process = Process(
name="p_arrow_plain",
datasource_type="ARROW",
datasource_data_source_name_for_server="data.arrow",
)
datasource = process.body_as_dict["DataSource"]
self.assertNotIn("jsonRootPointer", datasource)
self.assertNotIn("jsonVariableMapping", datasource)

def test_flight_body(self):
process = Process(
name="p_flight",
datasource_type="FLIGHT",
datasource_flight_location="grpc+tls://host:443",
datasource_flight_descriptor_type="COMMAND",
datasource_flight_descriptor="SELECT * FROM t",
datasource_flight_auth="Bearer token",
)
self.assertEqual(
process.body_as_dict["DataSource"],
{
"Type": "FLIGHT",
"flightLocation": "grpc+tls://host:443",
"flightDescriptorType": "COMMAND",
"flightDescriptor": "SELECT * FROM t",
"flightAuth": "Bearer token",
},
)

def test_arrow_roundtrip(self):
datasource = {
"Type": "ARROW",
"dataSourceNameForClient": "data.arrow",
"dataSourceNameForServer": "data.arrow",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)

def test_parquet_roundtrip(self):
datasource = {
"Type": "PARQUET",
"dataSourceNameForClient": "data.parquet",
"dataSourceNameForServer": "data.parquet",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)

def test_arrow_roundtrip_with_json_treatment(self):
datasource = {
"Type": "ARROW",
"dataSourceNameForClient": "data.arrow",
"dataSourceNameForServer": "data.arrow",
"jsonRootPointer": "data",
"jsonVariableMapping": "{}",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)

def test_flight_roundtrip(self):
datasource = {
"Type": "FLIGHT",
"flightLocation": "grpc://host:8443",
"flightDescriptorType": "PATH",
"flightDescriptor": "dataset/path",
"flightAuth": "Basic user:pass",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)

def test_arrow_from_dict_title_case_type(self):
# The server canonicalizes Type to title-case ("Arrow") on read-back; from_dict
# of such a response must still produce a proper Arrow body, not an empty {}.
datasource = {
"Type": "Arrow",
"dataSourceNameForClient": "data.arrow",
"dataSourceNameForServer": "data.arrow",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)

def test_flight_from_dict_title_case_type(self):
# Title-case "Flight" (as returned by the server) must round-trip too.
datasource = {
"Type": "Flight",
"flightLocation": "grpc://host:8443",
"flightDescriptorType": "PATH",
"flightDescriptor": "dataset/path",
"flightAuth": "Basic user:pass",
}
process = Process.from_dict(self._process_dict(datasource))
self.assertEqual(process.body_as_dict["DataSource"], datasource)


if __name__ == "__main__":
unittest.main()
Loading