From 75d3e2445b3dbe4bc4f93bad2c0a829ad034d79d Mon Sep 17 00:00:00 2001 From: Alice Lottini Date: Wed, 7 Dec 2022 18:05:49 +0000 Subject: [PATCH 01/10] Added PS limit test for cache and index maps --- proxy/pkg/zdmproxy/pscache_test.go | 160 +++++++++++++++++++++++++++++ 1 file changed, 160 insertions(+) create mode 100644 proxy/pkg/zdmproxy/pscache_test.go diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go new file mode 100644 index 00000000..2f6de6d4 --- /dev/null +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -0,0 +1,160 @@ +package zdmproxy + +import ( + "fmt" + "github.com/datastax/go-cassandra-native-protocol/message" + "github.com/stretchr/testify/require" + "testing" +) + +const MAX_PS_CACHE_SIZE_FOR_TESTS = 10 +const MAX_INTERCEPTED_PS_CACHE_SIZE_FOR_TESTS = 10 +const ORIGIN_ID_PREFIX = "originId_" +const TARGET_ID_PREFIX = "targetId_" + +func TestPreparedStatementCache_Store(t *testing.T) { + + tests := []struct{ + name string + numElementsToAdd int + elementSuffixesToAccess []int + expectedCacheSize int + expectedElementSuffixesInCache []int + }{ + { + name: "insert less elements than capacity, nothing accessed, nothing evicted", + numElementsToAdd: 9, + elementSuffixesToAccess: []int{}, + expectedCacheSize: 9, + expectedElementSuffixesInCache: []int{0,1,2,3,4,5,6,7,8}, + }, + { + name: "insert as many elements as capacity, nothing accessed, nothing evicted", + numElementsToAdd: 10, + elementSuffixesToAccess: []int{}, + expectedCacheSize: 10, + expectedElementSuffixesInCache: []int{0,1,2,3,4,5,6,7,8,9}, + }, + { + name: "insert more elements than capacity, nothing accessed, overflowing oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{}, + expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, + expectedElementSuffixesInCache: []int{3,4,5,6,7,8,9,10,11,12}, + }, + { + name: "insert more elements than capacity, only recent ones accessed, overflowing oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{5,7,9}, + expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, + expectedElementSuffixesInCache: []int{3,4,5,6,7,8,9,10,11,12}, + }, + { + name: "insert more elements than capacity, overflowing oldest ones accessed, non-accessed oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{0,2}, + expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, + expectedElementSuffixesInCache: []int{0,2,5,6,7,8,9,10,11,12}, + }, + { + name: "insert more elements than capacity, overflowing oldest and recent ones accessed, non-accessed oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{0,2,3,8}, + expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, + expectedElementSuffixesInCache: []int{0,2,3,6,7,8,9,10,11,12}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + psCache := NewPreparedStatementCache() + + if test.numElementsToAdd < MAX_PS_CACHE_SIZE_FOR_TESTS { + // no overflow or evictions, just insert all elements + for i := 0; i < test.numElementsToAdd; i++ { + originPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + } + targetPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + } + psCache.Store(originPreparedResult, targetPreparedResult, nil) + } + } else { + // fill the cache + for i := 0; i < MAX_PS_CACHE_SIZE_FOR_TESTS; i++ { + originPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + } + targetPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + } + psCache.Store(originPreparedResult, targetPreparedResult, nil) + } + + // access the specified elements + for elementSuffix := range test.elementSuffixesToAccess { + // access the specified elements to make them recently used + foundInOriginMap := checkIfElementIsInOriginMap(psCache, elementSuffix) + require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) + foundInTargetMap := checkIfElementIsInTargetMap(psCache, elementSuffix) + require.True(tt, foundInTargetMap, "element could not be found in target map", elementSuffix) + } + + // add more elements + for i := MAX_PS_CACHE_SIZE_FOR_TESTS; i < test.numElementsToAdd; i++ { + originPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + } + targetPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + } + psCache.Store(originPreparedResult, targetPreparedResult, nil) + } + } + + require.Equal(tt, test.expectedCacheSize, len(psCache.cache)) + require.Equal(tt, test.expectedCacheSize, len(psCache.index)) + require.Equal(tt, 0, len(psCache.interceptedCache)) + require.Equal(tt, float64(test.expectedCacheSize), psCache.GetPreparedStatementCacheSize()) + + for elementSuffix := range test.expectedElementSuffixesInCache { + foundInOriginMap := checkIfElementIsInOriginMap(psCache, elementSuffix) + require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) + foundInTargetMap := checkIfElementIsInTargetMap(psCache, elementSuffix) + require.True(tt, foundInTargetMap, "element could not be found in target map", elementSuffix) + } + + }) + } + +} + +func checkIfElementIsInOriginMap(psCache *PreparedStatementCache, elementSuffix int) bool { + originId := fmt.Sprint(ORIGIN_ID_PREFIX, elementSuffix) + _, foundOriginId := psCache.Get([]byte(originId)) + return foundOriginId +} + +func checkIfElementIsInTargetMap(psCache *PreparedStatementCache, elementSuffix int) bool { + targetId := fmt.Sprint(TARGET_ID_PREFIX, elementSuffix) + _, foundTargetId := psCache.GetByTargetPreparedId([]byte(targetId)) + return foundTargetId +} + +func TestPreparedStatementCache_StoreIntercepted(t *testing.T) { + +} + +func TestPreparedStatementCache_Get(t *testing.T) { + +} + +func TestPreparedStatementCache_GetByTargetPreparedId(t *testing.T) { + +} + +func TestPreparedStatementCache_GetPreparedStatementCacheSize(t *testing.T) { + +} + From 193eedf99e612535ca1d3874b2c5576ecafbcd07 Mon Sep 17 00:00:00 2001 From: Alice Lottini Date: Wed, 18 Jan 2023 15:39:01 +0000 Subject: [PATCH 02/10] Changed cache maps to lru.Cache. Introduced configuration parameter to set the max cache size. Fixed tests. # Conflicts: # go.mod # go.sum # proxy/pkg/config/config.go --- proxy/pkg/config/config.go | 11 +- proxy/pkg/zdmproxy/cqlparser_test.go | 16 +-- ..._utils_test.go => cqlparser_utils_test.go} | 8 +- proxy/pkg/zdmproxy/proxy.go | 5 +- proxy/pkg/zdmproxy/pscache.go | 61 +++++++--- proxy/pkg/zdmproxy/pscache_test.go | 113 +++++++++--------- 6 files changed, 123 insertions(+), 91 deletions(-) rename proxy/pkg/zdmproxy/{cqlparser_adv_workloads_utils_test.go => cqlparser_utils_test.go} (93%) diff --git a/proxy/pkg/config/config.go b/proxy/pkg/config/config.go index fe3cebdb..cde65e6f 100644 --- a/proxy/pkg/config/config.go +++ b/proxy/pkg/config/config.go @@ -74,11 +74,12 @@ type Config struct { // Proxy bucket - ProxyListenAddress string `default:"localhost" split_words:"true" yaml:"proxy_listen_address"` - ProxyListenPort int `default:"14002" split_words:"true" yaml:"proxy_listen_port"` - ProxyRequestTimeoutMs int `default:"10000" split_words:"true" yaml:"proxy_request_timeout_ms"` - ProxyMaxClientConnections int `default:"1000" split_words:"true" yaml:"proxy_max_client_connections"` - ProxyMaxStreamIds int `default:"2048" split_words:"true" yaml:"proxy_max_stream_ids"` + ProxyListenAddress string `default:"localhost" split_words:"true" yaml:"proxy_listen_address"` + ProxyListenPort int `default:"14002" split_words:"true" yaml:"proxy_listen_port"` + ProxyRequestTimeoutMs int `default:"10000" split_words:"true" yaml:"proxy_request_timeout_ms"` + ProxyMaxClientConnections int `default:"1000" split_words:"true" yaml:"proxy_max_client_connections"` + ProxyMaxStreamIds int `default:"2048" split_words:"true" yaml:"proxy_max_stream_ids"` + ProxyMaxPreparedStatementCacheSize int `default:"5000" split_words:"true" yaml:"proxy_max_prepared_statement_cache_size"` ProxyTlsCaPath string `split_words:"true" yaml:"proxy_tls_ca_path"` ProxyTlsCertPath string `split_words:"true" yaml:"proxy_tls_cert_path"` diff --git a/proxy/pkg/zdmproxy/cqlparser_test.go b/proxy/pkg/zdmproxy/cqlparser_test.go index 1637e7d7..fb00b754 100644 --- a/proxy/pkg/zdmproxy/cqlparser_test.go +++ b/proxy/pkg/zdmproxy/cqlparser_test.go @@ -57,14 +57,14 @@ func TestInspectFrame(t *testing.T) { targetPreparedId: []byte("LOCAL"), prepareRequestInfo: NewPrepareRequestInfo(NewInterceptedRequestInfo(local, newStarSelectClause()), nil, false, "SELECT * FROM system.local", ""), } - psCache := NewPreparedStatementCache() - psCache.cache["BOTH"] = bothCacheEntry - psCache.cache["ORIGIN"] = originCacheEntry - psCache.cache["TARGET"] = targetCacheEntry - psCache.interceptedCache["PEERS"] = peersCacheEntry - psCache.interceptedCache["PEERS_KS"] = peersKsCacheEntry - psCache.interceptedCache["LOCAL"] = localCacheEntry - psCache.interceptedCache["LOCAL_KS"] = localKsCacheEntry + psCache := createPSCacheForTests(t) + psCache.cache.Add("BOTH", bothCacheEntry) + psCache.cache.Add("ORIGIN", originCacheEntry) + psCache.cache.Add("TARGET", targetCacheEntry) + psCache.interceptedCache.Add("PEERS", peersCacheEntry) + psCache.interceptedCache.Add("PEERS_KS", peersKsCacheEntry) + psCache.interceptedCache.Add("LOCAL", localCacheEntry) + psCache.interceptedCache.Add("LOCAL_KS", localKsCacheEntry) mh := newFakeMetricHandler() km := "" primaryClusterTarget := common.ClusterTypeTarget diff --git a/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go b/proxy/pkg/zdmproxy/cqlparser_utils_test.go similarity index 93% rename from proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go rename to proxy/pkg/zdmproxy/cqlparser_utils_test.go index 5de43327..9c38dd99 100644 --- a/proxy/pkg/zdmproxy/cqlparser_adv_workloads_utils_test.go +++ b/proxy/pkg/zdmproxy/cqlparser_utils_test.go @@ -27,7 +27,7 @@ func getGeneralParamsForTests(t *testing.T) params { require.Nil(t, err) return params{ - psCache: NewPreparedStatementCache(), + psCache: createPSCacheForTests(t), mh: newFakeMetricHandler(), kn: "", primaryCluster: common.ClusterTypeOrigin, @@ -38,6 +38,12 @@ func getGeneralParamsForTests(t *testing.T) params { } } +func createPSCacheForTests(t *testing.T) *PreparedStatementCache { + psCache, err := NewPreparedStatementCache(1000) + require.Nil(t, err) + return psCache +} + func buildQueryMessageForTests(queryString string) *message.Query { var defaultTimestamp int64 = 1647023221311969 var serialConsistency = primitive.ConsistencyLevelLocalSerial diff --git a/proxy/pkg/zdmproxy/proxy.go b/proxy/pkg/zdmproxy/proxy.go index ffd4f1c2..39edbb1d 100644 --- a/proxy/pkg/zdmproxy/proxy.go +++ b/proxy/pkg/zdmproxy/proxy.go @@ -391,7 +391,10 @@ func (p *ZdmProxy) initializeGlobalStructures() error { p.globalClientHandlersWg = &sync.WaitGroup{} p.clientHandlersShutdownRequestCtx, p.clientHandlersShutdownRequestCancelFn = context.WithCancel(context.Background()) - p.PreparedStatementCache = NewPreparedStatementCache() + p.PreparedStatementCache, err = NewPreparedStatementCache(p.Conf.ProxyMaxPreparedStatementCacheSize) + if err != nil { + return err + } p.controlConnShutdownCtx, p.controlConnCancelFn = context.WithCancel(context.Background()) p.controlConnShutdownWg = &sync.WaitGroup{} diff --git a/proxy/pkg/zdmproxy/pscache.go b/proxy/pkg/zdmproxy/pscache.go index b3233431..01e56524 100644 --- a/proxy/pkg/zdmproxy/pscache.go +++ b/proxy/pkg/zdmproxy/pscache.go @@ -4,33 +4,50 @@ import ( "encoding/hex" "fmt" "github.com/datastax/go-cassandra-native-protocol/message" + lru "github.com/hashicorp/golang-lru" log "github.com/sirupsen/logrus" "sync" ) type PreparedStatementCache struct { - cache map[string]PreparedData // Map containing the prepared queries (raw bytes) keyed on prepareId - index map[string]string // Map that can be used as an index to look up origin prepareIds by target prepareId + cache *lru.Cache // Map containing the prepared queries (raw bytes) keyed on prepareId + index *lru.Cache // Map that can be used as an index to look up origin prepareIds by target prepareId - interceptedCache map[string]PreparedData // Map containing the prepared queries for intercepted requests + interceptedCache *lru.Cache // Map containing the prepared queries for intercepted requests lock *sync.RWMutex } -func NewPreparedStatementCache() *PreparedStatementCache { +func NewPreparedStatementCache(maxSize int) (*PreparedStatementCache, error) { + cache, err := lru.New(maxSize) + if err != nil { + return nil, fmt.Errorf("error initializing the PreparedStatementCache cache map: %v", err) + } + + index, err := lru.New(maxSize) + if err != nil { + return nil, fmt.Errorf("error initializing the PreparedStatementCache index map: %v", err) + } + + interceptedCache, err := lru.New(maxSize) + if err != nil { + return nil, fmt.Errorf("error initializing the PreparedStatementCache interceptedCache map: %v", err) + } + return &PreparedStatementCache{ - cache: make(map[string]PreparedData), - index: make(map[string]string), - interceptedCache: make(map[string]PreparedData), + cache: cache, + index: index, + interceptedCache: interceptedCache, lock: &sync.RWMutex{}, - } + }, nil } func (psc PreparedStatementCache) GetPreparedStatementCacheSize() float64 { psc.lock.RLock() defer psc.lock.RUnlock() - return float64(len(psc.cache) + len(psc.interceptedCache)) + //return float64(len(psc.cache) + len(psc.interceptedCache)) + return float64(psc.cache.Len() + psc.interceptedCache.Len()) } func (psc *PreparedStatementCache) Store( @@ -42,8 +59,8 @@ func (psc *PreparedStatementCache) Store( psc.lock.Lock() defer psc.lock.Unlock() - psc.cache[originPrepareIdStr] = NewPreparedData(originPreparedResult, targetPreparedResult, prepareRequestInfo) - psc.index[targetPrepareIdStr] = originPrepareIdStr + psc.cache.Add(originPrepareIdStr, NewPreparedData(originPreparedResult, targetPreparedResult, prepareRequestInfo)) + psc.index.Add(targetPrepareIdStr, originPrepareIdStr) log.Debugf("Storing PS cache entry: {OriginPreparedId=%v, TargetPreparedId: %v, RequestInfo: %v}", hex.EncodeToString(originPreparedResult.PreparedQueryId), hex.EncodeToString(targetPreparedResult.PreparedQueryId), prepareRequestInfo) @@ -55,7 +72,7 @@ func (psc *PreparedStatementCache) StoreIntercepted(preparedResult *message.Prep defer psc.lock.Unlock() preparedData := NewPreparedData(preparedResult, preparedResult, prepareRequestInfo) - psc.interceptedCache[prepareIdStr] = preparedData + psc.interceptedCache.Add(prepareIdStr, preparedData) log.Debugf("Storing intercepted PS cache entry: {PreparedId=%v, RequestInfo: %v}", hex.EncodeToString(preparedResult.PreparedQueryId), prepareRequestInfo) @@ -64,31 +81,37 @@ func (psc *PreparedStatementCache) StoreIntercepted(preparedResult *message.Prep func (psc *PreparedStatementCache) Get(originPreparedId []byte) (PreparedData, bool) { psc.lock.RLock() defer psc.lock.RUnlock() - data, ok := psc.cache[string(originPreparedId)] - if !ok { - data, ok = psc.interceptedCache[string(originPreparedId)] + data, ok := psc.cache.Get(string(originPreparedId)) + if ok { + return data.(PreparedData), true } - return data, ok + + data, ok = psc.interceptedCache.Get(string(originPreparedId)) + if ok { + return data.(PreparedData), true + } + + return nil, false } func (psc *PreparedStatementCache) GetByTargetPreparedId(targetPreparedId []byte) (PreparedData, bool) { psc.lock.RLock() defer psc.lock.RUnlock() - originPreparedId, ok := psc.index[string(targetPreparedId)] + originPreparedId, ok := psc.index.Get(string(targetPreparedId)) if !ok { // Don't bother attempting a lookup on the intercepted cache because this method should only be used to handle UNPREPARED responses return nil, false } - data, ok := psc.cache[originPreparedId] + data, ok := psc.cache.Get(originPreparedId) if !ok { log.Errorf("Could not get prepared data by target id even though there is an entry on the index map. "+ "This is most likely a bug. OriginPreparedId = %v, TargetPreparedId = %v", originPreparedId, targetPreparedId) return nil, false } - return data, true + return data.(PreparedData), true } type PreparedData interface { diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go index 2f6de6d4..bf16d7c1 100644 --- a/proxy/pkg/zdmproxy/pscache_test.go +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -7,93 +7,93 @@ import ( "testing" ) -const MAX_PS_CACHE_SIZE_FOR_TESTS = 10 -const MAX_INTERCEPTED_PS_CACHE_SIZE_FOR_TESTS = 10 -const ORIGIN_ID_PREFIX = "originId_" -const TARGET_ID_PREFIX = "targetId_" +const MaxPSCacheSizeForTests = 10 +const OriginIdPrefix = "originId_" +const TargetIdPrefix = "targetId_" func TestPreparedStatementCache_Store(t *testing.T) { - tests := []struct{ - name string - numElementsToAdd int - elementSuffixesToAccess []int - expectedCacheSize int + tests := []struct { + name string + numElementsToAdd int + elementSuffixesToAccess []int + expectedCacheSize int expectedElementSuffixesInCache []int }{ { - name: "insert less elements than capacity, nothing accessed, nothing evicted", - numElementsToAdd: 9, - elementSuffixesToAccess: []int{}, - expectedCacheSize: 9, - expectedElementSuffixesInCache: []int{0,1,2,3,4,5,6,7,8}, + name: "insert less elements than capacity, nothing accessed, nothing evicted", + numElementsToAdd: 9, + elementSuffixesToAccess: []int{}, + expectedCacheSize: 9, + expectedElementSuffixesInCache: []int{0, 1, 2, 3, 4, 5, 6, 7, 8}, }, { - name: "insert as many elements as capacity, nothing accessed, nothing evicted", - numElementsToAdd: 10, - elementSuffixesToAccess: []int{}, - expectedCacheSize: 10, - expectedElementSuffixesInCache: []int{0,1,2,3,4,5,6,7,8,9}, + name: "insert as many elements as capacity, nothing accessed, nothing evicted", + numElementsToAdd: 10, + elementSuffixesToAccess: []int{}, + expectedCacheSize: 10, + expectedElementSuffixesInCache: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, }, { - name: "insert more elements than capacity, nothing accessed, overflowing oldest ones should be evicted", - numElementsToAdd: 13, - elementSuffixesToAccess: []int{}, - expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, - expectedElementSuffixesInCache: []int{3,4,5,6,7,8,9,10,11,12}, + name: "insert more elements than capacity, nothing accessed, overflowing oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{}, + expectedCacheSize: MaxPSCacheSizeForTests, + expectedElementSuffixesInCache: []int{3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, }, { - name: "insert more elements than capacity, only recent ones accessed, overflowing oldest ones should be evicted", - numElementsToAdd: 13, - elementSuffixesToAccess: []int{5,7,9}, - expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, - expectedElementSuffixesInCache: []int{3,4,5,6,7,8,9,10,11,12}, + name: "insert more elements than capacity, only recent ones accessed, overflowing oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{5, 7, 9}, + expectedCacheSize: MaxPSCacheSizeForTests, + expectedElementSuffixesInCache: []int{3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, }, { - name: "insert more elements than capacity, overflowing oldest ones accessed, non-accessed oldest ones should be evicted", - numElementsToAdd: 13, - elementSuffixesToAccess: []int{0,2}, - expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, - expectedElementSuffixesInCache: []int{0,2,5,6,7,8,9,10,11,12}, + name: "insert more elements than capacity, overflowing oldest ones accessed, non-accessed oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{0, 2}, + expectedCacheSize: MaxPSCacheSizeForTests, + expectedElementSuffixesInCache: []int{0, 2, 5, 6, 7, 8, 9, 10, 11, 12}, }, { - name: "insert more elements than capacity, overflowing oldest and recent ones accessed, non-accessed oldest ones should be evicted", - numElementsToAdd: 13, - elementSuffixesToAccess: []int{0,2,3,8}, - expectedCacheSize: MAX_PS_CACHE_SIZE_FOR_TESTS, - expectedElementSuffixesInCache: []int{0,2,3,6,7,8,9,10,11,12}, + name: "insert more elements than capacity, overflowing oldest and recent ones accessed, non-accessed oldest ones should be evicted", + numElementsToAdd: 13, + elementSuffixesToAccess: []int{0, 2, 3, 8}, + expectedCacheSize: MaxPSCacheSizeForTests, + expectedElementSuffixesInCache: []int{0, 2, 3, 6, 7, 8, 9, 10, 11, 12}, }, } for _, test := range tests { t.Run(test.name, func(tt *testing.T) { - psCache := NewPreparedStatementCache() + psCache, err := NewPreparedStatementCache(MaxPSCacheSizeForTests) + require.Nil(tt, err, "Error creating the PSCache", err) - if test.numElementsToAdd < MAX_PS_CACHE_SIZE_FOR_TESTS { + if test.numElementsToAdd < MaxPSCacheSizeForTests { // no overflow or evictions, just insert all elements for i := 0; i < test.numElementsToAdd; i++ { originPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(OriginIdPrefix, i)), } targetPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } psCache.Store(originPreparedResult, targetPreparedResult, nil) } } else { // fill the cache - for i := 0; i < MAX_PS_CACHE_SIZE_FOR_TESTS; i++ { + for i := 0; i < MaxPSCacheSizeForTests; i++ { originPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(OriginIdPrefix, i)), } targetPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } psCache.Store(originPreparedResult, targetPreparedResult, nil) } // access the specified elements - for elementSuffix := range test.elementSuffixesToAccess { + for _, elementSuffix := range test.elementSuffixesToAccess { // access the specified elements to make them recently used foundInOriginMap := checkIfElementIsInOriginMap(psCache, elementSuffix) require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) @@ -102,23 +102,23 @@ func TestPreparedStatementCache_Store(t *testing.T) { } // add more elements - for i := MAX_PS_CACHE_SIZE_FOR_TESTS; i < test.numElementsToAdd; i++ { + for i := MaxPSCacheSizeForTests; i < test.numElementsToAdd; i++ { originPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(ORIGIN_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(OriginIdPrefix, i)), } targetPreparedResult := &message.PreparedResult{ - PreparedQueryId: []byte(fmt.Sprint(TARGET_ID_PREFIX, i)), + PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } psCache.Store(originPreparedResult, targetPreparedResult, nil) } } - require.Equal(tt, test.expectedCacheSize, len(psCache.cache)) - require.Equal(tt, test.expectedCacheSize, len(psCache.index)) - require.Equal(tt, 0, len(psCache.interceptedCache)) + require.Equal(tt, test.expectedCacheSize, psCache.cache.Len()) + require.Equal(tt, test.expectedCacheSize, psCache.index.Len()) + require.Equal(tt, 0, psCache.interceptedCache.Len()) require.Equal(tt, float64(test.expectedCacheSize), psCache.GetPreparedStatementCacheSize()) - for elementSuffix := range test.expectedElementSuffixesInCache { + for _, elementSuffix := range test.expectedElementSuffixesInCache { foundInOriginMap := checkIfElementIsInOriginMap(psCache, elementSuffix) require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) foundInTargetMap := checkIfElementIsInTargetMap(psCache, elementSuffix) @@ -131,13 +131,13 @@ func TestPreparedStatementCache_Store(t *testing.T) { } func checkIfElementIsInOriginMap(psCache *PreparedStatementCache, elementSuffix int) bool { - originId := fmt.Sprint(ORIGIN_ID_PREFIX, elementSuffix) + originId := fmt.Sprint(OriginIdPrefix, elementSuffix) _, foundOriginId := psCache.Get([]byte(originId)) return foundOriginId } func checkIfElementIsInTargetMap(psCache *PreparedStatementCache, elementSuffix int) bool { - targetId := fmt.Sprint(TARGET_ID_PREFIX, elementSuffix) + targetId := fmt.Sprint(TargetIdPrefix, elementSuffix) _, foundTargetId := psCache.GetByTargetPreparedId([]byte(targetId)) return foundTargetId } @@ -157,4 +157,3 @@ func TestPreparedStatementCache_GetByTargetPreparedId(t *testing.T) { func TestPreparedStatementCache_GetPreparedStatementCacheSize(t *testing.T) { } - From 7a072fa5d9fa8e3700328ef18f0046c88772aa70 Mon Sep 17 00:00:00 2001 From: Alice Lottini Date: Wed, 18 Jan 2023 18:35:07 +0000 Subject: [PATCH 03/10] Added default for new configuration parameter in test cluster setup --- integration-tests/setup/testcluster.go | 1 + 1 file changed, 1 insertion(+) diff --git a/integration-tests/setup/testcluster.go b/integration-tests/setup/testcluster.go index b7336ecd..c266a19d 100644 --- a/integration-tests/setup/testcluster.go +++ b/integration-tests/setup/testcluster.go @@ -448,6 +448,7 @@ func NewTestConfig(originHost string, targetHost string) *config.Config { conf.ProxyMaxClientConnections = 1000 conf.ProxyMaxStreamIds = 2048 + conf.ProxyMaxPreparedStatementCacheSize = 5000 conf.RequestResponseMaxWorkers = -1 conf.WriteMaxWorkers = -1 From 77995c899121d6c6bc0baefba5a6c6e79670b9d6 Mon Sep 17 00:00:00 2001 From: Alice Lottini Date: Thu, 19 Jan 2023 13:32:18 +0000 Subject: [PATCH 04/10] Added test cases for the Get methods --- proxy/pkg/zdmproxy/pscache_test.go | 161 ++++++++++++++++++++++++----- 1 file changed, 137 insertions(+), 24 deletions(-) diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go index bf16d7c1..d55f3051 100644 --- a/proxy/pkg/zdmproxy/pscache_test.go +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -10,60 +10,77 @@ import ( const MaxPSCacheSizeForTests = 10 const OriginIdPrefix = "originId_" const TargetIdPrefix = "targetId_" +const InterceptedIdPrefix = "interceptedId_" -func TestPreparedStatementCache_Store(t *testing.T) { +type CacheMapType string + +const ( + CacheMapTypeOrigin = CacheMapType("CACHE-ORIGIN") + CacheMapTypeTarget = CacheMapType("INDEX-TARGET") + CacheMapTypeIntercepted = CacheMapType("INTERCEPTED") + CacheMapTypeNone = CacheMapType("NONE") +) + +/* +* +This test has the sole purpose to verify that all three cache maps ("cache", "index" and "intercepted") honour the configured size limit and behave in an LRU fashion +It does not represent a realistic usage of the PS Cache: elements are added to all three cache maps, which would not normally happen, and the data inserted in it is intentionally dummy +*/ +func TestPreparedStatementCache_StoreIntoAllCacheMaps(t *testing.T) { tests := []struct { name string numElementsToAdd int elementSuffixesToAccess []int - expectedCacheSize int + expectedCacheMapSize int expectedElementSuffixesInCache []int }{ { name: "insert less elements than capacity, nothing accessed, nothing evicted", numElementsToAdd: 9, elementSuffixesToAccess: []int{}, - expectedCacheSize: 9, + expectedCacheMapSize: 9, expectedElementSuffixesInCache: []int{0, 1, 2, 3, 4, 5, 6, 7, 8}, }, { name: "insert as many elements as capacity, nothing accessed, nothing evicted", numElementsToAdd: 10, elementSuffixesToAccess: []int{}, - expectedCacheSize: 10, + expectedCacheMapSize: 10, expectedElementSuffixesInCache: []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, }, { name: "insert more elements than capacity, nothing accessed, overflowing oldest ones should be evicted", numElementsToAdd: 13, elementSuffixesToAccess: []int{}, - expectedCacheSize: MaxPSCacheSizeForTests, + expectedCacheMapSize: MaxPSCacheSizeForTests, expectedElementSuffixesInCache: []int{3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, }, { name: "insert more elements than capacity, only recent ones accessed, overflowing oldest ones should be evicted", numElementsToAdd: 13, elementSuffixesToAccess: []int{5, 7, 9}, - expectedCacheSize: MaxPSCacheSizeForTests, + expectedCacheMapSize: MaxPSCacheSizeForTests, expectedElementSuffixesInCache: []int{3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, }, { name: "insert more elements than capacity, overflowing oldest ones accessed, non-accessed oldest ones should be evicted", numElementsToAdd: 13, elementSuffixesToAccess: []int{0, 2}, - expectedCacheSize: MaxPSCacheSizeForTests, + expectedCacheMapSize: MaxPSCacheSizeForTests, expectedElementSuffixesInCache: []int{0, 2, 5, 6, 7, 8, 9, 10, 11, 12}, }, { name: "insert more elements than capacity, overflowing oldest and recent ones accessed, non-accessed oldest ones should be evicted", numElementsToAdd: 13, elementSuffixesToAccess: []int{0, 2, 3, 8}, - expectedCacheSize: MaxPSCacheSizeForTests, + expectedCacheMapSize: MaxPSCacheSizeForTests, expectedElementSuffixesInCache: []int{0, 2, 3, 6, 7, 8, 9, 10, 11, 12}, }, } + dummyPrepareRequestInfo := NewPrepareRequestInfo(NewGenericRequestInfo(forwardToBoth, false, false), []*term{}, false, "", "") + for _, test := range tests { t.Run(test.name, func(tt *testing.T) { psCache, err := NewPreparedStatementCache(MaxPSCacheSizeForTests) @@ -78,7 +95,12 @@ func TestPreparedStatementCache_Store(t *testing.T) { targetPreparedResult := &message.PreparedResult{ PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } - psCache.Store(originPreparedResult, targetPreparedResult, nil) + psCache.Store(originPreparedResult, targetPreparedResult, dummyPrepareRequestInfo) + + interceptedPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(InterceptedIdPrefix, i)), + } + psCache.StoreIntercepted(interceptedPreparedResult, dummyPrepareRequestInfo) } } else { // fill the cache @@ -89,7 +111,12 @@ func TestPreparedStatementCache_Store(t *testing.T) { targetPreparedResult := &message.PreparedResult{ PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } - psCache.Store(originPreparedResult, targetPreparedResult, nil) + psCache.Store(originPreparedResult, targetPreparedResult, dummyPrepareRequestInfo) + + interceptedPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(InterceptedIdPrefix, i)), + } + psCache.StoreIntercepted(interceptedPreparedResult, dummyPrepareRequestInfo) } // access the specified elements @@ -99,6 +126,8 @@ func TestPreparedStatementCache_Store(t *testing.T) { require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) foundInTargetMap := checkIfElementIsInTargetMap(psCache, elementSuffix) require.True(tt, foundInTargetMap, "element could not be found in target map", elementSuffix) + foundInInterceptedMap := checkIfElementIsInInterceptedMap(psCache, elementSuffix) + require.True(tt, foundInInterceptedMap, "element could not be found in intercepted map", elementSuffix) } // add more elements @@ -109,20 +138,27 @@ func TestPreparedStatementCache_Store(t *testing.T) { targetPreparedResult := &message.PreparedResult{ PreparedQueryId: []byte(fmt.Sprint(TargetIdPrefix, i)), } - psCache.Store(originPreparedResult, targetPreparedResult, nil) + psCache.Store(originPreparedResult, targetPreparedResult, dummyPrepareRequestInfo) + + interceptedPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte(fmt.Sprint(InterceptedIdPrefix, i)), + } + psCache.StoreIntercepted(interceptedPreparedResult, dummyPrepareRequestInfo) } } - require.Equal(tt, test.expectedCacheSize, psCache.cache.Len()) - require.Equal(tt, test.expectedCacheSize, psCache.index.Len()) - require.Equal(tt, 0, psCache.interceptedCache.Len()) - require.Equal(tt, float64(test.expectedCacheSize), psCache.GetPreparedStatementCacheSize()) + require.Equal(tt, test.expectedCacheMapSize, psCache.cache.Len()) + require.Equal(tt, test.expectedCacheMapSize, psCache.index.Len()) + require.Equal(tt, test.expectedCacheMapSize, psCache.interceptedCache.Len()) + require.Equal(tt, float64(test.expectedCacheMapSize*2), psCache.GetPreparedStatementCacheSize()) for _, elementSuffix := range test.expectedElementSuffixesInCache { foundInOriginMap := checkIfElementIsInOriginMap(psCache, elementSuffix) require.True(tt, foundInOriginMap, "element could not be found in origin map", elementSuffix) foundInTargetMap := checkIfElementIsInTargetMap(psCache, elementSuffix) require.True(tt, foundInTargetMap, "element could not be found in target map", elementSuffix) + foundInInterceptedMap := checkIfElementIsInInterceptedMap(psCache, elementSuffix) + require.True(tt, foundInInterceptedMap, "element could not be found in intercepted map", elementSuffix) } }) @@ -132,28 +168,105 @@ func TestPreparedStatementCache_Store(t *testing.T) { func checkIfElementIsInOriginMap(psCache *PreparedStatementCache, elementSuffix int) bool { originId := fmt.Sprint(OriginIdPrefix, elementSuffix) - _, foundOriginId := psCache.Get([]byte(originId)) + // not using psCache.Get, which is tested separately + _, foundOriginId := psCache.cache.Get(originId) return foundOriginId } func checkIfElementIsInTargetMap(psCache *PreparedStatementCache, elementSuffix int) bool { targetId := fmt.Sprint(TargetIdPrefix, elementSuffix) - _, foundTargetId := psCache.GetByTargetPreparedId([]byte(targetId)) + // not using psCache.GetByTargetPreparedId, which is tested separately + _, foundTargetId := psCache.index.Get(targetId) return foundTargetId } -func TestPreparedStatementCache_StoreIntercepted(t *testing.T) { - +func checkIfElementIsInInterceptedMap(psCache *PreparedStatementCache, elementSuffix int) bool { + interceptedId := fmt.Sprint(InterceptedIdPrefix, elementSuffix) + // not using psCache.Get, which is tested separately + _, foundInterceptedId := psCache.interceptedCache.Get(interceptedId) + return foundInterceptedId } -func TestPreparedStatementCache_Get(t *testing.T) { +/** +This test focuses on ensuring that Get and GetByTargetPreparedId work correctly. +It inserts elements directly into the cache maps to avoid coupling this test to the logic in the PS Cache's store methods. +It uses dummy, non-realistic data. + */ +func TestPreparedStatementCache_GetFromCache(t *testing.T) { -} + tests := []struct { + name string + elementId string + cacheMapType CacheMapType + }{ + { + name: "Add to origin cache map, found by Get", + elementId: "someOriginId", + cacheMapType: CacheMapTypeOrigin, + }, + { + name: "Add to target cache map, found by GetByTargetId", + elementId: "someTargetId", + cacheMapType: CacheMapTypeTarget, + }, + { + name: "Add to intercepted cache map, found by Get", + elementId: "someInterceptedId", + cacheMapType: CacheMapTypeTarget, + }, + { + name: "Not added, not found", + elementId: "someElementId", + cacheMapType: CacheMapTypeNone, + }, + } -func TestPreparedStatementCache_GetByTargetPreparedId(t *testing.T) { + dummyPreparedResult := &message.PreparedResult{ + PreparedQueryId: []byte("dummy"), + } + dummyPreparedData := NewPreparedData(dummyPreparedResult, dummyPreparedResult, NewPrepareRequestInfo(NewGenericRequestInfo(forwardToBoth, false, false), []*term{}, false, "", "")) -} + for _, test := range tests { + t.Run(test.name, func(tt *testing.T) { + psCache, err := NewPreparedStatementCache(MaxPSCacheSizeForTests) + require.Nil(tt, err, "Error creating the PSCache", err) + + switch test.cacheMapType { + case CacheMapTypeOrigin: + psCache.cache.Add(test.elementId, dummyPreparedData) -func TestPreparedStatementCache_GetPreparedStatementCacheSize(t *testing.T) { + _, foundByGet := psCache.Get([]byte(test.elementId)) + require.True(tt, foundByGet) + + _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) + require.False(tt, foundByGetByTargetPreparedId) + case CacheMapTypeTarget: + psCache.index.Add(test.elementId, "origin_" + test.elementId) + psCache.cache.Add("origin_" + test.elementId, dummyPreparedData) + + _, foundByGet := psCache.Get([]byte(test.elementId)) + require.False(tt, foundByGet) + + _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) + require.True(tt, foundByGetByTargetPreparedId) + case CacheMapTypeIntercepted: + psCache.interceptedCache.Add(test.elementId, dummyPreparedData) + + _, foundByGet := psCache.Get([]byte(test.elementId)) + require.True(tt, foundByGet) + + _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) + require.False(tt, foundByGetByTargetPreparedId) + case CacheMapTypeNone: + _, foundByGet := psCache.Get([]byte(test.elementId)) + require.False(tt, foundByGet) + + _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) + require.False(tt, foundByGetByTargetPreparedId) + default: + t.Fatal("Unknown or missing cache map type") + } + }) + } } From 4eed655681746e19855b1a621dff7f175f5014cd Mon Sep 17 00:00:00 2001 From: Auto Gofmt Date: Fri, 20 Jan 2023 13:48:19 +0000 Subject: [PATCH 05/10] Automated gofmt changes --- proxy/pkg/zdmproxy/pscache_test.go | 36 +++++++++++++++--------------- 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go index d55f3051..5cff25dc 100644 --- a/proxy/pkg/zdmproxy/pscache_test.go +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -15,10 +15,10 @@ const InterceptedIdPrefix = "interceptedId_" type CacheMapType string const ( - CacheMapTypeOrigin = CacheMapType("CACHE-ORIGIN") - CacheMapTypeTarget = CacheMapType("INDEX-TARGET") - CacheMapTypeIntercepted = CacheMapType("INTERCEPTED") - CacheMapTypeNone = CacheMapType("NONE") + CacheMapTypeOrigin = CacheMapType("CACHE-ORIGIN") + CacheMapTypeTarget = CacheMapType("INDEX-TARGET") + CacheMapTypeIntercepted = CacheMapType("INTERCEPTED") + CacheMapTypeNone = CacheMapType("NONE") ) /* @@ -191,32 +191,32 @@ func checkIfElementIsInInterceptedMap(psCache *PreparedStatementCache, elementSu This test focuses on ensuring that Get and GetByTargetPreparedId work correctly. It inserts elements directly into the cache maps to avoid coupling this test to the logic in the PS Cache's store methods. It uses dummy, non-realistic data. - */ +*/ func TestPreparedStatementCache_GetFromCache(t *testing.T) { tests := []struct { - name string - elementId string - cacheMapType CacheMapType + name string + elementId string + cacheMapType CacheMapType }{ { - name: "Add to origin cache map, found by Get", - elementId: "someOriginId", + name: "Add to origin cache map, found by Get", + elementId: "someOriginId", cacheMapType: CacheMapTypeOrigin, }, { - name: "Add to target cache map, found by GetByTargetId", - elementId: "someTargetId", + name: "Add to target cache map, found by GetByTargetId", + elementId: "someTargetId", cacheMapType: CacheMapTypeTarget, }, { - name: "Add to intercepted cache map, found by Get", - elementId: "someInterceptedId", + name: "Add to intercepted cache map, found by Get", + elementId: "someInterceptedId", cacheMapType: CacheMapTypeTarget, }, { - name: "Not added, not found", - elementId: "someElementId", + name: "Not added, not found", + elementId: "someElementId", cacheMapType: CacheMapTypeNone, }, } @@ -241,8 +241,8 @@ func TestPreparedStatementCache_GetFromCache(t *testing.T) { _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) require.False(tt, foundByGetByTargetPreparedId) case CacheMapTypeTarget: - psCache.index.Add(test.elementId, "origin_" + test.elementId) - psCache.cache.Add("origin_" + test.elementId, dummyPreparedData) + psCache.index.Add(test.elementId, "origin_"+test.elementId) + psCache.cache.Add("origin_"+test.elementId, dummyPreparedData) _, foundByGet := psCache.Get([]byte(test.elementId)) require.False(tt, foundByGet) From 0103904d1496f934cc781199f2aaca3f862ae4b2 Mon Sep 17 00:00:00 2001 From: Alice Lottini Date: Mon, 6 Mar 2023 18:08:12 +0000 Subject: [PATCH 06/10] Changes to use LRU cache instead of map --- go.mod | 1 + proxy/pkg/zdmproxy/pscache.go | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/go.mod b/go.mod index b60722df..a5187b22 100644 --- a/go.mod +++ b/go.mod @@ -7,6 +7,7 @@ require ( github.com/apache/cassandra-gocql-driver/v2 v2.0.0 github.com/datastax/go-cassandra-native-protocol v0.0.0-20260130100129-9d5b43677a33 github.com/google/uuid v1.1.1 + github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/mcuadros/go-defaults v1.2.0 diff --git a/proxy/pkg/zdmproxy/pscache.go b/proxy/pkg/zdmproxy/pscache.go index 01e56524..aa38aaf8 100644 --- a/proxy/pkg/zdmproxy/pscache.go +++ b/proxy/pkg/zdmproxy/pscache.go @@ -46,7 +46,6 @@ func (psc PreparedStatementCache) GetPreparedStatementCacheSize() float64 { psc.lock.RLock() defer psc.lock.RUnlock() - //return float64(len(psc.cache) + len(psc.interceptedCache)) return float64(psc.cache.Len() + psc.interceptedCache.Len()) } From 56e8652f6a645d6981d3749f34a505e55e1d6eef Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Reis?= Date: Tue, 21 Apr 2026 15:00:06 +0100 Subject: [PATCH 07/10] rebase --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index a5187b22..3ed59d1f 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/apache/cassandra-gocql-driver/v2 v2.0.0 github.com/datastax/go-cassandra-native-protocol v0.0.0-20260130100129-9d5b43677a33 github.com/google/uuid v1.1.1 - github.com/hashicorp/golang-lru v0.5.4 // indirect + github.com/hashicorp/golang-lru v0.5.4 github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/mcuadros/go-defaults v1.2.0 diff --git a/go.sum b/go.sum index 300423c9..643f2818 100644 --- a/go.sum +++ b/go.sum @@ -54,6 +54,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= +github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= From a21c544485dcb5346b1b7e6630faeb511f180374 Mon Sep 17 00:00:00 2001 From: Auto Gofmt Date: Tue, 21 Apr 2026 14:01:01 +0000 Subject: [PATCH 08/10] Automated gofmt changes --- proxy/pkg/zdmproxy/pscache_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go index 5cff25dc..61cf99c9 100644 --- a/proxy/pkg/zdmproxy/pscache_test.go +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -187,7 +187,8 @@ func checkIfElementIsInInterceptedMap(psCache *PreparedStatementCache, elementSu return foundInterceptedId } -/** +/* +* This test focuses on ensuring that Get and GetByTargetPreparedId work correctly. It inserts elements directly into the cache maps to avoid coupling this test to the logic in the PS Cache's store methods. It uses dummy, non-realistic data. From fe4b43656e67b76dc016d12ebb5eb127fd118890 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Reis?= Date: Tue, 26 May 2026 16:36:00 +0100 Subject: [PATCH 09/10] update golang-lru to 2.0.7, use simplelru to avoid double locking and make index a normal map (use onEvict callback to delete index entries) --- go.mod | 2 +- go.sum | 4 +-- integration-tests/setup/testcluster.go | 2 +- proxy/pkg/config/config.go | 2 +- proxy/pkg/zdmproxy/pscache.go | 45 +++++++++++++++----------- proxy/pkg/zdmproxy/pscache_test.go | 9 +++--- 6 files changed, 36 insertions(+), 28 deletions(-) diff --git a/go.mod b/go.mod index a37c75b0..47d78dfb 100644 --- a/go.mod +++ b/go.mod @@ -7,7 +7,7 @@ require ( github.com/apache/cassandra-gocql-driver/v2 v2.0.0 github.com/datastax/go-cassandra-native-protocol v0.0.0-20260130100129-9d5b43677a33 github.com/google/uuid v1.1.1 - github.com/hashicorp/golang-lru v0.5.4 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jpillora/backoff v1.0.0 github.com/kelseyhightower/envconfig v1.4.0 github.com/mcuadros/go-defaults v1.2.0 diff --git a/go.sum b/go.sum index 79740f8c..88d35ec8 100644 --- a/go.sum +++ b/go.sum @@ -54,8 +54,8 @@ github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeN github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= -github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc= -github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/jpillora/backoff v1.0.0 h1:uvFg412JmmHBHw7iwprIxkPMI+sGQ4kzOWsMeHnm2EA= github.com/jpillora/backoff v1.0.0/go.mod h1:J/6gKK9jxlEcS3zixgDgUAsiuZ7yrSoa/FX5e0EB2j4= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= diff --git a/integration-tests/setup/testcluster.go b/integration-tests/setup/testcluster.go index c266a19d..48151268 100644 --- a/integration-tests/setup/testcluster.go +++ b/integration-tests/setup/testcluster.go @@ -448,7 +448,7 @@ func NewTestConfig(originHost string, targetHost string) *config.Config { conf.ProxyMaxClientConnections = 1000 conf.ProxyMaxStreamIds = 2048 - conf.ProxyMaxPreparedStatementCacheSize = 5000 + conf.ProxyMaxPreparedStatementCacheSize = 10000 conf.RequestResponseMaxWorkers = -1 conf.WriteMaxWorkers = -1 diff --git a/proxy/pkg/config/config.go b/proxy/pkg/config/config.go index cde65e6f..730aa1e0 100644 --- a/proxy/pkg/config/config.go +++ b/proxy/pkg/config/config.go @@ -79,7 +79,7 @@ type Config struct { ProxyRequestTimeoutMs int `default:"10000" split_words:"true" yaml:"proxy_request_timeout_ms"` ProxyMaxClientConnections int `default:"1000" split_words:"true" yaml:"proxy_max_client_connections"` ProxyMaxStreamIds int `default:"2048" split_words:"true" yaml:"proxy_max_stream_ids"` - ProxyMaxPreparedStatementCacheSize int `default:"5000" split_words:"true" yaml:"proxy_max_prepared_statement_cache_size"` + ProxyMaxPreparedStatementCacheSize int `default:"10000" split_words:"true" yaml:"proxy_max_prepared_statement_cache_size"` ProxyTlsCaPath string `split_words:"true" yaml:"proxy_tls_ca_path"` ProxyTlsCertPath string `split_words:"true" yaml:"proxy_tls_cert_path"` diff --git a/proxy/pkg/zdmproxy/pscache.go b/proxy/pkg/zdmproxy/pscache.go index aa38aaf8..0c4eb1f4 100644 --- a/proxy/pkg/zdmproxy/pscache.go +++ b/proxy/pkg/zdmproxy/pscache.go @@ -3,40 +3,41 @@ package zdmproxy import ( "encoding/hex" "fmt" + "sync" + "github.com/datastax/go-cassandra-native-protocol/message" - lru "github.com/hashicorp/golang-lru" + "github.com/hashicorp/golang-lru/v2/simplelru" log "github.com/sirupsen/logrus" - "sync" ) type PreparedStatementCache struct { - cache *lru.Cache // Map containing the prepared queries (raw bytes) keyed on prepareId - index *lru.Cache // Map that can be used as an index to look up origin prepareIds by target prepareId + cache *simplelru.LRU[string, PreparedData] // Map containing the prepared queries (raw bytes) keyed on prepareId + index map[string]string // Map that can be used as an index to look up origin prepareIds by target prepareId - interceptedCache *lru.Cache // Map containing the prepared queries for intercepted requests + interceptedCache *simplelru.LRU[string, PreparedData] // Map containing the prepared queries for intercepted requests lock *sync.RWMutex } func NewPreparedStatementCache(maxSize int) (*PreparedStatementCache, error) { - cache, err := lru.New(maxSize) - if err != nil { - return nil, fmt.Errorf("error initializing the PreparedStatementCache cache map: %v", err) - } + indexMap := make(map[string]string) - index, err := lru.New(maxSize) + cache, err := simplelru.NewLRU[string, PreparedData](maxSize, func(key string, value PreparedData) { + // this is called by LRU.Add() so we already have a lock here + delete(indexMap, string(value.GetTargetPreparedId())) + }) if err != nil { - return nil, fmt.Errorf("error initializing the PreparedStatementCache index map: %v", err) + return nil, fmt.Errorf("error initializing the PreparedStatementCache cache map: %v", err) } - interceptedCache, err := lru.New(maxSize) + interceptedCache, err := simplelru.NewLRU[string, PreparedData](maxSize, nil) if err != nil { return nil, fmt.Errorf("error initializing the PreparedStatementCache interceptedCache map: %v", err) } return &PreparedStatementCache{ cache: cache, - index: index, + index: indexMap, interceptedCache: interceptedCache, lock: &sync.RWMutex{}, }, nil @@ -46,6 +47,12 @@ func (psc PreparedStatementCache) GetPreparedStatementCacheSize() float64 { psc.lock.RLock() defer psc.lock.RUnlock() + cacheLen := psc.cache.Len() + interceptedCacheLen := psc.interceptedCache.Len() + + log.Debugf("PS Cache Size: %v, PS Intercepted Size: %v, PS Index Size: %v.", + cacheLen, interceptedCacheLen, len(psc.index)) + return float64(psc.cache.Len() + psc.interceptedCache.Len()) } @@ -59,7 +66,7 @@ func (psc *PreparedStatementCache) Store( defer psc.lock.Unlock() psc.cache.Add(originPrepareIdStr, NewPreparedData(originPreparedResult, targetPreparedResult, prepareRequestInfo)) - psc.index.Add(targetPrepareIdStr, originPrepareIdStr) + psc.index[targetPrepareIdStr] = originPrepareIdStr log.Debugf("Storing PS cache entry: {OriginPreparedId=%v, TargetPreparedId: %v, RequestInfo: %v}", hex.EncodeToString(originPreparedResult.PreparedQueryId), hex.EncodeToString(targetPreparedResult.PreparedQueryId), prepareRequestInfo) @@ -78,8 +85,8 @@ func (psc *PreparedStatementCache) StoreIntercepted(preparedResult *message.Prep } func (psc *PreparedStatementCache) Get(originPreparedId []byte) (PreparedData, bool) { - psc.lock.RLock() - defer psc.lock.RUnlock() + psc.lock.Lock() + defer psc.lock.Unlock() data, ok := psc.cache.Get(string(originPreparedId)) if ok { return data.(PreparedData), true @@ -94,10 +101,10 @@ func (psc *PreparedStatementCache) Get(originPreparedId []byte) (PreparedData, b } func (psc *PreparedStatementCache) GetByTargetPreparedId(targetPreparedId []byte) (PreparedData, bool) { - psc.lock.RLock() - defer psc.lock.RUnlock() + psc.lock.Lock() + defer psc.lock.Unlock() - originPreparedId, ok := psc.index.Get(string(targetPreparedId)) + originPreparedId, ok := psc.index[string(targetPreparedId)] if !ok { // Don't bother attempting a lookup on the intercepted cache because this method should only be used to handle UNPREPARED responses return nil, false diff --git a/proxy/pkg/zdmproxy/pscache_test.go b/proxy/pkg/zdmproxy/pscache_test.go index 61cf99c9..e6d6db8a 100644 --- a/proxy/pkg/zdmproxy/pscache_test.go +++ b/proxy/pkg/zdmproxy/pscache_test.go @@ -2,9 +2,10 @@ package zdmproxy import ( "fmt" + "testing" + "github.com/datastax/go-cassandra-native-protocol/message" "github.com/stretchr/testify/require" - "testing" ) const MaxPSCacheSizeForTests = 10 @@ -148,7 +149,7 @@ func TestPreparedStatementCache_StoreIntoAllCacheMaps(t *testing.T) { } require.Equal(tt, test.expectedCacheMapSize, psCache.cache.Len()) - require.Equal(tt, test.expectedCacheMapSize, psCache.index.Len()) + require.Equal(tt, test.expectedCacheMapSize, len(psCache.index)) require.Equal(tt, test.expectedCacheMapSize, psCache.interceptedCache.Len()) require.Equal(tt, float64(test.expectedCacheMapSize*2), psCache.GetPreparedStatementCacheSize()) @@ -176,7 +177,7 @@ func checkIfElementIsInOriginMap(psCache *PreparedStatementCache, elementSuffix func checkIfElementIsInTargetMap(psCache *PreparedStatementCache, elementSuffix int) bool { targetId := fmt.Sprint(TargetIdPrefix, elementSuffix) // not using psCache.GetByTargetPreparedId, which is tested separately - _, foundTargetId := psCache.index.Get(targetId) + _, foundTargetId := psCache.index[targetId] return foundTargetId } @@ -242,7 +243,7 @@ func TestPreparedStatementCache_GetFromCache(t *testing.T) { _, foundByGetByTargetPreparedId := psCache.GetByTargetPreparedId([]byte(test.elementId)) require.False(tt, foundByGetByTargetPreparedId) case CacheMapTypeTarget: - psCache.index.Add(test.elementId, "origin_"+test.elementId) + psCache.index[test.elementId] = "origin_" + test.elementId psCache.cache.Add("origin_"+test.elementId, dummyPreparedData) _, foundByGet := psCache.Get([]byte(test.elementId)) From 8c6c31ec830b39338933891da0e43d82bd3c21e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jo=C3=A3o=20Reis?= Date: Tue, 26 May 2026 16:56:30 +0100 Subject: [PATCH 10/10] fix couple of issues --- proxy/pkg/zdmproxy/pscache.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/proxy/pkg/zdmproxy/pscache.go b/proxy/pkg/zdmproxy/pscache.go index 0c4eb1f4..810c5cd8 100644 --- a/proxy/pkg/zdmproxy/pscache.go +++ b/proxy/pkg/zdmproxy/pscache.go @@ -53,7 +53,7 @@ func (psc PreparedStatementCache) GetPreparedStatementCacheSize() float64 { log.Debugf("PS Cache Size: %v, PS Intercepted Size: %v, PS Index Size: %v.", cacheLen, interceptedCacheLen, len(psc.index)) - return float64(psc.cache.Len() + psc.interceptedCache.Len()) + return float64(cacheLen + interceptedCacheLen) } func (psc *PreparedStatementCache) Store( @@ -89,12 +89,12 @@ func (psc *PreparedStatementCache) Get(originPreparedId []byte) (PreparedData, b defer psc.lock.Unlock() data, ok := psc.cache.Get(string(originPreparedId)) if ok { - return data.(PreparedData), true + return data, true } data, ok = psc.interceptedCache.Get(string(originPreparedId)) if ok { - return data.(PreparedData), true + return data, true } return nil, false @@ -117,7 +117,7 @@ func (psc *PreparedStatementCache) GetByTargetPreparedId(targetPreparedId []byte return nil, false } - return data.(PreparedData), true + return data, true } type PreparedData interface {