Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ public class MetricThreadPoolExhaustedListener implements ThreadPoolExhaustedLis
public MetricThreadPoolExhaustedListener(String threadPoolExecutorName, DefaultMetricsCollector collector) {
this.threadPoolExecutorName = threadPoolExecutorName;
this.threadRejectMetricsCountSampler = new ThreadRejectMetricsCountSampler(collector);
this.threadRejectMetricsCountSampler.addMetricName(threadPoolExecutorName);
}

public MetricThreadPoolExhaustedListener(String threadPoolExecutorName, ThreadRejectMetricsCountSampler sampler) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,18 @@ public MetricsNameCountSampler(
this.collector.addSampler(this);
}

@Override
public void inc(S source, K metricName) {
metricNames.add(metricName);
if (incrementAndGetCreated(source, metricName)) {
samplesChanged.set(true);
}
}

public void addMetricName(K name) {
this.metricNames.add(name);
this.samplesChanged.set(true);
if (metricNames.add(name)) {
samplesChanged.set(true);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public abstract class SimpleMetricsCountSampler<S, K, M extends Metric> implemen

@Override
public void inc(S source, K metricName) {
getAtomicCounter(source, metricName).incrementAndGet();
incrementAndGetCreated(source, metricName);
}

@Override
Expand All @@ -47,7 +47,7 @@ public Optional<ConcurrentHashMap<M, AtomicLong>> getCount(K metricName) {

protected abstract void countConfigure(MetricsCountSampleConfigurer<S, K, M> sampleConfigure);

private AtomicLong getAtomicCounter(S source, K metricsName) {
protected boolean incrementAndGetCreated(S source, K metricsName) {
MetricsCountSampleConfigurer<S, K, M> sampleConfigure = new MetricsCountSampleConfigurer<>();
sampleConfigure.setSource(source);
sampleConfigure.setMetricsName(metricsName);
Expand All @@ -63,12 +63,10 @@ private AtomicLong getAtomicCounter(S source, K metricsName) {

Assert.notNull(sampleConfigure.getMetric(), "metrics is null");

AtomicLong atomicCounter = metricAtomic.get(sampleConfigure.getMetric());

if (atomicCounter == null) {
atomicCounter = ConcurrentHashMapUtils.computeIfAbsent(
metricAtomic, sampleConfigure.getMetric(), k -> new AtomicLong());
}
return atomicCounter;
AtomicLong newCounter = new AtomicLong();
AtomicLong atomicCounter =
ConcurrentHashMapUtils.computeIfAbsent(metricAtomic, sampleConfigure.getMetric(), k -> newCounter);
atomicCounter.incrementAndGet();
return atomicCounter == newCounter;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import java.util.concurrent.atomic.AtomicLong;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

public class ErrorCodeSampleTest {
Expand Down Expand Up @@ -70,6 +71,39 @@ void testErrorCodeMetric() {
((AtomicLong) ((CounterMetricSample<?>) metricSample).getValue()).get() == 2L, "Sample count error."));
}

@Test
void testErrorCodeMetricChangesAfterFirstLateEvent() {
FrameworkModel frameworkModel = FrameworkModel.defaultModel();
ApplicationModel applicationModel = frameworkModel.newApplication();

ApplicationConfig applicationConfig = new ApplicationConfig();
applicationConfig.setName("MyApplication1");

applicationModel.getApplicationConfigManager().setApplication(applicationConfig);

DefaultMetricsCollector defaultMetricsCollector = new DefaultMetricsCollector(applicationModel);
defaultMetricsCollector.setCollectEnabled(true);

ErrorCodeSampler sampler =
(ErrorCodeSampler) ReflectionUtils.getField(defaultMetricsCollector, "errorCodeSampler");

ErrorCodeMetricsListenRegister register =
(ErrorCodeMetricsListenRegister) ReflectionUtils.getField(sampler, "register");

Assertions.assertTrue(sampler.calSamplesChanged());
Assertions.assertFalse(sampler.calSamplesChanged());

register.onMessage("0-1", null);
Assertions.assertTrue(sampler.calSamplesChanged());
Assertions.assertFalse(sampler.calSamplesChanged());

register.onMessage("0-1", null);
Assertions.assertFalse(sampler.calSamplesChanged());

register.onMessage("0-2", null);
Assertions.assertTrue(sampler.calSamplesChanged());
}

@AfterEach
public void tearDown() {
FrameworkModel.defaultModel().destroy();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.dubbo.config.MetricsConfig;
import org.apache.dubbo.config.nested.PrometheusConfig;
import org.apache.dubbo.metrics.collector.DefaultMetricsCollector;
import org.apache.dubbo.metrics.collector.sample.MetricThreadPoolExhaustedListener;
import org.apache.dubbo.metrics.collector.sample.ThreadRejectMetricsCountSampler;
import org.apache.dubbo.metrics.model.sample.GaugeMetricSample;
import org.apache.dubbo.metrics.model.sample.MetricSample;
Expand Down Expand Up @@ -147,7 +148,6 @@ void testThreadPoolRejectMetrics() {
ThreadRejectMetricsCountSampler threadRejectMetricsCountSampler =
new ThreadRejectMetricsCountSampler(collector);
threadRejectMetricsCountSampler.inc(threadPoolExecutorName, threadPoolExecutorName);
threadRejectMetricsCountSampler.addMetricName(threadPoolExecutorName);
List<MetricSample> samples = collector.collect();
for (MetricSample sample : samples) {
Assertions.assertTrue(sample instanceof GaugeMetricSample);
Expand All @@ -160,4 +160,31 @@ void testThreadPoolRejectMetrics() {
Assertions.assertEquals(gaugeSample.applyAsLong(), 1);
}
}

@Test
void testThreadPoolRejectMetricsExportedAfterLateFirstEvent() {
metricsCollector.setCollectEnabled(true);
metricsCollector.setApplicationName(applicationModel.getApplicationName());
String threadPoolExecutorName = "DubboServerHandler-20816";

ThreadRejectMetricsCountSampler sampler = new ThreadRejectMetricsCountSampler(metricsCollector);
MetricThreadPoolExhaustedListener listener =
new MetricThreadPoolExhaustedListener(threadPoolExecutorName, sampler);

PrometheusMetricsReporter reporter = new PrometheusMetricsReporter(metricsConfig.toUrl(), applicationModel);
reporter.init();
try {
reporter.resetIfSamplesChanged();
Assertions.assertFalse(reporter.getResponse().contains("dubbo_thread_pool_reject_thread_count"));

listener.onEvent(null);
reporter.resetIfSamplesChanged();

String response = reporter.getResponse();
Assertions.assertTrue(response.contains("dubbo_thread_pool_reject_thread_count"));
Assertions.assertTrue(response.contains("thread.name=\"" + threadPoolExecutorName + "\""));
} finally {
reporter.destroy();
}
}
}