Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/phase0-gates.yml
Original file line number Diff line number Diff line change
Expand Up @@ -97,4 +97,4 @@ jobs:
python -m scripts.migrate apply
python -m scripts.migrate verify
python -m scripts.check_readiness
python -m pytest tests/integration/test_phase0_postgres.py tests/integration/test_conversation_migration_upgrade.py tests/integration/test_feedback_privacy_integrity_postgres.py tests/integration/test_session_domain_store_postgres.py tests/integration/test_support_inbox_postgres.py -q
python -m pytest tests/integration/test_phase0_postgres.py tests/integration/test_conversation_migration_upgrade.py tests/integration/test_feedback_privacy_integrity_postgres.py tests/integration/test_session_domain_store_postgres.py tests/integration/test_support_inbox_postgres.py tests/integration/test_conversation_summary_postgres.py -q
6 changes: 6 additions & 0 deletions app/api/routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ def chat(
raise HTTPException(status_code=404, detail="Domain not found")

database_runtime = request.app.state.database_runtime
session_state_store = (
getattr(request.app.state, "chat_session_state_store", None)
if settings.persistence_backend == "postgres"
else None
)
response = ChatFlowService(
history_service=ConversationHistoryService(database_runtime),
session_state_store=session_state_store,
).answer(
domain=domain,
question=payload.message,
Expand Down
6 changes: 6 additions & 0 deletions app/api/routes/web_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@ def create_web_chat(
domain = _load_default_domain(settings)

database_runtime = request.app.state.database_runtime
session_state_store = (
getattr(request.app.state, "chat_session_state_store", None)
if settings.persistence_backend == "postgres"
else None
)
chat_response = ChatFlowService(
history_service=ConversationHistoryService(database_runtime),
session_state_store=session_state_store,
).answer(
domain=domain,
question=payload.message,
Expand Down
120 changes: 120 additions & 0 deletions app/conversations/session_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Hot-tier session state seam (layered-persistence plan, fatia/Fase 1).

Mirrors the philosophy of ``ConversationArchiveSink``: a stable interface with a
swappable implementation and a safe default. The state is the *non-authoritative*
hot tier — the source of truth stays in Postgres (decision A). A Redis backend
drops in behind ``SESSION_STATE_BACKEND=redis`` in a later slice (Nível 1) without
touching callers.

Privacy: keyed by ``(domain, channel, session_hash)`` — never a raw ``session_id``
nor free PII.
"""
from __future__ import annotations

import os
import threading
import time
from dataclasses import dataclass
from typing import Callable, Protocol, runtime_checkable


# Backend names accepted for the SESSION_STATE_BACKEND setting.
SUPPORTED_SESSION_STATE_BACKENDS = {"redis", "memory"}
# Default lifetime of a hot-tier entry: a 45-minute window, in seconds.
DEFAULT_SESSION_STATE_TTL_SECONDS = 45 * 60


@dataclass(frozen=True)
class SessionState:
    """Immutable snapshot of a session's hot-tier state.

    Intended to carry sanitized values only: no raw session_id and no free PII.
    """

    # Required fields, in positional order.
    state: str
    domain: str
    confidence: float
    updated_at: float
    # Optional metadata; both default to None when not supplied.
    turn_id: str | None = None
    redaction_version: str | None = None


@runtime_checkable
class SessionStateStore(Protocol):
    """Structural interface for a hot-tier session state store.

    Every operation is keyword-only and addressed by the
    ``(domain, channel, session_hash)`` triple.
    """

    def get(
        self, *, domain: str, channel: str, session_hash: str
    ) -> SessionState | None:
        """Return the state held for the triple, or ``None``."""

    def put(
        self,
        *,
        domain: str,
        channel: str,
        session_hash: str,
        state: SessionState,
        ttl_seconds: int,
    ) -> None:
        """Store ``state`` for the triple with a lifetime of ``ttl_seconds``."""

    def clear(self, *, domain: str, channel: str, session_hash: str) -> None:
        """Drop whatever is held for the triple."""


def _key(domain: str, channel: str, session_hash: str) -> str:
return f"{domain}::{channel}::{session_hash}"


class InMemorySessionStateStore:
    """Single-process store with a per-entry TTL. Local/CI/test default; mirrors
    the role of ``AppendOnlyFileSink``.

    Expiry is measured with ``time_fn``, which defaults to ``time.monotonic``
    (a monotonic clock, not wall-clock time), so TTLs are unaffected by system
    clock adjustments.

    NOT consistent across uvicorn workers and lost on restart — production
    multi-worker needs the Redis backend. This is fine because the hot tier is
    non-authoritative: the truth is the Postgres write-through.

    Expired entries are removed lazily on ``get`` and, additionally, by a
    rate-limited sweep during ``put`` so that sessions which are never read
    again do not accumulate for the lifetime of the process.
    """

    # Minimum spacing, in ``time_fn`` seconds, between two expiry sweeps.
    _SWEEP_INTERVAL_SECONDS = 60.0

    def __init__(self, *, time_fn: Callable[[], float] = time.monotonic) -> None:
        """Create an empty store.

        Args:
            time_fn: Zero-argument clock returning seconds as a float;
                injectable so tests can control expiry.
        """
        self._data: dict[str, tuple[float | None, SessionState]] = {}
        self._lock = threading.Lock()
        self._time_fn = time_fn
        # Scheduled on the first expiring ``put``; None until then.
        self._next_sweep_at: float | None = None

    def get(
        self, *, domain: str, channel: str, session_hash: str
    ) -> SessionState | None:
        """Return the live state for the key, or ``None`` if absent or expired.

        An entry found to be expired is evicted before returning ``None``.
        """
        key = _key(domain, channel, session_hash)
        now = self._time_fn()
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            expires_at, state = entry
            if expires_at is not None and now >= expires_at:
                self._data.pop(key, None)
                return None
            return state

    def put(
        self,
        *,
        domain: str,
        channel: str,
        session_hash: str,
        state: SessionState,
        ttl_seconds: int,
    ) -> None:
        """Store ``state`` under the key, replacing any previous entry.

        Args:
            ttl_seconds: Lifetime in seconds. A value ``<= 0`` stores the entry
                without an expiry.
        """
        key = _key(domain, channel, session_hash)
        # The clock is read only for expiring entries, exactly once per call.
        now = self._time_fn() if ttl_seconds > 0 else None
        expires_at = now + ttl_seconds if now is not None else None
        with self._lock:
            if now is not None:
                self._sweep_expired_locked(now)
            self._data[key] = (expires_at, state)

    def clear(self, *, domain: str, channel: str, session_hash: str) -> None:
        """Remove the entry for the key; a missing key is not an error."""
        key = _key(domain, channel, session_hash)
        with self._lock:
            self._data.pop(key, None)

    def _sweep_expired_locked(self, now: float) -> None:
        """Drop every expired entry, at most once per sweep interval.

        Must be called with ``self._lock`` held.
        """
        if self._next_sweep_at is None:
            # First expiring write: nothing can be stale yet, just schedule.
            self._next_sweep_at = now + self._SWEEP_INTERVAL_SECONDS
            return
        if now < self._next_sweep_at:
            return
        stale = [
            key
            for key, (expires_at, _) in self._data.items()
            if expires_at is not None and now >= expires_at
        ]
        for key in stale:
            del self._data[key]
        self._next_sweep_at = now + self._SWEEP_INTERVAL_SECONDS


def build_session_state_store_from_env(
    getenv: Callable[[str, str], str] = os.getenv,
) -> SessionStateStore:
    """Build the session state store selected by ``SESSION_STATE_BACKEND``.

    The setting is read through ``getenv`` (default ``os.getenv``), falls back
    to ``"memory"`` when unset or empty, and is trimmed and lower-cased.

    Raises:
        ValueError: the backend name is not a supported one.
        NotImplementedError: the backend is supported by name but has no
            implementation yet (currently ``redis``).
    """
    configured = getenv("SESSION_STATE_BACKEND", "memory")
    backend = (configured or "memory").strip().lower()
    if backend in SUPPORTED_SESSION_STATE_BACKENDS:
        if backend != "memory":
            # Only "redis" reaches this point; its store is not written yet.
            raise NotImplementedError(
                "SESSION_STATE_BACKEND=redis requires RedisSessionStateStore (Nível 1, not yet implemented)"
            )
        return InMemorySessionStateStore()
    raise ValueError(f"unsupported session state backend: {backend}")
197 changes: 197 additions & 0 deletions app/conversations/summary.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""Nightly conversation summarization (layered-persistence plan, Fase 3).

Pure, testable helpers plus a DB batch that reads *closed/inactive* conversations,
sanitizes every turn **before** it can reach the model, asks a cheap model for a
structured record (problem/solution/status) and upserts it idempotently into the
``conversation_summaries`` warehouse. No raw PII, no raw ``session_id``.

The model provider is injected (``generate_answer(prompt) -> str``) so the script
uses the real ``LLMWrapper`` and tests use a fake.
"""
from __future__ import annotations

import json
import re
from dataclasses import dataclass
from typing import Any, Iterable, Protocol

from app.core.persistence_sanitize import REDACTION_VERSION, sanitize_for_persistence


# Closed set of status labels a stored summary may carry.
VALID_STATUSES = {"escalado", "em_aberto", "resolvido"}


class SummaryProvider(Protocol):
    """Minimal text-generation interface required by the summarizer."""

    def generate_answer(self, prompt: str) -> str:
        """Return the model's raw text answer for ``prompt``."""


@dataclass(frozen=True)
class ConversationSummaryRecord:
    """Immutable row destined for the ``conversation_summaries`` table."""

    # Identity of the conversation's owner.
    domain: str
    customer_ref: str
    # Structured summary content.
    problem: str
    solution: str
    status: str
    # Provenance of the summary.
    source_turn_count: int
    redaction_version: str
    model: str
    # Forms the upsert conflict key together with ``domain``.
    conversation_key: str


def build_transcript(turns: Iterable[tuple[str, str]]) -> str:
    """Render ordered ``(role, content)`` turns as one ``Speaker: text`` line each.

    Each turn's content goes through ``sanitize_for_persistence`` before it is
    placed in the transcript, so only redacted text can reach the model. The
    ``user`` role is labelled ``Cliente``; every other role is ``Agente``.
    """

    def _render(role: str, content: str) -> str:
        redacted = sanitize_for_persistence(content) or ""
        label = "Agente" if role != "user" else "Cliente"
        return f"{label}: {redacted}"

    return "\n".join(_render(role, content) for role, content in turns)


def build_summary_prompt(transcript: str) -> str:
    """Wrap ``transcript`` in the instruction prompt for the summarization model.

    The prompt asks for a JSON-only answer with ``problem``, ``solution`` and
    ``status`` fields and ends with a JSON skeleton of that shape.
    """
    instructions = (
        "Voce resume conversas de atendimento. Leia a conversa e responda APENAS "
        "com um objeto JSON, sem texto fora do JSON.\n"
    )
    field_guide = "".join(
        [
            "Campos:\n",
            ' - "problem": o problema/objetivo do cliente, em uma frase factual.\n',
            ' - "solution": a solucao dada ou tentada, em uma frase factual.\n',
            ' - "status": exatamente um de "resolvido", "em_aberto", "escalado".\n',
            "Nao invente dados que nao estao na conversa.\n\n",
        ]
    )
    conversation = f"CONVERSA:\n{transcript}\n\n"
    answer_skeleton = 'JSON: {"problem": "...", "solution": "...", "status": "..."}'
    return instructions + field_guide + conversation + answer_skeleton


def parse_summary_json(raw: str) -> dict[str, str]:
    """Extract and normalize the summary object from a model response.

    Args:
        raw: Raw model output; may contain text around the JSON object.

    Returns:
        A dict with ``problem``, ``solution`` and ``status``. Values are
        stripped, ``status`` is lower-cased, and any status outside
        ``VALID_STATUSES`` is replaced by ``"em_aberto"``.

    Raises:
        ValueError: the response holds no ``{...}`` span, no JSON object can be
            decoded from it (``json.JSONDecodeError``, a ``ValueError``
            subclass), or ``problem`` is missing, null or blank.
    """
    match = re.search(r"\{.*\}", raw or "", re.DOTALL)
    if not match:
        raise ValueError("summary response had no JSON object")
    data = _decode_first_json_object(match.group(0))

    problem = _clean_summary_field(data.get("problem"))
    if not problem:
        raise ValueError("summary is missing 'problem'")
    solution = _clean_summary_field(data.get("solution"))
    status = _clean_summary_field(data.get("status")).lower()
    if status not in VALID_STATUSES:
        status = "em_aberto"
    return {"problem": problem, "solution": solution, "status": status}


def _clean_summary_field(value: Any) -> str:
    """Return ``value`` as stripped text, mapping a missing/null value to ``""``."""
    # JSON null arrives as None; str(None) would store the literal text "None".
    if value is None:
        return ""
    return str(value).strip()


def _decode_first_json_object(text: str) -> dict[str, Any]:
    """Decode ``text`` as JSON, falling back to the first decodable object in it."""
    try:
        return json.loads(text)
    except json.JSONDecodeError as first_error:
        # The greedy span can swallow trailing text that contains braces; try
        # decoding an object starting at each "{" and keep the first success.
        decoder = json.JSONDecoder()
        start = text.find("{")
        while start != -1:
            try:
                candidate, _ = decoder.raw_decode(text, start)
            except json.JSONDecodeError:
                pass
            else:
                if isinstance(candidate, dict):
                    return candidate
            start = text.find("{", start + 1)
        # Nothing decodable: surface the original error unchanged.
        raise first_error


def summarize_turns(
    *, turns: list[tuple[str, str]], provider: SummaryProvider
) -> dict[str, str]:
    """Summarize ``turns`` with ``provider`` and return the parsed summary.

    Pipeline: build the transcript, wrap it in the summary prompt, ask the
    provider for an answer, then parse that answer with ``parse_summary_json``.
    """
    transcript = build_transcript(turns)
    prompt = build_summary_prompt(transcript)
    answer = provider.generate_answer(prompt)
    return parse_summary_json(answer)


def derive_customer_ref(customer_id: Any, session_hash: str | None) -> str:
    """Pick the customer reference stored with a summary.

    Preference order: a truthy ``customer_id`` (as text), then a truthy
    ``session_hash``, then the literal ``"unknown"``.
    """
    return str(customer_id) if customer_id else (session_hash or "unknown")


def run_summary_batch(
    connection: Any,
    provider: SummaryProvider,
    *,
    model: str,
    inactivity_hours: int = 24,
    min_turns: int = 2,
    limit: int = 100,
    force: bool = False,
) -> dict[str, int]:
    """Summarize eligible (inactive, non-trivial, not-yet-summarized) conversations.

    Idempotent: UNIQUE (domain, conversation_key) + ON CONFLICT upsert. ``force``
    re-summarizes already-summarized conversations (costs model calls).

    Args:
        connection: DB connection exposing ``cursor()``, ``commit()`` and
            ``rollback()``.
        provider: Model provider used to generate each summary.
        model: Model name recorded on every stored summary.
        inactivity_hours: Only conversations whose last message is older than
            this many hours are considered.
        min_turns: Minimum number of messages a conversation must have.
        limit: Maximum number of conversations selected per run.
        force: When true, also select conversations that already have a summary.

    Returns:
        Counters: ``eligible`` (selected), ``summarized`` (stored) and
        ``errors`` (conversations that failed and were skipped).
    """
    import logging  # function-scope: this module does not import logging

    logger = logging.getLogger(__name__)
    stats = {"eligible": 0, "summarized": 0, "errors": 0}
    with connection.cursor() as cursor:
        cursor.execute(
            """
            SELECT c.id, d.name, c.customer_id, c.session_hash
            FROM conversations c
            JOIN domains d ON d.id = c.domain_id
            WHERE c.last_message_at IS NOT NULL
            AND c.last_message_at < now() - make_interval(hours => %s)
            AND (
            SELECT count(*) FROM messages m WHERE m.conversation_id = c.id
            ) >= %s
            AND (
            %s OR NOT EXISTS (
            SELECT 1 FROM conversation_summaries s
            WHERE s.domain = d.name AND s.conversation_key = c.id::text
            )
            )
            ORDER BY c.last_message_at DESC
            LIMIT %s
            """,
            (inactivity_hours, min_turns, force, limit),
        )
        candidates = cursor.fetchall()

    stats["eligible"] = len(candidates)
    for conv_id, domain_name, customer_id, session_hash in candidates:
        try:
            with connection.cursor() as cursor:
                cursor.execute(
                    """
                    SELECT role, content FROM messages
                    WHERE conversation_id = %s
                    ORDER BY created_at, message_sequence
                    """,
                    (conv_id,),
                )
                turns = [(str(role), str(content)) for role, content in cursor.fetchall()]
            summary = summarize_turns(turns=turns, provider=provider)
            record = ConversationSummaryRecord(
                domain=str(domain_name),
                customer_ref=derive_customer_ref(customer_id, session_hash),
                problem=summary["problem"],
                solution=summary["solution"],
                status=summary["status"],
                source_turn_count=len(turns),
                redaction_version=REDACTION_VERSION,
                model=model,
                conversation_key=str(conv_id),
            )
            _upsert_summary(connection, record)
            stats["summarized"] += 1
        except Exception as exc:  # noqa: BLE001 - one bad conversation must not stop the batch
            stats["errors"] += 1
            # Log only the id and the exception type: messages may echo content.
            logger.warning(
                "conversation summary failed for conversation %s (%s)",
                conv_id,
                type(exc).__name__,
            )
            # A failed statement can leave the transaction aborted; without a
            # rollback every following conversation would fail as well.
            _rollback_after_failure(connection, logger)
    return stats


def _rollback_after_failure(connection: Any, logger: Any) -> None:
    """Roll ``connection`` back, logging instead of raising if that fails."""
    try:
        connection.rollback()
    except Exception as exc:  # noqa: BLE001 - keep the batch loop alive
        logger.warning("rollback after summary failure failed (%s)", type(exc).__name__)


def _upsert_summary(connection: Any, record: ConversationSummaryRecord) -> None:
with connection.cursor() as cursor:
cursor.execute(
"""
INSERT INTO conversation_summaries (
domain, customer_ref, problem, solution, status,
source_turn_count, redaction_version, model, conversation_key
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s)
ON CONFLICT (domain, conversation_key) DO UPDATE SET
customer_ref = EXCLUDED.customer_ref,
problem = EXCLUDED.problem,
solution = EXCLUDED.solution,
status = EXCLUDED.status,
source_turn_count = EXCLUDED.source_turn_count,
redaction_version = EXCLUDED.redaction_version,
model = EXCLUDED.model,
summarized_at = now()
""",
(
record.domain,
record.customer_ref,
record.problem,
record.solution,
record.status,
record.source_turn_count,
record.redaction_version,
record.model,
record.conversation_key,
),
)
connection.commit()
13 changes: 13 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class Settings(BaseSettings):
session_domain_store_backend: str = Field(
default="memory", alias="SESSION_DOMAIN_STORE_BACKEND"
)
session_state_backend: str = Field(
default="memory", alias="SESSION_STATE_BACKEND"
)
session_state_ttl_seconds: int = Field(
default=2700, alias="SESSION_STATE_TTL_SECONDS"
)
persistence_hash_secret: str | None = Field(
default=None,
alias="PERSISTENCE_HASH_SECRET",
Expand All @@ -46,6 +52,10 @@ class Settings(BaseSettings):
default=False,
alias="ENABLE_SUPPORT_INBOX",
)
enable_conversation_summary: bool = Field(
default=False,
alias="ENABLE_CONVERSATION_SUMMARY",
)
verified_handoff_webhook_url: str | None = Field(
default=None,
alias="VERIFIED_HANDOFF_WEBHOOK_URL",
Expand Down Expand Up @@ -356,6 +366,9 @@ def require_api_secret_outside_dev(self) -> Self:
raise ValueError(
"SESSION_DOMAIN_STORE_BACKEND=postgres requires PERSISTENCE_BACKEND=postgres"
)
self.session_state_backend = self.session_state_backend.strip().lower()
if self.session_state_backend not in {"memory", "redis"}:
raise ValueError("SESSION_STATE_BACKEND must be memory or redis")
self.retrieval_backend = self.retrieval_backend.strip().lower()
if self.retrieval_backend not in {"lexical", "pgvector"}:
raise ValueError("RETRIEVAL_BACKEND must be lexical or pgvector")
Expand Down
Loading
Loading