-
Notifications
You must be signed in to change notification settings - Fork 0
Web ui work #24
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Web ui work #24
Changes from all commits
Commits
Show all changes
2 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,90 @@ | ||
| """Simple in-memory job store for long-running RAG web operations. | ||
|
|
||
| Jobs run in background threads. Route handlers poll for status via HTMX | ||
| (`hx-trigger="every 2s"`). The job status endpoint stops including the | ||
| polling trigger once the job reaches a terminal state (done or error). | ||
| """ | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import threading | ||
| import time | ||
| import uuid | ||
| from typing import TYPE_CHECKING, Any | ||
|
|
||
| if TYPE_CHECKING: | ||
| from collections.abc import Callable | ||
|
|
||
|
|
||
| class Job: | ||
| """In-memory representation of a background job.""" | ||
|
|
||
| __slots__ = ("error", "finished_at", "id", "result", "started_at", "status") | ||
|
|
||
| def __init__(self, job_id: str) -> None: | ||
| self.id = job_id | ||
| self.status: str = "pending" | ||
| self.result: Any = None | ||
| self.error: str | None = None | ||
| self.started_at: float = time.monotonic() | ||
| self.finished_at: float | None = None | ||
|
|
||
| def to_dict(self) -> dict[str, Any]: | ||
| elapsed = round((self.finished_at or time.monotonic()) - self.started_at, 2) | ||
| return { | ||
| "id": self.id, | ||
| "status": self.status, | ||
| "result": self.result, | ||
| "error": self.error, | ||
| "elapsed_s": elapsed, | ||
| } | ||
|
|
||
|
|
||
| class JobStore: | ||
| """Thread-safe store for background jobs.""" | ||
|
|
||
| MAX_JOBS: int = 50 | ||
|
|
||
| def __init__(self) -> None: | ||
| self._jobs: dict[str, Job] = {} | ||
| self._lock = threading.Lock() | ||
|
|
||
| def submit(self, fn: Callable[..., Any], *args: object, **kwargs: object) -> str: | ||
| """Submit a callable as a background job; returns a job_id immediately.""" | ||
| job_id = uuid.uuid4().hex[:12] | ||
| job = Job(job_id) | ||
| with self._lock: | ||
| self._evict_old() | ||
| if len(self._jobs) >= self.MAX_JOBS: | ||
| msg = f"Job store is full ({self.MAX_JOBS} active jobs); please wait for a job to finish" | ||
| raise RuntimeError(msg) | ||
| self._jobs[job_id] = job | ||
|
|
||
| def _run() -> None: | ||
| job.status = "running" | ||
| try: | ||
| job.result = fn(*args, **kwargs) | ||
| job.status = "done" | ||
| except Exception as exc: | ||
| job.error = str(exc) | ||
| job.status = "error" | ||
| finally: | ||
| job.finished_at = time.monotonic() | ||
|
|
||
| threading.Thread(target=_run, daemon=True).start() | ||
| return job_id | ||
|
|
||
| def get(self, job_id: str) -> dict[str, Any] | None: | ||
| """Return job state dict, or None if job_id is unknown.""" | ||
| with self._lock: | ||
| job = self._jobs.get(job_id) | ||
| return job.to_dict() if job else None | ||
|
|
||
| def _evict_old(self) -> None: | ||
| """Remove oldest finished jobs when over the cap (called under lock).""" | ||
| if len(self._jobs) <= self.MAX_JOBS: | ||
| return | ||
| finished = [j for j in self._jobs.values() if j.status in {"done", "error"}] | ||
| finished.sort(key=lambda j: j.finished_at or 0) | ||
| for j in finished[: len(self._jobs) - self.MAX_JOBS]: | ||
| del self._jobs[j.id] | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,88 @@ | ||
| """Saveable preset profiles for runtime retrieval settings.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import dataclasses | ||
| import json | ||
| from typing import TYPE_CHECKING | ||
|
|
||
| if TYPE_CHECKING: | ||
| from pathlib import Path | ||
|
|
||
| from core.config import ConversationRuntimeConfig | ||
|
|
||
| PROFILE_FIELDS: list[str] = [ | ||
| "use_mmr", | ||
| "rag_rerank_enabled", | ||
| "rag_sentence_compression_enabled", | ||
| "rag_multi_query_enabled", | ||
| "rag_k", | ||
| "rag_k_mes", | ||
| "debug_context", | ||
| ] | ||
|
|
||
|
|
||
| class ProfileStore: | ||
| """Persist and apply named retrieval-setting presets stored in a JSON file.""" | ||
|
|
||
| def __init__(self, path: Path) -> None: | ||
| self._path = path | ||
|
|
||
| def _load(self) -> dict[str, dict[str, object]]: | ||
| if not self._path.exists(): | ||
| return {} | ||
| try: | ||
| data = json.loads(self._path.read_text(encoding="utf-8")) | ||
| return data if isinstance(data, dict) else {} | ||
| except Exception: | ||
| return {} | ||
|
|
||
| def _save(self, data: dict[str, dict[str, object]]) -> None: | ||
| self._path.parent.mkdir(parents=True, exist_ok=True) | ||
| self._path.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") | ||
|
|
||
| def list_profiles(self) -> list[str]: | ||
| """Return sorted list of saved profile names.""" | ||
| return sorted(self._load().keys()) | ||
|
|
||
| def save_profile(self, name: str, config: ConversationRuntimeConfig) -> None: | ||
| """Snapshot the profile-eligible fields from *config* under *name*.""" | ||
| data = self._load() | ||
| data[name] = {field: getattr(config, field) for field in PROFILE_FIELDS} | ||
| self._save(data) | ||
|
|
||
| def get_profile(self, name: str) -> dict[str, object]: | ||
| """Return the stored settings dict for *name*.""" | ||
| data = self._load() | ||
| if name not in data: | ||
| msg = f"Profile {name!r} not found" | ||
| raise KeyError(msg) | ||
| return dict(data[name]) | ||
|
|
||
| def apply_profile( | ||
| self, name: str, config: ConversationRuntimeConfig | ||
| ) -> tuple[ConversationRuntimeConfig, list[str]]: | ||
| """Return a new config with profile values applied and list of changed field names.""" | ||
| profile = self.get_profile(name) | ||
| validated_updates: dict[str, object] = {} | ||
| changed: list[str] = [] | ||
| for field, value in profile.items(): | ||
| if field not in PROFILE_FIELDS: | ||
| continue | ||
| current = getattr(config, field, None) | ||
| if current != value: | ||
| validated_updates[field] = value | ||
| changed.append(field) | ||
| if validated_updates: | ||
| config = dataclasses.replace(config, **validated_updates) | ||
| return config, changed | ||
|
|
||
| def delete_profile(self, name: str) -> None: | ||
| """Remove *name* from the store (no-op if not found).""" | ||
| data = self._load() | ||
| data.pop(name, None) | ||
| self._save(data) | ||
|
|
||
| def current_values(self, config: ConversationRuntimeConfig) -> dict[str, object]: | ||
| """Return current values of the profile-eligible fields from *config*.""" | ||
| return {field: getattr(config, field) for field in PROFILE_FIELDS} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_evict_old()only evicts jobs in terminal states (done/error). If the system submits > MAX_JOBS jobs that remainpending/running(or if finished jobs < overflow), the store can grow beyondMAX_JOBSindefinitely. Consider either enforcing a hard cap (reject new submissions when over capacity) or also evicting oldest non-running jobs (e.g.,pending) to keep memory bounded.