diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 814ec115e..6af319992 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -38,9 +38,11 @@ from sift_stream_bindings import ( DurationPy, FlowConfigPy, + FlowDescriptorPy, FlowPy, IngestionConfigFormPy, IngestWithConfigDataStreamRequestPy, + IngestWithConfigDataStreamRequestWrapperPy, MetadataPy, RecoveryStrategyPy, RunFormPy, @@ -129,12 +131,10 @@ class IngestionConfigStreamingLowLevelClient(LowLevelClientBase): DEFAULT_MAX_LOG_FILES = 7 # Equal to 1 week of logs DEFAULT_LOGFILE_PREFIX = "sift_stream_bindings.log" _sift_stream_instance: SiftStreamPy - _known_flows: dict[str, FlowConfig] - def __init__(self, sift_stream_instance: SiftStreamPy, known_flows: dict[str, FlowConfig]): + def __init__(self, sift_stream_instance: SiftStreamPy): super().__init__() self._sift_stream_instance = sift_stream_instance - self._known_flows = known_flows @classmethod async def create_sift_stream_instance( @@ -194,12 +194,7 @@ async def create_sift_stream_instance( sift_stream_instance = await builder.build() - known_flows = { - flow_name: FlowConfig._from_rust_config(flow) - for flow_name, flow in sift_stream_instance.get_flows().items() - } - - return cls(sift_stream_instance, known_flows) + return cls(sift_stream_instance) async def send(self, flow: FlowPy): await self._sift_stream_instance.send(flow) @@ -210,12 +205,16 @@ async def batch_send(self, flows: Iterable[FlowPy]): async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): await self._sift_stream_instance.send_requests(requests) + def send_requests_nonblocking( + self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] + ): + self._sift_stream_instance.send_requests_nonblocking(requests) + + def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: + return self._sift_stream_instance.get_flow_descriptor(flow_name) + async def add_new_flows(self, flow_configs: list[FlowConfigPy]): await self._sift_stream_instance.add_new_flows(flow_configs) - self._known_flows = { - flow_name: FlowConfig._from_rust_config(flow) - for flow_name, flow in self._sift_stream_instance.get_flows().items() - } async def attach_run(self, run_selector: RunSelectorPy): await self._sift_stream_instance.attach_run(run_selector) diff --git a/python/lib/sift_client/resources/ingestion.py b/python/lib/sift_client/resources/ingestion.py index 884706ca1..69ae78527 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -18,9 +18,11 @@ from sift_stream_bindings import ( DiskBackupPolicyPy, DurationPy, + FlowDescriptorPy, FlowPy, IngestionConfigFormPy, IngestWithConfigDataStreamRequestPy, + IngestWithConfigDataStreamRequestWrapperPy, MetadataPy, RecoveryStrategyPy, RetryPolicyPy, @@ -491,6 +493,31 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy """ await self._low_level_client.send_requests(requests) + def send_requests_nonblocking( + self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] + ): + """Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. + + This method offers a way to send data that matches the raw gRPC service interface. You are + expected to handle channel value ordering as well as empty values correctly. + + Important: + If using this interface, you should use `FlowBuilderPy::request` to ensure proper + building of the request. + + Args: + requests: List of ingestion requests to send to Sift. + """ + self._low_level_client.send_requests_nonblocking(requests) + + def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: + """Retrieve a flow descriptor by name. + + Args: + flow_name: The name of the flow descriptor to retrieve. + """ + return self._low_level_client.get_flow_descriptor(flow_name) + async def add_new_flows(self, flow_configs: list[FlowConfig]): """Modify the existing ingestion config by adding new flows that weren't accounted for during initialization. @@ -575,23 +602,6 @@ def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: """ return self._low_level_client.get_metrics_snapshot() - def get_flow_config(self, flow_name: str) -> FlowConfig: - """Retrieve a flow configuration by name. - - Args: - flow_name: The name of the flow configuration to retrieve. - - Returns: - The FlowConfig associated with the given flow name. - - Raises: - KeyError: If the flow name is not found in the known flows. - """ - flow_config = self._low_level_client._known_flows.get(flow_name) - if flow_config is None: - raise KeyError(f"FlowConfig {flow_name} is unknown to the ingestion client") - return flow_config - async def __aenter__(self): return self diff --git a/python/pyproject.toml b/python/pyproject.toml index 0c80b0763..dc027433f 100644 --- a/python/pyproject.toml +++ b/python/pyproject.toml @@ -59,7 +59,7 @@ all = [ 'pyOpenSSL<24.0.0', 'pyarrow>=17.0.0', 'rosbags~=0.0', - 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc4', 'types-pyOpenSSL<24.0.0', ] build = [ @@ -100,7 +100,7 @@ dev-all = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc4', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -153,7 +153,7 @@ docs-build = [ 'pytest==8.2.2', 'rosbags~=0.0', 'ruff~=0.12.10', - 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc4', 'tomlkit~=0.13.3', 'types-pyOpenSSL<24.0.0', ] @@ -176,10 +176,10 @@ rosbags = [ 'rosbags~=0.0', ] sift-stream = [ - 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc4', ] sift-stream-bindings = [ - 'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4', + 'sift-stream-bindings>=0.2.0-rc4', ] tdms = [ 'npTDMS~=1.9', @@ -215,7 +215,7 @@ docs = ["mkdocs", openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"] tdms = ["npTDMS~=1.9"] rosbags = ["rosbags~=0.0"] -sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"] +sift-stream = ["sift-stream-bindings>=0.2.0-rc4"] hdf5 = ["h5py~=3.11", "polars~=1.8"] data-review = ["pyarrow>=17.0.0"]