From 1ff292004e6e280c9e745237c6fc7fc484db46bc Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 10:59:42 -0500 Subject: [PATCH 1/9] rust(feat): Refactoring sift-stream send APIs --- rust/crates/sift_stream/src/lib.rs | 1 + rust/crates/sift_stream/src/stream/mod.rs | 4 + .../sift_stream/src/stream/send_error.rs | 139 ++++++++++++++++++ 3 files changed, 144 insertions(+) create mode 100644 rust/crates/sift_stream/src/stream/send_error.rs diff --git a/rust/crates/sift_stream/src/lib.rs b/rust/crates/sift_stream/src/lib.rs index b8e5dce18..1e4499e29 100644 --- a/rust/crates/sift_stream/src/lib.rs +++ b/rust/crates/sift_stream/src/lib.rs @@ -456,6 +456,7 @@ pub use stream::{ file_backup::FileBackup, ingestion_config::{Flow, IngestionConfigEncoder, LiveStreaming}, }, + send_error::{SendError, TrySendError}, time::TimeValue, }; diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index 83fdb2a20..d27819e8b 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -37,6 +37,10 @@ pub(crate) mod flow; /// Task-based architecture for non-blocking SiftStream operations pub mod tasks; +/// Error types returned by [`Transport`] send methods. +pub mod send_error; +pub use send_error::{SendError, TrySendError}; + #[cfg(test)] mod test; diff --git a/rust/crates/sift_stream/src/stream/send_error.rs b/rust/crates/sift_stream/src/stream/send_error.rs new file mode 100644 index 000000000..5ce317414 --- /dev/null +++ b/rust/crates/sift_stream/src/stream/send_error.rs @@ -0,0 +1,139 @@ +use std::fmt; + +/// Returned by the async `Transport::send` / `Transport::send_requests` when +/// the underlying channel is closed and delivery cannot complete. +/// +/// The inner value `T` is the undelivered message so the caller can decide +/// what to do with it (e.g. log, retry, discard). 
+#[derive(Debug)] +pub struct SendError(pub T); + +impl SendError { + /// Consume the error, returning the undelivered value. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "channel closed: failed to send message") + } +} + +impl std::error::Error for SendError {} + +/// Returned by the sync `Transport::try_send` / `Transport::try_send_requests` +/// when immediate delivery fails. +#[derive(Debug)] +pub enum TrySendError { + /// The channel has been closed. The undelivered value is returned. + Closed(T), + /// The channel is currently full. The undelivered value is returned. + Full(T), +} + +impl TrySendError { + /// Consume the error, returning the undelivered value. + pub fn into_inner(self) -> T { + match self { + TrySendError::Closed(v) | TrySendError::Full(v) => v, + } + } + + pub fn is_closed(&self) -> bool { + matches!(self, TrySendError::Closed(_)) + } + + pub fn is_full(&self) -> bool { + matches!(self, TrySendError::Full(_)) + } +} + +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TrySendError::Closed(_) => write!(f, "channel closed: failed to send message"), + TrySendError::Full(_) => write!(f, "channel full: failed to send message"), + } + } +} + +impl std::error::Error for TrySendError {} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn send_error_into_inner() { + let err = SendError(42u32); + assert_eq!(err.into_inner(), 42u32); + } + + #[test] + fn send_error_display() { + let err = SendError("msg"); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn send_error_is_error() { + fn assert_error(_: &E) {} + let err = SendError(0u8); + assert_error(&err); + } + + #[test] + fn try_send_error_into_inner_closed() { + let err: TrySendError = TrySendError::Closed(7); + assert_eq!(err.into_inner(), 7); + } + + #[test] + fn 
try_send_error_into_inner_full() { + let err: TrySendError = TrySendError::Full(9); + assert_eq!(err.into_inner(), 9); + } + + #[test] + fn try_send_error_is_closed() { + assert!(TrySendError::::Closed(0).is_closed()); + assert!(!TrySendError::::Full(0).is_closed()); + } + + #[test] + fn try_send_error_is_full() { + assert!(TrySendError::::Full(0).is_full()); + assert!(!TrySendError::::Closed(0).is_full()); + } + + #[test] + fn try_send_error_display() { + assert!( + TrySendError::::Closed(0) + .to_string() + .contains("channel closed") + ); + assert!( + TrySendError::::Full(0) + .to_string() + .contains("channel full") + ); + } + + #[test] + fn try_send_error_is_error() { + fn assert_error(_: &E) {} + let err = TrySendError::Closed(0u8); + assert_error(&err); + } + + #[test] + fn try_send_error_debug() { + let closed = TrySendError::Closed(42u32); + let full = TrySendError::Full(42u32); + assert!(format!("{:?}", closed).contains("Closed")); + assert!(format!("{:?}", full).contains("Full")); + } +} From 8fbffe777f8a6e6904b3dadf4c9c6c905388f6af Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 12:04:53 -0500 Subject: [PATCH 2/9] Checkpoint: new apis implemented --- .../sift_stream/examples/backups-only/main.rs | 2 +- .../sift_stream/examples/quick-start/main.rs | 2 +- rust/crates/sift_stream/src/lib.rs | 2 +- rust/crates/sift_stream/src/stream/mod.rs | 120 +++++++-- .../src/stream/mode/file_backup.rs | 172 ++++++------ .../src/stream/mode/ingestion_config.rs | 250 +++++++++++++----- .../sift_stream/src/stream/send_error.rs | 63 +++++ rust/crates/sift_stream/src/stream/tasks.rs | 4 +- 8 files changed, 438 insertions(+), 177 deletions(-) diff --git a/rust/crates/sift_stream/examples/backups-only/main.rs b/rust/crates/sift_stream/examples/backups-only/main.rs index e8633d291..ba7ee3cbd 100644 --- a/rust/crates/sift_stream/examples/backups-only/main.rs +++ b/rust/crates/sift_stream/examples/backups-only/main.rs @@ -124,7 +124,7 @@ async fn run() -> 
Result<(), Box> { .unwrap(); sift_stream - .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())]) + .try_send_requests(vec![flow_builder.request(TimeValue::now())]) .unwrap(); // For demonstrative purposes, adding a contrived wait to get 10Hz data. diff --git a/rust/crates/sift_stream/examples/quick-start/main.rs b/rust/crates/sift_stream/examples/quick-start/main.rs index 1c753900c..ff60597b3 100644 --- a/rust/crates/sift_stream/examples/quick-start/main.rs +++ b/rust/crates/sift_stream/examples/quick-start/main.rs @@ -116,7 +116,7 @@ async fn run() -> Result<(), Box> { // Send telemetry to Sift. sift_stream - .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())]) + .try_send_requests(vec![flow_builder.request(TimeValue::now())]) .unwrap(); // For demonstrative purposes, adding a contrived wait to get 10Hz data. diff --git a/rust/crates/sift_stream/src/lib.rs b/rust/crates/sift_stream/src/lib.rs index 1e4499e29..73d13ac71 100644 --- a/rust/crates/sift_stream/src/lib.rs +++ b/rust/crates/sift_stream/src/lib.rs @@ -456,7 +456,7 @@ pub use stream::{ file_backup::FileBackup, ingestion_config::{Flow, IngestionConfigEncoder, LiveStreaming}, }, - send_error::{SendError, TrySendError}, + send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError}, time::TimeValue, }; diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index d27819e8b..5660b2615 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -39,7 +39,7 @@ pub mod tasks; /// Error types returned by [`Transport`] send methods. pub mod send_error; -pub use send_error::{SendError, TrySendError}; +pub use send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError}; #[cfg(test)] mod test; @@ -80,17 +80,49 @@ pub trait Transport: private::Sealed { type Message: Send + Sync; type Encoder: Encoder; - /// Send a [`Self::Message`]. 
- fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()>; + /// Send a single message with backpressure. + /// + /// Awaits until the channel has capacity. Returns [`SendError`] + /// containing the undelivered message if the channel is closed. + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError>; - /// Send a batch of messages via an iterator. + /// Send a batch of messages with backpressure. /// - /// This method is used as a more performant way to send a batch of messages, assuming - /// the iterator itself is not performing substantial work. + /// Awaits for each message in turn. Returns [`SendError>`] + /// containing all undelivered messages (the failed one plus any not yet attempted) + /// if the channel is closed mid-iteration. + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send; + + /// Attempt to send a single message without blocking. /// - /// However, this is less convenient since the caller will need to ensure the - /// resulting [`Self::Message`]s are properly created. - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + /// Returns [`TrySendError`] with the undelivered message if + /// the channel is full or closed. + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError>; + + /// Attempt to send a batch of messages without blocking. + /// + /// Returns [`TrySendError>`] with all undelivered messages + /// (the failed one plus any not yet attempted) on first failure. 
+ fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> where I: IntoIterator + Send, I::IntoIter: Send; @@ -153,43 +185,79 @@ where self.run.as_ref() } - /// The entry-point to send actual telemetry to Sift in the form of [`Flow`](mode::ingestion_config::Flow)s. - pub async fn send(&mut self, message: M) -> Result<()> + /// Send telemetry with backpressure. Awaits until the channel has capacity. + /// + /// Returns [`SiftStreamSendError::EncodeError`] if the message cannot be encoded, or + /// [`SiftStreamSendError::ChannelClosed`] (with the undelivered message) if the + /// backing channel closes before delivery completes. + pub async fn send( + &mut self, + message: M, + ) -> std::result::Result<(), SiftStreamSendError<::Message>> where M: Encodeable::Message> + Send + Sync, { let encoded = message .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref()) - .ok_or(Error::new_msg( - ErrorKind::EncodeMessageError, - "Failed to encode message", - ))?; + .ok_or_else(|| SiftStreamSendError::encode_error("Failed to encode message"))?; - self.transport.send(&self.sift_stream_id, encoded) + self.transport + .send(&self.sift_stream_id, encoded) + .await + .map_err(|SendError(msg)| SiftStreamSendError::ChannelClosed(msg)) } - /// This method offers a way to send data in a manner that's identical to the raw - /// [`gRPC service`] for ingestion-config based streaming. + /// Send a batch of pre-encoded requests with backpressure. /// - /// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11 - pub async fn send_requests(&mut self, requests: I) -> Result<()> + /// Awaits for each message in turn. Returns [`SendError`] containing all + /// undelivered messages if the channel closes mid-iteration. 
+ pub async fn send_requests( + &mut self, + requests: I, + ) -> std::result::Result<(), SendError::Message>>> where I: IntoIterator::Message> + Send, I::IntoIter: Send, { - self.transport.send_requests(&self.sift_stream_id, requests) + self.transport + .send_requests(&self.sift_stream_id, requests) + .await + } + + /// Attempt to send telemetry without blocking. + /// + /// Returns [`SiftStreamTrySendError::EncodeError`] if the message cannot be encoded, or + /// [`SiftStreamTrySendError::Channel`] (with the undelivered message) if the channel + /// is full or closed. + pub fn try_send( + &mut self, + message: M, + ) -> std::result::Result<(), SiftStreamTrySendError<::Message>> + where + M: Encodeable::Message> + Send + Sync, + { + let encoded = message + .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref()) + .ok_or_else(|| SiftStreamTrySendError::encode_error("Failed to encode message"))?; + + self.transport + .try_send(&self.sift_stream_id, encoded) + .map_err(SiftStreamTrySendError::Channel) } - /// This method offers a way to send data in a manner that's identical to the raw - /// [`gRPC service`] for ingestion-config based streaming. + /// Attempt to send a batch of pre-encoded requests without blocking. /// - /// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11 - pub fn send_requests_nonblocking(&mut self, requests: I) -> Result<()> + /// Returns [`TrySendError`] with all undelivered messages on first failure. + pub fn try_send_requests( + &mut self, + requests: I, + ) -> std::result::Result<(), TrySendError::Message>>> where I: IntoIterator::Message> + Send, I::IntoIter: Send, { - self.transport.send_requests(&self.sift_stream_id, requests) + self.transport + .try_send_requests(&self.sift_stream_id, requests) } /// Gracefully finish the stream, draining any remaining data before returning. 
diff --git a/rust/crates/sift_stream/src/stream/mode/file_backup.rs b/rust/crates/sift_stream/src/stream/mode/file_backup.rs index daeab0f7c..43a4fbc88 100644 --- a/rust/crates/sift_stream/src/stream/mode/file_backup.rs +++ b/rust/crates/sift_stream/src/stream/mode/file_backup.rs @@ -1,4 +1,5 @@ use crate::stream::mode::ingestion_config::IngestionConfigEncoder; +use crate::stream::send_error::{SendError, TrySendError}; use crate::stream::{SiftStream, Transport, private::Sealed}; use crate::{ DiskBackupPolicy, RetryPolicy, @@ -106,12 +107,12 @@ pub struct FileBackup { // Seal the trait - only this crate can implement SiftStreamMode impl Sealed for FileBackup {} -#[async_trait] -impl Transport for FileBackup { - type Encoder = IngestionConfigEncoder; - type Message = IngestWithConfigDataStreamRequest; - - fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()> { +impl FileBackup { + fn prepare_message( + &mut self, + stream_id: &Uuid, + message: IngestWithConfigDataStreamRequest, + ) -> Arc { self.metrics.messages_received.increment(); #[cfg(feature = "tracing")] @@ -119,43 +120,105 @@ impl Transport for FileBackup { if !self.flows_seen.contains(&message.flow) { self.metrics.unique_flows_received.increment(); self.flows_seen.insert(message.flow.clone()); - tracing::info!( - sift_stream_id = %stream_id, - "flow '{}' being ingested for the first time", - &message.flow, - ); + tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow); } } - // Track the backup channel depth. 
self.metrics .backup_channel_depth .set(self.write_tx.len() as u64); + Arc::new(message) + } +} - // Send the request to the background write task (non-blocking) - let request_arc = Arc::new(message); - self.write_tx.try_send(request_arc).map_err(|e| { - if e.is_full() { - Error::new_msg(ErrorKind::StreamError, "file backup write channel is full") - } else { - Error::new_msg( - ErrorKind::StreamError, - format!("file backup write channel is closed: {e}"), - ) - } - })?; +#[async_trait] +impl Transport for FileBackup { + type Encoder = IngestionConfigEncoder; + type Message = IngestWithConfigDataStreamRequest; + + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError> { + let arc = self.prepare_message(stream_id, message); + + self.write_tx + .send(arc) + .await + .map_err(|async_channel::SendError(a)| { + SendError(Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone())) + })?; self.metrics.messages_sent.increment(); Ok(()) } - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError> { + let arc = self.prepare_message(stream_id, message); + + match self.write_tx.try_send(arc) { + Ok(()) => { + self.metrics.messages_sent.increment(); + Ok(()) + } + Err(async_channel::TrySendError::Full(a)) => Err(TrySendError::Full( + Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone()), + )), + Err(async_channel::TrySendError::Closed(a)) => Err(TrySendError::Closed( + Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone()), + )), + } + } + + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, { - for req in requests { - self.send(stream_id, req)?; + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + if let Err(SendError(failed)) = 
self.send(stream_id, msg).await { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(SendError(undelivered)); + } + } + Ok(()) + } + + fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send, + { + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + match self.try_send(stream_id, msg) { + Ok(()) => {} + Err(TrySendError::Full(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Full(undelivered)); + } + Err(TrySendError::Closed(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Closed(undelivered)); + } + } } Ok(()) } @@ -743,7 +806,7 @@ mod tests { let request = create_test_request("test_flow", &ingestion_config.ingestion_config_id); // Send the request - mode.send(&sift_stream_id, request).unwrap(); + mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; @@ -782,9 +845,9 @@ mod tests { let request2 = create_test_request("flow2", &ingestion_config.ingestion_config_id); let request3 = create_test_request("flow1", &ingestion_config.ingestion_config_id); // Duplicate - mode.send(&sift_stream_id, request1).unwrap(); - mode.send(&sift_stream_id, request2).unwrap(); - mode.send(&sift_stream_id, request3).unwrap(); + mode.try_send(&sift_stream_id, request1).unwrap(); + mode.try_send(&sift_stream_id, request2).unwrap(); + mode.try_send(&sift_stream_id, request3).unwrap(); // Wait for the background task to process all messages wait_for_backup_metrics(&metrics, 3, 1000).await; @@ -825,7 +888,7 @@ mod tests { create_test_request("flow3", &ingestion_config.ingestion_config_id), ]; - mode.send_requests(&sift_stream_id, requests).unwrap(); + mode.try_send_requests(&sift_stream_id, requests).unwrap(); // 
Wait for the background task to process all messages wait_for_backup_metrics(&metrics, 3, 1000).await; @@ -838,45 +901,6 @@ mod tests { mode.finish(&sift_stream_id).await.unwrap(); } - #[tokio::test] - async fn test_file_backup_mode_send_requests_nonblocking() { - let ingestion_config = create_test_ingestion_config(); - let temp_dir = TempDir::new("test_file_backup").unwrap(); - let metrics = Arc::new(SiftStreamMetrics::default()); - let sift_stream_id = Uuid::new_v4(); - - let file_writer_config = FileWriterConfig { - directory: temp_dir.path().to_path_buf(), - prefix: ingestion_config.client_key.clone(), - max_size: 1024 * 1024, - }; - - let mut mode = create_test_file_backup_mode( - temp_dir.path().to_path_buf(), - file_writer_config, - 1024 * 100, - metrics.clone(), - ) - .await; - - let requests = vec![ - create_test_request("flow1", &ingestion_config.ingestion_config_id), - create_test_request("flow2", &ingestion_config.ingestion_config_id), - ]; - - mode.send_requests(&sift_stream_id, requests).unwrap(); - - // Wait for the background task to process all messages - wait_for_backup_metrics(&metrics, 2, 1000).await; - - assert_eq!(metrics.messages_received.get(), 2); - assert_eq!(metrics.messages_sent.get(), 2); - assert_eq!(metrics.backups.total_messages.get(), 2); - - // Finish to ensure all data is written - mode.finish(&sift_stream_id).await.unwrap(); - } - #[tokio::test] async fn test_file_backup_mode_send_with_flow_descriptor() { let ingestion_config = create_test_ingestion_config(); @@ -912,7 +936,7 @@ mod tests { let request = builder.request(TimeValue::now()); - mode.send(&sift_stream_id, request).unwrap(); + mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; @@ -955,7 +979,7 @@ mod tests { let request = builder.request(TimeValue::now()); // Should still succeed even without flow descriptor - mode.send(&sift_stream_id, request).unwrap(); + 
mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 27eb1e2f2..f06ce0ac4 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -1,5 +1,6 @@ use crate::FlowBuilder; use crate::metrics::{SiftStreamMetrics, SiftStreamMetricsSnapshot}; +use crate::stream::send_error::{SendError, TrySendError}; use crate::stream::{Encodeable, Encoder, MetricsSnapshot, Transport}; use crate::stream::{ SiftStream, @@ -56,34 +57,27 @@ pub struct LiveStreaming { // Seal the trait - only this crate can implement SiftStreamMode impl Sealed for LiveStreaming {} -#[async_trait] -impl Transport for LiveStreaming { - type Encoder = IngestionConfigEncoder; - type Message = IngestWithConfigDataStreamRequest; - - /// Sends the message to Sift for live ingestion, while in parallel also sends a backup of the message to a file. - fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()> { +impl LiveStreaming { + fn prepare_message( + &mut self, + stream_id: &Uuid, + message: IngestWithConfigDataStreamRequest, + ) -> DataMessage { #[cfg(feature = "tracing")] { if !self.flows_seen.contains(&message.flow) { self.metrics.unique_flows_received.increment(); self.flows_seen.insert(message.flow.clone()); - tracing::info!( - sift_stream_id = %stream_id, - "flow '{}' being ingested for the first time", - &message.flow, - ); + tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow); } } - // Track the channel depths. 
self.metrics .ingestion_channel_depth .set(self.stream_system.ingestion_tx.len() as u64); self.metrics .backup_channel_depth .set(self.stream_system.backup_tx.len() as u64); - self.metrics.messages_received.increment(); let data_msg = DataMessage { @@ -91,83 +85,193 @@ impl Transport for LiveStreaming { request: Arc::new(message), dropped_for_ingestion: false, }; - self.message_id_counter += 1; + data_msg + } - // Send the message for backup first. If this fails, log an error and continue. - // - // Failure to backup can lead to data loss though it is preferable to attempt - // to stream the message to Sift rather than return the error and prevent both. - // - // TODO(tsift): Make this behavior optional via a builder arg. - if let Err(e) = self.stream_system.backup_tx.try_send(data_msg.clone()) { - #[cfg(feature = "tracing")] - tracing::warn!( - sift_stream_id = %stream_id, - "failed to send data to backup system, data will still be streamed to Sift: {e}" - ); - } - - self.metrics.messages_sent_to_backup.increment(); - - // Send the message for ingestion. - // - // If the channel is full, the oldest message will be removed in order to create space for the newer message. - // For ingestion, newer data is preferred over older data. + /// Used by `async fn send`. If an oldest message is evicted from the ingestion + /// channel, awaits until backup has space to accept it. Returns the undeliverable + /// message on backup channel close or ingestion channel close. + async fn dispatch_to_ingestion( + &mut self, + stream_id: &Uuid, + data_msg: DataMessage, + ) -> Option { match self.stream_system.ingestion_tx.force_send(data_msg) { - Ok(None) => Ok(()), - Ok(Some(mut oldest_message)) => { + Ok(None) => None, + Ok(Some(mut oldest)) => { + oldest.dropped_for_ingestion = true; + self.metrics.old_messages_dropped_for_ingestion.increment(); + self.metrics.checkpoint.failed_checkpoint_count.increment(); + // Block until backup has space. 
+ match self.stream_system.backup_tx.send(oldest).await { + Ok(()) => { + self.metrics.messages_sent_to_backup.increment(); + None + } + Err(async_channel::SendError(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } + Err(async_channel::SendError(dm)) => { + // ingestion channel closed — return the message to the caller #[cfg(feature = "tracing")] - tracing::debug!( - sift_stream_id = %stream_id, - "data channel full, dropping oldest message" - ); + tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } - oldest_message.dropped_for_ingestion = true; + /// Used by `fn try_send`. If an oldest message is evicted from the ingestion + /// channel and backup is full or closed, returns the evicted message to the + /// caller. Also returns the message when the ingestion channel itself is closed. 
+ fn try_dispatch_to_ingestion( + &mut self, + stream_id: &Uuid, + data_msg: DataMessage, + ) -> Option { + match self.stream_system.ingestion_tx.force_send(data_msg) { + Ok(None) => None, + Ok(Some(mut oldest)) => { + oldest.dropped_for_ingestion = true; self.metrics.old_messages_dropped_for_ingestion.increment(); - self.metrics.messages_sent_to_backup.increment(); self.metrics.checkpoint.failed_checkpoint_count.increment(); + match self.stream_system.backup_tx.try_send(oldest) { + Ok(()) => { + self.metrics.messages_sent_to_backup.increment(); + None + } + Err(async_channel::TrySendError::Full(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + Err(async_channel::TrySendError::Closed(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } + Err(async_channel::SendError(dm)) => { + // ingestion channel closed — return the message to the caller + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } +} - // Re-send the oldest message to the backup to ensure it is re-ingested later despite being - // dropped from the ingestion channel. - // - // On failure, rely on metrics to track occurences. Logging can quickly become spammy as - // the system works through bursts of messages so logs are reduced to the debug level. 
- if let Err(e) = self - .stream_system - .backup_tx - .try_send(oldest_message) - .map_err(|e| Error::new(ErrorKind::StreamError, e)) - .context("failed to send data to backup task system") - { - self.metrics - .old_messages_failed_adding_to_backup - .increment(); +#[async_trait] +impl Transport for LiveStreaming { + type Encoder = IngestionConfigEncoder; + type Message = IngestWithConfigDataStreamRequest; - #[cfg(feature = "tracing")] - tracing::debug!( - sift_stream_id = %stream_id, - "failed to send oldest data to backup task system: {e}" - ); - } + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError> { + let data_msg = self.prepare_message(stream_id, message); - // Do not interupt ingestion. - Ok(()) + self.stream_system + .backup_tx + .send(data_msg.clone()) + .await + .map_err(|async_channel::SendError(dm)| { + SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + })?; + + self.metrics.messages_sent_to_backup.increment(); + if let Some(displaced) = self.dispatch_to_ingestion(stream_id, data_msg).await { + return Err(SendError(displaced)); + } + Ok(()) + } + + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError> { + let data_msg = self.prepare_message(stream_id, message); + + match self.stream_system.backup_tx.try_send(data_msg.clone()) { + Ok(()) => {} + Err(async_channel::TrySendError::Full(dm)) => { + return Err(TrySendError::Full( + Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()), + )); + } + Err(async_channel::TrySendError::Closed(dm)) => { + return Err(TrySendError::Closed( + Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()), + )); } - Err(e) => Err(Error::new_msg( - ErrorKind::StreamError, - format!("queueing data for ingestion failed: {e}"), - )), } + + self.metrics.messages_sent_to_backup.increment(); + if let Some(displaced) = self.try_dispatch_to_ingestion(stream_id, 
data_msg) { + return Err(TrySendError::Full(displaced)); + } + Ok(()) + } + + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send, + { + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + if let Err(SendError(failed)) = self.send(stream_id, msg).await { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(SendError(undelivered)); + } + } + Ok(()) } - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> where I: IntoIterator + Send, I::IntoIter: Send, { - for req in requests { - self.send(stream_id, req)?; + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + match self.try_send(stream_id, msg) { + Ok(()) => {} + Err(TrySendError::Full(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Full(undelivered)); + } + Err(TrySendError::Closed(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Closed(undelivered)); + } + } } Ok(()) } diff --git a/rust/crates/sift_stream/src/stream/send_error.rs b/rust/crates/sift_stream/src/stream/send_error.rs index 5ce317414..9fe65171b 100644 --- a/rust/crates/sift_stream/src/stream/send_error.rs +++ b/rust/crates/sift_stream/src/stream/send_error.rs @@ -1,3 +1,4 @@ +use sift_error::prelude::{Error as SiftError, ErrorKind}; use std::fmt; /// Returned by the async `Transport::send` / `Transport::send_requests` when @@ -61,6 +62,68 @@ impl fmt::Display for TrySendError { impl std::error::Error for TrySendError {} +/// Returned by [`SiftStream::send`] and [`SiftStream::try_send`] when encoding fails. 
+/// +/// Wraps either a [`sift_error::Error`] from the encode step or the undelivered +/// message when the backing channel is closed. +#[derive(Debug)] +pub enum SiftStreamSendError { + /// The message could not be encoded before it was sent. + EncodeError(SiftError), + /// The channel closed before the message could be delivered. + ChannelClosed(T), +} + +impl fmt::Display for SiftStreamSendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SiftStreamSendError::EncodeError(e) => write!(f, "encode error: {e}"), + SiftStreamSendError::ChannelClosed(_) => { + write!(f, "channel closed: failed to send message") + } + } + } +} + +impl std::error::Error for SiftStreamSendError {} + +impl SiftStreamSendError { + /// Convert an encode failure into this error type. Used internally. + pub(crate) fn encode_error(msg: &str) -> Self { + SiftStreamSendError::EncodeError(SiftError::new_msg(ErrorKind::EncodeMessageError, msg)) + } +} + +/// Returned by [`SiftStream::try_send`] when immediate delivery fails. +/// +/// Wraps either a [`sift_error::Error`] from the encode step or a +/// [`TrySendError`] from the backing channel. +#[derive(Debug)] +pub enum SiftStreamTrySendError { + /// The message could not be encoded before it was sent. + EncodeError(SiftError), + /// The backing channel was full or closed. + Channel(TrySendError), +} + +impl fmt::Display for SiftStreamTrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SiftStreamTrySendError::EncodeError(e) => write!(f, "encode error: {e}"), + SiftStreamTrySendError::Channel(e) => write!(f, "{e}"), + } + } +} + +impl std::error::Error for SiftStreamTrySendError {} + +impl SiftStreamTrySendError { + /// Convert an encode failure into this error type. Used internally. 
+ pub(crate) fn encode_error(msg: &str) -> Self { + SiftStreamTrySendError::EncodeError(SiftError::new_msg(ErrorKind::EncodeMessageError, msg)) + } +} + #[cfg(test)] mod tests { use super::*; diff --git a/rust/crates/sift_stream/src/stream/tasks.rs b/rust/crates/sift_stream/src/stream/tasks.rs index 64fd0d636..eda69919a 100644 --- a/rust/crates/sift_stream/src/stream/tasks.rs +++ b/rust/crates/sift_stream/src/stream/tasks.rs @@ -609,7 +609,9 @@ impl MetricsStreamingTask { let metrics = self.metrics.snapshot(); let values = metrics.channel_values(&self.session_name); let flow = Flow::new(METRICS_STREAMING_FLOW_NAME, TimeValue::now(), &values); - self.stream.send(flow).await?; + self.stream.send(flow).await.map_err(|e| { + Error::new_msg(ErrorKind::StreamError, e.to_string()) + })?; } ctrl_msg = self.control_rx.recv() => { match ctrl_msg { From 765c377cc83a231788696a9f47b3737f32587be3 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 12:13:49 -0500 Subject: [PATCH 3/9] Fix various stale comments/references --- rust/crates/sift_stream/examples/backups-only/main.rs | 4 ++-- rust/crates/sift_stream/examples/ingestion-tutorial/main.rs | 2 +- rust/crates/sift_stream/examples/quick-start/main.rs | 4 ++-- rust/crates/sift_stream/src/stream/test.rs | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/rust/crates/sift_stream/examples/backups-only/main.rs b/rust/crates/sift_stream/examples/backups-only/main.rs index ba7ee3cbd..78a72a0b3 100644 --- a/rust/crates/sift_stream/examples/backups-only/main.rs +++ b/rust/crates/sift_stream/examples/backups-only/main.rs @@ -102,7 +102,7 @@ async fn run() -> Result<(), Box> { tokio::time::sleep(Duration::from_millis(100)).await; } - // Next, stream telemetry to backup files using the [`SiftStream::send_requests_nonblocking`] method + // Next, stream telemetry to backup files using the [`SiftStream::try_send_requests`] method // and the [`FlowBuilder`] to build the flow. 
// // This approach is more performant, and also provides methods to set the channel value via @@ -116,7 +116,7 @@ async fn run() -> Result<(), Box> { let run_id = sift_stream.run().unwrap().run_id.clone(); for i in 0..360 { // Build the flow using the [`FlowBuilder`] and send it to - // Sift using the [`SiftStream::send_requests_nonblocking`] method. + // Sift using the [`SiftStream::try_send_requests`] method. let mut flow_builder = FlowBuilder::new(&descriptor); flow_builder.attach_run_id(&run_id); flow_builder diff --git a/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs b/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs index 3599e823e..0cc3f4fc0 100644 --- a/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs +++ b/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { // NOTE: This approach uses `Flow` and `SiftStream::send()` for ease of use, and will // provide acceptable performance for most users // In cases where additional performance is required, a separate, more performant method - // is also available that uses `FlowBuilder` and `SiftStream::send_requests_nonblocking` + // is also available that uses `FlowBuilder` and `SiftStream::try_send_requests` // See `examples/quick-start/` for an example using this alternate approach let start = std::time::Instant::now(); while start.elapsed() < INGEST_DURATION { diff --git a/rust/crates/sift_stream/examples/quick-start/main.rs b/rust/crates/sift_stream/examples/quick-start/main.rs index ff60597b3..b45c26b5b 100644 --- a/rust/crates/sift_stream/examples/quick-start/main.rs +++ b/rust/crates/sift_stream/examples/quick-start/main.rs @@ -93,7 +93,7 @@ async fn run() -> Result<(), Box> { tokio::time::sleep(Duration::from_millis(100)).await; } - // Next, stream telemetry to Sift using the [`SiftStream::send_requests_nonblocking`] method + // Next, stream telemetry to Sift using the [`SiftStream::try_send_requests`] method 
// and the [`FlowBuilder`] to build the flow. // // This approach is more performant, and also provides methods to set the channel value via @@ -107,7 +107,7 @@ async fn run() -> Result<(), Box> { let run_id = sift_stream.run().unwrap().run_id.clone(); for i in 0..360 { // Build the flow using the [`FlowBuilder`] and send it to - // Sift using the [`SiftStream::send_requests_nonblocking`] method. + // Sift using the [`SiftStream::try_send_requests`] method. let mut flow_builder = FlowBuilder::new(&descriptor); flow_builder.attach_run_id(&run_id); flow_builder diff --git a/rust/crates/sift_stream/src/stream/test.rs b/rust/crates/sift_stream/src/stream/test.rs index 8ceab97a0..841666917 100644 --- a/rust/crates/sift_stream/src/stream/test.rs +++ b/rust/crates/sift_stream/src/stream/test.rs @@ -449,7 +449,7 @@ async fn test_sift_stream_ingestion_and_backup_channels_fill_up() { // Send a burst of messages that will cause the ingestion and backup channels to fill up. // - // Since this test is running in single-threded mode, and `send_requests_nonblocking` is not async, + // Since this test is running in single-threaded mode, and `try_send_requests` is not async, // sending all the messages should occur before the background tasks have a chance to run // and create space. 
for data in 0..100 { From e46cb8594b64a954c696c6cb4d386ebb9dcf9a31 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 13:41:06 -0500 Subject: [PATCH 4/9] More tests --- .../src/stream/mode/file_backup.rs | 151 ++++++++++++++++++ .../src/stream/mode/ingestion_config.rs | 146 +++++++++++++++++ .../sift_stream/src/stream/send_error.rs | 64 ++++++++ rust/crates/sift_stream/src/stream/test.rs | 52 ++++++ 4 files changed, 413 insertions(+) diff --git a/rust/crates/sift_stream/src/stream/mode/file_backup.rs b/rust/crates/sift_stream/src/stream/mode/file_backup.rs index 43a4fbc88..556f3063f 100644 --- a/rust/crates/sift_stream/src/stream/mode/file_backup.rs +++ b/rust/crates/sift_stream/src/stream/mode/file_backup.rs @@ -1057,4 +1057,155 @@ mod tests { // Finish should succeed and flush data stream.finish().await.unwrap(); } + + // --- Transport send/try_send error-path tests --- + + /// Build a minimal FileBackup backed by a controlled channel. + /// The placeholder write task returns immediately and never reads from the + /// channel; the *caller* gets the Receiver and can withhold reads to make + /// the channel appear full, or drop it to make the channel appear closed. + fn make_file_backup_with_capacity( + cap: usize, + ) -> ( + FileBackup, + async_channel::Receiver>, + ) { + let (write_tx, write_rx) = async_channel::bounded(cap); + let (control_tx, _) = tokio::sync::broadcast::channel(10); + + let fb = FileBackup { + write_tx, + write_task: tokio::spawn(async { Ok(()) }), + control_tx, + metrics_streaming: None, + flows_seen: HashSet::new(), + metrics: Arc::new(SiftStreamMetrics::default()), + }; + + (fb, write_rx) + } + + #[tokio::test] + async fn test_file_backup_try_send_full_returns_full() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // First send fills the channel.
+ fb.try_send( + &stream_id, + create_test_request("flow1", &ingestion_config_id), + ) + .unwrap(); + + // Second send should fail with Full because the channel is now at capacity. + let req = create_test_request("flow2", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().flow, flow); + + drop(write_rx); + } + + #[tokio::test] + async fn test_file_backup_try_send_closed_returns_closed() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Closing the receiver makes every subsequent try_send return Closed. + drop(write_rx); + + let req = create_test_request("flow1", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_closed(), "expected Closed, got {err}"); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_file_backup_send_closed_returns_send_error() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + drop(write_rx); + + let req = create_test_request("flow1", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.send(&stream_id, req).await.unwrap_err(); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_file_backup_send_blocks_until_space_available() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Fill the channel so the next async send has to wait. + fb.try_send( + &stream_id, + create_test_request("flow1", &ingestion_config_id), + ) + .unwrap(); + + // Consumer reads after a short delay, freeing space for the blocked send. 
+ tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = write_rx.recv().await; + // Keep the receiver alive so the channel doesn't close. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + fb.send( + &stream_id, + create_test_request("flow2", &ingestion_config_id), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_file_backup_try_send_requests_returns_undelivered_on_full() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Fill the channel. + fb.try_send( + &stream_id, + create_test_request("flow0", &ingestion_config_id), + ) + .unwrap(); + + let reqs = vec![ + create_test_request("flow1", &ingestion_config_id), + create_test_request("flow2", &ingestion_config_id), + create_test_request("flow3", &ingestion_config_id), + ]; + let err = fb.try_send_requests(&stream_id, reqs).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().len(), 3); + + drop(write_rx); + } + + #[tokio::test] + async fn test_file_backup_send_requests_returns_undelivered_on_closed() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + drop(write_rx); + + let reqs = vec![ + create_test_request("flow1", &ingestion_config_id), + create_test_request("flow2", &ingestion_config_id), + create_test_request("flow3", &ingestion_config_id), + ]; + let err = fb.send_requests(&stream_id, reqs).await.unwrap_err(); + assert_eq!(err.into_inner().len(), 3); + } } diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index f06ce0ac4..dc96d80ce 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -620,6 
+620,152 @@ impl DataStream { } } +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::tasks::DataMessage; + use std::collections::HashSet; + + fn make_request() -> IngestWithConfigDataStreamRequest { + IngestWithConfigDataStreamRequest { + ingestion_config_id: uuid::Uuid::new_v4().to_string(), + flow: "test_flow".to_string(), + timestamp: None, + channel_values: vec![], + run_id: String::new(), + end_stream_on_validation_error: false, + organization_id: String::new(), + } + } + + fn make_live_streaming( + ingestion_capacity: usize, + backup_capacity: usize, + ) -> ( + LiveStreaming, + async_channel::Receiver, + async_channel::Receiver, + ) { + let (control_tx, _) = broadcast::channel(10); + let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity); + let (backup_tx, backup_rx) = async_channel::bounded(backup_capacity); + + let system = StreamSystem { + backup_manager: tokio::spawn(async { Ok(()) }), + ingestion: tokio::spawn(async { Ok(()) }), + reingestion: tokio::spawn(async { Ok(()) }), + metrics_streaming: None, + control_tx, + ingestion_tx, + backup_tx, + }; + + let live = LiveStreaming { + message_id_counter: 0, + stream_system: system, + flows_seen: HashSet::new(), + metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()), + }; + + (live, ingestion_rx, backup_rx) + } + + #[tokio::test] + async fn test_try_send_backup_closed_returns_closed() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_closed(), "expected Closed, got {err}"); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_try_send_backup_full_returns_full() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + // Pre-fill the backup channel so the next try_send finds it full. 
+ let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().flow, flow); + drop(backup_rx); + } + + #[tokio::test] + async fn test_send_backup_closed_returns_send_error() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.send(&stream_id, req).await.unwrap_err(); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_send_blocks_until_backup_space_available() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + // Fill the backup channel so send will have to wait. + let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + // After a short delay, consume the dummy so send can proceed. + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = backup_rx.recv().await; + // Keep the receiver alive so the channel stays open. 
+ tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + let stream_id = uuid::Uuid::new_v4(); + live.send(&stream_id, make_request()).await.unwrap(); + } + + #[tokio::test] + async fn test_try_send_requests_returns_undelivered_on_full() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + let stream_id = uuid::Uuid::new_v4(); + let reqs = vec![make_request(), make_request(), make_request()]; + let err = live.try_send_requests(&stream_id, reqs).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().len(), 3); + drop(backup_rx); + } + + #[tokio::test] + async fn test_send_requests_returns_undelivered_on_closed() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + + let stream_id = uuid::Uuid::new_v4(); + let reqs = vec![make_request(), make_request(), make_request()]; + let err = live.send_requests(&stream_id, reqs).await.unwrap_err(); + assert_eq!(err.into_inner().len(), 3); + } +} + impl Stream for DataStream { type Item = IngestWithConfigDataStreamRequest; diff --git a/rust/crates/sift_stream/src/stream/send_error.rs b/rust/crates/sift_stream/src/stream/send_error.rs index 9fe65171b..f56651f30 100644 --- a/rust/crates/sift_stream/src/stream/send_error.rs +++ b/rust/crates/sift_stream/src/stream/send_error.rs @@ -199,4 +199,68 @@ mod tests { assert!(format!("{:?}", closed).contains("Closed")); assert!(format!("{:?}", full).contains("Full")); } + + // SiftStreamSendError tests + + #[test] + fn sift_stream_send_error_encode_error_display() { + let err = SiftStreamSendError::::encode_error("bad encoding"); + assert!(err.to_string().contains("encode error")); + } + + #[test] + fn sift_stream_send_error_channel_closed_display() { + let err = 
SiftStreamSendError::ChannelClosed(42u32); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn sift_stream_send_error_is_error() { + fn assert_error(_: &E) {} + let err = SiftStreamSendError::ChannelClosed(0u8); + assert_error(&err); + } + + #[test] + fn sift_stream_send_error_debug() { + let err = SiftStreamSendError::ChannelClosed(42u32); + assert!(format!("{:?}", err).contains("ChannelClosed")); + let err2 = SiftStreamSendError::::encode_error("oops"); + assert!(format!("{:?}", err2).contains("EncodeError")); + } + + // SiftStreamTrySendError tests + + #[test] + fn sift_stream_try_send_error_encode_error_display() { + let err = SiftStreamTrySendError::::encode_error("bad"); + assert!(err.to_string().contains("encode error")); + } + + #[test] + fn sift_stream_try_send_error_channel_full_display() { + let err = SiftStreamTrySendError::Channel(TrySendError::Full(42u32)); + assert!(err.to_string().contains("channel full")); + } + + #[test] + fn sift_stream_try_send_error_channel_closed_display() { + let err = SiftStreamTrySendError::Channel(TrySendError::Closed(42u32)); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn sift_stream_try_send_error_is_error() { + fn assert_error(_: &E) {} + let err = SiftStreamTrySendError::Channel(TrySendError::Closed(0u8)); + assert_error(&err); + } + + #[test] + fn sift_stream_try_send_error_debug() { + let err = SiftStreamTrySendError::Channel(TrySendError::Full(42u32)); + assert!(format!("{:?}", err).contains("Full")); + let err2 = SiftStreamTrySendError::::encode_error("oops"); + assert!(format!("{:?}", err2).contains("EncodeError")); + } } diff --git a/rust/crates/sift_stream/src/stream/test.rs b/rust/crates/sift_stream/src/stream/test.rs index 841666917..d5666c8e3 100644 --- a/rust/crates/sift_stream/src/stream/test.rs +++ b/rust/crates/sift_stream/src/stream/test.rs @@ -401,6 +401,58 @@ async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() { 
assert_eq!(flows.len(), 2); } +#[tokio::test] +async fn test_sift_stream_try_send_smoke() { + let backups_dir = uuid::Uuid::new_v4().to_string(); + let tmp_dir = TempDir::new(&backups_dir).expect("failed to create tempdir"); + let tmp_dir_path = tmp_dir.path(); + + let ingestion_config = IngestionConfigForm { + asset_name: "test_asset".to_string(), + client_key: "test_client_key_try_send".to_string(), + flows: vec![crate::FlowConfig { + name: "try_send_flow".to_string(), + channels: vec![crate::ChannelConfig { + name: "value".to_string(), + data_type: sift_rs::common::r#type::v1::ChannelDataType::Double.into(), + ..Default::default() + }], + }], + }; + + let disk_backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + ..Default::default() + }; + let retry_policy = crate::RetryPolicy::default(); + let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await; + + let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryWithBackups { + retry_policy, + disk_backup_policy, + }) + .metrics_streaming_interval(None) + .build() + .await + .expect("failed to build sift stream"); + + // try_send should succeed when the channel has room. 
+ let result = sift_stream.try_send(crate::Flow::new( + "try_send_flow", + crate::TimeValue::now(), + &[crate::ChannelValue::new("value", 1.0_f64)], + )); + assert!(result.is_ok(), "try_send failed: {result:?}"); + + tokio::time::timeout(Duration::from_secs(10), async { + sift_stream.finish().await.expect("failed to finish"); + }) + .await + .expect("timeout waiting for finish"); +} + #[tokio::test(flavor = "current_thread")] async fn test_sift_stream_ingestion_and_backup_channels_fill_up() { let backups_dir = uuid::Uuid::new_v4().to_string(); From ea584eb02625ba4314ffbba1896eb3aa8221a846 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 13:53:30 -0500 Subject: [PATCH 5/9] Docs and lints --- .../src/backup/disk/async_manager.rs | 2 +- .../src/backup/disk/file_writer.rs | 10 +- rust/crates/sift_stream/src/lib.rs | 81 ++++++++++ .../src/stream/builder/config_loader.rs | 2 - rust/crates/sift_stream/src/stream/mod.rs | 150 +++++++++++++++--- .../src/stream/mode/ingestion_config.rs | 96 +++++------ .../sift_stream/src/stream/mode/test.rs | 3 - .../sift_stream/src/stream/send_error.rs | 69 ++++++-- rust/crates/sift_stream/src/test.rs | 2 +- rust/crates/sift_stream/tests/common/mod.rs | 2 +- ...test_ingestion_config_streaming_retries.rs | 2 +- 11 files changed, 319 insertions(+), 100 deletions(-) diff --git a/rust/crates/sift_stream/src/backup/disk/async_manager.rs b/rust/crates/sift_stream/src/backup/disk/async_manager.rs index 031d65e58..e7442a05f 100644 --- a/rust/crates/sift_stream/src/backup/disk/async_manager.rs +++ b/rust/crates/sift_stream/src/backup/disk/async_manager.rs @@ -1122,7 +1122,7 @@ mod test { // The backup file should have been rotated. 
assert!( - backup_manager.file_ctx_buffer.len() > 0, + !backup_manager.file_ctx_buffer.is_empty(), "backup files should be present" ); assert!( diff --git a/rust/crates/sift_stream/src/backup/disk/file_writer.rs b/rust/crates/sift_stream/src/backup/disk/file_writer.rs index 981fe4158..094e66c3a 100644 --- a/rust/crates/sift_stream/src/backup/disk/file_writer.rs +++ b/rust/crates/sift_stream/src/backup/disk/file_writer.rs @@ -452,16 +452,16 @@ mod tests { } } - if writer.should_rotate_file() { - if let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") { - rotated_files.push(ctx.file_path); - } + if writer.should_rotate_file() + && let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") + { + rotated_files.push(ctx.file_path); } } // Should have created multiple files if !rotated_files.is_empty() { - assert!(rotated_files.len() > 0); + assert!(!rotated_files.is_empty()); for file_path in &rotated_files { assert!(file_path.exists()); } diff --git a/rust/crates/sift_stream/src/lib.rs b/rust/crates/sift_stream/src/lib.rs index 73d13ac71..d4ff10411 100644 --- a/rust/crates/sift_stream/src/lib.rs +++ b/rust/crates/sift_stream/src/lib.rs @@ -237,6 +237,87 @@ //! //! Anything that falls outside of that will require changing the client-key. //! +//! ## Sending Telemetry +//! +//! [`SiftStream`] exposes four methods for delivering telemetry. They differ only in whether +//! they apply backpressure (blocking) or return immediately (non-blocking), and in whether +//! they accept a high-level [`Flow`] or pre-encoded raw requests: +//! +//! | Method | Blocks? | Input | +//! |---|---|---| +//! | [`send`](stream::SiftStream::send) | Yes | [`Flow`] or any [`Encodeable`](stream::Encodeable) | +//! | [`send_requests`](stream::SiftStream::send_requests) | Yes | Pre-encoded requests | +//! | [`try_send`](stream::SiftStream::try_send) | No | [`Flow`] or any [`Encodeable`](stream::Encodeable) | +//! 
| [`try_send_requests`](stream::SiftStream::try_send_requests) | No | Pre-encoded requests | +//! +//! ### Backpressure with `send` +//! +//! [`send`](stream::SiftStream::send) awaits until the backing channel has capacity, then +//! delivers the message. Use this when you want the producer to slow down naturally when +//! the pipeline is under load — the simplest and most common choice. +//! +//! ```ignore +//! // Awaits until the channel has room; backpressure is applied automatically. +//! sift_stream.send(Flow::new( +//! "robotic-arm", +//! TimeValue::now(), +//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)], +//! )).await?; +//! ``` +//! +//! On error, [`SiftStreamSendError`] is returned. Match on the +//! [`ChannelClosed`](stream::SiftStreamSendError::ChannelClosed) variant to recover the +//! undelivered message. +//! +//! ### Non-blocking sends with `try_send` +//! +//! [`try_send`](stream::SiftStream::try_send) returns immediately regardless of channel +//! state. A full or closed channel is reported as [`SiftStreamTrySendError::Channel`] +//! wrapping [`TrySendError::Full`] or [`TrySendError::Closed`], each carrying the message. +//! Use this in tight loops or real-time contexts where blocking even briefly is unacceptable. +//! +//! ```ignore +//! match sift_stream.try_send(Flow::new( +//! "robotic-arm", +//! TimeValue::now(), +//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)], +//! )) { +//! Ok(()) => {} +//! Err(SiftStreamTrySendError::Channel(TrySendError::Full(msg))) => { +//! // Channel is busy — drop this sample or buffer it for later. +//! drop(msg); +//! } +//! Err(e) => return Err(e.into()), +//! } +//! ``` +//! +//! ### Pre-encoded batch sends +//! +//! [`send_requests`](stream::SiftStream::send_requests) and +//! [`try_send_requests`](stream::SiftStream::try_send_requests) accept pre-encoded +//! [`IngestWithConfigDataStreamRequest`](sift_rs::ingest::v1::IngestWithConfigDataStreamRequest) +//! values built with [`FlowBuilder`].
This skips the per-call encoding step and is the +//! highest-throughput option. +//! +//! ```ignore +//! let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap(); +//! let run_id = sift_stream.run().unwrap().run_id.clone(); +//! +//! let mut builder = FlowBuilder::new(&descriptor); +//! builder.attach_run_id(&run_id); +//! builder.set_with_key("joint-angle-encoder", 7.2_f64).unwrap(); +//! +//! // Blocking batch send with backpressure: +//! sift_stream.send_requests(vec![builder.request(TimeValue::now())]).await?; +//! +//! // Non-blocking batch send: +//! sift_stream.try_send_requests(vec![builder.request(TimeValue::now())])?; +//! ``` +//! +//! On the first failure, `send_requests` / `try_send_requests` stop iterating and return +//! **all** undelivered messages — the failing one plus any not yet attempted — inside the +//! error so nothing is silently dropped. +//! //! ## Retry Policy //! //! At the time of writing this crate, [tonic](https://docs.rs/tonic/latest/tonic/) diff --git a/rust/crates/sift_stream/src/stream/builder/config_loader.rs b/rust/crates/sift_stream/src/stream/builder/config_loader.rs index 0908b49e3..0e3f3514a 100644 --- a/rust/crates/sift_stream/src/stream/builder/config_loader.rs +++ b/rust/crates/sift_stream/src/stream/builder/config_loader.rs @@ -253,7 +253,6 @@ mod tests { ..Default::default() }, ], - ..Default::default() } } @@ -373,7 +372,6 @@ mod tests { data_type: ChannelDataType::Double.into(), ..Default::default() }], - ..Default::default() }]; let form = create_test_ingestion_config_form(asset_name, client_key, existing_flows); diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index 5660b2615..3b6a45542 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -69,12 +69,40 @@ pub trait Encoder: private::Sealed { type Message: Send + Sync; } -/// A trait that defines how data is transmitted, or streamed. 
+/// Defines how encoded telemetry messages are delivered to their destination. /// -/// For example, a live streaming implementation might use a -/// gRPC stream to transmit data in real-time to Sift, while -/// an alternative implementation might write data to a file -/// for a more "offline" use-case. +/// Two concrete implementations are provided: +/// +/// - [`LiveStreaming`](crate::LiveStreaming) — delivers messages via a gRPC stream to Sift in +/// real-time, with optional disk backups and periodic checkpointing. +/// - [`FileBackup`](crate::FileBackup) — writes messages to rolling disk backup files without +/// connecting to Sift. +/// +/// ## Send API +/// +/// Each implementation exposes four send methods that differ in their backpressure behaviour: +/// +/// | Method | Blocks? | Error on failure | +/// |---|---|---| +/// | [`send`](Transport::send) | Yes — awaits until the channel has capacity | [`SendError`] with the undelivered message | +/// | [`send_requests`](Transport::send_requests) | Yes — per-message backpressure | [`SendError>`] with all undelivered messages | +/// | [`try_send`](Transport::try_send) | No — returns immediately | [`TrySendError`] as `Full(T)` or `Closed(T)` | +/// | [`try_send_requests`](Transport::try_send_requests) | No — fails on first undeliverable message | [`TrySendError>`] with all undelivered | +/// +/// In every failure case the undelivered message(s) are returned inside the error variant so +/// that the caller can decide whether to retry, log, buffer locally, or discard them. +/// +/// ## Channel semantics for `LiveStreaming` +/// +/// `LiveStreaming` maintains two internal bounded channels: +/// +/// - **backup channel** — the primary durability path. All four send methods operate on this +/// channel first. Errors reported by the send API always reflect the state of this channel. +/// - **ingestion channel** — forwards messages to the gRPC task. 
This channel uses a +/// *force-send* strategy: if it is full, the oldest in-flight message is evicted to make +/// room. It never blocks and never returns `Full`. +/// +/// This trait is sealed: only implementations within this crate are permitted. #[async_trait] pub trait Transport: private::Sealed { type Message: Send + Sync; @@ -82,8 +110,12 @@ pub trait Transport: private::Sealed { /// Send a single message with backpressure. /// - /// Awaits until the channel has capacity. Returns [`SendError`] - /// containing the undelivered message if the channel is closed. + /// Awaits until the backing channel has capacity, then delivers the message. + /// + /// # Errors + /// + /// Returns [`SendError`] containing the undelivered message if the + /// backing channel is closed before delivery completes. async fn send( &mut self, stream_id: &Uuid, @@ -92,9 +124,13 @@ pub trait Transport: private::Sealed { /// Send a batch of messages with backpressure. /// - /// Awaits for each message in turn. Returns [`SendError>`] - /// containing all undelivered messages (the failed one plus any not yet attempted) - /// if the channel is closed mid-iteration. + /// Awaits channel capacity for each message in turn. Stops on the first failure and + /// returns the failed message together with all remaining (not-yet-attempted) messages. + /// + /// # Errors + /// + /// Returns [`SendError>`] containing every message that was not + /// delivered: the failing message followed by any that had not yet been attempted. async fn send_requests( &mut self, stream_id: &Uuid, @@ -106,8 +142,14 @@ pub trait Transport: private::Sealed { /// Attempt to send a single message without blocking. /// - /// Returns [`TrySendError`] with the undelivered message if - /// the channel is full or closed. + /// Returns immediately regardless of whether the channel has capacity. 
+ /// + /// # Errors + /// + /// Returns [`TrySendError`] with the undelivered message: + /// - [`TrySendError::Full`] — the channel is at capacity; consider retrying with + /// [`send`](Transport::send) to apply backpressure instead. + /// - [`TrySendError::Closed`] — the channel has been closed. fn try_send( &mut self, stream_id: &Uuid, @@ -116,8 +158,15 @@ pub trait Transport: private::Sealed { /// Attempt to send a batch of messages without blocking. /// - /// Returns [`TrySendError>`] with all undelivered messages - /// (the failed one plus any not yet attempted) on first failure. + /// Calls [`try_send`](Transport::try_send) for each message in turn. Returns immediately + /// on the first failure, bundling the failed message with any remaining unprocessed + /// messages. + /// + /// # Errors + /// + /// Returns [`TrySendError>`] with every message that was not delivered: + /// - [`TrySendError::Full`] — the channel was at capacity for one of the messages. + /// - [`TrySendError::Closed`] — the channel was closed. fn try_send_requests( &mut self, stream_id: &Uuid, @@ -127,7 +176,10 @@ pub trait Transport: private::Sealed { I: IntoIterator + Send, I::IntoIter: Send; - /// Finish the stream. The mode implementation handles the actual cleanup logic. + /// Flush any remaining messages and cleanly shut down the transport. + /// + /// Must be called when ingestion is complete. Dropping a [`SiftStream`] without + /// calling `finish` may result in tail-end data not reaching Sift. async fn finish(self, stream_id: &Uuid) -> Result<()>; } @@ -185,11 +237,26 @@ where self.run.as_ref() } - /// Send telemetry with backpressure. Awaits until the channel has capacity. + /// Send telemetry with backpressure. + /// + /// Encodes `message` and then awaits until the backing channel has capacity. For + /// [`LiveStreaming`](crate::LiveStreaming) this is the **backup channel**; for + /// [`FileBackup`](crate::FileBackup) this is the write channel. 
+ /// + /// Use this method when you want the caller to slow down naturally when the pipeline + /// is under load. For a non-blocking alternative see [`try_send`](SiftStream::try_send). /// - /// Returns [`SiftStreamSendError::EncodeError`] if the message cannot be encoded, or - /// [`SiftStreamSendError::ChannelClosed`] (with the undelivered message) if the - /// backing channel closes before delivery completes. + /// # Errors + /// + /// - [`SiftStreamSendError::EncodeError`] — the message could not be encoded. This + /// indicates a schema mismatch or invalid value and is not recoverable by retrying. + /// - [`SiftStreamSendError::ChannelClosed`] — the backing channel was closed before the + /// message could be delivered. The undelivered message is returned inside the variant. + /// + /// # Cancellation safety + /// + /// If the returned future is dropped while waiting for channel capacity, the channel slot is + /// never taken, but the message moved into the future is dropped with it and not returned. pub async fn send( &mut self, message: M, @@ -209,8 +276,17 @@ where /// Send a batch of pre-encoded requests with backpressure. /// - /// Awaits for each message in turn. Returns [`SendError`] containing all - /// undelivered messages if the channel closes mid-iteration. + /// Awaits channel capacity for each request in turn. Stops on the first failure and + /// returns all undelivered messages (the failing one plus any not yet attempted). + /// + /// Unlike [`send`](SiftStream::send), this method accepts pre-encoded + /// [`Transport::Message`](crate::stream::Transport::Message) values directly, bypassing + /// the encode step. Use [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum + /// performance. + /// + /// # Errors + /// + /// [`SendError>`] containing every message that was not delivered. pub async fn send_requests( &mut self, requests: I, @@ -226,9 +302,21 @@ where /// Attempt to send telemetry without blocking.
/// - /// Returns [`SiftStreamTrySendError::EncodeError`] if the message cannot be encoded, or - /// [`SiftStreamTrySendError::Channel`] (with the undelivered message) if the channel - /// is full or closed. + /// Encodes `message` and immediately attempts to place it on the backing channel. Returns + /// at once regardless of whether the channel has capacity. + /// + /// Use this method in tight loops or real-time contexts where blocking is unacceptable. + /// For backpressure-aware sending see [`send`](SiftStream::send). + /// + /// # Errors + /// + /// - [`SiftStreamTrySendError::EncodeError`] — the message could not be encoded. + /// - [`SiftStreamTrySendError::Channel`] wrapping one of: + /// - [`TrySendError::Full`] — the backing channel is at capacity; the undelivered + /// message is returned. Consider switching to [`send`](SiftStream::send) to apply + /// backpressure, or retrying after a short delay. + /// - [`TrySendError::Closed`] — the backing channel has been closed; the undelivered + /// message is returned. pub fn try_send( &mut self, message: M, @@ -247,7 +335,19 @@ where /// Attempt to send a batch of pre-encoded requests without blocking. /// - /// Returns [`TrySendError`] with all undelivered messages on first failure. + /// Calls `try_send` on the backing channel for each request. Returns immediately on + /// the first failure with every undelivered message (the failing one plus any not yet + /// attempted). + /// + /// Unlike [`try_send`](SiftStream::try_send), this method accepts pre-encoded + /// [`Transport::Message`](crate::stream::Transport::Message) values directly. Use + /// [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum performance. + /// + /// # Errors + /// + /// [`TrySendError>`] containing every message that was not delivered: + /// - [`TrySendError::Full`] — the backing channel was at capacity. + /// - [`TrySendError::Closed`] — the backing channel was closed. 
pub fn try_send_requests( &mut self, requests: I, diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index dc96d80ce..0cdd9b757 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -620,6 +620,54 @@ impl DataStream { } } +impl Stream for DataStream { + type Item = IngestWithConfigDataStreamRequest; + + fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { + // Close the stream if a checkpoint complete signal is received. + if matches!( + self.control_rx.as_mut().poll_next(ctx), + Poll::Ready(Some(Ok(ControlMessage::SignalNextCheckpoint))) + ) { + return Poll::Ready(None); + } + + // Continue with data streaming. + match self.data_rx.as_mut().poll_next(ctx) { + Poll::Ready(Some(DataMessage { + message_id, + request, + .. + })) => { + if !self.saw_first_message { + self.saw_first_message = true; + self.first_message_id.store(message_id, Ordering::Relaxed); + } + self.last_message_id.store(message_id, Ordering::Relaxed); + + let message_size = request.encoded_len() as u64; + self.metrics.messages_sent.increment(); + self.metrics.checkpoint.cur_messages_sent.increment(); + self.metrics.bytes_sent.add(message_size); + self.metrics.checkpoint.cur_bytes_sent.add(message_size); + + // NOTE: This will copy the request which can be expensive. + Poll::Ready(Some((*request).clone())) + } + Poll::Ready(None) => { + // All senders dropped.. 
conclude stream + #[cfg(feature = "tracing")] + tracing::debug!( + sift_stream_id = %self.sift_stream_id, + "received signal to conclude SiftStream" + ); + Poll::Ready(None) + } + Poll::Pending => Poll::Pending, + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -765,51 +813,3 @@ mod tests { assert_eq!(err.into_inner().len(), 3); } } - -impl Stream for DataStream { - type Item = IngestWithConfigDataStreamRequest; - - fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll> { - // Close the stream if a checkpoint complete signal is received. - if matches!( - self.control_rx.as_mut().poll_next(ctx), - Poll::Ready(Some(Ok(ControlMessage::SignalNextCheckpoint))) - ) { - return Poll::Ready(None); - } - - // Continue with data streaming. - match self.data_rx.as_mut().poll_next(ctx) { - Poll::Ready(Some(DataMessage { - message_id, - request, - .. - })) => { - if !self.saw_first_message { - self.saw_first_message = true; - self.first_message_id.store(message_id, Ordering::Relaxed); - } - self.last_message_id.store(message_id, Ordering::Relaxed); - - let message_size = request.encoded_len() as u64; - self.metrics.messages_sent.increment(); - self.metrics.checkpoint.cur_messages_sent.increment(); - self.metrics.bytes_sent.add(message_size); - self.metrics.checkpoint.cur_bytes_sent.add(message_size); - - // NOTE: This will copy the request which can be expensive. - Poll::Ready(Some((*request).clone())) - } - Poll::Ready(None) => { - // All senders dropped.. 
conclude stream - #[cfg(feature = "tracing")] - tracing::debug!( - sift_stream_id = %self.sift_stream_id, - "received signal to conclude SiftStream" - ); - Poll::Ready(None) - } - Poll::Pending => Poll::Pending, - } - } -} diff --git a/rust/crates/sift_stream/src/stream/mode/test.rs b/rust/crates/sift_stream/src/stream/mode/test.rs index 432f58e5c..477bfd4e7 100644 --- a/rust/crates/sift_stream/src/stream/mode/test.rs +++ b/rust/crates/sift_stream/src/stream/mode/test.rs @@ -34,7 +34,6 @@ fn validate_handling_empty_values() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); @@ -98,7 +97,6 @@ fn validate_handling_no_matches_based_on_name() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); @@ -146,7 +144,6 @@ fn validate_handling_no_matches_based_on_type() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); diff --git a/rust/crates/sift_stream/src/stream/send_error.rs b/rust/crates/sift_stream/src/stream/send_error.rs index f56651f30..43f0f0b95 100644 --- a/rust/crates/sift_stream/src/stream/send_error.rs +++ b/rust/crates/sift_stream/src/stream/send_error.rs @@ -1,11 +1,21 @@ use sift_error::prelude::{Error as SiftError, ErrorKind}; use std::fmt; -/// Returned by the async `Transport::send` / `Transport::send_requests` when -/// the underlying channel is closed and delivery cannot complete. +/// Returned by the async [`Transport::send`](crate::stream::Transport::send) / +/// [`Transport::send_requests`](crate::stream::Transport::send_requests) when the underlying +/// channel is closed and delivery cannot complete. 
/// -/// The inner value `T` is the undelivered message so the caller can decide -/// what to do with it (e.g. log, retry, discard). +/// The inner value `T` is the undelivered message. Typical recovery strategies: +/// +/// - **Log and discard** — if losing the message is acceptable (e.g. high-frequency sensor +/// data where the next sample will arrive momentarily). +/// - **Buffer and retry** — store the message locally and re-attempt once the channel is +/// re-established. +/// - **Propagate as an error** — call [`into_inner`](SendError::into_inner) to recover the +/// message and return it up the call stack for the application to decide. +/// +/// A closed channel usually means the [`SiftStream`](crate::SiftStream) is shutting down. +/// Check that [`SiftStream::finish`](crate::SiftStream::finish) has not already been called. #[derive(Debug)] pub struct SendError(pub T); @@ -24,8 +34,23 @@ impl fmt::Display for SendError { impl std::error::Error for SendError {} -/// Returned by the sync `Transport::try_send` / `Transport::try_send_requests` -/// when immediate delivery fails. +/// Returned by the sync [`Transport::try_send`](crate::stream::Transport::try_send) / +/// [`Transport::try_send_requests`](crate::stream::Transport::try_send_requests) when +/// immediate delivery fails. +/// +/// The undelivered value `T` is always returned inside the variant so the caller can +/// recover it without cloning. +/// +/// ## Variants and recovery +/// +/// - [`Full`](TrySendError::Full) — the channel is currently at capacity. The message has +/// *not* been dropped; the caller can retry later with another `try_send` call, switch to +/// the backpressure-aware [`send`](crate::SiftStream::send), or discard the message if +/// it is stale. +/// +/// - [`Closed`](TrySendError::Closed) — all channel receivers have been dropped. The +/// [`SiftStream`](crate::SiftStream) is shutting down. Retrying on the same stream will +/// not succeed. 
#[derive(Debug)] pub enum TrySendError { /// The channel has been closed. The undelivered value is returned. @@ -62,15 +87,23 @@ impl fmt::Display for TrySendError { impl std::error::Error for TrySendError {} -/// Returned by [`SiftStream::send`] and [`SiftStream::try_send`] when encoding fails. +/// Returned by [`SiftStream::send`](crate::SiftStream::send) when delivery fails. /// -/// Wraps either a [`sift_error::Error`] from the encode step or the undelivered -/// message when the backing channel is closed. +/// This is the top-level error type for the high-level send API. It distinguishes +/// encode-time failures from channel-level failures so callers can handle them +/// differently. #[derive(Debug)] pub enum SiftStreamSendError { /// The message could not be encoded before it was sent. + /// + /// This typically indicates a schema mismatch or an invalid value in the message. + /// Retrying the same message without correcting the schema will not succeed. EncodeError(SiftError), - /// The channel closed before the message could be delivered. + + /// The backing channel closed before the message could be delivered. + /// + /// The undelivered message is returned inside the variant. See the recovery + /// guidance on [`SendError`] for options. ChannelClosed(T), } @@ -94,15 +127,25 @@ impl SiftStreamSendError { } } -/// Returned by [`SiftStream::try_send`] when immediate delivery fails. +/// Returned by [`SiftStream::try_send`](crate::SiftStream::try_send) when immediate +/// delivery fails. /// -/// Wraps either a [`sift_error::Error`] from the encode step or a -/// [`TrySendError`] from the backing channel. +/// This is the top-level error type for the non-blocking send API. It distinguishes +/// encode-time failures from channel-level failures so callers can handle them +/// differently. #[derive(Debug)] pub enum SiftStreamTrySendError { /// The message could not be encoded before it was sent. 
+ /// + /// This typically indicates a schema mismatch or an invalid value in the message. + /// Retrying the same message without correcting the schema will not succeed. EncodeError(SiftError), + /// The backing channel was full or closed. + /// + /// The inner [`TrySendError`] carries the undelivered message. Inspect the variant + /// to distinguish between a transient backpressure condition (`Full`) and a permanent + /// shutdown (`Closed`). See [`TrySendError`] for recovery guidance. Channel(TrySendError), } diff --git a/rust/crates/sift_stream/src/test.rs b/rust/crates/sift_stream/src/test.rs index a3935c3cb..9216d440e 100644 --- a/rust/crates/sift_stream/src/test.rs +++ b/rust/crates/sift_stream/src/test.rs @@ -138,7 +138,7 @@ impl IngestionConfigService for MockIngestionConfigService { } Ok(Response::new(ListIngestionConfigsResponse { - ingestion_configs: ingestion_configs, + ingestion_configs, next_page_token: "".to_string(), })) } diff --git a/rust/crates/sift_stream/tests/common/mod.rs b/rust/crates/sift_stream/tests/common/mod.rs index daab49f42..b6d661c4a 100644 --- a/rust/crates/sift_stream/tests/common/mod.rs +++ b/rust/crates/sift_stream/tests/common/mod.rs @@ -136,7 +136,7 @@ impl IngestionConfigService for MockIngestionConfigService { } Ok(Response::new(ListIngestionConfigsResponse { - ingestion_configs: ingestion_configs, + ingestion_configs, next_page_token: "".to_string(), })) } diff --git a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs index 64635cf0f..2a1fd1303 100644 --- a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs +++ b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs @@ -107,7 +107,7 @@ async fn test_retries_succeed() { // Send some messages while the server is returning errors. // // None of these messages should be captured by the mock server. 
- while num_streams_opened.load(Ordering::Relaxed) < 3 as u32 { + while num_streams_opened.load(Ordering::Relaxed) < 3_u32 { let msg = Flow::new( "flow", TimeValue::from(Local::now().to_utc()), From ac42750c90b4c9690dcf6026f2b7088040e6087e Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 14:13:57 -0500 Subject: [PATCH 6/9] Python updates --- python/docs/examples/ingestion.ipynb | 219 +----------------- .../_internal/low_level_wrappers/ingestion.py | 4 +- python/lib/sift_client/resources/ingestion.py | 19 +- .../sift_stream_bindings.pyi | 2 +- .../sift_stream_bindings/src/stream/mod.rs | 29 ++- 5 files changed, 34 insertions(+), 239 deletions(-) diff --git a/python/docs/examples/ingestion.ipynb b/python/docs/examples/ingestion.ipynb index 669a3decc..7a925bbb3 100644 --- a/python/docs/examples/ingestion.ipynb +++ b/python/docs/examples/ingestion.ipynb @@ -121,20 +121,7 @@ "cell_type": "markdown", "id": "c5bf10f3", "metadata": {}, - "source": [ - "## 2. Advanced FlowBuilderPy Usage\n", - "\n", - "This example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:\n", - "- Get `FlowDescriptorPy` using `get_flow_descriptor()`\n", - "- Retrieve the run ID from SiftStream using `get_run_id()`\n", - "- Create `FlowBuilderPy` from the descriptor\n", - "- Set the run ID directly on the flow builder using `attach_run_id()`\n", - "- Use channel indices from the descriptor mapping to avoid hash operations\n", - "- Use `set()` with channel indices instead of `set_with_key()` for maximum performance\n", - "- Build the request and send using `send_requests()` or `send_requests_nonblocking()`\n", - "\n", - "**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. 
It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance.\n" - ] + "source": "## 2. Advanced FlowBuilderPy Usage\n\nThis example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:\n- Get `FlowDescriptorPy` using `get_flow_descriptor()`\n- Retrieve the run ID from SiftStream using `get_run_id()`\n- Create `FlowBuilderPy` from the descriptor\n- Set the run ID directly on the flow builder using `attach_run_id()`\n- Use channel indices from the descriptor mapping to avoid hash operations\n- Use `set()` with channel indices instead of `set_with_key()` for maximum performance\n- Build the request and send using `send_requests()` or `try_send_requests()`\n\n**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance." 
}, { "cell_type": "code", @@ -142,120 +129,13 @@ "id": "98aec10b", "metadata": {}, "outputs": [], - "source": [ - "async def advanced_flowbuilder_example():\n", - " \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n", - " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", - "\n", - " connection_config = SiftConnectionConfig(\n", - " api_key=\"my_api_key\",\n", - " grpc_url=\"sift_grpc_url\",\n", - " rest_url=\"sift_rest_url\",\n", - " )\n", - "\n", - " client = SiftClient(connection_config=connection_config)\n", - "\n", - " ingestion_config = IngestionConfigCreate(\n", - " asset_name=\"sift_rover_1\",\n", - " flows=[\n", - " FlowConfig(\n", - " name=\"onboard_sensors\",\n", - " channels=[\n", - " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", - " ChannelConfig(\n", - " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", - " ),\n", - " ],\n", - " )\n", - " ],\n", - " )\n", - "\n", - " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", - "\n", - " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", - " ingestion_config=ingestion_config,\n", - " run=run,\n", - " ) as ingest_client:\n", - " # Get the flow descriptor and run ID from SiftStream\n", - " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", - " run_id = ingest_client.get_run_id()\n", - "\n", - " if run_id is None:\n", - " raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n", - "\n", - " # Get the mapping from channel names to ChannelIndexPy\n", - " # This allows us to avoid hash lookups by using indices directly\n", - " channel_index_map = descriptor.mapping()\n", - "\n", - " # Pre-compute channel indices and value conversion methods\n", - " # This creates a list of (ChannelIndexPy, conversion_method) tuples\n", - " # that can be reused for each flow, avoiding hash operations\n", 
- " #\n", - " # If this technique is used, caching the indices and conversion method\n", - " # is strongly recommended.\n", - " channel_indices_and_methods = [\n", - " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", - " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", - " ]\n", - "\n", - " # Send data in a loop using FlowBuilderPy with channel indices\n", - " for i in range(10):\n", - " # Create a FlowBuilderPy from the descriptor\n", - " flow_builder = FlowBuilderPy(descriptor)\n", - "\n", - " # Attach the run ID directly to the flow builder\n", - " flow_builder.attach_run_id(run_id)\n", - "\n", - " # Set channel values using set() with pre-computed indices\n", - " # This avoids hash lookups and provides better performance\n", - " motor_temp_value = 50.0 + random.random() * 5.0\n", - " tank_pressure_value = 2000.0 + random.random() * 100.0\n", - "\n", - " # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n", - " # over the values and encoding information directly. 
Since the value indices are used, the\n", - " # additional per-channel hash lookup is not needed, further improving performance.\n", - " #\n", - " # Though for convenience, the values can also be set using set_with_key() which takes a channel name\n", - " # and value.\n", - " #\n", - " # Example:\n", - " #\n", - " # flow_builder.set_with_key(\"motor_temp\", motor_temp_value)\n", - " # flow_builder.set_with_key(\"tank_pressure\", tank_pressure_value)\n", - " values = [motor_temp_value, tank_pressure_value]\n", - " for (channel_index, conversion_method), value in zip(\n", - " channel_indices_and_methods, values\n", - " ):\n", - " flow_builder.set(channel_index, conversion_method(value))\n", - "\n", - " # Build the request with current timestamp\n", - " request = flow_builder.request(TimeValuePy.now())\n", - "\n", - " # Send the request (non-blocking version)\n", - " ingest_client.send_requests_nonblocking([request])\n", - "\n", - " await asyncio.sleep(0.1)\n", - "\n", - "\n", - "# Uncomment to run:\n", - "# asyncio.run(advanced_flowbuilder_example())" - ] + "source": "async def advanced_flowbuilder_example():\n \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n\n connection_config = SiftConnectionConfig(\n api_key=\"my_api_key\",\n grpc_url=\"sift_grpc_url\",\n rest_url=\"sift_rest_url\",\n )\n\n client = SiftClient(connection_config=connection_config)\n\n ingestion_config = IngestionConfigCreate(\n asset_name=\"sift_rover_1\",\n flows=[\n FlowConfig(\n name=\"onboard_sensors\",\n channels=[\n ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n ChannelConfig(\n name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n ),\n ],\n )\n ],\n )\n\n run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n\n async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n 
ingestion_config=ingestion_config,\n run=run,\n ) as ingest_client:\n # Get the flow descriptor and run ID from SiftStream\n descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n run_id = ingest_client.get_run_id()\n\n if run_id is None:\n raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n\n # Get the mapping from channel names to ChannelIndexPy\n # This allows us to avoid hash lookups by using indices directly\n channel_index_map = descriptor.mapping()\n\n # Pre-compute channel indices and value conversion methods\n # This creates a list of (ChannelIndexPy, conversion_method) tuples\n # that can be reused for each flow, avoiding hash operations\n #\n # If this technique is used, caching the indices and conversion method\n # is strongly recommended.\n channel_indices_and_methods = [\n (channel_index_map[\"motor_temp\"], ValuePy.Double),\n (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n ]\n\n # Send data in a loop using FlowBuilderPy with channel indices\n for i in range(10):\n # Create a FlowBuilderPy from the descriptor\n flow_builder = FlowBuilderPy(descriptor)\n\n # Attach the run ID directly to the flow builder\n flow_builder.attach_run_id(run_id)\n\n # Set channel values using set() with pre-computed indices\n # This avoids hash lookups and provides better performance\n motor_temp_value = 50.0 + random.random() * 5.0\n tank_pressure_value = 2000.0 + random.random() * 100.0\n\n # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n # over the values and encoding information directly. 
Since the value indices are used, the\n # additional per-channel hash lookup is not needed, further improving performance.\n #\n # Though for convenience, the values can also be set using set_with_key() which takes a channel name\n # and value.\n #\n # Example:\n #\n # flow_builder.set_with_key(\"motor_temp\", motor_temp_value)\n # flow_builder.set_with_key(\"tank_pressure\", tank_pressure_value)\n values = [motor_temp_value, tank_pressure_value]\n for (channel_index, conversion_method), value in zip(\n channel_indices_and_methods, values\n ):\n flow_builder.set(channel_index, conversion_method(value))\n\n # Build the request with current timestamp\n request = flow_builder.request(TimeValuePy.now())\n\n # Send the request (non-blocking version)\n ingest_client.try_send_requests([request])\n\n await asyncio.sleep(0.1)\n\n\n# Uncomment to run:\n# asyncio.run(advanced_flowbuilder_example())" }, { "cell_type": "markdown", "id": "ab425ee7", "metadata": {}, - "source": [ - "## 3. High-Performance Batch Sending\n", - "\n", - "This example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `send_requests_nonblocking()`:\n", - "- Pre-compute channel indices from the descriptor mapping to avoid hash operations\n", - "- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance\n", - "- Use `send_requests_nonblocking()` for non-blocking batch sending\n", - "- This approach provides the best performance for high-throughput scenarios\n", - "\n", - "The combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput.\n" - ] + "source": "## 3. 
High-Performance Batch Sending\n\nThis example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `try_send_requests()`:\n- Pre-compute channel indices from the descriptor mapping to avoid hash operations\n- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance\n- Use `try_send_requests()` for non-blocking batch sending\n- This approach provides the best performance for high-throughput scenarios\n\nThe combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput." }, { "cell_type": "code", @@ -263,96 +143,7 @@ "id": "fc45400a", "metadata": {}, "outputs": [], - "source": [ - "async def high_performance_batch_example():\n", - " \"\"\"Example showing high-performance batch sending with FlowBuilderPy using channel indices.\"\"\"\n", - " from datetime import timedelta\n", - "\n", - " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", - "\n", - " connection_config = SiftConnectionConfig(\n", - " api_key=\"my_api_key\",\n", - " grpc_url=\"sift_grpc_url\",\n", - " rest_url=\"sift_rest_url\",\n", - " )\n", - "\n", - " client = SiftClient(connection_config=connection_config)\n", - "\n", - " ingestion_config = IngestionConfigCreate(\n", - " asset_name=\"sift_rover_1\",\n", - " flows=[\n", - " FlowConfig(\n", - " name=\"onboard_sensors\",\n", - " channels=[\n", - " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", - " ChannelConfig(\n", - " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", - " ),\n", - " ],\n", - " )\n", - " ],\n", - " )\n", - "\n", - " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", - "\n", - " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", - " ingestion_config=ingestion_config,\n", - " run=run,\n", - " 
) as ingest_client:\n", - " # Get the flow descriptor and run ID\n", - " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", - " run_id = ingest_client.get_run_id()\n", - "\n", - " # Pre-compute channel indices and conversion methods for maximum performance\n", - " # This avoids hash lookups when setting values in the loop below\n", - " channel_index_map = descriptor.mapping()\n", - " channel_indices_and_methods = [\n", - " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", - " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", - " ]\n", - "\n", - " # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n", - " sample_rate_hz = 10\n", - " duration_seconds = 5\n", - " num_flows = sample_rate_hz * duration_seconds # 50 flows\n", - "\n", - " start_time = datetime.now(timezone.utc)\n", - " requests = []\n", - "\n", - " for i in range(num_flows):\n", - " # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n", - " timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n", - " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", - "\n", - " # Create FlowBuilderPy and build request using pre-computed indices\n", - " flow_builder = FlowBuilderPy(descriptor)\n", - "\n", - " if run_id is not None:\n", - " flow_builder.attach_run_id(run_id)\n", - "\n", - " # Generate values\n", - " motor_temp_value = 50.0 + random.random() * 5.0\n", - " tank_pressure_value = 2000.0 + random.random() * 100.0\n", - "\n", - " # Use indices directly - no hash operations!\n", - " values = [motor_temp_value, tank_pressure_value]\n", - " for (channel_index, conversion_method), value in zip(\n", - " channel_indices_and_methods, values\n", - " ):\n", - " flow_builder.set(channel_index, conversion_method(value))\n", - "\n", - " request = flow_builder.request(timestamp)\n", - " requests.append(request)\n", - "\n", - " # Send all requests in a single non-blocking batch operation\n", - " 
# The combination of channel indices + non-blocking batch sending provides\n", - " # the best performance for high-throughput scenarios\n", - " ingest_client.send_requests_nonblocking(requests)\n", - "\n", - "\n", - "# Uncomment to run:\n", - "# asyncio.run(high_performance_batch_example())" - ] + "source": "async def high_performance_batch_example():\n \"\"\"Example showing high-performance batch sending with FlowBuilderPy using channel indices.\"\"\"\n from datetime import timedelta\n\n from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n\n connection_config = SiftConnectionConfig(\n api_key=\"my_api_key\",\n grpc_url=\"sift_grpc_url\",\n rest_url=\"sift_rest_url\",\n )\n\n client = SiftClient(connection_config=connection_config)\n\n ingestion_config = IngestionConfigCreate(\n asset_name=\"sift_rover_1\",\n flows=[\n FlowConfig(\n name=\"onboard_sensors\",\n channels=[\n ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n ChannelConfig(\n name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n ),\n ],\n )\n ],\n )\n\n run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n\n async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n ingestion_config=ingestion_config,\n run=run,\n ) as ingest_client:\n # Get the flow descriptor and run ID\n descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n run_id = ingest_client.get_run_id()\n\n # Pre-compute channel indices and conversion methods for maximum performance\n # This avoids hash lookups when setting values in the loop below\n channel_index_map = descriptor.mapping()\n channel_indices_and_methods = [\n (channel_index_map[\"motor_temp\"], ValuePy.Double),\n (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n ]\n\n # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n sample_rate_hz = 10\n duration_seconds = 5\n num_flows = sample_rate_hz * duration_seconds # 50 
flows\n\n start_time = datetime.now(timezone.utc)\n requests = []\n\n for i in range(num_flows):\n # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n\n # Create FlowBuilderPy and build request using pre-computed indices\n flow_builder = FlowBuilderPy(descriptor)\n\n if run_id is not None:\n flow_builder.attach_run_id(run_id)\n\n # Generate values\n motor_temp_value = 50.0 + random.random() * 5.0\n tank_pressure_value = 2000.0 + random.random() * 100.0\n\n # Use indices directly - no hash operations!\n values = [motor_temp_value, tank_pressure_value]\n for (channel_index, conversion_method), value in zip(\n channel_indices_and_methods, values\n ):\n flow_builder.set(channel_index, conversion_method(value))\n\n request = flow_builder.request(timestamp)\n requests.append(request)\n\n # Send all requests in a single non-blocking batch operation\n # The combination of channel indices + non-blocking batch sending provides\n # the best performance for high-throughput scenarios\n ingest_client.try_send_requests(requests)\n\n\n# Uncomment to run:\n# asyncio.run(high_performance_batch_example())" }, { "cell_type": "markdown", @@ -594,4 +385,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} +} \ No newline at end of file diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index ad248837f..fc49e9a59 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -314,10 +314,10 @@ async def batch_send(self, flows: Iterable[FlowPy]): async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): await self._sift_stream_instance.send_requests(requests) - def send_requests_nonblocking( + def try_send_requests( self, 
requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] ): - self._sift_stream_instance.send_requests_nonblocking(requests) + self._sift_stream_instance.try_send_requests(requests) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: return self._sift_stream_instance.get_flow_descriptor(flow_name) diff --git a/python/lib/sift_client/resources/ingestion.py b/python/lib/sift_client/resources/ingestion.py index 69ae78527..3ecaefaf6 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -493,22 +493,29 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy """ await self._low_level_client.send_requests(requests) - def send_requests_nonblocking( + def try_send_requests( self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] ): - """Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. + """Send pre-encoded requests without blocking. - This method offers a way to send data that matches the raw gRPC service interface. You are - expected to handle channel value ordering as well as empty values correctly. + Attempts to place each request on the backing channel immediately. Returns as + soon as the first failure is encountered; any undelivered requests are reported + in the raised exception message (without printing their contents). + + This method offers a way to send data that matches the raw gRPC service interface. + You are expected to handle channel value ordering as well as empty values correctly. Important: If using this interface, you should use `FlowBuilderPy::request` to ensure proper building of the request. Args: - requests: List of ingestion requests to send to Sift. + requests: Iterable of ingestion requests to send to Sift. + + Raises: + RuntimeError: If the backing channel is full or closed. 
""" - self._low_level_client.send_requests_nonblocking(requests) + self._low_level_client.try_send_requests(requests) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: """Retrieve a flow descriptor by name. diff --git a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi index d49ac2a00..658484800 100644 --- a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi +++ b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi @@ -444,7 +444,7 @@ class SiftStreamPy: def send(self, flow:FlowPy) -> typing.Any: ... def batch_send(self, flows:typing.Any) -> typing.Any: ... def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ... - def send_requests_nonblocking(self, flows:typing.Any) -> None: ... + def try_send_requests(self, flows:typing.Any) -> None: ... def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: ... def add_new_flows(self, flow_configs:typing.Sequence[FlowConfigPy]) -> typing.Any: ... def get_flow_descriptor(self, flow_name:builtins.str) -> FlowDescriptorPy: ... diff --git a/rust/crates/sift_stream_bindings/src/stream/mod.rs b/rust/crates/sift_stream_bindings/src/stream/mod.rs index cfd496df9..1c0e9803c 100644 --- a/rust/crates/sift_stream_bindings/src/stream/mod.rs +++ b/rust/crates/sift_stream_bindings/src/stream/mod.rs @@ -63,6 +63,14 @@ impl From for Flow { } } +// Helper: convert any error whose Display impl does not print the message payload into a PyErr. +// All sift_stream send error types (SiftStreamSendError, SendError, TrySendError, +// SiftStreamTrySendError) intentionally omit the inner T from their Display output, so +// format!("{e}") is safe to use regardless of how large the undelivered message(s) may be. 
+fn py_err(e: impl std::fmt::Display) -> PyErr { + PyErr::new::(format!("{e}")) +} + // PyO3 Method Implementations #[gen_stub_pymethods] #[pymethods] @@ -79,10 +87,7 @@ impl SiftStreamPy { })?; let flow_rs: Flow = flow.into(); - match stream.send(flow_rs).await { - Ok(_) => Ok(()), - Err(e) => Err(SiftErrorWrapper(e).into()), - } + stream.send(flow_rs).await.map_err(py_err) })?; Ok(awaitable.into()) @@ -113,10 +118,7 @@ impl SiftStreamPy { for flow in flows_vec { let flow_rs: Flow = flow.into(); - match stream.send(flow_rs).await { - Ok(_) => (), - Err(e) => return Err(SiftErrorWrapper(e).into()), - } + stream.send(flow_rs).await.map_err(py_err)?; } Ok(()) })?; @@ -141,16 +143,13 @@ impl SiftStreamPy { ) })?; - match stream.send_requests(requests).await { - Ok(_) => Ok(()), - Err(e) => Err(SiftErrorWrapper(e).into()), - } + stream.send_requests(requests).await.map_err(py_err) })?; Ok(awaitable.into()) } - pub fn send_requests_nonblocking(&self, flows: &Bound<'_, PyAny>) -> PyResult<()> { + pub fn try_send_requests(&self, flows: &Bound<'_, PyAny>) -> PyResult<()> { let flow_iter = PyIterator::from_object(flows)?; let mut flows_vec: Vec = Vec::new(); for item in flow_iter { @@ -165,9 +164,7 @@ impl SiftStreamPy { ) })?; - stream - .send_requests_nonblocking(flows_vec) - .map_err(|e| SiftErrorWrapper(e).into()) + stream.try_send_requests(flows_vec).map_err(py_err) } pub fn get_metrics_snapshot(&self) -> PyResult { From cba9cee6da15251b7b76e905dcbd428a3baaddbb Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 14:27:28 -0500 Subject: [PATCH 7/9] Python formatting --- .../lib/sift_client/_internal/low_level_wrappers/ingestion.py | 4 +--- python/lib/sift_client/resources/ingestion.py | 4 +--- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index fc49e9a59..bd5866583 100644 --- 
a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -314,9 +314,7 @@ async def batch_send(self, flows: Iterable[FlowPy]): async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy]): await self._sift_stream_instance.send_requests(requests) - def try_send_requests( - self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] - ): + def try_send_requests(self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]): self._sift_stream_instance.try_send_requests(requests) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: diff --git a/python/lib/sift_client/resources/ingestion.py b/python/lib/sift_client/resources/ingestion.py index 3ecaefaf6..91db55e6c 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -493,9 +493,7 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy """ await self._low_level_client.send_requests(requests) - def try_send_requests( - self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] - ): + def try_send_requests(self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]): """Send pre-encoded requests without blocking. Attempts to place each request on the backing channel immediately. 
Returns as From 4083aa107a0250aa043c97c0063b864318328f48 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Mon, 30 Mar 2026 17:35:16 -0500 Subject: [PATCH 8/9] Python changes have to happen after sift-stream-bindings is updated --- python/docs/examples/ingestion.ipynb | 219 +++++++++++++++++- .../_internal/low_level_wrappers/ingestion.py | 6 +- python/lib/sift_client/resources/ingestion.py | 21 +- 3 files changed, 226 insertions(+), 20 deletions(-) diff --git a/python/docs/examples/ingestion.ipynb b/python/docs/examples/ingestion.ipynb index 7a925bbb3..669a3decc 100644 --- a/python/docs/examples/ingestion.ipynb +++ b/python/docs/examples/ingestion.ipynb @@ -121,7 +121,20 @@ "cell_type": "markdown", "id": "c5bf10f3", "metadata": {}, - "source": "## 2. Advanced FlowBuilderPy Usage\n\nThis example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:\n- Get `FlowDescriptorPy` using `get_flow_descriptor()`\n- Retrieve the run ID from SiftStream using `get_run_id()`\n- Create `FlowBuilderPy` from the descriptor\n- Set the run ID directly on the flow builder using `attach_run_id()`\n- Use channel indices from the descriptor mapping to avoid hash operations\n- Use `set()` with channel indices instead of `set_with_key()` for maximum performance\n- Build the request and send using `send_requests()` or `try_send_requests()`\n\n**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance." + "source": [ + "## 2. 
Advanced FlowBuilderPy Usage\n", + "\n", + "This example demonstrates the advanced `FlowBuilderPy` paradigm, which provides better performance and more control:\n", + "- Get `FlowDescriptorPy` using `get_flow_descriptor()`\n", + "- Retrieve the run ID from SiftStream using `get_run_id()`\n", + "- Create `FlowBuilderPy` from the descriptor\n", + "- Set the run ID directly on the flow builder using `attach_run_id()`\n", + "- Use channel indices from the descriptor mapping to avoid hash operations\n", + "- Use `set()` with channel indices instead of `set_with_key()` for maximum performance\n", + "- Build the request and send using `send_requests()` or `send_requests_nonblocking()`\n", + "\n", + "**Note**: This approach requires managing the run ID directly, making it more advanced but also more performant. Using channel indices instead of channel names avoids hash lookups, providing the best performance for high-frequency data sending. It's useful when you need fine-grained control or are sending data for multiple runs/assets with a single SiftStream instance.\n" + ] }, { "cell_type": "code", @@ -129,13 +142,120 @@ "id": "98aec10b", "metadata": {}, "outputs": [], - "source": "async def advanced_flowbuilder_example():\n \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n\n connection_config = SiftConnectionConfig(\n api_key=\"my_api_key\",\n grpc_url=\"sift_grpc_url\",\n rest_url=\"sift_rest_url\",\n )\n\n client = SiftClient(connection_config=connection_config)\n\n ingestion_config = IngestionConfigCreate(\n asset_name=\"sift_rover_1\",\n flows=[\n FlowConfig(\n name=\"onboard_sensors\",\n channels=[\n ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n ChannelConfig(\n name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n ),\n ],\n )\n ],\n )\n\n run = RunCreate(name=\"sift_rover-\" + 
str(int(time.time())))\n\n async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n ingestion_config=ingestion_config,\n run=run,\n ) as ingest_client:\n # Get the flow descriptor and run ID from SiftStream\n descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n run_id = ingest_client.get_run_id()\n\n if run_id is None:\n raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n\n # Get the mapping from channel names to ChannelIndexPy\n # This allows us to avoid hash lookups by using indices directly\n channel_index_map = descriptor.mapping()\n\n # Pre-compute channel indices and value conversion methods\n # This creates a list of (ChannelIndexPy, conversion_method) tuples\n # that can be reused for each flow, avoiding hash operations\n #\n # If this technique is used, caching the indices and conversion method\n # is strongly recommended.\n channel_indices_and_methods = [\n (channel_index_map[\"motor_temp\"], ValuePy.Double),\n (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n ]\n\n # Send data in a loop using FlowBuilderPy with channel indices\n for i in range(10):\n # Create a FlowBuilderPy from the descriptor\n flow_builder = FlowBuilderPy(descriptor)\n\n # Attach the run ID directly to the flow builder\n flow_builder.attach_run_id(run_id)\n\n # Set channel values using set() with pre-computed indices\n # This avoids hash lookups and provides better performance\n motor_temp_value = 50.0 + random.random() * 5.0\n tank_pressure_value = 2000.0 + random.random() * 100.0\n\n # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n # over the values and encoding information directly. 
Since the value indices are used, the\n # additional per-channel hash lookup is not needed, further improving performance.\n #\n # Though for convenience, the values can also be set using set_with_key() which takes a channel name\n # and value.\n #\n # Example:\n #\n # flow_builder.set_with_key(\"motor_temp\", motor_temp_value)\n # flow_builder.set_with_key(\"tank_pressure\", tank_pressure_value)\n values = [motor_temp_value, tank_pressure_value]\n for (channel_index, conversion_method), value in zip(\n channel_indices_and_methods, values\n ):\n flow_builder.set(channel_index, conversion_method(value))\n\n # Build the request with current timestamp\n request = flow_builder.request(TimeValuePy.now())\n\n # Send the request (non-blocking version)\n ingest_client.try_send_requests([request])\n\n await asyncio.sleep(0.1)\n\n\n# Uncomment to run:\n# asyncio.run(advanced_flowbuilder_example())" + "source": [ + "async def advanced_flowbuilder_example():\n", + " \"\"\"Example showing advanced FlowBuilderPy usage with channel indices for maximum performance.\"\"\"\n", + " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", + "\n", + " connection_config = SiftConnectionConfig(\n", + " api_key=\"my_api_key\",\n", + " grpc_url=\"sift_grpc_url\",\n", + " rest_url=\"sift_rest_url\",\n", + " )\n", + "\n", + " client = SiftClient(connection_config=connection_config)\n", + "\n", + " ingestion_config = IngestionConfigCreate(\n", + " asset_name=\"sift_rover_1\",\n", + " flows=[\n", + " FlowConfig(\n", + " name=\"onboard_sensors\",\n", + " channels=[\n", + " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(\n", + " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", + " ),\n", + " ],\n", + " )\n", + " ],\n", + " )\n", + "\n", + " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", + "\n", + " async with await 
client.async_.ingestion.create_ingestion_config_streaming_client(\n", + " ingestion_config=ingestion_config,\n", + " run=run,\n", + " ) as ingest_client:\n", + " # Get the flow descriptor and run ID from SiftStream\n", + " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", + " run_id = ingest_client.get_run_id()\n", + "\n", + " if run_id is None:\n", + " raise ValueError(\"Run ID is required for FlowBuilderPy usage\")\n", + "\n", + " # Get the mapping from channel names to ChannelIndexPy\n", + " # This allows us to avoid hash lookups by using indices directly\n", + " channel_index_map = descriptor.mapping()\n", + "\n", + " # Pre-compute channel indices and value conversion methods\n", + " # This creates a list of (ChannelIndexPy, conversion_method) tuples\n", + " # that can be reused for each flow, avoiding hash operations\n", + " #\n", + " # If this technique is used, caching the indices and conversion method\n", + " # is strongly recommended.\n", + " channel_indices_and_methods = [\n", + " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", + " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", + " ]\n", + "\n", + " # Send data in a loop using FlowBuilderPy with channel indices\n", + " for i in range(10):\n", + " # Create a FlowBuilderPy from the descriptor\n", + " flow_builder = FlowBuilderPy(descriptor)\n", + "\n", + " # Attach the run ID directly to the flow builder\n", + " flow_builder.attach_run_id(run_id)\n", + "\n", + " # Set channel values using set() with pre-computed indices\n", + " # This avoids hash lookups and provides better performance\n", + " motor_temp_value = 50.0 + random.random() * 5.0\n", + " tank_pressure_value = 2000.0 + random.random() * 100.0\n", + "\n", + " # If the raw data class used provides in-order iteration over the raw data, you can also iterate\n", + " # over the values and encoding information directly. 
Since the value indices are used, the\n", + " # additional per-channel hash lookup is not needed, further improving performance.\n", + " #\n", + " # Though for convenience, the values can also be set using set_with_key() which takes a channel name\n", + " # and value.\n", + " #\n", + " # Example:\n", + " #\n", + " # flow_builder.set_with_key(\"motor_temp\", motor_temp_value)\n", + " # flow_builder.set_with_key(\"tank_pressure\", tank_pressure_value)\n", + " values = [motor_temp_value, tank_pressure_value]\n", + " for (channel_index, conversion_method), value in zip(\n", + " channel_indices_and_methods, values\n", + " ):\n", + " flow_builder.set(channel_index, conversion_method(value))\n", + "\n", + " # Build the request with current timestamp\n", + " request = flow_builder.request(TimeValuePy.now())\n", + "\n", + " # Send the request (non-blocking version)\n", + " ingest_client.send_requests_nonblocking([request])\n", + "\n", + " await asyncio.sleep(0.1)\n", + "\n", + "\n", + "# Uncomment to run:\n", + "# asyncio.run(advanced_flowbuilder_example())" + ] }, { "cell_type": "markdown", "id": "ab425ee7", "metadata": {}, - "source": "## 3. High-Performance Batch Sending\n\nThis example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `try_send_requests()`:\n- Pre-compute channel indices from the descriptor mapping to avoid hash operations\n- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance\n- Use `try_send_requests()` for non-blocking batch sending\n- This approach provides the best performance for high-throughput scenarios\n\nThe combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput." + "source": [ + "## 3. 
High-Performance Batch Sending\n", + "\n", + "This example demonstrates high-performance batch sending using `FlowBuilderPy` with channel indices and `send_requests_nonblocking()`:\n", + "- Pre-compute channel indices from the descriptor mapping to avoid hash operations\n", + "- Use `FlowBuilderPy` with `set()` and channel indices for maximum performance\n", + "- Use `send_requests_nonblocking()` for non-blocking batch sending\n", + "- This approach provides the best performance for high-throughput scenarios\n", + "\n", + "The combination of channel indices (avoiding hash lookups) and non-blocking batch sending allows the underlying Rust client to handle batching and sending asynchronously, minimizing Python overhead and maximizing throughput.\n" + ] }, { "cell_type": "code", @@ -143,7 +263,96 @@ "id": "fc45400a", "metadata": {}, "outputs": [], - "source": "async def high_performance_batch_example():\n \"\"\"Example showing high-performance batch sending with FlowBuilderPy using channel indices.\"\"\"\n from datetime import timedelta\n\n from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n\n connection_config = SiftConnectionConfig(\n api_key=\"my_api_key\",\n grpc_url=\"sift_grpc_url\",\n rest_url=\"sift_rest_url\",\n )\n\n client = SiftClient(connection_config=connection_config)\n\n ingestion_config = IngestionConfigCreate(\n asset_name=\"sift_rover_1\",\n flows=[\n FlowConfig(\n name=\"onboard_sensors\",\n channels=[\n ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n ChannelConfig(\n name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n ),\n ],\n )\n ],\n )\n\n run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n\n async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n ingestion_config=ingestion_config,\n run=run,\n ) as ingest_client:\n # Get the flow descriptor and run ID\n descriptor = 
ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n run_id = ingest_client.get_run_id()\n\n # Pre-compute channel indices and conversion methods for maximum performance\n # This avoids hash lookups when setting values in the loop below\n channel_index_map = descriptor.mapping()\n channel_indices_and_methods = [\n (channel_index_map[\"motor_temp\"], ValuePy.Double),\n (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n ]\n\n # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n sample_rate_hz = 10\n duration_seconds = 5\n num_flows = sample_rate_hz * duration_seconds # 50 flows\n\n start_time = datetime.now(timezone.utc)\n requests = []\n\n for i in range(num_flows):\n # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n\n # Create FlowBuilderPy and build request using pre-computed indices\n flow_builder = FlowBuilderPy(descriptor)\n\n if run_id is not None:\n flow_builder.attach_run_id(run_id)\n\n # Generate values\n motor_temp_value = 50.0 + random.random() * 5.0\n tank_pressure_value = 2000.0 + random.random() * 100.0\n\n # Use indices directly - no hash operations!\n values = [motor_temp_value, tank_pressure_value]\n for (channel_index, conversion_method), value in zip(\n channel_indices_and_methods, values\n ):\n flow_builder.set(channel_index, conversion_method(value))\n\n request = flow_builder.request(timestamp)\n requests.append(request)\n\n # Send all requests in a single non-blocking batch operation\n # The combination of channel indices + non-blocking batch sending provides\n # the best performance for high-throughput scenarios\n ingest_client.try_send_requests(requests)\n\n\n# Uncomment to run:\n# asyncio.run(high_performance_batch_example())" + "source": [ + "async def high_performance_batch_example():\n", + " \"\"\"Example showing high-performance 
batch sending with FlowBuilderPy using channel indices.\"\"\"\n", + " from datetime import timedelta\n", + "\n", + " from sift_stream_bindings import FlowBuilderPy, TimeValuePy, ValuePy\n", + "\n", + " connection_config = SiftConnectionConfig(\n", + " api_key=\"my_api_key\",\n", + " grpc_url=\"sift_grpc_url\",\n", + " rest_url=\"sift_rest_url\",\n", + " )\n", + "\n", + " client = SiftClient(connection_config=connection_config)\n", + "\n", + " ingestion_config = IngestionConfigCreate(\n", + " asset_name=\"sift_rover_1\",\n", + " flows=[\n", + " FlowConfig(\n", + " name=\"onboard_sensors\",\n", + " channels=[\n", + " ChannelConfig(name=\"motor_temp\", unit=\"C\", data_type=ChannelDataType.DOUBLE),\n", + " ChannelConfig(\n", + " name=\"tank_pressure\", unit=\"kPa\", data_type=ChannelDataType.DOUBLE\n", + " ),\n", + " ],\n", + " )\n", + " ],\n", + " )\n", + "\n", + " run = RunCreate(name=\"sift_rover-\" + str(int(time.time())))\n", + "\n", + " async with await client.async_.ingestion.create_ingestion_config_streaming_client(\n", + " ingestion_config=ingestion_config,\n", + " run=run,\n", + " ) as ingest_client:\n", + " # Get the flow descriptor and run ID\n", + " descriptor = ingest_client.get_flow_descriptor(flow_name=\"onboard_sensors\")\n", + " run_id = ingest_client.get_run_id()\n", + "\n", + " # Pre-compute channel indices and conversion methods for maximum performance\n", + " # This avoids hash lookups when setting values in the loop below\n", + " channel_index_map = descriptor.mapping()\n", + " channel_indices_and_methods = [\n", + " (channel_index_map[\"motor_temp\"], ValuePy.Double),\n", + " (channel_index_map[\"tank_pressure\"], ValuePy.Double),\n", + " ]\n", + "\n", + " # Generate 5 seconds of data at 10Hz (10 flows per second = 50 flows total)\n", + " sample_rate_hz = 10\n", + " duration_seconds = 5\n", + " num_flows = sample_rate_hz * duration_seconds # 50 flows\n", + "\n", + " start_time = datetime.now(timezone.utc)\n", + " requests = []\n", + "\n", + " 
for i in range(num_flows):\n", + " # Calculate timestamp for each sample (spaced 0.1 seconds apart)\n", + " timestamp_secs = int((start_time + timedelta(seconds=i / sample_rate_hz)).timestamp())\n", + " timestamp = TimeValuePy.from_timestamp(timestamp_secs, 0)\n", + "\n", + " # Create FlowBuilderPy and build request using pre-computed indices\n", + " flow_builder = FlowBuilderPy(descriptor)\n", + "\n", + " if run_id is not None:\n", + " flow_builder.attach_run_id(run_id)\n", + "\n", + " # Generate values\n", + " motor_temp_value = 50.0 + random.random() * 5.0\n", + " tank_pressure_value = 2000.0 + random.random() * 100.0\n", + "\n", + " # Use indices directly - no hash operations!\n", + " values = [motor_temp_value, tank_pressure_value]\n", + " for (channel_index, conversion_method), value in zip(\n", + " channel_indices_and_methods, values\n", + " ):\n", + " flow_builder.set(channel_index, conversion_method(value))\n", + "\n", + " request = flow_builder.request(timestamp)\n", + " requests.append(request)\n", + "\n", + " # Send all requests in a single non-blocking batch operation\n", + " # The combination of channel indices + non-blocking batch sending provides\n", + " # the best performance for high-throughput scenarios\n", + " ingest_client.send_requests_nonblocking(requests)\n", + "\n", + "\n", + "# Uncomment to run:\n", + "# asyncio.run(high_performance_batch_example())" + ] }, { "cell_type": "markdown", @@ -385,4 +594,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py index bd5866583..ad248837f 100644 --- a/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py +++ b/python/lib/sift_client/_internal/low_level_wrappers/ingestion.py @@ -314,8 +314,10 @@ async def batch_send(self, flows: Iterable[FlowPy]): async def send_requests(self, requests: 
list[IngestWithConfigDataStreamRequestPy]): await self._sift_stream_instance.send_requests(requests) - def try_send_requests(self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]): - self._sift_stream_instance.try_send_requests(requests) + def send_requests_nonblocking( + self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] + ): + self._sift_stream_instance.send_requests_nonblocking(requests) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: return self._sift_stream_instance.get_flow_descriptor(flow_name) diff --git a/python/lib/sift_client/resources/ingestion.py b/python/lib/sift_client/resources/ingestion.py index 91db55e6c..69ae78527 100644 --- a/python/lib/sift_client/resources/ingestion.py +++ b/python/lib/sift_client/resources/ingestion.py @@ -493,27 +493,22 @@ async def send_requests(self, requests: list[IngestWithConfigDataStreamRequestPy """ await self._low_level_client.send_requests(requests) - def try_send_requests(self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy]): - """Send pre-encoded requests without blocking. - - Attempts to place each request on the backing channel immediately. Returns as - soon as the first failure is encountered; any undelivered requests are reported - in the raised exception message (without printing their contents). + def send_requests_nonblocking( + self, requests: Iterable[IngestWithConfigDataStreamRequestWrapperPy] + ): + """Send data in a manner identical to the raw gRPC service for ingestion-config based streaming. - This method offers a way to send data that matches the raw gRPC service interface. - You are expected to handle channel value ordering as well as empty values correctly. + This method offers a way to send data that matches the raw gRPC service interface. You are + expected to handle channel value ordering as well as empty values correctly. 
Important: If using this interface, you should use `FlowBuilderPy::request` to ensure proper building of the request. Args: - requests: Iterable of ingestion requests to send to Sift. - - Raises: - RuntimeError: If the backing channel is full or closed. + requests: List of ingestion requests to send to Sift. """ - self._low_level_client.try_send_requests(requests) + self._low_level_client.send_requests_nonblocking(requests) def get_flow_descriptor(self, flow_name: str) -> FlowDescriptorPy: """Retrieve a flow descriptor by name. From fff582f00556ee7ac8e42227eea972ac64d38896 Mon Sep 17 00:00:00 2001 From: Taylor Jones Date: Tue, 31 Mar 2026 14:47:15 -0500 Subject: [PATCH 9/9] PR feedback --- rust/crates/sift_stream/src/stream/mod.rs | 30 ++++++++++++--- .../src/stream/mode/file_backup.rs | 22 +++++++++++ .../src/stream/mode/ingestion_config.rs | 37 +++++++++++++++++++ 3 files changed, 83 insertions(+), 6 deletions(-) diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index 3b6a45542..6d59f0c73 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -114,8 +114,12 @@ pub trait Transport: private::Sealed { /// /// # Errors /// - /// Returns [`SendError`] containing the undelivered message if the - /// backing channel is closed before delivery completes. + /// Returns [`SendError`] containing a potentially undelivered message. + /// + /// Depending on the implementation of [`Transport`], the undelivered message is not + /// necessarily the message that was provided to the current invocation of [`Self::send`]. + /// + /// See implementation documentation for details. async fn send( &mut self, stream_id: &Uuid, @@ -129,8 +133,12 @@ pub trait Transport: private::Sealed { /// /// # Errors /// - /// Returns [`SendError>`] containing every message that was not - /// delivered: the failing message followed by any that had not yet been attempted. 
+ /// Returns [`SendError>`] containing potentially undelivered messages. + /// + /// Depending on the implementation of [`Transport`], the undelivered messages are not + /// necessarily the messages that were provided to the current invocation of [`Self::send_requests`]. + /// + /// See implementation documentation for details. async fn send_requests( &mut self, stream_id: &Uuid, @@ -146,10 +154,15 @@ pub trait Transport: private::Sealed { /// /// # Errors /// - /// Returns [`TrySendError`] with the undelivered message: + /// Returns [`TrySendError`] containing a potentially undelivered message: /// - [`TrySendError::Full`] — the channel is at capacity; consider retrying with /// [`send`](Transport::send) to apply backpressure instead. /// - [`TrySendError::Closed`] — the channel has been closed. + /// + /// Depending on the implementation of [`Transport`], the undelivered message is not + /// necessarily the message that was provided to the current invocation of [`Self::try_send`]. + /// + /// See implementation documentation for details. fn try_send( &mut self, stream_id: &Uuid, @@ -164,9 +177,14 @@ pub trait Transport: private::Sealed { /// /// # Errors /// - /// Returns [`TrySendError>`] with every message that was not delivered: + /// Returns [`TrySendError>`] containing potentially undelivered messages: /// - [`TrySendError::Full`] — the channel was at capacity for one of the messages. /// - [`TrySendError::Closed`] — the channel was closed. + /// + /// Depending on the implementation of [`Transport`], the undelivered messages are not + /// necessarily the messages that were provided to the current invocation of [`Self::try_send_requests`]. + /// + /// See implementation documentation for details.
fn try_send_requests( &mut self, stream_id: &Uuid, diff --git a/rust/crates/sift_stream/src/stream/mode/file_backup.rs b/rust/crates/sift_stream/src/stream/mode/file_backup.rs index 556f3063f..60daf4eb9 100644 --- a/rust/crates/sift_stream/src/stream/mode/file_backup.rs +++ b/rust/crates/sift_stream/src/stream/mode/file_backup.rs @@ -136,6 +136,12 @@ impl Transport for FileBackup { type Encoder = IngestionConfigEncoder; type Message = IngestWithConfigDataStreamRequest; + /// Sends a message to be written to the backup file, awaiting capacity if the stream + /// is busy. + /// + /// Returns an error only if the stream has been closed, in which case the original + /// message is returned inside `Err`. Normal backpressure is handled transparently by + /// waiting. async fn send( &mut self, stream_id: &Uuid, @@ -154,6 +160,11 @@ impl Transport for FileBackup { Ok(()) } + /// Attempts to send a message without blocking. + /// + /// Returns immediately with `TrySendError::Full` if the stream is at capacity, or + /// `TrySendError::Closed` if the stream has been closed. In either case the original + /// message is returned unchanged. fn try_send( &mut self, stream_id: &Uuid, @@ -175,6 +186,12 @@ impl Transport for FileBackup { } } + /// Sends a batch of messages in order to be written to the backup file, awaiting + /// capacity for each one. + /// + /// On stream close, stops immediately and returns the undelivered messages starting + /// from the point of failure. The first element of the returned `Vec` is always the + /// message that failed to send. async fn send_requests( &mut self, stream_id: &Uuid, @@ -195,6 +212,11 @@ impl Transport for FileBackup { Ok(()) } + /// Attempts to send a batch of messages in order without blocking. + /// + /// Stops and returns on the first failure. The returned `Vec` contains the undelivered + /// messages starting from the point of failure, with the first element always being + /// the message that failed to send. 
fn try_send_requests( &mut self, stream_id: &Uuid, diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 0cdd9b757..d7845a51f 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -178,6 +178,18 @@ impl Transport for LiveStreaming { type Encoder = IngestionConfigEncoder; type Message = IngestWithConfigDataStreamRequest; + /// Sends a message, awaiting capacity if the stream is busy. + /// + /// This mode prioritizes freshness: newer messages always make it into the stream, and + /// older buffered messages are displaced to make room when necessary. As a result, if + /// the stream is closed while a displaced message is being handled, **the message + /// returned inside `Err` may be older than the message provided**. An error is only + /// returned on stream close; normal backpressure is handled transparently by waiting. + /// + /// Because displaced messages are not automatically retried, enabling file backup + /// recovery mode is strongly recommended. When enabled, displaced messages are written + /// to a local file and can be replayed to Sift at a later time, ensuring no data goes + /// unrecorded. async fn send( &mut self, stream_id: &Uuid, @@ -200,6 +212,17 @@ impl Transport for LiveStreaming { Ok(()) } + /// Attempts to send a message without blocking. + /// + /// Returns immediately with `TrySendError::Full` or `TrySendError::Closed` if the + /// stream cannot accept data right now. + /// + /// This mode prioritizes freshness: newer messages always make it into the stream, and + /// older buffered messages are displaced to make room when necessary. If such a + /// displacement occurs and the stream is full or closed at that point, **the message + /// returned inside `Err` may be older than the message provided**. 
Enabling file + /// backup recovery mode is strongly recommended so that any displaced messages are + /// written to a local file and can be replayed to Sift later. fn try_send( &mut self, stream_id: &Uuid, @@ -228,6 +251,13 @@ impl Transport for LiveStreaming { Ok(()) } + /// Sends a batch of messages in order, awaiting capacity for each one. + /// + /// On stream close, stops immediately and returns the undelivered messages starting + /// from the point of failure. Because this mode may displace older buffered messages + /// to make room for newer ones (see [`send`](Self::send)), the first element of the + /// returned `Vec` may be an older displaced message rather than the one that was being + /// sent at the time of failure. async fn send_requests( &mut self, stream_id: &Uuid, @@ -248,6 +278,13 @@ impl Transport for LiveStreaming { Ok(()) } + /// Attempts to send a batch of messages in order without blocking. + /// + /// Stops and returns on the first failure. The returned `Vec` contains the undelivered + /// messages starting from the point of failure. Because this mode may displace older + /// buffered messages to make room for newer ones (see [`try_send`](Self::try_send)), + /// the first element may be an older displaced message rather than the one that was + /// being sent at the time of failure. fn try_send_requests( &mut self, stream_id: &Uuid,