-
Notifications
You must be signed in to change notification settings - Fork 312
Reddit patched: Pipeline optimizations #406
base: master
Are you sure you want to change the base?
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -39,6 +39,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera | |
self.connection_kwargs = connection_kwargs | ||
self.nodes = {} | ||
self.slots = {} | ||
self.slave_nodes_by_master = {} | ||
self.startup_nodes = [] if startup_nodes is None else startup_nodes | ||
self.orig_startup_nodes = [node for node in self.startup_nodes] | ||
self.reinitialize_counter = 0 | ||
|
@@ -257,6 +258,8 @@ def initialize(self): | |
node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master') | ||
nodes_cache[node_name] = node | ||
|
||
self.slave_nodes_by_master[node_name] = set() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. set? i try to avoid them if possible There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I was thinking to lookup the slaves by slave names, it turns out I don't need it eventually. So I think list is perfectly fine here. I will change it. |
||
|
||
for i in range(int(slot[0]), int(slot[1]) + 1): | ||
if i not in tmp_slots: | ||
tmp_slots[i] = [node] | ||
|
@@ -267,6 +270,7 @@ def initialize(self): | |
target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave') | ||
nodes_cache[slave_node_name] = target_slave_node | ||
tmp_slots[i].append(target_slave_node) | ||
self.slave_nodes_by_master[node_name].add(slave_node_name) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Am i guessing right that we need this as a convenience tracking data structure to determine what slaves is for each master as we do not directly has that right now? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Exactly, it's for the moved error handling. We should copy the master and the corresponding replicas to the slot. |
||
else: | ||
# Validate that 2 nodes want to use the same slot cache setup | ||
if tmp_slots[i][0]['name'] != node['name']: | ||
|
@@ -409,6 +413,25 @@ def set_node(self, host, port, server_type=None): | |
|
||
return node | ||
|
||
def get_node(self, host, port, server_type=None): | ||
node, node_name = self.make_node_obj(host, port, server_type) | ||
if node_name not in self.nodes: | ||
self.nodes[node_name] = node | ||
return self.nodes[node_name] | ||
|
||
def move_slot_to_node(self, slot, node): | ||
""" | ||
When moved response received, we should move all replicas with the master to the new slot. | ||
""" | ||
node_name = node['name'] | ||
self.slots[slot] = [node] | ||
slave_nodes = self.slave_nodes_by_master.get(node_name) | ||
if slave_nodes: | ||
for slave_name in slave_nodes: | ||
slave_node = self.nodes.get(slave_name) | ||
if slave_node: | ||
self.slots[slot].append(slave_node) | ||
|
||
def populate_startup_nodes(self): | ||
""" | ||
Do something with all startup nodes and filters out any duplicates | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,6 +2,7 @@ | |
|
||
# python std lib | ||
import sys | ||
import logging | ||
|
||
# rediscluster imports | ||
from .client import RedisCluster | ||
|
@@ -15,6 +16,10 @@ | |
from redis.exceptions import ConnectionError, RedisError, TimeoutError | ||
from redis._compat import imap, unicode | ||
|
||
from gevent import monkey; monkey.patch_all() | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not a super fan of this addition tho. I think we had a gevent implementation and a threaded implementation way back but we replaced it with the current version that we have in the main execution code path as it was a major overhead to use any kind of threading or parallelization librar/framework to achive some kind of speed improvements which we never saw that we actually gained unless you ran pipelines against a ton of master nodes, not number of commands but just a big number of nodes. Basically you will have to defend this new implementation why we should add this one over the existing one. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It will be more efficient if we need to talk to multiple shards/nodes in the pipeline. And for now, it's only used for pipeline commands which should have no impact/overhead on the rest of the logic. |
||
import gevent | ||
|
||
log = logging.getLogger(__name__) | ||
|
||
ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError) | ||
|
||
|
@@ -174,71 +179,127 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T | |
# If it fails the configured number of times then raise exception back to caller of this method | ||
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster") | ||
|
||
def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True): | ||
""" | ||
Send a bunch of cluster commands to the redis cluster. | ||
def _execute_node_commands(self, n): | ||
n.write() | ||
|
||
`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses | ||
automatically. If set to false it will raise RedisClusterException. | ||
""" | ||
# the first time sending the commands we send all of the commands that were queued up. | ||
# if we have to run through it again, we only retry the commands that failed. | ||
attempt = sorted(stack, key=lambda x: x.position) | ||
n.read() | ||
|
||
# build a list of node objects based on node names we need to | ||
def _get_commands_by_node(self, cmds): | ||
nodes = {} | ||
proxy_node_by_master = {} | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I dont really get what this helps with from the docs lines you written about this variable? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The proxy_node will be the node to execute the NodeCommands.
|
||
connection_by_node = {} | ||
|
||
# as we move through each command that still needs to be processed, | ||
# we figure out the slot number that command maps to, then from the slot determine the node. | ||
for c in attempt: | ||
for c in cmds: | ||
# refer to our internal node -> slot table that tells us where a given | ||
# command should route to. | ||
slot = self._determine_slot(*c.args) | ||
node = self.connection_pool.get_node_by_slot(slot) | ||
|
||
# little hack to make sure the node name is populated. probably could clean this up. | ||
self.connection_pool.nodes.set_node_name(node) | ||
master_node = self.connection_pool.get_node_by_slot(slot) | ||
|
||
# for the same master_node, it should always get the same proxy_node to group | ||
# as many commands as possible per node | ||
if master_node['name'] in proxy_node_by_master: | ||
node = proxy_node_by_master[master_node['name']] | ||
else: | ||
# TODO: should determine if using replicas by if command is read only | ||
node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas) | ||
proxy_node_by_master[master_node['name']] = node | ||
|
||
# little hack to make sure the node name is populated. probably could clean this up. | ||
self.connection_pool.nodes.set_node_name(node) | ||
|
||
# now that we know the name of the node ( it's just a string in the form of host:port ) | ||
# we can build a list of commands for each node. | ||
node_name = node['name'] | ||
if node_name not in nodes: | ||
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node)) | ||
if node_name in connection_by_node: | ||
connection = connection_by_node[node_name] | ||
else: | ||
connection = self.connection_pool.get_connection_by_node(node) | ||
connection_by_node[node_name] = connection | ||
nodes[node_name] = NodeCommands(self.parse_response, connection) | ||
|
||
nodes[node_name].append(c) | ||
|
||
# send the commands in sequence. | ||
# we write to all the open sockets for each node first, before reading anything | ||
# this allows us to flush all the requests out across the network essentially in parallel | ||
# so that we can read them all in parallel as they come back. | ||
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference. | ||
node_commands = nodes.values() | ||
for n in node_commands: | ||
n.write() | ||
|
||
for n in node_commands: | ||
n.read() | ||
|
||
# release all of the redis connections we allocated earlier back into the connection pool. | ||
# we used to do this step as part of a try/finally block, but it is really dangerous to | ||
# release connections back into the pool if for some reason the socket has data still left in it | ||
# from a previous operation. The write and read operations already have try/catch around them for | ||
# all known types of errors including connection and socket level errors. | ||
# So if we hit an exception, something really bad happened and putting any of | ||
# these connections back into the pool is a very bad idea. | ||
# the socket might have unread buffer still sitting in it, and then the | ||
# next time we read from it we pass the buffered result back from a previous | ||
# command and every single request after to that connection will always get | ||
# a mismatched result. (not just theoretical, I saw this happen on production x.x). | ||
for n in nodes.values(): | ||
self.connection_pool.release(n.connection) | ||
return nodes, connection_by_node | ||
|
||
def _execute_single_command(self, cmd): | ||
try: | ||
# send each command individually like we do in the main client. | ||
cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options) | ||
except RedisError as e: | ||
cmd.result = e | ||
|
||
def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True): | ||
""" | ||
Send a bunch of cluster commands to the redis cluster. | ||
|
||
`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses | ||
automatically. If set to false it will raise RedisClusterException. | ||
""" | ||
# the first time sending the commands we send all of the commands that were queued up. | ||
# if we have to run through it again, we only retry the commands that failed. | ||
cmds = sorted(stack, key=lambda x: x.position) | ||
|
||
max_redirects = 5 | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Number of redirections should be configurable in as a class variable and not hard coded here. Also the main execution loop has 16 TTL rounds before failing out. This has 5 and that is possibly a discrepancy, yes if you get that many moved/ask errors then something is probably more broken then how this code handles the case :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, it's an initial PR which I just list out all the things. This would be read from the config for sure. |
||
cur_attempt = 0 | ||
|
||
while cur_attempt < max_redirects: | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. minor lint issue, no blank line after a while statement |
||
# build a list of node objects based on node names we need to | ||
nodes, connection_by_node = self._get_commands_by_node(cmds) | ||
|
||
# send the commands in sequence. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Old docs that is not relevant with your new solution? |
||
# we write to all the open sockets for each node first, before reading anything | ||
# this allows us to flush all the requests out across the network essentially in parallel | ||
# so that we can read them all in parallel as they come back. | ||
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference. | ||
|
||
# duke-cliff: I think it would still be faster if we use gevent to make the command in parallel | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be a comment here in the PR, not inside the code with speculation that it might be faster. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will remove it. |
||
# the io is non-blocking, but serialization/deserialization will still be blocking previously | ||
node_commands = nodes.values() | ||
events = [] | ||
for n in node_commands: | ||
events.append(gevent.spawn(self._execute_node_commands, n)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Wrapping gevent around this is not really nessesary? You are still doing the same There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. When using non-blocking, it only means the thread would not be blocked by the IO, but you still need CPU time to serialize/deserialize data to/from the socket. So in this case, if you call redis.packed_commands(....) for multiple I said this part is nice to have, if you don't want it in the public version, we can talk about whether to give an option to the user or remove it. |
||
|
||
gevent.joinall(events) | ||
|
||
# release all of the redis connections we allocated earlier back into the connection pool. | ||
# we used to do this step as part of a try/finally block, but it is really dangerous to | ||
# release connections back into the pool if for some reason the socket has data still left in it | ||
# from a previous operation. The write and read operations already have try/catch around them for | ||
# all known types of errors including connection and socket level errors. | ||
# So if we hit an exception, something really bad happened and putting any of | ||
# these connections back into the pool is a very bad idea. | ||
# the socket might have unread buffer still sitting in it, and then the | ||
# next time we read from it we pass the buffered result back from a previous | ||
# command and every single request after to that connection will always get | ||
# a mismatched result. (not just theoretical, I saw this happen on production x.x). | ||
for conn in connection_by_node.values(): | ||
self.connection_pool.release(conn) | ||
|
||
# will regroup moved commands and retry using pipeline(stacked commands) | ||
# this would increase the pipeline performance a lot | ||
moved_cmds = [] | ||
for c in cmds: | ||
if isinstance(c.result, MovedError): | ||
e = c.result | ||
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master') | ||
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node) | ||
|
||
moved_cmds.append(c) | ||
|
||
if moved_cmds: | ||
cur_attempt += 1 | ||
cmds = sorted(moved_cmds, key=lambda x: x.position) | ||
continue | ||
|
||
break | ||
|
||
# if the response isn't an exception it is a valid response from the node | ||
# we're all done with that command, YAY! | ||
# if we have more commands to attempt, we've run into problems. | ||
# collect all the commands we are allowed to retry. | ||
# (MOVED, ASK, or connection errors or timeout errors) | ||
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position) | ||
attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position) | ||
if attempt and allow_redirections: | ||
# RETRY MAGIC HAPPENS HERE! | ||
# send these remaing comamnds one at a time using `execute_command` | ||
|
@@ -255,13 +316,19 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections= | |
# If a lot of commands have failed, we'll be setting the | ||
# flag to rebuild the slots table from scratch. So MOVED errors should | ||
# correct themselves fairly quickly. | ||
|
||
# with the previous redirect retries, I could barely see the slow mode happening again | ||
log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt])) | ||
|
||
self.connection_pool.nodes.increment_reinitialize_counter(len(attempt)) | ||
|
||
# even in the slow mode, we could use gevent to make things faster | ||
events = [] | ||
for c in attempt: | ||
try: | ||
# send each command individually like we do in the main client. | ||
c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Did we not handle the moved and ask errors inside this main execution call? Why move that logic up to inside the pipeline when this call used to handle it all before? Did you find out some new case or reason why this old call was either not working or the new solution covers some cases not previously thought of? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because the moved commands can be grouped(stacked) and executed faster in pipeline. Instead of the regular I think we have a fairly large set of keys per pipeline, so we had many |
||
except RedisError as e: | ||
c.result = e | ||
events.append(gevent.spawn(self._execute_single_command, c)) | ||
|
||
gevent.joinall(events) | ||
|
||
|
||
# turn the response back into a simple flat array that corresponds | ||
# to the sequence of commands issued in the stack in pipeline.execute() | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1 +1,3 @@ | ||
redis>=3.0.0,<4.0.0 | ||
gevent | ||
greenlet |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A set is bad here as manipulating it as a user should technically be possible to manipulate that list to add in things that might not be supported or in the main code in a release yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
True, a
frozenset
would be better here.