From aa054652ad9df01d2adc09a1f7b4f42f6752f72b Mon Sep 17 00:00:00 2001 From: shangeyao Date: Sat, 27 Jun 2026 17:16:07 +0800 Subject: [PATCH] [K8s] Add TTL and size limit to Flink MetricCache Expire K8s metric entries after six hours and cap cache size to avoid retaining stale metrics when jobs stop without explicit invalidation. Generated-by: Cursor Co-authored-by: Cursor --- .../flink/kubernetes/FlinkK8sWatchController.scala | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala index 01e67def03..162fca88cc 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/FlinkK8sWatchController.scala @@ -228,7 +228,11 @@ object K8sDeploymentEventCache { class MetricCache { private[this] lazy val cache: Cache[ClusterKey, FlinkMetricCV] = - Caffeine.newBuilder().build() + Caffeine + .newBuilder() + .expireAfterWrite(6, TimeUnit.HOURS) + .maximumSize(10_000) + .build() def put(k: ClusterKey, v: FlinkMetricCV): Unit = cache.put(k, v)