Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ In addition to the `http.path` parameter discussed above, the following paramete

If `http.gzip` is set to true, the logs will be compressed with GZIP. This is off by default, but for example supported by Sumo Logic.

Override docker hostname with `hostname` parameter

### Basic Http Authentication
Use `http.user` and `http.password` parameters to set Authorization header



### Development

Expand Down
90 changes: 65 additions & 25 deletions http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"strings"
"sync"
"time"
"math"

"github.com/gliderlabs/logspout/router"
)
Expand Down Expand Up @@ -101,7 +102,10 @@ type HTTPAdapter struct {
totalMessageCount int
bufferMutex sync.Mutex
useGzip bool
crash bool
crash bool
hostname string
user string
password string
}

// NewHTTPAdapter creates an HTTPAdapter
Expand All @@ -110,8 +114,12 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) {
// Figure out the URI and create the HTTP client
defaultPath := ""
path := getStringParameter(route.Options, "http.path", defaultPath)
user := getStringParameter(route.Options, "http.user", "")
password := getStringParameter(route.Options, "http.password", "")
endpointUrl := fmt.Sprintf("%s://%s%s", route.Adapter, route.Address, path)
debug("http: url:", endpointUrl)
debug("user:",user)
debug("password:", password)
transport := &http.Transport{}
transport.Dial = dial

Expand Down Expand Up @@ -170,6 +178,9 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) {
debug("http: don't crash, keep going")
}

// Override docker hostname with a custom hostname
hostname := getStringParameter(route.Options, "hostname", "")

// Make the HTTP adapter
return &HTTPAdapter{
route: route,
Expand All @@ -181,6 +192,9 @@ func NewHTTPAdapter(route *router.Route) (router.LogAdapter, error) {
timeout: timeout,
useGzip: useGzip,
crash: crash,
hostname: hostname,
user: user,
password: password,
}, nil
}

Expand All @@ -197,11 +211,13 @@ func (a *HTTPAdapter) Stream(logstream chan *router.Message) {

// Flush if the buffer is at capacity
if len(a.buffer) >= cap(a.buffer) {
debug("full - flush")
a.flushHttp("full")
}
case <-a.timer.C:

// Timeout, flush
debug("timeout - flush")
a.flushHttp("timeout")
}
}
Expand All @@ -224,6 +240,7 @@ func (a *HTTPAdapter) flushHttp(reason string) {
if len(a.buffer) < 1 {
return
}
debug("processing ", len(a.buffer), " log entries...")

// Capture the buffer and make a new one
a.bufferMutex.Lock()
Expand All @@ -235,14 +252,18 @@ func (a *HTTPAdapter) flushHttp(reason string) {
messages := make([]string, 0, len(buffer))
for i := range buffer {
m := buffer[i]
hostname := a.hostname
if (hostname == "") {
hostname = m.Container.Config.Hostname
}
httpMessage := HTTPMessage{
Message: m.Data,
Time: m.Time.Format(time.RFC3339),
Source: m.Source,
Name: m.Container.Name,
ID: m.Container.ID,
Image: m.Container.Config.Image,
Hostname: m.Container.Config.Hostname,
Hostname: hostname,
}
message, err := json.Marshal(httpMessage)
if err != nil {
Expand All @@ -256,33 +277,49 @@ func (a *HTTPAdapter) flushHttp(reason string) {
payload := strings.Join(messages, "\n")

go func() {

// Create the request and send it on its way
request := createRequest(a.url, a.useGzip, payload)
start := time.Now()
response, err := a.client.Do(request)
if err != nil {
debug("http - error on client.Do:", err, a.url)
// TODO @raychaser - now what?
if a.crash {
die("http - error on client.Do:", err, a.url)
try := 0
max_tries := 5
for {
// Create the request and send it on its way
request := createRequest(a.url, a.user, a.password, a.useGzip, payload)
start = time.Now()
response, err := a.client.Do(request)
if err != nil {
debug("http - error on client.Do:", err, a.url)
// TODO @raychaser - now what?
if a.crash {
die("http - error on client.Do:", err, a.url)
} else {
log.Println("http: error on client.Do:", err)
}
} else {
debug("http: error on client.Do:", err)
if response.StatusCode != 200 {
log.Println("http: response not 200 but", response.StatusCode)
// TODO @raychaser - now what?
if a.crash {
die("http: response not 200 but", response.StatusCode)
}
}

// Make sure the entire response body is read so the HTTP
// connection can be reused
io.Copy(ioutil.Discard, response.Body)
response.Body.Close()
if (err == nil && response.StatusCode == 200) {
break
}
}
}
if response.StatusCode != 200 {
debug("http: response not 200 but", response.StatusCode)
// TODO @raychaser - now what?
if a.crash {
die("http: response not 200 but", response.StatusCode)

if (try < max_tries) {
log.Println("retrying after", math.Exp2(float64(2 * try + 1)), "s...")
time.Sleep(time.Second * time.Duration(math.Exp2(float64(2 * try + 1))))
} else {
log.Println("stop retrying. ", len(buffer), " log entries lost")
break
}
try++
}

// Make sure the entire response body is read so the HTTP
// connection can be reused
io.Copy(ioutil.Discard, response.Body)
response.Body.Close()

// Bookkeeping, logging
timeAll := time.Since(start)
a.totalMessageCount += len(messages)
Expand All @@ -292,7 +329,7 @@ func (a *HTTPAdapter) flushHttp(reason string) {
}

// Create the request based on whether GZIP compression is to be used
func createRequest(url string, useGzip bool, payload string) *http.Request {
func createRequest(url string, user string, password string, useGzip bool, payload string) *http.Request {
var request *http.Request
if useGzip {
gzipBuffer := new(bytes.Buffer)
Expand Down Expand Up @@ -323,6 +360,9 @@ func createRequest(url string, useGzip bool, payload string) *http.Request {
die("", "http: error on http.NewRequest:", err, url)
}
}
if (user != "" && password != "") {
request.SetBasicAuth(user, password)
}
return request
}

Expand Down