Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions syncmap.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,31 @@ func (bm *bucketMap[TKey]) count() int64 {
return count
}

// gc deletes buckets that are full, which has the
// same semantics as the bucket not existing
// gc deletes buckets that are full using mark-and-sweep
func (bm *bucketMap[TKey]) gc(timeFunc func() ntime.Time) (deleted int64) {
bm.mu.Lock()
defer bm.mu.Unlock()
// Phase 1: Mark buckets for deletion
toDelete := make([]bucketSpec[TKey], 0)

bm.m.Range(func(key, value any) bool {
// I wonder if there is a possibility of a race here, not sure.
// Thinking about the timing between getting the bucket from the map
// and locking the bucket. Maybe it's not a problem.
spec := key.(bucketSpec[TKey])
b := value.(*bucket)
b.mu.Lock()
if b.isFull(timeFunc(), spec.limit) {
bm.m.Delete(key)
deleted++
toDelete = append(toDelete, spec)
}
b.mu.Unlock()
return true
})

// Phase 2: Delete marked buckets atomically
bm.mu.Lock()
defer bm.mu.Unlock()

for _, spec := range toDelete {
if _, loaded := bm.m.LoadAndDelete(spec); loaded {
deleted++
}
}

return deleted
}
115 changes: 109 additions & 6 deletions syncmap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,7 @@ func TestBucketMap_GC(t *testing.T) {
// Trying to get the timing right for a good test,
// since a slow system like GitHub Actions seems
// to take a while to launch goroutines.
signal := make(chan struct{})
ready := make(chan struct{})
launched := int64(0)

var wg sync.WaitGroup
Expand All @@ -269,17 +269,17 @@ func TestBucketMap_GC(t *testing.T) {
}
b.mu.Unlock()

// Signal when we reach 100 launched goroutines
if atomic.AddInt64(&launched, 1) == 100 {
close(signal)
// Signal when we reach 10% of the buckets
if atomic.AddInt64(&launched, 1) == count/10 {
close(ready)
}
}(i)
}

// Wait for 100 goroutines to launch before running GC,
// Wait for 10% of the buckets to be created before running GC,
// try to induce some concurrency.

<-signal
<-ready
bm.gc(func() ntime.Time {
return executionTime
})
Expand All @@ -295,4 +295,107 @@ func TestBucketMap_GC(t *testing.T) {
})
require.Equal(t, int64(200), bm.count(), "should have 200 buckets after deletion and GC")
})

t.Run("gc multiple concurrent calls", func(t *testing.T) {
var bm bucketMap[string]
const count int64 = 500

limit := NewLimit(10, time.Second)
executionTime := ntime.Now()

// Create buckets first
for i := range count {
key := fmt.Sprintf("key%d", i)
b := bm.loadOrStore(key, executionTime, limit)

// Make 1/5 of the buckets not full by consuming tokens
if i%5 == 0 {
b.mu.Lock()
b.consumeTokens(executionTime, limit, 1)
b.mu.Unlock()
}
}

require.Equal(t, count, bm.count(), "should have 500 buckets initially")

// Test multiple goroutines calling gc() simultaneously
const concurrency = 10
var wg sync.WaitGroup
var deleted int64

wg.Add(concurrency)
for range concurrency {
go func() {
defer wg.Done()
d := bm.gc(func() ntime.Time {
return executionTime
})
atomic.AddInt64(&deleted, d)
}()
}

wg.Wait()

// With 1/5 buckets not full, we expect 4/5 * 500 = 400 buckets to be deleted
expectedDeleted := count * 4 / 5
require.Equal(t, expectedDeleted, deleted, "should have deleted 400 full buckets")
require.Equal(t, int64(100), bm.count(), "should have 100 buckets remaining (the non-full ones)")
})

t.Run("gc during rapid bucket creation", func(t *testing.T) {
var bm bucketMap[string]

limit := NewLimit(10, time.Second)
executionTime := ntime.Now()

// Start rapid bucket creation
stop := make(chan struct{})
ready := make(chan struct{})
var wg sync.WaitGroup
var bucketCount int64

wg.Add(1)
go func() {
defer wg.Done()
const minBuckets = 100
for i := int64(0); ; i++ {
select {
case <-stop:
return
default:
key := fmt.Sprintf("rapid_key%d", i)
bm.loadOrStore(key, executionTime, limit)

// Signal when we reach the target number of buckets
if atomic.AddInt64(&bucketCount, 1) == minBuckets {
close(ready)
}

// Small delay to make it "rapid" but not overwhelming
time.Sleep(time.Microsecond)
}
}
}()

// Wait for target number of buckets to be created before running GC
<-ready

// Run GC while buckets are being created rapidly
deleted := bm.gc(func() ntime.Time {
return executionTime
})

// Stop the rapid creation
close(stop)
wg.Wait()

// Verify GC completed successfully
require.GreaterOrEqual(t, deleted, int64(0), "GC should complete without errors")

// Final cleanup
deleted = bm.gc(func() ntime.Time {
return executionTime
})
require.GreaterOrEqual(t, deleted, int64(0), "final GC should complete without errors")
})
}
Loading