diff --git a/pkg/ha/ha_publisher.go b/pkg/ha/ha_publisher.go index e6a0b1db..8f12ef34 100644 --- a/pkg/ha/ha_publisher.go +++ b/pkg/ha/ha_publisher.go @@ -63,7 +63,6 @@ type ReliableProducer struct { producerOptions *stream.ProducerOptions count int32 confirmMessageHandler ConfirmMessageHandler - channelPublishConfirm stream.ChannelPublishConfirm mutex *sync.Mutex mutexStatus *sync.Mutex status int @@ -108,8 +107,7 @@ func (p *ReliableProducer) newProducer() error { return err } - p.channelPublishConfirm = producer.NotifyPublishConfirmation() - p.handlePublishConfirm(p.channelPublishConfirm) + p.handlePublishConfirm(producer.NotifyPublishConfirmation()) channelNotifyClose := producer.NotifyClose() p.handleNotifyClose(channelNotifyClose) p.producer = producer diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index f845efeb..a0979ef4 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -81,9 +81,8 @@ const ( responseCodeNoOffset = uint16(19) /// responses out of protocol - closeChannel = uint16(60) - connectionCloseError = uint16(61) - timeoutError = uint16(62) + timeoutError = uint16(62) + entityClosed = uint16(63) /// defaultSocketCallTimeout = 10 * time.Second @@ -194,6 +193,9 @@ func lookErrorCode(errorCode uint16) error { return AuthenticationFailureLoopbackError case timeoutError: return ConfirmationTimoutError + case entityClosed: + return AlreadyClosed + default: { logs.LogWarn("Error not handled %d", errorCode) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index c6c3f09d..feed78b2 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -362,10 +362,25 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str // and the messages in the buffer. The aggregation is up to the client. // returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { + messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err } + if producer.getStatus() == closed { + producer.sendConfirmationStatus([]*ConfirmationStatus{ + { + inserted: time.Now(), + message: streamMessage, + producerID: producer.GetID(), + publishingId: messageSeq.publishingId, + confirmed: false, + err: AlreadyClosed, + }, + }) + return fmt.Errorf("producer id: %d closed", producer.id) + } + producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) if len(messageSeq.messageBytes) > defaultMaxFrameSize { @@ -377,7 +392,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // se the processPendingSequencesQueue function err = producer.pendingSequencesQueue.Enqueue(messageSeq) if err != nil { - return fmt.Errorf("error during enqueue message: %s. Message will be in timed. Producer id: %d ", err, producer.id) + return fmt.Errorf("error during enqueue message: %s pending queue closed. Producer id: %d ", err, producer.id) } return nil } @@ -403,6 +418,14 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error } // + if producer.getStatus() == closed { + for _, msg := range messagesSequence { + m := producer.unConfirmed.extractWithError(msg.publishingId, entityClosed) + producer.sendConfirmationStatus([]*ConfirmationStatus{m}) + } + return fmt.Errorf("producer id: %d closed", producer.id) + } + if totalBufferToSend+initBufferPublishSize > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // all the messages are unconfirmed @@ -605,7 +628,6 @@ func (producer *Producer) close(reason Event) error { logs.LogDebug("producer options is nil, the close will be ignored") return nil } - _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") @@ -616,6 +638,8 @@ func (producer *Producer) close(reason Event) error { _ = producer.options.client.deletePublisher(producer.id) } + _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) + if producer.options.client.coordinator.ProducersCount() == 0 { _ = producer.options.client.Close() } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index 8cb31fc4..57d91762 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -806,12 +806,21 @@ var _ = Describe("Streaming Producers", func() { }) It("Can't send message if the producer is closed", func() { - producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) Expect(producer.Close()).NotTo(HaveOccurred()) err = producer.Send(amqp.NewMessage(make([]byte, 50))) Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) + }) + + It("Can't bach send message if the producer is closed", func() { + producer, err := testEnvironment.NewProducer(testProducerStream, nil) + Expect(err).NotTo(HaveOccurred()) + Expect(producer.Close()).NotTo(HaveOccurred()) + err = producer.BatchSend([]message.StreamMessage{amqp.NewMessage(make([]byte, 50))}) + Expect(err).To(HaveOccurred()) + Expect(err.Error()).To(ContainSubstring("producer id: 0 closed")) }) }) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0afb3c4b..8316e9c3 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -255,23 +255,11 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in // even the producer is not found we need to read the publishingId // to empty the buffer. // The producer here could not exist because the producer is closed before the confirmations are received - //var unConfirmedRecv []*ConfirmationStatus + var arraySeq []int64 for publishingIdCount != 0 { seq := readInt64(r) arraySeq = append(arraySeq, seq) - //if producerFound { - // m := producer.unConfirmed.extractWithConfirm(seq) - // if m != nil { - // unConfirmedRecv = append(unConfirmedRecv, m) - // - // // in case of sub-batch entry the client receives only - // // one publishingId (or sequence) - // // so the other messages are confirmed using the linkedTo - // unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) - // } - //} - publishingIdCount-- }