From c7f3d60413a91fbf97f31600c2307f611fb72058 Mon Sep 17 00:00:00 2001 From: shangeyao Date: Sat, 27 Jun 2026 15:40:22 +0800 Subject: [PATCH] [Console] Skip duplicate Flink/Spark watcher polls per application Use per-app single-flight guards so scheduled watcher ticks do not enqueue overlapping REST polling tasks for the same application. Generated-by: Cursor Co-authored-by: Cursor --- .../console/core/watcher/FlinkAppHttpWatcher.java | 11 +++++++++++ .../console/core/watcher/SparkAppHttpWatcher.java | 15 ++++++++++++++- 2 files changed, 25 insertions(+), 1 deletion(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index aeec8cd8bd..5d3d55ea05 100755 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -70,6 +70,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** This implementation is currently used for tracing flink job on yarn,standalone,remote mode */ @@ -131,6 +132,9 @@ public class FlinkAppHttpWatcher { /** tracking task list */ private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); + /** Skip scheduling a new watch while a previous watch for the same app is still running. */ + private static final Map WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0); + /** * * @@ -215,6 +219,10 @@ public void start() { } private void watch(Long id, FlinkApplication application) { + AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false)); + if (!inFlight.compareAndSet(false, true)) { + return; + } watchExecutor.execute( () -> { try { @@ -229,6 +237,8 @@ private void watch(Long id, FlinkApplication application) { } catch (Exception yarnException) { doStateFailed(application); } + } finally { + inFlight.set(false); } }); } @@ -700,6 +710,7 @@ public static void unWatching(Long appId) { } log.info("[StreamPark][FlinkAppHttpWatcher] stop app,appId:{}", appId); WATCHING_APPS.remove(appId); + WATCH_IN_FLIGHT.remove(appId); } public static void stopCanceledJob(Long appId) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java index 5383629d7e..386247afc5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -62,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Component @@ -101,6 +102,8 @@ public class SparkAppHttpWatcher { /** tracking task list */ private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); + private static final Map WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0); + /** * * @@ -172,12 +175,21 @@ public void start() { } private void watch(Long id, SparkApplication application) { + AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false)); + if (!inFlight.compareAndSet(false, true)) { + return; + } executorService.execute( () -> { try { getStateFromYarn(application); } catch (Exception e) { - throw new RuntimeException(e); + log.warn( + "[StreamPark][SparkAppHttpWatcher] Failed to watch application id={}", + application.getId(), + e); + } finally { + inFlight.set(false); } }); } @@ -296,6 +308,7 @@ public static void doWatching(SparkApplication application) { public static void unWatching(Long appId) { log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}", appId); WATCHING_APPS.remove(appId); + WATCH_IN_FLIGHT.remove(appId); } public static void addCanceledApp(Long appId, Long userId) {