Skip to content

Commit b4aba83

Browse files
authored
[Messaging]: MessageFlux: adding a retry const that limit retry to terminal completion event (Azure#45400)
* MessageFlux: adding a retry const that when used limit retry to terminal completion event * Using completion based retry for event hubs * update Service Bus pom to use core-amqp dev version * minor rearrangement in version_client * updating changelog for core-amqp and event hubs
1 parent a9a12aa commit b4aba83

File tree

8 files changed

+22
-4
lines changed

8 files changed

+22
-4
lines changed

eng/versioning/version_client.txt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -514,6 +514,8 @@ io.clientcore:annotation-processor-test;1.0.0-beta.1;1.0.0-beta.1
514514
# In the pom, the version update tag after the version should name the unreleased package and the dependency version:
515515
# <!-- {x-version-update;unreleased_com.azure:azure-core;dependency} -->
516516

517+
unreleased_com.azure:azure-core-amqp;2.10.0-beta.1
518+
517519
unreleased_io.clientcore:core;1.0.0-beta.10
518520
unreleased_io.clientcore:annotation-processor;1.0.0-beta.3
519521
unreleased_com.azure.v2:azure-core;2.0.0-beta.1

sdk/core/azure-core-amqp/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Added an `AmqpRetryOptions` constant in `MessageFlux` to configure retries on terminal completion events but propagates terminal errors. ([45400](https://github.com/Azure/azure-sdk-for-java/pull/45400))
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/core/azure-core-amqp/src/main/java/com/azure/core/amqp/implementation/MessageFlux.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ public final class MessageFlux extends FluxOperator<AmqpReceiveLink, Message> {
5353
* (i.e., disables the retry action to obtain next receiver from the upstream).
5454
**/
5555
public static final AmqpRetryPolicy NULL_RETRY_POLICY = new FixedAmqpRetryPolicy(new AmqpRetryOptions());
56+
/** An AmqpRetryPolicy const indicates that MessageFlux should retry to obtain next receiver from the upstream
57+
* only when the current receiver terminates with completion, any error from the current receiver will terminate
58+
* the MessageFlux.
59+
**/
60+
public static final AmqpRetryPolicy RETRY_ONLY_COMPLETION = new FixedAmqpRetryPolicy(new AmqpRetryOptions());
5661
private static final String MESSAGE_FLUX_KEY = "messageFlux";
5762
private final ClientLogger logger;
5863
/**
@@ -650,6 +655,13 @@ private void setTerminationSignalOrScheduleNextMediatorRequest(Throwable error,
650655
return;
651656
}
652657

658+
if (retryPolicy == RETRY_ONLY_COMPLETION) {
659+
if (error != null) {
660+
onError(error);
661+
return;
662+
}
663+
}
664+
653665
final Duration delay;
654666
if (error == null) {
655667
// Even if the broker sets no link error condition, it's good to back off before creating a new link.

sdk/eventhubs/azure-messaging-eventhubs/CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44

55
### Features Added
66

7+
- Updated the partition receiver to retry on terminal completion signal instead of sending it downstream. ([45400](https://github.com/Azure/azure-sdk-for-java/pull/45400))
8+
79
### Breaking Changes
810

911
### Bugs Fixed

sdk/eventhubs/azure-messaging-eventhubs/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
<dependency>
5454
<groupId>com.azure</groupId>
5555
<artifactId>azure-core-amqp</artifactId>
56-
<version>2.9.16</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
56+
<version>2.10.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
5757
</dependency>
5858

5959
<!-- Test dependencies -->

sdk/eventhubs/azure-messaging-eventhubs/src/main/java/com/azure/messaging/eventhubs/EventHubConsumerAsyncClient.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -584,7 +584,7 @@ private EventHubPartitionAsyncConsumer createPartitionConsumer(String linkName,
584584
final MessageFluxWrapper linkMessageProcessor;
585585
if (connectionProcessor.isV2()) {
586586
MessageFlux messageFlux = new MessageFlux(receiveLinkFlux, prefetchCount, CreditFlowMode.EmissionDriven,
587-
MessageFlux.NULL_RETRY_POLICY);
587+
MessageFlux.RETRY_ONLY_COMPLETION);
588588
linkMessageProcessor
589589
= new MessageFluxWrapper(InstrumentedMessageFlux.instrument(messageFlux, partitionId, instrumentation));
590590
} else {

sdk/eventhubs/azure-messaging-eventhubs/src/test/java/com/azure/messaging/eventhubs/EventHubPartitionAsyncConsumerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -328,7 +328,7 @@ private static SystemProperties getSystemProperties(AmqpAnnotatedMessage amqpAnn
328328
private MessageFluxWrapper createLinkProcessor(boolean isV2) {
329329
if (isV2) {
330330
final MessageFlux messageFlux = new MessageFlux(createSink(link1, link2), PREFETCH,
331-
CreditFlowMode.EmissionDriven, MessageFlux.NULL_RETRY_POLICY);
331+
CreditFlowMode.EmissionDriven, MessageFlux.RETRY_ONLY_COMPLETION);
332332
return new MessageFluxWrapper(messageFlux);
333333
} else {
334334
final AmqpReceiveLinkProcessor receiveLinkProcessor

sdk/servicebus/azure-messaging-servicebus/pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
<dependency>
7373
<groupId>com.azure</groupId>
7474
<artifactId>azure-core-amqp</artifactId>
75-
<version>2.9.16</version> <!-- {x-version-update;com.azure:azure-core-amqp;dependency} -->
75+
<version>2.10.0-beta.1</version> <!-- {x-version-update;unreleased_com.azure:azure-core-amqp;dependency} -->
7676
</dependency>
7777
<dependency>
7878
<groupId>com.azure</groupId>

0 commit comments

Comments
 (0)