Skip to content

Adapt the heartbeat checker to the configuration. #361

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/stream/brokers.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func newBrokerDefault() *Broker {

func newTCPParameterDefault() *TCPParameters {
return &TCPParameters{
RequestedHeartbeat: 60 * time.Second,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048576,
WriteBuffer: 8192,
ReadBuffer: 65536,
Expand Down
6 changes: 4 additions & 2 deletions pkg/stream/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,10 +413,12 @@ func (c *Client) DeleteStream(streamName string) error {
}

func (c *Client) heartBeat() {
ticker := time.NewTicker(60 * time.Second)
tickerHeatBeat := time.NewTicker(20 * time.Second)
ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second)
tickerHeatBeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second)

resp := c.coordinator.NewResponseWitName("heartbeat")
var heartBeatMissed int32

go func() {
for c.socket.isOpen() {
<-tickerHeatBeat.C
Expand Down
2 changes: 2 additions & 0 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ const (
///
defaultSocketCallTimeout = 10 * time.Second

defaultHeartbeat = 60 * time.Second

//
LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f"

Expand Down
11 changes: 11 additions & 0 deletions pkg/stream/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
options.RPCTimeout = defaultSocketCallTimeout
}

if options.TCPParameters == nil {
options.TCPParameters = newTCPParameterDefault()

}

client := newClient("go-stream-locator", nil,
options.TCPParameters, options.SaslConfiguration, options.RPCTimeout)
defer func(client *Client) {
Expand All @@ -39,6 +44,12 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) {
}
}(client)

// we put a limit to the heartbeat.
// it doesn't make sense to have a heartbeat less than 3 seconds
if options.TCPParameters.RequestedHeartbeat < (3 * time.Second) {
return nil, errors.New("RequestedHeartbeat must be greater than 3 seconds")
}

if options.MaxConsumersPerClient <= 0 || options.MaxProducersPerClient <= 0 ||
options.MaxConsumersPerClient > 254 || options.MaxProducersPerClient > 254 {
return nil, fmt.Errorf(" MaxConsumersPerClient and MaxProducersPerClient must be between 1 and 254")
Expand Down
7 changes: 6 additions & 1 deletion pkg/stream/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ var _ = Describe("Environment test", func() {
},
TCPParameters: &TCPParameters{
tlsConfig: nil,
RequestedHeartbeat: 60,
RequestedHeartbeat: defaultHeartbeat,
RequestedMaxFrameSize: 1048574,
WriteBuffer: 100,
ReadBuffer: 200,
Expand Down Expand Up @@ -263,6 +263,11 @@ var _ = Describe("Environment test", func() {
Expect(env.options.TCPParameters.RequestedMaxFrameSize).NotTo(BeZero())
})

It("RequestedHeartbeat should be greater then 3 seconds", func() {
_, err := NewEnvironment(NewEnvironmentOptions().SetRequestedHeartbeat(2 * time.Second))
Expect(err).To(HaveOccurred())
})

})

Describe("Validation Query Offset/Sequence", func() {
Expand Down
17 changes: 14 additions & 3 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,10 +94,10 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter {
type ProducerOptions struct {
client *Client
streamName string
Name string // Producer name, it is useful to handle deduplication messages
Name string // Producer name, valid for deduplication
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput
BatchPublishingDelay int // Period to Send a batch of messages.
BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send()
SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id
Compression Compression // Compression type, it is valid only if SubEntrySize > 1
ConfirmationTimeOut time.Duration // Time to wait for the confirmation
Expand All @@ -115,6 +115,8 @@ func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
return po
}

// SetBatchSize sets the batch size for the producer
// Valid only for the method Send()
func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions {
po.BatchSize = size
return po
Expand Down Expand Up @@ -355,6 +357,9 @@ func (producer *Producer) sendBytes(streamMessage message.StreamMessage, message
return nil
}

// Send sends a message to the stream and returns an error if the message could not be sent.
// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay
// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached.
func (producer *Producer) Send(streamMessage message.StreamMessage) error {
messageBytes, err := streamMessage.MarshalBinary()
if err != nil {
Expand All @@ -372,6 +377,12 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6
return sequence
}

// BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent.
// BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages
// and send them in a batch.
// BatchSend is not affected by the BatchSize and BatchPublishingDelay options.
// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and
// calls BatchSend internally.
func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
var messagesSequence = make([]messageSequence, len(batchMessages))
totalBufferToSend := 0
Expand Down
Loading