Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
8c28b18
vectored read config changes
anmolanmol1234 Jan 16, 2026
08617b7
Vectored read code
anmolanmol1234 Jan 20, 2026
95cbb73
Fix tests
anmolanmol1234 Jan 20, 2026
d03d3cd
Made changes for inprogress list
anmolanmol1234 Jan 23, 2026
1e62df3
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jan 23, 2026
3f36997
Merge conflicts
anmolanmol1234 Jan 27, 2026
c4313e9
Checkstyle fix
anmolanmol1234 Jan 27, 2026
5043c90
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Jan 27, 2026
0106fc1
Fix checkstyle
anmolanmol1234 Jan 27, 2026
1430b4a
Fix checkstyle
anmolanmol1234 Jan 27, 2026
0927ca1
Add explanations
anmolanmol1234 Jan 27, 2026
08ca94a
Add debug log statements
anmolanmol1234 Jan 27, 2026
02834ef
Checkstyle fixes
anmolanmol1234 Jan 28, 2026
64e5da6
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Feb 2, 2026
eb52c29
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Feb 20, 2026
1ea571e
fix null issue
anmolanmol1234 Mar 12, 2026
279e7f4
Fix vectored read
anmolanmol1234 Mar 30, 2026
7de0740
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Apr 1, 2026
7c46352
Merge branch 'trunk' of https://github.com/anmolanmol1234/hadoop into…
anmolanmol1234 Apr 2, 2026
25fa821
range validation fixes
anmolanmol1234 Apr 2, 2026
5b2632a
fix issues
anmolanmol1234 Apr 2, 2026
437ffc8
Fix checkstyle
anmolanmol1234 Apr 2, 2026
975bf73
fix checkstyle
anmolanmol1234 Apr 2, 2026
ba7c9ff
fix checkstyle failures
anmolanmol1234 Apr 3, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions hadoop-tools/hadoop-azure/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -456,6 +456,7 @@
<exclude>**/azurebfs/ITestSmallWriteOptimization.java</exclude>
<exclude>**/azurebfs/ITestAbfsStreamStatistics*.java</exclude>
<exclude>**/azurebfs/services/ITestReadBufferManager.java</exclude>
<exclude>**/azurebfs/services/ITestVectoredRead.java</exclude>
<exclude>**/azurebfs/commit/*.java</exclude>
</excludes>

Expand Down Expand Up @@ -497,6 +498,7 @@
<include>**/azurebfs/ITestSmallWriteOptimization.java</include>
<include>**/azurebfs/services/ITestReadBufferManager.java</include>
<include>**/azurebfs/ITestAbfsStreamStatistics*.java</include>
<include>**/azurebfs/services/ITestVectoredRead.java</include>
<include>**/azurebfs/commit/*.java</include>
</includes>
</configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
Expand Down Expand Up @@ -623,6 +624,22 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
private int prefetchRequestPriorityValue;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTORED_READ_STRATEGY,
DefaultValue = DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY)
private String vectoredReadStrategy;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MIN_SEEK_FOR_VECTORED_READS,
DefaultValue = DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS)
private int minSeekForVectoredReads;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS,
DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS)
private int maxSeekForVectoredReads;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT,
DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT)
private int maxSeekForVectoredReadsThroughput;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY,
DefaultValue = DEFAULT_AZURE_READ_POLICY)
private String abfsReadPolicy;
Expand Down Expand Up @@ -2167,4 +2184,58 @@ public int getTailLatencyAnalysisWindowGranularity() {
public int getTailLatencyMaxRetryCount() {
return tailLatencyMaxRetryCount;
}

/**
* Returns the configured vectored read strategy.
*
* <p>
* The configuration value is parsed in a case-insensitive manner and may be
* specified either using the enum name (for example,
* {@code TPS_OPTIMIZED}) or the short name (for example, {@code TPS}).
* </p>
*
* @return the resolved {@link VectoredReadStrategy}
*
* @throws IllegalArgumentException if the configured value is invalid
*/
public VectoredReadStrategy getVectoredReadStrategy() {
try {
return VectoredReadStrategy.fromString(vectoredReadStrategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid value for " + FS_AZURE_VECTORED_READ_STRATEGY
+ ": " + vectoredReadStrategy
+ ". Expected one of: TPS, THROUGHPUT, TPS_OPTIMIZED, THROUGHPUT_OPTIMIZED", e);
}
}

/**
* Returns the minimum gap between adjacent read ranges that qualifies them
* for merging during vectored reads.
*
* @return minimum gap threshold for range merging
*/
public int getMinSeekForVectoredReads() {
return minSeekForVectoredReads;
}

/**
* Returns the maximum gap between adjacent read ranges allowed when
* considering them for merging during vectored reads.
*
* @return maximum gap threshold for range merging
*/
public int getMaxSeekForVectoredReads() {
return maxSeekForVectoredReads;
}

/**
* Returns the maximum gap between adjacent read ranges allowed when
* considering them for merging during vectored reads throughput optimized..
*
* @return maximum gap threshold for range merging
*/
public int getMaxSeekForVectoredReadsThroughput() {
return maxSeekForVectoredReadsThroughput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -627,5 +627,31 @@ public static String containerProperty(String property, String fsName, String ac
*/
public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count";

/**
* Configuration key to control the vectored read strategy used by ABFS: {@value}
*/
public static final String FS_AZURE_VECTORED_READ_STRATEGY = "fs.azure.vectored.read.strategy";

/**
* Configuration key that defines the minimum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS: {@value}.
*/
public static final String FS_AZURE_MIN_SEEK_FOR_VECTORED_READS =
"fs.azure.min.seek.for.vectored.reads";

/**
* Configuration key that defines the maximum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS: {@value}.
*/
public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS =
"fs.azure.max.seek.for.vectored.reads";

/**
* Configuration key that defines the maximum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS throughput optimized: {@value}.
*/
public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT =
"fs.azure.max.seek.for.vectored.reads.throughput";

private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ public final class FileSystemConfigurations {
public static final int MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1;
public static final String DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY = "TPS";
public static final int DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS = ONE_MB;
public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS = 4 * ONE_MB;
public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT = 8 * ONE_MB;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public enum ReadType {
* Only triggered when small file read optimization kicks in.
*/
SMALLFILE_READ("SR"),
/**
* Read multiple disjoint ranges from the storage service using vectored reads.
* Used to coalesce and execute non-contiguous reads efficiently.
*/
VECTORED_READ("VR"),
/**
* Performs a vectored direct read by fetching multiple non-contiguous
* ranges in a single operation.
*/
VECTORED_DIRECT_READ("VDR"),
/**
* Reads from Random Input Stream with read ahead up to readAheadRange
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.enums;

import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;

/**
* Enum for buffer types.
* Used in {@link ReadBuffer} to separate normal vs vectored read.
*/
public enum BufferType {
/**
* Normal read buffer.
*/
NORMAL,
/**
* Vectored read buffer.
*/
VECTORED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.enums;

/**
* Defines the strategy used for vectored reads in ABFS.
*
* <p>
* The strategy controls how read ranges are planned and executed, trading off
* between request parallelism and per-request payload size.
* </p>
*/
public enum VectoredReadStrategy {

/**
* Optimizes for transactions per second (TPS).
*/
TPS_OPTIMIZED("TPS"),

/**
* Optimizes for overall data throughput.
*/
THROUGHPUT_OPTIMIZED("THROUGHPUT");

/** Short name used for configuration and logging. */
private final String name;

/**
* Constructs a vectored read strategy with a short, user-friendly name.
*
* @param name short identifier for the strategy
*/
VectoredReadStrategy(String name) {
this.name = name;
}

/**
* Returns the short name of the vectored read strategy.
*
* @return short strategy name
*/
public String getName() {
return name;
}

/**
* Parses a configuration value into a {@link VectoredReadStrategy}.
* @param value configuration value
* @return matching vectored read strategy
* @throws IllegalArgumentException if the value is invalid
*/
public static VectoredReadStrategy fromString(String value) {
for (VectoredReadStrategy strategy : values()) {
if (strategy.name().equalsIgnoreCase(value)
|| strategy.getName().equalsIgnoreCase(value)) {
return strategy;
}
}
throw new IllegalArgumentException("Invalid vectored read strategy: " + value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.function.IntFunction;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -54,8 +58,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.io.Sizes.S_128K;
import static org.apache.hadoop.io.Sizes.S_2M;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
Expand Down Expand Up @@ -125,6 +127,7 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff
private final AbfsInputStreamContext context;
private IOStatistics ioStatistics;
private String filePathIdentifier;

/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
Expand Down Expand Up @@ -199,7 +202,7 @@ public AbfsInputStream(
readAheadBlockSize, client.getAbfsConfiguration());
readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
} else {
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize, client.getAbfsConfiguration());
readBufferManager = ReadBufferManagerV1.getBufferManager();
}

Expand All @@ -220,6 +223,14 @@ private String createInputStreamId() {
return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN);
}

/**
* Retrieves the handler responsible for processing vectored read requests.
* @return the {@link VectoredReadHandler} instance associated with the buffer manager.
*/
VectoredReadHandler getVectoredReadHandler() {
return getReadBufferManager().getVectoredReadHandler();
}

@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
Expand Down Expand Up @@ -331,6 +342,19 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}

/**
* {@inheritDoc}
* Vectored read implementation for AbfsInputStream.
*
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
*/
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws EOFException {
getVectoredReadHandler().readVectored(this, ranges, allocate);
}

private boolean shouldReadFully() {
return this.firstRead && this.context.readSmallFilesCompletely()
&& this.contentLength <= this.bufferSize;
Expand Down Expand Up @@ -795,7 +819,10 @@ public synchronized void unbuffer() {

@Override
public boolean hasCapability(String capability) {
return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
return switch (toLowerCase(capability)) {
case StreamCapabilities.UNBUFFER, StreamCapabilities.VECTOREDIO -> true;
default -> false;
};
}

/**
Expand Down Expand Up @@ -1069,7 +1096,7 @@ ReadBufferManager getReadBufferManager() {
*/
@Override
public int minSeekForVectorReads() {
return S_128K;
return client.getAbfsConfiguration().getMinSeekForVectoredReads();
}

/**
Expand All @@ -1078,7 +1105,7 @@ public int minSeekForVectorReads() {
*/
@Override
public int maxReadSizeForVectorReads() {
return S_2M;
return client.getAbfsConfiguration().getMaxSeekForVectoredReads();
}

/**
Expand Down
Loading
Loading