Skip to content

Commit c5ad6ff

Browse files
committed
improve the message log
Signed-off-by: Gabriele Santomaggio <g.santomaggio@gmail.com>
1 parent a2fd590 commit c5ad6ff

File tree

1 file changed

+13
-2
lines changed

1 file changed

+13
-2
lines changed

pkg/stream/server_frame.go

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -415,7 +415,8 @@ func (c *Client) handleDeliver(r *bufio.Reader) {
415415
if consumer.getStatus() == open {
416416
consumer.response.chunkForConsumer <- chunk
417417
} else {
418-
logs.LogWarn("Consumer %s is closed", consumer.GetStreamName())
418+
logs.LogDebug("The consumer %s for the stream %s is closed during the chunk dispatching. "+
419+
"Messages won't dispatched", consumer.GetName(), consumer.GetStreamName())
419420
}
420421

421422
}
@@ -439,7 +440,17 @@ func (c *Client) creditNotificationFrameHandler(readProtocol *ReaderProtocol,
439440
r *bufio.Reader) {
440441
readProtocol.ResponseCode = uShortExtractResponseCode(readUShort(r))
441442
subscriptionId := readByte(r)
442-
logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId)
443+
consumer, err := c.coordinator.GetConsumerById(subscriptionId)
444+
if err != nil {
445+
logs.LogWarn("received a credit for an unknown subscriptionId: %d", subscriptionId)
446+
return
447+
}
448+
449+
if consumer != nil && consumer.getStatus() == closed {
450+
logs.LogDebug("received a credit for a closed consumer %d", subscriptionId)
451+
return
452+
}
453+
443454
}
444455

445456
func (c *Client) queryOffsetFrameHandler(readProtocol *ReaderProtocol,

0 commit comments

Comments
 (0)