From de954f55d548159cd970336a4a0f3816625c8fbe Mon Sep 17 00:00:00 2001
From: Vasily Vasilkov
Date: Sat, 9 May 2026 12:24:22 +0400
Subject: [PATCH 1/3] Make output cleaner

---
 .../kolbasa/cluster/butcher/ConsoleProgressCallback.kt | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt b/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt
index 8a527e3..6c6e861 100644
--- a/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt
+++ b/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt
@@ -11,7 +11,12 @@ internal object ConsoleProgressCallback : ProgressCallback {
         println("Target node: ${targetNode.id}")
         println("Shards to move (${shardsDiff.size}):")
         shardsDiff.forEach { diff ->
-            println("\t${diff.originalShard} => ${diff.updatedShard}")
+            // Shard(shard=6, producerNode=NodeId(id=db4), consumerNode=NodeId(id=db4), nextConsumerNode=null) => Shard(shard=6, producerNode=NodeId(id=db5), consumerNode=null, nextConsumerNode=NodeId(id=db5))
+            val originalShard =
+                "Shard #${diff.originalShard.shard}(producerNode=${diff.originalShard.producerNode.id}, consumerNode=${diff.originalShard.consumerNode?.id}, nextConsumerNode=${diff.originalShard.nextConsumerNode?.id})"
+            val updatedShard =
+                "Shard #${diff.updatedShard.shard}(producerNode=${diff.updatedShard.producerNode.id}, consumerNode=${diff.updatedShard.consumerNode?.id}, nextConsumerNode=${diff.updatedShard.nextConsumerNode?.id})"
+            println("\t$originalShard=>$updatedShard")
         }
     }
 

From 983ba82d71274684b921e714727e871b689849c7 Mon Sep 17 00:00:00 2001
From: Vasily Vasilkov
Date: Sat, 9 May 2026 12:26:54 +0400
Subject: [PATCH 2/3] Update libs

---
 gradle/libs.versions.toml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 111d40e..d3d4d8c 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -6,17 +6,17 @@ nexus-publish = { id = "io.github.gradle-nexus.publish-plugin", version.ref = "n
 
 [versions]
 
-kotlin = "2.3.20"
+kotlin = "2.3.21"
 nebula = "20.2.0"
 nexus = "2.0.0"
 
 prometheus = "1.6.1"
-opentelemetry = "1.60.1"
+opentelemetry = "1.61.0"
 opentelemetry-instrumentation = "2.27.0"
 opentelemetry-instrumentation-incubator = "2.27.0-alpha"
-opentelemetry-semconv = "1.40.0"
+opentelemetry-semconv = "1.41.0"
 hikaricp = "7.0.2"
-postgresql = "42.7.10"
+postgresql = "42.7.11"
 logback = "1.5.32"
 testcontainers = "2.0.5"
 

From 15bf2442a21806593b21cb3e1821be7c0e8b2465 Mon Sep 17 00:00:00 2001
From: Vasily Vasilkov
Date: Sat, 16 May 2026 21:59:28 +0400
Subject: [PATCH 3/3] Improve common metrics sending under high load

---
 .../prometheus/PrometheusQueueMetrics.kt | 96 ++++++++++++-------
 1 file changed, 62 insertions(+), 34 deletions(-)

diff --git a/src/main/kotlin/kolbasa/stats/prometheus/PrometheusQueueMetrics.kt b/src/main/kotlin/kolbasa/stats/prometheus/PrometheusQueueMetrics.kt
index 67a1d92..2d8e37b 100644
--- a/src/main/kotlin/kolbasa/stats/prometheus/PrometheusQueueMetrics.kt
+++ b/src/main/kotlin/kolbasa/stats/prometheus/PrometheusQueueMetrics.kt
@@ -1,7 +1,5 @@
 package kolbasa.stats.prometheus
 
-import kolbasa.inspector.MessageAge
-import kolbasa.inspector.Messages
 import kolbasa.inspector.connection.ConnectionAwareDatabaseInspector
 import kolbasa.producer.PartialInsert
 import kolbasa.queue.Queue
@@ -10,6 +8,8 @@ import kolbasa.stats.prometheus.metrics.*
 import java.sql.Connection
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.ConcurrentMap
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
 
 internal class PrometheusQueueMetrics(
     private val queue: Queue<*>,
@@ -78,7 +78,12 @@ internal class PrometheusQueueMetrics(
         }
     }
 
-    override fun consumerDeleteMetrics(nodeId: NodeId, removedMessages: Int, executionNanos: Long, connection: Connection?) {
+    override fun consumerDeleteMetrics(
+        nodeId: NodeId,
+        removedMessages: Int,
+        executionNanos: Long,
+        connection: Connection?
+    ) {
         val consumerQueueMetrics = consumer.computeIfAbsent(nodeId) { _ ->
             ConsumerQueueMetrics(queueName, nodeId, prometheusConfig)
         }
@@ -140,44 +145,67 @@ internal class PrometheusQueueMetrics(
             CommonQueueMetrics(queueName, nodeId, prometheusConfig)
         }
 
-        val cacheValue = getCachedValue(connection, queue, commonQueueMetrics.cacheKey)
-
-        commonQueueMetrics.queueMetrics(
-            messages = cacheValue.messages,
-            queueSizeBytes = cacheValue.queueSizeBytes,
-            messageAge = cacheValue.messageAge
-        )
+        // UpdateInfo throttles the refresh: the inspector queries below run at most once per
+        // refresh interval and never from two threads at the same time
+        lastUpdateCache
+            .computeIfAbsent(commonQueueMetrics.cacheKey) { UpdateInfo() }
+            .refreshIfOutdated {
+                commonQueueMetrics.queueMetrics(
+                    messages = inspector.count(connection, queue),
+                    queueSizeBytes = inspector.size(connection, queue),
+                    messageAge = inspector.messageAge(connection, queue)
+                )
+            }
     }
 
     private companion object {
 
-        private const val CACHE_LIFETIME_MILLIS = 60_000 // 1 minute
-
-        data class CacheValue(
-            val messages: Messages,
-            val queueSizeBytes: Long,
-            val messageAge: MessageAge,
-            val expiration: Long
-        )
+        private const val COMMON_METRICS_REFRESH_INTERVAL_MS = 60_000 // 1 minute
 
-        val inspector = ConnectionAwareDatabaseInspector()
-        val cache: ConcurrentMap<String, CacheValue> = ConcurrentHashMap()
+        /**
+         * Refresh state of the common metrics for a single cache key.
+         *
+         * A plain class rather than a data class: the generated equals/hashCode/copy would
+         * include the mutable timestamp and the lock, which makes no sense for this state.
+         */
+        class UpdateInfo {
 
-        fun getCachedValue(connection: Connection, queue: Queue<*>, cacheKey: String): CacheValue {
-            var value = cache[cacheKey]
-            if (value != null && System.currentTimeMillis() < value.expiration) {
-                return value
-            }
-
-            // Cache miss or expired, fetch new values
-            val messages = inspector.count(connection, queue)
-            val queueSizeBytes = inspector.size(connection, queue)
-            val messageAge = inspector.messageAge(connection, queue)
-
-            value = CacheValue(messages, queueSizeBytes, messageAge, System.currentTimeMillis() + CACHE_LIFETIME_MILLIS)
-            cache[cacheKey] = value
-            return value
+            @Volatile
+            private var lastUpdated: Long = 0
+            private val lock: Lock = ReentrantLock()
+
+            /**
+             * Runs [refresh] if the refresh interval has elapsed and no other thread is refreshing
+             * right now; otherwise returns immediately without blocking.
+             */
+            fun refreshIfOutdated(refresh: () -> Unit) {
+                // Lock-free fast path, then tryLock() so that concurrent callers skip instead of waiting
+                if (!isOutdated() || !lock.tryLock()) {
+                    return
+                }
+                try {
+                    // Re-check under the lock: another thread may have completed a refresh between
+                    // the check above and tryLock(), and the queries must not run twice in a row
+                    if (isOutdated()) {
+                        try {
+                            refresh()
+                        } finally {
+                            // Stamp failed attempts too, so a failing database is not queried on every call
+                            lastUpdated = System.currentTimeMillis()
+                        }
+                    }
+                } finally {
+                    // Released only after the timestamp is published, so the next lock owner sees it
+                    lock.unlock()
+                }
+            }
+
+            private fun isOutdated(): Boolean =
+                lastUpdated + COMMON_METRICS_REFRESH_INTERVAL_MS < System.currentTimeMillis()
         }
+
+        val inspector = ConnectionAwareDatabaseInspector()
+        val lastUpdateCache: ConcurrentMap<String, UpdateInfo> = ConcurrentHashMap()
     }
 
 }