fix: resolution of complex type variants in Avro unions#9328
fix: resolution of complex type variants in Avro unions#9328mzabaluev wants to merge 8 commits intoapache:mainfrom
Conversation
Move the newly added tests above the two large comprehensive e2e tests.
711a3a5 to
409eef0
Compare
Test an Avro reader schema evolution where a field is added to a nested record, and the record is typed as nullable in the reader schema. Reuse the existing nested_records.avro test file which provides a wealth of nested record types to exercise different cases of nesting, nullability and defaults.
Move the nullability index value into the NullablePlan enum used by the decoder.
When an Avro reader schema has a union type that needs to be resolved against the type in the writer schema, resolution information other than primitive type promotions was not properly handled when creating the decoder. Extend the union resolution information in the decoder with variant data for enum remapping and record projection. The Projector data structure with Skipper decoders makes part of this information, which necessitated some refactoring.
Do apply resolutions other than promotions; only the promotions are actually worked into the decoder when decode_with_promotion is called. A default value is a valid resolution case for fields added in the reader schema with a default value, don't return an error, but implement as a ResolutionPlan::DefaultValue variant.
In the Avro decoder, the NullabilityPlan::ReadTag variant did not preserve the resolution information that needs to be applied to conform to the writer schema.
Change the expected results to the new behavior: the resolution for a writer union type that is not an exact match is always Union to signal that the tag needs to be read. Certain ResolutionInfo values could be equivalently replaced with None which means tag reading and direct promotion, but the nullability plan will be constructed the same way regardless. Conversely, the resolution for a non-union writer type may be other than Union.
409eef0 to
6bdb068
Compare
jecsand838
left a comment
There was a problem hiding this comment.
@mzabaluev LMGTM!
Overall these changes are extremely clean. Just left a few comments.
| panic!( | ||
| "unexpected union resolution info for non-union writer and union reader type", | ||
| ); |
There was a problem hiding this comment.
This new panic in decoder construction can abort the process for malformed/partial union resolution state. Since this path already returns Result, I think using return AvroError::SchemaError (or ParseError) instead of panicking would be better, so callers can decide how to handle the failure.
| let Some((_, resolution)) = | ||
| info.writer_to_reader[nullability.non_null_index()].as_ref() | ||
| else { | ||
| panic!("unexpected union resolution info for nullable writer type"); |
There was a problem hiding this comment.
Same here. The panic on unexpected nullable-union mapping turns a recoverable schema mismatch into a hard crash.
| panic!("enum mapping resolution provided for non-enum decoder"); | ||
| }; | ||
| let raw = buf.get_int()?; | ||
| let resolved = res.resolve(raw)?; | ||
| indices.push(resolved); | ||
| Ok(()) | ||
| } | ||
| ResolutionPlan::Record(proj) => { | ||
| let Self::Record(_, encodings, _, _) = self else { | ||
| panic!("record projection provided for non-record decoder"); |
There was a problem hiding this comment.
Same here as well. We should probably be passing AvroErrors back to the caller instead of potentially crashing imo.
| ResolutionPlan::Record(ProjectorBuilder::try_new(r, field_defaults).build()?), | ||
| ), | ||
| (_, ResolutionInfo::Record(_)) => { | ||
| unreachable!("record resolution on non-record decoder") |
There was a problem hiding this comment.
Using unreachable! here is potentially unsafe. Probably best to also pass an AvroError back here too. Perhaps it's unreachable right now, but all it takes is for an edge case to occur or some change down the line to create the potential for a crash.
| type_ids: Vec<i8>, | ||
| offsets: Vec<i32>, | ||
| branches: Vec<Decoder>, | ||
| counts: Vec<i32>, | ||
| reader_type_codes: Vec<i8>, | ||
| branches: UnionDecoderBranches, |
There was a problem hiding this comment.
Nice! Love the clean up here :)
| let children = reader_variants | ||
| .iter() | ||
| .map(|variant| self.parse_type(variant, namespace)) | ||
| .collect::<Result<Vec<_>, _>>()?; |
There was a problem hiding this comment.
I think there's a risk of a regression here by not using the resolve_type to preserve field level resolution, especially the DefaultValue.
See the test below and let me know what you think.
#[test]
fn test_resolve_writer_non_union_record_to_reader_union_preserves_defaults() {
// Writer: record Inner{a: int}
// Reader: union [Inner{a: int, b: int default 42}, string]
// The matching child (Inner) should preserve DefaultValue(Int(42)) on field b.
let writer = Schema::Complex(ComplexType::Record(Record {
name: "Inner",
namespace: None,
doc: None,
aliases: vec![],
fields: vec![AvroFieldSchema {
name: "a",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: None,
aliases: vec![],
}],
attributes: Attributes::default(),
}));
let reader = mk_union(vec![
Schema::Complex(ComplexType::Record(Record {
name: "Inner",
namespace: None,
doc: None,
aliases: vec![],
fields: vec![
AvroFieldSchema {
name: "a",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: None,
aliases: vec![],
},
AvroFieldSchema {
name: "b",
doc: None,
r#type: mk_primitive(PrimitiveType::Int),
default: Some(Value::Number(serde_json::Number::from(42))),
aliases: vec![],
},
],
attributes: Attributes::default(),
})),
mk_primitive(PrimitiveType::String),
]);
let mut maker = Maker::new(false, false);
let dt = maker
.make_data_type(&writer, Some(&reader), None)
.expect("resolution should succeed");
// Verify the union resolution structure
let resolved = match dt.resolution.as_ref() {
Some(ResolutionInfo::Union(u)) => u,
other => panic!("expected union resolution info, got {other:?}"),
};
assert!(!resolved.writer_is_union && resolved.reader_is_union);
// The matching child (Inner at index 0) should have field b with DefaultValue
let children = match dt.codec() {
Codec::Union(children, _, _) => children,
other => panic!("expected union codec, got {other:?}"),
};
let inner_fields = match children[0].codec() {
Codec::Struct(f) => f,
other => panic!("expected struct codec for Inner, got {other:?}"),
};
assert_eq!(inner_fields.len(), 2);
assert_eq!(inner_fields[1].name(), "b");
assert_eq!(
inner_fields[1].data_type().resolution,
Some(ResolutionInfo::DefaultValue(AvroLiteral::Int(42))),
"field b should have DefaultValue(Int(42)) from schema resolution"
);
}
Which issue does this PR close?
Rationale for this change
When an Avro reader schema has a union type that needs to be resolved against the type in the writer schema, resolution information other than primitive type promotions is not properly handled when creating the decoder.
For example, when the reader schema has a nullable record field that has an added nested field on top of the fields defined in the writer schema, the record type resolution needs to be applied, using a projection with the default field value.
What changes are included in this PR?
Extend the union resolution information in the decoder with variant
data for enum remapping and record projection. The
Projectordatastructure with
Skipperdecoders makes part of this information,which necessitated some refactoring.
Are these changes tested?
TODO:
Are there any user-facing changes?
No.