Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
Expand Down Expand Up @@ -793,6 +794,11 @@ public synchronized void setCurrentVote(Vote v) {
*/
protected int quorumCnxnThreadsSize = QUORUM_CNXN_THREADS_SIZE_DEFAULT_VALUE;

private boolean quorumSaslAuthzZnodeEnabled = false;
private String quorumSaslAuthzZnodePath = "";
private final AtomicReference<Set<String>> manualSaslAuthzHosts =
new AtomicReference<>(Collections.emptySet());

public static final String QUORUM_CNXN_TIMEOUT_MS = "zookeeper.quorumCnxnTimeoutMs";
private static int quorumCnxnTimeoutMs;

Expand Down Expand Up @@ -1903,6 +1909,40 @@ private void connectNewPeers(QuorumCnxManager qcm) {
}
}

public void refreshQuorumSaslAuthzHosts(QuorumVerifier... extraQVs) {
if (!(authServer instanceof SaslQuorumAuthServer)) {
return;
}

Set<String> hosts = new HashSet<>();
synchronized (QV_LOCK) {
addHostsFromQV(quorumVerifier, hosts);
addHostsFromQV(lastSeenQuorumVerifier, hosts);
}
Set<String> manualHosts = manualSaslAuthzHosts.get();
if (manualHosts != null) {
hosts.addAll(manualHosts);
}
if (extraQVs != null) {
for (QuorumVerifier qv : extraQVs) {
addHostsFromQV(qv, hosts);
}
}

((SaslQuorumAuthServer) authServer).updateAuthorizedHosts(hosts);
}

private static void addHostsFromQV(QuorumVerifier qv, Set<String> hosts) {
if (qv == null) {
return;
}
for (QuorumServer qs : qv.getAllMembers().values()) {
if (qs != null && qs.hostname != null) {
hosts.add(qs.hostname);
}
}
}

public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
if (!isReconfigEnabled()) {
LOG.info("Dynamic reconfig is disabled, we don't store the last seen config.");
Expand Down Expand Up @@ -1932,6 +1972,7 @@ public void setLastSeenQuorumVerifier(QuorumVerifier qv, boolean writeToDisk) {
return;
}
lastSeenQuorumVerifier = qv;
refreshQuorumSaslAuthzHosts();
if (qcm != null) {
connectNewPeers(qcm);
}
Expand Down Expand Up @@ -1991,6 +2032,7 @@ public QuorumVerifier setQuorumVerifier(QuorumVerifier qv, boolean writeToDisk)
setAddrs(qs.addr, qs.electionAddr, qs.clientAddr);
}
updateObserverMasterList();
refreshQuorumSaslAuthzHosts();
return prevQV;
}
}
Expand Down Expand Up @@ -2597,6 +2639,55 @@ void setQuorumCnxnThreadsSize(int qCnxnThreadsSize) {
LOG.info("quorum.cnxn.threads.size set to {}", quorumCnxnThreadsSize);
}

void setQuorumSaslAuthzZnodeEnabled(boolean enabled) {
quorumSaslAuthzZnodeEnabled = enabled;
}

boolean isQuorumSaslAuthzZnodeEnabled() {
return quorumSaslAuthzZnodeEnabled;
}

void setQuorumSaslAuthzZnodePath(String path) {
if (path == null) {
return;
}
quorumSaslAuthzZnodePath = path.trim();
}

String getQuorumSaslAuthzZnodePath() {
return quorumSaslAuthzZnodePath;
}

void setManualSaslAuthzHosts(String hostsCsv) {
manualSaslAuthzHosts.set(parseAuthzHosts(hostsCsv));
}

void clearManualSaslAuthzHosts() {
manualSaslAuthzHosts.set(Collections.emptySet());
}

// VisibleForTesting
Set<String> getManualSaslAuthzHosts() {
return manualSaslAuthzHosts.get();
}

private static Set<String> parseAuthzHosts(String hostsCsv) {
if (hostsCsv == null) {
return Collections.emptySet();
}
String trimmed = hostsCsv.trim();
if (trimmed.isEmpty()) {
return Collections.emptySet();
}
Set<String> hosts = new HashSet<>();
for (String token : trimmed.split("[,\\s]+")) {
if (!token.isEmpty()) {
hosts.add(token.toLowerCase(Locale.ROOT));
}
}
return Collections.unmodifiableSet(hosts);
}

boolean isQuorumSaslAuthEnabled() {
return quorumSaslEnableAuth;
}
Expand Down Expand Up @@ -2700,6 +2791,8 @@ public static QuorumPeer createFromConfig(QuorumPeerConfig config) throws IOExce
quorumPeer.setQuorumLearnerLoginContext(config.quorumLearnerLoginContext);
}
quorumPeer.setQuorumCnxnThreadsSize(config.quorumCnxnThreadsSize);
quorumPeer.setQuorumSaslAuthzZnodeEnabled(config.quorumSaslAuthzZnodeEnabled);
quorumPeer.setQuorumSaslAuthzZnodePath(config.quorumSaslAuthzZnodePath);

if (config.jvmPauseMonitorToRun) {
quorumPeer.setJvmPauseMonitor(new JvmPauseMonitor(config));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ public class QuorumPeerConfig {
protected String quorumLearnerLoginContext = QuorumAuth.QUORUM_LEARNER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected String quorumServerLoginContext = QuorumAuth.QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE;
protected int quorumCnxnThreadsSize;
protected boolean quorumSaslAuthzZnodeEnabled = false;
protected String quorumSaslAuthzZnodePath = "";

// multi address related configs
private boolean multiAddressEnabled = Boolean.parseBoolean(
Expand Down Expand Up @@ -362,6 +364,10 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
quorumServerLoginContext = value;
} else if (key.equals(QuorumAuth.QUORUM_KERBEROS_SERVICE_PRINCIPAL)) {
quorumServicePrincipal = value;
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_ENABLED)) {
quorumSaslAuthzZnodeEnabled = parseBoolean(key, value);
} else if (key.equals(QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_PATH)) {
quorumSaslAuthzZnodePath = value;
} else if (key.equals("quorum.cnxn.threads.size")) {
quorumCnxnThreadsSize = Integer.parseInt(value);
} else if (key.equals(JvmPauseMonitor.INFO_THRESHOLD_KEY)) {
Expand Down Expand Up @@ -390,6 +396,13 @@ public void parseProperties(Properties zkProp) throws IOException, ConfigExcepti
}
}

if (quorumSaslAuthzZnodeEnabled && quorumSaslAuthzZnodePath.trim().isEmpty()) {
quorumSaslAuthzZnodePath = QuorumAuth.QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH;
}
if (!quorumSaslAuthzZnodeEnabled && !quorumSaslAuthzZnodePath.trim().isEmpty()) {
quorumSaslAuthzZnodeEnabled = true;
}

if (!quorumEnableSasl && quorumServerRequireSasl) {
throw new IllegalArgumentException(QuorumAuth.QUORUM_SASL_AUTH_ENABLED
+ " is disabled, so cannot enable "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,23 @@

import java.io.IOException;
import java.io.PrintWriter;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BiConsumer;
import java.util.stream.Collectors;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.MultiOperationRecord;
import org.apache.zookeeper.Op;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooDefs.OpCode;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.metrics.MetricsContext;
import org.apache.zookeeper.proto.CreateRequest;
import org.apache.zookeeper.server.DataTree.ProcessTxnResult;
import org.apache.zookeeper.server.Request;
import org.apache.zookeeper.server.RequestRecord;
import org.apache.zookeeper.server.ServerMetrics;
Expand All @@ -46,6 +53,7 @@ public abstract class QuorumZooKeeperServer extends ZooKeeperServer {

public final QuorumPeer self;
protected UpgradeableSessionTracker upgradeableSessionTracker;
private final AtomicBoolean authzAclWarningLogged = new AtomicBoolean(false);

protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int minSessionTimeout,
int maxSessionTimeout, int listenBacklog, ZKDatabase zkDb, QuorumPeer self) {
Expand All @@ -54,6 +62,12 @@ protected QuorumZooKeeperServer(FileTxnSnapLog logFactory, int tickTime, int min
this.self = self;
}

@Override
public synchronized void startup() {
super.startup();
refreshAuthzHostsFromZnode();
}

@Override
protected void startSessionTracker() {
upgradeableSessionTracker = (UpgradeableSessionTracker) sessionTracker;
Expand Down Expand Up @@ -213,4 +227,122 @@ public void dumpMonitorValues(BiConsumer<String, Object> response) {
response.accept("peer_state", self.getDetailedPeerState());
}

@Override
public ProcessTxnResult processTxn(Request request) {
ProcessTxnResult rc = super.processTxn(request);
maybeRefreshAuthzHostsFromTxn(rc);
return rc;
}

private void maybeRefreshAuthzHostsFromTxn(ProcessTxnResult rc) {
if (rc == null) {
return;
}
if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) {
return;
}
String znodePath = self.getQuorumSaslAuthzZnodePath();
if (znodePath == null || znodePath.isEmpty()) {
return;
}

if (rc.multiResult != null) {
for (ProcessTxnResult sub : rc.multiResult) {
if (isAuthzZnodeTxn(sub, znodePath)) {
if (sub.type == OpCode.delete) {
clearAuthzHostsFromZnode();
} else {
refreshAuthzHostsFromZnode();
}
return;
}
}
return;
}

if (isAuthzZnodeTxn(rc, znodePath)) {
LOG.info("Authz znode txn applied: type={}, path={}", rc.type, rc.path);
if (rc.type == OpCode.delete) {
clearAuthzHostsFromZnode();
} else {
refreshAuthzHostsFromZnode();
}
}
}

private static boolean isAuthzZnodeTxn(ProcessTxnResult rc, String znodePath) {
if (rc == null || rc.path == null) {
return false;
}
if (!znodePath.equals(rc.path)) {
return false;
}
return rc.type == OpCode.create
|| rc.type == OpCode.create2
|| rc.type == OpCode.createContainer
|| rc.type == OpCode.createTTL
|| rc.type == OpCode.setData
|| rc.type == OpCode.delete;
}

private void refreshAuthzHostsFromZnode() {
if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) {
return;
}
String path = self.getQuorumSaslAuthzZnodePath();
if (path == null || path.isEmpty()) {
return;
}
try {
if (hasWorldWritableAcl(path) && authzAclWarningLogged.compareAndSet(false, true)) {
LOG.warn("Authz znode {} has world-writable ACL. "
+ "This is a security risk: any client can modify the quorum authorization host list. "
+ "Set restrictive ACLs on this znode.", path);
}
byte[] data = getZKDatabase().getDataTree().getData(path, new Stat(), null);
if (data == null) {
LOG.info("Authz znode read returned null data for {}", path);
return;
}
if (data.length == 0) {
LOG.info("Authz znode read returned empty data for {}", path);
self.clearManualSaslAuthzHosts();
} else {
LOG.info("Authz znode read {} bytes for {}", data.length, path);
self.setManualSaslAuthzHosts(new String(data, StandardCharsets.UTF_8));
}
} catch (KeeperException.NoNodeException e) {
LOG.info("Authz znode missing at {}", path);
return;
} catch (Exception e) {
LOG.warn("Failed to refresh quorum SASL authz hosts from znode {}", path, e);
return;
}

self.refreshQuorumSaslAuthzHosts();
}

private boolean hasWorldWritableAcl(String path) {
try {
List<ACL> acls = getZKDatabase().getDataTree().getACL(path, new Stat());
for (ACL acl : acls) {
if (ZooDefs.Ids.ANYONE_ID_UNSAFE.equals(acl.getId())
&& (acl.getPerms() & ZooDefs.Perms.WRITE) != 0) {
return true;
}
}
} catch (KeeperException.NoNodeException e) {
// node doesn't exist yet
}
return false;
}

private void clearAuthzHostsFromZnode() {
if (!self.isQuorumSaslAuthEnabled() || !self.isQuorumSaslAuthzZnodeEnabled()) {
return;
}
self.clearManualSaslAuthzHosts();
self.refreshQuorumSaslAuthzHosts();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,10 @@ public class QuorumAuth {
public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT = "quorum.auth.server.saslLoginContext";
public static final String QUORUM_SERVER_SASL_LOGIN_CONTEXT_DFAULT_VALUE = "QuorumServer";

public static final String QUORUM_SASL_AUTHZ_ZNODE_ENABLED = "quorum.auth.sasl.authzZnode.enabled";
public static final String QUORUM_SASL_AUTHZ_ZNODE_PATH = "quorum.auth.sasl.authzZnode.path";
public static final String QUORUM_SASL_AUTHZ_ZNODE_DEFAULT_PATH = "/zookeeper/quorumAuthzHosts";

static final String QUORUM_SERVER_PROTOCOL_NAME = "zookeeper-quorum";
static final String QUORUM_SERVER_SASL_DIGEST = "zk-quorum-sasl-md5";
static final String QUORUM_AUTH_MESSAGE_TAG = "qpconnect";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,9 @@
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import javax.security.auth.callback.CallbackHandler;
import javax.security.auth.login.AppConfigurationEntry;
Expand All @@ -47,9 +49,11 @@ public class SaslQuorumAuthServer implements QuorumAuthServer {
private static final int MAX_RETRIES = 5;
private final Login serverLogin;
private final boolean quorumRequireSasl;
private final AtomicReference<Set<String>> currentAuthzHosts;

public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<String> authzHosts) throws SaslException {
this.quorumRequireSasl = quorumRequireSasl;
this.currentAuthzHosts = new AtomicReference<>(authzHosts != null ? authzHosts : Collections.emptySet());
try {
AppConfigurationEntry[] entries = Configuration.getConfiguration().getAppConfigurationEntry(loginContext);
if (entries == null || entries.length == 0) {
Expand All @@ -58,7 +62,7 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<
loginContext));
}
Supplier<CallbackHandler> callbackSupplier = () -> {
return new SaslQuorumServerCallbackHandler(entries, authzHosts);
return new SaslQuorumServerCallbackHandler(entries, currentAuthzHosts.get());
};
serverLogin = new Login(loginContext, callbackSupplier, new ZKConfig());
serverLogin.startThreadIfNeeded();
Expand All @@ -67,6 +71,12 @@ public SaslQuorumAuthServer(boolean quorumRequireSasl, String loginContext, Set<
}
}

public void updateAuthorizedHosts(Set<String> authzHosts) {
if (authzHosts != null) {
currentAuthzHosts.set(authzHosts);
}
}

@Override
public void authenticate(Socket sock, DataInputStream din) throws SaslException {
DataOutputStream dout = null;
Expand Down
Loading
Loading