From 40baaf8714912a5fe4556fae731f5bed1e031eea Mon Sep 17 00:00:00 2001 From: Zach Smith Date: Tue, 10 Mar 2026 22:48:14 -0700 Subject: [PATCH] feat: move DecodeCallback to storage package, add SetFeatureSupported - Move DecodeCallback from etcd3 to storage package so non-etcd backends (Spanner) can use it - Add deprecated aliases in etcd3 for backwards compatibility - Add SetFeatureSupported() on feature checker so non-etcd backends can declare native feature support (e.g. RequestWatchProgress) Co-Authored-By: Claude Opus 4.6 --- .../pkg/storage/cacher/lister_watcher.go | 3 +- .../apiserver/pkg/storage/decode_callback.go | 42 +++++++++++++++++++ .../pkg/storage/etcd3/decode_callback.go | 20 ++++----- .../feature/feature_support_checker.go | 29 ++++++++++++- 4 files changed, 79 insertions(+), 15 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go diff --git a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go index a4802cd5e8f..969e8b9282f 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/cacher/lister_watcher.go @@ -31,7 +31,6 @@ import ( "k8s.io/apimachinery/pkg/watch" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/storage" - "k8s.io/apiserver/pkg/storage/etcd3" utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/consistencydetector" @@ -109,7 +108,7 @@ func (lw *listerWatcher) List(options metav1.ListOptions) (runtime.Object, error var keyMap map[string]string if lw.identityFromKey != nil && lw.wrapObject != nil { keyMap = make(map[string]string) - ctx = etcd3.WithDecodeCallback(ctx, func(obj runtime.Object, storageKey string, modRev int64) { + ctx = storage.WithDecodeCallback(ctx, func(obj runtime.Object, storageKey string, modRev int64) { accessor, err := meta.Accessor(obj) if err != nil { return diff --git a/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go b/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go new file mode 100644 index 00000000000..5530d3b0e6c --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storage/decode_callback.go @@ -0,0 +1,42 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "context" + + "k8s.io/apimachinery/pkg/runtime" +) + +type decodeCallbackKeyType struct{} + +// DecodeCallback is called for each item decoded during GetList, +// providing the decoded object, its storage-relative key (backend prefix +// stripped), and the modification revision. +type DecodeCallback func(obj runtime.Object, storageKey string, modRevision int64) + +// WithDecodeCallback returns a context that carries a DecodeCallback. +// The callback will be invoked for each item decoded in GetList. +func WithDecodeCallback(ctx context.Context, cb DecodeCallback) context.Context { + return context.WithValue(ctx, decodeCallbackKeyType{}, cb) +} + +// DecodeCallbackFromContext extracts the DecodeCallback from the context, if any. +func DecodeCallbackFromContext(ctx context.Context) DecodeCallback { + cb, _ := ctx.Value(decodeCallbackKeyType{}).(DecodeCallback) + return cb +} diff --git a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go index 0e3514e53c3..624338f6955 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/etcd3/decode_callback.go @@ -19,23 +19,19 @@ package etcd3 import ( "context" - "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apiserver/pkg/storage" ) -type decodeCallbackKeyType struct{} +// DecodeCallback is an alias for storage.DecodeCallback. +// Deprecated: use storage.DecodeCallback directly. +type DecodeCallback = storage.DecodeCallback -// DecodeCallback is called for each item decoded during GetList, -// providing the decoded object, its storage-relative key (etcd prefix -// stripped), and the etcd mod revision. -type DecodeCallback func(obj runtime.Object, storageKey string, modRevision int64) - -// WithDecodeCallback returns a context that carries a DecodeCallback. -// The callback will be invoked for each item decoded in GetList. +// WithDecodeCallback is an alias for storage.WithDecodeCallback. +// Deprecated: use storage.WithDecodeCallback directly. func WithDecodeCallback(ctx context.Context, cb DecodeCallback) context.Context { - return context.WithValue(ctx, decodeCallbackKeyType{}, cb) + return storage.WithDecodeCallback(ctx, cb) } func decodeCallbackFromContext(ctx context.Context) DecodeCallback { - cb, _ := ctx.Value(decodeCallbackKeyType{}).(DecodeCallback) - return cb + return storage.DecodeCallbackFromContext(ctx) } diff --git a/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go index e664d22a646..a6373b11835 100644 --- a/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go +++ b/staging/src/k8s.io/apiserver/pkg/storage/feature/feature_support_checker.go @@ -41,6 +41,15 @@ var ( DefaultFeatureSupportChecker FeatureSupportChecker = newDefaultFeatureSupportChecker() ) +// SetFeatureSupported explicitly marks a feature as supported in the default +// checker. Used by non-etcd storage backends (e.g. Spanner) that implement +// features like RequestWatchProgress natively. +func SetFeatureSupported(feature storage.Feature, supported bool) { + if d, ok := DefaultFeatureSupportChecker.(*defaultFeatureSupportChecker); ok { + d.SetSupported(feature, supported) + } +} + // FeatureSupportChecker to define Supports functions. type FeatureSupportChecker interface { // Supports check if the feature is supported or not by checking internal cache. @@ -61,12 +70,17 @@ type FeatureSupportChecker interface { type defaultFeatureSupportChecker struct { lock sync.Mutex progressNotifySupported *bool - checkingEndpoint map[string]struct{} + // forcedFeatures overrides etcd-detected support. Once a feature is + // force-set, CheckClient cannot override it. Used by non-etcd storage + // backends (e.g. Spanner) that implement features natively. + forcedFeatures map[storage.Feature]bool + checkingEndpoint map[string]struct{} } func newDefaultFeatureSupportChecker() *defaultFeatureSupportChecker { return &defaultFeatureSupportChecker{ checkingEndpoint: make(map[string]struct{}), + forcedFeatures: make(map[storage.Feature]bool), } } @@ -77,6 +91,9 @@ func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool { f.lock.Lock() defer f.lock.Unlock() + if v, ok := f.forcedFeatures[feature]; ok { + return v + } return ptr.Deref(f.progressNotifySupported, false) default: runtime.HandleError(fmt.Errorf("feature %q is not implemented in DefaultFeatureSupportChecker", feature)) @@ -84,6 +101,16 @@ func (f *defaultFeatureSupportChecker) Supports(feature storage.Feature) bool { } } +// SetSupported explicitly marks a feature as supported, overriding any +// etcd-detected value. Once set, CheckClient cannot override it. This is +// used by non-etcd storage backends (e.g. Spanner) that implement the +// feature natively. +func (f *defaultFeatureSupportChecker) SetSupported(feature storage.Feature, supported bool) { + f.lock.Lock() + defer f.lock.Unlock() + f.forcedFeatures[feature] = supported +} + // CheckClient accepts client and calculate the support per endpoint and caches it. func (f *defaultFeatureSupportChecker) CheckClient(ctx context.Context, c client, feature storage.Feature) { switch feature {