From d46b5ceb4fcf15f2cdec27e21c312b2b34e326b8 Mon Sep 17 00:00:00 2001 From: reputationly <197039020@qq.com> Date: Tue, 23 Jun 2026 12:35:43 +0800 Subject: [PATCH] =?UTF-8?q?fix(perf=5Fmetrics):=20=E4=BF=AE=E5=A4=8D=20PG?= =?UTF-8?q?=20=E4=B8=8B=20upsert=20=E5=88=97=E5=BC=95=E7=94=A8=E6=AD=A7?= =?UTF-8?q?=E4=B9=89=E5=B9=B6=E5=8A=A0=E5=9B=BA=20flush=20=E9=87=8D?= =?UTF-8?q?=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit UpsertPerfMetric 的 ON CONFLICT DO UPDATE 右侧列名未加表名限定, PostgreSQL (>=14.4) 会判定为目标行与 excluded 行之间的列引用歧义 (column reference is ambiguous),导致所有走 DO UPDATE 的写入失败。 多实例/多写入源并发时每个冲突桶都会报错并被无限重试。 - model/perf_metric.go: 7 个累加列全部加 perf_metrics. 表名限定 (SQLite/MySQL/PostgreSQL 通用),并发 upsert 可原子正确累加。 - pkg/perf_metrics/flush.go: flush 失败分支增加 24h 年龄兜底, 超期 stuck 桶直接丢弃,避免内存只增不减与日志无限刷。 --- model/perf_metric.go | 18 +++++++++++------- pkg/perf_metrics/flush.go | 9 ++++++++- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/model/perf_metric.go b/model/perf_metric.go index b4a6a9248ed..f081147e703 100644 --- a/model/perf_metric.go +++ b/model/perf_metric.go @@ -37,13 +37,17 @@ func UpsertPerfMetric(metric *PerfMetric) error { {Name: "bucket_ts"}, }, DoUpdates: clause.Assignments(map[string]interface{}{ - "request_count": gorm.Expr("request_count + ?", metric.RequestCount), - "success_count": gorm.Expr("success_count + ?", metric.SuccessCount), - "total_latency_ms": gorm.Expr("total_latency_ms + ?", metric.TotalLatencyMs), - "ttft_sum_ms": gorm.Expr("ttft_sum_ms + ?", metric.TtftSumMs), - "ttft_count": gorm.Expr("ttft_count + ?", metric.TtftCount), - "output_tokens": gorm.Expr("output_tokens + ?", metric.OutputTokens), - "generation_ms": gorm.Expr("generation_ms + ?", metric.GenerationMs), + // Qualify the right-hand column with the table name. PostgreSQL (>=14.4) + // treats an unqualified column in ON CONFLICT DO UPDATE as ambiguous + // between the existing row and the excluded row. Table-qualified names + // are accepted by SQLite/MySQL/PostgreSQL alike. + "request_count": gorm.Expr("perf_metrics.request_count + ?", metric.RequestCount), + "success_count": gorm.Expr("perf_metrics.success_count + ?", metric.SuccessCount), + "total_latency_ms": gorm.Expr("perf_metrics.total_latency_ms + ?", metric.TotalLatencyMs), + "ttft_sum_ms": gorm.Expr("perf_metrics.ttft_sum_ms + ?", metric.TtftSumMs), + "ttft_count": gorm.Expr("perf_metrics.ttft_count + ?", metric.TtftCount), + "output_tokens": gorm.Expr("perf_metrics.output_tokens + ?", metric.OutputTokens), + "generation_ms": gorm.Expr("perf_metrics.generation_ms + ?", metric.GenerationMs), }), }).Create(metric).Error } diff --git a/pkg/perf_metrics/flush.go b/pkg/perf_metrics/flush.go index dddc2472585..32af174acc5 100644 --- a/pkg/perf_metrics/flush.go +++ b/pkg/perf_metrics/flush.go @@ -51,8 +51,15 @@ func flushCompletedBuckets() { GenerationMs: drained.generationMs, }) if err != nil { - bucket.addCounters(drained) common.SysError(fmt.Sprintf("failed to flush perf metric bucket model=%s group=%s bucket=%d: %s", k.model, k.group, k.bucketTs, err.Error())) + // Give up on buckets that have been failing past the 24h horizon instead + // of retrying (and holding memory) forever; otherwise a persistently + // failing flush leaks memory and spams logs indefinitely. + if k.bucketTs < bucketStart(time.Now().Add(-24*time.Hour).Unix()) { + hotBuckets.Delete(key) + return true + } + bucket.addCounters(drained) return true }