diff --git a/readme.md b/readme.md index 1c4074b..8173f1f 100644 --- a/readme.md +++ b/readme.md @@ -406,6 +406,14 @@ empty, duplicate, stale, oversized, or private-looking docs before they confuse the pipeline. Use **Benchmark** to compare whole-KB injection vs RAG and inspect the concrete retrieved chunks for a stage/query. +The KB dialog also writes `prompts//knowledge_base/governance.yaml`. +Use `canonical_files` for curated voice anchors that should be prioritized and +labeled in prompts. Use `ignored_files` for stale, duplicate, private-looking, +or oversized files that should remain auditable but stay out of generation. +Unignored stale/private/duplicate/oversized findings surface as a concise run +warning before generation starts, so future agents should treat them as review +work instead of silently trusting the context. + **Persona** files at `prompts//personas/*.md` appear beside the built-in persona variants in Generation Settings. diff --git a/tests/test_kb_audit.py b/tests/test_kb_audit.py index db6c689..e25ca82 100644 --- a/tests/test_kb_audit.py +++ b/tests/test_kb_audit.py @@ -52,6 +52,43 @@ def test_duplicate_empty_private_and_stale_warnings(tmp_prompts_dir): assert "empty_file" in codes assert "private_marker" in codes assert "stale_file" in codes + assert all(warning.action for warning in audit.warnings) + + +def test_governance_marks_canonical_and_ignored_files(tmp_prompts_dir): + kb = tmp_prompts_dir / "alice" / "knowledge_base" + kb.mkdir(parents=True) + (kb / "voice.md").write_text("voice") + (kb / "old.md").write_text("old") + (kb / "governance.yaml").write_text( + "canonical_files:\n" + " - voice.md\n" + "ignored_files:\n" + " - old.md\n" + ) + + audit = kb_audit.audit_profile("alice") + docs = {doc.name: doc for doc in audit.documents} + + assert docs["voice"].canonical is True + assert docs["old"].ignored is True + assert any(warning.code == "ignored_file" for warning in audit.warnings) + + +def test_generation_warnings_exclude_ignored_files(tmp_prompts_dir): + kb = tmp_prompts_dir / "alice" / "knowledge_base" + kb.mkdir(parents=True) + (kb / "private-token.md").write_text("keep local") + (kb / "confidential.md").write_text("ignored") + (kb / "governance.yaml").write_text( + "ignored_files:\n" + " - confidential.md\n" + ) + + warnings = kb_audit.generation_warnings("alice") + names = {warning.path.rsplit("/", 1)[-1] for warning in warnings if warning.path} + + assert names == {"private-token.md"} def test_to_dict_includes_summary(tmp_prompts_dir): diff --git a/tests/test_prompts.py b/tests/test_prompts.py index d5631c9..da98229 100644 --- a/tests/test_prompts.py +++ b/tests/test_prompts.py @@ -49,6 +49,34 @@ def test_loads_md_and_txt(self, tmp_prompts_dir): loaded = prompts.load_knowledge_base("alice") assert loaded == {"Voice Guide": "friendly tone", "Style": "short sentences"} + def test_governance_ignores_files_and_labels_canonical_anchors(self, tmp_prompts_dir): + kb = tmp_prompts_dir / "alice" / "knowledge_base" + kb.mkdir(parents=True) + (kb / "voice.md").write_text("canonical tone") + (kb / "notes.md").write_text("usable notes") + (kb / "old.md").write_text("ignored notes") + (kb / "governance.yaml").write_text( + "canonical_files:\n" + " - voice.md\n" + "ignored_files:\n" + " - old.md\n" + ) + + loaded = prompts.load_knowledge_base("alice") + + assert list(loaded) == ["Canonical Voice Anchor: Voice", "Notes"] + assert "Old" not in loaded + + def test_generation_warning_summarizes_unresolved_governance(self, tmp_prompts_dir): + kb = tmp_prompts_dir / "alice" / "knowledge_base" + kb.mkdir(parents=True) + (kb / "private-token.md").write_text("local secret") + + warning = prompts.knowledge_base_generation_warning("alice") + + assert "unresolved KB governance" in warning + assert "private-token.md" in warning + class TestPromptPrecedence: def test_custom_prompt_overrides_md(self, tmp_prompts_dir): diff --git a/ui/dialogs.py b/ui/dialogs.py index dec9e41..b8dd211 100644 --- a/ui/dialogs.py +++ b/ui/dialogs.py @@ -274,6 +274,37 @@ def knowledge_base_manager() -> None: f"{summary['approx_tokens']:,} est. tokens ยท " f"{summary['warnings']} signal(s)" ) + if audit.documents: + file_names = [Path(doc.path).name for doc in audit.documents] + with st.expander("Review governance", expanded=False): + canonical = st.multiselect( + "Canonical voice anchors", + options=file_names, + default=[ + name for name in audit.governance.canonical_files + if name.lower() in {item.lower() for item in file_names} + ], + help="Canonical anchors are prioritized and labeled in generation prompts.", + key="kb_governance_canonical", + ) + ignored = st.multiselect( + "Ignored files", + options=file_names, + default=[ + name for name in audit.governance.ignored_files + if name.lower() in {item.lower() for item in file_names} + ], + help="Ignored files stay in the audit but are excluded from generation.", + key="kb_governance_ignored", + ) + if st.button("Save governance", type="primary", use_container_width=True): + kb_audit_mod.save_governance( + user, + canonical_files=canonical, + ignored_files=ignored, + ) + st.toast("KB governance saved.", icon=":material/check_circle:") + st.rerun() audit_json = json.dumps(audit.to_dict(), indent=2) st.download_button( "Download audit JSON", @@ -292,13 +323,21 @@ def knowledge_base_manager() -> None: else: st.info(message) if audit.documents: + actions_by_path = { + warning.path: warning.action + for warning in audit.warnings + if warning.path + } st.dataframe( [ { "File": Path(doc.path).name, "Role": doc.role, + "Canonical": doc.canonical, + "Ignored": doc.ignored, "Tokens": doc.approx_tokens, "Modified": doc.modified_at[:10], + "Action": actions_by_path.get(doc.path, "Ready for generation."), } for doc in audit.documents ], diff --git a/ui/pipeline.py b/ui/pipeline.py index 5f54d97..e4b48d8 100644 --- a/ui/pipeline.py +++ b/ui/pipeline.py @@ -73,6 +73,10 @@ def _execute_run() -> None: s = st.session_state adapters = adapters_mod.get_adapters() kb = prompts_mod.load_knowledge_base(s.selected_user) if s.selected_user else {} + s.kb_governance_warning = ( + prompts_mod.knowledge_base_generation_warning(s.selected_user) + if s.selected_user and kb else None + ) # Capture a start timestamp so the Run metrics block can report # wall-clock duration. End timestamp is written in `finally`. @@ -144,6 +148,8 @@ def _execute_run() -> None: return # ---- Full pipeline --------------------------------------- + if s.get("kb_governance_warning"): + status.write(f"KB governance warning: {s.kb_governance_warning}") _inspect_retrieval(s, status) # Progress callback writes to the sac.steps index and streams # a status line per stage. @@ -290,6 +296,7 @@ def _run_metadata(pending, mode: str, s) -> dict: s.get("recipe_effective_settings"), ), "selected_user": s.selected_user, + "kb_governance_warning": s.get("kb_governance_warning"), "provider": s.ai_provider, "model": s.ai_model, "settings": { diff --git a/ui/session.py b/ui/session.py index 2b8fbcd..264066a 100644 --- a/ui/session.py +++ b/ui/session.py @@ -112,6 +112,7 @@ "recipe_effective_settings": None, "scorecard_summary": None, "handoff_draft_preview": None, + "kb_governance_warning": None, # Wall-clock timestamps for per-run duration. pipeline.py sets # pipeline_started_at before stage 0 and _build_bundle reads both to # compute duration_seconds for the Run metrics block. diff --git a/whisperforge_core/kb_audit.py b/whisperforge_core/kb_audit.py index d907c1a..852d516 100644 --- a/whisperforge_core/kb_audit.py +++ b/whisperforge_core/kb_audit.py @@ -14,6 +14,36 @@ _OVERSIZED_CHARS = 25_000 _STALE_DAYS = 180 _PRIVATE_MARKERS = ("private", "secret", "credential", "token", "password", "confidential") +_GOVERNANCE_FILES = ("governance.yaml", "kb_governance.yaml") +_GENERATION_WARNING_CODES = { + "duplicate_content", + "oversized_file", + "private_marker", + "stale_file", +} + +try: + import yaml +except ImportError: # pragma: no cover - only exercised in stripped installs + yaml = None + + +@dataclass +class KBGovernance: + canonical_files: list[str] = field(default_factory=list) + ignored_files: list[str] = field(default_factory=list) + + def is_canonical(self, path: str | Path) -> bool: + return _name(path) in _normalized_names(self.canonical_files) + + def is_ignored(self, path: str | Path) -> bool: + return _name(path) in _normalized_names(self.ignored_files) + + def to_dict(self) -> dict[str, list[str]]: + return { + "canonical_files": list(self.canonical_files), + "ignored_files": list(self.ignored_files), + } @dataclass @@ -27,6 +57,8 @@ class KBDocument: approx_tokens: int modified_at: str sha256: str + canonical: bool = False + ignored: bool = False def to_dict(self) -> dict[str, Any]: return asdict(self) @@ -38,6 +70,7 @@ class KBWarning: severity: str message: str path: str | None = None + action: str = "review" def to_dict(self) -> dict[str, Any]: return asdict(self) @@ -48,6 +81,7 @@ class KBAudit: user: str kb_dir: str generated_at: str + governance: KBGovernance = field(default_factory=KBGovernance) documents: list[KBDocument] = field(default_factory=list) warnings: list[KBWarning] = field(default_factory=list) @@ -70,28 +104,87 @@ def to_dict(self) -> dict[str, Any]: "approx_tokens": self.total_tokens, "warnings": len(self.warnings), }, + "governance": self.governance.to_dict(), "documents": [doc.to_dict() for doc in self.documents], "warnings": [warning.to_dict() for warning in self.warnings], } -def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: +def load_governance(user: str, *, prompts_dir: Path | None = None) -> KBGovernance: + root = prompts_dir or PROMPTS_DIR + kb_dir = root / user / "knowledge_base" + path = _governance_path(kb_dir) + if path is None: + return KBGovernance() + data = _read_governance(path) + return KBGovernance( + canonical_files=_string_list( + data.get("canonical_files") + or data.get("canonical_voice_anchors") + or data.get("canonical") + ), + ignored_files=_string_list( + data.get("ignored_files") + or data.get("ignored") + or data.get("exclude") + ), + ) + + +def save_governance( + user: str, + *, + canonical_files: list[str], + ignored_files: list[str], + prompts_dir: Path | None = None, +) -> Path: + root = prompts_dir or PROMPTS_DIR + kb_dir = root / user / "knowledge_base" + kb_dir.mkdir(parents=True, exist_ok=True) + path = kb_dir / _GOVERNANCE_FILES[0] + lines = [ + "# WhisperForge KB governance.", + "# canonical_files are prioritized and labeled in prompts.", + "# ignored_files are excluded from generation but remain visible in audits.", + "canonical_files:", + ] + lines.extend(f" - {item}" for item in _dedupe_names(canonical_files)) + lines.append("ignored_files:") + lines.extend(f" - {item}" for item in _dedupe_names(ignored_files)) + path.write_text("\n".join(lines) + "\n", encoding="utf-8") + return path + + +def audit_profile( + user: str, + *, + now: datetime | None = None, + prompts_dir: Path | None = None, +) -> KBAudit: now = now or datetime.now(timezone.utc) - kb_dir = PROMPTS_DIR / user / "knowledge_base" + root = prompts_dir or PROMPTS_DIR + kb_dir = root / user / "knowledge_base" + governance = load_governance(user, prompts_dir=root) audit = KBAudit( user=user, kb_dir=str(kb_dir), generated_at=_iso(now), + governance=governance, ) if not kb_dir.exists(): audit.warnings.append(KBWarning( code="missing_kb", severity="warning", message=f"`prompts/{user}/knowledge_base/` does not exist.", + action="Create a knowledge_base folder or pick another profile.", )) return audit - docs = [_document(path, now) for path in sorted(kb_dir.iterdir()) if _is_kb_file(path)] + docs = [ + _document(path, now, governance) + for path in sorted(kb_dir.iterdir()) + if _is_kb_file(path) + ] audit.documents = docs if not docs: audit.warnings.append(KBWarning( @@ -99,16 +192,27 @@ def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: severity="warning", message=f"`prompts/{user}/knowledge_base/` has no .md or .txt files.", path=str(kb_dir), + action="Upload .md or .txt context files before expecting in-voice output.", )) seen_hashes: dict[str, KBDocument] = {} for doc in docs: + if doc.ignored: + audit.warnings.append(KBWarning( + code="ignored_file", + severity="notice", + message=f"`{Path(doc.path).name}` is intentionally ignored by KB governance.", + path=doc.path, + action="Excluded from generation.", + )) + continue if doc.chars == 0: audit.warnings.append(KBWarning( code="empty_file", severity="warning", message=f"`{Path(doc.path).name}` is empty.", path=doc.path, + action="Delete it, replace it, or add it to ignored_files.", )) if doc.chars > _OVERSIZED_CHARS: audit.warnings.append(KBWarning( @@ -116,6 +220,7 @@ def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: severity="notice", message=f"`{Path(doc.path).name}` is large enough that retrieval should be inspected.", path=doc.path, + action="Inspect retrieval coverage or split it before relying on generation.", )) if _days_old(doc.modified_at, now) > _STALE_DAYS: audit.warnings.append(KBWarning( @@ -123,6 +228,7 @@ def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: severity="notice", message=f"`{Path(doc.path).name}` has not changed in more than {_STALE_DAYS} days.", path=doc.path, + action="Review freshness, mark canonical if still true, or ignore it.", )) if any(marker in Path(doc.path).name.lower() for marker in _PRIVATE_MARKERS): audit.warnings.append(KBWarning( @@ -130,6 +236,7 @@ def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: severity="warning", message=f"`{Path(doc.path).name}` looks private or credential-adjacent; verify before sharing.", path=doc.path, + action="Keep local, rename after review, or add it to ignored_files.", )) previous = seen_hashes.get(doc.sha256) if previous: @@ -138,13 +245,47 @@ def audit_profile(user: str, *, now: datetime | None = None) -> KBAudit: severity="notice", message=f"`{Path(doc.path).name}` duplicates `{Path(previous.path).name}`.", path=doc.path, + action="Keep one canonical source and ignore the duplicate.", )) else: seen_hashes[doc.sha256] = doc return audit -def _document(path: Path, now: datetime) -> KBDocument: +def generation_warnings( + user: str, + *, + now: datetime | None = None, + prompts_dir: Path | None = None, +) -> list[KBWarning]: + audit = audit_profile(user, now=now, prompts_dir=prompts_dir) + return [ + warning for warning in audit.warnings + if warning.code in _GENERATION_WARNING_CODES + ] + + +def generation_warning_summary( + user: str, + *, + now: datetime | None = None, + prompts_dir: Path | None = None, + limit: int = 3, +) -> str: + warnings = generation_warnings(user, now=now, prompts_dir=prompts_dir) + if not warnings: + return "" + shown = warnings[:limit] + parts = [ + f"{Path(warning.path).name if warning.path else warning.code}: {warning.action}" + for warning in shown + ] + remaining = len(warnings) - len(shown) + suffix = f"; +{remaining} more" if remaining else "" + return f"{len(warnings)} unresolved KB governance finding(s): " + "; ".join(parts) + suffix + + +def _document(path: Path, now: datetime, governance: KBGovernance) -> KBDocument: try: text = path.read_text(encoding="utf-8") except OSError: @@ -160,6 +301,8 @@ def _document(path: Path, now: datetime) -> KBDocument: approx_tokens=max(1, len(text) // _CHARS_PER_TOKEN) if text else 0, modified_at=_iso(datetime.fromtimestamp(stat.st_mtime, tz=timezone.utc)), sha256=hashlib.sha256(text.encode("utf-8")).hexdigest(), + canonical=governance.is_canonical(path), + ignored=governance.is_ignored(path), ) @@ -167,6 +310,72 @@ def _is_kb_file(path: Path) -> bool: return path.is_file() and path.suffix.lower() in {".md", ".txt"} and not path.name.startswith(".") +def _governance_path(kb_dir: Path) -> Path | None: + for filename in _GOVERNANCE_FILES: + path = kb_dir / filename + if path.exists(): + return path + return None + + +def _read_governance(path: Path) -> dict[str, Any]: + try: + text = path.read_text(encoding="utf-8") + except OSError: + return {} + if yaml is not None: + try: + data = yaml.safe_load(text) + except yaml.YAMLError: + data = {} + return data if isinstance(data, dict) else {} + return _read_simple_yaml(text) + + +def _read_simple_yaml(text: str) -> dict[str, Any]: + data: dict[str, list[str]] = {} + current: str | None = None + for raw_line in text.splitlines(): + line = raw_line.split("#", 1)[0].strip() + if not line: + continue + if line.endswith(":"): + current = line[:-1].strip() + data.setdefault(current, []) + continue + if current and line.startswith("-"): + data[current].append(line[1:].strip().strip("'\"")) + return data + + +def _string_list(value: Any) -> list[str]: + if isinstance(value, str): + return [value] + if isinstance(value, list): + return [str(item) for item in value if str(item).strip()] + return [] + + +def _dedupe_names(values: list[str]) -> list[str]: + seen: set[str] = set() + names: list[str] = [] + for value in values: + item = _name(value) + if not item or item in seen: + continue + seen.add(item) + names.append(item) + return names + + +def _normalized_names(values: list[str]) -> set[str]: + return {_name(value) for value in values} + + +def _name(path: str | Path) -> str: + return Path(str(path)).name.lower() + + def _role(path: Path) -> str: name = path.stem.lower() if any(token in name for token in ("voice", "style", "tone", "writing")): diff --git a/whisperforge_core/prompts.py b/whisperforge_core/prompts.py index 8e2828f..0766c2e 100644 --- a/whisperforge_core/prompts.py +++ b/whisperforge_core/prompts.py @@ -52,14 +52,26 @@ def list_users() -> List[str]: def load_knowledge_base(user: str) -> Dict[str, str]: """Load every .txt/.md file under prompts//knowledge_base/ and return a dict of {TitleCased name: file contents}.""" + from . import kb_audit as kb_audit_mod + kb: Dict[str, str] = {} kb_path = PROMPTS_DIR / user / "knowledge_base" if not kb_path.exists(): return kb - for path in sorted(kb_path.iterdir()): + governance = kb_audit_mod.load_governance(user, prompts_dir=PROMPTS_DIR) + paths = [ + path for path in kb_path.iterdir() + if path.suffix.lower() in {".txt", ".md"} and not path.name.startswith(".") + ] + paths.sort(key=lambda path: (not governance.is_canonical(path), path.name.lower())) + for path in paths: if path.suffix.lower() not in {".txt", ".md"} or path.name.startswith("."): continue + if governance.is_ignored(path): + continue name = path.stem.replace("_", " ").title() + if governance.is_canonical(path): + name = f"Canonical Voice Anchor: {name}" try: kb[name] = path.read_text(encoding="utf-8") except OSError as e: @@ -67,6 +79,12 @@ def load_knowledge_base(user: str) -> Dict[str, str]: return kb +def knowledge_base_generation_warning(user: str) -> str: + from . import kb_audit as kb_audit_mod + + return kb_audit_mod.generation_warning_summary(user, prompts_dir=PROMPTS_DIR) + + def load_user_prompts(user: str) -> Dict[str, str]: """Load prompt .md files directly under prompts// into a dict keyed by filename stem (e.g. 'wisdom_extraction' -> template text)."""