From bfaa6985f3aef2c2057c7e9a3ba62e94ce9023d0 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 30 Aug 2016 11:47:01 +0200 Subject: [PATCH 01/12] Add support for HTTP Basic Authentication --- README.md | 3 +++ http/http.go | 15 +++++++++++++-- 2 files changed, 16 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 15c79b2..9ca099c 100644 --- a/README.md +++ b/README.md @@ -45,6 +45,9 @@ In addition to the `http.path` parameter discussed above, the following paramete If `http.gzip` is set to true, the logs will be compressed with GZIP. This is off by default, but for example supported by Sumo Logic. +### Basic Http Authentication +Use `http.user` and `http.password` parameters to set Authorization header + ### Development This assumes that the unique token for the Sumo Logic HTTP collector endpoint is in the environment as ```$SUMO_HTTP_TOKEN```. diff --git a/http/http.go b/http/http.go index d01fffa..65e1f5c 100644 --- a/http/http.go +++ b/http/http.go @@ -102,6 +102,8 @@ type HTTPAdapter struct { bufferMutex sync.Mutex useGzip bool crash bool + user string + password string } // NewHTTPAdapter creates an HTTPAdapter @@ -110,8 +112,12 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { // Figure out the URI and create the HTTP client defaultPath := "" path := getStringParameter(route.Options, "http.path", defaultPath) + user := getStringParameter(route.Options, "http.user", "") + password := getStringParameter(route.Options, "http.password", "") endpointUrl := fmt.Sprintf("%s://%s%s", route.Adapter, route.Address, path) debug("http: url:", endpointUrl) + debug("user:",user) + debug("password:", password) transport := &http.Transport{} transport.Dial = dial @@ -181,6 +187,8 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { timeout: timeout, useGzip: useGzip, crash: crash, + user: user, + password: password, }, nil } @@ -258,7 +266,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { go func() { // Create the request and send it on its way - request := createRequest(a.url, a.useGzip, payload) + request := createRequest(a.url, a.user, a.password, a.useGzip, payload) start := time.Now() response, err := a.client.Do(request) if err != nil { @@ -292,7 +300,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { } // Create the request based on whether GZIP compression is to be used -func createRequest(url string, useGzip bool, payload string) *http.Request { +func createRequest(url string, user string, password string, useGzip bool, payload string) *http.Request { var request *http.Request if useGzip { gzipBuffer := new(bytes.Buffer) @@ -323,6 +331,9 @@ func createRequest(url string, useGzip bool, payload string) *http.Request { die("", "http: error on http.NewRequest:", err, url) } } + if (user != "" && password != "") { + request.SetBasicAuth(user, password) + } return request } From 30105042be1a1d4dca00fc9208489ebd11b00dcd Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 30 Aug 2016 17:54:45 +0200 Subject: [PATCH 02/12] Add retry if request failed --- http/http.go | 53 +++++++++++++++++++++++++++++----------------------- 1 file changed, 30 insertions(+), 23 deletions(-) diff --git a/http/http.go b/http/http.go index 65e1f5c..6be0e39 100644 --- a/http/http.go +++ b/http/http.go @@ -265,32 +265,39 @@ func (a *HTTPAdapter) flushHttp(reason string) { go func() { - // Create the request and send it on its way - request := createRequest(a.url, a.user, a.password, a.useGzip, payload) - start := time.Now() - response, err := a.client.Do(request) - if err != nil { - debug("http - error on client.Do:", err, a.url) - // TODO @raychaser - now what? - if a.crash { - die("http - error on client.Do:", err, a.url) - } else { - debug("http: error on client.Do:", err) + for { + // Create the request and send it on its way + request := createRequest(a.url, a.user, a.password, a.useGzip, payload) + start := time.Now() + response, err := a.client.Do(request) + if err != nil { + debug("http - error on client.Do:", err, a.url) + // TODO @raychaser - now what? + if a.crash { + die("http - error on client.Do:", err, a.url) + } else { + log.Println("http: error on client.Do:", err) + } } - } - if response.StatusCode != 200 { - debug("http: response not 200 but", response.StatusCode) - // TODO @raychaser - now what? - if a.crash { - die("http: response not 200 but", response.StatusCode) + if response.StatusCode != 200 { + log.Printl("http: response not 200 but", response.StatusCode) + // TODO @raychaser - now what? + if a.crash { + die("http: response not 200 but", response.StatusCode) + } } - } - - // Make sure the entire response body is read so the HTTP - // connection can be reused - io.Copy(ioutil.Discard, response.Body) - response.Body.Close() + // Make sure the entire response body is read so the HTTP + // connection can be reused + io.Copy(ioutil.Discard, response.Body) + response.Body.Close() + if (err == nil && response.StatusCode == 200) { + break + } else { + log.Println("retrying after 2s...") + time.Sleep(time.Second * 2) + } + } // Bookkeeping, logging timeAll := time.Since(start) a.totalMessageCount += len(messages) From 02e8eddcc753935cc6facbc23a0eff16bdde8c8d Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 30 Aug 2016 18:01:09 +0200 Subject: [PATCH 03/12] Add parameter to override hostname parameter --- README.md | 3 +++ http/http.go | 13 +++++++++++-- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9ca099c..d8c8148 100644 --- a/README.md +++ b/README.md @@ -44,10 +44,13 @@ In addition to the `http.path` parameter discussed above, the following paramete If `http.gzip` is set to true, the logs will be compressed with GZIP. This is off by default, but for example supported by Sumo Logic. +Override docker hostname with `hostname` parameter ### Basic Http Authentication Use `http.user` and `http.password` parameters to set Authorization header + + ### Development This assumes that the unique token for the Sumo Logic HTTP collector endpoint is in the environment as ```$SUMO_HTTP_TOKEN```. diff --git a/http/http.go b/http/http.go index 6be0e39..76a82ef 100644 --- a/http/http.go +++ b/http/http.go @@ -101,7 +101,8 @@ type HTTPAdapter struct { totalMessageCount int bufferMutex sync.Mutex useGzip bool - crash bool + crash bool + hostname string user string password string } @@ -176,6 +177,9 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { debug("http: don't crash, keep going") } + // Override docker hostname with a custom hostname + hostname := getStringParameter(route.Options, "hostname", "") + // Make the HTTP adapter return &HTTPAdapter{ route: route, @@ -187,6 +191,7 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) { timeout: timeout, useGzip: useGzip, crash: crash, + hostname: hostname, user: user, password: password, }, nil @@ -243,6 +248,10 @@ func (a *HTTPAdapter) flushHttp(reason string) { messages := make([]string, 0, len(buffer)) for i := range buffer { m := buffer[i] + hostname := a.hostname + if (hostname == "") { + hostname = m.Container.Config.Hostname + } httpMessage := HTTPMessage{ Message: m.Data, Time: m.Time.Format(time.RFC3339), @@ -250,7 +259,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { Name: m.Container.Name, ID: m.Container.ID, Image: m.Container.Config.Image, - Hostname: m.Container.Config.Hostname, + Hostname: hostname, } message, err := json.Marshal(httpMessage) if err != nil { From 0c8eb4b50e52429176719d0f3e40b2e91d616752 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 30 Aug 2016 18:15:52 +0200 Subject: [PATCH 04/12] Small fixes --- http/http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/http/http.go b/http/http.go index 76a82ef..9397478 100644 --- a/http/http.go +++ b/http/http.go @@ -273,11 +273,11 @@ func (a *HTTPAdapter) flushHttp(reason string) { payload := strings.Join(messages, "\n") go func() { - + start := time.Now() for { // Create the request and send it on its way request := createRequest(a.url, a.user, a.password, a.useGzip, payload) - start := time.Now() + start = time.Now() response, err := a.client.Do(request) if err != nil { debug("http - error on client.Do:", err, a.url) @@ -289,7 +289,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { } } if response.StatusCode != 200 { - log.Printl("http: response not 200 but", response.StatusCode) + log.Println("http: response not 200 but", response.StatusCode) // TODO @raychaser - now what? if a.crash { die("http: response not 200 but", response.StatusCode) From dd0ee230e1d493840d5c5e02534a4f1f2f237602 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 3 Jan 2017 14:36:50 +0100 Subject: [PATCH 05/12] fix nil pointer dereference (response is nil when err != nil) --- http/http.go | 32 ++++++++++++++++---------------- 1 file changed, 16 insertions(+), 16 deletions(-) diff --git a/http/http.go b/http/http.go index 9397478..ecf29ce 100644 --- a/http/http.go +++ b/http/http.go @@ -287,25 +287,25 @@ func (a *HTTPAdapter) flushHttp(reason string) { } else { log.Println("http: error on client.Do:", err) } - } - if response.StatusCode != 200 { - log.Println("http: response not 200 but", response.StatusCode) - // TODO @raychaser - now what? - if a.crash { - die("http: response not 200 but", response.StatusCode) + } else { + if response.StatusCode != 200 { + log.Println("http: response not 200 but", response.StatusCode) + // TODO @raychaser - now what? + if a.crash { + die("http: response not 200 but", response.StatusCode) + } } - } - // Make sure the entire response body is read so the HTTP - // connection can be reused - io.Copy(ioutil.Discard, response.Body) - response.Body.Close() - if (err == nil && response.StatusCode == 200) { - break - } else { - log.Println("retrying after 2s...") - time.Sleep(time.Second * 2) + // Make sure the entire response body is read so the HTTP + // connection can be reused + io.Copy(ioutil.Discard, response.Body) + response.Body.Close() + if (err == nil && response.StatusCode == 200) { + break + } } + log.Println("retrying after 2s...") + time.Sleep(time.Second * 2) } // Bookkeeping, logging timeAll := time.Since(start) From 88ea4ccd5f3a04860153552ad4d4a510a400f7e7 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 7 Feb 2017 17:48:07 +0100 Subject: [PATCH 06/12] add try exponential delay and max tries --- http/http.go | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/http/http.go b/http/http.go index ecf29ce..ffa6be2 100644 --- a/http/http.go +++ b/http/http.go @@ -210,11 +210,13 @@ func (a *HTTPAdapter) Stream(logstream chan *router.Message) { // Flush if the buffer is at capacity if len(a.buffer) >= cap(a.buffer) { + debug("full - flush") a.flushHttp("full") } case <-a.timer.C: // Timeout, flush + debug("timeout - flush") a.flushHttp("timeout") } } @@ -274,6 +276,8 @@ func (a *HTTPAdapter) flushHttp(reason string) { go func() { start := time.Now() + try := 0 + max_tries := 5 for { // Create the request and send it on its way request := createRequest(a.url, a.user, a.password, a.useGzip, payload) @@ -304,8 +308,15 @@ func (a *HTTPAdapter) flushHttp(reason string) { break } } - log.Println("retrying after 2s...") - time.Sleep(time.Second * 2) + + if (try < max_tries) { + log.Println("retrying after", 2 ** (try + 1), "s...") + time.Sleep(time.Second * 2 ** (try + 1)) + } else { + log.Println("stop retrying - logs lost") + break + } + try = try + 1 } // Bookkeeping, logging timeAll := time.Since(start) From 1838ec5f95662b8f7bea3aca58d0de3b983d2a4c Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Tue, 7 Feb 2017 18:12:04 +0100 Subject: [PATCH 07/12] increment --- http/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/http.go b/http/http.go index ffa6be2..2582cd8 100644 --- a/http/http.go +++ b/http/http.go @@ -316,7 +316,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { log.Println("stop retrying - logs lost") break } - try = try + 1 + try++ } // Bookkeeping, logging timeAll := time.Since(start) From 8e882176a419fe9f45a3f225fa3ed51a4da0e143 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Wed, 8 Feb 2017 12:06:11 +0100 Subject: [PATCH 08/12] fix power --- http/http.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/http/http.go b/http/http.go index 2582cd8..55fa8ab 100644 --- a/http/http.go +++ b/http/http.go @@ -310,8 +310,8 @@ func (a *HTTPAdapter) flushHttp(reason string) { } if (try < max_tries) { - log.Println("retrying after", 2 ** (try + 1), "s...") - time.Sleep(time.Second * 2 ** (try + 1)) + log.Println("retrying after", 2 ^ (try + 1), "s...") + time.Sleep(time.Second * 2 ^ (try + 1)) } else { log.Println("stop retrying - logs lost") break From b27b2ca2eaee0db8ea4f5e30b098b2cfdd52ceb4 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Wed, 8 Feb 2017 14:02:36 +0100 Subject: [PATCH 09/12] fix exp2 --- http/http.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/http/http.go b/http/http.go index 55fa8ab..80e9c74 100644 --- a/http/http.go +++ b/http/http.go @@ -17,6 +17,7 @@ import ( "strings" "sync" "time" + "math" "github.com/gliderlabs/logspout/router" ) @@ -310,8 +311,8 @@ func (a *HTTPAdapter) flushHttp(reason string) { } if (try < max_tries) { - log.Println("retrying after", 2 ^ (try + 1), "s...") - time.Sleep(time.Second * 2 ^ (try + 1)) + log.Println("retrying after", math.Exp2(try + 1.), "s...") + time.Sleep(time.Second * time.Duration(math.Exp2(try + 1.))) } else { log.Println("stop retrying - logs lost") break From 6ca49d3e2ad93aaf446a91f66aa2bf61d1a8c360 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Wed, 8 Feb 2017 14:16:15 +0100 Subject: [PATCH 10/12] fix try type --- http/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/http/http.go b/http/http.go index 80e9c74..afcaf2c 100644 --- a/http/http.go +++ b/http/http.go @@ -277,7 +277,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { go func() { start := time.Now() - try := 0 + try := 0. max_tries := 5 for { // Create the request and send it on its way From 349dc85b1b2f5dffeac72cb49d957c120500a373 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Wed, 8 Feb 2017 14:35:28 +0100 Subject: [PATCH 11/12] fix casts --- http/http.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/http/http.go b/http/http.go index afcaf2c..2b9fc0f 100644 --- a/http/http.go +++ b/http/http.go @@ -277,7 +277,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { go func() { start := time.Now() - try := 0. + try := 0 max_tries := 5 for { // Create the request and send it on its way @@ -311,8 +311,8 @@ func (a *HTTPAdapter) flushHttp(reason string) { } if (try < max_tries) { - log.Println("retrying after", math.Exp2(try + 1.), "s...") - time.Sleep(time.Second * time.Duration(math.Exp2(try + 1.))) + log.Println("retrying after", math.Exp2(float64(try + 1)), "s...") + time.Sleep(time.Second * time.Duration(math.Exp2(float64(try + 1)))) } else { log.Println("stop retrying - logs lost") break From 705a0cb8416a6172c13f94e9d7ca69c69376e2a3 Mon Sep 17 00:00:00 2001 From: Fabien FLEUREAU Date: Wed, 8 Feb 2017 15:58:18 +0100 Subject: [PATCH 12/12] more logs --- http/http.go | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/http/http.go b/http/http.go index 2b9fc0f..7bb40fd 100644 --- a/http/http.go +++ b/http/http.go @@ -240,6 +240,7 @@ func (a *HTTPAdapter) flushHttp(reason string) { if len(a.buffer) < 1 { return } + debug("processing ", len(a.buffer), " log entries...") // Capture the buffer and make a new one a.bufferMutex.Lock() @@ -311,10 +312,10 @@ func (a *HTTPAdapter) flushHttp(reason string) { } if (try < max_tries) { - log.Println("retrying after", math.Exp2(float64(try + 1)), "s...") - time.Sleep(time.Second * time.Duration(math.Exp2(float64(try + 1)))) + log.Println("retrying after", math.Exp2(float64(2 * try + 1)), "s...") + time.Sleep(time.Second * time.Duration(math.Exp2(float64(2 * try + 1)))) } else { - log.Println("stop retrying - logs lost") + log.Println("stop retrying. ", len(buffer), " log entries lost") break } try++