diff --git a/README.md b/README.md index 15c79b2..d8c8148 100644 --- a/README.md +++ b/README.md @@ -44,6 +44,12 @@ In addition to the `http.path` parameter discussed above, the following paramete If `http.gzip` is set to true, the logs will be compressed with GZIP. This is off by default, but for example supported by Sumo Logic. +Override docker hostname with `hostname` parameter + +### Basic Http Authentication +Use `http.user` and `http.password` parameters to set Authorization header + + ### Development diff --git a/http/http.go b/http/http.go index d01fffa..7bb40fd 100644 --- a/http/http.go +++ b/http/http.go @@ -17,6 +17,7 @@ import ( "strings" "sync" "time" + "math" "github.com/gliderlabs/logspout/router" ) @@ -101,7 +102,10 @@ type HTTPAdapter struct { totalMessageCount int bufferMutex sync.Mutex useGzip bool - crash bool + crash bool + hostname string + user string + password string } // NewHTTPAdapter creates an HTTPAdapter @@ -110,8 +114,12 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { // Figure out the URI and create the HTTP client defaultPath := "" path := getStringParameter(route.Options, "http.path", defaultPath) + user := getStringParameter(route.Options, "http.user", "") + password := getStringParameter(route.Options, "http.password", "") endpointUrl := fmt.Sprintf("%s://%s%s", route.Adapter, route.Address, path) debug("http: url:", endpointUrl) + debug("user:",user) + debug("password:", password) transport := &http.Transport{} transport.Dial = dial @@ -170,6 +178,9 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { debug("http: don't crash, keep going") } + // Override docker hostname with a custom hostname + hostname := getStringParameter(route.Options, "hostname", "") + // Make the HTTP adapter return &HTTPAdapter{ route: route, @@ -181,6 +192,9 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { timeout: timeout, useGzip: useGzip, crash: crash, + hostname: hostname, + user: user, + password: password, }, nil } @@ -197,11 +211,13 @@ func (a *HTTPAdapter) Stream(logstream chan *router.Message) { // Flush if the buffer is at capacity if len(a.buffer) >= cap(a.buffer) { + debug("full - flush") a.flushHttp("full") } case <-a.timer.C: // Timeout, flush + debug("timeout - flush") a.flushHttp("timeout") } } @@ -224,6 +240,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { if len(a.buffer) < 1 { return } + debug("processing ", len(a.buffer), " log entries...") // Capture the buffer and make a new one a.bufferMutex.Lock() @@ -235,6 +252,10 @@ func (a *HTTPAdapter) flushHttp(reason string) { messages := make([]string, 0, len(buffer)) for i := range buffer { m := buffer[i] + hostname := a.hostname + if (hostname == "") { + hostname = m.Container.Config.Hostname + } httpMessage := HTTPMessage{ Message: m.Data, Time: m.Time.Format(time.RFC3339), @@ -242,7 +263,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { Name: m.Container.Name, ID: m.Container.ID, Image: m.Container.Config.Image, - Hostname: m.Container.Config.Hostname, + Hostname: hostname, } message, err := json.Marshal(httpMessage) if err != nil { @@ -256,33 +277,49 @@ func (a *HTTPAdapter) flushHttp(reason string) { payload := strings.Join(messages, "\n") go func() { - - // Create the request and send it on its way - request := createRequest(a.url, a.useGzip, payload) start := time.Now() - response, err := a.client.Do(request) - if err != nil { - debug("http - error on client.Do:", err, a.url) - // TODO @raychaser - now what? - if a.crash { - die("http - error on client.Do:", err, a.url) + try := 0 + max_tries := 5 + for { + // Create the request and send it on its way + request := createRequest(a.url, a.user, a.password, a.useGzip, payload) + start = time.Now() + response, err := a.client.Do(request) + if err != nil { + debug("http - error on client.Do:", err, a.url) + // TODO @raychaser - now what? + if a.crash { + die("http - error on client.Do:", err, a.url) + } else { + log.Println("http: error on client.Do:", err) + } } else { - debug("http: error on client.Do:", err) + if response.StatusCode != 200 { + log.Println("http: response not 200 but", response.StatusCode) + // TODO @raychaser - now what? + if a.crash { + die("http: response not 200 but", response.StatusCode) + } + } + + // Make sure the entire response body is read so the HTTP + // connection can be reused + io.Copy(ioutil.Discard, response.Body) + response.Body.Close() + if (err == nil && response.StatusCode == 200) { + break + } } - } - if response.StatusCode != 200 { - debug("http: response not 200 but", response.StatusCode) - // TODO @raychaser - now what? - if a.crash { - die("http: response not 200 but", response.StatusCode) + + if (try < max_tries) { + log.Println("retrying after", math.Exp2(float64(2 * try + 1)), "s...") + time.Sleep(time.Second * time.Duration(math.Exp2(float64(2 * try + 1)))) + } else { + log.Println("stop retrying. ", len(buffer), " log entries lost") + break } + try++ } - - // Make sure the entire response body is read so the HTTP - // connection can be reused - io.Copy(ioutil.Discard, response.Body) - response.Body.Close() - // Bookkeeping, logging timeAll := time.Since(start) a.totalMessageCount += len(messages) @@ -292,7 +329,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { } // Create the request based on whether GZIP compression is to be used -func createRequest(url string, useGzip bool, payload string) *http.Request { +func createRequest(url string, user string, password string, useGzip bool, payload string) *http.Request { var request *http.Request if useGzip { gzipBuffer := new(bytes.Buffer) @@ -323,6 +360,9 @@ func createRequest(url string, useGzip bool, payload string) *http.Request { die("", "http: error on http.NewRequest:", err, url) } } + if (user != "" && password != "") { + request.SetBasicAuth(user, password) + } return request }