Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 2 additions & 21 deletions include/beman/execution/detail/affine.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import beman.execution.detail.sender_adaptor_closure;
import beman.execution.detail.sender_for;
import beman.execution.detail.set_value;
import beman.execution.detail.store_receiver;
import beman.execution.detail.unstoppable;
import beman.execution.detail.unstoppable_scheduler;
#else
#include <beman/execution/detail/basic_sender.hpp>
#include <beman/execution/detail/continues_on.hpp>
Expand All @@ -48,31 +48,12 @@ import beman.execution.detail.unstoppable;
#include <beman/execution/detail/sender_for.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/store_receiver.hpp>
#include <beman/execution/detail/unstoppable.hpp>
#include <beman/execution/detail/unstoppable_scheduler.hpp>
#endif

// ----------------------------------------------------------------------------

namespace beman::execution::detail {
template <typename Sched>
struct unstoppable_scheduler {
using scheduler_concept = typename Sched::scheduler_concept;

template <typename Q, typename... Args>
requires requires { ::std::declval<Sched>().query(::std::declval<const Q&>(), ::std::declval<Args>()...); }
auto query(const Q& q, Args&&... args) const noexcept -> decltype(auto) {
return sched.query(q, ::std::forward<Args>(args)...);
}

auto schedule() const noexcept(std::is_nothrow_invocable_v<::beman::execution::schedule_t, Sched>) {
return ::beman::execution::unstoppable(::beman::execution::schedule(sched));
}

friend auto operator==(const unstoppable_scheduler& lhs, const unstoppable_scheduler& rhs) -> bool = default;

Sched sched;
};

/**
* @brief The affine_t struct is a sender adaptor closure that transforms a sender
* to complete on the scheduler obtained from the receiver's environment.
Expand Down
5 changes: 3 additions & 2 deletions include/beman/execution/detail/associate.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ import beman.execution.detail.basic_sender;
import beman.execution.detail.connect;
import beman.execution.detail.connect_result_t;
import beman.execution.detail.completion_signatures;
import beman.execution.detail.completion_signatures_for;
import beman.execution.detail.default_impls;
import beman.execution.detail.env;
import beman.execution.detail.forward_like;
import beman.execution.detail.get_completion_signatures;
import beman.execution.detail.impls_for;
import beman.execution.detail.make_sender;
import beman.execution.detail.nothrow_callable;
Expand All @@ -35,6 +35,7 @@ import beman.execution.detail.valid_completion_signatures;
#else
#include <beman/execution/detail/connect.hpp>
#include <beman/execution/detail/default_impls.hpp>
#include <beman/execution/detail/get_completion_signatures.hpp>
#include <beman/execution/detail/impls_for.hpp>
#include <beman/execution/detail/make_sender.hpp>
#include <beman/execution/detail/nothrow_callable.hpp>
Expand Down Expand Up @@ -118,7 +119,7 @@ struct associate_t {
static consteval auto get_completion_signatures() {
using Data = decltype(std::declval<::std::remove_cvref_t<Sender>>().template get<1>());
using child_type_t = ::std::remove_cvref_t<typename ::std::remove_cvref_t<Data>::wrap_sender>;
return ::beman::execution::detail::completion_signatures_for<child_type_t, Env...>{};
return ::beman::execution::get_completion_signatures<child_type_t, Env...>();
}

struct impls_for : ::beman::execution::detail::default_impls {
Expand Down
4 changes: 2 additions & 2 deletions include/beman/execution/detail/basic_sender.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ struct basic_sender : ::beman::execution::detail::product_type<Tag, Data, Child.
auto get_env() const noexcept -> decltype(auto) {
auto&& d{this->template get<1>()};
return sub_apply.at<2>(
[&d](auto&&... c) { return ::beman::execution::detail::impls_for<Tag>::get_attrs(d, c...); }, *this);
[&d](auto&&... c) { return ::beman::execution::detail::get_impls_for<Tag>::get_attrs()(d, c...); }, *this);
}

template <typename Receiver>
Expand Down Expand Up @@ -114,7 +114,7 @@ struct basic_sender : ::beman::execution::detail::product_type<Tag, Data, Child.
#endif
template <::beman::execution::detail::decays_to<basic_sender> Self, typename... Env>
requires(sizeof...(Env) == 1) || (... && !::beman::execution::dependent_sender<Child>)
static consteval auto get_completion_signatures() noexcept {
static consteval auto get_completion_signatures() {
if constexpr (requires { Tag::template get_completion_signatures<Self, Env...>(); })
return Tag::template get_completion_signatures<Self, Env...>();
else
Expand Down
57 changes: 54 additions & 3 deletions include/beman/execution/detail/continues_on.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@ import beman.execution.detail.connect;
import beman.execution.detail.connect_result_t;
import beman.execution.detail.decayed_tuple;
import beman.execution.detail.default_impls;
import beman.execution.detail.env;
import beman.execution.detail.env_of_t;
import beman.execution.detail.error_types_of_t;
import beman.execution.detail.fwd_env;
import beman.execution.detail.get_completion_domain;
import beman.execution.detail.get_completion_scheduler;
import beman.execution.detail.get_completion_signatures;
import beman.execution.detail.get_env;
import beman.execution.detail.impls_for;
Expand All @@ -49,6 +52,7 @@ import beman.execution.detail.sender_for;
import beman.execution.detail.sender_in;
import beman.execution.detail.set_error;
import beman.execution.detail.set_stopped;
import beman.execution.detail.set_value;
import beman.execution.detail.start;
#else
#include <beman/execution/detail/as_tuple.hpp>
Expand All @@ -57,9 +61,12 @@ import beman.execution.detail.start;
#include <beman/execution/detail/connect.hpp>
#include <beman/execution/detail/decayed_tuple.hpp>
#include <beman/execution/detail/default_impls.hpp>
#include <beman/execution/detail/env.hpp>
#include <beman/execution/detail/env_of_t.hpp>
#include <beman/execution/detail/error_types_of_t.hpp>
#include <beman/execution/detail/fwd_env.hpp>
#include <beman/execution/detail/get_completion_domain.hpp>
#include <beman/execution/detail/get_completion_scheduler.hpp>
#include <beman/execution/detail/get_env.hpp>
#include <beman/execution/detail/impls_for.hpp>
#include <beman/execution/detail/join_env.hpp>
Expand All @@ -79,6 +86,7 @@ import beman.execution.detail.start;
#include <beman/execution/detail/sender_in.hpp>
#include <beman/execution/detail/set_error.hpp>
#include <beman/execution/detail/set_stopped.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/start.hpp>
#endif

Expand All @@ -103,13 +111,15 @@ struct continues_on_t {
}

private:
template <typename, typename>
template <typename, typename...>
struct get_signatures;
template <typename Sender>
struct get_signatures<Sender> : get_signatures<Sender, ::beman::execution::env<>> {};
template <typename Scheduler, typename Sender, typename Env>
struct get_signatures<
::beman::execution::detail::basic_sender<::beman::execution::detail::continues_on_t, Scheduler, Sender>,
Env> {
using scheduler_sender = decltype(::beman::execution::schedule(::std::declval<Scheduler>()));
using scheduler_sender = ::beman::execution::schedule_result_t<Scheduler>;
template <typename... E>
using as_set_error = ::beman::execution::completion_signatures<::beman::execution::set_error_t(E)...>;
using type = ::beman::execution::detail::meta::combine<
Expand All @@ -118,13 +128,54 @@ struct continues_on_t {
::beman::execution::completion_signatures<::beman::execution::set_error_t(::std::exception_ptr)>>;
};

template <typename Scheduler, typename ChildAttrs>
struct attrs {
template <typename Tag, typename... Env>
requires requires(const Scheduler& scheduler, const Env&... env) {
::beman::execution::get_completion_scheduler<Tag>(scheduler,
::beman::execution::detail::fwd_env(env)...);
}
auto query(::beman::execution::get_completion_scheduler_t<Tag>, const Env&... env) const noexcept {
return ::beman::execution::get_completion_scheduler<Tag>(this->sch, env...);
}

template <typename Tag, typename... Env>
requires requires(const Scheduler& scheduler, const Env&... env) {
::beman::execution::get_completion_domain<Tag>(scheduler, ::beman::execution::detail::fwd_env(env)...);
}
auto query(::beman::execution::get_completion_domain_t<Tag>, const Env&... env) const noexcept {
return ::beman::execution::get_completion_domain<Tag>(this->sch,
::beman::execution::detail::fwd_env(env)...);
}

template <typename Q, typename... Args>
requires requires(const ChildAttrs& child_attrs, Q q, Args&&... args) {
child_attrs.query(q, ::std::forward<Args>(args)...);
}
auto query(Q q, Args&&... args) const noexcept {
return this->child_attrs.query(q, ::std::forward<Args>(args)...);
}

Scheduler sch;
ChildAttrs child_attrs;
};

public:
template <typename Sender, typename... Env>
static consteval auto get_completion_signatures() {
static consteval auto get_completion_signatures() noexcept {
return typename get_signatures<::std::remove_cvref_t<Sender>, Env...>::type{};
}

struct impls_for : ::beman::execution::detail::default_impls {
struct get_attrs_impl {
auto operator()(const auto& sch, const auto& child) const noexcept {
using Sch = ::std::remove_cvref_t<decltype(sch)>;
using ChildAttrs = ::std::remove_cvref_t<decltype(::beman::execution::get_env(child))>;
return attrs<Sch, ChildAttrs>{sch, ::beman::execution::get_env(child)};
}
};
static constexpr auto get_attrs{get_attrs_impl{}};

template <typename State>
struct upstream_receiver {
using receiver_concept = ::beman::execution::receiver_tag;
Expand Down
19 changes: 11 additions & 8 deletions include/beman/execution/detail/get_completion_scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@ import beman.execution.detail.completion_tag;
import beman.execution.detail.forwarding_query;
import beman.execution.detail.scheduler;
import beman.execution.detail.set_value;
import beman.execution.detail.try_query;
#else
#include <beman/execution/detail/completion_tag.hpp>
#include <beman/execution/detail/forwarding_query.hpp>
#include <beman/execution/detail/scheduler.hpp>
#include <beman/execution/detail/set_value.hpp>
#include <beman/execution/detail/try_query.hpp>
#endif

// ----------------------------------------------------------------------------
Expand All @@ -38,7 +40,8 @@ template <typename Tag, typename Q, typename... E>
concept compl_sched_recurse_queryable = requires {
{
::beman::execution::detail::recurse_query(
::std::declval<const Q&>().query(
::beman::execution::detail::try_query(
::std::declval<const Q&>(),
::std::declval<::beman::execution::detail::get_completion_scheduler_t<Tag>>(),
::std::declval<const E&>()...),
::std::declval<const E&>()...)
Expand All @@ -49,18 +52,18 @@ template <::beman::execution::detail::completion_tag Tag>
struct get_completion_scheduler_t : ::beman::execution::forwarding_query_t {
template <typename Q, typename... E>
requires ::beman::execution::detail::compl_sched_recurse_queryable<Tag, Q, E...> ||
(::beman::execution::scheduler<Q> && sizeof...(E) > 0)
::beman::execution::scheduler<Q>
auto operator()(const Q& q, const E&... e) const noexcept;
};

template <typename Scheduler, typename... Envs>
auto recurse_query(Scheduler sch, const Envs&... envs) noexcept {
::beman::execution::detail::get_completion_scheduler_t<::beman::execution::set_value_t> get_compl_sch{};
if constexpr (requires { ::std::as_const(sch).query(get_compl_sch, envs...); }) {
auto sch2 = ::std::as_const(sch).query(get_compl_sch, envs...);
if constexpr (requires { ::beman::execution::detail::try_query(sch, get_compl_sch, envs...); }) {
auto sch2 = ::beman::execution::detail::try_query(sch, get_compl_sch, envs...);
if constexpr (::std::same_as<Scheduler, decltype(sch2)>) {
while (sch != sch2) {
sch = ::std::exchange(sch2, ::std::as_const(sch2).query(get_compl_sch, envs...));
sch = ::std::exchange(sch2, ::beman::execution::detail::try_query(sch2, get_compl_sch, envs...));
}
return sch;
} else {
Expand All @@ -74,12 +77,12 @@ auto recurse_query(Scheduler sch, const Envs&... envs) noexcept {
template <::beman::execution::detail::completion_tag Tag>
template <typename Q, typename... E>
requires ::beman::execution::detail::compl_sched_recurse_queryable<Tag, Q, E...> ||
(::beman::execution::scheduler<Q> && sizeof...(E) > 0)
::beman::execution::scheduler<Q>
auto get_completion_scheduler_t<Tag>::operator()(const Q& q, const E&... e) const noexcept {
if constexpr (::beman::execution::detail::compl_sched_recurse_queryable<Tag, Q, E...>) {
return ::beman::execution::detail::recurse_query(q.query(*this, e...), e...);
return ::beman::execution::detail::recurse_query(::beman::execution::detail::try_query(q, *this, e...), e...);
} else {
static_assert(::beman::execution::scheduler<Q> && sizeof...(E) > 0);
static_assert(::beman::execution::scheduler<Q>);
return q;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,15 @@ consteval auto get_completion_signatures() -> ::beman::execution::detail::valid_
::std::remove_cvref_t<Sndr>::template get_completion_signatures<Sndr, E...>()>) {
return ::std::remove_cvref_t<Sndr>::template get_completion_signatures<Sndr, E...>();
}
} else if constexpr (requires {
{
::std::remove_cvref_t<Sndr>::get_completion_signatures()
} -> ::beman::execution::detail::valid_completion_signatures;
}) {
if constexpr (::beman::execution::detail::is_constant<
::std::remove_cvref_t<Sndr>::get_completion_signatures()>) {
return ::std::remove_cvref_t<Sndr>::get_completion_signatures();
}
}
}};
if constexpr (requires {
Expand Down
Loading
Loading