Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
import org.apache.hadoop.fs.azurebfs.diagnostics.LongConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.diagnostics.StringConfigurationBasicValidator;
import org.apache.hadoop.fs.azurebfs.enums.Trilean;
import org.apache.hadoop.fs.azurebfs.enums.VectoredReadStrategy;
import org.apache.hadoop.fs.azurebfs.extensions.CustomTokenProviderAdaptee;
import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider;
import org.apache.hadoop.fs.azurebfs.extensions.SASTokenProvider;
Expand Down Expand Up @@ -623,6 +624,22 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_FS_AZURE_LOWEST_REQUEST_PRIORITY_VALUE)
private int prefetchRequestPriorityValue;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_VECTORED_READ_STRATEGY,
DefaultValue = DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY)
private String vectoredReadStrategy;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MIN_SEEK_FOR_VECTORED_READS,
DefaultValue = DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS)
private int minSeekForVectoredReads;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS,
DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS)
private int maxSeekForVectoredReads;

@IntegerConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT,
DefaultValue = DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT)
private int maxSeekForVectoredReadsThroughput;

@StringConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_READ_POLICY,
DefaultValue = DEFAULT_AZURE_READ_POLICY)
private String abfsReadPolicy;
Expand Down Expand Up @@ -2167,4 +2184,58 @@ public int getTailLatencyAnalysisWindowGranularity() {
public int getTailLatencyMaxRetryCount() {
return tailLatencyMaxRetryCount;
}

/**
* Returns the configured vectored read strategy.
*
* <p>
* The configuration value is parsed in a case-insensitive manner and may be
* specified either using the enum name (for example,
* {@code TPS_OPTIMIZED}) or the short name (for example, {@code TPS}).
* </p>
*
* @return the resolved {@link VectoredReadStrategy}
*
* @throws IllegalArgumentException if the configured value is invalid
*/
public VectoredReadStrategy getVectoredReadStrategy() {
try {
return VectoredReadStrategy.fromString(vectoredReadStrategy);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(
"Invalid value for " + FS_AZURE_VECTORED_READ_STRATEGY
+ ": " + vectoredReadStrategy
+ ". Expected one of: TPS, THROUGHPUT, TPS_OPTIMIZED, THROUGHPUT_OPTIMIZED", e);
}
}

/**
* Returns the minimum gap between adjacent read ranges that qualifies them
* for merging during vectored reads.
*
* @return minimum gap threshold for range merging
*/
public int getMinSeekForVectoredReads() {
return minSeekForVectoredReads;
}

/**
* Returns the maximum gap between adjacent read ranges allowed when
* considering them for merging during vectored reads.
*
* @return maximum gap threshold for range merging
*/
public int getMaxSeekForVectoredReads() {
return maxSeekForVectoredReads;
}

/**
* Returns the maximum gap between adjacent read ranges allowed when
* considering them for merging during vectored reads throughput optimized..
*
* @return maximum gap threshold for range merging
*/
public int getMaxSeekForVectoredReadsThroughput() {
return maxSeekForVectoredReadsThroughput;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -627,5 +627,31 @@ public static String containerProperty(String property, String fsName, String ac
*/
public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count";

/**
* Configuration key to control the vectored read strategy used by ABFS: {@value}
*/
public static final String FS_AZURE_VECTORED_READ_STRATEGY = "fs.azure.vectored.read.strategy";

/**
* Configuration key that defines the minimum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS: {@value}.
*/
public static final String FS_AZURE_MIN_SEEK_FOR_VECTORED_READS =
"fs.azure.min.seek.for.vectored.reads";

/**
* Configuration key that defines the maximum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS: {@value}.
*/
public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS =
"fs.azure.max.seek.for.vectored.reads";

/**
* Configuration key that defines the maximum gap between adjacent read ranges
* for merging ranges during vectored reads in ABFS throughput optimized: {@value}.
*/
public static final String FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT =
"fs.azure.max.seek.for.vectored.reads.throughput";

private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,10 @@ public final class FileSystemConfigurations {
public static final int MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1;
public static final String DEFAULT_FS_AZURE_VECTORED_READ_STRATEGY = "TPS";
public static final int DEFAULT_FS_AZURE_MIN_SEEK_FOR_VECTORED_READS = ONE_MB;
public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS = 4 * ONE_MB;
public static final int DEFAULT_FS_AZURE_MAX_SEEK_FOR_VECTORED_READS_THROUGHPUT = 8 * ONE_MB;

private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,16 @@ public enum ReadType {
* Only triggered when small file read optimization kicks in.
*/
SMALLFILE_READ("SR"),
/**
* Read multiple disjoint ranges from the storage service using vectored reads.
* Used to coalesce and execute non-contiguous reads efficiently.
*/
VECTORED_READ("VR"),
/**
* Performs a vectored direct read by fetching multiple non-contiguous
* ranges in a single operation.
*/
VECTORED_DIRECT_READ("VDR"),
/**
* Reads from Random Input Stream with read ahead up to readAheadRange
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.enums;

import org.apache.hadoop.fs.azurebfs.services.ReadBuffer;

/**
* Enum for buffer types.
* Used in {@link ReadBuffer} to separate normal vs vectored read.
*/
public enum BufferType {
/**
* Normal read buffer.
*/
NORMAL,
/**
* Vectored read buffer.
*/
VECTORED
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.azurebfs.enums;

/**
* Defines the strategy used for vectored reads in ABFS.
*
* <p>
* The strategy controls how read ranges are planned and executed, trading off
* between request parallelism and per-request payload size.
* </p>
*/
public enum VectoredReadStrategy {

/**
* Optimizes for transactions per second (TPS).
*/
TPS_OPTIMIZED("TPS"),

/**
* Optimizes for overall data throughput.
*/
THROUGHPUT_OPTIMIZED("THROUGHPUT");

/** Short name used for configuration and logging. */
private final String name;

/**
* Constructs a vectored read strategy with a short, user-friendly name.
*
* @param name short identifier for the strategy
*/
VectoredReadStrategy(String name) {
this.name = name;
}

/**
* Returns the short name of the vectored read strategy.
*
* @return short strategy name
*/
public String getName() {
return name;
}

/**
* Parses a configuration value into a {@link VectoredReadStrategy}.
* @param value configuration value
* @return matching vectored read strategy
* @throws IllegalArgumentException if the value is invalid
*/
public static VectoredReadStrategy fromString(String value) {
for (VectoredReadStrategy strategy : values()) {
if (strategy.name().equalsIgnoreCase(value)
|| strategy.getName().equalsIgnoreCase(value)) {
return strategy;
}
}
throw new IllegalArgumentException("Invalid vectored read strategy: " + value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,14 @@
import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.UUID;
import java.util.function.IntFunction;

import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.FileRange;
import org.apache.hadoop.fs.azurebfs.constants.ReadType;
import org.apache.hadoop.fs.impl.BackReference;
import org.apache.hadoop.util.Preconditions;
Expand Down Expand Up @@ -54,8 +58,6 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN;
import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD;
import static org.apache.hadoop.io.Sizes.S_128K;
import static org.apache.hadoop.io.Sizes.S_2M;
import static org.apache.hadoop.util.StringUtils.toLowerCase;

/**
Expand All @@ -72,7 +74,6 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff
private final AbfsClient client;
private final Statistics statistics;
private final String path;

private final long contentLength;
private final int bufferSize; // default buffer size
private final int footerReadSize; // default buffer size to read when reading footer
Expand All @@ -94,8 +95,7 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff
// User configured size of read ahead.
private final int readAheadRange;

private boolean firstRead = true; // to identify first read for optimizations

private boolean firstRead = true;
// SAS tokens can be re-used until they expire
private CachedSASToken cachedSasToken;
private byte[] buffer = null; // will be initialized on first use
Expand Down Expand Up @@ -125,6 +125,8 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff
private final AbfsInputStreamContext context;
private IOStatistics ioStatistics;
private String filePathIdentifier;
private VectoredReadHandler vectoredReadHandler;

/**
* This is the actual position within the object, used by
* lazy seek to decide whether to seek on the next read or not.
Expand Down Expand Up @@ -199,7 +201,7 @@ public AbfsInputStream(
readAheadBlockSize, client.getAbfsConfiguration());
readBufferManager = ReadBufferManagerV2.getBufferManager(client.getAbfsCounters());
} else {
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize);
ReadBufferManagerV1.setReadBufferManagerConfigs(readAheadBlockSize, client.getAbfsConfiguration());
readBufferManager = ReadBufferManagerV1.getBufferManager();
}

Expand All @@ -220,6 +222,14 @@ private String createInputStreamId() {
return StringUtils.right(UUID.randomUUID().toString(), STREAM_ID_LEN);
}

/**
* Retrieves the handler responsible for processing vectored read requests.
* @return the {@link VectoredReadHandler} instance associated with the buffer manager.
*/
VectoredReadHandler getVectoredReadHandler() {
return getReadBufferManager().getVectoredReadHandler();
}

@Override
public int read(long position, byte[] buffer, int offset, int length)
throws IOException {
Expand Down Expand Up @@ -331,6 +341,19 @@ public synchronized int read(final byte[] b, final int off, final int len) throw
return totalReadBytes > 0 ? totalReadBytes : lastReadBytes;
}

/**
* {@inheritDoc}
* Vectored read implementation for AbfsInputStream.
*
* @param ranges the byte ranges to read.
* @param allocate the function to allocate ByteBuffer.
*/
@Override
public void readVectored(List<? extends FileRange> ranges,
IntFunction<ByteBuffer> allocate) throws EOFException {
getVectoredReadHandler().readVectored(this, ranges, allocate);
}

private boolean shouldReadFully() {
return this.firstRead && this.context.readSmallFilesCompletely()
&& this.contentLength <= this.bufferSize;
Expand Down Expand Up @@ -795,7 +818,10 @@ public synchronized void unbuffer() {

@Override
public boolean hasCapability(String capability) {
return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
return switch (toLowerCase(capability)) {
case StreamCapabilities.UNBUFFER, StreamCapabilities.VECTOREDIO -> true;
default -> false;
};
}

/**
Expand Down Expand Up @@ -1069,7 +1095,7 @@ ReadBufferManager getReadBufferManager() {
*/
@Override
public int minSeekForVectorReads() {
return S_128K;
return client.getAbfsConfiguration().getMinSeekForVectoredReads();
}

/**
Expand All @@ -1078,7 +1104,7 @@ public int minSeekForVectorReads() {
*/
@Override
public int maxReadSizeForVectorReads() {
return S_2M;
return client.getAbfsConfiguration().getMaxSeekForVectoredReads();
}

/**
Expand Down
Loading
Loading