diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java index 6c95bd3768909..0a32a0507755f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AbfsConfiguration.java @@ -627,6 +627,10 @@ public class AbfsConfiguration{ DefaultValue = DEFAULT_AZURE_READ_POLICY) private String abfsReadPolicy; + @BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_RESTRICT_GPS_ON_OPENFILE, + DefaultValue = DEFAULT_FS_AZURE_RESTRICT_GPS_ON_OPENFILE) + private boolean restrictGpsOnOpenFile; + private String clientProvidedEncryptionKey; private String clientProvidedEncryptionKeySHA; @@ -1389,6 +1393,14 @@ public String getAbfsReadPolicy() { return abfsReadPolicy; } +/** + * Indicates whether GPS restriction on open file is enabled. + * @return true if GPS restriction is enabled on open file, false otherwise. 
+ */ + public boolean shouldRestrictGpsOnOpenFile() { + return restrictGpsOnOpenFile; + } + /** * Enum config to allow user to pick format of x-ms-client-request-id header * @return tracingContextFormat config if valid, else default ALL_ID_FORMAT diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java index 9ed20251043d2..a24f3303270f1 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystem.java @@ -402,6 +402,21 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx return open(path, Optional.empty()); } + /** + * Open a file for reading and return an {@link FSDataInputStream} that wraps + * the underlying {@link InputStream}. + * + * Note: when the filesystem is configured with `restrictGpsOnOpenFile` enabled + * (it's disabled by default), the existence check for the file path will be deferred + * and will not occur during this open call; it will happen when the first read + * is attempted on the returned stream. 
+ * + * @param path the location of the file to open + * @param parameters optional {@link OpenFileParameters} which can include + * FileStatus, configuration, buffer size and mandatory keys + * @return an {@link FSDataInputStream} wrapping the opened InputStream + * @throws IOException if an I/O error occurs while opening the file + */ private FSDataInputStream open(final Path path, final Optional parameters) throws IOException { statIncrement(CALL_OPEN); diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java index 055cee0625682..e92e3397c915f 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/AzureBlobFileSystemStore.java @@ -151,6 +151,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY; +import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH; import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE; @@ -162,6 +163,7 @@ import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME; import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_OPENFILE_ON_DIRECTORY; import static 
org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet; /** @@ -553,7 +555,7 @@ public Hashtable getPathStatus(final Path path, /** * Creates an object of {@link ContextEncryptionAdapter} - * from a file path. It calls {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient + * from a file path. It calls {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient * #getPathStatus(String, boolean, TracingContext, EncryptionAdapter)} method to get * contextValue (x-ms-encryption-context) from the server. The contextValue is passed * to the constructor of EncryptionAdapter to create the required object of @@ -866,6 +868,53 @@ public AbfsInputStream openFileForRead(final Path path, tracingContext); } + /** + * Creates an exception indicating that openFileForRead was called on a directory. + * + * @return AbfsRestOperationException with PATH_NOT_FOUND error code and a message + * indicating that openFileForRead must be used with files and not directories. + */ + private AbfsRestOperationException openFileForReadDirectoryException() { + return new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + ERR_OPENFILE_ON_DIRECTORY, + null); + } + + /** + * Opens a file for read and returns an {@link AbfsInputStream}. + * + *

+ * The method decides whether to call the server's GetPathStatus based on: + *

+ * If the encryption type is {@code ENCRYPTION_CONTEXT} the server-supplied + * X-MS-ENCRYPTION-CONTEXT header will be required and used to construct a + * {@link ContextProviderEncryptionAdapter}. If that header is missing a + * {@link PathIOException} is thrown. + *

+ * + *

+ * Note: when {@link AbfsConfiguration#shouldRestrictGpsOnOpenFile()} is enabled, + * the implementation won't do the GetPathStatus call. In that case, if the file does not + * actually exist or read is attempted on a directory, {@code openFileForRead} will not fail immediately. + * It will only be detected when the returned stream performs its first read, at which point an appropriate error will be raised. + *

+ * + * @param path the path to open (may be unqualified) + * @param parameters optional {@link OpenFileParameters} that may include a {@link FileStatus} + * (possibly a {@link VersionedFileStatus}) and other open parameters + * @param statistics filesystem statistics to associate with the returned stream + * @param tracingContext tracing context for remote calls + * @return an {@link AbfsInputStream} for reading the file + * @throws IOException on IO or server errors. A {@link PathIOException} is thrown when + * an expected encryption context header is missing. + */ public AbfsInputStream openFileForRead(Path path, final Optional parameters, final FileSystem.Statistics statistics, TracingContext tracingContext) @@ -878,13 +927,13 @@ public AbfsInputStream openFileForRead(Path path, FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus) .orElse(null); String relativePath = getRelativePath(path); - String resourceType, eTag; - long contentLength; + String resourceType = EMPTY_STRING, eTag = EMPTY_STRING; + long contentLength = 0; ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance(); /* * GetPathStatus API has to be called in case of: - * 1. fileStatus is null or not an object of VersionedFileStatus: as eTag - * would not be there in the fileStatus object. + * 1. restrictGpsOnOpenFile config is disabled AND fileStatus is null or not + * an object of VersionedFileStatus: as eTag would not be there in the fileStatus object. * 2. fileStatus is an object of VersionedFileStatus and the object doesn't * have encryptionContext field when client's encryptionType is * ENCRYPTION_CONTEXT. 
@@ -908,19 +957,23 @@ public AbfsInputStream openFileForRead(Path path, getClient().getEncryptionContextProvider(), getRelativePath(path), encryptionContext.getBytes(StandardCharsets.UTF_8)); } - } else { + } + /* + * If file created with ENCRYPTION_CONTEXT, irrespective of whether isRestrictGpsOnOpenFile config is enabled or not, + * GetPathStatus API has to be called to get the encryptionContext from the response header + */ + else if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT + || !getAbfsConfiguration().shouldRestrictGpsOnOpenFile()) { + AbfsHttpOperation op = getClient().getPathStatus(relativePath, false, - tracingContext, null).getResult(); - resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE; - contentLength = extractContentLength(op); - eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + tracingContext, null).getResult(); /* * For file created with ENCRYPTION_CONTEXT, client shall receive * encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT. */ if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) { final String fileEncryptionContext = op.getResponseHeader( - HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); + HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT); if (fileEncryptionContext == null) { LOG.debug("EncryptionContext missing in GetPathStatus response"); throw new PathIOException(path.toString(), @@ -930,14 +983,20 @@ public AbfsInputStream openFileForRead(Path path, getClient().getEncryptionContextProvider(), getRelativePath(path), fileEncryptionContext.getBytes(StandardCharsets.UTF_8)); } + resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE; + contentLength = extractContentLength(op); + eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG); + } + /* The only remaining case is: + * - restrictGpsOnOpenFile config is enabled with null/wrong FileStatus and encryptionType not as ENCRYPTION_CONTEXT + * In this case, we don't need to call GetPathStatus API. 
+ */ + else { + // do nothing } if (parseIsDirectory(resourceType)) { - throw new AbfsRestOperationException( - AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), - AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), - "openFileForRead must be used with files and not directories", - null); + throw openFileForReadDirectoryException(); } perfInfo.registerSuccess(true); @@ -1003,6 +1062,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize()) .withStreamStatistics(new AbfsInputStreamStatisticsImpl()) .withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways()) .withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize()) + .shouldRestrictGpsOnOpenFile(getAbfsConfiguration().shouldRestrictGpsOnOpenFile()) .withBufferedPreadDisabled(bufferedPreadDisabled) .withEncryptionAdapter(contextEncryptionAdapter) .withAbfsBackRef(fsBackRef) @@ -1855,7 +1915,7 @@ private AbfsClientContext populateAbfsClientContext() { .build(); } - public String getRelativePath(final Path path) { + public static String getRelativePath(final Path path) { Preconditions.checkNotNull(path, "path"); String relPath = path.toUri().getPath(); if (relPath.isEmpty()) { diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java index aa1d410298314..9c31936a72219 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/AbfsHttpConstants.java @@ -175,6 +175,7 @@ public final class AbfsHttpConstants { * @see "https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling" */ public static final int HTTP_TOO_MANY_REQUESTS = 429; + public static final int HTTP_INVALID_RANGE = 416; 
public static final char CHAR_FORWARD_SLASH = '/'; public static final char CHAR_EXCLAMATION_POINT = '!'; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java index e3b3c2a467575..d2dbe17650820 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/ConfigurationKeys.java @@ -627,5 +627,11 @@ public static String containerProperty(String property, String fsName, String ac */ public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count"; + /** + * If true, restricts GPS (getPathStatus) calls on openFileForRead. + * Default: false + */ + public static final String FS_AZURE_RESTRICT_GPS_ON_OPENFILE = "fs.azure.restrict.gps.on.openfile"; + private ConfigurationKeys() {} } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java index feafbe4f3a55f..328c878401c65 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/FileSystemConfigurations.java @@ -306,6 +306,7 @@ public final class FileSystemConfigurations { public static final int MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1; public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500; public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1; + public static final boolean DEFAULT_FS_AZURE_RESTRICT_GPS_ON_OPENFILE = false; private FileSystemConfigurations() {} } diff --git 
a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java index 9521518fa1f17..fd8c162fd99d6 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/constants/HttpHeaderConfigurations.java @@ -39,6 +39,7 @@ public final class HttpHeaderConfigurations { public static final String CONTENT_MD5 = "Content-MD5"; public static final String CONTENT_TYPE = "Content-Type"; public static final String RANGE = "Range"; + public static final String CONTENT_RANGE = "Content-Range"; public static final String TRANSFER_ENCODING = "Transfer-Encoding"; public static final String USER_AGENT = "User-Agent"; public static final String X_HTTP_METHOD_OVERRIDE = "X-HTTP-Method-Override"; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java index ce03d794ddb51..777460cde8446 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/contracts/services/AzureServiceErrorCode.java @@ -21,6 +21,8 @@ import java.net.HttpURLConnection; import java.util.ArrayList; import java.util.List; + +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -66,6 +68,8 @@ public enum AzureServiceErrorCode { INVALID_APPEND_OPERATION("InvalidAppendOperation", HttpURLConnection.HTTP_CONFLICT, null), 
UNAUTHORIZED_BLOB_OVERWRITE("UnauthorizedBlobOverwrite", HttpURLConnection.HTTP_FORBIDDEN, "This request is not authorized to perform blob overwrites."), + INVALID_RANGE("InvalidRange", AbfsHttpConstants.HTTP_INVALID_RANGE, + "The range specified is invalid for the current size of the resource."), UNKNOWN(null, -1, null); private final String errorCode; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java index 25b4529aa0863..2b2daa8a95199 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsAdaptiveInputStream.java @@ -68,7 +68,7 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws // If buffer is empty, then fill the buffer. if (getBCursor() == getLimit()) { // If EOF, then return -1 - if (getFCursor() >= getContentLength()) { + if (!(shouldRestrictGpsOnOpenFile() && isFirstRead()) && getFCursor() >= getContentLength()) { return -1; } @@ -83,7 +83,14 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws // Reset Read Type back to normal and set again based on code flow. getTracingContext().setReadType(ReadType.NORMAL_READ); - if (shouldAlwaysReadBufferSize()) { + + // If restrictGpsOnOpenFile config is enabled, skip prefetch for the first read since contentLength + // is not available yet. + if (shouldRestrictGpsOnOpenFile() && isFirstRead()) { + LOG.debug("RestrictGpsOnOpenFile is enabled. 
Skip readahead for first read."); + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), true); + } + else if (shouldAlwaysReadBufferSize()) { bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); } else { // Enable readAhead when reading sequentially diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java index 5ddb9770ac56e..9e7dbdd89e823 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsDfsClient.java @@ -1347,7 +1347,7 @@ public AbfsRestOperation checkAccess(String path, public boolean checkIsDir(AbfsHttpOperation result) { String resourceType = result.getResponseHeader( HttpHeaderConfigurations.X_MS_RESOURCE_TYPE); - return StringUtils.equalsIgnoreCase(resourceType, DIRECTORY); + return resourceType != null && StringUtils.equalsIgnoreCase(resourceType, DIRECTORY); } /** diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java index fe7f3b5cb1b21..70dd05c19b1e3 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsErrors.java @@ -59,6 +59,10 @@ public final class AbfsErrors { + "and cannot be appended to by the Azure Data Lake Storage Service API"; public static final String CONDITION_NOT_MET = "The condition specified using " + "HTTP conditional header(s) is not met."; + public static final String ERR_READ_ON_DIRECTORY = "Read operation not permitted on a directory."; + public static final String ERR_OPENFILE_ON_DIRECTORY = 
"openFileForRead must be used with files and not directories"; + + /** * Exception message on filesystem init if token-provider-auth-type configs are provided */ diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java index 3c009c71bcdab..eba5b09337775 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java @@ -26,18 +26,22 @@ import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.fs.CanUnbuffer; +import org.apache.hadoop.fs.FSExceptionMessages; +import org.apache.hadoop.fs.FSInputStream; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.StreamCapabilities; +import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; import org.apache.hadoop.fs.azurebfs.constants.ReadType; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; import org.apache.hadoop.fs.impl.BackReference; import org.apache.hadoop.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.hadoop.fs.CanUnbuffer; -import org.apache.hadoop.fs.FSExceptionMessages; -import org.apache.hadoop.fs.FSInputStream; import org.apache.hadoop.fs.FileSystem.Statistics; -import org.apache.hadoop.fs.StreamCapabilities; import org.apache.hadoop.fs.azurebfs.constants.FSOperationType; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException; @@ -51,9 +55,13 @@ import static java.lang.Math.max; import static java.lang.Math.min; +import static 
org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.extractEtagHeader; +import static org.apache.hadoop.fs.azurebfs.AzureBlobFileSystemStore.getRelativePath; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.ONE_KB; import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.STREAM_ID_LEN; import static org.apache.hadoop.fs.azurebfs.constants.InternalConstants.CAPABILITY_SAFE_READAHEAD; +import static org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode.INVALID_RANGE; +import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_READ_ON_DIRECTORY; import static org.apache.hadoop.io.Sizes.S_128K; import static org.apache.hadoop.io.Sizes.S_2M; import static org.apache.hadoop.util.StringUtils.toLowerCase; @@ -73,11 +81,11 @@ public abstract class AbfsInputStream extends FSInputStream implements CanUnbuff private final Statistics statistics; private final String path; - private final long contentLength; + private volatile long contentLength; private final int bufferSize; // default buffer size private final int footerReadSize; // default buffer size to read when reading footer private final int readAheadQueueDepth; // initialized in constructor - private final String eTag; // eTag of the path when InputStream are created + private String eTag; // eTag of the path when InputStream is created private final boolean tolerateOobAppends; // whether tolerate Oob Appends private final boolean readAheadEnabled; // whether enable readAhead; private final boolean readAheadV2Enabled; // whether enable readAhead V2; @@ -293,9 +301,6 @@ public synchronized int read(final byte[] b, final int off, final int len) throw // go back and read from buffer is fCursor - limit. // There maybe case that we read less than requested data. 
long filePosAtStartOfBuffer = fCursor - limit; - if (abfsReadFooterMetrics != null) { - abfsReadFooterMetrics.updateReadMetrics(filePathIdentifier, len, contentLength, nextReadPos); - } if (nextReadPos >= filePosAtStartOfBuffer && nextReadPos <= fCursor) { // Determining position in buffer from where data is to be read. bCursor = (int) (nextReadPos - filePosAtStartOfBuffer); @@ -312,6 +317,9 @@ public synchronized int read(final byte[] b, final int off, final int len) throw limit = 0; bCursor = 0; } + + long nextReadPosForMetric = nextReadPos; + if (shouldReadFully()) { lastReadBytes = readFileCompletely(b, currentOff, currentLen); } else if (shouldReadLastBlock()) { @@ -319,6 +327,11 @@ public synchronized int read(final byte[] b, final int off, final int len) throw } else { lastReadBytes = readOneBlock(b, currentOff, currentLen); } + + if (abfsReadFooterMetrics != null) { + abfsReadFooterMetrics.updateReadMetrics(filePathIdentifier, len, contentLength, nextReadPosForMetric); + } + if (lastReadBytes > 0) { currentOff += lastReadBytes; currentLen -= lastReadBytes; @@ -332,13 +345,13 @@ public synchronized int read(final byte[] b, final int off, final int len) throw } private boolean shouldReadFully() { - return this.firstRead && this.context.readSmallFilesCompletely() + return this.firstRead && !shouldRestrictGpsOnOpenFile() && this.context.readSmallFilesCompletely() && this.contentLength <= this.bufferSize; } private boolean shouldReadLastBlock() { long footerStart = max(0, this.contentLength - FOOTER_SIZE); - return this.firstRead && this.context.optimizeFooterRead() + return this.firstRead && !shouldRestrictGpsOnOpenFile() && this.context.optimizeFooterRead() && this.fCursor >= footerStart; } @@ -561,11 +574,62 @@ protected int readInternal(final long position, final byte[] b, final int offset } } + /** + * Creates an exception indicating that a read operation was attempted on a directory. 
+ * + * @return an {@link UnsupportedOperationException} indicating the operation is not permitted on a directory + */ + private UnsupportedOperationException directoryReadException() { + return new UnsupportedOperationException( + ERR_READ_ON_DIRECTORY, + new AbfsRestOperationException( + AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(), + AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(), + ERR_READ_ON_DIRECTORY, + null)); + } + + /** + * Checks if the current path is a directory (for both implicit and explicit) in FNS account. + * If the path is a directory, throws an exception indicating that read operations are not permitted. + * + * @throws IOException if the path is a directory or if there is an error accessing the path status + */ + private void checkIfDirPathInFNS() throws IOException { + AbfsHttpOperation gpsOp = client.getPathStatus( + getRelativePath(new Path(path)), + false, + tracingContext, + contextEncryptionAdapter).getResult(); + + if (client.checkIsDir(gpsOp)) { + throw directoryReadException(); + } + } + +/** + * Extracts the content length from the HTTP response headers. + * Uses the Content-Range header to determine the total file size, which is necessary + * for handling partial reads correctly. 
+ * + * @param op the ABFS HTTP operation containing the response headers + * @return the content length of the file + */ + private long extractContentLength(AbfsHttpOperation op) { + // We need to use content range header instead of content length to take care of partial reads + String contentRange = op.getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE); + contentLength = 0; + if (!StringUtils.isEmpty(contentRange)) { + contentLength = Long.parseLong(contentRange.split(AbfsHttpConstants.FORWARD_SLASH)[1]); + } + return contentLength; + } + int readRemote(long position, byte[] b, int offset, int length, TracingContext tracingContext) throws IOException { if (position < 0) { throw new IllegalArgumentException("attempting to read from negative offset"); } - if (position >= contentLength) { + if (!(shouldRestrictGpsOnOpenFile() && isFirstRead()) && position >= contentLength) { return -1; // Hadoop prefers -1 to EOFException } if (b == null) { @@ -591,6 +655,16 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t op = client.read(path, position, b, offset, length, tolerateOobAppends ? 
"*" : eTag, cachedSasToken.get(), contextEncryptionAdapter, tracingContext); + + // Update metadata on first read if restrictGpsOnOpenFile is enabled + if (shouldRestrictGpsOnOpenFile() && isFirstRead()) { + if (client.checkIsDir(op.getResult())) { + throw directoryReadException(); + } + contentLength = extractContentLength(op.getResult()); + eTag = extractEtagHeader(op.getResult()); + } + cachedSasToken.update(op.getSasToken()); LOG.debug("issuing HTTP GET request params position = {} b.length = {} " + "offset = {} length = {}", position, b.length, offset, length); @@ -599,12 +673,50 @@ int readRemote(long position, byte[] b, int offset, int length, TracingContext t } catch (AzureBlobFileSystemException ex) { if (ex instanceof AbfsRestOperationException) { AbfsRestOperationException ere = (AbfsRestOperationException) ex; - if (ere.getStatusCode() == HttpURLConnection.HTTP_NOT_FOUND) { - throw new FileNotFoundException(ere.getMessage()); + int status = ere.getStatusCode(); + if (ere.getErrorMessage().contains(ERR_READ_ON_DIRECTORY)) { + throw ere; + } + boolean isHnsEnabled = client.getIsNamespaceEnabled(); + + // Case-1: 404 NOT FOUND + if (status == HttpURLConnection.HTTP_NOT_FOUND) { + /* + * If HNS account or restrictGpsOnOpenFile disabled, + * we dont need any further checks + */ + if (isHnsEnabled || !shouldRestrictGpsOnOpenFile()) { + throw new FileNotFoundException(ere.getMessage()); + } + + try { + /* + * For FNS account with restrictGpsOnOpenFile enabled, + * need to rule out if the path is an implicit directory + */ + checkIfDirPathInFNS(); + } catch (AzureBlobFileSystemException gpsEx) { + AbfsRestOperationException gpsEre = (AbfsRestOperationException) gpsEx; + if (gpsEre.getErrorMessage().contains(ERR_READ_ON_DIRECTORY)) { + throw gpsEre; + } + // The file does not exist + else { + throw new FileNotFoundException(gpsEre.getMessage()); + } + } + } + + // Case-2: 416 INVALID RANGE + if (!isHnsEnabled && INVALID_RANGE.equals(ere.getErrorCode())) { + // 
Need to rule out if the path is an explicit directory + checkIfDirPathInFNS(); } } + // Default: propagate original error throw new IOException(ex); } + long bytesRead = op.getResult().getBytesReceived(); if (streamStatistics != null) { streamStatistics.remoteBytesRead(bytesRead); @@ -693,9 +805,12 @@ public synchronized int available() throws IOException { throw new IOException( FSExceptionMessages.STREAM_IS_CLOSED); } - final long remaining = this.contentLength - this.getPos(); - return remaining <= Integer.MAX_VALUE - ? (int) remaining : Integer.MAX_VALUE; + if (!(shouldRestrictGpsOnOpenFile() && isFirstRead())) { + final long remaining = this.contentLength - this.getPos(); + return remaining <= Integer.MAX_VALUE + ? (int) remaining : Integer.MAX_VALUE; + } + return Integer.MAX_VALUE; } /** @@ -1088,4 +1203,8 @@ public int maxReadSizeForVectorReads() { protected long getContentLength() { return contentLength; } + + public boolean shouldRestrictGpsOnOpenFile() { + return context.shouldRestrictGpsOnOpenFile(); + } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java index cb51fa22900e4..96531e4653360 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStreamContext.java @@ -59,6 +59,8 @@ public class AbfsInputStreamContext extends AbfsStreamContext { private boolean bufferedPreadDisabled; + private boolean restrictGpsOnOpenFile; + /** A BackReference to the FS instance that created this OutputStream. */ private BackReference fsBackRef; @@ -254,6 +256,18 @@ public AbfsInputStreamContext withEncryptionAdapter( return this; } +/** + * Sets restriction of GPS on open file. 
+ * + * @param restrictGpsOnOpenFile whether to restrict GPS on open file. + * @return this instance. + */ + public AbfsInputStreamContext shouldRestrictGpsOnOpenFile( + final boolean restrictGpsOnOpenFile) { + this.restrictGpsOnOpenFile = restrictGpsOnOpenFile; + return this; + } + /** * Finalizes and validates the context configuration. *

@@ -337,6 +351,11 @@ public int getReadAheadBlockSize() { return readAheadBlockSize; } + /** @return whether restrictGpsOnOpenFile is enabled. */ + public boolean shouldRestrictGpsOnOpenFile() { + return this.restrictGpsOnOpenFile; + } + /** @return whether buffered pread is disabled. */ public boolean isBufferedPreadDisabled() { return bufferedPreadDisabled; diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java index c0343ca724e05..0002f3bdf5089 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsPrefetchInputStream.java @@ -21,6 +21,7 @@ import java.io.IOException; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.azurebfs.constants.ReadType; import org.apache.hadoop.fs.azurebfs.utils.TracingContext; /** @@ -66,7 +67,7 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws // If buffer is empty, then fill the buffer. if (getBCursor() == getLimit()) { // If EOF, then return -1 - if (getFCursor() >= getContentLength()) { + if (!(shouldRestrictGpsOnOpenFile() && isFirstRead()) && getFCursor() >= getContentLength()) { return -1; } @@ -80,10 +81,22 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws } /* - * Always start with Prefetch even from first read. - * Even if out of order seek comes, prefetches will be triggered for next set of blocks. + Skips prefetch for the first read if restrictGpsOnOpenFile config is enabled. + This is required since contentLength is not available yet to determine prefetch block size. 
*/ - bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); + if (shouldRestrictGpsOnOpenFile() && isFirstRead()) { + getTracingContext().setReadType(ReadType.NORMAL_READ); + LOG.debug("RestrictGpsOnOpenFile is enabled. Skip readahead for first read even for sequential input policy."); + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), true); + } + else { + /* + * Always start with Prefetch even from first read UNLESS restrictGpsOnOpenFile config is enabled. + * Even if out of order seek comes, prefetches will be triggered for next set of blocks. + */ + bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false); + } + if (isFirstRead()) { setFirstRead(false); } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java index b484cc6c84353..d676cfd49bf75 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRandomInputStream.java @@ -68,7 +68,7 @@ protected int readOneBlock(final byte[] b, final int off, final int len) // If buffer is empty, then fill the buffer. if (getBCursor() == getLimit()) { // If EOF, then return -1 - if (getFCursor() >= getContentLength()) { + if (!(shouldRestrictGpsOnOpenFile() && isFirstRead()) && getFCursor() >= getContentLength()) { return -1; } diff --git a/hadoop-tools/hadoop-azure/src/site/markdown/index.md b/hadoop-tools/hadoop-azure/src/site/markdown/index.md index 5a90265c81220..8b8c8c51ff6e0 100644 --- a/hadoop-tools/hadoop-azure/src/site/markdown/index.md +++ b/hadoop-tools/hadoop-azure/src/site/markdown/index.md @@ -1202,6 +1202,33 @@ when there are too many writes from the same process. 
`fs.azure.analysis.period`: The time after which sleep duration is recomputed after analyzing metrics. The default value for the same is 10 seconds. +### Metadata Options + +The following configurations are related to metadata operations. + +`fs.azure.restrict.gps.on.openfile`: Controls whether the GetPathStatus (GPS) API call +is restricted when opening a file for read. When enabled, this configuration reduces +metadata overhead by skipping the GPS call during file open operations. The file +existence checks are also delayed until the first read operation occurs. + +**Default:** `false` (disabled) + +**Behavior when enabled:** +* The GetPathStatus call is skipped when opening a file, reducing metadata call overhead +* File existence validation is deferred until the first read operation +* Small file full read optimizations are not available +* Footer read optimizations are not available +* The first read operation will not be able to initiate prefetch + +**Exception:** If the file was created with an encryption context, the GetPathStatus call +will still be performed even when this configuration is enabled, as the encryption metadata +is required. + +**Recommended Alternative:** To reduce metadata calls while maintaining optimal read +performance, provide the `FileStatus` object when opening the file using the +`openFile()` builder pattern with the `.withFileStatus()` option. This approach avoids +the GPS call while preserving all read optimizations. + ### Security Options `fs.azure.always.use.https`: Enforces to use HTTPS instead of HTTP when the flag is made true. 
Irrespective of the flag, `AbfsClient` will use HTTPS if the secure diff --git a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java index 5cf0bd473fc24..40c3440ef25c5 100644 --- a/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java +++ b/hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/services/TestAbfsInputStream.java @@ -23,8 +23,10 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Arrays; +import java.util.Base64; import java.util.List; import java.util.Optional; import java.util.Random; @@ -32,6 +34,13 @@ import org.apache.hadoop.fs.azurebfs.AbfsConfiguration; import org.apache.hadoop.fs.azurebfs.AbfsCountersImpl; +import org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys; +import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations; +import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException; +import org.apache.hadoop.fs.azurebfs.contracts.services.AzureServiceErrorCode; +import org.apache.hadoop.fs.azurebfs.extensions.EncryptionContextProvider; +import org.apache.hadoop.fs.azurebfs.security.ABFSKey; +import org.apache.hadoop.fs.azurebfs.utils.EncryptionType; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; @@ -78,13 +87,16 @@ import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.anyList; +import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; import static 
org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -117,6 +129,8 @@ public class TestAbfsInputStream extends AbstractAbfsIntegrationTest { private static final int POSITION_INDEX = 9; private static final int OPERATION_INDEX = 6; private static final int READTYPE_INDEX = 11; + private static final int ENCRYPTION_KEY_SIZE = 32; + private static final int SMALL_BUFFER_SIZE = 100; @AfterEach @@ -135,6 +149,24 @@ AbfsRestOperation getMockRestOp() { return op; } + /** + * Creates a mock AbfsRestOperation with metadata headers for testing. + * The mock includes Content-Range and ETag headers in the response. + * + * @return a mocked AbfsRestOperation with response metadata + */ + AbfsRestOperation getMockRestOpWithMetadata() { + AbfsRestOperation op = mock(AbfsRestOperation.class); + AbfsHttpOperation httpOp = mock(AbfsHttpOperation.class); + when(httpOp.getBytesReceived()).thenReturn(1024L); + when(op.getResult()).thenReturn(httpOp); + when(op.getSasToken()).thenReturn(TestCachedSASToken.getTestCachedSASTokenInstance().get()); + when(op.getResult().getResponseHeader(HttpHeaderConfigurations.CONTENT_RANGE)).thenReturn("bytes 0-1023/1024"); + when(op.getResult().getResponseHeader(HttpHeaderConfigurations.ETAG)).thenReturn("etag"); + + return op; + } + AbfsClient getMockAbfsClient() throws URISyntaxException { // Mock failure for client.read() AbfsClient client = mock(AbfsClient.class); @@ -350,6 +382,663 @@ public void testOpenFileWithOptions() throws Exception { AbfsRestOperationType.GetPathStatus)); } +/** + * Mocks an {@link AbfsClient} to simulate encryption context behavior for testing. 
+ * Sets up the client to return ENCRYPTION_CONTEXT as the encryption type and all the necessary + * mock responses to simulate reading a file with encryption context. + * + * @param encryptedClient the {@link AbfsClient} to mock + * @throws IOException if mocking fails + */ +private void mockClientForEncryptionContext(AbfsClient encryptedClient) throws IOException { + doReturn(EncryptionType.ENCRYPTION_CONTEXT) + .when(encryptedClient) + .getEncryptionType(); + + AbfsHttpOperation mockOp = mock(AbfsHttpOperation.class); + when(mockOp.getResponseHeader(HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT)) + .thenReturn(Base64.getEncoder() + .encodeToString("ctx".getBytes(StandardCharsets.UTF_8))); + when(mockOp.getResponseHeader(HttpHeaderConfigurations.CONTENT_LENGTH)) + .thenReturn("10"); + + AbfsRestOperation mockResult = mock(AbfsRestOperation.class); + when(mockResult.getResult()).thenReturn(mockOp); + + doReturn(mockResult) + .when(encryptedClient) + .getPathStatus(anyString(), anyBoolean(), any(), any()); + + doReturn(false) + .when(encryptedClient) + .checkIsDir(any()); + + EncryptionContextProvider provider = + mock(EncryptionContextProvider.class); + when(provider.getEncryptionKey(anyString(), any())) + .thenReturn(new ABFSKey(new byte[ENCRYPTION_KEY_SIZE])); + + doReturn(provider) + .when(encryptedClient) + .getEncryptionContextProvider(); +} + + /** + * Returns an instance of {@link AzureBlobFileSystem} with the + * FS_AZURE_RESTRICT_GPS_ON_OPENFILE configuration enabled. + * This setting restricts the use of GetPathStatus on open file operations. 
+ * + * @return an AzureBlobFileSystem with restrictGpsOnOpenFile enabled + * @throws Exception if the file system cannot be created + */ + private AzureBlobFileSystem getFileSystemWithRestrictGpsEnabled() throws Exception { + Configuration conf = getRawConfiguration(); + conf.setBoolean( + ConfigurationKeys.FS_AZURE_RESTRICT_GPS_ON_OPENFILE, + true); + + return getFileSystem(conf); + } + + /** + * Tests opening encrypted and non-encrypted files under different clients. + * Verifies that even with restrictGpsOnOpenFile enabled, for files created with ENCRYPTION_CONTEXT, the client invokes + * getPathStatus, while for non-encrypted files, getPathStatus is not called. + * + * @throws Exception if any error occurs during the test + */ + @Test + public void testEncryptedAndNonEncryptedOpenUnderDifferentClients() + throws Exception { + AzureBlobFileSystem fs = getFileSystemWithRestrictGpsEnabled(); + + Path encryptedFile = new Path("/enc/file1"); + Path plainFile = new Path("/plain/file2"); + + fs.mkdirs(encryptedFile.getParent()); + fs.mkdirs(plainFile.getParent()); + + writeBufferToNewFile(encryptedFile, new byte[10]); + writeBufferToNewFile(plainFile, new byte[10]); + + TracingContext tracingContext = getTestTracingContext(fs, false); + + /* + * ========================= + * Client A — ENCRYPTED FILE + * ========================= + */ + AzureBlobFileSystemStore encryptedStore = getAbfsStore(fs); + + AbfsClient encryptedRealClient = + getAbfsClient(encryptedStore); + AbfsClient encryptedClient = + spy(encryptedRealClient); + + setAbfsClient(encryptedStore, encryptedClient); + mockClientForEncryptionContext(encryptedClient); + + encryptedStore.openFileForRead( + encryptedFile, Optional.empty(), null, tracingContext); + + // File created with ENCRYPTION_CONTEXT, so GPS should be invoked + verify(encryptedClient, times(1)) + .getPathStatus(anyString(), anyBoolean(), any(), isNull()); + + /* + * ============================= + * Client B — NON-ENCRYPTED FILE + * 
============================= + */ + AzureBlobFileSystemStore plainStore = getAbfsStore(fs); + + AbfsClient plainRealClient = + getAbfsClient(plainStore); + AbfsClient plainClient = + spy(plainRealClient); + + setAbfsClient(plainStore, plainClient); + + doReturn(EncryptionType.NONE) + .when(plainClient) + .getEncryptionType(); + + plainStore.openFileForRead( + plainFile, Optional.empty(), null, tracingContext); + + verify(plainClient, never()) + .getPathStatus(anyString(), anyBoolean(), any(), any()); + } + + /** + * Verifies the prefetch behavior of the input stream by performing two reads and checking + * the number of times the client's read method is invoked after each read. + * + * @param fs the AzureBlobFileSystem instance + * @param store the AzureBlobFileSystemStore instance + * @param config the AbfsConfiguration instance + * @param file the file path to read from + * @param restrictGps whether to restrict GPS on open file + * @param readsAfterFirst expected number of client.read invocations after the first read + * @param readsAfterSecond expected number of client.read invocations after the second read + * @throws Exception if any error occurs during verification + */ + private void verifyPrefetchBehavior( + AzureBlobFileSystem fs, + AzureBlobFileSystemStore store, + AbfsConfiguration config, + Path file, + boolean restrictGps, + int readsAfterFirst, + int readsAfterSecond) throws Exception { + + AbfsClient realClient = store.getClient(); + AbfsClient spyClient = Mockito.spy(realClient); + Mockito.doReturn(spyClient).when(store).getClient(); + + Mockito.doReturn(restrictGps) + .when(config).shouldRestrictGpsOnOpenFile(); + + try (FSDataInputStream in = fs.open(file)) { + AbfsInputStream abfsIn = + (AbfsInputStream) in.getWrappedStream(); + + // First read. 
Sleep for a sec to get the readAhead threads to complete + abfsIn.read(new byte[ONE_MB]); + Thread.sleep(1000); + + verify(spyClient, times(readsAfterFirst)) + .read(anyString(), anyLong(), any(byte[].class), + anyInt(), anyInt(), + anyString(), nullable(String.class), + any(ContextEncryptionAdapter.class), + any(TracingContext.class)); + + // Second read. Sleep for a sec to get the readAhead threads to complete + abfsIn.read(ONE_MB, new byte[ONE_MB], 0, ONE_MB); + Thread.sleep(1000); + + verify(spyClient, times(readsAfterSecond)) + .read(anyString(), anyLong(), any(byte[].class), + anyInt(), anyInt(), + anyString(), nullable(String.class), + any(ContextEncryptionAdapter.class), + any(TracingContext.class)); + } + } + + /** + * Tests the prefetch behavior of the input stream when restrictGPSOnOpenFile is enabled. + * First read: only direct read is triggered. + * Second read: triggers readahead reads. + * Verifies the expected number of read invocations after each read operation + * + * @throws Exception if any error occurs during the test + */ + @Test + public void testPrefetchBehaviourWithRestrictGPSOnOpenFile() throws Exception { + AzureBlobFileSystem fs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + AbfsConfiguration config = Mockito.spy(store.getAbfsConfiguration()); + + Mockito.doReturn(ONE_MB).when(config).getReadBufferSize(); + Mockito.doReturn(ONE_MB).when(config).getReadAheadBlockSize(); + Mockito.doReturn(3).when(config).getReadAheadQueueDepth(); + Mockito.doReturn(true).when(config).isReadAheadEnabled(); + + Mockito.doReturn(store).when(fs).getAbfsStore(); + Mockito.doReturn(config).when(store).getAbfsConfiguration(); + + Path file = createTestFile(fs, 4 * ONE_MB); + + // restrictGPSOnOpenFile set as true + verifyPrefetchBehavior( + fs, store, config, file, + true, + 1, // only direct read + 4 // second read triggers readaheads + ); + } + + + /** + * Asserts that the correct read types are present in 
the tracing context headers + * when restrictGpsOnOpenFile is enabled. + * + * @param fs the AzureBlobFileSystem instance + * @param numOfReadCalls the number of read calls to check for the specified read type + * @param totalReadCalls the total number of read calls expected + * @param readType the expected ReadType for the calls (e.g., PREFETCH_READ, MISSEDCACHE_READ) + * @throws Exception if verification fails + */ + private void assertReadTypeWithRestrictGpsOnOpenFileEnabled(AzureBlobFileSystem fs, int numOfReadCalls, + int totalReadCalls, ReadType readType) throws Exception { + ArgumentCaptor captor1 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor2 = ArgumentCaptor.forClass(Long.class); + ArgumentCaptor captor3 = ArgumentCaptor.forClass(byte[].class); + ArgumentCaptor captor4 = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor captor5 = ArgumentCaptor.forClass(Integer.class); + ArgumentCaptor captor6 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor7 = ArgumentCaptor.forClass(String.class); + ArgumentCaptor captor8 = ArgumentCaptor.forClass(ContextEncryptionAdapter.class); + ArgumentCaptor captor9 = ArgumentCaptor.forClass(TracingContext.class); + + verify(fs.getAbfsStore().getClient(), times(totalReadCalls)).read( + captor1.capture(), captor2.capture(), captor3.capture(), + captor4.capture(), captor5.capture(), captor6.capture(), + captor7.capture(), captor8.capture(), captor9.capture()); + List tracingContextList = captor9.getAllValues(); + + // Apart from random read policy, all policies will result in first read being normal read. + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(0), NORMAL_READ, 0); + + if (readType == PREFETCH_READ) { + /* + * The first read was a normal read. Prefetching would have started from second read. + * Second read also could be a Normal or Missed Cache read, so we will assert for reads apart from these two. 
+ * Since calls are asynchronous, we can not guarantee the order of calls. + * Therefore, we cannot assert on exact position here. + */ + for (int i = tracingContextList.size() - (numOfReadCalls - 2); i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, -1); + } + } else if (readType == MISSEDCACHE_READ) { + int expectedReadPos = ONE_MB; + for (int i = tracingContextList.size() - numOfReadCalls + 1; i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, expectedReadPos); + expectedReadPos += ONE_MB; + } + } else { + int expectedReadPos = ONE_MB; + for (int i = tracingContextList.size() - numOfReadCalls + 1; i < tracingContextList.size(); i++) { + verifyHeaderForReadTypeInTracingContextHeader(tracingContextList.get(i), readType, expectedReadPos); + expectedReadPos += ONE_MB; + } + } + } + + /** + * Helper method to test the read type behavior with FS_AZURE_RESTRICT_GPS_ON_OPENFILE enabled. + *

+ * Creates a test file of the specified size, performs a read operation, and asserts that the number + * of bytes read matches the file size. Then verifies that the expected read types are present in the + * tracing context headers for the specified number of read calls. + * + * @param fs the AzureBlobFileSystem instance + * @param fileSize the size of the test file to create and read + * @param readType the expected ReadType for the calls (e.g., PREFETCH_READ, MISSEDCACHE_READ) + * @param numOfReadCalls the number of read calls to check for the specified read type + * @param totalReadCalls the total number of read calls expected + * @throws Exception if verification fails + */ + private void testReadTypeWithRestrictGpsOnOpenFile(AzureBlobFileSystem fs, + int fileSize, ReadType readType, int numOfReadCalls, int totalReadCalls) throws Exception { + Path testPath = createTestFile(fs, fileSize); + try (FSDataInputStream iStream = fs.open(testPath)) { + int bytesRead = iStream.read(new byte[fileSize], 0, + fileSize); + Thread.sleep(1000); //Sleep for a sec to get the readAhead threads to complete + assertThat(fileSize) + .describedAs("Read size should match file size") + .isEqualTo(bytesRead); + } + assertReadTypeWithRestrictGpsOnOpenFileEnabled(fs, numOfReadCalls, totalReadCalls, readType); + } + + /** + * Test to verify the read type behavior when FS_AZURE_RESTRICT_GPS_ON_OPENFILE is enabled. + *

+ * This test covers multiple scenarios to ensure that the correct read type is set in the tracing context + * and that the expected number of client read calls are made for different configurations: + *

    + *
+ * <ul>
+ *   <li>Prefetch Read Type with adaptive policy (read ahead enabled, depth 2)</li>
+ *   <li>Missed Cache Read Type with adaptive policy (read ahead enabled, depth 0)</li>
+ *   <li>Footer Read Type (file size less than footer read size, small file optimization disabled)</li>
+ *   <li>Small File Read Type (file size less than block size, small file optimization enabled)</li>
+ *   <li>Missed Cache Read Type with sequential policy (read ahead enabled, depth 0, sequential policy)</li>
+ *   <li>Prefetch Read Type with sequential policy (read ahead enabled, depth 3, sequential policy)</li>
+ * </ul>
+ * + * @throws Exception if any error occurs during the test + */ + @Test + public void testReadTypeWithRestrictGpsInOpenFileEnabled() throws Exception { + AzureBlobFileSystem spiedFs = Mockito.spy(getFileSystem()); + AzureBlobFileSystemStore spiedStore = Mockito.spy(spiedFs.getAbfsStore()); + AbfsConfiguration spiedConfig = Mockito.spy(spiedStore.getAbfsConfiguration()); + AbfsClient spiedClient = Mockito.spy(spiedStore.getClient()); + Mockito.doReturn(ONE_MB).when(spiedConfig).getReadBufferSize(); + Mockito.doReturn(ONE_MB).when(spiedConfig).getReadAheadBlockSize(); + Mockito.doReturn(true).when(spiedConfig).shouldRestrictGpsOnOpenFile(); + Mockito.doReturn(false).when(spiedConfig).isReadAheadV2Enabled(); + Mockito.doReturn(spiedClient).when(spiedStore).getClient(); + Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore(); + Mockito.doReturn(spiedConfig).when(spiedStore).getAbfsConfiguration(); + int totalReadCalls = 0; + int fileSize; + + /* + * Test to verify Prefetch Read Type with adaptive policy. + * Setting read ahead depth to 2 with prefetch enabled ensures that prefetch is done. + * + * Should give normal read for first read. + */ + fileSize = 4 * ONE_MB; // To make sure multiple blocks are read. + totalReadCalls += 4; + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + Mockito.doReturn(2).when(spiedConfig).getReadAheadQueueDepth(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, PREFETCH_READ, 4, totalReadCalls); + + /* + * Test to verify Missed Cache Read Type with adaptive policy. + * Setting read ahead depth to 0 ensure that nothing can be got from prefetch. + * In such a case Input Stream will do a sequential read with missed cache read type. + * + * Should give normal read for first read. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read. 
+ totalReadCalls += 3; + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, MISSEDCACHE_READ, 3, totalReadCalls); + + /* + * Test to verify Footer Read Type. + * Having file size less than footer read size and disabling small file opt + */ + fileSize = 8 * ONE_KB; + totalReadCalls += 1; // Full file will be read along with footer. + doReturn(false).when(spiedConfig).readSmallFilesCompletely(); + doReturn(true).when(spiedConfig).optimizeFooterRead(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, NORMAL_READ, 1, totalReadCalls); + + /* + * Test to verify Small File Read Type. + * Having file size less than block size and disabling footer read opt + */ + totalReadCalls += 1; // Full file will be read along with footer. + doReturn(true).when(spiedConfig).readSmallFilesCompletely(); + doReturn(false).when(spiedConfig).optimizeFooterRead(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, NORMAL_READ, 1, totalReadCalls); + + /* + * Test to verify Missed Cache Read Type with sequential policy. + * Setting read ahead depth to 0 ensure that nothing can be got from prefetch. + * In such a case Input Stream will do a sequential read with missed cache read type. + * + * Should give normal read for first read. + */ + fileSize = 3 * ONE_MB; // To make sure multiple blocks are read. + totalReadCalls += 3; + Mockito.doReturn(0).when(spiedConfig).getReadAheadQueueDepth(); + Mockito.doReturn(FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL).when(spiedConfig).getAbfsReadPolicy(); + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, MISSEDCACHE_READ, 3, totalReadCalls); + + /* + * Test to verify Prefetch Read Type with sequential policy. + * Setting read ahead depth to 3 with prefetch enabled ensures that prefetch is done. + * + * Should give normal read for first read. 
+ */ + totalReadCalls += 3; + Mockito.doReturn(3).when(spiedConfig).getReadAheadQueueDepth(); + doReturn(true).when(spiedConfig).isReadAheadEnabled(); + testReadTypeWithRestrictGpsOnOpenFile(spiedFs, fileSize, PREFETCH_READ, 3, totalReadCalls); + } + + /** + * Tests that for FNS accounts reading from a directory with FS_AZURE_RESTRICT_GPS_ON_OPENFILE enabled + * throws the expected exception with PATH_NOT_FOUND status code and respective error message. + * Verifies that getPathStatus is called for both explicit and implicit folders' confirmation. + * + * @throws Exception if any error occurs during the test + */ + @Test + public void testFNSExceptionOnDirReadWithRestrictGpsConfig() throws Exception { + assumeHnsDisabled(); + AzureBlobFileSystem fs = getFileSystemWithRestrictGpsEnabled(); + AzureBlobFileSystemStore store = getAbfsStore(fs); + + AbfsClient realClient = getAbfsClient(store); + AbfsClient spyClient = spy(realClient); + setAbfsClient(store, spyClient); + + String explicitTestFolder = "/testExplFolderRestrictGps"; + fs.mkdirs(new Path(explicitTestFolder)); + + try (FSDataInputStream in = fs.open(new Path(explicitTestFolder))) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + byte[] buf = new byte[SMALL_BUFFER_SIZE]; + UnsupportedOperationException ex = Assertions.assertThrows(UnsupportedOperationException.class, () -> abfsIn.read(buf)); + AbfsRestOperationException cause = (AbfsRestOperationException) ex.getCause(); + assertThat(cause.getStatusCode()).isEqualTo(AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode()); + assertThat(ex.getMessage()).contains("Read operation not permitted on a directory."); + } + verify(spyClient, times(1)) + .getPathStatus( + anyString(), + anyBoolean(), + any(TracingContext.class), + any()); + + String implicitTestFolder = "/testImplFolderRestrictGps"; + createAzCopyFolder(new Path(implicitTestFolder)); + try (FSDataInputStream in2 = fs.open(new Path(implicitTestFolder))) { + AbfsInputStream abfsIn2 = 
(AbfsInputStream) in2.getWrappedStream(); + byte[] buf2 = new byte[SMALL_BUFFER_SIZE]; + UnsupportedOperationException ex2 = Assertions.assertThrows(UnsupportedOperationException.class, () -> abfsIn2.read(buf2)); + AbfsRestOperationException cause = (AbfsRestOperationException) ex2.getCause(); + assertThat(cause.getStatusCode()).isEqualTo(AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode()); + assertThat(ex2.getMessage()).contains("Read operation not permitted on a directory."); + } + verify(spyClient, times(2)) + .getPathStatus( + anyString(), + anyBoolean(), + any(TracingContext.class), + any()); + } + + /** + * Tests that for HNS accounts reading from a directory when + * FS_AZURE_RESTRICT_GPS_ON_OPENFILE is set to true, throws an AbfsRestOperationException + * with PATH_NOT_FOUND status code and the correct error message. + * Also verifies that getPathStatus is not called. + * + * @throws Exception if any error occurs during the test + */ + @Test + public void testHNSExceptionOnDirReadWithRestrictGpsConfig() throws Exception { + assumeHnsEnabled(); + AzureBlobFileSystem fs = getFileSystemWithRestrictGpsEnabled(); + + AzureBlobFileSystemStore store = getAbfsStore(fs); + + AbfsClient realClient = getAbfsClient(store); + AbfsClient spyClient = spy(realClient); + setAbfsClient(store, spyClient); + + String testFolder = "/testFolderRestrictGps"; + fs.mkdirs(new Path(testFolder)); + + try (FSDataInputStream in = fs.open(new Path(testFolder))) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + byte[] buf = new byte[SMALL_BUFFER_SIZE]; + UnsupportedOperationException ex = Assertions.assertThrows(UnsupportedOperationException.class, () -> abfsIn.read(buf)); + AbfsRestOperationException cause = (AbfsRestOperationException) ex.getCause(); + assertThat(cause.getStatusCode()).isEqualTo(AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode()); + assertThat(ex.getMessage()).contains("Read operation not permitted on a directory."); + } + + verify(spyClient, 
times(0)) + .getPathStatus( + anyString(), + anyBoolean(), + any(TracingContext.class), + any()); + } + + /** + * Tests the behavior of openFileForRead with the FS_AZURE_RESTRICT_GPS_ON_OPENFILE configuration enabled. + * Verifies that irrespective of whether FileStatus is provided (correct or incorrect status type) or not, + * getPathStatus is not invoked for read flow. + * + * @throws Exception if any error occurs during the test + */ + @Test + public void testOpenFileWithOptionsWithRestrictGpsOnOpenFile() throws Exception { + AzureBlobFileSystem fs = getFileSystemWithRestrictGpsEnabled(); + + Path fileWithFileStatus = new Path("/testFile0"); + Path fileWithoutFileStatus = new Path("/testFile1"); + + // Create and write to both files, but we'll be sending FileStatus only for one of them + writeBufferToNewFile(fileWithFileStatus, new byte[5]); + writeBufferToNewFile(fileWithoutFileStatus, new byte[5]); + + AzureBlobFileSystemStore abfsStore = getAbfsStore(fs); + AbfsClient mockClient = spy(getAbfsClient(abfsStore)); + setAbfsClient(abfsStore, mockClient); + TracingContext tracingContext = getTestTracingContext(fs, false); + + // Case 1: FileStatus is not provided + abfsStore.openFileForRead(fileWithoutFileStatus, Optional.empty(), null, tracingContext); + verify(mockClient, times(0) + .description("FileStatus not provided, restrict GPS: getPathStatus should NOT be invoked")) + .getPathStatus(any(String.class), any(Boolean.class), any(TracingContext.class), + nullable(ContextEncryptionAdapter.class)); + + // NOTE: One call for GPS will come from getFileStatus for both cases below. + // If GPS were happening at openFileForRead, we would've seen more than 2 calls. 
+ + // Case 2: FileStatus is provided (of wrong status type AbfsLocatedFileStatus) + abfsStore.openFileForRead(fileWithFileStatus, + Optional.ofNullable(new OpenFileParameters().withStatus( + new AbfsLocatedFileStatus(fs.getFileStatus(fileWithFileStatus), null))), + null, tracingContext); + verify(mockClient, times(1) + .description("Wrong FileStatus type provided, restrict GPS: getPathStatus still should NOT be invoked")) + .getPathStatus(any(String.class), any(Boolean.class), any(TracingContext.class), + nullable(ContextEncryptionAdapter.class)); + + // Case 3: FileStatus is provided (correct status type VersionedFileStatus) + abfsStore.openFileForRead(fileWithFileStatus, + Optional.ofNullable(new OpenFileParameters() + .withStatus(fs.getFileStatus(fileWithFileStatus))), + null, tracingContext); + verify(mockClient, times(2).description( + "Correct type FileStatus provided, restrict GPS: " + + "getPathStatus should NOT be invoked")) + .getPathStatus(any(String.class), any(Boolean.class), + any(TracingContext.class), nullable(ContextEncryptionAdapter.class)); + } + + /** + * Tests that when FS_AZURE_RESTRICT_GPS_ON_OPENFILE is enabled, + * the eTag and content length metadata are correctly initialized + * after the first read operation, and remain consistent for subsequent reads. 
+ * + * @throws Exception if any error occurs during the test + */ + @Test + public void testMetadataFromReadForRestrictGpsOnOpenFile() throws Exception { + AzureBlobFileSystem fs = getFileSystemWithRestrictGpsEnabled(); + + Path testFile = new Path("/testFile0"); + writeBufferToNewFile(testFile, new byte[6 * ONE_MB]); + + // Open the file and perform a read + try (FSDataInputStream in = fs.open(testFile)) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + + // Before first read, eTag and content length aren't initialized + String etagPreRead = abfsIn.getETag(); + long fileLengthPreRead = abfsIn.getContentLength(); + assertThat(etagPreRead).isEmpty(); + assertThat(fileLengthPreRead).isEqualTo(0); + + // Trigger the first read + byte[] buf = new byte[6 * ONE_MB]; + int n = abfsIn.read(0, buf, 0, 2 * ONE_MB); + assertThat(n).isEqualTo(2 * ONE_MB); + + // After first read, eTag and content length should be set from the response + String etagFirstRead = abfsIn.getETag(); + long fileLengthFirstRead = abfsIn.getContentLength(); + assertThat(etagFirstRead).isNotNull(); + assertThat(fileLengthFirstRead).isEqualTo(6 * ONE_MB); + + //Trigger the second read + n = abfsIn.read(2 * ONE_MB, buf, 2 * ONE_MB, 4 * ONE_MB); + assertThat(n).isEqualTo(4 * ONE_MB); + + // eTag and content length should remain same as first read for second read onwards + String etagSecondRead = abfsIn.getETag(); + long fileLengthSecondRead = abfsIn.getContentLength(); + assertThat(etagSecondRead).isEqualTo(etagFirstRead); + assertThat(fileLengthSecondRead).isEqualTo(fileLengthFirstRead); + } + } + + /** + * Tests that when FS_AZURE_RESTRICT_GPS_ON_OPENFILE is enabled, + * metadata (eTag and content length) is not partially initialized if read requests fail with a timeout. + * Ensures that after failed reads, metadata remains unset, and only after a successful read + * are the metadata fields correctly initialized. 
+ * + * @throws Exception if any error occurs during the test + */ + @Test + public void testMetadataNotPartiallyInitializedOnReadWithRestrictGpsOnOpenFile() + throws Exception { + AzureBlobFileSystem fs = spy(getFileSystemWithRestrictGpsEnabled()); + + AzureBlobFileSystemStore store = Mockito.spy(fs.getAbfsStore()); + Mockito.doReturn(store).when(fs).getAbfsStore(); + AbfsClient client = Mockito.spy(store.getClient()); + Mockito.doReturn(client).when(store).getClient(); + + AbfsRestOperation successOp = getMockRestOpWithMetadata(); + + doThrow(new TimeoutException("First-read-failure")) + .doThrow(new TimeoutException("Second-read-failure")) + .doReturn(successOp) + .when(client) + .read(any(String.class), any(Long.class), any(byte[].class), + any(Integer.class), any(Integer.class), any(String.class), + nullable(String.class), any(ContextEncryptionAdapter.class), any(TracingContext.class)); + + Path testFile = new Path("/testFile0"); + byte[] fileContent = new byte[ONE_KB]; + writeBufferToNewFile(testFile, fileContent); + + try (FSDataInputStream in = fs.open(testFile)) { + AbfsInputStream abfsIn = (AbfsInputStream) in.getWrappedStream(); + + // Metadata not initialized before read + assertThat(abfsIn.getETag()).isEmpty(); + assertThat(abfsIn.getContentLength()).isEqualTo(0); + + // First read fails- metadata should not be initialized + intercept(IOException.class, + () -> abfsIn.read(fileContent)); + assertThat(abfsIn.getETag()).isEmpty(); + assertThat(abfsIn.getContentLength()).isEqualTo(0); + + // Second read fails- metadata should not be initialized + intercept(IOException.class, + () -> abfsIn.read(fileContent)); + assertThat(abfsIn.getETag()).isEmpty(); + assertThat(abfsIn.getContentLength()).isEqualTo(0); + + // Third read succeeds- metadata should be initialized + abfsIn.read(fileContent); + assertThat(abfsIn.getETag()).isEqualTo("etag"); + assertThat(abfsIn.getContentLength()).isEqualTo(1024L); + } + } + /** * This test expects AbfsInputStream to throw 
the exception that readAhead * thread received on read. The readAhead thread must be initiated from the @@ -752,13 +1441,13 @@ public void testReadAheadManagerForSuccessfulReadAhead() throws Exception { // getBlock for a new read should return the buffer read-ahead int bytesRead = getBufferManager().getBlock( - inputStream, - ONE_KB, - ONE_KB, - new byte[ONE_KB]); + inputStream, + ONE_KB, + ONE_KB, + new byte[ONE_KB]); Assertions.assertTrue(bytesRead > 0, "bytesRead should be non-zero from the " - + "buffer that was read-ahead"); + + "buffer that was read-ahead"); // Once created, mock will remember all interactions. // As the above read should not have triggered any server calls, total @@ -799,6 +1488,7 @@ public void testDiffReadRequestSizeAndRAHBlockSize() throws Exception { true, SIXTEEN_KB); testReadAheads(inputStream, FORTY_EIGHT_KB, SIXTEEN_KB); + resetReadBufferManager(FOUR_MB, REDUCED_READ_BUFFER_AGE_THRESHOLD); //reset for next set of tests } @Test