diff --git a/cli/cmd/project/refresh.go b/cli/cmd/project/refresh.go index 515822ac0fc..f4e006dec10 100644 --- a/cli/cmd/project/refresh.go +++ b/cli/cmd/project/refresh.go @@ -1,20 +1,24 @@ package project import ( + "context" "fmt" + "strconv" "github.com/rilldata/rill/cli/pkg/cmdutil" runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1" "github.com/rilldata/rill/runtime" "github.com/spf13/cobra" "github.com/spf13/pflag" + "google.golang.org/protobuf/types/known/structpb" ) func RefreshCmd(ch *cmdutil.Helper) *cobra.Command { var project, path, branch string var local bool var models, modelPartitions, sources, metricViews, alerts, reports, connectors []string - var all, full, erroredPartitions, parser bool + var all, full, erroredPartitions, parser, yes bool + var partitionKey, partitionStart, partitionEnd string refreshCmd := &cobra.Command{ Use: "refresh []", @@ -79,11 +83,26 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command { // Merge sources into models since sources have been deprecated and are no longer created on the backend. models = append(models, sources...) + // Validate partition-range flags. All three must be set together, and the range mode + // is mutually exclusive with --partition and --errored-partitions. + rangeMode := partitionKey != "" || partitionStart != "" || partitionEnd != "" + if rangeMode { + if partitionKey == "" || partitionStart == "" || partitionEnd == "" { + return fmt.Errorf("--partition-key, --partition-start, and --partition-end must all be set together") + } + if len(modelPartitions) > 0 || erroredPartitions { + return fmt.Errorf("--partition-key cannot be combined with --partition or --errored-partitions") + } + if partitionStart > partitionEnd { + return fmt.Errorf("--partition-start (%q) must be <= --partition-end (%q)", partitionStart, partitionEnd) + } + } + // Build model triggers - if len(modelPartitions) > 0 || erroredPartitions { + if len(modelPartitions) > 0 || erroredPartitions || rangeMode { // If partitions are specified, ensure exactly one model is specified. if len(models) != 1 { - return fmt.Errorf("must specify exactly one --model when using --partition or --errored-partitions") + return fmt.Errorf("must specify exactly one --model when using --partition, --errored-partitions, or --partition-key") } // Since it's a common error, do an early check to ensure the model is incremental. @@ -101,6 +120,31 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command { return fmt.Errorf("can't refresh partitions on model %q because it is not incremental", mn) } } + + // Resolve partition range to concrete partition keys. + if rangeMode { + matched, err := resolvePartitionRange(cmd.Context(), rt, instanceID, models[0], partitionKey, partitionStart, partitionEnd) + if err != nil { + return err + } + if len(matched) == 0 { + ch.Printf("No partitions match %s in [%s, %s] on model %q.\n", partitionKey, partitionStart, partitionEnd, models[0]) + return nil + } + + ch.PrintModelPartitions(matched) + + if !yes && ch.Interactive { + if err := cmdutil.ConfirmPrompt(fmt.Sprintf("Refresh %d partition(s)?", len(matched)), true); err != nil { + return err + } + } + + modelPartitions = make([]string, 0, len(matched)) + for _, p := range matched { + modelPartitions = append(modelPartitions, p.Key) + } + } var modelTriggers []*runtimev1.RefreshModelTrigger for _, m := range models { modelTriggers = append(modelTriggers, &runtimev1.RefreshModelTrigger{ @@ -150,6 +194,10 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command { refreshCmd.Flags().StringSliceVar(&models, "model", nil, "Refresh a model") refreshCmd.Flags().StringSliceVar(&modelPartitions, "partition", nil, "Refresh a model partition (must set --model)") refreshCmd.Flags().BoolVar(&erroredPartitions, "errored-partitions", false, "Refresh all model partitions with errors (must set --model)") + refreshCmd.Flags().StringVar(&partitionKey, "partition-key", "", "Name of the field in the partition data to range-filter on (must set --model)") + refreshCmd.Flags().StringVar(&partitionStart, "partition-start", "", "Inclusive lower bound for --partition-key (lexicographic string compare)") + refreshCmd.Flags().StringVar(&partitionEnd, "partition-end", "", "Inclusive upper bound for --partition-key (lexicographic string compare)") + refreshCmd.Flags().BoolVar(&yes, "yes", false, "Skip the partition-range refresh confirmation prompt") refreshCmd.Flags().StringSliceVar(&sources, "source", nil, "Refresh a source") refreshCmd.Flags().StringSliceVar(&metricViews, "metrics-view", nil, "Refresh a metrics view") refreshCmd.Flags().StringSliceVar(&alerts, "alert", nil, "Refresh an alert") @@ -159,3 +207,63 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command { return refreshCmd } + +// resolvePartitionRange lists all partitions of the given model and returns those whose +// data field `key` falls within [start, end] (inclusive, lexicographic string compare). +func resolvePartitionRange(ctx context.Context, rt runtimev1.RuntimeServiceClient, instanceID, model, key, start, end string) ([]*runtimev1.ModelPartition, error) { + var matched []*runtimev1.ModelPartition + var pageToken string + var sawAnyPartition bool + for { + res, err := rt.GetModelPartitions(ctx, &runtimev1.GetModelPartitionsRequest{ + InstanceId: instanceID, + Model: model, + PageSize: 100, + PageToken: pageToken, + }) + if err != nil { + return nil, fmt.Errorf("failed to list partitions for model %q: %w", model, err) + } + + for _, p := range res.Partitions { + sawAnyPartition = true + if p.Data == nil { + continue + } + fields := p.Data.GetFields() + v, ok := fields[key] + if !ok { + available := make([]string, 0, len(fields)) + for f := range fields { + available = append(available, f) + } + return nil, fmt.Errorf("partition field %q not found on partition %q; available fields: %v", key, p.Key, available) + } + + var s string + switch k := v.Kind.(type) { + case *structpb.Value_StringValue: + s = k.StringValue + case *structpb.Value_NumberValue: + s = strconv.FormatFloat(k.NumberValue, 'f', -1, 64) + case *structpb.Value_BoolValue: + s = strconv.FormatBool(k.BoolValue) + default: + return nil, fmt.Errorf("partition %q: unsupported type %T for field %q", p.Key, v.Kind, key) + } + if s >= start && s <= end { + matched = append(matched, p) + } + } + + if res.NextPageToken == "" { + break + } + pageToken = res.NextPageToken + } + + if !sawAnyPartition { + return nil, fmt.Errorf("model %q has no partitions to filter", model) + } + return matched, nil +}