Skip to content

Commit da9498b

Browse files
author
afaf
committed
version 0.1.0
0 parents  commit da9498b

File tree

8 files changed

+517
-0
lines changed

8 files changed

+517
-0
lines changed

.gitignore

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
# If you prefer the allow list template instead of the deny list, see community template:
2+
# https://github.com/github/gitignore/blob/main/community/Golang/Go.AllowList.gitignore
3+
#
4+
# Binaries for programs and plugins
5+
*.exe
6+
*.exe~
7+
*.dll
8+
*.so
9+
*.dylib
10+
11+
# Test binary, built with `go test -c`
12+
*.test
13+
14+
# Output of the go coverage tool, specifically when used with LiteIDE
15+
*.out
16+
17+
# Dependency directories (remove the comment below to include it)
18+
# vendor/
19+
20+
# Go workspace file
21+
# go.work

LICENSE.md

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
MIT License
2+
3+
Copyright (c) 2021 Lane Wagner
4+
5+
Permission is hereby granted, free of charge, to any person obtaining a copy
6+
of this software and associated documentation files (the "Software"), to deal
7+
in the Software without restriction, including without limitation the rights
8+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9+
copies of the Software, and to permit persons to whom the Software is
10+
furnished to do so, subject to the following conditions:
11+
12+
The above copyright notice and this permission notice shall be included in all
13+
copies or substantial portions of the Software.
14+
15+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21+
SOFTWARE.

README.md

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
# Go RabbitMQ Auto-Reconnect Project
2+
3+
This project demonstrates a simple implementation of a RabbitMQ consumer and producer in Go with automatic reconnection capabilities.
4+
5+
Explores the hypothesis of using a single connection for many channels, both for producers and consumers, in a RabbitMQ setup. This architectural choice aims to optimize resource usage and improve performance in scenarios with a large number of channels.
6+
7+
8+
## Requirements
9+
- go 1.21
10+
- docker
11+
12+
### Installation
13+
14+
1. use the module:
15+
```bash
16+
go get github.com/afaf-tech/go-rabbitmq
17+
```
18+
#### Consumer
19+
```bash
20+
# How to use the consumer
21+
```
22+
23+
24+
#### Producer
25+
```bash
26+
# How to use the producer
27+
```
28+
29+
30+
## Backlog
31+
- Proper Logging
32+
- Producer function
33+
34+
35+
## License
36+
This project is licensed under the MIT License - see the LICENSE.md file for details.

consumer.go

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
amqp "github.com/rabbitmq/amqp091-go"
9+
)
10+
11+
type Consumer struct {
12+
conn *Connection
13+
channel *amqp.Channel
14+
queueName string
15+
name string
16+
queueConfig QueueOptions
17+
}
18+
19+
// NewConsumer initializes a new Consumer instance and declares the queue with the specified options
20+
func NewConsumer(conn *Connection, name, queue string, options QueueOptions) (*Consumer, error) {
21+
ch, err := conn.OpenChannel()
22+
if err != nil {
23+
log.Printf("[%s] Failed to open a channel", name)
24+
return nil, err
25+
}
26+
27+
// Declare the queue
28+
_, err = ch.QueueDeclare(
29+
queue,
30+
options.Durable,
31+
options.AutoDelete,
32+
options.Exclusive,
33+
options.NoWait,
34+
options.Args,
35+
)
36+
if err != nil {
37+
log.Printf("[%s] Failed to declare queue: %v", name, err)
38+
defer ch.Close()
39+
return nil, err
40+
}
41+
42+
return &Consumer{
43+
conn: conn,
44+
channel: ch,
45+
queueName: queue,
46+
name: name,
47+
queueConfig: options,
48+
}, nil
49+
}
50+
51+
// Start begins the consumer's main loop
52+
func (c *Consumer) Start(ctx context.Context, fn func(*Connection, string, string, *<-chan amqp.Delivery)) {
53+
var err error
54+
55+
for {
56+
// Ensure connection and channel are valid before starting consumption
57+
err = c.checkConnectionAndChannel()
58+
if err != nil {
59+
time.Sleep(c.conn.config.RetryDuration)
60+
continue
61+
}
62+
63+
// Register the consumer and handle any errors
64+
msgs, err := c.channel.ConsumeWithContext(
65+
ctx,
66+
c.queueName, // queue
67+
c.name, // consumer
68+
false, // auto-ack
69+
false, // exclusive
70+
false, // no-local
71+
false, // no-wait
72+
nil, // args
73+
)
74+
if err != nil {
75+
log.Printf("[%s] Failed to register a consumer: %v. Retrying in 5 seconds.", c.name, err)
76+
time.Sleep(c.conn.config.RetryDuration)
77+
continue
78+
}
79+
80+
// Execute the user-defined function
81+
fn(c.conn, c.name, c.queueName, &msgs)
82+
83+
// Close the channel after consuming messages (if necessary)
84+
if c.channel != nil {
85+
c.channel.Close()
86+
}
87+
}
88+
}
89+
90+
// checkConnectionAndChannel ensures that both the connection and channel are open
91+
func (c *Consumer) checkConnectionAndChannel() error {
92+
var err error
93+
// Check if the connection is closed
94+
if c.conn.IsClosed() {
95+
log.Printf("[%s] Connection is closed. Attempting to reconnect...", c.name)
96+
err = c.conn.Reconnect()
97+
if err != nil {
98+
log.Printf("[%s] Failed to reconnect: %v. Retrying in 5 seconds.", c.name, err)
99+
return err
100+
}
101+
log.Printf("[%s] Reconnected to RabbitMQ", c.name)
102+
}
103+
104+
// Check if the channel is closed or nil
105+
if c.channel == nil || c.channel.IsClosed() {
106+
log.Printf("[%s] Channel is nil or closed. Trying to reopen...", c.name)
107+
c.channel, err = c.conn.OpenChannel()
108+
if err != nil {
109+
log.Printf("[%s] Failed to reopen channel: %v. Retrying in 5 seconds.", c.name, err)
110+
return err
111+
}
112+
log.Printf("[%s] Channel reopened", c.name)
113+
}
114+
115+
return nil
116+
}

go.mod

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
module github.com/afaf-tech/go-rabbitmq
2+
3+
go 1.22.1
4+
5+
require github.com/rabbitmq/amqp091-go v1.10.0

go.sum

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw=
2+
github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o=
3+
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
4+
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=

producer.go

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"log"
6+
"time"
7+
8+
amqp "github.com/rabbitmq/amqp091-go"
9+
)
10+
11+
type Producer struct {
12+
conn *Connection
13+
channel *amqp.Channel
14+
queueName string
15+
name string
16+
}
17+
18+
// NewProducer initializes a new Producer instance and declares the queue with the specified options
19+
func NewProducer(conn *Connection, name, queue string) (*Producer, error) {
20+
// Ensure the connection is established
21+
if conn.IsClosed() {
22+
err := conn.Connect()
23+
if err != nil {
24+
log.Printf("[%s] Failed to connect to RabbitMQ", name)
25+
return nil, err
26+
}
27+
}
28+
29+
ch, err := conn.OpenChannel()
30+
if err != nil {
31+
log.Printf("[%s] Failed to open a channel", name)
32+
return nil, err
33+
}
34+
35+
return &Producer{
36+
conn: conn,
37+
channel: ch,
38+
queueName: queue,
39+
name: name,
40+
}, nil
41+
}
42+
43+
type PublishOptions struct {
44+
Exchange string // The exchange to publish the message to
45+
RoutingKey string // The routing key to use for the message
46+
ContentType string // The content type of the message (e.g., "text/plain")
47+
Body []byte // The actual message body
48+
Mandatory bool // Whether the message is mandatory
49+
Immediate bool // Whether the message is immediate
50+
Headers amqp.Table // Optional headers for the message
51+
}
52+
53+
// Publish sends a message to the queue with retry logic and context support
54+
func (p *Producer) Publish(ctx context.Context, message []byte, options PublishOptions) error {
55+
if ctx == nil {
56+
// Default to a background context with a timeout
57+
defaultCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
58+
defer cancel()
59+
ctx = defaultCtx
60+
}
61+
62+
retryDuration := p.conn.config.RetryDuration
63+
64+
// Retry indefinitely until the message is successfully published
65+
for {
66+
// Check if connection or channel is closed, attempt to reconnect if necessary
67+
err := p.checkConnection()
68+
if err != nil {
69+
time.Sleep(retryDuration)
70+
continue
71+
}
72+
73+
// Attempt to publish the message
74+
err = p.channel.PublishWithContext(
75+
ctx,
76+
options.Exchange, // Use the exchange from the options
77+
options.RoutingKey, // Use the routing key from the options
78+
options.Mandatory, // Use the mandatory flag from the options
79+
options.Immediate, // Use the immediate flag from the options
80+
amqp.Publishing{
81+
ContentType: options.ContentType, // Use the content type from the options
82+
Body: options.Body, // Use the message body from the options
83+
Headers: options.Headers, // Use the headers from the options (if any)
84+
},
85+
)
86+
87+
if err == nil {
88+
return nil
89+
}
90+
91+
log.Printf("[%s] Failed to publish message: %v. Retrying in %v...", p.name, err, retryDuration)
92+
time.Sleep(retryDuration)
93+
}
94+
}
95+
96+
// checkConnection checks if the connection and channel are valid and reconnects if necessary
97+
func (p *Producer) checkConnection() error {
98+
// If the connection is closed, try to reconnect
99+
if p.conn.IsClosed() {
100+
log.Printf("[%s] Connection is closed. Attempting to reconnect...", p.name)
101+
err := p.conn.Reconnect()
102+
if err != nil {
103+
log.Printf("[%s] Failed to reconnect: %v. Retrying...", p.name, err)
104+
return err
105+
}
106+
log.Printf("[%s] Reconnected to RabbitMQ", p.name)
107+
}
108+
109+
// If the channel is closed or nil, try to open a new one
110+
if p.channel == nil || p.channel.IsClosed() {
111+
log.Printf("[%s] Channel is closed or nil. Reopening channel...", p.name)
112+
ch, err := p.conn.OpenChannel()
113+
if err != nil {
114+
log.Printf("[%s] Failed to open channel: %v. Retrying...", p.name, err)
115+
return err
116+
}
117+
p.channel = ch
118+
log.Printf("[%s] Channel reopened", p.name)
119+
}
120+
121+
return nil
122+
}
123+
124+
// Close gracefully shuts down the producer
125+
func (p *Producer) Close() {
126+
if p.channel != nil {
127+
p.channel.Close()
128+
}
129+
if p.conn != nil {
130+
p.conn.Close()
131+
}
132+
}

0 commit comments

Comments
 (0)