From ece1e69f36605a76d3177644b9bee2d27597987d Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Sat, 11 May 2019 23:15:27 +0100 Subject: [PATCH 1/2] Tombstone services before sending updates Services from expired servers need to be tombstoned before informing the listeners about them. --- catalog/services_state.go | 2 +- catalog/services_state_test.go | 12 ++++++++++++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/catalog/services_state.go b/catalog/services_state.go index db688418..6d357bb2 100644 --- a/catalog/services_state.go +++ b/catalog/services_state.go @@ -171,8 +171,8 @@ func (state *ServicesState) ExpireServer(hostname string) { for _, svc := range state.Servers[hostname].Services { previousStatus := svc.Status - state.ServiceChanged(svc, previousStatus, svc.Updated) svc.Tombstone() + state.ServiceChanged(svc, previousStatus, svc.Updated) tombstones = append(tombstones, *svc) } diff --git a/catalog/services_state_test.go b/catalog/services_state_test.go index ac848f5a..d164edb2 100644 --- a/catalog/services_state_test.go +++ b/catalog/services_state_test.go @@ -670,6 +670,11 @@ func Test_ClusterMembershipManagement(t *testing.T) { state.AddServiceEntry(service1) state.AddServiceEntry(service2) + dummyListener := mockListener{ + events: make(chan ChangeEvent, len(state.Servers[hostname].Services)), + } + state.AddListener(&dummyListener) + go state.ExpireServer(hostname) expired := <-state.Broadcasts @@ -677,6 +682,13 @@ func Test_ClusterMembershipManagement(t *testing.T) { // Timestamps chagne when tombstoning, so regex match So(expired[0], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") So(expired[1], ShouldMatch, "^{\"ID\":\"deadbeef.*\"Status\":1}$") + + Convey("and sends the tombstones to any listener", func() { + for i := 0; i < len(state.Servers[hostname].Services); i++ { + changeEvent := <-dummyListener.Chan() + So(changeEvent.Service.Status, ShouldEqual, service.TOMBSTONE) + } + }) }) Convey("does not announce services for hosts with none", func() { From 7211a4ffebcaafc163525dffdea7d32915a7c0fb Mon Sep 17 00:00:00 2001 From: Mihai Todor Date: Tue, 14 May 2019 14:59:36 +0100 Subject: [PATCH 2/2] Make sure listeners get all updates when a server is expired Use the StateChangedEvent.ChangeEvent.Time to figure out if the current state of the receiver is older than the received event. We can't rely on evt.State.LastChanged for this because, when expiring a server, we get multiple events coming in on UrlListener.eventChannel and each StateChangedEvent created will contain a snapshot of the same state. This will make the receiver process only the first incoming event and drop the rest, because rcvr.CurrentState.LastChanged = evt.State.LastChanged after the first event gets processed. --- catalog/url_listener_test.go | 77 ++++++++++++++++++++++++++++-------- receiver/http.go | 2 +- receiver/http_test.go | 5 +++ 3 files changed, 67 insertions(+), 17 deletions(-) diff --git a/catalog/url_listener_test.go b/catalog/url_listener_test.go index 86c5dd98..c4757dd5 100644 --- a/catalog/url_listener_test.go +++ b/catalog/url_listener_test.go @@ -1,9 +1,12 @@ package catalog import ( + "encoding/json" + "io/ioutil" "net/http" "net/url" "testing" + "time" "github.com/Nitro/sidecar/service" "github.com/relistan/go-director" @@ -47,36 +50,57 @@ func Test_prepareCookieJar(t *testing.T) { } func Test_Listen(t *testing.T) { - Convey("Listen()", t, func() { + Convey("Listen()", t, func(c C) { url := "http://beowulf.example.com" - - httpmock.RegisterResponder( - "POST", url, - func(req *http.Request) (*http.Response, error) { - return httpmock.NewStringResponse(500, "so bad!"), nil - }, - ) - - httpmock.Activate() - listener := NewUrlListener(url, false) - errors := make(chan error) - listener.looper = director.NewFreeLooper(1, errors) - hostname := "grendel" svcId1 := "deadbeef123" service1 := service.Service{ID: svcId1, Hostname: hostname} + svcId2 := "ecgtheow" + service2 := service.Service{ID: svcId2, Hostname: hostname} state := NewServicesState() state.Hostname = hostname - state.AddServiceEntry(service1) - state.Servers[hostname].Services[service1.ID].Tombstone() + postShouldErr := false + var changeEventTime time.Time + httpmock.RegisterResponder( + "POST", url, + func(req *http.Request) (*http.Response, error) { + if postShouldErr { + return httpmock.NewStringResponse(500, "so bad!"), nil + } + + bodyBytes, err := ioutil.ReadAll(req.Body) + c.So(err, ShouldBeNil) + + var evt StateChangedEvent + err = json.Unmarshal(bodyBytes, &evt) + c.So(err, ShouldBeNil) + c.So(evt.ChangeEvent.PreviousStatus, ShouldEqual, service.ALIVE) + + // Make sure each new event comes in with a different timestamp + c.So(evt.ChangeEvent.Time, ShouldNotEqual, changeEventTime) + changeEventTime = evt.ChangeEvent.Time + + return httpmock.NewBytesResponse(200, nil), nil + }, + ) + httpmock.Activate() Reset(func() { httpmock.DeactivateAndReset() }) Convey("handles a bad post", func() { + postShouldErr = true + + state.AddServiceEntry(service1) + state.Servers[hostname].Services[service1.ID].Tombstone() + + listener := NewUrlListener(url, false) + errors := make(chan error) + listener.looper = director.NewFreeLooper(1, errors) + listener.eventChannel <- ChangeEvent{} listener.Retries = 0 listener.Watch(state) @@ -85,5 +109,26 @@ func Test_Listen(t *testing.T) { So(err, ShouldBeNil) So(len(errors), ShouldEqual, 0) }) + + Convey("gets all updates when a server expires", func() { + state.AddServiceEntry(service1) + state.AddServiceEntry(service2) + + listener := NewUrlListener(url, false) + errors := make(chan error) + // Do two iterations: One for each service from the expired server + listener.looper = director.NewFreeLooper( + len(state.Servers[hostname].Services), errors) + listener.Retries = 0 + + listener.Watch(state) + + state.ExpireServer(hostname) + + // Block until both iterations are done + err := listener.looper.Wait() + So(err, ShouldBeNil) + So(len(errors), ShouldEqual, 0) + }) }) } diff --git a/receiver/http.go b/receiver/http.go index 13d29e0d..5deee5c2 100644 --- a/receiver/http.go +++ b/receiver/http.go @@ -44,7 +44,7 @@ func UpdateHandler(response http.ResponseWriter, req *http.Request, rcvr *Receiv rcvr.StateLock.Lock() defer rcvr.StateLock.Unlock() - if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.State.LastChanged) { + if rcvr.CurrentState == nil || rcvr.CurrentState.LastChanged.Before(evt.ChangeEvent.Time) { rcvr.CurrentState = evt.State rcvr.LastSvcChanged = &evt.ChangeEvent.Service diff --git a/receiver/http_test.go b/receiver/http_test.go index b012b188..8e441401 100644 --- a/receiver/http_test.go +++ b/receiver/http_test.go @@ -86,6 +86,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -120,6 +121,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -178,6 +180,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -207,6 +210,7 @@ func Test_updateHandler(t *testing.T) { Status: service.ALIVE, }, PreviousStatus: service.TOMBSTONE, + Time: evtState.LastChanged, }, } @@ -239,6 +243,7 @@ func Test_updateHandler(t *testing.T) { Status: service.DRAINING, }, PreviousStatus: service.ALIVE, + Time: evtState.LastChanged, }, }