From 383dff97ad83828d725c899d55fa01658713f354 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Thu, 12 Mar 2026 20:10:29 -0500 Subject: [PATCH 1/2] NIFI-15712 Added JSON Lines support to JoltTransformJSON --- .../processors/jolt/JoltTransformJSON.java | 217 +++++++++++------- .../processors/jolt/JsonSourceStrategy.java | 16 +- .../jolt/TestJoltTransformJSON.java | 47 ++++ 3 files changed, 197 insertions(+), 83 deletions(-) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java index a5bfe5f338bb..dd30d94630a8 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java @@ -38,7 +38,6 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.jolt.util.JoltTransformStrategy; import org.apache.nifi.jolt.util.TransformUtils; -import org.apache.nifi.logging.ComponentLog; import org.apache.nifi.processor.DataUnit; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.ProcessSession; @@ -46,10 +45,13 @@ import org.apache.nifi.processor.exception.ProcessException; import org.apache.nifi.processor.util.StandardValidators; import org.apache.nifi.util.StopWatch; -import org.apache.nifi.util.StringUtils; import org.apache.nifi.util.file.classloader.ClassLoaderUtils; +import java.io.BufferedReader; +import java.io.BufferedWriter; import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; import java.util.List; import java.util.Set; @@ -60,7 +62,7 @@ @SupportsBatching @Tags({"json", "jolt", "transform", "chainr", "shift", "default", "remove", "cardinality", "sort"}) @InputRequirement(InputRequirement.Requirement.INPUT_REQUIRED) -@WritesAttribute(attribute = "mime.type", description = "Always set to application/json") +@WritesAttribute(attribute = "mime.type", description = "Set to application/json or application/jsonl based on configuration") @Restricted( restrictions = { @Restriction( @@ -68,8 +70,11 @@ explanation = "Enables configuration of custom code for Jolt Transforms") } ) -@CapabilityDescription("Applies a list of Jolt specifications to either the FlowFile JSON content or a specified FlowFile JSON attribute. " - + "If the JSON transform fails, the original FlowFile is routed to the 'failure' relationship.") +@CapabilityDescription(""" + Reformat JSON to JSON using a Jolt Transform with Domain Specific Language manipulation instructions. + The JOLT Community Edition documentation provides examples of supported operations and syntax standards. + """ +) @RequiresInstanceClassLoading public class JoltTransformJSON extends AbstractJoltTransform { @@ -96,6 +101,7 @@ public class JoltTransformJSON extends AbstractJoltTransform { .required(true) .allowableValues("true", "false") .defaultValue("false") + .dependsOn(JSON_SOURCE, JsonSourceStrategy.FLOW_FILE, JsonSourceStrategy.ATTRIBUTE) .build(); public static final PropertyDescriptor MAX_STRING_LENGTH = new PropertyDescriptor.Builder() @@ -130,7 +136,7 @@ public class JoltTransformJSON extends AbstractJoltTransform { REL_FAILURE ); - private volatile ClassLoader customClassLoader; + private volatile ClassLoader configuredClassLoader; private volatile JsonUtil jsonUtil; @Override @@ -143,6 +149,33 @@ protected List getSupportedPropertyDescriptors() { return PROPERTY_DESCRIPTORS; } + @OnScheduled + @Override + public void setup(final ProcessContext context) { + super.setup(context); + final int maxStringLength = context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); + final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + + final ObjectMapper objectMapper = new ObjectMapper(); + objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); + jsonUtil = JsonUtils.customJsonUtil(objectMapper); + + configuredClassLoader = getClass().getClassLoader(); + try { + final JoltTransformStrategy strategy = context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class); + + if (strategy == JoltTransformStrategy.CUSTOMR && context.getProperty(MODULES).isSet()) { + configuredClassLoader = ClassLoaderUtils.getCustomClassLoader( + context.getProperty(MODULES).evaluateAttributeExpressions().getValue(), + getClass().getClassLoader(), + getJarFilenameFilter() + ); + } + } catch (final Exception e) { + getLogger().error("ClassLoader configuration failed", e); + } + } + @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { final FlowFile original = session.get(); @@ -150,97 +183,121 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro return; } - final ComponentLog logger = getLogger(); final StopWatch stopWatch = new StopWatch(true); - final Object inputJson; - final boolean sourceStrategyFlowFile = JsonSourceStrategy.FLOW_FILE == context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class); - String jsonSourceAttributeName = null; - - if (sourceStrategyFlowFile) { - try (final InputStream in = session.read(original)) { - inputJson = jsonUtil.jsonToObject(in); - } catch (final Exception e) { - logger.error("JSON parsing failed on FlowFile content for {}", original, e); - session.transfer(original, REL_FAILURE); - return; - } - } else { - jsonSourceAttributeName = context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue(); - final String jsonSourceAttributeValue = original.getAttribute(jsonSourceAttributeName); - if (StringUtils.isBlank(jsonSourceAttributeValue)) { - logger.error("FlowFile attribute '{}' value is blank", jsonSourceAttributeName); - session.transfer(original, REL_FAILURE); - return; - } else { - try { - inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue); - } catch (final Exception e) { - logger.error("JSON parsing failed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original, e); - session.transfer(original, REL_FAILURE); - return; - } - } - } - - final String jsonString; - final ClassLoader originalContextClassLoader = Thread.currentThread().getContextClassLoader(); + final JsonSourceStrategy sourceStrategy = context.getProperty(JSON_SOURCE).asAllowableValue(JsonSourceStrategy.class); + final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader(); try { + Thread.currentThread().setContextClassLoader(configuredClassLoader); + final JoltTransform transform = getTransform(context, original); - if (customClassLoader != null) { - Thread.currentThread().setContextClassLoader(customClassLoader); - } + final FlowFile transformedFlowFile = switch (sourceStrategy) { + case FLOW_FILE -> transformFlowFile(context, session, original, transform); + case ATTRIBUTE -> transformAttribute(context, session, original, transform); + case JSON_LINES -> transformNewlineDelimited(session, original, transform); + }; - final Object transformedJson = TransformUtils.transform(transform, inputJson); - jsonString = context.getProperty(PRETTY_PRINT).asBoolean() ? jsonUtil.toPrettyJsonString(transformedJson) : jsonUtil.toJsonString(transformedJson); + onSuccess(context, session, transformedFlowFile, sourceStrategy, stopWatch); } catch (final Exception e) { - logger.error("Transform failed for {}", original, e); + getLogger().error("Failed to Transform {}", original, e); session.transfer(original, REL_FAILURE); - return; } finally { - if (customClassLoader != null && originalContextClassLoader != null) { - Thread.currentThread().setContextClassLoader(originalContextClassLoader); - } + Thread.currentThread().setContextClassLoader(contextClassLoader); } + } - if (sourceStrategyFlowFile) { - FlowFile transformed = session.write(original, out -> out.write(jsonString.getBytes(StandardCharsets.UTF_8))); - final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAttribute(transformed, CoreAttributes.MIME_TYPE.key(), "application/json"); - session.transfer(transformed, REL_SUCCESS); - session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.info("Transform completed on FlowFile content for {}", original); - } else { - session.putAttribute(original, jsonSourceAttributeName, jsonString); - session.transfer(original, REL_SUCCESS); - logger.info("Transform completed on attribute '{}' of FlowFile {}", jsonSourceAttributeName, original); + private FlowFile transformFlowFile( + final ProcessContext context, + final ProcessSession session, + final FlowFile flowFile, + final JoltTransform transform + ) { + final Object inputJson; + try (final InputStream in = session.read(flowFile)) { + inputJson = jsonUtil.jsonToObject(in); + } catch (final Exception e) { + throw new ProcessException("JSON parsing failed for FlowFile", e); } + + final String transformedJson = getTransformedJson(context, transform, inputJson); + return session.write(flowFile, out -> out.write(transformedJson.getBytes(StandardCharsets.UTF_8))); } - @OnScheduled - @Override - public void setup(final ProcessContext context) { - super.setup(context); - final int maxStringLength = context.getProperty(MAX_STRING_LENGTH).asDataSize(DataUnit.B).intValue(); - final StreamReadConstraints streamReadConstraints = StreamReadConstraints.builder().maxStringLength(maxStringLength).build(); + private FlowFile transformAttribute( + final ProcessContext context, + final ProcessSession session, + final FlowFile flowFile, + final JoltTransform transform + ) { + final String jsonSourceAttributeName = context.getProperty(JSON_SOURCE_ATTRIBUTE).getValue(); + final String jsonSourceAttributeValue = flowFile.getAttribute(jsonSourceAttributeName); - final ObjectMapper objectMapper = new ObjectMapper(); - objectMapper.getFactory().setStreamReadConstraints(streamReadConstraints); - jsonUtil = JsonUtils.customJsonUtil(objectMapper); + if (jsonSourceAttributeValue == null || jsonSourceAttributeValue.isBlank()) { + throw new ProcessException("Content not found in FlowFile Attribute [%s]".formatted(jsonSourceAttributeName)); + } + final Object inputJson; try { - final JoltTransformStrategy strategy = context.getProperty(JOLT_TRANSFORM).asAllowableValue(JoltTransformStrategy.class); + inputJson = jsonUtil.jsonToObject(jsonSourceAttributeValue); + } catch (final Exception e) { + throw new ProcessException("JSON parsing failed for FlowFile Attribute [%s]".formatted(jsonSourceAttributeName), e); + } - if (strategy == JoltTransformStrategy.CUSTOMR && context.getProperty(MODULES).isSet()) { - customClassLoader = ClassLoaderUtils.getCustomClassLoader( - context.getProperty(MODULES).evaluateAttributeExpressions().getValue(), - this.getClass().getClassLoader(), - getJarFilenameFilter() - ); - } else { - customClassLoader = this.getClass().getClassLoader(); + final String transformedJson = getTransformedJson(context, transform, inputJson); + return session.putAttribute(flowFile, jsonSourceAttributeName, transformedJson); + } + + private FlowFile transformNewlineDelimited( + final ProcessSession session, + final FlowFile flowFile, + final JoltTransform transform + ) { + return session.write(flowFile, (in, out) -> { + try ( + final BufferedReader reader = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); + final BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(out, StandardCharsets.UTF_8)) + ) { + String line; + while ((line = reader.readLine()) != null) { + if (line.isBlank()) { + continue; + } + final Object inputJson = jsonUtil.jsonToObject(line); + final Object transformedJson = TransformUtils.transform(transform, inputJson); + writer.write(jsonUtil.toJsonString(transformedJson)); + writer.newLine(); + } } - } catch (final Exception e) { - getLogger().error("ClassLoader configuration failed", e); + }); + } + + private String getTransformedJson( + final ProcessContext context, + final JoltTransform transform, + final Object inputJson + ) { + final Object transformedJson = TransformUtils.transform(transform, inputJson); + final boolean prettyPrintEnabled = context.getProperty(PRETTY_PRINT).asBoolean(); + return prettyPrintEnabled ? jsonUtil.toPrettyJsonString(transformedJson) : jsonUtil.toJsonString(transformedJson); + } + + private void onSuccess( + final ProcessContext context, + final ProcessSession session, + final FlowFile flowFile, + final JsonSourceStrategy jsonSourceStrategy, + final StopWatch stopWatch + ) { + final FlowFile transformedFlowFile; + if (JsonSourceStrategy.FLOW_FILE == jsonSourceStrategy || JsonSourceStrategy.JSON_LINES == jsonSourceStrategy) { + transformedFlowFile = session.putAttribute(flowFile, CoreAttributes.MIME_TYPE.key(), jsonSourceStrategy.getContentType()); + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + final long elapsed = stopWatch.getElapsed(TimeUnit.MILLISECONDS); + session.getProvenanceReporter().modifyContent(transformedFlowFile, transformType, elapsed); + } else { + transformedFlowFile = flowFile; } + + session.transfer(transformedFlowFile, REL_SUCCESS); + getLogger().info("Transform Completed for [{}] {}", jsonSourceStrategy, flowFile); } } diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java index 898142c4f70f..0bb635e48ac4 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JsonSourceStrategy.java @@ -20,12 +20,18 @@ import org.apache.nifi.components.DescribedValue; public enum JsonSourceStrategy implements DescribedValue { - FLOW_FILE("Transformation applied to FlowFile content containing JSON"), - ATTRIBUTE("Transformation applied to FlowFile attribute containing JSON"); + FLOW_FILE("application/json", "Transformation applied to FlowFile content containing JSON"), + ATTRIBUTE("application/json", "Transformation applied to FlowFile attribute containing JSON"), + JSON_LINES("application/jsonl", "Transformation applied to FlowFile content containing JSON Lines or NDJSON"); + private final String contentType; private final String description; - JsonSourceStrategy(final String description) { + JsonSourceStrategy( + final String contentType, + final String description + ) { + this.contentType = contentType; this.description = description; } @@ -43,4 +49,8 @@ public String getDisplayName() { public String getDescription() { return description; } + + public String getContentType() { + return contentType; + } } diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java index a4912d3502e2..42b333c94f80 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformJSON.java @@ -36,6 +36,7 @@ import java.io.ByteArrayInputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; @@ -533,6 +534,52 @@ private void assertTransformedEquals(final String expectedOutputFilename) throws assertTrue(DIFFY.diff(compareJson, transformedJson).isEmpty()); } + @Test + void testTransformInputWithJsonLines() throws IOException { + final String inputJson = """ + {"rating":{"primary":{"value":3},"series":{"value":[5,4]},"quality":{"value":3}}} + + {"rating":{"primary":{"value":7},"series":{"value":[2,1]},"quality":{"value":8}}} + """; + + runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpecContents); + runner.setProperty(JoltTransformJSON.JSON_SOURCE, JsonSourceStrategy.JSON_LINES); + runner.enqueue(inputJson); + runner.run(); + + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_SUCCESS); + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformJSON.REL_SUCCESS).getFirst(); + transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), JsonSourceStrategy.JSON_LINES.getContentType()); + + final String[] outputLines = new String(transformed.toByteArray(), StandardCharsets.UTF_8) + .strip().split("\n"); + assertEquals(2, outputLines.length); + + final Object firstTransformed = JsonUtils.jsonToObject(outputLines[0]); + final Object expectedFirst = JsonUtils.jsonToObject(Files.newInputStream( + Paths.get("src/test/resources/TestJoltTransformJson/" + CHAINR_JSON_OUTPUT))); + assertTrue(DIFFY.diff(expectedFirst, firstTransformed).isEmpty()); + + final Map secondTransformed = JsonUtils.jsonToMap(outputLines[1]); + final Object secondRating = secondTransformed.get("Rating"); + assertEquals(7, secondRating); + } + + @Test + void testInvalidJsonLinesContent() { + final String inputJson = """ + {"rating":{"primary":{"value":3}}} + not valid json + """; + + runner.setProperty(JoltTransformJSON.JOLT_SPEC, chainrSpecContents); + runner.setProperty(JoltTransformJSON.JSON_SOURCE, JsonSourceStrategy.JSON_LINES); + runner.enqueue(inputJson); + runner.run(); + + runner.assertAllFlowFilesTransferred(JoltTransformJSON.REL_FAILURE); + } + private static Stream getChainrArguments() { return Stream.of( Arguments.argumentSet("has no single line comments", Paths.get(CHAINR_SPEC_PATH)), From 167a3f13ea737eaab3a834bdc613ec7276d83574 Mon Sep 17 00:00:00 2001 From: exceptionfactory Date: Fri, 13 Mar 2026 09:09:22 -0500 Subject: [PATCH 2/2] NIFI-15712 Replaced writer.newLine() with writer.write(LINE_FEED) --- .../org/apache/nifi/processors/jolt/JoltTransformJSON.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java index dd30d94630a8..7af10398bee3 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformJSON.java @@ -136,6 +136,8 @@ public class JoltTransformJSON extends AbstractJoltTransform { REL_FAILURE ); + private static final char LINE_FEED = '\n'; + private volatile ClassLoader configuredClassLoader; private volatile JsonUtil jsonUtil; @@ -264,7 +266,7 @@ private FlowFile transformNewlineDelimited( final Object inputJson = jsonUtil.jsonToObject(line); final Object transformedJson = TransformUtils.transform(transform, inputJson); writer.write(jsonUtil.toJsonString(transformedJson)); - writer.newLine(); + writer.write(LINE_FEED); } } });