From 186e69e273050c6b8979a7328afee9270bcc9c62 Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 20 Oct 2025 09:26:20 -0700 Subject: [PATCH 1/3] move to_rust_py_timestamp to low-level-wrapper --- .../_internal/low_level_wrappers/ingestion.py | 19 ++++++++++++++++++- .../sift_client/_internal/util/timestamp.py | 16 ---------------- 2 files changed, 18 insertions(+), 17 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 743e211a1..930338641 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -31,7 +31,6 @@ from sift_client._internal.low_level_wrappers.base import ( LowLevelClientBase, ) -from sift_client._internal.util.timestamp import to_rust_py_timestamp from sift_client.sift_types.ingestion import Flow, IngestionConfig, _to_rust_value from sift_client.transport import GrpcClient, WithGrpcClient from sift_client.util import cel_utils as cel @@ -45,9 +44,27 @@ IngestionConfigFormPy, IngestWithConfigDataStreamRequestPy, SiftStreamBuilderPy, + TimeValuePy, ) +def to_rust_py_timestamp(time: datetime) -> TimeValuePy: + """Convert a Python datetime to a Rust TimeValuePy. + + Args: + time: The datetime to convert + + Returns: + A TimeValuePy representation + """ + from sift_stream_bindings import TimeValuePy + + ts = time.timestamp() + secs = int(ts) + nsecs = int((ts - secs) * 1_000_000_000) + return TimeValuePy.from_timestamp(secs, nsecs) + + class IngestionThread(threading.Thread): """Manages ingestion for a single ingestion config.""" diff --git a/python/lib/sift_client/_internal/util/timestamp.py b/python/lib/sift_client/_internal/util/timestamp.py index 0d33689f2..5cc659fce 100644 --- a/python/lib/sift_client/_internal/util/timestamp.py +++ b/python/lib/sift_client/_internal/util/timestamp.py @@ -1,7 +1,6 @@ from datetime import datetime from google.protobuf.timestamp_pb2 import Timestamp -from sift_stream_bindings import TimeValuePy def to_pb_timestamp(timestamp: datetime) -> Timestamp: @@ -16,18 +15,3 @@ def to_pb_timestamp(timestamp: datetime) -> Timestamp: timestamp_pb = Timestamp() timestamp_pb.FromDatetime(timestamp) return timestamp_pb - - -def to_rust_py_timestamp(time: datetime) -> TimeValuePy: - """Convert a Python datetime to a Rust TimeValuePy. - - Args: - time: The datetime to convert - - Returns: - A TimeValuePy representation - """ - ts = time.timestamp() - secs = int(ts) - nsecs = int((ts - secs) * 1_000_000_000) - return TimeValuePy.from_timestamp(secs, nsecs) From 0bd18cdb6cd32bcd1bf8cd57bc7ec4b24b009f8b Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 20 Oct 2025 09:40:37 -0700 Subject: [PATCH 2/3] update ingestion LLW such that sift-stream errors are only raised on call --- .../_internal/low_level_wrappers/ingestion.py | 57 ++++++++++++++----- 1 file changed, 43 insertions(+), 14 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 930338641..4a94d4852 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -183,29 +183,52 @@ def __init__(self, grpc_client: GrpcClient): Args: grpc_client: The gRPC client to use for making API calls. """ - from sift_stream_bindings import ( - RecoveryStrategyPy, - RetryPolicyPy, - SiftStreamBuilderPy, - ) - super().__init__(grpc_client=grpc_client) + self._sift_stream_builder = None # Lazy-initialized + self.stream_cache = {} + atexit.register(self.cleanup, timeout=0.1) + + def _ensure_sift_stream_bindings(self): + """Ensure sift_stream_bindings is available and initialize the stream builder. + + Raises: + ImportError: If sift_stream_bindings is not installed. + """ + if self._sift_stream_builder is not None: + return + + try: + from sift_stream_bindings import ( + RecoveryStrategyPy, + RetryPolicyPy, + SiftStreamBuilderPy, + ) + except ImportError as e: + raise ImportError( + "The 'sift-stream' package is required for ingestion operations. " + "Please install it with:` `pip install sift-stack-py[sift-stream]`" + ) from e + # Rust GRPC client expects URI to have http(s):// prefix. - uri = grpc_client._config.uri + uri = self._grpc_client._config.uri if not uri.startswith("http"): - uri = f"https://{uri}" if grpc_client._config.use_ssl else f"http://{uri}" - self.sift_stream_builder = SiftStreamBuilderPy( + uri = f"https://{uri}" if self._grpc_client._config.use_ssl else f"http://{uri}" + self._sift_stream_builder = SiftStreamBuilderPy( uri=uri, - apikey=grpc_client._config.api_key, + apikey=self._grpc_client._config.api_key, ) - self.sift_stream_builder.enable_tls = grpc_client._config.use_ssl + self._sift_stream_builder.enable_tls = self._grpc_client._config.use_ssl # FD-177: Expose configuration for recovery strategy. - self.sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only( + self._sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only( RetryPolicyPy.default() ) - self.stream_cache = {} - atexit.register(self.cleanup, timeout=0.1) + @property + def sift_stream_builder(self) -> SiftStreamBuilderPy: + """Get the sift stream builder, initializing it if necessary.""" + self._ensure_sift_stream_bindings() + assert self._sift_stream_builder is not None + return self._sift_stream_builder def cleanup(self, timeout: float | None = None): """Cleanup the ingestion threads. @@ -266,6 +289,8 @@ def _new_ingestion_thread( ingestion_config_id: The id of the ingestion config for the flows this stream will ingest. Used to cache the stream. ingestion_config: The ingestion config to use for ingestion. """ + + self._ensure_sift_stream_bindings() data_queue: Queue[list[IngestWithConfigDataStreamRequestPy]] = Queue() existing = self.stream_cache.get(ingestion_config_id) if existing: @@ -330,6 +355,8 @@ async def create_ingestion_config( """ from sift_stream_bindings import IngestionConfigFormPy + self._ensure_sift_stream_bindings() + ingestion_config_id = None if client_key: logger.debug(f"Getting ingestion config id for client key {client_key}") @@ -410,6 +437,8 @@ def ingest_flow( """ from sift_stream_bindings import IngestWithConfigDataStreamRequestPy + self._ensure_sift_stream_bindings() + if not flow.ingestion_config_id: raise ValueError( "Flow has no ingestion config id -- have you created an ingestion config for this flow?" From 547a091f7ce3f9b74cb43f2d464b3b7b7f1c2c7f Mon Sep 17 00:00:00 2001 From: Alex Luck Date: Mon, 20 Oct 2025 09:44:21 -0700 Subject: [PATCH 3/3] linting --- .../lib/sift_client/_internal/low_level_wrappers/ingestion.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index 4a94d4852..8ebd9201a 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -174,7 +174,7 @@ class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient): CacheEntry = namedtuple("CacheEntry", ["data_queue", "ingestion_config", "thread"]) - sift_stream_builder: SiftStreamBuilderPy + _sift_stream_builder: SiftStreamBuilderPy | None stream_cache: dict[str, CacheEntry] def __init__(self, grpc_client: GrpcClient):