diff --git a/core/src/main/java/org/apache/accumulo/core/conf/Property.java b/core/src/main/java/org/apache/accumulo/core/conf/Property.java index 17edff8d2ea..90602ad6165 100644 --- a/core/src/main/java/org/apache/accumulo/core/conf/Property.java +++ b/core/src/main/java/org/apache/accumulo/core/conf/Property.java @@ -561,6 +561,13 @@ public enum Property { "The amount of time a scan reference is unused before its deleted from metadata table.", "2.1.0"), @Experimental + SSERV_SCAN_ALLOWED_TABLES("sserver.scan.allowed.tables.group.", null, PropertyType.PREFIX, + "A regular expression that determines which tables are allowed to be scanned for" + + " servers in the specified group. The property name should end with the scan server" + + " group and the property value should take into account the table namespace and name." + + " The default value disallows scans on tables in the accumulo namespace.", + "2.1.5"), + @Experimental SSERV_THREADCHECK("sserver.server.threadcheck.time", "1s", PropertyType.TIMEDURATION, "The time between adjustments of the thrift server thread pool.", "2.1.0"), // properties that are specific to tablet server behavior diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java index 83f57347b16..53b121f08d5 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ScanServer.java @@ -19,6 +19,7 @@ package org.apache.accumulo.tserver; import static java.nio.charset.StandardCharsets.UTF_8; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.accumulo.core.util.UtilWaitThread.sleepUninterruptibly; import static org.apache.accumulo.core.util.threads.ThreadPoolNames.SCAN_SERVER_TABLET_METADATA_CACHE_POOL; @@ -45,6 +46,9 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.regex.Matcher; +import java.util.regex.Pattern; +import java.util.regex.PatternSyntaxException; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -54,6 +58,7 @@ import org.apache.accumulo.core.conf.AccumuloConfiguration; import org.apache.accumulo.core.conf.Property; import org.apache.accumulo.core.conf.cluster.ClusterConfigParser; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@ -88,6 +93,7 @@ import org.apache.accumulo.core.tabletserver.thrift.TooManyFilesException; import org.apache.accumulo.core.trace.thrift.TInfo; import org.apache.accumulo.core.util.HostAndPort; +import org.apache.accumulo.core.util.Retry; import org.apache.accumulo.core.util.UtilWaitThread; import org.apache.accumulo.core.util.threads.ThreadPools; import org.apache.accumulo.server.AbstractServer; @@ -111,6 +117,7 @@ import org.apache.accumulo.tserver.tablet.SnapshotTablet; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletBase; +import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.apache.thrift.TProcessor; import org.apache.zookeeper.KeeperException; @@ -177,6 +184,8 @@ private TabletMetadataLoader(Ample ample) { } private static final Logger LOG = LoggerFactory.getLogger(ScanServer.class); + // Default pattern to allow scans on all tables not in accumulo namespace + private static final String DEFAULT_SCAN_ALLOWED_PATTERN = "^(?!accumulo\\.).*$"; protected ThriftScanClientHandler delegate; private UUID serverLockUUID; @@ -213,6 +222,9 @@ private TabletMetadataLoader(Ample ample) { private final String groupName; + private final ConcurrentHashMap allowedTables = new ConcurrentHashMap<>(); + private volatile String currentAllowedTableRegex; + public ScanServer(ScanServerOpts opts, String[] args) { super("sserver", opts, args); @@ -388,6 +400,7 @@ public void run() { } SecurityUtil.serverLogin(getConfiguration()); + updateAllowedTables(false); ServerAddress address = null; try { @@ -423,6 +436,7 @@ public void run() { Thread.sleep(1000); updateIdleStatus(sessionManager.getActiveScans().isEmpty() && tabletMetadataCache.estimatedSize() == 0); + updateAllowedTables(false); } catch (InterruptedException e) { LOG.info("Interrupt Exception received, shutting down"); gracefulShutdown(getContext().rpcCreds()); @@ -477,6 +491,106 @@ public void run() { } } + // Visible for testing + protected boolean isAllowed(TCredentials credentials, TableId tid) + throws ThriftSecurityException { + Boolean result = allowedTables.get(tid); + if (result == null) { + + final Retry retry = + Retry.builder().maxRetries(10).retryAfter(1, SECONDS).incrementBy(0, SECONDS) + .maxWait(2, SECONDS).backOffFactor(1.0).logInterval(3, SECONDS).createRetry(); + + while (result == null && retry.canRetry()) { + try { + retry.waitForNextAttempt(LOG, + "Allowed tables mapping does not contain an entry for table: " + tid + + ", refreshing table..."); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.error("Interrupted while waiting for next retry", e); + break; + } + // Clear the cache and try again, maybe there + // is a race condition in table creation and scan + updateAllowedTables(true); + // validate that the table exists, else throw + delegate.getNamespaceId(credentials, tid); + result = allowedTables.get(tid); + retry.useRetry(); + } + + if (result == null) { + // Ran out of retries + throw new IllegalStateException( + "Unable to get allowed table mapping for table: " + tid + " within 10s"); + } + } + return result; + } + + private synchronized void updateAllowedTables(boolean clearCache) { + + LOG.trace("Updating allowed tables for ScanServer"); + if (clearCache) { + context.clearTableListCache(); + } + + // Remove tables that no longer exist + allowedTables.keySet().forEach(tid -> { + if (!getContext().getTableIdToNameMap().containsKey(tid)) { + LOG.trace("Removing table {} from allowed table map as it no longer exists", tid); + allowedTables.remove(tid); + } + }); + + final String propName = Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + groupName; + String allowedTableRegex = getConfiguration().get(propName); + if (allowedTableRegex == null) { + allowedTableRegex = DEFAULT_SCAN_ALLOWED_PATTERN; + } + + if (currentAllowedTableRegex == null) { + LOG.trace("Property {} initial value: {}", propName, allowedTableRegex); + } else if (currentAllowedTableRegex.equals(allowedTableRegex)) { + // Property value has not changed, do nothing + } else { + LOG.info("Property {} has changed. Old value: {}, new value: {}", propName, + currentAllowedTableRegex, allowedTableRegex); + } + + Pattern allowedTablePattern; + try { + allowedTablePattern = Pattern.compile(allowedTableRegex); + // Regex is valid, store it + currentAllowedTableRegex = allowedTableRegex; + } catch (PatternSyntaxException e) { + LOG.error( + "Property {} contains an invalid regular expression. Property value: {}. Disabling all tables.", + propName, allowedTableRegex); + allowedTablePattern = null; + } + + Pattern p = allowedTablePattern; + context.getTableNameToIdMap().entrySet().forEach(e -> { + String tname = e.getKey(); + TableId tid = e.getValue(); + if (p == null) { + allowedTables.put(tid, Boolean.FALSE); + } else { + Matcher m = p.matcher(tname); + if (m.matches()) { + LOG.trace("Table {} can now be scanned via this ScanServer", tname); + allowedTables.put(tid, Boolean.TRUE); + } else { + LOG.trace("Table {} cannot be scanned via this ScanServer", tname); + allowedTables.put(tid, Boolean.FALSE); + } + } + }); + + } + @SuppressWarnings("unchecked") private Map getTabletMetadata(Collection extents) { if (tabletMetadataCache == null) { @@ -945,11 +1059,6 @@ public void close() { }; } - /* Exposed for testing */ - protected boolean isSystemUser(TCredentials creds) { - return context.getSecurityOperation().isSystemUser(creds); - } - @Override public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent textent, TRange range, List columns, int batchSize, List ssiList, @@ -966,9 +1075,10 @@ public InitialScan startScan(TInfo tinfo, TCredentials credentials, TKeyExtent t KeyExtent extent = getKeyExtent(textent); - if (extent.isMeta() && !isSystemUser(credentials)) { - throw new TException( - "Only the system user can perform eventual consistency scans on the root and metadata tables"); + if (!isAllowed(credentials, extent.tableId())) { + throw new TApplicationException(TApplicationException.INTERNAL_ERROR, + "Scan of table " + extent.tableId() + " disallowed by property: " + + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName); } try (ScanReservation reservation = @@ -1038,9 +1148,10 @@ public InitialMultiScan startMultiScan(TInfo tinfo, TCredentials credentials, for (Entry> entry : tbatch.entrySet()) { KeyExtent extent = getKeyExtent(entry.getKey()); - if (extent.isMeta() && !context.getSecurityOperation().isSystemUser(credentials)) { - throw new TException( - "Only the system user can perform eventual consistency scans on the root and metadata tables"); + if (!isAllowed(credentials, extent.tableId())) { + throw new TApplicationException(TApplicationException.INTERNAL_ERROR, + "Scan of table " + extent.tableId() + " disallowed by property: " + + Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + this.groupName); } batch.put(extent, entry.getValue()); diff --git a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java index b254fd3e405..e0a54c83745 100644 --- a/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java +++ b/server/tserver/src/main/java/org/apache/accumulo/tserver/ThriftScanClientHandler.java @@ -109,7 +109,7 @@ public ThriftScanClientHandler(TabletHostingServer server, WriteTracker writeTra .getTimeInMillis(Property.TSERV_SCAN_RESULTS_MAX_TIMEOUT); } - private NamespaceId getNamespaceId(TCredentials credentials, TableId tableId) + public NamespaceId getNamespaceId(TCredentials credentials, TableId tableId) throws ThriftSecurityException { try { return server.getContext().getNamespaceId(tableId); diff --git a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java index 5a080408720..968f75e1b26 100644 --- a/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java +++ b/server/tserver/src/test/java/org/apache/accumulo/tserver/ScanServerTest.java @@ -24,7 +24,9 @@ import static org.easymock.EasyMock.replay; import static org.easymock.EasyMock.verify; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; import java.nio.ByteBuffer; import java.util.ArrayList; @@ -32,8 +34,11 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.regex.Pattern; import org.apache.accumulo.core.client.AccumuloException; +import org.apache.accumulo.core.data.TableId; import org.apache.accumulo.core.dataImpl.KeyExtent; import org.apache.accumulo.core.dataImpl.thrift.InitialMultiScan; import org.apache.accumulo.core.dataImpl.thrift.InitialScan; @@ -43,6 +48,7 @@ import org.apache.accumulo.core.dataImpl.thrift.TColumn; import org.apache.accumulo.core.dataImpl.thrift.TKeyExtent; import org.apache.accumulo.core.dataImpl.thrift.TRange; +import org.apache.accumulo.core.metadata.MetadataTable; import org.apache.accumulo.core.process.thrift.ServerProcessService; import org.apache.accumulo.core.securityImpl.thrift.TCredentials; import org.apache.accumulo.core.tabletserver.thrift.NoSuchScanIDException; @@ -55,6 +61,7 @@ import org.apache.accumulo.tserver.tablet.SnapshotTablet; import org.apache.accumulo.tserver.tablet.Tablet; import org.apache.accumulo.tserver.tablet.TabletBase; +import org.apache.thrift.TApplicationException; import org.apache.thrift.TException; import org.junit.jupiter.api.Test; @@ -65,7 +72,7 @@ public class TestScanServer extends ScanServer implements ServerProcessService.I private KeyExtent extent; private TabletResolver resolver; private ScanReservation reservation; - private boolean systemUser; + private ConcurrentHashMap allowedTables; protected TestScanServer(ScanServerOpts opts, String[] args) { super(opts, args); @@ -114,13 +121,17 @@ ScanReservation reserveFilesInstrumented(long scanId) { } @Override - protected boolean isSystemUser(TCredentials creds) { - return systemUser; + public boolean isShutdownRequested() { + return false; } @Override - public boolean isShutdownRequested() { - return false; + protected boolean isAllowed(TCredentials credentials, TableId tid) { + return allowedTables.containsKey(tid); + } + + public void addAllowedTable(TableId tid) { + allowedTables.put(tid, tid); } } @@ -147,6 +158,8 @@ public void testScan() throws Exception { TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + TableId tid = TableId.of("42"); + expect(sextent.tableId()).andReturn(tid).once(); expect(reservation.newTablet(ss, sextent)).andReturn(tablet); expect(reservation.getFailures()).andReturn(Map.of()).anyTimes(); reservation.close(); @@ -157,14 +170,15 @@ public void testScan() throws Exception { expect(handler.continueScan(tinfo, 15, 0L)).andReturn(new ScanResult()); handler.closeScan(tinfo, 15); - replay(reservation, handler); + replay(reservation, sextent, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = false; TKeyExtent textent = createMock(TKeyExtent.class); InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, @@ -172,7 +186,7 @@ public void testScan() throws Exception { assertEquals(15, is.getScanID()); ss.continueScan(tinfo, is.getScanID(), 0L); ss.closeScan(tinfo, is.getScanID()); - verify(reservation, handler); + verify(reservation, sextent, handler); } @Test @@ -194,18 +208,20 @@ public void testScanTabletLoadFailure() throws Exception { Map execHints = new HashMap<>(); ScanReservation reservation = createMock(ScanReservation.class); - expect(extent.isMeta()).andReturn(false).anyTimes(); + TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + TableId tid = TableId.of("42"); + expect(extent.tableId()).andReturn(tid).once(); expect(extent.toThrift()).andReturn(textent).anyTimes(); expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)); reservation.close(); replay(extent, reservation); - TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.extent = extent; ss.delegate = handler; ss.reservation = reservation; - ss.systemUser = false; assertThrows(NotServingTabletException.class, () -> { ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, false, false, 10, @@ -246,7 +262,8 @@ public void close() {} }; TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - expect(extent.isMeta()).andReturn(false).anyTimes(); + TableId tid = TableId.of("42"); + expect(extent.tableId()).andReturn(tid).once(); expect(reservation.newTablet(ss, extent)).andReturn(tablet); expect(reservation.getTabletMetadataExtents()).andReturn(Set.of(extent)); expect(reservation.getFailures()).andReturn(Map.of()); @@ -259,12 +276,13 @@ public void close() {} replay(extent, reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = extent; ss.resolver = resolver; ss.reservation = reservation; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = false; Map> extents = new HashMap<>(); extents.put(createMock(TKeyExtent.class), ranges); @@ -309,7 +327,8 @@ public void close() {} }; TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - expect(extent.isMeta()).andReturn(false).anyTimes(); + TableId tid = TableId.of("42"); + expect(extent.tableId()).andReturn(tid).once(); expect(reservation.newTablet(ss, extent)).andReturn(tablet).anyTimes(); expect(reservation.getTabletMetadataExtents()).andReturn(Set.of()); expect(reservation.getFailures()).andReturn(Map.of(textent, ranges)).anyTimes(); @@ -321,12 +340,13 @@ public void close() {} replay(extent, reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(tid); ss.delegate = handler; ss.extent = extent; ss.resolver = resolver; ss.reservation = reservation; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = false; Map> extents = new HashMap<>(); extents.put(textent, ranges); @@ -370,7 +390,6 @@ public void close() {} ss.delegate = handler; ss.resolver = resolver; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = false; assertThrows(TException.class, () -> { ss.startMultiScan(tinfo, tcreds, extents, tcols, titer, ssio, auths, false, tsc, 30L, @@ -380,7 +399,7 @@ public void close() {} } @Test - public void testScanMetaTablesSystemUser() throws Exception { + public void testScanDefaultAllowedTables() throws Exception { handler = createMock(ThriftScanClientHandler.class); TInfo tinfo = createMock(TInfo.class); @@ -399,7 +418,7 @@ public void testScanMetaTablesSystemUser() throws Exception { TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - expect(sextent.isMeta()).andReturn(true).anyTimes(); + expect(sextent.tableId()).andReturn(MetadataTable.ID).once(); expect(reservation.newTablet(ss, sextent)).andReturn(tablet); expect(reservation.getFailures()).andReturn(Map.of()).anyTimes(); reservation.close(); @@ -412,12 +431,13 @@ public void testScanMetaTablesSystemUser() throws Exception { replay(sextent, reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(MetadataTable.ID); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = true; TKeyExtent textent = createMock(TKeyExtent.class); InitialScan is = ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, @@ -430,7 +450,7 @@ public void testScanMetaTablesSystemUser() throws Exception { } @Test - public void testScanMetaTablesNonSystemUser() throws Exception { + public void testScanDisallowedTable() throws Exception { handler = createMock(ThriftScanClientHandler.class); TInfo tinfo = createMock(TInfo.class); @@ -448,25 +468,40 @@ public void testScanMetaTablesNonSystemUser() throws Exception { TabletResolver resolver = createMock(TabletResolver.class); TestScanServer ss = partialMockBuilder(TestScanServer.class).createMock(); - expect(sextent.isMeta()).andReturn(true).anyTimes(); + expect(sextent.tableId()).andReturn(MetadataTable.ID).anyTimes(); expect(reservation.getFailures()).andReturn(Map.of()).anyTimes(); replay(sextent, reservation, handler); + ss.allowedTables = new ConcurrentHashMap<>(); + ss.addAllowedTable(TableId.of("42")); ss.delegate = handler; ss.extent = sextent; ss.resolver = resolver; ss.reservation = reservation; ss.clientAddress = HostAndPort.fromParts("127.0.0.1", 1234); - ss.systemUser = false; TKeyExtent textent = createMock(TKeyExtent.class); - assertThrows(TException.class, () -> { + TException te = assertThrows(TException.class, () -> { ss.startScan(tinfo, tcreds, textent, trange, tcols, 10, titer, ssio, auths, false, false, 10, tsc, 30L, classLoaderContext, execHints, 0L); }); + assertTrue(te instanceof TApplicationException); + TApplicationException tae = (TApplicationException) te; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); verify(sextent, reservation, handler); } + @Test + public void testTableNameRegex() { + String r = "^(?!accumulo\\.).*$"; + Pattern p = Pattern.compile(r); + + assertFalse(p.matcher("accumulo.root").matches()); + assertFalse(p.matcher("accumulo.metadata").matches()); + assertTrue(p.matcher("test.table").matches()); + } + } diff --git a/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java new file mode 100644 index 00000000000..e14834cf483 --- /dev/null +++ b/test/src/main/java/org/apache/accumulo/test/ScanServerAllowedTablesIT.java @@ -0,0 +1,292 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.accumulo.test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import java.util.Set; + +import org.apache.accumulo.core.Constants; +import org.apache.accumulo.core.client.Accumulo; +import org.apache.accumulo.core.client.AccumuloClient; +import org.apache.accumulo.core.client.BatchScanner; +import org.apache.accumulo.core.client.Scanner; +import org.apache.accumulo.core.client.ScannerBase; +import org.apache.accumulo.core.client.ScannerBase.ConsistencyLevel; +import org.apache.accumulo.core.client.TableNotFoundException; +import org.apache.accumulo.core.clientImpl.ClientContext; +import org.apache.accumulo.core.conf.ClientProperty; +import org.apache.accumulo.core.conf.Property; +import org.apache.accumulo.core.data.Range; +import org.apache.accumulo.core.fate.zookeeper.ZooReaderWriter; +import org.apache.accumulo.core.metadata.MetadataTable; +import org.apache.accumulo.core.security.Authorizations; +import org.apache.accumulo.core.spi.scan.ScanServerSelector; +import org.apache.accumulo.harness.MiniClusterConfigurationCallback; +import org.apache.accumulo.harness.SharedMiniClusterBase; +import org.apache.accumulo.minicluster.ServerType; +import org.apache.accumulo.miniclusterImpl.MiniAccumuloConfigImpl; +import org.apache.accumulo.test.util.Wait; +import org.apache.accumulo.tserver.ScanServer; +import org.apache.commons.lang3.exception.ExceptionUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.thrift.TApplicationException; +import org.apache.zookeeper.ZooKeeper; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; + +import com.google.common.collect.Iterables; + +public class ScanServerAllowedTablesIT extends SharedMiniClusterBase { + + // @formatter:off + private static final String clientConfiguration = + "["+ + " {"+ + " \"isDefault\": true,"+ + " \"maxBusyTimeout\": \"5m\","+ + " \"busyTimeoutMultiplier\": 8,"+ + " \"scanTypeActivations\": [],"+ + " \"attemptPlans\": ["+ + " {"+ + " \"servers\": \"3\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"one\""+ + " },"+ + " {"+ + " \"servers\": \"13\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"two\""+ + " },"+ + " {"+ + " \"servers\": \"100%\","+ + " \"busyTimeout\": \"33ms\""+ + " }"+ + " ]"+ + " },"+ + " {"+ + " \"isDefault\": false,"+ + " \"maxBusyTimeout\": \"5m\","+ + " \"busyTimeoutMultiplier\": 8,"+ + " \"group\": \"GROUP1\","+ + " \"scanTypeActivations\": [\"use_group1\"],"+ + " \"attemptPlans\": ["+ + " {"+ + " \"servers\": \"3\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"one\""+ + " },"+ + " {"+ + " \"servers\": \"13\","+ + " \"busyTimeout\": \"33ms\","+ + " \"salt\": \"two\""+ + " },"+ + " {"+ + " \"servers\": \"100%\","+ + " \"busyTimeout\": \"33ms\""+ + " }"+ + " ]"+ + " }"+ + "]"; + // @formatter:on + + public static class SSATITConfiguration implements MiniClusterConfigurationCallback { + + @Override + public void configureMiniCluster(MiniAccumuloConfigImpl cfg, Configuration coreSite) { + + cfg.setNumScanServers(1); + + // allow the ScanServer in the DEFAULT group to only scan tables in accumulo namespace + cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + + ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME, "^accumulo\\..*$"); + // allow the ScanServer in the GROUP1 group to only scan tables created with the prefix 'test' + cfg.setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^test.*"); + + cfg.setClientProperty(ClientProperty.SCAN_SERVER_SELECTOR_OPTS_PREFIX.getKey() + "profiles", + clientConfiguration); + } + + } + + @BeforeAll + public static void start() throws Exception { + SharedMiniClusterBase.startMiniClusterWithConfig(new SSATITConfiguration()); + SharedMiniClusterBase.getCluster().getClusterControl().start(ServerType.SCAN_SERVER, + "localhost"); + + String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); + ZooReaderWriter zrw = getCluster().getServerContext().getZooReaderWriter(); + String scanServerRoot = zooRoot + Constants.ZSSERVERS; + + while (zrw.getChildren(scanServerRoot).size() == 0) { + Thread.sleep(500); + } + } + + @AfterAll + public static void stop() throws Exception { + SharedMiniClusterBase.stopMiniCluster(); + } + + public static enum ScannerType { + BATCH_SCANNER, SCANNER; + } + + private ScannerBase createScanner(AccumuloClient client, ScannerType stype, String tableName) + throws TableNotFoundException { + switch (stype) { + case BATCH_SCANNER: + BatchScanner batchScanner = client.createBatchScanner(tableName, Authorizations.EMPTY); + batchScanner.setRanges(Set.of(new Range())); + return batchScanner; + case SCANNER: + Scanner scanner = client.createScanner(tableName, Authorizations.EMPTY); + scanner.setRange(new Range()); + return scanner; + default: + throw new IllegalArgumentException("Unknown scanner type: " + stype); + } + } + + @SuppressWarnings("unused") + @ParameterizedTest + @EnumSource(value = ScannerType.class) + public void testAllowedTables(ScannerType stype) throws Exception { + + final String zooRoot = getCluster().getServerContext().getZooKeeperRoot(); + final ZooKeeper zk = getCluster().getServerContext().getZooReaderWriter().getZooKeeper(); + final String scanServerRoot = zooRoot + Constants.ZSSERVERS; + + try (AccumuloClient client = Accumulo.newClient().from(getClientProps()).build()) { + + // Start the 2nd ScanServer + // Bump the number of scan serves that can run to start the GROUP1 scan server + getCluster().getConfig().setNumScanServers(2); + getCluster()._exec(ScanServer.class, ServerType.SCAN_SERVER, Map.of(), + new String[] {"-g", "GROUP1"}); + Wait.waitFor(() -> zk.getChildren(scanServerRoot, false).size() == 2); + Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream().anyMatch( + (p) -> p.getSecond().equals(ScanServerSelector.DEFAULT_SCAN_SERVER_GROUP_NAME)) == true); + Wait.waitFor(() -> ((ClientContext) client).getScanServers().values().stream() + .anyMatch((p) -> p.getSecond().equals("GROUP1")) == true); + + // Create table with test prefix, load some data + final String testTableName = "testAllowedTables" + stype.name(); + final int ingestedEntryCount = + ScanServerIT.createTableAndIngest(client, testTableName, null, 10, 10, "colf"); + assertEquals(100, ingestedEntryCount); + + // Using default ScanServer should succeed, only allowed to scan system tables + try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + assertTrue(Iterables.size(scanner) > 0); + } + + // Using default ScanServer should fail, only allowed to scan system tables + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should fail, only allowed to test tables + try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should succeed + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + assertEquals(100, Iterables.size(scanner)); + } + + // Change the GROUP1 property so that subsequent test tables don't work + getCluster().getServerContext().instanceOperations() + .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^foo.*"); + + // Using GROUP1 ScanServer should fail, only allowed to test 'test*' tables + try (ScannerBase scanner = createScanner(client, stype, MetadataTable.NAME)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + RuntimeException re = assertThrows(RuntimeException.class, () -> Iterables.size(scanner)); + Throwable root = ExceptionUtils.getRootCause(re); + assertTrue(root instanceof TApplicationException); + TApplicationException tae = (TApplicationException) root; + assertEquals(TApplicationException.INTERNAL_ERROR, tae.getType()); + assertTrue(tae.getMessage().contains("disallowed by property")); + } + + // Using GROUP1 ScanServer should fail as the property was changed + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + // Try multiple times waiting for the server to pick up the property change + Wait.waitFor(() -> { + try { + var unused = Iterables.size(scanner); + return false; + } catch (RuntimeException e) { + return true; + } + }); + } + + // Change the GROUP1 property so that subsequent test tables do work + getCluster().getServerContext().instanceOperations() + .setProperty(Property.SSERV_SCAN_ALLOWED_TABLES.getKey() + "GROUP1", "^test.*"); + + // Using GROUP1 ScanServer should succeed as the property was changed back + try (ScannerBase scanner = createScanner(client, stype, testTableName)) { + scanner.setConsistencyLevel(ConsistencyLevel.EVENTUAL); + scanner.setExecutionHints(Map.of("scan_type", "use_group1")); + // Try multiple times waiting for the server to pick up the property change + Wait.waitFor(() -> { + try { + int size = Iterables.size(scanner); + return size == 100; + } catch (RuntimeException e) { + return false; + } + }); + + } + + } + + } + +}