From 7261a3938d72491a9c34734e59aa346ffe81cedb Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 20 Feb 2026 10:08:32 +0100 Subject: [PATCH 01/10] map data field for OpAMP --- internal/pkg/api/handleOpAMP.go | 203 ++++++++++++++++++++++- internal/pkg/api/handleOpAMP_test.go | 230 +++++++++++++++++++++++++++ internal/pkg/checkin/bulk.go | 50 ++++++ internal/pkg/dl/constants.go | 4 + internal/pkg/model/schema.go | 13 ++ 5 files changed, 493 insertions(+), 7 deletions(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index fabbafba42..5d6b89c58f 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -10,8 +10,11 @@ import ( "errors" "fmt" "net/http" + "strings" "time" + "gopkg.in/yaml.v3" + "github.com/gofrs/uuid/v5" "github.com/open-telemetry/opamp-go/protobufs" oaServer "github.com/open-telemetry/opamp-go/server" @@ -38,13 +41,17 @@ type OpAMPT struct { cfg *config.Server bulk bulk.Bulk cache cache.Cache - bc *checkin.Bulk + bc checkinBulk srv oaServer.OpAMPServer handler oaServer.HTTPHandlerFunc connCtx oaServer.ConnContext } +type checkinBulk interface { + CheckIn(id string, opts ...checkin.Option) error +} + func NewOpAMPT( ctx context.Context, cfg *config.Server, @@ -233,12 +240,16 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu // description is only sent if any of its fields change. meta := localMetadata{} meta.Elastic.Agent.ID = agentID + agentType := "" if aToS.AgentDescription != nil { // Extract agent version for _, ia := range aToS.AgentDescription.IdentifyingAttributes { switch attribute.Key(ia.Key) { case semconv.ServiceVersionKey: meta.Elastic.Agent.Version = ia.GetValue().GetStringValue() + case semconv.ServiceNameKey: + agentType = ia.GetValue().GetStringValue() + meta.Elastic.Agent.Name = agentType } } zlog.Debug().Str("opamp.agent.version", meta.Elastic.Agent.Version).Msg("extracted agent version") @@ -250,6 +261,9 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu hostname := nia.GetValue().GetStringValue() meta.Host.Name = hostname meta.Host.Hostname = hostname + case semconv.OSTypeKey: + osType := nia.GetValue().GetStringValue() + meta.Os.Platform = osType } } zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname") @@ -261,15 +275,32 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu return nil, fmt.Errorf("failed to marshal local metadata: %w", err) } + identifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.IdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err) + } + + nonIdentifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.NonIdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err) + } + agent := model.Agent{ ESDocument: model.ESDocument{Id: agentID}, Active: true, EnrolledAt: now.UTC().Format(time.RFC3339), PolicyID: rec.PolicyID, Agent: &model.AgentMetadata{ - ID: agentID, + ID: agentID, + Version: meta.Elastic.Agent.Version, + Type: agentType, }, - LocalMetadata: data, + LocalMetadata: data, + PolicyRevisionIdx: 1, + IdentifyingAttributes: identifyingAttributes, + NonIdentifyingAttributes: nonIdentifyingAttributes, + Type: "OPAMP", + Tags: []string{agentType}, } data, err = json.Marshal(agent) @@ -291,14 +322,42 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro initialOpts := make([]checkin.Option, 0) + status := "online" + // Extract the health status from the health message if it exists. if aToS.Health != nil { - initialOpts = append(initialOpts, checkin.WithStatus(aToS.Health.Status)) + if aToS.Health.Healthy == false { + status = "error" + } else if aToS.Health.Status == "StatusRecoverableError" { + status = "degraded" + } - // Extract the unhealthy reason from the health message if it exists. + // Extract the last_checkin_message from the health message if it exists. if aToS.Health.LastError != "" { - unhealthyReason := []string{aToS.Health.LastError} - initialOpts = append(initialOpts, checkin.WithUnhealthyReason(&unhealthyReason)) + initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.LastError)) + } else { + initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.Status)) + } + healthBytes, err := json.Marshal(aToS.Health) + if err != nil { + return fmt.Errorf("failed to marshal health: %w", err) + } + initialOpts = append(initialOpts, checkin.WithHealth(healthBytes)) + } + + initialOpts = append(initialOpts, checkin.WithStatus(status)) + initialOpts = append(initialOpts, checkin.WithSequenceNum(aToS.SequenceNum)) + + capabilities := decodeCapabilities(aToS.Capabilities) + initialOpts = append(initialOpts, checkin.WithCapabilities(capabilities)) + + if aToS.EffectiveConfig != nil { + effectiveConfigBytes, err := ParseEffectiveConfig(aToS.EffectiveConfig) + if err != nil { + return fmt.Errorf("failed to parse effective config: %w", err) + } + if effectiveConfigBytes != nil { + initialOpts = append(initialOpts, checkin.WithEffectiveConfig(effectiveConfigBytes)) } } @@ -310,10 +369,140 @@ type localMetadata struct { Agent struct { ID string `json:"id,omitempty"` Version string `json:"version,omitempty"` + Name string `json:"name,omitempty"` } `json:"agent,omitempty"` } `json:"elastic,omitempty"` Host struct { Hostname string `json:"hostname,omitempty"` Name string `json:"name,omitempty"` } `json:"host,omitempty"` + Os struct { + Platform string `json:"platform,omitempty"` + } `json:"os,omitempty"` +} + +func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, error) { + if effectiveConfig.ConfigMap != nil && effectiveConfig.ConfigMap.ConfigMap[""] != nil { + configMap := effectiveConfig.ConfigMap.ConfigMap[""] + + if len(configMap.Body) != 0 { + bodyBytes := configMap.Body + + obj := make(map[string]interface{}) + if err := yaml.Unmarshal(bodyBytes, &obj); err != nil { + return nil, fmt.Errorf("unmarshal effective config failure: %v", err) + } + redactSensitive(obj) + effectiveConfigBytes, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("failed to marshal effective config: %w", err) + } + return effectiveConfigBytes, nil + } + } + return nil, nil +} + +func redactSensitive(v interface{}) { + switch typed := v.(type) { + case map[string]interface{}: + for key, val := range typed { + if isSensitiveKey(key) { + typed[key] = "[REDACTED]" + continue + } + redactSensitive(val) + } + case map[interface{}]interface{}: + for rawKey, val := range typed { + key, ok := rawKey.(string) + if ok && isSensitiveKey(key) { + typed[rawKey] = "[REDACTED]" + continue + } + redactSensitive(val) + } + case []interface{}: + for i := range typed { + redactSensitive(typed[i]) + } + } +} + +func isSensitiveKey(key string) bool { + key = strings.ToLower(strings.TrimSpace(key)) + if key == "" { + return false + } + for _, token := range []string{ + "password", + "passwd", + "pass", + "secret", + "token", + "apikey", + "api_key", + "access_key", + "private_key", + "credential", + "credentials", + } { + if key == token || strings.Contains(key, token) { + return true + } + } + return false +} + +func ProtobufKVToRawMessage(kv []*protobufs.KeyValue) (json.RawMessage, error) { + // 1. Build an intermediate map to represent the JSON object + data := make(map[string]interface{}, len(kv)) + for _, item := range kv { + if item.Value == nil { + continue + } + switch v := item.Value.GetValue().(type) { + case *protobufs.AnyValue_StringValue: + if v.StringValue != "" { + data[item.Key] = v.StringValue + } + case *protobufs.AnyValue_IntValue: + data[item.Key] = v.IntValue + case *protobufs.AnyValue_DoubleValue: + data[item.Key] = v.DoubleValue + case *protobufs.AnyValue_BoolValue: + data[item.Key] = v.BoolValue + case *protobufs.AnyValue_BytesValue: + if len(v.BytesValue) > 0 { + data[item.Key] = v.BytesValue + } + } + } + + // 2. Marshal the map into bytes + b, err := json.Marshal(data) + if err != nil { + return nil, err + } + + return json.RawMessage(b), nil +} + +// decodeCapabilities converts capability bitmask to human-readable strings +func decodeCapabilities(caps uint64) []string { + var result []string + capMap := map[uint64]string{ + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus): "ReportsStatus", + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig): "AcceptsRemoteConfig", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig): "ReportsEffectiveConfig", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth): "ReportsHealth", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents): "ReportsAvailableComponents", + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand): "AcceptsRestartCommand", + } + for mask, name := range capMap { + if caps&mask != 0 { + result = append(result, name) + } + } + return result } diff --git a/internal/pkg/api/handleOpAMP_test.go b/internal/pkg/api/handleOpAMP_test.go index ebadac236f..58d486d5ac 100644 --- a/internal/pkg/api/handleOpAMP_test.go +++ b/internal/pkg/api/handleOpAMP_test.go @@ -5,10 +5,25 @@ package api import ( + "encoding/base64" + "encoding/json" + "io" + "reflect" "testing" + "unsafe" + "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" ) func TestFeatureFlag(t *testing.T) { @@ -38,3 +53,218 @@ func TestFeatureFlag(t *testing.T) { }) } } + +func TestProtobufKVToRawMessage(t *testing.T) { + input := []*protobufs.KeyValue{ + { + Key: "string_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "hello"}, + }, + }, + { + Key: "int_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_IntValue{IntValue: 42}, + }, + }, + { + Key: "double_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_DoubleValue{DoubleValue: 3.14}, + }, + }, + { + Key: "bool_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_BoolValue{BoolValue: true}, + }, + }, + { + Key: "bytes_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_BytesValue{BytesValue: []byte("bin")}, + }, + }, + } + + raw, err := ProtobufKVToRawMessage(input) + require.NoError(t, err) + + var got map[string]interface{} + require.NoError(t, json.Unmarshal(raw, &got)) + + require.Equal(t, "hello", got["string_key"]) + require.Equal(t, float64(42), got["int_key"]) + require.Equal(t, 3.14, got["double_key"]) + require.Equal(t, true, got["bool_key"]) + require.Equal(t, base64.StdEncoding.EncodeToString([]byte("bin")), got["bytes_key"]) +} + +func TestEnrollAgentWithAgentToServerMessage(t *testing.T) { + bulker := ftesting.NewMockBulk() + + enrollKey := model.EnrollmentAPIKey{ + APIKeyID: "enroll-key-id", + PolicyID: "policy-123", + Active: true, + } + enrollKeyBytes, err := json.Marshal(enrollKey) + require.NoError(t, err) + + bulker.On("Search", mock.Anything, dl.FleetEnrollmentAPIKeys, mock.Anything, mock.Anything). + Return(&es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{{Source: enrollKeyBytes}}, + }, + }, nil) + + var createdAgent model.Agent + bulker.On("Create", mock.Anything, dl.FleetAgents, "agent-123", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + body, ok := args.Get(3).([]byte) + require.True(t, ok) + require.NoError(t, json.Unmarshal(body, &createdAgent)) + }). + Return("doc-id", nil) + + oa := &OpAMPT{bulk: bulker} + msg := &protobufs.AgentToServer{ + AgentDescription: &protobufs.AgentDescription{ + IdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: string(semconv.ServiceVersionKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "1.2.3"}, + }, + }, + { + Key: string(semconv.ServiceNameKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "otel-collector"}, + }, + }, + }, + NonIdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: string(semconv.HostNameKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "host-1"}, + }, + }, + { + Key: string(semconv.OSTypeKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "linux"}, + }, + }, + }, + }, + } + + apiKey := &apikey.APIKey{ID: "enroll-key-id"} + zlog := zerolog.New(io.Discard) + + agent, err := oa.enrollAgent(zlog, "agent-123", msg, apiKey) + require.NoError(t, err) + require.NotNil(t, agent) + require.Equal(t, "policy-123", agent.PolicyID) + require.Equal(t, "1.2.3", agent.Agent.Version) + require.Equal(t, "otel-collector", agent.Agent.Type) + require.Equal(t, []string{"otel-collector"}, agent.Tags) + + var meta localMetadata + require.NoError(t, json.Unmarshal(agent.LocalMetadata, &meta)) + require.Equal(t, "host-1", meta.Host.Hostname) + require.Equal(t, "linux", meta.Os.Platform) + + require.Equal(t, agent.Id, createdAgent.Agent.ID) + require.Equal(t, agent.PolicyID, createdAgent.PolicyID) + bulker.AssertExpectations(t) +} + +func TestUpdateAgentWithAgentToServerMessage(t *testing.T) { + checker := &mockCheckin{} + oa := &OpAMPT{bc: checker} + + agent := &model.Agent{ESDocument: model.ESDocument{Id: "agent-123"}} + + msg := &protobufs.AgentToServer{ + SequenceNum: 7, + Capabilities: uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth) | + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig), + Health: &protobufs.ComponentHealth{ + Healthy: true, + Status: "StatusRecoverableError", + LastError: "boom", + }, + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": { + Body: []byte("password: 12345\nnum: 2\n"), + ContentType: "text/yaml", + }, + }, + }, + }, + } + + zlog := zerolog.New(io.Discard) + require.NoError(t, oa.updateAgent(zlog, agent, msg)) + require.Equal(t, "agent-123", checker.id) + + pending := pendingFromOptions(t, checker.opts) + require.Equal(t, "degraded", getUnexportedField(pending, "status").String()) + require.Equal(t, "boom", getUnexportedField(pending, "message").String()) + require.Equal(t, uint64(7), getUnexportedField(pending, "sequenceNum").Uint()) + + extra := getUnexportedField(pending, "extra") + require.False(t, extra.IsNil()) + extraVal := extra.Elem() + + capabilitiesVal := getUnexportedField(extraVal, "capabilities") + capabilities := capabilitiesVal.Interface().([]string) + require.ElementsMatch(t, []string{"ReportsHealth", "AcceptsRemoteConfig"}, capabilities) + + healthBytes := getUnexportedField(extraVal, "health").Bytes() + var health protobufs.ComponentHealth + require.NoError(t, json.Unmarshal(healthBytes, &health)) + require.Equal(t, "boom", health.LastError) + require.Equal(t, "StatusRecoverableError", health.Status) + + configBytes := getUnexportedField(extraVal, "effectiveConfig").Bytes() + var config map[string]interface{} + require.NoError(t, json.Unmarshal(configBytes, &config)) + require.Equal(t, "[REDACTED]", config["password"]) + require.Equal(t, float64(2), config["num"]) +} + +type mockCheckin struct { + id string + opts []checkin.Option +} + +func (m *mockCheckin) CheckIn(id string, opts ...checkin.Option) error { + m.id = id + m.opts = opts + return nil +} + +func pendingFromOptions(t *testing.T, opts []checkin.Option) reflect.Value { + t.Helper() + require.NotEmpty(t, opts) + + sampleOpt := checkin.WithStatus("") + argType := reflect.TypeOf(sampleOpt).In(0) + pendingPtr := reflect.New(argType.Elem()) + for _, opt := range opts { + reflect.ValueOf(opt).Call([]reflect.Value{pendingPtr}) + } + return pendingPtr.Elem() +} + +func getUnexportedField(v reflect.Value, name string) reflect.Value { + field := v.FieldByName(name) + return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() +} diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index e02966156c..03c33e8527 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -125,6 +125,39 @@ func WithPolicyRevisionIDX(idx int64) Option { } } +func WithSequenceNum(seq uint64) Option { + return func(pending *pendingT) { + pending.sequenceNum = seq + } +} + +func WithHealth(health []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.health = health + } +} + +func WithCapabilities(capabilities []string) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.capabilities = capabilities + } +} + +func WithEffectiveConfig(effectiveConfig []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.effectiveConfig = effectiveConfig + } +} + func WithAvailableRollbacks(availableRollbacks []byte) Option { return func(pending *pendingT) { if pending.extra == nil { @@ -141,6 +174,9 @@ type extraT struct { components []byte deleteAudit bool availableRollbacks []byte + health []byte + capabilities []string + effectiveConfig []byte } // Minimize the size of this structure. @@ -154,6 +190,7 @@ type pendingT struct { revisionIDX int64 extra *extraT unhealthyReason *[]string + sequenceNum uint64 } // Bulk will batch pending checkins and update elasticsearch at a set interval. @@ -337,6 +374,7 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { dl.FieldLastCheckinStatus: pending.status, // Set the pending status dl.FieldLastCheckinMessage: pending.message, // Set the status message dl.FieldUnhealthyReason: pending.unhealthyReason, + dl.FieldSequenceNum: pending.sequenceNum, } if pending.agentPolicyID != "" { fields[dl.FieldAgentPolicyID] = pending.agentPolicyID @@ -364,6 +402,18 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) } + if pending.extra.health != nil { + fields[dl.FieldHealth] = json.RawMessage(pending.extra.health) + } + + if pending.extra.capabilities != nil { + fields[dl.FieldCapabilities] = pending.extra.capabilities + } + + if pending.extra.effectiveConfig != nil { + fields[dl.FieldEffectiveConfig] = json.RawMessage(pending.extra.effectiveConfig) + } + // If seqNo changed, set the field appropriately if pending.extra.seqNo.IsSet() { fields[dl.FieldActionSeqNo] = pending.extra.seqNo diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index d81b0d4b65..c48116d5cd 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -45,6 +45,10 @@ const ( FieldUnenrolledReason = "unenrolled_reason" FiledType = "type" FieldUnhealthyReason = "unhealthy_reason" + FieldSequenceNum = "sequence_num" + FieldHealth = "health" + FieldCapabilities = "capabilities" + FieldEffectiveConfig = "effective_config" FieldActive = "active" FieldNamespaces = "namespaces" diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 9ce75d006b..4904e6951c 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -242,6 +242,16 @@ type Agent struct { // User provided metadata information for the Elastic Agent UserProvidedMetadata json.RawMessage `json:"user_provided_metadata,omitempty"` + + IdentifyingAttributes json.RawMessage `json:"identifying_attributes,omitempty"` + + NonIdentifyingAttributes json.RawMessage `json:"non_identifying_attributes,omitempty"` + + SequenceNum uint64 `json:"sequence_num,omitempty"` + + Health json.RawMessage `json:"health,omitempty"` + Capabilities []string `json:"capabilities,omitempty"` + EffectiveConfig json.RawMessage `json:"effective_config,omitempty"` } // AgentMetadata An Elastic Agent metadata @@ -252,6 +262,8 @@ type AgentMetadata struct { // The version of the Elastic Agent Version string `json:"version"` + + Type string `json:"type,omitempty"` } // Artifact An artifact served by Fleet @@ -351,6 +363,7 @@ type ComponentsItems struct { Message string `json:"message,omitempty"` Status string `json:"status,omitempty"` Units []UnitsItems `json:"units,omitempty"` + Type string `json:"type,omitempty"` } // DataStream From 964eec6c79f6c7a32ef2c14e51a53b87acbf514b Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 20 Feb 2026 10:11:24 +0100 Subject: [PATCH 02/10] fix lint issues --- internal/pkg/api/handleOpAMP.go | 9 +++++---- internal/pkg/api/handleOpAMP_test.go | 3 ++- internal/pkg/checkin/bulk.go | 10 +++++----- internal/pkg/dl/constants.go | 8 ++++---- internal/pkg/model/schema.go | 1 - 5 files changed, 16 insertions(+), 15 deletions(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index 5d6b89c58f..275b690b5b 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -326,7 +326,7 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro // Extract the health status from the health message if it exists. if aToS.Health != nil { - if aToS.Health.Healthy == false { + if !aToS.Health.Healthy { status = "error" } else if aToS.Health.Status == "StatusRecoverableError" { status = "degraded" @@ -390,7 +390,7 @@ func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, e obj := make(map[string]interface{}) if err := yaml.Unmarshal(bodyBytes, &obj); err != nil { - return nil, fmt.Errorf("unmarshal effective config failure: %v", err) + return nil, fmt.Errorf("unmarshal effective config failure: %w", err) } redactSensitive(obj) effectiveConfigBytes, err := json.Marshal(obj) @@ -404,11 +404,12 @@ func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, e } func redactSensitive(v interface{}) { + const redacted = "[REDACTED]" switch typed := v.(type) { case map[string]interface{}: for key, val := range typed { if isSensitiveKey(key) { - typed[key] = "[REDACTED]" + typed[key] = redacted continue } redactSensitive(val) @@ -417,7 +418,7 @@ func redactSensitive(v interface{}) { for rawKey, val := range typed { key, ok := rawKey.(string) if ok && isSensitiveKey(key) { - typed[rawKey] = "[REDACTED]" + typed[rawKey] = redacted continue } redactSensitive(val) diff --git a/internal/pkg/api/handleOpAMP_test.go b/internal/pkg/api/handleOpAMP_test.go index 58d486d5ac..828f1c440a 100644 --- a/internal/pkg/api/handleOpAMP_test.go +++ b/internal/pkg/api/handleOpAMP_test.go @@ -224,7 +224,8 @@ func TestUpdateAgentWithAgentToServerMessage(t *testing.T) { extraVal := extra.Elem() capabilitiesVal := getUnexportedField(extraVal, "capabilities") - capabilities := capabilitiesVal.Interface().([]string) + capabilities, ok := capabilitiesVal.Interface().([]string) + require.True(t, ok) require.ElementsMatch(t, []string{"ReportsHealth", "AcceptsRemoteConfig"}, capabilities) healthBytes := getUnexportedField(extraVal, "health").Bytes() diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index 03c33e8527..d5a4a67f58 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -140,7 +140,7 @@ func WithHealth(health []byte) Option { } } -func WithCapabilities(capabilities []string) Option { +func WithCapabilities(capabilities []string) Option { return func(pending *pendingT) { if pending.extra == nil { pending.extra = &extraT{} @@ -174,8 +174,8 @@ type extraT struct { components []byte deleteAudit bool availableRollbacks []byte - health []byte - capabilities []string + health []byte + capabilities []string effectiveConfig []byte } @@ -190,7 +190,7 @@ type pendingT struct { revisionIDX int64 extra *extraT unhealthyReason *[]string - sequenceNum uint64 + sequenceNum uint64 } // Bulk will batch pending checkins and update elasticsearch at a set interval. @@ -374,7 +374,7 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { dl.FieldLastCheckinStatus: pending.status, // Set the pending status dl.FieldLastCheckinMessage: pending.message, // Set the status message dl.FieldUnhealthyReason: pending.unhealthyReason, - dl.FieldSequenceNum: pending.sequenceNum, + dl.FieldSequenceNum: pending.sequenceNum, } if pending.agentPolicyID != "" { fields[dl.FieldAgentPolicyID] = pending.agentPolicyID diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index c48116d5cd..26c098d808 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -45,10 +45,10 @@ const ( FieldUnenrolledReason = "unenrolled_reason" FiledType = "type" FieldUnhealthyReason = "unhealthy_reason" - FieldSequenceNum = "sequence_num" - FieldHealth = "health" - FieldCapabilities = "capabilities" - FieldEffectiveConfig = "effective_config" + FieldSequenceNum = "sequence_num" + FieldHealth = "health" + FieldCapabilities = "capabilities" + FieldEffectiveConfig = "effective_config" FieldActive = "active" FieldNamespaces = "namespaces" diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 4904e6951c..de340375d9 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -363,7 +363,6 @@ type ComponentsItems struct { Message string `json:"message,omitempty"` Status string `json:"status,omitempty"` Units []UnitsItems `json:"units,omitempty"` - Type string `json:"type,omitempty"` } // DataStream From f2ff1ec64106f88ee7aa09cdc909019d01cadf82 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 20 Feb 2026 10:48:58 +0100 Subject: [PATCH 03/10] fix schema --- internal/pkg/model/schema.go | 33 +++++++++++++++++++++------------ model/schema.json | 33 ++++++++++++++++++++++++++++++++- 2 files changed, 53 insertions(+), 13 deletions(-) diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index de340375d9..b0961e469f 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -141,6 +141,9 @@ type Agent struct { // Agent timestamp for audit unenroll/uninstall action AuditUnenrolledTime string `json:"audit_unenrolled_time,omitempty"` + // List of capabilities of the collector + Capabilities []string `json:"capabilities,omitempty"` + // Elastic Agent components detailed status information Components json.RawMessage `json:"components,omitempty"` @@ -153,12 +156,21 @@ type Agent struct { // Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKeyID string `json:"default_api_key_id,omitempty"` + // The effective config that the collector is running + EffectiveConfig json.RawMessage `json:"effective_config,omitempty"` + // Date/time the Elastic Agent enrolled EnrolledAt string `json:"enrolled_at"` // Enrollment ID EnrollmentID string `json:"enrollment_id,omitempty"` + // Health information of the collector + Health json.RawMessage `json:"health,omitempty"` + + // Identifying attributes of the collector + IdentifyingAttributes json.RawMessage `json:"identifying_attributes,omitempty"` + // Date/time the Elastic Agent checked in last time LastCheckin string `json:"last_checkin,omitempty"` @@ -177,6 +189,9 @@ type Agent struct { // Namespaces Namespaces []string `json:"namespaces,omitempty"` + // Non-identifying attributes of the collector + NonIdentifyingAttributes json.RawMessage `json:"non_identifying_attributes,omitempty"` + // Outputs is the policy output data, mapping the output name to its data Outputs map[string]*PolicyOutput `json:"outputs,omitempty"` @@ -198,6 +213,9 @@ type Agent struct { // hash of token provided during enrollment that allows replacement by another enrollment with same ID ReplaceToken string `json:"replace_token,omitempty"` + // The sequence number of the last collector message + SequenceNum int64 `json:"sequence_num,omitempty"` + // Shared ID SharedID string `json:"shared_id,omitempty"` @@ -242,16 +260,6 @@ type Agent struct { // User provided metadata information for the Elastic Agent UserProvidedMetadata json.RawMessage `json:"user_provided_metadata,omitempty"` - - IdentifyingAttributes json.RawMessage `json:"identifying_attributes,omitempty"` - - NonIdentifyingAttributes json.RawMessage `json:"non_identifying_attributes,omitempty"` - - SequenceNum uint64 `json:"sequence_num,omitempty"` - - Health json.RawMessage `json:"health,omitempty"` - Capabilities []string `json:"capabilities,omitempty"` - EffectiveConfig json.RawMessage `json:"effective_config,omitempty"` } // AgentMetadata An Elastic Agent metadata @@ -260,10 +268,11 @@ type AgentMetadata struct { // The unique identifier for the Elastic Agent ID string `json:"id"` + // The type of the Elastic Agent + Type string `json:"type,omitempty"` + // The version of the Elastic Agent Version string `json:"version"` - - Type string `json:"type,omitempty"` } // Artifact An artifact served by Fleet diff --git a/model/schema.json b/model/schema.json index 4e63c14d73..c6983c3689 100644 --- a/model/schema.json +++ b/model/schema.json @@ -188,6 +188,10 @@ "version": { "description": "The version of the Elastic Agent", "type": "string" + }, + "type": { + "description": "The type of the Elastic Agent", + "type": "string" } }, "required": ["id", "version"] @@ -744,7 +748,34 @@ } } } - } + }, + "identifying_attributes": { + "description": "Identifying attributes of the collector", + "format": "raw" + }, + "non_identifying_attributes": { + "description": "Non-identifying attributes of the collector", + "format": "raw" + }, + "sequence_num": { + "description": "The sequence number of the last collector message", + "type": "integer" + }, + "health": { + "description": "Health information of the collector", + "format": "raw" + }, + "capabilities": { + "description": "List of capabilities of the collector", + "type": "array", + "items": { + "type": "string" + } + }, + "effective_config": { + "description": "The effective config that the collector is running", + "format": "raw" + } }, "required": ["_id", "type", "active", "enrolled_at", "status"] }, From 6b39f50bdd016f90230b573d215336ddcf4d9a54 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Mon, 23 Feb 2026 10:39:19 +0100 Subject: [PATCH 04/10] fix review comments --- internal/pkg/api/handleOpAMP.go | 41 ++++++++++++++++++---------- internal/pkg/api/handleOpAMP_test.go | 4 +-- model/schema.json | 10 +++---- 3 files changed, 34 insertions(+), 21 deletions(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index 275b690b5b..3b7acf0795 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -207,13 +207,20 @@ func (oa *OpAMPT) handleMessage(zlog zerolog.Logger, apiKey *apikey.APIKey) func } } -func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, _ zerolog.Logger, agentID string) (*model.Agent, error) { +func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, zlog zerolog.Logger, agentID string) (*model.Agent, error) { agent, err := dl.FindAgent(ctx, oa.bulk, dl.QueryAgentByID, dl.FieldID, agentID) if errors.Is(err, dl.ErrNotFound) { return nil, nil } + // if agents index doesn't exist yet, it will be created when the first agent document is indexed + if err != nil && strings.Contains(err.Error(), "index_not_found_exception") { + zlog.Info().Msg("index not found when searching for enrolled agent") + return nil, nil + } + if err != nil { + zlog.Error().Err(err).Msg("failed to find agent by ID") return nil, fmt.Errorf("failed to find agent: %w", err) } @@ -241,6 +248,7 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu meta := localMetadata{} meta.Elastic.Agent.ID = agentID agentType := "" + var identifyingAttributes, nonIdentifyingAttributes json.RawMessage if aToS.AgentDescription != nil { // Extract agent version for _, ia := range aToS.AgentDescription.IdentifyingAttributes { @@ -267,6 +275,16 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu } } zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname") + + identifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.IdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err) + } + + nonIdentifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.NonIdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err) + } } // Update local metadata if something has changed @@ -275,16 +293,6 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu return nil, fmt.Errorf("failed to marshal local metadata: %w", err) } - identifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.IdentifyingAttributes) - if err != nil { - return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err) - } - - nonIdentifyingAttributes, err := ProtobufKVToRawMessage(aToS.AgentDescription.NonIdentifyingAttributes) - if err != nil { - return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err) - } - agent := model.Agent{ ESDocument: model.ESDocument{Id: agentID}, Active: true, @@ -295,7 +303,8 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu Version: meta.Elastic.Agent.Version, Type: agentType, }, - LocalMetadata: data, + LocalMetadata: data, + // Setting revision to 1, the collector won't receive policy changes and 0 would keep the collector in updating state PolicyRevisionIdx: 1, IdentifyingAttributes: identifyingAttributes, NonIdentifyingAttributes: nonIdentifyingAttributes, @@ -448,14 +457,14 @@ func isSensitiveKey(key string) bool { "credential", "credentials", } { - if key == token || strings.Contains(key, token) { + if key == token { return true } } return false } -func ProtobufKVToRawMessage(kv []*protobufs.KeyValue) (json.RawMessage, error) { +func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) { // 1. Build an intermediate map to represent the JSON object data := make(map[string]interface{}, len(kv)) for _, item := range kv { @@ -477,12 +486,16 @@ func ProtobufKVToRawMessage(kv []*protobufs.KeyValue) (json.RawMessage, error) { if len(v.BytesValue) > 0 { data[item.Key] = v.BytesValue } + continue + default: + return nil, fmt.Errorf("unsupported attribute value type for key %s", item.Key) } } // 2. Marshal the map into bytes b, err := json.Marshal(data) if err != nil { + zlog.Error().Err(err).Msg("failed to marshal key-value pairs") return nil, err } diff --git a/internal/pkg/api/handleOpAMP_test.go b/internal/pkg/api/handleOpAMP_test.go index 828f1c440a..3bc0a11717 100644 --- a/internal/pkg/api/handleOpAMP_test.go +++ b/internal/pkg/api/handleOpAMP_test.go @@ -87,8 +87,8 @@ func TestProtobufKVToRawMessage(t *testing.T) { }, }, } - - raw, err := ProtobufKVToRawMessage(input) + zlog := zerolog.New(io.Discard) + raw, err := ProtobufKVToRawMessage(zlog, input) require.NoError(t, err) var got map[string]interface{} diff --git a/model/schema.json b/model/schema.json index c6983c3689..49cbdcbd87 100644 --- a/model/schema.json +++ b/model/schema.json @@ -766,16 +766,16 @@ "format": "raw" }, "capabilities": { - "description": "List of capabilities of the collector", + "description": "List of capabilities of the collector", "type": "array", "items": { "type": "string" } }, - "effective_config": { - "description": "The effective config that the collector is running", - "format": "raw" - } + "effective_config": { + "description": "The effective config that the collector is running", + "format": "raw" + } }, "required": ["_id", "type", "active", "enrolled_at", "status"] }, From 601096b25518c1b812ba84c521722e1288a6de94 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 24 Feb 2026 11:22:25 +0100 Subject: [PATCH 05/10] use same redactKey as elastic-agent --- internal/pkg/api/handleOpAMP.go | 39 ++++++++++++++------------------- 1 file changed, 16 insertions(+), 23 deletions(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index 3b7acf0795..a460fe163d 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -417,7 +417,7 @@ func redactSensitive(v interface{}) { switch typed := v.(type) { case map[string]interface{}: for key, val := range typed { - if isSensitiveKey(key) { + if redactKey(key) { typed[key] = redacted continue } @@ -426,7 +426,7 @@ func redactSensitive(v interface{}) { case map[interface{}]interface{}: for rawKey, val := range typed { key, ok := rawKey.(string) - if ok && isSensitiveKey(key) { + if ok && redactKey(key) { typed[rawKey] = redacted continue } @@ -439,29 +439,22 @@ func redactSensitive(v interface{}) { } } -func isSensitiveKey(key string) bool { - key = strings.ToLower(strings.TrimSpace(key)) - if key == "" { +// TODO move to a common place, same as https://github.com/elastic/elastic-agent/blob/1c3fb4b4c8989cd2cfb692780debd7619820ae72/internal/pkg/diagnostics/diagnostics.go#L454-L468 +func redactKey(k string) bool { + // "routekey" shouldn't be redacted. + // Add any other exceptions here. + if k == "routekey" { return false } - for _, token := range []string{ - "password", - "passwd", - "pass", - "secret", - "token", - "apikey", - "api_key", - "access_key", - "private_key", - "credential", - "credentials", - } { - if key == token { - return true - } - } - return false + + k = strings.ToLower(k) + return strings.Contains(k, "auth") || + strings.Contains(k, "certificate") || + strings.Contains(k, "passphrase") || + strings.Contains(k, "password") || + strings.Contains(k, "token") || + strings.Contains(k, "key") || + strings.Contains(k, "secret") } func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) { From 8532b67d5802cc7bb19135ee0ae63e8d26b3cfba Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Tue, 24 Feb 2026 13:16:00 +0100 Subject: [PATCH 06/10] handle Array and KVlist in ProtobufKVToRawMessage --- internal/pkg/api/handleOpAMP.go | 53 ++++++++++++++++++---------- internal/pkg/api/handleOpAMP_test.go | 34 ++++++++++++++++++ 2 files changed, 68 insertions(+), 19 deletions(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index a460fe163d..5988ea7ce0 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -457,6 +457,39 @@ func redactKey(k string) bool { strings.Contains(k, "secret") } +// anyValueToInterface recursively converts protobufs.AnyValue to Go interface{} for JSON marshalling +func anyValueToInterface(zlog zerolog.Logger, av *protobufs.AnyValue) interface{} { + switch v := av.GetValue().(type) { + case *protobufs.AnyValue_StringValue: + return v.StringValue + case *protobufs.AnyValue_IntValue: + return v.IntValue + case *protobufs.AnyValue_DoubleValue: + return v.DoubleValue + case *protobufs.AnyValue_BoolValue: + return v.BoolValue + case *protobufs.AnyValue_BytesValue: + return v.BytesValue + case *protobufs.AnyValue_ArrayValue: + arr := make([]interface{}, 0, len(v.ArrayValue.Values)) + for _, av2 := range v.ArrayValue.Values { + arr = append(arr, anyValueToInterface(zlog, av2)) + } + return arr + case *protobufs.AnyValue_KvlistValue: + m := make(map[string]interface{}, len(v.KvlistValue.Values)) + for _, kv := range v.KvlistValue.Values { + if kv.Value != nil { + m[kv.Key] = anyValueToInterface(zlog, kv.Value) + } + } + return m + default: + zlog.Warn().Msg("unknown AnyValue type encountered in anyValueToInterface") + return nil + } +} + func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) { // 1. Build an intermediate map to represent the JSON object data := make(map[string]interface{}, len(kv)) @@ -464,25 +497,7 @@ func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json if item.Value == nil { continue } - switch v := item.Value.GetValue().(type) { - case *protobufs.AnyValue_StringValue: - if v.StringValue != "" { - data[item.Key] = v.StringValue - } - case *protobufs.AnyValue_IntValue: - data[item.Key] = v.IntValue - case *protobufs.AnyValue_DoubleValue: - data[item.Key] = v.DoubleValue - case *protobufs.AnyValue_BoolValue: - data[item.Key] = v.BoolValue - case *protobufs.AnyValue_BytesValue: - if len(v.BytesValue) > 0 { - data[item.Key] = v.BytesValue - } - continue - default: - return nil, fmt.Errorf("unsupported attribute value type for key %s", item.Key) - } + data[item.Key] = anyValueToInterface(zlog, item.Value) } // 2. Marshal the map into bytes diff --git a/internal/pkg/api/handleOpAMP_test.go b/internal/pkg/api/handleOpAMP_test.go index 3bc0a11717..f18995f128 100644 --- a/internal/pkg/api/handleOpAMP_test.go +++ b/internal/pkg/api/handleOpAMP_test.go @@ -86,6 +86,38 @@ func TestProtobufKVToRawMessage(t *testing.T) { Value: &protobufs.AnyValue_BytesValue{BytesValue: []byte("bin")}, }, }, + { + Key: "array_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_ArrayValue{ArrayValue: &protobufs.ArrayValue{ + Values: []*protobufs.AnyValue{ + {Value: &protobufs.AnyValue_StringValue{StringValue: "elem1"}}, + {Value: &protobufs.AnyValue_IntValue{IntValue: 2}}, + }, + }}, + }, + }, + { + Key: "kvlist_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_KvlistValue{KvlistValue: &protobufs.KeyValueList{ + Values: []*protobufs.KeyValue{ + { + Key: "nested_string_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "nested"}, + }, + }, + { + Key: "nested_int_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_IntValue{IntValue: 99}, + }, + }, + }, + }}, + }, + }, } zlog := zerolog.New(io.Discard) raw, err := ProtobufKVToRawMessage(zlog, input) @@ -99,6 +131,8 @@ func TestProtobufKVToRawMessage(t *testing.T) { require.Equal(t, 3.14, got["double_key"]) require.Equal(t, true, got["bool_key"]) require.Equal(t, base64.StdEncoding.EncodeToString([]byte("bin")), got["bytes_key"]) + require.Equal(t, []interface{}{"elem1", float64(2)}, got["array_key"]) + require.Equal(t, map[string]interface{}{"nested_string_key": "nested", "nested_int_key": float64(99)}, got["kvlist_key"]) } func TestEnrollAgentWithAgentToServerMessage(t *testing.T) { From 49a200cbf04de10abad30c902bb9902e314e8b15 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 27 Feb 2026 09:29:02 +0100 Subject: [PATCH 07/10] use ErrIndexNotFound --- internal/pkg/api/handleOpAMP.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index 5988ea7ce0..e50c37e4a4 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -30,6 +30,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" ) @@ -214,7 +215,7 @@ func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, zlog zerolog.Logger, ag } // if agents index doesn't exist yet, it will be created when the first agent document is indexed - if err != nil && strings.Contains(err.Error(), "index_not_found_exception") { + if errors.Is(err, es.ErrIndexNotFound) { zlog.Info().Msg("index not found when searching for enrolled agent") return nil, nil } From 3ccecabf3097d1ccced863096d752b447886644c Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 27 Feb 2026 09:42:13 +0100 Subject: [PATCH 08/10] uncomment e2e test asserts --- testing/e2e/stand_alone_test.go | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index dde0e7a1b5..6d405ffb68 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -16,6 +16,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "syscall" "testing" "time" @@ -697,14 +698,13 @@ func (suite *StandAloneSuite) TestOpAMP() { agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") - // TODO: uncomment once https://github.com/elastic/fleet-server/pull/6400 is merged - // versionOut, err := exec.Command(otelBinaryPath, "--version").Output() - // suite.Require().NoError(err) - // otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ") - // suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") - // suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol") - // suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") - // suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") - // suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") - // suite.Equal("online", agentDoc.Status, "expected status to be online") + versionOut, err := exec.Command(otelBinaryPath, "--version").Output() + suite.Require().NoError(err) + otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ") + suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") + suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol") + suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") + suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") + suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") + suite.Equal("online", agentDoc.Status, "expected status to be online") } From 333bbfd9cd0c961ad17e9d997c93f4d1823543c2 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 27 Feb 2026 10:26:34 +0100 Subject: [PATCH 09/10] print agent doc --- testing/e2e/stand_alone_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index 6d405ffb68..ed7f4e7950 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -696,6 +696,7 @@ func (suite *StandAloneSuite) TestOpAMP() { // .fleet-agents and asserting on its contents. suite.T().Logf("Waiting for agent %s to appear in .fleet-agents", instanceUID) agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) + suite.T().Logf("Fetched agent document: %+v", agentDoc) suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") versionOut, err := exec.Command(otelBinaryPath, "--version").Output() @@ -706,5 +707,5 @@ func (suite *StandAloneSuite) TestOpAMP() { suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") - suite.Equal("online", agentDoc.Status, "expected status to be online") + // suite.Equal("online", agentDoc.Status, "expected status to be online") } From 74bfa92eaa514f2c4a58373dfeab349116b5e2d9 Mon Sep 17 00:00:00 2001 From: Julia Bardi Date: Fri, 27 Feb 2026 11:05:08 +0100 Subject: [PATCH 10/10] remove status check --- testing/e2e/stand_alone_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index ed7f4e7950..ce1e3c4934 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -696,7 +696,6 @@ func (suite *StandAloneSuite) TestOpAMP() { // .fleet-agents and asserting on its contents. suite.T().Logf("Waiting for agent %s to appear in .fleet-agents", instanceUID) agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) - suite.T().Logf("Fetched agent document: %+v", agentDoc) suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") versionOut, err := exec.Command(otelBinaryPath, "--version").Output() @@ -707,5 +706,4 @@ func (suite *StandAloneSuite) TestOpAMP() { suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") - // suite.Equal("online", agentDoc.Status, "expected status to be online") }