Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;

/** This implementation is currently used for tracing flink job on yarn,standalone,remote mode */
Expand Down Expand Up @@ -131,6 +132,8 @@ public class FlinkAppHttpWatcher {
/** tracking task list */
private static final Map<Long, FlinkApplication> WATCHING_APPS = new ConcurrentHashMap<>(0);

private static final Map<Long, AtomicBoolean> WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0);

/**
*
*
Expand Down Expand Up @@ -202,19 +205,32 @@ public void doStop() {
*
* <p><strong>2) Normal information obtain, once every 5 seconds</strong>
*/
@Scheduled(fixedDelay = 5, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void scheduledWatch() {
if (WATCHING_APPS.isEmpty()) {
return;
}
lastWatchTime = System.currentTimeMillis();
WATCHING_APPS.forEach(this::watch);
}

@Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
|| !OPTIONING.isEmpty()
|| timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
|| timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
lastWatchTime = timeMillis;
WATCHING_APPS.forEach(this::watch);
public void fastWatchDuringOption() {
if (WATCHING_APPS.isEmpty()) {
return;
}
long timeMillis = System.currentTimeMillis();
if (OPTIONING.isEmpty() && timeMillis - lastOptionTime > OPTION_INTERVAL.toMillis()) {
return;
}
WATCHING_APPS.forEach(this::watch);
}

private void watch(Long id, FlinkApplication application) {
AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false));
if (!inFlight.compareAndSet(false, true)) {
return;
}
watchExecutor.execute(
() -> {
try {
Expand All @@ -229,6 +245,8 @@ private void watch(Long id, FlinkApplication application) {
} catch (Exception yarnException) {
doStateFailed(application);
}
} finally {
inFlight.set(false);
}
});
}
Expand Down Expand Up @@ -700,6 +718,7 @@ public static void unWatching(Long appId) {
}
log.info("[StreamPark][FlinkAppHttpWatcher] stop app,appId:{}", appId);
WATCHING_APPS.remove(appId);
WATCH_IN_FLIGHT.remove(appId);
}

public static void stopCanceledJob(Long appId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ private void init() {
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
}

@Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
@Scheduled(fixedDelay = 30, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
private void start() {
Long timeMillis = System.currentTimeMillis();
if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

@Slf4j
@Component
Expand Down Expand Up @@ -101,6 +102,8 @@ public class SparkAppHttpWatcher {
/** tracking task list */
private static final Map<Long, SparkApplication> WATCHING_APPS = new ConcurrentHashMap<>(0);

private static final Map<Long, AtomicBoolean> WATCH_IN_FLIGHT = new ConcurrentHashMap<>(0);

/**
*
*
Expand Down Expand Up @@ -153,16 +156,25 @@ public void doStop() {
*
* <p><strong>2) Normal information obtain, once every 5 seconds</strong>
*/
@Scheduled(fixedDelay = 5, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void scheduledWatch() {
if (WATCHING_APPS.isEmpty()) {
return;
}
lastWatchTime = System.currentTimeMillis();
WATCHING_APPS.forEach(this::watch);
}

@Scheduled(fixedDelay = 1, initialDelay = 5, timeUnit = TimeUnit.SECONDS)
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
|| !OPTIONING.isEmpty()
|| timeMillis - lastOptionTime <= OPTION_INTERVAL.toMillis()
|| timeMillis - lastWatchTime >= WATCHING_INTERVAL.toMillis()) {
lastWatchTime = timeMillis;
WATCHING_APPS.forEach(this::watch);
public void fastWatchDuringOption() {
if (WATCHING_APPS.isEmpty()) {
return;
}
long timeMillis = System.currentTimeMillis();
if (OPTIONING.isEmpty() && timeMillis - lastOptionTime > OPTION_INTERVAL.toMillis()) {
return;
}
WATCHING_APPS.forEach(this::watch);
}

@VisibleForTesting
Expand All @@ -172,12 +184,21 @@ public void start() {
}

private void watch(Long id, SparkApplication application) {
AtomicBoolean inFlight = WATCH_IN_FLIGHT.computeIfAbsent(id, ignored -> new AtomicBoolean(false));
if (!inFlight.compareAndSet(false, true)) {
return;
}
executorService.execute(
() -> {
try {
getStateFromYarn(application);
} catch (Exception e) {
throw new RuntimeException(e);
log.warn(
"[StreamPark][SparkAppHttpWatcher] Failed to watch application id={}",
application.getId(),
e);
} finally {
inFlight.set(false);
}
});
}
Expand Down Expand Up @@ -296,6 +317,7 @@ public static void doWatching(SparkApplication application) {
public static void unWatching(Long appId) {
log.info("[StreamPark][SparkAppHttpWatcher] stop app, appId:{}", appId);
WATCHING_APPS.remove(appId);
WATCH_IN_FLIGHT.remove(appId);
}

public static void addCanceledApp(Long appId, Long userId) {
Expand Down
Loading