diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java index 818d7679c8a0..7c33cd76c42d 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/GenerateRecord.java @@ -64,12 +64,10 @@ import org.apache.nifi.util.StringUtils; import java.math.BigInteger; -import java.time.LocalDate; +import java.time.Instant; import java.time.ZoneId; -import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.Collection; -import java.util.Date; import java.util.HashMap; import java.util.List; import java.util.Locale; @@ -405,11 +403,10 @@ private Object generateValueFromRecordField(RecordField recordField, Faker faker yield enums.get(faker.number().numberBetween(0, enums.size() - 1)); } case TIME -> { - Date fakeDate = (Date) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker); - LocalDate fakeLocalDate = fakeDate.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); - yield fakeLocalDate.format(DateTimeFormatter.ISO_LOCAL_TIME); + Instant fakeInstant = (Instant) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker); + yield fakeInstant.atZone(ZoneId.systemDefault()).toLocalTime(); } - case TIMESTAMP -> ((Date) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker)).getTime(); + case TIMESTAMP -> ((Instant) FakerUtils.getFakeData(DEFAULT_DATE_PROPERTY_NAME, faker)).toEpochMilli(); case UUID -> UUID.randomUUID(); case ARRAY -> { final ArrayDataType arrayDataType = (ArrayDataType) recordField.getDataType(); diff --git a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java index bd111cf8566b..920419243a48 100644 --- a/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java +++ b/nifi-extension-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestGenerateRecord.java @@ -21,11 +21,14 @@ import org.apache.nifi.avro.AvroRecordSetWriter; import org.apache.nifi.avro.AvroTypeUtil; import org.apache.nifi.components.AllowableValue; +import org.apache.nifi.csv.CSVRecordSetWriter; +import org.apache.nifi.json.JsonRecordSetWriter; import org.apache.nifi.processors.standard.faker.FakerMethodHolder; import org.apache.nifi.processors.standard.faker.FakerUtils; import org.apache.nifi.processors.standard.faker.PredefinedRecordSchema; import org.apache.nifi.reporting.InitializationException; import org.apache.nifi.serialization.RecordReader; +import org.apache.nifi.serialization.RecordSetWriterFactory; import org.apache.nifi.serialization.record.MockRecordWriter; import org.apache.nifi.serialization.record.Record; import org.apache.nifi.serialization.record.RecordField; @@ -36,8 +39,12 @@ import org.apache.nifi.util.PropertyMigrationResult; import org.apache.nifi.util.TestRunner; import org.apache.nifi.util.TestRunners; +import org.apache.nifi.xml.XMLRecordSetWriter; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import java.io.ByteArrayInputStream; import java.lang.reflect.Field; @@ -49,6 +56,7 @@ import java.util.Locale; import java.util.Map; import java.util.Optional; +import java.util.stream.Stream; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertInstanceOf; @@ -429,4 +437,58 @@ public void testPredefinedSchemaWithNullPercentage() throws Exception { // Verify record count attribute flowFile.assertAttributeEquals("record.count", "1"); } + + @ParameterizedTest + @MethodSource("recordSetWriters") + void testSchemaTextWithLogicalTypesTimestampMillisAndTimestampMillis(RecordSetWriterFactory recordWriter) throws InitializationException { + final String schemaText = """ + { + "type": "record", + "name": "Event", + "fields": [ + { + "name": "eventTimestamp", + "type": { + "type": "long", + "logicalType": "timestamp-millis" + } + }, + { + "name": "eventTime", + "type": { + "type": "long", + "logicalType": "time-micros" + } + }, + { + "name": "eventName", + "type": "string" + } + ] + } + """; + + testRunner.addControllerService("record-writer", recordWriter); + testRunner.enableControllerService(recordWriter); + testRunner.setProperty(GenerateRecord.RECORD_WRITER, "record-writer"); + testRunner.setProperty(GenerateRecord.SCHEMA_TEXT, schemaText); + testRunner.setProperty(GenerateRecord.NUM_RECORDS, "1"); + + testRunner.assertValid(); + testRunner.run(); + + testRunner.assertTransferCount(GenerateRecord.REL_SUCCESS, 1); + MockFlowFile flowFile = testRunner.getFlowFilesForRelationship(GenerateRecord.REL_SUCCESS).getFirst(); + final String content = flowFile.getContent(); + assertTrue(content.contains("eventTime")); + flowFile.assertAttributeEquals("record.count", "1"); + } + + private static Stream recordSetWriters() { + return Stream.of( + Arguments.argumentSet("JSON", new JsonRecordSetWriter()), + Arguments.argumentSet("XML", new XMLRecordSetWriter()), + Arguments.argumentSet("CSV", new CSVRecordSetWriter()) + ); + } }