diff --git a/pkg/stream/brokers.go b/pkg/stream/brokers.go index 5092565c..3a7e5f59 100644 --- a/pkg/stream/brokers.go +++ b/pkg/stream/brokers.go @@ -49,7 +49,7 @@ func newBrokerDefault() *Broker { func newTCPParameterDefault() *TCPParameters { return &TCPParameters{ - RequestedHeartbeat: 60 * time.Second, + RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048576, WriteBuffer: 8192, ReadBuffer: 65536, diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 5c01bc53..2c92e717 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -413,13 +413,15 @@ func (c *Client) DeleteStream(streamName string) error { } func (c *Client) heartBeat() { - ticker := time.NewTicker(60 * time.Second) - tickerHeatBeat := time.NewTicker(20 * time.Second) + ticker := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat) * time.Second) + tickerHeartbeat := time.NewTicker(time.Duration(c.tuneState.requestedHeartbeat-2) * time.Second) + resp := c.coordinator.NewResponseWitName("heartbeat") var heartBeatMissed int32 + go func() { for c.socket.isOpen() { - <-tickerHeatBeat.C + <-tickerHeartbeat.C if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { v := atomic.AddInt32(&heartBeatMissed, 1) logs.LogWarn("Missing heart beat: %d", v) @@ -432,7 +434,7 @@ func (c *Client) heartBeat() { } } - tickerHeatBeat.Stop() + tickerHeartbeat.Stop() }() go func() { diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index 22dc542c..a0c0c464 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -88,6 +88,8 @@ const ( /// defaultSocketCallTimeout = 10 * time.Second + defaultHeartbeat = 60 * time.Second + // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 1ab06afb..3d0b17c0 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -30,6 +30,11 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { options.RPCTimeout = defaultSocketCallTimeout } + if options.TCPParameters == nil { + options.TCPParameters = newTCPParameterDefault() + + } + client := newClient("go-stream-locator", nil, options.TCPParameters, options.SaslConfiguration, options.RPCTimeout) defer func(client *Client) { @@ -39,6 +44,12 @@ func NewEnvironment(options *EnvironmentOptions) (*Environment, error) { } }(client) + // we put a limit to the heartbeat. + // it doesn't make sense to have a heartbeat less than 3 seconds + if options.TCPParameters.RequestedHeartbeat < (3 * time.Second) { + return nil, errors.New("RequestedHeartbeat must be greater than 3 seconds") + } + if options.MaxConsumersPerClient <= 0 || options.MaxProducersPerClient <= 0 || options.MaxConsumersPerClient > 254 || options.MaxProducersPerClient > 254 { return nil, fmt.Errorf(" MaxConsumersPerClient and MaxProducersPerClient must be between 1 and 254") diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 4a31f386..444e19fb 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -208,7 +208,7 @@ var _ = Describe("Environment test", func() { }, TCPParameters: &TCPParameters{ tlsConfig: nil, - RequestedHeartbeat: 60, + RequestedHeartbeat: defaultHeartbeat, RequestedMaxFrameSize: 1048574, WriteBuffer: 100, ReadBuffer: 200, @@ -263,6 +263,11 @@ var _ = Describe("Environment test", func() { Expect(env.options.TCPParameters.RequestedMaxFrameSize).NotTo(BeZero()) }) + It("RequestedHeartbeat should be greater then 3 seconds", func() { + _, err := NewEnvironment(NewEnvironmentOptions().SetRequestedHeartbeat(2 * time.Second)) + Expect(err).To(HaveOccurred()) + }) + }) Describe("Validation Query Offset/Sequence", func() { diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 296378d9..3cdc080e 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -94,10 +94,10 @@ func NewProducerFilter(filterValue FilterValue) *ProducerFilter { type ProducerOptions struct { client *Client streamName string - Name string // Producer name, it is useful to handle deduplication messages + Name string // Producer name, valid for deduplication QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server - BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput - BatchPublishingDelay int // Period to Send a batch of messages. + BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() + BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() SubEntrySize int // Size of sub Entry, to aggregate more subEntry using one publishing id Compression Compression // Compression type, it is valid only if SubEntrySize > 1 ConfirmationTimeOut time.Duration // Time to wait for the confirmation @@ -115,6 +115,8 @@ func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions { return po } +// SetBatchSize sets the batch size for the producer +// Valid only for the method Send() func (po *ProducerOptions) SetBatchSize(size int) *ProducerOptions { po.BatchSize = size return po @@ -355,6 +357,9 @@ func (producer *Producer) sendBytes(streamMessage message.StreamMessage, message return nil } +// Send sends a message to the stream and returns an error if the message could not be sent. +// Send is asynchronous. The aggregation of the messages is based on the BatchSize and BatchPublishingDelay +// options. The message is sent when the aggregation is reached or the BatchPublishingDelay is reached. func (producer *Producer) Send(streamMessage message.StreamMessage) error { messageBytes, err := streamMessage.MarshalBinary() if err != nil { @@ -372,6 +377,12 @@ func (producer *Producer) assignPublishingID(message message.StreamMessage) int6 return sequence } +// BatchSend sends a batch of messages to the stream and returns an error if the messages could not be sent. +// BatchSend is synchronous. The aggregation is up to the user. The user has to aggregate the messages +// and send them in a batch. +// BatchSend is not affected by the BatchSize and BatchPublishingDelay options. +// BatchSend is the primitive method to send messages to the stream, the method Send prepares the messages and +// calls BatchSend internally. func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { var messagesSequence = make([]messageSequence, len(batchMessages)) totalBufferToSend := 0