diff --git a/python/examples/ingestion_with_python_config/simulator.py b/python/examples/ingestion_with_python_config/simulator.py index 46fd0a8ae..aab115729 100644 --- a/python/examples/ingestion_with_python_config/simulator.py +++ b/python/examples/ingestion_with_python_config/simulator.py @@ -7,6 +7,7 @@ from sift_py.ingestion.channel import ( bit_field_value, + bytes_value, double_value, enum_value, int32_value, @@ -100,6 +101,10 @@ def run(self): random.choice(self.sample_bit_field_values) ), }, + { + "channel_name": "raw_bin", + "value": bytes_value(str(timestamp).encode("utf-8")), + }, ], } ) diff --git a/python/examples/ingestion_with_python_config/telemetry_config.py b/python/examples/ingestion_with_python_config/telemetry_config.py index 2173f4234..1c4a3904e 100644 --- a/python/examples/ingestion_with_python_config/telemetry_config.py +++ b/python/examples/ingestion_with_python_config/telemetry_config.py @@ -55,6 +55,11 @@ def nostromos_lv_426() -> TelemetryConfig: ChannelBitFieldElement(name="heater", index=7, bit_count=1), ], ) + raw_binary_channel = ChannelConfig( + name="raw_bin", + data_type=ChannelDataType.BYTES, + description="Example of binary encoded data (binary string encoding of time in seconds)", + ) return TelemetryConfig( asset_name="NostromoLV426", @@ -66,6 +71,7 @@ def nostromos_lv_426() -> TelemetryConfig: voltage_channel, vehicle_state_channel, gpio_channel, + raw_binary_channel, ], ), FlowConfig( diff --git a/python/examples/ingestion_with_yaml_config/simulator.py b/python/examples/ingestion_with_yaml_config/simulator.py index 26434994e..2b9b0328d 100644 --- a/python/examples/ingestion_with_yaml_config/simulator.py +++ b/python/examples/ingestion_with_yaml_config/simulator.py @@ -7,6 +7,7 @@ from sift_py.ingestion.channel import ( bit_field_value, + bytes_value, double_value, enum_value, int32_value, @@ -39,7 +40,7 @@ def __init__(self, ingestion_service: IngestionService): sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"] self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values] - sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt") + sample_logs = Path(__file__).parent.joinpath("sample_data").joinpath("sample_logs.txt") with open(sample_logs, "r") as file: self.sample_logs = file.readlines() @@ -100,6 +101,10 @@ def run(self): random.choice(self.sample_bit_field_values) ), }, + { + "channel_name": "raw_bin", + "value": bytes_value(str(timestamp).encode("utf-8")), + }, ], } ) diff --git a/python/examples/ingestion_with_yaml_config/telemetry_config.py b/python/examples/ingestion_with_yaml_config/telemetry_config.py index dbe0abcc1..2aaa817e7 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_config.py +++ b/python/examples/ingestion_with_yaml_config/telemetry_config.py @@ -3,11 +3,11 @@ from sift_py.ingestion.service import TelemetryConfig -TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs") +TELEMETRY_CONFIGS_DIR = Path(__file__).parent.joinpath("telemetry_configs") def nostromos_lv_426() -> TelemetryConfig: - telemetry_config_name = os.getenv("TELEMETRY_CONFIG") + telemetry_config_name = os.getenv("TELEMETRY_CONFIG", "nostromo_lv_426.yml") if telemetry_config_name is None: raise Exception("Missing 'TELEMETRY_CONFIG' environment variable.") diff --git a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml index 1a60d5c06..8541fbd62 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml +++ b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml @@ -1,5 +1,4 @@ asset_name: NostromoLV426 -ingestion_client_key: nostromo_lv_426 channels: log_channel: &log_channel @@ -51,6 +50,11 @@ channels: index: 7 bit_count: 1 + raw_binary_channel: &raw_binary_channel + name: raw_bin + data_type: bytes + description: Example of binary encoded data (binary string encoding of time in seconds) + flows: - name: readings channels: @@ -58,6 +62,7 @@ flows: - <<: *voltage_channel - <<: *vehicle_state_channel - <<: *gpio_channel + - <<: *raw_binary_channel - name: voltage channels: diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data.py b/python/lib/sift_client/_internal/low_level_wrappers/data.py index d48339bb1..0df283d73 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/data.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/data.py @@ -240,7 +240,7 @@ async def get_channel_data( end_time: datetime | None = None, limit: int | None = None, ignore_cache: bool = False, - ): + ) -> dict[str, pd.DataFrame]: """ Get the data for a channel during a run. """ diff --git a/python/lib/sift_client/resources/channels.py b/python/lib/sift_client/resources/channels.py index e121bbf73..21d4e65bc 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Dict, List -import numpy as np +import pandas as pd import pyarrow as pa from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient @@ -176,7 +176,7 @@ async def get_data( start_time: datetime | None = None, end_time: datetime | None = None, limit: int | None = None, - ) -> Dict[str, np.ndarray]: + ) -> Dict[str, pd.DataFrame]: """ Get data for one or more channels. diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index ca9c37336..a75e1d414 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -6,7 +6,7 @@ import re from datetime import datetime from typing import Any, Dict, List -import numpy as np +import pandas as pd import pyarrow as pa from sift_client.client import SiftClient @@ -428,7 +428,7 @@ class ChannelsAPI: start_time: datetime | None = None, end_time: datetime | None = None, limit: int | None = None, - ) -> Dict[str, np.ndarray]: + ) -> Dict[str, pd.DataFrame]: """ Get data for one or more channels. diff --git a/python/lib/sift_client/types/channel.py b/python/lib/sift_client/types/channel.py index b8ce8f160..8f8aded44 100644 --- a/python/lib/sift_client/types/channel.py +++ b/python/lib/sift_client/types/channel.py @@ -14,6 +14,7 @@ from sift.data.v2.data_pb2 import ( BitFieldValues, BoolValues, + BytesValues, DoubleValues, EnumValues, FloatValues, @@ -45,6 +46,7 @@ class ChannelDataType(Enum): INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64 UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES def __str__(self) -> str: ret = self.name.lower() @@ -78,7 +80,7 @@ def from_str(raw: str) -> Optional["ChannelDataType"]: if item.__str__() == val: return item raise Exception( - "Unreachable. ChannelTypeUrls and ChannelDataType enum names are out of sync." + f"{raw} type not found. ChannelTypeUrls and ChannelDataType enum names are out of sync." ) else: try: @@ -111,6 +113,8 @@ def proto_data_class(data_type: ChannelDataType) -> Any: return Uint32Values elif data_type == ChannelDataType.UINT_64: return Uint64Values + elif data_type == ChannelDataType.BYTES: + return BytesValues else: raise ValueError(f"Unknown data type: {data_type}") @@ -138,6 +142,8 @@ def hash_str(self, api_format: bool = False) -> str: return "CHANNEL_DATA_TYPE_UINT_32" if api_format else ChannelDataType.UINT_32.__str__() elif self == ChannelDataType.UINT_64: return "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataType.UINT_64.__str__() + elif self == ChannelDataType.BYTES: + return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataType.BYTES.__str__() else: raise Exception("Unreachable.") @@ -249,7 +255,7 @@ def data( limit: The maximum number of data points to return. Returns: - A ChannelTimeSeries object. + A dict of channel name to pandas DataFrame or Arrow Table object. """ if as_arrow: data = self.client.channels.get_data_as_arrow( diff --git a/python/lib/sift_py/ingestion/channel.py b/python/lib/sift_py/ingestion/channel.py index c1ab71dc1..023bee319 100644 --- a/python/lib/sift_py/ingestion/channel.py +++ b/python/lib/sift_py/ingestion/channel.py @@ -71,7 +71,7 @@ def __init__( self.identifier = self.fqn() def value_from( - self, value: Optional[Union[int, float, bool, str]] + self, value: Optional[Union[int, float, bool, str, bytes]] ) -> Optional[IngestWithConfigDataChannelValue]: """ Like `try_value_from` except will return `None` there is a failure to produce a channel value due to a type mismatch. @@ -82,7 +82,7 @@ def value_from( return None def try_value_from( - self, value: Optional[Union[int, float, bool, str]] + self, value: Optional[Union[int, float, bool, str, bytes]] ) -> IngestWithConfigDataChannelValue: """ Generate a channel value for this particular channel configuration. This will raise an exception @@ -112,7 +112,8 @@ def try_value_from( return enum_value(int(value)) elif isinstance(value, str) and self.data_type == ChannelDataType.STRING: return string_value(value) - + elif isinstance(value, bytes) and self.data_type == ChannelDataType.BYTES: + return bytes_value(value) raise ValueError(f"Failed to cast value of type {type(value)} to {self.data_type}") def as_pb(self, klass: Type[ChannelConfigPb]) -> ChannelConfigPb: @@ -209,6 +210,7 @@ class ChannelDataTypeStrRep(Enum): INT_64 = "int64" UINT_32 = "uint32" UINT_64 = "uint64" + BYTES = "bytes" @staticmethod def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: @@ -224,6 +226,7 @@ def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: "CHANNEL_DATA_TYPE_INT_64": ChannelDataTypeStrRep.INT_64, "CHANNEL_DATA_TYPE_UINT_32": ChannelDataTypeStrRep.UINT_32, "CHANNEL_DATA_TYPE_UINT_64": ChannelDataTypeStrRep.UINT_64, + "CHANNEL_DATA_TYPE_BYTES": ChannelDataTypeStrRep.BYTES, }[val] except KeyError: return None @@ -244,6 +247,7 @@ class ChannelDataType(Enum): INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64 UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES @classmethod def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType": @@ -267,6 +271,8 @@ def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType" return cls.UINT_32 elif val == cls.UINT_64.value: return cls.UINT_64 + elif val == cls.BYTES.value: + return cls.BYTES else: raise ValueError(f"Unknown channel data type '{val}'.") @@ -302,6 +308,8 @@ def from_str(cls, raw: str) -> Optional["ChannelDataType"]: return cls.UINT_32 elif val == ChannelDataTypeStrRep.UINT_64: return cls.UINT_64 + elif val == ChannelDataTypeStrRep.BYTES: + return cls.BYTES else: raise Exception("Unreachable") @@ -334,6 +342,8 @@ def as_human_str(self, api_format: bool = False) -> str: return ( "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataTypeStrRep.UINT_64.value ) + elif self == ChannelDataType.BYTES: + return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataTypeStrRep.BYTES.value else: raise Exception("Unreachable.") @@ -421,6 +431,10 @@ def empty_value() -> IngestWithConfigDataChannelValue: return IngestWithConfigDataChannelValue(empty=Empty()) +def bytes_value(val: bytes) -> IngestWithConfigDataChannelValue: + return IngestWithConfigDataChannelValue(bytes=val) + + def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelDataType) -> bool: if target_type == ChannelDataType.DOUBLE: return val.HasField("double") @@ -442,3 +456,6 @@ def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelData return val.HasField("uint32") elif target_type == ChannelDataType.UINT_64: return val.HasField("uint64") + elif target_type == ChannelDataType.BYTES: + return val.HasField("bytes") + raise ValueError(f"Unknown channel data type '{target_type}'.") diff --git a/python/pyproject.toml b/python/pyproject.toml index b3556eb3a..b51621220 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "0.8.4" +version = "0.8.5" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" }