Commit a02fa71

petyaslavova authored and ManelCoutinhoSensei committed
Adding default retry configuration changes for cluster clients (redis#3622)
* Adding default retry configuration changes for sync cluster client
* Adding default retry configuration changes for sync cluster client
* Adding default retry configuration changes for async cluster client
* Updating docs related to retries and read_from_replicas.
* Applying review comments.
* Removing retry checks when using set_retry for cluster clients.
1 parent f6eb2d8 commit a02fa71

11 files changed, +283 -153 lines changed

docs/clustering.rst

Lines changed: 7 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -187,8 +187,8 @@ When a ClusterPubSub instance is created without specifying a node, a
187187
single node will be transparently chosen for the pubsub connection on
188188
the first command execution. The node will be determined by: 1. Hashing
189189
the channel name in the request to find its keyslot 2. Selecting a node
190-
that handles the keyslot: If read_from_replicas is set to true, a
191-
replica can be selected.
190+
that handles the keyslot: If read_from_replicas is set to true or
191+
load_balancing_strategy is provided, a replica can be selected.
192192
193193
Known PubSub Limitations
194194
------------------------
@@ -216,9 +216,12 @@ By default, Redis Cluster always returns MOVE redirection response on
216216
accessing a replica node. You can overcome this limitation and scale
217217
read commands by triggering READONLY mode.
218218
219-
To enable READONLY mode pass read_from_replicas=True to RedisCluster
220-
constructor. When set to true, read commands will be assigned between
219+
To enable READONLY mode pass read_from_replicas=True or define
220+
a load_balancing_strategy to RedisCluster constructor.
221+
When read_from_replicas is set to true read commands will be assigned between
221222
the primary and its replications in a Round-Robin manner.
223+
With load_balancing_strategy you can define a custom strategy for
224+
assigning read commands to the replicas and primary nodes.
222225
223226
READONLY mode can be set at runtime by calling the readonly() method
224227
with target_nodes=‘replicas’, and read-write access can be restored by

docs/retry.rst

Lines changed: 23 additions & 20 deletions
@@ -13,25 +13,25 @@ Retry in Redis Standalone
1313
>>> from redis.client import Redis
1414
>>> from redis.exceptions import (
1515
>>> BusyLoadingError,
16-
>>> ConnectionError,
17-
>>> TimeoutError
16+
>>> RedisError,
1817
>>> )
1918
>>>
2019
>>> # Run 3 retries with exponential backoff strategy
2120
>>> retry = Retry(ExponentialBackoff(), 3)
22-
>>> # Redis client with retries on custom errors
23-
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, ConnectionError, TimeoutError])
24-
>>> # Redis client with retries on TimeoutError only
25-
>>> r_only_timeout = Redis(host='localhost', port=6379, retry=retry, retry_on_timeout=True)
21+
>>> # Redis client with retries on custom errors in addition to the errors
22+
>>> # that are already retried by default
23+
>>> r = Redis(host='localhost', port=6379, retry=retry, retry_on_error=[BusyLoadingError, RedisError])
2624

27-
As you can see from the example above, Redis client supports 3 parameters to configure the retry behaviour:
25+
As you can see from the example above, Redis client supports 2 parameters to configure the retry behaviour:
2826

2927
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries
30-
* ``retry_on_error``: list of :ref:`exceptions-label` to retry on
31-
* ``retry_on_timeout``: if ``True``, retry on :class:`~.TimeoutError` only
28+
* The :class:`~.Retry` instance has a default set of :ref:`exceptions-label` to retry on,
29+
which can be overridden by passing a tuple with :ref:`exceptions-label` to the ``supported_errors`` parameter.
30+
* ``retry_on_error``: list of additional :ref:`exceptions-label` to retry on
3231

33-
If either ``retry_on_error`` or ``retry_on_timeout`` are passed and no ``retry`` is given,
34-
by default it uses a ``Retry(NoBackoff(), 1)`` (meaning 1 retry right after the first failure).
32+
33+
If no ``retry`` is provided, a default one is created with :class:`~.ExponentialWithJitterBackoff` as backoff strategy
34+
and 3 retries.
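For intuition about the new default, the sketch below computes the delays an exponential backoff with jitter might produce. The helper names `jittered_delay` and `delays_for` are hypothetical, and the formula (a uniformly random fraction of `base * 2**failures`, capped at `cap`) is an assumption chosen for illustration; the library's `ExponentialWithJitterBackoff` may compute delays differently. The `base=1` and `cap=10` defaults are borrowed from the cluster default quoted later in this file.

```python
import random
from typing import List, Optional


def jittered_delay(failures: int, base: float = 1.0, cap: float = 10.0,
                   rng: Optional[random.Random] = None) -> float:
    """Return a delay in seconds after `failures` consecutive failures.

    Hypothetical helper: "full jitter" exponential backoff, capped at `cap`.
    """
    rng = rng or random.Random()
    return min(cap, rng.random() * base * 2 ** failures)


def delays_for(retries: int, seed: int = 0) -> List[float]:
    """Delays that would precede each of `retries` retry attempts."""
    rng = random.Random(seed)
    return [jittered_delay(n, rng=rng) for n in range(1, retries + 1)]
```

With three retries, `delays_for(3)` yields three delays, each between 0 and the 10-second cap; the jitter spreads out clients that fail at the same moment.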
3535

3636

3737
Retry in Redis Cluster
@@ -44,27 +44,30 @@ Retry in Redis Cluster
4444
>>> # Run 3 retries with exponential backoff strategy
4545
>>> retry = Retry(ExponentialBackoff(), 3)
4646
>>> # Redis Cluster client with retries
47-
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry, cluster_error_retry_attempts=2)
47+
>>> rc = RedisCluster(host='localhost', port=6379, retry=retry)
4848

4949
Retry behaviour in Redis Cluster is a little bit different from Standalone:
5050

51-
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(NoBackoff(), 0)``
52-
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError` or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered, default value is ``3``
51+
* ``retry``: :class:`~.Retry` instance with a :ref:`backoff-label` strategy and the max number of retries, default value is ``Retry(ExponentialWithJitterBackoff(base=1, cap=10), cluster_error_retry_attempts)``
52+
* ``cluster_error_retry_attempts``: number of times to retry before raising an error when :class:`~.TimeoutError`, :class:`~.ConnectionError`, :class:`~.ClusterDownError` or :class:`~.SlotNotCoveredError` are encountered, default value is ``3``
53+
* This argument is deprecated - it is used to initialize the number of retries for the retry object,
54+
only in the case when the ``retry`` object is not provided.
55+
When the ``retry`` argument is provided, the ``cluster_error_retry_attempts`` argument is ignored!
56+
57+
* The retry object is not yet fully utilized in the cluster client.
58+
The retry object is used only to determine the number of retries for the cluster level calls.
5359

5460
Let's consider the following example:
5561

5662
>>> from redis.backoff import ExponentialBackoff
5763
>>> from redis.retry import Retry
5864
>>> from redis.cluster import RedisCluster
5965
>>>
60-
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6), cluster_error_retry_attempts=1)
66+
>>> rc = RedisCluster(host='localhost', port=6379, retry=Retry(ExponentialBackoff(), 6))
6167
>>> rc.set('foo', 'bar')
6268

6369
#. the client library calculates the hash slot for key 'foo'.
6470
#. given the hash slot, it then determines which node to connect to, in order to execute the command.
6571
#. during the connection, a :class:`~.ConnectionError` is raised.
66-
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the client tries to reconnect to the node up to 6 times, with an exponential backoff between each attempt.
67-
#. even after 6 retries, the client is still unable to connect.
68-
#. because we set ``cluster_error_retry_attempts=1``, before giving up, the client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
69-
#. after the cluster has been re-initialized, it starts a new cycle of retries, up to 6 retries, with an exponential backoff.
70-
#. if the client can connect, we're good. Otherwise, the exception is finally raised to the caller, because we've run out of attempts.
72+
#. because we set ``retry=Retry(ExponentialBackoff(), 6)``, the cluster client starts a cluster update, removes the failed node from the startup nodes, and re-initializes the cluster.
73+
#. the cluster client retries the command until it either succeeds or the max number of retries is reached.
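The steps above can be sketched as a single cluster-level loop. Everything here is a hypothetical stand-in (`FakeCluster`, `NodeUnreachable`, `execute_with_cluster_retries`) rather than redis-py code. It only illustrates that the retry count bounds the number of re-initialize-and-retry cycles. No backoff sleep is modelled, in line with the note that the retry object currently supplies only the number of retries for cluster-level calls.

```python
class NodeUnreachable(ConnectionError):
    """Stand-in for the client's connection error (hypothetical)."""


class FakeCluster:
    """Toy cluster whose first `failures` command attempts fail."""

    def __init__(self, failures: int) -> None:
        self.failures = failures
        self.reinitializations = 0

    def send(self, command: str) -> str:
        if self.failures > 0:
            self.failures -= 1
            raise NodeUnreachable("node is down")
        return "OK"

    def reinitialize(self) -> None:
        # A real client would refresh its slot-to-node mapping here.
        self.reinitializations += 1


def execute_with_cluster_retries(cluster: FakeCluster, command: str,
                                 retries: int) -> str:
    """Run `command`, re-initializing and retrying up to `retries` times."""
    attempts_left = retries
    while True:
        try:
            return cluster.send(command)
        except NodeUnreachable:
            if attempts_left == 0:
                raise  # out of retries: surface the error to the caller
            attempts_left -= 1
            cluster.reinitialize()
```

In this sketch a budget of 6 retries allows at most 7 attempts in total: the initial call plus one per retry.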

redis/asyncio/cluster.py

Lines changed: 55 additions & 42 deletions
@@ -29,7 +29,7 @@
2929
from redis.asyncio.lock import Lock
3030
from redis.asyncio.retry import Retry
3131
from redis.auth.token import TokenInterface
32-
from redis.backoff import default_backoff
32+
from redis.backoff import ExponentialWithJitterBackoff, NoBackoff
3333
from redis.client import EMPTY_RESPONSE, NEVER_DECODE, AbstractRedis
3434
from redis.cluster import (
3535
PIPELINE_BLOCKED_COMMANDS,
@@ -143,19 +143,23 @@ class RedisCluster(AbstractRedis, AbstractRedisCluster, AsyncRedisClusterCommand
143143
To avoid reinitializing the cluster on moved errors, set reinitialize_steps to
144144
0.
145145
:param cluster_error_retry_attempts:
146-
| Number of times to retry before raising an error when :class:`~.TimeoutError`
147-
or :class:`~.ConnectionError` or :class:`~.ClusterDownError` are encountered
148-
:param connection_error_retry_attempts:
149-
| Number of times to retry before reinitializing when :class:`~.TimeoutError`
150-
or :class:`~.ConnectionError` are encountered.
151-
The default backoff strategy will be set if Retry object is not passed (see
152-
default_backoff in backoff.py). To change it, pass a custom Retry object
153-
using the "retry" keyword.
146+
| @deprecated - Please configure the 'retry' object instead
147+
In case 'retry' object is set - this argument is ignored!
148+
149+
Number of times to retry before raising an error when :class:`~.TimeoutError`,
150+
:class:`~.ConnectionError`, :class:`~.SlotNotCoveredError`
151+
or :class:`~.ClusterDownError` are encountered
152+
:param retry:
153+
| A retry object that defines the retry strategy and the number of
154+
retries for the cluster client.
155+
In current implementation for the cluster client (starting from redis-py version 6.0.0)
156+
the retry object is not yet fully utilized, instead it is used just to determine
157+
the number of retries for the cluster client.
158+
In the future releases the retry object will be used to handle the cluster client retries!
154159
:param max_connections:
155160
| Maximum number of connections per node. If there are no free connections & the
156161
maximum number of connections are already created, a
157-
:class:`~.MaxConnectionsError` is raised. This error may be retried as defined
158-
by :attr:`connection_error_retry_attempts`
162+
:class:`~.MaxConnectionsError` is raised.
159163
:param address_remap:
160164
| An optional callable which, when provided with an internal network
161165
address of a node, e.g. a `(host, port)` tuple, will return the address
@@ -211,10 +215,9 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
211215
__slots__ = (
212216
"_initialize",
213217
"_lock",
214-
"cluster_error_retry_attempts",
218+
"retry",
215219
"command_flags",
216220
"commands_parser",
217-
"connection_error_retry_attempts",
218221
"connection_kwargs",
219222
"encoder",
220223
"node_flags",
@@ -231,6 +234,13 @@ def from_url(cls, url: str, **kwargs: Any) -> "RedisCluster":
231234
reason="Please configure the 'load_balancing_strategy' instead",
232235
version="5.3.0",
233236
)
237+
@deprecated_args(
238+
args_to_warn=[
239+
"cluster_error_retry_attempts",
240+
],
241+
reason="Please configure the 'retry' object instead",
242+
version="6.0.0",
243+
)
234244
def __init__(
235245
self,
236246
host: Optional[str] = None,
@@ -242,8 +252,9 @@ def __init__(
242252
load_balancing_strategy: Optional[LoadBalancingStrategy] = None,
243253
reinitialize_steps: int = 5,
244254
cluster_error_retry_attempts: int = 3,
245-
connection_error_retry_attempts: int = 3,
246255
max_connections: int = 2**31,
256+
retry: Optional["Retry"] = None,
257+
retry_on_error: Optional[List[Type[Exception]]] = None,
247258
# Client related kwargs
248259
db: Union[str, int] = 0,
249260
path: Optional[str] = None,
@@ -264,8 +275,6 @@ def __init__(
264275
socket_keepalive: bool = False,
265276
socket_keepalive_options: Optional[Mapping[int, Union[int, bytes]]] = None,
266277
socket_timeout: Optional[float] = None,
267-
retry: Optional["Retry"] = None,
268-
retry_on_error: Optional[List[Type[Exception]]] = None,
269278
# SSL related kwargs
270279
ssl: bool = False,
271280
ssl_ca_certs: Optional[str] = None,
@@ -320,7 +329,6 @@ def __init__(
320329
"socket_keepalive": socket_keepalive,
321330
"socket_keepalive_options": socket_keepalive_options,
322331
"socket_timeout": socket_timeout,
323-
"retry": retry,
324332
"protocol": protocol,
325333
}
326334

@@ -344,17 +352,15 @@ def __init__(
344352
# Call our on_connect function to configure READONLY mode
345353
kwargs["redis_connect_func"] = self.on_connect
346354

347-
self.retry = retry
348-
if retry or retry_on_error or connection_error_retry_attempts > 0:
349-
# Set a retry object for all cluster nodes
350-
self.retry = retry or Retry(
351-
default_backoff(), connection_error_retry_attempts
355+
if retry:
356+
self.retry = retry
357+
else:
358+
self.retry = Retry(
359+
backoff=ExponentialWithJitterBackoff(base=1, cap=10),
360+
retries=cluster_error_retry_attempts,
352361
)
353-
if not retry_on_error:
354-
# Default errors for retrying
355-
retry_on_error = [ConnectionError, TimeoutError]
362+
if retry_on_error:
356363
self.retry.update_supported_errors(retry_on_error)
357-
kwargs.update({"retry": self.retry})
358364

359365
kwargs["response_callbacks"] = _RedisCallbacks.copy()
360366
if kwargs.get("protocol") in ["3", 3]:
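The constructor change in this hunk reduces to a small precedence rule: an explicit `retry` wins, otherwise one is built from the deprecated `cluster_error_retry_attempts`, and `retry_on_error` extends whichever object results. The sketch below mirrors that rule with a hypothetical `SketchRetry` class and `resolve_retry` function so it can run without redis-py installed. The backoff object is omitted, and the default supported errors shown are an assumption.

```python
from typing import List, Optional, Tuple, Type


class SketchRetry:
    """Minimal stand-in for a Retry object (hypothetical, no backoff)."""

    def __init__(
        self,
        retries: int,
        # Assumed defaults, for illustration only.
        supported_errors: Tuple[Type[Exception], ...] = (ConnectionError, TimeoutError),
    ) -> None:
        self._retries = retries
        self._supported_errors = supported_errors

    def get_retries(self) -> int:
        return self._retries

    def update_supported_errors(self, specified_errors: List[Type[Exception]]) -> None:
        # Merge the new errors in, dropping duplicates via a set.
        self._supported_errors = tuple(
            set(self._supported_errors + tuple(specified_errors))
        )


def resolve_retry(
    retry: Optional[SketchRetry] = None,
    cluster_error_retry_attempts: int = 3,
    retry_on_error: Optional[List[Type[Exception]]] = None,
) -> SketchRetry:
    """Pick the effective retry object the way the hunk above does."""
    if retry is None:
        # Deprecated argument is consulted only when no retry is given.
        retry = SketchRetry(retries=cluster_error_retry_attempts)
    if retry_on_error:
        retry.update_supported_errors(retry_on_error)
    return retry
```

Passing both `retry=SketchRetry(retries=6)` and `cluster_error_retry_attempts=1` gives a retry count of 6, which is the "argument is ignored" behaviour described in the docstring.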
@@ -391,8 +397,6 @@ def __init__(
391397
self.read_from_replicas = read_from_replicas
392398
self.load_balancing_strategy = load_balancing_strategy
393399
self.reinitialize_steps = reinitialize_steps
394-
self.cluster_error_retry_attempts = cluster_error_retry_attempts
395-
self.connection_error_retry_attempts = connection_error_retry_attempts
396400
self.reinitialize_counter = 0
397401
self.commands_parser = AsyncCommandsParser()
398402
self.node_flags = self.__class__.NODE_FLAGS.copy()
@@ -563,15 +567,8 @@ def get_connection_kwargs(self) -> Dict[str, Optional[Any]]:
563567
"""Get the kwargs passed to :class:`~redis.asyncio.connection.Connection`."""
564568
return self.connection_kwargs
565569

566-
def get_retry(self) -> Optional["Retry"]:
567-
return self.retry
568-
569-
def set_retry(self, retry: "Retry") -> None:
570+
def set_retry(self, retry: Retry) -> None:
570571
self.retry = retry
571-
for node in self.get_nodes():
572-
node.connection_kwargs.update({"retry": retry})
573-
for conn in node._connections:
574-
conn.retry = retry
575572

576573
def set_response_callback(self, command: str, callback: ResponseCallbackT) -> None:
577574
"""Set a custom response callback."""
@@ -690,8 +687,8 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
690687
"""
691688
Execute a raw command on the appropriate cluster node or target_nodes.
692689
693-
It will retry the command as specified by :attr:`cluster_error_retry_attempts` &
694-
then raise an exception.
690+
It will retry the command as specified by the retries property of
691+
the :attr:`retry` & then raise an exception.
695692
696693
:param args:
697694
| Raw command args
@@ -707,7 +704,7 @@ async def execute_command(self, *args: EncodableT, **kwargs: Any) -> Any:
707704
command = args[0]
708705
target_nodes = []
709706
target_nodes_specified = False
710-
retry_attempts = self.cluster_error_retry_attempts
707+
retry_attempts = self.retry.get_retries()
711708

712709
passed_targets = kwargs.pop("target_nodes", None)
713710
if passed_targets and not self._is_node_flag(passed_targets):
@@ -1050,7 +1047,23 @@ def acquire_connection(self) -> Connection:
10501047
return self._free.popleft()
10511048
except IndexError:
10521049
if len(self._connections) < self.max_connections:
1053-
connection = self.connection_class(**self.connection_kwargs)
1050+
# We are configuring the connection pool not to retry
1051+
# connections on lower level clients to avoid retrying
1052+
# connections to nodes that are not reachable
1053+
# and to avoid blocking the connection pool.
1054+
# The only error that will have some handling in the lower
1055+
# level clients is ConnectionError which will trigger disconnection
1056+
# of the socket.
1057+
# The retries will be handled on cluster client level
1058+
# where we will have proper handling of the cluster topology
1059+
retry = Retry(
1060+
backoff=NoBackoff(),
1061+
retries=0,
1062+
supported_errors=(ConnectionError,),
1063+
)
1064+
connection_kwargs = self.connection_kwargs.copy()
1065+
connection_kwargs["retry"] = retry
1066+
connection = self.connection_class(**connection_kwargs)
10541067
self._connections.append(connection)
10551068
return connection
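The `.copy()` in this hunk matters because `connection_kwargs` is shared by every connection a node creates. A minimal sketch, using a hypothetical `build_connection_kwargs` helper, shows the per-connection override leaving the shared dict untouched. The string value in the usage note stands in for the `Retry(NoBackoff(), retries=0, ...)` object built above.

```python
from typing import Any, Dict


def build_connection_kwargs(shared: Dict[str, Any], retry: Any) -> Dict[str, Any]:
    """Return per-connection kwargs with `retry` overridden (hypothetical helper)."""
    per_connection = shared.copy()  # shallow copy: top-level keys are independent
    per_connection["retry"] = retry
    return per_connection
```

Calling `build_connection_kwargs({"host": "localhost", "port": 6379}, "no-retry")` returns a new dict containing a `retry` key, while the dict passed in still has only `host` and `port`.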
10561069

@@ -1546,7 +1559,7 @@ async def execute(
15461559
"""
15471560
Execute the pipeline.
15481561
1549-
It will retry the commands as specified by :attr:`cluster_error_retry_attempts`
1562+
It will retry the commands as specified by retries specified in :attr:`retry`
15501563
& then raise an exception.
15511564
15521565
:param raise_on_error:
@@ -1562,7 +1575,7 @@ async def execute(
15621575
return []
15631576

15641577
try:
1565-
retry_attempts = self._client.cluster_error_retry_attempts
1578+
retry_attempts = self._client.retry.get_retries()
15661579
while True:
15671580
try:
15681581
if self._client._initialize:

redis/asyncio/retry.py

Lines changed: 12 additions & 0 deletions
@@ -43,6 +43,18 @@ def update_supported_errors(self, specified_errors: list):
4343
set(self._supported_errors + tuple(specified_errors))
4444
)
4545

46+
def get_retries(self) -> int:
47+
"""
48+
Get the number of retries.
49+
"""
50+
return self._retries
51+
52+
def update_retries(self, value: int) -> None:
53+
"""
54+
Set the number of retries.
55+
"""
56+
self._retries = value
57+
4658
async def call_with_retry(
4759
self, do: Callable[[], Awaitable[T]], fail: Callable[[RedisError], Any]
4860
) -> T:
