From 70a5e45c3830b872e1bc03088d4bee36f10930a2 Mon Sep 17 00:00:00 2001 From: Richard Sugg Date: Wed, 7 May 2025 07:20:09 -0400 Subject: [PATCH 1/3] do not panic during chunk dispatching if consumer suddenly closed --- pkg/stream/server_frame.go | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index d3a323b3..658cd62e 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -3,11 +3,12 @@ package stream import ( "bufio" "bytes" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" - "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" "hash/crc32" "io" "time" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs" ) type ReaderProtocol struct { @@ -281,7 +282,6 @@ func (c *Client) queryPublisherSequenceFrameHandler(readProtocol *ReaderProtocol res.data <- sequence } func (c *Client) handleDeliver(r *bufio.Reader) { - subscriptionId := readByte(r) consumer, err := c.coordinator.GetConsumerById(subscriptionId) consumerFound := err == nil @@ -406,8 +406,16 @@ func (c *Client) handleDeliver(r *bufio.Reader) { // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages + logs.LogDebug("Dispatching %d messages to consumer %d", len(batchConsumingMessages), subscriptionId) if consumer.getStatus() == open { - consumer.chunkForConsumer <- chunk + select { + case consumer.chunkForConsumer <- chunk: + return + default: + logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+ + "closed during chunk dispatching. Messages won't be dispatched. ", + consumer.GetName(), consumer.GetStreamName()) + } } else { logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) From fc27a365fbb45d09a4ff761c9fd21e266711fa7b Mon Sep 17 00:00:00 2001 From: Richard Sugg Date: Wed, 7 May 2025 09:39:28 -0400 Subject: [PATCH 2/3] remove unnecessary return --- pkg/stream/server_frame.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 8d14a059..0c606361 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -410,7 +410,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) { if consumer.getStatus() == open { select { case consumer.chunkForConsumer <- chunk: - return default: logs.LogDebug("The consumer %s for the stream %s reports as open but is probably "+ "closed during chunk dispatching. Messages won't be dispatched. ", From 38c3765755678f19e2db3169f73d34143b9b156b Mon Sep 17 00:00:00 2001 From: Richard Sugg Date: Wed, 7 May 2025 09:43:58 -0400 Subject: [PATCH 3/3] remove unnecessary logDebug statement --- pkg/stream/server_frame.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index 0c606361..a5d48412 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -406,7 +406,6 @@ func (c *Client) handleDeliver(r *bufio.Reader) { // dispatch the messages with offset to the consumer chunk.offsetMessages = batchConsumingMessages - logs.LogDebug("Dispatching %d messages to consumer %d", len(batchConsumingMessages), subscriptionId) if consumer.getStatus() == open { select { case consumer.chunkForConsumer <- chunk: