Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ dependencies {
// JPA dependency
implementation("org.springframework.boot:spring-boot-starter-data-jpa:${springVersion}")

// Memcached
// https://mvnrepository.com/artifact/com.googlecode.xmemcached/xmemcached
implementation("com.googlecode.xmemcached:xmemcached:2.4.8")

// https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui
implementation("org.springdoc:springdoc-openapi-starter-webmvc-api:${springDocVersion}")
// https://mvnrepository.com/artifact/org.springdoc/springdoc-openapi-starter-webmvc-ui
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package au.kilemon.messagequeue.authentication.authenticator.cache.memcached

import au.kilemon.messagequeue.authentication.AuthenticationMatrix
import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator
import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator.Companion.RESTRICTED_KEY
import net.rubyeye.xmemcached.MemcachedClient
import org.slf4j.Logger
import org.springframework.beans.factory.annotation.Autowired
import java.util.stream.Collectors

/**
* A [MultiQueueAuthenticator] implementation using Memcached as the storage mechanism for the restricted
* sub-queue identifiers.
*
* @author github.com/Kilemonn
*/
class MemcachedAuthenticator: MultiQueueAuthenticator()
{
override val LOG: Logger = this.initialiseLogger()

@Autowired
private lateinit var client: MemcachedClient

override fun isRestrictedInternal(subQueue: String): Boolean
{
return getMembersSet().contains(AuthenticationMatrix(subQueue))
}

override fun addRestrictedEntryInternal(subQueue: String)
{
val restricted = getMembersSet()
val added = restricted.add(AuthenticationMatrix(subQueue))
if (added)
{
client.set(RESTRICTED_KEY, 0, restricted)
}
}

override fun removeRestrictionInternal(subQueue: String): Boolean
{
val restricted = getMembersSet()
val removed = restricted.remove(AuthenticationMatrix(subQueue))
if (removed)
{
client.set(RESTRICTED_KEY, 0, restricted)
}
return removed
}

/**
* Overriding to completely remove all access to the [RESTRICTED_KEY].
* Only if [isInNoneMode] returns `false`.
*/
override fun getReservedSubQueues(): Set<String>
{
if (!isInNoneMode())
{
return setOf(RESTRICTED_KEY)
}
return setOf()
}

override fun getRestrictedSubQueueIdentifiers(): Set<String>
{
return getMembersSet().stream().map { authMatrix -> authMatrix.subQueue }.collect(Collectors.toSet())
}

private fun getMembersSet(): HashSet<AuthenticationMatrix>
{
return client.get<HashSet<AuthenticationMatrix>>(RESTRICTED_KEY) ?: HashSet()
}

override fun clearRestrictedSubQueues(): Long
{
val members = getMembersSet()
val existingMembersSize = members.size.toLong()
client.delete(RESTRICTED_KEY)
return existingMembersSize
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package au.kilemon.messagequeue.configuration
import au.kilemon.messagequeue.MessageQueueApplication
import au.kilemon.messagequeue.authentication.RestrictionMode
import au.kilemon.messagequeue.authentication.authenticator.MultiQueueAuthenticator
import au.kilemon.messagequeue.authentication.authenticator.cache.memcached.MemcachedAuthenticator
import au.kilemon.messagequeue.authentication.authenticator.cache.redis.RedisAuthenticator
import au.kilemon.messagequeue.authentication.authenticator.inmemory.InMemoryAuthenticator
import au.kilemon.messagequeue.authentication.authenticator.nosql.mongo.MongoAuthenticator
Expand All @@ -13,6 +14,7 @@ import au.kilemon.messagequeue.logging.Messages
import au.kilemon.messagequeue.message.QueueMessage
import au.kilemon.messagequeue.queue.MultiQueue
import au.kilemon.messagequeue.queue.cache.CacheKeyManager
import au.kilemon.messagequeue.queue.cache.memcached.MemcachedMultiQueue
import au.kilemon.messagequeue.queue.cache.redis.RedisMultiQueue
import au.kilemon.messagequeue.queue.inmemory.InMemoryMultiQueue
import au.kilemon.messagequeue.queue.nosql.mongo.MongoMultiQueue
Expand Down Expand Up @@ -58,14 +60,17 @@ class QueueConfiguration : HasLogger
var queue: MultiQueue = InMemoryMultiQueue()
when (messageQueueSettings.storageMedium.uppercase()) {
StorageMedium.REDIS.toString() -> {
queue = RedisMultiQueue(messageQueueSettings.redisPrefix)
queue = RedisMultiQueue(messageQueueSettings.cachePrefix)
}
StorageMedium.SQL.toString() -> {
queue = SqlMultiQueue()
}
StorageMedium.MONGO.toString() -> {
queue = MongoMultiQueue()
}
StorageMedium.MEMCACHED.toString() -> {
queue = MemcachedMultiQueue(messageQueueSettings.cachePrefix)
}
}

LOG.info("Initialising [{}] queue as the [{}] is set to [{}].", queue::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium)
Expand Down Expand Up @@ -115,14 +120,17 @@ class QueueConfiguration : HasLogger
var authenticator: MultiQueueAuthenticator = InMemoryAuthenticator()
when (messageQueueSettings.storageMedium.uppercase()) {
StorageMedium.REDIS.toString() -> {
authenticator = RedisAuthenticator(messageQueueSettings.redisPrefix)
authenticator = RedisAuthenticator(messageQueueSettings.cachePrefix)
}
StorageMedium.SQL.toString() -> {
authenticator = SqlAuthenticator()
}
StorageMedium.MONGO.toString() -> {
authenticator = MongoAuthenticator()
}
StorageMedium.MEMCACHED.toString() -> {
authenticator = MemcachedAuthenticator()
}
}

LOG.info("Initialising [{}] authenticator as the [{}] is set to [{}].", authenticator::class.java.name, MessageQueueSettings.STORAGE_MEDIUM, messageQueueSettings.storageMedium)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package au.kilemon.messagequeue.configuration.cache.memcached

import au.kilemon.messagequeue.configuration.cache.redis.RedisConfiguration
import au.kilemon.messagequeue.logging.HasLogger
import au.kilemon.messagequeue.queue.cache.memcached.MemcachedCacheKeyManager
import au.kilemon.messagequeue.settings.MessageQueueSettings
import net.rubyeye.xmemcached.MemcachedClient
import net.rubyeye.xmemcached.XMemcachedClientBuilder
import org.slf4j.Logger
import org.springframework.beans.factory.annotation.Autowired
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisConnectionFactory
import org.springframework.data.redis.connection.RedisSentinelConfiguration
import org.springframework.data.redis.connection.RedisStandaloneConfiguration

/**
* A class that creates the required [Bean] objects when memcached is enabled.
*
* @author github.com/Kilemonn
*/
@Configuration
class MemcachedConfiguration: HasLogger
{
override val LOG: Logger = this.initialiseLogger()

companion object
{
const val MEMCACHED_DEFAULT_PORT: String = "11211"
}

@Autowired
private lateinit var messageQueueSettings: MessageQueueSettings

/**
* Create the [MemcachedClient] based on the loaded configuration.
*/
@Bean
@ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED")
fun getMemcachedClient(): MemcachedClient
{
LOG.info("Initialising memcached configuration with the following configuration: Endpoint(s) [{}]. With prefix [{}].",
messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix)

val builder = XMemcachedClientBuilder(RedisConfiguration.stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, MEMCACHED_DEFAULT_PORT))
return builder.build()
}

@Bean
@ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="MEMCACHED")
fun getMemcachedCacheKeyManager(): MemcachedCacheKeyManager
{
return MemcachedCacheKeyManager()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,11 @@ class RedisConfiguration: HasLogger

/**
* Create the [RedisConnectionFactory] based on the loaded configuration.
<<<<<<< HEAD
* If [MessageQueueSettings.redisMode] is `true` then multiple endpoints are expected in [MessageQueueSettings.redisEndpoint] and will attempt to be parsed out
=======
* If [MessageQueueSettings.redisUseSentinels] is `true` then multiple endpoints are expected in [MessageQueueSettings.cacheEndpoint] and will attempt to be parsed out
>>>>>>> 36e6084 (Work through adding memcached support. Implementing most methods the same way redis is implemented for now.)
* and set into the [RedisSentinelConfiguration].
*
* Otherwise, the first endpoint and port provided will be used to create a [RedisStandaloneConfiguration].
Expand Down Expand Up @@ -148,10 +152,11 @@ class RedisConfiguration: HasLogger
fun getSentinelConfiguration(): RedisSentinelConfiguration
{
LOG.info("Initialising redis sentinel configuration with the following configuration: Endpoints {}, master {}. With prefix {}.",
messageQueueSettings.redisEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.redisPrefix)
messageQueueSettings.cacheEndpoint, messageQueueSettings.redisMasterName, messageQueueSettings.cachePrefix)

val redisSentinelConfiguration = RedisSentinelConfiguration()
redisSentinelConfiguration.master(messageQueueSettings.redisMasterName)
val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_SENTINEL_DEFAULT_PORT)
val sentinelEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_SENTINEL_DEFAULT_PORT)

if (sentinelEndpoints.isEmpty())
{
Expand All @@ -173,9 +178,10 @@ class RedisConfiguration: HasLogger
fun getStandAloneConfiguration(): RedisStandaloneConfiguration
{
LOG.info("Initialising redis standalone configuration with the following configuration: Endpoint [{}], prefix [{}].",
messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix)
messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix)

val redisConfiguration = RedisStandaloneConfiguration()
val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.redisEndpoint, REDIS_DEFAULT_PORT)
val redisEndpoints = stringToInetSocketAddresses(messageQueueSettings.cacheEndpoint, REDIS_DEFAULT_PORT)
if (redisEndpoints.isEmpty())
{
LOG.error("No redis endpoints defined for standalone configuration. Unable to initialise redis configuration.")
Expand All @@ -194,10 +200,10 @@ class RedisConfiguration: HasLogger
fun getClusterConfiguration(): RedisClusterConfiguration
{
LOG.info("Initialising redis cluster configuration with the following configuration: Endpoint [{}], prefix [{}].",
messageQueueSettings.redisEndpoint, messageQueueSettings.redisPrefix)
messageQueueSettings.cacheEndpoint, messageQueueSettings.cachePrefix)

val configuration = RedisClusterConfiguration()
val nodes = endpointToNodes(messageQueueSettings.redisEndpoint)
val nodes = endpointToNodes(messageQueueSettings.cacheEndpoint)
configuration.setClusterNodes(nodes)

return configuration
Expand Down Expand Up @@ -259,7 +265,7 @@ class RedisConfiguration: HasLogger
return template
}

@Bean
@Bean(name=["RedisCacheKeyManagerTemplate"])
@ConditionalOnProperty(name=[MessageQueueSettings.STORAGE_MEDIUM], havingValue="REDIS")
fun getRedisCacheKeyManagerRedisTemplate(): RedisTemplate<String, String>
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ abstract class CacheKeyManager(protected val prefix: String = "")
const val CACHE_KEYS_KEY: String = "messagequeue-cache-keys"
}

protected fun getReservedKey(): String
{
return "$prefix$CACHE_KEYS_KEY"
}

fun getReservedKeys(): Set<String>
{
return setOf("$prefix$CACHE_KEYS_KEY")
return setOf(getReservedKey())
}

abstract fun add(key: String)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package au.kilemon.messagequeue.queue.cache.memcached

import au.kilemon.messagequeue.queue.cache.CacheKeyManager
import net.rubyeye.xmemcached.MemcachedClient
import org.springframework.beans.factory.annotation.Autowired

/**
* To optimise how we determine the key list/sub queue list when using a cache, this class is used to store and
* manage the subqueue list for the [MemcachedMultiQueue].
*
* @author github.com/Kilemonn
*/
class MemcachedCacheKeyManager: CacheKeyManager()
{
@Autowired
private lateinit var client: MemcachedClient

private fun persistUpdatedKeysSet(keys: HashSet<String>)
{
client.set(getReservedKey(), 0, keys)
}

override fun add(key: String)
{
val keys = getKeys()
keys.add(key)
persistUpdatedKeysSet(keys)
}

override fun remove(key: String)
{
val keys = getKeys()
if (keys.remove(key))
{
persistUpdatedKeysSet(keys)
}
}

override fun getKeys(): HashSet<String>
{
var keys: HashSet<String>? = client.get(getReservedKey())
if (keys == null)
{
keys = HashSet()
}
return keys
}

override fun contains(key: String): Boolean
{
return getKeys().contains(key)
}

override fun clear()
{
persistUpdatedKeysSet(HashSet())
}

}
Loading
Loading