Add rediscluster support #573 (Merged)

Changes shown below are from 17 of the 28 commits.

Commits (28):

- 7492d69 Add redisc cluster client support
- e9b646a Add integration tests for ClusterRedisClient
- 6050bc3 remove unused import
- 8d5d40b remove unnneeded comment
- cb95feb remove unused import
- 9655d17 disable pylint warning
- 6214d10 Update baseplate/clients/redis.py
- FranGM 78dc569 Update baseplate/clients/redis.py
- FranGM 1765def address comments
- ffdecf9 set version
- 9570abf Update baseplate/clients/redis_cluster.py
- FranGM df35f03 address comments (take 2)
- 94914ae When read_from_replicas is enabled also read from master
- 191ed5d Return only one node on get_node_by_slot
- FranGM c78dcf2 fix documentation lint issues
- FranGM 8db95aa change some client defaults
- FranGM 024adff add more tests
- FranGM b904a82 Remove unused metric
- aeb416d pass read_from_replicas arg when creating pipeline
- FranGM 2c099df bump redis-py-cluster version
- FranGM 1ee4bdc Add hot key tracker to the cluster reddis client
- 35839ad Merge branch 'rediscluster-support' of github.com:FranGM/baseplate.py…
- 4255cf2 Allow to configure max_connections_per_node
- FranGM 972f82d Update to current redis-py-cluster version
- FranGM 96054f4 Update requirements-transitive.txt
- FranGM c35f976 Add hot key tracking to docs
- 44ecda5 Update setup.py
- FranGM 99b4ffd Update docs/api/baseplate/clients/redis_cluster.rst
baseplate/clients/redis_cluster.py (new file, 226 lines):

```python
import random

from typing import Any
from typing import Dict

import rediscluster

from rediscluster.pipeline import ClusterPipeline

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import metrics


# We want to be able to combine blocking behaviour with the ability to read from replicas.
# Unfortunately this is not provided as-is, so we combine two connection pool classes to
# provide the desired behaviour.
class ClusterWithReadReplicasBlockingConnectionPool(rediscluster.ClusterBlockingConnectionPool):
    # pylint: disable=arguments-differ
    def get_node_by_slot(self, slot: int, read_command: bool = False) -> Dict[str, Any]:
        """
        Get a node from the slot.
        If the command is a read command we'll try to return a random node.
        If there are no replicas or this isn't a read command we'll return the primary.
        """
        if read_command:
            return random.choice(self.nodes.slots[slot])

        # This isn't a read command, so return the primary (first node)
        return self.nodes.slots[slot][0]


def cluster_pool_from_config(
    app_config: config.RawConfig, prefix: str = "rediscluster.", **kwargs: Any
) -> rediscluster.ClusterConnectionPool:
    """Make a ClusterConnectionPool from a configuration dictionary.

    The keys useful to :py:func:`cluster_pool_from_config` should be prefixed, e.g.
    ``rediscluster.url``, ``rediscluster.max_connections``, etc. The ``prefix`` argument
    specifies the prefix used to filter keys. Each key is mapped to a
    corresponding keyword argument on the :py:class:`rediscluster.ClusterConnectionPool`
    constructor.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``max_connections``: an integer maximum number of connections in the pool.
    * ``skip_full_coverage_check``: skip the check of the cluster-require-full-coverage
      config, useful for clusters without the CONFIG command (like AWS).
    * ``nodemanager_follow_cluster``: tell the node manager to reuse the last set of
      nodes it was operating on when initializing.
    * ``read_from_replicas``: (boolean) whether the client should send all read queries to
      replicas instead of just the primary.
    * ``timeout``: how long to wait for sockets to connect, e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`).
    """
    assert prefix.endswith(".")

    parser = config.SpecParser(
        {
            "url": config.String,
            "max_connections": config.Optional(config.Integer, default=50),
            "timeout": config.Optional(config.Timespan, default=None),
            "read_from_replicas": config.Optional(config.Boolean, default=True),
            "skip_full_coverage_check": config.Optional(config.Boolean, default=True),
            "nodemanager_follow_cluster": config.Optional(config.Boolean, default=None),
            "decode_responses": config.Optional(config.Boolean, default=True),
        }
    )

    options = parser.parse(prefix[:-1], app_config)

    # We're explicitly setting a default here because of
    # https://github.com/Grokzen/redis-py-cluster/issues/435
    kwargs.setdefault("max_connections", options.max_connections)

    kwargs.setdefault("decode_responses", options.decode_responses)

    if options.nodemanager_follow_cluster is not None:
        kwargs.setdefault("nodemanager_follow_cluster", options.nodemanager_follow_cluster)
    if options.skip_full_coverage_check is not None:
        kwargs.setdefault("skip_full_coverage_check", options.skip_full_coverage_check)
    if options.timeout is not None:
        kwargs.setdefault("timeout", options.timeout.total_seconds())

    if options.read_from_replicas:
        connection_pool = ClusterWithReadReplicasBlockingConnectionPool.from_url(
            options.url, **kwargs
        )
    else:
        connection_pool = rediscluster.ClusterBlockingConnectionPool.from_url(options.url, **kwargs)

    connection_pool.read_from_replicas = options.read_from_replicas
    connection_pool.skip_full_coverage_check = options.skip_full_coverage_check

    return connection_pool


class ClusterRedisClient(config.Parser):
    """Configure a clustered Redis client.

    This is meant to be used with
    :py:meth:`baseplate.Baseplate.configure_context`.

    See :py:func:`cluster_pool_from_config` for available configuration settings.
    """

    def __init__(self, **kwargs: Any):
        self.kwargs = kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
        connection_pool = cluster_pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
        return ClusterRedisContextFactory(connection_pool)


class ClusterRedisContextFactory(ContextFactory):
    """Cluster Redis client context factory.

    This factory will attach a
    :py:class:`~baseplate.clients.redis_cluster.MonitoredClusterRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. When Redis commands
    are executed via this connection object, they will use connections from the
    provided :py:class:`rediscluster.ClusterConnectionPool` and automatically record
    diagnostic information.

    :param connection_pool: A connection pool.
    """

    def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
        self.connection_pool = connection_pool

    def report_runtime_metrics(self, batch: metrics.Client) -> None:
        if not isinstance(self.connection_pool, rediscluster.ClusterBlockingConnectionPool):
            return

        size = self.connection_pool.max_connections
        open_connections = len(self.connection_pool._connections)
        available = self.connection_pool.pool.qsize()
        in_use = size - available

        batch.gauge("pool.size").replace(size)
        batch.gauge("pool.in_use").replace(in_use)
        batch.gauge("pool.open_and_available").replace(open_connections - in_use)

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredClusterRedisConnection":
        return MonitoredClusterRedisConnection(name, span, self.connection_pool)


class MonitoredClusterRedisConnection(rediscluster.RedisCluster):
    """Cluster Redis connection that collects diagnostic information.

    This connection acts like :py:class:`rediscluster.RedisCluster` except that all
    operations are automatically wrapped with diagnostic collection.

    The interface is the same as that class except for the
    :py:meth:`~baseplate.clients.redis_cluster.MonitoredClusterRedisConnection.pipeline`
    method.
    """

    def __init__(
        self,
        context_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
    ):
        self.context_name = context_name
        self.server_span = server_span

        super().__init__(
            connection_pool=connection_pool,
            read_from_replicas=connection_pool.read_from_replicas,
            skip_full_coverage_check=connection_pool.skip_full_coverage_check,
        )

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        command = args[0]
        trace_name = f"{self.context_name}.{command}"

        with self.server_span.make_child(trace_name):
            return super().execute_command(command, *args[1:], **kwargs)

    # pylint: disable=arguments-differ
    def pipeline(self, name: str) -> "MonitoredClusterRedisPipeline":
        """Create a pipeline.

        This returns an object on which you can call the standard Redis
        commands. Execution will be deferred until ``execute`` is called. This
        is useful for saving round trips even in a clustered environment.

        :param name: The name to attach to diagnostics for this pipeline.
        """
        return MonitoredClusterRedisPipeline(
            f"{self.context_name}.pipeline_{name}",
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
        )

    # No transaction support in redis-py-cluster
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError


# pylint: disable=abstract-method
class MonitoredClusterRedisPipeline(ClusterPipeline):
    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
        response_callbacks: Dict,
        **kwargs: Any,
    ):
        self.trace_name = trace_name
        self.server_span = server_span
        super().__init__(connection_pool, response_callbacks, **kwargs)

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:
        with self.server_span.make_child(self.trace_name):
            return super().execute(**kwargs)
```
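For reference, here is a minimal sketch of driving `cluster_pool_from_config` directly with a raw configuration dictionary. The key names mirror the "Supported keys" documented above; the URL, port, and values are illustrative assumptions, and the pool needs to be able to reach a real Redis cluster to discover its slot layout.

```python
from baseplate.clients.redis_cluster import cluster_pool_from_config

# Hypothetical raw config; the keys follow the "Supported keys" list in the
# docstring above, and the values are strings just as they would be in an
# ini file.
app_config = {
    "rediscluster.url": "redis://localhost:7000/0",  # assumed cluster address
    "rediscluster.max_connections": "50",
    "rediscluster.timeout": "200 milliseconds",
    "rediscluster.read_from_replicas": "true",
}

# Because read_from_replicas parses as true, this builds the
# ClusterWithReadReplicasBlockingConnectionPool defined above rather than a
# plain ClusterBlockingConnectionPool.
pool = cluster_pool_from_config(app_config)
```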
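And a rough sketch of how the monitored connection behaves once it is attached to the request context (the `foo` attribute name matches the example in the docs below; the commands themselves are arbitrary):

```python
def my_handler(request):
    # Each command goes through MonitoredClusterRedisConnection.execute_command,
    # so it is recorded as a child span named "foo.<COMMAND>", e.g. "foo.SET".
    request.foo.set("greeting", "hello")
    request.foo.get("greeting")

    # Pipelined commands are buffered locally and sent when execute() is
    # called; the whole batch is traced as one child span named
    # "foo.pipeline_greetings" by MonitoredClusterRedisPipeline.execute.
    pipe = request.foo.pipeline("greetings")
    pipe.set("counter", 1)
    pipe.incr("counter")
    pipe.execute()
```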
docs/api/baseplate/clients/redis_cluster.rst (new file, 92 lines):

Review comment on the page title: should the hot key tracking stuff be mentioned somewhere in this page?

```rst
``baseplate.clients.redis_cluster``
===================================

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`). `Redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster

.. automodule:: baseplate.clients.redis_cluster

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

   baseplate.configure_context(
       app_config,
       {
           ...
           "foo": ClusterRedisClient(),
           ...
       }
   )

and configure it in your application's configuration file:

.. code-block:: ini

   [app:main]

   ...

   # required: what redis instance to connect to
   foo.url = redis://localhost:6379/0

   # optional: the maximum size of the connection pool
   foo.max_connections = 99

   # optional: how long to wait for a connection to establish
   foo.timeout = 3 seconds

   # optional: whether read requests should be directed to replicas as well
   # instead of just the primary
   foo.read_from_replicas = true

   ...

and then use the attached :py:class:`~redis.Redis`-like object in requests::

   def my_method(request):
       request.foo.ping()

Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config

Classes
-------

.. autoclass:: ClusterRedisContextFactory
   :members:

.. autoclass:: MonitoredClusterRedisConnection
   :members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
    The size limit for the connection pool.
``runtime.pool.in_use``
    How many connections have been established and are currently checked out
    and being used.

.. versionadded:: 2.1
```

Review comment on the `.. versionadded:: 2.1` line: can this go up at the top, like right before the …
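To relate the runtime metrics above to the arithmetic in `ClusterRedisContextFactory.report_runtime_metrics`, here is a rough worked example; the numbers are made up, and the third gauge (`runtime.pool.open_and_available`) is emitted by the code even though this page only lists the first two.

```python
# Hypothetical snapshot of a ClusterBlockingConnectionPool:
size = 50              # max_connections              -> runtime.pool.size
open_connections = 10  # len(pool._connections): sockets opened so far
available = 45         # pool.pool.qsize(): free slots in the blocking queue

in_use = size - available                       # 5 -> runtime.pool.in_use
open_and_available = open_connections - in_use  # 5 -> runtime.pool.open_and_available
```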