Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,10 @@ public class AbfsConfiguration{
DefaultValue = DEFAULT_AZURE_READ_POLICY)
private String abfsReadPolicy;

@BooleanConfigurationValidatorAnnotation(ConfigurationKey = FS_AZURE_RESTRICT_GPS_ON_OPENFILE,
DefaultValue = DEFAULT_FS_AZURE_RESTRICT_GPS_ON_OPENFILE)
private boolean restrictGpsOnOpenFile;

private String clientProvidedEncryptionKey;
private String clientProvidedEncryptionKeySHA;

Expand Down Expand Up @@ -1389,6 +1393,14 @@ public String getAbfsReadPolicy() {
return abfsReadPolicy;
}

/**
 * Reports whether GetPathStatus (GPS) calls are restricted when a file is
 * opened for read. The value is bound to the
 * {@code FS_AZURE_RESTRICT_GPS_ON_OPENFILE} configuration key.
 *
 * @return {@code true} if GPS calls are restricted on open file,
 * {@code false} otherwise.
 */
public boolean shouldRestrictGpsOnOpenFile() {
  return this.restrictGpsOnOpenFile;
}

/**
* Enum config to allow user to pick format of x-ms-client-request-id header
* @return tracingContextFormat config if valid, else default ALL_ID_FORMAT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,21 @@ public FSDataInputStream open(final Path path, final int bufferSize) throws IOEx
return open(path, Optional.empty());
}

/**
* Open a file for reading and return an {@link FSDataInputStream} that wraps
* the underlying {@link InputStream}.
*
* Note: when the filesystem is configured with {@code restrictGpsOnOpenFile} enabled
* (it is disabled by default), the existence check for the file path is deferred
* and does not occur during this open call; it happens when the first read
* is attempted on the returned stream.
*
* @param path the location of the file to open
* @param parameters optional {@link OpenFileParameters} which can include
* FileStatus, configuration, buffer size and mandatory keys
* @return an {@link FSDataInputStream} wrapping the opened InputStream
* @throws IOException if an I/O error occurs while opening the file
*/
private FSDataInputStream open(final Path path,
final Optional<OpenFileParameters> parameters) throws IOException {
statIncrement(CALL_OPEN);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_STAR;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.CHAR_UNDERSCORE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.FILE;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.ROOT_PATH;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.SINGLE_WHITE_SPACE;
Expand All @@ -162,6 +163,7 @@
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemConfigurations.INFINITE_LEASE_DURATION;
import static org.apache.hadoop.fs.azurebfs.constants.FileSystemUriSchemes.ABFS_BLOB_DOMAIN_NAME;
import static org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT;
import static org.apache.hadoop.fs.azurebfs.services.AbfsErrors.ERR_OPENFILE_ON_DIRECTORY;
import static org.apache.hadoop.fs.azurebfs.utils.UriUtils.isKeyForDirectorySet;

/**
Expand Down Expand Up @@ -553,7 +555,7 @@ public Hashtable<String, String> getPathStatus(final Path path,

/**
* Creates an object of {@link ContextEncryptionAdapter}
* from a file path. It calls {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient
* from a file path. It calls {@link org.apache.hadoop.fs.azurebfs.services.AbfsClient
* #getPathStatus(String, boolean, TracingContext, EncryptionAdapter)} method to get
* contextValue (x-ms-encryption-context) from the server. The contextValue is passed
* to the constructor of EncryptionAdapter to create the required object of
Expand Down Expand Up @@ -866,6 +868,53 @@ public AbfsInputStream openFileForRead(final Path path,
tracingContext);
}

/**
 * Builds the exception raised when openFileForRead is invoked on a directory.
 *
 * @return an {@link AbfsRestOperationException} carrying the status code and
 * error code of {@link AzureServiceErrorCode#PATH_NOT_FOUND} together with the
 * {@code ERR_OPENFILE_ON_DIRECTORY} message; the last constructor argument is
 * passed as {@code null}.
 */
private AbfsRestOperationException openFileForReadDirectoryException() {
  // Both the status code and the error code come from the same enum constant.
  final AzureServiceErrorCode pathNotFound = AzureServiceErrorCode.PATH_NOT_FOUND;
  return new AbfsRestOperationException(
      pathNotFound.getStatusCode(),
      pathNotFound.getErrorCode(),
      ERR_OPENFILE_ON_DIRECTORY,
      null);
}

/**
* Opens a file for read and returns an {@link AbfsInputStream}.
*
* <p>
* The method decides whether to call the server's GetPathStatus based on:
* <ul>
* <li>the supplied {@code parameters} (if it contains a {@link VersionedFileStatus}
* with a valid encryption context when required),</li>
* <li>the client's encryption type ({@link EncryptionType#ENCRYPTION_CONTEXT}), and</li>
* <li>the configuration flag returned by {@link AbfsConfiguration#shouldRestrictGpsOnOpenFile()}.</li>
* </ul>
* If the encryption type is {@code ENCRYPTION_CONTEXT} the server-supplied
* X-MS-ENCRYPTION-CONTEXT header will be required and used to construct a
* {@link ContextProviderEncryptionAdapter}. If that header is missing a
* {@link PathIOException} is thrown.
* </p>
*
* <p>
* Note: when {@link AbfsConfiguration#shouldRestrictGpsOnOpenFile()} is enabled and the
* client's encryption type is not {@code ENCRYPTION_CONTEXT}, the implementation does not
* make the GetPathStatus call. In that case, if the file does not actually exist or the
* path is a directory, {@code openFileForRead} will not fail immediately. The problem is
* only detected when the returned stream performs its first read, at which point an
* appropriate error is raised.
* </p>
*
* @param path the path to open (may be unqualified)
* @param parameters optional {@link OpenFileParameters} that may include a {@link FileStatus}
* (possibly a {@link VersionedFileStatus}) and other open parameters
* @param statistics filesystem statistics to associate with the returned stream
* @param tracingContext tracing context for remote calls
* @return an {@link AbfsInputStream} for reading the file
* @throws IOException on IO or server errors. A {@link PathIOException} is thrown when
* an expected encryption context header is missing.
*/
public AbfsInputStream openFileForRead(Path path,
final Optional<OpenFileParameters> parameters,
final FileSystem.Statistics statistics, TracingContext tracingContext)
Expand All @@ -878,13 +927,13 @@ public AbfsInputStream openFileForRead(Path path,
FileStatus fileStatus = parameters.map(OpenFileParameters::getStatus)
.orElse(null);
String relativePath = getRelativePath(path);
String resourceType, eTag;
long contentLength;
String resourceType = EMPTY_STRING, eTag = EMPTY_STRING;
long contentLength = 0;
ContextEncryptionAdapter contextEncryptionAdapter = NoContextEncryptionAdapter.getInstance();
/*
* GetPathStatus API has to be called in case of:
* 1. fileStatus is null or not an object of VersionedFileStatus: as eTag
* would not be there in the fileStatus object.
* 1. restrictGpsOnOpenFile config is disabled AND fileStatus is null or not
* an object of VersionedFileStatus: as eTag would not be there in the fileStatus object.
* 2. fileStatus is an object of VersionedFileStatus and the object doesn't
* have encryptionContext field when client's encryptionType is
* ENCRYPTION_CONTEXT.
Expand All @@ -908,19 +957,23 @@ public AbfsInputStream openFileForRead(Path path,
getClient().getEncryptionContextProvider(), getRelativePath(path),
encryptionContext.getBytes(StandardCharsets.UTF_8));
}
} else {
}
/*
* If the client's encryption type is ENCRYPTION_CONTEXT, the GetPathStatus API has to be
* called to get the encryptionContext from the response header, irrespective of whether
* the restrictGpsOnOpenFile config is enabled or not.
*/
else if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT
|| !getAbfsConfiguration().shouldRestrictGpsOnOpenFile()) {

AbfsHttpOperation op = getClient().getPathStatus(relativePath, false,
tracingContext, null).getResult();
resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
contentLength = extractContentLength(op);
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
tracingContext, null).getResult();
/*
* For file created with ENCRYPTION_CONTEXT, client shall receive
* encryptionContext from header field: X_MS_ENCRYPTION_CONTEXT.
*/
if (getClient().getEncryptionType() == EncryptionType.ENCRYPTION_CONTEXT) {
final String fileEncryptionContext = op.getResponseHeader(
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
HttpHeaderConfigurations.X_MS_ENCRYPTION_CONTEXT);
if (fileEncryptionContext == null) {
LOG.debug("EncryptionContext missing in GetPathStatus response");
throw new PathIOException(path.toString(),
Expand All @@ -930,14 +983,20 @@ public AbfsInputStream openFileForRead(Path path,
getClient().getEncryptionContextProvider(), getRelativePath(path),
fileEncryptionContext.getBytes(StandardCharsets.UTF_8));
}
resourceType = getClient().checkIsDir(op) ? DIRECTORY : FILE;
contentLength = extractContentLength(op);
eTag = op.getResponseHeader(HttpHeaderConfigurations.ETAG);
}
/* The only remaining case is:
* - restrictGpsOnOpenFile config is enabled, the FileStatus is null or not a
*   VersionedFileStatus, and the encryptionType is not ENCRYPTION_CONTEXT.
* In this case, we don't need to call the GetPathStatus API.
*/
else {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Won't this lead to opening the stream without any checks? Do we fail later in this case?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, with this config enabled the checks happen on the first read instead. If that read fails, we throw the appropriate exception.

// do nothing
}

if (parseIsDirectory(resourceType)) {
throw new AbfsRestOperationException(
AzureServiceErrorCode.PATH_NOT_FOUND.getStatusCode(),
AzureServiceErrorCode.PATH_NOT_FOUND.getErrorCode(),
"openFileForRead must be used with files and not directories",
null);
throw openFileForReadDirectoryException();
}

perfInfo.registerSuccess(true);
Expand Down Expand Up @@ -1003,6 +1062,7 @@ AZURE_FOOTER_READ_BUFFER_SIZE, getAbfsConfiguration().getFooterReadBufferSize())
.withStreamStatistics(new AbfsInputStreamStatisticsImpl())
.withShouldReadBufferSizeAlways(getAbfsConfiguration().shouldReadBufferSizeAlways())
.withReadAheadBlockSize(getAbfsConfiguration().getReadAheadBlockSize())
.shouldRestrictGpsOnOpenFile(getAbfsConfiguration().shouldRestrictGpsOnOpenFile())
.withBufferedPreadDisabled(bufferedPreadDisabled)
.withEncryptionAdapter(contextEncryptionAdapter)
.withAbfsBackRef(fsBackRef)
Expand Down Expand Up @@ -1855,7 +1915,7 @@ private AbfsClientContext populateAbfsClientContext() {
.build();
}

public String getRelativePath(final Path path) {
public static String getRelativePath(final Path path) {
Preconditions.checkNotNull(path, "path");
String relPath = path.toUri().getPath();
if (relPath.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ public final class AbfsHttpConstants {
* @see "https://learn.microsoft.com/en-us/azure/active-directory/managed-identities-azure-resources/how-to-use-vm-token#error-handling"
*/
public static final int HTTP_TOO_MANY_REQUESTS = 429;
public static final int HTTP_INVALID_RANGE = 416;

public static final char CHAR_FORWARD_SLASH = '/';
public static final char CHAR_EXCLAMATION_POINT = '!';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -627,5 +627,11 @@ public static String containerProperty(String property, String fsName, String ac
*/
public static final String FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = "fs.azure.tail.latency.max.retry.count";

/**
* If true, restricts GPS (GetPathStatus) calls on openFileForRead.
* Default: false
*/
public static final String FS_AZURE_RESTRICT_GPS_ON_OPENFILE = "fs.azure.restrict.gps.on.openfile";

// Private no-arg constructor: code outside this class cannot instantiate it through this constructor.
private ConfigurationKeys() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ public final class FileSystemConfigurations {
public static final int MIN_FS_AZURE_TAIL_LATENCY_ANALYSIS_WINDOW_GRANULARITY = 1;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_PERCENTILE_COMPUTATION_INTERVAL_MILLIS = 500;
public static final int DEFAULT_FS_AZURE_TAIL_LATENCY_MAX_RETRY_COUNT = 1;
public static final boolean DEFAULT_FS_AZURE_RESTRICT_GPS_ON_OPENFILE = false;

// Private no-arg constructor: code outside this class cannot instantiate it through this constructor.
private FileSystemConfigurations() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public final class HttpHeaderConfigurations {
public static final String CONTENT_MD5 = "Content-MD5";
public static final String CONTENT_TYPE = "Content-Type";
public static final String RANGE = "Range";
public static final String CONTENT_RANGE = "Content-Range";
public static final String TRANSFER_ENCODING = "Transfer-Encoding";
public static final String USER_AGENT = "User-Agent";
public static final String X_HTTP_METHOD_OVERRIDE = "X-HTTP-Method-Override";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.net.HttpURLConnection;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience;
Expand Down Expand Up @@ -66,6 +68,8 @@ public enum AzureServiceErrorCode {
INVALID_APPEND_OPERATION("InvalidAppendOperation", HttpURLConnection.HTTP_CONFLICT, null),
UNAUTHORIZED_BLOB_OVERWRITE("UnauthorizedBlobOverwrite", HttpURLConnection.HTTP_FORBIDDEN,
"This request is not authorized to perform blob overwrites."),
INVALID_RANGE("InvalidRange", AbfsHttpConstants.HTTP_INVALID_RANGE,
"The range specified is invalid for the current size of the resource."),
UNKNOWN(null, -1, null);

private final String errorCode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws
// If buffer is empty, then fill the buffer.
if (getBCursor() == getLimit()) {
// If EOF, then return -1
if (getFCursor() >= getContentLength()) {
if (!(shouldRestrictGpsOnOpenFile() && isFirstRead()) && getFCursor() >= getContentLength()) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we decouple !(shouldRestrictGpsOnOpenFile() && isFirstRead()) from the other conditions?

Currently, if both shouldRestrictGpsOnOpenFile() and isFirstRead() are true, the entire expression evaluates to false. This prevents the function from returning -1 even if getFCursor() >= getContentLength() is true, allowing the execution to proceed incorrectly.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, for first reads we have other validation in place

return -1;
}

Expand All @@ -83,7 +83,14 @@ protected int readOneBlock(final byte[] b, final int off, final int len) throws

// Reset Read Type back to normal and set again based on code flow.
getTracingContext().setReadType(ReadType.NORMAL_READ);
if (shouldAlwaysReadBufferSize()) {

// If restrictGpsOnOpenFile config is enabled, skip prefetch for the first read since contentLength
// is not available yet.
if (shouldRestrictGpsOnOpenFile() && isFirstRead()) {
LOG.debug("RestrictGpsOnOpenFile is enabled. Skip readahead for first read.");
bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), true);
}
else if (shouldAlwaysReadBufferSize()) {
bytesRead = readInternal(getFCursor(), getBuffer(), 0, getBufferSize(), false);
} else {
// Enable readAhead when reading sequentially
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1347,7 +1347,7 @@ public AbfsRestOperation checkAccess(String path,
public boolean checkIsDir(AbfsHttpOperation result) {
String resourceType = result.getResponseHeader(
HttpHeaderConfigurations.X_MS_RESOURCE_TYPE);
return StringUtils.equalsIgnoreCase(resourceType, DIRECTORY);
return resourceType != null && StringUtils.equalsIgnoreCase(resourceType, DIRECTORY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check is not needed here as equalsIgnoreCase internally checks if resourceType is null or not.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, returns NPE if not present

}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ public final class AbfsErrors {
+ "and cannot be appended to by the Azure Data Lake Storage Service API";
public static final String CONDITION_NOT_MET = "The condition specified using "
+ "HTTP conditional header(s) is not met.";
public static final String ERR_READ_ON_DIRECTORY = "Read operation not permitted on a directory.";
public static final String ERR_OPENFILE_ON_DIRECTORY = "openFileForRead must be used with files and not directories";


/**
* Exception message on filesystem init if token-provider-auth-type configs are provided
*/
Expand Down
Loading
Loading