Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

Add rediscluster support #573

Merged
merged 28 commits into from
May 19, 2021
Merged
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
7492d69
Add redisc cluster client support
Mar 8, 2021
e9b646a
Add integration tests for ClusterRedisClient
Mar 8, 2021
6050bc3
remove unused import
Mar 8, 2021
8d5d40b
remove unnneeded comment
Mar 8, 2021
cb95feb
remove unused import
Mar 8, 2021
9655d17
disable pylint warning
Mar 8, 2021
6214d10
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
78dc569
Update baseplate/clients/redis.py
FranGM Mar 8, 2021
1765def
address comments
Mar 10, 2021
ffdecf9
set version
Mar 10, 2021
9570abf
Update baseplate/clients/redis_cluster.py
FranGM Mar 10, 2021
df35f03
address comments (take 2)
Mar 11, 2021
94914ae
When read_from_replicas is enabled also read from master
Mar 11, 2021
191ed5d
Return only one node on get_node_by_slot
FranGM Mar 30, 2021
c78dcf2
fix documentation lint issues
FranGM Mar 30, 2021
8db95aa
change some client defaults
FranGM Mar 30, 2021
024adff
add more tests
FranGM Mar 30, 2021
b904a82
Remove unused metric
Apr 6, 2021
aeb416d
pass read_from_replicas arg when creating pipeline
FranGM Apr 20, 2021
2c099df
bump redis-py-cluster version
FranGM Apr 20, 2021
1ee4bdc
Add hot key tracker to the cluster reddis client
Apr 29, 2021
35839ad
Merge branch 'rediscluster-support' of github.com:FranGM/baseplate.py…
Apr 29, 2021
4255cf2
Allow to configure max_connections_per_node
FranGM May 2, 2021
972f82d
Update to current redis-py-cluster version
FranGM May 18, 2021
96054f4
Update requirements-transitive.txt
FranGM May 18, 2021
c35f976
Add hot key tracking to docs
May 19, 2021
44ecda5
Update setup.py
FranGM May 19, 2021
99b4ffd
Update docs/api/baseplate/clients/redis_cluster.rst
spladug May 19, 2021
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
226 changes: 226 additions & 0 deletions baseplate/clients/redis_cluster.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,226 @@
import random

from typing import Any
from typing import Dict

import rediscluster

from rediscluster.pipeline import ClusterPipeline

from baseplate import Span
from baseplate.clients import ContextFactory
from baseplate.lib import config
from baseplate.lib import metrics


# redis-py-cluster does not offer blocking connection behaviour and replica
# reads together out of the box, so this subclass layers replica-aware slot
# lookup on top of the blocking pool to get both.
class ClusterWithReadReplicasBlockingConnectionPool(rediscluster.ClusterBlockingConnectionPool):
    # pylint: disable=arguments-differ
    def get_node_by_slot(self, slot: int, read_command: bool = False) -> Dict[str, Any]:
        """Return a node serving the given slot.

        For read commands a node is chosen at random from every node serving
        the slot (the primary plus any replicas). Anything else goes to the
        primary, which is always the first entry in the slot's node list.
        """
        slot_nodes = self.nodes.slots[slot]
        if not read_command:
            # Writes (and any non-read command) must hit the primary.
            return slot_nodes[0]
        return random.choice(slot_nodes)


def cluster_pool_from_config(
    app_config: config.RawConfig, prefix: str = "rediscluster.", **kwargs: Any
) -> rediscluster.ClusterConnectionPool:
    """Make a blocking ClusterConnectionPool from a configuration dictionary.

    The keys useful to :py:func:`cluster_pool_from_config` should be prefixed, e.g.
    ``rediscluster.url``, ``rediscluster.max_connections``, etc. The ``prefix`` argument
    specifies the prefix used to filter keys. Each key is mapped to a
    corresponding keyword argument on the
    :py:class:`rediscluster.ClusterBlockingConnectionPool` constructor.

    Supported keys:

    * ``url`` (required): a URL like ``redis://localhost/0``.
    * ``max_connections``: an integer maximum number of connections in the pool
      (defaults to 50).
    * ``skip_full_coverage_check``: Skips the check of cluster-require-full-coverage
      config, useful for clusters without the CONFIG command (like aws). Defaults
      to true.
    * ``nodemanager_follow_cluster``: Tell the node manager to reuse the last set of
      nodes it was operating on when initializing.
    * ``read_from_replicas``: (Boolean) Whether the client should send all read queries to
      replicas instead of just the primary. Defaults to true.
    * ``timeout``: how long to wait for sockets to connect. e.g.
      ``200 milliseconds`` (:py:func:`~baseplate.lib.config.Timespan`)
    * ``decode_responses``: (Boolean) Whether responses should be decoded from bytes
      to strings. Defaults to true.
    """

    assert prefix.endswith(".")

    parser = config.SpecParser(
        {
            "url": config.String,
            "max_connections": config.Optional(config.Integer, default=50),
            "timeout": config.Optional(config.Timespan, default=None),
            "read_from_replicas": config.Optional(config.Boolean, default=True),
            "skip_full_coverage_check": config.Optional(config.Boolean, default=True),
            "nodemanager_follow_cluster": config.Optional(config.Boolean, default=None),
            "decode_responses": config.Optional(config.Boolean, default=True),
        }
    )

    options = parser.parse(prefix[:-1], app_config)

    # We're explicitly setting a default here because of https://github.com/Grokzen/redis-py-cluster/issues/435
    kwargs.setdefault("max_connections", options.max_connections)

    kwargs.setdefault("decode_responses", options.decode_responses)

    if options.nodemanager_follow_cluster is not None:
        kwargs.setdefault("nodemanager_follow_cluster", options.nodemanager_follow_cluster)
    if options.skip_full_coverage_check is not None:
        kwargs.setdefault("skip_full_coverage_check", options.skip_full_coverage_check)
    if options.timeout is not None:
        kwargs.setdefault("timeout", options.timeout.total_seconds())

    if options.read_from_replicas:
        # Replica reads need the custom pool that routes read commands randomly
        # across all nodes serving a slot.
        connection_pool = ClusterWithReadReplicasBlockingConnectionPool.from_url(
            options.url, **kwargs
        )
    else:
        connection_pool = rediscluster.ClusterBlockingConnectionPool.from_url(options.url, **kwargs)

    # Stash the effective options on the pool so MonitoredClusterRedisConnection
    # can mirror them when constructing the client.
    connection_pool.read_from_replicas = options.read_from_replicas
    connection_pool.skip_full_coverage_check = options.skip_full_coverage_check

    return connection_pool


class ClusterRedisClient(config.Parser):
    """Configuration parser for a clustered Redis client.

    Intended for use with :py:meth:`baseplate.Baseplate.configure_context`.
    See :py:func:`cluster_pool_from_config` for the supported settings.
    """

    def __init__(self, **kwargs: Any):
        # Extra keyword arguments are forwarded verbatim to the pool factory.
        self.kwargs = kwargs

    def parse(self, key_path: str, raw_config: config.RawConfig) -> "ClusterRedisContextFactory":
        pool = cluster_pool_from_config(raw_config, f"{key_path}.", **self.kwargs)
        return ClusterRedisContextFactory(pool)


class ClusterRedisContextFactory(ContextFactory):
    """Cluster Redis client context factory.

    Attaches a
    :py:class:`~baseplate.clients.redis.MonitoredClusterRedisConnection` to an
    attribute on the :py:class:`~baseplate.RequestContext`. Redis commands
    executed via that connection draw from the provided
    :py:class:`rediscluster.ClusterConnectionPool` and automatically record
    diagnostic information.

    :param connection_pool: A connection pool.
    """

    def __init__(self, connection_pool: rediscluster.ClusterConnectionPool):
        self.connection_pool = connection_pool

    def report_runtime_metrics(self, batch: metrics.Client) -> None:
        pool = self.connection_pool
        # Only blocking pools expose the internals needed for these gauges.
        if not isinstance(pool, rediscluster.ClusterBlockingConnectionPool):
            return

        size = pool.max_connections
        open_connections = len(pool._connections)
        available = pool.pool.qsize()
        in_use = size - available

        batch.gauge("pool.size").replace(size)
        batch.gauge("pool.in_use").replace(in_use)
        batch.gauge("pool.open_and_available").replace(open_connections - in_use)

    def make_object_for_context(self, name: str, span: Span) -> "MonitoredClusterRedisConnection":
        return MonitoredClusterRedisConnection(name, span, self.connection_pool)


class MonitoredClusterRedisConnection(rediscluster.RedisCluster):
    """Cluster Redis connection that collects diagnostic information.

    Behaves like :py:class:`rediscluster.RedisCluster` except that every
    operation is automatically wrapped with diagnostic collection.

    The interface is the same as that class except for the
    :py:meth:`~baseplate.clients.redis.MonitoredClusterRedisConnection.pipeline`
    method.

    """

    def __init__(
        self,
        context_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
    ):
        self.context_name = context_name
        self.server_span = server_span

        # Mirror the replica/coverage settings the pool was configured with so
        # client and pool behaviour stay in sync.
        super().__init__(
            connection_pool=connection_pool,
            read_from_replicas=connection_pool.read_from_replicas,
            skip_full_coverage_check=connection_pool.skip_full_coverage_check,
        )

    def execute_command(self, *args: Any, **kwargs: Any) -> Any:
        # The first positional argument is always the redis command name.
        trace_name = f"{self.context_name}.{args[0]}"

        with self.server_span.make_child(trace_name):
            return super().execute_command(args[0], *args[1:], **kwargs)

    # pylint: disable=arguments-differ
    def pipeline(self, name: str) -> "MonitoredClusterRedisPipeline":
        """Create a pipeline.

        The returned object accepts the standard Redis commands; execution is
        deferred until ``execute`` is called, which saves round trips even in
        a clustered environment.

        :param name: The name to attach to diagnostics for this pipeline.

        """
        return MonitoredClusterRedisPipeline(
            f"{self.context_name}.pipeline_{name}",
            self.server_span,
            self.connection_pool,
            self.response_callbacks,
        )

    # No transaction support in redis-py-cluster
    def transaction(self, *args: Any, **kwargs: Any) -> Any:
        """Not currently implemented."""
        raise NotImplementedError


# pylint: disable=abstract-method
class MonitoredClusterRedisPipeline(ClusterPipeline):
    """Cluster pipeline whose execution is wrapped in a diagnostic child span."""

    def __init__(
        self,
        trace_name: str,
        server_span: Span,
        connection_pool: rediscluster.ClusterConnectionPool,
        response_callbacks: Dict,
        **kwargs: Any,
    ):
        # Record tracing state before delegating to the base pipeline.
        self.trace_name = trace_name
        self.server_span = server_span
        super().__init__(connection_pool, response_callbacks, **kwargs)

    # pylint: disable=arguments-differ
    def execute(self, **kwargs: Any) -> Any:
        """Run the buffered commands inside a child span and return the results."""
        child_span = self.server_span.make_child(self.trace_name)
        with child_span:
            return super().execute(**kwargs)
4 changes: 3 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ services:
- "memcached"
- "redis"
- "zookeeper"

- "redis-cluster-node"
cassandra:
image: "cassandra:3.11"
environment:
Expand All @@ -25,3 +25,5 @@ services:
image: "redis:4.0.9"
zookeeper:
image: "zookeeper:3.4.10"
redis-cluster-node:
image: docker.io/grokzen/redis-cluster:6.2.0
1 change: 1 addition & 0 deletions docs/api/baseplate/clients/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ Instrumented Client Libraries
baseplate.clients.kombu: Client for publishing to queues <kombu>
baseplate.clients.memcache: Memcached Client <memcache>
baseplate.clients.redis: Redis Client <redis>
baseplate.clients.redis_cluster: Redis Cluster Client <redis_cluster>
baseplate.clients.requests: Requests (HTTP) Client <requests>
baseplate.clients.sqlalchemy: SQL Client for relational databases (e.g. PostgreSQL) <sqlalchemy>
baseplate.clients.thrift: Thrift client for RPC to other backend services <thrift>
Expand Down
92 changes: 92 additions & 0 deletions docs/api/baseplate/clients/redis_cluster.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
``baseplate.clients.redis_cluster``
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should the hot key tracking stuff be mentioned somewhere in this page?

===================================

`Redis`_ is an in-memory data structure store used where speed is necessary but
complexity is beyond simple key-value operations. (If you're just doing
caching, prefer :doc:`memcached <memcache>`). `Redis-py-cluster`_ is a Python
client library that supports interacting with Redis when operating in cluster mode.

.. _`Redis`: https://redis.io/
.. _`redis-py-cluster`: https://github.com/Grokzen/redis-py-cluster

.. automodule:: baseplate.clients.redis_cluster

Example
-------

To integrate redis-py-cluster with your application, add the appropriate client
declaration to your context configuration::

baseplate.configure_context(
app_config,
{
...
"foo": ClusterRedisClient(),
...
}
)

and configure it in your application's configuration file:

.. code-block:: ini

[app:main]

...


# required: what redis instance to connect to
foo.url = redis://localhost:6379/0

# optional: the maximum size of the connection pool
foo.max_connections = 99

# optional: how long to wait for a connection to establish
foo.timeout = 3 seconds

# optional: Whether read requests should be directed to replicas as well
# instead of just the primary
foo.read_from_replicas = true
...


and then use the attached :py:class:`~redis.Redis`-like object in
request::

def my_method(request):
request.foo.ping()

Configuration
-------------

.. autoclass:: ClusterRedisClient

.. autofunction:: cluster_pool_from_config

Classes
-------

.. autoclass:: ClusterRedisContextFactory
:members:

.. autoclass:: MonitoredClusterRedisConnection
:members:

Runtime Metrics
---------------

In addition to request-level metrics reported through spans, this wrapper
reports connection pool statistics periodically via the :ref:`runtime-metrics`
system. All metrics are tagged with ``client``, the name given to
:py:meth:`~baseplate.Baseplate.configure_context` when registering this context
factory.

The following metrics are reported:

``runtime.pool.size``
The size limit for the connection pool.
``runtime.pool.in_use``
How many connections have been established and are currently checked out and
being used.
``runtime.pool.open_and_available``
How many connections have been established but are not currently in use.

.. versionadded:: 2.1
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can this go up at the top, like right before the Example heading? I've only put the versionadded stuff down here on some other clients where the runtime metrics were the thing that was new.

1 change: 1 addition & 0 deletions requirements-transitive.txt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ pyramid==1.10.5
python-json-logger==2.0.1
reddit-cqlmapper==0.3.0
redis==3.5.3
redis-py-cluster==2.1.0
regex==2020.11.13
requests==2.25.1
sentry-sdk==0.20.1
Expand Down
3 changes: 3 additions & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ ignore_missing_imports = True
[mypy-pythonjsonlogger.*]
ignore_missing_imports = True

[mypy-rediscluster.*]
ignore_missing_imports = True

[mypy-sqlalchemy.*]
ignore_missing_imports = True

Expand Down
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
"memcache": ["pymemcache>=1.3.0,<1.4.4"],
"pyramid": ["pyramid>=1.9.0,<2.0"],
"redis": ["redis>=2.10.0,<4.0.0"],
"redis-py-cluster": ["redis-py-cluster>=2.0.0,<3.0.0"],
"refcycle": ["objgraph>=3.0,<4.0"],
"requests": ["advocate>=1.0.0,<2.0"],
"sentry": ["sentry-sdk>=0.19,<1.0"],
Expand Down
Loading