Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions apps/benchmark/src/benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
#include "rtbot/Output.h"
#include "rtbot/Program.h"
#include "rtbot/bindings.h"
#include "rtbot/std/MathSyncBinaryOp.h"
#include "rtbot/std/ArithmeticSync.h"
#include "rtbot/std/MovingAverage.h"
#include "rtbot/std/PeakDetector.h"
#include "tools.h"
Expand Down Expand Up @@ -157,7 +157,7 @@ class PPGPipelineBenchmark {
p.input = std::make_shared<Input>("i1", std::vector<std::string>{PortType::NUMBER});
p.ma_short = std::make_shared<MovingAverage>("ma1", short_window_);
p.ma_long = std::make_shared<MovingAverage>("ma2", long_window_);
p.minus = std::make_shared<Subtraction>("diff");
p.minus = std::make_shared<Subtraction>("diff", 2);
p.peak = std::make_shared<PeakDetector>("peak", 2 * short_window_ + 1);
p.join = std::make_shared<Join>("join", std::vector<std::string>{PortType::NUMBER, PortType::NUMBER});
p.output = std::make_shared<Output>("o1", std::vector<std::string>{PortType::NUMBER});
Expand Down Expand Up @@ -252,8 +252,8 @@ class BollingerBandsPureBenchmark {
p.ma = std::make_shared<MovingAverage>("ma", 14);
p.sd = std::make_shared<StandardDeviation>("sd", 14);
p.scale = std::make_shared<Scale>("scale", 2.0);
p.upper = std::make_shared<Addition>("upper");
p.lower = std::make_shared<Subtraction>("lower");
p.upper = std::make_shared<Addition>("upper", 2);
p.lower = std::make_shared<Subtraction>("lower", 2);
p.output = std::make_shared<Output>("output",
std::vector<std::string>{PortType::NUMBER, PortType::NUMBER, PortType::NUMBER});

Expand Down
20 changes: 14 additions & 6 deletions libs/core/include/rtbot/Buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,13 @@ class Buffer : public Operator {
Bytes msg_bytes(it, it + msg_size);

// Deserialize message and cast to derived type
buffer_.push_back(
std::unique_ptr<Message<T>>(
dynamic_cast<Message<T>*>(BaseMessage::deserialize(msg_bytes).release())
)
);
auto base_msg = BaseMessage::deserialize(msg_bytes);
auto* typed_msg = dynamic_cast<Message<T>*>(base_msg.get());
if (!typed_msg) {
throw std::runtime_error("Failed to cast message during Buffer restore");
}
base_msg.release(); // Safe: cast validated above
buffer_.push_back(std::unique_ptr<Message<T>>(typed_msg));

it += msg_size;
}
Expand Down Expand Up @@ -199,7 +201,13 @@ class Buffer : public Operator {
}

// Add new message to buffer
buffer_.push_back(std::unique_ptr<Message<T>>(dynamic_cast<Message<T>*>(input_queue.front()->clone().release())));
auto cloned = input_queue.front()->clone();
auto* typed_clone = dynamic_cast<Message<T>*>(cloned.get());
if (!typed_clone) {
throw std::runtime_error("Failed to cast cloned message in Buffer");
}
cloned.release(); // Safe: cast validated above
buffer_.push_back(std::unique_ptr<Message<T>>(typed_clone));

// Update statistics with added and removed values
update_statistics(msg->data.value, removed_value);
Expand Down
3 changes: 2 additions & 1 deletion libs/core/include/rtbot/Message.h
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,8 @@ inline std::unique_ptr<Message<T>> BaseMessage::deserialize_as(const Bytes& byte
if (!typed_msg) {
throw std::runtime_error("Failed to cast message to requested type");
}
return std::unique_ptr<Message<T>>(static_cast<Message<T>*>(base_msg.release()));
base_msg.release(); // Safe: cast validated above
return std::unique_ptr<Message<T>>(typed_msg);
}

// Helper functions
Expand Down
39 changes: 29 additions & 10 deletions libs/core/include/rtbot/Operator.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,18 @@ struct PortInfo {
MessageQueue queue;
std::type_index type;
timestamp_t last_timestamp{std::numeric_limits<timestamp_t>::min()};

// Constructor
PortInfo(MessageQueue q, std::type_index t)
: queue(std::move(q)), type(t) {}

// Delete copy operations (MessageQueue contains unique_ptr)
PortInfo(const PortInfo&) = delete;
PortInfo& operator=(const PortInfo&) = delete;

// Explicitly default move operations with noexcept for vector reallocation
PortInfo(PortInfo&&) noexcept = default;
PortInfo& operator=(PortInfo&&) noexcept = default;
};

enum class PortKind { DATA, CONTROL };
Expand Down Expand Up @@ -510,7 +522,12 @@ class Operator {
}

// Send the messages to the connected operators
for (auto& conn : connections_) {
for (auto& conn : connections_) {
auto child = conn.child.lock(); // Lock weak_ptr to get shared_ptr
if (!child) {
continue; // Child has been destroyed
}

auto& output_queue = output_ports_[conn.output_port].queue;
if (output_queue.empty()) {
continue;
Expand All @@ -519,17 +536,17 @@ class Operator {
for (size_t i = 0; i < output_queue.size(); i++) {
auto msg_copy = output_queue[i]->clone();
#ifdef RTBOT_INSTRUMENTATION
RTBOT_RECORD_MESSAGE_SENT(id_, type_name(), std::to_string(i), conn.child->id(), conn.child->type_name(),
RTBOT_RECORD_MESSAGE_SENT(id_, type_name(), std::to_string(i), child->id(), child->type_name(),
std::to_string(conn.child_input_port),
conn.child_port_kind == PortKind::DATA ? "" : "[c]", output_queue[i]->clone());
#endif
// Route message based on connection port kind
if (conn.child_port_kind == PortKind::DATA) {
conn.child->receive_data(std::move(msg_copy), conn.child_input_port);
child->receive_data(std::move(msg_copy), conn.child_input_port);
} else {
conn.child->receive_control(std::move(msg_copy), conn.child_input_port);
child->receive_control(std::move(msg_copy), conn.child_input_port);
}
propagated_outputs.insert(conn.output_port);
propagated_outputs.insert(conn.output_port);
}
}

Expand All @@ -550,16 +567,18 @@ class Operator {


// Then execute connected operators
for (auto& conn : connections_) {
if (conn.child != nullptr && propagated_outputs.find(conn.output_port) != propagated_outputs.end())
conn.child->execute(debug);
for (auto& conn : connections_) {
auto child = conn.child.lock();
if (child && propagated_outputs.find(conn.output_port) != propagated_outputs.end()) {
child->execute(debug);
}
}

}
struct Connection {
std::shared_ptr<Operator> child;
std::weak_ptr<Operator> child; // Use weak_ptr to avoid circular references
size_t output_port;
size_t child_input_port;
size_t child_input_port;
PortKind child_port_kind{PortKind::DATA};
};

Expand Down
138 changes: 81 additions & 57 deletions libs/finance/include/rtbot/finance/RelativeStrengthIndex.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,92 +3,116 @@

#include <cmath>
#include <cstdint>
#include <memory>
#include <vector>

#include "rtbot/Buffer.h"
#include "rtbot/Message.h"
#include "rtbot/Operator.h"
#include "rtbot/PortType.h"

namespace rtbot {

template <class T, class V>
struct RelativeStrengthIndex : public Operator<T, V> {
RelativeStrengthIndex() = default;
// RSI needs to track sum for computing averages
struct RSIFeatures {
static constexpr bool TRACK_SUM = true;
static constexpr bool TRACK_VARIANCE = false;
};

RelativeStrengthIndex(string const& id, size_t n) : Operator<T, V>(id), initialized(false) {
this->addDataInput("i1", n + 1);
this->addOutput("o1");
class RelativeStrengthIndex : public Buffer<NumberData, RSIFeatures> {
public:
RelativeStrengthIndex(std::string id, size_t n)
: Buffer<NumberData, RSIFeatures>(std::move(id), n + 1),
initialized_(false),
average_gain_(0.0),
average_loss_(0.0),
prev_average_gain_(0.0),
prev_average_loss_(0.0) {}

std::string type_name() const override { return "RelativeStrengthIndex"; }

void reset() override {
Buffer<NumberData, RSIFeatures>::reset();
initialized_ = false;
average_gain_ = 0.0;
average_loss_ = 0.0;
prev_average_gain_ = 0.0;
prev_average_loss_ = 0.0;
}

string typeName() const override { return "RelativeStrengthIndex"; }

OperatorMessage<T, V> processData() override {
string inputPort;
auto in = this->getDataInputs();
if (in.size() == 1)
inputPort = in.at(0);
else
throw runtime_error(typeName() + " : more than 1 input port found");
Message<T, V> out;
size_t n = this->getDataInputSize(inputPort);
V diff, rs, rsi, gain, loss;

if (!initialized) {
averageGain = 0;
averageLoss = 0;
protected:
std::vector<std::unique_ptr<Message<NumberData>>> process_message(const Message<NumberData>* msg) override {
// Only compute RSI when buffer is full
if (!buffer_full()) {
return {};
}

size_t n = buffer_size();
double diff, rs, rsi, gain, loss;

if (!initialized_) {
average_gain_ = 0.0;
average_loss_ = 0.0;

// Calculate initial average gain/loss from buffer
for (size_t i = 1; i < n; i++) {
diff = this->getDataInputMessage(inputPort, i).value - this->getDataInputMessage(inputPort, i - 1).value;
if (diff > 0)
averageGain = averageGain + diff;
else if (diff < 0)
averageLoss = averageLoss - diff;
diff = buffer()[i]->data.value - buffer()[i - 1]->data.value;
if (diff > 0) {
average_gain_ += diff;
} else if (diff < 0) {
average_loss_ -= diff; // Make positive
}
}
averageGain = averageGain / (n - 1);
averageLoss = averageLoss / (n - 1);
average_gain_ /= (n - 1);
average_loss_ /= (n - 1);

initialized = true;
initialized_ = true;
} else {
diff = this->getDataInputMessage(inputPort, n - 1).value - this->getDataInputMessage(inputPort, n - 2).value;
// Use smoothed average for subsequent calculations
diff = buffer()[n - 1]->data.value - buffer()[n - 2]->data.value;
if (diff > 0) {
gain = diff;
loss = 0;
loss = 0.0;
} else if (diff < 0) {
loss = -diff;
gain = 0;
gain = 0.0;
} else {
loss = 0;
gain = 0;
loss = 0.0;
gain = 0.0;
}
averageGain = (prevAverageGain * (n - 2) + gain) / (n - 1);
averageLoss = (prevAverageLoss * (n - 2) + loss) / (n - 1);
average_gain_ = (prev_average_gain_ * (n - 2) + gain) / (n - 1);
average_loss_ = (prev_average_loss_ * (n - 2) + loss) / (n - 1);
}

prevAverageGain = averageGain;
prevAverageLoss = averageLoss;
prev_average_gain_ = average_gain_;
prev_average_loss_ = average_loss_;

if (averageLoss > 0) {
rs = averageGain / averageLoss;

rsi = 100.0 - (100.0 / (1 + rs));
} else
// Calculate RSI
if (average_loss_ > 0) {
rs = average_gain_ / average_loss_;
rsi = 100.0 - (100.0 / (1.0 + rs));
} else {
rsi = 100.0;
out.value = rsi;

out.time = this->getDataInputLastMessage(inputPort).time;
}

OperatorMessage<T, V> outputMsgs;
PortMessage<T, V> v;
v.push_back(out);
outputMsgs.emplace("o1", v);
return outputMsgs;
// Create output message
std::vector<std::unique_ptr<Message<NumberData>>> result;
result.push_back(create_message<NumberData>(msg->time, NumberData{rsi}));
return result;
}

private:
V averageGain;
V averageLoss;
V prevAverageGain;
V prevAverageLoss;
bool initialized = false;
bool initialized_;
double average_gain_;
double average_loss_;
double prev_average_gain_;
double prev_average_loss_;
};

inline std::shared_ptr<RelativeStrengthIndex> make_relative_strength_index(std::string id, size_t n) {
return std::make_shared<RelativeStrengthIndex>(std::move(id), n);
}

} // namespace rtbot

#endif // RELATIVESTRENGTHINDEX_H
Loading
Loading