Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/codespell.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Harden Runner
uses: step-security/harden-runner@5ef0c079ce82195b2a36a210272d6b661572d83e # v2.14.2
uses: step-security/harden-runner@a90bcbc6539c36a85cdfeb73f7e2f433735f215b # v2.15.0
with:
egress-policy: audit

Expand Down
40 changes: 33 additions & 7 deletions cmd/hubagent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"k8s.io/apimachinery/pkg/runtime"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
clusterinventory "sigs.k8s.io/cluster-inventory-api/apis/v1alpha1"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -102,21 +103,46 @@ func main() {
// Set up controller-runtime logger
ctrl.SetLogger(zap.New(zap.UseDevMode(true)))

config := ctrl.GetConfigOrDie()
config.QPS, config.Burst = float32(opts.CtrlMgrOpts.HubQPS), opts.CtrlMgrOpts.HubBurst
// Create separate configs for the general access purpose and the leader election purpose.
//
// This aims to improve the availability of the hub agent; originally all access to the API
// server would share the same rate limiting configuration, and under adverse conditions (e.g.,
// large volume of concurrent placements) controllers would exhaust all tokens in the rate limiter,
// and effectively starve the leader election process (the runtime can no longer renew leases),
// which would trigger the hub agent to restart even though the system remains functional.
defaultCfg := ctrl.GetConfigOrDie()
leaderElectionCfg := rest.CopyConfig(defaultCfg)

defaultCfg.QPS, defaultCfg.Burst = float32(opts.CtrlMgrOpts.HubQPS), opts.CtrlMgrOpts.HubBurst
leaderElectionCfg.QPS, leaderElectionCfg.Burst = float32(opts.LeaderElectionOpts.LeaderElectionQPS), opts.LeaderElectionOpts.LeaderElectionBurst

mgrOpts := ctrl.Options{
Scheme: scheme,
Cache: cache.Options{
SyncPeriod: &opts.CtrlMgrOpts.ResyncPeriod.Duration,
DefaultTransform: cache.TransformStripManagedFields(),
},
LeaderElection: opts.LeaderElectionOpts.LeaderElect,
LeaderElectionID: "136224848560.hub.fleet.azure.com",
LeaderElectionNamespace: opts.LeaderElectionOpts.ResourceNamespace,
LeaderElection: opts.LeaderElectionOpts.LeaderElect,
LeaderElectionConfig: leaderElectionCfg,
// If leader election is enabled, the hub agent by default uses a setup
// with a lease duration of 60 secs, a renew deadline of 45 secs, and a retry period of 5 secs.
// This setup gives the hub agent up to 9 attempts/45 seconds to renew its leadership lease
// before it loses the leadership and restarts.
//
// These values are set significantly higher than the controller-runtime defaults
// (15 seconds, 10 seconds, and 2 seconds respectively), as under heavy loads the hub agent
// might have difficulty renewing its lease in time due to API server side latencies, which
// might further lead to unexpected leadership losses (even when it is the only candidate
// running) and restarts.
//
// Note (chenyu1): a minor side effect with the higher values is that when the agent does restart,
// (or in the future when we do run multiple hub agent replicas), the new leader might have to wait a bit
// longer (up to 60 seconds) to acquire the leadership, which should still be acceptable in most scenarios.
LeaseDuration: &opts.LeaderElectionOpts.LeaseDuration.Duration,
RenewDeadline: &opts.LeaderElectionOpts.RenewDeadline.Duration,
RetryPeriod: &opts.LeaderElectionOpts.RetryPeriod.Duration,
LeaderElectionID: "136224848560.hub.fleet.azure.com",
LeaderElectionNamespace: opts.LeaderElectionOpts.ResourceNamespace,
HealthProbeBindAddress: opts.CtrlMgrOpts.HealthProbeBindAddress,
Metrics: metricsserver.Options{
BindAddress: opts.CtrlMgrOpts.MetricsBindAddress,
Expand All @@ -129,7 +155,7 @@ func main() {
if opts.CtrlMgrOpts.EnablePprof {
mgrOpts.PprofBindAddress = fmt.Sprintf(":%d", opts.CtrlMgrOpts.PprofPort)
}
mgr, err := ctrl.NewManager(config, mgrOpts)
mgr, err := ctrl.NewManager(defaultCfg, mgrOpts)
if err != nil {
klog.ErrorS(err, "unable to start controller manager.")
exitWithErrorFunc()
Expand Down Expand Up @@ -187,7 +213,7 @@ func main() {
}

ctx := ctrl.SetupSignalHandler()
if err := workload.SetupControllers(ctx, &wg, mgr, config, opts); err != nil {
if err := workload.SetupControllers(ctx, &wg, mgr, defaultCfg, opts); err != nil {
klog.ErrorS(err, "unable to set up controllers")
exitWithErrorFunc()
}
Expand Down
90 changes: 87 additions & 3 deletions cmd/hubagent/options/leaderelection.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package options

import (
"flag"
"fmt"
"strconv"
"time"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand Down Expand Up @@ -53,6 +55,18 @@ type LeaderElectionOptions struct {
// The namespace of the resource object that will be used to lock during leader election cycles.
// This option only applies if leader election is enabled.
ResourceNamespace string

// The QPS limit set to the rate limiter of the Kubernetes client in use by the controller manager
// for leader election purposes. This sets up a separate client-side throttling mechanism specifically
// for lease related operations, mostly to avoid an adverse situation where normal operations
// in the controller manager starve the lease related operations, and thus cause unexpected leadership losses.
LeaderElectionQPS float64

// The burst limit set to the rate limiter of the Kubernetes client in use by the controller manager
// for leader election purposes. This sets up a separate client-side throttling mechanism specifically
// for lease related operations, mostly to avoid an adverse situation where normal operations
// in the controller manager starve the lease related operations, and thus cause unexpected leadership losses.
LeaderElectionBurst int
}

// AddFlags adds flags for LeaderElectionOptions to the specified FlagSet.
Expand All @@ -69,23 +83,23 @@ func (o *LeaderElectionOptions) AddFlags(flags *flag.FlagSet) {
flags.DurationVar(
&o.LeaseDuration.Duration,
"leader-lease-duration",
15*time.Second,
60*time.Second,
"The duration of a leader election lease. This is the period where a non-leader candidate will wait after observing a leadership renewal before attempting to acquire leadership of the current leader. And it is also effectively the maximum duration that a leader can be stopped before it is replaced by another candidate. The option only applies if leader election is enabled.",
)

// This input is sent to the controller manager for validation; no further check here.
flags.DurationVar(
&o.RenewDeadline.Duration,
"leader-renew-deadline",
10*time.Second,
45*time.Second,
"The interval between attempts by the acting master to renew a leadership slot before it stops leading. This must be less than or equal to the lease duration. The option only applies if leader election is enabled",
)

// This input is sent to the controller manager for validation; no further check here.
flags.DurationVar(
&o.RetryPeriod.Duration,
"leader-retry-period",
2*time.Second,
5*time.Second,
"The duration the clients should wait between attempting acquisition and renewal of a leadership. The option only applies if leader election is enabled",
)

Expand All @@ -96,4 +110,74 @@ func (o *LeaderElectionOptions) AddFlags(flags *flag.FlagSet) {
utils.FleetSystemNamespace,
"The namespace of the resource object that will be used to lock during leader election cycles. The option only applies if leader election is enabled.",
)

flags.Var(
newLeaderElectionQPSValueWithValidation(250, &o.LeaderElectionQPS),
"leader-election-qps",
"The QPS limit set to the rate limiter of the Kubernetes client in use by the controller manager for leader election purposes. This sets up a separate client-side throttling mechanism specifically for lease related operations, mostly to avoid an adverse situation where normal operations in the controller manager starve the lease related operations, and thus cause unexpected leadership losses. Defaults to 250. Use a positive float64 value in the range [10.0, 1000.0], or set a less or equal to zero value to disable client-side throttling.")

flags.Var(
newLeaderElectionBurstValueWithValidation(1000, &o.LeaderElectionBurst),
"leader-election-burst",
"The burst limit set to the rate limiter of the Kubernetes client in use by the controller manager for leader election purposes. This sets up a separate client-side throttling mechanism specifically for lease related operations, mostly to avoid an adverse situation where normal operations in the controller manager starve the lease related operations, and thus cause unexpected leadership losses. Defaults to 1000. Use a positive int value in the range [10, 2000].")
}

// A list of flag variables that allow pluggable validation logic when parsing the input args.

// LeaderElectionQPSValueWithValidation is a flag.Value implementation for the
// --leader-election-qps flag; it validates the input at flag parsing time.
type LeaderElectionQPSValueWithValidation float64

// String returns the string form of the current QPS value.
func (v *LeaderElectionQPSValueWithValidation) String() string {
	return fmt.Sprintf("%f", *v)
}

// Set parses and validates the QPS limit from its command-line string form.
//
// A zero or negative input disables client-side throttling (stored as -1);
// any other input must fall within the range [10.0, 1000.0].
func (v *LeaderElectionQPSValueWithValidation) Set(s string) error {
	// Some validation is also performed on the controller manager side and the client-go side. Just
	// to be on the safer side we also impose some limits here.
	qps, err := strconv.ParseFloat(s, 64)
	if err != nil {
		return fmt.Errorf("failed to parse float64 value: %w", err)
	}

	if qps <= 0.0 {
		// Disable client-side throttling.
		*v = -1.0
		return nil
	}

	// Express the range check in the accepting direction: NaN (which ParseFloat
	// accepts, and which fails every ordered comparison) would slip through the
	// rejecting form `qps < 10.0 || qps > 1000.0`, but is correctly rejected here.
	if !(qps >= 10.0 && qps <= 1000.0) {
		return fmt.Errorf("QPS limit is set to an invalid value (%f), must be a value in the range [10.0, 1000.0]", qps)
	}
	*v = LeaderElectionQPSValueWithValidation(qps)
	return nil
}

// newLeaderElectionQPSValueWithValidation binds p to a validating flag.Value
// and initializes it with defaultVal (used when the flag is not set).
func newLeaderElectionQPSValueWithValidation(defaultVal float64, p *float64) *LeaderElectionQPSValueWithValidation {
	*p = defaultVal
	return (*LeaderElectionQPSValueWithValidation)(p)
}

// LeaderElectionBurstValueWithValidation is a flag.Value implementation for the
// --leader-election-burst flag; it validates the input at flag parsing time.
type LeaderElectionBurstValueWithValidation int

// String returns the string form of the current burst value.
func (b *LeaderElectionBurstValueWithValidation) String() string {
	return fmt.Sprintf("%d", *b)
}

// Set parses and validates the burst limit from its command-line string form;
// the value must fall within the range [10, 2000].
func (b *LeaderElectionBurstValueWithValidation) Set(s string) error {
	// Some validation is also performed on the controller manager side and the client-go side. Just
	// to be on the safer side we also impose some limits here.
	parsed, err := strconv.Atoi(s)
	if err != nil {
		return fmt.Errorf("failed to parse int value: %w", err)
	}
	if parsed < 10 || parsed > 2000 {
		return fmt.Errorf("burst limit is set to an invalid value (%d), must be a value in the range [10, 2000]", parsed)
	}

	*b = LeaderElectionBurstValueWithValidation(parsed)
	return nil
}

// newLeaderElectionBurstValueWithValidation binds p to a validating flag.Value
// and initializes it with defaultVal (used when the flag is not set).
func newLeaderElectionBurstValueWithValidation(defaultVal int, p *int) *LeaderElectionBurstValueWithValidation {
	*p = defaultVal
	return (*LeaderElectionBurstValueWithValidation)(p)
}
82 changes: 72 additions & 10 deletions cmd/hubagent/options/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,13 @@ func TestLeaderElectionOpts(t *testing.T) {
flagSetName: "allDefault",
args: []string{},
wantLeaderElectionOpts: LeaderElectionOptions{
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: 15 * time.Second},
RenewDeadline: metav1.Duration{Duration: 10 * time.Second},
RetryPeriod: metav1.Duration{Duration: 2 * time.Second},
ResourceNamespace: utils.FleetSystemNamespace,
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: 60 * time.Second},
RenewDeadline: metav1.Duration{Duration: 45 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
ResourceNamespace: utils.FleetSystemNamespace,
LeaderElectionQPS: 250.0,
LeaderElectionBurst: 1000,
},
},
{
Expand All @@ -59,15 +61,75 @@ func TestLeaderElectionOpts(t *testing.T) {
"--leader-renew-deadline=20s",
"--leader-retry-period=5s",
"--leader-election-namespace=test-namespace",
"--leader-election-qps=500",
"--leader-election-burst=1500",
},
wantLeaderElectionOpts: LeaderElectionOptions{
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 30 * time.Second},
RenewDeadline: metav1.Duration{Duration: 20 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
ResourceNamespace: "test-namespace",
LeaderElect: true,
LeaseDuration: metav1.Duration{Duration: 30 * time.Second},
RenewDeadline: metav1.Duration{Duration: 20 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
ResourceNamespace: "test-namespace",
LeaderElectionQPS: 500.0,
LeaderElectionBurst: 1500,
},
},
{
name: "negative leader election QPS value",
flagSetName: "qpsNegative",
args: []string{"--leader-election-qps=-5"},
wantLeaderElectionOpts: LeaderElectionOptions{
LeaderElect: false,
LeaseDuration: metav1.Duration{Duration: 60 * time.Second},
RenewDeadline: metav1.Duration{Duration: 45 * time.Second},
RetryPeriod: metav1.Duration{Duration: 5 * time.Second},
ResourceNamespace: utils.FleetSystemNamespace,
LeaderElectionQPS: -1,
LeaderElectionBurst: 1000,
},
},
{
name: "leader election QPS parse error",
flagSetName: "qpsParseError",
args: []string{"--leader-election-qps=abc"},
wantErred: true,
wantErrMsgSubStr: "failed to parse float64 value",
},
{
name: "leader election QPS out of range (too small)",
flagSetName: "qpsOutOfRangeTooSmall",
args: []string{"--leader-election-qps=9.9"},
wantErred: true,
wantErrMsgSubStr: "QPS limit is set to an invalid value",
},
{
name: "leader election QPS out of range (too large)",
flagSetName: "qpsOutOfRangeTooLarge",
args: []string{"--leader-election-qps=1000.1"},
wantErred: true,
wantErrMsgSubStr: "QPS limit is set to an invalid value",
},
{
name: "leader election burst parse error",
flagSetName: "burstParseError",
args: []string{"--leader-election-burst=abc"},
wantErred: true,
wantErrMsgSubStr: "failed to parse int value",
},
{
name: "leader election burst out of range (too small)",
flagSetName: "burstOutOfRangeTooSmall",
args: []string{"--leader-election-burst=9"},
wantErred: true,
wantErrMsgSubStr: "burst limit is set to an invalid value",
},
{
name: "leader election burst out of range (too large)",
flagSetName: "burstOutOfRangeTooLarge",
args: []string{"--leader-election-burst=2001"},
wantErred: true,
wantErrMsgSubStr: "burst limit is set to an invalid value",
},
}

for _, tc := range testCases {
Expand Down
5 changes: 5 additions & 0 deletions cmd/hubagent/options/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,11 @@ func (o *Options) Validate() field.ErrorList {
errs = append(errs, field.Invalid(newPath.Child("HubBurst"), o.CtrlMgrOpts.HubBurst, "The burst limit for client-side throttling must be greater than or equal to its QPS limit"))
}

// Cross-field validation for leader election options.
if float64(o.LeaderElectionOpts.LeaderElectionBurst) < float64(o.LeaderElectionOpts.LeaderElectionQPS) {
errs = append(errs, field.Invalid(newPath.Child("LeaderElectionBurst"), o.LeaderElectionOpts.LeaderElectionBurst, "The burst limit for client-side throttling of leader election related operations must be greater than or equal to its QPS limit"))
}

// Cross-field validation for webhook options.

// Note: this validation logic is a bit weird in the sense that the system accepts
Expand Down
11 changes: 11 additions & 0 deletions cmd/hubagent/options/validation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ func newTestOptions(modifyOptions ModifyOptions) Options {
HubQPS: 250,
HubBurst: 1000,
},
LeaderElectionOpts: LeaderElectionOptions{
LeaderElectionQPS: 250.0,
LeaderElectionBurst: 1000,
},
WebhookOpts: WebhookOptions{
ClientConnectionType: "url",
ServiceName: testWebhookServiceName,
Expand Down Expand Up @@ -78,6 +82,13 @@ func TestValidateControllerManagerConfiguration(t *testing.T) {
}),
want: field.ErrorList{field.Invalid(newPath.Child("HubBurst"), 50, "The burst limit for client-side throttling must be greater than or equal to its QPS limit")},
},
"invalid: leader election burst value is less than its QPS value": {
opt: newTestOptions(func(option *Options) {
option.LeaderElectionOpts.LeaderElectionQPS = 100
option.LeaderElectionOpts.LeaderElectionBurst = 50
}),
want: field.ErrorList{field.Invalid(newPath.Child("LeaderElectionBurst"), 50, "The burst limit for client-side throttling of leader election related operations must be greater than or equal to its QPS limit")},
},
"WebhookServiceName is empty": {
opt: newTestOptions(func(option *Options) {
option.WebhookOpts.EnableWebhooks = true
Expand Down
5 changes: 2 additions & 3 deletions pkg/controllers/updaterun/initialization.go
Original file line number Diff line number Diff line change
Expand Up @@ -623,9 +623,8 @@ func (r *Reconciler) getResourceSnapshotObjs(ctx context.Context, placement plac
}

if latestResourceSnapshot == nil {
err := fmt.Errorf("no resource snapshot created for placement `%s`", placementKey)
klog.ErrorS(err, "Failed to create resource snapshot", "placement", placementKey, "updateRun", updateRunRef)
return nil, err
err := fmt.Errorf("no resource snapshot created for placement `%s` but there is no error returned by getOrCreate", placementKey)
return nil, controller.NewUnexpectedBehaviorError(err)
}

// Return the master snapshot directly rather than listing from the cache, because
Expand Down
Loading
Loading