class PatchResourceRequest(BaseModel):
    """Request model for patching resource metadata and summaries.

    At least one of meta, abstract, or overview must be provided.
    """

    uri: str
    meta: Optional[Dict[str, Any]] = None
    abstract: Optional[str] = None
    overview: Optional[str] = None
    telemetry: TelemetryRequest = False

    @model_validator(mode="after")
    def check_has_update(self):
        # Reject a PATCH that would be a no-op: every updatable field is absent.
        if all(value is None for value in (self.meta, self.abstract, self.overview)):
            raise ValueError("At least one of 'meta', 'abstract', or 'overview' must be provided")
        return self


@router.patch("/resources")
async def patch_resource(
    request: PatchResourceRequest,
    _ctx: RequestContext = Depends(get_request_context),
):
    """Patch resource metadata and/or summaries (L0 abstract, L1 overview)."""
    service = get_service()
    execution = await run_operation(
        operation="resources.patch_resource",
        telemetry=request.telemetry,
        fn=lambda: service.resources.patch_resource(
            uri=request.uri,
            ctx=_ctx,
            meta=request.meta,
            abstract=request.abstract,
            overview=request.overview,
        ),
    )
    # Same envelope as the other resource endpoints in this router.
    payload = Response(
        status="ok",
        result=execution.result,
        telemetry=execution.telemetry,
    )
    return payload.model_dump(exclude_none=True)
result=execution.result, + telemetry=execution.telemetry, + ).model_dump(exclude_none=True) diff --git a/openviking/service/resource_service.py b/openviking/service/resource_service.py index 1206f70b9..e6186c624 100644 --- a/openviking/service/resource_service.py +++ b/openviking/service/resource_service.py @@ -8,11 +8,16 @@ import json import time +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict, List, Optional +from openviking.core.context import Context, ContextLevel +from openviking.pyagfs.exceptions import AGFSClientError from openviking.server.identity import RequestContext from openviking.storage import VikingDBManager +from openviking.storage.id_utils import compute_record_id from openviking.storage.queuefs import get_queue_manager +from openviking.storage.queuefs.embedding_msg_converter import EmbeddingMsgConverter from openviking.storage.viking_fs import VikingFS from openviking.telemetry import get_current_telemetry from openviking.telemetry.resource_summary import ( @@ -27,6 +32,7 @@ ConflictError, DeadlineExceededError, InvalidArgumentError, + NotFoundError, NotInitializedError, ) from openviking_cli.utils import get_logger @@ -446,6 +452,97 @@ async def summarize( self._ensure_initialized() return await self._resource_processor.summarize(resource_uris, ctx, **kwargs) + async def patch_resource( + self, + uri: str, + ctx: RequestContext, + meta: Optional[Dict[str, Any]] = None, + abstract: Optional[str] = None, + overview: Optional[str] = None, + ) -> Dict[str, Any]: + """Patch resource metadata and/or summaries. + + Args: + uri: Resource URI (e.g., "viking://resources/doc_name") + ctx: Request context + meta: Updated metadata dict (merged with existing) + abstract: New L0 abstract text (rewrites .abstract.md and re-embeds) + overview: New L1 overview text (rewrites .overview.md and re-embeds) + + Returns: + Dict with uri, updated fields list, and skipped fields list. 
+ """ + self._ensure_initialized() + + # Verify resource exists — only map "not found" to 404, let other errors propagate + try: + await self._viking_fs.stat(uri, ctx=ctx) + except AGFSClientError as e: + err_msg = str(e).lower() + if "not found" in err_msg or "no such file or directory" in err_msg: + raise NotFoundError(uri) + raise + + updated: List[str] = [] + skipped: List[str] = [] + + # --- Meta update: fetch → merge → upsert with existing vector --- + if meta is not None: + record_id = compute_record_id(ctx.account_id, uri, level=2) + records = await self._vikingdb.get(ids=[record_id], ctx=ctx) + if records: + record = dict(records[0]) + existing_meta = record.get("meta") or {} + merged_meta = {**existing_meta, **meta} + record["meta"] = merged_meta + record["updated_at"] = datetime.now(timezone.utc).isoformat() + await self._vikingdb.upsert(record, ctx=ctx) + updated.append("meta") + else: + skipped.append("meta") + + # --- Abstract (L0) update: write file + re-embed --- + if abstract is not None: + abstract_uri = f"{uri}/.abstract.md" + await self._viking_fs.write_file(abstract_uri, abstract, ctx=ctx) + + context = Context( + uri=uri, + abstract=abstract, + level=ContextLevel.ABSTRACT, + context_type="resource", + account_id=ctx.account_id, + user=ctx.user, + owner_space="", + ) + context.vectorize.text = abstract + embedding_msg = EmbeddingMsgConverter.from_context(context) + if embedding_msg: + await self._vikingdb.enqueue_embedding_msg(embedding_msg) + updated.append("abstract") + + # --- Overview (L1) update: write file + re-embed --- + if overview is not None: + overview_uri = f"{uri}/.overview.md" + await self._viking_fs.write_file(overview_uri, overview, ctx=ctx) + + context = Context( + uri=uri, + abstract=overview, + level=ContextLevel.OVERVIEW, + context_type="resource", + account_id=ctx.account_id, + user=ctx.user, + owner_space="", + ) + context.vectorize.text = overview + embedding_msg = EmbeddingMsgConverter.from_context(context) + if 
embedding_msg: + await self._vikingdb.enqueue_embedding_msg(embedding_msg) + updated.append("overview") + + return {"uri": uri, "updated": updated, "skipped": skipped} + async def wait_processed(self, timeout: Optional[float] = None) -> Dict[str, Any]: """Wait for all queued processing to complete. diff --git a/openviking/storage/collection_schemas.py b/openviking/storage/collection_schemas.py index f37703a78..8e6d2e1fc 100644 --- a/openviking/storage/collection_schemas.py +++ b/openviking/storage/collection_schemas.py @@ -18,6 +18,7 @@ from openviking.models.embedder.base import EmbedResult from openviking.server.identity import RequestContext, Role from openviking.storage.errors import CollectionNotFoundError +from openviking.storage.id_utils import seed_uri_for_id from openviking.storage.queuefs.embedding_msg import EmbeddingMsg from openviking.storage.queuefs.named_queue import DequeueHandlerBase from openviking.storage.viking_vector_index_backend import VikingVectorIndexBackend @@ -201,16 +202,7 @@ def consume_request_stats(cls, telemetry_id: str) -> Optional[RequestQueueStats] @staticmethod def _seed_uri_for_id(uri: str, level: Any) -> str: """Build deterministic id seed URI from canonical uri + hierarchy level.""" - try: - level_int = int(level) - except (TypeError, ValueError): - level_int = 2 - - if level_int == 0: - return uri if uri.endswith("/.abstract.md") else f"{uri}/.abstract.md" - if level_int == 1: - return uri if uri.endswith("/.overview.md") else f"{uri}/.overview.md" - return uri + return seed_uri_for_id(uri, level) async def on_dequeue(self, data: Optional[Dict[str, Any]]) -> Optional[Dict[str, Any]]: """Process dequeued message and add embedding vector(s).""" diff --git a/openviking/storage/id_utils.py b/openviking/storage/id_utils.py new file mode 100644 index 000000000..0d1a52604 --- /dev/null +++ b/openviking/storage/id_utils.py @@ -0,0 +1,48 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. 
def seed_uri_for_id(uri: str, level: Any) -> str:
    """Build deterministic id seed URI from canonical uri + hierarchy level.

    Args:
        uri: Viking URI (e.g., "viking://resources/doc_name")
        level: Context level (0=abstract, 1=overview, 2=detail)

    Returns:
        Seed URI with appropriate suffix for the given level.
    """
    try:
        normalized = int(level)
    except (TypeError, ValueError):
        # Unparseable levels are treated as detail (no suffix).
        normalized = 2

    suffix = {0: "/.abstract.md", 1: "/.overview.md"}.get(normalized)
    if suffix is None or uri.endswith(suffix):
        return uri
    return f"{uri}{suffix}"


def compute_record_id(account_id: str, uri: str, level: Any) -> str:
    """Compute deterministic VectorDB record ID for a given URI and level.

    Args:
        account_id: Tenant account ID
        uri: Viking URI
        level: Context level (0=abstract, 1=overview, 2=detail)

    Returns:
        MD5 hex digest used as the VectorDB record ID.
    """
    id_seed = f"{account_id}:{seed_uri_for_id(uri, level)}"
    return hashlib.md5(id_seed.encode("utf-8")).hexdigest()
async def test_patch_resource_meta(client: httpx.AsyncClient, sample_markdown_file):
    """Add a resource then patch its metadata."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": True},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    patched = await client.patch(
        "/api/v1/resources",
        json={"uri": root_uri, "meta": {"tags": ["important"], "outdated": False}},
    )
    assert patched.status_code == 200
    payload = patched.json()
    assert payload["status"] == "ok"
    assert "meta" in payload["result"]["updated"]


async def test_patch_resource_abstract(client: httpx.AsyncClient, sample_markdown_file):
    """Patch L0 abstract of a resource."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": True},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    patched = await client.patch(
        "/api/v1/resources",
        json={"uri": root_uri, "abstract": "Custom abstract override"},
    )
    assert patched.status_code == 200
    assert "abstract" in patched.json()["result"]["updated"]


async def test_patch_resource_overview(client: httpx.AsyncClient, sample_markdown_file):
    """Patch L1 overview of a resource."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": True},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    patched = await client.patch(
        "/api/v1/resources",
        json={"uri": root_uri, "overview": "# Custom Overview\n\nManual overview content."},
    )
    assert patched.status_code == 200
    assert "overview" in patched.json()["result"]["updated"]
async def test_patch_resource_combo(client: httpx.AsyncClient, sample_markdown_file):
    """Patch meta + abstract + overview in a single request."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": True},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    patched = await client.patch(
        "/api/v1/resources",
        json={
            "uri": root_uri,
            "meta": {"reviewed": True},
            "abstract": "Combined abstract",
            "overview": "# Combined Overview\n\nAll fields at once.",
        },
    )
    assert patched.status_code == 200
    result = patched.json()["result"]
    assert "meta" in result["updated"] or "meta" in result.get("skipped", [])
    assert "abstract" in result["updated"]
    assert "overview" in result["updated"]


async def test_patch_resource_meta_skipped_when_no_record(
    client: httpx.AsyncClient, sample_markdown_file
):
    """Meta patch on a resource with no VectorDB record should be skipped."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": False},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    # Patch meta immediately (vectorization may not have completed yet)
    patched = await client.patch(
        "/api/v1/resources",
        json={"uri": root_uri, "meta": {"key": "val"}},
    )
    assert patched.status_code == 200
    result = patched.json()["result"]
    # meta should be either updated or skipped depending on timing
    assert "meta" in result["updated"] or "meta" in result.get("skipped", [])


async def test_patch_resource_meta_after_wait(client: httpx.AsyncClient, sample_markdown_file):
    """Meta patch after wait=True must deterministically land in updated."""
    added = await client.post(
        "/api/v1/resources",
        json={"path": str(sample_markdown_file), "reason": "test", "wait": True},
    )
    assert added.status_code == 200
    root_uri = added.json()["result"]["root_uri"]

    patched = await client.patch(
        "/api/v1/resources",
        json={"uri": root_uri, "meta": {"verified": True}},
    )
    assert patched.status_code == 200
    payload = patched.json()
    assert "meta" in payload["result"]["updated"]
    assert "meta" not in payload["result"].get("skipped", [])
async def test_patch_resource_requires_at_least_one_field(client: httpx.AsyncClient):
    """PATCH with no update fields should fail validation."""
    response = await client.patch(
        "/api/v1/resources",
        json={"uri": "viking://resources/nonexistent"},
    )
    assert response.status_code == 422


async def test_patch_resource_not_found(client: httpx.AsyncClient):
    """PATCH on nonexistent URI should return 404."""
    response = await client.patch(
        "/api/v1/resources",
        json={"uri": "viking://resources/does_not_exist", "meta": {"key": "val"}},
    )
    assert response.status_code == 404