diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 6c95bd3768909..1d65dd56485b3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -51,6 +51,7 @@ import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator; import org.apache.hadoop.fs.azurebfs.enums.Trilean; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee; import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider; @@ -623,6 +624,22 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE) private int prefetchRequestPriorityValue; + @StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTORED_READ_STRATEGY, + DefaultValue = DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY) + private String vectoredReadStrategy; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MIN_SEEK_FOR_VECTORED_READS, + DefaultValue = DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS) + private int minSeekForVectoredReads; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS, + DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS) + private int maxSeekForVectoredReads; + + @IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT, + DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT) + private int maxSeekForVectoredReadsThroughput; + 
@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY, DefaultValue = DEFAULT_AZURE_READ_POLICY) private String abfsReadPolicy; @@ -2167,4 +2184,58 @@ public int getTailLatencyAnalysisWindowGranularity() { public int getTailLatencyMaxRetryCount() { return tailLatencyMaxRetryCount; } + + /** + * Returns the configured vectored read strategy. + * + *

+ * The configuration value is parsed in a case-insensitive manner and may be + * specified either using the enum name (for example, + * {@code TPS_OPTIMIZED}) or the short name (for example, {@code TPS}). + *

+ * + * @return the resolved {@link VectoredReadStrategy} + * + * @throws IllegalArgumentException if the configured value is invalid + */ + public VectoredReadStrategy getVectoredReadStrategy() { + try { + return VectoredReadStrategy.fromString(vectoredReadStrategy); + } catch (IllegalArgumentException e) { + throw new IllegalArgumentException( + "Invalid value for " + FS_AZURE_VECTORED_READ_STRATEGY + + ": " + vectoredReadStrategy + + ". Expected one of: TPS, THROUGHPUT, TPS_OPTIMIZED, THROUGHPUT_OPTIMIZED", e); + } + } + + /** + * Returns the minimum gap between adjacent read ranges that qualifies them + * for merging during vectored reads. + * + * @return minimum gap threshold for range merging + */ + public int getMinSeekForVectoredReads() { + return minSeekForVectoredReads; + } + + /** + * Returns the maximum gap between adjacent read ranges allowed when + * considering them for merging during vectored reads. + * + * @return maximum gap threshold for range merging + */ + public int getMaxSeekForVectoredReads() { + return maxSeekForVectoredReads; + } + + /** + * Returns the maximum gap between adjacent read ranges allowed when + * considering them for merging during throughput-optimized vectored reads.
+ * + * @return maximum gap threshold for range merging + */ + public int getMaxSeekForVectoredReadsThroughput() { + return maxSeekForVectoredReadsThroughput; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index e3b3c2a467575..20244587fe329 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -627,5 +627,31 @@ public static String containerProperty(String property, String fsName, String ac */ public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count"; + /** + * Configuration key to control the vectored read strategy used by ABFS: {@value} + */ + public static final String FS_AZURE_VECTORED_READ_STRATEGY = "fs.azure.vectored.read.strategy"; + + /** + * Configuration key that defines the minimum gap between adjacent read ranges + * for merging ranges during vectored reads in ABFS: {@value}. + */ + public static final String FS_AZURE_MIN_SEEK_FOR_VECTORED_READS = + "fs.azure.min.seek.for.vectored.reads"; + + /** + * Configuration key that defines the maximum gap between adjacent read ranges + * for merging ranges during vectored reads in ABFS: {@value}. + */ + public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS = + "fs.azure.max.seek.for.vectored.reads"; + + /** + * Configuration key that defines the maximum gap between adjacent read ranges + * for merging ranges during throughput-optimized vectored reads in ABFS: {@value}.
+ */ + public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT = + "fs.azure.max.seek.for.vectored.reads.throughput"; + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index feafbe4f3a55f..e54cf493eff15 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -306,6 +306,10 @@ public final class FileSystemConfigurations { public static final int MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1; public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500; public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1; + public static final String DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY = "TPS"; + public static final int DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS = ONE_MB; + public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS = 4 * ONE_MB; + public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT = 8 * ONE_MB; private FileSystemConfigurations() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java index 51391cc747740..056a0aba4988c 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ReadType.java @@ -48,6 +48,16 @@ public enum ReadType { * Only triggered when small file read optimization kicks in. 
*/ SMALLFILE_READ("SR"), + /** + * Read multiple disjoint ranges from the storage service using vectored reads. + * Used to coalesce and execute non-contiguous reads efficiently. + */ + VECTORED_READ("VR"), + /** + * Performs a vectored direct read by fetching multiple non-contiguous + * ranges in a single operation. + */ + VECTORED_DIRECT_READ("VDR"), /** * Reads from Random Input Stream with read ahead up to readAheadRange */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java new file mode 100644 index 0000000000000..85c6d1e2e7d86 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/BufferType.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.enums; + +import org.apache.hadoop.fs.azurebfs.services.ReadBuffer; + +/** + * Enum for buffer types. + * Used in {@link ReadBuffer} to separate normal vs vectored read. + */ +public enum BufferType { + /** + * Normal read buffer. + */ + NORMAL, + /** + * Vectored read buffer. 
+ */ + VECTORED +} diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java new file mode 100644 index 0000000000000..d67499ddb9dd5 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/enums/VectoredReadStrategy.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.enums; + +/** + * Defines the strategy used for vectored reads in ABFS. + * + *

+ * The strategy controls how read ranges are planned and executed, trading off + * between request parallelism and per-request payload size. + *

+ */ +public enum VectoredReadStrategy { + + /** + * Optimizes for transactions per second (TPS). + */ + TPS_OPTIMIZED("TPS"), + + /** + * Optimizes for overall data throughput. + */ + THROUGHPUT_OPTIMIZED("THROUGHPUT"); + + /** Short name used for configuration and logging. */ + private final String name; + + /** + * Constructs a vectored read strategy with a short, user-friendly name. + * + * @param name short identifier for the strategy + */ + VectoredReadStrategy(String name) { + this.name = name; + } + + /** + * Returns the short name of the vectored read strategy. + * + * @return short strategy name + */ + public String getName() { + return name; + } + + /** + * Parses a configuration value into a {@link VectoredReadStrategy}. + * @param value configuration value + * @return matching vectored read strategy + * @throws IllegalArgumentException if the value is invalid + */ + public static VectoredReadStrategy fromString(String value) { + for (VectoredReadStrategy strategy : values()) { + if (strategy.name().equalsIgnoreCase(value) + || strategy.getName().equalsIgnoreCase(value)) { + return strategy; + } + } + throw new IllegalArgumentException("Invalid vectored read strategy: " + value); + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3c009c71bcdab..b39c35cf30691 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -22,10 +22,14 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.net.HttpURLConnection; +import java.nio.ByteBuffer; +import java.util.List; import java.util.UUID; +import java.util.function.IntFunction; import org.apache.commons.lang3.StringUtils; 
import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileRange; import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; @@ -54,8 +58,6 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; -import static org.apache.hadoop.io.Sizes.S_128K; -import static org.apache.hadoop.io.Sizes.S_2M; import static org.apache.hadoop.util.StringUtils.toLowerCase; /** @@ -72,7 +74,6 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff private final AbfsClient client; private final Statistics statistics; private final String path; - private final long contentLength; private final int bufferSize; // default buffer size private final int footerReadSize; // default buffer size to read when reading footer @@ -94,8 +95,7 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff // User configured size of read ahead. private final int readAheadRange; - private boolean firstRead = true; // to identify first read for optimizations - + private boolean firstRead = true; // SAS tokens can be re-used until they expire private CachedSASToken cachedSasToken; private byte[] buffer = null; // will be initialized on first use @@ -125,6 +125,8 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff private final AbfsInputStreamContext context; private IOStatistics ioStatistics; private String filePathIdentifier; + private VectoredReadHandler vectoredReadHandler; + /** * This is the actual position within the object, used by * lazy seek to decide whether to seek on the next read or not. 
@@ -199,7 +201,7 @@ public AbfsInputStream( readAheadBlockSize, client.getAbfsConfiguration()); readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters()); } else { - ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize); + ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize, client.getAbfsConfiguration()); readBufferManager = ReadBufferManagerV1.getBufferManager(); } @@ -220,6 +222,14 @@ private String createInputStreamId() { return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN); } + /** + * Retrieves the handler responsible for processing vectored read requests. + * @return the {@link VectoredReadHandler} instance associated with the buffer manager. + */ + VectoredReadHandler getVectoredReadHandler() { + return getReadBufferManager().getVectoredReadHandler(); + } + @Override public int read(long position, byte[] buffer, int offset, int length) throws IOException { @@ -331,6 +341,19 @@ public synchronized int read(final byte[] b, final int off, final int len) throw return totalReadBytes > 0 ? totalReadBytes : lastReadBytes; } + /** + * {@inheritDoc} + * Vectored read implementation for AbfsInputStream. + * + * @param ranges the byte ranges to read. + * @param allocate the function to allocate ByteBuffer. 
+ */ + @Override + public void readVectored(List ranges, + IntFunction allocate) throws EOFException { + getVectoredReadHandler().readVectored(this, ranges, allocate); + } + private boolean shouldReadFully() { return this.firstRead && this.context.readSmallFilesCompletely() && this.contentLength <= this.bufferSize; @@ -795,7 +818,10 @@ public synchronized void unbuffer() { @Override public boolean hasCapability(String capability) { - return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability)); + return switch (toLowerCase(capability)) { + case StreamCapabilities.UNBUFFER, StreamCapabilities.VECTOREDIO -> true; + default -> false; + }; } /** @@ -1069,7 +1095,7 @@ ReadBufferManager getReadBufferManager() { */ @Override public int minSeekForVectorReads() { - return S_128K; + return client.getAbfsConfiguration().getMinSeekForVectoredReads(); } /** @@ -1078,7 +1104,7 @@ public int minSeekForVectorReads() { */ @Override public int maxReadSizeForVectorReads() { - return S_2M; + return client.getAbfsConfiguration().getMaxSeekForVectoredReads(); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java index a6aa75f59d261..b5809d97e07c0 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBuffer.java @@ -19,15 +19,21 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.enums.BufferType; import 
org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.CombinedFileRange; import static org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus.READ_FAILED; -class ReadBuffer { +public class ReadBuffer { private AbfsInputStream stream; private String eTag; @@ -48,6 +54,13 @@ class ReadBuffer { private boolean isLastByteConsumed = false; private boolean isAnyByteConsumed = false; private AtomicInteger refCount = new AtomicInteger(0); + private BufferType bufferType = BufferType.NORMAL; + // list of combined file ranges for vectored read. + private List vectoredUnits = new ArrayList<>(); + // Allocator used for vectored fan-out; captured at queue time + private IntFunction allocator; + // Tracks whether fanOut has already been executed + private final AtomicInteger fanOutDone = new AtomicInteger(0); private IOException errException = null; @@ -199,4 +212,67 @@ public void setAnyByteConsumed(boolean isAnyByteConsumed) { public boolean isFullyConsumed() { return isFirstByteConsumed() && isLastByteConsumed(); } + + void addVectoredUnit(CombinedFileRange u) { + vectoredUnits.add(u); + } + + List getVectoredUnits() { + return vectoredUnits; + } + + void clearVectoredUnits() { + if (vectoredUnits != null) { + vectoredUnits.clear(); + } + } + + public BufferType getBufferType() { + return bufferType; + } + + public void setBufferType(final BufferType bufferType) { + this.bufferType = bufferType; + } + + /** + * Set the allocator associated with this buffer. + * + *

The same allocator instance may be shared across multiple buffers + * belonging to a single vectored read operation. It is captured at + * queue time so it is available when the asynchronous read completes.

+ * + * @param allocator allocator used for vectored fan-out + */ + public void setAllocator(IntFunction allocator) { + this.allocator = allocator; + } + + /** + * Return the allocator associated with this buffer. + * + * @return allocator used for vectored fan-out, or {@code null} for + * non-vectored buffers + */ + public IntFunction getAllocator() { + return allocator; + } + + /** + * Attempt to execute vectored fan-out exactly once for this buffer. + * + * @return {@code true} if the caller should perform fan-out; {@code false} + * if fan-out has already been executed + */ + boolean tryFanOut() { + return fanOutDone.compareAndSet(0, 1); + } + + /** + * @return {@code true} if vectored fan-out has already been executed + * for this buffer; {@code false} otherwise + */ + boolean isFanOutDone() { + return fanOutDone.get() == 1; + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java index 5b53d641a20df..1151f9b5d363b 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManager.java @@ -19,18 +19,23 @@ package org.apache.hadoop.fs.azurebfs.services; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collection; import java.util.LinkedList; import java.util.List; import java.util.Queue; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntFunction; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.VisibleForTesting; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import 
org.apache.hadoop.fs.impl.CombinedFileRange; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.DEFAULT_READ_AHEAD_BLOCK_SIZE; @@ -67,6 +72,18 @@ abstract void queueReadAhead(AbfsInputStream stream, int requestedLength, TracingContext tracingContext); + /** + * Queues a read-ahead request from {@link AbfsInputStream} + * for a given offset in file and given length. + * + * @param stream the input stream requesting the read-ahead + * @param unit buffer-sized vectored read unit to be queued + * @param tracingContext the tracing context for diagnostics + */ + abstract boolean queueVectoredRead(AbfsInputStream stream, + CombinedFileRange unit, + TracingContext tracingContext, IntFunction allocator); + /** * Gets a block of data from the prefetched data by ReadBufferManager. * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a @@ -127,6 +144,14 @@ abstract void doneReading(ReadBuffer buffer, @VisibleForTesting abstract int getNumBuffers(); + abstract VectoredReadHandler getVectoredReadHandler(); + + abstract VectoredReadStrategy getVectoredReadStrategy(); + + abstract int getMaxSeekForVectoredReads(); + + abstract int getMaxSeekForVectoredReadsThroughput(); + /** * Attempts to evict buffers based on the eviction policy. */ @@ -285,5 +310,94 @@ protected void testMimicFullUseAndAddFailedBuffer(ReadBuffer buf) { completedReadList.add(buf); } + /** + * Finds an existing {@link ReadBuffer} for the given stream whose buffered + * range covers the specified logical offset. + * + *

The search is performed in the read-ahead queue, in-progress list, + * and completed-read list, in that order. + * + * @param stream the {@link AbfsInputStream} associated with the read request + * + * @return a matching {@link ReadBuffer} if one exists, or {@code null} otherwise + */ + ReadBuffer findQueuedBuffer(final AbfsInputStream stream, + long requestedOffset) { + ReadBuffer buffer; + buffer = findInList(getReadAheadQueue(), stream, requestedOffset); + if (buffer != null) { + return buffer; + } + buffer = findInList(getInProgressList(), stream, requestedOffset); + if (buffer != null) { + return buffer; + } + return findInList(getCompletedReadList(), stream, requestedOffset); + } + + /** + * Searches the given collection of {@link ReadBuffer}s for one that belongs + * to the specified stream and whose buffered range covers the given offset. + * + * @param buffers the collection of {@link ReadBuffer}s to search + * @param stream the {@link AbfsInputStream} associated with the read request + * + * @return the matching {@link ReadBuffer}, or {@code null} if none is found + */ + ReadBuffer findInList(final Collection buffers, + final AbfsInputStream stream, long requestedOffset) { + for (ReadBuffer buffer : buffers) { + if (buffer.getStream() == stream + && requestedOffset >= buffer.getOffset() + && requestedOffset < buffer.getOffset() + + buffer.getRequestedLength()) { + return buffer; + } + } + return null; + } + + /** + * Handle vectored-read completion for a buffer. + * + *

If the buffer participates in a vectored read, this method performs + * vectored fan-out exactly once when the physical read completes successfully, + * or fails all associated logical ranges on error. Vectored units are cleared + * after fan-out is finalized to allow safe publication or reuse of the buffer.

+ * + * @param buffer the read buffer whose physical read has completed + * @param result the completion status of the physical read + * @param bytesActuallyRead number of bytes read from the backend + */ + void handleVectoredCompletion( + ReadBuffer buffer, + ReadBufferStatus result, + int bytesActuallyRead) { + try { + if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { + if (buffer.tryFanOut()) { + getVectoredReadHandler().fanOut(buffer, bytesActuallyRead); + } + } else { + LOGGER.debug( + "Handling vectored completion for buffer with path: {}, offset: {}, length: {}, result: {}, bytesActuallyRead: {}", + buffer.getPath(), buffer.getOffset(), buffer.getRequestedLength(), + result, bytesActuallyRead); + throw new IOException( + "Vectored read failed for path: " + buffer.getPath() + + ", status=" + buffer.getStatus()); + } + } catch (Exception e) { + // Fail all logical FileRange futures + getVectoredReadHandler().failBufferFutures(buffer, e); + buffer.setStatus(ReadBufferStatus.READ_FAILED); + } finally { + if (buffer.isFanOutDone()) { + // Must be cleared before publication / reuse + buffer.clearVectoredUnits(); + } + } + } + abstract void clearFreeList(); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java index c034d85659603..ce64daed6d71a 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV1.java @@ -17,9 +17,8 @@ */ package org.apache.hadoop.fs.azurebfs.services; -import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; - import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -27,10 +26,17 @@ import 
java.util.List; import java.util.Stack; import java.util.concurrent.CountDownLatch; +import java.util.function.IntFunction; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; +import org.apache.hadoop.fs.azurebfs.enums.BufferType; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.CombinedFileRange; import org.apache.hadoop.util.concurrent.SubjectInheritingThread; -import org.apache.hadoop.classification.VisibleForTesting; /** * The Read Buffer Manager for Rest AbfsClient. @@ -46,21 +52,37 @@ public final class ReadBufferManagerV1 extends ReadBufferManager { private byte[][] buffers; private Stack freeList = new Stack<>(); // indices in buffers[] array that are available private static ReadBufferManagerV1 bufferManager; + private final VectoredReadHandler vectoredReadHandler; + private static VectoredReadStrategy vectoredReadStrategy; + private static int maxSeekForVectoredReads; + private static int maxSeekForeVectoredReadsThroughput; // hide instance constructor private ReadBufferManagerV1() { + this.vectoredReadHandler = new VectoredReadHandler(this); LOGGER.trace("Creating readbuffer manager with HADOOP-18546 patch"); } + public VectoredReadHandler getVectoredReadHandler() { + return vectoredReadHandler; + } + /** * Sets the read buffer manager configurations. + * * @param readAheadBlockSize the size of the read-ahead block in bytes + * @param abfsConfiguration the configuration to set for the ReadBufferManagerV1. 
*/ - public static void setReadBufferManagerConfigs(int readAheadBlockSize) { + public static void setReadBufferManagerConfigs(int readAheadBlockSize, + AbfsConfiguration abfsConfiguration) { if (bufferManager == null) { LOGGER.debug( "ReadBufferManagerV1 not initialized yet. Overriding readAheadBlockSize as {}", readAheadBlockSize); + vectoredReadStrategy = abfsConfiguration.getVectoredReadStrategy(); + maxSeekForVectoredReads = abfsConfiguration.getMaxSeekForVectoredReads(); + maxSeekForeVectoredReadsThroughput + = abfsConfiguration.getMaxSeekForVectoredReadsThroughput(); setReadAheadBlockSize(readAheadBlockSize); setThresholdAgeMilliseconds(DEFAULT_THRESHOLD_AGE_MILLISECONDS); } @@ -146,6 +168,124 @@ public void queueReadAhead(final AbfsInputStream stream, final long requestedOff } } + /** + * Queue a vectored read for a buffer-sized physical read unit. + * + *

The method first attempts to attach the logical unit to an already + * in-progress physical read for the same file and offset. If that is not + * possible, a free read buffer is acquired and a new backend read is + * queued.

+ * + * @param stream input stream for the file being read + * @param unit buffer-sized combined file range to be read + * @param tracingContext tracing context used for the backend read request + * @param allocator allocator used to create buffers for vectored fan-out + * @return {@code true} if the read was queued or attached to an existing + * in-progress buffer; {@code false} if no buffer was available + */ + boolean queueVectoredRead(AbfsInputStream stream, + CombinedFileRange unit, + TracingContext tracingContext, + IntFunction allocator) { + /* Create a child tracing context for vectored read-ahead requests */ + TracingContext readAheadTracingContext = + new TracingContext(tracingContext); + readAheadTracingContext.setPrimaryRequestID(); + readAheadTracingContext.setReadType(ReadType.VECTORED_READ); + + synchronized (this) { + if (isAlreadyQueued(stream, unit.getOffset())) { + ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset()); + if (existing != null && existing.getStream().getETag() != null + && stream.getETag().equals(existing.getStream().getETag())) { + /* + * For AVAILABLE buffers use actual bytes read (getLength()) for + * coverage check. For READING_IN_PROGRESS buffers use + * requestedLength as an estimate — the short-read guard will be + * applied later in doneReading before dispatching completion. + */ + long end = existing.getOffset() + ( + existing.getStatus() == ReadBufferStatus.AVAILABLE + ? existing.getLength() + : existing.getRequestedLength()); + if (end >= unit.getOffset() + unit.getLength()) { + existing.setBufferType(BufferType.VECTORED); + existing.addVectoredUnit(unit); + existing.setAllocator(allocator); + if (existing.getStatus() == ReadBufferStatus.AVAILABLE) { + /* + * Buffer is already AVAILABLE. Trigger completion immediately. + * Use getLength() (actual bytes) for coverage — redundant here + * since the outer check already used getLength() for AVAILABLE, + * but kept explicit for clarity. 
+ */ + LOGGER.debug("Hitchhiking onto AVAILABLE buffer {}, length {}", + existing, existing.getLength()); + handleVectoredCompletion(existing, + existing.getStatus(), + existing.getLength()); + } + /* + * For READING_IN_PROGRESS: unit is attached and will be + * completed in doneReading once actual bytes are known. + * Short-read safety is enforced there via per-unit coverage check. + */ + return true; + } + } + } + /* + * Ensure a free buffer is available, attempting best-effort recovery + * through memory upscaling or eviction if necessary. + */ + if (getFreeList().isEmpty() && !tryEvict()) { + return false; + } + /* + * Create a logical ReadBuffer descriptor without binding pooled memory. + * This captures metadata required to schedule the physical read. + */ + ReadBuffer buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setETag(stream.getETag()); + buffer.setPath(stream.getPath()); + buffer.setOffset(unit.getOffset()); + buffer.setRequestedLength(unit.getLength()); + buffer.setBufferType(BufferType.VECTORED); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.addVectoredUnit(unit); + buffer.setAllocator(allocator); + buffer.setTracingContext(readAheadTracingContext); + /* + * Perform a final free-list check before consuming pooled memory to + * ensure buffer availability. + */ + if (getFreeList().isEmpty()) { + return false; + } + Integer bufferIndex = getFreeList().pop(); + if (bufferIndex >= buffers.length) { + /* Defensive guard; should never occur */ + return false; + } + /* + * Bind the physical buffer and queue the read for asynchronous + * execution. 
+ */ + buffer.setBuffer(buffers[bufferIndex]); + buffer.setBufferindex(bufferIndex); + + getReadAheadQueue().add(buffer); + notifyAll(); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("Done q-ing readAhead for file {} offset {} buffer idx {}", + stream.getPath(), unit.getOffset(), buffer.getBufferindex()); + } + return true; + } + } + /** * {@inheritDoc} */ @@ -208,8 +348,69 @@ public ReadBuffer getNextBlockToRead() throws InterruptedException { public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { if (LOGGER.isTraceEnabled()) { LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", - buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + + List vectoredUnits = buffer.getVectoredUnits(); + if (result == ReadBufferStatus.AVAILABLE + && (buffer.getBufferType() == BufferType.VECTORED && !vectoredUnits.isEmpty())) { + + /* + * Set length BEFORE handling vectored completion so that any + * hitchhiked units that call existing.getLength() see the correct + * actual value rather than 0. + */ + buffer.setLength(bytesActuallyRead); + + /* + * Guard against short reads: units hitchhiked while buffer was + * READING_IN_PROGRESS used requestedLength as coverage estimate. + * Now that actual bytes are known, fail any units not fully covered + * so their callers are not left hanging on the CompletableFuture. + */ + long actualEnd = buffer.getOffset() + bytesActuallyRead; + + /* + * Fast path: check if any unit exceeds actual bytes read before + * doing expensive stream/collect. Short reads are rare so this + * avoids unnecessary allocations in the common case. 
+ */ + boolean hasUncovered = false; + for (CombinedFileRange u : vectoredUnits) { + if ((u.getOffset() + u.getLength()) > actualEnd) { + hasUncovered = true; + break; + } + } + + if (hasUncovered) { + /* + * Short read detected — fail uncovered units explicitly so callers + * are not left hanging on their CompletableFuture. + */ + Iterator it = vectoredUnits.iterator(); + while (it.hasNext()) { + CombinedFileRange u = it.next(); + if ((u.getOffset() + u.getLength()) > actualEnd) { + it.remove(); + LOGGER.debug( + "Vectored unit not covered by actual bytes read: unitEnd={} actualEnd={}, failing unit", + (u.getOffset() + u.getLength()), actualEnd); + u.getData().completeExceptionally(new IOException( + "Vectored read unit not covered by actual bytes read: " + + "unitEnd=" + (u.getOffset() + u.getLength()) + + " actualEnd=" + actualEnd)); + } + } + } + + if (!vectoredUnits.isEmpty()) { + LOGGER.debug("Entering vectored read completion with buffer {}, result {}, bytesActuallyRead {}", + buffer, result, bytesActuallyRead); + handleVectoredCompletion(buffer, result, bytesActuallyRead); + } } + synchronized (this) { // If this buffer has already been purged during // close of InputStream then we don't update the lists. 
@@ -623,6 +824,21 @@ private static void setBufferManager(ReadBufferManagerV1 manager) { bufferManager = manager; } + @VisibleForTesting + public VectoredReadStrategy getVectoredReadStrategy() { + return vectoredReadStrategy; + } + + @VisibleForTesting + public int getMaxSeekForVectoredReads() { + return maxSeekForVectoredReads; + } + + @VisibleForTesting + public int getMaxSeekForVectoredReadsThroughput() { + return maxSeekForeVectoredReadsThroughput; + } + @Override protected void clearFreeList() { getFreeList().clear(); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java index c271a5e354acd..a4c0ac7720988 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferManagerV2.java @@ -19,9 +19,11 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.JvmUniqueIdProvider; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; import java.util.Iterator; @@ -38,10 +40,14 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.IntFunction; +import org.apache.hadoop.fs.azurebfs.enums.BufferType; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; import org.apache.hadoop.fs.azurebfs.utils.ResourceUtilizationUtils; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.impl.CombinedFileRange; import 
org.apache.hadoop.util.concurrent.SubjectInheritingThread; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; @@ -122,6 +128,10 @@ public final class ReadBufferManagerV2 extends ReadBufferManager { private volatile String lastScaleDirection = EMPTY_STRING; /* Maximum CPU utilization observed during the monitoring interval. */ private volatile long maxJvmCpuUtilization = 0L; + private final VectoredReadHandler vectoredReadHandler; + private static VectoredReadStrategy vectoredReadStrategy; + private static int maxSeekForVectoredReads; + private static int maxSeekForeVectoredReadsThroughput; /** * Private constructor to prevent instantiation as this needs to be singleton. @@ -131,6 +141,7 @@ public final class ReadBufferManagerV2 extends ReadBufferManager { private ReadBufferManagerV2(AbfsCounters abfsCounters) { this.abfsCounters = abfsCounters; readThreadPoolMetrics = abfsCounters.getAbfsReadResourceUtilizationMetrics(); + vectoredReadHandler = new VectoredReadHandler(this); printTraceLog("Creating Read Buffer Manager V2 with HADOOP-18546 patch"); } @@ -160,12 +171,16 @@ static ReadBufferManagerV2 getBufferManager(AbfsCounters abfsCounters) { return bufferManager; } +VectoredReadHandler getVectoredReadHandler() { + return vectoredReadHandler; + } + /** * Set the ReadBufferManagerV2 configurations based on the provided before singleton initialization. * @param readAheadBlockSize the read-ahead block size to set for the ReadBufferManagerV2. * @param abfsConfiguration the configuration to set for the ReadBufferManagerV2. */ - public static void setReadBufferManagerConfigs(final int readAheadBlockSize, + public static void setReadBufferManagerConfigs(int readAheadBlockSize, final AbfsConfiguration abfsConfiguration) { // Set Configs only before initializations. 
if (bufferManager == null && !isConfigured.get()) { @@ -194,6 +209,9 @@ public static void setReadBufferManagerConfigs(final int readAheadBlockSize, abfsConfiguration.getReadAheadV2CachedBufferTTLMillis()); isDynamicScalingEnabled = abfsConfiguration.isReadAheadV2DynamicScalingEnabled(); + vectoredReadStrategy = abfsConfiguration.getVectoredReadStrategy(); + maxSeekForVectoredReads = abfsConfiguration.getMaxSeekForVectoredReads(); + maxSeekForeVectoredReadsThroughput = abfsConfiguration.getMaxSeekForVectoredReadsThroughput(); setReadAheadBlockSize(readAheadBlockSize); setIsConfigured(true); } @@ -338,6 +356,117 @@ public void queueReadAhead(final AbfsInputStream stream, } } + /** + * Queue a vectored read for a buffer-sized physical read unit. + * + *

The method first attempts to attach the logical unit to an already + * in-progress physical read for the same file and offset. If that is not + * possible, a free read buffer is acquired and a new backend read is + * queued.

+ * + * @param stream input stream for the file being read + * @param unit buffer-sized combined file range to be read + * @param tracingContext tracing context used for the backend read request + * @param allocator allocator used to create buffers for vectored fan-out + * @return {@code true} if the read was queued or attached to an existing + * in-progress buffer; {@code false} if no buffer was available + */ + boolean queueVectoredRead(AbfsInputStream stream, + CombinedFileRange unit, + TracingContext tracingContext, + IntFunction allocator) { + /* Create a child tracing context for vectored read-ahead requests */ + TracingContext readAheadTracingContext = + new TracingContext(tracingContext); + readAheadTracingContext.setPrimaryRequestID(); + readAheadTracingContext.setReadType(ReadType.VECTORED_READ); + synchronized (this) { + /* + * Attempt to hitchhike on an existing in-progress physical read if it + * covers the requested logical range completely. + */ + if (isAlreadyQueued(stream.getETag(), unit.getOffset())) { + ReadBuffer existing = findQueuedBuffer(stream, unit.getOffset()); + if (existing != null && existing.getStream().getETag() != null && stream.getETag() + .equals(existing.getStream().getETag())) { + long end = existing.getOffset() + ( + existing.getStatus() == ReadBufferStatus.AVAILABLE + ? existing.getLength() + : existing.getRequestedLength()); + if (end >= unit.getOffset() + unit.getLength()) { + existing.setBufferType(BufferType.VECTORED); + existing.addVectoredUnit(unit); + existing.setAllocator(allocator); + if (existing.getStatus() == ReadBufferStatus.AVAILABLE) { + /* + * Buffer is already AVAILABLE. Trigger completion immediately. + * Use getLength() (actual bytes) for coverage — redundant here + * since the outer check already used getLength() for AVAILABLE, + * but kept explicit for clarity. 
+ */ + LOGGER.debug("Hitchhiking onto AVAILABLE buffer {}, length {}", + existing, existing.getLength()); + handleVectoredCompletion(existing, + existing.getStatus(), + existing.getLength()); + } + /* + * For READING_IN_PROGRESS: unit is attached and will be + * completed in doneReading once actual bytes are known. + * Short-read safety is enforced there via per-unit coverage check. + */ + return true; + } + } + } + /* + * Ensure a free buffer is available, attempting best-effort recovery + * through memory upscaling or eviction if necessary. + */ + if (isFreeListEmpty() && !tryMemoryUpscale() && !tryEvict()) { + return false; + } + /* + * Create a logical ReadBuffer descriptor without binding pooled memory. + * This captures metadata required to schedule the physical read. + */ + ReadBuffer buffer = new ReadBuffer(); + buffer.setStream(stream); + buffer.setETag(stream.getETag()); + buffer.setPath(stream.getPath()); + buffer.setOffset(unit.getOffset()); + buffer.setRequestedLength(unit.getLength()); + buffer.setBufferType(BufferType.VECTORED); + buffer.setStatus(ReadBufferStatus.NOT_AVAILABLE); + buffer.setLatch(new CountDownLatch(1)); + buffer.addVectoredUnit(unit); + buffer.setAllocator(allocator); + buffer.setTracingContext(readAheadTracingContext); + /* + * Perform a final free-list check before consuming pooled memory to + * ensure buffer availability. + */ + if (isFreeListEmpty()) { + return false; + } + Integer bufferIndex = popFromFreeList(); + if (bufferIndex >= bufferPool.length) { + /* Defensive guard; should never occur */ + return false; + } + /* + * Bind the physical buffer and queue the read for asynchronous + * execution. + */ + buffer.setBuffer(bufferPool[bufferIndex]); + buffer.setBufferindex(bufferIndex); + + getReadAheadQueue().add(buffer); + notifyAll(); + return true; + } + } + /** * {@link AbfsInputStream} calls this method read any bytes already available in a buffer (thereby saving a * remote read). 
This returns the bytes if the data already exists in buffer. If there is a buffer that is reading @@ -427,10 +556,65 @@ public ReadBuffer getNextBlockToRead() throws InterruptedException { public void doneReading(final ReadBuffer buffer, final ReadBufferStatus result, final int bytesActuallyRead) { - printTraceLog( - "ReadBufferWorker completed prefetch for file: {} with eTag: {}, for offset: {}, queued by stream: {}, with status: {} and bytes read: {}", - buffer.getPath(), buffer.getETag(), buffer.getOffset(), - buffer.getStream().hashCode(), result, bytesActuallyRead); + if (LOGGER.isTraceEnabled()) { + LOGGER.trace("ReadBufferWorker completed read file {} for offset {} outcome {} bytes {}", + buffer.getStream().getPath(), buffer.getOffset(), result, bytesActuallyRead); + } + List vectoredUnits = buffer.getVectoredUnits(); + if (result == ReadBufferStatus.AVAILABLE + && (buffer.getBufferType() == BufferType.VECTORED && !vectoredUnits.isEmpty())) { + + /* + * Set length BEFORE handling vectored completion so that any + * hitchhiked units that call existing.getLength() see the correct + * actual value rather than 0. + */ + buffer.setLength(bytesActuallyRead); + /* + * Guard against short reads: units hitchhiked while buffer was + * READING_IN_PROGRESS used requestedLength as coverage estimate. + * Now that actual bytes are known, fail any units not fully covered + * so their callers are not left hanging on the CompletableFuture. + */ + long actualEnd = buffer.getOffset() + bytesActuallyRead; + /* + * Fast path: check if any unit exceeds actual bytes read before + * doing expensive stream/collect. Short reads are rare so this + * avoids unnecessary allocations in the common case. 
+ */ + boolean hasUncovered = false; + for (CombinedFileRange u : vectoredUnits) { + if ((u.getOffset() + u.getLength()) > actualEnd) { + hasUncovered = true; + break; + } + } + if (hasUncovered) { + /* + * Short read detected — fail uncovered units explicitly so callers + * are not left hanging on their CompletableFuture. + */ + Iterator it = vectoredUnits.iterator(); + while (it.hasNext()) { + CombinedFileRange u = it.next(); + if ((u.getOffset() + u.getLength()) > actualEnd) { + it.remove(); + LOGGER.debug( + "Vectored unit not covered by actual bytes read: unitEnd={} actualEnd={}, failing unit", + (u.getOffset() + u.getLength()), actualEnd); + u.getData().completeExceptionally(new IOException( + "Vectored read unit not covered by actual bytes read: " + + "unitEnd=" + (u.getOffset() + u.getLength()) + + " actualEnd=" + actualEnd)); + } + } + } + if (!vectoredUnits.isEmpty()) { + LOGGER.debug("Entering vectored read completion with buffer {}, result {}, bytesActuallyRead {}", + buffer, result, bytesActuallyRead); + handleVectoredCompletion(buffer, result, bytesActuallyRead); + } + } synchronized (this) { // If this buffer has already been purged during // close of InputStream then we don't update the lists. @@ -438,16 +622,18 @@ public void doneReading(final ReadBuffer buffer, getInProgressList().remove(buffer); if (result == ReadBufferStatus.AVAILABLE && bytesActuallyRead > 0) { // Successful read, so update the buffer status and length - buffer.setStatus(ReadBufferStatus.AVAILABLE); - buffer.setLength(bytesActuallyRead); + if (!buffer.isFanOutDone()) { + buffer.setStatus(ReadBufferStatus.AVAILABLE); + buffer.setLength(bytesActuallyRead); + } } else { // Failed read, reuse buffer for next read, this buffer will be // evicted later based on eviction policy. pushToFreeList(buffer.getBufferindex()); + buffer.setStatus(result); } // completed list also contains FAILED read buffers // for sending exception message to clients. 
- buffer.setStatus(result); buffer.setTimeStamp(currentTimeMillis()); getCompletedReadList().add(buffer); } @@ -506,7 +692,7 @@ private ReadBuffer getFromList(final Collection list, final String eTag, final long requestedOffset) { for (ReadBuffer buffer : list) { - if (eTag.equals(buffer.getETag())) { + if (eTag != null && eTag.equals(buffer.getETag())) { if (buffer.getStatus() == ReadBufferStatus.AVAILABLE && requestedOffset >= buffer.getOffset() && requestedOffset < buffer.getOffset() + buffer.getLength()) { @@ -749,7 +935,7 @@ private ReadBuffer getBufferFromCompletedQueue(final String eTag, for (ReadBuffer buffer : getCompletedReadList()) { // Buffer is returned if the requestedOffset is at or above buffer's // offset but less than buffer's length or the actual requestedLength - if (eTag.equals(buffer.getETag()) + if (eTag != null && eTag.equals(buffer.getETag()) && (requestedOffset >= buffer.getOffset()) && ((requestedOffset < buffer.getOffset() + buffer.getLength()) || (requestedOffset @@ -1120,6 +1306,21 @@ public ScheduledExecutorService getCpuMonitoringThread() { return cpuMonitorThread; } + @VisibleForTesting + public VectoredReadStrategy getVectoredReadStrategy() { + return vectoredReadStrategy; + } + + @VisibleForTesting + public int getMaxSeekForVectoredReads() { + return maxSeekForVectoredReads; + } + + @VisibleForTesting + public int getMaxSeekForVectoredReadsThroughput() { + return maxSeekForeVectoredReadsThroughput; + } + /** * Returns the maximum JVM CPU utilization observed during the current * monitoring interval or since the last reset. 
diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java new file mode 100644 index 0000000000000..8d90cb85c9888 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/VectoredReadHandler.java @@ -0,0 +1,662 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.io.EOFException; +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.IntFunction; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.VectoredReadUtils; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; +import org.apache.hadoop.fs.azurebfs.utils.TracingContext; +import org.apache.hadoop.fs.impl.CombinedFileRange; + +/** + * Handles vectored read operations by coordinating with a ReadBufferManager + * and applying the configured VectoredReadStrategy. + * This class acts as the orchestration layer that decides how vectored reads + * are executed, while delegating buffer management and read behavior to + * dedicated components. + */ +class VectoredReadHandler { + + private static final Logger LOG = LoggerFactory.getLogger(VectoredReadHandler.class); + + /** + * Manages allocation, lifecycle, and reuse of read buffers + * used during vectored read operations. + */ + private final ReadBufferManager readBufferManager; + + /** + * Strategy defining how vectored reads should be performed. + */ + private final VectoredReadStrategy strategy; + + /** + * Shared reconstruction buffers for ranges that span multiple chunks. + * Keyed by the original FileRange instance. + */ + private final ConcurrentHashMap partialBuffers = + new ConcurrentHashMap<>(); + + /** + * Tracks remaining bytes to be received for each logical range. + * Keyed by the original FileRange instance. 
+ */ + private final ConcurrentHashMap pendingBytes = + new ConcurrentHashMap<>(); + + /** + * Creates a VectoredReadHandler using the provided ReadBufferManager. + * The vectored read strategy is obtained from the manager to ensure + * consistent configuration across the read pipeline. + * + * @param readBufferManager manager responsible for buffer handling + * and providing the vectored read strategy + */ + VectoredReadHandler(ReadBufferManager readBufferManager) { + this.readBufferManager = readBufferManager; + this.strategy = readBufferManager.getVectoredReadStrategy(); + LOG.debug("VectoredReadHandler initialized with strategy={}", strategy); + } + + /** + * Perform a vectored read over multiple logical file ranges. + * + *

Logical ranges are first merged using a span-first strategy determined + * by the configured {@link VectoredReadStrategy}. The merged ranges are then + * split into buffer-sized physical read units and queued for asynchronous + * execution. If a pooled buffer is unavailable, the read falls back to a + * direct read path.

+ * + * @param stream input stream for the file being read + * @param ranges logical file ranges to read; each range will be completed + * with data or failure via its associated future + * @param allocator allocator used to create buffers for direct reads and + * vectored fan-out + */ + public void readVectored( + AbfsInputStream stream, + List ranges, + IntFunction allocator) throws EOFException { + LOG.debug("readVectored invoked: path={}, rangeCount={}", + stream.getPath(), ranges.size()); + + /* Initialize a future for each logical file range */ + /* Initialize a future for each logical file range */ + long fileLength = stream.getContentLength(); + List validRanges = new ArrayList<>(); + for (FileRange r : ranges) { + VectoredReadUtils.validateRangeRequest(r); + r.setData(new CompletableFuture<>()); + long offset = r.getOffset(); + long length = r.getLength(); + + if (offset < 0 || length < 0 || offset > fileLength || length > fileLength - offset) { + r.getData().completeExceptionally(new EOFException( + "Invalid range: offset=" + offset + ", length=" + length + ", fileLength=" + fileLength)); + continue; + } + validRanges.add(r); + } + + /* Select the maximum allowed merge span based on the configured strategy */ + int maxSpan = + (strategy == VectoredReadStrategy.TPS_OPTIMIZED) + ? 
readBufferManager.getMaxSeekForVectoredReads() + : readBufferManager.getMaxSeekForVectoredReadsThroughput(); + + LOG.debug("readVectored: path={}, strategy={}, maxSpan={}", + stream.getPath(), strategy, maxSpan); + + /* Merge logical ranges using a span-first coalescing strategy */ + List merged = mergeBySpanAndGap(validRanges, maxSpan, fileLength); + + LOG.debug("readVectored: path={}, mergedRangeCount={}", + stream.getPath(), merged.size()); + + /* Read buffer size acts as a hard upper bound for physical reads */ + int readBufferSize = ReadBufferManager.getReadAheadBlockSize(); + + /* Split merged ranges into buffer-sized chunks and queue each for read */ + for (CombinedFileRange unit : merged) { + List chunks = splitByBufferSize(unit, readBufferSize); + + LOG.debug("readVectored: path={}, mergedOffset={}, mergedLength={}, chunkCount={}", + stream.getPath(), unit.getOffset(), unit.getLength(), chunks.size()); + + for (CombinedFileRange chunk : chunks) { + try { + VectoredReadUtils.validateRangeRequest(chunk); + boolean queued = queueVectoredRead(stream, chunk, allocator); + if (!queued) { + LOG.debug("readVectored: buffer pool exhausted, falling back to directRead:" + + " path={}, offset={}, length={}", + stream.getPath(), chunk.getOffset(), chunk.getLength()); + directRead(stream, chunk, allocator); + } + } catch (Exception e) { + LOG.warn("readVectored: chunk read failed, failing underlying ranges:" + + " path={}, offset={}, length={}", + stream.getPath(), chunk.getOffset(), chunk.getLength(), e); + failUnit(chunk, e); + } + } + } + } + + /** + * Queues a vectored read request with the buffer manager. + * + * @return true if successfully queued, false if the queue is full and fallback is required. 
+ */ + @VisibleForTesting + boolean queueVectoredRead(AbfsInputStream stream, CombinedFileRange unit, + IntFunction allocator) { + LOG.debug("queueVectoredRead: path={}, offset={}, length={}", + stream.getPath(), unit.getOffset(), unit.getLength()); + TracingContext tracingContext = stream.getTracingContext(); + tracingContext.setReadType(ReadType.VECTORED_READ); + return getReadBufferManager().queueVectoredRead(stream, unit, tracingContext, allocator); + } + + /** + * Accesses the shared manager responsible for coordinating asynchronous read buffers. + * + * @return the {@link ReadBufferManager} instance. + */ + public ReadBufferManager getReadBufferManager() { + return readBufferManager; + } + + /** + * Splits a merged logical {@link CombinedFileRange} into smaller + * buffer-sized physical read units. + * + *

Each resulting unit will have a maximum size equal to the provided + * {@code bufferSize}. Any handling of multi-chunk ranges or reassembly + * of underlying {@link FileRange} data is delegated to the fan-out logic.

+ * + * @param unit the combined logical range to be split + * @param bufferSize the maximum size (in bytes) of each physical read unit + * @return a list of buffer-sized {@link CombinedFileRange} instances + */ + private List splitByBufferSize( + CombinedFileRange unit, + int bufferSize) { + LOG.debug("splitByBufferSize: offset={}, length={}, bufferSize={}", + unit.getOffset(), unit.getLength(), bufferSize); + + List parts = new ArrayList<>(); + long unitStart = unit.getOffset(); + long unitEnd = unitStart + unit.getLength(); + long start = unitStart; + + while (start < unitEnd) { + long partEnd = Math.min(start + bufferSize, unitEnd); + + CombinedFileRange part = + new CombinedFileRange(start, partEnd, unit.getUnderlying().get(0)); + part.getUnderlying().clear(); + + for (FileRange r : unit.getUnderlying()) { + long rStart = r.getOffset(); + long rEnd = rStart + r.getLength(); + if (rEnd > start && rStart < partEnd) { + part.getUnderlying().add(r); + } + } + parts.add(part); + start = partEnd; + } + + LOG.debug("splitByBufferSize: offset={}, produced {} parts", + unit.getOffset(), parts.size()); + return parts; + } + + /** + * Merge logical {@link FileRange}s into {@link CombinedFileRange}s using a + * span-first coalescing strategy. + * + *

Ranges are merged as long as the total span from the first offset to the + * end of the last range does not exceed {@code maxSpan}. Gaps between ranges + * are ignored.

+ * + * @param ranges logical file ranges to merge + * @param maxSpan maximum allowed span (in bytes) for a combined read + * @return merged {@link CombinedFileRange}s covering the input ranges + */ + private List mergeBySpanAndGap( + List ranges, + int maxSpan, long fileLength) throws EOFException { + + LOG.debug("mergeBySpanAndGap: rangeCount={}, maxSpan={}", ranges.size(), maxSpan); + List sortedRanges = VectoredReadUtils.validateAndSortRanges( + ranges, Optional.of(fileLength)); + + List out = new ArrayList<>(); + CombinedFileRange current = null; + + for (FileRange r : sortedRanges) { + long rOffset = r.getOffset(); + long rEnd = rOffset + r.getLength(); + + /* Initialize the first combined range */ + if (current == null) { + current = new CombinedFileRange(rOffset, rEnd, r); + continue; + } + + /* Check whether adding this range keeps the total span within the limit */ + long newSpan = rEnd - current.getOffset(); + + if (newSpan <= maxSpan) { + current.setLength((int) newSpan); + current.getUnderlying().add(r); + } else { + /* Span exceeded; finalize current range and start a new one */ + out.add(current); + current = new CombinedFileRange(rOffset, rEnd, r); + } + } + + /* Add the final combined range, if any */ + if (current != null) { + out.add(current); + } + + LOG.debug("mergeBySpanAndGap: produced {} combined ranges", out.size()); + return out; + } + + /** + * Distributes data from a physical read buffer into the corresponding + * logical {@link FileRange}s. + * + *

This method performs a "fan-out" operation where a single physical + * read (represented by {@link ReadBuffer}) may contain data for multiple + * logical ranges. The relevant portions are copied into per-range buffers + * and completed once fully populated.

+ * + *

Partial reads are accumulated using {@code partialBuffers} and + * {@code pendingBytes}. A range is only completed when all expected + * bytes have been received.

+ * + *

Thread safety: + *

    + *
  • Each logical range buffer is synchronized independently
  • + *
  • Writes use {@code System.arraycopy} directly into the backing array + * to avoid shared {@link ByteBuffer} position mutation
  • + *
+ *

+ * + * @param buffer the physical read buffer containing merged data + * @param bytesRead number of valid bytes in the buffer + */ + void fanOut(ReadBuffer buffer, int bytesRead) { + LOG.debug("fanOut: path={}, bufferOffset={}, bytesRead={}", + buffer.getPath(), buffer.getOffset(), bytesRead); + + List units = buffer.getVectoredUnits(); + if (units == null) { + LOG.warn("fanOut: no vectored units found for path={}, offset={}", + buffer.getPath(), buffer.getOffset()); + return; + } + + long bufferStart = buffer.getOffset(); + long bufferEnd = bufferStart + bytesRead; + + /* Iterate over all combined logical units mapped to this buffer */ + for (CombinedFileRange unit : units) { + /* Each unit may contain multiple logical FileRanges */ + for (FileRange r : unit.getUnderlying()) { + CompletableFuture future = r.getData(); + + /* Skip already completed or cancelled ranges */ + if (future.isCancelled()) { + LOG.debug("fanOut: range cancelled, cleaning up: path={}, rangeOffset={}", + buffer.getPath(), r.getOffset()); + RangeKey key = new RangeKey(r); + partialBuffers.remove(key); + pendingBytes.remove(key); + continue; + } + if (future.isDone()) { + continue; + } + + try { + long rangeStart = r.getOffset(); + long rangeEnd = rangeStart + r.getLength(); + + /* Compute overlap between buffer and logical range */ + long overlapStart = Math.max(rangeStart, bufferStart); + long overlapEnd = Math.min(rangeEnd, bufferEnd); + + /* No overlap nothing to copy */ + if (overlapStart >= overlapEnd) { + LOG.debug("fanOut: no overlap for path={}, rangeOffset={}, bufferOffset={}", + buffer.getPath(), r.getOffset(), bufferStart); + continue; + } + + int srcOffset = (int) (overlapStart - bufferStart); + int destOffset = (int) (overlapStart - rangeStart); + int length = (int)(overlapEnd - overlapStart); + + LOG.debug("fanOut: copying path={}, rangeOffset={}, rangeLength={}," + + " bufferOffset={}, srcOffset={}, destOffset={}, length={}", + buffer.getPath(), r.getOffset(), r.getLength(), + 
bufferStart, srcOffset, destOffset, length); + + RangeKey key = new RangeKey(r); + + /* Allocate or reuse the full buffer for this logical range */ + ByteBuffer fullBuf = partialBuffers.computeIfAbsent( + key, k -> buffer.getAllocator().apply(r.getLength())); + + /* Track remaining bytes required to complete this range */ + AtomicInteger pending = pendingBytes.computeIfAbsent( + key, k -> new AtomicInteger(r.getLength())); + + synchronized (fullBuf) { + /* Double-check completion inside lock */ + if (future.isDone()) { + continue; + } + + ByteBuffer dst = fullBuf.duplicate(); + dst.position(destOffset); + dst.put(buffer.getBuffer(), srcOffset, length); + + int left = pending.addAndGet(-length); + + LOG.debug("fanOut: wrote chunk: path={}, rangeOffset={}, destOffset={}," + + " length={}, pendingBytes={}", + buffer.getPath(), r.getOffset(), destOffset, length, left); + + if (left < 0) { + LOG.error("fanOut: pending bytes went negative  possible duplicate write:" + + " path={}, rangeOffset={}, pending={}", + buffer.getPath(), r.getOffset(), left); + future.completeExceptionally(new IllegalStateException( + "Pending bytes negative for offset=" + r.getOffset())); + partialBuffers.remove(key); + pendingBytes.remove(key); + continue; + } + + /* Complete future once all bytes are received */ + if (left == 0 && !future.isDone()) { + /* + * Prepare buffer for reading. + * DO NOT use flip() because writes may arrive out-of-order. + * Instead explicitly expose the full buffer. 
+ */ + fullBuf.position(0); + fullBuf.limit(fullBuf.capacity()); + + if (fullBuf.limit() != r.getLength()) { + LOG.warn("fanOut: buffer size mismatch: path={}, rangeOffset={}," + + " expected={}, actual={}", + buffer.getPath(), r.getOffset(), r.getLength(), fullBuf.limit()); + } + + future.complete(fullBuf); + partialBuffers.remove(key); + pendingBytes.remove(key); + + LOG.debug("fanOut: completed range: path={}, rangeOffset={}, rangeLength={}", + buffer.getPath(), r.getOffset(), r.getLength()); + } + } + } catch (Exception e) { + LOG.warn("fanOut: exception processing range: path={}, rangeOffset={}", + buffer.getPath(), r.getOffset(), e); + RangeKey key = new RangeKey(r); + partialBuffers.remove(key); + pendingBytes.remove(key); + if (!future.isDone()) { + future.completeExceptionally(e); + } + } + } + } + } + + /** + * Fails all logical {@link FileRange}s associated with a given + * {@link CombinedFileRange}. + * + *

This method is invoked when a vectored read for the combined unit
+   * fails. It ensures that:
+   *
+   * <ul>
+   *   <li>Any partially accumulated buffers are cleaned up</li>
+   *   <li>Pending byte tracking state is removed</li>
+   *   <li>All corresponding {@link CompletableFuture}s are completed exceptionally</li>
+   * </ul>
+   *

+ * + * @param unit the combined vectored read unit whose underlying ranges must be failed + * @param t the exception that caused the failure + */ + private void failUnit(CombinedFileRange unit, Throwable t) { + for (FileRange r : unit.getUnderlying()) { + RangeKey key = new RangeKey(r); + partialBuffers.remove(key); + pendingBytes.remove(key); + CompletableFuture future = r.getData(); + if (future != null && !future.isDone()) { + future.completeExceptionally(t); + } + } + } + + /** + * Fails all {@link FileRange} futures associated with the given + * {@link ReadBuffer} and clears any partial state. + * + * @param buffer the read buffer whose ranges should be failed + * @param t the exception causing the failure + */ + void failBufferFutures(ReadBuffer buffer, Throwable t) { + List units = buffer.getVectoredUnits(); + if (units == null) return; + + for (CombinedFileRange unit : units) { + for (FileRange r : unit.getUnderlying()) { + RangeKey key = new RangeKey(r); + partialBuffers.remove(key); + pendingBytes.remove(key); + CompletableFuture future = r.getData(); + if (future != null && !future.isDone()) { + future.completeExceptionally(t); + } + } + } + } + + /** + * Performs a synchronous direct read for a {@link CombinedFileRange} + * when pooled buffering is not available. + * + *

<p>Data is accumulated into per-range partial buffers shared with
+   * {@link #fanOut}, ensuring that ranges spanning multiple chunks are
+   * correctly reassembled regardless of whether individual chunks were
+   * served via the async queue or this direct fallback path.

+ * + * @param stream input stream to read from + * @param unit combined range to read + * @param allocator buffer allocator for logical ranges + * @throws IOException if the read fails + */ + void directRead( + AbfsInputStream stream, + CombinedFileRange unit, + IntFunction allocator) throws IOException { + + LOG.debug("directRead: path={}, offset={}, length={}", + stream.getPath(), unit.getOffset(), unit.getLength()); + + /* Read entire combined range into a temporary buffer */ + byte[] tmp = new byte[unit.getLength()]; + TracingContext tracingContext = new TracingContext(stream.getTracingContext()); + tracingContext.setReadType(ReadType.VECTORED_DIRECT_READ); + + int total = 0; + int requested = unit.getLength(); + while (total < requested) { + int n = stream.readRemote(unit.getOffset() + total, tmp, total, + requested - total, tracingContext); + if (n <= 0) { + throw new IOException( + "Unexpected end of stream during direct read: path=" + stream.getPath() + + ", offset=" + (unit.getOffset() + total) + + ", requested=" + requested); + } + total += n; + } + + LOG.debug("directRead: read complete: path={}, offset={}, bytesRead={}", + stream.getPath(), unit.getOffset(), total); + + long unitStart = unit.getOffset(); + long unitEnd = unitStart + unit.getLength(); + + /* Distribute data to each logical FileRange */ + for (FileRange r : unit.getUnderlying()) { + CompletableFuture future = r.getData(); + if (future == null || future.isDone()) continue; + + long rangeStart = r.getOffset(); + long rangeEnd = rangeStart + r.getLength(); + + /* Compute overlap between unit and logical range */ + long overlapStart = Math.max(rangeStart, unitStart); + long overlapEnd = Math.min(rangeEnd, unitEnd); + if (overlapStart >= overlapEnd) continue; + + int srcOffset = (int) (overlapStart - unitStart); + int destOffset = (int) (overlapStart - rangeStart); + int length = (int) (overlapEnd - overlapStart); + + LOG.debug("directRead: copying: path={}, rangeOffset={}, rangeLength={}," + 
+ " srcOffset={}, destOffset={}, length={}", + stream.getPath(), r.getOffset(), r.getLength(), + srcOffset, destOffset, length); + + RangeKey key = new RangeKey(r); + + /* + * Use the shared partialBuffers/pendingBytes maps so that ranges + * spanning multiple chunks are correctly reassembled even when some + * chunks are served via directRead and others via the async fanOut path. + */ + ByteBuffer fullBuf = partialBuffers.computeIfAbsent( + key, k -> allocator.apply(r.getLength())); + AtomicInteger pending = pendingBytes.computeIfAbsent( + key, k -> new AtomicInteger(r.getLength())); + + synchronized (fullBuf) { + /* Re-check inside lock in case another chunk already completed this range */ + if (future.isDone()) continue; + + System.arraycopy(tmp, srcOffset, + fullBuf.array(), fullBuf.arrayOffset() + destOffset, + length); + + int left = pending.addAndGet(-length); + + LOG.debug("directRead: wrote chunk: path={}, rangeOffset={}, destOffset={}," + + " length={}, pendingBytes={}", + stream.getPath(), r.getOffset(), destOffset, length, left); + + if (left < 0) { + LOG.error("directRead: pending bytes went negative  possible duplicate write:" + + " path={}, rangeOffset={}, pending={}", + stream.getPath(), r.getOffset(), left); + future.completeExceptionally(new IllegalStateException( + "Pending bytes negative in directRead for offset=" + r.getOffset())); + partialBuffers.remove(key); + pendingBytes.remove(key); + continue; + } + + if (left == 0) { + fullBuf.position(0); + fullBuf.limit(r.getLength()); + future.complete(fullBuf); + partialBuffers.remove(key); + pendingBytes.remove(key); + LOG.debug("directRead: completed range: path={}, rangeOffset={}, rangeLength={}", + stream.getPath(), r.getOffset(), r.getLength()); + } + } + } + } + + /** + * Identity-based key wrapper for {@link FileRange}. + * + *

<p>This class ensures that {@link FileRange} instances are compared
+   * using reference equality rather than logical equality. It is used as
+   * a key in maps where multiple ranges may have identical offsets and
+   * lengths but must be treated as distinct objects.

+ * + *

<p>Equality and hash code are based on object identity
+   * ({@code ==}) via {@link System#identityHashCode(Object)}.

+ */ + static final class RangeKey { + private final FileRange range; + + RangeKey(FileRange range) { + this.range = range; + } + + @Override + public boolean equals(Object o) { + return this == o || + (o instanceof RangeKey && + this.range == ((RangeKey) o).range); + } + + @Override + public int hashCode() { + return System.identityHashCode(range); + } + } +} \ No newline at end of file diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java index e663db9bbdd39..57a82ee7c4060 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestReadBufferManager.java @@ -174,7 +174,7 @@ private ReadBufferManager getBufferManager(AzureBlobFileSystem fs) { getConfiguration()); return ReadBufferManagerV2.getBufferManager(fs.getAbfsStore().getClient().getAbfsCounters()); } - ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize); + ReadBufferManagerV1.setReadBufferManagerConfigs(blockSize, fs.getAbfsStore().getClient().getAbfsConfiguration()); return ReadBufferManagerV1.getBufferManager(); } } diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java new file mode 100644 index 0000000000000..56d06a9f53282 --- /dev/null +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/ITestVectoredRead.java @@ -0,0 +1,654 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.fs.azurebfs.services; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.function.IntFunction; + +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.Mockito; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileRange; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest; +import org.apache.hadoop.fs.azurebfs.AzureBlobFileSystem; +import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy; +import org.apache.hadoop.fs.impl.CombinedFileRange; + +import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.FS_AZURE_VECTORED_READ_STRATEGY; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; +import static 
org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_MB; +import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ZERO; +import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult; + +public class ITestVectoredRead extends AbstractAbfsIntegrationTest { + private static final int DATA_2_MB = 2 * ONE_MB; + private static final int DATA_4_MB = 4 * ONE_MB; + private static final int DATA_8_MB = 8 * ONE_MB; + private static final int DATA_10_MB = 10 * ONE_MB; + private static final int DATA_12_MB = 12 * ONE_MB; + private static final int DATA_16_MB = 16 * ONE_MB; + private static final int DATA_32_MB = 32 * ONE_MB; + private static final int DATA_100_MB = 100 * ONE_MB; + private static final int OFFSET_100_B = 100; + private static final int OFFSET_15K_B = 15_000; + private static final int OFFSET_42K_B = 42_500; + private static final int LEN_10K_B = 10_000; + private static final int LEN_27K_B = 27_000; + private static final int LEN_40K_B = 40_000; + private static final double MB_1_2 = 1.2; + private static final double MB_3_1 = 3.1; + private static final double MB_4_1 = 4.1; + private static final double MB_6_2 = 6.2; + private static final double MB_0_8 = 0.8; + private static final double MB_0_9 = 0.9; + private static final double MB_1_8 = 1.8; + private static final double MB_1_9 = 1.9; + private static final double MB_3_8 = 3.8; + private static final double MB_4_0 = 4.0; + private static final double MB_3_2 = 3.2; + private static final double MB_8_0 = 8.0; + private static final double MB_2_0 = 2.0; + private static final double MB_12_0 = 12.0; + private static final double MB_16_0 = 16.0; + private static final int HUGE_OFFSET_1 = 5_856_368; + private static final int HUGE_OFFSET_2 = 3_520_861; + private static final int HUGE_OFFSET_3 = 8_191_913; + private static final int HUGE_OFFSET_4 = 1_520_861; + private static final int HUGE_OFFSET_5 = 2_520_861; + private static final int 
HUGE_OFFSET_6 = 9_191_913; + private static final int HUGE_OFFSET_7 = 2_820_861; + private static final int HUGE_RANGE = 116_770; + private static final int HUGE_RANGE_LARGE = 156_770; + private static final int LOOKUP_RETRIES = 100; + private static final int EXEC_THREADS = 3; + private static final int SEQ_READ_ITERATIONS = 5; + private static final int FUTURE_TIMEOUT_SEC = 50; + public static final int SLEEP_TIME = 10; + + public ITestVectoredRead() throws Exception { + } + + /** + * Verifies basic correctness of vectored reads using simple disjoint ranges. + * Compares vectored read output against a full sequential read to ensure + * data integrity is preserved. + */ + @Test + public void testDisjointRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List rangeList = new ArrayList<>(); + rangeList.add(FileRange.createFileRange(OFFSET_100_B, LEN_10K_B)); + rangeList.add(FileRange.createFileRange(OFFSET_15K_B, LEN_27K_B)); + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath) + .build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(rangeList, allocate); + byte[] readFullRes = new byte[(int) fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(rangeList, readFullRes, 0); + } + } + + /** + * Ensures disjoint but mergeable ranges result in fewer backend reads. + * Validates that vectored read coalescing reduces remote calls + * while still returning correct data. 
+ */ + @Test + public void testVectoredReadDisjointRangesExpectTwoBackendReads() + throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_16_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + List ranges = new ArrayList<>(); + ranges.add(FileRange.createFileRange(0L, ONE_MB)); + ranges.add(FileRange.createFileRange((long) (MB_1_2 * ONE_MB), + (int) (MB_0_8 * ONE_MB))); + ranges.add(FileRange.createFileRange((long) (MB_3_1 * ONE_MB), + (int) (MB_0_9 * ONE_MB))); + ranges.add(FileRange.createFileRange((long) (MB_4_1 * ONE_MB), + (int) (MB_1_9 * ONE_MB))); + ranges.add(FileRange.createFileRange((long) (MB_6_2 * ONE_MB), + (int) (MB_1_8 * ONE_MB))); + IntFunction allocate = ByteBuffer::allocate; + try (FSDataInputStream in = + fs.openFile(testFilePath).build().get()) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + AbfsInputStream spyIn = Mockito.spy(abfsIn); + spyIn.readVectored(ranges, allocate); + CompletableFuture[] futures = + new CompletableFuture[ranges.size()]; + int i = 0; + for (FileRange range : ranges) { + futures[i++] = range.getData(); + } + CompletableFuture.allOf(futures).get(); + validateVectoredReadResult(ranges, fileContent, 0); + Mockito.verify(spyIn, Mockito.times(2)) + .readRemote( + Mockito.anyLong(), + Mockito.any(byte[].class), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any()); + } + } + + /** + * Validates fallback behavior when vectored read queuing fails. + * Ensures the implementation switches to direct reads and still + * completes all requested ranges correctly. 
+ */ + @Test + public void testVectoredReadFallsBackToDirectReadWhenQueuingFails() + throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_4_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List ranges = List.of( + FileRange.createFileRange(0, ONE_MB), + FileRange.createFileRange(DATA_2_MB, ONE_MB)); + IntFunction allocator = ByteBuffer::allocate; + + try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + AbfsInputStream spyIn = Mockito.spy(abfsIn); + VectoredReadHandler realHandler = abfsIn.getVectoredReadHandler(); + VectoredReadHandler spyHandler = Mockito.spy(realHandler); + Mockito.doReturn(spyHandler).when(spyIn).getVectoredReadHandler(); + Mockito.doReturn(false) + .when(spyHandler) + .queueVectoredRead( + Mockito.any(AbfsInputStream.class), + Mockito.any(CombinedFileRange.class), + ArgumentMatchers.>any()); + spyIn.readVectored(ranges, allocator); + CompletableFuture[] futures + = new CompletableFuture[ranges.size()]; + for (int i = 0; i < ranges.size(); i++) { + futures[i] = ranges.get(i).getData(); + } + CompletableFuture.allOf(futures).get(); + Mockito.verify(spyHandler, Mockito.atLeastOnce()) + .directRead( + Mockito.any(AbfsInputStream.class), + Mockito.any(CombinedFileRange.class), + Mockito.eq(allocator)); + + validateVectoredReadResult(ranges, fileContent, 0); + } + } + + /** + * Tests vectored read correctness with multiple non-contiguous ranges. + * Confirms that all ranges are read correctly even when more than two + * disjoint segments are requested. 
+ */ + @Test + public void testMultipleDisjointRangesWithVectoredRead() throws Throwable { + int fileSize = ONE_MB; + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List ranges = List.of( + FileRange.createFileRange(OFFSET_100_B, LEN_10K_B), + FileRange.createFileRange(OFFSET_15K_B, LEN_27K_B), + FileRange.createFileRange(OFFSET_42K_B, LEN_40K_B)); + IntFunction allocate = ByteBuffer::allocate; + CompletableFuture builder = fs.openFile(testFilePath) + .build(); + + try (FSDataInputStream in = builder.get()) { + in.readVectored(ranges, allocate); + byte[] readFullRes = new byte[(int) fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(ranges, readFullRes, 0); + } + } + + /** + * Exercises vectored reads on a large file with many scattered ranges. + * Ensures correctness and stability of vectored read logic under + * high-offset and large-file conditions. 
+ */ + @Test + public void testVectoredIOHugeFile() throws Throwable { + int fileSize = DATA_100_MB; + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName() + 1; + byte[] fileContent = getRandomBytesArray(fileSize); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + List ranges = List.of( + FileRange.createFileRange(HUGE_OFFSET_1, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_2, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_3, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_4, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_5, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_6, HUGE_RANGE), + FileRange.createFileRange(HUGE_OFFSET_7, HUGE_RANGE_LARGE)); + IntFunction allocate = ByteBuffer::allocate; + + CompletableFuture builder = + fs.openFile(testFilePath).build(); + try (FSDataInputStream in = builder.get()) { + in.readVectored(ranges, allocate); + byte[] readFullRes = new byte[(int) fileSize]; + in.readFully(0, readFullRes); + // Comparing vectored read results with read fully. + validateVectoredReadResult(ranges, readFullRes, 0); + } + } + + /** + * Verifies that vectored reads and sequential reads can execute concurrently. + * Ensures correct behavior when prefetch and vectored I/O overlap in time. 
+ */ + @Test + public void testSimultaneousPrefetchAndVectoredRead() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_16_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + IntFunction allocator = ByteBuffer::allocate; + List vRanges = new ArrayList<>(); + vRanges.add(FileRange.createFileRange(DATA_10_MB, (int) ONE_MB)); + vRanges.add(FileRange.createFileRange(DATA_12_MB, (int) ONE_MB)); + byte[] seqBuffer = new byte[(int) ONE_MB]; + CountDownLatch latch = new CountDownLatch(1); + CompletableFuture vectoredTask = CompletableFuture.runAsync(() -> { + try { + latch.await(); + abfsIn.readVectored(vRanges, allocator); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + CompletableFuture sequentialTask = CompletableFuture.runAsync( + () -> { + try { + latch.await(); + abfsIn.read(0, seqBuffer, 0, (int) ONE_MB); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + latch.countDown(); + CompletableFuture.allOf(vectoredTask, sequentialTask).get(); + CompletableFuture[] vFutures = vRanges.stream() + .map(FileRange::getData) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(vFutures).get(); + assertArrayEquals(Arrays.copyOfRange(fileContent, 0, (int) ONE_MB), + seqBuffer, "Sequential read data mismatch"); + validateVectoredReadResult(vRanges, fileContent, 0); + } + } + + /** + * Tests concurrent access using separate streams on different files. + * Ensures vectored reads on one file do not interfere with sequential + * reads and readahead on another file. 
+ */ + @Test + public void testConcurrentStreamsOnDifferentFiles() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + // Create two distinct files with random content + byte[] content1 = getRandomBytesArray(DATA_16_MB); + byte[] content2 = getRandomBytesArray(DATA_16_MB); + Path path1 = createFileWithContent(fs, "file1", content1); + Path path2 = createFileWithContent(fs, "file2", content2); + + // Open two separate input streams for concurrent access + try (FSDataInputStream in1 = fs.openFile(path1).build().get(); + FSDataInputStream in2 = fs.openFile(path2).build().get()) { + + AbfsInputStream streamVectored = (AbfsInputStream) in1.getWrappedStream(); + AbfsInputStream streamSequential + = (AbfsInputStream) in2.getWrappedStream(); + IntFunction allocator = ByteBuffer::allocate; + + // Define non-contiguous ranges for the vectored read on file 1 + List ranges = List.of( + FileRange.createFileRange(DATA_2_MB, ONE_MB), + FileRange.createFileRange(DATA_4_MB, ONE_MB)); + + // Use a latch to ensure both threads start their I/O at the same time + CountDownLatch latch = new CountDownLatch(1); + + // Thread 1: Perform asynchronous vectored reads on file 1 + CompletableFuture vectoredTask = CompletableFuture.runAsync(() -> { + try { + latch.await(); + streamVectored.readVectored(ranges, allocator); + } catch (Exception e) { + throw new RuntimeException("Vectored read task failed", e); + } + }); + + // Thread 2: Perform multiple sequential reads on file 2 to trigger readahead + CompletableFuture sequentialTask = CompletableFuture.runAsync( + () -> { + try { + latch.await(); + for (int i = 0; i < SEQ_READ_ITERATIONS; i++) { + byte[] tempBuf = new byte[(int) ONE_MB]; + streamSequential.read(i * ONE_MB, tempBuf, 0, (int) ONE_MB); + // Validate data integrity for file 2 immediately + assertArrayEquals(Arrays.copyOfRange(content2, i * (int) ONE_MB, + (i + 1) * (int) ONE_MB), tempBuf, + "Sequential read mismatch in file 2 at block " + i); + } + } catch 
(Exception e) { + throw new RuntimeException("Sequential read task failed", e); + } + }); + // Trigger simultaneous execution + latch.countDown(); + // Wait for both high-level tasks to finish + CompletableFuture.allOf(vectoredTask, sequentialTask).get(); + + // Explicitly wait for the vectored read futures to complete their data transfer + CompletableFuture[] vFutures = ranges.stream() + .map(FileRange::getData) + .toArray(CompletableFuture[]::new); + CompletableFuture.allOf(vFutures).get(); + + // Final validation of vectored read content for file 1 + validateVectoredReadResult(ranges, content1, 0); + } + } + + /** + * Validates that vectored reads can reuse an in-progress prefetch buffer. + * Ensures no redundant backend read is issued when data is already + * available via readahead. + */ + @Test + public void testVectoredReadHitchhikesOnExistingPrefetch() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_8_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + + try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + AbfsInputStream spyIn = Mockito.spy(abfsIn); + + // 1. Trigger a normal read to start the prefetch logic + // Reading the first byte often triggers a larger readahead (e.g., 4MB) + byte[] seqBuf = new byte[1]; + spyIn.read(seqBuf, 0, 1); + + // 2. Immediately queue a vectored read for an offset within that prefetch range + List vRanges = new ArrayList<>(); + // Using 1MB offset, which should be inside the initial readahead buffer + vRanges.add(FileRange.createFileRange(ONE_MB, (int) ONE_MB)); + + IntFunction allocator = ByteBuffer::allocate; + spyIn.readVectored(vRanges, allocator); + + // 3. Wait for the vectored read to complete + vRanges.get(0).getData().get(); + + // 4. 
Validate Data Integrity + validateVectoredReadResult(vRanges, fileContent, ZERO); + + // 5. THE CRITICAL VALIDATION: + // Even though we did a manual read and a vectored read, + // there should only be ONE remote call if hitchhiking worked. + Mockito.verify(spyIn, Mockito.atMost(spyIn.getReadAheadQueueDepth())) + .readRemote( + Mockito.anyLong(), + Mockito.any(byte[].class), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any()); + } + } + + @Test + public void testMultipleReadsWhileBufferInProgressEventuallyComplete() throws Exception { + final AzureBlobFileSystem fs = getFileSystem(); + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_8_MB); + Path testFilePath = createFileWithContent(fs, fileName, fileContent); + CountDownLatch blockCompletion = new CountDownLatch(1); + try (FSDataInputStream in = fs.openFile(testFilePath).build().get()) { + AbfsInputStream spyIn = Mockito.spy((AbfsInputStream) in.getWrappedStream()); + ReadBufferManager rbm = spyIn.getReadBufferManager(); + AtomicBoolean firstCall = new AtomicBoolean(true); + Mockito.doAnswer(invocation -> { + if (firstCall.getAndSet(false)) { + blockCompletion.await(); + } + return invocation.callRealMethod(); + }).when(spyIn).readRemote(Mockito.anyLong(),Mockito.any(byte[].class),Mockito.anyInt(),Mockito.anyInt(),Mockito.any()); + ExecutorService exec = Executors.newFixedThreadPool(EXEC_THREADS); + Future r1 = exec.submit(() -> { + try { + spyIn.read(new byte[1], 0, 1); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + ReadBuffer inProgress = null; + for (int i = 0; i < LOOKUP_RETRIES; i++) { + synchronized (rbm) { + inProgress = rbm.findInList(rbm.getInProgressList(), spyIn, 0); + } + if (inProgress != null) { + break; + } + Thread.sleep(SLEEP_TIME); + } + assertNotNull(inProgress,"Expected buffer to be in inProgressList while completion is blocked"); + Future r2 = exec.submit(() -> { + try { + spyIn.read(new byte[1], 0, 1); + } catch 
(Exception e) { + throw new RuntimeException(e); + } + }); + long bufferOffset = inProgress.getOffset(); + int length = (int)Math.min(ONE_MB, DATA_8_MB - bufferOffset); + List ranges = new ArrayList<>(); + ranges.add(FileRange.createFileRange(bufferOffset, length)); + Future vr = exec.submit(() -> { + try { + spyIn.readVectored(ranges, ByteBuffer::allocate); + } catch (Exception e) { + throw new RuntimeException(e); + } + }); + Thread.sleep(50); + blockCompletion.countDown(); + r1.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS); + r2.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS); + vr.get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS); + ranges.get(0).getData().get(FUTURE_TIMEOUT_SEC, TimeUnit.SECONDS); + validateVectoredReadResult(ranges, fileContent, bufferOffset); + exec.shutdownNow(); + } + } + + /** + * Verifies vectored read behavior under throughput-optimized strategy. + * Confirms that ranges are split as expected and that the number of + * backend reads matches the throughput-oriented execution model. 
+ */ + @Test + public void testThroughputOptimizedReadVectored() throws Exception { + Configuration configuration = getRawConfiguration(); + configuration.set(FS_AZURE_VECTORED_READ_STRATEGY, + VectoredReadStrategy.THROUGHPUT_OPTIMIZED.getName()); + FileSystem fileSystem = FileSystem.newInstance(configuration); + try (AzureBlobFileSystem abfs = (AzureBlobFileSystem) fileSystem) { + String fileName = methodName.getMethodName(); + byte[] fileContent = getRandomBytesArray(DATA_32_MB); + Path testFilePath = createFileWithContent(abfs, fileName, fileContent); + List fileRanges = new ArrayList<>(); + fileRanges.add( + FileRange.createFileRange(0L, (int) (MB_3_8 * ONE_MB))); + fileRanges.add( + FileRange.createFileRange( + (long) (MB_4_0 * ONE_MB), (int) (MB_3_2 * ONE_MB))); + fileRanges.add( + FileRange.createFileRange( + (long) (MB_8_0 * ONE_MB), (int) (MB_2_0 * ONE_MB))); + fileRanges.add( + FileRange.createFileRange( + (long) (MB_12_0 * ONE_MB), (int) (MB_4_0 * ONE_MB))); + fileRanges.add( + FileRange.createFileRange( + (long) (MB_16_0 * ONE_MB), (int) (MB_2_0 * ONE_MB))); + IntFunction allocate = ByteBuffer::allocate; + try (FSDataInputStream in = + abfs.openFile(testFilePath).build().get()) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + AbfsInputStream spyIn = Mockito.spy(abfsIn); + spyIn.readVectored(fileRanges, allocate); + CompletableFuture[] futures = + new CompletableFuture[fileRanges.size()]; + int i = 0; + for (FileRange range : fileRanges) { + futures[i++] = range.getData(); + } + CompletableFuture.allOf(futures).get(); + validateVectoredReadResult(fileRanges, fileContent, 0); + Mockito.verify(spyIn, Mockito.times(5)) + .readRemote( + Mockito.anyLong(), + Mockito.any(byte[].class), + Mockito.anyInt(), + Mockito.anyInt(), + Mockito.any()); + } + } + } + + /** + * Compares performance of many random reads using: + * 1. Individual random (non-vectored) reads + * 2. 
A single vectored read covering the same offsets
+   *
+   * All non-vectored reads complete first, followed by vectored reads.
+   * This is a relative performance test intended to catch regressions,
+   * not to assert absolute timing guarantees.
+   */
+  @Test
+  public void testRandomReadsNonVectoredThenVectoredPerformance()
+      throws Exception {
+    final AzureBlobFileSystem fs = getFileSystem();
+    final int fileSize = 128 * ONE_MB;
+    final int readSize = 64 * ONE_KB;
+    final int readCount = 512;
+    byte[] fileContent = getRandomBytesArray(fileSize);
+    Path testPath =
+        createFileWithContent(fs, methodName.getMethodName(), fileContent);
+    /* ----------------------------------------------------
+     * Generate NON-overlapping offsets (shuffled)
+     * ---------------------------------------------------- */
+    List<Long> offsets = new ArrayList<>();
+    for (long offset = 0; offset + readSize <= fileSize; offset += readSize) {
+      offsets.add(offset);
+    }
+    // Fixed seed: simulate randomness without overlap, reproducibly.
+    Collections.shuffle(offsets, new Random(12345L));
+    // Limit to readCount
+    offsets = offsets.subList(0, Math.min(readCount, offsets.size()));
+
+    /* ----------------------------------------------------
+     * Build vectored ranges
+     * ---------------------------------------------------- */
+    List<FileRange> vectoredRanges = new ArrayList<>();
+    for (long offset : offsets) {
+      vectoredRanges.add(
+          FileRange.createFileRange(offset, readSize));
+    }
+    try (FSDataInputStream in = fs.openFile(testPath).build().get()) {
+
+      /* ----------------------------------------------------
+       * Phase 1: Random non-vectored reads
+       * ---------------------------------------------------- */
+      byte[] buffer = new byte[readSize];
+      long nonVectoredStartNs = System.nanoTime();
+      for (long offset : offsets) {
+        in.seek(offset);
+        // readFully, not read(): a plain read() may legally return fewer
+        // bytes than requested, which would skew the timing comparison.
+        in.readFully(buffer, 0, readSize);
+      }
+      long nonVectoredTimeNs =
+          System.nanoTime() - nonVectoredStartNs;
+      /* ----------------------------------------------------
+       * Phase 2: Vectored reads
+       * ---------------------------------------------------- */
+      long vectoredStartNs = System.nanoTime();
+      in.readVectored(vectoredRanges, ByteBuffer::allocate);
+      CompletableFuture.allOf(
+          vectoredRanges.stream()
+              .map(FileRange::getData)
+              .toArray(CompletableFuture[]::new)
+      ).get();
+      long vectoredTimeNs =
+          System.nanoTime() - vectoredStartNs;
+      /* ----------------------------------------------------
+       * Assertion: 20% headroom keeps the check less flaky.
+       * ---------------------------------------------------- */
+      assertTrue(
+          vectoredTimeNs <= nonVectoredTimeNs * 1.2,
+          String.format(
+              "Vectored read slower: vectored=%d ns, non-vectored=%d ns",
+              vectoredTimeNs, nonVectoredTimeNs)
+      );
+    }
+  }
+}
diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ITestAbfsDriverLatency.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ITestAbfsDriverLatency.java
new file mode 100644
index 0000000000000..91f3e26322b4c
--- /dev/null
+++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/utils/ITestAbfsDriverLatency.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.azurebfs.utils;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Measures e2e read latency through the ABFS driver.
+ *
+ * Compare P50/P90/P99 from this test against AbfsCheckLatency
+ * (plain HTTP app) to isolate driver overhead from network latency.
+ *
+ * Run:
+ *   mvn test -Dtest=ITestAbfsDriverLatency
+ *     -Dlatency.jobs=10
+ *     -Dlatency.files=100
+ *     -Dlatency.chunkKB=64
+ *     -Dlatency.fileSizeMB=1024
+ */
+public class ITestAbfsDriverLatency extends AbstractAbfsIntegrationTest {
+
+  // ===== CONFIG (mutable, populated from system properties in setUp) =====
+  private int numJobs;
+  private int filesPerJob;
+  private int chunkSizeKb;
+  private int fileSizeMb;
+
+  private int chunkSize;
+  private long fileSize;
+
+  // one buffer per thread — no allocations per chunk
+  private ThreadLocal<byte[]> threadBuffer;
+
+  // all chunk latencies (ms) across all jobs and files
+  private final List<Long> readLatencies =
+      Collections.synchronizedList(new ArrayList<>());
+
+  // test file paths created during setup
+  private final List<Path> testFiles = new ArrayList<>();
+
+  // ===== CONSTRUCTOR =====
+  public ITestAbfsDriverLatency() throws Exception {
+    super();
+  }
+
+  // ===== SETUP =====
+  /** Reads benchmark parameters, creates the test files, does one warmup read. */
+  @BeforeEach
+  public void setUp() throws Exception {
+    numJobs = Integer.parseInt(System.getProperty("latency.jobs", "2"));
+    filesPerJob = Integer.parseInt(System.getProperty("latency.files", "5"));
+    chunkSizeKb = Integer.parseInt(System.getProperty("latency.chunkKB", "64"));
+    fileSizeMb = Integer.parseInt(System.getProperty("latency.fileSizeMB", "10"));
+
+    chunkSize = chunkSizeKb * 1024;
+    fileSize = (long) fileSizeMb * 1024 * 1024;
+
+    threadBuffer = ThreadLocal.withInitial(() -> new byte[chunkSize]);
+
+    System.out.println("===== CONFIG =====");
+    System.out.printf("Jobs: %d%n", numJobs);
+    System.out.printf("Files/job: %d%n", filesPerJob);
+    System.out.printf("Chunk size: %d KB%n", chunkSizeKb);
+    System.out.printf("File size: %d MB%n", fileSizeMb);
+    System.out.println();
+
+    createTestFiles();
+
+    // warmup — not recorded
+    System.out.println("Warmup...");
+    readFile(getFileSystem(), testFiles.get(0));
+    readLatencies.clear();
+    Thread.sleep(2000);
+  }
+
+  // ===== TEARDOWN =====
+  /** Best-effort deletion of the benchmark files. */
+  @AfterEach
+  public void tearDown() throws Exception {
+    FileSystem fs = getFileSystem();
+    for (Path p : testFiles) {
+      try {
+        fs.delete(p, false);
+      } catch (Exception ignored) {
+        // best-effort cleanup: a failed delete must not fail the test
+      }
+    }
+    testFiles.clear();
+    readLatencies.clear();
+  }
+
+  // ===== TEST =====
+  /** Runs numJobs concurrent reader jobs and prints latency percentiles. */
+  @Test
+  public void testE2EReadLatency() throws Exception {
+    System.out.println("Starting benchmark...\n");
+    long wallStart = System.currentTimeMillis();
+
+    List<Thread> threads = new ArrayList<>();
+    for (int j = 0; j < numJobs; j++) {
+      final int jobId = j;
+      Thread t = new Thread(() -> runJob(jobId));
+      threads.add(t);
+      t.start();
+    }
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    long wallMs = System.currentTimeMillis() - wallStart;
+
+    printResults(wallMs);
+  }
+
+  // ===== CREATE TEST FILES =====
+  /** Creates numJobs * filesPerJob zero-filled files of fileSizeMb MB each. */
+  private void createTestFiles() throws Exception {
+    FileSystem fs = getFileSystem();
+    int total = numJobs * filesPerJob;
+
+    System.out.println("Creating " + total + " test files of "
+        + fileSizeMb + " MB each...");
+
+    for (int i = 0; i < total; i++) {
+      Path path = new Path(getTestPath1(), "latency_test_file_" + i);
+      testFiles.add(path);
+
+      try (FSDataOutputStream out = fs.create(path, true)) {
+        byte[] buf = new byte[4 * 1024 * 1024];
+        long written = 0;
+        while (written < fileSize) {
+          int toWrite = (int) Math.min(buf.length, fileSize - written);
+          out.write(buf, 0, toWrite);
+          written += toWrite;
+        }
+      }
+    }
+
+    System.out.println("Done.\n");
+  }
+
+  // ===== ONE JOB =====
+  /** Sequentially reads this job's slice of the test files. */
+  private void runJob(int jobId) {
+    try {
+      FileSystem fs = getFileSystem();
+
+      int start = jobId * filesPerJob;
+      int end = start + filesPerJob;
+
+      for (int i = start; i < end; i++) {
+        try {
+          readFile(fs, testFiles.get(i));
+        } catch (Exception e) {
+          System.err.println("Job " + jobId
+              + " failed on file " + i + ": " + e.getMessage());
+        }
+      }
+
+      System.out.println("Job " + jobId + " done");
+
+    } catch (Exception e) {
+      System.err.println("Job " + jobId + " failed: " + e.getMessage());
+    }
+  }
+
+  // ===== READ ONE FILE via ABFS driver =====
+  /** Reads one file chunk-by-chunk, recording each read() call's latency in ms. */
+  private void readFile(FileSystem fs, Path path) throws Exception {
+    byte[] buf = threadBuffer.get();
+
+    try (FSDataInputStream in = fs.open(path)) {
+      int n;
+      while (true) {
+        long start = System.currentTimeMillis();
+        n = in.read(buf, 0, chunkSize);
+        long latency = System.currentTimeMillis() - start;
+
+        // EOF: the final (empty) read is not recorded
+        if (n == -1) {
+          break;
+        }
+
+        readLatencies.add(latency);
+        // discard — no processing
+      }
+    }
+  }
+
+  // ===== HELPER =====
+  /** Root directory under which benchmark files are created. */
+  private Path getTestPath1() throws IOException {
+    return new Path(getFileSystem().getUri().toString()
+        + "/latency-benchmark");
+  }
+
+  // ===== PRINT RESULTS =====
+  /** Prints wall time, read counts and latency distribution. */
+  private void printResults(long wallMs) {
+    int chunksPerFile = (int) Math.ceil((double) fileSize / chunkSize);
+
+    System.out.println("\n===== RESULTS =====");
+    System.out.printf("Wall time: %d ms%n", wallMs);
+    System.out.printf("Total reads: %d%n", readLatencies.size());
+    System.out.printf("Expected reads: %d%n",
+        numJobs * filesPerJob * chunksPerFile);
+
+    System.out.println("\n===== READ LATENCY via ABFS DRIVER =====");
+    System.out.printf("Min: %d ms%n",
+        readLatencies.stream().mapToLong(Long::longValue).min().orElse(0));
+    System.out.printf("Avg: %.0f ms%n",
+        readLatencies.stream().mapToLong(Long::longValue).average().orElse(0));
+    System.out.printf("P50: %d ms%n", percentile(50));
+    System.out.printf("P90: %d ms%n", percentile(90));
+    System.out.printf("P99: %d ms%n", percentile(99));
+    System.out.printf("Max: %d ms%n",
+        readLatencies.stream().mapToLong(Long::longValue).max().orElse(0));
+  }
+
+  // ===== PERCENTILE =====
+  /** Nearest-rank percentile of all recorded latencies; 0 when none recorded. */
+  private long percentile(double p) {
+    List<Long> sorted = new ArrayList<>(readLatencies);
+    // Guard: get(0) on an empty list would throw IndexOutOfBoundsException.
+    if (sorted.isEmpty()) {
+      return 0;
+    }
+    Collections.sort(sorted);
+    int idx = (int) Math.ceil(p / 100.0 * sorted.size()) - 1;
+    return sorted.get(Math.max(idx, 0));
+  }
+}