diff --git a/base-cluster/src/main/java/org/apache/bifromq/basecluster/AgentHost.java b/base-cluster/src/main/java/org/apache/bifromq/basecluster/AgentHost.java index d74230271..61e8f7209 100644 --- a/base-cluster/src/main/java/org/apache/bifromq/basecluster/AgentHost.java +++ b/base-cluster/src/main/java/org/apache/bifromq/basecluster/AgentHost.java @@ -96,13 +96,14 @@ final class AgentHost implements IAgentHost { this.messenger = Messenger.builder() .transport(transport) .opts(messengerOptions) + .env(options.env()) .scheduler(hostScheduler) .build(); hostAddressResolver = resolver; this.store.start(messenger.receive() .filter(m -> m.value().message.hasCrdtStoreMessage()) .map(m -> m.value().message.getCrdtStoreMessage())); - tags = new String[] {"local", options.addr() + ":" + messenger.bindAddress().getPort()}; + tags = new String[] {"local", options.addr() + ":" + messenger.bindAddress().getPort(), "clusterEnv", options.env()}; this.memberList = new HostMemberList(options.addr(), messenger.bindAddress().getPort(), messenger, hostScheduler, store, hostAddressResolver, tags); IFailureDetector failureDetector = FailureDetector.builder() diff --git a/base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/Messenger.java b/base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/Messenger.java index 7af1d6ecd..22e26caf5 100644 --- a/base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/Messenger.java +++ b/base-cluster/src/main/java/org/apache/bifromq/basecluster/messenger/Messenger.java @@ -61,8 +61,9 @@ public class Messenger implements IMessenger { private final MessengerOptions opts; private final MetricManager metricManager; private State state = State.INIT; + @Builder - private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions opts) { + private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions opts, String env) { this.transport = new MessengerTransport(transport); this.opts = opts.toBuilder().build(); this.scheduler = scheduler; @@ -71,7 +72,7 @@ private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions op opts.retransmitMultiplier(), opts.spreadPeriod(), this.scheduler); - this.metricManager = new MetricManager(localAddress); + this.metricManager = new MetricManager(localAddress, env); } @Override @@ -250,11 +251,12 @@ private static class MetricManager { final Map gossipHeardCounters = Maps.newHashMap(); final Counter gossipSpreadCounter = Metrics.counter("cluster.gossip.count"); - MetricManager(InetSocketAddress localAddress) { + MetricManager(InetSocketAddress localAddress, String env) { for (ClusterMessage.ClusterMessageTypeCase typeCase : ClusterMessage.ClusterMessageTypeCase.values()) { if (typeCase != ClusterMessage.ClusterMessageTypeCase.CLUSTERMESSAGETYPE_NOT_SET) { Tags tags = Tags .of("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort()) + .and("clusterEnv", env) .and("type", typeCase.name()); msgSendCounters.put(typeCase, Metrics.counter("basecluster.send.count", tags)); diff --git a/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/TCPTransport.java b/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/TCPTransport.java index b7f6dedc2..917de18c8 100644 --- a/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/TCPTransport.java +++ b/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/TCPTransport.java @@ -100,6 +100,7 @@ public final class TCPTransport extends AbstractTransport { tcpListeningChannel = setupTcpServer(bindAddr, serverSslContext); InetSocketAddress localAddress = (InetSocketAddress) tcpListeningChannel.channel().localAddress(); Tags tags = Tags.of("proto", "tcp") + .and("clusterEnv", env) .and("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort()); sendBytes = Counter.builder("basecluster.send.bytes") .tags(tags) diff --git a/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/UDPTransport.java b/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/UDPTransport.java index c133f1b76..e13153607 100644 --- a/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/UDPTransport.java +++ b/base-cluster/src/main/java/org/apache/bifromq/basecluster/transport/UDPTransport.java @@ -86,6 +86,7 @@ protected void initChannel(DatagramChannel channel) { .channel(); localAddress = (InetSocketAddress) channel.localAddress(); Tags tags = Tags.of("proto", "udp") + .and("clusterEnv", env) .and("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort()); sendBytes = Counter.builder("basecluster.send.bytes") diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerFuncTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerFuncTest.java index 649b7972d..1a3f3a310 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerFuncTest.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerFuncTest.java @@ -88,6 +88,7 @@ public void init() { .transport(localTransport) .scheduler(scheduler) .opts(opts) + .env("test") .build(); local = localMessenger.bindAddress(); @@ -95,6 +96,7 @@ public void init() { .transport(remoteTransport) .scheduler(scheduler) .opts(opts) + .env("test") .build(); remote = remoteMessenger.bindAddress(); localRecipientSelector = new IRecipientSelector() { diff --git a/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerTest.java b/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerTest.java index ec0bd4110..ac5eef556 100644 --- a/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerTest.java +++ b/base-cluster/src/test/java/org/apache/bifromq/basecluster/messenger/MessengerTest.java @@ -40,6 +40,7 @@ public void shutdown() { .transport(transport) .scheduler(Schedulers.io()) .opts(new MessengerOptions()) + .env("test") .build(); localMessenger.start(new IRecipientSelector() { @Override @@ -67,6 +68,7 @@ public void shutdownWithoutStart() { .transport(transport) .scheduler(Schedulers.io()) .opts(new MessengerOptions()) + .env("test") .build(); try { localMessenger.shutdown(); diff --git a/build/build-bifromq-starter/conf/standalone.yml b/build/build-bifromq-starter/conf/standalone.yml index 7e2f4c591..bd0ad6817 100644 --- a/build/build-bifromq-starter/conf/standalone.yml +++ b/build/build-bifromq-starter/conf/standalone.yml @@ -32,6 +32,10 @@ # or experiment with the built-in setting provider which is a simple webhook based implementation. # settingProviderFQN: "org.apache.bifromq.demo.plugin.DemoSettingProvider" +# Process-level metrics tags applied to all metrics in this process +metricsTags: + env: Test + mqttServiceConfig: server: tcpListener: diff --git a/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/StandaloneStarter.java b/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/StandaloneStarter.java index d1ba3501b..a831c81b3 100644 --- a/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/StandaloneStarter.java +++ b/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/StandaloneStarter.java @@ -159,9 +159,10 @@ public static void main(String[] args) { StandaloneConfigConsolidator.consolidate(config); printConfigs(config); - if (!Strings.isNullOrEmpty(config.getClusterConfig().getEnv())) { - Metrics.globalRegistry.config().commonTags("env", config.getClusterConfig().getEnv()); + if (config.getMetricsTags() != null) { + config.getMetricsTags().forEach(Metrics.globalRegistry.config()::commonTags); } + Injector serviceInjector = Guice.createInjector( new ConfigModule(config), new RPCClientSSLContextModule(), diff --git a/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/config/StandaloneConfig.java b/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/config/StandaloneConfig.java index 66851d124..749d04abd 100644 --- a/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/config/StandaloneConfig.java +++ b/build/build-bifromq-starter/src/main/java/org/apache/bifromq/starter/config/StandaloneConfig.java @@ -33,6 +33,8 @@ import org.apache.bifromq.starter.config.model.mqtt.MQTTServiceConfig; import org.apache.bifromq.starter.config.model.retain.RetainServiceConfig; +import java.util.Map; + @Getter @Setter public class StandaloneConfig { @@ -66,4 +68,7 @@ public class StandaloneConfig { @JsonSetter(nulls = Nulls.SKIP) private APIServerConfig apiServerConfig = new APIServerConfig(); + @JsonSetter(nulls = Nulls.SKIP) + private Map metricsTags; + }