From d271a2662efc97bfdd6e82a2349e9f6af4e286db Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 09:10:18 +0100 Subject: [PATCH 01/13] check if the producer is closed when send Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 17 ++++++++++++++++- pkg/stream/producer_test.go | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index c6c3f09d..483ec465 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -362,10 +362,25 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str // and the messages in the buffer. The aggregation is up to the client. // returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { + messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err } + if producer.getStatus() == closed { + producer.sendConfirmationStatus([]*ConfirmationStatus{ + { + inserted: time.Now(), + message: streamMessage, + producerID: producer.GetID(), + publishingId: messageSeq.publishingId, + confirmed: false, + err: AlreadyClosed, + }, + }) + return fmt.Errorf("producer id: %d closed", producer.id) + } + producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) if len(messageSeq.messageBytes) > defaultMaxFrameSize { @@ -377,7 +392,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // se the processPendingSequencesQueue function err = producer.pendingSequencesQueue.Enqueue(messageSeq) if err != nil { - return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id) + return fmt.Errorf("error during enqueue message: %s pending queue closed. 
Producer id: %d ", err, producer.id) } return nil } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 8cb31fc4..86d370aa 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -806,12 +806,12 @@ var _ = Describe("Streaming Producers", func() { }) It("Can't send message if the producer is closed", func() { - producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) Expect(producer.Close()).NotTo(HaveOccurred()) err = producer.Send(amqp.NewMessage(make([]byte, 50))) Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) }) }) From 284869bcd70b8e116286da7e05852efb599459c6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 09:18:22 +0100 Subject: [PATCH 02/13] check if the producer is closed when send Signed-off-by: Gabriele Santomaggio --- pkg/stream/constants.go | 8 +++++--- pkg/stream/producer.go | 8 ++++++++ pkg/stream/producer_test.go | 9 +++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index f845efeb..a0979ef4 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -81,9 +81,8 @@ const ( responseCodeNoOffset = uint16(19) /// responses out of protocol - closeChannel = uint16(60) - connectionCloseError = uint16(61) - timeoutError = uint16(62) + timeoutError = uint16(62) + entityClosed = uint16(63) /// defaultSocketCallTimeout = 10 * time.Second @@ -194,6 +193,9 @@ func lookErrorCode(errorCode uint16) error { return AuthenticationFailureLoopbackError case timeoutError: return ConfirmationTimoutError + case entityClosed: + return AlreadyClosed + default: { logs.LogWarn("Error not handled %d", errorCode) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 483ec465..082fac17 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -418,6 +418,14 @@ func (producer *Producer) 
BatchSend(batchMessages []message.StreamMessage) error } // + if producer.getStatus() == closed { + for _, msg := range messagesSequence { + m := producer.unConfirmed.extractWithError(msg.publishingId, entityClosed) + producer.sendConfirmationStatus([]*ConfirmationStatus{m}) + } + return fmt.Errorf("producer id: %d closed", producer.id) + } + if totalBufferToSend+initBufferPublishSize > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // all the messages are unconfirmed diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 86d370aa..57d91762 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -814,6 +814,15 @@ var _ = Describe("Streaming Producers", func() { Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) }) + It("Can't bach send message if the producer is closed", func() { + producer, err := testEnvironment.NewProducer(testProducerStream, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) + err = producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, 50))}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) + }) + }) func testCompress(producer *Producer) { From 0a3a5abbb48abcbc9f75dded476b618744fbe15a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 10:58:03 +0100 Subject: [PATCH 03/13] remove the producer from the list after removed from the server Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 3 ++- pkg/stream/server_frame.go | 14 +------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 082fac17..feed78b2 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -628,7 +628,6 @@ func (producer *Producer) close(reason Event) error { logs.LogDebug("producer options is nil, the close will be ignored") return nil } - _, _ = 
producer.options.client.coordinator.ExtractProducerById(producer.id) if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") @@ -639,6 +638,8 @@ func (producer *Producer) close(reason Event) error { _ = producer.options.client.deletePublisher(producer.id) } + _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) + if producer.options.client.coordinator.ProducersCount() == 0 { _ = producer.options.client.Close() } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0afb3c4b..8316e9c3 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -255,23 +255,11 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in // even the producer is not found we need to read the publishingId // to empty the buffer. // The producer here could not exist because the producer is closed before the confirmations are received - //var unConfirmedRecv []*ConfirmationStatus + var arraySeq []int64 for publishingIdCount != 0 { seq := readInt64(r) arraySeq = append(arraySeq, seq) - //if producerFound { - // m := producer.unConfirmed.extractWithConfirm(seq) - // if m != nil { - // unConfirmedRecv = append(unConfirmedRecv, m) - // - // // in case of sub-batch entry the client receives only - // // one publishingId (or sequence) - // // so the other messages are confirmed using the linkedTo - // unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) 
- // } - //} - publishingIdCount-- } From 66d3cef2477ddf1ffd02fea15b58652efaf14e9b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 11:38:21 +0100 Subject: [PATCH 04/13] remove the confirmation from the ha struct Signed-off-by: Gabriele Santomaggio --- pkg/ha/ha_publisher.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index e6a0b1db..8f12ef34 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -63,7 +63,6 @@ type ReliableProducer struct { producerOptions *stream.ProducerOptions count int32 confirmMessageHandler ConfirmMessageHandler - channelPublishConfirm stream.ChannelPublishConfirm mutex *sync.Mutex mutexStatus *sync.Mutex status int @@ -108,8 +107,7 @@ func (p *ReliableProducer) newProducer() error { return err } - p.channelPublishConfirm = producer.NotifyPublishConfirmation() - p.handlePublishConfirm(p.channelPublishConfirm) + p.handlePublishConfirm(producer.NotifyPublishConfirmation()) channelNotifyClose := producer.NotifyClose() p.handleNotifyClose(channelNotifyClose) p.producer = producer From c3f950a3730b2fed099236d780553a5205c75fb5 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 17 Jan 2025 16:54:55 +0100 Subject: [PATCH 05/13] fix: extract with timeout when the difference is >= timeout --- pkg/stream/producer_unconfirmed.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 9d232c26..ff275397 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,9 +1,10 @@ package stream import ( - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" ) // unConfirmed is a structure that holds unconfirmed messages @@ -105,7 +106,7 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS defer 
u.mutexMessageMap.Unlock() var res []*ConfirmationStatus for _, v := range u.messages { - if time.Since(v.inserted) > timeout { + if time.Since(v.inserted) >= timeout { v := u.extract(v.publishingId, timeoutError, false) res = append(res, v) } From 1703f2379792eeeff856e11294ae1ee57bd573d2 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 17 Jan 2025 19:36:58 +0100 Subject: [PATCH 06/13] perf: reduce the locks and the average size of unconfirmed messages --- pkg/stream/blocking_queue.go | 31 +++++++--------- pkg/stream/producer.go | 58 +++++++++++++++++------------- pkg/stream/producer_test.go | 37 +++++++++---------- pkg/stream/producer_unconfirmed.go | 19 +++++----- 4 files changed, 75 insertions(+), 70 deletions(-) diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index 6f66a23c..2f85841b 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -2,17 +2,17 @@ package stream import ( "errors" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "sync/atomic" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) var ErrBlockingQueueStopped = errors.New("blocking queue stopped") type BlockingQueue[T any] struct { - queue chan T - status int32 - lastUpdate int64 + queue chan T + status int32 } // NewBlockingQueue initializes a new BlockingQueue with the given capacity @@ -28,7 +28,6 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { if bq.IsStopped() { return ErrBlockingQueueStopped } - atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano()) bq.queue <- item return nil } @@ -50,26 +49,22 @@ func (bq *BlockingQueue[T]) IsEmpty() bool { // Stop is different from Close in that it allows the // existing items to be processed. // Drain the queue to be sure there are not pending messages -func (bq *BlockingQueue[T]) Stop() { +func (bq *BlockingQueue[T]) Stop() []T { atomic.StoreInt32(&bq.status, 1) // drain the queue. To be sure there are not pending messages - // in the queue. 
- // it does not matter if we lose some messages here - // since there is the unConfirmed map to handle the messages - isActive := true - for isActive { + // in the queue and return to the caller the remaining pending messages + msgInQueue := make([]T, 0, len(bq.queue)) +outer: + for { select { - case <-bq.queue: - // do nothing + case msg := <-bq.queue: + msgInQueue = append(msgInQueue, msg) case <-time.After(10 * time.Millisecond): - isActive = false - return - default: - isActive = false - return + break outer } } logs.LogDebug("BlockingQueue stopped") + return msgInQueue } func (bq *BlockingQueue[T]) Close() { diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index feed78b2..ac55f7ab 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -4,11 +4,12 @@ import ( "bufio" "bytes" "fmt" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "sync/atomic" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" ) type ConfirmationStatus struct { @@ -51,6 +52,7 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 { } type messageSequence struct { + sourceMsg message.StreamMessage messageBytes []byte publishingId int64 filterValue string @@ -201,10 +203,6 @@ func (po *ProducerOptions) isSubEntriesBatching() bool { return po.SubEntrySize > 1 } -func (producer *Producer) lenUnConfirmed() int { - return producer.unConfirmed.size() -} - // NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer. 
func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm { ch := make(chan []*ConfirmationStatus, 1) @@ -292,6 +290,9 @@ func (producer *Producer) processPendingSequencesQueue() { var lastError error if producer.pendingSequencesQueue.IsStopped() { + // add also the last message to sequenceToSend + // otherwise it will be lost + sequenceToSend = append(sequenceToSend, msg) break } // There is something in the queue. Checks the buffer is still less than the maxFrame @@ -299,6 +300,7 @@ func (producer *Producer) processPendingSequencesQueue() { if totalBufferToSend > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // the producer sends the messages and reset the buffer + producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID()) lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] totalBufferToSend = initBufferPublishSize @@ -310,6 +312,7 @@ func (producer *Producer) processPendingSequencesQueue() { // the messages during the checks of the buffer. 
In this case if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { if len(sequenceToSend) > 0 { + producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID()) lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] totalBufferToSend += initBufferPublishSize @@ -323,7 +326,7 @@ func (producer *Producer) processPendingSequencesQueue() { // just in case there are messages in the buffer // not matter is sent or not the messages will be timed out if len(sequenceToSend) > 0 { - _ = producer.internalBatchSend(sequenceToSend) + producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID()) } }() @@ -349,10 +352,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str if producer.options.IsFilterEnabled() { filterValue = producer.options.Filter.FilterValue(streamMessage) } - msqSeq := &messageSequence{} - msqSeq.messageBytes = marshalBinary - msqSeq.publishingId = seq - msqSeq.filterValue = filterValue + msqSeq := &messageSequence{ + sourceMsg: streamMessage, + messageBytes: marshalBinary, + publishingId: seq, + filterValue: filterValue, + } return msqSeq, nil } @@ -362,7 +367,6 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str // and the messages in the buffer. The aggregation is up to the client. 
// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { - messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err @@ -381,9 +385,8 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { return fmt.Errorf("producer id: %d closed", producer.id) } - producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) - if len(messageSeq.messageBytes) > defaultMaxFrameSize { + producer.unConfirmed.addFromSequences([]*messageSequence{messageSeq}, producer.GetID()) tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) return FrameTooLarge @@ -404,22 +407,25 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { maxFrame := defaultMaxFrameSize - var messagesSequence = make([]*messageSequence, 0) + var messagesSequences = make([]*messageSequence, 0, len(batchMessages)) totalBufferToSend := 0 + for _, batchMessage := range batchMessages { messageSeq, err := producer.fromMessageToMessageSequence(batchMessage) if err != nil { return err } - producer.unConfirmed.addFromSequence(messageSeq, &batchMessage, producer.GetID()) totalBufferToSend += len(messageSeq.messageBytes) - messagesSequence = append(messagesSequence, messageSeq) + messagesSequences = append(messagesSequences, messageSeq) + } + + if len(messagesSequences) > 0 { + producer.unConfirmed.addFromSequences(messagesSequences, producer.GetID()) } - // if producer.getStatus() == closed { - for _, msg := range messagesSequence { + for _, msg := range messagesSequences { m := 
producer.unConfirmed.extractWithError(msg.publishingId, entityClosed) producer.sendConfirmationStatus([]*ConfirmationStatus{m}) } @@ -430,14 +436,14 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error // if the totalBufferToSend is greater than the requestedMaxFrameSize // all the messages are unconfirmed - for _, msg := range messagesSequence { + for _, msg := range messagesSequences { m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{m}) } return FrameTooLarge } - return producer.internalBatchSend(messagesSequence) + return producer.internalBatchSend(messagesSequences) } func (producer *Producer) GetID() uint8 { @@ -659,7 +665,11 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() { // Stop the pendingSequencesQueue, so the producer can't send messages anymore // but the producer can still handle the inflight messages - producer.pendingSequencesQueue.Stop() + pendingSequences := producer.pendingSequencesQueue.Stop() + + if len(pendingSequences) > 0 { + producer.unConfirmed.addFromSequences(pendingSequences, producer.GetID()) + } // Stop the confirmationTimeoutTicker. 
It will flush the unconfirmed messages producer.confirmationTimeoutTicker.Stop() @@ -681,9 +691,9 @@ func (producer *Producer) waitForInflightMessages() { tentatives := 0 - for (producer.lenUnConfirmed() > 0) && tentatives < 5 { + for (producer.unConfirmed.size() > 0) && tentatives < 5 { logs.LogInfo("wait inflight messages - unconfirmed len: %d - retry: %d", - producer.lenUnConfirmed(), tentatives) + producer.unConfirmed.size(), tentatives) producer.flushUnConfirmedMessages() time.Sleep(time.Duration(500) * time.Millisecond) tentatives++ diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 57d91762..688a72fb 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -2,14 +2,15 @@ package stream import ( "fmt" + "sync" + "sync/atomic" + "time" + "github.com/google/uuid" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" - "sync" - "sync/atomic" - "time" ) var _ = Describe("Streaming Producers", func() { @@ -273,7 +274,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(14)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -288,7 +289,7 @@ var _ = Describe("Streaming Producers", func() { } Expect(producer.Close()).NotTo(HaveOccurred()) - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.pendingSequencesQueue.IsEmpty()).To(Equal(true)) }) @@ -337,7 +338,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).WithPolling(300*time.Millisecond).Should(Equal(int32(101)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) 
Expect(producer.Close()).NotTo(HaveOccurred()) // in this case must raise an error since the producer is closed Expect(producer.Close()).To(HaveOccurred()) @@ -389,7 +390,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(101)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -422,7 +423,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(10)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -621,7 +622,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(232)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) // same test above but using batch Send var arr []message.StreamMessage @@ -639,7 +640,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(12*20)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) @@ -753,7 +754,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(501)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) atomic.StoreInt32(&messagesConfirmed, 0) for z := 0; z < 501; z++ { @@ -766,7 +767,7 @@ var _ = Describe("Streaming Producers", func() { }, 5*time.Second).Should(Equal(int32(501*5)), "confirm should receive same messages Send by producer") - 
Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) Expect(producer.Close()).NotTo(HaveOccurred()) }) @@ -776,7 +777,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(33).SetCompression(Compression{}.Gzip())) Expect(err).NotTo(HaveOccurred()) testCompress(producerGZIP) - Expect(producerGZIP.lenUnConfirmed()).To(Equal(0)) + Expect(producerGZIP.unConfirmed.size()).To(Equal(0)) Expect(producerGZIP.Close()).NotTo(HaveOccurred()) producerLz4, err := testEnvironment.NewProducer(testProducerStream, @@ -784,7 +785,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(55).SetCompression(Compression{}.Lz4())) Expect(err).NotTo(HaveOccurred()) testCompress(producerLz4) - Expect(producerLz4.lenUnConfirmed()).To(Equal(0)) + Expect(producerLz4.unConfirmed.size()).To(Equal(0)) Expect(producerLz4.Close()).NotTo(HaveOccurred()) producerSnappy, err := testEnvironment.NewProducer(testProducerStream, @@ -792,7 +793,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(666).SetCompression(Compression{}.Snappy())) Expect(err).NotTo(HaveOccurred()) testCompress(producerSnappy) - Expect(producerSnappy.lenUnConfirmed()).To(Equal(0)) + Expect(producerSnappy.unConfirmed.size()).To(Equal(0)) Expect(producerSnappy.Close()).NotTo(HaveOccurred()) producerZstd, err := testEnvironment.NewProducer(testProducerStream, @@ -800,7 +801,7 @@ var _ = Describe("Streaming Producers", func() { SetSubEntrySize(98).SetCompression(Compression{}.Zstd())) Expect(err).NotTo(HaveOccurred()) testCompress(producerZstd) - Expect(producerZstd.lenUnConfirmed()).To(Equal(0)) + Expect(producerZstd.unConfirmed.size()).To(Equal(0)) Expect(producerZstd.Close()).NotTo(HaveOccurred()) }) @@ -844,7 +845,7 @@ func testCompress(producer *Producer) { }, 5*time.Second).Should(Equal(int32(457)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + 
Expect(producer.unConfirmed.size()).To(Equal(0)) atomic.StoreInt32(&messagesConfirmed, 0) for z := 0; z < 457; z++ { @@ -906,7 +907,7 @@ func verifyProducerSent(producer *Producer, confirmationReceived *int32, message }, 10*time.Second, 1*time.Second).Should(Equal(int32(messageSent)), "confirm should receive same messages Send by producer") - Expect(producer.lenUnConfirmed()).To(Equal(0)) + Expect(producer.unConfirmed.size()).To(Equal(0)) } func runConcurrentlyAndWaitTillAllDone(threadCount int, wg *sync.WaitGroup, runner func(int)) { diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index ff275397..a68e2910 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -3,8 +3,6 @@ package stream import ( "sync" "time" - - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" ) // unConfirmed is a structure that holds unconfirmed messages @@ -32,15 +30,16 @@ func newUnConfirmed() *unConfirmed { return r } -func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) { - +func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) { u.mutexMessageMap.Lock() - u.messages[message.publishingId] = &ConfirmationStatus{ - inserted: time.Now(), - message: *source, - producerID: producerID, - publishingId: message.publishingId, - confirmed: false, + for _, msgSeq := range messages { + u.messages[msgSeq.publishingId] = &ConfirmationStatus{ + inserted: time.Now(), + message: msgSeq.sourceMsg, + producerID: producerID, + publishingId: msgSeq.publishingId, + confirmed: false, + } } u.mutexMessageMap.Unlock() } From 6bb1d8a017a4965ec1784822deca7d98b39ed7b3 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 17 Jan 2025 19:57:04 +0100 Subject: [PATCH 07/13] test: update test timeout to reduce flakyness The test that needs reconnection sometimes exceeded the 2m timeout. 
--- Makefile | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/Makefile b/Makefile index 916814bc..07d4dcbc 100644 --- a/Makefile +++ b/Makefile @@ -32,16 +32,8 @@ $(STATICCHECK): check: $(STATICCHECK) $(STATICCHECK) ./pkg/stream -GOMOCK ?= $(GOBIN)/mockgen -GOMOCK_VERSION ?= v1.6.0 -$(GOMOCK): - go install github.com/golang/mock/mockgen@$(GOMOCK_VERSION) - -.PHONY: gomock -gomock: $(GOMOCK) - NUM_PROCS ?= 2 -TEST_TIMEOUT ?= 2m +TEST_TIMEOUT ?= 3m test: vet fmt check go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo -r --procs=$(NUM_PROCS) --compilers=$(NUM_PROCS) \ --randomize-all --randomize-suites \ From 3a4737191f76af2de6222c87ee2af3e0172159e0 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Fri, 17 Jan 2025 20:42:57 +0100 Subject: [PATCH 08/13] perf: unconfirmed optimization --- pkg/stream/producer.go | 4 - pkg/stream/producer_unconfirmed.go | 147 ++++++++++++++++++++--------- 2 files changed, 102 insertions(+), 49 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index ac55f7ab..103811aa 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -195,10 +195,6 @@ func NewProducerOptions() *ProducerOptions { } } -func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus { - return producer.unConfirmed.getAll() -} - func (po *ProducerOptions) isSubEntriesBatching() bool { return po.SubEntrySize > 1 } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index a68e2910..c2c28393 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,74 +1,113 @@ package stream import ( + "container/heap" "sync" "time" ) -// unConfirmed is a structure that holds unconfirmed messages -// And unconfirmed message is a message that has been sent to the broker but not yet confirmed, -// and it is added to the unConfirmed structure as soon is possible when -// -// the Send() or BatchSend() method is called -// -// The confirmation status is 
updated when the confirmation is received from the broker (see server_frame.go) -// or due of timeout. The Timeout is configurable, and it is calculated client side. +type priorityMessage struct { + *ConfirmationStatus + index int +} + +// priorityQueue implements heap.Interface +type priorityQueue []*priorityMessage + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + // Earlier timestamps have higher priority + return pq[i].inserted.Before(pq[j].inserted) +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*priorityMessage) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil + item.index = -1 + *pq = old[0 : n-1] + return item +} + type unConfirmed struct { - messages map[int64]*ConfirmationStatus + messages map[int64]*priorityMessage + timeoutQueue priorityQueue mutexMessageMap sync.RWMutex } const DefaultUnconfirmedSize = 10_000 func newUnConfirmed() *unConfirmed { - r := &unConfirmed{ - messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), + messages: make(map[int64]*priorityMessage, DefaultUnconfirmedSize), + timeoutQueue: make(priorityQueue, 0, DefaultUnconfirmedSize), mutexMessageMap: sync.RWMutex{}, } - + heap.Init(&r.timeoutQueue) return r } func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) { u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() + for _, msgSeq := range messages { - u.messages[msgSeq.publishingId] = &ConfirmationStatus{ - inserted: time.Now(), - message: msgSeq.sourceMsg, - producerID: producerID, - publishingId: msgSeq.publishingId, - confirmed: false, + pm := &priorityMessage{ + ConfirmationStatus: &ConfirmationStatus{ + inserted: time.Now(), + message: msgSeq.sourceMsg, + 
producerID: producerID, + publishingId: msgSeq.publishingId, + confirmed: false, + }, } + u.messages[msgSeq.publishingId] = pm + heap.Push(&u.timeoutQueue, pm) } - u.mutexMessageMap.Unlock() } func (u *unConfirmed) link(from int64, to int64) { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - r := u.messages[from] - if r != nil { - r.linkedTo = append(r.linkedTo, u.messages[to]) + + fromMsg := u.messages[from] + if fromMsg != nil { + toMsg := u.messages[to] + if toMsg != nil { + fromMsg.linkedTo = append(fromMsg.linkedTo, toMsg.ConfirmationStatus) + } } } func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - var res []*ConfirmationStatus + var res []*ConfirmationStatus for _, v := range ids { - m := u.extract(v, 0, true) - if m != nil { - res = append(res, m) - if m.linkedTo != nil { - res = append(res, m.linkedTo...) + if msg := u.extract(v, 0, true); msg != nil { + res = append(res, msg) + if msg.linkedTo != nil { + res = append(res, msg.linkedTo...) 
} } } return res - } func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { @@ -78,16 +117,31 @@ func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *Confirmation } func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *ConfirmationStatus { - rootMessage := u.messages[id] - if rootMessage != nil { - u.updateStatus(rootMessage, errorCode, confirmed) + pm := u.messages[id] + if pm == nil { + return nil + } + + rootMessage := pm.ConfirmationStatus + u.updateStatus(rootMessage, errorCode, confirmed) - for _, linkedMessage := range rootMessage.linkedTo { - u.updateStatus(linkedMessage, errorCode, confirmed) + for _, linkedMessage := range rootMessage.linkedTo { + u.updateStatus(linkedMessage, errorCode, confirmed) + if linkedPm := u.messages[linkedMessage.publishingId]; linkedPm != nil { + // Remove from priority queue if exists + if linkedPm.index != -1 { + heap.Remove(&u.timeoutQueue, linkedPm.index) + } delete(u.messages, linkedMessage.publishingId) } - delete(u.messages, id) } + + // Remove from priority queue if exists + if pm.index != -1 { + heap.Remove(&u.timeoutQueue, pm.index) + } + delete(u.messages, id) + return rootMessage } @@ -103,13 +157,22 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() + var res []*ConfirmationStatus - for _, v := range u.messages { - if time.Since(v.inserted) >= timeout { - v := u.extract(v.publishingId, timeoutError, false) - res = append(res, v) + now := time.Now() + + for u.timeoutQueue.Len() > 0 { + pm := u.timeoutQueue[0] + if now.Sub(pm.inserted) < timeout { + break + } + + heap.Pop(&u.timeoutQueue) + if msg := u.extract(pm.publishingId, timeoutError, false); msg != nil { + res = append(res, msg) } } + return res } @@ -118,9 +181,3 @@ func (u *unConfirmed) size() int { defer 
u.mutexMessageMap.Unlock() return len(u.messages) } - -func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { - u.mutexMessageMap.Lock() - defer u.mutexMessageMap.Unlock() - return u.messages -} From 55945d448a45bd5884e8a3b68fd0128a62caf903 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 18 Jan 2025 12:57:51 +0100 Subject: [PATCH 09/13] refactor: move updateStatus as ConfirmationStatus method --- pkg/stream/producer.go | 9 +++++++++ pkg/stream/producer_unconfirmed.go | 13 ++----------- 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 103811aa..3650d410 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -51,6 +51,15 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 { return cs.errorCode } +func (cs *ConfirmationStatus) updateStatus(errorCode uint16, confirmed bool) { + cs.confirmed = confirmed + if confirmed { + return + } + cs.errorCode = errorCode + cs.err = lookErrorCode(errorCode) +} + type messageSequence struct { sourceMsg message.StreamMessage messageBytes []byte diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index c2c28393..1d37076e 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -123,10 +123,10 @@ func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *Confi } rootMessage := pm.ConfirmationStatus - u.updateStatus(rootMessage, errorCode, confirmed) + rootMessage.updateStatus(errorCode, confirmed) for _, linkedMessage := range rootMessage.linkedTo { - u.updateStatus(linkedMessage, errorCode, confirmed) + linkedMessage.updateStatus(errorCode, confirmed) if linkedPm := u.messages[linkedMessage.publishingId]; linkedPm != nil { // Remove from priority queue if exists if linkedPm.index != -1 { @@ -145,15 +145,6 @@ func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *Confi return rootMessage } -func (u *unConfirmed) 
updateStatus(rootMessage *ConfirmationStatus, errorCode uint16, confirmed bool) { - rootMessage.confirmed = confirmed - if confirmed { - return - } - rootMessage.errorCode = errorCode - rootMessage.err = lookErrorCode(errorCode) -} - func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() From 115379ae3558162a56e769b10d1a3f1d748b75ae Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 18 Jan 2025 13:37:31 +0100 Subject: [PATCH 10/13] refactor: mark as unconfirmed with entityClosed messages that have never been sent to rabbitmq --- pkg/stream/producer.go | 35 ++++++++++++++++++++++++----------- 1 file changed, 24 insertions(+), 11 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 3650d410..b54c18c9 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -377,16 +377,15 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { return err } if producer.getStatus() == closed { - producer.sendConfirmationStatus([]*ConfirmationStatus{ - { - inserted: time.Now(), - message: streamMessage, - producerID: producer.GetID(), - publishingId: messageSeq.publishingId, - confirmed: false, - err: AlreadyClosed, - }, - }) + cs := &ConfirmationStatus{ + inserted: time.Now(), + message: streamMessage, + producerID: producer.GetID(), + publishingId: messageSeq.publishingId, + } + cs.updateStatus(entityClosed, false) + + producer.sendConfirmationStatus([]*ConfirmationStatus{cs}) return fmt.Errorf("producer id: %d closed", producer.id) } @@ -673,7 +672,21 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() { pendingSequences := producer.pendingSequencesQueue.Stop() if len(pendingSequences) > 0 { - producer.unConfirmed.addFromSequences(pendingSequences, producer.GetID()) + // Send as unconfirmed the messages in the pendingSequencesQueue, that have never been sent, + // with the "entityClosed" error. 
+ pending := make([]*ConfirmationStatus, 0, len(pendingSequences)) + for _, ps := range pendingSequences { + cs := &ConfirmationStatus{ + inserted: time.Now(), + message: ps.sourceMsg, + producerID: producer.GetID(), + publishingId: ps.publishingId, + confirmed: false, + } + cs.updateStatus(entityClosed, false) + pending = append(pending, cs) + } + producer.sendConfirmationStatus(pending) } // Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages From 954fdd4cb8692bf135803adb3bd0bf8efb62c5a8 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 18 Jan 2025 15:42:56 +0100 Subject: [PATCH 11/13] perf: optimize allocation on handleConfirm --- pkg/stream/producer_unconfirmed.go | 2 +- pkg/stream/server_frame.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 1d37076e..8e5922db 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -98,7 +98,7 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - var res []*ConfirmationStatus + res := make([]*ConfirmationStatus, 0, len(ids)) for _, v := range ids { if msg := u.extract(v, 0, true); msg != nil { res = append(res, msg) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 8316e9c3..f6381446 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -256,7 +256,7 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in // to empty the buffer. 
// The producer here could not exist because the producer is closed before the confirmations are received - var arraySeq []int64 + arraySeq := make([]int64, 0, publishingIdCount) for publishingIdCount != 0 { seq := readInt64(r) arraySeq = append(arraySeq, seq) From 298d6fe665949619ee490f1c4d9fce6d04d25866 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 18 Jan 2025 18:44:54 +0100 Subject: [PATCH 12/13] perf: restore simple map for unconfirmed since it is 15% faster --- pkg/stream/producer_unconfirmed.go | 142 +++++++++-------------------- 1 file changed, 44 insertions(+), 98 deletions(-) diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 8e5922db..0cc0f606 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,52 +1,20 @@ package stream import ( - "container/heap" "sync" "time" ) -type priorityMessage struct { - *ConfirmationStatus - index int -} - -// priorityQueue implements heap.Interface -type priorityQueue []*priorityMessage - -func (pq priorityQueue) Len() int { return len(pq) } - -func (pq priorityQueue) Less(i, j int) bool { - // Earlier timestamps have higher priority - return pq[i].inserted.Before(pq[j].inserted) -} - -func (pq priorityQueue) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] - pq[i].index = i - pq[j].index = j -} - -func (pq *priorityQueue) Push(x interface{}) { - n := len(*pq) - item := x.(*priorityMessage) - item.index = n - *pq = append(*pq, item) -} - -func (pq *priorityQueue) Pop() interface{} { - old := *pq - n := len(old) - item := old[n-1] - old[n-1] = nil - item.index = -1 - *pq = old[0 : n-1] - return item -} - +// unConfirmed is a structure that holds unconfirmed messages +// An unconfirmed message is a message that has been sent to the broker but not yet confirmed, +// and it is added to the unConfirmed structure as soon as possible when +// +// the Send() or BatchSend() method is called +// +// The confirmation status is updated when the 
confirmation is received from the broker (see server_frame.go) +// or due to a timeout. The Timeout is configurable, and it is calculated client side. type unConfirmed struct { - messages map[int64]*priorityMessage - timeoutQueue priorityQueue + messages map[int64]*ConfirmationStatus mutexMessageMap sync.RWMutex } @@ -54,11 +22,9 @@ const DefaultUnconfirmedSize = 10_000 func newUnConfirmed() *unConfirmed { r := &unConfirmed{ - messages: make(map[int64]*priorityMessage, DefaultUnconfirmedSize), - timeoutQueue: make(priorityQueue, 0, DefaultUnconfirmedSize), + messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), mutexMessageMap: sync.RWMutex{}, } - heap.Init(&r.timeoutQueue) return r } @@ -67,30 +33,23 @@ func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID u defer u.mutexMessageMap.Unlock() for _, msgSeq := range messages { - pm := &priorityMessage{ - ConfirmationStatus: &ConfirmationStatus{ - inserted: time.Now(), - message: msgSeq.sourceMsg, - producerID: producerID, - publishingId: msgSeq.publishingId, - confirmed: false, - }, + u.messages[msgSeq.publishingId] = &ConfirmationStatus{ + inserted: time.Now(), + message: msgSeq.sourceMsg, + producerID: producerID, + publishingId: msgSeq.publishingId, + confirmed: false, } - u.messages[msgSeq.publishingId] = pm - heap.Push(&u.timeoutQueue, pm) + } } func (u *unConfirmed) link(from int64, to int64) { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - - fromMsg := u.messages[from] - if fromMsg != nil { - toMsg := u.messages[to] - if toMsg != nil { - fromMsg.linkedTo = append(fromMsg.linkedTo, toMsg.ConfirmationStatus) - } + r := u.messages[from] + if r != nil { + r.linkedTo = append(r.linkedTo, u.messages[to]) } } @@ -100,14 +59,16 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { res := make([]*ConfirmationStatus, 0, len(ids)) for _, v := range ids { - if msg := u.extract(v, 0, true); msg != nil { - res = append(res, msg) - if 
msg.linkedTo != nil { - res = append(res, msg.linkedTo...) + m := u.extract(v, 0, true) + if m != nil { + res = append(res, m) + if m.linkedTo != nil { + res = append(res, m.linkedTo...) } } } return res + } func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { @@ -117,53 +78,38 @@ func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *Confirmation } func (u *unConfirmed) extract(id int64, errorCode uint16, confirmed bool) *ConfirmationStatus { - pm := u.messages[id] - if pm == nil { - return nil - } - - rootMessage := pm.ConfirmationStatus - rootMessage.updateStatus(errorCode, confirmed) + rootMessage := u.messages[id] + if rootMessage != nil { + u.updateStatus(rootMessage, errorCode, confirmed) - for _, linkedMessage := range rootMessage.linkedTo { - linkedMessage.updateStatus(errorCode, confirmed) - if linkedPm := u.messages[linkedMessage.publishingId]; linkedPm != nil { - // Remove from priority queue if exists - if linkedPm.index != -1 { - heap.Remove(&u.timeoutQueue, linkedPm.index) - } + for _, linkedMessage := range rootMessage.linkedTo { + u.updateStatus(linkedMessage, errorCode, confirmed) delete(u.messages, linkedMessage.publishingId) } + delete(u.messages, id) } + return rootMessage +} - // Remove from priority queue if exists - if pm.index != -1 { - heap.Remove(&u.timeoutQueue, pm.index) +func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode uint16, confirmed bool) { + rootMessage.confirmed = confirmed + if confirmed { + return } - delete(u.messages, id) - - return rootMessage + rootMessage.errorCode = errorCode + rootMessage.err = lookErrorCode(errorCode) } func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - var res []*ConfirmationStatus - now := time.Now() - - for u.timeoutQueue.Len() > 0 { - pm := u.timeoutQueue[0] - if now.Sub(pm.inserted) < timeout { - break - } - - 
heap.Pop(&u.timeoutQueue) - if msg := u.extract(pm.publishingId, timeoutError, false); msg != nil { - res = append(res, msg) + for _, v := range u.messages { + if time.Since(v.inserted) > timeout { + v := u.extract(v.publishingId, timeoutError, false) + res = append(res, v) } } - return res } From a37b68ac72a1207908c32c91261ae378af45bd72 Mon Sep 17 00:00:00 2001 From: Alberto Moretti Date: Sat, 18 Jan 2025 18:56:58 +0100 Subject: [PATCH 13/13] refactor: move to a function the unconfirmed messages due to closed client --- pkg/stream/producer.go | 67 +++++++++++++++++++----------------------- 1 file changed, 31 insertions(+), 36 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index b54c18c9..9c533d83 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -331,13 +331,36 @@ func (producer *Producer) processPendingSequencesQueue() { // just in case there are messages in the buffer // not matter is sent or not the messages will be timed out if len(sequenceToSend) > 0 { - producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID()) + producer.markUnsentAsUnconfirmed(sequenceToSend) } }() logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) } +func (producer *Producer) markUnsentAsUnconfirmed(sequences []*messageSequence) { + if len(sequences) == 0 { + return + } + + // Send as unconfirmed the messages in the pendingSequencesQueue, + // that have never been sent, + // with the "entityClosed" error. 
+ confirms := make([]*ConfirmationStatus, 0, len(sequences)) + for _, ps := range sequences { + cs := &ConfirmationStatus{ + inserted: time.Now(), + message: ps.sourceMsg, + producerID: producer.GetID(), + publishingId: ps.publishingId, + confirmed: false, + } + cs.updateStatus(entityClosed, false) + confirms = append(confirms, cs) + } + producer.sendConfirmationStatus(confirms) +} + func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { sequence := message.GetPublishingId() // in case of sub entry the deduplication is disabled @@ -377,15 +400,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { return err } if producer.getStatus() == closed { - cs := &ConfirmationStatus{ - inserted: time.Now(), - message: streamMessage, - producerID: producer.GetID(), - publishingId: messageSeq.publishingId, - } - cs.updateStatus(entityClosed, false) - - producer.sendConfirmationStatus([]*ConfirmationStatus{cs}) + producer.markUnsentAsUnconfirmed([]*messageSequence{messageSeq}) return fmt.Errorf("producer id: %d closed", producer.id) } @@ -424,18 +439,15 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error messagesSequences = append(messagesSequences, messageSeq) } - if len(messagesSequences) > 0 { - producer.unConfirmed.addFromSequences(messagesSequences, producer.GetID()) - } - if producer.getStatus() == closed { - for _, msg := range messagesSequences { - m := producer.unConfirmed.extractWithError(msg.publishingId, entityClosed) - producer.sendConfirmationStatus([]*ConfirmationStatus{m}) - } + producer.markUnsentAsUnconfirmed(messagesSequences) return fmt.Errorf("producer id: %d closed", producer.id) } + if len(messagesSequences) > 0 { + producer.unConfirmed.addFromSequences(messagesSequences, producer.GetID()) + } + if totalBufferToSend+initBufferPublishSize > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // all the messages are unconfirmed @@ -670,24 
+682,7 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() { // Stop the pendingSequencesQueue, so the producer can't send messages anymore // but the producer can still handle the inflight messages pendingSequences := producer.pendingSequencesQueue.Stop() - - if len(pendingSequences) > 0 { - // Send as unconfirmed the messages in the pendingSequencesQueue, that have never been sent, - // with the "entityClosed" error. - pending := make([]*ConfirmationStatus, 0, len(pendingSequences)) - for _, ps := range pendingSequences { - cs := &ConfirmationStatus{ - inserted: time.Now(), - message: ps.sourceMsg, - producerID: producer.GetID(), - publishingId: ps.publishingId, - confirmed: false, - } - cs.updateStatus(entityClosed, false) - pending = append(pending, cs) - } - producer.sendConfirmationStatus(pending) - } + producer.markUnsentAsUnconfirmed(pendingSequences) // Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages producer.confirmationTimeoutTicker.Stop()