Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions build/Dockerfile.debug
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
# Debug image: runs the operator under Delve so a remote debugger can attach on :40000.
FROM --platform=$BUILDPLATFORM golang:1.24-bookworm AS builder

ENV GO111MODULE=on CGO_ENABLED=0
WORKDIR /work
ARG TARGETOS TARGETARCH
# Cross-compile dlv for the *target* platform. The builder stage runs on
# $BUILDPLATFORM, so a plain `go install` would emit a build-platform binary
# that cannot execute inside a $TARGETARCH image. Cross-compiled `go install`
# output lands in /go/bin/${GOOS}_${GOARCH}/ (native output in /go/bin/), so
# normalize the location with find before copying it out.
RUN GOOS=$TARGETOS GOARCH=$TARGETARCH go install github.com/go-delve/delve/cmd/dlv@latest \
    && mkdir -p /out && cp "$(find /go/bin -type f -name dlv)" /out/dlv
RUN --mount=target=. \
    --mount=type=cache,target=/root/.cache/go-build \
    --mount=type=cache,target=/go/pkg \
    GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /out/operator .

FROM gcr.io/distroless/static-debian12:debug-nonroot

COPY --from=builder /out/dlv /usr/bin/dlv
COPY --from=builder /out/operator /usr/bin/operator

# Expose the release version to the running container.
ARG image_version
ENV RELEASE=$image_version

# Delve: headless server on :40000, starts the operator immediately
# (--continue) and keeps serving after a client disconnects (--accept-multiclient).
ENTRYPOINT ["/usr/bin/dlv", "--listen=:40000", "--headless=true", "--accept-multiclient", "--continue", "--api-version=2", "--log", "exec", "/usr/bin/operator"]
91 changes: 11 additions & 80 deletions watcher/containerprofilewatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"fmt"
"strings"
"time"

"github.com/armosec/armoapi-go/apis"
"github.com/kubescape/go-logger"
Expand All @@ -16,87 +15,24 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/pager"
)

// ContainerProfileWatch watches and processes changes on ContainerProfile resources
// ContainerProfileWatch uses the generic resource watcher for ContainerProfiles
func (wh *WatchHandler) ContainerProfileWatch(ctx context.Context, workerPool *ants.PoolWithFunc) {
eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second)
cmdCh := make(chan *apis.Command)
errorCh := make(chan error)
apEvents := make(<-chan watch.Event)

// The watcher is considered unavailable by default
apWatcherUnavailable := make(chan struct{})
go func() {
apWatcherUnavailable <- struct{}{}
}()

go wh.HandleContainerProfileEvents(eventQueue, cmdCh, errorCh)

// notifyWatcherDown notifies the appropriate channel that the watcher
// is down and backs off for the retry interval to not produce
// unnecessary events
notifyWatcherDown := func(watcherDownCh chan<- struct{}) {
go func() { watcherDownCh <- struct{}{} }()
time.Sleep(retryInterval)
}

// get the initial profiles
if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return wh.storageClient.SpdxV1beta1().ContainerProfiles("").List(ctx, opts)
}).EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
ap := obj.(*spdxv1beta1.ContainerProfile)
// simulate "add" event
eventQueue.Enqueue(watch.Event{
Type: watch.Added,
Object: ap,
})
return nil
}); err != nil {
logger.L().Ctx(ctx).Error("failed to list existing container profiles", helpers.Error(err))
}

var watcher watch.Interface
var err error
for {
select {
case apEvent, ok := <-apEvents:
if ok {
eventQueue.Enqueue(apEvent)
} else {
notifyWatcherDown(apWatcherUnavailable)
}
case cmd, ok := <-cmdCh:
if ok {
_ = utils.AddCommandToChannel(ctx, wh.cfg, cmd, workerPool)
} else {
notifyWatcherDown(apWatcherUnavailable)
}
case err, ok := <-errorCh:
if ok {
logger.L().Ctx(ctx).Error("error in ContainerProfileWatch", helpers.Error(err))
} else {
notifyWatcherDown(apWatcherUnavailable)
}
case <-apWatcherUnavailable:
if watcher != nil {
watcher.Stop()
}

watcher, err = wh.getContainerProfileWatcher()
if err != nil {
notifyWatcherDown(apWatcherUnavailable)
} else {
apEvents = watcher.ResultChan()
}
GenericResourceWatch[*spdxv1beta1.ContainerProfile](ctx, wh.cfg, workerPool, func(ctx context.Context, opts metav1.ListOptions) ([]*spdxv1beta1.ContainerProfile, string, string, error) {
list, err := wh.storageClient.SpdxV1beta1().ContainerProfiles("").List(ctx, opts)
if err != nil {
return nil, "", "", err
}
}

items := make([]*spdxv1beta1.ContainerProfile, len(list.Items))
for i := range list.Items {
items[i] = &list.Items[i]
}
return items, list.Continue, list.ResourceVersion, nil
}, wh.HandleContainerProfileEvents)
}

func (wh *WatchHandler) HandleContainerProfileEvents(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error) {
Expand Down Expand Up @@ -155,11 +91,6 @@ func (wh *WatchHandler) HandleContainerProfileEvents(eventQueue *CooldownQueue,
}
}

func (wh *WatchHandler) getContainerProfileWatcher() (watch.Interface, error) {
// no need to support ExcludeNamespaces and IncludeNamespaces since node-agent will respect them as well
return wh.storageClient.SpdxV1beta1().ContainerProfiles("").Watch(context.Background(), metav1.ListOptions{})
}

func getPod(client kubernetes.Interface, obj *spdxv1beta1.ContainerProfile) (*corev1.Pod, error) {
if kind, ok := obj.Labels[helpersv1.KindMetadataKey]; !ok || kind != "Pod" {
return nil, nil
Expand Down
75 changes: 75 additions & 0 deletions watcher/genericwatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package watcher

import (
"context"
"time"

"github.com/armosec/armoapi-go/apis"
"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/operator/config"
"github.com/kubescape/operator/utils"
"github.com/panjf2000/ants/v2"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
)

// ListFunc lists one page of resources. It returns the page's items, the
// continue token for the next page ("" when the listing is exhausted), the
// list's resourceVersion, and any error.
type ListFunc[T runtime.Object] func(ctx context.Context, opts metav1.ListOptions) ([]T, string, string, error)

// IDAndChecksumFunc extracts a unique ID and a checksum/version from a resource.
// NOTE(review): not referenced anywhere in this file — confirm it is used elsewhere.
type IDAndChecksumFunc[T runtime.Object] func(obj T) (id string, checksum string)

// EventHandlerFunc drains the event queue for a resource type, producing
// commands on producedCommands and reporting failures on errorCh.
// NOTE(review): the type parameter T is unused in this signature — confirm intent.
type EventHandlerFunc[T runtime.Object] func(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error)

// GenericResourceWatch periodically re-lists resources of type T and feeds
// them into a cooldown queue consumed by eventHandler. Commands produced by
// the handler are dispatched to workerPool; errors from the handler are logged.
//
// Listing is paginated (100 items per page). The resourceVersion observed in
// the previous cycle is passed on the first page of the next cycle so the
// apiserver can serve the list with "not older than" semantics; it does NOT
// restrict the result to changes only — every cycle enqueues all items, and
// the CooldownQueue is expected to absorb duplicates.
//
// NOTE(review): the first listing happens only after the first ticker fire
// (one minute after start) — confirm that delay is acceptable.
func GenericResourceWatch[T runtime.Object](ctx context.Context, cfg config.IConfig, workerPool *ants.PoolWithFunc, listFunc ListFunc[T], eventHandler EventHandlerFunc[T]) {
	eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second)
	cmdCh := make(chan *apis.Command)
	errorCh := make(chan error)

	go eventHandler(eventQueue, cmdCh, errorCh)

	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()
	var since string
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			var continueToken string
			for {
				logger.L().Debug("GenericResourceWatch - listing resources", helpers.String("continueToken", continueToken), helpers.String("since", since))
				opts := metav1.ListOptions{
					Limit:    int64(100),
					Continue: continueToken,
				}
				// The apiserver rejects list requests that set both a
				// continue token and a resourceVersion, so only pass the
				// resourceVersion on the first page of each cycle.
				if continueToken == "" {
					opts.ResourceVersion = since
				}
				items, nextToken, lastUpdated, err := listFunc(ctx, opts)
				if err != nil {
					logger.L().Ctx(ctx).Error("GenericResourceWatch - error in listFunc", helpers.Error(err))
					break
				}
				for _, obj := range items {
					// added and modified events are treated the same, so we enqueue a Modified event for both
					eventQueue.Enqueue(watch.Event{Type: watch.Modified, Object: obj})
				}
				since = lastUpdated
				if nextToken == "" {
					break
				}
				continueToken = nextToken
			}
		case cmd, ok := <-cmdCh:
			if ok {
				_ = utils.AddCommandToChannel(ctx, cfg, cmd, workerPool)
			}
		case err, ok := <-errorCh:
			if ok {
				logger.L().Ctx(ctx).Error("GenericResourceWatch - error from errorCh", helpers.Error(err))
			}
		}
	}
}
113 changes: 11 additions & 102 deletions watcher/sbomwatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"slices"
"strings"
"time"

"github.com/armosec/armoapi-go/apis"
"github.com/kubescape/go-logger"
Expand All @@ -13,26 +12,12 @@ import (
"github.com/kubescape/operator/utils"
spdxv1beta1 "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1"
"github.com/panjf2000/ants/v2"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/tools/pager"
)

// SBOMWatch watches and processes changes on SBOMs
// SBOMWatch uses the generic resource watcher for SBOMSyft resources
func (wh *WatchHandler) SBOMWatch(ctx context.Context, workerPool *ants.PoolWithFunc) {
eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second)
cmdCh := make(chan *apis.Command)
errorCh := make(chan error)
sbomEvents := make(<-chan watch.Event)

// The watcher is considered unavailable by default
sbomWatcherUnavailable := make(chan struct{})
go func() {
sbomWatcherUnavailable <- struct{}{}
}()

// SBOM watcher needs pods to build a map of <image ID> : set of <wlid>
watchOpts := metav1.ListOptions{
Watch: true,
Expand All @@ -50,88 +35,17 @@ func (wh *WatchHandler) SBOMWatch(ctx context.Context, workerPool *ants.PoolWith
go wh.watchRetry(ctx, watchOpts)
}

// start watching SBOMs
go wh.HandleSBOMEvents(eventQueue, cmdCh, errorCh)

// notifyWatcherDown notifies the appropriate channel that the watcher
// is down and backs off for the retry interval to not produce
// unnecessary events
notifyWatcherDown := func(watcherDownCh chan<- struct{}) {
go func() { watcherDownCh <- struct{}{} }()
time.Sleep(retryInterval)
}

// get the initial SBOMs
if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return wh.storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, opts)
}).EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error {
sbom := obj.(*spdxv1beta1.SBOMSyft)
// simulate "add" event
eventQueue.Enqueue(watch.Event{
Type: watch.Added,
Object: sbom,
})
return nil
}); err != nil {
logger.L().Ctx(ctx).Error("failed to list existing SBOMs", helpers.Error(err))
}

var watcher watch.Interface
for {
select {
// FIXME select processes the events randomly, so we might see the SBOM event before the pod event
case event := <-wh.eventQueue.ResultChan: // this is the event queue for pods
// skip non-pod objects
pod, ok := event.Object.(*corev1.Pod)
if !ok {
continue
}
wlid, err := utils.GetParentIDForPod(wh.k8sAPI, pod, wh.cfg.ClusterName())
if err != nil {
logger.L().Ctx(ctx).Error("failed to get wlid for pod", helpers.Error(err), helpers.String("pod", pod.Name), helpers.String("namespace", pod.Namespace))
continue
}
containerStatuses := slices.Concat(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses)
for _, containerStatus := range containerStatuses {
hash := hashFromImageID(containerStatus.ImageID)
wh.ImageToContainerData.Set(hash, utils.ContainerData{
ContainerName: containerStatus.Name,
Wlid: wlid,
})
}
case sbomEvent, ok := <-sbomEvents:
if ok {
eventQueue.Enqueue(sbomEvent)
} else {
notifyWatcherDown(sbomWatcherUnavailable)
}
case cmd, ok := <-cmdCh:
if ok {
_ = utils.AddCommandToChannel(ctx, wh.cfg, cmd, workerPool)
} else {
notifyWatcherDown(sbomWatcherUnavailable)
}
case err, ok := <-errorCh:
if ok {
logger.L().Ctx(ctx).Error("error in SBOMWatch", helpers.Error(err))
} else {
notifyWatcherDown(sbomWatcherUnavailable)
}
case <-sbomWatcherUnavailable:
if watcher != nil {
watcher.Stop()
}

var err error
watcher, err = wh.getSBOMWatcher()
if err != nil {
notifyWatcherDown(sbomWatcherUnavailable)
} else {
sbomEvents = watcher.ResultChan()
}
GenericResourceWatch[*spdxv1beta1.SBOMSyft](ctx, wh.cfg, workerPool, func(ctx context.Context, opts metav1.ListOptions) ([]*spdxv1beta1.SBOMSyft, string, string, error) {
list, err := wh.storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, opts)
if err != nil {
return nil, "", "", err
}
}

items := make([]*spdxv1beta1.SBOMSyft, len(list.Items))
for i := range list.Items {
items[i] = &list.Items[i]
}
return items, list.Continue, list.ResourceVersion, nil
}, wh.HandleSBOMEvents)
}

func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error) {
Expand Down Expand Up @@ -191,11 +105,6 @@ func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedComm
}
}

func (wh *WatchHandler) getSBOMWatcher() (watch.Interface, error) {
// no need to support ExcludeNamespaces and IncludeNamespaces since node-agent will respect them as well
return wh.storageClient.SpdxV1beta1().SBOMSyfts("").Watch(context.Background(), metav1.ListOptions{})
}

func hashFromImageID(imageID string) string {
s := strings.Split(imageID, ":")
return s[len(s)-1]
Expand Down
Loading