diff --git a/pkg/stream/server_frame.go b/pkg/stream/server_frame.go index ff19ff08..67a76c35 100644 --- a/pkg/stream/server_frame.go +++ b/pkg/stream/server_frame.go @@ -415,7 +415,8 @@ func (c *Client) handleDeliver(r *bufio.Reader) { if consumer.getStatus() == open { consumer.response.chunkForConsumer <- chunk } else { - logs.LogWarn("Consumer %s is closed", consumer.GetStreamName()) + logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+ + "Messages won't dispatched", consumer.GetName(), consumer.GetStreamName()) } } @@ -439,7 +440,17 @@ func (c *Client) creditNotificationFrameHandler(readProtocol *ReaderProtocol, r *bufio.Reader) { readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r)) subscriptionId := readByte(r) - logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId) + consumer, err := c.coordinator.GetConsumerById(subscriptionId) + if err != nil { + logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId) + return + } + + if consumer != nil && consumer.getStatus() == closed { + logs.LogDebug("received a credit for a closed consumer %d", subscriptionId) + return + } + } func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,