From d271a2662efc97bfdd6e82a2349e9f6af4e286db Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 09:10:18 +0100 Subject: [PATCH 1/4] check if the producer is closed when send Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 17 ++++++++++++++++- pkg/stream/producer_test.go | 2 +- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index c6c3f09d..483ec465 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -362,10 +362,25 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str // and the messages in the buffer. The aggregation is up to the client. // returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { + messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err } + if producer.getStatus() == closed { + producer.sendConfirmationStatus([]*ConfirmationStatus{ + { + inserted: time.Now(), + message: streamMessage, + producerID: producer.GetID(), + publishingId: messageSeq.publishingId, + confirmed: false, + err: AlreadyClosed, + }, + }) + return fmt.Errorf("producer id: %d closed", producer.id) + } + producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) if len(messageSeq.messageBytes) > defaultMaxFrameSize { @@ -377,7 +392,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // se the processPendingSequencesQueue function err = producer.pendingSequencesQueue.Enqueue(messageSeq) if err != nil { - return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id) + return fmt.Errorf("error during enqueue message: %s pending queue closed. Producer id: %d ", err, producer.id) } return nil } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 8cb31fc4..86d370aa 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -806,12 +806,12 @@ var _ = Describe("Streaming Producers", func() { }) It("Can't send message if the producer is closed", func() { - producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) Expect(producer.Close()).NotTo(HaveOccurred()) err = producer.Send(amqp.NewMessage(make([]byte, 50))) Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) }) }) From 284869bcd70b8e116286da7e05852efb599459c6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 09:18:22 +0100 Subject: [PATCH 2/4] check if the producer is closed when send Signed-off-by: Gabriele Santomaggio --- pkg/stream/constants.go | 8 +++++--- pkg/stream/producer.go | 8 ++++++++ pkg/stream/producer_test.go | 9 +++++++++ 3 files changed, 22 insertions(+), 3 deletions(-) diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index f845efeb..a0979ef4 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -81,9 +81,8 @@ const ( responseCodeNoOffset = uint16(19) /// responses out of protocol - closeChannel = uint16(60) - connectionCloseError = uint16(61) - timeoutError = uint16(62) + timeoutError = uint16(62) + entityClosed = uint16(63) /// defaultSocketCallTimeout = 10 * time.Second @@ -194,6 +193,9 @@ func lookErrorCode(errorCode uint16) error { return AuthenticationFailureLoopbackError case timeoutError: return ConfirmationTimoutError + case entityClosed: + return AlreadyClosed + default: { logs.LogWarn("Error not handled %d", errorCode) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 483ec465..082fac17 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -418,6 +418,14 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error } // + if producer.getStatus() == closed { + for _, msg := range messagesSequence { + m := producer.unConfirmed.extractWithError(msg.publishingId, entityClosed) + producer.sendConfirmationStatus([]*ConfirmationStatus{m}) + } + return fmt.Errorf("producer id: %d closed", producer.id) + } + if totalBufferToSend+initBufferPublishSize > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // all the messages are unconfirmed diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 86d370aa..57d91762 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -814,6 +814,15 @@ var _ = Describe("Streaming Producers", func() { Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) }) + It("Can't bach send message if the producer is closed", func() { + producer, err := testEnvironment.NewProducer(testProducerStream, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) + err = producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, 50))}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) + }) + }) func testCompress(producer *Producer) { From 0a3a5abbb48abcbc9f75dded476b618744fbe15a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 10:58:03 +0100 Subject: [PATCH 3/4] remove the producer from the list after removed from the server Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 3 ++- pkg/stream/server_frame.go | 14 +------------- 2 files changed, 3 insertions(+), 14 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 082fac17..feed78b2 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -628,7 +628,6 @@ func (producer *Producer) close(reason Event) error { logs.LogDebug("producer options is nil, the close will be ignored") return nil } - _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") @@ -639,6 +638,8 @@ func (producer *Producer) close(reason Event) error { _ = producer.options.client.deletePublisher(producer.id) } + _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) + if producer.options.client.coordinator.ProducersCount() == 0 { _ = producer.options.client.Close() } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0afb3c4b..8316e9c3 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -255,23 +255,11 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in // even the producer is not found we need to read the publishingId // to empty the buffer. // The producer here could not exist because the producer is closed before the confirmations are received - //var unConfirmedRecv []*ConfirmationStatus + var arraySeq []int64 for publishingIdCount != 0 { seq := readInt64(r) arraySeq = append(arraySeq, seq) - //if producerFound { - // m := producer.unConfirmed.extractWithConfirm(seq) - // if m != nil { - // unConfirmedRecv = append(unConfirmedRecv, m) - // - // // in case of sub-batch entry the client receives only - // // one publishingId (or sequence) - // // so the other messages are confirmed using the linkedTo - // unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) - // } - //} - publishingIdCount-- } From 66d3cef2477ddf1ffd02fea15b58652efaf14e9b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 16 Jan 2025 11:38:21 +0100 Subject: [PATCH 4/4] remove the confirmation from the ha struct Signed-off-by: Gabriele Santomaggio --- pkg/ha/ha_publisher.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index e6a0b1db..8f12ef34 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -63,7 +63,6 @@ type ReliableProducer struct { producerOptions *stream.ProducerOptions count int32 confirmMessageHandler ConfirmMessageHandler - channelPublishConfirm stream.ChannelPublishConfirm mutex *sync.Mutex mutexStatus *sync.Mutex status int @@ -108,8 +107,7 @@ func (p *ReliableProducer) newProducer() error { return err } - p.channelPublishConfirm = producer.NotifyPublishConfirmation() - p.handlePublishConfirm(p.channelPublishConfirm) + p.handlePublishConfirm(producer.NotifyPublishConfirmation()) channelNotifyClose := producer.NotifyClose() p.handleNotifyClose(channelNotifyClose) p.producer = producer