-
Notifications
You must be signed in to change notification settings - Fork 0
Add Sidecar maxconn support - DO NOT MERGE #7
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,30 +1,41 @@ | ||
| package provider | ||
|
|
||
| import ( | ||
| "context" | ||
| "fmt" | ||
| "io/ioutil" | ||
| "net" | ||
| "net/http" | ||
| "os" | ||
| "path/filepath" | ||
| "strconv" | ||
| "strings" | ||
| "time" | ||
|
|
||
| fsnotify "gopkg.in/fsnotify.v1" | ||
|
|
||
| "github.com/BurntSushi/toml" | ||
| "github.com/Nitro/sidecar/catalog" | ||
| "github.com/Nitro/sidecar/service" | ||
| "github.com/containous/flaeg" | ||
| "github.com/containous/traefik/log" | ||
| "github.com/containous/traefik/safe" | ||
| "github.com/containous/traefik/types" | ||
| fsnotify "gopkg.in/fsnotify.v1" | ||
| ) | ||
|
|
||
| const ( | ||
| method = "wrr" | ||
| weight = 0 | ||
| sticky = false | ||
| defaultMethod = "wrr" | ||
| defaultSticky = false | ||
| ) | ||
|
|
||
| var ( | ||
| // Disable all timeouts for watcher requests | ||
| watcherHTTPClient = &http.Client{ | ||
| Timeout: 0, | ||
| Transport: &http.Transport{ResponseHeaderTimeout: 0}, | ||
| } | ||
|
|
||
| sidecarHTTPClient = &http.Client{ | ||
| Timeout: 15 * time.Second, | ||
| } | ||
| ) | ||
|
|
||
| var _ Provider = (*Sidecar)(nil) | ||
|
|
@@ -33,7 +44,6 @@ var _ Provider = (*Sidecar)(nil) | |
| type Sidecar struct { | ||
| BaseProvider `mapstructure:",squash"` | ||
| Endpoint string `description:"Sidecar URL"` | ||
| Frontend string `description:"Configuration file for frontend"` | ||
| configurationChan chan<- types.ConfigMessage | ||
| RefreshConn flaeg.Duration `description:"How often to refresh the connection to Sidecar backend"` | ||
| connTimer *time.Timer | ||
|
|
@@ -43,22 +53,31 @@ type callback func(map[string][]*service.Service, error) | |
|
|
||
| // Provide allows the provider to provide configurations to traefik | ||
| // using the given configuration channel. | ||
| func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error { | ||
| func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error { | ||
| provider.configurationChan = configurationChan | ||
|
|
||
| // Exit with a bang if we can't reach the Sidecar endpoint | ||
| // TODO: On second thought, maybe it's best to continuously log errors in a loop | ||
| // since no other provider uses log.Fatal if something goes wrong... | ||
| _, err := watcherHTTPClient.Get(provider.Endpoint) | ||
| if err != nil { | ||
| log.Fatal("Failed to connect to the Sidecar endpoint: ", err) | ||
| } | ||
|
|
||
| if provider.Watch { | ||
| safe.Go(func() { | ||
| provider.sidecarWatcher() | ||
| provider.runSidecarWatcher() | ||
| }) | ||
|
|
||
| watcher, err := fsnotify.NewWatcher() | ||
| if err != nil { | ||
| log.Errorln("Error creating file watcher", err) | ||
| log.Error("Error creating file watcher: ", err) | ||
| return err | ||
| } | ||
|
|
||
| file, err := os.Open(provider.Frontend) | ||
| file, err := os.Open(provider.Filename) | ||
| if err != nil { | ||
| log.Errorln("Error opening file", err) | ||
| log.Error("Error opening file: ", err) | ||
| return err | ||
| } | ||
| defer file.Close() | ||
|
|
@@ -70,150 +89,171 @@ func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, p | |
| return | ||
| case event := <-watcher.Events: | ||
| if strings.Contains(event.Name, file.Name()) { | ||
| log.Debug("Sidecar Frontend File event:", event) | ||
| states, errState := provider.fetchState() | ||
| if errState != nil { | ||
| log.Errorln("Error reloading Sidecar config", errState) | ||
| log.Debug("Sidecar config file event: ", event) | ||
| err = provider.reloadConfig() | ||
| if err != nil { | ||
| log.Error(err) | ||
| } | ||
| provider.loadSidecarConfig(states.ByService()) | ||
| } | ||
| case errWatcher := <-watcher.Errors: | ||
| log.Errorln("Watcher event error", errWatcher) | ||
| log.Error("Watcher event error: ", errWatcher) | ||
| } | ||
| } | ||
| }) | ||
| err = watcher.Add(filepath.Dir(file.Name())) | ||
| if err != nil { | ||
| log.Error("Error adding file watcher", err) | ||
| log.Error("Error adding file watcher: ", err) | ||
| return err | ||
| } | ||
| } else { | ||
| err := provider.reloadConfig() | ||
| if err != nil { | ||
| log.Error(err) | ||
| } | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (provider *Sidecar) reloadConfig() error { | ||
| states, err := provider.fetchState() | ||
| if err != nil { | ||
| log.Fatalln("Error reloading Sidecar config", err) | ||
| return fmt.Errorf("Error fetching Sidecar state: %s", err) | ||
| } | ||
| err = provider.loadSidecarConfig(states.ByService()) | ||
|
|
||
| err = provider.loadConfig(states.ByService()) | ||
| if err != nil { | ||
| return err | ||
| return fmt.Errorf("Error loading Sidecar config: %s", err) | ||
| } | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (provider *Sidecar) constructConfig(sidecarStates map[string][]*service.Service) (*types.Configuration, error) { | ||
| log.Infoln("loading sidecar config") | ||
| sidecarConfig := types.Configuration{Backends: provider.makeBackends(sidecarStates)} | ||
| var err error | ||
| sidecarConfig.Frontends, err = provider.makeFrontend() | ||
| if err != nil { | ||
| return nil, err | ||
| func (provider *Sidecar) loadConfig(sidecarStates map[string][]*service.Service) error { | ||
| log.Info("Loading sidecar config...") | ||
|
|
||
| config := &types.Configuration{ | ||
| Backends: make(map[string]*types.Backend), | ||
| } | ||
| return &sidecarConfig, nil | ||
| } | ||
|
|
||
| func (provider *Sidecar) loadSidecarConfig(sidecarStates map[string][]*service.Service) error { | ||
| conf, err := provider.constructConfig(sidecarStates) | ||
| if err != nil { | ||
| if _, err := toml.DecodeFile(provider.Filename, config); err != nil { | ||
| return err | ||
| } | ||
|
|
||
| // Create backends from Sidecar state data | ||
| for serviceName, services := range sidecarStates { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here we could just pass the whole state in and call |
||
| backend, ok := config.Backends[serviceName] | ||
| if !ok { | ||
| backend = &types.Backend{} | ||
| config.Backends[serviceName] = backend | ||
| } | ||
|
|
||
| if backend.LoadBalancer == nil { | ||
| backend.LoadBalancer = &types.LoadBalancer{Method: defaultMethod, Sticky: defaultSticky} | ||
| } | ||
| if backend.Servers == nil { | ||
| backend.Servers = make(map[string]types.Server) | ||
| } | ||
|
|
||
| for _, serv := range services { | ||
| if serv.IsAlive() { | ||
| ipAddr, err := net.LookupIP(serv.Hostname) | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should use the IP address for the Port if there is one and only fall back to lookups when there isn't (now that we have this avialable). There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I guess that ^^ was discussed here earlier. Probably best in a separate PR. |
||
|
|
||
| for i := 0; i < len(serv.Ports); i++ { | ||
| // TODO: is there any point to add unreachable hosts? | ||
| var hostname string | ||
| if err != nil { | ||
| log.Warn("Failed to resolve IP address: ", err) | ||
| hostname = serv.Hostname | ||
| } else { | ||
| hostname = ipAddr[0].String() | ||
| } | ||
|
|
||
| backend.Servers[serv.Hostname] = types.Server{ | ||
| URL: fmt.Sprintf("http://%s:%d", hostname, serv.Ports[i].Port), | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
|
||
| provider.configurationChan <- types.ConfigMessage{ | ||
| ProviderName: "sidecar", | ||
| Configuration: conf, | ||
| Configuration: config, | ||
| } | ||
|
|
||
| log.Info("Finished loading sidecar config") | ||
|
|
||
| return nil | ||
| } | ||
|
|
||
| func (provider *Sidecar) sidecarWatcher() error { | ||
| //set timeout to be just a bot more than connection refresh interval | ||
| func (provider *Sidecar) runSidecarWatcher() { | ||
| // Set timeout to be just a bit more than connection refresh interval | ||
| provider.connTimer = time.NewTimer(time.Duration(provider.RefreshConn)) | ||
| tr := &http.Transport{ResponseHeaderTimeout: 0} | ||
| client := &http.Client{ | ||
| Timeout: 0, | ||
| Transport: tr} | ||
|
|
||
| log.Debugf("Using %s Sidecar connection refresh interval", provider.RefreshConn) | ||
| provider.recycleConn(client, tr) | ||
| return nil | ||
| for { | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How will this
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @relistan Not sure what should make it exit. My understanding of the design is that it should keep running as long as Traefik is in Watch mode. Maybe have a way to stop it if the config changes to turn the Watch mode off? PS: it's the same as in the original implementation: https://github.com/Nitro/traefik/blob/master/provider/sidecar.go#L141 |
||
| // Call a separate function because the defer statement has function scope | ||
| provider.sidecarWatcher() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Probably an anonymous function here would be better since this method does nothing on its own and all the work is now in
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Initially, I had it like that, but @bparli argued that it's less readable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd originally reused pointer variables. I still don't see a problem with doing that and it seems more efficient than the overhead of repeatedly calling a function, even in Go. I guess I'm still not seeing how this particular refactoring is an improvement.
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have read the original code multiple times now and I still don't feel confident enough to explain to somebody else why it's 100% correct. It may very well be, but I don't understand it fully and I'm probably missing some context. The fact that it is working in production is encouraging so I don't mind leaving it the way you designed it. In this sense, I have created #8 for moving forward with the MaxConn additions. My objection to the way it was done is clarity and an explanation of why it works the way it does that we can relate to some existing documentation. The fact that |
||
| } | ||
| } | ||
|
|
||
| func (provider *Sidecar) recycleConn(client *http.Client, tr *http.Transport) { | ||
| var err error | ||
| var resp *http.Response | ||
| var req *http.Request | ||
| for { //use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost | ||
| req, err = http.NewRequest("GET", provider.Endpoint+"/watch", nil) | ||
| if err != nil { | ||
| log.Errorf("Error creating http request to Sidecar: %s, Error: %s", provider.Endpoint, err) | ||
| continue | ||
| } | ||
| resp, err = client.Do(req) | ||
| if err != nil { | ||
| log.Errorf("Error connecting to Sidecar: %s, Error: %s", provider.Endpoint, err) | ||
| time.Sleep(5 * time.Second) | ||
| continue | ||
| } | ||
| safe.Go(func() { catalog.DecodeStream(resp.Body, provider.callbackLoader) }) | ||
| func (provider *Sidecar) sidecarWatcher() { | ||
| // Use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost | ||
| req, err := http.NewRequest(http.MethodGet, provider.Endpoint+"/watch", nil) | ||
| if err != nil { | ||
| log.Errorf("Error creating http request to Sidecar instance '%s': %s", provider.Endpoint, err) | ||
| time.Sleep(5 * time.Second) | ||
| return | ||
| } | ||
|
|
||
| //wait on refresh connection timer. If this expires we haven't seen an update in a | ||
| //while and should cancel the request, reset the time, and reconnect just in case | ||
| <-provider.connTimer.C | ||
| provider.connTimer.Reset(time.Duration(provider.RefreshConn)) | ||
| tr.CancelRequest(req) | ||
|
||
| cx, cancel := context.WithCancel(context.Background()) | ||
| // Cancel the infinite timeout request automatically after we reset connTimer | ||
| defer cancel() | ||
|
|
||
| req = req.WithContext(cx) | ||
|
|
||
| resp, err := watcherHTTPClient.Do(req) | ||
| if err != nil { | ||
| log.Errorf("Error connecting to Sidecar instance '%s': %s", provider.Endpoint, err) | ||
| time.Sleep(5 * time.Second) | ||
| return | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| safe.Go(func() { catalog.DecodeStream(resp.Body, provider.callbackLoader) }) | ||
|
|
||
| // Wait on refresh connection timer. If this expires we haven't seen an update in a | ||
| // while and should cancel the request, reset the time, and reconnect just in case | ||
| <-provider.connTimer.C | ||
| provider.connTimer.Reset(time.Duration(provider.RefreshConn)) | ||
| } | ||
|
|
||
| func (provider *Sidecar) callbackLoader(sidecarStates map[string][]*service.Service, err error) { | ||
| //load config regardless | ||
| provider.loadSidecarConfig(sidecarStates) | ||
| configErr := provider.loadConfig(sidecarStates) | ||
| if configErr != nil { | ||
| log.Error("Error loading sidecar config: ", err) | ||
| } | ||
|
|
||
| if err != nil { | ||
| return | ||
| } | ||
| //else reset connection timer | ||
|
|
||
| // Else reset connection timer | ||
| if !provider.connTimer.Stop() { | ||
| <-provider.connTimer.C | ||
| } | ||
| provider.connTimer.Reset(time.Duration(provider.RefreshConn)) | ||
| return | ||
| } | ||
|
|
||
| func (provider *Sidecar) makeFrontend() (map[string]*types.Frontend, error) { | ||
| configuration := new(types.Configuration) | ||
| if _, err := toml.DecodeFile(provider.Frontend, configuration); err != nil { | ||
| log.Errorf("Error reading file: %s", err) | ||
| return nil, err | ||
| } | ||
| return configuration.Frontends, nil | ||
| } | ||
|
|
||
| func (provider *Sidecar) makeBackends(sidecarStates map[string][]*service.Service) map[string]*types.Backend { | ||
| sidecarBacks := make(map[string]*types.Backend) | ||
| for serviceName, services := range sidecarStates { | ||
| newServers := make(map[string]types.Server) | ||
| newBackend := &types.Backend{LoadBalancer: &types.LoadBalancer{Method: method, Sticky: sticky}, | ||
| Servers: newServers} | ||
| for _, serv := range services { | ||
| if serv.IsAlive() { | ||
| for i := 0; i < len(serv.Ports); i++ { | ||
| ipAddr, err := net.LookupIP(serv.Hostname) | ||
| if err != nil { | ||
| log.Errorln("Error resolving Ip address, ", err) | ||
| newBackend.Servers[serv.Hostname] = types.Server{URL: "http://" + serv.Hostname + ":" + strconv.FormatInt(serv.Ports[i].Port, 10)} | ||
| } else { | ||
| newBackend.Servers[serv.Hostname] = types.Server{URL: "http://" + ipAddr[0].String() + ":" + strconv.FormatInt(serv.Ports[i].Port, 10)} | ||
| } | ||
| } | ||
| } | ||
| } | ||
| sidecarBacks[serviceName] = newBackend | ||
| } | ||
| return sidecarBacks | ||
| provider.connTimer.Reset(time.Duration(provider.RefreshConn)) | ||
| } | ||
|
|
||
| func (provider *Sidecar) fetchState() (*catalog.ServicesState, error) { | ||
| client := &http.Client{Timeout: 5 * time.Second} | ||
| resp, err := client.Get(provider.Endpoint + "/state.json") | ||
| resp, err := watcherHTTPClient.Get(provider.Endpoint + "/state.json") | ||
| if err != nil { | ||
| return nil, err | ||
| } | ||
| defer resp.Body.Close() | ||
|
|
||
| bytes, err := ioutil.ReadAll(resp.Body) | ||
| if err != nil { | ||
|
|
@@ -224,5 +264,6 @@ func (provider *Sidecar) fetchState() (*catalog.ServicesState, error) { | |
| if err != nil { | ||
| return nil, err | ||
| } | ||
|
|
||
| return state, nil | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change makes sense to me given the nature of the PR.