From 32764833be614a49250b3652cf274b2d6d102242 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Fri, 29 Aug 2025 15:34:54 -0700 Subject: [PATCH 1/4] python(feat): Plumb bytes through sift-py and sift_client. --- .../ingestion_with_python_config/simulator.py | 5 ++++ .../telemetry_config.py | 6 +++++ .../_internal/low_level_wrappers/data.py | 6 ++--- python/lib/sift_client/resources/channels.py | 4 ++-- .../resources/sync_stubs/__init__.pyi | 4 ++-- python/lib/sift_client/types/channel.py | 10 ++++++-- python/lib/sift_py/ingestion/channel.py | 23 ++++++++++++++++--- 7 files changed, 46 insertions(+), 12 deletions(-) diff --git a/python/examples/ingestion_with_python_config/simulator.py b/python/examples/ingestion_with_python_config/simulator.py index 46fd0a8ae..aab115729 100644 --- a/python/examples/ingestion_with_python_config/simulator.py +++ b/python/examples/ingestion_with_python_config/simulator.py @@ -7,6 +7,7 @@ from sift_py.ingestion.channel import ( bit_field_value, + bytes_value, double_value, enum_value, int32_value, @@ -100,6 +101,10 @@ def run(self): random.choice(self.sample_bit_field_values) ), }, + { + "channel_name": "raw_bin", + "value": bytes_value(str(timestamp).encode("utf-8")), + }, ], } ) diff --git a/python/examples/ingestion_with_python_config/telemetry_config.py b/python/examples/ingestion_with_python_config/telemetry_config.py index 2173f4234..1c4a3904e 100644 --- a/python/examples/ingestion_with_python_config/telemetry_config.py +++ b/python/examples/ingestion_with_python_config/telemetry_config.py @@ -55,6 +55,11 @@ def nostromos_lv_426() -> TelemetryConfig: ChannelBitFieldElement(name="heater", index=7, bit_count=1), ], ) + raw_binary_channel = ChannelConfig( + name="raw_bin", + data_type=ChannelDataType.BYTES, + description="Example of binary encoded data (binary string encoding of time in seconds)", + ) return TelemetryConfig( asset_name="NostromoLV426", @@ -66,6 +71,7 @@ def nostromos_lv_426() -> TelemetryConfig: voltage_channel, 
vehicle_state_channel, gpio_channel, + raw_binary_channel, ], ), FlowConfig( diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data.py b/python/lib/sift_client/_internal/low_level_wrappers/data.py index d48339bb1..56465e413 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/data.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/data.py @@ -4,7 +4,7 @@ import logging from datetime import datetime, timezone from math import ceil -from typing import Any, List, Tuple, cast +from typing import Any, Dict, List, Tuple, cast import pandas as pd from pydantic import BaseModel, ConfigDict @@ -240,7 +240,7 @@ async def get_channel_data( end_time: datetime | None = None, limit: int | None = None, ignore_cache: bool = False, - ): + ) -> Dict[str, pd.DataFrame]: """ Get the data for a channel during a run. """ @@ -326,7 +326,7 @@ async def get_channel_data( return ret_data @staticmethod - def try_deserialize_channel_data(channel_data: Any) -> dict[str, pd.DataFrame]: + def try_deserialize_channel_data(channel_data: Any) -> Dict[str, pd.DataFrame]: """ Deserialize a channel data object into a numpy array. """ diff --git a/python/lib/sift_client/resources/channels.py b/python/lib/sift_client/resources/channels.py index e121bbf73..21d4e65bc 100644 --- a/python/lib/sift_client/resources/channels.py +++ b/python/lib/sift_client/resources/channels.py @@ -4,7 +4,7 @@ from datetime import datetime from typing import TYPE_CHECKING, Dict, List -import numpy as np +import pandas as pd import pyarrow as pa from sift_client._internal.low_level_wrappers.channels import ChannelsLowLevelClient @@ -176,7 +176,7 @@ async def get_data( start_time: datetime | None = None, end_time: datetime | None = None, limit: int | None = None, - ) -> Dict[str, np.ndarray]: + ) -> Dict[str, pd.DataFrame]: """ Get data for one or more channels. 
diff --git a/python/lib/sift_client/resources/sync_stubs/__init__.pyi b/python/lib/sift_client/resources/sync_stubs/__init__.pyi index ca9c37336..a75e1d414 100644 --- a/python/lib/sift_client/resources/sync_stubs/__init__.pyi +++ b/python/lib/sift_client/resources/sync_stubs/__init__.pyi @@ -6,7 +6,7 @@ import re from datetime import datetime from typing import Any, Dict, List -import numpy as np +import pandas as pd import pyarrow as pa from sift_client.client import SiftClient @@ -428,7 +428,7 @@ class ChannelsAPI: start_time: datetime | None = None, end_time: datetime | None = None, limit: int | None = None, - ) -> Dict[str, np.ndarray]: + ) -> Dict[str, pd.DataFrame]: """ Get data for one or more channels. diff --git a/python/lib/sift_client/types/channel.py b/python/lib/sift_client/types/channel.py index b8ce8f160..8f8aded44 100644 --- a/python/lib/sift_client/types/channel.py +++ b/python/lib/sift_client/types/channel.py @@ -14,6 +14,7 @@ from sift.data.v2.data_pb2 import ( BitFieldValues, BoolValues, + BytesValues, DoubleValues, EnumValues, FloatValues, @@ -45,6 +46,7 @@ class ChannelDataType(Enum): INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64 UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES def __str__(self) -> str: ret = self.name.lower() @@ -78,7 +80,7 @@ def from_str(raw: str) -> Optional["ChannelDataType"]: if item.__str__() == val: return item raise Exception( - "Unreachable. ChannelTypeUrls and ChannelDataType enum names are out of sync." + f"{raw} type not found. ChannelTypeUrls and ChannelDataType enum names are out of sync." 
) else: try: @@ -111,6 +113,8 @@ def proto_data_class(data_type: ChannelDataType) -> Any: return Uint32Values elif data_type == ChannelDataType.UINT_64: return Uint64Values + elif data_type == ChannelDataType.BYTES: + return BytesValues else: raise ValueError(f"Unknown data type: {data_type}") @@ -138,6 +142,8 @@ def hash_str(self, api_format: bool = False) -> str: return "CHANNEL_DATA_TYPE_UINT_32" if api_format else ChannelDataType.UINT_32.__str__() elif self == ChannelDataType.UINT_64: return "CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataType.UINT_64.__str__() + elif self == ChannelDataType.BYTES: + return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataType.BYTES.__str__() else: raise Exception("Unreachable.") @@ -249,7 +255,7 @@ def data( limit: The maximum number of data points to return. Returns: - A ChannelTimeSeries object. + A dict of channel name to pandas DataFrame or Arrow Table object. """ if as_arrow: data = self.client.channels.get_data_as_arrow( diff --git a/python/lib/sift_py/ingestion/channel.py b/python/lib/sift_py/ingestion/channel.py index c1ab71dc1..023bee319 100644 --- a/python/lib/sift_py/ingestion/channel.py +++ b/python/lib/sift_py/ingestion/channel.py @@ -71,7 +71,7 @@ def __init__( self.identifier = self.fqn() def value_from( - self, value: Optional[Union[int, float, bool, str]] + self, value: Optional[Union[int, float, bool, str, bytes]] ) -> Optional[IngestWithConfigDataChannelValue]: """ Like `try_value_from` except will return `None` there is a failure to produce a channel value due to a type mismatch. @@ -82,7 +82,7 @@ def value_from( return None def try_value_from( - self, value: Optional[Union[int, float, bool, str]] + self, value: Optional[Union[int, float, bool, str, bytes]] ) -> IngestWithConfigDataChannelValue: """ Generate a channel value for this particular channel configuration. 
This will raise an exception @@ -112,7 +112,8 @@ def try_value_from( return enum_value(int(value)) elif isinstance(value, str) and self.data_type == ChannelDataType.STRING: return string_value(value) - + elif isinstance(value, bytes) and self.data_type == ChannelDataType.BYTES: + return bytes_value(value) raise ValueError(f"Failed to cast value of type {type(value)} to {self.data_type}") def as_pb(self, klass: Type[ChannelConfigPb]) -> ChannelConfigPb: @@ -209,6 +210,7 @@ class ChannelDataTypeStrRep(Enum): INT_64 = "int64" UINT_32 = "uint32" UINT_64 = "uint64" + BYTES = "bytes" @staticmethod def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: @@ -224,6 +226,7 @@ def from_api_format(val: str) -> Optional["ChannelDataTypeStrRep"]: "CHANNEL_DATA_TYPE_INT_64": ChannelDataTypeStrRep.INT_64, "CHANNEL_DATA_TYPE_UINT_32": ChannelDataTypeStrRep.UINT_32, "CHANNEL_DATA_TYPE_UINT_64": ChannelDataTypeStrRep.UINT_64, + "CHANNEL_DATA_TYPE_BYTES": ChannelDataTypeStrRep.BYTES, }[val] except KeyError: return None @@ -244,6 +247,7 @@ class ChannelDataType(Enum): INT_64 = channel_pb.CHANNEL_DATA_TYPE_INT_64 UINT_32 = channel_pb.CHANNEL_DATA_TYPE_UINT_32 UINT_64 = channel_pb.CHANNEL_DATA_TYPE_UINT_64 + BYTES = channel_pb.CHANNEL_DATA_TYPE_BYTES @classmethod def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType": @@ -267,6 +271,8 @@ def from_pb(cls, val: channel_pb.ChannelDataType.ValueType) -> "ChannelDataType" return cls.UINT_32 elif val == cls.UINT_64.value: return cls.UINT_64 + elif val == cls.BYTES.value: + return cls.BYTES else: raise ValueError(f"Unknown channel data type '{val}'.") @@ -302,6 +308,8 @@ def from_str(cls, raw: str) -> Optional["ChannelDataType"]: return cls.UINT_32 elif val == ChannelDataTypeStrRep.UINT_64: return cls.UINT_64 + elif val == ChannelDataTypeStrRep.BYTES: + return cls.BYTES else: raise Exception("Unreachable") @@ -334,6 +342,8 @@ def as_human_str(self, api_format: bool = False) -> str: return ( 
"CHANNEL_DATA_TYPE_UINT_64" if api_format else ChannelDataTypeStrRep.UINT_64.value ) + elif self == ChannelDataType.BYTES: + return "CHANNEL_DATA_TYPE_BYTES" if api_format else ChannelDataTypeStrRep.BYTES.value else: raise Exception("Unreachable.") @@ -421,6 +431,10 @@ def empty_value() -> IngestWithConfigDataChannelValue: return IngestWithConfigDataChannelValue(empty=Empty()) +def bytes_value(val: bytes) -> IngestWithConfigDataChannelValue: + return IngestWithConfigDataChannelValue(bytes=val) + + def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelDataType) -> bool: if target_type == ChannelDataType.DOUBLE: return val.HasField("double") @@ -442,3 +456,6 @@ def is_data_type(val: IngestWithConfigDataChannelValue, target_type: ChannelData return val.HasField("uint32") elif target_type == ChannelDataType.UINT_64: return val.HasField("uint64") + elif target_type == ChannelDataType.BYTES: + return val.HasField("bytes") + raise ValueError(f"Unknown channel data type '{target_type}'.") From bead11495fec8c44bd94117cee74900d9616eaaa Mon Sep 17 00:00:00 2001 From: Ian Later Date: Fri, 29 Aug 2025 16:45:47 -0700 Subject: [PATCH 2/4] Exercise bytes in ingestion w/ yaml. Rev sift py version. 
--- .../ingestion_with_threading/impulse.py | 134 ++++++++++++++++++ .../ingestion_with_yaml_config/simulator.py | 7 +- .../telemetry_config.py | 4 +- .../telemetry_configs/nostromo_lv_426.yml | 7 +- .../_internal/low_level_wrappers/data.py | 6 +- .../lib/sift_py/ingestion/_internal/ingest.py | 11 ++ python/pyproject.toml | 2 +- 7 files changed, 163 insertions(+), 8 deletions(-) create mode 100644 python/examples/ingestion_with_threading/impulse.py diff --git a/python/examples/ingestion_with_threading/impulse.py b/python/examples/ingestion_with_threading/impulse.py new file mode 100644 index 000000000..851827708 --- /dev/null +++ b/python/examples/ingestion_with_threading/impulse.py @@ -0,0 +1,134 @@ +import logging +import os +import threading +import time +import traceback +from datetime import datetime, timezone +from typing import List + +from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel +from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value +from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig +from sift_py.ingestion.service import IngestionService + +# ------------------------------- +# Config +# ------------------------------- +LOG_LEVEL = logging.INFO +LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" + +FLOW_NAME = "temperature_reading_multiprocessing" +CHANNEL_NAME = "temperature_multiprocessing" +ASSET_NAME = "NostromoLV426" +INGESTION_CLIENT_KEY = "nostromo_lv_426_test" + +SIFT_BASE_URI = "https://grpc-api.development.siftstack.com" +SIFT_API_KEY = os.getenv("SIFT_DEV_API_KEY") + +PROCESS_COUNT = 8 +BUFFER_SIZE = 1000 +FLUSH_INTERVAL_SEC = 1 +INGEST_SLEEP_SEC = 1 +FLOW_CREATE_BATCH = 100 + +logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) +log = logging.getLogger(__name__) + + +class Consumer: + def __init__(self, ingestion_service: IngestionService): + self.ingestion_service = ingestion_service + + def run(self, consumer_id: int) -> None: + log.info("Consumer %d 
started", consumer_id) + with self.ingestion_service.buffered_ingestion( + buffer_size=BUFFER_SIZE, flush_interval_sec=FLUSH_INTERVAL_SEC + ) as buffered: + while True: + try: + buffered.ingest_flows( + { + "flow_name": FLOW_NAME, + "timestamp": datetime.now(timezone.utc), + "channel_values": [double_value(123)], + } + ) + time.sleep(INGEST_SLEEP_SEC) + except Exception as e: + log.error("Error ingesting flows: %s", e) + log.error(traceback.format_exc()) + time.sleep(INGEST_SLEEP_SEC) + + +def build_telemetry_config() -> TelemetryConfig: + temperature_channel = ChannelConfig( + name=CHANNEL_NAME, + data_type=ChannelDataType.DOUBLE, + description="temperature of thruster", + unit="Kelvin", + ) + + return TelemetryConfig( + asset_name=ASSET_NAME, + ingestion_client_key=INGESTION_CLIENT_KEY, + flows=[FlowConfig(name=FLOW_NAME, channels=[temperature_channel])], + ) + + +def create_missing_flows( + svc: IngestionService, flows: List[FlowConfig], batch_size: int = FLOW_CREATE_BATCH +) -> None: + existing = set(svc.flow_configs_by_name.keys()) + to_create = [f for f in flows if f.name not in existing] + + log.info("Existing flows: %d", len(existing)) + log.info("Creating %d flow(s): %s", len(to_create), [f.name for f in to_create]) + + for i in range(0, len(to_create), batch_size): + svc.try_create_flow(*to_create[i : i + batch_size]) + + +def main() -> None: + log.info("Loading telemetry config...") + telemetry_config = build_telemetry_config() + + sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY) + sift_channel = use_sift_channel(sift_channel_config) + + log.info("Initializing Sift Ingestion Service...") + log.info("Asset name: %s", telemetry_config.asset_name) + log.info("Ingestion client key: %s", telemetry_config.ingestion_client_key) + + ingestion_service = IngestionService( + sift_channel, + TelemetryConfig( + asset_name=telemetry_config.asset_name, + ingestion_client_key=telemetry_config.ingestion_client_key, + ), + ) + + 
log.info("Flow registry: %s", list(ingestion_service.flow_configs_by_name.keys())) + create_missing_flows(ingestion_service, telemetry_config.flows) + + processes: List[threading.Thread] = [] + try: + consumer = Consumer(ingestion_service) + for i in range(PROCESS_COUNT): + p = threading.Thread(target=consumer.run, args=(i,)) + p.start() + processes.append(p) + + for p in processes: + p.join() + + except KeyboardInterrupt: + log.info("Exiting...") + finally: + for p in processes: + if p.is_alive(): + p.terminate() + sift_channel.close() + + +if __name__ == "__main__": + main() diff --git a/python/examples/ingestion_with_yaml_config/simulator.py b/python/examples/ingestion_with_yaml_config/simulator.py index 26434994e..2b9b0328d 100644 --- a/python/examples/ingestion_with_yaml_config/simulator.py +++ b/python/examples/ingestion_with_yaml_config/simulator.py @@ -7,6 +7,7 @@ from sift_py.ingestion.channel import ( bit_field_value, + bytes_value, double_value, enum_value, int32_value, @@ -39,7 +40,7 @@ def __init__(self, ingestion_service: IngestionService): sample_bit_field_values = ["00001001", "00100011", "00001101", "11000001"] self.sample_bit_field_values = [bytes([int(byte, 2)]) for byte in sample_bit_field_values] - sample_logs = Path().joinpath("sample_data").joinpath("sample_logs.txt") + sample_logs = Path(__file__).parent.joinpath("sample_data").joinpath("sample_logs.txt") with open(sample_logs, "r") as file: self.sample_logs = file.readlines() @@ -100,6 +101,10 @@ def run(self): random.choice(self.sample_bit_field_values) ), }, + { + "channel_name": "raw_bin", + "value": bytes_value(str(timestamp).encode("utf-8")), + }, ], } ) diff --git a/python/examples/ingestion_with_yaml_config/telemetry_config.py b/python/examples/ingestion_with_yaml_config/telemetry_config.py index dbe0abcc1..2aaa817e7 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_config.py +++ b/python/examples/ingestion_with_yaml_config/telemetry_config.py @@ -3,11 +3,11 @@ from 
sift_py.ingestion.service import TelemetryConfig -TELEMETRY_CONFIGS_DIR = Path().joinpath("telemetry_configs") +TELEMETRY_CONFIGS_DIR = Path(__file__).parent.joinpath("telemetry_configs") def nostromos_lv_426() -> TelemetryConfig: - telemetry_config_name = os.getenv("TELEMETRY_CONFIG") + telemetry_config_name = os.getenv("TELEMETRY_CONFIG", "nostromo_lv_426.yml") if telemetry_config_name is None: raise Exception("Missing 'TELEMETRY_CONFIG' environment variable.") diff --git a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml index 1a60d5c06..8541fbd62 100644 --- a/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml +++ b/python/examples/ingestion_with_yaml_config/telemetry_configs/nostromo_lv_426.yml @@ -1,5 +1,4 @@ asset_name: NostromoLV426 -ingestion_client_key: nostromo_lv_426 channels: log_channel: &log_channel @@ -51,6 +50,11 @@ channels: index: 7 bit_count: 1 + raw_binary_channel: &raw_binary_channel + name: raw_bin + data_type: bytes + description: Example of binary encoded data (binary string encoding of time in seconds) + flows: - name: readings channels: @@ -58,6 +62,7 @@ flows: - <<: *voltage_channel - <<: *vehicle_state_channel - <<: *gpio_channel + - <<: *raw_binary_channel - name: voltage channels: diff --git a/python/lib/sift_client/_internal/low_level_wrappers/data.py b/python/lib/sift_client/_internal/low_level_wrappers/data.py index 56465e413..0df283d73 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/data.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/data.py @@ -4,7 +4,7 @@ import logging from datetime import datetime, timezone from math import ceil -from typing import Any, Dict, List, Tuple, cast +from typing import Any, List, Tuple, cast import pandas as pd from pydantic import BaseModel, ConfigDict @@ -240,7 +240,7 @@ async def get_channel_data( end_time: datetime | None = 
None, limit: int | None = None, ignore_cache: bool = False, - ) -> Dict[str, pd.DataFrame]: + ) -> dict[str, pd.DataFrame]: """ Get the data for a channel during a run. """ @@ -326,7 +326,7 @@ async def get_channel_data( return ret_data @staticmethod - def try_deserialize_channel_data(channel_data: Any) -> Dict[str, pd.DataFrame]: + def try_deserialize_channel_data(channel_data: Any) -> dict[str, pd.DataFrame]: """ Deserialize a channel data object into a numpy array. """ diff --git a/python/lib/sift_py/ingestion/_internal/ingest.py b/python/lib/sift_py/ingestion/_internal/ingest.py index b086e1889..82c42f5e2 100644 --- a/python/lib/sift_py/ingestion/_internal/ingest.py +++ b/python/lib/sift_py/ingestion/_internal/ingest.py @@ -533,20 +533,30 @@ def _update_flow_configs( # There isn't a flow in Sift for this config flow so we'll create it. if sift_flow_index is None: flows_to_create.append(config_flow) + print("creating flow", config_flow.name) continue # There is a flow in Sift with the name of the config flow. We'll # compare the channels in the config flow with the sift flow and # see if there's a difference. If there is we'll create a new flow. sift_flow = sift_flows[sift_flow_index] + print(f"SIFT FLOW: {sift_flow_index} - {sift_flow.name}") sift_channel_identifiers = { sift_channel_identifier(channel) for channel in sift_flow.channels } + print("SIFT CHANNEL IDENTIFIERS") + for identifier in sift_channel_identifiers: + print(identifier) + print("CONFIG CHANNEL IDENTIFIERS") + for config_channel in config_flow.channels: + print(config_channel_identifier(config_channel)) + print(config_channel.name) for config_channel in config_flow.channels: # Found a channel for this flow that doesn't exist in Sift based on channel # fully-qualified name and data-type. Create a new flow. 
+ if not config_channel_identifier(config_channel) in sift_channel_identifiers: raise IngestionValidationError( "Encountered duplicate flow with mismatched channels" @@ -565,6 +575,7 @@ def _get_or_create_ingestion_config( May raise `ProtobufMaxSizeExceeded` if a large number of flows needing updates or creation are passed """ + print(config.ingestion_client_key) ingestion_config = get_ingestion_config_by_client_key(channel, config.ingestion_client_key) # Exiting ingestion config.. update flows if necessary diff --git a/python/pyproject.toml b/python/pyproject.toml index b3556eb3a..b51621220 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "sift_stack_py" -version = "0.8.4" +version = "0.8.5" description = "Python client library for the Sift API" requires-python = ">=3.8" readme = { file = "README.md", content-type = "text/markdown" } From bfacaa79b78bbdee4e1658adc58696e4964b3e6c Mon Sep 17 00:00:00 2001 From: Ian Later Date: Fri, 29 Aug 2025 16:47:57 -0700 Subject: [PATCH 3/4] remove debug prints --- python/lib/sift_py/ingestion/_internal/ingest.py | 10 ---------- 1 file changed, 10 deletions(-) diff --git a/python/lib/sift_py/ingestion/_internal/ingest.py b/python/lib/sift_py/ingestion/_internal/ingest.py index 82c42f5e2..f28df007b 100644 --- a/python/lib/sift_py/ingestion/_internal/ingest.py +++ b/python/lib/sift_py/ingestion/_internal/ingest.py @@ -533,25 +533,16 @@ def _update_flow_configs( # There isn't a flow in Sift for this config flow so we'll create it. if sift_flow_index is None: flows_to_create.append(config_flow) - print("creating flow", config_flow.name) continue # There is a flow in Sift with the name of the config flow. We'll # compare the channels in the config flow with the sift flow and # see if there's a difference. If there is we'll create a new flow. 
sift_flow = sift_flows[sift_flow_index] - print(f"SIFT FLOW: {sift_flow_index} - {sift_flow.name}") sift_channel_identifiers = { sift_channel_identifier(channel) for channel in sift_flow.channels } - print("SIFT CHANNEL IDENTIFIERS") - for identifier in sift_channel_identifiers: - print(identifier) - print("CONFIG CHANNEL IDENTIFIERS") - for config_channel in config_flow.channels: - print(config_channel_identifier(config_channel)) - print(config_channel.name) for config_channel in config_flow.channels: # Found a channel for this flow that doesn't exist in Sift based on channel @@ -575,7 +566,6 @@ def _get_or_create_ingestion_config( May raise `ProtobufMaxSizeExceeded` if a large number of flows needing updates or creation are passed """ - print(config.ingestion_client_key) ingestion_config = get_ingestion_config_by_client_key(channel, config.ingestion_client_key) # Exiting ingestion config.. update flows if necessary From a9a847b6c23964afed910a58f1126643d6b80a74 Mon Sep 17 00:00:00 2001 From: Ian Later Date: Fri, 29 Aug 2025 16:50:39 -0700 Subject: [PATCH 4/4] rm unrelated changes --- .../ingestion_with_threading/impulse.py | 134 ------------------ .../lib/sift_py/ingestion/_internal/ingest.py | 1 - 2 files changed, 135 deletions(-) delete mode 100644 python/examples/ingestion_with_threading/impulse.py diff --git a/python/examples/ingestion_with_threading/impulse.py b/python/examples/ingestion_with_threading/impulse.py deleted file mode 100644 index 851827708..000000000 --- a/python/examples/ingestion_with_threading/impulse.py +++ /dev/null @@ -1,134 +0,0 @@ -import logging -import os -import threading -import time -import traceback -from datetime import datetime, timezone -from typing import List - -from sift_py.grpc.transport import SiftChannelConfig, use_sift_channel -from sift_py.ingestion.channel import ChannelConfig, ChannelDataType, double_value -from sift_py.ingestion.config.telemetry import FlowConfig, TelemetryConfig -from sift_py.ingestion.service import 
IngestionService - -# ------------------------------- -# Config -# ------------------------------- -LOG_LEVEL = logging.INFO -LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s" - -FLOW_NAME = "temperature_reading_multiprocessing" -CHANNEL_NAME = "temperature_multiprocessing" -ASSET_NAME = "NostromoLV426" -INGESTION_CLIENT_KEY = "nostromo_lv_426_test" - -SIFT_BASE_URI = "https://grpc-api.development.siftstack.com" -SIFT_API_KEY = os.getenv("SIFT_DEV_API_KEY") - -PROCESS_COUNT = 8 -BUFFER_SIZE = 1000 -FLUSH_INTERVAL_SEC = 1 -INGEST_SLEEP_SEC = 1 -FLOW_CREATE_BATCH = 100 - -logging.basicConfig(level=LOG_LEVEL, format=LOG_FORMAT) -log = logging.getLogger(__name__) - - -class Consumer: - def __init__(self, ingestion_service: IngestionService): - self.ingestion_service = ingestion_service - - def run(self, consumer_id: int) -> None: - log.info("Consumer %d started", consumer_id) - with self.ingestion_service.buffered_ingestion( - buffer_size=BUFFER_SIZE, flush_interval_sec=FLUSH_INTERVAL_SEC - ) as buffered: - while True: - try: - buffered.ingest_flows( - { - "flow_name": FLOW_NAME, - "timestamp": datetime.now(timezone.utc), - "channel_values": [double_value(123)], - } - ) - time.sleep(INGEST_SLEEP_SEC) - except Exception as e: - log.error("Error ingesting flows: %s", e) - log.error(traceback.format_exc()) - time.sleep(INGEST_SLEEP_SEC) - - -def build_telemetry_config() -> TelemetryConfig: - temperature_channel = ChannelConfig( - name=CHANNEL_NAME, - data_type=ChannelDataType.DOUBLE, - description="temperature of thruster", - unit="Kelvin", - ) - - return TelemetryConfig( - asset_name=ASSET_NAME, - ingestion_client_key=INGESTION_CLIENT_KEY, - flows=[FlowConfig(name=FLOW_NAME, channels=[temperature_channel])], - ) - - -def create_missing_flows( - svc: IngestionService, flows: List[FlowConfig], batch_size: int = FLOW_CREATE_BATCH -) -> None: - existing = set(svc.flow_configs_by_name.keys()) - to_create = [f for f in flows if f.name not in existing] - - 
log.info("Existing flows: %d", len(existing)) - log.info("Creating %d flow(s): %s", len(to_create), [f.name for f in to_create]) - - for i in range(0, len(to_create), batch_size): - svc.try_create_flow(*to_create[i : i + batch_size]) - - -def main() -> None: - log.info("Loading telemetry config...") - telemetry_config = build_telemetry_config() - - sift_channel_config = SiftChannelConfig(uri=SIFT_BASE_URI, apikey=SIFT_API_KEY) - sift_channel = use_sift_channel(sift_channel_config) - - log.info("Initializing Sift Ingestion Service...") - log.info("Asset name: %s", telemetry_config.asset_name) - log.info("Ingestion client key: %s", telemetry_config.ingestion_client_key) - - ingestion_service = IngestionService( - sift_channel, - TelemetryConfig( - asset_name=telemetry_config.asset_name, - ingestion_client_key=telemetry_config.ingestion_client_key, - ), - ) - - log.info("Flow registry: %s", list(ingestion_service.flow_configs_by_name.keys())) - create_missing_flows(ingestion_service, telemetry_config.flows) - - processes: List[threading.Thread] = [] - try: - consumer = Consumer(ingestion_service) - for i in range(PROCESS_COUNT): - p = threading.Thread(target=consumer.run, args=(i,)) - p.start() - processes.append(p) - - for p in processes: - p.join() - - except KeyboardInterrupt: - log.info("Exiting...") - finally: - for p in processes: - if p.is_alive(): - p.terminate() - sift_channel.close() - - -if __name__ == "__main__": - main() diff --git a/python/lib/sift_py/ingestion/_internal/ingest.py b/python/lib/sift_py/ingestion/_internal/ingest.py index f28df007b..b086e1889 100644 --- a/python/lib/sift_py/ingestion/_internal/ingest.py +++ b/python/lib/sift_py/ingestion/_internal/ingest.py @@ -547,7 +547,6 @@ def _update_flow_configs( for config_channel in config_flow.channels: # Found a channel for this flow that doesn't exist in Sift based on channel # fully-qualified name and data-type. Create a new flow. 
- if not config_channel_identifier(config_channel) in sift_channel_identifiers: raise IngestionValidationError( "Encountered duplicate flow with mismatched channels"