Skip to content

Commit 16e5c35

Browse files
committed
ensure message headers are including for batching decision
1 parent b12d6f4 commit 16e5c35

File tree

2 files changed

+12
-1
lines changed

2 files changed

+12
-1
lines changed

message.go

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,17 @@ func (msg *Message) size() int32 {
4949
return 4 + 1 + 1 + sizeofBytes(msg.Key) + sizeofBytes(msg.Value) + timestampSize
5050
}
5151

52+
func (msg *Message) headerSize() int {
53+
return varArrayLen(len(msg.Headers), func(i int) int {
54+
h := &msg.Headers[i]
55+
return varStringLen(h.Key) + varBytesLen(h.Value)
56+
})
57+
}
58+
59+
func (msg *Message) totalSize() int32 {
60+
return int32(msg.headerSize()) + msg.size()
61+
}
62+
5263
type message struct {
5364
CRC int32
5465
MagicByte int8

writer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1216,7 +1216,7 @@ func newWriteBatch(now time.Time, timeout time.Duration) *writeBatch {
12161216
}
12171217

12181218
func (b *writeBatch) add(msg Message, maxSize int, maxBytes int64) bool {
1219-
bytes := int64(msg.size())
1219+
bytes := int64(msg.totalSize())
12201220

12211221
if b.size > 0 && (b.bytes+bytes) > maxBytes {
12221222
return false

0 commit comments

Comments
 (0)