Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
ad22aa0
Bump com.google.api.grpc:proto-google-common-protos (#3177)
dependabot[bot] Mar 30, 2025
b69149f
Bump Parquet Format to 2.11 (#3181)
Fokko Mar 31, 2025
6fe139e
Enable jitpack.io repo only when brotli is required (#3180)
pan3793 Mar 31, 2025
a24ea5c
Minor: Use logicaltypes constants in ParquetMetadataConverter (#3186)
aihuaxu Apr 10, 2025
00b6bab
GH-3188: Set the global configured column stats enable flag to default…
huaxiangsun Apr 12, 2025
66e0c4e
GH-3070: Add Variant logical type annotation to parquet-java (#3072)
aihuaxu Apr 17, 2025
a8c8997
Variant support from https://github.com/apache/parquet-java/pull/3117
cashmand Mar 21, 2025
f86b924
Create VariantSchema
cashmand Mar 25, 2025
2d708a3
Fix after rebase
cashmand Apr 30, 2025
186301a
Remove hack
cashmand Apr 30, 2025
90db4cc
Fix tests
cashmand Apr 30, 2025
ddd7090
Set explicit schema
cashmand Apr 30, 2025
be9321c
More tests
cashmand Apr 30, 2025
733f445
Fix to handle empty object
cashmand May 1, 2025
b15d8ba
More tests, cleanup
cashmand May 1, 2025
58ffda7
More tests
cashmand May 1, 2025
68f407c
More tests
cashmand May 1, 2025
58e8760
Cleanup
cashmand May 1, 2025
2276e00
Handle conflicting empty object and non-object, add test
cashmand May 1, 2025
d191886
Fix offset issue and add test
cashmand May 1, 2025
67bace9
Finish porting remaining Iceberg tests
cashmand May 1, 2025
46f9a63
Cleanup
cashmand May 1, 2025
3761848
Cleanup
cashmand May 2, 2025
78e2935
import Assert.*
cashmand May 2, 2025
7ea19b0
Cleanup
cashmand May 2, 2025
b5f6c20
Match Iceberg on invalid values, and more cleanup
cashmand May 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions parquet-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@
<artifactId>parquet-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-variant</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@
import org.apache.parquet.io.api.Binary;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.PrimitiveStringifier;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.variant.VariantColumnConverter;

public class AvroConverters {

Expand Down Expand Up @@ -363,4 +365,26 @@ public String convert(Binary binary) {
return stringifier.stringify(binary);
}
}

static final class FieldVariantConverter<T> extends VariantColumnConverter {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we want to produce a Variant from apache#3072 rather than a record whose layout is determined by the schema?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure I understand the comment. I'm not that familiar with Avro, but doesn't the type we produce need to be an Avro type? What would it mean to produce a Variant?

protected final ParentValueContainer parent;
private final Schema avroSchema;
private final GenericData model;

/**
 * Creates a converter that materializes a Parquet VARIANT column as an
 * Avro record (see {@link #addVariant}, which fills the record's
 * "metadata" and "value" fields).
 *
 * @param parent container that receives each assembled record
 * @param schema the Parquet group type carrying the variant annotation
 * @param avroSchema the Avro record schema to instantiate per value
 * @param model Avro data model used to create records and set fields
 */
public FieldVariantConverter(
    ParentValueContainer parent, GroupType schema, Schema avroSchema, GenericData model) {
  super(schema);
  this.parent = parent;
  this.avroSchema = avroSchema;
  this.model = model;
}

/**
 * Receives one variant's binary pieces and emits them to {@code parent} as a
 * new Avro record with the "metadata" field at position 0 and the "value"
 * field at position 1.
 *
 * @param value serialized variant value bytes
 * @param metadata serialized variant metadata bytes
 */
@Override
@SuppressWarnings("unchecked") // newRecord returns Object; T is fixed by avroSchema at construction
public void addVariant(Binary value, Binary metadata) {
  T currentRecord = (T) model.newRecord(null, avroSchema);
  model.setField(currentRecord, "metadata", 0, metadata.toByteBuffer());
  model.setField(currentRecord, "value", 1, value.toByteBuffer());
  parent.add(currentRecord);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.io.api.PrimitiveConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;

Expand Down Expand Up @@ -168,7 +169,12 @@ private static Converter newConverter(Schema schema, Type type, GenericData mode
case MAP:
return new MapConverter(parent, type.asGroupType(), schema, model);
case RECORD:
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
if (type.getLogicalTypeAnnotation()
instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Continuation indent should be 2 indents (4 spaces)

return new AvroConverters.FieldVariantConverter(parent, type.asGroupType(), schema, model);
} else {
return new AvroIndexedRecordConverter<>(parent, type.asGroupType(), schema, model);
}
case STRING:
return new AvroConverters.FieldStringConverter(parent);
case UNION:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@
import org.apache.parquet.io.api.Converter;
import org.apache.parquet.io.api.GroupConverter;
import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.slf4j.Logger;
Expand Down Expand Up @@ -383,7 +384,12 @@ private static Converter newConverter(
}
return newStringConverter(schema, model, parent);
case RECORD:
return new AvroRecordConverter(parent, type.asGroupType(), schema, model);
if (type.getLogicalTypeAnnotation()
instanceof LogicalTypeAnnotation.VariantLogicalTypeAnnotation) {
return new AvroConverters.FieldVariantConverter(parent, type.asGroupType(), schema, model);
} else {
return new AvroRecordConverter(parent, type.asGroupType(), schema, model);
}
case ENUM:
return new AvroConverters.FieldEnumConverter(parent, schema, model);
case ARRAY:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,6 +466,21 @@ public Optional<Schema> visit(
LogicalTypeAnnotation.EnumLogicalTypeAnnotation enumLogicalType) {
return of(Schema.create(Schema.Type.STRING));
}

/**
 * Converts a Parquet VARIANT logical type into an Avro record schema with
 * two fields: a required bytes field named "metadata" and an optional
 * (nullable union) bytes field named "value". The record is named after the
 * Parquet group type.
 */
@Override
public Optional<Schema> visit(
LogicalTypeAnnotation.VariantLogicalTypeAnnotation variantLogicalType) {
String name = parquetGroupType.getName();
// NOTE(review): namespace(name, names) is defined outside this view —
// presumably it derives a unique Avro namespace from the enclosing path.
SchemaBuilder.FieldAssembler<Schema> builder = SchemaBuilder.builder(
namespace(name, names))
.record(name)
.fields();
// Serialized variant metadata; declared required (no default).
builder.name("metadata")
.type(Schema.create(Schema.Type.BYTES))
.noDefault();
// Serialized variant value; declared optional — presumably so a shredded
// variant may omit it. TODO confirm against the variant spec.
builder.name("value").type().optional().type(Schema.create(Schema.Type.BYTES));
return of(builder.endRecord());
}
})
.orElseThrow(
() -> new UnsupportedOperationException("Cannot convert Parquet type " + parquetType));
Expand Down
Loading