-
Notifications
You must be signed in to change notification settings - Fork 180
Add rediscluster support #573
Changes from 3 commits
7492d69
e9b646a
6050bc3
8d5d40b
cb95feb
9655d17
6214d10
78dc569
1765def
ffdecf9
9570abf
df35f03
94914ae
191ed5d
c78dcf2
8db95aa
024adff
b904a82
aeb416d
2c099df
1ee4bdc
35839ad
4255cf2
972f82d
96054f4
c35f976
44ecda5
99b4ffd
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,6 +4,7 @@ | |
from typing import Optional | ||
|
||
import redis | ||
import rediscluster | ||
|
||
# redis.client.StrictPipeline was renamed to redis.client.Pipeline in version 3.0 | ||
try: | ||
|
@@ -253,3 +254,173 @@ def close(self) -> None: | |
and dequeue as the actions will recreate the queue) | ||
""" | ||
self.client.delete(self.queue) | ||
|
||
|
||
def cluster_pool_from_config(
    app_config: config.RawConfig, prefix: str = "rediscluster.", **kwargs: Any
) -> rediscluster.ClusterConnectionPool:
    """Make a ClusterConnectionPool from a configuration dictionary.

    The keys useful to :py:func:`cluster_pool_from_config` should be prefixed,
    e.g. ``rediscluster.url``, ``rediscluster.max_connections``, etc. The
    ``prefix`` argument specifies the prefix used to filter keys. Each key is
    mapped to a corresponding keyword argument on the
    :py:class:`rediscluster.ClusterConnectionPool` constructor.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``startup_nodes``: optional comma-delimited list of ``host:port`` pairs
      used to seed cluster topology discovery.
    * ``max_connections``: an integer maximum number of connections in the pool
    * ``timeout``: how long to wait for sockets to connect. e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)

    :param app_config: the raw application configuration
    :param prefix: prefix used to filter configuration keys (must end in ``.``)
    """
    assert prefix.endswith(".")

    parser = config.SpecParser(
        {
            # BUG FIX: ``url`` was documented as required and is consumed
            # below (``options.url``) but was missing from this spec, so the
            # function always failed with an AttributeError. ``startup_nodes``
            # is now optional since ``url`` alone is enough to bootstrap.
            "url": config.String,
            "startup_nodes": config.Optional(config.TupleOf(config.String), default=None),
            "max_connections": config.Optional(config.Integer, default=None),
            "timeout": config.Optional(config.Timespan, default=None),
        }
    )

    options = parser.parse(prefix[:-1], app_config)

    if options.startup_nodes is not None:
        # Each configured entry is "host:port"; the cluster client expects
        # a list of {"host": ..., "port": ...} dicts.
        kwargs.setdefault(
            "startup_nodes",
            [
                {"host": host, "port": port}
                for host, _, port in (node.partition(":") for node in options.startup_nodes)
            ],
        )

    if options.max_connections is not None:
        kwargs.setdefault("max_connections", options.max_connections)
    if options.timeout is not None:
        kwargs.setdefault("timeout", options.timeout.total_seconds())

    return rediscluster.ClusterBlockingConnectionPool.from_url(options.url, **kwargs)
|
||
|
||
class ClusterRedisClient(config.Parser):
    """Configure a clustered Redis client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`cluster_pool_from_config` for available configuration settings.
    """

    def __init__(self, **kwargs: Any):
        # Extra keyword arguments are forwarded verbatim to the pool factory.
        self.kwargs = kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
        pool = cluster_pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
        return ClusterRedisContextFactory(pool)
|
||
|
||
class ClusterRedisContextFactory(ContextFactory):
    """Cluster Redis client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.redis.MonitoredClusterRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands
    are executed via this connection object, they will use connections from the
    provided :py:class:`rediscluster.ClusterConnectionPool` and automatically record
    diagnostic information.

    :param connection_pool: A connection pool.
    """

    def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
        self.connection_pool = connection_pool

    def report_runtime_metrics(self, batch: metrics.Client) -> None:
        # Pool statistics are only meaningful on the blocking pool variant.
        pool = self.connection_pool
        if not isinstance(pool, rediscluster.ClusterBlockingConnectionPool):
            return

        size = pool.max_connections
        open_connections = len(pool._connections)  # type: ignore
        # The internal queue holds the idle slots; everything else is in use.
        in_use = size - pool.pool.qsize()

        batch.gauge("pool.size").replace(size)
        batch.gauge("pool.in_use").replace(in_use)
        batch.gauge("pool.open_and_available").replace(open_connections - in_use)

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredClusterRedisConnection":
        return MonitoredClusterRedisConnection(name, span, self.connection_pool)
|
||
|
||
class MonitoredClusterRedisConnection(rediscluster.RedisCluster):
    """Cluster Redis connection that collects diagnostic information.

    This connection acts like :py:class:`rediscluster.RedisCluster` except
    that all operations are automatically wrapped with diagnostic collection.

    The interface is the same as that class except for the
    :py:meth:`~baseplate.clients.redis.MonitoredClusterRedisConnection.pipeline`
    method.

    :param context_name: Name attached to diagnostics for this connection.
    :param server_span: Span under which per-command child spans are created.
    :param connection_pool: Cluster connection pool to draw connections from.
    """

    def __init__(
        self,
        context_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
        **kwargs: Any,
    ):
        self.context_name = context_name
        self.server_span = server_span

        # Forward any additional RedisCluster constructor arguments instead of
        # silently dropping them (resolves the previous "add all args" TODO in
        # a backward-compatible way).
        super().__init__(connection_pool=connection_pool, **kwargs)

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        # args[0] is always the command name, e.g. "GET".
        command = args[0]
        trace_name = f"{self.context_name}.{command}"

        with self.server_span.make_child(trace_name):
            return super().execute_command(*args, **kwargs)

    def pipeline(self, name: str) -> "MonitoredClusterRedisPipeline":
        """Create a pipeline.

        This returns an object on which you can call the standard Redis
        commands. Execution will be deferred until ``execute`` is called. This
        is useful for saving round trips even in a clustered environment.

        :param name: The name to attach to diagnostics for this pipeline.
        """
        return MonitoredClusterRedisPipeline(
            f"{self.context_name}.pipeline_{name}",
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
        )

    # No transaction support in redis-py-cluster
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError
|
||
|
||
class MonitoredClusterRedisPipeline(rediscluster.pipeline.ClusterPipeline):
    """Cluster Redis pipeline that collects diagnostic information.

    BUG FIX: this class previously subclassed the non-cluster ``Pipeline``,
    which buffers all commands onto a single node connection and knows nothing
    about cluster slot routing. Pipelined commands must go through
    redis-py-cluster's ``ClusterPipeline`` so ``execute`` routes each command
    to the node owning its hash slot.

    :param trace_name: Name for the diagnostic span around ``execute``.
    :param server_span: Span under which the execute child span is created.
    :param connection_pool: The cluster connection pool.
    :param response_callbacks: Response-parsing callbacks shared with the
        parent connection.
    """

    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
        response_callbacks: Dict,
        **kwargs: Any,
    ):
        self.trace_name = trace_name
        self.server_span = server_span
        # Pass by keyword: ClusterPipeline's positional parameter order
        # differs from redis.client.Pipeline's.
        super().__init__(
            connection_pool=connection_pool,
            response_callbacks=response_callbacks,
            **kwargs,
        )

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:  # type: ignore
        """Execute the queued commands inside a diagnostic child span."""
        with self.server_span.make_child(self.trace_name):
            return super().execute(**kwargs)
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,9 @@ services: | |
- "memcached" | ||
- "redis" | ||
- "zookeeper" | ||
- "redis-cluster-node-0" | ||
- "redis-cluster-node-1" | ||
- "redis-cluster-node-2" | ||
|
||
cassandra: | ||
image: "cassandra:3.11" | ||
|
@@ -25,3 +28,29 @@ services: | |
image: "redis:4.0.9" | ||
zookeeper: | ||
image: "zookeeper:3.4.10" | ||
redis-cluster-node-0: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. wow, this is a lot of stuff. do we actually need a three node cluster to test this out? or can we mock it more simply? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. One idea would be to use
which gives us a Redis cluster with a single Docker dependency. It is not recommended for production use but perfect for testing environments. I will say, Redis cluster has idiosyncrasies that are worth having tests run against a real instance but that depends entirely on the tests. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we're not actually implementing a full redis cluster client here, thankfully, but just wrapping an existing one. do you think we need to have behavioral tests here in baseplate beyond ensuring that we generate spans? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There were a few cases where I was thinking more testing would be useful but ultimately any test I could think of would just end up testing the underlying |
||
image: docker.io/bitnami/redis-cluster:6.2-debian-10 | ||
environment: | ||
- 'ALLOW_EMPTY_PASSWORD=yes' | ||
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2' | ||
redis-cluster-node-1: | ||
image: docker.io/bitnami/redis-cluster:6.2-debian-10 | ||
environment: | ||
- 'ALLOW_EMPTY_PASSWORD=yes' | ||
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2' | ||
redis-cluster-node-2: | ||
image: docker.io/bitnami/redis-cluster:6.2-debian-10 | ||
environment: | ||
- 'ALLOW_EMPTY_PASSWORD=yes' | ||
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2' | ||
redis-cluster-init: | ||
image: docker.io/bitnami/redis-cluster:6.2-debian-10 | ||
depends_on: | ||
- redis-cluster-node-0 | ||
- redis-cluster-node-1 | ||
- redis-cluster-node-2 | ||
environment: | ||
- 'REDISCLI_AUTH=' | ||
- 'REDIS_CLUSTER_REPLICAS=1' | ||
- 'REDIS_NODES=redis-cluster-node-0 redis-cluster-node-1 redis-cluster-node-2' | ||
- 'REDIS_CLUSTER_CREATOR=yes' |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,9 +6,13 @@ | |
except ImportError: | ||
raise unittest.SkipTest("redis-py is not installed") | ||
|
||
try: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. likewise on splitting up the redis/redis_cluster modules here please |
||
import rediscluster | ||
except ImportError: | ||
raise unittest.SkipTest("redis-py-cluster is not installed") | ||
|
||
from baseplate.clients.redis import RedisClient | ||
from baseplate import Baseplate | ||
|
||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. gasp! |
||
from . import TestBaseplateObserver, get_endpoint_or_skip_container | ||
|
||
from baseplate.clients.redis import MessageQueue | ||
|
@@ -79,6 +83,67 @@ def test_pipeline(self): | |
self.assertIsNone(span_observer.on_finish_exc_info) | ||
|
||
|
||
class RedisClusterIntegrationTests(unittest.TestCase):
    """Integration tests for the clustered Redis client."""

    def setUp(self):
        # BUG FIX: this suite previously configured the plain (non-cluster)
        # RedisClient, so none of these tests actually exercised the cluster
        # client they were written for. Imported locally to leave the
        # module-level redis imports untouched.
        from baseplate.clients.redis import ClusterRedisClient

        self.baseplate_observer = TestBaseplateObserver()

        # NOTE(review): redis_endpoint must point at a cluster-enabled node --
        # confirm it matches the redis-cluster-node-* docker-compose services.
        baseplate = Baseplate({"rediscluster.url": f"redis://{redis_endpoint}/0"})
        baseplate.register(self.baseplate_observer)
        baseplate.configure_context({"rediscluster": ClusterRedisClient()})

        self.context = baseplate.make_context_object()
        self.server_span = baseplate.make_server_span(self.context, "test")

    def test_simple_command(self):
        with self.server_span:
            result = self.context.rediscluster.ping()

        self.assertTrue(result)

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.PING")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)

    def test_error(self):
        with self.server_span:
            with self.assertRaises(redis.ResponseError):
                self.context.rediscluster.execute_command("crazycommand")

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNotNone(span_observer.on_finish_exc_info)

    def test_lock(self):
        with self.server_span:
            with self.context.rediscluster.lock("foo"):
                pass

        server_span_observer = self.baseplate_observer.get_only_child()

        # A lock performs several commands; every one should be traced.
        self.assertGreater(len(server_span_observer.children), 0)
        for span_observer in server_span_observer.children:
            self.assertTrue(span_observer.on_start_called)
            self.assertTrue(span_observer.on_finish_called)

    def test_pipeline(self):
        with self.server_span:
            with self.context.rediscluster.pipeline("foo") as pipeline:
                pipeline.ping()
                pipeline.execute()

        server_span_observer = self.baseplate_observer.get_only_child()
        span_observer = server_span_observer.get_only_child()
        self.assertEqual(span_observer.span.name, "rediscluster.pipeline_foo")
        self.assertTrue(span_observer.on_start_called)
        self.assertTrue(span_observer.on_finish_called)
        self.assertIsNone(span_observer.on_finish_exc_info)
|
||
|
||
class RedisMessageQueueTests(unittest.TestCase): | ||
qname = "redisTestQueue" | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This would make anyone using the current redis client suddenly have to install
`redis-py-cluster` as well just to continue using the other client. Can you move
the new cluster stuff into its own module in the clients folder? Like
`baseplate/clients/redis_cluster.py`, perhaps?