From fcab98f6d952ea7a778c03072f231bbf6f48df4e Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 4 Jan 2025 19:28:11 +0900 Subject: [PATCH 1/8] Add memcached dependency. --- build.gradle.kts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/build.gradle.kts b/build.gradle.kts index e36edde..6df6b25 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -32,6 +32,10 @@ dependencies { // JPA dependency implementation("org.springframework.boot:spring-boot-starter-data-jpa:${springVersion}") + // Memcached + // https://mvnrepository.com/artifact/com.googlecode.xmemcached/xmemcached + implementation("com.googlecode.xmemcached:xmemcached:2.4.8") + // https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui implementation("org.springdoc:springdoc-openapi-starter-webmvc-api:${springDocVersion}") // https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui From 4bccc66b492238a98ac3409f0d32e71fea7cb6eb Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sat, 4 Jan 2025 21:56:32 +0900 Subject: [PATCH 2/8] Work through adding memcached support. Implementing most methods the same way redis is implemented for now. Adding test class. --- .../configuration/QueueConfiguration.kt | 8 +- .../cache/memcached/MemcachedConfiguration.kt | 53 +++++ .../cache/redis/RedisConfiguration.kt | 18 +- .../cache/memcached/MemcachedMultiQueue.kt | 204 ++++++++++++++++++ .../queue/cache/redis/RedisMultiQueue.kt | 3 - .../settings/MessageQueueSettings.kt | 48 +++-- .../messagequeue/settings/StorageMedium.kt | 7 +- .../redis/RedisSentinelAuthenticatorTest.kt | 5 + .../redis/RedisStandAloneAuthenticatorTest.kt | 4 +- .../cache/redis/RedisConfigurationTest.kt | 13 +- .../memcached/MemcachedMultiQueueTest.kt | 87 ++++++++ .../cache/redis/RedisClusterMultiQueueTest.kt | 2 +- .../redis/RedisSentinelMultiQueueTest.kt | 2 +- .../redis/RedisStandAloneMultiQueueTest.kt | 4 +- .../rest/controller/SettingsControllerTest.kt | 5 +- .../MessageQueueSettingsDefaultTest.kt | 4 +- .../settings/MessageQueueSettingsTest.kt | 8 +- 17 files changed, 421 insertions(+), 54 deletions(-) create mode 100644 src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt create mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt create mode 100644 src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt index 39c0f74..a7cd77d 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt @@ -13,6 +13,7 @@ import au.kilemon.messagequeue.logging.Messages import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue import au.kilemon.messagequeue.queue.cache.CacheKeyManager +import au.kilemon.messagequeue.queue.cache.memcached.MemcachedMultiQueue import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue @@ -58,7 +59,7 @@ class QueueConfiguration : HasLogger var queue: MultiQueue = InMemoryMultiQueue() when (messageQueueSettings.storageMedium.uppercase()) { StorageMedium.REDIS.toString() -> { - queue = RedisMultiQueue(messageQueueSettings.redisPrefix) + queue = RedisMultiQueue(messageQueueSettings.cachePrefix) } StorageMedium.SQL.toString() -> { queue = SqlMultiQueue() @@ -66,6 +67,9 @@ class QueueConfiguration : HasLogger StorageMedium.MONGO.toString() -> { queue = MongoMultiQueue() } + StorageMedium.MEMCACHED.toString() -> { + queue = MemcachedMultiQueue(messageQueueSettings.cachePrefix) + } } LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium) @@ -115,7 +119,7 @@ class QueueConfiguration : HasLogger var authenticator: MultiQueueAuthenticator = InMemoryAuthenticator() when (messageQueueSettings.storageMedium.uppercase()) { StorageMedium.REDIS.toString() -> { - authenticator = RedisAuthenticator(messageQueueSettings.redisPrefix) + authenticator = RedisAuthenticator(messageQueueSettings.cachePrefix) } StorageMedium.SQL.toString() -> { authenticator = SqlAuthenticator() diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt new file mode 100644 index 0000000..092ba07 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt @@ -0,0 +1,53 @@ +package au.kilemon.messagequeue.configuration.cache.memcached + +import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration +import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.settings.MessageQueueSettings +import net.rubyeye.xmemcached.MemcachedClient +import net.rubyeye.xmemcached.XMemcachedClientBuilder +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.data.redis.connection.RedisConnectionFactory +import org.springframework.data.redis.connection.RedisSentinelConfiguration +import org.springframework.data.redis.connection.RedisStandaloneConfiguration + +/** + * A class that creates the required [Bean] objects when memcached is enabled. + * + * @author github.com/Kilemonn + */ +@Configuration +class MemcachedConfiguration: HasLogger +{ + override val LOG: Logger = this.initialiseLogger() + + companion object + { + const val MEMCACHED_DEFAULT_PORT: String = "11211" + } + + @Autowired + private lateinit var messageQueueSettings: MessageQueueSettings + + /** + * Create the [RedisConnectionFactory] based on the loaded configuration. + * If [MessageQueueSettings.redisUseSentinels] is `true` then multiple endpoints are expected in [MessageQueueSettings.cacheEndpoint] and will attempt to be parsed out + * and set into the [RedisSentinelConfiguration]. + * + * Otherwise, the first endpoint and port provided will be used to create a [RedisStandaloneConfiguration]. + * + * @return the created [RedisConnectionFactory] based on the configured [MessageQueueSettings] + */ + @Bean + @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED") + fun getMemcachedClient(): MemcachedClient + { + LOG.info("Initialising memcached configuration with the following configuration: Endpoints {}. With prefix {}.", + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) + val builder = XMemcachedClientBuilder(RedisConfiguration.stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, MEMCACHED_DEFAULT_PORT)) + return builder.build() + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt index 0d784e1..65e4972 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt @@ -116,7 +116,11 @@ class RedisConfiguration: HasLogger /** * Create the [RedisConnectionFactory] based on the loaded configuration. +<<<<<<< HEAD * If [MessageQueueSettings.redisMode] is `true` then multiple endpoints are expected in [MessageQueueSettings.redisEndpoint] and will attempt to be parsed out +======= + * If [MessageQueueSettings.redisUseSentinels] is `true` then multiple endpoints are expected in [MessageQueueSettings.cacheEndpoint] and will attempt to be parsed out +>>>>>>> 36e6084 (Work through adding memcached support. Implementing most methods the same way redis is implemented for now.) * and set into the [RedisSentinelConfiguration]. * * Otherwise, the first endpoint and port provided will be used to create a [RedisStandaloneConfiguration]. @@ -148,10 +152,11 @@ class RedisConfiguration: HasLogger fun getSentinelConfiguration(): RedisSentinelConfiguration { LOG.info("Initialising redis sentinel configuration with the following configuration: Endpoints {}, master {}. With prefix {}.", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.cachePrefix) + val redisSentinelConfiguration = RedisSentinelConfiguration() redisSentinelConfiguration.master(messageQueueSettings.redisMasterName) - val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_SENTINEL_DEFAULT_PORT) + val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_SENTINEL_DEFAULT_PORT) if (sentinelEndpoints.isEmpty()) { @@ -173,9 +178,10 @@ class RedisConfiguration: HasLogger fun getStandAloneConfiguration(): RedisStandaloneConfiguration { LOG.info("Initialising redis standalone configuration with the following configuration: Endpoint [{}], prefix [{}].", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) + val redisConfiguration = RedisStandaloneConfiguration() - val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_DEFAULT_PORT) + val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_DEFAULT_PORT) if (redisEndpoints.isEmpty()) { LOG.error("No redis endpoints defined for standalone configuration. Unable to initialise redis configuration.") @@ -194,10 +200,10 @@ class RedisConfiguration: HasLogger fun getClusterConfiguration(): RedisClusterConfiguration { LOG.info("Initialising redis cluster configuration with the following configuration: Endpoint [{}], prefix [{}].", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) val configuration = RedisClusterConfiguration() - val nodes = endpointToNodes(messageQueueSettings.redisEndpoint) + val nodes = endpointToNodes(messageQueueSettings.cacheEndpoint) configuration.setClusterNodes(nodes) return configuration diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt new file mode 100644 index 0000000..e26456a --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt @@ -0,0 +1,204 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.message.QueueMessage +import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.exception.MessageUpdateException +import au.kilemon.messagequeue.settings.MessageQueueSettings +import net.rubyeye.xmemcached.MemcachedClient +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import java.util.HashSet +import java.util.Optional +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue + +/** + * A `Memcached` specific implementation of the [MultiQueue]. + * All messages stored and accessed directly from the `Memcached` instance. + * This increases overhead when checking UUID, but it is required incase the cache is edited manually, or by another message managing instance. + * + * @author github.com/Kilemonn + */ +class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLogger +{ + override val LOG: Logger = this.initialiseLogger() + + @Autowired + private lateinit var client: MemcachedClient + + /** + * Append the [MessageQueueSettings.cachePrefix] to the provided [subQueue] [String]. + * + * @param subQueue the [String] to add the prefix to + * @return a [String] with the provided [subQueue] with the [MessageQueueSettings.cachePrefix] appended to the beginning. + */ + private fun appendPrefix(subQueue: String): String + { + if (hasPrefix() && !subQueue.startsWith(getPrefix())) + { + return "${getPrefix()}$subQueue" + } + return subQueue + } + + /** + * @return whether the [prefix] is [String.isNotBlank] + */ + internal fun hasPrefix(): Boolean + { + return getPrefix().isNotBlank() + } + + /** + * @return [prefix] + */ + internal fun getPrefix(): String + { + return prefix + } + + override fun getNextSubQueueIndex(subQueue: String): Optional + { + val queue = getSubQueue(appendPrefix(subQueue)) + return if (queue.isNotEmpty()) + { + var lastIndex = queue.last().id + if (lastIndex == null) + { + LOG.warn("subQueue [{}] is not empty but last index is null. Returning index with value [{}].", subQueue, 1) + return Optional.of(1) + } + else + { + lastIndex++ + LOG.trace("Incrementing and returning index for subQueue [{}]. Returning index with value [{}].", subQueue, lastIndex) + return Optional.of(lastIndex) + } + } + else + { + LOG.trace("subQueue [{}] is empty, returning index with value [{}].", subQueue, 1) + Optional.of(1) + } + } + + override fun persistMessageInternal(message: QueueMessage) + { + val queue = getSubQueue(message.subQueue) + val matchingMessage = queue.stream().filter{ element -> element.uuid == message.uuid }.findFirst() + if (matchingMessage.isPresent) + { + message.id = matchingMessage.get().id + val wasRemoved = removeInternal(matchingMessage.get()) + val wasReAdded = addInternal(message) + if (wasRemoved && wasReAdded) + { + return + } + } + throw MessageUpdateException(message.uuid) + } + + override fun getSubQueueInternal(subQueue: String): Queue + { + val queue = ConcurrentLinkedQueue() + var set: Set? = client.get?>(appendPrefix(subQueue)) + if (set == null) + { + set = HashSet() + client.set(appendPrefix(subQueue), 0, set) + } + + if (set.isNotEmpty()) + { + queue.addAll(set.toSortedSet { message1, message2 -> (message1.id ?: 0).minus(message2.id ?: 0).toInt() }) + } + return queue + } + + override fun performHealthCheckInternal() + { + client.get("") + } + + override fun getMessageByUUID(uuid: String): Optional + { + val subQueue = containsUUID(uuid) + if (subQueue.isPresent) + { + LOG.trace("Found message with uuid [{}].", uuid) + val queue: Queue = getSubQueue(subQueue.get()) + return queue.stream().filter { message -> message.uuid == uuid }.findFirst() + } + LOG.trace("No message found with uuid [{}].", uuid) + return Optional.empty() + } + + override fun clearSubQueueInternal(subQueue: String): Int + { + var amountRemoved = 0 + val queue = getSubQueue(subQueue) + if (queue.isNotEmpty()) + { + amountRemoved = queue.size + client.delete(appendPrefix(subQueue)) + LOG.debug("Cleared existing sub-queue [{}]. Removed [{}] message entries.", subQueue, amountRemoved) + } + else + { + LOG.debug("Attempting to clear non-existent sub-queue [{}]. No messages cleared.", subQueue) + } + return amountRemoved + } + + override fun isEmptySubQueue(subQueue: String): Boolean + { + return getSubQueue(subQueue).isEmpty() + } + + override fun pollInternal(subQueue: String): Optional + { + val queue = getSubQueue(subQueue) + if (queue.isNotEmpty()) + { + return Optional.of(queue.iterator().next()) + } + return Optional.empty() + } + + override fun keysInternal(includeEmpty: Boolean): HashSet + { + TODO("Not yet implemented") + } + + override fun containsUUID(uuid: String): Optional + { + for (key in keys()) + { + val queue = getSubQueue(key) + val anyMatchTheUUID = queue.stream().anyMatch{ message -> uuid == message.uuid } + if (anyMatchTheUUID) + { + LOG.debug("Found sub-queue [{}] for message UUID: [{}].", key, uuid) + return Optional.of(key) + } + } + LOG.debug("No sub-queue contains message with UUID: [{}].", uuid) + return Optional.empty() + } + + override fun addInternal(element: QueueMessage): Boolean + { + val queue: Queue = getSubQueue(element.subQueue) + val wasAdded = queue.add(element) + return wasAdded && client.set(appendPrefix(element.subQueue), 0, queue) + } + + override fun removeInternal(element: QueueMessage): Boolean + { + val queue: Queue = getSubQueue(element.subQueue) + val wasRemoved = queue.remove(element) + return wasRemoved && client.set(appendPrefix(element.subQueue), 0, queue) + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt index 2f0f51d..1516cef 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt @@ -43,9 +43,6 @@ class RedisMultiQueue(private val prefix: String) : MultiQueue(), HasLogger, Cac return prefix } - /** - * Attempts to append the prefix before requesting the underlying redis entry if the provided [subQueue] is not prefixed with [MessageQueueSettings.redisPrefix]. - */ override fun getSubQueueInternal(subQueue: String): Queue { val queue = ConcurrentLinkedQueue() diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt index cfb52b7..383230b 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt @@ -36,12 +36,17 @@ class MessageQueueSettings * Start redis related properties */ private const val REDIS: String = "redis" - const val REDIS_PREFIX: String = "$MESSAGE_QUEUE.$REDIS.prefix" - const val REDIS_ENDPOINT: String = "$MESSAGE_QUEUE.$REDIS.endpoint" + const val REDIS_ENDPOINT_DEFAULT: String = "127.0.0.1" const val REDIS_USERNAME: String = "$MESSAGE_QUEUE.$REDIS.username" const val REDIS_PASSWORD: String = "$MESSAGE_QUEUE.$REDIS.password" + // Used by memcached + const val CACHE_PREFIX: String = "$MESSAGE_QUEUE.cache.prefix" + // Used by memcached + const val CACHE_ENDPOINT: String = "$MESSAGE_QUEUE.cache.endpoint" + const val CACHE_ENDPOINT_DEFAULT: String = "127.0.0.1" + // Redis sentinel related properties const val REDIS_MODE: String = "$MESSAGE_QUEUE.$REDIS.mode" const val REDIS_MODE_DEFAULT: String = "STANDALONE" @@ -120,38 +125,37 @@ class MessageQueueSettings /** - * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. - * Uses the [REDIS_PREFIX] to set a prefix used for all redis entry keys. + * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.MEMCACHED]. + * Uses the [CACHE_PREFIX] to set a prefix used for all cache entry keys. * - * E.g. if the initial value for the redis entry is "my-key" and no prefix is defined the entries would be stored under "my-key". + * E.g. if the initial value for the cache entry is "my-key" and no prefix is defined the entries would be stored under "my-key". * Using the same scenario if the prefix is "prefix" then the resultant key would be "prefixmy-key". */ - @Schema(title = "Redis Prefix", example = "my-prefix-", - description = "Used to remove/reduce the likelihood of any collisions if this is being used in an existing redis instance. " + - "The prefix will be added to all entries made in the redis storage medium.") - @SerializedName(REDIS_PREFIX) - @JsonProperty(REDIS_PREFIX) - @Value("\${$REDIS_PREFIX:}") + @Schema(title = "Cache Prefix", example = "my-prefix-", + description = "Used to remove/reduce the likelihood of any collisions if this is being used in an existing cache instance. " + + "The prefix will be added to all entries made in the cache storage medium.") + @SerializedName(CACHE_PREFIX) + @JsonProperty(CACHE_PREFIX) + @Value("\${$CACHE_PREFIX:}") @get:Generated @set:Generated - lateinit var redisPrefix: String + lateinit var cachePrefix: String /** - * `Required` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. - * The input endpoint string which is used for both standalone and the sentinel redis configurations. - * This supports a comma separated list or single definition of a redis endpoint in the following formats: + * `Required` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS] OR [StorageMedium.MEMCACHED]. + * This supports a comma separated list or single definition of a endpoints in the following formats: * `:,:,` * - * If not provided [REDIS_ENDPOINT_DEFAULT] will be used by default. + * If not provided [CACHE_ENDPOINT_DEFAULT] will be used by default. */ - @Schema(title = "Redis Endpoint", example = "sentinel1.com:5545,sentinel2.org:9980", - description = "The endpoint string which is used for both standalone and the sentinel redis configurations.") - @SerializedName(REDIS_ENDPOINT) - @JsonProperty(REDIS_ENDPOINT) - @Value("\${$REDIS_ENDPOINT:$REDIS_ENDPOINT_DEFAULT}") + @Schema(title = "Cache Endpoint", example = "sentinel1.com:5545,sentinel2.org:9980", + description = "The endpoint string which can contain multiple comma separated endpoints and ports.") + @SerializedName(CACHE_ENDPOINT) + @JsonProperty(CACHE_ENDPOINT) + @Value("\${$CACHE_ENDPOINT:$CACHE_ENDPOINT_DEFAULT}") @get:Generated @set:Generated - lateinit var redisEndpoint: String + lateinit var cacheEndpoint: String /** * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt index db97125..97b6385 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt @@ -26,5 +26,10 @@ enum class StorageMedium /** * Initialise and connect to the defined `mongo` store. */ - MONGO; + MONGO, + + /** + * Will connect to the defined memcache instance to store queue messages. + */ + MEMCACHED; } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt index 8126204..15fe8d3 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt @@ -83,8 +83,13 @@ class RedisSentinelAuthenticatorTest: MultiQueueAuthenticatorTest() sentinel.start() TestPropertyValues.of( +<<<<<<< HEAD "${MessageQueueSettings.REDIS_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" +======= + "${MessageQueueSettings.CACHE_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", + "${MessageQueueSettings.REDIS_USE_SENTINELS}=true" +>>>>>>> 36e6084 (Work through adding memcached support. Implementing most methods the same way redis is implemented for now.) ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt index fe77f31..6d34d68 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt @@ -27,7 +27,7 @@ import org.testcontainers.utility.DockerImageName * @author github.com/Kilemonn */ @ExtendWith(SpringExtension::class) -@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.REDIS_PREFIX}=test"]) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.CACHE_PREFIX}=test"]) @Testcontainers @ContextConfiguration(initializers = [RedisStandAloneAuthenticatorTest.Initializer::class]) @Import(*[QueueConfiguration::class, LoggingConfiguration::class, RedisConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) @@ -70,7 +70,7 @@ class RedisStandAloneAuthenticatorTest: MultiQueueAuthenticatorTest() redis.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" + "${MessageQueueSettings.CACHE_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt b/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt index 867d662..2bd3849 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt @@ -134,8 +134,8 @@ class RedisConfigurationTest fun testGetConnectionFactory_sentinelWithNoEndpoints() { messageQueueSettings.redisMode = RedisMode.SENTINEL.name - messageQueueSettings.redisEndpoint = "" - Assertions.assertEquals("", messageQueueSettings.redisEndpoint) + messageQueueSettings.cacheEndpoint = "" + Assertions.assertEquals("", messageQueueSettings.cacheEndpoint) Assertions.assertThrows(RedisInitialisationException::class.java) { redisConfiguration.getSentinelConfiguration() } @@ -148,8 +148,8 @@ class RedisConfigurationTest fun testGetConnectionFactory_standAloneWithNoEndpoints() { messageQueueSettings.redisMode = RedisMode.STANDALONE.name - messageQueueSettings.redisEndpoint = "" - Assertions.assertEquals("", messageQueueSettings.redisEndpoint) + messageQueueSettings.cacheEndpoint = "" + Assertions.assertEquals("", messageQueueSettings.cacheEndpoint) Assertions.assertThrows(RedisInitialisationException::class.java) { redisConfiguration.getStandAloneConfiguration() } @@ -167,8 +167,9 @@ class RedisConfigurationTest val endpoint1 = "$endpoint1Host:$endpoint1Port" val endpoint2 = "redis:6789" val endpoints = "$endpoint1,$endpoint2" - messageQueueSettings.redisEndpoint = endpoints - Assertions.assertEquals(endpoints, messageQueueSettings.redisEndpoint) + + messageQueueSettings.cacheEndpoint = endpoints + Assertions.assertEquals(endpoints, messageQueueSettings.cacheEndpoint) val standAloneConfiguration = redisConfiguration.getStandAloneConfiguration() Assertions.assertEquals(endpoint1Host, standAloneConfiguration.hostName) Assertions.assertEquals(endpoint1Port.toInt(), standAloneConfiguration.port) diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt new file mode 100644 index 0000000..6ca17ed --- /dev/null +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt @@ -0,0 +1,87 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.configuration.QueueConfiguration +import au.kilemon.messagequeue.configuration.cache.memcached.MemcachedConfiguration +import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration +import au.kilemon.messagequeue.logging.LoggingConfiguration +import au.kilemon.messagequeue.queue.MultiQueueTest +import au.kilemon.messagequeue.queue.cache.redis.RedisStandAloneMultiQueueTest +import au.kilemon.messagequeue.settings.MessageQueueSettings +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.ApplicationContextInitializer +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Import +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.containers.GenericContainer +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName + +/** + * A test class for the [MemcachedMultiQueue] `Component` class. + * + * @author github.com/Kilemonn + */ +@ExtendWith(SpringExtension::class) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED", "${MessageQueueSettings.CACHE_PREFIX}=test"]) +@Testcontainers +@ContextConfiguration(initializers = [MemcachedMultiQueueTest.Initializer::class]) +@Import(*[QueueConfiguration::class, LoggingConfiguration::class, MemcachedConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) +class MemcachedMultiQueueTest: MultiQueueTest() +{ + companion object + { + private const val MEMCACHED_PORT: Int = 11211 + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.34-alpine3.21" + + lateinit var memcache: GenericContainer<*> + + /** + * Stop the container at the end of all the tests. + */ + @AfterAll + @JvmStatic + fun afterClass() + { + memcache.stop() + } + } + + /** + * The test initialiser for [MemcachedMultiQueueTest] to initialise the container and test properties. + * + * @author github.com/Kilemonn + */ + internal class Initializer : ApplicationContextInitializer + { + /** + * Force start the container, so we can place its host and dynamic ports into the system properties. + * + * Set the environment variables before any of the beans are initialised. + */ + override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) + { + memcache = GenericContainer(DockerImageName.parse(MEMCACHED_CONTAINER)) + .withExposedPorts(MEMCACHED_PORT).withReuse(false) + memcache.start() + + TestPropertyValues.of( + "${MessageQueueSettings.CACHE_ENDPOINT}=${memcache.host}:${memcache.getMappedPort(MEMCACHED_PORT)}" + ).applyTo(configurableApplicationContext.environment) + } + } + + /** + * Check the container is running before each test. + */ + @BeforeEach + fun beforeEach() + { + Assertions.assertTrue(memcache.isRunning) + } +} diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt index 352b155..1985340 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt @@ -134,7 +134,7 @@ class RedisClusterMultiQueueTest: CacheMultiQueueTest() val endpoints = StringBuilder() redisInstances.forEach { endpoints.append("$hostIp:${it.getMappedPort(RedisConfiguration.REDIS_DEFAULT_PORT.toInt())},") } TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${endpoints}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${endpoints}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.CLUSTER.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt index f29a0d1..1d708c8 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt @@ -88,7 +88,7 @@ class RedisSentinelMultiQueueTest: CacheMultiQueueTest() sentinel.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt index 90ac148..b71af48 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt @@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName * @author github.com/Kilemonn */ @ExtendWith(SpringExtension::class) -@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.REDIS_PREFIX}=test"]) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.CACHE_PREFIX}=test"]) @Testcontainers @ContextConfiguration(initializers = [RedisStandAloneMultiQueueTest.Initializer::class]) @Import(*[QueueConfiguration::class, LoggingConfiguration::class, RedisConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) @@ -85,7 +85,7 @@ class RedisStandAloneMultiQueueTest: CacheMultiQueueTest() redis.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" + "${MessageQueueSettings.CACHE_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt index 3501137..f36e85e 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt @@ -78,9 +78,10 @@ class SettingsControllerTest Assertions.assertEquals(StorageMedium.IN_MEMORY.toString(), settings.storageMedium) Assertions.assertEquals(RestrictionMode.NONE.toString(), settings.restrictionMode) - Assertions.assertTrue(settings.redisPrefix.isEmpty()) - Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, settings.redisEndpoint) + Assertions.assertTrue(settings.cachePrefix.isEmpty()) + Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, settings.cacheEndpoint) Assertions.assertEquals(RedisMode.STANDALONE.name, settings.redisMode) + Assertions.assertEquals(MessageQueueSettings.REDIS_MASTER_NAME_DEFAULT, settings.redisMasterName) Assertions.assertTrue(settings.sqlEndpoint.isEmpty()) diff --git a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt index c382f56..244ff6e 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt @@ -46,8 +46,8 @@ class MessageQueueSettingsDefaultTest Assertions.assertEquals(MessageQueueSettings.STORAGE_MEDIUM_DEFAULT, messageQueueSettings.storageMedium) Assertions.assertEquals(MessageQueueSettings.RESTRICTION_MODE_DEFAULT, messageQueueSettings.restrictionMode) - Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, messageQueueSettings.redisEndpoint) - Assertions.assertEquals("", messageQueueSettings.redisPrefix) + Assertions.assertEquals(MessageQueueSettings.CACHE_ENDPOINT_DEFAULT, messageQueueSettings.cacheEndpoint) + Assertions.assertEquals("", messageQueueSettings.cachePrefix) Assertions.assertEquals(MessageQueueSettings.REDIS_MASTER_NAME_DEFAULT, messageQueueSettings.redisMasterName) Assertions.assertEquals(RedisMode.STANDALONE.name, messageQueueSettings.redisMode) diff --git a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt index a73af64..b7d210f 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt @@ -18,8 +18,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension @ExtendWith(SpringExtension::class) @TestPropertySource(properties = [ "${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", - "${MessageQueueSettings.REDIS_ENDPOINT}=123.123.123.123", - "${MessageQueueSettings.REDIS_PREFIX}=redis", + "${MessageQueueSettings.CACHE_ENDPOINT}=123.123.123.123", + "${MessageQueueSettings.CACHE_PREFIX}=redis", "${MessageQueueSettings.REDIS_MODE}=SENTINEL", "${MessageQueueSettings.REDIS_MASTER_NAME}=master" ]) @@ -52,8 +52,8 @@ class MessageQueueSettingsTest { Assertions.assertNotNull(messageQueueSettings) Assertions.assertEquals("REDIS", messageQueueSettings.storageMedium) - Assertions.assertEquals("123.123.123.123", messageQueueSettings.redisEndpoint) - Assertions.assertEquals("redis", messageQueueSettings.redisPrefix) + Assertions.assertEquals("123.123.123.123", messageQueueSettings.cacheEndpoint) + Assertions.assertEquals("redis", messageQueueSettings.cachePrefix) Assertions.assertEquals("master", messageQueueSettings.redisMasterName) Assertions.assertEquals("SENTINEL", messageQueueSettings.redisMode) } From 2ec4d5d433df8ed19de4288c42cba7de52392ae9 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sun, 5 Jan 2025 11:06:10 +0900 Subject: [PATCH 3/8] Implement MemcachedAuthenticator and tests. --- .../cache/memcached/MemcachedAuthenticator.kt | 80 +++++++++++++++++ .../configuration/QueueConfiguration.kt | 4 + .../memcached/MemcachedAuthenticatorTest.kt | 88 +++++++++++++++++++ 3 files changed, 172 insertions(+) create mode 100644 src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt create mode 100644 src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt diff --git a/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt b/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt new file mode 100644 index 0000000..0ea717e --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt @@ -0,0 +1,80 @@ +package au.kilemon.messagequeue.authentication.authenticator.cache.memcached + +import au.kilemon.messagequeue.authentication.AuthenticationMatrix +import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator +import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator.Companion.RESTRICTED_KEY +import net.rubyeye.xmemcached.MemcachedClient +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import java.util.stream.Collectors + +/** + * A [MultiQueueAuthenticator] implementation using Memcached as the storage mechanism for the restricted + * sub-queue identifiers. + * + * @author github.com/Kilemonn + */ +class MemcachedAuthenticator: MultiQueueAuthenticator() +{ + override val LOG: Logger = this.initialiseLogger() + + @Autowired + private lateinit var client: MemcachedClient + + override fun isRestrictedInternal(subQueue: String): Boolean + { + return getMembersSet().contains(AuthenticationMatrix(subQueue)) + } + + override fun addRestrictedEntryInternal(subQueue: String) + { + val restricted = getMembersSet() + val added = restricted.add(AuthenticationMatrix(subQueue)) + if (added) + { + client.set(RESTRICTED_KEY, 0, restricted) + } + } + + override fun removeRestrictionInternal(subQueue: String): Boolean + { + val restricted = getMembersSet() + val removed = restricted.remove(AuthenticationMatrix(subQueue)) + if (removed) + { + client.set(RESTRICTED_KEY, 0, restricted) + } + return removed + } + + /** + * Overriding to completely remove all access to the [RESTRICTED_KEY]. + * Only if [isInNoneMode] returns `false`. + */ + override fun getReservedSubQueues(): Set + { + if (!isInNoneMode()) + { + return setOf(RESTRICTED_KEY) + } + return setOf() + } + + override fun getRestrictedSubQueueIdentifiers(): Set + { + return getMembersSet().stream().map { authMatrix -> authMatrix.subQueue }.collect(Collectors.toSet()) + } + + private fun getMembersSet(): HashSet + { + return client.get>(RESTRICTED_KEY) ?: HashSet() + } + + override fun clearRestrictedSubQueues(): Long + { + val members = getMembersSet() + val existingMembersSize = members.size.toLong() + client.delete(RESTRICTED_KEY) + return existingMembersSize + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt index a7cd77d..b3a3bf3 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt @@ -3,6 +3,7 @@ package au.kilemon.messagequeue.configuration import au.kilemon.messagequeue.MessageQueueApplication import au.kilemon.messagequeue.authentication.RestrictionMode import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator +import au.kilemon.messagequeue.authentication.authenticator.cache.memcached.MemcachedAuthenticator import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator import au.kilemon.messagequeue.authentication.authenticator.inmemory.InMemoryAuthenticator import au.kilemon.messagequeue.authentication.authenticator.nosql.mongo.MongoAuthenticator @@ -127,6 +128,9 @@ class QueueConfiguration : HasLogger StorageMedium.MONGO.toString() -> { authenticator = MongoAuthenticator() } + StorageMedium.MEMCACHED.toString() -> { + authenticator = MemcachedAuthenticator() + } } LOG.info("Initialising [{}] authenticator as the [{}] is set to [{}].", authenticator::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium) diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt new file mode 100644 index 0000000..b3f555a --- /dev/null +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt @@ -0,0 +1,88 @@ +package au.kilemon.messagequeue.authentication.authenticator.cache.memcached + +import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticatorTest +import au.kilemon.messagequeue.configuration.QueueConfiguration +import au.kilemon.messagequeue.configuration.cache.memcached.MemcachedConfiguration +import au.kilemon.messagequeue.logging.LoggingConfiguration +import au.kilemon.messagequeue.queue.MultiQueueTest +import au.kilemon.messagequeue.settings.MessageQueueSettings +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.ApplicationContextInitializer +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Import +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.containers.GenericContainer +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName + + +/** + * A test class for [MemcachedAuthenticator]. + * + * @author github.com/Kilemonn + */ +@ExtendWith(SpringExtension::class) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED"]) +@Testcontainers +@ContextConfiguration(initializers = [MemcachedAuthenticatorTest.Initializer::class]) +@Import(*[LoggingConfiguration::class, MemcachedConfiguration::class, QueueConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) +class MemcachedAuthenticatorTest: MultiQueueAuthenticatorTest() +{ + companion object + { + private const val MEMCACHED_PORT: Int = 11211 + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.34-alpine3.21" + + lateinit var memcache: GenericContainer<*> + + /** + * Stop the container at the end of all the tests. + */ + @AfterAll + @JvmStatic + fun afterClass() + { + memcache.stop() + } + } + + /** + * The test initialiser for [MemcachedAuthenticatorTest] to initialise the container and test properties. + * + * @author github.com/Kilemonn + */ + internal class Initializer : ApplicationContextInitializer + { + /** + * Force start the container, so we can place its host and dynamic ports into the system properties. + * + * Set the environment variables before any of the beans are initialised. + */ + override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) + { + memcache = GenericContainer(DockerImageName.parse(MEMCACHED_CONTAINER)) + .withExposedPorts(MEMCACHED_PORT).withReuse(false) + memcache.start() + + TestPropertyValues.of( + "${MessageQueueSettings.CACHE_ENDPOINT}=${memcache.host}:${memcache.getMappedPort(MEMCACHED_PORT)}" + ).applyTo(configurableApplicationContext.environment) + } + } + + /** + * Check the container is running before each test. + */ + @BeforeEach + fun beforeEach() + { + Assertions.assertTrue(memcache.isRunning) + multiQueueAuthenticator.clearRestrictedSubQueues() + } +} From 3a222d76d22362a8831695061359c2350c16928b Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Sun, 5 Jan 2025 12:20:53 +0900 Subject: [PATCH 4/8] Fix casting error in memcached multi queue. --- .../cache/memcached/MemcachedMultiQueue.kt | 18 +++++++----------- 1 file changed, 7 insertions(+), 11 deletions(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt index e26456a..e794d1c 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt @@ -102,24 +102,19 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog override fun getSubQueueInternal(subQueue: String): Queue { - val queue = ConcurrentLinkedQueue() - var set: Set? = client.get?>(appendPrefix(subQueue)) - if (set == null) + var queue: Queue? = client.get?>(appendPrefix(subQueue)) + if (queue == null) { - set = HashSet() - client.set(appendPrefix(subQueue), 0, set) + queue = ConcurrentLinkedQueue() + client.set(appendPrefix(subQueue), 0, queue) } - if (set.isNotEmpty()) - { - queue.addAll(set.toSortedSet { message1, message2 -> (message1.id ?: 0).minus(message2.id ?: 0).toInt() }) - } return queue } override fun performHealthCheckInternal() { - client.get("") + client.get("health-check-key") } override fun getMessageByUUID(uuid: String): Optional @@ -169,7 +164,8 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog override fun keysInternal(includeEmpty: Boolean): HashSet { - TODO("Not yet implemented") + // TODO + return HashSet() } override fun containsUUID(uuid: String): Optional From 7cb9242ea0f60de74f40ddbb8c3f0550dbccb739 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Tue, 19 May 2026 00:39:12 +0900 Subject: [PATCH 5/8] Cherry pick and fix memcached support. --- .../cache/memcached/MemcachedConfiguration.kt | 19 ++-- .../queue/cache/CacheKeyManager.kt | 7 +- .../memcached/MemcachedCacheKeyManager.kt | 59 ++++++++++++ .../cache/memcached/MemcachedMultiQueue.kt | 93 ++++++++----------- .../queue/cache/redis/RedisCacheKeyManager.kt | 10 +- .../queue/cache/redis/RedisMultiQueue.kt | 2 +- .../redis/RedisClusterAuthenticatorTest.kt | 2 +- .../redis/RedisSentinelAuthenticatorTest.kt | 7 +- .../messagequeue/queue/MultiQueueTest.kt | 2 +- .../memcached/MemcachedMultiQueueTest.kt | 9 +- 10 files changed, 129 insertions(+), 81 deletions(-) create mode 100644 src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt index 092ba07..50d7188 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt @@ -2,6 +2,7 @@ package au.kilemon.messagequeue.configuration.cache.memcached import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.queue.cache.memcached.MemcachedCacheKeyManager import au.kilemon.messagequeue.settings.MessageQueueSettings import net.rubyeye.xmemcached.MemcachedClient import net.rubyeye.xmemcached.XMemcachedClientBuilder @@ -33,21 +34,23 @@ class MemcachedConfiguration: HasLogger private lateinit var messageQueueSettings: MessageQueueSettings /** - * Create the [RedisConnectionFactory] based on the loaded configuration. - * If [MessageQueueSettings.redisUseSentinels] is `true` then multiple endpoints are expected in [MessageQueueSettings.cacheEndpoint] and will attempt to be parsed out - * and set into the [RedisSentinelConfiguration]. - * - * Otherwise, the first endpoint and port provided will be used to create a [RedisStandaloneConfiguration]. - * - * @return the created [RedisConnectionFactory] based on the configured [MessageQueueSettings] + * Create the [MemcachedClient] based on the loaded configuration. */ @Bean @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED") fun getMemcachedClient(): MemcachedClient { - LOG.info("Initialising memcached configuration with the following configuration: Endpoints {}. With prefix {}.", + LOG.info("Initialising memcached configuration with the following configuration: Endpoint(s) [{}]. With prefix [{}].", messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) + val builder = XMemcachedClientBuilder(RedisConfiguration.stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, MEMCACHED_DEFAULT_PORT)) return builder.build() } + + @Bean + @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED") + fun getMemcachedCacheKeyManager(): MemcachedCacheKeyManager + { + return MemcachedCacheKeyManager() + } } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt index dcecbda..eac553e 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt @@ -13,9 +13,14 @@ abstract class CacheKeyManager(protected val prefix: String = "") const val CACHE_KEYS_KEY: String = "messagequeue-cache-keys" } + protected fun getReservedKey(): String + { + return "$prefix$CACHE_KEYS_KEY" + } + fun getReservedKeys(): Set { - return setOf("$prefix$CACHE_KEYS_KEY") + return setOf(getReservedKey()) } abstract fun add(key: String) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt new file mode 100644 index 0000000..c4023ac --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt @@ -0,0 +1,59 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.queue.cache.CacheKeyManager +import net.rubyeye.xmemcached.MemcachedClient +import org.springframework.beans.factory.annotation.Autowired + +/** + * To optimise how we determine the key list/sub queue list when using a cache, this class is used to store and + * manage the subqueue list for the [MemcachedMultiQueue]. + * + * @author github.com/Kilemonn + */ +class MemcachedCacheKeyManager: CacheKeyManager() +{ + @Autowired + private lateinit var client: MemcachedClient + + private fun persistUpdatedKeysSet(keys: HashSet) + { + client.set(getReservedKey(), 0, keys) + } + + override fun add(key: String) + { + val keys = getKeys() + keys.add(key) + persistUpdatedKeysSet(keys) + } + + override fun remove(key: String) + { + val keys = getKeys() + if (keys.remove(key)) + { + persistUpdatedKeysSet(keys) + } + } + + override fun getKeys(): HashSet + { + var keys: HashSet? = client.get(getReservedKey()) + if (keys == null) + { + keys = HashSet() + } + return keys + } + + override fun contains(key: String): Boolean + { + return getKeys().contains(key) + } + + override fun clear() + { + persistUpdatedKeysSet(HashSet()) + } + +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt index e794d1c..2e40f72 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt @@ -3,15 +3,16 @@ package au.kilemon.messagequeue.queue.cache.memcached import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.cache.CacheMultiQueue +import au.kilemon.messagequeue.queue.exception.IllegalSubQueueIdentifierException import au.kilemon.messagequeue.queue.exception.MessageUpdateException -import au.kilemon.messagequeue.settings.MessageQueueSettings import net.rubyeye.xmemcached.MemcachedClient import org.slf4j.Logger import org.springframework.beans.factory.annotation.Autowired -import java.util.HashSet import java.util.Optional import java.util.Queue import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.collections.HashSet /** * A `Memcached` specific implementation of the [MultiQueue]. @@ -20,76 +21,30 @@ import java.util.concurrent.ConcurrentLinkedQueue * * @author github.com/Kilemonn */ -class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLogger +class MemcachedMultiQueue(private val prefix: String): MultiQueue(), HasLogger, CacheMultiQueue { override val LOG: Logger = this.initialiseLogger() @Autowired private lateinit var client: MemcachedClient - /** - * Append the [MessageQueueSettings.cachePrefix] to the provided [subQueue] [String]. - * - * @param subQueue the [String] to add the prefix to - * @return a [String] with the provided [subQueue] with the [MessageQueueSettings.cachePrefix] appended to the beginning. - */ - private fun appendPrefix(subQueue: String): String - { - if (hasPrefix() && !subQueue.startsWith(getPrefix())) - { - return "${getPrefix()}$subQueue" - } - return subQueue - } - - /** - * @return whether the [prefix] is [String.isNotBlank] - */ - internal fun hasPrefix(): Boolean - { - return getPrefix().isNotBlank() - } + @Autowired + private lateinit var cacheKeyManager: MemcachedCacheKeyManager /** * @return [prefix] */ - internal fun getPrefix(): String + override fun getPrefix(): String { return prefix } - override fun getNextSubQueueIndex(subQueue: String): Optional - { - val queue = getSubQueue(appendPrefix(subQueue)) - return if (queue.isNotEmpty()) - { - var lastIndex = queue.last().id - if (lastIndex == null) - { - LOG.warn("subQueue [{}] is not empty but last index is null. Returning index with value [{}].", subQueue, 1) - return Optional.of(1) - } - else - { - lastIndex++ - LOG.trace("Incrementing and returning index for subQueue [{}]. Returning index with value [{}].", subQueue, lastIndex) - return Optional.of(lastIndex) - } - } - else - { - LOG.trace("subQueue [{}] is empty, returning index with value [{}].", subQueue, 1) - Optional.of(1) - } - } - override fun persistMessageInternal(message: QueueMessage) { val queue = getSubQueue(message.subQueue) val matchingMessage = queue.stream().filter{ element -> element.uuid == message.uuid }.findFirst() if (matchingMessage.isPresent) { - message.id = matchingMessage.get().id val wasRemoved = removeInternal(matchingMessage.get()) val wasReAdded = addInternal(message) if (wasRemoved && wasReAdded) @@ -109,7 +64,8 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog client.set(appendPrefix(subQueue), 0, queue) } - return queue + // Memcached does not guarantee the order, so we need to order it ourselves + return ConcurrentLinkedQueue(queue.sortedBy { it.uuid }) } override fun performHealthCheckInternal() @@ -144,6 +100,7 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog { LOG.debug("Attempting to clear non-existent sub-queue [{}]. No messages cleared.", subQueue) } + cacheKeyManager.remove(appendPrefix(subQueue)) return amountRemoved } @@ -164,8 +121,27 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog override fun keysInternal(includeEmpty: Boolean): HashSet { - // TODO - return HashSet() + val keys = cacheKeyManager.getKeys() + if (includeEmpty) + { + LOG.debug("Including all empty queue keys in call to keys(). Total queue keys [{}].", keys.size) + return keys + } + else + { + val retainedKeys = HashSet() + for (key: String in keys) + { + val sizeOfQueue = getSubQueue(key).size + if (sizeOfQueue > 0) + { + LOG.trace("Sub-queue [{}] is not empty and will be returned in keys() call.", key) + retainedKeys.add(key) + } + } + LOG.debug("Removing all empty queue keys in call to keys(). Total queue keys [{}], non-empty queue keys [{}].", keys.size, retainedKeys.size) + return retainedKeys + } } override fun containsUUID(uuid: String): Optional @@ -186,8 +162,15 @@ class MemcachedMultiQueue(private val prefix: String = ""): MultiQueue(), HasLog override fun addInternal(element: QueueMessage): Boolean { + if (cacheKeyManager.getReservedKeys().contains(element.subQueue) + || cacheKeyManager.getReservedKeys().contains(appendPrefix(element.subQueue))) + { + throw IllegalSubQueueIdentifierException(element.subQueue) + } + val queue: Queue = getSubQueue(element.subQueue) val wasAdded = queue.add(element) + cacheKeyManager.add(appendPrefix(element.subQueue)) return wasAdded && client.set(appendPrefix(element.subQueue), 0, queue) } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt index bef19e6..0389aaa 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt @@ -17,26 +17,26 @@ class RedisCacheKeyManager: CacheKeyManager() override fun add(key: String) { - redisTemplate.opsForSet().add(CACHE_KEYS_KEY, key) + redisTemplate.opsForSet().add(getReservedKey(), key) } override fun remove(key: String) { - redisTemplate.opsForSet().remove(CACHE_KEYS_KEY, key) + redisTemplate.opsForSet().remove(getReservedKey(), key) } override fun contains(key: String): Boolean { - return redisTemplate.opsForSet().isMember(CACHE_KEYS_KEY, key) + return redisTemplate.opsForSet().isMember(getReservedKey(), key) } override fun getKeys(): HashSet { - return HashSet(redisTemplate.opsForSet().members(CACHE_KEYS_KEY)) + return HashSet(redisTemplate.opsForSet().members(getReservedKey())) } override fun clear() { - redisTemplate.delete(CACHE_KEYS_KEY) + redisTemplate.delete(getReservedKey()) } } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt index 1516cef..beed218 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt @@ -99,8 +99,8 @@ class RedisMultiQueue(private val prefix: String) : MultiQueue(), HasLogger, Cac throw IllegalSubQueueIdentifierException(element.subQueue) } - cacheKeyManager.add(appendPrefix(element.subQueue)) val result = redisTemplate.opsForSet().add(appendPrefix(element.subQueue), element) + cacheKeyManager.add(appendPrefix(element.subQueue)) return result != null && result > 0 } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt index d9d71dc..a0dad92 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt @@ -132,7 +132,7 @@ class RedisClusterAuthenticatorTest: MultiQueueAuthenticatorTest() val endpoints = StringBuilder() redisInstances.forEach { endpoints.append("$hostIp:${it.getMappedPort(RedisConfiguration.REDIS_DEFAULT_PORT.toInt())},") } TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${endpoints}", + "${MessageQueueSettings.CACHE_PREFIX}=${endpoints}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.CLUSTER.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt index 15fe8d3..0edc5a8 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt @@ -83,13 +83,8 @@ class RedisSentinelAuthenticatorTest: MultiQueueAuthenticatorTest() sentinel.start() TestPropertyValues.of( -<<<<<<< HEAD - "${MessageQueueSettings.REDIS_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", - "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" -======= "${MessageQueueSettings.CACHE_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", - "${MessageQueueSettings.REDIS_USE_SENTINELS}=true" ->>>>>>> 36e6084 (Work through adding memcached support. Implementing most methods the same way redis is implemented for now.) + "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt index 44977f7..fd888b9 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt @@ -294,7 +294,7 @@ abstract class MultiQueueTest queue.forEach { message -> if (previousUuid != null) { - Assertions.assertTrue(previousUuid.compareTo(message.uuid) < 0) + Assertions.assertTrue(previousUuid < message.uuid) } previousUuid = message.uuid diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt index 6ca17ed..6f1332c 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt @@ -5,6 +5,7 @@ import au.kilemon.messagequeue.configuration.cache.memcached.MemcachedConfigurat import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration import au.kilemon.messagequeue.logging.LoggingConfiguration import au.kilemon.messagequeue.queue.MultiQueueTest +import au.kilemon.messagequeue.queue.cache.CacheMultiQueueTest import au.kilemon.messagequeue.queue.cache.redis.RedisStandAloneMultiQueueTest import au.kilemon.messagequeue.settings.MessageQueueSettings import org.junit.jupiter.api.AfterAll @@ -28,11 +29,12 @@ import org.testcontainers.utility.DockerImageName * @author github.com/Kilemonn */ @ExtendWith(SpringExtension::class) -@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED", "${MessageQueueSettings.CACHE_PREFIX}=test"]) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED", + "${MessageQueueSettings.CACHE_PREFIX}=test"]) @Testcontainers @ContextConfiguration(initializers = [MemcachedMultiQueueTest.Initializer::class]) @Import(*[QueueConfiguration::class, LoggingConfiguration::class, MemcachedConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) -class MemcachedMultiQueueTest: MultiQueueTest() +class MemcachedMultiQueueTest: CacheMultiQueueTest() { companion object { @@ -80,8 +82,9 @@ class MemcachedMultiQueueTest: MultiQueueTest() * Check the container is running before each test. */ @BeforeEach - fun beforeEach() + override fun beforeEach() { + super.beforeEach() Assertions.assertTrue(memcache.isRunning) } } From fd59a1afcff3d5c2d224d16a79cf28b27e4f6a4f Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Tue, 19 May 2026 00:44:31 +0900 Subject: [PATCH 6/8] Fix test from merge. --- .../authenticator/cache/redis/RedisClusterAuthenticatorTest.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt index a0dad92..72009cc 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt @@ -132,7 +132,7 @@ class RedisClusterAuthenticatorTest: MultiQueueAuthenticatorTest() val endpoints = StringBuilder() redisInstances.forEach { endpoints.append("$hostIp:${it.getMappedPort(RedisConfiguration.REDIS_DEFAULT_PORT.toInt())},") } TestPropertyValues.of( - "${MessageQueueSettings.CACHE_PREFIX}=${endpoints}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${endpoints}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.CLUSTER.name}" ).applyTo(configurableApplicationContext.environment) } From 79a455c65b011aa8ea6568f9841cf8e481cf7c21 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Tue, 19 May 2026 20:43:04 +0900 Subject: [PATCH 7/8] Qualify the RedisCacheKeyManager redis template (RedisTemplate) object --- .../configuration/cache/redis/RedisConfiguration.kt | 2 +- .../messagequeue/queue/cache/redis/RedisCacheKeyManager.kt | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt index 65e4972..279cec1 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt @@ -265,7 +265,7 @@ class RedisConfiguration: HasLogger return template } - @Bean + @Bean(name=["RedisCacheKeyManagerTemplate"]) @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="REDIS") fun getRedisCacheKeyManagerRedisTemplate(): RedisTemplate { diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt index 0389aaa..b5eee6c 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt @@ -2,6 +2,7 @@ package au.kilemon.messagequeue.queue.cache.redis import au.kilemon.messagequeue.queue.cache.CacheKeyManager import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.data.redis.core.RedisTemplate /** @@ -12,6 +13,7 @@ import org.springframework.data.redis.core.RedisTemplate */ class RedisCacheKeyManager: CacheKeyManager() { + @Qualifier("RedisCacheKeyManagerTemplate") @Autowired private lateinit var redisTemplate: RedisTemplate From 80554573b0c427167354cb3abdc3c3a4ec291089 Mon Sep 17 00:00:00 2001 From: Kilemonn Date: Tue, 19 May 2026 21:22:29 +0900 Subject: [PATCH 8/8] Update test, update memcached image version. Make sure the prefix is removed for CacheMultiQueue not just RedisMultiQueue. --- .../messagequeue/rest/controller/MessageQueueController.kt | 5 +++-- .../cache/memcached/MemcachedAuthenticatorTest.kt | 2 +- .../messagequeue/queue/cache/CacheMultiQueueTest.kt | 7 +++++++ .../queue/cache/memcached/MemcachedMultiQueueTest.kt | 2 +- 4 files changed, 12 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt index c5cfce9..ec270a6 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt @@ -4,6 +4,7 @@ import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthentica import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.cache.CacheMultiQueue import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue import au.kilemon.messagequeue.queue.exception.DuplicateMessageException import au.kilemon.messagequeue.queue.exception.HealthCheckFailureException @@ -264,9 +265,9 @@ open class MessageQueueController : HasLogger @RequestParam(required = false, name = RestParameters.INCLUDE_EMPTY) includeEmpty: Boolean?): ResponseEntity { val keys = messageQueue.keys(includeEmpty != false) - if (messageQueue is RedisMultiQueue) + if (messageQueue is CacheMultiQueue) { - return ResponseEntity.ok(KeysResponse((messageQueue as RedisMultiQueue).removePrefix(keys))) + return ResponseEntity.ok(KeysResponse((messageQueue as CacheMultiQueue).removePrefix(keys))) } return ResponseEntity.ok(KeysResponse(keys)) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt index b3f555a..f61a8f1 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt @@ -37,7 +37,7 @@ class MemcachedAuthenticatorTest: MultiQueueAuthenticatorTest() companion object { private const val MEMCACHED_PORT: Int = 11211 - private const val MEMCACHED_CONTAINER: String = "memcached:1.6.34-alpine3.21" + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.41-alpine3.23" lateinit var memcache: GenericContainer<*> diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt index ede662c..f2de9c2 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt @@ -80,6 +80,13 @@ abstract class CacheMultiQueueTest: MultiQueueTest() val keys = keyManager.getKeys() Assertions.assertEquals(keysSize, keys.size) + + val multiQueueKeys = multiQueue.keys() + IntStream.range(0, keysSize).forEach { i -> + val key = "$allSubQueuePrefix$keyPrefix$i" + Assertions.assertTrue(keys.contains(key)) + Assertions.assertTrue(multiQueueKeys.contains(key)) + } } /** diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt index 6f1332c..1554bc4 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt @@ -39,7 +39,7 @@ class MemcachedMultiQueueTest: CacheMultiQueueTest() companion object { private const val MEMCACHED_PORT: Int = 11211 - private const val MEMCACHED_CONTAINER: String = "memcached:1.6.34-alpine3.21" + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.41-alpine3.23" lateinit var memcache: GenericContainer<*>