Skip to content
This repository was archived by the owner on Jan 9, 2024. It is now read-only.

Bugfix/fix move errors #477

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/cluster-setup.rst
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ A fully functional docker image can be found at https://github.com/Grokzen/docke

See repo `README` for detailed instructions on how to set up and run.


A docker-compose.yml file is included in the test suite, which can be used to run a redis cluster configured appropriately to run the tests against.

Vagrant
-------
Expand Down
28 changes: 18 additions & 10 deletions rediscluster/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -589,6 +589,14 @@ def _execute_command(self, *args, **kwargs):
ttl = int(self.RedisClusterRequestTTL)
connection_error_retry_counter = 0

def log_exception(message, exception):
    """
    Log a command failure, escalating to a full traceback only on the
    final attempt.

    NOTE(review): closes over the enclosing ``_execute_command``'s ``ttl``
    counter and the module-level ``log`` logger — callable only from within
    the retry loop where ``ttl`` has already been decremented for this
    attempt.

    :param message: Short label for the exception class being handled.
    :param exception: The caught exception instance, stringified into the
        warning message on non-final attempts.
    """
    if ttl == 0:
        # This is the last attempt before we run out of TTL, so log the full exception.
        log.exception(message)
    else:
        # We are going to retry, and therefore may yet succeed, so just log a warning.
        log.warning(message + str(exception))

while ttl > 0:
ttl -= 1
connection = None
Expand Down Expand Up @@ -630,7 +638,7 @@ def _execute_command(self, *args, **kwargs):
connection.send_command(*args)
return self.parse_response(connection, command, **kwargs)
except SlotNotCoveredError as e:
log.exception("SlotNotCoveredError")
log_exception("SlotNotCoveredError", e)

# In some cases during failover to a replica is happening
# a slot sometimes is not covered by the cluster layout and
Expand All @@ -639,13 +647,13 @@ def _execute_command(self, *args, **kwargs):
time.sleep(0.1)

# This is the last attempt before we run out of TTL, raise the exception
if ttl == 1:
if ttl == 0:
raise e
except (RedisClusterException, BusyLoadingError):
log.exception("RedisClusterException || BusyLoadingError")
raise
except ConnectionError:
log.exception("ConnectionError")
except ConnectionError as e:
log_exception("ConnectionError", e)

# ConnectionError can also be raised if we couldn't get a connection
# from the pool before timing out, so check that this is an actual
Expand All @@ -670,8 +678,8 @@ def _execute_command(self, *args, **kwargs):
self.connection_pool.nodes.increment_reinitialize_counter(
count=self.connection_pool.nodes.reinitialize_steps,
)
except TimeoutError:
log.exception("TimeoutError")
except TimeoutError as e:
log_exception("TimeoutError", e)
connection.disconnect()

if ttl < self.RedisClusterRequestTTL / 2:
Expand All @@ -692,20 +700,20 @@ def _execute_command(self, *args, **kwargs):
# This counter will increase faster when the same client object
# is shared between multiple threads. To reduce the frequency you
# can set the variable 'reinitialize_steps' in the constructor.
log.exception("MovedError")
log_exception("MovedError", e)

self.refresh_table_asap = True
self.connection_pool.nodes.increment_reinitialize_counter()

node = self.connection_pool.nodes.set_node(e.host, e.port, server_type='master')
self.connection_pool.nodes.slots[e.slot_id][0] = node
except TryAgainError:
log.exception("TryAgainError")
except TryAgainError as e:
log_exception("TryAgainError", e)

if ttl < self.RedisClusterRequestTTL / 2:
time.sleep(0.05)
except AskError as e:
log.exception("AskError")
log_exception("AskError", e)

redirect_addr, asking = "{0}:{1}".format(e.host, e.port), True
except BaseException as e:
Expand Down
4 changes: 3 additions & 1 deletion rediscluster/nodemanager.py
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ def initialize(self):
# Set the tmp variables to the real variables
self.slots = tmp_slots
self.nodes = nodes_cache
self.populate_startup_nodes()
self.reinitialize_counter = 0

log.debug("NodeManager initialize done : Nodes")
Expand Down Expand Up @@ -413,7 +414,8 @@ def set_node(self, host, port, server_type=None):

def populate_startup_nodes(self):
"""
Do something with all startup nodes and filters out any duplicates
Use nodes to populate startup_nodes, so that we have more chances
if a subset of the cluster fails.
"""
for item in self.startup_nodes:
self.set_node_name(item)
Expand Down
20 changes: 20 additions & 0 deletions tests/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
version: "3.7"

services:

redis:
image: grokzen/redis-cluster:latest
environment:
REDIS_PORT: 7000
BIND_ADDRESS: 0.0.0.0
ports:
- "7000:7000"
- "7001:7001"
- "7002:7002"
- "7003:7003"
- "7004:7004"
- "7005:7005"
- "7006:7006"
- "7007:7007"
# Catch signals (in particular, termination)
init: true
14 changes: 12 additions & 2 deletions tests/test_cluster_node_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ def test_empty_startup_nodes():

def test_wrong_startup_nodes_type():
"""
If something other then a list type itteratable is provided it should fail
If something other than a list type iterable is provided it should fail
"""
with pytest.raises(RedisClusterException):
NodeManager({})
Expand Down Expand Up @@ -263,9 +263,19 @@ def test_all_nodes():
assert node in nodes


def test_startup_nodes_are_populated():
    """
    After initialize(), the nodes discovered from the cluster should be
    appended to startup_nodes, so reconnection can try any of them if a
    subset of the cluster fails.
    """
    n = NodeManager(startup_nodes=[{"host": "127.0.0.1", "port": 7000}])
    n.initialize()

    # Expected: the original seed entry for 7000 plus every discovered node
    # (hence 7000 appears twice — once as the seed, once from discovery).
    # NOTE(review): requires a live test cluster on ports 7000-7005.
    assert sorted([node['port'] for node in n.startup_nodes]) == [7000, 7000, 7001, 7002, 7003, 7004, 7005]


def test_all_nodes_masters():
"""
Set a list of nodes with random masters/slaves config and it shold be possible
Set a list of nodes with random masters/slaves config and it should be possible
to iterate over all of them.
"""
n = NodeManager(
Expand Down
1 change: 0 additions & 1 deletion tests/test_multiprocessing_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,6 @@ def target(pool):

# Check that connection is still alive after fork process has exited
# and disconnected the connections in its pool
conn = pool.get_random_connection()
with exit_callback(pool.release, conn):
assert conn.send_command('ping') is None
assert conn.read_response() == b'PONG'
Expand Down