diff --git a/docs/dedup-testing/after_cpu_usage.png b/docs/dedup-testing/after_cpu_usage.png new file mode 100644 index 000000000..24b721213 Binary files /dev/null and b/docs/dedup-testing/after_cpu_usage.png differ diff --git a/docs/dedup-testing/after_memory_usage.png b/docs/dedup-testing/after_memory_usage.png new file mode 100644 index 000000000..f1d00a18a Binary files /dev/null and b/docs/dedup-testing/after_memory_usage.png differ diff --git a/docs/dedup-testing/before_cpu_usage.png b/docs/dedup-testing/before_cpu_usage.png new file mode 100644 index 000000000..93b61f39c Binary files /dev/null and b/docs/dedup-testing/before_cpu_usage.png differ diff --git a/docs/dedup-testing/before_memory_usage.png b/docs/dedup-testing/before_memory_usage.png new file mode 100644 index 000000000..e6afdef5b Binary files /dev/null and b/docs/dedup-testing/before_memory_usage.png differ diff --git a/docs/dedup-testing/benchmark-results.md b/docs/dedup-testing/benchmark-results.md new file mode 100644 index 000000000..1f508bd38 --- /dev/null +++ b/docs/dedup-testing/benchmark-results.md @@ -0,0 +1,91 @@ +# eBPF Event Dedup Benchmark Results + +Benchmark comparing `node-agent:v0.3.71` (baseline) vs the local build with the dedup cache (`feature/ebpf-event-dedup`). + +## Setup + +- **Cluster**: kind (2 nodes: control-plane + worker) +- **Prometheus**: kube-prometheus-stack with 10s scrape interval +- **Kubescape**: kubescape-operator chart with runtimeDetection + runtimeObservability enabled +- **Load simulator**: DaemonSet generating events at configurable rates +- **Duration**: 2 min warmup + 10 min load per run +- **CPU rate window**: `rate(...[1m])` for responsive measurement + +### Load Simulator Config + +| Parameter | Value | +|-----------|-------| +| openRate | 1000/sec | +| httpRate | 100/sec | +| execRate | 10/sec | +| networkRate | 10/sec | +| dnsRate | 2/sec | +| hardlinkRate | 10/sec | +| symlinkRate | 10/sec | +| cpuLoadMs | 500 | +| numberParallelCPUs | 2 | + +## Resource Usage + +| Metric | BEFORE (v0.3.71) | AFTER (dedup) | Delta | +|--------|-------------------|---------------|-------| +| Avg CPU (cores) | 0.178 | 0.150 | **-15.9%** | +| Peak CPU (cores) | 0.220 | 0.156 | **-29.1%** | +| Avg Memory (MiB) | 339.5 | 335.9 | -1.1% | +| Peak Memory (MiB) | 345.5 | 338.4 | -2.1% | + +### CPU Usage + +| BEFORE (v0.3.71) | AFTER (dedup) | +|---|---| +| ![before cpu](before_cpu_usage.png) | ![after cpu](after_cpu_usage.png) | + +### Memory Usage + +| BEFORE (v0.3.71) | AFTER (dedup) | +|---|---| +| ![before memory](before_memory_usage.png) | ![after memory](after_memory_usage.png) | + +## Dedup Effectiveness + +Events processed by the dedup cache during the AFTER run: + +| Event Type | Passed | Deduped | Dedup Ratio | +|------------|--------|---------|-------------| +| http | 1,701 | 119,453 | **98.6%** | +| network | 900 | 77,968 | **98.9%** | +| open | 59,569 | 626,133 | **91.3%** | +| syscall | 998 | 1,967 | **66.3%** | +| dns | 1,197 | 0 | 0.0% | +| hardlink | 6,000 | 0 | 0.0% | +| symlink | 6,000 | 0 | 0.0% | + +## Event Counters (cumulative, both runs) + +| Metric | BEFORE | AFTER | +|--------|--------|-------| +| open_counter | 801,868 | 816,637 | +| network_counter | 92,197 | 93,735 | +| exec_counter | 7,009 | 7,130 | +| syscall_counter | 3,628 | 3,735 | +| dns_counter | 1,401 | 1,422 | +| capability_counter | 9 | 9 | + +Event counters are consistent between runs, confirming the load simulator produced comparable workloads. + +## Analysis + +- The dedup cache reduces **avg CPU by ~16%** and **peak CPU by ~29%** under sustained load (~1,100 events/sec). +- Memory impact is negligible (~1%) since the dedup cache uses a fixed-size, lock-free array (2 MiB for 2^18 slots at 8 bytes each). +- High-frequency event types benefit most: **network (98.9%)**, **http (98.6%)**, and **open (91.3%)** dedup ratios. +- Events with unique keys per occurrence (dns, hardlink, symlink) show 0% dedup, which is expected. +- The CPU savings come from skipping CEL rule evaluation on deduplicated events. The eBPF ingestion and event enrichment cost (which dominates baseline CPU) is unchanged. + +## Reproducing + +```bash +cd perfornamce +./dedup-bench.sh quay.io/kubescape/node-agent:v0.3.71 quay.io/kubescape/node-agent:test +``` + +Requires: kind, helm, kubectl, docker, python3. Estimated runtime: ~35 minutes. diff --git a/go.mod b/go.mod index 572f65d3a..04260d90b 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/cenkalti/backoff v2.2.1+incompatible github.com/cenkalti/backoff/v4 v4.3.0 github.com/cenkalti/backoff/v5 v5.0.3 + github.com/cespare/xxhash/v2 v2.3.0 github.com/cilium/ebpf v0.20.0 github.com/crewjam/rfc5424 v0.1.0 github.com/cyphar/filepath-securejoin v0.6.0 @@ -56,6 +57,7 @@ require ( golang.org/x/sys v0.42.0 gonum.org/v1/plot v0.14.0 google.golang.org/grpc v1.79.3 + google.golang.org/protobuf v1.36.11 gopkg.in/mcuadros/go-syslog.v2 v2.3.0 istio.io/pkg v0.0.0-20231221211216-7635388a563e k8s.io/api v0.35.0 @@ -155,7 +157,6 @@ require ( github.com/bodgit/windows v1.0.1 // indirect github.com/briandowns/spinner v1.23.2 // indirect github.com/campoy/embedmd v1.0.0 // indirect - github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/charmbracelet/colorprofile v0.3.1 // indirect github.com/charmbracelet/lipgloss v1.1.0 // indirect github.com/charmbracelet/x/ansi v0.9.3 // indirect @@ -433,7 +434,6 @@ require ( google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect - google.golang.org/protobuf v1.36.11 // indirect gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/warnings.v0 v0.1.2 // indirect diff --git a/pkg/config/config.go b/pkg/config/config.go index 3af844815..2cfd1b46b 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -25,6 +25,12 @@ const NodeNameEnvVar = "NODE_NAME" const PodNameEnvVar = "POD_NAME" const NamespaceEnvVar = "NAMESPACE_NAME" +// EventDedupConfig controls eBPF event deduplication before CEL rule evaluation. +type EventDedupConfig struct { + Enabled bool `mapstructure:"enabled"` + SlotsExponent uint8 `mapstructure:"slotsExponent"` +} + type Config struct { BlockEvents bool `mapstructure:"blockEvents"` CelConfigCache cache.FunctionCacheConfig `mapstructure:"celConfigCache"` @@ -70,6 +76,7 @@ type Config struct { StandaloneMonitoringEnabled bool `mapstructure:"standaloneMonitoringEnabled"` SeccompProfileBackend string `mapstructure:"seccompProfileBackend"` EventBatchSize int `mapstructure:"eventBatchSize"` + EventDedup EventDedupConfig `mapstructure:"eventDedup"` ExcludeJsonPaths []string `mapstructure:"excludeJsonPaths"` ExcludeLabels map[string][]string `mapstructure:"excludeLabels"` ExcludeNamespaces []string `mapstructure:"excludeNamespaces"` @@ -183,6 +190,8 @@ func LoadConfig(path string) (Config, error) { viper.SetDefault("celConfigCache::ttl", 1*time.Minute) viper.SetDefault("ignoreRuleBindings", false) + viper.SetDefault("eventDedup::enabled", true) + viper.SetDefault("eventDedup::slotsExponent", 18) viper.SetDefault("dnsCacheSize", 50000) viper.SetDefault("seccompProfileBackend", "storage") // "storage" or "crd" viper.SetDefault("containerEolNotificationBuffer", 100) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 8acb04cf8..acb00bd7e 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -80,6 +80,7 @@ func TestLoadConfig(t *testing.T) { }, WorkerPoolSize: 3000, EventBatchSize: 15000, + EventDedup: EventDedupConfig{Enabled: true, SlotsExponent: 18}, WorkerChannelSize: 750000, BlockEvents: false, ProcfsScanInterval: 30 * time.Second, diff --git a/pkg/containerwatcher/v2/container_watcher.go b/pkg/containerwatcher/v2/container_watcher.go index 841786152..627aa7770 100644 --- a/pkg/containerwatcher/v2/container_watcher.go +++ b/pkg/containerwatcher/v2/container_watcher.go @@ -19,6 +19,7 @@ import ( "github.com/kubescape/node-agent/pkg/config" "github.com/kubescape/node-agent/pkg/containerprofilemanager" "github.com/kubescape/node-agent/pkg/containerwatcher" + "github.com/kubescape/node-agent/pkg/dedupcache" "github.com/kubescape/node-agent/pkg/containerwatcher/v2/tracers" "github.com/kubescape/node-agent/pkg/dnsmanager" "github.com/kubescape/node-agent/pkg/ebpf/events" @@ -137,6 +138,12 @@ func CreateContainerWatcher( rulePolicyReporter := rulepolicy.NewRulePolicyReporter(ruleManager, containerProfileManager) + // Create dedup cache if enabled + var dedupCache *dedupcache.DedupCache + if cfg.EventDedup.Enabled { + dedupCache = dedupcache.NewDedupCache(cfg.EventDedup.SlotsExponent) + } + // Create event handler factory eventHandlerFactory := NewEventHandlerFactory( cfg, @@ -150,6 +157,7 @@ func CreateContainerWatcher( thirdPartyTracers.ThirdPartyEventReceivers, thirdPartyEnricher, rulePolicyReporter, + dedupCache, ) // Create event enricher @@ -462,6 +470,7 @@ func (cw *ContainerWatcher) processQueueBatch() { func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) { enrichedEvent := cw.eventEnricher.EnrichEvents(entry) + enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000)) select { case cw.workerChan <- enrichedEvent: diff --git a/pkg/containerwatcher/v2/event_handler_factory.go b/pkg/containerwatcher/v2/event_handler_factory.go index 668a91881..a422a625f 100644 --- a/pkg/containerwatcher/v2/event_handler_factory.go +++ b/pkg/containerwatcher/v2/event_handler_factory.go @@ -7,6 +7,7 @@ import ( "github.com/kubescape/node-agent/pkg/config" "github.com/kubescape/node-agent/pkg/containerprofilemanager" "github.com/kubescape/node-agent/pkg/containerwatcher" + "github.com/kubescape/node-agent/pkg/dedupcache" "github.com/kubescape/node-agent/pkg/dnsmanager" "github.com/kubescape/node-agent/pkg/ebpf/events" "github.com/kubescape/node-agent/pkg/eventreporters/rulepolicy" @@ -41,6 +42,20 @@ func (ma *ManagerAdapter) ReportEvent(eventType utils.EventType, event utils.K8s ma.reportEventFunc(eventType, event) } +// TTL constants for dedup windows in 64ms buckets. +const ( + dedupTTLOpen uint16 = 156 // 10s + dedupTTLNetwork uint16 = 78 // 5s + dedupTTLDNS uint16 = 156 // 10s + dedupTTLCapabilities uint16 = 156 // 10s + dedupTTLHTTP uint16 = 31 // 2s + dedupTTLSSH uint16 = 156 // 10s + dedupTTLSymlink uint16 = 156 // 10s + dedupTTLHardlink uint16 = 156 // 10s + dedupTTLPtrace uint16 = 156 // 10s + dedupTTLSyscall uint16 = 78 // 5s +) + // EventHandlerFactory manages the mapping of event types to their managers type EventHandlerFactory struct { handlers map[utils.EventType][]Manager @@ -49,6 +64,9 @@ type EventHandlerFactory struct { cfg config.Config containerCollection *containercollection.ContainerCollection containerCache *maps.SafeMap[string, *containercollection.Container] // Cache for container lookups + dedupCache *dedupcache.DedupCache + metrics metricsmanager.MetricsManager + dedupSkipSet map[Manager]struct{} // Managers to skip when event is duplicate } // NewEventHandlerFactory creates a new event handler factory @@ -64,6 +82,7 @@ func NewEventHandlerFactory( thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.GenericEventReceiver]], thirdPartyEnricher containerwatcher.TaskBasedEnricher, rulePolicyReporter *rulepolicy.RulePolicyReporter, + dedupCache *dedupcache.DedupCache, ) *EventHandlerFactory { factory := &EventHandlerFactory{ handlers: make(map[utils.EventType][]Manager), @@ -72,6 +91,9 @@ func NewEventHandlerFactory( cfg: cfg, containerCollection: containerCollection, containerCache: &maps.SafeMap[string, *containercollection.Container]{}, + dedupCache: dedupCache, + metrics: metrics, + dedupSkipSet: make(map[Manager]struct{}), } // Create adapters for managers that don't implement the Manager interface directly @@ -168,9 +190,108 @@ func NewEventHandlerFactory( rulePolicyAdapter, ) + // Populate dedupSkipSet: managers that skip processing when event is duplicate. + // RuleManager checks enrichedEvent.Duplicate internally. + factory.dedupSkipSet[containerProfileAdapter] = struct{}{} + factory.dedupSkipSet[malwareManager] = struct{}{} + return factory } +// computeEventDedupKey computes a dedup key and TTL for the given event. +// Returns shouldDedup=false for event types that must not be deduplicated. +func computeEventDedupKey(enrichedEvent *events.EnrichedEvent) (key uint64, ttl uint16, shouldDedup bool) { + event := enrichedEvent.Event + mntns := enrichedEvent.MountNamespaceID + if mntns == 0 { + if ee, ok := event.(utils.EnrichEvent); ok { + mntns = ee.GetMountNsID() + } + } + + switch event.GetEventType() { + case utils.OpenEventType: + if e, ok := event.(utils.OpenEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeOpenKey(mntns, pid, e.GetPath(), e.GetFlagsRaw()), dedupTTLOpen, true + } + case utils.NetworkEventType: + if e, ok := event.(utils.NetworkEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + dst := e.GetDstEndpoint() + return dedupcache.ComputeNetworkKey(mntns, pid, dst.Addr, e.GetDstPort(), e.GetProto()), dedupTTLNetwork, true + } + case utils.DnsEventType: + if e, ok := event.(utils.DNSEvent); ok { + return dedupcache.ComputeDNSKey(mntns, e.GetDNSName()), dedupTTLDNS, true + } + case utils.CapabilitiesEventType: + if e, ok := event.(utils.CapabilitiesEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeCapabilitiesKey(mntns, pid, e.GetCapability(), e.GetSyscall()), dedupTTLCapabilities, true + } + case utils.HTTPEventType: + if e, ok := event.(utils.HttpEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + req := e.GetRequest() + if req == nil { + return 0, 0, false + } + return dedupcache.ComputeHTTPKey(mntns, pid, string(e.GetDirection()), req.Method, req.Host, req.URL.Path, req.URL.RawQuery), dedupTTLHTTP, true + } + case utils.SSHEventType: + if e, ok := event.(utils.SshEvent); ok { + return dedupcache.ComputeSSHKey(mntns, e.GetDstIP(), e.GetDstPort()), dedupTTLSSH, true + } + case utils.SymlinkEventType: + if e, ok := event.(utils.LinkEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeSymlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLSymlink, true + } + case utils.HardlinkEventType: + if e, ok := event.(utils.LinkEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeHardlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLHardlink, true + } + case utils.PtraceEventType: + if e, ok := event.(utils.PtraceEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputePtraceKey(mntns, pid, e.GetExePath()), dedupTTLPtrace, true + } + case utils.SyscallEventType: + if e, ok := event.(utils.SyscallEvent); ok { + pid := uint32(0) + if ee, ok := event.(utils.EnrichEvent); ok { + pid = ee.GetPID() + } + return dedupcache.ComputeSyscallKey(mntns, pid, e.GetSyscall()), dedupTTLSyscall, true + } + } + // exec, exit, fork, randomx, kmod, bpf, unshare, iouring — no dedup + return 0, 0, false +} + // ProcessEvent processes an event through all registered handlers func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent) { if enrichedEvent.ContainerID == "" { @@ -187,6 +308,18 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent return } + // Dedup check: compute key and check cache before dispatching to handlers + if ehf.dedupCache != nil { + key, ttl, shouldDedup := computeEventDedupKey(enrichedEvent) + if shouldDedup { + duplicate := ehf.dedupCache.CheckAndSet(key, ttl, enrichedEvent.DedupBucket) + if duplicate { + enrichedEvent.Duplicate = true + } + ehf.metrics.ReportDedupEvent(enrichedEvent.Event.GetEventType(), duplicate) + } + } + // Get handlers for this event type eventType := enrichedEvent.Event.GetEventType() handlers, exists := ehf.handlers[eventType] @@ -196,6 +329,11 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent // Process event through each handler for _, handler := range handlers { + if enrichedEvent.Duplicate { + if _, skip := ehf.dedupSkipSet[handler]; skip { + continue + } + } if enrichedHandler, ok := handler.(containerwatcher.EnrichedEventReceiver); ok { enrichedHandler.ReportEnrichedEvent(enrichedEvent) } else if handler, ok := handler.(containerwatcher.EventReceiver); ok { diff --git a/pkg/dedupcache/dedup_cache.go b/pkg/dedupcache/dedup_cache.go new file mode 100644 index 000000000..6d400d7e4 --- /dev/null +++ b/pkg/dedupcache/dedup_cache.go @@ -0,0 +1,47 @@ +package dedupcache + +import "sync/atomic" + +// DedupCache is a lock-free, fixed-size deduplication cache. +// Each slot packs a 48-bit key and 16-bit expiry bucket into a single atomic uint64. +// Concurrent access from thousands of goroutines is safe without mutexes — +// benign races only cause missed dedup (safe direction), never false dedup. +type DedupCache struct { + slots []atomic.Uint64 + mask uint64 +} + +// NewDedupCache creates a cache with 2^slotsExponent slots. +// Each slot is 8 bytes; e.g. exponent 18 = 262,144 slots = 2 MB. +func NewDedupCache(slotsExponent uint8) *DedupCache { + size := uint64(1) << slotsExponent + return &DedupCache{ + slots: make([]atomic.Uint64, size), + mask: size - 1, + } +} + +// pack stores the upper 48 bits of key and 16-bit expiry bucket in one uint64. +func pack(key uint64, expiryBucket uint16) uint64 { + return (key & 0xFFFFFFFFFFFF0000) | uint64(expiryBucket) +} + +// unpack extracts the 48-bit key portion and 16-bit expiry bucket. +func unpack(packed uint64) (keyBits uint64, expiryBucket uint16) { + return packed & 0xFFFFFFFFFFFF0000, uint16(packed) +} + +// CheckAndSet returns true if the key is already present and not expired (duplicate). +// Otherwise it inserts the key with expiry = currentBucket + ttlBuckets and returns false. +func (c *DedupCache) CheckAndSet(key uint64, ttlBuckets uint16, currentBucket uint16) bool { + idx := key & c.mask + + stored := c.slots[idx].Load() + storedKey, storedExpiry := unpack(stored) + if storedKey == (key & 0xFFFFFFFFFFFF0000) && storedExpiry > currentBucket { + return true // duplicate + } + + c.slots[idx].Store(pack(key, currentBucket+ttlBuckets)) + return false +} diff --git a/pkg/dedupcache/dedup_cache_test.go b/pkg/dedupcache/dedup_cache_test.go new file mode 100644 index 000000000..f754af1f0 --- /dev/null +++ b/pkg/dedupcache/dedup_cache_test.go @@ -0,0 +1,133 @@ +package dedupcache + +import ( + "sync" + "testing" +) + +func TestCheckAndSet_BasicInsertAndLookup(t *testing.T) { + c := NewDedupCache(10) // 1024 slots + + key := uint64(0xDEADBEEF12340000) + ttl := uint16(156) // ~10s in 64ms buckets + now := uint16(1000) + + // First call: not a duplicate + if c.CheckAndSet(key, ttl, now) { + t.Fatal("expected false on first insert") + } + + // Second call: duplicate + if !c.CheckAndSet(key, ttl, now) { + t.Fatal("expected true on second lookup") + } +} + +func TestCheckAndSet_TTLExpiry(t *testing.T) { + c := NewDedupCache(10) + + key := uint64(0xABCDABCD00000000) + ttl := uint16(10) // expires at bucket 1010 + now := uint16(1000) + + c.CheckAndSet(key, ttl, now) + + // Still within TTL (bucket 1009 < expiry 1010) + if !c.CheckAndSet(key, ttl, uint16(1009)) { + t.Fatal("expected duplicate within TTL") + } + + // Exactly at expiry boundary (1010 is NOT > 1010, so expired) + if c.CheckAndSet(key, ttl, uint16(1010)) { + t.Fatal("expected not duplicate at expiry boundary") + } + + // After expiry + if c.CheckAndSet(key, ttl, uint16(1100)) { + t.Fatal("expected not duplicate after expiry") + } +} + +func TestCheckAndSet_SlotCollision(t *testing.T) { + c := NewDedupCache(10) // mask = 1023 + + // Two different keys that map to the same slot but have different upper 48 bits + key1 := uint64(0xAAAA000000000100) // slot = 0x100 & 0x3FF = 256 + key2 := uint64(0xBBBB000000000100) // slot = 0x100 & 0x3FF = 256, different upper bits + + ttl := uint16(156) + now := uint16(1000) + + c.CheckAndSet(key1, ttl, now) + + // key2 overwrites key1's slot — not a duplicate + if c.CheckAndSet(key2, ttl, now) { + t.Fatal("expected false for different key in same slot") + } + + // key1 is now evicted — not found + if c.CheckAndSet(key1, ttl, now) { + t.Fatal("expected false for evicted key") + } +} + +func TestCheckAndSet_PackUnpack(t *testing.T) { + key := uint64(0xDEADBEEFCAFE0000) + expiry := uint16(42) + + packed := pack(key, expiry) + gotKey, gotExpiry := unpack(packed) + + if gotKey != (key & 0xFFFFFFFFFFFF0000) { + t.Fatalf("key mismatch: got %x, want %x", gotKey, key&0xFFFFFFFFFFFF0000) + } + if gotExpiry != expiry { + t.Fatalf("expiry mismatch: got %d, want %d", gotExpiry, expiry) + } +} + +func TestCheckAndSet_ConcurrentHammer(t *testing.T) { + c := NewDedupCache(14) // 16384 slots + + const goroutines = 100 + const opsPerGoroutine = 10000 + + var wg sync.WaitGroup + wg.Add(goroutines) + + for g := 0; g < goroutines; g++ { + go func(id int) { + defer wg.Done() + for i := 0; i < opsPerGoroutine; i++ { + key := uint64(id*opsPerGoroutine+i) << 16 + c.CheckAndSet(key, 156, uint16(1000)) + } + }(g) + } + + wg.Wait() + // No panics or data races = success (run with -race) +} + +func BenchmarkCheckAndSet(b *testing.B) { + c := NewDedupCache(18) // production size + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + key := uint64(i) << 16 + c.CheckAndSet(key, 156, uint16(1000)) + } +} + +func BenchmarkCheckAndSet_Hit(b *testing.B) { + c := NewDedupCache(18) + key := uint64(0xDEADBEEF00000000) + c.CheckAndSet(key, 156, uint16(1000)) + + b.ResetTimer() + b.ReportAllocs() + for i := 0; i < b.N; i++ { + c.CheckAndSet(key, 156, uint16(1000)) + } +} diff --git a/pkg/dedupcache/keys.go b/pkg/dedupcache/keys.go new file mode 100644 index 000000000..812908fba --- /dev/null +++ b/pkg/dedupcache/keys.go @@ -0,0 +1,128 @@ +package dedupcache + +import ( + "encoding/binary" + + "github.com/cespare/xxhash/v2" +) + +// Reusable byte buffers for writing integers into the hash. +// These are stack-allocated per call via the fixed-size array trick. + +func writeUint64(h *xxhash.Digest, v uint64) { + var buf [8]byte + binary.LittleEndian.PutUint64(buf[:], v) + h.Write(buf[:]) +} + +func writeUint32(h *xxhash.Digest, v uint32) { + var buf [4]byte + binary.LittleEndian.PutUint32(buf[:], v) + h.Write(buf[:]) +} + +func writeUint16(h *xxhash.Digest, v uint16) { + var buf [2]byte + binary.LittleEndian.PutUint16(buf[:], v) + h.Write(buf[:]) +} + +// ComputeOpenKey computes a dedup key for open events. +func ComputeOpenKey(mntns uint64, pid uint32, path string, flagsRaw uint32) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(path) + writeUint32(h, flagsRaw) + return h.Sum64() +} + +// ComputeNetworkKey computes a dedup key for network events. +func ComputeNetworkKey(mntns uint64, pid uint32, dstAddr string, dstPort uint16, proto string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(dstAddr) + writeUint16(h, dstPort) + h.WriteString(proto) + return h.Sum64() +} + +// ComputeDNSKey computes a dedup key for DNS events. +// No qtype getter exists in the interface, so key is mntns + dnsName. +func ComputeDNSKey(mntns uint64, dnsName string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + h.WriteString(dnsName) + return h.Sum64() +} + +// ComputeCapabilitiesKey computes a dedup key for capabilities events. +func ComputeCapabilitiesKey(mntns uint64, pid uint32, capability string, syscall string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(capability) + h.WriteString(syscall) + return h.Sum64() +} + +// ComputeHTTPKey computes a dedup key for HTTP events. +func ComputeHTTPKey(mntns uint64, pid uint32, direction string, method string, host string, path string, rawQuery string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(direction) + h.WriteString(method) + h.WriteString(host) + h.WriteString(path) + h.WriteString(rawQuery) + return h.Sum64() +} + +// ComputeSSHKey computes a dedup key for SSH events. +func ComputeSSHKey(mntns uint64, dstIP string, dstPort uint16) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + h.WriteString(dstIP) + writeUint16(h, dstPort) + return h.Sum64() +} + +// ComputeSymlinkKey computes a dedup key for symlink events. +func ComputeSymlinkKey(mntns uint64, pid uint32, oldPath string, newPath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(oldPath) + h.WriteString(newPath) + return h.Sum64() +} + +// ComputeHardlinkKey computes a dedup key for hardlink events. +func ComputeHardlinkKey(mntns uint64, pid uint32, oldPath string, newPath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(oldPath) + h.WriteString(newPath) + return h.Sum64() +} + +// ComputePtraceKey computes a dedup key for ptrace events. +func ComputePtraceKey(mntns uint64, pid uint32, exePath string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(exePath) + return h.Sum64() +} + +// ComputeSyscallKey computes a dedup key for syscall events. +func ComputeSyscallKey(mntns uint64, pid uint32, syscall string) uint64 { + h := xxhash.New() + writeUint64(h, mntns) + writeUint32(h, pid) + h.WriteString(syscall) + return h.Sum64() +} diff --git a/pkg/dedupcache/keys_test.go b/pkg/dedupcache/keys_test.go new file mode 100644 index 000000000..76be557ea --- /dev/null +++ b/pkg/dedupcache/keys_test.go @@ -0,0 +1,137 @@ +package dedupcache + +import "testing" + +func TestComputeOpenKey_Deterministic(t *testing.T) { + k1 := ComputeOpenKey(123456, 42, "/etc/passwd", 0x02) + k2 := ComputeOpenKey(123456, 42, "/etc/passwd", 0x02) + if k1 != k2 { + t.Fatalf("non-deterministic: %x != %x", k1, k2) + } +} + +func TestComputeOpenKey_DifferentInputs(t *testing.T) { + k1 := ComputeOpenKey(123456, 42, "/etc/passwd", 0x02) + k2 := ComputeOpenKey(123456, 42, "/etc/shadow", 0x02) + k3 := ComputeOpenKey(123456, 43, "/etc/passwd", 0x02) + k4 := ComputeOpenKey(789012, 42, "/etc/passwd", 0x02) + k5 := ComputeOpenKey(123456, 42, "/etc/passwd", 0x04) + + keys := []uint64{k1, k2, k3, k4, k5} + for i := 0; i < len(keys); i++ { + for j := i + 1; j < len(keys); j++ { + if keys[i] == keys[j] { + t.Fatalf("collision between key[%d]=%x and key[%d]=%x", i, keys[i], j, keys[j]) + } + } + } +} + +func TestComputeNetworkKey_Deterministic(t *testing.T) { + k1 := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp") + k2 := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp") + if k1 != k2 { + t.Fatalf("non-deterministic: %x != %x", k1, k2) + } +} + +func TestComputeNetworkKey_DifferentInputs(t *testing.T) { + k1 := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp") + k2 := ComputeNetworkKey(100, 1, "10.0.0.2", 80, "tcp") + k3 := ComputeNetworkKey(100, 1, "10.0.0.1", 443, "tcp") + k4 := ComputeNetworkKey(100, 1, "10.0.0.1", 80, "udp") + if k1 == k2 || k1 == k3 || k1 == k4 { + t.Fatal("unexpected collision") + } +} + +func TestComputeDNSKey_Deterministic(t *testing.T) { + k1 := ComputeDNSKey(100, "example.com") + k2 := ComputeDNSKey(100, "example.com") + if k1 != k2 { + t.Fatalf("non-deterministic: %x != %x", k1, k2) + } +} + +func TestComputeDNSKey_DifferentInputs(t *testing.T) { + k1 := ComputeDNSKey(100, "example.com") + k2 := ComputeDNSKey(100, "other.com") + k3 := ComputeDNSKey(200, "example.com") + if k1 == k2 || k1 == k3 { + t.Fatal("unexpected collision") + } +} + +func TestComputeHTTPKey_IncludesRawQuery(t *testing.T) { + k1 := ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api", "page=1") + k2 := ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api", "page=1' OR 1=1--") + if k1 == k2 { + t.Fatal("different query strings must produce different keys") + } +} + +func TestComputeSSHKey_Deterministic(t *testing.T) { + k1 := ComputeSSHKey(100, "192.168.1.1", 22) + k2 := ComputeSSHKey(100, "192.168.1.1", 22) + if k1 != k2 { + t.Fatalf("non-deterministic: %x != %x", k1, k2) + } +} + +func TestComputeCapabilitiesKey_Deterministic(t *testing.T) { + k1 := ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket") + k2 := ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket") + if k1 != k2 { + t.Fatalf("non-deterministic: %x != %x", k1, k2) + } +} + +func TestComputeSyscallKey_DifferentInputs(t *testing.T) { + k1 := ComputeSyscallKey(100, 1, "read") + k2 := ComputeSyscallKey(100, 1, "write") + if k1 == k2 { + t.Fatal("unexpected collision") + } +} + +func BenchmarkComputeOpenKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeOpenKey(123456, 42, "/etc/passwd", 0x02) + } +} + +func BenchmarkComputeNetworkKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeNetworkKey(100, 1, "10.0.0.1", 80, "tcp") + } +} + +func BenchmarkComputeDNSKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeDNSKey(100, "example.com") + } +} + +func BenchmarkComputeHTTPKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeHTTPKey(100, 1, "outbound", "GET", "example.com", "/api/v1/users", "page=1&limit=50") + } +} + +func BenchmarkComputeCapabilitiesKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeCapabilitiesKey(100, 1, "CAP_NET_RAW", "socket") + } +} + +func BenchmarkComputeSyscallKey(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + ComputeSyscallKey(100, 1, "read") + } +} diff --git a/pkg/ebpf/events/enriched_event.go b/pkg/ebpf/events/enriched_event.go index c158e3be5..54eec8da2 100644 --- a/pkg/ebpf/events/enriched_event.go +++ b/pkg/ebpf/events/enriched_event.go @@ -33,4 +33,9 @@ type EnrichedEvent struct { // This uniquely identifies the container/host and is used for context lookup. // May be 0 if unavailable. MountNamespaceID uint64 + // Duplicate is set by the dedup check in EventHandlerFactory.ProcessEvent(). + // Consumers that respect this flag skip processing for deduplicated events. + Duplicate bool + // DedupBucket is the current time expressed in 64ms buckets, cached per batch tick. + DedupBucket uint16 } diff --git a/pkg/metricsmanager/metrics_manager_interface.go b/pkg/metricsmanager/metrics_manager_interface.go index cce38824d..503aa904d 100644 --- a/pkg/metricsmanager/metrics_manager_interface.go +++ b/pkg/metricsmanager/metrics_manager_interface.go @@ -18,4 +18,5 @@ type MetricsManager interface { //ReportEbpfStats(stats *top.Event[toptypes.Stats]) ReportContainerStart() ReportContainerStop() + ReportDedupEvent(eventType utils.EventType, duplicate bool) } diff --git a/pkg/metricsmanager/metrics_manager_mock.go b/pkg/metricsmanager/metrics_manager_mock.go index 1d8f24e66..f58d378ac 100644 --- a/pkg/metricsmanager/metrics_manager_mock.go +++ b/pkg/metricsmanager/metrics_manager_mock.go @@ -63,3 +63,5 @@ func (m *MetricsMock) ReportRuleEvaluationTime(ruleID string, eventType utils.Ev func (m *MetricsMock) ReportContainerStart() {} func (m *MetricsMock) ReportContainerStop() {} + +func (m *MetricsMock) ReportDedupEvent(eventType utils.EventType, duplicate bool) {} diff --git a/pkg/metricsmanager/prometheus/prometheus.go b/pkg/metricsmanager/prometheus/prometheus.go index 77f2eb22f..97dcff53e 100644 --- a/pkg/metricsmanager/prometheus/prometheus.go +++ b/pkg/metricsmanager/prometheus/prometheus.go @@ -59,6 +59,9 @@ type PrometheusMetric struct { containerStartCounter prometheus.Counter containerStopCounter prometheus.Counter + // Dedup metrics + dedupEventCounter *prometheus.CounterVec + // Cache to avoid allocating Labels maps on every call ruleCounterCache map[string]prometheus.Counter alertCounterCache map[string]prometheus.Counter @@ -200,6 +203,12 @@ func NewPrometheusMetric() *PrometheusMetric { Help: "The total number of container stop events", }), + // Dedup metrics + dedupEventCounter: promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "node_agent_dedup_events_total", + Help: "Total number of events processed by the dedup layer", + }, []string{eventTypeLabel, "result"}), + // Initialize counter caches ruleCounterCache: make(map[string]prometheus.Counter), alertCounterCache: make(map[string]prometheus.Counter), @@ -238,6 +247,7 @@ func (p *PrometheusMetric) Destroy() { prometheus.Unregister(p.ebpfBpfCounter) prometheus.Unregister(p.containerStartCounter) prometheus.Unregister(p.containerStopCounter) + prometheus.Unregister(p.dedupEventCounter) // Unregister program ID metrics prometheus.Unregister(p.programRuntimeGauge) prometheus.Unregister(p.programRunCountGauge) @@ -381,3 +391,11 @@ func (p *PrometheusMetric) ReportContainerStart() { func (p *PrometheusMetric) ReportContainerStop() { p.containerStopCounter.Inc() } + +func (p *PrometheusMetric) ReportDedupEvent(eventType utils.EventType, duplicate bool) { + result := "passed" + if duplicate { + result = "deduplicated" + } + p.dedupEventCounter.With(prometheus.Labels{eventTypeLabel: string(eventType), "result": result}).Inc() +} diff --git a/pkg/rulemanager/rule_manager.go b/pkg/rulemanager/rule_manager.go index f08cb8bfa..29d0061bb 100644 --- a/pkg/rulemanager/rule_manager.go +++ b/pkg/rulemanager/rule_manager.go @@ -146,6 +146,9 @@ func (rm *RuleManager) startRuleManager(container *containercollection.Container } func (rm *RuleManager) ReportEnrichedEvent(enrichedEvent *events.EnrichedEvent) { + if enrichedEvent.Duplicate { + return + } rm.enrichEventWithContext(enrichedEvent) var profileExists bool