diff --git a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricThreadPoolExhaustedListener.java b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricThreadPoolExhaustedListener.java index f7278e172b1..1bc8c5fa729 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricThreadPoolExhaustedListener.java +++ b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricThreadPoolExhaustedListener.java @@ -29,6 +29,7 @@ public class MetricThreadPoolExhaustedListener implements ThreadPoolExhaustedLis public MetricThreadPoolExhaustedListener(String threadPoolExecutorName, DefaultMetricsCollector collector) { this.threadPoolExecutorName = threadPoolExecutorName; this.threadRejectMetricsCountSampler = new ThreadRejectMetricsCountSampler(collector); + this.threadRejectMetricsCountSampler.addMetricName(threadPoolExecutorName); } public MetricThreadPoolExhaustedListener(String threadPoolExecutorName, ThreadRejectMetricsCountSampler sampler) { diff --git a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsNameCountSampler.java b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsNameCountSampler.java index 5a4457ec539..dd556dec0f4 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsNameCountSampler.java +++ b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/MetricsNameCountSampler.java @@ -49,9 +49,18 @@ public MetricsNameCountSampler( this.collector.addSampler(this); } + @Override + public void inc(S source, K metricName) { + metricNames.add(metricName); + if (incrementAndGetCreated(source, metricName)) { + samplesChanged.set(true); + } + } + public void addMetricName(K name) { - this.metricNames.add(name); - this.samplesChanged.set(true); + if (metricNames.add(name)) { + samplesChanged.set(true); + } } @Override diff --git a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java index 487cad0ba3f..95742d060da 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java +++ b/dubbo-metrics/dubbo-metrics-default/src/main/java/org/apache/dubbo/metrics/collector/sample/SimpleMetricsCountSampler.java @@ -37,7 +37,7 @@ public abstract class SimpleMetricsCountSampler implemen @Override public void inc(S source, K metricName) { - getAtomicCounter(source, metricName).incrementAndGet(); + incrementAndGetCreated(source, metricName); } @Override @@ -47,7 +47,7 @@ public Optional> getCount(K metricName) { protected abstract void countConfigure(MetricsCountSampleConfigurer sampleConfigure); - private AtomicLong getAtomicCounter(S source, K metricsName) { + protected boolean incrementAndGetCreated(S source, K metricsName) { MetricsCountSampleConfigurer sampleConfigure = new MetricsCountSampleConfigurer<>(); sampleConfigure.setSource(source); sampleConfigure.setMetricsName(metricsName); @@ -63,12 +63,10 @@ private AtomicLong getAtomicCounter(S source, K metricsName) { Assert.notNull(sampleConfigure.getMetric(), "metrics is null"); - AtomicLong atomicCounter = metricAtomic.get(sampleConfigure.getMetric()); - - if (atomicCounter == null) { - atomicCounter = ConcurrentHashMapUtils.computeIfAbsent( - metricAtomic, sampleConfigure.getMetric(), k -> new AtomicLong()); - } - return atomicCounter; + AtomicLong newCounter = new AtomicLong(); + AtomicLong atomicCounter = + ConcurrentHashMapUtils.computeIfAbsent(metricAtomic, sampleConfigure.getMetric(), k -> newCounter); + atomicCounter.incrementAndGet(); + return atomicCounter == newCounter; } } diff --git a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/model/sample/ErrorCodeSampleTest.java b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/model/sample/ErrorCodeSampleTest.java index 833dee04981..8bf4c4de89b 100644 --- a/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/model/sample/ErrorCodeSampleTest.java +++ b/dubbo-metrics/dubbo-metrics-default/src/test/java/org/apache/dubbo/metrics/metrics/model/sample/ErrorCodeSampleTest.java @@ -31,6 +31,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class ErrorCodeSampleTest { @@ -70,6 +71,39 @@ void testErrorCodeMetric() { ((AtomicLong) ((CounterMetricSample) metricSample).getValue()).get() == 2L, "Sample count error.")); } + @Test + void testErrorCodeMetricChangesAfterFirstLateEvent() { + FrameworkModel frameworkModel = FrameworkModel.defaultModel(); + ApplicationModel applicationModel = frameworkModel.newApplication(); + + ApplicationConfig applicationConfig = new ApplicationConfig(); + applicationConfig.setName("MyApplication1"); + + applicationModel.getApplicationConfigManager().setApplication(applicationConfig); + + DefaultMetricsCollector defaultMetricsCollector = new DefaultMetricsCollector(applicationModel); + defaultMetricsCollector.setCollectEnabled(true); + + ErrorCodeSampler sampler = + (ErrorCodeSampler) ReflectionUtils.getField(defaultMetricsCollector, "errorCodeSampler"); + + ErrorCodeMetricsListenRegister register = + (ErrorCodeMetricsListenRegister) ReflectionUtils.getField(sampler, "register"); + + Assertions.assertTrue(sampler.calSamplesChanged()); + Assertions.assertFalse(sampler.calSamplesChanged()); + + register.onMessage("0-1", null); + Assertions.assertTrue(sampler.calSamplesChanged()); + Assertions.assertFalse(sampler.calSamplesChanged()); + + register.onMessage("0-1", null); + Assertions.assertFalse(sampler.calSamplesChanged()); + + register.onMessage("0-2", null); + Assertions.assertTrue(sampler.calSamplesChanged()); + } + @AfterEach public void tearDown() { FrameworkModel.defaultModel().destroy(); diff --git a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java index 2727b349807..25925c25e80 100644 --- a/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java +++ b/dubbo-metrics/dubbo-metrics-prometheus/src/test/java/org/apache/dubbo/metrics/prometheus/PrometheusMetricsThreadPoolTest.java @@ -20,6 +20,7 @@ import org.apache.dubbo.config.MetricsConfig; import org.apache.dubbo.config.nested.PrometheusConfig; import org.apache.dubbo.metrics.collector.DefaultMetricsCollector; +import org.apache.dubbo.metrics.collector.sample.MetricThreadPoolExhaustedListener; import org.apache.dubbo.metrics.collector.sample.ThreadRejectMetricsCountSampler; import org.apache.dubbo.metrics.model.sample.GaugeMetricSample; import org.apache.dubbo.metrics.model.sample.MetricSample; @@ -147,7 +148,6 @@ void testThreadPoolRejectMetrics() { ThreadRejectMetricsCountSampler threadRejectMetricsCountSampler = new ThreadRejectMetricsCountSampler(collector); threadRejectMetricsCountSampler.inc(threadPoolExecutorName, threadPoolExecutorName); - threadRejectMetricsCountSampler.addMetricName(threadPoolExecutorName); List samples = collector.collect(); for (MetricSample sample : samples) { Assertions.assertTrue(sample instanceof GaugeMetricSample); @@ -160,4 +160,31 @@ void testThreadPoolRejectMetrics() { Assertions.assertEquals(gaugeSample.applyAsLong(), 1); } } + + @Test + void testThreadPoolRejectMetricsExportedAfterLateFirstEvent() { + metricsCollector.setCollectEnabled(true); + metricsCollector.setApplicationName(applicationModel.getApplicationName()); + String threadPoolExecutorName = "DubboServerHandler-20816"; + + ThreadRejectMetricsCountSampler sampler = new ThreadRejectMetricsCountSampler(metricsCollector); + MetricThreadPoolExhaustedListener listener = + new MetricThreadPoolExhaustedListener(threadPoolExecutorName, sampler); + + PrometheusMetricsReporter reporter = new PrometheusMetricsReporter(metricsConfig.toUrl(), applicationModel); + reporter.init(); + try { + reporter.resetIfSamplesChanged(); + Assertions.assertFalse(reporter.getResponse().contains("dubbo_thread_pool_reject_thread_count")); + + listener.onEvent(null); + reporter.resetIfSamplesChanged(); + + String response = reporter.getResponse(); + Assertions.assertTrue(response.contains("dubbo_thread_pool_reject_thread_count")); + Assertions.assertTrue(response.contains("thread.name=\"" + threadPoolExecutorName + "\"")); + } finally { + reporter.destroy(); + } + } }