diff --git a/rediscluster/client.py b/rediscluster/client.py
index cb8f59de..659b1244 100644
--- a/rediscluster/client.py
+++ b/rediscluster/client.py
@@ -172,7 +172,7 @@ class RedisCluster(Redis):
 
     # Not complete, but covers the major ones
     # https://redis.io/commands
-    READ_COMMANDS = [
+    READ_COMMANDS = frozenset([
         "BITCOUNT",
         "BITPOS",
         "EXISTS",
@@ -212,7 +212,7 @@ class RedisCluster(Redis):
         "ZCOUNT",
         "ZRANGE",
         "ZSCORE"
-    ]
+    ])
 
     RESULT_CALLBACKS = dict_merge(
         string_keys_to_dict([
@@ -461,6 +461,7 @@ def pipeline(self, transaction=None, shard_hint=None):
             result_callbacks=self.result_callbacks,
             response_callbacks=self.response_callbacks,
             cluster_down_retry_attempts=self.cluster_down_retry_attempts,
+            read_from_replicas=self.read_from_replicas,
         )
 
     def transaction(self, *args, **kwargs):
@@ -578,7 +579,6 @@ def _execute_command(self, *args, **kwargs):
 
         redirect_addr = None
         asking = False
-        is_read_replica = False
 
         try_random_node = False
         slot = self._determine_slot(*args)
@@ -605,7 +605,6 @@
                             slot,
                             self.read_from_replicas and (command in self.READ_COMMANDS)
                         )
-                        is_read_replica = node['server_type'] == 'slave'
 
                 connection = self.connection_pool.get_connection_by_node(node)
 
@@ -615,12 +614,6 @@
                     connection.send_command('ASKING')
                     self.parse_response(connection, "ASKING", **kwargs)
                     asking = False
-                if is_read_replica:
-                    # Ask read replica to accept reads (see https://redis.io/commands/readonly)
-                    # TODO: do we need to handle errors from this response?
-                    connection.send_command('READONLY')
-                    self.parse_response(connection, 'READONLY', **kwargs)
-                    is_read_replica = False
 
                 connection.send_command(*args)
                 return self.parse_response(connection, command, **kwargs)
@@ -687,8 +680,8 @@
                 self.refresh_table_asap = True
                 self.connection_pool.nodes.increment_reinitialize_counter()
 
-                node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
-                self.connection_pool.nodes.slots[e.slot_id][0] = node
+                node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
+                self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
 
             except TryAgainError as e:
                 log.exception("TryAgainError")
diff --git a/rediscluster/connection.py b/rediscluster/connection.py
index 561e7657..f06eae09 100644
--- a/rediscluster/connection.py
+++ b/rediscluster/connection.py
@@ -343,6 +343,11 @@ def get_connection_by_node(self, node):
             connection = self._available_connections.get(node["name"], []).pop()
         except IndexError:
             connection = self.make_connection(node)
+        server_type = node.get("server_type", "master")
+        if server_type == "slave":
+            connection.send_command('READONLY')
+            if nativestr(connection.read_response()) != 'OK':
+                raise ConnectionError('READONLY command failed')
 
         self._in_use_connections.setdefault(node["name"], set()).add(connection)
diff --git a/rediscluster/nodemanager.py b/rediscluster/nodemanager.py
index 0a53e309..7444fa53 100644
--- a/rediscluster/nodemanager.py
+++ b/rediscluster/nodemanager.py
@@ -39,6 +39,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
         self.connection_kwargs = connection_kwargs
         self.nodes = {}
         self.slots = {}
+        self.slave_nodes_by_master = {}
         self.startup_nodes = [] if startup_nodes is None else startup_nodes
         self.orig_startup_nodes = [node for node in self.startup_nodes]
         self.reinitialize_counter = 0
@@ -257,6 +258,8 @@ def initialize(self):
                 node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
                 nodes_cache[node_name] = node
 
+                self.slave_nodes_by_master[node_name] = []
+
                 for i in range(int(slot[0]), int(slot[1]) + 1):
                     if i not in tmp_slots:
                         tmp_slots[i] = [node]
@@ -267,6 +270,7 @@
                             target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
                             nodes_cache[slave_node_name] = target_slave_node
                             tmp_slots[i].append(target_slave_node)
+                            self.slave_nodes_by_master[node_name].append(slave_node_name)
                     else:
                         # Validate that 2 nodes want to use the same slot cache setup
                         if tmp_slots[i][0]['name'] != node['name']:
@@ -409,6 +413,25 @@ def set_node(self, host, port, server_type=None):
 
         return node
 
+    def get_node(self, host, port, server_type=None):
+        node, node_name = self.make_node_obj(host, port, server_type)
+        if node_name not in self.nodes:
+            self.nodes[node_name] = node
+        return self.nodes[node_name]
+
+    def move_slot_to_node(self, slot, node):
+        """
+        When moved response received, we should move all replicas with the master to the new slot.
+        """
+        node_name = node['name']
+        self.slots[slot] = [node]
+        slave_nodes = self.slave_nodes_by_master.get(node_name)
+        if slave_nodes:
+            for slave_name in slave_nodes:
+                slave_node = self.nodes.get(slave_name)
+                if slave_node:
+                    self.slots[slot].append(slave_node)
+
     def populate_startup_nodes(self):
         """
         Do something with all startup nodes and filters out any duplicates
diff --git a/rediscluster/pipeline.py b/rediscluster/pipeline.py
index 1b7065fc..1fa84a4c 100644
--- a/rediscluster/pipeline.py
+++ b/rediscluster/pipeline.py
@@ -2,6 +2,7 @@
 
 # python std lib
 import sys
+import logging
 
 # rediscluster imports
 from .client import RedisCluster
@@ -15,6 +16,10 @@
 from redis.exceptions import ConnectionError, RedisError, TimeoutError
 from redis._compat import imap, unicode
 
+from gevent import monkey; monkey.patch_all()
+import gevent
+
+log = logging.getLogger(__name__)
 
 ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)
 
@@ -24,7 +29,8 @@ class ClusterPipeline(RedisCluster):
     """
 
     def __init__(self, connection_pool, result_callbacks=None,
-                 response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3):
+                 response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3,
+                 max_redirects=5, use_gevent=False):
         """
         """
         self.command_stack = []
@@ -37,6 +43,8 @@ def __init__(self, connection_pool, result_callbacks=None,
         self.response_callbacks = dict_merge(response_callbacks or self.__class__.RESPONSE_CALLBACKS.copy(),
                                              self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
         self.cluster_down_retry_attempts = cluster_down_retry_attempts
+        self.max_redirects = max_redirects
+        self.use_gevent = use_gevent
 
     def __repr__(self):
         """
@@ -165,6 +173,8 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
                     stack,
                     raise_on_error=raise_on_error,
                     allow_redirections=allow_redirections,
+                    max_redirects=self.max_redirects,
+                    use_gevent=self.use_gevent,
                 )
             except ClusterDownError:
                 # Try again with the new cluster setup. All other errors
@@ -174,71 +184,137 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
        # If it fails the configured number of times then raise exception back to caller of this method
        raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
 
-    def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
-        """
-        Send a bunch of cluster commands to the redis cluster.
+    def _execute_node_commands(self, n):
+        n.write()
 
-        `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
-        automatically. If set to false it will raise RedisClusterException.
-        """
-        # the first time sending the commands we send all of the commands that were queued up.
-        # if we have to run through it again, we only retry the commands that failed.
-        attempt = sorted(stack, key=lambda x: x.position)
+        n.read()
 
-        # build a list of node objects based on node names we need to
+    def _get_commands_by_node(self, cmds):
         nodes = {}
+        proxy_node_by_master = {}
+        connection_by_node = {}
 
-        # as we move through each command that still needs to be processed,
-        # we figure out the slot number that command maps to, then from the slot determine the node.
-        for c in attempt:
+        for c in cmds:
             # refer to our internal node -> slot table that tells us where a given
             # command should route to.
             slot = self._determine_slot(*c.args)
-            node = self.connection_pool.get_node_by_slot(slot)
 
-            # little hack to make sure the node name is populated. probably could clean this up.
-            self.connection_pool.nodes.set_node_name(node)
+            master_node = self.connection_pool.get_node_by_slot(slot)
+
+            # for the same master_node, it should always get the same proxy_node to group
+            # as many commands as possible per node
+            if master_node['name'] in proxy_node_by_master:
+                node = proxy_node_by_master[master_node['name']]
+            else:
+                command = c.args[0]
+                read_from_replicas = self.read_from_replicas and (command in self.READ_COMMANDS)
+                node = self.connection_pool.get_node_by_slot(slot, read_from_replicas)
+                proxy_node_by_master[master_node['name']] = node
+
+                # little hack to make sure the node name is populated. probably could clean this up.
+                self.connection_pool.nodes.set_node_name(node)
 
-            # now that we know the name of the node ( it's just a string in the form of host:port )
-            # we can build a list of commands for each node.
             node_name = node['name']
             if node_name not in nodes:
-                nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
+                if node_name in connection_by_node:
+                    connection = connection_by_node[node_name]
+                else:
+                    connection = self.connection_pool.get_connection_by_node(node)
+                    connection_by_node[node_name] = connection
+                nodes[node_name] = NodeCommands(self.parse_response, connection)
 
             nodes[node_name].append(c)
 
-        # send the commands in sequence.
-        # we write to all the open sockets for each node first, before reading anything
-        # this allows us to flush all the requests out across the network essentially in parallel
-        # so that we can read them all in parallel as they come back.
-        # we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
-        node_commands = nodes.values()
-        for n in node_commands:
-            n.write()
-
-        for n in node_commands:
-            n.read()
-
-        # release all of the redis connections we allocated earlier back into the connection pool.
-        # we used to do this step as part of a try/finally block, but it is really dangerous to
-        # release connections back into the pool if for some reason the socket has data still left in it
-        # from a previous operation. The write and read operations already have try/catch around them for
-        # all known types of errors including connection and socket level errors.
-        # So if we hit an exception, something really bad happened and putting any of
-        # these connections back into the pool is a very bad idea.
-        # the socket might have unread buffer still sitting in it, and then the
-        # next time we read from it we pass the buffered result back from a previous
-        # command and every single request after to that connection will always get
-        # a mismatched result. (not just theoretical, I saw this happen on production x.x).
-        for n in nodes.values():
-            self.connection_pool.release(n.connection)
+        return nodes, connection_by_node
+
+    def _execute_single_command(self, cmd):
+        try:
+            # send each command individually like we do in the main client.
+            cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options)
+        except RedisError as e:
+            cmd.result = e
+
+    def _send_cluster_commands(
+        self,
+        stack,
+        raise_on_error=True,
+        allow_redirections=True,
+        max_redirects=5,
+        use_gevent=True,
+    ):
+        """
+        Send a bunch of cluster commands to the redis cluster.
+
+        `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
+        automatically. If set to false it will raise RedisClusterException.
+        """
+        # the first time sending the commands we send all of the commands that were queued up.
+        # if we have to run through it again, we only retry the commands that failed.
+        cmds = sorted(stack, key=lambda x: x.position)
+
+        cur_attempt = 0
+
+        while cur_attempt < max_redirects:
+            # build a list of node objects based on node names we need to
+            nodes, connection_by_node = self._get_commands_by_node(cmds)
+
+            # use gevent to execute node commands in parallel
+            # this would allow the commands serialization(pack_commands)
+            # and deserialization(parse_response) to run on multiple cores and make things faster
+
+            node_commands = nodes.values()
+
+            if use_gevent:
+                events = []
+                for n in node_commands:
+                    events.append(gevent.spawn(self._execute_node_commands, n))
+
+                gevent.joinall(events)
+            else:
+                for n in node_commands:
+                    n.write()
+
+                for n in node_commands:
+                    n.read()
+
+            # release all of the redis connections we allocated earlier back into the connection pool.
+            # we used to do this step as part of a try/finally block, but it is really dangerous to
+            # release connections back into the pool if for some reason the socket has data still left in it
+            # from a previous operation. The write and read operations already have try/catch around them for
+            # all known types of errors including connection and socket level errors.
+            # So if we hit an exception, something really bad happened and putting any of
+            # these connections back into the pool is a very bad idea.
+            # the socket might have unread buffer still sitting in it, and then the
+            # next time we read from it we pass the buffered result back from a previous
+            # command and every single request after to that connection will always get
+            # a mismatched result. (not just theoretical, I saw this happen on production x.x).
+            for conn in connection_by_node.values():
+                self.connection_pool.release(conn)
+
+            # will regroup moved commands and retry using pipeline(stacked commands)
+            # this would increase the pipeline performance a lot
+            moved_cmds = []
+            for c in cmds:
+                if isinstance(c.result, MovedError):
+                    e = c.result
+                    node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
+                    self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
+
+                    moved_cmds.append(c)
+
+            if moved_cmds:
+                cur_attempt += 1
+                cmds = sorted(moved_cmds, key=lambda x: x.position)
+                continue
+
+            break
 
         # if the response isn't an exception it is a valid response from the node
         # we're all done with that command, YAY!
         # if we have more commands to attempt, we've run into problems.
         # collect all the commands we are allowed to retry.
         # (MOVED, ASK, or connection errors or timeout errors)
-        attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
+        attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
         if attempt and allow_redirections:
             # RETRY MAGIC HAPPENS HERE!
             # send these remaing comamnds one at a time using `execute_command`
@@ -255,13 +331,22 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
             # If a lot of commands have failed, we'll be setting the
             # flag to rebuild the slots table from scratch. So MOVED errors should
             # correct themselves fairly quickly.
+
+            # with the previous redirect retries, I could barely see the slow mode happening again
+            log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt]))
+
             self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
-            for c in attempt:
-                try:
-                    # send each command individually like we do in the main client.
-                    c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options)
-                except RedisError as e:
-                    c.result = e
+
+            if use_gevent:
+                # even in the slow mode, we could use gevent to make things faster
+                events = []
+                for c in attempt:
+                    events.append(gevent.spawn(self._execute_single_command, c))
+
+                gevent.joinall(events)
+            else:
+                for c in attempt:
+                    self._execute_single_command(c)
 
         # turn the response back into a simple flat array that corresponds
         # to the sequence of commands issued in the stack in pipeline.execute()
diff --git a/requirements.txt b/requirements.txt
index 73054e91..f3d2e8f5 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1 +1,3 @@
 redis>=3.0.0,<4.0.0
+gevent
+greenlet
\ No newline at end of file
diff --git a/setup.py b/setup.py
index f58c7f17..07d97e2f 100644
--- a/setup.py
+++ b/setup.py
@@ -20,7 +20,7 @@
 
 setup(
     name="redis-py-cluster",
-    version="2.1.0",
+    version="2.1.1",
     description="Library for communicating with Redis Clusters. Built on top of redis-py lib",
     long_description=readme + '\n\n' + history,
     long_description_content_type="text/markdown",
@@ -32,7 +32,9 @@
     url='http://github.com/grokzen/redis-py-cluster',
     license='MIT',
     install_requires=[
-        'redis>=3.0.0,<4.0.0'
+        'redis>=3.0.0,<4.0.0',
+        'gevent',
+        'greenlet',
     ],
     python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4",
     extras_require={