Skip to content

Commit b12d6f4

Browse files
committed
add test for batching messages with headers
1 parent fcf85ac commit b12d6f4

File tree

1 file changed

+66
-0
lines changed

1 file changed

+66
-0
lines changed

writer_test.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import (
77
"io"
88
"math"
99
"strconv"
10+
"strings"
1011
"sync"
1112
"testing"
1213
"time"
@@ -134,6 +135,10 @@ func TestWriter(t *testing.T) {
134135
scenario: "writing messages with a small batch byte size",
135136
function: testWriterSmallBatchBytes,
136137
},
138+
{
139+
scenario: "writing messages with headers",
140+
function: testWriterBatchBytesHeaders,
141+
},
137142
{
138143
scenario: "setting a non default balancer on the writer",
139144
function: testWriterSetsRightBalancer,
@@ -592,6 +597,67 @@ func testWriterSmallBatchBytes(t *testing.T) {
592597
}
593598
}
594599

600+
func testWriterBatchBytesHeaders(t *testing.T) {
601+
topic := makeTopic()
602+
createTopic(t, topic, 1)
603+
defer deleteTopic(t, topic)
604+
605+
offset, err := readOffset(topic, 0)
606+
if err != nil {
607+
t.Fatal(err)
608+
}
609+
610+
w := newTestWriter(WriterConfig{
611+
Topic: topic,
612+
BatchBytes: 100,
613+
BatchTimeout: 50 * time.Millisecond,
614+
Balancer: &RoundRobin{},
615+
})
616+
defer w.Close()
617+
618+
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
619+
defer cancel()
620+
if err := w.WriteMessages(ctx, []Message{
621+
{
622+
Value: []byte("Hello World 1"),
623+
Headers: []Header{
624+
{Key: "User-Agent", Value: []byte("abc/xyz")},
625+
},
626+
},
627+
{
628+
Value: []byte("Hello World 2"),
629+
Headers: []Header{
630+
{Key: "User-Agent", Value: []byte("abc/xyz")},
631+
},
632+
},
633+
}...); err != nil {
634+
t.Error(err)
635+
return
636+
}
637+
ws := w.Stats()
638+
if ws.Writes != 2 {
639+
t.Error("didn't batch messages; Writes: ", ws.Writes)
640+
return
641+
}
642+
msgs, err := readPartition(topic, 0, offset)
643+
if err != nil {
644+
t.Error("error reading partition", err)
645+
return
646+
}
647+
648+
if len(msgs) != 2 {
649+
t.Error("bad messages in partition", msgs)
650+
return
651+
}
652+
653+
for _, m := range msgs {
654+
if strings.HasPrefix(string(m.Value), "Hello World") {
655+
continue
656+
}
657+
t.Error("bad messages in partition", msgs)
658+
}
659+
}
660+
595661
func testWriterMultipleTopics(t *testing.T) {
596662
topic1 := makeTopic()
597663
createTopic(t, topic1, 1)

0 commit comments

Comments
 (0)