|
3 | 3 | from typing import Any
|
4 | 4 | from typing import Dict
|
5 | 5 |
|
6 |
| -import rediscluster # type: ignore |
| 6 | +import rediscluster |
7 | 7 |
|
8 |
| -from rediscluster.pipeline import ClusterPipeline # type: ignore |
| 8 | +from rediscluster.pipeline import ClusterPipeline |
9 | 9 |
|
10 | 10 | from baseplate import Span
|
11 | 11 | from baseplate.clients import ContextFactory
|
|
14 | 14 |
|
15 | 15 |
|
16 | 16 | # We want to be able to combine blocking behaviour with the ability to read from replicas
|
17 |
| -# Unfortunately this is not provide as-is so we cmobine two connection pool classes to provide |
| 17 | +# Unfortunately this is not provide as-is so we combine two connection pool classes to provide |
18 | 18 | # the desired behaviour.
|
19 | 19 | class ClusterWithReadReplicasBlockingConnectionPool(rediscluster.ClusterBlockingConnectionPool):
|
20 | 20 | # pylint: disable=arguments-differ
|
21 | 21 | def get_node_by_slot(self, slot: int, read_command: bool = False) -> Dict[str, Any]:
|
22 | 22 | """
|
23 |
| - Get a random node from the slot, including master |
| 23 | + Get a node from the slot. |
| 24 | + If the command is a read command we'll try to return a random replica. |
| 25 | + If there are no replicas or this isn't a read command we'll return the primary. |
24 | 26 | """
|
25 |
| - nodes_in_slot = self.nodes.slots[slot] |
26 |
| - if read_command: |
27 |
| - random_index = random.randrange(1, len(nodes_in_slot)) |
28 |
| - return nodes_in_slot[random_index] |
| 27 | + primary, *replicas = self.nodes.slots[slot] |
29 | 28 |
|
30 |
| - return nodes_in_slot[0] |
| 29 | + if replicas and read_command: |
| 30 | + return random.choice(replicas) |
| 31 | + |
| 32 | + # Either this isn't a read command or there aren't any replicas |
| 33 | + return primary |
31 | 34 |
|
32 | 35 |
|
33 | 36 | def cluster_pool_from_config(
|
@@ -60,7 +63,7 @@ def cluster_pool_from_config(
|
60 | 63 | parser = config.SpecParser(
|
61 | 64 | {
|
62 | 65 | "url": config.String,
|
63 |
| - "max_connections": config.Optional(config.Integer, default=None), |
| 66 | + "max_connections": config.Optional(config.Integer, default=50), |
64 | 67 | "timeout": config.Optional(config.Timespan, default=100),
|
65 | 68 | "read_from_replicas": config.Optional(config.Boolean, default=False),
|
66 | 69 | "skip_full_coverage_check": config.Optional(config.Boolean, default=True),
|
|
0 commit comments