Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 219 additions & 7 deletions internal/pkg/api/handleOpAMP.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,11 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"gopkg.in/yaml.v3"

"github.com/gofrs/uuid/v5"
"github.com/open-telemetry/opamp-go/protobufs"
oaServer "github.com/open-telemetry/opamp-go/server"
Expand All @@ -27,6 +30,7 @@ import (
"github.com/elastic/fleet-server/v7/internal/pkg/checkin"
"github.com/elastic/fleet-server/v7/internal/pkg/config"
"github.com/elastic/fleet-server/v7/internal/pkg/dl"
"github.com/elastic/fleet-server/v7/internal/pkg/es"
"github.com/elastic/fleet-server/v7/internal/pkg/model"
)

Expand All @@ -38,13 +42,17 @@ type OpAMPT struct {
cfg *config.Server
bulk bulk.Bulk
cache cache.Cache
bc *checkin.Bulk
bc checkinBulk

srv oaServer.OpAMPServer
handler oaServer.HTTPHandlerFunc
connCtx oaServer.ConnContext
}

// checkinBulk is the check-in dependency held by OpAMPT in its bc field.
// It is declared as a one-method interface so OpAMPT does not depend on a
// concrete type from the checkin package.
// NOTE(review): presumably satisfied by *checkin.Bulk in production and by a
// fake in tests — confirm against the constructor and test files.
type checkinBulk interface {
// CheckIn is called with an agent id and zero or more checkin options.
// NOTE(review): exact semantics are defined by the implementation — confirm.
CheckIn(id string, opts ...checkin.Option) error
}

func NewOpAMPT(
ctx context.Context,
cfg *config.Server,
Expand Down Expand Up @@ -200,13 +208,20 @@ func (oa *OpAMPT) handleMessage(zlog zerolog.Logger, apiKey *apikey.APIKey) func
}
}

func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, _ zerolog.Logger, agentID string) (*model.Agent, error) {
func (oa *OpAMPT) findEnrolledAgent(ctx context.Context, zlog zerolog.Logger, agentID string) (*model.Agent, error) {
agent, err := dl.FindAgent(ctx, oa.bulk, dl.QueryAgentByID, dl.FieldID, agentID)
if errors.Is(err, dl.ErrNotFound) {
return nil, nil
}

// if agents index doesn't exist yet, it will be created when the first agent document is indexed
if errors.Is(err, es.ErrIndexNotFound) {
zlog.Info().Msg("index not found when searching for enrolled agent")
return nil, nil
}

if err != nil {
zlog.Error().Err(err).Msg("failed to find agent by ID")
return nil, fmt.Errorf("failed to find agent: %w", err)
}

Expand All @@ -233,12 +248,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
// description is only sent if any of its fields change.
meta := localMetadata{}
meta.Elastic.Agent.ID = agentID
agentType := ""
var identifyingAttributes, nonIdentifyingAttributes json.RawMessage
if aToS.AgentDescription != nil {
// Extract agent version
for _, ia := range aToS.AgentDescription.IdentifyingAttributes {
switch attribute.Key(ia.Key) {
case semconv.ServiceVersionKey:
meta.Elastic.Agent.Version = ia.GetValue().GetStringValue()
case semconv.ServiceNameKey:
agentType = ia.GetValue().GetStringValue()
meta.Elastic.Agent.Name = agentType
}
}
zlog.Debug().Str("opamp.agent.version", meta.Elastic.Agent.Version).Msg("extracted agent version")
Expand All @@ -250,9 +270,22 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
hostname := nia.GetValue().GetStringValue()
meta.Host.Name = hostname
meta.Host.Hostname = hostname
case semconv.OSTypeKey:
osType := nia.GetValue().GetStringValue()
meta.Os.Platform = osType
}
}
zlog.Debug().Str("hostname", meta.Host.Hostname).Msg("extracted hostname")

identifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.IdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal identifying attributes: %w", err)
}

nonIdentifyingAttributes, err = ProtobufKVToRawMessage(zlog, aToS.AgentDescription.NonIdentifyingAttributes)
if err != nil {
return nil, fmt.Errorf("failed to marshal non-identifying attributes: %w", err)
}
}

// Update local metadata if something has changed
Expand All @@ -267,9 +300,17 @@ func (oa *OpAMPT) enrollAgent(zlog zerolog.Logger, agentID string, aToS *protobu
EnrolledAt: now.UTC().Format(time.RFC3339),
PolicyID: rec.PolicyID,
Agent: &model.AgentMetadata{
ID: agentID,
ID: agentID,
Version: meta.Elastic.Agent.Version,
Type: agentType,
},
LocalMetadata: data,
// Setting revision to 1, the collector won't receive policy changes and 0 would keep the collector in updating state
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

qq: will the policy on Fleet never have a revision > 1? If it can, then these agents could appear as outdated.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The UI hides the policy name and revision for collectors, as policies are not relevant for them (other than needing an enrollment token to authenticate with).

PolicyRevisionIdx: 1,
IdentifyingAttributes: identifyingAttributes,
NonIdentifyingAttributes: nonIdentifyingAttributes,
Type: "OPAMP",
Tags: []string{agentType},
}

data, err = json.Marshal(agent)
Expand All @@ -291,14 +332,42 @@ func (oa *OpAMPT) updateAgent(zlog zerolog.Logger, agent *model.Agent, aToS *pro

initialOpts := make([]checkin.Option, 0)

status := "online"

// Extract the health status from the health message if it exists.
if aToS.Health != nil {
initialOpts = append(initialOpts, checkin.WithStatus(aToS.Health.Status))
if !aToS.Health.Healthy {
status = "error"
} else if aToS.Health.Status == "StatusRecoverableError" {
status = "degraded"
}

// Extract the unhealthy reason from the health message if it exists.
// Extract the last_checkin_message from the health message if it exists.
if aToS.Health.LastError != "" {
unhealthyReason := []string{aToS.Health.LastError}
initialOpts = append(initialOpts, checkin.WithUnhealthyReason(&unhealthyReason))
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.LastError))
} else {
initialOpts = append(initialOpts, checkin.WithMessage(aToS.Health.Status))
}
healthBytes, err := json.Marshal(aToS.Health)
if err != nil {
return fmt.Errorf("failed to marshal health: %w", err)
}
initialOpts = append(initialOpts, checkin.WithHealth(healthBytes))
}

initialOpts = append(initialOpts, checkin.WithStatus(status))
initialOpts = append(initialOpts, checkin.WithSequenceNum(aToS.SequenceNum))

capabilities := decodeCapabilities(aToS.Capabilities)
initialOpts = append(initialOpts, checkin.WithCapabilities(capabilities))

if aToS.EffectiveConfig != nil {
effectiveConfigBytes, err := ParseEffectiveConfig(aToS.EffectiveConfig)
if err != nil {
return fmt.Errorf("failed to parse effective config: %w", err)
}
if effectiveConfigBytes != nil {
initialOpts = append(initialOpts, checkin.WithEffectiveConfig(effectiveConfigBytes))
}
}

Expand All @@ -310,10 +379,153 @@ type localMetadata struct {
Agent struct {
ID string `json:"id,omitempty"`
Version string `json:"version,omitempty"`
Name string `json:"name,omitempty"`
} `json:"agent,omitempty"`
} `json:"elastic,omitempty"`
Host struct {
Hostname string `json:"hostname,omitempty"`
Name string `json:"name,omitempty"`
} `json:"host,omitempty"`
Os struct {
Platform string `json:"platform,omitempty"`
} `json:"os,omitempty"`
}

// ParseEffectiveConfig converts the effective configuration reported by an
// OpAMP agent into redacted JSON.
//
// Only the config file stored under the empty-string key of the config map is
// considered. Its body is decoded as YAML, passed through redactSensitive so
// that sensitive values are replaced, and then re-encoded as JSON.
//
// It returns (nil, nil) when effectiveConfig is nil, when it carries no config
// map, when there is no entry under the empty key, or when that entry has an
// empty body. An error is returned when the body is not valid YAML or when the
// decoded value cannot be marshalled to JSON.
func ParseEffectiveConfig(effectiveConfig *protobufs.EffectiveConfig) ([]byte, error) {
	// The generated protobuf getters are nil-safe, so this chain tolerates a nil
	// message at any level (including a nil effectiveConfig) instead of
	// dereferencing it and panicking.
	body := effectiveConfig.GetConfigMap().GetConfigMap()[""].GetBody()
	if len(body) == 0 {
		return nil, nil
	}

	obj := make(map[string]interface{})
	if err := yaml.Unmarshal(body, &obj); err != nil {
		return nil, fmt.Errorf("unmarshal effective config failure: %w", err)
	}

	// Redact in place before the config leaves this function.
	redactSensitive(obj)

	effectiveConfigBytes, err := json.Marshal(obj)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal effective config: %w", err)
	}
	return effectiveConfigBytes, nil
}

// redactSensitive walks a decoded configuration tree and, in place, replaces
// the value of every map entry whose key redactKey reports as sensitive with
// the string "[REDACTED]".
//
// Maps keyed by string, maps keyed by interface{} and slices are traversed
// recursively; any other value is left untouched. When a key is redacted its
// whole value is replaced and not descended into. Entries of an
// interface{}-keyed map whose key is not a string are never redacted, but
// their values are still traversed.
func redactSensitive(v interface{}) {
	const redacted = "[REDACTED]"

	switch node := v.(type) {
	case []interface{}:
		for _, elem := range node {
			redactSensitive(elem)
		}

	case map[string]interface{}:
		for name, child := range node {
			if !redactKey(name) {
				redactSensitive(child)
				continue
			}
			node[name] = redacted
		}

	case map[interface{}]interface{}:
		for rawName, child := range node {
			if name, isString := rawName.(string); isString && redactKey(name) {
				node[rawName] = redacted
			} else {
				redactSensitive(child)
			}
		}
	}
}

// TODO move to a common place, same as https://github.com/elastic/elastic-agent/blob/1c3fb4b4c8989cd2cfb692780debd7619820ae72/internal/pkg/diagnostics/diagnostics.go#L454-L468
//
// redactKey reports whether the value stored under configuration key k should
// be redacted. A key is considered sensitive when its lower-cased form
// contains any of the markers listed below. The "routekey" exception is
// compared against k exactly as given, before lower-casing.
func redactKey(k string) bool {
	// "routekey" shouldn't be redacted.
	// Add any other exceptions here.
	switch k {
	case "routekey":
		return false
	}

	lowered := strings.ToLower(k)
	for _, marker := range [...]string{
		"auth",
		"certificate",
		"passphrase",
		"password",
		"token",
		"key",
		"secret",
	} {
		if strings.Contains(lowered, marker) {
			return true
		}
	}
	return false
}

// anyValueToInterface recursively converts a protobufs.AnyValue into a plain
// Go value suitable for JSON marshalling.
//
// Scalar variants (string, int, double, bool, bytes) are returned as their
// underlying Go value. Array values become a []interface{} and key-value lists
// become a map[string]interface{}, both converted recursively; key-value
// entries with a nil Value are omitted from the map. Any other variant
// (including an unset value) is logged as a warning on zlog and yields nil.
func anyValueToInterface(zlog zerolog.Logger, av *protobufs.AnyValue) interface{} {
	switch variant := av.GetValue().(type) {
	case *protobufs.AnyValue_StringValue:
		return variant.StringValue
	case *protobufs.AnyValue_IntValue:
		return variant.IntValue
	case *protobufs.AnyValue_DoubleValue:
		return variant.DoubleValue
	case *protobufs.AnyValue_BoolValue:
		return variant.BoolValue
	case *protobufs.AnyValue_BytesValue:
		return variant.BytesValue

	case *protobufs.AnyValue_ArrayValue:
		elems := variant.ArrayValue.Values
		converted := make([]interface{}, len(elems))
		for i, elem := range elems {
			converted[i] = anyValueToInterface(zlog, elem)
		}
		return converted

	case *protobufs.AnyValue_KvlistValue:
		entries := variant.KvlistValue.Values
		converted := make(map[string]interface{}, len(entries))
		for _, entry := range entries {
			if entry.Value == nil {
				continue
			}
			converted[entry.Key] = anyValueToInterface(zlog, entry.Value)
		}
		return converted
	}

	zlog.Warn().Msg("unknown AnyValue type encountered in anyValueToInterface")
	return nil
}

// ProtobufKVToRawMessage encodes a list of protobuf key-value pairs as a JSON
// object.
//
// Each pair's value is converted with anyValueToInterface and stored under the
// pair's key; pairs with a nil Value are skipped, and when a key repeats the
// last occurrence wins. If JSON marshalling fails the error is logged on zlog
// and returned unchanged.
func ProtobufKVToRawMessage(zlog zerolog.Logger, kv []*protobufs.KeyValue) (json.RawMessage, error) {
	// Collect the pairs into a map that mirrors the target JSON object.
	fields := make(map[string]interface{}, len(kv))
	for _, pair := range kv {
		if pair.Value != nil {
			fields[pair.Key] = anyValueToInterface(zlog, pair.Value)
		}
	}

	// Serialise the map; the resulting bytes are the raw message.
	encoded, err := json.Marshal(fields)
	if err == nil {
		return json.RawMessage(encoded), nil
	}

	zlog.Error().Err(err).Msg("failed to marshal key-value pairs")
	return nil, err
}

// decodeCapabilities converts a capability bitmask to human-readable strings.
//
// Only the capabilities listed in the table below are recognised; any other
// bits set in caps are ignored. Names are returned in the order of the table,
// so the same bitmask always produces the same slice. The result is nil when
// no recognised bit is set.
func decodeCapabilities(caps uint64) []string {
	// An ordered table is used instead of a map: Go randomises map iteration
	// order, which would make the order of the returned names vary per call.
	known := [...]struct {
		mask uint64
		name string
	}{
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsStatus), "ReportsStatus"},
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRemoteConfig), "AcceptsRemoteConfig"},
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsEffectiveConfig), "ReportsEffectiveConfig"},
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsHealth), "ReportsHealth"},
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_ReportsAvailableComponents), "ReportsAvailableComponents"},
		{uint64(protobufs.AgentCapabilities_AgentCapabilities_AcceptsRestartCommand), "AcceptsRestartCommand"},
	}

	var result []string
	for _, c := range known {
		if caps&c.mask != 0 {
			result = append(result, c.name)
		}
	}
	return result
}
Loading