From 7351b6680667d8e6cc030810cdc6a64fa1481dde Mon Sep 17 00:00:00 2001 From: renanfulas Date: Mon, 29 Jun 2026 15:21:33 -0300 Subject: [PATCH 1/2] Document reference flows (sync vs async) in the tiering plan Adds the concrete operational design agreed in the 2026-06-29 architecture discussion: the per-turn flow (one synchronous Postgres transaction that persists the turn and enqueues the outbox; fail-open hot state; async off-box backup via the worker) and the handoff flow (the support_case + handoff.requested in the same sync transaction is the consistency gate; async external delivery). Clarifies that Redis only ever enters as a non-authoritative hot-state backend / read cache (levels 1-2), never as the durability anchor. Makes the plan self-contained. Co-Authored-By: Claude Opus 4.8 --- .../conversation-persistence-tiering-plan.md | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/docs/quality-plans/conversation-persistence-tiering-plan.md b/docs/quality-plans/conversation-persistence-tiering-plan.md index feccf77..9945a71 100644 --- a/docs/quality-plans/conversation-persistence-tiering-plan.md +++ b/docs/quality-plans/conversation-persistence-tiering-plan.md @@ -138,6 +138,40 @@ Escada de escalada (só sobe quando o gatilho aparecer, sem virar a B inteira): Assim a A **cresce para dentro da B** de forma incremental e reversível, em vez de pagar a complexidade adiantado por uma carga que ainda não existe. +### Fluxos de referência (síncrono vs assíncrono) + +O princípio operacional do Nível 0, concreto. Existe um diagrama de referência +deste desenho (gerado na discussão de 2026-06-29); o texto abaixo é a fonte da +verdade. + +**Fluxo de um turno (qualquer domínio):** + +1. Inbound (web/WhatsApp/…) chega ao `ChatFlowService` — roteia, recupera (pgvector), + responde (LLM). +2. 
**Síncrono (1 transação Postgres):** grava o turno (`conversations`/`messages` + + `chat_audits`) **e**, na mesma transação, **enfileira o(s) evento(s) no + `operational_outbox`**. Commit atômico = o turno está durável e a cópia está + garantida. Custo ~ms (não percebido; o LLM domina). +3. **Estado quente (síncrono, fail-open):** lê/grava `SessionStateStore` (in-memory + no Nível 0) por `(domain, channel, session_hash)`. Se o store cair, o `/chat` + responde mesmo assim — não é autoritativo. +4. **Assíncrono (worker `dispatch_outbox`, fora do hot path):** entrega o evento + `conversation.turn.archived` ao **backup off-box (R2/S3, append-only)** — at-least-once, + idempotente. RPO ≈ lag do worker (segundos). + +**Fluxo de handoff (escalou para humano):** + +1. Mesma transação síncrona do turno, **mais** o `support_cases` (ticket durável) + + o evento `handoff.requested` no outbox. **Este é o gate de consistência:** se a + transação falhar, **não** se promete humano ao usuário (não "barra" em falso). +2. Assíncrono via outbox: entrega de `handoff.requested` ao consumidor externo + (quando configurado) + a cópia off-box. O **support inbox** (leitura) já serve o + ticket a partir do Postgres, independente da entrega externa. + +**Onde o Redis entra (Níveis 1–2, sob gatilho):** como backend do `SessionStateStore` +(estado quente, TTL, não-autoritativo) e/ou cache de leitura 7d por cima do Postgres +— **nunca** como âncora de durabilidade no Nível 0. + --- ## 5. 
Tradeoffs honestos / limites de MVP From f9fb2e78c0dca198d0e1a114601bbbf56c9a5a56 Mon Sep 17 00:00:00 2001 From: renanfulas Date: Mon, 29 Jun 2026 15:33:29 -0300 Subject: [PATCH 2/2] =?UTF-8?q?Add=20SessionStateStore=20seam=20with=20in-?= =?UTF-8?q?memory=20backend=20(tiering=20N=C3=ADvel=200)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First code slice of the layered-persistence plan: a stable, swappable hot-tier session-state seam mirroring ConversationArchiveSink. Adds app/conversations/session_state.py (SessionState, SessionStateStore Protocol, InMemorySessionStateStore with TTL, build_session_state_store_from_env), the SESSION_STATE_BACKEND / SESSION_STATE_TTL_SECONDS config, and wires ChatFlowService to write the state fail-open via an answer() wrapper (keyed by hash_session, never a raw session_id). The store is process-wide in app.state and injected by the /chat and /web routes only under PERSISTENCE_BACKEND=postgres. Default no-op: with no store injected the hot path is byte-for-byte unchanged. The state is non-authoritative (truth stays in the Postgres write-through). The reader, the Redis backend (Nível 1), and migrating the transport escape state come next. 
Co-Authored-By: Claude Opus 4.8 --- app/api/routes/chat.py | 6 + app/api/routes/web_chat.py | 6 + app/conversations/session_state.py | 120 +++++++++++++ app/core/config.py | 9 + app/main.py | 5 + app/orchestration/chat_flow.py | 70 +++++++- ...versation-persistence-tiering-tech-plan.md | 11 ++ tests/test_session_state_store.py | 165 ++++++++++++++++++ 8 files changed, 391 insertions(+), 1 deletion(-) create mode 100644 app/conversations/session_state.py create mode 100644 tests/test_session_state_store.py diff --git a/app/api/routes/chat.py b/app/api/routes/chat.py index 9eb4180..9ddb9cd 100644 --- a/app/api/routes/chat.py +++ b/app/api/routes/chat.py @@ -42,8 +42,14 @@ def chat( raise HTTPException(status_code=404, detail="Domain not found") database_runtime = request.app.state.database_runtime + session_state_store = ( + getattr(request.app.state, "chat_session_state_store", None) + if settings.persistence_backend == "postgres" + else None + ) response = ChatFlowService( history_service=ConversationHistoryService(database_runtime), + session_state_store=session_state_store, ).answer( domain=domain, question=payload.message, diff --git a/app/api/routes/web_chat.py b/app/api/routes/web_chat.py index 7cdce70..c31f0b1 100644 --- a/app/api/routes/web_chat.py +++ b/app/api/routes/web_chat.py @@ -48,8 +48,14 @@ def create_web_chat( domain = _load_default_domain(settings) database_runtime = request.app.state.database_runtime + session_state_store = ( + getattr(request.app.state, "chat_session_state_store", None) + if settings.persistence_backend == "postgres" + else None + ) chat_response = ChatFlowService( history_service=ConversationHistoryService(database_runtime), + session_state_store=session_state_store, ).answer( domain=domain, question=payload.message, diff --git a/app/conversations/session_state.py b/app/conversations/session_state.py new file mode 100644 index 0000000..9f75e1c --- /dev/null +++ b/app/conversations/session_state.py @@ -0,0 +1,120 @@ +"""Hot-tier 
session state seam (layered-persistence plan, fatia/Fase 1). + +Mirrors the philosophy of ``ConversationArchiveSink``: a stable interface with a +swappable implementation and a safe default. The state is the *non-authoritative* +hot tier — the source of truth stays in Postgres (decision A). A Redis backend +drops in behind ``SESSION_STATE_BACKEND=redis`` in a later slice (Nível 1) without +touching callers. + +Privacy: keyed by ``(domain, channel, session_hash)`` — never a raw ``session_id`` +nor free PII. +""" +from __future__ import annotations + +import os +import threading +import time +from dataclasses import dataclass +from typing import Callable, Protocol, runtime_checkable + + +SUPPORTED_SESSION_STATE_BACKENDS = {"memory", "redis"} +DEFAULT_SESSION_STATE_TTL_SECONDS = 2700 # 45 min hot window + + +@dataclass(frozen=True) +class SessionState: + """Sanitized hot-tier state for a session. No raw session_id, no free PII.""" + + state: str + domain: str + confidence: float + updated_at: float + turn_id: str | None = None + redaction_version: str | None = None + + +@runtime_checkable +class SessionStateStore(Protocol): + def get( + self, *, domain: str, channel: str, session_hash: str + ) -> SessionState | None: ... + + def put( + self, + *, + domain: str, + channel: str, + session_hash: str, + state: SessionState, + ttl_seconds: int, + ) -> None: ... + + def clear(self, *, domain: str, channel: str, session_hash: str) -> None: ... + + +def _key(domain: str, channel: str, session_hash: str) -> str: + return f"{domain}::{channel}::{session_hash}" + + +class InMemorySessionStateStore: + """Single-process store with a monotonic-clock TTL (``time.monotonic`` by default). Local/CI/test default; mirrors the + role of ``AppendOnlyFileSink``. + + NOT consistent across uvicorn workers and lost on restart — production + multi-worker needs the Redis backend (Nível 1). This is fine because the hot + tier is non-authoritative: the truth is the Postgres write-through. 
+ """ + + def __init__(self, *, time_fn: Callable[[], float] = time.monotonic) -> None: + self._data: dict[str, tuple[float | None, SessionState]] = {} + self._lock = threading.Lock() + self._time_fn = time_fn + + def get( + self, *, domain: str, channel: str, session_hash: str + ) -> SessionState | None: + key = _key(domain, channel, session_hash) + now = self._time_fn() + with self._lock: + entry = self._data.get(key) + if entry is None: + return None + expires_at, state = entry + if expires_at is not None and now >= expires_at: + self._data.pop(key, None) + return None + return state + + def put( + self, + *, + domain: str, + channel: str, + session_hash: str, + state: SessionState, + ttl_seconds: int, + ) -> None: + key = _key(domain, channel, session_hash) + expires_at = self._time_fn() + ttl_seconds if ttl_seconds > 0 else None + with self._lock: + self._data[key] = (expires_at, state) + + def clear(self, *, domain: str, channel: str, session_hash: str) -> None: + key = _key(domain, channel, session_hash) + with self._lock: + self._data.pop(key, None) + + +def build_session_state_store_from_env( + getenv: Callable[[str, str], str] = os.getenv, +) -> SessionStateStore: + backend = (getenv("SESSION_STATE_BACKEND", "memory") or "memory").strip().lower() + if backend not in SUPPORTED_SESSION_STATE_BACKENDS: + raise ValueError(f"unsupported session state backend: {backend}") + if backend == "memory": + return InMemorySessionStateStore() + # backend == "redis": RedisSessionStateStore is the next slice (Nível 1). 
+ raise NotImplementedError( + "SESSION_STATE_BACKEND=redis requires RedisSessionStateStore (Nível 1, not yet implemented)" + ) diff --git a/app/core/config.py b/app/core/config.py index 8efb357..4f6aed4 100644 --- a/app/core/config.py +++ b/app/core/config.py @@ -32,6 +32,12 @@ class Settings(BaseSettings): session_domain_store_backend: str = Field( default="memory", alias="SESSION_DOMAIN_STORE_BACKEND" ) + session_state_backend: str = Field( + default="memory", alias="SESSION_STATE_BACKEND" + ) + session_state_ttl_seconds: int = Field( + default=2700, alias="SESSION_STATE_TTL_SECONDS" + ) persistence_hash_secret: str | None = Field( default=None, alias="PERSISTENCE_HASH_SECRET", @@ -356,6 +362,9 @@ def require_api_secret_outside_dev(self) -> Self: raise ValueError( "SESSION_DOMAIN_STORE_BACKEND=postgres requires PERSISTENCE_BACKEND=postgres" ) + self.session_state_backend = self.session_state_backend.strip().lower() + if self.session_state_backend not in {"memory", "redis"}: + raise ValueError("SESSION_STATE_BACKEND must be memory or redis") self.retrieval_backend = self.retrieval_backend.strip().lower() if self.retrieval_backend not in {"lexical", "pgvector"}: raise ValueError("RETRIEVAL_BACKEND must be lexical or pgvector") diff --git a/app/main.py b/app/main.py index 2f061a2..0b695ee 100644 --- a/app/main.py +++ b/app/main.py @@ -38,6 +38,7 @@ build_session_domain_store, ) from app.web_auth.runtime import create_web_auth_runtime +from app.conversations.session_state import build_session_state_store_from_env from app.db.runtime import DatabaseRuntime from app.core.errors import DatabaseUnavailableError @@ -83,6 +84,10 @@ async def lifespan(_: FastAPI): application.state.session_state_store = InMemorySessionDomainStore(ttl_seconds=900) # Last outbound text per session, to avoid repeating the exact same message. 
application.state.session_last_out_store = InMemorySessionDomainStore(ttl_seconds=900) + # Hot-tier chat session state (layered-persistence Nível 0): per-process, + # non-authoritative, fail-open. Redis backend drops in at Nível 1 via + # SESSION_STATE_BACKEND. Source of truth stays the Postgres write-through. + application.state.chat_session_state_store = build_session_state_store_from_env() # Best-effort replay protection for the inbound Hermes chat forward (the HMAC # covers only the body, so an identical request is otherwise replayable). application.state.hermes_replay_guard = HermesReplayGuard() diff --git a/app/orchestration/chat_flow.py b/app/orchestration/chat_flow.py index 05d0576..ff96bdf 100644 --- a/app/orchestration/chat_flow.py +++ b/app/orchestration/chat_flow.py @@ -1,4 +1,6 @@ +import logging import re +import time import unicodedata from time import perf_counter @@ -10,7 +12,11 @@ from app.orchestration.confidence import compute_confidence from app.orchestration.prompt_builder import build_prompt from app.retrieval.service import RetrievalService -from app.conversations.service import ConversationHistoryService +from app.conversations.service import ConversationHistoryService, hash_session +from app.conversations.session_state import SessionState, SessionStateStore + + +logger = logging.getLogger(__name__) def _normalize(text: str) -> str: @@ -32,11 +38,13 @@ def __init__( self, *, history_service: ConversationHistoryService | None = None, + session_state_store: SessionStateStore | None = None, ) -> None: self.retrieval_service = RetrievalService() self.llm_service = LLMService() self.handoff_service = HandoffService() self.history_service = history_service + self.session_state_store = session_state_store def answer( self, @@ -47,6 +55,66 @@ def answer( provider_api_key: str | None = None, channel: str = "api", customer_id: str | None = None, + ) -> dict[str, object]: + result = self._answer_inner( + domain, + question, + session_id=session_id, + 
request_id=request_id, + provider_api_key=provider_api_key, + channel=channel, + customer_id=customer_id, + ) + # Hot-tier state write (non-authoritative, fail-open). No-op without a store. + self._record_session_state( + domain=domain, session_id=session_id, channel=channel, result=result + ) + return result + + def _record_session_state( + self, + *, + domain: DomainConfig, + session_id: str | None, + channel: str, + result: dict[str, object], + ) -> None: + store = self.session_state_store + if store is None or not session_id: + return + try: + from app.core.config import get_settings + + settings = get_settings() + session_hash = hash_session( + session_id, settings.persistence_hash_secret or "" + ) + state = SessionState( + state="escalated" if result.get("escalated") else "answered", + domain=str(result.get("domain") or domain.name), + confidence=float(result.get("confidence") or 0.0), + updated_at=time.time(), + redaction_version=getattr(settings, "persistence_hash_version", None), + ) + store.put( + domain=domain.name, + channel=channel, + session_hash=session_hash, + state=state, + ttl_seconds=settings.session_state_ttl_seconds, + ) + except Exception: # noqa: BLE001 - hot state is non-authoritative; never break /chat. + logger.debug("session_state_write_skipped", exc_info=False) + + def _answer_inner( + self, + domain: DomainConfig, + question: str, + session_id: str | None = None, + request_id: str | None = None, + provider_api_key: str | None = None, + channel: str = "api", + customer_id: str | None = None, ) -> dict[str, object]: total_started_at = perf_counter() retrieval_ms = 0.0 diff --git a/docs/quality-plans/conversation-persistence-tiering-tech-plan.md b/docs/quality-plans/conversation-persistence-tiering-tech-plan.md index b6e3128..cade9b1 100644 --- a/docs/quality-plans/conversation-persistence-tiering-tech-plan.md +++ b/docs/quality-plans/conversation-persistence-tiering-tech-plan.md @@ -47,6 +47,17 @@ Sem mudança de código. 
Fases 1–4 são as entregas de engenharia. ## Fase 1 — `SessionStateStore` seam + impl in-memory +Status: **implementada (2026-06-29)**, default no-op. O seam ficou em +`app/conversations/session_state.py` (`SessionState` + `SessionStateStore` Protocol + +`InMemorySessionStateStore` com TTL + `build_session_state_store_from_env`). Config: +`SESSION_STATE_BACKEND` (memory|redis) + `SESSION_STATE_TTL_SECONDS` (2700). O +`ChatFlowService` grava o estado via wrapper `answer()`→`_answer_inner()` + +`_record_session_state` (fail-open, chaveado por `hash_session`), e `chat.py`/`web_chat.py` +injetam o store de `app.state.chat_session_state_store` só quando +`persistence_backend==postgres`. Cobertura em `tests/test_session_state_store.py`. +Pendente (próximas fatias): o **leitor** do estado (consumo), o backend Redis +(Nível 1), e migrar o estado de escape do transporte Hermes para o mesmo seam. + Menor incremento de código. Sem Redis, sem infra. ### Arquivos diff --git a/tests/test_session_state_store.py b/tests/test_session_state_store.py new file mode 100644 index 0000000..8ac33b9 --- /dev/null +++ b/tests/test_session_state_store.py @@ -0,0 +1,165 @@ +import time +from types import SimpleNamespace + +import pytest + +from app.conversations.service import hash_session +from app.conversations.session_state import ( + InMemorySessionStateStore, + SessionState, + SessionStateStore, + build_session_state_store_from_env, +) +from app.core.config import get_settings +from app.orchestration.chat_flow import ChatFlowService + + +def _state(label: str = "answered", domain: str = "vendas", conf: float = 0.5) -> SessionState: + return SessionState(state=label, domain=domain, confidence=conf, updated_at=time.time()) + + +class _Clock: + def __init__(self) -> None: + self.t = 1000.0 + + def __call__(self) -> float: + return self.t + + +# --- InMemorySessionStateStore ------------------------------------------------ + +def test_put_get_roundtrip() -> None: + store = 
InMemorySessionStateStore() + st = _state() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=st, ttl_seconds=60) + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") == st + + +def test_isolation_by_domain_channel_hash() -> None: + store = InMemorySessionStateStore() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + assert store.get(domain="suporte", channel="whatsapp", session_hash="abc") is None + assert store.get(domain="vendas", channel="web", session_hash="abc") is None + assert store.get(domain="vendas", channel="whatsapp", session_hash="xyz") is None + + +def test_ttl_expiry() -> None: + clock = _Clock() + store = InMemorySessionStateStore(time_fn=clock) + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + clock.t += 59 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is not None + clock.t += 2 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is None + + +def test_ttl_zero_never_expires() -> None: + clock = _Clock() + store = InMemorySessionStateStore(time_fn=clock) + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=0) + clock.t += 100_000 + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is not None + + +def test_clear() -> None: + store = InMemorySessionStateStore() + store.put(domain="vendas", channel="whatsapp", session_hash="abc", state=_state(), ttl_seconds=60) + store.clear(domain="vendas", channel="whatsapp", session_hash="abc") + assert store.get(domain="vendas", channel="whatsapp", session_hash="abc") is None + + +# --- build_session_state_store_from_env -------------------------------------- + +def test_build_from_env_memory_and_protocol() -> None: + store = build_session_state_store_from_env(getenv=lambda k, d="": "memory") + assert isinstance(store, 
InMemorySessionStateStore) + assert isinstance(store, SessionStateStore) + + +def test_build_from_env_defaults_to_memory() -> None: + assert isinstance(build_session_state_store_from_env(getenv=lambda k, d="": d), InMemorySessionStateStore) + + +def test_build_from_env_rejects_unknown_backend() -> None: + with pytest.raises(ValueError): + build_session_state_store_from_env(getenv=lambda k, d="": "mongodb") + + +def test_build_from_env_redis_not_implemented_yet() -> None: + with pytest.raises(NotImplementedError): + build_session_state_store_from_env(getenv=lambda k, d="": "redis") + + +# --- ChatFlowService integration (write path, fail-open) --------------------- + +def _hash(session_id: str) -> str: + return hash_session(session_id, get_settings().persistence_hash_secret or "") + + +def test_record_session_state_writes() -> None: + store = InMemorySessionStateStore() + flow = ChatFlowService(session_state_store=store) + flow._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="whatsapp:hermes:abc", + channel="whatsapp", + result={"domain": "vendas", "confidence": 0.42, "escalated": True}, + ) + st = store.get(domain="vendas", channel="whatsapp", session_hash=_hash("whatsapp:hermes:abc")) + assert st is not None + assert st.state == "escalated" + assert st.confidence == 0.42 + + +def test_record_session_state_noop_without_store() -> None: + ChatFlowService()._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="web:1", + channel="web", + result={"escalated": False}, + ) # must not raise + + +def test_record_session_state_noop_without_session_id() -> None: + store = InMemorySessionStateStore() + ChatFlowService(session_state_store=store)._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id=None, + channel="web", + result={"escalated": False}, + ) + assert store.get(domain="vendas", channel="web", session_hash=_hash("x")) is None + + +def test_record_session_state_fail_open() -> None: + class 
_BoomStore: + def get(self, **kwargs): + return None + + def put(self, **kwargs): + raise RuntimeError("redis down") + + def clear(self, **kwargs): + pass + + # Must swallow the store error: the hot tier is non-authoritative. + ChatFlowService(session_state_store=_BoomStore())._record_session_state( + domain=SimpleNamespace(name="vendas"), + session_id="web:1", + channel="web", + result={"escalated": False}, + ) + + +def test_answer_wrapper_records_state(monkeypatch) -> None: + store = InMemorySessionStateStore() + flow = ChatFlowService(session_state_store=store) + monkeypatch.setattr( + flow, + "_answer_inner", + lambda *a, **k: {"domain": "vendas", "confidence": 0.7, "escalated": False}, + ) + out = flow.answer(SimpleNamespace(name="vendas"), "oi", session_id="web:123", channel="web") + assert out["confidence"] == 0.7 + st = store.get(domain="vendas", channel="web", session_hash=_hash("web:123")) + assert st is not None and st.state == "answered"