Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions rust/crates/sift_stream/examples/backups-only/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
tokio::time::sleep(Duration::from_millis(100)).await;
}

// Next, stream telemetry to backup files using the [`SiftStream::send_requests_nonblocking`] method
// Next, stream telemetry to backup files using the [`SiftStream::try_send_requests`] method
// and the [`FlowBuilder`] to build the flow.
//
// This approach is more performant, and also provides methods to set the channel value via
Expand All @@ -116,15 +116,15 @@ async fn run() -> Result<(), Box<dyn Error>> {
let run_id = sift_stream.run().unwrap().run_id.clone();
for i in 0..360 {
// Build the flow using the [`FlowBuilder`] and send it to
// Sift using the [`SiftStream::send_requests_nonblocking`] method.
// Sift using the [`SiftStream::try_send_requests`] method.
let mut flow_builder = FlowBuilder::new(&descriptor);
flow_builder.attach_run_id(&run_id);
flow_builder
.set_with_key("joint-angle-encoder", f64::from(i).sin())
.unwrap();

sift_stream
.send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
.try_send_requests(vec![flow_builder.request(TimeValue::now())])
.unwrap();

// For demonstrative purposes, adding a contrived wait to get 10Hz data.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
// NOTE: This approach uses `Flow` and `SiftStream::send()` for ease of use, and will
// provide acceptable performance for most users
// In cases where additional performance is required, a separate, more performant method
// is also available that uses `FlowBuilder` and `SiftStream::send_requests_nonblocking`
// is also available that uses `FlowBuilder` and `SiftStream::try_send_requests`
// See `examples/quick-start/` for an example using this alternate approach
let start = std::time::Instant::now();
while start.elapsed() < INGEST_DURATION {
Expand Down
6 changes: 3 additions & 3 deletions rust/crates/sift_stream/examples/quick-start/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
tokio::time::sleep(Duration::from_millis(100)).await;
}

// Next, stream telemetry to Sift using the [`SiftStream::send_requests_nonblocking`] method
// Next, stream telemetry to Sift using the [`SiftStream::try_send_requests`] method
// and the [`FlowBuilder`] to build the flow.
//
// This approach is more performant, and also provides methods to set the channel value via
Expand All @@ -107,7 +107,7 @@ async fn run() -> Result<(), Box<dyn Error>> {
let run_id = sift_stream.run().unwrap().run_id.clone();
for i in 0..360 {
// Build the flow using the [`FlowBuilder`] and send it to
// Sift using the [`SiftStream::send_requests_nonblocking`] method.
// Sift using the [`SiftStream::try_send_requests`] method.
let mut flow_builder = FlowBuilder::new(&descriptor);
flow_builder.attach_run_id(&run_id);
flow_builder
Expand All @@ -116,7 +116,7 @@ async fn run() -> Result<(), Box<dyn Error>> {

// Send telemetry to Sift.
sift_stream
.send_requests_nonblocking(vec![flow_builder.request(TimeValue::now())])
.try_send_requests(vec![flow_builder.request(TimeValue::now())])
.unwrap();

// For demonstrative purposes, adding a contrived wait to get 10Hz data.
Expand Down
2 changes: 1 addition & 1 deletion rust/crates/sift_stream/src/backup/disk/async_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1122,7 +1122,7 @@ mod test {

// The backup file should have been rotated.
assert!(
backup_manager.file_ctx_buffer.len() > 0,
!backup_manager.file_ctx_buffer.is_empty(),
"backup files should be present"
);
assert!(
Expand Down
10 changes: 5 additions & 5 deletions rust/crates/sift_stream/src/backup/disk/file_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -452,16 +452,16 @@ mod tests {
}
}

if writer.should_rotate_file() {
if let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file") {
rotated_files.push(ctx.file_path);
}
if writer.should_rotate_file()
&& let Some(ctx) = writer.rotate_file().await.expect("failed to rotate file")
{
rotated_files.push(ctx.file_path);
}
}

// Should have created multiple files
if !rotated_files.is_empty() {
assert!(rotated_files.len() > 0);
assert!(!rotated_files.is_empty());
for file_path in &rotated_files {
assert!(file_path.exists());
}
Expand Down
82 changes: 82 additions & 0 deletions rust/crates/sift_stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,87 @@
//!
//! Anything that falls outside of that will require changing the client-key.
//!
//! ## Sending Telemetry
//!
//! [`SiftStream`] exposes four methods for delivering telemetry. They differ only in whether
//! they apply backpressure (blocking) or return immediately (non-blocking), and in whether
//! they accept a high-level [`Flow`] or pre-encoded raw requests:
//!
//! | Method | Blocks? | Input |
//! |---|---|---|
//! | [`send`](stream::SiftStream::send) | Yes | [`Flow`] or any [`Encodeable`](stream::Encodeable) |
//! | [`send_requests`](stream::SiftStream::send_requests) | Yes | Pre-encoded requests |
//! | [`try_send`](stream::SiftStream::try_send) | No | [`Flow`] or any [`Encodeable`](stream::Encodeable) |
//! | [`try_send_requests`](stream::SiftStream::try_send_requests) | No | Pre-encoded requests |
//!
//! ### Backpressure with `send`
//!
//! [`send`](stream::SiftStream::send) awaits until the backing channel has capacity, then
//! delivers the message. Use this when you want the producer to slow down naturally when
//! the pipeline is under load — the simplest and most common choice.
//!
//! ```ignore
//! // Awaits until the channel has room; backpressure is applied automatically.
//! sift_stream.send(Flow::new(
//! "robotic-arm",
//! TimeValue::now(),
//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
//! )).await?;
//! ```
//!
//! On error, [`SiftStreamSendError`] is returned. Call `into_inner()` on the
//! [`ChannelClosed`](stream::SiftStreamSendError::ChannelClosed) variant to recover the
//! undelivered message.
//!
//! ### Non-blocking sends with `try_send`
//!
//! [`try_send`](stream::SiftStream::try_send) returns immediately regardless of channel
//! state. If the channel is full it returns [`TrySendError::Full`] with the message; if
//! the channel is closed it returns [`TrySendError::Closed`]. Use this in tight loops or
//! real-time contexts where blocking even briefly is unacceptable.
//!
//! ```ignore
//! match sift_stream.try_send(Flow::new(
//! "robotic-arm",
//! TimeValue::now(),
//! &[ChannelValue::new("joint-angle-encoder", 7.2_f64)],
//! )) {
//! Ok(()) => {}
//! Err(SiftStreamTrySendError::Channel(TrySendError::Full(msg))) => {
//! // Channel is busy — drop this sample or buffer it for later.
//! drop(msg);
//! }
//! Err(e) => return Err(e.into()),
//! }
//! ```
//!
//! ### Pre-encoded batch sends
//!
//! [`send_requests`](stream::SiftStream::send_requests) and
//! [`try_send_requests`](stream::SiftStream::try_send_requests) accept pre-encoded
//! [`IngestWithConfigDataStreamRequest`](sift_rs::ingest::v1::IngestWithConfigDataStreamRequest)
//! values built with [`FlowBuilder`]. This skips the per-call encoding step and is the
//! highest-throughput option.
//!
//! ```ignore
//! let descriptor = sift_stream.get_flow_descriptor("robotic-arm").unwrap();
//! let run_id = sift_stream.run().unwrap().run_id.clone();
//!
//! let mut builder = FlowBuilder::new(&descriptor);
//! builder.attach_run_id(&run_id);
//! builder.set_with_key("joint-angle-encoder", 7.2_f64).unwrap();
//!
//! // Blocking batch send with backpressure:
//! sift_stream.send_requests(vec![builder.request(TimeValue::now())]).await?;
//!
//! // Non-blocking batch send:
//! sift_stream.try_send_requests(vec![builder.request(TimeValue::now())])?;
//! ```
//!
//! On the first failure, `send_requests` / `try_send_requests` stop iterating and return
//! **all** undelivered messages — the failing one plus any not yet attempted — inside the
//! error so nothing is silently dropped.
//!
//! ## Retry Policy
//!
//! At the time of writing this crate, [tonic](https://docs.rs/tonic/latest/tonic/)
Expand Down Expand Up @@ -456,6 +537,7 @@ pub use stream::{
file_backup::FileBackup,
ingestion_config::{Flow, IngestionConfigEncoder, LiveStreaming},
},
send_error::{SendError, SiftStreamSendError, SiftStreamTrySendError, TrySendError},
time::TimeValue,
};

Expand Down
2 changes: 0 additions & 2 deletions rust/crates/sift_stream/src/stream/builder/config_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,6 @@ mod tests {
..Default::default()
},
],
..Default::default()
}
}

Expand Down Expand Up @@ -373,7 +372,6 @@ mod tests {
data_type: ChannelDataType::Double.into(),
..Default::default()
}],
..Default::default()
}];
let form = create_test_ingestion_config_form(asset_name, client_key, existing_flows);

Expand Down
Loading
Loading