Skip to content

[Flyte-7005] Supporting Ray autoscalerOptions#7111

Open
0yukali0 wants to merge 15 commits intoflyteorg:masterfrom
0yukali0:flyte-7005
Open

[Flyte-7005] Supporting Ray autoscalerOptions#7111
0yukali0 wants to merge 15 commits intoflyteorg:masterfrom
0yukali0:flyte-7005

Conversation

@0yukali0
Copy link
Copy Markdown
Contributor

@0yukali0 0yukali0 commented Mar 30, 2026

Tracking issue

Related #7005

Why are the changes needed?

Ray autoScalerOptions is not supportted in flyte v1

What changes were proposed in this pull request?

  1. Add autoscalerOptions to ray.proto
  2. Generate go files of grpc proto
  3. Parsing the options from grpc and translate it to crd of ray operator.

How was this patch tested?

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.
  • changed: For changes in existing functionality.
  • deprecated: For soon-to-be-removed features.
  • removed: For features being removed.
  • fixed: For any bug fixed.
  • security: In case of vulnerabilities

This is important to improve the readability of release notes.

Setup process

  1. clone flyte and flytekit (branch flyte-7005 and flytekit-7005)
  2. run the flyte backend in sandbox
    2.1 flytectl demo start --dev
    2.2 make compile && POD_NAMESPACE=flyte ./flyte start --config flyte-single-binary-local.yaml
  3. Update ray example like Ray example
    3.1 uv venv -p 3.11 && source .venv/bin/activate && uv pip install -e ~/flytekit ~/flytekit/plugins/flytekit-ray && uv pip install -e ~/flyte/flyteidl
    3.2 "pyflyte run --remote ray_example.py ray_workflow --n 10"

Screenshots

image image
  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link

@codecov
Copy link
Copy Markdown

codecov bot commented Mar 30, 2026

Codecov Report

❌ Patch coverage is 89.18919% with 4 lines in your changes missing coverage. Please review.
✅ Project coverage is 56.97%. Comparing base (19daf37) to head (f4429c9).
⚠️ Report is 11 commits behind head on master.

Files with missing lines Patch % Lines
flyteplugins/go/tasks/plugins/k8s/ray/ray.go 89.18% 4 Missing ⚠️
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #7111      +/-   ##
==========================================
+ Coverage   56.95%   56.97%   +0.01%     
==========================================
  Files         931      931              
  Lines       58188    58283      +95     
==========================================
+ Hits        33143    33208      +65     
- Misses      22004    22021      +17     
- Partials     3041     3054      +13     
Flag Coverage Δ
unittests-datacatalog 53.51% <ø> (ø)
unittests-flyteadmin 53.14% <ø> (ø)
unittests-flytecopilot 43.06% <ø> (ø)
unittests-flytectl 64.09% <ø> (+0.07%) ⬆️
unittests-flyteidl 75.71% <ø> (ø)
unittests-flyteplugins 60.27% <89.18%> (+0.10%) ⬆️
unittests-flytepropeller 53.71% <ø> (+0.06%) ⬆️
unittests-flytestdlib 62.61% <ø> (-0.42%) ⬇️

Flags with carried forward coverage won't be shown. Click here to find out more.

☔ View full report in Codecov by Sentry.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@0yukali0 0yukali0 force-pushed the flyte-7005 branch 2 times, most recently from 951fbeb to 8a99445 Compare March 31, 2026 14:07
@0yukali0 0yukali0 marked this pull request as ready for review March 31, 2026 14:45
string runtime_env_yaml = 5;
}

message Resources {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use core.Resources?

message EnvVar {
string name = 1;
string value = 2;
EnvValueFrom valueFrom = 3;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need EnvValueFrom here? Can we use core.KeyValuePair as the type for env?

message KeyValuePair {
//required.
string key = 1;
//+optional.
string value = 2;
}


message AutoscalerOptions {
// "Default", "Aggressive", "Conservative"
string upscaling_mode = 1;
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this as enum?

enum UpscalingMode {
    UPSCALING_MODE_UNSPECIFIED = 0;
    UPSCALING_MODE_DEFAULT = 1;
    UPSCALING_MODE_AGGRESSIVE = 2;
    UPSCALING_MODE_CONSERVATIVE = 3;
  }

return rayjob, err
}

func NewAutoscalerOptions(options *plugins.AutoscalerOptions) *rayv1.AutoscalerOptions {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's name it as buildAutoscalerOptions to follow the naming convention (e.g. like buildHeadPodTemplate), and make it private

},
WorkerGroupSpecs: []rayv1.WorkerGroupSpec{},
EnableInTreeAutoscaling: &rayJob.RayCluster.EnableAutoscaling,
AutoscalerOptions: autoScalarOptions,
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While protobuf generated GetXXX() has nil check, we can directly pass:

AutoscalerOptions: NewAutoscalerOptions(rayJob.GetRayCluster().GetAutoscalerOptions())

@machichima
Copy link
Copy Markdown
Member

Hi @0yukali0,
Could you briefly describe how you test and provide the test script?
Thank you!

@0yukali0
Copy link
Copy Markdown
Contributor Author

0yukali0 commented Apr 2, 2026

Hi @machichima , i updated the setup steps and screenshot in the PR descriptions.
i am going to add the resources field in headspec and workgroupspec to replace requests and limits in next commit.

0yukali0 added 13 commits April 6, 2026 14:19
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
@0yukali0
Copy link
Copy Markdown
Contributor Author

with kuberay 1.1.0 and 1.5.1,run ray task with 2.54.1 is worked.
image
image

Comment on lines +155 to +161
resourceRequirements := v1.ResourceRequirements{}
if requests := res.GetRequests(); len(requests) > 0 {
resourceRequirements.Requests = convertResourceEntriesToResourceList(requests)
}
if limits := res.GetLimits(); len(limits) > 0 {
resourceRequirements.Limits = convertResourceEntriesToResourceList(limits)
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use flytek8s.ToK8sResourceRequirements here instead?

func ToK8sResourceRequirements(resources *core.Resources) (*v1.ResourceRequirements, error) {
res := &v1.ResourceRequirements{}
if resources == nil {
return res, nil
}
req, err := ToK8sResourceList(resources.GetRequests())
if err != nil {
return res, err
}
lim, err := ToK8sResourceList(resources.GetLimits())
if err != nil {
return res, err
}
res.Limits = lim
res.Requests = req
return res, nil
}

Comment on lines +165 to +174
autoScalarOptions.Env = []v1.EnvVar{}
for _, env := range envs {
name := env.GetKey()
if val := env.GetValue(); val != "" {
autoScalarOptions.Env = append(autoScalarOptions.Env, v1.EnvVar{
Name: name,
Value: val,
})
}
}
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we use flytek8s.ToK8sEnvVar instead?

func ToK8sEnvVar(env []*core.KeyValuePair) []v1.EnvVar {

Comment on lines +137 to +138
idleTimeoutTime := options.GetIdleTimeoutSeconds()
autoScalarOptions.IdleTimeoutSeconds = &idleTimeoutTime
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the idleTimeoutTime is not set (= 0), this part will overwrite the default value (60 sec) set in kuberay. we should only set autoScalarOptions.IdleTimeoutSeconds if it's not 0 (not empty)

https://github.com/ray-project/kuberay/blob/0706043ae00a0dedfadead423926136e01daaef9/proto/cluster.proto#L145-L147

https://github.com/ray-project/ray/blob/db711158e25231b77ef2fb968320b2319270732d/python/ray/autoscaler/_private/kuberay/autoscaling_config.py#L108-L111

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @machichima
I will update it and the following test case to check the setting.

}

func buildAutoscalerOptions(options *plugins.AutoscalerOptions) *rayv1.AutoscalerOptions {
var autoScalarOptions *rayv1.AutoscalerOptions
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
var autoScalarOptions *rayv1.AutoscalerOptions
var autoScalerOptions *rayv1.AutoscalerOptions

nit

}
}

func TestNewAutoscalerOptions(t *testing.T) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
func TestNewAutoscalerOptions(t *testing.T) {
func TestBuildAutoscalerOptions(t *testing.T) {

nit

assert.Equal(t, resource.MustParse("1Gi"), result.Resources.Limits[corev1.ResourceMemory])
})

t.Run("env literal value", func(t *testing.T) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we add a test for env with empty value (should be skipped)?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

flytek8s.ToK8sEnvVar allows empty value, so i will skip the test with empty value.

assert.Nil(t, buildAutoscalerOptions(nil))
})

t.Run("idle timeout propagated", func(t *testing.T) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we also add a test like this:

t.Run("idle timeout zero should not be set", func(t *testing.T) {
      result := buildAutoscalerOptions(&plugins.AutoscalerOptions{})
      require.NotNil(t, result)
      assert.Nil(t, result.IdleTimeoutSeconds)
  })

Which is related to my comment above: https://github.com/flyteorg/flyte/pull/7111/changes#r3114611720
If the IdleTimeoutSeconds is not set, it should remain empty so that Ray can apply its default value

Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Signed-off-by: Yuteng Chen <a08h0283@gmail.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants