From 34ae287ff28ce09e96c9d506889017b2c0bbfeb7 Mon Sep 17 00:00:00 2001
From: Guillaume LECLERC
Date: Wed, 15 Apr 2026 15:47:47 +0200
Subject: [PATCH] Add cross-build capabilities for several Spark versions
---
.github/workflows/ci.yml | 39 ++-
.github/workflows/publish.yml | 87 +++++-
BUILDING.md | 153 +++++++++++
README.md | 33 ++-
build.sbt | 155 +++++++----
.../dataio/config/ConfigNodeCollection.scala | 4 +-
.../fields/DropDuplicatesConfigurator.scala | 2 +-
.../fields/PartitionByConfigurator.scala | 2 +-
.../SortWithinPartitionsConfigurator.scala | 2 +-
.../dataio/core/InstantiationHelper.scala | 47 ++--
.../com/amadeus/dataio/core/time/Time.scala | 254 +++++++++---------
.../dataio/pipes/elk/ElkConfigurator.scala | 40 ++-
.../dataio/pipes/elk/batch/ElkOutput.scala | 59 ++--
.../pipes/elk/streaming/ElkOutput.scala | 84 +++---
.../pipes/elk/ElkConfiguratorTest.scala | 6 +-
.../pipes/elk/batch/ElkOutputTest.scala | 42 +--
.../pipes/elk/streaming/ElkOutputTest.scala | 150 +++++------
.../pipes/kafka/streaming/KafkaOutput.scala | 26 +-
project/SparkProfiles.scala | 100 +++++++
.../testutils/JavaImplicitConverters.scala | 35 ++-
.../testutils/SparkStreamingSuite.scala | 55 ++--
.../amadeus/dataio/testutils/SparkSuite.scala | 4 -
version.sbt | 2 +-
23 files changed, 885 insertions(+), 496 deletions(-)
create mode 100644 BUILDING.md
create mode 100644 project/SparkProfiles.scala
diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml
index cd0bd25..fbcfccb 100644
--- a/.github/workflows/ci.yml
+++ b/.github/workflows/ci.yml
@@ -16,29 +16,52 @@ on:
jobs:
build:
runs-on: ubuntu-latest
+
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - spark_profile: spark34
+ java_version: "11"
+ label: "Spark 3.4 / Java 11"
+ - spark_profile: spark35
+ java_version: "11"
+ label: "Spark 3.5 / Java 11"
+ - spark_profile: spark40
+ java_version: "17"
+ label: "Spark 4.0 / Java 17"
+
+ name: "Build (${{ matrix.label }})"
+
+ env:
+ SPARK_PROFILE: ${{ matrix.spark_profile }}
+
steps:
- uses: actions/checkout@v4
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
distribution: "temurin"
- java-version: 8
+ java-version: ${{ matrix.java_version }}
cache: "sbt"
- uses: sbt/setup-sbt@v1
with:
sbt-runner-version: 1.10.11
- - run: sbt compile
+ - name: Compile
+ run: sbt compile
- - run: sbt test
+ - name: Test
+ run: sbt test
- - run: sbt package
+ - name: Package
+ run: sbt package
- - run: tar cf artefacts.tar target/ */target/
+ - name: Bundle artifacts
+ run: tar cf artefacts.tar target/ */target/
- uses: actions/upload-artifact@v4
with:
- name: Artefacts
+ name: Artefacts-${{ matrix.spark_profile }}
path: artefacts.tar
-
diff --git a/.github/workflows/publish.yml b/.github/workflows/publish.yml
index 640442d..3fb73d3 100644
--- a/.github/workflows/publish.yml
+++ b/.github/workflows/publish.yml
@@ -11,8 +11,13 @@ on:
- MINOR
- MAJOR
+run-name: ${{ format('Publishing new version - {0}', inputs.release_type) }}
+
jobs:
- publish:
+ # ────────────────────────────────────────────────────────────────────
+ # Phase 1: Create release tag (runs once, on default Spark profile)
+ # ────────────────────────────────────────────────────────────────────
+ release-tag:
runs-on: ubuntu-latest
permissions:
contents: write
@@ -22,6 +27,10 @@ jobs:
env:
GITHUB_REGISTRY_TOKEN: ${{ secrets.GITHUB_TOKEN }}
RELEASE_TYPE: ${{ inputs.release_type }}
+ SPARK_PROFILE: spark35
+ outputs:
+ release_tag: ${{ steps.get_tag.outputs.tag }}
+ release_base_version: ${{ steps.extract_version.outputs.base_version }}
steps:
- uses: actions/checkout@v4
@@ -30,9 +39,9 @@ jobs:
git config user.name "github-actions[bot]"
git config user.email "41898282+github-actions[bot]@users.noreply.github.com"
- - uses: actions/setup-java@v3
+ - uses: actions/setup-java@v4
with:
- java-version: 8
+ java-version: 11
distribution: "temurin"
cache: "sbt"
@@ -40,7 +49,75 @@ jobs:
with:
sbt-runner-version: 1.10.11
- - name: Run SBT release
+ - name: Run SBT release (tag + publish default profile)
run: sbt 'release with-defaults'
-run-name: ${{ format('Publishing new version - {0}', inputs.release_type) }}
\ No newline at end of file
+ - name: Extract release tag
+ id: get_tag
+ run: echo "tag=$(git describe --tags --abbrev=0)" >> "$GITHUB_OUTPUT"
+
+ - name: Extract base version (without Spark suffix)
+ id: extract_version
+ run: |
+ # Tag format is e.g. "v1.2.0-spark3.5.3" — extract "1.2.0"
+ TAG=$(git describe --tags --abbrev=0)
+ BASE_VERSION=$(echo "$TAG" | sed 's/^v//' | sed 's/-spark.*//')
+ echo "base_version=$BASE_VERSION" >> "$GITHUB_OUTPUT"
+
+ # ────────────────────────────────────────────────────────────────────
+ # Phase 2: Publish all other Spark profiles from the release tag
+ # ────────────────────────────────────────────────────────────────────
+ publish-profiles:
+ needs: release-tag
+ runs-on: ubuntu-latest
+ permissions:
+ contents: read
+ packages: write
+ attestations: write
+ id-token: write
+
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - spark_profile: spark34
+ java_version: "11"
+ label: "Spark 3.4"
+ - spark_profile: spark35
+ java_version: "11"
+ label: "Spark 3.5"
+ - spark_profile: spark40
+ java_version: "17"
+ label: "Spark 4.0"
+
+ name: "Publish (${{ matrix.label }})"
+
+ env:
+ GITHUB_REGISTRY_TOKEN: ${{ secrets.GITHUB_TOKEN }}
+ SPARK_PROFILE: ${{ matrix.spark_profile }}
+
+ steps:
+ - uses: actions/checkout@v4
+ with:
+ ref: ${{ needs.release-tag.outputs.release_tag }}
+
+ - uses: actions/setup-java@v4
+ with:
+ java-version: ${{ matrix.java_version }}
+ distribution: "temurin"
+ cache: "sbt"
+
+ - uses: sbt/setup-sbt@v1
+ with:
+ sbt-runner-version: 1.10.11
+
+ - name: Set version for this Spark profile
+ env:
+ BASE_VERSION: ${{ needs.release-tag.outputs.release_base_version }}
+ run: |
+ # sbt-release wrote a hardcoded version at the tag (e.g. 1.2.0-spark3.5.3).
+ # We need to re-derive it for this profile's Spark version.
+ echo "ThisBuild / version := s\"${BASE_VERSION}-spark\${SparkProfiles.active.sparkVersion}\"" > version.sbt
+
+ - name: Publish JARs for ${{ matrix.label }}
+ run: sbt publish
diff --git a/BUILDING.md b/BUILDING.md
new file mode 100644
index 0000000..4fcf332
--- /dev/null
+++ b/BUILDING.md
@@ -0,0 +1,153 @@
+# Building Data I/O
+
+This document describes how the multi-Spark-version build and release system works.
+
+## Architecture Overview
+
+Data I/O publishes artifacts for multiple Apache Spark versions from a **single codebase**.
+All version-coupled values are defined in one place:
+
+```
+project/SparkProfiles.scala ← Single source of truth for ALL versioned dependencies
+build.sbt ← References SparkProfiles.active (no hardcoded versions)
+version.sbt ← Derives the Spark tag dynamically from the active profile
+```
+
+### How It Works
+
+1. **`project/SparkProfiles.scala`** defines a `SparkProfile` case class (sketched after this list) containing:
+ - Spark version (e.g., `3.5.3`)
+ - Scala version (e.g., `2.12.15`)
+ - Java target (e.g., `11`)
+ - All connector library versions (Snowflake, Elasticsearch, Embedded Kafka, etc.)
+ - Feature flags (`supportsSnowflake`, `supportsElasticsearch`)
+
+2. **Profile selection** is driven by the `SPARK_PROFILE` environment variable:
+ ```bash
+ SPARK_PROFILE=spark34 sbt compile
+ ```
+ If unset, it defaults to `spark35`.
+
+3. **`build.sbt`** reads `SparkProfiles.active` and uses it for all `scalaVersion`,
+ `javacOptions`, and `libraryDependencies` settings. Connector modules (Snowflake,
+ Elasticsearch) are conditionally included based on profile feature flags.
+
+4. **`version.sbt`** embeds the Spark version in the artifact version string:
+ ```
+ 1.1.1-spark3.5.3-SNAPSHOT
+ ```
+
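+For orientation, a minimal sketch of the shape of `project/SparkProfiles.scala` follows. It is
+illustrative only: field values are taken from the tables in this document, and the real file
+also carries the shared library versions (scalatest, scalamock, Typesafe Config, slf4j,
+commons-io) that `build.sbt` reads from the profile.
+
+```scala
+// Sketch of project/SparkProfiles.scala (illustrative, not the full file)
+case class SparkProfile(
+  sparkVersion: String,
+  scalaVersion: String,
+  javaTarget: String,
+  sparkSnowflakeVersion: Option[String],     // None => connector unavailable for this profile
+  elasticsearchSparkVersion: Option[String], // None => connector unavailable for this profile
+  embeddedKafkaVersion: String
+) {
+  def scalaBinaryVersion: String     = scalaVersion.split('.').take(2).mkString(".")
+  def supportsSnowflake: Boolean     = sparkSnowflakeVersion.isDefined
+  def supportsElasticsearch: Boolean = elasticsearchSparkVersion.isDefined
+}
+
+object SparkProfiles {
+  val profiles: Map[String, SparkProfile] = Map(
+    "spark35" -> SparkProfile("3.5.3", "2.12.15", "11", Some("3.1.1"), Some("8.17.4"), "3.5.1"),
+    "spark40" -> SparkProfile("4.0.2", "2.13.17", "17", None, None, "3.6.0")
+  )
+
+  // Profile selection is driven by the SPARK_PROFILE environment variable; spark35 is the default.
+  lazy val active: SparkProfile =
+    profiles(sys.env.getOrElse("SPARK_PROFILE", "spark35"))
+}
+```
+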
+## Supported Profiles
+
+| Profile | Spark | Scala | Java | Snowflake | Elasticsearch |
+|------------|-------|---------|------|-------------------|------------------|
+| `spark34` | 3.4.4 | 2.12.15 | 11 | 2.16.0-spark_3.4 | 8.17.4 |
+| `spark35` | 3.5.3 | 2.12.15 | 11 | 3.1.1 | 8.17.4 |
+| `spark40` | 4.0.2 | 2.13.17 | 17 | — | — |
+
+## Local Development
+
+```bash
+# Default profile (Spark 3.5)
+sbt compile test package
+
+# Specific profile
+SPARK_PROFILE=spark35 sbt compile test
+
+# Build all profiles locally (shell loop)
+for profile in spark34 spark35 spark40; do
+ echo "=== Building $profile ==="
+ SPARK_PROFILE=$profile sbt clean compile test
+done
+```
+
+## CI Pipeline
+
+The GitHub Actions CI workflow (`.github/workflows/ci.yml`) uses a **matrix strategy**
+to build, test, and package all supported Spark profiles in parallel:
+
+```yaml
+strategy:
+ matrix:
+ include:
+ - spark_profile: spark34
+ java_version: "11"
+ - spark_profile: spark35
+ java_version: "11"
+ - spark_profile: spark40
+ java_version: "17"
+```
+
+Each matrix leg runs with the appropriate Java version and `SPARK_PROFILE` env var.
+Artifacts are uploaded with profile-specific names (e.g., `Artefacts-spark34`).
+
+## Release Process
+
+The publish workflow (`.github/workflows/publish.yml`) uses a **two-phase approach**:
+
+### Phase 1: Release Tag (runs once)
+1. Runs `sbt 'release with-defaults'` with the default Spark profile (`spark35`)
+2. This creates the git tag, commits version bumps, and publishes the Spark 3.5 artifacts
+3. The release tag is captured as an output for Phase 2
+
+### Phase 2: Publish Other Profiles (matrix, runs in parallel)
+1. Checks out the release tag from Phase 1
+2. Regenerates `version.sbt` with the correct Spark-tagged version for the profile
+   (needed because sbt-release hardcodes the version string at the tag commit; see the sketch below)
+3. For each additional profile (`spark34`, `spark40`, ...), runs `sbt publish`
+4. Each profile publishes artifacts with its own Spark-tagged version string
+
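+For orientation, a sketch of what this looks like in practice (the base version here is
+illustrative, and the dynamic form is an assumed shape; the `version.sbt` committed on the
+branch is authoritative):
+
+```scala
+// Dynamic version.sbt as assumed during development: the Spark tag is derived from the profile
+ThisBuild / version := s"1.1.1-spark${SparkProfiles.active.sparkVersion}-SNAPSHOT"
+
+// What Phase 2 rewrites at the release tag: hardcoded base version, profile-derived Spark tag
+// ThisBuild / version := s"1.2.0-spark${SparkProfiles.active.sparkVersion}"
+```
+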
+### Artifact Naming
+
+Published artifacts follow this convention:
+```
+com.amadeus.dataio:dataio-core_2.13:1.2.0-spark4.0.2
+com.amadeus.dataio:dataio-core_2.12:1.2.0-spark3.5.3
+com.amadeus.dataio:dataio-core_2.12:1.2.0-spark3.4.4
+```
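+
+A consuming sbt project picks the coordinates matching its Spark deployment. A minimal,
+illustrative snippet (the GitHub Packages resolver and credentials still need to be
+configured separately):
+
+```scala
+// Consumer build.sbt (illustrative): %% resolves to dataio-core_2.12 or dataio-core_2.13
+// depending on the consumer's own scalaVersion.
+libraryDependencies += "com.amadeus.dataio" %% "dataio-core" % "1.2.0-spark3.5.3"
+```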
+
+## Adding a New Spark Version
+
+To add support for a new Spark version:
+
+1. **Edit `project/SparkProfiles.scala`** — add a new entry to the `profiles` map:
+ ```scala
+ "spark41" -> SparkProfile(
+ sparkVersion = "4.1.1",
+ scalaVersion = "2.13.18",
+ javaTarget = "17",
+ sparkSnowflakeVersion = Some("x.y.z"),
+ elasticsearchSparkVersion = Some("8.x.y"),
+ embeddedKafkaVersion = "3.6.0"
+ )
+ ```
+
+2. **Update CI matrix** — add the profile to `.github/workflows/ci.yml`:
+ ```yaml
+ - spark_profile: spark41
+ java_version: "17"
+     label: "Spark 4.1 / Java 17"
+ ```
+
+3. **Update Publish matrix** — add to `.github/workflows/publish.yml`
+
+4. **Update README.md** — add badge and compatibility table entry
+
+That's it. No changes to `build.sbt` or any source code are needed.
+
+## Removing a Spark Version
+
+1. Remove the entry from `project/SparkProfiles.scala`
+2. Remove from CI and Publish workflow matrices
+3. Update README
+
+## Connector Availability
+
+Some connectors may not support all Spark versions (e.g., Spark 4.0). The `SparkProfile`
+uses `Option[String]` for connector versions:
+- `Some("x.y.z")` → connector is included in the build and published
+- `None` → connector module is excluded from aggregation and `publish/skip := true`
+
+This is handled automatically — no manual module toggling needed.
+
+
diff --git a/README.md b/README.md
index 2884a1a..33ddeef 100644
--- a/README.md
+++ b/README.md
@@ -1,7 +1,8 @@
# Data I/O
[](https://opensource.org/licenses/Apache-2.0)
-[](https://spark.apache.org/releases/spark-release-3-4-1.html)
+[](https://spark.apache.org/releases/spark-release-3-4-4.html)
+[](https://spark.apache.org/releases/spark-release-3-5-3.html)
[](https://www.scala-lang.org/)
[][contributing]
@@ -14,6 +15,36 @@ Data I/O is an open source project that provides a flexible and scalable framewo
- Support for batch and streaming data processing
- Extensible architecture for custom data processors and pipelines
- Scalable and fault-tolerant processing using Apache Spark
+- **Multi-Spark version support** — builds and publishes for Spark 3.4, 3.5, and 4.0 from a single codebase
+
+## Supported Spark Versions
+
+| Profile | Spark | Scala | Java | Snowflake | Elasticsearch |
+|------------|-------|-------|------|-----------|---------------|
+| `spark34` | 3.4.4 | 2.12 | 11 | ✅ | ✅ |
+| `spark35` | 3.5.3 | 2.12 | 11 | ✅ | ✅ |
+| `spark40` | 4.0.2 | 2.13 | 17 | ❌ | ❌ |
+
+> **Note:** Spark 4.0 support is experimental. Snowflake and Elasticsearch connectors
+> do not yet have Spark 4.0–compatible releases.
+
+## Building Locally
+
+Select a Spark profile via the `SPARK_PROFILE` environment variable (defaults to `spark35`):
+
+```bash
+# Build for Spark 3.4
+SPARK_PROFILE=spark34 sbt compile
+
+# Run tests for Spark 3.4
+SPARK_PROFILE=spark34 sbt test
+
+# Package for Spark 3.5 (default)
+sbt package
+```
+
+All version-coupled dependencies (Spark, Scala, Java target, connectors) are defined in
+[`project/SparkProfiles.scala`](project/SparkProfiles.scala) — the single source of truth.
## Getting Started
To get started with Data I/O, please refer to the [documentation][gettingstarted] for installation instructions, usage examples, and API references.
diff --git a/build.sbt b/build.sbt
index a4a91cd..d8b83d5 100644
--- a/build.sbt
+++ b/build.sbt
@@ -1,16 +1,31 @@
-// BUILD SETUP
+// ════════════════════════════════════════════════════════════════════════════
+// BUILD SETUP — all version-coupled values come from SparkProfiles
+// See: project/SparkProfiles.scala (the single source of truth)
+// ════════════════════════════════════════════════════════════════════════════
+
+val spark = SparkProfiles.active
+
ThisBuild / organization := "com.amadeus.dataio"
ThisBuild / versionScheme := Some("early-semver")
-ThisBuild / scalaVersion := "2.12.15"
+ThisBuild / scalaVersion := spark.scalaVersion
+ThisBuild / javacOptions ++= Seq("-source", spark.javaTarget, "-target", spark.javaTarget)
+ThisBuild / scalacOptions += "-deprecation"
+
+ThisBuild / Test / javaOptions ++= Seq(
+ "--add-opens=java.base/sun.util.calendar=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang=ALL-UNNAMED",
+ "--add-opens=java.base/java.nio=ALL-UNNAMED",
+ "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED",
+ "--add-opens=java.base/java.util=ALL-UNNAMED",
+ "--add-opens=java.base/java.lang.invoke=ALL-UNNAMED"
+)
+// Required for javaOptions to take effect with forked JVM
+ThisBuild / Test / fork := true
-val scalatestVersion = "3.2.15"
-val scalamockVersion = "5.2.0"
-val sparkVersion = "3.5.0"
-val typesafeConfigVersion = "1.4.3"
-val slf4jApiVersion = "2.0.7"
-val commonsIoVersion = "2.13.0"
+// ════════════════════════════════════════════════════════════════════════════
+// RELEASE SETUP
+// ════════════════════════════════════════════════════════════════════════════
-// RELEASE SETUP
import sbt.Keys.libraryDependencies
import sbtrelease.ReleaseStateTransformations.*
@@ -18,11 +33,11 @@ def getReleaseVersion(ver: String, bumpType: String): String = {
val pattern = """(\d+)\.(\d+)\.(\d+)-(spark[\d.]+)-SNAPSHOT""".r
ver match {
- case pattern(major, minor, patch, sparkVersion) =>
+ case pattern(major, minor, patch, sparkTag) =>
bumpType match {
- case "MAJOR" => s"${major.toInt + 1}.0.0-$sparkVersion"
- case "MINOR" => s"$major.${minor.toInt + 1}.0-$sparkVersion"
- case "PATCH" => s"$major.$minor.$patch-$sparkVersion"
+ case "MAJOR" => s"${major.toInt + 1}.0.0-$sparkTag"
+ case "MINOR" => s"$major.${minor.toInt + 1}.0-$sparkTag"
+ case "PATCH" => s"$major.$minor.$patch-$sparkTag"
case _ => sys.error(s"Invalid RELEASE_TYPE: $bumpType")
}
case _ => sys.error(s"Invalid version format: $ver")
@@ -33,8 +48,8 @@ def getReleaseNextVersion(ver: String): String = {
val pattern = """(\d+)\.(\d+)\.(\d+)-(spark[\d.]+)""".r
ver match {
- case pattern(major, minor, patch, sparkVersion) =>
- s"$major.$minor.${patch.toInt + 1}-$sparkVersion-SNAPSHOT"
+ case pattern(major, minor, patch, sparkTag) =>
+ s"$major.$minor.${patch.toInt + 1}-$sparkTag-SNAPSHOT"
case _ => sys.error(s"Invalid version format: $ver")
}
}
@@ -55,7 +70,10 @@ ThisBuild / releaseProcess := Seq[ReleaseStep](
pushChanges // Push everything to Git
)
-// Global GitHub Packages settings
+// ════════════════════════════════════════════════════════════════════════════
+// PUBLISHING SETUP — GitHub Packages
+// ════════════════════════════════════════════════════════════════════════════
+
ThisBuild / credentials += Credentials(
"GitHub Package Registry",
"maven.pkg.github.com",
@@ -70,7 +88,6 @@ ThisBuild / publishTo := Some(
// ThisBuild / publishTo := Some(Resolver.file("local-maven", file(Path.userHome.absolutePath + "/.m2/repository")))
ThisBuild / publishMavenStyle := true
-// Additional Maven metadata
ThisBuild / pomIncludeRepository := { _ => false }
ThisBuild / pomExtra :=
https://github.com/AmadeusITGroup/dataio-framework
@@ -81,21 +98,31 @@ ThisBuild / pomExtra :=
-// TESTS SETUP
+// ════════════════════════════════════════════════════════════════════════════
+// TESTS SETUP
+// ════════════════════════════════════════════════════════════════════════════
+
ThisBuild / Test / parallelExecution := false
ThisBuild / Test / publishArtifact := false
-// PROJECTS SETUP
+// ════════════════════════════════════════════════════════════════════════════
+// SHARED DEPENDENCIES (driven by SparkProfiles)
+// ════════════════════════════════════════════════════════════════════════════
+
lazy val commonSettings = Seq(
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-sql" % sparkVersion,
- "org.apache.spark" %% "spark-core" % sparkVersion,
- "com.typesafe" % "config" % typesafeConfigVersion,
- "org.scalatest" %% "scalatest" % scalatestVersion % Test,
- "org.scalamock" %% "scalamock" % scalamockVersion % Test
+ "org.apache.spark" %% "spark-sql" % spark.sparkVersion,
+ "org.apache.spark" %% "spark-core" % spark.sparkVersion,
+ "com.typesafe" % "config" % spark.typesafeConfigVersion,
+ "org.scalatest" %% "scalatest" % spark.scalatestVersion % Test,
+ "org.scalamock" %% "scalamock" % spark.scalamockVersion % Test
)
)
+// ════════════════════════════════════════════════════════════════════════════
+// PROJECTS
+// ════════════════════════════════════════════════════════════════════════════
+
/** Shared traits and functions for testing inside Data I/O sub projects.
* It should not be published, and only be used in the Data I/O project itself.
* @see [[test]] For testing applications made with Data I/O.
@@ -103,11 +130,11 @@ lazy val commonSettings = Seq(
lazy val testutils = (project in file("testutils"))
.settings(
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-sql" % sparkVersion,
- "org.apache.spark" %% "spark-core" % sparkVersion,
- "com.typesafe" % "config" % typesafeConfigVersion,
- "org.scalatest" %% "scalatest" % scalatestVersion,
- "org.scalamock" %% "scalamock" % scalamockVersion
+ "org.apache.spark" %% "spark-sql" % spark.sparkVersion,
+ "org.apache.spark" %% "spark-core" % spark.sparkVersion,
+ "com.typesafe" % "config" % spark.typesafeConfigVersion,
+ "org.scalatest" %% "scalatest" % spark.scalatestVersion,
+ "org.scalamock" %% "scalamock" % spark.scalamockVersion
),
publish / skip := true
)
@@ -117,9 +144,10 @@ lazy val core = (project in file("core"))
commonSettings,
name := "dataio-core",
libraryDependencies ++= Seq(
- "org.slf4j" % "slf4j-api" % slf4jApiVersion,
- "commons-io" % "commons-io" % commonsIoVersion
- )
+ "org.slf4j" % "slf4j-api" % spark.slf4jApiVersion,
+ "commons-io" % "commons-io" % spark.commonsIoVersion
+ ),
+ Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM
)
.dependsOn(testutils % Test)
@@ -128,20 +156,26 @@ lazy val kafka = (project in file("kafka"))
commonSettings,
name := "dataio-kafka",
libraryDependencies ++= Seq(
- "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
- "io.github.embeddedkafka" %% "embedded-kafka" % "3.5.1" % Test,
- "io.github.embeddedkafka" %% "embedded-kafka-streams" % "3.5.1" % Test
- )
+ "org.apache.spark" %% "spark-sql-kafka-0-10" % spark.sparkVersion,
+ "io.github.embeddedkafka" %% "embedded-kafka" % spark.embeddedKafkaVersion % Test,
+ "io.github.embeddedkafka" %% "embedded-kafka-streams" % spark.embeddedKafkaVersion % Test
+ ),
+ Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM
)
.dependsOn(core, testutils % Test)
+// ── Conditionally-included modules (some connectors don't support all Spark versions) ──
+
lazy val snowflake = (project in file("snowflake"))
.settings(
commonSettings,
name := "dataio-snowflake",
- libraryDependencies ++= Seq(
- "net.snowflake" %% "spark-snowflake" % f"3.1.1"
- )
+ libraryDependencies ++= spark.sparkSnowflakeVersion.toSeq.map { v =>
+ "net.snowflake" %% "spark-snowflake" % v
+ },
+ // If the connector is unavailable for this profile, skip publishing an empty JAR
+ publish / skip := !spark.supportsSnowflake,
+ Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM
)
.dependsOn(core, testutils % Test)
@@ -149,16 +183,19 @@ lazy val elasticsearch = (project in file("elasticsearch"))
.settings(
commonSettings,
name := "dataio-elasticsearch",
- libraryDependencies ++= Seq(
- "org.elasticsearch" %% "elasticsearch-spark-30" % "8.17.4"
- exclude ("org.scala-lang", "scala-library")
- exclude ("org.scala-lang", "scala-reflect")
- exclude ("org.slf4j", "slf4j-api")
- exclude ("org.apache.spark", "spark-core_" + scalaVersion.value.substring(0, 4))
- exclude ("org.apache.spark", "spark-sql_" + scalaVersion.value.substring(0, 4))
- exclude ("org.apache.spark", "spark-catalyst_" + scalaVersion.value.substring(0, 4))
- exclude ("org.apache.spark", "spark-streaming_" + scalaVersion.value.substring(0, 4))
- )
+ libraryDependencies ++= spark.elasticsearchSparkVersion.toSeq.map { v =>
+ "org.elasticsearch" %% "elasticsearch-spark-30" % v exclude
+ ("org.scala-lang", "scala-library") exclude
+ ("org.scala-lang", "scala-reflect") exclude
+ ("org.slf4j", "slf4j-api") exclude
+ ("org.apache.spark", s"spark-core_${spark.scalaBinaryVersion}") exclude
+ ("org.apache.spark", s"spark-sql_${spark.scalaBinaryVersion}") exclude
+ ("org.apache.spark", s"spark-catalyst_${spark.scalaBinaryVersion}") exclude
+ ("org.apache.spark", s"spark-streaming_${spark.scalaBinaryVersion}")
+ },
+ // If the connector is unavailable for this profile, skip publishing an empty JAR
+ publish / skip := !spark.supportsElasticsearch,
+ Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM
)
.dependsOn(core, testutils % Test)
@@ -167,16 +204,26 @@ lazy val test = (project in file("test"))
commonSettings,
name := "dataio-test",
libraryDependencies ++= Seq(
- "org.scalatest" %% "scalatest" % scalatestVersion,
- "org.scalamock" %% "scalamock" % scalamockVersion
- )
+ "org.scalatest" %% "scalatest" % spark.scalatestVersion,
+ "org.scalamock" %% "scalamock" % spark.scalamockVersion
+ ),
+ Test / baseDirectory := (ThisBuild / baseDirectory).value // <-- fix CWD for forked JVM
)
.dependsOn(core, testutils % Test)
-// Projects configuration
+// ════════════════════════════════════════════════════════════════════════════
+// ROOT AGGREGATE — dynamically includes only modules available for this profile
+// ════════════════════════════════════════════════════════════════════════════
+
+lazy val alwaysModules: Seq[ProjectReference] = Seq(core, test, kafka)
+
+lazy val conditionalModules: Seq[ProjectReference] =
+ (if (spark.supportsSnowflake) Seq[ProjectReference](snowflake) else Nil) ++
+ (if (spark.supportsElasticsearch) Seq[ProjectReference](elasticsearch) else Nil)
+
lazy val root = (project in file("."))
.settings(
name := "dataio",
publish / skip := true
)
- .aggregate(core, test, kafka, snowflake)
+ .aggregate(alwaysModules ++ conditionalModules: _*)
diff --git a/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala b/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala
index 93fa506..d30ba85 100644
--- a/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala
+++ b/core/src/main/scala/com/amadeus/dataio/config/ConfigNodeCollection.scala
@@ -25,14 +25,14 @@ object ConfigNodeCollection extends Logging {
* config argument.
*/
def apply(nodeName: String, config: Config): ConfigNodeCollection = {
- import collection.JavaConverters._
+ import scala.collection.JavaConverters._
if (!config.hasPath(nodeName))
return ConfigNodeCollection(Nil)
val rawConfigs: Seq[Config] =
Try {
- config.getConfigList(nodeName).asScala
+ config.getConfigList(nodeName).asScala.toSeq
} orElse Try {
config.getConfig(nodeName) +: Nil
} getOrElse {
diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala
index c03f071..9bc9d1d 100644
--- a/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala
+++ b/core/src/main/scala/com/amadeus/dataio/config/fields/DropDuplicatesConfigurator.scala
@@ -15,7 +15,7 @@ trait DropDuplicatesConfigurator {
Try {
config.getString("drop_duplicates").split(",").map(_.trim).toList
} orElse Try {
- config.getStringList("drop_duplicates").asScala
+ config.getStringList("drop_duplicates").asScala.toSeq
} getOrElse {
Seq[String]()
}
diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala
index 4a770e5..9ea7991 100644
--- a/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala
+++ b/core/src/main/scala/com/amadeus/dataio/config/fields/PartitionByConfigurator.scala
@@ -15,7 +15,7 @@ trait PartitionByConfigurator extends Logging {
Try {
config.getString("partition_by").split(",").map(_.trim).toList
} orElse Try {
- config.getStringList("partition_by").asScala
+ config.getStringList("partition_by").asScala.toSeq
} getOrElse {
Seq[String]()
}
diff --git a/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala b/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala
index 48ef216..bafe33f 100644
--- a/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala
+++ b/core/src/main/scala/com/amadeus/dataio/config/fields/SortWithinPartitionsConfigurator.scala
@@ -14,7 +14,7 @@ trait SortWithinPartitionsConfigurator {
Try {
config.getString("sort_within_partitions.exprs").split(",").map(_.trim).toSeq
} orElse Try {
- config.getStringList("sort_within_partitions.exprs").asScala
+ config.getStringList("sort_within_partitions.exprs").asScala.toSeq
} getOrElse {
Seq[String]()
}
diff --git a/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala b/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala
index 8d16044..c66b2a5 100644
--- a/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala
+++ b/core/src/main/scala/com/amadeus/dataio/core/InstantiationHelper.scala
@@ -6,21 +6,19 @@ import scala.reflect.runtime.universe.TermName
import scala.reflect.runtime.{universe => ru}
import scala.util.{Failure, Success, Try}
-/**
- * Contains a variety of helper functions to facilitate instantiating objects using reflection.
- */
+/** Contains a variety of helper functions to facilitate instantiating objects using reflection.
+ */
trait InstantiationHelper {
- /**
- * Creates an instance of a given class by using its companion object.
- * @param config The configuration of the object to create.
- * @tparam T The parent class of the class to instantiate.
- * @return A new instance of the subclass of T provided in the config argument.
- * @throws ClassNotFoundException If the class in the config argument can not be found.
- * @throws Exception If the companion object of the class is not defined properly, that is to say missing an apply
- * method taking a ConfigNode argument and returning a T. Finally, if the companion object of the
- * class throws an exception.
- */
+ /** Creates an instance of a given class by using its companion object.
+ * @param config The configuration of the object to create.
+ * @tparam T The parent class of the class to instantiate.
+ * @return A new instance of the subclass of T provided in the config argument.
+ * @throws ClassNotFoundException If the class in the config argument can not be found.
+ * @throws Exception If the companion object of the class is not defined properly, that is to say missing an apply
+ * method taking a ConfigNode argument and returning a T. Finally, if the companion object of the
+ * class throws an exception.
+ */
def instantiateWithCompanionObject[T](config: ConfigNode): T = {
// The Scala reflection API being difficult to debug, exceptions are
// thrown here in order to guide the implementation of new entities
@@ -64,28 +62,29 @@ trait InstantiationHelper {
// If the entity's companion object does not exist or its apply method does not have the proper signature, an IllegalArgumentException
// will be thrown.
case "IllegalArgumentException" =>
- throw new Exception(s"$className is not a proper Type. This usually happens if the companion object does not have an apply method with a valid signature.")
+ throw new Exception(
+ s"$className is not a proper Type. This usually happens if the companion object does not have an apply method with a valid signature."
+ )
// In all other cases, we don't know how to handle the error, so we re-throw the exception as is.
case _ => throw ex
}
}
}
- /**
- * Creates an instance of a given class by using its empty constructor.
- * @param fullyQualifiedClassName The fully qualified name of the class to instantiate.
- * @tparam T The parent class of the class to instantiate.
- * @return A new instance of the subclass of T provided in the className argument.
- */
+ /** Creates an instance of a given class by using its empty constructor.
+ * @param fullyQualifiedClassName The fully qualified name of the class to instantiate.
+ * @tparam T The parent class of the class to instantiate.
+ * @return A new instance of the subclass of T provided in the className argument.
+ */
def instantiateWithEmptyConstructor[T](fullyQualifiedClassName: String): T = {
Class
.forName(fullyQualifiedClassName)
- .newInstance
+ .getDeclaredConstructor()
+ .newInstance()
.asInstanceOf[T]
}
}
-/**
- * Contains a variety of helper functions to facilitate instantiating objects using reflection.
- */
+/** Contains a variety of helper functions to facilitate instantiating objects using reflection.
+ */
object InstantiationHelper extends InstantiationHelper
diff --git a/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala b/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala
index 154cdc4..33db7a7 100644
--- a/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala
+++ b/core/src/main/scala/com/amadeus/dataio/core/time/Time.scala
@@ -6,18 +6,15 @@ import com.amadeus.dataio.core.time.Modifiers.ModifierString
import java.time.{LocalDate, LocalDateTime, ZoneId, ZonedDateTime}
import scala.util.Try
-
-/**
- * This object exposes helper methods for working on Time objects.
- */
+/** This object exposes helper methods for working on Time objects.
+ */
object Time {
- /**
- * Function that returns the first time stamp on chronological order
- * @param instant1 a time stamp
- * @param instant2 another time stamp
- * @return first time stamp on chronological order
- */
+ /** Function that returns the first time stamp on chronological order
+ * @param instant1 a time stamp
+ * @param instant2 another time stamp
+ * @return first time stamp on chronological order
+ */
def minTime(instant1: LocalDateTime, instant2: LocalDateTime): LocalDateTime = {
if (instant1.isBefore(instant2)) {
instant1
@@ -26,12 +23,11 @@ object Time {
}
}
- /**
- * Function that returns the second time stamp on chronological order
- * @param instant1 a time stamp
- * @param instant2 another time stamp
- * @return second time stamp on chronological order
- */
+ /** Function that returns the second time stamp on chronological order
+ * @param instant1 a time stamp
+ * @param instant2 another time stamp
+ * @return second time stamp on chronological order
+ */
def maxTime(instant1: LocalDateTime, instant2: LocalDateTime): LocalDateTime = {
if (instant1.isAfter(instant2)) {
instant1
@@ -40,23 +36,21 @@ object Time {
}
}
- /**
- * Function that returns a map of key-value pairs based on NOW and a prefix
- *
- * @param prefix a prefix string that is added to each key
- * @return map of key-value pairs based on a time stamp
- */
+ /** Function that returns a map of key-value pairs based on NOW and a prefix
+ *
+ * @param prefix a prefix string that is added to each key
+ * @return map of key-value pairs based on a time stamp
+ */
def getTimeMap(prefix: Option[String]): Map[String, String] = {
getTimeMap(LocalDateTime.now(), prefix)
}
- /**
- * Function that returns a map of key-value pairs based on a time stamp and a prefix
- *
- * @param timestamp a time stamp (by default is NOW)
- * @param prefix an optional prefix string that is added to each key
- * @return map of key-value pairs based on a time stamp
- */
+ /** Function that returns a map of key-value pairs based on a time stamp and a prefix
+ *
+ * @param timestamp a time stamp (by default is NOW)
+ * @param prefix an optional prefix string that is added to each key
+ * @return map of key-value pairs based on a time stamp
+ */
def getTimeMap(timestamp: LocalDateTime = LocalDateTime.now(), prefix: Option[String] = None): Map[String, String] = {
def addPrefix(label: String): String = prefix match {
case Some(p) => p + "." + label
@@ -65,76 +59,67 @@ object Time {
Formats.AllFormats.map(f => (addPrefix(f.label), timestamp.formatted(f))).toMap
}
- /**
- * Class used to extend LocalDateTime methods
- */
+ /** Class used to extend LocalDateTime methods
+ */
implicit class DateImplicits(time: LocalDateTime) {
- /**
- * Use to get a time string based upon the chosen formatter
- * @param formatter a labeled formatter
- * @return time string based upon the formatter
- */
+ /** Use to get a time string based upon the chosen formatter
+ * @param formatter a labeled formatter
+ * @return time string based upon the formatter
+ */
def formatted(formatter: LabeledFormatter): String = {
ZonedDateTime.of(time, ZoneId.of("Z")).format(formatter.formatter)
}
- /**
- * Truncate a given time stamp to second (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:27.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to second (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:27.000Z)
+ * @return a time stamp
+ */
def truncSecond: LocalDateTime = {
time.withNano(0)
}
- /**
- * Truncate a given time stamp to minute (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:00.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to minute (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:35:00.000Z)
+ * @return a time stamp
+ */
def truncMinute: LocalDateTime = {
time.truncSecond.withSecond(0)
}
- /**
- * Truncate a given time stamp to hour (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:00:00.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to hour (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T13:00:00.000Z)
+ * @return a time stamp
+ */
def truncHour: LocalDateTime = {
time.truncMinute.withMinute(0)
}
- /**
- * Truncate a given time stamp to day (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T00:00:00.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to day (example: 2015-09-17T13:35:27.957Z -> 2015-09-17T00:00:00.000Z)
+ * @return a time stamp
+ */
def truncDay: LocalDateTime = {
time.truncHour.withHour(0)
}
- /**
- * Truncate a given time stamp to month (example: 2015-09-17T13:35:27.957Z -> 2015-09-01T00:00:00.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to month (example: 2015-09-17T13:35:27.957Z -> 2015-09-01T00:00:00.000Z)
+ * @return a time stamp
+ */
def truncMonth: LocalDateTime = {
time.truncDay.withDayOfMonth(1)
}
- /**
- * Truncate a given time stamp to year (example: 2015-09-17T13:35:27.957Z -> 2015-01-01T00:00:00.000Z)
- * @return a time stamp
- */
+ /** Truncate a given time stamp to year (example: 2015-09-17T13:35:27.957Z -> 2015-01-01T00:00:00.000Z)
+ * @return a time stamp
+ */
def truncYear: LocalDateTime = {
time.truncMonth.withMonth(1)
}
- /**
- * Truncate a given time stamp using a modifier
- *
- * @return a time stamp
- * @throws IllegalArgumentException when the modifier is incorrect
- */
+ /** Truncate a given time stamp using a modifier
+ *
+ * @return a time stamp
+ * @throws IllegalArgumentException when the modifier is incorrect
+ */
def truncate(modifier: String): LocalDateTime = {
- modifier.getModifier match {
+ modifier.getModifier() match {
case Modifiers.TruncSecond => time.truncSecond
case Modifiers.TruncMinute => time.truncMinute
case Modifiers.TruncHour => time.truncHour
@@ -145,19 +130,18 @@ object Time {
}
}
- /**
- * Apply a delta to a time stamp using a modifier.
- * @param modifier a string with syntax: (+|-)number(time pattern symbol) -> examples: +5D -3W -1Y +6M -25m +1H -30s
- * @return a time stamp
- * @throws IllegalArgumentException when the modifier is incorrect
- */
+ /** Apply a delta to a time stamp using a modifier.
+ * @param modifier a string with syntax: (+|-)number(time pattern symbol) -> examples: +5D -3W -1Y +6M -25m +1H -30s
+ * @return a time stamp
+ * @throws IllegalArgumentException when the modifier is incorrect
+ */
def applyDelta(modifier: String): LocalDateTime = {
val sign = modifier.substring(0, 1)
val letter = modifier.substring(modifier.length - 1)
val delta = Try(Integer.parseInt(modifier.substring(1, modifier.length - 1)))
if (delta.isSuccess) {
if (sign.equals("-")) {
- letter.getModifier match {
+ letter.getModifier() match {
case Modifiers.LetterSecond => time.minusSeconds(delta.get)
case Modifiers.LetterMinute => time.minusMinutes(delta.get)
case Modifiers.LetterHour => time.minusHours(delta.get)
@@ -168,7 +152,7 @@ object Time {
case _ => throw new IllegalArgumentException(modifier)
}
} else if (sign.equals("+")) {
- letter.getModifier match {
+ letter.getModifier() match {
case Modifiers.LetterSecond => time.plusSeconds(delta.get)
case Modifiers.LetterMinute => time.plusMinutes(delta.get)
case Modifiers.LetterHour => time.plusHours(delta.get)
@@ -186,17 +170,16 @@ object Time {
}
}
- /**
- * Set a time property (second, minute, etc...) to a required value.
- * @param letter a time pattern symbol -> examples: D, W, Y, M, m, H, s
- * @param right a numeric value that you want to set
- * @return a time stamp
- * @throws IllegalArgumentException when the letter or numeric value are incorrect
- */
+ /** Set a time property (second, minute, etc...) to a required value.
+ * @param letter a time pattern symbol -> examples: D, W, Y, M, m, H, s
+ * @param right a numeric value that you want to set
+ * @return a time stamp
+ * @throws IllegalArgumentException when the letter or numeric value are incorrect
+ */
def applySetter(letter: String, right: String): LocalDateTime = {
val value = Try(Integer.parseInt(right))
if (value.isSuccess) {
- letter.getModifier match {
+ letter.getModifier() match {
case Modifiers.LetterSecond => time.withSecond(value.get)
case Modifiers.LetterMinute => time.withMinute(value.get)
case Modifiers.LetterHour => time.withHour(value.get)
@@ -210,36 +193,38 @@ object Time {
}
}
- /**
- * Determines if this time is between the two given time bounds. (ClosedClosed range type)
- * @param from the left bound
- * @param until the right bound
- * @return a Boolean
- */
+ /** Determines if this time is between the two given time bounds. (ClosedClosed range type)
+ * @param from the left bound
+ * @param until the right bound
+ * @return a Boolean
+ */
def isBetween(from: LocalDateTime, until: LocalDateTime): Boolean = {
time.isBetween(from, until, ClosedClosed)
}
- /**
- * Determines if this time is between the two given time bounds.
- * @param from the left bound
- * @param until the right bound
- * @param rangeType determines if the left bound and right bound are included in the range
- * @return a Boolean
- */
+ /** Determines if this time is between the two given time bounds.
+ * @param from the left bound
+ * @param until the right bound
+ * @param rangeType determines if the left bound and right bound are included in the range
+ * @return a Boolean
+ */
def isBetween(from: LocalDateTime, until: LocalDateTime, rangeType: RangeType): Boolean = {
require(!from.isAfter(until))
time.isBetween(new DateRange(from, until), rangeType)
}
- /**
- * Determines if this time is between the two given time bounds.
- * @param dateRange the date range containing the left and right time bounds
- * @param rangeType determines if the left bound and right bound are included in the range
- * @return a Boolean
- */
+ /** Determines if this time is between the two given time bounds.
+ * @param dateRange the date range containing the left and right time bounds
+ * @param rangeType determines if the left bound and right bound are included in the range
+ * @return a Boolean
+ */
def isBetween(dateRange: DateRange, rangeType: RangeType): Boolean = {
- (time.isBefore(dateRange.from), time.equals(dateRange.from), time.equals(dateRange.until), time.isAfter(dateRange.until)) match {
+ (
+ time.isBefore(dateRange.from),
+ time.equals(dateRange.from),
+ time.equals(dateRange.until),
+ time.isAfter(dateRange.until)
+ ) match {
case (true, _, _, _) => false // time, from before the left bound
case (_, _, _, true) => false // after the right bound
case (_, true, true, _) => rangeType == ClosedClosed // left and right bound are the same time
@@ -250,58 +235,61 @@ object Time {
}
}
- /**
- * Function that take a time expression and produce a time stamp instance.
- * @param timeExpression a time expression formed by a reference followed by modifiers
- * @return a time stamp
- * @throws RuntimeException when the reference cannot be parsed as a time stamp
- */
+ /** Function that take a time expression and produce a time stamp instance.
+ * @param timeExpression a time expression formed by a reference followed by modifiers
+ * @return a time stamp
+ * @throws RuntimeException when the reference cannot be parsed as a time stamp
+ */
def produceTime(timeExpression: String): LocalDateTime = {
require(timeExpression != null)
val parameters = timeExpression.trim().split(" ")
val timeStr = parameters(0)
- val modifiers = parameters.drop(1).filterNot { x => x.isEmpty() }
+ val modifiers = parameters.drop(1).filterNot { x => x.isEmpty }
- val parsedDates = Formats.AllFormats.map { format => Try(format.formatter.parse(timeStr)) }.filter { parsedDate => parsedDate.isSuccess }
+ val parsedDates =
+ Formats.AllFormats.map { format => Try(format.formatter.parse(timeStr)) }.filter { parsedDate => parsedDate.isSuccess }
if (parsedDates.isEmpty) {
calculateTime(LocalDateTime.now(), modifiers)
} else {
- var referenceTimes = parsedDates.map { parsedDate => Try(LocalDateTime.from(parsedDate.get)) }.filter { parsedDate => parsedDate.isSuccess }
+ var referenceTimes =
+ parsedDates.map { parsedDate => Try(LocalDateTime.from(parsedDate.get)) }.filter { parsedDate => parsedDate.isSuccess }
if (referenceTimes.isEmpty) {
- referenceTimes = parsedDates.map { parsedDate => Try(LocalDate.from(parsedDate.get).atStartOfDay()) }.filter { parsedDate => parsedDate.isSuccess }
+ referenceTimes = parsedDates.map { parsedDate => Try(LocalDate.from(parsedDate.get).atStartOfDay()) }.filter {
+ parsedDate => parsedDate.isSuccess
+ }
}
if (referenceTimes.isEmpty) {
- throw new RuntimeException("Impossible to create a time stamp instance using the parsed objects. Investigation required: " + parsedDates.toString)
+ throw new RuntimeException(
+ "Impossible to create a time stamp instance using the parsed objects. Investigation required: " + parsedDates.toString
+ )
} else {
calculateTime(referenceTimes.last.get, modifiers)
}
}
}
- /**
- * Function that calculate a time stamp using a list of modifiers.
- * Pay attention to modifiers order since the operation are not commutative.
- * @param reference a time stamp
- * @param modifiers a string listing time modifiers
- * @return a time stamp
- * @throws IllegalArgumentException when the modifiers are incorrect
- */
+ /** Function that calculate a time stamp using a list of modifiers.
+ * Pay attention to modifiers order since the operation are not commutative.
+ * @param reference a time stamp
+ * @param modifiers a string listing time modifiers
+ * @return a time stamp
+ * @throws IllegalArgumentException when the modifiers are incorrect
+ */
def calculateTime(reference: LocalDateTime, modifiers: String): LocalDateTime = {
calculateTime(reference, modifiers.trim().split(" "))
}
- /**
- * Function that calculate a time stamp using a list of modifiers.
- * Pay attention to modifiers order since the operation are not commutative.
- * @param reference a time stamp
- * @param modifiers a list of modifiers
- * @return a time stamp
- * @throws IllegalArgumentException when the modifiers are incorrect
- */
+ /** Function that calculate a time stamp using a list of modifiers.
+ * Pay attention to modifiers order since the operation are not commutative.
+ * @param reference a time stamp
+ * @param modifiers a list of modifiers
+ * @return a time stamp
+ * @throws IllegalArgumentException when the modifiers are incorrect
+ */
def calculateTime(reference: LocalDateTime, modifiers: Array[String] = Array.empty): LocalDateTime = {
modifiers.foldLeft(reference) { (date, modifier) =>
- if (!modifier.isEmpty) {
- modifier.isTruncateModifier match {
+ if (modifier.nonEmpty) {
+ modifier.isTruncateModifier() match {
case true => date.truncate(modifier)
case false => {
val parts = modifier.split("=")
diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala
index 53a3480..33d69a7 100644
--- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala
+++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/ElkConfigurator.scala
@@ -4,36 +4,32 @@ import com.typesafe.config.Config
import scala.util.Try
-/**
- * Elasticsearch parameterization
- */
+/** Elasticsearch parameterization
+ */
object ElkConfigurator {
- /**
- * @param config The typesafe Config object holding the configuration.
- * @return A String of the index to use.
- * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation
- * for the expected fields.
- */
+ /** @param config The typesafe Config object holding the configuration.
+ * @return A String of the index to use.
+ * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation
+ * for the expected fields.
+ */
def getIndex(implicit config: Config): String = {
- config.getString("Index")
+ config.getString("index")
}
- /**
- * @param config The typesafe Config object holding the configuration.
- * @return A String of the date field to be used for sub index partitioning.
- * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation
- * for the expected fields.
- */
+ /** @param config The typesafe Config object holding the configuration.
+ * @return A String of the date field to be used for sub index partitioning.
+ * @throws com.typesafe.config.ConfigException If data is missing in the config argument. See the user documentation
+ * for the expected fields.
+ */
def getDateField(implicit config: Config): String = {
- config.getString("DateField")
+ config.getString("dateField")
}
- /**
- * @param config The typesafe Config object holding the configuration.
- * @return A Option[String] of the date pattern to use for sub index partitioning.
- */
+ /** @param config The typesafe Config object holding the configuration.
+ * @return A Option[String] of the date pattern to use for sub index partitioning.
+ */
def getSubIndexDatePattern(implicit config: Config): Option[String] = {
- Try(config.getString("SubIndexDatePattern")).toOption
+ Try(config.getString("subIndexDatePattern")).toOption
}
}
diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala
index 6884236..9018f6c 100644
--- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala
+++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutput.scala
@@ -8,17 +8,17 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try
-/**
- * Allows to write batch data to Elasticsearch with automatic date sub-indexing.
- *
- * @param index the Index to write to.
- * @param mode mode.
- * @param dateField The date field to use for sub index partitioning.
- * @param suffixDatePattern the date suffix pattern to use for the full index.
- * @param options options.
- * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
- */
+/** Allows to write batch data to Elasticsearch with automatic date sub-indexing.
+ *
+ * @param index the Index to write to.
+ * @param mode mode.
+ * @param dateField The date field to use for sub index partitioning.
+ * @param suffixDatePattern the date suffix pattern to use for the full index.
+ * @param options options.
+ * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
+ */
case class ElkOutput(
+ name: String,
index: String,
mode: String,
dateField: String,
@@ -29,12 +29,11 @@ case class ElkOutput(
with Logging
with ElkOutputCommons {
- /**
- * Writes data to this output.
- *
- * @param data The data to write.
- * @param spark The SparkSession which will be used to write the data.
- */
+ /** Writes data to this output.
+ *
+ * @param data The data to write.
+ * @param spark The SparkSession which will be used to write the data.
+ */
override def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
val fullIndexName = computeFullIndexName()
logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName]")
@@ -48,17 +47,21 @@ object ElkOutput {
import com.amadeus.dataio.config.fields._
import com.amadeus.dataio.pipes.elk.ElkConfigurator._
- /**
- * Creates an ElkOutput based on a given configuration.
- *
- * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
- * @return a new instance of ElkOutput.
- */
+ /** Creates an ElkOutput based on a given configuration.
+ *
+ * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
+ * @return a new instance of ElkOutput.
+ */
def apply(implicit config: Config): ElkOutput = {
+ val name = Try {
+ config.getString("name")
+ } getOrElse {
+ throw new Exception("Missing required `name` field in configuration.")
+ }
val index = getIndex
- val mode = config.getString("Mode")
+ val mode = config.getString("mode")
val options = Try(getOptions).getOrElse(Map())
@@ -69,7 +72,15 @@ object ElkOutput {
val suffixDatePattern = getSubIndexDatePattern.getOrElse(DefaultSuffixDatePattern)
- ElkOutput(index = index, mode = mode, dateField = dateField, suffixDatePattern = suffixDatePattern, options = options, config = config)
+ ElkOutput(
+ name = name,
+ index = index,
+ mode = mode,
+ dateField = dateField,
+ suffixDatePattern = suffixDatePattern,
+ options = options,
+ config = config
+ )
}
}
diff --git a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala
index f2b72bb..ec70759 100644
--- a/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala
+++ b/elasticsearch/src/main/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutput.scala
@@ -8,20 +8,20 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try
-/**
- * Allows to write stream data to Elasticsearch with automatic date sub-indexing.
- *
- * @param index the Index to write to.
- * @param trigger the trigger to be used for the streaming query.
- * @param timeout timeout in milliseconds.
- * @param mode mode.
- * @param options options.
- * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
- * @param dateField The date field to use for sub index partitioning.
- * @param suffixDatePattern the date suffix pattern to use for the full index.
- * @param outputName the output name used to define the streaming query name.
- */
+/** Allows to write stream data to Elasticsearch with automatic date sub-indexing.
+ *
+ * @param index the Index to write to.
+ * @param trigger the trigger to be used for the streaming query.
+ * @param timeout timeout in milliseconds.
+ * @param mode mode.
+ * @param options options.
+ * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
+ * @param dateField The date field to use for sub index partitioning.
+ * @param suffixDatePattern the date suffix pattern to use for the full index.
+ * @param outputName the output name used to define the streaming query name.
+ */
case class ElkOutput(
+ name: String,
index: String,
trigger: Option[Trigger],
timeout: Long,
@@ -29,18 +29,16 @@ case class ElkOutput(
options: Map[String, String] = Map.empty,
config: Config = ConfigFactory.empty(),
dateField: String,
- suffixDatePattern: String,
- outputName: Option[String]
+ suffixDatePattern: String
) extends Output
with Logging
with ElkOutputCommons {
- /**
- * Writes data to this output.
- *
- * @param data The data to write.
- * @param spark The SparkSession which will be used to write the data.
- */
+ /** Writes data to this output.
+ *
+ * @param data The data to write.
+ * @param spark The SparkSession which will be used to write the data.
+ */
def write[T](data: Dataset[T])(implicit spark: SparkSession): Unit = {
val fullIndexName = computeFullIndexName()
logger.info(s"Write dataframe to Elasticsearch index [$fullIndexName] using trigger [$trigger]")
@@ -64,19 +62,11 @@ case class ElkOutput(
streamingQuery.stop()
}
- /**
- * Create a unique query name based on output path if exists.
- *
- * @return a unique query name.
- */
- private[streaming] def createQueryName(): String = {
-
- outputName match {
- case Some(name) => s"QN_${name}_${index}_${java.util.UUID.randomUUID}"
- case _ => s"QN_${index}_${java.util.UUID.randomUUID}"
- }
-
- }
+ /** Create a unique query name based on output path if exists.
+ *
+ * @return a unique query name.
+ */
+ private[streaming] def createQueryName(): String = s"QN_${index}_${java.util.UUID.randomUUID}"
}
object ElkOutput {
@@ -84,17 +74,21 @@ object ElkOutput {
import com.amadeus.dataio.pipes.elk.ElkConfigurator._
import com.amadeus.dataio.pipes.elk.ElkOutputCommons.{DefaultSuffixDatePattern, checkNodesIsDefined, checkPortIsDefined}
- /**
- * Creates an ElkOutput based on a given configuration.
- *
- * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
- * @return a new instance of ElkOutput.
- */
+ /** Creates an ElkOutput based on a given configuration.
+ *
+ * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
+ * @return a new instance of ElkOutput.
+ */
def apply(implicit config: Config): ElkOutput = {
+ val name = Try {
+ config.getString("name")
+ } getOrElse {
+ throw new Exception("Missing required `name` field in configuration.")
+ }
val index = getIndex
- val mode = config.getString("Mode")
+ val mode = config.getString("mode")
val trigger = getStreamingTrigger
@@ -109,18 +103,16 @@ object ElkOutput {
val suffixDatePattern = getSubIndexDatePattern.getOrElse(DefaultSuffixDatePattern)
- val name = Try(config.getString("Name")).toOption
-
ElkOutput(
+ name = name,
index = index,
trigger = trigger,
- timeout = timeout,
+ timeout = timeout.get,
mode = mode,
options = options,
config = config,
dateField = dateField,
- suffixDatePattern = suffixDatePattern,
- outputName = name
+ suffixDatePattern = suffixDatePattern
)
}
}
diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala
index f78c37d..052c8f7 100644
--- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala
+++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/ElkConfiguratorTest.scala
@@ -11,7 +11,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers {
"getIndex" should {
"return index_x given Index = index_x" in {
val config = ConfigFactory.parseMap(
- Map("Index" -> "index_x")
+ Map("index" -> "index_x")
)
getIndex(config) shouldEqual "index_x"
}
@@ -27,7 +27,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers {
"getElkDateField" should {
"return timestamp given DateField = timestamp" in {
val config = ConfigFactory.parseMap(
- Map("DateField" -> "timestamp")
+ Map("dateField" -> "timestamp")
)
getDateField(config) shouldEqual "timestamp"
}
@@ -43,7 +43,7 @@ class ElkConfiguratorTest extends AnyWordSpec with Matchers {
"getSubIndexDatePattern" should {
"return yyyy.MM given SubIndexDatePattern = yyyy.MM" in {
val config = ConfigFactory.parseMap(
- Map("SubIndexDatePattern" -> "yyyy.MM")
+ Map("subIndexDatePattern" -> "yyyy.MM")
)
getSubIndexDatePattern(config) shouldEqual Some("yyyy.MM")
}
diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala
index 89bb61a..076feea 100644
--- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala
+++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/batch/ElkOutputTest.scala
@@ -12,15 +12,15 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput",
- "Name" -> "my-test-elk",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "Mode" -> "append",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput",
+ "name" -> "my-test-elk",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "mode" -> "append",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -31,7 +31,7 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+ val elkStreamingOutput = ElkOutput.apply(config.getConfig("output"))
elkStreamingOutput.index shouldEqual "test.index"
elkStreamingOutput.dateField shouldEqual "docDate"
@@ -51,16 +51,16 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput",
- "Name" -> "my-test-elk",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "SubIndexDatePattern" -> "yyyy.MM.dd",
- "Mode" -> "append",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.pipes.elk.batch.ElkOutput",
+ "name" -> "my-test-elk",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "mode" -> "append",
+ "subIndexDatePattern" -> "yyyy.MM.dd",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -71,7 +71,7 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+ val elkStreamingOutput = ElkOutput.apply(config.getConfig("output"))
elkStreamingOutput.index shouldEqual "test.index"
elkStreamingOutput.dateField shouldEqual "docDate"
diff --git a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala
index c8093f6..1ef3945 100644
--- a/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala
+++ b/elasticsearch/src/test/scala/com/amadeus/dataio/pipes/elk/streaming/ElkOutputTest.scala
@@ -15,17 +15,17 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
- "Name" -> "my-test-elk",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "Mode" -> "append",
- "Duration" -> "6 hours",
- "Timeout" -> "24",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
+ "name" -> "my-test-elk",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "mode" -> "append",
+ "duration" -> "6 hours",
+ "timeout" -> "24 hours",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -36,9 +36,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+ val elkStreamingOutput = ElkOutput.apply(config.getConfig("output"))
- elkStreamingOutput.outputName shouldEqual Some("my-test-elk")
+ elkStreamingOutput.name shouldEqual "my-test-elk"
elkStreamingOutput.index shouldEqual "test.index"
elkStreamingOutput.dateField shouldEqual "docDate"
elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM"
@@ -55,21 +55,21 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
}
- "be initialized according to configuration without output name" in {
+ "raise exception given missing output name" in {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "Mode" -> "append",
- "Trigger" -> "Continuous",
- "Duration" -> "6 hours",
- "Timeout" -> "24",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "mode" -> "append",
+ "trigger" -> "Continuous",
+ "duration" -> "6 hours",
+ "timeout" -> "24 hours",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -80,22 +80,11 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+      val thrown = intercept[Exception] {
+        ElkOutput.apply(config.getConfig("output"))
- elkStreamingOutput.outputName shouldEqual None
- elkStreamingOutput.index shouldEqual "test.index"
- elkStreamingOutput.dateField shouldEqual "docDate"
- elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM"
- elkStreamingOutput.mode shouldEqual "append"
- elkStreamingOutput.trigger shouldEqual Some(Trigger.Continuous(Duration("6 hours")))
- elkStreamingOutput.timeout shouldEqual 86400000
- elkStreamingOutput.options shouldEqual Map(
- "es.net.ssl.cert.allow.self.signed" -> "true",
- "es.index.auto.create" -> "true",
- "es.mapping.id" -> "docId",
- "es.port" -> "9200",
- "es.nodes" -> "bktv001, bktv002.amadeus.net"
- )
+      }
+      thrown.getMessage should include("name")
}
@@ -103,18 +92,18 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
- "Name" -> "my-test-elk",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "SubIndexDatePattern" -> "yyyy.MM.dd",
- "Mode" -> "append",
- "Trigger" -> "AvailableNow",
- "Timeout" -> "24",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
+ "name" -> "my-test-elk",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "subIndexDatePattern" -> "yyyy.MM.dd",
+ "mode" -> "append",
+ "trigger" -> "AvailableNow",
+ "timeout" -> "24 hours",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -125,9 +114,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+ val elkStreamingOutput = ElkOutput.apply(config.getConfig("output"))
- elkStreamingOutput.outputName shouldEqual Some("my-test-elk")
+ elkStreamingOutput.name shouldEqual "my-test-elk"
elkStreamingOutput.index shouldEqual "test.index"
elkStreamingOutput.dateField shouldEqual "docDate"
elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM.dd"
@@ -148,17 +137,17 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
val config = ConfigFactory.parseMap(
Map(
- "Output" -> Map(
- "Type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
- "Name" -> "my-test-elk",
- "Nodes" -> "bktv001, bktv002.amadeus.net",
- "Ports" -> "9200",
- "Index" -> "test.index",
- "DateField" -> "docDate",
- "SubIndexDatePattern" -> "yyyy.MM.dd",
- "Mode" -> "append",
- "Timeout" -> "24",
- "Options" -> Map(
+ "output" -> Map(
+ "type" -> "com.amadeus.dataio.output.streaming.ElkOutput",
+ "name" -> "my-test-elk",
+ "nodes" -> "bktv001, bktv002.amadeus.net",
+ "ports" -> "9200",
+ "index" -> "test.index",
+ "dateField" -> "docDate",
+ "subIndexDatePattern" -> "yyyy.MM.dd",
+ "mode" -> "append",
+ "timeout" -> "24 hours",
+ "options" -> Map(
"\"es.net.ssl.cert.allow.self.signed\"" -> true,
"\"es.index.auto.create\"" -> true,
"\"es.mapping.id\"" -> "docId",
@@ -169,9 +158,9 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
)
)
- val elkStreamingOutput = ElkOutput.apply(config.getConfig("Output"))
+ val elkStreamingOutput = ElkOutput.apply(config.getConfig("output"))
- elkStreamingOutput.outputName shouldEqual Some("my-test-elk")
+ elkStreamingOutput.name shouldEqual "my-test-elk"
elkStreamingOutput.index shouldEqual "test.index"
elkStreamingOutput.dateField shouldEqual "docDate"
elkStreamingOutput.suffixDatePattern shouldEqual "yyyy.MM.dd"
@@ -196,29 +185,20 @@ class ElkOutputTest extends AnyWordSpec with Matchers {
"return a query name based on index name" in {
val elkOutput =
- ElkOutput(index = "test.index", trigger = None, timeout = 0L, mode = "", dateField = "docDate", suffixDatePattern = "yyyy.MM", outputName = None)
+ ElkOutput(
+ index = "test.index",
+ trigger = None,
+ timeout = 0L,
+ mode = "",
+ dateField = "docDate",
+ suffixDatePattern = "yyyy.MM",
+ name = ""
+ )
val queryName = elkOutput.createQueryName()
queryName should fullyMatch regex "^QN_test.index_" + uuidPattern + "$"
}
-
- "return a query name based on output name" in {
-
- val elkOutput = ElkOutput(
- index = "test.index",
- trigger = None,
- timeout = 0L,
- mode = "",
- dateField = "docDate",
- suffixDatePattern = "yyyy.MM",
- outputName = Some("myTestOutput")
- )
-
- val queryName = elkOutput.createQueryName()
-
- queryName should fullyMatch regex "^QN_myTestOutput_test.index_" + uuidPattern + "$"
- }
}
}
diff --git a/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala b/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
index 6ca60c3..583f164 100644
--- a/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
+++ b/kafka/src/main/scala/com/amadeus/dataio/pipes/kafka/streaming/KafkaOutput.scala
@@ -8,16 +8,16 @@ import org.apache.spark.sql.{Dataset, SparkSession}
import scala.util.Try
/** Class for reading kafka dataframe
- *
- * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
- */
+ *
+ * @param config Contains the Typesafe Config object that was used at instantiation to configure this entity.
+ */
case class KafkaOutput(
- name: String,
+ name: String,
trigger: Option[Trigger],
- timeout: Option[Long],
+ timeout: Option[Long],
mode: String,
options: Map[String, String] = Map(),
- config: Config = ConfigFactory.empty()
+ config: Config = ConfigFactory.empty()
) extends Output
with Logging {
@@ -53,9 +53,9 @@ case class KafkaOutput(
}
/** Create a unique query name based on output topic.
- *
- * @return a unique query name.
- */
+ *
+ * @return a unique query name.
+ */
private[streaming] def createQueryName(): String = {
s"QN_${name}_${java.util.UUID.randomUUID}"
@@ -66,10 +66,10 @@ object KafkaOutput {
import com.amadeus.dataio.config.fields._
/** Creates an KafkaOutput based on a given configuration.
- *
- * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
- * @return a new instance of KafkaOutput.
- */
+ *
+ * @param config The collection of config nodes that will be used to instantiate KafkaOutput.
+ * @return a new instance of KafkaOutput.
+ */
def apply(implicit config: Config): KafkaOutput = {
val name = Try {
config.getString("name")
diff --git a/project/SparkProfiles.scala b/project/SparkProfiles.scala
new file mode 100644
index 0000000..6350699
--- /dev/null
+++ b/project/SparkProfiles.scala
@@ -0,0 +1,100 @@
+/** Spark version profiles — the SINGLE SOURCE OF TRUTH for all version-coupled dependencies.
+ *
+ * Each profile encapsulates:
+ * - Spark version (e.g., 3.3.4, 3.4.3, 3.5.3, 4.0.0)
+ * - Scala version (2.12 or 2.13, derived from Spark requirements)
+ * - Java target compatibility (8, 11, 17, 21)
+ * - All connector library versions (Kafka, Snowflake, Elasticsearch, etc.)
+ * - Test library versions coupled to the Spark/Scala combination
+ *
+ * Usage:
+ * Set the SPARK_PROFILE environment variable before invoking SBT:
+ * SPARK_PROFILE=spark34 sbt compile
+ *
+ * Defaults to "spark35" if not set.
+ */
+object SparkProfiles {
+
+ case class SparkProfile(
+ // ── Core versions ──────────────────────────────────────────────
+ sparkVersion: String,
+ scalaVersion: String,
+ javaTarget: String,
+ // ── Connector versions ─────────────────────────────────────────
+ sparkSnowflakeVersion: Option[String],
+ elasticsearchSparkVersion: Option[String],
+ embeddedKafkaVersion: String,
+ // ── Common dependency versions ────────────────────────────────
+ typesafeConfigVersion: String = "1.4.3",
+ slf4jApiVersion: String = "2.0.7",
+ commonsIoVersion: String = "2.13.0",
+ scalatestVersion: String = "3.2.15",
+ scalamockVersion: String = "5.2.0"
+ ) {
+
+ /** Major.Minor string, e.g. "3.5" */
+ val sparkBinaryVersion: String = sparkVersion.split("\\.").take(2).mkString(".")
+
+ /** Scala binary version, e.g. "2.12" */
+ val scalaBinaryVersion: String = scalaVersion.split("\\.").take(2).mkString(".")
+
+ /** Whether this profile supports Snowflake connector */
+ val supportsSnowflake: Boolean = sparkSnowflakeVersion.isDefined
+
+ /** Whether this profile supports Elasticsearch connector */
+ val supportsElasticsearch: Boolean = elasticsearchSparkVersion.isDefined
+ }
+
+ // ════════════════════════════════════════════════════════════════════
+ // Profile definitions — update versions HERE when upgrading
+ // ════════════════════════════════════════════════════════════════════
+
+ val profiles: Map[String, SparkProfile] = Map(
+ // ── Spark 3.4.x ──────────────────────────────────────────────────
+ "spark34" -> SparkProfile(
+ sparkVersion = "3.4.4",
+ scalaVersion = "2.12.15",
+ javaTarget = "11",
+ sparkSnowflakeVersion = Some("2.16.0-spark_3.4"),
+ elasticsearchSparkVersion = Some("8.17.4"),
+ embeddedKafkaVersion = "3.4.1"
+ ),
+ // ── Spark 3.5.x (default) ────────────────────────────────────────
+ "spark35" -> SparkProfile(
+ sparkVersion = "3.5.3",
+ scalaVersion = "2.12.15",
+ javaTarget = "11",
+ sparkSnowflakeVersion = Some("3.1.1"),
+ elasticsearchSparkVersion = None,
+ embeddedKafkaVersion = "3.5.1"
+ ),
+ // ── Spark 4.0.x ──────────────────────────────────────────────────
+ // NOTE: Spark 4.0 drops Scala 2.12 support. Snowflake & ES
+ // connectors may not yet support Spark 4.0 — disabled here.
+ "spark40" -> SparkProfile(
+ sparkVersion = "4.0.2",
+ scalaVersion = "2.13.17",
+ javaTarget = "17",
+ sparkSnowflakeVersion = None,
+ elasticsearchSparkVersion = None,
+ embeddedKafkaVersion = "3.9.1"
+ )
+ )
+
+ // ════════════════════════════════════════════════════════════════════
+ // Active profile resolution
+ // ════════════════════════════════════════════════════════════════════
+
+ val DefaultProfile = "spark35"
+
+ val activeProfileName: String = sys.env.getOrElse("SPARK_PROFILE", DefaultProfile)
+
+ val active: SparkProfile = profiles.getOrElse(
+ activeProfileName,
+ sys.error(s"Unknown SPARK_PROFILE '$activeProfileName'. Available profiles: ${profiles.keys.mkString(", ")}")
+ )
+
+ println(
+ s"[SparkProfiles] Active profile: $activeProfileName (Spark ${active.sparkVersion}, Scala ${active.scalaVersion}, Java ${active.javaTarget})"
+ )
+}
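
A minimal sketch of how a build definition typically consumes such a profile; these are hypothetical build.sbt settings for illustration, not the actual build.sbt contents of this patch:

    // Hypothetical build.sbt excerpt, assuming SparkProfiles.active drives the
    // version-coupled settings.
    val profile = SparkProfiles.active

    ThisBuild / scalaVersion := profile.scalaVersion
    ThisBuild / javacOptions ++= Seq("-source", profile.javaTarget, "-target", profile.javaTarget)

    libraryDependencies ++= Seq(
      "org.apache.spark" %% "spark-core" % profile.sparkVersion % Provided,
      "org.apache.spark" %% "spark-sql"  % profile.sparkVersion % Provided
    )

    // Modules whose connector is unavailable under the active profile can be
    // skipped, e.g. by checking profile.supportsElasticsearch or profile.supportsSnowflake.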
diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala
index 3b8ee01..e5e04c1 100644
--- a/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala
+++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/JavaImplicitConverters.scala
@@ -1,25 +1,24 @@
package com.amadeus.dataio.testutils
-import scala.collection.JavaConverters._
+import scala.jdk.CollectionConverters._
import scala.language.implicitConversions
-/**
- * Contains helper implicit conversions to make working with functions expecting Java maps/lists easier.
- * These conversions are meant for tests only, for instance to create typesafe Config objects.
- * e.g.:
- *
- * import com.amadeus.dataio.test.JavaImplicitConverters._
- * import com.typesafe.config.ConfigFactory
- *
- * val config = ConfigFactory.parseMap(
- * Map(
- * "MyField" -> Seq("val1", "val2", "val3"),
- * "MyOtherField" -> 5
- * )
- * )
- *
- *
- */
+/** Contains helper implicit conversions to make working with functions expecting Java maps/lists easier.
+ * These conversions are meant for tests only, for instance to create typesafe Config objects.
+ * e.g.:
+ *
+ * import com.amadeus.dataio.test.JavaImplicitConverters._
+ * import com.typesafe.config.ConfigFactory
+ *
+ * val config = ConfigFactory.parseMap(
+ * Map(
+ * "MyField" -> Seq("val1", "val2", "val3"),
+ * "MyOtherField" -> 5
+ * )
+ * )
+ *
+ *
+ */
object JavaImplicitConverters {
import scala.language.implicitConversions
diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala
index 00efa1a..51fa211 100644
--- a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala
+++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkStreamingSuite.scala
@@ -3,45 +3,42 @@ package com.amadeus.dataio.testutils
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger
-/**
- * Provides function for spark streaming tests
- *
- * Initialize the spark session before the tests
- * Automatically close the session after TestSuite is finished
- * Provides a wrapper to spark.implicits by importing sparkTestImplicits
- * Provides functions to collect results
- *
- * e.g.
- * {{{
- * class MyClassTest extends AnyWordSpec with SparkStreamingSuite {
- * // provided by SparkStreamingSuite:
- * // sparkSession: SparkSession
- * // sparkTestImplicits
- * }
- * }}}
- */
+/** Provides functions for Spark streaming tests.
+  *
+  * Initializes the Spark session before the tests.
+  * Automatically closes the session after the test suite is finished.
+  * Provides a wrapper to spark.implicits by importing sparkTestImplicits.
+  * Provides functions to collect results.
+ *
+ * e.g.
+ * {{{
+ * class MyClassTest extends AnyWordSpec with SparkStreamingSuite {
+ * // provided by SparkStreamingSuite:
+ * // sparkSession: SparkSession
+ * // sparkTestImplicits
+ * }
+ * }}}
+ */
trait SparkStreamingSuite extends SparkSuite {
- /**
- * enable spark streaming schema inference
- */
+  /** Enables Spark streaming schema inference.
+    */
def enableSparkStreamingSchemaInference(): Unit = sparkSession.sql("set spark.sql.streaming.schemaInference=true")
- /**
- * Collect data from dataframe read from stream
- *
- * Use an in memory sink to gather the data
- *
- * @param dataFrame the dataFrame to collect.
- * @return the String representation of each rows.
- */
+  /** Collects data from a dataframe read from a stream.
+    *
+    * Uses an in-memory sink to gather the data.
+    *
+    * @param dataFrame the dataFrame to collect.
+    * @return the String representation of each row.
+ */
def collectDataStream(dataFrame: DataFrame): Array[String] = {
val tmpTableName = "result_" + System.currentTimeMillis()
val streamWriter = dataFrame.writeStream
.format("memory")
.queryName(tmpTableName)
- .trigger(Trigger.Once())
+ .trigger(Trigger.AvailableNow())
.start()
streamWriter.awaitTermination(2000)
diff --git a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala
index be5fe0c..9968f1d 100644
--- a/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala
+++ b/testutils/src/main/scala/com/amadeus/dataio/testutils/SparkSuite.scala
@@ -24,10 +24,6 @@ trait SparkSuite extends Suite with BeforeAndAfter {
implicit var sparkSession: SparkSession = _
- object sparkTestImplicits extends SQLImplicits with Serializable {
- protected override def _sqlContext: SQLContext = sparkSession.sqlContext
- }
-
before {
sparkSession = SparkSession
.builder()
diff --git a/version.sbt b/version.sbt
index 6075046..22af523 100644
--- a/version.sbt
+++ b/version.sbt
@@ -1 +1 @@
-ThisBuild / version := "1.1.1-spark3.5.0-SNAPSHOT"
+ThisBuild / version := s"1.1.1-spark${SparkProfiles.active.sparkVersion}-SNAPSHOT"
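
With the version string derived from the active profile, each profile produces its own artefact version from the same source tree, for example:

    SPARK_PROFILE=spark34  ->  1.1.1-spark3.4.4-SNAPSHOT
    SPARK_PROFILE=spark35  ->  1.1.1-spark3.5.3-SNAPSHOT  (default)
    SPARK_PROFILE=spark40  ->  1.1.1-spark4.0.2-SNAPSHOT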