Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 43 additions & 9 deletions openviking/storage/queuefs/semantic_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""SemanticProcessor: Processes messages from SemanticQueue, generates .abstract.md and .overview.md."""

import asyncio
import random
import threading
from contextlib import nullcontext
from dataclasses import dataclass, field
Expand Down Expand Up @@ -186,6 +187,43 @@ def _detect_file_type(self, file_name: str) -> str:
# Default to other
return FILE_TYPE_OTHER

async def _llm_with_retry(
    self,
    prompt: str,
    llm_sem: asyncio.Semaphore,
    max_retries: int = 3,
) -> str:
    """Invoke the VLM, retrying with jittered exponential backoff on rate limits.

    Args:
        prompt: Fully rendered prompt text to send to the model.
        llm_sem: Semaphore bounding concurrent LLM calls; held only for the
            duration of the request, released before any backoff sleep.
        max_retries: Number of additional attempts after the first call.

    Returns:
        The completion text, or an empty string when the call ultimately
        fails (non-rate-limit error, or retries exhausted).
    """
    vlm = get_openviking_config().vlm
    attempt = 0
    while attempt <= max_retries:
        try:
            async with llm_sem:
                return await vlm.get_completion_async(prompt)
        except Exception as exc:
            message = str(exc)
            # Rate limiting is detected heuristically from the error text.
            rate_limited = any(
                marker in message
                for marker in ("429", "TooManyRequests", "RateLimit", "RequestBurstTooFast")
            )
            if not (rate_limited and attempt < max_retries):
                # Terminal failure: log and degrade to an empty summary.
                if attempt > 0:
                    logger.error("LLM call failed after %d attempts: %s", attempt + 1, exc)
                else:
                    logger.error("LLM call failed: %s", exc)
                return ""
            # Exponential backoff capped at 8s, plus jitter to avoid
            # synchronized retries across concurrent workers.
            backoff = min(0.5 * (2**attempt), 8.0) + random.uniform(0, 0.5)
            logger.warning(
                "LLM rate limited (attempt %d/%d), retrying in %.1fs",
                attempt + 1,
                max_retries,
                backoff,
            )
            await asyncio.sleep(backoff)
        attempt += 1
    return ""

async def _check_file_content_changed(
self, file_path: str, target_file: str, ctx: Optional[RequestContext] = None
) -> bool:
Expand Down Expand Up @@ -669,8 +707,7 @@ async def _generate_text_summary(
"semantic.code_ast_summary",
{"file_name": file_name, "skeleton": skeleton_text},
)
async with llm_sem:
summary = await vlm.get_completion_async(prompt)
summary = await self._llm_with_retry(prompt, llm_sem)
return {"name": file_name, "summary": summary.strip()}
if skeleton_text is None:
logger.info("AST unsupported language, fallback to LLM: %s", file_path)
Expand All @@ -682,8 +719,7 @@ async def _generate_text_summary(
"semantic.code_summary",
{"file_name": file_name, "content": content},
)
async with llm_sem:
summary = await vlm.get_completion_async(prompt)
summary = await self._llm_with_retry(prompt, llm_sem)
return {"name": file_name, "summary": summary.strip()}

elif file_type == FILE_TYPE_DOCUMENTATION:
Expand All @@ -696,8 +732,7 @@ async def _generate_text_summary(
{"file_name": file_name, "content": content},
)

async with llm_sem:
summary = await vlm.get_completion_async(prompt)
summary = await self._llm_with_retry(prompt, llm_sem)
return {"name": file_name, "summary": summary.strip()}

async def _generate_single_file_summary(
Expand Down Expand Up @@ -912,8 +947,6 @@ async def _single_generate_overview(
"""Generate overview from a single prompt (small directories)."""
import re

vlm = get_openviking_config().vlm

try:
prompt = render_prompt(
"semantic.overview_generation",
Expand All @@ -924,7 +957,8 @@ async def _single_generate_overview(
},
)

overview = await vlm.get_completion_async(prompt)
llm_sem = asyncio.Semaphore(self.max_concurrent_llm)
overview = await self._llm_with_retry(prompt, llm_sem)

# Post-process: replace [number] with actual file name
def replace_index(match):
Expand Down
Loading