Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 10 additions & 10 deletions flytestdlib/resolver/k8s_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (

"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -133,16 +133,16 @@ func (k *kResolver) Close() {
logger.Infof(k.ctx, "k8s resolver: closed")
}

func (k *kResolver) resolve(e *v1.Endpoints) {
func (k *kResolver) resolve(e *discoveryv1.EndpointSlice) {
var newAddrs []resolver.Address
for _, subset := range e.Subsets {
port := k.target.port

for _, address := range subset.Addresses {
for _, endpoint := range e.Endpoints {
if endpoint.Conditions.Ready != nil && !*endpoint.Conditions.Ready {
continue
}
for _, address := range endpoint.Addresses {
newAddrs = append(newAddrs, resolver.Address{
Addr: net.JoinHostPort(address.IP, port),
Addr: net.JoinHostPort(address, k.target.port),
ServerName: fmt.Sprintf("%s.%s", k.target.serviceName, k.target.serviceNamespace),
Metadata: nil,
})
}
}
Expand All @@ -158,7 +158,7 @@ func (k *kResolver) run() {

logger.Infof(k.ctx, "Starting k8s resolver for target: [%s], service namespace: [%s], service name: [%s]", k.target, k.target.serviceNamespace, k.target.serviceName)

watcher, err := k.k8sClient.CoreV1().Endpoints(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{FieldSelector: "metadata.name=" + k.target.serviceName})
watcher, err := k.k8sClient.DiscoveryV1().EndpointSlices(k.target.serviceNamespace).Watch(k.ctx, metav1.ListOptions{LabelSelector: "kubernetes.io/service-name=" + k.target.serviceName})
if err != nil {
logger.Errorf(
k.ctx,
Expand Down Expand Up @@ -187,7 +187,7 @@ func (k *kResolver) run() {
if event.Object == nil {
continue
}
k.resolve(event.Object.(*v1.Endpoints))
k.resolve(event.Object.(*discoveryv1.EndpointSlice))
}
}
}
30 changes: 15 additions & 15 deletions flytestdlib/resolver/k8s_resolver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ import (
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
"gotest.tools/assert"
v1 "k8s.io/api/core/v1"
discoveryv1 "k8s.io/api/discovery/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
testclient "k8s.io/client-go/kubernetes/fake"
"k8s.io/utils/ptr"
)

func parseTarget(target string) resolver.Target {
Expand Down Expand Up @@ -68,24 +69,23 @@ func TestBuilder(t *testing.T) {
// Make sure watcher is started before we create the endpoint
time.Sleep(2 * time.Second)

_, err = k8sClient.CoreV1().Endpoints("flyte").Create(context.Background(), &v1.Endpoints{
_, err = k8sClient.DiscoveryV1().EndpointSlices("flyte").Create(context.Background(), &discoveryv1.EndpointSlice{
ObjectMeta: metav1.ObjectMeta{
Name: "flyteagent",
Name: "flyteagent-abc12",
Namespace: "flyte",
Labels: map[string]string{"kubernetes.io/service-name": "flyteagent"},
},
Subsets: []v1.EndpointSubset{
AddressType: discoveryv1.AddressTypeIPv4,
Endpoints: []discoveryv1.Endpoint{
{
Addresses: []v1.EndpointAddress{
{
IP: "10.0.0.1",
},
},
Ports: []v1.EndpointPort{
{
Name: "grpc",
Port: 8000,
},
},
Addresses: []string{"10.0.0.1"},
Conditions: discoveryv1.EndpointConditions{Ready: ptr.To(true)},
},
},
Ports: []discoveryv1.EndpointPort{
{
Name: ptr.To("grpc"),
Port: ptr.To(int32(8000)),
},
},
}, metav1.CreateOptions{})
Expand Down
Loading