From a22b3b3b01596430cea0d5e4f877999f52134713 Mon Sep 17 00:00:00 2001 From: Konstantin Kozoriz Date: Thu, 19 Feb 2026 15:49:36 +0300 Subject: [PATCH 1/3] refactor: add SafeClient.SetProbeEndpoint and Copy methods Signed-off-by: Konstantin Kozoriz --- pkg/libsaferequest/client/http.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/libsaferequest/client/http.go b/pkg/libsaferequest/client/http.go index c63e49a7..a448aaf2 100644 --- a/pkg/libsaferequest/client/http.go +++ b/pkg/libsaferequest/client/http.go @@ -6,6 +6,7 @@ import ( "errors" "fmt" "net/http" + "time" "github.com/spf13/pflag" apiruntime "k8s.io/apimachinery/pkg/runtime" @@ -57,6 +58,13 @@ func NewSafeClient(flags ...*pflag.FlagSet) (*SafeClient, error) { return &SafeClient{restConfig}, nil } +// SetProbeEndpoint configures host, TLS ServerName and timeout for probe requests. +func (c *SafeClient) SetProbeEndpoint(timeout time.Duration, targetHost, kubeServiceServerName string) { + c.restConfig.Host = targetHost + c.restConfig.TLSClientConfig.ServerName = kubeServiceServerName + c.restConfig.Timeout = timeout +} + func (c *SafeClient) HTTPDo(req *http.Request) (*http.Response, error) { if len(req.Header.Get("Authorization")) != 0 { httpClient, err := rest.HTTPClientFor(c.restConfig) @@ -132,7 +140,7 @@ func (c *SafeClient) HTTPDo(req *http.Request) (*http.Response, error) { return nil, errors.New("No auth") } -func (c *SafeClient) NewRTClient(schemeFuncs ...(func(s *apiruntime.Scheme) error)) (ctrlrtclient.Client, error) { +func (c *SafeClient) NewRTClient(schemeFuncs ...func(s *apiruntime.Scheme) error) (ctrlrtclient.Client, error) { if c.restConfig == nil { return nil, fmt.Errorf("No rest config") } From 18db7bbd789f76ef1f2b7299131467bf07338c10 Mon Sep 17 00:00:00 2001 From: Konstantin Kozoriz Date: Thu, 19 Feb 2026 15:49:55 +0300 Subject: [PATCH 2/3] feat: auto-detect --publish flag via kubernetes service probe When --publish is not explicitly set, detect whether the client is inside the target cluster by probing the ClusterIP of the default/kubernetes service and comparing UIDs. Signed-off-by: Konstantin Kozoriz --- internal/data/dataexport/cmd/create/create.go | 15 +- .../data/dataexport/cmd/download/download.go | 11 +- internal/data/dataexport/cmd/list/list.go | 12 +- internal/data/dataexport/util/util.go | 50 ++++- internal/data/dataimport/cmd/create/create.go | 11 +- internal/data/dataimport/cmd/upload/upload.go | 18 +- .../dataimport/cmd/upload/upload_windows.go | 21 +- internal/data/dataimport/util/util.go | 50 ++++- internal/data/publish.go | 50 +++++ internal/data/publish_detect.go | 197 ++++++++++++++++++ 10 files changed, 423 insertions(+), 12 deletions(-) create mode 100644 internal/data/publish.go create mode 100644 internal/data/publish_detect.go diff --git a/internal/data/dataexport/cmd/create/create.go b/internal/data/dataexport/cmd/create/create.go index b01ec8dc..6eccb9ce 100644 --- a/internal/data/dataexport/cmd/create/create.go +++ b/internal/data/dataexport/cmd/create/create.go @@ -99,7 +99,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin defer cancel() namespace, _ := cmd.Flags().GetString("namespace") ttl, _ := cmd.Flags().GetString("ttl") - publish, _ := cmd.Flags().GetBool("publish") deName, volumeKind, volumeName, err := parseArgs(args) if err != nil { @@ -107,11 +106,21 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin } flags := cmd.PersistentFlags() - safeClient, err := safeClient.NewSafeClient(flags) + sc, err := safeClient.NewSafeClient(flags) if err != nil { return err } - rtClient, err := safeClient.NewRTClient(v1alpha1.AddToScheme) + rtClient, err := sc.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sc, log) if err != nil { return err } diff --git a/internal/data/dataexport/cmd/download/download.go b/internal/data/dataexport/cmd/download/download.go index 412272ca..0e027bf7 100644 --- a/internal/data/dataexport/cmd/download/download.go +++ b/internal/data/dataexport/cmd/download/download.go @@ -224,7 +224,6 @@ func recursiveDownload(ctx context.Context, sClient *safeClient.SafeClient, log func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []string) error { namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("output") - publish, _ := cmd.Flags().GetBool("publish") ttl, _ := cmd.Flags().GetString("ttl") dataName, srcPath, err := dataio.ParseArgs(args) @@ -243,6 +242,16 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin return err } + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sClient, log) + if err != nil { + return err + } + deName, err := util.CreateDataExporterIfNeededFunc(ctx, log, dataName, namespace, publish, ttl, rtClient) if err != nil { return err diff --git a/internal/data/dataexport/cmd/list/list.go b/internal/data/dataexport/cmd/list/list.go index 165335a6..3e5de98e 100644 --- a/internal/data/dataexport/cmd/list/list.go +++ b/internal/data/dataexport/cmd/list/list.go @@ -162,7 +162,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin defer cancel() namespace, _ := cmd.Flags().GetString("namespace") - publish, _ := cmd.Flags().GetBool("publish") ttl, _ := cmd.Flags().GetString("ttl") dataName, srcPath, err := parseArgs(args) @@ -181,6 +180,17 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin if err != nil { return err } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sClient, log) + if err != nil { + return err + } + deName, err := util.CreateDataExporterIfNeededFunc(ctx, log, dataName, namespace, publish, ttl, rtClient) if err != nil { return err diff --git a/internal/data/dataexport/util/util.go b/internal/data/dataexport/util/util.go index f10a4898..4ea23e26 100644 --- a/internal/data/dataexport/util/util.go +++ b/internal/data/dataexport/util/util.go @@ -65,8 +65,9 @@ func GetDataExport(ctx context.Context, deName, namespace string, rtClient ctrlr return deObj, nil } -func GetDataExportWithRestart(ctx context.Context, deName, namespace string, rtClient ctrlrtclient.Client) (*v1alpha1.DataExport, error) { +func GetDataExportWithRestart(ctx context.Context, deName, namespace string, publish bool, rtClient ctrlrtclient.Client) (*v1alpha1.DataExport, error) { deObj := &v1alpha1.DataExport{} + publishReconciled := false for i := 0; ; i++ { var returnErr error @@ -77,6 +78,19 @@ func GetDataExportWithRestart(ctx context.Context, deName, namespace string, rtC return nil, fmt.Errorf("kube Get dataexport with restart: %s", err.Error()) } + // On the first iteration, reconcile Spec.Publish with the resolved value. + // If the object was patched, restart the loop to pick up the updated status. + if !publishReconciled { + patched, err := EnsureDataExportPublish(ctx, deObj, publish, rtClient) + if err != nil { + return nil, err + } + publishReconciled = true + if patched { + continue + } + } + for _, condition := range deObj.Status.Conditions { // restart DataExport if Expired if condition.Type == "Expired" { @@ -230,7 +244,7 @@ func getExportStatus(ctx context.Context, log *slog.Logger, deName, namespace st var podURL, volumeMode, internalCAData string log.Info("Waiting for DataExport to be ready", slog.String("name", deName), slog.String("namespace", namespace)) - deObj, err := GetDataExportWithRestart(ctx, deName, namespace, rtClient) + deObj, err := GetDataExportWithRestart(ctx, deName, namespace, public, rtClient) if err != nil { return "", "", "", err } @@ -307,3 +321,35 @@ func PrepareDownload(ctx context.Context, log *slog.Logger, deName, namespace st return url, volumeMode, subClient, nil } + +// EnsureDataExportPublish patches DataExport.Spec.Publish to match the resolved value. +// Only upgrades publish: false -> true is patched, true -> false is intentionally skipped +// to avoid downgrading already-published resources. +// Returns (true, nil) if the object was patched and the caller should re-read it. +func EnsureDataExportPublish( + ctx context.Context, + deObj *v1alpha1.DataExport, + publish bool, + rtClient ctrlrtclient.Client, +) (bool, error) { + if !publish { + return false, nil + } + + if deObj == nil { + return false, fmt.Errorf("nil DataExport object") + } + + if deObj.Spec.Publish == publish { + return false, nil + } + + patch := ctrlrtclient.MergeFrom(deObj.DeepCopy()) + deObj.Spec.Publish = publish + + if err := rtClient.Patch(ctx, deObj, patch); err != nil { + return false, fmt.Errorf("patch DataExport publish: %w", err) + } + + return true, nil +} diff --git a/internal/data/dataimport/cmd/create/create.go b/internal/data/dataimport/cmd/create/create.go index 38002832..d960c3b6 100644 --- a/internal/data/dataimport/cmd/create/create.go +++ b/internal/data/dataimport/cmd/create/create.go @@ -77,7 +77,6 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin name := args[0] namespace, _ := cmd.Flags().GetString("namespace") ttl, _ := cmd.Flags().GetString("ttl") - publish, _ := cmd.Flags().GetBool("publish") pvcFilePath, _ := cmd.Flags().GetString("file") wffc, _ := cmd.Flags().GetBool("wffc") @@ -109,6 +108,16 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin namespace = pvcSpec.Namespace } + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, sc, log) + if err != nil { + return err + } + if err := util.CreateDataImport(ctx, name, namespace, ttl, publish, wffc, pvcSpec, rtClient); err != nil { return err } diff --git a/internal/data/dataimport/cmd/upload/upload.go b/internal/data/dataimport/cmd/upload/upload.go index 89a1bc3e..a92245de 100644 --- a/internal/data/dataimport/cmd/upload/upload.go +++ b/internal/data/dataimport/cmd/upload/upload.go @@ -17,6 +17,7 @@ import ( "github.com/spf13/cobra" dataio "github.com/deckhouse/deckhouse-cli/internal/data" + v1alpha1 "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/api/v1alpha1" "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/util" client "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" ) @@ -65,7 +66,6 @@ func cmdExamples() string { func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []string) error { pathToFile, _ := cmd.Flags().GetString("file") chunks, _ := cmd.Flags().GetInt("chunks") - publish, _ := cmd.Flags().GetBool("publish") namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("dstPath") resume, _ := cmd.Flags().GetBool("resume") @@ -83,6 +83,22 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin log.Info("Run") + // Create runtime client for publish auto-detection and reconciliation. + rtClient, err := httpClient.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, httpClient, log) + if err != nil { + return err + } + permOctal := defaultFilePermissions uid := os.Getuid() gid := os.Getgid() diff --git a/internal/data/dataimport/cmd/upload/upload_windows.go b/internal/data/dataimport/cmd/upload/upload_windows.go index 04c590b4..28dc0d2e 100644 --- a/internal/data/dataimport/cmd/upload/upload_windows.go +++ b/internal/data/dataimport/cmd/upload/upload_windows.go @@ -13,10 +13,12 @@ import ( "strconv" "strings" + "github.com/spf13/cobra" + dataio "github.com/deckhouse/deckhouse-cli/internal/data" + v1alpha1 "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/api/v1alpha1" "github.com/deckhouse/deckhouse-cli/internal/data/dataimport/util" client "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" - "github.com/spf13/cobra" ) const ( @@ -63,7 +65,6 @@ func cmdExamples() string { func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []string) error { pathToFile, _ := cmd.Flags().GetString("file") chunks, _ := cmd.Flags().GetInt("chunks") - publish, _ := cmd.Flags().GetBool("publish") namespace, _ := cmd.Flags().GetString("namespace") dstPath, _ := cmd.Flags().GetString("dstPath") resume, _ := cmd.Flags().GetBool("resume") @@ -91,6 +92,22 @@ func Run(ctx context.Context, log *slog.Logger, cmd *cobra.Command, args []strin } } + // Create runtime client for publish auto-detection and reconciliation. + rtClient, err := httpClient.NewRTClient(v1alpha1.AddToScheme) + if err != nil { + return err + } + + publishFlag, err := dataio.ParsePublishFlag(cmd.Flags()) + if err != nil { + return err + } + + publish, err := dataio.ResolvePublish(ctx, publishFlag, rtClient, httpClient, log) + if err != nil { + return err + } + podUrl, _, subClient, err := util.PrepareUpload(ctx, log, diName, namespace, publish, httpClient) if err != nil { return err diff --git a/internal/data/dataimport/util/util.go b/internal/data/dataimport/util/util.go index 039e2400..85a533e1 100644 --- a/internal/data/dataimport/util/util.go +++ b/internal/data/dataimport/util/util.go @@ -95,7 +95,10 @@ func GetDataImportWithRestart( ctx context.Context, diName, namespace string, rtClient ctrlrtclient.Client, + publish bool, ) (*v1alpha1.DataImport, error) { + publishReconciled := false + for i := 0; ; i++ { if err := ctx.Err(); err != nil { return nil, err @@ -106,6 +109,19 @@ func GetDataImportWithRestart( return nil, fmt.Errorf("kube Get dataimport with ready: %s", err.Error()) } + // On the first iteration, reconcile Spec.Publish with the resolved value. + // If the object was patched, restart the loop to pick up the updated status. + if !publishReconciled { + patched, err := EnsureDataImportPublish(ctx, diObj, publish, rtClient) + if err != nil { + return nil, err + } + publishReconciled = true + if patched { + continue + } + } + var notReadyErr error for _, condition := range diObj.Status.Conditions { if condition.Type == "Expired" && condition.Status == "True" { @@ -176,7 +192,7 @@ func PrepareUpload( return "", "", nil, err } - diObj, err := GetDataImportWithRestart(ctx, diName, namespace, rtClient) + diObj, err := GetDataImportWithRestart(ctx, diName, namespace, rtClient, publish) if err != nil { return "", "", nil, err } @@ -225,6 +241,38 @@ func PrepareUpload( return url, volumeMode, subClient, nil } +// EnsureDataImportPublish patches DataImport.Spec.Publish to match the resolved value. +// Only upgrades publish: false -> true is patched, true -> false is intentionally skipped +// to avoid downgrading already-published resources. +// Returns (true, nil) if the object was patched and the caller should re-read it. +func EnsureDataImportPublish( + ctx context.Context, + diObj *v1alpha1.DataImport, + publish bool, + rtClient ctrlrtclient.Client, +) (bool, error) { + if !publish { + return false, nil + } + + if diObj == nil { + return false, fmt.Errorf("nil DataImport") + } + + if diObj.Spec.Publish == publish { + return false, nil + } + + patch := ctrlrtclient.MergeFrom(diObj.DeepCopy()) + diObj.Spec.Publish = publish + + if err := rtClient.Patch(ctx, diObj, patch); err != nil { + return false, fmt.Errorf("patch DataImport publish: %w", err) + } + + return true, nil +} + func CheckUploadProgress(ctx context.Context, httpClient *safeClient.SafeClient, targetURL string) (int64, error) { req, err := http.NewRequest(http.MethodHead, targetURL, nil) if err != nil { diff --git a/internal/data/publish.go b/internal/data/publish.go new file mode 100644 index 00000000..0ef81353 --- /dev/null +++ b/internal/data/publish.go @@ -0,0 +1,50 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "fmt" + + "github.com/spf13/pflag" +) + +// PublishFlag represents the three-state publish flag: +// - Explicit=true, Value=true: user explicitly requested public (published) access +// - Explicit=true, Value=false: user explicitly requested internal (in-cluster) access +// - Explicit=false: auto-detect mode (Value is meaningless) +type PublishFlag struct { + Explicit bool + Value bool +} + +// ParsePublishFlag reads --publish as a three-state value. +// Explicit is true only when user provided --publish/--publish=true/--publish=false. +func ParsePublishFlag(flags *pflag.FlagSet) (PublishFlag, error) { + if flags == nil { + return PublishFlag{}, fmt.Errorf("publish flag parse: nil flag set") + } + + value, err := flags.GetBool("publish") + if err != nil { + return PublishFlag{}, fmt.Errorf("publish flag parse: %w", err) + } + + return PublishFlag{ + Explicit: flags.Changed("publish"), + Value: value, + }, nil +} diff --git a/internal/data/publish_detect.go b/internal/data/publish_detect.go new file mode 100644 index 00000000..79340a74 --- /dev/null +++ b/internal/data/publish_detect.go @@ -0,0 +1,197 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "context" + "errors" + "log/slog" + "net" + "time" + + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/types" + ctrlrtclient "sigs.k8s.io/controller-runtime/pkg/client" + + safeClient "github.com/deckhouse/deckhouse-cli/pkg/libsaferequest/client" +) + +const ( + kubeServiceNamespace = "default" + kubeServiceName = "kubernetes" + kubeServiceServerName = "kubernetes.default.svc" + ProbeTimeout = 3 * time.Second +) + +var ErrAutoDetectWithHint = errors.New("cannot auto-detect publish mode, specify --publish=true or --publish=false") + +// ResolvePublish returns explicit publish value if user set the flag, +// otherwise runs autodetection. +func ResolvePublish( + ctx context.Context, + publishFlag PublishFlag, + rtClient ctrlrtclient.Client, + sClient *safeClient.SafeClient, + log *slog.Logger, +) (bool, error) { + if log == nil { + log = slog.Default() + } + + if publishFlag.Explicit { + // User set the flag, return value without autodetection. + log.Info("Using explicit publish mode", slog.Bool("publish", publishFlag.Value)) + return publishFlag.Value, nil + } + + // User didn't set the flag, run autodetection. + log.Info("Auto-detecting publish mode") + return DetectPublish(ctx, rtClient, sClient, log) +} + +// DetectPublish decides default publish mode when user did not set --publish. +// +// Detection strategy: +// 1. Read Service default/kubernetes via the normal kubeconfig endpoint. +// 2. Read the same Service via https://:443 with ServerName override. +// 3. Compare UIDs of both objects. +// +// Decision matrix: +// - same UID: internal path is reachable -> publish=false +// - UID mismatch: ClusterIP reached a different cluster (e.g. local minikube/kind) -> publish=true +// - network-unreachable on probe: internal path is not reachable -> publish=true +// - any other probe error: ambiguous -> fail fast with hint +func DetectPublish( + ctx context.Context, + rtClient ctrlrtclient.Client, + sClient *safeClient.SafeClient, + log *slog.Logger, +) (bool, error) { + if log == nil { + log = slog.Default() + } + + firstSvc, err := getKubeService(ctx, rtClient) + if err != nil { + return false, ErrAutoDetectWithHint + } + + targetURL := "https://" + net.JoinHostPort(firstSvc.Spec.ClusterIP, "443") + + // Clone the original client to avoid mutating command-wide kubeconfig settings. + // Keep auth/CA from kubeconfig, but switch endpoint to ClusterIP and set ServerName + // so TLS validation uses service DNS name instead of raw IP. + probeClient := sClient.Copy() + // Timeout in restConfig.Timeout is required in addition to context.WithTimeout below: + // context limits Go-level read/write, but restConfig.Timeout sets http.Client.Timeout + // which also covers TLS handshake and DNS resolve. Without it the HTTP client inherits + // the default kubeconfig timeout (typically 30s). + probeClient.SetProbeEndpoint(ProbeTimeout, targetURL, kubeServiceServerName) + probeRtClient, err := probeClient.NewRTClient() + + if err != nil { + return false, ErrAutoDetectWithHint + } + + // Probe timeout limits only autodetect latency + // main command context stays unchanged. + probeCtx, cancel := context.WithTimeout(ctx, ProbeTimeout) + defer cancel() + + secondSvc, err := getKubeService(probeCtx, probeRtClient) + if err != nil { + // Network-level failure means in-cluster endpoint is not reachable + // from current environment. + if isNetworkUnreachable(err) { + log.Info("Publish autodetect: internal endpoint is unreachable, selecting publish=true") + return true, nil + } + // TLS/auth/RBAC/other errors are ambiguous. + return false, ErrAutoDetectWithHint + } + + // UID mismatch: both endpoints responded but belong to different clusters. + // Typical case: a local cluster (minikube, kind) has the same ClusterIP + // as the remote target cluster. The user is not inside the target cluster. + if firstSvc.UID != secondSvc.UID { + log.Info("Publish autodetect: UID mismatch between external and internal endpoints, selecting publish=true") + return true, nil + } + + // Same service identity via both paths -> internal endpoint is reachable. + log.Info("Publish autodetect: internal endpoint is reachable, selecting publish=false") + return false, nil +} + +// isNetworkUnreachable classifies transport-level failures that indicate +// the in-cluster endpoint is not reachable from the current environment. +// +// Returns true for errors that clearly mean "no network path to ClusterIP": +// - context.DeadlineExceeded: probe timed out waiting for any response +// - net.OpError: low-level socket failures (EHOSTUNREACH, ENETUNREACH, +// ECONNREFUSED, ETIMEDOUT, etc.) - all indicate the ClusterIP is not +// routable from here. +// - net.DNSError: DNS resolution failed for the target host +// - net.Error with Timeout(): any other network-level timeout +// +// Returns false for: +// - nil: no error +// - context.Canceled: deliberate cancellation, not a network issue +// - everything else (TLS, RBAC, HTTP-level errors): the endpoint is +// reachable but rejected the request - ambiguous for autodetect +func isNetworkUnreachable(err error) bool { + if err == nil { + return false + } + + if errors.Is(err, context.DeadlineExceeded) { + return true + } + + if errors.Is(err, context.Canceled) { + return false + } + + var opErr *net.OpError + if errors.As(err, &opErr) { + return true + } + + var dnsErr *net.DNSError + if errors.As(err, &dnsErr) { + return true + } + + var netErr net.Error + if errors.As(err, &netErr) && netErr.Timeout() { + return true + } + + return false +} + +func getKubeService(ctx context.Context, rtClient ctrlrtclient.Client) (*corev1.Service, error) { + var svc corev1.Service + if err := rtClient.Get(ctx, types.NamespacedName{ + Name: kubeServiceName, + Namespace: kubeServiceNamespace, + }, &svc); err != nil { + return nil, err + } + + return &svc, nil +} From 63439f76fcbca69aff275a55ed05272e69651eee Mon Sep 17 00:00:00 2001 From: Konstantin Kozoriz Date: Thu, 19 Feb 2026 15:50:27 +0300 Subject: [PATCH 3/3] test: change and add unit tests for publish autodetect Signed-off-by: Konstantin Kozoriz --- .../cmd/download/download_http_test.go | 8 +- .../dataexport/cmd/list/list_http_test.go | 7 +- internal/data/publish_detect_test.go | 214 ++++++++++++++++++ 3 files changed, 222 insertions(+), 7 deletions(-) create mode 100644 internal/data/publish_detect_test.go diff --git a/internal/data/dataexport/cmd/download/download_http_test.go b/internal/data/dataexport/cmd/download/download_http_test.go index c1b58d9a..bfe67ece 100644 --- a/internal/data/dataexport/cmd/download/download_http_test.go +++ b/internal/data/dataexport/cmd/download/download_http_test.go @@ -57,7 +57,7 @@ func TestDownloadFilesystem_OK(t *testing.T) { outFile := filepath.Join(t.TempDir(), "out.txt") cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "foo.txt", "-o", outFile}) + cmd.SetArgs([]string{"myexport", "foo.txt", "-o", outFile, "--publish=false"}) var buf bytes.Buffer cmd.SetOut(&buf) cmd.SetErr(&buf) @@ -87,7 +87,7 @@ func TestDownloadFilesystem_BadPath(t *testing.T) { defer func() { util.PrepareDownloadFunc = origPrep; util.CreateDataExporterIfNeededFunc = origCreate }() cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "foo.txt", "-o", filepath.Join(t.TempDir(), "out.txt")}) + cmd.SetArgs([]string{"myexport", "foo.txt", "-o", filepath.Join(t.TempDir(), "out.txt"), "--publish=false"}) require.NoError(t, cmd.Execute()) } @@ -115,7 +115,7 @@ func TestDownloadBlock_OK(t *testing.T) { outFile := filepath.Join(t.TempDir(), "raw.img") cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "-o", outFile}) + cmd.SetArgs([]string{"myexport", "-o", outFile, "--publish=false"}) cmd.SetOut(io.Discard) cmd.SetErr(io.Discard) require.NoError(t, cmd.Execute()) @@ -141,7 +141,7 @@ func TestDownloadBlock_WrongEndpoint(t *testing.T) { defer func() { util.PrepareDownloadFunc = origPrep; util.CreateDataExporterIfNeededFunc = origCreate }() cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "-o", filepath.Join(t.TempDir(), "raw.img")}) + cmd.SetArgs([]string{"myexport", "-o", filepath.Join(t.TempDir(), "raw.img"), "--publish=false"}) cmd.SetOut(io.Discard) cmd.SetErr(io.Discard) require.NoError(t, cmd.Execute()) diff --git a/internal/data/dataexport/cmd/list/list_http_test.go b/internal/data/dataexport/cmd/list/list_http_test.go index bff3df82..1a8c5f3d 100644 --- a/internal/data/dataexport/cmd/list/list_http_test.go +++ b/internal/data/dataexport/cmd/list/list_http_test.go @@ -55,7 +55,7 @@ func TestListFilesystem_OK(t *testing.T) { os.Stdout = w cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport", "/"}) + cmd.SetArgs([]string{"myexport", "/", "--publish=false"}) require.NoError(t, cmd.Execute()) w.Close() @@ -92,7 +92,7 @@ func TestListBlock_OK(t *testing.T) { os.Stdout = w cmd := NewCommand(context.TODO(), slog.Default()) - cmd.SetArgs([]string{"myexport"}) + cmd.SetArgs([]string{"myexport", "--publish=false"}) require.NoError(t, cmd.Execute()) w.Close() @@ -124,7 +124,8 @@ func TestListFilesystem_NotDir(t *testing.T) { cmd := NewCommand(context.TODO(), slog.Default()) cmd.SetOut(&bytes.Buffer{}) cmd.SetErr(&bytes.Buffer{}) - cmd.SetArgs([]string{"myexport", "some/invalid"}) + cmd.SetArgs([]string{"myexport", "some/invalid", "--publish=false"}) err := cmd.Execute() require.Error(t, err) + require.Contains(t, err.Error(), "invalid source path") } diff --git a/internal/data/publish_detect_test.go b/internal/data/publish_detect_test.go new file mode 100644 index 00000000..2bc05620 --- /dev/null +++ b/internal/data/publish_detect_test.go @@ -0,0 +1,214 @@ +/* +Copyright 2026 Flant JSC + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package dataio + +import ( + "context" + "errors" + "fmt" + "log/slog" + "net" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/types" + kubescheme "k8s.io/client-go/kubernetes/scheme" + "sigs.k8s.io/controller-runtime/pkg/client/fake" +) + +// timeoutError implements net.Error with Timeout()=true but is neither +// net.OpError nor net.DNSError, so it exercises the final net.Error branch. +type timeoutError struct{} + +func (e *timeoutError) Error() string { return "i/o timeout" } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return false } + +// nonTimeoutNetError implements net.Error with Timeout()=false. +type nonTimeoutNetError struct{} + +func (e *nonTimeoutNetError) Error() string { return "net error" } +func (e *nonTimeoutNetError) Timeout() bool { return false } +func (e *nonTimeoutNetError) Temporary() bool { return false } + +func TestIsNetworkUnreachable(t *testing.T) { + tests := []struct { + name string + err error + want bool + }{ + { + name: "nil error", + err: nil, + want: false, + }, + { + name: "deadline exceeded", + err: context.DeadlineExceeded, + want: true, + }, + { + name: "wrapped deadline exceeded", + err: fmt.Errorf("get service: %w", context.DeadlineExceeded), + want: true, + }, + { + name: "context canceled", + err: context.Canceled, + want: false, + }, + { + name: "wrapped context canceled", + err: fmt.Errorf("probe: %w", context.Canceled), + want: false, + }, + { + name: "net.OpError dial", + err: &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("connection refused")}, + want: true, + }, + { + name: "net.OpError wrapped", + err: fmt.Errorf("probe: %w", &net.OpError{Op: "dial", Net: "tcp", Err: errors.New("no route to host")}), + want: true, + }, + { + name: "net.DNSError", + err: &net.DNSError{Err: "no such host", Name: "kubernetes.default.svc"}, + want: true, + }, + { + name: "net.DNSError wrapped", + err: fmt.Errorf("resolve: %w", &net.DNSError{Err: "server misbehaving", Name: "example.com"}), + want: true, + }, + { + name: "net.Error with timeout", + err: &timeoutError{}, + want: true, + }, + { + name: "net.Error without timeout", + err: &nonTimeoutNetError{}, + want: false, + }, + { + name: "generic error", + err: errors.New("something went wrong"), + want: false, + }, + { + name: "wrapped generic error", + err: fmt.Errorf("outer: %w", errors.New("inner")), + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := isNetworkUnreachable(tt.err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestResolvePublish_Explicit(t *testing.T) { + ctx := context.Background() + log := slog.Default() + + tests := []struct { + name string + flag PublishFlag + want bool + }{ + { + name: "explicit true", + flag: PublishFlag{Explicit: true, Value: true}, + want: true, + }, + { + name: "explicit false", + flag: PublishFlag{Explicit: true, Value: false}, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // rtClient and sClient are nil — explicit path returns early. + got, err := ResolvePublish(ctx, tt.flag, nil, nil, log) + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestResolvePublish_NilLogger(t *testing.T) { + ctx := context.Background() + + got, err := ResolvePublish(ctx, PublishFlag{Explicit: true, Value: true}, nil, nil, nil) + require.NoError(t, err) + assert.True(t, got) +} + +func TestGetKubeService(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, kubescheme.AddToScheme(scheme)) + + ctx := context.Background() + + t.Run("service exists", func(t *testing.T) { + svc := &corev1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: kubeServiceName, + Namespace: kubeServiceNamespace, + UID: types.UID("test-uid-123"), + }, + Spec: corev1.ServiceSpec{ + ClusterIP: "10.96.0.1", + }, + } + c := fake.NewClientBuilder().WithScheme(scheme).WithObjects(svc).Build() + + got, err := getKubeService(ctx, c) + require.NoError(t, err) + assert.Equal(t, types.UID("test-uid-123"), got.UID) + assert.Equal(t, "10.96.0.1", got.Spec.ClusterIP) + }) + + t.Run("service not found", func(t *testing.T) { + c := fake.NewClientBuilder().WithScheme(scheme).Build() + + _, err := getKubeService(ctx, c) + require.Error(t, err) + }) +} + +func TestDetectPublish_ServiceNotFound(t *testing.T) { + scheme := runtime.NewScheme() + require.NoError(t, kubescheme.AddToScheme(scheme)) + + c := fake.NewClientBuilder().WithScheme(scheme).Build() + log := slog.Default() + + _, err := DetectPublish(context.Background(), c, nil, log) + require.ErrorIs(t, err, ErrAutoDetectWithHint) +}