diff --git a/rust/crates/sift_stream/examples/backups-only/main.rs b/rust/crates/sift_stream/examples/backups-only/main.rs index e8633d291..78a72a0b3 100644 --- a/rust/crates/sift_stream/examples/backups-only/main.rs +++ b/rust/crates/sift_stream/examples/backups-only/main.rs @@ -102,7 +102,7 @@ async fn run() -> Result<(), Box> { tokio::time::sleep(Duration::from_millis(100)).await; } - // Next, stream telemetry to backup files using the [`SiftStream::send_requests_nonblocking`] method + // Next, stream telemetry to backup files using the [`SiftStream::try_send_requests`] method // and the [`FlowBuilder`] to build the flow. // // This approach is more performant, and also provides methods to set the channel value via @@ -116,7 +116,7 @@ async fn run() -> Result<(), Box> { let run_id = sift_stream.run().unwrap().run_id.clone(); for i in 0..360 { // Build the flow using the [`FlowBuilder`] and send it to - // Sift using the [`SiftStream::send_requests_nonblocking`] method. + // Sift using the [`SiftStream::try_send_requests`] method. let mut flow_builder = FlowBuilder::new(&descriptor); flow_builder.attach_run_id(&run_id); flow_builder @@ -124,7 +124,7 @@ async fn run() -> Result<(), Box> { .unwrap(); sift_stream - .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())]) + .try_send_requests(vec![flow_builder.request(TimeValue::now())]) .unwrap(); // For demonstrative purposes, adding a contrived wait to get 10Hz data. diff --git a/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs b/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs index 3599e823e..0cc3f4fc0 100644 --- a/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs +++ b/rust/crates/sift_stream/examples/ingestion-tutorial/main.rs @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box> { // NOTE: This approach uses `Flow` and `SiftStream::send()` for ease of use, and will // provide acceptable performance for most users // In cases where additional performance is required, a separate, more performant method - // is also available that uses `FlowBuilder` and `SiftStream::send_requests_nonblocking` + // is also available that uses `FlowBuilder` and `SiftStream::try_send_requests` // See `examples/quick-start/` for an example using this alternate approach let start = std::time::Instant::now(); while start.elapsed() < INGEST_DURATION { diff --git a/rust/crates/sift_stream/examples/quick-start/main.rs b/rust/crates/sift_stream/examples/quick-start/main.rs index 1c753900c..b45c26b5b 100644 --- a/rust/crates/sift_stream/examples/quick-start/main.rs +++ b/rust/crates/sift_stream/examples/quick-start/main.rs @@ -93,7 +93,7 @@ async fn run() -> Result<(), Box> { tokio::time::sleep(Duration::from_millis(100)).await; } - // Next, stream telemetry to Sift using the [`SiftStream::send_requests_nonblocking`] method + // Next, stream telemetry to Sift using the [`SiftStream::try_send_requests`] method // and the [`FlowBuilder`] to build the flow. // // This approach is more performant, and also provides methods to set the channel value via @@ -107,7 +107,7 @@ async fn run() -> Result<(), Box> { let run_id = sift_stream.run().unwrap().run_id.clone(); for i in 0..360 { // Build the flow using the [`FlowBuilder`] and send it to - // Sift using the [`SiftStream::send_requests_nonblocking`] method. + // Sift using the [`SiftStream::try_send_requests`] method. let mut flow_builder = FlowBuilder::new(&descriptor); flow_builder.attach_run_id(&run_id); flow_builder @@ -116,7 +116,7 @@ async fn run() -> Result<(), Box> { // Send telemetry to Sift. sift_stream - .send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())]) + .try_send_requests(vec![flow_builder.request(TimeValue::now())]) .unwrap(); // For demonstrative purposes, adding a contrived wait to get 10Hz data. diff --git a/rust/crates/sift_stream/src/backup/disk/async_manager.rs b/rust/crates/sift_stream/src/backup/disk/async_manager.rs index 031d65e58..e7442a05f 100644 --- a/rust/crates/sift_stream/src/backup/disk/async_manager.rs +++ b/rust/crates/sift_stream/src/backup/disk/async_manager.rs @@ -1122,7 +1122,7 @@ mod test { // The backup file should have been rotated. assert!( - backup_manager.file_ctx_buffer.len() > 0, + !backup_manager.file_ctx_buffer.is_empty(), "backup files should be present" ); assert!( diff --git a/rust/crates/sift_stream/src/backup/disk/file_writer.rs b/rust/crates/sift_stream/src/backup/disk/file_writer.rs index 981fe4158..094e66c3a 100644 --- a/rust/crates/sift_stream/src/backup/disk/file_writer.rs +++ b/rust/crates/sift_stream/src/backup/disk/file_writer.rs @@ -452,16 +452,16 @@ mod tests { } } - if writer.should_rotate_file() { - if let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") { - rotated_files.push(ctx.file_path); - } + if writer.should_rotate_file() + && let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") + { + rotated_files.push(ctx.file_path); } } // Should have created multiple files if !rotated_files.is_empty() { - assert!(rotated_files.len() > 0); + assert!(!rotated_files.is_empty()); for file_path in &rotated_files { assert!(file_path.exists()); } diff --git a/rust/crates/sift_stream/src/lib.rs b/rust/crates/sift_stream/src/lib.rs index b8e5dce18..d4ff10411 100644 --- a/rust/crates/sift_stream/src/lib.rs +++ b/rust/crates/sift_stream/src/lib.rs @@ -237,6 +237,87 @@ //! //! Anything that falls outside of that will require changing the client-key. //! +//! ## Sending Telemetry +//! +//! [`SiftStream`] exposes four methods for delivering telemetry. They differ only in whether +//! they apply backpressure (blocking) or return immediately (non-blocking), and in whether +//! they accept a high-level [`Flow`] or pre-encoded raw requests: +//! +//! | Method | Blocks? | Input | +//! |---|---|---| +//! | [`send`](stream::SiftStream::send) | Yes | [`Flow`] or any [`Encodeable`](stream::Encodeable) | +//! | [`send_requests`](stream::SiftStream::send_requests) | Yes | Pre-encoded requests | +//! | [`try_send`](stream::SiftStream::try_send) | No | [`Flow`] or any [`Encodeable`](stream::Encodeable) | +//! | [`try_send_requests`](stream::SiftStream::try_send_requests) | No | Pre-encoded requests | +//! +//! ### Backpressure with `send` +//! +//! [`send`](stream::SiftStream::send) awaits until the backing channel has capacity, then +//! delivers the message. Use this when you want the producer to slow down naturally when +//! the pipeline is under load — the simplest and most common choice. +//! +//! ```ignore +//! // Awaits until the channel has room; backpressure is applied automatically. +//! sift_stream.send(Flow::new( +//! "robotic-arm", +//! TimeValue::now(), +//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)], +//! )).await?; +//! ``` +//! +//! On error, [`SiftStreamSendError`] is returned. Call `into_inner()` on the +//! [`ChannelClosed`](stream::SiftStreamSendError::ChannelClosed) variant to recover the +//! undelivered message. +//! +//! ### Non-blocking sends with `try_send` +//! +//! [`try_send`](stream::SiftStream::try_send) returns immediately regardless of channel +//! state. If the channel is full it returns [`TrySendError::Full`] with the message; if +//! the channel is closed it returns [`TrySendError::Closed`]. Use this in tight loops or +//! real-time contexts where blocking even briefly is unacceptable. +//! +//! ```ignore +//! match sift_stream.try_send(Flow::new( +//! "robotic-arm", +//! TimeValue::now(), +//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)], +//! )) { +//! Ok(()) => {} +//! Err(SiftStreamTrySendError::Channel(TrySendError::Full(msg))) => { +//! // Channel is busy — drop this sample or buffer it for later. +//! drop(msg); +//! } +//! Err(e) => return Err(e.into()), +//! } +//! ``` +//! +//! ### Pre-encoded batch sends +//! +//! [`send_requests`](stream::SiftStream::send_requests) and +//! [`try_send_requests`](stream::SiftStream::try_send_requests) accept pre-encoded +//! [`IngestWithConfigDataStreamRequest`](sift_rs::ingest::v1::IngestWithConfigDataStreamRequest) +//! values built with [`FlowBuilder`]. This skips the per-call encoding step and is the +//! highest-throughput option. +//! +//! ```ignore +//! let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap(); +//! let run_id = sift_stream.run().unwrap().run_id.clone(); +//! +//! let mut builder = FlowBuilder::new(&descriptor); +//! builder.attach_run_id(&run_id); +//! builder.set_with_key("joint-angle-encoder", 7.2_f64).unwrap(); +//! +//! // Blocking batch send with backpressure: +//! sift_stream.send_requests(vec![builder.request(TimeValue::now())]).await?; +//! +//! // Non-blocking batch send: +//! sift_stream.try_send_requests(vec![builder.request(TimeValue::now())])?; +//! ``` +//! +//! On the first failure, `send_requests` / `try_send_requests` stop iterating and return +//! **all** undelivered messages — the failing one plus any not yet attempted — inside the +//! error so nothing is silently dropped. +//! //! ## Retry Policy //! //! At the time of writing this crate, [tonic](https://docs.rs/tonic/latest/tonic/) @@ -456,6 +537,7 @@ pub use stream::{ file_backup::FileBackup, ingestion_config::{Flow, IngestionConfigEncoder, LiveStreaming}, }, + send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError}, time::TimeValue, }; diff --git a/rust/crates/sift_stream/src/stream/builder/config_loader.rs b/rust/crates/sift_stream/src/stream/builder/config_loader.rs index 0908b49e3..0e3f3514a 100644 --- a/rust/crates/sift_stream/src/stream/builder/config_loader.rs +++ b/rust/crates/sift_stream/src/stream/builder/config_loader.rs @@ -253,7 +253,6 @@ mod tests { ..Default::default() }, ], - ..Default::default() } } @@ -373,7 +372,6 @@ mod tests { data_type: ChannelDataType::Double.into(), ..Default::default() }], - ..Default::default() }]; let form = create_test_ingestion_config_form(asset_name, client_key, existing_flows); diff --git a/rust/crates/sift_stream/src/stream/mod.rs b/rust/crates/sift_stream/src/stream/mod.rs index 83fdb2a20..6d59f0c73 100644 --- a/rust/crates/sift_stream/src/stream/mod.rs +++ b/rust/crates/sift_stream/src/stream/mod.rs @@ -37,6 +37,10 @@ pub(crate) mod flow; /// Task-based architecture for non-blocking SiftStream operations pub mod tasks; +/// Error types returned by [`Transport`] send methods. +pub mod send_error; +pub use send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError}; + #[cfg(test)] mod test; @@ -65,33 +69,135 @@ pub trait Encoder: private::Sealed { type Message: Send + Sync; } -/// A trait that defines how data is transmitted, or streamed. +/// Defines how encoded telemetry messages are delivered to their destination. +/// +/// Two concrete implementations are provided: +/// +/// - [`LiveStreaming`](crate::LiveStreaming) — delivers messages via a gRPC stream to Sift in +/// real-time, with optional disk backups and periodic checkpointing. +/// - [`FileBackup`](crate::FileBackup) — writes messages to rolling disk backup files without +/// connecting to Sift. +/// +/// ## Send API +/// +/// Each implementation exposes four send methods that differ in their backpressure behaviour: +/// +/// | Method | Blocks? | Error on failure | +/// |---|---|---| +/// | [`send`](Transport::send) | Yes — awaits until the channel has capacity | [`SendError`] with the undelivered message | +/// | [`send_requests`](Transport::send_requests) | Yes — per-message backpressure | [`SendError>`] with all undelivered messages | +/// | [`try_send`](Transport::try_send) | No — returns immediately | [`TrySendError`] as `Full(T)` or `Closed(T)` | +/// | [`try_send_requests`](Transport::try_send_requests) | No — fails on first undeliverable message | [`TrySendError>`] with all undelivered | +/// +/// In every failure case the undelivered message(s) are returned inside the error variant so +/// that the caller can decide whether to retry, log, buffer locally, or discard them. +/// +/// ## Channel semantics for `LiveStreaming` /// -/// For example, a live streaming implementation might use a -/// gRPC stream to transmit data in real-time to Sift, while -/// an alternative implementation might write data to a file -/// for a more "offline" use-case. +/// `LiveStreaming` maintains two internal bounded channels: +/// +/// - **backup channel** — the primary durability path. All four send methods operate on this +/// channel first. Errors reported by the send API always reflect the state of this channel. +/// - **ingestion channel** — forwards messages to the gRPC task. This channel uses a +/// *force-send* strategy: if it is full, the oldest in-flight message is evicted to make +/// room. It never blocks and never returns `Full`. +/// +/// This trait is sealed: only implementations within this crate are permitted. #[async_trait] pub trait Transport: private::Sealed { type Message: Send + Sync; type Encoder: Encoder; - /// Send a [`Self::Message`]. - fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()>; + /// Send a single message with backpressure. + /// + /// Awaits until the backing channel has capacity, then delivers the message. + /// + /// # Errors + /// + /// Returns [`SendError`] containing a potentially undelivered message. + /// + /// Depending on the implementation of [`Transport`], the undelivered message is not + /// necessarily the message that was provided to the current invocation of [`Self::send`]. + /// + /// See implementation documentation for details. + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError>; + + /// Send a batch of messages with backpressure. + /// + /// Awaits channel capacity for each message in turn. Stops on the first failure and + /// returns the failed message together with all remaining (not-yet-attempted) messages. + /// + /// # Errors + /// + /// Returns [`SendError>`] containing potentially undelivered messages. + /// + /// Depending on the implementation of [`Transport`], the undelivered messages are not + /// necessarily the messages that were provided to the current invocation of [`Self::send_requests`]. + /// + /// See implementation documentation for details. + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send; + + /// Attempt to send a single message without blocking. + /// + /// Returns immediately regardless of whether the channel has capacity. + /// + /// # Errors + /// + /// Returns [`TrySendError`] containing a potentially undelivered message: + /// - [`TrySendError::Full`] — the channel is at capacity; consider retrying with + /// [`send`](Transport::send) to apply backpressure instead. + /// - [`TrySendError::Closed`] — the channel has been closed. + /// + /// Depending on the implementation of [`Transport`], the undelivered messages are not + /// necessarily the messages that were provided to the current invocation of [`Self::try_send`]. + /// + /// See implementation documentation for details. + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError>; - /// Send a batch of messages via an iterator. + /// Attempt to send a batch of messages without blocking. + /// + /// Calls [`try_send`](Transport::try_send) for each message in turn. Returns immediately + /// on the first failure, bundling the failed message with any remaining unprocessed + /// messages. + /// + /// # Errors /// - /// This method is used as a more performant way to send a batch of messages, assuming - /// the iterator itself is not performing substantial work. + /// Returns [`TrySendError>`] containing potentially undelivered messages. + /// - [`TrySendError::Full`] — the channel was at capacity for one of the messages. + /// - [`TrySendError::Closed`] — the channel was closed. /// - /// However, this is less convenient since the caller will need to ensure the - /// resulting [`Self::Message`]s are properly created. - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + /// Depending on the implementation of [`Transport`], the undelivered messages are not + /// necessarily the messages that were provided to the current invocation of [`Self::try_send_requests`]. + /// + /// See implementation documentation for details. + fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> where I: IntoIterator + Send, I::IntoIter: Send; - /// Finish the stream. The mode implementation handles the actual cleanup logic. + /// Flush any remaining messages and cleanly shut down the transport. + /// + /// Must be called when ingestion is complete. Dropping a [`SiftStream`] without + /// calling `finish` may result in tail-end data not reaching Sift. async fn finish(self, stream_id: &Uuid) -> Result<()>; } @@ -149,43 +255,127 @@ where self.run.as_ref() } - /// The entry-point to send actual telemetry to Sift in the form of [`Flow`](mode::ingestion_config::Flow)s. - pub async fn send(&mut self, message: M) -> Result<()> + /// Send telemetry with backpressure. + /// + /// Encodes `message` and then awaits until the backing channel has capacity. For + /// [`LiveStreaming`](crate::LiveStreaming) this is the **backup channel**; for + /// [`FileBackup`](crate::FileBackup) this is the write channel. + /// + /// Use this method when you want the caller to slow down naturally when the pipeline + /// is under load. For a non-blocking alternative see [`try_send`](SiftStream::try_send). + /// + /// # Errors + /// + /// - [`SiftStreamSendError::EncodeError`] — the message could not be encoded. This + /// indicates a schema mismatch or invalid value and is not recoverable by retrying. + /// - [`SiftStreamSendError::ChannelClosed`] — the backing channel was closed before the + /// message could be delivered. The undelivered message is returned inside the variant. + /// + /// # Cancellation safety + /// + /// If the returned future is dropped while waiting for channel capacity, no message is + /// lost — either the send completed before the drop, or the channel slot was never taken. + pub async fn send( + &mut self, + message: M, + ) -> std::result::Result<(), SiftStreamSendError<::Message>> where M: Encodeable::Message> + Send + Sync, { let encoded = message .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref()) - .ok_or(Error::new_msg( - ErrorKind::EncodeMessageError, - "Failed to encode message", - ))?; + .ok_or_else(|| SiftStreamSendError::encode_error("Failed to encode message"))?; - self.transport.send(&self.sift_stream_id, encoded) + self.transport + .send(&self.sift_stream_id, encoded) + .await + .map_err(|SendError(msg)| SiftStreamSendError::ChannelClosed(msg)) } - /// This method offers a way to send data in a manner that's identical to the raw - /// [`gRPC service`] for ingestion-config based streaming. + /// Send a batch of pre-encoded requests with backpressure. + /// + /// Awaits channel capacity for each request in turn. Stops on the first failure and + /// returns all undelivered messages (the failing one plus any not yet attempted). + /// + /// Unlike [`send`](SiftStream::send), this method accepts pre-encoded + /// [`Transport::Message`](crate::stream::Transport::Message) values directly, bypassing + /// the encode step. Use [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum + /// performance. + /// + /// # Errors /// - /// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11 - pub async fn send_requests(&mut self, requests: I) -> Result<()> + /// [`SendError>`] containing every message that was not delivered. + pub async fn send_requests( + &mut self, + requests: I, + ) -> std::result::Result<(), SendError::Message>>> where I: IntoIterator::Message> + Send, I::IntoIter: Send, { - self.transport.send_requests(&self.sift_stream_id, requests) + self.transport + .send_requests(&self.sift_stream_id, requests) + .await } - /// This method offers a way to send data in a manner that's identical to the raw - /// [`gRPC service`] for ingestion-config based streaming. + /// Attempt to send telemetry without blocking. + /// + /// Encodes `message` and immediately attempts to place it on the backing channel. Returns + /// at once regardless of whether the channel has capacity. + /// + /// Use this method in tight loops or real-time contexts where blocking is unacceptable. + /// For backpressure-aware sending see [`send`](SiftStream::send). + /// + /// # Errors + /// + /// - [`SiftStreamTrySendError::EncodeError`] — the message could not be encoded. + /// - [`SiftStreamTrySendError::Channel`] wrapping one of: + /// - [`TrySendError::Full`] — the backing channel is at capacity; the undelivered + /// message is returned. Consider switching to [`send`](SiftStream::send) to apply + /// backpressure, or retrying after a short delay. + /// - [`TrySendError::Closed`] — the backing channel has been closed; the undelivered + /// message is returned. + pub fn try_send( + &mut self, + message: M, + ) -> std::result::Result<(), SiftStreamTrySendError<::Message>> + where + M: Encodeable::Message> + Send + Sync, + { + let encoded = message + .encode(&mut self.encoder, &self.sift_stream_id, self.run.as_ref()) + .ok_or_else(|| SiftStreamTrySendError::encode_error("Failed to encode message"))?; + + self.transport + .try_send(&self.sift_stream_id, encoded) + .map_err(SiftStreamTrySendError::Channel) + } + + /// Attempt to send a batch of pre-encoded requests without blocking. + /// + /// Calls `try_send` on the backing channel for each request. Returns immediately on + /// the first failure with every undelivered message (the failing one plus any not yet + /// attempted). + /// + /// Unlike [`try_send`](SiftStream::try_send), this method accepts pre-encoded + /// [`Transport::Message`](crate::stream::Transport::Message) values directly. Use + /// [`FlowBuilder`](crate::FlowBuilder) to construct them for maximum performance. + /// + /// # Errors /// - /// [`gRPC service`]: https://github.com/sift-stack/sift/blob/main/protos/sift/ingest/v1/ingest.proto#L11 - pub fn send_requests_nonblocking(&mut self, requests: I) -> Result<()> + /// [`TrySendError>`] containing every message that was not delivered: + /// - [`TrySendError::Full`] — the backing channel was at capacity. + /// - [`TrySendError::Closed`] — the backing channel was closed. + pub fn try_send_requests( + &mut self, + requests: I, + ) -> std::result::Result<(), TrySendError::Message>>> where I: IntoIterator::Message> + Send, I::IntoIter: Send, { - self.transport.send_requests(&self.sift_stream_id, requests) + self.transport + .try_send_requests(&self.sift_stream_id, requests) } /// Gracefully finish the stream, draining any remaining data before returning. diff --git a/rust/crates/sift_stream/src/stream/mode/file_backup.rs b/rust/crates/sift_stream/src/stream/mode/file_backup.rs index daeab0f7c..60daf4eb9 100644 --- a/rust/crates/sift_stream/src/stream/mode/file_backup.rs +++ b/rust/crates/sift_stream/src/stream/mode/file_backup.rs @@ -1,4 +1,5 @@ use crate::stream::mode::ingestion_config::IngestionConfigEncoder; +use crate::stream::send_error::{SendError, TrySendError}; use crate::stream::{SiftStream, Transport, private::Sealed}; use crate::{ DiskBackupPolicy, RetryPolicy, @@ -106,12 +107,12 @@ pub struct FileBackup { // Seal the trait - only this crate can implement SiftStreamMode impl Sealed for FileBackup {} -#[async_trait] -impl Transport for FileBackup { - type Encoder = IngestionConfigEncoder; - type Message = IngestWithConfigDataStreamRequest; - - fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()> { +impl FileBackup { + fn prepare_message( + &mut self, + stream_id: &Uuid, + message: IngestWithConfigDataStreamRequest, + ) -> Arc { self.metrics.messages_received.increment(); #[cfg(feature = "tracing")] @@ -119,43 +120,127 @@ impl Transport for FileBackup { if !self.flows_seen.contains(&message.flow) { self.metrics.unique_flows_received.increment(); self.flows_seen.insert(message.flow.clone()); - tracing::info!( - sift_stream_id = %stream_id, - "flow '{}' being ingested for the first time", - &message.flow, - ); + tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow); } } - // Track the backup channel depth. self.metrics .backup_channel_depth .set(self.write_tx.len() as u64); + Arc::new(message) + } +} - // Send the request to the background write task (non-blocking) - let request_arc = Arc::new(message); - self.write_tx.try_send(request_arc).map_err(|e| { - if e.is_full() { - Error::new_msg(ErrorKind::StreamError, "file backup write channel is full") - } else { - Error::new_msg( - ErrorKind::StreamError, - format!("file backup write channel is closed: {e}"), - ) - } - })?; +#[async_trait] +impl Transport for FileBackup { + type Encoder = IngestionConfigEncoder; + type Message = IngestWithConfigDataStreamRequest; + + /// Sends a message to be written to the backup file, awaiting capacity if the stream + /// is busy. + /// + /// Returns an error only if the stream has been closed, in which case the original + /// message is returned inside `Err`. Normal backpressure is handled transparently by + /// waiting. + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError> { + let arc = self.prepare_message(stream_id, message); + + self.write_tx + .send(arc) + .await + .map_err(|async_channel::SendError(a)| { + SendError(Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone())) + })?; self.metrics.messages_sent.increment(); Ok(()) } - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + /// Attempts to send a message without blocking. + /// + /// Returns immediately with `TrySendError::Full` if the stream is at capacity, or + /// `TrySendError::Closed` if the stream has been closed. In either case the original + /// message is returned unchanged. + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError> { + let arc = self.prepare_message(stream_id, message); + + match self.write_tx.try_send(arc) { + Ok(()) => { + self.metrics.messages_sent.increment(); + Ok(()) + } + Err(async_channel::TrySendError::Full(a)) => Err(TrySendError::Full( + Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone()), + )), + Err(async_channel::TrySendError::Closed(a)) => Err(TrySendError::Closed( + Arc::try_unwrap(a).unwrap_or_else(|a| (*a).clone()), + )), + } + } + + /// Sends a batch of messages in order to be written to the backup file, awaiting + /// capacity for each one. + /// + /// On stream close, stops immediately and returns the undelivered messages starting + /// from the point of failure. The first element of the returned `Vec` is always the + /// message that failed to send. + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> where - I: IntoIterator + Send, + I: IntoIterator + Send, I::IntoIter: Send, { - for req in requests { - self.send(stream_id, req)?; + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + if let Err(SendError(failed)) = self.send(stream_id, msg).await { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(SendError(undelivered)); + } + } + Ok(()) + } + + /// Attempts to send a batch of messages in order without blocking. + /// + /// Stops and returns on the first failure. The returned `Vec` contains the undelivered + /// messages starting from the point of failure, with the first element always being + /// the message that failed to send. + fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send, + { + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + match self.try_send(stream_id, msg) { + Ok(()) => {} + Err(TrySendError::Full(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Full(undelivered)); + } + Err(TrySendError::Closed(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Closed(undelivered)); + } + } } Ok(()) } @@ -743,7 +828,7 @@ mod tests { let request = create_test_request("test_flow", &ingestion_config.ingestion_config_id); // Send the request - mode.send(&sift_stream_id, request).unwrap(); + mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; @@ -782,9 +867,9 @@ mod tests { let request2 = create_test_request("flow2", &ingestion_config.ingestion_config_id); let request3 = create_test_request("flow1", &ingestion_config.ingestion_config_id); // Duplicate - mode.send(&sift_stream_id, request1).unwrap(); - mode.send(&sift_stream_id, request2).unwrap(); - mode.send(&sift_stream_id, request3).unwrap(); + mode.try_send(&sift_stream_id, request1).unwrap(); + mode.try_send(&sift_stream_id, request2).unwrap(); + mode.try_send(&sift_stream_id, request3).unwrap(); // Wait for the background task to process all messages wait_for_backup_metrics(&metrics, 3, 1000).await; @@ -825,7 +910,7 @@ mod tests { create_test_request("flow3", &ingestion_config.ingestion_config_id), ]; - mode.send_requests(&sift_stream_id, requests).unwrap(); + mode.try_send_requests(&sift_stream_id, requests).unwrap(); // Wait for the background task to process all messages wait_for_backup_metrics(&metrics, 3, 1000).await; @@ -838,45 +923,6 @@ mod tests { mode.finish(&sift_stream_id).await.unwrap(); } - #[tokio::test] - async fn test_file_backup_mode_send_requests_nonblocking() { - let ingestion_config = create_test_ingestion_config(); - let temp_dir = TempDir::new("test_file_backup").unwrap(); - let metrics = Arc::new(SiftStreamMetrics::default()); - let sift_stream_id = Uuid::new_v4(); - - let file_writer_config = FileWriterConfig { - directory: temp_dir.path().to_path_buf(), - prefix: ingestion_config.client_key.clone(), - max_size: 1024 * 1024, - }; - - let mut mode = create_test_file_backup_mode( - temp_dir.path().to_path_buf(), - file_writer_config, - 1024 * 100, - metrics.clone(), - ) - .await; - - let requests = vec![ - create_test_request("flow1", &ingestion_config.ingestion_config_id), - create_test_request("flow2", &ingestion_config.ingestion_config_id), - ]; - - mode.send_requests(&sift_stream_id, requests).unwrap(); - - // Wait for the background task to process all messages - wait_for_backup_metrics(&metrics, 2, 1000).await; - - assert_eq!(metrics.messages_received.get(), 2); - assert_eq!(metrics.messages_sent.get(), 2); - assert_eq!(metrics.backups.total_messages.get(), 2); - - // Finish to ensure all data is written - mode.finish(&sift_stream_id).await.unwrap(); - } - #[tokio::test] async fn test_file_backup_mode_send_with_flow_descriptor() { let ingestion_config = create_test_ingestion_config(); @@ -912,7 +958,7 @@ mod tests { let request = builder.request(TimeValue::now()); - mode.send(&sift_stream_id, request).unwrap(); + mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; @@ -955,7 +1001,7 @@ mod tests { let request = builder.request(TimeValue::now()); // Should still succeed even without flow descriptor - mode.send(&sift_stream_id, request).unwrap(); + mode.try_send(&sift_stream_id, request).unwrap(); // Wait for the background task to process the message wait_for_backup_metrics(&metrics, 1, 1000).await; @@ -1033,4 +1079,155 @@ mod tests { // Finish should succeed and flush data stream.finish().await.unwrap(); } + + // --- Transport send/try_send error-path tests --- + + /// Build a minimal FileBackup backed by a controlled channel. + /// The spawned write task simply drains messages into a black hole so it + /// never blocks, but the *caller* also gets the Receiver so they can + /// withhold reads when they need the channel to appear full or closed. + fn make_file_backup_with_capacity( + cap: usize, + ) -> ( + FileBackup, + async_channel::Receiver>, + ) { + let (write_tx, write_rx) = async_channel::bounded(cap); + let (control_tx, _) = tokio::sync::broadcast::channel(10); + + let fb = FileBackup { + write_tx, + write_task: tokio::spawn(async { Ok(()) }), + control_tx, + metrics_streaming: None, + flows_seen: HashSet::new(), + metrics: Arc::new(SiftStreamMetrics::default()), + }; + + (fb, write_rx) + } + + #[tokio::test] + async fn test_file_backup_try_send_full_returns_full() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // First send fills the channel. + fb.try_send( + &stream_id, + create_test_request("flow1", &ingestion_config_id), + ) + .unwrap(); + + // Second send should fail with Full because the channel is now at capacity. + let req = create_test_request("flow2", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().flow, flow); + + drop(write_rx); + } + + #[tokio::test] + async fn test_file_backup_try_send_closed_returns_closed() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Closing the receiver makes every subsequent try_send return Closed. + drop(write_rx); + + let req = create_test_request("flow1", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_closed(), "expected Closed, got {err}"); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_file_backup_send_closed_returns_send_error() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + drop(write_rx); + + let req = create_test_request("flow1", &ingestion_config_id); + let flow = req.flow.clone(); + let err = fb.send(&stream_id, req).await.unwrap_err(); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_file_backup_send_blocks_until_space_available() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Fill the channel so the next async send has to wait. + fb.try_send( + &stream_id, + create_test_request("flow1", &ingestion_config_id), + ) + .unwrap(); + + // Consumer reads after a short delay, freeing space for the blocked send. + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = write_rx.recv().await; + // Keep the receiver alive so the channel doesn't close. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + fb.send( + &stream_id, + create_test_request("flow2", &ingestion_config_id), + ) + .await + .unwrap(); + } + + #[tokio::test] + async fn test_file_backup_try_send_requests_returns_undelivered_on_full() { + let (mut fb, write_rx) = make_file_backup_with_capacity(1); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + // Fill the channel. + fb.try_send( + &stream_id, + create_test_request("flow0", &ingestion_config_id), + ) + .unwrap(); + + let reqs = vec![ + create_test_request("flow1", &ingestion_config_id), + create_test_request("flow2", &ingestion_config_id), + create_test_request("flow3", &ingestion_config_id), + ]; + let err = fb.try_send_requests(&stream_id, reqs).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().len(), 3); + + drop(write_rx); + } + + #[tokio::test] + async fn test_file_backup_send_requests_returns_undelivered_on_closed() { + let (mut fb, write_rx) = make_file_backup_with_capacity(10); + let stream_id = Uuid::new_v4(); + let ingestion_config_id = Uuid::new_v4().to_string(); + + drop(write_rx); + + let reqs = vec![ + create_test_request("flow1", &ingestion_config_id), + create_test_request("flow2", &ingestion_config_id), + create_test_request("flow3", &ingestion_config_id), + ]; + let err = fb.send_requests(&stream_id, reqs).await.unwrap_err(); + assert_eq!(err.into_inner().len(), 3); + } } diff --git a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs index 27eb1e2f2..d7845a51f 100644 --- a/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs +++ b/rust/crates/sift_stream/src/stream/mode/ingestion_config.rs @@ -1,5 +1,6 @@ use crate::FlowBuilder; use crate::metrics::{SiftStreamMetrics, SiftStreamMetricsSnapshot}; +use crate::stream::send_error::{SendError, TrySendError}; use crate::stream::{Encodeable, Encoder, MetricsSnapshot, Transport}; use crate::stream::{ SiftStream, @@ -56,34 +57,27 @@ pub struct LiveStreaming { // Seal the trait - only this crate can implement SiftStreamMode impl Sealed for LiveStreaming {} -#[async_trait] -impl Transport for LiveStreaming { - type Encoder = IngestionConfigEncoder; - type Message = IngestWithConfigDataStreamRequest; - - /// Sends the message to Sift for live ingestion, while in parallel also sends a backup of the message to a file. - fn send(&mut self, stream_id: &Uuid, message: Self::Message) -> Result<()> { +impl LiveStreaming { + fn prepare_message( + &mut self, + stream_id: &Uuid, + message: IngestWithConfigDataStreamRequest, + ) -> DataMessage { #[cfg(feature = "tracing")] { if !self.flows_seen.contains(&message.flow) { self.metrics.unique_flows_received.increment(); self.flows_seen.insert(message.flow.clone()); - tracing::info!( - sift_stream_id = %stream_id, - "flow '{}' being ingested for the first time", - &message.flow, - ); + tracing::info!(sift_stream_id = %stream_id, "flow '{}' being ingested for the first time", &message.flow); } } - // Track the channel depths. self.metrics .ingestion_channel_depth .set(self.stream_system.ingestion_tx.len() as u64); self.metrics .backup_channel_depth .set(self.stream_system.backup_tx.len() as u64); - self.metrics.messages_received.increment(); let data_msg = DataMessage { @@ -91,83 +85,230 @@ impl Transport for LiveStreaming { request: Arc::new(message), dropped_for_ingestion: false, }; - self.message_id_counter += 1; + data_msg + } - // Send the message for backup first. If this fails, log an error and continue. - // - // Failure to backup can lead to data loss though it is preferable to attempt - // to stream the message to Sift rather than return the error and prevent both. - // - // TODO(tsift): Make this behavior optional via a builder arg. - if let Err(e) = self.stream_system.backup_tx.try_send(data_msg.clone()) { - #[cfg(feature = "tracing")] - tracing::warn!( - sift_stream_id = %stream_id, - "failed to send data to backup system, data will still be streamed to Sift: {e}" - ); - } - - self.metrics.messages_sent_to_backup.increment(); - - // Send the message for ingestion. - // - // If the channel is full, the oldest message will be removed in order to create space for the newer message. - // For ingestion, newer data is preferred over older data. + /// Used by `async fn send`. If an oldest message is evicted from the ingestion + /// channel, awaits until backup has space to accept it. Returns the undeliverable + /// message on backup channel close or ingestion channel close. + async fn dispatch_to_ingestion( + &mut self, + stream_id: &Uuid, + data_msg: DataMessage, + ) -> Option { match self.stream_system.ingestion_tx.force_send(data_msg) { - Ok(None) => Ok(()), - Ok(Some(mut oldest_message)) => { + Ok(None) => None, + Ok(Some(mut oldest)) => { + oldest.dropped_for_ingestion = true; + self.metrics.old_messages_dropped_for_ingestion.increment(); + self.metrics.checkpoint.failed_checkpoint_count.increment(); + // Block until backup has space. + match self.stream_system.backup_tx.send(oldest).await { + Ok(()) => { + self.metrics.messages_sent_to_backup.increment(); + None + } + Err(async_channel::SendError(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } + Err(async_channel::SendError(dm)) => { + // ingestion channel closed — return the message to the caller #[cfg(feature = "tracing")] - tracing::debug!( - sift_stream_id = %stream_id, - "data channel full, dropping oldest message" - ); + tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } - oldest_message.dropped_for_ingestion = true; + /// Used by `fn try_send`. If an oldest message is evicted from the ingestion + /// channel and backup is full or closed, returns the evicted message to the + /// caller. Also returns the message when the ingestion channel itself is closed. + fn try_dispatch_to_ingestion( + &mut self, + stream_id: &Uuid, + data_msg: DataMessage, + ) -> Option { + match self.stream_system.ingestion_tx.force_send(data_msg) { + Ok(None) => None, + Ok(Some(mut oldest)) => { + oldest.dropped_for_ingestion = true; self.metrics.old_messages_dropped_for_ingestion.increment(); - self.metrics.messages_sent_to_backup.increment(); self.metrics.checkpoint.failed_checkpoint_count.increment(); + match self.stream_system.backup_tx.try_send(oldest) { + Ok(()) => { + self.metrics.messages_sent_to_backup.increment(); + None + } + Err(async_channel::TrySendError::Full(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + Err(async_channel::TrySendError::Closed(dm)) => { + self.metrics + .old_messages_failed_adding_to_backup + .increment(); + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "backup channel closed while dispatching evicted message"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } + Err(async_channel::SendError(dm)) => { + // ingestion channel closed — return the message to the caller + #[cfg(feature = "tracing")] + tracing::debug!(sift_stream_id = %stream_id, "ingestion channel closed"); + Some(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + } + } + } +} - // Re-send the oldest message to the backup to ensure it is re-ingested later despite being - // dropped from the ingestion channel. - // - // On failure, rely on metrics to track occurences. Logging can quickly become spammy as - // the system works through bursts of messages so logs are reduced to the debug level. - if let Err(e) = self - .stream_system - .backup_tx - .try_send(oldest_message) - .map_err(|e| Error::new(ErrorKind::StreamError, e)) - .context("failed to send data to backup task system") - { - self.metrics - .old_messages_failed_adding_to_backup - .increment(); +#[async_trait] +impl Transport for LiveStreaming { + type Encoder = IngestionConfigEncoder; + type Message = IngestWithConfigDataStreamRequest; - #[cfg(feature = "tracing")] - tracing::debug!( - sift_stream_id = %stream_id, - "failed to send oldest data to backup task system: {e}" - ); - } + /// Sends a message, awaiting capacity if the stream is busy. + /// + /// This mode prioritizes freshness: newer messages always make it into the stream, and + /// older buffered messages are displaced to make room when necessary. As a result, if + /// the stream is closed while a displaced message is being handled, **the message + /// returned inside `Err` may be older than the message provided**. An error is only + /// returned on stream close; normal backpressure is handled transparently by waiting. + /// + /// Because displaced messages are not automatically retried, enabling file backup + /// recovery mode is strongly recommended. When enabled, displaced messages are written + /// to a local file and can be replayed to Sift at a later time, ensuring no data goes + /// unrecorded. + async fn send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), SendError> { + let data_msg = self.prepare_message(stream_id, message); - // Do not interupt ingestion. - Ok(()) + self.stream_system + .backup_tx + .send(data_msg.clone()) + .await + .map_err(|async_channel::SendError(dm)| { + SendError(Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone())) + })?; + + self.metrics.messages_sent_to_backup.increment(); + if let Some(displaced) = self.dispatch_to_ingestion(stream_id, data_msg).await { + return Err(SendError(displaced)); + } + Ok(()) + } + + /// Attempts to send a message without blocking. + /// + /// Returns immediately with `TrySendError::Full` or `TrySendError::Closed` if the + /// stream cannot accept data right now. + /// + /// This mode prioritizes freshness: newer messages always make it into the stream, and + /// older buffered messages are displaced to make room when necessary. If such a + /// displacement occurs and the stream is full or closed at that point, **the message + /// returned inside `Err` may be older than the message provided**. Enabling file + /// backup recovery mode is strongly recommended so that any displaced messages are + /// written to a local file and can be replayed to Sift later. + fn try_send( + &mut self, + stream_id: &Uuid, + message: Self::Message, + ) -> std::result::Result<(), TrySendError> { + let data_msg = self.prepare_message(stream_id, message); + + match self.stream_system.backup_tx.try_send(data_msg.clone()) { + Ok(()) => {} + Err(async_channel::TrySendError::Full(dm)) => { + return Err(TrySendError::Full( + Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()), + )); } - Err(e) => Err(Error::new_msg( - ErrorKind::StreamError, - format!("queueing data for ingestion failed: {e}"), - )), + Err(async_channel::TrySendError::Closed(dm)) => { + return Err(TrySendError::Closed( + Arc::try_unwrap(dm.request).unwrap_or_else(|arc| (*arc).clone()), + )); + } + } + + self.metrics.messages_sent_to_backup.increment(); + if let Some(displaced) = self.try_dispatch_to_ingestion(stream_id, data_msg) { + return Err(TrySendError::Full(displaced)); } + Ok(()) } - fn send_requests(&mut self, stream_id: &Uuid, requests: I) -> Result<()> + /// Sends a batch of messages in order, awaiting capacity for each one. + /// + /// On stream close, stops immediately and returns the undelivered messages starting + /// from the point of failure. Because this mode may displace older buffered messages + /// to make room for newer ones (see [`send`](Self::send)), the first element of the + /// returned `Vec` may be an older displaced message rather than the one that was being + /// sent at the time of failure. + async fn send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), SendError>> where I: IntoIterator + Send, I::IntoIter: Send, { - for req in requests { - self.send(stream_id, req)?; + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + if let Err(SendError(failed)) = self.send(stream_id, msg).await { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(SendError(undelivered)); + } + } + Ok(()) + } + + /// Attempts to send a batch of messages in order without blocking. + /// + /// Stops and returns on the first failure. The returned `Vec` contains the undelivered + /// messages starting from the point of failure. Because this mode may displace older + /// buffered messages to make room for newer ones (see [`try_send`](Self::try_send)), + /// the first element may be an older displaced message rather than the one that was + /// being sent at the time of failure. + fn try_send_requests( + &mut self, + stream_id: &Uuid, + requests: I, + ) -> std::result::Result<(), TrySendError>> + where + I: IntoIterator + Send, + I::IntoIter: Send, + { + let mut iter = requests.into_iter(); + while let Some(msg) = iter.next() { + match self.try_send(stream_id, msg) { + Ok(()) => {} + Err(TrySendError::Full(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Full(undelivered)); + } + Err(TrySendError::Closed(failed)) => { + let mut undelivered = vec![failed]; + undelivered.extend(iter); + return Err(TrySendError::Closed(undelivered)); + } + } } Ok(()) } @@ -563,3 +704,149 @@ impl Stream for DataStream { } } } + +#[cfg(test)] +mod tests { + use super::*; + use crate::stream::tasks::DataMessage; + use std::collections::HashSet; + + fn make_request() -> IngestWithConfigDataStreamRequest { + IngestWithConfigDataStreamRequest { + ingestion_config_id: uuid::Uuid::new_v4().to_string(), + flow: "test_flow".to_string(), + timestamp: None, + channel_values: vec![], + run_id: String::new(), + end_stream_on_validation_error: false, + organization_id: String::new(), + } + } + + fn make_live_streaming( + ingestion_capacity: usize, + backup_capacity: usize, + ) -> ( + LiveStreaming, + async_channel::Receiver, + async_channel::Receiver, + ) { + let (control_tx, _) = broadcast::channel(10); + let (ingestion_tx, ingestion_rx) = async_channel::bounded(ingestion_capacity); + let (backup_tx, backup_rx) = async_channel::bounded(backup_capacity); + + let system = StreamSystem { + backup_manager: tokio::spawn(async { Ok(()) }), + ingestion: tokio::spawn(async { Ok(()) }), + reingestion: tokio::spawn(async { Ok(()) }), + metrics_streaming: None, + control_tx, + ingestion_tx, + backup_tx, + }; + + let live = LiveStreaming { + message_id_counter: 0, + stream_system: system, + flows_seen: HashSet::new(), + metrics: Arc::new(crate::metrics::SiftStreamMetrics::default()), + }; + + (live, ingestion_rx, backup_rx) + } + + #[tokio::test] + async fn test_try_send_backup_closed_returns_closed() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_closed(), "expected Closed, got {err}"); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_try_send_backup_full_returns_full() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + // Pre-fill the backup channel so the next try_send finds it full. + let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.try_send(&stream_id, req).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().flow, flow); + drop(backup_rx); + } + + #[tokio::test] + async fn test_send_backup_closed_returns_send_error() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + let stream_id = uuid::Uuid::new_v4(); + let req = make_request(); + let flow = req.flow.clone(); + let err = live.send(&stream_id, req).await.unwrap_err(); + assert_eq!(err.into_inner().flow, flow); + } + + #[tokio::test] + async fn test_send_blocks_until_backup_space_available() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + // Fill the backup channel so send will have to wait. + let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + // After a short delay, consume the dummy so send can proceed. + tokio::spawn(async move { + tokio::time::sleep(std::time::Duration::from_millis(20)).await; + let _ = backup_rx.recv().await; + // Keep the receiver alive so the channel stays open. + tokio::time::sleep(std::time::Duration::from_millis(100)).await; + }); + + let stream_id = uuid::Uuid::new_v4(); + live.send(&stream_id, make_request()).await.unwrap(); + } + + #[tokio::test] + async fn test_try_send_requests_returns_undelivered_on_full() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 1); + let dummy = DataMessage { + message_id: 0, + request: Arc::new(make_request()), + dropped_for_ingestion: false, + }; + live.stream_system.backup_tx.try_send(dummy).unwrap(); + + let stream_id = uuid::Uuid::new_v4(); + let reqs = vec![make_request(), make_request(), make_request()]; + let err = live.try_send_requests(&stream_id, reqs).unwrap_err(); + assert!(err.is_full(), "expected Full, got {err}"); + assert_eq!(err.into_inner().len(), 3); + drop(backup_rx); + } + + #[tokio::test] + async fn test_send_requests_returns_undelivered_on_closed() { + let (mut live, _ingestion_rx, backup_rx) = make_live_streaming(10, 10); + drop(backup_rx); + + let stream_id = uuid::Uuid::new_v4(); + let reqs = vec![make_request(), make_request(), make_request()]; + let err = live.send_requests(&stream_id, reqs).await.unwrap_err(); + assert_eq!(err.into_inner().len(), 3); + } +} diff --git a/rust/crates/sift_stream/src/stream/mode/test.rs b/rust/crates/sift_stream/src/stream/mode/test.rs index 432f58e5c..477bfd4e7 100644 --- a/rust/crates/sift_stream/src/stream/mode/test.rs +++ b/rust/crates/sift_stream/src/stream/mode/test.rs @@ -34,7 +34,6 @@ fn validate_handling_empty_values() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); @@ -98,7 +97,6 @@ fn validate_handling_no_matches_based_on_name() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); @@ -146,7 +144,6 @@ fn validate_handling_no_matches_based_on_type() { ..Default::default() }, ], - ..Default::default() }; let flow_descriptor = FlowDescriptor::try_from(("ingestion_config_id", flow_config)) .expect("flow descriptor should be generated"); diff --git a/rust/crates/sift_stream/src/stream/send_error.rs b/rust/crates/sift_stream/src/stream/send_error.rs new file mode 100644 index 000000000..43f0f0b95 --- /dev/null +++ b/rust/crates/sift_stream/src/stream/send_error.rs @@ -0,0 +1,309 @@ +use sift_error::prelude::{Error as SiftError, ErrorKind}; +use std::fmt; + +/// Returned by the async [`Transport::send`](crate::stream::Transport::send) / +/// [`Transport::send_requests`](crate::stream::Transport::send_requests) when the underlying +/// channel is closed and delivery cannot complete. +/// +/// The inner value `T` is the undelivered message. Typical recovery strategies: +/// +/// - **Log and discard** — if losing the message is acceptable (e.g. high-frequency sensor +/// data where the next sample will arrive momentarily). +/// - **Buffer and retry** — store the message locally and re-attempt once the channel is +/// re-established. +/// - **Propagate as an error** — call [`into_inner`](SendError::into_inner) to recover the +/// message and return it up the call stack for the application to decide. +/// +/// A closed channel usually means the [`SiftStream`](crate::SiftStream) is shutting down. +/// Check that [`SiftStream::finish`](crate::SiftStream::finish) has not already been called. +#[derive(Debug)] +pub struct SendError(pub T); + +impl SendError { + /// Consume the error, returning the undelivered value. + pub fn into_inner(self) -> T { + self.0 + } +} + +impl fmt::Display for SendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "channel closed: failed to send message") + } +} + +impl std::error::Error for SendError {} + +/// Returned by the sync [`Transport::try_send`](crate::stream::Transport::try_send) / +/// [`Transport::try_send_requests`](crate::stream::Transport::try_send_requests) when +/// immediate delivery fails. +/// +/// The undelivered value `T` is always returned inside the variant so the caller can +/// recover it without cloning. +/// +/// ## Variants and recovery +/// +/// - [`Full`](TrySendError::Full) — the channel is currently at capacity. The message has +/// *not* been dropped; the caller can retry later with another `try_send` call, switch to +/// the backpressure-aware [`send`](crate::SiftStream::send), or discard the message if +/// it is stale. +/// +/// - [`Closed`](TrySendError::Closed) — all channel receivers have been dropped. The +/// [`SiftStream`](crate::SiftStream) is shutting down. Retrying on the same stream will +/// not succeed. +#[derive(Debug)] +pub enum TrySendError { + /// The channel has been closed. The undelivered value is returned. + Closed(T), + /// The channel is currently full. The undelivered value is returned. + Full(T), +} + +impl TrySendError { + /// Consume the error, returning the undelivered value. + pub fn into_inner(self) -> T { + match self { + TrySendError::Closed(v) | TrySendError::Full(v) => v, + } + } + + pub fn is_closed(&self) -> bool { + matches!(self, TrySendError::Closed(_)) + } + + pub fn is_full(&self) -> bool { + matches!(self, TrySendError::Full(_)) + } +} + +impl fmt::Display for TrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + TrySendError::Closed(_) => write!(f, "channel closed: failed to send message"), + TrySendError::Full(_) => write!(f, "channel full: failed to send message"), + } + } +} + +impl std::error::Error for TrySendError {} + +/// Returned by [`SiftStream::send`](crate::SiftStream::send) when delivery fails. +/// +/// This is the top-level error type for the high-level send API. It distinguishes +/// encode-time failures from channel-level failures so callers can handle them +/// differently. +#[derive(Debug)] +pub enum SiftStreamSendError { + /// The message could not be encoded before it was sent. + /// + /// This typically indicates a schema mismatch or an invalid value in the message. + /// Retrying the same message without correcting the schema will not succeed. + EncodeError(SiftError), + + /// The backing channel closed before the message could be delivered. + /// + /// The undelivered message is returned inside the variant. See the recovery + /// guidance on [`SendError`] for options. + ChannelClosed(T), +} + +impl fmt::Display for SiftStreamSendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SiftStreamSendError::EncodeError(e) => write!(f, "encode error: {e}"), + SiftStreamSendError::ChannelClosed(_) => { + write!(f, "channel closed: failed to send message") + } + } + } +} + +impl std::error::Error for SiftStreamSendError {} + +impl SiftStreamSendError { + /// Convert an encode failure into this error type. Used internally. + pub(crate) fn encode_error(msg: &str) -> Self { + SiftStreamSendError::EncodeError(SiftError::new_msg(ErrorKind::EncodeMessageError, msg)) + } +} + +/// Returned by [`SiftStream::try_send`](crate::SiftStream::try_send) when immediate +/// delivery fails. +/// +/// This is the top-level error type for the non-blocking send API. It distinguishes +/// encode-time failures from channel-level failures so callers can handle them +/// differently. +#[derive(Debug)] +pub enum SiftStreamTrySendError { + /// The message could not be encoded before it was sent. + /// + /// This typically indicates a schema mismatch or an invalid value in the message. + /// Retrying the same message without correcting the schema will not succeed. + EncodeError(SiftError), + + /// The backing channel was full or closed. + /// + /// The inner [`TrySendError`] carries the undelivered message. Inspect the variant + /// to distinguish between a transient backpressure condition (`Full`) and a permanent + /// shutdown (`Closed`). See [`TrySendError`] for recovery guidance. + Channel(TrySendError), +} + +impl fmt::Display for SiftStreamTrySendError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SiftStreamTrySendError::EncodeError(e) => write!(f, "encode error: {e}"), + SiftStreamTrySendError::Channel(e) => write!(f, "{e}"), + } + } +} + +impl std::error::Error for SiftStreamTrySendError {} + +impl SiftStreamTrySendError { + /// Convert an encode failure into this error type. Used internally. + pub(crate) fn encode_error(msg: &str) -> Self { + SiftStreamTrySendError::EncodeError(SiftError::new_msg(ErrorKind::EncodeMessageError, msg)) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn send_error_into_inner() { + let err = SendError(42u32); + assert_eq!(err.into_inner(), 42u32); + } + + #[test] + fn send_error_display() { + let err = SendError("msg"); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn send_error_is_error() { + fn assert_error(_: &E) {} + let err = SendError(0u8); + assert_error(&err); + } + + #[test] + fn try_send_error_into_inner_closed() { + let err: TrySendError = TrySendError::Closed(7); + assert_eq!(err.into_inner(), 7); + } + + #[test] + fn try_send_error_into_inner_full() { + let err: TrySendError = TrySendError::Full(9); + assert_eq!(err.into_inner(), 9); + } + + #[test] + fn try_send_error_is_closed() { + assert!(TrySendError::::Closed(0).is_closed()); + assert!(!TrySendError::::Full(0).is_closed()); + } + + #[test] + fn try_send_error_is_full() { + assert!(TrySendError::::Full(0).is_full()); + assert!(!TrySendError::::Closed(0).is_full()); + } + + #[test] + fn try_send_error_display() { + assert!( + TrySendError::::Closed(0) + .to_string() + .contains("channel closed") + ); + assert!( + TrySendError::::Full(0) + .to_string() + .contains("channel full") + ); + } + + #[test] + fn try_send_error_is_error() { + fn assert_error(_: &E) {} + let err = TrySendError::Closed(0u8); + assert_error(&err); + } + + #[test] + fn try_send_error_debug() { + let closed = TrySendError::Closed(42u32); + let full = TrySendError::Full(42u32); + assert!(format!("{:?}", closed).contains("Closed")); + assert!(format!("{:?}", full).contains("Full")); + } + + // SiftStreamSendError tests + + #[test] + fn sift_stream_send_error_encode_error_display() { + let err = SiftStreamSendError::::encode_error("bad encoding"); + assert!(err.to_string().contains("encode error")); + } + + #[test] + fn sift_stream_send_error_channel_closed_display() { + let err = SiftStreamSendError::ChannelClosed(42u32); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn sift_stream_send_error_is_error() { + fn assert_error(_: &E) {} + let err = SiftStreamSendError::ChannelClosed(0u8); + assert_error(&err); + } + + #[test] + fn sift_stream_send_error_debug() { + let err = SiftStreamSendError::ChannelClosed(42u32); + assert!(format!("{:?}", err).contains("ChannelClosed")); + let err2 = SiftStreamSendError::::encode_error("oops"); + assert!(format!("{:?}", err2).contains("EncodeError")); + } + + // SiftStreamTrySendError tests + + #[test] + fn sift_stream_try_send_error_encode_error_display() { + let err = SiftStreamTrySendError::::encode_error("bad"); + assert!(err.to_string().contains("encode error")); + } + + #[test] + fn sift_stream_try_send_error_channel_full_display() { + let err = SiftStreamTrySendError::Channel(TrySendError::Full(42u32)); + assert!(err.to_string().contains("channel full")); + } + + #[test] + fn sift_stream_try_send_error_channel_closed_display() { + let err = SiftStreamTrySendError::Channel(TrySendError::Closed(42u32)); + assert!(err.to_string().contains("channel closed")); + } + + #[test] + fn sift_stream_try_send_error_is_error() { + fn assert_error(_: &E) {} + let err = SiftStreamTrySendError::Channel(TrySendError::Closed(0u8)); + assert_error(&err); + } + + #[test] + fn sift_stream_try_send_error_debug() { + let err = SiftStreamTrySendError::Channel(TrySendError::Full(42u32)); + assert!(format!("{:?}", err).contains("Full")); + let err2 = SiftStreamTrySendError::::encode_error("oops"); + assert!(format!("{:?}", err2).contains("EncodeError")); + } +} diff --git a/rust/crates/sift_stream/src/stream/tasks.rs b/rust/crates/sift_stream/src/stream/tasks.rs index 64fd0d636..eda69919a 100644 --- a/rust/crates/sift_stream/src/stream/tasks.rs +++ b/rust/crates/sift_stream/src/stream/tasks.rs @@ -609,7 +609,9 @@ impl MetricsStreamingTask { let metrics = self.metrics.snapshot(); let values = metrics.channel_values(&self.session_name); let flow = Flow::new(METRICS_STREAMING_FLOW_NAME, TimeValue::now(), &values); - self.stream.send(flow).await?; + self.stream.send(flow).await.map_err(|e| { + Error::new_msg(ErrorKind::StreamError, e.to_string()) + })?; } ctrl_msg = self.control_rx.recv() => { match ctrl_msg { diff --git a/rust/crates/sift_stream/src/stream/test.rs b/rust/crates/sift_stream/src/stream/test.rs index 8ceab97a0..d5666c8e3 100644 --- a/rust/crates/sift_stream/src/stream/test.rs +++ b/rust/crates/sift_stream/src/stream/test.rs @@ -401,6 +401,58 @@ async fn test_sift_stream_builder_load_ingestion_config_with_new_flows() { assert_eq!(flows.len(), 2); } +#[tokio::test] +async fn test_sift_stream_try_send_smoke() { + let backups_dir = uuid::Uuid::new_v4().to_string(); + let tmp_dir = TempDir::new(&backups_dir).expect("failed to create tempdir"); + let tmp_dir_path = tmp_dir.path(); + + let ingestion_config = IngestionConfigForm { + asset_name: "test_asset".to_string(), + client_key: "test_client_key_try_send".to_string(), + flows: vec![crate::FlowConfig { + name: "try_send_flow".to_string(), + channels: vec![crate::ChannelConfig { + name: "value".to_string(), + data_type: sift_rs::common::r#type::v1::ChannelDataType::Double.into(), + ..Default::default() + }], + }], + }; + + let disk_backup_policy = DiskBackupPolicy { + backups_dir: Some(tmp_dir_path.to_path_buf()), + ..Default::default() + }; + let retry_policy = crate::RetryPolicy::default(); + let (grpc_channel, _mock_service) = crate::test::create_mock_grpc_channel_with_service().await; + + let mut sift_stream = SiftStreamBuilder::from_channel(grpc_channel) + .ingestion_config(ingestion_config) + .recovery_strategy(RecoveryStrategy::RetryWithBackups { + retry_policy, + disk_backup_policy, + }) + .metrics_streaming_interval(None) + .build() + .await + .expect("failed to build sift stream"); + + // try_send should succeed when the channel has room. + let result = sift_stream.try_send(crate::Flow::new( + "try_send_flow", + crate::TimeValue::now(), + &[crate::ChannelValue::new("value", 1.0_f64)], + )); + assert!(result.is_ok(), "try_send failed: {result:?}"); + + tokio::time::timeout(Duration::from_secs(10), async { + sift_stream.finish().await.expect("failed to finish"); + }) + .await + .expect("timeout waiting for finish"); +} + #[tokio::test(flavor = "current_thread")] async fn test_sift_stream_ingestion_and_backup_channels_fill_up() { let backups_dir = uuid::Uuid::new_v4().to_string(); @@ -449,7 +501,7 @@ async fn test_sift_stream_ingestion_and_backup_channels_fill_up() { // Send a burst of messages that will cause the ingestion and backup channels to fill up. // - // Since this test is running in single-threded mode, and `send_requests_nonblocking` is not async, + // Since this test is running in single-threaded mode, and `try_send_requests` is not async, // sending all the messages should occur before the background tasks have a chance to run // and create space. for data in 0..100 { diff --git a/rust/crates/sift_stream/src/test.rs b/rust/crates/sift_stream/src/test.rs index a3935c3cb..9216d440e 100644 --- a/rust/crates/sift_stream/src/test.rs +++ b/rust/crates/sift_stream/src/test.rs @@ -138,7 +138,7 @@ impl IngestionConfigService for MockIngestionConfigService { } Ok(Response::new(ListIngestionConfigsResponse { - ingestion_configs: ingestion_configs, + ingestion_configs, next_page_token: "".to_string(), })) } diff --git a/rust/crates/sift_stream/tests/common/mod.rs b/rust/crates/sift_stream/tests/common/mod.rs index daab49f42..b6d661c4a 100644 --- a/rust/crates/sift_stream/tests/common/mod.rs +++ b/rust/crates/sift_stream/tests/common/mod.rs @@ -136,7 +136,7 @@ impl IngestionConfigService for MockIngestionConfigService { } Ok(Response::new(ListIngestionConfigsResponse { - ingestion_configs: ingestion_configs, + ingestion_configs, next_page_token: "".to_string(), })) } diff --git a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs index 64635cf0f..2a1fd1303 100644 --- a/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs +++ b/rust/crates/sift_stream/tests/test_ingestion_config_streaming_retries.rs @@ -107,7 +107,7 @@ async fn test_retries_succeed() { // Send some messages while the server is returning errors. // // None of these messages should be captured by the mock server. - while num_streams_opened.load(Ordering::Relaxed) < 3 as u32 { + while num_streams_opened.load(Ordering::Relaxed) < 3_u32 { let msg = Flow::new( "flow", TimeValue::from(Local::now().to_utc()), diff --git a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi index d49ac2a00..658484800 100644 --- a/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi +++ b/rust/crates/sift_stream_bindings/sift_stream_bindings.pyi @@ -444,7 +444,7 @@ class SiftStreamPy: def send(self, flow:FlowPy) -> typing.Any: ... def batch_send(self, flows:typing.Any) -> typing.Any: ... def send_requests(self, requests:typing.Sequence[IngestWithConfigDataStreamRequestPy]) -> typing.Any: ... - def send_requests_nonblocking(self, flows:typing.Any) -> None: ... + def try_send_requests(self, flows:typing.Any) -> None: ... def get_metrics_snapshot(self) -> SiftStreamMetricsSnapshotPy: ... def add_new_flows(self, flow_configs:typing.Sequence[FlowConfigPy]) -> typing.Any: ... def get_flow_descriptor(self, flow_name:builtins.str) -> FlowDescriptorPy: ... diff --git a/rust/crates/sift_stream_bindings/src/stream/mod.rs b/rust/crates/sift_stream_bindings/src/stream/mod.rs index cfd496df9..1c0e9803c 100644 --- a/rust/crates/sift_stream_bindings/src/stream/mod.rs +++ b/rust/crates/sift_stream_bindings/src/stream/mod.rs @@ -63,6 +63,14 @@ impl From for Flow { } } +// Helper: convert any error whose Display impl does not print the message payload into a PyErr. +// All sift_stream send error types (SiftStreamSendError, SendError, TrySendError, +// SiftStreamTrySendError) intentionally omit the inner T from their Display output, so +// format!("{e}") is safe to use regardless of how large the undelivered message(s) may be. +fn py_err(e: impl std::fmt::Display) -> PyErr { + PyErr::new::(format!("{e}")) +} + // PyO3 Method Implementations #[gen_stub_pymethods] #[pymethods] @@ -79,10 +87,7 @@ impl SiftStreamPy { })?; let flow_rs: Flow = flow.into(); - match stream.send(flow_rs).await { - Ok(_) => Ok(()), - Err(e) => Err(SiftErrorWrapper(e).into()), - } + stream.send(flow_rs).await.map_err(py_err) })?; Ok(awaitable.into()) @@ -113,10 +118,7 @@ impl SiftStreamPy { for flow in flows_vec { let flow_rs: Flow = flow.into(); - match stream.send(flow_rs).await { - Ok(_) => (), - Err(e) => return Err(SiftErrorWrapper(e).into()), - } + stream.send(flow_rs).await.map_err(py_err)?; } Ok(()) })?; @@ -141,16 +143,13 @@ impl SiftStreamPy { ) })?; - match stream.send_requests(requests).await { - Ok(_) => Ok(()), - Err(e) => Err(SiftErrorWrapper(e).into()), - } + stream.send_requests(requests).await.map_err(py_err) })?; Ok(awaitable.into()) } - pub fn send_requests_nonblocking(&self, flows: &Bound<'_, PyAny>) -> PyResult<()> { + pub fn try_send_requests(&self, flows: &Bound<'_, PyAny>) -> PyResult<()> { let flow_iter = PyIterator::from_object(flows)?; let mut flows_vec: Vec = Vec::new(); for item in flow_iter { @@ -165,9 +164,7 @@ impl SiftStreamPy { ) })?; - stream - .send_requests_nonblocking(flows_vec) - .map_err(|e| SiftErrorWrapper(e).into()) + stream.try_send_requests(flows_vec).map_err(py_err) } pub fn get_metrics_snapshot(&self) -> PyResult {