From 670579362e080549fff015820f4c6287690736f7 Mon Sep 17 00:00:00 2001 From: ryosuke-hasebe Date: Tue, 3 Feb 2026 22:10:48 +0900 Subject: [PATCH] Support Micrometer Observation --- micrometer-observation/build.gradle | 10 ++ .../runtime/MicrometerObservationTest.java | 156 ++++++++++++++++++ ...DecatonProcessorObservationConvention.java | 33 ++++ ...atonProcessorObservationDocumentation.java | 77 +++++++++ .../DecatonProcessorReceiverContext.java | 61 +++++++ ...DecatonProcessorObservationConvention.java | 52 ++++++ ...ometerObservationProcessorTraceHandle.java | 50 ++++++ .../MicrometerObservationProvider.java | 52 ++++++ ...icrometerObservationRecordTraceHandle.java | 45 +++++ settings.gradle | 1 + 10 files changed, 537 insertions(+) create mode 100644 micrometer-observation/build.gradle create mode 100644 micrometer-observation/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerObservationTest.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationConvention.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationDocumentation.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorReceiverContext.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DefaultDecatonProcessorObservationConvention.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProcessorTraceHandle.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProvider.java create mode 100644 micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationRecordTraceHandle.java diff --git a/micrometer-observation/build.gradle b/micrometer-observation/build.gradle new file mode 100644 index 00000000..8ae43508 --- /dev/null +++ b/micrometer-observation/build.gradle @@ -0,0 +1,10 @@ +dependencies { + api project(":processor") + + api "org.apache.kafka:kafka-clients:$kafkaVersion" + api "io.micrometer:micrometer-core:$micrometerVersion" + + itImplementation project(":testing") + itImplementation "io.micrometer:micrometer-observation-test:$micrometerVersion" + itImplementation "io.zipkin.brave:brave-instrumentation-kafka-clients:5.18.1" +} diff --git a/micrometer-observation/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerObservationTest.java b/micrometer-observation/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerObservationTest.java new file mode 100644 index 00000000..8f142249 --- /dev/null +++ b/micrometer-observation/src/it/java/com/linecorp/decaton/processor/runtime/MicrometerObservationTest.java @@ -0,0 +1,156 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ThreadLocalRandom; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.api.extension.RegisterExtension; + +import com.linecorp.decaton.client.DecatonClientBuilder.DefaultKafkaProducerSupplier; +import com.linecorp.decaton.processor.TaskMetadata; +import com.linecorp.decaton.testing.KafkaClusterExtension; +import com.linecorp.decaton.testing.TestUtils; +import com.linecorp.decaton.testing.processor.ProcessedRecord; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee; +import com.linecorp.decaton.testing.processor.ProcessingGuarantee.GuaranteeType; +import com.linecorp.decaton.testing.processor.ProcessorTestSuite; +import com.linecorp.decaton.testing.processor.ProducedRecord; + +import brave.Tracing; +import brave.kafka.clients.KafkaTracing; +import brave.propagation.B3SingleFormat; +import brave.propagation.TraceContextOrSamplingFlags; +import io.micrometer.observation.tck.ObservationContextAssert; +import io.micrometer.observation.tck.TestObservationRegistry; + +public class MicrometerObservationTest { + private final TestObservationRegistry registry = TestObservationRegistry.create(); + private final String retryTopic = rule.admin().createRandomTopic(3, 3); + + /** + * As an example, run tests using the Brave implementation. + * From Decaton’s perspective, as long as Micrometer Observation is used correctly, + * it’s OK—so writing tests with either Brave or OpenTelemetry is sufficient. + */ + private final Tracing tracing = Tracing.newBuilder().build(); + private final KafkaTracing kafkaTracing = KafkaTracing.create(tracing); + + @RegisterExtension + public static KafkaClusterExtension rule = new KafkaClusterExtension(); + + @AfterEach + public void tearDown() { + tracing.close(); + rule.admin().deleteTopics(true, retryTopic); + } + + @Test + @Timeout(30) + public void test() throws Exception { + // scenario: + // * half of arrived tasks are retried once + // * after retried (i.e. retryCount() > 0), no more retry + final DefaultKafkaProducerSupplier producerSupplier = new DefaultKafkaProducerSupplier(); + ProcessorTestSuite + .builder(rule) + .configureProcessorsBuilder( + builder -> builder.thenProcess((ctx, task) -> { + if (ctx.metadata().retryCount() == 0 && ThreadLocalRandom.current().nextBoolean()) { + ctx.retry(); + } + })) + .producerSupplier( + bootstrapServers -> kafkaTracing.producer(TestUtils.producer(bootstrapServers))) + .retryConfig(RetryConfig.builder() + .retryTopic(retryTopic) + .backoff(Duration.ofMillis(10)) + .producerSupplier(config -> kafkaTracing.producer( + producerSupplier.getProducer(config))) + .build()) + .tracingProvider(new MicrometerObservationProvider(registry)) + // If we retry tasks, there's no guarantee about ordering nor serial processing + .excludeSemantics(GuaranteeType.PROCESS_ORDERING, GuaranteeType.SERIAL_PROCESSING) + .customSemantics(new MyPropagationGuarantee(registry, tracing)) + .build() + .run(); + } + + private static class MyPropagationGuarantee implements ProcessingGuarantee { + private final TestObservationRegistry registry; + private final Tracing tracing; + private final Map producedTraceIds = new ConcurrentHashMap<>(); + private final Map consumedTraceIds = new ConcurrentHashMap<>(); + private final ConcurrentLinkedDeque consumedContexts = + new ConcurrentLinkedDeque<>(); + + MyPropagationGuarantee(TestObservationRegistry registry, Tracing tracing) { + this.registry = registry; + this.tracing = tracing; + } + + @Override + public void onProduce(ProducedRecord record) { + producedTraceIds.put(record.task().getId(), tracing.currentTraceContext().get().traceIdString()); + } + + @Override + public void onProcess(TaskMetadata metadata, ProcessedRecord record) { + // It’s sufficient to verify that tracing headers (B3 headers in the case of Brave) can be accessed + // via ReceiverContext’s Propagator.Getter. + // The actual tracing implementation is handled by Micrometer Observation. + final DecatonProcessorReceiverContext context = + (DecatonProcessorReceiverContext) registry.getCurrentObservation().getContext(); + final String b3 = context.getGetter().get(context.getCarrier(), "b3"); + final TraceContextOrSamplingFlags traceContext = B3SingleFormat.parseB3SingleFormat(b3); + consumedTraceIds.put(record.task().getId(), traceContext.context().traceIdString()); + + consumedContexts.add(context); + } + + @Override + public void doAssert() { + assertEquals(producedTraceIds, consumedTraceIds); + + // Verify that the ObservationContext is injected into the processor thread as intended. + consumedContexts.stream().forEach(context -> { + ObservationContextAssert + .assertThat(context) + .hasNameEqualTo("decaton.processor") + .hasContextualNameEqualTo( + String.format("decaton processor (%s/%s)", context.getSubscriptionId(), + context.getProcessorName())) + .hasLowCardinalityKeyValue("subscriptionId", context.getSubscriptionId()) + .hasLowCardinalityKeyValue("processor", context.getProcessorName()) + .hasLowCardinalityKeyValue("topic", context.getCarrier().topic()) + .hasLowCardinalityKeyValue("partition", + String.valueOf(context.getCarrier().partition())) + .hasHighCardinalityKeyValue("offset", + String.valueOf(context.getCarrier().offset())) + ; + }); + } + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationConvention.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationConvention.java new file mode 100644 index 00000000..2d1b7445 --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationConvention.java @@ -0,0 +1,33 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; + +public interface DecatonProcessorObservationConvention + extends ObservationConvention { + @Override + default String getName() { + return "decaton.processor"; + } + + @Override + default boolean supportsContext(Context context) { + return context instanceof DecatonProcessorReceiverContext; + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationDocumentation.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationDocumentation.java new file mode 100644 index 00000000..f974771e --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorObservationDocumentation.java @@ -0,0 +1,77 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import io.micrometer.common.docs.KeyName; +import io.micrometer.observation.Observation.Context; +import io.micrometer.observation.ObservationConvention; +import io.micrometer.observation.docs.ObservationDocumentation; + +public enum DecatonProcessorObservationDocumentation implements ObservationDocumentation { + INSTANCE { + @Override + public Class> getDefaultConvention() { + return DefaultDecatonProcessorObservationConvention.class; + } + + @Override + public KeyName[] getLowCardinalityKeyNames() { + return LowCardinalityKeyNames.values(); + } + + @Override + public KeyName[] getHighCardinalityKeyNames() { + return HighCardinalityKeyNames.values(); + } + }; + + public enum LowCardinalityKeyNames implements KeyName { + SUBSCRIPTION_ID { + @Override + public String asString() { + return "subscriptionId"; + } + }, + PROCESSOR { + @Override + public String asString() { + return "processor"; + } + }, + TOPIC { + @Override + public String asString() { + return "topic"; + } + }, + PARTITION { + @Override + public String asString() { + return "partition"; + } + }, + } + + public enum HighCardinalityKeyNames implements KeyName { + OFFSET { + @Override + public String asString() { + return "offset"; + } + }, + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorReceiverContext.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorReceiverContext.java new file mode 100644 index 00000000..a67ce171 --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DecatonProcessorReceiverContext.java @@ -0,0 +1,61 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import java.nio.charset.StandardCharsets; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; + +import io.micrometer.observation.transport.Propagator; +import io.micrometer.observation.transport.ReceiverContext; +import lombok.Getter; +import lombok.Setter; + +@Getter +public class DecatonProcessorReceiverContext extends ReceiverContext> { + private final String subscriptionId; + + /** + * If the processor has not started processing yet, it returns null. + */ + @Setter + private String processorName; + + public DecatonProcessorReceiverContext(ConsumerRecord record, String subscriptionId) { + super(GETTER); + this.subscriptionId = subscriptionId; + setCarrier(record); + } + + private static final Propagator.Getter> GETTER = + new Propagator.Getter>() { + @Override + public String get(ConsumerRecord carrier, String key) { + return lastStringHeader(carrier.headers(), key); + } + + private String lastStringHeader(Headers headers, String key) { + final Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return null; + } + return new String(header.value(), StandardCharsets.UTF_8); + } + }; +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DefaultDecatonProcessorObservationConvention.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DefaultDecatonProcessorObservationConvention.java new file mode 100644 index 00000000..52803c3a --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/DefaultDecatonProcessorObservationConvention.java @@ -0,0 +1,52 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import java.util.Optional; + +import com.linecorp.decaton.processor.runtime.DecatonProcessorObservationDocumentation.HighCardinalityKeyNames; +import com.linecorp.decaton.processor.runtime.DecatonProcessorObservationDocumentation.LowCardinalityKeyNames; + +import io.micrometer.common.KeyValues; + +class DefaultDecatonProcessorObservationConvention implements DecatonProcessorObservationConvention { + @Override + public KeyValues getLowCardinalityKeyValues(DecatonProcessorReceiverContext context) { + return KeyValues.of( + LowCardinalityKeyNames.SUBSCRIPTION_ID.withValue(context.getSubscriptionId()), + LowCardinalityKeyNames.PROCESSOR.withValue( + Optional.ofNullable(context.getProcessorName()).orElse("-")), + LowCardinalityKeyNames.TOPIC.withValue(context.getCarrier().topic()), + LowCardinalityKeyNames.PARTITION.withValue(String.valueOf(context.getCarrier().partition())) + ); + } + + @Override + public KeyValues getHighCardinalityKeyValues(DecatonProcessorReceiverContext context) { + return KeyValues.of( + HighCardinalityKeyNames.OFFSET.withValue(String.valueOf(context.getCarrier().offset())) + ); + } + + @Override + public String getContextualName(DecatonProcessorReceiverContext context) { + return String.format( + "decaton processor (%s/%s)", + context.getSubscriptionId(), + context.getProcessorName()); + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProcessorTraceHandle.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProcessorTraceHandle.java new file mode 100644 index 00000000..05d02c4a --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProcessorTraceHandle.java @@ -0,0 +1,50 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.tracing.TracingProvider; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.Observation.Event; + +final class MicrometerObservationProcessorTraceHandle implements TracingProvider.ProcessorTraceHandle { + private final Observation observation; + private Observation.Scope scope; + + MicrometerObservationProcessorTraceHandle(Observation observation) { + this.observation = observation; + } + + @Override + public void processingStart() { + observation.event(Event.of("processingStart")); + scope = observation.openScope(); + } + + @Override + public void processingReturn() { + observation.event(Event.of("processingReturn")); + if (scope != null) { + scope.close(); + } + } + + @Override + public void processingCompletion() { + observation.event(Event.of("processingCompletion")); + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProvider.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProvider.java new file mode 100644 index 00000000..4e88aaa7 --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationProvider.java @@ -0,0 +1,52 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +import com.linecorp.decaton.processor.tracing.TracingProvider; + +import io.micrometer.observation.Observation; +import io.micrometer.observation.ObservationRegistry; + +public class MicrometerObservationProvider implements TracingProvider { + private final ObservationRegistry registry; + private final DecatonProcessorObservationConvention customConvention; + private final DecatonProcessorObservationConvention defaultConvention = + new DefaultDecatonProcessorObservationConvention(); + + public MicrometerObservationProvider(ObservationRegistry registry) { + this(registry, null); + } + + public MicrometerObservationProvider(ObservationRegistry registry, + DecatonProcessorObservationConvention customConvention) { + this.registry = registry; + this.customConvention = customConvention; + } + + @Override + public RecordTraceHandle traceFor(ConsumerRecord record, String subscriptionId) { + final Observation observation = DecatonProcessorObservationDocumentation.INSTANCE.observation( + customConvention, + defaultConvention, + () -> new DecatonProcessorReceiverContext(record, subscriptionId), + registry + ); + return new MicrometerObservationRecordTraceHandle(observation); + } +} diff --git a/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationRecordTraceHandle.java b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationRecordTraceHandle.java new file mode 100644 index 00000000..4cb5373f --- /dev/null +++ b/micrometer-observation/src/main/java/com/linecorp/decaton/processor/runtime/MicrometerObservationRecordTraceHandle.java @@ -0,0 +1,45 @@ +/* + * Copyright 2026 LY Corporation + * + * LY Corporation licenses this file to you under the Apache License, + * version 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License. + */ + +package com.linecorp.decaton.processor.runtime; + +import com.linecorp.decaton.processor.DecatonProcessor; +import com.linecorp.decaton.processor.tracing.TracingProvider; +import com.linecorp.decaton.processor.tracing.TracingProvider.ProcessorTraceHandle; + +import io.micrometer.observation.Observation; + +final class MicrometerObservationRecordTraceHandle implements TracingProvider.RecordTraceHandle { + private final Observation observation; + + MicrometerObservationRecordTraceHandle(Observation observation) { + this.observation = observation; + this.observation.start(); + } + + @Override + public ProcessorTraceHandle childFor(DecatonProcessor processor) { + final DecatonProcessorReceiverContext context = + (DecatonProcessorReceiverContext) observation.getContext(); + context.setProcessorName(processor.name()); + return new MicrometerObservationProcessorTraceHandle(observation); + } + + @Override + public void processingCompletion() { + observation.stop(); + } +} diff --git a/settings.gradle b/settings.gradle index e8fa112a..3f66346e 100644 --- a/settings.gradle +++ b/settings.gradle @@ -16,3 +16,4 @@ include ":testing" include ":benchmark" include ":brave" include ":micrometer-tracing" +include ":micrometer-observation"