diff --git a/process/connections.go b/process/connections.go index a65d23fa..0c8942f4 100644 --- a/process/connections.go +++ b/process/connections.go @@ -142,3 +142,46 @@ func (m *CollectorConnections) GetConnectionsTags(tagIndex int32) []string { func (m *CollectorConnections) UnsafeIterateConnectionTags(tagIndex int32, cb func(i, total int, tag []byte) bool) { unsafeIterateTags(m.EncodedConnectionsTags, int(tagIndex), cb) } + +// Aggregate telemetry counters +func (t *ConnectionsTelemetry) Aggregate(a *ConnectionsTelemetry) { + if a == nil { + return + } + t.MonotonicKprobesTriggered += a.MonotonicKprobesTriggered + t.MonotonicKprobesMissed += a.MonotonicKprobesMissed + t.MonotonicConntrackRegisters += a.MonotonicConntrackRegisters + t.MonotonicConntrackRegistersDropped += a.MonotonicConntrackRegistersDropped + t.MonotonicDnsPacketsProcessed += a.MonotonicDnsPacketsProcessed + t.MonotonicConnsClosed += a.MonotonicConnsClosed + t.ConnsBpfMapSize += a.ConnsBpfMapSize + t.MonotonicUdpSendsProcessed += a.MonotonicUdpSendsProcessed + t.MonotonicUdpSendsMissed += a.MonotonicUdpSendsMissed + t.ConntrackSamplingPercent += a.ConntrackSamplingPercent + t.DnsStatsDropped += a.DnsStatsDropped +} + +// Aggregate connections +func (c *Connections) Aggregate(a *Connections) { + c.Conns = append(c.Conns, a.Conns...) + for ip, names := range a.Dns { + c.Dns[ip] = names + } + c.Domains = append(c.Domains, a.Domains...) + c.ConnTelemetry.Aggregate(a.ConnTelemetry) + + c.Routes = append(c.Routes, a.Routes...) + + for name, count := range a.ConnTelemetryMap { + c.ConnTelemetryMap[name] += count + } + + // Would not need to be aggregated + // c.AgentConfiguration + + // Only first message contain these fields, so we don't need to aggregate + // c.KernelHeaderFetchResult + // c.CompilationTelemetryByAsset + // c.CORETelemetryByAsset + // c.PrebuiltEBPFAssets +} diff --git a/process/connections.pb.go b/process/connections.pb.go index 41e3474e..c58eeee9 100644 --- a/process/connections.pb.go +++ b/process/connections.pb.go @@ -415,6 +415,7 @@ func (m *CollectorConnections) GetResolvedHostsByName() map[string]*Host { return nil } +// please update process/connections.go Aggregate() if you add a field type Connections struct { Conns []*Connection `protobuf:"bytes,1,rep,name=conns" json:"conns,omitempty"` Dns map[string]*DNSEntry `protobuf:"bytes,2,rep,name=dns" json:"dns,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value"` @@ -630,6 +631,7 @@ func (m *ResourceMetadata) String() string { return proto.CompactText func (*ResourceMetadata) ProtoMessage() {} func (*ResourceMetadata) Descriptor() ([]byte, []int) { return fileDescriptorConnections, []int{3} } +// please update process/connections.go Aggregate() if you add a field type ConnectionsTelemetry struct { MonotonicKprobesTriggered int64 `protobuf:"varint,1,opt,name=monotonicKprobesTriggered,proto3" json:"monotonicKprobesTriggered,omitempty"` MonotonicKprobesMissed int64 `protobuf:"varint,2,opt,name=monotonicKprobesMissed,proto3" json:"monotonicKprobesMissed,omitempty"` diff --git a/proto/process/connections.proto b/proto/process/connections.proto index 1e78a238..4c2b6ace 100644 --- a/proto/process/connections.proto +++ b/proto/process/connections.proto @@ -85,6 +85,7 @@ message CollectorConnections { map resolvedHostsByName = 40; // Post-resolution field } +// please update process/connections.go Aggregate() if you add a field message Connections { repeated Connection conns = 1; map dns = 2; @@ -101,7 +102,6 @@ message Connections { repeated string PrebuiltEBPFAssets = 12; } - message Connection { reserved 2, 3, 4, 7, 8, 9, 12, 13, 14, 15, 35; // Please update when adding fields @@ -196,6 +196,7 @@ message ResourceMetadata { int64 tagsModified = 6; } +// please update process/connections.go Aggregate() if you add a field message ConnectionsTelemetry { int64 monotonicKprobesTriggered = 1; int64 monotonicKprobesMissed = 2;