From 6f1fb57c9522ce65ccea7d1a75b1118a32634d02 Mon Sep 17 00:00:00 2001 From: Kefu Du Date: Mon, 5 Oct 2020 15:57:58 -0400 Subject: [PATCH 1/2] only send readonly to replicas once on new connections --- rediscluster/client.py | 8 -------- rediscluster/connection.py | 4 ++++ 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/rediscluster/client.py b/rediscluster/client.py index cb8f59de..5571c49d 100644 --- a/rediscluster/client.py +++ b/rediscluster/client.py @@ -578,7 +578,6 @@ def _execute_command(self, *args, **kwargs): redirect_addr = None asking = False - is_read_replica = False try_random_node = False slot = self._determine_slot(*args) @@ -605,7 +604,6 @@ def _execute_command(self, *args, **kwargs): slot, self.read_from_replicas and (command in self.READ_COMMANDS) ) - is_read_replica = node['server_type'] == 'slave' connection = self.connection_pool.get_connection_by_node(node) @@ -615,12 +613,6 @@ def _execute_command(self, *args, **kwargs): connection.send_command('ASKING') self.parse_response(connection, "ASKING", **kwargs) asking = False - if is_read_replica: - # Ask read replica to accept reads (see https://redis.io/commands/readonly) - # TODO: do we need to handle errors from this response? - connection.send_command('READONLY') - self.parse_response(connection, 'READONLY', **kwargs) - is_read_replica = False connection.send_command(*args) return self.parse_response(connection, command, **kwargs) diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 561e7657..1b998598 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -343,6 +343,10 @@ def get_connection_by_node(self, node): connection = self._available_connections.get(node["name"], []).pop() except IndexError: connection = self.make_connection(node) + if node["server_type"] == "slave": + connection.send_command('READONLY') + if nativestr(connection.read_response()) != 'OK': + raise ConnectionError('READONLY command failed') self._in_use_connections.setdefault(node["name"], set()).add(connection) From 27bd398682a344feb8e0b93d6925e405efae3d3e Mon Sep 17 00:00:00 2001 From: Kefu Du Date: Tue, 6 Oct 2020 11:37:29 -0400 Subject: [PATCH 2/2] fix tests --- rediscluster/connection.py | 3 ++- tests/test_cluster_connection_pool.py | 26 +++++++++++++------------- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/rediscluster/connection.py b/rediscluster/connection.py index 1b998598..f06eae09 100644 --- a/rediscluster/connection.py +++ b/rediscluster/connection.py @@ -343,7 +343,8 @@ def get_connection_by_node(self, node): connection = self._available_connections.get(node["name"], []).pop() except IndexError: connection = self.make_connection(node) - if node["server_type"] == "slave": + server_type = node.get("server_type", "master") + if server_type == "slave": connection.send_command('READONLY') if nativestr(connection.read_response()) != 'OK': raise ConnectionError('READONLY command failed') diff --git a/tests/test_cluster_connection_pool.py b/tests/test_cluster_connection_pool.py index 654eb6fe..99eeb92f 100644 --- a/tests/test_cluster_connection_pool.py +++ b/tests/test_cluster_connection_pool.py @@ -76,29 +76,29 @@ def test_in_use_not_exists(self): def test_connection_creation(self): connection_kwargs = {'foo': 'bar', 'biz': 'baz'} pool = self.get_pool(connection_kwargs=connection_kwargs) - connection = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) + connection = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) assert isinstance(connection, DummyConnection) assert connection.kwargs == connection_kwargs def test_multiple_connections(self): pool = self.get_pool() - c1 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) - c2 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001}) + c1 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) + c2 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001, "sever_type": "slave"}) assert c1 != c2 def test_max_connections(self): pool = self.get_pool(max_connections=2) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001, "sever_type": "slave"}) with pytest.raises(RedisClusterException): pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) def test_max_connections_per_node(self): pool = self.get_pool(max_connections=2, max_connections_per_node=True) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001}) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001, "sever_type": "master"}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001, "sever_type": "master"}) with pytest.raises(RedisClusterException): pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) @@ -108,9 +108,9 @@ def test_max_connections_default_setting(self): def test_reuse_previously_released_connection(self): pool = self.get_pool() - c1 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) + c1 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) pool.release(c1) - c2 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) + c2 = pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) assert c1 == c2 def test_repr_contains_db_info_tcp(self): @@ -426,8 +426,8 @@ def test_repr_contains_db_info_readonly(self): def test_max_connections(self): pool = self.get_pool(max_connections=2) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000}) - pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000, "sever_type": "master"}) + pool.get_connection_by_node({"host": "127.0.0.1", "port": 7001, "sever_type": "master"}) with pytest.raises(RedisClusterException): pool.get_connection_by_node({"host": "127.0.0.1", "port": 7000})