Skip to content

Commit 7d25a18

Browse files
committed
Make the build more forward-compatible (before move/testing with Java 21/25)
Hadoop upgrade (3.3.0 -> 3.4.3):
- Hadoop 3.3.x calls javax.security.auth.Subject.getSubject(), which is no longer usable on JDK 23+: it throws UnsupportedOperationException when the Security Manager is not enabled, and the Security Manager is permanently disabled as of JDK 24 (JEP 486). Hadoop 3.4.x uses Subject.current() instead, restoring JDK 25 compatibility.

Spotless upgrade (2.46.1 -> 3.5.1):
- Spotless 2.46.1 calls com.sun.tools.javac.util.Log methods that were removed in JDK 25, causing NoSuchMethodError at format time. Spotless 3.5.1 is compatible with JDK 25.
- The minor formatting changes to switch/case comment indentation come from the new Spotless version.

Fix ByteBuffer leak in vectored I/O reads:
- Hadoop's readVectored() API accepts an IntFunction<ByteBuffer> for allocation but has no corresponding release callback. When a wrapping filesystem such as ChecksumFileSystem is in the path, Hadoop allocates a buffer through the caller's allocator, uses it internally for checksum verification, and then creates a different buffer for the CompletableFuture result. The originally allocated buffer is abandoned without being released.
- This caused TrackingByteBufferAllocator (used in tests) to throw LeakedByteBufferException for the tests that use vectored I/O: TestRecordLevelFilters, TestColumnIndexFiltering, and TestParquetReader.
- Fix: wrap the allocator in a capturing decorator that tracks every buffer allocated during readVectored(), then register all of them for release via ByteBufferReleaser. A try-finally ensures the buffers are registered even if a read future times out or fails.
1 parent 8c8ec92 commit 7d25a18

8 files changed

Lines changed: 46 additions & 15 deletions

File tree

parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -763,7 +763,7 @@ public EncodingStats convertEncodingStats(List<PageEncodingStats> stats) {
763763
switch (stat.getPage_type()) {
764764
case DATA_PAGE_V2:
765765
builder.withV2Pages();
766-
// falls through
766+
// falls through
767767
case DATA_PAGE:
768768
builder.addDataEncoding(getEncoding(stat.getEncoding()), stat.getCount());
769769
break;

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -325,7 +325,7 @@ private String cacheKey(CompressionCodecName codecName) {
325325
level = conf.get("parquet.compression.codec.zstd.level");
326326
break;
327327
default:
328-
// compression level is not supported; ignore it
328+
// compression level is not supported; ignore it
329329
}
330330
String codecClass = codecName.getHadoopCompressionCodecClassName();
331331
return level == null ? codecClass : codecClass + ":" + level;

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnIndexValidator.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -546,7 +546,7 @@ private void validateBoundaryOrder(
546546
prevMaxValue::toString);
547547
break;
548548
case UNORDERED:
549-
// No checks necessary.
549+
// No checks necessary.
550550
}
551551
}
552552
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/DirectCodecFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -103,8 +103,8 @@ protected BytesCompressor createCompressor(final CompressionCodecName codecName)
103103
return new SnappyCompressor();
104104
case ZSTD:
105105
return new ZstdCompressor();
106-
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
107-
// snappy is above since it also generates allocateDirect calls.
106+
// todo: create class similar to the SnappyCompressor for zlib and exclude it as
107+
// snappy is above since it also generates allocateDirect calls.
108108
default:
109109
return super.createCompressor(codecName);
110110
}

parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java

Lines changed: 37 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@
6565
import org.apache.parquet.HadoopReadOptions;
6666
import org.apache.parquet.ParquetReadOptions;
6767
import org.apache.parquet.Preconditions;
68+
import org.apache.parquet.bytes.ByteBufferAllocator;
6869
import org.apache.parquet.bytes.ByteBufferInputStream;
6970
import org.apache.parquet.bytes.ByteBufferReleaser;
7071
import org.apache.parquet.bytes.BytesInput;
@@ -1361,12 +1362,42 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
13611362
totalSize += len;
13621363
}
13631364
LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
1364-
// Request a vectored read;
1365-
f.readVectored(ranges, options.getAllocator());
1366-
int k = 0;
1367-
for (ConsecutivePartList consecutivePart : allParts) {
1368-
ParquetFileRange currRange = ranges.get(k++);
1369-
consecutivePart.readFromVectoredRange(currRange, builder);
1365+
// Use a capturing allocator to track all buffers allocated by Hadoop during vectored reads.
1366+
// The buffer returned from the read future may differ from the one originally allocated
1367+
// (e.g., ChecksumFileSystem wraps/copies buffers), so we must track the actual allocations.
1368+
List<ByteBuffer> allocatedBuffers = new ArrayList<>();
1369+
ByteBufferAllocator capturingAllocator = new ByteBufferAllocator() {
1370+
@Override
1371+
public ByteBuffer allocate(int size) {
1372+
ByteBuffer buf = options.getAllocator().allocate(size);
1373+
allocatedBuffers.add(buf);
1374+
return buf;
1375+
}
1376+
1377+
@Override
1378+
public void release(ByteBuffer b) {
1379+
// Use identity comparison; ByteBuffer.equals() is content-based and could match wrong buffer
1380+
allocatedBuffers.removeIf(buf -> buf == b);
1381+
options.getAllocator().release(b);
1382+
}
1383+
1384+
@Override
1385+
public boolean isDirect() {
1386+
return options.getAllocator().isDirect();
1387+
}
1388+
};
1389+
try {
1390+
// Request a vectored read;
1391+
f.readVectored(ranges, capturingAllocator);
1392+
int k = 0;
1393+
for (ConsecutivePartList consecutivePart : allParts) {
1394+
ParquetFileRange currRange = ranges.get(k++);
1395+
consecutivePart.readFromVectoredRange(currRange, builder);
1396+
}
1397+
} finally {
1398+
// Register all buffers allocated during vectored reads for release.
1399+
// In a finally block so buffers are not leaked on read failures.
1400+
builder.addBuffersToRelease(allocatedBuffers);
13701401
}
13711402
}
13721403

parquet-thrift/src/main/java/org/apache/parquet/thrift/BufferedProtocolReadToWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ private boolean readOneValue(TProtocol in, byte type, List<Action> buffer, Thrif
226226
writeShortAction(buffer, s);
227227
break;
228228
case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32
229-
// field
229+
// field
230230
case TType.I32:
231231
final int i = in.readI32();
232232
checkEnum(expectedType, i);

parquet-thrift/src/main/java/org/apache/parquet/thrift/ProtocolReadToWrite.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ void readOneValue(TProtocol in, TProtocol out, byte type) throws TException {
7474
out.writeI16(in.readI16());
7575
break;
7676
case TType.ENUM: // same as i32 => actually never seen in the protocol layer as enums are written as a i32
77-
// field
77+
// field
7878
case TType.I32:
7979
out.writeI32(in.readI32());
8080
break;

pom.xml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -80,10 +80,10 @@
8080
<jackson-annotations.version>2.21</jackson-annotations.version>
8181
<japicmp.version>0.25.7</japicmp.version>
8282
<javax.annotation.version>1.3.2</javax.annotation.version>
83-
<spotless.version>2.46.1</spotless.version>
83+
<spotless.version>3.5.1</spotless.version>
8484
<shade.prefix>shaded.parquet</shade.prefix>
8585
<!-- Guarantees no newer classes/methods/constants are used by parquet. -->
86-
<hadoop.version>3.3.0</hadoop.version>
86+
<hadoop.version>3.4.3</hadoop.version>
8787
<parquet.format.version>2.12.0</parquet.format.version>
8888
<previous.version>1.17.0</previous.version>
8989
<thrift.executable>thrift</thrift.executable>

0 commit comments

Comments
 (0)