Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,13 +96,14 @@ final class AgentHost implements IAgentHost {
this.messenger = Messenger.builder()
.transport(transport)
.opts(messengerOptions)
.env(options.env())
.scheduler(hostScheduler)
.build();
hostAddressResolver = resolver;
this.store.start(messenger.receive()
.filter(m -> m.value().message.hasCrdtStoreMessage())
.map(m -> m.value().message.getCrdtStoreMessage()));
tags = new String[] {"local", options.addr() + ":" + messenger.bindAddress().getPort()};
tags = new String[] {"local", options.addr() + ":" + messenger.bindAddress().getPort(), "clusterEnv", options.env()};
this.memberList = new HostMemberList(options.addr(), messenger.bindAddress().getPort(),
messenger, hostScheduler, store, hostAddressResolver, tags);
IFailureDetector failureDetector = FailureDetector.builder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,9 @@ public class Messenger implements IMessenger {
private final MessengerOptions opts;
private final MetricManager metricManager;
private State state = State.INIT;

@Builder
private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions opts) {
private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions opts, String env) {
this.transport = new MessengerTransport(transport);
this.opts = opts.toBuilder().build();
this.scheduler = scheduler;
Expand All @@ -71,7 +72,7 @@ private Messenger(ITransport transport, Scheduler scheduler, MessengerOptions op
opts.retransmitMultiplier(),
opts.spreadPeriod(),
this.scheduler);
this.metricManager = new MetricManager(localAddress);
this.metricManager = new MetricManager(localAddress, env);
}

@Override
Expand Down Expand Up @@ -250,11 +251,12 @@ private static class MetricManager {
final Map<ClusterMessage.ClusterMessageTypeCase, Counter> gossipHeardCounters = Maps.newHashMap();
final Counter gossipSpreadCounter = Metrics.counter("cluster.gossip.count");

MetricManager(InetSocketAddress localAddress) {
MetricManager(InetSocketAddress localAddress, String env) {
for (ClusterMessage.ClusterMessageTypeCase typeCase : ClusterMessage.ClusterMessageTypeCase.values()) {
if (typeCase != ClusterMessage.ClusterMessageTypeCase.CLUSTERMESSAGETYPE_NOT_SET) {
Tags tags = Tags
.of("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort())
.and("clusterEnv", env)
.and("type", typeCase.name());
msgSendCounters.put(typeCase,
Metrics.counter("basecluster.send.count", tags));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public final class TCPTransport extends AbstractTransport {
tcpListeningChannel = setupTcpServer(bindAddr, serverSslContext);
InetSocketAddress localAddress = (InetSocketAddress) tcpListeningChannel.channel().localAddress();
Tags tags = Tags.of("proto", "tcp")
.and("clusterEnv", env)
.and("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort());
sendBytes = Counter.builder("basecluster.send.bytes")
.tags(tags)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ protected void initChannel(DatagramChannel channel) {
.channel();
localAddress = (InetSocketAddress) channel.localAddress();
Tags tags = Tags.of("proto", "udp")
.and("clusterEnv", env)
.and("local", localAddress.getAddress().getHostAddress() + ":" + localAddress.getPort());

sendBytes = Counter.builder("basecluster.send.bytes")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,15 @@ public void init() {
.transport(localTransport)
.scheduler(scheduler)
.opts(opts)
.env("test")
.build();
local = localMessenger.bindAddress();

remoteMessenger = Messenger.builder()
.transport(remoteTransport)
.scheduler(scheduler)
.opts(opts)
.env("test")
.build();
remote = remoteMessenger.bindAddress();
localRecipientSelector = new IRecipientSelector() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public void shutdown() {
.transport(transport)
.scheduler(Schedulers.io())
.opts(new MessengerOptions())
.env("test")
.build();
localMessenger.start(new IRecipientSelector() {
@Override
Expand Down Expand Up @@ -67,6 +68,7 @@ public void shutdownWithoutStart() {
.transport(transport)
.scheduler(Schedulers.io())
.opts(new MessengerOptions())
.env("test")
.build();
try {
localMessenger.shutdown();
Expand Down
4 changes: 4 additions & 0 deletions build/build-bifromq-starter/conf/standalone.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
# or experiment with the built-in setting provider which is a simple webhook based implementation.
# settingProviderFQN: "org.apache.bifromq.demo.plugin.DemoSettingProvider"

# Process-level metrics tags applied to all metrics in this process
metricsTags:
env: Test

mqttServiceConfig:
server:
tcpListener:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,9 +159,10 @@ public static void main(String[] args) {
StandaloneConfigConsolidator.consolidate(config);
printConfigs(config);

if (!Strings.isNullOrEmpty(config.getClusterConfig().getEnv())) {
Metrics.globalRegistry.config().commonTags("env", config.getClusterConfig().getEnv());
if (config.getMetricsTags() != null) {
config.getMetricsTags().forEach(Metrics.globalRegistry.config()::commonTags);
}

Injector serviceInjector = Guice.createInjector(
new ConfigModule(config),
new RPCClientSSLContextModule(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@
import org.apache.bifromq.starter.config.model.mqtt.MQTTServiceConfig;
import org.apache.bifromq.starter.config.model.retain.RetainServiceConfig;

import java.util.Map;

@Getter
@Setter
public class StandaloneConfig {
Expand Down Expand Up @@ -66,4 +68,7 @@ public class StandaloneConfig {
@JsonSetter(nulls = Nulls.SKIP)
private APIServerConfig apiServerConfig = new APIServerConfig();

@JsonSetter(nulls = Nulls.SKIP)
private Map<String, String> metricsTags;

}
Loading