-
Notifications
You must be signed in to change notification settings - Fork 12
feat: eBPF event deduplication before CEL rule evaluation #762
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,91 @@ | ||
| # eBPF Event Dedup Benchmark Results | ||
|
|
||
| Benchmark comparing `node-agent:v0.3.71` (baseline) vs the local build with the dedup cache (`feature/ebpf-event-dedup`). | ||
|
|
||
| ## Setup | ||
|
|
||
| - **Cluster**: kind (2 nodes: control-plane + worker) | ||
| - **Prometheus**: kube-prometheus-stack with 10s scrape interval | ||
| - **Kubescape**: kubescape-operator chart with runtimeDetection + runtimeObservability enabled | ||
| - **Load simulator**: DaemonSet generating events at configurable rates | ||
| - **Duration**: 2 min warmup + 10 min load per run | ||
| - **CPU rate window**: `rate(...[1m])` for responsive measurement | ||
|
|
||
| ### Load Simulator Config | ||
|
|
||
| | Parameter | Value | | ||
| |-----------|-------| | ||
| | openRate | 1000/sec | | ||
| | httpRate | 100/sec | | ||
| | execRate | 10/sec | | ||
| | networkRate | 10/sec | | ||
| | dnsRate | 2/sec | | ||
| | hardlinkRate | 10/sec | | ||
| | symlinkRate | 10/sec | | ||
| | cpuLoadMs | 500 | | ||
| | numberParallelCPUs | 2 | | ||
|
|
||
| ## Resource Usage | ||
|
|
||
| | Metric | BEFORE (v0.3.71) | AFTER (dedup) | Delta | | ||
| |--------|-------------------|---------------|-------| | ||
| | Avg CPU (cores) | 0.178 | 0.150 | **-15.9%** | | ||
| | Peak CPU (cores) | 0.220 | 0.156 | **-29.1%** | | ||
| | Avg Memory (MiB) | 339.5 | 335.9 | -1.1% | | ||
| | Peak Memory (MiB) | 345.5 | 338.4 | -2.1% | | ||
|
|
||
| ### CPU Usage | ||
|
|
||
| | BEFORE (v0.3.71) | AFTER (dedup) | | ||
| |---|---| | ||
| |  |  | | ||
|
|
||
| ### Memory Usage | ||
|
|
||
| | BEFORE (v0.3.71) | AFTER (dedup) | | ||
| |---|---| | ||
| |  |  | | ||
|
|
||
| ## Dedup Effectiveness | ||
|
|
||
| Events processed by the dedup cache during the AFTER run: | ||
|
|
||
| | Event Type | Passed | Deduped | Dedup Ratio | | ||
| |------------|--------|---------|-------------| | ||
| | http | 1,701 | 119,453 | **98.6%** | | ||
| | network | 900 | 77,968 | **98.9%** | | ||
| | open | 59,569 | 626,133 | **91.3%** | | ||
| | syscall | 998 | 1,967 | **66.3%** | | ||
| | dns | 1,197 | 0 | 0.0% | | ||
| | hardlink | 6,000 | 0 | 0.0% | | ||
| | symlink | 6,000 | 0 | 0.0% | | ||
|
|
||
| ## Event Counters (cumulative, both runs) | ||
|
|
||
| | Metric | BEFORE | AFTER | | ||
| |--------|--------|-------| | ||
| | open_counter | 801,868 | 816,637 | | ||
| | network_counter | 92,197 | 93,735 | | ||
| | exec_counter | 7,009 | 7,130 | | ||
| | syscall_counter | 3,628 | 3,735 | | ||
| | dns_counter | 1,401 | 1,422 | | ||
| | capability_counter | 9 | 9 | | ||
|
|
||
| Event counters are consistent between runs, confirming the load simulator produced comparable workloads. | ||
|
|
||
| ## Analysis | ||
|
|
||
| - The dedup cache reduces **avg CPU by ~16%** and **peak CPU by ~29%** under sustained load (~1,100 events/sec). | ||
| - Memory impact is negligible (~1%) since the dedup cache uses a fixed-size, lock-free array (2 MiB for 2^18 slots at 8 bytes each). | ||
| - High-frequency event types benefit most: **network (98.9%)**, **http (98.6%)**, and **open (91.3%)** dedup ratios. | ||
| - Events with unique keys per occurrence (dns, hardlink, symlink) show 0% dedup, which is expected. | ||
| - The CPU savings come from skipping CEL rule evaluation on deduplicated events. The eBPF ingestion and event enrichment cost (which dominates baseline CPU) is unchanged. | ||
|
|
||
| ## Reproducing | ||
|
|
||
| ```bash | ||
| cd perfornamce | ||
| ./dedup-bench.sh quay.io/kubescape/node-agent:v0.3.71 quay.io/kubescape/node-agent:test | ||
| ``` | ||
|
|
||
| Requires: kind, helm, kubectl, docker, python3. Estimated runtime: ~35 minutes. | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,12 @@ const NodeNameEnvVar = "NODE_NAME" | |
| const PodNameEnvVar = "POD_NAME" | ||
| const NamespaceEnvVar = "NAMESPACE_NAME" | ||
|
|
||
| // EventDedupConfig controls eBPF event deduplication before CEL rule evaluation. | ||
| type EventDedupConfig struct { | ||
| Enabled bool `mapstructure:"enabled"` | ||
| SlotsExponent uint8 `mapstructure:"slotsExponent"` | ||
| } | ||
|
Comment on lines
+28
to
+32
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧩 Analysis chain🏁 Script executed: #!/bin/bash
set -euo pipefail
rg -n -C3 'type EventDedupConfig|SlotsExponent|eventDedup::slotsExponent|NewDedupCache\(' pkg/config pkg/containerwatcher/v2 pkg/dedupcache
rg -n -C4 'func NewDedupCache|slotsExponent|1\s*<<|make\(|panic|return .*error' pkg/dedupcacheRepository: kubescape/node-agent Length of output: 6520 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Search for LoadConfig function and any validation around eventDedup
rg -n -A 30 'func LoadConfig' pkg/config/config.go | head -60
# Search for any bounds checking or max exponent constant
rg -n 'max.*exponent|exponent.*max|MAX.*EXPONENT|slotsExponent.*<|slotsExponent.*>|validate|bounds' pkg/ -i
# Check if eventDedup appears anywhere else with potential constraints
rg -n 'eventDedup' pkg/ -C 2Repository: kubescape/node-agent Length of output: 7611 🏁 Script executed: #!/bin/bash
set -euo pipefail
# Get complete LoadConfig function from config.go
sed -n '145,260p' pkg/config/config.go
# Also check if there's a Validate method on Config struct
rg -n 'func.*Config.*Validate|func \(c \*Config\)' pkg/config/ -A 5Repository: kubescape/node-agent Length of output: 6085 Add validation for The Also applies to: 79, 193–194 🤖 Prompt for AI Agents |
||
|
|
||
| type Config struct { | ||
| BlockEvents bool `mapstructure:"blockEvents"` | ||
| CelConfigCache cache.FunctionCacheConfig `mapstructure:"celConfigCache"` | ||
|
|
@@ -70,6 +76,7 @@ type Config struct { | |
| StandaloneMonitoringEnabled bool `mapstructure:"standaloneMonitoringEnabled"` | ||
| SeccompProfileBackend string `mapstructure:"seccompProfileBackend"` | ||
| EventBatchSize int `mapstructure:"eventBatchSize"` | ||
| EventDedup EventDedupConfig `mapstructure:"eventDedup"` | ||
| ExcludeJsonPaths []string `mapstructure:"excludeJsonPaths"` | ||
| ExcludeLabels map[string][]string `mapstructure:"excludeLabels"` | ||
| ExcludeNamespaces []string `mapstructure:"excludeNamespaces"` | ||
|
|
@@ -183,6 +190,8 @@ func LoadConfig(path string) (Config, error) { | |
| viper.SetDefault("celConfigCache::ttl", 1*time.Minute) | ||
| viper.SetDefault("ignoreRuleBindings", false) | ||
|
|
||
| viper.SetDefault("eventDedup::enabled", true) | ||
| viper.SetDefault("eventDedup::slotsExponent", 18) | ||
| viper.SetDefault("dnsCacheSize", 50000) | ||
| viper.SetDefault("seccompProfileBackend", "storage") // "storage" or "crd" | ||
| viper.SetDefault("containerEolNotificationBuffer", 100) | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ import ( | |
| "github.com/kubescape/node-agent/pkg/config" | ||
| "github.com/kubescape/node-agent/pkg/containerprofilemanager" | ||
| "github.com/kubescape/node-agent/pkg/containerwatcher" | ||
| "github.com/kubescape/node-agent/pkg/dedupcache" | ||
| "github.com/kubescape/node-agent/pkg/containerwatcher/v2/tracers" | ||
| "github.com/kubescape/node-agent/pkg/dnsmanager" | ||
| "github.com/kubescape/node-agent/pkg/ebpf/events" | ||
|
|
@@ -137,6 +138,12 @@ func CreateContainerWatcher( | |
|
|
||
| rulePolicyReporter := rulepolicy.NewRulePolicyReporter(ruleManager, containerProfileManager) | ||
|
|
||
| // Create dedup cache if enabled | ||
| var dedupCache *dedupcache.DedupCache | ||
| if cfg.EventDedup.Enabled { | ||
| dedupCache = dedupcache.NewDedupCache(cfg.EventDedup.SlotsExponent) | ||
| } | ||
|
|
||
| // Create event handler factory | ||
| eventHandlerFactory := NewEventHandlerFactory( | ||
| cfg, | ||
|
|
@@ -150,6 +157,7 @@ func CreateContainerWatcher( | |
| thirdPartyTracers.ThirdPartyEventReceivers, | ||
| thirdPartyEnricher, | ||
| rulePolicyReporter, | ||
| dedupCache, | ||
| ) | ||
|
|
||
| // Create event enricher | ||
|
|
@@ -462,6 +470,7 @@ func (cw *ContainerWatcher) processQueueBatch() { | |
|
|
||
| func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) { | ||
| enrichedEvent := cw.eventEnricher.EnrichEvents(entry) | ||
| enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000)) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep a stable dedup bucket for the whole queue batch. Line 473 recomputes the bucket per event even though 🧩 One way to keep the bucket stable func (cw *ContainerWatcher) processQueueBatch() {
batchSize := cw.cfg.EventBatchSize
processedCount := 0
+ dedupBucket := uint16(time.Now().UnixNano() / (64 * 1_000_000))
for !cw.orderedEventQueue.Empty() && processedCount < batchSize {
event, ok := cw.orderedEventQueue.PopEvent()
if !ok {
break
}
- cw.enrichAndProcess(event)
+ cw.enrichAndProcess(event, dedupBucket)
processedCount++
}
}
-func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) {
+func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry, dedupBucket uint16) {
enrichedEvent := cw.eventEnricher.EnrichEvents(entry)
- enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000))
+ enrichedEvent.DedupBucket = dedupBucket🤖 Prompt for AI Agents |
||
|
|
||
| select { | ||
| case cw.workerChan <- enrichedEvent: | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -7,6 +7,7 @@ import ( | |
| "github.com/kubescape/node-agent/pkg/config" | ||
| "github.com/kubescape/node-agent/pkg/containerprofilemanager" | ||
| "github.com/kubescape/node-agent/pkg/containerwatcher" | ||
| "github.com/kubescape/node-agent/pkg/dedupcache" | ||
| "github.com/kubescape/node-agent/pkg/dnsmanager" | ||
| "github.com/kubescape/node-agent/pkg/ebpf/events" | ||
| "github.com/kubescape/node-agent/pkg/eventreporters/rulepolicy" | ||
|
|
@@ -41,6 +42,20 @@ func (ma *ManagerAdapter) ReportEvent(eventType utils.EventType, event utils.K8s | |
| ma.reportEventFunc(eventType, event) | ||
| } | ||
|
|
||
| // TTL constants for dedup windows in 64ms buckets. | ||
| const ( | ||
| dedupTTLOpen uint16 = 156 // 10s | ||
| dedupTTLNetwork uint16 = 78 // 5s | ||
| dedupTTLDNS uint16 = 156 // 10s | ||
| dedupTTLCapabilities uint16 = 156 // 10s | ||
| dedupTTLHTTP uint16 = 31 // 2s | ||
| dedupTTLSSH uint16 = 156 // 10s | ||
| dedupTTLSymlink uint16 = 156 // 10s | ||
| dedupTTLHardlink uint16 = 156 // 10s | ||
| dedupTTLPtrace uint16 = 156 // 10s | ||
| dedupTTLSyscall uint16 = 78 // 5s | ||
| ) | ||
|
|
||
| // EventHandlerFactory manages the mapping of event types to their managers | ||
| type EventHandlerFactory struct { | ||
| handlers map[utils.EventType][]Manager | ||
|
|
@@ -49,6 +64,9 @@ type EventHandlerFactory struct { | |
| cfg config.Config | ||
| containerCollection *containercollection.ContainerCollection | ||
| containerCache *maps.SafeMap[string, *containercollection.Container] // Cache for container lookups | ||
| dedupCache *dedupcache.DedupCache | ||
| metrics metricsmanager.MetricsManager | ||
| dedupSkipSet map[Manager]struct{} // Managers to skip when event is duplicate | ||
| } | ||
|
|
||
| // NewEventHandlerFactory creates a new event handler factory | ||
|
|
@@ -64,6 +82,7 @@ func NewEventHandlerFactory( | |
| thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.GenericEventReceiver]], | ||
| thirdPartyEnricher containerwatcher.TaskBasedEnricher, | ||
| rulePolicyReporter *rulepolicy.RulePolicyReporter, | ||
| dedupCache *dedupcache.DedupCache, | ||
| ) *EventHandlerFactory { | ||
| factory := &EventHandlerFactory{ | ||
| handlers: make(map[utils.EventType][]Manager), | ||
|
|
@@ -72,6 +91,9 @@ func NewEventHandlerFactory( | |
| cfg: cfg, | ||
| containerCollection: containerCollection, | ||
| containerCache: &maps.SafeMap[string, *containercollection.Container]{}, | ||
| dedupCache: dedupCache, | ||
| metrics: metrics, | ||
| dedupSkipSet: make(map[Manager]struct{}), | ||
| } | ||
|
|
||
| // Create adapters for managers that don't implement the Manager interface directly | ||
|
|
@@ -168,9 +190,108 @@ func NewEventHandlerFactory( | |
| rulePolicyAdapter, | ||
| ) | ||
|
|
||
| // Populate dedupSkipSet: managers that skip processing when event is duplicate. | ||
| // RuleManager checks enrichedEvent.Duplicate internally. | ||
| factory.dedupSkipSet[containerProfileAdapter] = struct{}{} | ||
| factory.dedupSkipSet[malwareManager] = struct{}{} | ||
|
Comment on lines
+193
to
+196
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Don't let dedup suppress dropped-event reporting.
Also applies to: 332-335 🤖 Prompt for AI Agents |
||
|
|
||
| return factory | ||
| } | ||
|
|
||
| // computeEventDedupKey computes a dedup key and TTL for the given event. | ||
| // Returns shouldDedup=false for event types that must not be deduplicated. | ||
| func computeEventDedupKey(enrichedEvent *events.EnrichedEvent) (key uint64, ttl uint16, shouldDedup bool) { | ||
| event := enrichedEvent.Event | ||
| mntns := enrichedEvent.MountNamespaceID | ||
| if mntns == 0 { | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| mntns = ee.GetMountNsID() | ||
| } | ||
| } | ||
|
|
||
| switch event.GetEventType() { | ||
| case utils.OpenEventType: | ||
| if e, ok := event.(utils.OpenEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputeOpenKey(mntns, pid, e.GetPath(), e.GetFlagsRaw()), dedupTTLOpen, true | ||
| } | ||
| case utils.NetworkEventType: | ||
| if e, ok := event.(utils.NetworkEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| dst := e.GetDstEndpoint() | ||
| return dedupcache.ComputeNetworkKey(mntns, pid, dst.Addr, e.GetDstPort(), e.GetProto()), dedupTTLNetwork, true | ||
| } | ||
| case utils.DnsEventType: | ||
| if e, ok := event.(utils.DNSEvent); ok { | ||
| return dedupcache.ComputeDNSKey(mntns, e.GetDNSName()), dedupTTLDNS, true | ||
| } | ||
| case utils.CapabilitiesEventType: | ||
| if e, ok := event.(utils.CapabilitiesEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputeCapabilitiesKey(mntns, pid, e.GetCapability(), e.GetSyscall()), dedupTTLCapabilities, true | ||
| } | ||
| case utils.HTTPEventType: | ||
| if e, ok := event.(utils.HttpEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| req := e.GetRequest() | ||
| if req == nil { | ||
| return 0, 0, false | ||
| } | ||
| return dedupcache.ComputeHTTPKey(mntns, pid, string(e.GetDirection()), req.Method, req.Host, req.URL.Path, req.URL.RawQuery), dedupTTLHTTP, true | ||
| } | ||
| case utils.SSHEventType: | ||
| if e, ok := event.(utils.SshEvent); ok { | ||
| return dedupcache.ComputeSSHKey(mntns, e.GetDstIP(), e.GetDstPort()), dedupTTLSSH, true | ||
| } | ||
| case utils.SymlinkEventType: | ||
| if e, ok := event.(utils.LinkEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputeSymlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLSymlink, true | ||
| } | ||
| case utils.HardlinkEventType: | ||
| if e, ok := event.(utils.LinkEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputeHardlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLHardlink, true | ||
| } | ||
| case utils.PtraceEventType: | ||
| if e, ok := event.(utils.PtraceEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputePtraceKey(mntns, pid, e.GetExePath()), dedupTTLPtrace, true | ||
| } | ||
| case utils.SyscallEventType: | ||
| if e, ok := event.(utils.SyscallEvent); ok { | ||
| pid := uint32(0) | ||
| if ee, ok := event.(utils.EnrichEvent); ok { | ||
| pid = ee.GetPID() | ||
| } | ||
| return dedupcache.ComputeSyscallKey(mntns, pid, e.GetSyscall()), dedupTTLSyscall, true | ||
| } | ||
| } | ||
| // exec, exit, fork, randomx, kmod, bpf, unshare, iouring — no dedup | ||
| return 0, 0, false | ||
| } | ||
|
|
||
| // ProcessEvent processes an event through all registered handlers | ||
| func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent) { | ||
| if enrichedEvent.ContainerID == "" { | ||
|
|
@@ -187,6 +308,18 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent | |
| return | ||
| } | ||
|
|
||
| // Dedup check: compute key and check cache before dispatching to handlers | ||
| if ehf.dedupCache != nil { | ||
| key, ttl, shouldDedup := computeEventDedupKey(enrichedEvent) | ||
| if shouldDedup { | ||
| duplicate := ehf.dedupCache.CheckAndSet(key, ttl, enrichedEvent.DedupBucket) | ||
| if duplicate { | ||
| enrichedEvent.Duplicate = true | ||
| } | ||
| ehf.metrics.ReportDedupEvent(enrichedEvent.Event.GetEventType(), duplicate) | ||
| } | ||
| } | ||
|
|
||
| // Get handlers for this event type | ||
| eventType := enrichedEvent.Event.GetEventType() | ||
| handlers, exists := ehf.handlers[eventType] | ||
|
|
@@ -196,6 +329,11 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent | |
|
|
||
| // Process event through each handler | ||
| for _, handler := range handlers { | ||
| if enrichedEvent.Duplicate { | ||
| if _, skip := ehf.dedupSkipSet[handler]; skip { | ||
| continue | ||
| } | ||
| } | ||
| if enrichedHandler, ok := handler.(containerwatcher.EnrichedEventReceiver); ok { | ||
| enrichedHandler.ReportEnrichedEvent(enrichedEvent) | ||
| } else if handler, ok := handler.(containerwatcher.EventReceiver); ok { | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix typo in directory path.
The directory name contains a typo: "perfornamce" should be "performance".
📝 Proposed fix
📝 Committable suggestion
🤖 Prompt for AI Agents