Skip to content

Add limit to the unconfirmed messages #378

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Jan 27, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions perfTest/cmd/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var (
variableBody int
fixedBody int
batchSize int
queueSize int
subEntrySize int
compression string
exitOnError bool
Expand All @@ -58,6 +59,7 @@ func setupCli(baseCmd *cobra.Command) {
baseCmd.PersistentFlags().StringSliceVarP(&rabbitmqBrokerUrl, "uris", "", []string{stream.LocalhostUriConnection}, "Broker URLs")
baseCmd.PersistentFlags().IntVarP(&publishers, "publishers", "", 1, "Number of Publishers")
baseCmd.PersistentFlags().IntVarP(&batchSize, "batch-size", "", 200, "Batch Size, from 1 to 300")
baseCmd.PersistentFlags().IntVarP(&queueSize, "queue-size", "", 50_000, "Queue Size for the server back pressure = messages sent - messages confirmed")
baseCmd.PersistentFlags().IntVarP(&subEntrySize, "sub-entry-size", "", 1, "SubEntry size, default 1. > 1 Enable the subEntryBatch")
baseCmd.PersistentFlags().StringVarP(&compression, "compression", "", "", "Compression for sub batching, none,gzip,lz4,snappy,zstd")
baseCmd.PersistentFlags().IntVarP(&consumers, "consumers", "", 1, "Number of Consumers")
Expand Down
4 changes: 2 additions & 2 deletions pkg/stream/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ const (
///
defaultWriteSocketBuffer = 8192
defaultReadSocketBuffer = 8192
defaultQueuePublisherSize = 10000
minQueuePublisherSize = 100
defaultQueuePublisherSize = 10_000
minQueuePublisherSize = 500
maxQueuePublisherSize = 1_000_000

minBatchSize = 1
Expand Down
4 changes: 3 additions & 1 deletion pkg/stream/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,12 @@ func (coordinator *Coordinator) NewProducer(
coordinator.mutex.Lock()
defer coordinator.mutex.Unlock()
dynSize := 10000
queueSize := defaultQueuePublisherSize
tickerTime := defaultConfirmationTimeOut
if parameters != nil {
dynSize = parameters.BatchSize
tickerTime = parameters.ConfirmationTimeOut
queueSize = parameters.QueueSize
}

var lastId, err = coordinator.getNextProducerItem()
Expand All @@ -67,7 +69,7 @@ func (coordinator *Coordinator) NewProducer(
var producer = &Producer{id: lastId,
options: parameters,
mutex: &sync.RWMutex{},
unConfirmed: newUnConfirmed(),
unConfirmed: newUnConfirmed(queueSize),
confirmationTimeoutTicker: time.NewTicker(tickerTime),
doneTimeoutTicker: make(chan struct{}, 1),
status: open,
Expand Down
13 changes: 7 additions & 6 deletions pkg/stream/producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,10 +105,12 @@ type ProducerOptions struct {
// Deduplication is a feature that allows the producer to avoid sending duplicate messages to the stream.
// see: https://www.rabbitmq.com/blog/2021/07/28/rabbitmq-streams-message-deduplication for more information.
// Don't use it if you don't need the deduplication.
Name string
// Deprecated: starting from 1.5.0 the QueueSize is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server
Name string
QueueSize int // Internal queue to handle back-pressure.
// The default value is high enough (see defaultQueuePublisherSize). You usually don't need to change it unless high memory usage is a concern.
// A high value can increase memory usage but helps absorb spikes in traffic.
// A low value can reduce memory usage but can increase the back-pressure on the server.

BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send()
// Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
Expand All @@ -134,8 +136,7 @@ func (po *ProducerOptions) SetProducerName(name string) *ProducerOptions {
return po
}

// Deprecated: starting from 1.5.0 the SetQueueSize is deprecated, and it will be removed in the next releases
// It is not used anymore given the dynamic batching
// SetQueueSize sets the internal queue size used for back-pressure. See ProducerOptions.QueueSize for more details.
func (po *ProducerOptions) SetQueueSize(size int) *ProducerOptions {
po.QueueSize = size
return po
Expand Down
2 changes: 2 additions & 0 deletions pkg/stream/producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -654,6 +654,7 @@ var _ = Describe("Streaming Producers", func() {
It(" sub-entry batching test Aggregation", func() {
producer, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(100).
SetQueueSize(1000).
SetSubEntrySize(77))
Expect(err).NotTo(HaveOccurred())
messagesSequence := make([]*messageSequence, 201)
Expand Down Expand Up @@ -737,6 +738,7 @@ var _ = Describe("Streaming Producers", func() {
It("Sub Size Publish Confirm/Send", func() {
producer, err := testEnvironment.NewProducer(testProducerStream,
NewProducerOptions().SetBatchPublishingDelay(100).
SetQueueSize(500).
SetSubEntrySize(77))
Expect(err).NotTo(HaveOccurred())
var messagesConfirmed int32
Expand Down
37 changes: 29 additions & 8 deletions pkg/stream/producer_unconfirmed.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package stream

import (
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
"sync"
"time"
)
Expand All @@ -16,22 +17,30 @@ import (
type unConfirmed struct {
messages map[int64]*ConfirmationStatus
mutexMessageMap sync.RWMutex
maxSize int
blockSignal *sync.Cond
}

const DefaultUnconfirmedSize = 10_000

func newUnConfirmed() *unConfirmed {
func newUnConfirmed(maxSize int) *unConfirmed {
r := &unConfirmed{
messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
messages: make(map[int64]*ConfirmationStatus, maxSize),
mutexMessageMap: sync.RWMutex{},
maxSize: maxSize,
blockSignal: sync.NewCond(&sync.Mutex{}),
}
return r
}

func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {
u.mutexMessageMap.Lock()
defer u.mutexMessageMap.Unlock()

if u.size() > u.maxSize {
logs.LogDebug("unConfirmed size: %d reached, producer blocked", u.maxSize)
u.blockSignal.L.Lock()
u.blockSignal.Wait()
u.blockSignal.L.Unlock()
}

u.mutexMessageMap.Lock()
for _, msgSeq := range messages {
u.messages[msgSeq.publishingId] = &ConfirmationStatus{
inserted: time.Now(),
Expand All @@ -40,8 +49,9 @@ func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID u
publishingId: msgSeq.publishingId,
confirmed: false,
}

}
u.mutexMessageMap.Unlock()

}

func (u *unConfirmed) link(from int64, to int64) {
Expand All @@ -67,13 +77,14 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
}
}
}
u.maybeUnLock()
return res

}

func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus {
u.mutexMessageMap.Lock()
defer u.mutexMessageMap.Unlock()
u.maybeUnLock()
return u.extract(id, errorCode, false)
}

Expand Down Expand Up @@ -110,6 +121,7 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS
res = append(res, v)
}
}
u.maybeUnLock()
return res
}

Expand All @@ -118,3 +130,12 @@ func (u *unConfirmed) size() int {
defer u.mutexMessageMap.Unlock()
return len(u.messages)
}

func (u *unConfirmed) maybeUnLock() {
if len(u.messages) < u.maxSize {
logs.LogDebug("unConfirmed size: %d back to normal, producer unblocked", u.maxSize)
u.blockSignal.L.Lock()
u.blockSignal.Signal()
u.blockSignal.L.Unlock()
}
}
8 changes: 4 additions & 4 deletions pkg/stream/server_frame.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,9 +350,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
if consumer.options.CRCCheck {
checkSum := crc32.ChecksumIEEE(bytesBuffer)
if crc != checkSum {
logs.LogError("Error during the checkSum, expected %d, checksum %d", crc, checkSum)
panic("Error during CRC")
} /// ???
logs.LogError("Error during the checkSum, expected %d, checksum %d. Tcp connection will be closed", crc, checkSum)
c.Close()
}
}

bufferReader := bytes.NewReader(bytesBuffer)
Expand Down Expand Up @@ -469,7 +469,7 @@ func (c *Client) handlePublishError(buffer *bufio.Reader) {
producer, err := c.coordinator.GetProducerById(publisherId)
if err != nil {
logs.LogWarn("producer id %d not found, publish error :%s", publisherId, lookErrorCode(code))
producer = &Producer{unConfirmed: newUnConfirmed()}
producer = &Producer{unConfirmed: newUnConfirmed(defaultQueuePublisherSize)}
} else {
unConfirmedMessage := producer.unConfirmed.extractWithError(publishingId, code)

Expand Down
Loading