diff --git a/flytestdlib/resolver/k8s_resolver.go b/flytestdlib/resolver/k8s_resolver.go index c3ccf0f443f..9316613e9e9 100644 --- a/flytestdlib/resolver/k8s_resolver.go +++ b/flytestdlib/resolver/k8s_resolver.go @@ -9,7 +9,7 @@ import ( "google.golang.org/grpc/grpclog" "google.golang.org/grpc/resolver" - v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" @@ -133,16 +133,16 @@ func (k *kResolver) Close() { logger.Infof(k.ctx, "k8s resolver: closed") } -func (k *kResolver) resolve(e *v1.Endpoints) { +func (k *kResolver) resolve(e *discoveryv1.EndpointSlice) { var newAddrs []resolver.Address - for _, subset := range e.Subsets { - port := k.target.port - - for _, address := range subset.Addresses { + for _, endpoint := range e.Endpoints { + if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready { + continue + } + for _, address := range endpoint.Addresses { newAddrs = append(newAddrs, resolver.Address{ - Addr: net.JoinHostPort(address.IP, port), + Addr: net.JoinHostPort(address, k.target.port), ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace), - Metadata: nil, }) } } @@ -158,7 +158,7 @@ func (k *kResolver) run() { logger.Infof(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName) - watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName}) + watcher, err := k.k8sClient.DiscoveryV1().EndpointSlices(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + k.target.serviceName}) if err != nil { logger.Errorf( k.ctx, @@ -187,7 +187,7 @@ func (k *kResolver) run() { if event.Object == nil { continue } - k.resolve(event.Object.(*v1.Endpoints)) + k.resolve(event.Object.(*discoveryv1.EndpointSlice)) } } } diff --git a/flytestdlib/resolver/k8s_resolver_test.go b/flytestdlib/resolver/k8s_resolver_test.go index 18aa22c80a3..3c9cac7b8bd 100644 --- a/flytestdlib/resolver/k8s_resolver_test.go +++ b/flytestdlib/resolver/k8s_resolver_test.go @@ -11,9 +11,10 @@ import ( "google.golang.org/grpc/resolver" "google.golang.org/grpc/serviceconfig" "gotest.tools/assert" - v1 "k8s.io/api/core/v1" + discoveryv1 "k8s.io/api/discovery/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" testclient "k8s.io/client-go/kubernetes/fake" + "k8s.io/utils/ptr" ) func parseTarget(target string) resolver.Target { @@ -68,24 +69,23 @@ func TestBuilder(t *testing.T) { // Make sure watcher is started before we create the endpoint time.Sleep(2 * time.Second) - _, err = k8sClient.CoreV1().Endpoints("flyte").Create(context.Background(), &v1.Endpoints{ + _, err = k8sClient.DiscoveryV1().EndpointSlices("flyte").Create(context.Background(), &discoveryv1.EndpointSlice{ ObjectMeta: metav1.ObjectMeta{ - Name: "flyteagent", + Name: "flyteagent-abc12", Namespace: "flyte", + Labels: map[string]string{"kubernetes.io/service-name": "flyteagent"}, }, - Subsets: []v1.EndpointSubset{ + AddressType: discoveryv1.AddressTypeIPv4, + Endpoints: []discoveryv1.Endpoint{ { - Addresses: []v1.EndpointAddress{ - { - IP: "10.0.0.1", - }, - }, - Ports: []v1.EndpointPort{ - { - Name: "grpc", - Port: 8000, - }, - }, + Addresses: []string{"10.0.0.1"}, + Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)}, + }, + }, + Ports: []discoveryv1.EndpointPort{ + { + Name: ptr.To("grpc"), + Port: ptr.To(int32(8000)), }, }, }, metav1.CreateOptions{})