-
Notifications
You must be signed in to change notification settings - Fork 312
Reddit patched: Pipeline optimizations #406
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
# python std lib | ||
import sys | ||
import logging | ||
|
||
# rediscluster imports | ||
from .client import RedisCluster | ||
|
@@ -15,6 +16,10 @@ | |
from redis.exceptions import ConnectionError, RedisError, TimeoutError | ||
from redis._compat import imap, unicode | ||
|
||
from gevent import monkey; monkey.patch_all() | ||
import gevent | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError) | ||
|
||
|
@@ -24,7 +29,8 @@ class ClusterPipeline(RedisCluster): | |
""" | ||
|
||
def __init__(self, connection_pool, result_callbacks=None, | ||
response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3): | ||
response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3, | ||
max_redirects=5, use_gevent=False): | ||
""" | ||
""" | ||
self.command_stack = [] | ||
|
@@ -37,6 +43,8 @@ def __init__(self, connection_pool, result_callbacks=None, | |
self.response_callbacks = dict_merge(response_callbacks or self.__class__.RESPONSE_CALLBACKS.copy(), | ||
self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS) | ||
self.cluster_down_retry_attempts = cluster_down_retry_attempts | ||
self.max_redirects = max_redirects | ||
self.use_gevent = use_gevent | ||
|
||
def __repr__(self): | ||
""" | ||
|
@@ -165,6 +173,8 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T | |
stack, | ||
raise_on_error=raise_on_error, | ||
allow_redirections=allow_redirections, | ||
max_redirects=self.max_redirects, | ||
use_gevent=self.use_gevent, | ||
) | ||
except ClusterDownError: | ||
# Try again with the new cluster setup. All other errors | ||
|
@@ -174,71 +184,137 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T | |
# If it fails the configured number of times then raise exception back to caller of this method | ||
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") | ||
|
||
def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True): | ||
""" | ||
Send a bunch of cluster commands to the redis cluster. | ||
def _execute_node_commands(self, n): | ||
n.write() | ||
|
||
`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses | ||
automatically. If set to false it will raise RedisClusterException. | ||
""" | ||
# the first time sending the commands we send all of the commands that were queued up. | ||
# if we have to run through it again, we only retry the commands that failed. | ||
attempt = sorted(stack, key=lambda x: x.position) | ||
n.read() | ||
|
||
# build a list of node objects based on node names we need to | ||
def _get_commands_by_node(self, cmds): | ||
nodes = {} | ||
proxy_node_by_master = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont really get what this helps with from the docs lines you written about this variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The proxy_node will be the node to execute the NodeCommands.
|
||
connection_by_node = {} | ||
|
||
# as we move through each command that still needs to be processed, | ||
# we figure out the slot number that command maps to, then from the slot determine the node. | ||
for c in attempt: | ||
for c in cmds: | ||
# refer to our internal node -> slot table that tells us where a given | ||
# command should route to. | ||
slot = self._determine_slot(*c.args) | ||
node = self.connection_pool.get_node_by_slot(slot) | ||
|
||
# little hack to make sure the node name is populated. probably could clean this up. | ||
self.connection_pool.nodes.set_node_name(node) | ||
master_node = self.connection_pool.get_node_by_slot(slot) | ||
|
||
# for the same master_node, it should always get the same proxy_node to group | ||
# as many commands as possible per node | ||
if master_node['name'] in proxy_node_by_master: | ||
node = proxy_node_by_master[master_node['name']] | ||
else: | ||
command = c.args[0] | ||
read_from_replicas = self.read_from_replicas and (command in self.READ_COMMANDS) | ||
node = self.connection_pool.get_node_by_slot(slot, read_from_replicas) | ||
proxy_node_by_master[master_node['name']] = node | ||
|
||
# little hack to make sure the node name is populated. probably could clean this up. | ||
self.connection_pool.nodes.set_node_name(node) | ||
|
||
# now that we know the name of the node ( it's just a string in the form of host:port ) | ||
# we can build a list of commands for each node. | ||
node_name = node['name'] | ||
if node_name not in nodes: | ||
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) | ||
if node_name in connection_by_node: | ||
connection = connection_by_node[node_name] | ||
else: | ||
connection = self.connection_pool.get_connection_by_node(node) | ||
connection_by_node[node_name] = connection | ||
nodes[node_name] = NodeCommands(self.parse_response, connection) | ||
|
||
nodes[node_name].append(c) | ||
|
||
# send the commands in sequence. | ||
# we write to all the open sockets for each node first, before reading anything | ||
# this allows us to flush all the requests out across the network essentially in parallel | ||
# so that we can read them all in parallel as they come back. | ||
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference. | ||
node_commands = nodes.values() | ||
for n in node_commands: | ||
n.write() | ||
|
||
for n in node_commands: | ||
n.read() | ||
|
||
# release all of the redis connections we allocated earlier back into the connection pool. | ||
# we used to do this step as part of a try/finally block, but it is really dangerous to | ||
# release connections back into the pool if for some reason the socket has data still left in it | ||
# from a previous operation. The write and read operations already have try/catch around them for | ||
# all known types of errors including connection and socket level errors. | ||
# So if we hit an exception, something really bad happened and putting any of | ||
# these connections back into the pool is a very bad idea. | ||
# the socket might have unread buffer still sitting in it, and then the | ||
# next time we read from it we pass the buffered result back from a previous | ||
# command and every single request after to that connection will always get | ||
# a mismatched result. (not just theoretical, I saw this happen on production x.x). | ||
for n in nodes.values(): | ||
self.connection_pool.release(n.connection) | ||
return nodes, connection_by_node | ||
|
||
def _execute_single_command(self, cmd): | ||
try: | ||
# send each command individually like we do in the main client. | ||
cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options) | ||
except RedisError as e: | ||
cmd.result = e | ||
|
||
def _send_cluster_commands( | ||
self, | ||
stack, | ||
raise_on_error=True, | ||
allow_redirections=True, | ||
max_redirects=5, | ||
use_gevent=True, | ||
): | ||
""" | ||
Send a bunch of cluster commands to the redis cluster. | ||
|
||
`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses | ||
automatically. If set to false it will raise RedisClusterException. | ||
""" | ||
# the first time sending the commands we send all of the commands that were queued up. | ||
# if we have to run through it again, we only retry the commands that failed. | ||
cmds = sorted(stack, key=lambda x: x.position) | ||
|
||
cur_attempt = 0 | ||
|
||
while cur_attempt < max_redirects: | ||
# build a list of node objects based on node names we need to | ||
nodes, connection_by_node = self._get_commands_by_node(cmds) | ||
|
||
# use gevent to execute node commands in parallel | ||
# this would allow the commands serialization(pack_commands) | ||
# and deserialization(parse_response) to run on multiple cores and make things faster | ||
|
||
node_commands = nodes.values() | ||
|
||
if use_gevent: | ||
events = [] | ||
for n in node_commands: | ||
events.append(gevent.spawn(self._execute_node_commands, n)) | ||
|
||
gevent.joinall(events) | ||
else: | ||
for n in node_commands: | ||
n.write() | ||
|
||
for n in node_commands: | ||
n.read() | ||
|
||
# release all of the redis connections we allocated earlier back into the connection pool. | ||
# we used to do this step as part of a try/finally block, but it is really dangerous to | ||
# release connections back into the pool if for some reason the socket has data still left in it | ||
# from a previous operation. The write and read operations already have try/catch around them for | ||
# all known types of errors including connection and socket level errors. | ||
# So if we hit an exception, something really bad happened and putting any of | ||
# these connections back into the pool is a very bad idea. | ||
# the socket might have unread buffer still sitting in it, and then the | ||
# next time we read from it we pass the buffered result back from a previous | ||
# command and every single request after to that connection will always get | ||
# a mismatched result. (not just theoretical, I saw this happen on production x.x). | ||
for conn in connection_by_node.values(): | ||
self.connection_pool.release(conn) | ||
|
||
# will regroup moved commands and retry using pipeline(stacked commands) | ||
# this would increase the pipeline performance a lot | ||
moved_cmds = [] | ||
for c in cmds: | ||
if isinstance(c.result, MovedError): | ||
e = c.result | ||
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master') | ||
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node) | ||
|
||
moved_cmds.append(c) | ||
|
||
if moved_cmds: | ||
cur_attempt += 1 | ||
cmds = sorted(moved_cmds, key=lambda x: x.position) | ||
continue | ||
|
||
break | ||
|
||
# if the response isn't an exception it is a valid response from the node | ||
# we're all done with that command, YAY! | ||
# if we have more commands to attempt, we've run into problems. | ||
# collect all the commands we are allowed to retry. | ||
# (MOVED, ASK, or connection errors or timeout errors) | ||
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position) | ||
attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position) | ||
if attempt and allow_redirections: | ||
# RETRY MAGIC HAPPENS HERE! | ||
# send these remaing comamnds one at a time using `execute_command` | ||
|
@@ -255,13 +331,22 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections= | |
# If a lot of commands have failed, we'll be setting the | ||
# flag to rebuild the slots table from scratch. So MOVED errors should | ||
# correct themselves fairly quickly. | ||
|
||
# with the previous redirect retries, I could barely see the slow mode happening again | ||
log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt])) | ||
|
||
self.connection_pool.nodes.increment_reinitialize_counter(len(attempt)) | ||
for c in attempt: | ||
try: | ||
# send each command individually like we do in the main client. | ||
c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we not handle the moved and ask errors inside this main execution call? Why move that logic up to inside the pipeline when this call used to handle it all before? Did you find out some new case or reason why this old call was either not working or the new solution covers some cases not previously thought of? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the moved commands can be grouped(stacked) and executed faster in pipeline. Instead of the regular I think we have a fairly large set of keys per pipeline, so we had many |
||
except RedisError as e: | ||
c.result = e | ||
|
||
if use_gevent: | ||
# even in the slow mode, we could use gevent to make things faster | ||
events = [] | ||
for c in attempt: | ||
events.append(gevent.spawn(self._execute_single_command, c)) | ||
|
||
gevent.joinall(events) | ||
else: | ||
for c in attempt: | ||
self._execute_single_command(c) | ||
|
||
# turn the response back into a simple flat array that corresponds | ||
# to the sequence of commands issued in the stack in pipeline.execute() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
redis>=3.0.0,<4.0.0 | ||
gevent | ||
greenlet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not a super fan of this addition tho. I think we had a gevent implementation and a threaded implementation way back but we replaced it with the current version that we have in the main execution code path as it was a major overhead to use any kind of threading or parallelization librar/framework to achive some kind of speed improvements which we never saw that we actually gained unless you ran pipelines against a ton of master nodes, not number of commands but just a big number of nodes.
Basically you will have to defend this new implementation why we should add this one over the existing one.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It will be more efficient if we need to talk to multiple shards/nodes in the pipeline. And for now, it's only used for pipeline commands which should have no impact/overhead on the rest of the logic.