@@ -8,11 +8,57 @@ import (
8
8
amqp "github.com/rabbitmq/amqp091-go"
9
9
)
10
10
11
+ // Message represents a message received by a consumer from a RabbitMQ queue.
12
+ // It includes the message body and methods for acknowledging, negatively acknowledging,
13
+ // or rejecting the message, as well as metadata about the message's origin.
14
+ type Message struct {
15
+ // Body is the content of the message received from the queue.
16
+ // It contains the actual data sent by the producer.
17
+ Body []byte
18
+
19
+ // Ack is a function that, when called, acknowledges the message.
20
+ // It informs RabbitMQ that the message has been successfully processed.
21
+ Ack func ()
22
+
23
+ // Nack is a function that, when called, negatively acknowledges the message.
24
+ // It informs RabbitMQ that the message was not processed successfully, and depending on
25
+ // the RabbitMQ configuration, it may be re-delivered or discarded.
26
+ Nack func ()
27
+
28
+ // Reject is a function that, when called, rejects the message.
29
+ // It informs RabbitMQ that the message was not processed and does not want it to be re-delivered.
30
+ Reject func ()
31
+
32
+ // QueueName is the name of the RabbitMQ queue from which the message was consumed.
33
+ // It helps to identify the source of the message.
34
+ QueueName string
35
+
36
+ // ConsumerID is the unique identifier of the consumer that received the message.
37
+ // It can be used for logging or to distinguish between multiple consumers.
38
+ ConsumerID string
39
+ }
40
+
41
+ // Consumer represents an AMQP consumer that consumes messages from a RabbitMQ queue.
42
+ // It manages the connection, channel, and queue configurations for the consumer.
11
43
type Consumer struct {
12
- conn * Connection
13
- channel * amqp.Channel
14
- queueName string
15
- name string
44
+ // conn is the underlying AMQP connection used by the consumer to communicate with RabbitMQ.
45
+ // It allows for opening channels and managing message consumption.
46
+ conn * Connection
47
+
48
+ // channel is the AMQP channel through which the consumer will consume messages from the queue.
49
+ // A channel is used to send and receive messages between the consumer and the broker.
50
+ channel * amqp.Channel
51
+
52
+ // queueName is the name of the RabbitMQ queue from which this consumer will consume messages.
53
+ // It helps the consumer identify which queue to connect to.
54
+ queueName string
55
+
56
+ // name is the unique name of this consumer, used for identification purposes.
57
+ // It is often used for logging or distinguishing between multiple consumers.
58
+ name string
59
+
60
+ // queueConfig holds the configuration options for the queue, such as whether the queue is durable,
61
+ // auto-delete, and other properties that affect its behavior.
16
62
queueConfig QueueOptions
17
63
}
18
64
@@ -48,41 +94,40 @@ func NewConsumer(conn *Connection, name, queue string, options QueueOptions) (*C
48
94
}, nil
49
95
}
50
96
51
- // Start begins the consumer's main loop
52
- func (c * Consumer ) Start (ctx context.Context , fn func (* Connection , string , string , * <- chan amqp.Delivery )) {
53
- var err error
54
-
97
+ func (c * Consumer ) Start (ctx context.Context , handler func (ctx context.Context , msg * Message )) {
55
98
for {
56
- // Ensure connection and channel are valid before starting consumption
57
- err = c .checkConnectionAndChannel ()
58
- if err != nil {
99
+ if err := c .checkConnectionAndChannel (); err != nil {
59
100
time .Sleep (c .conn .config .RetryDuration )
60
101
continue
61
102
}
62
103
63
- // Register the consumer and handle any errors
64
- msgs , err := c .channel .ConsumeWithContext (
104
+ deliveries , err := c .channel .ConsumeWithContext (
65
105
ctx ,
66
- c .queueName , // queue
67
- c .name , // consumer
68
- false , // auto-ack
69
- false , // exclusive
70
- false , // no-local
71
- false , // no-wait
72
- nil , // args
106
+ c .queueName ,
107
+ c .name ,
108
+ c . queueConfig . AutoAck ,
109
+ c . queueConfig . Exclusive ,
110
+ c . queueConfig . NoLocal ,
111
+ c . queueConfig . NoWait ,
112
+ c . queueConfig . Args ,
73
113
)
74
114
if err != nil {
75
- log .Printf ("[%s] Failed to register a consumer : %v. Retrying in 5 seconds ." , c .name , err )
115
+ log .Printf ("[%s] Failed to start consuming : %v. Retrying.. ." , c .name , err )
76
116
time .Sleep (c .conn .config .RetryDuration )
77
117
continue
78
118
}
79
119
80
- // Execute the user-defined function
81
- fn (c .conn , c .name , c .queueName , & msgs )
120
+ for d := range deliveries {
121
+ msg := & Message {
122
+ Body : d .Body ,
123
+ Ack : func () { d .Ack (false ) },
124
+ Nack : func () { d .Nack (false , true ) },
125
+ Reject : func () { d .Reject (false ) },
126
+ QueueName : c .queueName ,
127
+ ConsumerID : c .name ,
128
+ }
82
129
83
- // Close the channel after consuming messages (if necessary)
84
- if c .channel != nil {
85
- c .channel .Close ()
130
+ handler (ctx , msg )
86
131
}
87
132
}
88
133
}
0 commit comments