diff --git a/build/Dockerfile.debug b/build/Dockerfile.debug new file mode 100644 index 0000000..a98c64b --- /dev/null +++ b/build/Dockerfile.debug @@ -0,0 +1,20 @@ +FROM --platform=$BUILDPLATFORM golang:1.24-bookworm AS builder + +ENV GO111MODULE=on CGO_ENABLED=0 +WORKDIR /work +ARG TARGETOS TARGETARCH +RUN go install github.com/go-delve/delve/cmd/dlv@latest +RUN --mount=target=. \ + --mount=type=cache,target=/root/.cache/go-build \ + --mount=type=cache,target=/go/pkg \ + GOOS=$TARGETOS GOARCH=$TARGETARCH go build -o /out/operator . + +FROM gcr.io/distroless/static-debian12:debug-nonroot + +COPY --from=builder /go/bin/dlv /usr/bin/dlv +COPY --from=builder /out/operator /usr/bin/operator + +ARG image_version +ENV RELEASE=$image_version + +ENTRYPOINT ["/usr/bin/dlv", "--listen=:40000", "--headless=true","--accept-multiclient", "--continue", "--api-version=2", "--log", "exec", "/usr/bin/operator"] diff --git a/watcher/containerprofilewatcher.go b/watcher/containerprofilewatcher.go index edac1c1..6a7d95f 100644 --- a/watcher/containerprofilewatcher.go +++ b/watcher/containerprofilewatcher.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "strings" - "time" "github.com/armosec/armoapi-go/apis" "github.com/kubescape/go-logger" @@ -16,87 +15,24 @@ import ( corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/watch" "k8s.io/client-go/kubernetes" - "k8s.io/client-go/tools/pager" ) -// ContainerProfileWatch watches and processes changes on ContainerProfile resources +// ContainerProfileWatch uses the generic resource watcher for ContainerProfiles func (wh *WatchHandler) ContainerProfileWatch(ctx context.Context, workerPool *ants.PoolWithFunc) { - eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second) - cmdCh := make(chan *apis.Command) - errorCh := make(chan error) - apEvents := make(<-chan watch.Event) - - // The watcher is considered unavailable by default - apWatcherUnavailable := make(chan struct{}) - go func() { - apWatcherUnavailable <- struct{}{} - }() - - go wh.HandleContainerProfileEvents(eventQueue, cmdCh, errorCh) - - // notifyWatcherDown notifies the appropriate channel that the watcher - // is down and backs off for the retry interval to not produce - // unnecessary events - notifyWatcherDown := func(watcherDownCh chan<- struct{}) { - go func() { watcherDownCh <- struct{}{} }() - time.Sleep(retryInterval) - } - - // get the initial profiles - if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return wh.storageClient.SpdxV1beta1().ContainerProfiles("").List(ctx, opts) - }).EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { - ap := obj.(*spdxv1beta1.ContainerProfile) - // simulate "add" event - eventQueue.Enqueue(watch.Event{ - Type: watch.Added, - Object: ap, - }) - return nil - }); err != nil { - logger.L().Ctx(ctx).Error("failed to list existing container profiles", helpers.Error(err)) - } - - var watcher watch.Interface - var err error - for { - select { - case apEvent, ok := <-apEvents: - if ok { - eventQueue.Enqueue(apEvent) - } else { - notifyWatcherDown(apWatcherUnavailable) - } - case cmd, ok := <-cmdCh: - if ok { - _ = utils.AddCommandToChannel(ctx, wh.cfg, cmd, workerPool) - } else { - notifyWatcherDown(apWatcherUnavailable) - } - case err, ok := <-errorCh: - if ok { - logger.L().Ctx(ctx).Error("error in ContainerProfileWatch", helpers.Error(err)) - } else { - notifyWatcherDown(apWatcherUnavailable) - } - case <-apWatcherUnavailable: - if watcher != nil { - watcher.Stop() - } - - watcher, err = wh.getContainerProfileWatcher() - if err != nil { - notifyWatcherDown(apWatcherUnavailable) - } else { - apEvents = watcher.ResultChan() - } + GenericResourceWatch[*spdxv1beta1.ContainerProfile](ctx, wh.cfg, workerPool, func(ctx context.Context, opts metav1.ListOptions) ([]*spdxv1beta1.ContainerProfile, string, string, error) { + list, err := wh.storageClient.SpdxV1beta1().ContainerProfiles("").List(ctx, opts) + if err != nil { + return nil, "", "", err } - } - + items := make([]*spdxv1beta1.ContainerProfile, len(list.Items)) + for i := range list.Items { + items[i] = &list.Items[i] + } + return items, list.Continue, list.ResourceVersion, nil + }, wh.HandleContainerProfileEvents) } func (wh *WatchHandler) HandleContainerProfileEvents(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error) { @@ -155,11 +91,6 @@ func (wh *WatchHandler) HandleContainerProfileEvents(eventQueue *CooldownQueue, } } -func (wh *WatchHandler) getContainerProfileWatcher() (watch.Interface, error) { - // no need to support ExcludeNamespaces and IncludeNamespaces since node-agent will respect them as well - return wh.storageClient.SpdxV1beta1().ContainerProfiles("").Watch(context.Background(), metav1.ListOptions{}) -} - func getPod(client kubernetes.Interface, obj *spdxv1beta1.ContainerProfile) (*corev1.Pod, error) { if kind, ok := obj.Labels[helpersv1.KindMetadataKey]; !ok || kind != "Pod" { return nil, nil diff --git a/watcher/genericwatcher.go b/watcher/genericwatcher.go new file mode 100644 index 0000000..e110bae --- /dev/null +++ b/watcher/genericwatcher.go @@ -0,0 +1,75 @@ +package watcher + +import ( + "context" + "time" + + "github.com/armosec/armoapi-go/apis" + "github.com/kubescape/go-logger" + "github.com/kubescape/go-logger/helpers" + "github.com/kubescape/operator/config" + "github.com/kubescape/operator/utils" + "github.com/panjf2000/ants/v2" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/watch" +) + +// ListFunc is a function that lists resources with paging. +type ListFunc[T runtime.Object] func(ctx context.Context, opts metav1.ListOptions) ([]T, string, string, error) + +// IDAndChecksumFunc extracts a unique ID and a checksum/version from a resource. +type IDAndChecksumFunc[T runtime.Object] func(obj T) (id string, checksum string) + +// EventHandlerFunc handles events for a resource. +type EventHandlerFunc[T runtime.Object] func(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error) + +// GenericResourceWatch is a generic periodic watcher for any resource type implementing metav1.Object. +func GenericResourceWatch[T runtime.Object](ctx context.Context, cfg config.IConfig, workerPool *ants.PoolWithFunc, listFunc ListFunc[T], eventHandler EventHandlerFunc[T]) { + eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second) + cmdCh := make(chan *apis.Command) + errorCh := make(chan error) + + go eventHandler(eventQueue, cmdCh, errorCh) + + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + var since string + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + var continueToken string + for { + logger.L().Debug("GenericResourceWatch - listing resources", helpers.String("continueToken", continueToken), helpers.String("since", since)) + items, nextToken, lastUpdated, err := listFunc(ctx, metav1.ListOptions{ + Limit: int64(100), + Continue: continueToken, + ResourceVersion: since, // ensure we only get changes since the last check + }) + if err != nil { + logger.L().Ctx(ctx).Error("GenericResourceWatch - error in listFunc", helpers.Error(err)) + break + } + for _, obj := range items { + // added and modified events are treated the same, so we enqueue a Modified event for both + eventQueue.Enqueue(watch.Event{Type: watch.Modified, Object: obj}) + } + since = lastUpdated + if nextToken == "" { + break + } + continueToken = nextToken + } + case cmd, ok := <-cmdCh: + if ok { + _ = utils.AddCommandToChannel(ctx, cfg, cmd, workerPool) + } + case err, ok := <-errorCh: + if ok { + logger.L().Ctx(ctx).Error("GenericResourceWatch - error from errorCh", helpers.Error(err)) + } + } + } +} diff --git a/watcher/sbomwatcher.go b/watcher/sbomwatcher.go index 6f4bc3d..d36c645 100644 --- a/watcher/sbomwatcher.go +++ b/watcher/sbomwatcher.go @@ -4,7 +4,6 @@ import ( "context" "slices" "strings" - "time" "github.com/armosec/armoapi-go/apis" "github.com/kubescape/go-logger" @@ -13,26 +12,12 @@ import ( "github.com/kubescape/operator/utils" spdxv1beta1 "github.com/kubescape/storage/pkg/apis/softwarecomposition/v1beta1" "github.com/panjf2000/ants/v2" - corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/watch" - "k8s.io/client-go/tools/pager" ) -// SBOMWatch watches and processes changes on SBOMs +// SBOMWatch uses the generic resource watcher for SBOMSyft resources func (wh *WatchHandler) SBOMWatch(ctx context.Context, workerPool *ants.PoolWithFunc) { - eventQueue := NewCooldownQueueWithParams(15*time.Second, 1*time.Second) - cmdCh := make(chan *apis.Command) - errorCh := make(chan error) - sbomEvents := make(<-chan watch.Event) - - // The watcher is considered unavailable by default - sbomWatcherUnavailable := make(chan struct{}) - go func() { - sbomWatcherUnavailable <- struct{}{} - }() - // SBOM watcher needs pods to build a map of : set of watchOpts := metav1.ListOptions{ Watch: true, @@ -50,88 +35,17 @@ func (wh *WatchHandler) SBOMWatch(ctx context.Context, workerPool *ants.PoolWith go wh.watchRetry(ctx, watchOpts) } - // start watching SBOMs - go wh.HandleSBOMEvents(eventQueue, cmdCh, errorCh) - - // notifyWatcherDown notifies the appropriate channel that the watcher - // is down and backs off for the retry interval to not produce - // unnecessary events - notifyWatcherDown := func(watcherDownCh chan<- struct{}) { - go func() { watcherDownCh <- struct{}{} }() - time.Sleep(retryInterval) - } - - // get the initial SBOMs - if err := pager.New(func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) { - return wh.storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, opts) - }).EachListItem(ctx, metav1.ListOptions{}, func(obj runtime.Object) error { - sbom := obj.(*spdxv1beta1.SBOMSyft) - // simulate "add" event - eventQueue.Enqueue(watch.Event{ - Type: watch.Added, - Object: sbom, - }) - return nil - }); err != nil { - logger.L().Ctx(ctx).Error("failed to list existing SBOMs", helpers.Error(err)) - } - - var watcher watch.Interface - for { - select { - // FIXME select processes the events randomly, so we might see the SBOM event before the pod event - case event := <-wh.eventQueue.ResultChan: // this is the event queue for pods - // skip non-pod objects - pod, ok := event.Object.(*corev1.Pod) - if !ok { - continue - } - wlid, err := utils.GetParentIDForPod(wh.k8sAPI, pod, wh.cfg.ClusterName()) - if err != nil { - logger.L().Ctx(ctx).Error("failed to get wlid for pod", helpers.Error(err), helpers.String("pod", pod.Name), helpers.String("namespace", pod.Namespace)) - continue - } - containerStatuses := slices.Concat(pod.Status.ContainerStatuses, pod.Status.InitContainerStatuses, pod.Status.EphemeralContainerStatuses) - for _, containerStatus := range containerStatuses { - hash := hashFromImageID(containerStatus.ImageID) - wh.ImageToContainerData.Set(hash, utils.ContainerData{ - ContainerName: containerStatus.Name, - Wlid: wlid, - }) - } - case sbomEvent, ok := <-sbomEvents: - if ok { - eventQueue.Enqueue(sbomEvent) - } else { - notifyWatcherDown(sbomWatcherUnavailable) - } - case cmd, ok := <-cmdCh: - if ok { - _ = utils.AddCommandToChannel(ctx, wh.cfg, cmd, workerPool) - } else { - notifyWatcherDown(sbomWatcherUnavailable) - } - case err, ok := <-errorCh: - if ok { - logger.L().Ctx(ctx).Error("error in SBOMWatch", helpers.Error(err)) - } else { - notifyWatcherDown(sbomWatcherUnavailable) - } - case <-sbomWatcherUnavailable: - if watcher != nil { - watcher.Stop() - } - - var err error - watcher, err = wh.getSBOMWatcher() - if err != nil { - notifyWatcherDown(sbomWatcherUnavailable) - } else { - sbomEvents = watcher.ResultChan() - } + GenericResourceWatch[*spdxv1beta1.SBOMSyft](ctx, wh.cfg, workerPool, func(ctx context.Context, opts metav1.ListOptions) ([]*spdxv1beta1.SBOMSyft, string, string, error) { + list, err := wh.storageClient.SpdxV1beta1().SBOMSyfts("").List(ctx, opts) + if err != nil { + return nil, "", "", err } - } - + items := make([]*spdxv1beta1.SBOMSyft, len(list.Items)) + for i := range list.Items { + items[i] = &list.Items[i] + } + return items, list.Continue, list.ResourceVersion, nil + }, wh.HandleSBOMEvents) } func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedCommands chan<- *apis.Command, errorCh chan<- error) { @@ -191,11 +105,6 @@ func (wh *WatchHandler) HandleSBOMEvents(eventQueue *CooldownQueue, producedComm } } -func (wh *WatchHandler) getSBOMWatcher() (watch.Interface, error) { - // no need to support ExcludeNamespaces and IncludeNamespaces since node-agent will respect them as well - return wh.storageClient.SpdxV1beta1().SBOMSyfts("").Watch(context.Background(), metav1.ListOptions{}) -} - func hashFromImageID(imageID string) string { s := strings.Split(imageID, ":") return s[len(s)-1]