diff --git a/frontend/src/generated/core/api.schemas.ts b/frontend/src/generated/core/api.schemas.ts index 165e42ff1e50..0355024a3771 100644 --- a/frontend/src/generated/core/api.schemas.ts +++ b/frontend/src/generated/core/api.schemas.ts @@ -1085,9 +1085,10 @@ export const TargetTypeEnumApi = { * `monthly` - Monthly * `yearly` - Yearly */ -export type FrequencyEnumApi = (typeof FrequencyEnumApi)[keyof typeof FrequencyEnumApi] +export type SubscriptionFrequencyEnumApi = + (typeof SubscriptionFrequencyEnumApi)[keyof typeof SubscriptionFrequencyEnumApi] -export const FrequencyEnumApi = { +export const SubscriptionFrequencyEnumApi = { Daily: 'daily', Weekly: 'weekly', Monthly: 'monthly', @@ -1131,7 +1132,7 @@ export interface SubscriptionApi { dashboard_export_insights?: number[] target_type: TargetTypeEnumApi target_value: string - frequency: FrequencyEnumApi + frequency: SubscriptionFrequencyEnumApi /** * @minimum -2147483648 * @maximum 2147483647 @@ -1196,7 +1197,7 @@ export interface PatchedSubscriptionApi { dashboard_export_insights?: number[] target_type?: TargetTypeEnumApi target_value?: string - frequency?: FrequencyEnumApi + frequency?: SubscriptionFrequencyEnumApi /** * @minimum -2147483648 * @maximum 2147483647 diff --git a/posthog/templates/email/evaluation_report.html b/posthog/templates/email/evaluation_report.html new file mode 100644 index 000000000000..16424875cf08 --- /dev/null +++ b/posthog/templates/email/evaluation_report.html @@ -0,0 +1,19 @@ +{% extends "email/base.html" %} + +{% block heading %}{% endblock %} + +{% block section %} + +{% comment %} +report_body is markdown rendered by markdown-it-py with html=False, so raw HTML in +the markdown source is escaped. Content originates from our own LLM agent via +structured tool outputs (not user input), and is delivered via email only. +{% endcomment %} +{# nosemgrep: python.flask.security.xss.audit.template-unescaped-with-safe.template-unescaped-with-safe #} +{{ report_body|safe }} + +
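+{% comment %}
+Illustrative only (assumed upstream, not part of this template): report_body is
+rendered by markdown-it-py roughly as
+  MarkdownIt("commonmark", {"html": False}).render(report_markdown)
+so any raw HTML in the markdown source arrives here already escaped.
+{% endcomment %}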
+ +{% endblock %} diff --git a/posthog/temporal/llm_analytics/__init__.py b/posthog/temporal/llm_analytics/__init__.py index 91a8d754ce81..15dd98396ef8 100644 --- a/posthog/temporal/llm_analytics/__init__.py +++ b/posthog/temporal/llm_analytics/__init__.py @@ -1,3 +1,17 @@ +from posthog.temporal.llm_analytics.eval_reports.activities import ( + deliver_report_activity, + fetch_count_triggered_eval_reports_activity, + fetch_due_eval_reports_activity, + prepare_report_context_activity, + run_eval_report_agent_activity, + store_report_run_activity, + update_next_delivery_date_activity, +) +from posthog.temporal.llm_analytics.eval_reports.workflow import ( + CheckCountTriggeredReportsWorkflow, + GenerateAndDeliverEvalReportWorkflow, + ScheduleAllEvalReportsWorkflow, +) from posthog.temporal.llm_analytics.metrics import EvalsMetricsInterceptor # noqa: F401 from posthog.temporal.llm_analytics.run_evaluation import ( RunEvaluationWorkflow, @@ -64,6 +78,10 @@ BatchTraceSummarizationCoordinatorWorkflow, DailyTraceClusteringWorkflow, TraceClusteringCoordinatorWorkflow, + # Evaluation reports + ScheduleAllEvalReportsWorkflow, + CheckCountTriggeredReportsWorkflow, + GenerateAndDeliverEvalReportWorkflow, # Keep sentiment workflow registered here temporarily so orphaned workflows on general-purpose queue can complete ClassifySentimentWorkflow, # Keep eval workflow registered here temporarily so orphaned workflows on general-purpose queue can complete @@ -84,6 +102,14 @@ perform_clustering_compute_activity, generate_cluster_labels_activity, emit_cluster_events_activity, + # Evaluation report activities + fetch_due_eval_reports_activity, + fetch_count_triggered_eval_reports_activity, + prepare_report_context_activity, + run_eval_report_agent_activity, + store_report_run_activity, + deliver_report_activity, + update_next_delivery_date_activity, # Keep sentiment activity registered here temporarily so orphaned workflows on general-purpose queue can complete classify_sentiment_activity, # Keep eval activities registered here temporarily so orphaned workflows on general-purpose queue can complete diff --git a/posthog/temporal/llm_analytics/eval_reports/activities.py b/posthog/temporal/llm_analytics/eval_reports/activities.py new file mode 100644 index 000000000000..6edcab3bd04d --- /dev/null +++ b/posthog/temporal/llm_analytics/eval_reports/activities.py @@ -0,0 +1,397 @@ +"""Activities for evaluation reports workflow.""" + +import datetime as dt + +import temporalio.activity +from structlog import get_logger + +from posthog.sync import database_sync_to_async +from posthog.temporal.common.heartbeat import Heartbeater +from posthog.temporal.llm_analytics.eval_reports.types import ( + CheckCountTriggeredReportsWorkflowInputs, + DeliverReportInput, + FetchDueEvalReportsOutput, + PrepareReportContextInput, + PrepareReportContextOutput, + RunEvalReportAgentInput, + RunEvalReportAgentOutput, + ScheduleAllEvalReportsWorkflowInputs, + StoreReportRunInput, + StoreReportRunOutput, + UpdateNextDeliveryDateInput, +) + +logger = get_logger(__name__) + + +@temporalio.activity.defn +async def fetch_due_eval_reports_activity( + inputs: ScheduleAllEvalReportsWorkflowInputs, +) -> FetchDueEvalReportsOutput: + """Return a list of time-based evaluation report IDs that are due for delivery.""" + now_with_buffer = dt.datetime.now(tz=dt.UTC) + dt.timedelta(minutes=inputs.buffer_minutes) + + @database_sync_to_async(thread_sensitive=False) + def get_report_ids() -> list[str]: + from 
products.llm_analytics.backend.models.evaluation_reports import EvaluationReport + + return [ + str(pk) + for pk in EvaluationReport.objects.filter( + next_delivery_date__lte=now_with_buffer, + enabled=True, + deleted=False, + ) + .exclude(frequency=EvaluationReport.Frequency.EVERY_N) + .values_list("id", flat=True) + ] + + report_ids = await get_report_ids() + await logger.ainfo(f"Found {len(report_ids)} due evaluation reports") + return FetchDueEvalReportsOutput(report_ids=report_ids) + + +@temporalio.activity.defn +async def fetch_count_triggered_eval_reports_activity( + inputs: CheckCountTriggeredReportsWorkflowInputs, +) -> FetchDueEvalReportsOutput: + """Check count-based reports and return those whose eval count exceeds the threshold.""" + + @database_sync_to_async(thread_sensitive=False) + def check_reports() -> list[str]: + from posthog.hogql.parser import parse_select + from posthog.hogql.query import execute_hogql_query + + from posthog.models import Team + + from products.llm_analytics.backend.models.evaluation_reports import EvaluationReport, EvaluationReportRun + + now = dt.datetime.now(tz=dt.UTC) + due: list[str] = [] + + reports = EvaluationReport.objects.filter( + frequency=EvaluationReport.Frequency.EVERY_N, + enabled=True, + deleted=False, + trigger_threshold__isnull=False, + ).select_related("evaluation") + + for report in reports: + # Cooldown: skip if last delivery was too recent + if report.last_delivered_at: + cooldown_delta = dt.timedelta(minutes=report.cooldown_minutes) + if (now - report.last_delivered_at) < cooldown_delta: + continue + + # Daily cap: skip if too many runs today + today_start = now.replace(hour=0, minute=0, second=0, microsecond=0) + today_runs = EvaluationReportRun.objects.filter( + report=report, + created_at__gte=today_start, + ).count() + if today_runs >= report.daily_run_cap: + continue + + # Count evals since last delivery (or start_date if first run) + since = report.last_delivered_at or report.start_date + since_str = since.strftime("%Y-%m-%d %H:%M:%S.%f") + + team = Team.objects.get(id=report.team_id) + query = parse_select( + f""" + SELECT count() as total + FROM events + WHERE event = '$ai_evaluation' + AND properties.$ai_evaluation_id = '{report.evaluation_id}' + AND timestamp >= '{since_str}' + """ + ) + result = execute_hogql_query(query=query, team=team) + rows = result.results or [] + count = rows[0][0] if rows else 0 + + if count >= report.trigger_threshold: + due.append(str(report.id)) + + return due + + report_ids = await check_reports() + await logger.ainfo(f"Found {len(report_ids)} count-triggered evaluation reports ready") + return FetchDueEvalReportsOutput(report_ids=report_ids) + + +def _find_nth_eval_timestamp( + team_id: int, + evaluation_id: str, + n: int, + before: dt.datetime, +) -> dt.datetime: + """Find the timestamp of the Nth-most-recent eval result. + + Returns the timestamp so the report window covers exactly the last N evals. + Falls back to 24h ago if there are fewer than N results. 
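+
+    Worked example (illustrative): with n=3 and eval timestamps t1 < t2 < t3 <
+    t4 < t5, all before `before`, the inner query keeps {t5, t4, t3} and min()
+    returns t3, so the window [t3, before] covers exactly the last three evals.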
+ """ + from posthog.hogql.parser import parse_select + from posthog.hogql.query import execute_hogql_query + + from posthog.models import Team + + team = Team.objects.get(id=team_id) + before_str = before.strftime("%Y-%m-%d %H:%M:%S.%f") + query = parse_select( + f""" + SELECT min(ts) FROM ( + SELECT timestamp as ts + FROM events + WHERE event = '$ai_evaluation' + AND properties.$ai_evaluation_id = '{evaluation_id}' + AND timestamp <= '{before_str}' + ORDER BY timestamp DESC + LIMIT {int(n)} + ) + """ + ) + result = execute_hogql_query(query=query, team=team) + rows = result.results or [] + if rows and rows[0][0] is not None: + ts = rows[0][0] + if isinstance(ts, dt.datetime): + if ts.tzinfo is None: + return ts.replace(tzinfo=dt.UTC) + return ts + # Fallback: 24h ago + return before - dt.timedelta(days=1) + + +@temporalio.activity.defn +async def prepare_report_context_activity( + inputs: PrepareReportContextInput, +) -> PrepareReportContextOutput: + """Load evaluation from Postgres and calculate time windows.""" + + @database_sync_to_async(thread_sensitive=False) + def prepare() -> PrepareReportContextOutput: + from products.llm_analytics.backend.models.evaluation_reports import EvaluationReport + + report = EvaluationReport.objects.select_related("evaluation").get(id=inputs.report_id) + evaluation = report.evaluation + now = dt.datetime.now(tz=dt.UTC) + + # Period end is now, period start depends on context + period_end = now + freq_deltas = { + "hourly": dt.timedelta(hours=1), + "daily": dt.timedelta(days=1), + "weekly": dt.timedelta(weeks=1), + } + + if inputs.manual: + # Manual "Generate now": always look back one full frequency period + # so the user always gets a meaningful report regardless of last delivery. + if report.frequency == "every_n": + # For count-triggered reports, sample the most recent N evals so + # "Generate now" always produces something useful even if the + # threshold hasn't been crossed yet. + period_start = _find_nth_eval_timestamp( + team_id=report.team_id, + evaluation_id=str(evaluation.id), + n=report.trigger_threshold or 100, + before=now, + ) + else: + period_start = now - freq_deltas.get(report.frequency, dt.timedelta(days=1)) + elif report.last_delivered_at: + period_start = report.last_delivered_at + else: + # First run: look back one period (or to start_date for count-triggered) + if report.frequency == "every_n": + period_start = report.start_date + else: + period_start = now - freq_deltas.get(report.frequency, dt.timedelta(days=1)) + + # Previous period for comparison (same duration, shifted back) + period_duration = period_end - period_start + previous_period_start = period_start - period_duration + + # `report_prompt_guidance` is a per-report TextField that lets users steer + # the agent. Added in migration 0024 (Commit 2 of the v2 schema refactor). + # `getattr` with fallback keeps this activity compatible with the unmigrated + # database state during Commit 1 local testing. 
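+        # For example (hypothetical pre-migration state), the lookup simply
+        # falls back to "", so the agent gets no extra steering rather than
+        # an AttributeError.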
+ guidance = getattr(report, "report_prompt_guidance", "") or "" + + return PrepareReportContextOutput( + report_id=str(report.id), + team_id=report.team_id, + evaluation_id=str(evaluation.id), + evaluation_name=evaluation.name, + evaluation_description=evaluation.description or "", + evaluation_prompt=evaluation.evaluation_config.get("prompt", ""), + evaluation_type=evaluation.evaluation_type, + period_start=period_start.isoformat(), + period_end=period_end.isoformat(), + previous_period_start=previous_period_start.isoformat(), + report_prompt_guidance=guidance, + ) + + return await prepare() + + +@temporalio.activity.defn +async def run_eval_report_agent_activity( + inputs: RunEvalReportAgentInput, +) -> RunEvalReportAgentOutput: + """Run the LLM report agent.""" + async with Heartbeater(): + await logger.ainfo( + "Running eval report agent", + report_id=inputs.report_id, + evaluation_id=inputs.evaluation_id, + ) + + @database_sync_to_async(thread_sensitive=False) + def run_agent(): + from posthog.temporal.llm_analytics.eval_reports.report_agent import run_eval_report_agent + + return run_eval_report_agent( + team_id=inputs.team_id, + evaluation_id=inputs.evaluation_id, + evaluation_name=inputs.evaluation_name, + evaluation_description=inputs.evaluation_description, + evaluation_prompt=inputs.evaluation_prompt, + evaluation_type=inputs.evaluation_type, + period_start=inputs.period_start, + period_end=inputs.period_end, + previous_period_start=inputs.previous_period_start, + report_prompt_guidance=inputs.report_prompt_guidance, + ) + + content = await run_agent() + + return RunEvalReportAgentOutput( + report_id=inputs.report_id, + content=content.to_dict(), + period_start=inputs.period_start, + period_end=inputs.period_end, + ) + + +@temporalio.activity.defn +async def store_report_run_activity( + inputs: StoreReportRunInput, +) -> StoreReportRunOutput: + """Save the generated report as an EvaluationReportRun and emit a $ai_evaluation_report event.""" + + @database_sync_to_async(thread_sensitive=False) + def store() -> str: + import uuid + + from posthog.models.event.util import create_event + from posthog.models.team import Team + + from products.llm_analytics.backend.models.evaluation_reports import EvaluationReportRun + + # Mirror content.metrics into the legacy `metadata` JSONField so existing + # consumers that read from it (e.g. the UI's run preview before Commit 2's + # frontend refresh) still work. 
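+        # Illustrative shape (assumed, not validated here):
+        #   content = {"title": ..., "sections": [...], "citations": [...],
+        #              "metrics": {"pass_rate": 0.92, "total_runs": 120, ...}}
+        # so the legacy `metadata` field below carries just the metrics block.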
+ content = inputs.content or {} + metrics = content.get("metrics", {}) or {} + + run = EvaluationReportRun.objects.create( + report_id=inputs.report_id, + content=content, + metadata=metrics, + period_start=inputs.period_start, + period_end=inputs.period_end, + ) + + # Emit $ai_evaluation_report event to ClickHouse + team = Team.objects.get(id=inputs.team_id) + + # Collect citations from structured content (v2), not from per-section lists + citations = content.get("citations", []) or [] + all_referenced_ids = [c.get("generation_id", "") for c in citations if c.get("generation_id")] + + properties: dict = { + "$ai_evaluation_id": inputs.evaluation_id, + "$ai_evaluation_report_id": str(run.report_id), + "$ai_evaluation_report_run_id": str(run.id), + "$ai_report_title": content.get("title", ""), + "$ai_report_period_start": inputs.period_start, + "$ai_report_period_end": inputs.period_end, + # Metrics for querying/alerting (flattened from content.metrics) + "$ai_report_total_runs": metrics.get("total_runs", 0), + "$ai_report_pass_count": metrics.get("pass_count", 0), + "$ai_report_fail_count": metrics.get("fail_count", 0), + "$ai_report_na_count": metrics.get("na_count", 0), + "$ai_report_pass_rate": metrics.get("pass_rate", 0.0), + "$ai_report_previous_pass_rate": metrics.get("previous_pass_rate"), + "$ai_report_previous_total_runs": metrics.get("previous_total_runs"), + # Structured content + citations for downstream consumption + "$ai_report_content": content, + "$ai_report_citations": citations, + "$ai_report_referenced_generation_ids": all_referenced_ids, + "$ai_report_section_count": len(content.get("sections", [])), + } + + create_event( + event_uuid=uuid.uuid4(), + event="$ai_evaluation_report", + team=team, + distinct_id=f"eval_report_{inputs.team_id}", + properties=properties, + ) + + return str(run.id) + + run_id = await store() + return StoreReportRunOutput(report_run_id=run_id) + + +@temporalio.activity.defn +async def deliver_report_activity( + inputs: DeliverReportInput, +) -> None: + """Deliver the report via configured delivery targets (email/Slack).""" + async with Heartbeater(): + await logger.ainfo( + "Delivering evaluation report", + report_id=inputs.report_id, + report_run_id=inputs.report_run_id, + ) + + @database_sync_to_async(thread_sensitive=False) + def deliver(): + from posthog.temporal.llm_analytics.eval_reports.delivery import deliver_report + + deliver_report( + report_id=inputs.report_id, + report_run_id=inputs.report_run_id, + ) + + await deliver() + + +@temporalio.activity.defn +async def update_next_delivery_date_activity( + inputs: UpdateNextDeliveryDateInput, +) -> None: + """Update the report's next_delivery_date and last_delivered_at. + + last_delivered_at is set to the report's period_end (captured at the start of + this run) rather than the current wall-clock time. This guarantees that the + next run's period_start picks up exactly where this run's period_end left off, + so any time spent generating/delivering does not create a coverage gap. 
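+
+    Example (illustrative): a daily report whose window was [Mon 00:00, Tue 00:00)
+    records last_delivered_at = Tue 00:00 even if delivery only finishes at
+    Tue 00:07; the next run's window then opens at exactly Tue 00:00.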
+ """ + + @database_sync_to_async(thread_sensitive=False) + def update(): + import datetime as dt_mod + + from products.llm_analytics.backend.models.evaluation_reports import EvaluationReport + + report = EvaluationReport.objects.get(id=inputs.report_id) + report.last_delivered_at = dt_mod.datetime.fromisoformat(inputs.period_end) + report.set_next_delivery_date() + report.save(update_fields=["last_delivered_at", "next_delivery_date"]) + + await update() diff --git a/posthog/temporal/llm_analytics/eval_reports/delivery.py b/posthog/temporal/llm_analytics/eval_reports/delivery.py new file mode 100644 index 000000000000..6a3655117066 --- /dev/null +++ b/posthog/temporal/llm_analytics/eval_reports/delivery.py @@ -0,0 +1,427 @@ +"""Delivery logic for evaluation reports (email and Slack). + +Content shape: `EvalReportContent` has a `title`, 1-6 titled `sections`, a list +of structured `citations`, and a `metrics` block. The renderers below build the +email HTML body and Slack Block Kit payloads from that shape. +""" + +import re +from datetime import UTC, datetime + +import structlog +from markdown_it import MarkdownIt +from markdown_to_mrkdwn import SlackMarkdownConverter + +from posthog.temporal.llm_analytics.eval_reports.report_agent.schema import EvalReportContent, EvalReportMetrics + +logger = structlog.get_logger(__name__) + +# Matches a leading markdown heading line at the very start of a section's content. +# The renderer (email/Slack/UI) already emits its own section title, so if the agent +# also started the section with its own `## Executive summary` heading we strip it +# to avoid duplicated titles. See EvaluationReportViewer.tsx for the parallel fix. +_LEADING_HEADING_RE = re.compile(r"^\s*#{1,6}\s+(.+?)\s*(?:\r?\n|$)") + +# html=False escapes any raw HTML in the markdown source — defense in depth +# even though the markdown is produced by our own LLM agent via structured tools. +_md = MarkdownIt("commonmark", {"html": False}).enable("table") +_slack_converter = SlackMarkdownConverter() + +# Inline styles for email-safe HTML (many clients strip