From 1e2a0f664a6fc0148e026b688beaca9ce6635a7f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20P=C3=BCtz?= Date: Tue, 28 Apr 2026 14:55:38 +0200 Subject: [PATCH 1/5] fix(parquet): align dictionary fallback with parquet-mr On dictionary overflow, arrow-go always flushed the dictionary page and any buffered dict-encoded data pages before switching to PLAIN, even when no dict-encoded data page had been cut. On mid-cardinality columns the result was a 4-encoding chunk layout (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) that bloated output by 20-30% versus parquet-mr. Mirror parquet-mr's FallbackValuesWriter: - Discard the dictionary and re-encode buffered indices as PLAIN when no dict-encoded data page has been flushed yet; only emit the dictionary page once a dict-encoded page is committed. - Before the first dict-encoded page, fall back to PLAIN if dict + indices >= raw input bytes. - Size dict-encoded pages by raw input bytes (not the RLE indices' encoded size) so the page cadence matches PLAIN. Adds DictEncoder.FallBackTo / ObservedRawSize and exposes BinaryMemoTable.Value for the fallback translation. --- parquet/file/column_writer.go | 60 ++- parquet/file/column_writer_test.go | 41 +- parquet/file/column_writer_types.gen.go | 296 +++++++++++--- parquet/file/column_writer_types.gen.go.tmpl | 37 +- parquet/file/dict_fallback_repro_test.go | 372 ++++++++++++++++++ .../internal/encoding/byte_array_encoder.go | 18 + parquet/internal/encoding/encoder.go | 20 + .../encoding/fixed_len_byte_array_encoder.go | 18 + parquet/internal/encoding/memo_table.go | 8 + parquet/internal/encoding/typed_encoder.go | 36 ++ parquet/internal/encoding/types.go | 14 + parquet/pqarrow/encode_dictionary_test.go | 18 +- 12 files changed, 831 insertions(+), 107 deletions(-) create mode 100644 parquet/file/dict_fallback_repro_test.go diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go index b9d2220cd..40ae487ec 100644 --- a/parquet/file/column_writer.go +++ b/parquet/file/column_writer.go @@ -140,6 +140,11 @@ type columnWriter struct { totalCompressedBytes int64 closed bool fallbackToNonDict bool + dictPageWritten bool + // fallbackFn is set by each typed column writer at construction to its + // own FallbackToPlain. It lets the base FlushCurrentPage trigger + // fallback without needing to know the concrete value type. + fallbackFn func() pages []DataPage @@ -264,14 +269,44 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64) w.numBufferedValues += numLevels w.numDataValues += numValues - enc := w.currentEncoder.EstimatedDataEncodedSize() - if enc >= w.props.DataPageSize() { + // While dictionary encoding is active we size pages by raw input bytes + // instead of the RLE-indices' encoded size. Mirrors parquet-mr's + // FallbackValuesWriter.getBufferedSize — it keeps dict pages roughly + // the same raw-byte footprint as the PLAIN pages they'd otherwise be, + // which also pulls the first-page compression check into the same + // cadence parquet-mr uses and avoids committing dict pages that only + // look cheap because their RLE indices are tiny. + var bufferedSize int64 + if w.hasDict && !w.fallbackToNonDict { + bufferedSize = w.currentEncoder.(encoding.DictEncoder).ObservedRawSize() + } else { + bufferedSize = w.currentEncoder.EstimatedDataEncodedSize() + } + if bufferedSize >= w.props.DataPageSize() { return w.FlushCurrentPage() } return nil } func (w *columnWriter) FlushCurrentPage() error { + // Before committing what would be the first dict-encoded data page, + // check whether dictionary encoding is actually saving space against + // a PLAIN baseline. This mirrors parquet-mr's + // FallbackValuesWriter.getBytes + isCompressionSatisfying: if the + // dictionary plus the encoded indices meet or exceed the raw input + // bytes, fall back to PLAIN now and discard the dictionary — avoiding + // the mid-cardinality case where a dict page stays in the file + // alongside PLAIN pages without any net compression win. + if w.hasDict && !w.fallbackToNonDict && !w.dictPageWritten && len(w.pages) == 0 && w.fallbackFn != nil { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + if rawSize > 0 && dictSize+encodedSize >= rawSize { + w.fallbackFn() + } + } + var ( defLevelsRLESize int32 = 0 repLevelsRLESize int32 = 0 @@ -427,12 +462,17 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) { return err } } + return w.drainBufferedDataPages() +} +// drainBufferedDataPages writes out and releases any pages buffered while +// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not +// touch the current encoder's unflushed values, so the caller can re-encode +// them as PLAIN during a dictionary fallback. +func (w *columnWriter) drainBufferedDataPages() (err error) { for i, p := range w.pages { defer p.Release() if err = w.WriteDataPage(p); err != nil { - // To keep pages in consistent state, - // remove the pages that will be released using above defer call. w.pages = w.pages[i+1:] return err } @@ -502,6 +542,9 @@ func (w *columnWriter) WriteDictionaryPage() error { page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding()) written, err := w.pager.WriteDictionaryPage(page) w.totalBytesWritten += written + if err == nil { + w.dictPageWritten = true + } return err } @@ -620,7 +663,14 @@ func (w *columnWriter) Close() (err error) { if w.rowsWritten > 0 && chunkStats.IsSet() { w.metaData.SetStats(chunkStats) } - err = w.pager.Close(w.hasDict, w.fallbackToNonDict) + // Only advertise PLAIN_DICTIONARY / fallback encodings in the column + // chunk's encoding list when a dictionary page was actually written. + // When fallback discards the dictionary before any dict-encoded page + // is flushed, the column contains only PLAIN data and the list should + // reflect that unambiguously. + advertiseDict := w.hasDict && w.dictPageWritten + advertiseFallback := w.fallbackToNonDict && w.dictPageWritten + err = w.pager.Close(advertiseDict, advertiseFallback) } return err } diff --git a/parquet/file/column_writer_test.go b/parquet/file/column_writer_test.go index 03dbd25e4..65d5022fb 100644 --- a/parquet/file/column_writer_test.go +++ b/parquet/file/column_writer_test.go @@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque p.EqualValues(VeryLargeSize, valuesRead) p.Equal(p.Values, p.ValuesOut) + // With the parquet-mr-aligned fallback behavior, a dictionary that overflows + // before any dict-encoded data page has been flushed is discarded entirely. + // With the default page-size parameters used here the dict-encoded + // EstimatedDataEncodedSize (RLE indices) stays under DataPageSize right up + // to the overflow point, so no dict page and no dict-encoded data pages + // appear in the output — the column is pure PLAIN. encodings := p.metadataEncodings() - if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) { - // dictionary encoding is not allowed for booleans - // there are 2 encodings (PLAIN, RLE) in a non dictionary encoding case - p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings) - } else if version == parquet.V1_0 { - // There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case - // for version 1.0 - p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings) - } else { - // There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for - // version 2.0 - p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings) - } + p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings) encodingStats := p.metadataEncodingStats() - if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) { - p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding) - p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType) - } else if version == parquet.V1_0 { - expected := []metadata.PageEncodingStats{ - {Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DICTIONARY_PAGE}, - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE}, - {Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DATA_PAGE}} - p.Equal(expected[0], encodingStats[0]) - p.ElementsMatch(expected[1:], encodingStats[1:]) - } else { - expected := []metadata.PageEncodingStats{ - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DICTIONARY_PAGE}, - {Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE}, - {Encoding: parquet.Encodings.RLEDict, PageType: format.PageType_DATA_PAGE}} - p.Equal(expected[0], encodingStats[0]) - p.ElementsMatch(expected[1:], encodingStats[1:]) + p.NotEmpty(encodingStats) + for _, es := range encodingStats { + p.Equal(parquet.Encodings.Plain, es.Encoding) + p.Equal(format.PageType_DATA_PAGE, es.PageType) } } diff --git a/parquet/file/column_writer_types.gen.go b/parquet/file/column_writer_types.gen.go index e7f8c3417..521d02d74 100644 --- a/parquet/file/column_writer_types.gen.go +++ b/parquet/file/column_writer_types.gen.go @@ -45,6 +45,7 @@ type Int32ColumnChunkWriter struct { func NewInt32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int32ColumnChunkWriter { ret := &Int32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -217,15 +218,37 @@ func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int32ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int32Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Int64ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -243,6 +266,7 @@ type Int64ColumnChunkWriter struct { func NewInt64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int64ColumnChunkWriter { ret := &Int64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -415,15 +439,37 @@ func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int64ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int64Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Int96ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -441,6 +487,7 @@ type Int96ColumnChunkWriter struct { func NewInt96ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int96ColumnChunkWriter { ret := &Int96ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -613,15 +660,37 @@ func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Int96ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Int96EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Int96Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Float32ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -639,6 +708,7 @@ type Float32ColumnChunkWriter struct { func NewFloat32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float32ColumnChunkWriter { ret := &Float32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -811,15 +881,37 @@ func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Float32ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Float32EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Float32Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // Float64ColumnChunkWriter is the typed interface for writing columns to a parquet @@ -837,6 +929,7 @@ type Float64ColumnChunkWriter struct { func NewFloat64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float64ColumnChunkWriter { ret := &Float64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1009,15 +1102,37 @@ func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *Float64ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.Float64EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.Float64Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // BooleanColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1038,6 +1153,7 @@ func NewBooleanColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, page } ret := &BooleanColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1332,15 +1448,37 @@ func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *BooleanColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.BooleanEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.BooleanEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // ByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1358,6 +1496,7 @@ type ByteArrayColumnChunkWriter struct { func NewByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *ByteArrayColumnChunkWriter { ret := &ByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1603,15 +1742,37 @@ func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *ByteArrayColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.ByteArrayEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // FixedLenByteArrayColumnChunkWriter is the typed interface for writing columns to a parquet @@ -1629,6 +1790,7 @@ type FixedLenByteArrayColumnChunkWriter struct { func NewFixedLenByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *FixedLenByteArrayColumnChunkWriter { ret := &FixedLenByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1882,15 +2044,37 @@ func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *FixedLenByteArrayColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.FixedLenByteArrayEncoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } // NewColumnChunkWriter constructs a column writer of the appropriate type by using the metadata builder diff --git a/parquet/file/column_writer_types.gen.go.tmpl b/parquet/file/column_writer_types.gen.go.tmpl index 9a22bb711..446347408 100644 --- a/parquet/file/column_writer_types.gen.go.tmpl +++ b/parquet/file/column_writer_types.gen.go.tmpl @@ -49,6 +49,7 @@ func New{{.Name}}ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pa {{- end}} ret := &{{.Name}}ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) + ret.fallbackFn = ret.FallbackToPlain return ret } @@ -486,15 +487,37 @@ func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() { } } +// FallbackToPlain switches this column from dictionary to PLAIN encoding when +// the dictionary outgrows the configured page size limit. It mirrors +// parquet-mr's FallbackValuesWriter: unflushed buffered values are re-encoded +// as PLAIN (so the next data page is PLAIN, not dict-indexed), and the +// dictionary page is emitted only if a dict-encoded data page had already +// been cut before the overflow — otherwise the dictionary is discarded. func (w *{{.Name}}ColumnChunkWriter) FallbackToPlain() { - if w.currentEncoder.Encoding() == parquet.Encodings.PlainDict { - w.WriteDictionaryPage() - w.FlushBufferedDataPages() - w.fallbackToNonDict = true - w.currentEncoder.Release() - w.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem) - w.encoding = parquet.Encodings.Plain + if w.currentEncoder.Encoding() != parquet.Encodings.PlainDict { + return } + + dictEnc := w.currentEncoder.(encoding.DictEncoder) + plainEnc := encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(parquet.Encodings.Plain), false, w.descr, w.mem).(encoding.{{.Name}}Encoder) + + if len(w.pages) > 0 { + if err := w.WriteDictionaryPage(); err != nil { + panic(err) + } + if err := w.drainBufferedDataPages(); err != nil { + panic(err) + } + } + + if err := dictEnc.FallBackTo(plainEnc); err != nil { + panic(err) + } + + dictEnc.Release() + w.currentEncoder = plainEnc + w.encoding = parquet.Encodings.Plain + w.fallbackToNonDict = true } {{end}} diff --git a/parquet/file/dict_fallback_repro_test.go b/parquet/file/dict_fallback_repro_test.go new file mode 100644 index 000000000..b581edad3 --- /dev/null +++ b/parquet/file/dict_fallback_repro_test.go @@ -0,0 +1,372 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package file_test + +import ( + "bytes" + "fmt" + "testing" + + "github.com/apache/arrow-go/v18/parquet" + "github.com/apache/arrow-go/v18/parquet/compress" + "github.com/apache/arrow-go/v18/parquet/file" + format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" + "github.com/apache/arrow-go/v18/parquet/schema" + "github.com/stretchr/testify/require" +) + +// TestDictFallbackDiscardsOrphanDict verifies parquet-mr parity: when dict +// encoding overflows before any dict-encoded data page has been flushed, the +// dictionary is discarded and the buffered indices are re-encoded as PLAIN. +// The resulting chunk must therefore contain zero dict bytes and match a +// dict-disabled baseline to within the tiny delta from differences in how +// rows are batched across pages. +func TestDictFallbackDiscardsOrphanDict(t *testing.T) { + cases := []struct { + name string + version parquet.Version + codec compress.Compression + }{ + {"v1/uncompressed", parquet.V1_0, compress.Codecs.Uncompressed}, + {"v2/uncompressed", parquet.V2_LATEST, compress.Codecs.Uncompressed}, + {"v1/snappy", parquet.V1_0, compress.Codecs.Snappy}, + {"v2/snappy", parquet.V2_LATEST, compress.Codecs.Snappy}, + } + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + runDictFallbackNoDictPageCheck(t, tc.version, tc.codec) + }) + } +} + +func runDictFallbackNoDictPageCheck(t *testing.T, version parquet.Version, codec compress.Compression) { + const ( + numValues = 30000 + valueWidth = 32 + dictPageSizeLimit = 256 * 1024 + dataPageSize = 128 * 1024 + ) + + values := highCardinalityStrings(numValues, valueWidth) + knobs := writerKnobs{ + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: codec, + } + + knobsOn := knobs + knobsOn.dictEnabled = true + _, dictOn := writeByteArrayColumn(t, values, version, knobsOn) + + knobsOff := knobs + knobsOff.dictEnabled = false + _, dictOff := writeByteArrayColumn(t, values, version, knobsOff) + + t.Logf("version=%s codec=%s numValues=%d valueWidth=%d (raw≈%d KiB)", + version, codec, numValues, valueWidth, numValues*valueWidth/1024) + t.Logf("dict=ON chunk.compressed=%d chunk.uncompressed=%d", dictOn.totalCompressed, dictOn.totalUncompressed) + t.Logf("dict=OFF chunk.compressed=%d chunk.uncompressed=%d", dictOff.totalCompressed, dictOff.totalUncompressed) + t.Logf("dict=ON encodings: %v", dictOn.encodings) + t.Logf("dict=ON per-page breakdown:") + for _, p := range dictOn.pages { + t.Logf(" %-18s encoding=%-16s numValues=%-6d compressed=%d B", + p.pageType, p.encoding, p.numValues, p.compressed) + } + + require.Falsef(t, dictOn.hasDictionaryPage, + "dictionary page should have been discarded; chunk=%d", dictOn.totalCompressed) + require.Zero(t, + dictOn.countDataPagesByEncoding(parquet.Encodings.PlainDict)+ + dictOn.countDataPagesByEncoding(parquet.Encodings.RLEDict), + "no dict-encoded data pages expected after discard; encodings=%v", dictOn.encodings) + require.NotContains(t, dictOn.encodings, parquet.Encodings.PlainDict, + "encoding list should not advertise PLAIN_DICTIONARY when no dict page exists") + require.NotContains(t, dictOn.encodings, parquet.Encodings.RLEDict, + "encoding list should not advertise RLE_DICTIONARY when no dict page exists") + + // With the fix, dict=on should match dict=off to within the rounding from + // first-page size differences (FallbackToPlain re-seeds the plain encoder + // with all buffered values, producing one oversized first page). + diff := dictOn.totalCompressed - dictOff.totalCompressed + t.Logf("size delta dict=on vs dict=off: %d B", diff) + require.LessOrEqualf(t, absInt64(diff), int64(valueWidth*2048), + "post-fix: dict=on should closely match dict=off, got delta=%d", diff) +} + +// TestDictFallbackKeepsDictWhenAlreadyFlushed covers the case where some +// dict-encoded data pages have already been emitted before the overflow — +// those must be preserved alongside the dict page, and the remainder of the +// column must switch to PLAIN. Matches parquet-mr's "initialUsedAndHadDictionary" +// path. +func TestDictFallbackKeepsDictWhenAlreadyFlushed(t *testing.T) { + const ( + valueWidth = 32 + dictPageSizeLimit = 8 * 1024 + dataPageSize = 4 * 1024 + ) + + // Mixed-cardinality data: a low-card prefix big enough that several + // dict-encoded data pages are flushed (and pass the first-page + // compression check) before a high-card tail overflows the dict and + // triggers fallback. The committed dict pages must be kept and the + // dictionary page preserved. + lowCard := make([]parquet.ByteArray, 5000) + for i := range lowCard { + lowCard[i] = parquet.ByteArray(fmt.Sprintf("cat_%02d%*s", i%16, valueWidth-8, "")) + } + highCard := highCardinalityStrings(5000, valueWidth) + values := append(lowCard, highCard...) + + _, chunk := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: true, + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: compress.Codecs.Uncompressed, + }) + + t.Logf("encodings: %v hasDictPage=%v totalCompressed=%d", + chunk.encodings, chunk.hasDictionaryPage, chunk.totalCompressed) + + require.True(t, chunk.hasDictionaryPage, + "dict page must be kept when dict-encoded data pages were already flushed") + + dictPages := chunk.countDataPagesByEncoding(parquet.Encodings.PlainDict) + + chunk.countDataPagesByEncoding(parquet.Encodings.RLEDict) + plainPages := chunk.countDataPagesByEncoding(parquet.Encodings.Plain) + require.Greater(t, dictPages, 0, "expected dict-encoded data pages to survive fallback") + require.Greater(t, plainPages, 0, "expected PLAIN data pages after fallback") + + // All dict-encoded data pages must come before any PLAIN page — otherwise + // the dictionary offset in the file footer would reference data pages + // that can't be decoded against it. + sawPlain := false + for _, p := range chunk.pages { + if p.pageType != format.PageType_DATA_PAGE && p.pageType != format.PageType_DATA_PAGE_V2 { + continue + } + switch p.encoding { + case parquet.Encodings.Plain: + sawPlain = true + case parquet.Encodings.PlainDict, parquet.Encodings.RLEDict: + require.False(t, sawPlain, + "dict-encoded data page appeared after a PLAIN page — wrong ordering") + } + } +} + +func absInt64(v int64) int64 { + if v < 0 { + return -v + } + return v +} + +// TestDictFallbackMidCardinality exercises the case the iceberg-go TPC-DS +// benchmark flagged: mid-cardinality columns (ss_list_price etc.) where the +// dictionary grows slowly enough to pass the first-page compression check, +// eventually overflows, and leaves a dict page + dict-encoded pages + PLAIN +// pages stranded in one chunk. With dict-mode pages sized by raw bytes +// (matching parquet-mr), overflow happens at a similar cadence to plain +// encoding, so dict=on either stays competitive with dict=off or the +// committed dict pages are genuinely earning their keep. +func TestDictFallbackMidCardinality(t *testing.T) { + const ( + numRows = 200000 + valueWidth = 8 + distinctValueCount = 20000 + dictPageSizeLimit = 128 * 1024 + dataPageSize = 128 * 1024 + ) + + values := midCardinalityStrings(numRows, distinctValueCount, valueWidth) + + _, dictOn := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: true, + dictPageSizeLimit: dictPageSizeLimit, + dataPageSize: dataPageSize, + codec: compress.Codecs.Snappy, + }) + _, dictOff := writeByteArrayColumn(t, values, parquet.V1_0, writerKnobs{ + dictEnabled: false, + dataPageSize: dataPageSize, + codec: compress.Codecs.Snappy, + }) + + t.Logf("mid-card: distinct=%d rows=%d width=%d", distinctValueCount, numRows, valueWidth) + t.Logf("dict=ON compressed=%d encodings=%v", dictOn.totalCompressed, dictOn.encodings) + t.Logf("dict=OFF compressed=%d encodings=%v", dictOff.totalCompressed, dictOff.encodings) + + // The specific assertion: dict=on must not regress against dict=off by + // more than a small constant. Before the raw-byte page cadence, this + // scenario produced the 4-entry encoding layout and 20-30% bloat. + require.LessOrEqualf(t, + dictOn.totalCompressed, dictOff.totalCompressed+int64(numRows)/10, + "dict=on (%d) must not balloon against dict=off (%d) on mid-card data", + dictOn.totalCompressed, dictOff.totalCompressed) +} + +func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray { + pool := make([]parquet.ByteArray, distinct) + for i := range pool { + b := make([]byte, width) + for j := range width { + b[j] = byte('A' + (i+j*7)%26) + } + tail := fmt.Appendf(nil, "-%05d", i) + copy(b[width-len(tail):], tail) + pool[i] = parquet.ByteArray(b) + } + out := make([]parquet.ByteArray, n) + for i := range out { + // Deterministic, roughly uniform distribution across the pool. + out[i] = pool[(i*2654435761)%distinct] + } + return out +} + +type writerKnobs struct { + dictEnabled bool + dictPageSizeLimit int64 + dataPageSize int64 + codec compress.Compression +} + +type pageInfo struct { + pageType format.PageType + encoding parquet.Encoding + numValues int32 + compressed int +} + +type chunkSummary struct { + totalCompressed int64 + totalUncompressed int64 + encodings []parquet.Encoding + encodingStats []pageEncodingStatsEntry + pages []pageInfo + hasDictionaryPage bool +} + +type pageEncodingStatsEntry struct { + PageType format.PageType + Encoding parquet.Encoding +} + +func (c *chunkSummary) countDataPagesByEncoding(enc parquet.Encoding) int { + n := 0 + for _, p := range c.pages { + if p.encoding == enc && (p.pageType == format.PageType_DATA_PAGE || p.pageType == format.PageType_DATA_PAGE_V2) { + n++ + } + } + return n +} + +func writeByteArrayColumn(t *testing.T, values []parquet.ByteArray, version parquet.Version, knobs writerKnobs) ([]byte, *chunkSummary) { + t.Helper() + + root := mustByteArraySchema(t) + opts := []parquet.WriterProperty{ + parquet.WithVersion(version), + parquet.WithDictionaryDefault(knobs.dictEnabled), + parquet.WithCompression(knobs.codec), + } + if knobs.dictPageSizeLimit > 0 { + opts = append(opts, parquet.WithDictionaryPageSizeLimit(knobs.dictPageSizeLimit)) + } + if knobs.dataPageSize > 0 { + opts = append(opts, parquet.WithDataPageSize(knobs.dataPageSize)) + } + if version == parquet.V2_LATEST { + opts = append(opts, parquet.WithDataPageVersion(parquet.DataPageV2)) + } + props := parquet.NewWriterProperties(opts...) + + var buf bytes.Buffer + w := file.NewParquetWriter(&buf, root, file.WithWriterProps(props)) + rgw := w.AppendRowGroup() + cw, err := rgw.NextColumn() + require.NoError(t, err) + _, err = cw.(*file.ByteArrayColumnChunkWriter).WriteBatch(values, nil, nil) + require.NoError(t, err) + require.NoError(t, cw.Close()) + require.NoError(t, rgw.Close()) + require.NoError(t, w.Close()) + + return buf.Bytes(), summarizeFirstColumnChunk(t, buf.Bytes()) +} + +func summarizeFirstColumnChunk(t *testing.T, raw []byte) *chunkSummary { + t.Helper() + + r, err := file.NewParquetReader(bytes.NewReader(raw)) + require.NoError(t, err) + defer r.Close() + + md, err := r.MetaData().RowGroup(0).ColumnChunk(0) + require.NoError(t, err) + s := &chunkSummary{ + totalCompressed: md.TotalCompressedSize(), + totalUncompressed: md.TotalUncompressedSize(), + encodings: md.Encodings(), + hasDictionaryPage: md.HasDictionaryPage(), + } + for _, es := range md.EncodingStats() { + s.encodingStats = append(s.encodingStats, pageEncodingStatsEntry{ + PageType: es.PageType, + Encoding: es.Encoding, + }) + } + + rg := r.RowGroup(0) + pr, err := rg.GetColumnPageReader(0) + require.NoError(t, err) + for pr.Next() { + page := pr.Page() + s.pages = append(s.pages, pageInfo{ + pageType: format.PageType(page.Type()), + encoding: parquet.Encoding(page.Encoding()), + numValues: page.NumValues(), + compressed: len(page.Data()), + }) + } + require.NoError(t, pr.Err()) + return s +} + +func mustByteArraySchema(t *testing.T) *schema.GroupNode { + t.Helper() + root, err := schema.NewGroupNode("schema", parquet.Repetitions.Required, schema.FieldList{ + schema.NewByteArrayNode("v", parquet.Repetitions.Required, -1), + }, -1) + require.NoError(t, err) + return root +} + +func highCardinalityStrings(n, width int) []parquet.ByteArray { + out := make([]parquet.ByteArray, n) + for i := range out { + b := make([]byte, width) + for j := range width { + b[j] = byte('A' + (i+j*31)%26) + } + tail := fmt.Appendf(nil, "-%07d", i) + copy(b[width-len(tail):], tail) + out[i] = parquet.ByteArray(b) + } + return out +} diff --git a/parquet/internal/encoding/byte_array_encoder.go b/parquet/internal/encoding/byte_array_encoder.go index d0439fafb..ba314f0c7 100644 --- a/parquet/internal/encoding/byte_array_encoder.go +++ b/parquet/internal/encoding/byte_array_encoder.go @@ -103,6 +103,7 @@ func (enc *DictByteArrayEncoder) PutByteArray(in parquet.ByteArray) { enc.dictEncodedSize += in.Len() + arrow.Uint32SizeBytes } enc.addIndex(memoIdx) + enc.AddRawSize(int64(in.Len() + arrow.Uint32SizeBytes)) } // Put takes a slice of ByteArrays to add and encode. @@ -127,6 +128,23 @@ func (enc *DictByteArrayEncoder) NormalizeDict(values arrow.Array) (arrow.Array, return values, nil } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictByteArrayEncoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(ByteArrayEncoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(BinaryMemoTable) + vals := make([]parquet.ByteArray, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = parquet.ByteArray(bm.Value(int(idx))) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // PutDictionary allows pre-seeding a dictionary encoder with // a dictionary from an Arrow Array. // diff --git a/parquet/internal/encoding/encoder.go b/parquet/internal/encoding/encoder.go index 2f1d698f9..64a992e71 100644 --- a/parquet/internal/encoding/encoder.go +++ b/parquet/internal/encoding/encoder.go @@ -114,6 +114,13 @@ type dictEncoder struct { idxValues []int32 memo MemoTable + // rawDataSize is the number of bytes of input values observed since + // the last page flush. Mirrors parquet-mr's rawDataByteSize and is + // consulted by FlushCurrentPage to decide whether the dictionary is + // actually compressing (dict + indices < raw) before committing the + // first dict-encoded data page. + rawDataSize int64 + preservedDict arrow.Array } @@ -134,6 +141,7 @@ func (d *dictEncoder) Reset() { d.dictEncodedSize = 0 d.idxValues = d.idxValues[:0] d.idxBuffer.ResizeNoShrink(0) + d.rawDataSize = 0 d.memo.Reset() if d.preservedDict != nil { d.preservedDict.Release() @@ -141,6 +149,17 @@ func (d *dictEncoder) Reset() { } } +// ObservedRawSize returns the number of raw input bytes accumulated since +// the last data page flush. Used with DictEncodedSize and +// EstimatedDataEncodedSize to evaluate whether dictionary encoding is +// actually saving space on the first page. +func (d *dictEncoder) ObservedRawSize() int64 { return d.rawDataSize } + +// AddRawSize is used by per-type dict encoders to accumulate the raw-bytes +// total as values are written. Exported at package scope because the typed +// encoders live in different files within this package. +func (d *dictEncoder) AddRawSize(n int64) { d.rawDataSize += n } + func (d *dictEncoder) Release() { d.encoder.Release() d.idxBuffer.Release() @@ -285,6 +304,7 @@ func (d *dictEncoder) WriteIndices(out []byte) (int, error) { nbytes := enc.Flush() d.idxValues = d.idxValues[:0] + d.rawDataSize = 0 return nbytes + 1, nil } diff --git a/parquet/internal/encoding/fixed_len_byte_array_encoder.go b/parquet/internal/encoding/fixed_len_byte_array_encoder.go index 0f4345d22..ef162d4d6 100644 --- a/parquet/internal/encoding/fixed_len_byte_array_encoder.go +++ b/parquet/internal/encoding/fixed_len_byte_array_encoder.go @@ -136,6 +136,7 @@ func (enc *DictFixedLenByteArrayEncoder) Put(in []parquet.FixedLenByteArray) { } enc.addIndex(memoIdx) } + enc.AddRawSize(int64(len(in)) * int64(enc.typeLen)) } // PutSpaced is like Put but leaves space for nulls @@ -151,6 +152,23 @@ func (enc *DictFixedLenByteArrayEncoder) NormalizeDict(values arrow.Array) (arro return values, nil } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictFixedLenByteArrayEncoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(FixedLenByteArrayEncoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(BinaryMemoTable) + vals := make([]parquet.FixedLenByteArray, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = parquet.FixedLenByteArray(bm.Value(int(idx))) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // PutDictionary allows pre-seeding a dictionary encoder with // a dictionary from an Arrow Array. // diff --git a/parquet/internal/encoding/memo_table.go b/parquet/internal/encoding/memo_table.go index 062104b6b..01db0f389 100644 --- a/parquet/internal/encoding/memo_table.go +++ b/parquet/internal/encoding/memo_table.go @@ -114,6 +114,10 @@ type BinaryMemoTable interface { CopyFixedWidthValues(start int, width int, out []byte) // VisitValues calls visitFn on each value in the table starting with the index specified VisitValues(start int, visitFn func([]byte)) + // Value returns the byte slice stored at the given dictionary index. The + // returned slice is owned by the memo table; callers must copy if they + // need the data to outlive the table. + Value(i int) []byte // Retain increases the reference count of the separately stored binary data that is // kept alongside the table which contains all of the values in the table. This is // safe to call simultaneously across multiple goroutines. @@ -307,6 +311,10 @@ func (m *binaryMemoTableImpl) CopyOffsets(out []int32) { m.CopyOffsetsSubset(0, out) } +func (m *binaryMemoTableImpl) Value(i int) []byte { + return m.builder.Value(i) +} + func (m *binaryMemoTableImpl) VisitValues(start int, visitFn func([]byte)) { for i := int(start); i < m.Size(); i++ { visitFn(m.builder.Value(i)) diff --git a/parquet/internal/encoding/typed_encoder.go b/parquet/internal/encoding/typed_encoder.go index 1cba18da5..3762c631b 100644 --- a/parquet/internal/encoding/typed_encoder.go +++ b/parquet/internal/encoding/typed_encoder.go @@ -144,6 +144,7 @@ func (enc *typedDictEncoder[T]) Put(in []T) { for _, val := range in { enc.dictEncoder.Put(val) } + enc.AddRawSize(int64(len(in)) * int64(unsafe.Sizeof(T(0)))) } func (enc *typedDictEncoder[T]) PutSpaced(in []T, validBits []byte, validBitsOffset int64) { @@ -158,6 +159,22 @@ type arrvalues[T arrow.ValueType] interface { Values() []T } +func (enc *typedDictEncoder[T]) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(Encoder[T]) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + dict := make([]T, enc.memo.Size()) + enc.memo.CopyValues(dict) + vals := make([]T, len(enc.idxValues)) + for i, idx := range enc.idxValues { + vals[i] = dict[idx] + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + func (enc *typedDictEncoder[T]) NormalizeDict(values arrow.Array) (arrow.Array, error) { if _, ok := values.(arrvalues[T]); ok { values.Retain() @@ -427,6 +444,7 @@ func (enc *DictInt96Encoder) Put(in []parquet.Int96) { } enc.addIndex(memoIdx) } + enc.AddRawSize(int64(len(in)) * int64(parquet.Int96SizeBytes)) } // PutSpaced is like Put but assumes space for nulls @@ -446,6 +464,24 @@ func (enc *DictInt96Encoder) PutDictionary(arrow.Array) error { return fmt.Errorf("%w: direct PutDictionary to Int96", arrow.ErrNotImplemented) } +// FallBackTo drains buffered indices through the dictionary into the +// fallback plain encoder and clears the index buffer. +func (enc *DictInt96Encoder) FallBackTo(fallback TypedEncoder) error { + target, ok := fallback.(Int96Encoder) + if !ok { + return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") + } + bm := enc.memo.(BinaryMemoTable) + vals := make([]parquet.Int96, len(enc.idxValues)) + for i, idx := range enc.idxValues { + v := bm.Value(int(idx)) + copy(vals[i][:], v) + } + target.Put(vals) + enc.idxValues = enc.idxValues[:0] + return nil +} + // the boolEncoderTraits struct is used to make it easy to create encoders and decoders based on type type boolEncoderTraits struct{} diff --git a/parquet/internal/encoding/types.go b/parquet/internal/encoding/types.go index a2f1a5b2c..7c4015df3 100644 --- a/parquet/internal/encoding/types.go +++ b/parquet/internal/encoding/types.go @@ -119,6 +119,20 @@ type DictEncoder interface { // // The returned array must always be released by the caller. NormalizeDict(arrow.Array) (arrow.Array, error) + // FallBackTo translates the buffered indices back through the dictionary + // and puts the raw values into the fallback encoder, clearing the dict + // encoder's index buffer. Mirrors parquet-mr's + // RequiresFallback.fallBackAllValuesTo: used by column writers when the + // dictionary overflows mid-chunk so already-buffered values can be + // re-encoded with the fallback (PLAIN) encoder instead of being emitted + // as a stranded dict-encoded page. + FallBackTo(fallback TypedEncoder) error + // ObservedRawSize returns the raw input byte count accumulated since + // the last page flush. Used alongside DictEncodedSize and + // EstimatedDataEncodedSize to decide whether dictionary encoding is + // actually saving space before committing the first dict data page — + // mirrors parquet-mr's rawDataByteSize. + ObservedRawSize() int64 } var bufferPool = sync.Pool{ diff --git a/parquet/pqarrow/encode_dictionary_test.go b/parquet/pqarrow/encode_dictionary_test.go index 79d3147d2..e92b25874 100644 --- a/parquet/pqarrow/encode_dictionary_test.go +++ b/parquet/pqarrow/encode_dictionary_test.go @@ -188,7 +188,6 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() { {0, 1}, {0, 0}, {3}} - expectedDictCounts := []int32{4, 4, 4, 3} // pairs of (min, max) expectedMinMax := [][2]string{ {"a", "b"}, @@ -266,22 +265,23 @@ func (ad *ArrowWriteDictionarySuite) TestStatisticsWithFallback() { for rowGroup := 0; rowGroup < 2; rowGroup++ { pr, err := rdr.RowGroup(0).GetColumnPageReader(0) ad.Require().NoError(err) - ad.True(pr.Next()) - page := pr.Page() - ad.NotNil(page) - ad.NoError(pr.Err()) - ad.Require().IsType((*file.DictionaryPage)(nil), page) - dictPage := page.(*file.DictionaryPage) - ad.EqualValues(expectedDictCounts[caseIndex], dictPage.NumValues()) + + // pqarrow falls back to PLAIN whenever the arrow dict has + // duplicates, and does so before any dict-encoded data page + // is flushed. Matching parquet-mr, arrow-go now discards the + // dictionary in that case — so no DICTIONARY page appears and + // all data pages are PLAIN. for pageIdx := 0; pageIdx < expectedNumDataPages[caseIndex]; pageIdx++ { ad.True(pr.Next()) - page = pr.Page() + page := pr.Page() ad.NotNil(page) ad.NoError(pr.Err()) dataPage, ok := page.(file.DataPage) ad.Require().True(ok) + ad.EqualValues(parquet.Encodings.Plain, dataPage.Encoding(), + "page %d should be PLAIN after dict fallback", pageIdx) stats := dataPage.Statistics() ad.EqualValues(expectedNullByPage[caseIndex][pageIdx], stats.NullCount) From ad82eabec940d969b288487abecbca1d36174238 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20P=C3=BCtz?= Date: Wed, 29 Apr 2026 17:52:37 +0200 Subject: [PATCH 2/5] int64 --- parquet/file/dict_fallback_repro_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/file/dict_fallback_repro_test.go b/parquet/file/dict_fallback_repro_test.go index b581edad3..866b72c76 100644 --- a/parquet/file/dict_fallback_repro_test.go +++ b/parquet/file/dict_fallback_repro_test.go @@ -233,7 +233,7 @@ func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray { out := make([]parquet.ByteArray, n) for i := range out { // Deterministic, roughly uniform distribution across the pool. - out[i] = pool[(i*2654435761)%distinct] + out[i] = pool[int(int64(i)*2654435761)%distinct] } return out } From d2a4950cfde2bf73fcb06824f3a5fddb8d37df30 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20P=C3=BCtz?= Date: Wed, 29 Apr 2026 18:10:20 +0200 Subject: [PATCH 3/5] no more wrapping? --- parquet/file/dict_fallback_repro_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/parquet/file/dict_fallback_repro_test.go b/parquet/file/dict_fallback_repro_test.go index 866b72c76..f82b32536 100644 --- a/parquet/file/dict_fallback_repro_test.go +++ b/parquet/file/dict_fallback_repro_test.go @@ -233,7 +233,7 @@ func midCardinalityStrings(n, distinct, width int) []parquet.ByteArray { out := make([]parquet.ByteArray, n) for i := range out { // Deterministic, roughly uniform distribution across the pool. - out[i] = pool[int(int64(i)*2654435761)%distinct] + out[i] = pool[int64(i)*2654435761%int64(distinct)] } return out } From c9b5a687d8bee4a33748845a41a0900d3041cd1e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20P=C3=BCtz?= Date: Thu, 30 Apr 2026 10:41:41 +0200 Subject: [PATCH 4/5] review comments # --- parquet/file/column_writer.go | 28 +-- parquet/file/column_writer_types.gen.go | 192 ++++++++++++++++-- parquet/file/column_writer_types.gen.go.tmpl | 24 ++- .../internal/encoding/byte_array_encoder.go | 3 +- .../encoding/fixed_len_byte_array_encoder.go | 3 +- parquet/internal/encoding/memo_table.go | 8 - parquet/internal/encoding/typed_encoder.go | 3 +- 7 files changed, 207 insertions(+), 54 deletions(-) diff --git a/parquet/file/column_writer.go b/parquet/file/column_writer.go index 40ae487ec..e72c36d3d 100644 --- a/parquet/file/column_writer.go +++ b/parquet/file/column_writer.go @@ -141,10 +141,6 @@ type columnWriter struct { closed bool fallbackToNonDict bool dictPageWritten bool - // fallbackFn is set by each typed column writer at construction to its - // own FallbackToPlain. It lets the base FlushCurrentPage trigger - // fallback without needing to know the concrete value type. - fallbackFn func() pages []DataPage @@ -289,24 +285,6 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64) } func (w *columnWriter) FlushCurrentPage() error { - // Before committing what would be the first dict-encoded data page, - // check whether dictionary encoding is actually saving space against - // a PLAIN baseline. This mirrors parquet-mr's - // FallbackValuesWriter.getBytes + isCompressionSatisfying: if the - // dictionary plus the encoded indices meet or exceed the raw input - // bytes, fall back to PLAIN now and discard the dictionary — avoiding - // the mid-cardinality case where a dict page stays in the file - // alongside PLAIN pages without any net compression win. - if w.hasDict && !w.fallbackToNonDict && !w.dictPageWritten && len(w.pages) == 0 && w.fallbackFn != nil { - dictEnc := w.currentEncoder.(encoding.DictEncoder) - rawSize := dictEnc.ObservedRawSize() - encodedSize := dictEnc.EstimatedDataEncodedSize() - dictSize := int64(dictEnc.DictEncodedSize()) - if rawSize > 0 && dictSize+encodedSize >= rawSize { - w.fallbackFn() - } - } - var ( defLevelsRLESize int32 = 0 repLevelsRLESize int32 = 0 @@ -473,6 +451,8 @@ func (w *columnWriter) drainBufferedDataPages() (err error) { for i, p := range w.pages { defer p.Release() if err = w.WriteDataPage(p); err != nil { + // To keep pages in consistent state, + // remove the pages that will be released using above defer call. w.pages = w.pages[i+1:] return err } @@ -542,9 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error { page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding()) written, err := w.pager.WriteDictionaryPage(page) w.totalBytesWritten += written - if err == nil { - w.dictPageWritten = true - } + w.dictPageWritten = err == nil return err } diff --git a/parquet/file/column_writer_types.gen.go b/parquet/file/column_writer_types.gen.go index 521d02d74..15a6d1e50 100644 --- a/parquet/file/column_writer_types.gen.go +++ b/parquet/file/column_writer_types.gen.go @@ -45,7 +45,6 @@ type Int32ColumnChunkWriter struct { func NewInt32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int32ColumnChunkWriter { ret := &Int32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -213,8 +212,29 @@ func (w *Int32ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -266,7 +286,6 @@ type Int64ColumnChunkWriter struct { func NewInt64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int64ColumnChunkWriter { ret := &Int64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -434,8 +453,29 @@ func (w *Int64ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -487,7 +527,6 @@ type Int96ColumnChunkWriter struct { func NewInt96ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Int96ColumnChunkWriter { ret := &Int96ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Int96EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -655,8 +694,29 @@ func (w *Int96ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -708,7 +768,6 @@ type Float32ColumnChunkWriter struct { func NewFloat32ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float32ColumnChunkWriter { ret := &Float32ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Float32EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -876,8 +935,29 @@ func (w *Float32ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -929,7 +1009,6 @@ type Float64ColumnChunkWriter struct { func NewFloat64ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *Float64ColumnChunkWriter { ret := &Float64ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.Float64EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1097,8 +1176,29 @@ func (w *Float64ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -1153,7 +1253,6 @@ func NewBooleanColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, page } ret := &BooleanColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.BooleanEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1443,8 +1542,29 @@ func (w *BooleanColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -1496,7 +1616,6 @@ type ByteArrayColumnChunkWriter struct { func NewByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *ByteArrayColumnChunkWriter { ret := &ByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.ByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -1737,8 +1856,29 @@ func (w *ByteArrayColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } @@ -1790,7 +1930,6 @@ type FixedLenByteArrayColumnChunkWriter struct { func NewFixedLenByteArrayColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pager PageWriter, useDict bool, enc parquet.Encoding, props *parquet.WriterProperties) *FixedLenByteArrayColumnChunkWriter { ret := &FixedLenByteArrayColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.FixedLenByteArrayEncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -2039,8 +2178,29 @@ func (w *FixedLenByteArrayColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } diff --git a/parquet/file/column_writer_types.gen.go.tmpl b/parquet/file/column_writer_types.gen.go.tmpl index 446347408..6cee1badd 100644 --- a/parquet/file/column_writer_types.gen.go.tmpl +++ b/parquet/file/column_writer_types.gen.go.tmpl @@ -49,7 +49,6 @@ func New{{.Name}}ColumnChunkWriter(meta *metadata.ColumnChunkMetaDataBuilder, pa {{- end}} ret := &{{.Name}}ColumnChunkWriter{columnWriter: newColumnWriterBase(meta, pager, useDict, enc, props)} ret.currentEncoder = encoding.{{.Name}}EncoderTraits.Encoder(format.Encoding(enc), useDict, meta.Descr(), props.Allocator()) - ret.fallbackFn = ret.FallbackToPlain return ret } @@ -482,8 +481,29 @@ func (w *{{.Name}}ColumnChunkWriter) checkDictionarySizeLimit() { return } - if w.currentEncoder.(encoding.DictEncoder).DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { + dictEnc := w.currentEncoder.(encoding.DictEncoder) + if dictEnc.DictEncodedSize() >= int(w.props.DictionaryPageSizeLimit()) { w.FallbackToPlain() + return + } + + // Before any dict-encoded data page has been cut, check whether the + // dictionary is actually saving space against a PLAIN baseline. Mirrors + // parquet-mr's FallbackValuesWriter.shouldFallBack: if the dictionary + // plus the encoded indices meet or exceed the raw input bytes, fall back + // to PLAIN now and discard the dictionary — avoiding the mid-cardinality + // case where a dict page stays in the file alongside PLAIN pages without + // any net compression win. + if !w.dictPageWritten && len(w.pages) == 0 { + rawSize := dictEnc.ObservedRawSize() + encodedSize := dictEnc.EstimatedDataEncodedSize() + dictSize := int64(dictEnc.DictEncodedSize()) + // For an all-nulls batch rawSize, dictSize, and encodedSize are all 0, + // so this 0 >= 0 path triggers. Harmless: nothing is encoded yet, so + // PLAIN vs dict doesn't matter and we avoid an empty dictionary. + if dictSize+encodedSize >= rawSize { + w.FallbackToPlain() + } } } diff --git a/parquet/internal/encoding/byte_array_encoder.go b/parquet/internal/encoding/byte_array_encoder.go index ba314f0c7..397a18a7b 100644 --- a/parquet/internal/encoding/byte_array_encoder.go +++ b/parquet/internal/encoding/byte_array_encoder.go @@ -24,6 +24,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/arrow/array" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" "github.com/apache/arrow-go/v18/internal/utils" "github.com/apache/arrow-go/v18/parquet" ) @@ -135,7 +136,7 @@ func (enc *DictByteArrayEncoder) FallBackTo(fallback TypedEncoder) error { if !ok { return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") } - bm := enc.memo.(BinaryMemoTable) + bm := enc.memo.(*hashing.BinaryMemoTable) vals := make([]parquet.ByteArray, len(enc.idxValues)) for i, idx := range enc.idxValues { vals[i] = parquet.ByteArray(bm.Value(int(idx))) diff --git a/parquet/internal/encoding/fixed_len_byte_array_encoder.go b/parquet/internal/encoding/fixed_len_byte_array_encoder.go index ef162d4d6..854b8b8aa 100644 --- a/parquet/internal/encoding/fixed_len_byte_array_encoder.go +++ b/parquet/internal/encoding/fixed_len_byte_array_encoder.go @@ -21,6 +21,7 @@ import ( "github.com/apache/arrow-go/v18/arrow" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" "github.com/apache/arrow-go/v18/parquet" ) @@ -159,7 +160,7 @@ func (enc *DictFixedLenByteArrayEncoder) FallBackTo(fallback TypedEncoder) error if !ok { return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") } - bm := enc.memo.(BinaryMemoTable) + bm := enc.memo.(*hashing.BinaryMemoTable) vals := make([]parquet.FixedLenByteArray, len(enc.idxValues)) for i, idx := range enc.idxValues { vals[i] = parquet.FixedLenByteArray(bm.Value(int(idx))) diff --git a/parquet/internal/encoding/memo_table.go b/parquet/internal/encoding/memo_table.go index 01db0f389..062104b6b 100644 --- a/parquet/internal/encoding/memo_table.go +++ b/parquet/internal/encoding/memo_table.go @@ -114,10 +114,6 @@ type BinaryMemoTable interface { CopyFixedWidthValues(start int, width int, out []byte) // VisitValues calls visitFn on each value in the table starting with the index specified VisitValues(start int, visitFn func([]byte)) - // Value returns the byte slice stored at the given dictionary index. The - // returned slice is owned by the memo table; callers must copy if they - // need the data to outlive the table. - Value(i int) []byte // Retain increases the reference count of the separately stored binary data that is // kept alongside the table which contains all of the values in the table. This is // safe to call simultaneously across multiple goroutines. @@ -311,10 +307,6 @@ func (m *binaryMemoTableImpl) CopyOffsets(out []int32) { m.CopyOffsetsSubset(0, out) } -func (m *binaryMemoTableImpl) Value(i int) []byte { - return m.builder.Value(i) -} - func (m *binaryMemoTableImpl) VisitValues(start int, visitFn func([]byte)) { for i := int(start); i < m.Size(); i++ { visitFn(m.builder.Value(i)) diff --git a/parquet/internal/encoding/typed_encoder.go b/parquet/internal/encoding/typed_encoder.go index 3762c631b..82bb6fc90 100644 --- a/parquet/internal/encoding/typed_encoder.go +++ b/parquet/internal/encoding/typed_encoder.go @@ -26,6 +26,7 @@ import ( "github.com/apache/arrow-go/v18/arrow/compute" "github.com/apache/arrow-go/v18/arrow/memory" "github.com/apache/arrow-go/v18/internal/bitutils" + "github.com/apache/arrow-go/v18/internal/hashing" shared_utils "github.com/apache/arrow-go/v18/internal/utils" "github.com/apache/arrow-go/v18/parquet" format "github.com/apache/arrow-go/v18/parquet/internal/gen-go/parquet" @@ -471,7 +472,7 @@ func (enc *DictInt96Encoder) FallBackTo(fallback TypedEncoder) error { if !ok { return fmt.Errorf("parquet: dict fallback target encoder has wrong element type") } - bm := enc.memo.(BinaryMemoTable) + bm := enc.memo.(*hashing.BinaryMemoTable) vals := make([]parquet.Int96, len(enc.idxValues)) for i, idx := range enc.idxValues { v := bm.Value(int(idx)) From 84ecf26115efb65d4fe181e8a121476cede839a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Tobias=20P=C3=BCtz?= Date: Thu, 30 Apr 2026 18:23:04 +0200 Subject: [PATCH 5/5] update magic numbers to reflect improved file sizes --- parquet/pqarrow/file_writer_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/parquet/pqarrow/file_writer_test.go b/parquet/pqarrow/file_writer_test.go index 0c771931d..7b98b2126 100644 --- a/parquet/pqarrow/file_writer_test.go +++ b/parquet/pqarrow/file_writer_test.go @@ -171,8 +171,8 @@ func TestFileWriterTotalBytes(t *testing.T) { require.NoError(t, writer.Close()) // Verify total bytes & compressed bytes are correct - assert.Equal(t, int64(408), writer.TotalCompressedBytes()) - assert.Equal(t, int64(910), writer.TotalBytesWritten()) + assert.Equal(t, int64(340), writer.TotalCompressedBytes()) + assert.Equal(t, int64(799), writer.TotalBytesWritten()) } func TestFileWriterTotalBytesBuffered(t *testing.T) { @@ -205,8 +205,8 @@ func TestFileWriterTotalBytesBuffered(t *testing.T) { require.NoError(t, writer.Close()) // Verify total bytes & compressed bytes are correct - assert.Equal(t, int64(596), writer.TotalCompressedBytes()) - assert.Equal(t, int64(1306), writer.TotalBytesWritten()) + assert.Equal(t, int64(494), writer.TotalCompressedBytes()) + assert.Equal(t, int64(1139), writer.TotalBytesWritten()) } func TestWriteOnClosedFileWriter(t *testing.T) {