diff --git a/build.gradle.kts b/build.gradle.kts index e36edde..6df6b25 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -32,6 +32,10 @@ dependencies { // JPA dependency implementation("org.springframework.boot:spring-boot-starter-data-jpa:${springVersion}") + // Memcached + // https://mvnrepository.com/artifact/com.googlecode.xmemcached/xmemcached + implementation("com.googlecode.xmemcached:xmemcached:2.4.8") + // https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui implementation("org.springdoc:springdoc-openapi-starter-webmvc-api:${springDocVersion}") // https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui diff --git a/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt b/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt new file mode 100644 index 0000000..0ea717e --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticator.kt @@ -0,0 +1,80 @@ +package au.kilemon.messagequeue.authentication.authenticator.cache.memcached + +import au.kilemon.messagequeue.authentication.AuthenticationMatrix +import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator +import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator.Companion.RESTRICTED_KEY +import net.rubyeye.xmemcached.MemcachedClient +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import java.util.stream.Collectors + +/** + * A [MultiQueueAuthenticator] implementation using Memcached as the storage mechanism for the restricted + * sub-queue identifiers. + * + * @author github.com/Kilemonn + */ +class MemcachedAuthenticator: MultiQueueAuthenticator() +{ + override val LOG: Logger = this.initialiseLogger() + + @Autowired + private lateinit var client: MemcachedClient + + override fun isRestrictedInternal(subQueue: String): Boolean + { + return getMembersSet().contains(AuthenticationMatrix(subQueue)) + } + + override fun addRestrictedEntryInternal(subQueue: String) + { + val restricted = getMembersSet() + val added = restricted.add(AuthenticationMatrix(subQueue)) + if (added) + { + client.set(RESTRICTED_KEY, 0, restricted) + } + } + + override fun removeRestrictionInternal(subQueue: String): Boolean + { + val restricted = getMembersSet() + val removed = restricted.remove(AuthenticationMatrix(subQueue)) + if (removed) + { + client.set(RESTRICTED_KEY, 0, restricted) + } + return removed + } + + /** + * Overriding to completely remove all access to the [RESTRICTED_KEY]. + * Only if [isInNoneMode] returns `false`. + */ + override fun getReservedSubQueues(): Set + { + if (!isInNoneMode()) + { + return setOf(RESTRICTED_KEY) + } + return setOf() + } + + override fun getRestrictedSubQueueIdentifiers(): Set + { + return getMembersSet().stream().map { authMatrix -> authMatrix.subQueue }.collect(Collectors.toSet()) + } + + private fun getMembersSet(): HashSet + { + return client.get>(RESTRICTED_KEY) ?: HashSet() + } + + override fun clearRestrictedSubQueues(): Long + { + val members = getMembersSet() + val existingMembersSize = members.size.toLong() + client.delete(RESTRICTED_KEY) + return existingMembersSize + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt index 39c0f74..b3a3bf3 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/QueueConfiguration.kt @@ -3,6 +3,7 @@ package au.kilemon.messagequeue.configuration import au.kilemon.messagequeue.MessageQueueApplication import au.kilemon.messagequeue.authentication.RestrictionMode import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator +import au.kilemon.messagequeue.authentication.authenticator.cache.memcached.MemcachedAuthenticator import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator import au.kilemon.messagequeue.authentication.authenticator.inmemory.InMemoryAuthenticator import au.kilemon.messagequeue.authentication.authenticator.nosql.mongo.MongoAuthenticator @@ -13,6 +14,7 @@ import au.kilemon.messagequeue.logging.Messages import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue import au.kilemon.messagequeue.queue.cache.CacheKeyManager +import au.kilemon.messagequeue.queue.cache.memcached.MemcachedMultiQueue import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue @@ -58,7 +60,7 @@ class QueueConfiguration : HasLogger var queue: MultiQueue = InMemoryMultiQueue() when (messageQueueSettings.storageMedium.uppercase()) { StorageMedium.REDIS.toString() -> { - queue = RedisMultiQueue(messageQueueSettings.redisPrefix) + queue = RedisMultiQueue(messageQueueSettings.cachePrefix) } StorageMedium.SQL.toString() -> { queue = SqlMultiQueue() @@ -66,6 +68,9 @@ class QueueConfiguration : HasLogger StorageMedium.MONGO.toString() -> { queue = MongoMultiQueue() } + StorageMedium.MEMCACHED.toString() -> { + queue = MemcachedMultiQueue(messageQueueSettings.cachePrefix) + } } LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium) @@ -115,7 +120,7 @@ class QueueConfiguration : HasLogger var authenticator: MultiQueueAuthenticator = InMemoryAuthenticator() when (messageQueueSettings.storageMedium.uppercase()) { StorageMedium.REDIS.toString() -> { - authenticator = RedisAuthenticator(messageQueueSettings.redisPrefix) + authenticator = RedisAuthenticator(messageQueueSettings.cachePrefix) } StorageMedium.SQL.toString() -> { authenticator = SqlAuthenticator() @@ -123,6 +128,9 @@ class QueueConfiguration : HasLogger StorageMedium.MONGO.toString() -> { authenticator = MongoAuthenticator() } + StorageMedium.MEMCACHED.toString() -> { + authenticator = MemcachedAuthenticator() + } } LOG.info("Initialising [{}] authenticator as the [{}] is set to [{}].", authenticator::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium) diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt new file mode 100644 index 0000000..50d7188 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/memcached/MemcachedConfiguration.kt @@ -0,0 +1,56 @@ +package au.kilemon.messagequeue.configuration.cache.memcached + +import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration +import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.queue.cache.memcached.MemcachedCacheKeyManager +import au.kilemon.messagequeue.settings.MessageQueueSettings +import net.rubyeye.xmemcached.MemcachedClient +import net.rubyeye.xmemcached.XMemcachedClientBuilder +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.data.redis.connection.RedisConnectionFactory +import org.springframework.data.redis.connection.RedisSentinelConfiguration +import org.springframework.data.redis.connection.RedisStandaloneConfiguration + +/** + * A class that creates the required [Bean] objects when memcached is enabled. + * + * @author github.com/Kilemonn + */ +@Configuration +class MemcachedConfiguration: HasLogger +{ + override val LOG: Logger = this.initialiseLogger() + + companion object + { + const val MEMCACHED_DEFAULT_PORT: String = "11211" + } + + @Autowired + private lateinit var messageQueueSettings: MessageQueueSettings + + /** + * Create the [MemcachedClient] based on the loaded configuration. + */ + @Bean + @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED") + fun getMemcachedClient(): MemcachedClient + { + LOG.info("Initialising memcached configuration with the following configuration: Endpoint(s) [{}]. With prefix [{}].", + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) + + val builder = XMemcachedClientBuilder(RedisConfiguration.stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, MEMCACHED_DEFAULT_PORT)) + return builder.build() + } + + @Bean + @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED") + fun getMemcachedCacheKeyManager(): MemcachedCacheKeyManager + { + return MemcachedCacheKeyManager() + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt index 0d784e1..279cec1 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfiguration.kt @@ -116,7 +116,11 @@ class RedisConfiguration: HasLogger /** * Create the [RedisConnectionFactory] based on the loaded configuration. +<<<<<<< HEAD * If [MessageQueueSettings.redisMode] is `true` then multiple endpoints are expected in [MessageQueueSettings.redisEndpoint] and will attempt to be parsed out +======= + * If [MessageQueueSettings.redisUseSentinels] is `true` then multiple endpoints are expected in [MessageQueueSettings.cacheEndpoint] and will attempt to be parsed out +>>>>>>> 36e6084 (Work through adding memcached support. Implementing most methods the same way redis is implemented for now.) * and set into the [RedisSentinelConfiguration]. * * Otherwise, the first endpoint and port provided will be used to create a [RedisStandaloneConfiguration]. @@ -148,10 +152,11 @@ class RedisConfiguration: HasLogger fun getSentinelConfiguration(): RedisSentinelConfiguration { LOG.info("Initialising redis sentinel configuration with the following configuration: Endpoints {}, master {}. With prefix {}.", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.cachePrefix) + val redisSentinelConfiguration = RedisSentinelConfiguration() redisSentinelConfiguration.master(messageQueueSettings.redisMasterName) - val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_SENTINEL_DEFAULT_PORT) + val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_SENTINEL_DEFAULT_PORT) if (sentinelEndpoints.isEmpty()) { @@ -173,9 +178,10 @@ class RedisConfiguration: HasLogger fun getStandAloneConfiguration(): RedisStandaloneConfiguration { LOG.info("Initialising redis standalone configuration with the following configuration: Endpoint [{}], prefix [{}].", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) + val redisConfiguration = RedisStandaloneConfiguration() - val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_DEFAULT_PORT) + val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_DEFAULT_PORT) if (redisEndpoints.isEmpty()) { LOG.error("No redis endpoints defined for standalone configuration. Unable to initialise redis configuration.") @@ -194,10 +200,10 @@ class RedisConfiguration: HasLogger fun getClusterConfiguration(): RedisClusterConfiguration { LOG.info("Initialising redis cluster configuration with the following configuration: Endpoint [{}], prefix [{}].", - messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix) + messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix) val configuration = RedisClusterConfiguration() - val nodes = endpointToNodes(messageQueueSettings.redisEndpoint) + val nodes = endpointToNodes(messageQueueSettings.cacheEndpoint) configuration.setClusterNodes(nodes) return configuration @@ -259,7 +265,7 @@ class RedisConfiguration: HasLogger return template } - @Bean + @Bean(name=["RedisCacheKeyManagerTemplate"]) @ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="REDIS") fun getRedisCacheKeyManagerRedisTemplate(): RedisTemplate { diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt index dcecbda..eac553e 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/CacheKeyManager.kt @@ -13,9 +13,14 @@ abstract class CacheKeyManager(protected val prefix: String = "") const val CACHE_KEYS_KEY: String = "messagequeue-cache-keys" } + protected fun getReservedKey(): String + { + return "$prefix$CACHE_KEYS_KEY" + } + fun getReservedKeys(): Set { - return setOf("$prefix$CACHE_KEYS_KEY") + return setOf(getReservedKey()) } abstract fun add(key: String) diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt new file mode 100644 index 0000000..c4023ac --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedCacheKeyManager.kt @@ -0,0 +1,59 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.queue.cache.CacheKeyManager +import net.rubyeye.xmemcached.MemcachedClient +import org.springframework.beans.factory.annotation.Autowired + +/** + * To optimise how we determine the key list/sub queue list when using a cache, this class is used to store and + * manage the subqueue list for the [MemcachedMultiQueue]. + * + * @author github.com/Kilemonn + */ +class MemcachedCacheKeyManager: CacheKeyManager() +{ + @Autowired + private lateinit var client: MemcachedClient + + private fun persistUpdatedKeysSet(keys: HashSet) + { + client.set(getReservedKey(), 0, keys) + } + + override fun add(key: String) + { + val keys = getKeys() + keys.add(key) + persistUpdatedKeysSet(keys) + } + + override fun remove(key: String) + { + val keys = getKeys() + if (keys.remove(key)) + { + persistUpdatedKeysSet(keys) + } + } + + override fun getKeys(): HashSet + { + var keys: HashSet? = client.get(getReservedKey()) + if (keys == null) + { + keys = HashSet() + } + return keys + } + + override fun contains(key: String): Boolean + { + return getKeys().contains(key) + } + + override fun clear() + { + persistUpdatedKeysSet(HashSet()) + } + +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt new file mode 100644 index 0000000..2e40f72 --- /dev/null +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueue.kt @@ -0,0 +1,183 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.logging.HasLogger +import au.kilemon.messagequeue.message.QueueMessage +import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.cache.CacheMultiQueue +import au.kilemon.messagequeue.queue.exception.IllegalSubQueueIdentifierException +import au.kilemon.messagequeue.queue.exception.MessageUpdateException +import net.rubyeye.xmemcached.MemcachedClient +import org.slf4j.Logger +import org.springframework.beans.factory.annotation.Autowired +import java.util.Optional +import java.util.Queue +import java.util.concurrent.ConcurrentLinkedQueue +import kotlin.collections.HashSet + +/** + * A `Memcached` specific implementation of the [MultiQueue]. + * All messages stored and accessed directly from the `Memcached` instance. + * This increases overhead when checking UUID, but it is required incase the cache is edited manually, or by another message managing instance. + * + * @author github.com/Kilemonn + */ +class MemcachedMultiQueue(private val prefix: String): MultiQueue(), HasLogger, CacheMultiQueue +{ + override val LOG: Logger = this.initialiseLogger() + + @Autowired + private lateinit var client: MemcachedClient + + @Autowired + private lateinit var cacheKeyManager: MemcachedCacheKeyManager + + /** + * @return [prefix] + */ + override fun getPrefix(): String + { + return prefix + } + + override fun persistMessageInternal(message: QueueMessage) + { + val queue = getSubQueue(message.subQueue) + val matchingMessage = queue.stream().filter{ element -> element.uuid == message.uuid }.findFirst() + if (matchingMessage.isPresent) + { + val wasRemoved = removeInternal(matchingMessage.get()) + val wasReAdded = addInternal(message) + if (wasRemoved && wasReAdded) + { + return + } + } + throw MessageUpdateException(message.uuid) + } + + override fun getSubQueueInternal(subQueue: String): Queue + { + var queue: Queue? = client.get?>(appendPrefix(subQueue)) + if (queue == null) + { + queue = ConcurrentLinkedQueue() + client.set(appendPrefix(subQueue), 0, queue) + } + + // Memcached does not guarantee the order, so we need to order it ourselves + return ConcurrentLinkedQueue(queue.sortedBy { it.uuid }) + } + + override fun performHealthCheckInternal() + { + client.get("health-check-key") + } + + override fun getMessageByUUID(uuid: String): Optional + { + val subQueue = containsUUID(uuid) + if (subQueue.isPresent) + { + LOG.trace("Found message with uuid [{}].", uuid) + val queue: Queue = getSubQueue(subQueue.get()) + return queue.stream().filter { message -> message.uuid == uuid }.findFirst() + } + LOG.trace("No message found with uuid [{}].", uuid) + return Optional.empty() + } + + override fun clearSubQueueInternal(subQueue: String): Int + { + var amountRemoved = 0 + val queue = getSubQueue(subQueue) + if (queue.isNotEmpty()) + { + amountRemoved = queue.size + client.delete(appendPrefix(subQueue)) + LOG.debug("Cleared existing sub-queue [{}]. Removed [{}] message entries.", subQueue, amountRemoved) + } + else + { + LOG.debug("Attempting to clear non-existent sub-queue [{}]. No messages cleared.", subQueue) + } + cacheKeyManager.remove(appendPrefix(subQueue)) + return amountRemoved + } + + override fun isEmptySubQueue(subQueue: String): Boolean + { + return getSubQueue(subQueue).isEmpty() + } + + override fun pollInternal(subQueue: String): Optional + { + val queue = getSubQueue(subQueue) + if (queue.isNotEmpty()) + { + return Optional.of(queue.iterator().next()) + } + return Optional.empty() + } + + override fun keysInternal(includeEmpty: Boolean): HashSet + { + val keys = cacheKeyManager.getKeys() + if (includeEmpty) + { + LOG.debug("Including all empty queue keys in call to keys(). Total queue keys [{}].", keys.size) + return keys + } + else + { + val retainedKeys = HashSet() + for (key: String in keys) + { + val sizeOfQueue = getSubQueue(key).size + if (sizeOfQueue > 0) + { + LOG.trace("Sub-queue [{}] is not empty and will be returned in keys() call.", key) + retainedKeys.add(key) + } + } + LOG.debug("Removing all empty queue keys in call to keys(). Total queue keys [{}], non-empty queue keys [{}].", keys.size, retainedKeys.size) + return retainedKeys + } + } + + override fun containsUUID(uuid: String): Optional + { + for (key in keys()) + { + val queue = getSubQueue(key) + val anyMatchTheUUID = queue.stream().anyMatch{ message -> uuid == message.uuid } + if (anyMatchTheUUID) + { + LOG.debug("Found sub-queue [{}] for message UUID: [{}].", key, uuid) + return Optional.of(key) + } + } + LOG.debug("No sub-queue contains message with UUID: [{}].", uuid) + return Optional.empty() + } + + override fun addInternal(element: QueueMessage): Boolean + { + if (cacheKeyManager.getReservedKeys().contains(element.subQueue) + || cacheKeyManager.getReservedKeys().contains(appendPrefix(element.subQueue))) + { + throw IllegalSubQueueIdentifierException(element.subQueue) + } + + val queue: Queue = getSubQueue(element.subQueue) + val wasAdded = queue.add(element) + cacheKeyManager.add(appendPrefix(element.subQueue)) + return wasAdded && client.set(appendPrefix(element.subQueue), 0, queue) + } + + override fun removeInternal(element: QueueMessage): Boolean + { + val queue: Queue = getSubQueue(element.subQueue) + val wasRemoved = queue.remove(element) + return wasRemoved && client.set(appendPrefix(element.subQueue), 0, queue) + } +} diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt index bef19e6..b5eee6c 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisCacheKeyManager.kt @@ -2,6 +2,7 @@ package au.kilemon.messagequeue.queue.cache.redis import au.kilemon.messagequeue.queue.cache.CacheKeyManager import org.springframework.beans.factory.annotation.Autowired +import org.springframework.beans.factory.annotation.Qualifier import org.springframework.data.redis.core.RedisTemplate /** @@ -12,31 +13,32 @@ import org.springframework.data.redis.core.RedisTemplate */ class RedisCacheKeyManager: CacheKeyManager() { + @Qualifier("RedisCacheKeyManagerTemplate") @Autowired private lateinit var redisTemplate: RedisTemplate override fun add(key: String) { - redisTemplate.opsForSet().add(CACHE_KEYS_KEY, key) + redisTemplate.opsForSet().add(getReservedKey(), key) } override fun remove(key: String) { - redisTemplate.opsForSet().remove(CACHE_KEYS_KEY, key) + redisTemplate.opsForSet().remove(getReservedKey(), key) } override fun contains(key: String): Boolean { - return redisTemplate.opsForSet().isMember(CACHE_KEYS_KEY, key) + return redisTemplate.opsForSet().isMember(getReservedKey(), key) } override fun getKeys(): HashSet { - return HashSet(redisTemplate.opsForSet().members(CACHE_KEYS_KEY)) + return HashSet(redisTemplate.opsForSet().members(getReservedKey())) } override fun clear() { - redisTemplate.delete(CACHE_KEYS_KEY) + redisTemplate.delete(getReservedKey()) } } diff --git a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt index 2f0f51d..beed218 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisMultiQueue.kt @@ -43,9 +43,6 @@ class RedisMultiQueue(private val prefix: String) : MultiQueue(), HasLogger, Cac return prefix } - /** - * Attempts to append the prefix before requesting the underlying redis entry if the provided [subQueue] is not prefixed with [MessageQueueSettings.redisPrefix]. - */ override fun getSubQueueInternal(subQueue: String): Queue { val queue = ConcurrentLinkedQueue() @@ -102,8 +99,8 @@ class RedisMultiQueue(private val prefix: String) : MultiQueue(), HasLogger, Cac throw IllegalSubQueueIdentifierException(element.subQueue) } - cacheKeyManager.add(appendPrefix(element.subQueue)) val result = redisTemplate.opsForSet().add(appendPrefix(element.subQueue), element) + cacheKeyManager.add(appendPrefix(element.subQueue)) return result != null && result > 0 } diff --git a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt index c5cfce9..ec270a6 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/rest/controller/MessageQueueController.kt @@ -4,6 +4,7 @@ import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthentica import au.kilemon.messagequeue.logging.HasLogger import au.kilemon.messagequeue.message.QueueMessage import au.kilemon.messagequeue.queue.MultiQueue +import au.kilemon.messagequeue.queue.cache.CacheMultiQueue import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue import au.kilemon.messagequeue.queue.exception.DuplicateMessageException import au.kilemon.messagequeue.queue.exception.HealthCheckFailureException @@ -264,9 +265,9 @@ open class MessageQueueController : HasLogger @RequestParam(required = false, name = RestParameters.INCLUDE_EMPTY) includeEmpty: Boolean?): ResponseEntity { val keys = messageQueue.keys(includeEmpty != false) - if (messageQueue is RedisMultiQueue) + if (messageQueue is CacheMultiQueue) { - return ResponseEntity.ok(KeysResponse((messageQueue as RedisMultiQueue).removePrefix(keys))) + return ResponseEntity.ok(KeysResponse((messageQueue as CacheMultiQueue).removePrefix(keys))) } return ResponseEntity.ok(KeysResponse(keys)) } diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt index cfb52b7..383230b 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettings.kt @@ -36,12 +36,17 @@ class MessageQueueSettings * Start redis related properties */ private const val REDIS: String = "redis" - const val REDIS_PREFIX: String = "$MESSAGE_QUEUE.$REDIS.prefix" - const val REDIS_ENDPOINT: String = "$MESSAGE_QUEUE.$REDIS.endpoint" + const val REDIS_ENDPOINT_DEFAULT: String = "127.0.0.1" const val REDIS_USERNAME: String = "$MESSAGE_QUEUE.$REDIS.username" const val REDIS_PASSWORD: String = "$MESSAGE_QUEUE.$REDIS.password" + // Used by memcached + const val CACHE_PREFIX: String = "$MESSAGE_QUEUE.cache.prefix" + // Used by memcached + const val CACHE_ENDPOINT: String = "$MESSAGE_QUEUE.cache.endpoint" + const val CACHE_ENDPOINT_DEFAULT: String = "127.0.0.1" + // Redis sentinel related properties const val REDIS_MODE: String = "$MESSAGE_QUEUE.$REDIS.mode" const val REDIS_MODE_DEFAULT: String = "STANDALONE" @@ -120,38 +125,37 @@ class MessageQueueSettings /** - * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. - * Uses the [REDIS_PREFIX] to set a prefix used for all redis entry keys. + * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.MEMCACHED]. + * Uses the [CACHE_PREFIX] to set a prefix used for all cache entry keys. * - * E.g. if the initial value for the redis entry is "my-key" and no prefix is defined the entries would be stored under "my-key". + * E.g. if the initial value for the cache entry is "my-key" and no prefix is defined the entries would be stored under "my-key". * Using the same scenario if the prefix is "prefix" then the resultant key would be "prefixmy-key". */ - @Schema(title = "Redis Prefix", example = "my-prefix-", - description = "Used to remove/reduce the likelihood of any collisions if this is being used in an existing redis instance. " + - "The prefix will be added to all entries made in the redis storage medium.") - @SerializedName(REDIS_PREFIX) - @JsonProperty(REDIS_PREFIX) - @Value("\${$REDIS_PREFIX:}") + @Schema(title = "Cache Prefix", example = "my-prefix-", + description = "Used to remove/reduce the likelihood of any collisions if this is being used in an existing cache instance. " + + "The prefix will be added to all entries made in the cache storage medium.") + @SerializedName(CACHE_PREFIX) + @JsonProperty(CACHE_PREFIX) + @Value("\${$CACHE_PREFIX:}") @get:Generated @set:Generated - lateinit var redisPrefix: String + lateinit var cachePrefix: String /** - * `Required` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. - * The input endpoint string which is used for both standalone and the sentinel redis configurations. - * This supports a comma separated list or single definition of a redis endpoint in the following formats: + * `Required` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS] OR [StorageMedium.MEMCACHED]. + * This supports a comma separated list or single definition of a endpoints in the following formats: * `:,:,` * - * If not provided [REDIS_ENDPOINT_DEFAULT] will be used by default. + * If not provided [CACHE_ENDPOINT_DEFAULT] will be used by default. */ - @Schema(title = "Redis Endpoint", example = "sentinel1.com:5545,sentinel2.org:9980", - description = "The endpoint string which is used for both standalone and the sentinel redis configurations.") - @SerializedName(REDIS_ENDPOINT) - @JsonProperty(REDIS_ENDPOINT) - @Value("\${$REDIS_ENDPOINT:$REDIS_ENDPOINT_DEFAULT}") + @Schema(title = "Cache Endpoint", example = "sentinel1.com:5545,sentinel2.org:9980", + description = "The endpoint string which can contain multiple comma separated endpoints and ports.") + @SerializedName(CACHE_ENDPOINT) + @JsonProperty(CACHE_ENDPOINT) + @Value("\${$CACHE_ENDPOINT:$CACHE_ENDPOINT_DEFAULT}") @get:Generated @set:Generated - lateinit var redisEndpoint: String + lateinit var cacheEndpoint: String /** * `Optional` when [STORAGE_MEDIUM] is set to [StorageMedium.REDIS]. diff --git a/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt b/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt index db97125..97b6385 100644 --- a/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt +++ b/src/main/kotlin/au/kilemon/messagequeue/settings/StorageMedium.kt @@ -26,5 +26,10 @@ enum class StorageMedium /** * Initialise and connect to the defined `mongo` store. */ - MONGO; + MONGO, + + /** + * Will connect to the defined memcache instance to store queue messages. + */ + MEMCACHED; } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt new file mode 100644 index 0000000..f61a8f1 --- /dev/null +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/memcached/MemcachedAuthenticatorTest.kt @@ -0,0 +1,88 @@ +package au.kilemon.messagequeue.authentication.authenticator.cache.memcached + +import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticatorTest +import au.kilemon.messagequeue.configuration.QueueConfiguration +import au.kilemon.messagequeue.configuration.cache.memcached.MemcachedConfiguration +import au.kilemon.messagequeue.logging.LoggingConfiguration +import au.kilemon.messagequeue.queue.MultiQueueTest +import au.kilemon.messagequeue.settings.MessageQueueSettings +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.ApplicationContextInitializer +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Import +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.containers.GenericContainer +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName + + +/** + * A test class for [MemcachedAuthenticator]. + * + * @author github.com/Kilemonn + */ +@ExtendWith(SpringExtension::class) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED"]) +@Testcontainers +@ContextConfiguration(initializers = [MemcachedAuthenticatorTest.Initializer::class]) +@Import(*[LoggingConfiguration::class, MemcachedConfiguration::class, QueueConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) +class MemcachedAuthenticatorTest: MultiQueueAuthenticatorTest() +{ + companion object + { + private const val MEMCACHED_PORT: Int = 11211 + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.41-alpine3.23" + + lateinit var memcache: GenericContainer<*> + + /** + * Stop the container at the end of all the tests. + */ + @AfterAll + @JvmStatic + fun afterClass() + { + memcache.stop() + } + } + + /** + * The test initialiser for [MemcachedAuthenticatorTest] to initialise the container and test properties. + * + * @author github.com/Kilemonn + */ + internal class Initializer : ApplicationContextInitializer + { + /** + * Force start the container, so we can place its host and dynamic ports into the system properties. + * + * Set the environment variables before any of the beans are initialised. + */ + override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) + { + memcache = GenericContainer(DockerImageName.parse(MEMCACHED_CONTAINER)) + .withExposedPorts(MEMCACHED_PORT).withReuse(false) + memcache.start() + + TestPropertyValues.of( + "${MessageQueueSettings.CACHE_ENDPOINT}=${memcache.host}:${memcache.getMappedPort(MEMCACHED_PORT)}" + ).applyTo(configurableApplicationContext.environment) + } + } + + /** + * Check the container is running before each test. + */ + @BeforeEach + fun beforeEach() + { + Assertions.assertTrue(memcache.isRunning) + multiQueueAuthenticator.clearRestrictedSubQueues() + } +} diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt index d9d71dc..72009cc 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisClusterAuthenticatorTest.kt @@ -132,7 +132,7 @@ class RedisClusterAuthenticatorTest: MultiQueueAuthenticatorTest() val endpoints = StringBuilder() redisInstances.forEach { endpoints.append("$hostIp:${it.getMappedPort(RedisConfiguration.REDIS_DEFAULT_PORT.toInt())},") } TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${endpoints}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${endpoints}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.CLUSTER.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt index 8126204..0edc5a8 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisSentinelAuthenticatorTest.kt @@ -83,7 +83,7 @@ class RedisSentinelAuthenticatorTest: MultiQueueAuthenticatorTest() sentinel.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt index fe77f31..6d34d68 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/authentication/authenticator/cache/redis/RedisStandAloneAuthenticatorTest.kt @@ -27,7 +27,7 @@ import org.testcontainers.utility.DockerImageName * @author github.com/Kilemonn */ @ExtendWith(SpringExtension::class) -@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.REDIS_PREFIX}=test"]) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.CACHE_PREFIX}=test"]) @Testcontainers @ContextConfiguration(initializers = [RedisStandAloneAuthenticatorTest.Initializer::class]) @Import(*[QueueConfiguration::class, LoggingConfiguration::class, RedisConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) @@ -70,7 +70,7 @@ class RedisStandAloneAuthenticatorTest: MultiQueueAuthenticatorTest() redis.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" + "${MessageQueueSettings.CACHE_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt b/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt index 867d662..2bd3849 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/configuration/cache/redis/RedisConfigurationTest.kt @@ -134,8 +134,8 @@ class RedisConfigurationTest fun testGetConnectionFactory_sentinelWithNoEndpoints() { messageQueueSettings.redisMode = RedisMode.SENTINEL.name - messageQueueSettings.redisEndpoint = "" - Assertions.assertEquals("", messageQueueSettings.redisEndpoint) + messageQueueSettings.cacheEndpoint = "" + Assertions.assertEquals("", messageQueueSettings.cacheEndpoint) Assertions.assertThrows(RedisInitialisationException::class.java) { redisConfiguration.getSentinelConfiguration() } @@ -148,8 +148,8 @@ class RedisConfigurationTest fun testGetConnectionFactory_standAloneWithNoEndpoints() { messageQueueSettings.redisMode = RedisMode.STANDALONE.name - messageQueueSettings.redisEndpoint = "" - Assertions.assertEquals("", messageQueueSettings.redisEndpoint) + messageQueueSettings.cacheEndpoint = "" + Assertions.assertEquals("", messageQueueSettings.cacheEndpoint) Assertions.assertThrows(RedisInitialisationException::class.java) { redisConfiguration.getStandAloneConfiguration() } @@ -167,8 +167,9 @@ class RedisConfigurationTest val endpoint1 = "$endpoint1Host:$endpoint1Port" val endpoint2 = "redis:6789" val endpoints = "$endpoint1,$endpoint2" - messageQueueSettings.redisEndpoint = endpoints - Assertions.assertEquals(endpoints, messageQueueSettings.redisEndpoint) + + messageQueueSettings.cacheEndpoint = endpoints + Assertions.assertEquals(endpoints, messageQueueSettings.cacheEndpoint) val standAloneConfiguration = redisConfiguration.getStandAloneConfiguration() Assertions.assertEquals(endpoint1Host, standAloneConfiguration.hostName) Assertions.assertEquals(endpoint1Port.toInt(), standAloneConfiguration.port) diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt index 44977f7..fd888b9 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/MultiQueueTest.kt @@ -294,7 +294,7 @@ abstract class MultiQueueTest queue.forEach { message -> if (previousUuid != null) { - Assertions.assertTrue(previousUuid.compareTo(message.uuid) < 0) + Assertions.assertTrue(previousUuid < message.uuid) } previousUuid = message.uuid diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt index ede662c..f2de9c2 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/CacheMultiQueueTest.kt @@ -80,6 +80,13 @@ abstract class CacheMultiQueueTest: MultiQueueTest() val keys = keyManager.getKeys() Assertions.assertEquals(keysSize, keys.size) + + val multiQueueKeys = multiQueue.keys() + IntStream.range(0, keysSize).forEach { i -> + val key = "$allSubQueuePrefix$keyPrefix$i" + Assertions.assertTrue(keys.contains(key)) + Assertions.assertTrue(multiQueueKeys.contains(key)) + } } /** diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt new file mode 100644 index 0000000..1554bc4 --- /dev/null +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/memcached/MemcachedMultiQueueTest.kt @@ -0,0 +1,90 @@ +package au.kilemon.messagequeue.queue.cache.memcached + +import au.kilemon.messagequeue.configuration.QueueConfiguration +import au.kilemon.messagequeue.configuration.cache.memcached.MemcachedConfiguration +import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration +import au.kilemon.messagequeue.logging.LoggingConfiguration +import au.kilemon.messagequeue.queue.MultiQueueTest +import au.kilemon.messagequeue.queue.cache.CacheMultiQueueTest +import au.kilemon.messagequeue.queue.cache.redis.RedisStandAloneMultiQueueTest +import au.kilemon.messagequeue.settings.MessageQueueSettings +import org.junit.jupiter.api.AfterAll +import org.junit.jupiter.api.Assertions +import org.junit.jupiter.api.BeforeEach +import org.junit.jupiter.api.extension.ExtendWith +import org.springframework.boot.test.util.TestPropertyValues +import org.springframework.context.ApplicationContextInitializer +import org.springframework.context.ConfigurableApplicationContext +import org.springframework.context.annotation.Import +import org.springframework.test.context.ContextConfiguration +import org.springframework.test.context.TestPropertySource +import org.springframework.test.context.junit.jupiter.SpringExtension +import org.testcontainers.containers.GenericContainer +import org.testcontainers.junit.jupiter.Testcontainers +import org.testcontainers.utility.DockerImageName + +/** + * A test class for the [MemcachedMultiQueue] `Component` class. + * + * @author github.com/Kilemonn + */ +@ExtendWith(SpringExtension::class) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=MEMCACHED", + "${MessageQueueSettings.CACHE_PREFIX}=test"]) +@Testcontainers +@ContextConfiguration(initializers = [MemcachedMultiQueueTest.Initializer::class]) +@Import(*[QueueConfiguration::class, LoggingConfiguration::class, MemcachedConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) +class MemcachedMultiQueueTest: CacheMultiQueueTest() +{ + companion object + { + private const val MEMCACHED_PORT: Int = 11211 + private const val MEMCACHED_CONTAINER: String = "memcached:1.6.41-alpine3.23" + + lateinit var memcache: GenericContainer<*> + + /** + * Stop the container at the end of all the tests. + */ + @AfterAll + @JvmStatic + fun afterClass() + { + memcache.stop() + } + } + + /** + * The test initialiser for [MemcachedMultiQueueTest] to initialise the container and test properties. + * + * @author github.com/Kilemonn + */ + internal class Initializer : ApplicationContextInitializer + { + /** + * Force start the container, so we can place its host and dynamic ports into the system properties. + * + * Set the environment variables before any of the beans are initialised. + */ + override fun initialize(configurableApplicationContext: ConfigurableApplicationContext) + { + memcache = GenericContainer(DockerImageName.parse(MEMCACHED_CONTAINER)) + .withExposedPorts(MEMCACHED_PORT).withReuse(false) + memcache.start() + + TestPropertyValues.of( + "${MessageQueueSettings.CACHE_ENDPOINT}=${memcache.host}:${memcache.getMappedPort(MEMCACHED_PORT)}" + ).applyTo(configurableApplicationContext.environment) + } + } + + /** + * Check the container is running before each test. + */ + @BeforeEach + override fun beforeEach() + { + super.beforeEach() + Assertions.assertTrue(memcache.isRunning) + } +} diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt index 352b155..1985340 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisClusterMultiQueueTest.kt @@ -134,7 +134,7 @@ class RedisClusterMultiQueueTest: CacheMultiQueueTest() val endpoints = StringBuilder() redisInstances.forEach { endpoints.append("$hostIp:${it.getMappedPort(RedisConfiguration.REDIS_DEFAULT_PORT.toInt())},") } TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${endpoints}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${endpoints}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.CLUSTER.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt index f29a0d1..1d708c8 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisSentinelMultiQueueTest.kt @@ -88,7 +88,7 @@ class RedisSentinelMultiQueueTest: CacheMultiQueueTest() sentinel.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", + "${MessageQueueSettings.CACHE_ENDPOINT}=${sentinel.host}:${sentinel.getMappedPort(RedisConfiguration.REDIS_SENTINEL_DEFAULT_PORT.toInt())}", "${MessageQueueSettings.REDIS_MODE}=${RedisMode.SENTINEL.name}" ).applyTo(configurableApplicationContext.environment) } diff --git a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt index 90ac148..b71af48 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/queue/cache/redis/RedisStandAloneMultiQueueTest.kt @@ -42,7 +42,7 @@ import org.testcontainers.utility.DockerImageName * @author github.com/Kilemonn */ @ExtendWith(SpringExtension::class) -@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.REDIS_PREFIX}=test"]) +@TestPropertySource(properties = ["${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", "${MessageQueueSettings.CACHE_PREFIX}=test"]) @Testcontainers @ContextConfiguration(initializers = [RedisStandAloneMultiQueueTest.Initializer::class]) @Import(*[QueueConfiguration::class, LoggingConfiguration::class, RedisConfiguration::class, MultiQueueTest.MultiQueueTestConfiguration::class]) @@ -85,7 +85,7 @@ class RedisStandAloneMultiQueueTest: CacheMultiQueueTest() redis.start() TestPropertyValues.of( - "${MessageQueueSettings.REDIS_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" + "${MessageQueueSettings.CACHE_ENDPOINT}=${redis.host}:${redis.getMappedPort(REDIS_PORT)}" ).applyTo(configurableApplicationContext.environment) } } diff --git a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt index 3501137..f36e85e 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/rest/controller/SettingsControllerTest.kt @@ -78,9 +78,10 @@ class SettingsControllerTest Assertions.assertEquals(StorageMedium.IN_MEMORY.toString(), settings.storageMedium) Assertions.assertEquals(RestrictionMode.NONE.toString(), settings.restrictionMode) - Assertions.assertTrue(settings.redisPrefix.isEmpty()) - Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, settings.redisEndpoint) + Assertions.assertTrue(settings.cachePrefix.isEmpty()) + Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, settings.cacheEndpoint) Assertions.assertEquals(RedisMode.STANDALONE.name, settings.redisMode) + Assertions.assertEquals(MessageQueueSettings.REDIS_MASTER_NAME_DEFAULT, settings.redisMasterName) Assertions.assertTrue(settings.sqlEndpoint.isEmpty()) diff --git a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt index c382f56..244ff6e 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsDefaultTest.kt @@ -46,8 +46,8 @@ class MessageQueueSettingsDefaultTest Assertions.assertEquals(MessageQueueSettings.STORAGE_MEDIUM_DEFAULT, messageQueueSettings.storageMedium) Assertions.assertEquals(MessageQueueSettings.RESTRICTION_MODE_DEFAULT, messageQueueSettings.restrictionMode) - Assertions.assertEquals(MessageQueueSettings.REDIS_ENDPOINT_DEFAULT, messageQueueSettings.redisEndpoint) - Assertions.assertEquals("", messageQueueSettings.redisPrefix) + Assertions.assertEquals(MessageQueueSettings.CACHE_ENDPOINT_DEFAULT, messageQueueSettings.cacheEndpoint) + Assertions.assertEquals("", messageQueueSettings.cachePrefix) Assertions.assertEquals(MessageQueueSettings.REDIS_MASTER_NAME_DEFAULT, messageQueueSettings.redisMasterName) Assertions.assertEquals(RedisMode.STANDALONE.name, messageQueueSettings.redisMode) diff --git a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt index a73af64..b7d210f 100644 --- a/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt +++ b/src/test/kotlin/au/kilemon/messagequeue/settings/MessageQueueSettingsTest.kt @@ -18,8 +18,8 @@ import org.springframework.test.context.junit.jupiter.SpringExtension @ExtendWith(SpringExtension::class) @TestPropertySource(properties = [ "${MessageQueueSettings.STORAGE_MEDIUM}=REDIS", - "${MessageQueueSettings.REDIS_ENDPOINT}=123.123.123.123", - "${MessageQueueSettings.REDIS_PREFIX}=redis", + "${MessageQueueSettings.CACHE_ENDPOINT}=123.123.123.123", + "${MessageQueueSettings.CACHE_PREFIX}=redis", "${MessageQueueSettings.REDIS_MODE}=SENTINEL", "${MessageQueueSettings.REDIS_MASTER_NAME}=master" ]) @@ -52,8 +52,8 @@ class MessageQueueSettingsTest { Assertions.assertNotNull(messageQueueSettings) Assertions.assertEquals("REDIS", messageQueueSettings.storageMedium) - Assertions.assertEquals("123.123.123.123", messageQueueSettings.redisEndpoint) - Assertions.assertEquals("redis", messageQueueSettings.redisPrefix) + Assertions.assertEquals("123.123.123.123", messageQueueSettings.cacheEndpoint) + Assertions.assertEquals("redis", messageQueueSettings.cachePrefix) Assertions.assertEquals("master", messageQueueSettings.redisMasterName) Assertions.assertEquals("SENTINEL", messageQueueSettings.redisMode) }