diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..8d29fb9 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,4 @@ +# Exclude everything by default… +* +# …then re-include only what the Dockerfile actually copies. +!build/libs/butcher.jar diff --git a/.github/workflows/butcher-release.yml b/.github/workflows/butcher-release.yml new file mode 100644 index 0000000..5f1eb2b --- /dev/null +++ b/.github/workflows/butcher-release.yml @@ -0,0 +1,123 @@ +name: Butcher release + +on: + workflow_dispatch: + inputs: + tag: + description: 'Tag to (re)release butcher for, e.g. v1.15.0' + required: true + type: string + +concurrency: butcher-release + +permissions: + contents: write # create GitHub Release + upload jar asset + packages: write # push image to GHCR + +jobs: + release: + runs-on: ubuntu-latest + # Long enough for multi-arch builds, arm64 under QEMU emulation can be slow on first run before GHA cache is warm + timeout-minutes: 30 + + steps: + - name: Resolve tag + id: tag + env: + # `inputs.tag` is user-supplied (typed into the workflow_dispatch form) + INPUT_TAG: ${{ inputs.tag }} + + run: | + if [ "${{ github.event_name }}" = "workflow_dispatch" ]; then + # Manually triggered, use the user-supplied tag + TAG="$INPUT_TAG" + else + # Automatically triggered, use the tag from the ref + TAG="$GITHUB_REF_NAME" + fi + + # Guard: tag must look like `v..` with an optional pre-release suffix, e.g. + # `v1.15.0` or `v1.15.0-rc1` + if [[ ! "$TAG" =~ ^v[0-9]+\.[0-9]+\.[0-9]+(-[a-zA-Z0-9.-]+)?$ ]]; then + echo "FAIL: tag '$TAG' must match pattern 'v..[-prerelease]'" + exit 1 + fi + + VERSION="${TAG#v}" + echo "tag=$TAG" >> "$GITHUB_OUTPUT" + echo "version=$VERSION" >> "$GITHUB_OUTPUT" + echo "Releasing butcher for tag=$TAG (version=$VERSION)" + + - name: Checkout tag commit + uses: actions/checkout@v6 + with: + ref: ${{ steps.tag.outputs.tag }} + + - name: Setup JDK 17 + uses: actions/setup-java@v5 + with: + java-version: '17' + distribution: 'corretto' + cache: 'gradle' + + - name: Build butcher fat jar + # `-Prelease.version=` is nebula-release's override for "use this version, skip inference." The tag name is the + # single source of truth: this step uses it, the Docker metadata step uses it, the GitHub Release step uses it. + # + # To verify the mechanism locally: + # ./gradlew butcherJar -Prelease.version=9.9.9 + # unzip -p build/libs/butcher.jar META-INF/MANIFEST.MF \ + # | grep Implementation-Version + # Expected: `Implementation-Version: 9.9.9` + run: ./gradlew butcherJar -Prelease.version=${{ steps.tag.outputs.version }} + + - name: Set up QEMU + uses: docker/setup-qemu-action@v4 + + - name: Set up Docker Buildx + # QEMU enables cross-arch builds (ARM under emulation on amd64 runners). Buildx is the multi-platform-aware builder. + uses: docker/setup-buildx-action@v4 + + - name: Log in to GHCR + uses: docker/login-action@v4 + with: + registry: ghcr.io + username: ${{ github.actor }} + password: ${{ secrets.GITHUB_TOKEN }} + + - name: Extract Docker metadata + id: docker-meta + uses: docker/metadata-action@v6 + with: + images: ghcr.io/vgv/butcher + # `:latest` moves only on a plain production release. A prerelease tag (e.g. v0.195.0-rc.1, version contains a `-`) + # still builds and pushes its own immutable `:0.195.0-rc.1` image, but must not hijack `:latest` — operators pulling + # `:latest` expect the last stable build. + # + # type=semver skips the floating {{major}} / {{major}}.{{minor}} tags for prerelease versions automatically — no guard needed there. + tags: | + type=raw,value=${{ steps.tag.outputs.version }} + type=raw,value=latest,enable=${{ !contains(steps.tag.outputs.version, '-') }} + type=semver,pattern={{major}}.{{minor}},value=${{ steps.tag.outputs.version }} + type=semver,pattern={{major}},value=${{ steps.tag.outputs.version }} + + - name: Build and push Docker image + uses: docker/build-push-action@v7 + with: + context: . + file: Dockerfile + push: true + platforms: linux/amd64,linux/arm64 + tags: ${{ steps.docker-meta.outputs.tags }} + labels: ${{ steps.docker-meta.outputs.labels }} + cache-from: type=gha + cache-to: type=gha,mode=max + + - name: Upload butcher.jar to GitHub Release + uses: softprops/action-gh-release@v3 + with: + tag_name: ${{ steps.tag.outputs.tag }} + files: build/libs/butcher.jar + fail_on_unmatched_files: true + env: + GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/pull-request.yml b/.github/workflows/pull-request.yml index a531fb0..09ec451 100644 --- a/.github/workflows/pull-request.yml +++ b/.github/workflows/pull-request.yml @@ -1,39 +1,40 @@ name: On pull request on: - pull_request: - paths-ignore: - - README.md + pull_request: + paths-ignore: + - README.md + - docs/** concurrency: on-pull-request jobs: - build: - runs-on: ubuntu-latest + build: + runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v6 - with: - fetch-depth: 0 + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 - - name: Setup JDK 17 - uses: actions/setup-java@v5 - with: - java-version: '17' - distribution: 'corretto' - cache: 'gradle' + - name: Setup JDK 17 + uses: actions/setup-java@v5 + with: + java-version: '17' + distribution: 'corretto' + cache: 'gradle' - - name: Build (by human) - if: github.actor != 'dependabot[bot]' - run: ./gradlew devSnapshot printDevSnapshotReleaseNote - env: - GPG_SIGNING_KEY: ${{ secrets.GPG_SIGNING_KEY }} - GPG_SIGNING_PASSWORD: ${{ secrets.GPG_SIGNING_PASSWORD }} - SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} - SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - GITHUB_HEAD_REF: ${{ github.head_ref }} + - name: Build (by human) + if: github.actor != 'dependabot[bot]' + run: ./gradlew devSnapshot printDevSnapshotReleaseNote + env: + GPG_SIGNING_KEY: ${{ secrets.GPG_SIGNING_KEY }} + GPG_SIGNING_PASSWORD: ${{ secrets.GPG_SIGNING_PASSWORD }} + SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} + GITHUB_HEAD_REF: ${{ github.head_ref }} - - name: Build (by dependabot) - if: github.actor == 'dependabot[bot]' - run: ./gradlew test + - name: Build (by dependabot) + if: github.actor == 'dependabot[bot]' + run: ./gradlew test diff --git a/.github/workflows/push-to-main.yml b/.github/workflows/push-to-main.yml index d94c1f5..d328e9f 100644 --- a/.github/workflows/push-to-main.yml +++ b/.github/workflows/push-to-main.yml @@ -1,41 +1,42 @@ name: On push to main on: - push: - branches: - - main - paths-ignore: - - README.md + push: + branches: + - main + paths-ignore: + - README.md + - docs/** concurrency: on-push-to-main jobs: - build: - runs-on: ubuntu-latest + build: + runs-on: ubuntu-latest - steps: - - name: Checkout - uses: actions/checkout@v6 - with: - fetch-depth: 0 + steps: + - name: Checkout + uses: actions/checkout@v6 + with: + fetch-depth: 0 - - name: Setup Git - run: | - git config user.name "${{ github.actor }}" - git config user.email "${{ github.actor }}@users.noreply.github.com" + - name: Setup Git + run: | + git config user.name "${{ github.actor }}" + git config user.email "${{ github.actor }}@users.noreply.github.com" - - name: Setup JDK 17 - uses: actions/setup-java@v5 - with: - java-version: '17' - distribution: 'corretto' - cache: 'gradle' + - name: Setup JDK 17 + uses: actions/setup-java@v5 + with: + java-version: '17' + distribution: 'corretto' + cache: 'gradle' - - name: Build - run: ./gradlew final closeAndReleaseStagingRepository printFinalReleaseNote - env: - GPG_SIGNING_KEY: ${{ secrets.GPG_SIGNING_KEY }} - GPG_SIGNING_PASSWORD: ${{ secrets.GPG_SIGNING_PASSWORD }} - SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} - SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} - GITHUB_HEAD_REF: ${{ github.head_ref }} + - name: Build + run: ./gradlew final closeAndReleaseStagingRepository printFinalReleaseNote + env: + GPG_SIGNING_KEY: ${{ secrets.GPG_SIGNING_KEY }} + GPG_SIGNING_PASSWORD: ${{ secrets.GPG_SIGNING_PASSWORD }} + SONATYPE_USERNAME: ${{ secrets.SONATYPE_USERNAME }} + SONATYPE_PASSWORD: ${{ secrets.SONATYPE_PASSWORD }} + GITHUB_HEAD_REF: ${{ github.head_ref }} diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..2e4ff8d --- /dev/null +++ b/Dockerfile @@ -0,0 +1,11 @@ +FROM eclipse-temurin:17-jre + +LABEL org.opencontainers.image.source=https://github.com/vgv/kolbasa +LABEL org.opencontainers.image.title=butcher +LABEL org.opencontainers.image.description="CLI for managing a kolbasa cluster" + +WORKDIR /work + +COPY build/libs/butcher.jar /app/butcher.jar + +ENTRYPOINT ["java", "-jar", "/app/butcher.jar"] diff --git a/build.gradle.kts b/build.gradle.kts index 573bd47..2630e25 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -126,6 +126,51 @@ tasks { } } +// ===== Butcher fat jar (build/libs/butcher.jar) ===== +tasks.register("butcherJar") { + group = "build" + description = "Builds the standalone butcher CLI fat jar" + + archiveBaseName.set("butcher") + archiveClassifier.set("") + archiveVersion.set("") + + // Reproducibility: same source → same output bytes. Useful for both + // CI cache hits and for the "same tag → same butcher.jar" guarantee + // we promised in Phase 1's failure-recovery section. + isPreserveFileTimestamps = false + isReproducibleFileOrder = true + + manifest { + attributes( + "Main-Class" to "kolbasa.cluster.butcher.ButcherKt", + "Implementation-Title" to "butcher", + "Implementation-Version" to project.sanitizeVersion(), + // postgresql 42.7.x is a Multi-Release jar: it ships JDK11-specialized classes under META-INF/versions/11/ + "Multi-Release" to true, + ) + } + + // Our own compiled classes + resources. + from(sourceSets.main.get().output) + + // All runtime dependencies, unpacked. compileOnly deps (prometheus, + // opentelemetry) are deliberately NOT in runtimeClasspath, so they + // don't end up in the fat jar. + dependsOn(configurations.runtimeClasspath) + from({ + configurations.runtimeClasspath.get().map { if (it.isDirectory) it else zipTree(it) } + }) + + // Exclude per-dep MANIFEST.MF: each dep has its own; our `manifest { }` + // block above wins regardless, but excluding explicitly is clearer. + exclude("META-INF/MANIFEST.MF") + // Exclude a root module-info.class: we're an executable, not a JPMS module + exclude("/module-info.class") + + duplicatesStrategy = DuplicatesStrategy.EXCLUDE +} + tasks.withType { doFirst { settingsProvider.validateGPGSecrets() diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index d3d4d8c..ce11cd7 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -11,10 +11,10 @@ nebula = "20.2.0" nexus = "2.0.0" prometheus = "1.6.1" -opentelemetry = "1.61.0" +opentelemetry = "1.62.0" opentelemetry-instrumentation = "2.27.0" opentelemetry-instrumentation-incubator = "2.27.0-alpha" -opentelemetry-semconv = "1.41.0" +opentelemetry-semconv = "1.41.1" hikaricp = "7.0.2" postgresql = "42.7.11" diff --git a/gradle/wrapper/gradle-wrapper.jar b/gradle/wrapper/gradle-wrapper.jar index d997cfc..b1b8ef5 100644 Binary files a/gradle/wrapper/gradle-wrapper.jar and b/gradle/wrapper/gradle-wrapper.jar differ diff --git a/gradle/wrapper/gradle-wrapper.properties b/gradle/wrapper/gradle-wrapper.properties index 1a70468..df6a6ad 100644 --- a/gradle/wrapper/gradle-wrapper.properties +++ b/gradle/wrapper/gradle-wrapper.properties @@ -1,7 +1,9 @@ distributionBase=GRADLE_USER_HOME distributionPath=wrapper/dists -distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.0-bin.zip +distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.1-bin.zip networkTimeout=10000 +retries=0 +retryBackOffMs=500 validateDistributionUrl=true zipStoreBase=GRADLE_USER_HOME zipStorePath=wrapper/dists diff --git a/gradlew b/gradlew index 739907d..b9bb139 100755 --- a/gradlew +++ b/gradlew @@ -57,7 +57,7 @@ # Darwin, MinGW, and NonStop. # # (3) This script is generated from the Groovy template -# https://github.com/gradle/gradle/blob/2d6327017519d23b96af35865dc997fcb544fb40/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt +# https://github.com/gradle/gradle/blob/3d91ce3b8caaf77ad09f381f43615b715b53f72c/platforms/jvm/plugins-application/src/main/resources/org/gradle/api/internal/plugins/unixStartScript.txt # within the Gradle project. # # You can find Gradle at https://github.com/gradle/gradle/. diff --git a/gradlew.bat b/gradlew.bat old mode 100644 new mode 100755 index e509b2d..aa5f10b --- a/gradlew.bat +++ b/gradlew.bat @@ -23,8 +23,8 @@ @rem @rem ########################################################################## -@rem Set local scope for the variables with windows NT shell -if "%OS%"=="Windows_NT" setlocal +@rem Set local scope for the variables, and ensure extensions are enabled +setlocal EnableExtensions set DIRNAME=%~dp0 if "%DIRNAME%"=="" set DIRNAME=. @@ -51,7 +51,7 @@ echo. 1>&2 echo Please set the JAVA_HOME variable in your environment to match the 1>&2 echo location of your Java installation. 1>&2 -goto fail +"%COMSPEC%" /c exit 1 :findJavaFromJavaHome set JAVA_HOME=%JAVA_HOME:"=% @@ -65,7 +65,7 @@ echo. 1>&2 echo Please set the JAVA_HOME variable in your environment to match the 1>&2 echo location of your Java installation. 1>&2 -goto fail +"%COMSPEC%" /c exit 1 :execute @rem Setup the command line @@ -73,21 +73,10 @@ goto fail @rem Execute Gradle -"%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* +@rem endlocal doesn't take effect until after the line is parsed and variables are expanded +@rem which allows us to clear the local environment before executing the java command +endlocal & "%JAVA_EXE%" %DEFAULT_JVM_OPTS% %JAVA_OPTS% %GRADLE_OPTS% "-Dorg.gradle.appname=%APP_BASE_NAME%" -jar "%APP_HOME%\gradle\wrapper\gradle-wrapper.jar" %* & call :exitWithErrorLevel -:end -@rem End local scope for the variables with windows NT shell -if %ERRORLEVEL% equ 0 goto mainEnd - -:fail -rem Set variable GRADLE_EXIT_CONSOLE if you need the _script_ return code instead of -rem the _cmd.exe /c_ return code! -set EXIT_CODE=%ERRORLEVEL% -if %EXIT_CODE% equ 0 set EXIT_CODE=1 -if not ""=="%GRADLE_EXIT_CONSOLE%" exit %EXIT_CODE% -exit /b %EXIT_CODE% - -:mainEnd -if "%OS%"=="Windows_NT" endlocal - -:omega +:exitWithErrorLevel +@rem Use "%COMSPEC%" /c exit to allow operators to work properly in scripts +"%COMSPEC%" /c exit %ERRORLEVEL% diff --git a/src/main/kotlin/kolbasa/cluster/butcher/Butcher.kt b/src/main/kotlin/kolbasa/cluster/butcher/Butcher.kt index f138626..725d20c 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/Butcher.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/Butcher.kt @@ -2,9 +2,15 @@ package kolbasa.cluster.butcher import kolbasa.cluster.butcher.check.check import kolbasa.cluster.butcher.config.Command +import java.lang.invoke.MethodHandles import kotlin.system.exitProcess fun main(args: Array) { + if (args.isNotEmpty() && (args[0] == "--version" || args[0] == "-v")) { + println(butcherVersion()) + exitProcess(0) + } + val command = try { Command.parseCommand(args) } catch (e: ButcherException.InvalidConfigurationException) { @@ -36,6 +42,17 @@ fun main(args: Array) { println("=================================================") } +private fun butcherVersion(): String { + // `MethodHandles.lookup().lookupClass()` is the idiomatic JVM way to get a + // Class handle to "the class containing this code" without declaring a + // placeholder type. It resolves to the synthetic `ButcherKt` class that + // Kotlin generates for top-level functions in this file, whose `Package` + // carries the jar manifest's `Implementation-Version` attribute set by + // the shadow task. Falls back to "unknown" outside a jar (IDE, + // `./gradlew run`), where the JVM doesn't populate the attribute. + return MethodHandles.lookup().lookupClass().`package`.implementationVersion ?: "unknown" +} + private fun red(msg: String): String { return "$ANSI_RED$msg$ANSI_RESET" } diff --git a/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt b/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt index 6c6e861..3784b17 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/ConsoleProgressCallback.kt @@ -9,14 +9,13 @@ internal object ConsoleProgressCallback : ProgressCallback { println("Prepare") println("Shards: $shards") println("Target node: ${targetNode.id}") - println("Shards to move (${shardsDiff.size}):") + println("Shards prepared to move (${shardsDiff.size}):") shardsDiff.forEach { diff -> - // Shard(shard=6, producerNode=NodeId(id=db4), consumerNode=NodeId(id=db4), nextConsumerNode=null) => Shard(shard=6, producerNode=NodeId(id=db5), consumerNode=null, nextConsumerNode=NodeId(id=db5)) val originalShard = - "Shard #${diff.originalShard.shard}(producerNode=${diff.originalShard.producerNode.id}, consumerNode=${diff.originalShard.consumerNode?.id}, nextConsumerNode=${diff.originalShard.nextConsumerNode?.id})" + "[producerNode=${diff.originalShard.producerNode.id}, consumerNode=${diff.originalShard.consumerNode?.id}, nextConsumerNode=${diff.originalShard.nextConsumerNode?.id}]" val updatedShard = - "Shard #${diff.updatedShard.shard}(producerNode=${diff.updatedShard.producerNode.id}, consumerNode=${diff.updatedShard.consumerNode?.id}, nextConsumerNode=${diff.updatedShard.nextConsumerNode?.id})" - println("\t$originalShard=>$updatedShard") + "[producerNode=${diff.updatedShard.producerNode.id}, consumerNode=${diff.updatedShard.consumerNode?.id}, nextConsumerNode=${diff.updatedShard.nextConsumerNode?.id}]" + println("\tShard #${diff.originalShard.shard} $originalShard=>$updatedShard") } } @@ -41,4 +40,16 @@ internal object ConsoleProgressCallback : ProgressCallback { ) { println("Move table '$tableName' from $from to $to, migrated rows: $migratedRows") } + + override fun finalizeSuccessful(shardsDiff: List) { + println("Finalize") + println("Shards moved to stable state (${shardsDiff.size}):") + shardsDiff.forEach { diff -> + val originalShard = + "[producerNode=${diff.originalShard.producerNode.id}, consumerNode=${diff.originalShard.consumerNode?.id}, nextConsumerNode=${diff.originalShard.nextConsumerNode?.id}]" + val updatedShard = + "[producerNode=${diff.updatedShard.producerNode.id}, consumerNode=${diff.updatedShard.consumerNode?.id}, nextConsumerNode=${diff.updatedShard.nextConsumerNode?.id}]" + println("\tShard #${diff.originalShard.shard} $originalShard=>$updatedShard") + } + } } diff --git a/src/main/kotlin/kolbasa/cluster/butcher/Finalize.kt b/src/main/kotlin/kolbasa/cluster/butcher/Finalize.kt index 971dec8..16fabe0 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/Finalize.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/Finalize.kt @@ -1,7 +1,31 @@ package kolbasa.cluster.butcher +import kolbasa.cluster.ClusterHelper import kolbasa.cluster.butcher.config.Command +import kolbasa.cluster.schema.ShardSchema +import kolbasa.utils.JdbcHelpers.useStatement internal fun finalize(command: Command.Finalize) { - println("TODO: finalize migration") + val nodes = ClusterHelper.readNodes(command.nodes.dataSources) + val (shardDataSource, initialShards) = MoveHelpers.readShards(nodes) + + // Okay, everything looks good, let's switch the shard table to a stable state + val query = """ + update ${ShardSchema.SHARD_TABLE_NAME} + set + ${ShardSchema.CONSUMER_NODE_COLUMN_NAME} = ${ShardSchema.NEXT_CONSUMER_NODE_COLUMN_NAME}, + ${ShardSchema.NEXT_CONSUMER_NODE_COLUMN_NAME} = null + where + ${ShardSchema.CONSUMER_NODE_COLUMN_NAME} is null + """.trimIndent() + + shardDataSource.useStatement { statement -> + statement.executeUpdate(query) + } + + // Calculate difference between initial and updated shards and notify + val updatedShards = ShardSchema.readShards(shardDataSource) + ConsoleProgressCallback.finalizeSuccessful( + shardsDiff = MoveHelpers.calculateShardsDiff(initialShards, updatedShards) + ) } diff --git a/src/main/kotlin/kolbasa/cluster/butcher/Move.kt b/src/main/kotlin/kolbasa/cluster/butcher/Move.kt index ef8f180..24f691e 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/Move.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/Move.kt @@ -18,7 +18,11 @@ internal fun move(command: Command.Move) { val nodes = ClusterHelper.readNodes(command.nodes.dataSources) // tablename => schema - val schemas = findAndCompareAllSchemas(command.nodes.dataSources, command.tables) + val schemas = findAndCompareAllSchemas( + command.nodes.dataSources, + includeTables = command.includeTables, + excludeTables = command.excludeTables + ) // targetnode => shards (which should be migrated to this node) val targetsToShards = findTargetNodeAndShards(nodes) @@ -60,13 +64,16 @@ private fun findTargetNodeAndShards(nodes: SortedMap): Map, - tablesToFind: Set? + includeTables: Set?, + excludeTables: Set? ): Map { // schemas val allSchemas = mutableMapOf>>() dataSources.forEach { dataSource -> - SchemaExtractor.extractRawSchema(dataSource, tablesToFind).forEach { (tableName, table) -> - allSchemas.computeIfAbsent(tableName) { mutableListOf() }.add(dataSource to table) + SchemaExtractor.extractRawSchema(dataSource, includeTables).forEach { (tableName, table) -> + if (excludeTables == null || tableName !in excludeTables) { + allSchemas.computeIfAbsent(tableName) { mutableListOf() }.add(dataSource to table) + } } } diff --git a/src/main/kotlin/kolbasa/cluster/butcher/ProgressCallback.kt b/src/main/kotlin/kolbasa/cluster/butcher/ProgressCallback.kt index eb2f2cc..45df792 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/ProgressCallback.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/ProgressCallback.kt @@ -11,4 +11,6 @@ internal interface ProgressCallback { fun moveEnd(tableName: String, from: DataSource, to: DataSource, migratedRows: Int) fun moveNextBatch(tableName: String, from: DataSource, to: DataSource, migratedRows: Int) + fun finalizeSuccessful(shardsDiff: List) + } diff --git a/src/main/kotlin/kolbasa/cluster/butcher/config/AvailableCommand.kt b/src/main/kotlin/kolbasa/cluster/butcher/config/AvailableCommand.kt index f7c59f6..c1e6038 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/config/AvailableCommand.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/config/AvailableCommand.kt @@ -107,27 +107,29 @@ internal enum class AvailableCommand( MOVE_DATA( commandName = "move-data", shortUsage = "Transfer data to target nodes (safe to re-run if crashed)", fullUsage = """ - Usage: java -jar butcher.jar move-data [--tables=] ... + Usage: java -jar butcher.jar move-data [--include-tables=] [--exclude-tables=] ... Transfer data from source nodes to target nodes for all shards in migration state. Safe to re-run — INSERT uses ON CONFLICT DO NOTHING. Optional flags: - --tables= comma-separated queue table names (default: all) + --include-tables= comma-separated queue table names to migrate (default: all) + --exclude-tables= comma-separated queue table names not to migrate (default: none) $CONFIG_FILE_FORMAT_HELP """.trimIndent() ) { override fun parse(args: Array): Command.Move { val parsed = try { - ConfigHelper.parseArgs(args.drop(1), supportedFlags = setOf(FLAG_TABLES)) + ConfigHelper.parseArgs(args.drop(1), supportedFlags = setOf(INCLUDE_FLAG_TABLES, EXCLUDE_FLAG_TABLES)) } catch (e: ButcherException.InvalidConfigurationException) { throw wrapWithUsage(e) } - val tables = parsed.flags[FLAG_TABLES]?.split(",")?.map { it.trim() }?.toSet() + val includeTables = parsed.flags[INCLUDE_FLAG_TABLES]?.split(",")?.map { it.trim() }?.toSet() + val excludeTables = parsed.flags[EXCLUDE_FLAG_TABLES]?.split(",")?.map { it.trim() }?.toSet() val nodes = ClusterNodes.buildClusterNodes(parsed.files) - return Command.Move(nodes, tables) + return Command.Move(nodes, includeTables, excludeTables) } }, @@ -201,7 +203,8 @@ private val CONFIG_FILE_FORMAT_HELP = """ private const val FLAG_TARGET = "--target" private const val FLAG_SHARDS = "--shards" -private const val FLAG_TABLES = "--tables" +private const val INCLUDE_FLAG_TABLES = "--include-tables" +private const val EXCLUDE_FLAG_TABLES = "--exclude-tables" diff --git a/src/main/kotlin/kolbasa/cluster/butcher/config/Command.kt b/src/main/kotlin/kolbasa/cluster/butcher/config/Command.kt index 064f5b1..b3f9f57 100644 --- a/src/main/kotlin/kolbasa/cluster/butcher/config/Command.kt +++ b/src/main/kotlin/kolbasa/cluster/butcher/config/Command.kt @@ -13,7 +13,8 @@ internal sealed class Command(val nodes: ClusterNodes, val command: AvailableCom /** * Transfer data to target nodes. Safe to re-run if crashed — INSERT uses ON CONFLICT DO NOTHING. */ - class Move(nodes: ClusterNodes, val tables: Set?) : Command(nodes, AvailableCommand.MOVE_DATA) + class Move(nodes: ClusterNodes, val includeTables: Set?, val excludeTables: Set?) : + Command(nodes, AvailableCommand.MOVE_DATA) class Finalize(nodes: ClusterNodes) : Command(nodes, AvailableCommand.FINALIZE_MIGRATION) diff --git a/src/main/kotlin/kolbasa/inspector/InspectorSchemaHelpers.kt b/src/main/kotlin/kolbasa/inspector/InspectorSchemaHelpers.kt index 7ba7777..63f1433 100644 --- a/src/main/kotlin/kolbasa/inspector/InspectorSchemaHelpers.kt +++ b/src/main/kotlin/kolbasa/inspector/InspectorSchemaHelpers.kt @@ -8,24 +8,21 @@ import java.sql.Connection internal object InspectorSchemaHelpers { - // Message state SQL conditions - private const val SCHEDULED_CONDITION = "${Const.PROCESSING_AT_COLUMN_NAME} is null and " + - "${Const.SCHEDULED_AT_COLUMN_NAME} > statement_timestamp() and " + - "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} > 0" - - private const val READY_CONDITION = "${Const.PROCESSING_AT_COLUMN_NAME} is null and " + - "${Const.SCHEDULED_AT_COLUMN_NAME} <= statement_timestamp() and " + - "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} > 0" + private const val TOUCHED = "${Const.PROCESSING_AT_COLUMN_NAME} is not null" + private const val UNTOUCHED = "${Const.PROCESSING_AT_COLUMN_NAME} is null" - private const val IN_FLIGHT_CONDITION = "${Const.PROCESSING_AT_COLUMN_NAME} is not null and " + - "${Const.SCHEDULED_AT_COLUMN_NAME} > statement_timestamp()" + private const val VISIBLE = "${Const.SCHEDULED_AT_COLUMN_NAME} <= statement_timestamp()" + private const val NOT_VISIBLE = "${Const.SCHEDULED_AT_COLUMN_NAME} > statement_timestamp()" - private const val RETRY_CONDITION = "${Const.PROCESSING_AT_COLUMN_NAME} is not null and " + - "${Const.SCHEDULED_AT_COLUMN_NAME} <= statement_timestamp() and " + - "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} > 0" + private const val EXHAUSTED = "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} <= 0" + private const val NOT_EXHAUSTED = "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} > 0" - private const val DEAD_CONDITION = "${Const.SCHEDULED_AT_COLUMN_NAME} <= statement_timestamp() and " + - "${Const.REMAINING_ATTEMPTS_COLUMN_NAME} <= 0" + // Message state SQL conditions + private const val SCHEDULED_CONDITION = "$UNTOUCHED and $NOT_VISIBLE and $NOT_EXHAUSTED " + private const val READY_CONDITION = "$UNTOUCHED and $VISIBLE and $NOT_EXHAUSTED" + private const val IN_FLIGHT_CONDITION = "$TOUCHED and $NOT_VISIBLE" + private const val RETRY_CONDITION = "$TOUCHED and $VISIBLE and $NOT_EXHAUSTED" + private const val DEAD_CONDITION = "$VISIBLE and $EXHAUSTED" fun generateCountWithFilterQuery(connection: Connection, queue: Queue<*>, options: CountOptions): QueryAndSample { val samplePercent = effectiveSamplePercent(connection, queue, options.samplePercent) @@ -93,9 +90,9 @@ internal object InspectorSchemaHelpers { limit 1) as newest_message_age, (select extract(epoch from (statement_timestamp() - ${Const.SCHEDULED_AT_COLUMN_NAME})) from ${queue.dbTableName} - where ${Const.SCHEDULED_AT_COLUMN_NAME} <= statement_timestamp() and ${Const.REMAINING_ATTEMPTS_COLUMN_NAME} > 0 + where $VISIBLE and $NOT_EXHAUSTED order by ${Const.SCHEDULED_AT_COLUMN_NAME} asc - limit 1) as oldest_ready_message_age + limit 1) as oldest_ready_or_retry_message_age """.trimIndent() } diff --git a/src/test/kotlin/kolbasa/AbstractPostgresqlTest.kt b/src/test/kotlin/kolbasa/AbstractPostgresqlTest.kt index 0da3981..2e5b074 100644 --- a/src/test/kotlin/kolbasa/AbstractPostgresqlTest.kt +++ b/src/test/kotlin/kolbasa/AbstractPostgresqlTest.kt @@ -107,11 +107,11 @@ abstract class AbstractPostgresqlTest { "postgres:11.22-alpine", "postgres:12.22-alpine", "postgres:13.23-alpine", - "postgres:14.21-alpine", - "postgres:15.16-alpine", - "postgres:16.12-alpine", - "postgres:17.8-alpine", - "postgres:18.2-alpine" + "postgres:14.22-alpine", + "postgres:15.17-alpine", + "postgres:16.13-alpine", + "postgres:17.9-alpine", + "postgres:18.3-alpine" ) val RANDOM_POSTGRES_IMAGE = POSTGRES_IMAGES.random() diff --git a/src/test/kotlin/kolbasa/cluster/butcher/config/AvailableCommandTest.kt b/src/test/kotlin/kolbasa/cluster/butcher/config/AvailableCommandTest.kt index a976401..28e1a36 100644 --- a/src/test/kotlin/kolbasa/cluster/butcher/config/AvailableCommandTest.kt +++ b/src/test/kotlin/kolbasa/cluster/butcher/config/AvailableCommandTest.kt @@ -201,40 +201,44 @@ class AvailableCommandTest { val parsed = AvailableCommand.MOVE_DATA.parse(arrayOf("move-data", file.absolutePath)) assertInstanceOf(parsed) - assertNull(parsed.tables) + assertNull(parsed.includeTables) + assertNull(parsed.excludeTables) } @Test fun testMoveData_Parse_WithTables() { val file = configFile() val parsed = AvailableCommand.MOVE_DATA.parse( - arrayOf("move-data", "--tables=orders,events,logs", file.absolutePath) + arrayOf("move-data", "--include-tables=orders,events,logs", "--exclude-tables=garbage,trash", file.absolutePath) ) assertInstanceOf(parsed) - assertEquals(setOf("orders", "events", "logs"), parsed.tables) + assertEquals(setOf("orders", "events", "logs"), parsed.includeTables) + assertEquals(setOf("garbage", "trash"), parsed.excludeTables) } @Test fun testMoveData_Parse_TablesWhitespaceTrimmed() { val file = configFile() val parsed = AvailableCommand.MOVE_DATA.parse( - arrayOf("move-data", "--tables= orders , events ", file.absolutePath) + arrayOf("move-data", "--exclude-tables= garbage , trash ", "--include-tables= orders , events ", file.absolutePath) ) assertInstanceOf(parsed) - assertEquals(setOf("orders", "events"), parsed.tables) + assertEquals(setOf("orders", "events"), parsed.includeTables) + assertEquals(setOf("garbage", "trash"), parsed.excludeTables) } @Test fun testMoveData_Parse_TablesDuplicatesCollapse() { val file = configFile() val parsed = AvailableCommand.MOVE_DATA.parse( - arrayOf("move-data", "--tables=orders,orders,events", file.absolutePath) + arrayOf("move-data", "--include-tables=orders,orders,events", "--exclude-tables=trash,garbage,trash", file.absolutePath) ) assertInstanceOf(parsed) - assertEquals(setOf("orders", "events"), parsed.tables) + assertEquals(setOf("orders", "events"), parsed.includeTables) + assertEquals(setOf("trash", "garbage"), parsed.excludeTables) } @Test