diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index aeec8cd8bd..72364e4b18 100755 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -70,6 +70,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Collectors; /** This implementation is currently used for tracing flink job on yarn,standalone,remote mode */ @@ -131,6 +132,8 @@ public class FlinkAppHttpWatcher { /** tracking task list */ private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); + private static final Map WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0); + /** * * @@ -202,19 +205,32 @@ public void doStop() { * *

2) Normal information obtain, once every 5 seconds */ + @Scheduled(fixedDelay = 5, initialDelay = 5, timeUnit = TimeUnit.SECONDS) + public void scheduledWatch() { + if (WATCHING_APPS.isEmpty()) { + return; + } + lastWatchTime = System.currentTimeMillis(); + WATCHING_APPS.forEach(this::watch); + } + @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS) - public void start() { - Long timeMillis = System.currentTimeMillis(); - if (lastWatchTime == null - || !OPTIONING.isEmpty() - || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis() - || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) { - lastWatchTime = timeMillis; - WATCHING_APPS.forEach(this::watch); + public void fastWatchDuringOption() { + if (WATCHING_APPS.isEmpty()) { + return; + } + long timeMillis = System.currentTimeMillis(); + if (OPTIONING.isEmpty() && timeMillis - lastOptionTime > OPTION_INTERVAL.toMillis()) { + return; } + WATCHING_APPS.forEach(this::watch); } private void watch(Long id, FlinkApplication application) { + AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false)); + if (!inFlight.compareAndSet(false, true)) { + return; + } watchExecutor.execute( () -> { try { @@ -229,6 +245,8 @@ private void watch(Long id, FlinkApplication application) { } catch (Exception yarnException) { doStateFailed(application); } + } finally { + inFlight.set(false); } }); } @@ -700,6 +718,7 @@ public static void unWatching(Long appId) { } log.info("[StreamPark][FlinkAppHttpWatcher] stop app,appId:{}", appId); WATCHING_APPS.remove(appId); + WATCH_IN_FLIGHT.remove(appId); } public static void stopCanceledJob(Long appId) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java index 8abab9c0f3..ca609470ea 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkClusterWatcher.java @@ -103,7 +103,7 @@ private void init() { flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster)); } - @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS) + @Scheduled(fixedDelay = 30, initialDelay = 5, timeUnit = TimeUnit.SECONDS) private void start() { Long timeMillis = System.currentTimeMillis(); if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java index 5383629d7e..e7fbc03d87 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/SparkAppHttpWatcher.java @@ -62,6 +62,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; @Slf4j @Component @@ -101,6 +102,8 @@ public class SparkAppHttpWatcher { /** tracking task list */ private static final Map WATCHING_APPS = new ConcurrentHashMap<>(0); + private static final Map WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0); + /** * * @@ -153,16 +156,25 @@ public void doStop() { * *

2) Normal information obtain, once every 5 seconds */ + @Scheduled(fixedDelay = 5, initialDelay = 5, timeUnit = TimeUnit.SECONDS) + public void scheduledWatch() { + if (WATCHING_APPS.isEmpty()) { + return; + } + lastWatchTime = System.currentTimeMillis(); + WATCHING_APPS.forEach(this::watch); + } + @Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS) - public void start() { - Long timeMillis = System.currentTimeMillis(); - if (lastWatchTime == null - || !OPTIONING.isEmpty() - || timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis() - || timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) { - lastWatchTime = timeMillis; - WATCHING_APPS.forEach(this::watch); + public void fastWatchDuringOption() { + if (WATCHING_APPS.isEmpty()) { + return; } + long timeMillis = System.currentTimeMillis(); + if (OPTIONING.isEmpty() && timeMillis - lastOptionTime > OPTION_INTERVAL.toMillis()) { + return; + } + WATCHING_APPS.forEach(this::watch); } @VisibleForTesting @@ -172,12 +184,21 @@ public void start() { } private void watch(Long id, SparkApplication application) { + AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false)); + if (!inFlight.compareAndSet(false, true)) { + return; + } executorService.execute( () -> { try { getStateFromYarn(application); } catch (Exception e) { - throw new RuntimeException(e); + log.warn( + "[StreamPark][SparkAppHttpWatcher] Failed to watch application id={}", + application.getId(), + e); + } finally { + inFlight.set(false); } }); } @@ -296,6 +317,7 @@ public static void doWatching(SparkApplication application) { public static void unWatching(Long appId) { log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}", appId); WATCHING_APPS.remove(appId); + WATCH_IN_FLIGHT.remove(appId); } public static void addCanceledApp(Long appId, Long userId) {