This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Reddit patched: Pipeline optimizations #406

Open · wants to merge 2 commits into master
17 changes: 5 additions & 12 deletions rediscluster/client.py
@@ -172,7 +172,7 @@ class RedisCluster(Redis):

# Not complete, but covers the major ones
# https://redis.io/commands
READ_COMMANDS = [
READ_COMMANDS = frozenset([
"BITCOUNT",
"BITPOS",
"EXISTS",
@@ -212,7 +212,7 @@ class RedisCluster(Redis):
"ZCOUNT",
"ZRANGE",
"ZSCORE"
]
])

RESULT_CALLBACKS = dict_merge(
string_keys_to_dict([
@@ -461,6 +461,7 @@ def pipeline(self, transaction=None, shard_hint=None):
result_callbacks=self.result_callbacks,
response_callbacks=self.response_callbacks,
cluster_down_retry_attempts=self.cluster_down_retry_attempts,
read_from_replicas=self.read_from_replicas,
)

def transaction(self, *args, **kwargs):
@@ -578,7 +579,6 @@ def _execute_command(self, *args, **kwargs):

redirect_addr = None
asking = False
is_read_replica = False

try_random_node = False
slot = self._determine_slot(*args)
@@ -605,7 +605,6 @@
slot,
self.read_from_replicas and (command in self.READ_COMMANDS)
)
is_read_replica = node['server_type'] == 'slave'

connection = self.connection_pool.get_connection_by_node(node)

@@ -615,12 +614,6 @@
connection.send_command('ASKING')
self.parse_response(connection, "ASKING", **kwargs)
asking = False
if is_read_replica:
# Ask read replica to accept reads (see https://redis.io/commands/readonly)
# TODO: do we need to handle errors from this response?
connection.send_command('READONLY')
self.parse_response(connection, 'READONLY', **kwargs)
is_read_replica = False

connection.send_command(*args)
return self.parse_response(connection, command, **kwargs)
@@ -687,8 +680,8 @@ def _execute_command(self, *args, **kwargs):
self.refresh_table_asap = True
self.connection_pool.nodes.increment_reinitialize_counter()

node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.slots[e.slot_id][0] = node
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
except TryAgainError as e:
log.exception("TryAgainError")

5 changes: 5 additions & 0 deletions rediscluster/connection.py
@@ -343,6 +343,11 @@ def get_connection_by_node(self, node):
connection = self._available_connections.get(node["name"], []).pop()
except IndexError:
connection = self.make_connection(node)
server_type = node.get("server_type", "master")
if server_type == "slave":
connection.send_command('READONLY')
if nativestr(connection.read_response()) != 'OK':
raise ConnectionError('READONLY command failed')

self._in_use_connections.setdefault(node["name"], set()).add(connection)

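Together with the client.py hunk above, this moves the READONLY handshake out of `_execute_command` and into the pool: whenever `get_connection_by_node` hands out a connection for a replica, that connection is switched to read-only mode before the caller's command is sent. A minimal standalone sketch of the handshake, mirroring the diff (the wrapper function name is mine, not the PR's):

```python
from redis._compat import nativestr
from redis.exceptions import ConnectionError


def ensure_readonly(connection, node):
    """Put a freshly acquired replica connection into READONLY mode."""
    if node.get("server_type", "master") == "slave":
        # Replicas reject key reads until READONLY has been sent
        # (see https://redis.io/commands/readonly).
        connection.send_command('READONLY')
        if nativestr(connection.read_response()) != 'OK':
            raise ConnectionError('READONLY command failed')
```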
23 changes: 23 additions & 0 deletions rediscluster/nodemanager.py
@@ -39,6 +39,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
self.connection_kwargs = connection_kwargs
self.nodes = {}
self.slots = {}
self.slave_nodes_by_master = {}
self.startup_nodes = [] if startup_nodes is None else startup_nodes
self.orig_startup_nodes = [node for node in self.startup_nodes]
self.reinitialize_counter = 0
@@ -257,6 +258,8 @@ def initialize(self):
node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
nodes_cache[node_name] = node

self.slave_nodes_by_master[node_name] = []

for i in range(int(slot[0]), int(slot[1]) + 1):
if i not in tmp_slots:
tmp_slots[i] = [node]
@@ -267,6 +270,7 @@
target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
nodes_cache[slave_node_name] = target_slave_node
tmp_slots[i].append(target_slave_node)
self.slave_nodes_by_master[node_name].append(slave_node_name)
else:
# Validate that 2 nodes want to use the same slot cache setup
if tmp_slots[i][0]['name'] != node['name']:
@@ -409,6 +413,25 @@ def set_node(self, host, port, server_type=None):

return node

def get_node(self, host, port, server_type=None):
node, node_name = self.make_node_obj(host, port, server_type)
if node_name not in self.nodes:
self.nodes[node_name] = node
return self.nodes[node_name]

def move_slot_to_node(self, slot, node):
"""
When a MOVED response is received, point the slot at the new master and carry its replicas along with it.
"""
node_name = node['name']
self.slots[slot] = [node]
slave_nodes = self.slave_nodes_by_master.get(node_name)
if slave_nodes:
for slave_name in slave_nodes:
slave_node = self.nodes.get(slave_name)
if slave_node:
self.slots[slot].append(slave_node)

def populate_startup_nodes(self):
"""
Do something with all startup nodes and filters out any duplicates
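For orientation, here is a hedged sketch of how these two helpers are used together when a MOVED redirect comes back. The error fields (host, port, slot_id) are the ones client.py and pipeline.py read off the MovedError; the wrapper function itself is hypothetical:

```python
# Hypothetical wrapper around the two new NodeManager helpers.
# `nodes_mgr` stands in for connection_pool.nodes; `moved` is a MovedError.
def apply_moved_redirect(nodes_mgr, moved):
    # get_node() reuses the cached node object for this host:port if we
    # already know it, instead of overwriting state the way set_node() did.
    master = nodes_mgr.get_node(moved.host, moved.port, server_type='master')

    # move_slot_to_node() points the slot at that master and re-attaches the
    # replicas recorded in slave_nodes_by_master during initialize().
    nodes_mgr.move_slot_to_node(moved.slot_id, master)
    return master
```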
189 changes: 137 additions & 52 deletions rediscluster/pipeline.py
@@ -2,6 +2,7 @@

# python std lib
import sys
import logging

# rediscluster imports
from .client import RedisCluster
@@ -15,6 +16,10 @@
from redis.exceptions import ConnectionError, RedisError, TimeoutError
from redis._compat import imap, unicode

from gevent import monkey; monkey.patch_all()
Owner

I am not a super fan of this addition, though. We had a gevent implementation and a threaded implementation way back, but we replaced them with the current version in the main execution code path: any kind of threading or parallelization library/framework was a major overhead, and we never saw an actual speed gain unless you ran pipelines against a large number of master nodes (not a large number of commands, just a big number of nodes).

Basically, you will have to defend why we should add this new implementation over the existing one.

Author

It will be more efficient when we need to talk to multiple shards/nodes in the pipeline. For now it is only used for pipeline commands, so it should have no impact or overhead on the rest of the logic.
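For context, a minimal usage sketch follows. Note that in this diff `RedisCluster.pipeline()` forwards `read_from_replicas` but not `use_gevent` or `max_redirects`, so the sketch constructs `ClusterPipeline` directly; treat the wiring as an assumption for illustration, not an API the PR exposes.

```python
from rediscluster import RedisCluster
from rediscluster.pipeline import ClusterPipeline

# Assumed wiring: build the cluster client as usual, then create the pipeline
# by hand so the new flags can be passed explicitly.
rc = RedisCluster(startup_nodes=[{"host": "127.0.0.1", "port": "7000"}])
pipe = ClusterPipeline(
    connection_pool=rc.connection_pool,
    read_from_replicas=rc.read_from_replicas,
    use_gevent=True,   # fan out per-node write/read via gevent.spawn/joinall
    max_redirects=5,   # bounded MOVED regroup-and-retry passes
)
pipe.set("foo", "bar")
pipe.get("foo")
print(pipe.execute())
```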

import gevent

log = logging.getLogger(__name__)

ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)

@@ -24,7 +29,8 @@ class ClusterPipeline(RedisCluster):
"""

def __init__(self, connection_pool, result_callbacks=None,
response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3):
response_callbacks=None, startup_nodes=None, read_from_replicas=False, cluster_down_retry_attempts=3,
max_redirects=5, use_gevent=False):
"""
"""
self.command_stack = []
@@ -37,6 +43,8 @@ def __init__(self, connection_pool, result_callbacks=None,
self.response_callbacks = dict_merge(response_callbacks or self.__class__.RESPONSE_CALLBACKS.copy(),
self.CLUSTER_COMMANDS_RESPONSE_CALLBACKS)
self.cluster_down_retry_attempts = cluster_down_retry_attempts
self.max_redirects = max_redirects
self.use_gevent = use_gevent

def __repr__(self):
"""
@@ -165,6 +173,8 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
stack,
raise_on_error=raise_on_error,
allow_redirections=allow_redirections,
max_redirects=self.max_redirects,
use_gevent=self.use_gevent,
)
except ClusterDownError:
# Try again with the new cluster setup. All other errors
@@ -174,71 +184,137 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
# If it fails the configured number of times then raise exception back to caller of this method
raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")

def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
"""
Send a bunch of cluster commands to the redis cluster.
def _execute_node_commands(self, n):
n.write()

`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
automatically. If set to false it will raise RedisClusterException.
"""
# the first time sending the commands we send all of the commands that were queued up.
# if we have to run through it again, we only retry the commands that failed.
attempt = sorted(stack, key=lambda x: x.position)
n.read()

# build a list of node objects based on node names we need to
def _get_commands_by_node(self, cmds):
nodes = {}
proxy_node_by_master = {}
Owner

I don't really get what this helps with from the doc lines you have written about this variable.

Author

The proxy_node is the node that will execute the NodeCommands.
My previous PR just picked a node with get_node_by_slot(slot, True), but that made commands for the same shard pick different replicas, so they could not be packed together efficiently.
The new logic is (a rough sketch follows the list):

  1. determine the shard of the slot; the shard is identified by master_node_name
  2. find a proxy node for that shard, recorded in proxy_node_by_master
  3. group all commands for the same shard onto that proxy node
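A simplified, standalone sketch of that grouping, with the slot and node lookups passed in as plain callables (hypothetical helper, not the PR's exact code):

```python
from collections import defaultdict

def group_commands_by_proxy(cmds, determine_slot, get_node_by_slot,
                            read_commands, read_from_replicas):
    """Group pipeline commands so all commands for one shard hit one node."""
    proxy_node_by_master = {}       # master node name -> chosen proxy node
    grouped = defaultdict(list)     # proxy node name -> commands routed to it
    for cmd in cmds:
        slot = determine_slot(*cmd.args)
        master = get_node_by_slot(slot)        # a shard is keyed by its master
        proxy = proxy_node_by_master.get(master["name"])
        if proxy is None:
            # Consult a replica only if the first command seen for this shard
            # is a read and read_from_replicas is enabled.
            use_replica = read_from_replicas and cmd.args[0] in read_commands
            proxy = get_node_by_slot(slot, use_replica)
            proxy_node_by_master[master["name"]] = proxy
        grouped[proxy["name"]].append(cmd)
    return grouped
```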

connection_by_node = {}

# as we move through each command that still needs to be processed,
# we figure out the slot number that command maps to, then from the slot determine the node.
for c in attempt:
for c in cmds:
# refer to our internal node -> slot table that tells us where a given
# command should route to.
slot = self._determine_slot(*c.args)
node = self.connection_pool.get_node_by_slot(slot)

# little hack to make sure the node name is populated. probably could clean this up.
self.connection_pool.nodes.set_node_name(node)
master_node = self.connection_pool.get_node_by_slot(slot)

# for the same master_node, it should always get the same proxy_node to group
# as many commands as possible per node
if master_node['name'] in proxy_node_by_master:
node = proxy_node_by_master[master_node['name']]
else:
command = c.args[0]
read_from_replicas = self.read_from_replicas and (command in self.READ_COMMANDS)
node = self.connection_pool.get_node_by_slot(slot, read_from_replicas)
proxy_node_by_master[master_node['name']] = node

# little hack to make sure the node name is populated. probably could clean this up.
self.connection_pool.nodes.set_node_name(node)

# now that we know the name of the node ( it's just a string in the form of host:port )
# we can build a list of commands for each node.
node_name = node['name']
if node_name not in nodes:
nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
if node_name in connection_by_node:
connection = connection_by_node[node_name]
else:
connection = self.connection_pool.get_connection_by_node(node)
connection_by_node[node_name] = connection
nodes[node_name] = NodeCommands(self.parse_response, connection)

nodes[node_name].append(c)

# send the commands in sequence.
# we write to all the open sockets for each node first, before reading anything
# this allows us to flush all the requests out across the network essentially in parallel
# so that we can read them all in parallel as they come back.
# we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
node_commands = nodes.values()
for n in node_commands:
n.write()

for n in node_commands:
n.read()

# release all of the redis connections we allocated earlier back into the connection pool.
# we used to do this step as part of a try/finally block, but it is really dangerous to
# release connections back into the pool if for some reason the socket has data still left in it
# from a previous operation. The write and read operations already have try/catch around them for
# all known types of errors including connection and socket level errors.
# So if we hit an exception, something really bad happened and putting any of
# these connections back into the pool is a very bad idea.
# the socket might have unread buffer still sitting in it, and then the
# next time we read from it we pass the buffered result back from a previous
# command and every single request after to that connection will always get
# a mismatched result. (not just theoretical, I saw this happen on production x.x).
for n in nodes.values():
self.connection_pool.release(n.connection)
return nodes, connection_by_node

def _execute_single_command(self, cmd):
try:
# send each command individually like we do in the main client.
cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options)
except RedisError as e:
cmd.result = e

def _send_cluster_commands(
self,
stack,
raise_on_error=True,
allow_redirections=True,
max_redirects=5,
use_gevent=True,
):
"""
Send a bunch of cluster commands to the redis cluster.

`allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
automatically. If set to false it will raise RedisClusterException.
"""
# the first time sending the commands we send all of the commands that were queued up.
# if we have to run through it again, we only retry the commands that failed.
cmds = sorted(stack, key=lambda x: x.position)

cur_attempt = 0

while cur_attempt < max_redirects:
# build a list of node objects based on node names we need to
nodes, connection_by_node = self._get_commands_by_node(cmds)

# use gevent to execute node commands in parallel
# this would allow the commands serialization(pack_commands)
# and deserialization(parse_response) to run on multiple cores and make things faster

node_commands = nodes.values()

if use_gevent:
events = []
for n in node_commands:
events.append(gevent.spawn(self._execute_node_commands, n))

gevent.joinall(events)
else:
for n in node_commands:
n.write()

for n in node_commands:
n.read()

# release all of the redis connections we allocated earlier back into the connection pool.
# we used to do this step as part of a try/finally block, but it is really dangerous to
# release connections back into the pool if for some reason the socket has data still left in it
# from a previous operation. The write and read operations already have try/catch around them for
# all known types of errors including connection and socket level errors.
# So if we hit an exception, something really bad happened and putting any of
# these connections back into the pool is a very bad idea.
# the socket might have unread buffer still sitting in it, and then the
# next time we read from it we pass the buffered result back from a previous
# command and every single request after to that connection will always get
# a mismatched result. (not just theoretical, I saw this happen on production x.x).
for conn in connection_by_node.values():
self.connection_pool.release(conn)

# will regroup moved commands and retry using pipeline(stacked commands)
# this would increase the pipeline performance a lot
moved_cmds = []
for c in cmds:
if isinstance(c.result, MovedError):
e = c.result
node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)

moved_cmds.append(c)

if moved_cmds:
cur_attempt += 1
cmds = sorted(moved_cmds, key=lambda x: x.position)
continue

break

# if the response isn't an exception it is a valid response from the node
# we're all done with that command, YAY!
# if we have more commands to attempt, we've run into problems.
# collect all the commands we are allowed to retry.
# (MOVED, ASK, or connection errors or timeout errors)
attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
if attempt and allow_redirections:
# RETRY MAGIC HAPPENS HERE!
# send these remaining commands one at a time using `execute_command`
@@ -255,13 +331,22 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
# If a lot of commands have failed, we'll be setting the
# flag to rebuild the slots table from scratch. So MOVED errors should
# correct themselves fairly quickly.

# with the previous redirect retries, I could barely see the slow mode happening again
log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt]))

self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
for c in attempt:
try:
# send each command individually like we do in the main client.
c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options)
Owner

Did we not handle the MOVED and ASK errors inside this main execution call? Why move that logic up into the pipeline when this call used to handle it all before? Did you find a new case, or a reason why the old call was not working, or does the new solution cover cases not previously thought of?

Author
@duke-cliff duke-cliff Oct 9, 2020

Because the moved commands can be grouped (stacked) and executed faster in a pipeline, instead of the regular execute_command retrying them one by one. I see a significant difference by doing this. I also read the main logic in redis-go, and it does the same retries: https://github.com/go-redis/redis/blob/master/cluster.go#L1084

We have a fairly large set of keys per pipeline, so many commands got MOVED errors and failed on the first pipeline execution, and then it would just run in the slow mode. With the redirect retries, I barely see any pipeline commands run into the slow mode any more on our end.

except RedisError as e:
c.result = e

if use_gevent:
# even in the slow mode, we could use gevent to make things faster
events = []
for c in attempt:
events.append(gevent.spawn(self._execute_single_command, c))

gevent.joinall(events)
else:
for c in attempt:
self._execute_single_command(c)

# turn the response back into a simple flat array that corresponds
# to the sequence of commands issued in the stack in pipeline.execute()
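Pulling the hunks together, the new send path roughly follows the control flow below. This is a condensed paraphrase for orientation only, assuming the private helpers shown in the diff; it is not the literal implementation:

```python
from rediscluster.exceptions import MovedError

def send_cluster_commands_outline(pipeline, stack, max_redirects=5):
    """Condensed outline of the retry flow added in this PR (paraphrase)."""
    cmds = sorted(stack, key=lambda c: c.position)
    for _ in range(max_redirects):
        nodes, connections = pipeline._get_commands_by_node(cmds)  # group per shard
        for n in nodes.values():                 # sequential here for brevity; the PR
            pipeline._execute_node_commands(n)   # spawns greenlets when use_gevent is set
        for conn in connections.values():
            pipeline.connection_pool.release(conn)
        # collect MOVED results, remap their slots, and retry only those commands
        moved = [c for c in cmds if isinstance(c.result, MovedError)]
        if not moved:
            break
        for c in moved:
            node = pipeline.connection_pool.nodes.get_node(
                c.result.host, c.result.port, server_type='master')
            pipeline.connection_pool.nodes.move_slot_to_node(c.result.slot_id, node)
        cmds = sorted(moved, key=lambda c: c.position)
    # Anything still failing (ASK, connection or timeout errors) falls through to
    # the one-command-at-a-time slow path, itself run via gevent when enabled.
```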
2 changes: 2 additions & 0 deletions requirements.txt
@@ -1 +1,3 @@
redis>=3.0.0,<4.0.0
gevent
greenlet