Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions crates/layered/src/dynamic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ pub trait DynamicServiceExt<In, Out>: Sized {
fn into_dynamic(self) -> DynamicService<In, Out>;
}

impl<In: Send + 'static, Out: Send + 'static, T> DynamicServiceExt<In, Out> for T
impl<In: Send, Out: Send, T> DynamicServiceExt<In, Out> for T
where
T: Service<In, Out = Out> + 'static,
{
Expand Down Expand Up @@ -53,7 +53,7 @@ where
/// ```
pub struct DynamicService<In, Out>(Arc<DynService<'static, In, Out>>);

impl<In: Send + 'static, Out: Send + 'static> DynamicService<In, Out> {
impl<In: Send, Out: Send> DynamicService<In, Out> {
pub(crate) fn new<T>(strategy: T) -> Self
where
T: Service<In, Out = Out> + Send + Sync + 'static,
Expand Down
8 changes: 4 additions & 4 deletions crates/layered/src/execute.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,10 @@ impl<E> Execute<E> {
#[must_use]
pub fn new<In, Out, F>(e: E) -> Self
where
E: Fn(In) -> F + Send + Sync + 'static,
In: Send + 'static,
F: Future<Output = Out> + Send + 'static,
Out: Send + 'static,
E: Fn(In) -> F + Send + Sync,
In: Send,
F: Future<Output = Out> + Send,
Comment thread
martintmk marked this conversation as resolved.
Out: Send,
{
Self(e)
}
Expand Down
4 changes: 2 additions & 2 deletions crates/layered/src/intercept.rs
Original file line number Diff line number Diff line change
Expand Up @@ -374,15 +374,15 @@ impl<In, Out, S> crate::Layer<S> for InterceptLayer<In, Out> {
}
}

struct OnInput<In>(Arc<dyn Fn(&In) + Send + Sync>);
struct OnInput<In>(Arc<dyn for<'a> Fn(&'a In) + Send + Sync>);

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
struct OnInput<In>(Arc<dyn for<'a> Fn(&'a In) + Send + Sync>);
struct OnInput<'a, In>(Arc<dyn Fn(&'a In) + Send + Sync>);

I think something like this will get you closer, but the hang-up is that the closure lifetime is much longer than &In... and the closure could borrow and hold using the ref.

A stateless function, on the other hand, can't do that. So it solves the problem, but takes away closures. And users who need stateful calls would have to put their "context" data in In which is gross. Sorry, all I have is bad news :|

A cheaper, backward compatible fix would be to document that this isn't supported :)

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am starting to realize that this indeed might not be possible with current Rust :(


impl<In> Clone for OnInput<In> {
fn clone(&self) -> Self {
Self(Arc::clone(&self.0))
}
}

struct OnOutput<Out>(Arc<dyn Fn(&Out) + Send + Sync>);
struct OnOutput<Out>(Arc<dyn for<'a> Fn(&'a Out) + Send + Sync>);

impl<Out> Clone for OnOutput<Out> {
fn clone(&self) -> Self {
Expand Down
19 changes: 19 additions & 0 deletions crates/layered/tests/execute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#![allow(missing_docs, reason = "This is a test module")]
#![cfg(not(miri))]

//! Integration tests for [`Execute`] service.

use layered::{Execute, Service};

#[tokio::test]
async fn str_references() {
let service = Execute::new(|input: &str| async move { input });

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

assert_eq!(output, "hello");
}
30 changes: 30 additions & 0 deletions crates/layered/tests/intercept.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.

#![allow(missing_docs, reason = "This is a test module")]
#![cfg(feature = "intercept")]

//! Integration tests for [`Intercept`] middleware.

use layered::{Execute, Intercept, Service, Stack};

#[cfg_attr(miri, ignore)]
#[tokio::test]
async fn str_references() {
let stack = (
Intercept::<&str, &str, _>::layer()
.on_input(|input: &&str| {
assert!(!input.is_empty());
})
.on_output(|output: &&str| {
assert!(!output.is_empty());
}),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

assert_eq!(output, "hello");
}
2 changes: 1 addition & 1 deletion crates/seatbelt/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ futures-util = { workspace = true, features = ["alloc"] }
http.workspace = true
insta = { workspace = true, features = ["json"] }
jiff = { workspace = true, default-features = true, features = ["serde"] }
layered = { workspace = true, features = ["tower-service", "dynamic-service"] }
layered = { workspace = true, features = ["tower-service", "dynamic-service", "intercept"] }
mutants.workspace = true
ohno = { workspace = true, features = ["app-err"] }
opentelemetry = { workspace = true, default-features = false, features = ["metrics"] }
Expand Down
12 changes: 6 additions & 6 deletions crates/seatbelt/src/chaos/injection/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ impl<In, Out, S1, S2> InjectionLayer<In, Out, S1, S2> {
}
}

impl<In: Send + 'static, Out: Send + 'static, S1, S2> InjectionLayer<In, Out, S1, S2> {
impl<In: Send, Out: Send, S1, S2> InjectionLayer<In, Out, S1, S2> {
/// Sets a callback-based output factory for the injected output.
///
/// The `output_fn` receives the consumed input and [`InjectionOutputArgs`],
Expand All @@ -164,17 +164,17 @@ impl<In: Send + 'static, Out: Send + 'static, S1, S2> InjectionLayer<In, Out, S1
#[must_use]
pub fn output(self, value: Out) -> InjectionLayer<In, Out, S1, Set>
where
Out: Clone + Sync,
Out: Clone + Sync + 'static,
{
self.output_with(move |_, _| value.clone())
}
}

impl<In, Ok, Err, S1, S2> InjectionLayer<In, Result<Ok, Err>, S1, S2>
where
In: Send + 'static,
Ok: Send + 'static,
Err: Send + 'static,
In: Send,
Ok: Send,
Err: Send,
{
/// Sets a callback-based error factory for the injected output.
///
Expand Down Expand Up @@ -204,7 +204,7 @@ where
#[must_use]
pub fn output_error(self, error: Err) -> InjectionLayer<In, Result<Ok, Err>, S1, Set>
where
Err: Clone + Sync,
Err: Clone + Sync + 'static,
{
self.output_error_with(move |_, _| error.clone())
}
Expand Down
6 changes: 3 additions & 3 deletions crates/seatbelt/src/chaos/injection/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ impl<In, Out> Injection<In, Out, ()> {
// body MUST be mirrored in the `call` body, and vice versa. See crate-level AGENTS.md.
impl<In, Out, S> Service<In> for Injection<In, Out, S>
where
In: Send + 'static,
Out: Send + 'static,
In: Send,
Out: Send,
S: Service<In, Out = Out>,
{
type Out = Out;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
}
}

impl<In: Send + 'static, Out: Send + 'static> InjectionShared<In, Out> {
impl<In: Send, Out: Send> InjectionShared<In, Out> {
fn should_inject(&self, input: &In) -> bool {
let rate = self.rate.call(input, InjectionRateArgs {}).clamp(0.0, 1.0);
self.rnd.next_f64() < rate
Expand Down
6 changes: 3 additions & 3 deletions crates/seatbelt/src/chaos/latency/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ impl<In, Out> Latency<In, Out, ()> {
// body MUST be mirrored in the `call` body, and vice versa. See crate-level AGENTS.md.
impl<In, Out, S> Service<In> for Latency<In, Out, S>
where
In: Send + 'static,
Out: Send + 'static,
In: Send,
Out: Send,
S: Service<In, Out = Out>,
{
type Out = Out;
Expand Down Expand Up @@ -199,7 +199,7 @@ where
}
}

impl<In: Send + 'static, Out: Send + 'static> LatencyShared<In, Out> {
impl<In: Send, Out: Send> LatencyShared<In, Out> {
#[cfg_attr(test, mutants::skip)] // causes test timeouts
fn should_inject(&self, input: &In) -> bool {
let rate = self.rate.call(input, LatencyRateArgs {}).clamp(0.0, 1.0);
Expand Down
20 changes: 11 additions & 9 deletions crates/seatbelt/src/fallback/callbacks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,22 @@ pub(crate) enum FallbackAction<Out> {
Async(AsyncFallbackFn<Out>),
}

impl<Out: Send + 'static> FallbackAction<Out> {
impl<Out: Send> FallbackAction<Out> {
/// Create from a synchronous closure.
pub(crate) fn new_sync(f: impl Fn(Out, FallbackActionArgs) -> Out + Send + Sync + 'static) -> Self {
Self::Sync(SyncFallbackFn::new(f))
}

/// Invoke the fallback action.
pub(crate) async fn call(&self, out: Out, args: FallbackActionArgs) -> Out {
match self {
Self::Sync(f) => f.call(out, args),
Self::Async(f) => f.call(out, args).await,
}
}
}
Comment thread
martintmk marked this conversation as resolved.

impl<Out: Send + 'static> FallbackAction<Out> {
/// Create from an asynchronous closure.
pub(crate) fn new_async<F, Fut>(f: F) -> Self
where
Expand All @@ -34,14 +44,6 @@ impl<Out: Send + 'static> FallbackAction<Out> {
{
Self::Async(AsyncFallbackFn::new(move |out, args| Box::pin(f(out, args))))
}

/// Invoke the fallback action.
pub(crate) async fn call(&self, out: Out, args: FallbackActionArgs) -> Out {
match self {
Self::Sync(f) => f.call(out, args),
Self::Async(f) => f.call(out, args).await,
}
}
}

impl<Out> Clone for FallbackAction<Out> {
Expand Down
6 changes: 4 additions & 2 deletions crates/seatbelt/src/fallback/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ impl<In, Out, S1, S2> FallbackLayer<In, Out, S1, S2> {
}
}

impl<In, Out: Send + 'static, S1, S2> FallbackLayer<In, Out, S1, S2> {
impl<In, Out: Send, S1, S2> FallbackLayer<In, Out, S1, S2> {
/// Sets a synchronous fallback action.
///
/// The `action` receives the original (invalid) output and [`FallbackActionArgs`]
Expand All @@ -129,11 +129,13 @@ impl<In, Out: Send + 'static, S1, S2> FallbackLayer<In, Out, S1, S2> {
#[must_use]
pub fn fallback_output(self, value: Out) -> FallbackLayer<In, Out, S1, Set>
where
Out: Clone + Sync,
Out: Clone + Sync + 'static,
{
self.fallback(move |_, _| value.clone())
}
}

impl<In, Out: Send + 'static, S1, S2> FallbackLayer<In, Out, S1, S2> {
/// Sets an asynchronous fallback action.
///
/// The `action` receives the original (invalid) output and [`FallbackActionArgs`]
Expand Down
4 changes: 2 additions & 2 deletions crates/seatbelt/src/fallback/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ impl<In, Out> Fallback<In, Out, ()> {
impl<In, Out, S> Service<In> for Fallback<In, Out, S>
where
In: Send,
Out: Send + 'static,
Out: Send,
S: Service<In, Out = Out>,
{
type Out = Out;
Expand Down Expand Up @@ -186,7 +186,7 @@ where
}
}

impl<In, Out: Send + 'static> FallbackShared<In, Out> {
impl<In, Out: Send> FallbackShared<In, Out> {
async fn handle_fallback(&self, output: Out) -> Out {
let new_output = self.fallback_action.call(output, FallbackActionArgs {}).await;

Expand Down
2 changes: 1 addition & 1 deletion crates/seatbelt/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ pub(crate) enum EnableIf<In> {
/// The middleware is always disabled.
Disabled,
/// The middleware is conditionally enabled based on a predicate.
Custom(Arc<dyn Fn(&In) -> bool + Send + Sync>),
Custom(Arc<dyn for<'a> Fn(&'a In) -> bool + Send + Sync>),
}

impl<In> EnableIf<In> {
Expand Down
19 changes: 19 additions & 0 deletions crates/seatbelt/tests/breaker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,3 +243,22 @@
assert_eq!(result1, Ok("circuit is open".to_string()));
assert_eq!(result2, Ok("circuit is open".to_string()));
}

#[tokio::test]
async fn str_references() {
let clock = Clock::new_frozen();
let context: ResilienceContext<&str, &str> = ResilienceContext::new(&clock);

let stack = (
Breaker::layer("test_breaker", &context)
.recovery_with(|_output: &&str, _| RecoveryInfo::never())
.rejected_input(|_input: &str, _| "circuit is open"),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

Check failure on line 261 in crates/seatbelt/tests/breaker.rs

View workflow job for this annotation

GitHub Actions / static-analysis (windows-11-arm)

`input` does not live long enough

Check failure on line 261 in crates/seatbelt/tests/breaker.rs

View workflow job for this annotation

GitHub Actions / testing (windows-11-arm)

`input` does not live long enough

assert_eq!(output, "hello");
}
19 changes: 19 additions & 0 deletions crates/seatbelt/tests/chaos_injection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,3 +331,22 @@

assert_eq!(output, Ok("injected".to_string()));
}

#[tokio::test]
async fn str_references() {
let clock = Clock::new_frozen();
let context: ResilienceContext<&str, &str> = ResilienceContext::new(&clock);

let stack = (
Injection::layer("test_injection", &context)
.rate(0.0)
.output_with(|_input: &str, _args| "injected"),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

Check failure on line 349 in crates/seatbelt/tests/chaos_injection.rs

View workflow job for this annotation

GitHub Actions / testing (ubuntu-latest)

`input` does not live long enough

assert_eq!(output, "hello");
}
19 changes: 19 additions & 0 deletions crates/seatbelt/tests/chaos_latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,3 +362,22 @@
// Unlike injection, latency preserves the inner service output.
assert_eq!(output, Ok("processed:hello".to_string()));
}

#[tokio::test]
async fn str_references() {
let clock = ClockControl::default().auto_advance_timers(true).to_clock();
let context: ResilienceContext<&str, &str> = ResilienceContext::new(&clock);

let stack = (
Latency::layer("test_latency", &context)
.rate(0.0)
.latency(Duration::from_millis(10)),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

Check failure on line 380 in crates/seatbelt/tests/chaos_latency.rs

View workflow job for this annotation

GitHub Actions / coverage (ubuntu-24.04-arm)

`input` does not live long enough

Check failure on line 380 in crates/seatbelt/tests/chaos_latency.rs

View workflow job for this annotation

GitHub Actions / static-analysis (ubuntu-latest)

`input` does not live long enough

assert_eq!(output, "hello");
}
19 changes: 19 additions & 0 deletions crates/seatbelt/tests/fallback.rs
Original file line number Diff line number Diff line change
Expand Up @@ -220,3 +220,22 @@
assert_eq!(output1, Ok("fixed_default".to_string()));
assert_eq!(output2, Ok("fixed_default".to_string()));
}

#[tokio::test]
async fn str_references() {
let clock = Clock::new_frozen();
let context: ResilienceContext<&str, &str> = ResilienceContext::new(&clock);

let stack = (
Fallback::layer("test_fallback", &context)
.should_fallback(|output: &&str| output.is_empty())
.fallback(|_output, _args| "fallback_value"),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

Check failure on line 238 in crates/seatbelt/tests/fallback.rs

View workflow job for this annotation

GitHub Actions / static-analysis (windows-11-arm)

`input` does not live long enough

Check failure on line 238 in crates/seatbelt/tests/fallback.rs

View workflow job for this annotation

GitHub Actions / testing (ubuntu-latest)

`input` does not live long enough

assert_eq!(output, "hello");
}
19 changes: 19 additions & 0 deletions crates/seatbelt/tests/hedging.rs
Original file line number Diff line number Diff line change
Expand Up @@ -687,3 +687,22 @@
"on_execute callback must be invoked at least once"
);
}

#[tokio::test]
async fn str_references() {
let clock = Clock::new_frozen();
let context: ResilienceContext<&str, &str> = ResilienceContext::new(&clock);

let stack = (
Hedging::layer("test_hedging", &context)
.clone_input()
.recovery_with(|_output: &&str, _| RecoveryInfo::never()),
Execute::new(|input: &str| async move { input }),
);
let service = stack.into_service();

let input = "hello".to_string();
let output = service.execute(input.as_str()).await;

Check failure on line 705 in crates/seatbelt/tests/hedging.rs

View workflow job for this annotation

GitHub Actions / coverage (ubuntu-24.04-arm)

`input` does not live long enough

Check failure on line 705 in crates/seatbelt/tests/hedging.rs

View workflow job for this annotation

GitHub Actions / static-analysis (ubuntu-24.04-arm)

`input` does not live long enough

Check failure on line 705 in crates/seatbelt/tests/hedging.rs

View workflow job for this annotation

GitHub Actions / testing (ubuntu-24.04-arm)

`input` does not live long enough

Check failure on line 705 in crates/seatbelt/tests/hedging.rs

View workflow job for this annotation

GitHub Actions / static-analysis (windows-latest)

`input` does not live long enough

Check failure on line 705 in crates/seatbelt/tests/hedging.rs

View workflow job for this annotation

GitHub Actions / testing (windows-latest)

`input` does not live long enough

assert_eq!(output, "hello");
}
Loading
Loading