From 5b76164cfe6e21f7703d43601923f9738e763f41 Mon Sep 17 00:00:00 2001 From: niumy Date: Thu, 4 Jun 2026 15:07:07 +0800 Subject: [PATCH] [bug-4356] After obtaining an exception in task status from Flink, the intermediate task status obtained from Yarn was not updated --- .../console/core/watcher/FlinkAppHttpWatcher.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java index aeec8cd8bd..438edd58a2 100755 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/watcher/FlinkAppHttpWatcher.java @@ -291,6 +291,8 @@ private void doStateFailed(FlinkApplication application) { * @param application application */ private void getStateFromFlink(FlinkApplication application) throws Exception { + log.debug( + "[StreamPark][FlinkAppHttpWatcher] getStateFromFlink appId: {}", application.getId()); StopFromEnum stopFrom = getStopFrom(application); JobsOverview jobsOverview = httpJobsOverview(application); Optional optional; @@ -549,7 +551,8 @@ private void handleNotRunState( * @param application application */ private void getStateFromYarn(FlinkApplication application) throws Exception { - log.debug("[StreamPark][FlinkAppHttpWatcher] getFromYarnRestApi starting..."); + log.debug( + "[StreamPark][FlinkAppHttpWatcher] getStateFromYarn appId: {}", application.getId()); StopFromEnum stopFrom = getStopFrom(application); OptionStateEnum optionState = OPTIONING.get(application.getId()); @@ -565,7 +568,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { try { YarnAppInfo yarnAppInfo = httpYarnAppInfo(application); if (yarnAppInfo != null) { - String state = yarnAppInfo.getApp().getFinalStatus(); + String state = yarnAppInfo.getApp().getState(); flinkAppState = FlinkAppStateEnum.getState(state); } } finally { @@ -594,7 +597,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { } } else { try { - String state = yarnAppInfo.getApp().getFinalStatus(); + String state = yarnAppInfo.getApp().getState(); FlinkAppStateEnum flinkAppState = FlinkAppStateEnum.getState(state); if (FlinkAppStateEnum.OTHER.equals(flinkAppState)) { return; @@ -614,7 +617,7 @@ private void getStateFromYarn(FlinkApplication application) throws Exception { } application.setState(flinkAppState.getValue()); cleanOptioning(optionState, application.getId()); - doPersistMetrics(application, true); + doPersistMetrics(application, false); if (flinkAppState.equals(FlinkAppStateEnum.FAILED) || flinkAppState.equals(FlinkAppStateEnum.LOST) || (flinkAppState.equals(FlinkAppStateEnum.CANCELED) @@ -817,14 +820,18 @@ private CheckPoints httpCheckpoints(FlinkApplication application) throws Excepti } private T yarnRestRequest(String url, Class clazz) throws IOException { + log.debug("[StreamPark][FlinkAppHttpWatcher] yarnRestRequest,url:{}", url); String result = YarnUtils.restRequest(url, HTTP_TIMEOUT); + log.debug("[StreamPark][FlinkAppHttpWatcher] yarnRestRequest,result:{}", result); return JacksonUtils.read(result, clazz); } private T httpRestRequest(String url, Class clazz) throws IOException { + log.debug("[StreamPark][FlinkAppHttpWatcher] httpRestRequest,url:{}", url); String result = HttpClientUtils.httpGetRequest( url, RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build()); + log.debug("[StreamPark][FlinkAppHttpWatcher] httpRestRequest,result:{}", result); if (null == result) { return null; }