From ea570c11611ee37f419b9099602d0d8b21a7587f Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Fri, 3 Jan 2020 14:45:56 +0000 Subject: [PATCH 1/7] Implement the new Envoy gRPC API --- catalog/services_state.go | 22 +- catalog/url_listener.go | 2 +- config.go => config/config.go | 18 +- envoy/adapter/adapter.go | 307 ++++++++++++++++++++++++++++ envoy/listener.go | 45 ++++ envoy/server.go | 126 ++++++++++++ envoy/server_test.go | 375 ++++++++++++++++++++++++++++++++++ go.mod | 7 +- go.sum | 45 +++- main.go | 56 +++-- services_delegate.go | 6 +- sidecarhttp/envoy_api.go | 54 +---- 12 files changed, 975 insertions(+), 88 deletions(-) rename config.go => config/config.go (84%) create mode 100644 envoy/adapter/adapter.go create mode 100644 envoy/listener.go create mode 100644 envoy/server.go create mode 100644 envoy/server_test.go diff --git a/catalog/services_state.go b/catalog/services_state.go index db688418..1ce92e0b 100644 --- a/catalog/services_state.go +++ b/catalog/services_state.go @@ -24,15 +24,16 @@ import ( // servers to Service lists and manages the lifecycle. 
const ( - TOMBSTONE_LIFESPAN = 3 * time.Hour // How long we keep tombstones around - TOMBSTONE_COUNT = 10 // Send tombstones at 1 per second 10 times - ALIVE_COUNT = 5 // Send new services at 1 per second 5 times - TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks - TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission - ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds - DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes - ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks - ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute + TOMBSTONE_LIFESPAN = 3 * time.Hour // How long we keep tombstones around + TOMBSTONE_COUNT = 10 // Send tombstones at 1 per second 10 times + ALIVE_COUNT = 5 // Send new services at 1 per second 5 times + TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks + TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission + ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds + DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes + ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks + ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute + LISTENER_EVENT_BUFFER_SIZE = 20 // The number of events that can be buffered in the listener eventChannel ) // A ChangeEvent represents the time and hostname that was modified and signals a major @@ -147,6 +148,9 @@ func (state *ServicesState) HasServer(hostname string) bool { // A server has left the cluster, so tombstone all of its records func (state *ServicesState) ExpireServer(hostname string) { + state.Lock() + defer state.Unlock() + if !state.HasServer(hostname) || len(state.Servers[hostname].Services) == 0 { log.Infof("No records to expire for %s", hostname) return 
diff --git a/catalog/url_listener.go b/catalog/url_listener.go index da76058c..e20c7c48 100644 --- a/catalog/url_listener.go +++ b/catalog/url_listener.go @@ -71,7 +71,7 @@ func NewUrlListener(listenurl string, managed bool) *UrlListener { Url: listenurl, looper: director.NewFreeLooper(director.FOREVER, errorChan), Client: &http.Client{Timeout: ClientTimeout, Jar: cookieJar}, - eventChannel: make(chan ChangeEvent, 20), + eventChannel: make(chan ChangeEvent, LISTENER_EVENT_BUFFER_SIZE), Retries: DefaultRetries, managed: managed, name: "UrlListener(" + listenurl + ")", diff --git a/config.go b/config/config.go similarity index 84% rename from config.go rename to config/config.go index 2f4316e7..7c03e010 100644 --- a/config.go +++ b/config/config.go @@ -1,9 +1,10 @@ -package main +package config import ( "time" "github.com/kelseyhightower/envconfig" + log "github.com/sirupsen/logrus" "gopkg.in/relistan/rubberneck.v1" ) @@ -24,6 +25,13 @@ type HAproxyConfig struct { UseHostnames bool `envconfig:"USE_HOSTNAMES"` } +type EnvoyConfig struct { + UseGRPCAPI bool `envconfig:"USE_GRPC_API" default:"true"` + BindIP string `envconfig:"BIND_IP" default:"192.168.168.168"` + UseHostnames bool `envconfig:"USE_HOSTNAMES"` + GRPCPort string `envconfig:"GRPC_PORT" default:"7776"` +} + type ServicesConfig struct { NameMatch string `envconfig:"NAME_MATCH"` ServiceNamer string `envconfig:"NAMER" default:"docker_label"` @@ -59,10 +67,11 @@ type Config struct { StaticDiscovery StaticConfig // STATIC_ Services ServicesConfig // SERVICES_ HAproxy HAproxyConfig // HAPROXY_ + Envoy EnvoyConfig // ENVOY_ Listeners ListenerUrlsConfig // LISTENERS_ } -func parseConfig() Config { +func ParseConfig() *Config { var config Config errs := []error{ @@ -71,15 +80,16 @@ func parseConfig() Config { envconfig.Process("static", &config.StaticDiscovery), envconfig.Process("services", &config.Services), envconfig.Process("haproxy", &config.HAproxy), + envconfig.Process("envoy", &config.Envoy), 
envconfig.Process("listeners", &config.Listeners), } for _, err := range errs { if err != nil { rubberneck.Print(config) - exitWithError(err, "Can't parse environment config!") + log.Fatalf("Can't parse environment config: %s", err) } } - return config + return &config } diff --git a/envoy/adapter/adapter.go b/envoy/adapter/adapter.go new file mode 100644 index 00000000..48571600 --- /dev/null +++ b/envoy/adapter/adapter.go @@ -0,0 +1,307 @@ +package adapter + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/service" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" + route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" + hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + tcpp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/duration" + log "github.com/sirupsen/logrus" +) + +const ( + // ServiceNameSeparator is used to join service name and port. Must not occur in service names. 
+ ServiceNameSeparator = ":" +) + +// SvcName formats an Envoy service name from our service name and port +func SvcName(name string, port int64) string { + return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port) +} + +// SvcNameSplit an Enovy service name into our service name and port +func SvcNameSplit(name string) (string, int64, error) { + parts := strings.Split(name, ServiceNameSeparator) + if len(parts) < 2 { + return "", -1, fmt.Errorf("%s", "Unable to split service name and port!") + } + + svcName := parts[0] + svcPort, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return "", -1, fmt.Errorf("%s", "Unable to parse port!") + } + + return svcName, svcPort, nil +} + +// LookupHost does a vv slow lookup of the DNS host for a service. Totally +// not optimized for high throughput. You should only do this in development +// scenarios. +func LookupHost(hostname string) (string, error) { + addrs, err := net.LookupHost(hostname) + + if err != nil { + return "", err + } + return addrs[0], nil +} + +// EnvoyListenersFromState creates a set of Enovy API listener +// definitions from all the ServicePorts in the Sidecar state. +func EnvoyListenersFromState(state *catalog.ServicesState, bindIP string) ([]cache.Resource, error) { + var listeners []cache.Resource + + state.RLock() + defer state.RUnlock() + + svcs := state.ByService() + // Loop over all the services by service name + for _, endpoints := range svcs { + if len(endpoints) < 1 { + continue + } + + var svc *service.Service + // Find the first alive service and use that as the definition. + // If none are alive, we won't open the port. + for _, endpoint := range endpoints { + if endpoint.IsAlive() { + svc = endpoint + break + } + } + + if svc == nil { + continue + } + + // Loop over the ports and generate a named listener for + // each port. 
+ for _, port := range svc.Ports { + // Only listen on ServicePorts + if port.ServicePort < 1 { + continue + } + + listener, err := EnvoyListenerFromService(svc, port.ServicePort, bindIP) + if err != nil { + return nil, fmt.Errorf("failed to create listener from service: %s", err) + } + listeners = append(listeners, listener) + } + } + + log.Debugf("Created %d Envoy listeners", len(listeners)) + + return listeners, nil +} + +// EnvoyListenerFromService creates an Envoy listener from a service instance +func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (cache.Resource, error) { + apiName := SvcName(svc.Name, port) + + var connectionManagerName string + var connectionManager proto.Message + switch svc.ProxyMode { + case "http": + connectionManagerName = wellknown.HTTPConnectionManager + + connectionManager = &hcm.HttpConnectionManager{ + StatPrefix: "ingress_http", + HttpFilters: []*hcm.HttpFilter{{ + Name: wellknown.Router, + }}, + RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{ + RouteConfig: &api.RouteConfiguration{ + VirtualHosts: []*route.VirtualHost{{ + Name: apiName, + Domains: []string{"*"}, + Routes: []*route.Route{{ + Match: &route.RouteMatch{ + PathSpecifier: &route.RouteMatch_Prefix{ + Prefix: "/", + }, + }, + Action: &route.Route_Route{ + Route: &route.RouteAction{ + ClusterSpecifier: &route.RouteAction_Cluster{ + Cluster: apiName, + }, + Timeout: &duration.Duration{}, + }, + }, + }}, + }}, + }, + }, + } + case "tcp": + connectionManagerName = wellknown.TCPProxy + + connectionManager = &tcpp.TcpProxy{ + StatPrefix: "ingress_tcp", + ClusterSpecifier: &tcpp.TcpProxy_Cluster{ + Cluster: apiName, + }, + } + default: + return nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode) + } + + serialisedConnectionManager, err := ptypes.MarshalAny(connectionManager) + if err != nil { + return nil, fmt.Errorf("failed to create the connection manager: %s", err) + } + + return &api.Listener{ + Name: svc.Name, + Address: 
&core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: bindIP, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(port), + }, + }, + }, + }, + FilterChains: []*listener.FilterChain{{ + Filters: []*listener.Filter{{ + Name: connectionManagerName, + ConfigType: &listener.Filter_TypedConfig{ + TypedConfig: serialisedConnectionManager, + }, + }}, + }}, + }, nil +} + +// EnvoyClustersFromState genenerates a list of Envoy clusters from the +// current Sidecar state +func EnvoyClustersFromState(state *catalog.ServicesState, useHostnames bool) []cache.Resource { + state.RLock() + defer state.RUnlock() + + // `s.state.ByService()` returns the list of service endpoints for each service. + // Since some services can expose multiple service ports, we need to create a + // separate cluster for each (service, servicePort) pair. If a service doesn't + // have any endpoints that are alive, we don't want to create a cluster for it. + // + // Note that in `EnvoyClustersFromState()` for the REST API we only need + // the first non-nil alive endpoint instance to construct the cluster + // because, in that case, SDS (now EDS) fetches the actual endpoints in a + // separate call. + var clusters []cache.Resource + clustersMap := make(map[string]*api.Cluster) + for svcName, svcEndpoints := range state.ByService() { + if len(svcEndpoints) < 1 { + continue + } + + for _, svcEndpoint := range svcEndpoints { + if svcEndpoint == nil || !svcEndpoint.IsAlive() { + continue + } + + for _, port := range svcEndpoint.Ports { + if port.ServicePort < 1 { + continue + } + + envoyServiceName := SvcName(svcName, port.ServicePort) + + if cluster, ok := clustersMap[envoyServiceName]; ok { + cluster.LoadAssignment.Endpoints[0].LbEndpoints = + append(cluster.LoadAssignment.Endpoints[0].LbEndpoints, + envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames)...) 
+ } else { + envoyCluster := &api.Cluster{ + Name: envoyServiceName, + ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms + ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only + ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL, + // Setting the endpoints here directly bypasses EDS, so we can + // avoid having to configure that as well + LoadAssignment: &api.ClusterLoadAssignment{ + ClusterName: envoyServiceName, + Endpoints: []*endpoint.LocalityLbEndpoints{{ + LbEndpoints: envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames), + }}, + }, + // Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these. + // See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106 + // CommonHttpProtocolOptions: &core.HttpProtocolOptions{ + // IdleTimeout: &duration.Duration{Seconds: 60}, + // MaxConnectionDuration: &duration.Duration{Seconds: 60}, + // }, + // If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`. + // Http2ProtocolOptions: &core.Http2ProtocolOptions{}, + } + + clustersMap[envoyServiceName] = envoyCluster + clusters = append(clusters, envoyCluster) + } + } + } + } + + log.Debugf("Created %d Envoy clusters", len(clusters)) + + return clusters +} + +// envoyServiceFromService converts a Sidecar service to an Envoy +// API service for reporting to the proxy +func envoyServiceFromService(svc *service.Service, svcPort int64, useHostnames bool) []*endpoint.LbEndpoint { + var endpoints []*endpoint.LbEndpoint + for _, port := range svc.Ports { + // No sense worrying about unexposed ports + if port.ServicePort == svcPort { + address := port.IP + + // NOT recommended... this is very slow. Useful in dev modes where you + // need to resolve to a different IP address only. 
+ if useHostnames { + if host, err := LookupHost(svc.Hostname); err == nil { + address = host + } else { + log.Warnf("Unable to resolve %s, using IP address", svc.Hostname) + } + } + + endpoints = append(endpoints, &endpoint.LbEndpoint{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: address, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(port.Port), + }, + }, + }, + }, + }, + }, + }) + } + } + + return endpoints +} diff --git a/envoy/listener.go b/envoy/listener.go new file mode 100644 index 00000000..cd83d37b --- /dev/null +++ b/envoy/listener.go @@ -0,0 +1,45 @@ +package envoy + +import ( + "github.com/Nitro/sidecar/catalog" +) + +const ( + listenerEventBufferSize = 100 +) + +// Listener is an internal Sidecar listener that will be hooked up when +// config.Envoy.UseGRPCAPI is true. It only needs to know when the state +// changes, ignoring what the actual change was. +type Listener struct { + eventsChan chan catalog.ChangeEvent +} + +// Chan exposes the internal events channel +func (l *Listener) Chan() chan catalog.ChangeEvent { + return l.eventsChan +} + +// Name returns a unique name for this listener +func (l *Listener) Name() string { + // Be careful not to clash with names assigned automatically in + // service/service.go -> ListenerName() + return "internal-envoy" +} + +// Managed tells Sidecar that it shouldn't try to automatically remove +// this listener +func (l *Listener) Managed() bool { + return false +} + +// NewListener creates a new Listener instance +func NewListener() *Listener { + return &Listener{ + // See catalog/url_listener.go -> NewUrlListener() for a similar mechanism. + // We use a larger buffer here, because, unlike the URL listener, this is + // all processed internally in the same process, so we can buffer/flush + // more events faster. 
+ eventsChan: make(chan catalog.ChangeEvent, listenerEventBufferSize), + } +} diff --git a/envoy/server.go b/envoy/server.go new file mode 100644 index 00000000..df60ddbc --- /dev/null +++ b/envoy/server.go @@ -0,0 +1,126 @@ +package envoy + +import ( + "context" + "net" + "strconv" + "strings" + "time" + + "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" + "github.com/Nitro/sidecar/envoy/adapter" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + xds "github.com/envoyproxy/go-control-plane/pkg/server" + "github.com/relistan/go-director" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +type xdsCallbacks struct{} + +func (*xdsCallbacks) OnStreamOpen(context.Context, int64, string) error { return nil } +func (*xdsCallbacks) OnStreamClosed(int64) {} +func (*xdsCallbacks) OnStreamRequest(int64, *api.DiscoveryRequest) error { return nil } +func (*xdsCallbacks) OnStreamResponse(_ int64, req *api.DiscoveryRequest, _ *api.DiscoveryResponse) { + if req.GetErrorDetail().GetCode() != 0 { + log.Errorf("Received Envoy error code %d: %s", + req.GetErrorDetail().GetCode(), + strings.ReplaceAll(req.GetErrorDetail().GetMessage(), "\n", ""), + ) + } +} +func (*xdsCallbacks) OnFetchRequest(context.Context, *api.DiscoveryRequest) error { return nil } +func (*xdsCallbacks) OnFetchResponse(*api.DiscoveryRequest, *api.DiscoveryResponse) {} + +// Server is a wrapper around Envoy's control plane xDS gRPC server and it uses +// the Aggregated Discovery Service (ADS) mechanism. 
+type Server struct { + Listener catalog.Listener + config config.EnvoyConfig + state *catalog.ServicesState + snapshotCache cache.SnapshotCache + xdsServer xds.Server +} + +// Run sets up the Sidecar listener event loop and starts the Envoy gRPC server +func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener net.Listener) { + grpcServer := grpc.NewServer() + envoy_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s.xdsServer) + + go looper.Loop(func() error { + // Block until we get an event indicating a state change. + // We discard the event since we need a snapshot of the entire state. + <-s.Listener.Chan() + + // When a server is expired in catalog/services_state.go -> ExpireServer(), + // the listener will receive an event for each expired service. We want to + // flush the channel to prevent rapid-fire updates to Envoy. + // This was inspired from receiver/receiver.go -> ProcessUpdates(). + // TODO: Think of a more aggressive / reliable way of draining since we + // used a larger value for listenerEventBufferSize. + pendingEventCount := len(s.Listener.Chan()) + for i := 0; i < pendingEventCount; i++ { + <-s.Listener.Chan() + } + + listeners, err := adapter.EnvoyListenersFromState(s.state, s.config.BindIP) + if err != nil { + log.Errorf("Failed to create Envoy listeners: %s", err) + return nil + } + + // We are using `time.Now().UnixNano()` to ensure that all versions we send to + // Envoy are unique. Otherwise, Envoy will skip the update. 
+ version := strconv.FormatInt(time.Now().UnixNano(), 10) + err = s.snapshotCache.SetSnapshot( + // The hostname needs to match the value passed via `--service-node` to Envoy + // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 + s.state.Hostname, + cache.NewSnapshot( + version, + nil, + adapter.EnvoyClustersFromState(s.state, s.config.UseHostnames), + nil, + listeners, + nil, + ), + ) + if err != nil { + log.Errorf("Failed to create new Envoy cache snapshot: %s", err) + return nil + } + + log.Infof("Envoy configuration updated to version %s", version) + + return nil + }) + + go func() { + if err := grpcServer.Serve(grpcListener); err != nil { + log.Fatalf("Failed to start Envoy gRPC server: %s", err) + } + }() + + // Currently, this will block forever + <-ctx.Done() + grpcServer.GracefulStop() +} + +// NewServer creates a new Server instance +func NewServer(ctx context.Context, state *catalog.ServicesState, config config.EnvoyConfig) *Server { + // Instruct the snapshot cache to use Aggregated Discovery Service (ADS) + // The third parameter can contain a logger instance, but I didn't find + // those logs particularly useful. 
+ snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) + + return &Server{ + Listener: NewListener(), + config: config, + state: state, + snapshotCache: snapshotCache, + xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}), + } +} diff --git a/envoy/server_test.go b/envoy/server_test.go new file mode 100644 index 00000000..fa143c89 --- /dev/null +++ b/envoy/server_test.go @@ -0,0 +1,375 @@ +package envoy + +import ( + "context" + "fmt" + "io" + "net" + "sort" + "testing" + "time" + + "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" + "github.com/Nitro/sidecar/envoy/adapter" + "github.com/Nitro/sidecar/service" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + tcpp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" + envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + xds "github.com/envoyproxy/go-control-plane/pkg/server" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + "github.com/relistan/go-director" + . 
"github.com/smartystreets/goconvey/convey" + "google.golang.org/grpc" +) + +const ( + bindIP = "192.168.168.168" +) + +var ( + validators = map[string]func(*any.Any, service.Service){ + cache.ListenerType: validateListener, + cache.ClusterType: validateCluster, + } +) + +func validateListener(serialisedListener *any.Any, svc service.Service) { + listener := &api.Listener{} + err := ptypes.UnmarshalAny(serialisedListener, listener) + So(err, ShouldBeNil) + So(listener.Name, ShouldEqual, svc.Name) + So(listener.GetAddress().GetSocketAddress().GetAddress(), ShouldEqual, bindIP) + So(listener.GetAddress().GetSocketAddress().GetPortValue(), ShouldEqual, svc.Ports[0].ServicePort) + filterChains := listener.GetFilterChains() + So(filterChains, ShouldHaveLength, 1) + filters := filterChains[0].GetFilters() + So(filters, ShouldHaveLength, 1) + + if svc.ProxyMode == "http" { + So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager) + connectionManager := &hcm.HttpConnectionManager{} + err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager) + So(err, ShouldBeNil) + So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_http") + So(connectionManager.GetRouteConfig(), ShouldNotBeNil) + So(connectionManager.GetRouteConfig().GetVirtualHosts(), ShouldHaveLength, 1) + virtualHost := connectionManager.GetRouteConfig().GetVirtualHosts()[0] + So(virtualHost.GetName(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(virtualHost.GetRoutes(), ShouldHaveLength, 1) + route := virtualHost.GetRoutes()[0].GetRoute() + So(route, ShouldNotBeNil) + So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(route.GetTimeout(), ShouldNotBeNil) + } else { // tcp + So(filters[0].GetName(), ShouldEqual, wellknown.TCPProxy) + connectionManager := &tcpp.TcpProxy{} + err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager) + So(err, ShouldBeNil) + So(connectionManager.GetStatPrefix(), 
ShouldEqual, "ingress_tcp") + So(connectionManager.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + } +} + +func extractClusterEndpoints(serialisedCluster *any.Any, svc service.Service) []*endpoint.LbEndpoint { + cluster := &api.Cluster{} + err := ptypes.UnmarshalAny(serialisedCluster, cluster) + So(err, ShouldBeNil) + So(cluster.Name, ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(cluster.GetConnectTimeout().GetNanos(), ShouldEqual, 500000000) + loadAssignment := cluster.GetLoadAssignment() + So(loadAssignment, ShouldNotBeNil) + So(loadAssignment.GetClusterName(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + localityEndpoints := loadAssignment.GetEndpoints() + So(localityEndpoints, ShouldHaveLength, 1) + + return localityEndpoints[0].GetLbEndpoints() +} + +func validateCluster(serialisedCluster *any.Any, svc service.Service) { + endpoints := extractClusterEndpoints(serialisedCluster, svc) + So(endpoints, ShouldHaveLength, 1) + So(endpoints[0].GetEndpoint().GetAddress().GetSocketAddress().GetAddress(), ShouldEqual, svc.Ports[0].IP) + So(endpoints[0].GetEndpoint().GetAddress().GetSocketAddress().GetPortValue(), ShouldEqual, svc.Ports[0].Port) +} + +// EnvoyMock is used to validate the Envoy state by making the same gRPC stream calls +// to the Server as Envoy would +type EnvoyMock struct { + nonces map[string]string +} + +func NewEnvoyMock() EnvoyMock { + return EnvoyMock{ + nonces: make(map[string]string), + } +} + +func (sv *EnvoyMock) GetResource(stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, resource string, hostname string) []*any.Any { + nonce, ok := sv.nonces[resource] + if !ok { + // Set the initial nonce to 0 for each resource type. The control plane will increment + // it after each call, so we need to pass back the value we last received. 
+ nonce = "0" + } + err := stream.Send(&api.DiscoveryRequest{ + VersionInfo: "1", + Node: &core.Node{ + Id: hostname, + }, + TypeUrl: resource, + ResponseNonce: nonce, + }) + if err != nil && err != io.EOF { + So(err, ShouldBeNil) + } + + // Recv() blocks until the stream ctx expires if the message sent via Send() is not recognised / valid + response, err := stream.Recv() + So(err, ShouldBeNil) + + sv.nonces[resource] = response.GetNonce() + + return response.Resources +} + +func (sv *EnvoyMock) ValidateResources(stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, svc service.Service, hostname string) { + for resourceType, validator := range validators { + resources := sv.GetResource(stream, resourceType, hostname) + So(resources, ShouldHaveLength, 1) + validator(resources[0], svc) + } +} + +// SnapshotCache is a light wrapper around cache.SnapshotCache which lets +// us get a notification after calling SetSnapshot via the Waiter chan +type SnapshotCache struct { + cache.SnapshotCache + Waiter chan struct{} +} + +func (c *SnapshotCache) SetSnapshot(node string, snapshot cache.Snapshot) error { + err := c.SnapshotCache.SetSnapshot(node, snapshot) + + c.Waiter <- struct{}{} + + return err +} + +func NewSnapshotCache() *SnapshotCache { + return &SnapshotCache{ + SnapshotCache: cache.NewSnapshotCache(true, cache.IDHash{}, nil), + Waiter: make(chan struct{}), + } +} + +func Test_PortForServicePort(t *testing.T) { + Convey("Run()", t, func() { + config := config.EnvoyConfig{ + UseGRPCAPI: true, + BindIP: bindIP, + } + + state := catalog.NewServicesState() + + dummyHostname := "carcasone" + baseTime := time.Now().UTC() + httpSvc := service.Service{ + ID: "deadbeef123", + Name: "bocaccio", + Created: baseTime, + Hostname: dummyHostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "http", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 9990, ServicePort: 10100}, + }, + } + + anotherHTTPSvc := service.Service{ + ID: 
"deadbeef456", + Name: "bocaccio", + Created: baseTime, + Hostname: dummyHostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "http", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 9991, ServicePort: 10100}, + }, + } + + tcpSvc := service.Service{ + ID: "undeadbeef", + Name: "tolstoy", + Created: baseTime, + Hostname: state.Hostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "tcp", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 666, ServicePort: 10101}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + Reset(func() { + cancel() + }) + + // Use a custom SnapshotCache in the xdsServer so we can block after updating + // the state until the Server gets a chance to set a new snapshot in the cache + snapshotCache := NewSnapshotCache() + server := &Server{ + Listener: NewListener(), + config: config, + state: state, + snapshotCache: snapshotCache, + xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}), + } + + // Hook up the Envoy server Sidecar listener into the state + state.AddListener(server.Listener) + + // The gRPC listener will be assigned a random port and will be owned and managed + // by the gRPC server + lis, err := net.Listen("tcp", ":0") + So(err, ShouldBeNil) + So(lis.Addr(), ShouldHaveSameTypeAs, &net.TCPAddr{}) + + go server.Run(ctx, director.NewFreeLooper(director.FOREVER, make(chan error)), lis) + + Convey("sends the Envoy state via gRPC", func() { + conn, err := grpc.DialContext(ctx, + fmt.Sprintf(":%d", lis.Addr().(*net.TCPAddr).Port), + grpc.WithInsecure(), grpc.WithBlock(), + ) + So(err, ShouldBeNil) + + // 100 milliseconds should give us enough time to run hundreds of server transactions + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + Reset(func() { + cancel() + }) + + stream, err := envoy_discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(ctx) + So(err, ShouldBeNil) + + envoyMock := NewEnvoyMock() + + Convey("for a 
HTTP service", func() { + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, httpSvc, state.Hostname) + + Convey("and removes it after it gets tombstoned", func() { + httpSvc.Tombstone() + httpSvc.Updated.Add(1 * time.Millisecond) + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + + for resourceType := range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, ShouldHaveLength, 0) + } + }) + + Convey("and places another instance of the same service in the same cluster", func() { + state.AddServiceEntry(anotherHTTPSvc) + <-snapshotCache.Waiter + + resources := envoyMock.GetResource(stream, cache.ClusterType, state.Hostname) + So(resources, ShouldHaveLength, 1) + endpoints := extractClusterEndpoints(resources[0], httpSvc) + So(endpoints, ShouldHaveLength, 2) + var ports sort.IntSlice + for _, endpoint := range endpoints { + ports = append(ports, + int(endpoint.GetEndpoint().GetAddress().GetSocketAddress().GetPortValue())) + } + ports.Sort() + So(ports, ShouldResemble, sort.IntSlice{9990, 9991}) + }) + }) + + Convey("for a TCP service", func() { + state.AddServiceEntry(tcpSvc) + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, tcpSvc, state.Hostname) + }) + + Convey("and skips tombstones", func() { + httpSvc.Tombstone() + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + + for resourceType := range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, ShouldHaveLength, 0) + } + }) + + Convey("and triggers an update when expiring a server with only one service running", func(c C) { + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + + done := make(chan struct{}) + go func() { + select { + case <-snapshotCache.Waiter: + close(done) + case <-time.After(100 * time.Millisecond): + c.So(true, ShouldEqual, false) + } + }() + + state.ExpireServer(dummyHostname) + <-done + + for resourceType := 
range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, ShouldHaveLength, 0) + } + }) + + // TODO: This test is flaky in the current implementation + // Convey("and doesn't do spurious updates", func() { + // state.AddServiceEntry(httpSvc) + // state.AddServiceEntry(anotherHTTPSvc) + // <-snapshotCache.Waiter + + // updateCount := 0 + // done := make(chan struct{}) + // go func() { + // for { + // select { + // case <-snapshotCache.Waiter: + // updateCount++ + // case <-time.After(10 * time.Millisecond): + // done <- struct{}{} + // return + // } + // } + // }() + + // state.ExpireServer(dummyHostname) + // <-done + // So(updateCount, ShouldEqual, 1) + + // for resourceType := range validators { + // resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + // So(resources, ShouldHaveLength, 0) + // } + // }) + }) + }) +} diff --git a/go.mod b/go.mod index e23a75bc..804ca4c6 100644 --- a/go.mod +++ b/go.mod @@ -11,9 +11,10 @@ require ( github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect + github.com/envoyproxy/go-control-plane v0.9.2 github.com/fsouza/go-dockerclient v1.3.1 - github.com/gogo/protobuf v1.2.1 // indirect - github.com/golang/protobuf v1.3.1 // indirect + github.com/gogo/protobuf v1.2.1 + github.com/golang/protobuf v1.3.2 github.com/gorilla/mux v1.6.2 github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-multierror v1.0.0 // indirect @@ -34,9 +35,9 @@ require ( github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c // indirect - golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect 
golang.org/x/text v0.3.2 // indirect + google.golang.org/grpc v1.26.0 gopkg.in/alecthomas/kingpin.v2 v2.2.5 gopkg.in/jarcoal/httpmock.v1 v1.0.0-20170412085702-cf52904a3cf0 gopkg.in/relistan/rubberneck.v1 v1.0.1 diff --git a/go.sum b/go.sum index aea52cec..59d6e0d0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -20,8 +22,12 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= 
+github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/continuity v0.0.0-20180814194400-c7c5070e6f6e h1:KEBqsIJcjops96ysfjRTg3x6STnVHBxe7CZLwwnlkWA= github.com/containerd/continuity v0.0.0-20180814194400-c7c5070e6f6e/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 h1:4BX8f882bXEDKfWIf0wa8HRvpnBoPszJJXL+TVbBw4M= @@ -37,6 +43,12 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/libnetwork v0.8.0-dev.2.0.20180608203834-19279f049241 h1:+ebE/hCU02srkeIg8Vp/vlUp182JapYWtXzV+bCeR2I= github.com/docker/libnetwork v0.8.0-dev.2.0.20180608203834-19279f049241/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.2 h1:GJ5MKABRjz+QuET1GHm0KD9HC/mAzb3g2FznLQ0aThc= +github.com/envoyproxy/go-control-plane v0.9.2/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/go-dockerclient v1.3.1 h1:h0SaeiAGihssk+aZeKohbubHYKroCBlC7uuUyNhORI4= @@ -45,10 +57,13 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= 
github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -113,6 +128,7 @@ github.com/pquerna/ffjson v0.0.0-20171002144729-d49c2bc1aa13 h1:AUK/hm/tPsiNNASd github.com/pquerna/ffjson v0.0.0-20171002144729-d49c2bc1aa13/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs 
v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/relistan/go-director v0.0.0-20181104164737-5f56787d9731 h1:M8d8wZ2QCkGfp+N3LxT6bTFAXqhBV4Az450DuCqZEp0= @@ -145,12 +161,20 @@ golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0FNOmBrHfq7vN4btdGoDZgI= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod 
h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -158,6 +182,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 h1:GqwDwfvIpC33dK9bA1fD+JiDUNsuAiQiEkpHqUKze4o= golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= @@ -168,6 +193,20 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod 
h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.5 h1:qskSCq465uEvC3oGocwvZNsO3RF3SpLVLumOAhL0bXo= @@ -190,3 +229,5 @@ gotest.tools v2.1.0+incompatible h1:5USw7CrJBYKqjg9R7QlA6jzqZKEAtvW82aNmsxxGPxw= gotest.tools v2.1.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/main.go b/main.go index 
0b2eaaf9..75db93d8 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main // import "github.com/Nitro/sidecar" import ( + "context" + "net" "os" "os/signal" "runtime/pprof" @@ -8,7 +10,9 @@ import ( "github.com/Nitro/memberlist" "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" "github.com/Nitro/sidecar/discovery" + "github.com/Nitro/sidecar/envoy" "github.com/Nitro/sidecar/haproxy" "github.com/Nitro/sidecar/healthy" "github.com/Nitro/sidecar/service" @@ -37,7 +41,7 @@ func announceMembers(list *memberlist.Memberlist, state *catalog.ServicesState) // configureOverrides takes CLI opts and applies them over the top of settings // taken from the environment variables and stored in config. -func configureOverrides(config *Config, opts *CliOpts) { +func configureOverrides(config *config.Config, opts *CliOpts) { if len(*opts.AdvertiseIP) > 0 { config.Sidecar.AdvertiseIP = *opts.AdvertiseIP } @@ -55,7 +59,7 @@ func configureOverrides(config *Config, opts *CliOpts) { } } -func configureHAproxy(config Config) *haproxy.HAproxy { +func configureHAproxy(config *config.Config) *haproxy.HAproxy { proxy := haproxy.New(config.HAproxy.ConfigFile, config.HAproxy.PidFile) if len(config.HAproxy.BindIP) > 0 { @@ -87,7 +91,7 @@ func configureHAproxy(config Config) *haproxy.HAproxy { return proxy } -func configureDiscovery(config *Config, publishedIP string) discovery.Discoverer { +func configureDiscovery(config *config.Config, publishedIP string) discovery.Discoverer { disco := new(discovery.MultiDiscovery) var svcNamer discovery.ServiceNamer @@ -140,7 +144,7 @@ func configureDiscovery(config *Config, publishedIP string) discovery.Discoverer } // configureMetrics sets up remote performance metrics if we're asked to send them (statsd) -func configureMetrics(config *Config) { +func configureMetrics(config *config.Config) { if config.Sidecar.StatsAddr != "" { sink, err := metrics.NewStatsdSink(config.Sidecar.StatsAddr) exitWithError(err, "Can't configure 
Statsd") @@ -152,7 +156,7 @@ func configureMetrics(config *Config) { } // configureDelegate sets up the Memberlist delegate we'll use -func configureDelegate(state *catalog.ServicesState, config *Config) *servicesDelegate { +func configureDelegate(state *catalog.ServicesState, config *config.Config) *servicesDelegate { delegate := NewServicesDelegate(state) delegate.Metadata = NodeMetadata{ ClusterName: config.Sidecar.ClusterName, @@ -195,7 +199,7 @@ func configureCpuProfiler(opts *CliOpts) { } } -func configureLoggingLevel(config *Config) { +func configureLoggingLevel(config *config.Config) { level := config.Sidecar.LoggingLevel switch { @@ -213,7 +217,7 @@ func configureLoggingLevel(config *Config) { } // configureLoggingFormat switches between text and JSON log format -func configureLoggingFormat(config *Config) { +func configureLoggingFormat(config *config.Config) { if config.Sidecar.LoggingFormat == "json" { log.SetFormatter(&log.JSONFormatter{}) } else { @@ -222,7 +226,7 @@ func configureLoggingFormat(config *Config) { } } -func configureMemberlist(config *Config, state *catalog.ServicesState) *memberlist.Config { +func configureMemberlist(config *config.Config, state *catalog.ServicesState) *memberlist.Config { delegate := configureDelegate(state, config) // Use a LAN config but add our delegate @@ -258,7 +262,7 @@ func configureMemberlist(config *Config, state *catalog.ServicesState) *memberli } // configureListeners sets up any statically configured state change event listeners. 
-func configureListeners(config *Config, state *catalog.ServicesState) { +func configureListeners(config *config.Config, state *catalog.ServicesState) { for _, url := range config.Listeners.Urls { listener := catalog.NewUrlListener(url, false) listener.Watch(state) @@ -266,13 +270,13 @@ func configureListeners(config *Config, state *catalog.ServicesState) { } func main() { - config := parseConfig() + config := config.ParseConfig() opts := parseCommandLine() - configureOverrides(&config, opts) + configureOverrides(config, opts) configureCpuProfiler(opts) - configureLoggingLevel(&config) - configureLoggingFormat(&config) - configureMetrics(&config) + configureLoggingLevel(config) + configureLoggingFormat(config) + configureMetrics(config) // Create a new state instance and fire up the processor. We need // this to happen early in the startup. @@ -282,9 +286,9 @@ func main() { ) go state.ProcessServiceMsgs(svcMsgLooper) - configureListeners(&config, state) + configureListeners(config, state) - mlConfig := configureMemberlist(&config, state) + mlConfig := configureMemberlist(config, state) printer := rubberneck.NewPrinter(log.Infof, rubberneck.NoAddLineFeed) printer.PrintWithLabel("Sidecar", config) @@ -323,7 +327,7 @@ func main() { // Register the cluster name with the state object state.ClusterName = config.Sidecar.ClusterName - disco := configureDiscovery(&config, mlConfig.AdvertiseAddr) + disco := configureDiscovery(config, mlConfig.AdvertiseAddr) go disco.Run(discoLooper) // Configure the monitor and use the public address as the default @@ -372,5 +376,23 @@ func main() { exitWithError(err, "Failed to reload HAProxy config") } + if config.Envoy.UseGRPCAPI { + ctx := context.Background() + envoyServer := envoy.NewServer(ctx, state, config.Envoy) + envoyServerLooper := director.NewFreeLooper( + director.FOREVER, make(chan error), + ) + + // This listener will be owned and managed by the gRPC server + grpcListener, err := net.Listen("tcp", ":"+config.Envoy.GRPCPort) 
+ if err != nil { + log.Fatalf("Failed to listen on port %q: %s", config.Envoy.GRPCPort, err) + } + + go envoyServer.Run(ctx, envoyServerLooper, grpcListener) + + state.AddListener(envoyServer.Listener) + } + select {} } diff --git a/services_delegate.go b/services_delegate.go index 54320857..c54c6d84 100644 --- a/services_delegate.go +++ b/services_delegate.go @@ -171,11 +171,7 @@ func (d *servicesDelegate) NotifyJoin(node *memberlist.Node) { func (d *servicesDelegate) NotifyLeave(node *memberlist.Node) { log.Debugf("NotifyLeave(): %s", node.Name) - go func() { - d.state.Lock() - defer d.state.Unlock() - d.state.ExpireServer(node.Name) - }() + go d.state.ExpireServer(node.Name) } func (d *servicesDelegate) NotifyUpdate(node *memberlist.Node) { diff --git a/sidecarhttp/envoy_api.go b/sidecarhttp/envoy_api.go index 5f98a60a..22348a22 100644 --- a/sidecarhttp/envoy_api.go +++ b/sidecarhttp/envoy_api.go @@ -4,25 +4,18 @@ package sidecarhttp import ( "fmt" - "net" "net/http" _ "net/http/pprof" - "strconv" - "strings" "github.com/Nitro/memberlist" "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/envoy/adapter" "github.com/Nitro/sidecar/service" "github.com/gorilla/mux" "github.com/pquerna/ffjson/ffjson" log "github.com/sirupsen/logrus" ) -const ( - // Used to join service name and port. Must not occur in service names - ServiceNameSeparator = ":" -) - // This file implements the Envoy proxy V1 API on top of a Sidecar // service discovery cluster. 
@@ -130,7 +123,7 @@ func (s *EnvoyApi) registrationHandler(response http.ResponseWriter, req *http.R return } - svcName, svcPort, err := SvcNameSplit(name) + svcName, svcPort, err := adapter.SvcNameSplit(name) if err != nil { log.Debugf("Envoy Service '%s' not found in registrationHandler: %s", name, err) sendJsonError(response, 404, "Not Found - "+err.Error()) @@ -240,18 +233,6 @@ func (s *EnvoyApi) listenersHandler(response http.ResponseWriter, req *http.Requ } } -// lookupHost does a vv slow lookup of the DNS host for a service. Totally -// not optimized for high throughput. You should only do this in development -// scenarios. -func lookupHost(hostname string) (string, error) { - addrs, err := net.LookupHost(hostname) - - if err != nil { - return "", err - } - return addrs[0], nil -} - // EnvoyServiceFromService converts a Sidecar service to an Envoy // API service for reporting to the proxy func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) *EnvoyService { @@ -267,7 +248,7 @@ func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) // NOT recommended... this is very slow. Useful in dev modes where you // need to resolve to a different IP address only. 
if s.config.UseHostnames { - if host, err := lookupHost(svc.Hostname); err == nil { + if host, err := adapter.LookupHost(svc.Hostname); err == nil { address = host } else { log.Warnf("Unable to resolve %s, using IP address", svc.Hostname) @@ -279,7 +260,7 @@ func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) LastCheckIn: svc.Updated.String(), Port: port.Port, Revision: svc.Version(), - Service: SvcName(svc.Name, port.ServicePort), + Service: adapter.SvcName(svc.Name, port.ServicePort), ServiceRepoName: svc.Image, Tags: map[string]string{}, } @@ -321,11 +302,11 @@ func (s *EnvoyApi) EnvoyClustersFromState() []*EnvoyCluster { } clusters = append(clusters, &EnvoyCluster{ - Name: SvcName(svcName, port.ServicePort), + Name: adapter.SvcName(svcName, port.ServicePort), Type: "sds", // use Sidecar's SDS endpoint for the hosts ConnectTimeoutMs: 500, LBType: "round_robin", // TODO figure this out! - ServiceName: SvcName(svcName, port.ServicePort), + ServiceName: adapter.SvcName(svcName, port.ServicePort), }) } } @@ -336,7 +317,7 @@ func (s *EnvoyApi) EnvoyClustersFromState() []*EnvoyCluster { // EnvoyListenerFromService takes a Sidecar service and formats it into // the API format for an Envoy proxy listener (LDS API v1) func (s *EnvoyApi) EnvoyListenerFromService(svc *service.Service, port int64) *EnvoyListener { - apiName := SvcName(svc.Name, port) + apiName := adapter.SvcName(svc.Name, port) listener := &EnvoyListener{ Name: apiName, @@ -439,27 +420,6 @@ func (s *EnvoyApi) EnvoyListenersFromState() []*EnvoyListener { return listeners } -// Format an Envoy service name from our service name and port -func SvcName(name string, port int64) string { - return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port) -} - -// Split an Enovy service name into our service name and port -func SvcNameSplit(name string) (string, int64, error) { - parts := strings.Split(name, ServiceNameSeparator) - if len(parts) < 2 { - return "", -1, fmt.Errorf("%s", 
"Unable to split service name and port!") - } - - svcName := parts[0] - svcPort, err := strconv.ParseInt(parts[1], 10, 64) - if err != nil { - return "", -1, fmt.Errorf("%s", "Unable to parse port!") - } - - return svcName, svcPort, nil -} - // HttpMux returns a configured Gorilla mux to handle all the endpoints // for the Envoy API. func (s *EnvoyApi) HttpMux() http.Handler { From 17dddeb95f5a62e004e68f4bbaa3f55b4b69d8c3 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sat, 25 Jan 2020 22:06:45 +0000 Subject: [PATCH 2/7] Update Readme to add the new Envoy gRPC API (V2) --- README.md | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 2bd023b1..f831dfe5 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ or can also leverage: * [Lyft's Envoy Proxy](https://github.com/envoyproxy/envoy) - In less than a year it is fast becoming a core microservices architecture component. - Sidecar implements the Envoy proxy SDS, CDS, and LDS APIs (v1). These - allow a standalone Envoy to be entirely configured by Sidecar. This is - best used with Nitro's + Sidecar implements the Envoy proxy SDS, CDS, LDS (V1) and gRPC (V2) APIs. + These allow a standalone Envoy to be entirely configured by Sidecar. This + is best used with Nitro's [Envoy proxy container](https://hub.docker.com/r/gonitro/envoyproxy/tags/). * [haproxy-api](https://github.com/Nitro/haproxy-api) - A separation layer @@ -229,6 +229,12 @@ Defaults are in bold at the end of the line: * `HAPROXY_USE_HOSTNAMES`: Should we write hostnames in the HAproxy config instead of IP addresses? **`false`** + * `ENVOY_USE_GRPC_API`: Enable the Envoy gRPC API (V2) **`true`** + * `ENVOY_BIND_IP`: The IP that Envoy should bind to on the host **192.168.168.168** + * `ENVOY_USE_HOSTNAMES`: Should we write hostnames in the Envoy config instead + of IP addresses? 
**`false`** + * `ENVOY_GRPC_PORT`: The port for the Envoy API gRPC server **`7776`** + ### Ports @@ -424,7 +430,7 @@ The logging output is pretty good in the normal `info` level. It can be made quite verbose in `debug` mode, and contains lots of information about what's going on and what the current state is. The web interface also contains a lot of runtime information on the cluster and the services. If you are running -HAproxy, it's also recommneded that you expose the HAproxy stats port on 3212 +HAproxy, it's also recommended that you expose the HAproxy stats port on 3212 so that Sidecar can find it. Currently the web interface runs on port 7777 on each machine that runs @@ -457,11 +463,18 @@ Envoy Proxy Support ------------------- Envoy uses a very different model than HAproxy and thus Sidecar's support for -it is quite different from its support for HAproxy. Envoy makes requests to a -variety of discovery service APIs on a timed basis. Sidecar currently -implements three of these: the Cluster Discovery Service (CDS), the Service -Discovery Service (SDS), and the Listeners Discovery Service (LDS). Nitro -builds and supports [an Envoy +it is quite different from its support for HAproxy. + +When using the REST-based LDS API (V1), Envoy makes requests to a variety of +discovery service APIs on a timed basis. Sidecar currently implements three +of these: the Cluster Discovery Service (CDS), the Service Discovery Service +(SDS), and the Listeners Discovery Service (LDS). When using the gRPC V2 API, +Sidecar sends updates to Envoy as soon as possible via gRPC. + +Note that the LDS API (V1) has been deprecated by Envoy and it's recommended +to use the gRPC-based V2 API. + +Nitro builds and supports [an Envoy container](https://hub.docker.com/r/gonitro/envoyproxy/tags/) that is tested and works against Sidecar. This is the easiest way to run Envoy with Sidecar. 
You can find an example container configuration From 228aff19ccf18d8bfbaaefa4edb9674e690f89f6 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sun, 26 Jan 2020 13:08:14 +0000 Subject: [PATCH 3/7] Send clusters before sending listeners to Envoy Using two distinct transactions for sending the updated resources to Envoy ensures that we don't get an error back if one of the listeners contains a cluster that Envoy hasn't received yet. --- envoy/adapter/adapter.go | 4 --- envoy/server.go | 69 +++++++++++++++++++++++++++++----------- envoy/server_test.go | 27 ++++++++++++++++ 3 files changed, 77 insertions(+), 23 deletions(-) diff --git a/envoy/adapter/adapter.go b/envoy/adapter/adapter.go index 48571600..a9b315e1 100644 --- a/envoy/adapter/adapter.go +++ b/envoy/adapter/adapter.go @@ -106,8 +106,6 @@ func EnvoyListenersFromState(state *catalog.ServicesState, bindIP string) ([]cac } } - log.Debugf("Created %d Envoy listeners", len(listeners)) - return listeners, nil } @@ -260,8 +258,6 @@ func EnvoyClustersFromState(state *catalog.ServicesState, useHostnames bool) []c } } - log.Debugf("Created %d Envoy clusters", len(clusters)) - return clusters } diff --git a/envoy/server.go b/envoy/server.go index df60ddbc..76575b8d 100644 --- a/envoy/server.go +++ b/envoy/server.go @@ -45,6 +45,14 @@ type Server struct { xdsServer xds.Server } +// newSnapshotVersion returns a unique version for Envoy cache snapshots +func newSnapshotVersion() string { + // When triggering watches after a cache snapshot is set, the go-control-plane + // only sends resources which have a different version to Envoy. + // `time.Now().UnixNano()` should always return a unique number. 
+ return strconv.FormatInt(time.Now().UnixNano(), 10) +} + // Run sets up the Sidecar listener event loop and starts the Envoy gRPC server func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener net.Listener) { grpcServer := grpc.NewServer() @@ -66,34 +74,57 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n <-s.Listener.Chan() } + // The hostname needs to match the value passed via `--service-node` to Envoy + // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 + hostname := s.state.Hostname + + snapshotVersion := newSnapshotVersion() + + clusters := adapter.EnvoyClustersFromState(s.state, s.config.UseHostnames) + + // Set the new clusters in the current snapshot to send them along with the + previous listeners to Envoy. If we were to pass in the new listeners too, Envoy + will complain if it happens to receive the new listeners before the new clusters + because some of the listeners might be associated with clusters which don't + exist yet. 
+ // See the eventual consistency considerations in the documentation for details: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + snapshot, err := s.snapshotCache.GetSnapshot(hostname) + if err != nil { + // During the first iteration, there is no existing snapshot, so we create one + snapshot = cache.NewSnapshot(snapshotVersion, nil, clusters, nil, nil, nil) + } else { + snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, clusters) + } + + err = s.snapshotCache.SetSnapshot(hostname, snapshot) + if err != nil { + log.Errorf("Failed to set new Envoy cache snapshot: %s", err) + return nil + } + log.Infof("Sent %d clusters to Envoy with version %s", len(clusters), snapshotVersion) + listeners, err := adapter.EnvoyListenersFromState(s.state, s.config.BindIP) if err != nil { log.Errorf("Failed to create Envoy listeners: %s", err) return nil } - // We are using `time.Now().UnixNano()` to ensure that all versions we send to - // Envoy are unique. Otherwise, Envoy will skip the update. 
- version := strconv.FormatInt(time.Now().UnixNano(), 10) - err = s.snapshotCache.SetSnapshot( - // The hostname needs to match the value passed via `--service-node` to Envoy - // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 - s.state.Hostname, - cache.NewSnapshot( - version, - nil, - adapter.EnvoyClustersFromState(s.state, s.config.UseHostnames), - nil, - listeners, - nil, - ), - ) + // Create a new snapshot version and, finally, send the updated listeners to Envoy + snapshotVersion = newSnapshotVersion() + err = s.snapshotCache.SetSnapshot(hostname, cache.NewSnapshot( + snapshotVersion, + nil, + clusters, + nil, + listeners, + nil, + )) if err != nil { - log.Errorf("Failed to create new Envoy cache snapshot: %s", err) + log.Errorf("Failed to set new Envoy cache snapshot: %s", err) return nil } - - log.Infof("Envoy configuration updated to version %s", version) + log.Infof("Sent %d listeners to Envoy with version %s", len(listeners), snapshotVersion) return nil }) diff --git a/envoy/server_test.go b/envoy/server_test.go index fa143c89..50590df8 100644 --- a/envoy/server_test.go +++ b/envoy/server_test.go @@ -267,6 +267,7 @@ func Test_PortForServicePort(t *testing.T) { Convey("for a HTTP service", func() { state.AddServiceEntry(httpSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter envoyMock.ValidateResources(stream, httpSvc, state.Hostname) @@ -275,6 +276,7 @@ func Test_PortForServicePort(t *testing.T) { httpSvc.Updated.Add(1 * time.Millisecond) state.AddServiceEntry(httpSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter for resourceType := range validators { resources := envoyMock.GetResource(stream, resourceType, state.Hostname) @@ -285,6 +287,7 @@ func Test_PortForServicePort(t *testing.T) { Convey("and places another instance of the same service in the same cluster", func() { state.AddServiceEntry(anotherHTTPSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter resources := envoyMock.GetResource(stream, cache.ClusterType, 
state.Hostname) So(resources, ShouldHaveLength, 1) @@ -303,6 +306,7 @@ func Test_PortForServicePort(t *testing.T) { Convey("for a TCP service", func() { state.AddServiceEntry(tcpSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter envoyMock.ValidateResources(stream, tcpSvc, state.Hostname) }) @@ -311,6 +315,7 @@ func Test_PortForServicePort(t *testing.T) { httpSvc.Tombstone() state.AddServiceEntry(httpSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter for resourceType := range validators { resources := envoyMock.GetResource(stream, resourceType, state.Hostname) @@ -321,11 +326,13 @@ func Test_PortForServicePort(t *testing.T) { Convey("and triggers an update when expiring a server with only one service running", func(c C) { state.AddServiceEntry(httpSvc) <-snapshotCache.Waiter + <-snapshotCache.Waiter done := make(chan struct{}) go func() { select { case <-snapshotCache.Waiter: + <-snapshotCache.Waiter close(done) case <-time.After(100 * time.Millisecond): c.So(true, ShouldEqual, false) @@ -341,11 +348,31 @@ func Test_PortForServicePort(t *testing.T) { } }) + Convey("and sends an update with the new clusters", func() { + state.AddServiceEntry(httpSvc) + + // The snapshotCache.Waiter will block after the first cache snapshot + // containing the clusters is set + + clusters := envoyMock.GetResource(stream, cache.ClusterType, state.Hostname) + So(clusters, ShouldHaveLength, 1) + validateCluster(clusters[0], httpSvc) + + listeners := envoyMock.GetResource(stream, cache.ListenerType, state.Hostname) + So(listeners, ShouldHaveLength, 0) + + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, httpSvc, state.Hostname) + }) + // TODO: This test is flaky in the current implementation // Convey("and doesn't do spurious updates", func() { // state.AddServiceEntry(httpSvc) // state.AddServiceEntry(anotherHTTPSvc) // <-snapshotCache.Waiter + // <-snapshotCache.Waiter // updateCount := 0 // done := make(chan struct{}) From 
3f561315406ee682148aec6c973d84023d3db69d Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sun, 2 Feb 2020 21:05:45 +0000 Subject: [PATCH 4/7] Update the Envoy gRPC API via a timed looper instead of using the listener mechanism --- envoy/listener.go | 45 ---------------------------------- envoy/server.go | 47 ++++++++++++++++++----------------- envoy/server_test.go | 58 ++++++++++++++------------------------------ main.go | 6 ++--- 4 files changed, 45 insertions(+), 111 deletions(-) delete mode 100644 envoy/listener.go diff --git a/envoy/listener.go b/envoy/listener.go deleted file mode 100644 index cd83d37b..00000000 --- a/envoy/listener.go +++ /dev/null @@ -1,45 +0,0 @@ -package envoy - -import ( - "github.com/Nitro/sidecar/catalog" -) - -const ( - listenerEventBufferSize = 100 -) - -// Listener is an internal Sidecar listener that will be hooked up when -// config.Envoy.UseGRPCAPI is true. It only needs to know when the state -// changes, ignoring what the actual change was. -type Listener struct { - eventsChan chan catalog.ChangeEvent -} - -// Chan exposes the internal events channel -func (l *Listener) Chan() chan catalog.ChangeEvent { - return l.eventsChan -} - -// Name returns a unique name for this listener -func (l *Listener) Name() string { - // Be careful not to clash with names assigned automatically in - // service/service.go -> ListenerName() - return "internal-envoy" -} - -// Managed tells Sidecar that it shouldn't try to automatically remove -// this listener -func (l *Listener) Managed() bool { - return false -} - -// NewListener creates a new Listener instance -func NewListener() *Listener { - return &Listener{ - // See catalog/url_listener.go -> NewUrlListener() for a similar mechanism. - // We use a larger buffer here, because, unlike the URL listener, this is - // all processed internally in the same process, so we can buffer/flush - // more events faster. 
- eventsChan: make(chan catalog.ChangeEvent, listenerEventBufferSize), - } -} diff --git a/envoy/server.go b/envoy/server.go index 76575b8d..38d741ef 100644 --- a/envoy/server.go +++ b/envoy/server.go @@ -19,6 +19,11 @@ import ( "google.golang.org/grpc" ) +const ( + // LooperUpdateInterval indicates how often to check if the state has changed + LooperUpdateInterval = 1 * time.Second +) + type xdsCallbacks struct{} func (*xdsCallbacks) OnStreamOpen(context.Context, int64, string) error { return nil } @@ -38,7 +43,6 @@ func (*xdsCallbacks) OnFetchResponse(*api.DiscoveryRequest, *api.DiscoveryRespon // Server is a wrapper around Envoy's control plane xDS gRPC server and it uses // the Aggregated Discovery Service (ADS) mechanism. type Server struct { - Listener catalog.Listener config config.EnvoyConfig state *catalog.ServicesState snapshotCache cache.SnapshotCache @@ -53,30 +57,27 @@ func newSnapshotVersion() string { return strconv.FormatInt(time.Now().UnixNano(), 10) } -// Run sets up the Sidecar listener event loop and starts the Envoy gRPC server +// Run starts the Envoy update looper and the Envoy gRPC server func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener net.Listener) { - grpcServer := grpc.NewServer() - envoy_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s.xdsServer) - + // The local hostname needs to match the value passed via `--service-node` to Envoy + // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 + // This never changes, so we don't need to lock the state here + hostname := s.state.Hostname + + // prevStateLastChanged caches the state.LastChanged timestamp when we send an + // update to Envoy + prevStateLastChanged := time.Unix(0, 0) go looper.Loop(func() error { - // Block until we get an event indicating a state change. - // We discard the event since we need a snapshot of the entire state. 
- <-s.Listener.Chan() - - // When a server is expired in catalog/services_state.go -> ExpireServer(), - // the listener will receive an event for each expired service. We want to - // flush the channel to prevent rapid-fire updates to Envoy. - // This was inspired from receiver/receiver.go -> ProcessUpdates(). - // TODO: Think of a more aggressive / reliable way of draining since we - // used a larger value for listenerEventBufferSize. - pendingEventCount := len(s.Listener.Chan()) - for i := 0; i < pendingEventCount; i++ { - <-s.Listener.Chan() + s.state.RLock() + lastChanged := s.state.LastChanged + s.state.RUnlock() + + // Do nothing if the state hasn't changed + if lastChanged == prevStateLastChanged { + return nil } - // The hostname needs to match the value passed via `--service-node` to Envoy - // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 - hostname := s.state.Hostname + prevStateLastChanged = lastChanged snapshotVersion := newSnapshotVersion() @@ -129,6 +130,9 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n return nil }) + grpcServer := grpc.NewServer() + envoy_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s.xdsServer) + go func() { if err := grpcServer.Serve(grpcListener); err != nil { log.Fatalf("Failed to start Envoy gRPC server: %s", err) @@ -148,7 +152,6 @@ func NewServer(ctx context.Context, state *catalog.ServicesState, config config. 
snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) return &Server{ - Listener: NewListener(), config: config, state: state, snapshotCache: snapshotCache, diff --git a/envoy/server_test.go b/envoy/server_test.go index 50590df8..40e0d2a4 100644 --- a/envoy/server_test.go +++ b/envoy/server_test.go @@ -132,6 +132,7 @@ func (sv *EnvoyMock) GetResource(stream envoy_discovery.AggregatedDiscoveryServi // Recv() blocks until the stream ctx expires if the message sent via Send() is not recognised / valid response, err := stream.Recv() + So(err, ShouldBeNil) sv.nonces[resource] = response.GetNonce() @@ -151,10 +152,15 @@ func (sv *EnvoyMock) ValidateResources(stream envoy_discovery.AggregatedDiscover // us get a notification after calling SetSnapshot via the Waiter chan type SnapshotCache struct { cache.SnapshotCache - Waiter chan struct{} + Waiter chan struct{} + PreCallWaiter chan struct{} } func (c *SnapshotCache) SetSnapshot(node string, snapshot cache.Snapshot) error { + if c.PreCallWaiter != nil { + <-c.PreCallWaiter + } + err := c.SnapshotCache.SetSnapshot(node, snapshot) c.Waiter <- struct{}{} @@ -228,23 +234,21 @@ func Test_PortForServicePort(t *testing.T) { // the state until the Server gets a chance to set a new snapshot in the cache snapshotCache := NewSnapshotCache() server := &Server{ - Listener: NewListener(), config: config, state: state, snapshotCache: snapshotCache, xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}), } - // Hook up the Envoy server Sidecar listener into the state - state.AddListener(server.Listener) - // The gRPC listener will be assigned a random port and will be owned and managed // by the gRPC server lis, err := net.Listen("tcp", ":0") So(err, ShouldBeNil) So(lis.Addr(), ShouldHaveSameTypeAs, &net.TCPAddr{}) - go server.Run(ctx, director.NewFreeLooper(director.FOREVER, make(chan error)), lis) + // Using a FreeLooper instead would make it run too often, triggering spurious + // locking on the state, which can 
cause the tests to time out + go server.Run(ctx, director.NewTimedLooper(director.FOREVER, 10*time.Millisecond, make(chan error)), lis) Convey("sends the Envoy state via gRPC", func() { conn, err := grpc.DialContext(ctx, @@ -285,6 +289,8 @@ func Test_PortForServicePort(t *testing.T) { }) Convey("and places another instance of the same service in the same cluster", func() { + // Make sure this other service instance was more recently updated than httpSvc + anotherHTTPSvc.Updated = anotherHTTPSvc.Updated.Add(1 * time.Millisecond) state.AddServiceEntry(anotherHTTPSvc) <-snapshotCache.Waiter <-snapshotCache.Waiter @@ -349,10 +355,13 @@ func Test_PortForServicePort(t *testing.T) { }) Convey("and sends an update with the new clusters", func() { + // The snapshotCache.PreCallWaiter will block before the first cache snapshot + // containing the clusters is set + snapshotCache.PreCallWaiter = make(chan struct{}) state.AddServiceEntry(httpSvc) - // The snapshotCache.Waiter will block after the first cache snapshot - // containing the clusters is set + snapshotCache.PreCallWaiter <- struct{}{} + <-snapshotCache.Waiter clusters := envoyMock.GetResource(stream, cache.ClusterType, state.Hostname) So(clusters, ShouldHaveLength, 1) @@ -361,42 +370,11 @@ func Test_PortForServicePort(t *testing.T) { listeners := envoyMock.GetResource(stream, cache.ListenerType, state.Hostname) So(listeners, ShouldHaveLength, 0) - <-snapshotCache.Waiter + snapshotCache.PreCallWaiter <- struct{}{} <-snapshotCache.Waiter envoyMock.ValidateResources(stream, httpSvc, state.Hostname) }) - - // TODO: This test is flaky in the current implementation - // Convey("and doesn't do spurious updates", func() { - // state.AddServiceEntry(httpSvc) - // state.AddServiceEntry(anotherHTTPSvc) - // <-snapshotCache.Waiter - // <-snapshotCache.Waiter - - // updateCount := 0 - // done := make(chan struct{}) - // go func() { - // for { - // select { - // case <-snapshotCache.Waiter: - // updateCount++ - // case 
<-time.After(10 * time.Millisecond): - // done <- struct{}{} - // return - // } - // } - // }() - - // state.ExpireServer(dummyHostname) - // <-done - // So(updateCount, ShouldEqual, 1) - - // for resourceType := range validators { - // resources := envoyMock.GetResource(stream, resourceType, state.Hostname) - // So(resources, ShouldHaveLength, 0) - // } - // }) }) }) } diff --git a/main.go b/main.go index 75db93d8..e44d6142 100644 --- a/main.go +++ b/main.go @@ -379,8 +379,8 @@ func main() { if config.Envoy.UseGRPCAPI { ctx := context.Background() envoyServer := envoy.NewServer(ctx, state, config.Envoy) - envoyServerLooper := director.NewFreeLooper( - director.FOREVER, make(chan error), + envoyServerLooper := director.NewTimedLooper( + director.FOREVER, envoy.LooperUpdateInterval, make(chan error), ) // This listener will be owned and managed by the gRPC server @@ -390,8 +390,6 @@ func main() { } go envoyServer.Run(ctx, envoyServerLooper, grpcListener) - - state.AddListener(envoyServer.Listener) } select {} From 00ad6e2819a67e7d635a50d346ff9f5aeeaad5a4 Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sun, 2 Feb 2020 23:32:23 +0000 Subject: [PATCH 5/7] Speed up the Envoy adapter implementation Use state.EachService() instead of state.ByService() when populating the clusters and listeners for the Envoy gRPC API. This is much more efficient, since state.ByService() does a lot of unnecessary manipulation, including sorting. Also merge EnvoyClustersFromState and EnvoyListenersFromState into one function to avoid locking the state several times. 
--- envoy/adapter/adapter.go | 197 +++++++++++++++++---------------------- envoy/server.go | 24 ++--- 2 files changed, 93 insertions(+), 128 deletions(-) diff --git a/envoy/adapter/adapter.go b/envoy/adapter/adapter.go index a9b315e1..77173204 100644 --- a/envoy/adapter/adapter.go +++ b/envoy/adapter/adapter.go @@ -28,6 +28,12 @@ const ( ServiceNameSeparator = ":" ) +// EnvoyResources is a collection of Enovy API resource definitions +type EnvoyResources struct { + Clusters []cache.Resource + Listeners []cache.Resource +} + // SvcName formats an Envoy service name from our service name and port func SvcName(name string, port int64) string { return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port) @@ -61,57 +67,94 @@ func LookupHost(hostname string) (string, error) { return addrs[0], nil } -// EnvoyListenersFromState creates a set of Enovy API listener -// definitions from all the ServicePorts in the Sidecar state. -func EnvoyListenersFromState(state *catalog.ServicesState, bindIP string) ([]cache.Resource, error) { - var listeners []cache.Resource - - state.RLock() - defer state.RUnlock() - - svcs := state.ByService() - // Loop over all the services by service name - for _, endpoints := range svcs { - if len(endpoints) < 1 { - continue - } +// EnvoyResourcesFromState creates a set of Enovy API resource definitions from all +// the ServicePorts in the Sidecar state. The Sidecar state needs to be locked by the +// caller before calling this function. +func EnvoyResourcesFromState(state *catalog.ServicesState, bindIP string, + useHostnames bool) EnvoyResources { - var svc *service.Service - // Find the first alive service and use that as the definition. - // If none are alive, we won't open the port. 
- for _, endpoint := range endpoints { - if endpoint.IsAlive() { - svc = endpoint - break - } - } + clusterMap := make(map[string]*api.Cluster) + listenerMap := make(map[string]cache.Resource) - if svc == nil { - continue + state.EachService(func(hostname *string, id *string, svc *service.Service) { + if svc == nil || !svc.IsAlive() { + return } - // Loop over the ports and generate a named listener for - // each port. + // Loop over the ports and generate a named listener for each port for _, port := range svc.Ports { // Only listen on ServicePorts if port.ServicePort < 1 { continue } - listener, err := EnvoyListenerFromService(svc, port.ServicePort, bindIP) - if err != nil { - return nil, fmt.Errorf("failed to create listener from service: %s", err) + envoyServiceName := SvcName(svc.Name, port.ServicePort) + + if cluster, ok := clusterMap[envoyServiceName]; ok { + cluster.LoadAssignment.Endpoints[0].LbEndpoints = + append(cluster.LoadAssignment.Endpoints[0].LbEndpoints, + envoyServiceFromService(svc, port.ServicePort, useHostnames)...) + } else { + envoyCluster := &api.Cluster{ + Name: envoyServiceName, + ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms + ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only + ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL, + // Setting the endpoints here directly bypasses EDS, so we can + // avoid having to configure that as well + // Note that in `EnvoyClustersFromState()` for the REST API we only need + // the first non-nil alive endpoint instance to construct the cluster + // because, in that case, SDS (now EDS) fetches the actual endpoints in a + // separate call. + LoadAssignment: &api.ClusterLoadAssignment{ + ClusterName: envoyServiceName, + Endpoints: []*endpoint.LocalityLbEndpoints{{ + LbEndpoints: envoyServiceFromService(svc, port.ServicePort, useHostnames), + }}, + }, + // Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these. 
+ // See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106 + // CommonHttpProtocolOptions: &core.HttpProtocolOptions{ + // IdleTimeout: &duration.Duration{Seconds: 60}, + // MaxConnectionDuration: &duration.Duration{Seconds: 60}, + // }, + // If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`. + // Http2ProtocolOptions: &core.Http2ProtocolOptions{}, + } + + clusterMap[envoyServiceName] = envoyCluster + } + + if _, ok := listenerMap[envoyServiceName]; !ok { + listener, err := envoyListenerFromService(svc, envoyServiceName, port.ServicePort, bindIP) + if err != nil { + log.Errorf("Failed to create Envoy listener for service %q and port %d: %s", svc.Name, port.ServicePort, err) + continue + } + listenerMap[envoyServiceName] = listener } - listeners = append(listeners, listener) } + }) + + clusters := make([]cache.Resource, 0, len(clusterMap)) + for _, cluster := range clusterMap { + clusters = append(clusters, cluster) + } + + listeners := make([]cache.Resource, 0, len(listenerMap)) + for _, listener := range listenerMap { + listeners = append(listeners, listener) } - return listeners, nil + return EnvoyResources{ + Clusters: clusters, + Listeners: listeners, + } } -// EnvoyListenerFromService creates an Envoy listener from a service instance -func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) (cache.Resource, error) { - apiName := SvcName(svc.Name, port) +// envoyListenerFromService creates an Envoy listener from a service instance +func envoyListenerFromService(svc *service.Service, envoyServiceName string, + servicePort int64, bindIP string) (cache.Resource, error) { var connectionManagerName string var connectionManager proto.Message @@ -127,7 +170,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) ( RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{ RouteConfig: 
&api.RouteConfiguration{ VirtualHosts: []*route.VirtualHost{{ - Name: apiName, + Name: envoyServiceName, Domains: []string{"*"}, Routes: []*route.Route{{ Match: &route.RouteMatch{ @@ -138,7 +181,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) ( Action: &route.Route_Route{ Route: &route.RouteAction{ ClusterSpecifier: &route.RouteAction_Cluster{ - Cluster: apiName, + Cluster: envoyServiceName, }, Timeout: &duration.Duration{}, }, @@ -154,7 +197,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) ( connectionManager = &tcpp.TcpProxy{ StatPrefix: "ingress_tcp", ClusterSpecifier: &tcpp.TcpProxy_Cluster{ - Cluster: apiName, + Cluster: envoyServiceName, }, } default: @@ -173,7 +216,7 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) ( SocketAddress: &core.SocketAddress{ Address: bindIP, PortSpecifier: &core.SocketAddress_PortValue{ - PortValue: uint32(port), + PortValue: uint32(servicePort), }, }, }, @@ -189,80 +232,8 @@ func EnvoyListenerFromService(svc *service.Service, port int64, bindIP string) ( }, nil } -// EnvoyClustersFromState genenerates a list of Envoy clusters from the -// current Sidecar state -func EnvoyClustersFromState(state *catalog.ServicesState, useHostnames bool) []cache.Resource { - state.RLock() - defer state.RUnlock() - - // `s.state.ByService()` returns the list of service endpoints for each service. - // Since some services can expose multiple service ports, we need to create a - // separate cluster for each (service, servicePort) pair. If a service doesn't - // have any endpoints that are alive, we don't want to create a cluster for it. - // - // Note that in `EnvoyClustersFromState()` for the REST API we only need - // the first non-nil alive endpoint instance to construct the cluster - // because, in that case, SDS (now EDS) fetches the actual endpoints in a - // separate call. 
- var clusters []cache.Resource - clustersMap := make(map[string]*api.Cluster) - for svcName, svcEndpoints := range state.ByService() { - if len(svcEndpoints) < 1 { - continue - } - - for _, svcEndpoint := range svcEndpoints { - if svcEndpoint == nil || !svcEndpoint.IsAlive() { - continue - } - - for _, port := range svcEndpoint.Ports { - if port.ServicePort < 1 { - continue - } - - envoyServiceName := SvcName(svcName, port.ServicePort) - - if cluster, ok := clustersMap[envoyServiceName]; ok { - cluster.LoadAssignment.Endpoints[0].LbEndpoints = - append(cluster.LoadAssignment.Endpoints[0].LbEndpoints, - envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames)...) - } else { - envoyCluster := &api.Cluster{ - Name: envoyServiceName, - ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms - ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only - ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL, - // Setting the endpoints here directly bypasses EDS, so we can - // avoid having to configure that as well - LoadAssignment: &api.ClusterLoadAssignment{ - ClusterName: envoyServiceName, - Endpoints: []*endpoint.LocalityLbEndpoints{{ - LbEndpoints: envoyServiceFromService(svcEndpoint, port.ServicePort, useHostnames), - }}, - }, - // Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these. - // See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106 - // CommonHttpProtocolOptions: &core.HttpProtocolOptions{ - // IdleTimeout: &duration.Duration{Seconds: 60}, - // MaxConnectionDuration: &duration.Duration{Seconds: 60}, - // }, - // If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`. 
- // Http2ProtocolOptions: &core.Http2ProtocolOptions{}, - } - - clustersMap[envoyServiceName] = envoyCluster - clusters = append(clusters, envoyCluster) - } - } - } - } - - return clusters -} - -// envoyServiceFromService converts a Sidecar service to an Envoy -// API service for reporting to the proxy +// envoyServiceFromService converts a Sidecar service to an Envoy API service for +// reporting to the proxy func envoyServiceFromService(svc *service.Service, svcPort int64, useHostnames bool) []*endpoint.LbEndpoint { var endpoints []*endpoint.LbEndpoint for _, port := range svc.Ports { diff --git a/envoy/server.go b/envoy/server.go index 38d741ef..8e911b3f 100644 --- a/envoy/server.go +++ b/envoy/server.go @@ -70,19 +70,19 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n go looper.Loop(func() error { s.state.RLock() lastChanged := s.state.LastChanged - s.state.RUnlock() // Do nothing if the state hasn't changed if lastChanged == prevStateLastChanged { + s.state.RUnlock() return nil } + resources := adapter.EnvoyResourcesFromState(s.state, s.config.BindIP, s.config.UseHostnames) + s.state.RUnlock() prevStateLastChanged = lastChanged snapshotVersion := newSnapshotVersion() - clusters := adapter.EnvoyClustersFromState(s.state, s.config.UseHostnames) - // Set the new clusters in the current snapshot to send them along with the // previous listeners to Envoy. 
If we would pass in the new listeners too, Envoy // will complain if it happens to receive the new listeners before the new clusters @@ -93,9 +93,9 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n snapshot, err := s.snapshotCache.GetSnapshot(hostname) if err != nil { // During the first iteration, there is no existing snapshot, so we create one - snapshot = cache.NewSnapshot(snapshotVersion, nil, clusters, nil, nil, nil) + snapshot = cache.NewSnapshot(snapshotVersion, nil, resources.Clusters, nil, nil, nil) } else { - snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, clusters) + snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, resources.Clusters) } err = s.snapshotCache.SetSnapshot(hostname, snapshot) @@ -103,29 +103,23 @@ func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener n log.Errorf("Failed to set new Envoy cache snapshot: %s", err) return nil } - log.Infof("Sent %d clusters to Envoy with version %s", len(clusters), snapshotVersion) - - listeners, err := adapter.EnvoyListenersFromState(s.state, s.config.BindIP) - if err != nil { - log.Errorf("Failed to create Envoy listeners: %s", err) - return nil - } + log.Infof("Sent %d clusters to Envoy with version %s", len(resources.Clusters), snapshotVersion) // Create a new snapshot version and, finally, send the updated listeners to Envoy snapshotVersion = newSnapshotVersion() err = s.snapshotCache.SetSnapshot(hostname, cache.NewSnapshot( snapshotVersion, nil, - clusters, + resources.Clusters, nil, - listeners, + resources.Listeners, nil, )) if err != nil { log.Errorf("Failed to set new Envoy cache snapshot: %s", err) return nil } - log.Infof("Sent %d listeners to Envoy with version %s", len(listeners), snapshotVersion) + log.Infof("Sent %d listeners to Envoy with version %s", len(resources.Listeners), snapshotVersion) return nil }) From 86dfe99bb2356daa08bae27476fe3d66b39401c2 Mon Sep 17 00:00:00 2001 From: Mihai 
Todor Date: Tue, 28 Jan 2020 01:17:41 +0000 Subject: [PATCH 6/7] Update Travis go version to 1.13 Also remove deprecated sudo directive --- .travis.yml | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 06dc332d..66678474 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,8 @@ -language: go +os: linux +language: go go: - - 1.12.x - -sudo: required + - 1.13.x services: - docker @@ -12,7 +11,7 @@ env: - GO111MODULE=on before_install: - - sudo apt-get install -y nodejs + - nvm install node - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.1 script: From 50c2fb7f17607da0bc91c51607f82e0f9d687efb Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sat, 11 May 2019 23:15:27 +0100 Subject: [PATCH 7/7] Tombstone services before sending updates Services from expired servers need to be tombstoned before updating the state and informing listeners about them. --- catalog/services_state.go | 2 +- catalog/services_state_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/catalog/services_state.go b/catalog/services_state.go index 1ce92e0b..00a4a48a 100644 --- a/catalog/services_state.go +++ b/catalog/services_state.go @@ -175,8 +175,8 @@ func (state *ServicesState) ExpireServer(hostname string) { for _, svc := range state.Servers[hostname].Services { previousStatus := svc.Status - state.ServiceChanged(svc, previousStatus, svc.Updated) svc.Tombstone() + state.ServiceChanged(svc, previousStatus, svc.Updated) tombstones = append(tombstones, *svc) } diff --git a/catalog/services_state_test.go b/catalog/services_state_test.go index ac848f5a..d164edb2 100644 --- a/catalog/services_state_test.go +++ b/catalog/services_state_test.go @@ -670,6 +670,11 @@ func Test_ClusterMembershipManagement(t *testing.T) { state.AddServiceEntry(service1) state.AddServiceEntry(service2) + dummyListener := mockListener{ + events: make(chan 
ChangeEvent, len(state.Servers[hostname].Services)), + } + state.AddListener(&dummyListener) + go state.ExpireServer(hostname) expired := <-state.Broadcasts @@ -677,6 +682,13 @@ func Test_ClusterMembershipManagement(t *testing.T) { // Timestamps chagne when tombstoning, so regex match So(expired[0], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") So(expired[1], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") + + Convey("and sends the tombstones to any listener", func() { + for i := 0; i < len(state.Servers[hostname].Services); i++ { + changeEvent := <-dummyListener.Chan() + So(changeEvent.Service.Status, ShouldEqual, service.TOMBSTONE) + } + }) }) Convey("does not announce services for hosts with none", func() {