39
39
)
40
40
from redis .asyncio .lock import Lock
41
41
from redis .asyncio .retry import Retry
42
+ from redis .backoff import ExponentialWithJitterBackoff
42
43
from redis .client import (
43
44
EMPTY_RESPONSE ,
44
45
NEVER_DECODE ,
65
66
PubSubError ,
66
67
RedisError ,
67
68
ResponseError ,
68
- TimeoutError ,
69
69
WatchError ,
70
70
)
71
71
from redis .typing import ChannelT , EncodableT , KeyT
72
72
from redis .utils import (
73
73
HIREDIS_AVAILABLE ,
74
74
SSL_AVAILABLE ,
75
75
_set_info_logger ,
76
+ deprecated_args ,
76
77
deprecated_function ,
77
78
get_lib_version ,
78
79
safe_str ,
@@ -208,6 +209,11 @@ def from_pool(
208
209
client .auto_close_connection_pool = True
209
210
return client
210
211
212
+ @deprecated_args (
213
+ args_to_warn = ["retry_on_timeout" ],
214
+ reason = "TimeoutError is included by default." ,
215
+ version = "6.0.0" ,
216
+ )
211
217
def __init__ (
212
218
self ,
213
219
* ,
@@ -226,6 +232,9 @@ def __init__(
226
232
decode_responses : bool = False ,
227
233
check_server_ready : bool = False ,
228
234
retry_on_timeout : bool = False ,
235
+ retry : Retry = Retry (
236
+ backoff = ExponentialWithJitterBackoff (base = 1 , cap = 10 ), retries = 3
237
+ ),
229
238
retry_on_error : Optional [list ] = None ,
230
239
ssl : bool = False ,
231
240
ssl_keyfile : Optional [str ] = None ,
@@ -243,7 +252,6 @@ def __init__(
243
252
lib_name : Optional [str ] = "redis-py" ,
244
253
lib_version : Optional [str ] = get_lib_version (),
245
254
username : Optional [str ] = None ,
246
- retry : Optional [Retry ] = None ,
247
255
auto_close_connection_pool : Optional [bool ] = None ,
248
256
redis_connect_func = None ,
249
257
credential_provider : Optional [CredentialProvider ] = None ,
@@ -252,10 +260,24 @@ def __init__(
252
260
):
253
261
"""
254
262
Initialize a new Redis client.
255
- To specify a retry policy for specific errors, first set
256
- `retry_on_error` to a list of the error/s to retry on, then set
257
- `retry` to a valid `Retry` object.
258
- To retry on TimeoutError, `retry_on_timeout` can also be set to `True`.
263
+
264
+ To specify a retry policy for specific errors, you have two options:
265
+
266
+ 1. Set the `retry_on_error` to a list of the error/s to retry on, and
267
+ you can also set `retry` to a valid `Retry` object(in case the default
268
+ one is not appropriate) - with this approach the retries will be triggered
269
+ on the default errors specified in the Retry object enriched with the
270
+ errors specified in `retry_on_error`.
271
+
272
+ 2. Define a `Retry` object with configured 'supported_errors' and set
273
+ it to the `retry` parameter - with this approach you completely redefine
274
+ the errors on which retries will happen.
275
+
276
+ `retry_on_timeout` is deprecated - please include the TimeoutError
277
+ either in the Retry object or in the `retry_on_error` list.
278
+
279
+ When 'connection_pool' is provided - the retry configuration of the
280
+ provided pool will be used.
259
281
260
282
Args:
261
283
check_server_ready: if `True`, an extra handshake is performed by sending a PING command, since
@@ -285,8 +307,6 @@ def __init__(
285
307
# Create internal connection pool, expected to be closed by Redis instance
286
308
if not retry_on_error :
287
309
retry_on_error = []
288
- if retry_on_timeout is True :
289
- retry_on_error .append (TimeoutError )
290
310
kwargs = {
291
311
"db" : db ,
292
312
"username" : username ,
@@ -297,7 +317,6 @@ def __init__(
297
317
"encoding_errors" : encoding_errors ,
298
318
"decode_responses" : decode_responses ,
299
319
"check_server_ready" : check_server_ready ,
300
- "retry_on_timeout" : retry_on_timeout ,
301
320
"retry_on_error" : retry_on_error ,
302
321
"retry" : copy .deepcopy (retry ),
303
322
"max_connections" : max_connections ,
@@ -409,10 +428,10 @@ def get_connection_kwargs(self):
409
428
"""Get the connection's key-word arguments"""
410
429
return self .connection_pool .connection_kwargs
411
430
412
- def get_retry (self ) -> Optional [" Retry" ]:
431
+ def get_retry (self ) -> Optional [Retry ]:
413
432
return self .get_connection_kwargs ().get ("retry" )
414
433
415
- def set_retry (self , retry : " Retry" ) -> None :
434
+ def set_retry (self , retry : Retry ) -> None :
416
435
self .get_connection_kwargs ().update ({"retry" : retry })
417
436
self .connection_pool .set_retry (retry )
418
437
@@ -639,18 +658,17 @@ async def _send_command_parse_response(self, conn, command_name, *args, **option
639
658
await conn .send_command (* args )
640
659
return await self .parse_response (conn , command_name , ** options )
641
660
642
- async def _disconnect_raise (self , conn : Connection , error : Exception ):
661
+ async def _close_connection (self , conn : Connection ):
643
662
"""
644
- Close the connection and raise an exception
645
- if retry_on_error is not set or the error
646
- is not one of the specified error types
663
+ Close the connection before retrying.
664
+
665
+ The supported exceptions are already checked in the
666
+ retry object so we don't need to do it here.
667
+
668
+ After we disconnect the connection, it will try to reconnect and
669
+ do a health check as part of the send_command logic(on connection level).
647
670
"""
648
671
await conn .disconnect ()
649
- if (
650
- conn .retry_on_error is None
651
- or isinstance (error , tuple (conn .retry_on_error )) is False
652
- ):
653
- raise error
654
672
655
673
# COMMAND EXECUTION AND PROTOCOL PARSING
656
674
async def execute_command (self , * args , ** options ):
@@ -667,7 +685,7 @@ async def execute_command(self, *args, **options):
667
685
lambda : self ._send_command_parse_response (
668
686
conn , command_name , * args , ** options
669
687
),
670
- lambda error : self ._disconnect_raise (conn , error ),
688
+ lambda _ : self ._close_connection (conn ),
671
689
)
672
690
finally :
673
691
if self .single_connection_client :
@@ -935,19 +953,11 @@ async def connect(self):
935
953
)
936
954
)
937
955
938
- async def _disconnect_raise_connect (self , conn , error ):
956
+ async def _reconnect (self , conn ):
939
957
"""
940
- Close the connection and raise an exception
941
- if retry_on_error is not set or the error is not one
942
- of the specified error types. Otherwise, try to
943
- reconnect
958
+ Try to reconnect
944
959
"""
945
960
await conn .disconnect ()
946
- if (
947
- conn .retry_on_error is None
948
- or isinstance (error , tuple (conn .retry_on_error )) is False
949
- ):
950
- raise error
951
961
await conn .connect ()
952
962
953
963
async def _execute (self , conn , command , * args , ** kwargs ):
@@ -960,7 +970,7 @@ async def _execute(self, conn, command, *args, **kwargs):
960
970
"""
961
971
return await conn .retry .call_with_retry (
962
972
lambda : command (* args , ** kwargs ),
963
- lambda error : self ._disconnect_raise_connect (conn , error ),
973
+ lambda _ : self ._reconnect (conn ),
964
974
)
965
975
966
976
async def parse_response (self , block : bool = True , timeout : float = 0 ):
@@ -1251,7 +1261,8 @@ class Pipeline(Redis): # lgtm [py/init-calls-subclass]
1251
1261
in one transmission. This is convenient for batch processing, such as
1252
1262
saving all the values in a list to Redis.
1253
1263
1254
- All commands executed within a pipeline are wrapped with MULTI and EXEC
1264
+ All commands executed within a pipeline(when running in transactional mode,
1265
+ which is the default behavior) are wrapped with MULTI and EXEC
1255
1266
calls. This guarantees all commands executed in the pipeline will be
1256
1267
executed atomically.
1257
1268
@@ -1280,7 +1291,7 @@ def __init__(
1280
1291
self .shard_hint = shard_hint
1281
1292
self .watching = False
1282
1293
self .command_stack : CommandStackT = []
1283
- self .scripts : Set [" Script" ] = set ()
1294
+ self .scripts : Set [Script ] = set ()
1284
1295
self .explicit_transaction = False
1285
1296
1286
1297
async def __aenter__ (self : _RedisT ) -> _RedisT :
@@ -1352,36 +1363,36 @@ def execute_command(
1352
1363
return self .immediate_execute_command (* args , ** kwargs )
1353
1364
return self .pipeline_execute_command (* args , ** kwargs )
1354
1365
1355
- async def _disconnect_reset_raise (self , conn , error ):
1366
+ async def _disconnect_reset_raise_on_watching (
1367
+ self ,
1368
+ conn : Connection ,
1369
+ error : Exception ,
1370
+ ):
1356
1371
"""
1357
- Close the connection, reset watching state and
1358
- raise an exception if we were watching,
1359
- if retry_on_error is not set or the error is not one
1360
- of the specified error types.
1372
+ Close the connection reset watching state and
1373
+ raise an exception if we were watching.
1374
+
1375
+ The supported exceptions are already checked in the
1376
+ retry object so we don't need to do it here.
1377
+
1378
+ After we disconnect the connection, it will try to reconnect and
1379
+ do a health check as part of the send_command logic(on connection level).
1361
1380
"""
1362
1381
await conn .disconnect ()
1363
1382
# if we were already watching a variable, the watch is no longer
1364
1383
# valid since this connection has died. raise a WatchError, which
1365
1384
# indicates the user should retry this transaction.
1366
1385
if self .watching :
1367
- await self .aclose ()
1386
+ await self .reset ()
1368
1387
raise WatchError (
1369
- "A ConnectionError occurred on while watching one or more keys"
1388
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1370
1389
)
1371
- # if retry_on_error is not set or the error is not one
1372
- # of the specified error types, raise it
1373
- if (
1374
- conn .retry_on_error is None
1375
- or isinstance (error , tuple (conn .retry_on_error )) is False
1376
- ):
1377
- await self .aclose ()
1378
- raise
1379
1390
1380
1391
async def immediate_execute_command (self , * args , ** options ):
1381
1392
"""
1382
- Execute a command immediately, but don't auto-retry on a
1383
- ConnectionError if we're already WATCHing a variable. Used when
1384
- issuing WATCH or subsequent commands retrieving their values but before
1393
+ Execute a command immediately, but don't auto-retry on the supported
1394
+ errors for retry if we're already WATCHing a variable.
1395
+ Used when issuing WATCH or subsequent commands retrieving their values but before
1385
1396
MULTI is called.
1386
1397
"""
1387
1398
command_name = args [0 ]
@@ -1395,7 +1406,7 @@ async def immediate_execute_command(self, *args, **options):
1395
1406
lambda : self ._send_command_parse_response (
1396
1407
conn , command_name , * args , ** options
1397
1408
),
1398
- lambda error : self ._disconnect_reset_raise (conn , error ),
1409
+ lambda error : self ._disconnect_reset_raise_on_watching (conn , error ),
1399
1410
)
1400
1411
1401
1412
def pipeline_execute_command (self , * args , ** options ):
@@ -1550,28 +1561,24 @@ async def load_scripts(self):
1550
1561
if not exist :
1551
1562
s .sha = await immediate ("SCRIPT LOAD" , s .script )
1552
1563
1553
- async def _disconnect_raise_reset (self , conn : Connection , error : Exception ):
1564
+ async def _disconnect_raise_on_watching (self , conn : Connection , error : Exception ):
1554
1565
"""
1555
- Close the connection, raise an exception if we were watching,
1556
- and raise an exception if retry_on_error is not set or the
1557
- error is not one of the specified error types.
1566
+ Close the connection, raise an exception if we were watching.
1567
+
1568
+ The supported exceptions are already checked in the
1569
+ retry object so we don't need to do it here.
1570
+
1571
+ After we disconnect the connection, it will try to reconnect and
1572
+ do a health check as part of the send_command logic(on connection level).
1558
1573
"""
1559
1574
await conn .disconnect ()
1560
1575
# if we were watching a variable, the watch is no longer valid
1561
1576
# since this connection has died. raise a WatchError, which
1562
1577
# indicates the user should retry this transaction.
1563
1578
if self .watching :
1564
1579
raise WatchError (
1565
- "A ConnectionError occurred on while watching one or more keys"
1580
+ f "A { type ( error ). __name__ } occurred while watching one or more keys"
1566
1581
)
1567
- # if retry_on_error is not set or the error is not one
1568
- # of the specified error types, raise it
1569
- if (
1570
- conn .retry_on_error is None
1571
- or isinstance (error , tuple (conn .retry_on_error )) is False
1572
- ):
1573
- await self .reset ()
1574
- raise
1575
1582
1576
1583
async def execute (self , raise_on_error : bool = True ) -> List [Any ]:
1577
1584
"""Execute all the commands in the current pipeline"""
@@ -1596,7 +1603,7 @@ async def execute(self, raise_on_error: bool = True) -> List[Any]:
1596
1603
try :
1597
1604
return await conn .retry .call_with_retry (
1598
1605
lambda : execute (conn , stack , raise_on_error ),
1599
- lambda error : self ._disconnect_raise_reset (conn , error ),
1606
+ lambda error : self ._disconnect_raise_on_watching (conn , error ),
1600
1607
)
1601
1608
finally :
1602
1609
await self .reset ()
0 commit comments