Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 62 additions & 16 deletions python/lib/sift_client/_internal/low_level_wrappers/ingestion.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
from sift_client._internal.low_level_wrappers.base import (
LowLevelClientBase,
)
from sift_client._internal.util.timestamp import to_rust_py_timestamp
from sift_client.sift_types.ingestion import Flow, IngestionConfig, _to_rust_value
from sift_client.transport import GrpcClient, WithGrpcClient
from sift_client.util import cel_utils as cel
Expand All @@ -45,9 +44,27 @@
IngestionConfigFormPy,
IngestWithConfigDataStreamRequestPy,
SiftStreamBuilderPy,
TimeValuePy,
)


def to_rust_py_timestamp(time: datetime) -> TimeValuePy:
"""Convert a Python datetime to a Rust TimeValuePy.

Args:
time: The datetime to convert

Returns:
A TimeValuePy representation
"""
from sift_stream_bindings import TimeValuePy

ts = time.timestamp()
secs = int(ts)
nsecs = int((ts - secs) * 1_000_000_000)
return TimeValuePy.from_timestamp(secs, nsecs)


class IngestionThread(threading.Thread):
"""Manages ingestion for a single ingestion config."""

Expand Down Expand Up @@ -157,7 +174,7 @@ class IngestionLowLevelClient(LowLevelClientBase, WithGrpcClient):

CacheEntry = namedtuple("CacheEntry", ["data_queue", "ingestion_config", "thread"])

sift_stream_builder: SiftStreamBuilderPy
_sift_stream_builder: SiftStreamBuilderPy | None
stream_cache: dict[str, CacheEntry]

def __init__(self, grpc_client: GrpcClient):
Expand All @@ -166,29 +183,52 @@ def __init__(self, grpc_client: GrpcClient):
Args:
grpc_client: The gRPC client to use for making API calls.
"""
from sift_stream_bindings import (
RecoveryStrategyPy,
RetryPolicyPy,
SiftStreamBuilderPy,
)

super().__init__(grpc_client=grpc_client)
self._sift_stream_builder = None # Lazy-initialized
self.stream_cache = {}
atexit.register(self.cleanup, timeout=0.1)

def _ensure_sift_stream_bindings(self):
"""Ensure sift_stream_bindings is available and initialize the stream builder.

Raises:
ImportError: If sift_stream_bindings is not installed.
"""
if self._sift_stream_builder is not None:
return

try:
from sift_stream_bindings import (
RecoveryStrategyPy,
RetryPolicyPy,
SiftStreamBuilderPy,
)
except ImportError as e:
raise ImportError(
"The 'sift-stream' package is required for ingestion operations. "
"Please install it with:` `pip install sift-stack-py[sift-stream]`"
) from e

# Rust GRPC client expects URI to have http(s):// prefix.
uri = grpc_client._config.uri
uri = self._grpc_client._config.uri
if not uri.startswith("http"):
uri = f"https://{uri}" if grpc_client._config.use_ssl else f"http://{uri}"
self.sift_stream_builder = SiftStreamBuilderPy(
uri = f"https://{uri}" if self._grpc_client._config.use_ssl else f"http://{uri}"
self._sift_stream_builder = SiftStreamBuilderPy(
uri=uri,
apikey=grpc_client._config.api_key,
apikey=self._grpc_client._config.api_key,
)
self.sift_stream_builder.enable_tls = grpc_client._config.use_ssl
self._sift_stream_builder.enable_tls = self._grpc_client._config.use_ssl
# FD-177: Expose configuration for recovery strategy.
self.sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only(
self._sift_stream_builder.recovery_strategy = RecoveryStrategyPy.retry_only(
RetryPolicyPy.default()
)
self.stream_cache = {}

atexit.register(self.cleanup, timeout=0.1)
@property
def sift_stream_builder(self) -> SiftStreamBuilderPy:
"""Get the sift stream builder, initializing it if necessary."""
self._ensure_sift_stream_bindings()
assert self._sift_stream_builder is not None
return self._sift_stream_builder

def cleanup(self, timeout: float | None = None):
"""Cleanup the ingestion threads.
Expand Down Expand Up @@ -249,6 +289,8 @@ def _new_ingestion_thread(
ingestion_config_id: The id of the ingestion config for the flows this stream will ingest. Used to cache the stream.
ingestion_config: The ingestion config to use for ingestion.
"""

self._ensure_sift_stream_bindings()
data_queue: Queue[list[IngestWithConfigDataStreamRequestPy]] = Queue()
existing = self.stream_cache.get(ingestion_config_id)
if existing:
Expand Down Expand Up @@ -313,6 +355,8 @@ async def create_ingestion_config(
"""
from sift_stream_bindings import IngestionConfigFormPy

self._ensure_sift_stream_bindings()

ingestion_config_id = None
if client_key:
logger.debug(f"Getting ingestion config id for client key {client_key}")
Expand Down Expand Up @@ -393,6 +437,8 @@ def ingest_flow(
"""
from sift_stream_bindings import IngestWithConfigDataStreamRequestPy

self._ensure_sift_stream_bindings()

if not flow.ingestion_config_id:
raise ValueError(
"Flow has no ingestion config id -- have you created an ingestion config for this flow?"
Expand Down
16 changes: 0 additions & 16 deletions python/lib/sift_client/_internal/util/timestamp.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from datetime import datetime

from google.protobuf.timestamp_pb2 import Timestamp
from sift_stream_bindings import TimeValuePy


def to_pb_timestamp(timestamp: datetime) -> Timestamp:
Expand All @@ -16,18 +15,3 @@ def to_pb_timestamp(timestamp: datetime) -> Timestamp:
timestamp_pb = Timestamp()
timestamp_pb.FromDatetime(timestamp)
return timestamp_pb


def to_rust_py_timestamp(time: datetime) -> TimeValuePy:
"""Convert a Python datetime to a Rust TimeValuePy.

Args:
time: The datetime to convert

Returns:
A TimeValuePy representation
"""
ts = time.timestamp()
secs = int(ts)
nsecs = int((ts - secs) * 1_000_000_000)
return TimeValuePy.from_timestamp(secs, nsecs)
Loading