From 34ae287ff28ce09e96c9d506889017b2c0bbfeb7 Mon Sep 17 00:00:00 2001 From: Guillaume LECLERC Date: Wed, 15 Apr 2026 15:47:47 +0200 Subject: [PATCH] Add cross build capabilities for several spark version --- .github/workflows/ci.yml | 39 ++- .github/workflows/publish.yml | 87 +++++- BUILDING.md | 153 +++++++++++ README.md | 33 ++- build.sbt | 155 +++++++---- .../dataio/config/ConfigNodeCollection.scala | 4 +- .../fields/DropDuplicatesConfigurator.scala | 2 +- .../fields/PartitionByConfigurator.scala | 2 +- .../SortWithinPartitionsConfigurator.scala | 2 +- .../dataio/core/InstantiationHelper.scala | 47 ++-- .../com/amadeus/dataio/core/time/Time.scala | 254 +++++++++--------- .../dataio/pipes/elk/ElkConfigurator.scala | 40 ++- .../dataio/pipes/elk/batch/ElkOutput.scala | 59 ++-- .../pipes/elk/streaming/ElkOutput.scala | 84 +++--- .../pipes/elk/ElkConfiguratorTest.scala | 6 +- .../pipes/elk/batch/ElkOutputTest.scala | 42 +-- .../pipes/elk/streaming/ElkOutputTest.scala | 150 +++++------ .../pipes/kafka/streaming/KafkaOutput.scala | 26 +- project/SparkProfiles.scala | 100 +++++++ .../testutils/JavaImplicitConverters.scala | 35 ++- .../testutils/SparkStreamingSuite.scala | 55 ++-- .../amadeus/dataio/testutils/SparkSuite.scala | 4 - version.sbt | 2 +- 23 files changed, 885 insertions(+), 496 deletions(-) create mode 100644 BUILDING.md create mode 100644 project/SparkProfiles.scala diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cd0bd25..fbcfccb 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -16,29 +16,52 @@ on: jobs: build: runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + include: + - spark_profile: spark34 + java_version: "11" + label: "Spark 3.4 / Java 11" + - spark_profile: spark35 + java_version: "11" + label: "Spark 3.5 / Java 11" + - spark_profile: spark40 + java_version: "17" + label: "Spark 4.0 / Java 17" + + name: "Build (${{ matrix.label }})" + + env: + SPARK_PROFILE: ${{ matrix.spark_profile }} + steps: - uses: actions/checkout@v4 - - uses: actions/setup-java@v3 + - uses: actions/setup-java@v4 with: distribution: "temurin" - java-version: 8 + java-version: ${{ matrix.java_version }} cache: "sbt" - uses: sbt/setup-sbt@v1 with: sbt-runner-version: 1.10.11 - - run: sbt compile + - name: Compile + run: sbt compile - - run: sbt test + - name: Test + run: sbt test - - run: sbt package + - name: Package + run: sbt package - - run: tar cf artefacts.tar target/ */target/ + - name: Bundle artifacts + run: tar cf artefacts.tar target/ */target/ - uses: actions/upload-artifact@v4 with: - name: Artefacts + name: Artefacts-${{ matrix.spark_profile }} path: artefacts.tar - diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml index 640442d..3fb73d3 100644 --- a/.github/workflows/publish.yml +++ b/.github/workflows/publish.yml @@ -11,8 +11,13 @@ on: - MINOR - MAJOR +run-name: ${{ format('Publishing new version - {0}', inputs.release_type) }} + jobs: - publish: + # ──────────────────────────────────────────────────────────────────── + # Phase 1: Create release tag (runs once, on default Spark profile) + # ──────────────────────────────────────────────────────────────────── + release-tag: runs-on: ubuntu-latest permissions: contents: write @@ -22,6 +27,10 @@ jobs: env: GITHUB_REGISTRY_TOKEN: ${{ secrets.GITHUB_TOKEN }} RELEASE_TYPE: ${{ inputs.release_type }} + SPARK_PROFILE: spark35 + outputs: + release_tag: ${{ steps.get_tag.outputs.tag }} + release_base_version: ${{ steps.extract_version.outputs.base_version }} 
steps: - uses: actions/checkout@v4 @@ -30,9 +39,9 @@ jobs: git config user.name "github-actions[bot]" git config user.email "41898282+github-actions[bot]@users.noreply.github.com" - - uses: actions/setup-java@v3 + - uses: actions/setup-java@v4 with: - java-version: 8 + java-version: 11 distribution: "temurin" cache: "sbt" @@ -40,7 +49,75 @@ jobs: with: sbt-runner-version: 1.10.11 - - name: Run SBT release + - name: Run SBT release (tag + publish default profile) run: sbt 'release with-defaults' -run-name: ${{ format('Publishing new version - {0}', inputs.release_type) }} \ No newline at end of file + - name: Extract release tag + id: get_tag + run: echo "tag=$(git describe --tags --abbrev=0)" >> "$GITHUB_OUTPUT" + + - name: Extract base version (without Spark suffix) + id: extract_version + run: | + # Tag format is e.g. "v1.2.0-spark3.5.3" — extract "1.2.0" + TAG=$(git describe --tags --abbrev=0) + BASE_VERSION=$(echo "$TAG" | sed 's/^v//' | sed 's/-spark.*//') + echo "base_version=$BASE_VERSION" >> "$GITHUB_OUTPUT" + + # ──────────────────────────────────────────────────────────────────── + # Phase 2: Publish all other Spark profiles from the release tag + # ──────────────────────────────────────────────────────────────────── + publish-profiles: + needs: release-tag + runs-on: ubuntu-latest + permissions: + contents: read + packages: write + attestations: write + id-token: write + + strategy: + fail-fast: false + matrix: + include: + - spark_profile: spark34 + java_version: "11" + label: "Spark 3.4" + - spark_profile: spark35 + java_version: "11" + label: "Spark 3.5" + - spark_profile: spark40 + java_version: "17" + label: "Spark 4.0" + + name: "Publish (${{ matrix.label }})" + + env: + GITHUB_REGISTRY_TOKEN: ${{ secrets.GITHUB_TOKEN }} + SPARK_PROFILE: ${{ matrix.spark_profile }} + + steps: + - uses: actions/checkout@v4 + with: + ref: ${{ needs.release-tag.outputs.release_tag }} + + - uses: actions/setup-java@v4 + with: + java-version: ${{ matrix.java_version }} + distribution: "temurin" + cache: "sbt" + + - uses: sbt/setup-sbt@v1 + with: + sbt-runner-version: 1.10.11 + + - name: Set version for this Spark profile + env: + BASE_VERSION: ${{ needs.release-tag.outputs.release_base_version }} + run: | + # sbt-release wrote a hardcoded version at the tag (e.g. 1.2.0-spark3.5.3). + # We need to re-derive it for this profile's Spark version. + echo "ThisBuild / version := s\"${BASE_VERSION}-spark\${SparkProfiles.active.sparkVersion}\"" > version.sbt + + - name: Publish JARs for ${{ matrix.label }} + run: sbt publish diff --git a/BUILDING.md b/BUILDING.md new file mode 100644 index 0000000..4fcf332 --- /dev/null +++ b/BUILDING.md @@ -0,0 +1,153 @@ +# Building Data I/O + +This document describes how the multi-Spark-version build and release system works. + +## Architecture Overview + +Data I/O publishes artifacts for multiple Apache Spark versions from a **single codebase**. +All version-coupled values are defined in one place: + +``` +project/SparkProfiles.scala ← Single source of truth for ALL versioned dependencies +build.sbt ← References SparkProfiles.active (no hardcoded versions) +version.sbt ← Derives the Spark tag dynamically from the active profile +``` + +### How It Works + +1. **`project/SparkProfiles.scala`** defines a `SparkProfile` case class containing: + - Spark version (e.g., `3.5.3`) + - Scala version (e.g., `2.12.15`) + - Java target (e.g., `11`) + - All connector library versions (Snowflake, Elasticsearch, Embedded Kafka, etc.) 
+   - Feature flags (`supportsSnowflake`, `supportsElasticsearch`)
+
+2. **Profile selection** is driven by the `SPARK_PROFILE` environment variable:
+   ```bash
+   SPARK_PROFILE=spark34 sbt compile
+   ```
+   If unset, it defaults to `spark35`.
+
+3. **`build.sbt`** reads `SparkProfiles.active` and uses it for all `scalaVersion`,
+   `javacOptions`, and `libraryDependencies` settings. Connector modules (Snowflake,
+   Elasticsearch) are conditionally included based on profile feature flags.
+
+4. **`version.sbt`** embeds the Spark version in the artifact version string:
+   ```
+   1.1.1-spark3.5.3-SNAPSHOT
+   ```
+
+## Supported Profiles
+
+| Profile    | Spark | Scala   | Java | Snowflake         | Elasticsearch    |
+|------------|-------|---------|------|-------------------|------------------|
+| `spark34`  | 3.4.4 | 2.12.15 | 11   | 2.16.0-spark_3.4  | 8.17.4           |
+| `spark35`  | 3.5.3 | 2.12.15 | 11   | 3.1.1             | 8.17.4           |
+| `spark40`  | 4.0.2 | 2.13.17 | 17   | —                 | —                |
+
+## Local Development
+
+```bash
+# Default profile (Spark 3.5)
+sbt compile test package
+
+# Specific profile
+SPARK_PROFILE=spark34 sbt compile test
+
+# Build all profiles locally (shell loop)
+for profile in spark34 spark35 spark40; do
+  echo "=== Building $profile ==="
+  SPARK_PROFILE=$profile sbt clean compile test
+done
+```
+
+## CI Pipeline
+
+The GitHub Actions CI workflow (`.github/workflows/ci.yml`) uses a **matrix strategy**
+to build, test, and package all supported Spark profiles in parallel:
+
+```yaml
+strategy:
+  matrix:
+    include:
+      - spark_profile: spark34
+        java_version: "11"
+      - spark_profile: spark35
+        java_version: "11"
+      - spark_profile: spark40
+        java_version: "17"
+```
+
+Each matrix leg runs with the appropriate Java version and `SPARK_PROFILE` env var.
+Artifacts are uploaded with profile-specific names (e.g., `Artefacts-spark34`).
+
+## Release Process
+
+The publish workflow (`.github/workflows/publish.yml`) uses a **two-phase approach**:
+
+### Phase 1: Release Tag (runs once)
+1. Runs `sbt 'release with-defaults'` with the default Spark profile (`spark35`)
+2. This creates the git tag, commits version bumps, and publishes the Spark 3.5 artifacts
+3. The release tag is captured as an output for Phase 2
+
+### Phase 2: Publish Other Profiles (matrix, runs in parallel)
+1. Checks out the release tag from Phase 1
+2. Regenerates `version.sbt` with the correct Spark-tagged version for the profile
+   (needed because sbt-release hardcodes the version string at the tag commit)
+3. For each additional profile (`spark34`, `spark40`, ...), runs `sbt publish`
+4. Each profile publishes artifacts with its own Spark-tagged version string
+
+### Artifact Naming
+
+Published artifacts follow this convention:
+```
+com.amadeus.dataio:dataio-core_2.13:1.2.0-spark4.0.2
+com.amadeus.dataio:dataio-core_2.12:1.2.0-spark3.5.3
+com.amadeus.dataio:dataio-core_2.12:1.2.0-spark3.4.4
+```
+
+## Adding a New Spark Version
+
+To add support for a new Spark version:
+
+1. **Edit `project/SparkProfiles.scala`** — add a new entry to the `profiles` map:
+   ```scala
+   "spark41" -> SparkProfile(
+     sparkVersion = "4.1.1",
+     scalaVersion = "2.13.18",
+     javaTarget = "17",
+     sparkSnowflakeVersion = Some("x.y.z"),
+     elasticsearchSparkVersion = Some("8.x.y"),
+     embeddedKafkaVersion = "3.6.0"
+   )
+   ```
+
+2. **Update CI matrix** — add the profile to `.github/workflows/ci.yml`:
+   ```yaml
+   - spark_profile: spark41
+     java_version: "17"
+     label: "Spark 4.1 / Java 17"
+   ```
+
+3. **Update Publish matrix** — add to `.github/workflows/publish.yml`
+
+4.
**Update README.md** — add badge and compatibility table entry
+
+That's it. No changes to `build.sbt` or any source code are needed.
+
+## Removing a Spark Version
+
+1. Remove the entry from `project/SparkProfiles.scala`
+2. Remove from CI and Publish workflow matrices
+3. Update README
+
+## Connector Availability
+
+Some connectors may not support all Spark versions (e.g., Spark 4.0). The `SparkProfile`
+uses `Option[String]` for connector versions:
+- `Some("x.y.z")` → connector is included in the build and published
+- `None` → connector module is excluded from aggregation and `publish/skip := true`
+
+This is handled automatically — no manual module toggling needed.
+
+
diff --git a/README.md b/README.md
index 2884a1a..33ddeef 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,8 @@
 # Data I/O
 [![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
-[![Spark](https://img.shields.io/badge/Spark-3.5.0-blue)](https://spark.apache.org/releases/spark-release-3-4-1.html)
+[![Spark 3.4](https://img.shields.io/badge/Spark-3.4.4-blue)](https://spark.apache.org/releases/spark-release-3-4-4.html)
+[![Spark 3.5](https://img.shields.io/badge/Spark-3.5.3-blue)](https://spark.apache.org/releases/spark-release-3-5-3.html)
 [![Scala](https://img.shields.io/badge/Scala-2.12.15-red)](https://www.scala-lang.org/)
 [![PRs Welcome](https://img.shields.io/badge/PRs-welcome-brightgreen.svg)][contributing]
@@ -14,6 +15,36 @@ Data I/O is an open source project that provides a flexible and scalable framewo
 - Support for batch and streaming data processing
 - Extensible architecture for custom data processors and pipelines
 - Scalable and fault-tolerant processing using Apache Spark
+- **Multi-Spark version support** — builds and publishes for Spark 3.4, 3.5, and 4.0 from a single codebase
+
+## Supported Spark Versions
+
+| Profile    | Spark | Scala | Java | Snowflake | Elasticsearch |
+|------------|-------|-------|------|-----------|---------------|
+| `spark34`  | 3.4.4 | 2.12  | 11   | ✅        | ✅            |
+| `spark35`  | 3.5.3 | 2.12  | 11   | ✅        | ✅            |
+| `spark40`  | 4.0.2 | 2.13  | 17   | ❌        | ❌            |
+
+> **Note:** Spark 4.0 support is experimental. Snowflake and Elasticsearch connectors
+> do not yet have Spark 4.0–compatible releases.
+
+## Building Locally
+
+Select a Spark profile via the `SPARK_PROFILE` environment variable (defaults to `spark35`):
+
+```bash
+# Build for Spark 3.4
+SPARK_PROFILE=spark34 sbt compile
+
+# Run tests for Spark 3.4
+SPARK_PROFILE=spark34 sbt test
+
+# Package for Spark 3.5 (default)
+sbt package
+```
+
+All version-coupled dependencies (Spark, Scala, Java target, connectors) are defined in
+[`project/SparkProfiles.scala`](project/SparkProfiles.scala) — the single source of truth.

 ## Getting Started
 To get started with Data I/O, please refer to the [documentation][gettingstarted] for installation instructions, usage examples, and API references.
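The diffstat above shows that this patch also creates `project/SparkProfiles.scala` (about 100 added lines), but that hunk is not included in this excerpt. As a rough sketch of the profile registry that BUILDING.md describes, the object could look like the following. The field names mirror what `build.sbt` (next hunk) references, the per-profile values come from the tables above, and the constructor defaults reuse the library versions removed from the old `build.sbt`; the embedded-kafka versions and the exact shape of the real file are assumptions.

```scala
// Hypothetical sketch of project/SparkProfiles.scala; the actual file added by this
// patch is not shown in this excerpt. Field names follow the references in build.sbt.
object SparkProfiles {

  final case class SparkProfile(
    sparkVersion: String,
    scalaVersion: String,
    javaTarget: String,
    sparkSnowflakeVersion: Option[String],
    elasticsearchSparkVersion: Option[String],
    embeddedKafkaVersion: String,
    typesafeConfigVersion: String = "1.4.3",
    slf4jApiVersion: String = "2.0.7",
    commonsIoVersion: String = "2.13.0",
    scalatestVersion: String = "3.2.15",
    scalamockVersion: String = "5.2.0"
  ) {
    // Derived values consumed by build.sbt
    def scalaBinaryVersion: String     = scalaVersion.split('.').take(2).mkString(".")
    def supportsSnowflake: Boolean     = sparkSnowflakeVersion.isDefined
    def supportsElasticsearch: Boolean = elasticsearchSparkVersion.isDefined
  }

  // One entry per supported profile; values taken from the BUILDING.md table,
  // embedded-kafka versions are illustrative placeholders.
  val profiles: Map[String, SparkProfile] = Map(
    "spark34" -> SparkProfile("3.4.4", "2.12.15", "11", Some("2.16.0-spark_3.4"), Some("8.17.4"), "3.5.1"),
    "spark35" -> SparkProfile("3.5.3", "2.12.15", "11", Some("3.1.1"), Some("8.17.4"), "3.5.1"),
    "spark40" -> SparkProfile("4.0.2", "2.13.17", "17", None, None, "3.5.1")
  )

  // Profile selection via the SPARK_PROFILE environment variable, defaulting to spark35
  lazy val active: SparkProfile =
    profiles.getOrElse(
      sys.env.getOrElse("SPARK_PROFILE", "spark35"),
      sys.error(s"Unknown SPARK_PROFILE; expected one of: ${profiles.keys.mkString(", ")}")
    )
}
```

Because it lives under `project/`, the object is compiled as part of the sbt meta-build, so `build.sbt` can reference `SparkProfiles.active` directly and switching profiles only requires exporting `SPARK_PROFILE` before invoking sbt.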
diff --git a/build.sbt b/build.sbt index a4a91cd..d8b83d5 100644 --- a/build.sbt +++ b/build.sbt @@ -1,16 +1,31 @@ -// BUILD SETUP +// ════════════════════════════════════════════════════════════════════════════ +// BUILD SETUP — all version-coupled values come from SparkProfiles +// See: project/SparkProfiles.scala (the single source of truth) +// ════════════════════════════════════════════════════════════════════════════ + +val spark = SparkProfiles.active + ThisBuild / organization := "com.amadeus.dataio" ThisBuild / versionScheme := Some("early-semver") -ThisBuild / scalaVersion := "2.12.15" +ThisBuild / scalaVersion := spark.scalaVersion +ThisBuild / javacOptions ++= Seq("-source", spark.javaTarget, "-target", spark.javaTarget) +ThisBuild / scalacOptions += "-deprecation" + +ThisBuild / Test / javaOptions ++= Seq( + "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED", + "--add-opens=java.base/java.lang=ALL-UNNAMED", + "--add-opens=java.base/java.nio=ALL-UNNAMED", + "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED", + "--add-opens=java.base/java.util=ALL-UNNAMED", + "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED" +) +// Required for javaOptions to take effect with forked JVM +ThisBuild / Test / fork := true -val scalatestVersion = "3.2.15" -val scalamockVersion = "5.2.0" -val sparkVersion = "3.5.0" -val typesafeConfigVersion = "1.4.3" -val slf4jApiVersion = "2.0.7" -val commonsIoVersion = "2.13.0" +// ════════════════════════════════════════════════════════════════════════════ +// RELEASE SETUP +// ════════════════════════════════════════════════════════════════════════════ -// RELEASE SETUP import sbt.Keys.libraryDependencies import sbtrelease.ReleaseStateTransformations.* @@ -18,11 +33,11 @@ def getReleaseVersion(ver: String, bumpType: String): String = { val pattern = """(\d+)\.(\d+)\.(\d+)-(spark[\d.]+)-SNAPSHOT""".r ver match { - case pattern(major, minor, patch, sparkVersion) => + case pattern(major, minor, patch, sparkTag) => bumpType match { - case "MAJOR" => s"${major.toInt + 1}.0.0-$sparkVersion" - case "MINOR" => s"$major.${minor.toInt + 1}.0-$sparkVersion" - case "PATCH" => s"$major.$minor.$patch-$sparkVersion" + case "MAJOR" => s"${major.toInt + 1}.0.0-$sparkTag" + case "MINOR" => s"$major.${minor.toInt + 1}.0-$sparkTag" + case "PATCH" => s"$major.$minor.$patch-$sparkTag" case _ => sys.error(s"Invalid RELEASE_TYPE: $bumpType") } case _ => sys.error(s"Invalid version format: $ver") @@ -33,8 +48,8 @@ def getReleaseNextVersion(ver: String): String = { val pattern = """(\d+)\.(\d+)\.(\d+)-(spark[\d.]+)""".r ver match { - case pattern(major, minor, patch, sparkVersion) => - s"$major.$minor.${patch.toInt + 1}-$sparkVersion-SNAPSHOT" + case pattern(major, minor, patch, sparkTag) => + s"$major.$minor.${patch.toInt + 1}-$sparkTag-SNAPSHOT" case _ => sys.error(s"Invalid version format: $ver") } } @@ -55,7 +70,10 @@ ThisBuild / releaseProcess := Seq[ReleaseStep]( pushChanges // Push everything to Git ) -// Global GitHub Packages settings +// ════════════════════════════════════════════════════════════════════════════ +// PUBLISHING SETUP — GitHub Packages +// ════════════════════════════════════════════════════════════════════════════ + ThisBuild / credentials += Credentials( "GitHub Package Registry", "maven.pkg.github.com", @@ -70,7 +88,6 @@ ThisBuild / publishTo := Some( // ThisBuild / publishTo := Some(Resolver.file("local-maven", file(Path.userHome.absolutePath + "/.m2/repository"))) ThisBuild / publishMavenStyle := true -// Additional Maven metadata ThisBuild / 
pomIncludeRepository := { _ => false } ThisBuild / pomExtra := https://github.com/AmadeusITGroup/dataio-framework @@ -81,21 +98,31 @@ ThisBuild / pomExtra := -// TESTS SETUP +// ════════════════════════════════════════════════════════════════════════════ +// TESTS SETUP +// ════════════════════════════════════════════════════════════════════════════ + ThisBuild / Test / parallelExecution := false ThisBuild / Test / publishArtifact := false -// PROJECTS SETUP +// ════════════════════════════════════════════════════════════════════════════ +// SHARED DEPENDENCIES (driven by SparkProfiles) +// ════════════════════════════════════════════════════════════════════════════ + lazy val commonSettings = Seq( libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion, - "org.apache.spark" %% "spark-core" % sparkVersion, - "com.typesafe" % "config" % typesafeConfigVersion, - "org.scalatest" %% "scalatest" % scalatestVersion % Test, - "org.scalamock" %% "scalamock" % scalamockVersion % Test + "org.apache.spark" %% "spark-sql" % spark.sparkVersion, + "org.apache.spark" %% "spark-core" % spark.sparkVersion, + "com.typesafe" % "config" % spark.typesafeConfigVersion, + "org.scalatest" %% "scalatest" % spark.scalatestVersion % Test, + "org.scalamock" %% "scalamock" % spark.scalamockVersion % Test ) ) +// ════════════════════════════════════════════════════════════════════════════ +// PROJECTS +// ════════════════════════════════════════════════════════════════════════════ + /** Shared traits and functions for testing inside Data I/O sub projects. * It should not be published, and only be used in the Data I/O project itself. * @see [[test]] For testing applications made with Data I/O. @@ -103,11 +130,11 @@ lazy val commonSettings = Seq( lazy val testutils = (project in file("testutils")) .settings( libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql" % sparkVersion, - "org.apache.spark" %% "spark-core" % sparkVersion, - "com.typesafe" % "config" % typesafeConfigVersion, - "org.scalatest" %% "scalatest" % scalatestVersion, - "org.scalamock" %% "scalamock" % scalamockVersion + "org.apache.spark" %% "spark-sql" % spark.sparkVersion, + "org.apache.spark" %% "spark-core" % spark.sparkVersion, + "com.typesafe" % "config" % spark.typesafeConfigVersion, + "org.scalatest" %% "scalatest" % spark.scalatestVersion, + "org.scalamock" %% "scalamock" % spark.scalamockVersion ), publish / skip := true ) @@ -117,9 +144,10 @@ lazy val core = (project in file("core")) commonSettings, name := "dataio-core", libraryDependencies ++= Seq( - "org.slf4j" % "slf4j-api" % slf4jApiVersion, - "commons-io" % "commons-io" % commonsIoVersion - ) + "org.slf4j" % "slf4j-api" % spark.slf4jApiVersion, + "commons-io" % "commons-io" % spark.commonsIoVersion + ), + Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM ) .dependsOn(testutils % Test) @@ -128,20 +156,26 @@ lazy val kafka = (project in file("kafka")) commonSettings, name := "dataio-kafka", libraryDependencies ++= Seq( - "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion, - "io.github.embeddedkafka" %% "embedded-kafka" % "3.5.1" % Test, - "io.github.embeddedkafka" %% "embedded-kafka-streams" % "3.5.1" % Test - ) + "org.apache.spark" %% "spark-sql-kafka-0-10" % spark.sparkVersion, + "io.github.embeddedkafka" %% "embedded-kafka" % spark.embeddedKafkaVersion % Test, + "io.github.embeddedkafka" %% "embedded-kafka-streams" % spark.embeddedKafkaVersion % Test + ), + Test / baseDirectory := (ThisBuild / 
baseDirectory).value // <-- fix CWD for forked JVM ) .dependsOn(core, testutils % Test) +// ── Conditionally-included modules (some connectors don't support all Spark versions) ── + lazy val snowflake = (project in file("snowflake")) .settings( commonSettings, name := "dataio-snowflake", - libraryDependencies ++= Seq( - "net.snowflake" %% "spark-snowflake" % f"3.1.1" - ) + libraryDependencies ++= spark.sparkSnowflakeVersion.toSeq.map { v => + "net.snowflake" %% "spark-snowflake" % v + }, + // If the connector is unavailable for this profile, skip publishing an empty JAR + publish / skip := !spark.supportsSnowflake, + Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM ) .dependsOn(core, testutils % Test) @@ -149,16 +183,19 @@ lazy val elasticsearch = (project in file("elasticsearch")) .settings( commonSettings, name := "dataio-elasticsearch", - libraryDependencies ++= Seq( - "org.elasticsearch" %% "elasticsearch-spark-30" % "8.17.4" - exclude ("org.scala-lang", "scala-library") - exclude ("org.scala-lang", "scala-reflect") - exclude ("org.slf4j", "slf4j-api") - exclude ("org.apache.spark", "spark-core_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", "spark-sql_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", "spark-catalyst_" + scalaVersion.value.substring(0, 4)) - exclude ("org.apache.spark", "spark-streaming_" + scalaVersion.value.substring(0, 4)) - ) + libraryDependencies ++= spark.elasticsearchSparkVersion.toSeq.map { v => + "org.elasticsearch" %% "elasticsearch-spark-30" % v exclude + ("org.scala-lang", "scala-library") exclude + ("org.scala-lang", "scala-reflect") exclude + ("org.slf4j", "slf4j-api") exclude + ("org.apache.spark", s"spark-core_${spark.scalaBinaryVersion}") exclude + ("org.apache.spark", s"spark-sql_${spark.scalaBinaryVersion}") exclude + ("org.apache.spark", s"spark-catalyst_${spark.scalaBinaryVersion}") exclude + ("org.apache.spark", s"spark-streaming_${spark.scalaBinaryVersion}") + }, + // If the connector is unavailable for this profile, skip publishing an empty JAR + publish / skip := !spark.supportsElasticsearch, + Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM ) .dependsOn(core, testutils % Test) @@ -167,16 +204,26 @@ lazy val test = (project in file("test")) commonSettings, name := "dataio-test", libraryDependencies ++= Seq( - "org.scalatest" %% "scalatest" % scalatestVersion, - "org.scalamock" %% "scalamock" % scalamockVersion - ) + "org.scalatest" %% "scalatest" % spark.scalatestVersion, + "org.scalamock" %% "scalamock" % spark.scalamockVersion + ), + Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM ) .dependsOn(core, testutils % Test) -// Projects configuration +// ════════════════════════════════════════════════════════════════════════════ +// ROOT AGGREGATE — dynamically includes only modules available for this profile +// ════════════════════════════════════════════════════════════════════════════ + +lazy val alwaysModules: Seq[ProjectReference] = Seq(core, test, kafka) + +lazy val conditionalModules: Seq[ProjectReference] = + (if (spark.supportsSnowflake) Seq[ProjectReference](snowflake) else Nil) ++ + (if (spark.supportsElasticsearch) Seq[ProjectReference](elasticsearch) else Nil) + lazy val root = (project in file(".")) .settings( name := "dataio", publish / skip := true ) - .aggregate(core, test, kafka, snowflake) + .aggregate(alwaysModules ++ conditionalModules: _*) diff --git 
a/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala b/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala index 93fa506..d30ba85 100644 --- a/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala +++ b/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala @@ -25,14 +25,14 @@ object ConfigNodeCollection extends Logging { * config argument. */ def apply(nodeName: String, config: Config): ConfigNodeCollection = { - import collection.JavaConverters._ + import scala.collection.JavaConverters._ if (!config.hasPath(nodeName)) return ConfigNodeCollection(Nil) val rawConfigs: Seq[Config] = Try { - config.getConfigList(nodeName).asScala + config.getConfigList(nodeName).asScala.toSeq } orElse Try { config.getConfig(nodeName) +: Nil } getOrElse { diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala index c03f071..9bc9d1d 100644 --- a/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala +++ b/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala @@ -15,7 +15,7 @@ trait DropDuplicatesConfigurator { Try { config.getString("drop_duplicates").split(",").map(_.trim).toList } orElse Try { - config.getStringList("drop_duplicates").asScala + config.getStringList("drop_duplicates").asScala.toSeq } getOrElse { Seq[String]() } diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala index 4a770e5..9ea7991 100644 --- a/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala +++ b/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala @@ -15,7 +15,7 @@ trait PartitionByConfigurator extends Logging { Try { config.getString("partition_by").split(",").map(_.trim).toList } orElse Try { - config.getStringList("partition_by").asScala + config.getStringList("partition_by").asScala.toSeq } getOrElse { Seq[String]() } diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala index 48ef216..bafe33f 100644 --- a/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala +++ b/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala @@ -14,7 +14,7 @@ trait SortWithinPartitionsConfigurator { Try { config.getString("sort_within_partitions.exprs").split(",").map(_.trim).toSeq } orElse Try { - config.getStringList("sort_within_partitions.exprs").asScala + config.getStringList("sort_within_partitions.exprs").asScala.toSeq } getOrElse { Seq[String]() } diff --git a/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala b/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala index 8d16044..c66b2a5 100644 --- a/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala +++ b/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala @@ -6,21 +6,19 @@ import scala.reflect.runtime.universe.TermName import scala.reflect.runtime.{universe => ru} import scala.util.{Failure, Success, Try} -/** - * Contains a variety of helper functions to facilitate instantiating objects using reflection. 
- */ +/** Contains a variety of helper functions to facilitate instantiating objects using reflection. + */ trait InstantiationHelper { - /** - * Creates an instance of a given class by using its companion object. - * @param config The configuration of the object to create. - * @tparam T The parent class of the class to instantiate. - * @return A new instance of the subclass of T provided in the config argument. - * @throws ClassNotFoundException If the class in the config argument can not be found. - * @throws Exception If the companion object of the class is not defined properly, that is to say missing an apply - * method taking a ConfigNode argument and returning a T. Finally, if the companion object of the - * class throws an exception. - */ + /** Creates an instance of a given class by using its companion object. + * @param config The configuration of the object to create. + * @tparam T The parent class of the class to instantiate. + * @return A new instance of the subclass of T provided in the config argument. + * @throws ClassNotFoundException If the class in the config argument can not be found. + * @throws Exception If the companion object of the class is not defined properly, that is to say missing an apply + * method taking a ConfigNode argument and returning a T. Finally, if the companion object of the + * class throws an exception. + */ def instantiateWithCompanionObject[T](config: ConfigNode): T = { // The Scala reflection API being difficult to debug, exceptions are // thrown here in order to guide the implementation of new entities @@ -64,28 +62,29 @@ trait InstantiationHelper { // If the entity's companion object does not exist or its apply method does not have the proper signature, an IllegalArgumentException // will be thrown. case "IllegalArgumentException" => - throw new Exception(s"$className is not a proper Type. This usually happens if the companion object does not have an apply method with a valid signature.") + throw new Exception( + s"$className is not a proper Type. This usually happens if the companion object does not have an apply method with a valid signature." + ) // In all other cases, we don't know how to handle the error, so we re-throw the exception as is. case _ => throw ex } } } - /** - * Creates an instance of a given class by using its empty constructor. - * @param fullyQualifiedClassName The fully qualified name of the class to instantiate. - * @tparam T The parent class of the class to instantiate. - * @return A new instance of the subclass of T provided in the className argument. - */ + /** Creates an instance of a given class by using its empty constructor. + * @param fullyQualifiedClassName The fully qualified name of the class to instantiate. + * @tparam T The parent class of the class to instantiate. + * @return A new instance of the subclass of T provided in the className argument. + */ def instantiateWithEmptyConstructor[T](fullyQualifiedClassName: String): T = { Class .forName(fullyQualifiedClassName) - .newInstance + .getDeclaredConstructor() + .newInstance() .asInstanceOf[T] } } -/** - * Contains a variety of helper functions to facilitate instantiating objects using reflection. - */ +/** Contains a variety of helper functions to facilitate instantiating objects using reflection. 
+ */ object InstantiationHelper extends InstantiationHelper diff --git a/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala b/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala index 154cdc4..33db7a7 100644 --- a/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala +++ b/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala @@ -6,18 +6,15 @@ import com.amadeus.dataio.core.time.Modifiers.ModifierString import java.time.{LocalDate, LocalDateTime, ZoneId, ZonedDateTime} import scala.util.Try - -/** - * This object exposes helper methods for working on Time objects. - */ +/** This object exposes helper methods for working on Time objects. + */ object Time { - /** - * Function that returns the first time stamp on chronological order - * @param instant1 a time stamp - * @param instant2 another time stamp - * @return first time stamp on chronological order - */ + /** Function that returns the first time stamp on chronological order + * @param instant1 a time stamp + * @param instant2 another time stamp + * @return first time stamp on chronological order + */ def minTime(instant1: LocalDateTime, instant2: LocalDateTime): LocalDateTime = { if (instant1.isBefore(instant2)) { instant1 @@ -26,12 +23,11 @@ object Time { } } - /** - * Function that returns the second time stamp on chronological order - * @param instant1 a time stamp - * @param instant2 another time stamp - * @return second time stamp on chronological order - */ + /** Function that returns the second time stamp on chronological order + * @param instant1 a time stamp + * @param instant2 another time stamp + * @return second time stamp on chronological order + */ def maxTime(instant1: LocalDateTime, instant2: LocalDateTime): LocalDateTime = { if (instant1.isAfter(instant2)) { instant1 @@ -40,23 +36,21 @@ object Time { } } - /** - * Function that returns a map of key-value pairs based on NOW and a prefix - * - * @param prefix a prefix string that is added to each key - * @return map of key-value pairs based on a time stamp - */ + /** Function that returns a map of key-value pairs based on NOW and a prefix + * + * @param prefix a prefix string that is added to each key + * @return map of key-value pairs based on a time stamp + */ def getTimeMap(prefix: Option[String]): Map[String, String] = { getTimeMap(LocalDateTime.now(), prefix) } - /** - * Function that returns a map of key-value pairs based on a time stamp and a prefix - * - * @param timestamp a time stamp (by default is NOW) - * @param prefix an optional prefix string that is added to each key - * @return map of key-value pairs based on a time stamp - */ + /** Function that returns a map of key-value pairs based on a time stamp and a prefix + * + * @param timestamp a time stamp (by default is NOW) + * @param prefix an optional prefix string that is added to each key + * @return map of key-value pairs based on a time stamp + */ def getTimeMap(timestamp: LocalDateTime = LocalDateTime.now(), prefix: Option[String] = None): Map[String, String] = { def addPrefix(label: String): String = prefix match { case Some(p) => p + "." 
+ label @@ -65,76 +59,67 @@ object Time { Formats.AllFormats.map(f => (addPrefix(f.label), timestamp.formatted(f))).toMap } - /** - * Class used to extend LocalDateTime methods - */ + /** Class used to extend LocalDateTime methods + */ implicit class DateImplicits(time: LocalDateTime) { - /** - * Use to get a time string based upon the chosen formatter - * @param formatter a labeled formatter - * @return time string based upon the formatter - */ + /** Use to get a time string based upon the chosen formatter + * @param formatter a labeled formatter + * @return time string based upon the formatter + */ def formatted(formatter: LabeledFormatter): String = { ZonedDateTime.of(time, ZoneId.of("Z")).format(formatter.formatter) } - /** - * Truncate a given time stamp to second (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:27.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to second (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:27.000Z) + * @return a time stamp + */ def truncSecond: LocalDateTime = { time.withNano(0) } - /** - * Truncate a given time stamp to minute (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:00.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to minute (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:00.000Z) + * @return a time stamp + */ def truncMinute: LocalDateTime = { time.truncSecond.withSecond(0) } - /** - * Truncate a given time stamp to hour (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:00:00.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to hour (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:00:00.000Z) + * @return a time stamp + */ def truncHour: LocalDateTime = { time.truncMinute.withMinute(0) } - /** - * Truncate a given time stamp to day (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T00:00:00.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to day (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T00:00:00.000Z) + * @return a time stamp + */ def truncDay: LocalDateTime = { time.truncHour.withHour(0) } - /** - * Truncate a given time stamp to month (example: 2015-09-17T13:35:27.957Z -> 2015-09-01T00:00:00.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to month (example: 2015-09-17T13:35:27.957Z -> 2015-09-01T00:00:00.000Z) + * @return a time stamp + */ def truncMonth: LocalDateTime = { time.truncDay.withDayOfMonth(1) } - /** - * Truncate a given time stamp to year (example: 2015-09-17T13:35:27.957Z -> 2015-01-01T00:00:00.000Z) - * @return a time stamp - */ + /** Truncate a given time stamp to year (example: 2015-09-17T13:35:27.957Z -> 2015-01-01T00:00:00.000Z) + * @return a time stamp + */ def truncYear: LocalDateTime = { time.truncMonth.withMonth(1) } - /** - * Truncate a given time stamp using a modifier - * - * @return a time stamp - * @throws IllegalArgumentException when the modifier is incorrect - */ + /** Truncate a given time stamp using a modifier + * + * @return a time stamp + * @throws IllegalArgumentException when the modifier is incorrect + */ def truncate(modifier: String): LocalDateTime = { - modifier.getModifier match { + modifier.getModifier() match { case Modifiers.TruncSecond => time.truncSecond case Modifiers.TruncMinute => time.truncMinute case Modifiers.TruncHour => time.truncHour @@ -145,19 +130,18 @@ object Time { } } - /** - * Apply a delta to a time stamp using a modifier. 
- * @param modifier a string with syntax: (+|-)number(time pattern symbol) -> examples: +5D -3W -1Y +6M -25m +1H -30s - * @return a time stamp - * @throws IllegalArgumentException when the modifier is incorrect - */ + /** Apply a delta to a time stamp using a modifier. + * @param modifier a string with syntax: (+|-)number(time pattern symbol) -> examples: +5D -3W -1Y +6M -25m +1H -30s + * @return a time stamp + * @throws IllegalArgumentException when the modifier is incorrect + */ def applyDelta(modifier: String): LocalDateTime = { val sign = modifier.substring(0, 1) val letter = modifier.substring(modifier.length - 1) val delta = Try(Integer.parseInt(modifier.substring(1, modifier.length - 1))) if (delta.isSuccess) { if (sign.equals("-")) { - letter.getModifier match { + letter.getModifier() match { case Modifiers.LetterSecond => time.minusSeconds(delta.get) case Modifiers.LetterMinute => time.minusMinutes(delta.get) case Modifiers.LetterHour => time.minusHours(delta.get) @@ -168,7 +152,7 @@ object Time { case _ => throw new IllegalArgumentException(modifier) } } else if (sign.equals("+")) { - letter.getModifier match { + letter.getModifier() match { case Modifiers.LetterSecond => time.plusSeconds(delta.get) case Modifiers.LetterMinute => time.plusMinutes(delta.get) case Modifiers.LetterHour => time.plusHours(delta.get) @@ -186,17 +170,16 @@ object Time { } } - /** - * Set a time property (second, minute, etc...) to a required value. - * @param letter a time pattern symbol -> examples: D, W, Y, M, m, H, s - * @param right a numeric value that you want to set - * @return a time stamp - * @throws IllegalArgumentException when the letter or numeric value are incorrect - */ + /** Set a time property (second, minute, etc...) to a required value. + * @param letter a time pattern symbol -> examples: D, W, Y, M, m, H, s + * @param right a numeric value that you want to set + * @return a time stamp + * @throws IllegalArgumentException when the letter or numeric value are incorrect + */ def applySetter(letter: String, right: String): LocalDateTime = { val value = Try(Integer.parseInt(right)) if (value.isSuccess) { - letter.getModifier match { + letter.getModifier() match { case Modifiers.LetterSecond => time.withSecond(value.get) case Modifiers.LetterMinute => time.withMinute(value.get) case Modifiers.LetterHour => time.withHour(value.get) @@ -210,36 +193,38 @@ object Time { } } - /** - * Determines if this time is between the two given time bounds. (ClosedClosed range type) - * @param from the left bound - * @param until the right bound - * @return a Boolean - */ + /** Determines if this time is between the two given time bounds. (ClosedClosed range type) + * @param from the left bound + * @param until the right bound + * @return a Boolean + */ def isBetween(from: LocalDateTime, until: LocalDateTime): Boolean = { time.isBetween(from, until, ClosedClosed) } - /** - * Determines if this time is between the two given time bounds. - * @param from the left bound - * @param until the right bound - * @param rangeType determines if the left bound and right bound are included in the range - * @return a Boolean - */ + /** Determines if this time is between the two given time bounds. 
+ * @param from the left bound + * @param until the right bound + * @param rangeType determines if the left bound and right bound are included in the range + * @return a Boolean + */ def isBetween(from: LocalDateTime, until: LocalDateTime, rangeType: RangeType): Boolean = { require(!from.isAfter(until)) time.isBetween(new DateRange(from, until), rangeType) } - /** - * Determines if this time is between the two given time bounds. - * @param dateRange the date range containing the left and right time bounds - * @param rangeType determines if the left bound and right bound are included in the range - * @return a Boolean - */ + /** Determines if this time is between the two given time bounds. + * @param dateRange the date range containing the left and right time bounds + * @param rangeType determines if the left bound and right bound are included in the range + * @return a Boolean + */ def isBetween(dateRange: DateRange, rangeType: RangeType): Boolean = { - (time.isBefore(dateRange.from), time.equals(dateRange.from), time.equals(dateRange.until), time.isAfter(dateRange.until)) match { + ( + time.isBefore(dateRange.from), + time.equals(dateRange.from), + time.equals(dateRange.until), + time.isAfter(dateRange.until) + ) match { case (true, _, _, _) => false // time, from before the left bound case (_, _, _, true) => false // after the right bound case (_, true, true, _) => rangeType == ClosedClosed // left and right bound are the same time @@ -250,58 +235,61 @@ object Time { } } - /** - * Function that take a time expression and produce a time stamp instance. - * @param timeExpression a time expression formed by a reference followed by modifiers - * @return a time stamp - * @throws RuntimeException when the reference cannot be parsed as a time stamp - */ + /** Function that take a time expression and produce a time stamp instance. + * @param timeExpression a time expression formed by a reference followed by modifiers + * @return a time stamp + * @throws RuntimeException when the reference cannot be parsed as a time stamp + */ def produceTime(timeExpression: String): LocalDateTime = { require(timeExpression != null) val parameters = timeExpression.trim().split(" ") val timeStr = parameters(0) - val modifiers = parameters.drop(1).filterNot { x => x.isEmpty() } + val modifiers = parameters.drop(1).filterNot { x => x.isEmpty } - val parsedDates = Formats.AllFormats.map { format => Try(format.formatter.parse(timeStr)) }.filter { parsedDate => parsedDate.isSuccess } + val parsedDates = + Formats.AllFormats.map { format => Try(format.formatter.parse(timeStr)) }.filter { parsedDate => parsedDate.isSuccess } if (parsedDates.isEmpty) { calculateTime(LocalDateTime.now(), modifiers) } else { - var referenceTimes = parsedDates.map { parsedDate => Try(LocalDateTime.from(parsedDate.get)) }.filter { parsedDate => parsedDate.isSuccess } + var referenceTimes = + parsedDates.map { parsedDate => Try(LocalDateTime.from(parsedDate.get)) }.filter { parsedDate => parsedDate.isSuccess } if (referenceTimes.isEmpty) { - referenceTimes = parsedDates.map { parsedDate => Try(LocalDate.from(parsedDate.get).atStartOfDay()) }.filter { parsedDate => parsedDate.isSuccess } + referenceTimes = parsedDates.map { parsedDate => Try(LocalDate.from(parsedDate.get).atStartOfDay()) }.filter { + parsedDate => parsedDate.isSuccess + } } if (referenceTimes.isEmpty) { - throw new RuntimeException("Impossible to create a time stamp instance using the parsed objects. 
Investigation required: " + parsedDates.toString) + throw new RuntimeException( + "Impossible to create a time stamp instance using the parsed objects. Investigation required: " + parsedDates.toString + ) } else { calculateTime(referenceTimes.last.get, modifiers) } } } - /** - * Function that calculate a time stamp using a list of modifiers. - * Pay attention to modifiers order since the operation are not commutative. - * @param reference a time stamp - * @param modifiers a string listing time modifiers - * @return a time stamp - * @throws IllegalArgumentException when the modifiers are incorrect - */ + /** Function that calculate a time stamp using a list of modifiers. + * Pay attention to modifiers order since the operation are not commutative. + * @param reference a time stamp + * @param modifiers a string listing time modifiers + * @return a time stamp + * @throws IllegalArgumentException when the modifiers are incorrect + */ def calculateTime(reference: LocalDateTime, modifiers: String): LocalDateTime = { calculateTime(reference, modifiers.trim().split(" ")) } - /** - * Function that calculate a time stamp using a list of modifiers. - * Pay attention to modifiers order since the operation are not commutative. - * @param reference a time stamp - * @param modifiers a list of modifiers - * @return a time stamp - * @throws IllegalArgumentException when the modifiers are incorrect - */ + /** Function that calculate a time stamp using a list of modifiers. + * Pay attention to modifiers order since the operation are not commutative. + * @param reference a time stamp + * @param modifiers a list of modifiers + * @return a time stamp + * @throws IllegalArgumentException when the modifiers are incorrect + */ def calculateTime(reference: LocalDateTime, modifiers: Array[String] = Array.empty): LocalDateTime = { modifiers.foldLeft(reference) { (date, modifier) => - if (!modifier.isEmpty) { - modifier.isTruncateModifier match { + if (modifier.nonEmpty) { + modifier.isTruncateModifier() match { case true => date.truncate(modifier) case false => { val parts = modifier.split("=") diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala index 53a3480..33d69a7 100644 --- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala +++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala @@ -4,36 +4,32 @@ import com.typesafe.config.Config import scala.util.Try -/** - * Elasticsearch parameterization - */ +/** Elasticsearch parameterization + */ object ElkConfigurator { - /** - * @param config The typesafe Config object holding the configuration. - * @return A String of the index to use. - * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation - * for the expected fields. - */ + /** @param config The typesafe Config object holding the configuration. + * @return A String of the index to use. + * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation + * for the expected fields. + */ def getIndex(implicit config: Config): String = { - config.getString("Index") + config.getString("index") } - /** - * @param config The typesafe Config object holding the configuration. - * @return A String of the date field to be used for sub index partitioning. 
- * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation - * for the expected fields. - */ + /** @param config The typesafe Config object holding the configuration. + * @return A String of the date field to be used for sub index partitioning. + * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation + * for the expected fields. + */ def getDateField(implicit config: Config): String = { - config.getString("DateField") + config.getString("dateField") } - /** - * @param config The typesafe Config object holding the configuration. - * @return A Option[String] of the date pattern to use for sub index partitioning. - */ + /** @param config The typesafe Config object holding the configuration. + * @return A Option[String] of the date pattern to use for sub index partitioning. + */ def getSubIndexDatePattern(implicit config: Config): Option[String] = { - Try(config.getString("SubIndexDatePattern")).toOption + Try(config.getString("subIndexDatePattern")).toOption } } diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala index 6884236..9018f6c 100644 --- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala +++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala @@ -8,17 +8,17 @@ import org.apache.spark.sql.{Dataset, SparkSession} import scala.util.Try -/** - * Allows to write batch data to Elasticsearch with automatic date sub-indexing. - * - * @param index the Index to write to. - * @param mode mode. - * @param dateField The date field to use for sub index partitioning. - * @param suffixDatePattern the date suffix pattern to use for the full index. - * @param options options. - * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. - */ +/** Allows to write batch data to Elasticsearch with automatic date sub-indexing. + * + * @param index the Index to write to. + * @param mode mode. + * @param dateField The date field to use for sub index partitioning. + * @param suffixDatePattern the date suffix pattern to use for the full index. + * @param options options. + * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. + */ case class ElkOutput( + name: String, index: String, mode: String, dateField: String, @@ -29,12 +29,11 @@ case class ElkOutput( with Logging with ElkOutputCommons { - /** - * Writes data to this output. - * - * @param data The data to write. - * @param spark The SparkSession which will be used to write the data. - */ + /** Writes data to this output. + * + * @param data The data to write. + * @param spark The SparkSession which will be used to write the data. + */ override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = { val fullIndexName = computeFullIndexName() logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName]") @@ -48,17 +47,21 @@ object ElkOutput { import com.amadeus.dataio.config.fields._ import com.amadeus.dataio.pipes.elk.ElkConfigurator._ - /** - * Creates an ElkOutput based on a given configuration. - * - * @param config The collection of config nodes that will be used to instantiate KafkaOutput. - * @return a new instance of ElkOutput. - */ + /** Creates an ElkOutput based on a given configuration. 
+ * + * @param config The collection of config nodes that will be used to instantiate KafkaOutput. + * @return a new instance of ElkOutput. + */ def apply(implicit config: Config): ElkOutput = { + val name = Try { + config.getString("name") + } getOrElse { + throw new Exception("Missing required `name` field in configuration.") + } val index = getIndex - val mode = config.getString("Mode") + val mode = config.getString("mode") val options = Try(getOptions).getOrElse(Map()) @@ -69,7 +72,15 @@ object ElkOutput { val suffixDatePattern = getSubIndexDatePattern.getOrElse(DefaultSuffixDatePattern) - ElkOutput(index = index, mode = mode, dateField = dateField, suffixDatePattern = suffixDatePattern, options = options, config = config) + ElkOutput( + name = name, + index = index, + mode = mode, + dateField = dateField, + suffixDatePattern = suffixDatePattern, + options = options, + config = config + ) } } diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala index f2b72bb..ec70759 100644 --- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala +++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala @@ -8,20 +8,20 @@ import org.apache.spark.sql.{Dataset, SparkSession} import scala.util.Try -/** - * Allows to write stream data to Elasticsearch with automatic date sub-indexing. - * - * @param index the Index to write to. - * @param trigger the trigger to be used for the streaming query. - * @param timeout timeout in milliseconds. - * @param mode mode. - * @param options options. - * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. - * @param dateField The date field to use for sub index partitioning. - * @param suffixDatePattern the date suffix pattern to use for the full index. - * @param outputName the output name used to define the streaming query name. - */ +/** Allows to write stream data to Elasticsearch with automatic date sub-indexing. + * + * @param index the Index to write to. + * @param trigger the trigger to be used for the streaming query. + * @param timeout timeout in milliseconds. + * @param mode mode. + * @param options options. + * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. + * @param dateField The date field to use for sub index partitioning. + * @param suffixDatePattern the date suffix pattern to use for the full index. + * @param outputName the output name used to define the streaming query name. + */ case class ElkOutput( + name: String, index: String, trigger: Option[Trigger], timeout: Long, @@ -29,18 +29,16 @@ case class ElkOutput( options: Map[String, String] = Map.empty, config: Config = ConfigFactory.empty(), dateField: String, - suffixDatePattern: String, - outputName: Option[String] + suffixDatePattern: String ) extends Output with Logging with ElkOutputCommons { - /** - * Writes data to this output. - * - * @param data The data to write. - * @param spark The SparkSession which will be used to write the data. - */ + /** Writes data to this output. + * + * @param data The data to write. + * @param spark The SparkSession which will be used to write the data. 
+ */ def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = { val fullIndexName = computeFullIndexName() logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName] using trigger [$trigger]") @@ -64,19 +62,11 @@ case class ElkOutput( streamingQuery.stop() } - /** - * Create a unique query name based on output path if exists. - * - * @return a unique query name. - */ - private[streaming] def createQueryName(): String = { - - outputName match { - case Some(name) => s"QN_${name}_${index}_${java.util.UUID.randomUUID}" - case _ => s"QN_${index}_${java.util.UUID.randomUUID}" - } - - } + /** Create a unique query name based on output path if exists. + * + * @return a unique query name. + */ + private[streaming] def createQueryName(): String = s"QN_${index}_${java.util.UUID.randomUUID}" } object ElkOutput { @@ -84,17 +74,21 @@ object ElkOutput { import com.amadeus.dataio.pipes.elk.ElkConfigurator._ import com.amadeus.dataio.pipes.elk.ElkOutputCommons.{DefaultSuffixDatePattern, checkNodesIsDefined, checkPortIsDefined} - /** - * Creates an ElkOutput based on a given configuration. - * - * @param config The collection of config nodes that will be used to instantiate KafkaOutput. - * @return a new instance of ElkOutput. - */ + /** Creates an ElkOutput based on a given configuration. + * + * @param config The collection of config nodes that will be used to instantiate KafkaOutput. + * @return a new instance of ElkOutput. + */ def apply(implicit config: Config): ElkOutput = { + val name = Try { + config.getString("name") + } getOrElse { + throw new Exception("Missing required `name` field in configuration.") + } val index = getIndex - val mode = config.getString("Mode") + val mode = config.getString("mode") val trigger = getStreamingTrigger @@ -109,18 +103,16 @@ object ElkOutput { val suffixDatePattern = getSubIndexDatePattern.getOrElse(DefaultSuffixDatePattern) - val name = Try(config.getString("Name")).toOption - ElkOutput( + name = name, index = index, trigger = trigger, - timeout = timeout, + timeout = timeout.get, mode = mode, options = options, config = config, dateField = dateField, - suffixDatePattern = suffixDatePattern, - outputName = name + suffixDatePattern = suffixDatePattern ) } } diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala index f78c37d..052c8f7 100644 --- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala +++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala @@ -11,7 +11,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers { "getIndex" should { "return index_x given Index = index_x" in { val config = ConfigFactory.parseMap( - Map("Index" -> "index_x") + Map("index" -> "index_x") ) getIndex(config) shouldEqual "index_x" } @@ -27,7 +27,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers { "getElkDateField" should { "return timestamp given DateField = timestamp" in { val config = ConfigFactory.parseMap( - Map("DateField" -> "timestamp") + Map("dateField" -> "timestamp") ) getDateField(config) shouldEqual "timestamp" } @@ -43,7 +43,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers { "getSubIndexDatePattern" should { "return yyyy.MM given SubIndexDatePattern = yyyy.MM" in { val config = ConfigFactory.parseMap( - Map("SubIndexDatePattern" -> "yyyy.MM") + Map("subIndexDatePattern" -> "yyyy.MM") ) 
getSubIndexDatePattern(config) shouldEqual Some("yyyy.MM") } diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala index 89bb61a..076feea 100644 --- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala +++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala @@ -12,15 +12,15 @@ class ElkOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput", - "Name" -> "my-test-elk", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "Mode" -> "append", - "Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput", + "name" -> "my-test-elk", + "nodes" -> "bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "mode" -> "append", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -31,7 +31,7 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) elkStreamingOutput.index shouldEqual "test.index" elkStreamingOutput.dateField shouldEqual "docDate" @@ -51,16 +51,16 @@ class ElkOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput", - "Name" -> "my-test-elk", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "SubIndexDatePattern" -> "yyyy.MM.dd", - "Mode" -> "append", - "Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput", + "name" -> "my-test-elk", + "nodes" -> "bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "mode" -> "append", + "subIndexDatePattern" -> "yyyy.MM.dd", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -71,7 +71,7 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) elkStreamingOutput.index shouldEqual "test.index" elkStreamingOutput.dateField shouldEqual "docDate" diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala index c8093f6..1ef3945 100644 --- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala +++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala @@ -15,17 +15,17 @@ class ElkOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput", - "Name" -> "my-test-elk", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "Mode" -> "append", - "Duration" -> "6 hours", - "Timeout" -> "24", - 
"Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.output.streaming.ElkOutput", + "name" -> "my-test-elk", + "nodes" -> "bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "mode" -> "append", + "duration" -> "6 hours", + "timeout" -> "24 hours", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -36,9 +36,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) - elkStreamingOutput.outputName shouldEqual Some("my-test-elk") + elkStreamingOutput.name shouldEqual "my-test-elk" elkStreamingOutput.index shouldEqual "test.index" elkStreamingOutput.dateField shouldEqual "docDate" elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM" @@ -55,21 +55,21 @@ class ElkOutputTest extends AnyWordSpec with Matchers { } - "be initialized according to configuration without output name" in { + "raise exception given missing output name" in { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "Mode" -> "append", - "Trigger" -> "Continuous", - "Duration" -> "6 hours", - "Timeout" -> "24", - "Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.output.streaming.ElkOutput", + "nodes" -> "bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "mode" -> "append", + "trigger" -> "Continuous", + "duration" -> "6 hours", + "timeout" -> "24 hours", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -80,22 +80,11 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + intercept[Exception] { + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) - elkStreamingOutput.outputName shouldEqual None - elkStreamingOutput.index shouldEqual "test.index" - elkStreamingOutput.dateField shouldEqual "docDate" - elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM" - elkStreamingOutput.mode shouldEqual "append" - elkStreamingOutput.trigger shouldEqual Some(Trigger.Continuous(Duration("6 hours"))) - elkStreamingOutput.timeout shouldEqual 86400000 - elkStreamingOutput.options shouldEqual Map( - "es.net.ssl.cert.allow.self.signed" -> "true", - "es.index.auto.create" -> "true", - "es.mapping.id" -> "docId", - "es.port" -> "9200", - "es.nodes" -> "bktv001, bktv002.amadeus.net" - ) + fail("Expected an exception to be thrown due to missing required `name` field in configuration.") + } } @@ -103,18 +92,18 @@ class ElkOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput", - "Name" -> "my-test-elk", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "SubIndexDatePattern" -> "yyyy.MM.dd", - "Mode" -> "append", - "Trigger" -> "AvailableNow", - "Timeout" -> "24", - "Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.output.streaming.ElkOutput", + "name" -> "my-test-elk", + "nodes" -> 
"bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "subIndexDatePattern" -> "yyyy.MM.dd", + "mode" -> "append", + "trigger" -> "AvailableNow", + "timeout" -> "24 hours", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -125,9 +114,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) - elkStreamingOutput.outputName shouldEqual Some("my-test-elk") + elkStreamingOutput.name shouldEqual "my-test-elk" elkStreamingOutput.index shouldEqual "test.index" elkStreamingOutput.dateField shouldEqual "docDate" elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM.dd" @@ -148,17 +137,17 @@ class ElkOutputTest extends AnyWordSpec with Matchers { val config = ConfigFactory.parseMap( Map( - "Output" -> Map( - "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput", - "Name" -> "my-test-elk", - "Nodes" -> "bktv001, bktv002.amadeus.net", - "Ports" -> "9200", - "Index" -> "test.index", - "DateField" -> "docDate", - "SubIndexDatePattern" -> "yyyy.MM.dd", - "Mode" -> "append", - "Timeout" -> "24", - "Options" -> Map( + "output" -> Map( + "type" -> "com.amadeus.dataio.output.streaming.ElkOutput", + "name" -> "my-test-elk", + "nodes" -> "bktv001, bktv002.amadeus.net", + "ports" -> "9200", + "index" -> "test.index", + "dateField" -> "docDate", + "subIndexDatePattern" -> "yyyy.MM.dd", + "mode" -> "append", + "timeout" -> "24 hours", + "options" -> Map( "\"es.net.ssl.cert.allow.self.signed\"" -> true, "\"es.index.auto.create\"" -> true, "\"es.mapping.id\"" -> "docId", @@ -169,9 +158,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers { ) ) - val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output")) + val elkStreamingOutput = ElkOutput.apply(config.getConfig("output")) - elkStreamingOutput.outputName shouldEqual Some("my-test-elk") + elkStreamingOutput.name shouldEqual "my-test-elk" elkStreamingOutput.index shouldEqual "test.index" elkStreamingOutput.dateField shouldEqual "docDate" elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM.dd" @@ -196,29 +185,20 @@ class ElkOutputTest extends AnyWordSpec with Matchers { "return a query name based on index name" in { val elkOutput = - ElkOutput(index = "test.index", trigger = None, timeout = 0L, mode = "", dateField = "docDate", suffixDatePattern = "yyyy.MM", outputName = None) + ElkOutput( + index = "test.index", + trigger = None, + timeout = 0L, + mode = "", + dateField = "docDate", + suffixDatePattern = "yyyy.MM", + name = "" + ) val queryName = elkOutput.createQueryName() queryName should fullyMatch regex "^QN_test.index_" + uuidPattern + "$" } - - "return a query name based on output name" in { - - val elkOutput = ElkOutput( - index = "test.index", - trigger = None, - timeout = 0L, - mode = "", - dateField = "docDate", - suffixDatePattern = "yyyy.MM", - outputName = Some("myTestOutput") - ) - - val queryName = elkOutput.createQueryName() - - queryName should fullyMatch regex "^QN_myTestOutput_test.index_" + uuidPattern + "$" - } } } diff --git a/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala index 6ca60c3..583f164 100644 --- a/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala +++ 
b/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala @@ -8,16 +8,16 @@ import org.apache.spark.sql.{Dataset, SparkSession} import scala.util.Try /** Class for reading kafka dataframe - * - * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. - */ + * + * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity. + */ case class KafkaOutput( - name: String, + name: String, trigger: Option[Trigger], - timeout: Option[Long], + timeout: Option[Long], mode: String, options: Map[String, String] = Map(), - config: Config = ConfigFactory.empty() + config: Config = ConfigFactory.empty() ) extends Output with Logging { @@ -53,9 +53,9 @@ case class KafkaOutput( } /** Create a unique query name based on output topic. - * - * @return a unique query name. - */ + * + * @return a unique query name. + */ private[streaming] def createQueryName(): String = { s"QN_${name}_${java.util.UUID.randomUUID}" @@ -66,10 +66,10 @@ object KafkaOutput { import com.amadeus.dataio.config.fields._ /** Creates an KafkaOutput based on a given configuration. - * - * @param config The collection of config nodes that will be used to instantiate KafkaOutput. - * @return a new instance of KafkaOutput. - */ + * + * @param config The collection of config nodes that will be used to instantiate KafkaOutput. + * @return a new instance of KafkaOutput. + */ def apply(implicit config: Config): KafkaOutput = { val name = Try { config.getString("name") diff --git a/project/SparkProfiles.scala b/project/SparkProfiles.scala new file mode 100644 index 0000000..6350699 --- /dev/null +++ b/project/SparkProfiles.scala @@ -0,0 +1,100 @@ +/** Spark version profiles — the SINGLE SOURCE OF TRUTH for all version-coupled dependencies. + * + * Each profile encapsulates: + * - Spark version (e.g., 3.3.4, 3.4.3, 3.5.3, 4.0.0) + * - Scala version (2.12 or 2.13, derived from Spark requirements) + * - Java target compatibility (8, 11, 17, 21) + * - All connector library versions (Kafka, Snowflake, Elasticsearch, etc.) + * - Test library versions coupled to the Spark/Scala combination + * + * Usage: + * Set the SPARK_PROFILE environment variable before invoking SBT: + * SPARK_PROFILE=spark34 sbt compile + * + * Defaults to "spark35" if not set. + */ +object SparkProfiles { + + case class SparkProfile( + // ── Core versions ────────────────────────────────────────────── + sparkVersion: String, + scalaVersion: String, + javaTarget: String, + // ── Connector versions ───────────────────────────────────────── + sparkSnowflakeVersion: Option[String], + elasticsearchSparkVersion: Option[String], + embeddedKafkaVersion: String, + // ── Common dependency versions ──────────────────────────────── + typesafeConfigVersion: String = "1.4.3", + slf4jApiVersion: String = "2.0.7", + commonsIoVersion: String = "2.13.0", + scalatestVersion: String = "3.2.15", + scalamockVersion: String = "5.2.0" + ) { + + /** Major.Minor string, e.g. "3.5" */ + val sparkBinaryVersion: String = sparkVersion.split("\\.").take(2).mkString(".") + + /** Scala binary version, e.g. 
"2.12" */ + val scalaBinaryVersion: String = scalaVersion.split("\\.").take(2).mkString(".") + + /** Whether this profile supports Snowflake connector */ + val supportsSnowflake: Boolean = sparkSnowflakeVersion.isDefined + + /** Whether this profile supports Elasticsearch connector */ + val supportsElasticsearch: Boolean = elasticsearchSparkVersion.isDefined + } + + // ════════════════════════════════════════════════════════════════════ + // Profile definitions — update versions HERE when upgrading + // ════════════════════════════════════════════════════════════════════ + + val profiles: Map[String, SparkProfile] = Map( + // ── Spark 3.4.x ────────────────────────────────────────────────── + "spark34" -> SparkProfile( + sparkVersion = "3.4.4", + scalaVersion = "2.12.15", + javaTarget = "11", + sparkSnowflakeVersion = Some("2.16.0-spark_3.4"), + elasticsearchSparkVersion = Some("8.17.4"), + embeddedKafkaVersion = "3.4.1" + ), + // ── Spark 3.5.x (default) ──────────────────────────────────────── + "spark35" -> SparkProfile( + sparkVersion = "3.5.3", + scalaVersion = "2.12.15", + javaTarget = "11", + sparkSnowflakeVersion = Some("3.1.1"), + elasticsearchSparkVersion = None, + embeddedKafkaVersion = "3.5.1" + ), + // ── Spark 4.0.x ────────────────────────────────────────────────── + // NOTE: Spark 4.0 drops Scala 2.12 support. Snowflake & ES + // connectors may not yet support Spark 4.0 — disabled here. + "spark40" -> SparkProfile( + sparkVersion = "4.0.2", + scalaVersion = "2.13.17", + javaTarget = "17", + sparkSnowflakeVersion = None, + elasticsearchSparkVersion = None, + embeddedKafkaVersion = "3.9.1" + ) + ) + + // ════════════════════════════════════════════════════════════════════ + // Active profile resolution + // ════════════════════════════════════════════════════════════════════ + + val DefaultProfile = "spark35" + + val activeProfileName: String = sys.env.getOrElse("SPARK_PROFILE", DefaultProfile) + + val active: SparkProfile = profiles.getOrElse( + activeProfileName, + sys.error(s"Unknown SPARK_PROFILE '$activeProfileName'. Available profiles: ${profiles.keys.mkString(", ")}") + ) + + println( + s"[SparkProfiles] Active profile: $activeProfileName (Spark ${active.sparkVersion}, Scala ${active.scalaVersion}, Java ${active.javaTarget})" + ) +} diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala index 3b8ee01..e5e04c1 100644 --- a/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala +++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala @@ -1,25 +1,24 @@ package com.amadeus.dataio.testutils -import scala.collection.JavaConverters._ +import scala.jdk.CollectionConverters._ import scala.language.implicitConversions -/** - *

Contains helper implicit conversions to make working with functions expecting Java maps/lists easier.
- *
- * These conversions are meant for tests only, for instance to create typesafe Config objects.
- *
- * e.g.:
- * {{{
- * import com.amadeus.dataio.test.JavaImplicitConverters._
- * import com.typesafe.config.ConfigFactory
- *
- * val config = ConfigFactory.parseMap(
- *   Map(
- *     "MyField" -> Seq("val1", "val2", "val3"),
- *     "MyOtherField" -> 5
- *   )
- * )
- * }}}
- *
- */
+/** Contains helper implicit conversions to make working with functions expecting Java maps/lists easier.
+  *
+  * These conversions are meant for tests only, for instance to create typesafe Config objects.
+  *
+  * e.g.:
+  * {{{
+  * import com.amadeus.dataio.test.JavaImplicitConverters._
+  * import com.typesafe.config.ConfigFactory
+  *
+  * val config = ConfigFactory.parseMap(
+  *   Map(
+  *     "MyField" -> Seq("val1", "val2", "val3"),
+  *     "MyOtherField" -> 5
+  *   )
+  * )
+  * }}}
+  *

+ */ object JavaImplicitConverters { import scala.language.implicitConversions diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala index 00efa1a..51fa211 100644 --- a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala +++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala @@ -3,45 +3,42 @@ package com.amadeus.dataio.testutils import org.apache.spark.sql.DataFrame import org.apache.spark.sql.streaming.Trigger -/** - * Provides function for spark streaming tests - * - * Initialize the spark session before the tests - * Automatically close the session after TestSuite is finished - * Provides a wrapper to spark.implicits by importing sparkTestImplicits - * Provides functions to collect results - * - * e.g. - * {{{ - * class MyClassTest extends AnyWordSpec with SparkStreamingSuite { - * // provided by SparkStreamingSuite: - * // sparkSession: SparkSession - * // sparkTestImplicits - * } - * }}} - */ +/** Provides function for spark streaming tests + * + * Initialize the spark session before the tests + * Automatically close the session after TestSuite is finished + * Provides a wrapper to spark.implicits by importing sparkTestImplicits + * Provides functions to collect results + * + * e.g. + * {{{ + * class MyClassTest extends AnyWordSpec with SparkStreamingSuite { + * // provided by SparkStreamingSuite: + * // sparkSession: SparkSession + * // sparkTestImplicits + * } + * }}} + */ trait SparkStreamingSuite extends SparkSuite { - /** - * enable spark streaming schema inference - */ + /** enable spark streaming schema inference + */ def enableSparkStreamingSchemaInference(): Unit = sparkSession.sql("set spark.sql.streaming.schemaInference=true") - /** - * Collect data from dataframe read from stream - * - * Use an in memory sink to gather the data - * - * @param dataFrame the dataFrame to collect. - * @return the String representation of each rows. - */ + /** Collect data from dataframe read from stream + * + * Use an in memory sink to gather the data + * + * @param dataFrame the dataFrame to collect. + * @return the String representation of each rows. + */ def collectDataStream(dataFrame: DataFrame): Array[String] = { val tmpTableName = "result_" + System.currentTimeMillis() val streamWriter = dataFrame.writeStream .format("memory") .queryName(tmpTableName) - .trigger(Trigger.Once()) + .trigger(Trigger.AvailableNow()) .start() streamWriter.awaitTermination(2000) diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala index be5fe0c..9968f1d 100644 --- a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala +++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala @@ -24,10 +24,6 @@ trait SparkSuite extends Suite with BeforeAndAfter { implicit var sparkSession: SparkSession = _ - object sparkTestImplicits extends SQLImplicits with Serializable { - protected override def _sqlContext: SQLContext = sparkSession.sqlContext - } - before { sparkSession = SparkSession .builder() diff --git a/version.sbt b/version.sbt index 6075046..22af523 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -ThisBuild / version := "1.1.1-spark3.5.0-SNAPSHOT" +ThisBuild / version := s"1.1.1-spark${SparkProfiles.active.sparkVersion}-SNAPSHOT"
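
For readers wiring a build against the new `project/SparkProfiles.scala`, the sketch below shows one way `SparkProfiles.active` can drive dependency and compiler settings from `build.sbt`. It is a minimal illustration under assumed artifact coordinates and module layout, not a reproduction of this repository's actual `build.sbt`.

```scala
// Minimal sketch, not this repository's actual build.sbt: it shows how a build
// can consume SparkProfiles.active so that no Spark/Scala versions are hardcoded.
// Artifact coordinates and settings below are illustrative assumptions.
import SparkProfiles.active

ThisBuild / scalaVersion := active.scalaVersion

// Spark itself is supplied by the runtime cluster, hence "provided" scope.
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % active.sparkVersion % Provided
)

// Optional connectors are added only when the active profile defines a version.
libraryDependencies ++= active.elasticsearchSparkVersion
  .map(v => "org.elasticsearch" %% "elasticsearch-spark-30" % v)
  .toSeq

// Align javac with the profile's Java target.
javacOptions ++= Seq("--release", active.javaTarget)
```

With this pattern, switching Spark targets locally only requires re-running sbt under a different profile, e.g. `SPARK_PROFILE=spark34 sbt test`, and unknown profile names fail fast through the `sys.error` call in `SparkProfiles.active`.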