Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitmodules
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,6 @@
[submodule "extern/CloudR"]
path = extern/CloudR
url = https://github.com/Algebraic-Programming/CloudR.git
[submodule "extern/cpp-httplib"]
path = extern/cpp-httplib
url = https://github.com/yhirose/cpp-httplib.git
2 changes: 1 addition & 1 deletion examples/basic/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ if get_option('buildTests')
'basic',
mpirunExecutable,
args: [
'-np', '5',
'-np', '1',
'--oversubscribe', e.full_path(),
meson.current_source_dir() + '/config.json',
],
Expand Down
1 change: 1 addition & 0 deletions extern/cpp-httplib
Submodule cpp-httplib added at 551f96
119 changes: 119 additions & 0 deletions include/hllm/realTimeAnalysis.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
#pragma once

#include "../../extern/cpp-httplib/httplib.h" // Lightweight HTTP server library
#include <atomic> // For atomic variables (thread-safe counters, flags)
#include <chrono> // For timing utilities (steady clock, seconds, etc.)
#include <mutex> // For std::mutex and std::lock_guard (thread safety)
#include <string> // For storing the json file
#include <iostream> // For debugging
#include <thread>

#include <hicr/core/exceptions.hpp>
#include <hicr/core/definitions.hpp>

namespace hLLM
{

/**
 * Sends a small JSON status report about one instance to the "/data" endpoint
 * of the server the given client is connected to.
 * (PROTOTYPE)
 *
 * The function is declared `inline` because it is defined in a header that is
 * included from other headers; a non-inline definition would be emitted in every
 * translation unit that includes it and break the link with duplicate symbols.
 *
 * @param InstanceId  Value written to the "instance ID" field of the report.
 * @param n_requests  Value written to the "number of requests per ms" field.
 * @param cli         HTTP client used to issue the POST request.
 */
inline void clientPost(const size_t& InstanceId, const int& n_requests, httplib::Client cli)
{
  // Preparing the data to be passed over. It should only consist of a part of the whole json file
  const std::string json_data =
    "{\n"
    "  \"Instance\": {\n"
    "    \"instance ID\": " + std::to_string(InstanceId) + ",\n"
    "    \"status\": \"active\",\n"
    "    \"number of requests per ms\": " + std::to_string(n_requests) + "\n"
    "  }\n"
    "}";

  // Send the report to the server
  auto res = cli.Post("/data", json_data, "application/json");

  // No result at all: the request never reached the server
  if (!res)
  {
    std::cerr << "Failed to connect to server.\n";
    return;
  }

  // The request went through, but the server did not answer with a 2xx status
  if (res->status < 200 || res->status >= 300)
    std::cerr << "Server rejected the posted data (HTTP status " << res->status << ").\n";
}

/**
 * Minimal HTTP endpoint for live monitoring.
 *
 * - POST "/data" stores the request body as the most recently received JSON document.
 * - GET  "/"     returns that stored document with content type "application/json".
 *
 * The constructor registers both handlers and starts the server on a dedicated
 * thread; the destructor stops the server and joins that thread.
 */
class RealTimeAnalysis
{
  public:

  /**
   * Registers the HTTP handlers and starts listening on a separate thread.
   *
   * @param ip    Address to listen on (default "0.0.0.0").
   * @param port  Port to listen on (default 5003).
   */
  RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5003) : _ip(ip), _port(port)
  {
    // POST handler: remember the body of the latest request
    _svr.Post("/data", [this](const httplib::Request &req, httplib::Response &) {
      // The lock guards _last_received_json, which the GET handler reads as well
      std::lock_guard<std::mutex> lock(_mtx);
      _last_received_json = req.body;

      // (Debugging)
      // Return an acknowledgment response to the client.
      // This shows that the server successfully processed the POST.
      // res.set_content("{\"status\":\"ok\"}", "application/json");
    });

    // GET handler: hand back the last captured JSON document
    _svr.Get("/", [this](const httplib::Request &, httplib::Response &res) {
      // Acquire the same mutex before reading shared state.
      std::lock_guard<std::mutex> lock(_mtx);

      // Send the stored document back as JSON
      res.set_content(_last_received_json, "application/json");
    });

    // Start listening from a separate thread
    _srv_thread = std::thread([this]() {
      // listen() takes the port as an int; make the narrowing from size_t explicit
      const bool ok = _svr.listen(_ip, static_cast<int>(_port));

      // A false result that was not caused by our own shutdown means the server
      // could not be started or died (e.g., the port is already in use).
      if (!ok && !_stop_requested.load())
        std::cerr << "[RealTimeAnalysis] HTTP server failed to listen on " << _ip << ":" << _port << "\n";

      _listen_done.store(true);
    });
  }

  /**
   * Stops the HTTP server and joins the server thread.
   */
  ~RealTimeAnalysis()
  {
    // NOTE(review): cpp-httplib's stop() appears to act only on a server that is
    // already running. If the object is destroyed right after construction, the
    // server thread may not have reached listen() yet; a stop() issued at that
    // point would be lost and join() would then block forever. Wait until the
    // server is running, or until listen() has already returned (e.g., bind failure).
    while (!_listen_done.load() && !_svr.is_running())
      std::this_thread::sleep_for(std::chrono::milliseconds(1));

    _stop_requested.store(true);
    _svr.stop(); // tell the server to stop listening
    if (_srv_thread.joinable())
      _srv_thread.join(); // clean shutdown
  }

  // The handlers and the server thread capture `this`, so the object must stay in place
  RealTimeAnalysis(const RealTimeAnalysis &)            = delete;
  RealTimeAnalysis &operator=(const RealTimeAnalysis &) = delete;

  private:

  /**
   * HTTP server instance
   */
  httplib::Server _svr;

  /**
   * Mutex to protect the access to the JSON file
   */
  std::mutex _mtx;

  /**
   * The received JSON file from any client
   */
  std::string _last_received_json;

  /**
   * The IP we launch our http application
   */
  std::string _ip;

  /**
   * The Port of the IP
   */
  size_t _port;

  /**
   * Set by the server thread once listen() has returned
   */
  std::atomic<bool> _listen_done{false};

  /**
   * Set by the destructor right before it stops the server
   */
  std::atomic<bool> _stop_requested{false};

  /**
   * The server thread to run the "listen" method
   */
  std::thread _srv_thread;
};

}
57 changes: 55 additions & 2 deletions include/hllm/roles/requestManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <memory>
#include <vector>
#include <chrono>

#include <hicr/core/exceptions.hpp>
#include <hicr/core/definitions.hpp>
#include <hicr/core/globalMemorySlot.hpp>
Expand All @@ -12,6 +14,11 @@
#include "../configuration/deployment.hpp"
#include "../messages/data.hpp"
#include "../session.hpp"
#include "../realTimeAnalysis.hpp"
#include "../../../extern/cpp-httplib/httplib.h"

using Clock = std::chrono::steady_clock; // Monotonic clock for precise timing
using Secs = std::chrono::seconds; // Convenience alias for seconds

namespace hLLM::roles
{
Expand All @@ -26,7 +33,7 @@ class RequestManager final : public hLLM::Role
RequestManager(
const configuration::Deployment deployment,
taskr::Runtime* const taskr
) : Role(deployment, taskr)
) : Role(deployment, taskr), _rTA("0.0.0.0", 5003), _cli("localhost", 5003), num_responses(0), prev_num_responses(0)
{
// Name of the prompt input
const auto& promptInputName = _deployment.getRequestManager()->getInput();
Expand Down Expand Up @@ -84,7 +91,10 @@ class RequestManager final : public hLLM::Role
_resultInputEdge = std::make_shared<edge::Input>(*edgeConfig, edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex);
// printf("[Request Manager] Result Input Edge: Type: %u, EdgeIdx: %lu, CP: %lu, PP: %lu, RI: %lu\n", edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex);
}
}
}

// set the previous time tracker to now.
prev_time = Clock::now();
}

// Gets the memory slots required by the edges
Expand Down Expand Up @@ -176,6 +186,38 @@ class RequestManager final : public hLLM::Role

// printf("Prompt Map Size: %lu\n", _activePromptMap.size());
// usleep(10000);

// Increasing the counter
// Compute the new average response per minute value
now_time = Clock::now();

auto time_diff_sec = std::chrono::duration<double>(now_time - prev_time).count();

const double resp_diff = double(++num_responses - prev_num_responses);

const double avg_res_per_minute = resp_diff/time_diff_sec * 60.0;

// Update the prev_time if it was longer than a second
if(time_diff_sec > 1.0)
{
prev_time = now_time;
prev_num_responses = num_responses;
}

std::string json_data =
"{\n"
" \"partition\": {\n"
" \"name\": \"partition1\",\n" // For now, fake partition value
" \"status\": \"active\",\n"
" \"num_responses\": " + std::to_string(num_responses) + ",\n"
" \"avg_responses_per_min\": " + std::to_string(avg_res_per_minute) + "\n"
" }\n"
"}";

auto res = _cli.Post("/data", json_data, "application/json");

// Error handling to check if the HTTP post was successful
if(!res) std::cerr << "Failed to connect to server.\n";
}

///////////// Prompt handling service
Expand Down Expand Up @@ -299,6 +341,17 @@ class RequestManager final : public hLLM::Role
// Active session map
std::map<sessionId_t, std::shared_ptr<Session>> _activeSessionMap;

// Initializing the realTimeAnalysis instance to start listening number of requests
RealTimeAnalysis _rTA;

// requestManager's own HTTP client as it will pass the number of requests per minute
httplib::Client _cli;

// Number of responses tracker
size_t num_responses, prev_num_responses;

// time tracker for the number of responses
Clock::time_point now_time, prev_time;

}; // class RequestManager

Expand Down
Loading