5 changes: 5 additions & 0 deletions .github/workflows/ci.yml
@@ -378,6 +378,10 @@ jobs:
GITHUB_USERNAME: ${{ github.actor }}
SPARK_K8S_TEST_HOST_GATEWAY: 192.168.49.1
run: |
PVC_TMP_DIR=$(mktemp -d)
export PVC_TESTS_HOST_PATH=$PVC_TMP_DIR
export PVC_TESTS_VM_PATH=$PVC_TMP_DIR
minikube mount ${PVC_TESTS_HOST_PATH}:${PVC_TESTS_VM_PATH} --gid=0 --uid=185 &
mkdir -p /tmp/k8s-debug
nohup kubectl get events -A -w \
-o 'custom-columns=TIME:.lastTimestamp,NS:.metadata.namespace,KIND:.involvedObject.kind,NAME:.involvedObject.name,REASON:.reason,MESSAGE:.message' \
@@ -389,6 +393,7 @@ jobs:
./build/mvn ${{ env.MAVEN_ARGS }} integration-test \
-pl :spark-kubernetes-integration-tests_2.13 \
${{ env.SPARK_PROFILES }} -Pkubernetes-integration-tests \
-Dtest.exclude.tags=local \
-Dspark.kubernetes.test.imageRepo=docker.io/library \
-Dspark.kubernetes.test.imageTag=ci-test \
-Dspark.kubernetes.test.deployMode=minikube \
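Note on the new `-Dtest.exclude.tags=local` flag: Spark's Maven build wires `test.exclude.tags` into the scalatest plugin's `tagsToExclude`, so any test carrying a matching ScalaTest `Tag` is skipped. A minimal sketch of that mechanism, assuming only a ScalaTest dependency; the suite, test names, and the `local` tag object here are hypothetical illustrations, not Spark's actual tag definitions:

import org.scalatest.Tag
import org.scalatest.funsuite.AnyFunSuite

// Tests tagged "local" are skipped when the runner is started with an
// exclude filter for that tag (scalatest-maven-plugin's tagsToExclude,
// fed by -Dtest.exclude.tags in Spark's pom).
object LocalTag extends Tag("local")

class ExampleSuite extends AnyFunSuite {
  test("runs under any profile") {
    assert(1 + 1 == 2)
  }

  // Excluded by -Dtest.exclude.tags=local.
  test("needs tooling only present locally", LocalTag) {
    assert(sys.env.contains("HOME"))
  }
}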
2 changes: 1 addition & 1 deletion R/pkg/DESCRIPTION
@@ -1,6 +1,6 @@
Package: SparkR
Type: Package
Version: 3.5.4.4
Version: 3.5.4.4-4.3.0-0
Title: R Front End for 'Apache Spark'
Description: Provides an R Front end for 'Apache Spark' <https://spark.apache.org>.
Authors@R:
4 changes: 4 additions & 0 deletions R/pkg/R/sparkR.R
Expand Up @@ -460,6 +460,10 @@ sparkR.session <- function(
jvmVersionStrip <- gsub("-SNAPSHOT", "", jvmVersion, fixed = TRUE)
rPackageVersion <- paste0(packageVersion("SparkR"))

# Compare versions with '-' replaced by '.', so hyphenated build suffixes match
jvmVersionStrip <- gsub("-", ".", jvmVersionStrip)
rPackageVersion <- gsub("-", ".", rPackageVersion)

if (jvmVersionStrip != rPackageVersion) {
warning("Version mismatch between Spark JVM and SparkR package. ",
"JVM version was ", jvmVersion,
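This normalization lets a hyphenated version like the `3.5.4.4-4.3.0-0` set in DESCRIPTION compare equal across the JVM and the R package. A minimal Scala sketch of the same transformation, assuming R's packageVersion() renders the hyphens as dots (the helper name and values are illustrative):

object VersionNormalize {
  // Mirror sparkR.R: strip -SNAPSHOT, then fold '-' into '.' before comparing.
  def normalize(v: String): String =
    v.replace("-SNAPSHOT", "").replace("-", ".")

  def main(args: Array[String]): Unit = {
    val jvmVersion = "3.5.4.4-4.3.0-0"      // as reported by the Spark JVM
    val rPackageVersion = "3.5.4.4.4.3.0.0" // as rendered by packageVersion()
    assert(normalize(jvmVersion) == normalize(rPackageVersion))
  }
}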
28 changes: 26 additions & 2 deletions python/pyspark/tests/test_install_spark.py
@@ -15,10 +15,13 @@
# limitations under the License.
#
import os
import re
import tempfile
import unittest
import urllib.request

from pyspark.install import (
get_preferred_mirrors,
install_spark,
DEFAULT_HADOOP,
DEFAULT_HIVE,
@@ -29,10 +32,31 @@


class SparkInstallationTestCase(unittest.TestCase):
def get_latest_spark_version(self):
if "PYSPARK_RELEASE_MIRROR" in os.environ:
sites = [os.environ["PYSPARK_RELEASE_MIRROR"]]
else:
sites = get_preferred_mirrors()
# Filter out the archive sites
sites = [site for site in sites if "archive.apache.org" not in site]
for site in sites:
url = site + "/spark/"
try:
with urllib.request.urlopen(url) as response:
html = response.read().decode("utf-8")
versions = re.findall(r"spark-(\d+[.-]\d+[.-]\d+)/", html)
versions = [v.replace("-", ".") for v in versions]
if versions:
# max() on strings is lexicographic ("3.9.9" > "3.10.0"), so compare numerically
return max(versions, key=lambda v: tuple(map(int, v.split("."))))
except Exception:
continue
return None

def test_install_spark(self):
# Test only one case. Testing this is expensive because it needs to download
# the Spark distribution.
spark_version, hadoop_version, hive_version = checked_versions("3.4.4", "3", "2.3")
# the Spark distribution. We try to get the latest version, but if we can't,
# we just use a hard-coded version.
spark_version = self.get_latest_spark_version() or "4.1.1"
spark_version, hadoop_version, hive_version = checked_versions(spark_version, "3", "2.3")

with tempfile.TemporaryDirectory() as tmp_dir:
install_spark(
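One subtlety the test has to handle (see the `max(..., key=...)` call above): mirror listings yield version strings, and a plain lexicographic `max` would rank `3.9.9` above `3.10.0`. A small Scala sketch of the numeric comparison, with illustrative version values:

object LatestVersion {
  // Split "a.b.c" into an (Int, Int, Int) key so components compare numerically.
  private def key(v: String): (Int, Int, Int) = {
    val Array(a, b, c) = v.split("\\.").map(_.toInt)
    (a, b, c)
  }

  def main(args: Array[String]): Unit = {
    val versions = Seq("3.5.7", "3.9.9", "3.10.0", "4.1.1")
    assert(Seq("3.9.9", "3.10.0").max == "3.9.9")         // lexicographic pitfall
    assert(Seq("3.9.9", "3.10.0").maxBy(key) == "3.10.0") // numeric, correct
    println(versions.maxBy(key)) // 4.1.1
  }
}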
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/DepsTestsSuite.scala
@@ -18,6 +18,7 @@ package org.apache.spark.deploy.k8s.integrationtest

import java.io.File
import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Files

import scala.collection.JavaConverters._
Expand All @@ -26,6 +27,8 @@ import com.amazonaws.auth.BasicAWSCredentials
import com.amazonaws.services.s3.AmazonS3Client
import io.fabric8.kubernetes.api.model._
import io.fabric8.kubernetes.api.model.apps.StatefulSetBuilder
import io.fabric8.kubernetes.api.model.SecretBuilder
import org.apache.commons.codec.binary.Base64
import org.apache.hadoop.util.VersionInfo
import org.scalatest.concurrent.{Eventually, PatienceConfiguration}
import org.scalatest.time.{Minutes, Span}
Expand All @@ -45,6 +48,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
val BUCKET = "spark"
val ACCESS_KEY = "minio"
val SECRET_KEY = "miniostorage"
val ivySecretName = "ivy-secret"

private def getMinioContainer(): Container = {
val envVars = Map (
@@ -160,6 +164,50 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.delete()
}

private def setupIvySecret(): Unit = {
val ivySource = new File(sparkHomeDir.resolve("dev/ivysettings.xml").toString)

// Read original file content
val content = new String(Files.readAllBytes(ivySource.toPath), StandardCharsets.UTF_8)

// Fetch GitHub credentials from the environment; fail fast if they are missing
val githubUser = sys.env.getOrElse("GITHUB_USERNAME",
throw new IllegalStateException("GITHUB_USERNAME env var not set"))
val githubToken = sys.env.getOrElse("GITHUB_TOKEN",
throw new IllegalStateException("GITHUB_TOKEN env var not set"))

// Replace Ivy environment variable references with literal values
val replaced = content
.replace("${env.GITHUB_USERNAME}", githubUser)
.replace("${env.GITHUB_TOKEN}", githubToken)

// Build Secret with the concrete, substituted content
val ivySecret = new SecretBuilder()
.withNewMetadata()
.withName(ivySecretName)
.endMetadata()
.addToData("ivysettings.xml",
Base64.encodeBase64String(replaced.getBytes(StandardCharsets.UTF_8)))
.build()

Eventually.eventually(TIMEOUT, INTERVAL) {
kubernetesTestComponents
.kubernetesClient
.secrets()
.inNamespace(kubernetesTestComponents.namespace)
.create(ivySecret)
}
}

private def deleteIvySecret(): Unit = {
kubernetesTestComponents
.kubernetesClient
.secrets()
.inNamespace(kubernetesTestComponents.namespace)
.withName(ivySecretName)
.delete()
}

test("Launcher client dependencies", k8sTestTag, MinikubeTag) {
tryDepsTest({
val fileName = Utils.createTempFile(FILE_CONTENTS, HOST_PATH)
@@ -372,6 +420,7 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
.set("spark.jars.packages", packages)
.set("spark.jars.ivySettings", sparkHomeDir.resolve("dev/ivysettings.xml").toString)
.set("spark.kubernetes.driver.secrets."+ivySecretName, sparkHomeDir.resolve("dev").toString)
.set("spark.driver.extraJavaOptions", "-Divy.cache.dir=/tmp -Divy.home=/tmp")
}

@@ -381,10 +430,12 @@ private[spark] trait DepsTestsSuite { k8sSuite: KubernetesSuite =>
val minioUrlStr = getServiceUrl(svcName)
createS3Bucket(ACCESS_KEY, SECRET_KEY, minioUrlStr)
setCommonSparkConfPropertiesForS3Access(sparkAppConf, minioUrlStr)
setupIvySecret()
runTest
} finally {
// make sure this always runs
deleteMinioStorage()
deleteIvySecret()
}
}
}
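How the pieces above fit together: setupIvySecret substitutes the `${env.*}` placeholders in dev/ivysettings.xml with real GitHub credentials and stores the result base64-encoded in a Kubernetes Secret; `spark.kubernetes.driver.secrets.` + ivySecretName then mounts that Secret at the same `dev` path inside the driver pod, so `spark.jars.ivySettings` resolves to the fully substituted file. A minimal sketch of the substitution and encoding step, using the same commons-codec call as the test; the settings template and credential values are hypothetical:

import java.nio.charset.StandardCharsets
import org.apache.commons.codec.binary.Base64

object IvySecretEncode {
  def main(args: Array[String]): Unit = {
    val template =
      """<ivysettings>
        |  <credentials host="maven.pkg.github.com"
        |                username="${env.GITHUB_USERNAME}"
        |                passwd="${env.GITHUB_TOKEN}"/>
        |</ivysettings>""".stripMargin

    // Substitute the Ivy ${env.*} references with concrete values,
    // mirroring the .replace calls in setupIvySecret.
    val replaced = template
      .replace("${env.GITHUB_USERNAME}", "some-user")
      .replace("${env.GITHUB_TOKEN}", "some-token")

    // Kubernetes Secret 'data' values must be base64-encoded.
    val encoded = Base64.encodeBase64String(replaced.getBytes(StandardCharsets.UTF_8))
    val decoded = new String(Base64.decodeBase64(encoded), StandardCharsets.UTF_8)
    assert(decoded == replaced) // round-trips losslessly
  }
}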
resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/RTestsSuite.scala
@@ -21,7 +21,8 @@ private[spark] trait RTestsSuite { k8sSuite: KubernetesSuite =>
import RTestsSuite._
import KubernetesSuite.{k8sTestTag, rTestTag}

test("Run SparkR on simple dataframe.R example", k8sTestTag, rTestTag) {
// TODO: build R package in kubernetes integration tests
ignore("Run SparkR on simple dataframe.R example", k8sTestTag, rTestTag) {
sparkAppConf.set("spark.kubernetes.container.image", rImage)
runSparkApplicationAndVerifyCompletion(
appResource = SPARK_R_DATAFRAME_TEST,