Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 28 additions & 21 deletions core/src/main/scala/org/apache/spark/SecurityManager.scala
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ private[spark] class SecurityManager(
* making UI requests.
*/
def checkAdminPermissions(user: String): Boolean = {
isUserInACL(user, adminAcls, adminAclsGroups)
checkApplicationViewPermissions(user, aclsEnabled(), adminAcls, adminAclsGroups, sparkConf)
}

/**
Expand All @@ -239,7 +239,7 @@ private[spark] class SecurityManager(
def checkUIViewPermissions(user: String): Boolean = {
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
viewAcls.mkString(",") + " viewAclsGroups=" + viewAclsGroups.mkString(","))
isUserInACL(user, viewAcls, viewAclsGroups)
checkApplicationViewPermissions(user, aclsEnabled(), viewAcls, viewAclsGroups, sparkConf)
}

/**
Expand All @@ -255,7 +255,7 @@ private[spark] class SecurityManager(
def checkModifyPermissions(user: String): Boolean = {
logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
modifyAcls.mkString(",") + " modifyAclsGroups=" + modifyAclsGroups.mkString(","))
isUserInACL(user, modifyAcls, modifyAclsGroups)
checkApplicationViewPermissions(user, aclsEnabled(), modifyAcls, modifyAclsGroups, sparkConf)
}

/**
Expand Down Expand Up @@ -371,29 +371,14 @@ private[spark] class SecurityManager(
}
}

private def isUserInACL(
user: String,
aclUsers: Set[String],
aclGroups: Set[String]): Boolean = {
if (user == null ||
!aclsEnabled ||
aclUsers.contains(WILDCARD_ACL) ||
aclUsers.contains(user) ||
aclGroups.contains(WILDCARD_ACL)) {
true
} else {
val userGroups = Utils.getCurrentUserGroups(sparkConf, user)
logDebug(s"user $user is in groups ${userGroups.mkString(",")}")
aclGroups.exists(userGroups.contains(_))
}
}

// Default SecurityManager only has a single secret key, so ignore appId.
override def getSaslUser(appId: String): String = getSaslUser()
override def getSecretKey(appId: String): String = getSecretKey()
}

private[spark] object SecurityManager {
private[spark] object SecurityManager extends Logging {
// allow all users/groups to have view/modify permissions
val WILDCARD_ACL = "*"

val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
Expand All @@ -403,4 +388,26 @@ private[spark] object SecurityManager {

// key used to store the spark secret in the Hadoop UGI
val SECRET_LOOKUP_KEY = new Text("sparkCookie")

def checkApplicationViewPermissions(
user: String,
aclsEnabled: Boolean,
usersAcls: Set[String],
groupAcls: Set[String],
conf: SparkConf): Boolean = {
if (!aclsEnabled || user == null || usersAcls.contains(user) ||
usersAcls.contains(WILDCARD_ACL) || groupAcls.contains(WILDCARD_ACL)) {
return true
}
val currentUserGroups = Utils.getCurrentUserGroups(conf, user)
logDebug("userGroups=" + currentUserGroups.mkString(","))
groupAcls.exists(currentUserGroups.contains)
}

/**
* Split a comma separated String, filter out any empty items, and return a Set of strings
*/
def stringToSet(list: String): Set[String] = {
list.split(',').map(_.trim).filter(!_.isEmpty).toSet
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private[history] class ApplicationCache(
}
}
try {
val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed)
val completed = loadedUI.ui.getApplicationInfoList(None).exists(_.attempts.last.completed)
if (!completed) {
// incomplete UIs have the cache-check filter put in front of them.
registerFilter(new CacheKey(appId, attemptId), loadedUI)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,16 +97,18 @@ private[history] abstract class ApplicationHistoryProvider {
*
* @return List of all know applications.
*/
def getListing(): Iterator[ApplicationInfo]
def getListing(user: Option[String]): Iterator[ApplicationInfo]

/**
* Returns a list of applications available for the history server to show.
*
* @param user The user try to list
* @param max The maximum number of applications to return
* @param predicate A function that filters the applications to be returned
* @return An iterator of matching applications up to the specified maximum
*/
def getListing(max: Int)(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
def getListing(user: Option[String], max: Int)
(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]

/**
* Returns the Spark UI for a specific application.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
private val logDir = conf.get(History.HISTORY_LOG_DIR)

private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE)
private val historyUiAclsFilterListEnabled = conf.get(HISTORY_SERVER_UI_ACLS_FILTER_LIST)
private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS)
private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS)
logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") +
Expand Down Expand Up @@ -294,18 +295,47 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
}
}

override def getListing(): Iterator[ApplicationInfo] = {
// Return the listing in end time descending order.
KVUtils.mapToSeq(listing.view(classOf[ApplicationInfoWrapper])
.index("endTime").reverse())(_.toApplicationInfo()).iterator
override def getListing(user: Option[String]): Iterator[ApplicationInfo] = {
KVUtils.viewToSeq(
listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(),
Int.MaxValue
) { appInfo => isAuthorized(user, appInfo) }
.map(_.toApplicationInfo())
.iterator
}

override def getListing(max: Int)(
predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
// Return the filtered listing in end time descending order.
KVUtils.mapToSeqWithFilter(
listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(),
max)(_.toApplicationInfo())(predicate).iterator
override def getListing(user: Option[String], max: Int)(
predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
KVUtils.viewToSeq(
listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(),
max
) { appInfo => isAuthorized(user, appInfo) && predicate(appInfo.toApplicationInfo()) }
.map(_.toApplicationInfo())
.iterator
}

/** Returns true if the given user is allowed to view the application. */
private def isAuthorized(user: Option[String], appInfo: ApplicationInfoWrapper): Boolean = {
// If ACL-based list filtering is disabled, show all applications
if (!historyUiAclsFilterListEnabled) {
return true
}

val attempt = appInfo.attempts.last
val usersAcls = Set(attempt.info.sparkUser) ++ SecurityManager.stringToSet(
historyUiAdminAcls.mkString(",") + "," + attempt.adminAcls.getOrElse("") + "," +
attempt.viewAcls.getOrElse(""))
val groupAcls = Set(attempt.info.sparkUser) ++ SecurityManager.stringToSet(
historyUiAdminAclsGroups.mkString(",") + "," +
attempt.adminAclsGroups.getOrElse("") + "," +
attempt.viewAclsGroups.getOrElse(""))
SecurityManager.checkApplicationViewPermissions(
user.orNull,
historyUiAclsEnable,
usersAcls,
groupAcls,
this.conf
)
}

override def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
val requestedIncomplete = Option(request.getParameter("showIncomplete"))
.getOrElse("false").toBoolean

val displayApplications = shouldDisplayApplications(requestedIncomplete)
val displayApplications = shouldDisplayApplications(Option(request.getRemoteUser),
requestedIncomplete)
val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
val lastUpdatedTime = parent.getLastUpdatedTime()
val providerConfig = parent.getProviderConfig()
Expand Down Expand Up @@ -90,8 +91,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
UIUtils.basicSparkPage(request, content, "History Server", true)
}

def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = {
parent.getApplicationInfoList(1)(isApplicationCompleted(_) != requestedIncomplete).nonEmpty
def shouldDisplayApplications(user: Option[String], requestedIncomplete: Boolean): Boolean = {
parent.getApplicationInfoList(user, 1)(isApplicationCompleted(_) !=
requestedIncomplete).nonEmpty
}

private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ class HistoryServer(
*
* @return List of all known applications.
*/
def getApplicationList(): Iterator[ApplicationInfo] = {
provider.getListing()
def getApplicationList(user: Option[String]): Iterator[ApplicationInfo] = {
provider.getListing(user: Option[String])
}

def getEventLogsUnderProcess(): Int = {
Expand All @@ -216,13 +216,13 @@ class HistoryServer(
provider.getLastUpdatedTime()
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
getApplicationList()
def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo] = {
getApplicationList(user: Option[String])
}

override def getApplicationInfoList(max: Int)(
override def getApplicationInfoList(user: Option[String], max: Int)(
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
provider.getListing(max)(filter)
provider.getListing(user, max)(filter)
}

def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,12 @@ private[spark] object History {
.booleanConf
.createWithDefault(false)

val HISTORY_SERVER_UI_ACLS_FILTER_LIST = ConfigBuilder("spark.history.ui.acls.filterList")
.doc("Enable filtering of application list based on ACLs.")
.version("3.5.4")
.booleanConf
.createWithDefault(false)

val HISTORY_SERVER_UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
.version("2.1.1")
.stringConf
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,9 @@ private[spark] trait UIRoot {
*/
def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T

def getApplicationInfoList: Iterator[ApplicationInfo]
def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo]

def getApplicationInfoList(max: Int)(
def getApplicationInfoList(user: Option[String], max: Int)(
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo]

def getApplicationInfo(appId: String): Option[ApplicationInfo]
Expand Down Expand Up @@ -125,6 +125,7 @@ private[v1] trait ApiRequestContext {

def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)

def remoteUser: Option[String] = Option(httpRequest.getRemoteUser)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext {
val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED)
val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING)

uiRoot.getApplicationInfoList(numApps) { app =>
uiRoot.getApplicationInfoList(remoteUser, numApps) { app =>
val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed
// if any attempt is still running, we consider the app to also still be running;
// keep the app if *any* attempts fall in the right time window
Expand Down
8 changes: 4 additions & 4 deletions core/src/main/scala/org/apache/spark/ui/SparkUI.scala
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private[spark] class SparkUI private (
securityManager.checkUIViewPermissions(user)
}

def getApplicationInfoList: Iterator[ApplicationInfo] = {
def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo] = {
Iterator(new ApplicationInfo(
id = appId,
name = appName,
Expand All @@ -192,13 +192,13 @@ private[spark] class SparkUI private (
))
}

override def getApplicationInfoList(max: Int)(
override def getApplicationInfoList(user: Option[String], max: Int)(
filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = {
getApplicationInfoList.filter(filter).take(max)
getApplicationInfoList(user).filter(filter).take(max)
}

def getApplicationInfo(appId: String): Option[ApplicationInfo] = {
getApplicationInfoList.find(_.id == appId)
getApplicationInfoList(None).find(_.id == appId)
}

def getStreamingJobProgressListener: Option[SparkListener] = streamingJobProgressListener
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ class ApplicationCacheSuite extends SparkFunSuite with MockitoSugar with Matcher
Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended),
new Date(ended), ended - started, "user", completed, org.apache.spark.SPARK_VERSION)))
val ui = mock[SparkUI]
when(ui.getApplicationInfoList).thenReturn(List(info).iterator)
when(ui.getApplicationInfoList(any[Option[String]])).thenReturn(List(info).iterator)
when(ui.getAppName).thenReturn(name)
when(ui.appName).thenReturn(name)
val handler = new ServletContextHandler()
Expand Down
Loading
Loading