diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
index e0b0d76e0e..95914bcb2a 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ParquetFileReader.java
@@ -65,6 +65,7 @@
 import org.apache.parquet.HadoopReadOptions;
 import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.ByteBufferReleaser;
 import org.apache.parquet.bytes.BytesInput;
@@ -1361,8 +1362,36 @@ private void readVectored(List<ConsecutivePartList> allParts, ChunkListBuilder b
       totalSize += len;
     }
     LOG.debug("Reading {} bytes of data with vectored IO in {} ranges", totalSize, ranges.size());
-    // Request a vectored read;
-    f.readVectored(ranges, options.getAllocator());
+    // Wrap the allocator to track buffers allocated during the vectored read.
+    // Hadoop's vectored IO may merge ranges and return slices of merged buffers,
+    // so the buffers returned via CompletableFuture may differ from those originally
+    // allocated. We track the originals here to ensure they are properly released.
+    ByteBufferAllocator baseAllocator = options.getAllocator();
+    // Synchronized because the stream implementation may invoke the allocator from
+    // its own I/O threads while the ranges are being read.
+    List<ByteBuffer> allocatedBuffers = java.util.Collections.synchronizedList(new ArrayList<>());
+    ByteBufferAllocator trackingAllocator = new ByteBufferAllocator() {
+      @Override
+      public ByteBuffer allocate(int size) {
+        ByteBuffer buf = baseAllocator.allocate(size);
+        allocatedBuffers.add(buf);
+        return buf;
+      }
+
+      @Override
+      public void release(ByteBuffer b) {
+        baseAllocator.release(b);
+      }
+
+      @Override
+      public boolean isDirect() {
+        return baseAllocator.isDirect();
+      }
+    };
+    f.readVectored(ranges, trackingAllocator);
+    // NOTE(review): buffers allocated after readVectored returns are only released if
+    // addBuffersToRelease keeps this list rather than copying it -- confirm.
+    builder.addBuffersToRelease(allocatedBuffers);
     int k = 0;
     for (ConsecutivePartList consecutivePart : allParts) {
       ParquetFileRange currRange = ranges.get(k++);
diff --git a/pom.xml b/pom.xml index a0a6678163..45ad9118e4 100644 --- a/pom.xml +++ b/pom.xml @@ -83,7 +83,7 @@ 2.46.1 shaded.parquet - 3.3.0 + 3.4.3 2.13.0 1.17.0 thrift