diff --git a/.gitmodules b/.gitmodules
index cdac151..e44ff89 100644
--- a/.gitmodules
+++ b/.gitmodules
@@ -7,3 +7,6 @@
 [submodule "extern/CloudR"]
 	path = extern/CloudR
 	url = https://github.com/Algebraic-Programming/CloudR.git
+[submodule "extern/cpp-httplib"]
+	path = extern/cpp-httplib
+	url = https://github.com/yhirose/cpp-httplib.git
diff --git a/examples/basic/meson.build b/examples/basic/meson.build
index 46b7016..4b64f6e 100644
--- a/examples/basic/meson.build
+++ b/examples/basic/meson.build
@@ -6,7 +6,7 @@ if get_option('buildTests')
         'basic',
         mpirunExecutable,
         args: [
-            '-np', '5',
+            '-np', '1',
             '--oversubscribe', e.full_path(),
             meson.current_source_dir() + '/config.json',
         ],
diff --git a/extern/cpp-httplib b/extern/cpp-httplib
new file mode 160000
index 0000000..551f96d
--- /dev/null
+++ b/extern/cpp-httplib
@@ -0,0 +1 @@
+Subproject commit 551f96d4a2a07a56d72a13d4837b31eead884d34
diff --git a/include/hllm/realTimeAnalysis.hpp b/include/hllm/realTimeAnalysis.hpp
new file mode 100644
index 0000000..38441ed
--- /dev/null
+++ b/include/hllm/realTimeAnalysis.hpp
@@ -0,0 +1,145 @@
+#pragma once
+
+#include "../../extern/cpp-httplib/httplib.h" // Lightweight HTTP server library
+#include <atomic>                             // For atomic variables (thread-safe counters, flags)
+#include <chrono>                             // For timing utilities (steady clock, seconds, etc.)
+#include <mutex>                              // For std::mutex and std::lock_guard (thread safety)
+#include <string>                             // For storing the json file
+#include <iostream>                           // For debugging
+#include <thread>                             // For std::thread (NOTE(review): name inferred from use, it was lost in this copy of the patch)
+
+#include // NOTE(review): header name lost in this copy of the patch; restore it from the original commit
+#include // NOTE(review): header name lost in this copy of the patch; restore it from the original commit
+
+namespace hLLM
+{
+
+/**
+ * Posts a small JSON status document for one instance to the "/data" endpoint of the server behind cli
+ * (PROTOTYPE)
+ *
+ * Declared inline because a non-inline function defined in a header breaks the
+ * one-definition rule as soon as two translation units include that header.
+ *
+ * @param[in] InstanceId Value written to the "instance ID" field
+ * @param[in] n_requests Value written to the "number of requests per ms" field
+ * @param[in] cli HTTP client used to send the request (taken by value)
+ */
+inline void clientPost(const size_t& InstanceId, const int& n_requests, httplib::Client cli)
+{
+  // Preparing the data to be passed over. It should only consist of a part of the whole json file
+  std::string json_data =
+    "{\n"
+    " \"Instance\": {\n"
+    " \"instance ID\": " + std::to_string(InstanceId) + ",\n"
+    " \"status\": \"active\",\n"
+    " \"number of requests per ms\": " + std::to_string(n_requests) + "\n"
+    " }\n"
+    "}";
+
+  // Send Post of the new json_data
+  auto res = cli.Post("/data", json_data, "application/json");
+
+  // Handle server response or connection failure
+  if (!res) {
+    // Print an error message if connection failed
+    std::cerr << "Failed to connect to server.\n";
+  }
+}
+
+/**
+ * Small HTTP endpoint that keeps the most recent JSON document posted by any client.
+ *
+ * POST /data stores the request body; GET / returns the stored body.
+ * The server thread is started by the constructor and joined by the destructor.
+ */
+class RealTimeAnalysis
+{
+  public:
+
+  /**
+   * Registers the HTTP handlers and starts listening on a separate thread
+   *
+   * @param[in] ip Address to listen on
+   * @param[in] port Port to listen on
+   */
+  RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5003) : _ip(ip), _port(port)
+  {
+    _svr.Post("/data", [this](const httplib::Request &req, httplib::Response &res) {
+      {
+        // Acquire lock before modifying the shared global variable.
+        // The lock is automatically released when this scope ends.
+        std::lock_guard<std::mutex> lock(_mtx);
+        _last_received_json = req.body;
+      }
+
+      // (Debugging)
+      // Return an acknowledgment response to the client.
+      // This shows that the server successfully processed the POST.
+      // res.set_content("{\"status\":\"ok\"}", "application/json");
+    });
+
+    // Get method of posting the captured json file
+    _svr.Get("/", [this](const httplib::Request &, httplib::Response &res) {
+      // Acquire the same mutex before reading shared state.
+      std::lock_guard<std::mutex> lock(_mtx);
+
+      // Send the stored JSON document to the client.
+      res.set_content(_last_received_json, "application/json");
+    });
+
+    // Start listening from a separate thread
+    // Default: Listen on all available network interfaces (0.0.0.0) at port 5003.
+    _srv_thread = std::thread([this]() {
+      // Report a failed listen() (e.g. the port is already taken) instead of dropping it silently
+      if (!_svr.listen(_ip, static_cast<int>(_port)))
+        std::cerr << "RealTimeAnalysis: HTTP server on " << _ip << ":" << _port << " failed or stopped with an error.\n";
+    });
+  }
+
+  ~RealTimeAnalysis()
+  {
+    // stop() does nothing while the server is not running yet. Wait for the server thread to
+    // either start serving or give up, otherwise a destruction right after construction could
+    // return from stop() early and then block forever in join().
+    _svr.wait_until_ready();
+    _svr.stop(); // tell the server to stop listening
+    if (_srv_thread.joinable())
+      _srv_thread.join(); // clean shutdown
+  }
+
+  private:
+
+  /**
+   * HTTP server instance
+   */
+  httplib::Server _svr;
+
+  /**
+   * Mutex to protect the access to the JSON file
+   */
+  std::mutex _mtx;
+
+  /**
+   * The received JSON file from any client.
+   * Starts as an empty JSON object so that GET / returns valid JSON before the first POST
+   */
+  std::string _last_received_json = "{}";
+
+  /**
+   * The IP we launch our http application
+   */
+  std::string _ip;
+
+  /**
+   * The Port of the IP
+   */
+  size_t _port;
+
+  /**
+   * The server thread to run the "listen" method
+   */
+  std::thread _srv_thread;
+};
+
+} // namespace hLLM
diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp
index 9b4231d..45de75a 100644
--- a/include/hllm/roles/requestManager.hpp
+++ b/include/hllm/roles/requestManager.hpp
@@ -2,6 +2,8 @@
 
 #include
 #include
+#include // NOTE(review): header name lost in this copy of the patch; the aliases added below need <chrono>
+
 #include
 #include
 #include
@@ -12,6 +14,11 @@
 #include "../configuration/deployment.hpp"
 #include "../messages/data.hpp"
 #include "../session.hpp"
+#include "../realTimeAnalysis.hpp"
+#include "../../../extern/cpp-httplib/httplib.h"
+
+using Clock = std::chrono::steady_clock; // Monotonic clock for precise timing
+using Secs = std::chrono::seconds; // Convenience alias for seconds
 
 namespace hLLM::roles
 {
@@ -26,7 +33,7 @@ class RequestManager final : public hLLM::Role
   RequestManager(
     const configuration::Deployment deployment,
     taskr::Runtime* const taskr
-  ) : Role(deployment, taskr)
+  ) : Role(deployment, taskr), _rTA("0.0.0.0", 5003), _cli("localhost", 5003), num_responses(0), prev_num_responses(0)
   {
     // Name of the prompt input
     const auto& promptInputName = _deployment.getRequestManager()->getInput();
@@ -84,7 +91,10 @@ class RequestManager final : public hLLM::Role
         _resultInputEdge = std::make_shared(*edgeConfig, edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex);
         // printf("[Request Manager] Result Input Edge: Type: %u, EdgeIdx: %lu, CP: %lu, PP: %lu, RI: %lu\n", edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex);
       }
-    } 
+    }
+
+    // set the previous time tracker to now.
+    prev_time = Clock::now();
   }
 
   // Gets the memory slots required by the edges
@@ -176,6 +186,43 @@ class RequestManager final : public hLLM::Role
 
     // printf("Prompt Map Size: %lu\n", _activePromptMap.size());
     // usleep(10000);
+
+    // Count this response and refresh the throughput statistic.
+    // The rate is recomputed only when a full sampling window (> 1 s) has elapsed and is
+    // reported unchanged in between. Dividing by the time since the last reset on every
+    // response produces huge spikes right after a reset, and "inf" (which is not valid
+    // JSON) when the elapsed time is zero.
+    now_time = Clock::now();
+    ++num_responses;
+
+    const double time_diff_sec = std::chrono::duration<double>(now_time - prev_time).count();
+
+    if (time_diff_sec > 1.0)
+    {
+      const double resp_diff = double(num_responses - prev_num_responses);
+      avg_res_per_minute = resp_diff / time_diff_sec * 60.0;
+
+      // Start the next sampling window
+      prev_time = now_time;
+      prev_num_responses = num_responses;
+    }
+
+    std::string json_data =
+      "{\n"
+      " \"partition\": {\n"
+      " \"name\": \"partition1\",\n" // For now, fake partition value
+      " \"status\": \"active\",\n"
+      " \"num_responses\": " + std::to_string(num_responses) + ",\n"
+      " \"avg_responses_per_min\": " + std::to_string(avg_res_per_minute) + "\n"
+      " }\n"
+      "}";
+
+    // NOTE(review): this is a synchronous HTTP round trip on every response; consider
+    // posting only when the statistic is refreshed if it shows up in profiles
+    auto res = _cli.Post("/data", json_data, "application/json");
+
+    // Error handling to check if the HTTP post was successful
+    if(!res) std::cerr << "Failed to connect to server.\n";
   }
 
   ///////////// Prompt handling service
@@ -299,6 +346,20 @@ class RequestManager final : public hLLM::Role
 
   // Active session map
   std::map> _activeSessionMap;
+  // Initializing the realTimeAnalysis instance to start listening number of requests
+  RealTimeAnalysis _rTA;
+
+  // requestManager's own HTTP client as it will pass the number of requests per minute
+  httplib::Client _cli;
+
+  // Number of responses tracker
+  size_t num_responses, prev_num_responses;
+
+  // time tracker for the number of responses
+  Clock::time_point now_time, prev_time;
+
+  // Responses per minute over the last completed sampling window (0 until one has elapsed)
+  double avg_res_per_minute = 0.0;
 
 }; // class RequestManager
 