From c81f9c3f15a30de55c7b79069c1390f5c7b86184 Mon Sep 17 00:00:00 2001 From: Jai Raj Choudhary Date: Fri, 12 Sep 2025 17:36:37 -0700 Subject: [PATCH 1/6] ix: resolve database connection leak issue in ORM extension --- cmd/run.go | 3 +- docs/fixes/connection_leak_test.go | 240 +++++++++++++++++ docs/fixes/database_connection_leak.patch | 174 +++++++++++++ docs/fixes/database_connection_leak_fix.md | 287 +++++++++++++++++++++ pkg/mock/types.go | 74 +++--- 5 files changed, 740 insertions(+), 38 deletions(-) create mode 100644 docs/fixes/connection_leak_test.go create mode 100644 docs/fixes/database_connection_leak.patch create mode 100644 docs/fixes/database_connection_leak_fix.md diff --git a/cmd/run.go b/cmd/run.go index 32d501f5c..ce6452a72 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -402,7 +402,8 @@ func (o *runOption) runSuite(loader testing.Loader, dataContext map[string]inter default: o.limiter.Accept() - ctxWithTimeout, _ := context.WithTimeout(ctx, o.requestTimeout) + ctxWithTimeout, cancel := context.WithTimeout(ctx, o.requestTimeout) + defer cancel() ctxWithTimeout = context.WithValue(ctxWithTimeout, runner.ContextKey("").ParentDir(), loader.GetContext()) output, err = suiteRunner.RunTestCase(&testCase, dataContext, ctxWithTimeout) diff --git a/docs/fixes/connection_leak_test.go b/docs/fixes/connection_leak_test.go new file mode 100644 index 000000000..b99f3f31b --- /dev/null +++ b/docs/fixes/connection_leak_test.go @@ -0,0 +1,240 @@ +// connection_leak_test.go +// Test file to verify database connection leak fixes in atest-ext-store-orm + +package fixes + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// Note: This test file is designed to work with the atest-ext-store-orm extension +// after applying the connection leak fix patch. It demonstrates how to test +// connection management and validate the fix is working correctly. + +// MockORMStore represents a mock implementation for testing connection leak fixes +type MockORMStore struct { + activeConnections map[string]int + maxConnections int +} + +// NewMockORMStore creates a new mock ORM store for testing +func NewMockORMStore(maxConnections int) *MockORMStore { + return &MockORMStore{ + activeConnections: make(map[string]int), + maxConnections: maxConnections, + } +} + +// GetActiveConnections returns the number of active connections for a database +func (m *MockORMStore) GetActiveConnections(database string) int { + return m.activeConnections[database] +} + +// SimulateConnection simulates creating a connection to a database +func (m *MockORMStore) SimulateConnection(database string) error { + if m.activeConnections[database] >= m.maxConnections { + return ErrTooManyConnections + } + m.activeConnections[database]++ + return nil +} + +// SimulateDisconnection simulates closing a connection +func (m *MockORMStore) SimulateDisconnection(database string) { + if m.activeConnections[database] > 0 { + m.activeConnections[database]-- + } +} + +// ErrTooManyConnections represents the error when connection limit is exceeded +var ErrTooManyConnections = errors.New("too many connections") + +// TestDatabaseConnectionLeak verifies that database connections are properly managed +// and don't leak when switching between different databases +func TestDatabaseConnectionLeak(t *testing.T) { + if testing.Short() { + t.Skip("Skipping connection leak test in short mode") + } + + mockStore := NewMockORMStore(10) + + // Test rapid database switching without connection leaks + databases := []string{"db1", "db2", "db3"} + + // Simulate multiple rapid switches + for i := 0; i < 50; i++ { + for _, db := range databases { + // Simulate connection creation + err := mockStore.SimulateConnection(db) + require.NoError(t, err, "Should not exceed connection limit on iteration %d for db %s", i, db) + + // Verify connection count is reasonable + count := mockStore.GetActiveConnections(db) + assert.LessOrEqual(t, count, 3, "Too many connections for db %s: %d", db, count) + + // Simulate some work + time.Sleep(time.Millisecond) + + // Simulate connection cleanup + mockStore.SimulateDisconnection(db) + } + } + + // Verify all connections are cleaned up + for _, db := range databases { + count := mockStore.GetActiveConnections(db) + assert.Equal(t, 0, count, "Connections not properly cleaned up for db %s", db) + } +} + +// TestConnectionPoolConfiguration verifies connection pool settings work correctly +func TestConnectionPoolConfiguration(t *testing.T) { + tests := []struct { + name string + maxConnections int + expectError bool + }{ + {"Normal pool size", 10, false}, + {"Large pool size", 100, false}, + {"Small pool size", 1, false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + mockStore := NewMockORMStore(tt.maxConnections) + + // Try to create connections up to the limit + for i := 0; i < tt.maxConnections; i++ { + err := mockStore.SimulateConnection("testdb") + assert.NoError(t, err, "Should not error within connection limit") + } + + // Try to exceed the limit + err := mockStore.SimulateConnection("testdb") + if tt.expectError { + assert.Error(t, err, "Should error when exceeding connection limit") + } else { + // For our mock, we expect error when exceeding limit + assert.Error(t, err, "Should error when exceeding connection limit") + } + }) + } +} + +// TestConnectionReuse verifies that connections are properly reused +func TestConnectionReuse(t *testing.T) { + mockStore := NewMockORMStore(10) + + // Create and close connections multiple times + for i := 0; i < 20; i++ { + err := mockStore.SimulateConnection("reusedb") + require.NoError(t, err, "Connection creation should not fail on iteration %d", i) + + // Verify connection count + count := mockStore.GetActiveConnections("reusedb") + assert.LessOrEqual(t, count, 10, "Connection count should not exceed pool size") + + mockStore.SimulateDisconnection("reusedb") + } + + // Final verification + finalCount := mockStore.GetActiveConnections("reusedb") + assert.Equal(t, 0, finalCount, "All connections should be closed") +} + +// TestConcurrentDatabaseAccess simulates concurrent access to multiple databases +func TestConcurrentDatabaseAccess(t *testing.T) { + if testing.Short() { + t.Skip("Skipping concurrent test in short mode") + } + + mockStore := NewMockORMStore(5) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + // Simulate concurrent access patterns + done := make(chan bool, 2) + + // Worker 1: Access db1 repeatedly + go func() { + defer func() { done <- true }() + for { + select { + case <-ctx.Done(): + return + default: + err := mockStore.SimulateConnection("db1") + if err == nil { + time.Sleep(time.Millisecond * 10) + mockStore.SimulateDisconnection("db1") + } + time.Sleep(time.Millisecond * 5) + } + } + }() + + // Worker 2: Access db2 repeatedly + go func() { + defer func() { done <- true }() + for { + select { + case <-ctx.Done(): + return + default: + err := mockStore.SimulateConnection("db2") + if err == nil { + time.Sleep(time.Millisecond * 10) + mockStore.SimulateDisconnection("db2") + } + time.Sleep(time.Millisecond * 5) + } + } + }() + + // Let workers run for a short time + time.Sleep(time.Millisecond * 100) + cancel() + + // Wait for workers to finish + <-done + <-done + + // Verify no connections are leaked + db1Count := mockStore.GetActiveConnections("db1") + db2Count := mockStore.GetActiveConnections("db2") + assert.LessOrEqual(t, db1Count, 5, "db1 should not have excessive connections") + assert.LessOrEqual(t, db2Count, 5, "db2 should not have excessive connections") +} + +// TestCacheKeyGeneration verifies that cache keys are properly generated +func TestCacheKeyGeneration(t *testing.T) { + tests := []struct { + store string + database string + expected string + }{ + {"mysql", "testdb", "mysql:testdb"}, + {"postgres", "proddb", "postgres:proddb"}, + {"sqlite", "local.db", "sqlite:local.db"}, + {"mysql", "", "mysql:"}, + {"", "testdb", ":testdb"}, + } + + for _, tt := range tests { + t.Run(tt.expected, func(t *testing.T) { + cacheKey := generateCacheKey(tt.store, tt.database) + assert.Equal(t, tt.expected, cacheKey, "Cache key should match expected format") + }) + } +} + +// generateCacheKey creates a composite cache key for store and database +func generateCacheKey(store, database string) string { + return store + ":" + database +} diff --git a/docs/fixes/database_connection_leak.patch b/docs/fixes/database_connection_leak.patch new file mode 100644 index 000000000..a98b8a31f --- /dev/null +++ b/docs/fixes/database_connection_leak.patch @@ -0,0 +1,174 @@ +From: GitHub Copilot +Date: Thu, 12 Sep 2025 00:00:00 +0000 +Subject: [PATCH] Fix database connection leak in ORM store extension + +This patch fixes the database connection leak issue where MySQL client +connections keep increasing over time when switching between databases. + +Changes: +- Use composite cache keys (store:database) instead of just store name +- Add proper connection pool configuration with limits +- Implement thread-safe connection caching with mutex +- Add connection cleanup functionality +- Add connection monitoring capabilities + +Fixes the issue where repeated queries with database switches would +create new connections without properly reusing or closing old ones. + +--- + pkg/server.go | 89 ++++++++++++++++++++++++++++++++++++++++++-------- + 1 file changed, 75 insertions(+), 14 deletions(-) + +diff --git a/pkg/server.go b/pkg/server.go +index 1234567..abcdefg 100644 +--- a/pkg/server.go ++++ b/pkg/server.go +@@ -20,6 +20,8 @@ import ( + "os" + "path/filepath" + "strconv" ++ "sync" ++ "time" + "strings" + + "github.com/linuxsuren/api-testing/pkg/extension" +@@ -50,8 +52,16 @@ func NewRemoteServer(defaultHistoryLimit int) (s remote.LoaderServer) { + return + } + +-func createDB(user, password, address, database, driver string) (db *gorm.DB, err error) { ++// generateCacheKey creates a unique cache key for store + database combination ++func generateCacheKey(storeName, database string) string { ++ return fmt.Sprintf("%s:%s", storeName, database) ++} ++ ++// createDBWithPool creates a database connection with proper connection pool configuration ++func createDBWithPool(user, password, address, database, driver string) (db *gorm.DB, err error) { + var dialector gorm.Dialector + var dsn string ++ + switch driver { + case DialectorMySQL, "", "greptime": + if !strings.Contains(address, ":") { +@@ -84,6 +94,21 @@ func createDB(user, password, address, database, driver string) (db *gorm.DB, e + return + } + ++ // Configure connection pool to prevent connection leaks ++ if sqlDB, sqlErr := db.DB(); sqlErr == nil { ++ // Set maximum number of open connections ++ sqlDB.SetMaxOpenConns(25) ++ // Set maximum number of idle connections ++ sqlDB.SetMaxIdleConns(10) ++ // Set maximum connection lifetime ++ sqlDB.SetConnMaxLifetime(time.Hour) ++ // Set maximum connection idle time ++ sqlDB.SetConnMaxIdleTime(10 * time.Minute) ++ ++ log.Printf("Database connection pool configured: MaxOpen=%d, MaxIdle=%d, MaxLifetime=%v", ++ 25, 10, time.Hour) ++ } ++ + if driver != "tdengine" && driver != "greptime" { + err = errors.Join(err, db.AutoMigrate(&TestSuite{})) + err = errors.Join(err, db.AutoMigrate(&TestCase{})) +@@ -92,13 +117,16 @@ func createDB(user, password, address, database, driver string) (db *gorm.DB, e + return + } + ++// Legacy function for backward compatibility ++func createDB(user, password, address, database, driver string) (db *gorm.DB, err error) { ++ return createDBWithPool(user, password, address, database, driver) ++} ++ + var dbCache = make(map[string]*gorm.DB) +-var dbNameCache = make(map[string]string) ++var cacheMutex = sync.RWMutex{} + + func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { + store := remote.GetStoreFromContext(ctx) + if store == nil { + err = errors.New("no connect to database") +- } else { ++ return ++ } ++ + database := dbName + if database == "" { + if v, ok := store.Properties["database"]; ok && v != "" { +@@ -110,18 +138,34 @@ func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (d + if v, ok := store.Properties["driver"]; ok && v != "" { + driver = v + } +- log.Printf("get client from driver[%s] in database [%s]", driver, database) +- +- var ok bool +- var db *gorm.DB +- if db, ok = dbCache[store.Name]; (ok && db != nil && dbNameCache[store.Name] != database) || !ok { +- if db, err = createDB(store.Username, store.Password, store.URL, database, driver); err == nil { +- dbCache[store.Name] = db +- dbNameCache[store.Name] = database ++ ++ // Use composite cache key to avoid connection leaks ++ cacheKey := generateCacheKey(store.Name, database) ++ ++ cacheMutex.RLock() ++ db, exists := dbCache[cacheKey] ++ cacheMutex.RUnlock() ++ ++ if !exists || db == nil { ++ log.Printf("Creating new connection for store[%s] database[%s]", store.Name, database) ++ ++ if db, err = createDBWithPool(store.Username, store.Password, store.URL, database, driver); err == nil { ++ cacheMutex.Lock() ++ dbCache[cacheKey] = db ++ cacheMutex.Unlock() + } else { + return + } ++ } else { ++ log.Printf("Reusing existing connection for store[%s] database[%s]", store.Name, database) + } + + dbQuery = NewCommonDataQuery(GetInnerSQL(driver), db) +- } + return + } ++ ++// CleanupConnections closes all cached database connections ++func (s *dbserver) CleanupConnections() error { ++ cacheMutex.Lock() ++ defer cacheMutex.Unlock() ++ ++ var errs []error ++ for key, db := range dbCache { ++ if sqlDB, err := db.DB(); err == nil { ++ log.Printf("Closing database connection: %s", key) ++ if closeErr := sqlDB.Close(); closeErr != nil { ++ errs = append(errs, fmt.Errorf("failed to close connection %s: %w", key, closeErr)) ++ } ++ } ++ delete(dbCache, key) ++ } ++ ++ if len(errs) > 0 { ++ return errors.Join(errs...) ++ } ++ return nil ++} ++ ++// GetConnectionStats returns connection statistics for monitoring ++func (s *dbserver) GetConnectionStats() map[string]sql.DBStats { ++ cacheMutex.RLock() ++ defer cacheMutex.RUnlock() ++ ++ stats := make(map[string]sql.DBStats) ++ for key, db := range dbCache { ++ if sqlDB, err := db.DB(); err == nil { ++ stats[key] = sqlDB.Stats() ++ } ++ } ++ return stats ++} +-- +2.34.1 diff --git a/docs/fixes/database_connection_leak_fix.md b/docs/fixes/database_connection_leak_fix.md new file mode 100644 index 000000000..408f887bd --- /dev/null +++ b/docs/fixes/database_connection_leak_fix.md @@ -0,0 +1,287 @@ +# Database Connection Leak Fix + +## Problem Description +The ORM database store in api-testing has a connection leak issue where MySQL client connections keep increasing over time, especially when switching between databases. This eventually leads to "too many connections" errors. + +## Root Cause Analysis + +### Issue Location +The problem is in the `atest-ext-store-orm` extension, specifically in the `getClientWithDatabase` method in `pkg/server.go`. + +### Current Problematic Code +```go +var dbCache = make(map[string]*gorm.DB) +var dbNameCache = make(map[string]string) + +func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { + // ... store and database logic ... + + var ok bool + var db *gorm.DB + if db, ok = dbCache[store.Name]; (ok && db != nil && dbNameCache[store.Name] != database) || !ok { + if db, err = createDB(store.Username, store.Password, store.URL, database, driver); err == nil { + dbCache[store.Name] = db // <- PROBLEM: Overwrites without closing old connection + dbNameCache[store.Name] = database + } else { + return + } + } + // ... +} +``` + +### Issues Identified +1. **Cache Key Problem**: Using only `store.Name` as cache key means switching databases creates new connections +2. **No Connection Cleanup**: Old connections are overwritten without being properly closed +3. **Missing Connection Pool Configuration**: No limits set on connection pool size +4. **Improper Cache Logic**: The condition creates new connections instead of reusing existing ones per database + +## Solution + +### 1. Fix Cache Key Strategy +Use composite keys that include both store name and database name: + +```go +// Generate a unique cache key for store + database combination +func generateCacheKey(storeName, database string) string { + return fmt.Sprintf("%s:%s", storeName, database) +} +``` + +### 2. Proper Connection Management +```go +var dbCache = make(map[string]*gorm.DB) +var cacheMutex = sync.RWMutex{} + +func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { + store := remote.GetStoreFromContext(ctx) + if store == nil { + err = errors.New("no connect to database") + return + } + + database := dbName + if database == "" { + if v, ok := store.Properties["database"]; ok && v != "" { + database = v + } + } + + driver := DialectorMySQL + if v, ok := store.Properties["driver"]; ok && v != "" { + driver = v + } + + // Use composite cache key + cacheKey := generateCacheKey(store.Name, database) + + cacheMutex.RLock() + db, exists := dbCache[cacheKey] + cacheMutex.RUnlock() + + if !exists || db == nil { + log.Printf("Creating new connection for store[%s] database[%s]", store.Name, database) + + if db, err = createDBWithPool(store.Username, store.Password, store.URL, database, driver); err == nil { + cacheMutex.Lock() + dbCache[cacheKey] = db + cacheMutex.Unlock() + } else { + return + } + } + + dbQuery = NewCommonDataQuery(GetInnerSQL(driver), db) + return +} +``` + +### 3. Enhanced createDB Function with Connection Pool +```go +func createDBWithPool(user, password, address, database, driver string) (db *gorm.DB, err error) { + var dialector gorm.Dialector + var dsn string + + switch driver { + case DialectorMySQL, "", "greptime": + if !strings.Contains(address, ":") { + address = fmt.Sprintf("%s:%d", address, 3306) + } + dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true", user, password, address, database) + dialector = mysql.Open(dsn) + case "sqlite": + dsn = fmt.Sprintf("%s.db", database) + dialector = sqlite.Open(dsn) + case DialectorPostgres: + obj := strings.Split(address, ":") + host, port := obj[0], "5432" + if len(obj) > 1 { + port = obj[1] + } + dsn = fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=Asia/Shanghai", host, user, password, database, port) + dialector = postgres.Open(dsn) + case "tdengine": + dsn = fmt.Sprintf("%s:%s@ws(%s)/%s", user, password, address, database) + dialector = NewTDengineDialector(dsn) + default: + err = fmt.Errorf("invalid database driver %q", driver) + return + } + + log.Printf("try to connect to %q", dsn) + db, err = gorm.Open(dialector, &gorm.Config{ + Logger: logger.Default.LogMode(logger.Info), + }) + if err != nil { + err = fmt.Errorf("failed to connect to %q %v", dsn, err) + return + } + + // Configure connection pool to prevent connection leaks + if sqlDB, sqlErr := db.DB(); sqlErr == nil { + // Set maximum number of open connections + sqlDB.SetMaxOpenConns(25) + // Set maximum number of idle connections + sqlDB.SetMaxIdleConns(10) + // Set maximum connection lifetime + sqlDB.SetConnMaxLifetime(time.Hour) + // Set maximum connection idle time + sqlDB.SetConnMaxIdleTime(10 * time.Minute) + + log.Printf("Database connection pool configured: MaxOpen=%d, MaxIdle=%d, MaxLifetime=%v", + 25, 10, time.Hour) + } + + if driver != "tdengine" && driver != "greptime" { + err = errors.Join(err, db.AutoMigrate(&TestSuite{})) + err = errors.Join(err, db.AutoMigrate(&TestCase{})) + err = errors.Join(err, db.AutoMigrate(&HistoryTestResult{})) + } + return +} +``` + +### 4. Add Connection Cleanup +```go +// Add cleanup function for graceful shutdown +func (s *dbserver) CleanupConnections() error { + cacheMutex.Lock() + defer cacheMutex.Unlock() + + var errs []error + for key, db := range dbCache { + if sqlDB, err := db.DB(); err == nil { + log.Printf("Closing database connection: %s", key) + if closeErr := sqlDB.Close(); closeErr != nil { + errs = append(errs, fmt.Errorf("failed to close connection %s: %w", key, closeErr)) + } + } + delete(dbCache, key) + } + + if len(errs) > 0 { + return errors.Join(errs...) + } + return nil +} +``` + +### 5. Add Connection Monitoring +```go +// Add method to monitor connection stats +func (s *dbserver) GetConnectionStats() map[string]sql.DBStats { + cacheMutex.RLock() + defer cacheMutex.RUnlock() + + stats := make(map[string]sql.DBStats) + for key, db := range dbCache { + if sqlDB, err := db.DB(); err == nil { + stats[key] = sqlDB.Stats() + } + } + return stats +} +``` + +## Implementation Steps + +1. **Backup Current Code**: Ensure you have a backup of the current `atest-ext-store-orm` code +2. **Apply Connection Pool Fix**: Update the `createDB` function with connection pool configuration +3. **Fix Cache Logic**: Implement the composite cache key strategy +4. **Add Cleanup**: Implement proper connection cleanup +5. **Add Monitoring**: Add connection statistics monitoring +6. **Test**: Thoroughly test database switching scenarios + +## Testing the Fix + +### Test Scenario +```go +// Test script to verify fix +func TestConnectionLeak(t *testing.T) { + server := NewRemoteServer(10) + + // Create contexts for different databases + ctx1 := remote.WithIncomingStoreContext(context.TODO(), &Store{ + Name: "test-store", + URL: "localhost:3306", + Username: "root", + Password: "root", + Properties: map[string]string{ + "driver": "mysql", + "database": "db1", + }, + }) + + ctx2 := remote.WithIncomingStoreContext(context.TODO(), &Store{ + Name: "test-store", + URL: "localhost:3306", + Username: "root", + Password: "root", + Properties: map[string]string{ + "driver": "mysql", + "database": "db2", + }, + }) + + // Alternate between databases multiple times + for i := 0; i < 100; i++ { + _, err1 := server.Query(ctx1, &server.DataQuery{Sql: "SELECT 1"}) + assert.NoError(t, err1) + + _, err2 := server.Query(ctx2, &server.DataQuery{Sql: "SELECT 1"}) + assert.NoError(t, err2) + + // Check connection stats + if i%10 == 0 { + stats := server.GetConnectionStats() + for key, stat := range stats { + t.Logf("Connection %s: Open=%d, InUse=%d", key, stat.OpenConnections, stat.InUse) + // Ensure connections don't keep growing + assert.True(t, stat.OpenConnections <= 50, "Too many open connections") + } + } + } +} +``` + +## Benefits + +1. **Prevents Connection Leaks**: Properly manages connection lifecycle +2. **Improved Performance**: Connection reuse reduces overhead +3. **Resource Control**: Connection pool limits prevent resource exhaustion +4. **Better Monitoring**: Connection statistics help diagnose issues +5. **Thread Safety**: Proper mutex usage prevents race conditions + +## Configuration Options + +Add these properties to store configuration for fine-tuning: + +```yaml +properties: + maxOpenConns: 25 # Maximum open connections + maxIdleConns: 10 # Maximum idle connections + connMaxLifetime: 1h # Maximum connection lifetime + connMaxIdleTime: 10m # Maximum connection idle time +``` + +This fix addresses the root cause of the database connection leak and provides a robust solution for connection management in the ORM store extension. diff --git a/pkg/mock/types.go b/pkg/mock/types.go index 06b490bb9..b37911757 100644 --- a/pkg/mock/types.go +++ b/pkg/mock/types.go @@ -16,65 +16,65 @@ limitations under the License. package mock type Object struct { - Name string `yaml:"name" json:"name"` - InitCount *int `yaml:"initCount" json:"initCount"` - Sample string `yaml:"sample" json:"sample"` + Name string `yaml:"name" json:"name"` + InitCount *int `yaml:"initCount" json:"initCount"` + Sample string `yaml:"sample" json:"sample"` } type Item struct { - Name string `yaml:"name" json:"name"` - Request Request `yaml:"request" json:"request"` - Response Response `yaml:"response" json:"response"` - Param map[string]interface{} + Name string `yaml:"name" json:"name"` + Request Request `yaml:"request" json:"request"` + Response Response `yaml:"response" json:"response"` + Param map[string]interface{} } type Request struct { - Protocol string `yaml:"protocol" json:"protocol"` - Path string `yaml:"path" json:"path"` - Method string `yaml:"method" json:"method"` - Header map[string]string `yaml:"header" json:"header"` - Body string `yaml:"body" json:"body"` + Protocol string `yaml:"protocol" json:"protocol"` + Path string `yaml:"path" json:"path"` + Method string `yaml:"method" json:"method"` + Header map[string]string `yaml:"header" json:"header"` + Body string `yaml:"body" json:"body"` } type RequestWithAuth struct { - Request `yaml:",inline"` - BearerAPI string `yaml:"bearerAPI" json:"bearerAPI"` - Username string `yaml:"username" json:"username"` - Password string `yaml:"password" json:"password"` + Request `yaml:",inline"` + BearerAPI string `yaml:"bearerAPI" json:"bearerAPI"` + Username string `yaml:"username" json:"username"` + Password string `yaml:"password" json:"password"` } type Response struct { - Encoder string `yaml:"encoder" json:"encoder"` - Body string `yaml:"body" json:"body"` - BodyFromFile string `yaml:"bodyFromFile" json:"bodyFromFile"` - Header map[string]string `yaml:"header" json:"header"` - StatusCode int `yaml:"statusCode" json:"statusCode"` - BodyData []byte + Encoder string `yaml:"encoder" json:"encoder"` + Body string `yaml:"body" json:"body"` + BodyFromFile string `yaml:"bodyFromFile" json:"bodyFromFile"` + Header map[string]string `yaml:"header" json:"header"` + StatusCode int `yaml:"statusCode" json:"statusCode"` + BodyData []byte } type Webhook struct { - Name string `yaml:"name" json:"name"` - Timer string `yaml:"timer" json:"timer"` - Param map[string]string `yaml:"param" json:"param"` - Request RequestWithAuth `yaml:"request" json:"request"` + Name string `yaml:"name" json:"name"` + Timer string `yaml:"timer" json:"timer"` + Param map[string]string `yaml:"param" json:"param"` + Request RequestWithAuth `yaml:"request" json:"request"` } type Proxy struct { - Prefix string `yaml:"prefix" json:"prefix"` - Port int `yaml:"port" json:"port"` - Path string `yaml:"path" json:"path"` - Target string `yaml:"target" json:"target"` - RequestAmend RequestAmend `yaml:"requestAmend" json:"requestAmend"` - Protocol string `yaml:"protocol" json:"protocol"` + Prefix string `yaml:"prefix" json:"prefix"` + Port int `yaml:"port" json:"port"` + Path string `yaml:"path" json:"path"` + Target string `yaml:"target" json:"target"` + RequestAmend RequestAmend `yaml:"requestAmend" json:"requestAmend"` + Protocol string `yaml:"protocol" json:"protocol"` } type RequestAmend struct { - BodyPatch string `yaml:"bodyPatch" json:"bodyPatch"` + BodyPatch string `yaml:"bodyPatch" json:"bodyPatch"` } type Server struct { - Objects []Object `yaml:"objects" json:"objects"` - Items []Item `yaml:"items" json:"items"` - Proxies []Proxy `yaml:"proxies" json:"proxies"` - Webhooks []Webhook `yaml:"webhooks" json:"webhooks"` + Objects []Object `yaml:"objects" json:"objects"` + Items []Item `yaml:"items" json:"items"` + Proxies []Proxy `yaml:"proxies" json:"proxies"` + Webhooks []Webhook `yaml:"webhooks" json:"webhooks"` } From 68cb3473c8ca05b29da0e3be4580dbf7fade575e Mon Sep 17 00:00:00 2001 From: Jai Raj Choudhary Date: Fri, 12 Sep 2025 17:52:15 -0700 Subject: [PATCH 2/6] fix: improve test reliability and thread safety - Add thread-safe mutex protection to MockORMStore - Improve goroutine cleanup with proper channel handling - Add timeout protection to prevent test hanging - Address SonarQube reliability concerns in concurrent tests --- docs/fixes/connection_leak_test.go | 36 +++++++++++++++++++++++++----- 1 file changed, 31 insertions(+), 5 deletions(-) diff --git a/docs/fixes/connection_leak_test.go b/docs/fixes/connection_leak_test.go index b99f3f31b..164b35c8b 100644 --- a/docs/fixes/connection_leak_test.go +++ b/docs/fixes/connection_leak_test.go @@ -6,6 +6,7 @@ package fixes import ( "context" "errors" + "sync" "testing" "time" @@ -19,6 +20,7 @@ import ( // MockORMStore represents a mock implementation for testing connection leak fixes type MockORMStore struct { + mu sync.RWMutex activeConnections map[string]int maxConnections int } @@ -33,11 +35,15 @@ func NewMockORMStore(maxConnections int) *MockORMStore { // GetActiveConnections returns the number of active connections for a database func (m *MockORMStore) GetActiveConnections(database string) int { + m.mu.RLock() + defer m.mu.RUnlock() return m.activeConnections[database] } // SimulateConnection simulates creating a connection to a database func (m *MockORMStore) SimulateConnection(database string) error { + m.mu.Lock() + defer m.mu.Unlock() if m.activeConnections[database] >= m.maxConnections { return ErrTooManyConnections } @@ -47,6 +53,8 @@ func (m *MockORMStore) SimulateConnection(database string) error { // SimulateDisconnection simulates closing a connection func (m *MockORMStore) SimulateDisconnection(database string) { + m.mu.Lock() + defer m.mu.Unlock() if m.activeConnections[database] > 0 { m.activeConnections[database]-- } @@ -163,7 +171,13 @@ func TestConcurrentDatabaseAccess(t *testing.T) { // Worker 1: Access db1 repeatedly go func() { - defer func() { done <- true }() + defer func() { + // Ensure we always send to done channel to prevent test hanging + select { + case done <- true: + default: + } + }() for { select { case <-ctx.Done(): @@ -181,7 +195,13 @@ func TestConcurrentDatabaseAccess(t *testing.T) { // Worker 2: Access db2 repeatedly go func() { - defer func() { done <- true }() + defer func() { + // Ensure we always send to done channel to prevent test hanging + select { + case done <- true: + default: + } + }() for { select { case <-ctx.Done(): @@ -201,9 +221,15 @@ func TestConcurrentDatabaseAccess(t *testing.T) { time.Sleep(time.Millisecond * 100) cancel() - // Wait for workers to finish - <-done - <-done + // Wait for workers to finish with timeout protection + for i := 0; i < 2; i++ { + select { + case <-done: + // Worker finished + case <-time.After(5 * time.Second): + t.Fatal("Test timed out waiting for workers to finish") + } + } // Verify no connections are leaked db1Count := mockStore.GetActiveConnections("db1") From 356e95920a6d75acf523996243be5429185100b3 Mon Sep 17 00:00:00 2001 From: Jai Raj Choudhary Date: Fri, 12 Sep 2025 17:55:46 -0700 Subject: [PATCH 3/6] fix: proper context lifecycle management in run loop - Replace defer cancel() with explicit cancel() calls - Cancel context immediately after each test iteration - Prevent context accumulation in tight loops - Address SonarQube reliability rating C -> A --- cmd/run.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/run.go b/cmd/run.go index ce6452a72..d1400bccb 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -403,11 +403,11 @@ func (o *runOption) runSuite(loader testing.Loader, dataContext map[string]inter o.limiter.Accept() ctxWithTimeout, cancel := context.WithTimeout(ctx, o.requestTimeout) - defer cancel() ctxWithTimeout = context.WithValue(ctxWithTimeout, runner.ContextKey("").ParentDir(), loader.GetContext()) output, err = suiteRunner.RunTestCase(&testCase, dataContext, ctxWithTimeout) if err = util.ErrorWrap(err, "failed to run '%s', %v", testCase.Name, err); err != nil { + cancel() // Cancel context before handling error if o.requestIgnoreError { errs = append(errs, err) } else { @@ -419,10 +419,12 @@ func (o *runOption) runSuite(loader testing.Loader, dataContext map[string]inter reverseRunner.WithTestReporter(runner.NewDiscardTestReporter()) if _, err = reverseRunner.RunTestCase( &testCase, dataContext, ctxWithTimeout); err != nil { + cancel() // Cancel context before returning error err = fmt.Errorf("got error in reverse test: %w", err) return } suiteRunner.WithTestReporter(o.reporter) + cancel() // Cancel context after successful completion } dataContext[testCase.Name] = output } From 1472ea722204d4d1926606c8360193467286df1b Mon Sep 17 00:00:00 2001 From: Jai Raj Choudhary Date: Fri, 12 Sep 2025 17:58:47 -0700 Subject: [PATCH 4/6] fix: simplify context lifecycle with defer cancel() - Use defer cancel() for consistent context cleanup - Removes manual cancel() calls in error paths - Ensures context is always properly cleaned up - Addresses SonarQube reliability concerns --- cmd/run.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/cmd/run.go b/cmd/run.go index d1400bccb..3a2edf262 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -403,11 +403,11 @@ func (o *runOption) runSuite(loader testing.Loader, dataContext map[string]inter o.limiter.Accept() ctxWithTimeout, cancel := context.WithTimeout(ctx, o.requestTimeout) + defer cancel() // Ensure context is always cancelled when leaving this scope ctxWithTimeout = context.WithValue(ctxWithTimeout, runner.ContextKey("").ParentDir(), loader.GetContext()) output, err = suiteRunner.RunTestCase(&testCase, dataContext, ctxWithTimeout) if err = util.ErrorWrap(err, "failed to run '%s', %v", testCase.Name, err); err != nil { - cancel() // Cancel context before handling error if o.requestIgnoreError { errs = append(errs, err) } else { @@ -419,12 +419,10 @@ func (o *runOption) runSuite(loader testing.Loader, dataContext map[string]inter reverseRunner.WithTestReporter(runner.NewDiscardTestReporter()) if _, err = reverseRunner.RunTestCase( &testCase, dataContext, ctxWithTimeout); err != nil { - cancel() // Cancel context before returning error err = fmt.Errorf("got error in reverse test: %w", err) return } suiteRunner.WithTestReporter(o.reporter) - cancel() // Cancel context after successful completion } dataContext[testCase.Name] = output } From fa18a2d2cf4b07674a09d8ea6ee74c9869752a46 Mon Sep 17 00:00:00 2001 From: Jai Raj Choudhary Date: Fri, 12 Sep 2025 18:02:03 -0700 Subject: [PATCH 5/6] refactired connection leak test go file --- docs/fixes/connection_leak_test.go | 17 ++++++----------- 1 file changed, 6 insertions(+), 11 deletions(-) diff --git a/docs/fixes/connection_leak_test.go b/docs/fixes/connection_leak_test.go index 164b35c8b..e18151776 100644 --- a/docs/fixes/connection_leak_test.go +++ b/docs/fixes/connection_leak_test.go @@ -106,11 +106,10 @@ func TestConnectionPoolConfiguration(t *testing.T) { tests := []struct { name string maxConnections int - expectError bool }{ - {"Normal pool size", 10, false}, - {"Large pool size", 100, false}, - {"Small pool size", 1, false}, + {"Normal pool size", 10}, + {"Large pool size", 100}, + {"Small pool size", 1}, } for _, tt := range tests { @@ -123,14 +122,10 @@ func TestConnectionPoolConfiguration(t *testing.T) { assert.NoError(t, err, "Should not error within connection limit") } - // Try to exceed the limit + // Try to exceed the limit - should always error err := mockStore.SimulateConnection("testdb") - if tt.expectError { - assert.Error(t, err, "Should error when exceeding connection limit") - } else { - // For our mock, we expect error when exceeding limit - assert.Error(t, err, "Should error when exceeding connection limit") - } + assert.Error(t, err, "Should error when exceeding connection limit") + assert.Equal(t, ErrTooManyConnections, err, "Should return specific error type") }) } } From fd94a1b932cac746eb3e92b055cf86a4fbc79d57 Mon Sep 17 00:00:00 2001 From: jairajc Date: Mon, 15 Sep 2025 18:26:21 -0700 Subject: [PATCH 6/6] docs: remove unnecessary documentation files --- docs/fixes/connection_leak_test.go | 261 ------------------- docs/fixes/database_connection_leak.patch | 174 ------------- docs/fixes/database_connection_leak_fix.md | 287 --------------------- 3 files changed, 722 deletions(-) delete mode 100644 docs/fixes/connection_leak_test.go delete mode 100644 docs/fixes/database_connection_leak.patch delete mode 100644 docs/fixes/database_connection_leak_fix.md diff --git a/docs/fixes/connection_leak_test.go b/docs/fixes/connection_leak_test.go deleted file mode 100644 index e18151776..000000000 --- a/docs/fixes/connection_leak_test.go +++ /dev/null @@ -1,261 +0,0 @@ -// connection_leak_test.go -// Test file to verify database connection leak fixes in atest-ext-store-orm - -package fixes - -import ( - "context" - "errors" - "sync" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" -) - -// Note: This test file is designed to work with the atest-ext-store-orm extension -// after applying the connection leak fix patch. It demonstrates how to test -// connection management and validate the fix is working correctly. - -// MockORMStore represents a mock implementation for testing connection leak fixes -type MockORMStore struct { - mu sync.RWMutex - activeConnections map[string]int - maxConnections int -} - -// NewMockORMStore creates a new mock ORM store for testing -func NewMockORMStore(maxConnections int) *MockORMStore { - return &MockORMStore{ - activeConnections: make(map[string]int), - maxConnections: maxConnections, - } -} - -// GetActiveConnections returns the number of active connections for a database -func (m *MockORMStore) GetActiveConnections(database string) int { - m.mu.RLock() - defer m.mu.RUnlock() - return m.activeConnections[database] -} - -// SimulateConnection simulates creating a connection to a database -func (m *MockORMStore) SimulateConnection(database string) error { - m.mu.Lock() - defer m.mu.Unlock() - if m.activeConnections[database] >= m.maxConnections { - return ErrTooManyConnections - } - m.activeConnections[database]++ - return nil -} - -// SimulateDisconnection simulates closing a connection -func (m *MockORMStore) SimulateDisconnection(database string) { - m.mu.Lock() - defer m.mu.Unlock() - if m.activeConnections[database] > 0 { - m.activeConnections[database]-- - } -} - -// ErrTooManyConnections represents the error when connection limit is exceeded -var ErrTooManyConnections = errors.New("too many connections") - -// TestDatabaseConnectionLeak verifies that database connections are properly managed -// and don't leak when switching between different databases -func TestDatabaseConnectionLeak(t *testing.T) { - if testing.Short() { - t.Skip("Skipping connection leak test in short mode") - } - - mockStore := NewMockORMStore(10) - - // Test rapid database switching without connection leaks - databases := []string{"db1", "db2", "db3"} - - // Simulate multiple rapid switches - for i := 0; i < 50; i++ { - for _, db := range databases { - // Simulate connection creation - err := mockStore.SimulateConnection(db) - require.NoError(t, err, "Should not exceed connection limit on iteration %d for db %s", i, db) - - // Verify connection count is reasonable - count := mockStore.GetActiveConnections(db) - assert.LessOrEqual(t, count, 3, "Too many connections for db %s: %d", db, count) - - // Simulate some work - time.Sleep(time.Millisecond) - - // Simulate connection cleanup - mockStore.SimulateDisconnection(db) - } - } - - // Verify all connections are cleaned up - for _, db := range databases { - count := mockStore.GetActiveConnections(db) - assert.Equal(t, 0, count, "Connections not properly cleaned up for db %s", db) - } -} - -// TestConnectionPoolConfiguration verifies connection pool settings work correctly -func TestConnectionPoolConfiguration(t *testing.T) { - tests := []struct { - name string - maxConnections int - }{ - {"Normal pool size", 10}, - {"Large pool size", 100}, - {"Small pool size", 1}, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - mockStore := NewMockORMStore(tt.maxConnections) - - // Try to create connections up to the limit - for i := 0; i < tt.maxConnections; i++ { - err := mockStore.SimulateConnection("testdb") - assert.NoError(t, err, "Should not error within connection limit") - } - - // Try to exceed the limit - should always error - err := mockStore.SimulateConnection("testdb") - assert.Error(t, err, "Should error when exceeding connection limit") - assert.Equal(t, ErrTooManyConnections, err, "Should return specific error type") - }) - } -} - -// TestConnectionReuse verifies that connections are properly reused -func TestConnectionReuse(t *testing.T) { - mockStore := NewMockORMStore(10) - - // Create and close connections multiple times - for i := 0; i < 20; i++ { - err := mockStore.SimulateConnection("reusedb") - require.NoError(t, err, "Connection creation should not fail on iteration %d", i) - - // Verify connection count - count := mockStore.GetActiveConnections("reusedb") - assert.LessOrEqual(t, count, 10, "Connection count should not exceed pool size") - - mockStore.SimulateDisconnection("reusedb") - } - - // Final verification - finalCount := mockStore.GetActiveConnections("reusedb") - assert.Equal(t, 0, finalCount, "All connections should be closed") -} - -// TestConcurrentDatabaseAccess simulates concurrent access to multiple databases -func TestConcurrentDatabaseAccess(t *testing.T) { - if testing.Short() { - t.Skip("Skipping concurrent test in short mode") - } - - mockStore := NewMockORMStore(5) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() - - // Simulate concurrent access patterns - done := make(chan bool, 2) - - // Worker 1: Access db1 repeatedly - go func() { - defer func() { - // Ensure we always send to done channel to prevent test hanging - select { - case done <- true: - default: - } - }() - for { - select { - case <-ctx.Done(): - return - default: - err := mockStore.SimulateConnection("db1") - if err == nil { - time.Sleep(time.Millisecond * 10) - mockStore.SimulateDisconnection("db1") - } - time.Sleep(time.Millisecond * 5) - } - } - }() - - // Worker 2: Access db2 repeatedly - go func() { - defer func() { - // Ensure we always send to done channel to prevent test hanging - select { - case done <- true: - default: - } - }() - for { - select { - case <-ctx.Done(): - return - default: - err := mockStore.SimulateConnection("db2") - if err == nil { - time.Sleep(time.Millisecond * 10) - mockStore.SimulateDisconnection("db2") - } - time.Sleep(time.Millisecond * 5) - } - } - }() - - // Let workers run for a short time - time.Sleep(time.Millisecond * 100) - cancel() - - // Wait for workers to finish with timeout protection - for i := 0; i < 2; i++ { - select { - case <-done: - // Worker finished - case <-time.After(5 * time.Second): - t.Fatal("Test timed out waiting for workers to finish") - } - } - - // Verify no connections are leaked - db1Count := mockStore.GetActiveConnections("db1") - db2Count := mockStore.GetActiveConnections("db2") - assert.LessOrEqual(t, db1Count, 5, "db1 should not have excessive connections") - assert.LessOrEqual(t, db2Count, 5, "db2 should not have excessive connections") -} - -// TestCacheKeyGeneration verifies that cache keys are properly generated -func TestCacheKeyGeneration(t *testing.T) { - tests := []struct { - store string - database string - expected string - }{ - {"mysql", "testdb", "mysql:testdb"}, - {"postgres", "proddb", "postgres:proddb"}, - {"sqlite", "local.db", "sqlite:local.db"}, - {"mysql", "", "mysql:"}, - {"", "testdb", ":testdb"}, - } - - for _, tt := range tests { - t.Run(tt.expected, func(t *testing.T) { - cacheKey := generateCacheKey(tt.store, tt.database) - assert.Equal(t, tt.expected, cacheKey, "Cache key should match expected format") - }) - } -} - -// generateCacheKey creates a composite cache key for store and database -func generateCacheKey(store, database string) string { - return store + ":" + database -} diff --git a/docs/fixes/database_connection_leak.patch b/docs/fixes/database_connection_leak.patch deleted file mode 100644 index a98b8a31f..000000000 --- a/docs/fixes/database_connection_leak.patch +++ /dev/null @@ -1,174 +0,0 @@ -From: GitHub Copilot -Date: Thu, 12 Sep 2025 00:00:00 +0000 -Subject: [PATCH] Fix database connection leak in ORM store extension - -This patch fixes the database connection leak issue where MySQL client -connections keep increasing over time when switching between databases. - -Changes: -- Use composite cache keys (store:database) instead of just store name -- Add proper connection pool configuration with limits -- Implement thread-safe connection caching with mutex -- Add connection cleanup functionality -- Add connection monitoring capabilities - -Fixes the issue where repeated queries with database switches would -create new connections without properly reusing or closing old ones. - ---- - pkg/server.go | 89 ++++++++++++++++++++++++++++++++++++++++++-------- - 1 file changed, 75 insertions(+), 14 deletions(-) - -diff --git a/pkg/server.go b/pkg/server.go -index 1234567..abcdefg 100644 ---- a/pkg/server.go -+++ b/pkg/server.go -@@ -20,6 +20,8 @@ import ( - "os" - "path/filepath" - "strconv" -+ "sync" -+ "time" - "strings" - - "github.com/linuxsuren/api-testing/pkg/extension" -@@ -50,8 +52,16 @@ func NewRemoteServer(defaultHistoryLimit int) (s remote.LoaderServer) { - return - } - --func createDB(user, password, address, database, driver string) (db *gorm.DB, err error) { -+// generateCacheKey creates a unique cache key for store + database combination -+func generateCacheKey(storeName, database string) string { -+ return fmt.Sprintf("%s:%s", storeName, database) -+} -+ -+// createDBWithPool creates a database connection with proper connection pool configuration -+func createDBWithPool(user, password, address, database, driver string) (db *gorm.DB, err error) { - var dialector gorm.Dialector - var dsn string -+ - switch driver { - case DialectorMySQL, "", "greptime": - if !strings.Contains(address, ":") { -@@ -84,6 +94,21 @@ func createDB(user, password, address, database, driver string) (db *gorm.DB, e - return - } - -+ // Configure connection pool to prevent connection leaks -+ if sqlDB, sqlErr := db.DB(); sqlErr == nil { -+ // Set maximum number of open connections -+ sqlDB.SetMaxOpenConns(25) -+ // Set maximum number of idle connections -+ sqlDB.SetMaxIdleConns(10) -+ // Set maximum connection lifetime -+ sqlDB.SetConnMaxLifetime(time.Hour) -+ // Set maximum connection idle time -+ sqlDB.SetConnMaxIdleTime(10 * time.Minute) -+ -+ log.Printf("Database connection pool configured: MaxOpen=%d, MaxIdle=%d, MaxLifetime=%v", -+ 25, 10, time.Hour) -+ } -+ - if driver != "tdengine" && driver != "greptime" { - err = errors.Join(err, db.AutoMigrate(&TestSuite{})) - err = errors.Join(err, db.AutoMigrate(&TestCase{})) -@@ -92,13 +117,16 @@ func createDB(user, password, address, database, driver string) (db *gorm.DB, e - return - } - -+// Legacy function for backward compatibility -+func createDB(user, password, address, database, driver string) (db *gorm.DB, err error) { -+ return createDBWithPool(user, password, address, database, driver) -+} -+ - var dbCache = make(map[string]*gorm.DB) --var dbNameCache = make(map[string]string) -+var cacheMutex = sync.RWMutex{} - - func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { - store := remote.GetStoreFromContext(ctx) - if store == nil { - err = errors.New("no connect to database") -- } else { -+ return -+ } -+ - database := dbName - if database == "" { - if v, ok := store.Properties["database"]; ok && v != "" { -@@ -110,18 +138,34 @@ func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (d - if v, ok := store.Properties["driver"]; ok && v != "" { - driver = v - } -- log.Printf("get client from driver[%s] in database [%s]", driver, database) -- -- var ok bool -- var db *gorm.DB -- if db, ok = dbCache[store.Name]; (ok && db != nil && dbNameCache[store.Name] != database) || !ok { -- if db, err = createDB(store.Username, store.Password, store.URL, database, driver); err == nil { -- dbCache[store.Name] = db -- dbNameCache[store.Name] = database -+ -+ // Use composite cache key to avoid connection leaks -+ cacheKey := generateCacheKey(store.Name, database) -+ -+ cacheMutex.RLock() -+ db, exists := dbCache[cacheKey] -+ cacheMutex.RUnlock() -+ -+ if !exists || db == nil { -+ log.Printf("Creating new connection for store[%s] database[%s]", store.Name, database) -+ -+ if db, err = createDBWithPool(store.Username, store.Password, store.URL, database, driver); err == nil { -+ cacheMutex.Lock() -+ dbCache[cacheKey] = db -+ cacheMutex.Unlock() - } else { - return - } -+ } else { -+ log.Printf("Reusing existing connection for store[%s] database[%s]", store.Name, database) - } - - dbQuery = NewCommonDataQuery(GetInnerSQL(driver), db) -- } - return - } -+ -+// CleanupConnections closes all cached database connections -+func (s *dbserver) CleanupConnections() error { -+ cacheMutex.Lock() -+ defer cacheMutex.Unlock() -+ -+ var errs []error -+ for key, db := range dbCache { -+ if sqlDB, err := db.DB(); err == nil { -+ log.Printf("Closing database connection: %s", key) -+ if closeErr := sqlDB.Close(); closeErr != nil { -+ errs = append(errs, fmt.Errorf("failed to close connection %s: %w", key, closeErr)) -+ } -+ } -+ delete(dbCache, key) -+ } -+ -+ if len(errs) > 0 { -+ return errors.Join(errs...) -+ } -+ return nil -+} -+ -+// GetConnectionStats returns connection statistics for monitoring -+func (s *dbserver) GetConnectionStats() map[string]sql.DBStats { -+ cacheMutex.RLock() -+ defer cacheMutex.RUnlock() -+ -+ stats := make(map[string]sql.DBStats) -+ for key, db := range dbCache { -+ if sqlDB, err := db.DB(); err == nil { -+ stats[key] = sqlDB.Stats() -+ } -+ } -+ return stats -+} --- -2.34.1 diff --git a/docs/fixes/database_connection_leak_fix.md b/docs/fixes/database_connection_leak_fix.md deleted file mode 100644 index 408f887bd..000000000 --- a/docs/fixes/database_connection_leak_fix.md +++ /dev/null @@ -1,287 +0,0 @@ -# Database Connection Leak Fix - -## Problem Description -The ORM database store in api-testing has a connection leak issue where MySQL client connections keep increasing over time, especially when switching between databases. This eventually leads to "too many connections" errors. - -## Root Cause Analysis - -### Issue Location -The problem is in the `atest-ext-store-orm` extension, specifically in the `getClientWithDatabase` method in `pkg/server.go`. - -### Current Problematic Code -```go -var dbCache = make(map[string]*gorm.DB) -var dbNameCache = make(map[string]string) - -func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { - // ... store and database logic ... - - var ok bool - var db *gorm.DB - if db, ok = dbCache[store.Name]; (ok && db != nil && dbNameCache[store.Name] != database) || !ok { - if db, err = createDB(store.Username, store.Password, store.URL, database, driver); err == nil { - dbCache[store.Name] = db // <- PROBLEM: Overwrites without closing old connection - dbNameCache[store.Name] = database - } else { - return - } - } - // ... -} -``` - -### Issues Identified -1. **Cache Key Problem**: Using only `store.Name` as cache key means switching databases creates new connections -2. **No Connection Cleanup**: Old connections are overwritten without being properly closed -3. **Missing Connection Pool Configuration**: No limits set on connection pool size -4. **Improper Cache Logic**: The condition creates new connections instead of reusing existing ones per database - -## Solution - -### 1. Fix Cache Key Strategy -Use composite keys that include both store name and database name: - -```go -// Generate a unique cache key for store + database combination -func generateCacheKey(storeName, database string) string { - return fmt.Sprintf("%s:%s", storeName, database) -} -``` - -### 2. Proper Connection Management -```go -var dbCache = make(map[string]*gorm.DB) -var cacheMutex = sync.RWMutex{} - -func (s *dbserver) getClientWithDatabase(ctx context.Context, dbName string) (dbQuery DataQuery, err error) { - store := remote.GetStoreFromContext(ctx) - if store == nil { - err = errors.New("no connect to database") - return - } - - database := dbName - if database == "" { - if v, ok := store.Properties["database"]; ok && v != "" { - database = v - } - } - - driver := DialectorMySQL - if v, ok := store.Properties["driver"]; ok && v != "" { - driver = v - } - - // Use composite cache key - cacheKey := generateCacheKey(store.Name, database) - - cacheMutex.RLock() - db, exists := dbCache[cacheKey] - cacheMutex.RUnlock() - - if !exists || db == nil { - log.Printf("Creating new connection for store[%s] database[%s]", store.Name, database) - - if db, err = createDBWithPool(store.Username, store.Password, store.URL, database, driver); err == nil { - cacheMutex.Lock() - dbCache[cacheKey] = db - cacheMutex.Unlock() - } else { - return - } - } - - dbQuery = NewCommonDataQuery(GetInnerSQL(driver), db) - return -} -``` - -### 3. Enhanced createDB Function with Connection Pool -```go -func createDBWithPool(user, password, address, database, driver string) (db *gorm.DB, err error) { - var dialector gorm.Dialector - var dsn string - - switch driver { - case DialectorMySQL, "", "greptime": - if !strings.Contains(address, ":") { - address = fmt.Sprintf("%s:%d", address, 3306) - } - dsn = fmt.Sprintf("%s:%s@tcp(%s)/%s?charset=utf8mb4&parseTime=true", user, password, address, database) - dialector = mysql.Open(dsn) - case "sqlite": - dsn = fmt.Sprintf("%s.db", database) - dialector = sqlite.Open(dsn) - case DialectorPostgres: - obj := strings.Split(address, ":") - host, port := obj[0], "5432" - if len(obj) > 1 { - port = obj[1] - } - dsn = fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=Asia/Shanghai", host, user, password, database, port) - dialector = postgres.Open(dsn) - case "tdengine": - dsn = fmt.Sprintf("%s:%s@ws(%s)/%s", user, password, address, database) - dialector = NewTDengineDialector(dsn) - default: - err = fmt.Errorf("invalid database driver %q", driver) - return - } - - log.Printf("try to connect to %q", dsn) - db, err = gorm.Open(dialector, &gorm.Config{ - Logger: logger.Default.LogMode(logger.Info), - }) - if err != nil { - err = fmt.Errorf("failed to connect to %q %v", dsn, err) - return - } - - // Configure connection pool to prevent connection leaks - if sqlDB, sqlErr := db.DB(); sqlErr == nil { - // Set maximum number of open connections - sqlDB.SetMaxOpenConns(25) - // Set maximum number of idle connections - sqlDB.SetMaxIdleConns(10) - // Set maximum connection lifetime - sqlDB.SetConnMaxLifetime(time.Hour) - // Set maximum connection idle time - sqlDB.SetConnMaxIdleTime(10 * time.Minute) - - log.Printf("Database connection pool configured: MaxOpen=%d, MaxIdle=%d, MaxLifetime=%v", - 25, 10, time.Hour) - } - - if driver != "tdengine" && driver != "greptime" { - err = errors.Join(err, db.AutoMigrate(&TestSuite{})) - err = errors.Join(err, db.AutoMigrate(&TestCase{})) - err = errors.Join(err, db.AutoMigrate(&HistoryTestResult{})) - } - return -} -``` - -### 4. Add Connection Cleanup -```go -// Add cleanup function for graceful shutdown -func (s *dbserver) CleanupConnections() error { - cacheMutex.Lock() - defer cacheMutex.Unlock() - - var errs []error - for key, db := range dbCache { - if sqlDB, err := db.DB(); err == nil { - log.Printf("Closing database connection: %s", key) - if closeErr := sqlDB.Close(); closeErr != nil { - errs = append(errs, fmt.Errorf("failed to close connection %s: %w", key, closeErr)) - } - } - delete(dbCache, key) - } - - if len(errs) > 0 { - return errors.Join(errs...) - } - return nil -} -``` - -### 5. Add Connection Monitoring -```go -// Add method to monitor connection stats -func (s *dbserver) GetConnectionStats() map[string]sql.DBStats { - cacheMutex.RLock() - defer cacheMutex.RUnlock() - - stats := make(map[string]sql.DBStats) - for key, db := range dbCache { - if sqlDB, err := db.DB(); err == nil { - stats[key] = sqlDB.Stats() - } - } - return stats -} -``` - -## Implementation Steps - -1. **Backup Current Code**: Ensure you have a backup of the current `atest-ext-store-orm` code -2. **Apply Connection Pool Fix**: Update the `createDB` function with connection pool configuration -3. **Fix Cache Logic**: Implement the composite cache key strategy -4. **Add Cleanup**: Implement proper connection cleanup -5. **Add Monitoring**: Add connection statistics monitoring -6. **Test**: Thoroughly test database switching scenarios - -## Testing the Fix - -### Test Scenario -```go -// Test script to verify fix -func TestConnectionLeak(t *testing.T) { - server := NewRemoteServer(10) - - // Create contexts for different databases - ctx1 := remote.WithIncomingStoreContext(context.TODO(), &Store{ - Name: "test-store", - URL: "localhost:3306", - Username: "root", - Password: "root", - Properties: map[string]string{ - "driver": "mysql", - "database": "db1", - }, - }) - - ctx2 := remote.WithIncomingStoreContext(context.TODO(), &Store{ - Name: "test-store", - URL: "localhost:3306", - Username: "root", - Password: "root", - Properties: map[string]string{ - "driver": "mysql", - "database": "db2", - }, - }) - - // Alternate between databases multiple times - for i := 0; i < 100; i++ { - _, err1 := server.Query(ctx1, &server.DataQuery{Sql: "SELECT 1"}) - assert.NoError(t, err1) - - _, err2 := server.Query(ctx2, &server.DataQuery{Sql: "SELECT 1"}) - assert.NoError(t, err2) - - // Check connection stats - if i%10 == 0 { - stats := server.GetConnectionStats() - for key, stat := range stats { - t.Logf("Connection %s: Open=%d, InUse=%d", key, stat.OpenConnections, stat.InUse) - // Ensure connections don't keep growing - assert.True(t, stat.OpenConnections <= 50, "Too many open connections") - } - } - } -} -``` - -## Benefits - -1. **Prevents Connection Leaks**: Properly manages connection lifecycle -2. **Improved Performance**: Connection reuse reduces overhead -3. **Resource Control**: Connection pool limits prevent resource exhaustion -4. **Better Monitoring**: Connection statistics help diagnose issues -5. **Thread Safety**: Proper mutex usage prevents race conditions - -## Configuration Options - -Add these properties to store configuration for fine-tuning: - -```yaml -properties: - maxOpenConns: 25 # Maximum open connections - maxIdleConns: 10 # Maximum idle connections - connMaxLifetime: 1h # Maximum connection lifetime - connMaxIdleTime: 10m # Maximum connection idle time -``` - -This fix addresses the root cause of the database connection leak and provides a robust solution for connection management in the ORM store extension.