diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 9275882d..7b85e3dd 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -54,7 +54,7 @@ jobs: check-latest: true - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} - uses: actions/checkout@main - - uses: codecov/codecov-action@v4 + - uses: codecov/codecov-action@v5 with: fail_ci_if_error: false # optional (default = false) files: ./coverage.txt diff --git a/README.md b/README.md index c136870c..b39de3ee 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,15 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv ### Overview Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream) +The client contains all features to interact with the RabbitMQ Stream Queues.
+ +The main entry point is the `Environment`, which is used to create the `Producer` and `Consumer` interfaces.
+ +`Producer` and `Consumer` are the main interfaces for publishing messages to and consuming messages from a stream.
+They do not reconnect automatically after a disconnection, but they expose events to detect it.
+ +The client also provides `ReliableProducer` and `ReliableConsumer`, which reconnect automatically after a disconnection.
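+
+A minimal consumer sketch (the import paths and the `ha.NewReliableConsumer` call follow the existing API; option helpers such as `NewEnvironmentOptions`, `NewConsumerOptions` and `SetOffset` are assumed from the current client, and error handling is reduced to `panic` for brevity):
+
+```go
+package main
+
+import (
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha"
+	"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
+)
+
+func main() {
+	// connect with the default options (localhost:5552, guest/guest)
+	env, err := stream.NewEnvironment(stream.NewEnvironmentOptions())
+	if err != nil {
+		panic(err)
+	}
+
+	// ReliableConsumer reconnects automatically in case of disconnection
+	consumer, err := ha.NewReliableConsumer(env, "my-stream",
+		stream.NewConsumerOptions().SetOffset(stream.OffsetSpecification{}.First()),
+		func(consumerContext stream.ConsumerContext, message *amqp.Message) {
+			// process the message
+		})
+	if err != nil {
+		panic(err)
+	}
+	defer consumer.Close()
+
+	// keep the application running to receive messages, e.g. wait on a channel
+}
+```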
+See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) section. ### Installing diff --git a/pkg/amqp/types.go b/pkg/amqp/types.go index be59a157..c605b72c 100644 --- a/pkg/amqp/types.go +++ b/pkg/amqp/types.go @@ -409,8 +409,6 @@ type Message struct { //receiver *Receiver // Receiver the message was received from settled bool // whether transfer was settled by sender - // doneSignal is a channel that indicate when a message is considered acted upon by downstream handler - doneSignal chan struct{} } // AMQP10 is an AMQP 1.0 message with the necessary fields to work with the @@ -501,16 +499,7 @@ func (amqp *AMQP10) GetAMQPValue() interface{} { // more complex usages. func newMessage(data []byte) *Message { return &Message{ - Data: [][]byte{data}, - doneSignal: make(chan struct{}), - } -} - -// done closes the internal doneSignal channel to let the receiver know that this message has been acted upon -func (m *Message) done() { - // TODO: move initialization in ctor and use ctor everywhere? - if m.doneSignal != nil { - close(m.doneSignal) + Data: [][]byte{data}, } } @@ -523,15 +512,6 @@ func (m *Message) GetData() []byte { return m.Data[0] } -// Ignore notifies the amqp message pump that the message has been handled -// without any disposition. It frees the amqp receiver to get the next message -// this is implicitly done after calling message dispositions (Accept/Release/Reject/Modify) -func (m *Message) Ignore() { - if m.shouldSendDisposition() { - m.done() - } -} - // MarshalBinary encodes the message into binary form. func (m *Message) MarshalBinary() ([]byte, error) { buf := new(buffer) diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 24e8bb9d..4ceec9ea 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string, logs.LogDebug("[Reliable] - creating %s", res.getInfo()) err := res.newConsumer() if err == nil { - res.setStatus(StatusOpen) } logs.LogDebug("[Reliable] - created %s", res.getInfo()) diff --git a/pkg/stream/aggregation_test.go b/pkg/stream/aggregation_test.go index e4c16375..8ec901ff 100644 --- a/pkg/stream/aggregation_test.go +++ b/pkg/stream/aggregation_test.go @@ -18,9 +18,8 @@ var _ = Describe("Compression algorithms", func() { } message := &messageSequence{ - messageBytes: messagePayload, - unCompressedSize: len(messagePayload), - publishingId: 0, + messageBytes: messagePayload, + publishingId: 0, } entries = &subEntries{ diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index d949b6e0..6f66a23c 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -10,8 +10,9 @@ import ( var ErrBlockingQueueStopped = errors.New("blocking queue stopped") type BlockingQueue[T any] struct { - queue chan T - status int32 + queue chan T + status int32 + lastUpdate int64 } // NewBlockingQueue initializes a new BlockingQueue with the given capacity @@ -27,28 +28,13 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { if bq.IsStopped() { return ErrBlockingQueueStopped } + atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano()) bq.queue <- item - return nil } -// Dequeue removes an item from the queue with a timeout -func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { - if bq.IsStopped() { - var zeroValue T // Zero value of type T - return zeroValue - } - select { - case item, ok := <-bq.queue: - if !ok { - var zeroValue T // Zero value of type T - return zeroValue - } - return item - case 
<-time.After(timeout): - var zeroValue T // Zero value of type T - return zeroValue - } +func (bq *BlockingQueue[T]) GetChannel() chan T { + return bq.queue } func (bq *BlockingQueue[T]) Size() int { diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9e25a992..e8700c7b 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -64,7 +64,6 @@ type Client struct { saslConfiguration *SaslConfiguration mutex *sync.Mutex - metadataListener metadataListener lastHeartBeat HeartBeat socketCallTimeout time.Duration availableFeatures *availableFeatures @@ -112,14 +111,10 @@ func newClient(connectionName string, broker *Broker, } func (c *Client) getSocket() *socket { - //c.mutex.Lock() - //defer c.mutex.Unlock() return &c.socket } func (c *Client) setSocketConnection(connection net.Conn) { - //c.mutex.Lock() - //defer c.mutex.Unlock() c.socket.connection = connection c.socket.writer = bufio.NewWriter(connection) } @@ -432,7 +427,7 @@ func (c *Client) heartBeat() { tickerHeartbeat.Stop() return case <-tickerHeartbeat.C: - for c.socket.isOpen() { + if c.socket.isOpen() { if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { v := atomic.AddInt32(&heartBeatMissed, 1) logs.LogWarn("Missing heart beat: %d", v) @@ -512,10 +507,6 @@ func (c *Client) Close() error { } } - if c.metadataListener != nil { - close(c.metadataListener) - c.metadataListener = nil - } c.closeHartBeat() if c.getSocket().isOpen() { @@ -747,6 +738,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) { streamMetadata := streamsMetadata.Get(stream) if streamMetadata.responseCode != responseCodeOk { + return nil, lookErrorCode(streamMetadata.responseCode) } @@ -883,6 +875,10 @@ func (c *Client) DeclareSubscriber(streamName string, return nil, fmt.Errorf("message count before storage must be bigger than one") } + if messagesHandler == nil { + return nil, fmt.Errorf("messages Handler must be set") + } + if options.Offset.isLastConsumed() { lastOffset, err := c.queryOffset(options.ConsumerName, streamName) switch { @@ -992,12 +988,8 @@ func (c *Client) DeclareSubscriber(streamName string, go func() { for { select { - case code := <-consumer.response.code: - if code.id == closeChannel { - return - } - case chunk, ok := <-consumer.response.chunkForConsumer: + case chunk, ok := <-consumer.chunkForConsumer: if !ok { return } diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index a19ab723..91befece 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -88,7 +88,8 @@ const ( /// defaultSocketCallTimeout = 10 * time.Second - defaultHeartbeat = 60 * time.Second + defaultHeartbeat = 60 * time.Second + defaultMaxFrameSize = 1048574 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" @@ -119,6 +120,7 @@ const ( LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" DeletePublisher = "deletePublisher" + UnSubscribe = "unSubscribe" StreamTcpPort = "5552" diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index e9ff54bc..9add7319 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -10,12 +10,13 @@ import ( ) type Consumer struct { - ID uint8 - response *Response - options *ConsumerOptions - onClose onInternalClose - mutex *sync.Mutex - MessagesHandler MessagesHandler + ID uint8 + response *Response + options *ConsumerOptions + onClose onInternalClose + mutex *sync.Mutex + chunkForConsumer chan chunkInfo + MessagesHandler MessagesHandler // different form 
ConsumerOptions.offset. ConsumerOptions.offset is just the configuration // and won't change. currentOffset is the status of the offset currentOffset int64 @@ -312,52 +313,65 @@ func (consumer *Consumer) Close() error { if consumer.getStatus() == closed { return AlreadyClosed } - consumer.cacheStoreOffset() + return consumer.close(Event{ + Command: CommandUnsubscribe, + StreamName: consumer.GetStreamName(), + Name: consumer.GetName(), + Reason: UnSubscribe, + Err: nil, + }) +} - consumer.setStatus(closed) - _, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID) - if errGet != nil { +func (consumer *Consumer) close(reason Event) error { + + if consumer.options == nil { + // the config is usually set. this check is just to avoid panic and to make some test + // easier to write + logs.LogDebug("consumer options is nil, the close will be ignored") return nil } - length := 2 + 2 + 4 + 1 - resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe) - correlationId := resp.correlationid - var b = bytes.NewBuffer(make([]byte, 0, length+4)) - writeProtocolHeader(b, length, CommandUnsubscribe, - correlationId) + consumer.cacheStoreOffset() + consumer.setStatus(closed) - writeByte(b, consumer.ID) - err := consumer.options.client.handleWrite(b.Bytes(), resp) - if err.Err != nil && err.isTimeout { - return err.Err + if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { + closeHandler <- reason + close(consumer.closeHandler) + consumer.closeHandler = nil } - errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{ - Command: CommandUnsubscribe, - StreamName: consumer.GetStreamName(), - Name: consumer.GetName(), - Reason: "unSubscribe", - Err: nil, - }) - - if errC != nil { - logs.LogWarn("Error during remove consumer id:%s", errC) + close(consumer.chunkForConsumer) + if consumer.response.data != nil { + close(consumer.response.data) + consumer.response.data = nil } - if consumer.options.client.coordinator.ConsumersCount() == 0 { - err := consumer.options.client.Close() - if err != nil { - return err + if reason.Reason == UnSubscribe { + length := 2 + 2 + 4 + 1 + resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe) + correlationId := resp.correlationid + var b = bytes.NewBuffer(make([]byte, 0, length+4)) + writeProtocolHeader(b, length, CommandUnsubscribe, + correlationId) + + writeByte(b, consumer.ID) + err := consumer.options.client.handleWrite(b.Bytes(), resp) + if err.Err != nil && err.isTimeout { + logs.LogWarn("error during consumer unsubscribe:%s", err.Err) } } + _, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID) + + if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 { + _ = consumer.options.client.Close() + } ch := make(chan uint8, 1) ch <- consumer.ID consumer.onClose(ch) close(ch) - return err.Err + return nil } func (consumer *Consumer) cacheStoreOffset() { @@ -365,7 +379,7 @@ func (consumer *Consumer) cacheStoreOffset() { consumer.mutex.Lock() consumer.lastAutoCommitStored = time.Now() consumer.messageCountBeforeStorage = 0 - consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock + consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutexMessageMap, so not using defer for unlock err := consumer.internalStoreOffset() if err != nil { diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 49406ed8..e9dc6a43 
100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() { Eventually(func() int32 { return atomic.LoadInt32(&commandIdRecv) }, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)), - "command received should be CommandMetadataUpdate ") + "command received should be unSubscribe ") Expect(err).NotTo(HaveOccurred()) }) @@ -641,6 +641,13 @@ var _ = Describe("Streaming Consumers", func() { NewAutoCommitStrategy().SetFlushInterval(10*time.Millisecond))) Expect(err).To(HaveOccurred()) + // message handler must be set + _, err = env.NewConsumer(streamName, + nil, &ConsumerOptions{ + Offset: OffsetSpecification{}, + }) + Expect(err).To(HaveOccurred()) + }) It("Sub Batch consumer with different publishers GZIP and Not", func() { diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index a2d9b198..4d8b6f62 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -37,7 +37,6 @@ type chunkInfo struct { type Response struct { code chan Code data chan interface{} - chunkForConsumer chan chunkInfo commandDescription string correlationid int } @@ -84,20 +83,8 @@ func (coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) if err != nil { return err } - consumer.setStatus(closed) - reason.StreamName = consumer.GetStreamName() - reason.Name = consumer.GetName() - - if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { - closeHandler <- reason - close(closeHandler) - closeHandler = nil - } - - close(consumer.response.chunkForConsumer) - close(consumer.response.code) + return consumer.close(reason) - return nil } func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { @@ -117,7 +104,6 @@ func (coordinator *Coordinator) RemoveResponseById(id interface{}) error { err = coordinator.removeById(fmt.Sprintf("%d", id), coordinator.responses) close(resp.code) close(resp.data) - close(resp.chunkForConsumer) return err } @@ -131,7 +117,6 @@ func newResponse(commandDescription string) *Response { res.commandDescription = commandDescription res.code = make(chan Code, 1) res.data = make(chan interface{}, 1) - res.chunkForConsumer = make(chan chunkInfo, 100) return res } @@ -200,6 +185,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, lastAutoCommitStored: time.Now(), + chunkForConsumer: make(chan chunkInfo, 100), } coordinator.consumers[lastId] = item diff --git a/pkg/stream/coordinator_test.go b/pkg/stream/coordinator_test.go index bfc48771..322de58a 100644 --- a/pkg/stream/coordinator_test.go +++ b/pkg/stream/coordinator_test.go @@ -172,6 +172,7 @@ var _ = Describe("Coordinator", func() { Command: 0, StreamName: "UNIT_TESTS", Name: "", + Reason: "UNIT_TEST", Err: nil, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 2a4c52a5..06499cc6 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -515,12 +515,7 @@ func (c *Client) maybeCleanProducers(streamName string) { } } c.mutex.Unlock() - if c.coordinator.ProducersCount() == 0 { - err := c.Close() - if err != nil { - return - } - } + } func (c *Client) maybeCleanConsumers(streamName string) { @@ -540,12 +535,6 @@ func (c *Client) maybeCleanConsumers(streamName string) { } } c.mutex.Unlock() - if c.coordinator.ConsumersCount() == 0 { - err := c.Close() - if err != nil { - return - } - } } func 
(cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, @@ -603,19 +592,6 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client { clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut) - chMeta := make(chan metaDataUpdateEvent, 1) - clientResult.metadataListener = chMeta - go func(ch <-chan metaDataUpdateEvent, cl *Client) { - for metaDataUpdateEvent := range ch { - clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName) - cc.maybeCleanClients() - if !cl.socket.isOpen() { - return - } - } - - }(chMeta, clientResult) - cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult return clientResult @@ -638,19 +614,6 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro if clientResult == nil { clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout) - chMeta := make(chan metaDataUpdateEvent) - clientResult.metadataListener = chMeta - go func(ch <-chan metaDataUpdateEvent, cl *Client) { - for metaDataUpdateEvent := range ch { - clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName) - cc.maybeCleanClients() - if !cl.socket.isOpen() { - return - } - } - - }(chMeta, clientResult) - cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult } @@ -672,9 +635,11 @@ func (cc *environmentCoordinator) Close() error { cc.mutexContext.Lock() defer cc.mutexContext.Unlock() for _, client := range cc.clientsPerContext { - err := client.Close() - if err != nil { - logs.LogWarn("Error during close the client, %s", err) + for i := range client.coordinator.producers { + _ = client.coordinator.producers[i].(*Producer).Close() + } + for i := range client.coordinator.consumers { + _ = client.coordinator.consumers[i].(*Consumer).Close() } } return nil diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 444e19fb..7177dfc7 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -209,7 +209,7 @@ var _ = Describe("Environment test", func() { TCPParameters: &TCPParameters{ tlsConfig: nil, RequestedHeartbeat: defaultHeartbeat, - RequestedMaxFrameSize: 1048574, + RequestedMaxFrameSize: defaultMaxFrameSize, WriteBuffer: 100, ReadBuffer: 200, NoDelay: false, diff --git a/pkg/stream/listeners.go b/pkg/stream/listeners.go index b5c74b20..f0750226 100644 --- a/pkg/stream/listeners.go +++ b/pkg/stream/listeners.go @@ -8,13 +8,7 @@ type Event struct { Err error } -type metaDataUpdateEvent struct { - StreamName string - code uint16 -} - type onInternalClose func(ch <-chan uint8) -type metadataListener chan metaDataUpdateEvent type ChannelClose = <-chan Event type ChannelPublishConfirm chan []*ConfirmationStatus diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index a6bdb773..c6c3f09d 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -51,11 +51,9 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 { } type messageSequence struct { - messageBytes []byte - unCompressedSize int - publishingId int64 - filterValue string - refMessage *message.StreamMessage + messageBytes []byte + publishingId int64 + filterValue string } type Producer struct { @@ -101,7 +99,7 @@ type ProducerOptions struct { // It is not used 
anymore given the dynamic batching QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases + // Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases // It is not used anymore given the dynamic batching BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() // Size of sub Entry, to aggregate more subEntry using one publishing id @@ -287,33 +285,27 @@ func (producer *Producer) closeConfirmationStatus() { func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize - // the buffer is initialized with the size of the header - sequenceToSend := make([]*messageSequence, 0) go func() { + sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize - for { + for msg := range producer.pendingSequencesQueue.GetChannel() { var lastError error - // the dequeue is blocking with a timeout of 500ms - // as soon as a message is available the Dequeue will be unblocked - msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { break } - - if msg != nil { - // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += msg.unCompressedSize - if totalBufferToSend > maxFrame { - // if the totalBufferToSend is greater than the requestedMaxFrameSize - // the producer sends the messages and reset the buffer - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend = initBufferPublishSize - } - - sequenceToSend = append(sequenceToSend, msg) + // There is something in the queue. Checks the buffer is still less than the maxFrame + totalBufferToSend += len(msg.messageBytes) + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and reset the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize } + sequenceToSend = append(sequenceToSend, msg) + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending // the messages during the checks of the buffer. 
In this case if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { @@ -327,8 +319,15 @@ func (producer *Producer) processPendingSequencesQueue() { logs.LogError("error during sending messages: %s", lastError) } } - logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) + + // just in case there are messages in the buffer + // not matter is sent or not the messages will be timed out + if len(sequenceToSend) > 0 { + _ = producer.internalBatchSend(sequenceToSend) + } + }() + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) } func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { @@ -350,30 +349,26 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str if producer.options.IsFilterEnabled() { filterValue = producer.options.Filter.FilterValue(streamMessage) } - - return &messageSequence{ - messageBytes: marshalBinary, - unCompressedSize: len(marshalBinary), - publishingId: seq, - filterValue: filterValue, - refMessage: &streamMessage, - }, nil - + msqSeq := &messageSequence{} + msqSeq.messageBytes = marshalBinary + msqSeq.publishingId = seq + msqSeq.filterValue = filterValue + return msqSeq, nil } // Send sends a message to the stream and returns an error if the message could not be sent. // The Send is asynchronous. The message is sent to a channel ant then other goroutines aggregate and sent the messages // The Send is dynamic so the number of messages sent decided internally based on the BatchSize -// and the messages contained in the buffer. The aggregation is up to the client. +// and the messages in the buffer. The aggregation is up to the client. // returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) if err != nil { return err } - producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) - if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize { + if len(messageSeq.messageBytes) > defaultMaxFrameSize { tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) return FrameTooLarge @@ -393,7 +388,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // BatchSend is not affected by the BatchSize and BatchPublishingDelay options. 
// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { - maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize + maxFrame := defaultMaxFrameSize var messagesSequence = make([]*messageSequence, 0) totalBufferToSend := 0 for _, batchMessage := range batchMessages { @@ -401,7 +396,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error if err != nil { return err } - producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + producer.unConfirmed.addFromSequence(messageSeq, &batchMessage, producer.GetID()) totalBufferToSend += len(messageSeq.messageBytes) messagesSequence = append(messagesSequence, messageSeq) @@ -419,7 +414,6 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error return FrameTooLarge } - // all the messages are unconfirmed return producer.internalBatchSend(messagesSequence) } @@ -526,7 +520,7 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq if !producer.options.isSubEntriesBatching() { for _, msg := range messagesSequence { - msgLen += msg.unCompressedSize + 8 + 4 + msgLen += len(msg.messageBytes) + 8 + 4 } } @@ -606,9 +600,12 @@ func (producer *Producer) close(reason Event) error { producer.closeConfirmationStatus() if producer.options == nil { + // the options are usually not nil. This is just for safety and for to make some + // test easier to write + logs.LogDebug("producer options is nil, the close will be ignored") return nil } - _ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason) + _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") @@ -620,10 +617,7 @@ func (producer *Producer) close(reason Event) error { } if producer.options.client.coordinator.ProducersCount() == 0 { - err := producer.options.client.Close() - if err != nil { - logs.LogError("error during closing client: %s", err) - } + _ = producer.options.client.Close() } if producer.onClose != nil { @@ -690,7 +684,7 @@ func (producer *Producer) sendWithFilter(messagesSequence []*messageSequence, pr frameHeaderLength := initBufferPublishSize var msgLen int for _, msg := range messagesSequence { - msgLen += msg.unCompressedSize + 8 + 4 + msgLen += len(msg.messageBytes) + 8 + 4 if msg.filterValue != "" { msgLen += 2 + len(msg.filterValue) } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index d2a0cd62..8cb31fc4 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -331,9 +331,10 @@ var _ = Describe("Streaming Producers", func() { Expect(producer.Send(amqp.NewMessage(s))).NotTo(HaveOccurred()) } + time.Sleep(1500 * time.Millisecond) Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(101)), + }, 5*time.Second).WithPolling(300*time.Millisecond).Should(Equal(int32(101)), "confirm should receive same messages Send by producer") Expect(producer.lenUnConfirmed()).To(Equal(0)) @@ -569,8 +570,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -578,8 +578,7 @@ var _ = Describe("Streaming Producers", func() { msg.SetPublishingId(1) messageBytes, _ := msg.MarshalBinary() 
messagesSequence[0] = &messageSequence{ - messageBytes: messageBytes, - unCompressedSize: len(messageBytes), + messageBytes: messageBytes, } // 200 producer ID doesn't exist @@ -658,8 +657,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 201; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -675,8 +673,7 @@ var _ = Describe("Streaming Producers", func() { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -691,8 +688,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -707,8 +703,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1000; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -723,8 +718,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 14; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index fa30ed8a..9d232c26 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,6 +1,7 @@ package stream import ( + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "time" ) @@ -14,53 +15,65 @@ import ( // The confirmation status is updated when the confirmation is received from the broker (see server_frame.go) // or due of timeout. The Timeout is configurable, and it is calculated client side. 
type unConfirmed struct { - messages map[int64]*ConfirmationStatus - mutex sync.RWMutex + messages map[int64]*ConfirmationStatus + mutexMessageMap sync.RWMutex } -const DefaultUnconfirmedSize = 10000 +const DefaultUnconfirmedSize = 10_000 func newUnConfirmed() *unConfirmed { r := &unConfirmed{ - messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), - mutex: sync.RWMutex{}, + messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), + mutexMessageMap: sync.RWMutex{}, } return r } -func (u *unConfirmed) addFromSequence(message *messageSequence, producerID uint8) { +func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) { - u.mutex.Lock() + u.mutexMessageMap.Lock() u.messages[message.publishingId] = &ConfirmationStatus{ inserted: time.Now(), - message: *message.refMessage, + message: *source, producerID: producerID, publishingId: message.publishingId, confirmed: false, } - u.mutex.Unlock() + u.mutexMessageMap.Unlock() } func (u *unConfirmed) link(from int64, to int64) { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() r := u.messages[from] if r != nil { r.linkedTo = append(r.linkedTo, u.messages[to]) } } -func (u *unConfirmed) extractWithConfirm(id int64) *ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() - return u.extract(id, 0, true) +func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() + var res []*ConfirmationStatus + + for _, v := range ids { + m := u.extract(v, 0, true) + if m != nil { + res = append(res, m) + if m.linkedTo != nil { + res = append(res, m.linkedTo...) + } + } + } + return res + } func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() return u.extract(id, errorCode, false) } @@ -88,8 +101,8 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui } func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() var res []*ConfirmationStatus for _, v := range u.messages { if time.Since(v.inserted) > timeout { @@ -101,13 +114,13 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS } func (u *unConfirmed) size() int { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() return len(u.messages) } func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() return u.messages } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 14d3cfb9..0afb3c4b 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -29,14 +29,17 @@ func logErrorCommand(error error, details string) { func (c *Client) handleResponse() { buffer := bufio.NewReader(c.socket.connection) + for { readerProtocol := &ReaderProtocol{} + frameLen, err := readUInt(buffer) if err != nil { logs.LogDebug("Read connection failed: %s", err) _ = c.Close() break } + c.setLastHeartBeat(time.Now()) readerProtocol.FrameLen = frameLen readerProtocol.CommandID = uShortExtractResponseCode(readUShort(buffer)) @@ -252,26 +255,28 @@ func (c *Client) handleConfirm(readProtocol 
*ReaderProtocol, r *bufio.Reader) in // even the producer is not found we need to read the publishingId // to empty the buffer. // The producer here could not exist because the producer is closed before the confirmations are received - var unConfirmedRecv []*ConfirmationStatus + //var unConfirmedRecv []*ConfirmationStatus + var arraySeq []int64 for publishingIdCount != 0 { seq := readInt64(r) - if producerFound { - - m := producer.unConfirmed.extractWithConfirm(seq) - if m != nil { - unConfirmedRecv = append(unConfirmedRecv, m) - - // in case of sub-batch entry the client receives only - // one publishingId (or sequence) - // so the other messages are confirmed using the linkedTo - unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) - } - } + arraySeq = append(arraySeq, seq) + //if producerFound { + // m := producer.unConfirmed.extractWithConfirm(seq) + // if m != nil { + // unConfirmedRecv = append(unConfirmedRecv, m) + // + // // in case of sub-batch entry the client receives only + // // one publishingId (or sequence) + // // so the other messages are confirmed using the linkedTo + // unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) + // } + //} publishingIdCount-- } + if producerFound { - producer.sendConfirmationStatus(unConfirmedRecv) + producer.sendConfirmationStatus(producer.unConfirmed.extractWithConfirms(arraySeq)) } return 0 @@ -291,9 +296,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) { subscriptionId := readByte(r) consumer, err := c.coordinator.GetConsumerById(subscriptionId) + consumerFound := err == nil if err != nil { logs.LogError("Handle Deliver consumer not found %s", err) - return } @@ -315,6 +320,16 @@ func (c *Client) handleDeliver(r *bufio.Reader) { var offsetLimit int64 = -1 + var bytesBuffer = make([]byte, int(dataLength)) + _, err = io.ReadFull(r, bytesBuffer) + logErrorCommand(err, "handleDeliver") + + if !consumerFound { + // even if the consumer is not found we need to read the buffer + logs.LogWarn("the consumer was not found %d. cleaning the buffer", subscriptionId) + return + } + // we can have two cases // 1. single active consumer is enabled // 2. single active consumer is not enabled @@ -341,9 +356,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) { batchConsumingMessages := make(offsetMessages, 0, numRecords) var chunk chunkInfo chunk.numEntries = numEntries - var bytesBuffer = make([]byte, int(dataLength)) - _, err = io.ReadFull(r, bytesBuffer) - logErrorCommand(err, "handleDeliver") /// headers ---> payload -> messages @@ -407,7 +419,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) { // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages if consumer.getStatus() == open { - consumer.response.chunkForConsumer <- chunk + consumer.chunkForConsumer <- chunk } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) @@ -487,12 +499,8 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) { if code == responseCodeStreamNotAvailable { stream := readString(buffer) logs.LogDebug("stream %s is no longer available", stream) - c.mutex.Lock() - c.metadataListener <- metaDataUpdateEvent{ - StreamName: stream, - code: responseCodeStreamNotAvailable, - } - c.mutex.Unlock() + c.maybeCleanProducers(stream) + c.maybeCleanConsumers(stream) } else { //TODO handle the error, see the java code
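The most structural change in this diff is in `pkg/stream/blocking_queue.go` and `pkg/stream/producer.go`: the timeout-based `Dequeue` is removed and `processPendingSequencesQueue` now ranges over the channel returned by `GetChannel()`, accumulating `messageSequence` items and flushing them when the batch is full or the queue is momentarily empty. The sketch below is a self-contained illustration of that accumulate-and-flush pattern, not the client code itself (names such as `maxBatch` and `flush` are illustrative):

```go
package main

import "fmt"

// BlockingQueue is a minimal version of the generic queue used by the client:
// producers push items into a buffered channel and a single goroutine drains it.
type BlockingQueue[T any] struct {
	queue chan T
}

func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] {
	return &BlockingQueue[T]{queue: make(chan T, capacity)}
}

func (bq *BlockingQueue[T]) Enqueue(item T) {
	bq.queue <- item
}

// GetChannel exposes the underlying channel so the consumer side can simply
// range over it instead of polling Dequeue with a timeout.
func (bq *BlockingQueue[T]) GetChannel() chan T {
	return bq.queue
}

func (bq *BlockingQueue[T]) IsEmpty() bool {
	return len(bq.queue) == 0
}

func main() {
	const maxBatch = 3 // stands in for the BatchSize / max frame checks
	q := NewBlockingQueue[int](10)

	go func() {
		for i := 0; i < 7; i++ {
			q.Enqueue(i)
		}
		close(q.GetChannel()) // the real client stops the queue instead of closing it here
	}()

	batch := make([]int, 0, maxBatch)
	flush := func() {
		if len(batch) > 0 {
			fmt.Println("send batch:", batch)
			batch = batch[:0]
		}
	}

	// range blocks until an item is available, so there is no 500ms polling loop
	for item := range q.GetChannel() {
		batch = append(batch, item)
		// flush when the batch is full or the queue is momentarily empty,
		// mirroring the checks in processPendingSequencesQueue
		if q.IsEmpty() || len(batch) >= maxBatch {
			flush()
		}
	}
	flush() // drain anything left after the channel is closed
}
```

Ranging over the channel removes the fixed 500 ms polling latency, and the extra flush after the loop drains whatever is still buffered when the queue is stopped, matching the new code in `processPendingSequencesQueue`.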
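Similarly, `handleConfirm` in `pkg/stream/server_frame.go` now reads every publishing id from the confirm frame first and resolves them with a single call to `extractWithConfirms`, so the unconfirmed-message map is locked once per frame rather than once per id. A rough, self-contained sketch of that pattern (the `confirmation`, `unconfirmedMap` and `extractBatch` names are illustrative, not part of the client):

```go
package main

import (
	"fmt"
	"sync"
)

// confirmation is a stand-in for ConfirmationStatus: it only keeps the fields
// needed to show the batch-extraction idea.
type confirmation struct {
	publishingId int64
	confirmed    bool
}

type unconfirmedMap struct {
	mu       sync.Mutex
	messages map[int64]*confirmation
}

// extractBatch removes all the given publishing ids under a single lock,
// mirroring extractWithConfirms in producer_unconfirmed.go.
func (u *unconfirmedMap) extractBatch(ids []int64) []*confirmation {
	u.mu.Lock()
	defer u.mu.Unlock()
	res := make([]*confirmation, 0, len(ids))
	for _, id := range ids {
		if m, ok := u.messages[id]; ok {
			m.confirmed = true
			delete(u.messages, id)
			res = append(res, m)
		}
	}
	return res
}

func main() {
	u := &unconfirmedMap{messages: map[int64]*confirmation{
		1: {publishingId: 1},
		2: {publishingId: 2},
		3: {publishingId: 3},
	}}

	// the confirm frame carries a list of publishing ids; read them all first...
	ids := []int64{1, 3}

	// ...then extract them in one call, locking the map once per frame
	// instead of once per id.
	for _, c := range u.extractBatch(ids) {
		fmt.Printf("confirmed publishingId=%d\n", c.publishingId)
	}
}
```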