49
49
import com .azure .cosmos .implementation .patch .PatchUtil ;
50
50
import com .azure .cosmos .implementation .perPartitionAutomaticFailover .GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover ;
51
51
import com .azure .cosmos .implementation .perPartitionCircuitBreaker .GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker ;
52
+ import com .azure .cosmos .implementation .perPartitionCircuitBreaker .PartitionLevelCircuitBreakerConfig ;
52
53
import com .azure .cosmos .implementation .query .DocumentQueryExecutionContextFactory ;
53
54
import com .azure .cosmos .implementation .query .IDocumentQueryClient ;
54
55
import com .azure .cosmos .implementation .query .IDocumentQueryExecutionContext ;
@@ -268,7 +269,7 @@ public class RxDocumentClientImpl implements AsyncDocumentClient, IAuthorization
268
269
private final boolean isRegionScopedSessionCapturingEnabledOnClientOrSystemConfig ;
269
270
private List <CosmosOperationPolicy > operationPolicies ;
270
271
private final AtomicReference <CosmosAsyncClient > cachedCosmosAsyncClientSnapshot ;
271
- private final CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads ;
272
+ private CosmosEndToEndOperationLatencyPolicyConfig ppafEnforcedE2ELatencyPolicyConfigForReads ;
272
273
273
274
public RxDocumentClientImpl (URI serviceEndpoint ,
274
275
String masterKeyOrResourceToken ,
@@ -290,8 +291,7 @@ public RxDocumentClientImpl(URI serviceEndpoint,
290
291
SessionRetryOptions sessionRetryOptions ,
291
292
CosmosContainerProactiveInitConfig containerProactiveInitConfig ,
292
293
CosmosItemSerializer defaultCustomSerializer ,
293
- boolean isRegionScopedSessionCapturingEnabled ,
294
- boolean isPerPartitionAutomaticFailoverEnabled ) {
294
+ boolean isRegionScopedSessionCapturingEnabled ) {
295
295
this (
296
296
serviceEndpoint ,
297
297
masterKeyOrResourceToken ,
@@ -313,8 +313,8 @@ public RxDocumentClientImpl(URI serviceEndpoint,
313
313
sessionRetryOptions ,
314
314
containerProactiveInitConfig ,
315
315
defaultCustomSerializer ,
316
- isRegionScopedSessionCapturingEnabled ,
317
- isPerPartitionAutomaticFailoverEnabled );
316
+ isRegionScopedSessionCapturingEnabled
317
+ );
318
318
319
319
this .cosmosAuthorizationTokenResolver = cosmosAuthorizationTokenResolver ;
320
320
}
@@ -364,8 +364,8 @@ public RxDocumentClientImpl(URI serviceEndpoint,
364
364
sessionRetryOptions ,
365
365
containerProactiveInitConfig ,
366
366
defaultCustomSerializer ,
367
- isRegionScopedSessionCapturingEnabled ,
368
- isPerPartitionAutomaticFailoverEnabled );
367
+ isRegionScopedSessionCapturingEnabled
368
+ );
369
369
370
370
this .cosmosAuthorizationTokenResolver = cosmosAuthorizationTokenResolver ;
371
371
this .operationPolicies = operationPolicies ;
@@ -391,8 +391,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
391
391
SessionRetryOptions sessionRetryOptions ,
392
392
CosmosContainerProactiveInitConfig containerProactiveInitConfig ,
393
393
CosmosItemSerializer defaultCustomSerializer ,
394
- boolean isRegionScopedSessionCapturingEnabled ,
395
- boolean isPerPartitionAutomaticFailoverEnabled ) {
394
+ boolean isRegionScopedSessionCapturingEnabled ) {
396
395
this (
397
396
serviceEndpoint ,
398
397
masterKeyOrResourceToken ,
@@ -413,8 +412,8 @@ private RxDocumentClientImpl(URI serviceEndpoint,
413
412
sessionRetryOptions ,
414
413
containerProactiveInitConfig ,
415
414
defaultCustomSerializer ,
416
- isRegionScopedSessionCapturingEnabled ,
417
- isPerPartitionAutomaticFailoverEnabled );
415
+ isRegionScopedSessionCapturingEnabled
416
+ );
418
417
419
418
if (permissionFeed != null && permissionFeed .size () > 0 ) {
420
419
this .resourceTokensMap = new HashMap <>();
@@ -477,8 +476,7 @@ private RxDocumentClientImpl(URI serviceEndpoint,
477
476
SessionRetryOptions sessionRetryOptions ,
478
477
CosmosContainerProactiveInitConfig containerProactiveInitConfig ,
479
478
CosmosItemSerializer defaultCustomSerializer ,
480
- boolean isRegionScopedSessionCapturingEnabled ,
481
- boolean isPerPartitionAutomaticFailoverEnabled ) {
479
+ boolean isRegionScopedSessionCapturingEnabled ) {
482
480
483
481
assert (clientTelemetryConfig != null );
484
482
activeClientsCnt .incrementAndGet ();
@@ -583,14 +581,13 @@ private RxDocumentClientImpl(URI serviceEndpoint,
583
581
584
582
this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker
585
583
= new GlobalPartitionEndpointManagerForPerPartitionCircuitBreaker (this .globalEndpointManager );
584
+
585
+ // enablement of PPAF is revaluated in RxDocumentClientImpl#init
586
586
this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover
587
- = new GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover (this .globalEndpointManager , isPerPartitionAutomaticFailoverEnabled );
587
+ = new GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover (this .globalEndpointManager , false );
588
588
589
- this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker .init ();
590
589
this .cachedCosmosAsyncClientSnapshot = new AtomicReference <>();
591
590
592
- this .diagnosticsClientConfig .withPartitionLevelCircuitBreakerConfig (this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker .getCircuitBreakerConfig ());
593
-
594
591
this .retryPolicy = new RetryPolicy (
595
592
this ,
596
593
this .globalEndpointManager ,
@@ -602,10 +599,6 @@ private RxDocumentClientImpl(URI serviceEndpoint,
602
599
this .queryPlanCache = new ConcurrentHashMap <>();
603
600
this .apiType = apiType ;
604
601
this .clientTelemetryConfig = clientTelemetryConfig ;
605
- this .ppafEnforcedE2ELatencyPolicyConfigForReads = evaluatePpafEnforcedE2eLatencyPolicyCfgForReads (
606
- this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover ,
607
- this .connectionPolicy
608
- );
609
602
} catch (RuntimeException e ) {
610
603
logger .error ("unexpected failure in initializing client." , e );
611
604
close ();
@@ -795,7 +788,7 @@ public void init(CosmosClientMetadataCachesSnapshot metadataCachesSnapshot, Func
795
788
&& readConsistencyStrategy != ReadConsistencyStrategy .SESSION
796
789
&& !sessionCapturingOverrideEnabled );
797
790
this .sessionContainer .setDisableSessionCapturing (updatedDisableSessionCapturing );
798
-
791
+ this . initializePerPartitionFailover ( databaseAccountSnapshot );
799
792
this .addUserAgentSuffix (this .userAgentContainer , EnumSet .allOf (UserAgentFeatureFlags .class ));
800
793
} catch (Exception e ) {
801
794
logger .error ("unexpected failure in initializing client." , e );
@@ -7181,7 +7174,7 @@ private static boolean isNonTransientResultForHedging(int statusCode, int subSta
7181
7174
return false ;
7182
7175
}
7183
7176
7184
- private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads (
7177
+ private CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2eLatencyPolicyCfgForReads (
7185
7178
GlobalPartitionEndpointManagerForPerPartitionAutomaticFailover globalPartitionEndpointManagerForPerPartitionAutomaticFailover ,
7186
7179
ConnectionPolicy connectionPolicy ) {
7187
7180
@@ -7191,6 +7184,9 @@ private static CosmosEndToEndOperationLatencyPolicyConfig evaluatePpafEnforcedE2
7191
7184
7192
7185
if (Configs .isReadAvailabilityStrategyEnabledWithPpaf ()) {
7193
7186
7187
+ logger .warn ("Availability strategy for reads, queries, read all and read many" +
7188
+ " is enabled when PerPartitionAutomaticFailover is enabled." );
7189
+
7194
7190
if (connectionPolicy .getConnectionMode () == ConnectionMode .DIRECT ) {
7195
7191
Duration networkRequestTimeout = connectionPolicy .getTcpNetworkRequestTimeout ();
7196
7192
@@ -7618,6 +7614,57 @@ private void addCancelledGatewayModeDiagnosticsIntoCosmosException(CosmosExcepti
7618
7614
}
7619
7615
}
7620
7616
7617
+ // this is a one time call, so we can afford to synchronize as the benefit is now all PPAF and PPCB related dependencies are visible
7618
+ // if initializePerPartitionFailover has been invoked prior
7619
+ private synchronized void initializePerPartitionFailover (DatabaseAccount databaseAccountSnapshot ) {
7620
+ initializePerPartitionAutomaticFailover (databaseAccountSnapshot );
7621
+ initializePerPartitionCircuitBreaker ();
7622
+ enableAvailabilityStrategyForReads ();
7623
+
7624
+ checkNotNull (this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover , "Argument 'globalPartitionEndpointManagerForPerPartitionAutomaticFailover' cannot be null." );
7625
+ checkNotNull (this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker , "Argument 'globalPartitionEndpointManagerForPerPartitionCircuitBreaker' cannot be null." );
7626
+
7627
+ this .diagnosticsClientConfig .withPartitionLevelCircuitBreakerConfig (this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker .getCircuitBreakerConfig ());
7628
+ }
7629
+
7630
+ private void initializePerPartitionAutomaticFailover (DatabaseAccount databaseAccountSnapshot ) {
7631
+
7632
+ Boolean isPerPartitionAutomaticFailoverEnabledAsMandatedByService
7633
+ = databaseAccountSnapshot .isPerPartitionFailoverBehaviorEnabled ();
7634
+
7635
+ if (isPerPartitionAutomaticFailoverEnabledAsMandatedByService != null ) {
7636
+ this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover .resetPerPartitionAutomaticFailoverEnabled (isPerPartitionAutomaticFailoverEnabledAsMandatedByService );
7637
+ } else {
7638
+ boolean isPerPartitionAutomaticFailoverOptedIntoByClient
7639
+ = Configs .isPerPartitionAutomaticFailoverEnabled ().equalsIgnoreCase ("true" );
7640
+ this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover .resetPerPartitionAutomaticFailoverEnabled (isPerPartitionAutomaticFailoverOptedIntoByClient );
7641
+ }
7642
+ }
7643
+
7644
+ private void initializePerPartitionCircuitBreaker () {
7645
+ if (this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover .isPerPartitionAutomaticFailoverEnabled ()) {
7646
+
7647
+ PartitionLevelCircuitBreakerConfig partitionLevelCircuitBreakerConfig = Configs .getPartitionLevelCircuitBreakerConfig ();
7648
+
7649
+ if (partitionLevelCircuitBreakerConfig != null && !partitionLevelCircuitBreakerConfig .isPartitionLevelCircuitBreakerEnabled ()) {
7650
+ logger .warn ("Per-Partition Circuit Breaker is enabled by default when Per-Partition Automatic Failover is enabled." );
7651
+ System .setProperty ("COSMOS.PARTITION_LEVEL_CIRCUIT_BREAKER_CONFIG" , "{\" isPartitionLevelCircuitBreakerEnabled\" : true}" );
7652
+ }
7653
+ }
7654
+
7655
+ this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker .resetCircuitBreakerConfig ();
7656
+ this .globalPartitionEndpointManagerForPerPartitionCircuitBreaker .init ();
7657
+ }
7658
+
7659
+ private void enableAvailabilityStrategyForReads () {
7660
+ if (this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover .isPerPartitionAutomaticFailoverEnabled ()) {
7661
+ this .ppafEnforcedE2ELatencyPolicyConfigForReads = this .evaluatePpafEnforcedE2eLatencyPolicyCfgForReads (
7662
+ this .globalPartitionEndpointManagerForPerPartitionAutomaticFailover ,
7663
+ this .connectionPolicy
7664
+ );
7665
+ }
7666
+ }
7667
+
7621
7668
private boolean useThinClient () {
7622
7669
return Configs .isThinClientEnabled () && this .connectionPolicy .getConnectionMode () == ConnectionMode .GATEWAY ;
7623
7670
}
0 commit comments