Skip to content

Commit 4be949a

Browse files
committed
Add dynamic_startup_nodes parameter to async ValkeyCluster
Signed-off-by: Jonas Dittrich <kakadus2303@gmail.com>
1 parent e0151c1 commit 4be949a

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

tests/test_asyncio/test_cluster.py

+21
Original file line numberDiff line numberDiff line change
@@ -2598,6 +2598,27 @@ def cmd_init_mock(self, r: ClusterNode) -> None:
25982598
assert rc.get_node(host=default_host, port=7001) is not None
25992599
assert rc.get_node(host=default_host, port=7002) is not None
26002600

2601+
@pytest.mark.parametrize("dynamic_startup_nodes", [True, False])
2602+
async def test_init_slots_dynamic_startup_nodes(self, dynamic_startup_nodes):
2603+
rc = await get_mocked_valkey_client(
2604+
host="my@DNS.com",
2605+
port=7000,
2606+
cluster_slots=default_cluster_slots,
2607+
dynamic_startup_nodes=dynamic_startup_nodes,
2608+
)
2609+
# Nodes are taken from default_cluster_slots
2610+
discovered_nodes = [
2611+
"127.0.0.1:7000",
2612+
"127.0.0.1:7001",
2613+
"127.0.0.1:7002",
2614+
"127.0.0.1:7003",
2615+
]
2616+
startup_nodes = list(rc.nodes_manager.startup_nodes.keys())
2617+
if dynamic_startup_nodes is True:
2618+
assert startup_nodes.sort() == discovered_nodes.sort()
2619+
else:
2620+
assert startup_nodes == ["my@DNS.com:7000"]
2621+
26012622

26022623
class TestClusterPipeline:
26032624
"""Tests for the ClusterPipeline class."""

valkey/asyncio/cluster.py

+17-2
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,14 @@ class ValkeyCluster(AbstractValkey, AbstractValkeyCluster, AsyncValkeyClusterCom
139139
| Enable read from replicas in READONLY mode. You can read possibly stale data.
140140
When set to true, read commands will be assigned between the primary and
141141
its replications in a Round-Robin manner.
142+
:param dynamic_startup_nodes:
143+
| Set the ValkeyCluster's startup nodes to all the discovered nodes.
144+
If true (default value), the cluster's discovered nodes will be used to
145+
determine the cluster nodes-slots mapping in the next topology refresh.
146+
It will remove the initial passed startup nodes if their endpoints aren't
147+
listed in the CLUSTER SLOTS output.
148+
If you use dynamic DNS endpoints for startup nodes but CLUSTER SLOTS lists
149+
specific IP addresses, it is best to set it to false.
142150
:param reinitialize_steps:
143151
| Specifies the number of MOVED errors that need to occur before reinitializing
144152
the whole cluster topology. If a MOVED error occurs and the cluster does not
@@ -237,6 +245,7 @@ def __init__(
237245
startup_nodes: Optional[List["ClusterNode"]] = None,
238246
require_full_coverage: bool = True,
239247
read_from_replicas: bool = False,
248+
dynamic_startup_nodes: bool = True,
240249
reinitialize_steps: int = 5,
241250
cluster_error_retry_attempts: int = 3,
242251
connection_error_retry_attempts: int = 3,
@@ -389,6 +398,7 @@ def __init__(
389398
startup_nodes,
390399
require_full_coverage,
391400
kwargs,
401+
dynamic_startup_nodes=dynamic_startup_nodes,
392402
address_remap=address_remap,
393403
)
394404
self.encoder = Encoder(encoding, encoding_errors, decode_responses)
@@ -1142,6 +1152,7 @@ class NodesManager:
11421152
"require_full_coverage",
11431153
"slots_cache",
11441154
"startup_nodes",
1155+
"_dynamic_startup_nodes",
11451156
"address_remap",
11461157
)
11471158

@@ -1150,11 +1161,13 @@ def __init__(
11501161
startup_nodes: List["ClusterNode"],
11511162
require_full_coverage: bool,
11521163
connection_kwargs: Dict[str, Any],
1164+
dynamic_startup_nodes: bool = True,
11531165
address_remap: Optional[Callable[[Tuple[str, int]], Tuple[str, int]]] = None,
11541166
) -> None:
11551167
self.startup_nodes = {node.name: node for node in startup_nodes}
11561168
self.require_full_coverage = require_full_coverage
11571169
self.connection_kwargs = connection_kwargs
1170+
self._dynamic_startup_nodes = dynamic_startup_nodes
11581171
self.address_remap = address_remap
11591172

11601173
self.default_node: "ClusterNode" = None
@@ -1387,8 +1400,10 @@ async def initialize(self) -> None:
13871400
# Set the tmp variables to the real variables
13881401
self.slots_cache = tmp_slots
13891402
self.set_nodes(self.nodes_cache, tmp_nodes_cache, remove_old=True)
1390-
# Populate the startup nodes with all discovered nodes
1391-
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
1403+
1404+
if self._dynamic_startup_nodes:
1405+
# Populate the startup nodes with all discovered nodes
1406+
self.set_nodes(self.startup_nodes, self.nodes_cache, remove_old=True)
13921407

13931408
# Set the default node
13941409
self.default_node = self.get_nodes_by_server_type(PRIMARY)[0]

0 commit comments

Comments
 (0)