3
3
4
4
# python std lib
5
5
import datetime
6
+ import json
7
+ import logging
6
8
import random
7
9
import string
8
10
import time
54
56
)
55
57
56
58
59
+ log = logging .getLogger (__name__ )
60
+
61
+
57
62
class CaseInsensitiveDict (dict ):
58
63
"Case insensitive dict implementation. Assumes string keys only."
59
64
@@ -320,13 +325,19 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non
320
325
- db (Redis does not support database SELECT in cluster mode)
321
326
"""
322
327
# Tweaks to Redis client arguments when running in cluster mode
328
+ log .info ("Created new instance of RedisCluster client instance" )
329
+ log .debug ("startup_nodes : " + json .dumps (startup_nodes , indent = 2 ))
330
+
323
331
if "db" in kwargs :
324
332
raise RedisClusterException ("Argument 'db' is not possible to use in cluster mode" )
325
333
326
- if kwargs .pop ('ssl' , False ): # Needs to be removed to avoid exception in redis Connection init
334
+ # Needs to be removed to avoid exception in redis Connection init
335
+ if kwargs .pop ('ssl' , False ):
336
+ log .info ("Patching connection_class to SSLClusterConnection" )
327
337
connection_class = SSLClusterConnection
328
338
329
339
if "connection_pool" in kwargs :
340
+ log .info ("Using custom created connection pool" )
330
341
pool = kwargs .pop ('connection_pool' )
331
342
else :
332
343
startup_nodes = [] if startup_nodes is None else startup_nodes
@@ -337,10 +348,15 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non
337
348
338
349
if readonly_mode :
339
350
connection_pool_cls = ClusterReadOnlyConnectionPool
351
+ log .info ("Using ClusterReadOnlyConnectionPool" )
340
352
elif read_from_replicas :
341
353
connection_pool_cls = ClusterWithReadReplicasConnectionPool
354
+ log .info ("Using ClusterWithReadReplicasConnectionPool" )
342
355
else :
343
356
connection_pool_cls = ClusterConnectionPool
357
+ log .info ("Using ClusterConnectionPool" )
358
+
359
+ log .debug ("Connection pool class " + str (connection_pool_cls ))
344
360
345
361
pool = connection_pool_cls (
346
362
startup_nodes = startup_nodes ,
@@ -545,6 +561,7 @@ def _execute_command(self, *args, **kwargs):
545
561
raise RedisClusterException ("Unable to determine command to use" )
546
562
547
563
command = args [0 ]
564
+ log .debug ("Command to execute : " + str (command ) + " : " + str (args ) + " : " + str (kwargs ))
548
565
549
566
# If set externally we must update it before calling any commands
550
567
if self .refresh_table_asap :
@@ -562,28 +579,34 @@ def _execute_command(self, *args, **kwargs):
562
579
try_random_node = False
563
580
slot = self ._determine_slot (* args )
564
581
ttl = int (self .RedisClusterRequestTTL )
582
+ connection_error_retry_counter = 0
565
583
566
584
while ttl > 0 :
567
585
ttl -= 1
568
586
569
- if asking :
570
- node = self .connection_pool .nodes .nodes [redirect_addr ]
571
- connection = self .connection_pool .get_connection_by_node (node )
572
- elif try_random_node :
573
- connection = self .connection_pool .get_random_connection ()
574
- try_random_node = False
575
- else :
576
- if self .refresh_table_asap :
577
- # MOVED
578
- node = self .connection_pool .get_master_node_by_slot (slot )
579
- # Reset the flag when it has been consumed to avoid it being
580
- self .refresh_table_asap = False
587
+ try :
588
+ if asking :
589
+ node = self .connection_pool .nodes .nodes [redirect_addr ]
590
+ connection = self .connection_pool .get_connection_by_node (node )
591
+ elif try_random_node :
592
+ connection = self .connection_pool .get_random_connection ()
593
+ try_random_node = False
581
594
else :
582
- node = self .connection_pool .get_node_by_slot (slot , self .read_from_replicas and (command in self .READ_COMMANDS ))
583
- is_read_replica = node ['server_type' ] == 'slave'
584
- connection = self .connection_pool .get_connection_by_node (node )
595
+ if self .refresh_table_asap :
596
+ # MOVED
597
+ node = self .connection_pool .get_master_node_by_slot (slot )
598
+ self .refresh_table_asap = False
599
+ else :
600
+ node = self .connection_pool .get_node_by_slot (
601
+ slot ,
602
+ self .read_from_replicas and (command in self .READ_COMMANDS )
603
+ )
604
+ is_read_replica = node ['server_type' ] == 'slave'
605
+
606
+ connection = self .connection_pool .get_connection_by_node (node )
607
+
608
+ log .debug ("Determined node to execute : " + str (node ))
585
609
586
- try :
587
610
if asking :
588
611
connection .send_command ('ASKING' )
589
612
self .parse_response (connection , "ASKING" , ** kwargs )
@@ -598,25 +621,53 @@ def _execute_command(self, *args, **kwargs):
598
621
connection .send_command (* args )
599
622
return self .parse_response (connection , command , ** kwargs )
600
623
except SlotNotCoveredError as e :
624
+ log .exception ("SlotNotCoveredError" )
625
+
601
626
# In some cases, while a failover to a replica is happening,
602
627
# a slot sometimes is not covered by the cluster layout and
603
628
# we need to attempt to refresh the cluster layout and try again
604
629
self .refresh_table_asap = True
605
- time .sleep (0.05 )
630
+ time .sleep (0.1 )
606
631
607
632
# This is the last attempt before we run out of TTL, raise the exception
608
633
if ttl == 1 :
609
634
raise e
610
- except (RedisClusterException , BusyLoadingError ):
635
+ except (RedisClusterException , BusyLoadingError ) as e :
636
+ log .exception ("RedisClusterException || BusyLoadingError" )
611
637
raise
612
- except ConnectionError :
638
+ except ConnectionError as e :
639
+ log .exception ("ConnectionError" )
640
+
613
641
connection .disconnect ()
614
- except TimeoutError :
642
+ connection_error_retry_counter += 1
643
+
644
+ # Give the node 0.25 seconds to get back up and retry again with same
645
+ # node and configuration. After 5 attempts then try to reinitialize
646
+ # the cluster and see if the nodes configuration has changed or not
647
+ if connection_error_retry_counter < 5 :
648
+ time .sleep (0.25 )
649
+ else :
650
+ # Reset the counter back to 0 as it should have 5 new attempts
651
+ # after the client tries to reinitialize the cluster setup to the
652
+ # new configuration.
653
+ connection_error_retry_counter = 0
654
+ self .refresh_table_asap = True
655
+
656
+ # Hard force of reinitialize of the node/slots setup
657
+ self .connection_pool .nodes .increment_reinitialize_counter (
658
+ count = self .connection_pool .nodes .reinitialize_steps ,
659
+ )
660
+
661
+ except TimeoutError as e :
662
+ log .exception ("TimeoutError" )
663
+
615
664
if ttl < self .RedisClusterRequestTTL / 2 :
616
665
time .sleep (0.05 )
617
666
else :
618
667
try_random_node = True
619
668
except ClusterDownError as e :
669
+ log .exception ("ClusterDownError" )
670
+
620
671
self .connection_pool .disconnect ()
621
672
self .connection_pool .reset ()
622
673
self .refresh_table_asap = True
@@ -627,19 +678,27 @@ def _execute_command(self, *args, **kwargs):
627
678
# This counter will increase faster when the same client object
628
679
# is shared between multiple threads. To reduce the frequency you
629
680
# can set the variable 'reinitialize_steps' in the constructor.
681
+ log .exception ("MovedError" )
682
+
630
683
self .refresh_table_asap = True
631
684
self .connection_pool .nodes .increment_reinitialize_counter ()
632
685
633
686
node = self .connection_pool .nodes .set_node (e .host , e .port , server_type = 'master' )
634
687
self .connection_pool .nodes .slots [e .slot_id ][0 ] = node
635
688
except TryAgainError as e :
689
+ log .exception ("TryAgainError" )
690
+
636
691
if ttl < self .RedisClusterRequestTTL / 2 :
637
692
time .sleep (0.05 )
638
693
except AskError as e :
694
+ log .exception ("AskError" )
695
+
639
696
redirect_addr , asking = "{0}:{1}" .format (e .host , e .port ), True
640
697
finally :
641
698
self .connection_pool .release (connection )
642
699
700
+ log .debug ("TTL loop : " + str (ttl ))
701
+
643
702
raise ClusterError ('TTL exhausted.' )
644
703
645
704
def _execute_command_on_nodes (self , nodes , * args , ** kwargs ):
0 commit comments