Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,11 @@ packages:
all: true
dir: gen/go/flyteidl2/service/mocks
include-auto-generated: true
github.com/flyteorg/flyte/v2/gen/go/flyteidl2/actions/actionsconnect:
config:
all: true
dir: gen/go/flyteidl2/actions/actionsconnect/mocks
include-auto-generated: true
github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow/workflowconnect:
config:
all: true
Expand Down
5 changes: 5 additions & 0 deletions actions/service/actions_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package service

import (
"context"
"errors"
"fmt"
"strings"

Expand All @@ -20,6 +21,10 @@ type ActionsService struct {
client ActionsClientInterface
}

func (s *ActionsService) Signal(ctx context.Context, c *connect.Request[actions.SignalRequest]) (*connect.Response[actions.SignalResponse], error) {
return nil, connect.NewError(connect.CodeUnimplemented, errors.New("endpoint Signal not implemented"))
}

// NewActionsService creates a new ActionsService.
func NewActionsService(client ActionsClientInterface) *ActionsService {
return &ActionsService{client: client}
Expand Down
30 changes: 30 additions & 0 deletions flyteidl2/actions/actions_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ package flyteidl2.actions;

import "buf/validate/validate.proto";
import "flyteidl2/common/identifier.proto";
import "flyteidl2/core/literals.proto";
import "flyteidl2/task/run.proto";
import "flyteidl2/workflow/run_definition.proto";
import "flyteidl2/workflow/state_service.proto";
Expand Down Expand Up @@ -32,6 +33,14 @@ service ActionsService {
// Abort aborts a single action that was previously queued or is currently being processed by a worker.
// Note that this will cascade aborts to all descendant actions of the specified action.
rpc Abort(AbortRequest) returns (AbortResponse) {}

// Signal resolves a ConditionAction by providing its signal value.
// On success, transitions the condition to SUCCEEDED with the provided
// value as its output.
// Returns FAILED_PRECONDITION if the action is not a condition or is
// already terminal. Returns NOT_FOUND if the action does not exist.
// Returns ABORTED if the action has a write in-flight (retry).
rpc Signal(SignalRequest) returns (SignalResponse) {}
}

// Action represents a unit of work to be executed. Theses can be task executions, traces, or conditions.
Expand Down Expand Up @@ -168,3 +177,24 @@ message AbortRequest {

// AbortResponse is the response message for aborting a queued or running action.
message AbortResponse {}

// ============================================================================
// Signal
// ============================================================================

// SignalRequest is the request message for resolving a condition action.
message SignalRequest {
// The unique identifier for the condition action to signal.
flyteidl2.common.ActionIdentifier action_id = 1 [(buf.validate.field).required = true];

// The name of the parent action that owns this condition. Required for
// state store lookup and partition routing in the middleware.
string parent_action_name = 2 [(buf.validate.field).string.min_len = 1];

// The value literal to signal the condition with. Must match the
// ConditionAction.type declared at enqueue time.
flyteidl2.core.Literal value = 3 [(buf.validate.field).required = true];
}

// SignalResponse is the response message for resolving a condition action.
message SignalResponse {}
4 changes: 4 additions & 0 deletions flyteidl2/common/phase.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ option go_package = "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/common";
//
// Phase transitions follow this typical flow:
// QUEUED -> WAITING_FOR_RESOURCES -> INITIALIZING -> RUNNING -> {SUCCEEDED|FAILED|ABORTED|TIMED_OUT}
// Condition actions: PAUSED -> {SUCCEEDED(signal)|TIMED_OUT(timeout)|ABORTED(abort)}
enum ActionPhase {
// Default/unknown phase
ACTION_PHASE_UNSPECIFIED = 0;
Expand Down Expand Up @@ -35,4 +36,7 @@ enum ActionPhase {

// Action exceeded its execution time limit
ACTION_PHASE_TIMED_OUT = 8;

// Action is paused and waiting for an external signal (condition actions)
ACTION_PHASE_PAUSED = 9;
}
65 changes: 36 additions & 29 deletions flyteidl2/workflow/run_definition.proto
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import "flyteidl2/task/common.proto";
import "flyteidl2/task/run.proto";
import "flyteidl2/task/task_definition.proto";
import "google/protobuf/duration.proto";
import "google/protobuf/struct.proto";
import "google/protobuf/timestamp.proto";
import "google/protobuf/wrappers.proto";

Expand Down Expand Up @@ -71,32 +72,47 @@ message TraceAction {
// ConditionAction is used to define a condition that can be evaluated at runtime. It can be used to
// await a signal from an external system and can carry a value.
message ConditionAction {
// Name is the unique identifier for the action. It must be unique within the defined scope below.
// Name is the unique identifier for the action.
string name = 1 [(buf.validate.field).string.min_len = 1];

oneof scope {
option (buf.validate.oneof).required = true;
// RunId is the unique identifier for the run this action is associated with.
string run_id = 2 [(buf.validate.field).string.min_len = 1];

// ActionId is the unique identifier for the action this action is associated with.
string action_id = 3 [(buf.validate.field).string.min_len = 1];

// Global indicates the condition is global and can be used across all runs and actions.
bool global = 4;
}
// Fields 2-4 reserved (formerly scope oneof: run_id, action_id, global).
reserved 2, 3, 4;
reserved "run_id", "action_id", "global";

// Type is the type of the value the condition is expected. This can be used to properly render
// a UI element for the condition or validate when a value is received that it is of the expected
// type.
// Type is the expected value type for the condition.
flyteidl2.core.LiteralType type = 6;

// Prompt is the prompt that will be shown to the user when the condition is awaited.
// Prompt shown to the user when the condition is awaited.
string prompt = 7;

// Description is a description of the condition. This can be used to provide additional
// information to the user about the condition.
// Description of the condition.
string description = 8;

// How to render the prompt.
ConditionPromptType prompt_type = 9;

// Optional timeout duration. If the condition is not signaled within this
// duration after creation, it transitions to TIMED_OUT with a timeout error.
// Must be strictly positive when set; zero or negative values are ignored.
google.protobuf.Duration timeout = 10;

// Optional webhook to fire when the condition action is created.
ConditionWebhook webhook = 11;
}

enum ConditionPromptType {
CONDITION_PROMPT_TYPE_UNSPECIFIED = 0;
CONDITION_PROMPT_TYPE_TEXT = 1;
CONDITION_PROMPT_TYPE_MARKDOWN = 2;
}

message ConditionWebhook {
// The HTTP endpoint to POST to when the condition action is created.
string url = 1;

// Optional JSON payload. String values may contain "{callback_uri}"
// which the backend replaces with the signal URI for this condition.
google.protobuf.Struct payload = 2;
}

message TaskActionMetadata {
Expand All @@ -116,17 +132,8 @@ message TraceActionMetadata {

message ConditionActionMetadata {
string name = 1;
oneof scope {
option (buf.validate.oneof).required = true;
// RunId is the unique identifier for the run this action is associated with.
string run_id = 2 [(buf.validate.field).string.min_len = 1];

// ActionId is the unique identifier for the action this action is associated with.
string action_id = 3 [(buf.validate.field).string.min_len = 1];

// Global indicates the condition is global and can be used across all runs and actions.
bool global = 4;
}
reserved 2, 3, 4;
reserved "run_id", "action_id", "global";
}

// Static, lightweight metadata about an action.
Expand Down
4 changes: 4 additions & 0 deletions flyteidl2/workflow/state_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import "buf/validate/validate.proto";
import "flyteidl2/common/identifier.proto";
import "flyteidl2/common/phase.proto";
import "flyteidl2/core/execution.proto";
import "flyteidl2/core/literals.proto";
import "google/rpc/status.proto";

option go_package = "github.com/flyteorg/flyte/v2/gen/go/flyteidl2/workflow";
Expand Down Expand Up @@ -104,4 +105,7 @@ message ActionUpdate {

// the output uri for the action
string output_uri = 4;

// The value literal for the action (used by condition actions).
optional flyteidl2.core.Literal value = 5;
}
Loading
Loading