diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java index 634b09313c1b..ad52d2c6c1c5 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformRecord.java @@ -43,10 +43,14 @@ import org.apache.nifi.serialization.RecordReaderFactory; import org.apache.nifi.serialization.RecordSetWriter; import org.apache.nifi.serialization.RecordSetWriterFactory; +import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.WriteResult; +import org.apache.nifi.serialization.record.DataType; import org.apache.nifi.serialization.record.Record; +import org.apache.nifi.serialization.record.RecordField; import org.apache.nifi.serialization.record.RecordFieldType; import org.apache.nifi.serialization.record.RecordSchema; +import org.apache.nifi.serialization.record.type.RecordDataType; import org.apache.nifi.serialization.record.util.DataTypeUtils; import org.apache.nifi.util.StopWatch; @@ -65,6 +69,8 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import static java.util.Comparator.comparing; + @SideEffectFree @SupportsBatching @Tags({"record", "jolt", "transform", "shiftr", "chainr", "defaultr", "removr", "cardinality", "sort"}) @@ -86,48 +92,29 @@ @RequiresInstanceClassLoading public class JoltTransformRecord extends AbstractJoltTransform { - static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder() - .name("Record Reader") - .description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") - .identifiesControllerService(RecordReaderFactory.class) - .required(true) - .build(); - - static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder() - .name("Record Writer") - .description("Specifies the Controller Service to use for writing out the records") - .identifiesControllerService(RecordSetWriterFactory.class) - .required(true) - .build(); - - static final Relationship REL_SUCCESS = new Relationship.Builder() - .name("success") - .description("The FlowFile with transformed content will be routed to this relationship") - .build(); - - static final Relationship REL_FAILURE = new Relationship.Builder() - .name("failure") - .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship") - .build(); - - static final Relationship REL_ORIGINAL = new Relationship.Builder() - .name("original") - .description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship") - .build(); - - private static final List PROPERTY_DESCRIPTORS = Stream.concat( - getCommonPropertyDescriptors().stream(), - Stream.of( - RECORD_READER, - RECORD_WRITER - ) - ).toList(); - - private static final Set RELATIONSHIPS = Set.of( - REL_SUCCESS, - REL_FAILURE, - REL_ORIGINAL - ); + static final PropertyDescriptor RECORD_READER = + new PropertyDescriptor.Builder().name("Record Reader").description("Specifies the Controller Service to use for parsing incoming data and determining the data's schema.") + .identifiesControllerService(RecordReaderFactory.class).required(true).build(); + + static final PropertyDescriptor RECORD_WRITER = new PropertyDescriptor.Builder().name("Record Writer").description("Specifies the Controller Service to use for writing out the records") + .identifiesControllerService(RecordSetWriterFactory.class).required(true).build(); + + static final PropertyDescriptor SCHEMA_WRITING_STRATEGY = + new PropertyDescriptor.Builder().name("Schema Writing Strategy").description("Specifies how the processor should handle records that result in different schemas after transformation.") + .allowableValues(JoltTransformWritingStrategy.class).defaultValue(JoltTransformWritingStrategy.USE_FIRST_SCHEMA).required(true).build(); + + static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("The FlowFile with transformed content will be routed to this relationship").build(); + + static final Relationship REL_FAILURE = new Relationship.Builder().name("failure") + .description("If a FlowFile fails processing for any reason (for example, the FlowFile records cannot be parsed), it will be routed to this relationship").build(); + + static final Relationship REL_ORIGINAL = + new Relationship.Builder().name("original").description("The original FlowFile that was transformed. If the FlowFile fails processing, nothing will be sent to this relationship").build(); + + private static final List PROPERTY_DESCRIPTORS = + Stream.concat(getCommonPropertyDescriptors().stream(), Stream.of(SCHEMA_WRITING_STRATEGY, RECORD_READER, RECORD_WRITER)).toList(); + + private static final Set RELATIONSHIPS = Set.of(REL_SUCCESS, REL_FAILURE, REL_ORIGINAL); @Override public Set getRelationships() { @@ -141,6 +128,16 @@ protected List getSupportedPropertyDescriptors() { @Override public void onTrigger(final ProcessContext context, ProcessSession session) throws ProcessException { + final String strategy = context.getProperty(SCHEMA_WRITING_STRATEGY).getValue(); + + if (strategy.equals(JoltTransformWritingStrategy.PARTITION_BY_SCHEMA.getValue())) { + processPartitioned(context, session); + } else { + processUniform(context, session); + } + } + + private void processPartitioned(final ProcessContext context, final ProcessSession session) { final FlowFile original = session.get(); if (original == null) { return; @@ -152,75 +149,172 @@ public void onTrigger(final ProcessContext context, ProcessSession session) thro final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); - final RecordSchema schema; - FlowFile transformed = null; + // Maps to track resources per Schema + final Map flowFileMap = new HashMap<>(); + final Map streamMap = new HashMap<>(); + final Map writerMap = new HashMap<>(); + final Map recordCounts = new HashMap<>(); + final Map writeResults = new HashMap<>(); - try (final InputStream in = session.read(original); - final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - schema = writerFactory.getSchema(original.getAttributes(), reader.getSchema()); + boolean error = false; - final Map attributes = new HashMap<>(); - final WriteResult writeResult; - transformed = session.create(original); + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - // We want to transform the first record before creating the Record Writer. We do this because the Record will likely end up with a different structure - // and therefore a difference Schema after being transformed. As a result, we want to transform the Record and then provide the transformed schema to the - // Record Writer so that if the Record Writer chooses to inherit the Record Schema from the Record itself, it will inherit the transformed schema, not the - // schema determined by the Record Reader. - final Record firstRecord = reader.nextRecord(); - if (firstRecord == null) { - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), schema, out, transformed)) { + final JoltTransform transform = getTransform(context, original); + Record currentRecord; - writer.beginRecordSet(); - writeResult = writer.finishRecordSet(); + while ((currentRecord = reader.nextRecord()) != null) { + final List transformedRecords = transform(currentRecord, transform); - attributes.put("record.count", String.valueOf(writeResult.getRecordCount())); - attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); - attributes.putAll(writeResult.getAttributes()); + if (transformedRecords.isEmpty()) { + continue; } - transformed = session.putAllAttributes(transformed, attributes); - logger.info("{} had no Records to transform", original); - } else { + for (Record transformedRecord : transformedRecords) { + if (transformedRecord == null) { + continue; + } + + final RecordSchema recordSchema = transformedRecord.getSchema(); + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), normalizeSchema(recordSchema)); + + RecordSetWriter writer = writerMap.get(writeSchema); - final JoltTransform transform = getTransform(context, original); - final List transformedFirstRecords = transform(firstRecord, transform); + if (writer == null) { + FlowFile newFlowFile = session.create(original); + OutputStream out = session.write(newFlowFile); + writer = writerFactory.createWriter(getLogger(), writeSchema, out, newFlowFile); - if (transformedFirstRecords.isEmpty()) { - throw new ProcessException("Error transforming the first record"); + writer.beginRecordSet(); + + flowFileMap.put(writeSchema, newFlowFile); + streamMap.put(writeSchema, out); + writerMap.put(writeSchema, writer); + recordCounts.put(writeSchema, 0); + } + + writer.write(transformedRecord); + recordCounts.put(writeSchema, recordCounts.get(writeSchema) + 1); + } + } + } catch (final Exception e) { + error = true; + logger.error("Transform failed for {}", original, e); + } finally { + // Clean up resources + for (Map.Entry entry : writerMap.entrySet()) { + try { + final RecordSetWriter writer = entry.getValue(); + writeResults.put(entry.getKey(), writer.finishRecordSet()); + writer.close(); + } catch (Exception e) { + getLogger().warn("Failed to close Writer", e); + } + } + for (OutputStream out : streamMap.values()) { + try { + out.close(); + } catch (Exception e) { + getLogger().warn("Failed to close OutputStream", e); + } + } + } + + if (error) { + for (FlowFile flowFile : flowFileMap.values()) { + session.remove(flowFile); + } + session.transfer(original, REL_FAILURE); + } else { + if (writerMap.isEmpty()) { + logger.info("{} had no Records to transform (all filtered)", original); + } else { + final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); + for (Map.Entry entry : writerMap.entrySet()) { + RecordSchema schemaKey = entry.getKey(); + RecordSetWriter writer = entry.getValue(); + FlowFile flowFile = flowFileMap.get(schemaKey); + int count = recordCounts.get(schemaKey); + WriteResult writeResult = writeResults.get(schemaKey); + + Map attributes = new HashMap<>(writeResult.getAttributes()); + attributes.put("record.count", String.valueOf(count)); + attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType()); + + flowFile = session.putAllAttributes(flowFile, attributes); + session.getProvenanceReporter().modifyContent(flowFile, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); + session.transfer(flowFile, REL_SUCCESS); } + } + session.transfer(original, REL_ORIGINAL); + } + } + + private void processUniform(final ProcessContext context, final ProcessSession session) { + final FlowFile original = session.get(); + if (original == null) { + return; + } + + final ComponentLog logger = getLogger(); + final StopWatch stopWatch = new StopWatch(true); + + final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class); + final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class); + + FlowFile transformed = null; + + try (final InputStream in = session.read(original); final RecordReader reader = readerFactory.createRecordReader(original, in, getLogger())) { - final Record transformedFirstRecord = transformedFirstRecords.getFirst(); - if (transformedFirstRecord == null) { - throw new ProcessException("Error transforming the first record"); + final JoltTransform transform = getTransform(context, original); + + // We need to find the first VALID record to determine the schema for the writer + Record firstValidRecord = null; + List firstValidBatch = null; + Record currentRecord; + + while ((currentRecord = reader.nextRecord()) != null) { + List transformedRecords = transform(currentRecord, transform); + if (!transformedRecords.isEmpty() && transformedRecords.getFirst() != null) { + firstValidBatch = transformedRecords; + firstValidRecord = transformedRecords.getFirst(); + break; } - final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), transformedFirstRecord.getSchema()); + } - // TODO: Is it possible that two Records with the same input schema could have different schemas after transformation? - // If so, then we need to avoid this pattern of writing all Records from the input FlowFile to the same output FlowFile - // and instead use a Map. This way, even if many different output schemas are possible, - // the output FlowFiles will each only contain records that have the same schema. - try (final OutputStream out = session.write(transformed); - final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { + final WriteResult writeResult; + final Map attributes = new HashMap<>(); + + if (firstValidRecord == null) { + // UPDATED LOGIC: + // All records were filtered out (or input was empty). + logger.info("{} had no Records to transform", original); + } else { + transformed = session.create(original); + + // We have at least one valid record, initialize writer with its schema + final RecordSchema writeSchema = writerFactory.getSchema(original.getAttributes(), firstValidRecord.getSchema()); + + try (final OutputStream out = session.write(transformed); final RecordSetWriter writer = writerFactory.createWriter(getLogger(), writeSchema, out, transformed)) { writer.beginRecordSet(); - writer.write(transformedFirstRecord); - Record record; - // If multiple output records were generated, write them out - for (int i = 1; i < transformedFirstRecords.size(); i++) { - record = transformedFirstRecords.get(i); - if (record == null) { - throw new ProcessException("Error transforming the first record"); + // Write the first batch found + for (Record r : firstValidBatch) { + if (r != null) { + writer.write(r); } - writer.write(record); } - while ((record = reader.nextRecord()) != null) { - final List transformedRecords = transform(record, transform); - for (Record transformedRecord : transformedRecords) { - writer.write(transformedRecord); + // Write the rest + while ((currentRecord = reader.nextRecord()) != null) { + final List transformedRecords = transform(currentRecord, transform); + if (!transformedRecords.isEmpty()) { + for (Record r : transformedRecords) { + if (r != null) { + writer.write(r); + } + } } } @@ -238,10 +332,12 @@ record = transformedFirstRecords.get(i); } final String transformType = context.getProperty(JOLT_TRANSFORM).getValue(); - transformed = session.putAllAttributes(transformed, attributes); session.getProvenanceReporter().modifyContent(transformed, "Modified With " + transformType, stopWatch.getElapsed(TimeUnit.MILLISECONDS)); - logger.debug("Transform completed {}", original); + + // Only apply attributes if we actually wrote something + transformed = session.putAllAttributes(transformed, attributes); } + } catch (final Exception e) { logger.error("Transform failed for {}", original, e); session.transfer(original, REL_FAILURE); @@ -250,6 +346,7 @@ record = transformedFirstRecords.get(i); } return; } + if (transformed != null) { session.transfer(transformed, REL_SUCCESS); } @@ -262,9 +359,8 @@ public void migrateProperties(PropertyConfiguration config) { config.renameProperty("jolt-record-record-writer", RECORD_WRITER.getName()); } - private List transform(final Record record, final JoltTransform transform) { - Map recordMap = (Map) DataTypeUtils.convertRecordFieldtoObject(record, RecordFieldType.RECORD.getRecordDataType(record.getSchema())); - + private List transform(final Record inputRecord, final JoltTransform transform) { + Map recordMap = (Map) DataTypeUtils.convertRecordFieldtoObject(inputRecord, RecordFieldType.RECORD.getRecordDataType(inputRecord.getSchema())); // JOLT expects arrays to be of type List where our Record code uses Object[]. // Make another pass of the transformed objects to change Object[] to List. recordMap = (Map) normalizeJoltObjects(recordMap); @@ -279,7 +375,8 @@ private List transform(final Record record, final JoltTransform transfor return recordList; } - // If the top-level object is an array, return a list of the records inside. Otherwise return a singleton list with the single transformed record + // If the top-level object is an array, return a list of the records inside. + // Otherwise return a singleton list with the single transformed record if (normalizedRecordValues instanceof Object[]) { for (Object o : (Object[]) normalizedRecordValues) { if (o != null) { @@ -293,53 +390,101 @@ private List transform(final Record record, final JoltTransform transfor } protected static Object transform(JoltTransform joltTransform, Object input) { - return joltTransform instanceof ContextualTransform - ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); + return joltTransform instanceof ContextualTransform ? ((ContextualTransform) joltTransform).transform(input, Collections.emptyMap()) : ((Transform) joltTransform).transform(input); } /** - * Recursively replace List objects with Object[]. JOLT expects arrays to be of type List where our Record code uses Object[]. + * Recursively replace List objects with Object[]. + * JOLT expects arrays to be of type List where our Record code uses Object[]. * * @param o The object to normalize with respect to JOLT */ @SuppressWarnings("unchecked") protected static Object normalizeJoltObjects(final Object o) { - if (o instanceof Map) { - Map m = ((Map) o); - m.forEach((k, v) -> m.put(k, normalizeJoltObjects(v))); - return m; - } else if (o instanceof Object[]) { - return Arrays.stream(((Object[]) o)).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); - } else if (o instanceof Collection) { - Collection c = (Collection) o; - return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); - } else { - return o; + switch (o) { + case Map map -> { + Map m = ((Map) o); + m.forEach((k, v) -> m.put(k, normalizeJoltObjects(v))); + return m; + } + case Object[] objects -> { + return Arrays.stream(objects).map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); + } + case Collection c -> { + return c.stream().map(JoltTransformRecord::normalizeJoltObjects).collect(Collectors.toList()); + } + case null, default -> { + return o; + } } } @SuppressWarnings("unchecked") protected static Object normalizeRecordObjects(final Object o) { - if (o instanceof Map) { - Map m = ((Map) o); - m.forEach((k, v) -> m.put(k, normalizeRecordObjects(v))); - return m; - } else if (o instanceof List) { - final List objectList = (List) o; - final Object[] objectArray = new Object[objectList.size()]; - for (int i = 0; i < objectArray.length; i++) { - objectArray[i] = normalizeRecordObjects(objectList.get(i)); + switch (o) { + case Map map -> { + Map m = ((Map) o); + m.forEach((k, v) -> m.put(k, normalizeRecordObjects(v))); + return m; } - return objectArray; - } else if (o instanceof Collection) { - Collection c = (Collection) o; - final List objectList = new ArrayList<>(); - for (Object obj : c) { - objectList.add(normalizeRecordObjects(obj)); + case List list -> { + final List objectList = (List) o; + final Object[] objectArray = new Object[objectList.size()]; + for (int i = 0; i < objectArray.length; i++) { + objectArray[i] = normalizeRecordObjects(objectList.get(i)); + } + return objectArray; } - return objectList; - } else { - return o; + case Collection c -> { + final List objectList = new ArrayList<>(); + for (Object obj : c) { + objectList.add(normalizeRecordObjects(obj)); + } + return objectList; + } + case null, default -> { + return o; + } + } + } + + /** + * Recursively normalizes RecordSchema object. + * This is used to avoid unnecessary partitioning due to field ordering issues when using the 'PARTITION_BY_SCHEMA' strategy + * + * @param schema The schema to normalize + */ + private RecordSchema normalizeSchema(final RecordSchema schema) { + if (schema == null) { + return null; } + + // Normalize child schemas + final List normalizedFields = new ArrayList<>(); + for (final RecordField field : schema.getFields()) { + final DataType dataType = field.getDataType(); + if (dataType instanceof RecordDataType recordDataType) { + + final RecordSchema childSchema = recordDataType.getChildSchema(); + + if (childSchema != null) { + final RecordSchema normalizedChildSchema = normalizeSchema(childSchema); + + final RecordField normalizedField = + new RecordField(field.getFieldName(), RecordFieldType.RECORD.getRecordDataType(normalizedChildSchema), field.getDefaultValue(), field.getAliases(), field.isNullable()); + normalizedFields.add(normalizedField); + } else { + normalizedFields.add(field); + } + } else { + // Not a nested record, add as is + normalizedFields.add(field); + } + } + + // Sort fields alphabetically + normalizedFields.sort(comparing(RecordField::getFieldName)); + + return new SimpleRecordSchema(normalizedFields); } } diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java new file mode 100644 index 000000000000..df1523a90eb1 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/main/java/org/apache/nifi/processors/jolt/JoltTransformWritingStrategy.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.components.DescribedValue; + +public enum JoltTransformWritingStrategy implements DescribedValue { + USE_FIRST_SCHEMA("The first successful transformation determines the schema for the entire FlowFile. All subsequent records must adhere to this schema or they may be invalid."), + PARTITION_BY_SCHEMA("Each unique schema generated by the transformation results in a separate output FlowFile. Useful when Jolt produces variable output structures."); + + private final String description; + + JoltTransformWritingStrategy(final String description) { + this.description = description; + } + + @Override + public String getValue() { + return name(); + } + + @Override + public String getDisplayName() { + return name(); + } + + @Override + public String getDescription() { + return description; + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java similarity index 93% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java index 02c5dfcc436d..8c8ea60dae97 100644 --- a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecord.java +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestBaseJoltTransformRecord.java @@ -19,10 +19,8 @@ import org.apache.nifi.flowfile.attributes.CoreAttributes; import org.apache.nifi.jolt.util.JoltTransformStrategy; import org.apache.nifi.json.JsonRecordSetWriter; -import org.apache.nifi.json.JsonTreeReader; import org.apache.nifi.processor.Relationship; import org.apache.nifi.schema.access.SchemaAccessUtils; -import org.apache.nifi.schema.inference.SchemaInferenceUtil; import org.apache.nifi.serialization.SimpleRecordSchema; import org.apache.nifi.serialization.record.MapRecord; import org.apache.nifi.serialization.record.MockRecordParser; @@ -58,20 +56,20 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; -public class TestJoltTransformRecord { +public abstract class TestBaseJoltTransformRecord { static final String CHAINR_SPEC_PATH = "src/test/resources/specs/chainrSpec.json"; - private static final String CUSTOM_CLASS_NAME = CustomTransformJarProvider.getCustomTransformClassName(); - private static String chainrSpecContents; - private static Path customTransformJar; + protected static final String CUSTOM_CLASS_NAME = CustomTransformJarProvider.getCustomTransformClassName(); + protected static String chainrSpecContents; + protected static Path customTransformJar; @TempDir - private static Path tempDir; + protected static Path tempDir; - private TestRunner runner; - private JoltTransformRecord processor; - private MockRecordParser parser; - private JsonRecordSetWriter writer; + protected TestRunner runner; + protected JoltTransformRecord processor; + protected MockRecordParser parser; + protected JsonRecordSetWriter writer; @BeforeAll static void setUpBeforeAll() throws Exception { @@ -91,9 +89,12 @@ public void setup() throws Exception { runner.addControllerService("writer", writer); runner.setProperty(writer, "Schema Write Strategy", "full-schema-attribute"); runner.setProperty(JoltTransformRecord.RECORD_WRITER, "writer"); + runner.setProperty(JoltTransformRecord.SCHEMA_WRITING_STRATEGY, getWritingStrategy()); // Each test must set the Schema Access strategy and Schema, and enable the writer CS } + protected abstract String getWritingStrategy(); + @Test public void testRelationshipsCreated() throws IOException { generateTestData(1, null); @@ -210,7 +211,7 @@ public void testNoRecords() throws IOException { runner.run(); runner.assertQueueEmpty(); runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); - runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); } @@ -344,7 +345,7 @@ public void testTransformInputWithShiftrAccentedChars() throws IOException { runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - transformed.assertAttributeExists(CoreAttributes.MIME_TYPE .key()); + transformed.assertAttributeExists(CoreAttributes.MIME_TYPE.key()); transformed.assertAttributeEquals(CoreAttributes.MIME_TYPE.key(), "application/json"); transformed.assertContentEquals(getExpectedContent("src/test/resources/TestJoltTransformRecord/shiftrOutput.json")); } @@ -362,12 +363,12 @@ public void testTransformInputWithShiftrMultipleOutputRecords() throws IOExcepti final Record record1 = new MapRecord(xSchema, Map.of("a", 1, "b", 2, "c", 3)); final Record record2 = new MapRecord(xSchema, Map.of("a", 11, "b", 21, "c", 31)); final Record record3 = new MapRecord(xSchema, Map.of("a", 21, "b", 2, "c", 3)); - final Object[] recordArray1 = new Object[]{record1, record2, record3}; + final Object[] recordArray1 = new Object[] {record1, record2, record3}; parser.addRecord((Object) recordArray1); final Record record4 = new MapRecord(xSchema, Map.of("a", 100, "b", 200, "c", 300)); final Record record5 = new MapRecord(xSchema, Map.of("a", 101, "b", 201, "c", 301)); - final Object[] recordArray2 = new Object[]{record4, record5}; + final Object[] recordArray2 = new Object[] {record4, record5}; parser.addRecord((Object) recordArray2); final String outputSchemaText = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/shiftrOutputSchemaMultipleOutputRecords.avsc")); @@ -644,8 +645,7 @@ public void testJoltSpecEL() throws IOException { runner.setProperty(JoltTransformRecord.JOLT_SPEC, "${joltSpec}"); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.DEFAULTR); - final Map attributes = Collections.singletonMap("joltSpec", - "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); + final Map attributes = Collections.singletonMap("joltSpec", "{\"RatingRange\":5,\"rating\":{\"*\":{\"MaxLabel\":\"High\",\"MinLabel\":\"Low\",\"DisplayType\":\"NORMAL\"}}}"); runner.enqueue(new byte[0], attributes); runner.run(); @@ -666,61 +666,46 @@ public void testJoltSpecInvalidEL() { } @Test - public void testJoltComplexChoiceField() throws Exception { - final JsonTreeReader reader = new JsonTreeReader(); - runner.addControllerService("reader", reader); - runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); - runner.enableControllerService(reader); - runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + public void testTransformInputAllFiltered() throws IOException { + generateTestData(3, null); runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); runner.enableControllerService(writer); - final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/flattenSpec.json")); + final String flattenSpec = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json")); runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); - final String inputJson = Files.readString(Paths.get("src/test/resources/TestJoltTransformRecord/input.json")); - runner.enqueue(inputJson); - + runner.enqueue(new byte[0]); runner.run(); - runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 0); + runner.assertTransferCount(JoltTransformRecord.REL_FAILURE, 0); runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); - - final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); - transformed.assertContentEquals(getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenedOutput.json")); } private static Stream getChainrArguments() { - return Stream.of( - Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), - Arguments.of(Paths.get("src/test/resources/specs/chainrSpecWithSingleLineComment.json"), "has a single line comment")); + return Stream.of(Arguments.of(Paths.get(CHAINR_SPEC_PATH), "has no single line comments"), + Arguments.of(Paths.get("src/test/resources/specs/chainrSpecWithSingleLineComment.json"), "has a single line comment")); } - private void generateTestData(int numRecords, final BiFunction recordGenerator) { + protected void generateTestData(int numRecords, final BiFunction recordGenerator) { if (recordGenerator == null) { - final RecordSchema primarySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema seriesSchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); - final RecordSchema qualitySchema = new SimpleRecordSchema(List.of( - new RecordField("value", RecordFieldType.INT.getDataType()))); - final RecordSchema ratingSchema = new SimpleRecordSchema(Arrays.asList( - new RecordField("primary", RecordFieldType.RECORD.getDataType()), - new RecordField("series", RecordFieldType.RECORD.getDataType()), - new RecordField("quality", RecordFieldType.RECORD.getDataType()) - )); + final RecordSchema primarySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema seriesSchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.ARRAY.getArrayDataType(RecordFieldType.INT.getDataType())))); + final RecordSchema qualitySchema = new SimpleRecordSchema(List.of(new RecordField("value", RecordFieldType.INT.getDataType()))); + final RecordSchema ratingSchema = new SimpleRecordSchema( + Arrays.asList(new RecordField("primary", RecordFieldType.RECORD.getDataType()), new RecordField("series", RecordFieldType.RECORD.getDataType()), + new RecordField("quality", RecordFieldType.RECORD.getDataType()))); parser.addSchemaField("rating", RecordFieldType.RECORD); for (int i = 0; i < numRecords; i++) { final Record primaryRecord = new MapRecord(primarySchema, Map.of("value", (10 * i) + 3)); - final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[]{(10 * i) + 5, (10 * i) + 4})); + final Record seriesRecord = new MapRecord(seriesSchema, Map.of("value", new Integer[] {(10 * i) + 5, (10 * i) + 4})); final Record qualityRecord = new MapRecord(qualitySchema, Map.of("value", 3)); - Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, - "series", seriesRecord, "quality", qualityRecord)); + Record ratingRecord = new MapRecord(ratingSchema, Map.of("primary", primaryRecord, "series", seriesRecord, "quality", qualityRecord)); parser.addRecord(ratingRecord); } @@ -729,7 +714,7 @@ private void generateTestData(int numRecords, final BiFunction flowFiles = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS); + final MockFlowFile result1 = flowFiles.get(0); + final MockFlowFile result2 = flowFiles.get(1); + + // Handles non-deterministic order + if (result1.getContent().contains("TRASH")) { + result1.assertContentEquals(expectedOutput1); + result2.assertContentEquals(expectedOutput2); + } else { + result1.assertContentEquals(expectedOutput2); + result2.assertContentEquals(expectedOutput1); + } + } + + @Test + public void testJoltComplexChoiceField() throws Exception { + final JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(reader); + runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); + runner.enableControllerService(writer); + + final String flattenSpec = getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenSpec.json"); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); + + final String inputJson = getExpectedContent("src/test/resources/TestJoltTransformRecord/input.json"); + runner.enqueue(inputJson); + + runner.run(); + + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 2); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + + final String expectedOutput1 = getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned1.json"); + final String expectedOutput2 = getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned2.json"); + + final java.util.List flowFiles = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS); + final MockFlowFile result1 = flowFiles.get(0); + final MockFlowFile result2 = flowFiles.get(1); + + // Handles non-deterministic order + if (result1.getContent().contains("Minute")) { + result1.assertContentEquals(expectedOutput2); + result2.assertContentEquals(expectedOutput1); + } else { + result1.assertContentEquals(expectedOutput1); + result2.assertContentEquals(expectedOutput2); + } + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java new file mode 100644 index 000000000000..6a7cd27c8868 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/java/org/apache/nifi/processors/jolt/TestJoltTransformRecordUniform.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.jolt; + +import org.apache.nifi.jolt.util.JoltTransformStrategy; +import org.apache.nifi.json.JsonRecordSetWriter; +import org.apache.nifi.json.JsonTreeReader; +import org.apache.nifi.schema.access.SchemaAccessUtils; +import org.apache.nifi.schema.inference.SchemaInferenceUtil; +import org.apache.nifi.util.MockFlowFile; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.condition.DisabledOnOs; +import org.junit.jupiter.api.condition.OS; + +@DisabledOnOs(OS.WINDOWS) //The pretty printed json comparisons don't work on windows +public class TestJoltTransformRecordUniform extends TestBaseJoltTransformRecord { + + @Override + protected String getWritingStrategy() { + return JoltTransformWritingStrategy.USE_FIRST_SCHEMA.getValue(); + } + + @Test + public void testJoltComplexChoiceField() throws Exception { + final JsonTreeReader reader = new JsonTreeReader(); + runner.addControllerService("reader", reader); + runner.setProperty(reader, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaInferenceUtil.INFER_SCHEMA); + runner.enableControllerService(reader); + runner.setProperty(JoltTransformRecord.RECORD_READER, "reader"); + + runner.setProperty(writer, SchemaAccessUtils.SCHEMA_ACCESS_STRATEGY, SchemaAccessUtils.INHERIT_RECORD_SCHEMA); + runner.setProperty(writer, JsonRecordSetWriter.PRETTY_PRINT_JSON, "true"); + runner.enableControllerService(writer); + + final String flattenSpec = getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenSpec.json"); + runner.setProperty(JoltTransformRecord.JOLT_SPEC, flattenSpec); + runner.setProperty(JoltTransformRecord.JOLT_TRANSFORM, JoltTransformStrategy.CHAINR); + + final String inputJson = getExpectedContent("src/test/resources/TestJoltTransformRecord/input.json"); + runner.enqueue(inputJson); + + runner.run(); + + runner.assertTransferCount(JoltTransformRecord.REL_SUCCESS, 1); + runner.assertTransferCount(JoltTransformRecord.REL_ORIGINAL, 1); + + final MockFlowFile transformed = runner.getFlowFilesForRelationship(JoltTransformRecord.REL_SUCCESS).getFirst(); + transformed.assertContentEquals(getExpectedContent("src/test/resources/TestJoltTransformRecord/flattenedOutputUniform.json")); + } +} diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json new file mode 100644 index 000000000000..397d8b5193cc --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/filterOutAllSpec.json @@ -0,0 +1,8 @@ +[ + { + "operation": "shift", + "spec": { + "key": "value" + } + } +] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned1.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned1.json new file mode 100644 index 000000000000..fcdd88dfe211 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned1.json @@ -0,0 +1,11 @@ +[ { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-10-06 20:36:09.937019+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : "", + "class" : "unclass" + } ] +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned2.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned2.json new file mode 100644 index 000000000000..d6a7838e1481 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputPartitioned2.json @@ -0,0 +1,16 @@ +[ { + "TValue" : [ { + "name" : "datetime", + "value" : "2023-08-24 17:07:03.334170+00:00", + "class" : "unclass" + }, { + "name" : "Eta", + "value" : { + "Day" : 15, + "Hour" : 6, + "Minute" : 0, + "Month" : 8 + }, + "class" : "unclass" + } ] +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputUniform.json similarity index 100% rename from nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutput.json rename to nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/flattenedOutputUniform.json diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json new file mode 100644 index 000000000000..7715a5ad2b41 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasInput.json @@ -0,0 +1,8 @@ +[ + { + "test_field": "" + }, + { + "test_field": "value2" + } +] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json new file mode 100644 index 000000000000..4b95a87b8d29 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput1.json @@ -0,0 +1,3 @@ +[ { + "TRASH" : null +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json new file mode 100644 index 000000000000..e2fe1057be05 --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasOutput2.json @@ -0,0 +1,5 @@ +[ { + "value" : { + "test_field" : "value2" + } +} ] \ No newline at end of file diff --git a/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json new file mode 100644 index 000000000000..e42864532aae --- /dev/null +++ b/nifi-extension-bundles/nifi-jolt-bundle/nifi-jolt-processors/src/test/resources/TestJoltTransformRecord/multipleSchemasSpec.json @@ -0,0 +1,13 @@ +[ + { + "operation": "shift", + "spec": { + "test_field": { + "": "TRASH", + "*": { + "$": "value.test_field" + } + } + } + } +] \ No newline at end of file