Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions app/api/routes/chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,14 @@ def chat(
raise HTTPException(status_code=404, detail="Domain not found")

database_runtime = request.app.state.database_runtime
session_state_store = (
getattr(request.app.state, "chat_session_state_store", None)
if settings.persistence_backend == "postgres"
else None
)
response = ChatFlowService(
history_service=ConversationHistoryService(database_runtime),
session_state_store=session_state_store,
).answer(
domain=domain,
question=payload.message,
Expand Down
6 changes: 6 additions & 0 deletions app/api/routes/web_chat.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,8 +48,14 @@ def create_web_chat(
domain = _load_default_domain(settings)

database_runtime = request.app.state.database_runtime
session_state_store = (
getattr(request.app.state, "chat_session_state_store", None)
if settings.persistence_backend == "postgres"
else None
)
chat_response = ChatFlowService(
history_service=ConversationHistoryService(database_runtime),
session_state_store=session_state_store,
).answer(
domain=domain,
question=payload.message,
Expand Down
120 changes: 120 additions & 0 deletions app/conversations/session_state.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
"""Hot-tier session state seam (layered-persistence plan, fatia/Fase 1).

Mirrors the philosophy of ``ConversationArchiveSink``: a stable interface with a
swappable implementation and a safe default. The state is the *non-authoritative*
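hot tier — the source of truth stays in Postgres (decision A). A Redis backend
is planned behind ``SESSION_STATE_BACKEND=redis`` for a later slice (Level 1,
"Nível 1" in the plan) without touching callers; until it lands, selecting
``redis`` makes ``build_session_state_store_from_env`` raise ``NotImplementedError``.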

Privacy: keyed by ``(domain, channel, session_hash)`` — never a raw ``session_id``
nor free PII.
"""
from __future__ import annotations

import os
import threading
import time
from dataclasses import dataclass
from typing import Callable, Protocol, runtime_checkable


SUPPORTED_SESSION_STATE_BACKENDS = {"memory", "redis"}
DEFAULT_SESSION_STATE_TTL_SECONDS = 2700 # 45 min hot window


@dataclass(frozen=True)
class SessionState:
    """Immutable snapshot of what the hot tier remembers about one session.

    Contract: instances stay sanitized. They must not carry a raw
    ``session_id`` or free-form PII; the session is identified only by the
    key the store is addressed with, never by a field of this record.
    """

    # Outcome label of the latest turn (the chat flow writes "answered" or
    # "escalated").
    state: str
    # Domain name the turn was answered under.
    domain: str
    # Confidence score reported for the turn.
    confidence: float
    # Timestamp of the write; the chat flow fills it from ``time.time()``.
    updated_at: float
    # Optional turn identifier. NOTE(review): no visible writer sets it yet.
    turn_id: str | None = None
    # Optional version tag of the hashing/redaction scheme used by the writer.
    redaction_version: str | None = None


@runtime_checkable
class SessionStateStore(Protocol):
    """Structural interface shared by every hot-tier session-state backend.

    All operations are keyword-only and address an entry by the triple
    ``(domain, channel, session_hash)``. Being ``runtime_checkable``, an
    ``isinstance`` check only verifies that the three methods exist, not
    their signatures.
    """

    def get(
        self, *, domain: str, channel: str, session_hash: str
    ) -> SessionState | None:
        """Return the state stored for the triple, or ``None`` if there is none."""
        ...

    def put(
        self,
        *,
        domain: str,
        channel: str,
        session_hash: str,
        state: SessionState,
        ttl_seconds: int,
    ) -> None:
        """Store ``state`` for the triple, replacing any previous entry.

        ``ttl_seconds`` is the lifetime requested for the entry; how a
        non-positive value is treated is up to the implementation.
        """
        ...

    def clear(self, *, domain: str, channel: str, session_hash: str) -> None:
        """Remove the entry stored for the triple, if any."""
        ...


def _key(domain: str, channel: str, session_hash: str) -> str:
    """Build the flat storage key for a ``(domain, channel, session_hash)`` triple."""
    return f"{domain}::{channel}::{session_hash}"


class InMemorySessionStateStore:
    """Single-process store with a per-entry TTL. Local/CI/test default; mirrors
    the role of ``AppendOnlyFileSink``.

    Expiry is measured with ``time_fn`` (``time.monotonic`` by default), so it
    is immune to wall-clock adjustments. Expired entries are dropped lazily on
    ``get`` and, additionally, swept in bulk from ``put`` at most once per
    ``sweep_interval_seconds``. The sweep matters because a key that is written
    but never read again would otherwise stay in memory until restart.

    NOT consistent across uvicorn workers and lost on restart — production
    multi-worker needs the Redis backend (Nível 1). This is fine because the hot
    tier is non-authoritative: the truth is the Postgres write-through.
    """

    def __init__(
        self,
        *,
        time_fn: Callable[[], float] = time.monotonic,
        sweep_interval_seconds: float = 60.0,
    ) -> None:
        """Create an empty store.

        Args:
            time_fn: Clock returning seconds as a float; injectable for tests.
            sweep_interval_seconds: Minimum time between two bulk sweeps of
                expired entries. A value ``<= 0`` sweeps on every ``put``
                after the first.
        """
        # Maps flat key -> (deadline on the time_fn clock or None, state).
        self._data: dict[str, tuple[float | None, SessionState]] = {}
        self._lock = threading.Lock()
        self._time_fn = time_fn
        self._sweep_interval = sweep_interval_seconds
        # Set on the first put so that construction never calls time_fn.
        self._next_sweep_at: float | None = None

    def _purge_expired(self, now: float) -> None:
        """Drop every entry whose deadline has passed. Caller must hold the lock."""
        expired = [
            key
            for key, (deadline, _state) in self._data.items()
            if deadline is not None and now >= deadline
        ]
        for key in expired:
            del self._data[key]

    def get(
        self, *, domain: str, channel: str, session_hash: str
    ) -> SessionState | None:
        """Return the live state for the triple, or ``None`` if absent or expired.

        An expired entry is removed as a side effect of the lookup.
        """
        key = _key(domain, channel, session_hash)
        now = self._time_fn()
        with self._lock:
            entry = self._data.get(key)
            if entry is None:
                return None
            expires_at, state = entry
            if expires_at is not None and now >= expires_at:
                self._data.pop(key, None)
                return None
            return state

    def put(
        self,
        *,
        domain: str,
        channel: str,
        session_hash: str,
        state: SessionState,
        ttl_seconds: int,
    ) -> None:
        """Store ``state`` for the triple, replacing any previous entry.

        A positive ``ttl_seconds`` sets the entry's lifetime; zero or a
        negative value stores the entry without an expiry.
        """
        key = _key(domain, channel, session_hash)
        now = self._time_fn()
        expires_at = now + ttl_seconds if ttl_seconds > 0 else None
        with self._lock:
            self._data[key] = (expires_at, state)
            if self._next_sweep_at is None:
                self._next_sweep_at = now + self._sweep_interval
            elif now >= self._next_sweep_at:
                # Amortised cleanup: bounds memory for write-only sessions
                # without paying a full scan on every write.
                self._purge_expired(now)
                self._next_sweep_at = now + self._sweep_interval

    def clear(self, *, domain: str, channel: str, session_hash: str) -> None:
        """Remove the entry for the triple; a missing entry is not an error."""
        key = _key(domain, channel, session_hash)
        with self._lock:
            self._data.pop(key, None)


def build_session_state_store_from_env(
    getenv: Callable[[str, str], str] = os.getenv,
) -> SessionStateStore:
    """Build the session-state store selected by ``SESSION_STATE_BACKEND``.

    Args:
        getenv: Lookup called as ``getenv(name, default)``; defaults to
            ``os.getenv`` and is injectable for tests.

    Returns:
        An ``InMemorySessionStateStore`` when the backend is ``memory``, which
        is also used when the variable is unset or empty.

    Raises:
        ValueError: The value, once stripped and lower-cased, is not a
            supported backend name.
        NotImplementedError: The backend is ``redis``, which is accepted as a
            name but has no implementation yet.
    """
    configured = getenv("SESSION_STATE_BACKEND", "memory")
    backend = (configured or "memory").strip().lower()
    if backend not in SUPPORTED_SESSION_STATE_BACKENDS:
        raise ValueError(f"unsupported session state backend: {backend}")
    if backend != "memory":
        # Only "redis" can reach this point: RedisSessionStateStore is the
        # next slice (Nível 1).
        raise NotImplementedError(
            "SESSION_STATE_BACKEND=redis requires RedisSessionStateStore (Nível 1, not yet implemented)"
        )
    return InMemorySessionStateStore()
9 changes: 9 additions & 0 deletions app/core/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ class Settings(BaseSettings):
session_domain_store_backend: str = Field(
default="memory", alias="SESSION_DOMAIN_STORE_BACKEND"
)
session_state_backend: str = Field(
default="memory", alias="SESSION_STATE_BACKEND"
)
session_state_ttl_seconds: int = Field(
default=2700, alias="SESSION_STATE_TTL_SECONDS"
)
persistence_hash_secret: str | None = Field(
default=None,
alias="PERSISTENCE_HASH_SECRET",
Expand Down Expand Up @@ -356,6 +362,9 @@ def require_api_secret_outside_dev(self) -> Self:
raise ValueError(
"SESSION_DOMAIN_STORE_BACKEND=postgres requires PERSISTENCE_BACKEND=postgres"
)
self.session_state_backend = self.session_state_backend.strip().lower()
if self.session_state_backend not in {"memory", "redis"}:
raise ValueError("SESSION_STATE_BACKEND must be memory or redis")
self.retrieval_backend = self.retrieval_backend.strip().lower()
if self.retrieval_backend not in {"lexical", "pgvector"}:
raise ValueError("RETRIEVAL_BACKEND must be lexical or pgvector")
Expand Down
5 changes: 5 additions & 0 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
build_session_domain_store,
)
from app.web_auth.runtime import create_web_auth_runtime
from app.conversations.session_state import build_session_state_store_from_env
from app.db.runtime import DatabaseRuntime
from app.core.errors import DatabaseUnavailableError

Expand Down Expand Up @@ -83,6 +84,10 @@ async def lifespan(_: FastAPI):
application.state.session_state_store = InMemorySessionDomainStore(ttl_seconds=900)
# Last outbound text per session, to avoid repeating the exact same message.
application.state.session_last_out_store = InMemorySessionDomainStore(ttl_seconds=900)
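# Hot-tier chat session state (layered-persistence Nível 0): per-process and
# non-authoritative; per-request writes are fail-open. The backend is read from
# the SESSION_STATE_BACKEND process environment variable. "redis" (Nível 1) is
# not implemented yet and makes this call raise NotImplementedError at startup.
# Source of truth stays the Postgres write-through.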
application.state.chat_session_state_store = build_session_state_store_from_env()
# Best-effort replay protection for the inbound Hermes chat forward (the HMAC
# covers only the body, so an identical request is otherwise replayable).
application.state.hermes_replay_guard = HermesReplayGuard()
Expand Down
70 changes: 69 additions & 1 deletion app/orchestration/chat_flow.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import logging
import re
import time
import unicodedata
from time import perf_counter

Expand All @@ -10,7 +12,11 @@
from app.orchestration.confidence import compute_confidence
from app.orchestration.prompt_builder import build_prompt
from app.retrieval.service import RetrievalService
from app.conversations.service import ConversationHistoryService
from app.conversations.service import ConversationHistoryService, hash_session
from app.conversations.session_state import SessionState, SessionStateStore


logger = logging.getLogger(__name__)


def _normalize(text: str) -> str:
Expand All @@ -32,11 +38,13 @@ def __init__(
self,
*,
history_service: ConversationHistoryService | None = None,
session_state_store: SessionStateStore | None = None,
) -> None:
self.retrieval_service = RetrievalService()
self.llm_service = LLMService()
self.handoff_service = HandoffService()
self.history_service = history_service
self.session_state_store = session_state_store

def answer(
self,
Expand All @@ -47,6 +55,66 @@ def answer(
provider_api_key: str | None = None,
channel: str = "api",
customer_id: str | None = None,
) -> dict[str, object]:
result = self._answer_inner(
domain,
question,
session_id=session_id,
request_id=request_id,
provider_api_key=provider_api_key,
channel=channel,
customer_id=customer_id,
)
# Hot-tier state write (non-authoritative, fail-open). No-op without a store.
self._record_session_state(
domain=domain, session_id=session_id, channel=channel, result=result
)
return result

def _record_session_state(
    self,
    *,
    domain: DomainConfig,
    session_id: str | None,
    channel: str,
    result: dict[str, object],
) -> None:
    """Write the outcome of a turn to the hot-tier session-state store.

    Best-effort and fail-open: the hot tier is non-authoritative, so any
    failure is logged and swallowed rather than propagated to the caller.
    Does nothing when no store is configured or ``session_id`` is empty.

    Args:
        domain: Domain the turn ran under; ``domain.name`` is used for the
            store key and as the fallback for the recorded domain.
        session_id: Raw session identifier. It is hashed before use and
            never stored as-is.
        channel: Channel the turn arrived on; part of the store key.
        result: Answer payload; the ``escalated``, ``domain`` and
            ``confidence`` entries are read from it.
    """
    store = self.session_state_store
    if store is None or not session_id:
        return
    try:
        # Kept as a function-local import, as in the original.
        # NOTE(review): presumably avoids an import cycle — confirm.
        from app.core.config import get_settings

        settings = get_settings()
        session_hash = hash_session(
            session_id, settings.persistence_hash_secret or ""
        )
        state = SessionState(
            state="escalated" if result.get("escalated") else "answered",
            domain=str(result.get("domain") or domain.name),
            confidence=float(result.get("confidence") or 0.0),
            updated_at=time.time(),
            redaction_version=getattr(settings, "persistence_hash_version", None),
        )
        store.put(
            domain=domain.name,
            channel=channel,
            session_hash=session_hash,
            state=state,
            ttl_seconds=settings.session_state_ttl_seconds,
        )
    except Exception:  # noqa: BLE001 - hot state is non-authoritative; never break /chat.
        # Still fail-open, but visible: at DEBUG without a traceback a broken
        # store would fail on every request with nothing to diagnose it from.
        logger.warning("session_state_write_skipped", exc_info=True)

def _answer_inner(
self,
domain: DomainConfig,
question: str,
session_id: str | None = None,
request_id: str | None = None,
provider_api_key: str | None = None,
channel: str = "api",
customer_id: str | None = None,
) -> dict[str, object]:
total_started_at = perf_counter()
retrieval_ms = 0.0
Expand Down
34 changes: 34 additions & 0 deletions docs/quality-plans/conversation-persistence-tiering-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,40 @@ Escada de escalada (só sobe quando o gatilho aparecer, sem virar a B inteira):
Assim a A **cresce para dentro da B** de forma incremental e reversível, em vez de
pagar a complexidade adiantado por uma carga que ainda não existe.

### Fluxos de referência (síncrono vs assíncrono)

O princípio operacional do Nível 0, concreto. Existe um diagrama de referência
deste desenho (gerado na discussão de 2026-06-29); o texto abaixo é a fonte da
verdade.

**Fluxo de um turno (qualquer domínio):**

1. Inbound (web/WhatsApp/…) chega ao `ChatFlowService` — roteia, recupera (pgvector),
responde (LLM).
2. **Síncrono (1 transação Postgres):** grava o turno (`conversations`/`messages` +
`chat_audits`) **e**, na mesma transação, **enfileira o(s) evento(s) no
`operational_outbox`**. Commit atômico = o turno está durável e a cópia está
garantida. Custo ~ms (não percebido; o LLM domina).
3. **Estado quente (síncrono, fail-open):** lê/grava `SessionStateStore` (in-memory
no Nível 0) por `(domain, channel, session_hash)`. Na Fase 1 apenas a gravação está
implementada; o leitor do estado entra em uma fatia posterior. Se o store cair, o
`/chat` responde mesmo assim — não é autoritativo.
4. **Assíncrono (worker `dispatch_outbox`, fora do hot path):** entrega o evento
`conversation.turn.archived` ao **backup off-box (R2/S3, append-only)** — at-least-once,
idempotente. RPO ≈ lag do worker (segundos).

**Fluxo de handoff (escalou para humano):**

1. Mesma transação síncrona do turno, **mais** o `support_cases` (ticket durável) +
o evento `handoff.requested` no outbox. **Este é o gate de consistência:** se a
transação falhar, **não** se promete humano ao usuário (não "barra" em falso).
2. Assíncrono via outbox: entrega de `handoff.requested` ao consumidor externo
(quando configurado) + a cópia off-box. O **support inbox** (leitura) já serve o
ticket a partir do Postgres, independente da entrega externa.

**Onde o Redis entra (Níveis 1–2, sob gatilho):** como backend do `SessionStateStore`
(estado quente, TTL, não-autoritativo) e/ou cache de leitura 7d por cima do Postgres
— **nunca** como âncora de durabilidade no Nível 0.

---

## 5. Tradeoffs honestos / limites de MVP
Expand Down
11 changes: 11 additions & 0 deletions docs/quality-plans/conversation-persistence-tiering-tech-plan.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,17 @@ Sem mudança de código. Fases 1–4 são as entregas de engenharia.

## Fase 1 — `SessionStateStore` seam + impl in-memory

Status: **implementada (2026-06-29)**, default no-op. O seam ficou em
`app/conversations/session_state.py` (`SessionState` + `SessionStateStore` Protocol +
`InMemorySessionStateStore` com TTL + `build_session_state_store_from_env`). Config:
`SESSION_STATE_BACKEND` (memory|redis) + `SESSION_STATE_TTL_SECONDS` (2700). O
`ChatFlowService` grava o estado via wrapper `answer()`→`_answer_inner()` +
`_record_session_state` (fail-open, chaveado por `hash_session`), e `chat.py`/`web_chat.py`
injetam o store de `app.state.chat_session_state_store` só quando
`persistence_backend==postgres`. Cobertura em `tests/test_session_state_store.py`.
Pendente (próximas fatias): o **leitor** do estado (consumo), o backend Redis
(Nível 1), e migrar o estado de escape do transporte Hermes para o mesmo seam.

Menor incremento de código. Sem Redis, sem infra.

### Arquivos
Expand Down
Loading
Loading