From 30278ae536ec941d57c19fce722c283d778cd5d5 Mon Sep 17 00:00:00 2001 From: Joe Corall Date: Sun, 22 Mar 2026 17:03:30 -0400 Subject: [PATCH] [minor] add a job SDK --- cmd/cron.go | 469 +++++++++++++++++++++++++++++++ cmd/cron_test.go | 56 ++++ cmd/job.go | 250 ++++++++++++++++ pkg/config/config.go | 5 +- pkg/config/context.go | 34 +-- pkg/config/cron.go | 74 +++++ pkg/config/file_accessor.go | 58 ++++ pkg/config/files.go | 11 + pkg/config/local_context.go | 24 +- pkg/config/utils.go | 3 + pkg/config/validation_helpers.go | 16 ++ pkg/cron/spec.go | 135 +++++++++ pkg/cron/spec_test.go | 57 ++++ pkg/docker/docker.go | 62 +++- pkg/job/artifact.go | 72 +++++ pkg/job/artifact_test.go | 62 ++++ pkg/job/spec.go | 18 ++ pkg/job/sync.go | 83 ++++++ pkg/job/transfer.go | 80 ++++++ pkg/plugin/jobs.go | 95 +++++++ pkg/plugin/progress.go | 88 ++++++ pkg/plugin/sdk.go | 2 + 22 files changed, 1721 insertions(+), 33 deletions(-) create mode 100644 cmd/cron.go create mode 100644 cmd/cron_test.go create mode 100644 cmd/job.go create mode 100644 pkg/config/cron.go create mode 100644 pkg/cron/spec.go create mode 100644 pkg/cron/spec_test.go create mode 100644 pkg/job/artifact.go create mode 100644 pkg/job/artifact_test.go create mode 100644 pkg/job/spec.go create mode 100644 pkg/job/sync.go create mode 100644 pkg/job/transfer.go create mode 100644 pkg/plugin/jobs.go create mode 100644 pkg/plugin/progress.go diff --git a/cmd/cron.go b/cmd/cron.go new file mode 100644 index 0000000..a9d5dca --- /dev/null +++ b/cmd/cron.go @@ -0,0 +1,469 @@ +package cmd + +import ( + "bytes" + "fmt" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "text/tabwriter" + "time" + + "github.com/libops/sitectl/pkg/config" + corecron "github.com/libops/sitectl/pkg/cron" + "github.com/libops/sitectl/pkg/plugin" + "github.com/spf13/cobra" + yaml "gopkg.in/yaml.v3" +) + +var ( + cronSpecContext string + cronSpecSchedule string + cronSpecOutputDir string + cronSpecComponents []string + cronSpecRetentionDays int + cronSpecPreserveFirstOfMonth bool + cronSpecDockerPrune bool +) + +var cronCmd = &cobra.Command{ + Use: "cron", + Short: "Manage scheduled cron-style jobs", +} + +var cronAddCmd = &cobra.Command{ + Use: "add NAME", + Args: cobra.ExactArgs(1), + Short: "Create or update a cron spec", + RunE: func(cmd *cobra.Command, args []string) error { + name := strings.TrimSpace(args[0]) + if name == "" { + return fmt.Errorf("cron spec name is required") + } + if strings.TrimSpace(cronSpecContext) == "" { + return fmt.Errorf("--context is required") + } + if strings.TrimSpace(cronSpecSchedule) == "" { + return fmt.Errorf("--schedule is required") + } + if strings.TrimSpace(cronSpecOutputDir) == "" { + return fmt.Errorf("--output-dir is required") + } + + ctx, err := config.GetContext(cronSpecContext) + if err != nil { + return fmt.Errorf("load context %q: %w", cronSpecContext, err) + } + components, err := normalizeCronComponents(ctx, cronSpecComponents) + if err != nil { + return err + } + + spec := config.CronSpec{ + Name: name, + Context: ctx.Name, + Schedule: cronSpecSchedule, + OutputDir: cronSpecOutputDir, + Components: components, + RetentionDays: cronSpecRetentionDays, + PreserveFirstOfMonth: cronSpecPreserveFirstOfMonth, + DockerPrune: cronSpecDockerPrune, + } + if spec.RetentionDays < 0 { + return fmt.Errorf("--retention-days must be >= 0") + } + if err := config.SaveCronSpec(spec); err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "Saved cron spec %q for context %q\n", spec.Name, spec.Context) + return nil + }, +} + +var cronListCmd = &cobra.Command{ + Use: "list", + Short: "List configured cron specs", + RunE: func(cmd *cobra.Command, args []string) error { + cfg, err := config.Load() + if err != nil { + return err + } + if len(cfg.CronSpecs) == 0 { + fmt.Fprintln(cmd.OutOrStdout(), "No cron specs configured") + return nil + } + + w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "NAME\tCONTEXT\tSCHEDULE\tOUTPUT\tCOMPONENTS\tRETENTION") + for _, spec := range cfg.CronSpecs { + retention := "-" + if spec.RetentionDays > 0 { + retention = strconv.Itoa(spec.RetentionDays) + "d" + } + fmt.Fprintf(w, "%s\t%s\t%s\t%s\t%s\t%s\n", + spec.Name, + spec.Context, + spec.Schedule, + spec.OutputDir, + strings.Join(spec.Components, ", "), + retention, + ) + } + return w.Flush() + }, +} + +var cronDeleteCmd = &cobra.Command{ + Use: "delete NAME", + Args: cobra.ExactArgs(1), + Short: "Delete a cron spec", + RunE: func(cmd *cobra.Command, args []string) error { + if err := config.DeleteCronSpec(args[0]); err != nil { + return err + } + fmt.Fprintf(cmd.OutOrStdout(), "Deleted cron spec %q\n", args[0]) + return nil + }, +} + +var cronComponentsCmd = &cobra.Command{ + Use: "components", + Short: "List available cron components for the active or selected context", + RunE: func(cmd *cobra.Command, args []string) error { + contextName, err := config.ResolveCurrentContextName(cmd.Flags()) + if err != nil { + return err + } + ctx, err := config.GetContext(contextName) + if err != nil { + return err + } + specs, err := availableCronComponents(ctx) + if err != nil { + return err + } + if len(specs) == 0 { + fmt.Fprintf(cmd.OutOrStdout(), "No cron components available for context %q\n", ctx.Name) + return nil + } + w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "NAME\tPLUGIN\tFILENAME\tDESCRIPTION") + for _, spec := range specs { + fmt.Fprintf(w, "%s\t%s\t%s\t%s\n", spec.Name, spec.Plugin, spec.Filename, spec.Description) + } + return w.Flush() + }, +} + +var cronRunCmd = &cobra.Command{ + Use: "run NAME", + Args: cobra.ExactArgs(1), + Short: "Run a configured cron spec now", + RunE: func(cmd *cobra.Command, args []string) error { + spec, err := config.GetCronSpec(args[0]) + if err != nil { + return err + } + ctx, err := config.GetContext(spec.Context) + if err != nil { + return fmt.Errorf("load context %q: %w", spec.Context, err) + } + return runCronSpec(cmd, spec, ctx) + }, +} + +var cronRenderSystemdCmd = &cobra.Command{ + Use: "render-systemd NAME", + Args: cobra.ExactArgs(1), + Short: "Render setup instructions and systemd units for a cron spec", + RunE: func(cmd *cobra.Command, args []string) error { + spec, err := config.GetCronSpec(args[0]) + if err != nil { + return err + } + ctx, err := config.GetContext(spec.Context) + if err != nil { + return fmt.Errorf("load context %q: %w", spec.Context, err) + } + + serviceName, timerName := cronUnitNames(spec.Name) + serviceUnit, timerUnit, err := renderCronSystemdUnits(spec) + if err != nil { + return err + } + fmt.Fprint(cmd.OutOrStdout(), renderCronSystemdInstructions(spec, ctx, serviceName, timerName, serviceUnit, timerUnit)) + return nil + }, +} + +var cronInstalledCmd = &cobra.Command{ + Use: "installed", + Short: "List installed sitectl systemd cron units on a context host", + RunE: func(cmd *cobra.Command, args []string) error { + contextName, err := config.ResolveCurrentContextName(cmd.Flags()) + if err != nil { + return err + } + ctx, err := config.GetContext(contextName) + if err != nil { + return err + } + units, err := listInstalledCronUnits(&ctx) + if err != nil { + return err + } + if len(units) == 0 { + fmt.Fprintf(cmd.OutOrStdout(), "No sitectl systemd cron units found on context %q\n", ctx.Name) + return nil + } + for _, unit := range units { + fmt.Fprintln(cmd.OutOrStdout(), unit) + } + return nil + }, +} + +func runCronSpec(cmd *cobra.Command, spec config.CronSpec, ctx config.Context) error { + available, err := availableCronComponents(ctx) + if err != nil { + return err + } + selected, err := selectCronComponents(spec.Components, available) + if err != nil { + return err + } + + now := time.Now().UTC() + outDir, err := corecron.EnsureDatedDestination(spec.OutputDir, now) + if err != nil { + return err + } + + for _, component := range selected { + outputPath := filepath.Join(outDir, component.Filename) + if err := runCronComponent(cmd, ctx, component, outputPath); err != nil { + return fmt.Errorf("run cron component %q: %w", component.Name, err) + } + } + + if err := corecron.PruneArtifacts(spec.OutputDir, now, spec.RetentionDays, spec.PreserveFirstOfMonth); err != nil { + return err + } + if spec.DockerPrune { + if _, err := ctx.RunQuietCommandContext(cmd.Context(), exec.Command("docker", "system", "prune", "-af")); err != nil { + return fmt.Errorf("docker system prune: %w", err) + } + } + + fmt.Fprintf(cmd.OutOrStdout(), "Cron job %q wrote artifacts under %s\n", spec.Name, outDir) + return nil +} + +func runCronComponent(cmd *cobra.Command, ctx config.Context, component corecron.ComponentSpec, outputPath string) error { + _, err := pluginSDK.InvokePluginCommand(component.Plugin, []string{ + "--context", ctx.Name, + "__cron", "run", + "--component", component.Name, + "--output", outputPath, + }, plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Stdin: RootCmd.InOrStdin(), + Stdout: RootCmd.OutOrStdout(), + Stderr: RootCmd.ErrOrStderr(), + }) + return err +} + +func renderCronSystemdUnits(spec config.CronSpec) (string, string, error) { + exe, err := os.Executable() + if err != nil { + return "", "", fmt.Errorf("resolve sitectl path: %w", err) + } + + execLine := strings.Join([]string{exe, "--context", spec.Context, "cron", "run", spec.Name}, " ") + serviceName, _ := cronUnitNames(spec.Name) + var service bytes.Buffer + fmt.Fprintf(&service, "[Unit]\nDescription=Sitectl cron job for %s\nAfter=network-online.target\nWants=network-online.target\n\n", spec.Name) + fmt.Fprintf(&service, "[Service]\nType=oneshot\nExecStart=%s\n", execLine) + + var timer bytes.Buffer + fmt.Fprintf(&timer, "[Unit]\nDescription=Schedule sitectl cron job for %s\n\n", spec.Name) + fmt.Fprintf(&timer, "[Timer]\nOnCalendar=%s\nPersistent=true\nUnit=%s\n\n", spec.Schedule, serviceName) + fmt.Fprintf(&timer, "[Install]\nWantedBy=timers.target\n") + + return service.String(), timer.String(), nil +} + +func renderCronSystemdInstructions(spec config.CronSpec, ctx config.Context, serviceName, timerName, serviceUnit, timerUnit string) string { + var out bytes.Buffer + fmt.Fprintf(&out, "Cron job: %s\nContext: %s\nHost type: %s\n\n", spec.Name, ctx.Name, ctx.DockerHostType) + fmt.Fprintln(&out, "Prerequisites:") + fmt.Fprintln(&out, "- `sitectl` must be installed on the host and available on `$PATH`.") + fmt.Fprintln(&out, "- The host must be able to run the selected context.") + fmt.Fprintln(&out) + if ctx.DockerHostType == config.ContextLocal { + fmt.Fprintln(&out, "Install on the local host with:") + } else { + fmt.Fprintln(&out, "Remote manual setup is required for now.") + fmt.Fprintln(&out, "Until sitectl supports remote sudo actions, SSH to the target host and install these units there manually.") + fmt.Fprintln(&out, "Tracking: https://github.com/libops/sitectl/issues") + fmt.Fprintln(&out) + fmt.Fprintln(&out, "Run on the remote host with:") + } + fmt.Fprintf(&out, "sudo tee /etc/systemd/system/%s >/dev/null <<'EOF'\n%sEOF\n\n", serviceName, serviceUnit) + fmt.Fprintf(&out, "sudo tee /etc/systemd/system/%s >/dev/null <<'EOF'\n%sEOF\n\n", timerName, timerUnit) + fmt.Fprintf(&out, "sudo systemctl daemon-reload\nsudo systemctl enable --now %s\n", timerName) + return out.String() +} + +func cronUnitNames(name string) (string, string) { + base := "sitectl-" + strings.TrimSpace(name) + return base + ".service", base + ".timer" +} + +func listInstalledCronUnits(ctx *config.Context) ([]string, error) { + accessor, err := config.NewFileAccessor(ctx) + if err != nil { + return nil, err + } + defer accessor.Close() + + files, err := accessor.ListFiles("/etc/systemd/system") + if err != nil { + return nil, err + } + units := make([]string, 0, len(files)) + seen := map[string]bool{} + for _, rel := range files { + name := filepath.Base(rel) + if !strings.HasPrefix(name, "sitectl-") { + continue + } + if !strings.HasSuffix(name, ".service") && !strings.HasSuffix(name, ".timer") { + continue + } + if seen[name] { + continue + } + seen[name] = true + units = append(units, name) + } + sort.Strings(units) + return units, nil +} + +func normalizeCronComponents(ctx config.Context, requested []string) ([]string, error) { + available, err := availableCronComponents(ctx) + if err != nil { + return nil, err + } + if len(requested) == 0 { + names := make([]string, 0, len(available)) + for _, spec := range available { + names = append(names, spec.Name) + } + return names, nil + } + selected, err := selectCronComponents(requested, available) + if err != nil { + return nil, err + } + names := make([]string, 0, len(selected)) + for _, spec := range selected { + names = append(names, spec.Name) + } + return names, nil +} + +func selectCronComponents(names []string, available []corecron.ComponentSpec) ([]corecron.ComponentSpec, error) { + selected := make([]corecron.ComponentSpec, 0, len(names)) + seen := map[string]bool{} + for _, name := range names { + spec, ok := corecron.FindComponent(available, name) + if !ok { + return nil, fmt.Errorf("unknown cron component %q", name) + } + if seen[spec.Name] { + continue + } + seen[spec.Name] = true + selected = append(selected, spec) + } + return selected, nil +} + +func availableCronComponents(ctx config.Context) ([]corecron.ComponentSpec, error) { + owners := cronPluginsForContext(ctx.Plugin) + var combined []corecron.ComponentSpec + for _, owner := range owners { + output, err := pluginSDK.InvokePluginCommand(owner, []string{"--context", ctx.Name, "__cron", "components"}, plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Capture: true, + }) + if err != nil { + continue + } + var specs []corecron.ComponentSpec + if err := yaml.Unmarshal([]byte(output), &specs); err != nil { + return nil, fmt.Errorf("parse cron components from plugin %q: %w", owner, err) + } + for i := range specs { + if strings.TrimSpace(specs[i].Plugin) == "" { + specs[i].Plugin = owner + } + } + if err := corecron.ValidateComponents(specs); err != nil { + return nil, fmt.Errorf("validate cron components from plugin %q: %w", owner, err) + } + combined = append(combined, specs...) + } + corecron.SortComponents(combined) + return combined, nil +} + +func cronPluginsForContext(root string) []string { + if strings.TrimSpace(root) == "" { + return nil + } + seen := map[string]bool{} + var ordered []string + var walk func(string) + walk = func(name string) { + if name == "" || seen[name] { + return + } + seen[name] = true + ordered = append(ordered, name) + installed, ok := plugin.FindInstalled(name) + if !ok { + return + } + for _, include := range installed.Includes { + walk(include) + } + } + walk(root) + return ordered +} + +func init() { + cronAddCmd.Flags().StringVar(&cronSpecContext, "context", "", "Context to run the cron job on") + cronAddCmd.Flags().StringVar(&cronSpecSchedule, "schedule", "", "systemd OnCalendar value, for example daily or *-*-* 03:00:00") + cronAddCmd.Flags().StringVar(&cronSpecOutputDir, "output-dir", "", "Host directory where dated outputs are stored") + cronAddCmd.Flags().StringSliceVar(&cronSpecComponents, "component", nil, "Cron component to include; repeat to select multiple components") + cronAddCmd.Flags().IntVar(&cronSpecRetentionDays, "retention-days", 14, "Delete non-monthly artifacts older than this many days") + cronAddCmd.Flags().BoolVar(&cronSpecPreserveFirstOfMonth, "preserve-first-of-month", true, "Keep dated artifacts created on day 01 when pruning") + cronAddCmd.Flags().BoolVar(&cronSpecDockerPrune, "docker-prune", false, "Run docker system prune -af after a successful cron run") + + cronCmd.AddCommand(cronAddCmd) + cronCmd.AddCommand(cronListCmd) + cronCmd.AddCommand(cronDeleteCmd) + cronCmd.AddCommand(cronComponentsCmd) + cronCmd.AddCommand(cronRunCmd) + cronCmd.AddCommand(cronInstalledCmd) + cronCmd.AddCommand(cronRenderSystemdCmd) + RootCmd.AddCommand(cronCmd) +} diff --git a/cmd/cron_test.go b/cmd/cron_test.go new file mode 100644 index 0000000..904c51c --- /dev/null +++ b/cmd/cron_test.go @@ -0,0 +1,56 @@ +package cmd + +import ( + "strings" + "testing" + + "github.com/libops/sitectl/pkg/config" +) + +func TestRenderCronSystemdUnits(t *testing.T) { + spec := config.CronSpec{ + Name: "nightly", + Context: "prod", + Schedule: "daily", + OutputDir: "/opt/islandora/backups", + Components: []string{"drupal-db-backup", "fcrepo-db-backup"}, + } + + service, timer, err := renderCronSystemdUnits(spec) + if err != nil { + t.Fatalf("renderCronSystemdUnits() error = %v", err) + } + + if !strings.Contains(service, "cron run nightly") { + t.Fatalf("service unit missing cron run command: %q", service) + } + if !strings.Contains(timer, "Unit=sitectl-nightly.service") { + t.Fatalf("timer unit missing sitectl unit target: %q", timer) + } + if !strings.Contains(timer, "OnCalendar=daily") { + t.Fatalf("timer unit missing schedule: %q", timer) + } +} + +func TestRenderCronSystemdInstructions(t *testing.T) { + spec := config.CronSpec{Name: "nightly", Context: "prod", Schedule: "daily"} + ctx := config.Context{Name: "prod", DockerHostType: config.ContextRemote} + + serviceUnit, timerUnit, err := renderCronSystemdUnits(spec) + if err != nil { + t.Fatalf("renderCronSystemdUnits() error = %v", err) + } + + instructions := renderCronSystemdInstructions(spec, ctx, "sitectl-nightly.service", "sitectl-nightly.timer", serviceUnit, timerUnit) + wantParts := []string{ + "Remote manual setup is required for now.", + "https://github.com/libops/sitectl/issues", + "/etc/systemd/system/sitectl-nightly.service", + "sudo systemctl enable --now sitectl-nightly.timer", + } + for _, part := range wantParts { + if !strings.Contains(instructions, part) { + t.Fatalf("renderCronSystemdInstructions() missing %q in %q", part, instructions) + } + } +} diff --git a/cmd/job.go b/cmd/job.go new file mode 100644 index 0000000..b8e8ca5 --- /dev/null +++ b/cmd/job.go @@ -0,0 +1,250 @@ +package cmd + +import ( + "errors" + "fmt" + "os" + "sort" + "strings" + "text/tabwriter" + + "github.com/libops/sitectl/pkg/config" + corejob "github.com/libops/sitectl/pkg/job" + "github.com/libops/sitectl/pkg/plugin" + "github.com/spf13/cobra" + "golang.org/x/term" + yaml "gopkg.in/yaml.v3" +) + +var jobCmd = &cobra.Command{ + Use: "job", + Short: "List and run plugin-defined jobs", +} + +var jobListCmd = &cobra.Command{ + Use: "list", + Short: "List available jobs for the active or selected context", + RunE: func(cmd *cobra.Command, args []string) error { + contextName, err := config.ResolveCurrentContextName(cmd.Flags()) + if err != nil { + return err + } + ctx, err := config.GetContext(contextName) + if err != nil { + return err + } + jobs, err := availableJobs(ctx) + if err != nil { + return err + } + if len(jobs) == 0 { + fmt.Fprintf(cmd.OutOrStdout(), "No jobs available for context %q\n", ctx.Name) + return nil + } + w := tabwriter.NewWriter(cmd.OutOrStdout(), 0, 0, 2, ' ', 0) + fmt.Fprintln(w, "NAME\tPLUGIN\tDESCRIPTION") + for _, spec := range jobs { + fmt.Fprintf(w, "%s\t%s\t%s\n", spec.Name, spec.Plugin, spec.Description) + } + return w.Flush() + }, +} + +var jobExecCmd = &cobra.Command{ + Use: "run JOB [args...]", + Short: "Run a plugin-defined job", + DisableFlagParsing: true, + Args: cobra.MinimumNArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + filteredArgs, contextName, err := pluginContextArgs(args) + if err != nil { + return err + } + if len(filteredArgs) == 0 { + return fmt.Errorf("job name is required") + } + jobName := filteredArgs[0] + jobArgs := filteredArgs[1:] + + ctx, err := config.GetContext(contextName) + if err != nil { + return err + } + owner, name, err := resolveJobOwner(ctx, jobName) + if err != nil { + return err + } + if requestsHelp(jobArgs) { + return renderJobHelp(cmd, ctx, owner, name, jobArgs) + } + output, err := runJobCommand(cmd, contextName, owner, name, jobArgs) + if err != nil { + return cleanPluginCommandError(err) + } + if strings.TrimSpace(output) != "" { + _, _ = fmt.Fprint(cmd.OutOrStdout(), output) + } + return nil + }, +} + +func availableJobs(ctx config.Context) ([]corejob.Spec, error) { + owners := cronPluginsForContext(ctx.Plugin) + var combined []corejob.Spec + for _, owner := range owners { + output, err := pluginSDK.InvokePluginCommand(owner, []string{"--context", ctx.Name, "__job", "list"}, plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Capture: true, + }) + if err != nil { + continue + } + var specs []corejob.Spec + if err := yaml.Unmarshal([]byte(output), &specs); err != nil { + return nil, fmt.Errorf("parse jobs from plugin %q: %w", owner, err) + } + for i := range specs { + if strings.TrimSpace(specs[i].Plugin) == "" { + specs[i].Plugin = owner + } + } + combined = append(combined, specs...) + } + sort.Slice(combined, func(i, j int) bool { + if combined[i].Plugin != combined[j].Plugin { + return combined[i].Plugin < combined[j].Plugin + } + return combined[i].Name < combined[j].Name + }) + return combined, nil +} + +func resolveJobOwner(ctx config.Context, raw string) (string, string, error) { + name := strings.TrimSpace(raw) + if pluginName, jobName, ok := splitNamespacedComponent(name); ok { + return pluginName, jobName, nil + } + jobs, err := availableJobs(ctx) + if err != nil { + return "", "", err + } + var matches []corejob.Spec + for _, spec := range jobs { + if strings.EqualFold(spec.Name, name) { + matches = append(matches, spec) + } + } + if len(matches) == 0 { + return "", "", fmt.Errorf("job %q not found for context %q", name, ctx.Name) + } + if len(matches) > 1 { + owners := make([]string, 0, len(matches)) + for _, spec := range matches { + owners = append(owners, spec.Plugin) + } + sort.Strings(owners) + return "", "", fmt.Errorf("job %q is ambiguous; qualify it as plugin/job (%s)", name, strings.Join(owners, ", ")) + } + return matches[0].Plugin, matches[0].Name, nil +} + +func pluginContextArgs(args []string) ([]string, string, error) { + contextName, err := RootCmd.PersistentFlags().GetString("context") + if err != nil { + return nil, "", err + } + filtered := make([]string, 0, len(args)) + skipNext := false + for _, arg := range args { + if skipNext { + contextName = arg + skipNext = false + continue + } + if arg == "--context" { + skipNext = true + continue + } + if strings.HasPrefix(arg, "--context=") { + contextName = strings.TrimSpace(strings.TrimPrefix(arg, "--context=")) + continue + } + filtered = append(filtered, arg) + } + return filtered, contextName, nil +} + +func init() { + jobCmd.AddCommand(jobListCmd) + jobCmd.AddCommand(jobExecCmd) + RootCmd.AddCommand(jobCmd) +} + +func runJobCommand(cmd *cobra.Command, contextName, owner, name string, jobArgs []string) (string, error) { + invocation := append([]string{"--context", contextName, "__job", name}, jobArgs...) + if stderrFile, ok := cmd.ErrOrStderr().(*os.File); ok && term.IsTerminal(int(stderrFile.Fd())) { + progress := newDebugProgressLine(cmd.ErrOrStderr()) + progress.Report("Running Job", fmt.Sprintf("%s on %s", name, contextName)) + defer progress.Close() + return pluginSDK.InvokePluginCommand(owner, invocation, plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Capture: true, + }) + } + + return pluginSDK.InvokePluginCommand(owner, invocation, plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Capture: true, + }) +} + +func requestsHelp(args []string) bool { + for _, arg := range args { + switch arg { + case "--help", "-h": + return true + } + } + return false +} + +func renderJobHelp(cmd *cobra.Command, ctx config.Context, owner, name string, jobArgs []string) error { + output, err := pluginSDK.InvokePluginCommand(owner, append([]string{"--context", ctx.Name, "__job", name}, jobArgs...), plugin.CommandExecOptions{ + Context: RootCmd.Context(), + Capture: true, + }) + if err != nil { + return cleanPluginCommandError(err) + } + + replacements := []string{ + fmt.Sprintf("sitectl %s __job %s", owner, name), fmt.Sprintf("sitectl job run %s", name), + fmt.Sprintf("%s __job %s", owner, name), fmt.Sprintf("job run %s", name), + "Internal job execution command", "Run a plugin-defined job", + } + rewritten := strings.NewReplacer(replacements...).Replace(output) + _, err = fmt.Fprint(cmd.OutOrStdout(), rewritten) + return err +} + +func cleanPluginCommandError(err error) error { + message := err.Error() + for _, prefix := range []string{ + `run plugin "drupal": exit status 1: `, + `run plugin "isle": exit status 1: `, + `run plugin "wordpress": exit status 1: `, + `run plugin "core": exit status 1: `, + } { + if strings.HasPrefix(message, prefix) { + return errors.New(strings.TrimPrefix(message, prefix)) + } + } + + const genericPrefix = `run plugin "` + if strings.HasPrefix(message, genericPrefix) { + if idx := strings.Index(message, `: exit status 1: `); idx != -1 { + return errors.New(message[idx+len(`: exit status 1: `):]) + } + } + return err +} diff --git a/pkg/config/config.go b/pkg/config/config.go index 53fe08e..a6b1ed9 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -9,8 +9,9 @@ import ( ) type Config struct { - CurrentContext string `yaml:"current-context"` - Contexts []Context `yaml:"contexts"` + CurrentContext string `yaml:"current-context"` + Contexts []Context `yaml:"contexts"` + CronSpecs []CronSpec `yaml:"cron-specs,omitempty"` } func ConfigFilePath() string { diff --git a/pkg/config/context.go b/pkg/config/context.go index 3382319..81713c7 100644 --- a/pkg/config/context.go +++ b/pkg/config/context.go @@ -27,22 +27,24 @@ const ( ) type Context struct { - Name string `yaml:"name"` - Site string `yaml:"site"` - Plugin string `yaml:"plugin"` - DockerHostType ContextType `mapstructure:"type" yaml:"type"` - Environment string `yaml:"environment,omitempty"` - DockerSocket string `yaml:"docker-socket"` - ProjectName string `yaml:"project-name"` - ComposeProjectName string `yaml:"compose-project-name,omitempty"` - ComposeNetwork string `yaml:"compose-network,omitempty"` - ProjectDir string `yaml:"project-dir"` - SSHUser string `yaml:"ssh-user"` - SSHHostname string `yaml:"ssh-hostname,omitempty"` - SSHPort uint `yaml:"ssh-port,omitempty"` - SSHKeyPath string `yaml:"ssh-key,omitempty"` - EnvFile []string `yaml:"env-file"` - ComposeFile []string `yaml:"compose-file,omitempty"` + Name string `yaml:"name"` + Site string `yaml:"site"` + Plugin string `yaml:"plugin"` + DockerHostType ContextType `mapstructure:"type" yaml:"type"` + Environment string `yaml:"environment,omitempty"` + DockerSocket string `yaml:"docker-socket"` + ProjectName string `yaml:"project-name"` + ComposeProjectName string `yaml:"compose-project-name,omitempty"` + ComposeNetwork string `yaml:"compose-network,omitempty"` + ProjectDir string `yaml:"project-dir"` + DrupalRootfs string `yaml:"drupal-rootfs,omitempty"` + DrupalContainerRoot string `yaml:"drupal-container-root,omitempty"` + SSHUser string `yaml:"ssh-user"` + SSHHostname string `yaml:"ssh-hostname,omitempty"` + SSHPort uint `yaml:"ssh-port,omitempty"` + SSHKeyPath string `yaml:"ssh-key,omitempty"` + EnvFile []string `yaml:"env-file"` + ComposeFile []string `yaml:"compose-file,omitempty"` // Database connection configuration DatabaseService string `yaml:"database-service,omitempty"` diff --git a/pkg/config/cron.go b/pkg/config/cron.go new file mode 100644 index 0000000..cd668da --- /dev/null +++ b/pkg/config/cron.go @@ -0,0 +1,74 @@ +package config + +import ( + "fmt" + "slices" + "strings" +) + +type CronSpec struct { + Name string `yaml:"name"` + Context string `yaml:"context"` + Schedule string `yaml:"schedule"` + OutputDir string `yaml:"output-dir"` + Components []string `yaml:"components,omitempty"` + RetentionDays int `yaml:"retention-days,omitempty"` + PreserveFirstOfMonth bool `yaml:"preserve-first-of-month,omitempty"` + DockerPrune bool `yaml:"docker-prune,omitempty"` +} + +func GetCronSpec(name string) (CronSpec, error) { + cfg, err := Load() + if err != nil { + return CronSpec{}, err + } + for _, spec := range cfg.CronSpecs { + if strings.EqualFold(spec.Name, name) { + return spec, nil + } + } + return CronSpec{}, fmt.Errorf("cron spec %q not found", name) +} + +func SaveCronSpec(spec CronSpec) error { + cfg, err := Load() + if err != nil { + return err + } + updated := false + for i := range cfg.CronSpecs { + if strings.EqualFold(cfg.CronSpecs[i].Name, spec.Name) { + cfg.CronSpecs[i] = spec + updated = true + break + } + } + if !updated { + cfg.CronSpecs = append(cfg.CronSpecs, spec) + slices.SortFunc(cfg.CronSpecs, func(a, b CronSpec) int { + return strings.Compare(strings.ToLower(a.Name), strings.ToLower(b.Name)) + }) + } + return Save(cfg) +} + +func DeleteCronSpec(name string) error { + cfg, err := Load() + if err != nil { + return err + } + filtered := cfg.CronSpecs[:0] + found := false + for _, spec := range cfg.CronSpecs { + if strings.EqualFold(spec.Name, name) { + found = true + continue + } + filtered = append(filtered, spec) + } + if !found { + return fmt.Errorf("cron spec %q not found", name) + } + cfg.CronSpecs = filtered + return Save(cfg) +} diff --git a/pkg/config/file_accessor.go b/pkg/config/file_accessor.go index 49988d0..93a4379 100644 --- a/pkg/config/file_accessor.go +++ b/pkg/config/file_accessor.go @@ -229,6 +229,16 @@ func (a *FileAccessor) WriteFile(filename string, data []byte) error { return err } +func (a *FileAccessor) MkdirAll(path string) error { + if strings.TrimSpace(path) == "" { + return nil + } + if a == nil || a.ctx == nil || a.ctx.DockerHostType == ContextLocal { + return os.MkdirAll(path, 0o755) + } + return mkdirAllRemote(a.sftp, path) +} + func (a *FileAccessor) RemoveFile(filename string) error { if a == nil || a.ctx == nil || a.ctx.DockerHostType == ContextLocal { if err := os.Remove(filename); err != nil && !os.IsNotExist(err) { @@ -242,6 +252,54 @@ func (a *FileAccessor) RemoveFile(filename string) error { return nil } +func (a *FileAccessor) RemoveAll(path string) error { + if strings.TrimSpace(path) == "" { + return nil + } + if a == nil || a.ctx == nil || a.ctx.DockerHostType == ContextLocal { + if err := os.RemoveAll(path); err != nil && !os.IsNotExist(err) { + return err + } + return nil + } + + entries := []string{} + walker := a.sftp.Walk(path) + for walker.Step() { + if err := walker.Err(); err != nil { + if isSFTPNotExist(err) { + return nil + } + return err + } + entries = append(entries, walker.Path()) + } + if len(entries) == 0 { + return nil + } + + for i := len(entries) - 1; i >= 0; i-- { + entry := entries[i] + info, err := a.sftp.Stat(entry) + if err != nil { + if isSFTPNotExist(err) { + continue + } + return err + } + if info.IsDir() { + if err := a.sftp.RemoveDirectory(entry); err != nil && !isSFTPNotExist(err) { + return err + } + continue + } + if err := a.sftp.Remove(entry); err != nil && !isSFTPNotExist(err) { + return err + } + } + return nil +} + func (a *FileAccessor) ListFiles(root string) ([]string, error) { if a == nil || a.ctx == nil || a.ctx.DockerHostType == ContextLocal { return listLocalFiles(root) diff --git a/pkg/config/files.go b/pkg/config/files.go index 5510cec..065ee82 100644 --- a/pkg/config/files.go +++ b/pkg/config/files.go @@ -102,7 +102,18 @@ func mkdirAllRemote(client *sftp.Client, dir string) error { continue } current = filepath.Join(current, part) + info, err := client.Stat(current) + if err == nil { + if info.IsDir() { + continue + } + return fmt.Errorf("remote path %q exists and is not a directory", current) + } if err := client.Mkdir(current); err != nil && !isSFTPExist(err) { + info, statErr := client.Stat(current) + if statErr == nil && info.IsDir() { + continue + } return fmt.Errorf("create remote directory %q: %w", current, err) } } diff --git a/pkg/config/local_context.go b/pkg/config/local_context.go index 3713157..bb5f1ba 100644 --- a/pkg/config/local_context.go +++ b/pkg/config/local_context.go @@ -25,6 +25,8 @@ type LocalContextCreateOptions struct { ComposeNetwork string Environment string DockerSocket string + DrupalRootfs string + DrupalContainerRoot string SetDefault bool ConfirmOverwrite bool Input InputFunc @@ -94,16 +96,18 @@ func PromptAndSaveLocalContext(opts LocalContextCreateOptions) (*Context, error) dockerSocket := GetDefaultLocalDockerSocket(firstNonEmpty(opts.DockerSocket, existing.DockerSocket, "/var/run/docker.sock")) ctx := &Context{ - Name: name, - Site: site, - Plugin: plugin, - DockerHostType: ContextLocal, - Environment: environment, - DockerSocket: dockerSocket, - ProjectName: projectName, - ComposeProjectName: composeProjectName, - ComposeNetwork: composeNetwork, - ProjectDir: projectDir, + Name: name, + Site: site, + Plugin: plugin, + DockerHostType: ContextLocal, + Environment: environment, + DockerSocket: dockerSocket, + ProjectName: projectName, + ComposeProjectName: composeProjectName, + ComposeNetwork: composeNetwork, + ProjectDir: projectDir, + DrupalRootfs: firstNonEmpty(opts.DrupalRootfs, existing.DrupalRootfs), + DrupalContainerRoot: firstNonEmpty(opts.DrupalContainerRoot, existing.DrupalContainerRoot), } if err := SaveContext(ctx, opts.SetDefault); err != nil { diff --git a/pkg/config/utils.go b/pkg/config/utils.go index ddb9675..a4dd225 100644 --- a/pkg/config/utils.go +++ b/pkg/config/utils.go @@ -60,6 +60,9 @@ func LoadFromFlags(f *pflag.FlagSet, context Context) (*Context, error) { continue } tag = strings.Split(tag, ",")[0] + if f.Lookup(tag) == nil { + continue + } // Skip map types as they're not supported as flags if field.Type.Kind() == reflect.Map { diff --git a/pkg/config/validation_helpers.go b/pkg/config/validation_helpers.go index 178c916..b49fdb9 100644 --- a/pkg/config/validation_helpers.go +++ b/pkg/config/validation_helpers.go @@ -10,6 +10,8 @@ import ( "github.com/kballard/go-shellquote" ) +const defaultDrupalContainerRoot = "/var/www/drupal" + func IsDockerSocketAlive(socket string) bool { return isDockerSocketAlive(socket) } @@ -52,6 +54,20 @@ func (c *Context) ResolveProjectPath(path string) string { return filepath.Join(c.ProjectDir, path) } +func (c *Context) EffectiveDrupalRootfs() string { + if c == nil || strings.TrimSpace(c.DrupalRootfs) == "" { + return "." + } + return strings.TrimSpace(c.DrupalRootfs) +} + +func (c *Context) EffectiveDrupalContainerRoot() string { + if c == nil || strings.TrimSpace(c.DrupalContainerRoot) == "" { + return defaultDrupalContainerRoot + } + return strings.TrimSpace(c.DrupalContainerRoot) +} + func (c *Context) HasComposeProject() (bool, error) { if c == nil { return false, fmt.Errorf("context is nil") diff --git a/pkg/cron/spec.go b/pkg/cron/spec.go new file mode 100644 index 0000000..fd868e9 --- /dev/null +++ b/pkg/cron/spec.go @@ -0,0 +1,135 @@ +package cron + +import ( + "fmt" + "os" + "path/filepath" + "sort" + "strings" + "time" +) + +type ComponentSpec struct { + Name string `yaml:"name"` + Plugin string `yaml:"plugin"` + Description string `yaml:"description,omitempty"` + Filename string `yaml:"filename"` +} + +func PruneArtifacts(root string, now time.Time, retentionDays int, preserveFirstOfMonth bool) error { + if retentionDays <= 0 { + return nil + } + + err := filepath.WalkDir(root, func(path string, d os.DirEntry, walkErr error) error { + if walkErr != nil { + return walkErr + } + if d.IsDir() { + return nil + } + info, err := d.Info() + if err != nil { + return err + } + if ShouldDeleteArtifact(info.ModTime(), now, retentionDays, preserveFirstOfMonth) { + if err := os.Remove(path); err != nil { + return err + } + } + return nil + }) + if err != nil { + return err + } + + return removeEmptyDirs(root) +} + +func removeEmptyDirs(root string) error { + var dirs []string + if err := filepath.WalkDir(root, func(path string, d os.DirEntry, err error) error { + if err != nil { + return err + } + if d.IsDir() { + dirs = append(dirs, path) + } + return nil + }); err != nil { + return err + } + + sort.Slice(dirs, func(i, j int) bool { return len(dirs[i]) > len(dirs[j]) }) + for _, dir := range dirs { + if dir == root { + continue + } + entries, err := os.ReadDir(dir) + if err != nil { + return err + } + if len(entries) == 0 { + if err := os.Remove(dir); err != nil { + return err + } + } + } + return nil +} + +func ShouldDeleteArtifact(modTime, now time.Time, retentionDays int, preserveFirstOfMonth bool) bool { + if retentionDays <= 0 { + return false + } + if preserveFirstOfMonth && modTime.UTC().Day() == 1 { + return false + } + return now.Sub(modTime) > time.Duration(retentionDays)*24*time.Hour +} + +func EnsureDatedDestination(root string, now time.Time) (string, error) { + out := filepath.Join(root, now.Format("2006"), now.Format("01"), now.Format("02")) + if err := os.MkdirAll(out, 0o755); err != nil { + return "", err + } + return out, nil +} + +func ValidateComponents(specs []ComponentSpec) error { + seen := map[string]bool{} + for _, spec := range specs { + if strings.TrimSpace(spec.Name) == "" { + return fmt.Errorf("cron component name is required") + } + if strings.TrimSpace(spec.Plugin) == "" { + return fmt.Errorf("cron component %q plugin is required", spec.Name) + } + if strings.TrimSpace(spec.Filename) == "" { + return fmt.Errorf("cron component %q filename is required", spec.Name) + } + if seen[spec.Name] { + return fmt.Errorf("duplicate cron component %q", spec.Name) + } + seen[spec.Name] = true + } + return nil +} + +func SortComponents(specs []ComponentSpec) { + sort.Slice(specs, func(i, j int) bool { + if specs[i].Plugin != specs[j].Plugin { + return specs[i].Plugin < specs[j].Plugin + } + return specs[i].Name < specs[j].Name + }) +} + +func FindComponent(specs []ComponentSpec, name string) (ComponentSpec, bool) { + for _, spec := range specs { + if strings.EqualFold(spec.Name, name) { + return spec, true + } + } + return ComponentSpec{}, false +} diff --git a/pkg/cron/spec_test.go b/pkg/cron/spec_test.go new file mode 100644 index 0000000..3242621 --- /dev/null +++ b/pkg/cron/spec_test.go @@ -0,0 +1,57 @@ +package cron + +import ( + "os" + "path/filepath" + "strings" + "testing" + "testing/synctest" + "time" +) + +func TestPruneArtifactsWithSynctest(t *testing.T) { + synctest.Test(t, func(t *testing.T) { + root := t.TempDir() + + for day := 0; day < 31; day++ { + now := time.Now().UTC() + name := now.Format("20060102.15.04.05") + ".gz" + path := filepath.Join(root, name) + if err := os.WriteFile(path, nil, 0o644); err != nil { + t.Fatalf("WriteFile() error = %v", err) + } + if err := os.Chtimes(path, now, now); err != nil { + t.Fatalf("Chtimes() error = %v", err) + } + time.Sleep(24 * time.Hour) + synctest.Wait() + } + + if err := PruneArtifacts(root, time.Now(), 14, true); err != nil { + t.Fatalf("PruneArtifacts() error = %v", err) + } + + entries, err := os.ReadDir(root) + if err != nil { + t.Fatalf("ReadDir() error = %v", err) + } + if got, want := len(entries), 15; got != want { + names := make([]string, 0, len(entries)) + for _, entry := range entries { + names = append(names, entry.Name()) + } + t.Fatalf("len(entries) = %d, want %d; remaining=%v", got, want, names) + } + + var foundMonthly bool + for _, entry := range entries { + if strings.HasPrefix(entry.Name(), "20000101.") { + foundMonthly = true + break + } + } + if !foundMonthly { + t.Fatal("expected preserved monthly artifact from 2000-01-01") + } + }) +} diff --git a/pkg/docker/docker.go b/pkg/docker/docker.go index 00be6dc..09ce891 100644 --- a/pkg/docker/docker.go +++ b/pkg/docker/docker.go @@ -2,6 +2,7 @@ package docker import ( "context" + "errors" "fmt" "io" "log/slog" @@ -13,6 +14,8 @@ import ( "sort" "strconv" "strings" + "sync" + "syscall" dockercontainer "github.com/docker/docker/api/types/container" "github.com/docker/docker/api/types/filters" @@ -275,9 +278,32 @@ func (d *DockerClient) Exec(ctx context.Context, opts ExecOptions) (int, error) } defer resp.Close() - // Copy output - errCh := make(chan error, 1) + if ctx != nil { + go func() { + <-ctx.Done() + resp.Close() + }() + } + + var wg sync.WaitGroup + errCh := make(chan error, 2) + if opts.AttachStdin { + wg.Add(1) + go func() { + defer wg.Done() + if _, err := io.Copy(resp.Conn, opts.Stdin); err != nil && !isIgnorableExecStreamError(err) { + errCh <- err + return + } + if closer, ok := resp.Conn.(interface{ CloseWrite() error }); ok { + _ = closer.CloseWrite() + } + }() + } + + wg.Add(1) go func() { + defer wg.Done() if opts.Tty { // For TTY, copy directly _, err := io.Copy(opts.Stdout, resp.Reader) @@ -289,9 +315,22 @@ func (d *DockerClient) Exec(ctx context.Context, opts ExecOptions) (int, error) } }() - // Wait for completion - if err := <-errCh; err != nil && err != io.EOF { - return -1, fmt.Errorf("failed to copy output: %w", err) + go func() { + wg.Wait() + close(errCh) + }() + + for copyErr := range errCh { + if ctx != nil && ctx.Err() != nil { + return -1, ctx.Err() + } + if copyErr != nil && !isIgnorableExecStreamError(copyErr) { + return -1, fmt.Errorf("failed to copy exec stream: %w", copyErr) + } + } + + if ctx != nil && ctx.Err() != nil { + return -1, ctx.Err() } // Get exit code @@ -303,6 +342,19 @@ func (d *DockerClient) Exec(ctx context.Context, opts ExecOptions) (int, error) return inspectResp.ExitCode, nil } +func isIgnorableExecStreamError(err error) bool { + if err == nil || err == io.EOF { + return true + } + if errors.Is(err, syscall.EPIPE) { + return true + } + message := strings.ToLower(err.Error()) + return strings.Contains(message, "broken pipe") || + strings.Contains(message, "closed network connection") || + strings.Contains(message, "use of closed network connection") +} + // ExecSimple executes a simple command and returns the exit code func (d *DockerClient) ExecSimple(ctx context.Context, containerID string, cmd []string) (int, error) { return d.Exec(ctx, ExecOptions{ diff --git a/pkg/job/artifact.go b/pkg/job/artifact.go new file mode 100644 index 0000000..e4003d3 --- /dev/null +++ b/pkg/job/artifact.go @@ -0,0 +1,72 @@ +package job + +import ( + "fmt" + "path/filepath" + "time" + + "github.com/libops/sitectl/pkg/config" +) + +var defaultRecentArtifactOffsets = []time.Duration{0, -24 * time.Hour} + +func DatedArtifactPath(rootDir, filename string, ts time.Time) string { + if ts.IsZero() { + ts = time.Now().UTC() + } + return filepath.Join(rootDir, ts.Format("2006"), ts.Format("01"), ts.Format("02"), filename) +} + +func ResolveRecentArtifact(ctx *config.Context, rootDir, filename string, fresh bool, now time.Time, produce func(path string) error) (string, error) { + if ctx == nil { + return "", fmt.Errorf("context is required") + } + if filename == "" { + return "", fmt.Errorf("filename is required") + } + if now.IsZero() { + now = time.Now().UTC() + } + + if !fresh { + for _, offset := range defaultRecentArtifactOffsets { + path := DatedArtifactPath(rootDir, filename, now.Add(offset)) + exists, err := PathExistsOnContext(ctx, path) + if err != nil { + return "", err + } + if exists { + return path, nil + } + } + } + + path := DatedArtifactPath(rootDir, filename, now) + if err := produce(path); err != nil { + return "", err + } + return path, nil +} + +func PathExistsOnContext(ctx *config.Context, path string) (bool, error) { + accessor, err := config.NewFileAccessor(ctx) + if err != nil { + return false, err + } + defer accessor.Close() + return accessor.FileExists(path) +} + +func EnsurePathAbsentOnContext(ctx *config.Context, path string) error { + if ctx == nil { + return fmt.Errorf("context is required") + } + exists, err := PathExistsOnContext(ctx, path) + if err != nil { + return err + } + if exists { + return fmt.Errorf("refusing to overwrite existing host file %q on context %q", path, ctx.Name) + } + return nil +} diff --git a/pkg/job/artifact_test.go b/pkg/job/artifact_test.go new file mode 100644 index 0000000..da9b658 --- /dev/null +++ b/pkg/job/artifact_test.go @@ -0,0 +1,62 @@ +package job + +import ( + "os" + "path/filepath" + "testing" + "time" + + "github.com/libops/sitectl/pkg/config" +) + +func TestResolveRecentArtifactReusesExistingFile(t *testing.T) { + root := t.TempDir() + now := time.Date(2026, 3, 22, 10, 0, 0, 0, time.UTC) + existing := DatedArtifactPath(root, "drupal.sql.gz", now.Add(-24*time.Hour)) + if err := os.MkdirAll(filepath.Dir(existing), 0o755); err != nil { + t.Fatal(err) + } + if err := os.WriteFile(existing, []byte("ok"), 0o644); err != nil { + t.Fatal(err) + } + + ctx := &config.Context{DockerHostType: config.ContextLocal} + produced := false + got, err := ResolveRecentArtifact(ctx, root, "drupal.sql.gz", false, now, func(path string) error { + produced = true + return nil + }) + if err != nil { + t.Fatal(err) + } + if got != existing { + t.Fatalf("got %q, want %q", got, existing) + } + if produced { + t.Fatal("expected existing artifact to be reused") + } +} + +func TestResolveRecentArtifactProducesWhenMissingOrFresh(t *testing.T) { + root := t.TempDir() + now := time.Date(2026, 3, 22, 10, 0, 0, 0, time.UTC) + ctx := &config.Context{DockerHostType: config.ContextLocal} + + for _, fresh := range []bool{false, true} { + produced := "" + got, err := ResolveRecentArtifact(ctx, root, "fcrepo.sql.gz", fresh, now, func(path string) error { + produced = path + return nil + }) + if err != nil { + t.Fatal(err) + } + want := DatedArtifactPath(root, "fcrepo.sql.gz", now) + if got != want { + t.Fatalf("fresh=%v got %q, want %q", fresh, got, want) + } + if produced != want { + t.Fatalf("fresh=%v produced %q, want %q", fresh, produced, want) + } + } +} diff --git a/pkg/job/spec.go b/pkg/job/spec.go new file mode 100644 index 0000000..e244a75 --- /dev/null +++ b/pkg/job/spec.go @@ -0,0 +1,18 @@ +package job + +import "strings" + +type Spec struct { + Name string `yaml:"name"` + Plugin string `yaml:"plugin,omitempty"` + Description string `yaml:"description,omitempty"` +} + +func Find(specs []Spec, name string) (Spec, bool) { + for _, spec := range specs { + if strings.EqualFold(spec.Name, name) { + return spec, true + } + } + return Spec{}, false +} diff --git a/pkg/job/sync.go b/pkg/job/sync.go new file mode 100644 index 0000000..e6eb2b9 --- /dev/null +++ b/pkg/job/sync.go @@ -0,0 +1,83 @@ +package job + +import ( + "context" + "fmt" + "os" + "path/filepath" + "strings" + "time" + + "github.com/libops/sitectl/pkg/config" +) + +func ResolveContextPair(sourceName, targetName string) (*config.Context, *config.Context, error) { + if strings.TrimSpace(sourceName) == "" { + return nil, nil, fmt.Errorf("--source is required") + } + if strings.TrimSpace(targetName) == "" { + return nil, nil, fmt.Errorf("--target is required") + } + if sourceName == targetName { + return nil, nil, fmt.Errorf("--source and --target must be different contexts") + } + + sourceCtx, err := config.GetContext(sourceName) + if err != nil { + return nil, nil, fmt.Errorf("load source context %q: %w", sourceName, err) + } + targetCtx, err := config.GetContext(targetName) + if err != nil { + return nil, nil, fmt.Errorf("load target context %q: %w", targetName, err) + } + + return &sourceCtx, &targetCtx, nil +} + +func SyncArtifactName(prefix, suffix string) string { + prefix = strings.TrimSpace(prefix) + if prefix == "" { + prefix = "sitectl-sync" + } + return fmt.Sprintf("%s-%d-%s", prefix, time.Now().UnixNano(), suffix) +} + +func StageArtifactBetweenContexts(runCtx context.Context, sourceCtx, targetCtx *config.Context, sourcePath, localWorkDir, targetSuffix, prefix string) (string, func(), error) { + if sourceCtx == nil { + return "", nil, fmt.Errorf("source context is required") + } + if targetCtx == nil { + return "", nil, fmt.Errorf("target context is required") + } + if strings.TrimSpace(sourcePath) == "" { + return "", nil, fmt.Errorf("source path is required") + } + if strings.TrimSpace(localWorkDir) == "" { + return "", nil, fmt.Errorf("local work dir is required") + } + + localArtifactPath := filepath.Join(localWorkDir, filepath.Base(sourcePath)) + if err := DownloadContextFile(sourceCtx, sourcePath, localArtifactPath); err != nil { + return "", nil, err + } + + targetHostPath := filepath.ToSlash(filepath.Join("/tmp", SyncArtifactName(prefix, targetSuffix))) + if err := targetCtx.UploadFile(localArtifactPath, targetHostPath); err != nil { + return "", nil, err + } + + cleanup := func() { + RemoveContextHostPath(runCtx, targetCtx, targetHostPath) + } + return targetHostPath, cleanup, nil +} + +func MakeTempWorkDir(pattern string) (string, func(), error) { + workDir, err := os.MkdirTemp("", pattern) + if err != nil { + return "", nil, err + } + return workDir, func() { + _ = os.RemoveAll(workDir) + }, nil +} diff --git a/pkg/job/transfer.go b/pkg/job/transfer.go new file mode 100644 index 0000000..e1274fc --- /dev/null +++ b/pkg/job/transfer.go @@ -0,0 +1,80 @@ +package job + +import ( + "context" + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "strings" + + "github.com/libops/sitectl/pkg/config" + "github.com/pkg/sftp" +) + +func RemoveContextHostPath(runCtx context.Context, ctx *config.Context, path string) { + if ctx == nil || strings.TrimSpace(path) == "" { + return + } + _, _ = ctx.RunQuietCommandContext(runCtx, exec.Command("rm", "-f", path)) +} + +func DownloadContextFile(ctx *config.Context, sourcePath, localPath string) error { + if ctx == nil { + return fmt.Errorf("context is required") + } + if ctx.DockerHostType == config.ContextLocal { + sourceFile, err := os.Open(sourcePath) + if err != nil { + return err + } + defer sourceFile.Close() + return writeLocalFile(localPath, sourceFile) + } + + sshClient, err := ctx.DialSSH() + if err != nil { + return err + } + defer sshClient.Close() + + sftpClient, err := sftp.NewClient(sshClient) + if err != nil { + return err + } + defer sftpClient.Close() + + sourceFile, err := sftpClient.Open(sourcePath) + if err != nil { + return err + } + defer sourceFile.Close() + + return writeLocalFile(localPath, sourceFile) +} + +func EnsureDirOnContext(ctx *config.Context, dir string) error { + if ctx == nil { + return fmt.Errorf("context is required") + } + accessor, err := config.NewFileAccessor(ctx) + if err != nil { + return err + } + defer accessor.Close() + return accessor.MkdirAll(dir) +} + +func writeLocalFile(path string, r io.Reader) error { + if err := os.MkdirAll(filepath.Dir(path), 0o755); err != nil { + return err + } + dst, err := os.Create(path) + if err != nil { + return err + } + defer dst.Close() + _, err = io.Copy(dst, r) + return err +} diff --git a/pkg/plugin/jobs.go b/pkg/plugin/jobs.go new file mode 100644 index 0000000..0dbda8b --- /dev/null +++ b/pkg/plugin/jobs.go @@ -0,0 +1,95 @@ +package plugin + +import ( + "fmt" + "strings" + + "github.com/libops/sitectl/pkg/config" + "github.com/libops/sitectl/pkg/job" + "github.com/spf13/cobra" + yaml "gopkg.in/yaml.v3" +) + +type RegisteredJob struct { + Spec job.Spec + Command *cobra.Command +} + +type ContextJob interface { + BindFlags(cmd *cobra.Command) + Run(cmd *cobra.Command, ctx *config.Context) error +} + +func (s *SDK) RegisterJob(spec job.Spec, cmd *cobra.Command) { + if s == nil || cmd == nil { + return + } + root := s.ensureJobRoot() + if strings.TrimSpace(spec.Name) == "" { + spec.Name = strings.TrimSpace(cmd.Use) + } + if strings.TrimSpace(spec.Plugin) == "" { + spec.Plugin = s.Metadata.Name + } + if strings.TrimSpace(spec.Name) == "" { + return + } + cmd.Use = spec.Name + cmd.Hidden = true + if cmd.Short == "" { + cmd.Short = spec.Description + } + root.AddCommand(cmd) + s.jobs = append(s.jobs, RegisteredJob{Spec: spec, Command: cmd}) +} + +func (s *SDK) RegisterContextJob(spec job.Spec, runner ContextJob) { + if s == nil || runner == nil { + return + } + cmd := &cobra.Command{ + Use: strings.TrimSpace(spec.Name), + Short: spec.Description, + SilenceUsage: true, + RunE: func(cmd *cobra.Command, args []string) error { + ctx, err := s.GetContext() + if err != nil { + return err + } + return runner.Run(cmd, ctx) + }, + } + runner.BindFlags(cmd) + s.RegisterJob(spec, cmd) +} + +func (s *SDK) ensureJobRoot() *cobra.Command { + if s.jobRootCmd != nil { + return s.jobRootCmd + } + root := &cobra.Command{ + Use: "__job", + Hidden: true, + SilenceUsage: true, + } + listCmd := &cobra.Command{ + Use: "list", + Hidden: true, + RunE: func(cmd *cobra.Command, args []string) error { + specs := make([]job.Spec, 0, len(s.jobs)) + for _, registered := range s.jobs { + specs = append(specs, registered.Spec) + } + data, err := yaml.Marshal(specs) + if err != nil { + return fmt.Errorf("marshal jobs: %w", err) + } + _, err = cmd.OutOrStdout().Write(data) + return err + }, + } + root.AddCommand(listCmd) + s.jobRootCmd = root + s.RootCmd.AddCommand(root) + return root +} diff --git a/pkg/plugin/progress.go b/pkg/plugin/progress.go new file mode 100644 index 0000000..18f7eaf --- /dev/null +++ b/pkg/plugin/progress.go @@ -0,0 +1,88 @@ +package plugin + +import ( + "fmt" + "io" + "os" + "strings" + "sync" + "time" + + "golang.org/x/term" +) + +type ProgressLine struct { + out *os.File + frames []string + index int + title string + detail string + mu sync.Mutex + done chan struct{} + once sync.Once +} + +func NewProgressLine(w io.Writer, title, detail string) *ProgressLine { + file, ok := w.(*os.File) + if !ok || !term.IsTerminal(int(file.Fd())) { + return &ProgressLine{} + } + + progress := &ProgressLine{ + out: file, + frames: []string{"-", "\\", "|", "/"}, + title: strings.TrimSpace(title), + detail: strings.TrimSpace(detail), + done: make(chan struct{}), + } + go progress.animate(120 * time.Millisecond) + return progress +} + +func (p *ProgressLine) Report(title, detail string) { + if p == nil || p.out == nil { + return + } + p.mu.Lock() + p.title = strings.TrimSpace(title) + p.detail = strings.TrimSpace(detail) + p.renderLocked() + p.mu.Unlock() +} + +func (p *ProgressLine) Close() { + if p == nil || p.out == nil { + return + } + p.once.Do(func() { + close(p.done) + p.mu.Lock() + defer p.mu.Unlock() + _, _ = fmt.Fprint(p.out, "\r\033[2K") + }) +} + +func (p *ProgressLine) animate(interval time.Duration) { + ticker := time.NewTicker(interval) + defer ticker.Stop() + for { + select { + case <-ticker.C: + p.mu.Lock() + p.renderLocked() + p.mu.Unlock() + case <-p.done: + return + } + } +} + +func (p *ProgressLine) renderLocked() { + if p.out == nil { + return + } + frame := p.frames[p.index%len(p.frames)] + p.index++ + line := strings.TrimSpace(strings.Join([]string{p.title, p.detail}, " - ")) + _, _ = fmt.Fprintf(p.out, "\r%s %s", frame, line) +} diff --git a/pkg/plugin/sdk.go b/pkg/plugin/sdk.go index 901799e..50bdc0f 100644 --- a/pkg/plugin/sdk.go +++ b/pkg/plugin/sdk.go @@ -54,6 +54,8 @@ type SDK struct { contextValidators []validate.Validator contextCache *config.Context sshClient *ssh.Client + jobs []RegisteredJob + jobRootCmd *cobra.Command } // NewSDK creates a new plugin SDK instance