diff --git a/rediscluster/client.py b/rediscluster/client.py index 844cb4c9..49a82dbf 100644 --- a/rediscluster/client.py +++ b/rediscluster/client.py @@ -300,7 +300,8 @@ class RedisCluster(Redis): def __init__(self, host=None, port=None, startup_nodes=None, max_connections=None, max_connections_per_node=False, init_slot_cache=True, readonly_mode=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False, - connection_class=None, read_from_replicas=False, cluster_down_retry_attempts=3, host_port_remap=None, **kwargs): + nodemanager_random_cluster_nodes=False, connection_class=None, read_from_replicas=False, cluster_down_retry_attempts=3, + host_port_remap=None, **kwargs): """ :startup_nodes: List of nodes that initial bootstrapping can be done from @@ -319,6 +320,9 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non The node manager will during initialization try the last set of nodes that it was operating on. This will allow the client to drift along side the cluster if the cluster nodes move around alot. + :nodemanager_random_cluster_nodes: + The node manager will use disorder nodes during initialization, avoid all + cluster slots command are send to first node. :**kwargs: Extra arguments that will be sent into Redis instance when created (See Official redis-py doc for supported kwargs @@ -373,6 +377,7 @@ def __init__(self, host=None, port=None, startup_nodes=None, max_connections=Non max_connections_per_node=max_connections_per_node, skip_full_coverage_check=skip_full_coverage_check, nodemanager_follow_cluster=nodemanager_follow_cluster, + nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes, connection_class=connection_class, host_port_remap=host_port_remap, **kwargs diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 70dadd0b..ad290e5f 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -112,7 +112,8 @@ class ClusterConnectionPool(ConnectionPool): def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None, max_connections=None, max_connections_per_node=False, reinitialize_steps=None, - skip_full_coverage_check=False, nodemanager_follow_cluster=False, host_port_remap=None, + skip_full_coverage_check=False, nodemanager_follow_cluster=False, + nodemanager_random_cluster_nodes=False, host_port_remap=None, **connection_kwargs): """ :skip_full_coverage_check: @@ -122,6 +123,9 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No The node manager will during initialization try the last set of nodes that it was operating on. This will allow the client to drift along side the cluster if the cluster nodes move around a lot. + :nodemanager_random_cluster_nodes: + The node manager will use disorder nodes during initialization, avoid all + cluster slots command are send to first node. """ log.debug("Creating new ClusterConnectionPool instance") @@ -152,6 +156,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No skip_full_coverage_check=skip_full_coverage_check, max_connections=self.max_connections, nodemanager_follow_cluster=nodemanager_follow_cluster, + nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes, host_port_remap=host_port_remap, **connection_kwargs ) @@ -422,7 +427,7 @@ class ClusterBlockingConnectionPool(ClusterConnectionPool): def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None, max_connections=50, max_connections_per_node=False, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False, - timeout=20, **connection_kwargs): + nodemanager_random_cluster_nodes=False, timeout=20, **connection_kwargs): self.timeout = timeout super(ClusterBlockingConnectionPool, self).__init__( @@ -434,6 +439,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No reinitialize_steps=reinitialize_steps, skip_full_coverage_check=skip_full_coverage_check, nodemanager_follow_cluster=nodemanager_follow_cluster, + nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes, **connection_kwargs ) @@ -566,7 +572,8 @@ class ClusterReadOnlyConnectionPool(ClusterConnectionPool): """ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=None, - max_connections=None, nodemanager_follow_cluster=False, **connection_kwargs): + max_connections=None, nodemanager_follow_cluster=False, + nodemanager_random_cluster_nodes=False, **connection_kwargs): """ """ if connection_class is None: @@ -578,6 +585,7 @@ def __init__(self, startup_nodes=None, init_slot_cache=True, connection_class=No max_connections=max_connections, readonly=True, nodemanager_follow_cluster=nodemanager_follow_cluster, + nodemanager_random_cluster_nodes=nodemanager_random_cluster_nodes, **connection_kwargs ) diff --git a/rediscluster/nodemanager.py b/rediscluster/nodemanager.py index 9fe2b13a..6f63a0d8 100644 --- a/rediscluster/nodemanager.py +++ b/rediscluster/nodemanager.py @@ -23,7 +23,8 @@ class NodeManager(object): """ REDIS_CLUSTER_HASH_SLOTS = 16384 - def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_coverage_check=False, nodemanager_follow_cluster=False, + def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_coverage_check=False, + nodemanager_follow_cluster=False, nodemanager_random_cluster_nodes=False, host_port_remap=None, **connection_kwargs): """ :skip_full_coverage_check: @@ -33,6 +34,9 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera The node manager will during initialization try the last set of nodes that it was operating on. This will allow the client to drift along side the cluster if the cluster nodes move around alot. + :nodemanager_random_cluster_nodes: + The node manager will use disorder nodes during initialization, avoid all + cluster slots command are send to first node. """ log.debug("Creating new NodeManager instance") @@ -45,6 +49,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera self.reinitialize_steps = reinitialize_steps or 25 self._skip_full_coverage_check = skip_full_coverage_check self.nodemanager_follow_cluster = nodemanager_follow_cluster + self.nodemanager_random_cluster_nodes = nodemanager_random_cluster_nodes self.encoder = Encoder( connection_kwargs.get('encoding', 'utf-8'), connection_kwargs.get('encoding_errors', 'strict'), @@ -219,7 +224,8 @@ def initialize(self): # With this option the client will attempt to connect to any of the previous set of nodes instead of the original set of nodes if self.nodemanager_follow_cluster: nodes = self.startup_nodes - + if self.nodemanager_random_cluster_nodes: + random.shuffle(nodes) for node in nodes: try: r = self.get_redis_link(host=node["host"], port=node["port"], decode_responses=True)