Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
d3b88ed
add tasks
BartolomeyKant Mar 10, 2026
a5c14d9
add test tasks
BartolomeyKant Mar 10, 2026
60193a3
remove task_queue
BartolomeyKant Mar 11, 2026
1c1363c
add ae_context and task scheduler to aether
BartolomeyKant Mar 11, 2026
6cd64d2
make ToAeContext const
BartolomeyKant Mar 16, 2026
57eb89a
TEMPORARY add call to task scheduler and action processor together
BartolomeyKant Mar 16, 2026
9d3019e
add TypeLisToTemplate_t
BartolomeyKant Mar 16, 2026
586942e
add iterator type
BartolomeyKant Mar 19, 2026
cf0a04f
add executors
BartolomeyKant Mar 16, 2026
20b6199
add use executors in channel connection
BartolomeyKant Mar 16, 2026
97e6a12
change to default == operator
BartolomeyKant Mar 19, 2026
5d73f00
make disable install temporary
BartolomeyKant Mar 23, 2026
b531b4a
Merge pull request #444 from aethernetio/440-try-nvidias-stdexec
BartolomeyKant Mar 23, 2026
91171b8
fix lint issues or result, add bool operator, fix const get value
BartolomeyKant Mar 23, 2026
0ef13dc
change api promise to simple object with SetValue and Subscribe methods
BartolomeyKant Mar 23, 2026
5783d83
fix use new api promise
BartolomeyKant Mar 23, 2026
1b3202d
temporary make create action both with ae_context and action_context
BartolomeyKant Mar 25, 2026
1dd3c3d
add override for override func pattern and ignore_t as an empty type
BartolomeyKant Mar 25, 2026
612e255
add with timer and make fixes to executors
BartolomeyKant Mar 25, 2026
2dd57af
add make api call sender
BartolomeyKant Mar 25, 2026
6351221
remake registration with executors
BartolomeyKant Mar 25, 2026
bcb0057
Merge pull request #446 from aethernetio/445-registrastion-on-new-asy…
BartolomeyKant Mar 26, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -60,3 +60,7 @@
path = third_party/gcem
url = https://github.com/aethernetio/gcem.git
branch = master
[submodule "third_party/stdexec"]
path = third_party/stdexec
url = https://github.com/aethernetio/stdexec.git
branch = main
33 changes: 19 additions & 14 deletions aether/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ list(APPEND common_dependencies
"../third_party/gcem"
"../third_party/etl"
"../third_party/aethernet-numeric"
"../third_party/stdexec"
)
set(STDEXEC_BUILD_EXAMPLES Off)

# for etl
set(GIT_DIR_LOOKUP_POLICY ALLOW_LOOKING_ABOVE_CMAKE_SOURCE_DIR)
Expand Down Expand Up @@ -109,7 +111,6 @@ list(APPEND actions_srcs
"actions/action_registry.cpp"
"actions/action_processor.cpp"
"actions/timer_action.cpp"
"actions/task_queue.cpp"
"actions/repeatable_task.cpp"
)

Expand Down Expand Up @@ -360,7 +361,8 @@ if(REGULAR_CMAKE_PROJECT)
sodium
hydrogen
gcem
etl)
etl
stdexec)
target_link_libraries(${PROJECT_NAME} PRIVATE c-ares )

set(TARGET_NAME "${PROJECT_NAME}")
Expand Down Expand Up @@ -417,7 +419,8 @@ else()
sodium
hydrogen
gcem
etl)
etl
stdexec)
else()
#ERROR
message(SEND_ERROR "You must specify the CMAKE version!")
Expand Down Expand Up @@ -539,10 +542,12 @@ elseif (CMAKE_CXX_COMPILER_ID STREQUAL "MSVC")
PRIVATE
/W4
/WX
/w15262 #implisitfallthrough
/wd4388 #Wno-sign-compare
/wd4389 #Wno-sign-compare
/wd4244
/w15262 #implisitfallthrough
PUBLIC
/wd4100 /wd4101 /wd4127 /wd4244 /wd4324
/wd4456 /wd4459 /wd4714
)
target_compile_options(${TARGET_NAME} PUBLIC /Zc:preprocessor)
endif()
Expand All @@ -559,15 +564,15 @@ endif()

if(REGULAR_CMAKE_PROJECT AND NOT AE_BUILD_TESTS)
## Target installation
install(TARGETS ${TARGET_NAME}
EXPORT ${TARGET_NAME}Targets
ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${TARGET_NAME}
COMPONENT library)
# install(TARGETS ${TARGET_NAME}
# EXPORT ${TARGET_NAME}Targets
# ARCHIVE DESTINATION ${CMAKE_INSTALL_LIBDIR}
# LIBRARY DESTINATION ${CMAKE_INSTALL_LIBDIR}
# PUBLIC_HEADER DESTINATION ${CMAKE_INSTALL_INCLUDEDIR}/${TARGET_NAME}
# COMPONENT library)

## Target's cmake files: targets export
install(EXPORT ${TARGET_NAME}Targets
NAMESPACE ${TARGET_NAME}::
DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${TARGET_NAME})
#install(EXPORT ${TARGET_NAME}Targets
# NAMESPACE ${TARGET_NAME}::
# DESTINATION ${CMAKE_INSTALL_LIBDIR}/cmake/${TARGET_NAME})
endif()
2 changes: 1 addition & 1 deletion aether/access_points/wifi_access_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ActionPtr<WifiConnectAction> WifiAccessPoint::Connect() {
assert(driver.has_value());

connect_action_ = ActionPtr<WifiConnectAction>{
*aether_.Load().as<Aether>(), **driver, wifi_ap_, psp_, base_station_};
ActionContext{*aether_.Load().as<Aether>()}, **driver, wifi_ap_, psp_, base_station_};
connect_sub_ = connect_action_->FinishedEvent().Subscribe(
[this]() { connect_action_.reset(); });
}
Expand Down
7 changes: 7 additions & 0 deletions aether/actions/action_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <type_traits>

#include "aether/common.h"
#include "aether/ae_context.h"
#include "aether/actions/action_context.h"

namespace ae {
Expand Down Expand Up @@ -50,6 +51,12 @@ class ActionPtr {
action_context.get_registry().PushBack(ptr_);
}

template <typename... TArgs>
explicit ActionPtr(AeContext ae_context, TArgs&&... args)
: ptr_{std::make_shared<TAction>(ae_context,
std::forward<TArgs>(args)...)} {
}

template <typename UAction, AE_REQUIRERS((std::is_base_of<TAction, UAction>))>
ActionPtr(ActionPtr<UAction> const& other) : ptr_(other.ptr_) {}

Expand Down
22 changes: 10 additions & 12 deletions aether/ae_actions/check_access_for_send_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ CheckAccessForSendMessage::CheckAccessForSendMessage(
action_context,
AuthApiRequest{[this](ApiContext<AuthorizedApi>& auth_api, auto*,
auto* request) {
auto check_promise =
auth_api->check_access_for_send_message(destination_);
wait_check_sub_ =
check_promise->StatusEvent().Subscribe(ActionHandler{
OnResult{[&]() {
ResponseReceived();
request->Succeeded();
}},
OnError{[&]() {
ErrorReceived();
request->Failed();
}},
});
auth_api->check_access_for_send_message(destination_)
.Subscribe([&](auto const& res) {
if (res) {
ResponseReceived();
request->Succeeded();
} else {
ErrorReceived();
request->Failed();
}
});
}},
cloud_connection,
request_policy,
Expand Down
9 changes: 5 additions & 4 deletions aether/ae_actions/ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void Ping::SendPing() {

auto write_action = client_server_connection_->AuthorizedApiCall(
SubApi{[this](ApiContext<AuthorizedApi>& auth_api) {
auto pong_promise = auth_api->ping(static_cast<std::uint64_t>(
auto& pong_promise = auth_api->ping(static_cast<std::uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
ping_interval_)
.count()));
Expand All @@ -85,11 +85,12 @@ void Ping::SendPing() {
ping_requests_.push(PingRequest{
current_time,
current_time + expected_ping_time,
pong_promise->request_id(),
pong_promise.request_id(),
});
// Wait for response
wait_responses_.Push(pong_promise->StatusEvent().Subscribe(OnResult{
[&](auto const& promise) { PingResponse(promise.request_id()); }}));
wait_responses_.Push(
pong_promise.Subscribe([&, req_id{pong_promise.request_id()}](
auto&&) { PingResponse(req_id); }));
}});

write_subscription_ = write_action->StatusEvent().Subscribe(OnError{[this]() {
Expand Down
21 changes: 11 additions & 10 deletions aether/ae_actions/select_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ SelectClientAction::SelectClientAction(ActionContext action_context,
#if AE_SUPPORT_REGISTRATION // only if registration is supported
SelectClientAction::SelectClientAction(ActionContext action_context,
Aether& aether,
ActionPtr<Registration> registration,
Registration& registration,
std::string client_id)
: Action{action_context},
state_{State::kWaitRegistration},
client_id_{std::move(client_id)},
registration_{std::move(registration)} {
client_id_{std::move(client_id)} {
AE_TELED_DEBUG("Waiting for client registration");
registration_sub_ = registration_->StatusEvent().Subscribe(ActionHandler{
OnResult{[this, aeth{&aether}](auto& action) {
client_ = aeth->CreateClient(action.client_config(), client_id_);
state_ = State::kClientRegistered;
}},
OnError{[this]() { state_ = State::kClientRegistrationError; }},
});
registration_sub_ = registration.registration().Subscribe(
[this, aeth{&aether}](auto const& res) {
if (res) {
client_ = aeth->CreateClient(res.value(), client_id_);
state_ = State::kClientRegistered;
} else {
state_ = State::kClientRegistrationError;
}
});

state_.changed_event().Subscribe([this](auto) { Action::Trigger(); });
}
Expand Down
3 changes: 1 addition & 2 deletions aether/ae_actions/select_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class SelectClientAction final : public Action<SelectClientAction> {
* \brief Wait for client registration or error.
*/
SelectClientAction(ActionContext action_context, Aether& aether,
ActionPtr<Registration> registration,
std::string client_id);
Registration& registration, std::string client_id);
#endif

/**
Expand Down
8 changes: 4 additions & 4 deletions aether/ae_actions/time_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ class TimeSyncRequest : public Action<TimeSyncRequest> {
auto write_action =
cc->LoginApiCall(SubApi<LoginApi>{[this](auto& api) {
AE_TELED_DEBUG("Make time sync request");
ApiPromisePtr<std::uint64_t> promise = api->get_time_utc();
response_sub_ = promise->StatusEvent().Subscribe(
OnResult{[this, request_time{Now()}](auto& p) {
response_sub_ = api->get_time_utc().Subscribe(
[this, request_time{Now()}](auto const& p) {
assert(p.IsOk());
HandleResponse(
std::chrono::milliseconds{
static_cast<std::int64_t>(p.value())},
request_time, Now());
// time synced
state_ = State::kResult;
}});
});
}});
write_action_sub_ =
write_action->StatusEvent().Subscribe(OnError{[this]() {
Expand Down
66 changes: 66 additions & 0 deletions aether/ae_context.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright 2026 Aethernet Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef AETHER_AE_CONTEXT_H_
#define AETHER_AE_CONTEXT_H_

#include <concepts>

#include "aether/config.h"

#include "aether/tasks/manual_task_scheduler.h"

namespace ae {
class Aether;
using TaskScheduler = ManualTaskScheduler<
TaskManagerConf<AE_TASK_MAX_COUNT, AE_TASK_MAX_SIZE, AE_TASK_ALIGN>>;

struct AeCtxTable {
Aether& (*aether_getter)(void* obj);
TaskScheduler& (*scheduler_getter)(void* obj);
};

struct AeCtx {
bool operator==(AeCtx const&) const = default;

void* obj;
AeCtxTable const* vtable;
};

template <typename T>
concept AeContextual = requires(T const& t) {
{ t.ToAeContext() } -> std::same_as<AeCtx>;
};

class AeContext {
public:
template <AeContextual T>
constexpr AeContext(T const& obj) // NOLINT(*explicit-constructor)
: ctx_{obj.ToAeContext()} {}

Aether& aether() const { return ctx_.vtable->aether_getter(ctx_.obj); }
TaskScheduler& scheduler() const {
return ctx_.vtable->scheduler_getter(ctx_.obj);
}

bool operator==(AeContext const&) const = default;

private:
AeCtx ctx_;
};
} // namespace ae

#endif // AETHER_AE_CONTEXT_H_
42 changes: 28 additions & 14 deletions aether/aether.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,39 @@

namespace ae {

Aether::Aether() : action_processor{make_unique<ActionProcessor>()} {}
Aether::Aether()
: action_processor{make_unique<ActionProcessor>()},
task_scheduler{make_unique<TaskScheduler>()} {}

Aether::Aether(ObjProp prop)
: Obj{prop}, action_processor{make_unique<ActionProcessor>()} {
: Obj{prop},
action_processor{make_unique<ActionProcessor>()},
task_scheduler{make_unique<TaskScheduler>()} {
AE_TELE_DEBUG(AetherCreated);
}

Aether::~Aether() { AE_TELE_DEBUG(AetherDestroyed); }

void Aether::Update(TimePoint current_time) {
update_time = action_processor->Update(current_time);
task_scheduler->Update(current_time);
action_processor->Update(current_time);
update_time = current_time + 100ms;
}

Aether::operator ActionContext() const {
return ActionContext{*action_processor};
}

AeCtx Aether::ToAeContext() const {
static constexpr AeCtxTable ae_table{
[](void* obj) -> Aether& { return *static_cast<Aether*>(obj); },
[](void* obj) -> TaskScheduler& {
return *static_cast<Aether*>(obj)->task_scheduler;
},
};
return AeCtx{const_cast<Aether*>(this), &ae_table}; // NOLINT(*const-cast)
}

Client::ptr Aether::CreateClient(ClientConfig const& config,
std::string const& client_id) {
auto client = FindClient(client_id);
Expand Down Expand Up @@ -109,8 +125,7 @@ ActionPtr<SelectClientAction> Aether::SelectClient(
}
// register new client
#if AE_SUPPORT_REGISTRATION
auto registration = RegisterClient(parent_uid);
return MakeSelectClient(std::move(registration), client_id);
return MakeSelectClient(RegisterClient(client_id, parent_uid), client_id);
#else
return MakeSelectClient();
#endif
Expand Down Expand Up @@ -164,22 +179,21 @@ ActionPtr<SelectClientAction> Aether::MakeSelectClient(

#if AE_SUPPORT_REGISTRATION
ActionPtr<SelectClientAction> Aether::MakeSelectClient(
ActionPtr<Registration> registration, std::string const& client_id) {
auto select_action = ActionPtr<SelectClientAction>{
*action_processor, *this, std::move(registration), client_id};
Registration& registration, std::string const& client_id) {
auto select_action = ActionPtr<SelectClientAction>{*action_processor, *this,
registration, client_id};
select_client_actions_[client_id] = select_action;
return select_action;
}

ActionPtr<Registration> Aether::RegisterClient(Uid parent_uid) {
Registration& Aether::RegisterClient(std::string const& client_id,
Uid parent_uid) {
auto reg_cloud = registration_cloud.Load();
assert(reg_cloud && "Registration cloud not loaded");

// registration new client is long termed process
// after registration done, add it to clients list
// user also can get new client after
return ActionPtr<Registration>(*action_processor, *this, reg_cloud,
parent_uid);
auto& reg = registrations_[client_id] =
std::make_unique<Registration>(AeContext{*this}, reg_cloud, parent_uid);
return *reg;
}
#endif

Expand Down
Loading
Loading