diff --git a/model/perf_metric.go b/model/perf_metric.go index b4a6a9248ed..f081147e703 100644 --- a/model/perf_metric.go +++ b/model/perf_metric.go @@ -37,13 +37,17 @@ func UpsertPerfMetric(metric *PerfMetric) error { {Name: "bucket_ts"}, }, DoUpdates: clause.Assignments(map[string]interface{}{ - "request_count": gorm.Expr("request_count + ?", metric.RequestCount), - "success_count": gorm.Expr("success_count + ?", metric.SuccessCount), - "total_latency_ms": gorm.Expr("total_latency_ms + ?", metric.TotalLatencyMs), - "ttft_sum_ms": gorm.Expr("ttft_sum_ms + ?", metric.TtftSumMs), - "ttft_count": gorm.Expr("ttft_count + ?", metric.TtftCount), - "output_tokens": gorm.Expr("output_tokens + ?", metric.OutputTokens), - "generation_ms": gorm.Expr("generation_ms + ?", metric.GenerationMs), + // Qualify the right-hand column with the table name. PostgreSQL rejects + // an unqualified column in ON CONFLICT DO UPDATE as ambiguous between + // the existing row and the EXCLUDED pseudo-row. Table-qualified names + // are accepted by SQLite/MySQL/PostgreSQL alike.
+ "request_count": gorm.Expr("perf_metrics.request_count + ?", metric.RequestCount), + "success_count": gorm.Expr("perf_metrics.success_count + ?", metric.SuccessCount), + "total_latency_ms": gorm.Expr("perf_metrics.total_latency_ms + ?", metric.TotalLatencyMs), + "ttft_sum_ms": gorm.Expr("perf_metrics.ttft_sum_ms + ?", metric.TtftSumMs), + "ttft_count": gorm.Expr("perf_metrics.ttft_count + ?", metric.TtftCount), + "output_tokens": gorm.Expr("perf_metrics.output_tokens + ?", metric.OutputTokens), + "generation_ms": gorm.Expr("perf_metrics.generation_ms + ?", metric.GenerationMs), }), }).Create(metric).Error } diff --git a/pkg/perf_metrics/flush.go b/pkg/perf_metrics/flush.go index dddc2472585..32af174acc5 100644 --- a/pkg/perf_metrics/flush.go +++ b/pkg/perf_metrics/flush.go @@ -51,8 +51,15 @@ func flushCompletedBuckets() { GenerationMs: drained.generationMs, }) if err != nil { - bucket.addCounters(drained) common.SysError(fmt.Sprintf("failed to flush perf metric bucket model=%s group=%s bucket=%d: %s", k.model, k.group, k.bucketTs, err.Error())) + // Give up on buckets that have been failing past the 24h horizon instead + // of retrying (and holding memory) forever; otherwise a persistently + // failing flush leaks memory and spams logs indefinitely. + if k.bucketTs < bucketStart(time.Now().Add(-24*time.Hour).Unix()) { + hotBuckets.Delete(key) + return true + } + bucket.addCounters(drained) return true }