PARQUET-3459: Per column compression #3526

Open
mengna-lin wants to merge 6 commits into apache:master from mengna-lin:per_column_compression

Conversation

@mengna-lin

Rationale for this change

The Parquet format supports per-column compression at the spec level, but parquet-java has always forced a single codec across all columns. This PR exposes that existing capability of the format.

What changes are included in this PR?

  • CompressionCodecFactory — new default getCompressor(codec, level) interface method
  • CodecFactory — level-aware compressor instantiation and validation (ZSTD: 1–22, GZIP: 0–9 or -1, BROTLI: 0–11)
  • ParquetProperties — new builder methods withCompressionCodec(col, codec) / withCompressionLevel(col, level)
  • ColumnChunkPageWriteStore — resolves compressor per column at write time, falls back to job-level default
  • ParquetWriter — new builder methods withCompressionCodec(col, codec) / withCompressionLevel(col, level)
  • ParquetOutputFormat — wires per-column codec/level from Hadoop Configuration into ParquetProperties
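The level-validation ranges listed above (ZSTD: 1–22, GZIP: 0–9 or -1, BROTLI: 0–11) can be sketched as a small standalone check. `CompressionLevelValidator` is a hypothetical name for illustration, not the class the PR actually adds to CodecFactory:

```java
// Sketch of per-codec compression-level validation, using the ranges
// stated in the PR description. Hypothetical helper, not the PR's code.
public class CompressionLevelValidator {

  public static boolean isValidLevel(String codec, int level) {
    switch (codec) {
      case "ZSTD":
        return level >= 1 && level <= 22;
      case "GZIP":
        // -1 means "use the codec's default level" (as in java.util.zip).
        return level == -1 || (level >= 0 && level <= 9);
      case "BROTLI":
        return level >= 0 && level <= 11;
      default:
        // Codecs without a tunable level (e.g. SNAPPY, UNCOMPRESSED)
        // accept no explicit level in this sketch.
        return false;
    }
  }
}
```

A factory following this pattern would reject an out-of-range level at writer-construction time rather than failing mid-write.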

Are these changes tested?

Yes.
Unit tests cover ParquetProperties getters/copy behavior (TestParquetProperties),
CodecFactory level-aware caching and invalid level rejection (TestDirectCodecFactory),
and ColumnChunkPageWriteStore codec resolution and invalid level rejection (TestColumnChunkPageWriteStore).
Integration tests cover end-to-end data round-trips and footer metadata verification
through both the ParquetWriter builder API and ParquetOutputFormat (TestParquetWriter).
We also tested end to end with a local Spark job:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

/**
 * Local Spark job that writes a Parquet file with per-column compression:
 *   col_1 (string) -> ZSTD level 9
 *   col_2 (int)    -> UNCOMPRESSED
 *   col_3 (double) -> SNAPPY (default)
 *
 * Build:
 *   cd test-spark-job && mvn package -DskipTests
 *
 * Run:
 *   spark-submit \
 *     --master local[2] \
 *     target/parquet-per-column-compression-test-1.0-SNAPSHOT-job.jar
 *
 * Inspect output with parquet-cli:
 *   hadoop jar ../parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar \
 *     org.apache.parquet.cli.Main meta \
 *     /tmp/per_column_compression_test/part-*.parquet
 */
public class PerColumnCompressionJob {

  private static final String OUTPUT_PATH = "/tmp/per_column_compression_test";

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .master("local[2]")
        .appName("PerColumnCompressionTest")
        // Default codec for all columns
        .config("spark.hadoop.parquet.compression", "SNAPPY")
        // col_1: ZSTD at level 9
        .config("spark.hadoop.parquet.compression#col_1", "ZSTD")
        .config("spark.hadoop.parquet.compression.level#col_1", "9")
        // col_2: no compression
        .config("spark.hadoop.parquet.compression#col_2", "UNCOMPRESSED")
        .getOrCreate();

    spark.sparkContext().setLogLevel("WARN");

    StructType schema = DataTypes.createStructType(new StructField[]{
        DataTypes.createStructField("col_1", DataTypes.StringType, false),
        DataTypes.createStructField("col_2", DataTypes.IntegerType, false),
        DataTypes.createStructField("col_3", DataTypes.DoubleType, false),
    });

    List<Row> rows = Arrays.asList(
        RowFactory.create("alice",   1, 1.1),
        RowFactory.create("bob",     2, 2.2),
        RowFactory.create("charlie", 3, 3.3),
        RowFactory.create("dave",    4, 4.4),
        RowFactory.create("eve",     5, 5.5)
    );

    Dataset<Row> df = spark.createDataFrame(rows, schema);
    System.out.println("\n=== Input DataFrame ===");
    df.printSchema();
    df.show();

    df.coalesce(1).write().mode("overwrite").parquet(OUTPUT_PATH);
    System.out.println("Wrote Parquet to: " + OUTPUT_PATH);

    System.out.println("\n=== Read Back ===");
    spark.read().parquet(OUTPUT_PATH).show();

    System.out.println("\nDone. Inspect compression with:");
    System.out.println("  hadoop jar ../parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar \\");
    System.out.println("    org.apache.parquet.cli.Main meta " + OUTPUT_PATH + "/part-*.parquet");

    spark.stop();
  }
}

Result

mengnalin@Mengnas-MacBook-Pro apache-parquet-java-mengna % hadoop jar parquet-cli/target/parquet-cli-1.18.0-SNAPSHOT-runtime.jar meta /tmp/per_column_compression_test/part-*.parquet

File path:  /tmp/per_column_compression_test/part-00000-017e9683-14be-4539-85ce-92f3904a744d-c000.snappy.parquet
Created by: *
Properties:
                   org.apache.spark.version: 4.1.1
  org.apache.spark.sql.parquet.row.metadata: {"type":"struct","fields":[{"name":"col_1","type":"string","nullable":false,"metadata":{}},{"name":"col_2","type":"integer","nullable":false,"metadata":{}},{"name":"col_3","type":"double","nullable":false,"metadata":{}}]}
Schema:
message spark_schema {
  required binary col_1 (STRING);
  required int32 col_2;
  required double col_3;
}


Row group 0:  count: 5  35.20 B records  start: 4  total(compressed): 176 B total(uncompressed):171 B 
--------------------------------------------------------------------------------
       type      encodings count     avg size   nulls   min / max
col_1  BINARY    Z   _     5         15.40 B    0       "alice" / "eve"
col_2  INT32     _   _     5         8.60 B     0       "1" / "5"
col_3  DOUBLE    S   _     5         11.20 B    0       "1.1" / "5.5"

Are there any user-facing changes?

Two new APIs, fully backwards compatible:
- ParquetWriter.Builder.withCompressionCodec(col, codec)
- ParquetWriter.Builder.withCompressionLevel(col, level)
(Also accessible at the lower level via ParquetProperties.Builder.)
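The fallback behavior described above (a column's configured codec wins; otherwise the job-level default applies) can be sketched roughly as follows. `CodecResolver` and its methods are a hypothetical illustration of the resolution rule, not classes from this PR:

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of per-column codec resolution with fallback to a job-level
// default, mirroring the behavior described for ColumnChunkPageWriteStore.
// Hypothetical helper, not the PR's actual implementation.
public class CodecResolver {

  private final Map<String, String> perColumnCodec = new HashMap<>();
  private final String defaultCodec;

  public CodecResolver(String defaultCodec) {
    this.defaultCodec = defaultCodec;
  }

  public void setCodec(String column, String codec) {
    perColumnCodec.put(column, codec);
  }

  // Column-specific codec if configured, otherwise the job-level default.
  public String resolve(String column) {
    return perColumnCodec.getOrDefault(column, defaultCodec);
  }
}
```

With a default of SNAPPY and overrides for `col_1` and `col_2`, resolving `col_3` falls through to SNAPPY, matching the column chunk codecs seen in the parquet-cli meta output above.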

@wgtmac
Member

wgtmac commented Apr 23, 2026

Does this duplicate #3396?
