Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Add rediscluster support #573

Merged 28 commits on May 19, 2021

Changes shown are from 22 of the 28 commits.

Commits
7492d69  Add redis cluster client support (Mar 8, 2021)
e9b646a  Add integration tests for ClusterRedisClient (Mar 8, 2021)
6050bc3  remove unused import (Mar 8, 2021)
8d5d40b  remove unneeded comment (Mar 8, 2021)
cb95feb  remove unused import (Mar 8, 2021)
9655d17  disable pylint warning (Mar 8, 2021)
6214d10  Update baseplate/clients/redis.py (FranGM, Mar 8, 2021)
78dc569  Update baseplate/clients/redis.py (FranGM, Mar 8, 2021)
1765def  address comments (Mar 10, 2021)
ffdecf9  set version (Mar 10, 2021)
9570abf  Update baseplate/clients/redis_cluster.py (FranGM, Mar 10, 2021)
df35f03  address comments (take 2) (Mar 11, 2021)
94914ae  When read_from_replicas is enabled also read from master (Mar 11, 2021)
191ed5d  Return only one node on get_node_by_slot (FranGM, Mar 30, 2021)
c78dcf2  fix documentation lint issues (FranGM, Mar 30, 2021)
8db95aa  change some client defaults (FranGM, Mar 30, 2021)
024adff  add more tests (FranGM, Mar 30, 2021)
b904a82  Remove unused metric (Apr 6, 2021)
aeb416d  pass read_from_replicas arg when creating pipeline (FranGM, Apr 20, 2021)
2c099df  bump redis-py-cluster version (FranGM, Apr 20, 2021)
1ee4bdc  Add hot key tracker to the cluster redis client (Apr 29, 2021)
35839ad  Merge branch 'rediscluster-support' of github.com:FranGM/baseplate.py… (Apr 29, 2021)
4255cf2  Allow to configure max_connections_per_node (FranGM, May 2, 2021)
972f82d  Update to current redis-py-cluster version (FranGM, May 18, 2021)
96054f4  Update requirements-transitive.txt (FranGM, May 18, 2021)
c35f976  Add hot key tracking to docs (May 19, 2021)
44ecda5  Update setup.py (FranGM, May 19, 2021)
99b4ffd  Update docs/api/baseplate/clients/redis_cluster.rst (spladug, May 19, 2021)
447 changes: 447 additions & 0 deletions baseplate/clients/redis_cluster.py

Large diffs are not rendered by default.

4 changes: 3 additions & 1 deletion docker-compose.yml
@@ -13,7 +13,7 @@ services:
- "memcached"
- "redis"
- "zookeeper"

- "redis-cluster-node"
cassandra:
image: "cassandra:3.11"
environment:
@@ -25,3 +25,5 @@ services:
image: "redis:4.0.9"
zookeeper:
image: "zookeeper:3.4.10"
redis-cluster-node:
image: docker.io/grokzen/redis-cluster:6.2.0
1 change: 1 addition & 0 deletions docs/api/baseplate/clients/index.rst
@@ -13,6 +13,7 @@ Instrumented Client Libraries
baseplate.clients.kombu: Client for publishing to queues <kombu>
baseplate.clients.memcache: Memcached Client <memcache>
baseplate.clients.redis: Redis Client <redis>
baseplate.clients.redis_cluster: Redis Cluster Client <redis_cluster>
baseplate.clients.requests: Requests (HTTP) Client <requests>
baseplate.clients.sqlalchemy: SQL Client for relational databases (e.g. PostgreSQL) <sqlalchemy>
baseplate.clients.thrift: Thrift client for RPC to other backend services <thrift>
92 changes: 92 additions & 0 deletions docs/api/baseplate/clients/redis_cluster.rst
@@ -0,0 +1,92 @@
``baseplate.clients.redis_cluster``
===================================

Review comment (Contributor): should the hot key tracking stuff be mentioned somewhere in this page?

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`). `Redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster
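
The underlying library can also be used on its own; a minimal sketch, assuming
a cluster node reachable at ``127.0.0.1:7000`` (an illustrative placeholder,
not part of the Baseplate API, which wraps this client for you below)::

    from rediscluster import RedisCluster

    # A single reachable startup node is enough; the client discovers the
    # rest of the cluster topology from it.
    rc = RedisCluster(
        startup_nodes=[{"host": "127.0.0.1", "port": "7000"}],
        decode_responses=True,
    )
    rc.set("hello", "world")
    print(rc.get("hello"))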

.. automodule:: baseplate.clients.redis_cluster

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

    baseplate.configure_context(
        app_config,
        {
            ...
            "foo": ClusterRedisClient(),
            ...
        }
    )

configure it in your application's configuration file:

.. code-block:: ini

    [app:main]

    ...

    # required: what redis instance to connect to
    foo.url = redis://localhost:6379/0

    # optional: the maximum size of the connection pool
    foo.max_connections = 99

    # optional: how long to wait for a connection to establish
    foo.timeout = 3 seconds

    # optional: whether read requests should be directed to replicas as well
    # instead of just the primary
    foo.read_from_replicas = true

    ...


and then use the attached :py:class:`~redis.Redis`-like object in
request::

    def my_method(request):
        request.foo.ping()
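
The attached object also provides monitored pipelines; a minimal sketch,
mirroring the integration tests in this PR (the pipeline name ``"example"`` is
arbitrary and, based on those tests, appears in the span name as something
like ``foo.pipeline_example``)::

    def my_batch_method(request):
        # Queue several commands and send them to the cluster in one round trip.
        with request.foo.pipeline("example") as pipeline:
            pipeline.set("key", "value")
            pipeline.get("key")
            pipeline.delete("key")
            pipeline.execute()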

Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config
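
``cluster_pool_from_config`` can also be called directly to build a connection
pool from app configuration; a minimal sketch based on the integration tests
in this PR (the URL is a placeholder for one of your cluster nodes)::

    from baseplate.clients.redis_cluster import cluster_pool_from_config

    pool = cluster_pool_from_config(
        {
            "rediscluster.url": "redis://localhost:7000/0",
            "rediscluster.timeout": "30 seconds",
            "rediscluster.max_connections": "300",
        }
    )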

Classes
-------

.. autoclass:: ClusterRedisContextFactory
    :members:

.. autoclass:: MonitoredClusterRedisConnection
    :members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
    The size limit for the connection pool.
``runtime.pool.in_use``
    How many connections have been established and are currently checked out
    and being used.

.. versionadded:: 2.1

Review comment (Contributor): can this go up at the top, like right before the Example heading? I've only put the versionadded stuff down here on some other clients where the runtime metrics were the thing that was new.

1 change: 1 addition & 0 deletions requirements-transitive.txt
@@ -56,6 +56,7 @@ pyramid==1.10.5
python-json-logger==2.0.1
reddit-cqlmapper==0.3.0
redis==3.5.3
redis-py-cluster==2.1.2
regex==2020.11.13
requests==2.25.1
sentry-sdk==0.20.1
1 change: 1 addition & 0 deletions requirements.txt
@@ -15,3 +15,4 @@ sphinx-autodoc-typehints==1.11.1
sphinxcontrib-spelling==7.1.0
webtest==2.0.35
wheel==0.36.2
fakeredis==1.5.0
3 changes: 3 additions & 0 deletions setup.cfg
@@ -102,6 +102,9 @@ ignore_missing_imports = True
[mypy-pythonjsonlogger.*]
ignore_missing_imports = True

[mypy-rediscluster.*]
ignore_missing_imports = True

[mypy-sqlalchemy.*]
ignore_missing_imports = True

1 change: 1 addition & 0 deletions setup.py
@@ -12,6 +12,7 @@
"memcache": ["pymemcache>=1.3.0,<1.4.4"],
"pyramid": ["pyramid>=1.9.0,<2.0"],
"redis": ["redis>=2.10.0,<4.0.0"],
"redis-py-cluster": ["redis-py-cluster>=2.0.0,<3.0.0"],
"refcycle": ["objgraph>=3.0,<4.0"],
"requests": ["advocate>=1.0.0,<2.0"],
"sentry": ["sentry-sdk>=0.19,<1.0"],
158 changes: 158 additions & 0 deletions tests/integration/redis_cluster_tests.py
@@ -0,0 +1,158 @@
import unittest

try:
    import rediscluster
except ImportError:
    raise unittest.SkipTest("redis-py-cluster is not installed")

from baseplate.lib.config import ConfigurationError
from baseplate.clients.redis_cluster import cluster_pool_from_config

from baseplate.clients.redis_cluster import ClusterRedisClient
from baseplate import Baseplate
from . import TestBaseplateObserver, get_endpoint_or_skip_container

redis_endpoint = get_endpoint_or_skip_container("redis-cluster-node", 7000)


# This belongs in the unit tests section but the client class attempts to initialise
# the list of nodes when being instantiated so it's simpler to test here with a redis
# cluster available
class ClusterPoolFromConfigTests(unittest.TestCase):
    def test_empty_config(self):
        with self.assertRaises(ConfigurationError):
            cluster_pool_from_config({})

    def test_basic_url(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
        self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

    def test_timeouts(self):
        pool = cluster_pool_from_config(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.timeout": "30 seconds",
            }
        )

        self.assertEqual(pool.timeout, 30)

    def test_max_connections(self):
        pool = cluster_pool_from_config(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.max_connections": "300",
            }
        )

        self.assertEqual(pool.max_connections, 300)

    def test_max_connections_default(self):
        # https://github.com/Grokzen/redis-py-cluster/issues/435
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        self.assertEqual(pool.max_connections, 50)

    def test_kwargs_passthrough(self):
        pool = cluster_pool_from_config(
            {"rediscluster.url": f"redis://{redis_endpoint}/0"}, example="present"
        )

        self.assertEqual(pool.connection_kwargs["example"], "present")

    def test_alternate_prefix(self):
        pool = cluster_pool_from_config(
            {"noodle.url": f"redis://{redis_endpoint}/0"}, prefix="noodle."
        )
        self.assertEqual(pool.nodes.startup_nodes[0]["host"], "redis-cluster-node")
        self.assertEqual(pool.nodes.startup_nodes[0]["port"], "7000")

    def test_only_primary_available(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})
        node_list = [pool.get_node_by_slot(slot=1, read_command=False) for _ in range(0, 100)]

        # The primary is on port 7000 so that's the only port we expect to see
        self.assertTrue(all(node["port"] == 7000 for node in node_list))

    def test_read_from_replicas(self):
        pool = cluster_pool_from_config({"rediscluster.url": f"redis://{redis_endpoint}/0"})

        node_list = [pool.get_node_by_slot(slot=1, read_command=True) for _ in range(0, 100)]

        # Both replicas and primary are available, so we expect to see some non-primaries here
        self.assertTrue(any(node["port"] != 7000 for node in node_list))


class RedisClusterIntegrationTests(unittest.TestCase):
    def setUp(self):
        self.baseplate_observer = TestBaseplateObserver()

        baseplate = Baseplate(
            {
                "rediscluster.url": f"redis://{redis_endpoint}/0",
                "rediscluster.timeout": "1 second",
                "rediscluster.max_connections": "4",
            }
        )
        baseplate.register(self.baseplate_observer)
        baseplate.configure_context({"rediscluster": ClusterRedisClient()})

        self.context = baseplate.make_context_object()
        self.server_span = baseplate.make_server_span(self.context, "test")

    def test_simple_command(self):
        with self.server_span:
            result = self.context.rediscluster.ping()

        self.assertTrue(result)

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.PING")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)

    def test_error(self):
        with self.server_span:
            with self.assertRaises(rediscluster.RedisClusterException):
                self.context.rediscluster.execute_command("crazycommand")

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNotNone(span_observer.on_finish_exc_info)

    def test_lock(self):
        with self.server_span:
            with self.context.rediscluster.lock("foo-lock"):
                pass

        server_span_observer = self.baseplate_observer.get_only_child()

        self.assertGreater(len(server_span_observer.children), 0)
        for span_observer in server_span_observer.children:
            self.assertTrue(span_observer.on_start_called)
            self.assertTrue(span_observer.on_finish_called)

    def test_pipeline(self):
        with self.server_span:
            with self.context.rediscluster.pipeline("foo") as pipeline:
                pipeline.set("foo", "bar")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.get("foo")
                pipeline.delete("foo")
                pipeline.execute()

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)