feat(eventbridge): add InputTransformer support and S3 event notifications#152
feat(eventbridge): add InputTransformer support and S3 event notifications#152rpalcolea wants to merge 2 commits intohectorvent:mainfrom
Conversation
Support InputPathsMap + InputTemplate on EventBridge targets so events can be projected to a subset of fields before delivery to SQS/SNS/Lambda. - Add InputTransformer model with inputPathsMap and inputTemplate fields - Parse and serialize InputTransformer in PutTargets / ListTargetsByRule - Apply transformer in invokeTarget: extract JSONPath variables, substitute <varName> placeholders in the template - Use Jackson JsonPointer for JSONPath resolution ($.a.b.c → /a/b/c) - Add unit tests for extractJsonPath and applyInputTransformer edge cases - Add integration tests covering full put/list/deliver/transform flow Fixes hectorvent#140
There was a problem hiding this comment.
Pull request overview
Adds AWS EventBridge InputTransformer support on rule targets so events can be reshaped before delivery to targets (SQS/SNS/Lambda), and validates behavior via new unit + integration tests.
Changes:
- Introduces
InputTransformermodel and adds it toTarget. - Parses/returns
InputTransformerinPutTargets/ListTargetsByRule, and applies transformations during target invocation. - Adds unit tests for JSONPath extraction / template substitution and a new end-to-end integration test covering SQS delivery with and without transformers.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
src/main/java/io/github/hectorvent/floci/services/eventbridge/EventBridgeService.java |
Applies InputTransformer when delivering events to targets; adds JSONPath extraction and template substitution. |
src/main/java/io/github/hectorvent/floci/services/eventbridge/EventBridgeHandler.java |
Parses InputTransformer from PutTargets requests and includes it in ListTargetsByRule responses. |
src/main/java/io/github/hectorvent/floci/services/eventbridge/model/Target.java |
Extends target model to store InputTransformer. |
src/main/java/io/github/hectorvent/floci/services/eventbridge/model/InputTransformer.java |
New model representing InputPathsMap + InputTemplate. |
src/test/java/io/github/hectorvent/floci/services/eventbridge/EventBridgeServiceTest.java |
Adds unit tests for JSONPath extraction and transformer substitution rules. |
src/test/java/io/github/hectorvent/floci/services/eventbridge/EventBridgeIntegrationTest.java |
New Quarkus integration test validating transformer persistence and transformed payload delivery to SQS. |
| assert transformedBody != null : "Expected a message in the transformer queue"; | ||
| assert transformedBody.contains("my.app") : "Expected source my.app in: " + transformedBody; | ||
| assert transformedBody.contains("OrderPlaced") : "Expected type OrderPlaced in: " + transformedBody; | ||
| assert !transformedBody.contains("orderId") : "Expected raw orderId to be absent in: " + transformedBody; | ||
| } |
There was a problem hiding this comment.
The test uses Java assert statements, which are disabled unless the JVM is started with -ea, so these checks may be skipped in CI and the test could pass even when the transformer output is wrong. Use JUnit assertions (e.g., Assertions.assertNotNull / assertTrue / assertFalse) so failures are always enforced.
| .formParam("Action", "ReceiveMessage") | ||
| .formParam("QueueUrl", transformerQueueUrl) | ||
| .formParam("MaxNumberOfMessages", "10") | ||
| .when() | ||
| .post("/"); |
There was a problem hiding this comment.
This “drain” step calls ReceiveMessage but does not delete messages (or purge the queue). Since SQS receives only hide messages for the visibility timeout, the previously received message can reappear later and make this test flaky. Consider using PurgeQueue for the test queue or explicitly DeleteMessage for any receipt handles returned by ReceiveMessage.
| .formParam("Action", "ReceiveMessage") | |
| .formParam("QueueUrl", transformerQueueUrl) | |
| .formParam("MaxNumberOfMessages", "10") | |
| .when() | |
| .post("/"); | |
| .formParam("Action", "PurgeQueue") | |
| .formParam("QueueUrl", transformerQueueUrl) | |
| .when() | |
| .post("/") | |
| .then() | |
| .statusCode(200); |
| String applyInputTransformer(InputTransformer transformer, String eventJson) { | ||
| String template = transformer.getInputTemplate(); | ||
| if (template == null) { | ||
| return eventJson; | ||
| } | ||
| String result = template; | ||
| for (var e : transformer.getInputPathsMap().entrySet()) { | ||
| String value = extractJsonPath(e.getValue(), eventJson); | ||
| result = result.replace("<" + e.getKey() + ">", value != null ? value : ""); | ||
| } |
There was a problem hiding this comment.
applyInputTransformer() calls extractJsonPath() for each entry, and extractJsonPath() reparses eventJson with objectMapper.readTree(...) every time. For transformers with multiple variables and/or large events, this adds significant overhead per target invocation. Parse the event JSON once (e.g., to a JsonNode) and reuse it for all path extractions.
There was a problem hiding this comment.
@rpalcolea please take a look to this performance improvement.
| case String a when a.contains(":lambda:") || a.contains(":function:") -> { | ||
| String fnName = a.substring(a.lastIndexOf(':') + 1); | ||
| lambdaService.invoke(extractRegionFromArn(a, region), fnName, payload.getBytes(), InvocationType.Event); | ||
| LOG.debugv("EventBridge delivered to Lambda: {0}", a); |
There was a problem hiding this comment.
payload.getBytes() uses the platform default charset, which can vary by environment. For consistent AWS-compatible behavior (JSON is UTF-8), encode explicitly with UTF-8 when invoking Lambda.
There was a problem hiding this comment.
This one also need to be addressed, please
| String payload; | ||
| if (target.getInput() != null) { | ||
| payload = target.getInput(); | ||
| } else if (target.getInputTransformer() != null) { | ||
| payload = applyInputTransformer(target.getInputTransformer(), eventJson); | ||
| } else { | ||
| payload = eventJson; |
There was a problem hiding this comment.
If a target is configured with both Input and InputTransformer, this logic silently prefers Input and ignores the transformer. That can be surprising for callers and makes the API ambiguous; consider validating that only one of Input, InputPath, or InputTransformer is set and returning a ValidationException when multiple are provided.
There was a problem hiding this comment.
@rpalcolea could you investigate what is the AWS behavior in this scenario?
|
Hi @rpalcolea, Thanks for the PR. Could you get a PR for this new feature in the floci-* repo, please? |
When a bucket has <EventBridgeConfiguration/> in its notification config, every s3:ObjectCreated and s3:ObjectRemoved event is also sent to the EventBridge default bus with source=aws.s3 and a CloudTrail-style detail payload containing bucket name, object key, size, and etag. - Add eventBridgeEnabled flag to NotificationConfiguration - Parse <EventBridgeConfiguration/> in S3Controller - Inject EventBridgeService into S3Service and call putEvents in fireNotifications - Add integration tests covering enable/disable and event delivery for put and delete operations Fixes hectorvent#140
Summary
This PR adds two EventBridge features to floci:
Feature 1: EventBridge InputTransformer
Overview
AWS EventBridge lets you attach an
InputTransformerto a rule target so that events are reshaped before delivery. Instead of forwarding the full event envelope to a target (SQS, SNS, Lambda), you extract only the fields you care about and produce a custom payload.How It Works
An
InputTransformerhas two parts:InputPathsMap— a map of variable names to JSONPath expressions. Each path is evaluated against the incoming event JSON and the result is bound to the variable name.InputTemplate— a string template with<varName>placeholders. After all variables are resolved, placeholders are replaced with their extracted values before the payload is delivered.If a JSONPath expression matches a missing or null field, the placeholder is replaced with an empty string. If
InputTemplateis absent, the full event JSON is delivered as-is.Example
Given an incoming event:
{ "source": "my.app", "detail-type": "OrderPlaced", "detail": { "orderId": "123", "amount": 99.99 } }With this transformer configured on the target:
{ "InputPathsMap": { "src": "$.source", "detail": "$.detail-type" }, "InputTemplate": "{\"source\":\"<src>\",\"type\":\"<detail>\"}" }The target receives:
{"source":"my.app","type":"OrderPlaced"}The raw
orderIdandamountfields are never forwarded.API
PutTargets
Include
InputTransformeralongside the targetIdandArn:{ "Rule": "my-rule", "Targets": [{ "Id": "1", "Arn": "arn:aws:sqs:us-east-1:000000000000:my-queue", "InputTransformer": { "InputPathsMap": { "src": "$.source", "detail": "$.detail-type" }, "InputTemplate": "{\"source\":\"<src>\",\"type\":\"<detail>\"}" } }] }ListTargetsByRule
The
InputTransformeris returned as-is when listing targets:{ "Targets": [{ "Id": "1", "Arn": "arn:aws:sqs:...", "InputTransformer": { "InputPathsMap": { "src": "$.source" }, "InputTemplate": "{\"source\":\"<src>\"}" } }] }JSONPath Support
Paths use dot notation starting with
$:$.sourcesourcefield$.detail.bucket.name$.detail.size42)$.missing.fieldFeature 2: S3 → EventBridge Notifications
Overview
AWS S3 supports forwarding bucket event notifications directly to EventBridge. When enabled, every object creation and deletion fires an event to the EventBridge default bus with
source=aws.s3. Rules on the default bus can then match and route these events to any supported target.This is an alternative to the traditional SQS/SNS notification approach — it decouples S3 from specific consumers and lets EventBridge rules handle fan-out, filtering, and transformation.
How It Works
Set
<EventBridgeConfiguration/>in the bucket's notification configuration. No ARN is required — all matching events go to the default bus automatically. EventBridge rules with"source": ["aws.s3"]in their event pattern will match and route them.Events are fired for:
s3:ObjectCreated:Put→detail-type: Object Createds3:ObjectCreated:Copy→detail-type: Object Createds3:ObjectCreated:CompleteMultipartUpload→detail-type: Object Createds3:ObjectRemoved:Delete→detail-type: Object Deleteds3:ObjectRemoved:DeleteMarkerCreated→detail-type: Object DeletedEvent Shape
{ "version": "0", "id": "<uuid>", "source": "aws.s3", "detail-type": "Object Created", "account": "000000000000", "time": "2026-04-01T00:00:00Z", "region": "us-east-1", "resources": [], "detail": { "version": "0", "bucket": { "name": "my-bucket" }, "object": { "key": "uploads/photo.jpg", "size": 4096, "etag": "abc123" }, "request-id": "<uuid>", "requester": "aws:emulator", "source-ip-address": "127.0.0.1", "reason": "ObjectCreated:Put" } }API
Enable EventBridge notifications
Disable EventBridge notifications
<NotificationConfiguration/>Example EventBridge rule matching S3 events
{ "Name": "s3-object-created-rule", "EventPattern": "{\"source\":[\"aws.s3\"],\"detail-type\":[\"Object Created\"]}", "State": "ENABLED" }