diff --git a/.gitmodules b/.gitmodules index 27a1e12a..f6acfd3f 100644 --- a/.gitmodules +++ b/.gitmodules @@ -60,3 +60,7 @@ path = third_party/gcem url = https://github.com/aethernetio/gcem.git branch = master +[submodule "third_party/stdexec"] + path = third_party/stdexec + url = https://github.com/aethernetio/stdexec.git + branch = main diff --git a/aether/CMakeLists.txt b/aether/CMakeLists.txt index 4b0549cb..e6a6654c 100644 --- a/aether/CMakeLists.txt +++ b/aether/CMakeLists.txt @@ -41,7 +41,9 @@ list(APPEND common_dependencies "../third_party/gcem" "../third_party/etl" "../third_party/aethernet-numeric" + "../third_party/stdexec" ) +set(STDEXEC_BUILD_EXAMPLES Off) # for etl set(GIT_DIR_LOOKUP_POLICY ALLOW_LOOKING_ABOVE_CMAKE_SOURCE_DIR) @@ -359,7 +361,8 @@ if(REGULAR_CMAKE_PROJECT) sodium hydrogen gcem - etl) + etl + stdexec) target_link_libraries(${PROJECT_NAME} PRIVATE c-ares ) set(TARGET_NAME "${PROJECT_NAME}") @@ -416,7 +419,8 @@ else() sodium hydrogen gcem - etl) + etl + stdexec) else() #ERROR message(SEND_ERROR "You must specify the CMAKE version!") @@ -538,10 +542,12 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC") PRIVATE /W4 /WX + /w15262 #implisitfallthrough /wd4388 #Wno-sign-compare /wd4389 #Wno-sign-compare - /wd4244 - /w15262 #implisitfallthrough + PUBLIC + /wd4100 /wd4101 /wd4127 /wd4244 /wd4324 + /wd4456 /wd4459 /wd4714 ) target_compile_options(${TARGET_NAME} PUBLIC /Zc:preprocessor) endif() @@ -558,15 +564,15 @@ endif() if(REGULAR_CMAKE_PROJECT AND NOT AE_BUILD_TESTS) ## Target installation - install(TARGETS ${TARGET_NAME} - EXPORT ${TARGET_NAME}Targets - ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} - LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} - PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${TARGET_NAME} - COMPONENT library) + # install(TARGETS ${TARGET_NAME} + # EXPORT ${TARGET_NAME}Targets + # ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR} + # LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR} + # PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${TARGET_NAME} + # COMPONENT library) ## Target's cmake files: targets export - install(EXPORT ${TARGET_NAME}Targets - NAMESPACE ${TARGET_NAME}:: - DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${TARGET_NAME}) + #install(EXPORT ${TARGET_NAME}Targets + # NAMESPACE ${TARGET_NAME}:: + # DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${TARGET_NAME}) endif() diff --git a/aether/ae_context.h b/aether/ae_context.h index d42f91f4..c237d7a5 100644 --- a/aether/ae_context.h +++ b/aether/ae_context.h @@ -34,19 +34,21 @@ struct AeCtxTable { }; struct AeCtx { + bool operator==(AeCtx const&) const = default; + void* obj; AeCtxTable const* vtable; }; template -concept AeContextual = requires(T& t) { +concept AeContextual = requires(T const& t) { { t.ToAeContext() } -> std::same_as; }; class AeContext { public: template - constexpr AeContext(T& obj) // NOLINT(*explicit-constructor) + constexpr AeContext(T const& obj) // NOLINT(*explicit-constructor) : ctx_{obj.ToAeContext()} {} Aether& aether() const { return ctx_.vtable->aether_getter(ctx_.obj); } @@ -54,6 +56,8 @@ class AeContext { return ctx_.vtable->scheduler_getter(ctx_.obj); } + bool operator==(AeContext const&) const = default; + private: AeCtx ctx_; }; diff --git a/aether/aether.cpp b/aether/aether.cpp index 14e112c8..dd9dba9e 100644 --- a/aether/aether.cpp +++ b/aether/aether.cpp @@ -47,21 +47,23 @@ Aether::Aether(ObjProp prop) Aether::~Aether() { AE_TELE_DEBUG(AetherDestroyed); } void Aether::Update(TimePoint current_time) { - update_time = action_processor->Update(current_time); + task_scheduler->Update(current_time); + action_processor->Update(current_time); + update_time = current_time + 100ms; } Aether::operator ActionContext() const { return ActionContext{*action_processor}; } -AeCtx Aether::ToAeContext() { +AeCtx Aether::ToAeContext() const { static constexpr AeCtxTable ae_table{ [](void* obj) -> Aether& { return *static_cast(obj); }, [](void* obj) -> TaskScheduler& { return *static_cast(obj)->task_scheduler; }, }; - return AeCtx{this, &ae_table}; + return AeCtx{const_cast(this), &ae_table}; // NOLINT(*const-cast) } Client::ptr Aether::CreateClient(ClientConfig const& config, diff --git a/aether/aether.h b/aether/aether.h index 4226ffbb..abe93f1e 100644 --- a/aether/aether.h +++ b/aether/aether.h @@ -85,7 +85,7 @@ class Aether : public Obj { // User-facing API. operator ActionContext() const; - AeCtx ToAeContext(); + AeCtx ToAeContext() const; ObjPtr CreateClient(ClientConfig const& config, std::string const& client_id); diff --git a/aether/all.h b/aether/all.h index c29f2b6f..b0867ee5 100644 --- a/aether/all.h +++ b/aether/all.h @@ -24,6 +24,7 @@ #include "aether/ae_context.h" #include "aether/aether_app.h" +#include "aether/executors/executors.h" #include "aether/actions/action.h" #include "aether/actions/action_ptr.h" #include "aether/actions/action_context.h" diff --git a/aether/channels/channel.h b/aether/channels/channel.h index ff16c74b..107b23f4 100644 --- a/aether/channels/channel.h +++ b/aether/channels/channel.h @@ -17,12 +17,18 @@ #ifndef AETHER_CHANNELS_CHANNEL_H_ #define AETHER_CHANNELS_CHANNEL_H_ +#include "aether/memory.h" #include "aether/obj/obj.h" +#include "aether/executors/executors.h" +#include "aether/stream_api/istream.h" #include "aether/channels/channels_types.h" #include "aether/channels/channel_statistics.h" -#include "aether/transport/transport_builder_action.h" namespace ae { +using TransportBuildSender = + ex::AnySender), + ex::set_error_t(int)>; + class Channel : public Obj { AE_OBJECT(Channel, Obj, 0) @@ -36,7 +42,7 @@ class Channel : public Obj { /** * \brief Make transport from this channel. */ - virtual ActionPtr TransportBuilder() = 0; + virtual TransportBuildSender TransportBuilder() = 0; ChannelTransportProperties const& transport_properties() const; ChannelStatistics& channel_statistics(); diff --git a/aether/channels/ethernet_channel.cpp b/aether/channels/ethernet_channel.cpp index 1ba9f99a..248e1e45 100644 --- a/aether/channels/ethernet_channel.cpp +++ b/aether/channels/ethernet_channel.cpp @@ -21,179 +21,108 @@ #include "aether/memory.h" #include "aether/actions/action.h" -#include "aether/types/state_machine.h" #include "aether/events/event_subscription.h" #include "aether/aether.h" #include "aether/poller/poller.h" #include "aether/dns/dns_resolve.h" +#include "aether/executors/executors.h" #include "aether/channels/ethernet_transport_factory.h" -#include "aether/tele/tele.h" - namespace ae { namespace ethernet_access_point_internal { +using ResolveSender = + ex::AnySender), ex::set_error_t(int)>; -class EthernetTransportBuilderAction final : public TransportBuilderAction { - public: - enum class State : std::uint8_t { - kAddressResolve, - kTransportCreate, - kWaitTransportConnected, - kTransportConnected, - kFailed - }; - - EthernetTransportBuilderAction(ActionContext action_context, - EthernetChannel& channel, Endpoint address, - Ptr const& dns_resolver, - Ptr const& poller) - : TransportBuilderAction{action_context}, - action_context_{action_context}, - ethernet_channel_{&channel}, - address_{std::move(address)}, - resolver_{dns_resolver}, - poller_{poller}, - state_{State::kAddressResolve}, - start_time_{Now()} {} - - UpdateStatus Update() override { - if (state_.changed()) { - switch (state_.Acquire()) { - case State::kAddressResolve: - ResolveAddress(); - break; - case State::kTransportCreate: - CreateTransport(); - break; - case State::kWaitTransportConnected: - break; - case State::kTransportConnected: - return UpdateStatus::Result(); - case State::kFailed: - return UpdateStatus::Error(); - } - } - return {}; - } - - std::unique_ptr transport_stream() override { - return std::move(transport_stream_); - } - - private: - void ResolveAddress() { - std::visit( - [this](auto const& addr) { - DoResolverAddress(addr, address_.port, address_.protocol); - }, - address_.address); - } - - void DoResolverAddress([[maybe_unused]] NamedAddr const& name_address, - [[maybe_unused]] std::uint16_t port, - [[maybe_unused]] Protocol protocol) { +ResolveSender ResolveAddress(Ptr const& resolver, + NamedAddr const& addr, std::uint16_t port, + Protocol protocol) { #if AE_SUPPORT_CLOUD_DNS - auto dns_resolver = resolver_.Lock(); - assert(dns_resolver); - auto resolve_action = dns_resolver->Resolve(name_address, port, protocol); - - address_resolve_sub_ = resolve_action->StatusEvent().Subscribe( - ActionHandler{OnResult{[this](auto& action) { - ip_addresses_ = std::move(action.addresses); - it_ = std::begin(ip_addresses_); - state_ = State::kTransportCreate; - Action::Trigger(); - }}, - OnError { - [this]() { - state_ = State::kFailed; - Action::Trigger(); - } - }}); + return resolver->Resolve(addr, port, protocol); #else - AE_TELED_ERROR("Unable to resolve named address"); - state_ = State::kFailed; - Action::Trigger(); + // named address doesn't supported + return ex::just_error(1); #endif - } - - template - void DoResolverAddress(TAddr const& ip_address, std::uint16_t port, - Protocol protocol) { - ip_addresses_.push_back(Endpoint{{{ip_address}, port}, protocol}); - it_ = std::begin(ip_addresses_); - state_ = State::kTransportCreate; - Action::Trigger(); - } - - void CreateTransport() { - state_ = State::kWaitTransportConnected; +} - if (it_ == std::end(ip_addresses_)) { - state_ = State::kFailed; - Action::Trigger(); - return; - } - auto poller = poller_.Lock(); - assert(poller); - auto& addr = *(it_++); - transport_stream_ = - EthernetTransportFactory::Create(action_context_, poller, addr); - - if (!transport_stream_) { - // try next address - state_ = State::kTransportCreate; - Action::Trigger(); - return; - } +ResolveSender ResolveAddress(Ptr const&, auto const& addr, + std::uint16_t port, Protocol protocol) { + return ex::just(std::vector{Endpoint{addr, port, protocol}}); +} - if (transport_stream_->stream_info().link_state == LinkState::kLinked) { - Connected(); - return; - } +auto CreateTransport(AeContext const& context, PtrView const& poller, + Endpoint const& e) { + auto poller_ptr = poller.Lock(); + assert(poller_ptr); + return EthernetTransportFactory::Create(context.aether(), poller_ptr, e); +} - transport_stream_sub_ = - transport_stream_->stream_update_event().Subscribe([this]() { - if (transport_stream_->stream_info().link_state == - LinkState::kLinked) { - // transport stream is connected - Connected(); - } else if (transport_stream_->stream_info().link_state == - LinkState::kLinkError) { - // connection failed, try next address - state_ = State::kTransportCreate; - Action::Trigger(); +ex::sender auto TransportConnect(std::unique_ptr&& stream) { + return ex::make_sender), + ex::set_error_t(int)>( + [&, s{std::move(stream)}, link_sub{Subscription{}}](auto&& recv) mutable { + // if already linked return stream + switch (s->stream_info().link_state) { + case LinkState::kLinked: { + ex::set_value(std::forward(recv), std::move(s)); + return; } - }); - } - - void Connected() { - transport_stream_sub_.Reset(); - auto built_time = std::chrono::duration_cast(Now() - start_time_); - AE_TELED_DEBUG("Transport built by {:%S}", built_time); - ethernet_channel_->channel_statistics().AddConnectionTime(built_time); - state_ = State::kTransportConnected; - Action::Trigger(); - } + case LinkState::kLinkError: { + ex::set_error(std::forward(recv), 1); + return; + } + default: + break; + } + // wait till linked + link_sub = s->stream_update_event().Subscribe( + [&, r{std::forward(recv)}]() mutable noexcept { + link_sub.Reset(); + if (s->stream_info().link_state == LinkState::kLinked) { + ex::set_value(std::move(r), std::move(s)); + } else { + ex::set_error(std::move(r), 2); + } + }); + }); +} - ActionContext action_context_; - EthernetChannel* ethernet_channel_; - Endpoint address_; - PtrView resolver_; - PtrView poller_; - StateMachine state_; - std::vector ip_addresses_; - std::vector::iterator it_{}; - std::unique_ptr transport_stream_; - Subscription address_resolve_sub_; - Subscription transport_stream_sub_; - - TimePoint start_time_; -}; +ex::sender auto MakeTransportBuilder(AeContext ae_context, + Ptr const& dns_resolver, + Ptr const& poller, + Endpoint address) noexcept { + // resolve named address to ip address + return std::visit( + [&](auto const& addr) noexcept { + return ResolveAddress(dns_resolver, addr, address.port, + address.protocol); + }, + address.address) | + // create transport and wait till it connected + ex::let_value([c{ae_context}, p{PtrView{poller}}]( + auto const& endpoints) noexcept { + return ex::for_range( + Iter{endpoints.begin(), endpoints.end()}, + [c, p](auto const& e) noexcept { + return ex::just(e) | + ex::then([&](auto const& e) noexcept { + return CreateTransport(c, p, e); + }) | + ex::let_value([](auto& s) noexcept { + return TransportConnect(std::move(s)); + }) | + ex::upon_error([](auto&&...) noexcept { + return ex::for_continue; + }); + }) | + ex::let_stopped([]() noexcept { return ex::just_error(1); }); + }); +} } // namespace ethernet_access_point_internal +EthernetChannel::EthernetChannel() = default; + EthernetChannel::EthernetChannel(ObjProp prop, ObjPtr aether, ObjPtr dns_resolver, ObjPtr poller, Endpoint address) @@ -227,7 +156,9 @@ EthernetChannel::EthernetChannel(ObjProp prop, ObjPtr aether, } } -ActionPtr EthernetChannel::TransportBuilder() { +EthernetChannel::~EthernetChannel() = default; + +TransportBuildSender EthernetChannel::TransportBuilder() { auto resolver = dns_resolver_.Load(); #if AE_SUPPORT_CLOUD_DNS assert(resolver && "Resolver is not loaded"); @@ -235,9 +166,8 @@ ActionPtr EthernetChannel::TransportBuilder() { auto poller = poller_.Load(); assert(poller && "Poller is not loaded"); - return ActionPtr< - ethernet_access_point_internal::EthernetTransportBuilderAction>( - *aether_.Load().as(), *this, address, resolver, poller); + return ethernet_access_point_internal::MakeTransportBuilder( + AeContext{*aether_.Load().as()}, resolver, poller, address); } } // namespace ae diff --git a/aether/channels/ethernet_channel.h b/aether/channels/ethernet_channel.h index 329e36ba..36e8f6f1 100644 --- a/aether/channels/ethernet_channel.h +++ b/aether/channels/ethernet_channel.h @@ -17,6 +17,7 @@ #ifndef AETHER_CHANNELS_ETHERNET_CHANNEL_H_ #define AETHER_CHANNELS_ETHERNET_CHANNEL_H_ +#include "aether/memory.h" #include "aether/types/address.h" #include "aether/channels/channel.h" @@ -25,18 +26,23 @@ class Aether; class IPoller; class DnsResolver; +namespace ethernet_access_point_internal { +class EthernetChannelTransportBuilder; +} + class EthernetChannel : public Channel { AE_OBJECT(EthernetChannel, Channel, 0) public: - EthernetChannel() = default; + EthernetChannel(); EthernetChannel(ObjProp prop, ObjPtr aether, ObjPtr dns_resolver, ObjPtr poller, Endpoint address); + ~EthernetChannel() override; AE_OBJECT_REFLECT(AE_MMBRS(aether_, poller_, dns_resolver_, address)) - ActionPtr TransportBuilder() override; + TransportBuildSender TransportBuilder() override; Endpoint address; diff --git a/aether/channels/wifi_channel.cpp b/aether/channels/wifi_channel.cpp index 9ec0cf0e..444208c9 100644 --- a/aether/channels/wifi_channel.cpp +++ b/aether/channels/wifi_channel.cpp @@ -23,7 +23,7 @@ # include "aether/ptr/ptr_view.h" # include "aether/actions/action.h" -# include "aether/types/state_machine.h" +# include "aether/executors/executors.h" # include "aether/events/event_subscription.h" # include "aether/aether.h" @@ -36,185 +36,116 @@ namespace ae { namespace wifi_channel_internal { -class WifiTransportBuilderAction final : public TransportBuilderAction { - public: - enum class State : std::uint8_t { - kWifiConnect, - kAddressResolve, - kTransportCreate, - kWaitTransportConnect, - kTransportConnected, - kFailed, - }; - - WifiTransportBuilderAction(ActionContext action_context, WifiChannel& channel, - Ptr const& access_point, - Ptr const& poller, - Ptr const& resolver, Endpoint address) - : TransportBuilderAction{action_context}, - action_context_{action_context}, - channel_{&channel}, - access_point_{access_point}, - poller_{poller}, - resolver_{resolver}, - address_{std::move(address)}, - state_{State::kWifiConnect} {} - - UpdateStatus Update() override { - if (state_.changed()) { - switch (state_.Acquire()) { - case State::kWifiConnect: - WifiConnect(); - break; - case State::kAddressResolve: - ResolveAddress(); - break; - case State::kTransportCreate: - CreateTransport(); - break; - case State::kWaitTransportConnect: - break; - case State::kTransportConnected: - return UpdateStatus::Result(); - case State::kFailed: - return UpdateStatus::Error(); - } - } - return {}; - } - - std::unique_ptr transport_stream() override { - return std::move(transport_stream_); - } - - private: - void WifiConnect() { - auto access_point = access_point_.Lock(); - assert(access_point); - auto connect_action = access_point->Connect(); - wifi_connected_sub_ = connect_action->StatusEvent().Subscribe(ActionHandler{ - OnResult{[this]() { - // build transport start after wifi is connected - start_time_ = Now(); - state_ = State::kAddressResolve; - Action::Trigger(); - }}, - OnError{[this]() { - state_ = State::kFailed; - Action::Trigger(); - }}, - }); - } +ex::sender auto WifiConnect(Ptr const& access_point) { + return ex::make_sender( + [ap{PtrView{access_point}}, + connect_sub{Subscription{}}](auto&& recv) mutable noexcept { + auto access_point = ap.Lock(); + assert(access_point && "Wifi access point is not loaded"); + connect_sub = access_point->Connect()->StatusEvent().Subscribe( + [r{std::forward(recv)}]( + ActionEventStatus aes) mutable noexcept { + aes.OnResult([&]() { ex::set_value(std::move(r)); }) // ~['_']~ + .OnError([&]() { ex::set_error(std::move(r), 1); }); + }); + }); +} - void ResolveAddress() { - std::visit( - [this](auto const& addr) { - DoResolverAddress(addr, address_.port, address_.protocol); - }, - address_.address); - } +using ResolveSender = + ex::AnySender), ex::set_error_t(int)>; - void DoResolverAddress([[maybe_unused]] NamedAddr const& name_address, - [[maybe_unused]] std::uint16_t port, - [[maybe_unused]] Protocol protocol) { +ResolveSender ResolveAddress(PtrView const& resolver, + NamedAddr const& addr, std::uint16_t port, + Protocol protocol) { # if AE_SUPPORT_CLOUD_DNS - auto dns_resolver = resolver_.Lock(); - assert(dns_resolver); - auto resolve_action = dns_resolver->Resolve(name_address, port, protocol); - - address_resolve_sub_ = resolve_action->StatusEvent().Subscribe( - ActionHandler{OnResult{[this](auto& action) { - ip_addresses_ = std::move(action.addresses); - it_ = std::begin(ip_addresses_); - state_ = State::kTransportCreate; - Action::Trigger(); - }}, - OnError { - [this]() { - state_ = State::kFailed; - Action::Trigger(); - } - }}); + auto resolver_ptr = resolver.Lock(); + assert(resolver_ptr && "Dns resolver is not loaded"); + return resolver_ptr->Resolve(addr, port, protocol); # else - AE_TELED_ERROR("Unable to resolve named address"); - state_ = State::kFailed; - Action::Trigger(); + // named address doesn't supported + return ex::just_error(1); # endif - } - - template - void DoResolverAddress(TAddr const& addr, std::uint16_t port, - Protocol protocol) { - ip_addresses_.push_back(Endpoint{{addr, port}, protocol}); - it_ = std::begin(ip_addresses_); - state_ = State::kTransportCreate; - Action::Trigger(); - } +} - void CreateTransport() { - state_ = State::kWaitTransportConnect; +ResolveSender ResolveAddress(PtrView const&, auto const& addr, + std::uint16_t port, Protocol protocol) { + return ex::just(std::vector{Endpoint{addr, port, protocol}}); +} - if (it_ == std::end(ip_addresses_)) { - state_ = State::kFailed; - Action::Trigger(); - return; - } +auto CreateTransport(AeContext const& context, PtrView const& poller, + Endpoint const& e) { + auto poller_ptr = poller.Lock(); + assert(poller_ptr); + return EthernetTransportFactory::Create(context.aether(), poller_ptr, e); +} - auto poller = poller_.Lock(); - assert(poller); - auto& addr = *(it_++); - transport_stream_ = - EthernetTransportFactory::Create(action_context_, poller, addr); - if (!transport_stream_) { - // try next address - state_ = State::kTransportCreate; - Action::Trigger(); - return; - } - if (transport_stream_->stream_info().link_state == LinkState::kLinked) { - Connected(); - return; - } - transport_sub_ = - transport_stream_->stream_update_event().Subscribe([this]() { - if (transport_stream_->stream_info().link_state == - LinkState::kLinked) { - // transport connected - Connected(); - } else if (transport_stream_->stream_info().link_state == - LinkState::kLinkError) { - // try next address - state_ = State::kTransportCreate; - Action::Trigger(); +ex::sender auto TransportConnect(std::unique_ptr&& stream) { + return ex::make_sender), + ex::set_error_t(int)>( + [s{std::move(stream)}, + link_sub{Subscription{}}](auto&& recv) mutable noexcept { + // if already linked return stream + switch (s->stream_info().link_state) { + case LinkState::kLinked: { + ex::set_value(std::forward(recv), std::move(s)); + return; } - }); - } - - void Connected() { - transport_sub_.Reset(); - auto built_time = std::chrono::duration_cast(Now() - start_time_); - AE_TELED_DEBUG("Transport built by {:%S}", built_time); - channel_->channel_statistics().AddConnectionTime(built_time); - state_ = State::kTransportConnected; - Action::Trigger(); - } - - ActionContext action_context_; - WifiChannel* channel_; - PtrView access_point_; - PtrView poller_; - PtrView resolver_; - Endpoint address_; + case LinkState::kLinkError: { + ex::set_error(std::forward(recv), 1); + return; + } + default: + break; + } + // wait till linked + link_sub = s->stream_update_event().Subscribe( + [&, r{std::forward(recv)}]() mutable noexcept { + link_sub.Reset(); + if (s->stream_info().link_state == LinkState::kLinked) { + ex::set_value(std::move(r), std::move(s)); + } else { + ex::set_error(std::move(r), 2); + } + }); + }); +} - std::vector ip_addresses_; - std::vector::iterator it_{}; - std::unique_ptr transport_stream_; - Subscription wifi_connected_sub_; - Subscription address_resolve_sub_; - Subscription transport_sub_; - StateMachine state_; - TimePoint start_time_; -}; +ex::sender auto MakeTransportBuilder(AeContext ae_context, + Ptr const& dns_resolver, + Ptr const& poller, + Ptr const& access_point, + Endpoint address) noexcept { + return // connect to wifi first + WifiConnect(access_point) + // resolve named address to ip address + | ex::let_value([a{std::move(address)}, + r{PtrView{dns_resolver}}]() noexcept { + return std::visit( + [&](auto const& addr) noexcept { + return ResolveAddress(r, addr, a.port, a.protocol); + }, + a.address); + }) + // create transport and wait till it connected + // TODO: add selection for endpoint to connect + | ex::let_value([c{ae_context}, p{PtrView{poller}}]( + std::vector const& endpoints) noexcept { + return ex::for_range(Iter{endpoints.begin(), endpoints.end()}, + [c, p](auto const& e) noexcept { + return ex::just(e) | + ex::then([&](auto const& e) noexcept { + return CreateTransport(c, p, e); + }) | + ex::let_value([](auto& s) noexcept { + return TransportConnect(std::move(s)); + }) | + ex::upon_error([](auto&&...) noexcept { + return ex::for_continue; + }); + }) | + ex::let_stopped([]() noexcept { return ex::just_error(1); }); + }); +} } // namespace wifi_channel_internal WifiChannel::WifiChannel(ObjProp prop, ObjPtr aether, @@ -262,7 +193,7 @@ Duration WifiChannel::TransportBuildTimeout() const { std::chrono::milliseconds{AE_WIFI_CONNECTION_TIMEOUT_MS}; } -ActionPtr WifiChannel::TransportBuilder() { +TransportBuildSender WifiChannel::TransportBuilder() { auto resolver = resolver_.Load(); # if AE_SUPPORT_CLOUD_DNS assert(resolver && "Resolver is not loaded"); @@ -274,13 +205,8 @@ ActionPtr WifiChannel::TransportBuilder() { assert(poller && "Poller is not loaded"); assert(access_point && "Access point is not loaded"); - return ActionPtr{ - *aether_.Load().as(), - *this, - access_point, - poller, - resolver, - address}; + return wifi_channel_internal::MakeTransportBuilder( + *aether_.Load().as(), resolver, poller, access_point, address); } } // namespace ae diff --git a/aether/channels/wifi_channel.h b/aether/channels/wifi_channel.h index 53e1e610..40b57158 100644 --- a/aether/channels/wifi_channel.h +++ b/aether/channels/wifi_channel.h @@ -44,7 +44,7 @@ class WifiChannel final : public Channel { Duration TransportBuildTimeout() const override; - ActionPtr TransportBuilder() override; + TransportBuildSender TransportBuilder() override; Endpoint address; diff --git a/aether/connection_manager/server_connection_manager.cpp b/aether/connection_manager/server_connection_manager.cpp index cc7ebd86..db1e713d 100644 --- a/aether/connection_manager/server_connection_manager.cpp +++ b/aether/connection_manager/server_connection_manager.cpp @@ -30,9 +30,9 @@ ServerConnectionManager::ServerConnectionFactory::CreateConnection( return server_connection_manager_->CreateConnection(server); } -ServerConnectionManager::ServerConnectionManager(ActionContext action_context, +ServerConnectionManager::ServerConnectionManager(AeContext const& ae_context, Ptr const& client) - : action_context_{action_context}, client_{client} {} + : ae_context_{ae_context}, client_{client} {} std::unique_ptr ServerConnectionManager::GetServerConnectionFactory() { @@ -51,7 +51,7 @@ RcPtr ServerConnectionManager::CreateConnection( assert(client); auto connection = - MakeRcPtr(action_context_, client, server); + MakeRcPtr(ae_context_, client, server); // check updates server_update_subs_ += connection->stream_update_event().Subscribe( diff --git a/aether/connection_manager/server_connection_manager.h b/aether/connection_manager/server_connection_manager.h index 665f0eb7..d4976402 100644 --- a/aether/connection_manager/server_connection_manager.h +++ b/aether/connection_manager/server_connection_manager.h @@ -21,8 +21,8 @@ #include "aether/ptr/ptr.h" #include "aether/ptr/rc_ptr.h" +#include "aether/ae_context.h" #include "aether/ptr/ptr_view.h" -#include "aether/actions/action_context.h" #include "aether/server_connections/client_server_connection.h" #include "aether/server_connections/iserver_connection_factory.h" @@ -43,7 +43,7 @@ class ServerConnectionManager { }; public: - ServerConnectionManager(ActionContext action_context, + ServerConnectionManager(AeContext const& ae_context, Ptr const& client); std::unique_ptr GetServerConnectionFactory(); @@ -55,7 +55,7 @@ class ServerConnectionManager { private: void ServerUpdate(ServerId server_id); - ActionContext action_context_; + AeContext ae_context_; PtrView client_; std::map> cached_connections_; MultiSubscription server_update_subs_; diff --git a/aether/dns/dns_c_ares.cpp b/aether/dns/dns_c_ares.cpp index 2987f206..6fef2ecc 100644 --- a/aether/dns/dns_c_ares.cpp +++ b/aether/dns/dns_c_ares.cpp @@ -27,47 +27,115 @@ # include "aether/warning_disable.h" # include "aether/aether.h" +# include "aether/types/result.h" # include "aether/socket_initializer.h" -# include "aether/actions/action_ptr.h" -# include "aether/actions/action_context.h" # include "aether/events/multi_subscription.h" +# include "aether/executors/executors.h" + # include "aether/dns/dns_tele.h" namespace ae { -class AresQueryAction : public Action { - public: - AresQueryAction(ActionContext action_context, NamedAddr name_address, - std::uint16_t port, Protocol protocol) - : Action{action_context}, - name_address_{std::move(name_address)}, - port_{port}, - protocol_{protocol} {} - - UpdateStatus Update() { - if (is_result_) { - return UpdateStatus::Result(); +class AresImpl { + struct QueryContext { + AresImpl* ares_impl; + NamedAddr name_address; + std::uint16_t port_hint; + Protocol protocol_hint; + }; + + template + struct Operation { + static void Callback(void* arg, int status, int timeouts, + struct ares_addrinfo* result) { + auto* op = static_cast*>(arg); + auto r = AresImpl::ProcessResult( + status, timeouts, result, op->ctx.port_hint, op->ctx.protocol_hint); + // set either value or error + if (r.IsOk()) { + ex::set_value(std::move(op->recv), std::move(r.value())); + } else { + ex::set_error(std::move(op->recv), r.error()); + } + } + + void start() & noexcept { + AE_TELE_DEBUG(kAresDnsQueryHost, "Querying host: {}", ctx.name_address); + + ares_addrinfo_hints hints{}; + // BOTH ipv4 and ipv6 + hints.ai_family = AF_UNSPEC; + hints.ai_flags = ARES_AI_CANONNAME; + ares_getaddrinfo(ctx.ares_impl->channel_, ctx.name_address.name.c_str(), + nullptr, &hints, Callback, this); } - if (is_failed_) { - return UpdateStatus::Error(); + + R recv; + QueryContext ctx; + }; + + struct Sender { + using sender_concept = stdexec::sender_t; + using completion_signatures = + stdexec::completion_signatures&&), + ex::set_error_t(int)>; + + template + auto connect(R&& r) && noexcept { + return Operation{.recv = std::forward(r), .ctx = std::move(ctx)}; + } + + QueryContext ctx; + }; + + public: + AresImpl() { + ares_library_init(ARES_LIB_INIT_ALL); + + int optmask = ARES_OPT_EVENT_THREAD; + ares_options options{}; + options.evsys = ARES_EVSYS_DEFAULT; + + /* Initialize channel to run queries, a single channel can accept unlimited + * queries */ + if (auto res = ares_init_options(&channel_, &options, optmask); + res != ARES_SUCCESS) { + AE_TELE_ERROR(kAresDnsFailedInitialize, + "Failed to initialize ares options: {}", + ares_strerror(res)); + assert(false); } - return {}; } - void ProcessResult(int status, int /* timeouts */, - struct ares_addrinfo* result) { + ~AresImpl() { + ares_destroy(channel_); + ares_library_cleanup(); + } + + Sender Query(NamedAddr const& name_address, std::uint16_t port_hint, + Protocol protocol_hint) { + return Sender{.ctx{.ares_impl = this, + .name_address = name_address, + .port_hint = port_hint, + .protocol_hint = protocol_hint}}; + } + + static Result, int> ProcessResult( + int status, int /* timeouts */, struct ares_addrinfo* result, + std::uint16_t port_hint, Protocol protocol_hint) noexcept { if (status != ARES_SUCCESS) { AE_TELE_ERROR(kAresDnsQueryError, "Ares query error {} {}", status, ares_strerror(status)); - Failed(); - return; + return Error{1}; } - assert(result); + assert(result != nullptr); + ae_defer[result]() { ares_freeaddrinfo(result); }; + std::vector resolved_addresses; for (auto* node = result->nodes; node != nullptr; node = node->ai_next) { - auto addr_add = [this](auto const& ip) { - Endpoint addr{{ip, port_}, protocol_}; - resolved_addresses_.emplace_back(std::move(addr)); + auto addr_add = [&](auto const& ip) { + Endpoint addr{{ip, port_hint}, protocol_hint}; + resolved_addresses.emplace_back(std::move(addr)); }; if (node->ai_family == AF_INET) { @@ -92,96 +160,11 @@ class AresQueryAction : public Action { } } - AE_TELE_DEBUG(kAresDnsQuerySuccess, "Got addresses {}", - resolved_addresses_); - Result(); - } - - std::vector const& resolved_addresses() const { - return resolved_addresses_; + AE_TELE_DEBUG(kAresDnsQuerySuccess, "Got addresses {}", resolved_addresses); + return Ok{std::move(resolved_addresses)}; } private: - void Result() { - is_result_ = true; - Action::Trigger(); - } - void Failed() { - is_failed_ = true; - Action::Trigger(); - } - - NamedAddr name_address_; - std::uint16_t port_; - Protocol protocol_; - std::vector resolved_addresses_; - std::atomic_bool is_result_{false}; - std::atomic_bool is_failed_{false}; -}; - -class AresImpl { - public: - explicit AresImpl(ActionContext action_context) - : action_context_{action_context} { - ares_library_init(ARES_LIB_INIT_ALL); - - int optmask = ARES_OPT_EVENT_THREAD; - ares_options options{}; - options.evsys = ARES_EVSYS_DEFAULT; - - /* Initialize channel to run queries, a single channel can accept unlimited - * queries */ - if (auto res = ares_init_options(&channel_, &options, optmask); - res != ARES_SUCCESS) { - AE_TELE_ERROR(kAresDnsFailedInitialize, - "Failed to initialize ares options: {}", - ares_strerror(res)); - assert(false); - } - } - - ~AresImpl() { - ares_destroy(channel_); - ares_library_cleanup(); - } - - ActionPtr Query(NamedAddr const& name_address, - std::uint16_t port_hint, - Protocol protocol_hint) { - AE_TELE_DEBUG(kAresDnsQueryHost, "Querying host: {}", name_address); - - auto resolve_action = ActionPtr{action_context_}; - auto query_action = ActionPtr{ - action_context_, name_address, port_hint, protocol_hint}; - - // connect actions - multi_subscription_.Push( - query_action->StatusEvent().Subscribe(ActionHandler{ - OnResult{[ra{resolve_action}](auto const& action) mutable { - ra->SetAddress(action.resolved_addresses()); - }}, - OnError{[ra{resolve_action}]() mutable { ra->Failed(); }}, - })); - - ares_addrinfo_hints hints{}; - // BOTH ipv4 and ipv6 - hints.ai_family = AF_UNSPEC; - hints.ai_flags = ARES_AI_CANONNAME; - - ares_getaddrinfo( - channel_, name_address.name.c_str(), nullptr, &hints, - [](void* arg, auto status, auto timeouts, auto result) { - auto& ares_query_action = *static_cast(arg); - ares_query_action.ProcessResult(status, timeouts, result); - ares_freeaddrinfo(result); - }, - &*query_action); - - return resolve_action; - } - - private: - ActionContext action_context_; ares_channel_t* channel_; MultiSubscription multi_subscription_; @@ -197,13 +180,39 @@ DnsResolverCares::DnsResolverCares(ObjProp prop, ObjPtr aether) DnsResolverCares::~DnsResolverCares() = default; -ActionPtr DnsResolverCares::Resolve( - NamedAddr const& name_address, std::uint16_t port_hint, - Protocol protocol_hint) { +struct minimal_sender { + using sender_concept = ex::sender_t; + + // 1. Declare what this sender is capable of returning + using completion_signatures = ex::completion_signatures; + + // 2. Define the Operation State (the actual execution logic) + template + struct operation_state { + Receiver rcvr; + + // The "Start" method is where the work actually happens + void start() noexcept { stdexec::set_value(std::move(rcvr), 1); } + }; + + // 3. Connect the blueprint (Sender) to a destination (Receiver) + template + auto connect(Receiver rcvr) const { + return operation_state{std::move(rcvr)}; + } +}; + +ResolveSender DnsResolverCares::Resolve(NamedAddr const& name_address, + std::uint16_t port_hint, + Protocol protocol_hint) { if (!ares_impl_) { - ares_impl_ = std::make_unique(*aether_.Load().as()); + ares_impl_ = std::make_unique(); } - return ares_impl_->Query(name_address, port_hint, protocol_hint); + + return ares_impl_->Query(name_address, port_hint, protocol_hint) | + ex::continues_on(ex::SchedulerOnTasks{*aether_.Load().as()}); } } // namespace ae diff --git a/aether/dns/dns_c_ares.h b/aether/dns/dns_c_ares.h index 491ddcdc..b738bfe2 100644 --- a/aether/dns/dns_c_ares.h +++ b/aether/dns/dns_c_ares.h @@ -47,9 +47,8 @@ class DnsResolverCares : public DnsResolver { AE_OBJECT_REFLECT(AE_MMBR(aether_)) - ActionPtr Resolve(NamedAddr const& name_address, - std::uint16_t port_hint, - Protocol protocol_hint) override; + ResolveSender Resolve(NamedAddr const& name_address, std::uint16_t port_hint, + Protocol protocol_hint) override; private: Obj::ptr aether_; diff --git a/aether/dns/dns_resolve.cpp b/aether/dns/dns_resolve.cpp index 85bc2ab0..5f01a3c9 100644 --- a/aether/dns/dns_resolve.cpp +++ b/aether/dns/dns_resolve.cpp @@ -19,37 +19,5 @@ #if AE_SUPPORT_CLOUD_DNS # include -namespace ae { - -UpdateStatus ResolveAction::Update() const { - if (is_resolved) { - return UpdateStatus::Result(); - } - if (is_failed) { - return UpdateStatus::Error(); - } - return {}; -} - -void ResolveAction::SetAddress(std::vector addr) { - addresses = std::move(addr); - is_resolved = true; - this->Trigger(); -} - -void ResolveAction::Failed() { - is_failed = true; - is_resolved = false; - this->Trigger(); -} - -ActionPtr DnsResolver::Resolve( - NamedAddr const& /* name_address */, std::uint16_t /* port_hint */, - Protocol /* protocol_hint */) { - // must be overridden - assert(false); - return {}; -} - -} // namespace ae +namespace ae {} // namespace ae #endif diff --git a/aether/dns/dns_resolve.h b/aether/dns/dns_resolve.h index 0f9b5e06..98be40b5 100644 --- a/aether/dns/dns_resolve.h +++ b/aether/dns/dns_resolve.h @@ -25,25 +25,12 @@ #if AE_SUPPORT_CLOUD_DNS # include "aether/obj/obj.h" # include "aether/types/address.h" -# include "aether/actions/action.h" -# include "aether/actions/action_ptr.h" +# include "aether/executors/executors.h" namespace ae { -// Action to get host name resolve -class ResolveAction : public Action { - public: - using Action::Action; - - UpdateStatus Update() const; - // Set ip addresses after resolve query finished - void SetAddress(std::vector addr); - void Failed(); - - bool is_resolved{false}; - bool is_failed{false}; - std::vector addresses; -}; +using ResolveSender = + ex::AnySender), ex::set_error_t(int)>; /** * \brief Base class of DNS resolver @@ -55,17 +42,15 @@ class DnsResolver : public Obj { DnsResolver() = default; public: -# if defined AE_DISTILLATION explicit DnsResolver(ObjProp prop) : Obj{prop} {} -# endif ~DnsResolver() override = default; AE_OBJECT_REFLECT() // Make a host name resolve - virtual ActionPtr Resolve(NamedAddr const& name_address, - std::uint16_t port_hint, - Protocol protocol_hint); + virtual ResolveSender Resolve(NamedAddr const& name_address, + std::uint16_t port_hint, + Protocol protocol_hint) = 0; }; } // namespace ae #else diff --git a/aether/dns/esp32_dns_resolve.cpp b/aether/dns/esp32_dns_resolve.cpp index 11975c31..c6cc388b 100644 --- a/aether/dns/esp32_dns_resolve.cpp +++ b/aether/dns/esp32_dns_resolve.cpp @@ -18,178 +18,145 @@ #if defined ESP32_DNS_RESOLVER_ENABLED -# include # include # include -# include "freertos/FreeRTOS.h" -# include "freertos/task.h" -# include "freertos/event_groups.h" -# include "esp_system.h" -# include "esp_event.h" -# include "nvs_flash.h" +// # include "freertos/FreeRTOS.h" +// # include "freertos/task.h" +// # include "freertos/event_groups.h" # include "lwip/err.h" -# include "lwip/sys.h" # include "lwip/dns.h" # include "lwip/tcpip.h" # include "aether/aether.h" -# include "aether/actions/action_ptr.h" -# include "aether/actions/action_context.h" -# include "aether/events/multi_subscription.h" +# include "aether/executors/executors.h" # include "aether/dns/dns_tele.h" namespace ae { - -class GetHostByNameQueryAction : public Action { - public: - GetHostByNameQueryAction(ActionContext action_context, - NamedAddr const& name_address, std::uint16_t port, - Protocol protocol) - : Action{action_context}, - name_address_{std::move(name_address)}, - port_{port}, - protocol_{protocol} {} - - UpdateStatus Update() { - if (is_result_) { - return UpdateStatus::Result(); - } - if (is_failed_) { - return UpdateStatus::Error(); - } - return {}; +Result, int> ProcessQueryResult(ip_addr_t const* ipaddr, + std::uint16_t port, + Protocol protocol) { + if (ipaddr == nullptr) { + return Error{1}; } - void ProcessQueryResult(ip_addr_t const* ipaddr) { - if (!ipaddr) { - Failed(); - return; - } - auto addr_add = [this](auto const& ip) { - Endpoint addr{{ip, port_}, protocol_}; - resolved_addresses_.emplace_back(std::move(addr)); - }; + std::vector resolved_addresses; + + auto addr_add = [&](auto const& ip) { + resolved_addresses.emplace_back(Endpoint{{ip, port}, protocol}); + }; - if (IP_IS_V4_VAL(*ipaddr)) { + if (IP_IS_V4_VAL(*ipaddr)) { # if AE_SUPPORT_IPV4 - IpV4Addr ipv4{}; - auto ip4 = ip_2_ip4(ipaddr); - std::memcpy(&ipv4.ipv4_value, - reinterpret_cast(&ip4->addr), - sizeof(ipv4.ipv4_value)); - addr_add(ipv4); + IpV4Addr ipv4{}; + auto ip4 = ip_2_ip4(ipaddr); + std::memcpy(&ipv4.ipv4_value, + reinterpret_cast(&ip4->addr), + sizeof(ipv4.ipv4_value)); + addr_add(ipv4); # endif - } else if (IP_IS_V6_VAL(*ipaddr)) { + } else if (IP_IS_V6_VAL(*ipaddr)) { # if AE_SUPPORT_IPV6 - IpV6Addr ipv6{}; - auto ip6 = ip_2_ip6(ipaddr); - addr.ip.set_value(reinterpret_cast(&ip6->addr)); - std::memcpy(&ipv6.ipv6_value, - reinterpret_cast(&ip6->addr), - sizeof(ipv6.ipv6_value)); - addr_add(ipv6); + IpV6Addr ipv6{}; + auto ip6 = ip_2_ip6(ipaddr); + addr.ip.set_value(reinterpret_cast(&ip6->addr)); + std::memcpy(&ipv6.ipv6_value, + reinterpret_cast(&ip6->addr), + sizeof(ipv6.ipv6_value)); + addr_add(ipv6); # endif - } - AE_TELE_DEBUG(kEspDnsQuerySuccess, "Got addresses {}", resolved_addresses_); - Result(); - } - - std::vector const& resolved_addresses() const { - return resolved_addresses_; } + AE_TELE_DEBUG(kEspDnsQuerySuccess, "Got addresses {}", resolved_addresses); + return Ok{std::move(resolved_addresses)}; +} - void Failed() { - is_failed_ = true; - Action::Trigger(); - } +class GHbyNameResolveSender { + public: + struct State { + NamedAddr name_address; + std::uint16_t port_hint; + Protocol protocol_hint; + }; + + template + struct Operation { + void start() noexcept { + AE_TELE_DEBUG(kEspDnsQueryHost, "Querying host: {}", state.name_address); + + // make query + ip_addr_t cached_addr; + LOCK_TCPIP_CORE(); + auto res = dns_gethostbyname( + state.name_address.name.c_str(), &cached_addr, + [](const char* /* name */, const ip_addr_t* ipaddr, void* arg) { + auto* op = static_cast*>(arg); + auto r = ProcessQueryResult(ipaddr, op->state.port_hint, + op->state.protocol_hint); + if (r.IsOk()) { + ex::set_value(std::move(op->recv), std::move(r.value())); + } else { + ex::set_error(std::move(op->recv), std::move(r.error())); + } + }, + this); + UNLOCK_TCPIP_CORE(); + + if (res == ERR_OK) { + // process cached value + auto r = ProcessQueryResult(&cached_addr, state.port_hint, + state.protocol_hint); + if (r.IsOk()) { + ex::set_value(std::move(recv), std::move(r.value())); + } else { + ex::set_error(std::move(recv), std::move(r.error())); + } + } else if (res == ERR_ARG) { + AE_TELE_ERROR(kEspDnsQueryError, + "Dns client not initialized or invalid hostname"); + ex::set_error(std::move(recv), 2); + } + } - private: - void Result() { - is_result_ = true; - Action::Trigger(); + R recv; + State state; + }; + + using sender_concept = ex::sender_t; + + constexpr GHbyNameResolveSender(NamedAddr const& name_address, + std::uint16_t port_hint, + Protocol protocol_hint) noexcept + : state_{ + .name_address = name_address, + .port_hint = port_hint, + .protocol_hint = protocol_hint, + } {} + + template + static consteval auto get_completion_signatures() noexcept { + return ex::completion_signatures), + ex::set_error_t(int)>{}; } - NamedAddr name_address_; - std::uint16_t port_; - Protocol protocol_; - std::vector resolved_addresses_; - std::atomic_bool is_result_{false}; - std::atomic_bool is_failed_{false}; -}; - -class GethostByNameDnsResolver { - public: - explicit GethostByNameDnsResolver(ActionContext action_context) - : action_context_{action_context} {} - - ActionPtr Query(NamedAddr const& name_address, - std::uint16_t port_hint, - Protocol protocol_hint) { - AE_TELE_DEBUG(kEspDnsQueryHost, "Querying host: {}", name_address); - auto resolve_action = ActionPtr{action_context_}; - auto query_action = ActionPtr{ - action_context_, name_address, port_hint, protocol_hint}; - - // connect actions - multi_subscription_.Push( - query_action->StatusEvent().Subscribe(ActionHandler{ - OnResult{[ra{resolve_action}](auto const& action) mutable { - ra->SetAddress(action.resolved_addresses()); - }}, - OnError{ - [ra{resolve_action}](auto const&) mutable { ra->Failed(); }}, - })); - - // make query - ip_addr_t cached_addr; - LOCK_TCPIP_CORE(); - auto res = dns_gethostbyname( - name_address.name.c_str(), &cached_addr, - [](const char* /* name */, const ip_addr_t* ipaddr, - void* callback_arg) { - auto* query_action = - static_cast(callback_arg); - query_action->ProcessQueryResult(ipaddr); - }, - &*query_action); - UNLOCK_TCPIP_CORE(); - - if (res == ERR_OK) { - query_action->ProcessQueryResult(&cached_addr); - } else if (res == ERR_ARG) { - AE_TELE_ERROR(kEspDnsQueryError, - "Dns client not initialized or invalid hostname"); - query_action->Failed(); - } - return resolve_action; + template + constexpr auto connect(R&& r) && noexcept { + return Operation{.recv = std::forward(r), .state = std::move(state_)}; } private: - ActionContext action_context_; - MultiSubscription multi_subscription_; + State state_; }; -Esp32DnsResolver::Esp32DnsResolver() = default; - -# ifdef AE_DISTILLATION Esp32DnsResolver::Esp32DnsResolver(ObjProp prop, ObjPtr aether) : DnsResolver{prop}, aether_{std::move(aether)} {} -# endif -Esp32DnsResolver::~Esp32DnsResolver() = default; - -ActionPtr Esp32DnsResolver::Resolve( - NamedAddr const& name_address, std::uint16_t port_hint, - Protocol protocol_hint) { - if (!gethostbyname_dns_resolver_) { - gethostbyname_dns_resolver_ = std::make_unique( - *aether_.Load().as()); - } - return gethostbyname_dns_resolver_->Query(name_address, port_hint, - protocol_hint); +ResolveSender Esp32DnsResolver::Resolve(NamedAddr const& name_address, + std::uint16_t port_hint, + Protocol protocol_hint) { + return GHbyNameResolveSender{name_address, port_hint, protocol_hint} | + ex::continues_on(ex::SchedulerOnTasks{*aether_.Load().as()}); } } // namespace ae diff --git a/aether/dns/esp32_dns_resolve.h b/aether/dns/esp32_dns_resolve.h index fd176f4e..db60c800 100644 --- a/aether/dns/esp32_dns_resolve.h +++ b/aether/dns/esp32_dns_resolve.h @@ -24,8 +24,6 @@ # if (defined(ESP_PLATFORM)) # define ESP32_DNS_RESOLVER_ENABLED 1 -# include - # include "aether/obj/obj.h" # include "aether/dns/dns_resolve.h" @@ -36,24 +34,18 @@ class GethostByNameDnsResolver; class Esp32DnsResolver : public DnsResolver { AE_OBJECT(Esp32DnsResolver, DnsResolver, 0) - Esp32DnsResolver(); + Esp32DnsResolver() = default; public: -# ifdef AE_DISTILLATION Esp32DnsResolver(ObjProp prop, ObjPtr aether); -# endif - - ~Esp32DnsResolver() override; AE_OBJECT_REFLECT(AE_MMBR(aether_)) - ActionPtr Resolve(NamedAddr const& name_address, - std::uint16_t port_hint, - Protocol protocol_hint) override; + ResolveSender Resolve(NamedAddr const& name_address, std::uint16_t port_hint, + Protocol protocol_hint) override; private: Obj::ptr aether_; - std::unique_ptr gethostbyname_dns_resolver_; }; } // namespace ae diff --git a/aether/executors/any_sender.h b/aether/executors/any_sender.h new file mode 100644 index 00000000..2bab3f26 --- /dev/null +++ b/aether/executors/any_sender.h @@ -0,0 +1,38 @@ +/* + * Copyright 2026 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef AETHER_EXECUTORS_ANY_SENDER_H_ +#define AETHER_EXECUTORS_ANY_SENDER_H_ + +#include + +#include "third_party/stdexec/include/exec/any_sender_of.hpp" + +namespace ae::ex { +// provide the list of signatures in form +// set_value_t(type...), set_error_t(type) +// !Note set_error_t must take one type, while set_value_t may not have any +template +using AnySender = typename experimental::execution::any_receiver_ref< + stdexec::completion_signatures + // ~['_']~ + >::template any_sender<>; +} // namespace ae::ex + +#endif // AETHER_EXECUTORS_ANY_SENDER_H_ diff --git a/aether/executors/async_wait.h b/aether/executors/async_wait.h new file mode 100644 index 00000000..79237941 --- /dev/null +++ b/aether/executors/async_wait.h @@ -0,0 +1,206 @@ +/* + * Copyright 2026 Aethernet Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef AETHER_EXECUTORS_ASYNC_WAIT_H_ +#define AETHER_EXECUTORS_ASYNC_WAIT_H_ + +#include + +#include +#include +#include +#include +#include + +#include "aether/types/result.h" +#include "aether/meta/type_list.h" +#include "aether/executors/scheduler_on_tasks.h" + +#include "third_party/stdexec/include/stdexec/execution.hpp" + +namespace ae::ex { + +namespace async_wait_internal { +template