Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 31 additions & 3 deletions parquet/file/column_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ type columnWriter struct {
totalCompressedBytes int64
closed bool
fallbackToNonDict bool
dictPageWritten bool

pages []DataPage

Expand Down Expand Up @@ -264,8 +265,20 @@ func (w *columnWriter) commitWriteAndCheckPageLimit(numLevels, numValues int64)
w.numBufferedValues += numLevels
w.numDataValues += numValues

enc := w.currentEncoder.EstimatedDataEncodedSize()
if enc >= w.props.DataPageSize() {
// While dictionary encoding is active, size data pages by the raw bytes of
// the input values rather than by the encoded size of the RLE indices. This
// mirrors parquet-mr's FallbackValuesWriter.getBufferedSize: dict-encoded
// data pages end up with roughly the same raw-byte footprint as the PLAIN
// pages they would otherwise be. It also puts the first-page compression
// check on the same cadence parquet-mr uses, and avoids committing
// dict-encoded pages that only look cheap because their RLE indices are tiny.
var bufferedSize int64
if w.hasDict && !w.fallbackToNonDict {
bufferedSize = w.currentEncoder.(encoding.DictEncoder).ObservedRawSize()
} else {
bufferedSize = w.currentEncoder.EstimatedDataEncodedSize()
}
if bufferedSize >= w.props.DataPageSize() {
return w.FlushCurrentPage()
}
return nil
Expand Down Expand Up @@ -427,7 +440,14 @@ func (w *columnWriter) FlushBufferedDataPages() (err error) {
return err
}
}
return w.drainBufferedDataPages()
}

// drainBufferedDataPages writes out and releases any pages buffered while
// dictionary encoding was active. Unlike FlushBufferedDataPages, it does not
// touch the current encoder's unflushed values, so the caller can re-encode
// them as PLAIN during a dictionary fallback.
func (w *columnWriter) drainBufferedDataPages() (err error) {
for i, p := range w.pages {
defer p.Release()
if err = w.WriteDataPage(p); err != nil {
Expand Down Expand Up @@ -502,6 +522,7 @@ func (w *columnWriter) WriteDictionaryPage() error {
page := NewDictionaryPage(buffer, int32(dictEncoder.NumEntries()), w.props.DictionaryPageEncoding())
written, err := w.pager.WriteDictionaryPage(page)
w.totalBytesWritten += written
w.dictPageWritten = err == nil
return err
}

Expand Down Expand Up @@ -620,7 +641,14 @@ func (w *columnWriter) Close() (err error) {
if w.rowsWritten > 0 && chunkStats.IsSet() {
w.metaData.SetStats(chunkStats)
}
err = w.pager.Close(w.hasDict, w.fallbackToNonDict)
// Only advertise PLAIN_DICTIONARY / fallback encodings in the column
// chunk's encoding list when a dictionary page was actually written.
// When fallback discards the dictionary before any dict-encoded page
// is flushed, the column contains only PLAIN data and the list should
// reflect that unambiguously.
advertiseDict := w.hasDict && w.dictPageWritten
advertiseFallback := w.fallbackToNonDict && w.dictPageWritten
err = w.pager.Close(advertiseDict, advertiseFallback)
}
return err
}
Expand Down
41 changes: 11 additions & 30 deletions parquet/file/column_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,39 +427,20 @@ func (p *PrimitiveWriterTestSuite) testDictionaryFallbackEncoding(version parque
p.EqualValues(VeryLargeSize, valuesRead)
p.Equal(p.Values, p.ValuesOut)

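// With the parquet-mr-aligned fallback behavior, a dictionary that overflows
// before any dict-encoded data page has been flushed is discarded entirely.
// With the default page-size parameters used here the dict-encoded
// EstimatedDataEncodedSize (RLE indices) stays under DataPageSize right up
// to the overflow point, so no dict page and no dict-encoded data pages
// appear in the output — the column is pure PLAIN.
// NOTE(review): while dictionary encoding is active the writer compares
// ObservedRawSize (raw input bytes), not EstimatedDataEncodedSize, against
// DataPageSize — confirm the explanation above still describes why no
// dict-encoded page is flushed before the overflow.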
encodings := p.metadataEncodings()
if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) {
// dictionary encoding is not allowed for booleans
// there are 2 encodings (PLAIN, RLE) in a non dictionary encoding case
p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings)
} else if version == parquet.V1_0 {
// There are 4 encodings (PLAIN_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case
// for version 1.0
p.Equal([]parquet.Encoding{parquet.Encodings.PlainDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings)
} else {
// There are 4 encodings (RLE_DICTIONARY, PLAIN, RLE, PLAIN) in a fallback case for
// version 2.0
p.Equal([]parquet.Encoding{parquet.Encodings.RLEDict, parquet.Encodings.Plain, parquet.Encodings.RLE, parquet.Encodings.Plain}, encodings)
}
p.Equal([]parquet.Encoding{parquet.Encodings.Plain, parquet.Encodings.RLE}, encodings)

encodingStats := p.metadataEncodingStats()
if p.Typ.Kind() == reflect.Bool || p.Typ == reflect.TypeOf(parquet.Int96{}) {
p.Equal(parquet.Encodings.Plain, encodingStats[0].Encoding)
p.Equal(format.PageType_DATA_PAGE, encodingStats[0].PageType)
} else if version == parquet.V1_0 {
expected := []metadata.PageEncodingStats{
{Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DICTIONARY_PAGE},
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE},
{Encoding: parquet.Encodings.PlainDict, PageType: format.PageType_DATA_PAGE}}
p.Equal(expected[0], encodingStats[0])
p.ElementsMatch(expected[1:], encodingStats[1:])
} else {
expected := []metadata.PageEncodingStats{
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DICTIONARY_PAGE},
{Encoding: parquet.Encodings.Plain, PageType: format.PageType_DATA_PAGE},
{Encoding: parquet.Encodings.RLEDict, PageType: format.PageType_DATA_PAGE}}
p.Equal(expected[0], encodingStats[0])
p.ElementsMatch(expected[1:], encodingStats[1:])
p.NotEmpty(encodingStats)
for _, es := range encodingStats {
p.Equal(parquet.Encodings.Plain, es.Encoding)
p.Equal(format.PageType_DATA_PAGE, es.PageType)
}
}

Expand Down
Loading