Skip to content
299 changes: 46 additions & 253 deletions internal/mirror/cmd/pull/pull.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"syscall"
"time"

"github.com/Masterminds/semver/v3"
"github.com/google/go-containerregistry/pkg/authn"
"github.com/hashicorp/go-multierror"
"github.com/samber/lo"
Expand All @@ -44,10 +43,8 @@ import (
"github.com/deckhouse/deckhouse-cli/internal/mirror"
pullflags "github.com/deckhouse/deckhouse-cli/internal/mirror/cmd/pull/flags"
"github.com/deckhouse/deckhouse-cli/internal/mirror/gostsums"
"github.com/deckhouse/deckhouse-cli/internal/mirror/operations"
"github.com/deckhouse/deckhouse-cli/internal/mirror/releases"
"github.com/deckhouse/deckhouse-cli/internal/mirror/modules"
"github.com/deckhouse/deckhouse-cli/internal/version"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/modules"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/operations/params"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/util/log"
"github.com/deckhouse/deckhouse-cli/pkg/libmirror/validation"
Expand Down Expand Up @@ -135,24 +132,6 @@ func setupLogger() *log.SLogger {
return log.NewSLogger(logLevel)
}

func findTagsToMirror(pullParams *params.PullParams, logger *log.SLogger, client registry.Client) ([]string, []string, error) {
strickTags := []string{}
if pullParams.DeckhouseTag != "" {
strickTags = append(strickTags, pullParams.DeckhouseTag)
}

versionsToMirror, channelsToMirror, err := versionsToMirrorFunc(pullParams, client, strickTags)
if err != nil {
return nil, nil, fmt.Errorf("Find versions to mirror: %w", err)
}

logger.Infof("Deckhouse releases to pull: %+v", versionsToMirror)

return lo.Map(versionsToMirror, func(v semver.Version, _ int) string {
return "v" + v.String()
}), channelsToMirror, nil
}

func buildPullParams(logger params.Logger) *params.PullParams {
mirrorCtx := &params.PullParams{
BaseParams: params.BaseParams{
Expand Down Expand Up @@ -212,9 +191,6 @@ func lastPullWasTooLongAgoToRetry(pullParams *params.PullParams) bool {
return time.Since(s.ModTime()) > 24*time.Hour
}

// versionsToMirrorFunc allows mocking releases.VersionsToMirror in tests
var versionsToMirrorFunc = releases.VersionsToMirror

// Puller encapsulates the logic for pulling Deckhouse components
type Puller struct {
cmd *cobra.Command
Expand Down Expand Up @@ -246,108 +222,6 @@ func (p *Puller) Execute(ctx context.Context) error {
return err
}

if os.Getenv("NEW_PULL") == "true" {
logger := dkplog.NewNop()

if log.DebugLogLevel() >= 3 {
logger = dkplog.NewLogger(dkplog.WithLevel(slog.LevelDebug))
}

// Create registry client for module operations
clientOpts := &regclient.Options{
Insecure: p.params.Insecure,
TLSSkipVerify: p.params.SkipTLSVerification,
Logger: logger,
}

if p.params.RegistryAuth != nil {
clientOpts.Auth = p.params.RegistryAuth
}

var c registry.Client
c = regclient.NewClientWithOptions(p.params.DeckhouseRegistryRepo, clientOpts)

if os.Getenv("STUB_REGISTRY_CLIENT") == "true" {
c = stub.NewRegistryClientStub()
}

// Scope to the registry path and modules suffix
if p.params.RegistryPath != "" {
c = c.WithSegment(p.params.RegistryPath)
}

// Create module filter from CLI flags
filter, err := p.createModuleFilter()
if err != nil {
return err
}

svc := mirror.NewPullService(
registryservice.NewService(c, logger),
pullflags.TempDir,
pullflags.DeckhouseTag,
&mirror.PullServiceOptions{
SkipPlatform: pullflags.NoPlatform,
SkipSecurity: pullflags.NoSecurityDB,
SkipModules: pullflags.NoModules,
OnlyExtraImages: pullflags.OnlyExtraImages,
IgnoreSuspend: pullflags.IgnoreSuspend,
ModuleFilter: filter,
BundleDir: pullflags.ImagesBundlePath,
BundleChunkSize: pullflags.ImagesBundleChunkSizeGB * 1000 * 1000 * 1000,
},
logger.Named("pull"),
p.logger,
)

err = svc.Pull(ctx)
if err != nil {
// Handle context cancellation gracefully
if errors.Is(err, context.Canceled) {
p.logger.WarnLn("Operation cancelled by user")
return nil
}
return fmt.Errorf("pull from registry: %w", err)
}

return nil
}

if err := p.pullPlatform(); err != nil {
return err
}

if err := p.pullSecurityDatabases(); err != nil {
return err
}

if err := p.pullModules(); err != nil {
return err
}

if err := p.computeGOSTDigests(); err != nil {
return err
}

return p.finalCleanup()
}

// cleanupWorkingDirectory handles cleanup of the working directory if needed
func (p *Puller) cleanupWorkingDirectory() error {
if pullflags.NoPullResume || lastPullWasTooLongAgoToRetry(p.params) {
if err := os.RemoveAll(p.params.WorkingDir); err != nil {
return fmt.Errorf("Cleanup last unfinished pull data: %w", err)
}
}
return nil
}

// pullPlatform pulls the Deckhouse platform components
func (p *Puller) pullPlatform() error {
if p.params.SkipPlatform {
return nil
}

logger := dkplog.NewNop()

if log.DebugLogLevel() >= 3 {
Expand Down Expand Up @@ -377,22 +251,55 @@ func (p *Puller) pullPlatform() error {
c = c.WithSegment(p.params.RegistryPath)
}

return p.logger.Process("Pull Deckhouse Kubernetes Platform", func() error {
if err := p.validatePlatformAccess(); err != nil {
return err
}
// Create module filter from CLI flags
filter, err := p.createModuleFilter()
if err != nil {
return err
}

tagsToMirror, channelsToMirror, err := findTagsToMirror(p.params, p.logger, c)
if err != nil {
return fmt.Errorf("Find tags to mirror: %w", err)
}
svc := mirror.NewPullService(
registryservice.NewService(c, logger),
pullflags.TempDir,
pullflags.DeckhouseTag,
&mirror.PullServiceOptions{
SkipPlatform: pullflags.NoPlatform,
SkipSecurity: pullflags.NoSecurityDB,
SkipModules: pullflags.NoModules,
OnlyExtraImages: pullflags.OnlyExtraImages,
IgnoreSuspend: pullflags.IgnoreSuspend,
ModuleFilter: filter,
BundleDir: pullflags.ImagesBundlePath,
BundleChunkSize: pullflags.ImagesBundleChunkSizeGB * 1000 * 1000 * 1000,
},
logger.Named("pull"),
p.logger,
)

if err = operations.PullDeckhousePlatform(p.params, channelsToMirror, tagsToMirror, c); err != nil {
return err
err = svc.Pull(ctx)
if err != nil {
// Handle context cancellation gracefully
if errors.Is(err, context.Canceled) {
p.logger.WarnLn("Operation cancelled by user")
return nil
}
return fmt.Errorf("pull from registry: %w", err)
}

return nil
})
if err := p.computeGOSTDigests(); err != nil {
return err
}

return p.finalCleanup()
}

// cleanupWorkingDirectory handles cleanup of the working directory if needed
func (p *Puller) cleanupWorkingDirectory() error {
if pullflags.NoPullResume || lastPullWasTooLongAgoToRetry(p.params) {
if err := os.RemoveAll(p.params.WorkingDir); err != nil {
return fmt.Errorf("Cleanup last unfinished pull data: %w", err)
}
}
return nil
}

// validatePlatformAccess validates access to the platform registry
Expand Down Expand Up @@ -426,120 +333,6 @@ func (p *Puller) validatePlatformAccess() error {
return nil
}

// pullSecurityDatabases pulls the security databases
func (p *Puller) pullSecurityDatabases() error {
if p.params.SkipSecurityDatabases {
return nil
}

logger := dkplog.NewNop()

if log.DebugLogLevel() >= 3 {
logger = dkplog.NewLogger(dkplog.WithLevel(slog.LevelDebug))
}

// Create registry client for module operations
clientOpts := &regclient.Options{
Insecure: p.params.Insecure,
TLSSkipVerify: p.params.SkipTLSVerification,
Logger: logger,
}

if p.params.RegistryAuth != nil {
clientOpts.Auth = p.params.RegistryAuth
}

var c registry.Client
c = regclient.NewClientWithOptions(p.params.DeckhouseRegistryRepo, clientOpts)

if os.Getenv("STUB_REGISTRY_CLIENT") == "true" {
c = stub.NewRegistryClientStub()
}

// Scope to the registry path and modules suffix
if p.params.RegistryPath != "" {
c = c.WithSegment(p.params.RegistryPath)
}

return p.logger.Process("Pull Security Databases", func() error {
ctx, cancel := context.WithTimeout(p.cmd.Context(), 15*time.Second)
defer cancel()

imageRef := p.params.DeckhouseRegistryRepo + "/security/trivy-db:2"
err := p.accessValidator.ValidateReadAccessForImage(ctx, imageRef, p.validationOpts...)
switch {
case errors.Is(err, validation.ErrImageUnavailable):
p.logger.Warnf("Skipping pull of security databases: %v", err)
return nil
case err != nil:
return fmt.Errorf("Source registry is not accessible: %w", err)
}

if err := operations.PullSecurityDatabases(p.params, c); err != nil {
return err
}
return nil
})
}

// pullModules pulls the Deckhouse modules
func (p *Puller) pullModules() error {
if p.params.SkipModules && !p.params.OnlyExtraImages {
return nil
}

processName := "Pull Modules"
if p.params.OnlyExtraImages {
processName = "Pull Extra Images"
}

logger := dkplog.NewNop()

if log.DebugLogLevel() >= 3 {
logger = dkplog.NewLogger(dkplog.WithLevel(slog.LevelDebug))
}

// Create registry client for module operations
clientOpts := &regclient.Options{
Insecure: p.params.Insecure,
TLSSkipVerify: p.params.SkipTLSVerification,
Logger: logger,
}

if p.params.RegistryAuth != nil {
clientOpts.Auth = p.params.RegistryAuth
}

var c registry.Client
c = regclient.NewClientWithOptions(p.params.DeckhouseRegistryRepo, clientOpts)

if os.Getenv("STUB_REGISTRY_CLIENT") == "true" {
c = stub.NewRegistryClientStub()
}

// Scope to the registry path and modules suffix
if p.params.RegistryPath != "" {
c = c.WithSegment(p.params.RegistryPath)
}

if p.params.ModulesPathSuffix != "" {
c = c.WithSegment(p.params.ModulesPathSuffix)
}

return p.logger.Process(processName, func() error {
if err := p.validateModulesAccess(); err != nil {
return err
}

filter, err := p.createModuleFilter()
if err != nil {
return err
}

return operations.PullModules(p.params, filter, c)
})
}

// validateModulesAccess validates access to the modules registry
func (p *Puller) validateModulesAccess() error {
modulesRepo := path.Join(p.params.DeckhouseRegistryRepo, p.params.ModulesPathSuffix)
Expand Down
Loading