Skip to content

Commit a0d8b72

Browse files
committed
feat: add publish confirmation function
1 parent 560baf3 commit a0d8b72

File tree

2 files changed

+45
-3
lines changed

2 files changed

+45
-3
lines changed

pkg/rabbitmq/producer.go

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ type producerOptions struct {
2323
// If true, the message will be returned to the sender if the queue cannot be
2424
// found according to its own exchange type and routeKey rules.
2525
mandatory bool
26+
27+
// only for publish-subscribe mode
28+
isPublisherConfirm bool
2629
}
2730

2831
func (o *producerOptions) apply(opts ...ProducerOption) {
@@ -39,8 +42,9 @@ func defaultProducerOptions() *producerOptions {
3942
queueBind: defaultQueueBindOptions(),
4043
deadLetter: defaultDeadLetterOptions(),
4144

42-
isPersistent: true,
43-
mandatory: true,
45+
isPersistent: true,
46+
mandatory: true,
47+
isPublisherConfirm: false,
4448
}
4549
}
4650

@@ -86,6 +90,13 @@ func WithProducerMandatory(enable bool) ProducerOption {
8690
}
8791
}
8892

93+
// WithPublisherConfirm enables publisher confirm.
94+
func WithPublisherConfirm() ProducerOption {
95+
return func(o *producerOptions) {
96+
o.isPublisherConfirm = true
97+
}
98+
}
99+
89100
// -------------------------------------------------------------------------------------------
90101

91102
// Producer session
@@ -108,6 +119,9 @@ type Producer struct {
108119
exchangeArgs amqp.Table
109120
queueArgs amqp.Table
110121
queueBindArgs amqp.Table
122+
123+
// only for publish-subscribe mode
124+
isPublisherConfirm bool
111125
}
112126

113127
// NewProducer create a producer

pkg/rabbitmq/publisher.go

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package rabbitmq
22

33
import (
44
"context"
5+
"fmt"
56

67
amqp "github.com/rabbitmq/amqp091-go"
78
"go.uber.org/zap"
@@ -25,6 +26,15 @@ func NewPublisher(channelName string, connection *Connection, opts ...ProducerOp
2526
return nil, err
2627
}
2728

29+
// enable publisher confirm
30+
if o.isPublisherConfirm {
31+
err = ch.Confirm(false)
32+
if err != nil {
33+
_ = ch.Close()
34+
return nil, err
35+
}
36+
}
37+
2838
// declare the exchange type
2939
err = ch.ExchangeDeclare(
3040
channelName,
@@ -61,7 +71,7 @@ func NewPublisher(channelName string, connection *Connection, opts ...ProducerOp
6171
}
6272

6373
func (p *Publisher) Publish(ctx context.Context, body []byte) error {
64-
return p.ch.PublishWithContext(
74+
err := p.ch.PublishWithContext(
6575
ctx,
6676
p.Exchange.name,
6777
p.Exchange.routingKey,
@@ -73,6 +83,24 @@ func (p *Publisher) Publish(ctx context.Context, body []byte) error {
7383
Body: body,
7484
},
7585
)
86+
if err != nil {
87+
return err
88+
}
89+
90+
if p.isPublisherConfirm {
91+
// wait for publisher confirm
92+
select {
93+
case <-ctx.Done():
94+
return ctx.Err()
95+
case confirm := <-p.ch.NotifyPublish(make(chan amqp.Confirmation, 1)):
96+
if !confirm.Ack {
97+
return fmt.Errorf("publisher confirm failed, exchangeName: %s, routingKey: %s, deliveryTag: %d",
98+
p.Exchange.name, p.Exchange.routingKey, confirm.DeliveryTag)
99+
}
100+
}
101+
}
102+
103+
return nil
76104
}
77105

78106
// Close publisher

0 commit comments

Comments
 (0)