9
9
10
10
from redis ._parsers import CommandsParser , Encoder
11
11
from redis ._parsers .helpers import parse_scan
12
- from redis .backoff import default_backoff
12
+ from redis .backoff import ExponentialWithJitterBackoff , NoBackoff
13
13
from redis .cache import CacheConfig , CacheFactory , CacheFactoryInterface , CacheInterface
14
14
from redis .client import CaseInsensitiveDict , PubSub , Redis
15
15
from redis .commands import READ_COMMANDS , RedisClusterCommands
@@ -179,7 +179,7 @@ def parse_cluster_myshardid(resp, **options):
179
179
"cache" ,
180
180
"cache_config" ,
181
181
)
182
- KWARGS_DISABLED_KEYS = ("host" , "port" )
182
+ KWARGS_DISABLED_KEYS = ("host" , "port" , "retry" )
183
183
184
184
185
185
def cleanup_kwargs (** kwargs ):
@@ -431,7 +431,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
431
431
# Choose a primary if the cluster contains different primaries
432
432
self .nodes_manager .default_node = random .choice (primaries )
433
433
else :
434
- # Otherwise, hoose a primary if the cluster contains different primaries
434
+ # Otherwise, choose a primary if the cluster contains different primaries
435
435
replicas = [node for node in self .get_replicas () if node != curr_node ]
436
436
if replicas :
437
437
self .nodes_manager .default_node = random .choice (replicas )
@@ -487,6 +487,13 @@ class initializer. In the case of conflicting arguments, querystring
487
487
reason = "Please configure the 'load_balancing_strategy' instead" ,
488
488
version = "5.0.3" ,
489
489
)
490
+ @deprecated_args (
491
+ args_to_warn = [
492
+ "cluster_error_retry_attempts" ,
493
+ ],
494
+ reason = "Please configure the 'retry' object instead" ,
495
+ version = "6.0.0" ,
496
+ )
490
497
def __init__ (
491
498
self ,
492
499
host : Optional [str ] = None ,
@@ -544,9 +551,19 @@ def __init__(
544
551
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
545
552
specific IP addresses, it is best to set it to false.
546
553
:param cluster_error_retry_attempts:
554
+ @deprecated - Please configure the 'retry' object instead
555
+ In case 'retry' object is set - this argument is ignored!
556
+
547
557
Number of times to retry before raising an error when
548
- :class:`~.TimeoutError` or :class:`~.ConnectionError` or
558
+ :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
549
559
:class:`~.ClusterDownError` are encountered
560
+ :param retry:
561
+ A retry object that defines the retry strategy and the number of
562
+ retries for the cluster client.
563
+ In current implementation for the cluster client (starting form redis-py version 6.0.0)
564
+ the retry object is not yet fully utilized, instead it is used just to determine
565
+ the number of retries for the cluster client.
566
+ In the future releases the retry object will be used to handle the cluster client retries!
550
567
:param reinitialize_steps:
551
568
Specifies the number of MOVED errors that need to occur before
552
569
reinitializing the whole cluster topology. If a MOVED error occurs
@@ -566,7 +583,8 @@ def __init__(
566
583
567
584
:**kwargs:
568
585
Extra arguments that will be sent into Redis instance when created
569
- (See Official redis-py doc for supported kwargs
586
+ (See Official redis-py doc for supported kwargs - the only limitation
587
+ is that you can't provide 'retry' object as part of kwargs.
570
588
[https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
571
589
Some kwargs are not supported and will raise a
572
590
RedisClusterException:
@@ -581,6 +599,15 @@ def __init__(
581
599
"Argument 'db' is not possible to use in cluster mode"
582
600
)
583
601
602
+ if "retry" in kwargs :
603
+ # Argument 'retry' is not possible to be used in kwargs when in cluster mode
604
+ # the kwargs are set to the lower level connections to the cluster nodes
605
+ # and there we provide retry configuration without retries allowed.
606
+ # The retries should be handled on cluster client level.
607
+ raise RedisClusterException (
608
+ "Argument 'retry' is not possible to be used in kwargs when in cluster mode"
609
+ )
610
+
584
611
# Get the startup node/s
585
612
from_url = False
586
613
if url is not None :
@@ -623,9 +650,11 @@ def __init__(
623
650
kwargs = cleanup_kwargs (** kwargs )
624
651
if retry :
625
652
self .retry = retry
626
- kwargs .update ({"retry" : self .retry })
627
653
else :
628
- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
654
+ self .retry = Retry (
655
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ),
656
+ retries = cluster_error_retry_attempts ,
657
+ )
629
658
630
659
self .encoder = Encoder (
631
660
kwargs .get ("encoding" , "utf-8" ),
@@ -767,13 +796,13 @@ def set_default_node(self, node):
767
796
self .nodes_manager .default_node = node
768
797
return True
769
798
770
- def get_retry (self ) -> Optional [ " Retry" ] :
799
+ def get_retry (self ) -> Retry :
771
800
return self .retry
772
801
773
- def set_retry (self , retry : "Retry" ) -> None :
802
+ def set_retry (self , retry : Retry ) -> None :
803
+ if not isinstance (retry , Retry ):
804
+ raise TypeError ("retry must be a valid instance of redis.retry.Retry" )
774
805
self .retry = retry
775
- for node in self .get_nodes ():
776
- node .redis_connection .set_retry (retry )
777
806
778
807
def monitor (self , target_node = None ):
779
808
"""
@@ -824,6 +853,7 @@ def pipeline(self, transaction=None, shard_hint=None):
824
853
read_from_replicas = self .read_from_replicas ,
825
854
load_balancing_strategy = self .load_balancing_strategy ,
826
855
reinitialize_steps = self .reinitialize_steps ,
856
+ retry = self .retry ,
827
857
lock = self ._lock ,
828
858
)
829
859
@@ -1112,9 +1142,7 @@ def _internal_execute_command(self, *args, **kwargs):
1112
1142
# execution since the nodes may not be valid anymore after the tables
1113
1143
# were reinitialized. So in case of passed target nodes,
1114
1144
# retry_attempts will be set to 0.
1115
- retry_attempts = (
1116
- 0 if target_nodes_specified else self .cluster_error_retry_attempts
1117
- )
1145
+ retry_attempts = 0 if target_nodes_specified else self .retry .get_retries ()
1118
1146
# Add one for the first execution
1119
1147
execute_attempts = 1 + retry_attempts
1120
1148
for _ in range (execute_attempts ):
@@ -1322,8 +1350,12 @@ def __eq__(self, obj):
1322
1350
return isinstance (obj , ClusterNode ) and obj .name == self .name
1323
1351
1324
1352
def __del__ (self ):
1325
- if self .redis_connection is not None :
1326
- self .redis_connection .close ()
1353
+ try :
1354
+ if self .redis_connection is not None :
1355
+ self .redis_connection .close ()
1356
+ except Exception :
1357
+ # Ignore errors when closing the connection
1358
+ pass
1327
1359
1328
1360
1329
1361
class LoadBalancingStrategy (Enum ):
@@ -1574,17 +1606,27 @@ def create_redis_connections(self, nodes):
1574
1606
)
1575
1607
1576
1608
def create_redis_node (self , host , port , ** kwargs ):
1609
+ # We are configuring the connection pool to not retry
1610
+ # connections on lower level clients to avoid retrying
1611
+ # connections to nodes that are not reachable
1612
+ # and to avoid blocking the connection pool.
1613
+ # The retries will be handled on cluster client level
1614
+ # where we will have proper handling of the cluster topology
1615
+ node_retry_config = Retry (backoff = NoBackoff (), retries = 0 )
1616
+
1577
1617
if self .from_url :
1578
1618
# Create a redis node with a costumed connection pool
1579
1619
kwargs .update ({"host" : host })
1580
1620
kwargs .update ({"port" : port })
1581
1621
kwargs .update ({"cache" : self ._cache })
1622
+ kwargs .update ({"retry" : node_retry_config })
1582
1623
r = Redis (connection_pool = self .connection_pool_class (** kwargs ))
1583
1624
else :
1584
1625
r = Redis (
1585
1626
host = host ,
1586
1627
port = port ,
1587
1628
cache = self ._cache ,
1629
+ retry = node_retry_config ,
1588
1630
** kwargs ,
1589
1631
)
1590
1632
return r
@@ -2028,6 +2070,13 @@ class ClusterPipeline(RedisCluster):
2028
2070
TryAgainError ,
2029
2071
)
2030
2072
2073
+ @deprecated_args (
2074
+ args_to_warn = [
2075
+ "cluster_error_retry_attempts" ,
2076
+ ],
2077
+ reason = "Please configure the 'retry' object instead" ,
2078
+ version = "6.0.0" ,
2079
+ )
2031
2080
def __init__ (
2032
2081
self ,
2033
2082
nodes_manager : "NodesManager" ,
@@ -2039,6 +2088,7 @@ def __init__(
2039
2088
load_balancing_strategy : Optional [LoadBalancingStrategy ] = None ,
2040
2089
cluster_error_retry_attempts : int = 3 ,
2041
2090
reinitialize_steps : int = 5 ,
2091
+ retry : Optional [Retry ] = None ,
2042
2092
lock = None ,
2043
2093
** kwargs ,
2044
2094
):
@@ -2058,6 +2108,14 @@ def __init__(
2058
2108
self .cluster_error_retry_attempts = cluster_error_retry_attempts
2059
2109
self .reinitialize_counter = 0
2060
2110
self .reinitialize_steps = reinitialize_steps
2111
+ if retry is not None :
2112
+ self .retry = retry
2113
+ else :
2114
+ self .retry = Retry (
2115
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ),
2116
+ retries = self .cluster_error_retry_attempts ,
2117
+ )
2118
+
2061
2119
self .encoder = Encoder (
2062
2120
kwargs .get ("encoding" , "utf-8" ),
2063
2121
kwargs .get ("encoding_errors" , "strict" ),
@@ -2191,7 +2249,7 @@ def send_cluster_commands(
2191
2249
"""
2192
2250
if not stack :
2193
2251
return []
2194
- retry_attempts = self .cluster_error_retry_attempts
2252
+ retry_attempts = self .retry . get_retries ()
2195
2253
while True :
2196
2254
try :
2197
2255
return self ._send_cluster_commands (
0 commit comments