diff --git a/syncmap.go b/syncmap.go
index 99100f3..2e83ffc 100644
--- a/syncmap.go
+++ b/syncmap.go
@@ -57,25 +57,50 @@ func (bm *bucketMap[TKey]) count() int64 {
 	return count
 }
 
-// gc deletes buckets that are full, which has the
-// same semantics as the bucket not existing
+// gc deletes buckets that are full, which has the same semantics as the
+// bucket not existing. It works in two phases (mark, then sweep) so that
+// bm.mu is held only while entries are being removed.
+//
+// timeFunc supplies the current time for each fullness check. The return
+// value is the number of buckets this call removed from the map.
 func (bm *bucketMap[TKey]) gc(timeFunc func() ntime.Time) (deleted int64) {
-	bm.mu.Lock()
-	defer bm.mu.Unlock()
+	// Phase 1: mark candidates. Fullness is only a snapshot here, because
+	// each bucket's lock is released before anything is deleted.
+	var toDelete []bucketSpec[TKey]
 	bm.m.Range(func(key, value any) bool {
-		// I wonder if there is a possibility of a race here, not sure.
-		// Thinking about the timing between getting the bucket from the map
-		// and locking the bucket. Maybe it's not a problem.
 		spec := key.(bucketSpec[TKey])
 		b := value.(*bucket)
 		b.mu.Lock()
 		if b.isFull(timeFunc(), spec.limit) {
-			bm.m.Delete(key)
-			deleted++
+			toDelete = append(toDelete, spec)
 		}
 		b.mu.Unlock()
 		return true
 	})
+
+	// Phase 2: sweep while holding bm.mu.
+	bm.mu.Lock()
+	defer bm.mu.Unlock()
+
+	for _, spec := range toDelete {
+		value, ok := bm.m.Load(spec)
+		if !ok {
+			// Already removed since it was marked.
+			continue
+		}
+		b := value.(*bucket)
+
+		// Re-check under the bucket lock: tokens may have been consumed
+		// since phase 1, and deleting a non-full bucket would discard that
+		// usage. Lock order (bm.mu, then b.mu) matches the pre-patch gc.
+		// CompareAndDelete only removes the entry if it is still this bucket.
+		b.mu.Lock()
+		if b.isFull(timeFunc(), spec.limit) && bm.m.CompareAndDelete(spec, value) {
+			deleted++
+		}
+		b.mu.Unlock()
+	}
+
 	return deleted
 }
 
diff --git a/syncmap_test.go b/syncmap_test.go
index a2ba955..5a70ac3 100644
--- a/syncmap_test.go
+++ b/syncmap_test.go
@@ -250,7 +250,7 @@ func TestBucketMap_GC(t *testing.T) {
 		// Trying to get the timing right for a good test,
 		// since a slow system like GitHub Actions seems
 		// to take a while to launch goroutines.
- signal := make(chan struct{}) + ready := make(chan struct{}) launched := int64(0) var wg sync.WaitGroup @@ -269,17 +269,17 @@ func TestBucketMap_GC(t *testing.T) { } b.mu.Unlock() - // Signal when we reach 100 launched goroutines - if atomic.AddInt64(&launched, 1) == 100 { - close(signal) + // Signal when we reach 10% of the buckets + if atomic.AddInt64(&launched, 1) == count/10 { + close(ready) } }(i) } - // Wait for 100 goroutines to launch before running GC, + // Wait for 10% of the buckets to be created before running GC, // try to induce some concurrency. - <-signal + <-ready bm.gc(func() ntime.Time { return executionTime }) @@ -295,4 +295,107 @@ func TestBucketMap_GC(t *testing.T) { }) require.Equal(t, int64(200), bm.count(), "should have 200 buckets after deletion and GC") }) + + t.Run("gc multiple concurrent calls", func(t *testing.T) { + var bm bucketMap[string] + const count int64 = 500 + + limit := NewLimit(10, time.Second) + executionTime := ntime.Now() + + // Create buckets first + for i := range count { + key := fmt.Sprintf("key%d", i) + b := bm.loadOrStore(key, executionTime, limit) + + // Make 1/5 of the buckets not full by consuming tokens + if i%5 == 0 { + b.mu.Lock() + b.consumeTokens(executionTime, limit, 1) + b.mu.Unlock() + } + } + + require.Equal(t, count, bm.count(), "should have 500 buckets initially") + + // Test multiple goroutines calling gc() simultaneously + const concurrency = 10 + var wg sync.WaitGroup + var deleted int64 + + wg.Add(concurrency) + for range concurrency { + go func() { + defer wg.Done() + d := bm.gc(func() ntime.Time { + return executionTime + }) + atomic.AddInt64(&deleted, d) + }() + } + + wg.Wait() + + // With 1/5 buckets not full, we expect 4/5 * 500 = 400 buckets to be deleted + expectedDeleted := count * 4 / 5 + require.Equal(t, expectedDeleted, deleted, "should have deleted 400 full buckets") + require.Equal(t, int64(100), bm.count(), "should have 100 buckets remaining (the non-full ones)") + }) + + t.Run("gc 
during rapid bucket creation", func(t *testing.T) { + var bm bucketMap[string] + + limit := NewLimit(10, time.Second) + executionTime := ntime.Now() + + // Start rapid bucket creation + stop := make(chan struct{}) + ready := make(chan struct{}) + var wg sync.WaitGroup + var bucketCount int64 + + wg.Add(1) + go func() { + defer wg.Done() + const minBuckets = 100 + for i := int64(0); ; i++ { + select { + case <-stop: + return + default: + key := fmt.Sprintf("rapid_key%d", i) + bm.loadOrStore(key, executionTime, limit) + + // Signal when we reach the target number of buckets + if atomic.AddInt64(&bucketCount, 1) == minBuckets { + close(ready) + } + + // Small delay to make it "rapid" but not overwhelming + time.Sleep(time.Microsecond) + } + } + }() + + // Wait for target number of buckets to be created before running GC + <-ready + + // Run GC while buckets are being created rapidly + deleted := bm.gc(func() ntime.Time { + return executionTime + }) + + // Stop the rapid creation + close(stop) + wg.Wait() + + // Verify GC completed successfully + require.GreaterOrEqual(t, deleted, int64(0), "GC should complete without errors") + + // Final cleanup + deleted = bm.gc(func() ntime.Time { + return executionTime + }) + require.GreaterOrEqual(t, deleted, int64(0), "final GC should complete without errors") + }) }