Binary file added docs/dedup-testing/after_cpu_usage.png
Binary file added docs/dedup-testing/after_memory_usage.png
Binary file added docs/dedup-testing/before_cpu_usage.png
Binary file added docs/dedup-testing/before_memory_usage.png
91 changes: 91 additions & 0 deletions docs/dedup-testing/benchmark-results.md
@@ -0,0 +1,91 @@
# eBPF Event Dedup Benchmark Results

Benchmark comparing `node-agent:v0.3.71` (baseline) vs the local build with the dedup cache (`feature/ebpf-event-dedup`).

## Setup

- **Cluster**: kind (2 nodes: control-plane + worker)
- **Prometheus**: kube-prometheus-stack with 10s scrape interval
- **Kubescape**: kubescape-operator chart with runtimeDetection + runtimeObservability enabled
- **Load simulator**: DaemonSet generating events at configurable rates
- **Duration**: 2 min warmup + 10 min load per run
- **CPU rate window**: `rate(...[1m])` for responsive measurement

### Load Simulator Config

| Parameter | Value |
|-----------|-------|
| openRate | 1000/sec |
| httpRate | 100/sec |
| execRate | 10/sec |
| networkRate | 10/sec |
| dnsRate | 2/sec |
| hardlinkRate | 10/sec |
| symlinkRate | 10/sec |
| cpuLoadMs | 500 |
| numberParallelCPUs | 2 |

## Resource Usage

| Metric | BEFORE (v0.3.71) | AFTER (dedup) | Delta |
|--------|-------------------|---------------|-------|
| Avg CPU (cores) | 0.178 | 0.150 | **-15.9%** |
| Peak CPU (cores) | 0.220 | 0.156 | **-29.1%** |
| Avg Memory (MiB) | 339.5 | 335.9 | -1.1% |
| Peak Memory (MiB) | 345.5 | 338.4 | -2.1% |

### CPU Usage

| BEFORE (v0.3.71) | AFTER (dedup) |
|---|---|
| ![before cpu](before_cpu_usage.png) | ![after cpu](after_cpu_usage.png) |

### Memory Usage

| BEFORE (v0.3.71) | AFTER (dedup) |
|---|---|
| ![before memory](before_memory_usage.png) | ![after memory](after_memory_usage.png) |

## Dedup Effectiveness

Events processed by the dedup cache during the AFTER run:

| Event Type | Passed | Deduped | Dedup Ratio |
|------------|--------|---------|-------------|
| http | 1,701 | 119,453 | **98.6%** |
| network | 900 | 77,968 | **98.9%** |
| open | 59,569 | 626,133 | **91.3%** |
| syscall | 998 | 1,967 | **66.3%** |
| dns | 1,197 | 0 | 0.0% |
| hardlink | 6,000 | 0 | 0.0% |
| symlink | 6,000 | 0 | 0.0% |

## Event Counters (cumulative, both runs)

| Metric | BEFORE | AFTER |
|--------|--------|-------|
| open_counter | 801,868 | 816,637 |
| network_counter | 92,197 | 93,735 |
| exec_counter | 7,009 | 7,130 |
| syscall_counter | 3,628 | 3,735 |
| dns_counter | 1,401 | 1,422 |
| capability_counter | 9 | 9 |

Event counters are consistent between runs, confirming the load simulator produced comparable workloads.

## Analysis

- The dedup cache reduces **avg CPU by ~16%** and **peak CPU by ~29%** under sustained load (~1,100 events/sec).
- Memory impact is negligible (~1%) since the dedup cache uses a fixed-size, lock-free array (2 MiB for 2^18 slots at 8 bytes each).
- High-frequency event types benefit most: **network (98.9%)**, **http (98.6%)**, and **open (91.3%)** dedup ratios.
- Events with unique keys per occurrence (dns, hardlink, symlink) show 0% dedup, which is expected.
- The CPU savings come from skipping CEL rule evaluation on deduplicated events. The eBPF ingestion and event enrichment cost (which dominates baseline CPU) is unchanged.
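
The "fixed-size, lock-free array" can be sketched roughly as below: one `uint64` per slot packing a key tag with an expiry bucket, claimed via CAS. The packing layout and names are an assumption for illustration, not the node-agent implementation; the 2 MiB figure (2^18 slots at 8 bytes) matches the sizing above.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// DedupCache sketch: each slot packs the key's high 48 bits (identity
// tag) with a 16-bit expiry bucket. At slotsExponent=18 this is
// 2^18 slots x 8 bytes = 2 MiB.
type DedupCache struct {
	slots []uint64
	mask  uint64
}

func NewDedupCache(slotsExponent uint8) *DedupCache {
	size := uint64(1) << slotsExponent
	return &DedupCache{slots: make([]uint64, size), mask: size - 1}
}

// CheckAndSet reports whether key was already seen within a live window
// (a duplicate). Otherwise it records key with expiry = bucket + ttl.
func (c *DedupCache) CheckAndSet(key uint64, ttl uint16, bucket uint16) bool {
	idx := key & c.mask
	tag := key &^ 0xFFFF // high 48 bits identify the entry
	newVal := tag | uint64(bucket+ttl)
	for {
		old := atomic.LoadUint64(&c.slots[idx])
		// Same tag and stored expiry still ahead of the current bucket
		// (signed compare tolerates uint16 wraparound).
		if old&^0xFFFF == tag && int16(uint16(old)-bucket) > 0 {
			return true
		}
		// Empty, expired, or colliding slot: claim it for this key.
		if atomic.CompareAndSwapUint64(&c.slots[idx], old, newVal) {
			return false
		}
		// Lost a CAS race; re-read and retry.
	}
}

func main() {
	c := NewDedupCache(18)
	key := uint64(0x1234ABCD0000)
	fmt.Println(c.CheckAndSet(key, 156, 100)) // false: first sighting
	fmt.Println(c.CheckAndSet(key, 156, 120)) // true: within the 10s window
	fmt.Println(c.CheckAndSet(key, 156, 300)) // false: window expired
}
```

No locks are taken on the hot path; a lost CAS just means another goroutine recorded an entry first, and collisions only cost a missed dedup, never a dropped event.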

## Reproducing

```bash
cd perfornamce
./dedup-bench.sh quay.io/kubescape/node-agent:v0.3.71 quay.io/kubescape/node-agent:test
```

⚠️ Potential issue | 🟡 Minor

Fix typo in directory path.

The directory name contains a typo: "perfornamce" should be "performance".

📝 Proposed fix

-cd perfornamce
+cd performance

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@docs/dedup-testing/benchmark-results.md` at line 87, replace the misspelled
directory name "perfornamce" with the correct spelling "performance"; update any
other occurrences of the same typo in the document (search for "perfornamce") so
all directory references use "performance".

Requires: kind, helm, kubectl, docker, python3. Estimated runtime: ~35 minutes.
4 changes: 2 additions & 2 deletions go.mod
Expand Up @@ -12,6 +12,7 @@ require (
github.com/cenkalti/backoff v2.2.1+incompatible
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cenkalti/backoff/v5 v5.0.3
github.com/cespare/xxhash/v2 v2.3.0
github.com/cilium/ebpf v0.20.0
github.com/crewjam/rfc5424 v0.1.0
github.com/cyphar/filepath-securejoin v0.6.0
Expand Down Expand Up @@ -56,6 +57,7 @@ require (
golang.org/x/sys v0.42.0
gonum.org/v1/plot v0.14.0
google.golang.org/grpc v1.79.3
google.golang.org/protobuf v1.36.11
gopkg.in/mcuadros/go-syslog.v2 v2.3.0
istio.io/pkg v0.0.0-20231221211216-7635388a563e
k8s.io/api v0.35.0
Expand Down Expand Up @@ -155,7 +157,6 @@ require (
github.com/bodgit/windows v1.0.1 // indirect
github.com/briandowns/spinner v1.23.2 // indirect
github.com/campoy/embedmd v1.0.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/charmbracelet/colorprofile v0.3.1 // indirect
github.com/charmbracelet/lipgloss v1.1.0 // indirect
github.com/charmbracelet/x/ansi v0.9.3 // indirect
Expand Down Expand Up @@ -433,7 +434,6 @@ require (
google.golang.org/genproto v0.0.0-20250715232539-7130f93afb79 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 // indirect
google.golang.org/protobuf v1.36.11 // indirect
gopkg.in/evanphx/json-patch.v4 v4.13.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/warnings.v0 v0.1.2 // indirect
Expand Down
9 changes: 9 additions & 0 deletions pkg/config/config.go
Expand Up @@ -25,6 +25,12 @@ const NodeNameEnvVar = "NODE_NAME"
const PodNameEnvVar = "POD_NAME"
const NamespaceEnvVar = "NAMESPACE_NAME"

// EventDedupConfig controls eBPF event deduplication before CEL rule evaluation.
type EventDedupConfig struct {
Enabled bool `mapstructure:"enabled"`
SlotsExponent uint8 `mapstructure:"slotsExponent"`
}
Comment on lines +28 to +32

⚠️ Potential issue | 🟠 Major

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
set -euo pipefail

rg -n -C3 'type EventDedupConfig|SlotsExponent|eventDedup::slotsExponent|NewDedupCache\(' pkg/config pkg/containerwatcher/v2 pkg/dedupcache
rg -n -C4 'func NewDedupCache|slotsExponent|1\s*<<|make\(|panic|return .*error' pkg/dedupcache

Repository: kubescape/node-agent

Length of output: 6520


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Search for LoadConfig function and any validation around eventDedup
rg -n -A 30 'func LoadConfig' pkg/config/config.go | head -60

# Search for any bounds checking or max exponent constant
rg -n 'max.*exponent|exponent.*max|MAX.*EXPONENT|slotsExponent.*<|slotsExponent.*>|validate|bounds' pkg/ -i

# Check if eventDedup appears anywhere else with potential constraints
rg -n 'eventDedup' pkg/ -C 2

Repository: kubescape/node-agent

Length of output: 7611


🏁 Script executed:

#!/bin/bash
set -euo pipefail

# Get complete LoadConfig function from config.go
sed -n '145,260p' pkg/config/config.go

# Also check if there's a Validate method on Config struct
rg -n 'func.*Config.*Validate|func \(c \*Config\)' pkg/config/ -A 5

Repository: kubescape/node-agent

Length of output: 6085


Add validation for SlotsExponent to prevent invalid cache allocations.

The slotsExponent field in EventDedupConfig is passed directly to dedupcache.NewDedupCache(...) without bounds checking. NewDedupCache performs an unconstrained bit-shift (size := uint64(1) << slotsExponent): Go defines shifts past the operand width, so an exponent ≥ 64 silently yields size 0 (and a zero-length slice that panics on first access), while a large valid exponent forces a huge allocation (e.g., 34 already means 128 GiB of slots). Add explicit validation in LoadConfig to keep slotsExponent within a safe range, e.g., 0–31 (at 8 bytes per slot, an exponent of 28 is already a 2 GiB cache).

Also applies to: 79, 193–194

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/config/config.go` around lines 28 - 32, The
EventDedupConfig.SlotsExponent is used unchecked when creating the dedup cache
and can cause an overflow/panic in dedupcache.NewDedupCache; update LoadConfig
to validate SlotsExponent (e.g., ensure 0 <= SlotsExponent <= 31) before using
it, returning an error if out of range. Locate the struct EventDedupConfig and
the code path in LoadConfig (and any other places that call
dedupcache.NewDedupCache) and add explicit bounds checking for SlotsExponent,
with a clear error message referencing the invalid value, so callers cannot pass
an exponent >31 (or your chosen safe max) into dedupcache.NewDedupCache.
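
A minimal sketch of the bounds check the comment asks for; the cap of 31, the helper name, and the error wording are illustrative, not the node-agent API:

```go
package main

import "fmt"

// EventDedupConfig mirrors the struct added in the diff above.
type EventDedupConfig struct {
	Enabled       bool
	SlotsExponent uint8
}

// maxSlotsExponent bounds the cache: 1 << 31 slots is the assumed ceiling.
const maxSlotsExponent = 31

// validateEventDedup rejects exponents that would make NewDedupCache
// allocate an absurd (or, past 63, effectively zero-length) slot array.
func validateEventDedup(c EventDedupConfig) error {
	if c.SlotsExponent > maxSlotsExponent {
		return fmt.Errorf("eventDedup.slotsExponent=%d out of range [0, %d]",
			c.SlotsExponent, maxSlotsExponent)
	}
	return nil
}

func main() {
	fmt.Println(validateEventDedup(EventDedupConfig{Enabled: true, SlotsExponent: 18}))
	fmt.Println(validateEventDedup(EventDedupConfig{Enabled: true, SlotsExponent: 40}))
}
```

In LoadConfig this would run right after viper unmarshals the config, so an out-of-range value fails fast with a clear message instead of surfacing later as a panic in the watcher.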


type Config struct {
BlockEvents bool `mapstructure:"blockEvents"`
CelConfigCache cache.FunctionCacheConfig `mapstructure:"celConfigCache"`
Expand Down Expand Up @@ -70,6 +76,7 @@ type Config struct {
StandaloneMonitoringEnabled bool `mapstructure:"standaloneMonitoringEnabled"`
SeccompProfileBackend string `mapstructure:"seccompProfileBackend"`
EventBatchSize int `mapstructure:"eventBatchSize"`
EventDedup EventDedupConfig `mapstructure:"eventDedup"`
ExcludeJsonPaths []string `mapstructure:"excludeJsonPaths"`
ExcludeLabels map[string][]string `mapstructure:"excludeLabels"`
ExcludeNamespaces []string `mapstructure:"excludeNamespaces"`
Expand Down Expand Up @@ -183,6 +190,8 @@ func LoadConfig(path string) (Config, error) {
viper.SetDefault("celConfigCache::ttl", 1*time.Minute)
viper.SetDefault("ignoreRuleBindings", false)

viper.SetDefault("eventDedup::enabled", true)
viper.SetDefault("eventDedup::slotsExponent", 18)
viper.SetDefault("dnsCacheSize", 50000)
viper.SetDefault("seccompProfileBackend", "storage") // "storage" or "crd"
viper.SetDefault("containerEolNotificationBuffer", 100)
Expand Down
1 change: 1 addition & 0 deletions pkg/config/config_test.go
Expand Up @@ -80,6 +80,7 @@ func TestLoadConfig(t *testing.T) {
},
WorkerPoolSize: 3000,
EventBatchSize: 15000,
EventDedup: EventDedupConfig{Enabled: true, SlotsExponent: 18},
WorkerChannelSize: 750000,
BlockEvents: false,
ProcfsScanInterval: 30 * time.Second,
Expand Down
9 changes: 9 additions & 0 deletions pkg/containerwatcher/v2/container_watcher.go
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/containerprofilemanager"
"github.com/kubescape/node-agent/pkg/containerwatcher"
"github.com/kubescape/node-agent/pkg/dedupcache"
"github.com/kubescape/node-agent/pkg/containerwatcher/v2/tracers"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/ebpf/events"
Expand Down Expand Up @@ -137,6 +138,12 @@ func CreateContainerWatcher(

rulePolicyReporter := rulepolicy.NewRulePolicyReporter(ruleManager, containerProfileManager)

// Create dedup cache if enabled
var dedupCache *dedupcache.DedupCache
if cfg.EventDedup.Enabled {
dedupCache = dedupcache.NewDedupCache(cfg.EventDedup.SlotsExponent)
}

// Create event handler factory
eventHandlerFactory := NewEventHandlerFactory(
cfg,
Expand All @@ -150,6 +157,7 @@ func CreateContainerWatcher(
thirdPartyTracers.ThirdPartyEventReceivers,
thirdPartyEnricher,
rulePolicyReporter,
dedupCache,
)

// Create event enricher
Expand Down Expand Up @@ -462,6 +470,7 @@ func (cw *ContainerWatcher) processQueueBatch() {

func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) {
enrichedEvent := cw.eventEnricher.EnrichEvents(entry)
enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000))
⚠️ Potential issue | 🟠 Major

Keep a stable dedup bucket for the whole queue batch.

Line 473 recomputes the bucket per event even though EnrichedEvent.DedupBucket is documented as batch-cached. A large drain can cross a 64ms boundary mid-batch and let same-batch duplicates miss dedup.

🧩 One way to keep the bucket stable
 func (cw *ContainerWatcher) processQueueBatch() {
 	batchSize := cw.cfg.EventBatchSize
 	processedCount := 0
+	dedupBucket := uint16(time.Now().UnixNano() / (64 * 1_000_000))
 	for !cw.orderedEventQueue.Empty() && processedCount < batchSize {
 		event, ok := cw.orderedEventQueue.PopEvent()
 		if !ok {
 			break
 		}
-		cw.enrichAndProcess(event)
+		cw.enrichAndProcess(event, dedupBucket)
 		processedCount++
 	}
 }
 
-func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry) {
+func (cw *ContainerWatcher) enrichAndProcess(entry EventEntry, dedupBucket uint16) {
 	enrichedEvent := cw.eventEnricher.EnrichEvents(entry)
-	enrichedEvent.DedupBucket = uint16(time.Now().UnixNano() / (64 * 1_000_000))
+	enrichedEvent.DedupBucket = dedupBucket
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/containerwatcher/v2/container_watcher.go` at line 473, The DedupBucket is
being recomputed per event which can change mid-batch; compute the bucket once
for the entire queue batch and reuse it. Move the calculation
(uint16(time.Now().UnixNano() / (64 * 1_000_000))) out of the per-event code and
store it in a local variable (e.g., dedupBucket) at the start of the batch
processing routine, then assign enrichedEvent.DedupBucket = dedupBucket for each
event; ensure the calculation and assignment use the same expression so behavior
is unchanged except now stable across the batch.


select {
case cw.workerChan <- enrichedEvent:
Expand Down
138 changes: 138 additions & 0 deletions pkg/containerwatcher/v2/event_handler_factory.go
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/kubescape/node-agent/pkg/config"
"github.com/kubescape/node-agent/pkg/containerprofilemanager"
"github.com/kubescape/node-agent/pkg/containerwatcher"
"github.com/kubescape/node-agent/pkg/dedupcache"
"github.com/kubescape/node-agent/pkg/dnsmanager"
"github.com/kubescape/node-agent/pkg/ebpf/events"
"github.com/kubescape/node-agent/pkg/eventreporters/rulepolicy"
Expand Down Expand Up @@ -41,6 +42,20 @@ func (ma *ManagerAdapter) ReportEvent(eventType utils.EventType, event utils.K8s
ma.reportEventFunc(eventType, event)
}

// TTL constants for dedup windows in 64ms buckets.
const (
dedupTTLOpen uint16 = 156 // 10s
dedupTTLNetwork uint16 = 78 // 5s
dedupTTLDNS uint16 = 156 // 10s
dedupTTLCapabilities uint16 = 156 // 10s
dedupTTLHTTP uint16 = 31 // 2s
dedupTTLSSH uint16 = 156 // 10s
dedupTTLSymlink uint16 = 156 // 10s
dedupTTLHardlink uint16 = 156 // 10s
dedupTTLPtrace uint16 = 156 // 10s
dedupTTLSyscall uint16 = 78 // 5s
)

// EventHandlerFactory manages the mapping of event types to their managers
type EventHandlerFactory struct {
handlers map[utils.EventType][]Manager
Expand All @@ -49,6 +64,9 @@ type EventHandlerFactory struct {
cfg config.Config
containerCollection *containercollection.ContainerCollection
containerCache *maps.SafeMap[string, *containercollection.Container] // Cache for container lookups
dedupCache *dedupcache.DedupCache
metrics metricsmanager.MetricsManager
dedupSkipSet map[Manager]struct{} // Managers to skip when event is duplicate
}

// NewEventHandlerFactory creates a new event handler factory
Expand All @@ -64,6 +82,7 @@ func NewEventHandlerFactory(
thirdPartyEventReceivers *maps.SafeMap[utils.EventType, mapset.Set[containerwatcher.GenericEventReceiver]],
thirdPartyEnricher containerwatcher.TaskBasedEnricher,
rulePolicyReporter *rulepolicy.RulePolicyReporter,
dedupCache *dedupcache.DedupCache,
) *EventHandlerFactory {
factory := &EventHandlerFactory{
handlers: make(map[utils.EventType][]Manager),
Expand All @@ -72,6 +91,9 @@ func NewEventHandlerFactory(
cfg: cfg,
containerCollection: containerCollection,
containerCache: &maps.SafeMap[string, *containercollection.Container]{},
dedupCache: dedupCache,
metrics: metrics,
dedupSkipSet: make(map[Manager]struct{}),
}

// Create adapters for managers that don't implement the Manager interface directly
Expand Down Expand Up @@ -168,9 +190,108 @@ func NewEventHandlerFactory(
rulePolicyAdapter,
)

// Populate dedupSkipSet: managers that skip processing when event is duplicate.
// RuleManager checks enrichedEvent.Duplicate internally.
factory.dedupSkipSet[containerProfileAdapter] = struct{}{}
factory.dedupSkipSet[malwareManager] = struct{}{}
Comment on lines +193 to +196

⚠️ Potential issue | 🟠 Major

Don't let dedup suppress dropped-event reporting.

containerProfileAdapter does more than profile updates: at Lines 101-104 it also calls ReportDroppedEvent. Once the whole adapter is in dedupSkipSet, any duplicate event with HasDroppedEvents()==true skips that call too, so the profile manager can miss kernel-drop notifications and keep treating the profile as complete when it isn't. Please split the dropped-event path out of the adapter, or run that part before the duplicate skip.

Also applies to: 332-335

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/containerwatcher/v2/event_handler_factory.go` around lines 193 - 196, The
dedupSkipSet currently contains containerProfileAdapter (and malwareManager),
which causes ReportDroppedEvent calls within containerProfileAdapter to be
skipped for duplicate events; modify the event flow so dropped-event reporting
is always executed before deduplication: extract the dropped-event handling path
(calls to ReportDroppedEvent and checks of enrichedEvent.HasDroppedEvents) out
of containerProfileAdapter and invoke it unconditionally prior to evaluating
factory.dedupSkipSet, or alternatively call the adapter's ReportDroppedEvent
logic explicitly before the duplicate-skip check; ensure the same change is
applied to the second occurrence referenced around the other dedupSkipSet
insertion so kernel-drop notifications are never suppressed.


return factory
}

// computeEventDedupKey computes a dedup key and TTL for the given event.
// Returns shouldDedup=false for event types that must not be deduplicated.
func computeEventDedupKey(enrichedEvent *events.EnrichedEvent) (key uint64, ttl uint16, shouldDedup bool) {
event := enrichedEvent.Event
mntns := enrichedEvent.MountNamespaceID
if mntns == 0 {
if ee, ok := event.(utils.EnrichEvent); ok {
mntns = ee.GetMountNsID()
}
}

switch event.GetEventType() {
case utils.OpenEventType:
if e, ok := event.(utils.OpenEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeOpenKey(mntns, pid, e.GetPath(), e.GetFlagsRaw()), dedupTTLOpen, true
}
case utils.NetworkEventType:
if e, ok := event.(utils.NetworkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
dst := e.GetDstEndpoint()
return dedupcache.ComputeNetworkKey(mntns, pid, dst.Addr, e.GetDstPort(), e.GetProto()), dedupTTLNetwork, true
}
case utils.DnsEventType:
if e, ok := event.(utils.DNSEvent); ok {
return dedupcache.ComputeDNSKey(mntns, e.GetDNSName()), dedupTTLDNS, true
}
case utils.CapabilitiesEventType:
if e, ok := event.(utils.CapabilitiesEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeCapabilitiesKey(mntns, pid, e.GetCapability(), e.GetSyscall()), dedupTTLCapabilities, true
}
case utils.HTTPEventType:
if e, ok := event.(utils.HttpEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
req := e.GetRequest()
if req == nil {
return 0, 0, false
}
return dedupcache.ComputeHTTPKey(mntns, pid, string(e.GetDirection()), req.Method, req.Host, req.URL.Path, req.URL.RawQuery), dedupTTLHTTP, true
}
case utils.SSHEventType:
if e, ok := event.(utils.SshEvent); ok {
return dedupcache.ComputeSSHKey(mntns, e.GetDstIP(), e.GetDstPort()), dedupTTLSSH, true
}
case utils.SymlinkEventType:
if e, ok := event.(utils.LinkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeSymlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLSymlink, true
}
case utils.HardlinkEventType:
if e, ok := event.(utils.LinkEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeHardlinkKey(mntns, pid, e.GetOldPath(), e.GetNewPath()), dedupTTLHardlink, true
}
case utils.PtraceEventType:
if e, ok := event.(utils.PtraceEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputePtraceKey(mntns, pid, e.GetExePath()), dedupTTLPtrace, true
}
case utils.SyscallEventType:
if e, ok := event.(utils.SyscallEvent); ok {
pid := uint32(0)
if ee, ok := event.(utils.EnrichEvent); ok {
pid = ee.GetPID()
}
return dedupcache.ComputeSyscallKey(mntns, pid, e.GetSyscall()), dedupTTLSyscall, true
}
}
// exec, exit, fork, randomx, kmod, bpf, unshare, iouring — no dedup
return 0, 0, false
}

// ProcessEvent processes an event through all registered handlers
func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent) {
if enrichedEvent.ContainerID == "" {
Expand All @@ -187,6 +308,18 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent
return
}

// Dedup check: compute key and check cache before dispatching to handlers
if ehf.dedupCache != nil {
key, ttl, shouldDedup := computeEventDedupKey(enrichedEvent)
if shouldDedup {
duplicate := ehf.dedupCache.CheckAndSet(key, ttl, enrichedEvent.DedupBucket)
if duplicate {
enrichedEvent.Duplicate = true
}
ehf.metrics.ReportDedupEvent(enrichedEvent.Event.GetEventType(), duplicate)
}
}

// Get handlers for this event type
eventType := enrichedEvent.Event.GetEventType()
handlers, exists := ehf.handlers[eventType]
Expand All @@ -196,6 +329,11 @@ func (ehf *EventHandlerFactory) ProcessEvent(enrichedEvent *events.EnrichedEvent

// Process event through each handler
for _, handler := range handlers {
if enrichedEvent.Duplicate {
if _, skip := ehf.dedupSkipSet[handler]; skip {
continue
}
}
if enrichedHandler, ok := handler.(containerwatcher.EnrichedEventReceiver); ok {
enrichedHandler.ReportEnrichedEvent(enrichedEvent)
} else if handler, ok := handler.(containerwatcher.EventReceiver); ok {
Expand Down