From a11c0a138f3a441988b9a6d4e85249fb8377ec87 Mon Sep 17 00:00:00 2001
From: Petr Fedchenkov
Date: Mon, 4 May 2026 14:06:51 +0300
Subject: [PATCH 1/2] [SPARK-23782] SHS should list only applications with read permissions

Pulled from closed PR https://github.com/apache/spark/pull/20891

Signed-off-by: Petr Fedchenkov
---
 .../org/apache/spark/SecurityManager.scala    | 49 +++++++++++--------
 .../deploy/history/ApplicationCache.scala     |  2 +-
 .../history/ApplicationHistoryProvider.scala  |  6 ++-
 .../deploy/history/FsHistoryProvider.scala    | 44 +++++++++++++----
 .../spark/deploy/history/HistoryPage.scala    |  8 +--
 .../spark/deploy/history/HistoryServer.scala  | 12 ++---
 .../spark/status/api/v1/ApiRootResource.scala |  5 +-
 .../api/v1/ApplicationListResource.scala      |  2 +-
 .../scala/org/apache/spark/ui/SparkUI.scala   |  8 +--
 .../history/ApplicationCacheSuite.scala       |  2 +-
 .../history/FsHistoryProviderSuite.scala      | 30 ++++++------
 .../history/HistoryServerPageSuite.scala      |  2 +-
 .../deploy/history/HistoryServerSuite.scala   | 48 +++++++++++++++---
 13 files changed, 143 insertions(+), 75 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SecurityManager.scala b/core/src/main/scala/org/apache/spark/SecurityManager.scala
index 7e72ae8d89e37..926c95d2c5a08 100644
--- a/core/src/main/scala/org/apache/spark/SecurityManager.scala
+++ b/core/src/main/scala/org/apache/spark/SecurityManager.scala
@@ -223,7 +223,7 @@ private[spark] class SecurityManager(
    * making UI requests.
    */
   def checkAdminPermissions(user: String): Boolean = {
-    isUserInACL(user, adminAcls, adminAclsGroups)
+    checkApplicationViewPermissions(user, aclsEnabled(), adminAcls, adminAclsGroups, sparkConf)
   }
 
   /**
@@ -239,7 +239,7 @@
   def checkUIViewPermissions(user: String): Boolean = {
     logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " viewAcls=" +
       viewAcls.mkString(",") + " viewAclsGroups=" + viewAclsGroups.mkString(","))
-    isUserInACL(user, viewAcls, viewAclsGroups)
+    checkApplicationViewPermissions(user, aclsEnabled(), viewAcls, viewAclsGroups, sparkConf)
   }
 
   /**
@@ -255,7 +255,7 @@
   def checkModifyPermissions(user: String): Boolean = {
     logDebug("user=" + user + " aclsEnabled=" + aclsEnabled() + " modifyAcls=" +
       modifyAcls.mkString(",") + " modifyAclsGroups=" + modifyAclsGroups.mkString(","))
-    isUserInACL(user, modifyAcls, modifyAclsGroups)
+    checkApplicationViewPermissions(user, aclsEnabled(), modifyAcls, modifyAclsGroups, sparkConf)
   }
 
   /**
@@ -371,29 +371,14 @@ private[spark] class SecurityManager(
     }
   }
 
-  private def isUserInACL(
-      user: String,
-      aclUsers: Set[String],
-      aclGroups: Set[String]): Boolean = {
-    if (user == null ||
-        !aclsEnabled ||
-        aclUsers.contains(WILDCARD_ACL) ||
-        aclUsers.contains(user) ||
-        aclGroups.contains(WILDCARD_ACL)) {
-      true
-    } else {
-      val userGroups = Utils.getCurrentUserGroups(sparkConf, user)
-      logDebug(s"user $user is in groups ${userGroups.mkString(",")}")
-      aclGroups.exists(userGroups.contains(_))
-    }
-  }
-
   // Default SecurityManager only has a single secret key, so ignore appId.
   override def getSaslUser(appId: String): String = getSaslUser()
   override def getSecretKey(appId: String): String = getSecretKey()
 }
 
-private[spark] object SecurityManager {
+private[spark] object SecurityManager extends Logging {
+  // allow all users/groups to have view/modify permissions
+  val WILDCARD_ACL = "*"
 
   val SPARK_AUTH_CONF = NETWORK_AUTH_ENABLED.key
   val SPARK_AUTH_SECRET_CONF = AUTH_SECRET.key
@@ -403,4 +388,26 @@ private[spark] object SecurityManager {
 
   // key used to store the spark secret in the Hadoop UGI
   val SECRET_LOOKUP_KEY = new Text("sparkCookie")
+
+  def checkApplicationViewPermissions(
+      user: String,
+      aclsEnabled: Boolean,
+      usersAcls: Set[String],
+      groupAcls: Set[String],
+      conf: SparkConf): Boolean = {
+    if (!aclsEnabled || user == null || usersAcls.contains(user) ||
+        usersAcls.contains(WILDCARD_ACL) || groupAcls.contains(WILDCARD_ACL)) {
+      return true
+    }
+    val currentUserGroups = Utils.getCurrentUserGroups(conf, user)
+    logDebug("userGroups=" + currentUserGroups.mkString(","))
+    groupAcls.exists(currentUserGroups.contains)
+  }
+
+  /**
+   * Split a comma-separated String, filter out any empty items, and return a Set of strings
+   */
+  def stringToSet(list: String): Set[String] = {
+    list.split(',').map(_.trim).filter(!_.isEmpty).toSet
+  }
 }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
index 909f5ea937cee..94c15e3c846c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationCache.scala
@@ -173,7 +173,7 @@ private[history] class ApplicationCache(
       }
     }
     try {
-      val completed = loadedUI.ui.getApplicationInfoList.exists(_.attempts.last.completed)
+      val completed = loadedUI.ui.getApplicationInfoList(None).exists(_.attempts.last.completed)
       if (!completed) {
         // incomplete UIs have the cache-check filter put in front of them.
         registerFilter(new CacheKey(appId, attemptId), loadedUI)
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
index 89f0d12935ce1..22050a96c8022 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala
@@ -97,16 +97,18 @@ private[history] abstract class ApplicationHistoryProvider {
    *
    * @return List of all know applications.
    */
-  def getListing(): Iterator[ApplicationInfo]
+  def getListing(user: Option[String]): Iterator[ApplicationInfo]
 
   /**
    * Returns a list of applications available for the history server to show.
    *
+   * @param user The user requesting the application listing
    * @param max The maximum number of applications to return
    * @param predicate A function that filters the applications to be returned
    * @return An iterator of matching applications up to the specified maximum
    */
-  def getListing(max: Int)(predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
+  def getListing(user: Option[String], max: Int)
+    (predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo]
 
   /**
    * Returns the Spark UI for a specific application.
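For reference, the per-application permission check that the providers will now call reduces to the following self-contained sketch. It mirrors checkApplicationViewPermissions above, but the names mayView and groupsOf are illustrative only: the group lookup is passed in as a plain function instead of going through Utils.getCurrentUserGroups and a SparkConf, the example users match those used in the new HistoryServerSuite test later in this patch, and the "analysts" group is made up.

    object AclCheckSketch {
      // Same wildcard as SecurityManager.WILDCARD_ACL: "*" grants access to everyone.
      val WILDCARD_ACL = "*"

      // Same decision order as checkApplicationViewPermissions, with the group lookup
      // supplied as a function so the sketch needs no SparkConf or Hadoop group mapping.
      def mayView(
          user: String,
          aclsEnabled: Boolean,
          userAcls: Set[String],
          groupAcls: Set[String],
          groupsOf: String => Set[String]): Boolean = {
        if (!aclsEnabled || user == null || userAcls.contains(user) ||
            userAcls.contains(WILDCARD_ACL) || groupAcls.contains(WILDCARD_ACL)) {
          true
        } else {
          groupAcls.exists(groupsOf(user).contains)
        }
      }

      def main(args: Array[String]): Unit = {
        val groups = Map("sam" -> Set("analysts")).withDefaultValue(Set.empty[String])
        // Arguments: (user, aclsEnabled, userAcls, groupAcls, groupsOf)
        println(mayView("anyone", false, Set.empty, Set.empty, groups))        // true: ACLs disabled
        println(mayView("irashid", true, Set("irashid"), Set.empty, groups))   // true: named user ACL
        println(mayView("sam", true, Set("irashid"), Set("analysts"), groups)) // true: via group ACL
        println(mayView("eve", true, Set("irashid"), Set("analysts"), groups)) // false: no match
      }
    }

The provider-side isAuthorized helper in FsHistoryProvider (next file in this patch) always adds the application's owner to the user ACLs before calling this check, so owners keep seeing their own applications even when no explicit view ACLs were recorded.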
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index 7554bb7618911..fa394514aa127 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -294,18 +294,42 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) } } - override def getListing(): Iterator[ApplicationInfo] = { - // Return the listing in end time descending order. - KVUtils.mapToSeq(listing.view(classOf[ApplicationInfoWrapper]) - .index("endTime").reverse())(_.toApplicationInfo()).iterator + override def getListing(user: Option[String]): Iterator[ApplicationInfo] = { + KVUtils.viewToSeq( + listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(), + Int.MaxValue + ) { appInfo => isAuthorized(user, appInfo) } + .map(_.toApplicationInfo()) + .iterator } - override def getListing(max: Int)( - predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { - // Return the filtered listing in end time descending order. - KVUtils.mapToSeqWithFilter( - listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(), - max)(_.toApplicationInfo())(predicate).iterator + override def getListing(user: Option[String], max: Int)( + predicate: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { + KVUtils.viewToSeq( + listing.view(classOf[ApplicationInfoWrapper]).index("endTime").reverse(), + max + ) { appInfo => isAuthorized(user, appInfo) && predicate(appInfo.toApplicationInfo()) } + .map(_.toApplicationInfo()) + .iterator + } + + /** Returns true if the given user is allowed to view the application. */ + private def isAuthorized(user: Option[String], appInfo: ApplicationInfoWrapper): Boolean = { + val attempt = appInfo.attempts.last + val usersAcls = Set(attempt.info.sparkUser) ++ SecurityManager.stringToSet( + historyUiAdminAcls.mkString(",") + "," + attempt.adminAcls.getOrElse("") + "," + + attempt.viewAcls.getOrElse("")) + val groupAcls = Set(attempt.info.sparkUser) ++ SecurityManager.stringToSet( + historyUiAdminAclsGroups.mkString(",") + "," + + attempt.adminAclsGroups.getOrElse("") + "," + + attempt.viewAclsGroups.getOrElse("")) + SecurityManager.checkApplicationViewPermissions( + user.orNull, + historyUiAclsEnable, + usersAcls, + groupAcls, + this.conf + ) } override def getApplicationInfo(appId: String): Option[ApplicationInfo] = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala index dd4921207c961..c3cda81eb0319 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala @@ -30,7 +30,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") val requestedIncomplete = Option(request.getParameter("showIncomplete")) .getOrElse("false").toBoolean - val displayApplications = shouldDisplayApplications(requestedIncomplete) + val displayApplications = shouldDisplayApplications(Option(request.getRemoteUser), + requestedIncomplete) val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess() val lastUpdatedTime = parent.getLastUpdatedTime() val providerConfig = parent.getProviderConfig() @@ -90,8 +91,9 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("") UIUtils.basicSparkPage(request, 
content, "History Server", true) } - def shouldDisplayApplications(requestedIncomplete: Boolean): Boolean = { - parent.getApplicationInfoList(1)(isApplicationCompleted(_) != requestedIncomplete).nonEmpty + def shouldDisplayApplications(user: Option[String], requestedIncomplete: Boolean): Boolean = { + parent.getApplicationInfoList(user, 1)(isApplicationCompleted(_) != + requestedIncomplete).nonEmpty } private def makePageLink(request: HttpServletRequest, showIncomplete: Boolean): String = { diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala index 97425ba2339fd..58add02ae56af 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala @@ -204,8 +204,8 @@ class HistoryServer( * * @return List of all known applications. */ - def getApplicationList(): Iterator[ApplicationInfo] = { - provider.getListing() + def getApplicationList(user: Option[String]): Iterator[ApplicationInfo] = { + provider.getListing(user: Option[String]) } def getEventLogsUnderProcess(): Int = { @@ -216,13 +216,13 @@ class HistoryServer( provider.getLastUpdatedTime() } - def getApplicationInfoList: Iterator[ApplicationInfo] = { - getApplicationList() + def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo] = { + getApplicationList(user: Option[String]) } - override def getApplicationInfoList(max: Int)( + override def getApplicationInfoList(user: Option[String], max: Int)( filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { - provider.getListing(max)(filter) + provider.getListing(user, max)(filter) } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala index 44db9f2eec53e..a71ac2dc33442 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApiRootResource.scala @@ -81,9 +81,9 @@ private[spark] trait UIRoot { */ def withSparkUI[T](appId: String, attemptId: Option[String])(fn: SparkUI => T): T - def getApplicationInfoList: Iterator[ApplicationInfo] + def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo] - def getApplicationInfoList(max: Int)( + def getApplicationInfoList(user: Option[String], max: Int)( filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] def getApplicationInfo(appId: String): Option[ApplicationInfo] @@ -125,6 +125,7 @@ private[v1] trait ApiRequestContext { def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext) + def remoteUser: Option[String] = Option(httpRequest.getRemoteUser) } /** diff --git a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala index 04a25a818c3c6..dbd3ee63aff3a 100644 --- a/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala +++ b/core/src/main/scala/org/apache/spark/status/api/v1/ApplicationListResource.scala @@ -37,7 +37,7 @@ private[v1] class ApplicationListResource extends ApiRequestContext { val includeCompleted = status.isEmpty || status.contains(ApplicationStatus.COMPLETED) val includeRunning = status.isEmpty || status.contains(ApplicationStatus.RUNNING) - uiRoot.getApplicationInfoList(numApps) 
{ app => + uiRoot.getApplicationInfoList(remoteUser, numApps) { app => val anyRunning = app.attempts.isEmpty || !app.attempts.head.completed // if any attempt is still running, we consider the app to also still be running; // keep the app if *any* attempts fall in the right time window diff --git a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala index 4cbedc057c16b..d593003daeddb 100644 --- a/core/src/main/scala/org/apache/spark/ui/SparkUI.scala +++ b/core/src/main/scala/org/apache/spark/ui/SparkUI.scala @@ -171,7 +171,7 @@ private[spark] class SparkUI private ( securityManager.checkUIViewPermissions(user) } - def getApplicationInfoList: Iterator[ApplicationInfo] = { + def getApplicationInfoList(user: Option[String]): Iterator[ApplicationInfo] = { Iterator(new ApplicationInfo( id = appId, name = appName, @@ -192,13 +192,13 @@ private[spark] class SparkUI private ( )) } - override def getApplicationInfoList(max: Int)( + override def getApplicationInfoList(user: Option[String], max: Int)( filter: ApplicationInfo => Boolean): Iterator[ApplicationInfo] = { - getApplicationInfoList.filter(filter).take(max) + getApplicationInfoList(user).filter(filter).take(max) } def getApplicationInfo(appId: String): Option[ApplicationInfo] = { - getApplicationInfoList.find(_.id == appId) + getApplicationInfoList(None).find(_.id == appId) } def getStreamingJobProgressListener: Option[SparkListener] = streamingJobProgressListener diff --git a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala index 25d668ad75ccf..f108aaf0b7112 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/ApplicationCacheSuite.scala @@ -132,7 +132,7 @@ class ApplicationCacheSuite extends SparkFunSuite with MockitoSugar with Matcher Seq(new AttemptInfo(attemptId, new Date(started), new Date(ended), new Date(ended), ended - started, "user", completed, org.apache.spark.SPARK_VERSION))) val ui = mock[SparkUI] - when(ui.getApplicationInfoList).thenReturn(List(info).iterator) + when(ui.getApplicationInfoList(any[Option[String]])).thenReturn(List(info).iterator) when(ui.getAppName).thenReturn(name) when(ui.appName).thenReturn(name) val handler = new ServletContextHandler() diff --git a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala index 893f108335763..673c550b8e0a9 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/FsHistoryProviderSuite.scala @@ -303,7 +303,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P oldLog.mkdir() provider.checkForLogs() - val appListAfterRename = provider.getListing() + val appListAfterRename = provider.getListing(None) appListAfterRename.size should be (1) } @@ -1556,7 +1556,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() assert(dir.listFiles().size === 1) - assert(provider.getListing.length === 1) + assert(provider.getListing(None).length === 1) // Manually delete the appstatus file to make an invalid rolling event log val appStatusPath = RollingEventLogFilesWriter.getAppStatusFilePath(new Path(writer.logPath), @@ 
-1564,7 +1564,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P fs.delete(appStatusPath, false) provider.checkForLogs() provider.cleanLogs() - assert(provider.getListing.length === 0) + assert(provider.getListing(None).length === 0) // Create a new application val writer2 = new RollingEventLogFilesWriter("app2", None, dir.toURI, conf, hadoopConf) @@ -1576,14 +1576,14 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // Both folders exist but only one application found provider.checkForLogs() provider.cleanLogs() - assert(provider.getListing.length === 1) + assert(provider.getListing(None).length === 1) assert(dir.listFiles().size === 2) // Make sure a new provider sees the valid application provider.stop() val newProvider = new FsHistoryProvider(conf) newProvider.checkForLogs() - assert(newProvider.getListing.length === 1) + assert(newProvider.getListing(None).length === 1) } } @@ -1613,10 +1613,10 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 1st checkForLogs should scan/update app2 only since it is newer than app1 provider.checkForLogs() - assert(provider.getListing.length === 1) + assert(provider.getListing(None).length === 1) assert(dir.listFiles().size === 2) - assert(provider.getListing().map(e => e.id).contains("app2")) - assert(!provider.getListing().map(e => e.id).contains("app1")) + assert(provider.getListing(None).map(e => e.id).contains("app2")) + assert(!provider.getListing(None).map(e => e.id).contains("app1")) // Create 3rd application val writer3 = new RollingEventLogFilesWriter("app3", None, dir.toURI, conf, hadoopConf) @@ -1628,10 +1628,10 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P // The 2nd checkForLogs should scan/update app3 only since it is newer than app1 provider.checkForLogs() - assert(provider.getListing.length === 2) + assert(provider.getListing(None).length === 2) assert(dir.listFiles().size === 3) - assert(provider.getListing().map(e => e.id).contains("app3")) - assert(!provider.getListing().map(e => e.id).contains("app1")) + assert(provider.getListing(None).map(e => e.id).contains("app3")) + assert(!provider.getListing(None).map(e => e.id).contains("app1")) provider.stop() } @@ -1655,7 +1655,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.cleanLogs() assert(dir.listFiles().size === 1) - assert(provider.getListing.length === 1) + assert(provider.getListing(None).length === 1) // Manually delete event log files and create event log file reader val eventLogDir = dir.listFiles().head @@ -1759,7 +1759,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P ) log3.setLastModified(0L) - provider.getListing().size should be (0) + provider.getListing(None).size should be (0) // Move the clock forward so log1 and log3 exceed the max age. 
clock.advance(maxAge) @@ -1767,7 +1767,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P provider.checkForLogs() provider.doMergeApplicationListingCall should be (1) - provider.getListing().size should be (1) + provider.getListing(None).size should be (1) assert(!log1.exists()) assert(!log3.exists()) @@ -1786,7 +1786,7 @@ abstract class FsHistoryProviderSuite extends SparkFunSuite with Matchers with P (checkFn: Seq[ApplicationInfo] => Unit): Unit = { provider.checkForLogs() provider.cleanLogs() - checkFn(provider.getListing().toSeq) + checkFn(provider.getListing(None).toSeq) } private def writeFile(file: File, codec: Option[CompressionCodec], diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerPageSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerPageSuite.scala index f6ef4f7b4f61a..03848ed911476 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerPageSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerPageSuite.scala @@ -91,7 +91,7 @@ class HistoryServerPageSuite extends SparkFunSuite with BeforeAndAfter { val page = new HistoryPage(server.get) Seq(true, false).foreach { requestedIncomplete => val apiResponse = callApplicationsAPI(requestedIncomplete) - if (page.shouldDisplayApplications(requestedIncomplete)) { + if (page.shouldDisplayApplications(None, requestedIncomplete)) { assert(apiResponse.nonEmpty) } else { assert(apiResponse.isEmpty) diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala index 6322661f4afd2..b7ce74e3e9429 100644 --- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala @@ -541,7 +541,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with assert(4 === getNumJobsRestful(), s"two jobs back-to-back not updated, server=$server\n") } val jobcount = getNumJobs("/jobs") - assert(!isApplicationCompleted(provider.getListing().next)) + assert(!isApplicationCompleted(provider.getListing(None).next)) listApplications(false) should contain(appId) @@ -549,7 +549,7 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with resetSparkContext() // check the app is now found as completed eventually(stdTimeout, stdInterval) { - assert(isApplicationCompleted(provider.getListing().next), + assert(isApplicationCompleted(provider.getListing(None).next), s"application never completed, server=$server\n") } @@ -602,6 +602,28 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with } } + test("show only applications which the users has the permission to read") { + val owner = "irashid" + val admin = "admin" + val other = "sam" + + stop() + init( + "spark.ui.filters" -> classOf[FakeAuthFilter].getName(), + "spark.history.ui.acls.enable" -> "true", + "spark.history.ui.admin.acls" -> admin) + Seq((owner, 6), (admin, 10), (other, 0)).foreach { case (user, expectedApplicationsNum) => + val (_, response, _) = getContentAndCode("applications", server.boundPort, + Seq(FakeAuthFilter.FAKE_HTTP_USER -> user)) + assert(response.isDefined) + parse(response.get) match { + case apps: JArray => + assert(apps.children.size == expectedApplicationsNum) + case _ => fail() + } + } + } + test("SPARK-33215: speed up event log download by skipping UI rebuild") { val 
appId = "local-1430917381535" @@ -663,8 +685,11 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with } } - def getContentAndCode(path: String, port: Int = port): (Int, Option[String], Option[String]) = { - HistoryServerSuite.getContentAndCode(new URL(s"http://$localhost:$port/api/v1/$path")) + def getContentAndCode( + path: String, + port: Int = port, + headers: Seq[(String, String)] = Nil): (Int, Option[String], Option[String]) = { + HistoryServerSuite.getContentAndCode(new URL(s"http://localhost:$port/api/v1/$path"), headers) } def getUrl(path: String): String = { @@ -729,15 +754,22 @@ object HistoryServerSuite { } } - def getContentAndCode(url: URL): (Int, Option[String], Option[String]) = { - val (code, in, errString) = connectAndGetInputStream(url) - val inString = in.map(IOUtils.toString(_, StandardCharsets.UTF_8)) + def getContentAndCode( + url: URL, + headers: Seq[(String, String)] = Nil): (Int, Option[String], Option[String]) = { + val (code, in, errString) = connectAndGetInputStream(url, headers) + val inString = in.map(IOUtils.toString) (code, inString, errString) } - def connectAndGetInputStream(url: URL): (Int, Option[InputStream], Option[String]) = { + def connectAndGetInputStream( + url: URL, + headers: Seq[(String, String)] = Nil): (Int, Option[InputStream], Option[String]) = { val connection = url.openConnection().asInstanceOf[HttpURLConnection] connection.setRequestMethod("GET") + headers.foreach { case (key, value) => + connection.addRequestProperty(key, value) + } connection.connect() val code = connection.getResponseCode() val inStream = try { From 11c4660e32b873e61080fd89843683d946b867fc Mon Sep 17 00:00:00 2001 From: Petr Fedchenkov Date: Mon, 4 May 2026 16:31:23 +0300 Subject: [PATCH 2/2] [SPARK-23782] spark.history.ui.acls.filterList flag to not filter apps Signed-off-by: Petr Fedchenkov --- .../deploy/history/FsHistoryProvider.scala | 6 +++++ .../spark/internal/config/History.scala | 6 +++++ .../deploy/history/HistoryServerSuite.scala | 25 ++++++++++++++++++- 3 files changed, 36 insertions(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala index fa394514aa127..0b110f065d13e 100644 --- a/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala +++ b/core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala @@ -103,6 +103,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) private val logDir = conf.get(History.HISTORY_LOG_DIR) private val historyUiAclsEnable = conf.get(History.HISTORY_SERVER_UI_ACLS_ENABLE) + private val historyUiAclsFilterListEnabled = conf.get(HISTORY_SERVER_UI_ACLS_FILTER_LIST) private val historyUiAdminAcls = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS) private val historyUiAdminAclsGroups = conf.get(History.HISTORY_SERVER_UI_ADMIN_ACLS_GROUPS) logInfo(s"History server ui acls " + (if (historyUiAclsEnable) "enabled" else "disabled") + @@ -315,6 +316,11 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock) /** Returns true if the given user is allowed to view the application. 
   */
   private def isAuthorized(user: Option[String], appInfo: ApplicationInfoWrapper): Boolean = {
+    // If ACL-based list filtering is disabled, show all applications
+    if (!historyUiAclsFilterListEnabled) {
+      return true
+    }
+
     val attempt = appInfo.attempts.last
     val usersAcls = Set(attempt.info.sparkUser) ++ SecurityManager.stringToSet(
       historyUiAdminAcls.mkString(",") + "," + attempt.adminAcls.getOrElse("") + "," +
diff --git a/core/src/main/scala/org/apache/spark/internal/config/History.scala b/core/src/main/scala/org/apache/spark/internal/config/History.scala
index 7101318eb5672..51bf2625db52d 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/History.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/History.scala
@@ -160,6 +160,12 @@ private[spark] object History {
     .booleanConf
     .createWithDefault(false)
 
+  val HISTORY_SERVER_UI_ACLS_FILTER_LIST = ConfigBuilder("spark.history.ui.acls.filterList")
+    .doc("When true, only applications the requesting user has permission to view are listed.")
+    .version("3.5.4")
+    .booleanConf
+    .createWithDefault(false)
+
   val HISTORY_SERVER_UI_ADMIN_ACLS = ConfigBuilder("spark.history.ui.admin.acls")
     .version("2.1.1")
     .stringConf
diff --git a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
index b7ce74e3e9429..f99f4ffc149d0 100644
--- a/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/history/HistoryServerSuite.scala
@@ -611,8 +611,31 @@ abstract class HistoryServerSuite extends SparkFunSuite with BeforeAndAfter with
     init(
       "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
       "spark.history.ui.acls.enable" -> "true",
+      "spark.history.ui.acls.filterList" -> "true",
       "spark.history.ui.admin.acls" -> admin)
-    Seq((owner, 6), (admin, 10), (other, 0)).foreach { case (user, expectedApplicationsNum) =>
+    Seq((owner, 7), (admin, 16), (other, 1)).foreach { case (user, expectedApplicationsNum) =>
       val (_, response, _) = getContentAndCode("applications", server.boundPort,
         Seq(FakeAuthFilter.FAKE_HTTP_USER -> user))
       assert(response.isDefined)
       parse(response.get) match {
         case apps: JArray =>
           assert(apps.children.size == expectedApplicationsNum)
         case _ => fail()
       }
     }
   }
+
+  test("check that all applications are listed if spark.history.ui.acls.filterList is not set") {
+    val owner = "irashid"
+    val admin = "admin"
+    val other = "sam"
+
+    stop()
+    init(
+      "spark.ui.filters" -> classOf[FakeAuthFilter].getName(),
+      "spark.history.ui.acls.enable" -> "true",
+      "spark.history.ui.admin.acls" -> admin)
+    Seq((owner, 16), (admin, 16), (other, 16)).foreach { case (user, expectedApplicationsNum) =>
+      val (_, response, _) = getContentAndCode("applications", server.boundPort,
+        Seq(FakeAuthFilter.FAKE_HTTP_USER -> user))
+      assert(response.isDefined)
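Taken together, the two patches make the listing decision per application roughly as follows. This is a condensed sketch, not the actual FsHistoryProvider code: the real isAuthorized works on ApplicationInfoWrapper attempts, merges the ACL strings with SecurityManager.stringToSet, and resolves groups through Utils.getCurrentUserGroups; the helper below takes already-merged sets and a group-lookup function, and the name isShownInListing is made up for the sketch.

    object ListingFilterSketch {
      // Mirrors SecurityManager.WILDCARD_ACL: "*" opens the listing to everyone.
      val WILDCARD_ACL = "*"

      // Condensed form of the per-application check after both patches:
      // patch 2's spark.history.ui.acls.filterList switch short-circuits first,
      // then patch 1's ACL check runs against the merged user and group ACLs.
      def isShownInListing(
          remoteUser: Option[String],
          aclsEnabled: Boolean,
          filterListEnabled: Boolean,
          userAcls: Set[String],   // app owner + admin ACLs + app view ACLs
          groupAcls: Set[String],  // admin ACL groups + app view ACL groups
          groupsOf: String => Set[String]): Boolean = {
        if (!filterListEnabled) {
          return true // listing is not filtered at all
        }
        if (!aclsEnabled || remoteUser.isEmpty ||
            userAcls.contains(remoteUser.get) || userAcls.contains(WILDCARD_ACL) ||
            groupAcls.contains(WILDCARD_ACL)) {
          true
        } else {
          groupAcls.exists(groupsOf(remoteUser.get).contains)
        }
      }
    }

With spark.history.ui.acls.filterList left at its default of false, the early return keeps today's behaviour and every user sees the full listing; enabling it together with spark.history.ui.acls.enable hides from /api/v1/applications, and therefore from the History Server home page, any application the remote user would not be allowed to open.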