Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions model/perf_metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ func UpsertPerfMetric(metric *PerfMetric) error {
{Name: "bucket_ts"},
},
DoUpdates: clause.Assignments(map[string]interface{}{
"request_count": gorm.Expr("request_count + ?", metric.RequestCount),
"success_count": gorm.Expr("success_count + ?", metric.SuccessCount),
"total_latency_ms": gorm.Expr("total_latency_ms + ?", metric.TotalLatencyMs),
"ttft_sum_ms": gorm.Expr("ttft_sum_ms + ?", metric.TtftSumMs),
"ttft_count": gorm.Expr("ttft_count + ?", metric.TtftCount),
"output_tokens": gorm.Expr("output_tokens + ?", metric.OutputTokens),
"generation_ms": gorm.Expr("generation_ms + ?", metric.GenerationMs),
// Qualify the right-hand column with the table name. PostgreSQL (>=14.4)
// treats an unqualified column in ON CONFLICT DO UPDATE as ambiguous
// between the existing row and the excluded row. Table-qualified names
// are accepted by SQLite/MySQL/PostgreSQL alike.
"request_count": gorm.Expr("perf_metrics.request_count + ?", metric.RequestCount),
"success_count": gorm.Expr("perf_metrics.success_count + ?", metric.SuccessCount),
"total_latency_ms": gorm.Expr("perf_metrics.total_latency_ms + ?", metric.TotalLatencyMs),
"ttft_sum_ms": gorm.Expr("perf_metrics.ttft_sum_ms + ?", metric.TtftSumMs),
"ttft_count": gorm.Expr("perf_metrics.ttft_count + ?", metric.TtftCount),
"output_tokens": gorm.Expr("perf_metrics.output_tokens + ?", metric.OutputTokens),
"generation_ms": gorm.Expr("perf_metrics.generation_ms + ?", metric.GenerationMs),
}),
}).Create(metric).Error
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/perf_metrics/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,15 @@ func flushCompletedBuckets() {
GenerationMs: drained.generationMs,
})
if err != nil {
bucket.addCounters(drained)
common.SysError(fmt.Sprintf("failed to flush perf metric bucket model=%s group=%s bucket=%d: %s", k.model, k.group, k.bucketTs, err.Error()))
// Give up on buckets whose timestamp is older than the 24h horizon: delete
// them and discard the drained counters instead of re-queuing them. Otherwise
// a persistently failing flush holds memory and spams logs indefinitely.
if k.bucketTs < bucketStart(time.Now().Add(-24*time.Hour).Unix()) {
hotBuckets.Delete(key)
return true
}
bucket.addCounters(drained)
return true
}

Expand Down