Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ kotlin = "2.3.20"
nebula = "20.2.0"
nexus = "2.0.0"

prometheus = "1.5.1"
prometheus = "1.6.1"
opentelemetry = "1.60.1"
opentelemetry-instrumentation = "2.26.1"
opentelemetry-instrumentation-incubator = "2.26.1-alpha"
opentelemetry-instrumentation = "2.27.0"
opentelemetry-instrumentation-incubator = "2.27.0-alpha"
opentelemetry-semconv = "1.40.0"
hikaricp = "7.0.2"
postgresql = "42.7.10"

logback = "1.5.32"
testcontainers = "2.0.4"
testcontainers = "2.0.5"
junit = "6.0.3"
mockk = "1.14.9"

Expand Down
2 changes: 1 addition & 1 deletion gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-9.4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-9.5.0-bin.zip
networkTimeout=10000
validateDistributionUrl=true
zipStoreBase=GRADLE_USER_HOME
Expand Down
2 changes: 1 addition & 1 deletion gradlew

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 15 additions & 3 deletions src/main/kotlin/kolbasa/cluster/butcher/Butcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ fun main(args: Array<String>) {
Command.parseCommand(args)
} catch (e: ButcherException.InvalidConfigurationException) {
println("=================================================")
println("Invalid configuration.")
println(red("Invalid configuration."))
println(e.messageToShow)
println("=================================================")
exitProcess(1)
Expand All @@ -24,14 +24,26 @@ fun main(args: Array<String>) {
}
} catch (e: ButcherException.ExecutionException) {
println("=================================================")
println("Execution error.")
println(red("Execution error."))
println(e.messageToShow)
println("=================================================")
exitProcess(1)
}

println("=================================================")
println("Completed successfully.")
println(green("Completed successfully."))
println(result)
println("=================================================")
}

private fun red(msg: String): String {
return "$ANSI_RED$msg$ANSI_RESET"
}

private fun green(msg: String): String {
return "$ANSI_GREEN$msg$ANSI_RESET"
}

private const val ANSI_RESET = "\u001B[0m"
private const val ANSI_RED = "\u001B[31m"
private const val ANSI_GREEN = "\u001B[32m"
2 changes: 1 addition & 1 deletion src/main/kotlin/kolbasa/cluster/butcher/MoveHelpers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal object MoveHelpers {
}
}

throw IllegalStateException("Initialized shard table not found")
throw ButcherException.ExecutionException("Initialized shard table ${ShardSchema.SHARD_TABLE_NAME} not found. Is it a Kolbasa cluster?")
}

fun splitNodes(nodes: SortedMap<Node, DataSource>, targetNodeId: NodeId): SourceAndTargetNodes {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@ internal data class MigrationStateResult(
appendLine("Migration state: $totalMigratingShards shard(s) in migration")
migratingShardsByTarget.toSortedMap().forEach { (target, shards) ->
val sorted = shards.sortedBy { it.shard }
appendLine(" -> ${target.id} (${sorted.size} shards):")
sorted.forEach { shard ->
appendLine(" shard ${shard.shard.toString().padStart(4)}")
}
appendLine(" ⟶ ${target.id} (${sorted.size} shards):")
appendLine(" shards: ${sorted.joinToString(separator = ",") { it.shard.toString() }}")
appendLine(" target: ${target.id}")
}
}.trimEnd()
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,16 @@ internal data class OrphanTablesResult(

override fun toString(): String = buildString {
if (isClean) {
append("Orphan tables: no orphan companion tables")
append("Orphan tables: no orphan queue tables")
return@buildString
}

appendLine("Orphan tables: $totalOrphans orphan companion(s) across ${orphansByNode.size} node(s)")
appendLine("Orphan tables: $totalOrphans orphan queue(s) across ${orphansByNode.size} node(s)")
orphansByNode.toSortedMap().forEach { (node, orphans) ->
appendLine(" ${node.id}:")
val width = orphans.maxOf { it.companionTable.length }
orphans.sortedBy { it.companionTable }.forEach { orphan ->
appendLine(" ${orphan.companionTable.padEnd(width)} (main ${orphan.missingMainTable} missing)")
appendLine(" ${orphan.companionTable.padEnd(width)} (main queue ${orphan.missingMainTable} missing)")
}
}
}.trimEnd()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ internal data class ShardBalanceResult(
appendLine(" Proposed moves ($totalMoves):")
proposedMoves.toSortedMap().forEach { (target, shards) ->
val sorted = shards.sortedBy { it.shard }
appendLine(" -> ${target.id} (${sorted.size} shards):")
sorted.forEach { shard ->
appendLine(" shard ${shard.shard.toString().padStart(4)}: ${shard.producerNode.id} -> ${target.id}")
}
appendLine(" ⟶ ${target.id} (${sorted.size} shards):")
appendLine(" shards: ${sorted.joinToString(separator = ",") { it.shard.toString() }}")
appendLine(" target: ${target.id}")
}
}.trimEnd()
}
2 changes: 1 addition & 1 deletion src/test/kotlin/kolbasa/cluster/butcher/MoveHelpersTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class MoveHelpersTest : AbstractPostgresqlTest() {
val nodes = ClusterHelper.readNodes(listOf(dataSource, dataSourceFirstSchema, dataSourceSecondSchema))

// Without shard table we expect an exception
assertThrows<IllegalStateException> {
assertThrows<ButcherException.ExecutionException> {
MoveHelpers.readShards(nodes)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,23 +87,25 @@ class MigrationStateTest {
val n2 = NodeId("n2")
val n3 = NodeId("n3")
val shards = listOf(
migratingShard(50, n3),
migratingShard(40, n3),
migratingShard(30, n3),
migratingShard(10, n2),
migratingShard(20, n2),
)

val text = MigrationState(shards).compute().toString()

assertTrue(text.contains("3 shard(s) in migration"), text)
assertTrue(text.contains("-> n2"), text)
assertTrue(text.contains("-> n3"), text)
assertTrue(text.contains("5 shard(s) in migration"), text)
assertTrue(text.contains(" n2"), text)
assertTrue(text.contains(" n3"), text)
// Targets sorted by id (n2 before n3) and shards sorted ascending within each group.
val n2Pos = text.indexOf("-> n2")
val n3Pos = text.indexOf("-> n3")
val n2Pos = text.indexOf(" n2")
val n3Pos = text.indexOf(" n3")
assertTrue(n2Pos < n3Pos, "n2 group should come before n3 group:\n$text")
val shard10Pos = text.indexOf("shard 10")
val shard20Pos = text.indexOf("shard 20")
assertTrue(shard10Pos in 0 until shard20Pos, "shard 10 should appear before shard 20:\n$text")
// Check shards list
assertTrue(text.contains("shards: 10,20"), text)
assertTrue(text.contains("shards: 30,40,50"), text)
}

// ---------- helpers ----------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class OrphanTablesTest {
fun testToString_Clean() {
val result = OrphanTables(emptyMap()).compute()

assertEquals("Orphan tables: no orphan companion tables", result.toString())
assertEquals("Orphan tables: no orphan queue tables", result.toString())
}

@Test
Expand All @@ -176,14 +176,14 @@ class OrphanTablesTest {

val text = OrphanTables(tables).compute().toString()

assertTrue(text.contains("3 orphan companion(s) across 2 node(s)"), text)
assertTrue(text.contains("3 orphan queue(s) across 2 node(s)"), text)
assertTrue(text.contains("n1"), text)
assertTrue(text.contains("n2"), text)
assertTrue(text.contains("q_orders_dlq"), text)
assertTrue(text.contains("q_orders_arc"), text)
assertTrue(text.contains("q_payments_arc"), text)
assertTrue(text.contains("main q_orders missing"), text)
assertTrue(text.contains("main q_payments missing"), text)
assertTrue(text.contains("main queue q_orders missing"), text)
assertTrue(text.contains("main queue q_payments missing"), text)
// Nodes sorted by id: n1 before n2.
val n1Pos = text.indexOf("n1:")
val n2Pos = text.indexOf("n2:")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ class ShardBalanceTest {
val text = ShardBalance(shards, nodes.toSet()).compute().toString()

assertTrue(text.contains("Proposed moves"), text)
assertTrue(text.contains("n1 -> n2"), text)
assertTrue(text.contains(" n2"), text)
}

@Test
Expand Down