-
Notifications
You must be signed in to change notification settings - Fork 5
Add Topology Recovery record #67
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
This commit introduces the foundational infrastructure for topology recovery: - Add TopologyRecoveryOptions configuration with three modes: disabled, transient queues only, and full recovery - Implement topologyRecoveryRecords to track queues, exchanges, and bindings - Add recovery record types (queueRecoveryRecord, exchangeRecoveryRecord, bindingRecoveryRecord) with conversion methods to specifications - Integrate topology recovery tracking into AmqpConnection and AmqpManagement - Add helper method isQueueDestinationForBindingTransient to determine if binding destinations are transient - Include comprehensive unit tests for recovery record conversions and transient queue detection This is a first step to support topology recovery. The actual recovery logic will be implemented in subsequent commits.
Related to #37
As per conversation with @Gsantomaggio We want the default topology recovery to only recover transient topology
I would expect that broadcast example would work (with this patch to avoid stopping the script ): diff --git a/docs/examples/broadcast/broadcast.go b/docs/examples/broadcast/broadcast.go
index eed6779..3e7ec78 100644
--- a/docs/examples/broadcast/broadcast.go
+++ b/docs/examples/broadcast/broadcast.go
@@ -89,7 +89,8 @@ func main() {
rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i))))
if err != nil {
rmq.Error("Error publishing message", err)
- return
+ time.Sleep(2 * time.Second)
+ continue
}
switch publishResult.Outcome.(type) { Steps
|
Looks like it's not recording the bindings. I'm looking into it. debug log
|
I've found the problem, however, I'm not sure where to apply a fix. The problem is that The following test demonstrates the behaviour I mention: package rabbitmqamqp
import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)
var _ = Describe("Entities", func() {
Describe("AutoGeneratedQueueSpecification", func() {
It("should generate a unique name consistently", func() {
queue := &AutoGeneratedQueueSpecification{}
generatedName := queue.name()
Expect(queue.name()).To(Equal(generatedName), "generated name should be consistent")
})
})
}) I think that a specific It("should generate a unique name consistently", func() {
queue := &AutoGeneratedQueueSpecification{}
generatedName := queue.name()
Expect(queue.name()).To(Equal(generatedName), "same instance should generate the same name")
anotherQueue := &AutoGeneratedQueueSpecification{}
anotherGeneratedName := anotherQueue.name()
Expect(anotherQueue.name()).To(Equal(anotherGeneratedName), "same instance should generate the same name")
Expect(generatedName).ToNot(Equal(anotherGeneratedName), "different instances should generate different names")
}) |
Because a concrete queue specification should always the same name, otherwise the queue spec is not really a spec, but a random generator.
It always failed to stop rabbitmq container
e1c0ecd
to
e18f1b6
Compare
This pull request introduces a robust topology recovery feature to the RabbitMQ AMQP client, allowing for automatic recovery of exchanges, queues, and bindings after connection failures. It adds new configuration options to control recovery behavior, tracks topology changes for recovery, and ensures recovery actions are performed safely and efficiently.
Topology Recovery Feature
TopologyRecoveryOptions
to configure recovery behavior (disabled, only transient, or all topology) inAmqpConnOptions
, with propagation throughout connection and management layers. [1] [2] [3] [4] [5]topologyRecoveryRecords
to track exchanges, queues, and bindings for recovery, including logic for adding/removing records and converting them to specification objects.Recovery Execution
recoverTopology()
method toAmqpConnection
to restore all tracked exchanges, queues, and bindings after reconnect, with atomic flags to prevent duplicate recovery actions. [1] [2]API Changes and Record Management
DeclareQueue
,DeclareExchange
,Bind
,DeleteQueue
,DeleteExchange
, andUnbind
methods inAmqpManagement
to update recovery records and prevent duplicate entries during recovery. [1] [2] [3]Dependency and Utility Updates
slices
package for efficient manipulation of recovery record slices. [1] [2]CI Configuration
.ci/ubuntu/gha-setup.sh
to allow overriding the RabbitMQ Docker image via theRABBITMQ_IMAGE
environment variable for more flexible CI setups.