From 332ac85f48b2d8645110d1a25425485584f668e4 Mon Sep 17 00:00:00 2001 From: Christophe Le Saec Date: Thu, 20 Jul 2023 17:28:03 +0200 Subject: [PATCH] AVRO-3805: parse multiple file in one time --- .../src/main/java/org/apache/avro/Schema.java | 445 ++++++++++++++---- .../test/java/org/apache/avro/TestSchema.java | 112 +++++ .../multipleFile/ApplicationEvent.avsc | 28 ++ .../resources/multipleFile/DocumentInfo.avsc | 19 + .../resources/multipleFile/MyResponse.avsc | 14 + .../src/test/resources/multipleFile/README.md | 8 + .../compiler/specific/SpecificCompiler.java | 6 + .../test/java/org/apache/avro/TestSchema.java | 2 +- .../apache/avro/mojo/AbstractAvroMojo.java | 24 +- .../java/org/apache/avro/mojo/SchemaMojo.java | 46 +- .../multipleSchemas/ApplicationEvent.avsc | 28 ++ .../avro/multipleSchemas/DocumentInfo.avsc | 19 + .../test/avro/multipleSchemas/MyResponse.avsc | 14 + .../src/test/avro/multipleSchemas/README.md | 8 + .../unit/schema/pom-multiple-schema.xml | 66 +++ 15 files changed, 722 insertions(+), 117 deletions(-) create mode 100644 lang/java/avro/src/test/resources/multipleFile/ApplicationEvent.avsc create mode 100644 lang/java/avro/src/test/resources/multipleFile/DocumentInfo.avsc create mode 100644 lang/java/avro/src/test/resources/multipleFile/MyResponse.avsc create mode 100644 lang/java/avro/src/test/resources/multipleFile/README.md create mode 100644 lang/java/maven-plugin/src/test/avro/multipleSchemas/ApplicationEvent.avsc create mode 100644 lang/java/maven-plugin/src/test/avro/multipleSchemas/DocumentInfo.avsc create mode 100644 lang/java/maven-plugin/src/test/avro/multipleSchemas/MyResponse.avsc create mode 100644 lang/java/maven-plugin/src/test/avro/multipleSchemas/README.md create mode 100644 lang/java/maven-plugin/src/test/resources/unit/schema/pom-multiple-schema.xml diff --git a/lang/java/avro/src/main/java/org/apache/avro/Schema.java b/lang/java/avro/src/main/java/org/apache/avro/Schema.java index 9daa3731df2..d937851eb6f 100644 --- a/lang/java/avro/src/main/java/org/apache/avro/Schema.java +++ b/lang/java/avro/src/main/java/org/apache/avro/Schema.java @@ -49,6 +49,7 @@ import java.util.Objects; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.StreamSupport; import org.apache.avro.util.internal.Accessor; import org.apache.avro.util.internal.Accessor.FieldAccessor; @@ -698,6 +699,95 @@ private boolean defaultValueEquals(JsonNode thatDefaultValue) { public String toString() { return name + " type:" + schema.type + " pos:" + position; } + + /** + * Parse field. + * + * @param field : json field definition. + * @param names : names map. + * @param namespace : current working namespace. + * @return field. + */ + static Field parse(JsonNode field, Names names, String namespace) { + String fieldName = getRequiredText(field, "name", "No field name"); + String fieldDoc = getOptionalText(field, "doc"); + JsonNode fieldTypeNode = field.get("type"); + if (fieldTypeNode == null) { + throw new SchemaParseException("No field type: " + field); + } + + Schema fieldSchema = null; + if (fieldTypeNode.isTextual()) { + Schema schemaField = names.get(fieldTypeNode.textValue()); + if (schemaField == null) { + schemaField = names.get(namespace + "." + fieldTypeNode.textValue()); + } + if (schemaField == null) { + throw new SchemaParseException(fieldTypeNode + " is not a defined name." + " The type of the \"" + fieldName + + "\" field must be a defined name or a {\"type\": ...} expression."); + } + fieldSchema = schemaField; + } else if (fieldTypeNode.isObject()) { + fieldSchema = resolveSchema(fieldTypeNode, names, namespace); + if (fieldSchema == null) { + fieldSchema = Schema.parseCompleteSchema(fieldTypeNode, names, namespace); + } + } else if (fieldTypeNode.isArray()) { + List unionTypes = new ArrayList<>(); + + fieldTypeNode.forEach((JsonNode node) -> { + Schema subSchema = null; + if (node.isTextual()) { + subSchema = names.get(node.asText()); + if (subSchema == null) { + subSchema = names.get(namespace + "." + node.asText()); + } + } else if (node.isObject()) { + subSchema = Schema.parseCompleteSchema(node, names, namespace); + } else { + throw new SchemaParseException("Illegal type in union : " + node); + } + if (subSchema == null) { + throw new SchemaParseException("Null element in union : " + node); + } + unionTypes.add(subSchema); + }); + + fieldSchema = Schema.createUnion(unionTypes); + } + + if (fieldSchema == null) { + throw new SchemaParseException("Can't find type for field " + fieldName); + } + Field.Order order = Field.Order.ASCENDING; + JsonNode orderNode = field.get("order"); + if (orderNode != null) + order = Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH)); + JsonNode defaultValue = field.get("default"); + + if (defaultValue != null + && (Type.FLOAT.equals(fieldSchema.getType()) || Type.DOUBLE.equals(fieldSchema.getType())) + && defaultValue.isTextual()) { + try { + defaultValue = new DoubleNode(Double.valueOf(defaultValue.textValue())); + } catch (NumberFormatException ex) { + throw new SchemaParseException( + "Can't parse number '" + defaultValue.textValue() + "' for field '" + fieldName); + } + } + + Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, order); + Iterator i = field.fieldNames(); + while (i.hasNext()) { // add field props + String prop = i.next(); + if (!FIELD_RESERVED.contains(prop)) + f.addProp(prop, field.get(prop)); + } + f.aliases = parseAliases(field); + + return f; + } + } static class Name { @@ -1451,6 +1541,20 @@ public Schema parse(File file) throws IOException { return parse(FACTORY.createParser(file), false); } + public List parse(Iterable sources) throws IOException { + final List schemas = new ArrayList<>(); + for (File source : sources) { + final Schema emptySchema = parseNamesDeclared(FACTORY.createParser(source)); + schemas.add(emptySchema); + } + + for (File source : sources) { + parseFieldsOnly(FACTORY.createParser(source)); + } + + return schemas; + } + /** * Parse a schema from the provided stream. If named, the schema is added to the * names known to this parser. The input stream stays open after the parsing. @@ -1479,13 +1583,29 @@ public Schema parse(String s) { } } - private Schema parse(JsonParser parser, boolean allowDanglingContent) throws IOException { + private static interface ParseFunction { + Schema parse(JsonNode node) throws IOException; + } + + private Schema runParser(JsonParser parser, ParseFunction f) throws IOException { boolean saved = validateNames.get(); boolean savedValidateDefaults = VALIDATE_DEFAULTS.get(); try { validateNames.set(validate); VALIDATE_DEFAULTS.set(validateDefaults); JsonNode jsonNode = MAPPER.readTree(parser); + return f.parse(jsonNode); + } catch (JsonParseException e) { + throw new SchemaParseException(e); + } finally { + parser.close(); + validateNames.set(saved); + VALIDATE_DEFAULTS.set(savedValidateDefaults); + } + } + + private Schema parse(JsonParser parser, final boolean allowDanglingContent) throws IOException { + return this.runParser(parser, (JsonNode jsonNode) -> { Schema schema = Schema.parse(jsonNode, names); if (!allowDanglingContent) { String dangling; @@ -1503,14 +1623,17 @@ private Schema parse(JsonParser parser, boolean allowDanglingContent) throws IOE } } return schema; - } catch (JsonParseException e) { - throw new SchemaParseException(e); - } finally { - parser.close(); - validateNames.set(saved); - VALIDATE_DEFAULTS.set(savedValidateDefaults); - } + }); } + + private Schema parseNamesDeclared(JsonParser parser) throws IOException { + return this.runParser(parser, (JsonNode jsonNode) -> Schema.parseNamesDeclared(jsonNode, names, names.space)); + } + + private Schema parseFieldsOnly(JsonParser parser) throws IOException { + return this.runParser(parser, (JsonNode jsonNode) -> Schema.parseCompleteSchema(jsonNode, names, names.space)); + } + } /** @@ -1618,8 +1741,14 @@ public void add(Schema schema) { @Override public Schema put(Name name, Schema schema) { - if (containsKey(name)) - throw new SchemaParseException("Can't redefine: " + name); + if (containsKey(name)) { + final Schema other = super.get(name); + if (!Objects.equals(other, schema)) { + throw new SchemaParseException("Can't redefine: " + name); + } else { + return schema; + } + } return super.put(name, schema); } } @@ -1706,7 +1835,7 @@ private static boolean isValidDefault(Schema schema, JsonNode defaultValue) { /** * Validate a value against the schema. - * + * * @param schema : schema for value. * @param value : value to validate. * @return true if ok. @@ -1728,78 +1857,47 @@ private static boolean isValidValue(Schema schema, JsonNode value) { } } - /** @see #parse(String) */ - static Schema parse(JsonNode schema, Names names) { + /** + * Parse named schema in order to fill names map. This method does not parse + * field of record/error schema. + * + * @param schema : json schema representation. + * @param names : map of named schema. + * @param currentNameSpace : current working name space. + * @return schema. + */ + private static Schema parseNamesDeclared(JsonNode schema, Names names, String currentNameSpace) { if (schema == null) { - throw new SchemaParseException("Cannot parse schema"); + return null; } - if (schema.isTextual()) { // name - Schema result = names.get(schema.textValue()); - if (result == null) - throw new SchemaParseException("Undefined name: " + schema); - return result; - } else if (schema.isObject()) { - Schema result; - String type = getRequiredText(schema, "type", "No type"); + if (schema.isObject()) { + + String type = Schema.getOptionalText(schema, "type"); Name name = null; - String savedSpace = names.space(); + String doc = null; + Schema result = null; final boolean isTypeError = "error".equals(type); final boolean isTypeRecord = "record".equals(type); final boolean isTypeEnum = "enum".equals(type); final boolean isTypeFixed = "fixed".equals(type); + if (isTypeRecord || isTypeError || isTypeEnum || isTypeFixed) { String space = getOptionalText(schema, "namespace"); doc = getOptionalText(schema, "doc"); if (space == null) - space = savedSpace; + space = currentNameSpace; name = new Name(getRequiredText(schema, "name", "No name in schema"), space); - names.space(name.space); // set default namespace } - if (PRIMITIVES.containsKey(type)) { // primitive - result = create(PRIMITIVES.get(type)); - } else if (isTypeRecord || isTypeError) { // record - List fields = new ArrayList<>(); + if (isTypeRecord || isTypeError) { // record result = new RecordSchema(name, doc, isTypeError); - if (name != null) - names.add(result); + names.add(result); JsonNode fieldsNode = schema.get("fields"); + if (fieldsNode == null || !fieldsNode.isArray()) throw new SchemaParseException("Record has no fields: " + schema); - for (JsonNode field : fieldsNode) { - String fieldName = getRequiredText(field, "name", "No field name"); - String fieldDoc = getOptionalText(field, "doc"); - JsonNode fieldTypeNode = field.get("type"); - if (fieldTypeNode == null) - throw new SchemaParseException("No field type: " + field); - if (fieldTypeNode.isTextual() && names.get(fieldTypeNode.textValue()) == null) - throw new SchemaParseException(fieldTypeNode + " is not a defined name." + " The type of the \"" + fieldName - + "\" field must be a defined name or a {\"type\": ...} expression."); - Schema fieldSchema = parse(fieldTypeNode, names); - Field.Order order = Field.Order.ASCENDING; - JsonNode orderNode = field.get("order"); - if (orderNode != null) - order = Field.Order.valueOf(orderNode.textValue().toUpperCase(Locale.ENGLISH)); - JsonNode defaultValue = field.get("default"); - if (defaultValue != null - && (Type.FLOAT.equals(fieldSchema.getType()) || Type.DOUBLE.equals(fieldSchema.getType())) - && defaultValue.isTextual()) - defaultValue = new DoubleNode(Double.valueOf(defaultValue.textValue())); - Field f = new Field(fieldName, fieldSchema, fieldDoc, defaultValue, true, order); - Iterator i = field.fieldNames(); - while (i.hasNext()) { // add field props - String prop = i.next(); - if (!FIELD_RESERVED.contains(prop)) - f.addProp(prop, field.get(prop)); - } - f.aliases = parseAliases(field); - fields.add(f); - if (fieldSchema.getLogicalType() == null && getOptionalText(field, LOGICAL_TYPE_PROP) != null) - LOG.warn( - "Ignored the {}.{}.logicalType property (\"{}\"). It should probably be nested inside the \"type\" for the field.", - name, fieldName, getOptionalText(field, "logicalType")); - } - result.setFields(fields); + exploreFields(fieldsNode, names, name != null ? name.space : null); + } else if (isTypeEnum) { // enum JsonNode symbolsNode = schema.get("symbols"); if (symbolsNode == null || !symbolsNode.isArray()) @@ -1812,18 +1910,19 @@ static Schema parse(JsonNode schema, Names names) { if (enumDefault != null) defaultSymbol = enumDefault.textValue(); result = new EnumSchema(name, doc, symbols, defaultSymbol); - if (name != null) - names.add(result); + names.add(result); } else if (type.equals("array")) { // array JsonNode itemsNode = schema.get("items"); if (itemsNode == null) throw new SchemaParseException("Array has no items type: " + schema); - result = new ArraySchema(parse(itemsNode, names)); + final Schema items = Schema.parseNamesDeclared(itemsNode, names, currentNameSpace); + result = Schema.createArray(items); } else if (type.equals("map")) { // map JsonNode valuesNode = schema.get("values"); if (valuesNode == null) throw new SchemaParseException("Map has no values type: " + schema); - result = new MapSchema(parse(valuesNode, names)); + final Schema values = Schema.parseNamesDeclared(valuesNode, names, currentNameSpace); + result = Schema.createMap(values); } else if (isTypeFixed) { // fixed JsonNode sizeNode = schema.get("size"); if (sizeNode == null || !sizeNode.isInt()) @@ -1831,42 +1930,194 @@ static Schema parse(JsonNode schema, Names names) { result = new FixedSchema(name, doc, sizeNode.intValue()); if (name != null) names.add(result); - } else { // For unions with self reference - Name nameFromType = new Name(type, names.space); - if (names.containsKey(nameFromType)) { - return names.get(nameFromType); + } else if (PRIMITIVES.containsKey(type)) { + result = Schema.create(PRIMITIVES.get(type)); + } + if (result != null) { + Set reserved = SCHEMA_RESERVED; + if (isTypeEnum) { + reserved = ENUM_RESERVED; } - throw new SchemaParseException("Type not supported: " + type); + Schema.addProperties(schema, reserved, result); } - Iterator i = schema.fieldNames(); + return result; + } else if (schema.isArray()) { + List subs = new ArrayList<>(schema.size()); + schema.forEach((JsonNode item) -> { + Schema sub = Schema.parseNamesDeclared(item, names, currentNameSpace); + if (sub != null) { + subs.add(sub); + } + }); + return Schema.createUnion(subs); + } else if (schema.isTextual()) { + String value = schema.asText(); + return names.get(value); + } + return null; + } - Set reserved = SCHEMA_RESERVED; - if (isTypeEnum) { - reserved = ENUM_RESERVED; + private static void addProperties(JsonNode schema, Set reserved, Schema avroSchema) { + Iterator i = schema.fieldNames(); + while (i.hasNext()) { // add properties + String prop = i.next(); + if (!reserved.contains(prop)) // ignore reserved + avroSchema.addProp(prop, schema.get(prop)); + } + // parse logical type if present + avroSchema.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(avroSchema); + // names.space(savedSpace); // restore space + if (avroSchema instanceof NamedSchema) { + Set aliases = parseAliases(schema); + if (aliases != null) // add aliases + for (String alias : aliases) + avroSchema.addAlias(alias); + } + } + + /** + * Explore record fields in order to fill names map with inner defined named + * types. + * + * @param fieldsNode : json node for field. + * @param names : names map. + * @param nameSpace : current working namespace. + */ + private static void exploreFields(JsonNode fieldsNode, Names names, String nameSpace) { + for (JsonNode field : fieldsNode) { + final JsonNode fieldType = field.get("type"); + if (fieldType != null) { + if (fieldType.isObject()) { + parseNamesDeclared(fieldType, names, nameSpace); + } else if (fieldType.isArray()) { + exploreFields(fieldType, names, nameSpace); + } else if (fieldType.isTextual() && field.isObject()) { + parseNamesDeclared(field, names, nameSpace); + } } - while (i.hasNext()) { // add properties - String prop = i.next(); - if (!reserved.contains(prop)) // ignore reserved - result.addProp(prop, schema.get(prop)); + } + } + + /** + * in complement of parseNamesDeclared, this method parse schema in details. + * + * @param schema : json schema. + * @param names : names map. + * @param currentSpace : current working name space. + * @return complete schema. + */ + static Schema parseCompleteSchema(JsonNode schema, Names names, String currentSpace) { + if (schema == null) { + throw new SchemaParseException("Cannot parse schema"); + } + if (schema.isTextual()) { + String type = schema.asText(); + Schema avroSchema = names.get(type); + if (avroSchema == null) { + avroSchema = names.get(currentSpace + "." + type); + } + return avroSchema; + } + if (schema.isArray()) { + List schemas = StreamSupport.stream(schema.spliterator(), false) + .map((JsonNode sub) -> parseCompleteSchema(sub, names, currentSpace)).collect(Collectors.toList()); + return Schema.createUnion(schemas); + } + if (schema.isObject()) { + Schema result = null; + String type = getRequiredText(schema, "type", "No type"); + Name name = null; + + final boolean isTypeError = "error".equals(type); + final boolean isTypeRecord = "record".equals(type); + final boolean isTypeArray = "array".equals(type); + + if (isTypeRecord || isTypeError || "enum".equals(type) || "fixed".equals(type)) { + // named schema + String space = getOptionalText(schema, "namespace"); + + if (space == null) + space = currentSpace; + name = new Name(getRequiredText(schema, "name", "No name in schema"), space); + + result = names.get(name); + if (result == null) { + throw new SchemaParseException("Unparsed field type " + name); + } } - // parse logical type if present - result.logicalType = LogicalTypes.fromSchemaIgnoreInvalid(result); - names.space(savedSpace); // restore space - if (result instanceof NamedSchema) { - Set aliases = parseAliases(schema); - if (aliases != null) // add aliases - for (String alias : aliases) - result.addAlias(alias); + if (isTypeRecord || isTypeError) { + if (result != null && !result.hasFields()) { + final List fields = new ArrayList<>(); + JsonNode fieldsNode = schema.get("fields"); + if (fieldsNode == null || !fieldsNode.isArray()) + throw new SchemaParseException("Record has no fields: " + schema); + + for (JsonNode field : fieldsNode) { + Field f = Field.parse(field, names, name.space); + + fields.add(f); + if (f.schema.getLogicalType() == null && getOptionalText(field, LOGICAL_TYPE_PROP) != null) + LOG.warn( + "Ignored the {}.{}.logicalType property (\"{}\"). It should probably be nested inside the \"type\" for the field.", + name, f.name, getOptionalText(field, "logicalType")); + } + result.setFields(fields); + } + } else if (isTypeArray) { + JsonNode items = schema.get("items"); + Schema schemaItems = parseCompleteSchema(items, names, currentSpace); + result = Schema.createArray(schemaItems); + } else if ("map".equals(type)) { + JsonNode values = schema.get("values"); + Schema mapItems = parseCompleteSchema(values, names, currentSpace); + result = Schema.createMap(mapItems); + } else if (result == null) { + result = names.get(currentSpace + "." + type); + if (result == null) { + result = names.get(type); + } } + + Set reserved = SCHEMA_RESERVED; + if ("enum".equals(type)) { + reserved = ENUM_RESERVED; + } + Schema.addProperties(schema, reserved, result); return result; - } else if (schema.isArray()) { // union - LockableArrayList types = new LockableArrayList<>(schema.size()); - for (JsonNode typeNode : schema) - types.add(parse(typeNode, names)); - return new UnionSchema(types); - } else { - throw new SchemaParseException("Schema not yet supported: " + schema); } + return null; + } + + static Schema parse(JsonNode schema, Names names) { + if (schema == null) { + throw new SchemaParseException("Cannot parse schema"); + } + + Schema result = Schema.parseNamesDeclared(schema, names, names.space); + Schema.parseCompleteSchema(schema, names, names.space); + + return result; + } + + static Schema resolveSchema(JsonNode schema, Names names, String currentNameSpace) { + String np = currentNameSpace; + String nodeName = getOptionalText(schema, "name"); + if (nodeName != null) { + final JsonNode nameSpace = schema.get("namespace"); + StringBuilder fullName = new StringBuilder(); + if (nameSpace != null && nameSpace.isTextual()) { + fullName.append(nameSpace.asText()).append("."); + np = nameSpace.asText(); + } + fullName.append(nodeName); + Schema schema1 = names.get(fullName.toString()); + + if (schema1 != null && schema1.getType() == Type.RECORD && !schema1.hasFields()) { + Schema.parseCompleteSchema(schema, names, np); + } + return schema1; + } + return null; } static Set parseAliases(JsonNode node) { diff --git a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java index 58afc9e07ab..b4c259e92f0 100644 --- a/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java +++ b/lang/java/avro/src/test/java/org/apache/avro/TestSchema.java @@ -27,16 +27,24 @@ import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; +import java.net.URL; import java.nio.charset.StandardCharsets; import java.nio.file.Files; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.Set; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.avro.Schema.Field; import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericData; + +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; public class TestSchema { @@ -413,6 +421,84 @@ void qualifiedName() { assertEquals("Int", nameInt.getQualified("space")); } + @Test + void enumLateDefine() { + String schemaString = "{\n" + " \"type\":\"record\",\n" + " \"name\": \"Main\",\n" + " \"fields\":[\n" + + " {\n" + " \"name\":\"f1\",\n" + " \"type\":\"Sub\"\n" + " },\n" + + " {\n" + " \"name\":\"f2\",\n" + " \"type\":{\n" + + " \"type\":\"enum\",\n" + " \"name\":\"Sub\",\n" + + " \"symbols\":[\"OPEN\",\"CLOSE\"]\n" + " }\n" + " }\n" + " ]\n" + "}"; + + final Schema schema = new Schema.Parser().parse(schemaString); + Schema f1Schema = schema.getField("f1").schema(); + Schema f2Schema = schema.getField("f2").schema(); + assertSame(f1Schema, f2Schema); + assertEquals(Type.ENUM, f1Schema.getType()); + String stringSchema = schema.toString(); + int definitionIndex = stringSchema.indexOf("\"symbols\":[\"OPEN\",\"CLOSE\"]"); + int usageIndex = stringSchema.indexOf("\"type\":\"Sub\""); + assertTrue(definitionIndex < usageIndex, "usage is before definition"); + } + + @Test + public void testRecordInArray() { + String schemaString = "{\n" + " \"type\": \"record\",\n" + " \"name\": \"TestRecord\",\n" + " \"fields\": [\n" + + " {\n" + " \"name\": \"value\",\n" + " \"type\": {\n" + " \"type\": \"record\",\n" + + " \"name\": \"Container\",\n" + " \"fields\": [\n" + " {\n" + + " \"name\": \"Optional\",\n" + " \"type\": {\n" + " \"type\": \"array\",\n" + + " \"items\": [\n" + " {\n" + " \"type\": \"record\",\n" + + " \"name\": \"optional_field_0\",\n" + " \"namespace\": \"\",\n" + + " \"doc\": \"\",\n" + " \"fields\": [\n" + " {\n" + + " \"name\": \"optional_field_1\",\n" + " \"type\": \"long\",\n" + + " \"doc\": \"\",\n" + " \"default\": 0\n" + + " }\n" + " ]\n" + " }\n" + " ]\n" + + " }\n" + " }\n" + " ]\n" + " }\n" + " }\n" + " ]\n" + "}"; + final Schema schema = new Schema.Parser().parse(schemaString); + assertNotNull(schema); + } + + /* + * @Test public void testRec() { String schemaString = + * "[{\"name\":\"employees\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"Pair1081149ea1d6eb80\",\"fields\":[{\"name\":\"key\",\"type\":\"int\"},{\"name\":\"value\",\"type\":{\"type\":\"record\",\"name\":\"EmployeeInfo2\",\"fields\":[{\"name\":\"companyMap\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"PairIntegerString\",\"fields\":[{\"name\":\"key\",\"type\":\"int\"},{\"name\":\"value\",\"type\":\"string\"}]},\"java-class\":\"java.util.HashMap\"}],\"default\":null},{\"name\":\"name\",\"type\":[\"null\",\"string\"],\"default\":null}]}}]},\"java-class\":\"java.util.HashMap\"}],\"default\":null}]"; + * final Schema schema = new Schema.Parser().parse(schemaString); + * Assert.assertNotNull(schema); + * + * } + */ + + @Test + public void testUnionFieldType() { + String schemaString = "{\"type\": \"record\", \"name\": \"Lisp\", \"fields\": [{\"name\":\"value\", \"type\":[\"null\", \"string\",{\"type\": \"record\", \"name\": \"Cons\", \"fields\": [{\"name\":\"car\", \"type\":\"Lisp\"},{\"name\":\"cdr\", \"type\":\"Lisp\"}]}]}]}"; + final Schema schema = new Schema.Parser().parse(schemaString); + Field value = schema.getField("value"); + Schema fieldSchema = value.schema(); + Schema subSchema = fieldSchema.getTypes().stream().filter((Schema s) -> s.getType() == Type.RECORD).findFirst() + .get(); + assertTrue(subSchema.hasFields()); + } + + @Test + public void parseAliases() throws JsonProcessingException { + String s1 = "{ \"aliases\" : [\"a1\", \"b1\"]}"; + ObjectMapper mapper = new ObjectMapper(); + JsonNode j1 = mapper.readTree(s1); + Set aliases = Schema.parseAliases(j1); + assertEquals(2, aliases.size()); + assertTrue(aliases.contains("a1")); + assertTrue(aliases.contains("b1")); + + String s2 = "{ \"aliases\" : {\"a1\": \"b1\"}}"; + JsonNode j2 = mapper.readTree(s2); + + SchemaParseException ex = assertThrows(SchemaParseException.class, () -> Schema.parseAliases(j2)); + assertTrue(ex.getMessage().contains("aliases not an array")); + + String s3 = "{ \"aliases\" : [11, \"b1\"]}"; + JsonNode j3 = mapper.readTree(s3); + SchemaParseException ex3 = assertThrows(SchemaParseException.class, () -> Schema.parseAliases(j3)); + assertTrue(ex3.getMessage().contains("alias not a string")); + } + @Test void testContentAfterAvsc() throws Exception { Schema.Parser parser = new Schema.Parser(); @@ -446,6 +532,32 @@ void testContentAfterAvscInFile() throws Exception { assertThrows(SchemaParseException.class, () -> parser.parse(avscFile)); } + @Test + void testParseMultipleFile() throws IOException { + URL directory = Thread.currentThread().getContextClassLoader().getResource("multipleFile"); + File f1 = new File(directory.getPath(), "ApplicationEvent.avsc"); + File f2 = new File(directory.getPath(), "DocumentInfo.avsc"); + File f3 = new File(directory.getPath(), "MyResponse.avsc"); + Assertions.assertTrue(f1.exists(), "File not exist for test " + f1.getPath()); + Assertions.assertTrue(f2.exists(), "File not exist for test " + f2.getPath()); + Assertions.assertTrue(f3.exists(), "File not exist for test " + f3.getPath()); + + final List schemas = new Schema.Parser().parse(Arrays.asList(f1, f2, f3)); + Assertions.assertEquals(3, schemas.size()); + Schema schemaAppEvent = schemas.get(0); + Schema schemaDocInfo = schemas.get(1); + Schema schemaResponse = schemas.get(2); + + Assertions.assertNotNull(schemaAppEvent); + Assertions.assertEquals(3, schemaAppEvent.getFields().size()); + Field documents = schemaAppEvent.getField("documents"); + Schema docSchema = documents.schema().getTypes().get(1).getElementType(); + Assertions.assertEquals(docSchema, schemaDocInfo); + + Assertions.assertNotNull(schemaDocInfo); + Assertions.assertNotNull(schemaResponse); + } + @Test void add_types() { String schemaRecord2 = "{\"type\":\"record\", \"name\":\"record2\", \"fields\": [" diff --git a/lang/java/avro/src/test/resources/multipleFile/ApplicationEvent.avsc b/lang/java/avro/src/test/resources/multipleFile/ApplicationEvent.avsc new file mode 100644 index 00000000000..6902084350f --- /dev/null +++ b/lang/java/avro/src/test/resources/multipleFile/ApplicationEvent.avsc @@ -0,0 +1,28 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "ApplicationEvent", + "fields": [ + { + "name": "applicationId", + "type": "string", + "doc": "Application ID" + }, + { + "name": "status", + "type": "string", + "doc": "Application Status" + }, + { + "name": "documents", + "type": ["null", { + "type": "array", + "items": "model.DocumentInfo" + }], + "doc": "", + "default": null + } + ] + +} diff --git a/lang/java/avro/src/test/resources/multipleFile/DocumentInfo.avsc b/lang/java/avro/src/test/resources/multipleFile/DocumentInfo.avsc new file mode 100644 index 00000000000..95dd4243ea6 --- /dev/null +++ b/lang/java/avro/src/test/resources/multipleFile/DocumentInfo.avsc @@ -0,0 +1,19 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "DocumentInfo", + "fields": [ + { + "name": "documentId", + "type": "string", + "doc": "Document ID" + }, + { + "name": "filePath", + "type": "string", + "doc": "Document Path" + } + ] + +} diff --git a/lang/java/avro/src/test/resources/multipleFile/MyResponse.avsc b/lang/java/avro/src/test/resources/multipleFile/MyResponse.avsc new file mode 100644 index 00000000000..ac6d08291d9 --- /dev/null +++ b/lang/java/avro/src/test/resources/multipleFile/MyResponse.avsc @@ -0,0 +1,14 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "MyResponse", + "fields": [ + { + "name": "isSuccessful", + "type": "boolean", + "doc": "Indicator for successful or unsuccessful call" + } + ] + +} diff --git a/lang/java/avro/src/test/resources/multipleFile/README.md b/lang/java/avro/src/test/resources/multipleFile/README.md new file mode 100644 index 00000000000..fe3541b660e --- /dev/null +++ b/lang/java/avro/src/test/resources/multipleFile/README.md @@ -0,0 +1,8 @@ +## test for parsing multiple files. +This folder aims to test `public List Schema.parse(Iterable sources) throws IOException` method. + +The objective is to check that a record schema define in a file can be use in another record schema as a field type. +Here, ApplicationEvent.avsc file contains a field of type DocumentInfo, defined in file DocumentInfo.avsc. + +The is written at TestSchema.testParseMultipleFile. + diff --git a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java index 1f248ed6daf..c6dd0bcc11a 100644 --- a/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java +++ b/lang/java/compiler/src/main/java/org/apache/avro/compiler/specific/SpecificCompiler.java @@ -183,6 +183,12 @@ public SpecificCompiler(Schema schema) { this.protocol = null; } + public SpecificCompiler(Iterable schemas) { + this(); + schemas.forEach(this::enqueue); + this.protocol = null; + } + /** * Creates a specific compiler with the given type to use for date/time related * logical types. diff --git a/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java b/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java index 38a81326ae2..9bb3e281a7d 100644 --- a/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java +++ b/lang/java/ipc/src/test/java/org/apache/avro/TestSchema.java @@ -513,7 +513,7 @@ void nullPointer() throws Exception { private static void checkParseError(String json) { try { new Schema.Parser().parse(json); - } catch (SchemaParseException e) { + } catch (AvroRuntimeException e) { return; } fail("Should not have parsed: " + json); diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java index 5120aa62170..e8186d42cdd 100644 --- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java +++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/AbstractAvroMojo.java @@ -283,15 +283,13 @@ private String[] getIncludedFiles(String absPath, String[] excludes, String[] in } private void compileFiles(String[] files, File sourceDir, File outDir) throws MojoExecutionException { - for (String filename : files) { - try { - // Need to register custom logical type factories before schema compilation. - loadLogicalTypesFactories(); - doCompile(filename, sourceDir, outDir); - } catch (IOException e) { - throw new MojoExecutionException("Error compiling protocol file " + filename + " to " + outDir, e); - } + // Need to register custom logical type factories before schema compilation. + try { + loadLogicalTypesFactories(); + } catch (IOException e) { + throw new MojoExecutionException("Error while loading logical types factories ", e); } + this.doCompile(files, sourceDir, outDir); } private void loadLogicalTypesFactories() throws IOException, MojoExecutionException { @@ -332,6 +330,16 @@ protected List instantiateAdditionalVelocityTools() { return velocityTools; } + protected void doCompile(String[] files, File sourceDirectory, File outputDirectory) throws MojoExecutionException { + for (String filename : files) { + try { + doCompile(filename, sourceDirectory, outputDirectory); + } catch (IOException e) { + throw new MojoExecutionException("Error compiling protocol file " + filename + " to " + outputDirectory, e); + } + } + } + protected abstract void doCompile(String filename, File sourceDirectory, File outputDirectory) throws IOException; protected URLClassLoader createClassLoader() throws DependencyResolutionRequiredException, MalformedURLException { diff --git a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java index a4134cbab76..de2b18e7f87 100644 --- a/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java +++ b/lang/java/maven-plugin/src/main/java/org/apache/avro/mojo/SchemaMojo.java @@ -18,15 +18,21 @@ package org.apache.avro.mojo; +import org.apache.avro.SchemaParseException; import org.apache.avro.generic.GenericData.StringType; import java.io.File; import java.io.IOException; +import java.net.MalformedURLException; import java.net.URLClassLoader; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; import org.apache.avro.Schema; import org.apache.avro.compiler.specific.SpecificCompiler; import org.apache.maven.artifact.DependencyResolutionRequiredException; +import org.apache.maven.plugin.MojoExecutionException; /** * Generate Java classes from Avro schema files (.avsc) @@ -72,21 +78,27 @@ public class SchemaMojo extends AbstractAvroMojo { private String errorSpecificClass = "org.apache.avro.specific.SpecificExceptionBase"; @Override - protected void doCompile(String filename, File sourceDirectory, File outputDirectory) throws IOException { - File src = new File(sourceDirectory, filename); - final Schema schema; + protected void doCompile(String[] filesName, File sourceDirectory, File outputDirectory) + throws MojoExecutionException { + final List sourceFiles = Arrays.stream(filesName) + .map((String filename) -> new File(sourceDirectory, filename)).collect(Collectors.toList()); + final List schemas; // This is necessary to maintain backward-compatibility. If there are // no imported files then isolate the schemas from each other, otherwise // allow them to share a single schema so reuse and sharing of schema // is possible. - if (imports == null) { - schema = new Schema.Parser().parse(src); - } else { - schema = schemaParser.parse(src); + try { + if (imports == null) { + schemas = new Schema.Parser().parse(sourceFiles); + } else { + schemas = schemaParser.parse(sourceFiles); + } + } catch (IOException | SchemaParseException ex) { + throw new MojoExecutionException("Error compiling one file of " + sourceDirectory + " to " + outputDirectory, ex); } - final SpecificCompiler compiler = new SpecificCompiler(schema); + final SpecificCompiler compiler = new SpecificCompiler(schemas); compiler.setTemplateDir(templateDirectory); compiler.setStringType(StringType.valueOf(stringType)); compiler.setFieldVisibility(getFieldVisibility()); @@ -100,14 +112,26 @@ protected void doCompile(String filename, File sourceDirectory, File outputDirec for (String customConversion : customConversions) { compiler.addCustomConversion(classLoader.loadClass(customConversion)); } - } catch (ClassNotFoundException | DependencyResolutionRequiredException e) { - throw new IOException(e); + } catch (ClassNotFoundException | DependencyResolutionRequiredException | MalformedURLException e) { + throw new MojoExecutionException("Compilation error: Can't add custom conversion", e); } compiler.setOutputCharacterEncoding(project.getProperties().getProperty("project.build.sourceEncoding")); compiler.setAdditionalVelocityTools(instantiateAdditionalVelocityTools()); compiler.setRecordSpecificClass(this.recordSpecificClass); compiler.setErrorSpecificClass(this.errorSpecificClass); - compiler.compileToDestination(src, outputDirectory); + for (File src : sourceFiles) { + try { + compiler.compileToDestination(src, outputDirectory); + } catch (IOException ex) { + throw new MojoExecutionException("Compilation error with file " + src + " to " + outputDirectory, ex); + } + } + } + + @Override + protected void doCompile(final String filename, final File sourceDirectory, final File outputDirectory) + throws IOException { + // Not call. } @Override diff --git a/lang/java/maven-plugin/src/test/avro/multipleSchemas/ApplicationEvent.avsc b/lang/java/maven-plugin/src/test/avro/multipleSchemas/ApplicationEvent.avsc new file mode 100644 index 00000000000..6902084350f --- /dev/null +++ b/lang/java/maven-plugin/src/test/avro/multipleSchemas/ApplicationEvent.avsc @@ -0,0 +1,28 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "ApplicationEvent", + "fields": [ + { + "name": "applicationId", + "type": "string", + "doc": "Application ID" + }, + { + "name": "status", + "type": "string", + "doc": "Application Status" + }, + { + "name": "documents", + "type": ["null", { + "type": "array", + "items": "model.DocumentInfo" + }], + "doc": "", + "default": null + } + ] + +} diff --git a/lang/java/maven-plugin/src/test/avro/multipleSchemas/DocumentInfo.avsc b/lang/java/maven-plugin/src/test/avro/multipleSchemas/DocumentInfo.avsc new file mode 100644 index 00000000000..95dd4243ea6 --- /dev/null +++ b/lang/java/maven-plugin/src/test/avro/multipleSchemas/DocumentInfo.avsc @@ -0,0 +1,19 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "DocumentInfo", + "fields": [ + { + "name": "documentId", + "type": "string", + "doc": "Document ID" + }, + { + "name": "filePath", + "type": "string", + "doc": "Document Path" + } + ] + +} diff --git a/lang/java/maven-plugin/src/test/avro/multipleSchemas/MyResponse.avsc b/lang/java/maven-plugin/src/test/avro/multipleSchemas/MyResponse.avsc new file mode 100644 index 00000000000..ac6d08291d9 --- /dev/null +++ b/lang/java/maven-plugin/src/test/avro/multipleSchemas/MyResponse.avsc @@ -0,0 +1,14 @@ +{ + "namespace": "model", + "type": "record", + "doc": "", + "name": "MyResponse", + "fields": [ + { + "name": "isSuccessful", + "type": "boolean", + "doc": "Indicator for successful or unsuccessful call" + } + ] + +} diff --git a/lang/java/maven-plugin/src/test/avro/multipleSchemas/README.md b/lang/java/maven-plugin/src/test/avro/multipleSchemas/README.md new file mode 100644 index 00000000000..fe3541b660e --- /dev/null +++ b/lang/java/maven-plugin/src/test/avro/multipleSchemas/README.md @@ -0,0 +1,8 @@ +## test for parsing multiple files. +This folder aims to test `public List Schema.parse(Iterable sources) throws IOException` method. + +The objective is to check that a record schema define in a file can be use in another record schema as a field type. +Here, ApplicationEvent.avsc file contains a field of type DocumentInfo, defined in file DocumentInfo.avsc. + +The is written at TestSchema.testParseMultipleFile. + diff --git a/lang/java/maven-plugin/src/test/resources/unit/schema/pom-multiple-schema.xml b/lang/java/maven-plugin/src/test/resources/unit/schema/pom-multiple-schema.xml new file mode 100644 index 00000000000..10b0b3fae80 --- /dev/null +++ b/lang/java/maven-plugin/src/test/resources/unit/schema/pom-multiple-schema.xml @@ -0,0 +1,66 @@ + + + + 4.0.0 + + + avro-parent + org.apache.avro + 1.12.0-SNAPSHOT + ../../../../../../../../../pom.xml + + + avro-maven-plugin-test + jar + + testproject + + + + + avro-maven-plugin + + + schema + + schema + + + + + ${basedir}/src/test/avro/multipleSchemas + ${basedir}/target/test-harness/schema + + + + + + + + org.apache.avro + avro + ${parent.version} + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.version} + + +