diff --git a/internal/pkg/api/handleOpAMP.go b/internal/pkg/api/handleOpAMP.go index fabbafba42..e50c37e4a4 100644 --- a/internal/pkg/api/handleOpAMP.go +++ b/internal/pkg/api/handleOpAMP.go @@ -10,8 +10,11 @@ import ( "errors" "fmt" "net/http" + "strings" "time" + "gopkg.in/yaml.v3" + "github.com/gofrs/uuid/v5" "github.com/open-telemetry/opamp-go/protobufs" oaServer "github.com/open-telemetry/opamp-go/server" @@ -27,6 +30,7 @@ import ( "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" "github.com/elastic/fleet-server/v7/internal/pkg/model" ) @@ -38,13 +42,17 @@ type OpAMPT struct { cfg *config.Server bulk bulk.Bulk cache cache.Cache - bc *checkin.Bulk + bc checkinBulk srv oaServer.OpAMPServer handler oaServer.HTTPHandlerFunc connCtx oaServer.ConnContext } +type checkinBulk interface { + CheckIn(id string, opts ...checkin.Option) error +} + func NewOpAMPT( ctx context.Context, cfg *config.Server, @@ -200,13 +208,20 @@ func (oa *OpAMPT) handleMessage(zlog zerolog.Logger, apiKey *apikey.APIKey) func } } -func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, _ zerolog.Logger, agentID string) (*model.Agent, error) { +func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, zlog zerolog.Logger, agentID string) (*model.Agent, error) { agent, err := dl.FindAgent(ctx, oa.bulk, dl.QueryAgentByID, dl.FieldID, agentID) if errors.Is(err, dl.ErrNotFound) { return nil, nil } + // if agents index doesn't exist yet, it will be created when the first agent document is indexed + if errors.Is(err, es.ErrIndexNotFound) { + zlog.Info().Msg("index not found when searching for enrolled agent") + return nil, nil + } + if err != nil { + zlog.Error().Err(err).Msg("failed to find agent by ID") return nil, fmt.Errorf("failed to find agent: %w", err) } @@ -233,12 +248,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu // description is only sent if any of its fields change. meta := localMetadata{} meta.Elastic.Agent.ID = agentID + agentType := "" + var identifyingAttributes, nonIdentifyingAttributes json.RawMessage if aToS.AgentDescription != nil { // Extract agent version for _, ia := range aToS.AgentDescription.IdentifyingAttributes { switch attribute.Key(ia.Key) { case semconv.ServiceVersionKey: meta.Elastic.Agent.Version = ia.GetValue().GetStringValue() + case semconv.ServiceNameKey: + agentType = ia.GetValue().GetStringValue() + meta.Elastic.Agent.Name = agentType } } zlog.Debug().Str("opamp.agent.version", meta.Elastic.Agent.Version).Msg("extracted agent version") @@ -250,9 +270,22 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu hostname := nia.GetValue().GetStringValue() meta.Host.Name = hostname meta.Host.Hostname = hostname + case semconv.OSTypeKey: + osType := nia.GetValue().GetStringValue() + meta.Os.Platform = osType } } zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname") + + identifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.IdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err) + } + + nonIdentifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.NonIdentifyingAttributes) + if err != nil { + return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err) + } } // Update local metadata if something has changed @@ -267,9 +300,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu EnrolledAt: now.UTC().Format(time.RFC3339), PolicyID: rec.PolicyID, Agent: &model.AgentMetadata{ - ID: agentID, + ID: agentID, + Version: meta.Elastic.Agent.Version, + Type: agentType, }, LocalMetadata: data, + // Setting revision to 1, the collector won't receive policy changes and 0 would keep the collector in updating state + PolicyRevisionIdx: 1, + IdentifyingAttributes: identifyingAttributes, + NonIdentifyingAttributes: nonIdentifyingAttributes, + Type: "OPAMP", + Tags: []string{agentType}, } data, err = json.Marshal(agent) @@ -291,14 +332,42 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro initialOpts := make([]checkin.Option, 0) + status := "online" + // Extract the health status from the health message if it exists. if aToS.Health != nil { - initialOpts = append(initialOpts, checkin.WithStatus(aToS.Health.Status)) + if !aToS.Health.Healthy { + status = "error" + } else if aToS.Health.Status == "StatusRecoverableError" { + status = "degraded" + } - // Extract the unhealthy reason from the health message if it exists. + // Extract the last_checkin_message from the health message if it exists. if aToS.Health.LastError != "" { - unhealthyReason := []string{aToS.Health.LastError} - initialOpts = append(initialOpts, checkin.WithUnhealthyReason(&unhealthyReason)) + initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.LastError)) + } else { + initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.Status)) + } + healthBytes, err := json.Marshal(aToS.Health) + if err != nil { + return fmt.Errorf("failed to marshal health: %w", err) + } + initialOpts = append(initialOpts, checkin.WithHealth(healthBytes)) + } + + initialOpts = append(initialOpts, checkin.WithStatus(status)) + initialOpts = append(initialOpts, checkin.WithSequenceNum(aToS.SequenceNum)) + + capabilities := decodeCapabilities(aToS.Capabilities) + initialOpts = append(initialOpts, checkin.WithCapabilities(capabilities)) + + if aToS.EffectiveConfig != nil { + effectiveConfigBytes, err := ParseEffectiveConfig(aToS.EffectiveConfig) + if err != nil { + return fmt.Errorf("failed to parse effective config: %w", err) + } + if effectiveConfigBytes != nil { + initialOpts = append(initialOpts, checkin.WithEffectiveConfig(effectiveConfigBytes)) } } @@ -310,10 +379,153 @@ type localMetadata struct { Agent struct { ID string `json:"id,omitempty"` Version string `json:"version,omitempty"` + Name string `json:"name,omitempty"` } `json:"agent,omitempty"` } `json:"elastic,omitempty"` Host struct { Hostname string `json:"hostname,omitempty"` Name string `json:"name,omitempty"` } `json:"host,omitempty"` + Os struct { + Platform string `json:"platform,omitempty"` + } `json:"os,omitempty"` +} + +func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, error) { + if effectiveConfig.ConfigMap != nil && effectiveConfig.ConfigMap.ConfigMap[""] != nil { + configMap := effectiveConfig.ConfigMap.ConfigMap[""] + + if len(configMap.Body) != 0 { + bodyBytes := configMap.Body + + obj := make(map[string]interface{}) + if err := yaml.Unmarshal(bodyBytes, &obj); err != nil { + return nil, fmt.Errorf("unmarshal effective config failure: %w", err) + } + redactSensitive(obj) + effectiveConfigBytes, err := json.Marshal(obj) + if err != nil { + return nil, fmt.Errorf("failed to marshal effective config: %w", err) + } + return effectiveConfigBytes, nil + } + } + return nil, nil +} + +func redactSensitive(v interface{}) { + const redacted = "[REDACTED]" + switch typed := v.(type) { + case map[string]interface{}: + for key, val := range typed { + if redactKey(key) { + typed[key] = redacted + continue + } + redactSensitive(val) + } + case map[interface{}]interface{}: + for rawKey, val := range typed { + key, ok := rawKey.(string) + if ok && redactKey(key) { + typed[rawKey] = redacted + continue + } + redactSensitive(val) + } + case []interface{}: + for i := range typed { + redactSensitive(typed[i]) + } + } +} + +// TODO move to a common place, same as https://github.com/elastic/elastic-agent/blob/1c3fb4b4c8989cd2cfb692780debd7619820ae72/internal/pkg/diagnostics/diagnostics.go#L454-L468 +func redactKey(k string) bool { + // "routekey" shouldn't be redacted. + // Add any other exceptions here. + if k == "routekey" { + return false + } + + k = strings.ToLower(k) + return strings.Contains(k, "auth") || + strings.Contains(k, "certificate") || + strings.Contains(k, "passphrase") || + strings.Contains(k, "password") || + strings.Contains(k, "token") || + strings.Contains(k, "key") || + strings.Contains(k, "secret") +} + +// anyValueToInterface recursively converts protobufs.AnyValue to Go interface{} for JSON marshalling +func anyValueToInterface(zlog zerolog.Logger, av *protobufs.AnyValue) interface{} { + switch v := av.GetValue().(type) { + case *protobufs.AnyValue_StringValue: + return v.StringValue + case *protobufs.AnyValue_IntValue: + return v.IntValue + case *protobufs.AnyValue_DoubleValue: + return v.DoubleValue + case *protobufs.AnyValue_BoolValue: + return v.BoolValue + case *protobufs.AnyValue_BytesValue: + return v.BytesValue + case *protobufs.AnyValue_ArrayValue: + arr := make([]interface{}, 0, len(v.ArrayValue.Values)) + for _, av2 := range v.ArrayValue.Values { + arr = append(arr, anyValueToInterface(zlog, av2)) + } + return arr + case *protobufs.AnyValue_KvlistValue: + m := make(map[string]interface{}, len(v.KvlistValue.Values)) + for _, kv := range v.KvlistValue.Values { + if kv.Value != nil { + m[kv.Key] = anyValueToInterface(zlog, kv.Value) + } + } + return m + default: + zlog.Warn().Msg("unknown AnyValue type encountered in anyValueToInterface") + return nil + } +} + +func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) { + // 1. Build an intermediate map to represent the JSON object + data := make(map[string]interface{}, len(kv)) + for _, item := range kv { + if item.Value == nil { + continue + } + data[item.Key] = anyValueToInterface(zlog, item.Value) + } + + // 2. Marshal the map into bytes + b, err := json.Marshal(data) + if err != nil { + zlog.Error().Err(err).Msg("failed to marshal key-value pairs") + return nil, err + } + + return json.RawMessage(b), nil +} + +// decodeCapabilities converts capability bitmask to human-readable strings +func decodeCapabilities(caps uint64) []string { + var result []string + capMap := map[uint64]string{ + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus): "ReportsStatus", + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig): "AcceptsRemoteConfig", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig): "ReportsEffectiveConfig", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth): "ReportsHealth", + uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents): "ReportsAvailableComponents", + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand): "AcceptsRestartCommand", + } + for mask, name := range capMap { + if caps&mask != 0 { + result = append(result, name) + } + } + return result } diff --git a/internal/pkg/api/handleOpAMP_test.go b/internal/pkg/api/handleOpAMP_test.go index ebadac236f..f18995f128 100644 --- a/internal/pkg/api/handleOpAMP_test.go +++ b/internal/pkg/api/handleOpAMP_test.go @@ -5,10 +5,25 @@ package api import ( + "encoding/base64" + "encoding/json" + "io" + "reflect" "testing" + "unsafe" + "github.com/elastic/fleet-server/v7/internal/pkg/apikey" + "github.com/elastic/fleet-server/v7/internal/pkg/checkin" "github.com/elastic/fleet-server/v7/internal/pkg/config" + "github.com/elastic/fleet-server/v7/internal/pkg/dl" + "github.com/elastic/fleet-server/v7/internal/pkg/es" + "github.com/elastic/fleet-server/v7/internal/pkg/model" + ftesting "github.com/elastic/fleet-server/v7/internal/pkg/testing" + "github.com/open-telemetry/opamp-go/protobufs" + "github.com/rs/zerolog" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + semconv "go.opentelemetry.io/otel/semconv/v1.37.0" ) func TestFeatureFlag(t *testing.T) { @@ -38,3 +53,253 @@ func TestFeatureFlag(t *testing.T) { }) } } + +func TestProtobufKVToRawMessage(t *testing.T) { + input := []*protobufs.KeyValue{ + { + Key: "string_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "hello"}, + }, + }, + { + Key: "int_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_IntValue{IntValue: 42}, + }, + }, + { + Key: "double_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_DoubleValue{DoubleValue: 3.14}, + }, + }, + { + Key: "bool_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_BoolValue{BoolValue: true}, + }, + }, + { + Key: "bytes_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_BytesValue{BytesValue: []byte("bin")}, + }, + }, + { + Key: "array_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_ArrayValue{ArrayValue: &protobufs.ArrayValue{ + Values: []*protobufs.AnyValue{ + {Value: &protobufs.AnyValue_StringValue{StringValue: "elem1"}}, + {Value: &protobufs.AnyValue_IntValue{IntValue: 2}}, + }, + }}, + }, + }, + { + Key: "kvlist_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_KvlistValue{KvlistValue: &protobufs.KeyValueList{ + Values: []*protobufs.KeyValue{ + { + Key: "nested_string_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "nested"}, + }, + }, + { + Key: "nested_int_key", + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_IntValue{IntValue: 99}, + }, + }, + }, + }}, + }, + }, + } + zlog := zerolog.New(io.Discard) + raw, err := ProtobufKVToRawMessage(zlog, input) + require.NoError(t, err) + + var got map[string]interface{} + require.NoError(t, json.Unmarshal(raw, &got)) + + require.Equal(t, "hello", got["string_key"]) + require.Equal(t, float64(42), got["int_key"]) + require.Equal(t, 3.14, got["double_key"]) + require.Equal(t, true, got["bool_key"]) + require.Equal(t, base64.StdEncoding.EncodeToString([]byte("bin")), got["bytes_key"]) + require.Equal(t, []interface{}{"elem1", float64(2)}, got["array_key"]) + require.Equal(t, map[string]interface{}{"nested_string_key": "nested", "nested_int_key": float64(99)}, got["kvlist_key"]) +} + +func TestEnrollAgentWithAgentToServerMessage(t *testing.T) { + bulker := ftesting.NewMockBulk() + + enrollKey := model.EnrollmentAPIKey{ + APIKeyID: "enroll-key-id", + PolicyID: "policy-123", + Active: true, + } + enrollKeyBytes, err := json.Marshal(enrollKey) + require.NoError(t, err) + + bulker.On("Search", mock.Anything, dl.FleetEnrollmentAPIKeys, mock.Anything, mock.Anything). + Return(&es.ResultT{ + HitsT: es.HitsT{ + Hits: []es.HitT{{Source: enrollKeyBytes}}, + }, + }, nil) + + var createdAgent model.Agent + bulker.On("Create", mock.Anything, dl.FleetAgents, "agent-123", mock.Anything, mock.Anything). + Run(func(args mock.Arguments) { + body, ok := args.Get(3).([]byte) + require.True(t, ok) + require.NoError(t, json.Unmarshal(body, &createdAgent)) + }). + Return("doc-id", nil) + + oa := &OpAMPT{bulk: bulker} + msg := &protobufs.AgentToServer{ + AgentDescription: &protobufs.AgentDescription{ + IdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: string(semconv.ServiceVersionKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "1.2.3"}, + }, + }, + { + Key: string(semconv.ServiceNameKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "otel-collector"}, + }, + }, + }, + NonIdentifyingAttributes: []*protobufs.KeyValue{ + { + Key: string(semconv.HostNameKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "host-1"}, + }, + }, + { + Key: string(semconv.OSTypeKey), + Value: &protobufs.AnyValue{ + Value: &protobufs.AnyValue_StringValue{StringValue: "linux"}, + }, + }, + }, + }, + } + + apiKey := &apikey.APIKey{ID: "enroll-key-id"} + zlog := zerolog.New(io.Discard) + + agent, err := oa.enrollAgent(zlog, "agent-123", msg, apiKey) + require.NoError(t, err) + require.NotNil(t, agent) + require.Equal(t, "policy-123", agent.PolicyID) + require.Equal(t, "1.2.3", agent.Agent.Version) + require.Equal(t, "otel-collector", agent.Agent.Type) + require.Equal(t, []string{"otel-collector"}, agent.Tags) + + var meta localMetadata + require.NoError(t, json.Unmarshal(agent.LocalMetadata, &meta)) + require.Equal(t, "host-1", meta.Host.Hostname) + require.Equal(t, "linux", meta.Os.Platform) + + require.Equal(t, agent.Id, createdAgent.Agent.ID) + require.Equal(t, agent.PolicyID, createdAgent.PolicyID) + bulker.AssertExpectations(t) +} + +func TestUpdateAgentWithAgentToServerMessage(t *testing.T) { + checker := &mockCheckin{} + oa := &OpAMPT{bc: checker} + + agent := &model.Agent{ESDocument: model.ESDocument{Id: "agent-123"}} + + msg := &protobufs.AgentToServer{ + SequenceNum: 7, + Capabilities: uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth) | + uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig), + Health: &protobufs.ComponentHealth{ + Healthy: true, + Status: "StatusRecoverableError", + LastError: "boom", + }, + EffectiveConfig: &protobufs.EffectiveConfig{ + ConfigMap: &protobufs.AgentConfigMap{ + ConfigMap: map[string]*protobufs.AgentConfigFile{ + "": { + Body: []byte("password: 12345\nnum: 2\n"), + ContentType: "text/yaml", + }, + }, + }, + }, + } + + zlog := zerolog.New(io.Discard) + require.NoError(t, oa.updateAgent(zlog, agent, msg)) + require.Equal(t, "agent-123", checker.id) + + pending := pendingFromOptions(t, checker.opts) + require.Equal(t, "degraded", getUnexportedField(pending, "status").String()) + require.Equal(t, "boom", getUnexportedField(pending, "message").String()) + require.Equal(t, uint64(7), getUnexportedField(pending, "sequenceNum").Uint()) + + extra := getUnexportedField(pending, "extra") + require.False(t, extra.IsNil()) + extraVal := extra.Elem() + + capabilitiesVal := getUnexportedField(extraVal, "capabilities") + capabilities, ok := capabilitiesVal.Interface().([]string) + require.True(t, ok) + require.ElementsMatch(t, []string{"ReportsHealth", "AcceptsRemoteConfig"}, capabilities) + + healthBytes := getUnexportedField(extraVal, "health").Bytes() + var health protobufs.ComponentHealth + require.NoError(t, json.Unmarshal(healthBytes, &health)) + require.Equal(t, "boom", health.LastError) + require.Equal(t, "StatusRecoverableError", health.Status) + + configBytes := getUnexportedField(extraVal, "effectiveConfig").Bytes() + var config map[string]interface{} + require.NoError(t, json.Unmarshal(configBytes, &config)) + require.Equal(t, "[REDACTED]", config["password"]) + require.Equal(t, float64(2), config["num"]) +} + +type mockCheckin struct { + id string + opts []checkin.Option +} + +func (m *mockCheckin) CheckIn(id string, opts ...checkin.Option) error { + m.id = id + m.opts = opts + return nil +} + +func pendingFromOptions(t *testing.T, opts []checkin.Option) reflect.Value { + t.Helper() + require.NotEmpty(t, opts) + + sampleOpt := checkin.WithStatus("") + argType := reflect.TypeOf(sampleOpt).In(0) + pendingPtr := reflect.New(argType.Elem()) + for _, opt := range opts { + reflect.ValueOf(opt).Call([]reflect.Value{pendingPtr}) + } + return pendingPtr.Elem() +} + +func getUnexportedField(v reflect.Value, name string) reflect.Value { + field := v.FieldByName(name) + return reflect.NewAt(field.Type(), unsafe.Pointer(field.UnsafeAddr())).Elem() +} diff --git a/internal/pkg/checkin/bulk.go b/internal/pkg/checkin/bulk.go index e02966156c..d5a4a67f58 100644 --- a/internal/pkg/checkin/bulk.go +++ b/internal/pkg/checkin/bulk.go @@ -125,6 +125,39 @@ func WithPolicyRevisionIDX(idx int64) Option { } } +func WithSequenceNum(seq uint64) Option { + return func(pending *pendingT) { + pending.sequenceNum = seq + } +} + +func WithHealth(health []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.health = health + } +} + +func WithCapabilities(capabilities []string) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.capabilities = capabilities + } +} + +func WithEffectiveConfig(effectiveConfig []byte) Option { + return func(pending *pendingT) { + if pending.extra == nil { + pending.extra = &extraT{} + } + pending.extra.effectiveConfig = effectiveConfig + } +} + func WithAvailableRollbacks(availableRollbacks []byte) Option { return func(pending *pendingT) { if pending.extra == nil { @@ -141,6 +174,9 @@ type extraT struct { components []byte deleteAudit bool availableRollbacks []byte + health []byte + capabilities []string + effectiveConfig []byte } // Minimize the size of this structure. @@ -154,6 +190,7 @@ type pendingT struct { revisionIDX int64 extra *extraT unhealthyReason *[]string + sequenceNum uint64 } // Bulk will batch pending checkins and update elasticsearch at a set interval. @@ -337,6 +374,7 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { dl.FieldLastCheckinStatus: pending.status, // Set the pending status dl.FieldLastCheckinMessage: pending.message, // Set the status message dl.FieldUnhealthyReason: pending.unhealthyReason, + dl.FieldSequenceNum: pending.sequenceNum, } if pending.agentPolicyID != "" { fields[dl.FieldAgentPolicyID] = pending.agentPolicyID @@ -364,6 +402,18 @@ func toUpdateBody(now string, pending pendingT) ([]byte, error) { fields[dl.FieldComponents] = json.RawMessage(pending.extra.components) } + if pending.extra.health != nil { + fields[dl.FieldHealth] = json.RawMessage(pending.extra.health) + } + + if pending.extra.capabilities != nil { + fields[dl.FieldCapabilities] = pending.extra.capabilities + } + + if pending.extra.effectiveConfig != nil { + fields[dl.FieldEffectiveConfig] = json.RawMessage(pending.extra.effectiveConfig) + } + // If seqNo changed, set the field appropriately if pending.extra.seqNo.IsSet() { fields[dl.FieldActionSeqNo] = pending.extra.seqNo diff --git a/internal/pkg/dl/constants.go b/internal/pkg/dl/constants.go index d81b0d4b65..26c098d808 100644 --- a/internal/pkg/dl/constants.go +++ b/internal/pkg/dl/constants.go @@ -45,6 +45,10 @@ const ( FieldUnenrolledReason = "unenrolled_reason" FiledType = "type" FieldUnhealthyReason = "unhealthy_reason" + FieldSequenceNum = "sequence_num" + FieldHealth = "health" + FieldCapabilities = "capabilities" + FieldEffectiveConfig = "effective_config" FieldActive = "active" FieldNamespaces = "namespaces" diff --git a/internal/pkg/model/schema.go b/internal/pkg/model/schema.go index 9ce75d006b..b0961e469f 100644 --- a/internal/pkg/model/schema.go +++ b/internal/pkg/model/schema.go @@ -141,6 +141,9 @@ type Agent struct { // Agent timestamp for audit unenroll/uninstall action AuditUnenrolledTime string `json:"audit_unenrolled_time,omitempty"` + // List of capabilities of the collector + Capabilities []string `json:"capabilities,omitempty"` + // Elastic Agent components detailed status information Components json.RawMessage `json:"components,omitempty"` @@ -153,12 +156,21 @@ type Agent struct { // Deprecated. Use Outputs instead. ID of the API key the Elastic Agent uses to authenticate with elasticsearch DefaultAPIKeyID string `json:"default_api_key_id,omitempty"` + // The effective config that the collector is running + EffectiveConfig json.RawMessage `json:"effective_config,omitempty"` + // Date/time the Elastic Agent enrolled EnrolledAt string `json:"enrolled_at"` // Enrollment ID EnrollmentID string `json:"enrollment_id,omitempty"` + // Health information of the collector + Health json.RawMessage `json:"health,omitempty"` + + // Identifying attributes of the collector + IdentifyingAttributes json.RawMessage `json:"identifying_attributes,omitempty"` + // Date/time the Elastic Agent checked in last time LastCheckin string `json:"last_checkin,omitempty"` @@ -177,6 +189,9 @@ type Agent struct { // Namespaces Namespaces []string `json:"namespaces,omitempty"` + // Non-identifying attributes of the collector + NonIdentifyingAttributes json.RawMessage `json:"non_identifying_attributes,omitempty"` + // Outputs is the policy output data, mapping the output name to its data Outputs map[string]*PolicyOutput `json:"outputs,omitempty"` @@ -198,6 +213,9 @@ type Agent struct { // hash of token provided during enrollment that allows replacement by another enrollment with same ID ReplaceToken string `json:"replace_token,omitempty"` + // The sequence number of the last collector message + SequenceNum int64 `json:"sequence_num,omitempty"` + // Shared ID SharedID string `json:"shared_id,omitempty"` @@ -250,6 +268,9 @@ type AgentMetadata struct { // The unique identifier for the Elastic Agent ID string `json:"id"` + // The type of the Elastic Agent + Type string `json:"type,omitempty"` + // The version of the Elastic Agent Version string `json:"version"` } diff --git a/model/schema.json b/model/schema.json index 4e63c14d73..49cbdcbd87 100644 --- a/model/schema.json +++ b/model/schema.json @@ -188,6 +188,10 @@ "version": { "description": "The version of the Elastic Agent", "type": "string" + }, + "type": { + "description": "The type of the Elastic Agent", + "type": "string" } }, "required": ["id", "version"] @@ -744,6 +748,33 @@ } } } + }, + "identifying_attributes": { + "description": "Identifying attributes of the collector", + "format": "raw" + }, + "non_identifying_attributes": { + "description": "Non-identifying attributes of the collector", + "format": "raw" + }, + "sequence_num": { + "description": "The sequence number of the last collector message", + "type": "integer" + }, + "health": { + "description": "Health information of the collector", + "format": "raw" + }, + "capabilities": { + "description": "List of capabilities of the collector", + "type": "array", + "items": { + "type": "string" + } + }, + "effective_config": { + "description": "The effective config that the collector is running", + "format": "raw" } }, "required": ["_id", "type", "active", "enrolled_at", "status"] diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index dde0e7a1b5..ce1e3c4934 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -16,6 +16,7 @@ import ( "os/exec" "path/filepath" "runtime" + "strings" "syscall" "testing" "time" @@ -697,14 +698,12 @@ func (suite *StandAloneSuite) TestOpAMP() { agentDoc := suite.WaitForAgentDoc(ctx, instanceUID) suite.Equal(instanceUID, agentDoc.Agent.ID, "expected agent.id to match instanceUID") - // TODO: uncomment once https://github.com/elastic/fleet-server/pull/6400 is merged - // versionOut, err := exec.Command(otelBinaryPath, "--version").Output() - // suite.Require().NoError(err) - // otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ") - // suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") - // suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol") - // suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") - // suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") - // suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") - // suite.Equal("online", agentDoc.Status, "expected status to be online") + versionOut, err := exec.Command(otelBinaryPath, "--version").Output() + suite.Require().NoError(err) + otelVersion := strings.TrimPrefix(strings.TrimSpace(string(versionOut)), "otelcontribcol version ") + suite.Equal("OPAMP", agentDoc.Type, "expected type to be OPAMP") + suite.Equal("otelcontribcol", agentDoc.Agent.Type, "expected agent.type to be otelcontribcol") + suite.Equal(otelVersion, agentDoc.Agent.Version, "expected agent.version to match otelcol-contrib binary version") + suite.Equal(1, agentDoc.Revision, "expected policy_revision_idx to be 1") + suite.Contains(agentDoc.Tags, "otelcontribcol", "expected tags to contain otelcontribcol") }