From 1bf978bfe445cac3278f660af9abfa3a59e1ce29 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 9 Jan 2025 16:57:41 +0100 Subject: [PATCH 01/18] wip Signed-off-by: Gabriele Santomaggio --- pkg/ha/ha_consumer.go | 1 - pkg/stream/client.go | 19 ++++---- pkg/stream/constants.go | 1 + pkg/stream/consumer.go | 85 ++++++++++++++++++++-------------- pkg/stream/consumer_test.go | 2 +- pkg/stream/coordinator.go | 18 +------ pkg/stream/coordinator_test.go | 1 + pkg/stream/environment.go | 69 ++++++++++++--------------- pkg/stream/listeners.go | 6 --- pkg/stream/producer.go | 8 ++-- pkg/stream/server_frame.go | 13 +++--- 11 files changed, 101 insertions(+), 122 deletions(-) diff --git a/pkg/ha/ha_consumer.go b/pkg/ha/ha_consumer.go index 24e8bb9d..4ceec9ea 100644 --- a/pkg/ha/ha_consumer.go +++ b/pkg/ha/ha_consumer.go @@ -88,7 +88,6 @@ func NewReliableConsumer(env *stream.Environment, streamName string, logs.LogDebug("[Reliable] - creating %s", res.getInfo()) err := res.newConsumer() if err == nil { - res.setStatus(StatusOpen) } logs.LogDebug("[Reliable] - created %s", res.getInfo()) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 9e25a992..ae94d525 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -63,8 +63,8 @@ type Client struct { tcpParameters *TCPParameters saslConfiguration *SaslConfiguration - mutex *sync.Mutex - metadataListener metadataListener + mutex *sync.Mutex + //metadataListener metadataListener lastHeartBeat HeartBeat socketCallTimeout time.Duration availableFeatures *availableFeatures @@ -512,10 +512,10 @@ func (c *Client) Close() error { } } - if c.metadataListener != nil { - close(c.metadataListener) - c.metadataListener = nil - } + //if c.metadataListener != nil { + // close(c.metadataListener) + // c.metadataListener = nil + //} c.closeHartBeat() if c.getSocket().isOpen() { @@ -747,6 +747,7 @@ func (c *Client) BrokerForConsumer(stream string) (*Broker, error) { streamMetadata := streamsMetadata.Get(stream) if streamMetadata.responseCode != responseCodeOk { + return nil, lookErrorCode(streamMetadata.responseCode) } @@ -992,12 +993,8 @@ func (c *Client) DeclareSubscriber(streamName string, go func() { for { select { - case code := <-consumer.response.code: - if code.id == closeChannel { - return - } - case chunk, ok := <-consumer.response.chunkForConsumer: + case chunk, ok := <-consumer.chunkForConsumer: if !ok { return } diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index a19ab723..c760d370 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -119,6 +119,7 @@ const ( LeaderLocatorBalanced = "balanced" LeaderLocatorClientLocal = "client-local" DeletePublisher = "deletePublisher" + UnSubscribe = "unSubscribe" StreamTcpPort = "5552" diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index e9ff54bc..8f6d8c70 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -10,12 +10,13 @@ import ( ) type Consumer struct { - ID uint8 - response *Response - options *ConsumerOptions - onClose onInternalClose - mutex *sync.Mutex - MessagesHandler MessagesHandler + ID uint8 + response *Response + options *ConsumerOptions + onClose onInternalClose + mutex *sync.Mutex + chunkForConsumer chan chunkInfo + MessagesHandler MessagesHandler // different form ConsumerOptions.offset. ConsumerOptions.offset is just the configuration // and won't change. 
currentOffset is the status of the offset currentOffset int64 @@ -312,52 +313,64 @@ func (consumer *Consumer) Close() error { if consumer.getStatus() == closed { return AlreadyClosed } - consumer.cacheStoreOffset() - - consumer.setStatus(closed) - _, errGet := consumer.options.client.coordinator.GetConsumerById(consumer.ID) - if errGet != nil { - return nil - } - - length := 2 + 2 + 4 + 1 - resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe) - correlationId := resp.correlationid - var b = bytes.NewBuffer(make([]byte, 0, length+4)) - writeProtocolHeader(b, length, CommandUnsubscribe, - correlationId) - - writeByte(b, consumer.ID) - err := consumer.options.client.handleWrite(b.Bytes(), resp) - if err.Err != nil && err.isTimeout { - return err.Err - } - - errC := consumer.options.client.coordinator.RemoveConsumerById(consumer.ID, Event{ + return consumer.close(Event{ Command: CommandUnsubscribe, StreamName: consumer.GetStreamName(), Name: consumer.GetName(), - Reason: "unSubscribe", + Reason: UnSubscribe, Err: nil, }) +} - if errC != nil { - logs.LogWarn("Error during remove consumer id:%s", errC) +func (consumer *Consumer) close(reason Event) error { + if consumer.options == nil { + logs.LogWarn("consumer options is nil, the close will be ignored") + return nil } + consumer.cacheStoreOffset() + consumer.setStatus(closed) - if consumer.options.client.coordinator.ConsumersCount() == 0 { - err := consumer.options.client.Close() - if err != nil { - return err + if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { + closeHandler <- reason + close(consumer.closeHandler) + consumer.closeHandler = nil + } + + if consumer.chunkForConsumer != nil { + close(consumer.chunkForConsumer) + consumer.chunkForConsumer = nil + } + + if consumer.response.data != nil { + close(consumer.response.data) + consumer.response.data = nil + } + + if reason.Reason == UnSubscribe { + length := 2 + 2 + 4 + 1 + resp := consumer.options.client.coordinator.NewResponse(CommandUnsubscribe) + correlationId := resp.correlationid + var b = bytes.NewBuffer(make([]byte, 0, length+4)) + writeProtocolHeader(b, length, CommandUnsubscribe, + correlationId) + + writeByte(b, consumer.ID) + err := consumer.options.client.handleWrite(b.Bytes(), resp) + if err.Err != nil && err.isTimeout { + logs.LogWarn("error during consumer unsubscribe:%s", err.Err) } } + if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 { + _ = consumer.options.client.Close() + } + ch := make(chan uint8, 1) ch <- consumer.ID consumer.onClose(ch) close(ch) - return err.Err + return nil } func (consumer *Consumer) cacheStoreOffset() { diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index 49406ed8..a1436f72 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -123,7 +123,7 @@ var _ = Describe("Streaming Consumers", func() { Eventually(func() int32 { return atomic.LoadInt32(&commandIdRecv) }, 5*time.Second).Should(Equal(int32(CommandUnsubscribe)), - "command received should be CommandMetadataUpdate ") + "command received should be unSubscribe ") Expect(err).NotTo(HaveOccurred()) }) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index a2d9b198..4d8b6f62 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -37,7 +37,6 @@ type chunkInfo struct { type Response struct { code chan Code data chan interface{} - chunkForConsumer chan chunkInfo commandDescription string correlationid int } @@ -84,20 +83,8 @@ func 
(coordinator *Coordinator) RemoveConsumerById(id interface{}, reason Event) if err != nil { return err } - consumer.setStatus(closed) - reason.StreamName = consumer.GetStreamName() - reason.Name = consumer.GetName() - - if closeHandler := consumer.GetCloseHandler(); closeHandler != nil { - closeHandler <- reason - close(closeHandler) - closeHandler = nil - } - - close(consumer.response.chunkForConsumer) - close(consumer.response.code) + return consumer.close(reason) - return nil } func (coordinator *Coordinator) RemoveProducerById(id uint8, reason Event) error { @@ -117,7 +104,6 @@ func (coordinator *Coordinator) RemoveResponseById(id interface{}) error { err = coordinator.removeById(fmt.Sprintf("%d", id), coordinator.responses) close(resp.code) close(resp.data) - close(resp.chunkForConsumer) return err } @@ -131,7 +117,6 @@ func newResponse(commandDescription string) *Response { res.commandDescription = commandDescription res.code = make(chan Code, 1) res.data = make(chan interface{}, 1) - res.chunkForConsumer = make(chan chunkInfo, 100) return res } @@ -200,6 +185,7 @@ func (coordinator *Coordinator) NewConsumer(messagesHandler MessagesHandler, lastStoredOffset: -1, // because 0 is a valid value for the offset isPromotedAsActive: true, lastAutoCommitStored: time.Now(), + chunkForConsumer: make(chan chunkInfo, 100), } coordinator.consumers[lastId] = item diff --git a/pkg/stream/coordinator_test.go b/pkg/stream/coordinator_test.go index bfc48771..322de58a 100644 --- a/pkg/stream/coordinator_test.go +++ b/pkg/stream/coordinator_test.go @@ -172,6 +172,7 @@ var _ = Describe("Coordinator", func() { Command: 0, StreamName: "UNIT_TESTS", Name: "", + Reason: "UNIT_TEST", Err: nil, }) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 2a4c52a5..11a5cc52 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -515,12 +515,7 @@ func (c *Client) maybeCleanProducers(streamName string) { } } c.mutex.Unlock() - if c.coordinator.ProducersCount() == 0 { - err := c.Close() - if err != nil { - return - } - } + } func (c *Client) maybeCleanConsumers(streamName string) { @@ -540,12 +535,6 @@ func (c *Client) maybeCleanConsumers(streamName string) { } } c.mutex.Unlock() - if c.coordinator.ConsumersCount() == 0 { - err := c.Close() - if err != nil { - return - } - } } func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, streamName string, @@ -603,18 +592,18 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client { clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut) - chMeta := make(chan metaDataUpdateEvent, 1) - clientResult.metadataListener = chMeta - go func(ch <-chan metaDataUpdateEvent, cl *Client) { - for metaDataUpdateEvent := range ch { - clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName) - cc.maybeCleanClients() - if !cl.socket.isOpen() { - return - } - } - - }(chMeta, clientResult) + //chMeta := make(chan metaDataUpdateEvent, 1) + //clientResult.metadataListener = chMeta + //go func(ch <-chan metaDataUpdateEvent, cl *Client) { + // for metaDataUpdateEvent := range ch { + // clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName) + // cc.maybeCleanClients() + // if 
!cl.socket.isOpen() { + // return + // } + // } + // + //}(chMeta, clientResult) cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult @@ -638,18 +627,18 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro if clientResult == nil { clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout) - chMeta := make(chan metaDataUpdateEvent) - clientResult.metadataListener = chMeta - go func(ch <-chan metaDataUpdateEvent, cl *Client) { - for metaDataUpdateEvent := range ch { - clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName) - cc.maybeCleanClients() - if !cl.socket.isOpen() { - return - } - } - - }(chMeta, clientResult) + //chMeta := make(chan metaDataUpdateEvent) + //clientResult.metadataListener = chMeta + //go func(ch <-chan metaDataUpdateEvent, cl *Client) { + // for metaDataUpdateEvent := range ch { + // clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName) + // cc.maybeCleanClients() + // if !cl.socket.isOpen() { + // return + // } + // } + // + //}(chMeta, clientResult) cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult @@ -672,9 +661,11 @@ func (cc *environmentCoordinator) Close() error { cc.mutexContext.Lock() defer cc.mutexContext.Unlock() for _, client := range cc.clientsPerContext { - err := client.Close() - if err != nil { - logs.LogWarn("Error during close the client, %s", err) + for i := range client.coordinator.producers { + _ = client.coordinator.producers[i].(*Producer).Close() + } + for i := range client.coordinator.consumers { + _ = client.coordinator.consumers[i].(*Consumer).Close() } } return nil diff --git a/pkg/stream/listeners.go b/pkg/stream/listeners.go index b5c74b20..f0750226 100644 --- a/pkg/stream/listeners.go +++ b/pkg/stream/listeners.go @@ -8,13 +8,7 @@ type Event struct { Err error } -type metaDataUpdateEvent struct { - StreamName string - code uint16 -} - type onInternalClose func(ch <-chan uint8) -type metadataListener chan metaDataUpdateEvent type ChannelClose = <-chan Event type ChannelPublishConfirm chan []*ConfirmationStatus diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index a6bdb773..b18b154c 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -606,9 +606,10 @@ func (producer *Producer) close(reason Event) error { producer.closeConfirmationStatus() if producer.options == nil { + logs.LogWarn("producer options is nil, the close will be ignored") return nil } - _ = producer.options.client.coordinator.RemoveProducerById(producer.id, reason) + _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) if !producer.options.client.socket.isOpen() { return fmt.Errorf("tcp connection is closed") @@ -620,10 +621,7 @@ func (producer *Producer) close(reason Event) error { } if producer.options.client.coordinator.ProducersCount() == 0 { - err := producer.options.client.Close() - if err != nil { - logs.LogError("error during closing client: %s", err) - } + _ = producer.options.client.Close() } if producer.onClose != nil { diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 14d3cfb9..0f1e57f0 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -29,14 +29,17 @@ func logErrorCommand(error error, details string) { func (c *Client) handleResponse() { buffer := bufio.NewReader(c.socket.connection) + for { readerProtocol := &ReaderProtocol{} + frameLen, err := readUInt(buffer) if err != nil { logs.LogDebug("Read connection failed: %s", err) _ = c.Close() break } + 
c.setLastHeartBeat(time.Now()) readerProtocol.FrameLen = frameLen readerProtocol.CommandID = uShortExtractResponseCode(readUShort(buffer)) @@ -407,7 +410,7 @@ func (c *Client) handleDeliver(r *bufio.Reader) { // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages if consumer.getStatus() == open { - consumer.response.chunkForConsumer <- chunk + consumer.chunkForConsumer <- chunk } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) @@ -487,12 +490,8 @@ func (c *Client) metadataUpdateFrameHandler(buffer *bufio.Reader) { if code == responseCodeStreamNotAvailable { stream := readString(buffer) logs.LogDebug("stream %s is no longer available", stream) - c.mutex.Lock() - c.metadataListener <- metaDataUpdateEvent{ - StreamName: stream, - code: responseCodeStreamNotAvailable, - } - c.mutex.Unlock() + c.maybeCleanProducers(stream) + c.maybeCleanConsumers(stream) } else { //TODO handle the error, see the java code From c3c52cb60856480be57affdebc1f231de95dae08 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 9 Jan 2025 17:35:50 +0100 Subject: [PATCH 02/18] wip Signed-off-by: Gabriele Santomaggio --- pkg/stream/blocking_queue.go | 19 +++++++++++++++---- pkg/stream/producer.go | 13 ++++++++++++- 2 files changed, 27 insertions(+), 5 deletions(-) diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index d949b6e0..c1bbfdae 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -10,15 +10,18 @@ import ( var ErrBlockingQueueStopped = errors.New("blocking queue stopped") type BlockingQueue[T any] struct { - queue chan T - status int32 + queue chan T + status int32 + capacity int + lastUpdate time.Time } // NewBlockingQueue initializes a new BlockingQueue with the given capacity func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] { return &BlockingQueue[T]{ - queue: make(chan T, capacity), - status: 0, + queue: make(chan T, capacity), + capacity: capacity, + status: 0, } } @@ -27,6 +30,7 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { if bq.IsStopped() { return ErrBlockingQueueStopped } + bq.lastUpdate = time.Now() bq.queue <- item return nil @@ -59,6 +63,13 @@ func (bq *BlockingQueue[T]) IsEmpty() bool { return len(bq.queue) == 0 } +func (bq *BlockingQueue[T]) IsReadyToSend() bool { + if bq.lastUpdate.IsZero() { + return true + } + return time.Since(bq.lastUpdate) > 10*time.Millisecond && len(bq.queue) == 0 +} + // Stop stops the queue from accepting new items // but allows some pending items. // Stop is different from Close in that it allows the diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index b18b154c..49b31f6c 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -287,6 +287,9 @@ func (producer *Producer) closeConfirmationStatus() { func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize + var avarage int32 + iterations := 0 + // the buffer is initialized with the size of the header sequenceToSend := make([]*messageSequence, 0) go func() { @@ -316,8 +319,16 @@ func (producer *Producer) processPendingSequencesQueue() { // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending // the messages during the checks of the buffer. 
In this case
-		if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize {
+		if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
 			if len(sequenceToSend) > 0 {
+				avarage = atomic.AddInt32(&avarage, int32(len(sequenceToSend)))
+				iterations++
+				if iterations > 10000 {
+					logs.LogInfo("producer %d avarage: %d", producer.id, avarage/int32(iterations))
+					avarage = 0
+					iterations = 0
+				}
+
 				lastError = producer.internalBatchSend(sequenceToSend)
 				sequenceToSend = sequenceToSend[:0]
 				totalBufferToSend += initBufferPublishSize
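The IsReadyToSend() check added here is what makes the batching dynamic: the sender flushes not only when the batch is full but also once the queue has gone quiet. A minimal, self-contained sketch of that idle-timestamp idea, assuming nothing beyond the standard library (the 10ms threshold comes from this patch's IsReadyToSend; storing the timestamp atomically anticipates the change PATCH 07 makes; the queue type and names are illustrative, not the client's API):

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// idleQueue is a toy bounded queue that records when it was last written,
// so a draining goroutine can detect a quiet period and flush its batch.
type idleQueue[T any] struct {
	ch         chan T
	lastUpdate int64 // UnixNano of the last Enqueue, accessed atomically
}

func newIdleQueue[T any](capacity int) *idleQueue[T] {
	return &idleQueue[T]{ch: make(chan T, capacity)}
}

func (q *idleQueue[T]) Enqueue(v T) {
	atomic.StoreInt64(&q.lastUpdate, time.Now().UnixNano())
	q.ch <- v
}

// isReadyToSend reports that the queue is empty and has been idle for more
// than 10ms, i.e. flushing a partial batch now will not hurt throughput.
func (q *idleQueue[T]) isReadyToSend() bool {
	idle := time.Since(time.Unix(0, atomic.LoadInt64(&q.lastUpdate)))
	return idle > 10*time.Millisecond && len(q.ch) == 0
}

func main() {
	q := newIdleQueue[int](100)
	q.Enqueue(1)
	fmt.Println(q.isReadyToSend()) // false: an item is buffered and it was just written
	<-q.ch                         // drain the item
	time.Sleep(20 * time.Millisecond)
	fmt.Println(q.isReadyToSend()) // true: empty and idle for >10ms
}
```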
From 09ef07ad5669cf8bc9637b5afeec6c04bf595e54 Mon Sep 17 00:00:00 2001
From: Gabriele Santomaggio
Date: Thu, 9 Jan 2025 17:40:28 +0100
Subject: [PATCH 03/18] wip

Signed-off-by: Gabriele Santomaggio
---
 pkg/stream/producer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index 49b31f6c..c1f6ed74 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -321,7 +321,7 @@ func (producer *Producer) processPendingSequencesQueue() {
 			// the messages during the checks of the buffer. In this case
 			if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize {
 				if len(sequenceToSend) > 0 {
-					avarage = atomic.AddInt32(&avarage, int32(len(sequenceToSend)))
+					avarage += avarage
 					iterations++
 					if iterations > 10000 {
 						logs.LogInfo("producer %d avarage: %d", producer.id, avarage/int32(iterations))

From 8b3c4a1c74ec53a2a9c36c7fbaee5b12d09d8577 Mon Sep 17 00:00:00 2001
From: Gabriele Santomaggio
Date: Thu, 9 Jan 2025 17:42:02 +0100
Subject: [PATCH 04/18] temp calculation

Signed-off-by: Gabriele Santomaggio
---
 pkg/stream/producer.go | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go
index c1f6ed74..3784f1c8 100644
--- a/pkg/stream/producer.go
+++ b/pkg/stream/producer.go
@@ -287,7 +287,7 @@ func (producer *Producer) processPendingSequencesQueue() {
 	maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize
-	var avarage int32
+	var avarage = 0
 	iterations := 0

 	// the buffer is initialized with the size of the header
@@ -321,10 +321,10 @@ func (producer *Producer) processPendingSequencesQueue() {
 			// the messages during the checks of the buffer.
In this case if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize { if len(sequenceToSend) > 0 { avarage += len(sequenceToSend) iterations++ if iterations > 10000 { - logs.LogInfo("producer %d avarage: %d", producer.id, avarage/iterations) + logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) avarage = 0 iterations = 0 } @@ -337,9 +324,58 @@ func (producer *Producer) processPendingSequencesQueue() { if lastError != nil { logs.LogError("error during sending messages: %s", lastError) } - } - logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) + + }) }() + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) + // the buffer is initialized with the size of the header + + // for { + // var lastError error + // // the dequeue is blocking with a timeout of 500ms + // // as soon as a message is available the Dequeue will be unblocked + // msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + // if producer.pendingSequencesQueue.IsStopped() { + // break + // } + // + // if msg != nil { + // // There is something in the queue.Checks the buffer is still less than the maxFrame + // totalBufferToSend += msg.unCompressedSize + // if totalBufferToSend > maxFrame { + // // if the totalBufferToSend is greater than the requestedMaxFrameSize + // // the producer sends the messages and reset the buffer + // lastError = producer.internalBatchSend(sequenceToSend) + // sequenceToSend = sequenceToSend[:0] + // totalBufferToSend = initBufferPublishSize + // } + // + // sequenceToSend = append(sequenceToSend, msg) + // } + // + // // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending + // // the messages during the checks of the buffer. 
In this case + // if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize { + // if len(sequenceToSend) > 0 { + // avarage += len(sequenceToSend) + // iterations++ + // if iterations > 10000 { + // logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) + // avarage = 0 + // iterations = 0 + // } + // + // lastError = producer.internalBatchSend(sequenceToSend) + // sequenceToSend = sequenceToSend[:0] + // totalBufferToSend += initBufferPublishSize + // } + // } + // if lastError != nil { + // logs.LogError("error during sending messages: %s", lastError) + // } + // } + // logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) + //}() } func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { From 6a1e0f950f04add4d1093406fb72420a01c411d1 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 9 Jan 2025 18:55:44 +0100 Subject: [PATCH 06/18] add function to process the messages Signed-off-by: Gabriele Santomaggio --- pkg/stream/blocking_queue.go | 12 ++---------- 1 file changed, 2 insertions(+), 10 deletions(-) diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index fee6704a..22766965 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -56,16 +56,8 @@ func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { } func (bq *BlockingQueue[T]) Process(f func(T)) { - isActive := true - for isActive { - select { - case item, ok := <-bq.queue: - if !ok { - isActive = false - return - } - f(item) - } + for item := range bq.queue { + f(item) } } From 3374ca6038d999b15a95b4950d33003e14cb6efb Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 10 Jan 2025 15:15:53 +0100 Subject: [PATCH 07/18] aggregate responses Signed-off-by: Gabriele Santomaggio --- pkg/stream/blocking_queue.go | 18 +++++--- pkg/stream/constants.go | 3 +- pkg/stream/environment_test.go | 2 +- pkg/stream/producer.go | 74 ++++++++---------------------- pkg/stream/producer_test.go | 5 +- pkg/stream/producer_unconfirmed.go | 15 +++++- pkg/stream/server_frame.go | 30 ++++++------ 7 files changed, 65 insertions(+), 82 deletions(-) diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index 22766965..2522ed4e 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -13,7 +13,7 @@ type BlockingQueue[T any] struct { queue chan T status int32 capacity int - lastUpdate time.Time + lastUpdate int64 } // NewBlockingQueue initializes a new BlockingQueue with the given capacity @@ -30,9 +30,8 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { if bq.IsStopped() { return ErrBlockingQueueStopped } - bq.lastUpdate = time.Now() + atomic.StoreInt64(&bq.lastUpdate, time.Now().UnixNano()) bq.queue <- item - return nil } @@ -42,6 +41,7 @@ func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { var zeroValue T // Zero value of type T return zeroValue } + select { case item, ok := <-bq.queue: if !ok { @@ -61,6 +61,10 @@ func (bq *BlockingQueue[T]) Process(f func(T)) { } } +func (bq *BlockingQueue[T]) GetChannel() chan T { + return bq.queue +} + func (bq *BlockingQueue[T]) Size() int { return len(bq.queue) } @@ -70,10 +74,10 @@ func (bq *BlockingQueue[T]) IsEmpty() bool { } func (bq *BlockingQueue[T]) IsReadyToSend() bool { - if bq.lastUpdate.IsZero() { - return true - } - return time.Since(bq.lastUpdate) > 10*time.Millisecond && len(bq.queue) == 0 + + millis := time.Since(time.Unix(0, 
atomic.LoadInt64(&bq.lastUpdate))).Milliseconds() + + return millis > 10 && len(bq.queue) == 0 } // Stop stops the queue from accepting new items diff --git a/pkg/stream/constants.go b/pkg/stream/constants.go index c760d370..91befece 100644 --- a/pkg/stream/constants.go +++ b/pkg/stream/constants.go @@ -88,7 +88,8 @@ const ( /// defaultSocketCallTimeout = 10 * time.Second - defaultHeartbeat = 60 * time.Second + defaultHeartbeat = 60 * time.Second + defaultMaxFrameSize = 1048574 // LocalhostUriConnection = "rabbitmq-stream://guest:guest@localhost:5552/%2f" diff --git a/pkg/stream/environment_test.go b/pkg/stream/environment_test.go index 444e19fb..7177dfc7 100644 --- a/pkg/stream/environment_test.go +++ b/pkg/stream/environment_test.go @@ -209,7 +209,7 @@ var _ = Describe("Environment test", func() { TCPParameters: &TCPParameters{ tlsConfig: nil, RequestedHeartbeat: defaultHeartbeat, - RequestedMaxFrameSize: 1048574, + RequestedMaxFrameSize: defaultMaxFrameSize, WriteBuffer: 100, ReadBuffer: 200, NoDelay: false, diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 323a6768..3f2c9916 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -289,11 +289,19 @@ func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize var avarage = 0 iterations := 0 + // the buffer is initialized with the size of the header go func() { sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize - producer.pendingSequencesQueue.Process(func(msg *messageSequence) { + for msg := range producer.pendingSequencesQueue.GetChannel() { + var lastError error + // the dequeue is blocking with a timeout of 500ms + // as soon as a message is available the Dequeue will be unblocked + //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { + break + } if msg != nil { // There is something in the queue.Checks the buffer is still less than the maxFrame totalBufferToSend += msg.unCompressedSize @@ -304,13 +312,17 @@ func (producer *Producer) processPendingSequencesQueue() { sequenceToSend = sequenceToSend[:0] totalBufferToSend = initBufferPublishSize } + sequenceToSend = append(sequenceToSend, msg) } - if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize { + + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending + // the messages during the checks of the buffer. 
In this case + if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { if len(sequenceToSend) > 0 { avarage += len(sequenceToSend) iterations++ - if iterations > 10000 { + if iterations > 100000 { logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) avarage = 0 iterations = 0 @@ -324,58 +336,10 @@ func (producer *Producer) processPendingSequencesQueue() { if lastError != nil { logs.LogError("error during sending messages: %s", lastError) } + } - }) + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) }() - logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) - // the buffer is initialized with the size of the header - - // for { - // var lastError error - // // the dequeue is blocking with a timeout of 500ms - // // as soon as a message is available the Dequeue will be unblocked - // msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) - // if producer.pendingSequencesQueue.IsStopped() { - // break - // } - // - // if msg != nil { - // // There is something in the queue.Checks the buffer is still less than the maxFrame - // totalBufferToSend += msg.unCompressedSize - // if totalBufferToSend > maxFrame { - // // if the totalBufferToSend is greater than the requestedMaxFrameSize - // // the producer sends the messages and reset the buffer - // lastError = producer.internalBatchSend(sequenceToSend) - // sequenceToSend = sequenceToSend[:0] - // totalBufferToSend = initBufferPublishSize - // } - // - // sequenceToSend = append(sequenceToSend, msg) - // } - // - // // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending - // // the messages during the checks of the buffer. In this case - // if producer.pendingSequencesQueue.IsReadyToSend() || len(sequenceToSend) >= producer.options.BatchSize { - // if len(sequenceToSend) > 0 { - // avarage += len(sequenceToSend) - // iterations++ - // if iterations > 10000 { - // logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) - // avarage = 0 - // iterations = 0 - // } - // - // lastError = producer.internalBatchSend(sequenceToSend) - // sequenceToSend = sequenceToSend[:0] - // totalBufferToSend += initBufferPublishSize - // } - // } - // if lastError != nil { - // logs.LogError("error during sending messages: %s", lastError) - // } - // } - // logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) - //}() } func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { @@ -420,7 +384,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { } producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) - if len(messageSeq.messageBytes) > producer.options.client.getTuneState().requestedMaxFrameSize { + if len(messageSeq.messageBytes) > defaultMaxFrameSize { tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) return FrameTooLarge @@ -440,7 +404,7 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { // BatchSend is not affected by the BatchSize and BatchPublishingDelay options. 
// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error { - maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize + maxFrame := defaultMaxFrameSize var messagesSequence = make([]*messageSequence, 0) totalBufferToSend := 0 for _, batchMessage := range batchMessages { diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index d2a0cd62..a35381e1 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -315,7 +315,7 @@ var _ = Describe("Streaming Producers", func() { }) - It("Smart Send/Close", func() { + It("Smart Send/Close", Focus, func() { producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) var messagesReceived int32 @@ -331,9 +331,10 @@ var _ = Describe("Streaming Producers", func() { Expect(producer.Send(amqp.NewMessage(s))).NotTo(HaveOccurred()) } + time.Sleep(1500 * time.Millisecond) Eventually(func() int32 { return atomic.LoadInt32(&messagesReceived) - }, 5*time.Second).Should(Equal(int32(101)), + }, 5*time.Second).WithPolling(300*time.Millisecond).Should(Equal(int32(101)), "confirm should receive same messages Send by producer") Expect(producer.lenUnConfirmed()).To(Equal(0)) diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index fa30ed8a..41812ea2 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -52,10 +52,21 @@ func (u *unConfirmed) link(from int64, to int64) { } } -func (u *unConfirmed) extractWithConfirm(id int64) *ConfirmationStatus { +func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus { u.mutex.Lock() defer u.mutex.Unlock() - return u.extract(id, 0, true) + var res []*ConfirmationStatus + for _, v := range id { + m := u.extract(v, 0, true) + if m != nil { + res = append(res, m) + if m.linkedTo != nil { + res = append(res, m.linkedTo...) + } + } + } + return res + } func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0f1e57f0..9f8438d3 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -255,26 +255,28 @@ func (c *Client) handleConfirm(readProtocol *ReaderProtocol, r *bufio.Reader) in // even the producer is not found we need to read the publishingId // to empty the buffer. // The producer here could not exist because the producer is closed before the confirmations are received - var unConfirmedRecv []*ConfirmationStatus + //var unConfirmedRecv []*ConfirmationStatus + var arraySeq []int64 for publishingIdCount != 0 { seq := readInt64(r) - if producerFound { - - m := producer.unConfirmed.extractWithConfirm(seq) - if m != nil { - unConfirmedRecv = append(unConfirmedRecv, m) - - // in case of sub-batch entry the client receives only - // one publishingId (or sequence) - // so the other messages are confirmed using the linkedTo - unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) - } - } + arraySeq = append(arraySeq, seq) + //if producerFound { + // m := producer.unConfirmed.extractWithConfirm(seq) + // if m != nil { + // unConfirmedRecv = append(unConfirmedRecv, m) + // + // // in case of sub-batch entry the client receives only + // // one publishingId (or sequence) + // // so the other messages are confirmed using the linkedTo + // unConfirmedRecv = append(unConfirmedRecv, m.linkedTo...) 
+ // } + //} publishingIdCount-- } + if producerFound { - producer.sendConfirmationStatus(unConfirmedRecv) + producer.sendConfirmationStatus(producer.unConfirmed.extractWithConfirms(arraySeq)) } return 0 From d9ac9ac92f34edcdf310a8f4625ba7c3bfa6de00 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Fri, 10 Jan 2025 16:54:26 +0100 Subject: [PATCH 08/18] restore timeout Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 89 +++++++++++++++++++++++++----------------- 1 file changed, 54 insertions(+), 35 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 3f2c9916..4d56be39 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -101,8 +101,7 @@ type ProducerOptions struct { // It is not used anymore given the dynamic batching QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - // Deprecated: starting from 1.5.0 the BatchPublishingDelay is deprecated, and it will be removed in the next releases - // It is not used anymore given the dynamic batching + BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. Valid only for the method Send() // Size of sub Entry, to aggregate more subEntry using one publishing id SubEntrySize int @@ -289,57 +288,77 @@ func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize var avarage = 0 iterations := 0 - // the buffer is initialized with the size of the header go func() { + var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond) + defer ticker.Stop() + sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize - for msg := range producer.pendingSequencesQueue.GetChannel() { - - var lastError error - // the dequeue is blocking with a timeout of 500ms - // as soon as a message is available the Dequeue will be unblocked - //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) - if producer.pendingSequencesQueue.IsStopped() { - break - } - if msg != nil { - // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += msg.unCompressedSize - if totalBufferToSend > maxFrame { - // if the totalBufferToSend is greater than the requestedMaxFrameSize - // the producer sends the messages and reset the buffer - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend = initBufferPublishSize - } + var lastError error + for { + select { - sequenceToSend = append(sequenceToSend, msg) - } + case msg, ok := <-producer.pendingSequencesQueue.GetChannel(): + { + if !ok { + logs.LogInfo("producer %d processPendingSequencesQueue closed", producer.id) + return + } - // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending - // the messages during the checks of the buffer. 
In this case - if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { - if len(sequenceToSend) > 0 { - avarage += len(sequenceToSend) - iterations++ - if iterations > 100000 { - logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) - avarage = 0 - iterations = 0 + // the dequeue is blocking with a timeout of 500ms + // as soon as a message is available the Dequeue will be unblocked + //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { + break + } + if msg != nil { + // There is something in the queue.Checks the buffer is still less than the maxFrame + totalBufferToSend += msg.unCompressedSize + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and reset the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize + } + + sequenceToSend = append(sequenceToSend, msg) } + canSend := producer.pendingSequencesQueue.IsEmpty() && producer.options.BatchPublishingDelay == 0 + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending + // the messages during the checks of the buffer. In this case + if canSend || len(sequenceToSend) >= producer.options.BatchSize { + if len(sequenceToSend) > 0 { + avarage += len(sequenceToSend) + iterations++ + if iterations > 100000 { + logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) + avarage = 0 + iterations = 0 + } + + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend += initBufferPublishSize + } + } + } + case <-ticker.C: + if len(sequenceToSend) > 0 { lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] totalBufferToSend += initBufferPublishSize } } + if lastError != nil { logs.LogError("error during sending messages: %s", lastError) } } - logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) }() + logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) } func (producer *Producer) assignPublishingID(message message.StreamMessage) int64 { From 948dc30da9d945e9a828e102a8accc83d7b84cfa Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 10:53:34 +0100 Subject: [PATCH 09/18] tmp buffer Signed-off-by: Gabriele Santomaggio --- pkg/stream/producer.go | 4 ++-- pkg/stream/producer_unconfirmed.go | 34 ++++++++++++++++++++++++------ 2 files changed, 29 insertions(+), 9 deletions(-) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 4d56be39..2d75ddf8 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -529,8 +529,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c // / the producer id is always the producer.GetID(). This function is needed only for testing // some condition, like simulate publish error. 
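The loop restored by PATCH 08 above combines two flush triggers: BatchSize caps how much is aggregated, while the BatchPublishingDelay ticker bounds the latency of a partially filled batch. A small, self-contained sketch of that select-on-channel-plus-ticker shape, with placeholder names (flushBatch stands in for internalBatchSend and is not the client's API):

```go
package main

import (
	"fmt"
	"time"
)

// flushBatch stands in for internalBatchSend; it only reports the batch size.
func flushBatch(batch []string) {
	fmt.Printf("flushing %d item(s)\n", len(batch))
}

// batchLoop drains items and flushes either when the batch is full or when
// the publishing-delay ticker fires.
func batchLoop(items <-chan string, batchSize int, delay time.Duration) {
	ticker := time.NewTicker(delay)
	defer ticker.Stop()

	batch := make([]string, 0, batchSize)
	for {
		select {
		case it, ok := <-items:
			if !ok { // channel closed: flush the leftovers and stop
				if len(batch) > 0 {
					flushBatch(batch)
				}
				return
			}
			batch = append(batch, it)
			if len(batch) >= batchSize { // size-based flush
				flushBatch(batch)
				batch = batch[:0]
			}
		case <-ticker.C: // time-based flush for partially filled batches
			if len(batch) > 0 {
				flushBatch(batch)
				batch = batch[:0]
			}
		}
	}
}

func main() {
	items := make(chan string)
	go func() {
		for i := 0; i < 5; i++ {
			items <- fmt.Sprintf("m%d", i)
		}
		time.Sleep(30 * time.Millisecond) // give the ticker a chance to fire
		close(items)
	}()
	batchLoop(items, 2, 10*time.Millisecond)
}
```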
func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error { - producer.options.client.socket.mutex.Lock() - defer producer.options.client.socket.mutex.Unlock() + //producer.options.client.socket.mutex.Lock() + //defer producer.options.client.socket.mutex.Unlock() if producer.getStatus() == closed { return fmt.Errorf("producer id: %d closed", producer.id) } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 41812ea2..bfc52365 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -15,15 +15,19 @@ import ( // or due of timeout. The Timeout is configurable, and it is calculated client side. type unConfirmed struct { messages map[int64]*ConfirmationStatus + tmp []*ConfirmationStatus + tmpMutex sync.Mutex mutex sync.RWMutex } -const DefaultUnconfirmedSize = 10000 +const DefaultUnconfirmedSize = 10_000 func newUnConfirmed() *unConfirmed { r := &unConfirmed{ messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), + tmp: []*ConfirmationStatus{}, + tmpMutex: sync.Mutex{}, mutex: sync.RWMutex{}, } @@ -32,15 +36,16 @@ func newUnConfirmed() *unConfirmed { func (u *unConfirmed) addFromSequence(message *messageSequence, producerID uint8) { - u.mutex.Lock() - u.messages[message.publishingId] = &ConfirmationStatus{ + u.tmpMutex.Lock() + u.tmp = append(u.tmp, &ConfirmationStatus{ + //u.messages[message.publishingId] = &ConfirmationStatus{ inserted: time.Now(), message: *message.refMessage, producerID: producerID, publishingId: message.publishingId, confirmed: false, - } - u.mutex.Unlock() + }) + u.tmpMutex.Unlock() } func (u *unConfirmed) link(from int64, to int64) { @@ -52,11 +57,13 @@ func (u *unConfirmed) link(from int64, to int64) { } } -func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus { +func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { u.mutex.Lock() defer u.mutex.Unlock() var res []*ConfirmationStatus - for _, v := range id { + u.fromTmpToMap() + + for _, v := range ids { m := u.extract(v, 0, true) if m != nil { res = append(res, m) @@ -69,9 +76,19 @@ func (u *unConfirmed) extractWithConfirms(id []int64) []*ConfirmationStatus { } +func (u *unConfirmed) fromTmpToMap() { + u.tmpMutex.Lock() + defer u.tmpMutex.Unlock() + for i := range u.tmp { + u.messages[u.tmp[i].publishingId] = u.tmp[i] + } + u.tmp = u.tmp[:0] +} + func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { u.mutex.Lock() defer u.mutex.Unlock() + u.fromTmpToMap() return u.extract(id, errorCode, false) } @@ -101,6 +118,7 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { u.mutex.Lock() defer u.mutex.Unlock() + u.fromTmpToMap() var res []*ConfirmationStatus for _, v := range u.messages { if time.Since(v.inserted) > timeout { @@ -114,11 +132,13 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS func (u *unConfirmed) size() int { u.mutex.Lock() defer u.mutex.Unlock() + u.fromTmpToMap() return len(u.messages) } func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { u.mutex.Lock() defer u.mutex.Unlock() + u.fromTmpToMap() return u.messages } From 62d612512746723319454557a571b475b5966c37 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 12:17:48 +0100 Subject: [PATCH 10/18] add pool Signed-off-by: Gabriele Santomaggio --- 
pkg/amqp/types.go | 22 +--------------- pkg/stream/coordinator.go | 5 ++++ pkg/stream/producer.go | 40 +++++++++++++++++------------- pkg/stream/producer_unconfirmed.go | 5 ++-- 4 files changed, 32 insertions(+), 40 deletions(-) diff --git a/pkg/amqp/types.go b/pkg/amqp/types.go index be59a157..c605b72c 100644 --- a/pkg/amqp/types.go +++ b/pkg/amqp/types.go @@ -409,8 +409,6 @@ type Message struct { //receiver *Receiver // Receiver the message was received from settled bool // whether transfer was settled by sender - // doneSignal is a channel that indicate when a message is considered acted upon by downstream handler - doneSignal chan struct{} } // AMQP10 is an AMQP 1.0 message with the necessary fields to work with the @@ -501,16 +499,7 @@ func (amqp *AMQP10) GetAMQPValue() interface{} { // more complex usages. func newMessage(data []byte) *Message { return &Message{ - Data: [][]byte{data}, - doneSignal: make(chan struct{}), - } -} - -// done closes the internal doneSignal channel to let the receiver know that this message has been acted upon -func (m *Message) done() { - // TODO: move initialization in ctor and use ctor everywhere? - if m.doneSignal != nil { - close(m.doneSignal) + Data: [][]byte{data}, } } @@ -523,15 +512,6 @@ func (m *Message) GetData() []byte { return m.Data[0] } -// Ignore notifies the amqp message pump that the message has been handled -// without any disposition. It frees the amqp receiver to get the next message -// this is implicitly done after calling message dispositions (Accept/Release/Reject/Modify) -func (m *Message) Ignore() { - if m.shouldSendDisposition() { - m.done() - } -} - // MarshalBinary encodes the message into binary form. func (m *Message) MarshalBinary() ([]byte, error) { buf := new(buffer) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 4d8b6f62..5bf6ed43 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -73,6 +73,11 @@ func (coordinator *Coordinator) NewProducer( status: open, pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize), confirmMutex: &sync.Mutex{}, + pool: &sync.Pool{ + New: func() any { + return &messageSequence{} + }, + }, } coordinator.producers[lastId] = producer return producer, err diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 2d75ddf8..0d14960e 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -51,11 +51,9 @@ func (cs *ConfirmationStatus) GetErrorCode() uint16 { } type messageSequence struct { - messageBytes []byte - unCompressedSize int - publishingId int64 - filterValue string - refMessage *message.StreamMessage + messageBytes []byte + publishingId int64 + filterValue string } type Producer struct { @@ -75,6 +73,7 @@ type Producer struct { publishConfirmation chan []*ConfirmationStatus pendingSequencesQueue *BlockingQueue[*messageSequence] + pool *sync.Pool } type FilterValue func(message message.StreamMessage) string @@ -313,7 +312,7 @@ func (producer *Producer) processPendingSequencesQueue() { } if msg != nil { // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += msg.unCompressedSize + totalBufferToSend += len(msg.messageBytes) if totalBufferToSend > maxFrame { // if the totalBufferToSend is greater than the requestedMaxFrameSize // the producer sends the messages and reset the buffer @@ -380,14 +379,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str if producer.options.IsFilterEnabled() { filterValue = 
producer.options.Filter.FilterValue(streamMessage) } + fromPool := producer.pool.Get().(*messageSequence) + fromPool.messageBytes = marshalBinary + fromPool.publishingId = seq + fromPool.filterValue = filterValue - return &messageSequence{ - messageBytes: marshalBinary, - unCompressedSize: len(marshalBinary), - publishingId: seq, - filterValue: filterValue, - refMessage: &streamMessage, - }, nil + return fromPool, nil } @@ -401,11 +398,12 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { if err != nil { return err } - producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + producer.unConfirmed.addFromSequence(messageSeq, &streamMessage, producer.GetID()) if len(messageSeq.messageBytes) > defaultMaxFrameSize { tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) + producer.pool.Put(messageSeq) return FrameTooLarge } @@ -431,7 +429,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error if err != nil { return err } - producer.unConfirmed.addFromSequence(messageSeq, producer.GetID()) + producer.unConfirmed.addFromSequence(messageSeq, &batchMessage, producer.GetID()) totalBufferToSend += len(messageSeq.messageBytes) messagesSequence = append(messagesSequence, messageSeq) @@ -445,6 +443,7 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error for _, msg := range messagesSequence { m := producer.unConfirmed.extractWithError(msg.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{m}) + producer.pool.Put(msg) } return FrameTooLarge } @@ -458,6 +457,13 @@ func (producer *Producer) GetID() uint8 { } func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error { + // remove form pool + defer func() { + for _, m := range messagesSequence { + producer.pool.Put(m) + } + }() + return producer.internalBatchSendProdId(messagesSequence, producer.GetID()) } @@ -556,7 +562,7 @@ func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSeq if !producer.options.isSubEntriesBatching() { for _, msg := range messagesSequence { - msgLen += msg.unCompressedSize + 8 + 4 + msgLen += len(msg.messageBytes) + 8 + 4 } } @@ -718,7 +724,7 @@ func (producer *Producer) sendWithFilter(messagesSequence []*messageSequence, pr frameHeaderLength := initBufferPublishSize var msgLen int for _, msg := range messagesSequence { - msgLen += msg.unCompressedSize + 8 + 4 + msgLen += len(msg.messageBytes) + 8 + 4 if msg.filterValue != "" { msgLen += 2 + len(msg.filterValue) } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index bfc52365..b14822ae 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -1,6 +1,7 @@ package stream import ( + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" "sync" "time" ) @@ -34,13 +35,13 @@ func newUnConfirmed() *unConfirmed { return r } -func (u *unConfirmed) addFromSequence(message *messageSequence, producerID uint8) { +func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) { u.tmpMutex.Lock() u.tmp = append(u.tmp, &ConfirmationStatus{ //u.messages[message.publishingId] = &ConfirmationStatus{ inserted: time.Now(), - message: *message.refMessage, + message: *source, producerID: producerID, publishingId: message.publishingId, confirmed: false, From 
9ff963fbd8cc6cbc459ce4d03b00b0e4c413e2e4 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 13:15:18 +0100 Subject: [PATCH 11/18] rename Signed-off-by: Gabriele Santomaggio --- pkg/stream/client.go | 8 +++--- pkg/stream/consumer.go | 2 +- pkg/stream/producer.go | 4 +-- pkg/stream/producer_unconfirmed.go | 43 +++++++++++++++--------------- 4 files changed, 28 insertions(+), 29 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index ae94d525..dd8184e0 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -112,14 +112,14 @@ func newClient(connectionName string, broker *Broker, } func (c *Client) getSocket() *socket { - //c.mutex.Lock() - //defer c.mutex.Unlock() + //c.mutexMessageMap.Lock() + //defer c.mutexMessageMap.Unlock() return &c.socket } func (c *Client) setSocketConnection(connection net.Conn) { - //c.mutex.Lock() - //defer c.mutex.Unlock() + //c.mutexMessageMap.Lock() + //defer c.mutexMessageMap.Unlock() c.socket.connection = connection c.socket.writer = bufio.NewWriter(connection) } diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 8f6d8c70..64a7ee10 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -378,7 +378,7 @@ func (consumer *Consumer) cacheStoreOffset() { consumer.mutex.Lock() consumer.lastAutoCommitStored = time.Now() consumer.messageCountBeforeStorage = 0 - consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutex, so not using defer for unlock + consumer.mutex.Unlock() // updateLastStoredOffset() in internalStoreOffset() also locks mutexMessageMap, so not using defer for unlock err := consumer.internalStoreOffset() if err != nil { diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 0d14960e..76807117 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -535,8 +535,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c // / the producer id is always the producer.GetID(). This function is needed only for testing // some condition, like simulate publish error. func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error { - //producer.options.client.socket.mutex.Lock() - //defer producer.options.client.socket.mutex.Unlock() + //producer.options.client.socket.mutexMessageMap.Lock() + //defer producer.options.client.socket.mutexMessageMap.Unlock() if producer.getStatus() == closed { return fmt.Errorf("producer id: %d closed", producer.id) } diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index b14822ae..63ca9d37 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -15,10 +15,10 @@ import ( // The confirmation status is updated when the confirmation is received from the broker (see server_frame.go) // or due of timeout. The Timeout is configurable, and it is calculated client side. 
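The tmp buffer introduced by PATCH 09 (whose locks this patch renames) gives unConfirmed a two-lock layout: publishers only append to a small write buffer under its own short-lived mutex, and the buffer is folded into the messages map lazily, on the read/extract side, which keeps Send's hot path off the map lock. A rough standalone sketch of that pattern, with invented types and names:

```go
package main

import (
	"fmt"
	"sync"
)

type entry struct {
	id int64
	v  string
}

// pending mimics the two-lock layout: a write buffer with a cheap lock for
// the hot path, and a map that is only touched on the extract path.
type pending struct {
	mapMu sync.Mutex // guards items
	items map[int64]string
	tmpMu sync.Mutex // guards tmp; a separate lock keeps add() cheap
	tmp   []entry
}

func newPending() *pending {
	return &pending{items: make(map[int64]string)}
}

// add is the publisher's fast path: one short lock, one append.
func (p *pending) add(id int64, v string) {
	p.tmpMu.Lock()
	p.tmp = append(p.tmp, entry{id: id, v: v})
	p.tmpMu.Unlock()
}

// drainTmp folds the write buffer into the map.
func (p *pending) drainTmp() {
	p.tmpMu.Lock()
	for _, e := range p.tmp {
		p.items[e.id] = e.v
	}
	p.tmp = p.tmp[:0]
	p.tmpMu.Unlock()
}

// extract removes and returns an entry once its confirmation arrives.
func (p *pending) extract(id int64) (string, bool) {
	p.mapMu.Lock()
	defer p.mapMu.Unlock()
	p.drainTmp()
	v, ok := p.items[id]
	if ok {
		delete(p.items, id)
	}
	return v, ok
}

func main() {
	p := newPending()
	p.add(1, "msg-1")
	p.add(2, "msg-2")
	if v, ok := p.extract(1); ok {
		fmt.Println("confirmed:", v)
	}
	if _, ok := p.extract(3); !ok {
		fmt.Println("id 3 was never tracked")
	}
}
```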
type unConfirmed struct { - messages map[int64]*ConfirmationStatus - tmp []*ConfirmationStatus - tmpMutex sync.Mutex - mutex sync.RWMutex + messages map[int64]*ConfirmationStatus + tmp []*ConfirmationStatus + tmpMutex sync.Mutex + mutexMessageMap sync.RWMutex } const DefaultUnconfirmedSize = 10_000 @@ -26,10 +26,10 @@ const DefaultUnconfirmedSize = 10_000 func newUnConfirmed() *unConfirmed { r := &unConfirmed{ - messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), - tmp: []*ConfirmationStatus{}, - tmpMutex: sync.Mutex{}, - mutex: sync.RWMutex{}, + messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), + tmp: []*ConfirmationStatus{}, + tmpMutex: sync.Mutex{}, + mutexMessageMap: sync.RWMutex{}, } return r @@ -39,7 +39,6 @@ func (u *unConfirmed) addFromSequence(message *messageSequence, source *message. u.tmpMutex.Lock() u.tmp = append(u.tmp, &ConfirmationStatus{ - //u.messages[message.publishingId] = &ConfirmationStatus{ inserted: time.Now(), message: *source, producerID: producerID, @@ -50,8 +49,8 @@ func (u *unConfirmed) addFromSequence(message *messageSequence, source *message. } func (u *unConfirmed) link(from int64, to int64) { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() r := u.messages[from] if r != nil { r.linkedTo = append(r.linkedTo, u.messages[to]) @@ -59,10 +58,10 @@ func (u *unConfirmed) link(from int64, to int64) { } func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() var res []*ConfirmationStatus - u.fromTmpToMap() + u.fromTmpToMap() /// for _, v := range ids { m := u.extract(v, 0, true) @@ -87,8 +86,8 @@ func (u *unConfirmed) fromTmpToMap() { } func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() u.fromTmpToMap() return u.extract(id, errorCode, false) } @@ -117,8 +116,8 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui } func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() u.fromTmpToMap() var res []*ConfirmationStatus for _, v := range u.messages { @@ -131,15 +130,15 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS } func (u *unConfirmed) size() int { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() u.fromTmpToMap() return len(u.messages) } func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { - u.mutex.Lock() - defer u.mutex.Unlock() + u.mutexMessageMap.Lock() + defer u.mutexMessageMap.Unlock() u.fromTmpToMap() return u.messages } From 811d615b90bb7e0161c6d8b552bb15f6cbae0133 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 14:55:04 +0100 Subject: [PATCH 12/18] remove loop Signed-off-by: Gabriele Santomaggio --- pkg/stream/aggregation_test.go | 5 ++--- pkg/stream/client.go | 2 +- pkg/stream/consumer.go | 8 ++++---- pkg/stream/coordinator.go | 5 ----- pkg/stream/producer.go | 25 +++++++------------------ pkg/stream/producer_test.go | 23 ++++++++--------------- 6 files changed, 22 insertions(+), 46 deletions(-) diff --git a/pkg/stream/aggregation_test.go b/pkg/stream/aggregation_test.go index e4c16375..8ec901ff 100644 --- 
a/pkg/stream/aggregation_test.go +++ b/pkg/stream/aggregation_test.go @@ -18,9 +18,8 @@ var _ = Describe("Compression algorithms", func() { } message := &messageSequence{ - messageBytes: messagePayload, - unCompressedSize: len(messagePayload), - publishingId: 0, + messageBytes: messagePayload, + publishingId: 0, } entries = &subEntries{ diff --git a/pkg/stream/client.go b/pkg/stream/client.go index dd8184e0..1f493eeb 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -432,7 +432,7 @@ func (c *Client) heartBeat() { tickerHeartbeat.Stop() return case <-tickerHeartbeat.C: - for c.socket.isOpen() { + if c.socket.isOpen() { if time.Since(c.getLastHeartBeat()) > time.Duration(c.tuneState.requestedHeartbeat)*time.Second { v := atomic.AddInt32(&heartBeatMissed, 1) logs.LogWarn("Missing heart beat: %d", v) diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 64a7ee10..161f868c 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -337,10 +337,10 @@ func (consumer *Consumer) close(reason Event) error { consumer.closeHandler = nil } - if consumer.chunkForConsumer != nil { - close(consumer.chunkForConsumer) - consumer.chunkForConsumer = nil - } + //if consumer.chunkForConsumer != nil { + close(consumer.chunkForConsumer) + //consumer.chunkForConsumer = nil + //} if consumer.response.data != nil { close(consumer.response.data) diff --git a/pkg/stream/coordinator.go b/pkg/stream/coordinator.go index 5bf6ed43..4d8b6f62 100644 --- a/pkg/stream/coordinator.go +++ b/pkg/stream/coordinator.go @@ -73,11 +73,6 @@ func (coordinator *Coordinator) NewProducer( status: open, pendingSequencesQueue: NewBlockingQueue[*messageSequence](dynSize), confirmMutex: &sync.Mutex{}, - pool: &sync.Pool{ - New: func() any { - return &messageSequence{} - }, - }, } coordinator.producers[lastId] = producer return producer, err diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 76807117..9c8d9663 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -73,7 +73,6 @@ type Producer struct { publishConfirmation chan []*ConfirmationStatus pendingSequencesQueue *BlockingQueue[*messageSequence] - pool *sync.Pool } type FilterValue func(message message.StreamMessage) string @@ -296,7 +295,6 @@ func (producer *Producer) processPendingSequencesQueue() { var lastError error for { select { - case msg, ok := <-producer.pendingSequencesQueue.GetChannel(): { if !ok { @@ -379,12 +377,12 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str if producer.options.IsFilterEnabled() { filterValue = producer.options.Filter.FilterValue(streamMessage) } - fromPool := producer.pool.Get().(*messageSequence) - fromPool.messageBytes = marshalBinary - fromPool.publishingId = seq - fromPool.filterValue = filterValue + msqSeq := &messageSequence{} + msqSeq.messageBytes = marshalBinary + msqSeq.publishingId = seq + msqSeq.filterValue = filterValue - return fromPool, nil + return msqSeq, nil } @@ -403,7 +401,6 @@ func (producer *Producer) Send(streamMessage message.StreamMessage) error { if len(messageSeq.messageBytes) > defaultMaxFrameSize { tooLarge := producer.unConfirmed.extractWithError(messageSeq.publishingId, responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{tooLarge}) - producer.pool.Put(messageSeq) return FrameTooLarge } @@ -443,7 +440,6 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error for _, msg := range messagesSequence { m := producer.unConfirmed.extractWithError(msg.publishingId, 
responseCodeFrameTooLarge) producer.sendConfirmationStatus([]*ConfirmationStatus{m}) - producer.pool.Put(msg) } return FrameTooLarge } @@ -457,13 +453,6 @@ func (producer *Producer) GetID() uint8 { } func (producer *Producer) internalBatchSend(messagesSequence []*messageSequence) error { - // remove form pool - defer func() { - for _, m := range messagesSequence { - producer.pool.Put(m) - } - }() - return producer.internalBatchSendProdId(messagesSequence, producer.GetID()) } @@ -535,8 +524,8 @@ func (producer *Producer) aggregateEntities(msgs []*messageSequence, size int, c // / the producer id is always the producer.GetID(). This function is needed only for testing // some condition, like simulate publish error. func (producer *Producer) internalBatchSendProdId(messagesSequence []*messageSequence, producerID uint8) error { - //producer.options.client.socket.mutexMessageMap.Lock() - //defer producer.options.client.socket.mutexMessageMap.Unlock() + producer.options.client.socket.mutex.Lock() + defer producer.options.client.socket.mutex.Unlock() if producer.getStatus() == closed { return fmt.Errorf("producer id: %d closed", producer.id) } diff --git a/pkg/stream/producer_test.go b/pkg/stream/producer_test.go index a35381e1..8cb31fc4 100644 --- a/pkg/stream/producer_test.go +++ b/pkg/stream/producer_test.go @@ -315,7 +315,7 @@ var _ = Describe("Streaming Producers", func() { }) - It("Smart Send/Close", Focus, func() { + It("Smart Send/Close", func() { producer, err := testEnvironment.NewProducer(testProducerStream, nil) Expect(err).NotTo(HaveOccurred()) var messagesReceived int32 @@ -570,8 +570,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -579,8 +578,7 @@ var _ = Describe("Streaming Producers", func() { msg.SetPublishingId(1) messageBytes, _ := msg.MarshalBinary() messagesSequence[0] = &messageSequence{ - messageBytes: messageBytes, - unCompressedSize: len(messageBytes), + messageBytes: messageBytes, } // 200 producer ID doesn't exist @@ -659,8 +657,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 201; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -676,8 +673,7 @@ var _ = Describe("Streaming Producers", func() { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -692,8 +688,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -708,8 +703,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 1000; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } @@ -724,8 +718,7 @@ var _ = Describe("Streaming Producers", func() { for i := 0; i < 14; i++ { s := make([]byte, 50) messagesSequence[i] = &messageSequence{ - messageBytes: s, - unCompressedSize: len(s), + messageBytes: s, } } From df586f3a3d923fd2be0fcdcfdc0e456574902ad6 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 15:16:06 +0100 Subject: [PATCH 13/18] restore map Signed-off-by: Gabriele Santomaggio --- pkg/stream/consumer.go | 4 +++- pkg/stream/producer_unconfirmed.go | 26 
++++---------------------- 2 files changed, 7 insertions(+), 23 deletions(-) diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 161f868c..184e7c87 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -325,7 +325,9 @@ func (consumer *Consumer) Close() error { func (consumer *Consumer) close(reason Event) error { if consumer.options == nil { - logs.LogWarn("consumer options is nil, the close will be ignored") + // the config is usually set. this check is just to avoid panic and to make some test + // easier to write + logs.LogDebug("consumer options is nil, the close will be ignored") return nil } consumer.cacheStoreOffset() diff --git a/pkg/stream/producer_unconfirmed.go b/pkg/stream/producer_unconfirmed.go index 63ca9d37..9d232c26 100644 --- a/pkg/stream/producer_unconfirmed.go +++ b/pkg/stream/producer_unconfirmed.go @@ -16,8 +16,6 @@ import ( // or due of timeout. The Timeout is configurable, and it is calculated client side. type unConfirmed struct { messages map[int64]*ConfirmationStatus - tmp []*ConfirmationStatus - tmpMutex sync.Mutex mutexMessageMap sync.RWMutex } @@ -27,8 +25,6 @@ func newUnConfirmed() *unConfirmed { r := &unConfirmed{ messages: make(map[int64]*ConfirmationStatus, DefaultUnconfirmedSize), - tmp: []*ConfirmationStatus{}, - tmpMutex: sync.Mutex{}, mutexMessageMap: sync.RWMutex{}, } @@ -37,15 +33,15 @@ func newUnConfirmed() *unConfirmed { func (u *unConfirmed) addFromSequence(message *messageSequence, source *message.StreamMessage, producerID uint8) { - u.tmpMutex.Lock() - u.tmp = append(u.tmp, &ConfirmationStatus{ + u.mutexMessageMap.Lock() + u.messages[message.publishingId] = &ConfirmationStatus{ inserted: time.Now(), message: *source, producerID: producerID, publishingId: message.publishingId, confirmed: false, - }) - u.tmpMutex.Unlock() + } + u.mutexMessageMap.Unlock() } func (u *unConfirmed) link(from int64, to int64) { @@ -61,7 +57,6 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() var res []*ConfirmationStatus - u.fromTmpToMap() /// for _, v := range ids { m := u.extract(v, 0, true) @@ -76,19 +71,9 @@ func (u *unConfirmed) extractWithConfirms(ids []int64) []*ConfirmationStatus { } -func (u *unConfirmed) fromTmpToMap() { - u.tmpMutex.Lock() - defer u.tmpMutex.Unlock() - for i := range u.tmp { - u.messages[u.tmp[i].publishingId] = u.tmp[i] - } - u.tmp = u.tmp[:0] -} - func (u *unConfirmed) extractWithError(id int64, errorCode uint16) *ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - u.fromTmpToMap() return u.extract(id, errorCode, false) } @@ -118,7 +103,6 @@ func (u *unConfirmed) updateStatus(rootMessage *ConfirmationStatus, errorCode ui func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - u.fromTmpToMap() var res []*ConfirmationStatus for _, v := range u.messages { if time.Since(v.inserted) > timeout { @@ -132,13 +116,11 @@ func (u *unConfirmed) extractWithTimeOut(timeout time.Duration) []*ConfirmationS func (u *unConfirmed) size() int { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - u.fromTmpToMap() return len(u.messages) } func (u *unConfirmed) getAll() map[int64]*ConfirmationStatus { u.mutexMessageMap.Lock() defer u.mutexMessageMap.Unlock() - u.fromTmpToMap() return u.messages } From 729e156b775fdd1fa0b3756ff2bb47cbe11770f8 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 
15:51:24 +0100 Subject: [PATCH 14/18] logs Signed-off-by: Gabriele Santomaggio --- pkg/stream/consumer.go | 2 ++ pkg/stream/producer.go | 6 ++++-- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 184e7c87..7c43a607 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -330,6 +330,8 @@ func (consumer *Consumer) close(reason Event) error { logs.LogDebug("consumer options is nil, the close will be ignored") return nil } + + _, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID) consumer.cacheStoreOffset() consumer.setStatus(closed) diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 9c8d9663..c238b7f2 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -298,7 +298,7 @@ func (producer *Producer) processPendingSequencesQueue() { case msg, ok := <-producer.pendingSequencesQueue.GetChannel(): { if !ok { - logs.LogInfo("producer %d processPendingSequencesQueue closed", producer.id) + logs.LogDebug("producer %d processPendingSequencesQueue closed by closed channel", producer.id) return } @@ -631,7 +631,9 @@ func (producer *Producer) close(reason Event) error { producer.closeConfirmationStatus() if producer.options == nil { - logs.LogWarn("producer options is nil, the close will be ignored") + // the options are usually not nil. This is just for safety and for to make some + // test easier to write + logs.LogDebug("producer options is nil, the close will be ignored") return nil } _, _ = producer.options.client.coordinator.ExtractProducerById(producer.id) From 101a3d2b1c6c541e2c2dd3d49333be8848fed58d Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 17:19:23 +0100 Subject: [PATCH 15/18] restore dynamic batch Signed-off-by: Gabriele Santomaggio --- .github/workflows/build_and_test.yml | 2 +- pkg/stream/producer.go | 22 +--------------------- 2 files changed, 2 insertions(+), 22 deletions(-) diff --git a/.github/workflows/build_and_test.yml b/.github/workflows/build_and_test.yml index 9275882d..7b85e3dd 100644 --- a/.github/workflows/build_and_test.yml +++ b/.github/workflows/build_and_test.yml @@ -54,7 +54,7 @@ jobs: check-latest: true - run: make test GO_VERSION=${{ steps.setup_go.outputs.go-version }} - uses: actions/checkout@main - - uses: codecov/codecov-action@v4 + - uses: codecov/codecov-action@v5 with: fail_ci_if_error: false # optional (default = false) files: ./coverage.txt diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index c238b7f2..4f315bf9 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -284,12 +284,7 @@ func (producer *Producer) closeConfirmationStatus() { func (producer *Producer) processPendingSequencesQueue() { maxFrame := producer.options.client.getTuneState().requestedMaxFrameSize - var avarage = 0 - iterations := 0 go func() { - var ticker = time.NewTicker(time.Duration(producer.options.BatchPublishingDelay) * time.Millisecond) - defer ticker.Stop() - sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize var lastError error @@ -322,33 +317,18 @@ func (producer *Producer) processPendingSequencesQueue() { sequenceToSend = append(sequenceToSend, msg) } - canSend := producer.pendingSequencesQueue.IsEmpty() && producer.options.BatchPublishingDelay == 0 + canSend := producer.pendingSequencesQueue.IsEmpty() // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending // the messages during the checks of the buffer. 
In this case if canSend || len(sequenceToSend) >= producer.options.BatchSize { if len(sequenceToSend) > 0 { - avarage += len(sequenceToSend) - iterations++ - if iterations > 100000 { - logs.LogInfo("producer %d average: %d", producer.id, avarage/iterations) - avarage = 0 - iterations = 0 - } - lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] totalBufferToSend += initBufferPublishSize } } } - case <-ticker.C: - if len(sequenceToSend) > 0 { - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend += initBufferPublishSize - } } - if lastError != nil { logs.LogError("error during sending messages: %s", lastError) } From d71bb84c2e30c74c7fe19d0dc31f715a6cdfb1db Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 13 Jan 2025 18:01:15 +0100 Subject: [PATCH 16/18] clean the consumer buffer in case the consumer does not exist Signed-off-by: Gabriele Santomaggio --- pkg/stream/blocking_queue.go | 33 ---------------- pkg/stream/producer.go | 73 +++++++++++++++++------------------- pkg/stream/server_frame.go | 15 ++++++-- 3 files changed, 46 insertions(+), 75 deletions(-) diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index 2522ed4e..1da39b31 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -35,32 +35,6 @@ func (bq *BlockingQueue[T]) Enqueue(item T) error { return nil } -// Dequeue removes an item from the queue with a timeout -func (bq *BlockingQueue[T]) Dequeue(timeout time.Duration) T { - if bq.IsStopped() { - var zeroValue T // Zero value of type T - return zeroValue - } - - select { - case item, ok := <-bq.queue: - if !ok { - var zeroValue T // Zero value of type T - return zeroValue - } - return item - case <-time.After(timeout): - var zeroValue T // Zero value of type T - return zeroValue - } -} - -func (bq *BlockingQueue[T]) Process(f func(T)) { - for item := range bq.queue { - f(item) - } -} - func (bq *BlockingQueue[T]) GetChannel() chan T { return bq.queue } @@ -73,13 +47,6 @@ func (bq *BlockingQueue[T]) IsEmpty() bool { return len(bq.queue) == 0 } -func (bq *BlockingQueue[T]) IsReadyToSend() bool { - - millis := time.Since(time.Unix(0, atomic.LoadInt64(&bq.lastUpdate))).Milliseconds() - - return millis > 10 && len(bq.queue) == 0 -} - // Stop stops the queue from accepting new items // but allows some pending items. 
// Stop is different from Close in that it allows the diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 4f315bf9..940b83fb 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -287,46 +287,37 @@ func (producer *Producer) processPendingSequencesQueue() { go func() { sequenceToSend := make([]*messageSequence, 0) totalBufferToSend := initBufferPublishSize - var lastError error - for { - select { - case msg, ok := <-producer.pendingSequencesQueue.GetChannel(): - { - if !ok { - logs.LogDebug("producer %d processPendingSequencesQueue closed by closed channel", producer.id) - return - } + for msg := range producer.pendingSequencesQueue.GetChannel() { + var lastError error + + // the dequeue is blocking with a timeout of 500ms + // as soon as a message is available the Dequeue will be unblocked + //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) + if producer.pendingSequencesQueue.IsStopped() { + break + } + if msg != nil { + // There is something in the queue.Checks the buffer is still less than the maxFrame + totalBufferToSend += len(msg.messageBytes) + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and reset the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize + } - // the dequeue is blocking with a timeout of 500ms - // as soon as a message is available the Dequeue will be unblocked - //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) - if producer.pendingSequencesQueue.IsStopped() { - break - } - if msg != nil { - // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += len(msg.messageBytes) - if totalBufferToSend > maxFrame { - // if the totalBufferToSend is greater than the requestedMaxFrameSize - // the producer sends the messages and reset the buffer - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend = initBufferPublishSize - } - - sequenceToSend = append(sequenceToSend, msg) - } + sequenceToSend = append(sequenceToSend, msg) + } - canSend := producer.pendingSequencesQueue.IsEmpty() - // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending - // the messages during the checks of the buffer. In this case - if canSend || len(sequenceToSend) >= producer.options.BatchSize { - if len(sequenceToSend) > 0 { - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend += initBufferPublishSize - } - } + canSend := producer.pendingSequencesQueue.IsEmpty() + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending + // the messages during the checks of the buffer. 
In this case + if canSend || len(sequenceToSend) >= producer.options.BatchSize { + if len(sequenceToSend) > 0 { + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend += initBufferPublishSize } } if lastError != nil { @@ -334,6 +325,12 @@ func (producer *Producer) processPendingSequencesQueue() { } } + // just in case there are messages in the buffer + // not matter is sent or not the messages will be timed out + if len(sequenceToSend) > 0 { + _ = producer.internalBatchSend(sequenceToSend) + } + }() logs.LogDebug("producer %d processPendingSequencesQueue closed", producer.id) } diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 9f8438d3..0afb3c4b 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -296,9 +296,9 @@ func (c *Client) handleDeliver(r *bufio.Reader) { subscriptionId := readByte(r) consumer, err := c.coordinator.GetConsumerById(subscriptionId) + consumerFound := err == nil if err != nil { logs.LogError("Handle Deliver consumer not found %s", err) - return } @@ -320,6 +320,16 @@ func (c *Client) handleDeliver(r *bufio.Reader) { var offsetLimit int64 = -1 + var bytesBuffer = make([]byte, int(dataLength)) + _, err = io.ReadFull(r, bytesBuffer) + logErrorCommand(err, "handleDeliver") + + if !consumerFound { + // even if the consumer is not found we need to read the buffer + logs.LogWarn("the consumer was not found %d. cleaning the buffer", subscriptionId) + return + } + // we can have two cases // 1. single active consumer is enabled // 2. single active consumer is not enabled @@ -346,9 +356,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) { batchConsumingMessages := make(offsetMessages, 0, numRecords) var chunk chunkInfo chunk.numEntries = numEntries - var bytesBuffer = make([]byte, int(dataLength)) - _, err = io.ReadFull(r, bytesBuffer) - logErrorCommand(err, "handleDeliver") /// headers ---> payload -> messages From eff30cf77d09473ee332512c6f468e98bad4d83b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 14 Jan 2025 11:34:15 +0100 Subject: [PATCH 17/18] make the handler mandatory Signed-off-by: Gabriele Santomaggio --- pkg/stream/client.go | 4 ++++ pkg/stream/consumer.go | 5 +---- pkg/stream/consumer_test.go | 7 +++++++ pkg/stream/environment.go | 13 ------------- pkg/stream/producer.go | 36 ++++++++++++++---------------------- 5 files changed, 26 insertions(+), 39 deletions(-) diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 1f493eeb..43ad9beb 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -884,6 +884,10 @@ func (c *Client) DeclareSubscriber(streamName string, return nil, fmt.Errorf("message count before storage must be bigger than one") } + if messagesHandler == nil { + return nil, fmt.Errorf("messages Handler must be set") + } + if options.Offset.isLastConsumed() { lastOffset, err := c.queryOffset(options.ConsumerName, streamName) switch { diff --git a/pkg/stream/consumer.go b/pkg/stream/consumer.go index 7c43a607..9add7319 100644 --- a/pkg/stream/consumer.go +++ b/pkg/stream/consumer.go @@ -331,7 +331,6 @@ func (consumer *Consumer) close(reason Event) error { return nil } - _, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID) consumer.cacheStoreOffset() consumer.setStatus(closed) @@ -341,10 +340,7 @@ func (consumer *Consumer) close(reason Event) error { consumer.closeHandler = nil } - //if consumer.chunkForConsumer != nil { close(consumer.chunkForConsumer) - //consumer.chunkForConsumer = nil - //} 
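// A reduced sketch of the teardown ordering implemented by close (hypothetical
// helper names; each channel is closed only by the goroutine that owns it):
//
//	consumer.setStatus(closed)      // 1. refuse further work
//	notify(closeHandler, reason)    // 2. report the close reason to listeners
//	close(chunkForConsumer)         // 3. stop the chunk-dispatching goroutine
//	unsubscribeIfNeeded(reason)     // 4. protocol cleanup, only for UnSubscribe
//	coordinator.extract(consumerID) // 5. deregister the consumer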
if consumer.response.data != nil { close(consumer.response.data) @@ -365,6 +361,7 @@ func (consumer *Consumer) close(reason Event) error { logs.LogWarn("error during consumer unsubscribe:%s", err.Err) } } + _, _ = consumer.options.client.coordinator.ExtractConsumerById(consumer.ID) if consumer.options != nil && consumer.options.client.coordinator.ConsumersCount() == 0 { _ = consumer.options.client.Close() diff --git a/pkg/stream/consumer_test.go b/pkg/stream/consumer_test.go index a1436f72..e9dc6a43 100644 --- a/pkg/stream/consumer_test.go +++ b/pkg/stream/consumer_test.go @@ -641,6 +641,13 @@ var _ = Describe("Streaming Consumers", func() { NewAutoCommitStrategy().SetFlushInterval(10*time.Millisecond))) Expect(err).To(HaveOccurred()) + // message handler must be set + _, err = env.NewConsumer(streamName, + nil, &ConsumerOptions{ + Offset: OffsetSpecification{}, + }) + Expect(err).To(HaveOccurred()) + }) It("Sub Batch consumer with different publishers GZIP and Not", func() { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 11a5cc52..541c49af 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -627,19 +627,6 @@ func (cc *environmentCoordinator) newConsumer(connectionName string, leader *Bro if clientResult == nil { clientResult = newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeout) - //chMeta := make(chan metaDataUpdateEvent) - //clientResult.metadataListener = chMeta - //go func(ch <-chan metaDataUpdateEvent, cl *Client) { - // for metaDataUpdateEvent := range ch { - // clientResult.maybeCleanConsumers(metaDataUpdateEvent.StreamName) - // cc.maybeCleanClients() - // if !cl.socket.isOpen() { - // return - // } - // } - // - //}(chMeta, clientResult) - cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult } diff --git a/pkg/stream/producer.go b/pkg/stream/producer.go index 940b83fb..c6c3f09d 100644 --- a/pkg/stream/producer.go +++ b/pkg/stream/producer.go @@ -99,7 +99,8 @@ type ProducerOptions struct { // It is not used anymore given the dynamic batching QueueSize int // Internal queue to handle back-pressure, low value reduces the back-pressure on the server BatchSize int // It is the batch-unCompressedSize aggregation, low value reduce the latency, high value increase the throughput. Valid only for the method Send() - + // Deprecated: starting from 1.5.0 the SetBatchPublishingDelay is deprecated, and it will be removed in the next releases + // It is not used anymore given the dynamic batching BatchPublishingDelay int // Timout within the aggregation sent a batch of messages. 
Valid only for the method Send() // Size of sub Entry, to aggregate more subEntry using one publishing id SubEntrySize int @@ -290,30 +291,24 @@ func (producer *Producer) processPendingSequencesQueue() { for msg := range producer.pendingSequencesQueue.GetChannel() { var lastError error - // the dequeue is blocking with a timeout of 500ms - // as soon as a message is available the Dequeue will be unblocked - //msg := producer.pendingSequencesQueue.Dequeue(time.Millisecond * 500) if producer.pendingSequencesQueue.IsStopped() { break } - if msg != nil { - // There is something in the queue.Checks the buffer is still less than the maxFrame - totalBufferToSend += len(msg.messageBytes) - if totalBufferToSend > maxFrame { - // if the totalBufferToSend is greater than the requestedMaxFrameSize - // the producer sends the messages and reset the buffer - lastError = producer.internalBatchSend(sequenceToSend) - sequenceToSend = sequenceToSend[:0] - totalBufferToSend = initBufferPublishSize - } - - sequenceToSend = append(sequenceToSend, msg) + // There is something in the queue. Checks the buffer is still less than the maxFrame + totalBufferToSend += len(msg.messageBytes) + if totalBufferToSend > maxFrame { + // if the totalBufferToSend is greater than the requestedMaxFrameSize + // the producer sends the messages and reset the buffer + lastError = producer.internalBatchSend(sequenceToSend) + sequenceToSend = sequenceToSend[:0] + totalBufferToSend = initBufferPublishSize } - canSend := producer.pendingSequencesQueue.IsEmpty() + sequenceToSend = append(sequenceToSend, msg) + // if producer.pendingSequencesQueue.IsEmpty() means that the queue is empty so the producer is not sending // the messages during the checks of the buffer. In this case - if canSend || len(sequenceToSend) >= producer.options.BatchSize { + if producer.pendingSequencesQueue.IsEmpty() || len(sequenceToSend) >= producer.options.BatchSize { if len(sequenceToSend) > 0 { lastError = producer.internalBatchSend(sequenceToSend) sequenceToSend = sequenceToSend[:0] @@ -358,15 +353,13 @@ func (producer *Producer) fromMessageToMessageSequence(streamMessage message.Str msqSeq.messageBytes = marshalBinary msqSeq.publishingId = seq msqSeq.filterValue = filterValue - return msqSeq, nil - } // Send sends a message to the stream and returns an error if the message could not be sent. // The Send is asynchronous. The message is sent to a channel ant then other goroutines aggregate and sent the messages // The Send is dynamic so the number of messages sent decided internally based on the BatchSize -// and the messages contained in the buffer. The aggregation is up to the client. +// and the messages in the buffer. The aggregation is up to the client. 
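// A short illustrative use (hypothetical stream name):
//
//	producer, _ := env.NewProducer("my-stream", nil)
//	for i := 0; i < 1000; i++ {
//		_ = producer.Send(amqp.NewMessage([]byte("hello")))
//	}
//
// here the client decides how many of those messages travel in each frame,
// based on BatchSize and the negotiated maximum frame size.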
// returns an error if the message could not be sent for marshal problems or if the buffer is too large func (producer *Producer) Send(streamMessage message.StreamMessage) error { messageSeq, err := producer.fromMessageToMessageSequence(streamMessage) @@ -421,7 +414,6 @@ func (producer *Producer) BatchSend(batchMessages []message.StreamMessage) error return FrameTooLarge } - // all the messages are unconfirmed return producer.internalBatchSend(messagesSequence) } From 57be0d3ab7ac80f04d4d117b4f7b353bc8b55eb2 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 14 Jan 2025 12:06:08 +0100 Subject: [PATCH 18/18] remove unsed code Signed-off-by: Gabriele Santomaggio --- README.md | 9 +++++++++ pkg/stream/blocking_queue.go | 6 ++---- pkg/stream/client.go | 11 +---------- pkg/stream/environment.go | 13 ------------- 4 files changed, 12 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index c136870c..b39de3ee 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,15 @@ Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-serv ### Overview Go client for [RabbitMQ Stream Queues](https://github.com/rabbitmq/rabbitmq-server/tree/master/deps/rabbitmq_stream) +The client contains all features to interact with the RabbitMQ Stream Queues.
+
+The main entry point is the `Environment`, which is used to create and manage `Producer` and `Consumer` instances.
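+
+A minimal sketch of creating an environment (assuming a local broker with the default credentials):
+
+```golang
+env, err := stream.NewEnvironment(
+    stream.NewEnvironmentOptions().
+        SetHost("localhost").
+        SetPort(5552).
+        SetUser("guest").
+        SetPassword("guest"))
+if err != nil {
+    // handle the connection error
+}
+```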
+
+`Producer` and `Consumer` are the primary interfaces for publishing to and consuming from RabbitMQ stream queues; a minimal example is sketched below.
+They do not reconnect automatically after a disconnection, but they expose events that let applications detect it.
+
+The client also provides `ReliableProducer` and `ReliableConsumer`, which reconnect automatically after a disconnection.
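+
+A minimal publish/consume sketch (it assumes the stream `my-stream` already exists; note that the messages handler is mandatory):
+
+```golang
+producer, err := env.NewProducer("my-stream", nil)
+if err != nil {
+    // handle the error
+}
+err = producer.Send(amqp.NewMessage([]byte("hello")))
+
+handleMessages := func(consumerContext stream.ConsumerContext, message *amqp.Message) {
+    fmt.Printf("received: %s\n", message.Data)
+}
+consumer, err := env.NewConsumer("my-stream", handleMessages, nil)
+// call consumer.Close() when done
+```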
+See also the [Reliable Producer and Reliable Consumer](#reliable-producer-and-reliable-consumer) section. ### Installing diff --git a/pkg/stream/blocking_queue.go b/pkg/stream/blocking_queue.go index 1da39b31..6f66a23c 100644 --- a/pkg/stream/blocking_queue.go +++ b/pkg/stream/blocking_queue.go @@ -12,16 +12,14 @@ var ErrBlockingQueueStopped = errors.New("blocking queue stopped") type BlockingQueue[T any] struct { queue chan T status int32 - capacity int lastUpdate int64 } // NewBlockingQueue initializes a new BlockingQueue with the given capacity func NewBlockingQueue[T any](capacity int) *BlockingQueue[T] { return &BlockingQueue[T]{ - queue: make(chan T, capacity), - capacity: capacity, - status: 0, + queue: make(chan T, capacity), + status: 0, } } diff --git a/pkg/stream/client.go b/pkg/stream/client.go index 43ad9beb..e8700c7b 100644 --- a/pkg/stream/client.go +++ b/pkg/stream/client.go @@ -63,8 +63,7 @@ type Client struct { tcpParameters *TCPParameters saslConfiguration *SaslConfiguration - mutex *sync.Mutex - //metadataListener metadataListener + mutex *sync.Mutex lastHeartBeat HeartBeat socketCallTimeout time.Duration availableFeatures *availableFeatures @@ -112,14 +111,10 @@ func newClient(connectionName string, broker *Broker, } func (c *Client) getSocket() *socket { - //c.mutexMessageMap.Lock() - //defer c.mutexMessageMap.Unlock() return &c.socket } func (c *Client) setSocketConnection(connection net.Conn) { - //c.mutexMessageMap.Lock() - //defer c.mutexMessageMap.Unlock() c.socket.connection = connection c.socket.writer = bufio.NewWriter(connection) } @@ -512,10 +507,6 @@ func (c *Client) Close() error { } } - //if c.metadataListener != nil { - // close(c.metadataListener) - // c.metadataListener = nil - //} c.closeHartBeat() if c.getSocket().isOpen() { diff --git a/pkg/stream/environment.go b/pkg/stream/environment.go index 541c49af..06499cc6 100644 --- a/pkg/stream/environment.go +++ b/pkg/stream/environment.go @@ -592,19 +592,6 @@ func (cc *environmentCoordinator) newProducer(leader *Broker, tcpParameters *TCP func (cc *environmentCoordinator) newClientForProducer(connectionName string, leader *Broker, tcpParameters *TCPParameters, saslConfiguration *SaslConfiguration, rpcTimeOut time.Duration) *Client { clientResult := newClient(connectionName, leader, tcpParameters, saslConfiguration, rpcTimeOut) - //chMeta := make(chan metaDataUpdateEvent, 1) - //clientResult.metadataListener = chMeta - //go func(ch <-chan metaDataUpdateEvent, cl *Client) { - // for metaDataUpdateEvent := range ch { - // clientResult.maybeCleanProducers(metaDataUpdateEvent.StreamName) - // cc.maybeCleanClients() - // if !cl.socket.isOpen() { - // return - // } - // } - // - //}(chMeta, clientResult) - cc.nextId++ cc.clientsPerContext[cc.nextId] = clientResult return clientResult
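A closing note on the simplified BlockingQueue: with the capacity field removed, the structure is essentially a buffered channel plus an atomic stop flag. A self-contained sketch of the pattern (hypothetical names; Go 1.18+ for generics):

package main

import (
	"errors"
	"fmt"
	"sync/atomic"
)

type blockingQueue[T any] struct {
	queue  chan T
	status int32 // 0 = running, 1 = stopped
}

func newBlockingQueue[T any](capacity int) *blockingQueue[T] {
	return &blockingQueue[T]{queue: make(chan T, capacity)}
}

// enqueue blocks when the buffer is full, which gives natural back-pressure.
func (bq *blockingQueue[T]) enqueue(item T) error {
	if atomic.LoadInt32(&bq.status) == 1 {
		return errors.New("blocking queue stopped")
	}
	bq.queue <- item
	return nil
}

// stop refuses new items but lets already-queued ones drain.
func (bq *blockingQueue[T]) stop() {
	atomic.StoreInt32(&bq.status, 1)
}

func main() {
	q := newBlockingQueue[string](2)
	_ = q.enqueue("hello")
	q.stop()
	fmt.Println(q.enqueue("rejected")) // prints: blocking queue stopped
}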