-
Notifications
You must be signed in to change notification settings - Fork 479
Added ScanServer property for allowed tables #6146
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 2.1
Are you sure you want to change the base?
Changes from all commits
05bda74
881b810
f3ca130
02b55d1
9ed878f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -45,15 +45,20 @@ | |||||
| import java.util.concurrent.atomic.AtomicLong; | ||||||
| import java.util.concurrent.locks.Condition; | ||||||
| import java.util.concurrent.locks.ReentrantReadWriteLock; | ||||||
| import java.util.regex.Matcher; | ||||||
| import java.util.regex.Pattern; | ||||||
| import java.util.regex.PatternSyntaxException; | ||||||
| import java.util.stream.Collectors; | ||||||
| import java.util.stream.Stream; | ||||||
|
|
||||||
| import org.apache.accumulo.core.Constants; | ||||||
| import org.apache.accumulo.core.client.AccumuloException; | ||||||
| import org.apache.accumulo.core.clientImpl.thrift.SecurityErrorCode; | ||||||
| import org.apache.accumulo.core.clientImpl.thrift.ThriftSecurityException; | ||||||
| import org.apache.accumulo.core.conf.AccumuloConfiguration; | ||||||
| import org.apache.accumulo.core.conf.Property; | ||||||
| import org.apache.accumulo.core.conf.cluster.ClusterConfigParser; | ||||||
| import org.apache.accumulo.core.data.TableId; | ||||||
| import org.apache.accumulo.core.dataImpl.KeyExtent; | ||||||
| import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; | ||||||
| import org.apache.accumulo.core.dataImpl.thrift.InitialScan; | ||||||
|
|
@@ -177,6 +182,8 @@ private TabletMetadataLoader(Ample ample) { | |||||
| } | ||||||
|
|
||||||
| private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class); | ||||||
| // Default pattern to allow scans on all tables not in accumulo namespace | ||||||
| private static final String DEFAULT_SCAN_ALLOWED_PATTERN = "^(?!accumulo\\.).*$"; | ||||||
|
|
||||||
| protected ThriftScanClientHandler delegate; | ||||||
| private UUID serverLockUUID; | ||||||
|
|
@@ -213,6 +220,9 @@ private TabletMetadataLoader(Ample ample) { | |||||
|
|
||||||
| private final String groupName; | ||||||
|
|
||||||
| private final ConcurrentHashMap<TableId,Boolean> allowedTables = new ConcurrentHashMap<>(); | ||||||
| private volatile String currentAllowedTableRegex; | ||||||
|
|
||||||
| public ScanServer(ScanServerOpts opts, String[] args) { | ||||||
| super("sserver", opts, args); | ||||||
|
|
||||||
|
|
@@ -388,6 +398,7 @@ public void run() { | |||||
| } | ||||||
|
|
||||||
| SecurityUtil.serverLogin(getConfiguration()); | ||||||
| updateAllowedTables(false); | ||||||
|
|
||||||
| ServerAddress address = null; | ||||||
| try { | ||||||
|
|
@@ -423,6 +434,7 @@ public void run() { | |||||
| Thread.sleep(1000); | ||||||
| updateIdleStatus(sessionManager.getActiveScans().isEmpty() | ||||||
| && tabletMetadataCache.estimatedSize() == 0); | ||||||
| updateAllowedTables(false); | ||||||
| } catch (InterruptedException e) { | ||||||
| LOG.info("Interrupt Exception received, shutting down"); | ||||||
| gracefulShutdown(getContext().rpcCreds()); | ||||||
|
|
@@ -477,6 +489,86 @@ public void run() { | |||||
| } | ||||||
| } | ||||||
|
|
||||||
| // Visible for testing | ||||||
| protected boolean isAllowed(TCredentials credentials, TableId tid) | ||||||
| throws ThriftSecurityException { | ||||||
| Boolean result = allowedTables.get(tid); | ||||||
| while (result == null) { | ||||||
| LOG.debug( | ||||||
| "Allowed tables mapping does not contain an entry for table: {}, refreshing table...", | ||||||
| tid); | ||||||
| // Clear the cache and try again, maybe there | ||||||
| // is a race condition in table creation and scan | ||||||
| updateAllowedTables(true); | ||||||
| // validate that the table exists, else throw | ||||||
| delegate.getNamespaceId(credentials, tid); | ||||||
| result = allowedTables.get(tid); | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Its possible the table id still may not exist in the map after the update call because that only works on known tables. A client could pass a table id to the scan server that no longer exists. Could do the following to protect against this and avoid a NPE when trying to convert null to a boolean primitave.
Suggested change
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I approached this differently in f3ca130, want to get your thoughts on it.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thats much better. Thats a tight loop that has the potential to hammer ZK, but it does not seems like that would happen under any circumstance I can think of. If wanted to make the code a bit more future proof (like changes in the code it calls) could use a retry w/ a limit on retries and bit of backoff. Not sure if thats worthwhile though.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I was wondering if there should be a limit to the retries and then error. Maybe 1 a second, 10 times? That gives 10 seconds for ZK changes to propagate across a cluster. Is that enough time?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 1 a second for 10 times sounds good to me, if its gets that far something is probably off so would be good to include the table id in the error message. The code is clearing the zoo cache, so then its a matter of propagation between ZK servers?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What type of exception do you think should be propagated back to the client? ScanServerBusy so that it tries another server?
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is an unexpected condition, could throw an IllegalState exception. |
||||||
| } | ||||||
| return result; | ||||||
| } | ||||||
|
|
||||||
| private synchronized void updateAllowedTables(boolean clearCache) { | ||||||
|
|
||||||
| LOG.debug("Updating allowed tables for ScanServer"); | ||||||
| if (clearCache) { | ||||||
| context.clearTableListCache(); | ||||||
| } | ||||||
|
|
||||||
| // Remove tables that no longer exist | ||||||
| allowedTables.keySet().forEach(tid -> { | ||||||
| if (!getContext().getTableIdToNameMap().containsKey(tid)) { | ||||||
| LOG.debug("Removing table {} from allowed table map as it no longer exists", tid); | ||||||
| allowedTables.remove(tid); | ||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName; | ||||||
| String allowedTableRegex = getConfiguration().get(propName); | ||||||
| if (allowedTableRegex == null) { | ||||||
| allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN; | ||||||
| } | ||||||
|
|
||||||
| if (currentAllowedTableRegex == null) { | ||||||
| LOG.debug("Property {} initial value: {}", propName, allowedTableRegex); | ||||||
| } else if (currentAllowedTableRegex.equals(allowedTableRegex)) { | ||||||
| // Property value has not changed, do nothing | ||||||
| } else { | ||||||
| LOG.debug("Property {} has changed. Old value: {}, new value: {}", propName, | ||||||
| currentAllowedTableRegex, allowedTableRegex); | ||||||
| } | ||||||
|
|
||||||
| Pattern allowedTablePattern; | ||||||
| try { | ||||||
| allowedTablePattern = Pattern.compile(allowedTableRegex); | ||||||
| // Regex is valid, store it | ||||||
| currentAllowedTableRegex = allowedTableRegex; | ||||||
| } catch (PatternSyntaxException e) { | ||||||
| LOG.error( | ||||||
| "Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.", | ||||||
| propName, allowedTableRegex); | ||||||
| allowedTablePattern = null; | ||||||
| } | ||||||
|
|
||||||
| Pattern p = allowedTablePattern; | ||||||
| context.getTableNameToIdMap().entrySet().forEach(e -> { | ||||||
| String tname = e.getKey(); | ||||||
| TableId tid = e.getValue(); | ||||||
| if (p == null) { | ||||||
| allowedTables.put(tid, Boolean.FALSE); | ||||||
| } else { | ||||||
| Matcher m = p.matcher(tname); | ||||||
| if (m.matches()) { | ||||||
| LOG.debug("Table {} can now be scanned via this ScanServer", tname); | ||||||
| allowedTables.put(tid, Boolean.TRUE); | ||||||
| } else { | ||||||
| LOG.debug("Table {} cannot be scanned via this ScanServer", tname); | ||||||
| allowedTables.put(tid, Boolean.FALSE); | ||||||
| } | ||||||
| } | ||||||
| }); | ||||||
|
|
||||||
| } | ||||||
|
|
||||||
| @SuppressWarnings("unchecked") | ||||||
| private Map<KeyExtent,TabletMetadata> getTabletMetadata(Collection<KeyExtent> extents) { | ||||||
| if (tabletMetadataCache == null) { | ||||||
|
|
@@ -945,11 +1037,6 @@ public void close() { | |||||
| }; | ||||||
| } | ||||||
|
|
||||||
| /* Exposed for testing */ | ||||||
| protected boolean isSystemUser(TCredentials creds) { | ||||||
| return context.getSecurityOperation().isSystemUser(creds); | ||||||
| } | ||||||
|
|
||||||
| @Override | ||||||
| public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, | ||||||
| TRange range, List<TColumn> columns, int batchSize, List<IterInfo> ssiList, | ||||||
|
|
@@ -966,9 +1053,9 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t | |||||
|
|
||||||
| KeyExtent extent = getKeyExtent(textent); | ||||||
|
|
||||||
| if (extent.isMeta() && !isSystemUser(credentials)) { | ||||||
| throw new TException( | ||||||
| "Only the system user can perform eventual consistency scans on the root and metadata tables"); | ||||||
| if (!isAllowed(credentials, extent.tableId())) { | ||||||
| throw new TException("Scan of table " + extent.tableId() + " disallowed by property: " | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a preexisting problem. Throwing TException will probably result in a TappException on the client side. Need to throw a thrift exception defined on the RPC to get anything meaningful on the client side.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated in 02b55d1
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. startMultiScan was changed to throw a thrift security exception, can the same change be made for startScan? |
||||||
| + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName); | ||||||
| } | ||||||
|
|
||||||
| try (ScanReservation reservation = | ||||||
|
|
@@ -1038,9 +1125,11 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, | |||||
| for (Entry<TKeyExtent,List<TRange>> entry : tbatch.entrySet()) { | ||||||
| KeyExtent extent = getKeyExtent(entry.getKey()); | ||||||
|
|
||||||
| if (extent.isMeta() && !context.getSecurityOperation().isSystemUser(credentials)) { | ||||||
| throw new TException( | ||||||
| "Only the system user can perform eventual consistency scans on the root and metadata tables"); | ||||||
| if (!isAllowed(credentials, extent.tableId())) { | ||||||
| throw new ThriftSecurityException( | ||||||
| "Scan of table " + extent.tableId() + " disallowed by property: " | ||||||
| + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName, | ||||||
| SecurityErrorCode.PERMISSION_DENIED); | ||||||
| } | ||||||
|
|
||||||
| batch.put(extent, entry.getValue()); | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In 4.0, with the changes in #5749 would not need to group name in the property.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
correct. There are probably other changes when merging in 4.0 as well.