|
14 | 14 | import com.azure.cosmos.CosmosDiagnostics;
|
15 | 15 | import com.azure.cosmos.CosmosDiagnosticsContext;
|
16 | 16 | import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
|
| 17 | +import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfigBuilder; |
17 | 18 | import com.azure.cosmos.CosmosException;
|
18 | 19 | import com.azure.cosmos.CosmosItemSerializer;
|
19 | 20 | import com.azure.cosmos.CosmosOperationPolicy;
|
@@ -192,6 +193,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
|
192 | 193 |
|
193 | 194 | private static final String DUMMY_SQL_QUERY = "this is dummy and only used in creating " +
|
194 | 195 | "ParallelDocumentQueryExecutioncontext, but not used";
|
| 196 | + |
195 | 197 | private final static ObjectMapper mapper = Utils.getSimpleObjectMapper();
|
196 | 198 | private final CosmosItemSerializer defaultCustomSerializer;
|
197 | 199 | private final static Logger logger = LoggerFactory.getLogger(RxDocumentClientImpl.class);
|
@@ -266,7 +268,8 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
|
266 | 268 | private final boolean sessionCapturingDisabled;
|
267 | 269 | private final boolean isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig;
|
268 | 270 | private List<CosmosOperationPolicy> operationPolicies;
|
269 |
| - private AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot; |
| 271 | + private final AtomicReference<CosmosAsyncClient> cachedCosmosAsyncClientSnapshot; |
| 272 | + private final CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads; |
270 | 273 |
|
271 | 274 | public RxDocumentClientImpl(URI serviceEndpoint,
|
272 | 275 | String masterKeyOrResourceToken,
|
@@ -600,6 +603,10 @@ private RxDocumentClientImpl(URI serviceEndpoint,
|
600 | 603 | this.queryPlanCache = new ConcurrentHashMap<>();
|
601 | 604 | this.apiType = apiType;
|
602 | 605 | this.clientTelemetryConfig = clientTelemetryConfig;
|
| 606 | + this.ppafEnforcedE2ELatencyPolicyConfigForReads = evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( |
| 607 | + this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover, |
| 608 | + this.connectionPolicy |
| 609 | + ); |
603 | 610 | } catch (RuntimeException e) {
|
604 | 611 | logger.error("unexpected failure in initializing client.", e);
|
605 | 612 | close();
|
@@ -2466,7 +2473,7 @@ private Mono<ResourceResponse<Document>> createDocumentCore(
|
2466 | 2473 | crossRegionAvailabilityContextForRxDocumentServiceRequest),
|
2467 | 2474 | requestRetryPolicy),
|
2468 | 2475 | scopedDiagnosticsFactory
|
2469 |
| - ), requestReference); |
| 2476 | + ), requestReference, endToEndPolicyConfig); |
2470 | 2477 | }
|
2471 | 2478 |
|
2472 | 2479 | private Mono<ResourceResponse<Document>> createDocumentInternal(
|
@@ -2576,7 +2583,10 @@ private static <T> Mono<T> getPointOperationResponseMonoWithE2ETimeout(
|
2576 | 2583 |
|
2577 | 2584 | private <T> Mono<T> handleCircuitBreakingFeedbackForPointOperation(
|
2578 | 2585 | Mono<T> response,
|
2579 |
| - AtomicReference<RxDocumentServiceRequest> requestReference) { |
| 2586 | + AtomicReference<RxDocumentServiceRequest> requestReference, |
| 2587 | + CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { |
| 2588 | + |
| 2589 | + applyEndToEndLatencyPolicyCfgToRequestContext(requestReference.get(), effectiveEndToEndPolicyConfig); |
2580 | 2590 |
|
2581 | 2591 | return response
|
2582 | 2592 | .doOnSuccess(ignore -> {
|
@@ -2769,6 +2779,23 @@ private static CosmosException getNegativeTimeoutException(CosmosDiagnostics cos
|
2769 | 2779 | return exception;
|
2770 | 2780 | }
|
2771 | 2781 |
|
| 2782 | + private static void applyEndToEndLatencyPolicyCfgToRequestContext(RxDocumentServiceRequest rxDocumentServiceRequest, CosmosEndToEndOperationLatencyPolicyConfig effectiveEndToEndPolicyConfig) { |
| 2783 | + |
| 2784 | + if (rxDocumentServiceRequest == null) { |
| 2785 | + return; |
| 2786 | + } |
| 2787 | + |
| 2788 | + if (rxDocumentServiceRequest.requestContext == null) { |
| 2789 | + return; |
| 2790 | + } |
| 2791 | + |
| 2792 | + if (effectiveEndToEndPolicyConfig == null) { |
| 2793 | + return; |
| 2794 | + } |
| 2795 | + |
| 2796 | + rxDocumentServiceRequest.requestContext.setEndToEndOperationLatencyPolicyConfig(effectiveEndToEndPolicyConfig); |
| 2797 | + } |
| 2798 | + |
2772 | 2799 | @Override
|
2773 | 2800 | public Mono<ResourceResponse<Document>> upsertDocument(String collectionLink, Object document,
|
2774 | 2801 | RequestOptions options, boolean disableAutomaticIdGeneration) {
|
@@ -2831,7 +2858,7 @@ private Mono<ResourceResponse<Document>> upsertDocumentCore(
|
2831 | 2858 | requestReference,
|
2832 | 2859 | crossRegionAvailabilityContextForRequest),
|
2833 | 2860 | finalRetryPolicyInstance),
|
2834 |
| - scopedDiagnosticsFactory), requestReference); |
| 2861 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
2835 | 2862 | }
|
2836 | 2863 |
|
2837 | 2864 | private Mono<ResourceResponse<Document>> upsertDocumentInternal(
|
@@ -2974,7 +3001,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
|
2974 | 3001 | requestReference,
|
2975 | 3002 | crossRegionAvailabilityContextForRequest),
|
2976 | 3003 | requestRetryPolicy),
|
2977 |
| - scopedDiagnosticsFactory), requestReference); |
| 3004 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
2978 | 3005 | }
|
2979 | 3006 |
|
2980 | 3007 | private Mono<ResourceResponse<Document>> replaceDocumentInternal(
|
@@ -3056,7 +3083,7 @@ private Mono<ResourceResponse<Document>> replaceDocumentCore(
|
3056 | 3083 | clientContextOverride,
|
3057 | 3084 | requestReference,
|
3058 | 3085 | crossRegionAvailabilityContextForRequest),
|
3059 |
| - requestRetryPolicy), requestReference); |
| 3086 | + requestRetryPolicy), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); |
3060 | 3087 | }
|
3061 | 3088 |
|
3062 | 3089 | private Mono<ResourceResponse<Document>> replaceDocumentInternal(
|
@@ -3232,7 +3259,17 @@ private CosmosEndToEndOperationLatencyPolicyConfig getEffectiveEndToEndOperation
|
3232 | 3259 | return null;
|
3233 | 3260 | }
|
3234 | 3261 |
|
3235 |
| - return this.cosmosEndToEndOperationLatencyPolicyConfig; |
| 3262 | + if (this.cosmosEndToEndOperationLatencyPolicyConfig != null) { |
| 3263 | + return this.cosmosEndToEndOperationLatencyPolicyConfig; |
| 3264 | + } |
| 3265 | + |
| 3266 | + // If request options level and client-level e2e latency policy config, |
| 3267 | + // rely on PPAF enforced defaults |
| 3268 | + if (operationType.isReadOnlyOperation()) { |
| 3269 | + return this.ppafEnforcedE2ELatencyPolicyConfigForReads; |
| 3270 | + } |
| 3271 | + |
| 3272 | + return null; |
3236 | 3273 | }
|
3237 | 3274 |
|
3238 | 3275 | @Override
|
@@ -3295,7 +3332,7 @@ private Mono<ResourceResponse<Document>> patchDocumentCore(
|
3295 | 3332 | requestReference,
|
3296 | 3333 | crossRegionAvailabilityContextForRequest),
|
3297 | 3334 | documentClientRetryPolicy),
|
3298 |
| - scopedDiagnosticsFactory), requestReference); |
| 3335 | + scopedDiagnosticsFactory), requestReference, cosmosEndToEndOperationLatencyPolicyConfig); |
3299 | 3336 | }
|
3300 | 3337 |
|
3301 | 3338 | private Mono<ResourceResponse<Document>> patchDocumentInternal(
|
@@ -3504,7 +3541,7 @@ private Mono<ResourceResponse<Document>> deleteDocumentCore(
|
3504 | 3541 | requestReference,
|
3505 | 3542 | crossRegionAvailabilityContextForRequest),
|
3506 | 3543 | requestRetryPolicy),
|
3507 |
| - scopedDiagnosticsFactory), requestReference); |
| 3544 | + scopedDiagnosticsFactory), requestReference, endToEndPolicyConfig); |
3508 | 3545 | }
|
3509 | 3546 |
|
3510 | 3547 | private Mono<ResourceResponse<Document>> deleteDocumentInternal(
|
@@ -3692,7 +3729,7 @@ private Mono<ResourceResponse<Document>> readDocumentCore(
|
3692 | 3729 | crossRegionAvailabilityContextForRequest),
|
3693 | 3730 | retryPolicyInstance),
|
3694 | 3731 | scopedDiagnosticsFactory
|
3695 |
| - ), requestReference); |
| 3732 | + ), requestReference, endToEndPolicyConfig); |
3696 | 3733 | }
|
3697 | 3734 |
|
3698 | 3735 | private Mono<ResourceResponse<Document>> readDocumentInternal(
|
@@ -4883,7 +4920,7 @@ public Mono<CosmosBatchResponse> executeBatchRequest(String collectionLink,
|
4883 | 4920 | requestReference), documentClientRetryPolicy),
|
4884 | 4921 | scopedDiagnosticsFactory
|
4885 | 4922 | ),
|
4886 |
| - requestReference); |
| 4923 | + requestReference, endToEndPolicyConfig); |
4887 | 4924 | }
|
4888 | 4925 |
|
4889 | 4926 | private Mono<StoredProcedureResponse> executeStoredProcedureInternal(String storedProcedureLink,
|
@@ -7145,6 +7182,49 @@ private static boolean isNonTransientResultForHedging(int statusCode, int subSta
|
7145 | 7182 | return false;
|
7146 | 7183 | }
|
7147 | 7184 |
|
| 7185 | + private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads( |
| 7186 | + GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover, |
| 7187 | + ConnectionPolicy connectionPolicy) { |
| 7188 | + |
| 7189 | + if (!globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) { |
| 7190 | + return null; |
| 7191 | + } |
| 7192 | + |
| 7193 | + if (Configs.isReadAvailabilityStrategyEnabledWithPpaf()) { |
| 7194 | + |
| 7195 | + if (connectionPolicy.getConnectionMode() == ConnectionMode.DIRECT) { |
| 7196 | + Duration networkRequestTimeout = connectionPolicy.getTcpNetworkRequestTimeout(); |
| 7197 | + |
| 7198 | + checkNotNull(networkRequestTimeout, "Argument 'networkRequestTimeout' cannot be null!"); |
| 7199 | + |
| 7200 | + Duration overallE2eLatencyTimeout = networkRequestTimeout.plus(Utils.ONE_SECOND); |
| 7201 | + Duration threshold = Utils.min(networkRequestTimeout.dividedBy(2), Utils.ONE_SECOND); |
| 7202 | + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); |
| 7203 | + |
| 7204 | + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) |
| 7205 | + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) |
| 7206 | + .build(); |
| 7207 | + } else { |
| 7208 | + |
| 7209 | + Duration httpNetworkRequestTimeout = connectionPolicy.getHttpNetworkRequestTimeout(); |
| 7210 | + |
| 7211 | + checkNotNull(httpNetworkRequestTimeout, "Argument 'httpNetworkRequestTimeout' cannot be null!"); |
| 7212 | + |
| 7213 | + // 6s was chosen to accommodate for control-plane hot path read timeout retries (like QueryPlan / PartitionKeyRange) |
| 7214 | + Duration overallE2eLatencyTimeout = Utils.min(Utils.SIX_SECONDS, httpNetworkRequestTimeout); |
| 7215 | + |
| 7216 | + Duration threshold = Utils.min(overallE2eLatencyTimeout.dividedBy(2), Utils.ONE_SECOND); |
| 7217 | + Duration thresholdStep = Utils.min(threshold.dividedBy(2), Utils.HALF_SECOND); |
| 7218 | + |
| 7219 | + return new CosmosEndToEndOperationLatencyPolicyConfigBuilder(overallE2eLatencyTimeout) |
| 7220 | + .availabilityStrategy(new ThresholdBasedAvailabilityStrategy(threshold, thresholdStep)) |
| 7221 | + .build(); |
| 7222 | + } |
| 7223 | + } |
| 7224 | + |
| 7225 | + return null; |
| 7226 | + } |
| 7227 | + |
7148 | 7228 | private DiagnosticsClientContext getEffectiveClientContext(DiagnosticsClientContext clientContextOverride) {
|
7149 | 7229 | if (clientContextOverride != null) {
|
7150 | 7230 | return clientContextOverride;
|
@@ -7261,6 +7341,8 @@ private <T> Mono<T> executeFeedOperationWithAvailabilityStrategy(
|
7261 | 7341 | this.getEffectiveEndToEndOperationLatencyPolicyConfig(
|
7262 | 7342 | req.requestContext.getEndToEndOperationLatencyPolicyConfig(), resourceType, operationType);
|
7263 | 7343 |
|
| 7344 | + req.requestContext.setEndToEndOperationLatencyPolicyConfig(endToEndPolicyConfig); |
| 7345 | + |
7264 | 7346 | List<String> initialExcludedRegions = req.requestContext.getExcludeRegions();
|
7265 | 7347 | List<String> orderedApplicableRegionsForSpeculation = this.getApplicableRegionsForSpeculation(
|
7266 | 7348 | endToEndPolicyConfig,
|
|
0 commit comments