Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 44 additions & 164 deletions comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,37 +11,51 @@ package ecsfargate
import (
"context"
"strings"
"time"

"go.uber.org/fx"

"github.com/DataDog/datadog-agent/comp/core/config"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
"github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util"
pkgConfig "github.com/DataDog/datadog-agent/pkg/config"
"github.com/DataDog/datadog-agent/pkg/errors"
ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata"
v2 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v2"
"github.com/DataDog/datadog-agent/pkg/util/log"
"go.uber.org/fx"
"github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4"
)

const (
collectorID = "ecs_fargate"
componentName = "workloadmeta-ecs_fargate"
)

type dependencies struct {
fx.In

Config config.Component
}

type collector struct {
id string
store workloadmeta.Component
catalog workloadmeta.AgentType
metaV2 v2.Client
seen map[workloadmeta.EntityID]struct{}
id string
store workloadmeta.Component
catalog workloadmeta.AgentType
metaV2 v2.Client
metaV4 v3or4.Client
seen map[workloadmeta.EntityID]struct{}
config config.Component
taskCollectionEnabled bool
taskCollectionParser util.TaskParser
}

// NewCollector returns a new ecsfargate collector provider and an error
func NewCollector() (workloadmeta.CollectorProvider, error) {
func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) {
return workloadmeta.CollectorProvider{
Collector: &collector{
id: collectorID,
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
seen: make(map[workloadmeta.EntityID]struct{}),
id: collectorID,
catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent,
seen: make(map[workloadmeta.EntityID]struct{}),
config: deps.Config,
taskCollectionEnabled: util.IsTaskCollectionEnabled(deps.Config),
},
}, nil
}
Expand All @@ -52,28 +66,40 @@ func GetFxOptions() fx.Option {
}

func (c *collector) Start(_ context.Context, store workloadmeta.Component) error {
if !config.IsFeaturePresent(config.ECSFargate) {
if !pkgConfig.IsFeaturePresent(pkgConfig.ECSFargate) {
return errors.NewDisabled(componentName, "Agent is not running on ECS Fargate")
}

var err error

c.store = store

var err error
c.metaV2, err = ecsmeta.V2()
if err != nil {
return err
}

c.setTaskCollectionParser()

return nil
}

func (c *collector) setTaskCollectionParser() {
var err error
c.metaV4, err = ecsmeta.V4FromCurrentTask()
if c.taskCollectionEnabled && err == nil {
c.taskCollectionParser = c.parseTaskFromV4Endpoint
return
}
c.taskCollectionParser = c.parseTaskFromV2Endpoint
}

func (c *collector) Pull(ctx context.Context) error {
task, err := c.metaV2.GetTask(ctx)
task, err := c.taskCollectionParser(ctx)
if err != nil {
return err
}

c.store.Notify(c.parseTask(task))
c.store.Notify(task)

return nil
}
Expand All @@ -86,152 +112,6 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType {
return c.catalog
}

func (c *collector) parseTask(task *v2.Task) []workloadmeta.CollectorEvent {
events := []workloadmeta.CollectorEvent{}
seen := make(map[workloadmeta.EntityID]struct{})

// We only want to collect tasks without a STOPPED status.
if task.KnownStatus == "STOPPED" {
return events
}

arnParts := strings.Split(task.TaskARN, "/")
taskID := arnParts[len(arnParts)-1]
entityID := workloadmeta.EntityID{
Kind: workloadmeta.KindECSTask,
ID: task.TaskARN,
}

seen[entityID] = struct{}{}

taskContainers, containerEvents := c.parseTaskContainers(task, seen)
entity := &workloadmeta.ECSTask{
EntityID: entityID,
EntityMeta: workloadmeta.EntityMeta{
Name: taskID,
},
ClusterName: parseClusterName(task.ClusterName),
Region: parseRegion(task.ClusterName),
Family: task.Family,
Version: task.Version,
LaunchType: workloadmeta.ECSLaunchTypeFargate,
Containers: taskContainers,

// the AvailabilityZone metadata is only available for
// Fargate tasks using platform version 1.4 or later
AvailabilityZone: task.AvailabilityZone,
}

events = append(events, containerEvents...)
events = append(events, workloadmeta.CollectorEvent{
Source: workloadmeta.SourceRuntime,
Type: workloadmeta.EventTypeSet,
Entity: entity,
})

for seenID := range c.seen {
if _, ok := seen[seenID]; ok {
continue
}

var entity workloadmeta.Entity
switch seenID.Kind {
case workloadmeta.KindECSTask:
entity = &workloadmeta.ECSTask{EntityID: seenID}
case workloadmeta.KindContainer:
entity = &workloadmeta.Container{EntityID: seenID}
default:
log.Errorf("cannot handle expired entity of kind %q, skipping", seenID.Kind)
continue
}

events = append(events, workloadmeta.CollectorEvent{
Type: workloadmeta.EventTypeUnset,
Source: workloadmeta.SourceRuntime,
Entity: entity,
})
}

c.seen = seen

return events
}

func (c *collector) parseTaskContainers(
task *v2.Task,
seen map[workloadmeta.EntityID]struct{},
) ([]workloadmeta.OrchestratorContainer, []workloadmeta.CollectorEvent) {
taskContainers := make([]workloadmeta.OrchestratorContainer, 0, len(task.Containers))
events := make([]workloadmeta.CollectorEvent, 0, len(task.Containers))

for _, container := range task.Containers {
containerID := container.DockerID
taskContainers = append(taskContainers, workloadmeta.OrchestratorContainer{
ID: containerID,
Name: container.Name,
})
entityID := workloadmeta.EntityID{
Kind: workloadmeta.KindContainer,
ID: containerID,
}

seen[entityID] = struct{}{}

image, err := workloadmeta.NewContainerImage(container.ImageID, container.Image)

if err != nil {
log.Debugf("cannot split image name %q: %s", container.Image, err)
}

ips := make(map[string]string)

for _, net := range container.Networks {
if net.NetworkMode == "awsvpc" && len(net.IPv4Addresses) > 0 {
ips["awsvpc"] = net.IPv4Addresses[0]
}
}

var startedAt time.Time
if container.StartedAt != "" {
startedAt, err = time.Parse(time.RFC3339, container.StartedAt)
if err != nil {
log.Debugf("cannot parse StartedAt %q for container %q: %s", container.StartedAt, container.DockerID, err)
}
}

var createdAt time.Time
if container.CreatedAt != "" {
createdAt, err = time.Parse(time.RFC3339, container.CreatedAt)
if err != nil {
log.Debugf("could not parse creation time '%q' for container %q: %s", container.CreatedAt, container.DockerID, err)
}
}

events = append(events, workloadmeta.CollectorEvent{
Source: workloadmeta.SourceRuntime,
Type: workloadmeta.EventTypeSet,
Entity: &workloadmeta.Container{
EntityID: entityID,
EntityMeta: workloadmeta.EntityMeta{
Name: container.DockerName,
Labels: container.Labels,
},
Image: image,
Runtime: workloadmeta.ContainerRuntimeECSFargate,
NetworkIPs: ips,
State: workloadmeta.ContainerState{
StartedAt: startedAt,
CreatedAt: createdAt,
Running: container.KnownStatus == "RUNNING",
Status: parseStatus(container.KnownStatus),
},
},
})
}

return taskContainers, events
}

// parseClusterName returns the short name of a cluster. it detects if the name
// is an ARN and converts it if that's the case.
func parseClusterName(value string) string {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build docker

package ecsfargate

import (
"github.com/DataDog/datadog-agent/comp/core/workloadmeta"
)

type fakeWorkloadmetaStore struct {
workloadmeta.Component
notifiedEvents []workloadmeta.CollectorEvent
}

func (store *fakeWorkloadmetaStore) Notify(events []workloadmeta.CollectorEvent) {
store.notifiedEvents = append(store.notifiedEvents, events...)
}
Loading