Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 99 additions & 4 deletions src/main/java/io/github/hectorvent/floci/core/common/XmlParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,41 @@ public final class XmlParser {

private XmlParser() {}

/**
* Reads the text content of the current element if it is a leaf (contains only text).
* If the element contains nested child elements, the entire subtree is skipped
* and {@code null} is returned.
*
* <p>After this method returns, the reader is positioned on the END_ELEMENT
* of the element that was open when the method was called.
*/
private static String readLeafText(XMLStreamReader r) throws XMLStreamException {
StringBuilder sb = new StringBuilder();
while (r.hasNext()) {
int event = r.next();
if (event == XMLStreamConstants.CHARACTERS || event == XMLStreamConstants.CDATA) {
sb.append(r.getText());
} else if (event == XMLStreamConstants.START_ELEMENT) {
// Not a leaf — skip the child subtree, then continue
// consuming until we reach our own END_ELEMENT.
// depth starts at 2: 1 for ourselves (the element readLeafText
// was called for) + 1 for the child START we just saw.
int depth = 2;
while (r.hasNext()) {
int e = r.next();
if (e == XMLStreamConstants.START_ELEMENT) depth++;
else if (e == XMLStreamConstants.END_ELEMENT) {
if (--depth == 0) break;
}
}
return null;
} else if (event == XMLStreamConstants.END_ELEMENT) {
break;
}
}
return sb.toString();
}

/**
* Extracts the text content of every element whose local name matches {@code elementName}.
*
Expand Down Expand Up @@ -153,8 +188,10 @@ public static List<Map<String, List<String>>> extractGroupsMulti(String xml, Str
current = new LinkedHashMap<>();
depth = 1;
} else if (current != null && depth == 1) {
String text = r.getElementText();
current.computeIfAbsent(local, k -> new ArrayList<>()).add(text);
String text = readLeafText(r);
if (text != null) {
current.computeIfAbsent(local, k -> new ArrayList<>()).add(text);
}
} else if (current != null) {
depth++;
}
Expand Down Expand Up @@ -201,8 +238,10 @@ public static List<Map<String, String>> extractGroups(String xml, String parentE
current = new LinkedHashMap<>();
depth = 1;
} else if (current != null && depth == 1) {
String text = r.getElementText();
current.put(local, text);
String text = readLeafText(r);
if (text != null) {
current.put(local, text);
}
} else if (current != null) {
depth++;
}
Expand All @@ -220,4 +259,60 @@ public static List<Map<String, String>> extractGroups(String xml, String parentE
} catch (XMLStreamException ignored) {}
return result;
}

/**
* Extracts key/value pairs from a repeating {@code pairElement} nested at any depth
* inside each {@code parentElement} group, returning one map per group.
*
* <p>The outer list is index-aligned with the result of
* {@link #extractGroupsMulti(String, String)} for the same {@code parentElement}.
*
* <p>Example — extracts S3 notification filter rules:
* <pre>{@code
* List<Map<String,String>> filters = XmlParser.extractPairsPerGroup(
* body, "QueueConfiguration", "FilterRule", "Name", "Value");
* // filters.get(0) → {prefix=images/, suffix=.jpg}
* }</pre>
*/
public static List<Map<String, String>> extractPairsPerGroup(
String xml, String parentElement,
String pairElement, String keyElement, String valueElement) {
List<Map<String, String>> result = new ArrayList<>();
if (xml == null || xml.isEmpty()) {
return result;
}
try {
XMLStreamReader r = FACTORY.createXMLStreamReader(new StringReader(xml));
Map<String, String> current = null;
boolean inPair = false;
String pendingKey = null;
while (r.hasNext()) {
int event = r.next();
if (event == XMLStreamConstants.START_ELEMENT) {
String local = r.getLocalName();
if (parentElement.equals(local)) {
current = new LinkedHashMap<>();
} else if (current != null && pairElement.equals(local)) {
inPair = true;
pendingKey = null;
} else if (inPair && keyElement.equals(local)) {
pendingKey = r.getElementText();
} else if (inPair && valueElement.equals(local) && pendingKey != null) {
current.put(pendingKey, r.getElementText());
pendingKey = null;
}
} else if (event == XMLStreamConstants.END_ELEMENT) {
String local = r.getLocalName();
if (parentElement.equals(local) && current != null) {
result.add(current);
current = null;
} else if (pairElement.equals(local)) {
inPair = false;
}
}
}
r.close();
} catch (XMLStreamException ignored) {}
return result;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.github.hectorvent.floci.services.s3.model.GetObjectAttributesParts;
import io.github.hectorvent.floci.services.s3.model.GetObjectAttributesResult;
import io.github.hectorvent.floci.services.s3.model.MultipartUpload;
import io.github.hectorvent.floci.services.s3.model.FilterRule;
import io.github.hectorvent.floci.services.s3.model.NotificationConfiguration;
import io.github.hectorvent.floci.services.s3.model.ObjectAttributeName;
import io.github.hectorvent.floci.services.s3.model.QueueNotification;
Expand All @@ -30,6 +31,7 @@
import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
Expand Down Expand Up @@ -778,6 +780,7 @@ private Response handleGetBucketNotification(String bucket) {
for (String event : qn.events()) {
xml.elem("Event", event);
}
appendFilterRules(xml, qn.filterRules());
xml.end("QueueConfiguration");
}
for (TopicNotification tn : config.getTopicConfigurations()) {
Expand All @@ -787,6 +790,7 @@ private Response handleGetBucketNotification(String bucket) {
for (String event : tn.events()) {
xml.elem("Event", event);
}
appendFilterRules(xml, tn.filterRules());
xml.end("TopicConfiguration");
}
xml.end("NotificationConfiguration");
Expand All @@ -801,24 +805,13 @@ private Response handlePutBucketNotification(String bucket, byte[] body) {
String xml = new String(body, StandardCharsets.UTF_8);
NotificationConfiguration config = new NotificationConfiguration();

var queueConfigs = XmlParser.extractGroupsMulti(xml, "QueueConfiguration");
for (var group : queueConfigs) {
String id = group.getOrDefault("Id", List.of("")).get(0);
List<String> queueArns = group.get("Queue");
List<String> events = group.get("Event");
if (queueArns != null && !queueArns.isEmpty() && events != null && !events.isEmpty()) {
config.getQueueConfigurations().add(new QueueNotification(id, queueArns.get(0), events));
}
for (var parsed : parseNotificationGroups(xml, "QueueConfiguration", "Queue")) {
config.getQueueConfigurations().add(
new QueueNotification(parsed.id, parsed.arn, parsed.events, parsed.filterRules));
}

var topicConfigs = XmlParser.extractGroupsMulti(xml, "TopicConfiguration");
for (var group : topicConfigs) {
String id = group.getOrDefault("Id", List.of("")).get(0);
List<String> topicArns = group.get("Topic");
List<String> events = group.get("Event");
if (topicArns != null && !topicArns.isEmpty() && events != null && !events.isEmpty()) {
config.getTopicConfigurations().add(new TopicNotification(id, topicArns.get(0), events));
}
for (var parsed : parseNotificationGroups(xml, "TopicConfiguration", "Topic")) {
config.getTopicConfigurations().add(
new TopicNotification(parsed.id, parsed.arn, parsed.events, parsed.filterRules));
}

s3Service.putBucketNotificationConfiguration(bucket, config);
Expand All @@ -828,6 +821,44 @@ private Response handlePutBucketNotification(String bucket, byte[] body) {
}
}

private record ParsedNotificationGroup(String id, String arn, List<String> events,
List<FilterRule> filterRules) {}

private static List<ParsedNotificationGroup> parseNotificationGroups(
String xml, String groupElement, String arnElement) {
var groups = XmlParser.extractGroupsMulti(xml, groupElement);
var filters = XmlParser.extractPairsPerGroup(xml, groupElement,
"FilterRule", "Name", "Value");
List<ParsedNotificationGroup> result = new ArrayList<>();
for (int i = 0; i < groups.size(); i++) {
var group = groups.get(i);
String id = group.getOrDefault("Id", List.of("")).getFirst();
List<String> arns = group.get(arnElement);
List<String> events = group.get("Event");
if (arns != null && !arns.isEmpty() && events != null && !events.isEmpty()) {
List<FilterRule> rules = i < filters.size()
? filters.get(i).entrySet().stream()
.map(e -> new FilterRule(e.getKey(), e.getValue()))
.toList()
: List.of();
result.add(new ParsedNotificationGroup(id, arns.getFirst(), events, rules));
}
}
return result;
}

private static void appendFilterRules(XmlBuilder xml, List<FilterRule> rules) {
if (rules == null || rules.isEmpty()) return;
xml.start("Filter").start("S3Key");
for (FilterRule rule : rules) {
xml.start("FilterRule")
.elem("Name", rule.name())
.elem("Value", rule.value())
.end("FilterRule");
}
xml.end("S3Key").end("Filter");
}

/**
* Strips the {@code aws-chunked} token from a {@code Content-Encoding} value before persisting it.
* {@code aws-chunked} is a transfer-protocol marker used by AWS SDK v2 streaming uploads and is not
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1090,7 +1090,7 @@ private void fireNotifications(String bucketName, String key, String eventName,
String eventJson = buildS3EventJson(bucketName, key, eventName, obj, region, bucket.isVersioningEnabled());

for (QueueNotification qn : config.getQueueConfigurations()) {
if (qn.events().stream().anyMatch(p -> matchesEvent(p, eventName))) {
if (qn.events().stream().anyMatch(p -> matchesEvent(p, eventName)) && qn.matchesKey(key)) {
try {
sqsService.sendMessage(sqsUrlFromArn(qn.queueArn()), eventJson, 0);
LOG.debugv("Fired S3 event {0} to SQS {1}", eventName, qn.queueArn());
Expand All @@ -1101,7 +1101,7 @@ private void fireNotifications(String bucketName, String key, String eventName,
}

for (TopicNotification tn : config.getTopicConfigurations()) {
if (tn.events().stream().anyMatch(p -> matchesEvent(p, eventName))) {
if (tn.events().stream().anyMatch(p -> matchesEvent(p, eventName)) && tn.matchesKey(key)) {
try {
snsService.publish(tn.topicArn(), null, eventJson, "Amazon S3 Notification", region);
LOG.debugv("Fired S3 event {0} to SNS {1}", eventName, tn.topicArn());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package io.github.hectorvent.floci.services.s3.model;

import io.quarkus.runtime.annotations.RegisterForReflection;

/**
* A single S3 notification filter rule (prefix or suffix match on the object key).
*
* @param name "prefix" or "suffix"
* @param value the value to match against
*/
@RegisterForReflection
public record FilterRule(String name, String value) {

public boolean matches(String key) {
if (key == null || name == null || value == null) return false;
return switch (name.toLowerCase()) {
case "prefix" -> key.startsWith(value);
case "suffix" -> key.endsWith(value);
default -> false;
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,13 @@
import java.util.List;

@RegisterForReflection
public record QueueNotification(String id, String queueArn, List<String> events) {}
public record QueueNotification(String id, String queueArn, List<String> events, List<FilterRule> filterRules) {

public QueueNotification(String id, String queueArn, List<String> events) {
this(id, queueArn, events, List.of());
}

public boolean matchesKey(String key) {
return filterRules == null || filterRules.isEmpty() || filterRules.stream().allMatch(r -> r.matches(key));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,13 @@
import java.util.List;

@RegisterForReflection
public record TopicNotification(String id, String topicArn, List<String> events) {}
public record TopicNotification(String id, String topicArn, List<String> events, List<FilterRule> filterRules) {

public TopicNotification(String id, String topicArn, List<String> events) {
this(id, topicArn, events, List.of());
}

public boolean matchesKey(String key) {
return filterRules == null || filterRules.isEmpty() || filterRules.stream().allMatch(r -> r.matches(key));
}
}
Loading
Loading