Skip to content
Merged

Docs #87

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![Maven Central](https://img.shields.io/maven-central/v/io.github.vgv/kolbasa)](https://central.sonatype.com/search?namespace=io.github.vgv)
[![Maven Central](https://img.shields.io/maven-central/v/io.github.vgv/kolbasa)](https://central.sonatype.com/artifact/io.github.vgv/kolbasa)
[![GitHub License](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](http://www.apache.org/licenses/LICENSE-2.0)

# kolbasa
Expand All @@ -12,7 +12,7 @@ Kolbasa is a small, efficient and capable Kotlin library to add PostgreSQL-based
* Message visibility timeout (delay before consumed but not deleted message will be visible to another consumers)
* Configurable amount of receive attempts
* Ability to receive messages filtered by one or more meta-fields (like `user_id=42 and event_type=PAGE_VIEW`)
* Ability to receive messages sorted by one or more meta-fields (like `custom_priority desc, created asc`)
* Ability to receive messages sorted by one or more meta-fields (like `priority desc, registration_date asc`)
* Supports working in "external" transaction context (send/receive messages from a queue will follow "external" transaction commit/rollback)
* Batch send/receive to improve performance
* Different modes to deal with sending failures (fail all messages in a batch, send all until first failure, send as many as possible)
Expand Down
4 changes: 1 addition & 3 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -135,9 +135,7 @@ tasks.register<Jar>("butcherJar") {
archiveClassifier.set("")
archiveVersion.set("")

// Reproducibility: same source → same output bytes. Useful for both
// CI cache hits and for the "same tag → same butcher.jar" guarantee
// we promised in Phase 1's failure-recovery section.
// Reproducibility: same source → same output bytes
isPreserveFileTimestamps = false
isReproducibleFileOrder = true

Expand Down
787 changes: 787 additions & 0 deletions docs/Architecture.md

Large diffs are not rendered by default.

24 changes: 10 additions & 14 deletions src/main/kotlin/kolbasa/consumer/sweep/SweepConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@ package kolbasa.consumer.sweep
import kolbasa.queue.Checks

data class SweepConfig(
/**
* Do we need to remove outdated records "in place", i.e. when consuming records?
* It is possible to completely disable "in place" sweep and do it at your own schedule
*/
val enabled: Boolean = true,

/**
* Max messages to delete during sweep
*/
Expand All @@ -19,8 +13,8 @@ data class SweepConfig(
* Every fifth consume? Every tenth? Every hundredth?
*
* Default value is `0.0001 (1 / 10_000)`, so, it means that every ten thousandth consume will trigger a sweep.
* If you want to trigger a sweep at every consume, you have to use `probability = 1.0f`, to disable automatic sweep
* completely and manage it manually use `probability = 0.0f`
* If you want to trigger a sweep at every consume, you have to use `probability = 1.0` (SWEEP_IS_ALWAYS_ON constant),
* to disable automatic sweep completely and manage it manually use `probability = 0.0` (SWEEP_IS_DISABLED constant)
*/
val probability: Double = DEFAULT_SWEEP_PROBABILITY,
) {
Expand All @@ -31,16 +25,14 @@ data class SweepConfig(
}

class Builder internal constructor() {
private var enabled: Boolean = true
private var maxMessages: Int = DEFAULT_SWEEP_MESSAGES
private var probability: Double = DEFAULT_SWEEP_PROBABILITY

fun enabled() = apply { this.enabled = true }
fun disabled() = apply { this.enabled = false }
fun disable() = apply { this.probability = SWEEP_IS_DISABLED }
fun maxMessages(maxMessages: Int) = apply { this.maxMessages = maxMessages }
fun probability(probability: Double) = apply { this.probability = probability }

fun build() = SweepConfig(enabled, maxMessages, probability)
fun build() = SweepConfig(maxMessages, probability)
}

companion object {
Expand All @@ -57,13 +49,17 @@ data class SweepConfig(
* Every fifth consume? Every tenth? Every hundredth?
*
* Default value is `0.0001 (1 / 10_000)`, so, it means that every ten thousandth consume will trigger a sweep.
* If you want to trigger a sweep at every consume, you have to use `probability = 1.0`, to disable automatic sweep
* completely and manage it manually use `probability = 0.0`
* If you want to trigger a sweep at every consume, you have to use `probability = 1.0` (SWEEP_IS_ALWAYS_ON constant),
* to disable automatic sweep completely and manage it manually use `probability = 0.0` (SWEEP_IS_DISABLED constant)
*/
const val MIN_SWEEP_PROBABILITY = 0.0
const val DEFAULT_SWEEP_PROBABILITY = 1.0 / 10_000
const val MAX_SWEEP_PROBABILITY = 1.0

// Nice mnemonic constants
const val SWEEP_IS_DISABLED = MIN_SWEEP_PROBABILITY
const val SWEEP_IS_ALWAYS_ON = MAX_SWEEP_PROBABILITY

@JvmStatic
fun builder(): Builder = Builder()
}
Expand Down
18 changes: 3 additions & 15 deletions src/main/kotlin/kolbasa/consumer/sweep/SweepHelper.kt
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,7 @@ import kotlin.math.max
object SweepHelper {

fun needSweep(): Boolean {
val sweepConfig = Kolbasa.sweepConfig

// Sweep is disabled at all, stop all other checks
if (!sweepConfig.enabled) {
return false
}

// Check
if (!checkProbability(sweepConfig.probability)) {
return false
}

return true
return checkProbability(Kolbasa.sweepConfig.probability)
}

/**
Expand All @@ -55,8 +43,8 @@ object SweepHelper {
}

internal fun checkProbability(probability: Double): Boolean = when (probability) {
0.0 -> false
1.0 -> true
SweepConfig.SWEEP_IS_DISABLED -> false
SweepConfig.SWEEP_IS_ALWAYS_ON -> true
else -> (ThreadLocalRandom.current().nextDouble() <= probability)
}

Expand Down
4 changes: 2 additions & 2 deletions src/main/kotlin/kolbasa/stats/sql/SqlDumpConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ data class SqlDumpConfig(
private var writer: Writer = Writer.nullWriter()
private var queues: MutableMap<String, EnumSet<StatementKind>> = mutableMapOf()

fun enabled() = apply { this.enabled = true }
fun disabled() = apply { this.enabled = false }
fun enable() = apply { this.enabled = true }
fun disable() = apply { this.enabled = false }
fun writer(writer: Writer) = apply { this.writer = writer }
fun queue(queue: Queue<*>, vararg kind: StatementKind) = apply {
queues[queue.name] = EnumSet.copyOf(kind.toList())
Expand Down
30 changes: 30 additions & 0 deletions src/test/kotlin/kolbasa/consumer/sweep/SweepConfigTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package kolbasa.consumer.sweep

import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.Test

class SweepConfigTest {

@Test
fun testConfigBuilder() {
val config = SweepConfig
.builder()
.probability(0.42)
.maxMessages(4200)
.build()

assertEquals(0.42, config.probability)
assertEquals(4200, config.maxMessages)
}

@Test
fun testConfigBuilder_Check_Disable() {
val config = SweepConfig
.builder()
.disable()
.build()

assertEquals(SweepConfig.SWEEP_IS_DISABLED, config.probability)
}

}