Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,20 +47,25 @@ public class CorruptStatistics {
private static final SemanticVersion CDH_5_PARQUET_251_FIXED_END = new SemanticVersion(1, 5, 0);

/**
* Decides if the statistics from a file created by createdBy (the created_by field from parquet format)
* should be ignored because they are potentially corrupt.
* Returns whether the given column type is one of the types affected by the PARQUET-251 bug
* (BINARY or FIXED_LEN_BYTE_ARRAY).
*
* @param createdBy the created-by string from a file footer
* @param columnType the type of the column that this is checking
* @return true if the statistics may be invalid and should be ignored, false otherwise
* @param columnType the primitive type of the column
* @return true if this column type could have corrupt statistics
*/
public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {

if (columnType != PrimitiveTypeName.BINARY && columnType != PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY) {
// the bug only applies to binary columns
return false;
}
public static boolean isCorruptStatisticsColumnType(PrimitiveTypeName columnType) {
  // Only the two byte-array primitive types count as affected; every other
  // value (including null) falls through to false.
  if (columnType == PrimitiveTypeName.BINARY) {
    return true;
  }
  return columnType == PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY;
}

/**
* Determines whether a file (identified by its created_by string) was written by a version of
* parquet-mr that had the PARQUET-251 statistics bug. This is a file-level check that does not
* consider column type.
*
* @param createdBy the created-by string from a file footer
* @return true if the file was written by a version with the corrupt statistics bug
*/
public static boolean fileHasCorruptStatistics(String createdBy) {
if (Strings.isNullOrEmpty(createdBy)) {
// created_by is not populated, which could have been caused by
// parquet-mr during the same time as PARQUET-251, see PARQUET-297
Expand Down Expand Up @@ -103,6 +108,22 @@ public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName
}
}

/**
 * Tells whether the statistics of a column should be discarded because the file
 * may have been written with the PARQUET-251 statistics bug.
 *
 * <p>Statistics are ignored only when both checks agree: the column type is one
 * accepted by {@link #isCorruptStatisticsColumnType(PrimitiveTypeName)}, and
 * {@link #fileHasCorruptStatistics(String)} flags the writer identified by
 * {@code createdBy}. For any other column type the created-by string is not
 * inspected at all.
 *
 * @param createdBy the created-by string from a file footer
 * @param columnType the type of the column that this is checking
 * @return true if the statistics may be invalid and should be ignored, false otherwise
 */
public static boolean shouldIgnoreStatistics(String createdBy, PrimitiveTypeName columnType) {
  // the bug only applies to binary columns, so the type check short-circuits the file-level check
  return isCorruptStatisticsColumnType(columnType) && fileHasCorruptStatistics(createdBy);
}

private static void warnParseErrorOnce(String createdBy, Throwable e) {
if (!alreadyLogged.getAndSet(true)) {
LOG.warn("Ignoring statistics because created_by could not be parsed (see PARQUET-251): " + createdBy, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,4 +124,34 @@ public void testDistributionCorruptStatistics() {
assertTrue(CorruptStatistics.shouldIgnoreStatistics(
"parquet-mr version 1.7.0 (build abcd)", PrimitiveTypeName.BINARY));
}

@Test
public void testIsCorruptStatisticsColumnType() {
  // Column types the PARQUET-251 check applies to
  PrimitiveTypeName[] affectedTypes = {PrimitiveTypeName.BINARY, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY};
  for (PrimitiveTypeName affected : affectedTypes) {
    assertTrue(affected.name(), CorruptStatistics.isCorruptStatisticsColumnType(affected));
  }

  // Column types that must never be reported as affected
  PrimitiveTypeName[] unaffectedTypes = {
    PrimitiveTypeName.INT32,
    PrimitiveTypeName.INT64,
    PrimitiveTypeName.DOUBLE,
    PrimitiveTypeName.FLOAT,
    PrimitiveTypeName.BOOLEAN,
    PrimitiveTypeName.INT96,
  };
  for (PrimitiveTypeName unaffected : unaffectedTypes) {
    assertFalse(unaffected.name(), CorruptStatistics.isCorruptStatisticsColumnType(unaffected));
  }
}

@Test
public void testFileHasCorruptStatistics() {
  // created_by values that must be reported as having corrupt statistics
  String[] expectedCorrupt = {
    // Corrupt versions
    "parquet-mr version 1.6.0 (build abcd)",
    "parquet-mr version 1.4.2 (build abcd)",
    "parquet-mr version 1.7.999 (build abcd)",
    // Null/empty
    null,
    "",
    // Unparseable
    "unparseable string",
  };
  for (String createdBy : expectedCorrupt) {
    assertTrue("created_by: " + createdBy, CorruptStatistics.fileHasCorruptStatistics(createdBy));
  }

  // created_by values that must be reported as clean
  String[] expectedClean = {
    // Fixed versions
    "parquet-mr version 1.8.0 (build abcd)",
    "parquet-mr version 2.0.0 (build abcd)",
    // Non-parquet-mr applications
    "impala version 1.6.0 (build abcd)",
  };
  for (String createdBy : expectedClean) {
    assertFalse("created_by: " + createdBy, CorruptStatistics.fileHasCorruptStatistics(createdBy));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.parquet.CorruptStatistics;
import org.apache.parquet.ParquetReadOptions;
import org.apache.parquet.Preconditions;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.column.EncodingStats;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.column.statistics.BinaryStatistics;
Expand Down Expand Up @@ -926,6 +927,13 @@ public static org.apache.parquet.column.statistics.Statistics fromParquetStatist
// Visible for testing
static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal(
    String createdBy, Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder) {
  // Turn created_by into the file-level PARQUET-251 flag, then hand off to the
  // overload that accepts the flag; that overload performs the conversion.
  return fromParquetStatisticsInternal(
      formatStats, type, typeSortOrder, CorruptStatistics.fileHasCorruptStatistics(createdBy));
}

// Core implementation using pre-computed file-level corrupt stats flag
static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInternal(
Statistics formatStats, PrimitiveType type, SortOrder typeSortOrder, boolean fileHasCorruptStats) {
// create stats object based on the column type
org.apache.parquet.column.statistics.Statistics.Builder statsBuilder =
org.apache.parquet.column.statistics.Statistics.getBuilderForReading(type);
Expand All @@ -948,8 +956,10 @@ static org.apache.parquet.column.statistics.Statistics fromParquetStatisticsInte
// valid with the type's sort order. In previous releases, all stats were
// aggregated using a signed byte-wise ordering, which isn't valid for all the
// types (e.g. strings, decimals etc.).
if (!CorruptStatistics.shouldIgnoreStatistics(createdBy, type.getPrimitiveTypeName())
&& (sortOrdersMatch || maxEqualsMin)) {
// The fileHasCorruptStats flag applies only to BINARY and FIXED_LEN_BYTE_ARRAY columns.
boolean ignoreForThisColumn =
fileHasCorruptStats && CorruptStatistics.isCorruptStatisticsColumnType(type.getPrimitiveTypeName());
if (!ignoreForThisColumn && (sortOrdersMatch || maxEqualsMin)) {
if (isSet) {
statsBuilder.withMin(formatStats.min.array());
statsBuilder.withMax(formatStats.max.array());
Expand Down Expand Up @@ -1794,13 +1804,23 @@ public FileMetaDataAndRowGroupOffsetInfo visit(RangeMetadataFilter filter) throw

/**
 * Builds the column chunk metadata for one column, deriving the file-level
 * corrupt-statistics flag from {@code createdBy} via
 * {@link CorruptStatistics#fileHasCorruptStatistics(String)} and delegating to the
 * overload that takes that flag.
 *
 * @param metaData the column metadata to convert
 * @param columnPath the path of the column
 * @param type the primitive type of the column
 * @param createdBy the created-by string of the file
 * @return the metadata produced by the delegate overload
 */
public ColumnChunkMetaData buildColumnChunkMetaData(
    ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
  return buildColumnChunkMetaData(
      metaData, columnPath, type, CorruptStatistics.fileHasCorruptStatistics(createdBy));
}

ColumnChunkMetaData buildColumnChunkMetaData(

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

buildColumnChunkMetaData can delegate to a package-private overload that takes the boolean, similar to what you have done but with a few changes:

public ColumnChunkMetaData buildColumnChunkMetaData(
      ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, String createdBy) {
    return buildColumnChunkMetaData(
        metaData, columnPath, type, CorruptStatistics.fileHasCorruptStatistics(createdBy));
}

ColumnChunkMetaData buildColumnChunkMetaData(
      ColumnMetaData metaData, ColumnPath columnPath, PrimitiveType type, boolean fileHasCorruptStats) {
    SortOrder expectedOrder = overrideSortOrderToSigned(type) ? SortOrder.SIGNED : sortOrder(type);
    return ColumnChunkMetaData.get(...,
        fromParquetStatisticsInternal(metaData.statistics, type, expectedOrder, fileHasCorruptStats), ...);
}

No need to pass createdBy downstream; the boolean is all the internal overload needs. The SortOrder computation moves here because we bypass fromParquetStatistics to avoid re-parsing createdBy, as you have already done by replacing fromParquetStatistics with fromParquetStatisticsInternal.

Also, notice how the new public methods we extracted in CorruptStatistics are used in each delegate method here.

ColumnMetaData metaData,
ColumnPath columnPath,
PrimitiveType type,
boolean fileHasCorruptStats) {
SortOrder typeSortOrder = overrideSortOrderToSigned(type) ? SortOrder.SIGNED : sortOrder(type);
return ColumnChunkMetaData.get(
columnPath,
type,
fromFormatCodec(metaData.codec),
convertEncodingStats(metaData.getEncoding_stats()),
fromFormatEncodings(metaData.encodings),
fromParquetStatistics(createdBy, metaData.statistics, type),
fromParquetStatisticsInternal(metaData.statistics, type, typeSortOrder, fileHasCorruptStats),
metaData.data_page_offset,
metaData.dictionary_page_offset,
metaData.num_values,
Expand Down Expand Up @@ -1829,6 +1849,11 @@ public ParquetMetadata fromParquetMetadata(
MessageType messageType = fromParquetSchema(parquetMetadata.getSchema(), parquetMetadata.getColumn_orders());
List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
List<RowGroup> row_groups = parquetMetadata.getRow_groups();
// Compute once per file: whether this file was written by a version with the PARQUET-251 bug.
// Only parse created_by if the schema has columns affected by the bug (BINARY/FIXED_LEN_BYTE_ARRAY).
// The per-column type check is applied later when statistics are actually read.
boolean fileHasCorruptStats = schemaHasCorruptStatisticsColumnType(messageType)
&& CorruptStatistics.fileHasCorruptStatistics(parquetMetadata.getCreated_by());

if (row_groups != null) {
for (RowGroup rowGroup : row_groups) {
Expand Down Expand Up @@ -1909,7 +1934,7 @@ public ParquetMetadata fromParquetMetadata(
metaData,
columnPath,
messageType.getType(columnPath.toArray()).asPrimitiveType(),
createdBy);
fileHasCorruptStats);
column.setRowGroupOrdinal(rowGroup.getOrdinal());
if (metaData.isSetBloom_filter_offset()) {
column.setBloomFilterOffset(metaData.getBloom_filter_offset());
Expand Down Expand Up @@ -1988,6 +2013,18 @@ private static ColumnPath getPath(ColumnMetaData metaData) {
return ColumnPath.get(path);
}

/**
 * Checks whether any column of the schema has a primitive type accepted by
 * {@link CorruptStatistics#isCorruptStatisticsColumnType}, i.e. a type affected by
 * the PARQUET-251 bug.
 *
 * @param schema the file schema whose columns are inspected
 * @return true if at least one such column exists; false otherwise, including for a
 *     schema without columns
 */
private static boolean schemaHasCorruptStatisticsColumnType(MessageType schema) {
  return schema.getColumns().stream()
      .anyMatch((ColumnDescriptor column) -> CorruptStatistics.isCorruptStatisticsColumnType(
          column.getPrimitiveType().getPrimitiveTypeName()));
}

// Visible for testing
MessageType fromParquetSchema(List<SchemaElement> schema, List<ColumnOrder> columnOrders) {
Iterator<SchemaElement> iterator = schema.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1977,4 +1977,105 @@ public void testEdgeInterpolationAlgorithmConversion() {
assertNull(ParquetMetadataConverter.fromParquetEdgeInterpolationAlgorithm(null));
assertNull(ParquetMetadataConverter.toParquetEdgeInterpolationAlgorithm(null));
}

@Test
public void testCorruptStatsPerColumnGate() {
  final ParquetMetadataConverter.SortOrder signed = ParquetMetadataConverter.SortOrder.SIGNED;

  // created_by of a writer version flagged for PARQUET-251, and of one that is not
  final String buggyWriter = "parquet-mr version 1.6.0 (build abcd)";
  final String fixedWriter = "parquet-mr version 1.8.0 (build abcd)";

  // Legacy V1 statistics: only min/max are populated (not min_value/max_value)
  org.apache.parquet.format.Statistics fourByteStats = new org.apache.parquet.format.Statistics();
  fourByteStats.setMin(new byte[] {0, 1, 2, 3});
  fourByteStats.setMax(new byte[] {4, 5, 6, 7});
  fourByteStats.setNull_count(5);

  // Same kind of legacy statistics with 8-byte values, used for the INT64 column
  org.apache.parquet.format.Statistics eightByteStats = new org.apache.parquet.format.Statistics();
  eightByteStats.setMin(new byte[] {0, 0, 0, 0, 0, 0, 0, 1});
  eightByteStats.setMax(new byte[] {0, 0, 0, 0, 0, 0, 0, 7});
  eightByteStats.setNull_count(5);

  // One column per primitive type under test
  PrimitiveType binaryColumn = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, "bin_col");
  PrimitiveType fixedColumn =
      new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.FIXED_LEN_BYTE_ARRAY, 4, "fixed_col");
  PrimitiveType intColumn = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "int_col");
  PrimitiveType longColumn = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT64, "long_col");

  // BINARY from a flagged writer: min/max dropped, null count kept
  Statistics<?> binaryFromBuggy =
      ParquetMetadataConverter.fromParquetStatisticsInternal(buggyWriter, fourByteStats, binaryColumn, signed);
  assertFalse("BINARY min/max should be ignored for corrupt file", binaryFromBuggy.hasNonNullValue());
  assertEquals(5, binaryFromBuggy.getNumNulls());

  // FIXED_LEN_BYTE_ARRAY from a flagged writer: same expectation as BINARY
  Statistics<?> fixedFromBuggy =
      ParquetMetadataConverter.fromParquetStatisticsInternal(buggyWriter, fourByteStats, fixedColumn, signed);
  assertFalse("FIXED_LEN_BYTE_ARRAY min/max should be ignored for corrupt file", fixedFromBuggy.hasNonNullValue());
  assertEquals(5, fixedFromBuggy.getNumNulls());

  // INT32 from a flagged writer: the per-column gate keeps min/max
  Statistics<?> intFromBuggy =
      ParquetMetadataConverter.fromParquetStatisticsInternal(buggyWriter, fourByteStats, intColumn, signed);
  assertTrue("INT32 min/max should NOT be ignored for corrupt file", intFromBuggy.hasNonNullValue());
  assertEquals(5, intFromBuggy.getNumNulls());

  // INT64 from a flagged writer: min/max kept as well
  Statistics<?> longFromBuggy =
      ParquetMetadataConverter.fromParquetStatisticsInternal(buggyWriter, eightByteStats, longColumn, signed);
  assertTrue("INT64 min/max should NOT be ignored for corrupt file", longFromBuggy.hasNonNullValue());
  assertEquals(5, longFromBuggy.getNumNulls());

  // BINARY from a writer that is not flagged: min/max kept
  Statistics<?> binaryFromFixed =
      ParquetMetadataConverter.fromParquetStatisticsInternal(fixedWriter, fourByteStats, binaryColumn, signed);
  assertTrue("BINARY min/max should be kept for non-corrupt file", binaryFromFixed.hasNonNullValue());
}

@Test
public void testSchemaGateSkipsCorruptStatsCheckForNonBinarySchema() throws Exception {
  // Schema made only of INT32/INT64 columns, i.e. without any BINARY or
  // FIXED_LEN_BYTE_ARRAY column, combined below with a known-corrupt created_by.
  MessageType intOnlySchema = Types.buildMessage()
      .required(PrimitiveTypeName.INT32).named("id")
      .required(PrimitiveTypeName.INT64).named("ts")
      .named("msg");

  ParquetMetadataConverter converter = new ParquetMetadataConverter();
  List<SchemaElement> schemaElements = converter.toParquetSchema(intOnlySchema);

  // Legacy V1 min/max statistics for the INT32 column
  org.apache.parquet.format.Statistics legacyStats = new org.apache.parquet.format.Statistics();
  legacyStats.setMin(new byte[] {0, 0, 0, 1});
  legacyStats.setMax(new byte[] {0, 0, 0, 7});
  legacyStats.setNull_count(0);

  // Column metadata for "id", carrying the statistics above
  ColumnMetaData idColumnMeta = new ColumnMetaData(
      Type.INT32,
      Collections.<org.apache.parquet.format.Encoding>emptyList(),
      Collections.singletonList("id"),
      UNCOMPRESSED,
      100L,
      200L,
      100L,
      0L);
  idColumnMeta.setStatistics(legacyStats);

  ColumnChunk idChunk = new ColumnChunk(0L);
  idChunk.setMeta_data(idColumnMeta);
  RowGroup onlyRowGroup = new RowGroup(List.of(idChunk), 100L, 1);

  // Footer whose created_by names a writer version with the PARQUET-251 bug
  FileMetaData footer = new FileMetaData(1, schemaElements, 1, List.of(onlyRowGroup));
  footer.setCreated_by("parquet-mr version 1.6.0 (build abcd)");

  // Convert through fromParquetMetadata, the path that applies the schema gate
  ParquetMetadata converted = converter.fromParquetMetadata(footer);
  Statistics<?> idStats = converted.getBlocks().get(0).getColumns().get(0).getStatistics();

  // NOTE(review): the per-column type check also keeps INT32 statistics on its own,
  // so this assertion appears to pass with or without the schema gate - confirm
  // whether the gate itself needs a more direct check.
  assertTrue(
      "INT32 stats should be preserved when schema has no corrupt-stats-affected columns",
      idStats.hasNonNullValue());
}
}