diff --git a/.github/workflows/phase0-gates.yml b/.github/workflows/phase0-gates.yml index 8ff5988..c9011c8 100644 --- a/.github/workflows/phase0-gates.yml +++ b/.github/workflows/phase0-gates.yml @@ -97,4 +97,4 @@ jobs: python -m scripts.migrate apply python -m scripts.migrate verify python -m scripts.check_readiness - python -m pytest tests/integration/test_phase0_postgres.py tests/integration/test_conversation_migration_upgrade.py tests/integration/test_feedback_privacy_integrity_postgres.py tests/integration/test_session_domain_store_postgres.py tests/integration/test_support_inbox_postgres.py -q + python -m pytest tests/integration/test_phase0_postgres.py tests/integration/test_conversation_migration_upgrade.py tests/integration/test_feedback_privacy_integrity_postgres.py tests/integration/test_session_domain_store_postgres.py tests/integration/test_support_inbox_postgres.py tests/integration/test_conversation_summary_postgres.py -q diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py index 9eb4180..9ddb9cd 100644 --- a/app/api/routes/chat.py +++ b/app/api/routes/chat.py @@ -42,8 +42,14 @@ def chat( raise HTTPException(status_code=404, detail="Domain not found") database_runtime = request.app.state.database_runtime + session_state_store = ( + getattr(request.app.state, "chat_session_state_store", None) + if settings.persistence_backend == "postgres" + else None + ) response = ChatFlowService( history_service=ConversationHistoryService(database_runtime), + session_state_store=session_state_store, ).answer( domain=domain, question=payload.message, diff --git a/app/api/routes/web_chat.py b/app/api/routes/web_chat.py index 7cdce70..c31f0b1 100644 --- a/app/api/routes/web_chat.py +++ b/app/api/routes/web_chat.py @@ -48,8 +48,14 @@ def create_web_chat( domain = _load_default_domain(settings) database_runtime = request.app.state.database_runtime + session_state_store = ( + getattr(request.app.state, "chat_session_state_store", None) + if settings.persistence_backend == "postgres" + else None + ) chat_response = ChatFlowService( history_service=ConversationHistoryService(database_runtime), + session_state_store=session_state_store, ).answer( domain=domain, question=payload.message, diff --git a/app/conversations/session_state.py b/app/conversations/session_state.py new file mode 100644 index 0000000..9f75e1c --- /dev/null +++ b/app/conversations/session_state.py @@ -0,0 +1,120 @@ +"""Hot-tier session state seam (layered-persistence plan, fatia/Fase 1). + +Mirrors the philosophy of ``ConversationArchiveSink``: a stable interface with a +swappable implementation and a safe default. The state is the *non-authoritative* +hot tier — the source of truth stays in Postgres (decision A). A Redis backend +drops in behind ``SESSION_STATE_BACKEND=redis`` in a later slice (Nível 1) without +touching callers. + +Privacy: keyed by ``(domain, channel, session_hash)`` — never a raw ``session_id`` +nor free PII. +""" +from __future__ import annotations + +import os +import threading +import time +from dataclasses import dataclass +from typing import Callable, Protocol, runtime_checkable + + +SUPPORTED_SESSION_STATE_BACKENDS = {"memory", "redis"} +DEFAULT_SESSION_STATE_TTL_SECONDS = 2700 # 45 min hot window + + +@dataclass(frozen=True) +class SessionState: + """Sanitized hot-tier state for a session. No raw session_id, no free PII.""" + + state: str + domain: str + confidence: float + updated_at: float + turn_id: str | None = None + redaction_version: str | None = None + + +@runtime_checkable +class SessionStateStore(Protocol): + def get( + self, *, domain: str, channel: str, session_hash: str + ) -> SessionState | None: ... + + def put( + self, + *, + domain: str, + channel: str, + session_hash: str, + state: SessionState, + ttl_seconds: int, + ) -> None: ... + + def clear(self, *, domain: str, channel: str, session_hash: str) -> None: ... + + +def _key(domain: str, channel: str, session_hash: str) -> str: + return f"{domain}::{channel}::{session_hash}" + + +class InMemorySessionStateStore: + """Single-process store with wall-clock TTL. Local/CI/test default; mirrors the + role of ``AppendOnlyFileSink``. + + NOT consistent across uvicorn workers and lost on restart — production + multi-worker needs the Redis backend (Nível 1). This is fine because the hot + tier is non-authoritative: the truth is the Postgres write-through. + """ + + def __init__(self, *, time_fn: Callable[[], float] = time.monotonic) -> None: + self._data: dict[str, tuple[float | None, SessionState]] = {} + self._lock = threading.Lock() + self._time_fn = time_fn + + def get( + self, *, domain: str, channel: str, session_hash: str + ) -> SessionState | None: + key = _key(domain, channel, session_hash) + now = self._time_fn() + with self._lock: + entry = self._data.get(key) + if entry is None: + return None + expires_at, state = entry + if expires_at is not None and now >= expires_at: + self._data.pop(key, None) + return None + return state + + def put( + self, + *, + domain: str, + channel: str, + session_hash: str, + state: SessionState, + ttl_seconds: int, + ) -> None: + key = _key(domain, channel, session_hash) + expires_at = self._time_fn() + ttl_seconds if ttl_seconds > 0 else None + with self._lock: + self._data[key] = (expires_at, state) + + def clear(self, *, domain: str, channel: str, session_hash: str) -> None: + key = _key(domain, channel, session_hash) + with self._lock: + self._data.pop(key, None) + + +def build_session_state_store_from_env( + getenv: Callable[[str, str], str] = os.getenv, +) -> SessionStateStore: + backend = (getenv("SESSION_STATE_BACKEND", "memory") or "memory").strip().lower() + if backend not in SUPPORTED_SESSION_STATE_BACKENDS: + raise ValueError(f"unsupported session state backend: {backend}") + if backend == "memory": + return InMemorySessionStateStore() + # backend == "redis": RedisSessionStateStore is the next slice (Nível 1). + raise NotImplementedError( + "SESSION_STATE_BACKEND=redis requires RedisSessionStateStore (Nível 1, not yet implemented)" + ) diff --git a/app/conversations/summary.py b/app/conversations/summary.py new file mode 100644 index 0000000..eb1d8ab --- /dev/null +++ b/app/conversations/summary.py @@ -0,0 +1,197 @@ +"""Nightly conversation summarization (layered-persistence plan, Fase 3). + +Pure, testable helpers plus a DB batch that reads *closed/inactive* conversations, +sanitizes every turn **before** it can reach the model, asks a cheap model for a +structured record (problem/solution/status) and upserts it idempotently into the +``conversation_summaries`` warehouse. No raw PII, no raw ``session_id``. + +The model provider is injected (``generate_answer(prompt) -> str``) so the script +uses the real ``LLMWrapper`` and tests use a fake. +""" +from __future__ import annotations + +import json +import re +from dataclasses import dataclass +from typing import Any, Iterable, Protocol + +from app.core.persistence_sanitize import REDACTION_VERSION, sanitize_for_persistence + + +VALID_STATUSES = {"resolvido", "em_aberto", "escalado"} + + +class SummaryProvider(Protocol): + def generate_answer(self, prompt: str) -> str: ... + + +@dataclass(frozen=True) +class ConversationSummaryRecord: + domain: str + customer_ref: str + problem: str + solution: str + status: str + source_turn_count: int + redaction_version: str + model: str + conversation_key: str + + +def build_transcript(turns: Iterable[tuple[str, str]]) -> str: + """``turns`` = ordered ``(role, content)``. Each turn is redacted before it can + reach the summarization model (PAN/PII never leaves the boundary unredacted).""" + lines: list[str] = [] + for role, content in turns: + safe = sanitize_for_persistence(content) or "" + speaker = "Cliente" if role == "user" else "Agente" + lines.append(f"{speaker}: {safe}") + return "\n".join(lines) + + +def build_summary_prompt(transcript: str) -> str: + return ( + "Voce resume conversas de atendimento. Leia a conversa e responda APENAS " + "com um objeto JSON, sem texto fora do JSON.\n" + "Campos:\n" + ' - "problem": o problema/objetivo do cliente, em uma frase factual.\n' + ' - "solution": a solucao dada ou tentada, em uma frase factual.\n' + ' - "status": exatamente um de "resolvido", "em_aberto", "escalado".\n' + "Nao invente dados que nao estao na conversa.\n\n" + f"CONVERSA:\n{transcript}\n\n" + 'JSON: {"problem": "...", "solution": "...", "status": "..."}' + ) + + +def parse_summary_json(raw: str) -> dict[str, str]: + match = re.search(r"\{.*\}", raw or "", re.DOTALL) + if not match: + raise ValueError("summary response had no JSON object") + data = json.loads(match.group(0)) + problem = str(data.get("problem", "")).strip() + solution = str(data.get("solution", "")).strip() + status = str(data.get("status", "")).strip().lower() + if status not in VALID_STATUSES: + status = "em_aberto" + if not problem: + raise ValueError("summary is missing 'problem'") + return {"problem": problem, "solution": solution, "status": status} + + +def summarize_turns( + *, turns: list[tuple[str, str]], provider: SummaryProvider +) -> dict[str, str]: + raw = provider.generate_answer(build_summary_prompt(build_transcript(turns))) + return parse_summary_json(raw) + + +def derive_customer_ref(customer_id: Any, session_hash: str | None) -> str: + if customer_id: + return str(customer_id) + if session_hash: + return session_hash + return "unknown" + + +def run_summary_batch( + connection: Any, + provider: SummaryProvider, + *, + model: str, + inactivity_hours: int = 24, + min_turns: int = 2, + limit: int = 100, + force: bool = False, +) -> dict[str, int]: + """Summarize eligible (inactive, non-trivial, not-yet-summarized) conversations. + + Idempotent: UNIQUE (domain, conversation_key) + ON CONFLICT upsert. ``force`` + re-summarizes already-summarized conversations (costs model calls).""" + stats = {"eligible": 0, "summarized": 0, "errors": 0} + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT c.id, d.name, c.customer_id, c.session_hash + FROM conversations c + JOIN domains d ON d.id = c.domain_id + WHERE c.last_message_at IS NOT NULL + AND c.last_message_at < now() - make_interval(hours => %s) + AND ( + SELECT count(*) FROM messages m WHERE m.conversation_id = c.id + ) >= %s + AND ( + %s OR NOT EXISTS ( + SELECT 1 FROM conversation_summaries s + WHERE s.domain = d.name AND s.conversation_key = c.id::text + ) + ) + ORDER BY c.last_message_at DESC + LIMIT %s + """, + (inactivity_hours, min_turns, force, limit), + ) + candidates = cursor.fetchall() + + stats["eligible"] = len(candidates) + for conv_id, domain_name, customer_id, session_hash in candidates: + try: + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT role, content FROM messages + WHERE conversation_id = %s + ORDER BY created_at, message_sequence + """, + (conv_id,), + ) + turns = [(str(role), str(content)) for role, content in cursor.fetchall()] + summary = summarize_turns(turns=turns, provider=provider) + record = ConversationSummaryRecord( + domain=str(domain_name), + customer_ref=derive_customer_ref(customer_id, session_hash), + problem=summary["problem"], + solution=summary["solution"], + status=summary["status"], + source_turn_count=len(turns), + redaction_version=REDACTION_VERSION, + model=model, + conversation_key=str(conv_id), + ) + _upsert_summary(connection, record) + stats["summarized"] += 1 + except Exception: # noqa: BLE001 - one bad conversation must not stop the batch + stats["errors"] += 1 + return stats + + +def _upsert_summary(connection: Any, record: ConversationSummaryRecord) -> None: + with connection.cursor() as cursor: + cursor.execute( + """ + INSERT INTO conversation_summaries ( + domain, customer_ref, problem, solution, status, + source_turn_count, redaction_version, model, conversation_key + ) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) + ON CONFLICT (domain, conversation_key) DO UPDATE SET + customer_ref = EXCLUDED.customer_ref, + problem = EXCLUDED.problem, + solution = EXCLUDED.solution, + status = EXCLUDED.status, + source_turn_count = EXCLUDED.source_turn_count, + redaction_version = EXCLUDED.redaction_version, + model = EXCLUDED.model, + summarized_at = now() + """, + ( + record.domain, + record.customer_ref, + record.problem, + record.solution, + record.status, + record.source_turn_count, + record.redaction_version, + record.model, + record.conversation_key, + ), + ) + connection.commit() diff --git a/app/core/config.py b/app/core/config.py index 8efb357..25636e7 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -32,6 +32,12 @@ class Settings(BaseSettings): session_domain_store_backend: str = Field( default="memory", alias="SESSION_DOMAIN_STORE_BACKEND" ) + session_state_backend: str = Field( + default="memory", alias="SESSION_STATE_BACKEND" + ) + session_state_ttl_seconds: int = Field( + default=2700, alias="SESSION_STATE_TTL_SECONDS" + ) persistence_hash_secret: str | None = Field( default=None, alias="PERSISTENCE_HASH_SECRET", @@ -46,6 +52,10 @@ class Settings(BaseSettings): default=False, alias="ENABLE_SUPPORT_INBOX", ) + enable_conversation_summary: bool = Field( + default=False, + alias="ENABLE_CONVERSATION_SUMMARY", + ) verified_handoff_webhook_url: str | None = Field( default=None, alias="VERIFIED_HANDOFF_WEBHOOK_URL", @@ -356,6 +366,9 @@ def require_api_secret_outside_dev(self) -> Self: raise ValueError( "SESSION_DOMAIN_STORE_BACKEND=postgres requires PERSISTENCE_BACKEND=postgres" ) + self.session_state_backend = self.session_state_backend.strip().lower() + if self.session_state_backend not in {"memory", "redis"}: + raise ValueError("SESSION_STATE_BACKEND must be memory or redis") self.retrieval_backend = self.retrieval_backend.strip().lower() if self.retrieval_backend not in {"lexical", "pgvector"}: raise ValueError("RETRIEVAL_BACKEND must be lexical or pgvector") diff --git a/app/main.py b/app/main.py index 2f061a2..0b695ee 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,7 @@ build_session_domain_store, ) from app.web_auth.runtime import create_web_auth_runtime +from app.conversations.session_state import build_session_state_store_from_env from app.db.runtime import DatabaseRuntime from app.core.errors import DatabaseUnavailableError @@ -83,6 +84,10 @@ async def lifespan(_: FastAPI): application.state.session_state_store = InMemorySessionDomainStore(ttl_seconds=900) # Last outbound text per session, to avoid repeating the exact same message. application.state.session_last_out_store = InMemorySessionDomainStore(ttl_seconds=900) + # Hot-tier chat session state (layered-persistence Nível 0): per-process, + # non-authoritative, fail-open. Redis backend drops in at Nível 1 via + # SESSION_STATE_BACKEND. Source of truth stays the Postgres write-through. + application.state.chat_session_state_store = build_session_state_store_from_env() # Best-effort replay protection for the inbound Hermes chat forward (the HMAC # covers only the body, so an identical request is otherwise replayable). application.state.hermes_replay_guard = HermesReplayGuard() diff --git a/app/orchestration/chat_flow.py b/app/orchestration/chat_flow.py index 05d0576..ff96bdf 100644 --- a/app/orchestration/chat_flow.py +++ b/app/orchestration/chat_flow.py @@ -1,4 +1,6 @@ +import logging import re +import time import unicodedata from time import perf_counter @@ -10,7 +12,11 @@ from app.orchestration.confidence import compute_confidence from app.orchestration.prompt_builder import build_prompt from app.retrieval.service import RetrievalService -from app.conversations.service import ConversationHistoryService +from app.conversations.service import ConversationHistoryService, hash_session +from app.conversations.session_state import SessionState, SessionStateStore + + +logger = logging.getLogger(__name__) def _normalize(text: str) -> str: @@ -32,11 +38,13 @@ def __init__( self, *, history_service: ConversationHistoryService | None = None, + session_state_store: SessionStateStore | None = None, ) -> None: self.retrieval_service = RetrievalService() self.llm_service = LLMService() self.handoff_service = HandoffService() self.history_service = history_service + self.session_state_store = session_state_store def answer( self, @@ -47,6 +55,66 @@ def answer( provider_api_key: str | None = None, channel: str = "api", customer_id: str | None = None, + ) -> dict[str, object]: + result = self._answer_inner( + domain, + question, + session_id=session_id, + request_id=request_id, + provider_api_key=provider_api_key, + channel=channel, + customer_id=customer_id, + ) + # Hot-tier state write (non-authoritative, fail-open). No-op without a store. + self._record_session_state( + domain=domain, session_id=session_id, channel=channel, result=result + ) + return result + + def _record_session_state( + self, + *, + domain: DomainConfig, + session_id: str | None, + channel: str, + result: dict[str, object], + ) -> None: + store = self.session_state_store + if store is None or not session_id: + return + try: + from app.core.config import get_settings + + settings = get_settings() + session_hash = hash_session( + session_id, settings.persistence_hash_secret or "" + ) + state = SessionState( + state="escalated" if result.get("escalated") else "answered", + domain=str(result.get("domain") or domain.name), + confidence=float(result.get("confidence") or 0.0), + updated_at=time.time(), + redaction_version=getattr(settings, "persistence_hash_version", None), + ) + store.put( + domain=domain.name, + channel=channel, + session_hash=session_hash, + state=state, + ttl_seconds=settings.session_state_ttl_seconds, + ) + except Exception: # noqa: BLE001 - hot state is non-authoritative; never break /chat. + logger.debug("session_state_write_skipped", exc_info=False) + + def _answer_inner( + self, + domain: DomainConfig, + question: str, + session_id: str | None = None, + request_id: str | None = None, + provider_api_key: str | None = None, + channel: str = "api", + customer_id: str | None = None, ) -> dict[str, object]: total_started_at = perf_counter() retrieval_ms = 0.0 diff --git a/docs/quality-plans/conversation-persistence-tiering-plan.md b/docs/quality-plans/conversation-persistence-tiering-plan.md index feccf77..9945a71 100644 --- a/docs/quality-plans/conversation-persistence-tiering-plan.md +++ b/docs/quality-plans/conversation-persistence-tiering-plan.md @@ -138,6 +138,40 @@ Escada de escalada (só sobe quando o gatilho aparecer, sem virar a B inteira): Assim a A **cresce para dentro da B** de forma incremental e reversível, em vez de pagar a complexidade adiantado por uma carga que ainda não existe. +### Fluxos de referência (síncrono vs assíncrono) + +O princípio operacional do Nível 0, concreto. Existe um diagrama de referência +deste desenho (gerado na discussão de 2026-06-29); o texto abaixo é a fonte da +verdade. + +**Fluxo de um turno (qualquer domínio):** + +1. Inbound (web/WhatsApp/…) chega ao `ChatFlowService` — roteia, recupera (pgvector), + responde (LLM). +2. **Síncrono (1 transação Postgres):** grava o turno (`conversations`/`messages` + + `chat_audits`) **e**, na mesma transação, **enfileira o(s) evento(s) no + `operational_outbox`**. Commit atômico = o turno está durável e a cópia está + garantida. Custo ~ms (não percebido; o LLM domina). +3. **Estado quente (síncrono, fail-open):** lê/grava `SessionStateStore` (in-memory + no Nível 0) por `(domain, channel, session_hash)`. Se o store cair, o `/chat` + responde mesmo assim — não é autoritativo. +4. **Assíncrono (worker `dispatch_outbox`, fora do hot path):** entrega o evento + `conversation.turn.archived` ao **backup off-box (R2/S3, append-only)** — at-least-once, + idempotente. RPO ≈ lag do worker (segundos). + +**Fluxo de handoff (escalou para humano):** + +1. Mesma transação síncrona do turno, **mais** o `support_cases` (ticket durável) + + o evento `handoff.requested` no outbox. **Este é o gate de consistência:** se a + transação falhar, **não** se promete humano ao usuário (não "barra" em falso). +2. Assíncrono via outbox: entrega de `handoff.requested` ao consumidor externo + (quando configurado) + a cópia off-box. O **support inbox** (leitura) já serve o + ticket a partir do Postgres, independente da entrega externa. + +**Onde o Redis entra (Níveis 1–2, sob gatilho):** como backend do `SessionStateStore` +(estado quente, TTL, não-autoritativo) e/ou cache de leitura 7d por cima do Postgres +— **nunca** como âncora de durabilidade no Nível 0. + --- ## 5. Tradeoffs honestos / limites de MVP diff --git a/docs/quality-plans/conversation-persistence-tiering-tech-plan.md b/docs/quality-plans/conversation-persistence-tiering-tech-plan.md index b6e3128..74d1446 100644 --- a/docs/quality-plans/conversation-persistence-tiering-tech-plan.md +++ b/docs/quality-plans/conversation-persistence-tiering-tech-plan.md @@ -47,6 +47,17 @@ Sem mudança de código. Fases 1–4 são as entregas de engenharia. ## Fase 1 — `SessionStateStore` seam + impl in-memory +Status: **implementada (2026-06-29)**, default no-op. O seam ficou em +`app/conversations/session_state.py` (`SessionState` + `SessionStateStore` Protocol + +`InMemorySessionStateStore` com TTL + `build_session_state_store_from_env`). Config: +`SESSION_STATE_BACKEND` (memory|redis) + `SESSION_STATE_TTL_SECONDS` (2700). O +`ChatFlowService` grava o estado via wrapper `answer()`→`_answer_inner()` + +`_record_session_state` (fail-open, chaveado por `hash_session`), e `chat.py`/`web_chat.py` +injetam o store de `app.state.chat_session_state_store` só quando +`persistence_backend==postgres`. Cobertura em `tests/test_session_state_store.py`. +Pendente (próximas fatias): o **leitor** do estado (consumo), o backend Redis +(Nível 1), e migrar o estado de escape do transporte Hermes para o mesmo seam. + Menor incremento de código. Sem Redis, sem infra. ### Arquivos @@ -168,6 +179,19 @@ def build_session_state_store_from_env() -> SessionStateStore: ## Fase 3 — Warehouse + batch noturno de sumarização +Status: **núcleo implementado (2026-06-29)**, dark por `ENABLE_CONVERSATION_SUMMARY`. +Migration `012_conversation_summaries.sql` (tabela + `UNIQUE(domain, conversation_key)` ++ CHECK de status). Núcleo testável em `app/conversations/summary.py` (transcript +com sanitização **antes** do modelo, prompt, parse robusto de JSON, `run_summary_batch` +idempotente por upsert). Script operacional `scripts/summarize_conversations.py` +(elegível = inativa ≥ `--inactivity-hours`, ≥ `--min-turns`, ainda não resumida; +`--dry-run` não chama modelo; recusa escrever sem a flag). `customer_ref` = +`customer_id` senão `session_hash`. Cobertura: `tests/test_conversation_summary.py` +(unit, inclui PAN redigido antes do modelo) + `tests/integration/test_conversation_summary_postgres.py` +(Postgres real, na gate `phase0-gates.yml`). **Pendente:** agendamento (systemd timer ++ runbook), consumo no RAG (Fase 4, atrás de eval), e métrica de custo no +`cost-latency-profile`. + Postgres como base analítica/RAG, alimentada por batch idempotente às ~3h. ### Migration diff --git a/migrations/012_conversation_summaries.sql b/migrations/012_conversation_summaries.sql new file mode 100644 index 0000000..fd53c88 --- /dev/null +++ b/migrations/012_conversation_summaries.sql @@ -0,0 +1,29 @@ +-- Migration 012 - Warehouse de resumos de conversa (persistencia em camadas, Fase 3). +-- +-- Base analitica/RAG alimentada pelo batch noturno idempotente +-- (scripts/summarize_conversations.py). NAO ha PII crua aqui: o texto e +-- sanitizado/redigido (REDACTION_VERSION) antes de ir ao modelo, e o cliente e +-- identificado por id/hash estavel (customer_ref), nunca telefone cru. +-- Idempotencia por conversa via UNIQUE (domain, conversation_key): re-rodar o +-- batch sobrescreve o mesmo registro. + +CREATE TABLE conversation_summaries ( + id BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY, + domain TEXT NOT NULL, + customer_ref TEXT NOT NULL, + problem TEXT NOT NULL, + solution TEXT NOT NULL, + status TEXT NOT NULL, + source_turn_count INT NOT NULL, + redaction_version TEXT NOT NULL, + model TEXT NOT NULL, + summarized_at TIMESTAMPTZ NOT NULL DEFAULT now(), + conversation_key TEXT NOT NULL, + CONSTRAINT conversation_summaries_status_check + CHECK (status IN ('resolvido', 'em_aberto', 'escalado')), + CONSTRAINT conversation_summaries_unique_per_conversation + UNIQUE (domain, conversation_key) +); + +CREATE INDEX idx_conversation_summaries_lookup + ON conversation_summaries (domain, customer_ref); diff --git a/scripts/summarize_conversations.py b/scripts/summarize_conversations.py new file mode 100644 index 0000000..79cd2ae --- /dev/null +++ b/scripts/summarize_conversations.py @@ -0,0 +1,96 @@ +"""Nightly conversation summarization batch (layered-persistence plan, Fase 3). + +Reads closed/inactive conversations, sanitizes every turn before the model, asks +gpt-4o-mini for a structured record and upserts it idempotently into +``conversation_summaries``. Dark by default: refuses to write unless +``ENABLE_CONVERSATION_SUMMARY=true`` (``--dry-run`` only counts eligible +conversations and never calls the model). + +Run via a systemd timer (~3h), never a cron inside the app. Logs sanitized +metrics only; no PII, no raw session_id (see docs/observability.md). +""" +from __future__ import annotations + +import argparse +import json +import os +import sys + + +def _eligible_count(connection, *, inactivity_hours: int, min_turns: int) -> int: + with connection.cursor() as cursor: + cursor.execute( + """ + SELECT count(*) + FROM conversations c + WHERE c.last_message_at IS NOT NULL + AND c.last_message_at < now() - make_interval(hours => %s) + AND ( + SELECT count(*) FROM messages m WHERE m.conversation_id = c.id + ) >= %s + AND NOT EXISTS ( + SELECT 1 FROM conversation_summaries s + JOIN domains d ON d.name = s.domain + WHERE d.id = c.domain_id AND s.conversation_key = c.id::text + ) + """, + (inactivity_hours, min_turns), + ) + return int(cursor.fetchone()[0]) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Summarize closed conversations into the warehouse.") + parser.add_argument("--inactivity-hours", type=int, default=24) + parser.add_argument("--min-turns", type=int, default=2) + parser.add_argument("--limit", type=int, default=100) + parser.add_argument("--model", default="gpt-4o-mini") + parser.add_argument("--dry-run", action="store_true") + parser.add_argument("--force", action="store_true", help="re-summarize already-summarized conversations") + args = parser.parse_args() + + database_url = os.getenv("DATABASE_URL") + if not database_url: + parser.error("DATABASE_URL is required") + + enabled = os.getenv("ENABLE_CONVERSATION_SUMMARY", "").strip().lower() == "true" + if not enabled and not args.dry_run: + print( + json.dumps({"event": "conversation_summary_disabled", + "hint": "set ENABLE_CONVERSATION_SUMMARY=true or use --dry-run"}), + file=sys.stderr, + ) + return 0 + + import psycopg + + connect_timeout = int(os.getenv("DATABASE_CONNECT_TIMEOUT_SECONDS", "5") or "5") + with psycopg.connect(database_url, connect_timeout=connect_timeout) as connection: + if args.dry_run: + count = _eligible_count( + connection, + inactivity_hours=args.inactivity_hours, + min_turns=args.min_turns, + ) + print(json.dumps({"event": "conversation_summary_dry_run", "eligible": count})) + return 0 + + from app.conversations.summary import run_summary_batch + from app.llm.wrapper import LLMWrapper + + provider = LLMWrapper(model=args.model) + stats = run_summary_batch( + connection, + provider, + model=args.model, + inactivity_hours=args.inactivity_hours, + min_turns=args.min_turns, + limit=args.limit, + force=args.force, + ) + print(json.dumps({"event": "conversation_summary_completed", **stats})) + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/integration/test_conversation_summary_postgres.py b/tests/integration/test_conversation_summary_postgres.py new file mode 100644 index 0000000..0c34acc --- /dev/null +++ b/tests/integration/test_conversation_summary_postgres.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import os +import subprocess +import sys + +import pytest + +from app.conversations.summary import run_summary_batch + + +DATABASE_URL = os.getenv("PHASE0_TEST_DATABASE_URL") +pytestmark = pytest.mark.skipif( + not DATABASE_URL, + reason="set PHASE0_TEST_DATABASE_URL to run PostgreSQL integration tests", +) + + +@pytest.fixture(scope="session", autouse=True) +def migrated_database() -> None: + if not DATABASE_URL: + return + env = {**os.environ, "DATABASE_URL": DATABASE_URL} + result = subprocess.run( + [sys.executable, "-m", "scripts.migrate", "apply"], + env=env, + check=False, + capture_output=True, + text=True, + timeout=60, + ) + assert result.returncode == 0, "migrations failed in disposable test database" + + +class _FakeProvider: + def __init__(self, response: str) -> None: + self.response = response + + def generate_answer(self, prompt: str) -> str: + return self.response + + +def _connect(): + import psycopg + + return psycopg.connect(DATABASE_URL) + + +def _seed_conversation(connection, *, name: str, n_messages: int, hours_old: int) -> str: + with connection.cursor() as cursor: + cursor.execute( + "INSERT INTO domains (name, display_name) VALUES (%s, %s) " + "ON CONFLICT (name) DO UPDATE SET display_name = EXCLUDED.display_name RETURNING id", + (name, name), + ) + domain_id = cursor.fetchone()[0] + cursor.execute( + """ + INSERT INTO conversations + (domain_id, channel, session_hash, session_hash_version, status, last_message_at) + VALUES (%s, 'whatsapp', %s, 'hmac-sha256-v1', 'bot', now() - make_interval(hours => %s)) + RETURNING id + """, + (domain_id, f"hash-{name}", hours_old), + ) + conv_id = cursor.fetchone()[0] + for i in range(n_messages): + role = "user" if i % 2 == 0 else "assistant" + cursor.execute( + "INSERT INTO messages (conversation_id, role, content, redaction_version) " + "VALUES (%s, %s, %s, 'phase0-v1')", + (conv_id, role, f"mensagem {i}"), + ) + connection.commit() + return str(conv_id) + + +def test_summary_batch_writes_and_is_idempotent() -> None: + # Assertions are scoped to the seeded conversation_key so other integration + # tests sharing the database never interfere. + provider = _FakeProvider('{"problem":"vps caiu","solution":"reiniciada","status":"resolvido"}') + with _connect() as connection: + conv_id = _seed_conversation(connection, name="itest-sum-a", n_messages=2, hours_old=48) + + run_summary_batch(connection, provider, model="test-model", inactivity_hours=24, min_turns=2, limit=100) + with connection.cursor() as cursor: + cursor.execute( + "SELECT problem, status, source_turn_count FROM conversation_summaries WHERE conversation_key = %s", + (conv_id,), + ) + assert cursor.fetchone() == ("vps caiu", "resolvido", 2) + + # Re-run is idempotent: the conversation is already summarized, so it stays + # exactly one row (and the UNIQUE constraint would reject a duplicate). + stats2 = run_summary_batch(connection, provider, model="test-model", inactivity_hours=24, min_turns=2, limit=100) + assert stats2["errors"] == 0 + with connection.cursor() as cursor: + cursor.execute("SELECT count(*) FROM conversation_summaries WHERE conversation_key = %s", (conv_id,)) + assert cursor.fetchone()[0] == 1 + + +def test_summary_batch_skips_trivial_single_turn() -> None: + provider = _FakeProvider('{"problem":"x","solution":"y","status":"resolvido"}') + with _connect() as connection: + conv_id = _seed_conversation(connection, name="itest-sum-b", n_messages=1, hours_old=48) + run_summary_batch(connection, provider, model="test-model", inactivity_hours=24, min_turns=2, limit=100) + # The single-turn conversation must not be summarized. + with connection.cursor() as cursor: + cursor.execute("SELECT count(*) FROM conversation_summaries WHERE conversation_key = %s", (conv_id,)) + assert cursor.fetchone()[0] == 0 diff --git a/tests/test_conversation_summary.py b/tests/test_conversation_summary.py new file mode 100644 index 0000000..c830b94 --- /dev/null +++ b/tests/test_conversation_summary.py @@ -0,0 +1,74 @@ +import pytest + +from app.conversations.summary import ( + build_summary_prompt, + build_transcript, + derive_customer_ref, + parse_summary_json, + summarize_turns, +) + + +CARD = "4111 1111 1111 1111" # valid Luhn test PAN + + +class _CapturingProvider: + def __init__(self, response: str) -> None: + self.response = response + self.last_prompt: str | None = None + + def generate_answer(self, prompt: str) -> str: + self.last_prompt = prompt + return self.response + + +def test_build_transcript_labels_and_redacts_pan() -> None: + transcript = build_transcript([("user", f"meu cartao e {CARD}"), ("assistant", "ok")]) + assert "Cliente:" in transcript and "Agente:" in transcript + assert CARD not in transcript # PAN redacted before it can reach the model + + +def test_summarize_turns_redacts_before_model() -> None: + provider = _CapturingProvider('{"problem":"p","solution":"s","status":"resolvido"}') + summarize_turns(turns=[("user", f"pague no {CARD}")], provider=provider) + assert provider.last_prompt is not None + assert CARD not in provider.last_prompt + + +def test_parse_summary_json_extracts_from_noise() -> None: + raw = 'Claro!\n{"problem": "DNS nao propagava", "solution": "ajustou ns", "status": "RESOLVIDO"} fim' + parsed = parse_summary_json(raw) + assert parsed["problem"] == "DNS nao propagava" + assert parsed["status"] == "resolvido" + + +def test_parse_summary_json_invalid_status_falls_back() -> None: + parsed = parse_summary_json('{"problem": "x", "solution": "y", "status": "pendente"}') + assert parsed["status"] == "em_aberto" + + +def test_parse_summary_json_requires_problem() -> None: + with pytest.raises(ValueError): + parse_summary_json('{"problem": "", "solution": "y", "status": "resolvido"}') + + +def test_parse_summary_json_requires_json() -> None: + with pytest.raises(ValueError): + parse_summary_json("desculpe, nao consegui resumir") + + +def test_summarize_turns_returns_structured() -> None: + provider = _CapturingProvider('{"problem":"p","solution":"s","status":"escalado"}') + out = summarize_turns(turns=[("user", "oi"), ("assistant", "ola")], provider=provider) + assert out == {"problem": "p", "solution": "s", "status": "escalado"} + + +def test_build_summary_prompt_contains_transcript() -> None: + prompt = build_summary_prompt("Cliente: oi\nAgente: ola") + assert "Cliente: oi" in prompt and "JSON" in prompt + + +def test_derive_customer_ref_prefers_customer_id() -> None: + assert derive_customer_ref("cust-123", "hash-abc") == "cust-123" + assert derive_customer_ref(None, "hash-abc") == "hash-abc" + assert derive_customer_ref(None, None) == "unknown" diff --git a/tests/test_session_state_store.py b/tests/test_session_state_store.py new file mode 100644 index 0000000..8ac33b9 --- /dev/null +++ b/tests/test_session_state_store.py @@ -0,0 +1,165 @@ +import time +from types import SimpleNamespace + +import pytest + +from app.conversations.service import hash_session +from app.conversations.session_state import ( + InMemorySessionStateStore, + SessionState, + SessionStateStore, + build_session_state_store_from_env, +) +from app.core.config import get_settings +from app.orchestration.chat_flow import ChatFlowService + + +def _state(label: str = "answered", domain: str = "vendas", conf: float = 0.5) -> SessionState: + return SessionState(state=label, domain=domain, confidence=conf, updated_at=time.time()) + + +class _Clock: + def __init__(self) -> None: + self.t = 1000.0 + + def __call__(self) -> float: + return self.t + + +# --- InMemorySessionStateStore ------------------------------------------------ + +def test_put_get_roundtrip() -> None: + store = InMemorySessionStateStore() + st = _state() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=st, ttl_seconds=60) + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") == st + + +def test_isolation_by_domain_channel_hash() -> None: + store = InMemorySessionStateStore() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + assert store.get(domain="suporte", channel="whatsapp", session_hash="abc") is None + assert store.get(domain="vendas", channel="web", session_hash="abc") is None + assert store.get(domain="vendas", channel="whatsapp", session_hash="xyz") is None + + +def test_ttl_expiry() -> None: + clock = _Clock() + store = InMemorySessionStateStore(time_fn=clock) + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + clock.t += 59 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is not None + clock.t += 2 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is None + + +def test_ttl_zero_never_expires() -> None: + clock = _Clock() + store = InMemorySessionStateStore(time_fn=clock) + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=0) + clock.t += 100_000 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is not None + + +def test_clear() -> None: + store = InMemorySessionStateStore() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + store.clear(domain="vendas", channel="whatsapp", session_hash="abc") + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is None + + +# --- build_session_state_store_from_env -------------------------------------- + +def test_build_from_env_memory_and_protocol() -> None: + store = build_session_state_store_from_env(getenv=lambda k, d="": "memory") + assert isinstance(store, InMemorySessionStateStore) + assert isinstance(store, SessionStateStore) + + +def test_build_from_env_defaults_to_memory() -> None: + assert isinstance(build_session_state_store_from_env(getenv=lambda k, d="": d), InMemorySessionStateStore) + + +def test_build_from_env_rejects_unknown_backend() -> None: + with pytest.raises(ValueError): + build_session_state_store_from_env(getenv=lambda k, d="": "mongodb") + + +def test_build_from_env_redis_not_implemented_yet() -> None: + with pytest.raises(NotImplementedError): + build_session_state_store_from_env(getenv=lambda k, d="": "redis") + + +# --- ChatFlowService integration (write path, fail-open) --------------------- + +def _hash(session_id: str) -> str: + return hash_session(session_id, get_settings().persistence_hash_secret or "") + + +def test_record_session_state_writes() -> None: + store = InMemorySessionStateStore() + flow = ChatFlowService(session_state_store=store) + flow._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="whatsapp:hermes:abc", + channel="whatsapp", + result={"domain": "vendas", "confidence": 0.42, "escalated": True}, + ) + st = store.get(domain="vendas", channel="whatsapp", session_hash=_hash("whatsapp:hermes:abc")) + assert st is not None + assert st.state == "escalated" + assert st.confidence == 0.42 + + +def test_record_session_state_noop_without_store() -> None: + ChatFlowService()._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="web:1", + channel="web", + result={"escalated": False}, + ) # must not raise + + +def test_record_session_state_noop_without_session_id() -> None: + store = InMemorySessionStateStore() + ChatFlowService(session_state_store=store)._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id=None, + channel="web", + result={"escalated": False}, + ) + assert store.get(domain="vendas", channel="web", session_hash=_hash("x")) is None + + +def test_record_session_state_fail_open() -> None: + class _BoomStore: + def get(self, **kwargs): + return None + + def put(self, **kwargs): + raise RuntimeError("redis down") + + def clear(self, **kwargs): + pass + + # Must swallow the store error: the hot tier is non-authoritative. + ChatFlowService(session_state_store=_BoomStore())._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="web:1", + channel="web", + result={"escalated": False}, + ) + + +def test_answer_wrapper_records_state(monkeypatch) -> None: + store = InMemorySessionStateStore() + flow = ChatFlowService(session_state_store=store) + monkeypatch.setattr( + flow, + "_answer_inner", + lambda *a, **k: {"domain": "vendas", "confidence": 0.7, "escalated": False}, + ) + out = flow.answer(SimpleNamespace(name="vendas"), "oi", session_id="web:123", channel="web") + assert out["confidence"] == 0.7 + st = store.get(domain="vendas", channel="web", session_hash=_hash("web:123")) + assert st is not None and st.state == "answered"