Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions micrometer-observation/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
dependencies {
api project(":processor")

api "org.apache.kafka:kafka-clients:$kafkaVersion"
api "io.micrometer:micrometer-core:$micrometerVersion"

itImplementation project(":testing")
itImplementation "io.micrometer:micrometer-observation-test:$micrometerVersion"
itImplementation "io.zipkin.brave:brave-instrumentation-kafka-clients:5.18.1"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import static org.junit.jupiter.api.Assertions.assertEquals;

import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ThreadLocalRandom;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.api.extension.RegisterExtension;

import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier;
import com.linecorp.decaton.processor.TaskMetadata;
import com.linecorp.decaton.testing.KafkaClusterExtension;
import com.linecorp.decaton.testing.TestUtils;
import com.linecorp.decaton.testing.processor.ProcessedRecord;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee;
import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType;
import com.linecorp.decaton.testing.processor.ProcessorTestSuite;
import com.linecorp.decaton.testing.processor.ProducedRecord;

import brave.Tracing;
import brave.kafka.clients.KafkaTracing;
import brave.propagation.B3SingleFormat;
import brave.propagation.TraceContextOrSamplingFlags;
import io.micrometer.observation.tck.ObservationContextAssert;
import io.micrometer.observation.tck.TestObservationRegistry;

public class MicrometerObservationTest {
private final TestObservationRegistry registry = TestObservationRegistry.create();
private final String retryTopic = rule.admin().createRandomTopic(3, 3);

/**
* As an example, run tests using the Brave implementation.
* From Decaton’s perspective, as long as Micrometer Observation is used correctly,
* it’s OK—so writing tests with either Brave or OpenTelemetry is sufficient.
*/
private final Tracing tracing = Tracing.newBuilder().build();
private final KafkaTracing kafkaTracing = KafkaTracing.create(tracing);

@RegisterExtension
public static KafkaClusterExtension rule = new KafkaClusterExtension();

@AfterEach
public void tearDown() {
tracing.close();
rule.admin().deleteTopics(true, retryTopic);
}

@Test
@Timeout(30)
public void test() throws Exception {
// scenario:
// * half of arrived tasks are retried once
// * after retried (i.e. retryCount() > 0), no more retry
final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier();
ProcessorTestSuite
.builder(rule)
.configureProcessorsBuilder(
builder -> builder.thenProcess((ctx, task) -> {
if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) {
ctx.retry();
}
}))
.producerSupplier(
bootstrapServers -> kafkaTracing.producer(TestUtils.producer(bootstrapServers)))
.retryConfig(RetryConfig.builder()
.retryTopic(retryTopic)
.backoff(Duration.ofMillis(10))
.producerSupplier(config -> kafkaTracing.producer(
producerSupplier.getProducer(config)))
.build())
.tracingProvider(new MicrometerObservationProvider(registry))
// If we retry tasks, there's no guarantee about ordering nor serial processing
.excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING)
.customSemantics(new MyPropagationGuarantee(registry, tracing))
.build()
.run();
}

private static class MyPropagationGuarantee implements ProcessingGuarantee {
private final TestObservationRegistry registry;
private final Tracing tracing;
private final Map<String, String> producedTraceIds = new ConcurrentHashMap<>();
private final Map<String, String> consumedTraceIds = new ConcurrentHashMap<>();
private final ConcurrentLinkedDeque<DecatonProcessorReceiverContext> consumedContexts =
new ConcurrentLinkedDeque<>();

MyPropagationGuarantee(TestObservationRegistry registry, Tracing tracing) {
this.registry = registry;
this.tracing = tracing;
}

@Override
public void onProduce(ProducedRecord record) {
producedTraceIds.put(record.task().getId(), tracing.currentTraceContext().get().traceIdString());
}

@Override
public void onProcess(TaskMetadata metadata, ProcessedRecord record) {
// It’s sufficient to verify that tracing headers (B3 headers in the case of Brave) can be accessed
// via ReceiverContext’s Propagator.Getter.
// The actual tracing implementation is handled by Micrometer Observation.
final DecatonProcessorReceiverContext context =
(DecatonProcessorReceiverContext) registry.getCurrentObservation().getContext();
final String b3 = context.getGetter().get(context.getCarrier(), "b3");
final TraceContextOrSamplingFlags traceContext = B3SingleFormat.parseB3SingleFormat(b3);
consumedTraceIds.put(record.task().getId(), traceContext.context().traceIdString());

consumedContexts.add(context);
}

@Override
public void doAssert() {
assertEquals(producedTraceIds, consumedTraceIds);

// Verify that the ObservationContext is injected into the processor thread as intended.
consumedContexts.stream().forEach(context -> {
ObservationContextAssert
.assertThat(context)
.hasNameEqualTo("decaton.processor")
.hasContextualNameEqualTo(
String.format("decaton processor (%s/%s)", context.getSubscriptionId(),
context.getProcessorName()))
.hasLowCardinalityKeyValue("subscriptionId", context.getSubscriptionId())
.hasLowCardinalityKeyValue("processor", context.getProcessorName())
.hasLowCardinalityKeyValue("topic", context.getCarrier().topic())
.hasLowCardinalityKeyValue("partition",
String.valueOf(context.getCarrier().partition()))
.hasHighCardinalityKeyValue("offset",
String.valueOf(context.getCarrier().offset()))
;
});
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import io.micrometer.observation.Observation.Context;
import io.micrometer.observation.ObservationConvention;

public interface DecatonProcessorObservationConvention
extends ObservationConvention<DecatonProcessorReceiverContext> {
@Override
default String getName() {
return "decaton.processor";
}

@Override
default boolean supportsContext(Context context) {
return context instanceof DecatonProcessorReceiverContext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import io.micrometer.common.docs.KeyName;
import io.micrometer.observation.Observation.Context;
import io.micrometer.observation.ObservationConvention;
import io.micrometer.observation.docs.ObservationDocumentation;

public enum DecatonProcessorObservationDocumentation implements ObservationDocumentation {
INSTANCE {
@Override
public Class<? extends ObservationConvention<? extends Context>> getDefaultConvention() {
return DefaultDecatonProcessorObservationConvention.class;
}

@Override
public KeyName[] getLowCardinalityKeyNames() {
return LowCardinalityKeyNames.values();
}

@Override
public KeyName[] getHighCardinalityKeyNames() {
return HighCardinalityKeyNames.values();
}
};

public enum LowCardinalityKeyNames implements KeyName {
SUBSCRIPTION_ID {
@Override
public String asString() {
return "subscriptionId";
}
},
PROCESSOR {
@Override
public String asString() {
return "processor";
}
},
TOPIC {
@Override
public String asString() {
return "topic";
}
},
PARTITION {
@Override
public String asString() {
return "partition";
}
},
}

public enum HighCardinalityKeyNames implements KeyName {
OFFSET {
@Override
public String asString() {
return "offset";
}
},
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import java.nio.charset.StandardCharsets;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

import io.micrometer.observation.transport.Propagator;
import io.micrometer.observation.transport.ReceiverContext;
import lombok.Getter;
import lombok.Setter;

@Getter
public class DecatonProcessorReceiverContext extends ReceiverContext<ConsumerRecord<?, ?>> {
private final String subscriptionId;

/**
* If the processor has not started processing yet, it returns null.
*/
@Setter
private String processorName;

public DecatonProcessorReceiverContext(ConsumerRecord<?, ?> record, String subscriptionId) {
super(GETTER);
this.subscriptionId = subscriptionId;
setCarrier(record);
}

private static final Propagator.Getter<ConsumerRecord<?, ?>> GETTER =
new Propagator.Getter<ConsumerRecord<?, ?>>() {
@Override
public String get(ConsumerRecord<?, ?> carrier, String key) {
return lastStringHeader(carrier.headers(), key);
}

private String lastStringHeader(Headers headers, String key) {
final Header header = headers.lastHeader(key);
if (header == null || header.value() == null) {
return null;
}
return new String(header.value(), StandardCharsets.UTF_8);
}
};
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2026 LY Corporation
*
* LY Corporation licenses this file to you under the Apache License,
* version 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at:
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/

package com.linecorp.decaton.processor.runtime;

import java.util.Optional;

import com.linecorp.decaton.processor.runtime.DecatonProcessorObservationDocumentation.HighCardinalityKeyNames;
import com.linecorp.decaton.processor.runtime.DecatonProcessorObservationDocumentation.LowCardinalityKeyNames;

import io.micrometer.common.KeyValues;

class DefaultDecatonProcessorObservationConvention implements DecatonProcessorObservationConvention {
@Override
public KeyValues getLowCardinalityKeyValues(DecatonProcessorReceiverContext context) {
return KeyValues.of(
LowCardinalityKeyNames.SUBSCRIPTION_ID.withValue(context.getSubscriptionId()),
LowCardinalityKeyNames.PROCESSOR.withValue(
Optional.ofNullable(context.getProcessorName()).orElse("-")),
LowCardinalityKeyNames.TOPIC.withValue(context.getCarrier().topic()),
LowCardinalityKeyNames.PARTITION.withValue(String.valueOf(context.getCarrier().partition()))
);
}

@Override
public KeyValues getHighCardinalityKeyValues(DecatonProcessorReceiverContext context) {
return KeyValues.of(
HighCardinalityKeyNames.OFFSET.withValue(String.valueOf(context.getCarrier().offset()))
);
}

@Override
public String getContextualName(DecatonProcessorReceiverContext context) {
return String.format(
"decaton processor (%s/%s)",
context.getSubscriptionId(),
context.getProcessorName());
}
}
Loading