Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,13 @@ public enum Property {
"The amount of time a scan reference is unused before its deleted from metadata table.",
"2.1.0"),
@Experimental
SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null, PropertyType.PREFIX,
"A regular expression that determines which tables are allowed to be scanned for"
+ " servers in the specified group. The property name should end with the scan server"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 4.0, with the changes in #5749, we would not need the group name in the property.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correct. There are probably other changes when merging in 4.0 as well.

+ " group and the property value should take into account the table namespace and name."
+ " The default value disallows scans on tables in the accumulo namespace.",
"2.1.5"),
@Experimental
SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION,
"The time between adjustments of the thrift server thread pool.", "2.1.0"),
// properties that are specific to tablet server behavior
Expand Down
111 changes: 100 additions & 11 deletions server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,20 @@
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.accumulo.core.Constants;
import org.apache.accumulo.core.client.AccumuloException;
import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode;
import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException;
import org.apache.accumulo.core.conf.AccumuloConfiguration;
import org.apache.accumulo.core.conf.Property;
import org.apache.accumulo.core.conf.cluster.ClusterConfigParser;
import org.apache.accumulo.core.data.TableId;
import org.apache.accumulo.core.dataImpl.KeyExtent;
import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan;
import org.apache.accumulo.core.dataImpl.thrift.InitialScan;
Expand Down Expand Up @@ -177,6 +182,8 @@ private TabletMetadataLoader(Ample ample) {
}

private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class);
// Default pattern to allow scans on all tables not in accumulo namespace
private static final String DEFAULT_SCAN_ALLOWED_PATTERN = "^(?!accumulo\\.).*$";

protected ThriftScanClientHandler delegate;
private UUID serverLockUUID;
Expand Down Expand Up @@ -213,6 +220,9 @@ private TabletMetadataLoader(Ample ample) {

private final String groupName;

private final ConcurrentHashMap<TableId,Boolean> allowedTables = new ConcurrentHashMap<>();
private volatile String currentAllowedTableRegex;

public ScanServer(ScanServerOpts opts, String[] args) {
super("sserver", opts, args);

Expand Down Expand Up @@ -388,6 +398,7 @@ public void run() {
}

SecurityUtil.serverLogin(getConfiguration());
updateAllowedTables(false);

ServerAddress address = null;
try {
Expand Down Expand Up @@ -423,6 +434,7 @@ public void run() {
Thread.sleep(1000);
updateIdleStatus(sessionManager.getActiveScans().isEmpty()
&& tabletMetadataCache.estimatedSize() == 0);
updateAllowedTables(false);
} catch (InterruptedException e) {
LOG.info("Interrupt Exception received, shutting down");
gracefulShutdown(getContext().rpcCreds());
Expand Down Expand Up @@ -477,6 +489,86 @@ public void run() {
}
}

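// Visible for testing. Returns whether the given table may be scanned via this ScanServer,
// refreshing the allowed tables mapping for as long as it has no entry for the table id.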
protected boolean isAllowed(TCredentials credentials, TableId tid)
throws ThriftSecurityException {
Boolean result = allowedTables.get(tid);
while (result == null) {
LOG.debug(
"Allowed tables mapping does not contain an entry for table: {}, refreshing table...",
tid);
// Clear the cache and try again, maybe there
// is a race condition in table creation and scan
updateAllowedTables(true);
// validate that the table exists, else throw
delegate.getNamespaceId(credentials, tid);
result = allowedTables.get(tid);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's possible the table id still may not exist in the map after the update call because that only works on known tables. A client could pass a table id to the scan server that no longer exists. We could do the following to protect against this and avoid an NPE when trying to convert null to a boolean primitive.

Suggested change
result = allowedTables.get(tid);
result = allowedTables.getOrDefault(tid, false);

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I approached this differently in f3ca130, want to get your thoughts on it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's much better. That's a tight loop that has the potential to hammer ZK, but it does not seem like that would happen under any circumstance I can think of. If we wanted to make the code a bit more future-proof (e.g. against changes in the code it calls), we could use a retry with a limit on retries and a bit of backoff. Not sure if that's worthwhile though.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I was wondering if there should be a limit to the retries and then error. Maybe 1 a second, 10 times? That gives 10 seconds for ZK changes to propagate across a cluster. Is that enough time?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once a second for 10 times sounds good to me. If it gets that far something is probably off, so it would be good to include the table id in the error message. The code is clearing the zoo cache, so then it's a matter of propagation between ZK servers?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What type of exception do you think should be propagated back to the client? ScanServerBusy so that it tries another server?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an unexpected condition; it could throw an IllegalStateException.

}
return result;
}

/**
 * Rebuilds the table id to allowed/disallowed mapping held in {@code allowedTables}.
 *
 * <p>
 * Entries for table ids that are no longer present in the table id to name map are removed. The
 * regular expression configured for this server's group is then matched against every known
 * table name and the result is stored per table id. When the property is not set,
 * {@code DEFAULT_SCAN_ALLOWED_PATTERN} is used. When the configured value is not a valid regular
 * expression, every known table is marked as disallowed.
 *
 * @param clearCache when true, the table list cache of the context is cleared before the table
 *        maps are read
 */
private synchronized void updateAllowedTables(boolean clearCache) {

  LOG.debug("Updating allowed tables for ScanServer");
  if (clearCache) {
    context.clearTableListCache();
  }

  // Remove tables that no longer exist. The id map is fetched once so that every cached entry
  // is checked against the same view instead of re-fetching the map for each key.
  final Map<TableId,String> existingTables = getContext().getTableIdToNameMap();
  allowedTables.keySet().removeIf(tid -> {
    if (existingTables.containsKey(tid)) {
      return false;
    }
    LOG.debug("Removing table {} from allowed table map as it no longer exists", tid);
    return true;
  });

  final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName;
  String allowedTableRegex = getConfiguration().get(propName);
  if (allowedTableRegex == null) {
    allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN;
  }

  if (currentAllowedTableRegex == null) {
    LOG.debug("Property {} initial value: {}", propName, allowedTableRegex);
  } else if (!currentAllowedTableRegex.equals(allowedTableRegex)) {
    LOG.debug("Property {} has changed. Old value: {}, new value: {}", propName,
        currentAllowedTableRegex, allowedTableRegex);
  }

  Pattern allowedTablePattern;
  try {
    allowedTablePattern = Pattern.compile(allowedTableRegex);
    // Regex is valid, store it
    currentAllowedTableRegex = allowedTableRegex;
  } catch (PatternSyntaxException e) {
    // Pass the exception as the trailing argument so the syntax error detail is logged too
    LOG.error(
        "Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.",
        propName, allowedTableRegex, e);
    allowedTablePattern = null;
  }

  for (Map.Entry<String,TableId> entry : context.getTableNameToIdMap().entrySet()) {
    final String tname = entry.getKey();
    final TableId tid = entry.getValue();
    // A null pattern means the regex was invalid, in which case nothing is allowed
    final Boolean allowed =
        allowedTablePattern != null && allowedTablePattern.matcher(tname).matches();
    final Boolean previous = allowedTables.put(tid, allowed);
    // This method is invoked repeatedly, so only report a table when its status is new or
    // differs from what was recorded before
    if (!allowed.equals(previous)) {
      if (allowed) {
        LOG.debug("Table {} can now be scanned via this ScanServer", tname);
      } else {
        LOG.debug("Table {} cannot be scanned via this ScanServer", tname);
      }
    }
  }

}

@SuppressWarnings("unchecked")
private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) {
if (tabletMetadataCache == null) {
Expand Down Expand Up @@ -945,11 +1037,6 @@ public void close() {
};
}

/* Exposed for testing */
protected boolean isSystemUser(TCredentials creds) {
return context.getSecurityOperation().isSystemUser(creds);
}

@Override
public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent,
TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList,
Expand All @@ -966,9 +1053,9 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t

KeyExtent extent = getKeyExtent(textent);

if (extent.isMeta() && !isSystemUser(credentials)) {
throw new TException(
"Only the system user can perform eventual consistency scans on the root and metadata tables");
if (!isAllowed(credentials, extent.tableId())) {
throw new TException("Scan of table " + extent.tableId() + " disallowed by property: "
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a preexisting problem. Throwing TException will probably result in a TApplicationException on the client side. We need to throw a thrift exception defined on the RPC to get anything meaningful on the client side.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated in 02b55d1

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

startMultiScan was changed to throw a thrift security exception, can the same change be made for startScan?

+ Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName);
}

try (ScanReservation reservation =
Expand Down Expand Up @@ -1038,9 +1125,11 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials,
for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) {
KeyExtent extent = getKeyExtent(entry.getKey());

if (extent.isMeta() && !context.getSecurityOperation().isSystemUser(credentials)) {
throw new TException(
"Only the system user can perform eventual consistency scans on the root and metadata tables");
if (!isAllowed(credentials, extent.tableId())) {
throw new ThriftSecurityException(
"Scan of table " + extent.tableId() + " disallowed by property: "
+ Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName,
SecurityErrorCode.PERMISSION_DENIED);
}

batch.put(extent, entry.getValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public ThriftScanClientHandler(TabletHostingServer server, WriteTracker writeTra
.getTimeInMillis(Property.TSERV_SCAN_RESULTS_MAX_TIMEOUT);
}

private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
public NamespaceId getNamespaceId(TCredentials credentials, TableId tableId)
throws ThriftSecurityException {
try {
return server.getContext().getNamespaceId(tableId);
Expand Down
Loading