Skip to content

Commit 878fb75

Browse files
Fixing possible hang in CosmosPagedIterable when using custom defaultPageSize or FeedResponse handler (Azure#45290)
* Fixing possible hang in CosmosPagedIterable when using custom defaultPageSize or FeedResponse handler * Iterating on change
1 parent 193adf3 commit 878fb75

File tree

2 files changed

+79
-33
lines changed

2 files changed

+79
-33
lines changed

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFluxDefaultImpl.java

Lines changed: 42 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,13 @@
1212
import com.azure.cosmos.implementation.FeedOperationState;
1313
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
1414
import com.azure.cosmos.models.FeedResponse;
15+
import org.slf4j.Logger;
16+
import org.slf4j.LoggerFactory;
1517
import reactor.core.publisher.Flux;
1618

1719
import java.time.Duration;
1820
import java.time.Instant;
21+
import java.util.concurrent.atomic.AtomicInteger;
1922
import java.util.concurrent.atomic.AtomicLong;
2023
import java.util.concurrent.atomic.AtomicReference;
2124
import java.util.function.Consumer;
@@ -36,12 +39,13 @@
3639
* @see FeedResponse
3740
*/
3841
final class CosmosPagedFluxDefaultImpl<T> extends CosmosPagedFlux<T> {
42+
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFluxStaticListImpl.class);
3943
private static final ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.CosmosDiagnosticsContextAccessor ctxAccessor =
4044
ImplementationBridgeHelpers.CosmosDiagnosticsContextHelper.getCosmosDiagnosticsContextAccessor();
4145

4246
private final Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction;
43-
private final Consumer<FeedResponse<T>> feedResponseConsumer;
44-
private final int defaultPageSize;
47+
private final AtomicReference<Consumer<FeedResponse<T>>> feedResponseConsumer;
48+
private final AtomicInteger defaultPageSize;
4549

4650
CosmosPagedFluxDefaultImpl(Function<CosmosPagedFluxOptions, Flux<FeedResponse<T>>> optionsFluxFunction) {
4751
this(optionsFluxFunction, null, -1);
@@ -57,8 +61,8 @@ final class CosmosPagedFluxDefaultImpl<T> extends CosmosPagedFlux<T> {
5761
int defaultPageSize) {
5862
super();
5963
this.optionsFluxFunction = optionsFluxFunction;
60-
this.feedResponseConsumer = feedResponseConsumer;
61-
this.defaultPageSize = defaultPageSize;
64+
this.feedResponseConsumer = new AtomicReference<>(feedResponseConsumer);
65+
this.defaultPageSize = new AtomicInteger(defaultPageSize);
6266
}
6367

6468
/**
@@ -68,18 +72,39 @@ final class CosmosPagedFluxDefaultImpl<T> extends CosmosPagedFlux<T> {
6872
* @return CosmosPagedFlux instance with attached handler
6973
*/
7074
public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
71-
if (this.feedResponseConsumer != null) {
72-
return new CosmosPagedFluxDefaultImpl<>(
73-
this.optionsFluxFunction,
74-
this.feedResponseConsumer.andThen(newFeedResponseConsumer));
75-
} else {
76-
return new CosmosPagedFluxDefaultImpl<>(this.optionsFluxFunction, newFeedResponseConsumer);
75+
int i = 0;
76+
while (true) {
77+
Consumer<FeedResponse<T>> feedResponseConsumerSnapshot = this.feedResponseConsumer.get();
78+
i++;
79+
if (feedResponseConsumerSnapshot != null) {
80+
81+
if (this.feedResponseConsumer.compareAndSet(
82+
feedResponseConsumerSnapshot, feedResponseConsumerSnapshot.andThen(newFeedResponseConsumer))) {
83+
84+
break;
85+
}
86+
} else {
87+
if (this.feedResponseConsumer.compareAndSet(
88+
null,
89+
newFeedResponseConsumer)) {
90+
91+
break;
92+
}
93+
}
94+
95+
if (i > 10) {
96+
LOGGER.warn("Highly concurrent calls to CosmosPagedFlux.handle "
97+
+ "are not expected and can result in perf regressions. Avoid this by reducing concurrency.");
98+
}
7799
}
100+
101+
return this;
78102
}
79103

80104
@Override
81105
CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
82-
return new CosmosPagedFluxDefaultImpl<>(this.optionsFluxFunction, this.feedResponseConsumer, pageSize);
106+
this.defaultPageSize.set(pageSize);
107+
return this;
83108
}
84109

85110
@Override
@@ -113,8 +138,9 @@ public Flux<FeedResponse<T>> byPage(String continuationToken, int preferredPageS
113138
private CosmosPagedFluxOptions createCosmosPagedFluxOptions() {
114139
CosmosPagedFluxOptions cosmosPagedFluxOptions = new CosmosPagedFluxOptions();
115140

116-
if (this.defaultPageSize > 0) {
117-
cosmosPagedFluxOptions.setMaxItemCount(this.defaultPageSize);
141+
int defaultPageSizeSnapshot = this.defaultPageSize.get();
142+
if (defaultPageSizeSnapshot > 0) {
143+
cosmosPagedFluxOptions.setMaxItemCount(defaultPageSizeSnapshot);
118144
}
119145

120146
return cosmosPagedFluxOptions;
@@ -137,7 +163,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
137163
case ON_COMPLETE:
138164
case ON_NEXT:
139165
DiagnosticsProvider.recordFeedResponse(
140-
feedResponseConsumer,
166+
feedResponseConsumer.get(),
141167
pagedFluxOptions.getFeedOperationState(),
142168
() ->pagedFluxOptions.getSamplingRateSnapshot(),
143169
tracerProvider,
@@ -169,7 +195,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
169195
case ON_COMPLETE:
170196
if (response != null) {
171197
DiagnosticsProvider.recordFeedResponse(
172-
feedResponseConsumer,
198+
feedResponseConsumer.get(),
173199
pagedFluxOptions.getFeedOperationState(),
174200
() ->pagedFluxOptions.getSamplingRateSnapshot(),
175201
tracerProvider,
@@ -193,7 +219,7 @@ private Flux<FeedResponse<T>> wrapWithTracingIfEnabled(CosmosPagedFluxOptions p
193219
break;
194220
case ON_NEXT:
195221
DiagnosticsProvider.recordFeedResponse(
196-
feedResponseConsumer,
222+
feedResponseConsumer.get(),
197223
pagedFluxOptions.getFeedOperationState(),
198224
() ->pagedFluxOptions.getSamplingRateSnapshot(),
199225
tracerProvider,

sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/util/CosmosPagedFluxStaticListImpl.java

Lines changed: 37 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,25 +5,30 @@
55

66
import com.azure.cosmos.implementation.ImplementationBridgeHelpers;
77
import com.azure.cosmos.models.FeedResponse;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
810
import reactor.core.publisher.Flux;
911

1012
import java.util.ArrayList;
1113
import java.util.List;
14+
import java.util.concurrent.atomic.AtomicInteger;
15+
import java.util.concurrent.atomic.AtomicReference;
1216
import java.util.function.Consumer;
1317

1418
import static com.azure.cosmos.implementation.guava25.base.Preconditions.checkNotNull;
1519

1620
final class CosmosPagedFluxStaticListImpl<T> extends CosmosPagedFlux<T> {
21+
private static final Logger LOGGER = LoggerFactory.getLogger(CosmosPagedFluxStaticListImpl.class);
1722
private static final ImplementationBridgeHelpers.FeedResponseHelper.FeedResponseAccessor feedResponseHlp =
1823
ImplementationBridgeHelpers.FeedResponseHelper.getFeedResponseAccessor();
1924

2025
private static final int DEFAULT_PAGE_SIZE = 100;
2126

22-
private final Consumer<FeedResponse<T>> feedResponseConsumer;
27+
private final AtomicReference<Consumer<FeedResponse<T>>> feedResponseConsumer;
2328

2429
private final List<T> items;
2530
private final boolean isChangeFeed;
26-
private final int defaultPageSize;
31+
private final AtomicInteger defaultPageSize;
2732

2833
CosmosPagedFluxStaticListImpl(List<T> items, boolean isChangeFeed) {
2934
this(items, isChangeFeed, null, DEFAULT_PAGE_SIZE);
@@ -40,8 +45,8 @@ final class CosmosPagedFluxStaticListImpl<T> extends CosmosPagedFlux<T> {
4045
checkNotNull(items, "Argument 'items' must not be null.");
4146
this.items = items;
4247
this.isChangeFeed = isChangeFeed;
43-
this.feedResponseConsumer = feedResponseConsumer;
44-
this.defaultPageSize = defaultPageSize;
48+
this.feedResponseConsumer = new AtomicReference<>(feedResponseConsumer);
49+
this.defaultPageSize = new AtomicInteger(defaultPageSize);
4550
}
4651

4752
@Override
@@ -110,23 +115,38 @@ public Flux<FeedResponse<T>> byPage(String continuation, int pageSize) {
110115

111116
@Override
112117
public CosmosPagedFlux<T> handle(Consumer<FeedResponse<T>> newFeedResponseConsumer) {
113-
if (this.feedResponseConsumer != null) {
114-
return new CosmosPagedFluxStaticListImpl<>(
115-
this.items,
116-
this.isChangeFeed,
117-
this.feedResponseConsumer.andThen(newFeedResponseConsumer),
118-
this.defaultPageSize);
119-
} else {
120-
return new CosmosPagedFluxStaticListImpl<>(
121-
this.items,
122-
this.isChangeFeed,
123-
newFeedResponseConsumer,
124-
this.defaultPageSize);
118+
int i = 0;
119+
while (true) {
120+
i++;
121+
Consumer<FeedResponse<T>> feedResponseConsumerSnapshot = this.feedResponseConsumer.get();
122+
if (feedResponseConsumerSnapshot != null) {
123+
124+
if (this.feedResponseConsumer.compareAndSet(
125+
feedResponseConsumerSnapshot, feedResponseConsumerSnapshot.andThen(newFeedResponseConsumer))) {
126+
127+
break;
128+
}
129+
} else {
130+
if (this.feedResponseConsumer.compareAndSet(
131+
null,
132+
newFeedResponseConsumer)) {
133+
134+
break;
135+
}
136+
}
137+
138+
if (i > 10) {
139+
LOGGER.warn("Highly concurrent calls to CosmosPagedFlux.handle "
140+
+ "are not expected and can result in perf regressions. Avoid this by reducing concurrency.");
141+
}
125142
}
143+
144+
return this;
126145
}
127146

128147
@Override
129148
CosmosPagedFlux<T> withDefaultPageSize(int pageSize) {
130-
return new CosmosPagedFluxStaticListImpl<>(this.items, this.isChangeFeed, this.feedResponseConsumer, pageSize);
149+
this.defaultPageSize.set(pageSize);
150+
return this;
131151
}
132152
}

0 commit comments

Comments
 (0)