Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@ nexus-publish = { id = "io.github.gradle-nexus.publish-plugin", version.ref = "n

[versions]

kotlin = "2.3.20"
kotlin = "2.3.21"
nebula = "20.2.0"
nexus = "2.0.0"

prometheus = "1.6.1"
opentelemetry = "1.60.1"
opentelemetry = "1.61.0"
opentelemetry-instrumentation = "2.27.0"
opentelemetry-instrumentation-incubator = "2.27.0-alpha"
opentelemetry-semconv = "1.40.0"
opentelemetry-semconv = "1.41.0"
hikaricp = "7.0.2"
postgresql = "42.7.10"
postgresql = "42.7.11"

logback = "1.5.32"
testcontainers = "2.0.5"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ internal object ConsoleProgressCallback : ProgressCallback {
println("Target node: ${targetNode.id}")
println("Shards to move (${shardsDiff.size}):")
shardsDiff.forEach { diff ->
println("\t${diff.originalShard} => ${diff.updatedShard}")
// Shard(shard=6, producerNode=NodeId(id=db4), consumerNode=NodeId(id=db4), nextConsumerNode=null) => Shard(shard=6, producerNode=NodeId(id=db5), consumerNode=null, nextConsumerNode=NodeId(id=db5))
val originalShard =
"Shard #${diff.originalShard.shard}(producerNode=${diff.originalShard.producerNode.id}, consumerNode=${diff.originalShard.consumerNode?.id}, nextConsumerNode=${diff.originalShard.nextConsumerNode?.id})"
val updatedShard =
"Shard #${diff.updatedShard.shard}(producerNode=${diff.updatedShard.producerNode.id}, consumerNode=${diff.updatedShard.consumerNode?.id}, nextConsumerNode=${diff.updatedShard.nextConsumerNode?.id})"
println("\t$originalShard=>$updatedShard")
}
}

Expand Down
70 changes: 39 additions & 31 deletions src/main/kotlin/kolbasa/stats/prometheus/PrometheusQueueMetrics.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package kolbasa.stats.prometheus

import kolbasa.inspector.MessageAge
import kolbasa.inspector.Messages
import kolbasa.inspector.connection.ConnectionAwareDatabaseInspector
import kolbasa.producer.PartialInsert
import kolbasa.queue.Queue
Expand All @@ -10,6 +8,8 @@ import kolbasa.stats.prometheus.metrics.*
import java.sql.Connection
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentMap
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock

internal class PrometheusQueueMetrics(
private val queue: Queue<*>,
Expand Down Expand Up @@ -78,7 +78,12 @@ internal class PrometheusQueueMetrics(
}
}

override fun consumerDeleteMetrics(nodeId: NodeId, removedMessages: Int, executionNanos: Long, connection: Connection?) {
override fun consumerDeleteMetrics(
nodeId: NodeId,
removedMessages: Int,
executionNanos: Long,
connection: Connection?
) {
val consumerQueueMetrics = consumer.computeIfAbsent(nodeId) { _ ->
ConsumerQueueMetrics(queueName, nodeId, prometheusConfig)
}
Expand Down Expand Up @@ -140,44 +145,47 @@ internal class PrometheusQueueMetrics(
CommonQueueMetrics(queueName, nodeId, prometheusConfig)
}

val cacheValue = getCachedValue(connection, queue, commonQueueMetrics.cacheKey)

commonQueueMetrics.queueMetrics(
messages = cacheValue.messages,
queueSizeBytes = cacheValue.queueSizeBytes,
messageAge = cacheValue.messageAge
)
val lastUpdateInfo = lastUpdateCache.computeIfAbsent(commonQueueMetrics.cacheKey) { UpdateInfo() }
if (lastUpdateInfo.isOutdated()) {
// time to update the common metrics, only one thread should do it
if (lastUpdateInfo.tryLock()) {
try {
commonQueueMetrics.queueMetrics(
messages = inspector.count(connection, queue),
queueSizeBytes = inspector.size(connection, queue),
messageAge = inspector.messageAge(connection, queue)
)
} finally {
lastUpdateInfo.unlockAndUpdateTimestamp()
}
}
}
}

private companion object {

private const val CACHE_LIFETIME_MILLIS = 60_000 // 1 minute

data class CacheValue(
val messages: Messages,
val queueSizeBytes: Long,
val messageAge: MessageAge,
val expiration: Long
)
private const val COMMON_METRICS_REFRESH_INTERVAL_MS = 60_000 // 1 minute

val inspector = ConnectionAwareDatabaseInspector()
val cache: ConcurrentMap<String, CacheValue> = ConcurrentHashMap()
data class UpdateInfo(
@Volatile
private var lastUpdated: Long = 0,
private val lock: Lock = ReentrantLock()
) {

fun getCachedValue(connection: Connection, queue: Queue<*>, cacheKey: String): CacheValue {
var value = cache[cacheKey]
if (value != null && System.currentTimeMillis() < value.expiration) {
return value
fun isOutdated(): Boolean {
return (lastUpdated + COMMON_METRICS_REFRESH_INTERVAL_MS) < System.currentTimeMillis()
}

// Cache miss or expired, fetch new values
val messages = inspector.count(connection, queue)
val queueSizeBytes = inspector.size(connection, queue)
val messageAge = inspector.messageAge(connection, queue)
fun tryLock(): Boolean = lock.tryLock()

value = CacheValue(messages, queueSizeBytes, messageAge, System.currentTimeMillis() + CACHE_LIFETIME_MILLIS)
cache[cacheKey] = value
return value
fun unlockAndUpdateTimestamp() {
lock.unlock()
lastUpdated = System.currentTimeMillis()
}
}

val inspector = ConnectionAwareDatabaseInspector()
val lastUpdateCache: ConcurrentMap<String, UpdateInfo> = ConcurrentHashMap()
}
}