diff --git a/CHANGELOG.md b/CHANGELOG.md index bb0f34ad..e7c8c7c8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +# 1.51.2 - 2025/08/15 + +### Changed + +- Introduced `twTasks.processing.triggersLimit` gauge to show how many triggers we can keep in memory for a bucket. + # 1.51.1 - 2025/06/18 ### Changed diff --git a/gradle.properties b/gradle.properties index 253c5e40..83671c03 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.51.1 +version=1.51.2 org.gradle.internal.http.socketTimeout=120000 diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java index 050f1e04..ada72386 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/CoreMetricsTemplate.java @@ -96,6 +96,7 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate { public static final String GAUGE_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount"; public static final String GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount"; public static final String GAUGE_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount"; + public static final String GAUGE_PROCESSING_TRIGGERS_LIMIT = METRIC_PREFIX + "processing.triggersLimit"; public static final String GAUGE_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion"; private static final String TAG_PROCESSING_RESULT = "processingResult"; @@ -438,6 +439,11 @@ public Object registerProcessingTriggersCount(String bucketId, Supplier return registerGauge(GAUGE_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); } + @Override + public Object registerProcessingTriggersLimit(String bucketId, Supplier countSupplier) { + return registerGauge(GAUGE_PROCESSING_TRIGGERS_LIMIT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); + } + @Override public Object registerRunningTasksCount(String bucketId, Supplier countSupplier) { return registerGauge(GAUGE_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId)); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java index fcf74cfa..ee0d059f 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/helpers/ICoreMetricsTemplate.java @@ -107,6 +107,8 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun Object registerProcessingTriggersCount(String bucketId, Supplier countSupplier); + Object registerProcessingTriggersLimit(String bucketId, Supplier countSupplier); + Object registerRunningTasksCount(String bucketId, Supplier countSupplier); Object registerInProgressTasksGrabbingCount(String bucketId, Supplier countSupplier); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java index c76ad8f6..2e64daea 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java @@ -730,7 +730,8 @@ public void startProcessing() { return; } for (String bucketId : bucketsManager.getBucketIds()) { - if (!bucketsManager.getBucketProperties(bucketId).getTriggerSameTaskInAllNodes() && tasksProperties.isCheckVersionBeforeGrabbing()) { + final var bucketProperties = bucketsManager.getBucketProperties(bucketId); + if (!bucketProperties.getTriggerSameTaskInAllNodes() && tasksProperties.isCheckVersionBeforeGrabbing()) { log.warn( "Suboptimal configuration for bucket '" + bucketId + "' found. triggerSameTaskInAllNodes=false and checkVersionBeforeGrabbing=true."); } @@ -740,6 +741,7 @@ public void startProcessing() { coreMetricsTemplate.registerRunningTasksCount(bucketId, () -> bucket.getRunningTasksCount().get()); coreMetricsTemplate.registerInProgressTasksGrabbingCount(bucketId, () -> bucket.getInProgressTasksGrabbingCount().get()); coreMetricsTemplate.registerProcessingTriggersCount(bucketId, () -> bucket.getSize().get()); + coreMetricsTemplate.registerProcessingTriggersLimit(bucketId, () -> bucketProperties.getMaxTriggersInMemory()); coreMetricsTemplate.registerProcessingStateVersion(bucketId, () -> bucket.getVersion().get()); tasksProcessingExecutor.submit(() -> {