Skip to content

Conversation

Zerpet
Copy link
Member

@Zerpet Zerpet commented Oct 9, 2025

This pull request introduces a robust topology recovery feature to the RabbitMQ AMQP client, allowing for automatic recovery of exchanges, queues, and bindings after connection failures. It adds new configuration options to control recovery behavior, tracks topology changes for recovery, and ensures recovery actions are performed safely and efficiently.

Topology Recovery Feature

  • Added TopologyRecoveryOptions to configure recovery behavior (disabled, only transient, or all topology) in AmqpConnOptions, with propagation throughout connection and management layers. [1] [2] [3] [4] [5]
  • Implemented topologyRecoveryRecords to track exchanges, queues, and bindings for recovery, including logic for adding/removing records and converting them to specification objects.

Recovery Execution

  • Added recoverTopology() method to AmqpConnection to restore all tracked exchanges, queues, and bindings after reconnect, with atomic flags to prevent duplicate recovery actions. [1] [2]
  • Ensured that topology recovery is triggered on successful reconnection and that the management layer is aware of recovery state.

API Changes and Record Management

  • Modified DeclareQueue, DeclareExchange, Bind, DeleteQueue, DeleteExchange, and Unbind methods in AmqpManagement to update recovery records and prevent duplicate entries during recovery. [1] [2] [3]

Dependency and Utility Updates

  • Added usage of the slices package for efficient manipulation of recovery record slices. [1] [2]

CI Configuration

  • Updated .ci/ubuntu/gha-setup.sh to allow overriding the RabbitMQ Docker image via the RABBITMQ_IMAGE environment variable for more flexible CI setups.

Zerpet added 3 commits October 7, 2025 13:43
This commit introduces the foundational infrastructure for topology recovery:
- Add TopologyRecoveryOptions configuration with three modes: disabled, transient queues only, and full recovery
- Implement topologyRecoveryRecords to track queues, exchanges, and bindings
- Add recovery record types (queueRecoveryRecord, exchangeRecoveryRecord, bindingRecoveryRecord) with conversion methods to specifications
- Integrate topology recovery tracking into AmqpConnection and AmqpManagement
- Add helper method isQueueDestinationForBindingTransient to determine if binding destinations are transient
- Include comprehensive unit tests for recovery record conversions and transient queue detection

This is a first step to support topology recovery. The actual recovery logic will be implemented in subsequent commits.
As per conversation with @Gsantomaggio

We want the default topology recovery to only recover transient topology
@Zerpet Zerpet linked an issue Oct 9, 2025 that may be closed by this pull request
@Zerpet Zerpet requested a review from Gsantomaggio October 9, 2025 15:58
@Gsantomaggio
Copy link
Member

I would expect that broadcast example would work (with this patch to avoid stopping the script ):

diff --git a/docs/examples/broadcast/broadcast.go b/docs/examples/broadcast/broadcast.go
index eed6779..3e7ec78 100644
--- a/docs/examples/broadcast/broadcast.go
+++ b/docs/examples/broadcast/broadcast.go
@@ -89,7 +89,8 @@ func main() {
 			rmq.NewMessage([]byte("Hello AMQP 1.0 - id:"+fmt.Sprintf("%d", i))))
 		if err != nil {
 			rmq.Error("Error publishing message", err)
-			return
+			time.Sleep(2 * time.Second)
+			continue
 		}
 
 		switch publishResult.Outcome.(type) {

Steps

  1. Run the script above
  2. Close the connection using the management API
  3. Wait a bit
  4. The TCP connection is recreated, but:
2025/10/10 14:06:48 ERROR Error publishing message !BADKEY="*Error{Condition: amqp:not-found, Description: no queue 'client.gen-MzAxMWRkZWUtZDkzNC00Yzk0LWIxZjItMWMzNDA4NmM5ZGY31B2M2Y8AsgTpgAmY7PhCfg' in vhost '/', Info: map[]}"
2025/10/10 14:06:50 ERROR Error publishing message !BADKEY="*Error{Condition: amqp:not-found, Description: no queue 'client.gen-MzAxMWRkZWUtZDkzNC00Yzk0LWIxZjItMWMzNDA4NmM5ZGY31B2M2Y8AsgTpgAmY7PhCfg' in vhost '/', Info: map[]}"
2025/10/10 14:06:52 ERROR Error publishing message !BADKEY="*Error{Condition: amqp:not-found, Description: no queue 'client.gen-MzAxMWRkZWUtZDkzNC00Yzk0LWIxZjItMWMzNDA4NmM5ZGY31B2M2Y8AsgTpgAmY7PhCfg' in vhost '/', Info: map[]}"
2025/10/10 14:06:54 ERROR Error publishing message !BADKEY="*Error{Condition: amqp:not-found, Description: no queue 'client.gen-MzAxMWRkZWUtZDkzNC00Yzk0LWIxZjItMWMzNDA4NmM5ZGY31B2M2Y8AsgTpgAmY7PhCfg' in vhost '/', Info: map[]}"
2025/10/10 14:06:56 ERROR Error publishing message !BADKEY="*Error{Condition: amqp:not-found, Description: no queue 'client.gen-MzAxMWRkZWUtZDkzNC00Yzk0LWIxZjItMWMzNDA4NmM5ZGY31B2M2Y8AsgTpgAmY7PhCfg' in vhost '/', Info: map[]}"

@Zerpet
Copy link
Member Author

Zerpet commented Oct 13, 2025

Looks like it's not recording the bindings. I'm looking into it.

debug log

time=2025-10-13T11:32:25.117+01:00 level=DEBUG msg="Recovering topology"
time=2025-10-13T11:32:25.117+01:00 level=DEBUG msg="Recovering queue" queue=client.gen-MjFhNDgwODYtMDc4My00MjRiLTk4YWItMGY4Y2EwMjc1NWI11B2M2Y8AsgTpgAmY7PhCfg
time=2025-10-13T11:32:25.122+01:00 level=DEBUG msg="Recovering queue" queue=client.gen-NDU3MGU2ODEtNGVkOC00NjVjLWJhMTEtZjA4YjQxZWRiOWYx1B2M2Y8AsgTpgAmY7PhCfg
time=2025-10-13T11:32:25.125+01:00 level=DEBUG msg="Recovering queue" queue=client.gen-MzZmNzNmYTctMDM0Ny00NzBmLTlmZjItZmIxMWE4NjRlZTU21B2M2Y8AsgTpgAmY7PhCfg
time=2025-10-13T11:32:25.127+01:00 level=DEBUG msg="Recovering queue" queue=client.gen-YjEyYzAzMTctMGY4My00NWU4LTk3M2QtNGVjNGZiMTNjZDZm1B2M2Y8AsgTpgAmY7PhCfg
time=2025-10-13T11:32:25.129+01:00 level=DEBUG msg="Recovering queue" queue=client.gen-OWEwMTUwNDktNzM0NS00Y2FlLThlZGQtOTE2Zjc1ODg0N2E41B2M2Y8AsgTpgAmY7PhCfg
time=2025-10-13T11:32:25.133+01:00 level=ERROR msg="Failed to restart consumer" ID=consumer-77509298-0ba5-4a3d-9af2-3e353698f36b error="amqp: sender settlement mode \"unsettled\" requested, received \"mixed\" from server" ID=71c8aa01-4bdc-4132-9b79-2e36805f0ad4_1
time=2025-10-13T11:32:25.134+01:00 level=ERROR msg="Failed to restart consumer" ID=consumer-a1419a27-51f2-4396-848c-4971a921fba3 error="amqp: sender settlement mode \"unsettled\" requested, received \"mixed\" from server" ID=71c8aa01-4bdc-4132-9b79-2e36805f0ad4_1
time=2025-10-13T11:32:25.135+01:00 level=ERROR msg="Failed to restart consumer" ID=consumer-e9217a8c-3d44-4ffd-ac88-063a7195a426 error="amqp: sender settlement mode \"unsettled\" requested, received \"mixed\" from server" ID=71c8aa01-4bdc-4132-9b79-2e36805f0ad4_1
time=2025-10-13T11:32:25.136+01:00 level=ERROR msg="Failed to restart consumer" ID=consumer-7d3ae8c5-777e-4f71-8d85-492c713e5f83 error="amqp: sender settlement mode \"unsettled\" requested, received \"mixed\" from server" ID=71c8aa01-4bdc-4132-9b79-2e36805f0ad4_1
time=2025-10-13T11:32:25.137+01:00 level=ERROR msg="Failed to restart consumer" ID=consumer-5166156b-180d-493c-84f6-c82c4e079793 error="amqp: sender settlement mode \"unsettled\" requested, received \"mixed\" from server" ID=71c8aa01-4bdc-4132-9b79-2e36805f0ad4_1
time=2025-10-13T11:32:25.137+01:00 level=INFO msg="Entity restart complete" publisherFails=0 consumerFails=5

@Zerpet
Copy link
Member Author

Zerpet commented Oct 13, 2025

I've found the problem, however, I'm not sure where to apply a fix. The problem is that AutoGeneratedQueueSpecification does not return the name consistently. In other words, every time you call name(), you get a different name. As a consequence, the binding recovery record does not find a matching transient queue, and it does not add a recovery record for the binding.

The following test demonstrates the behaviour I mention:

package rabbitmqamqp

import (
	. "github.com/onsi/ginkgo/v2"
	. "github.com/onsi/gomega"
)

var _ = Describe("Entities", func() {
	Describe("AutoGeneratedQueueSpecification", func() {
		It("should generate a unique name consistently", func() {
			queue := &AutoGeneratedQueueSpecification{}
			generatedName := queue.name()
			Expect(queue.name()).To(Equal(generatedName), "generated name should be consistent")
		})
	})
})

I think that a specific AutoGeneratedQueueSpecification (AGQS) should return the same name consistently. Of course, different AGQS should return different names, like the following test would demonstrate:

		It("should generate a unique name consistently", func() {
			queue := &AutoGeneratedQueueSpecification{}
			generatedName := queue.name()
			Expect(queue.name()).To(Equal(generatedName), "same instance should generate the same name")

			anotherQueue := &AutoGeneratedQueueSpecification{}
			anotherGeneratedName := anotherQueue.name()
			Expect(anotherQueue.name()).To(Equal(anotherGeneratedName), "same instance should generate the same name")

			Expect(generatedName).ToNot(Equal(anotherGeneratedName), "different instances should generate different names")
		})

Because a concrete queue specification should always the same name,
otherwise the queue spec is not really a spec, but a random generator.
It always failed to stop rabbitmq container
@Zerpet Zerpet force-pushed the 37-implement-topology-recovery branch from e1c0ecd to e18f1b6 Compare October 13, 2025 12:46
@Zerpet Zerpet merged commit 3c8ebb7 into main Oct 13, 2025
3 of 4 checks passed
@Zerpet Zerpet deleted the 37-implement-topology-recovery branch October 13, 2025 14:25
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Implement Topology recovery

2 participants