Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions common_settings.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -209,8 +209,6 @@ COMMON_STATIC_TEST_COPTS = select({
"-Wall",
"-Wno-unknown-pragmas",
"-Werror",
# ov::Tensor::data method call results in deprecated warning and we use it in multiple places
"-Wno-deprecated-declarations",
"-Isrc",
"-fconcepts", # for gmock related utils
"-fvisibility=hidden",# Needed for pybind targets
Expand Down
36 changes: 36 additions & 0 deletions src/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,39 @@ ovms_cc_library(
hdrs = ["queue.hpp"],
visibility = ["//visibility:public",],
)
ovms_cc_library(
name = "mediapipe_internal_graph_side_packets",
hdrs = ["mediapipe_internal/graph_side_packets.hpp"],
visibility = ["//visibility:public",],
)
ovms_cc_library(
name = "mediapipe_internal_graph_executor_constants",
hdrs = ["mediapipe_internal/graph_executor_constants.hpp"],
visibility = ["//visibility:public"],
)
ovms_cc_library(
    name = "mediapipe_internal_graphqueue",
    hdrs = [
        "mediapipe_internal/graphqueue.hpp",
        "mediapipe_internal/outputstreamobserver.hpp",
    ],  # TODO(review): outputstreamobserver.hpp is bundled here temporarily; extract it into its own target
    srcs = ["mediapipe_internal/graphqueue.cpp"],
    deps = [
        "libovms_queue",
        "libovmslogging",
        "libovms_execution_context",
        "libovmstimer",
        "libovmsmetrics",
        "model_metric_reporter",
        "mediapipe_internal_graph_executor_constants",
        "mediapipe_internal_graph_side_packets",
        "//third_party:openvino",
        "@mediapipe//mediapipe/framework:calculator_graph",
        "//src/python:libovmspythonmodule",  # TODO(review): python module not yet split out of graphqueue; remove once decoupled
        "//src/llm:genai_servables",  # TODO(review): split genai servables dependency out of graphqueue
    ],
    visibility = ["//visibility:public",],
)
ovms_cc_library(
name = "libovms_ovinferrequestsqueue",
hdrs = ["ovinferrequestsqueue.hpp"],
Expand Down Expand Up @@ -542,6 +575,7 @@ ovms_cc_library(
"mediapipe_internal/mediapipegraphconfig.cpp",
"mediapipe_internal/mediapipegraphdefinition.cpp",
"mediapipe_internal/mediapipegraphdefinition.hpp",
"mediapipe_internal/outputstreamobserver.hpp",
"mediapipe_internal/mediapipegraphexecutor.cpp",
"mediapipe_internal/mediapipegraphexecutor.hpp",
"mediapipe_internal/packettypes.hpp",
Expand Down Expand Up @@ -682,6 +716,8 @@ ovms_cc_library(
})
+ select({
"//conditions:default": [
"mediapipe_internal_graph_executor_constants",
"mediapipe_internal_graphqueue",
"@mediapipe_calculators//:mediapipe_calculators", # Need this dependencies here because we use ovms/src - cannot add in ovms_dependencies because we copy src directory later in Dockerfile
"@mediapipe//mediapipe/graphs/holistic_tracking:holistic_tracking_to_render_data",
"@mediapipe//mediapipe/graphs/iris_tracking:iris_tracking_cpu_deps",
Expand Down
4 changes: 4 additions & 0 deletions src/http_frontend/http_graph_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,10 @@ namespace ovms {

static const std::string UNUSED_REQUEST_ID = "";

// Checks whether the request contains user-provided input side packets.
// Always reports false for the HTTP frontend — presumably HTTP payloads
// have no way to carry side packets yet (stub kept for interface symmetry
// with the KServe implementation); confirm before relying on this.
bool requestHasInputSidePackets(const HttpPayload& request) {
    return false;
}

Status deserializeInputSidePacketsFromFirstRequestImpl(
std::map<std::string, mediapipe::Packet>& inputSidePackets, // out
const HttpPayload& request) { // in
Expand Down
3 changes: 3 additions & 0 deletions src/http_frontend/http_graph_executor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ class PythonBackend;

using HttpReaderWriter = HttpAsyncWriter;

// Checks whether the request contains user-provided input side packets.
bool requestHasInputSidePackets(const HttpPayload& request);

// Deserialization of parameters inside KServe gRPC request
// into mediapipe Packets.
// To be used by both - infer & inferStream.
Expand Down
15 changes: 14 additions & 1 deletion src/kfs_frontend/kfs_graph_executor_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "../kfs_frontend/kfs_utils.hpp"
#include "../logging.hpp"
#include "../mediapipe_internal/graph_executor_constants.hpp"
#include "../mediapipe_internal/mediapipe_utils.hpp"
#include "../mediapipe_internal/mediapipegraphdefinition.hpp"
#include "../predict_request_validation_utils.hpp"
Expand Down Expand Up @@ -925,6 +926,7 @@ static Status createPacketAndPushIntoGraph(const std::string& name, std::shared_
}
std::unique_ptr<T> inputTensor;
OVMS_RETURN_ON_FAIL(deserializeTensor(name, *request, inputTensor, pythonBackend));
SPDLOG_ERROR("Current Timestamp before actual pushing:{}", timestamp.Value());
MP_RETURN_ON_FAIL(graph.AddPacketToInputStream(
name,
::mediapipe::packet_internal::Create(
Expand Down Expand Up @@ -1040,8 +1042,10 @@ static Status deserializeTimestampIfAvailable(
return status;
}
} else {
SPDLOG_ERROR("Current Timestamp before setting:{}", timestamp.Value());
auto now = std::chrono::system_clock::now();
timestamp = ::mediapipe::Timestamp(std::chrono::duration_cast<std::chrono::microseconds>(now.time_since_epoch()).count());
SPDLOG_ERROR("Current Timestamp setting:{}", timestamp.Value());
}
return StatusCode::OK;
}
Expand Down Expand Up @@ -1152,10 +1156,19 @@ Status createAndPushPacketsImpl(
return StatusCode::OK;
}

// Checks whether the request carries any parameter other than the reserved
// timestamp one, i.e. a user-provided input side packet.
// NOTE(review): the literal below duplicates the value behind
// TIMESTAMP_PARAMETER_NAME used later in this file — consider reusing that
// constant instead of a second definition.
bool requestHasInputSidePackets(const KFSRequest& request) {
    static const std::string TIMESTAMP_PARAM{"OVMS_MP_TIMESTAMP"};
    bool hasUserSidePackets = false;
    for (const auto& entry : request.parameters()) {
        if (entry.first != TIMESTAMP_PARAM) {
            hasUserSidePackets = true;
            break;
        }
    }
    return hasUserSidePackets;
}

Status deserializeInputSidePacketsFromFirstRequestImpl(
std::map<std::string, mediapipe::Packet>& inputSidePackets,
const KFSRequest& request) {
static const std::string PYTHON_SESSION_SIDE_PACKET_TAG{"py"};
for (const auto& [name, valueChoice] : request.parameters()) {
SPDLOG_DEBUG("Found: {}; parameter in request for: {};", name, request.model_name());
if (name == TIMESTAMP_PARAMETER_NAME) {
Expand Down
4 changes: 4 additions & 0 deletions src/kfs_frontend/kfs_graph_executor_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ namespace ovms {
class PythonBackend;
class Status;

// Checks whether the request contains user-provided input side packets
// (parameters other than the reserved OVMS_MP_TIMESTAMP).
bool requestHasInputSidePackets(const KFSRequest& request);

// Deserialization of parameters inside KServe gRPC request
// into mediapipe Packets.
// To be used by both - infer & inferStream.
Expand Down
1 change: 1 addition & 0 deletions src/llm/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ ovms_cc_library(
"//third_party:openvino",
"@mediapipe//mediapipe/framework:calculator_framework",
"@com_github_tencent_rapidjson//:rapidjson",
"//src:mediapipe_internal_graph_side_packets",
"//src/kfserving_api:kfserving_api_cpp",
"//src:libovmsprofiler",
":genai_servables",
Expand Down
25 changes: 24 additions & 1 deletion src/llm/http_llm_calculator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// limitations under the License.
//*****************************************************************************
#include <atomic>
#include <mutex>
#include <string>

#pragma warning(push)
Expand All @@ -27,6 +28,7 @@

#include "../http_payload.hpp"
#include "../logging.hpp"
#include "../mediapipe_internal/graph_side_packets.hpp"
#include "../profiler.hpp"
#include "apis/openai_completions.hpp"
#include "servable.hpp"
Expand All @@ -36,9 +38,11 @@ using namespace ovms;
namespace mediapipe {

const std::string LLM_SESSION_SIDE_PACKET_TAG = "LLM_NODE_RESOURCES";
const std::string LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG = "LLM_NODE_EXECUTION_CONTEXTS";

class HttpLLMCalculator : public CalculatorBase {
std::shared_ptr<GenAiServable> servable;
std::shared_ptr<GenAiExecutionContextHolder> executionContextHolder;
std::shared_ptr<GenAiServableExecutionContext> executionContext;

static const std::string INPUT_TAG_NAME;
Expand All @@ -54,6 +58,9 @@ class HttpLLMCalculator : public CalculatorBase {
cc->Inputs().Tag(INPUT_TAG_NAME).Set<ovms::HttpPayload>();
cc->Inputs().Tag(LOOPBACK_TAG_NAME).Set<bool>();
cc->InputSidePackets().Tag(LLM_SESSION_SIDE_PACKET_TAG).Set<ovms::GenAiServableMap>();
if (cc->InputSidePackets().HasTag(LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG)) {
cc->InputSidePackets().Tag(LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG).Set<ovms::GenAiExecutionContextMap>();
}
cc->Outputs().Tag(OUTPUT_TAG_NAME).Set<std::string>();
cc->Outputs().Tag(LOOPBACK_TAG_NAME).Set<bool>();
return absl::OkStatus();
Expand All @@ -72,7 +79,17 @@ class HttpLLMCalculator : public CalculatorBase {
auto it = servableMap.find(cc->NodeName());
RET_CHECK(it != servableMap.end()) << "Could not find initialized LLM node named: " << cc->NodeName();
this->servable = it->second;
this->executionContext = servable->createExecutionContext();

if (cc->InputSidePackets().HasTag(LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG) && !cc->InputSidePackets().Tag(LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG).IsEmpty()) {
ovms::GenAiExecutionContextMap executionContextMap = cc->InputSidePackets().Tag(LLM_EXECUTION_CONTEXT_SIDE_PACKET_TAG).Get<ovms::GenAiExecutionContextMap>();
auto contextIt = executionContextMap.find(cc->NodeName());
RET_CHECK(contextIt != executionContextMap.end()) << "Could not find LLM execution context holder for node named: " << cc->NodeName();
this->executionContextHolder = contextIt->second;
}

if (!this->executionContextHolder) {
this->executionContext = servable->createExecutionContext();
}
SPDLOG_LOGGER_DEBUG(llm_calculator_logger, "LLMCalculator [Node: {}] Open end", cc->NodeName());
return absl::OkStatus();
}
Expand All @@ -81,6 +98,12 @@ class HttpLLMCalculator : public CalculatorBase {
OVMS_PROFILE_FUNCTION();
RET_CHECK(this->servable != nullptr);

if (this->executionContextHolder) {
std::lock_guard<std::mutex> lock(this->executionContextHolder->mutex);
this->executionContext = this->executionContextHolder->executionContext;
}
RET_CHECK(this->executionContext != nullptr) << "LLM execution context not initialized for node: " << cc->NodeName();

// For cases where MediaPipe decides to trigger Process() when there are no inputs
if (cc->Inputs().Tag(INPUT_TAG_NAME).IsEmpty() && cc->Inputs().Tag(LOOPBACK_TAG_NAME).IsEmpty()) {
return absl::OkStatus();
Expand Down
3 changes: 2 additions & 1 deletion src/logging.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ std::shared_ptr<spdlog::logger> rerank_calculator_logger = std::make_shared<spdl
#if (OV_TRACE == 1)
std::shared_ptr<spdlog::logger> ov_logger = std::make_shared<spdlog::logger>("openvino");
#endif
const std::string default_pattern = "[%Y-%m-%d %T.%e][%t][%n][%l][%s:%#] %v";
// const std::string default_pattern = "[%i] [%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";
const std::string default_pattern = "[%Y-%m-%d %T.%f][%t][%n][%l][%s:%#] %v";

static void set_log_level(const std::string log_level, std::shared_ptr<spdlog::logger> logger) {
logger->set_level(spdlog::level::info);
Expand Down
35 changes: 35 additions & 0 deletions src/mediapipe_internal/graph_executor_constants.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#pragma once

#include <cstdint>
#include <string>

namespace ovms {

// Side-packet tags used to hand per-node servable/resource maps into
// mediapipe graphs. The string values are the tag names looked up by the
// corresponding calculators.
inline const std::string PYTHON_SESSION_SIDE_PACKET_TAG = "py";
inline const std::string LLM_SESSION_SIDE_PACKET_TAG = "llm";
inline const std::string LLM_EXECUTION_CONTEXT_SESSION_SIDE_PACKET_TAG = "llm_ctx";
inline const std::string IMAGE_GEN_SESSION_SIDE_PACKET_TAG = "pipes";
inline const std::string EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "embeddings_servable";
inline const std::string RERANK_SESSION_SIDE_PACKET_TAG = "rerank_servable";
inline const std::string STT_SESSION_SIDE_PACKET_TAG = "s2t_servable";
inline const std::string TTS_SESSION_SIDE_PACKET_TAG = "t2s_servable";
// NOTE(review): the two *_NAME constants below duplicate the values of the
// corresponding *_TAG constants above — confirm both spellings are really
// needed by callers, or collapse them into one constant each.
inline const std::string PYTHON_SIDE_PACKET_NAME = "py";
inline const std::string LLM_SESSION_PACKET_NAME = "llm";
// Initial mediapipe timestamp assigned to the first packet of a stream.
inline constexpr int64_t STARTING_TIMESTAMP_VALUE = 0;

}  // namespace ovms
80 changes: 80 additions & 0 deletions src/mediapipe_internal/graph_side_packets.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
//*****************************************************************************
// Copyright 2025 Intel Corporation
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//*****************************************************************************
#pragma once

#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>

namespace ovms {

// Forward declarations - only shared_ptrs are stored so full definitions are not needed
class PythonNodeResources;
class GenAiServable;
struct GenAiServableExecutionContext;
struct ImageGenerationPipelines;
struct EmbeddingsServable;
struct RerankServable;
struct SttServable;
class TtsServable;

// Per-node resource maps, keyed by graph node name.
using PythonNodeResourcesMap = std::unordered_map<std::string, std::shared_ptr<PythonNodeResources>>;
using GenAiServableMap = std::unordered_map<std::string, std::shared_ptr<GenAiServable>>;
using RerankServableMap = std::unordered_map<std::string, std::shared_ptr<RerankServable>>;
using SttServableMap = std::unordered_map<std::string, std::shared_ptr<SttServable>>;
using TtsServableMap = std::unordered_map<std::string, std::shared_ptr<TtsServable>>;
using EmbeddingsServableMap = std::unordered_map<std::string, std::shared_ptr<EmbeddingsServable>>;
using ImageGenerationPipelinesMap = std::unordered_map<std::string, std::shared_ptr<ImageGenerationPipelines>>;

// Pairs a GenAi execution context with the mutex that guards access to it.
// Callers lock `mutex` before reading or replacing `executionContext`.
struct GenAiExecutionContextHolder {
    std::mutex mutex;
    std::shared_ptr<GenAiServableExecutionContext> executionContext;
};
using GenAiExecutionContextMap = std::unordered_map<std::string, std::shared_ptr<GenAiExecutionContextHolder>>;

// Aggregates every per-node resource map that is passed into a mediapipe
// graph as input side packets. Plain value type: not thread-safe by itself.
struct GraphSidePackets {
    PythonNodeResourcesMap pythonNodeResourcesMap;
    GenAiServableMap genAiServableMap;
    GenAiExecutionContextMap genAiExecutionContextMap;
    ImageGenerationPipelinesMap imageGenPipelinesMap;
    EmbeddingsServableMap embeddingsServableMap;
    RerankServableMap rerankServableMap;
    SttServableMap sttServableMap;
    TtsServableMap ttsServableMap;
    // Releases all held resources by clearing every map.
    void clear() {
        pythonNodeResourcesMap.clear();
        genAiServableMap.clear();
        genAiExecutionContextMap.clear();
        imageGenPipelinesMap.clear();
        embeddingsServableMap.clear();
        rerankServableMap.clear();
        sttServableMap.clear();
        ttsServableMap.clear();
    }
    // Returns true when no map holds any entry.
    // Marked const so it can be queried through const references.
    bool empty() const {
        return (pythonNodeResourcesMap.empty() &&
                genAiServableMap.empty() &&
                genAiExecutionContextMap.empty() &&
                imageGenPipelinesMap.empty() &&
                embeddingsServableMap.empty() &&
                rerankServableMap.empty() &&
                sttServableMap.empty() &&
                ttsServableMap.empty());
    }
};

}  // namespace ovms
Loading