diff --git a/.travis.yml b/.travis.yml index 06dc332d..66678474 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,9 +1,8 @@ -language: go +os: linux +language: go go: - - 1.12.x - -sudo: required + - 1.13.x services: - docker @@ -12,7 +11,7 @@ env: - GO111MODULE=on before_install: - - sudo apt-get install -y nodejs + - nvm install node - curl -sfL https://install.goreleaser.com/github.com/golangci/golangci-lint.sh | sh -s -- -b $(go env GOPATH)/bin v1.23.1 script: diff --git a/README.md b/README.md index 2bd023b1..f831dfe5 100644 --- a/README.md +++ b/README.md @@ -21,9 +21,9 @@ or can also leverage: * [Lyft's Envoy Proxy](https://github.com/envoyproxy/envoy) - In less than a year it is fast becoming a core microservices architecture component. - Sidecar implements the Envoy proxy SDS, CDS, and LDS APIs (v1). These - allow a standalone Envoy to be entirely configured by Sidecar. This is - best used with Nitro's + Sidecar implements the Envoy proxy SDS, CDS, LDS (V1) and gRPC (V2) APIs. + These allow a standalone Envoy to be entirely configured by Sidecar. This + is best used with Nitro's [Envoy proxy container](https://hub.docker.com/r/gonitro/envoyproxy/tags/). * [haproxy-api](https://github.com/Nitro/haproxy-api) - A separation layer @@ -229,6 +229,12 @@ Defaults are in bold at the end of the line: * `HAPROXY_USE_HOSTNAMES`: Should we write hostnames in the HAproxy config instead of IP addresses? **`false`** + * `ENVOY_USE_GRPC_API`: Enable the Envoy gRPC API (V2) **`true`** + * `ENVOY_BIND_IP`: The IP that Envoy should bind to on the host **192.168.168.168** + * `ENVOY_USE_HOSTNAMES`: Should we write hostnames in the Envoy config instead + of IP addresses? **`false`** + * `ENVOY_GRPC_PORT`: The port for the Envoy API gRPC server **`7776`** + ### Ports @@ -424,7 +430,7 @@ The logging output is pretty good in the normal `info` level. It can be made quite verbose in `debug` mode, and contains lots of information about what's going on and what the current state is. 
The web interface also contains a lot of runtime information on the cluster and the services. If you are running -HAproxy, it's also recommneded that you expose the HAproxy stats port on 3212 +HAproxy, it's also recommended that you expose the HAproxy stats port on 3212 so that Sidecar can find it. Currently the web interface runs on port 7777 on each machine that runs @@ -457,11 +463,18 @@ Envoy Proxy Support ------------------- Envoy uses a very different model than HAproxy and thus Sidecar's support for -it is quite different from its support for HAproxy. Envoy makes requests to a -variety of discovery service APIs on a timed basis. Sidecar currently -implements three of these: the Cluster Discovery Service (CDS), the Service -Discovery Service (SDS), and the Listeners Discovery Service (LDS). Nitro -builds and supports [an Envoy +it is quite different from its support for HAproxy. + +When using the REST-based LDS API (V1), Envoy makes requests to a variety of +discovery service APIs on a timed basis. Sidecar currently implements three +of these: the Cluster Discovery Service (CDS), the Service Discovery Service +(SDS), and the Listeners Discovery Service (LDS). When using the gRPC V2 API, +Sidecar sends updates to Envoy as soon as possible via gRPC. + +Note that the LDS API (V1) has been deprecated by Envoy and it's recommended +to use the gRPC-based V2 API. + +Nitro builds and supports [an Envoy container](https://hub.docker.com/r/gonitro/envoyproxy/tags/) that is tested and works against Sidecar. This is the easiest way to run Envoy with Sidecar. You can find an example container configuration diff --git a/catalog/services_state.go b/catalog/services_state.go index db688418..00a4a48a 100644 --- a/catalog/services_state.go +++ b/catalog/services_state.go @@ -24,15 +24,16 @@ import ( // servers to Service lists and manages the lifecycle. 
const ( - TOMBSTONE_LIFESPAN = 3 * time.Hour // How long we keep tombstones around - TOMBSTONE_COUNT = 10 // Send tombstones at 1 per second 10 times - ALIVE_COUNT = 5 // Send new services at 1 per second 5 times - TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks - TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission - ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds - DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes - ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks - ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute + TOMBSTONE_LIFESPAN = 3 * time.Hour // How long we keep tombstones around + TOMBSTONE_COUNT = 10 // Send tombstones at 1 per second 10 times + ALIVE_COUNT = 5 // Send new services at 1 per second 5 times + TOMBSTONE_SLEEP_INTERVAL = 2 * time.Second // Sleep between local service checks + TOMBSTONE_RETRANSMIT = 1 * time.Second // Time between tombstone retranmission + ALIVE_LIFESPAN = 1*time.Minute + 20*time.Second // Down if not heard from in 80 seconds + DRAINING_LIFESPAN = 10 * time.Minute // Down if not heard from in 10 minutes + ALIVE_SLEEP_INTERVAL = 1 * time.Second // Sleep between local service checks + ALIVE_BROADCAST_INTERVAL = 1 * time.Minute // Broadcast Alive messages every minute + LISTENER_EVENT_BUFFER_SIZE = 20 // The number of events that can be buffered in the listener eventChannel ) // A ChangeEvent represents the time and hostname that was modified and signals a major @@ -147,6 +148,9 @@ func (state *ServicesState) HasServer(hostname string) bool { // A server has left the cluster, so tombstone all of its records func (state *ServicesState) ExpireServer(hostname string) { + state.Lock() + defer state.Unlock() + if !state.HasServer(hostname) || len(state.Servers[hostname].Services) == 0 { log.Infof("No records to expire for %s", hostname) return @@ 
-171,8 +175,8 @@ func (state *ServicesState) ExpireServer(hostname string) { for _, svc := range state.Servers[hostname].Services { previousStatus := svc.Status - state.ServiceChanged(svc, previousStatus, svc.Updated) svc.Tombstone() + state.ServiceChanged(svc, previousStatus, svc.Updated) tombstones = append(tombstones, *svc) } diff --git a/catalog/services_state_test.go b/catalog/services_state_test.go index ac848f5a..d164edb2 100644 --- a/catalog/services_state_test.go +++ b/catalog/services_state_test.go @@ -670,6 +670,11 @@ func Test_ClusterMembershipManagement(t *testing.T) { state.AddServiceEntry(service1) state.AddServiceEntry(service2) + dummyListener := mockListener{ + events: make(chan ChangeEvent, len(state.Servers[hostname].Services)), + } + state.AddListener(&dummyListener) + go state.ExpireServer(hostname) expired := <-state.Broadcasts @@ -677,6 +682,13 @@ func Test_ClusterMembershipManagement(t *testing.T) { // Timestamps chagne when tombstoning, so regex match So(expired[0], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") So(expired[1], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") + + Convey("and sends the tombstones to any listener", func() { + for i := 0; i < len(state.Servers[hostname].Services); i++ { + changeEvent := <-dummyListener.Chan() + So(changeEvent.Service.Status, ShouldEqual, service.TOMBSTONE) + } + }) }) Convey("does not announce services for hosts with none", func() { diff --git a/catalog/url_listener.go b/catalog/url_listener.go index da76058c..e20c7c48 100644 --- a/catalog/url_listener.go +++ b/catalog/url_listener.go @@ -71,7 +71,7 @@ func NewUrlListener(listenurl string, managed bool) *UrlListener { Url: listenurl, looper: director.NewFreeLooper(director.FOREVER, errorChan), Client: &http.Client{Timeout: ClientTimeout, Jar: cookieJar}, - eventChannel: make(chan ChangeEvent, 20), + eventChannel: make(chan ChangeEvent, LISTENER_EVENT_BUFFER_SIZE), Retries: DefaultRetries, managed: managed, name: "UrlListener(" + 
listenurl + ")", diff --git a/config.go b/config/config.go similarity index 84% rename from config.go rename to config/config.go index 2f4316e7..7c03e010 100644 --- a/config.go +++ b/config/config.go @@ -1,9 +1,10 @@ -package main +package config import ( "time" "github.com/kelseyhightower/envconfig" + log "github.com/sirupsen/logrus" "gopkg.in/relistan/rubberneck.v1" ) @@ -24,6 +25,13 @@ type HAproxyConfig struct { UseHostnames bool `envconfig:"USE_HOSTNAMES"` } +type EnvoyConfig struct { + UseGRPCAPI bool `envconfig:"USE_GRPC_API" default:"true"` + BindIP string `envconfig:"BIND_IP" default:"192.168.168.168"` + UseHostnames bool `envconfig:"USE_HOSTNAMES"` + GRPCPort string `envconfig:"GRPC_PORT" default:"7776"` +} + type ServicesConfig struct { NameMatch string `envconfig:"NAME_MATCH"` ServiceNamer string `envconfig:"NAMER" default:"docker_label"` @@ -59,10 +67,11 @@ type Config struct { StaticDiscovery StaticConfig // STATIC_ Services ServicesConfig // SERVICES_ HAproxy HAproxyConfig // HAPROXY_ + Envoy EnvoyConfig // ENVOY_ Listeners ListenerUrlsConfig // LISTENERS_ } -func parseConfig() Config { +func ParseConfig() *Config { var config Config errs := []error{ @@ -71,15 +80,16 @@ func parseConfig() Config { envconfig.Process("static", &config.StaticDiscovery), envconfig.Process("services", &config.Services), envconfig.Process("haproxy", &config.HAproxy), + envconfig.Process("envoy", &config.Envoy), envconfig.Process("listeners", &config.Listeners), } for _, err := range errs { if err != nil { rubberneck.Print(config) - exitWithError(err, "Can't parse environment config!") + log.Fatalf("Can't parse environment config: %s", err) } } - return config + return &config } diff --git a/envoy/adapter/adapter.go b/envoy/adapter/adapter.go new file mode 100644 index 00000000..77173204 --- /dev/null +++ b/envoy/adapter/adapter.go @@ -0,0 +1,274 @@ +package adapter + +import ( + "fmt" + "net" + "strconv" + "strings" + + "github.com/Nitro/sidecar/catalog" + 
"github.com/Nitro/sidecar/service" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + listener "github.com/envoyproxy/go-control-plane/envoy/api/v2/listener" + route "github.com/envoyproxy/go-control-plane/envoy/api/v2/route" + hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + tcpp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/gogo/protobuf/proto" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/duration" + log "github.com/sirupsen/logrus" +) + +const ( + // ServiceNameSeparator is used to join service name and port. Must not occur in service names. + ServiceNameSeparator = ":" +) + +// EnvoyResources is a collection of Enovy API resource definitions +type EnvoyResources struct { + Clusters []cache.Resource + Listeners []cache.Resource +} + +// SvcName formats an Envoy service name from our service name and port +func SvcName(name string, port int64) string { + return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port) +} + +// SvcNameSplit an Enovy service name into our service name and port +func SvcNameSplit(name string) (string, int64, error) { + parts := strings.Split(name, ServiceNameSeparator) + if len(parts) < 2 { + return "", -1, fmt.Errorf("%s", "Unable to split service name and port!") + } + + svcName := parts[0] + svcPort, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return "", -1, fmt.Errorf("%s", "Unable to parse port!") + } + + return svcName, svcPort, nil +} + +// LookupHost does a vv slow lookup of the DNS host for a service. Totally +// not optimized for high throughput. You should only do this in development +// scenarios. 
+func LookupHost(hostname string) (string, error) { + addrs, err := net.LookupHost(hostname) + + if err != nil { + return "", err + } + return addrs[0], nil +} + +// EnvoyResourcesFromState creates a set of Enovy API resource definitions from all +// the ServicePorts in the Sidecar state. The Sidecar state needs to be locked by the +// caller before calling this function. +func EnvoyResourcesFromState(state *catalog.ServicesState, bindIP string, + useHostnames bool) EnvoyResources { + + clusterMap := make(map[string]*api.Cluster) + listenerMap := make(map[string]cache.Resource) + + state.EachService(func(hostname *string, id *string, svc *service.Service) { + if svc == nil || !svc.IsAlive() { + return + } + + // Loop over the ports and generate a named listener for each port + for _, port := range svc.Ports { + // Only listen on ServicePorts + if port.ServicePort < 1 { + continue + } + + envoyServiceName := SvcName(svc.Name, port.ServicePort) + + if cluster, ok := clusterMap[envoyServiceName]; ok { + cluster.LoadAssignment.Endpoints[0].LbEndpoints = + append(cluster.LoadAssignment.Endpoints[0].LbEndpoints, + envoyServiceFromService(svc, port.ServicePort, useHostnames)...) + } else { + envoyCluster := &api.Cluster{ + Name: envoyServiceName, + ConnectTimeout: &duration.Duration{Nanos: 500000000}, // 500ms + ClusterDiscoveryType: &api.Cluster_Type{Type: api.Cluster_STATIC}, // Use IPs only + ProtocolSelection: api.Cluster_USE_CONFIGURED_PROTOCOL, + // Setting the endpoints here directly bypasses EDS, so we can + // avoid having to configure that as well + // Note that in `EnvoyClustersFromState()` for the REST API we only need + // the first non-nil alive endpoint instance to construct the cluster + // because, in that case, SDS (now EDS) fetches the actual endpoints in a + // separate call. 
+ LoadAssignment: &api.ClusterLoadAssignment{ + ClusterName: envoyServiceName, + Endpoints: []*endpoint.LocalityLbEndpoints{{ + LbEndpoints: envoyServiceFromService(svc, port.ServicePort, useHostnames), + }}, + }, + // Contour believes the IdleTimeout should be set to 60s. Not sure if we also need to enable these. + // See here: https://github.com/projectcontour/contour/blob/2858fec20d26f56cc75a19d91b61d625a86f36de/internal/envoy/listener.go#L102-L106 + // CommonHttpProtocolOptions: &core.HttpProtocolOptions{ + // IdleTimeout: &duration.Duration{Seconds: 60}, + // MaxConnectionDuration: &duration.Duration{Seconds: 60}, + // }, + // If this needs to be enabled, we might also need to set `ProtocolSelection: api.USE_DOWNSTREAM_PROTOCOL`. + // Http2ProtocolOptions: &core.Http2ProtocolOptions{}, + } + + clusterMap[envoyServiceName] = envoyCluster + } + + if _, ok := listenerMap[envoyServiceName]; !ok { + listener, err := envoyListenerFromService(svc, envoyServiceName, port.ServicePort, bindIP) + if err != nil { + log.Errorf("Failed to create Envoy listener for service %q and port %d: %s", svc.Name, port.ServicePort, err) + continue + } + listenerMap[envoyServiceName] = listener + } + } + }) + + clusters := make([]cache.Resource, 0, len(clusterMap)) + for _, cluster := range clusterMap { + clusters = append(clusters, cluster) + } + + listeners := make([]cache.Resource, 0, len(listenerMap)) + for _, listener := range listenerMap { + listeners = append(listeners, listener) + } + + return EnvoyResources{ + Clusters: clusters, + Listeners: listeners, + } +} + +// envoyListenerFromService creates an Envoy listener from a service instance +func envoyListenerFromService(svc *service.Service, envoyServiceName string, + servicePort int64, bindIP string) (cache.Resource, error) { + + var connectionManagerName string + var connectionManager proto.Message + switch svc.ProxyMode { + case "http": + connectionManagerName = wellknown.HTTPConnectionManager + + connectionManager = 
&hcm.HttpConnectionManager{ + StatPrefix: "ingress_http", + HttpFilters: []*hcm.HttpFilter{{ + Name: wellknown.Router, + }}, + RouteSpecifier: &hcm.HttpConnectionManager_RouteConfig{ + RouteConfig: &api.RouteConfiguration{ + VirtualHosts: []*route.VirtualHost{{ + Name: envoyServiceName, + Domains: []string{"*"}, + Routes: []*route.Route{{ + Match: &route.RouteMatch{ + PathSpecifier: &route.RouteMatch_Prefix{ + Prefix: "/", + }, + }, + Action: &route.Route_Route{ + Route: &route.RouteAction{ + ClusterSpecifier: &route.RouteAction_Cluster{ + Cluster: envoyServiceName, + }, + Timeout: &duration.Duration{}, + }, + }, + }}, + }}, + }, + }, + } + case "tcp": + connectionManagerName = wellknown.TCPProxy + + connectionManager = &tcpp.TcpProxy{ + StatPrefix: "ingress_tcp", + ClusterSpecifier: &tcpp.TcpProxy_Cluster{ + Cluster: envoyServiceName, + }, + } + default: + return nil, fmt.Errorf("unrecognised proxy mode: %s", svc.ProxyMode) + } + + serialisedConnectionManager, err := ptypes.MarshalAny(connectionManager) + if err != nil { + return nil, fmt.Errorf("failed to create the connection manager: %s", err) + } + + return &api.Listener{ + Name: svc.Name, + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: bindIP, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(servicePort), + }, + }, + }, + }, + FilterChains: []*listener.FilterChain{{ + Filters: []*listener.Filter{{ + Name: connectionManagerName, + ConfigType: &listener.Filter_TypedConfig{ + TypedConfig: serialisedConnectionManager, + }, + }}, + }}, + }, nil +} + +// envoyServiceFromService converts a Sidecar service to an Envoy API service for +// reporting to the proxy +func envoyServiceFromService(svc *service.Service, svcPort int64, useHostnames bool) []*endpoint.LbEndpoint { + var endpoints []*endpoint.LbEndpoint + for _, port := range svc.Ports { + // No sense worrying about unexposed ports + if port.ServicePort == svcPort { + address := 
port.IP + + // NOT recommended... this is very slow. Useful in dev modes where you + // need to resolve to a different IP address only. + if useHostnames { + if host, err := LookupHost(svc.Hostname); err == nil { + address = host + } else { + log.Warnf("Unable to resolve %s, using IP address", svc.Hostname) + } + } + + endpoints = append(endpoints, &endpoint.LbEndpoint{ + HostIdentifier: &endpoint.LbEndpoint_Endpoint{ + Endpoint: &endpoint.Endpoint{ + Address: &core.Address{ + Address: &core.Address_SocketAddress{ + SocketAddress: &core.SocketAddress{ + Address: address, + PortSpecifier: &core.SocketAddress_PortValue{ + PortValue: uint32(port.Port), + }, + }, + }, + }, + }, + }, + }) + } + } + + return endpoints +} diff --git a/envoy/server.go b/envoy/server.go new file mode 100644 index 00000000..8e911b3f --- /dev/null +++ b/envoy/server.go @@ -0,0 +1,154 @@ +package envoy + +import ( + "context" + "net" + "strconv" + "strings" + "time" + + "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" + "github.com/Nitro/sidecar/envoy/adapter" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + xds "github.com/envoyproxy/go-control-plane/pkg/server" + "github.com/relistan/go-director" + log "github.com/sirupsen/logrus" + "google.golang.org/grpc" +) + +const ( + // LooperUpdateInterval indicates how often to check if the state has changed + LooperUpdateInterval = 1 * time.Second +) + +type xdsCallbacks struct{} + +func (*xdsCallbacks) OnStreamOpen(context.Context, int64, string) error { return nil } +func (*xdsCallbacks) OnStreamClosed(int64) {} +func (*xdsCallbacks) OnStreamRequest(int64, *api.DiscoveryRequest) error { return nil } +func (*xdsCallbacks) OnStreamResponse(_ int64, req *api.DiscoveryRequest, _ *api.DiscoveryResponse) { + if req.GetErrorDetail().GetCode() != 0 { + log.Errorf("Received Envoy error 
code %d: %s", + req.GetErrorDetail().GetCode(), + strings.ReplaceAll(req.GetErrorDetail().GetMessage(), "\n", ""), + ) + } +} +func (*xdsCallbacks) OnFetchRequest(context.Context, *api.DiscoveryRequest) error { return nil } +func (*xdsCallbacks) OnFetchResponse(*api.DiscoveryRequest, *api.DiscoveryResponse) {} + +// Server is a wrapper around Envoy's control plane xDS gRPC server and it uses +// the Aggregated Discovery Service (ADS) mechanism. +type Server struct { + config config.EnvoyConfig + state *catalog.ServicesState + snapshotCache cache.SnapshotCache + xdsServer xds.Server +} + +// newSnapshotVersion returns a unique version for Envoy cache snapshots +func newSnapshotVersion() string { + // When triggering watches after a cache snapshot is set, the go-control-plane + // only sends resources which have a different version to Envoy. + // `time.Now().UnixNano()` should always return a unique number. + return strconv.FormatInt(time.Now().UnixNano(), 10) +} + +// Run starts the Envoy update looper and the Envoy gRPC server +func (s *Server) Run(ctx context.Context, looper director.Looper, grpcListener net.Listener) { + // The local hostname needs to match the value passed via `--service-node` to Envoy + // See https://github.com/envoyproxy/envoy/issues/144#issuecomment-267401271 + // This never changes, so we don't need to lock the state here + hostname := s.state.Hostname + + // prevStateLastChanged caches the state.LastChanged timestamp when we send an + // update to Envoy + prevStateLastChanged := time.Unix(0, 0) + go looper.Loop(func() error { + s.state.RLock() + lastChanged := s.state.LastChanged + + // Do nothing if the state hasn't changed + if lastChanged == prevStateLastChanged { + s.state.RUnlock() + return nil + } + resources := adapter.EnvoyResourcesFromState(s.state, s.config.BindIP, s.config.UseHostnames) + s.state.RUnlock() + + prevStateLastChanged = lastChanged + + snapshotVersion := newSnapshotVersion() + + // Set the new clusters in the 
current snapshot to send them along with the + // previous listeners to Envoy. If we would pass in the new listeners too, Envoy + // will complain if it happens to receive the new listeners before the new clusters + // because some of the listeners might be associated with clusters which don't + // exit yet. + // See the eventual consistency considerations in the documentation for details: + // https://www.envoyproxy.io/docs/envoy/latest/api-docs/xds_protocol#eventual-consistency-considerations + snapshot, err := s.snapshotCache.GetSnapshot(hostname) + if err != nil { + // During the first iteration, there is no existing snapshot, so we create one + snapshot = cache.NewSnapshot(snapshotVersion, nil, resources.Clusters, nil, nil, nil) + } else { + snapshot.Resources[cache.Cluster] = cache.NewResources(snapshotVersion, resources.Clusters) + } + + err = s.snapshotCache.SetSnapshot(hostname, snapshot) + if err != nil { + log.Errorf("Failed to set new Envoy cache snapshot: %s", err) + return nil + } + log.Infof("Sent %d clusters to Envoy with version %s", len(resources.Clusters), snapshotVersion) + + // Create a new snapshot version and, finally, send the updated listeners to Envoy + snapshotVersion = newSnapshotVersion() + err = s.snapshotCache.SetSnapshot(hostname, cache.NewSnapshot( + snapshotVersion, + nil, + resources.Clusters, + nil, + resources.Listeners, + nil, + )) + if err != nil { + log.Errorf("Failed to set new Envoy cache snapshot: %s", err) + return nil + } + log.Infof("Sent %d listeners to Envoy with version %s", len(resources.Listeners), snapshotVersion) + + return nil + }) + + grpcServer := grpc.NewServer() + envoy_discovery.RegisterAggregatedDiscoveryServiceServer(grpcServer, s.xdsServer) + + go func() { + if err := grpcServer.Serve(grpcListener); err != nil { + log.Fatalf("Failed to start Envoy gRPC server: %s", err) + } + }() + + // Currently, this will block forever + <-ctx.Done() + grpcServer.GracefulStop() +} + +// NewServer creates a new Server 
instance +func NewServer(ctx context.Context, state *catalog.ServicesState, config config.EnvoyConfig) *Server { + // Instruct the snapshot cache to use Aggregated Discovery Service (ADS) + // The third parameter can contain a logger instance, but I didn't find + // those logs particularly useful. + snapshotCache := cache.NewSnapshotCache(true, cache.IDHash{}, nil) + + return &Server{ + config: config, + state: state, + snapshotCache: snapshotCache, + xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}), + } +} diff --git a/envoy/server_test.go b/envoy/server_test.go new file mode 100644 index 00000000..40e0d2a4 --- /dev/null +++ b/envoy/server_test.go @@ -0,0 +1,380 @@ +package envoy + +import ( + "context" + "fmt" + "io" + "net" + "sort" + "testing" + "time" + + "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" + "github.com/Nitro/sidecar/envoy/adapter" + "github.com/Nitro/sidecar/service" + api "github.com/envoyproxy/go-control-plane/envoy/api/v2" + core "github.com/envoyproxy/go-control-plane/envoy/api/v2/core" + endpoint "github.com/envoyproxy/go-control-plane/envoy/api/v2/endpoint" + hcm "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/http_connection_manager/v2" + tcpp "github.com/envoyproxy/go-control-plane/envoy/config/filter/network/tcp_proxy/v2" + envoy_discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v2" + "github.com/envoyproxy/go-control-plane/pkg/cache" + xds "github.com/envoyproxy/go-control-plane/pkg/server" + "github.com/envoyproxy/go-control-plane/pkg/wellknown" + "github.com/golang/protobuf/ptypes" + "github.com/golang/protobuf/ptypes/any" + "github.com/relistan/go-director" + . 
"github.com/smartystreets/goconvey/convey" + "google.golang.org/grpc" +) + +const ( + bindIP = "192.168.168.168" +) + +var ( + validators = map[string]func(*any.Any, service.Service){ + cache.ListenerType: validateListener, + cache.ClusterType: validateCluster, + } +) + +func validateListener(serialisedListener *any.Any, svc service.Service) { + listener := &api.Listener{} + err := ptypes.UnmarshalAny(serialisedListener, listener) + So(err, ShouldBeNil) + So(listener.Name, ShouldEqual, svc.Name) + So(listener.GetAddress().GetSocketAddress().GetAddress(), ShouldEqual, bindIP) + So(listener.GetAddress().GetSocketAddress().GetPortValue(), ShouldEqual, svc.Ports[0].ServicePort) + filterChains := listener.GetFilterChains() + So(filterChains, ShouldHaveLength, 1) + filters := filterChains[0].GetFilters() + So(filters, ShouldHaveLength, 1) + + if svc.ProxyMode == "http" { + So(filters[0].GetName(), ShouldEqual, wellknown.HTTPConnectionManager) + connectionManager := &hcm.HttpConnectionManager{} + err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager) + So(err, ShouldBeNil) + So(connectionManager.GetStatPrefix(), ShouldEqual, "ingress_http") + So(connectionManager.GetRouteConfig(), ShouldNotBeNil) + So(connectionManager.GetRouteConfig().GetVirtualHosts(), ShouldHaveLength, 1) + virtualHost := connectionManager.GetRouteConfig().GetVirtualHosts()[0] + So(virtualHost.GetName(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(virtualHost.GetRoutes(), ShouldHaveLength, 1) + route := virtualHost.GetRoutes()[0].GetRoute() + So(route, ShouldNotBeNil) + So(route.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(route.GetTimeout(), ShouldNotBeNil) + } else { // tcp + So(filters[0].GetName(), ShouldEqual, wellknown.TCPProxy) + connectionManager := &tcpp.TcpProxy{} + err = ptypes.UnmarshalAny(filters[0].GetTypedConfig(), connectionManager) + So(err, ShouldBeNil) + So(connectionManager.GetStatPrefix(), 
ShouldEqual, "ingress_tcp") + So(connectionManager.GetCluster(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + } +} + +func extractClusterEndpoints(serialisedCluster *any.Any, svc service.Service) []*endpoint.LbEndpoint { + cluster := &api.Cluster{} + err := ptypes.UnmarshalAny(serialisedCluster, cluster) + So(err, ShouldBeNil) + So(cluster.Name, ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + So(cluster.GetConnectTimeout().GetNanos(), ShouldEqual, 500000000) + loadAssignment := cluster.GetLoadAssignment() + So(loadAssignment, ShouldNotBeNil) + So(loadAssignment.GetClusterName(), ShouldEqual, adapter.SvcName(svc.Name, svc.Ports[0].ServicePort)) + localityEndpoints := loadAssignment.GetEndpoints() + So(localityEndpoints, ShouldHaveLength, 1) + + return localityEndpoints[0].GetLbEndpoints() +} + +func validateCluster(serialisedCluster *any.Any, svc service.Service) { + endpoints := extractClusterEndpoints(serialisedCluster, svc) + So(endpoints, ShouldHaveLength, 1) + So(endpoints[0].GetEndpoint().GetAddress().GetSocketAddress().GetAddress(), ShouldEqual, svc.Ports[0].IP) + So(endpoints[0].GetEndpoint().GetAddress().GetSocketAddress().GetPortValue(), ShouldEqual, svc.Ports[0].Port) +} + +// EnvoyMock is used to validate the Envoy state by making the same gRPC stream calls +// to the Server as Envoy would +type EnvoyMock struct { + nonces map[string]string +} + +func NewEnvoyMock() EnvoyMock { + return EnvoyMock{ + nonces: make(map[string]string), + } +} + +func (sv *EnvoyMock) GetResource(stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, resource string, hostname string) []*any.Any { + nonce, ok := sv.nonces[resource] + if !ok { + // Set the initial nonce to 0 for each resource type. The control plane will increment + // it after each call, so we need to pass back the value we last received. 
+ nonce = "0" + } + err := stream.Send(&api.DiscoveryRequest{ + VersionInfo: "1", + Node: &core.Node{ + Id: hostname, + }, + TypeUrl: resource, + ResponseNonce: nonce, + }) + if err != nil && err != io.EOF { + So(err, ShouldBeNil) + } + + // Recv() blocks until the stream ctx expires if the message sent via Send() is not recognised / valid + response, err := stream.Recv() + + So(err, ShouldBeNil) + + sv.nonces[resource] = response.GetNonce() + + return response.Resources +} + +func (sv *EnvoyMock) ValidateResources(stream envoy_discovery.AggregatedDiscoveryService_StreamAggregatedResourcesClient, svc service.Service, hostname string) { + for resourceType, validator := range validators { + resources := sv.GetResource(stream, resourceType, hostname) + So(resources, ShouldHaveLength, 1) + validator(resources[0], svc) + } +} + +// SnapshotCache is a light wrapper around cache.SnapshotCache which lets +// us get a notification after calling SetSnapshot via the Waiter chan +type SnapshotCache struct { + cache.SnapshotCache + Waiter chan struct{} + PreCallWaiter chan struct{} +} + +func (c *SnapshotCache) SetSnapshot(node string, snapshot cache.Snapshot) error { + if c.PreCallWaiter != nil { + <-c.PreCallWaiter + } + + err := c.SnapshotCache.SetSnapshot(node, snapshot) + + c.Waiter <- struct{}{} + + return err +} + +func NewSnapshotCache() *SnapshotCache { + return &SnapshotCache{ + SnapshotCache: cache.NewSnapshotCache(true, cache.IDHash{}, nil), + Waiter: make(chan struct{}), + } +} + +func Test_PortForServicePort(t *testing.T) { + Convey("Run()", t, func() { + config := config.EnvoyConfig{ + UseGRPCAPI: true, + BindIP: bindIP, + } + + state := catalog.NewServicesState() + + dummyHostname := "carcasone" + baseTime := time.Now().UTC() + httpSvc := service.Service{ + ID: "deadbeef123", + Name: "bocaccio", + Created: baseTime, + Hostname: dummyHostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "http", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 
9990, ServicePort: 10100}, + }, + } + + anotherHTTPSvc := service.Service{ + ID: "deadbeef456", + Name: "bocaccio", + Created: baseTime, + Hostname: dummyHostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "http", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 9991, ServicePort: 10100}, + }, + } + + tcpSvc := service.Service{ + ID: "undeadbeef", + Name: "tolstoy", + Created: baseTime, + Hostname: state.Hostname, + Updated: baseTime, + Status: service.ALIVE, + ProxyMode: "tcp", + Ports: []service.Port{ + {IP: "127.0.0.1", Port: 666, ServicePort: 10101}, + }, + } + + ctx, cancel := context.WithCancel(context.Background()) + Reset(func() { + cancel() + }) + + // Use a custom SnapshotCache in the xdsServer so we can block after updating + // the state until the Server gets a chance to set a new snapshot in the cache + snapshotCache := NewSnapshotCache() + server := &Server{ + config: config, + state: state, + snapshotCache: snapshotCache, + xdsServer: xds.NewServer(ctx, snapshotCache, &xdsCallbacks{}), + } + + // The gRPC listener will be assigned a random port and will be owned and managed + // by the gRPC server + lis, err := net.Listen("tcp", ":0") + So(err, ShouldBeNil) + So(lis.Addr(), ShouldHaveSameTypeAs, &net.TCPAddr{}) + + // Using a FreeLooper instead would make it run too often, triggering spurious + // locking on the state, which can cause the tests to time out + go server.Run(ctx, director.NewTimedLooper(director.FOREVER, 10*time.Millisecond, make(chan error)), lis) + + Convey("sends the Envoy state via gRPC", func() { + conn, err := grpc.DialContext(ctx, + fmt.Sprintf(":%d", lis.Addr().(*net.TCPAddr).Port), + grpc.WithInsecure(), grpc.WithBlock(), + ) + So(err, ShouldBeNil) + + // 100 milliseconds should give us enough time to run hundreds of server transactions + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + Reset(func() { + cancel() + }) + + stream, err := 
envoy_discovery.NewAggregatedDiscoveryServiceClient(conn).StreamAggregatedResources(ctx) + So(err, ShouldBeNil) + + envoyMock := NewEnvoyMock() + + Convey("for a HTTP service", func() { + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, httpSvc, state.Hostname) + + Convey("and removes it after it gets tombstoned", func() { + httpSvc.Tombstone() + // time.Time.Add returns a new value; assign it back so the bump takes effect + httpSvc.Updated = httpSvc.Updated.Add(1 * time.Millisecond) + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + for resourceType := range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, ShouldHaveLength, 0) + } + }) + + Convey("and places another instance of the same service in the same cluster", func() { + // Make sure this other service instance was more recently updated than httpSvc + anotherHTTPSvc.Updated = anotherHTTPSvc.Updated.Add(1 * time.Millisecond) + state.AddServiceEntry(anotherHTTPSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + resources := envoyMock.GetResource(stream, cache.ClusterType, state.Hostname) + So(resources, ShouldHaveLength, 1) + endpoints := extractClusterEndpoints(resources[0], httpSvc) + So(endpoints, ShouldHaveLength, 2) + var ports sort.IntSlice + for _, endpoint := range endpoints { + ports = append(ports, + int(endpoint.GetEndpoint().GetAddress().GetSocketAddress().GetPortValue())) + } + ports.Sort() + So(ports, ShouldResemble, sort.IntSlice{9990, 9991}) + }) + }) + + Convey("for a TCP service", func() { + state.AddServiceEntry(tcpSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, tcpSvc, state.Hostname) + }) + + Convey("and skips tombstones", func() { + httpSvc.Tombstone() + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + for resourceType := range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, 
ShouldHaveLength, 0) + } + }) + + Convey("and triggers an update when expiring a server with only one service running", func(c C) { + state.AddServiceEntry(httpSvc) + <-snapshotCache.Waiter + <-snapshotCache.Waiter + + done := make(chan struct{}) + go func() { + select { + case <-snapshotCache.Waiter: + <-snapshotCache.Waiter + close(done) + case <-time.After(100 * time.Millisecond): + c.So(true, ShouldEqual, false) + } + }() + + state.ExpireServer(dummyHostname) + <-done + + for resourceType := range validators { + resources := envoyMock.GetResource(stream, resourceType, state.Hostname) + So(resources, ShouldHaveLength, 0) + } + }) + + Convey("and sends an update with the new clusters", func() { + // The snapshotCache.PreCallWaiter will block before the first cache snapshot + // containing the clusters is set + snapshotCache.PreCallWaiter = make(chan struct{}) + state.AddServiceEntry(httpSvc) + + snapshotCache.PreCallWaiter <- struct{}{} + <-snapshotCache.Waiter + + clusters := envoyMock.GetResource(stream, cache.ClusterType, state.Hostname) + So(clusters, ShouldHaveLength, 1) + validateCluster(clusters[0], httpSvc) + + listeners := envoyMock.GetResource(stream, cache.ListenerType, state.Hostname) + So(listeners, ShouldHaveLength, 0) + + snapshotCache.PreCallWaiter <- struct{}{} + <-snapshotCache.Waiter + + envoyMock.ValidateResources(stream, httpSvc, state.Hostname) + }) + }) + }) +} diff --git a/go.mod b/go.mod index e23a75bc..804ca4c6 100644 --- a/go.mod +++ b/go.mod @@ -11,9 +11,10 @@ require ( github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf // indirect github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 // indirect + github.com/envoyproxy/go-control-plane v0.9.2 github.com/fsouza/go-dockerclient v1.3.1 - github.com/gogo/protobuf v1.2.1 // indirect - github.com/golang/protobuf v1.3.1 // indirect + github.com/gogo/protobuf v1.2.1 + github.com/golang/protobuf v1.3.2 
github.com/gorilla/mux v1.6.2 github.com/hashicorp/go-msgpack v0.5.5 // indirect github.com/hashicorp/go-multierror v1.0.0 // indirect @@ -34,9 +35,9 @@ require ( github.com/smartystreets/goconvey v0.0.0-20190306220146-200a235640ff golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c // indirect golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c // indirect - golang.org/x/sync v0.0.0-20190423024810-112230192c58 // indirect golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 // indirect golang.org/x/text v0.3.2 // indirect + google.golang.org/grpc v1.26.0 gopkg.in/alecthomas/kingpin.v2 v2.2.5 gopkg.in/jarcoal/httpmock.v1 v1.0.0-20170412085702-cf52904a3cf0 gopkg.in/relistan/rubberneck.v1 v1.0.1 diff --git a/go.sum b/go.sum index aea52cec..59d6e0d0 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78 h1:w+iIsaOQNcT7OZ575w+acHgRric5iCyQh+xv+KJ4HB8= github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/Microsoft/go-winio v0.4.11 h1:zoIOcVf0xPN1tnMVbTtEdI+P8OofVk3NObnwOQ6nK2Q= github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcyvVC4A4RozmA= @@ -20,8 +22,12 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878 h1:EFSB7Zo9Eg91v7MJPVsifUysc/wPdN+NOnVe6bWbdBM= github.com/armon/go-metrics v0.0.0-20190430140413-ec5e00d3c878/go.mod h1:3AMJUQhVx52RsWOnlkpikZr01T/yAVN2gn0861vByNg= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= +github.com/census-instrumentation/opencensus-proto 
v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6Dob7S7YxXgwXpfOuvO54S+tGdZdw9fuRZt25Ag= github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f h1:WBZRG4aNOuI15bLRrCgN8fCq8E5Xuty6jGbmSNEvSsU= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/continuity v0.0.0-20180814194400-c7c5070e6f6e h1:KEBqsIJcjops96ysfjRTg3x6STnVHBxe7CZLwwnlkWA= github.com/containerd/continuity v0.0.0-20180814194400-c7c5070e6f6e/go.mod h1:GL3xCUCBDV3CZiTSEKksMWbLE66hEyuu9qyDOOqM47Y= github.com/containerd/continuity v0.0.0-20181203112020-004b46473808 h1:4BX8f882bXEDKfWIf0wa8HRvpnBoPszJJXL+TVbBw4M= @@ -37,6 +43,12 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/libnetwork v0.8.0-dev.2.0.20180608203834-19279f049241 h1:+ebE/hCU02srkeIg8Vp/vlUp182JapYWtXzV+bCeR2I= github.com/docker/libnetwork v0.8.0-dev.2.0.20180608203834-19279f049241/go.mod h1:93m0aTqz6z+g32wla4l4WxTrdtvBRmVzYRkYvasA5Z8= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.2 h1:GJ5MKABRjz+QuET1GHm0KD9HC/mAzb3g2FznLQ0aThc= +github.com/envoyproxy/go-control-plane v0.9.2/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/protoc-gen-validate v0.1.0 h1:EQciDnbrYxy13PgWoY8AqoxGiPrpgBZ1R8UNe3ddc+A= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod 
h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsouza/go-dockerclient v1.3.1 h1:h0SaeiAGihssk+aZeKohbubHYKroCBlC7uuUyNhORI4= @@ -45,10 +57,13 @@ github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1 h1:/s5zKNz0uPFCZ5hddgPdo2TK2TVrUNMn0OOX8/aZMTE= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= -github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2 h1:6nsPYzhq5kReh6QImI3k5qWzO4PEbvbIW2cwSfR/6xs= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1 h1:EGx4pi6eqNxGaHF6qqu48+N2wcFQ5qg5FXgOdqsJ5d8= @@ -113,6 +128,7 @@ github.com/pquerna/ffjson v0.0.0-20171002144729-d49c2bc1aa13 h1:AUK/hm/tPsiNNASd github.com/pquerna/ffjson v0.0.0-20171002144729-d49c2bc1aa13/go.mod h1:YARuvh7BUWHNhzDq2OM5tzR2RiCcN2D7sapiKyCel/M= 
github.com/prometheus/client_golang v0.9.2/go.mod h1:OsXs2jCmiKlQ1lTBmv21f2mNfw4xf/QclQDMrYNZzcM= github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= github.com/relistan/go-director v0.0.0-20181104164737-5f56787d9731 h1:M8d8wZ2QCkGfp+N3LxT6bTFAXqhBV4Az450DuCqZEp0= @@ -145,12 +161,20 @@ golang.org/x/crypto v0.0.0-20180820150726-614d502a4dac/go.mod h1:6SG95UA2DQfeDnf golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c h1:Vj5n4GlwjmQteupaxJ9+0FNOmBrHfq7vN4btdGoDZgI= golang.org/x/crypto v0.0.0-20190325154230-a5d413f7728c/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= 
golang.org/x/net v0.0.0-20181201002055-351d144fa1fc/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c h1:uOCk1iQW6Vc18bnC13MfzScl+wdKBmM9Y9kU7Z83/lw= golang.org/x/net v0.0.0-20190503192946-f4e77d36d62c/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -158,6 +182,7 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58 h1:8gQV6CLnAEikrhgkHFbMAEha golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87 h1:GqwDwfvIpC33dK9bA1fD+JiDUNsuAiQiEkpHqUKze4o= golang.org/x/sys v0.0.0-20180824143301-4910a1d54f87/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190523142557-0e01d883c5c5 h1:sM3evRHxE/1RuMe1FYAL3j7C7fUfIjkbE+NiDAYUF8U= @@ -168,6 +193,20 @@ golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/tools 
v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55 h1:gSJIx1SDwno+2ElGhA4+qG2zF97qiUzTM+rQ0klBOcE= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.26.0 h1:2dTRdpdFEEhJYQD8EMLB61nnrzSCTbG38PhqdhvOltg= +google.golang.org/grpc v1.26.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= gopkg.in/airbrake/gobrake.v2 v2.0.9 h1:7z2uVWwn7oVeeugY1DtlPAy5H+KYgB1KeKTnqjNatLo= gopkg.in/airbrake/gobrake.v2 v2.0.9/go.mod h1:/h5ZAUhDkGaJfjzjKLSjv6zCL6O0LLBxU4K+aSYdM/U= gopkg.in/alecthomas/kingpin.v2 v2.2.5 h1:qskSCq465uEvC3oGocwvZNsO3RF3SpLVLumOAhL0bXo= @@ -190,3 +229,5 @@ gotest.tools v2.1.0+incompatible h1:5USw7CrJBYKqjg9R7QlA6jzqZKEAtvW82aNmsxxGPxw= gotest.tools v2.1.0+incompatible/go.mod 
h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/main.go b/main.go index 0b2eaaf9..e44d6142 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,8 @@ package main // import "github.com/Nitro/sidecar" import ( + "context" + "net" "os" "os/signal" "runtime/pprof" @@ -8,7 +10,9 @@ import ( "github.com/Nitro/memberlist" "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/config" "github.com/Nitro/sidecar/discovery" + "github.com/Nitro/sidecar/envoy" "github.com/Nitro/sidecar/haproxy" "github.com/Nitro/sidecar/healthy" "github.com/Nitro/sidecar/service" @@ -37,7 +41,7 @@ func announceMembers(list *memberlist.Memberlist, state *catalog.ServicesState) // configureOverrides takes CLI opts and applies them over the top of settings // taken from the environment variables and stored in config. 
-func configureOverrides(config *Config, opts *CliOpts) { +func configureOverrides(config *config.Config, opts *CliOpts) { if len(*opts.AdvertiseIP) > 0 { config.Sidecar.AdvertiseIP = *opts.AdvertiseIP } @@ -55,7 +59,7 @@ func configureOverrides(config *Config, opts *CliOpts) { } } -func configureHAproxy(config Config) *haproxy.HAproxy { +func configureHAproxy(config *config.Config) *haproxy.HAproxy { proxy := haproxy.New(config.HAproxy.ConfigFile, config.HAproxy.PidFile) if len(config.HAproxy.BindIP) > 0 { @@ -87,7 +91,7 @@ func configureHAproxy(config Config) *haproxy.HAproxy { return proxy } -func configureDiscovery(config *Config, publishedIP string) discovery.Discoverer { +func configureDiscovery(config *config.Config, publishedIP string) discovery.Discoverer { disco := new(discovery.MultiDiscovery) var svcNamer discovery.ServiceNamer @@ -140,7 +144,7 @@ func configureDiscovery(config *Config, publishedIP string) discovery.Discoverer } // configureMetrics sets up remote performance metrics if we're asked to send them (statsd) -func configureMetrics(config *Config) { +func configureMetrics(config *config.Config) { if config.Sidecar.StatsAddr != "" { sink, err := metrics.NewStatsdSink(config.Sidecar.StatsAddr) exitWithError(err, "Can't configure Statsd") @@ -152,7 +156,7 @@ func configureMetrics(config *Config) { } // configureDelegate sets up the Memberlist delegate we'll use -func configureDelegate(state *catalog.ServicesState, config *Config) *servicesDelegate { +func configureDelegate(state *catalog.ServicesState, config *config.Config) *servicesDelegate { delegate := NewServicesDelegate(state) delegate.Metadata = NodeMetadata{ ClusterName: config.Sidecar.ClusterName, @@ -195,7 +199,7 @@ func configureCpuProfiler(opts *CliOpts) { } } -func configureLoggingLevel(config *Config) { +func configureLoggingLevel(config *config.Config) { level := config.Sidecar.LoggingLevel switch { @@ -213,7 +217,7 @@ func configureLoggingLevel(config *Config) { } // 
configureLoggingFormat switches between text and JSON log format -func configureLoggingFormat(config *Config) { +func configureLoggingFormat(config *config.Config) { if config.Sidecar.LoggingFormat == "json" { log.SetFormatter(&log.JSONFormatter{}) } else { @@ -222,7 +226,7 @@ func configureLoggingFormat(config *Config) { } } -func configureMemberlist(config *Config, state *catalog.ServicesState) *memberlist.Config { +func configureMemberlist(config *config.Config, state *catalog.ServicesState) *memberlist.Config { delegate := configureDelegate(state, config) // Use a LAN config but add our delegate @@ -258,7 +262,7 @@ func configureMemberlist(config *Config, state *catalog.ServicesState) *memberli } // configureListeners sets up any statically configured state change event listeners. -func configureListeners(config *Config, state *catalog.ServicesState) { +func configureListeners(config *config.Config, state *catalog.ServicesState) { for _, url := range config.Listeners.Urls { listener := catalog.NewUrlListener(url, false) listener.Watch(state) @@ -266,13 +270,13 @@ func configureListeners(config *Config, state *catalog.ServicesState) { } func main() { - config := parseConfig() + config := config.ParseConfig() opts := parseCommandLine() - configureOverrides(&config, opts) + configureOverrides(config, opts) configureCpuProfiler(opts) - configureLoggingLevel(&config) - configureLoggingFormat(&config) - configureMetrics(&config) + configureLoggingLevel(config) + configureLoggingFormat(config) + configureMetrics(config) // Create a new state instance and fire up the processor. We need // this to happen early in the startup. 
@@ -282,9 +286,9 @@ func main() { ) go state.ProcessServiceMsgs(svcMsgLooper) - configureListeners(&config, state) + configureListeners(config, state) - mlConfig := configureMemberlist(&config, state) + mlConfig := configureMemberlist(config, state) printer := rubberneck.NewPrinter(log.Infof, rubberneck.NoAddLineFeed) printer.PrintWithLabel("Sidecar", config) @@ -323,7 +327,7 @@ func main() { // Register the cluster name with the state object state.ClusterName = config.Sidecar.ClusterName - disco := configureDiscovery(&config, mlConfig.AdvertiseAddr) + disco := configureDiscovery(config, mlConfig.AdvertiseAddr) go disco.Run(discoLooper) // Configure the monitor and use the public address as the default @@ -372,5 +376,21 @@ func main() { exitWithError(err, "Failed to reload HAProxy config") } + if config.Envoy.UseGRPCAPI { + ctx := context.Background() + envoyServer := envoy.NewServer(ctx, state, config.Envoy) + envoyServerLooper := director.NewTimedLooper( + director.FOREVER, envoy.LooperUpdateInterval, make(chan error), + ) + + // This listener will be owned and managed by the gRPC server + grpcListener, err := net.Listen("tcp", ":"+config.Envoy.GRPCPort) + if err != nil { + log.Fatalf("Failed to listen on port %q: %s", config.Envoy.GRPCPort, err) + } + + go envoyServer.Run(ctx, envoyServerLooper, grpcListener) + } + select {} } diff --git a/services_delegate.go b/services_delegate.go index 54320857..c54c6d84 100644 --- a/services_delegate.go +++ b/services_delegate.go @@ -171,11 +171,7 @@ func (d *servicesDelegate) NotifyJoin(node *memberlist.Node) { func (d *servicesDelegate) NotifyLeave(node *memberlist.Node) { log.Debugf("NotifyLeave(): %s", node.Name) - go func() { - d.state.Lock() - defer d.state.Unlock() - d.state.ExpireServer(node.Name) - }() + go d.state.ExpireServer(node.Name) } func (d *servicesDelegate) NotifyUpdate(node *memberlist.Node) { diff --git a/sidecarhttp/envoy_api.go b/sidecarhttp/envoy_api.go index 5f98a60a..22348a22 100644 --- 
a/sidecarhttp/envoy_api.go +++ b/sidecarhttp/envoy_api.go @@ -4,25 +4,18 @@ package sidecarhttp import ( "fmt" - "net" "net/http" _ "net/http/pprof" - "strconv" - "strings" "github.com/Nitro/memberlist" "github.com/Nitro/sidecar/catalog" + "github.com/Nitro/sidecar/envoy/adapter" "github.com/Nitro/sidecar/service" "github.com/gorilla/mux" "github.com/pquerna/ffjson/ffjson" log "github.com/sirupsen/logrus" ) -const ( - // Used to join service name and port. Must not occur in service names - ServiceNameSeparator = ":" -) - // This file implements the Envoy proxy V1 API on top of a Sidecar // service discovery cluster. @@ -130,7 +123,7 @@ func (s *EnvoyApi) registrationHandler(response http.ResponseWriter, req *http.R return } - svcName, svcPort, err := SvcNameSplit(name) + svcName, svcPort, err := adapter.SvcNameSplit(name) if err != nil { log.Debugf("Envoy Service '%s' not found in registrationHandler: %s", name, err) sendJsonError(response, 404, "Not Found - "+err.Error()) @@ -240,18 +233,6 @@ func (s *EnvoyApi) listenersHandler(response http.ResponseWriter, req *http.Requ } } -// lookupHost does a vv slow lookup of the DNS host for a service. Totally -// not optimized for high throughput. You should only do this in development -// scenarios. -func lookupHost(hostname string) (string, error) { - addrs, err := net.LookupHost(hostname) - - if err != nil { - return "", err - } - return addrs[0], nil -} - // EnvoyServiceFromService converts a Sidecar service to an Envoy // API service for reporting to the proxy func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) *EnvoyService { @@ -267,7 +248,7 @@ func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) // NOT recommended... this is very slow. Useful in dev modes where you // need to resolve to a different IP address only. 
if s.config.UseHostnames { - if host, err := lookupHost(svc.Hostname); err == nil { + if host, err := adapter.LookupHost(svc.Hostname); err == nil { address = host } else { log.Warnf("Unable to resolve %s, using IP address", svc.Hostname) @@ -279,7 +260,7 @@ func (s *EnvoyApi) EnvoyServiceFromService(svc *service.Service, svcPort int64) LastCheckIn: svc.Updated.String(), Port: port.Port, Revision: svc.Version(), - Service: SvcName(svc.Name, port.ServicePort), + Service: adapter.SvcName(svc.Name, port.ServicePort), ServiceRepoName: svc.Image, Tags: map[string]string{}, } @@ -321,11 +302,11 @@ func (s *EnvoyApi) EnvoyClustersFromState() []*EnvoyCluster { } clusters = append(clusters, &EnvoyCluster{ - Name: SvcName(svcName, port.ServicePort), + Name: adapter.SvcName(svcName, port.ServicePort), Type: "sds", // use Sidecar's SDS endpoint for the hosts ConnectTimeoutMs: 500, LBType: "round_robin", // TODO figure this out! - ServiceName: SvcName(svcName, port.ServicePort), + ServiceName: adapter.SvcName(svcName, port.ServicePort), }) } } @@ -336,7 +317,7 @@ func (s *EnvoyApi) EnvoyClustersFromState() []*EnvoyCluster { // EnvoyListenerFromService takes a Sidecar service and formats it into // the API format for an Envoy proxy listener (LDS API v1) func (s *EnvoyApi) EnvoyListenerFromService(svc *service.Service, port int64) *EnvoyListener { - apiName := SvcName(svc.Name, port) + apiName := adapter.SvcName(svc.Name, port) listener := &EnvoyListener{ Name: apiName, @@ -439,27 +420,6 @@ func (s *EnvoyApi) EnvoyListenersFromState() []*EnvoyListener { return listeners } -// Format an Envoy service name from our service name and port -func SvcName(name string, port int64) string { - return fmt.Sprintf("%s%s%d", name, ServiceNameSeparator, port) -} - -// Split an Enovy service name into our service name and port -func SvcNameSplit(name string) (string, int64, error) { - parts := strings.Split(name, ServiceNameSeparator) - if len(parts) < 2 { - return "", -1, fmt.Errorf("%s", 
"Unable to split service name and port!") - } - - svcName := parts[0] - svcPort, err := strconv.ParseInt(parts[1], 10, 64) - if err != nil { - return "", -1, fmt.Errorf("%s", "Unable to parse port!") - } - - return svcName, svcPort, nil -} - // HttpMux returns a configured Gorilla mux to handle all the endpoints // for the Envoy API. func (s *EnvoyApi) HttpMux() http.Handler {