diff --git a/Makefile b/Makefile index 3f0b264..9106165 100644 --- a/Makefile +++ b/Makefile @@ -228,8 +228,17 @@ install: build ## Build and install the kubectl plugin to ~/.local/bin (no sudo echo " ├─ Temporarily updating PATH for verification"; \ if PATH="$(INSTALL_PATH):$$PATH" command -v kubectl >/dev/null 2>&1; then \ if PATH="$(INSTALL_PATH):$$PATH" kubectl plugin list 2>/dev/null | grep -q "kubectl-oadp"; then \ - echo " ├─ ✅ Installation verified: kubectl oadp plugin is accessible"; \ - PATH="$(INSTALL_PATH):$$PATH" kubectl oadp version 2>/dev/null || echo " │ └─ (Note: version command requires cluster access)"; \ + echo " └─ ✅ Installation verified: kubectl oadp plugin is accessible"; \ + echo ""; \ + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"; \ + echo "🎉 Installation complete!"; \ + echo ""; \ + echo " ⚠️ To use in this terminal session, run:"; \ + echo " export PATH=\"$(INSTALL_PATH):$$PATH\""; \ + echo ""; \ + echo " Quick start:"; \ + echo " • kubectl oadp --help # Show available commands"; \ + echo " • kubectl oadp backup get # List backups"; \ else \ echo " ├─ ❌ Installation verification failed: kubectl oadp plugin not found"; \ echo " │ └─ Try running: export PATH=\"$(INSTALL_PATH):$$PATH\""; \ @@ -242,9 +251,22 @@ install: build ## Build and install the kubectl plugin to ~/.local/bin (no sudo if command -v kubectl >/dev/null 2>&1; then \ if kubectl plugin list 2>/dev/null | grep -q "kubectl-oadp"; then \ echo " ├─ ✅ Installation verified: kubectl oadp plugin is accessible"; \ - echo " ├─ Running version command..."; \ + echo " └─ Running version command..."; \ + echo ""; \ + version_output=$$(kubectl oadp version 2>&1 | grep -v "WARNING: the client version does not match"); \ + if [ $$? -eq 0 ] && [ -n "$$version_output" ]; then \ + echo "$$version_output" | sed 's/^/ /'; \ + else \ + echo " (Note: version command requires cluster access)"; \ + fi; \ + echo ""; \ + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━"; \ + echo "🎉 Installation complete!"; \ echo ""; \ - kubectl oadp version 2>/dev/null || echo " │ └─ (Note: version command requires cluster access)"; \ + echo " Quick start:"; \ + echo " • kubectl oadp --help # Show available commands"; \ + echo " • kubectl oadp backup get # List backups"; \ + echo " • kubectl oadp version # Show version info"; \ else \ echo " └─ ❌ Installation verification failed: kubectl oadp plugin not found"; \ fi; \ diff --git a/cmd/non-admin/backup/describe.go b/cmd/non-admin/backup/describe.go index c772e4c..22214b2 100644 --- a/cmd/non-admin/backup/describe.go +++ b/cmd/non-admin/backup/describe.go @@ -2,6 +2,7 @@ package backup import ( "context" + "encoding/json" "fmt" "sort" "strings" @@ -18,7 +19,10 @@ import ( ) func NewDescribeCommand(f client.Factory, use string) *cobra.Command { - var requestTimeout time.Duration + var ( + requestTimeout time.Duration + details bool + ) c := &cobra.Command{ Use: use + " NAME", @@ -68,15 +72,24 @@ func NewDescribeCommand(f client.Factory, use string) *cobra.Command { } // Print in Velero-style format - printNonAdminBackupDetails(cmd, &nab) + printNonAdminBackupDetails(cmd, &nab, kbClient, backupName, userNamespace, effectiveTimeout) + + // Add detailed output if --details flag is set + if details { + if err := printDetailedBackupInfo(cmd, kbClient, backupName, userNamespace, effectiveTimeout); err != nil { + return fmt.Errorf("failed to fetch detailed backup information: %w", err) + } + } return nil }, Example: ` kubectl oadp nonadmin backup describe my-backup - kubectl oadp nonadmin backup describe my-backup --request-timeout=30m`, + kubectl oadp nonadmin backup describe my-backup --details + kubectl oadp nonadmin backup describe my-backup --details --request-timeout=30m`, } c.Flags().DurationVar(&requestTimeout, "request-timeout", 0, fmt.Sprintf("The length of time to wait before giving up on a single server request (e.g., 30s, 5m, 1h). Overrides %s env var. Default: %v", shared.TimeoutEnvVar, shared.DefaultHTTPTimeout)) + c.Flags().BoolVar(&details, "details", false, "Display additional backup details including volume snapshots, resource lists, and item operations") output.BindFlags(c.Flags()) output.ClearOutputFlagDefault(c) @@ -85,7 +98,7 @@ func NewDescribeCommand(f client.Factory, use string) *cobra.Command { } // printNonAdminBackupDetails prints backup details in Velero admin describe format -func printNonAdminBackupDetails(cmd *cobra.Command, nab *nacv1alpha1.NonAdminBackup) { +func printNonAdminBackupDetails(cmd *cobra.Command, nab *nacv1alpha1.NonAdminBackup, kbClient kbclient.Client, backupName string, userNamespace string, timeout time.Duration) { out := cmd.OutOrStdout() // Get Velero backup reference if available @@ -309,6 +322,25 @@ func printNonAdminBackupDetails(cmd *cobra.Command, nab *nacv1alpha1.NonAdminBac fmt.Fprintf(out, "\n") + // Fetch and display Resource List + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + resourceList, err := shared.ProcessDownloadRequest(ctx, kbClient, shared.DownloadRequestOptions{ + BackupName: backupName, + DataType: "BackupResourceList", + Namespace: userNamespace, + HTTPTimeout: timeout, + }) + + if err == nil && resourceList != "" { + if formattedList := formatResourceList(resourceList); formattedList != "" { + fmt.Fprintf(out, "Resource List:\n") + fmt.Fprintf(out, "%s\n", formattedList) + fmt.Fprintf(out, "\n") + } + } + // Backup Volumes fmt.Fprintf(out, "Backup Volumes:\n") @@ -347,6 +379,212 @@ func printNonAdminBackupDetails(cmd *cobra.Command, nab *nacv1alpha1.NonAdminBac } } +// printDetailedBackupInfo fetches and displays additional backup details when --details flag is used. +// It uses NonAdminDownloadRequest to fetch: +// - BackupVolumeInfos (snapshot details) +// - BackupResults (errors, warnings) +// - BackupItemOperations (plugin operations) +func printDetailedBackupInfo(cmd *cobra.Command, kbClient kbclient.Client, backupName string, userNamespace string, timeout time.Duration) error { + out := cmd.OutOrStdout() + + ctx, cancel := context.WithTimeout(context.Background(), timeout) + defer cancel() + + hasOutput := false + + // 1. Fetch BackupVolumeInfos + volumeInfo, err := shared.ProcessDownloadRequest(ctx, kbClient, shared.DownloadRequestOptions{ + BackupName: backupName, + DataType: "BackupVolumeInfos", + Namespace: userNamespace, + HTTPTimeout: timeout, + }) + + if err == nil && volumeInfo != "" { + if formattedInfo := formatVolumeInfo(volumeInfo); formattedInfo != "" { + if !hasOutput { + fmt.Fprintf(out, "\n") + hasOutput = true + } + fmt.Fprintf(out, "Volume Snapshot Details:\n") + fmt.Fprintf(out, "%s\n", formattedInfo) + fmt.Fprintf(out, "\n") + } + } + + // 2. Fetch BackupResults + results, err := shared.ProcessDownloadRequest(ctx, kbClient, shared.DownloadRequestOptions{ + BackupName: backupName, + DataType: "BackupResults", + Namespace: userNamespace, + HTTPTimeout: timeout, + }) + + if err == nil && results != "" { + if formattedResults := formatBackupResults(results); formattedResults != "" { + if !hasOutput { + fmt.Fprintf(out, "\n") + hasOutput = true + } + fmt.Fprintf(out, "Backup Results:\n") + fmt.Fprintf(out, "%s\n", formattedResults) + fmt.Fprintf(out, "\n") + } + } + + // 3. Fetch BackupItemOperations + itemOps, err := shared.ProcessDownloadRequest(ctx, kbClient, shared.DownloadRequestOptions{ + BackupName: backupName, + DataType: "BackupItemOperations", + Namespace: userNamespace, + HTTPTimeout: timeout, + }) + + if err == nil && itemOps != "" { + if formattedOps := formatItemOperations(itemOps); formattedOps != "" { + if !hasOutput { + fmt.Fprintf(out, "\n") + } + fmt.Fprintf(out, "Backup Item Operations:\n") + fmt.Fprintf(out, "%s\n", formattedOps) + fmt.Fprintf(out, "\n") + } + } + + return nil +} + +// formatVolumeInfo formats volume snapshot information for display +func formatVolumeInfo(volumeInfo string) string { + if strings.TrimSpace(volumeInfo) == "" { + return "" + } + + // Try to parse as JSON array + var snapshots []interface{} + if err := json.Unmarshal([]byte(volumeInfo), &snapshots); err != nil { + // If parsing fails, fall back to indented output + return indent(volumeInfo, " ") + } + + // If empty array, return empty string (will show "") + if len(snapshots) == 0 { + return "" + } + + // Format as indented JSON for readability + formatted, err := json.MarshalIndent(snapshots, " ", " ") + if err != nil { + return indent(volumeInfo, " ") + } + return indent(string(formatted), " ") +} + +// formatResourceList formats the resource list for display +func formatResourceList(resourceList string) string { + if strings.TrimSpace(resourceList) == "" { + return "" + } + + // Try to parse as JSON map + var resources map[string][]string + if err := json.Unmarshal([]byte(resourceList), &resources); err != nil { + // If parsing fails, fall back to indented output + return indent(resourceList, " ") + } + + // Sort the keys (GroupVersionKind) + keys := make([]string, 0, len(resources)) + for k := range resources { + keys = append(keys, k) + } + sort.Strings(keys) + + // Build formatted output + var output strings.Builder + for _, gvk := range keys { + items := resources[gvk] + output.WriteString(fmt.Sprintf(" %s:\n", gvk)) + for _, item := range items { + output.WriteString(fmt.Sprintf(" - %s\n", item)) + } + } + + return strings.TrimSuffix(output.String(), "\n") +} + +// formatBackupResults formats backup results (errors/warnings) for display +func formatBackupResults(results string) string { + if strings.TrimSpace(results) == "" { + return "" + } + + // Try to parse as JSON object with errors and warnings + var resultsObj struct { + Errors map[string]interface{} `json:"errors"` + Warnings map[string]interface{} `json:"warnings"` + } + if err := json.Unmarshal([]byte(results), &resultsObj); err != nil { + // If parsing fails, fall back to indented output + return indent(results, " ") + } + + // If both are empty, return empty string so section won't be printed + if len(resultsObj.Errors) == 0 && len(resultsObj.Warnings) == 0 { + return "" + } + + // Format nicely + var output strings.Builder + + // Show errors + output.WriteString(" Errors:\n") + if len(resultsObj.Errors) > 0 { + formatted, _ := json.MarshalIndent(resultsObj.Errors, " ", " ") + output.WriteString(indent(string(formatted), " ")) + } else { + output.WriteString(" ") + } + output.WriteString("\n\n") + + // Show warnings + output.WriteString(" Warnings:\n") + if len(resultsObj.Warnings) > 0 { + formatted, _ := json.MarshalIndent(resultsObj.Warnings, " ", " ") + output.WriteString(indent(string(formatted), " ")) + } else { + output.WriteString(" ") + } + + return strings.TrimSuffix(output.String(), "\n") +} + +// formatItemOperations formats backup item operations for display +func formatItemOperations(itemOps string) string { + if strings.TrimSpace(itemOps) == "" { + return "" + } + + // Try to parse as JSON array + var operations []interface{} + if err := json.Unmarshal([]byte(itemOps), &operations); err != nil { + // If parsing fails, fall back to indented output + return indent(itemOps, " ") + } + + // If empty array, return empty string (will show "") + if len(operations) == 0 { + return "" + } + + // Format as indented JSON for readability + formatted, err := json.MarshalIndent(operations, " ", " ") + if err != nil { + return indent(itemOps, " ") + } + return indent(string(formatted), " ") +} + // colorizePhase returns the phase string with ANSI color codes func colorizePhase(phase string) string { const ( diff --git a/cmd/non-admin/backup/logs.go b/cmd/non-admin/backup/logs.go index 850699e..73dad09 100644 --- a/cmd/non-admin/backup/logs.go +++ b/cmd/non-admin/backup/logs.go @@ -25,9 +25,7 @@ import ( "github.com/migtools/oadp-cli/cmd/shared" nacv1alpha1 "github.com/migtools/oadp-non-admin/api/v1alpha1" "github.com/spf13/cobra" - velerov1 "github.com/vmware-tanzu/velero/pkg/apis/velero/v1" "github.com/vmware-tanzu/velero/pkg/client" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" kbclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -92,21 +90,32 @@ func NewLogsCommand(f client.Factory, use string) *cobra.Command { return fmt.Errorf("failed to get NonAdminBackup %q: %w", backupName, err) } - req := &nacv1alpha1.NonAdminDownloadRequest{ - ObjectMeta: metav1.ObjectMeta{ - GenerateName: backupName + "-logs-", - Namespace: userNamespace, - }, - Spec: nacv1alpha1.NonAdminDownloadRequestSpec{ - Target: velerov1.DownloadTarget{ - Kind: "BackupLog", - Name: backupName, // Use NonAdminBackup name, controller will resolve to Velero backup - }, + fmt.Fprintf(cmd.OutOrStdout(), "Waiting for backup logs to be processed (timeout: %v)...\n", effectiveTimeout) + + // Create download request and wait for signed URL + req, signedURL, err := shared.CreateAndWaitForDownloadURL(ctx, kbClient, shared.DownloadRequestOptions{ + BackupName: backupName, + DataType: "BackupLog", + Namespace: userNamespace, + Timeout: effectiveTimeout, + PollInterval: 2 * time.Second, + HTTPTimeout: effectiveTimeout, + OnProgress: func() { + fmt.Fprintf(cmd.OutOrStdout(), ".") }, - } + }) - if err := kbClient.Create(ctx, req); err != nil { - return fmt.Errorf("failed to create NonAdminDownloadRequest: %w", err) + if err != nil { + if req != nil { + // Clean up on error + if ctx.Err() == context.DeadlineExceeded { + return shared.FormatDownloadRequestTimeoutError(kbClient, req, effectiveTimeout) + } + deleteCtx, cancelDelete := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelDelete() + _ = kbClient.Delete(deleteCtx, req) + } + return err } // Clean up the download request when done @@ -116,56 +125,7 @@ func NewLogsCommand(f client.Factory, use string) *cobra.Command { _ = kbClient.Delete(deleteCtx, req) }() - fmt.Fprintf(cmd.OutOrStdout(), "Waiting for backup logs to be processed (timeout: %v)...\n", effectiveTimeout) - - // Wait for the download request to be processed - ticker := time.NewTicker(2 * time.Second) - defer ticker.Stop() - - var signedURL string - Loop: - for { - select { - case <-ctx.Done(): - // Check if context was cancelled due to timeout or other reason - if ctx.Err() == context.DeadlineExceeded { - return shared.FormatDownloadRequestTimeoutError(kbClient, req, effectiveTimeout) - } - // Context cancelled for other reason (e.g., user interruption) - return fmt.Errorf("operation cancelled: %w", ctx.Err()) - case <-ticker.C: - fmt.Fprintf(cmd.OutOrStdout(), ".") - var updated nacv1alpha1.NonAdminDownloadRequest - if err := kbClient.Get(ctx, kbclient.ObjectKey{ - Namespace: req.Namespace, - Name: req.Name, - }, &updated); err != nil { - // If context expired during Get, handle it in next iteration - if ctx.Err() != nil { - continue - } - return fmt.Errorf("failed to get NonAdminDownloadRequest: %w", err) - } - - // Check if the download request was processed successfully - for _, condition := range updated.Status.Conditions { - if condition.Type == "Processed" && condition.Status == "True" { - if updated.Status.VeleroDownloadRequest.Status.DownloadURL != "" { - signedURL = updated.Status.VeleroDownloadRequest.Status.DownloadURL - fmt.Fprintf(cmd.OutOrStdout(), "\nDownload URL received, fetching logs...\n") - break Loop - } - } - } - - // Check for failure conditions - for _, condition := range updated.Status.Conditions { - if condition.Status == "True" && condition.Reason == "Error" { - return fmt.Errorf("NonAdminDownloadRequest failed: %s - %s", condition.Type, condition.Message) - } - } - } - } + fmt.Fprintf(cmd.OutOrStdout(), "\nDownload URL received, fetching logs...\n") // Use the shared StreamDownloadContent function to download and stream logs // Note: We use the same effective timeout for the HTTP download diff --git a/cmd/root.go b/cmd/root.go index 56e046e..fb82fce 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -208,11 +208,14 @@ func replaceVeleroWithOADP(cmd *cobra.Command) *cobra.Command { // Replace in multiple command fields using context-aware replacement cmd.Example = replaceVeleroCommandWithOADP(cmd.Example) - // Skip wrapping logs commands to allow real-time streaming without buffering + // Skip wrapping commands that stream output or make long-running requests + // to allow real-time display without buffering isLogsCommand := cmd.Use == "logs" || strings.HasPrefix(cmd.Use, "logs ") + isDescribeCommand := cmd.Use == "describe" || strings.HasPrefix(cmd.Use, "describe ") + shouldSkipWrapper := isLogsCommand || isDescribeCommand // Wrap the Run function to replace velero in output - if cmd.Run != nil && !isLogsCommand { + if cmd.Run != nil && !shouldSkipWrapper { originalRun := cmd.Run cmd.Run = func(c *cobra.Command, args []string) { // Capture stdout temporarily @@ -239,7 +242,7 @@ func replaceVeleroWithOADP(cmd *cobra.Command) *cobra.Command { } // Wrap the RunE function to replace velero in output - if cmd.RunE != nil && !isLogsCommand { + if cmd.RunE != nil && !shouldSkipWrapper { originalRunE := cmd.RunE cmd.RunE = func(c *cobra.Command, args []string) error { // Capture stdout temporarily diff --git a/cmd/root_test.go b/cmd/root_test.go index cb0fb9b..aba87a9 100644 --- a/cmd/root_test.go +++ b/cmd/root_test.go @@ -591,8 +591,8 @@ func TestApplyTimeoutToConfig(t *testing.T) { } } -// TestReplaceVeleroWithOADP_LogsCommandNotWrapped tests that logs commands are never wrapped -func TestReplaceVeleroWithOADP_LogsCommandNotWrapped(t *testing.T) { +// TestReplaceVeleroWithOADP_OutputWrapperExclusions tests that certain commands are excluded from output wrapping +func TestReplaceVeleroWithOADP_OutputWrapperExclusions(t *testing.T) { tests := []struct { name string use string @@ -616,7 +616,12 @@ func TestReplaceVeleroWithOADP_LogsCommandNotWrapped(t *testing.T) { { name: "describe command", use: "describe", - shouldWrap: true, + shouldWrap: false, + }, + { + name: "describe with args", + use: "describe NAME", + shouldWrap: false, }, { name: "create command", @@ -677,16 +682,16 @@ func TestReplaceVeleroWithOADP_LogsCommandNotWrapped(t *testing.T) { t.Errorf("Wrapped command output should have 'velero' replaced, got: %s", output) } } else { - // Logs commands should NOT have output replaced + // Excluded commands should NOT have output replaced if !strings.Contains(output, "velero backup create") { - t.Errorf("Logs command output should NOT be modified, got: %s", output) + t.Errorf("Excluded command output should NOT be modified, got: %s", output) } } }) } // Test with RunE function - t.Run("logs_command_runE_not_wrapped", func(t *testing.T) { + t.Run("excluded_command_runE_not_wrapped", func(t *testing.T) { runECalled := false cmd := &cobra.Command{ Use: "logs", @@ -700,10 +705,10 @@ func TestReplaceVeleroWithOADP_LogsCommandNotWrapped(t *testing.T) { originalRunE := cmd.RunE replaceVeleroWithOADP(cmd) - // Logs command should not be wrapped + // Excluded command should not be wrapped isWrapped := fmt.Sprintf("%p", originalRunE) != fmt.Sprintf("%p", cmd.RunE) if isWrapped { - t.Error("Expected logs command RunE NOT to be wrapped, but it was") + t.Error("Expected excluded command RunE NOT to be wrapped, but it was") } // Verify output is not modified @@ -728,9 +733,9 @@ func TestReplaceVeleroWithOADP_LogsCommandNotWrapped(t *testing.T) { } output := buf.String() - // Logs command output should NOT be modified + // Excluded command output should NOT be modified if !strings.Contains(output, "velero backup logs") { - t.Errorf("Logs command output should NOT be modified, got: %s", output) + t.Errorf("Excluded command output should NOT be modified, got: %s", output) } }) } diff --git a/cmd/shared/download.go b/cmd/shared/download.go index c17de81..5e5a75c 100644 --- a/cmd/shared/download.go +++ b/cmd/shared/download.go @@ -17,11 +17,11 @@ limitations under the License. package shared import ( + "bufio" "compress/gzip" "context" "fmt" "io" - "log" "net/http" "os" "strings" @@ -53,17 +53,14 @@ func getHTTPTimeout() time.Duration { func GetHTTPTimeoutWithOverride(override time.Duration) time.Duration { // If an explicit override is provided (e.g., from --timeout flag), use it if override > 0 { - log.Printf("Using HTTP timeout from command-line flag: %v", override) return override } // Check for environment variable if envTimeout := os.Getenv(TimeoutEnvVar); envTimeout != "" { if parsed, err := time.ParseDuration(envTimeout); err == nil { - log.Printf("Using custom HTTP timeout from %s: %v", TimeoutEnvVar, parsed) return parsed } - log.Printf("Warning: Invalid duration in %s=%q, using default %v", TimeoutEnvVar, envTimeout, DefaultHTTPTimeout) } return DefaultHTTPTimeout @@ -92,12 +89,14 @@ type DownloadRequestOptions struct { // HTTPTimeout is the timeout for downloading content from the signed URL. // If zero, uses the default timeout (env var or DefaultHTTPTimeout). HTTPTimeout time.Duration + // OnProgress is an optional callback called on each polling iteration + OnProgress func() } -// ProcessDownloadRequest creates a NonAdminDownloadRequest, waits for it to be processed, -// downloads the content from the signed URL, and returns it as a string. -// This function automatically cleans up the download request when done. -func ProcessDownloadRequest(ctx context.Context, kbClient kbclient.Client, opts DownloadRequestOptions) (string, error) { +// CreateAndWaitForDownloadURL creates a NonAdminDownloadRequest, waits for it to be processed, +// and returns the signed URL. The request is NOT automatically cleaned up - caller is responsible. +// This is a lower-level function that allows callers to control cleanup timing. +func CreateAndWaitForDownloadURL(ctx context.Context, kbClient kbclient.Client, opts DownloadRequestOptions) (*nacv1alpha1.NonAdminDownloadRequest, string, error) { // Set defaults if opts.Timeout == 0 { opts.Timeout = 120 * time.Second @@ -121,7 +120,31 @@ func ProcessDownloadRequest(ctx context.Context, kbClient kbclient.Client, opts } if err := kbClient.Create(ctx, req); err != nil { - return "", fmt.Errorf("failed to create NonAdminDownloadRequest for %s: %w", opts.DataType, err) + return nil, "", fmt.Errorf("failed to create NonAdminDownloadRequest for %s: %w", opts.DataType, err) + } + + // Wait for the download request to be processed + signedURL, err := waitForDownloadURL(ctx, kbClient, req, opts.Timeout, opts.PollInterval, opts.OnProgress) + if err != nil { + return req, "", err + } + + return req, signedURL, nil +} + +// ProcessDownloadRequest creates a NonAdminDownloadRequest, waits for it to be processed, +// downloads the content from the signed URL, and returns it as a string. +// This function automatically cleans up the download request when done. +func ProcessDownloadRequest(ctx context.Context, kbClient kbclient.Client, opts DownloadRequestOptions) (string, error) { + req, signedURL, err := CreateAndWaitForDownloadURL(ctx, kbClient, opts) + if err != nil { + if req != nil { + // Clean up on error + deleteCtx, cancelDelete := context.WithTimeout(context.Background(), 5*time.Second) + defer cancelDelete() + _ = kbClient.Delete(deleteCtx, req) + } + return "", err } // Clean up the download request when done @@ -131,19 +154,18 @@ func ProcessDownloadRequest(ctx context.Context, kbClient kbclient.Client, opts _ = kbClient.Delete(deleteCtx, req) }() - // Wait for the download request to be processed - signedURL, err := waitForDownloadURL(ctx, kbClient, req, opts.Timeout, opts.PollInterval) + // Download and return the content using the specified HTTP timeout + httpTimeout := GetHTTPTimeoutWithOverride(opts.HTTPTimeout) + content, err := DownloadContentWithTimeout(signedURL, httpTimeout) if err != nil { return "", err } - - // Download and return the content using the specified HTTP timeout - httpTimeout := GetHTTPTimeoutWithOverride(opts.HTTPTimeout) - return DownloadContentWithTimeout(signedURL, httpTimeout) + return content, nil } -// waitForDownloadURL waits for a NonAdminDownloadRequest to be processed and returns the signed URL -func waitForDownloadURL(ctx context.Context, kbClient kbclient.Client, req *nacv1alpha1.NonAdminDownloadRequest, timeout, pollInterval time.Duration) (string, error) { +// waitForDownloadURL waits for a NonAdminDownloadRequest to be processed and returns the signed URL. +// If onProgress is provided, it will be called on each polling iteration. +func waitForDownloadURL(ctx context.Context, kbClient kbclient.Client, req *nacv1alpha1.NonAdminDownloadRequest, timeout, pollInterval time.Duration, onProgress func()) (string, error) { timeoutCtx, cancel := context.WithTimeout(ctx, timeout) defer cancel() @@ -155,11 +177,19 @@ func waitForDownloadURL(ctx context.Context, kbClient kbclient.Client, req *nacv case <-timeoutCtx.Done(): return "", fmt.Errorf("timed out waiting for NonAdminDownloadRequest to be processed") case <-ticker.C: + if onProgress != nil { + onProgress() + } + var updated nacv1alpha1.NonAdminDownloadRequest if err := kbClient.Get(ctx, kbclient.ObjectKey{ Namespace: req.Namespace, Name: req.Name, }, &updated); err != nil { + // If context expired during Get, let next iteration handle it + if ctx.Err() != nil { + continue + } return "", fmt.Errorf("failed to get NonAdminDownloadRequest: %w", err) } @@ -193,6 +223,7 @@ func DownloadContent(url string) (string, error) { // It handles both gzipped and non-gzipped content automatically. func DownloadContentWithTimeout(url string, timeout time.Duration) (string, error) { client := httpClientWithTimeout(timeout) + resp, err := client.Get(url) if err != nil { return "", fmt.Errorf("failed to download content from URL %q: %w", url, err) @@ -204,10 +235,27 @@ func DownloadContentWithTimeout(url string, timeout time.Duration) (string, erro return "", fmt.Errorf("failed to download content: status %s, body: %s", resp.Status, string(bodyBytes)) } - // Try to decompress if it's gzipped - var reader io.Reader = resp.Body - if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") { - gzr, err := gzip.NewReader(resp.Body) + // Use a buffered reader to peek at the content and detect gzip format + bufReader := bufio.NewReader(resp.Body) + var reader io.Reader = bufReader + + // Check if content is gzipped by: + // 1. Content-Encoding header (HTTP-level compression) + // 2. Magic bytes 0x1f 0x8b at start (file-level gzip) + // Object storage signed URLs often serve .gz files without Content-Encoding header, + // so we need to detect gzip by inspecting the actual file content. + isGzipped := strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") + + if !isGzipped { + // Peek at first 2 bytes to check for gzip magic bytes (0x1f 0x8b) + magicBytes, err := bufReader.Peek(2) + if err == nil && len(magicBytes) == 2 && magicBytes[0] == 0x1f && magicBytes[1] == 0x8b { + isGzipped = true + } + } + + if isGzipped { + gzr, err := gzip.NewReader(bufReader) if err != nil { return "", fmt.Errorf("failed to create gzip reader: %w", err) } @@ -246,10 +294,27 @@ func StreamDownloadContentWithTimeout(url string, writer io.Writer, timeout time return fmt.Errorf("failed to download content: status %s, body: %s", resp.Status, string(bodyBytes)) } - // Try to decompress if it's gzipped - var reader io.Reader = resp.Body - if strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") { - gzr, err := gzip.NewReader(resp.Body) + // Use a buffered reader to peek at the content and detect gzip format + bufReader := bufio.NewReader(resp.Body) + var reader io.Reader = bufReader + + // Check if content is gzipped by: + // 1. Content-Encoding header (HTTP-level compression) + // 2. Magic bytes 0x1f 0x8b at start (file-level gzip) + // Object storage signed URLs often serve .gz files without Content-Encoding header, + // so we need to detect gzip by inspecting the actual file content. + isGzipped := strings.Contains(resp.Header.Get("Content-Encoding"), "gzip") + + if !isGzipped { + // Peek at first 2 bytes to check for gzip magic bytes (0x1f 0x8b) + magicBytes, err := bufReader.Peek(2) + if err == nil && len(magicBytes) == 2 && magicBytes[0] == 0x1f && magicBytes[1] == 0x8b { + isGzipped = true + } + } + + if isGzipped { + gzr, err := gzip.NewReader(bufReader) if err != nil { return fmt.Errorf("failed to create gzip reader: %w", err) }