This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Commit 5acdb82

pipeline optimizations
1 parent 73f27ed commit 5acdb82

File tree

rediscluster/client.py
rediscluster/connection.py
rediscluster/nodemanager.py
rediscluster/pipeline.py
requirements.txt
setup.py

6 files changed: +157 −65 lines changed


rediscluster/client.py

Lines changed: 6 additions & 13 deletions
@@ -172,7 +172,7 @@ class RedisCluster(Redis):
 
     # Not complete, but covers the major ones
     # https://redis.io/commands
-    READ_COMMANDS = [
+    READ_COMMANDS = set([
         "BITCOUNT",
         "BITPOS",
         "EXISTS",
@@ -212,7 +212,7 @@ class RedisCluster(Redis):
         "ZCOUNT",
         "ZRANGE",
         "ZSCORE"
-    ]
+    ])
 
     RESULT_CALLBACKS = dict_merge(
         string_keys_to_dict([
@@ -442,7 +442,7 @@ def pubsub(self, **kwargs):
         """
         return ClusterPubSub(self.connection_pool, **kwargs)
 
-    def pipeline(self, transaction=None, shard_hint=None):
+    def pipeline(self, transaction=None, shard_hint=None, read_from_replicas=False):
         """
         Cluster impl:
         Pipelines do not work in cluster mode the same way they do in normal mode.
@@ -461,6 +461,7 @@ def pipeline(self, transaction=None, shard_hint=None):
             result_callbacks=self.result_callbacks,
             response_callbacks=self.response_callbacks,
             cluster_down_retry_attempts=self.cluster_down_retry_attempts,
+            read_from_replicas=read_from_replicas,
         )
 
     def transaction(self, *args, **kwargs):
@@ -578,7 +579,6 @@ def _execute_command(self, *args, **kwargs):
 
         redirect_addr = None
         asking = False
-        is_read_replica = False
 
         try_random_node = False
         slot = self._determine_slot(*args)
@@ -605,7 +605,6 @@ def _execute_command(self, *args, **kwargs):
                             slot,
                             self.read_from_replicas and (command in self.READ_COMMANDS)
                         )
-                        is_read_replica = node['server_type'] == 'slave'
 
                     connection = self.connection_pool.get_connection_by_node(node)
 
@@ -615,12 +614,6 @@ def _execute_command(self, *args, **kwargs):
                     connection.send_command('ASKING')
                     self.parse_response(connection, "ASKING", **kwargs)
                     asking = False
-                if is_read_replica:
-                    # Ask read replica to accept reads (see https://redis.io/commands/readonly)
-                    # TODO: do we need to handle errors from this response?
-                    connection.send_command('READONLY')
-                    self.parse_response(connection, 'READONLY', **kwargs)
-                    is_read_replica = False
 
                 connection.send_command(*args)
                 return self.parse_response(connection, command, **kwargs)
@@ -687,8 +680,8 @@ def _execute_command(self, *args, **kwargs):
                 self.refresh_table_asap = True
                 self.connection_pool.nodes.increment_reinitialize_counter()
 
-                node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
-                self.connection_pool.nodes.slots[e.slot_id][0] = node
+                node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
+                self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
             except TryAgainError as e:
                 log.exception("TryAgainError")

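Note (not part of the commit): the new read_from_replicas argument lets an individual pipeline opt in to replica reads without enabling them client-wide. A minimal usage sketch, assuming a cluster node is reachable at 127.0.0.1:7000 (placeholder address):

from rediscluster import RedisCluster

startup_nodes = [{"host": "127.0.0.1", "port": "7000"}]
rc = RedisCluster(startup_nodes=startup_nodes, decode_responses=True)

# Added by this commit: per-pipeline replica reads.
pipe = rc.pipeline(read_from_replicas=True)
pipe.get("foo")
pipe.get("bar")
print(pipe.execute())
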
rediscluster/connection.py

Lines changed: 5 additions & 0 deletions
@@ -343,6 +343,11 @@ def get_connection_by_node(self, node):
             connection = self._available_connections.get(node["name"], []).pop()
         except IndexError:
             connection = self.make_connection(node)
+            server_type = node.get("server_type", "master")
+            if server_type == "slave":
+                connection.send_command('READONLY')
+                if nativestr(connection.read_response()) != 'OK':
+                    raise ConnectionError('READONLY command failed')
 
         self._in_use_connections.setdefault(node["name"], set()).add(connection)

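Note (not part of the commit): with this change READONLY is sent once, when a replica connection is first created by the pool, rather than before every read command (the per-command READONLY block removed from client.py above). A rough sketch of the equivalent handshake using redis-py's Connection directly; 127.0.0.1:7005 is a placeholder replica address:

from redis.connection import Connection
from redis._compat import nativestr

conn = Connection(host="127.0.0.1", port=7005)
conn.send_command("READONLY")              # mark this connection as allowed to serve reads
if nativestr(conn.read_response()) != "OK":
    raise ConnectionError("READONLY command failed")

# Later commands reusing this pooled connection can read from the replica directly.
conn.send_command("GET", "foo")
print(conn.read_response())
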
rediscluster/nodemanager.py

Lines changed: 23 additions & 0 deletions
@@ -39,6 +39,7 @@ def __init__(self, startup_nodes=None, reinitialize_steps=None, skip_full_covera
         self.connection_kwargs = connection_kwargs
         self.nodes = {}
         self.slots = {}
+        self.slave_nodes_by_master = {}
         self.startup_nodes = [] if startup_nodes is None else startup_nodes
         self.orig_startup_nodes = [node for node in self.startup_nodes]
         self.reinitialize_counter = 0
@@ -257,6 +258,8 @@ def initialize(self):
                 node, node_name = self.make_node_obj(master_node[0], master_node[1], 'master')
                 nodes_cache[node_name] = node
 
+                self.slave_nodes_by_master[node_name] = set()
+
                 for i in range(int(slot[0]), int(slot[1]) + 1):
                     if i not in tmp_slots:
                         tmp_slots[i] = [node]
@@ -267,6 +270,7 @@ def initialize(self):
                             target_slave_node, slave_node_name = self.make_node_obj(slave_node[0], slave_node[1], 'slave')
                             nodes_cache[slave_node_name] = target_slave_node
                             tmp_slots[i].append(target_slave_node)
+                            self.slave_nodes_by_master[node_name].add(slave_node_name)
                     else:
                         # Validate that 2 nodes want to use the same slot cache setup
                         if tmp_slots[i][0]['name'] != node['name']:
@@ -409,6 +413,25 @@ def set_node(self, host, port, server_type=None):
 
         return node
 
+    def get_node(self, host, port, server_type=None):
+        node, node_name = self.make_node_obj(host, port, server_type)
+        if node_name not in self.nodes:
+            self.nodes[node_name] = node
+        return self.nodes[node_name]
+
+    def move_slot_to_node(self, slot, node):
+        """
+        When moved response received, we should move all replicas with the master to the new slot.
+        """
+        node_name = node['name']
+        self.slots[slot] = [node]
+        slave_nodes = self.slave_nodes_by_master.get(node_name)
+        if slave_nodes:
+            for slave_name in slave_nodes:
+                slave_node = self.nodes.get(slave_name)
+                if slave_node:
+                    self.slots[slot].append(slave_node)
+
     def populate_startup_nodes(self):
         """
         Do something with all startup nodes and filters out any duplicates

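Note (not part of the commit): move_slot_to_node() repoints a slot at the master named in a MOVED redirect while keeping that master's replicas in the slot entry; the old client code only overwrote slots[slot][0] and dropped replica routing. A toy illustration of the bookkeeping, with plain dicts standing in for NodeManager state (all addresses made up):

nodes = {
    "10.0.0.1:7000": {"name": "10.0.0.1:7000", "server_type": "master"},
    "10.0.0.2:7001": {"name": "10.0.0.2:7001", "server_type": "slave"},
}
slave_nodes_by_master = {"10.0.0.1:7000": {"10.0.0.2:7001"}}
slots = {1234: [{"name": "10.0.0.9:7009", "server_type": "master"}]}  # stale owner

def move_slot_to_node(slot, node):
    # same idea as the new NodeManager method: master first, then its replicas
    slots[slot] = [node]
    for slave_name in slave_nodes_by_master.get(node["name"], set()):
        slave = nodes.get(slave_name)
        if slave:
            slots[slot].append(slave)

# A "MOVED 1234 10.0.0.1:7000" redirect now refreshes both master and replica:
move_slot_to_node(1234, nodes["10.0.0.1:7000"])
print([n["name"] for n in slots[1234]])  # ['10.0.0.1:7000', '10.0.0.2:7001']
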
rediscluster/pipeline.py

Lines changed: 117 additions & 50 deletions
@@ -2,6 +2,7 @@
 
 # python std lib
 import sys
+import logging
 
 # rediscluster imports
 from .client import RedisCluster
@@ -15,6 +16,10 @@
 from redis.exceptions import ConnectionError, RedisError, TimeoutError
 from redis._compat import imap, unicode
 
+from gevent import monkey; monkey.patch_all()
+import gevent
+
+log = logging.getLogger(__name__)
 
 ERRORS_ALLOW_RETRY = (ConnectionError, TimeoutError, MovedError, AskError, TryAgainError)
 
@@ -174,71 +179,127 @@ def send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=T
         # If it fails the configured number of times then raise exception back to caller of this method
         raise ClusterDownError("CLUSTERDOWN error. Unable to rebuild the cluster")
 
-    def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
-        """
-        Send a bunch of cluster commands to the redis cluster.
+    def _execute_node_commands(self, n):
+        n.write()
 
-        `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
-        automatically. If set to false it will raise RedisClusterException.
-        """
-        # the first time sending the commands we send all of the commands that were queued up.
-        # if we have to run through it again, we only retry the commands that failed.
-        attempt = sorted(stack, key=lambda x: x.position)
+        n.read()
 
-        # build a list of node objects based on node names we need to
+    def _get_commands_by_node(self, cmds):
         nodes = {}
+        proxy_node_by_master = {}
+        connection_by_node = {}
 
-        # as we move through each command that still needs to be processed,
-        # we figure out the slot number that command maps to, then from the slot determine the node.
-        for c in attempt:
+        for c in cmds:
             # refer to our internal node -> slot table that tells us where a given
             # command should route to.
             slot = self._determine_slot(*c.args)
-            node = self.connection_pool.get_node_by_slot(slot)
 
-            # little hack to make sure the node name is populated. probably could clean this up.
-            self.connection_pool.nodes.set_node_name(node)
+            master_node = self.connection_pool.get_node_by_slot(slot)
+
+            # for the same master_node, it should always get the same proxy_node to group
+            # as many commands as possible per node
+            if master_node['name'] in proxy_node_by_master:
+                node = proxy_node_by_master[master_node['name']]
+            else:
+                # TODO: should determine if using replicas by if command is read only
+                node = self.connection_pool.get_node_by_slot(slot, self.read_from_replicas)
+                proxy_node_by_master[master_node['name']] = node
+
+            # little hack to make sure the node name is populated. probably could clean this up.
+            self.connection_pool.nodes.set_node_name(node)
 
-            # now that we know the name of the node ( it's just a string in the form of host:port )
-            # we can build a list of commands for each node.
             node_name = node['name']
             if node_name not in nodes:
-                nodes[node_name] = NodeCommands(self.parse_response, self.connection_pool.get_connection_by_node(node))
+                if node_name in connection_by_node:
+                    connection = connection_by_node[node_name]
+                else:
+                    connection = self.connection_pool.get_connection_by_node(node)
+                    connection_by_node[node_name] = connection
+                nodes[node_name] = NodeCommands(self.parse_response, connection)
 
             nodes[node_name].append(c)
 
-        # send the commands in sequence.
-        # we write to all the open sockets for each node first, before reading anything
-        # this allows us to flush all the requests out across the network essentially in parallel
-        # so that we can read them all in parallel as they come back.
-        # we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
-        node_commands = nodes.values()
-        for n in node_commands:
-            n.write()
-
-        for n in node_commands:
-            n.read()
-
-        # release all of the redis connections we allocated earlier back into the connection pool.
-        # we used to do this step as part of a try/finally block, but it is really dangerous to
-        # release connections back into the pool if for some reason the socket has data still left in it
-        # from a previous operation. The write and read operations already have try/catch around them for
-        # all known types of errors including connection and socket level errors.
-        # So if we hit an exception, something really bad happened and putting any of
-        # these connections back into the pool is a very bad idea.
-        # the socket might have unread buffer still sitting in it, and then the
-        # next time we read from it we pass the buffered result back from a previous
-        # command and every single request after to that connection will always get
-        # a mismatched result. (not just theoretical, I saw this happen on production x.x).
-        for n in nodes.values():
-            self.connection_pool.release(n.connection)
+        return nodes, connection_by_node
+
+    def _execute_single_command(self, cmd):
+        try:
+            # send each command individually like we do in the main client.
+            cmd.result = super(ClusterPipeline, self).execute_command(*cmd.args, **cmd.options)
+        except RedisError as e:
+            cmd.result = e
+
+    def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=True):
+        """
+        Send a bunch of cluster commands to the redis cluster.
+
+        `allow_redirections` If the pipeline should follow `ASK` & `MOVED` responses
+        automatically. If set to false it will raise RedisClusterException.
+        """
+        # the first time sending the commands we send all of the commands that were queued up.
+        # if we have to run through it again, we only retry the commands that failed.
+        cmds = sorted(stack, key=lambda x: x.position)
+
+        max_redirects = 5
+        cur_attempt = 0
+
+        while cur_attempt < max_redirects:
+
+            # build a list of node objects based on node names we need to
+            nodes, connection_by_node = self._get_commands_by_node(cmds)
+
+            # send the commands in sequence.
+            # we write to all the open sockets for each node first, before reading anything
+            # this allows us to flush all the requests out across the network essentially in parallel
+            # so that we can read them all in parallel as they come back.
+            # we dont' multiplex on the sockets as they come available, but that shouldn't make too much difference.
+
+            # duke-cliff: I think it would still be faster if we use gevent to make the command in parallel
+            # the io is non-blocking, but serialization/deserialization will still be blocking previously
+            node_commands = nodes.values()
+            events = []
+            for n in node_commands:
+                events.append(gevent.spawn(self._execute_node_commands, n))
+
+            gevent.joinall(events)
+
+            # release all of the redis connections we allocated earlier back into the connection pool.
+            # we used to do this step as part of a try/finally block, but it is really dangerous to
+            # release connections back into the pool if for some reason the socket has data still left in it
+            # from a previous operation. The write and read operations already have try/catch around them for
+            # all known types of errors including connection and socket level errors.
+            # So if we hit an exception, something really bad happened and putting any of
+            # these connections back into the pool is a very bad idea.
+            # the socket might have unread buffer still sitting in it, and then the
+            # next time we read from it we pass the buffered result back from a previous
+            # command and every single request after to that connection will always get
+            # a mismatched result. (not just theoretical, I saw this happen on production x.x).
            for conn in connection_by_node.values():
+                self.connection_pool.release(conn)
+
+            # will regroup moved commands and retry using pipeline(stacked commands)
+            # this would increase the pipeline performance a lot
+            moved_cmds = []
+            for c in cmds:
+                if isinstance(c.result, MovedError):
+                    e = c.result
+                    node = self.connection_pool.nodes.get_node(e.host, e.port, server_type='master')
+                    self.connection_pool.nodes.move_slot_to_node(e.slot_id, node)
+
+                    moved_cmds.append(c)
+
+            if moved_cmds:
+                cur_attempt += 1
+                cmds = sorted(moved_cmds, key=lambda x: x.position)
+                continue
+
+            break
 
         # if the response isn't an exception it is a valid response from the node
         # we're all done with that command, YAY!
         # if we have more commands to attempt, we've run into problems.
         # collect all the commands we are allowed to retry.
         # (MOVED, ASK, or connection errors or timeout errors)
-        attempt = sorted([c for c in attempt if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
+        attempt = sorted([c for c in stack if isinstance(c.result, ERRORS_ALLOW_RETRY)], key=lambda x: x.position)
         if attempt and allow_redirections:
             # RETRY MAGIC HAPPENS HERE!
             # send these remaing comamnds one at a time using `execute_command`
@@ -255,13 +316,19 @@ def _send_cluster_commands(self, stack, raise_on_error=True, allow_redirections=
             # If a lot of commands have failed, we'll be setting the
             # flag to rebuild the slots table from scratch. So MOVED errors should
             # correct themselves fairly quickly.
+
+            # with the previous redirect retries, I could barely see the slow mode happening again
+            log.info("pipeline in slow mode to execute failed commands: {}".format([c.result for c in attempt]))
+
             self.connection_pool.nodes.increment_reinitialize_counter(len(attempt))
+
+            # even in the slow mode, we could use gevent to make things faster
+            events = []
             for c in attempt:
-                try:
-                    # send each command individually like we do in the main client.
-                    c.result = super(ClusterPipeline, self).execute_command(*c.args, **c.options)
-                except RedisError as e:
-                    c.result = e
+                events.append(gevent.spawn(self._execute_single_command, c))
+
+            gevent.joinall(events)
+
 
         # turn the response back into a simple flat array that corresponds
         # to the sequence of commands issued in the stack in pipeline.execute()

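Note (not part of the commit): the rewritten _send_cluster_commands() makes two changes. First, MOVED responses are regrouped and re-sent as stacked pipeline commands for up to max_redirects rounds, instead of immediately falling back to the slow one-command-at-a-time path. Second, the per-node write/read work is fanned out with gevent, one greenlet per node, so serialization and response parsing for different nodes overlap. A standalone sketch of that fan-out pattern, with a sleep standing in for the socket I/O done by NodeCommands.write()/read():

from gevent import monkey; monkey.patch_all()
import time

import gevent

def execute_node_commands(node_name):
    # stand-in for NodeCommands.write() followed by NodeCommands.read()
    time.sleep(0.1)  # cooperative after monkey-patching
    return node_name

events = [gevent.spawn(execute_node_commands, name)
          for name in ("node-a:7000", "node-b:7001", "node-c:7002")]
gevent.joinall(events)
print([g.value for g in events])  # finishes in ~0.1s instead of ~0.3s sequentially
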
requirements.txt

Lines changed: 2 additions & 0 deletions
@@ -1 +1,3 @@
 redis>=3.0.0,<4.0.0
+gevent
+greenlet

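Note (not part of the commit): gevent and greenlet become hard dependencies because rediscluster/pipeline.py now calls monkey.patch_all() at import time, which swaps blocking stdlib primitives (socket, time, ssl, ...) for cooperative versions process-wide. A quick sketch to confirm the patching has taken effect after importing the module:

from gevent import monkey

import rediscluster.pipeline  # noqa: F401 -- importing this module runs monkey.patch_all()

print(monkey.is_module_patched("socket"))  # expected: True
print(monkey.is_module_patched("time"))    # expected: True
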
setup.py

Lines changed: 4 additions & 2 deletions
@@ -20,7 +20,7 @@
 
 setup(
     name="redis-py-cluster",
-    version="2.1.0",
+    version="2.1.1",
     description="Library for communicating with Redis Clusters. Built on top of redis-py lib",
     long_description=readme + '\n\n' + history,
     long_description_content_type="text/markdown",
@@ -32,7 +32,9 @@
     url='http://github.com/grokzen/redis-py-cluster',
     license='MIT',
     install_requires=[
-        'redis>=3.0.0,<4.0.0'
+        'redis>=3.0.0,<4.0.0',
+        'gevent',
+        'greenlet',
     ],
     python_requires=">=2.7, !=3.0.*, !=3.1.*, !=3.2.*, !=3.3.*, !=3.4",
     extras_require={
