Skip to content
This repository was archived by the owner on Sep 26, 2018. It is now read-only.
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,7 +332,7 @@ func NewTraefikDefaultPointersConfiguration() *TraefikConfiguration {
var defaultSidecar provider.Sidecar
defaultSidecar.Watch = true
defaultSidecar.Endpoint = "http://127.0.0.1:7777"
defaultSidecar.Frontend = "sidecar.toml"
defaultSidecar.Filename = "sidecar.toml"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change makes sense to me given the nature of the PR.

defaultSidecar.RefreshConn = flaeg.Duration(60 * time.Second)

//default Docker
Expand Down
247 changes: 144 additions & 103 deletions provider/sidecar.go
Original file line number Diff line number Diff line change
@@ -1,30 +1,41 @@
package provider

import (
"context"
"fmt"
"io/ioutil"
"net"
"net/http"
"os"
"path/filepath"
"strconv"
"strings"
"time"

fsnotify "gopkg.in/fsnotify.v1"

"github.com/BurntSushi/toml"
"github.com/Nitro/sidecar/catalog"
"github.com/Nitro/sidecar/service"
"github.com/containous/flaeg"
"github.com/containous/traefik/log"
"github.com/containous/traefik/safe"
"github.com/containous/traefik/types"
fsnotify "gopkg.in/fsnotify.v1"
)

const (
method = "wrr"
weight = 0
sticky = false
defaultMethod = "wrr"
defaultSticky = false
)

var (
// Disable all timeouts for watcher requests
watcherHTTPClient = &http.Client{
Timeout: 0,
Transport: &http.Transport{ResponseHeaderTimeout: 0},
}

sidecarHTTPClient = &http.Client{
Timeout: 15 * time.Second,
}
)

var _ Provider = (*Sidecar)(nil)
Expand All @@ -33,7 +44,6 @@ var _ Provider = (*Sidecar)(nil)
type Sidecar struct {
BaseProvider `mapstructure:",squash"`
Endpoint string `description:"Sidecar URL"`
Frontend string `description:"Configuration file for frontend"`
configurationChan chan<- types.ConfigMessage
RefreshConn flaeg.Duration `description:"How often to refresh the connection to Sidecar backend"`
connTimer *time.Timer
Expand All @@ -43,22 +53,31 @@ type callback func(map[string][]*service.Service, error)

// Provide allows the provider to provide configurations to traefik
// using the given configuration channel.
func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, constraints types.Constraints) error {
func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, pool *safe.Pool, _ types.Constraints) error {
provider.configurationChan = configurationChan

// Exit with a bang if we can't reach the Sidecar endpoint
// TODO: On second thought, maybe it's best to continuously log errors in a loop
// since no other provider uses log.Fatal if something goes wrong...
_, err := watcherHTTPClient.Get(provider.Endpoint)
if err != nil {
log.Fatal("Failed to connect to the Sidecar endpoint: ", err)
}

if provider.Watch {
safe.Go(func() {
provider.sidecarWatcher()
provider.runSidecarWatcher()
})

watcher, err := fsnotify.NewWatcher()
if err != nil {
log.Errorln("Error creating file watcher", err)
log.Error("Error creating file watcher: ", err)
return err
}

file, err := os.Open(provider.Frontend)
file, err := os.Open(provider.Filename)
if err != nil {
log.Errorln("Error opening file", err)
log.Error("Error opening file: ", err)
return err
}
defer file.Close()
Expand All @@ -70,150 +89,171 @@ func (provider *Sidecar) Provide(configurationChan chan<- types.ConfigMessage, p
return
case event := <-watcher.Events:
if strings.Contains(event.Name, file.Name()) {
log.Debug("Sidecar Frontend File event:", event)
states, errState := provider.fetchState()
if errState != nil {
log.Errorln("Error reloading Sidecar config", errState)
log.Debug("Sidecar config file event: ", event)
err = provider.reloadConfig()
if err != nil {
log.Error(err)
}
provider.loadSidecarConfig(states.ByService())
}
case errWatcher := <-watcher.Errors:
log.Errorln("Watcher event error", errWatcher)
log.Error("Watcher event error: ", errWatcher)
}
}
})
err = watcher.Add(filepath.Dir(file.Name()))
if err != nil {
log.Error("Error adding file watcher", err)
log.Error("Error adding file watcher: ", err)
return err
}
} else {
err := provider.reloadConfig()
if err != nil {
log.Error(err)
}
}

return nil
}

// reloadConfig fetches the current Sidecar state and pushes a freshly built
// configuration to Traefik. Errors are wrapped with context and returned to
// the caller instead of terminating the process (the old implementation
// called log.Fatalln here).
func (provider *Sidecar) reloadConfig() error {
	states, err := provider.fetchState()
	if err != nil {
		return fmt.Errorf("Error fetching Sidecar state: %s", err)
	}

	if err = provider.loadConfig(states.ByService()); err != nil {
		return fmt.Errorf("Error loading Sidecar config: %s", err)
	}

	return nil
}

func (provider *Sidecar) constructConfig(sidecarStates map[string][]*service.Service) (*types.Configuration, error) {
log.Infoln("loading sidecar config")
sidecarConfig := types.Configuration{Backends: provider.makeBackends(sidecarStates)}
var err error
sidecarConfig.Frontends, err = provider.makeFrontend()
if err != nil {
return nil, err
func (provider *Sidecar) loadConfig(sidecarStates map[string][]*service.Service) error {
log.Info("Loading sidecar config...")

config := &types.Configuration{
Backends: make(map[string]*types.Backend),
}
return &sidecarConfig, nil
}

func (provider *Sidecar) loadSidecarConfig(sidecarStates map[string][]*service.Service) error {
conf, err := provider.constructConfig(sidecarStates)
if err != nil {
if _, err := toml.DecodeFile(provider.Filename, config); err != nil {
return err
}

// Create backends from Sidecar state data
for serviceName, services := range sidecarStates {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here we could just pass the whole state in and call EachService on it rather than first generating the map and then passing it here and then ranging over it. I know this is different from the original code, too. But if we were going to redo it, that's what I'd recommend over this.

backend, ok := config.Backends[serviceName]
if !ok {
backend = &types.Backend{}
config.Backends[serviceName] = backend
}

if backend.LoadBalancer == nil {
backend.LoadBalancer = &types.LoadBalancer{Method: defaultMethod, Sticky: defaultSticky}
}
if backend.Servers == nil {
backend.Servers = make(map[string]types.Server)
}

for _, serv := range services {
if serv.IsAlive() {
ipAddr, err := net.LookupIP(serv.Hostname)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should use the IP address for the Port if there is one and only fall back to lookups when there isn't (now that we have this available).

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess that ^^ was discussed here earlier. Probably best in a separate PR.


for i := 0; i < len(serv.Ports); i++ {
// TODO: is there any point to add unreachable hosts?
var hostname string
if err != nil {
log.Warn("Failed to resolve IP address: ", err)
hostname = serv.Hostname
} else {
hostname = ipAddr[0].String()
}

backend.Servers[serv.Hostname] = types.Server{
URL: fmt.Sprintf("http://%s:%d", hostname, serv.Ports[i].Port),
}
}
}
}
}

provider.configurationChan <- types.ConfigMessage{
ProviderName: "sidecar",
Configuration: conf,
Configuration: config,
}

log.Info("Finished loading sidecar config")

return nil
}

func (provider *Sidecar) sidecarWatcher() error {
//set timeout to be just a bot more than connection refresh interval
func (provider *Sidecar) runSidecarWatcher() {
// Set timeout to be just a bit more than connection refresh interval
provider.connTimer = time.NewTimer(time.Duration(provider.RefreshConn))
tr := &http.Transport{ResponseHeaderTimeout: 0}
client := &http.Client{
Timeout: 0,
Transport: tr}

log.Debugf("Using %s Sidecar connection refresh interval", provider.RefreshConn)
provider.recycleConn(client, tr)
return nil
for {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How will this for loop be exited?

Copy link
Author

@mihaitodor mihaitodor Apr 28, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@relistan Not sure what should make it exit. My understanding of the design is that it should keep running as long as Traefik is in Watch mode. Maybe have a way to stop it if the config changes to turn the Watch mode off?

PS: it's the same as in the original implementation: https://github.com/Nitro/traefik/blob/master/provider/sidecar.go#L141

// Call a separate function because the defer statement has function scope
provider.sidecarWatcher()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Probably an anonymous function here would be better since this method does nothing on its own and all the work is now in sidecarWatcher

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initially, I had it like that, but @bparli argued that it's less readable.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd originally reused pointer variables. I still don't see a problem with doing that and it seems more efficient than the overhead of repeatedly calling a function, even in Go. I guess I'm still not seeing how this particular refactoring is an improvement.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have read the original code multiple times now and I still don't feel confident enough to explain to somebody else why it's 100% correct. It may very well be, but I don't understand it fully and I'm probably missing some context. The fact that it is working in production is encouraging so I don't mind leaving it the way you designed it. In this sense, I have created #8 for moving forward with the MaxConn additions.

My objection to the way it was done is clarity and an explanation of why it works the way it does that we can relate to some existing documentation. The fact that Close() is never called on that HTTP response body deviates from a well-established Go pattern and the whole pointer reuse technique makes me scratch my head, because the variable will get overwritten during the subsequent iteration of the for loop.

}
}

func (provider *Sidecar) recycleConn(client *http.Client, tr *http.Transport) {
var err error
var resp *http.Response
var req *http.Request
for { //use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost
req, err = http.NewRequest("GET", provider.Endpoint+"/watch", nil)
if err != nil {
log.Errorf("Error creating http request to Sidecar: %s, Error: %s", provider.Endpoint, err)
continue
}
resp, err = client.Do(req)
if err != nil {
log.Errorf("Error connecting to Sidecar: %s, Error: %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
continue
}
safe.Go(func() { catalog.DecodeStream(resp.Body, provider.callbackLoader) })
func (provider *Sidecar) sidecarWatcher() {
// Use refresh interval to occasionally reconnect to Sidecar in case the stream connection is lost
req, err := http.NewRequest(http.MethodGet, provider.Endpoint+"/watch", nil)
if err != nil {
log.Errorf("Error creating http request to Sidecar instance '%s': %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
return
}

//wait on refresh connection timer. If this expires we haven't seen an update in a
//while and should cancel the request, reset the time, and reconnect just in case
<-provider.connTimer.C
provider.connTimer.Reset(time.Duration(provider.RefreshConn))
tr.CancelRequest(req)
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that CancelRequest has been deprecated and the new context functionality should be used instead.

cx, cancel := context.WithCancel(context.Background())
// Cancel the infinite timeout request automatically after we reset connTimer
defer cancel()

req = req.WithContext(cx)

resp, err := watcherHTTPClient.Do(req)
if err != nil {
log.Errorf("Error connecting to Sidecar instance '%s': %s", provider.Endpoint, err)
time.Sleep(5 * time.Second)
return
}
defer resp.Body.Close()

safe.Go(func() { catalog.DecodeStream(resp.Body, provider.callbackLoader) })

// Wait on refresh connection timer. If this expires we haven't seen an update in a
// while and should cancel the request, reset the time, and reconnect just in case
<-provider.connTimer.C
provider.connTimer.Reset(time.Duration(provider.RefreshConn))
}

// callbackLoader is invoked by catalog.DecodeStream for every update on the
// watch stream. The config is reloaded regardless of the stream error; the
// refresh timer is only reset when the stream delivered a healthy update.
func (provider *Sidecar) callbackLoader(sidecarStates map[string][]*service.Service, err error) {
	// Load config regardless
	configErr := provider.loadConfig(sidecarStates)
	if configErr != nil {
		// Bug fix: log the load error, not the (possibly nil) stream error.
		log.Error("Error loading sidecar config: ", configErr)
	}

	if err != nil {
		return
	}

	// Else reset connection timer
	if !provider.connTimer.Stop() {
		<-provider.connTimer.C
	}
	provider.connTimer.Reset(time.Duration(provider.RefreshConn))
}

func (provider *Sidecar) makeFrontend() (map[string]*types.Frontend, error) {
configuration := new(types.Configuration)
if _, err := toml.DecodeFile(provider.Frontend, configuration); err != nil {
log.Errorf("Error reading file: %s", err)
return nil, err
}
return configuration.Frontends, nil
}

func (provider *Sidecar) makeBackends(sidecarStates map[string][]*service.Service) map[string]*types.Backend {
sidecarBacks := make(map[string]*types.Backend)
for serviceName, services := range sidecarStates {
newServers := make(map[string]types.Server)
newBackend := &types.Backend{LoadBalancer: &types.LoadBalancer{Method: method, Sticky: sticky},
Servers: newServers}
for _, serv := range services {
if serv.IsAlive() {
for i := 0; i < len(serv.Ports); i++ {
ipAddr, err := net.LookupIP(serv.Hostname)
if err != nil {
log.Errorln("Error resolving Ip address, ", err)
newBackend.Servers[serv.Hostname] = types.Server{URL: "http://" + serv.Hostname + ":" + strconv.FormatInt(serv.Ports[i].Port, 10)}
} else {
newBackend.Servers[serv.Hostname] = types.Server{URL: "http://" + ipAddr[0].String() + ":" + strconv.FormatInt(serv.Ports[i].Port, 10)}
}
}
}
}
sidecarBacks[serviceName] = newBackend
}
return sidecarBacks
provider.connTimer.Reset(time.Duration(provider.RefreshConn))
}

func (provider *Sidecar) fetchState() (*catalog.ServicesState, error) {
client := &http.Client{Timeout: 5 * time.Second}
resp, err := client.Get(provider.Endpoint + "/state.json")
resp, err := watcherHTTPClient.Get(provider.Endpoint + "/state.json")
if err != nil {
return nil, err
}
defer resp.Body.Close()

bytes, err := ioutil.ReadAll(resp.Body)
if err != nil {
Expand All @@ -224,5 +264,6 @@ func (provider *Sidecar) fetchState() (*catalog.ServicesState, error) {
if err != nil {
return nil, err
}

return state, nil
}
Loading