Skip to content

Commit 3b0dc4c

Browse files
Adapt the heartbeat checker to the configuration. (#361)
* Adapt the heartbeat checker to the configuration. The client could raise a missing heartbeat if a heartbeat is less than 20 seconds. In some cases, it could disconnect the client --------- Signed-off-by: Gabriele Santomaggio <G.santomaggio@gmail.com> Co-authored-by: Alberto Moretti <58828402+hiimjako@users.noreply.github.com>
1 parent bb5b42a commit 3b0dc4c

File tree

6 files changed

+40
-9
lines changed

6 files changed

+40
-9
lines changed

pkg/stream/brokers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ func newBrokerDefault() *Broker {
4949

5050
func newTCPParameterDefault() *TCPParameters {
5151
return &TCPParameters{
52-
RequestedHeartbeat: 60 * time.Second,
52+
RequestedHeartbeat: defaultHeartbeat,
5353
RequestedMaxFrameSize: 1048576,
5454
WriteBuffer: 8192,
5555
ReadBuffer: 65536,

pkg/stream/client.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -413,13 +413,15 @@ func (c *Client) DeleteStream(streamName string) error {
413413
}
414414

415415
func (c *Client) heartBeat() {
416-
ticker := time.NewTicker(60 * time.Second)
417-
tickerHeatBeat := time.NewTicker(20 * time.Second)
416+
ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
417+
tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second)
418+
418419
resp := c.coordinator.NewResponseWitName("heartbeat")
419420
var heartBeatMissed int32
421+
420422
go func() {
421423
for c.socket.isOpen() {
422-
<-tickerHeatBeat.C
424+
<-tickerHeartbeat.C
423425
if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second {
424426
v := atomic.AddInt32(&heartBeatMissed, 1)
425427
logs.LogWarn("Missing heart beat: %d", v)
@@ -432,7 +434,7 @@ func (c *Client) heartBeat() {
432434
}
433435

434436
}
435-
tickerHeatBeat.Stop()
437+
tickerHeartbeat.Stop()
436438
}()
437439

438440
go func() {

pkg/stream/constants.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,8 @@ const (
8888
///
8989
defaultSocketCallTimeout = 10 * time.Second
9090

91+
defaultHeartbeat = 60 * time.Second
92+
9193
//
9294
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"
9395

pkg/stream/environment.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,11 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
3030
options.RPCTimeout = defaultSocketCallTimeout
3131
}
3232

33+
if options.TCPParameters == nil {
34+
options.TCPParameters = newTCPParameterDefault()
35+
36+
}
37+
3338
client := newClient("go-stream-locator", nil,
3439
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
3540
defer func(client *Client) {
@@ -39,6 +44,12 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
3944
}
4045
}(client)
4146

47+
// we put a limit to the heartbeat.
48+
// it doesn't make sense to have a heartbeat less than 3 seconds
49+
if options.TCPParameters.RequestedHeartbeat < (3 * time.Second) {
50+
return nil, errors.New("RequestedHeartbeat must be greater than 3 seconds")
51+
}
52+
4253
if options.MaxConsumersPerClient <= 0 || options.MaxProducersPerClient <= 0 ||
4354
options.MaxConsumersPerClient > 254 || options.MaxProducersPerClient > 254 {
4455
return nil, fmt.Errorf(" MaxConsumersPerClient and MaxProducersPerClient must be between 1 and 254")

pkg/stream/environment_test.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,7 @@ var _ = Describe("Environment test", func() {
208208
},
209209
TCPParameters: &TCPParameters{
210210
tlsConfig: nil,
211-
RequestedHeartbeat: 60,
211+
RequestedHeartbeat: defaultHeartbeat,
212212
RequestedMaxFrameSize: 1048574,
213213
WriteBuffer: 100,
214214
ReadBuffer: 200,
@@ -263,6 +263,11 @@ var _ = Describe("Environment test", func() {
263263
Expect(env.options.TCPParameters.RequestedMaxFrameSize).NotTo(BeZero())
264264
})
265265

266+
It("RequestedHeartbeat should be greater then 3 seconds", func() {
267+
_, err := NewEnvironment(NewEnvironmentOptions().SetRequestedHeartbeat(2 * time.Second))
268+
Expect(err).To(HaveOccurred())
269+
})
270+
266271
})
267272

268273
Describe("Validation Query Offset/Sequence", func() {

pkg/stream/producer.go

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -94,10 +94,10 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter {
9494
type ProducerOptions struct {
9595
client *Client
9696
streamName string
97-
Name string // Producer name, it is useful to handle deduplication messages
97+
Name string // Producer name, valid for deduplication
9898
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
99-
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
100-
BatchPublishingDelay int // Period to Send a batch of messages.
99+
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
100+
BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send()
101101
SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id
102102
Compression Compression // Compression type, it is valid only if SubEntrySize > 1
103103
ConfirmationTimeOut time.Duration // Time to wait for the confirmation
@@ -115,6 +115,8 @@ func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
115115
return po
116116
}
117117

118+
// SetBatchSize sets the batch size for the producer
119+
// Valid only for the method Send()
118120
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions {
119121
po.BatchSize = size
120122
return po
@@ -355,6 +357,9 @@ func (producer *Producer) sendBytes(streamMessage message.StreamMessage, message
355357
return nil
356358
}
357359

360+
// Send sends a message to the stream and returns an error if the message could not be sent.
361+
// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay
362+
// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
358363
func (producer *Producer) Send(streamMessage message.StreamMessage) error {
359364
messageBytes, err := streamMessage.MarshalBinary()
360365
if err != nil {
@@ -372,6 +377,12 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6
372377
return sequence
373378
}
374379

380+
// BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent.
381+
// BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages
382+
// and send them in a batch.
383+
// BatchSend is not affected by the BatchSize and BatchPublishingDelay options.
384+
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
385+
// calls BatchSend internally.
375386
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
376387
var messagesSequence = make([]messageSequence, len(batchMessages))
377388
totalBufferToSend := 0

0 commit comments

Comments
 (0)