Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions cli/cmd/project/refresh.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
var project, path, branch string
var local bool
var models, modelPartitions, sources, metricViews, alerts, reports, connectors []string
var all, full, erroredPartitions, parser bool
var all, full, erroredPartitions, skippedPartitions, parser bool

refreshCmd := &cobra.Command{
Use: "refresh [<project-name>]",
Expand Down Expand Up @@ -80,10 +80,10 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
models = append(models, sources...)

// Build model triggers
if len(modelPartitions) > 0 || erroredPartitions {
if len(modelPartitions) > 0 || erroredPartitions || skippedPartitions {
// If partitions are specified, ensure exactly one model is specified.
if len(models) != 1 {
return fmt.Errorf("must specify exactly one --model when using --partition or --errored-partitions")
return fmt.Errorf("must specify exactly one --model when using --partition, --errored-partitions, or --skipped-partitions")
}

// Since it's a common error, do an early check to ensure the model is incremental.
Expand All @@ -107,6 +107,7 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
Model: m,
Full: full,
AllErroredPartitions: erroredPartitions,
AllSkippedPartitions: skippedPartitions,
Partitions: modelPartitions,
})
}
Expand Down Expand Up @@ -150,6 +151,7 @@ func RefreshCmd(ch *cmdutil.Helper) *cobra.Command {
refreshCmd.Flags().StringSliceVar(&models, "model", nil, "Refresh a model")
refreshCmd.Flags().StringSliceVar(&modelPartitions, "partition", nil, "Refresh a model partition (must set --model)")
refreshCmd.Flags().BoolVar(&erroredPartitions, "errored-partitions", false, "Refresh all model partitions with errors (must set --model)")
refreshCmd.Flags().BoolVar(&skippedPartitions, "skipped-partitions", false, "Refresh all skipped model partitions (must set --model)")
refreshCmd.Flags().StringSliceVar(&sources, "source", nil, "Refresh a source")
refreshCmd.Flags().StringSliceVar(&metricViews, "metrics-view", nil, "Refresh a metrics view")
refreshCmd.Flags().StringSliceVar(&alerts, "alert", nil, "Refresh an alert")
Expand Down
1 change: 1 addition & 0 deletions docs/docs/reference/cli/project/refresh.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ rill project refresh [<project-name>] [flags]
--model strings Refresh a model
--partition strings Refresh a model partition (must set --model)
--errored-partitions Refresh all model partitions with errors (must set --model)
--skipped-partitions Refresh all skipped model partitions (must set --model)
--source strings Refresh a source
--metrics-view strings Refresh a metrics view
--alert strings Refresh an alert
Expand Down
778 changes: 395 additions & 383 deletions proto/gen/rill/runtime/v1/resources.pb.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions proto/gen/rill/runtime/v1/resources.pb.validate.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions proto/gen/rill/runtime/v1/runtime.swagger.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7514,6 +7514,9 @@ definitions:
allErroredPartitions:
type: boolean
description: If true, it will refresh all partitions that errored on their last execution.
allSkippedPartitions:
type: boolean
description: If true, it will refresh all partitions that are currently marked as skipped.
v1RefreshTrigger:
type: object
properties:
Expand Down
2 changes: 2 additions & 0 deletions proto/rill/runtime/v1/resources.proto
Original file line number Diff line number Diff line change
Expand Up @@ -785,6 +785,8 @@ message RefreshModelTrigger {
repeated string partitions = 3;
// If true, it will refresh all partitions that errored on their last execution.
bool all_errored_partitions = 4;
// If true, it will refresh all partitions that are currently marked as skipped.
bool all_skipped_partitions = 5;
}

message Theme {
Expand Down
2 changes: 1 addition & 1 deletion runtime/drivers/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ type CatalogStore interface {
CheckModelPartitionsHaveErrors(ctx context.Context, modelID string) (bool, error)
InsertModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
UpdateModelPartition(ctx context.Context, modelID string, partition ModelPartition) error
UpdateModelPartitionsTriggered(ctx context.Context, modelID string, wherePartitionKeyIn []string, whereErrored bool) error
UpdateModelPartitionsTriggered(ctx context.Context, modelID string, wherePartitionKeyIn []string, whereErrored, whereSkipped bool) error
UpdateModelPartitionsExecuted(ctx context.Context, modelID string, keys []string) error
UpdateModelPartitionsSkipped(ctx context.Context, modelID string, wherePartitionKeyIn []string, wherePending, whereErrored bool) error
DeleteModelPartitions(ctx context.Context, modelID string) error
Expand Down
5 changes: 4 additions & 1 deletion runtime/drivers/sqlite/catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ func (c *catalogStore) UpdateModelPartition(ctx context.Context, modelID string,
return nil
}

func (c *catalogStore) UpdateModelPartitionsTriggered(ctx context.Context, modelID string, wherePartitionKeyIn []string, whereErrored bool) error {
func (c *catalogStore) UpdateModelPartitionsTriggered(ctx context.Context, modelID string, wherePartitionKeyIn []string, whereErrored, whereSkipped bool) error {
var qry strings.Builder
var args []any

Expand All @@ -327,6 +327,9 @@ func (c *catalogStore) UpdateModelPartitionsTriggered(ctx context.Context, model
if whereErrored {
qry.WriteString(" OR error != ''")
}
if whereSkipped {
qry.WriteString(" OR skipped = true")
}
if len(wherePartitionKeyIn) > 0 {
qry.WriteString(" OR key IN (")
for i, k := range wherePartitionKeyIn {
Expand Down
56 changes: 56 additions & 0 deletions runtime/reconcilers/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,6 +336,62 @@ sql: SELECT CAST('{{.partition.v}}' AS INTEGER) AS n
require.False(t, model.State.PartitionsHaveErrors)
}

func TestModelRefreshSkippedPartitions(t *testing.T) {
rt, instanceID := testruntime.NewInstance(t)
ctx := t.Context()

testruntime.PutFiles(t, rt, instanceID, map[string]string{
"rill.yaml": ``,
"models/partitioned.yaml": `
type: model
incremental: true
partitions:
sql: SELECT v FROM (VALUES (1), (2), (3)) t(v)
sql: SELECT {{.partition.v}} AS n
`,
})
testruntime.ReconcileParserAndWait(t, rt, instanceID)
testruntime.RefreshModelAndWait(t, rt, instanceID, &runtimev1.RefreshModelTrigger{Model: "partitioned", Full: true})

modelID := testruntime.GetResource(t, rt, instanceID, runtime.ResourceKindModel, "partitioned").GetModel().State.PartitionsModelId

findPartitions := func(opts *drivers.FindModelPartitionsOptions) []drivers.ModelPartition {
catalog, release, err := rt.Catalog(ctx, instanceID)
require.NoError(t, err)
defer release()
opts.ModelID = modelID
ps, err := catalog.FindModelPartitions(ctx, opts)
require.NoError(t, err)
return ps
}

// Skip all partitions by key (they've already executed, so they aren't pending).
var keys []string
for _, p := range findPartitions(&drivers.FindModelPartitionsOptions{}) {
keys = append(keys, p.Key)
}
require.Len(t, keys, 3)

catalog, release, err := rt.Catalog(ctx, instanceID)
require.NoError(t, err)
err = catalog.UpdateModelPartitionsSkipped(ctx, modelID, keys, false, false)
release()
require.NoError(t, err)
require.Len(t, findPartitions(&drivers.FindModelPartitionsOptions{WhereSkipped: true}), 3)

// Refresh all skipped partitions: they should be un-skipped and re-executed.
testruntime.RefreshModelAndWait(t, rt, instanceID, &runtimev1.RefreshModelTrigger{Model: "partitioned", AllSkippedPartitions: true})

require.Empty(t, findPartitions(&drivers.FindModelPartitionsOptions{WhereSkipped: true}))

all := findPartitions(&drivers.FindModelPartitionsOptions{})
require.Len(t, all, 3)
for _, p := range all {
require.False(t, p.Skipped)
require.NotNil(t, p.ExecutedOn, "partition %s should have been re-executed", p.Key)
}
}

func TestExplicitPartitionRefreshDoesNotProcessNewPartitions(t *testing.T) {
rt, instanceID := testruntime.NewInstance(t)
ctx := t.Context()
Expand Down
4 changes: 2 additions & 2 deletions runtime/reconcilers/refresh_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ func (r *RefreshTriggerReconciler) Reconcile(ctx context.Context, n *runtimev1.R
continue
}

triggerPartitions := len(mt.Partitions) > 0 || mt.AllErroredPartitions
triggerPartitions := len(mt.Partitions) > 0 || mt.AllErroredPartitions || mt.AllSkippedPartitions
if triggerPartitions {
mdl := mr.GetModel()
modelID := mdl.State.PartitionsModelId
Expand All @@ -130,7 +130,7 @@ func (r *RefreshTriggerReconciler) Reconcile(ctx context.Context, n *runtimev1.R
continue
}

err = catalog.UpdateModelPartitionsTriggered(ctx, modelID, mt.Partitions, mt.AllErroredPartitions)
err = catalog.UpdateModelPartitionsTriggered(ctx, modelID, mt.Partitions, mt.AllErroredPartitions, mt.AllSkippedPartitions)
if err != nil {
return runtime.ReconcileResult{Err: fmt.Errorf("failed to update partitions as triggered for model %s: %w", mt.Model, err)}
}
Expand Down
8 changes: 8 additions & 0 deletions web-common/src/proto/gen/rill/runtime/v1/resources_pb.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4473,6 +4473,13 @@ export class RefreshModelTrigger extends Message<RefreshModelTrigger> {
*/
allErroredPartitions = false;

/**
* If true, it will refresh all partitions that are currently marked as skipped.
*
* @generated from field: bool all_skipped_partitions = 5;
*/
allSkippedPartitions = false;

constructor(data?: PartialMessage<RefreshModelTrigger>) {
super();
proto3.util.initPartial(data, this);
Expand All @@ -4485,6 +4492,7 @@ export class RefreshModelTrigger extends Message<RefreshModelTrigger> {
{ no: 2, name: "full", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 3, name: "partitions", kind: "scalar", T: 9 /* ScalarType.STRING */, repeated: true },
{ no: 4, name: "all_errored_partitions", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
{ no: 5, name: "all_skipped_partitions", kind: "scalar", T: 8 /* ScalarType.BOOL */ },
]);

static fromBinary(bytes: Uint8Array, options?: Partial<BinaryReadOptions>): RefreshModelTrigger {
Expand Down
2 changes: 2 additions & 0 deletions web-common/src/runtime-client/gen/index.schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2208,6 +2208,8 @@ For non-incremental models, this is equivalent to a normal refresh. */
partitions?: string[];
/** If true, it will refresh all partitions that errored on their last execution. */
allErroredPartitions?: boolean;
/** If true, it will refresh all partitions that are currently marked as skipped. */
allSkippedPartitions?: boolean;
}

export interface V1RefreshTrigger {
Expand Down
Loading