diff --git a/catalog/services_state.go b/catalog/services_state.go index db688418..6d357bb2 100644 --- a/catalog/services_state.go +++ b/catalog/services_state.go @@ -171,8 +171,8 @@ func (state *ServicesState) ExpireServer(hostname string) { for _, svc := range state.Servers[hostname].Services { previousStatus := svc.Status - state.ServiceChanged(svc, previousStatus, svc.Updated) svc.Tombstone() + state.ServiceChanged(svc, previousStatus, svc.Updated) tombstones = append(tombstones, *svc) } diff --git a/catalog/services_state_test.go b/catalog/services_state_test.go index ac848f5a..d164edb2 100644 --- a/catalog/services_state_test.go +++ b/catalog/services_state_test.go @@ -670,6 +670,11 @@ func Test_ClusterMembershipManagement(t *testing.T) { state.AddServiceEntry(service1) state.AddServiceEntry(service2) + dummyListener := mockListener{ + events: make(chan ChangeEvent, len(state.Servers[hostname].Services)), + } + state.AddListener(&dummyListener) + go state.ExpireServer(hostname) expired := <-state.Broadcasts @@ -677,6 +682,13 @@ func Test_ClusterMembershipManagement(t *testing.T) { // Timestamps chagne when tombstoning, so regex match So(expired[0], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") So(expired[1], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") + + Convey("and sends the tombstones to any listener", func() { + for i := 0; i < len(state.Servers[hostname].Services); i++ { + changeEvent := <-dummyListener.Chan() + So(changeEvent.Service.Status, ShouldEqual, service.TOMBSTONE) + } + }) }) Convey("does not announce services for hosts with none", func() { diff --git a/catalog/url_listener_test.go b/catalog/url_listener_test.go index 86c5dd98..c4757dd5 100644 --- a/catalog/url_listener_test.go +++ b/catalog/url_listener_test.go @@ -1,9 +1,12 @@ package catalog import ( + "encoding/json" + "io/ioutil" "net/http" "net/url" "testing" + "time" "github.com/Nitro/sidecar/service" "github.com/relistan/go-director" @@ -47,36 +50,57 @@ func Test_prepareCookieJar(t *testing.T) { } func Test_Listen(t *testing.T) { - Convey("Listen()", t, func() { + Convey("Listen()", t, func(c C) { url := "http://beowulf.example.com" - - httpmock.RegisterResponder( - "POST", url, - func(req *http.Request) (*http.Response, error) { - return httpmock.NewStringResponse(500, "so bad!"), nil - }, - ) - - httpmock.Activate() - listener := NewUrlListener(url, false) - errors := make(chan error) - listener.looper = director.NewFreeLooper(1, errors) - hostname := "grendel" svcId1 := "deadbeef123" service1 := service.Service{ID: svcId1, Hostname: hostname} + svcId2 := "ecgtheow" + service2 := service.Service{ID: svcId2, Hostname: hostname} state := NewServicesState() state.Hostname = hostname - state.AddServiceEntry(service1) - state.Servers[hostname].Services[service1.ID].Tombstone() + postShouldErr := false + var changeEventTime time.Time + httpmock.RegisterResponder( + "POST", url, + func(req *http.Request) (*http.Response, error) { + if postShouldErr { + return httpmock.NewStringResponse(500, "so bad!"), nil + } + + bodyBytes, err := ioutil.ReadAll(req.Body) + c.So(err, ShouldBeNil) + + var evt StateChangedEvent + err = json.Unmarshal(bodyBytes, &evt) + c.So(err, ShouldBeNil) + c.So(evt.ChangeEvent.PreviousStatus, ShouldEqual, service.ALIVE) + + // Make sure each new event comes in with a different timestamp + c.So(evt.ChangeEvent.Time, ShouldNotEqual, changeEventTime) + changeEventTime = evt.ChangeEvent.Time + + return httpmock.NewBytesResponse(200, nil), nil + }, + ) + httpmock.Activate() Reset(func() { httpmock.DeactivateAndReset() }) Convey("handles a bad post", func() { + postShouldErr = true + + state.AddServiceEntry(service1) + state.Servers[hostname].Services[service1.ID].Tombstone() + + listener := NewUrlListener(url, false) + errors := make(chan error) + listener.looper = director.NewFreeLooper(1, errors) + listener.eventChannel <- ChangeEvent{} listener.Retries = 0 listener.Watch(state) @@ -85,5 +109,26 @@ func Test_Listen(t *testing.T) { So(err, ShouldBeNil) So(len(errors), ShouldEqual, 0) }) + + Convey("gets all updates when a server expires", func() { + state.AddServiceEntry(service1) + state.AddServiceEntry(service2) + + listener := NewUrlListener(url, false) + errors := make(chan error) + // Do two iterations: One for each service from the expired server + listener.looper = director.NewFreeLooper( + len(state.Servers[hostname].Services), errors) + listener.Retries = 0 + + listener.Watch(state) + + state.ExpireServer(hostname) + + // Block until both iterations are done + err := listener.looper.Wait() + So(err, ShouldBeNil) + So(len(errors), ShouldEqual, 0) + }) }) } diff --git a/receiver/http.go b/receiver/http.go index 13d29e0d..5deee5c2 100644 --- a/receiver/http.go +++ b/receiver/http.go @@ -44,7 +44,7 @@ func UpdateHandler(response http.ResponseWriter, req *http.Request, rcvr *Receiv rcvr.StateLock.Lock() defer rcvr.StateLock.Unlock() - if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.State.LastChanged) { + if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.ChangeEvent.Time) { rcvr.CurrentState = evt.State rcvr.LastSvcChanged = &evt.ChangeEvent.Service diff --git a/receiver/http_test.go b/receiver/http_test.go index b012b188..8e441401 100644 --- a/receiver/http_test.go +++ b/receiver/http_test.go @@ -86,6 +86,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -120,6 +121,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -178,6 +180,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -207,6 +210,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -239,6 +243,7 @@ func Test_updateHandler(t *testing.T) { Status: service.DRAINING, }, PreviousStatus: service.ALIVE, + Time: evtState.LastChanged, }, }