From ee5ec50245ed8948119715826cbb9e4fe4e4ab6d Mon Sep 17 00:00:00 2001 From: Andrei Shitov Date: Thu, 26 Mar 2026 10:09:20 +0300 Subject: [PATCH] feat(ZOOKEEPER-3824): allow SASL allowlist expansion during reconfig --- .../zookeeper/server/quorum/QuorumPeer.java | 93 +++++++ .../server/quorum/QuorumPeerConfig.java | 13 + .../server/quorum/QuorumZooKeeperServer.java | 132 ++++++++++ .../server/quorum/auth/QuorumAuth.java | 4 + .../quorum/auth/SaslQuorumAuthServer.java | 12 +- .../auth/SaslQuorumServerCallbackHandler.java | 31 ++- .../quorum/QuorumSaslAuthzZnodeTest.java | 234 ++++++++++++++++++ 7 files changed, 514 insertions(+), 5 deletions(-) create mode 100644 zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java index bd444eae1de..66ba5b7ea12 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeer.java @@ -39,6 +39,7 @@ import java.util.HashSet; import java.util.LinkedList; import java.util.List; +import java.util.Locale; import java.util.Map; import java.util.Map.Entry; import java.util.Properties; @@ -793,6 +794,11 @@ public synchronized void setCurrentVote(Vote v) { */ protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE; + private boolean quorumSaslAuthzZnodeEnabled = false; + private String quorumSaslAuthzZnodePath = ""; + private final AtomicReference> manualSaslAuthzHosts = + new AtomicReference<>(Collections.emptySet()); + public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs"; private static int quorumCnxnTimeoutMs; @@ -1903,6 +1909,40 @@ private void connectNewPeers(QuorumCnxManager qcm) { } } + public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) { + if (!(authServer instanceof SaslQuorumAuthServer)) { + return; + } + + Set hosts = new HashSet<>(); + synchronized (QV_LOCK) { + addHostsFromQV(quorumVerifier, hosts); + addHostsFromQV(lastSeenQuorumVerifier, hosts); + } + Set manualHosts = manualSaslAuthzHosts.get(); + if (manualHosts != null) { + hosts.addAll(manualHosts); + } + if (extraQVs != null) { + for (QuorumVerifier qv : extraQVs) { + addHostsFromQV(qv, hosts); + } + } + + ((SaslQuorumAuthServer) authServer).updateAuthorizedHosts(hosts); + } + + private static void addHostsFromQV(QuorumVerifier qv, Set hosts) { + if (qv == null) { + return; + } + for (QuorumServer qs : qv.getAllMembers().values()) { + if (qs != null && qs.hostname != null) { + hosts.add(qs.hostname); + } + } + } + public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { if (!isReconfigEnabled()) { LOG.info("Dynamic reconfig is disabled, we don't store the last seen config."); @@ -1932,6 +1972,7 @@ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) { return; } lastSeenQuorumVerifier = qv; + refreshQuorumSaslAuthzHosts(); if (qcm != null) { connectNewPeers(qcm); } @@ -1991,6 +2032,7 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) setAddrs(qs.addr, qs.electionAddr, qs.clientAddr); } updateObserverMasterList(); + refreshQuorumSaslAuthzHosts(); return prevQV; } } @@ -2597,6 +2639,55 @@ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) { LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize); } + void setQuorumSaslAuthzZnodeEnabled(boolean enabled) { + quorumSaslAuthzZnodeEnabled = enabled; + } + + boolean isQuorumSaslAuthzZnodeEnabled() { + return quorumSaslAuthzZnodeEnabled; + } + + void setQuorumSaslAuthzZnodePath(String path) { + if (path == null) { + return; + } + quorumSaslAuthzZnodePath = path.trim(); + } + + String getQuorumSaslAuthzZnodePath() { + return quorumSaslAuthzZnodePath; + } + + void setManualSaslAuthzHosts(String hostsCsv) { + manualSaslAuthzHosts.set(parseAuthzHosts(hostsCsv)); + } + + void clearManualSaslAuthzHosts() { + manualSaslAuthzHosts.set(Collections.emptySet()); + } + + // VisibleForTesting + Set getManualSaslAuthzHosts() { + return manualSaslAuthzHosts.get(); + } + + private static Set parseAuthzHosts(String hostsCsv) { + if (hostsCsv == null) { + return Collections.emptySet(); + } + String trimmed = hostsCsv.trim(); + if (trimmed.isEmpty()) { + return Collections.emptySet(); + } + Set hosts = new HashSet<>(); + for (String token : trimmed.split("[,\\s]+")) { + if (!token.isEmpty()) { + hosts.add(token.toLowerCase(Locale.ROOT)); + } + } + return Collections.unmodifiableSet(hosts); + } + boolean isQuorumSaslAuthEnabled() { return quorumSaslEnableAuth; } @@ -2700,6 +2791,8 @@ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOExce quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext); } quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize); + quorumPeer.setQuorumSaslAuthzZnodeEnabled(config.quorumSaslAuthzZnodeEnabled); + quorumPeer.setQuorumSaslAuthzZnodePath(config.quorumSaslAuthzZnodePath); if (config.jvmPauseMonitorToRun) { quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config)); diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java index b87058e1cd1..7293157c523 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumPeerConfig.java @@ -121,6 +121,8 @@ public class QuorumPeerConfig { protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE; protected int quorumCnxnThreadsSize; + protected boolean quorumSaslAuthzZnodeEnabled = false; + protected String quorumSaslAuthzZnodePath = ""; // multi address related configs private boolean multiAddressEnabled = Boolean.parseBoolean( @@ -362,6 +364,10 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti quorumServerLoginContext = value; } else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) { quorumServicePrincipal = value; + } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_ENABLED)) { + quorumSaslAuthzZnodeEnabled = parseBoolean(key, value); + } else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_PATH)) { + quorumSaslAuthzZnodePath = value; } else if (key.equals("quorum.cnxn.threads.size")) { quorumCnxnThreadsSize = Integer.parseInt(value); } else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) { @@ -390,6 +396,13 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti } } + if (quorumSaslAuthzZnodeEnabled && quorumSaslAuthzZnodePath.trim().isEmpty()) { + quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH; + } + if (!quorumSaslAuthzZnodeEnabled && !quorumSaslAuthzZnodePath.trim().isEmpty()) { + quorumSaslAuthzZnodeEnabled = true; + } + if (!quorumEnableSasl && quorumServerRequireSasl) { throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED + " is disabled, so cannot enable " diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java index 240936956fc..cd14d0c0feb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/QuorumZooKeeperServer.java @@ -20,16 +20,23 @@ import java.io.IOException; import java.io.PrintWriter; +import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.function.BiConsumer; import java.util.stream.Collectors; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.MultiOperationRecord; import org.apache.zookeeper.Op; +import org.apache.zookeeper.ZooDefs; import org.apache.zookeeper.ZooDefs.OpCode; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.metrics.MetricsContext; import org.apache.zookeeper.proto.CreateRequest; +import org.apache.zookeeper.server.DataTree.ProcessTxnResult; import org.apache.zookeeper.server.Request; import org.apache.zookeeper.server.RequestRecord; import org.apache.zookeeper.server.ServerMetrics; @@ -46,6 +53,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer { public final QuorumPeer self; protected UpgradeableSessionTracker upgradeableSessionTracker; + private final AtomicBoolean authzAclWarningLogged = new AtomicBoolean(false); protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout, int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) { @@ -54,6 +62,12 @@ protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int min this.self = self; } + @Override + public synchronized void startup() { + super.startup(); + refreshAuthzHostsFromZnode(); + } + @Override protected void startSessionTracker() { upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker; @@ -213,4 +227,122 @@ public void dumpMonitorValues(BiConsumer response) { response.accept("peer_state", self.getDetailedPeerState()); } + @Override + public ProcessTxnResult processTxn(Request request) { + ProcessTxnResult rc = super.processTxn(request); + maybeRefreshAuthzHostsFromTxn(rc); + return rc; + } + + private void maybeRefreshAuthzHostsFromTxn(ProcessTxnResult rc) { + if (rc == null) { + return; + } + if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) { + return; + } + String znodePath = self.getQuorumSaslAuthzZnodePath(); + if (znodePath == null || znodePath.isEmpty()) { + return; + } + + if (rc.multiResult != null) { + for (ProcessTxnResult sub : rc.multiResult) { + if (isAuthzZnodeTxn(sub, znodePath)) { + if (sub.type == OpCode.delete) { + clearAuthzHostsFromZnode(); + } else { + refreshAuthzHostsFromZnode(); + } + return; + } + } + return; + } + + if (isAuthzZnodeTxn(rc, znodePath)) { + LOG.info("Authz znode txn applied: type={}, path={}", rc.type, rc.path); + if (rc.type == OpCode.delete) { + clearAuthzHostsFromZnode(); + } else { + refreshAuthzHostsFromZnode(); + } + } + } + + private static boolean isAuthzZnodeTxn(ProcessTxnResult rc, String znodePath) { + if (rc == null || rc.path == null) { + return false; + } + if (!znodePath.equals(rc.path)) { + return false; + } + return rc.type == OpCode.create + || rc.type == OpCode.create2 + || rc.type == OpCode.createContainer + || rc.type == OpCode.createTTL + || rc.type == OpCode.setData + || rc.type == OpCode.delete; + } + + private void refreshAuthzHostsFromZnode() { + if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) { + return; + } + String path = self.getQuorumSaslAuthzZnodePath(); + if (path == null || path.isEmpty()) { + return; + } + try { + if (hasWorldWritableAcl(path) && authzAclWarningLogged.compareAndSet(false, true)) { + LOG.warn("Authz znode {} has world-writable ACL. " + + "This is a security risk: any client can modify the quorum authorization host list. " + + "Set restrictive ACLs on this znode.", path); + } + byte[] data = getZKDatabase().getDataTree().getData(path, new Stat(), null); + if (data == null) { + LOG.info("Authz znode read returned null data for {}", path); + return; + } + if (data.length == 0) { + LOG.info("Authz znode read returned empty data for {}", path); + self.clearManualSaslAuthzHosts(); + } else { + LOG.info("Authz znode read {} bytes for {}", data.length, path); + self.setManualSaslAuthzHosts(new String(data, StandardCharsets.UTF_8)); + } + } catch (KeeperException.NoNodeException e) { + LOG.info("Authz znode missing at {}", path); + return; + } catch (Exception e) { + LOG.warn("Failed to refresh quorum SASL authz hosts from znode {}", path, e); + return; + } + + self.refreshQuorumSaslAuthzHosts(); + } + + private boolean hasWorldWritableAcl(String path) { + try { + List acls = getZKDatabase().getDataTree().getACL(path, new Stat()); + for (ACL acl : acls) { + if (ZooDefs.Ids.ANYONE_ID_UNSAFE.equals(acl.getId()) + && (acl.getPerms() & ZooDefs.Perms.WRITE) != 0) { + return true; + } + } + } catch (KeeperException.NoNodeException e) { + // node doesn't exist yet + } + return false; + } + + private void clearAuthzHostsFromZnode() { + if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) { + return; + } + self.clearManualSaslAuthzHosts(); + self.refreshQuorumSaslAuthzHosts(); + } + } diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java index 9e5f914747d..032eec36dc1 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/QuorumAuth.java @@ -42,6 +42,10 @@ public class QuorumAuth { public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext"; public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_ENABLED = "quorum.auth.sasl.authzZnode.enabled"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_PATH = "quorum.auth.sasl.authzZnode.path"; + public static final String QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH = "/zookeeper/quorumAuthzHosts"; + static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum"; static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5"; static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect"; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java index a1425833b7f..c7b67542cfb 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumAuthServer.java @@ -23,7 +23,9 @@ import java.io.DataOutputStream; import java.io.IOException; import java.net.Socket; +import java.util.Collections; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.login.AppConfigurationEntry; @@ -47,9 +49,11 @@ public class SaslQuorumAuthServer implements QuorumAuthServer { private static final int MAX_RETRIES = 5; private final Login serverLogin; private final boolean quorumRequireSasl; + private final AtomicReference> currentAuthzHosts; public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set authzHosts) throws SaslException { this.quorumRequireSasl = quorumRequireSasl; + this.currentAuthzHosts = new AtomicReference<>(authzHosts != null ? authzHosts : Collections.emptySet()); try { AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContext); if (entries == null || entries.length == 0) { @@ -58,7 +62,7 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set< loginContext)); } Supplier callbackSupplier = () -> { - return new SaslQuorumServerCallbackHandler(entries, authzHosts); + return new SaslQuorumServerCallbackHandler(entries, currentAuthzHosts.get()); }; serverLogin = new Login(loginContext, callbackSupplier, new ZKConfig()); serverLogin.startThreadIfNeeded(); @@ -67,6 +71,12 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set< } } + public void updateAuthorizedHosts(Set authzHosts) { + if (authzHosts != null) { + currentAuthzHosts.set(authzHosts); + } + } + @Override public void authenticate(Socket sock, DataInputStream din) throws SaslException { DataOutputStream dout = null; diff --git a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java index e17c16f89cb..533a2b96750 100644 --- a/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java +++ b/zookeeper-server/src/main/java/org/apache/zookeeper/server/quorum/auth/SaslQuorumServerCallbackHandler.java @@ -20,8 +20,12 @@ import java.util.Collections; import java.util.HashMap; +import java.util.Locale; import java.util.Map; +import java.util.Objects; import java.util.Set; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; import javax.security.auth.callback.Callback; import javax.security.auth.callback.CallbackHandler; import javax.security.auth.callback.NameCallback; @@ -48,7 +52,7 @@ public class SaslQuorumServerCallbackHandler implements CallbackHandler { private String userName; private final boolean isDigestAuthn; private final Map credentials; - private final Set authzHosts; + private final AtomicReference> authzHostsRef = new AtomicReference<>(Collections.emptySet()); public SaslQuorumServerCallbackHandler( AppConfigurationEntry[] configurationEntries, @@ -82,8 +86,27 @@ public SaslQuorumServerCallbackHandler( this.credentials = Collections.emptyMap(); } - // authorized host lists - this.authzHosts = authzHosts; + setAuthorizedHosts(authzHosts); + } + + void setAuthorizedHosts(Set newHosts) { + if (newHosts == null || newHosts.isEmpty()) { + authzHostsRef.set(Collections.emptySet()); + return; + } + Set normalized = Collections.unmodifiableSet( + newHosts.stream() + .filter(Objects::nonNull) + .map(host -> host.toLowerCase(Locale.ROOT)) + .collect(Collectors.toSet()) + ); + authzHostsRef.set(normalized); + LOG.info("Updated quorum SASL authorized hosts: {}", normalized); + } + + // VisibleForTesting + Set getAuthorizedHostsForTest() { + return authzHostsRef.get(); } public void handle(Callback[] callbacks) throws UnsupportedCallbackException { @@ -137,7 +160,7 @@ private void handleAuthorizeCallback(AuthorizeCallback ac) { if (!isDigestAuthn && authzFlag) { String[] components = authorizationID.split("[/@]"); if (components.length == 3) { - authzFlag = authzHosts.contains(components[1]); + authzFlag = authzHostsRef.get().contains(components[1].toLowerCase(Locale.ROOT)); } else { authzFlag = false; } diff --git a/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java new file mode 100644 index 00000000000..da99cbfb03a --- /dev/null +++ b/zookeeper-server/src/test/java/org/apache/zookeeper/server/quorum/QuorumSaslAuthzZnodeTest.java @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.zookeeper.server.quorum; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import javax.security.sasl.SaslException; +import org.apache.jute.BinaryOutputArchive; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.server.Request; +import org.apache.zookeeper.server.ZKDatabase; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.apache.zookeeper.server.quorum.flexible.QuorumVerifier; +import org.apache.zookeeper.txn.CreateTxn; +import org.apache.zookeeper.txn.DeleteTxn; +import org.apache.zookeeper.txn.MultiTxn; +import org.apache.zookeeper.txn.SetDataTxn; +import org.apache.zookeeper.txn.Txn; +import org.apache.zookeeper.txn.TxnHeader; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +public class QuorumSaslAuthzZnodeTest { + + @TempDir + public File tmpDir; + + @Test + public void testAuthzHostsRefreshFromZnode() throws Exception { + TrackingQuorumPeer peer = createPeer(); + LeaderZooKeeperServer zks = createServer(peer); + String path = peer.getQuorumSaslAuthzZnodePath(); + + // Create the authz znode + TxnHeader createHdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.create); + CreateTxn createTxn = new CreateTxn(path, "HostA,hostB".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + Request createReq = new Request(null, 1, 1, ZooDefs.OpCode.create, null, null); + createReq.setHdr(createHdr); + createReq.setTxn(createTxn); + zks.processTxn(createReq); + + assertEquals(new HashSet<>(Arrays.asList("hosta", "hostb")), peer.getManualSaslAuthzHosts()); + assertEquals(1, peer.getRefreshCalls()); + + // Update the authz znode + TxnHeader setHdr = new TxnHeader(1, 1, 2, 2, ZooDefs.OpCode.setData); + SetDataTxn setDataTxn = new SetDataTxn(path, "hostC".getBytes(StandardCharsets.UTF_8), -1); + Request setReq = new Request(null, 1, 2, ZooDefs.OpCode.setData, null, null); + setReq.setHdr(setHdr); + setReq.setTxn(setDataTxn); + zks.processTxn(setReq); + + assertEquals(new HashSet<>(Arrays.asList("hostc")), peer.getManualSaslAuthzHosts()); + assertEquals(2, peer.getRefreshCalls()); + + // Delete the authz znode + TxnHeader deleteHdr = new TxnHeader(1, 1, 3, 3, ZooDefs.OpCode.delete); + DeleteTxn deleteTxn = new DeleteTxn(path); + Request deleteReq = new Request(null, 1, 3, ZooDefs.OpCode.delete, null, null); + deleteReq.setHdr(deleteHdr); + deleteReq.setTxn(deleteTxn); + zks.processTxn(deleteReq); + + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + assertEquals(3, peer.getRefreshCalls()); + } + + @Test + public void testAuthzHostsRefreshFromMultiTxn() throws Exception { + TrackingQuorumPeer peer = createPeer(); + LeaderZooKeeperServer zks = createServer(peer); + String path = peer.getQuorumSaslAuthzZnodePath(); + + // Multi txn: create parent + create authz znode + CreateTxn parentTxn = new CreateTxn("/zookeeper", new byte[0], + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + CreateTxn authzTxn = new CreateTxn(path, "hostX,hostY".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + + List txns = new ArrayList<>(); + txns.add(new Txn(ZooDefs.OpCode.create, serializeTxn(parentTxn))); + txns.add(new Txn(ZooDefs.OpCode.create, serializeTxn(authzTxn))); + + TxnHeader multiHdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.multi); + MultiTxn multiTxn = new MultiTxn(txns); + Request multiReq = new Request(null, 1, 1, ZooDefs.OpCode.multi, null, null); + multiReq.setHdr(multiHdr); + multiReq.setTxn(multiTxn); + zks.processTxn(multiReq); + + assertEquals(new HashSet<>(Arrays.asList("hostx", "hosty")), peer.getManualSaslAuthzHosts()); + assertEquals(1, peer.getRefreshCalls()); + } + + @Test + public void testUnrelatedZnodeDoesNotTriggerRefresh() throws Exception { + TrackingQuorumPeer peer = createPeer(); + LeaderZooKeeperServer zks = createServer(peer); + + // Create an unrelated znode + TxnHeader createHdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.create); + CreateTxn createTxn = new CreateTxn("/some/other/path", "data".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + Request createReq = new Request(null, 1, 1, ZooDefs.OpCode.create, null, null); + createReq.setHdr(createHdr); + createReq.setTxn(createTxn); + zks.processTxn(createReq); + + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + assertEquals(0, peer.getRefreshCalls()); + } + + private TrackingQuorumPeer createPeer() throws SaslException { + TrackingQuorumPeer peer = new TrackingQuorumPeer(); + peer.setTickTime(2000); + peer.setMinSessionTimeout(4000); + peer.setMaxSessionTimeout(40000); + peer.setInitialConfig("server.1=localhost:2888:3888:participant"); + peer.setQuorumSaslEnabled(true); + peer.setQuorumSaslAuthzZnodeEnabled(true); + peer.setQuorumSaslAuthzZnodePath("/zookeeper/quorumAuthzHosts"); + return peer; + } + + private LeaderZooKeeperServer createServer(TrackingQuorumPeer peer) throws Exception { + FileTxnSnapLog snapLog = new FileTxnSnapLog(tmpDir, tmpDir); + ZKDatabase zkDb = new ZKDatabase(snapLog); + return new LeaderZooKeeperServer(snapLog, peer, zkDb); + } + + @Test + public void testDisabledFeatureDoesNotTriggerRefresh() throws Exception { + TrackingQuorumPeer peer = new TrackingQuorumPeer(); + peer.setTickTime(2000); + peer.setMinSessionTimeout(4000); + peer.setMaxSessionTimeout(40000); + peer.setInitialConfig("server.1=localhost:2888:3888:participant"); + peer.setQuorumSaslEnabled(true); + peer.setQuorumSaslAuthzZnodeEnabled(false); + peer.setQuorumSaslAuthzZnodePath("/zookeeper/quorumAuthzHosts"); + + LeaderZooKeeperServer zks = createServer(peer); + String path = peer.getQuorumSaslAuthzZnodePath(); + + TxnHeader createHdr = new TxnHeader(1, 1, 1, 1, ZooDefs.OpCode.create); + CreateTxn createTxn = new CreateTxn(path, "hostA".getBytes(StandardCharsets.UTF_8), + ZooDefs.Ids.OPEN_ACL_UNSAFE, false, 0); + Request createReq = new Request(null, 1, 1, ZooDefs.OpCode.create, null, null); + createReq.setHdr(createHdr); + createReq.setTxn(createTxn); + zks.processTxn(createReq); + + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + assertEquals(0, peer.getRefreshCalls()); + } + + @Test + public void testParseAuthzHostsEdgeCases() throws Exception { + TrackingQuorumPeer peer = createPeer(); + + peer.setManualSaslAuthzHosts("hostA, hostB\thostC,, hostD"); + assertEquals( + new HashSet<>(Arrays.asList("hosta", "hostb", "hostc", "hostd")), + peer.getManualSaslAuthzHosts()); + + peer.setManualSaslAuthzHosts(" "); + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + + peer.setManualSaslAuthzHosts(null); + assertTrue(peer.getManualSaslAuthzHosts().isEmpty()); + + peer.setManualSaslAuthzHosts("SingleHost"); + assertEquals( + new HashSet<>(Arrays.asList("singlehost")), + peer.getManualSaslAuthzHosts()); + } + + private static byte[] serializeTxn(org.apache.jute.Record record) throws Exception { + try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) { + BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos); + record.serialize(boa, "request"); + return baos.toByteArray(); + } + } + + private static class TrackingQuorumPeer extends QuorumPeer { + private final AtomicInteger refreshCalls = new AtomicInteger(0); + + TrackingQuorumPeer() throws SaslException { + super(); + } + + @Override + public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) { + refreshCalls.incrementAndGet(); + } + + int getRefreshCalls() { + return refreshCalls.get(); + } + + @Override + Set getManualSaslAuthzHosts() { + return super.getManualSaslAuthzHosts(); + } + } + +}