
Commit 56b6176 (parent: 6b0dcee)

Adding default retry configuration changes for async cluster client

File tree: 6 files changed, +124 / -85 lines

redis/asyncio/cluster.py (+58, -40)

@@ -29,7 +29,7 @@
 from redis.asyncio.lock import Lock
 from redis.asyncio.retry import Retry
 from redis.auth.token import TokenInterface
-from redis.backoff import default_backoff
+from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
 from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
 from redis.cluster import (
     PIPELINE_BLOCKED_COMMANDS,
@@ -143,19 +143,23 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
     To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
     0.
     :param cluster_error_retry_attempts:
-        | Number of times to retry before raising an error when :class:`~.TimeoutError`
-          or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
-    :param connection_error_retry_attempts:
-        | Number of times to retry before reinitializing when :class:`~.TimeoutError`
-          or :class:`~.ConnectionError` are encountered.
-          The default backoff strategy will be set if Retry object is not passed (see
-          default_backoff in backoff.py). To change it, pass a custom Retry object
-          using the "retry" keyword.
+        | @deprecated - Please configure the 'retry' object instead
+          In case the 'retry' object is set, this argument is ignored!
+
+          Number of times to retry before raising an error when :class:`~.TimeoutError`,
+          :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
+          or :class:`~.ClusterDownError` are encountered
+    :param retry:
+        | A retry object that defines the retry strategy and the number of
+          retries for the cluster client.
+          In the current implementation for the cluster client (starting from redis-py
+          version 6.0.0) the retry object is not yet fully utilized; it is used only to
+          determine the number of retries for the cluster client.
+          In future releases the retry object will be used to handle the cluster client retries!
     :param max_connections:
         | Maximum number of connections per node. If there are no free connections & the
          maximum number of connections are already created, a
-          :class:`~.MaxConnectionsError` is raised. This error may be retried as defined
-          by :attr:`connection_error_retry_attempts`
+          :class:`~.MaxConnectionsError` is raised.
     :param address_remap:
         | An optional callable which, when provided with an internal network
          address of a node, e.g. a `(host, port)` tuple, will return the address
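
Note: the docstring above describes the new recommended configuration path. Below is a minimal sketch of it, assuming a cluster reachable at localhost:6379 and an arbitrary retry count of 5; aclose() is used for cleanup on recent redis-py versions.

    import asyncio

    from redis.asyncio.cluster import RedisCluster
    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialWithJitterBackoff

    async def main() -> None:
        # Preferred in 6.0.0+: carry the retry budget in a Retry object
        # instead of the deprecated cluster_error_retry_attempts argument.
        rc = RedisCluster(
            host="localhost",
            port=6379,
            retry=Retry(
                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
                retries=5,
            ),
        )
        print(await rc.ping())
        await rc.aclose()

    asyncio.run(main())
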
@@ -211,10 +215,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
     __slots__ = (
         "_initialize",
         "_lock",
-        "cluster_error_retry_attempts",
+        "retry",
         "command_flags",
         "commands_parser",
-        "connection_error_retry_attempts",
         "connection_kwargs",
         "encoder",
         "node_flags",
@@ -231,6 +234,13 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
         reason="Please configure the 'load_balancing_strategy' instead",
         version="5.0.3",
     )
+    @deprecated_args(
+        args_to_warn=[
+            "cluster_error_retry_attempts",
+        ],
+        reason="Please configure the 'retry' object instead",
+        version="6.0.0",
+    )
     def __init__(
         self,
         host: Optional[str] = None,
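
To see the decorator's effect, the following sketch records the warning emitted when the deprecated argument is still passed. The host/port are placeholders, and the exact warning text and category come from deprecated_args and may differ.

    import warnings

    from redis.asyncio.cluster import RedisCluster

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        # Still accepted, but now warns; it is ignored whenever 'retry' is also set.
        rc = RedisCluster(host="localhost", port=6379, cluster_error_retry_attempts=5)

    for w in caught:
        print(w.category.__name__, w.message)
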
@@ -242,8 +252,9 @@ def __init__(
         load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
         reinitialize_steps: int = 5,
         cluster_error_retry_attempts: int = 3,
-        connection_error_retry_attempts: int = 3,
         max_connections: int = 2**31,
+        retry: Optional["Retry"] = None,
+        retry_on_error: Optional[List[Type[Exception]]] = None,
         # Client related kwargs
         db: Union[str, int] = 0,
         path: Optional[str] = None,
@@ -263,8 +274,6 @@ def __init__(
         socket_keepalive: bool = False,
         socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
         socket_timeout: Optional[float] = None,
-        retry: Optional["Retry"] = None,
-        retry_on_error: Optional[List[Type[Exception]]] = None,
         # SSL related kwargs
         ssl: bool = False,
         ssl_ca_certs: Optional[str] = None,
@@ -318,7 +327,6 @@ def __init__(
             "socket_keepalive": socket_keepalive,
             "socket_keepalive_options": socket_keepalive_options,
             "socket_timeout": socket_timeout,
-            "retry": retry,
             "protocol": protocol,
         }

@@ -342,17 +350,15 @@ def __init__(
         # Call our on_connect function to configure READONLY mode
         kwargs["redis_connect_func"] = self.on_connect

-        self.retry = retry
-        if retry or retry_on_error or connection_error_retry_attempts > 0:
-            # Set a retry object for all cluster nodes
-            self.retry = retry or Retry(
-                default_backoff(), connection_error_retry_attempts
+        if retry:
+            self.retry = retry
+        else:
+            self.retry = Retry(
+                backoff=ExponentialWithJitterBackoff(base=1, cap=10),
+                retries=cluster_error_retry_attempts,
             )
-            if not retry_on_error:
-                # Default errors for retrying
-                retry_on_error = [ConnectionError, TimeoutError]
+        if retry_on_error:
             self.retry.update_supported_errors(retry_on_error)
-            kwargs.update({"retry": self.retry})

         kwargs["response_callbacks"] = _RedisCallbacks.copy()
         if kwargs.get("protocol") in ["3", 3]:
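
In other words, when no retry argument is given, the client now builds a default equivalent to the sketch below (the variable name is illustrative only):

    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialWithJitterBackoff

    # Exponential backoff with jitter, capped at 10 seconds, and the retry
    # count taken from cluster_error_retry_attempts (which defaults to 3).
    default_cluster_retry = Retry(
        backoff=ExponentialWithJitterBackoff(base=1, cap=10),
        retries=3,
    )
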
@@ -389,8 +395,6 @@ def __init__(
         self.read_from_replicas = read_from_replicas
         self.load_balancing_strategy = load_balancing_strategy
         self.reinitialize_steps = reinitialize_steps
-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
-        self.connection_error_retry_attempts = connection_error_retry_attempts
         self.reinitialize_counter = 0
         self.commands_parser = AsyncCommandsParser()
         self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -561,15 +565,13 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
         """Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
         return self.connection_kwargs

-    def get_retry(self) -> Optional["Retry"]:
+    def get_retry(self) -> Retry:
         return self.retry

-    def set_retry(self, retry: "Retry") -> None:
+    def set_retry(self, retry: Retry) -> None:
+        if not isinstance(retry, Retry):
+            raise TypeError("retry must be a valid instance of redis.retry.Retry")
         self.retry = retry
-        for node in self.get_nodes():
-            node.connection_kwargs.update({"retry": retry})
-            for conn in node._connections:
-                conn.retry = retry

     def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
         """Set a custom response callback."""
@@ -688,8 +690,8 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
         """
         Execute a raw command on the appropriate cluster node or target_nodes.

-        It will retry the command as specified by :attr:`cluster_error_retry_attempts` &
-        then raise an exception.
+        It will retry the command as specified by the retries property of
+        the :attr:`retry` & then raise an exception.

         :param args:
             | Raw command args
@@ -705,7 +707,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
         command = args[0]
         target_nodes = []
         target_nodes_specified = False
-        retry_attempts = self.cluster_error_retry_attempts
+        retry_attempts = self.retry.get_retries()

         passed_targets = kwargs.pop("target_nodes", None)
         if passed_targets and not self._is_node_flag(passed_targets):
@@ -1042,7 +1044,23 @@ def acquire_connection(self) -> Connection:
             return self._free.popleft()
         except IndexError:
             if len(self._connections) < self.max_connections:
-                connection = self.connection_class(**self.connection_kwargs)
+                # We are configuring the connection pool not to retry
+                # connections on lower level clients to avoid retrying
+                # connections to nodes that are not reachable
+                # and to avoid blocking the connection pool.
+                # The only error that will have some handling in the lower
+                # level clients is ConnectionError which will trigger disconnection
+                # of the socket.
+                # The retries will be handled on cluster client level
+                # where we will have proper handling of the cluster topology
+                retry = Retry(
+                    backoff=NoBackoff(),
+                    retries=0,
+                    supported_errors=(ConnectionError,),
+                )
+                connection_kwargs = self.connection_kwargs.copy()
+                connection_kwargs["retry"] = retry
+                connection = self.connection_class(**connection_kwargs)
                 self._connections.append(connection)
                 return connection

@@ -1538,7 +1556,7 @@ async def execute(
         """
         Execute the pipeline.

-        It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
+        It will retry the commands as specified by the retries configured in :attr:`retry`
         & then raise an exception.

         :param raise_on_error:
@@ -1554,7 +1572,7 @@ async def execute(
             return []

         try:
-            retry_attempts = self._client.cluster_error_retry_attempts
+            retry_attempts = self._client.retry.get_retries()
             while True:
                 try:
                     if self._client._initialize:
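
A hedged usage sketch: the pipeline's retry budget now follows the client's Retry object, so no pipeline-specific retry setting is needed. Here rc is assumed to be an async RedisCluster and the key names are arbitrary.

    async def run_pipeline(rc) -> None:
        async with rc.pipeline() as pipe:
            pipe.set("key:{tag}", "value")
            pipe.get("key:{tag}")
            # Retried up to rc.get_retry().get_retries() times on cluster errors.
            print(await pipe.execute())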

redis/asyncio/retry.py (+14, -0)

@@ -43,6 +43,20 @@ def update_supported_errors(self, specified_errors: list):
             set(self._supported_errors + tuple(specified_errors))
         )

+    def get_retries(self) -> int:
+        """
+        Get the number of retries.
+        """
+        return self._retries
+
+    def update_retries(self, value: int) -> None:
+        """
+        Set the number of retries.
+        """
+        if not isinstance(value, int):
+            raise ValueError("Retries must be an integer.")
+        self._retries = value
+
     async def call_with_retry(
         self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any]
     ) -> T:
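
The two new accessors can be exercised directly on a Retry instance; a minimal sketch with arbitrary values:

    from redis.asyncio.retry import Retry
    from redis.backoff import ExponentialWithJitterBackoff

    retry = Retry(backoff=ExponentialWithJitterBackoff(base=1, cap=10), retries=3)
    print(retry.get_retries())  # 3

    retry.update_retries(6)  # raise the budget; the cluster client reads this value
    print(retry.get_retries())  # 6

    try:
        retry.update_retries("many")
    except ValueError as err:
        print(err)  # Retries must be an integer.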

redis/cluster.py (+5, -7)

@@ -665,7 +665,6 @@ def __init__(
         if (cache_config or cache) and protocol not in [3, "3"]:
             raise RedisError("Client caching is only supported with RESP version 3")

-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
         self.command_flags = self.__class__.COMMAND_FLAGS.copy()
         self.node_flags = self.__class__.NODE_FLAGS.copy()
         self.read_from_replicas = read_from_replicas
@@ -849,7 +848,7 @@ def pipeline(self, transaction=None, shard_hint=None):
             startup_nodes=self.nodes_manager.startup_nodes,
             result_callbacks=self.result_callbacks,
             cluster_response_callbacks=self.cluster_response_callbacks,
-            cluster_error_retry_attempts=self.cluster_error_retry_attempts,
+            cluster_error_retry_attempts=self.retry.get_retries(),
             read_from_replicas=self.read_from_replicas,
             load_balancing_strategy=self.load_balancing_strategy,
             reinitialize_steps=self.reinitialize_steps,
@@ -1115,8 +1114,8 @@ def _internal_execute_command(self, *args, **kwargs):
         """
         Wrapper for ERRORS_ALLOW_RETRY error handling.

-        It will try the number of times specified by the config option
-        "self.cluster_error_retry_attempts" which defaults to 3 unless manually
+        It will try the number of times specified by the retries property from
+        config option "self.retry" which defaults to 3 unless manually
         configured.

         If it reaches the number of times, the command will raise the exception
@@ -1606,7 +1605,7 @@ def create_redis_connections(self, nodes):
             )

     def create_redis_node(self, host, port, **kwargs):
-        # We are configuring the connection pool to not retry
+        # We are configuring the connection pool not to retry
         # connections on lower level clients to avoid retrying
         # connections to nodes that are not reachable
         # and to avoid blocking the connection pool.
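
The comment fixed above describes the same policy the async pool now adopts in acquire_connection: node-level connections get a no-op retry so that all retrying happens at the cluster client. Restated as a standalone sketch with its imports (the variable name is illustrative, not a public API; the async client mirrors this with redis.asyncio.retry.Retry):

    from redis.retry import Retry
    from redis.backoff import NoBackoff
    from redis.exceptions import ConnectionError

    # Low-level connections never retry; ConnectionError is still "supported"
    # so the socket gets disconnected and the failure bubbles up to the
    # cluster client, which owns the real retry loop.
    node_level_retry = Retry(
        backoff=NoBackoff(),
        retries=0,
        supported_errors=(ConnectionError,),
    )
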
@@ -2110,7 +2109,6 @@ def __init__(
         self.load_balancing_strategy = load_balancing_strategy
         self.command_flags = self.__class__.COMMAND_FLAGS.copy()
         self.cluster_response_callbacks = cluster_response_callbacks
-        self.cluster_error_retry_attempts = cluster_error_retry_attempts
         self.reinitialize_counter = 0
         self.reinitialize_steps = reinitialize_steps
         if retry is not None:
@@ -2246,7 +2244,7 @@ def send_cluster_commands(
         - refereh_table_asap set to True

         It will try the number of times specified by
-        the config option "self.cluster_error_retry_attempts"
+        the retries in config option "self.retry"
         which defaults to 3 unless manually configured.

         If it reaches the number of times, the command will

redis/commands/json/__init__.py (+1, -1)

@@ -120,7 +120,7 @@ def pipeline(self, transaction=True, shard_hint=None):
                 startup_nodes=self.client.nodes_manager.startup_nodes,
                 result_callbacks=self.client.result_callbacks,
                 cluster_response_callbacks=self.client.cluster_response_callbacks,
-                cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+                cluster_error_retry_attempts=self.client.retry.get_retries(),
                 read_from_replicas=self.client.read_from_replicas,
                 reinitialize_steps=self.client.reinitialize_steps,
                 lock=self.client._lock,

redis/commands/timeseries/__init__.py (+1, -1)

@@ -84,7 +84,7 @@ def pipeline(self, transaction=True, shard_hint=None):
                 startup_nodes=self.client.nodes_manager.startup_nodes,
                 result_callbacks=self.client.result_callbacks,
                 cluster_response_callbacks=self.client.cluster_response_callbacks,
-                cluster_error_retry_attempts=self.client.cluster_error_retry_attempts,
+                cluster_error_retry_attempts=self.client.retry.get_retries(),
                 read_from_replicas=self.client.read_from_replicas,
                 reinitialize_steps=self.client.reinitialize_steps,
                 lock=self.client._lock,
