A classification + RAG pipeline for incoming customer support tickets. Each ticket is classified (category + priority), a retrieval query is distilled from it, the top-K nearest past tickets are pulled from a pgvector KB, and an LLM drafts a first-line customer reply grounded in that evidence. The whole thing is exposed as a FastAPI service with a streaming web UI and a CLI, and ships with a built-in evaluation stage.
Assignment brief lives in `README_INSTRUCTIONS.md`. This file is implementation notes only.
Current results on data/eval_set.json (n=46):
| Metric | Value |
|---|---|
| Category accuracy | 0.93 |
| Priority accuracy | 0.78 |
| LLM response-judge avg | 0.89 |

Repository layout:

| Path | What it is |
|---|---|
| `docker-compose.yml`, `Dockerfile` | Two-container stack: `pgvector/pgvector:pg16` + FastAPI service |
| `src/api.py` | FastAPI app: `/`, `/health`, `/datasets`, `/run`, `/run_stream`, `/ticket`, `/outputs/{name}` |
| `src/static/index.html` | Streaming web UI (dataset picker, live results, summary panel) |
| `src/pipeline.py` | `process_ticket(...)` orchestrator + `python -m src.pipeline` CLI |
| `src/agent.py` | LLM stages: `classify`, `build_retrieval_query`, `retrieve`, `generate_response` |
| `src/prompts.py` | System prompts for every LLM stage (classify / retrieval / response / judge) |
| `src/preprocess.py` | Stage 2: ensure canonical KB exists (runs audit + dedup on demand) |
| `src/postprocess.py` | Stage 5: priority heuristics (question-cap, no-urgency downgrade) |
| `src/validate.py` | Stage 4: schema validation with safe fallbacks + `escalate_to_human` |
| `src/evaluate.py` | Stage 6: label accuracy + LLM response judge + aggregation |
| `src/config.py` | Central config (models, temperatures, top-K, embeddings) from env / `.env` |
| `src/db.py`, `src/embeddings.py` | Postgres helpers + fastembed (`BAAI/bge-small-en-v1.5`, 384-dim) |
| `tools/proxy_chat.py` | OpenAI-compatible chat/completions client (Anthropic `system` handled) |
| `tools/llm_kb_audit.py` | Flags rows in the raw KB whose category/priority looks wrong |
| `tools/dedup_kb.py` | Collapses duplicate-subject rows into one representative row |
| `tools/explorer_ui.py` | Streamlit KB/data explorer (duplicates, row review, audit runner) |
| `scripts/seed_kb.py` | Embeds `data/knowledge_base_fixed.csv` and upserts into Postgres |
| `data/` | Inputs (`knowledge_base*.csv`, `eval_set.json`) |
| `output/` | Evaluation artifacts (`eval_results.json`, `error_analysis.json`) |

- Docker Desktop
- Python 3.13 (for the host-side seed script and CLI)
- A `.env` with LLM proxy credentials:

```bash
cp .env.example .env
# fill in ANTHROPIC_API_KEY and ANTHROPIC_BASE_URL
```

`src/config.py` also accepts `LLM_MODEL`, `RESPONSE_MODEL`, `TOP_K`,
`TEMPERATURE_CLASSIFY`, `TEMPERATURE_RETRIEVAL_QUERY`,
`TEMPERATURE_RESPONSE` (set any temperature to `off` to omit the field, since
some newer models reject it), and `EMBED_MODEL_NAME` / `EMBED_DIM`.
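
A minimal sketch of how the `off` convention for temperatures could be handled. `read_temperature` and `build_payload` are hypothetical helpers written for illustration; they are not taken from `src/config.py`, and the default values passed in are the caller's choice.

```python
import os
from typing import Optional


def read_temperature(name: str, default: float) -> Optional[float]:
    """Read a temperature from the environment; None means 'omit the field'."""
    raw = os.environ.get(name)
    if raw is None or raw.strip() == "":
        return default
    if raw.strip().lower() == "off":
        return None  # the caller leaves 'temperature' out of the request
    return float(raw)


def build_payload(model: str, messages: list, temperature: Optional[float]) -> dict:
    """Build a chat/completions body, adding 'temperature' only when it is set."""
    payload = {"model": model, "messages": messages}
    if temperature is not None:
        payload["temperature"] = temperature
    return payload
```

With `TEMPERATURE_CLASSIFY=off`, `read_temperature` returns `None` and the resulting payload carries no `temperature` key at all, which is the behaviour a model that rejects the field would need.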
```bash
# 1. Start Postgres + pgvector
docker compose up -d db
# 2. Python venv (used by the seed script and the CLI)
python3 -m venv .venv
.venv/bin/pip install -r requirements.txt
# 3. Seed the KB (runs on the host, talks to localhost:5433)
.venv/bin/python scripts/seed_kb.py
# 4. Start the API
docker compose up -d api
# 5. Open the UI
open http://localhost:8000
```

The UI has a dropdown of every `.csv` / `.json` under `data/`. Pick one,
optionally toggle Evaluate, set a Limit, and hit Run — results
stream in as each ticket finishes, and a summary panel updates live when
evaluation is on. The data/ and output/ directories are volume-mounted,
so adding a dataset to data/ shows up in the dropdown without a restart,
and output/eval_results.json and output/error_analysis.json are
written back to the host on every run.
```
┌─────────────────┐        ┌─────────────────┐
│  Host (you)     │        │  Docker network │
│                 │        │                 │
│  browser  ──────┼────────▶  steadfast-api  │
│  (port 8000)    │        │   (FastAPI)     │
│                 │        │       │         │
│  seed_kb.py ────┼──┐     │       │ SQL     │
│  (venv)         │  │     │       ▼         │
│                 │  │     │  steadfast-db   │
│                 │  └─────▶  (pgvector pg16)│
│                 │        │                 │
│                 │        └─────────────────┘
│                 │
│  LLM proxy ◀────┼────(HTTPS, httpx)────┐
│  (remote)       │                      │
└─────────────────┘                      │
                    steadfast-api calls ─┘
```
- `steadfast-db`: `pgvector/pgvector:pg16`, port `5433` on host.
- `steadfast-api`: built from `Dockerfile`, port `8000` on host. Mounts `./data:/app/data:ro` and `./output:/app/output`, reads `.env` for the LLM proxy, talks to `db:5432` on the internal network.

`src/pipeline.py :: process_ticket(ticket, *, evaluate=False)` runs:

- Classify (`src/agent.py :: classify`): `CLASSIFICATION_SYSTEM_PROMPT` → `{category, priority, confidence, flags}`. Enums are coerced to the allowed set; invalid JSON falls back to `unknown` / `low` with `escalate_to_human`.
- Retrieval query (`build_retrieval_query`): distills subject + body + classification into a short 6-20 word search query.
- Retrieve (`retrieve`): embeds the query with `fastembed` (`BAAI/bge-small-en-v1.5`, 384-dim) and runs a cosine-similarity lookup against `kb_tickets` (HNSW index), `TOP_K = 5`.
- Generate response (`generate_response`): `RESPONSE_SYSTEM_PROMPT` picks one of three modes and drafts a short reply:
  - `answer_found` (conf 0.75–0.95): KB clearly covers this.
  - `needs_human_check` (0.4–0.7): KB is related but not conclusive.
  - `no_relevant_answer` (0.1–0.4): KB doesn't cover this.
- Postprocess (`src/postprocess.py :: postprocess`): two cheap rules over the customer text to counter the classifier's tendency to over-escalate:
  - Question-shaped tickets ("how do", "wondering", …) are capped at `low`.
  - `high` / `critical` tickets with no urgency signal ("blocking", "outage", "can't", "breach", "data loss", …) are downgraded to `medium`.
- Validate (`src/validate.py :: validate_output`): required fields present, `category` / `priority` in the allowed enum, non-empty `response`, clamped `confidence`, string-list `flags`. On any issue, falls back to safe defaults and appends `escalate_to_human`.
- Evaluate (optional, `src/evaluate.py :: evaluate_ticket`):
  - Label check against `expected_category` / `expected_priority` if present on the input ticket.
  - LLM response judge (`RESPONSE_JUDGE_SYSTEM_PROMPT`, temperature 0) scores the draft against the ticket and the retrieved KB on a 0–1 scale with a short reason.

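
The two postprocess rules can be sketched roughly as follows. This is an illustration under stated assumptions: the marker tuples contain only the phrases quoted in this README (the lists in `src/postprocess.py` are longer), `postprocess_priority` is a hypothetical name, and the adjustment strings are invented for the example.

```python
# Only the phrases quoted in the README; the real lists are assumed to be longer.
QUESTION_MARKERS = ("how do", "wondering")
URGENCY_MARKERS = ("blocking", "outage", "can't", "breach", "data loss")


def postprocess_priority(text: str, priority: str) -> tuple[str, list[str]]:
    """Apply the question-cap and no-urgency downgrade; return (priority, adjustments)."""
    lowered = text.lower()
    adjustments: list[str] = []

    # Rule 1: question-shaped tickets are capped at "low".
    if priority != "low" and any(m in lowered for m in QUESTION_MARKERS):
        adjustments.append(f"question-cap: {priority} -> low")
        priority = "low"

    # Rule 2: high/critical without any urgency signal is downgraded to "medium".
    if priority in ("high", "critical") and not any(m in lowered for m in URGENCY_MARKERS):
        adjustments.append(f"no-urgency downgrade: {priority} -> medium")
        priority = "medium"

    return priority, adjustments
```

Returning the list of adjustments alongside the new priority matches the idea of `postprocess.adjustments` in the internal output: every change to the classifier's label stays visible for debugging.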
Per ticket, the public (assignment) shape is:
```json
{
  "ticket_id": "EVAL-001",
  "category": "integration",
  "priority": "high",
  "response": "Hi Cirrus Cloud — ...",
  "confidence": 0.85,
  "flags": ["escalate_to_human"]
}
```

`confidence` is the response-generation confidence. The internal
(debug) shape, returned when `include_internal=true`, from `/ticket`, or
from the CLI's `--internal`, additionally includes:

- `subject`
- `classification_confidence`
- `response_mode` (`answer_found` / `needs_human_check` / `no_relevant_answer`)
- `retrieval_query`
- `retrieved`: top-K KB matches with `ticket_id`, `category`, `priority`, `subject`, `body`, `resolution`, `score`
- `postprocess.adjustments`: list of human-readable adjustments applied
- `validation.issues`, `validation.ok`
- `evaluation` (only when `evaluate=true`): `expected_category`, `expected_priority`, `category_correct`, `priority_correct`, `response_score`, `response_score_reason`

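
The public shape is what the validation stage guards. A rough sketch of such a check is shown here; `validate_public_shape` is a hypothetical stand-in rather than the actual `validate_output`, the allowed category set is passed in because this README does not list it, and the concrete fallback values (`unknown`, `low`, a placeholder reply, confidence `0.0`) are assumptions.

```python
ALLOWED_PRIORITIES = {"low", "medium", "high", "critical"}


def validate_public_shape(result: dict, allowed_categories: set[str]) -> tuple[dict, list[str]]:
    """Return a repaired copy of `result` plus the list of issues that were found."""
    out = dict(result)
    issues: list[str] = []

    if out.get("category") not in allowed_categories:
        issues.append("invalid category")
        out["category"] = "unknown"  # assumed safe default
    if out.get("priority") not in ALLOWED_PRIORITIES:
        issues.append("invalid priority")
        out["priority"] = "low"  # assumed safe default
    if not isinstance(out.get("response"), str) or not out["response"].strip():
        issues.append("empty response")
        out["response"] = "A support agent will follow up shortly."  # placeholder text

    try:
        # Clamp into [0, 1]; clamping alone is not counted as an issue here.
        out["confidence"] = min(1.0, max(0.0, float(out.get("confidence"))))
    except (TypeError, ValueError):
        issues.append("invalid confidence")
        out["confidence"] = 0.0

    flags = out.get("flags")
    if not isinstance(flags, list) or not all(isinstance(f, str) for f in flags):
        issues.append("invalid flags")
        flags = []
    out["flags"] = list(flags)

    # Any issue routes the ticket to a human.
    if issues and "escalate_to_human" not in out["flags"]:
        out["flags"].append("escalate_to_human")
    return out, issues
```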
All endpoints at http://localhost:8000.

| Method | Path | Description |
|---|---|---|
| `GET` | `/` | Streaming web UI |
| `GET` | `/health` | `{"ok": true}` |
| `GET` | `/datasets` | Lists `.csv` and `.json` files under `data/` |
| `POST` | `/run` | Run pipeline on a dataset, return all results at once |
| `POST` | `/run_stream` | Same, but stream NDJSON per ticket |
| `POST` | `/ticket` | Run pipeline on one ad-hoc ticket |
| `GET` | `/outputs/{name}` | Download a file from `output/` (e.g. `eval_results.json`) |

Request body for `/run` and `/run_stream`:

```json
{
  "path": "data/eval_set.json",
  "limit": 100,
  "include_internal": false,
  "evaluate": true
}
```

- `path`: relative to repo root; must live under `data/`.
- `limit`: optional cap on ticket count (`null` or `0`-or-less means all).
- `include_internal`: if `true`, `/run` returns the internal/debug object per ticket.
- `evaluate`: request evaluation. Silently turned off if the dataset has no `expected_category` / `expected_priority` labels.

/run returns {source, count, evaluated, results, summary, output} in
one shot. Both endpoints always persist the run to output/eval_results.json
(and output/error_analysis.json when evaluation ran).
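
The `path` and `limit` rules for the request body could be enforced along these lines. This is a sketch only: `resolve_dataset` and `apply_limit` are hypothetical names, and the real checks in `src/api.py` may differ.

```python
from pathlib import Path
from typing import Optional


def resolve_dataset(path: str, repo_root: Path) -> Path:
    """Resolve `path` against the repo root and reject anything outside data/."""
    data_dir = (repo_root / "data").resolve()
    candidate = (repo_root / path).resolve()  # collapses ".." segments
    if not candidate.is_relative_to(data_dir):  # Python 3.9+
        raise ValueError(f"{path!r} is not under data/")
    return candidate


def apply_limit(tickets: list, limit: Optional[int]) -> list:
    """None, zero, or a negative limit means 'process every ticket'."""
    if limit is None or limit <= 0:
        return tickets
    return tickets[:limit]
```

Resolving before comparing is the point of the first helper: a request such as `data/../.env` looks like it starts under `data/` but resolves outside it, so it is rejected.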
`/run_stream` emits newline-delimited JSON (`application/x-ndjson`):

```
{"event":"start", "source":..., "count":..., "evaluate":...}
{"event":"result", "elapsed_ms":..., "ticket":{...}, "result":{...}, "running":{...}}
{"event":"error", "elapsed_ms":..., "ticket":{...}, "error":"..."}
{"event":"done", "count":..., "output":{...}, "summary":{...}}
```

running is the rolling evaluation summary, only present when
evaluate=true.
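
A client can consume the stream one line at a time. The sketch below folds the four event types into a single state dict; `consume_stream` is a hypothetical helper, and it takes any iterable of text lines so it does not depend on a particular HTTP client.

```python
import json
from typing import Iterable


def consume_stream(lines: Iterable[str]) -> dict:
    """Fold /run_stream NDJSON lines into one summary dict."""
    state = {"count": None, "results": [], "errors": [], "summary": None, "done": False}
    for line in lines:
        line = line.strip()
        if not line:
            continue  # tolerate blank keep-alive lines
        event = json.loads(line)
        kind = event.get("event")
        if kind == "start":
            state["count"] = event.get("count")
        elif kind == "result":
            state["results"].append(event["result"])
        elif kind == "error":
            state["errors"].append(event.get("error"))
        elif kind == "done":
            state["summary"] = event.get("summary")
            state["done"] = True
    return state
```

With `httpx`, the lines could come from `response.iter_lines()` inside a `with httpx.stream("POST", url, json=body, timeout=None) as response:` block; disabling the timeout is a suggestion for long runs, not something this README specifies.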
Request body for `/ticket`:

```json
{
  "ticket_id": "AD-HOC-1",
  "subject": "Dashboard very slow today",
  "body": "...",
  "customer_name": "Acme",
  "plan": "Growth"
}
```

Returns both the final (assignment-shape) object and the internal (debug) object, plus `elapsed_ms`.

```bash
# One JSON object per line on stdout (public shape):
.venv/bin/python -m src.pipeline --input data/eval_set.json --limit 5
# Internal debug object per ticket:
.venv/bin/python -m src.pipeline --input data/eval_set.json --limit 5 --internal
# Full evaluation: writes output/eval_results.json + output/error_analysis.json
# and prints the summary to stderr:
.venv/bin/python -m src.pipeline --eval
```

`output/eval_results.json` has `{source, count, results[], summary}`.
summary contains overall category_accuracy, priority_accuracy,
response_score_avg, per-category and per-priority breakdowns, and a
validation sub-block with the schema-validation failure rate.
output/error_analysis.json buckets mismatches and low-score responses
for quick inspection.
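
The headline numbers in `summary` are simple averages over the per-ticket `evaluation` blocks. A minimal sketch, assuming each result carries the `evaluation` fields listed for the internal shape; `summarize` is a hypothetical name, and the per-category, per-priority and validation breakdowns are left out.

```python
from typing import Optional


def _mean(values: list) -> Optional[float]:
    """Average of a list, or None when there is nothing to average."""
    return sum(values) / len(values) if values else None


def summarize(results: list) -> dict:
    """Compute overall accuracies and the average judge score."""
    evals = [r["evaluation"] for r in results if r.get("evaluation")]
    cat = [float(e["category_correct"]) for e in evals if e.get("category_correct") is not None]
    pri = [float(e["priority_correct"]) for e in evals if e.get("priority_correct") is not None]
    scores = [float(e["response_score"]) for e in evals if e.get("response_score") is not None]
    return {
        "count": len(results),
        "evaluated": len(evals),
        "category_accuracy": _mean(cat),
        "priority_accuracy": _mean(pri),
        "response_score_avg": _mean(scores),
    }
```

Tickets without an `evaluation` block are counted in `count` but excluded from the averages, so an unlabeled dataset yields `None` rather than a misleading `0.0`.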
```sql
CREATE TABLE kb_tickets (
    ticket_id     TEXT PRIMARY KEY,
    created_at    TIMESTAMPTZ,
    customer_name TEXT,
    plan          TEXT,
    subject       TEXT,
    body          TEXT,
    category      TEXT,
    priority      TEXT,
    resolution    TEXT,
    resolved_at   TIMESTAMPTZ,
    search_text   TEXT NOT NULL,   -- subject + body + resolution
    embedding     vector(384)      -- bge-small-en-v1.5
);
CREATE INDEX kb_tickets_embedding_idx
    ON kb_tickets USING hnsw (embedding vector_cosine_ops);
```

`scripts/seed_kb.py` is idempotent (`INSERT ... ON CONFLICT (ticket_id) DO UPDATE`), so re-running it refreshes rows in place.

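
Given this schema, a top-K cosine lookup can be expressed with pgvector's `<=>` (cosine distance) operator, which the `vector_cosine_ops` HNSW index serves. The sketch below only builds the SQL and parameters; `build_retrieve_query` and `to_pgvector_literal` are hypothetical helpers, and reporting the score as `1 - distance` is an assumption about how `score` is defined.

```python
from typing import Sequence

EMBED_DIM = 384
TOP_K = 5

# %(name)s placeholders follow the psycopg named-parameter style.
RETRIEVE_SQL = """
SELECT ticket_id, category, priority, subject, body, resolution,
       1 - (embedding <=> %(q)s::vector) AS score
FROM kb_tickets
ORDER BY embedding <=> %(q)s::vector
LIMIT %(k)s
"""


def to_pgvector_literal(embedding: Sequence[float]) -> str:
    """Format floats as pgvector's text form, e.g. '[0.100000,0.200000]'."""
    return "[" + ",".join(f"{x:.6f}" for x in embedding) + "]"


def build_retrieve_query(embedding: Sequence[float], top_k: int = TOP_K) -> tuple[str, dict]:
    """Return (sql, params) for a cosine nearest-neighbour lookup."""
    if len(embedding) != EMBED_DIM:
        raise ValueError(f"expected {EMBED_DIM} dimensions, got {len(embedding)}")
    return RETRIEVE_SQL, {"q": to_pgvector_literal(embedding), "k": top_k}
```

Ordering by the raw distance expression, rather than by the derived `score`, keeps the query in the form that lets the index be used.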
Two tools turn the raw data/knowledge_base.csv into the canonical
data/knowledge_base_fixed.csv consumed by the seeder and the pipeline.
src/preprocess.py runs them on demand if their outputs are missing.
`tools/llm_kb_audit.py`:

- Read `data/knowledge_base.csv`.
- Sort by `(category, priority, ticket_id)` and batch (default 20).
- Send each batch to the LLM with a strict "only flag clearly wrong labels" prompt, parse the JSON response.
- Keep flags above `AUDIT_MIN_CONFIDENCE` (default 0.8); on ties, keep the highest-confidence verdict per ticket.
- Write `data/knowledge_base_llm_flagged.csv`: original columns plus `suspect_by_llm`, `suspect_category`, `suspect_priority`, `suggested_category`, `suggested_priority`, `suspect_confidence`, `suspect_reason`, `llm_model`.

`tools/dedup_kb.py`:

- Group rows in the flagged CSV by exact `subject` string.
- In each duplicate group, drop rows where `suspect_by_llm = true` (fall back to the original group if that empties it).
- Compute the modal `category` and modal `priority` among what remains.
- Pick the representative row: first match on both modes, else modal category, else modal priority, else the first row.
- Write `data/knowledge_base_fixed.csv`.

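
The representative-row selection can be sketched with `collections.Counter`. This is an illustration, not the code in `tools/dedup_kb.py`: `pick_representative` and `dedup` are hypothetical names, rows are assumed to be dicts as produced by `csv.DictReader`, and `suspect_by_llm` is assumed to be stored as the string `true` / `false`.

```python
from collections import Counter, defaultdict


def _is_suspect(row: dict) -> bool:
    return str(row.get("suspect_by_llm", "")).strip().lower() == "true"


def pick_representative(group: list) -> dict:
    """Choose one row for a group of tickets sharing the same subject."""
    # Drop LLM-flagged rows, unless that would leave nothing.
    kept = [r for r in group if not _is_suspect(r)] or group
    modal_cat = Counter(r["category"] for r in kept).most_common(1)[0][0]
    modal_pri = Counter(r["priority"] for r in kept).most_common(1)[0][0]
    preferences = (
        lambda r: r["category"] == modal_cat and r["priority"] == modal_pri,
        lambda r: r["category"] == modal_cat,
        lambda r: r["priority"] == modal_pri,
    )
    for matches in preferences:
        for row in kept:
            if matches(row):
                return row
    return kept[0]


def dedup(rows: list) -> list:
    """Group rows by exact subject and keep one representative per group."""
    groups = defaultdict(list)
    for row in rows:
        groups[row["subject"]].append(row)
    return [pick_representative(group) for group in groups.values()]
```

Ties between equally common labels resolve to the one seen first, because `Counter.most_common` preserves first-encountered order among equal counts.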
```bash
# Status / logs
docker compose ps
docker logs -f steadfast-api
docker logs -f steadfast-db
# Stop (data + model cache persist in volumes)
docker compose stop
# Stop + remove containers (volumes persist)
docker compose down
# Nuke everything including data + model cache
docker compose down -v
# psql shell
docker exec -it steadfast-db psql -U steadfast -d steadfast
# Rebuild api after code changes
docker compose build api && docker compose up -d api
# Re-seed after editing data/knowledge_base_fixed.csv
.venv/bin/python scripts/seed_kb.py
# Explorer UI (Streamlit)
.venv/bin/streamlit run tools/explorer_ui.py
```