diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go index b9d2220c..e72c36d3 100644 --- a/parquet/file/column_writer.go +++ b/parquet/file/column_writer.go @@ -140,6 +140,7 @@ type columnWriter struct { totalCompressedBytes int64 closed bool fallbackToNonDict bool + dictPageWritten bool pages []DataPage @@ -264,8 +265,20 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64) w.numBufferedValues += numLevels w.numDataValues += numValues - enc := w.currentEncoder.EstimatedDataEncodedSize() - if enc >= w.props.DataPageSize() { + // While dictionary encoding is active we size pages by raw input bytes + // instead of the RLE-indices' encoded size. Mirrors parquet-mr's + // FallbackValuesWriter.getBufferedSize — it keeps dict pages roughly + // the same raw-byte footprint as the PLAIN pages they'd otherwise be, + // which also pulls the first-page compression check into the same + // cadence parquet-mr uses and avoids committing dict pages that only + // look cheap because their RLE indices are tiny. + var bufferedSize int64 + if w.hasDict && !w.fallbackToNonDict { + bufferedSize = w.currentEncoder.(encoding.DictEncoder).ObservedRawSize() + } else { + bufferedSize = w.currentEncoder.EstimatedDataEncodedSize() + } + if bufferedSize >= w.props.DataPageSize() { return w.FlushCurrentPage() } return nil @@ -427,7 +440,14 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) { return err } } + return w.drainBufferedDataPages() +} +// drainBufferedDataPages writes out and releases any pages buffered while +// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not +// touch the current encoder's unflushed values, so the caller can re-encode +// them as PLAIN during a dictionary fallback. +func (w *columnWriter) drainBufferedDataPages() (err error) { for i, p := range w.pages { defer p.Release() if err = w.WriteDataPage(p); err != nil { @@ -502,6 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error { page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding()) written, err := w.pager.WriteDictionaryPage(page) w.totalBytesWritten += written + w.dictPageWritten = err == nil return err } @@ -620,7 +641,14 @@ func (w *columnWriter) Close() (err error) { if w.rowsWritten > 0 && chunkStats.IsSet() { w.metaData.SetStats(chunkStats) } - err = w.pager.Close(w.hasDict, w.fallbackToNonDict) + // Only advertise PLAIN_DICTIONARY / fallback encodings in the column + // chunk's encoding list when a dictionary page was actually written. + // When fallback discards the dictionary before any dict-encoded page + // is flushed, the column contains only PLAIN data and the list should + // reflect that unambiguously. + advertiseDict := w.hasDict && w.dictPageWritten + advertiseFallback := w.fallbackToNonDict && w.dictPageWritten + err = w.pager.Close(advertiseDict, advertiseFallback) } return err } diff --git a/parquet/file/column_writer_test.go b/parquet/file/column_writer_test.go index 03dbd25e..65d5022f 100644 --- a/parquet/file/column_writer_test.go +++ b/parquet/file/column_writer_test.go @@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque p.EqualValues(VeryLargeSize, valuesRead) p.Equal(p.Values, p.ValuesOut) + // With the parquet-mr-aligned fallback behavior, a dictionary that overflows + // before any dict-encoded data page has been flushed is discarded entirely. + // With the default page-size parameters used here the dict-encoded + // EstimatedDataEncodedSize (RLE indices) stays under DataPageSize right up + // to the overflow point, so no dict page and no dict-encoded data pages + // appear in the output — the column is pure PLAIN. encodings := p.metadataEncodings() - if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) { - // dictionary encoding is not allowed for booleans - // there are 2 encodings (PLAIN, RLE) in a non dictionary encoding case - p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings) - } else if version == parquet.V1_0 { - // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case - // for version 1.0 - p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings) - } else { - // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for - // version 2.0 - p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings) - } + p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings) encodingStats := p.metadataEncodingStats() - if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) { - p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding) - p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType) - } else if version == parquet.V1_0 { - expected := []metadata.PageEncodingStats{ - {Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DICTIONARY_PAGE}, - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE}, - {Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DATA_PAGE}} - p.Equal(expected[0], encodingStats[0]) - p.ElementsMatch(expected[1:], encodingStats[1:]) - } else { - expected := []metadata.PageEncodingStats{ - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DICTIONARY_PAGE}, - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE}, - {Encoding: parquet.Encodings.RLEDict, PageType: format.PageType_DATA_PAGE}} - p.Equal(expected[0], encodingStats[0]) - p.ElementsMatch(expected[1:], encodingStats[1:]) + p.NotEmpty(encodingStats) + for _, es := range encodingStats { + p.Equal(parquet.Encodings.Plain, es.Encoding) + p.Equal(format.PageType_DATA_PAGE, es.PageType) } } diff --git a/parquet/file/column_writer_types.gen.go b/parquet/file/column_writer_types.gen.go index e7f8c341..15a6d1e5 100644 --- a/parquet/file/column_writer_types.gen.go +++ b/parquet/file/column_writer_types.gen.go @@ -212,20 +212,63 @@ func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int32ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int32Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Int64ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -410,20 +453,63 @@ func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int64ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int64Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Int96ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -608,20 +694,63 @@ func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int96ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int96Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Float32ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -806,20 +935,63 @@ func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Float32ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Float32Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Float64ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1004,20 +1176,63 @@ func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Float64ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Float64Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // BooleanColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1327,20 +1542,63 @@ func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *BooleanColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.BooleanEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // ByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1598,20 +1856,63 @@ func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *ByteArrayColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.ByteArrayEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // FixedLenByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1877,20 +2178,63 @@ func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return } -} + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } + } +} + +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *FixedLenByteArrayColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.FixedLenByteArrayEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // NewColumnChunkWriter constructs a column writer of the appropriate type by using the metadata builder diff --git a/parquet/file/column_writer_types.gen.go.tmpl b/parquet/file/column_writer_types.gen.go.tmpl index 9a22bb71..6cee1bad 100644 --- a/parquet/file/column_writer_types.gen.go.tmpl +++ b/parquet/file/column_writer_types.gen.go.tmpl @@ -481,20 +481,63 @@ func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *{{.Name}}ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.{{.Name}}Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } {{end}} diff --git a/parquet/file/dict_fallback_repro_test.go b/parquet/file/dict_fallback_repro_test.go new file mode 100644 index 00000000..f82b3253 --- /dev/null +++ b/parquet/file/dict_fallback_repro_test.go @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/compress" + "github.com/apache/arrow-go/v18/parquet/file" + format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" + "github.com/apache/arrow-go/v18/parquet/schema" + "github.com/stretchr/testify/require" +) + +// TestDictFallbackDiscardsOrphanDict verifies parquet-mr parity: when dict +// encoding overflows before any dict-encoded data page has been flushed, the +// dictionary is discarded and the buffered indices are re-encoded as PLAIN. +// The resulting chunk must therefore contain zero dict bytes and match a +// dict-disabled baseline to within the tiny delta from differences in how +// rows are batched across pages. +func TestDictFallbackDiscardsOrphanDict(t *testing.T) { + cases := []struct { + name string + version parquet.Version + codec compress.Compression + }{ + {"v1/uncompressed", parquet.V1_0, compress.Codecs.Uncompressed}, + {"v2/uncompressed", parquet.V2_LATEST, compress.Codecs.Uncompressed}, + {"v1/snappy", parquet.V1_0, compress.Codecs.Snappy}, + {"v2/snappy", parquet.V2_LATEST, compress.Codecs.Snappy}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runDictFallbackNoDictPageCheck(t, tc.version, tc.codec) + }) + } +} + +func runDictFallbackNoDictPageCheck(t *testing.T, version parquet.Version, codec compress.Compression) { + const ( + numValues = 30000 + valueWidth = 32 + dictPageSizeLimit = 256 * 1024 + dataPageSize = 128 * 1024 + ) + + values := highCardinalityStrings(numValues, valueWidth) + knobs := writerKnobs{ + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: codec, + } + + knobsOn := knobs + knobsOn.dictEnabled = true + _, dictOn := writeByteArrayColumn(t, values, version, knobsOn) + + knobsOff := knobs + knobsOff.dictEnabled = false + _, dictOff := writeByteArrayColumn(t, values, version, knobsOff) + + t.Logf("version=%s codec=%s numValues=%d valueWidth=%d (raw≈%d KiB)", + version, codec, numValues, valueWidth, numValues*valueWidth/1024) + t.Logf("dict=ON chunk.compressed=%d chunk.uncompressed=%d", dictOn.totalCompressed, dictOn.totalUncompressed) + t.Logf("dict=OFF chunk.compressed=%d chunk.uncompressed=%d", dictOff.totalCompressed, dictOff.totalUncompressed) + t.Logf("dict=ON encodings: %v", dictOn.encodings) + t.Logf("dict=ON per-page breakdown:") + for _, p := range dictOn.pages { + t.Logf(" %-18s encoding=%-16s numValues=%-6d compressed=%d B", + p.pageType, p.encoding, p.numValues, p.compressed) + } + + require.Falsef(t, dictOn.hasDictionaryPage, + "dictionary page should have been discarded; chunk=%d", dictOn.totalCompressed) + require.Zero(t, + dictOn.countDataPagesByEncoding(parquet.Encodings.PlainDict)+ + dictOn.countDataPagesByEncoding(parquet.Encodings.RLEDict), + "no dict-encoded data pages expected after discard; encodings=%v", dictOn.encodings) + require.NotContains(t, dictOn.encodings, parquet.Encodings.PlainDict, + "encoding list should not advertise PLAIN_DICTIONARY when no dict page exists") + require.NotContains(t, dictOn.encodings, parquet.Encodings.RLEDict, + "encoding list should not advertise RLE_DICTIONARY when no dict page exists") + + // With the fix, dict=on should match dict=off to within the rounding from + // first-page size differences (FallbackToPlain re-seeds the plain encoder + // with all buffered values, producing one oversized first page). + diff := dictOn.totalCompressed - dictOff.totalCompressed + t.Logf("size delta dict=on vs dict=off: %d B", diff) + require.LessOrEqualf(t, absInt64(diff), int64(valueWidth*2048), + "post-fix: dict=on should closely match dict=off, got delta=%d", diff) +} + +// TestDictFallbackKeepsDictWhenAlreadyFlushed covers the case where some +// dict-encoded data pages have already been emitted before the overflow — +// those must be preserved alongside the dict page, and the remainder of the +// column must switch to PLAIN. Matches parquet-mr's "initialUsedAndHadDictionary" +// path. +func TestDictFallbackKeepsDictWhenAlreadyFlushed(t *testing.T) { + const ( + valueWidth = 32 + dictPageSizeLimit = 8 * 1024 + dataPageSize = 4 * 1024 + ) + + // Mixed-cardinality data: a low-card prefix big enough that several + // dict-encoded data pages are flushed (and pass the first-page + // compression check) before a high-card tail overflows the dict and + // triggers fallback. The committed dict pages must be kept and the + // dictionary page preserved. + lowCard := make([]parquet.ByteArray, 5000) + for i := range lowCard { + lowCard[i] = parquet.ByteArray(fmt.Sprintf("cat_%02d%*s", i%16, valueWidth-8, "")) + } + highCard := highCardinalityStrings(5000, valueWidth) + values := append(lowCard, highCard...) + + _, chunk := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: true, + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: compress.Codecs.Uncompressed, + }) + + t.Logf("encodings: %v hasDictPage=%v totalCompressed=%d", + chunk.encodings, chunk.hasDictionaryPage, chunk.totalCompressed) + + require.True(t, chunk.hasDictionaryPage, + "dict page must be kept when dict-encoded data pages were already flushed") + + dictPages := chunk.countDataPagesByEncoding(parquet.Encodings.PlainDict) + + chunk.countDataPagesByEncoding(parquet.Encodings.RLEDict) + plainPages := chunk.countDataPagesByEncoding(parquet.Encodings.Plain) + require.Greater(t, dictPages, 0, "expected dict-encoded data pages to survive fallback") + require.Greater(t, plainPages, 0, "expected PLAIN data pages after fallback") + + // All dict-encoded data pages must come before any PLAIN page — otherwise + // the dictionary offset in the file footer would reference data pages + // that can't be decoded against it. + sawPlain := false + for _, p := range chunk.pages { + if p.pageType != format.PageType_DATA_PAGE && p.pageType != format.PageType_DATA_PAGE_V2 { + continue + } + switch p.encoding { + case parquet.Encodings.Plain: + sawPlain = true + case parquet.Encodings.PlainDict, parquet.Encodings.RLEDict: + require.False(t, sawPlain, + "dict-encoded data page appeared after a PLAIN page — wrong ordering") + } + } +} + +func absInt64(v int64) int64 { + if v < 0 { + return -v + } + return v +} + +// TestDictFallbackMidCardinality exercises the case the iceberg-go TPC-DS +// benchmark flagged: mid-cardinality columns (ss_list_price etc.) where the +// dictionary grows slowly enough to pass the first-page compression check, +// eventually overflows, and leaves a dict page + dict-encoded pages + PLAIN +// pages stranded in one chunk. With dict-mode pages sized by raw bytes +// (matching parquet-mr), overflow happens at a similar cadence to plain +// encoding, so dict=on either stays competitive with dict=off or the +// committed dict pages are genuinely earning their keep. +func TestDictFallbackMidCardinality(t *testing.T) { + const ( + numRows = 200000 + valueWidth = 8 + distinctValueCount = 20000 + dictPageSizeLimit = 128 * 1024 + dataPageSize = 128 * 1024 + ) + + values := midCardinalityStrings(numRows, distinctValueCount, valueWidth) + + _, dictOn := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: true, + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: compress.Codecs.Snappy, + }) + _, dictOff := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: false, + dataPageSize: dataPageSize, + codec: compress.Codecs.Snappy, + }) + + t.Logf("mid-card: distinct=%d rows=%d width=%d", distinctValueCount, numRows, valueWidth) + t.Logf("dict=ON compressed=%d encodings=%v", dictOn.totalCompressed, dictOn.encodings) + t.Logf("dict=OFF compressed=%d encodings=%v", dictOff.totalCompressed, dictOff.encodings) + + // The specific assertion: dict=on must not regress against dict=off by + // more than a small constant. Before the raw-byte page cadence, this + // scenario produced the 4-entry encoding layout and 20-30% bloat. + require.LessOrEqualf(t, + dictOn.totalCompressed, dictOff.totalCompressed+int64(numRows)/10, + "dict=on (%d) must not balloon against dict=off (%d) on mid-card data", + dictOn.totalCompressed, dictOff.totalCompressed) +} + +func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray { + pool := make([]parquet.ByteArray, distinct) + for i := range pool { + b := make([]byte, width) + for j := range width { + b[j] = byte('A' + (i+j*7)%26) + } + tail := fmt.Appendf(nil, "-%05d", i) + copy(b[width-len(tail):], tail) + pool[i] = parquet.ByteArray(b) + } + out := make([]parquet.ByteArray, n) + for i := range out { + // Deterministic, roughly uniform distribution across the pool. + out[i] = pool[int64(i)*2654435761%int64(distinct)] + } + return out +} + +type writerKnobs struct { + dictEnabled bool + dictPageSizeLimit int64 + dataPageSize int64 + codec compress.Compression +} + +type pageInfo struct { + pageType format.PageType + encoding parquet.Encoding + numValues int32 + compressed int +} + +type chunkSummary struct { + totalCompressed int64 + totalUncompressed int64 + encodings []parquet.Encoding + encodingStats []pageEncodingStatsEntry + pages []pageInfo + hasDictionaryPage bool +} + +type pageEncodingStatsEntry struct { + PageType format.PageType + Encoding parquet.Encoding +} + +func (c *chunkSummary) countDataPagesByEncoding(enc parquet.Encoding) int { + n := 0 + for _, p := range c.pages { + if p.encoding == enc && (p.pageType == format.PageType_DATA_PAGE || p.pageType == format.PageType_DATA_PAGE_V2) { + n++ + } + } + return n +} + +func writeByteArrayColumn(t *testing.T, values []parquet.ByteArray, version parquet.Version, knobs writerKnobs) ([]byte, *chunkSummary) { + t.Helper() + + root := mustByteArraySchema(t) + opts := []parquet.WriterProperty{ + parquet.WithVersion(version), + parquet.WithDictionaryDefault(knobs.dictEnabled), + parquet.WithCompression(knobs.codec), + } + if knobs.dictPageSizeLimit > 0 { + opts = append(opts, parquet.WithDictionaryPageSizeLimit(knobs.dictPageSizeLimit)) + } + if knobs.dataPageSize > 0 { + opts = append(opts, parquet.WithDataPageSize(knobs.dataPageSize)) + } + if version == parquet.V2_LATEST { + opts = append(opts, parquet.WithDataPageVersion(parquet.DataPageV2)) + } + props := parquet.NewWriterProperties(opts...) + + var buf bytes.Buffer + w := file.NewParquetWriter(&buf, root, file.WithWriterProps(props)) + rgw := w.AppendRowGroup() + cw, err := rgw.NextColumn() + require.NoError(t, err) + _, err = cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(values, nil, nil) + require.NoError(t, err) + require.NoError(t, cw.Close()) + require.NoError(t, rgw.Close()) + require.NoError(t, w.Close()) + + return buf.Bytes(), summarizeFirstColumnChunk(t, buf.Bytes()) +} + +func summarizeFirstColumnChunk(t *testing.T, raw []byte) *chunkSummary { + t.Helper() + + r, err := file.NewParquetReader(bytes.NewReader(raw)) + require.NoError(t, err) + defer r.Close() + + md, err := r.MetaData().RowGroup(0).ColumnChunk(0) + require.NoError(t, err) + s := &chunkSummary{ + totalCompressed: md.TotalCompressedSize(), + totalUncompressed: md.TotalUncompressedSize(), + encodings: md.Encodings(), + hasDictionaryPage: md.HasDictionaryPage(), + } + for _, es := range md.EncodingStats() { + s.encodingStats = append(s.encodingStats, pageEncodingStatsEntry{ + PageType: es.PageType, + Encoding: es.Encoding, + }) + } + + rg := r.RowGroup(0) + pr, err := rg.GetColumnPageReader(0) + require.NoError(t, err) + for pr.Next() { + page := pr.Page() + s.pages = append(s.pages, pageInfo{ + pageType: format.PageType(page.Type()), + encoding: parquet.Encoding(page.Encoding()), + numValues: page.NumValues(), + compressed: len(page.Data()), + }) + } + require.NoError(t, pr.Err()) + return s +} + +func mustByteArraySchema(t *testing.T) *schema.GroupNode { + t.Helper() + root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{ + schema.NewByteArrayNode("v", parquet.Repetitions.Required, -1), + }, -1) + require.NoError(t, err) + return root +} + +func highCardinalityStrings(n, width int) []parquet.ByteArray { + out := make([]parquet.ByteArray, n) + for i := range out { + b := make([]byte, width) + for j := range width { + b[j] = byte('A' + (i+j*31)%26) + } + tail := fmt.Appendf(nil, "-%07d", i) + copy(b[width-len(tail):], tail) + out[i] = parquet.ByteArray(b) + } + return out +} diff --git a/parquet/internal/encoding/byte_array_encoder.go b/parquet/internal/encoding/byte_array_encoder.go index d0439faf..397a18a7 100644 --- a/parquet/internal/encoding/byte_array_encoder.go +++ b/parquet/internal/encoding/byte_array_encoder.go @@ -24,6 +24,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" "github.com/apache/arrow-go/v18/internal/utils" "github.com/apache/arrow-go/v18/parquet" ) @@ -103,6 +104,7 @@ func (enc *DictByteArrayEncoder) PutByteArray(in parquet.ByteArray) { enc.dictEncodedSize += in.Len() + arrow.Uint32SizeBytes } enc.addIndex(memoIdx) + enc.AddRawSize(int64(in.Len() + arrow.Uint32SizeBytes)) } // Put takes a slice of ByteArrays to add and encode. @@ -127,6 +129,23 @@ func (enc *DictByteArrayEncoder) NormalizeDict(values arrow.Array) (arrow.Array, return values, nil } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictByteArrayEncoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(ByteArrayEncoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(*hashing.BinaryMemoTable) + vals := make([]parquet.ByteArray, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = parquet.ByteArray(bm.Value(int(idx))) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // PutDictionary allows pre-seeding a dictionary encoder with // a dictionary from an Arrow Array. // diff --git a/parquet/internal/encoding/encoder.go b/parquet/internal/encoding/encoder.go index 2f1d698f..64a992e7 100644 --- a/parquet/internal/encoding/encoder.go +++ b/parquet/internal/encoding/encoder.go @@ -114,6 +114,13 @@ type dictEncoder struct { idxValues []int32 memo MemoTable + // rawDataSize is the number of bytes of input values observed since + // the last page flush. Mirrors parquet-mr's rawDataByteSize and is + // consulted by FlushCurrentPage to decide whether the dictionary is + // actually compressing (dict + indices < raw) before committing the + // first dict-encoded data page. + rawDataSize int64 + preservedDict arrow.Array } @@ -134,6 +141,7 @@ func (d *dictEncoder) Reset() { d.dictEncodedSize = 0 d.idxValues = d.idxValues[:0] d.idxBuffer.ResizeNoShrink(0) + d.rawDataSize = 0 d.memo.Reset() if d.preservedDict != nil { d.preservedDict.Release() @@ -141,6 +149,17 @@ func (d *dictEncoder) Reset() { } } +// ObservedRawSize returns the number of raw input bytes accumulated since +// the last data page flush. Used with DictEncodedSize and +// EstimatedDataEncodedSize to evaluate whether dictionary encoding is +// actually saving space on the first page. +func (d *dictEncoder) ObservedRawSize() int64 { return d.rawDataSize } + +// AddRawSize is used by per-type dict encoders to accumulate the raw-bytes +// total as values are written. Exported at package scope because the typed +// encoders live in different files within this package. +func (d *dictEncoder) AddRawSize(n int64) { d.rawDataSize += n } + func (d *dictEncoder) Release() { d.encoder.Release() d.idxBuffer.Release() @@ -285,6 +304,7 @@ func (d *dictEncoder) WriteIndices(out []byte) (int, error) { nbytes := enc.Flush() d.idxValues = d.idxValues[:0] + d.rawDataSize = 0 return nbytes + 1, nil } diff --git a/parquet/internal/encoding/fixed_len_byte_array_encoder.go b/parquet/internal/encoding/fixed_len_byte_array_encoder.go index 0f4345d2..854b8b8a 100644 --- a/parquet/internal/encoding/fixed_len_byte_array_encoder.go +++ b/parquet/internal/encoding/fixed_len_byte_array_encoder.go @@ -21,6 +21,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" "github.com/apache/arrow-go/v18/parquet" ) @@ -136,6 +137,7 @@ func (enc *DictFixedLenByteArrayEncoder) Put(in []parquet.FixedLenByteArray) { } enc.addIndex(memoIdx) } + enc.AddRawSize(int64(len(in)) * int64(enc.typeLen)) } // PutSpaced is like Put but leaves space for nulls @@ -151,6 +153,23 @@ func (enc *DictFixedLenByteArrayEncoder) NormalizeDict(values arrow.Array) (arro return values, nil } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictFixedLenByteArrayEncoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(FixedLenByteArrayEncoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(*hashing.BinaryMemoTable) + vals := make([]parquet.FixedLenByteArray, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = parquet.FixedLenByteArray(bm.Value(int(idx))) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // PutDictionary allows pre-seeding a dictionary encoder with // a dictionary from an Arrow Array. // diff --git a/parquet/internal/encoding/typed_encoder.go b/parquet/internal/encoding/typed_encoder.go index 1cba18da..82bb6fc9 100644 --- a/parquet/internal/encoding/typed_encoder.go +++ b/parquet/internal/encoding/typed_encoder.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/compute" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" shared_utils "github.com/apache/arrow-go/v18/internal/utils" "github.com/apache/arrow-go/v18/parquet" format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" @@ -144,6 +145,7 @@ func (enc *typedDictEncoder[T]) Put(in []T) { for _, val := range in { enc.dictEncoder.Put(val) } + enc.AddRawSize(int64(len(in)) * int64(unsafe.Sizeof(T(0)))) } func (enc *typedDictEncoder[T]) PutSpaced(in []T, validBits []byte, validBitsOffset int64) { @@ -158,6 +160,22 @@ type arrvalues[T arrow.ValueType] interface { Values() []T } +func (enc *typedDictEncoder[T]) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(Encoder[T]) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + dict := make([]T, enc.memo.Size()) + enc.memo.CopyValues(dict) + vals := make([]T, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = dict[idx] + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + func (enc *typedDictEncoder[T]) NormalizeDict(values arrow.Array) (arrow.Array, error) { if _, ok := values.(arrvalues[T]); ok { values.Retain() @@ -427,6 +445,7 @@ func (enc *DictInt96Encoder) Put(in []parquet.Int96) { } enc.addIndex(memoIdx) } + enc.AddRawSize(int64(len(in)) * int64(parquet.Int96SizeBytes)) } // PutSpaced is like Put but assumes space for nulls @@ -446,6 +465,24 @@ func (enc *DictInt96Encoder) PutDictionary(arrow.Array) error { return fmt.Errorf("%w: direct PutDictionary to Int96", arrow.ErrNotImplemented) } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictInt96Encoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(Int96Encoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(*hashing.BinaryMemoTable) + vals := make([]parquet.Int96, len(enc.idxValues)) + for i, idx := range enc.idxValues { + v := bm.Value(int(idx)) + copy(vals[i][:], v) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // the boolEncoderTraits struct is used to make it easy to create encoders and decoders based on type type boolEncoderTraits struct{} diff --git a/parquet/internal/encoding/types.go b/parquet/internal/encoding/types.go index a2f1a5b2..7c4015df 100644 --- a/parquet/internal/encoding/types.go +++ b/parquet/internal/encoding/types.go @@ -119,6 +119,20 @@ type DictEncoder interface { // // The returned array must always be released by the caller. NormalizeDict(arrow.Array) (arrow.Array, error) + // FallBackTo translates the buffered indices back through the dictionary + // and puts the raw values into the fallback encoder, clearing the dict + // encoder's index buffer. Mirrors parquet-mr's + // RequiresFallback.fallBackAllValuesTo: used by column writers when the + // dictionary overflows mid-chunk so already-buffered values can be + // re-encoded with the fallback (PLAIN) encoder instead of being emitted + // as a stranded dict-encoded page. + FallBackTo(fallback TypedEncoder) error + // ObservedRawSize returns the raw input byte count accumulated since + // the last page flush. Used alongside DictEncodedSize and + // EstimatedDataEncodedSize to decide whether dictionary encoding is + // actually saving space before committing the first dict data page — + // mirrors parquet-mr's rawDataByteSize. + ObservedRawSize() int64 } var bufferPool = sync.Pool{ diff --git a/parquet/pqarrow/encode_dictionary_test.go b/parquet/pqarrow/encode_dictionary_test.go index 79d3147d..e92b2587 100644 --- a/parquet/pqarrow/encode_dictionary_test.go +++ b/parquet/pqarrow/encode_dictionary_test.go @@ -188,7 +188,6 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() { {0, 1}, {0, 0}, {3}} - expectedDictCounts := []int32{4, 4, 4, 3} // pairs of (min, max) expectedMinMax := [][2]string{ {"a", "b"}, @@ -266,22 +265,23 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() { for rowGroup := 0; rowGroup < 2; rowGroup++ { pr, err := rdr.RowGroup(0).GetColumnPageReader(0) ad.Require().NoError(err) - ad.True(pr.Next()) - page := pr.Page() - ad.NotNil(page) - ad.NoError(pr.Err()) - ad.Require().IsType((*file.DictionaryPage)(nil), page) - dictPage := page.(*file.DictionaryPage) - ad.EqualValues(expectedDictCounts[caseIndex], dictPage.NumValues()) + + // pqarrow falls back to PLAIN whenever the arrow dict has + // duplicates, and does so before any dict-encoded data page + // is flushed. Matching parquet-mr, arrow-go now discards the + // dictionary in that case — so no DICTIONARY page appears and + // all data pages are PLAIN. for pageIdx := 0; pageIdx < expectedNumDataPages[caseIndex]; pageIdx++ { ad.True(pr.Next()) - page = pr.Page() + page := pr.Page() ad.NotNil(page) ad.NoError(pr.Err()) dataPage, ok := page.(file.DataPage) ad.Require().True(ok) + ad.EqualValues(parquet.Encodings.Plain, dataPage.Encoding(), + "page %d should be PLAIN after dict fallback", pageIdx) stats := dataPage.Statistics() ad.EqualValues(expectedNullByPage[caseIndex][pageIdx], stats.NullCount) diff --git a/parquet/pqarrow/file_writer_test.go b/parquet/pqarrow/file_writer_test.go index 0c771931..7b98b212 100644 --- a/parquet/pqarrow/file_writer_test.go +++ b/parquet/pqarrow/file_writer_test.go @@ -171,8 +171,8 @@ func TestFileWriterTotalBytes(t *testing.T) { require.NoError(t, writer.Close()) // Verify total bytes & compressed bytes are correct - assert.Equal(t, int64(408), writer.TotalCompressedBytes()) - assert.Equal(t, int64(910), writer.TotalBytesWritten()) + assert.Equal(t, int64(340), writer.TotalCompressedBytes()) + assert.Equal(t, int64(799), writer.TotalBytesWritten()) } func TestFileWriterTotalBytesBuffered(t *testing.T) { @@ -205,8 +205,8 @@ func TestFileWriterTotalBytesBuffered(t *testing.T) { require.NoError(t, writer.Close()) // Verify total bytes & compressed bytes are correct - assert.Equal(t, int64(596), writer.TotalCompressedBytes()) - assert.Equal(t, int64(1306), writer.TotalBytesWritten()) + assert.Equal(t, int64(494), writer.TotalCompressedBytes()) + assert.Equal(t, int64(1139), writer.TotalBytesWritten()) } func TestWriteOnClosedFileWriter(t *testing.T) {