diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml
index d4b827365..9c5453ed0 100644
--- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml
+++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml
@@ -98,6 +98,11 @@
spring-cloud-stream-binder-rabbit-test-support
test
+
+ org.springframework.boot
+ spring-boot-testcontainers
+ test
+
org.testcontainers
rabbitmq
diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
index a2b3a0ee9..22d1f9656 100644
--- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
+++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java
@@ -815,6 +815,7 @@ private MessageProperties adjustMessagePropertiesHeader(Throwable cause, String
messageProperties.setDeliveryMode(
properties.getExtension().getRepublishDeliveyMode());
}
+ messageProperties.incrementRetryCount();
return messageProperties;
}
diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java
new file mode 100644
index 000000000..22454101a
--- /dev/null
+++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2024-2024 the original author or authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.springframework.cloud.stream.binder.rabbit.dlq;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
+
+import org.junit.jupiter.api.Nested;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.RabbitMQContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import org.springframework.amqp.AmqpRejectAndDontRequeueException;
+import org.springframework.amqp.ImmediateAcknowledgeAmqpException;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.amqp.support.AmqpHeaders;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.SpringBootConfiguration;
+import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.testcontainers.service.connection.ServiceConnection;
+import org.springframework.context.annotation.Bean;
+import org.springframework.messaging.Message;
+import org.springframework.test.annotation.DirtiesContext;
+import org.springframework.test.context.TestPropertySource;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * @author Artem Bilan
+ *
+ * @since 4.2
+ */
+@SpringBootTest(properties = { "spring.cloud.stream.function.bindings.listener-in-0=input",
+ "spring.cloud.stream.bindings.input.destination=myDestination",
+ "spring.cloud.stream.bindings.input.group=consumerGroup",
+ "spring.cloud.stream.bindings.input.consumer.max-attempts=1",
+ "spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true",
+ "spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=1000",
+ "spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=" })
+@DirtiesContext
+public class RabbitDlqTests {
+
+ @Test
+ void verifyServerSideRetry(@Autowired RabbitTemplate rabbitTemplate, @Autowired CountDownLatch dlqRetryExhausted)
+ throws InterruptedException {
+
+ rabbitTemplate.convertAndSend("myDestination.consumerGroup", "test data");
+
+ assertThat(dlqRetryExhausted.await(10, TimeUnit.SECONDS)).isTrue();
+ }
+
+ @EnableAutoConfiguration
+ @SpringBootConfiguration
+ public static class RabbitDlqConfiguration {
+
+ @Bean
+ @ServiceConnection
+ RabbitMQContainer rabbitContainer() {
+ return new RabbitMQContainer(DockerImageName.parse("rabbitmq:4"));
+ }
+
+ @Bean
+ CountDownLatch dlqRetryExhausted() {
+ return new CountDownLatch(1);
+ }
+
+ @Bean
+ public Consumer> listener(CountDownLatch dlqRetryExhausted) {
+ return message -> {
+ Long retryCount = message.getHeaders().get(AmqpHeaders.RETRY_COUNT, Long.class);
+ if (retryCount != null && retryCount.equals(3L)) {
+ dlqRetryExhausted.countDown();
+ // giving up - don't send to DLX
+ throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts");
+ }
+ throw new AmqpRejectAndDontRequeueException("failed");
+ };
+ }
+
+ }
+
+ @Nested
+ @TestPropertySource(properties = "spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false")
+ class NoRepublishToDlx {
+
+ @Test
+ void verifyServerSideRetry(@Autowired RabbitTemplate rabbitTemplate,
+ @Autowired CountDownLatch dlqRetryExhausted) throws InterruptedException {
+
+ rabbitTemplate.convertAndSend("myDestination.consumerGroup", "test data");
+
+ assertThat(dlqRetryExhausted.await(10, TimeUnit.SECONDS)).isTrue();
+ }
+
+ }
+
+}
diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc
index c20f1ce92..749ed3f1c 100644
--- a/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc
+++ b/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc
@@ -28,8 +28,6 @@ public class ReRouteDlqApplication {
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
- private static final String X_RETRIES_HEADER = "x-retries";
-
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
@@ -42,12 +40,9 @@ public class ReRouteDlqApplication {
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
- Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER);
- if (retriesHeader == null) {
- retriesHeader = Integer.valueOf(0);
- }
- if (retriesHeader < 3) {
- failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1);
+ long retries = failedMessage.getMessageProperties().getRetryCount();
+ if (retries < 3) {
+ failedMessage.getMessageProperties().incrementRetryCount();
this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage);
}
else {
@@ -74,8 +69,6 @@ public class ReRouteDlqApplication {
private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot";
- private static final String X_RETRIES_HEADER = "x-retries";
-
private static final String DELAY_EXCHANGE = "dlqReRouter";
public static void main(String[] args) throws Exception {
@@ -90,13 +83,10 @@ public class ReRouteDlqApplication {
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
- Map headers = failedMessage.getMessageProperties().getHeaders();
- Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
- if (retriesHeader == null) {
- retriesHeader = Integer.valueOf(0);
- }
- if (retriesHeader < 3) {
- headers.put(X_RETRIES_HEADER, retriesHeader + 1);
+ long retries = failedMessage.getMessageProperties().getRetryCount();
+ if (retries < 3) {
+ failedMessage.getMessageProperties().incrementRetryCount();
+ Map headers = failedMessage.getMessageProperties().getHeaders();
headers.put("x-delay", 5000 * retriesHeader);
this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage);
}
@@ -128,9 +118,10 @@ public class ReRouteDlqApplication {
[[partitioned-destinations]]
== Partitioned Destinations
-With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers.
+With partitioned destinations, there is one DLQ for all partitions.
+We determine the original queue from the headers.
-[[republishtodlq=false]]
+[[republishtodlq-false]]
=== `republishToDlq=false`
When `republishToDlq` is `false`, RabbitMQ publishes the message to the DLX/DLQ with an `x-death` header containing information about the original destination, as shown in the following example:
@@ -148,8 +139,6 @@ public class ReRouteDlqApplication {
private static final String X_DEATH_HEADER = "x-death";
- private static final String X_RETRIES_HEADER = "x-retries";
-
public static void main(String[] args) throws Exception {
ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args);
System.out.println("Press enter to exit");
@@ -164,12 +153,9 @@ public class ReRouteDlqApplication {
@RabbitListener(queues = DLQ)
public void rePublish(Message failedMessage) {
Map headers = failedMessage.getMessageProperties().getHeaders();
- Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER);
- if (retriesHeader == null) {
- retriesHeader = Integer.valueOf(0);
- }
- if (retriesHeader < 3) {
- headers.put(X_RETRIES_HEADER, retriesHeader + 1);
+ long retries = failedMessage.getMessageProperties().getRetryCount();
+ if (retries < 3) {
+ failedMessage.getMessageProperties().incrementRetryCount();
List