Skip to content
This repository was archived by the owner on Dec 1, 2025. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres
to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

# 1.51.2 - 2025/08/15

### Changed

- Introduced `twTasks.processing.triggersLimit` gauge to show how many triggers we can keep in memory for a bucket.

# 1.51.1 - 2025/06/18

### Changed
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
version=1.51.1
version=1.51.2
org.gradle.internal.http.socketTimeout=120000
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ public class CoreMetricsTemplate implements ICoreMetricsTemplate {
public static final String GAUGE_PROCESSING_RUNNING_TASKS_COUNT = METRIC_PREFIX + "processing.runningTasksCount";
public static final String GAUGE_PROCESSING_IN_PROGRESS_TASKS_GRABBING_COUNT = METRIC_PREFIX + "processing.inProgressTasksGrabbingCount";
public static final String GAUGE_PROCESSING_TRIGGERS_COUNT = METRIC_PREFIX + "processing.triggersCount";
public static final String GAUGE_PROCESSING_TRIGGERS_LIMIT = METRIC_PREFIX + "processing.triggersLimit";
public static final String GAUGE_PROCESSING_STATE_VERSION = METRIC_PREFIX + "processing.stateVersion";

private static final String TAG_PROCESSING_RESULT = "processingResult";
Expand Down Expand Up @@ -438,6 +439,11 @@ public Object registerProcessingTriggersCount(String bucketId, Supplier<Number>
return registerGauge(GAUGE_PROCESSING_TRIGGERS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId));
}

@Override
public Object registerProcessingTriggersLimit(String bucketId, Supplier<Number> countSupplier) {
return registerGauge(GAUGE_PROCESSING_TRIGGERS_LIMIT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId));
}

@Override
public Object registerRunningTasksCount(String bucketId, Supplier<Number> countSupplier) {
return registerGauge(GAUGE_PROCESSING_RUNNING_TASKS_COUNT, countSupplier, TAG_BUCKET_ID, resolveBucketId(bucketId));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,8 @@ void registerTasksCleanerTasksDeletion(TaskStatus status, int deletableTasksCoun

Object registerProcessingTriggersCount(String bucketId, Supplier<Number> countSupplier);

Object registerProcessingTriggersLimit(String bucketId, Supplier<Number> countSupplier);

Object registerRunningTasksCount(String bucketId, Supplier<Number> countSupplier);

Object registerInProgressTasksGrabbingCount(String bucketId, Supplier<Number> countSupplier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,8 @@ public void startProcessing() {
return;
}
for (String bucketId : bucketsManager.getBucketIds()) {
if (!bucketsManager.getBucketProperties(bucketId).getTriggerSameTaskInAllNodes() && tasksProperties.isCheckVersionBeforeGrabbing()) {
final var bucketProperties = bucketsManager.getBucketProperties(bucketId);
if (!bucketProperties.getTriggerSameTaskInAllNodes() && tasksProperties.isCheckVersionBeforeGrabbing()) {
log.warn(
"Suboptimal configuration for bucket '" + bucketId + "' found. triggerSameTaskInAllNodes=false and checkVersionBeforeGrabbing=true.");
}
Expand All @@ -740,6 +741,7 @@ public void startProcessing() {
coreMetricsTemplate.registerRunningTasksCount(bucketId, () -> bucket.getRunningTasksCount().get());
coreMetricsTemplate.registerInProgressTasksGrabbingCount(bucketId, () -> bucket.getInProgressTasksGrabbingCount().get());
coreMetricsTemplate.registerProcessingTriggersCount(bucketId, () -> bucket.getSize().get());
coreMetricsTemplate.registerProcessingTriggersLimit(bucketId, () -> bucketProperties.getMaxTriggersInMemory());
coreMetricsTemplate.registerProcessingStateVersion(bucketId, () -> bucket.getVersion().get());

tasksProcessingExecutor.submit(() -> {
Expand Down