Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
114 changes: 111 additions & 3 deletions cli/cmd/project/refresh.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,24 @@
package project

import (
"context"
"fmt"
"strconv"

"github.com/rilldata/rill/cli/pkg/cmdutil"
runtimev1 "github.com/rilldata/rill/proto/gen/rill/runtime/v1"
"github.com/rilldata/rill/runtime"
"github.com/spf13/cobra"
"github.com/spf13/pflag"
"google.golang.org/protobuf/types/known/structpb"
)

func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
var project, path, branch string
var local bool
var models, modelPartitions, sources, metricViews, alerts, reports, connectors []string
var all, full, erroredPartitions, parser bool
var all, full, erroredPartitions, parser, yes bool
var partitionKey, partitionStart, partitionEnd string

refreshCmd := &cobra.Command{
Use: "refresh [<project-name>]",
Expand Down Expand Up @@ -79,11 +83,26 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
// Merge sources into models since sources have been deprecated and are no longer created on the backend.
models = append(models, sources...)

// Validate partition-range flags. All three must be set together, and the range mode
// is mutually exclusive with --partition and --errored-partitions.
rangeMode := partitionKey != "" || partitionStart != "" || partitionEnd != ""
if rangeMode {
if partitionKey == "" || partitionStart == "" || partitionEnd == "" {
return fmt.Errorf("--partition-key, --partition-start, and --partition-end must all be set together")
}
if len(modelPartitions) > 0 || erroredPartitions {
return fmt.Errorf("--partition-key cannot be combined with --partition or --errored-partitions")
}
if partitionStart > partitionEnd {
return fmt.Errorf("--partition-start (%q) must be <= --partition-end (%q)", partitionStart, partitionEnd)
}
}

// Build model triggers
if len(modelPartitions) > 0 || erroredPartitions {
if len(modelPartitions) > 0 || erroredPartitions || rangeMode {
// If partitions are specified, ensure exactly one model is specified.
if len(models) != 1 {
return fmt.Errorf("must specify exactly one --model when using --partition or --errored-partitions")
return fmt.Errorf("must specify exactly one --model when using --partition, --errored-partitions, or --partition-key")
}

// Since it's a common error, do an early check to ensure the model is incremental.
Expand All @@ -101,6 +120,31 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
return fmt.Errorf("can't refresh partitions on model %q because it is not incremental", mn)
}
}

// Resolve partition range to concrete partition keys.
if rangeMode {
matched, err := resolvePartitionRange(cmd.Context(), rt, instanceID, models[0], partitionKey, partitionStart, partitionEnd)
if err != nil {
return err
}
if len(matched) == 0 {
ch.Printf("No partitions match %s in [%s, %s] on model %q.\n", partitionKey, partitionStart, partitionEnd, models[0])
return nil
}

ch.PrintModelPartitions(matched)

if !yes && ch.Interactive {
if err := cmdutil.ConfirmPrompt(fmt.Sprintf("Refresh %d partition(s)?", len(matched)), true); err != nil {
return err
}
}

modelPartitions = make([]string, 0, len(matched))
for _, p := range matched {
modelPartitions = append(modelPartitions, p.Key)
}
}
var modelTriggers []*runtimev1.RefreshModelTrigger
for _, m := range models {
modelTriggers = append(modelTriggers, &runtimev1.RefreshModelTrigger{
Expand Down Expand Up @@ -150,6 +194,10 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
refreshCmd.Flags().StringSliceVar(&models, "model", nil, "Refresh a model")
refreshCmd.Flags().StringSliceVar(&modelPartitions, "partition", nil, "Refresh a model partition (must set --model)")
refreshCmd.Flags().BoolVar(&erroredPartitions, "errored-partitions", false, "Refresh all model partitions with errors (must set --model)")
refreshCmd.Flags().StringVar(&partitionKey, "partition-key", "", "Name of the field in the partition data to range-filter on (must set --model)")
refreshCmd.Flags().StringVar(&partitionStart, "partition-start", "", "Inclusive lower bound for --partition-key (lexicographic string compare)")
refreshCmd.Flags().StringVar(&partitionEnd, "partition-end", "", "Inclusive upper bound for --partition-key (lexicographic string compare)")
refreshCmd.Flags().BoolVar(&yes, "yes", false, "Skip the partition-range refresh confirmation prompt")
refreshCmd.Flags().StringSliceVar(&sources, "source", nil, "Refresh a source")
refreshCmd.Flags().StringSliceVar(&metricViews, "metrics-view", nil, "Refresh a metrics view")
refreshCmd.Flags().StringSliceVar(&alerts, "alert", nil, "Refresh an alert")
Expand All @@ -159,3 +207,63 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {

return refreshCmd
}

// resolvePartitionRange lists all partitions of the given model and returns those whose
// data field `key` falls within [start, end] (inclusive, lexicographic string compare).
func resolvePartitionRange(ctx context.Context, rt runtimev1.RuntimeServiceClient, instanceID, model, key, start, end string) ([]*runtimev1.ModelPartition, error) {
var matched []*runtimev1.ModelPartition
var pageToken string
var sawAnyPartition bool
for {
res, err := rt.GetModelPartitions(ctx, &runtimev1.GetModelPartitionsRequest{
InstanceId: instanceID,
Model: model,
PageSize: 100,
PageToken: pageToken,
})
if err != nil {
return nil, fmt.Errorf("failed to list partitions for model %q: %w", model, err)
}

for _, p := range res.Partitions {
sawAnyPartition = true
if p.Data == nil {
continue
}
fields := p.Data.GetFields()
v, ok := fields[key]
if !ok {
available := make([]string, 0, len(fields))
for f := range fields {
available = append(available, f)
}
return nil, fmt.Errorf("partition field %q not found on partition %q; available fields: %v", key, p.Key, available)
}

var s string
switch k := v.Kind.(type) {
case *structpb.Value_StringValue:
s = k.StringValue
case *structpb.Value_NumberValue:
s = strconv.FormatFloat(k.NumberValue, 'f', -1, 64)
case *structpb.Value_BoolValue:
s = strconv.FormatBool(k.BoolValue)
default:
return nil, fmt.Errorf("partition %q: unsupported type %T for field %q", p.Key, v.Kind, key)
}
if s >= start && s <= end {
matched = append(matched, p)
}
}

if res.NextPageToken == "" {
break
}
pageToken = res.NextPageToken
}

if !sawAnyPartition {
return nil, fmt.Errorf("model %q has no partitions to filter", model)
}
return matched, nil
}
Loading