Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion pkg/clusteragent/kubeactions/config_retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
package kubeactions

import (
rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
"github.com/DataDog/datadog-agent/pkg/remoteconfig/state"
"github.com/DataDog/datadog-agent/pkg/util/log"
)
Expand All @@ -24,7 +25,7 @@ type ConfigRetriever struct {
}

// NewConfigRetriever creates a new ConfigRetriever and subscribes to K8S_ACTIONS
func NewConfigRetriever(processor *ActionProcessor, isLeader func() bool, rcClient RcClient) *ConfigRetriever {
func NewConfigRetriever(processor *ActionProcessor, isLeader func() bool, rcClient *rcclient.Client) *ConfigRetriever {
cr := &ConfigRetriever{
processor: processor,
isLeader: isLeader,
Expand Down
6 changes: 6 additions & 0 deletions pkg/clusteragent/kubeactions/executors/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,16 @@ package executors

import (
"context"
"time"

kubeactions "github.com/DataDog/agent-payload/v5/kubeactions"
)

const (
// default timeout for all executors, can be overridden by individual executors if needed
defaultExecutorTimeout = 10 * time.Second
)

// Executor is the interface that all executors in this package implement
type Executor interface {
Execute(ctx context.Context, action *kubeactions.KubeAction) ExecutionResult
Expand Down
107 changes: 107 additions & 0 deletions pkg/clusteragent/kubeactions/executors/get_resource.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build kubeapiserver

package executors

import (
"context"
"fmt"
"strings"

"k8s.io/client-go/kubernetes"

"sigs.k8s.io/yaml"

"github.com/DataDog/datadog-agent/pkg/util/log"

kubeactions "github.com/DataDog/agent-payload/v5/kubeactions"
)

type GetResourceExecutor struct {
clientset kubernetes.Interface
}

// NewGetResourceExecutor creates a new GetResourceExecutor
func NewGetResourceExecutor(clientset kubernetes.Interface) *GetResourceExecutor {
return &GetResourceExecutor{
clientset: clientset,
}
}

// Execute retrieves the specified Kubernetes resource and returns it as JSON string in the message field of ExecutionResult
func (e *GetResourceExecutor) Execute(ctx context.Context, action *kubeactions.KubeAction) ExecutionResult {
resource := action.Resource
namespace := strings.ToLower(resource.GetNamespace())
name := strings.ToLower(resource.GetName())
apiVersion := strings.ToLower(resource.GetApiVersion())
kind := strings.ToLower(resource.GetKind())
Comment thread
lavigne958 marked this conversation as resolved.

if apiVersion == "" {
return ExecutionResult{
Status: StatusFailed,
Message: "apiVersion is required to get resource",
}
}

log.Infof("Getting resource %s/%s of type %s", namespace, name, resource.Kind)

// build the raw REST request to get the resource as unstructured JSON
// resource.GetApiVersion() returns group/version, it will automagically handle adding the group prefix if needed
// or not adding it for core resources
var path string
if namespace == "" {
path = fmt.Sprintf("/apis/%s/%s/%s", apiVersion, kind, name)
} else {
path = fmt.Sprintf("/apis/%s/namespaces/%s/%s/%s", apiVersion, namespace, kind, name)
}

ctx, cancel := context.WithTimeout(ctx, defaultExecutorTimeout)
defer cancel()

log.Debugf("get_resource using path '%s'", path)
data, err := e.clientset.CoreV1().RESTClient().Get().AbsPath(path).Do(ctx).Raw()
if err != nil {
return ExecutionResult{
Status: StatusFailed,
Message: fmt.Sprintf("failed to get resource: %v -- raw response body: %s", err, string(data)),
}
}

outputFormat := "json"
// if output := action.GetGetResource_().GetOutputFormat(); output != "" {
// outputFormat = strings.ToLower(output)
// }

output, err := formatOutput(data, outputFormat)
if err != nil {
return ExecutionResult{
Status: StatusFailed,
Message: fmt.Sprintf("failed to format output to %s: %v", outputFormat, err),
}
}

return ExecutionResult{
Status: StatusSuccess,
Message: output,
}
}

func formatOutput(data []byte, format string) (string, error) {
switch format {
case "json":
return string(data), nil
case "yaml":
jsonData := data
yamlData, err := yaml.JSONToYAML(jsonData)
if err != nil {
return "", fmt.Errorf("failed to convert resource JSON to YAML: %v", err)
}
return string(yamlData), nil
default:
return "", fmt.Errorf("unsupported output format: %s", format)
}
}
8 changes: 7 additions & 1 deletion pkg/clusteragent/kubeactions/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,12 @@ import (
"github.com/DataDog/datadog-agent/pkg/clusteragent/kubeactions/executors"
"github.com/DataDog/datadog-agent/pkg/util/log"
"k8s.io/client-go/kubernetes"

rcclient "github.com/DataDog/datadog-agent/pkg/config/remote/client"
)

// Setup initializes the kubeactions subsystem with all executors registered
func Setup(ctx context.Context, clientset kubernetes.Interface, clusterName, clusterID string, isLeader func() bool, rcClient RcClient, epForwarderComp eventplatform.Component) (*ConfigRetriever, error) {
func Setup(ctx context.Context, clientset kubernetes.Interface, clusterName, clusterID string, isLeader func() bool, rcClient *rcclient.Client, epForwarderComp eventplatform.Component) (*ConfigRetriever, error) {
log.Infof("Setting up Kubernetes actions subsystem")

// Create the executor registry
Expand Down Expand Up @@ -76,6 +78,10 @@ func registerExecutors(registry *ExecutorRegistry, clientset kubernetes.Interfac
registry.Register("patch_deployment", &executorAdapter{exec: executors.NewPatchDeploymentExecutor(clientset)})
log.Infof("Registered executor for action type: patch_deployment")

// Register get_resource executor
registry.Register("get_resource", &executorAdapter{exec: executors.NewGetResourceExecutor(clientset)})
log.Infof("Registered executor for action type: get_resource")

// TODO: Add more executors here as they are implemented:
// registry.Register("drain_node", &executorAdapter{exec: executors.NewDrainNodeExecutor(clientset)})
// registry.Register("cordon_node", &executorAdapter{exec: executors.NewCordonNodeExecutor(clientset)})
Expand Down
4 changes: 4 additions & 0 deletions pkg/clusteragent/kubeactions/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ const (
ActionTypeDeletePod = "delete_pod"
ActionTypeRestartDeployment = "restart_deployment"
ActionTypePatchDeployment = "patch_deployment"
ActionTypeGetResource = "get_resource"
ActionTypeUnknown = "unknown"
)

Expand Down Expand Up @@ -55,6 +56,9 @@ func GetActionType(action *kubeactions.KubeAction) string {
return ActionTypeRestartDeployment
case *kubeactions.KubeAction_PatchDeployment:
return ActionTypePatchDeployment
// to add when PR https://github.com/DataDog/agent-payload/pull/472 is merged
// case *kubeactions.KubeAction_GetResource:
// return ActionTypeGetResource
default:
return ActionTypeUnknown
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Each section from every release note are combined when the
# CHANGELOG.rst is rendered. So the text needs to be worded so that
# it does not depend on any information only available in another
# section. This may mean repeating some details, but each section
# must be readable independently of the other.
#
# Each section note must be formatted as reStructuredText.
---
features:
- |
Add a new action get-resource in kubeactions

Loading