From b6694b59ae2f622aa18e96e86bd977d92e1c77a2 Mon Sep 17 00:00:00 2001 From: Damian Kalinowski Date: Tue, 20 Jan 2026 14:27:03 +0100 Subject: [PATCH 1/4] save --- Dockerfile.ubuntu | 1 + 1 file changed, 1 insertion(+) diff --git a/Dockerfile.ubuntu b/Dockerfile.ubuntu index 326d5bbed2..3ae60d29a5 100644 --- a/Dockerfile.ubuntu +++ b/Dockerfile.ubuntu @@ -113,6 +113,7 @@ RUN apt-get update && apt-get install --no-install-recommends -y \ libtbb-dev \ libssl-dev \ libxml2 \ + ocl-icd-opencl-dev \ python3.10-dev \ python3.10-venv \ python3-pip \ From 9912713e091eb6451efebdd60a45e1aa201f144a Mon Sep 17 00:00:00 2001 From: Damian Kalinowski Date: Tue, 20 Jan 2026 14:56:50 +0100 Subject: [PATCH 2/4] save --- Dockerfile.ubuntu | 3 --- 1 file changed, 3 deletions(-) diff --git a/Dockerfile.ubuntu b/Dockerfile.ubuntu index 3ae60d29a5..15e47daf20 100644 --- a/Dockerfile.ubuntu +++ b/Dockerfile.ubuntu @@ -305,9 +305,6 @@ RUN bash -c "sed -i -e 's|REPLACE_PROJECT_VERSION|${PROJECT_VERSION}|g' /ovms/sr if [ "$ov_use_binary" == "0" ] ; then sed -i -e "s#REPLACE_OPENVINO_NAME#$(git --git-dir /openvino/.git log -n 1 | head -n 1 | cut -d' ' -f2 | head -c 12)#g" /ovms/src/version.hpp ; fi && \ bash -c "sed -i -e 's|REPLACE_BAZEL_BUILD_FLAGS|${debug_bazel_flags}${minitrace_flags}|g' /ovms/src/version.hpp" -WORKDIR /usr/lib/x86_64-linux-gnu/ -RUN ln -s libOpenCL.so.1 libOpenCL.so - WORKDIR /patchelf # hadolint ignore=DL3003 RUN wget -q https://github.com/NixOS/patchelf/archive/0.10.tar.gz && \ From 229cdb442a087ca63a7242e3bfaa73919ec45c76 Mon Sep 17 00:00:00 2001 From: Rafal Sapala Date: Wed, 21 Jan 2026 18:18:15 +0100 Subject: [PATCH 3/4] Research --- src/BUILD | 1 + src/embeddings/BUILD | 40 +++- .../genai_embeddings_calculator_ov.cc | 192 ++++++++++++++++++ src/embeddings/genai_embeddings_servable.cpp | 53 +++++ src/embeddings/genai_embeddings_servable.hpp | 61 ++++++ .../mediapipegraphdefinition.cpp | 35 ++++ .../mediapipegraphdefinition.hpp | 6 + .../mediapipegraphexecutor.cpp | 3 
+- .../mediapipegraphexecutor.hpp | 1 + .../embeddings/graph_genai_ov_no_norm.pbtxt | 30 +++ 10 files changed, 420 insertions(+), 2 deletions(-) create mode 100644 src/embeddings/genai_embeddings_calculator_ov.cc create mode 100644 src/embeddings/genai_embeddings_servable.cpp create mode 100644 src/embeddings/genai_embeddings_servable.hpp create mode 100644 src/test/embeddings/graph_genai_ov_no_norm.pbtxt diff --git a/src/BUILD b/src/BUILD index 3dd94acdc1..d819e2f438 100644 --- a/src/BUILD +++ b/src/BUILD @@ -567,6 +567,7 @@ ovms_cc_library( "//src/image_gen:imagegen_init", "//src/llm:openai_completions_api_handler", "//src/embeddings:embeddingscalculator_ov", + "//src/embeddings:genai_embeddingscalculator_ov", "//src/rerank:rerankcalculator", "//src/rerank:rerankcalculator_ov", "//src/llm:llmcalculator",], diff --git a/src/embeddings/BUILD b/src/embeddings/BUILD index 0e18423a20..581bae611d 100644 --- a/src/embeddings/BUILD +++ b/src/embeddings/BUILD @@ -34,7 +34,21 @@ ovms_cc_library( ovms_cc_library( name = "embeddings_servable", srcs = ["embeddings_servable.cpp"], - hdrs = ["embeddings_servable.hpp"], + hdrs = ["embeddings_servable.hpp", "genai_embeddings_servable.hpp"], + deps = [ + "//src:libovmslogging", + "//src:sidepacket_servable", + "//third_party:openvino", + "embeddings_calculator_ov_cc_proto", + ], + visibility = ["//visibility:public"], + alwayslink = 1, +) + +ovms_cc_library( + name = "genai_embeddings_servable", + srcs = ["genai_embeddings_servable.cpp"], + hdrs = ["genai_embeddings_servable.hpp"], deps = [ "//src:libovmslogging", "//src:sidepacket_servable", @@ -88,3 +102,27 @@ ovms_cc_library( visibility = ["//visibility:public"], alwayslink = 1, ) + +ovms_cc_library( + name = "genai_embeddingscalculator_ov", + hdrs = [], + srcs = ["genai_embeddings_calculator_ov.cc"], + deps = [ + "@mediapipe//mediapipe/framework:calculator_framework", + "@com_github_tencent_rapidjson//:rapidjson", + "@model_api//:model_api", + "//src:httppayload", + 
"//src:libhttpclientconnection", + "//src:libovmslogging", + "//src:libovmsprofiler", + "embeddings_calculator_ov_cc_proto", + ":genai_embeddings_servable", + "//src:sidepacket_servable", + "//src:model_metric_reporter", + "//src:executingstreamidguard", + "//src:libovms_execution_context", + ":embeddings_api", + ], + visibility = ["//visibility:public"], + alwayslink = 1, +) diff --git a/src/embeddings/genai_embeddings_calculator_ov.cc b/src/embeddings/genai_embeddings_calculator_ov.cc new file mode 100644 index 0000000000..645d8da0a9 --- /dev/null +++ b/src/embeddings/genai_embeddings_calculator_ov.cc @@ -0,0 +1,192 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+//***************************************************************************** +#include +#include + +#pragma warning(push) +#pragma warning(disable : 6001 6385 6386 6326 6011 4309 6246 4005 4456) +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wdeprecated-declarations" +#include "mediapipe/framework/calculator_framework.h" +#include "mediapipe/framework/port/canonical_errors.h" +#include "mediapipe/framework/port/ret_check.h" +#pragma GCC diagnostic pop +#pragma warning(pop) + +#include +#include "src/port/rapidjson_writer.hpp" + +#include "../http_payload.hpp" +#include "../logging.hpp" +#include "../precision.hpp" +#include "../profiler.hpp" +#include "../executingstreamidguard.hpp" +#include "../model_metric_reporter.hpp" +#include "embeddings_api.hpp" +#include "src/embeddings/embeddings_calculator_ov.pb.h" +#include "genai_embeddings_servable.hpp" + +using namespace rapidjson; +using namespace ovms; +class GenaiEmbeddingsServable; + +namespace mediapipe { + +const std::string EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "GENAI_EMBEDDINGS_NODE_RESOURCES"; + +using InputDataType = ovms::HttpPayload; +using OutputDataType = std::string; + +// Helper function to print nested vectors +void printVariant(const ov::genai::EmbeddingResults& v) { + std::visit([](const auto& data) { + for (const auto& row : data) { + for (const auto& val : row) { + std::cout << std::setw(4) << val << " "; + } + std::cout << "\n"; + } + }, v); +} + +class GenaiEmbeddingsCalculatorOV : public CalculatorBase { + static const std::string INPUT_TAG_NAME; + static const std::string OUTPUT_TAG_NAME; + static const std::string EMBEDDINGS_MODEL_INPUT_IDS_NAME; + static const std::string EMBEDDINGS_MODEL_ATTENTION_MASK_NAME; + static const std::string EMBEDDINGS_MODEL_TOKEN_TYPE_IDS_NAME; + + mediapipe::Timestamp timestamp{0}; + + absl::Status tokenizeStrings(ov::genai::Tokenizer& tokenizer, const std::vector& inputStrings, const ov::AnyMap& parameters, ov::genai::TokenizedInputs& 
tokens) { + tokens = tokenizer.encode(inputStrings, parameters); + RET_CHECK(tokens.input_ids.get_shape().size() == 2); + + return absl::OkStatus(); + } + +protected: + std::shared_ptr embeddings_session{nullptr}; + +public: + static absl::Status GetContract(CalculatorContract* cc) { + RET_CHECK(!cc->Inputs().GetTags().empty()); + RET_CHECK(!cc->Outputs().GetTags().empty()); + cc->Inputs().Tag(INPUT_TAG_NAME).Set(); + cc->Outputs().Tag(OUTPUT_TAG_NAME).Set(); + cc->InputSidePackets().Tag(EMBEDDINGS_SESSION_SIDE_PACKET_TAG).Set(); + return absl::OkStatus(); + } + + absl::Status Close(CalculatorContext* cc) final { + OVMS_PROFILE_FUNCTION(); + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "GenaiEmbeddingsCalculatorOV [Node: {} ] Close", cc->NodeName()); + return absl::OkStatus(); + } + + absl::Status Open(CalculatorContext* cc) final { + OVMS_PROFILE_FUNCTION(); + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "GenaiEmbeddingsCalculatorOV [Node: {}] Open start", cc->NodeName()); + auto servableMap = cc->InputSidePackets() + .Tag(EMBEDDINGS_SESSION_SIDE_PACKET_TAG) + .Get(); + auto it = servableMap.find(cc->NodeName()); + RET_CHECK(it != servableMap.end()) << "Could not find initialized Embeddings node named: " << cc->NodeName(); + embeddings_session = it->second; + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "GenaiEmbeddingsCalculatorOV [Node: {}] Open end", cc->NodeName()); + + return absl::OkStatus(); + } + + absl::Status Process(CalculatorContext* cc) final { + OVMS_PROFILE_FUNCTION(); + RET_CHECK(embeddings_session != nullptr); + if (cc->Inputs().Tag(INPUT_TAG_NAME).IsEmpty()) { + return absl::InvalidArgumentError("Input is empty"); + } + InputDataType payload = cc->Inputs().Tag(INPUT_TAG_NAME).Get(); + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Request body: {}", payload.body); + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Request uri: {}", payload.uri); + + ov::Tensor embeddingsTensor; + //size_t received_batch_size = 1; + size_t 
max_context_length = 1024; // default allowed input length. Otherwise, it will be read from model config.json file + ov::genai::TokenizedInputs tokens; + ov::Tensor typeIds; + if (embeddings_session->getMaxModelLength().has_value()) { + max_context_length = embeddings_session->getMaxModelLength().value(); + } else { + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "max_position_embeddings nor max_trained_positions included in config.json. Using default value {}", max_context_length); + } + + ovms::EmbeddingsHandler handler(*payload.parsedJson); + auto parseRequestStartTime = std::chrono::high_resolution_clock::now(); + absl::Status status = handler.parseRequest(); + + if (!status.ok()) { + return status; + } + double time = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - parseRequestStartTime).count(); + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Embeddings request deserialization time: {} ms", time / 1000); + + ModelMetricReporter unused(nullptr, nullptr, "unused", 1); + + try { + auto input = handler.getInput(); + if (auto strings = std::get_if>(&input)) { + ov::AnyMap& params = handler.getParameters(); + //size_t received_batch_size = strings->size(); + if (cc->Options().truncate() && params.find("max_length") == params.end()) { + params["max_length"] = max_context_length; + } + + + // TODO:handler.setPromptTokensUsage(attendedTokens); + + ov::genai::EmbeddingResults documents_embeddings = embeddings_session->m_pipeline->embed_documents(*strings); + std::cout << std::endl << "documents_embeddings:" << std::endl; + printVariant(documents_embeddings); + } else if (auto tokenized_documents = std::get_if>>(&input)) { + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Tokens on input {}", tokenized_documents->size()); + return absl::InvalidArgumentError(absl::StrCat("Tokens on input ")); + } + + + } catch (const std::exception& e) { + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Caught exception from session infer(): {}", 
e.what()); + LOG(INFO) << e.what(); + RET_CHECK(false); + } catch (...) { + SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Caught unknown exception from session infer()"); + RET_CHECK(false); + } + + // TODO:time = std::chrono::duration_cast(std::chrono::high_resolution_clock::now() - parseResponseStartTime).count(); + //SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Embeddings response deserialization time: {} ms", time / 1000); + // TODO:buffer.GetString() + cc->Outputs().Tag(OUTPUT_TAG_NAME).Add(new std::string("buffer.GetString()"), timestamp); + return absl::OkStatus(); + } +}; +const std::string GenaiEmbeddingsCalculatorOV::INPUT_TAG_NAME{"REQUEST_PAYLOAD"}; +const std::string GenaiEmbeddingsCalculatorOV::OUTPUT_TAG_NAME{"RESPONSE_PAYLOAD"}; +const std::string GenaiEmbeddingsCalculatorOV::EMBEDDINGS_MODEL_INPUT_IDS_NAME{"input_ids"}; +const std::string GenaiEmbeddingsCalculatorOV::EMBEDDINGS_MODEL_ATTENTION_MASK_NAME{"attention_mask"}; +const std::string GenaiEmbeddingsCalculatorOV::EMBEDDINGS_MODEL_TOKEN_TYPE_IDS_NAME{"token_type_ids"}; + +REGISTER_CALCULATOR(GenaiEmbeddingsCalculatorOV); + +} // namespace mediapipe diff --git a/src/embeddings/genai_embeddings_servable.cpp b/src/embeddings/genai_embeddings_servable.cpp new file mode 100644 index 0000000000..699e647597 --- /dev/null +++ b/src/embeddings/genai_embeddings_servable.cpp @@ -0,0 +1,53 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#include "genai_embeddings_servable.hpp" + +#include + +#include "../logging.hpp" + +#include "openvino/core/except.hpp" +#include "openvino/genai/rag/text_embedding_pipeline.hpp" +#include "openvino/genai/tokenizer.hpp" +#include "openvino/opsets/opset.hpp" +#include "openvino/opsets/opset1.hpp" +#include "openvino/opsets/opset3.hpp" +#include "openvino/opsets/opset8.hpp" + +using namespace ov::genai; +using namespace ov; + +namespace ovms { +void GenaiEmbeddingsServable::initialize(const std::string& modelDir, const std::string& targetDevice, const std::string& pluginConfig, const std::string& graphPath) { + auto fsModelsPath = std::filesystem::path(modelDir); + if (fsModelsPath.is_relative()) { + parsedModelsPath = (std::filesystem::path(graphPath) / fsModelsPath); + } else { + parsedModelsPath = fsModelsPath.string(); + } + + /*ov::AnyMap properties; + auto status = JsonParser::parsePluginConfig(pluginConfig, properties); + if (!status.ok()) { + SPDLOG_ERROR("Error during embeddings node plugin_config option parsing to JSON: {}", pluginConfig); + }*/ + + TextEmbeddingPipeline::Config config; + config.pooling_type = TextEmbeddingPipeline::PoolingType::MEAN; + m_pipeline = std::make_unique(parsedModelsPath, targetDevice, config); +} + +} // namespace ovms diff --git a/src/embeddings/genai_embeddings_servable.hpp b/src/embeddings/genai_embeddings_servable.hpp new file mode 100644 index 0000000000..944c7d4153 --- /dev/null +++ b/src/embeddings/genai_embeddings_servable.hpp @@ -0,0 +1,61 @@ +//***************************************************************************** +// Copyright 2025 Intel Corporation +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +//***************************************************************************** +#pragma once + +#include "../filesystem.hpp" +#include "../sidepacket_servable.hpp" +#include "openvino/genai/rag/text_embedding_pipeline.hpp" +#include "src/embeddings/embeddings_calculator_ov.pb.h" +#include +#include +#include +#include +#include + +namespace ovms { + +struct GenaiEmbeddingsServable : public SidepacketServable { +public: + GenaiEmbeddingsServable( + const std::string& modelDir, + const std::string& targetDevice, + const std::string& pluginConfig, + const std::string& graphPath, + mediapipe::EmbeddingsCalculatorOVOptions_Pooling pooling, + bool normalizeEmbeddings) : + SidepacketServable(modelDir, targetDevice, pluginConfig, graphPath), + pooling(pooling), + normalizeEmbeddings(normalizeEmbeddings) {} + + void initialize(const std::string& modelDir, const std::string& targetDevice, const std::string& pluginConfig, const std::string& graphPath); + int getTargetOutputIndex() const { + return targetOutputIndex; + } + + std::unique_ptr m_pipeline; + +protected: + // std::shared_ptr applyPrePostProcessing(std::shared_ptr model) override; + +private: + mediapipe::EmbeddingsCalculatorOVOptions_Pooling pooling; + bool normalizeEmbeddings; + + int targetOutputIndex = -1; +}; + +using GenaiEmbeddingsServableMap = std::unordered_map>; +} // namespace ovms diff --git a/src/mediapipe_internal/mediapipegraphdefinition.cpp b/src/mediapipe_internal/mediapipegraphdefinition.cpp index a80934b2d5..3e9b0cd5a1 100644 --- 
a/src/mediapipe_internal/mediapipegraphdefinition.cpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.cpp @@ -48,6 +48,7 @@ #include "mediapipe_utils.hpp" #include "mediapipegraphexecutor.hpp" #include "src/embeddings/embeddings_calculator_ov.pb.h" +#include "src/embeddings/genai_embeddings_servable.hpp" #include "src/rerank/rerank_calculator_ov.pb.h" #include "src/image_gen/pipelines.hpp" @@ -64,6 +65,7 @@ const std::string MediapipeGraphDefinition::IMAGE_GEN_CALCULATOR_NAME{"ImageGenC const std::string MediapipeGraphDefinition::STT_NODE_CALCULATOR_NAME{"S2tCalculator"}; const std::string MediapipeGraphDefinition::TTS_NODE_CALCULATOR_NAME{"T2sCalculator"}; const std::string MediapipeGraphDefinition::EMBEDDINGS_NODE_CALCULATOR_NAME{"EmbeddingsCalculatorOV"}; +const std::string MediapipeGraphDefinition::GENAI_EMBEDDINGS_NODE_CALCULATOR_NAME{"GenaiEmbeddingsCalculatorOV"}; const std::string MediapipeGraphDefinition::RERANK_NODE_CALCULATOR_NAME{"RerankCalculatorOV"}; MediapipeGraphDefinition::~MediapipeGraphDefinition() = default; @@ -545,6 +547,39 @@ Status MediapipeGraphDefinition::initializeNodes() { embeddingsServableMap.insert(std::pair>(nodeName, std::move(servable))); embeddingsServablesCleaningGuard.disableCleaning(); } + if (endsWith(config.node(i).calculator(), GENAI_EMBEDDINGS_NODE_CALCULATOR_NAME)) { + auto& genaiEmbeddingsServableMap = this->sidePacketMaps.genaiEmbeddingsServableMap; + ResourcesCleaningGuard genaiEmbeddingsServablesCleaningGuard(genaiEmbeddingsServableMap); + if (!config.node(i).node_options().size()) { + SPDLOG_LOGGER_ERROR(modelmanager_logger, "Embeddings node missing options in graph: {}. ", this->name); + return StatusCode::LLM_NODE_MISSING_OPTIONS; + } + if (config.node(i).name().empty()) { + SPDLOG_LOGGER_ERROR(modelmanager_logger, "Embeddings node name is missing in graph: {}. 
", this->name); + return StatusCode::LLM_NODE_MISSING_NAME; + } + std::string nodeName = config.node(i).name(); + if (genaiEmbeddingsServableMap.find(nodeName) != genaiEmbeddingsServableMap.end()) { + SPDLOG_LOGGER_ERROR(modelmanager_logger, "Embeddings node name: {} already used in graph: {}. ", nodeName, this->name); + return StatusCode::LLM_NODE_NAME_ALREADY_EXISTS; + } + mediapipe::EmbeddingsCalculatorOVOptions nodeOptions; + config.node(i).node_options(0).UnpackTo(&nodeOptions); + std::shared_ptr servable = std::make_shared( + nodeOptions.models_path(), + nodeOptions.target_device(), + nodeOptions.plugin_config(), + mgconfig.getBasePath(), + nodeOptions.pooling(), + nodeOptions.normalize_embeddings()); + servable->initialize( + nodeOptions.models_path(), + nodeOptions.target_device(), + nodeOptions.plugin_config(), + mgconfig.getBasePath()); + genaiEmbeddingsServableMap.insert(std::pair>(nodeName, std::move(servable))); + genaiEmbeddingsServablesCleaningGuard.disableCleaning(); + } if (endsWith(config.node(i).calculator(), RERANK_NODE_CALCULATOR_NAME)) { auto& rerankServableMap = this->sidePacketMaps.rerankServableMap; ResourcesCleaningGuard rerankServablesCleaningGuard(rerankServableMap); diff --git a/src/mediapipe_internal/mediapipegraphdefinition.hpp b/src/mediapipe_internal/mediapipegraphdefinition.hpp index 14c9e0679f..b6a6a25709 100644 --- a/src/mediapipe_internal/mediapipegraphdefinition.hpp +++ b/src/mediapipe_internal/mediapipegraphdefinition.hpp @@ -45,6 +45,7 @@ #include "../sidepacket_servable.hpp" #include "../embeddings/embeddings_servable.hpp" +#include "../embeddings/genai_embeddings_servable.hpp" #include "../rerank/rerank_servable.hpp" #include "../audio/speech_to_text/s2t_servable.hpp" #include "../audio/text_to_speech/t2s_servable.hpp" @@ -67,6 +68,7 @@ using RerankServableMap = std::unordered_map>; using TtsServableMap = std::unordered_map>; using EmbeddingsServableMap = std::unordered_map>; +using GenaiEmbeddingsServableMap = 
std::unordered_map>; using ImageGenerationPipelinesMap = std::unordered_map>; struct GraphSidePackets { @@ -74,6 +76,7 @@ struct GraphSidePackets { GenAiServableMap genAiServableMap; ImageGenerationPipelinesMap imageGenPipelinesMap; EmbeddingsServableMap embeddingsServableMap; + GenaiEmbeddingsServableMap genaiEmbeddingsServableMap; RerankServableMap rerankServableMap; SttServableMap sttServableMap; TtsServableMap ttsServableMap; @@ -82,6 +85,7 @@ struct GraphSidePackets { genAiServableMap.clear(); imageGenPipelinesMap.clear(); embeddingsServableMap.clear(); + genaiEmbeddingsServableMap.clear(); rerankServableMap.clear(); sttServableMap.clear(); ttsServableMap.clear(); @@ -91,6 +95,7 @@ struct GraphSidePackets { genAiServableMap.empty() && imageGenPipelinesMap.empty() && embeddingsServableMap.empty() && + genaiEmbeddingsServableMap.empty() && rerankServableMap.empty() && sttServableMap.empty() && ttsServableMap.empty()); @@ -133,6 +138,7 @@ class MediapipeGraphDefinition { static const std::string LLM_NODE_CALCULATOR_NAME; static const std::string IMAGE_GEN_CALCULATOR_NAME; static const std::string EMBEDDINGS_NODE_CALCULATOR_NAME; + static const std::string GENAI_EMBEDDINGS_NODE_CALCULATOR_NAME; static const std::string RERANK_NODE_CALCULATOR_NAME; static const std::string STT_NODE_CALCULATOR_NAME; static const std::string TTS_NODE_CALCULATOR_NAME; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.cpp b/src/mediapipe_internal/mediapipegraphexecutor.cpp index 93b53fdf8e..2a35235a55 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.cpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.cpp @@ -46,6 +46,7 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( const PythonNodeResourcesMap& pythonNodeResourcesMap, const GenAiServableMap& llmNodeResourcesMap, const EmbeddingsServableMap& embeddingsServableMap, + const GenaiEmbeddingsServableMap& genaiEmbeddingsServableMap, const RerankServableMap& rerankServableMap, const SttServableMap& sttServableMap, 
const TtsServableMap& ttsServableMap, @@ -58,7 +59,7 @@ MediapipeGraphExecutor::MediapipeGraphExecutor( outputTypes(std::move(outputTypes)), inputNames(std::move(inputNames)), outputNames(std::move(outputNames)), - sidePacketMaps({pythonNodeResourcesMap, llmNodeResourcesMap, {}, embeddingsServableMap, rerankServableMap, sttServableMap, ttsServableMap}), + sidePacketMaps({pythonNodeResourcesMap, llmNodeResourcesMap, {}, embeddingsServableMap, genaiEmbeddingsServableMap, rerankServableMap, sttServableMap, ttsServableMap}), pythonBackend(pythonBackend), currentStreamTimestamp(STARTING_TIMESTAMP), mediapipeServableMetricReporter(mediapipeServableMetricReporter) {} diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index c165469395..7dc2ea7df9 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -104,6 +104,7 @@ class MediapipeGraphExecutor { const PythonNodeResourcesMap& pythonNodeResourcesMap, const GenAiServableMap& llmNodeResourcesMap, const EmbeddingsServableMap& embeddingsServableMap, + const GenaiEmbeddingsServableMap& genaiEmbeddingsServableMap, const RerankServableMap& rerankServableMap, const SttServableMap& sttServableMap, const TtsServableMap& ttsServableMap, diff --git a/src/test/embeddings/graph_genai_ov_no_norm.pbtxt b/src/test/embeddings/graph_genai_ov_no_norm.pbtxt new file mode 100644 index 0000000000..4080287001 --- /dev/null +++ b/src/test/embeddings/graph_genai_ov_no_norm.pbtxt @@ -0,0 +1,30 @@ +# Copyright 2024 Intel Corporation +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +input_stream: "REQUEST_PAYLOAD:input" +output_stream: "RESPONSE_PAYLOAD:output" + +node { + name: "embeddingsNode1" + calculator: "GenaiEmbeddingsCalculatorOV" + input_stream: "REQUEST_PAYLOAD:input" + output_stream: "RESPONSE_PAYLOAD:output" + input_side_packet: "GENAI_EMBEDDINGS_NODE_RESOURCES:embeddings_servable" + node_options: { + [type.googleapis.com/mediapipe.EmbeddingsCalculatorOVOptions]: { + models_path: "/ovms/src/test/llm_testing/thenlper/gte-small/ov" + normalize_embeddings: false + } + } +} From f801f03bb5b24c2362ad861110620d0e39083e69 Mon Sep 17 00:00:00 2001 From: Rafal Sapala Date: Mon, 26 Jan 2026 11:08:42 +0100 Subject: [PATCH 4/4] Added GenaiEmbeddingsCalculator --- .../genai_embeddings_calculator_ov.cc | 15 +++---- src/embeddings/genai_embeddings_servable.cpp | 24 ++++++++-- .../mediapipegraphexecutor.cpp | 1 + .../mediapipegraphexecutor.hpp | 4 +- .../embeddings/genai_config_embeddings.json | 10 +++++ .../embeddings/graph_genai_ov_no_norm.pbtxt | 2 +- src/test/embeddingsnode_test.cpp | 42 ++++++++++++++++++ src/test/mediapipeflow_test.cpp | 2 +- src/test/pythonnode_test.cpp | 2 +- src/test/streaming_test.cpp | 44 +++++++++---------- 10 files changed, 108 insertions(+), 38 deletions(-) create mode 100644 src/test/embeddings/genai_config_embeddings.json diff --git a/src/embeddings/genai_embeddings_calculator_ov.cc b/src/embeddings/genai_embeddings_calculator_ov.cc index 645d8da0a9..a648cc8d71 100644 --- a/src/embeddings/genai_embeddings_calculator_ov.cc +++ b/src/embeddings/genai_embeddings_calculator_ov.cc @@ -45,7 +45,7 @@ class
GenaiEmbeddingsServable; namespace mediapipe { -const std::string EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "GENAI_EMBEDDINGS_NODE_RESOURCES"; +const std::string GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "GENAI_EMBEDDINGS_NODE_RESOURCES"; using InputDataType = ovms::HttpPayload; using OutputDataType = std::string; @@ -87,7 +87,7 @@ class GenaiEmbeddingsCalculatorOV : public CalculatorBase { RET_CHECK(!cc->Outputs().GetTags().empty()); cc->Inputs().Tag(INPUT_TAG_NAME).Set(); cc->Outputs().Tag(OUTPUT_TAG_NAME).Set(); - cc->InputSidePackets().Tag(EMBEDDINGS_SESSION_SIDE_PACKET_TAG).Set(); + cc->InputSidePackets().Tag(GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG).Set(); return absl::OkStatus(); } @@ -101,7 +101,7 @@ class GenaiEmbeddingsCalculatorOV : public CalculatorBase { OVMS_PROFILE_FUNCTION(); SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "GenaiEmbeddingsCalculatorOV [Node: {}] Open start", cc->NodeName()); auto servableMap = cc->InputSidePackets() - .Tag(EMBEDDINGS_SESSION_SIDE_PACKET_TAG) + .Tag(GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG) .Get(); auto it = servableMap.find(cc->NodeName()); RET_CHECK(it != servableMap.end()) << "Could not find initialized Embeddings node named: " << cc->NodeName(); @@ -122,7 +122,6 @@ class GenaiEmbeddingsCalculatorOV : public CalculatorBase { SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "Request uri: {}", payload.uri); ov::Tensor embeddingsTensor; - //size_t received_batch_size = 1; size_t max_context_length = 1024; // default allowed input length. Otherwise, it will be read from model config.json file ov::genai::TokenizedInputs tokens; ov::Tensor typeIds; @@ -132,6 +131,8 @@ class GenaiEmbeddingsCalculatorOV : public CalculatorBase { SPDLOG_LOGGER_DEBUG(embeddings_calculator_logger, "max_position_embeddings nor max_trained_positions included in config.json. 
Using default value {}", max_context_length); } + // TODO: Tokenizer endpoint + ovms::EmbeddingsHandler handler(*payload.parsedJson); auto parseRequestStartTime = std::chrono::high_resolution_clock::now(); absl::Status status = handler.parseRequest(); @@ -148,14 +149,12 @@ class GenaiEmbeddingsCalculatorOV : public CalculatorBase { auto input = handler.getInput(); if (auto strings = std::get_if>(&input)) { ov::AnyMap& params = handler.getParameters(); - //size_t received_batch_size = strings->size(); if (cc->Options().truncate() && params.find("max_length") == params.end()) { params["max_length"] = max_context_length; } - - // TODO:handler.setPromptTokensUsage(attendedTokens); - + // TODO:handler.setPromptTokensUsage(attendedTokens); Need info from genai, currently private + // handler.setPromptTokensUsage(attendedTokens); ov::genai::EmbeddingResults documents_embeddings = embeddings_session->m_pipeline->embed_documents(*strings); std::cout << std::endl << "documents_embeddings:" << std::endl; printVariant(documents_embeddings); diff --git a/src/embeddings/genai_embeddings_servable.cpp b/src/embeddings/genai_embeddings_servable.cpp index 699e647597..aab02a6aa0 100644 --- a/src/embeddings/genai_embeddings_servable.cpp +++ b/src/embeddings/genai_embeddings_servable.cpp @@ -27,6 +27,8 @@ #include "openvino/opsets/opset3.hpp" #include "openvino/opsets/opset8.hpp" +#include "../json_parser.hpp" + using namespace ov::genai; using namespace ov; @@ -39,14 +41,28 @@ void GenaiEmbeddingsServable::initialize(const std::string& modelDir, const std: parsedModelsPath = fsModelsPath.string(); } - /*ov::AnyMap properties; + ov::AnyMap properties; auto status = JsonParser::parsePluginConfig(pluginConfig, properties); if (!status.ok()) { SPDLOG_ERROR("Error during embeddings node plugin_config option parsing to JSON: {}", pluginConfig); - }*/ + } + + TextEmbeddingPipeline::Config config(properties); + switch (pooling) { + case mediapipe::EmbeddingsCalculatorOVOptions_Pooling_CLS: + 
config.pooling_type = TextEmbeddingPipeline::PoolingType::CLS; + break; + case mediapipe::EmbeddingsCalculatorOVOptions_Pooling_LAST: + config.pooling_type = TextEmbeddingPipeline::PoolingType::LAST_TOKEN; + break; + case mediapipe::EmbeddingsCalculatorOVOptions_Pooling_MEAN: + config.pooling_type = TextEmbeddingPipeline::PoolingType::MEAN; + break; + default: + config.pooling_type = TextEmbeddingPipeline::PoolingType::CLS; + break; + } - TextEmbeddingPipeline::Config config; - config.pooling_type = TextEmbeddingPipeline::PoolingType::MEAN; m_pipeline = std::make_unique(parsedModelsPath, targetDevice, config); } diff --git a/src/mediapipe_internal/mediapipegraphexecutor.cpp b/src/mediapipe_internal/mediapipegraphexecutor.cpp index 2a35235a55..51fc4ff5c8 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.cpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.cpp @@ -90,6 +90,7 @@ const std::string MediapipeGraphExecutor::PYTHON_SESSION_SIDE_PACKET_TAG = "py"; const std::string MediapipeGraphExecutor::LLM_SESSION_SIDE_PACKET_TAG = "llm"; const std::string MediapipeGraphExecutor::IMAGE_GEN_SESSION_SIDE_PACKET_TAG = "pipes"; const std::string MediapipeGraphExecutor::EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "embeddings_servable"; +const std::string MediapipeGraphExecutor::GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG = "genai_embeddings_servable"; const std::string MediapipeGraphExecutor::RERANK_SESSION_SIDE_PACKET_TAG = "rerank_servable"; const std::string MediapipeGraphExecutor::STT_SESSION_SIDE_PACKET_TAG = "s2t_servable"; const std::string MediapipeGraphExecutor::TTS_SESSION_SIDE_PACKET_TAG = "t2s_servable"; diff --git a/src/mediapipe_internal/mediapipegraphexecutor.hpp b/src/mediapipe_internal/mediapipegraphexecutor.hpp index 7dc2ea7df9..fe5d32c398 100644 --- a/src/mediapipe_internal/mediapipegraphexecutor.hpp +++ b/src/mediapipe_internal/mediapipegraphexecutor.hpp @@ -92,6 +92,7 @@ class MediapipeGraphExecutor { static const std::string 
LLM_SESSION_SIDE_PACKET_TAG; static const std::string IMAGE_GEN_SESSION_SIDE_PACKET_TAG; static const std::string EMBEDDINGS_SESSION_SIDE_PACKET_TAG; + static const std::string GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG; static const std::string RERANK_SESSION_SIDE_PACKET_TAG; static const std::string STT_SESSION_SIDE_PACKET_TAG; static const std::string TTS_SESSION_SIDE_PACKET_TAG; @@ -154,7 +155,7 @@ class MediapipeGraphExecutor { inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.genAiServableMap).At(STARTING_TIMESTAMP); inputSidePackets[IMAGE_GEN_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.imageGenPipelinesMap).At(STARTING_TIMESTAMP); inputSidePackets[EMBEDDINGS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.embeddingsServableMap).At(STARTING_TIMESTAMP); - + inputSidePackets[GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.genaiEmbeddingsServableMap).At(STARTING_TIMESTAMP); inputSidePackets[RERANK_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.rerankServableMap).At(STARTING_TIMESTAMP); inputSidePackets[STT_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.sttServableMap).At(STARTING_TIMESTAMP); inputSidePackets[TTS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.ttsServableMap).At(STARTING_TIMESTAMP); @@ -304,6 +305,7 @@ class MediapipeGraphExecutor { #endif inputSidePackets[LLM_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.genAiServableMap).At(STARTING_TIMESTAMP); inputSidePackets[EMBEDDINGS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.embeddingsServableMap).At(STARTING_TIMESTAMP); + inputSidePackets[GENAI_EMBEDDINGS_SESSION_SIDE_PACKET_TAG] = mediapipe::MakePacket(this->sidePacketMaps.genaiEmbeddingsServableMap).At(STARTING_TIMESTAMP); // Add image generation side packet in case image generation allow for streaming } diff 
--git a/src/test/embeddings/genai_config_embeddings.json b/src/test/embeddings/genai_config_embeddings.json new file mode 100644 index 0000000000..910960560c --- /dev/null +++ b/src/test/embeddings/genai_config_embeddings.json @@ -0,0 +1,10 @@ +{ + "model_config_list": [], + "mediapipe_config_list": [ + { + "name":"embeddings_ov_no_norm", + "base_path":"/ovms/src/test/embeddings/", + "graph_path":"/ovms/src/test/embeddings/graph_genai_ov_no_norm.pbtxt" + } + ] +} diff --git a/src/test/embeddings/graph_genai_ov_no_norm.pbtxt b/src/test/embeddings/graph_genai_ov_no_norm.pbtxt index 4080287001..ce697cfedc 100644 --- a/src/test/embeddings/graph_genai_ov_no_norm.pbtxt +++ b/src/test/embeddings/graph_genai_ov_no_norm.pbtxt @@ -20,7 +20,7 @@ node { calculator: "GenaiEmbeddingsCalculatorOV" input_stream: "REQUEST_PAYLOAD:input" output_stream: "RESPONSE_PAYLOAD:output" - input_side_packet: "GENAI_EMBEDDINGS_NODE_RESOURCES:embeddings_servable" + input_side_packet: "GENAI_EMBEDDINGS_NODE_RESOURCES:genai_embeddings_servable" node_options: { [type.googleapis.com/mediapipe.EmbeddingsCalculatorOVOptions]: { models_path: "/ovms/src/test/llm_testing/thenlper/gte-small/ov" diff --git a/src/test/embeddingsnode_test.cpp b/src/test/embeddingsnode_test.cpp index f00a99994a..7711a22a16 100644 --- a/src/test/embeddingsnode_test.cpp +++ b/src/test/embeddingsnode_test.cpp @@ -68,6 +68,43 @@ class V3HttpTest : public ::testing::Test { } }; +class GenaiEmbeddingsHttpTest : public V3HttpTest, public ::testing::WithParamInterface { +protected: + static std::unique_ptr t; + +public: + static void SetUpTestSuite() { + std::string port = "9173"; + std::string configPath = getGenericFullPathForSrcTest("/ovms/src/test/embeddings/genai_config_embeddings.json"); + SetUpSuite(port, configPath, t); + } + + static void TearDownTestSuite() { + TearDownSuite(t); + } +}; +std::unique_ptr GenaiEmbeddingsHttpTest::t; + +TEST_P(GenaiEmbeddingsHttpTest, positiveLongInput) { + auto modelName = GetParam(); + 
std::string words; + for (int i = 0; i < 500; i++) { + words += "hello "; + } + std::string requestBody = "{ \"model\": \"" + modelName + "\", \"input\": \"" + words + " \"}"; + + Status status = handler->dispatchToProcessor(endpointEmbeddings, requestBody, &response, comp, responseComponents, writer, multiPartParser); + ASSERT_EQ(status, + ovms::StatusCode::OK) + << status.string(); + rapidjson::Document d; + rapidjson::ParseResult ok = d.Parse(response.c_str()); + std::cout << response << std::endl; + ASSERT_EQ(ok.Code(), 0); + ASSERT_TRUE(d["usage"]["prompt_tokens"].IsInt()); + ASSERT_EQ(d["usage"]["prompt_tokens"], 502); // 500 words + 2 special tokens +} + class EmbeddingsHttpTest : public V3HttpTest, public ::testing::WithParamInterface { protected: static std::unique_ptr t; @@ -443,6 +480,11 @@ INSTANTIATE_TEST_SUITE_P( EmbeddingsHttpTest, ::testing::Values("embeddings_ov")); +INSTANTIATE_TEST_SUITE_P( + GenaiEmbeddingsHttpTestInstances, + GenaiEmbeddingsHttpTest, + ::testing::Values("embeddings_ov_no_norm")); + static bool isMpReady(const std::string name) { ovms::Server& server = ovms::Server::instance(); const ovms::Module* servableModule = server.getModule(ovms::SERVABLE_MANAGER_MODULE_NAME); diff --git a/src/test/mediapipeflow_test.cpp b/src/test/mediapipeflow_test.cpp index 55b6ab96ed..480ddb047a 100644 --- a/src/test/mediapipeflow_test.cpp +++ b/src/test/mediapipeflow_test.cpp @@ -2683,7 +2683,7 @@ class MediapipeSerialization : public ::testing::Test { std::vector inputNames, std::vector outputNames, const PythonNodeResourcesMap& pythonNodeResourcesMap, MediapipeServableMetricReporter* mediapipeServableMetricReporter) : - MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, {}, {}, {}, {}, nullptr, mediapipeServableMetricReporter) {} + MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, {}, {}, {}, {}, {}, 
nullptr, mediapipeServableMetricReporter) {} }; protected: diff --git a/src/test/pythonnode_test.cpp b/src/test/pythonnode_test.cpp index 54c9acbfa1..d3b16a62ca 100644 --- a/src/test/pythonnode_test.cpp +++ b/src/test/pythonnode_test.cpp @@ -1005,7 +1005,7 @@ class MockedMediapipeGraphExecutorPy : public ovms::MediapipeGraphExecutor { const PythonNodeResourcesMap& pythonNodeResourcesMap, PythonBackend* pythonBackend, MediapipeServableMetricReporter* mediapipeServableMetricReporter) : - MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, {}, {}, {}, {}, pythonBackend, mediapipeServableMetricReporter) {} + MediapipeGraphExecutor(name, version, config, inputTypes, outputTypes, inputNames, outputNames, pythonNodeResourcesMap, {}, {}, {}, {}, {}, {}, pythonBackend, mediapipeServableMetricReporter) {} }; TEST_F(PythonFlowTest, SerializePyObjectWrapperToKServeResponse) { diff --git a/src/test/streaming_test.cpp b/src/test/streaming_test.cpp index 02e7c4178a..558b3789cd 100644 --- a/src/test/streaming_test.cpp +++ b/src/test/streaming_test.cpp @@ -359,7 +359,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::KFS_REQUEST}}, {{"out", mediapipe_packet_type_enum::KFS_RESPONSE}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving 3 requests and disconnection prepareRequest(this->firstRequest, {{"in", 3.5f}}); @@ -416,7 +416,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving 3 requests and disconnection prepareRequest(this->firstRequest, {{"in", 3.5f}}); // no timestamp specified, server will 
assign one @@ -559,7 +559,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving 3 requests with manually (client) assigned ascending order of timestamp and disconnection prepareRequest(this->firstRequest, {{"in", 3.5f}}, 3); // first request with timestamp 3 @@ -604,7 +604,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock only 1 request and disconnect immediately prepareRequest(this->firstRequest, {{"in", 3.5f}}); @@ -1230,7 +1230,7 @@ node { {"out3", mediapipe_packet_type_enum::OVTENSOR}}, {"in1", "in2", "in3"}, {"out1", "out2", "out3"}, - {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise; std::future signalFuture = signalPromise.get_future(); @@ -1282,7 +1282,7 @@ node { {"out3", mediapipe_packet_type_enum::OVTENSOR}}, {"in1", "in2", "in3"}, {"out1", "out2", "out3"}, - {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise; std::future signalFuture = signalPromise.get_future(); @@ -1317,7 +1317,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise; std::future signalFuture = signalPromise.get_future(); @@ 
-1351,7 +1351,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"wrong_name"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // cannot install observer due to wrong output name (should never happen due to validation) + {"in"}, {"wrong_name"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // cannot install observer due to wrong output name (should never happen due to validation) EXPECT_CALL(this->stream, Read(_)).Times(0); EXPECT_CALL(this->stream, Write(_, _)).Times(0); @@ -1376,7 +1376,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; prepareRequest(this->firstRequest, {}); EXPECT_CALL(this->stream, Read(_)) @@ -1404,7 +1404,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise; std::future signalFuture = signalPromise.get_future(); @@ -1440,7 +1440,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; prepareRequest(this->firstRequest, {{"in", 3.5f}}); ASSERT_EQ(executor.inferStream(this->firstRequest, this->stream, this->executionContext), StatusCode::MEDIAPIPE_GRAPH_INITIALIZATION_ERROR); @@ -1463,7 +1463,7 @@ node { this->name, this->version, config, {{"in", 
mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Invalid request - missing data in buffer prepareInvalidRequest(this->firstRequest, {"in"}); // no timestamp specified, server will assign one @@ -1498,7 +1498,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise[3]; std::future signalFuture[3] = { @@ -1545,7 +1545,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; prepareRequest(this->firstRequest, {{"in", 3.5f}}, 0); EXPECT_CALL(this->stream, Read(_)) @@ -1573,7 +1573,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; prepareRequest(this->firstRequest, {{"in", 3.5f}}); setRequestTimestamp(this->firstRequest, std::string("not an int")); @@ -1608,7 +1608,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Timestamps not allowed in stream // Expect continuity of operation and 
response with error message @@ -1650,7 +1650,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Allowed in stream for (auto timestamp : std::vector<::mediapipe::Timestamp>{ @@ -1686,7 +1686,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving 3 requests and disconnection prepareRequestWithParam(this->firstRequest, {{"in", 3.5f}}, {"val", 65}); // request with parameter val @@ -1723,7 +1723,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving the invalid request and disconnection // Request with invalid param py (special pythons session side packet) @@ -1752,7 +1752,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; prepareRequest(this->firstRequest, {{"in", 3.5f}}); // missing required request param EXPECT_CALL(this->stream, Read(_)).Times(0); @@ -1778,7 +1778,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, 
this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; // Mock receiving 2 requests and disconnection prepareRequest(this->firstRequest, {{"in", 3.5f}}, std::nullopt, this->name, this->version); // no timestamp specified, server will assign one @@ -1812,7 +1812,7 @@ node { this->name, this->version, config, {{"in", mediapipe_packet_type_enum::OVTENSOR}}, {{"out", mediapipe_packet_type_enum::OVTENSOR}}, - {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; + {"in"}, {"out"}, {}, {}, {}, {}, {}, {}, {}, nullptr, this->reporter.get()}; std::promise signalPromise; std::future signalFuture = signalPromise.get_future();