Skip to content

Commit be1f455

Browse files
simorenohbambriz
andauthored
[Cosmos] Retryable writes (#41272)
* retriable writes implementation * pylint/mypy * Update test_backwards_compatibility.py * Update test_backwards_compatibility_async.py * Update test_headers_async.py * Update test_headers.py * Update test_headers.py * add tests for retryable writes retryable writes tests * add tests for patch item * adds README with snippets, removes preview from VS * add retry write at client level tests * fix mark * small typos * handle service response errors * service response request level test * client-level check for service response * Update retryable writes with tests and multi region retry Adds constants for keyword retry_policy, adds logic for multi region retry for retryable writes on timeout errors, adds test for delete item retry. * add mwr tests, fix logic * update tests with replace item * pylint fixes * updates * address comments * Update _service_response_retry_policy.py * update tests Updates the tests and service response retry policy in order for the test to pass * Update test_retryable_writes_async.py * Update test_retryable_writes_async.py * Update test_retryable_writes_async.py * Update test_retryable_writes_async.py * Update test_retryable_writes_async.py --------- Co-authored-by: bambriz <beambriz2017@gmail.com>
1 parent 7bd4b11 commit be1f455

21 files changed

+1730
-19
lines changed

sdk/cosmos/azure-cosmos/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
### 4.13.0b3 (Unreleased)
44

55
#### Features Added
6+
* Added option to enable automatic retries for write operations. See [PR 41272](https://github.com/Azure/azure-sdk-for-python/pull/41272).
67

78
#### Breaking Changes
89
* Adds cross region retries when no preferred locations are set. This is only a breaking change for customers using bounded staleness consistency. See [PR 39714](https://github.com/Azure/azure-sdk-for-python/pull/39714)

sdk/cosmos/azure-cosmos/README.md

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -668,7 +668,38 @@ as well as containing the list of failed responses for the failed request.
668668

669669
For more information on Transactional Batch, see [Azure Cosmos DB Transactional Batch][cosmos_transactional_batch].
670670

671-
### Public Preview - Vector Embeddings and Vector Indexes
671+
### Native Retryable Writes
672+
We have added native retryable writes to the SDK, a feature that can be used by customers who don't mind the
673+
non-idempotency of these retries and would instead like to ensure that the given operation executed in case of timeouts
674+
or connectivity issues (status codes 408, 5xx).
675+
676+
This feature can be enabled either at the client level, to retry all write operations under these conditions,
677+
or at the per-request level to enable the retries for an individual request.
678+
679+
If enabled at the client level, the one exception to the rule would be patch requests, since the operations can change
680+
the nature of the overall request - replace, set, and copy for example would be idempotent while add, move or remove would
681+
not be idempotent unless combined with patch precondition checks. So, for patch we allow opting-in into automatic
682+
retries only on the request options level.
683+
684+
The snippet below shows how to enable this feature at the client and request level:
685+
```python
686+
cosmos_client = CosmosClient(
687+
url=URL,
688+
credential=KEY,
689+
retry_write=True, # enables native retryable writes at the client level
690+
)
691+
692+
database = cosmos_client.get_database_client(DATABASE_NAME)
693+
container = database.get_container_client(CONTAINER_NAME)
694+
695+
container.create_item(
696+
item_body,
697+
retry_write=True # enables native retryable writes at the request level
698+
)
699+
```
700+
701+
702+
### Vector Embeddings and Vector Indexes
672703
We have added new capabilities to utilize vector embeddings and vector indexing for users to leverage vector
673704
search utilizing our Cosmos SDK. These two container-level configurations have to be turned on at the account-level
674705
before you can use them.
@@ -755,7 +786,7 @@ database.create_container(id=container_id, partition_key=PartitionKey(path="/id"
755786
```
756787
***Note: vector embeddings and vector indexes CANNOT be edited by container replace operations. They are only available directly through creation.***
757788

758-
### Public Preview - Vector Search
789+
### Vector Search
759790

760791
With the addition of the vector indexing and vector embedding capabilities, the SDK can now perform order by vector search queries.
761792
These queries specify the VectorDistance to use as a metric within the query text. These must always use a TOP or LIMIT clause within the query though,

sdk/cosmos/azure-cosmos/azure/cosmos/_base.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@
3737
from . import documents
3838
from . import http_constants
3939
from . import _runtime_constants
40+
from ._constants import _Constants as Constants
4041
from .auth import _get_authorization_header
4142
from .offer import ThroughputProperties
4243
from .partition_key import _Empty, _Undefined
@@ -62,6 +63,7 @@
6263
'query_version': 'queryVersion',
6364
'priority': 'priorityLevel',
6465
'no_response': 'responsePayloadOnWriteDisabled',
66+
'retry_write': Constants.Kwargs.RETRY_WRITE,
6567
'max_item_count': 'maxItemCount',
6668
'throughput_bucket': 'throughputBucket',
6769
'excluded_locations': 'excludedLocations'

sdk/cosmos/azure-cosmos/azure/cosmos/_constants.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,3 +78,9 @@ class _Constants:
7878
429: "TOO_MANY_REQUESTS",
7979
449: "RETRY_WITH - Conflicting request to resource has been attempted. Retry to avoid conflicts."
8080
}
81+
82+
class Kwargs:
83+
"""Keyword arguments used in the azure-cosmos package"""
84+
85+
RETRY_WRITE: Literal["retry_write"] = "retry_write"
86+
"""Whether to retry write operations if they fail. Used either at client level or request level."""

sdk/cosmos/azure-cosmos/azure/cosmos/_cosmos_client_connection.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2073,6 +2073,7 @@ def PatchItem(
20732073
documents._OperationType.Patch,
20742074
headers)
20752075
request_params.set_excluded_location_from_options(options)
2076+
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
20762077
request_data = {}
20772078
if options.get("filterPredicate"):
20782079
request_data["condition"] = options.get("filterPredicate")
@@ -2692,6 +2693,7 @@ def Create(
26922693

26932694
request_params = RequestObject(typ, documents._OperationType.Create, headers)
26942695
request_params.set_excluded_location_from_options(options)
2696+
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
26952697
result, last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)
26962698
self.last_response_headers = last_response_headers
26972699

@@ -2739,6 +2741,7 @@ def Upsert(
27392741
# Upsert will use WriteEndpoint since it uses POST operation
27402742
request_params = RequestObject(typ, documents._OperationType.Upsert, headers)
27412743
request_params.set_excluded_location_from_options(options)
2744+
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
27422745
result, last_response_headers = self.__Post(path, request_params, body, headers, **kwargs)
27432746
self.last_response_headers = last_response_headers
27442747
# update session for write request
@@ -2783,6 +2786,7 @@ def Replace(
27832786
# Replace will use WriteEndpoint since it uses PUT operation
27842787
request_params = RequestObject(typ, documents._OperationType.Replace, headers)
27852788
request_params.set_excluded_location_from_options(options)
2789+
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
27862790
result, last_response_headers = self.__Put(path, request_params, resource, headers, **kwargs)
27872791
self.last_response_headers = last_response_headers
27882792

@@ -2865,6 +2869,7 @@ def DeleteResource(
28652869
# Delete will use WriteEndpoint since it uses DELETE operation
28662870
request_params = RequestObject(typ, documents._OperationType.Delete, headers)
28672871
request_params.set_excluded_location_from_options(options)
2872+
request_params.set_retry_write(options, self.connection_policy.RetryNonIdempotentWrites)
28682873
result, last_response_headers = self.__Delete(path, request_params, headers, **kwargs)
28692874
self.last_response_headers = last_response_headers
28702875

sdk/cosmos/azure-cosmos/azure/cosmos/_request_object.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
"""Represents a request object.
2323
"""
2424
from typing import Optional, Mapping, Any, Dict, List
25+
from .documents import _OperationType
26+
from ._constants import _Constants as Constants
2527

2628
class RequestObject(object): # pylint: disable=too-many-instance-attributes
2729
def __init__(
@@ -43,6 +45,7 @@ def __init__(
4345
self.excluded_locations: Optional[List[str]] = None
4446
self.excluded_locations_circuit_breaker: List[str] = []
4547
self.healthy_tentative_location: Optional[str] = None
48+
self.retry_write: bool = False
4649

4750
def route_to_location_with_preferred_location_flag( # pylint: disable=name-too-long
4851
self,
@@ -80,5 +83,13 @@ def set_excluded_location_from_options(self, options: Mapping[str, Any]) -> None
8083
if self._can_set_excluded_location(options):
8184
self.excluded_locations = options['excludedLocations']
8285

86+
def set_retry_write(self, request_options: Mapping[str, Any], client_retry_write: bool) -> None:
87+
if request_options and request_options.get(Constants.Kwargs.RETRY_WRITE):
88+
# If request retry write is True, set the option
89+
self.retry_write = request_options[Constants.Kwargs.RETRY_WRITE]
90+
elif client_retry_write and self.operation_type != _OperationType.Patch:
91+
# If it is not a patch operation and the client config is set, set the retry write to True
92+
self.retry_write = client_retry_write
93+
8394
def set_excluded_locations_from_circuit_breaker(self, excluded_locations: List[str]) -> None: # pylint: disable=name-too-long
8495
self.excluded_locations_circuit_breaker = excluded_locations

sdk/cosmos/azure-cosmos/azure/cosmos/_retry_utility.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -263,7 +263,7 @@ def _handle_service_request_retries(
263263
raise exception
264264

265265
def _handle_service_response_retries(request, client, response_retry_policy, exception, *args):
266-
if request and _has_read_retryable_headers(request.headers):
266+
if request and (_has_read_retryable_headers(request.headers) or (args and is_write_retryable(args[0], client))):
267267
# we resolve the request endpoint to the next preferred region
268268
# once we are out of preferred regions we stop retrying
269269
retry_policy = response_retry_policy
@@ -274,6 +274,11 @@ def _handle_service_response_retries(request, client, response_retry_policy, exc
274274
else:
275275
raise exception
276276

277+
def is_write_retryable(request_params, client):
278+
return (request_params.retry_write or
279+
client.connection_policy.RetryNonIdempotentWrites and
280+
not request_params.operation_type == _OperationType.Patch)
281+
277282
def _configure_timeout(request: PipelineRequest, absolute: Optional[int], per_request: int) -> None:
278283
if absolute is not None:
279284
if absolute <= 0:

sdk/cosmos/azure-cosmos/azure/cosmos/_service_response_retry_policy.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,13 @@ def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper,
2121
self.connection_policy = connection_policy
2222
self.request = args[0] if args else None
2323
if self.request:
24+
if self.request.retry_write:
25+
# If the request is a write operation, we set the maximum retry count to be the number of
26+
# write regional routing contexts available in the global endpoint manager.
27+
# This ensures that we retry the write operation across all available regions.
28+
# We also ensure that we retry at least once, hence the max is set to 2 by default.
29+
self.max_write_retry_count = max(len(self.global_endpoint_manager.
30+
location_cache.write_regional_routing_contexts), 2)
2431
self.location_endpoint = (self.global_endpoint_manager
2532
.resolve_service_endpoint_for_partition(self.request, pk_range_wrapper))
2633
self.logger = logging.getLogger('azure.cosmos.ServiceResponseRetryPolicy')
@@ -35,15 +42,21 @@ def ShouldRetry(self):
3542
return False
3643

3744
# Check if the next retry about to be done is safe
38-
if (self.failover_retry_count + 1) >= self.total_retries:
45+
if ((self.failover_retry_count + 1) >= self.total_retries and
46+
_OperationType.IsReadOnlyOperation(self.request.operation_type)):
3947
return False
4048

4149
if self.request:
42-
if not _OperationType.IsReadOnlyOperation(self.request.operation_type):
43-
return False
4450

51+
if not _OperationType.IsReadOnlyOperation(self.request.operation_type) and not self.request.retry_write:
52+
return False
53+
if self.request.retry_write and self.failover_retry_count + 1 >= self.max_write_retry_count:
54+
# If we have already retried the write operation to the maximum allowed number of times,
55+
# we do not retry further.
56+
return False
4557
self.location_endpoint = self.resolve_next_region_service_endpoint()
4658
self.request.route_to_location(self.location_endpoint)
59+
4760
return True
4861

4962
# This function prepares the request to go to the next region

sdk/cosmos/azure-cosmos/azure/cosmos/_timeout_failover_retry_policy.py

Lines changed: 23 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -11,14 +11,23 @@ class _TimeoutFailoverRetryPolicy(object):
1111

1212
def __init__(self, connection_policy, global_endpoint_manager, pk_range_wrapper, *args):
1313
self.retry_after_in_milliseconds = 500
14+
self.args = args
15+
self.request = args[0] if args else None
16+
1417
self.global_endpoint_manager = global_endpoint_manager
1518
self.pk_range_wrapper = pk_range_wrapper
1619
# If an account only has 1 region, then we still want to retry once on the same region
17-
self._max_retry_attempt_count = (len(self.global_endpoint_manager.location_cache.read_regional_routing_contexts)
18-
+ 1)
20+
# We want this to be the default retry attempts as paging through a query means there are requests without
21+
# a request object
22+
self._max_retry_attempt_count = len(self.global_endpoint_manager.location_cache
23+
.read_regional_routing_contexts) + 1
24+
# If the request is a write operation, we only want to retry once if retry write is enabled
25+
if self.request and _OperationType.IsWriteOperation(self.request.operation_type):
26+
self._max_retry_attempt_count = len(
27+
self.global_endpoint_manager.location_cache.write_regional_routing_contexts
28+
) + 1
1929
self.retry_count = 0
2030
self.connection_policy = connection_policy
21-
self.request = args[0] if args else None
2231

2332
def ShouldRetry(self, _exception):
2433
"""Returns true if the request should retry based on the passed-in exception.
@@ -27,8 +36,8 @@ def ShouldRetry(self, _exception):
2736
:returns: a boolean stating whether the request should be retried
2837
:rtype: bool
2938
"""
30-
# we don't retry on write operations for timeouts or any internal server errors
31-
if self.request and (not _OperationType.IsReadOnlyOperation(self.request.operation_type)):
39+
# we retry only if the request is a read operation or if it is a write operation with retry enabled
40+
if self.request and not self.is_operation_retryable():
3241
return False
3342

3443
if not self.connection_policy.EnableEndpointDiscovery:
@@ -39,7 +48,10 @@ def ShouldRetry(self, _exception):
3948
if self.retry_count >= self._max_retry_attempt_count:
4049
return False
4150

42-
if self.request:
51+
# second check here ensures we only do cross-regional retries for read requests
52+
# non-idempotent write retries should only be retried once, using preferred locations if available (MM)
53+
if self.request and (_OperationType.IsReadOnlyOperation(self.request.operation_type)
54+
or self.global_endpoint_manager.can_use_multiple_write_locations(self.request)):
4355
location_endpoint = self.resolve_next_region_service_endpoint()
4456
self.request.route_to_location(location_endpoint)
4557
return True
@@ -56,3 +68,8 @@ def resolve_next_region_service_endpoint(self):
5668
# Resolve the endpoint for the request and pin the resolution to the resolved endpoint
5769
# This enables marking the endpoint unavailability on endpoint failover/unreachability
5870
return self.global_endpoint_manager.resolve_service_endpoint_for_partition(self.request, self.pk_range_wrapper)
71+
72+
def is_operation_retryable(self):
73+
if _OperationType.IsReadOnlyOperation(self.request.operation_type):
74+
return True
75+
return self.request.retry_write

0 commit comments

Comments
 (0)