diff --git a/Makefile b/Makefile
index 916814bc..07d4dcbc 100644
--- a/Makefile
+++ b/Makefile
@@ -32,16 +32,8 @@ $(STATICCHECK):
 check: $(STATICCHECK)
 	$(STATICCHECK) ./pkg/stream
 
-GOMOCK ?= $(GOBIN)/mockgen
-GOMOCK_VERSION ?= v1.6.0
-$(GOMOCK):
-	go install github.com/golang/mock/mockgen@$(GOMOCK_VERSION)
-
-.PHONY: gomock
-gomock: $(GOMOCK)
-
 NUM_PROCS ?= 2
-TEST_TIMEOUT ?= 2m
+TEST_TIMEOUT ?= 3m
 test: vet fmt check
 	go run -mod=mod github.com/onsi/ginkgo/v2/ginkgo -r --procs=$(NUM_PROCS) --compilers=$(NUM_PROCS) \
 	--randomize-all --randomize-suites \
diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go
index e6a0b1db..8f12ef34 100644
--- a/pkg/ha/ha_publisher.go
+++ b/pkg/ha/ha_publisher.go
@@ -63,7 +63,6 @@ type ReliableProducer struct {
 	producerOptions       *stream.ProducerOptions
 	count                 int32
 	confirmMessageHandler ConfirmMessageHandler
-	channelPublishConfirm stream.ChannelPublishConfirm
 	mutex                 *sync.Mutex
 	mutexStatus           *sync.Mutex
 	status                int
@@ -108,8 +107,7 @@ func (p *ReliableProducer) newProducer() error {
 		return err
 	}
 
-	p.channelPublishConfirm = producer.NotifyPublishConfirmation()
-	p.handlePublishConfirm(p.channelPublishConfirm)
+	p.handlePublishConfirm(producer.NotifyPublishConfirmation())
 	channelNotifyClose := producer.NotifyClose()
 	p.handleNotifyClose(channelNotifyClose)
 	p.producer = producer
diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go
index 6f66a23c..2f85841b 100644
--- a/pkg/stream/blocking_queue.go
+++ b/pkg/stream/blocking_queue.go
@@ -2,17 +2,17 @@ package stream
 
 import (
 	"errors"
-	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
 	"sync/atomic"
 	"time"
+
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
 )
 
 var ErrBlockingQueueStopped = errors.New("blocking queue stopped")
 
 type BlockingQueue[T any] struct {
-	queue      chan T
-	status     int32
-	lastUpdate int64
+	queue  chan T
+	status int32
 }
 
 // NewBlockingQueue initializes a new BlockingQueue with the given capacity
@@ -28,7 +28,6 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error {
 	if bq.IsStopped() {
 		return ErrBlockingQueueStopped
 	}
-	atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano())
 	bq.queue <- item
 	return nil
 }
@@ -50,26 +49,22 @@ func (bq *BlockingQueue[T]) IsEmpty() bool {
 // Stop is different from Close in that it allows the
 // existing items to be processed.
 // Drain the queue to be sure there are not pending messages
-func (bq *BlockingQueue[T]) Stop() {
+func (bq *BlockingQueue[T]) Stop() []T {
 	atomic.StoreInt32(&bq.status, 1)
 	// drain the queue. To be sure there are not pending messages
-	// in the queue.
-	// it does not matter if we lose some messages here
-	// since there is the unConfirmed map to handle the messages
-	isActive := true
-	for isActive {
+	// in the queue and return to the caller the remaining pending messages
+	msgInQueue := make([]T, 0, len(bq.queue))
+outer:
+	for {
 		select {
-		case <-bq.queue:
-			// do nothing
+		case msg := <-bq.queue:
+			msgInQueue = append(msgInQueue, msg)
 		case <-time.After(10 * time.Millisecond):
-			isActive = false
-			return
-		default:
-			isActive = false
-			return
+			break outer
 		}
 	}
 	logs.LogDebug("BlockingQueue stopped")
+	return msgInQueue
 }
 
 func (bq *BlockingQueue[T]) Close() {
diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go
index f845efeb..a0979ef4 100644
--- a/pkg/stream/constants.go
+++ b/pkg/stream/constants.go
@@ -81,9 +81,8 @@ const (
 	responseCodeNoOffset = uint16(19)
 
 	/// responses out of protocol
-	closeChannel         = uint16(60)
-	connectionCloseError = uint16(61)
-	timeoutError         = uint16(62)
+	timeoutError = uint16(62)
+	entityClosed = uint16(63)
 
 	///
 	defaultSocketCallTimeout = 10 * time.Second
@@ -194,6 +193,9 @@ func lookErrorCode(errorCode uint16) error {
 		return AuthenticationFailureLoopbackError
 	case timeoutError:
 		return ConfirmationTimoutError
+	case entityClosed:
+		return AlreadyClosed
+
 	default:
 		{
 			logs.LogWarn("Error not handled %d", errorCode)
diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index c6c3f09d..9c533d83 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -4,11 +4,12 @@ import (
 	"bufio"
 	"bytes"
 	"fmt"
-	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
-	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
 	"sync"
 	"sync/atomic"
 	"time"
+
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
 )
 
 type ConfirmationStatus struct {
@@ -50,7 +51,17 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 {
 	return cs.errorCode
 }
 
+func (cs *ConfirmationStatus) updateStatus(errorCode uint16, confirmed bool) {
+	cs.confirmed = confirmed
+	if confirmed {
+		return
+	}
+	cs.errorCode = errorCode
+	cs.err = lookErrorCode(errorCode)
+}
+
 type messageSequence struct {
+	sourceMsg    message.StreamMessage
 	messageBytes []byte
 	publishingId int64
 	filterValue  string
@@ -193,18 +204,10 @@ func NewProducerOptions() *ProducerOptions {
 	}
 }
 
-func (producer *Producer) GetUnConfirmed() map[int64]*ConfirmationStatus {
-	return producer.unConfirmed.getAll()
-}
-
 func (po *ProducerOptions) isSubEntriesBatching() bool {
 	return po.SubEntrySize > 1
 }
 
-func (producer *Producer) lenUnConfirmed() int {
-	return producer.unConfirmed.size()
-}
-
 // NotifyPublishConfirmation returns a channel that receives the confirmation status of the messages sent by the producer.
 func (producer *Producer) NotifyPublishConfirmation() ChannelPublishConfirm {
 	ch := make(chan []*ConfirmationStatus, 1)
@@ -292,6 +295,9 @@ func (producer *Producer) processPendingSequencesQueue() {
 			var lastError error
 
 			if producer.pendingSequencesQueue.IsStopped() {
+				// add also the last message to sequenceToSend
+				// otherwise it will be lost
+				sequenceToSend = append(sequenceToSend, msg)
 				break
 			}
 			// There is something in the queue. Checks the buffer is still less than the maxFrame
@@ -299,6 +305,7 @@
 			if totalBufferToSend > maxFrame {
 				// if the totalBufferToSend is greater than the requestedMaxFrameSize
 				// the producer sends the messages and reset the buffer
+				producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID())
 				lastError = producer.internalBatchSend(sequenceToSend)
 				sequenceToSend = sequenceToSend[:0]
 				totalBufferToSend = initBufferPublishSize
@@ -310,6 +317,7 @@
 			// the messages during the checks of the buffer. In this case
 			if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
 				if len(sequenceToSend) > 0 {
+					producer.unConfirmed.addFromSequences(sequenceToSend, producer.GetID())
 					lastError = producer.internalBatchSend(sequenceToSend)
 					sequenceToSend = sequenceToSend[:0]
 					totalBufferToSend += initBufferPublishSize
@@ -323,13 +331,36 @@
 		// just in case there are messages in the buffer
 		// not matter is sent or not the messages will be timed out
 		if len(sequenceToSend) > 0 {
-			_ = producer.internalBatchSend(sequenceToSend)
+			producer.markUnsentAsUnconfirmed(sequenceToSend)
 		}
 
 	}()
 	logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id)
 }
 
+func (producer *Producer) markUnsentAsUnconfirmed(sequences []*messageSequence) {
+	if len(sequences) == 0 {
+		return
+	}
+
+	// Send as unconfirmed the messages in the pendingSequencesQueue,
+	// that have never been sent,
+	// with the "entityClosed" error.
+	confirms := make([]*ConfirmationStatus, 0, len(sequences))
+	for _, ps := range sequences {
+		cs := &ConfirmationStatus{
+			inserted:     time.Now(),
+			message:      ps.sourceMsg,
+			producerID:   producer.GetID(),
+			publishingId: ps.publishingId,
+			confirmed:    false,
+		}
+		cs.updateStatus(entityClosed, false)
+		confirms = append(confirms, cs)
+	}
+	producer.sendConfirmationStatus(confirms)
+}
+
 func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 {
 	sequence := message.GetPublishingId()
 	// in case of sub entry the deduplication is disabled
@@ -349,10 +380,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str
 	if producer.options.IsFilterEnabled() {
 		filterValue = producer.options.Filter.FilterValue(streamMessage)
 	}
-	msqSeq := &messageSequence{}
-	msqSeq.messageBytes = marshalBinary
-	msqSeq.publishingId = seq
-	msqSeq.filterValue = filterValue
+	msqSeq := &messageSequence{
+		sourceMsg:    streamMessage,
+		messageBytes: marshalBinary,
+		publishingId: seq,
+		filterValue:  filterValue,
+	}
 	return msqSeq, nil
 }
 
@@ -366,9 +399,13 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
 	if err != nil {
 		return err
 	}
-	producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID())
+	if producer.getStatus() == closed {
+		producer.markUnsentAsUnconfirmed([]*messageSequence{messageSeq})
+		return fmt.Errorf("producer id: %d closed", producer.id)
+	}
 
 	if len(messageSeq.messageBytes) > defaultMaxFrameSize {
+		producer.unConfirmed.addFromSequences([]*messageSequence{messageSeq}, producer.GetID())
 		tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge)
 		producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge})
 		return FrameTooLarge
@@ -377,7 +414,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
 	// se the processPendingSequencesQueue function
 	err = producer.pendingSequencesQueue.Enqueue(messageSeq)
 	if err != nil {
-		return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id)
+		return fmt.Errorf("error during enqueue message: %s pending queue closed. Producer id: %d ", err, producer.id)
 	}
 	return nil
 }
@@ -389,32 +426,40 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error {
 // returns an error if the message could not be sent for marshal problems or if the buffer is too large
 func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error {
 	maxFrame := defaultMaxFrameSize
-	var messagesSequence = make([]*messageSequence, 0)
+	var messagesSequences = make([]*messageSequence, 0, len(batchMessages))
 	totalBufferToSend := 0
+
 	for _, batchMessage := range batchMessages {
 		messageSeq, err := producer.fromMessageToMessageSequence(batchMessage)
 		if err != nil {
 			return err
 		}
-		producer.unConfirmed.addFromSequence(messageSeq, &batchMessage, producer.GetID())
 		totalBufferToSend += len(messageSeq.messageBytes)
-		messagesSequence = append(messagesSequence, messageSeq)
+		messagesSequences = append(messagesSequences, messageSeq)
+	}
+
+	if producer.getStatus() == closed {
+		producer.markUnsentAsUnconfirmed(messagesSequences)
+		return fmt.Errorf("producer id: %d closed", producer.id)
+	}
+
+	if len(messagesSequences) > 0 {
+		producer.unConfirmed.addFromSequences(messagesSequences, producer.GetID())
 	}
 
-	//
 	if totalBufferToSend+initBufferPublishSize > maxFrame {
 		// if the totalBufferToSend is greater than the requestedMaxFrameSize
 		// all the messages are unconfirmed
-		for _, msg := range messagesSequence {
+		for _, msg := range messagesSequences {
 			m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge)
 			producer.sendConfirmationStatus([]*ConfirmationStatus{m})
 		}
 		return FrameTooLarge
 	}
 
-	return producer.internalBatchSend(messagesSequence)
+	return producer.internalBatchSend(messagesSequences)
 }
 
 func (producer *Producer) GetID() uint8 {
@@ -605,7 +650,6 @@ func (producer *Producer) close(reason Event) error {
 		logs.LogDebug("producer options is nil, the close will be ignored")
 		return nil
 	}
-	_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)
 	if !producer.options.client.socket.isOpen() {
 		return fmt.Errorf("tcp connection is closed")
 	}
@@ -616,6 +660,8 @@
 		_ = producer.options.client.deletePublisher(producer.id)
 	}
 
+	_, _ = producer.options.client.coordinator.ExtractProducerById(producer.id)
+
 	if producer.options.client.coordinator.ProducersCount() == 0 {
 		_ = producer.options.client.Close()
 	}
@@ -635,7 +681,8 @@ func (producer *Producer) stopAndWaitPendingSequencesQueue() {
 
 	// Stop the pendingSequencesQueue, so the producer can't send messages anymore
 	// but the producer can still handle the inflight messages
-	producer.pendingSequencesQueue.Stop()
+	pendingSequences := producer.pendingSequencesQueue.Stop()
+	producer.markUnsentAsUnconfirmed(pendingSequences)
 
 	// Stop the confirmationTimeoutTicker. It will flush the unconfirmed messages
 	producer.confirmationTimeoutTicker.Stop()
@@ -657,9 +704,9 @@ func (producer *Producer) waitForInflightMessages() {
 
 	tentatives := 0
 
-	for (producer.lenUnConfirmed() > 0) && tentatives < 5 {
+	for (producer.unConfirmed.size() > 0) && tentatives < 5 {
 		logs.LogInfo("wait inflight messages - unconfirmed len: %d - retry: %d",
-			producer.lenUnConfirmed(), tentatives)
+			producer.unConfirmed.size(), tentatives)
 		producer.flushUnConfirmedMessages()
 		time.Sleep(time.Duration(500) * time.Millisecond)
 		tentatives++
diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go
index 8cb31fc4..688a72fb 100644
--- a/pkg/stream/producer_test.go
+++ b/pkg/stream/producer_test.go
@@ -2,14 +2,15 @@ package stream
 
 import (
 	"fmt"
+	"sync"
+	"sync/atomic"
+	"time"
+
 	"github.com/google/uuid"
 	. "github.com/onsi/ginkgo/v2"
 	. "github.com/onsi/gomega"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
 	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
-	"sync"
-	"sync/atomic"
-	"time"
 )
 
 var _ = Describe("Streaming Producers", func() {
@@ -273,7 +274,7 @@ var _ = Describe("Streaming Producers", func() {
 		}, 5*time.Second).Should(Equal(int32(14)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 	})
 
@@ -288,7 +289,7 @@
 		}
 
 		Expect(producer.Close()).NotTo(HaveOccurred())
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.pendingSequencesQueue.IsEmpty()).To(Equal(true))
 	})
 
@@ -337,7 +338,7 @@
 		}, 5*time.Second).WithPolling(300*time.Millisecond).Should(Equal(int32(101)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 		// in this case must raise an error since the producer is closed
 		Expect(producer.Close()).To(HaveOccurred())
@@ -389,7 +390,7 @@
 		}, 5*time.Second).Should(Equal(int32(101)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 	})
 
@@ -422,7 +423,7 @@
 		}, 5*time.Second).Should(Equal(int32(10)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 	})
 
@@ -621,7 +622,7 @@
 		}, 5*time.Second).Should(Equal(int32(232)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 
 		// same test above but using batch Send
 		var arr []message.StreamMessage
@@ -639,7 +640,7 @@
 		}, 5*time.Second).Should(Equal(int32(12*20)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 
 		Expect(producer.Close()).NotTo(HaveOccurred())
 
@@ -753,7 +754,7 @@
 		}, 5*time.Second).Should(Equal(int32(501)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 
 		atomic.StoreInt32(&messagesConfirmed, 0)
 		for z := 0; z < 501; z++ {
@@ -766,7 +767,7 @@
 		}, 5*time.Second).Should(Equal(int32(501*5)),
 			"confirm should receive same messages Send by producer")
 
-		Expect(producer.lenUnConfirmed()).To(Equal(0))
+		Expect(producer.unConfirmed.size()).To(Equal(0))
 		Expect(producer.Close()).NotTo(HaveOccurred())
 	})
 
@@ -776,7 +777,7 @@
 			SetSubEntrySize(33).SetCompression(Compression{}.Gzip()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerGZIP)
-		Expect(producerGZIP.lenUnConfirmed()).To(Equal(0))
+		Expect(producerGZIP.unConfirmed.size()).To(Equal(0))
 		Expect(producerGZIP.Close()).NotTo(HaveOccurred())
 
 		producerLz4, err := testEnvironment.NewProducer(testProducerStream,
@@ -784,7 +785,7 @@
 			SetSubEntrySize(55).SetCompression(Compression{}.Lz4()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerLz4)
-		Expect(producerLz4.lenUnConfirmed()).To(Equal(0))
+		Expect(producerLz4.unConfirmed.size()).To(Equal(0))
 		Expect(producerLz4.Close()).NotTo(HaveOccurred())
 
 		producerSnappy, err := testEnvironment.NewProducer(testProducerStream,
@@ -792,7 +793,7 @@
 			SetSubEntrySize(666).SetCompression(Compression{}.Snappy()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerSnappy)
-		Expect(producerSnappy.lenUnConfirmed()).To(Equal(0))
+		Expect(producerSnappy.unConfirmed.size()).To(Equal(0))
 		Expect(producerSnappy.Close()).NotTo(HaveOccurred())
 
 		producerZstd, err := testEnvironment.NewProducer(testProducerStream,
@@ -800,18 +801,27 @@
 			SetSubEntrySize(98).SetCompression(Compression{}.Zstd()))
 		Expect(err).NotTo(HaveOccurred())
 		testCompress(producerZstd)
-		Expect(producerZstd.lenUnConfirmed()).To(Equal(0))
+		Expect(producerZstd.unConfirmed.size()).To(Equal(0))
 		Expect(producerZstd.Close()).NotTo(HaveOccurred())
 	})
 
 	It("Can't send message if the producer is closed", func() {
-
 		producer, err := testEnvironment.NewProducer(testProducerStream, nil)
 		Expect(err).NotTo(HaveOccurred())
 		Expect(producer.Close()).NotTo(HaveOccurred())
 		err = producer.Send(amqp.NewMessage(make([]byte, 50)))
 		Expect(err).To(HaveOccurred())
+		Expect(err.Error()).To(ContainSubstring("producer id: 0 closed"))
+	})
+
+	It("Can't bach send message if the producer is closed", func() {
+		producer, err := testEnvironment.NewProducer(testProducerStream, nil)
+		Expect(err).NotTo(HaveOccurred())
+		Expect(producer.Close()).NotTo(HaveOccurred())
+		err = producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, 50))})
+		Expect(err).To(HaveOccurred())
+		Expect(err.Error()).To(ContainSubstring("producer id: 0 closed"))
 	})
 })
 
@@ -835,7 +845,7 @@ func testCompress(producer *Producer) {
 	}, 5*time.Second).Should(Equal(int32(457)),
 		"confirm should receive same messages Send by producer")
 
-	Expect(producer.lenUnConfirmed()).To(Equal(0))
+	Expect(producer.unConfirmed.size()).To(Equal(0))
 
 	atomic.StoreInt32(&messagesConfirmed, 0)
 	for z := 0; z < 457; z++ {
@@ -897,7 +907,7 @@ func verifyProducerSent(producer *Producer, confirmationReceived *int32, message
 	}, 10*time.Second, 1*time.Second).Should(Equal(int32(messageSent)),
 		"confirm should receive same messages Send by producer")
 
-	Expect(producer.lenUnConfirmed()).To(Equal(0))
+	Expect(producer.unConfirmed.size()).To(Equal(0))
 }
 
 func runConcurrentlyAndWaitTillAllDone(threadCount int, wg *sync.WaitGroup, runner func(int)) {
diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go
index 9d232c26..0cc0f606 100644
--- a/pkg/stream/producer_unconfirmed.go
+++ b/pkg/stream/producer_unconfirmed.go
@@ -1,7 +1,6 @@
 package stream
 
 import (
-	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
 	"sync"
 	"time"
 )
@@ -22,26 +21,27 @@ type unConfirmed struct {
 const DefaultUnconfirmedSize = 10_000
 
 func newUnConfirmed() *unConfirmed {
-
 	r := &unConfirmed{
 		messages:        make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize),
 		mutexMessageMap: sync.RWMutex{},
 	}
-
 	return r
 }
 
-func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) {
-
+func (u *unConfirmed) addFromSequences(messages []*messageSequence, producerID uint8) {
 	u.mutexMessageMap.Lock()
-	u.messages[message.publishingId] = &ConfirmationStatus{
-		inserted:     time.Now(),
-		message:      *source,
-		producerID:   producerID,
-		publishingId: message.publishingId,
-		confirmed:    false,
+	defer u.mutexMessageMap.Unlock()
+
+	for _, msgSeq := range messages {
+		u.messages[msgSeq.publishingId] = &ConfirmationStatus{
+			inserted:     time.Now(),
+			message:      msgSeq.sourceMsg,
+			producerID:   producerID,
+			publishingId: msgSeq.publishingId,
+			confirmed:    false,
+		}
 	}
-	u.mutexMessageMap.Unlock()
+
 }
 
 func (u *unConfirmed) link(from int64, to int64) {
@@ -56,8 +56,8 @@
 func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus {
 	u.mutexMessageMap.Lock()
 	defer u.mutexMessageMap.Unlock()
-	var res []*ConfirmationStatus
+	res := make([]*ConfirmationStatus, 0, len(ids))
 
 	for _, v := range ids {
 		m := u.extract(v, 0, true)
 		if m != nil {
@@ -118,9 +118,3 @@ func (u *unConfirmed) size() int {
 	defer u.mutexMessageMap.Unlock()
 	return len(u.messages)
 }
-
-func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus {
-	u.mutexMessageMap.Lock()
-	defer u.mutexMessageMap.Unlock()
-	return u.messages
-}
diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go
index 0afb3c4b..f6381446 100644
--- a/pkg/stream/server_frame.go
+++ b/pkg/stream/server_frame.go
@@ -255,23 +255,11 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in
 	// even the producer is not found we need to read the publishingId
 	// to empty the buffer.
 	// The producer here could not exist because the producer is closed before the confirmations are received
-	//var unConfirmedRecv []*ConfirmationStatus
-	var arraySeq []int64
+
+	arraySeq := make([]int64, 0, publishingIdCount)
 	for publishingIdCount != 0 {
 		seq := readInt64(r)
 		arraySeq = append(arraySeq, seq)
-		//if producerFound {
-		//	m := producer.unConfirmed.extractWithConfirm(seq)
-		//	if m != nil {
-		//		unConfirmedRecv = append(unConfirmedRecv, m)
-		//
-		//		// in case of sub-batch entry the client receives only
-		//		// one publishingId (or sequence)
-		//		// so the other messages are confirmed using the linkedTo
-		//		unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...)
-		//	}
-		//}
-
 		publishingIdCount--
 	}