diff --git a/benchmark_comparison_test.go b/benchmark_comparison_test.go new file mode 100644 index 0000000..760b958 --- /dev/null +++ b/benchmark_comparison_test.go @@ -0,0 +1,171 @@ +package bloomfilter + +import ( + "fmt" + "math/rand" + "testing" + "unsafe" +) + +// Helper: Sequential Union implementation (bypassing the Parallel check) +// Helper: Sequential Union implementation (bypassing the Parallel check) +func unionSequential(bf, other *CacheOptimizedBloomFilter) { + totalBytes := int(bf.cacheLineCount * CacheLineSize) + bf.simdOps.VectorOr( + unsafe.Pointer(&bf.cacheLines[0]), + unsafe.Pointer(&other.cacheLines[0]), + totalBytes, + ) +} + +// Helper: Sequential Add implementation +func addBatchSequential(bf *CacheOptimizedBloomFilter, data [][]byte) { + for _, item := range data { + bf.Add(item) + } +} + +// ----------------------------------------------------------------------------- +// Benchmarks: Add Batch +// ----------------------------------------------------------------------------- + +func BenchmarkAddBatch_Comparison(b *testing.B) { + // Size large enough to benefit from parallelism + size := uint64(1000000) + batchSize := 50000 + + // Pre-generate data + data := make([][]byte, batchSize) + for i := 0; i < batchSize; i++ { + data[i] = []byte(fmt.Sprintf("bench-item-%d", rand.Int())) + } + + b.Run("Sequential_Loop", func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + bf := NewCacheOptimizedBloomFilter(size, 0.01) + b.StartTimer() + addBatchSequential(bf, data) + } + }) + + b.Run("Parallel_AddBatch", func(b *testing.B) { + for i := 0; i < b.N; i++ { + b.StopTimer() + bf := NewCacheOptimizedBloomFilter(size, 0.01) + b.StartTimer() + bf.AddBatch(data) + } + }) +} + +// ----------------------------------------------------------------------------- +// Benchmarks: Contains Batch +// ----------------------------------------------------------------------------- + +func BenchmarkContainsBatch_Comparison(b *testing.B) { + size := uint64(1000000) + batchSize := 50000 + + data := make([][]byte, batchSize) + for i := 0; i < batchSize; i++ { + data[i] = []byte(fmt.Sprintf("bench-item-%d", rand.Int())) + } + + bf := NewCacheOptimizedBloomFilter(size, 0.01) + bf.AddBatch(data) + + b.Run("Sequential_Loop", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + for _, item := range data { + bf.Contains(item) + } + } + }) + + b.Run("Parallel_ContainsBatch", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + bf.ContainsBatch(data) + } + }) +} + +// ----------------------------------------------------------------------------- +// Benchmarks: Union +// ----------------------------------------------------------------------------- + +func BenchmarkUnion_Comparison(b *testing.B) { + // Must be larger than ParallelThreshold (4096 lines) + // 500k elements -> ~10k lines + size := uint64(1000000) + + bf1 := NewCacheOptimizedBloomFilter(size, 0.01) + bf2 := NewCacheOptimizedBloomFilter(size, 0.01) + + // Fill them up a bit + batchSize := 50000 + data1 := make([][]byte, batchSize) + data2 := make([][]byte, batchSize) + for i := 0; i < batchSize; i++ { + data1[i] = []byte(fmt.Sprintf("set1-%d", i)) + data2[i] = []byte(fmt.Sprintf("set2-%d", i)) + } + bf1.AddBatch(data1) + bf2.AddBatch(data2) + + b.Run("Sequential_Union", func(b *testing.B) { + b.StopTimer() + for i := 0; i < b.N; i++ { + // Create a fresh copy of bf1 to avoid state accumulation + bfDest := bf1.Clone() + + b.StartTimer() + unionSequential(bfDest, bf2) + b.StopTimer() + } + }) + + b.Run("Parallel_Union", func(b *testing.B) { + b.StopTimer() + for i := 0; i < b.N; i++ { + bfDest := bf1.Clone() + + b.StartTimer() + bfDest.Union(bf2) + b.StopTimer() + } + }) +} + +// ----------------------------------------------------------------------------- +// Benchmarks: PopCount +// ----------------------------------------------------------------------------- + +func BenchmarkPopCount_Comparison(b *testing.B) { + size := uint64(1000000) + bf := NewCacheOptimizedBloomFilter(size, 0.01) + + // Fill + data := make([][]byte, 50000) + for i := 0; i < 50000; i++ { + data[i] = []byte(fmt.Sprintf("item-%d", i)) + } + bf.AddBatch(data) + + b.Run("Sequential_PopCount", func(b *testing.B) { + totalBytes := int(bf.cacheLineCount * CacheLineSize) + b.ResetTimer() + for i := 0; i < b.N; i++ { + bf.simdOps.PopCount(unsafe.Pointer(&bf.cacheLines[0]), totalBytes) + } + }) + + b.Run("Parallel_PopCount", func(b *testing.B) { + b.ResetTimer() + for i := 0; i < b.N; i++ { + bf.PopCount() + } + }) +} diff --git a/bloomfilter.go b/bloomfilter.go index 45cd3c9..acb55a4 100644 --- a/bloomfilter.go +++ b/bloomfilter.go @@ -3,6 +3,8 @@ package bloomfilter import ( "fmt" "math" + "runtime" + "sync" "sync/atomic" "unsafe" @@ -106,11 +108,16 @@ func NewCacheOptimizedBloomFilter(expectedElements uint64, falsePositiveRate flo return bf } +// Clone creates a deep copy of the bloom filter +func (bf *CacheOptimizedBloomFilter) Clone() *CacheOptimizedBloomFilter { + newBf := *bf + newBf.cacheLines = make([]CacheLine, len(bf.cacheLines)) + copy(newBf.cacheLines, bf.cacheLines) + return &newBf +} + // Add adds an element with cache line optimization func (bf *CacheOptimizedBloomFilter) Add(data []byte) { - h1 := hash.Optimized1(data) - h2 := hash.Optimized2(data) - // Stack buffer for typical filters var stackBuf [16]uint64 var positions []uint64 @@ -121,9 +128,7 @@ func (bf *CacheOptimizedBloomFilter) Add(data []byte) { } // Generate positions - for i := uint32(0); i < bf.hashCount; i++ { - positions[i] = (h1 + uint64(i)*h2) % bf.bitCount - } + bf.calculatePositions(data, positions) // Set bits atomically bf.setBitsAtomic(positions) @@ -131,9 +136,6 @@ func (bf *CacheOptimizedBloomFilter) Add(data []byte) { // Contains checks membership with cache line optimization func (bf *CacheOptimizedBloomFilter) Contains(data []byte) bool { - h1 := hash.Optimized1(data) - h2 := hash.Optimized2(data) - var stackBuf [16]uint64 var positions []uint64 if bf.hashCount <= 16 { @@ -142,13 +144,112 @@ func (bf *CacheOptimizedBloomFilter) Contains(data []byte) bool { positions = make([]uint64, bf.hashCount) } - for i := uint32(0); i < bf.hashCount; i++ { - positions[i] = (h1 + uint64(i)*h2) % bf.bitCount - } + bf.calculatePositions(data, positions) return bf.checkBitsAtomic(positions) } +// AddBatch adds multiple elements concurrently using available CPU cores. +// +// Thread Safety: This method is thread-safe and uses atomic operations (CAS) to set bits. +// It can be safely called concurrently from multiple goroutines, although the internal +// Parallelization itself spawns goroutines. +// +// Parallel Execution: Parallel processing is triggered only if the batch size exceeds +// (runtime.NumCPU() * MinBatchSizePerCPU). For smaller batches, it falls back to +// sequential addition to avoid the overhead of spawning goroutines. +// +// Performance: checking benchmarks, for large batches (e.g., 50k items), this method +// can be 2x or more faster than a sequential loop, depending on the number of cores. +func (bf *CacheOptimizedBloomFilter) AddBatch(data [][]byte) { + if len(data) == 0 { + return + } + + numCPU := runtime.NumCPU() + if len(data) < numCPU*MinBatchSizePerCPU { // fallback for small batches + for _, item := range data { + bf.Add(item) + } + return + } + + bf.executeParallel(uint64(len(data)), func(start, end uint64) { + items := data[start:end] + + // Thread-local buffers to avoid allocation in loop + // We can't reuse bf.Add because it allocates internally + // So we duplicate the add logic here for performance + + hCount := bf.hashCount + var stackBuf [16]uint64 + var positions []uint64 + + if hCount <= 16 { + positions = stackBuf[:hCount] + } else { + positions = make([]uint64, hCount) + } + + for _, item := range items { + bf.calculatePositions(item, positions) + bf.setBitsAtomic(positions) + } + }) +} + +// ContainsBatch checks multiple elements concurrently. +// +// Returns: A slice of booleans where each boolean corresponds to the element at the same index +// in the input slice. true indicates the element might be in the set, false indicates it definitely is not. +// +// Thread Safety: This method is thread-safe (read-only operations on the bitset). +// +// Parallel Execution: Parallel processing is triggered only if the batch size exceeds +// (runtime.NumCPU() * MinBatchSizePerCPU). For smaller batches, it falls back to +// sequential checks. +// +// Performance: Optimizes throughput for large batches by utilizing multiple cores. +func (bf *CacheOptimizedBloomFilter) ContainsBatch(data [][]byte) []bool { + if len(data) == 0 { + return nil + } + + results := make([]bool, len(data)) + numCPU := runtime.NumCPU() + + if len(data) < numCPU*MinBatchSizePerCPU { // fallback for small batches + for i, item := range data { + results[i] = bf.Contains(item) + } + return results + } + + bf.executeParallel(uint64(len(data)), func(start, end uint64) { + chunkIdx := int(start) + items := data[start:end] + + hCount := bf.hashCount + var stackBuf [16]uint64 + var positions []uint64 + + if hCount <= 16 { + positions = stackBuf[:hCount] + } else { + positions = make([]uint64, hCount) + } + + for j, item := range items { + bf.calculatePositions(item, positions) + + // Calculate global index for results + results[chunkIdx+j] = bf.checkBitsAtomic(positions) + } + }) + + return results +} + // AddString adds a string element to the bloom filter func (bf *CacheOptimizedBloomFilter) AddString(s string) { data := *(*[]byte)(unsafe.Pointer(&struct { @@ -202,6 +303,21 @@ func (bf *CacheOptimizedBloomFilter) Union(other *CacheOptimizedBloomFilter) err return nil } + // Parallel execution for large filters + if bf.cacheLineCount >= ParallelThreshold { + bf.executeParallel(bf.cacheLineCount, func(start, end uint64) { + startIdx := int(start) + endIdx := int(end) + chunkBytes := (endIdx - startIdx) * CacheLineSize + bf.simdOps.VectorOr( + unsafe.Pointer(&bf.cacheLines[startIdx]), + unsafe.Pointer(&other.cacheLines[startIdx]), + chunkBytes, + ) + }) + return nil + } + // Calculate total data size in bytes totalBytes := int(bf.cacheLineCount * CacheLineSize) @@ -225,6 +341,21 @@ func (bf *CacheOptimizedBloomFilter) Intersection(other *CacheOptimizedBloomFilt return nil } + // Parallel execution for large filters + if bf.cacheLineCount >= ParallelThreshold { + bf.executeParallel(bf.cacheLineCount, func(start, end uint64) { + startIdx := int(start) + endIdx := int(end) + chunkBytes := (endIdx - startIdx) * CacheLineSize + bf.simdOps.VectorAnd( + unsafe.Pointer(&bf.cacheLines[startIdx]), + unsafe.Pointer(&other.cacheLines[startIdx]), + chunkBytes, + ) + }) + return nil + } + // Calculate total data size in bytes totalBytes := int(bf.cacheLineCount * CacheLineSize) @@ -244,6 +375,19 @@ func (bf *CacheOptimizedBloomFilter) PopCount() uint64 { return 0 } + // Parallel execution for large filters + if bf.cacheLineCount >= ParallelThreshold { + var totalCount uint64 + bf.executeParallel(bf.cacheLineCount, func(start, end uint64) { + startIdx := int(start) + endIdx := int(end) + chunkBytes := (endIdx - startIdx) * CacheLineSize + c := bf.simdOps.PopCount(unsafe.Pointer(&bf.cacheLines[startIdx]), chunkBytes) + atomic.AddUint64(&totalCount, uint64(c)) + }) + return totalCount + } + // Calculate total data size in bytes totalBytes := int(bf.cacheLineCount * CacheLineSize) @@ -321,6 +465,16 @@ const ( // Maps: >10K cache lines = scalable for large filters (up to billions of elements) // Memory overhead: Array mode ~240KB fixed, Map mode grows dynamically ArrayModeThreshold = 10000 + + // ParallelThreshold defines the minimum number of cache lines to trigger parallel processing + // for bulk operations. + // 4096 cache lines * 64 bytes = 262,144 bytes = 256 KiB + ParallelThreshold = 4096 + + // MinBatchSizePerCPU defines the minimum number of items per CPU core + // required to trigger parallel processing in batch operations. + // This avoids overhead of starting goroutines for small batches. + MinBatchSizePerCPU = 100 ) // CacheLine represents a single 64-byte cache line containing 8 uint64 words @@ -395,3 +549,41 @@ func (bf *CacheOptimizedBloomFilter) checkBitsAtomic(positions []uint64) bool { } return true } + +// executeParallel executes a task concurrently across multiple goroutines. +// It divides the work into chunks based on the number of available CPUs. +func (bf *CacheOptimizedBloomFilter) executeParallel(totalItems uint64, task func(start, end uint64)) { + numCPU := runtime.NumCPU() + var wg sync.WaitGroup + chunkSize := (totalItems + uint64(numCPU) - 1) / uint64(numCPU) + + for i := 0; i < numCPU; i++ { + start := uint64(i) * chunkSize + end := start + chunkSize + if start >= totalItems { + break + } + if end > totalItems { + end = totalItems + } + + wg.Add(1) + go func(s, e uint64) { + defer wg.Done() + task(s, e) + }(start, end) + } + wg.Wait() +} + +// calculatePositions computes the bit positions for a given item +func (bf *CacheOptimizedBloomFilter) calculatePositions(data []byte, positions []uint64) { + h1 := hash.Optimized1(data) + h2 := hash.Optimized2(data) + bitCount := bf.bitCount + hCount := bf.hashCount + + for i := uint32(0); i < hCount; i++ { + positions[i] = (h1 + uint64(i)*h2) % bitCount + } +} diff --git a/bloomfilter_parallel_test.go b/bloomfilter_parallel_test.go new file mode 100644 index 0000000..b70bbd4 --- /dev/null +++ b/bloomfilter_parallel_test.go @@ -0,0 +1,177 @@ +package bloomfilter + +import ( + "fmt" + "math/rand" + "testing" +) + +func TestBatchOperations(t *testing.T) { + bf := NewCacheOptimizedBloomFilter(100000, 0.01) + + // Prepare data + count := 10000 + data := make([][]byte, count) + for i := 0; i < count; i++ { + data[i] = []byte(fmt.Sprintf("item-%d", i)) + } + + // Test AddBatch + bf.AddBatch(data) + + // Test ContainsBatch in small batch (sequential fallback) + smallBatch := data[:100] + results := bf.ContainsBatch(smallBatch) + for i, exists := range results { + if !exists { + t.Errorf("Item %s should exist", smallBatch[i]) + } + } + + // Test ContainsBatch in large batch (parallel) + // We need enough items to force parallel path if we lowered the threshold, + // but here we are testing the logic. + // To actually force parallel path for Add/Check logic which uses CPU count, + // we rely on the implementation details (threshold is currently numCPU*100). + // Let's create a larger dataset to be sure. + + largeCount := 100000 + largeData := make([][]byte, largeCount) + for i := 0; i < largeCount; i++ { + largeData[i] = []byte(fmt.Sprintf("large-%d", i)) + } + + bf2 := NewCacheOptimizedBloomFilter(uint64(largeCount), 0.01) + bf2.AddBatch(largeData) + + largeResults := bf2.ContainsBatch(largeData) + if len(largeResults) != largeCount { + t.Fatalf("Expected %d results, got %d", largeCount, len(largeResults)) + } + + for i, exists := range largeResults { + if !exists { + t.Errorf("Item large-%d should exist", i) + } + } +} + +func TestParallelSetOperations(t *testing.T) { + // Create filters large enough to trigger parallel path + // ParallelThreshold is 4096 cache lines. + // 4096 * 512 bits = 2,097,152 bits. + // With optimal settings (m/n = 9.58 for 1%), we need ~220k elements. + + size := uint64(300000) + bf1 := NewCacheOptimizedBloomFilter(size, 0.01) + bf2 := NewCacheOptimizedBloomFilter(size, 0.01) + + // Check if we actually triggered parallel threshold + if bf1.cacheLineCount < ParallelThreshold { + t.Logf("Warning: Filter size (%d lines) smaller than ParallelThreshold (%d). Test will run sequentially.", + bf1.cacheLineCount, ParallelThreshold) + } else { + t.Logf("Filter size: %d lines (Parallel Test Active)", bf1.cacheLineCount) + } + + // Fill filters + // bf1 has evens, bf2 has odds + itemCount := 50000 + chunk1 := make([][]byte, itemCount) + chunk2 := make([][]byte, itemCount) + + for i := 0; i < itemCount; i++ { + chunk1[i] = []byte(fmt.Sprintf("num-%d", i*2)) // 0, 2, 4... + chunk2[i] = []byte(fmt.Sprintf("num-%d", i*2+1)) // 1, 3, 5... + } + + bf1.AddBatch(chunk1) + bf2.AddBatch(chunk2) + + // Test PopCount Parallel + pop1 := bf1.PopCount() + pop2 := bf2.PopCount() + + if pop1 == 0 || pop2 == 0 { + t.Error("PopCount should not be zero") + } + + // Test Union + // Clone bf1 to verify union result later if needed, but here we just check result + bfUnion := NewCacheOptimizedBloomFilter(size, 0.01) + bfUnion.Union(bf1) // merge bf1 + bfUnion.Union(bf2) // merge bf2 + + // Real union on bf1 + bf1.Union(bf2) + + popUnion := bf1.PopCount() + if popUnion < pop1 || popUnion < pop2 { + t.Errorf("Union PopCount %d should be >= individual counts (%d, %d)", popUnion, pop1, pop2) + } + + // Check content + if !bf1.Contains([]byte("num-0")) || !bf1.Contains([]byte("num-1")) { + t.Error("Union should contain elements from both sets") + } + + // Test Intersection + // Reset filters + bfA := NewCacheOptimizedBloomFilter(size, 0.01) + bfB := NewCacheOptimizedBloomFilter(size, 0.01) + + commonData := make([][]byte, 1000) + for i := 0; i < 1000; i++ { + commonData[i] = []byte(fmt.Sprintf("common-%d", i)) + } + + bfA.AddBatch(chunk1) // evens + bfA.AddBatch(commonData) + + bfB.AddBatch(chunk2) // odds + bfB.AddBatch(commonData) + + bfA.Intersection(bfB) + + // Should contain common + if !bfA.Contains([]byte("common-0")) { + t.Error("Intersection should keep common elements") + } + + // Should NOT contain disjoint (probabilistically) + // "num-0" was in A but not B. + if bfA.Contains([]byte("num-0")) && bfA.Contains([]byte("num-2")) && bfA.Contains([]byte("num-4")) { + // It's possible for false positives, but unlikely all 3 survive if intersection worked + t.Log("Warning: Intersection might have failed to remove disjoint elements (or false positives)") + } +} + +func BenchmarkAddBatch(b *testing.B) { + data := make([][]byte, 10000) + for i := 0; i < 10000; i++ { + data[i] = []byte(fmt.Sprintf("bench-%d", rand.Int())) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + bf := NewCacheOptimizedBloomFilter(1000000, 0.01) + b.StartTimer() + bf.AddBatch(data) + } +} + +func BenchmarkSequentialAdd(b *testing.B) { + data := make([][]byte, 10000) + for i := 0; i < 10000; i++ { + data[i] = []byte(fmt.Sprintf("bench-%d", rand.Int())) + } + b.ResetTimer() + for i := 0; i < b.N; i++ { + b.StopTimer() + bf := NewCacheOptimizedBloomFilter(1000000, 0.01) + b.StartTimer() + for _, item := range data { + bf.Add(item) + } + } +}