9
9
10
10
from redis ._parsers import CommandsParser , Encoder
11
11
from redis ._parsers .helpers import parse_scan
12
- from redis .backoff import default_backoff
12
+ from redis .backoff import ExponentialWithJitterBackoff , NoBackoff
13
13
from redis .cache import CacheConfig , CacheFactory , CacheFactoryInterface , CacheInterface
14
14
from redis .client import CaseInsensitiveDict , PubSub , Redis
15
15
from redis .commands import READ_COMMANDS , RedisClusterCommands
@@ -179,7 +179,7 @@ def parse_cluster_myshardid(resp, **options):
179
179
"cache" ,
180
180
"cache_config" ,
181
181
)
182
- KWARGS_DISABLED_KEYS = ("host" , "port" )
182
+ KWARGS_DISABLED_KEYS = ("host" , "port" , "retry" )
183
183
184
184
185
185
def cleanup_kwargs (** kwargs ):
@@ -436,7 +436,7 @@ def replace_default_node(self, target_node: "ClusterNode" = None) -> None:
436
436
# Choose a primary if the cluster contains different primaries
437
437
self .nodes_manager .default_node = random .choice (primaries )
438
438
else :
439
- # Otherwise, hoose a primary if the cluster contains different primaries
439
+ # Otherwise, choose a primary if the cluster contains different primaries
440
440
replicas = [node for node in self .get_replicas () if node != curr_node ]
441
441
if replicas :
442
442
self .nodes_manager .default_node = random .choice (replicas )
@@ -492,6 +492,13 @@ class initializer. In the case of conflicting arguments, querystring
492
492
reason = "Please configure the 'load_balancing_strategy' instead" ,
493
493
version = "5.0.3" ,
494
494
)
495
+ @deprecated_args (
496
+ args_to_warn = [
497
+ "cluster_error_retry_attempts" ,
498
+ ],
499
+ reason = "Please configure the 'retry' object instead" ,
500
+ version = "6.0.0" ,
501
+ )
495
502
def __init__ (
496
503
self ,
497
504
host : Optional [str ] = None ,
@@ -549,9 +556,19 @@ def __init__(
549
556
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
550
557
specific IP addresses, it is best to set it to false.
551
558
:param cluster_error_retry_attempts:
559
+ @deprecated - Please configure the 'retry' object instead
560
+ In case 'retry' object is set - this argument is ignored!
561
+
552
562
Number of times to retry before raising an error when
553
- :class:`~.TimeoutError` or :class:`~.ConnectionError` or
563
+ :class:`~.TimeoutError` or :class:`~.ConnectionError`, :class:`~.SlotNotCoveredError` or
554
564
:class:`~.ClusterDownError` are encountered
565
+ :param retry:
566
+ A retry object that defines the retry strategy and the number of
567
+ retries for the cluster client.
568
+ In current implementation for the cluster client (starting form redis-py version 6.0.0)
569
+ the retry object is not yet fully utilized, instead it is used just to determine
570
+ the number of retries for the cluster client.
571
+ In the future releases the retry object will be used to handle the cluster client retries!
555
572
:param reinitialize_steps:
556
573
Specifies the number of MOVED errors that need to occur before
557
574
reinitializing the whole cluster topology. If a MOVED error occurs
@@ -571,7 +588,8 @@ def __init__(
571
588
572
589
:**kwargs:
573
590
Extra arguments that will be sent into Redis instance when created
574
- (See Official redis-py doc for supported kwargs
591
+ (See Official redis-py doc for supported kwargs - the only limitation
592
+ is that you can't provide 'retry' object as part of kwargs.
575
593
[https://github.com/andymccurdy/redis-py/blob/master/redis/client.py])
576
594
Some kwargs are not supported and will raise a
577
595
RedisClusterException:
@@ -586,6 +604,15 @@ def __init__(
586
604
"Argument 'db' is not possible to use in cluster mode"
587
605
)
588
606
607
+ if "retry" in kwargs :
608
+ # Argument 'retry' is not possible to be used in kwargs when in cluster mode
609
+ # the kwargs are set to the lower level connections to the cluster nodes
610
+ # and there we provide retry configuration without retries allowed.
611
+ # The retries should be handled on cluster client level.
612
+ raise RedisClusterException (
613
+ "Argument 'retry' is not possible to be used in kwargs when in cluster mode"
614
+ )
615
+
589
616
# Get the startup node/s
590
617
from_url = False
591
618
if url is not None :
@@ -628,9 +655,11 @@ def __init__(
628
655
kwargs = cleanup_kwargs (** kwargs )
629
656
if retry :
630
657
self .retry = retry
631
- kwargs .update ({"retry" : self .retry })
632
658
else :
633
- kwargs .update ({"retry" : Retry (default_backoff (), 0 )})
659
+ self .retry = Retry (
660
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ),
661
+ retries = cluster_error_retry_attempts ,
662
+ )
634
663
635
664
self .encoder = Encoder (
636
665
kwargs .get ("encoding" , "utf-8" ),
@@ -772,13 +801,13 @@ def set_default_node(self, node):
772
801
self .nodes_manager .default_node = node
773
802
return True
774
803
775
- def get_retry (self ) -> Optional [ " Retry" ] :
804
+ def get_retry (self ) -> Retry :
776
805
return self .retry
777
806
778
- def set_retry (self , retry : "Retry" ) -> None :
807
+ def set_retry (self , retry : Retry ) -> None :
808
+ if not isinstance (retry , Retry ):
809
+ raise TypeError ("retry must be a valid instance of redis.retry.Retry" )
779
810
self .retry = retry
780
- for node in self .get_nodes ():
781
- node .redis_connection .set_retry (retry )
782
811
783
812
def monitor (self , target_node = None ):
784
813
"""
@@ -829,6 +858,7 @@ def pipeline(self, transaction=None, shard_hint=None):
829
858
read_from_replicas = self .read_from_replicas ,
830
859
load_balancing_strategy = self .load_balancing_strategy ,
831
860
reinitialize_steps = self .reinitialize_steps ,
861
+ retry = self .retry ,
832
862
lock = self ._lock ,
833
863
)
834
864
@@ -1117,9 +1147,7 @@ def _internal_execute_command(self, *args, **kwargs):
1117
1147
# execution since the nodes may not be valid anymore after the tables
1118
1148
# were reinitialized. So in case of passed target nodes,
1119
1149
# retry_attempts will be set to 0.
1120
- retry_attempts = (
1121
- 0 if target_nodes_specified else self .cluster_error_retry_attempts
1122
- )
1150
+ retry_attempts = 0 if target_nodes_specified else self .retry .get_retries ()
1123
1151
# Add one for the first execution
1124
1152
execute_attempts = 1 + retry_attempts
1125
1153
for _ in range (execute_attempts ):
@@ -1333,8 +1361,12 @@ def __eq__(self, obj):
1333
1361
return isinstance (obj , ClusterNode ) and obj .name == self .name
1334
1362
1335
1363
def __del__ (self ):
1336
- if self .redis_connection is not None :
1337
- self .redis_connection .close ()
1364
+ try :
1365
+ if self .redis_connection is not None :
1366
+ self .redis_connection .close ()
1367
+ except Exception :
1368
+ # Ignore errors when closing the connection
1369
+ pass
1338
1370
1339
1371
1340
1372
class LoadBalancingStrategy (Enum ):
@@ -1585,17 +1617,27 @@ def create_redis_connections(self, nodes):
1585
1617
)
1586
1618
1587
1619
def create_redis_node (self , host , port , ** kwargs ):
1620
+ # We are configuring the connection pool to not retry
1621
+ # connections on lower level clients to avoid retrying
1622
+ # connections to nodes that are not reachable
1623
+ # and to avoid blocking the connection pool.
1624
+ # The retries will be handled on cluster client level
1625
+ # where we will have proper handling of the cluster topology
1626
+ node_retry_config = Retry (backoff = NoBackoff (), retries = 0 )
1627
+
1588
1628
if self .from_url :
1589
1629
# Create a redis node with a costumed connection pool
1590
1630
kwargs .update ({"host" : host })
1591
1631
kwargs .update ({"port" : port })
1592
1632
kwargs .update ({"cache" : self ._cache })
1633
+ kwargs .update ({"retry" : node_retry_config })
1593
1634
r = Redis (connection_pool = self .connection_pool_class (** kwargs ))
1594
1635
else :
1595
1636
r = Redis (
1596
1637
host = host ,
1597
1638
port = port ,
1598
1639
cache = self ._cache ,
1640
+ retry = node_retry_config ,
1599
1641
** kwargs ,
1600
1642
)
1601
1643
return r
@@ -2039,6 +2081,13 @@ class ClusterPipeline(RedisCluster):
2039
2081
TryAgainError ,
2040
2082
)
2041
2083
2084
+ @deprecated_args (
2085
+ args_to_warn = [
2086
+ "cluster_error_retry_attempts" ,
2087
+ ],
2088
+ reason = "Please configure the 'retry' object instead" ,
2089
+ version = "6.0.0" ,
2090
+ )
2042
2091
def __init__ (
2043
2092
self ,
2044
2093
nodes_manager : "NodesManager" ,
@@ -2050,6 +2099,7 @@ def __init__(
2050
2099
load_balancing_strategy : Optional [LoadBalancingStrategy ] = None ,
2051
2100
cluster_error_retry_attempts : int = 3 ,
2052
2101
reinitialize_steps : int = 5 ,
2102
+ retry : Optional [Retry ] = None ,
2053
2103
lock = None ,
2054
2104
** kwargs ,
2055
2105
):
@@ -2069,6 +2119,14 @@ def __init__(
2069
2119
self .cluster_error_retry_attempts = cluster_error_retry_attempts
2070
2120
self .reinitialize_counter = 0
2071
2121
self .reinitialize_steps = reinitialize_steps
2122
+ if retry is not None :
2123
+ self .retry = retry
2124
+ else :
2125
+ self .retry = Retry (
2126
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ),
2127
+ retries = self .cluster_error_retry_attempts ,
2128
+ )
2129
+
2072
2130
self .encoder = Encoder (
2073
2131
kwargs .get ("encoding" , "utf-8" ),
2074
2132
kwargs .get ("encoding_errors" , "strict" ),
@@ -2202,7 +2260,7 @@ def send_cluster_commands(
2202
2260
"""
2203
2261
if not stack :
2204
2262
return []
2205
- retry_attempts = self .cluster_error_retry_attempts
2263
+ retry_attempts = self .retry . get_retries ()
2206
2264
while True :
2207
2265
try :
2208
2266
return self ._send_cluster_commands (
0 commit comments