From 0d7638c25f6cd339e6832a0d3a6ce0356a446e71 Mon Sep 17 00:00:00 2001 From: noabauma Date: Wed, 29 Oct 2025 16:41:27 +0100 Subject: [PATCH 1/6] adding first template version of the RTA class --- .gitmodules | 3 + examples/basic/meson.build | 2 +- extern/cpp-httplib | 1 + include/hllm/realTimeAnalysis.hpp | 108 ++++++++++++++++++++++++++++++ 4 files changed, 113 insertions(+), 1 deletion(-) create mode 160000 extern/cpp-httplib create mode 100644 include/hllm/realTimeAnalysis.hpp diff --git a/.gitmodules b/.gitmodules index cdac151..e44ff89 100644 --- a/.gitmodules +++ b/.gitmodules @@ -7,3 +7,6 @@ [submodule "extern/CloudR"] path = extern/CloudR url = https://github.com/Algebraic-Programming/CloudR.git +[submodule "extern/cpp-httplib"] + path = extern/cpp-httplib + url = https://github.com/yhirose/cpp-httplib.git diff --git a/examples/basic/meson.build b/examples/basic/meson.build index 46b7016..4b64f6e 100644 --- a/examples/basic/meson.build +++ b/examples/basic/meson.build @@ -6,7 +6,7 @@ if get_option('buildTests') 'basic', mpirunExecutable, args: [ - '-np', '5', + '-np', '1', '--oversubscribe', e.full_path(), meson.current_source_dir() + '/config.json', ], diff --git a/extern/cpp-httplib b/extern/cpp-httplib new file mode 160000 index 0000000..551f96d --- /dev/null +++ b/extern/cpp-httplib @@ -0,0 +1 @@ +Subproject commit 551f96d4a2a07a56d72a13d4837b31eead884d34 diff --git a/include/hllm/realTimeAnalysis.hpp b/include/hllm/realTimeAnalysis.hpp new file mode 100644 index 0000000..60ba263 --- /dev/null +++ b/include/hllm/realTimeAnalysis.hpp @@ -0,0 +1,108 @@ +#pragma once + +#include // Lightweight HTTP server library +#include // For atomic variables (thread-safe counters, flags) +#include // For timing utilities (steady clock, seconds, etc.) +#include // For std::mutex and std::lock_guard (thread safety) +#include // For storing the json file +#include // For debugging + +#include +#include + +namespace hLLM +{ + +/** + * A function for a client passing its information + */ +void clientPost(const size_t& InstanceId; const int& n_requests, httplib::Client cli) +{ + // Preparing the data to be passed over. It should only consist of a part of the whole json file + std::string json_data = + "{\n" + " \"Instance\": {\n" + " \"instance ID\": " + std::to_string(InstanceID) ",\n" + " \"status\": \"active\",\n" + " \"number of requests per ms\": " + std::to_string(n_requests) + "\n" + " }\n" + "}"; + + // Send Post of the new json_data + auto res = cli.Post("/data", json_data, "application/json"); + + // Handle server response or connection failure + if (res) { + // Print HTTP status code (e.g., 200) + std::cout << "Status code: " << res->status << std::endl; + + // Print body text returned by server + std::cout << "Response: " << res->body << std::endl; + } else { + // Print an error message if connection failed + std::cerr << "Failed to connect to server.\n"; + } +} + +class RealTimeAnalysis +{ + public: + + RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5004) : _ip(ip), _port(port) + { + // initializing the HTTP server + _svr = httplib::Server; + + _svr.Post("/data", [](const httplib::Request &req, httplib::Response &res) { + { + // Acquire lock before modifying the shared global variable. + // The lock is automatically released when this scope ends. + std::lock_guard lock(mtx); + _last_received_json = req.body; + } + + // Print received JSON to the console for visibility/debugging. + std::cout << "Received JSON:\n" << req.body << std::endl; + + // Return an acknowledgment response to the client. + // This shows that the server successfully processed the POST. + // (This is for debugging) + res.set_content("{\"status\":\"ok\"}", "application/json"); + }); + + // Start listening + // Default: Listen on all available network interfaces (0.0.0.0) at port 5004. + _svr.listen(_ip, _port); + } + + ~RealTimeAnalysis() = default; + + private: + + /** + * HTTP server instance + */ + httplib::Server _svr; + + /** + * Mutex to protect the access to the JSON file + */ + std::mutex _mtx; + + /** + * The received JSON file from any client + */ + std::string _last_received_json; + + /** + * The IP we launch our http application + */ + std::string _ip; + + /** + * The Port of the IP + */ + size_t _port; +} + +} \ No newline at end of file From 2f2e66dc98d6324939269da97d525016f3d14b6d Mon Sep 17 00:00:00 2001 From: noabauma Date: Thu, 30 Oct 2025 11:17:16 +0100 Subject: [PATCH 2/6] first prototype (but not running) --- include/hllm/realTimeAnalysis.hpp | 22 ++++++------ include/hllm/roles/requestManager.hpp | 49 +++++++++++++++++++++++++-- 2 files changed, 59 insertions(+), 12 deletions(-) diff --git a/include/hllm/realTimeAnalysis.hpp b/include/hllm/realTimeAnalysis.hpp index 60ba263..1bd6ae5 100644 --- a/include/hllm/realTimeAnalysis.hpp +++ b/include/hllm/realTimeAnalysis.hpp @@ -1,11 +1,12 @@ #pragma once -#include // Lightweight HTTP server library +#include "../../extern/cpp-httplib/httplib.h" // Lightweight HTTP server library #include // For atomic variables (thread-safe counters, flags) #include // For timing utilities (steady clock, seconds, etc.) #include // For std::mutex and std::lock_guard (thread safety) #include // For storing the json file #include // For debugging +#include #include #include @@ -15,14 +16,15 @@ namespace hLLM /** * A function for a client passing its information + * (PROTOTYPE) */ -void clientPost(const size_t& InstanceId; const int& n_requests, httplib::Client cli) +void clientPost(const size_t& InstanceId, const int& n_requests, httplib::Client cli) { // Preparing the data to be passed over. It should only consist of a part of the whole json file std::string json_data = "{\n" " \"Instance\": {\n" - " \"instance ID\": " + std::to_string(InstanceID) ",\n" + " \"instance ID\": " + std::to_string(InstanceId) + ",\n" " \"status\": \"active\",\n" " \"number of requests per ms\": " + std::to_string(n_requests) + "\n" " }\n" @@ -50,14 +52,12 @@ class RealTimeAnalysis RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5004) : _ip(ip), _port(port) { - // initializing the HTTP server - _svr = httplib::Server; - _svr.Post("/data", [](const httplib::Request &req, httplib::Response &res) { + _svr.Post("/data", [this](const httplib::Request &req, httplib::Response &res) { { // Acquire lock before modifying the shared global variable. // The lock is automatically released when this scope ends. - std::lock_guard lock(mtx); + std::lock_guard lock(_mtx); _last_received_json = req.body; } @@ -70,9 +70,11 @@ class RealTimeAnalysis res.set_content("{\"status\":\"ok\"}", "application/json"); }); - // Start listening + // Start listening from a separate thread // Default: Listen on all available network interfaces (0.0.0.0) at port 5004. - _svr.listen(_ip, _port); + std::thread srv_listen([this](){ + _svr.listen(_ip, _port); + }); } ~RealTimeAnalysis() = default; @@ -103,6 +105,6 @@ class RealTimeAnalysis * The Port of the IP */ size_t _port; -} +}; } \ No newline at end of file diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp index 4ab5130..6d2ccc7 100644 --- a/include/hllm/roles/requestManager.hpp +++ b/include/hllm/roles/requestManager.hpp @@ -2,6 +2,8 @@ #include #include +#include + #include #include #include @@ -12,6 +14,11 @@ #include "../configuration/deployment.hpp" #include "../messages/data.hpp" #include "../session.hpp" +#include "../realTimeAnalysis.hpp" +#include "../../../extern/cpp-httplib/httplib.h" + +using Clock = std::chrono::steady_clock; // Monotonic clock for precise timing +using Secs = std::chrono::seconds; // Convenience alias for seconds namespace hLLM::roles { @@ -26,7 +33,7 @@ class RequestManager final : public hLLM::Role RequestManager( const configuration::Deployment deployment, taskr::Runtime* const taskr - ) : Role(deployment, taskr) + ) : Role(deployment, taskr), _rTA("0.0.0.0", 5004), _cli("localhost", 5004), num_responses(0) { // Name of the prompt input const auto& promptInputName = _deployment.getUserInterface().input; @@ -84,7 +91,7 @@ class RequestManager final : public hLLM::Role _resultInputEdge = std::make_shared(*edgeConfig, edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex); // printf("[Request Manager] Result Input Edge: Type: %u, EdgeIdx: %lu, CP: %lu, PP: %lu, RI: %lu\n", edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex); } - } + } } // Gets the memory slots required by the edges @@ -176,6 +183,35 @@ class RequestManager final : public hLLM::Role // printf("Prompt Map Size: %lu\n", _activePromptMap.size()); // usleep(10000); + + // Increasing the counter + // Compute the new average response per minute value + // const auto now = Clock::now(); + // auto diff_time_sec = + + // const double avg_res_per_minute = 60/(diff_time_sec) + + std::string json_data = + "{\n" + " \"partition\": {\n" + " \"name\": \"partition1\",\n" + " \"status\": \"active\",\n" + " \"avg_latency_ms\": " + std::to_string(++num_responses) + "\n" + " }\n" + "}"; + + auto res = _cli.Post("/data", json_data, "application/json"); + + if (res) { + // Print HTTP status code (e.g., 200) + std::cout << "Status code: " << res->status << std::endl; + + // Print body text returned by server + std::cout << "Response: " << res->body << std::endl; + } else { + // Print an error message if connection failed + std::cerr << "Failed to connect to server.\n"; + } } ///////////// Prompt handling service @@ -299,6 +335,15 @@ class RequestManager final : public hLLM::Role // Active session map std::map> _activeSessionMap; + // Initializing the realTimeAnalysis instance to start listening number of requests + RealTimeAnalysis _rTA; + + // requestManager's own HTTP client as it will pass the number of requests per minute + httplib::Client _cli; + + size_t num_responses, num_responses_prev; + + std::chrono::steady_clock time_now, time_prev; }; // class RequestManager From 4da159b12af7ac9178ee4798490945aa3de51941 Mon Sep 17 00:00:00 2001 From: noabauma Date: Thu, 30 Oct 2025 12:02:23 +0100 Subject: [PATCH 3/6] http communication is working (now, let's plot it in grafana) --- include/hllm/realTimeAnalysis.hpp | 22 ++++++++++++++++++---- include/hllm/roles/requestManager.hpp | 4 +++- 2 files changed, 21 insertions(+), 5 deletions(-) diff --git a/include/hllm/realTimeAnalysis.hpp b/include/hllm/realTimeAnalysis.hpp index 1bd6ae5..f98b1d5 100644 --- a/include/hllm/realTimeAnalysis.hpp +++ b/include/hllm/realTimeAnalysis.hpp @@ -50,8 +50,9 @@ class RealTimeAnalysis { public: - RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5004) : _ip(ip), _port(port) + RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5003) : _ip(ip), _port(port) { + printf("RTA constructor called\n"); fflush(stdout); _svr.Post("/data", [this](const httplib::Request &req, httplib::Response &res) { { @@ -70,14 +71,22 @@ class RealTimeAnalysis res.set_content("{\"status\":\"ok\"}", "application/json"); }); + printf("srv.listen on a std::thread\n"); fflush(stdout); + // Start listening from a separate thread - // Default: Listen on all available network interfaces (0.0.0.0) at port 5004. - std::thread srv_listen([this](){ + // Default: Listen on all available network interfaces (0.0.0.0) at port 5003. + _srv_thread = std::thread([this]() { _svr.listen(_ip, _port); }); } - ~RealTimeAnalysis() = default; + ~RealTimeAnalysis() + { + printf("RTA destructor called\n"); fflush(stdout); + _svr.stop(); // tell the server to stop listening + if (_srv_thread.joinable()) + _srv_thread.join(); // clean shutdown + } private: @@ -105,6 +114,11 @@ class RealTimeAnalysis * The Port of the IP */ size_t _port; + + /** + * + */ + std::thread _srv_thread; }; } \ No newline at end of file diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp index 83cb1e0..a48cadc 100644 --- a/include/hllm/roles/requestManager.hpp +++ b/include/hllm/roles/requestManager.hpp @@ -33,7 +33,7 @@ class RequestManager final : public hLLM::Role RequestManager( const configuration::Deployment deployment, taskr::Runtime* const taskr - ) : Role(deployment, taskr), _rTA("0.0.0.0", 5004), _cli("localhost", 5004), num_responses(0) + ) : Role(deployment, taskr), _rTA("0.0.0.0", 5003), _cli("localhost", 5003), num_responses(0) { // Name of the prompt input const auto& promptInputName = _deployment.getRequestManager()->getInput(); @@ -191,6 +191,8 @@ class RequestManager final : public hLLM::Role // const double avg_res_per_minute = 60/(diff_time_sec) + printf("response counter one up\n"); fflush(stdout); + std::string json_data = "{\n" " \"partition\": {\n" From b532ca079aa8d38fac05014ac0a647476e83fb14 Mon Sep 17 00:00:00 2001 From: noabauma Date: Thu, 30 Oct 2025 14:59:35 +0100 Subject: [PATCH 4/6] minor improvements --- include/hllm/realTimeAnalysis.hpp | 33 +++++++++----------- include/hllm/roles/requestManager.hpp | 44 +++++++++++++++------------ 2 files changed, 39 insertions(+), 38 deletions(-) diff --git a/include/hllm/realTimeAnalysis.hpp b/include/hllm/realTimeAnalysis.hpp index f98b1d5..38441ed 100644 --- a/include/hllm/realTimeAnalysis.hpp +++ b/include/hllm/realTimeAnalysis.hpp @@ -34,15 +34,9 @@ void clientPost(const size_t& InstanceId, const int& n_requests, httplib::Client auto res = cli.Post("/data", json_data, "application/json"); // Handle server response or connection failure - if (res) { - // Print HTTP status code (e.g., 200) - std::cout << "Status code: " << res->status << std::endl; - - // Print body text returned by server - std::cout << "Response: " << res->body << std::endl; - } else { - // Print an error message if connection failed - std::cerr << "Failed to connect to server.\n"; + if (!res) { + // Print an error message if connection failed + std::cerr << "Failed to connect to server.\n"; } } @@ -52,8 +46,6 @@ class RealTimeAnalysis RealTimeAnalysis(const std::string ip = "0.0.0.0", const size_t port = 5003) : _ip(ip), _port(port) { - printf("RTA constructor called\n"); fflush(stdout); - _svr.Post("/data", [this](const httplib::Request &req, httplib::Response &res) { { // Acquire lock before modifying the shared global variable. @@ -62,16 +54,20 @@ class RealTimeAnalysis _last_received_json = req.body; } - // Print received JSON to the console for visibility/debugging. - std::cout << "Received JSON:\n" << req.body << std::endl; - + // (Debugging) // Return an acknowledgment response to the client. // This shows that the server successfully processed the POST. - // (This is for debugging) - res.set_content("{\"status\":\"ok\"}", "application/json"); + // res.set_content("{\"status\":\"ok\"}", "application/json"); }); - printf("srv.listen on a std::thread\n"); fflush(stdout); + // Get method of posting the captured json file + _svr.Get("/", [this](const httplib::Request &, httplib::Response &res) { + // Acquire the same mutex before reading shared state. + std::lock_guard lock(_mtx); + + // Send HTML response to the browser. + res.set_content(_last_received_json, "application/json"); + }); // Start listening from a separate thread // Default: Listen on all available network interfaces (0.0.0.0) at port 5003. @@ -82,7 +78,6 @@ class RealTimeAnalysis ~RealTimeAnalysis() { - printf("RTA destructor called\n"); fflush(stdout); _svr.stop(); // tell the server to stop listening if (_srv_thread.joinable()) _srv_thread.join(); // clean shutdown @@ -116,7 +111,7 @@ class RealTimeAnalysis size_t _port; /** - * + * The server thread to run the "listen" method */ std::thread _srv_thread; }; diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp index a48cadc..81aff98 100644 --- a/include/hllm/roles/requestManager.hpp +++ b/include/hllm/roles/requestManager.hpp @@ -33,7 +33,7 @@ class RequestManager final : public hLLM::Role RequestManager( const configuration::Deployment deployment, taskr::Runtime* const taskr - ) : Role(deployment, taskr), _rTA("0.0.0.0", 5003), _cli("localhost", 5003), num_responses(0) + ) : Role(deployment, taskr), _rTA("0.0.0.0", 5003), _cli("localhost", 5003), num_responses(0), prev_num_responses(0) { // Name of the prompt input const auto& promptInputName = _deployment.getRequestManager()->getInput(); @@ -92,6 +92,9 @@ class RequestManager final : public hLLM::Role // printf("[Request Manager] Result Input Edge: Type: %u, EdgeIdx: %lu, CP: %lu, PP: %lu, RI: %lu\n", edge::edgeType_t::coordinatorToRequestManager, edgeIdx, _resultProducerPartitionIdx, _resultProducerPartitionIdx, edge::Base::coordinatorReplicaIndex); } } + + // set the previous time tracker to now. + prev_time = Clock::now(); } // Gets the memory slots required by the edges @@ -186,34 +189,35 @@ class RequestManager final : public hLLM::Role // Increasing the counter // Compute the new average response per minute value - // const auto now = Clock::now(); - // auto diff_time_sec = + now_time = Clock::now(); + + auto time_diff_sec = std::chrono::duration(now_time - prev_time).count(); - // const double avg_res_per_minute = 60/(diff_time_sec) + const double resp_diff = double(++num_responses - prev_num_responses); + + const double avg_res_per_minute = resp_diff/time_diff_sec * 60.0; - printf("response counter one up\n"); fflush(stdout); + // Update the prev_time if it was longer than a second + if(time_diff_sec > 1.0) + { + prev_time = now_time; + prev_num_responses = num_responses; + } std::string json_data = "{\n" " \"partition\": {\n" - " \"name\": \"partition1\",\n" + " \"name\": \"partition1\",\n" // For now, fake partition value " \"status\": \"active\",\n" - " \"avg_latency_ms\": " + std::to_string(++num_responses) + "\n" + " \"num_responses\": " + std::to_string(++num_responses) + "\n" + " \"avg_responses_per_min\": " + std::to_string(avg_res_per_minute) + "\n" " }\n" "}"; auto res = _cli.Post("/data", json_data, "application/json"); - if (res) { - // Print HTTP status code (e.g., 200) - std::cout << "Status code: " << res->status << std::endl; - - // Print body text returned by server - std::cout << "Response: " << res->body << std::endl; - } else { - // Print an error message if connection failed - std::cerr << "Failed to connect to server.\n"; - } + // Error handling to check if the HTTP post was successfull + if(!res) std::cerr << "Failed to connect to server.\n"; } ///////////// Prompt handling service @@ -343,9 +347,11 @@ class RequestManager final : public hLLM::Role // requestManager's own HTTP client as it will pass the number of requests per minute httplib::Client _cli; - size_t num_responses, num_responses_prev; + // Number of responses tracker + size_t num_responses, prev_num_responses; - std::chrono::steady_clock time_now, time_prev; + // time tracker for the number of responses + Clock::time_point now_time, prev_time; }; // class RequestManager From b4566f965251dffed1af578e1a032ae01e1b71a8 Mon Sep 17 00:00:00 2001 From: noabauma Date: Thu, 30 Oct 2025 15:15:55 +0100 Subject: [PATCH 5/6] functioning real time analysis --- include/hllm/roles/requestManager.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp index 81aff98..e758f42 100644 --- a/include/hllm/roles/requestManager.hpp +++ b/include/hllm/roles/requestManager.hpp @@ -209,7 +209,7 @@ class RequestManager final : public hLLM::Role " \"partition\": {\n" " \"name\": \"partition1\",\n" // For now, fake partition value " \"status\": \"active\",\n" - " \"num_responses\": " + std::to_string(++num_responses) + "\n" + " \"num_responses\": " + std::to_string(++num_responses) + ",\n" " \"avg_responses_per_min\": " + std::to_string(avg_res_per_minute) + "\n" " }\n" "}"; From 519e6d2cda826266a7b149bc7616ed8022f9c6ef Mon Sep 17 00:00:00 2001 From: noabauma Date: Fri, 31 Oct 2025 15:22:07 +0100 Subject: [PATCH 6/6] minor correction in the number of responses --- include/hllm/roles/requestManager.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/include/hllm/roles/requestManager.hpp b/include/hllm/roles/requestManager.hpp index e758f42..45de75a 100644 --- a/include/hllm/roles/requestManager.hpp +++ b/include/hllm/roles/requestManager.hpp @@ -209,7 +209,7 @@ class RequestManager final : public hLLM::Role " \"partition\": {\n" " \"name\": \"partition1\",\n" // For now, fake partition value " \"status\": \"active\",\n" - " \"num_responses\": " + std::to_string(++num_responses) + ",\n" + " \"num_responses\": " + std::to_string(num_responses) + ",\n" " \"avg_responses_per_min\": " + std::to_string(avg_res_per_minute) + "\n" " }\n" "}";