`docs/plugins/core-engines-configuration.md` (new file, +138 lines)
# EventMesh Core Engines Configuration Guide

EventMesh provides powerful core engines (`Filter`, `Transformer`, `Router`) to dynamically process messages. These engines are configured via **MetaStorage** (Governance Center, e.g., Nacos, Etcd), supporting on-demand loading and hot-reloading.

## 0. Core Concepts

Before configuration, it is important to understand the specific role of each engine in the message flow:

* **Filter (The Gatekeeper)**: Decides **"Whether to pass"**.
* It inspects the message (CloudEvent) attributes. If the message matches the rules, it passes; otherwise, it is dropped.
* *Use Case*: Block debug logs from production traffic; Only subscribe to specific event types.

* **Transformer (The Translator)**: Decides **"What it looks like"**.
* It modifies the message content (Payload or Metadata) according to templates or scripts.
* *Use Case*: Convert XML to JSON; Mask sensitive data (PII); Adapt legacy protocols to new standards.

* **Router (The Dispatcher)**: Decides **"Where to go"**.
* It dynamically changes the destination (Topic) of the message.
* *Use Case*: Route traffic to a Canary/Gray release topic; Route high-priority orders to a dedicated queue.

---

## 1. Overview

The configuration is not stored in local property files; it is distributed via MetaStorage. EventMesh listens to specific **Keys** derived from client Groups.

- **Data Source**: Configured via `eventMesh.metaStorage.plugin.type`.
- **Loading Mechanism**: Lazy loading & Hot-reloading.
- **Key Format**: `{EnginePrefix}-{GroupName}-{TopicName}`.
- **Value Format**: JSON Array.
- **Pipeline Key**: The engines are invoked using a pipeline key of format `{GroupName}-{TopicName}`, which is used to look up configurations with the prefix.
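
The relationship between the pipeline key and the MetaStorage lookup key can be sketched as follows. This helper is illustrative only (it is not EventMesh's actual implementation); the prefix and `{GroupName}-{TopicName}` format come from this guide, the example group/topic names are made up:

```java
// Sketch: deriving the MetaStorage lookup key from a pipeline key.
public class PipelineKeys {

    public static String metaStorageKey(String enginePrefix, String group, String topic) {
        String pipelineKey = group + "-" + topic; // key used to invoke the engines
        return enginePrefix + pipelineKey;        // key watched in MetaStorage
    }

    public static void main(String[] args) {
        System.out.println(metaStorageKey("filter-", "order-service", "order-created"));
        // filter-order-service-order-created
    }
}
```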

| Engine | Prefix | Scope | Description |
| :--- | :--- | :--- | :--- |
| **Router** | `router-` | Pub Only | Routes messages to different topics. |
| **Filter** | `filter-` | Pub & Sub | Filters messages based on CloudEvent attributes. |
| **Transformer** | `transformer-` | Pub & Sub | Transforms message content (Payload/Header). |

**Note**: All protocol processors (TCP, HTTP, gRPC) now use the unified `IngressProcessor` (publishing) and `EgressProcessor` (consuming) to apply these engines consistently.

---

## 2. Router (Routing)

**Scope**: Publish Only (Upstream)
**Key**: `router-{producerGroup}-{topicName}`

Decides the target storage topic for a message sent by a producer.

### Configuration Example (JSON)

```json
[
{
"topic": "original-topic",
"routerConfig": {
"targetTopic": "redirect-topic",
"expression": "data.type == 'urgent'"
}
}
]
```

* **topic**: The original topic the producer sends to.
* **targetTopic**: The actual topic to write to Storage.
* **expression**: Condition to trigger routing (e.g., SpEL).
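
Conceptually, routing reduces to: evaluate the expression against the event, and rewrite the topic on a match. The sketch below is a simplification (the real engine operates on CloudEvents and uses a proper expression evaluator; the `Predicate` stands in for the configured expression):

```java
import java.util.Map;
import java.util.function.Predicate;

// Illustrative router step: if the expression matches, the event is re-targeted.
public class RouterSketch {

    public static String resolveTopic(String originalTopic, String targetTopic,
                                      Predicate<Map<String, Object>> expression,
                                      Map<String, Object> eventData) {
        return expression.test(eventData) ? targetTopic : originalTopic;
    }

    public static void main(String[] args) {
        Predicate<Map<String, Object>> isUrgent = d -> "urgent".equals(d.get("type"));
        System.out.println(resolveTopic("original-topic", "redirect-topic", isUrgent,
                Map.of("type", "urgent")));  // redirect-topic
        System.out.println(resolveTopic("original-topic", "redirect-topic", isUrgent,
                Map.of("type", "normal"))); // original-topic
    }
}
```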

---

## 3. Filter (Filtering)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `filter-{producerGroup}-{topicName}`
**Effect**: Intercepts messages **before** they are sent to Storage.

### B. Subscribe Side (Downstream)
**Key**: `filter-{consumerGroup}-{topicName}`
**Effect**: Intercepts messages **before** they are pushed to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "test-topic",
"filterPattern": {
"source": ["app-a", "app-b"],
"type": [{"prefix": "com.example"}]
}
}
]
```

* **filterPattern**: Rules matching CloudEvent attributes. If a message doesn't match, it is dropped.
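
A hedged sketch of how such a pattern could be matched (not EventMesh's actual `FilterEngine`): each attribute rule is a list of alternatives, where a plain string means an exact match and a `{"prefix": ...}` entry means a prefix match; every attribute rule must match or the event is dropped.

```java
import java.util.List;
import java.util.Map;

// Illustrative matcher for the filterPattern format shown above.
public class FilterSketch {

    static boolean matches(Map<String, String> attributes, Map<String, List<Object>> pattern) {
        for (Map.Entry<String, List<Object>> rule : pattern.entrySet()) {
            String value = attributes.get(rule.getKey());
            if (value == null || rule.getValue().stream().noneMatch(alt -> matchesAlt(value, alt))) {
                return false; // one unmatched attribute rule drops the event
            }
        }
        return true;
    }

    static boolean matchesAlt(String value, Object alt) {
        if (alt instanceof Map) { // e.g. {"prefix": "com.example"}
            Object prefix = ((Map<?, ?>) alt).get("prefix");
            return prefix != null && value.startsWith(prefix.toString());
        }
        return value.equals(alt); // plain string: exact match
    }

    public static void main(String[] args) {
        Map<String, List<Object>> pattern = Map.of(
                "source", List.of("app-a", "app-b"),
                "type", List.of(Map.of("prefix", "com.example")));
        System.out.println(matches(Map.of("source", "app-a", "type", "com.example.created"), pattern)); // true
        System.out.println(matches(Map.of("source", "app-c", "type", "com.example.created"), pattern)); // false
    }
}
```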

---

## 4. Transformer (Transformation)

**Scope**: Both Publish (Upstream) & Subscribe (Downstream)

### A. Publish Side (Upstream)
**Key**: `transformer-{producerGroup}-{topicName}`
**Effect**: Modifies message content **before** sending to Storage.

### B. Subscribe Side (Downstream)
**Key**: `transformer-{consumerGroup}-{topicName}`
**Effect**: Modifies message content **before** pushing to the Consumer.

### Configuration Example (JSON)

```json
[
{
"topic": "raw-topic",
"transformerConfig": {
"transformerType": "template",
"template": "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}"
}
}
]
```

* **transformerType**: e.g., `original`, `template`.
* **template**: The transformation template definition.
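
As an illustration of the `${...}` substitution used by the `template` type above, the sketch below renders a template from a flat map of pre-extracted values; the real `TransformerEngine` extracts those values from the event itself (e.g. via JSONPath), so the flat map is an assumption here:

```java
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Illustrative ${...} placeholder substitution for the template transformer.
public class TemplateSketch {

    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^}]+)}");

    static String render(String template, Map<String, String> values) {
        Matcher m = PLACEHOLDER.matcher(template);
        StringBuilder out = new StringBuilder();
        while (m.find()) {
            // Unknown placeholders render as empty strings in this sketch.
            m.appendReplacement(out, Matcher.quoteReplacement(values.getOrDefault(m.group(1), "")));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        String template = "{\"id\": \"${id}\", \"new_content\": \"${data.content}\"}";
        System.out.println(render(template, Map.of("id", "42", "data.content", "hello")));
        // {"id": "42", "new_content": "hello"}
    }
}
```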

---

## 5. Verification

1. **Publish Config**: Add the JSON config to your Governance Center (e.g., Nacos) with a Data ID following the key format, e.g. `router-MyGroup-{topicName}`.
2. **Send Message**: Use EventMesh SDK to send a message from `MyGroup`.
3. **Observe**:
* For **Router**: Check if the message appears in the `targetTopic` in your MQ.
* For **Filter**: Check if blocked messages are skipped.
* For **Transformer**: Check if the message body in MQ (for Pub) or Consumer (for Sub) is modified.
`docs/unified-runtime-design.md` (new file, +201 lines)
# Unified Runtime Design & Usage Guide

## 1. Overview
The EventMesh Unified Runtime consolidates the capabilities of the core EventMesh Runtime (Protocol handling), Connectors (Source/Sink), and Functions (Filter/Transformer/Router) into a single, cohesive process. This eliminates the need for separate deployments for Connectors ("Runtime V2") and simplifies the architecture.

## 2. Architecture: The Unified Processing Pipeline

The system implements a symmetrical processing chain for both event production (Ingress) and consumption (Egress), but the entry/exit points differ based on the client type (SDK vs. Connector).

### 2.1 Ingress Pipeline (Production)

**Entry Points:**
* **SDK Client**: Interacts with the Runtime via **Protocol Servers** (TCP/HTTP/gRPC). The Protocol Server receives the request and passes the event to the pipeline.
* **Source Connector**: Loaded directly into the Runtime as a **Plugin**. The Source Connector pulls data from external systems and internally injects events into the pipeline.

**Flow:**
`[Entry: Protocol Server (SDK) OR Source Plugin (Connector)] -> [IngressProcessor] -> [Storage]`

**IngressProcessor Pipeline:**
`[Filter] -> [Transformer] -> [Router]`

1. **Entry**:
* **SDK**: Request received by `EventMeshTCPServer`, `EventMeshHTTPServer`, or `EventMeshGrpcServer`.
* **Connector**: `SourceWorker` pulls data and converts it to a CloudEvent.
2. **IngressProcessor**: Encapsulates the unified 3-stage pipeline:
* **Filter**: The `FilterEngine` evaluates the event against configured rules. If unmatched, returns null (event dropped).
* **Transformer**: The `TransformerEngine` transforms the event payload (e.g., JSON manipulation) if a rule exists.
* **Router**: The `RouterEngine` determines the target topic/destination.
3. **Storage**: The processed event is persisted to the Storage Plugin (RocketMQ, Kafka, etc.).
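
The three stages can be pictured as a null-short-circuiting chain. The sketch below is a simplification under stated assumptions: `String` stands in for a CloudEvent, and the real `IngressProcessor` additionally handles protocol specifics, configuration lookup, and statistics:

```java
import java.util.function.UnaryOperator;

// Sketch of the Filter -> Transformer -> Router chain: each stage returns the
// event (possibly modified), or null to drop it and stop the chain.
public class IngressSketch {

    @SafeVarargs
    static UnaryOperator<String> pipeline(UnaryOperator<String>... stages) {
        return event -> {
            for (UnaryOperator<String> stage : stages) {
                if (event == null) {
                    return null; // a previous stage filtered the event
                }
                event = stage.apply(event);
            }
            return event;
        };
    }

    public static void main(String[] args) {
        UnaryOperator<String> filter = e -> e.contains("urgent") ? e : null;
        UnaryOperator<String> transformer = String::toUpperCase;
        UnaryOperator<String> router = e -> e + "@redirect-topic";

        UnaryOperator<String> ingress = pipeline(filter, transformer, router);
        System.out.println(ingress.apply("urgent-order")); // URGENT-ORDER@redirect-topic
        System.out.println(ingress.apply("debug-log"));    // null
    }
}
```

The Egress pipeline is the same chain without the Router stage.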

### 2.2 Egress Pipeline (Consumption)

**Exit Points:**
* **SDK Client**: The Runtime pushes events to connected SDK clients via the active **Protocol Server** connection.
* **Sink Connector**: Loaded directly into the Runtime as a **Plugin**. The Runtime passes events to the `SinkWorker`, which writes to external systems.

**Flow:**
`[Storage] -> [EgressProcessor] -> [Exit: Protocol Server (SDK) OR Sink Plugin (Connector)]`

**EgressProcessor Pipeline:**
`[Filter] -> [Transformer]`

1. **Storage**: Event retrieved from the storage queue.
2. **EgressProcessor**: Encapsulates the 2-stage pipeline (no Router on egress):
* **Filter**: Evaluated against the consumer group's filter rules. If unmatched, returns null (event not delivered).
* **Transformer**: Payload transformed according to the consumer group's needs.
3. **Exit**:
* **SDK**: Event pushed to client via TCP/HTTP/gRPC session.
* **Connector**: Event passed to `SinkWorker` for external delivery.

### 2.3 Protocol Processor Migration Status

All protocol processors now use the unified IngressProcessor/EgressProcessor architecture:

**TCP Protocol**: ✅ Complete
* `ClientGroupWrapper` - Integrated both Ingress (send) and Egress (consume)

**HTTP Protocol**: ✅ Complete
* `SendAsyncEventProcessor` - Ingress pipeline
* `SendAsyncMessageProcessor` - Ingress pipeline
* `SendSyncMessageProcessor` - Ingress pipeline
* `BatchSendMessageProcessor` - Ingress pipeline with batch statistics
* `BatchSendMessageV2Processor` - Ingress pipeline with batch statistics

**gRPC Protocol**: ✅ Complete
* `PublishCloudEventsProcessor` - Ingress pipeline
* `BatchPublishCloudEventProcessor` - Ingress pipeline with batch statistics
* `RequestCloudEventProcessor` - Bidirectional (Ingress for request, Egress for response)

**Connectors**: ✅ Complete
* `SourceWorker` - Ingress pipeline
* `SinkWorker` - Egress pipeline

## 3. Configuration

### 3.1 Enabling Connectors
To enable the embedded Connector runtime, update `eventmesh.properties`:

```properties
# Enable the connector plugin
eventMesh.connector.plugin.enable=true

# Specify the connector type (source or sink) and name (SPI name)
eventMesh.connector.plugin.type=source
eventMesh.connector.plugin.name=my-source-connector
```

### 3.2 Configuring Functions
Functions are dynamic and configured via the **MetaStorage** (e.g., Nacos, Etcd).

* **Prefixes**:
* Filter: `filter-{group}-{topic}`
* Transformer: `transformer-{group}-{topic}`
* Router: `router-{group}-{topic}`

**Example Nacos Config (Filter):**
Key: `filter-myGroup-myTopic`
Value:
```json
[
{
"topic": "myTopic",
"condition": "{\"dataList\":[{\"key\":\"$.type\",\"value\":\"sometype\",\"operator\":\"EQ\"}]}"
}
]
```
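
A hedged sketch of how a `dataList` condition like the one above could be evaluated (not the real engine): each rule names a JSONPath-style key, an expected value, and an operator. Only the flat `$.field` key form and the `EQ` operator are handled here; other operators are assumed to exist but are out of scope:

```java
import java.util.List;
import java.util.Map;

// Illustrative evaluation of the operator-based filter condition format.
public class ConditionSketch {

    record Rule(String key, String value, String operator) {}

    static boolean evaluate(List<Rule> dataList, Map<String, ?> data) {
        for (Rule rule : dataList) {
            // Only flat "$.field" paths are handled in this sketch.
            Object actual = data.get(rule.key().replaceFirst("^\\$\\.", ""));
            if ("EQ".equals(rule.operator()) && !rule.value().equals(String.valueOf(actual))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(new Rule("$.type", "sometype", "EQ"));
        System.out.println(evaluate(rules, Map.of("type", "sometype"))); // true
        System.out.println(evaluate(rules, Map.of("type", "other")));    // false
    }
}
```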

## 4. Developer Guide

### 4.1 Key Components
* **`EventMeshConnectorBootstrap`**: Bootstraps the Connector `SourceWorker` or `SinkWorker` within the EventMeshServer process.
* **`IngressProcessor`**: Unified processor for all upstream message flows (SDK → Storage). Executes Filter → Transformer → Router pipeline.
* **`EgressProcessor`**: Unified processor for all downstream message flows (Storage → SDK/Connector). Executes Filter → Transformer pipeline (no Router).
* **`BatchProcessResult`**: Utility class for tracking batch processing statistics (success/filtered/failed counts).
* **`ClientGroupWrapper`**: Handles the processing logic for TCP clients. Modified to execute the pipeline during `send` (Ingress) and `consume` (Egress).
* **`SourceWorker`**: Modified to support a pluggable `Publisher`, allowing it to inject events directly into the `EventMeshServer` pipeline instead of using a remote TCP client.
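
Based on the methods this guide uses in Section 4.3, a minimal sketch of `BatchProcessResult` might look like the following; the real class may differ, and anything beyond the method names quoted in this document (e.g. the internal counters) is an assumption:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Minimal sketch of the batch statistics described in this guide.
public class BatchProcessResult {

    private final int totalCount; // expected batch size, from the constructor
    private final AtomicInteger success = new AtomicInteger();
    private final AtomicInteger filtered = new AtomicInteger();
    private final List<String> failedIds = new CopyOnWriteArrayList<>();

    public BatchProcessResult(int totalCount) {
        this.totalCount = totalCount;
    }

    // Thread-safe: send callbacks may fire on producer threads.
    public void incrementSuccess() { success.incrementAndGet(); }
    public void incrementFiltered() { filtered.incrementAndGet(); }
    public void incrementFailed(String eventId) { failedIds.add(eventId); }

    public String toSummary() {
        return "success=" + success.get() + ", filtered=" + filtered.get()
                + ", failed=" + failedIds.size();
    }

    public static void main(String[] args) {
        BatchProcessResult r = new BatchProcessResult(3);
        r.incrementSuccess();
        r.incrementFiltered();
        r.incrementFailed("evt-1");
        System.out.println(r.toSummary()); // success=1, filtered=1, failed=1
    }
}
```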

### 4.2 Pipeline Integration Pattern

All protocol processors follow this pattern:

**For Ingress (Publishing)**:
```java
// 1. Construct pipeline key
String pipelineKey = producerGroup + "-" + topic;

// 2. Apply IngressProcessor
CloudEvent processedEvent = eventMeshServer.getIngressProcessor()
.process(cloudEvent, pipelineKey);

// 3. Check if filtered (null means filtered)
if (processedEvent == null) {
// Return success for filtered messages
return;
}

// 4. Use routed topic (Router may have changed it)
String finalTopic = processedEvent.getSubject();

// 5. Send to storage
producer.send(processedEvent, callback);
```

**For Egress (Consuming)**:
```java
// 1. Construct pipeline key
String pipelineKey = consumerGroup + "-" + topic;

// 2. Apply EgressProcessor
CloudEvent processedEvent = eventMeshServer.getEgressProcessor()
.process(cloudEvent, pipelineKey);

// 3. Check if filtered
if (processedEvent == null) {
// Commit offset but don't deliver to client
return;
}

// 4. Deliver to client
client.send(processedEvent);
```

### 4.3 Batch Processing Pattern

For batch processors, use `BatchProcessResult` to track statistics:
```java
BatchProcessResult batchResult = new BatchProcessResult(totalCount);

for (CloudEvent event : events) {
try {
CloudEvent processed = ingressProcessor.process(event, pipelineKey);
if (processed == null) {
batchResult.incrementFiltered();
continue;
}

producer.send(processed, new SendCallback() {
public void onSuccess(SendResult result) {
batchResult.incrementSuccess();
}
public void onException(OnExceptionContext ctx) {
batchResult.incrementFailed(event.getId());
}
});
} catch (Exception e) {
batchResult.incrementFailed(event.getId());
}
}

// Return summary: "success=5, filtered=2, failed=1"
String summary = batchResult.toSummary();
```

### 4.4 Adding New Tests
When modifying the pipeline, add or update unit tests in:
* `org.apache.eventmesh.runtime.core.protocol.IngressProcessorTest`
* `org.apache.eventmesh.runtime.core.protocol.EgressProcessorTest`
* `org.apache.eventmesh.runtime.core.protocol.BatchProcessResultTest`
* `org.apache.eventmesh.runtime.core.protocol.tcp.client.group.ClientGroupWrapperTest`
* `org.apache.eventmesh.runtime.boot.EventMeshConnectorBootstrapTest`
* Protocol-specific processor tests (e.g., `SendAsyncEventProcessorTest`)
@@ -118,6 +118,15 @@ public class CommonConfiguration {
@ConfigField(field = "registry.plugin.enabled")
private boolean eventMeshRegistryPluginEnabled = false;

@ConfigField(field = "connector.plugin.type")
private String eventMeshConnectorPluginType;

@ConfigField(field = "connector.plugin.name")
private String eventMeshConnectorPluginName;

@ConfigField(field = "connector.plugin.enabled")
private boolean eventMeshConnectorPluginEnable = false;

public void reload() {

if (Strings.isNullOrEmpty(this.eventMeshServerIp)) {
`eventmesh-function/eventmesh-function-api/build.gradle` (+4 lines)
@@ -14,3 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

dependencies {
implementation project(":eventmesh-common")
}