Skip to content

Commit 300214b

Browse files
authored
Merge pull request #1 from afaf-tech/fix/consumer-option
fix: queue option and godoc
2 parents c4a057d + 4612783 commit 300214b

File tree

4 files changed

+178
-52
lines changed

4 files changed

+178
-52
lines changed

README.md

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ Explores the hypothesis of using a single connection for many channels, both for
1010

1111
## Requirements
1212
- go 1.21
13-
- docker
1413

1514
### Installation
1615

consumer.go

Lines changed: 71 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,57 @@ import (
88
amqp "github.com/rabbitmq/amqp091-go"
99
)
1010

11+
// Message represents a message received by a consumer from a RabbitMQ queue.
12+
// It includes the message body and methods for acknowledging, negatively acknowledging,
13+
// or rejecting the message, as well as metadata about the message's origin.
14+
type Message struct {
15+
// Body is the content of the message received from the queue.
16+
// It contains the actual data sent by the producer.
17+
Body []byte
18+
19+
// Ack is a function that, when called, acknowledges the message.
20+
// It informs RabbitMQ that the message has been successfully processed.
21+
Ack func()
22+
23+
// Nack is a function that, when called, negatively acknowledges the message.
24+
// It informs RabbitMQ that the message was not processed successfully, and depending on
25+
// the RabbitMQ configuration, it may be re-delivered or discarded.
26+
Nack func()
27+
28+
// Reject is a function that, when called, rejects the message.
29+
// It informs RabbitMQ that the message was not processed and does not want it to be re-delivered.
30+
Reject func()
31+
32+
// QueueName is the name of the RabbitMQ queue from which the message was consumed.
33+
// It helps to identify the source of the message.
34+
QueueName string
35+
36+
// ConsumerID is the unique identifier of the consumer that received the message.
37+
// It can be used for logging or to distinguish between multiple consumers.
38+
ConsumerID string
39+
}
40+
41+
// Consumer represents an AMQP consumer that consumes messages from a RabbitMQ queue.
42+
// It manages the connection, channel, and queue configurations for the consumer.
1143
type Consumer struct {
12-
conn *Connection
13-
channel *amqp.Channel
14-
queueName string
15-
name string
44+
// conn is the underlying AMQP connection used by the consumer to communicate with RabbitMQ.
45+
// It allows for opening channels and managing message consumption.
46+
conn *Connection
47+
48+
// channel is the AMQP channel through which the consumer will consume messages from the queue.
49+
// A channel is used to send and receive messages between the consumer and the broker.
50+
channel *amqp.Channel
51+
52+
// queueName is the name of the RabbitMQ queue from which this consumer will consume messages.
53+
// It helps the consumer identify which queue to connect to.
54+
queueName string
55+
56+
// name is the unique name of this consumer, used for identification purposes.
57+
// It is often used for logging or distinguishing between multiple consumers.
58+
name string
59+
60+
// queueConfig holds the configuration options for the queue, such as whether the queue is durable,
61+
// auto-delete, and other properties that affect its behavior.
1662
queueConfig QueueOptions
1763
}
1864

@@ -48,41 +94,40 @@ func NewConsumer(conn *Connection, name, queue string, options QueueOptions) (*C
4894
}, nil
4995
}
5096

51-
// Start begins the consumer's main loop
52-
func (c *Consumer) Start(ctx context.Context, fn func(*Connection, string, string, *<-chan amqp.Delivery)) {
53-
var err error
54-
97+
func (c *Consumer) Start(ctx context.Context, handler func(ctx context.Context, msg *Message)) {
5598
for {
56-
// Ensure connection and channel are valid before starting consumption
57-
err = c.checkConnectionAndChannel()
58-
if err != nil {
99+
if err := c.checkConnectionAndChannel(); err != nil {
59100
time.Sleep(c.conn.config.RetryDuration)
60101
continue
61102
}
62103

63-
// Register the consumer and handle any errors
64-
msgs, err := c.channel.ConsumeWithContext(
104+
deliveries, err := c.channel.ConsumeWithContext(
65105
ctx,
66-
c.queueName, // queue
67-
c.name, // consumer
68-
false, // auto-ack
69-
false, // exclusive
70-
false, // no-local
71-
false, // no-wait
72-
nil, // args
106+
c.queueName,
107+
c.name,
108+
c.queueConfig.AutoAck,
109+
c.queueConfig.Exclusive,
110+
c.queueConfig.NoLocal,
111+
c.queueConfig.NoWait,
112+
c.queueConfig.Args,
73113
)
74114
if err != nil {
75-
log.Printf("[%s] Failed to register a consumer: %v. Retrying in 5 seconds.", c.name, err)
115+
log.Printf("[%s] Failed to start consuming: %v. Retrying...", c.name, err)
76116
time.Sleep(c.conn.config.RetryDuration)
77117
continue
78118
}
79119

80-
// Execute the user-defined function
81-
fn(c.conn, c.name, c.queueName, &msgs)
120+
for d := range deliveries {
121+
msg := &Message{
122+
Body: d.Body,
123+
Ack: func() { d.Ack(false) },
124+
Nack: func() { d.Nack(false, true) },
125+
Reject: func() { d.Reject(false) },
126+
QueueName: c.queueName,
127+
ConsumerID: c.name,
128+
}
82129

83-
// Close the channel after consuming messages (if necessary)
84-
if c.channel != nil {
85-
c.channel.Close()
130+
handler(ctx, msg)
86131
}
87132
}
88133
}

producer.go

Lines changed: 43 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,23 @@ import (
88
amqp "github.com/rabbitmq/amqp091-go"
99
)
1010

11+
// Producer represents an AMQP producer that sends messages to a specified RabbitMQ queue.
1112
type Producer struct {
12-
conn *Connection
13-
channel *amqp.Channel
13+
// conn is the underlying AMQP connection that this producer uses to communicate with the RabbitMQ broker.
14+
// It provides the connection functionality for creating channels and sending messages.
15+
conn *Connection
16+
17+
// channel is the AMQP channel used by this producer to publish messages to RabbitMQ.
18+
// A channel is a lightweight connection to the broker and is used to send and receive messages.
19+
channel *amqp.Channel
20+
21+
// queueName is the name of the RabbitMQ queue to which this producer sends messages.
22+
// It identifies the target queue for message delivery.
1423
queueName string
15-
name string
24+
25+
// name is the unique name of this producer, often used for logging or identifying the producer.
26+
// It helps distinguish multiple producers in the system.
27+
name string
1628
}
1729

1830
// NewProducer initializes a new Producer instance and declares the queue with the specified options
@@ -40,14 +52,35 @@ func NewProducer(conn *Connection, name, queue string) (*Producer, error) {
4052
}, nil
4153
}
4254

55+
// PublishOptions represents the configuration options for publishing a message to RabbitMQ.
4356
type PublishOptions struct {
44-
Exchange string // The exchange to publish the message to
45-
RoutingKey string // The routing key to use for the message
46-
ContentType string // The content type of the message (e.g., "text/plain")
47-
Body []byte // The actual message body
48-
Mandatory bool // Whether the message is mandatory
49-
Immediate bool // Whether the message is immediate
50-
Headers amqp.Table // Optional headers for the message
57+
// Exchange is the name of the exchange to which the message will be published.
58+
// The exchange determines how the message will be routed to queues.
59+
Exchange string
60+
61+
// RoutingKey is the routing key used by the exchange to decide where to route the message.
62+
// The value depends on the type of exchange (e.g., direct, topic).
63+
RoutingKey string
64+
65+
// ContentType specifies the content type of the message. It helps the consumer interpret the message body.
66+
// For example, "text/plain" or "application/json".
67+
ContentType string
68+
69+
// Body is the actual message content, represented as a byte slice.
70+
// This is the payload of the message being sent to the RabbitMQ exchange.
71+
Body []byte
72+
73+
// Mandatory indicates whether the message is mandatory.
74+
// If true, RabbitMQ will return the message to the producer if it cannot be routed to a queue.
75+
Mandatory bool
76+
77+
// Immediate indicates whether the message is immediate.
78+
// If true, RabbitMQ will try to deliver the message to a consumer immediately, if possible.
79+
Immediate bool
80+
81+
// Headers is an optional map of headers that can be included with the message.
82+
// These headers can carry metadata or additional information to help the consumer process the message.
83+
Headers amqp.Table
5184
}
5285

5386
// Publish sends a message to the queue with retry logic and context support

rabbitmq.go

Lines changed: 64 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,35 @@ import (
99
amqp "github.com/rabbitmq/amqp091-go"
1010
)
1111

12-
// QueueOptions defines RabbitMQ queue configurations
12+
// QueueOptions represents the configuration options for declaring a queue in RabbitMQ.
1313
type QueueOptions struct {
14-
Durable bool
14+
// Durable specifies whether the queue should survive broker restarts. If true, the queue will
15+
// be durable, meaning it will persist even if RabbitMQ crashes or restarts.
16+
Durable bool
17+
18+
// AutoAck enables or disables automatic message acknowledgment. If true, messages are automatically
19+
// acknowledged by the broker once they are received by the consumer.
20+
AutoAck bool
21+
22+
// AutoDelete determines whether the queue should be automatically deleted when no consumers are
23+
// connected. If true, the queue will be deleted when the last consumer disconnects.
1524
AutoDelete bool
16-
Exclusive bool
17-
NoWait bool
18-
Args amqp.Table
25+
26+
// Exclusive makes the queue private to the connection that created it. If true, the queue can only
27+
// be used by the connection that declared it and will be deleted once that connection closes.
28+
Exclusive bool
29+
30+
// NoWait prevents the server from sending a response to the queue declaration. If true, the declaration
31+
// will not wait for an acknowledgment from the server and no error will be returned if the queue already exists.
32+
NoWait bool
33+
34+
// NoLocal prevents the delivery of messages to the connection that published them. If true, messages
35+
// will not be delivered to the connection that created the queue.
36+
NoLocal bool
37+
38+
// Args allows additional arguments to be passed when declaring the queue. This can be used for advanced
39+
// RabbitMQ configurations, such as setting arguments for policies or defining queue TTLs (Time-To-Live).
40+
Args amqp.Table
1941
}
2042

2143
const (
@@ -24,21 +46,48 @@ const (
2446
defaultMaxChannels = 10 // Default maximum number of channels
2547
)
2648

27-
// Config holds the configuration for connecting to RabbitMQ
49+
// Config holds the configuration options for establishing a RabbitMQ connection and managing
50+
// consumer channels.
2851
type Config struct {
29-
URI string // URI for RabbitMQ connection (e.g., "amqp://guest:guest@localhost:5672/")
30-
RetryDuration time.Duration // Time to wait before retrying a failed connection attempt
31-
AMQPConfig *amqp.Config // AMQP-specific configuration, can be nil to use defaults
32-
MaxChannels int // Maximum number of channels allowed (default is 10)
52+
// URI is the connection string for RabbitMQ. It should follow the AMQP URI format, e.g.,
53+
// "amqp://guest:guest@localhost:5672/". It is used to establish the connection to the RabbitMQ broker.
54+
URI string
55+
56+
// RetryDuration specifies the time duration to wait before retrying a failed connection attempt.
57+
// This is useful to implement a backoff strategy in case of temporary network issues.
58+
RetryDuration time.Duration
59+
60+
// AMQPConfig holds AMQP-specific configuration options. If nil, default AMQP configurations
61+
// are used. This can include settings like heartbeat intervals and other advanced AMQP features.
62+
AMQPConfig *amqp.Config
63+
64+
// MaxChannels defines the maximum number of channels that can be opened to the RabbitMQ server.
65+
// If the value is 0 or negative, the default is used (which may be 10 channels).
66+
MaxChannels int
3367
}
3468

35-
// Connection wraps the actual AMQP connection and provides reconnection logic
69+
// Connection wraps the actual AMQP connection and provides reconnection logic, ensuring
70+
// that the connection and channels remain active or are re-established when necessary.
3671
type Connection struct {
72+
// Connection is the underlying AMQP connection provided by the AMQP client library.
73+
// It provides the basic connection functionalities such as opening channels, closing the connection, etc.
3774
*amqp.Connection
38-
config *Config // Custom configuration for the connection
39-
mu sync.RWMutex // Mutex to synchronize access to the connection
40-
reconnecting bool // Flag to indicate ongoing reconnection
41-
channels []*amqp.Channel // Slice to hold active channels
75+
76+
// config holds the custom configuration settings for the RabbitMQ connection, such as URI, retry duration, etc.
77+
// These settings are used for connection management and reconnection attempts.
78+
config *Config
79+
80+
// mu is a read-write mutex used to synchronize access to the connection and channels,
81+
// ensuring thread-safe operations for shared resources.
82+
mu sync.RWMutex
83+
84+
// reconnecting is a flag indicating whether a reconnection attempt is currently in progress.
85+
// This is used to prevent multiple concurrent reconnection attempts.
86+
reconnecting bool
87+
88+
// channels holds a slice of active channels associated with the current connection.
89+
// Channels are used to send and receive messages within a specific connection context.
90+
channels []*amqp.Channel
4291
}
4392

4493
// NewConnection creates a new Connection object and initializes it with the provided configuration

0 commit comments

Comments
 (0)