fix(semantic): ensure memory processing always reports completion status #951
Open · deepakdevp wants to merge 2 commits into volcengine:main from deepakdevp:fix/memory-semantic-queue-stall (base: main)

+217 −4
Changes from all commits
```python
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0

"""Tests for memory semantic queue stall fix (issue #864).

Ensures that _process_memory_directory() error paths propagate exceptions
so that on_dequeue() always calls report_success() or report_error().
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.queuefs.semantic_processor import SemanticProcessor


def _make_msg(uri="viking://user/memories", context_type="memory", **kwargs):
    """Build a minimal SemanticMsg for testing."""
    defaults = {
        "id": "test-msg-1",
        "uri": uri,
        "context_type": context_type,
        "recursive": False,
        "role": "root",
        "account_id": "acc1",
        "user_id": "usr1",
        "agent_id": "test-agent",
        "telemetry_id": "",
        "target_uri": "",
        "lifecycle_lock_handle_id": "",
        "changes": None,
        "is_code_repo": False,
    }
    defaults.update(kwargs)
    return SemanticMsg.from_dict(defaults)


def _build_data(msg: SemanticMsg) -> dict:
    """Wrap a SemanticMsg into the dict format on_dequeue expects."""
    return msg.to_dict()


@pytest.mark.asyncio
async def test_memory_empty_dir_still_reports_success():
    """When viking_fs.ls returns an empty list, report_success() must be called."""
    processor = SemanticProcessor()

    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(return_value=[])

    msg = _make_msg()
    data = _build_data(msg)

    success_called = False

    def on_success():
        nonlocal success_called
        success_called = True

    error_called = False

    def on_error(error_msg, error_data=None):
        nonlocal error_called
        error_called = True

    processor.set_callbacks(on_success, on_error)

    with (
        patch(
            "openviking.storage.queuefs.semantic_processor.get_viking_fs",
            return_value=fake_fs,
        ),
        patch(
            "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
            return_value=None,
        ),
    ):
        await processor.on_dequeue(data)

    assert success_called, "report_success() was not called for empty memory directory"
    assert not error_called, "report_error() should not be called for empty directory"


@pytest.mark.asyncio
async def test_memory_ls_error_reports_error():
    """When viking_fs.ls raises a filesystem error, report_error() must be called.

    Uses a real classify_api_error (no mock) — FileNotFoundError is classified
    as permanent by the real classifier, so the processor calls report_error().
    """
    processor = SemanticProcessor()

    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(side_effect=FileNotFoundError("/memories not found"))

    msg = _make_msg()
    data = _build_data(msg)

    success_called = False

    def on_success():
        nonlocal success_called
        success_called = True

    error_called = False
    error_info = {}

    def on_error(error_msg, error_data=None):
        nonlocal error_called
        error_called = True
        error_info["msg"] = error_msg

    processor.set_callbacks(on_success, on_error)

    with (
        patch(
            "openviking.storage.queuefs.semantic_processor.get_viking_fs",
            return_value=fake_fs,
        ),
        patch(
            "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
            return_value=None,
        ),
    ):
        await processor.on_dequeue(data)

    assert error_called, "report_error() was not called when ls() raised an exception"
    assert not success_called, "report_success() should not be called on ls() error"
    assert "/memories not found" in error_info["msg"]


@pytest.mark.asyncio
async def test_memory_write_error_reports_error():
    """When abstract/overview write raises PermissionError, report_error() is called.

    Exercises the write failure path with real classify_api_error — PermissionError
    is classified as permanent, so the processor calls report_error().
    """
    processor = SemanticProcessor()

    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(return_value=[{"name": "file1.md", "isDir": False}])
    fake_fs.read_file = AsyncMock(return_value="some content")
    fake_fs.write_file = AsyncMock(side_effect=PermissionError("Permission denied"))

    msg = _make_msg()
    data = _build_data(msg)

    success_called = False

    def on_success():
        nonlocal success_called
        success_called = True

    error_called = False
    error_info = {}

    def on_error(error_msg, error_data=None):
        nonlocal error_called
        error_called = True
        error_info["msg"] = error_msg

    processor.set_callbacks(on_success, on_error)

    with (
        patch(
            "openviking.storage.queuefs.semantic_processor.get_viking_fs",
            return_value=fake_fs,
        ),
        patch(
            "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
            return_value=None,
        ),
        patch.object(
            processor,
            "_generate_single_file_summary",
            new=AsyncMock(return_value={"name": "file1.md", "summary": "test summary"}),
        ),
        patch.object(
            processor,
            "_generate_overview",
            new=AsyncMock(return_value="# Overview\ntest overview"),
        ),
    ):
        await processor.on_dequeue(data)

    assert error_called, "report_error() was not called when write() raised PermissionError"
    assert not success_called, "report_success() should not be called on write error"
    assert "Permission denied" in error_info["msg"]
```
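All three tests exercise the same invariant: every dequeued message must end in exactly one completion callback, whether processing succeeds, finds nothing to do, or raises. A minimal, self-contained sketch of that invariant, using a hypothetical `QueueProcessor` rather than the real `SemanticProcessor` (whose `on_dequeue()` also classifies and re-enqueues errors):

```python
import asyncio


class QueueProcessor:
    """Hypothetical stand-in for SemanticProcessor: every dequeue must
    end in exactly one of on_success() / on_error()."""

    def __init__(self):
        self._on_success = None
        self._on_error = None

    def set_callbacks(self, on_success, on_error):
        self._on_success = on_success
        self._on_error = on_error

    async def on_dequeue(self, data, process):
        # The property under test: errors inside processing must propagate
        # to this try/except instead of being swallowed by an early return,
        # so the queue item never gets stuck in "in_progress".
        try:
            await process(data)
        except Exception as exc:
            self._on_error(str(exc))
        else:
            self._on_success()


async def main():
    calls = []
    p = QueueProcessor()
    p.set_callbacks(
        lambda: calls.append("success"),
        lambda msg, error_data=None: calls.append(f"error: {msg}"),
    )

    async def empty_dir(data):
        return []  # empty directory: still a success

    async def fs_failure(data):
        raise FileNotFoundError("/memories not found")  # must report an error

    await p.on_dequeue({}, empty_dir)
    await p.on_dequeue({}, fs_failure)
    return calls


print(asyncio.run(main()))  # ['success', 'error: /memories not found']
```

The key structural choice mirrored from the tests is that completion reporting lives in `on_dequeue()` itself, so no processing path can return without triggering a callback.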
**[Bug] (blocking)** Re-raising here fixes the silent early return, but it does not actually make this failure path report an error in production. `on_dequeue()` still routes non-permanent exceptions through the re-enqueue branch, and `classify_api_error()` only recognizes 401/403/5xx/timeout patterns. That means the common filesystem failures here, such as `FileNotFoundError`, permission denied, or local I/O errors, are classified as `unknown`, re-enqueued, and ultimately counted as success instead of hitting `report_error()`. So this PR removes the stuck `in_progress` symptom, but it does not guarantee the behavior intended in the PR description, and it can turn invalid memory URIs into infinite retries. Please either classify these directory read/write failures as permanent at the source, or extend the error-classification path so local filesystem failures are reported as queue errors rather than retried forever.
Addressed in b8b504b. Added `_PERMANENT_IO_ERRORS = (FileNotFoundError, PermissionError, IsADirectoryError, NotADirectoryError)` with an `isinstance` check at the top of `classify_api_error()`, so filesystem errors are classified as `"permanent"` and hit `report_error()` instead of being re-enqueued. This prevents both the infinite retry loop and the false success counting.
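The shape of that fix can be sketched as follows. This is a hedged approximation: the `_PERMANENT_IO_ERRORS` tuple and the `isinstance` check come from the comment above, while the message-pattern matching (the specific tokens and the `"transient"`/`"unknown"` labels) is an assumption about how the real `classify_api_error()` recognizes 401/403/5xx/timeout patterns.

```python
# Deterministic local filesystem failures: retrying the same message will
# fail the same way, so classify them as permanent up front.
_PERMANENT_IO_ERRORS = (
    FileNotFoundError,
    PermissionError,
    IsADirectoryError,
    NotADirectoryError,
)


def classify_api_error(exc: Exception) -> str:
    """Sketch of the classifier; the real pattern matching may differ."""
    if isinstance(exc, _PERMANENT_IO_ERRORS):
        return "permanent"
    msg = str(exc).lower()
    # Auth failures will not succeed on retry either.
    if any(tok in msg for tok in ("401", "403", "unauthorized", "forbidden")):
        return "permanent"
    # Server errors and timeouts are worth re-enqueueing.
    if any(tok in msg for tok in ("timeout", "timed out", "500", "502", "503", "504")):
        return "transient"
    return "unknown"


print(classify_api_error(FileNotFoundError("/memories not found")))  # permanent
```

Because the `isinstance` check runs before any string matching, a `FileNotFoundError` whose message happens to contain "timeout" would still be classified as permanent, which is the behavior the review asked for.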