Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 12 additions & 13 deletions python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,11 @@
from sift_stream_bindings import (
DurationPy,
FlowConfigPy,
FlowDescriptorPy,
FlowPy,
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
IngestWithConfigDataStreamRequestWrapperPy,
MetadataPy,
RecoveryStrategyPy,
RunFormPy,
Expand Down Expand Up @@ -129,12 +131,10 @@ class IngestionConfigStreamingLowLevelClient(LowLevelClientBase):
DEFAULT_MAX_LOG_FILES = 7 # Equal to 1 week of logs
DEFAULT_LOGFILE_PREFIX = "sift_stream_bindings.log"
_sift_stream_instance: SiftStreamPy
_known_flows: dict[str, FlowConfig]

def __init__(self, sift_stream_instance: SiftStreamPy, known_flows: dict[str, FlowConfig]):
def __init__(self, sift_stream_instance: SiftStreamPy):
super().__init__()
self._sift_stream_instance = sift_stream_instance
self._known_flows = known_flows

@classmethod
async def create_sift_stream_instance(
Expand Down Expand Up @@ -194,12 +194,7 @@ async def create_sift_stream_instance(

sift_stream_instance = await builder.build()

known_flows = {
flow_name: FlowConfig._from_rust_config(flow)
for flow_name, flow in sift_stream_instance.get_flows().items()
}

return cls(sift_stream_instance, known_flows)
return cls(sift_stream_instance)

async def send(self, flow: FlowPy):
await self._sift_stream_instance.send(flow)
Expand All @@ -210,12 +205,16 @@ async def batch_send(self, flows: Iterable[FlowPy]):
async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]):
await self._sift_stream_instance.send_requests(requests)

def send_requests_nonblocking(
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
):
self._sift_stream_instance.send_requests_nonblocking(requests)

def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
return self._sift_stream_instance.get_flow_descriptor(flow_name)

async def add_new_flows(self, flow_configs: list[FlowConfigPy]):
await self._sift_stream_instance.add_new_flows(flow_configs)
self._known_flows = {
flow_name: FlowConfig._from_rust_config(flow)
for flow_name, flow in self._sift_stream_instance.get_flows().items()
}

async def attach_run(self, run_selector: RunSelectorPy):
await self._sift_stream_instance.attach_run(run_selector)
Expand Down
44 changes: 27 additions & 17 deletions python/lib/sift_client/resources/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
from sift_stream_bindings import (
DiskBackupPolicyPy,
DurationPy,
FlowDescriptorPy,
FlowPy,
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
IngestWithConfigDataStreamRequestWrapperPy,
MetadataPy,
RecoveryStrategyPy,
RetryPolicyPy,
Expand Down Expand Up @@ -491,6 +493,31 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy
"""
await self._low_level_client.send_requests(requests)

def send_requests_nonblocking(
self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]
):
"""Send data in a manner identical to the raw gRPC service for ingestion-config based streaming.

This method offers a way to send data that matches the raw gRPC service interface. You are
expected to handle channel value ordering as well as empty values correctly.

Important:
If using this interface, you should use `FlowBuilderPy::request` to ensure proper
building of the request.

Args:
requests: List of ingestion requests to send to Sift.
"""
self._low_level_client.send_requests_nonblocking(requests)

def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy:
Comment thread
tsift marked this conversation as resolved.
"""Retrieve a flow descriptor by name.

Args:
flow_name: The name of the flow descriptor to retrieve.
"""
return self._low_level_client.get_flow_descriptor(flow_name)

async def add_new_flows(self, flow_configs: list[FlowConfig]):
"""Modify the existing ingestion config by adding new flows that weren't accounted for during initialization.

Expand Down Expand Up @@ -575,23 +602,6 @@ def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy:
"""
return self._low_level_client.get_metrics_snapshot()

def get_flow_config(self, flow_name: str) -> FlowConfig:
"""Retrieve a flow configuration by name.

Args:
flow_name: The name of the flow configuration to retrieve.

Returns:
The FlowConfig associated with the given flow name.

Raises:
KeyError: If the flow name is not found in the known flows.
"""
flow_config = self._low_level_client._known_flows.get(flow_name)
if flow_config is None:
raise KeyError(f"FlowConfig {flow_name} is unknown to the ingestion client")
return flow_config

async def __aenter__(self):
return self

Expand Down
12 changes: 6 additions & 6 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ all = [
'pyOpenSSL<24.0.0',
'pyarrow>=17.0.0',
'rosbags~=0.0',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'sift-stream-bindings>=0.2.0-rc4',
'types-pyOpenSSL<24.0.0',
]
build = [
Expand Down Expand Up @@ -100,7 +100,7 @@ dev-all = [
'pytest==8.2.2',
'rosbags~=0.0',
'ruff~=0.12.10',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'sift-stream-bindings>=0.2.0-rc4',
'tomlkit~=0.13.3',
'types-pyOpenSSL<24.0.0',
]
Expand Down Expand Up @@ -153,7 +153,7 @@ docs-build = [
'pytest==8.2.2',
'rosbags~=0.0',
'ruff~=0.12.10',
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'sift-stream-bindings>=0.2.0-rc4',
'tomlkit~=0.13.3',
'types-pyOpenSSL<24.0.0',
]
Expand All @@ -176,10 +176,10 @@ rosbags = [
'rosbags~=0.0',
]
sift-stream = [
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'sift-stream-bindings>=0.2.0-rc4',
]
sift-stream-bindings = [
'sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4',
'sift-stream-bindings>=0.2.0-rc4',
]
tdms = [
'npTDMS~=1.9',
Expand Down Expand Up @@ -215,7 +215,7 @@ docs = ["mkdocs",
openssl = ["pyOpenSSL<24.0.0", "types-pyOpenSSL<24.0.0", "cffi~=1.14"]
tdms = ["npTDMS~=1.9"]
rosbags = ["rosbags~=0.0"]
sift-stream = ["sift-stream-bindings>=0.2.0-rc2,<0.2.0-rc4"]
sift-stream = ["sift-stream-bindings>=0.2.0-rc4"]
hdf5 = ["h5py~=3.11", "polars~=1.8"]
data-review = ["pyarrow>=17.0.0"]

Expand Down
Loading