From a71d2a23696a8bd06158523fe252861b9aa980e4 Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Thu, 27 Jul 2023 16:43:07 +0100 Subject: [PATCH 1/3] Heartbeat service in smart layer N.B. Failing test on stopping the heartbeater gracefully. Signed-off-by: Alex Blease --- pkg/raw/client_types.go | 4 ++ pkg/stream/heartbeater.go | 63 ++++++++++++++++++++++ pkg/stream/heartbeater_test.go | 87 ++++++++++++++++++++++++++++++ pkg/stream/locator.go | 28 ++++++++-- pkg/stream/locator_test.go | 59 +++++++++++++++++++- pkg/stream/mock_raw_client_test.go | 2 + 6 files changed, 236 insertions(+), 7 deletions(-) create mode 100644 pkg/stream/heartbeater.go create mode 100644 pkg/stream/heartbeater_test.go diff --git a/pkg/raw/client_types.go b/pkg/raw/client_types.go index 0e154593..0c8acefb 100644 --- a/pkg/raw/client_types.go +++ b/pkg/raw/client_types.go @@ -74,6 +74,10 @@ func (r *ClientConfiguration) RabbitmqBrokers() broker { return r.rabbitmqBroker } +func (r *ClientConfiguration) ClientHeartbeat() uint32 { + return r.clientHeartbeat +} + func (r *ClientConfiguration) SetClientMaxFrameSize(clientMaxFrameSize uint32) { r.clientMaxFrameSize = clientMaxFrameSize } diff --git a/pkg/stream/heartbeater.go b/pkg/stream/heartbeater.go new file mode 100644 index 00000000..2a55a02c --- /dev/null +++ b/pkg/stream/heartbeater.go @@ -0,0 +1,63 @@ +package stream + +import ( + "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" + "golang.org/x/exp/slog" + "time" +) + +type heartBeater struct { + logger *slog.Logger + client raw.Clienter + tickDuration time.Duration + ticker *time.Ticker + done chan struct{} + receiveCh <-chan *raw.Heartbeat +} + +func NewHeartBeater(duration time.Duration, client raw.Clienter, logger *slog.Logger) *heartBeater { + return &heartBeater{ + logger: logger, + client: client, + tickDuration: duration, + done: make(chan struct{}), + } +} + +func (hb *heartBeater) start() { + hb.ticker = time.NewTicker(hb.tickDuration) + hb.receiveCh = hb.client.NotifyHeartbeat() + + go func() { + for { + select { + case <-hb.done: + return + case <-hb.ticker.C: + hb.send() + case <-hb.receiveCh: + hb.send() + } + } + }() +} + +func (hb *heartBeater) reset() { + // This nil check is mainly for tests. + if hb == nil || hb.ticker == nil { + return + } + hb.ticker.Reset(hb.tickDuration) +} + +func (hb *heartBeater) stop() { + hb.ticker.Stop() + close(hb.done) +} + +func (hb *heartBeater) send() { + err := hb.client.SendHeartbeat() + if err != nil { + hb.logger.Error("error sending heartbeat", "error", err) + } +} diff --git a/pkg/stream/heartbeater_test.go b/pkg/stream/heartbeater_test.go new file mode 100644 index 00000000..9905557d --- /dev/null +++ b/pkg/stream/heartbeater_test.go @@ -0,0 +1,87 @@ +package stream + +import ( + "github.com/golang/mock/gomock" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" + "time" +) + +var _ = Describe("Heartbeater", func() { + + var ( + hb *heartBeater + mockCtrl *gomock.Controller + mockRawClient *MockRawClient + ) + + BeforeEach(func() { + mockCtrl = gomock.NewController(GinkgoT()) + mockRawClient = NewMockRawClient(mockCtrl) + hb = NewHeartBeater(time.Millisecond*100, mockRawClient, nil) + }) + + It("can configure the tick duration", func() { + Expect(hb.tickDuration).To(BeNumerically("==", 100000000)) + }) + + It("when the tick duration expires, it sends a heartbeat", func() { + // setup the mock + mockRawClient.EXPECT().SendHeartbeat() + mockRawClient.EXPECT().NotifyHeartbeat(). + DoAndReturn(func() <-chan *raw.Heartbeat { + return make(chan *raw.Heartbeat) + }) + hb.start() + // wait until the mock gets called + // the mock will fail the test in SendHeartbeat is not called + <-time.After(time.Millisecond * 150) + }) + + It("sends a heartbeat when it receives one from the server", func(ctx SpecContext) { + var receiveCh chan *raw.Heartbeat + mockRawClient.EXPECT().NotifyHeartbeat(). + DoAndReturn(func() <-chan *raw.Heartbeat { + receiveCh = make(chan *raw.Heartbeat) + return receiveCh + }) + mockRawClient.EXPECT().SendHeartbeat() + + hb.start() + + select { + case <-ctx.Done(): + Fail("failed in setup: did not send a heartbeat notification") + case receiveCh <- &raw.Heartbeat{}: + } + + // wait until the mock gets called + // the mock will fail the test in SendHeartbeat is not called + <-time.After(time.Millisecond * 50) + hb.stop() + }, SpecTimeout(100*time.Millisecond)) + + It("stops the heartbeater gracefully", func() { + // TODO use the gleak detector + mockRawClient.EXPECT().NotifyHeartbeat(). + DoAndReturn(func() <-chan *raw.Heartbeat { + return make(chan *raw.Heartbeat) + }) + + hb.tickDuration = time.Minute // we do not want to receive a tick + hb.start() + Expect(hb.done).ToNot(BeClosed()) + Expect(hb.ticker.C).ToNot(BeClosed()) + + hb.stop() + Expect(hb.done).To(BeClosed()) + Consistently(hb.ticker.C, "100ms").ShouldNot(Receive()) + + By("not panicking on subsequent close") + // FIXME close on a closed channel panics + hb.stop() + // TODO investigate using gleak and asserts that heartbeater go routine have not leaked + // tried this before, but could not make the test go red, even after leaking the heartbeater routine + }) +}) diff --git a/pkg/stream/locator.go b/pkg/stream/locator.go index 8ed838ac..9eff62f9 100644 --- a/pkg/stream/locator.go +++ b/pkg/stream/locator.go @@ -2,12 +2,13 @@ package stream import ( "context" - "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" - "golang.org/x/exp/slog" - "golang.org/x/mod/semver" "net" "sync" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" + "golang.org/x/exp/slog" + "golang.org/x/mod/semver" ) const ( @@ -24,11 +25,11 @@ type locator struct { clientClose <-chan error backOffPolicy func(int) time.Duration addressResolver net.Addr // TODO: placeholder for address resolver - + heartbeater *heartBeater } func newLocator(c raw.ClientConfiguration, logger *slog.Logger) *locator { - return &locator{ + locator := &locator{ log: logger. WithGroup("locator"). With( @@ -43,7 +44,10 @@ func newLocator(c raw.ClientConfiguration, logger *slog.Logger) *locator { isSet: false, addressResolver: nil, shutdownNotification: make(chan struct{}), + heartbeater: NewHeartBeater(time.Second*time.Duration(c.ClientHeartbeat()), nil, logger), } + + return locator } func (l *locator) connect(ctx context.Context) error { @@ -64,6 +68,10 @@ func (l *locator) connect(ctx context.Context) error { return l.client.ExchangeCommandVersions(ctx) } + // TODO Start heartbeat here + l.heartbeater.client = client + l.heartbeater.start() + return nil } @@ -95,6 +103,7 @@ func (l *locator) shutdownHandler() { // TODO: maybe add a 'ok' safeguard here? log.Debug("unexpected locator disconnection, trying to reconnect", slog.Any("error", err)) l.Lock() + l.heartbeater.stop() for i := 0; i < 100; i++ { dialCtx, cancel := context.WithTimeout(raw.NewContextWithLogger(context.Background(), *log), DefaultTimeout) c, e := raw.DialConfig(dialCtx, &l.rawClientConf) @@ -116,6 +125,8 @@ func (l *locator) shutdownHandler() { l.client = c l.clientClose = c.NotifyConnectionClosed() + l.heartbeater.client = c + l.heartbeater.start() log.Debug("locator reconnected") @@ -171,6 +182,8 @@ func (l *locator) locatorOperation(op locatorOperationFn, args ...any) (result [ l.log.Debug("error in locator operation", slog.Any("error", lastErr), slog.Int("attempt", attempt)) attempt++ } + // TODO reset heartbeat timer + l.heartbeater.reset() return result } @@ -204,6 +217,7 @@ func (l *locator) operationQueryOffset(args ...any) []any { offset, err := l.client.QueryOffset(ctx, reference, stream) return []any{offset, err} } + func (l *locator) operationPartitions(args ...any) []any { ctx := args[0].(context.Context) superstream := args[1].(string) @@ -218,3 +232,7 @@ func (l *locator) operationQuerySequence(args ...any) []any { pubId, err := l.client.QueryPublisherSequence(ctx, reference, stream) return []any{pubId, err} } + +func (l *locator) operationSendHeartbeat(args ...any) []any { + return []any{l.client.SendHeartbeat()} +} diff --git a/pkg/stream/locator_test.go b/pkg/stream/locator_test.go index c269b021..967a203d 100644 --- a/pkg/stream/locator_test.go +++ b/pkg/stream/locator_test.go @@ -3,13 +3,13 @@ package stream import ( "context" "errors" - "time" - + "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" "golang.org/x/exp/slog" + "time" ) var _ = Describe("Locator", func() { @@ -119,6 +119,61 @@ var _ = Describe("Locator", func() { Eventually(logBuffer).Within(time.Second).Should(gbytes.Say("context deadline exceeded")) }) }) + + }) + + Describe("heartbeats", func() { + var ( + mockCtrl *gomock.Controller + mockRawClient *MockRawClient + discardLogger = slog.New(discardHandler{}) + backOffPolicy = func(_ int) time.Duration { + return time.Millisecond * 10 + } + ) + + BeforeEach(func() { + mockCtrl = gomock.NewController(GinkgoT()) + mockRawClient = NewMockRawClient(mockCtrl) + }) + + It("resets heartbeat ticker on locator operation", func() { + mockRawClient.EXPECT().NotifyHeartbeat() + + hb := NewHeartBeater(time.Second, mockRawClient, discardLogger) + loc := &locator{ + log: discardLogger, + shutdownNotification: make(chan struct{}), + rawClientConf: raw.ClientConfiguration{}, + client: nil, + isSet: true, + clientClose: nil, + backOffPolicy: backOffPolicy, + heartbeater: hb, + } + + done := make(chan struct{}) + hb.start() + + go func() { + defer GinkgoRecover() + Consistently(loc.heartbeater.ticker.C, "1010ms").ShouldNot(Receive()) + close(done) + }() + + // do a locator op + loc.locatorOperation(func(_ *locator, _ ...any) (result []any) { + <-time.After(time.Millisecond * 50) + return []any{nil} + }) + <-done + }) + + It("resets heartbeat ticker", func() { + // do a locator operation + // see that hb.reset is called + + }) }) Describe("Utils", func() { diff --git a/pkg/stream/mock_raw_client_test.go b/pkg/stream/mock_raw_client_test.go index d1068fd2..212594eb 100644 --- a/pkg/stream/mock_raw_client_test.go +++ b/pkg/stream/mock_raw_client_test.go @@ -6,6 +6,7 @@ package stream import ( context "context" + "github.com/onsi/ginkgo/v2" reflect "reflect" common "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/common" @@ -379,6 +380,7 @@ func (mr *MockRawClientMockRecorder) Send(ctx, publisherId, messages interface{} // SendHeartbeat mocks base method. func (m *MockRawClient) SendHeartbeat() error { + defer ginkgo.GinkgoRecover() m.ctrl.T.Helper() ret := m.ctrl.Call(m, "SendHeartbeat") ret0, _ := ret[0].(error) From a70ef99f1dad1945c90224eed2ae4d040146226c Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Thu, 28 Sep 2023 15:44:09 +0100 Subject: [PATCH 2/3] Safely close heartbeat done channel --- pkg/stream/heartbeater.go | 30 ++++++++++++++++++++++++++---- pkg/stream/heartbeater_test.go | 7 +++---- pkg/stream/locator_test.go | 2 +- 3 files changed, 30 insertions(+), 9 deletions(-) diff --git a/pkg/stream/heartbeater.go b/pkg/stream/heartbeater.go index 2a55a02c..69171dd1 100644 --- a/pkg/stream/heartbeater.go +++ b/pkg/stream/heartbeater.go @@ -3,6 +3,7 @@ package stream import ( "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" "golang.org/x/exp/slog" + "sync" "time" ) @@ -11,16 +12,37 @@ type heartBeater struct { client raw.Clienter tickDuration time.Duration ticker *time.Ticker - done chan struct{} + done *DoneChan receiveCh <-chan *raw.Heartbeat } +type DoneChan struct { + C chan struct{} + closed bool + mutex sync.Mutex +} + +func NewDoneChan() *DoneChan { + return &DoneChan{C: make(chan struct{})} +} + +// GracefulClose closes the DoneChan only if the Done chan is not already closed. +func (dc *DoneChan) GracefulClose() { + dc.mutex.Lock() + defer dc.mutex.Unlock() + + if !dc.closed { + close(dc.C) + dc.closed = true + } +} + func NewHeartBeater(duration time.Duration, client raw.Clienter, logger *slog.Logger) *heartBeater { return &heartBeater{ logger: logger, client: client, tickDuration: duration, - done: make(chan struct{}), + done: NewDoneChan(), } } @@ -31,7 +53,7 @@ func (hb *heartBeater) start() { go func() { for { select { - case <-hb.done: + case <-hb.done.C: return case <-hb.ticker.C: hb.send() @@ -52,7 +74,7 @@ func (hb *heartBeater) reset() { func (hb *heartBeater) stop() { hb.ticker.Stop() - close(hb.done) + hb.done.GracefulClose() } func (hb *heartBeater) send() { diff --git a/pkg/stream/heartbeater_test.go b/pkg/stream/heartbeater_test.go index 9905557d..e22c04a2 100644 --- a/pkg/stream/heartbeater_test.go +++ b/pkg/stream/heartbeater_test.go @@ -1,10 +1,10 @@ package stream import ( - "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" + "go.uber.org/mock/gomock" "time" ) @@ -71,15 +71,14 @@ var _ = Describe("Heartbeater", func() { hb.tickDuration = time.Minute // we do not want to receive a tick hb.start() - Expect(hb.done).ToNot(BeClosed()) + Expect(hb.done.C).ToNot(BeClosed()) Expect(hb.ticker.C).ToNot(BeClosed()) hb.stop() - Expect(hb.done).To(BeClosed()) + Expect(hb.done.C).To(BeClosed()) Consistently(hb.ticker.C, "100ms").ShouldNot(Receive()) By("not panicking on subsequent close") - // FIXME close on a closed channel panics hb.stop() // TODO investigate using gleak and asserts that heartbeater go routine have not leaked // tried this before, but could not make the test go red, even after leaking the heartbeater routine diff --git a/pkg/stream/locator_test.go b/pkg/stream/locator_test.go index 967a203d..94e8db89 100644 --- a/pkg/stream/locator_test.go +++ b/pkg/stream/locator_test.go @@ -3,11 +3,11 @@ package stream import ( "context" "errors" - "github.com/golang/mock/gomock" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/onsi/gomega/gbytes" "github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" + "go.uber.org/mock/gomock" "golang.org/x/exp/slog" "time" ) From 90a9bfb118ee48fd645efc4f5e990248c0d6c0d7 Mon Sep 17 00:00:00 2001 From: Alex Blease Date: Thu, 5 Oct 2023 16:34:32 +0100 Subject: [PATCH 3/3] Heartbeat cleanup --- pkg/stream/locator.go | 5 ----- pkg/stream/locator_test.go | 6 ------ 2 files changed, 11 deletions(-) diff --git a/pkg/stream/locator.go b/pkg/stream/locator.go index 9eff62f9..b6a06ec9 100644 --- a/pkg/stream/locator.go +++ b/pkg/stream/locator.go @@ -68,7 +68,6 @@ func (l *locator) connect(ctx context.Context) error { return l.client.ExchangeCommandVersions(ctx) } - // TODO Start heartbeat here l.heartbeater.client = client l.heartbeater.start() @@ -232,7 +231,3 @@ func (l *locator) operationQuerySequence(args ...any) []any { pubId, err := l.client.QueryPublisherSequence(ctx, reference, stream) return []any{pubId, err} } - -func (l *locator) operationSendHeartbeat(args ...any) []any { - return []any{l.client.SendHeartbeat()} -} diff --git a/pkg/stream/locator_test.go b/pkg/stream/locator_test.go index 94e8db89..03ce30ca 100644 --- a/pkg/stream/locator_test.go +++ b/pkg/stream/locator_test.go @@ -168,12 +168,6 @@ var _ = Describe("Locator", func() { }) <-done }) - - It("resets heartbeat ticker", func() { - // do a locator operation - // see that hb.reset is called - - }) }) Describe("Utils", func() {