Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,20 @@
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.jolt.util.JoltTransformStrategy;
import org.apache.nifi.jolt.util.TransformUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.util.StopWatch;
import org.apache.nifi.util.StringUtils;
import org.apache.nifi.util.file.classloader.ClassLoaderUtils;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Set;
Expand All @@ -60,16 +62,19 @@
@SupportsBatching
@Tags({"json", "jolt", "transform", "chainr", "shift", "default", "remove", "cardinality", "sort"})
@InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED)
@WritesAttribute(attribute = "mime.type", description = "Always set to application/json")
@WritesAttribute(attribute = "mime.type", description = "Set to application/json or application/jsonl based on configuration")
@Restricted(
restrictions = {
@Restriction(
requiredPermission = RequiredPermission.EXECUTE_CODE,
explanation = "Enables configuration of custom code for Jolt Transforms")
}
)
@CapabilityDescription("Applies a list of Jolt specifications to either the FlowFile JSON content or a specified FlowFile JSON attribute. "
+ "If the JSON transform fails, the original FlowFile is routed to the 'failure' relationship.")
@CapabilityDescription("""
Reformat JSON to JSON using a Jolt Transform with Domain Specific Language manipulation instructions.
The JOLT Community Edition documentation provides examples of supported operations and syntax standards.
"""
)
@RequiresInstanceClassLoading
public class JoltTransformJSON extends AbstractJoltTransform {

Expand All @@ -96,6 +101,7 @@ public class JoltTransformJSON extends AbstractJoltTransform {
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.dependsOn(JSON_SOURCE, JsonSourceStrategy.FLOW_FILE, JsonSourceStrategy.ATTRIBUTE)
.build();

public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder()
Expand Down Expand Up @@ -130,7 +136,9 @@ public class JoltTransformJSON extends AbstractJoltTransform {
REL_FAILURE
);

/** Delimiter written after each transformed record when the JSON Lines source strategy is used. */
private static final char LINE_FEED = '\n';

// ClassLoader set as the Thread context ClassLoader during transforms; setup() replaces the
// default Processor ClassLoader with a custom one when CUSTOMR transforms declare extra modules
private volatile ClassLoader configuredClassLoader;
// Jackson-backed Jolt JSON utility; configured in setup() with the MAX_STRING_LENGTH read constraint
private volatile JsonUtil jsonUtil;

@Override
Expand All @@ -143,104 +151,155 @@ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return PROPERTY_DESCRIPTORS;
}

// Configures JSON parsing constraints and the transform ClassLoader when the Processor is scheduled
@OnScheduled
@Override
public void setup(final ProcessContext context) {
super.setup(context);
// Cap the maximum JSON string length to guard against unbounded memory use while parsing
final int maxStringLength = context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
jsonUtil = JsonUtils.customJsonUtil(objectMapper);

// Default to the Processor's own ClassLoader; replaced below only for custom transforms with modules configured
configuredClassLoader = getClass().getClassLoader();
try {
final JoltTransformStrategy strategy = context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);

if (strategy == JoltTransformStrategy.CUSTOMR && context.getProperty(MODULES).isSet()) {
configuredClassLoader = ClassLoaderUtils.getCustomClassLoader(
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
getClass().getClassLoader(),
getJarFilenameFilter()
);
}
} catch (final Exception e) {
// Best-effort: log and continue with the default ClassLoader rather than failing scheduling
getLogger().error("ClassLoader configuration failed", e);
}
}

/**
 * Applies the configured Jolt Transform to one FlowFile, dispatching on the JSON Source strategy.
 * The Thread context ClassLoader is swapped to the configured ClassLoader for the duration of the
 * transform so custom Jolt modules resolve, and always restored in the finally block.
 * Any failure routes the original FlowFile to REL_FAILURE.
 */
@Override
public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException {
    final FlowFile original = session.get();
    if (original == null) {
        return;
    }

    final StopWatch stopWatch = new StopWatch(true);
    final JsonSourceStrategy sourceStrategy = context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class);
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        Thread.currentThread().setContextClassLoader(configuredClassLoader);

        final JoltTransform transform = getTransform(context, original);
        // Exhaustive switch over the source strategy: content, single attribute, or JSON Lines content
        final FlowFile transformedFlowFile = switch (sourceStrategy) {
            case FLOW_FILE -> transformFlowFile(context, session, original, transform);
            case ATTRIBUTE -> transformAttribute(context, session, original, transform);
            case JSON_LINES -> transformNewlineDelimited(session, original, transform);
        };

        onSuccess(context, session, transformedFlowFile, sourceStrategy, stopWatch);
    } catch (final Exception e) {
        getLogger().error("Failed to Transform {}", original, e);
        session.transfer(original, REL_FAILURE);
    } finally {
        // Always restore the caller's context ClassLoader
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}

if (sourceStrategyFlowFile) {
FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8)));
final String transformType = context.getProperty(JOLT_TRANSFORM).getValue();
transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json");
session.transfer(transformed, REL_SUCCESS);
session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
logger.info("Transform completed on FlowFile content for {}", original);
} else {
session.putAttribute(original, jsonSourceAttributeName, jsonString);
session.transfer(original, REL_SUCCESS);
logger.info("Transform completed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original);
/**
 * Parses the FlowFile content as a single JSON document, applies the Jolt Transform,
 * and overwrites the content with the serialized result encoded as UTF-8.
 *
 * @throws ProcessException when the content cannot be parsed as JSON
 */
private FlowFile transformFlowFile(
        final ProcessContext context,
        final ProcessSession session,
        final FlowFile flowFile,
        final JoltTransform transform
) {
    Object parsedJson;
    try (final InputStream contentStream = session.read(flowFile)) {
        parsedJson = jsonUtil.jsonToObject(contentStream);
    } catch (final Exception e) {
        throw new ProcessException("JSON parsing failed for FlowFile", e);
    }

    final String serialized = getTransformedJson(context, transform, parsedJson);
    final byte[] outputBytes = serialized.getBytes(StandardCharsets.UTF_8);
    return session.write(flowFile, out -> out.write(outputBytes));
}

@OnScheduled
@Override
public void setup(final ProcessContext context) {
super.setup(context);
final int maxStringLength = context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue();
final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build();
private FlowFile transformAttribute(
final ProcessContext context,
final ProcessSession session,
final FlowFile flowFile,
final JoltTransform transform
) {
final String jsonSourceAttributeName = context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue();
final String jsonSourceAttributeValue = flowFile.getAttribute(jsonSourceAttributeName);

final ObjectMapper objectMapper = new ObjectMapper();
objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints);
jsonUtil = JsonUtils.customJsonUtil(objectMapper);
if (jsonSourceAttributeValue == null || jsonSourceAttributeValue.isBlank()) {
throw new ProcessException("Content not found in FlowFile Attribute [%s]".formatted(jsonSourceAttributeName));
}

final Object inputJson;
try {
final JoltTransformStrategy strategy = context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class);
inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue);
} catch (final Exception e) {
throw new ProcessException("JSON parsing failed for FlowFile Attribute [%s]".formatted(jsonSourceAttributeName), e);
}

if (strategy == JoltTransformStrategy.CUSTOMR && context.getProperty(MODULES).isSet()) {
customClassLoader = ClassLoaderUtils.getCustomClassLoader(
context.getProperty(MODULES).evaluateAttributeExpressions().getValue(),
this.getClass().getClassLoader(),
getJarFilenameFilter()
);
} else {
customClassLoader = this.getClass().getClassLoader();
final String transformedJson = getTransformedJson(context, transform, inputJson);
return session.putAttribute(flowFile, jsonSourceAttributeName, transformedJson);
}

private FlowFile transformNewlineDelimited(
final ProcessSession session,
final FlowFile flowFile,
final JoltTransform transform
) {
return session.write(flowFile, (in, out) -> {
try (
final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8))
) {
String line;
while ((line = reader.readLine()) != null) {
if (line.isBlank()) {
continue;
}
final Object inputJson = jsonUtil.jsonToObject(line);
final Object transformedJson = TransformUtils.transform(transform, inputJson);
writer.write(jsonUtil.toJsonString(transformedJson));
writer.write(LINE_FEED);
}
}
} catch (final Exception e) {
getLogger().error("ClassLoader configuration failed", e);
});
}

/**
 * Applies the Jolt Transform to a parsed JSON object and serializes the result,
 * pretty-printed when the Pretty Print property is enabled.
 */
private String getTransformedJson(
    final ProcessContext context,
    final JoltTransform transform,
    final Object inputJson
) {
    final Object result = TransformUtils.transform(transform, inputJson);
    if (context.getProperty(PRETTY_PRINT).asBoolean()) {
        return jsonUtil.toPrettyJsonString(result);
    }
    return jsonUtil.toJsonString(result);
}

/**
 * Completes a successful transform: for content-modifying strategies (FLOW_FILE, JSON_LINES)
 * sets the mime.type attribute from the strategy's content type and reports a provenance
 * MODIFY_CONTENT event; the attribute strategy transfers the FlowFile unchanged.
 */
private void onSuccess(
    final ProcessContext context,
    final ProcessSession session,
    final FlowFile flowFile,
    final JsonSourceStrategy jsonSourceStrategy,
    final StopWatch stopWatch
) {
    final boolean contentModified = jsonSourceStrategy == JsonSourceStrategy.FLOW_FILE
            || jsonSourceStrategy == JsonSourceStrategy.JSON_LINES;

    FlowFile outputFlowFile = flowFile;
    if (contentModified) {
        outputFlowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), jsonSourceStrategy.getContentType());
        session.getProvenanceReporter().modifyContent(
                outputFlowFile,
                context.getProperty(JOLT_TRANSFORM).getValue(),
                stopWatch.getElapsed(TimeUnit.MILLISECONDS)
        );
    }

    session.transfer(outputFlowFile, REL_SUCCESS);
    getLogger().info("Transform Completed for [{}] {}", jsonSourceStrategy, flowFile);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,18 @@
import org.apache.nifi.components.DescribedValue;

public enum JsonSourceStrategy implements DescribedValue {
FLOW_FILE("Transformation applied to FlowFile content containing JSON"),
ATTRIBUTE("Transformation applied to FlowFile attribute containing JSON");
FLOW_FILE("application/json", "Transformation applied to FlowFile content containing JSON"),
ATTRIBUTE("application/json", "Transformation applied to FlowFile attribute containing JSON"),
JSON_LINES("application/jsonl", "Transformation applied to FlowFile content containing JSON Lines or NDJSON");

private final String contentType;
private final String description;

JsonSourceStrategy(final String description) {
JsonSourceStrategy(
final String contentType,
final String description
) {
this.contentType = contentType;
this.description = description;
}

Expand All @@ -43,4 +49,8 @@ public String getDisplayName() {
public String getDescription() {
return description;
}

public String getContentType() {
return contentType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
Expand Down Expand Up @@ -533,6 +534,52 @@ private void assertTransformedEquals(final String expectedOutputFilename) throws
assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty());
}

// Verifies JSON Lines mode: two records (separated by a blank line, which must be skipped)
// are each transformed independently and the jsonl content type is set
@Test
void testTransformInputWithJsonLines() throws IOException {
final String inputJson = """
{"rating":{"primary":{"value":3},"series":{"value":[5,4]},"quality":{"value":3}}}

{"rating":{"primary":{"value":7},"series":{"value":[2,1]},"quality":{"value":8}}}
""";

runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpecContents);
runner.setProperty(JoltTransformJSON.JSON_SOURCE, JsonSourceStrategy.JSON_LINES);
runner.enqueue(inputJson);
runner.run();

runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS);
final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst();
transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), JsonSourceStrategy.JSON_LINES.getContentType());

// Output must be one compact JSON document per line; the blank input line produces no output line
final String[] outputLines = new String(transformed.toByteArray(), StandardCharsets.UTF_8)
.strip().split("\n");
assertEquals(2, outputLines.length);

// First record matches the expected chainr output fixture (order-insensitive diff)
final Object firstTransformed = JsonUtils.jsonToObject(outputLines[0]);
final Object expectedFirst = JsonUtils.jsonToObject(Files.newInputStream(
Paths.get("src/test/resources/TestJoltTransformJson/" + CHAINR_JSON_OUTPUT)));
assertTrue(DIFFY.diff(expectedFirst, firstTransformed).isEmpty());

// Second record is transformed from its own input values, proving per-line transformation
final Map<String, Object> secondTransformed = JsonUtils.jsonToMap(outputLines[1]);
final Object secondRating = secondTransformed.get("Rating");
assertEquals(7, secondRating);
}

// Verifies JSON Lines mode routes the original FlowFile to failure when any line is not valid JSON
@Test
void testInvalidJsonLinesContent() {
final String inputJson = """
{"rating":{"primary":{"value":3}}}
not valid json
""";

runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpecContents);
runner.setProperty(JoltTransformJSON.JSON_SOURCE, JsonSourceStrategy.JSON_LINES);
runner.enqueue(inputJson);
runner.run();

runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE);
}

private static Stream<Arguments> getChainrArguments() {
return Stream.of(
Arguments.argumentSet("has no single line comments", Paths.get(CHAINR_SPEC_PATH)),
Expand Down
Loading