From 9466b91301741b8c34188e4fb89fbe7835bbfee6 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Mon, 7 Oct 2024 17:34:46 -0400 Subject: [PATCH 1/2] GH-2939: Rely on custom `AmqpHeaders.RETRY_COUNT` for server retries Fixes: https://github.com/spring-cloud/spring-cloud-stream/issues/2939 The RabbitMQ 4.0 does not deal with client side `x-*` headers. Therefore, an `x-death.count` is not incremented anymore when message is re-published from client back to the broker. * Spring AMQP 3.2 has introduced an `AmqpHeaders.RETRY_COUNT` custom header. Use `messageProperties.incrementRetryCount()` in the `RabbitMessageChannelBinder` when we re-published message back to the broker for server-side retries * Fix docs respectively --- .../rabbit/RabbitMessageChannelBinder.java | 1 + .../modules/ROOT/pages/rabbit/rabbit_dlq.adoc | 53 ++++++------------- 2 files changed, 18 insertions(+), 36 deletions(-) diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java index a2b3a0ee9..22d1f9656 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/main/java/org/springframework/cloud/stream/binder/rabbit/RabbitMessageChannelBinder.java @@ -815,6 +815,7 @@ private MessageProperties adjustMessagePropertiesHeader(Throwable cause, String messageProperties.setDeliveryMode( properties.getExtension().getRepublishDeliveyMode()); } + messageProperties.incrementRetryCount(); return messageProperties; } diff --git a/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc b/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc index c20f1ce92..749ed3f1c 100644 --- a/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc +++ b/docs/modules/ROOT/pages/rabbit/rabbit_dlq.adoc @@ -28,8 +28,6 @@ public class ReRouteDlqApplication { private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; - private static final String X_RETRIES_HEADER = "x-retries"; - public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Press enter to exit"); @@ -42,12 +40,9 @@ public class ReRouteDlqApplication { @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { - Integer retriesHeader = (Integer) failedMessage.getMessageProperties().getHeaders().get(X_RETRIES_HEADER); - if (retriesHeader == null) { - retriesHeader = Integer.valueOf(0); - } - if (retriesHeader < 3) { - failedMessage.getMessageProperties().getHeaders().put(X_RETRIES_HEADER, retriesHeader + 1); + long retries = failedMessage.getMessageProperties().getRetryCount(); + if (retries < 3) { + failedMessage.getMessageProperties().incrementRetryCount(); this.rabbitTemplate.send(ORIGINAL_QUEUE, failedMessage); } else { @@ -74,8 +69,6 @@ public class ReRouteDlqApplication { private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; - private static final String X_RETRIES_HEADER = "x-retries"; - private static final String DELAY_EXCHANGE = "dlqReRouter"; public static void main(String[] args) throws Exception { @@ -90,13 +83,10 @@ public class ReRouteDlqApplication { @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { - Map headers = failedMessage.getMessageProperties().getHeaders(); - Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); - if (retriesHeader == null) { - retriesHeader = Integer.valueOf(0); - } - if (retriesHeader < 3) { - headers.put(X_RETRIES_HEADER, retriesHeader + 1); + long retries = failedMessage.getMessageProperties().getRetryCount(); + if (retries < 3) { + failedMessage.getMessageProperties().incrementRetryCount(); + Map headers = failedMessage.getMessageProperties().getHeaders(); headers.put("x-delay", 5000 * retriesHeader); this.rabbitTemplate.send(DELAY_EXCHANGE, ORIGINAL_QUEUE, failedMessage); } @@ -128,9 +118,10 @@ public class ReRouteDlqApplication { [[partitioned-destinations]] == Partitioned Destinations -With partitioned destinations, there is one DLQ for all partitions. We determine the original queue from the headers. +With partitioned destinations, there is one DLQ for all partitions. +We determine the original queue from the headers. -[[republishtodlq=false]] +[[republishtodlq-false]] === `republishToDlq=false` When `republishToDlq` is `false`, RabbitMQ publishes the message to the DLX/DLQ with an `x-death` header containing information about the original destination, as shown in the following example: @@ -148,8 +139,6 @@ public class ReRouteDlqApplication { private static final String X_DEATH_HEADER = "x-death"; - private static final String X_RETRIES_HEADER = "x-retries"; - public static void main(String[] args) throws Exception { ConfigurableApplicationContext context = SpringApplication.run(ReRouteDlqApplication.class, args); System.out.println("Press enter to exit"); @@ -164,12 +153,9 @@ public class ReRouteDlqApplication { @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map headers = failedMessage.getMessageProperties().getHeaders(); - Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); - if (retriesHeader == null) { - retriesHeader = Integer.valueOf(0); - } - if (retriesHeader < 3) { - headers.put(X_RETRIES_HEADER, retriesHeader + 1); + long retries = failedMessage.getMessageProperties().getRetryCount(); + if (retries < 3) { + failedMessage.getMessageProperties().incrementRetryCount(); List> xDeath = (List>) headers.get(X_DEATH_HEADER); String exchange = (String) xDeath.get(0).get("exchange"); List routingKeys = (List) xDeath.get(0).get("routing-keys"); @@ -188,7 +174,7 @@ public class ReRouteDlqApplication { } ---- -[[republishtodlq=true]] +[[republishtodlq-true]] === `republishToDlq=true` When `republishToDlq` is `true`, the republishing recoverer adds the original exchange and routing key to headers, as shown in the following example: @@ -204,8 +190,6 @@ public class ReRouteDlqApplication { private static final String PARKING_LOT = ORIGINAL_QUEUE + ".parkingLot"; - private static final String X_RETRIES_HEADER = "x-retries"; - private static final String X_ORIGINAL_EXCHANGE_HEADER = RepublishMessageRecoverer.X_ORIGINAL_EXCHANGE; private static final String X_ORIGINAL_ROUTING_KEY_HEADER = RepublishMessageRecoverer.X_ORIGINAL_ROUTING_KEY; @@ -223,12 +207,9 @@ public class ReRouteDlqApplication { @RabbitListener(queues = DLQ) public void rePublish(Message failedMessage) { Map headers = failedMessage.getMessageProperties().getHeaders(); - Integer retriesHeader = (Integer) headers.get(X_RETRIES_HEADER); - if (retriesHeader == null) { - retriesHeader = Integer.valueOf(0); - } - if (retriesHeader < 3) { - headers.put(X_RETRIES_HEADER, retriesHeader + 1); + long retries = failedMessage.getMessageProperties().getRetryCount(); + if (retries < 3) { + failedMessage.getMessageProperties().incrementRetryCount(); String exchange = (String) headers.get(X_ORIGINAL_EXCHANGE_HEADER); String originalRoutingKey = (String) headers.get(X_ORIGINAL_ROUTING_KEY_HEADER); this.rabbitTemplate.send(exchange, originalRoutingKey, failedMessage); From 30c2b3d95e3c8d68544b6cd0524e2726027338b5 Mon Sep 17 00:00:00 2001 From: Artem Bilan Date: Tue, 8 Oct 2024 15:55:26 -0400 Subject: [PATCH 2/2] * Add `RabbitDlqTests` to cover DLX functionality --- .../spring-cloud-stream-binder-rabbit/pom.xml | 5 + .../binder/rabbit/dlq/RabbitDlqTests.java | 113 ++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml index d4b827365..9c5453ed0 100644 --- a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/pom.xml @@ -98,6 +98,11 @@ spring-cloud-stream-binder-rabbit-test-support test + + org.springframework.boot + spring-boot-testcontainers + test + org.testcontainers rabbitmq diff --git a/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java new file mode 100644 index 000000000..22454101a --- /dev/null +++ b/binders/rabbit-binder/spring-cloud-stream-binder-rabbit/src/test/java/org/springframework/cloud/stream/binder/rabbit/dlq/RabbitDlqTests.java @@ -0,0 +1,113 @@ +/* + * Copyright 2024-2024 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.cloud.stream.binder.rabbit.dlq; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; + +import org.junit.jupiter.api.Nested; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.utility.DockerImageName; + +import org.springframework.amqp.AmqpRejectAndDontRequeueException; +import org.springframework.amqp.ImmediateAcknowledgeAmqpException; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.AmqpHeaders; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringBootConfiguration; +import org.springframework.boot.autoconfigure.EnableAutoConfiguration; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.testcontainers.service.connection.ServiceConnection; +import org.springframework.context.annotation.Bean; +import org.springframework.messaging.Message; +import org.springframework.test.annotation.DirtiesContext; +import org.springframework.test.context.TestPropertySource; + +import static org.assertj.core.api.Assertions.assertThat; + +/** + * @author Artem Bilan + * + * @since 4.2 + */ +@SpringBootTest(properties = { "spring.cloud.stream.function.bindings.listener-in-0=input", + "spring.cloud.stream.bindings.input.destination=myDestination", + "spring.cloud.stream.bindings.input.group=consumerGroup", + "spring.cloud.stream.bindings.input.consumer.max-attempts=1", + "spring.cloud.stream.rabbit.bindings.input.consumer.auto-bind-dlq=true", + "spring.cloud.stream.rabbit.bindings.input.consumer.dlq-ttl=1000", + "spring.cloud.stream.rabbit.bindings.input.consumer.dlq-dead-letter-exchange=" }) +@DirtiesContext +public class RabbitDlqTests { + + @Test + void verifyServerSideRetry(@Autowired RabbitTemplate rabbitTemplate, @Autowired CountDownLatch dlqRetryExhausted) + throws InterruptedException { + + rabbitTemplate.convertAndSend("myDestination.consumerGroup", "test data"); + + assertThat(dlqRetryExhausted.await(10, TimeUnit.SECONDS)).isTrue(); + } + + @EnableAutoConfiguration + @SpringBootConfiguration + public static class RabbitDlqConfiguration { + + @Bean + @ServiceConnection + RabbitMQContainer rabbitContainer() { + return new RabbitMQContainer(DockerImageName.parse("rabbitmq:4")); + } + + @Bean + CountDownLatch dlqRetryExhausted() { + return new CountDownLatch(1); + } + + @Bean + public Consumer> listener(CountDownLatch dlqRetryExhausted) { + return message -> { + Long retryCount = message.getHeaders().get(AmqpHeaders.RETRY_COUNT, Long.class); + if (retryCount != null && retryCount.equals(3L)) { + dlqRetryExhausted.countDown(); + // giving up - don't send to DLX + throw new ImmediateAcknowledgeAmqpException("Failed after 4 attempts"); + } + throw new AmqpRejectAndDontRequeueException("failed"); + }; + } + + } + + @Nested + @TestPropertySource(properties = "spring.cloud.stream.rabbit.bindings.input.consumer.republish-to-dlq=false") + class NoRepublishToDlx { + + @Test + void verifyServerSideRetry(@Autowired RabbitTemplate rabbitTemplate, + @Autowired CountDownLatch dlqRetryExhausted) throws InterruptedException { + + rabbitTemplate.convertAndSend("myDestination.consumerGroup", "test data"); + + assertThat(dlqRetryExhausted.await(10, TimeUnit.SECONDS)).isTrue(); + } + + } + +}