Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aether/access_points/wifi_access_point.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ ActionPtr<WifiConnectAction> WifiAccessPoint::Connect() {
assert(driver.has_value());

connect_action_ = ActionPtr<WifiConnectAction>{
*aether_.Load().as<Aether>(), **driver, wifi_ap_, psp_, base_station_};
ActionContext{*aether_.Load().as<Aether>()}, **driver, wifi_ap_, psp_, base_station_};
connect_sub_ = connect_action_->FinishedEvent().Subscribe(
[this]() { connect_action_.reset(); });
}
Expand Down
7 changes: 7 additions & 0 deletions aether/actions/action_ptr.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <type_traits>

#include "aether/common.h"
#include "aether/ae_context.h"
#include "aether/actions/action_context.h"

namespace ae {
Expand Down Expand Up @@ -50,6 +51,12 @@ class ActionPtr {
action_context.get_registry().PushBack(ptr_);
}

template <typename... TArgs>
explicit ActionPtr(AeContext ae_context, TArgs&&... args)
: ptr_{std::make_shared<TAction>(ae_context,
std::forward<TArgs>(args)...)} {
}

template <typename UAction, AE_REQUIRERS((std::is_base_of<TAction, UAction>))>
ActionPtr(ActionPtr<UAction> const& other) : ptr_(other.ptr_) {}

Expand Down
22 changes: 10 additions & 12 deletions aether/ae_actions/check_access_for_send_message.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,17 @@ CheckAccessForSendMessage::CheckAccessForSendMessage(
action_context,
AuthApiRequest{[this](ApiContext<AuthorizedApi>& auth_api, auto*,
auto* request) {
auto check_promise =
auth_api->check_access_for_send_message(destination_);
wait_check_sub_ =
check_promise->StatusEvent().Subscribe(ActionHandler{
OnResult{[&]() {
ResponseReceived();
request->Succeeded();
}},
OnError{[&]() {
ErrorReceived();
request->Failed();
}},
});
auth_api->check_access_for_send_message(destination_)
.Subscribe([&](auto const& res) {
if (res) {
ResponseReceived();
request->Succeeded();
} else {
ErrorReceived();
request->Failed();
}
});
}},
cloud_connection,
request_policy,
Expand Down
9 changes: 5 additions & 4 deletions aether/ae_actions/ping.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ void Ping::SendPing() {

auto write_action = client_server_connection_->AuthorizedApiCall(
SubApi{[this](ApiContext<AuthorizedApi>& auth_api) {
auto pong_promise = auth_api->ping(static_cast<std::uint64_t>(
auto& pong_promise = auth_api->ping(static_cast<std::uint64_t>(
std::chrono::duration_cast<std::chrono::milliseconds>(
ping_interval_)
.count()));
Expand All @@ -85,11 +85,12 @@ void Ping::SendPing() {
ping_requests_.push(PingRequest{
current_time,
current_time + expected_ping_time,
pong_promise->request_id(),
pong_promise.request_id(),
});
// Wait for response
wait_responses_.Push(pong_promise->StatusEvent().Subscribe(OnResult{
[&](auto const& promise) { PingResponse(promise.request_id()); }}));
wait_responses_.Push(
pong_promise.Subscribe([&, req_id{pong_promise.request_id()}](
auto&&) { PingResponse(req_id); }));
}});

write_subscription_ = write_action->StatusEvent().Subscribe(OnError{[this]() {
Expand Down
21 changes: 11 additions & 10 deletions aether/ae_actions/select_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ SelectClientAction::SelectClientAction(ActionContext action_context,
#if AE_SUPPORT_REGISTRATION // only if registration is supported
SelectClientAction::SelectClientAction(ActionContext action_context,
Aether& aether,
ActionPtr<Registration> registration,
Registration& registration,
std::string client_id)
: Action{action_context},
state_{State::kWaitRegistration},
client_id_{std::move(client_id)},
registration_{std::move(registration)} {
client_id_{std::move(client_id)} {
AE_TELED_DEBUG("Waiting for client registration");
registration_sub_ = registration_->StatusEvent().Subscribe(ActionHandler{
OnResult{[this, aeth{&aether}](auto& action) {
client_ = aeth->CreateClient(action.client_config(), client_id_);
state_ = State::kClientRegistered;
}},
OnError{[this]() { state_ = State::kClientRegistrationError; }},
});
registration_sub_ = registration.registration().Subscribe(
[this, aeth{&aether}](auto const& res) {
if (res) {
client_ = aeth->CreateClient(res.value(), client_id_);
state_ = State::kClientRegistered;
} else {
state_ = State::kClientRegistrationError;
}
});

state_.changed_event().Subscribe([this](auto) { Action::Trigger(); });
}
Expand Down
3 changes: 1 addition & 2 deletions aether/ae_actions/select_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ class SelectClientAction final : public Action<SelectClientAction> {
* \brief Wait for client registration or error.
*/
SelectClientAction(ActionContext action_context, Aether& aether,
ActionPtr<Registration> registration,
std::string client_id);
Registration& registration, std::string client_id);
#endif

/**
Expand Down
8 changes: 4 additions & 4 deletions aether/ae_actions/time_sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,16 +143,16 @@ class TimeSyncRequest : public Action<TimeSyncRequest> {
auto write_action =
cc->LoginApiCall(SubApi<LoginApi>{[this](auto& api) {
AE_TELED_DEBUG("Make time sync request");
ApiPromisePtr<std::uint64_t> promise = api->get_time_utc();
response_sub_ = promise->StatusEvent().Subscribe(
OnResult{[this, request_time{Now()}](auto& p) {
response_sub_ = api->get_time_utc().Subscribe(
[this, request_time{Now()}](auto const& p) {
assert(p.IsOk());
HandleResponse(
std::chrono::milliseconds{
static_cast<std::int64_t>(p.value())},
request_time, Now());
// time synced
state_ = State::kResult;
}});
});
}});
write_action_sub_ =
write_action->StatusEvent().Subscribe(OnError{[this]() {
Expand Down
20 changes: 9 additions & 11 deletions aether/aether.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -125,8 +125,7 @@ ActionPtr<SelectClientAction> Aether::SelectClient(
}
// register new client
#if AE_SUPPORT_REGISTRATION
auto registration = RegisterClient(parent_uid);
return MakeSelectClient(std::move(registration), client_id);
return MakeSelectClient(RegisterClient(client_id, parent_uid), client_id);
#else
return MakeSelectClient();
#endif
Expand Down Expand Up @@ -180,22 +179,21 @@ ActionPtr<SelectClientAction> Aether::MakeSelectClient(

#if AE_SUPPORT_REGISTRATION
ActionPtr<SelectClientAction> Aether::MakeSelectClient(
ActionPtr<Registration> registration, std::string const& client_id) {
auto select_action = ActionPtr<SelectClientAction>{
*action_processor, *this, std::move(registration), client_id};
Registration& registration, std::string const& client_id) {
auto select_action = ActionPtr<SelectClientAction>{*action_processor, *this,
registration, client_id};
select_client_actions_[client_id] = select_action;
return select_action;
}

ActionPtr<Registration> Aether::RegisterClient(Uid parent_uid) {
Registration& Aether::RegisterClient(std::string const& client_id,
Uid parent_uid) {
auto reg_cloud = registration_cloud.Load();
assert(reg_cloud && "Registration cloud not loaded");

// registration new client is long termed process
// after registration done, add it to clients list
// user also can get new client after
return ActionPtr<Registration>(*action_processor, *this, reg_cloud,
parent_uid);
auto& reg = registrations_[client_id] =
std::make_unique<Registration>(AeContext{*this}, reg_cloud, parent_uid);
return *reg;
}
#endif

Expand Down
7 changes: 4 additions & 3 deletions aether/aether.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,13 +120,14 @@ class Aether : public Obj {
ActionPtr<SelectClientAction> MakeSelectClient(
ObjPtr<Client> const& client) const;
#if AE_SUPPORT_REGISTRATION
ActionPtr<SelectClientAction> MakeSelectClient(
ActionPtr<Registration> registration, std::string const& client_id);
ActionPtr<SelectClientAction> MakeSelectClient(Registration& registration,
std::string const& client_id);

public:
ActionPtr<Registration> RegisterClient(Uid parent_uid);
Registration& RegisterClient(std::string const& client_id, Uid parent_uid);

private:
std::map<std::string, std::unique_ptr<Registration>> registrations_;
#endif

void MakeTimeSyncAction(ObjPtr<Client> const& client);
Expand Down
46 changes: 23 additions & 23 deletions aether/api_protocol/api_method.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@
#ifndef AETHER_API_PROTOCOL_API_METHOD_H_
#define AETHER_API_PROTOCOL_API_METHOD_H_

#include "aether/actions/action_context.h"
#include "aether/api_protocol/api_message.h"
#include "aether/api_protocol/api_context.h"
#include "aether/api_protocol/api_pack_parser.h"
#include "aether/api_protocol/protocol_context.h"
#include "aether/api_protocol/api_promise_action.h"
#include "aether/api_protocol/api_promise.h"

namespace ae {
class DefaultArgProc {
Expand Down Expand Up @@ -74,39 +73,40 @@ struct Method<MessageCode, void(Args...), ArgProc> {
* return PromiseView<R> for waiting the result or error.
*/
template <MessageId MessageCode, typename R, typename... Args, typename ArgProc>
struct Method<MessageCode, ApiPromisePtr<R>(Args...), ArgProc> {
explicit Method(ProtocolContext& protocol_context,
ActionContext action_context, ArgProc arg_proc = {})
: protocol_context_{&protocol_context},
action_context_{std::move(action_context)},
arg_proc_{std::move(arg_proc)} {}
struct Method<MessageCode, ApiPromise<R>(Args...), ArgProc> {
explicit Method(ProtocolContext& protocol_context, ArgProc arg_proc = {})
: protocol_context_{&protocol_context}, arg_proc_{std::move(arg_proc)} {}

ApiPromisePtr<R> operator()(Args... args) {
ApiPromise<R>& operator()(Args... args) {
auto request_id = RequestId::GenRequestId();
auto* packet_stack = protocol_context_->packet_stack();
assert(packet_stack);
packet_stack->Push(*this,
arg_proc_(request_id, std::forward<Args>(args)...));

auto promise_ptr = ApiPromisePtr<R>{action_context_, request_id};
api_promise_.emplace(request_id);

if constexpr (!std::is_same_v<void, R>) {
protocol_context_->AddSendResultCallback(
request_id,
[p_ptr{promise_ptr}, context{protocol_context_}]() mutable {
p_ptr->SetValue(context->parser()->template Extract<R>());
});
protocol_context_->AddSendResultCallback(request_id, [this]() {
assert(api_promise_);
api_promise_->SetValue(
protocol_context_->parser()->template Extract<R>());
api_promise_.reset();
});
} else {
protocol_context_->AddSendResultCallback(
request_id, [p_ptr{promise_ptr}]() mutable { p_ptr->SetValue(); });
protocol_context_->AddSendResultCallback(request_id, [this]() {
assert(api_promise_);
api_promise_->SetValue();
api_promise_.reset();
});
}
protocol_context_->AddSendErrorCallback(
request_id, [p_ptr{promise_ptr}](auto, auto) mutable {
assert(p_ptr);
p_ptr->Reject();
request_id, [this](auto, std::uint32_t err) {
assert(api_promise_);
api_promise_->SetError(std::move(err));
api_promise_.reset();
});

return promise_ptr;
return *api_promise_;
}

template <typename... Ts>
Expand All @@ -116,7 +116,7 @@ struct Method<MessageCode, ApiPromisePtr<R>(Args...), ArgProc> {

private:
ProtocolContext* protocol_context_;
ActionContext action_context_;
std::optional<ApiPromise<R>> api_promise_;
ArgProc arg_proc_;
};

Expand Down
92 changes: 92 additions & 0 deletions aether/api_protocol/api_promise.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
/*
* Copyright 2025 Aethernet Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#ifndef AETHER_API_PROTOCOL_API_PROMISE_H_
#define AETHER_API_PROTOCOL_API_PROMISE_H_

#include <cstdint>

#include "aether/types/result.h"
#include "aether/events/events.h"
#include "aether/api_protocol/request_id.h"

namespace ae {
namespace api_promise_action_internal {
struct Empty {};
}; // namespace api_promise_action_internal

template <typename Value, typename Error = std::uint32_t>
class ApiPromise {
public:
using value_type = Value;
using error_type = Error;
using result_type = Result<value_type, error_type>;

constexpr explicit ApiPromise(RequestId req_id) noexcept
: request_id_{req_id} {}

constexpr void SetValue(value_type&& val) noexcept {
event_.Emit(result_type{Ok{std::move(val)}});
}

constexpr void SetError(error_type&& err) noexcept {
event_.Emit(result_type{Error{std::move(err)}});
}

constexpr RequestId request_id() const { return request_id_; }

template <typename Fn>
constexpr decltype(auto) Subscribe(Fn&& fn) noexcept {
return EventSubscriber{event_}.Subscribe(std::forward<Fn>(fn));
}

private:
RequestId request_id_;
Event<void(result_type&& res)> event_;
};

template <typename Error>
class ApiPromise<void, Error> {
public:
using value_type = api_promise_action_internal::Empty;
using error_type = Error;
using result_type = Result<value_type, error_type>;

constexpr explicit ApiPromise(RequestId req_id) noexcept
: request_id_{req_id} {}

constexpr void SetValue() noexcept {
event_.Emit(result_type{api_promise_action_internal::Empty{}});
}

constexpr void SetError(error_type&& err) noexcept {
event_.Emit(result_type{Error{std::move(err)}});
}

constexpr RequestId request_id() const { return request_id_; }

template <typename Fn>
[[nodiscard]] constexpr decltype(auto) Subscribe(Fn&& fn) noexcept {
return EventSubscriber{event_}.Subscribe(std::forward<Fn>(fn));
}

private:
RequestId request_id_;
Event<void(result_type&& res)> event_;
};

} // namespace ae
#endif // AETHER_API_PROTOCOL_API_PROMISE_H_
Loading
Loading