From 3b7958ed492bcf2d35a0ed716e9179fc8279f689 Mon Sep 17 00:00:00 2001 From: shangeyao Date: Sat, 27 Jun 2026 15:42:12 +0800 Subject: [PATCH] [Console] Batch delete expired application backups Replace per-app N+1 cleanup queries with a single load and batch removeByIds to reduce database round trips. Generated-by: Cursor Co-authored-by: Cursor --- .../core/task/ApplicationBackUpCleanTask.java | 66 ++++++++++--------- 1 file changed, 34 insertions(+), 32 deletions(-) diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java index 4298200bda..c6062f9bdc 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ApplicationBackUpCleanTask.java @@ -26,6 +26,12 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + @Slf4j @Component @RequiredArgsConstructor @@ -39,38 +45,34 @@ public class ApplicationBackUpCleanTask { @Scheduled(cron = "${streampark.backup-clean.exec-cron:0 0 1 * * ?}") public void backUpClean() { log.info("Start to clean application backup"); - // select all application backup which count > maxBackupNum group by app_id - backUpService.lambdaQuery() - .select(FlinkApplicationBackup::getAppId) - .groupBy(FlinkApplicationBackup::getAppId) - .having("COUNT(*) > {0}", maxBackupNum) - .list() - .stream() - .map(FlinkApplicationBackup::getAppId) - .forEach( - appId -> { - // order by create_time desc and skip first maxBackupNum records and delete - // others - backUpService.lambdaQuery() - .eq(FlinkApplicationBackup::getAppId, appId) - .orderByDesc(FlinkApplicationBackup::getCreateTime) - .list() - .stream() - .skip(maxBackupNum) - .forEach( - backUp -> { - try { - backUpService.removeById( - backUp.getId()); - } catch (Exception e) { - log.error( - "Clean application backup failed for app id: {} , backup id: {}", - appId, - backUp.getId(), - e); - } - }); - }); + List allBackups = + backUpService.lambdaQuery().orderByDesc(FlinkApplicationBackup::getCreateTime).list(); + if (allBackups.isEmpty()) { + log.info("Clean application backup finished"); + return; + } + + Map> backupsByApp = + allBackups.stream().collect(Collectors.groupingBy(FlinkApplicationBackup::getAppId)); + + List toDelete = new ArrayList<>(); + for (List appBackups : backupsByApp.values()) { + appBackups.sort(Comparator.comparing(FlinkApplicationBackup::getCreateTime).reversed()); + appBackups.stream() + .skip(maxBackupNum) + .map(FlinkApplicationBackup::getId) + .forEach(toDelete::add); + } + + if (!toDelete.isEmpty()) { + try { + backUpService.removeByIds(toDelete); + log.info("Clean application backup finished, deleted {} records", toDelete.size()); + } catch (Exception e) { + log.error("Clean application backup failed", e); + } + return; + } log.info("Clean application backup finished"); } }