Description
Hey y'all!
It seems like the retry functionality in the PubSub client is not working correctly.
Reproduce Error
To reproduce the error, I shut down Valkey while my system is polling for new messages in a background thread. Even though our connection is configured to retry infinitely, the PubSub client always crashes and never recovers.
Here is a minimal example to help reproduce the issue:
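import traceback
import valkey
from valkey.backoff import ExponentialBackoff
from valkey.retry import Retry

# Assumed setup: local server, Retry with retries=-1 (retry forever)
vk_pub_sub = valkey.Valkey(host="localhost", port=6379, retry=Retry(ExponentialBackoff(), -1)).pubsub()
def event_handler(msg):
    print(msg)
vk_pub_sub.psubscribe(**{"test": event_handler})
def exception_handler(ex, pubsub, thread):
    print(traceback.format_exc())
vk_pub_sub.run_in_thread(sleep_time=0.1, exception_handler=exception_handler)
# Now shut down the Valkey server while the worker thread is polling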
Error Trace
Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 62, in call_with_retry
    return do()
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 860, in <lambda>
    lambda: command(*args, **kwargs),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 877, in try_read
    if not conn.can_read(timeout=timeout):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 538, in can_read
    return self._parser.can_read(timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 80, in can_read
    return self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 91, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
valkey.exceptions.ConnectionError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 560, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 129, in read_response
    self.read_from_socket()
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 89, in read_from_socket
    bufflen = self._sock.recv_into(self._buffer)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1210, in run
    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1072, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 883, in parse_response
    response = self._execute(conn, try_read)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 859, in _execute
    return conn.retry.call_with_retry(
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 65, in call_with_retry
    fail(error)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 861, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 849, in _disconnect_raise_connect
    conn.connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 328, in connect
    self.on_connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 430, in on_connect
    self.read_response()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 568, in read_response
    raise ConnectionError(
valkey.exceptions.ConnectionError: Error while reading from localhost:6379 : (104, 'Connection reset by peer')
Possible Cause
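From my debugging and the trace, the problem seems to be in the reconnect path rather than in the retry loop itself. In the last traceback, call_with_retry catches the first ConnectionError (retry.py line 62, do()) and then calls fail(error) (line 65). That callback is _disconnect_raise_connect, which calls conn.connect(). Inside connect(), the on_connect() handshake fails again with "Connection reset by peer" because the server is still going down. As far as I can tell, this error is raised from the fail callback rather than from the retried function. It is therefore outside the retry scope and propagates straight out of call_with_retry, regardless of the configured retry count. It also looks like connect() itself only retries the socket creation, while on_connect() runs afterwards without any retry.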
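To illustrate why retries=-1 does not help here, below is a simplified, self-contained model of the retry loop. It is based only on the do() / fail(error) structure visible in the trace and is not valkey's actual code. ServerDown, read_message and reconnect are hypothetical names standing in for ConnectionError, try_read and _disconnect_raise_connect.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")


class ServerDown(Exception):
    """Hypothetical stand-in for valkey.exceptions.ConnectionError."""


def call_with_retry(
    do: Callable[[], T],
    fail: Callable[[Exception], None],
    retries: int = -1,
    backoff: float = 0.0,
) -> T:
    """Simplified retry loop (a model, not valkey's code); retries < 0 means retry forever."""
    failures = 0
    while True:
        try:
            return do()
        except ServerDown as error:
            failures += 1
            # fail() runs inside the except block but outside the try around
            # do(): if it raises, the new exception leaves the loop at once,
            # no matter how many retries are left.
            fail(error)
            if retries >= 0 and failures > retries:
                raise
            time.sleep(backoff)


attempts = []


def read_message() -> str:
    # Models try_read(): the server has gone away
    attempts.append(1)
    raise ServerDown("Connection closed by server.")


def reconnect(error: Exception) -> None:
    # Models _disconnect_raise_connect(): reconnecting while the server is
    # still shutting down fails during the handshake
    raise ServerDown("Error while reading from localhost:6379")


escaped = None
try:
    call_with_retry(read_message, reconnect, retries=-1)
except ServerDown as exc:
    escaped = str(exc)

print(len(attempts), escaped)  # 1 Error while reading from localhost:6379
```

Even with unlimited retries, do() runs only once. The error from the reconnect callback ends the loop, which matches the shape of the third traceback above.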
Quick Patch
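A quick workaround is to move the whole connection setup inside the retry call in connect(). That covers socket creation, the on_connect() handshake and the connect callbacks, as shown below. However, this might introduce unexpected side effects: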
# Replacement for connect() in valkey/connection.py (relies on that module's imports)
def connect(self):
    """Connects to the Valkey server if not already connected."""
    if self._sock:
        return

    def _full_connect():
        # Socket creation AND handshake are now retried together
        self._sock = self._connect()
        if self.valkey_connect_func is None:
            # Use the default on_connect function
            self.on_connect()
        else:
            # Use the passed function valkey_connect_func
            self.valkey_connect_func(self)

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        # first, remove any dead weakrefs
        self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()]
        for ref in self._connect_callbacks:
            callback = ref()
            if callback:
                callback(self)

    try:
        # _full_connect() sets self._sock itself, so the return value is unused;
        # on failure, disconnect() resets self._sock before the next attempt
        self.retry.call_with_retry(
            _full_connect,
            lambda error: self.disconnect(error),
        )
    except socket.timeout:
        raise TimeoutError("Timeout connecting to server")
    except OSError as e:
        raise ConnectionError(self._error_message(e))
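To check the idea behind the patch without a real server, here is a self-contained model. It makes some assumptions:

- FlakyConnection and ServerDown are hypothetical.
- call_with_retry is a simplified loop, not valkey's code.
- connect_socket_only() only roughly mirrors what the trace suggests the current connect() does.

When the handshake sits inside the retried function, a few transient handshake failures are absorbed instead of escaping.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


class ServerDown(Exception):
    """Hypothetical stand-in for valkey.exceptions.ConnectionError."""


def call_with_retry(do: Callable[[], T], fail: Callable[[Exception], None],
                    retries: int = -1, backoff: float = 0.0) -> T:
    """Simplified retry loop (a model); retries < 0 means retry forever."""
    failures = 0
    while True:
        try:
            return do()
        except ServerDown as error:
            failures += 1
            fail(error)
            if retries >= 0 and failures > retries:
                raise
            time.sleep(backoff)


class FlakyConnection:
    """Hypothetical connection whose handshake fails a few times, then works."""

    def __init__(self, handshake_failures: int) -> None:
        self.handshake_failures = handshake_failures
        self.sock: Optional[object] = None
        self.handshakes = 0

    def _connect(self) -> object:
        return object()  # stands in for a freshly opened socket

    def on_connect(self) -> None:
        self.handshakes += 1
        if self.handshakes <= self.handshake_failures:
            raise ServerDown("Connection reset by peer")

    def disconnect(self, error: Optional[Exception] = None) -> None:
        self.sock = None

    def connect_socket_only(self) -> None:
        # Only socket creation is retried; a handshake failure propagates
        if self.sock:
            return
        self.sock = call_with_retry(self._connect, self.disconnect)
        try:
            self.on_connect()
        except ServerDown:
            self.disconnect()
            raise

    def connect_full(self) -> None:
        # Patched idea: socket creation and handshake are retried together
        if self.sock:
            return

        def _full_connect() -> None:
            self.sock = self._connect()
            self.on_connect()

        call_with_retry(_full_connect, self.disconnect)


old = FlakyConnection(handshake_failures=1)
old_failed = False
try:
    old.connect_socket_only()
except ServerDown:
    old_failed = True

new = FlakyConnection(handshake_failures=3)
new.connect_full()

print(old_failed, new.handshakes, new.sock is not None)  # True 4 True
```

In this model, with retries=-1, connect_full() would loop forever if the handshake never succeeds. The retry count and backoff therefore bound how long a reconnect can block, which is one of the side effects worth weighing.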
Thanks in advance for any help 🙂