-
Notifications
You must be signed in to change notification settings - Fork 24
Heartbeats #239
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Heartbeats #239
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
package stream | ||
|
||
import ( | ||
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" | ||
"golang.org/x/exp/slog" | ||
"sync" | ||
"time" | ||
) | ||
|
||
type heartBeater struct { | ||
logger *slog.Logger | ||
client raw.Clienter | ||
tickDuration time.Duration | ||
ticker *time.Ticker | ||
done *DoneChan | ||
receiveCh <-chan *raw.Heartbeat | ||
} | ||
|
||
type DoneChan struct { | ||
C chan struct{} | ||
closed bool | ||
mutex sync.Mutex | ||
} | ||
|
||
func NewDoneChan() *DoneChan { | ||
return &DoneChan{C: make(chan struct{})} | ||
} | ||
|
||
// GracefulClose closes the DoneChan only if the Done chan is not already closed. | ||
func (dc *DoneChan) GracefulClose() { | ||
dc.mutex.Lock() | ||
defer dc.mutex.Unlock() | ||
|
||
if !dc.closed { | ||
close(dc.C) | ||
dc.closed = true | ||
} | ||
} | ||
Comment on lines
+19
to
+38
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I prefer to make The graceful close is fine, it works as expected and it should not panic. A simpler implementation, since we are already importing Either way is fine by me, as long as we un-export the |
||
|
||
func NewHeartBeater(duration time.Duration, client raw.Clienter, logger *slog.Logger) *heartBeater { | ||
return &heartBeater{ | ||
logger: logger, | ||
client: client, | ||
tickDuration: duration, | ||
done: NewDoneChan(), | ||
} | ||
} | ||
|
||
func (hb *heartBeater) start() { | ||
hb.ticker = time.NewTicker(hb.tickDuration) | ||
hb.receiveCh = hb.client.NotifyHeartbeat() | ||
|
||
go func() { | ||
for { | ||
select { | ||
case <-hb.done.C: | ||
return | ||
case <-hb.ticker.C: | ||
hb.send() | ||
case <-hb.receiveCh: | ||
hb.send() | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (hb *heartBeater) reset() { | ||
// This nil check is mainly for tests. | ||
if hb == nil || hb.ticker == nil { | ||
return | ||
} | ||
hb.ticker.Reset(hb.tickDuration) | ||
} | ||
|
||
func (hb *heartBeater) stop() { | ||
hb.ticker.Stop() | ||
hb.done.GracefulClose() | ||
} | ||
|
||
func (hb *heartBeater) send() { | ||
err := hb.client.SendHeartbeat() | ||
Comment on lines
+80
to
+81
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have to ensure that access to |
||
if err != nil { | ||
hb.logger.Error("error sending heartbeat", "error", err) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,86 @@ | ||
package stream | ||
|
||
import ( | ||
. "github.com/onsi/ginkgo/v2" | ||
. "github.com/onsi/gomega" | ||
"github.com/rabbitmq/rabbitmq-stream-go-client/v2/pkg/raw" | ||
"go.uber.org/mock/gomock" | ||
"time" | ||
) | ||
|
||
var _ = Describe("Heartbeater", func() { | ||
|
||
var ( | ||
hb *heartBeater | ||
mockCtrl *gomock.Controller | ||
mockRawClient *MockRawClient | ||
) | ||
|
||
BeforeEach(func() { | ||
mockCtrl = gomock.NewController(GinkgoT()) | ||
mockRawClient = NewMockRawClient(mockCtrl) | ||
hb = NewHeartBeater(time.Millisecond*100, mockRawClient, nil) | ||
}) | ||
|
||
It("can configure the tick duration", func() { | ||
Expect(hb.tickDuration).To(BeNumerically("==", 100000000)) | ||
}) | ||
|
||
It("when the tick duration expires, it sends a heartbeat", func() { | ||
// setup the mock | ||
mockRawClient.EXPECT().SendHeartbeat() | ||
mockRawClient.EXPECT().NotifyHeartbeat(). | ||
DoAndReturn(func() <-chan *raw.Heartbeat { | ||
return make(chan *raw.Heartbeat) | ||
}) | ||
hb.start() | ||
// wait until the mock gets called | ||
// the mock will fail the test in SendHeartbeat is not called | ||
<-time.After(time.Millisecond * 150) | ||
}) | ||
|
||
It("sends a heartbeat when it receives one from the server", func(ctx SpecContext) { | ||
var receiveCh chan *raw.Heartbeat | ||
mockRawClient.EXPECT().NotifyHeartbeat(). | ||
DoAndReturn(func() <-chan *raw.Heartbeat { | ||
receiveCh = make(chan *raw.Heartbeat) | ||
return receiveCh | ||
}) | ||
mockRawClient.EXPECT().SendHeartbeat() | ||
|
||
hb.start() | ||
|
||
select { | ||
case <-ctx.Done(): | ||
Fail("failed in setup: did not send a heartbeat notification") | ||
case receiveCh <- &raw.Heartbeat{}: | ||
} | ||
|
||
// wait until the mock gets called | ||
// the mock will fail the test in SendHeartbeat is not called | ||
<-time.After(time.Millisecond * 50) | ||
hb.stop() | ||
}, SpecTimeout(100*time.Millisecond)) | ||
|
||
It("stops the heartbeater gracefully", func() { | ||
// TODO use the gleak detector | ||
mockRawClient.EXPECT().NotifyHeartbeat(). | ||
DoAndReturn(func() <-chan *raw.Heartbeat { | ||
return make(chan *raw.Heartbeat) | ||
}) | ||
|
||
hb.tickDuration = time.Minute // we do not want to receive a tick | ||
hb.start() | ||
Expect(hb.done.C).ToNot(BeClosed()) | ||
Expect(hb.ticker.C).ToNot(BeClosed()) | ||
|
||
hb.stop() | ||
Expect(hb.done.C).To(BeClosed()) | ||
Consistently(hb.ticker.C, "100ms").ShouldNot(Receive()) | ||
|
||
By("not panicking on subsequent close") | ||
hb.stop() | ||
// TODO investigate using gleak and asserts that heartbeater go routine have not leaked | ||
// tried this before, but could not make the test go red, even after leaking the heartbeater routine | ||
}) | ||
}) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
raw.Clienter
will be shared between consumers and producers, and it is not thread-safe. We will need a pointer to a mutex to ensure synchronised access to the raw client, and pass a shared mutex.