Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -411,8 +411,7 @@ async def _process_memory_directory(self, msg: SemanticMsg) -> None:
try:
entries = await viking_fs.ls(dir_uri, ctx=ctx)
except Exception as e:
logger.warning(f"Failed to list memory directory {dir_uri}: {e}")
return
raise RuntimeError(f"Failed to list memory directory {dir_uri}: {e}") from e
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Bug] (blocking) Re-raising here fixes the silent early return, but it does not actually make this failure path report an error in production. on_dequeue() still routes non-permanent exceptions through the re-enqueue branch, and classify_api_error() only recognizes 401/403/5xx/timeout patterns. That means common filesystem failures here, such as FileNotFoundError, Permission denied, or local I/O errors, are classified as unknown, re-enqueued, and ultimately counted as success instead of report_error(). So this PR removes the stuck in_progress symptom, but it does not guarantee the intended issue behavior from the PR description, and it can turn invalid memory URIs into infinite retries. Please either classify these directory read/write failures as permanent at the source, or extend the error-classification path so local filesystem failures are reported as queue errors rather than retried forever.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in b8b504b. Added _PERMANENT_IO_ERRORS = (FileNotFoundError, PermissionError, IsADirectoryError, NotADirectoryError) with an isinstance check at the top of classify_api_error(), so filesystem errors are classified as "permanent" and hit report_error() instead of being re-enqueued. This prevents both the infinite retry loop and the false success counting.


file_paths: List[str] = []
for entry in entries:
Expand Down Expand Up @@ -496,8 +495,7 @@ async def _gen(idx: int, file_path: str) -> None:
await viking_fs.write_file(f"{dir_uri}/.abstract.md", abstract, ctx=ctx)
logger.info(f"Generated abstract.md and overview.md for {dir_uri}")
except Exception as e:
logger.error(f"Failed to write abstract/overview for {dir_uri}: {e}")
return
raise RuntimeError(f"Failed to write abstract/overview for {dir_uri}: {e}") from e

await self._vectorize_directory(
uri=dir_uri,
Expand Down
7 changes: 7 additions & 0 deletions openviking/utils/circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
# --- Error classification ---

_PERMANENT_PATTERNS = ("403", "401", "Forbidden", "Unauthorized", "AccountOverdue")
_PERMANENT_IO_ERRORS = (FileNotFoundError, PermissionError, IsADirectoryError, NotADirectoryError)

_TRANSIENT_PATTERNS = (
"429",
"500",
Expand All @@ -40,6 +42,11 @@ def classify_api_error(error: Exception) -> str:
"transient" — 429/5xx/timeout, safe to retry.
"unknown" — unrecognized, treated as transient by callers.
"""
# Check exception type first — the filesystem errors listed in _PERMANENT_IO_ERRORS are treated as permanent, whether raised directly or attached as __cause__
for exc in (error, getattr(error, "__cause__", None)):
if exc is not None and isinstance(exc, _PERMANENT_IO_ERRORS):
return "permanent"

texts = [str(error)]
if error.__cause__ is not None:
texts.append(str(error.__cause__))
Expand Down
190 changes: 190 additions & 0 deletions tests/storage/test_memory_semantic_stall.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd.
# SPDX-License-Identifier: Apache-2.0

"""Tests for memory semantic queue stall fix (issue #864).

Ensures that _process_memory_directory() error paths propagate exceptions
so that on_dequeue() always calls report_success() or report_error().
"""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from openviking.storage.queuefs.semantic_msg import SemanticMsg
from openviking.storage.queuefs.semantic_processor import SemanticProcessor


def _make_msg(uri="viking://user/memories", context_type="memory", **kwargs):
    """Create a SemanticMsg pre-filled with test defaults.

    ``uri`` and ``context_type`` populate the fields of the same name. Any
    additional keyword arguments override (or extend) the default field
    values before the dict is passed to ``SemanticMsg.from_dict``.
    """
    base_fields = {
        "id": "test-msg-1",
        "uri": uri,
        "context_type": context_type,
        "recursive": False,
        "role": "root",
        "account_id": "acc1",
        "user_id": "usr1",
        "agent_id": "test-agent",
        "telemetry_id": "",
        "target_uri": "",
        "lifecycle_lock_handle_id": "",
        "changes": None,
        "is_code_repo": False,
    }
    # Caller-supplied values win over the defaults above.
    merged_fields = {**base_fields, **kwargs}
    return SemanticMsg.from_dict(merged_fields)


def _build_data(msg: SemanticMsg) -> dict:
    """Serialize *msg* into the plain dict payload handed to ``on_dequeue``."""
    payload = msg.to_dict()
    return payload


@pytest.mark.asyncio
async def test_memory_empty_dir_still_reports_success():
    """An empty listing from viking_fs.ls must end in report_success(), not report_error()."""
    processor = SemanticProcessor()

    # Filesystem stub whose directory listing is empty.
    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(return_value=[])

    data = _build_data(_make_msg())

    # Record every callback invocation; a non-empty list means "was called".
    success_calls = []
    error_calls = []

    def on_success():
        success_calls.append(True)

    def on_error(error_msg, error_data=None):
        error_calls.append(error_msg)

    processor.set_callbacks(on_success, on_error)

    fs_patch = patch(
        "openviking.storage.queuefs.semantic_processor.get_viking_fs",
        return_value=fake_fs,
    )
    telemetry_patch = patch(
        "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
        return_value=None,
    )
    with fs_patch, telemetry_patch:
        await processor.on_dequeue(data)

    assert success_calls, "report_success() was not called for empty memory directory"
    assert not error_calls, "report_error() should not be called for empty directory"


@pytest.mark.asyncio
async def test_memory_ls_error_reports_error():
    """A filesystem error raised by viking_fs.ls must end in report_error().

    classify_api_error is deliberately left unpatched: the test relies on the
    real classifier treating FileNotFoundError as permanent, which is what
    makes the processor call report_error().
    """
    processor = SemanticProcessor()

    # Filesystem stub whose directory listing fails.
    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(side_effect=FileNotFoundError("/memories not found"))

    data = _build_data(_make_msg())

    # Record every callback invocation; a non-empty list means "was called".
    success_calls = []
    error_messages = []

    def on_success():
        success_calls.append(True)

    def on_error(error_msg, error_data=None):
        error_messages.append(error_msg)

    processor.set_callbacks(on_success, on_error)

    fs_patch = patch(
        "openviking.storage.queuefs.semantic_processor.get_viking_fs",
        return_value=fake_fs,
    )
    telemetry_patch = patch(
        "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
        return_value=None,
    )
    with fs_patch, telemetry_patch:
        await processor.on_dequeue(data)

    assert error_messages, "report_error() was not called when ls() raised an exception"
    assert not success_calls, "report_success() should not be called on ls() error"
    # The most recently reported message must carry the underlying error text.
    assert "/memories not found" in error_messages[-1]


@pytest.mark.asyncio
async def test_memory_write_error_reports_error():
    """A PermissionError from the abstract/overview write must end in report_error().

    The summary and overview generators are stubbed so execution reaches the
    write step. classify_api_error is left unpatched: the test relies on the
    real classifier treating PermissionError as permanent.
    """
    processor = SemanticProcessor()

    # Filesystem stub: one readable file, but every write is refused.
    fake_fs = MagicMock()
    fake_fs.ls = AsyncMock(return_value=[{"name": "file1.md", "isDir": False}])
    fake_fs.read_file = AsyncMock(return_value="some content")
    fake_fs.write_file = AsyncMock(side_effect=PermissionError("Permission denied"))

    data = _build_data(_make_msg())

    # Record every callback invocation; a non-empty list means "was called".
    success_calls = []
    error_messages = []

    def on_success():
        success_calls.append(True)

    def on_error(error_msg, error_data=None):
        error_messages.append(error_msg)

    processor.set_callbacks(on_success, on_error)

    fs_patch = patch(
        "openviking.storage.queuefs.semantic_processor.get_viking_fs",
        return_value=fake_fs,
    )
    telemetry_patch = patch(
        "openviking.storage.queuefs.semantic_processor.resolve_telemetry",
        return_value=None,
    )
    summary_patch = patch.object(
        processor,
        "_generate_single_file_summary",
        new=AsyncMock(return_value={"name": "file1.md", "summary": "test summary"}),
    )
    overview_patch = patch.object(
        processor,
        "_generate_overview",
        new=AsyncMock(return_value="# Overview\ntest overview"),
    )
    with fs_patch, telemetry_patch, summary_patch, overview_patch:
        await processor.on_dequeue(data)

    assert error_messages, "report_error() was not called when write() raised PermissionError"
    assert not success_calls, "report_success() should not be called on write error"
    # The most recently reported message must carry the underlying error text.
    assert "Permission denied" in error_messages[-1]
18 changes: 18 additions & 0 deletions tests/utils/test_circuit_breaker.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,24 @@ def record_failures():
assert cb._failure_count == 200


def test_classify_filesystem_errors_as_permanent():
    """Directly raised path/permission filesystem errors classify as "permanent"."""
    from openviking.utils.circuit_breaker import classify_api_error

    filesystem_errors = (
        FileNotFoundError("/path/not/found"),
        PermissionError("Permission denied"),
        IsADirectoryError("Is a directory"),
        NotADirectoryError("Not a directory"),
    )
    for exc in filesystem_errors:
        assert classify_api_error(exc) == "permanent"


def test_classify_chained_filesystem_error_as_permanent():
    """A wrapper whose ``__cause__`` is a filesystem error classifies as "permanent"."""
    from openviking.utils.circuit_breaker import classify_api_error

    # Build the chain by hand rather than via ``raise ... from`` so only
    # ``__cause__`` links the two exceptions.
    outer = RuntimeError("storage layer failed")
    outer.__cause__ = FileNotFoundError("/missing")

    verdict = classify_api_error(outer)
    assert verdict == "permanent"


def test_classify_permanent_errors():
from openviking.utils.circuit_breaker import classify_api_error

Expand Down
Loading