Skip to content

Commit e485574

Browse files
authored
add BatchSend to HA Producer (#334)
1 parent 0b9344e commit e485574

File tree

2 files changed

+48
-14
lines changed

2 files changed

+48
-14
lines changed

pkg/ha/ha_publisher.go

Lines changed: 33 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,14 @@ package ha
33
import (
44
"errors"
55
"fmt"
6-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
7-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
8-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
96
"strings"
107
"sync"
118
"sync/atomic"
129
"time"
10+
11+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/logs"
12+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
13+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
1314
)
1415

1516
func (p *ReliableProducer) handlePublishConfirm(confirms stream.ChannelPublishConfirm) {
@@ -109,12 +110,13 @@ func (p *ReliableProducer) newProducer() error {
109110
return err
110111
}
111112

112-
func (p *ReliableProducer) Send(message message.StreamMessage) error {
113+
func (p *ReliableProducer) isReadyToSend() error {
113114
if p.GetStatus() == StatusStreamDoesNotExist {
114115
return stream.StreamDoesNotExist
115116
}
117+
116118
if p.GetStatus() == StatusClosed {
117-
return errors.New(fmt.Sprintf("%s is closed", p.getInfo()))
119+
return fmt.Errorf("%s is closed", p.getInfo())
118120
}
119121

120122
if p.GetStatus() == StatusReconnecting {
@@ -123,11 +125,10 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
123125
logs.LogDebug("[Reliable] %s reconnected. The send is unlocked", p.getInfo())
124126
}
125127

126-
p.mutex.Lock()
127-
defer p.mutex.Unlock()
128-
129-
errW := p.producer.Send(message)
128+
return nil
129+
}
130130

131+
func (p *ReliableProducer) checkWriteError(errW error) error {
131132
if errW != nil {
132133
switch {
133134
case errors.Is(errW, stream.FrameTooLarge):
@@ -138,10 +139,32 @@ func (p *ReliableProducer) Send(message message.StreamMessage) error {
138139
time.Sleep(500 * time.Millisecond)
139140
logs.LogError("[Reliable] %s - error during send %s", p.getInfo(), errW.Error())
140141
}
142+
}
143+
return nil
144+
}
141145

146+
func (p *ReliableProducer) Send(message message.StreamMessage) error {
147+
if err := p.isReadyToSend(); err != nil {
148+
return err
142149
}
143150

144-
return nil
151+
p.mutex.Lock()
152+
errW := p.producer.Send(message)
153+
p.mutex.Unlock()
154+
155+
return p.checkWriteError(errW)
156+
}
157+
158+
func (p *ReliableProducer) BatchSend(batchMessages []message.StreamMessage) error {
159+
if err := p.isReadyToSend(); err != nil {
160+
return err
161+
}
162+
163+
p.mutex.Lock()
164+
errW := p.producer.BatchSend(batchMessages)
165+
p.mutex.Unlock()
166+
167+
return p.checkWriteError(errW)
145168
}
146169

147170
func (p *ReliableProducer) IsOpen() bool {

pkg/ha/ha_publisher_test.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,16 @@
11
package ha
22

33
import (
4+
"sync/atomic"
5+
"time"
6+
47
"github.com/google/uuid"
58
. "github.com/onsi/ginkgo/v2"
69
. "github.com/onsi/gomega"
710
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp"
11+
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message"
812
. "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"
9-
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper"
10-
"sync/atomic"
11-
"time"
13+
test_helper "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/test-helper"
1214
)
1315

1416
var _ = Describe("Reliable Producer", func() {
@@ -44,14 +46,15 @@ var _ = Describe("Reliable Producer", func() {
4446
})
4547

4648
It("Create/Confirm and close a Reliable Producer", func() {
49+
const expectedMessages = 20
4750
signal := make(chan struct{})
4851
var confirmed int32
4952
producer, err := NewReliableProducer(envForRProducer,
5053
streamForRProducer, NewProducerOptions(), func(messageConfirm []*ConfirmationStatus) {
5154
for _, confirm := range messageConfirm {
5255
Expect(confirm.IsConfirmed()).To(BeTrue())
5356
}
54-
if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == 10 {
57+
if atomic.AddInt32(&confirmed, int32(len(messageConfirm))) == expectedMessages {
5558
signal <- struct{}{}
5659
}
5760
})
@@ -61,6 +64,14 @@ var _ = Describe("Reliable Producer", func() {
6164
err := producer.Send(msg)
6265
Expect(err).NotTo(HaveOccurred())
6366
}
67+
68+
for i := 0; i < 5; i++ {
69+
msg1 := amqp.NewMessage([]byte("ha_batch_1"))
70+
msg2 := amqp.NewMessage([]byte("ha_batch_2"))
71+
batch := []message.StreamMessage{msg1, msg2}
72+
err := producer.BatchSend(batch)
73+
Expect(err).NotTo(HaveOccurred())
74+
}
6475
<-signal
6576
Expect(producer.Close()).NotTo(HaveOccurred())
6677
})

0 commit comments

Comments
 (0)