diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate.go index de5fa1c3e548..c0f8ea86c88d 100644 --- a/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate.go +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate.go @@ -11,15 +11,17 @@ package ecsfargate import ( "context" "strings" - "time" + "go.uber.org/fx" + + "github.com/DataDog/datadog-agent/comp/core/config" "github.com/DataDog/datadog-agent/comp/core/workloadmeta" - "github.com/DataDog/datadog-agent/pkg/config" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util" + pkgConfig "github.com/DataDog/datadog-agent/pkg/config" "github.com/DataDog/datadog-agent/pkg/errors" ecsmeta "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata" v2 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v2" - "github.com/DataDog/datadog-agent/pkg/util/log" - "go.uber.org/fx" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" ) const ( @@ -27,21 +29,33 @@ const ( componentName = "workloadmeta-ecs_fargate" ) +type dependencies struct { + fx.In + + Config config.Component +} + type collector struct { - id string - store workloadmeta.Component - catalog workloadmeta.AgentType - metaV2 v2.Client - seen map[workloadmeta.EntityID]struct{} + id string + store workloadmeta.Component + catalog workloadmeta.AgentType + metaV2 v2.Client + metaV4 v3or4.Client + seen map[workloadmeta.EntityID]struct{} + config config.Component + taskCollectionEnabled bool + taskCollectionParser util.TaskParser } // NewCollector returns a new ecsfargate collector provider and an error -func NewCollector() (workloadmeta.CollectorProvider, error) { +func NewCollector(deps dependencies) (workloadmeta.CollectorProvider, error) { return workloadmeta.CollectorProvider{ Collector: &collector{ - id: collectorID, - catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent, - seen: make(map[workloadmeta.EntityID]struct{}), + id: collectorID, + catalog: workloadmeta.NodeAgent | workloadmeta.ProcessAgent, + seen: make(map[workloadmeta.EntityID]struct{}), + config: deps.Config, + taskCollectionEnabled: util.IsTaskCollectionEnabled(deps.Config), }, }, nil } @@ -52,28 +66,40 @@ func GetFxOptions() fx.Option { } func (c *collector) Start(_ context.Context, store workloadmeta.Component) error { - if !config.IsFeaturePresent(config.ECSFargate) { + if !pkgConfig.IsFeaturePresent(pkgConfig.ECSFargate) { return errors.NewDisabled(componentName, "Agent is not running on ECS Fargate") } - var err error - c.store = store + + var err error c.metaV2, err = ecsmeta.V2() if err != nil { return err } + c.setTaskCollectionParser() + return nil } +func (c *collector) setTaskCollectionParser() { + var err error + c.metaV4, err = ecsmeta.V4FromCurrentTask() + if c.taskCollectionEnabled && err == nil { + c.taskCollectionParser = c.parseTaskFromV4Endpoint + return + } + c.taskCollectionParser = c.parseTaskFromV2Endpoint +} + func (c *collector) Pull(ctx context.Context) error { - task, err := c.metaV2.GetTask(ctx) + task, err := c.taskCollectionParser(ctx) if err != nil { return err } - c.store.Notify(c.parseTask(task)) + c.store.Notify(task) return nil } @@ -86,152 +112,6 @@ func (c *collector) GetTargetCatalog() workloadmeta.AgentType { return c.catalog } -func (c *collector) parseTask(task *v2.Task) []workloadmeta.CollectorEvent { - events := []workloadmeta.CollectorEvent{} - seen := make(map[workloadmeta.EntityID]struct{}) - - // We only want to collect tasks without a STOPPED status. - if task.KnownStatus == "STOPPED" { - return events - } - - arnParts := strings.Split(task.TaskARN, "/") - taskID := arnParts[len(arnParts)-1] - entityID := workloadmeta.EntityID{ - Kind: workloadmeta.KindECSTask, - ID: task.TaskARN, - } - - seen[entityID] = struct{}{} - - taskContainers, containerEvents := c.parseTaskContainers(task, seen) - entity := &workloadmeta.ECSTask{ - EntityID: entityID, - EntityMeta: workloadmeta.EntityMeta{ - Name: taskID, - }, - ClusterName: parseClusterName(task.ClusterName), - Region: parseRegion(task.ClusterName), - Family: task.Family, - Version: task.Version, - LaunchType: workloadmeta.ECSLaunchTypeFargate, - Containers: taskContainers, - - // the AvailabilityZone metadata is only available for - // Fargate tasks using platform version 1.4 or later - AvailabilityZone: task.AvailabilityZone, - } - - events = append(events, containerEvents...) - events = append(events, workloadmeta.CollectorEvent{ - Source: workloadmeta.SourceRuntime, - Type: workloadmeta.EventTypeSet, - Entity: entity, - }) - - for seenID := range c.seen { - if _, ok := seen[seenID]; ok { - continue - } - - var entity workloadmeta.Entity - switch seenID.Kind { - case workloadmeta.KindECSTask: - entity = &workloadmeta.ECSTask{EntityID: seenID} - case workloadmeta.KindContainer: - entity = &workloadmeta.Container{EntityID: seenID} - default: - log.Errorf("cannot handle expired entity of kind %q, skipping", seenID.Kind) - continue - } - - events = append(events, workloadmeta.CollectorEvent{ - Type: workloadmeta.EventTypeUnset, - Source: workloadmeta.SourceRuntime, - Entity: entity, - }) - } - - c.seen = seen - - return events -} - -func (c *collector) parseTaskContainers( - task *v2.Task, - seen map[workloadmeta.EntityID]struct{}, -) ([]workloadmeta.OrchestratorContainer, []workloadmeta.CollectorEvent) { - taskContainers := make([]workloadmeta.OrchestratorContainer, 0, len(task.Containers)) - events := make([]workloadmeta.CollectorEvent, 0, len(task.Containers)) - - for _, container := range task.Containers { - containerID := container.DockerID - taskContainers = append(taskContainers, workloadmeta.OrchestratorContainer{ - ID: containerID, - Name: container.Name, - }) - entityID := workloadmeta.EntityID{ - Kind: workloadmeta.KindContainer, - ID: containerID, - } - - seen[entityID] = struct{}{} - - image, err := workloadmeta.NewContainerImage(container.ImageID, container.Image) - - if err != nil { - log.Debugf("cannot split image name %q: %s", container.Image, err) - } - - ips := make(map[string]string) - - for _, net := range container.Networks { - if net.NetworkMode == "awsvpc" && len(net.IPv4Addresses) > 0 { - ips["awsvpc"] = net.IPv4Addresses[0] - } - } - - var startedAt time.Time - if container.StartedAt != "" { - startedAt, err = time.Parse(time.RFC3339, container.StartedAt) - if err != nil { - log.Debugf("cannot parse StartedAt %q for container %q: %s", container.StartedAt, container.DockerID, err) - } - } - - var createdAt time.Time - if container.CreatedAt != "" { - createdAt, err = time.Parse(time.RFC3339, container.CreatedAt) - if err != nil { - log.Debugf("could not parse creation time '%q' for container %q: %s", container.CreatedAt, container.DockerID, err) - } - } - - events = append(events, workloadmeta.CollectorEvent{ - Source: workloadmeta.SourceRuntime, - Type: workloadmeta.EventTypeSet, - Entity: &workloadmeta.Container{ - EntityID: entityID, - EntityMeta: workloadmeta.EntityMeta{ - Name: container.DockerName, - Labels: container.Labels, - }, - Image: image, - Runtime: workloadmeta.ContainerRuntimeECSFargate, - NetworkIPs: ips, - State: workloadmeta.ContainerState{ - StartedAt: startedAt, - CreatedAt: createdAt, - Running: container.KnownStatus == "RUNNING", - Status: parseStatus(container.KnownStatus), - }, - }, - }) - } - - return taskContainers, events -} - // parseClusterName returns the short name of a cluster. it detects if the name // is an ARN and converts it if that's the case. func parseClusterName(value string) string { diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate_test.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate_test.go new file mode 100644 index 000000000000..b3e883297d08 --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/ecsfargate_test.go @@ -0,0 +1,21 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +package ecsfargate + +import ( + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" +) + +type fakeWorkloadmetaStore struct { + workloadmeta.Component + notifiedEvents []workloadmeta.CollectorEvent +} + +func (store *fakeWorkloadmetaStore) Notify(events []workloadmeta.CollectorEvent) { + store.notifiedEvents = append(store.notifiedEvents, events...) +} diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/testdata/redis.json b/comp/core/workloadmeta/collectors/internal/ecsfargate/testdata/redis.json new file mode 100644 index 000000000000..33bf00d97445 --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/testdata/redis.json @@ -0,0 +1,146 @@ +{ + "Cluster": "arn:aws:ecs:us-east-1:123457279990:cluster/ecs-cluster", + "TaskARN": "arn:aws:ecs:us-east-1:123457279990:task/ecs-cluster/938f6d263c464aa5985dc67ab7f38a7e", + "Family": "my-redis", + "Revision": "1", + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { "CPU": 1, "Memory": 2048 }, + "PullStartedAt": "2023-11-20T12:09:45.059013479Z", + "PullStoppedAt": "2023-11-20T12:10:41.166377771Z", + "AvailabilityZone": "us-east-1d", + "LaunchType": "FARGATE", + "Containers": [ + { + "DockerId": "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + "Name": "log_router", + "DockerName": "log_router", + "Image": "amazon/aws-for-fluent-bit:latest", + "ImageID": "sha256:ed2bd1c0fa887e59338a8761e040acc495213fd3c1b2be661c44c7158425e6e3", + "Labels": { + "com.amazonaws.ecs.container-name": "log_router", + "com.amazonaws.ecs.task-definition-family": "my-redis", + "com.amazonaws.ecs.task-definition-version": "1", + "com.amazonaws.ecs.cluster": "ecs-cluster" + }, + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { "CPU": 2 }, + "CreatedAt": "2023-11-20T12:10:44.559880428Z", + "StartedAt": "2023-11-20T12:10:44.559880428Z", + "Type": "NORMAL", + "LogDriver": "awslogs", + "LogOptions": { + "awslogs-group": "aws", + "awslogs-region": "us-east-1", + "awslogs-stream": "log_router/log_router/938f6d263c464a" + }, + "ContainerARN": "arn:aws:ecs:us-east-1:601427279990:container/ecs-cluster/938f6d263c464aa59/dc51359e-7f8a", + "Networks": [ + { + "NetworkMode": "awsvpc", + "IPv4Addresses": ["172.31.15.128"], + "AttachmentIndex": 0, + "MACAddress": "0e:73:3c:72:d3:c6", + "IPv4SubnetCIDRBlock": "172.31.15.0/24", + "DomainNameServers": ["172.31.44.19", "172.31.0.2"], + "PrivateDNSName": "ip-123-31-115-123.ec2.internal", + "SubnetGatewayIpv4Address": "172.31.15.1/24" + } + ], + "Snapshotter": "overlayfs" + }, + { + "DockerId": "938f6d263c464aa5985dc67ab7f38a7e-2537586469", + "Name": "datadog-agent", + "DockerName": "datadog-agent", + "Image": "public.ecr.aws/datadog/agent:latest", + "ImageID": "sha256:ba1d175ac08f8241d62c07785cbc6e026310cd2293dc4cf148e05d63655d1297", + "Labels": { + "com.amazonaws.ecs.container-name": "datadog-agent", + "com.amazonaws.ecs.task-definition-family": "my-redis", + "com.amazonaws.ecs.task-definition-version": "1" + }, + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { "CPU": 2 }, + "CreatedAt": "2023-11-20T12:10:44.404563253Z", + "StartedAt": "2023-11-20T12:10:44.404563253Z", + "Type": "NORMAL", + "Health": { + "status": "HEALTHY", + "statusSince": "2023-11-20T12:11:16.383262018Z" + }, + "Volumes": [ + { + "DockerName": "my-redis-1-dd-sockets", + "Destination": "/var/run/datadog" + } + ], + "LogDriver": "awslogs", + "LogOptions": { + "awslogs-group": "aws", + "awslogs-region": "us-east-1", + "awslogs-stream": "log_router/datadog-agent/938f6d263c46e" + }, + "ContainerARN": "arn:aws:ecs:us-east-1:601427279990:container/ecs-cluster/938f6d263c464aa/a17c293b-ab52", + "Networks": [ + { + "NetworkMode": "awsvpc", + "IPv4Addresses": ["172.31.115.123"], + "AttachmentIndex": 0, + "MACAddress": "0e:73:3c:72:d3:ca", + "IPv4SubnetCIDRBlock": "172.31.11.0/24", + "DomainNameServers": ["172.31.44.17", "172.31.0.2"], + "PrivateDNSName": "ip-123-31-115-125.ec2.internal", + "SubnetGatewayIpv4Address": "172.31.11.1/24" + } + ], + "Snapshotter": "overlayfs" + }, + { + "DockerId": "938f6d263c464aa5985dc67ab7f38a7e-3054012820", + "Name": "redis", + "DockerName": "redis", + "Image": "redis/redis:latest", + "ImageID": "sha256:529d1db9e246240208eab2a61fbb8614b09d7505ac1329c1dec70b6aba2e1428", + "Labels": { + "com.amazonaws.ecs.container-name": "redis", + "com.amazonaws.ecs.task-definition-family": "my-redis", + "com.amazonaws.ecs.task-definition-version": "1" + }, + "DesiredStatus": "RUNNING", + "KnownStatus": "RUNNING", + "Limits": { "CPU": 2 }, + "CreatedAt": "2023-11-20T12:11:16.701115523Z", + "StartedAt": "2023-11-20T12:11:16.701115523Z", + "Type": "NORMAL", + "LogDriver": "awslogs", + "LogOptions": { + "awslogs-group": "aws", + "awslogs-region": "us-east-1", + "awslogs-stream": "log_router/redis/938f6d263c4" + }, + "ContainerARN": "arn:aws:ecs:us-east-1:601427279990:container/ecs-cluster/938f6d263c464aa5/a40518e2-7573", + "Networks": [ + { + "NetworkMode": "awsvpc", + "IPv4Addresses": ["172.31.115.18"], + "AttachmentIndex": 0, + "MACAddress": "0e:73:3d:72:c3:cb", + "IPv4SubnetCIDRBlock": "172.31.15.0/24", + "DomainNameServers": ["172.31.44.19", "172.31.0.2"], + "PrivateDNSName": "ip-12-31-115-18.ec2.internal", + "SubnetGatewayIpv4Address": "172.111.115.1/24" + } + ], + "Snapshotter": "overlayfs" + } + ], + "ClockDrift": { + "ClockErrorBound": 0.5730080000000001, + "ReferenceTimestamp": "2023-12-27T16:06:41Z", + "ClockSynchronizationStatus": "SYNCHRONIZED" + }, + "EphemeralStorageMetrics": { "Utilized": 2298, "Reserved": 20496 } +} diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser.go new file mode 100644 index 000000000000..d5de879207be --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser.go @@ -0,0 +1,173 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +// Package ecsfargate implements the ECS Fargate Workloadmeta collector. +package ecsfargate + +import ( + "context" + "strings" + "time" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + v2 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v2" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +func (c *collector) parseTaskFromV2Endpoint(ctx context.Context) ([]workloadmeta.CollectorEvent, error) { + task, err := c.metaV2.GetTask(ctx) + if err != nil { + return nil, err + } + return c.parseV2Task(task), nil +} + +func (c *collector) parseV2Task(task *v2.Task) []workloadmeta.CollectorEvent { + events := []workloadmeta.CollectorEvent{} + seen := make(map[workloadmeta.EntityID]struct{}) + + // We only want to collect tasks without a STOPPED status. + if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped { + return events + } + + arnParts := strings.Split(task.TaskARN, "/") + taskID := arnParts[len(arnParts)-1] + entityID := workloadmeta.EntityID{ + Kind: workloadmeta.KindECSTask, + ID: task.TaskARN, + } + + seen[entityID] = struct{}{} + + taskContainers, containerEvents := c.parseTaskContainers(task, seen) + entity := &workloadmeta.ECSTask{ + EntityID: entityID, + EntityMeta: workloadmeta.EntityMeta{ + Name: taskID, + }, + ClusterName: parseClusterName(task.ClusterName), + Region: parseRegion(task.ClusterName), + Family: task.Family, + Version: task.Version, + LaunchType: workloadmeta.ECSLaunchTypeFargate, + Containers: taskContainers, + + // the AvailabilityZone metadata is only available for + // Fargate tasks using platform version 1.4 or later + AvailabilityZone: task.AvailabilityZone, + } + + events = append(events, containerEvents...) + events = append(events, workloadmeta.CollectorEvent{ + Source: workloadmeta.SourceRuntime, + Type: workloadmeta.EventTypeSet, + Entity: entity, + }) + + for seenID := range c.seen { + if _, ok := seen[seenID]; ok { + continue + } + + var entity workloadmeta.Entity + switch seenID.Kind { + case workloadmeta.KindECSTask: + entity = &workloadmeta.ECSTask{EntityID: seenID} + case workloadmeta.KindContainer: + entity = &workloadmeta.Container{EntityID: seenID} + default: + log.Errorf("cannot handle expired entity of kind %q, skipping", seenID.Kind) + continue + } + + events = append(events, workloadmeta.CollectorEvent{ + Type: workloadmeta.EventTypeUnset, + Source: workloadmeta.SourceRuntime, + Entity: entity, + }) + } + + c.seen = seen + + return events +} + +func (c *collector) parseTaskContainers( + task *v2.Task, + seen map[workloadmeta.EntityID]struct{}, +) ([]workloadmeta.OrchestratorContainer, []workloadmeta.CollectorEvent) { + taskContainers := make([]workloadmeta.OrchestratorContainer, 0, len(task.Containers)) + events := make([]workloadmeta.CollectorEvent, 0, len(task.Containers)) + + for _, container := range task.Containers { + containerID := container.DockerID + taskContainers = append(taskContainers, workloadmeta.OrchestratorContainer{ + ID: containerID, + Name: container.Name, + }) + entityID := workloadmeta.EntityID{ + Kind: workloadmeta.KindContainer, + ID: containerID, + } + + seen[entityID] = struct{}{} + + image, err := workloadmeta.NewContainerImage(container.ImageID, container.Image) + + if err != nil { + log.Debugf("cannot split image name %q: %s", container.Image, err) + } + + ips := make(map[string]string) + + for _, net := range container.Networks { + if net.NetworkMode == "awsvpc" && len(net.IPv4Addresses) > 0 { + ips["awsvpc"] = net.IPv4Addresses[0] + } + } + + var startedAt time.Time + if container.StartedAt != "" { + startedAt, err = time.Parse(time.RFC3339, container.StartedAt) + if err != nil { + log.Debugf("cannot parse StartedAt %q for container %q: %s", container.StartedAt, container.DockerID, err) + } + } + + var createdAt time.Time + if container.CreatedAt != "" { + createdAt, err = time.Parse(time.RFC3339, container.CreatedAt) + if err != nil { + log.Debugf("could not parse creation time '%q' for container %q: %s", container.CreatedAt, container.DockerID, err) + } + } + + events = append(events, workloadmeta.CollectorEvent{ + Source: workloadmeta.SourceRuntime, + Type: workloadmeta.EventTypeSet, + Entity: &workloadmeta.Container{ + EntityID: entityID, + EntityMeta: workloadmeta.EntityMeta{ + Name: container.DockerName, + Labels: container.Labels, + }, + Image: image, + Runtime: workloadmeta.ContainerRuntimeECSFargate, + NetworkIPs: ips, + State: workloadmeta.ContainerState{ + StartedAt: startedAt, + CreatedAt: createdAt, + Running: container.KnownStatus == "RUNNING", + Status: parseStatus(container.KnownStatus), + }, + }, + }) + } + + return taskContainers, events +} diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser_test.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser_test.go new file mode 100644 index 000000000000..2bfb964fa7ef --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/v2parser_test.go @@ -0,0 +1,125 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +// Package ecsfargate implements the ECS Fargate Workloadmeta collector. +package ecsfargate + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/testutil" + v2 "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v2" +) + +func TestPullWithTaskCollectionEnabledWithV2Parser(t *testing.T) { + // Start a dummy Http server to simulate ECS Fargate metadata v2 endpoint + dummyECS, err := testutil.NewDummyECS( + testutil.FileHandlerOption("/v2/metadata", "./testdata/redis.json"), + ) + require.Nil(t, err) + ts := dummyECS.Start() + defer ts.Close() + + store := &fakeWorkloadmetaStore{} + // create an ECS Fargate collector with orchestratorECSCollectionEnabled enabled + collector := collector{ + store: store, + taskCollectionEnabled: true, + metaV2: v2.NewClient(fmt.Sprintf("%s/v2/metadata", ts.URL)), + } + collector.taskCollectionParser = collector.parseTaskFromV2Endpoint + + err = collector.Pull(context.Background()) + require.Nil(t, err) + // one ECS task event and three container events should be notified + require.Len(t, store.notifiedEvents, 4) + + count := 0 + for _, event := range store.notifiedEvents { + require.Equal(t, workloadmeta.EventTypeSet, event.Type) + require.Equal(t, workloadmeta.SourceRuntime, event.Source) + switch entity := event.Entity.(type) { + case *workloadmeta.ECSTask: + require.Equal(t, "us-east-1", entity.Region) + require.Equal(t, "ecs-cluster", entity.ClusterName) + require.Equal(t, "my-redis", entity.Family) + require.Equal(t, "1", entity.Version) + require.Equal(t, workloadmeta.ECSLaunchTypeFargate, entity.LaunchType) + require.ElementsMatch(t, entity.Containers, []workloadmeta.OrchestratorContainer{ + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-1714341083", + Name: "log_router", + }, + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-2537586469", + Name: "datadog-agent", + }, + { + ID: "938f6d263c464aa5985dc67ab7f38a7e-3054012820", + Name: "redis", + }, + }) + count++ + case *workloadmeta.Container: + require.Equal(t, workloadmeta.ContainerRuntimeECSFargate, entity.Runtime) + if entity.Image.Name == "public.ecr.aws/datadog/agent" { + require.Equal(t, "datadog-agent", entity.Name) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Labels, 3) + require.Equal(t, map[string]string{"awsvpc": "172.31.115.123"}, entity.NetworkIPs) + ts, err := time.Parse(time.RFC3339Nano, "2023-11-20T12:10:44.404563253Z") + require.NoError(t, err) + require.Equal(t, workloadmeta.ContainerState{ + Running: true, + Status: workloadmeta.ContainerStatusRunning, + StartedAt: ts, + CreatedAt: ts, + }, entity.State) + count++ + } else if entity.Image.Name == "redis/redis" { + require.Equal(t, "redis", entity.Name) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Labels, 3) + require.Equal(t, map[string]string{"awsvpc": "172.31.115.18"}, entity.NetworkIPs) + ts, err := time.Parse(time.RFC3339Nano, "2023-11-20T12:11:16.701115523Z") + require.NoError(t, err) + require.Equal(t, workloadmeta.ContainerState{ + Running: true, + Status: workloadmeta.ContainerStatusRunning, + StartedAt: ts, + CreatedAt: ts, + }, entity.State) + count++ + } else if entity.Image.Name == "amazon/aws-for-fluent-bit" { + require.Equal(t, "log_router", entity.Name) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Labels, 4) + require.Equal(t, map[string]string{"awsvpc": "172.31.15.128"}, entity.NetworkIPs) + ts, err := time.Parse(time.RFC3339Nano, "2023-11-20T12:10:44.559880428Z") + require.NoError(t, err) + require.Equal(t, workloadmeta.ContainerState{ + Running: true, + Status: workloadmeta.ContainerStatusRunning, + StartedAt: ts, + CreatedAt: ts, + }, entity.State) + count++ + } else { + t.Errorf("unexpected image name: %s", entity.Image.Name) + } + default: + t.Errorf("unexpected entity type: %T", entity) + } + } + require.Equal(t, 4, count) +} diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser.go new file mode 100644 index 000000000000..1d2c42081a9e --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser.go @@ -0,0 +1,65 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +// Package ecsfargate implements the ECS Fargate Workloadmeta collector. +package ecsfargate + +import ( + "context" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/comp/core/workloadmeta/collectors/util" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" + "github.com/DataDog/datadog-agent/pkg/util/log" +) + +func (c *collector) parseTaskFromV4Endpoint(ctx context.Context) ([]workloadmeta.CollectorEvent, error) { + task, err := c.metaV4.GetTask(ctx) + if err != nil { + return nil, err + } + return c.parseV4Task(task), nil +} + +func (c *collector) parseV4Task(task *v3or4.Task) []workloadmeta.CollectorEvent { + events := []workloadmeta.CollectorEvent{} + seen := make(map[workloadmeta.EntityID]struct{}) + + // We only want to collect tasks without a STOPPED status. + if task.KnownStatus == workloadmeta.ECSTaskKnownStatusStopped { + return events + } + + events = append(events, util.ParseV4Task(*task, seen)...) + + for seenID := range c.seen { + if _, ok := seen[seenID]; ok { + continue + } + + var entity workloadmeta.Entity + switch seenID.Kind { + case workloadmeta.KindECSTask: + entity = &workloadmeta.ECSTask{EntityID: seenID} + case workloadmeta.KindContainer: + entity = &workloadmeta.Container{EntityID: seenID} + default: + log.Errorf("cannot handle expired entity of kind %q, skipping", seenID.Kind) + continue + } + + events = append(events, workloadmeta.CollectorEvent{ + Type: workloadmeta.EventTypeUnset, + Source: workloadmeta.SourceRuntime, + Entity: entity, + }) + } + + c.seen = seen + + return events +} diff --git a/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser_test.go b/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser_test.go new file mode 100644 index 000000000000..39ea189c0510 --- /dev/null +++ b/comp/core/workloadmeta/collectors/internal/ecsfargate/v4parser_test.go @@ -0,0 +1,91 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the Apache License Version 2.0. +// This product includes software developed at Datadog (https://www.datadoghq.com/). +// Copyright 2016-present Datadog, Inc. + +//go:build docker + +// Package ecsfargate implements the ECS Fargate Workloadmeta collector. +package ecsfargate + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/DataDog/datadog-agent/comp/core/workloadmeta" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/testutil" + "github.com/DataDog/datadog-agent/pkg/util/ecs/metadata/v3or4" +) + +func TestPullWithTaskCollectionEnabledWithV4Parser(t *testing.T) { + // Start a dummy Http server to simulate ECS Fargate metadata v4 endpoint + dummyECS, err := testutil.NewDummyECS( + testutil.FileHandlerOption("/v4/1234-1/task", "./testdata/redis.json"), + ) + require.Nil(t, err) + ts := dummyECS.Start() + defer ts.Close() + + store := &fakeWorkloadmetaStore{} + // create an ECS Fargate collector with orchestratorECSCollectionEnabled enabled + collector := collector{ + store: store, + taskCollectionEnabled: true, + metaV4: v3or4.NewClient(fmt.Sprintf("%s/v4/1234-1", ts.URL), "v4"), + } + collector.taskCollectionParser = collector.parseTaskFromV4Endpoint + + err = collector.Pull(context.Background()) + require.Nil(t, err) + // one ECS task event and three container events should be notified + require.Len(t, store.notifiedEvents, 4) + + count := 0 + for _, event := range store.notifiedEvents { + require.Equal(t, workloadmeta.EventTypeSet, event.Type) + require.Equal(t, workloadmeta.SourceRuntime, event.Source) + switch entity := event.Entity.(type) { + case *workloadmeta.ECSTask: + require.Equal(t, 123457279990, entity.AWSAccountID) + require.Equal(t, "us-east-1", entity.Region) + require.Equal(t, "ecs-cluster", entity.ClusterName) + require.Equal(t, "RUNNING", entity.DesiredStatus) + require.Equal(t, "my-redis", entity.Family) + require.Equal(t, "1", entity.Version) + require.Equal(t, workloadmeta.ECSLaunchTypeFargate, entity.LaunchType) + count++ + case *workloadmeta.Container: + require.Equal(t, "RUNNING", entity.KnownStatus) + require.Equal(t, "awslogs", entity.LogDriver) + require.Len(t, entity.Networks, 1) + require.Equal(t, "awsvpc", entity.Networks[0].NetworkMode) + if entity.Image.Name == "public.ecr.aws/datadog/agent" { + require.Equal(t, "HEALTHY", entity.Health.Status) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Volumes, 1) + require.Len(t, entity.Labels, 3) + count++ + } else if entity.Image.Name == "redis/redis" { + require.Nil(t, entity.Health) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Volumes, 0) + require.Len(t, entity.Labels, 3) + count++ + } else if entity.Image.Name == "amazon/aws-for-fluent-bit" { + require.Nil(t, entity.Health) + require.Equal(t, "latest", entity.Image.Tag) + require.Len(t, entity.Volumes, 0) + require.Len(t, entity.Labels, 4) + count++ + } else { + t.Errorf("unexpected image name: %s", entity.Image.Name) + } + default: + t.Errorf("unexpected entity type: %T", entity) + } + } + require.Equal(t, 4, count) +}