This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Commit fa3487b

Add rediscluster support (#573)

1 parent 4381dec commit fa3487b

File tree

10 files changed: +869 −1 lines changed


baseplate/clients/redis_cluster.py

Lines changed: 450 additions & 0 deletions
Large diffs are not rendered by default.

docker-compose.yml

Lines changed: 3 additions & 1 deletion

@@ -13,7 +13,7 @@ services:
       - "memcached"
       - "redis"
       - "zookeeper"
-
+      - "redis-cluster-node"
   cassandra:
     image: "cassandra:3.11"
     environment:
@@ -25,3 +25,5 @@ services:
     image: "redis:4.0.9"
   zookeeper:
     image: "zookeeper:3.4.10"
+  redis-cluster-node:
+    image: docker.io/grokzen/redis-cluster:6.2.0

docs/api/baseplate/clients/index.rst

Lines changed: 1 addition & 0 deletions

@@ -13,6 +13,7 @@ Instrumented Client Libraries
    baseplate.clients.kombu: Client for publishing to queues <kombu>
    baseplate.clients.memcache: Memcached Client <memcache>
    baseplate.clients.redis: Redis Client <redis>
+   baseplate.clients.redis_cluster: Redis Cluster Client <redis_cluster>
    baseplate.clients.requests: Requests (HTTP) Client <requests>
    baseplate.clients.sqlalchemy: SQL Client for relational databases (e.g. PostgreSQL) <sqlalchemy>
    baseplate.clients.thrift: Thrift client for RPC to other backend services <thrift>
Lines changed: 145 additions & 0 deletions

@@ -0,0 +1,145 @@

``baseplate.clients.redis_cluster``
===================================

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`.) `redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster
mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster

.. automodule:: baseplate.clients.redis_cluster

.. versionadded:: 2.1

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

   baseplate.configure_context(
       app_config,
       {
           ...
           "foo": ClusterRedisClient(),
           ...
       }
   )

and configure it in your application's configuration file:

.. code-block:: ini

   [app:main]

   ...

   # required: what redis instance to connect to
   foo.url = redis://localhost:6379/0

   # optional: the maximum size of the connection pool
   foo.max_connections = 99

   # optional: how long to wait for a connection to establish
   foo.timeout = 3 seconds

   # optional: whether read requests should be directed to replicas
   # as well, instead of just the primary
   foo.read_from_replicas = true

   ...

and then use the attached :py:class:`~redis.Redis`-like object in
requests::

   def my_method(request):
       request.foo.ping()
Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config

Classes
-------

.. autoclass:: ClusterRedisContextFactory
   :members:

.. autoclass:: MonitoredClusterRedisConnection
   :members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
    The size limit for the connection pool.
``runtime.pool.in_use``
    How many connections have been established and are currently checked out
    and being used.
Hot Key Tracking
----------------

Optionally, the client can help track key usage across the Redis cluster to
help you identify "hot" keys (keys that are read from or written to much more
frequently than other keys). This is particularly useful in clusters with
``noeviction`` set as the eviction policy, since Redis lacks a built-in
mechanism to help you track hot keys in this case.

Since tracking every single key used is expensive, the tracker works by
sampling a small percentage of reads and/or writes, which can be configured
on your client:

.. code-block:: ini

   [app:main]

   ...
   # Note that by default the sample rate is zero for both reads and writes

   # optional: sample keys for 1% of read operations
   foo.track_key_reads_sample_rate = 0.01

   # optional: sample keys for 10% of write operations
   foo.track_key_writes_sample_rate = 0.1

   ...
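The per-operation sampling the docs describe can be sketched in a few lines. This is a minimal stand-in for illustration only; `should_sample` and the seeding are not baseplate's internal API:

```python
import random

def should_sample(sample_rate: float) -> bool:
    # Decide to track this operation's keys only about
    # `sample_rate` of the time, keeping overhead low.
    return random.random() < sample_rate

random.seed(0)  # deterministic for the demo
sampled = sum(should_sample(0.01) for _ in range(100_000))
print(sampled)  # close to 1% of 100,000
```

With a 0.01 rate, only about one in a hundred operations pays the cost of recording its keys, which is why the resulting scores are only relatively, not absolutely, meaningful.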
The keys tracked will be written to a sorted set in the Redis cluster itself,
which we can query at any time to see what keys are read from or written to
more often than others. Keys used for writes will be stored in
``baseplate-hot-key-tracker-writes`` and keys used for reads will be stored in
``baseplate-hot-key-tracker-reads``. Here's an example of how you can query the
top 10 keys in each category with their associated scores:

.. code-block:: console

   > ZREVRANGEBYSCORE baseplate-hot-key-tracker-reads +inf 0 WITHSCORES LIMIT 0 10

   > ZREVRANGEBYSCORE baseplate-hot-key-tracker-writes +inf 0 WITHSCORES LIMIT 0 10

Note that, due to how the sampling works, the scores are only meaningful in a
relative sense (by comparing one key's access frequency to others in the list)
and can't be used to infer absolute key access rates.

Both tracker sets have a default TTL of 24 hours, so once they stop being
written to (for instance, if key tracking is disabled) they will clean up
after themselves in 24 hours, allowing us to start fresh the next time we
want to enable key tracking.
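The semantics of those ``ZREVRANGEBYSCORE`` queries can be mimicked in plain Python against an in-memory stand-in for a tracker set. The key names and scores here are invented for illustration:

```python
# In-memory stand-in for the baseplate-hot-key-tracker-reads sorted set:
# member -> score (count of sampled accesses).
tracker_reads = {"user:123": 100.0, "post:9": 42.0, "user:456": 3.0}

def top_keys(zset: dict, limit: int = 10) -> list:
    """Highest-scored members first, like
    ZREVRANGEBYSCORE <set> +inf 0 WITHSCORES LIMIT 0 <limit>."""
    ranked = sorted(zset.items(), key=lambda pair: pair[1], reverse=True)
    return ranked[:limit]

print(top_keys(tracker_reads))
# [('user:123', 100.0), ('post:9', 42.0), ('user:456', 3.0)]
```

The hottest keys surface first; comparing their scores against each other (not against wall-clock rates) is the intended way to read the output.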

requirements-transitive.txt

Lines changed: 1 addition & 0 deletions

@@ -61,6 +61,7 @@ pyramid==1.10.5
 python-json-logger==2.0.1
 reddit-cqlmapper==0.3.0
 redis==3.5.3
+redis-py-cluster==2.1.2
 regex==2020.11.13
 requests==2.25.1
 sentry-sdk==0.20.1

requirements.txt

Lines changed: 1 addition & 0 deletions

@@ -20,3 +20,4 @@ sphinx-autodoc-typehints==1.11.1
 sphinxcontrib-spelling==7.1.0
 webtest==2.0.35
 wheel==0.36.2
+fakeredis==1.5.0

setup.cfg

Lines changed: 3 additions & 0 deletions

@@ -102,6 +102,9 @@ ignore_missing_imports = True
 [mypy-pythonjsonlogger.*]
 ignore_missing_imports = True

+[mypy-rediscluster.*]
+ignore_missing_imports = True
+
 [mypy-sqlalchemy.*]
 ignore_missing_imports = True

setup.py

Lines changed: 1 addition & 0 deletions

@@ -12,6 +12,7 @@
     "memcache": ["pymemcache>=1.3.0,<1.4.4"],
     "pyramid": ["pyramid>=1.9.0,<2.0"],
     "redis": ["redis>=2.10.0,<4.0.0"],
+    "redis-py-cluster": ["redis-py-cluster>=2.1.2,<3.0.0"],
     "refcycle": ["objgraph>=3.0,<4.0"],
     "requests": ["advocate>=1.0.0,<2.0"],
     "sentry": ["sentry-sdk>=0.19,<1.0"],
Lines changed: 158 additions & 0 deletions

@@ -0,0 +1,158 @@

import unittest

try:
    import rediscluster
except ImportError:
    raise unittest.SkipTest("redis-py-cluster is not installed")

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis_cluster import cluster_pool_from_config

from baseplate.clients.redis_cluster import ClusterRedisClient
from baseplate import Baseplate
from . import TestBaseplateObserver, get_endpoint_or_skip_container

redis_endpoint = get_endpoint_or_skip_container("redis-cluster-node", 7000)


# This belongs in the unit tests section, but the client class attempts to
# initialise the list of nodes when instantiated, so it's simpler to test
# here with a redis cluster available.
class ClusterPoolFromConfigTests(unittest.TestCase):
    def test_empty_config(self):
        with self.assertRaises(ConfigurationError):
            cluster_pool_from_config({})

    def test_basic_url(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
        self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

    def test_timeouts(self):
        pool = cluster_pool_from_config(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.timeout": "30 seconds",
            }
        )

        self.assertEqual(pool.timeout, 30)

    def test_max_connections(self):
        pool = cluster_pool_from_config(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.max_connections": "300",
            }
        )

        self.assertEqual(pool.max_connections, 300)

    def test_max_connections_default(self):
        # https://github.com/Grokzen/redis-py-cluster/issues/435
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        self.assertEqual(pool.max_connections, 50)

    def test_kwargs_passthrough(self):
        pool = cluster_pool_from_config(
            {"rediscluster.url": f"redis://{redis_endpoint}/0"}, example="present"
        )

        self.assertEqual(pool.connection_kwargs["example"], "present")

    def test_alternate_prefix(self):
        pool = cluster_pool_from_config(
            {"noodle.url": f"redis://{redis_endpoint}/0"}, prefix="noodle."
        )
        self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
        self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

    def test_only_primary_available(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})
        node_list = [pool.get_node_by_slot(slot=1, read_command=False) for _ in range(0, 100)]

        # The primary is on port 7000 so that's the only port we expect to see
        self.assertTrue(all(node["port"] == 7000 for node in node_list))

    def test_read_from_replicas(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        node_list = [pool.get_node_by_slot(slot=1, read_command=True) for _ in range(0, 100)]

        # Both replicas and primary are available, so we expect to see some non-primaries here
        self.assertTrue(any(node["port"] != 7000 for node in node_list))


class RedisClusterIntegrationTests(unittest.TestCase):
    def setUp(self):
        self.baseplate_observer = TestBaseplateObserver()

        baseplate = Baseplate(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.timeout": "1 second",
                "rediscluster.max_connections": "4",
            }
        )
        baseplate.register(self.baseplate_observer)
        baseplate.configure_context({"rediscluster": ClusterRedisClient()})

        self.context = baseplate.make_context_object()
        self.server_span = baseplate.make_server_span(self.context, "test")

    def test_simple_command(self):
        with self.server_span:
            result = self.context.rediscluster.ping()

        self.assertTrue(result)

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.PING")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)

    def test_error(self):
        with self.server_span:
            with self.assertRaises(rediscluster.RedisClusterException):
                self.context.rediscluster.execute_command("crazycommand")

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNotNone(span_observer.on_finish_exc_info)

    def test_lock(self):
        with self.server_span:
            with self.context.rediscluster.lock("foo-lock"):
                pass

        server_span_observer = self.baseplate_observer.get_only_child()

        self.assertGreater(len(server_span_observer.children), 0)
        for span_observer in server_span_observer.children:
            self.assertTrue(span_observer.on_start_called)
            self.assertTrue(span_observer.on_finish_called)

    def test_pipeline(self):
        with self.server_span:
            with self.context.rediscluster.pipeline("foo") as pipeline:
                pipeline.set("foo", "bar")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.delete("foo")
                pipeline.execute()

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)

0 commit comments