From 0c29962be018eedde5f2f83aabc6343370fdff25 Mon Sep 17 00:00:00 2001 From: artur-ciocanu Date: Fri, 16 Jan 2026 18:51:59 +0200 Subject: [PATCH] Adding raw event subscription alongside CloudEvent subscription (#1617) * Adding raw event subscription. Signed-off-by: Artur Ciocanu * Use proper method overloads for subscribeToEvents() Signed-off-by: Artur Ciocanu * Updating the examples to use the latest changes. Signed-off-by: Artur Ciocanu * FIx CI failures Signed-off-by: Artur Ciocanu * FIx CI failures, take 2 Signed-off-by: Artur Ciocanu --------- Signed-off-by: Artur Ciocanu --- .../io/dapr/examples/pubsub/stream/README.md | 81 ++++++--- .../examples/pubsub/stream/Subscriber.java | 46 +++--- .../pubsub/stream/SubscriberCloudEvent.java | 78 +++++++++ .../dapr/it/pubsub/stream/PubSubStreamIT.java | 137 +++++++++++++++ .../java/io/dapr/client/DaprClientImpl.java | 22 ++- .../io/dapr/client/DaprPreviewClient.java | 27 ++- .../EventSubscriberStreamObserver.java | 69 ++++++-- .../client/DaprPreviewClientGrpcTest.java | 103 +++++++++++- .../EventSubscriberStreamObserverTest.java | 156 +++++++++++++----- 9 files changed, 613 insertions(+), 106 deletions(-) create mode 100644 examples/src/main/java/io/dapr/examples/pubsub/stream/SubscriberCloudEvent.java diff --git a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md index da3e4e2482..7916f62783 100644 --- a/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md +++ b/examples/src/main/java/io/dapr/examples/pubsub/stream/README.md @@ -3,7 +3,7 @@ In this sample, we'll create a publisher and a subscriber java applications using Dapr, based on the publish-subscribe pattern. The publisher will generate messages of a specific topic, while a subscriber will listen for messages of a specific topic via a bi-directional stream. All is abstracted by the SDK. See the [Dapr Pub-Sub docs](https://docs.dapr.io/developing-applications/building-blocks/pubsub/) to understand when this pattern might be a good choice for your software architecture. Visit [this](https://docs.dapr.io/developing-applications/building-blocks/pubsub/pubsub-overview/) link for more information about Dapr and Pub-Sub. - + ## Pub-Sub Sample using the Java-SDK This sample shows how the subscription to events no longer requires the application to listen to an HTTP or gRPC port. This example uses Redis Streams (enabled in Redis versions => 5). @@ -41,45 +41,80 @@ cd examples Run `dapr init` to initialize Dapr in Self-Hosted Mode if it's not already initialized. -### Running the subscriber - -The subscriber uses the `DaprPreviewClient` interface to use a new feature where events are subscribed via a streaming and processed via a callback interface. +## Running the Subscriber +The subscriber uses the `DaprPreviewClient` interface to subscribe to events via streaming and process them using reactive operators. +The SDK provides two ways to subscribe to events: -The publisher is a simple Java application with a main method that uses the Dapr gRPC Client to publish 10 messages to a specific topic. +### Option 1: Raw Data Subscription -In the `Subscriber.java` file, you will find the `Subscriber` class, containing the main method. The main method declares a `DaprPreviewClient` using the `DaprClientBuilder` class. When invoking `subscribeToEvents`, the method returns a `Flux>` that can be processed using reactive operators like `doOnNext()` for event handling and `doOnError()` for error handling. The example uses `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. +Use `TypeRef.STRING` (or any other type) to receive the deserialized message data directly: ```java public class Subscriber { - // ... + public static void main(String[] args) throws Exception { + try (var client = new DaprClientBuilder().buildPreviewClient()) { + // Subscribe to events - receives raw String data directly + client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING) + .doOnNext(message -> { + System.out.println("Subscriber got: " + message); + }) + .doOnError(throwable -> { + System.out.println("Subscriber got exception: " + throwable.getMessage()); + }) + .blockLast(); + } + } +} +``` + +### Option 2: CloudEvent Subscription + +Use `TypeRef>` to receive the full CloudEvent with metadata (ID, source, type, etc.): + +```java +public class SubscriberCloudEvent { public static void main(String[] args) throws Exception { - String topicName = getTopicName(args); try (var client = new DaprClientBuilder().buildPreviewClient()) { - // Subscribe to events using the Flux-based reactive API - // The stream will emit CloudEvent objects as they arrive - client.subscribeToEvents( - PUBSUB_NAME, - topicName, - TypeRef.STRING) - .doOnNext(event -> { - System.out.println("Subscriber got: " + event.getData()); + // Subscribe to events - receives CloudEvent with full metadata + client.subscribeToEvents(PUBSUB_NAME, topicName, new TypeRef>() {}) + .doOnNext(cloudEvent -> { + System.out.println("Received CloudEvent:"); + System.out.println(" ID: " + cloudEvent.getId()); + System.out.println(" Type: " + cloudEvent.getType()); + System.out.println(" Data: " + cloudEvent.getData()); }) .doOnError(throwable -> { System.out.println("Subscriber got exception: " + throwable.getMessage()); }) - .blockLast(); // Blocks indefinitely until the stream completes (keeps the subscriber running) + .blockLast(); } } - - // ... } ``` -Execute the following command to run the Subscriber example: +### Subscription with Metadata + +You can also pass metadata to the subscription, for example to enable raw payload mode: + +```java +client.subscribeToEvents(PUBSUB_NAME, topicName, TypeRef.STRING, Map.of("rawPayload", "true")) + .doOnNext(message -> { + System.out.println("Subscriber got: " + message); + }) + .blockLast(); +``` + +### Subscription Lifecycle + +The examples use `blockLast()` to keep the subscriber running indefinitely. For production use cases requiring explicit subscription lifecycle control, you can use `.subscribe()` which returns a `Disposable` that can be disposed via `disposable.dispose()`. + +## Running the Examples + +Execute the following command to run the raw data Subscriber example: +Or run the CloudEvent Subscriber example: + +```bash +dapr run --resources-path ./components/pubsub --app-id subscriber -- java -jar target/dapr-java-sdk-examples-exec.jar io.dapr.examples.pubsub.stream.SubscriberCloudEvent +``` + Once the subscriber is running, run the publisher in a new terminal to see the events in the subscriber's side: