Conversation
Pull request overview
This PR updates the legacy LLM and VLM servables to react to client disconnects by signaling disconnection in the execution context and attempting to cancel streaming generation via the OpenVINO GenAI TextStreamer callback.
Changes:
- Added a `signalDisconnection()` helper on the legacy execution contexts to set a disconnect flag and notify waiters.
- Updated the legacy streaming callbacks to return `ov::genai::StreamingStatus::CANCEL` when a disconnect is signaled.
- Wired the disconnection callbacks (and an immediate disconnect check) to call `signalDisconnection()`.
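The helper described in the first bullet can be sketched as follows. The struct and field names mirror those quoted later in this review, but this is a hypothetical stand-in for the actual OVMS execution-context headers, not the real implementation.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>

// Hypothetical mirror of the legacy execution context fields touched by the PR.
// Field names follow the review text; the real types live in the OVMS sources.
struct LegacyExecutionContext {
    std::atomic<bool> clientDisconnected{false};
    std::mutex mutex;
    std::condition_variable executionInProgress;

    // Sets the disconnect flag and wakes any thread waiting on executionInProgress,
    // so both the streamer callback and the waiting request thread observe the disconnect.
    void signalDisconnection() {
        clientDisconnected.store(true);
        executionInProgress.notify_all();
    }
};
```

Using an atomic flag plus `notify_all()` keeps the helper safe to call from the gRPC/HTTP disconnection callback thread while the generation thread is mid-stream.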
Reviewed changes
Copilot reviewed 4 out of 4 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| `src/llm/visual_language_model/legacy/servable.hpp` | Adds `signalDisconnection()` helper to the VLM legacy execution context. |
| `src/llm/visual_language_model/legacy/servable.cpp` | Cancels streaming via the TextStreamer callback on disconnect; calls `signalDisconnection()` when a disconnect occurs. |
| `src/llm/language_model/legacy/servable.hpp` | Adds `signalDisconnection()` helper to the LLM legacy execution context. |
| `src/llm/language_model/legacy/servable.cpp` | Cancels streaming via the TextStreamer callback on disconnect; calls `signalDisconnection()` when a disconnect occurs. |
Comments suppressed due to low confidence (2)
src/llm/language_model/legacy/servable.cpp:98
- There are unit tests for legacy servable behavior in `src/test/llm/llmnode_test.cpp`, but nothing covers the new disconnect-driven cancellation behavior. Please add a test that exercises the disconnect path (e.g., ensuring the streamer callback returns CANCEL when `clientDisconnected` is set, and/or that a registered disconnection callback stops an in-flight streaming request).
```cpp
if (legacyExecutionContext->apiHandler->isStream()) {
    legacyExecutionContext->lastStreamerCallbackOutput = "";  // initialize with empty string
    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress,
                     &mutex = legacyExecutionContext->mutex,
                     &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput,
                     &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
        SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
        if (clientDisconnected.load()) {
            executionInProgress.notify_one();
            return ov::genai::StreamingStatus::CANCEL;
        }
        {
            std::lock_guard<std::mutex> lock(mutex);
            lastStreamerCallbackOutput += text;
            executionInProgress.notify_one();
        }
        return ov::genai::StreamingStatus::RUNNING;
    };
```
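A disconnect-path test of the kind the comment requests could take roughly the following shape. This sketch stubs `ov::genai::StreamingStatus` and the context fields with plain standard-library types and uses bare assertions rather than the repo's gtest fixtures, so every name here is a stand-in, not the actual test code.

```cpp
#include <atomic>
#include <condition_variable>
#include <mutex>
#include <string>

// Stand-in for ov::genai::StreamingStatus; the real enum lives in OpenVINO GenAI.
enum class StreamingStatus { RUNNING, CANCEL };

// Builds a callback with the same shape as the one quoted above, over stubbed fields.
auto makeStreamerCallback(std::atomic<bool>& clientDisconnected,
                          std::mutex& mutex,
                          std::condition_variable& executionInProgress,
                          std::string& lastStreamerCallbackOutput) {
    return [&](std::string text) {
        if (clientDisconnected.load()) {
            executionInProgress.notify_one();
            return StreamingStatus::CANCEL;
        }
        {
            std::lock_guard<std::mutex> lock(mutex);
            lastStreamerCallbackOutput += text;
            executionInProgress.notify_one();
        }
        return StreamingStatus::RUNNING;
    };
}
```

A test would drive the callback once with the flag clear (expecting RUNNING and accumulated output) and once with the flag set (expecting CANCEL and no further accumulation).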
src/llm/visual_language_model/legacy/servable.cpp:109
- Please add test coverage for the new disconnect cancellation behavior for the VLM legacy servable as well (streamer callback returning CANCEL / disconnect callback stopping generation). Existing LLM-node tests indicate this repo expects gtest coverage for servable logic changes.
```cpp
if (legacyExecutionContext->apiHandler->isStream()) {
    legacyExecutionContext->lastStreamerCallbackOutput = "";  // initialize with empty string
    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress,
                     &mutex = legacyExecutionContext->mutex,
                     &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput,
                     &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
        SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
        if (clientDisconnected.load()) {
            executionInProgress.notify_one();
            return ov::genai::StreamingStatus::CANCEL;
        }
        {
            std::lock_guard<std::mutex> lock(mutex);
            lastStreamerCallbackOutput += text;
            executionInProgress.notify_one();
        }
        return ov::genai::StreamingStatus::RUNNING;
    };
    legacyExecutionContext->textStreamer = std::make_shared<ov::genai::TextStreamer>(getProperties()->tokenizer, callback);
}
```
```diff
 if (legacyExecutionContext->apiHandler->isStream()) {
     legacyExecutionContext->lastStreamerCallbackOutput = "";  // initialize with empty string
-    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput](std::string text) {
+    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
     SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
+    if (clientDisconnected.load()) {
```
The new cancellation path is only active for streaming requests (where textStreamer/callback exist). For non-streaming requests textStreamer stays null, so a mid-generation disconnect will still let pipe->generate(...) run to completion and consume compute. If the intent is to stop generation on disconnection in all cases, consider wiring a cancellation mechanism for unary requests too (e.g., create a lightweight streamer callback that only checks clientDisconnected, or use a stop-handle approach like the continuous batching servable).
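One way to realize the suggestion above is a minimal streamer callback that never buffers text and exists only to propagate the disconnect flag into `pipe->generate(...)`. This is a hedged sketch, not the repo's code: `StreamingStatus` is stubbed here (the real enum is `ov::genai::StreamingStatus`), and `makeUnaryCancellationCallback` is a hypothetical helper name.

```cpp
#include <atomic>
#include <string>

// Stand-in for ov::genai::StreamingStatus; the real enum lives in OpenVINO GenAI.
enum class StreamingStatus { RUNNING, CANCEL };

// Hypothetical minimal callback for unary (non-streaming) requests: it ignores
// the generated text and only checks the disconnect flag, so a mid-generation
// client disconnect can stop generation early instead of running to completion.
inline auto makeUnaryCancellationCallback(std::atomic<bool>& clientDisconnected) {
    return [&clientDisconnected](const std::string& /*text*/) {
        return clientDisconnected.load() ? StreamingStatus::CANCEL
                                         : StreamingStatus::RUNNING;
    };
}
```

Passing such a callback to the generate call in unary mode would reuse the same `clientDisconnected` flag the streaming path already sets, without touching `lastStreamerCallbackOutput` or the condition variable.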
```diff
 if (legacyExecutionContext->apiHandler->isStream()) {
     legacyExecutionContext->lastStreamerCallbackOutput = "";  // initialize with empty string
-    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput](std::string text) {
+    auto callback = [&executionInProgress = legacyExecutionContext->executionInProgress, &mutex = legacyExecutionContext->mutex, &lastStreamerCallbackOutput = legacyExecutionContext->lastStreamerCallbackOutput, &clientDisconnected = legacyExecutionContext->clientDisconnected](std::string text) {
     SPDLOG_LOGGER_TRACE(llm_calculator_logger, "Streamer callback executed with text: [{}]", text);
+    if (clientDisconnected.load()) {
+        executionInProgress.notify_one();
+        return ov::genai::StreamingStatus::CANCEL;
+    }
```
Same as LLM legacy servable: cancellation is only wired for streaming requests. In unary mode textStreamer is not created, so a mid-generation disconnect will not stop pipe->generate(...) early and can waste accelerator/CPU time.
Pull request overview
Copilot reviewed 4 out of 4 changed files in this pull request and generated no new comments.
No description provided.