Bug in retry-handling in the PubSub #169

Open
@ManelCoutinhoSensei

Hey y'all!

The retry functionality in the PubSub client does not seem to work correctly: when the server connection drops, the client raises instead of retrying until it can reconnect.

Reproduce Error

To reproduce the error, I stop Valkey while my system is polling for new messages in a separate thread. Even though our connection is configured with an infinite number of retries, the PubSub client always crashes and never recovers.
Here is a minimal example to help reproduce the issue:

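import traceback

import valkey
from valkey.backoff import ExponentialBackoff
from valkey.retry import Retry

# Assumed setup (not part of the original snippet): retries=-1 retries forever
vk = valkey.Valkey(retry=Retry(ExponentialBackoff(), -1))
vk_pub_sub = vk.pubsub()

def event_handler(msg):
    print(msg)

vk_pub_sub.psubscribe(**{"test": event_handler})

def exception_handler(ex, pubsub, thread):
    print(traceback.format_exc())

vk_pub_sub.run_in_thread(sleep_time=0.1, exception_handler=exception_handler)

# Now stop the Valkey server while the worker thread is polling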

Error Trace

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 62, in call_with_retry
    return do()
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 860, in <lambda>
    lambda: command(*args, **kwargs),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 877, in try_read
    if not conn.can_read(timeout=timeout):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 538, in can_read
    return self._parser.can_read(timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 80, in can_read
    return self.read_from_socket(timeout=timeout, raise_on_timeout=False)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 91, in read_from_socket
    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
valkey.exceptions.ConnectionError: Connection closed by server.

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 560, in read_response
    response = self._parser.read_response(disable_decoding=disable_decoding)
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 129, in read_response
    self.read_from_socket()
  File "/path/to/env/lib/python3.10/site-packages/valkey/_parsers/libvalkey.py", line 89, in read_from_socket
    bufflen = self._sock.recv_into(self._buffer)
ConnectionResetError: [Errno 104] Connection reset by peer

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1210, in run
    pubsub.get_message(ignore_subscribe_messages=True, timeout=sleep_time)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 1072, in get_message
    response = self.parse_response(block=(timeout is None), timeout=timeout)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 883, in parse_response
    response = self._execute(conn, try_read)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 859, in _execute
    return conn.retry.call_with_retry(
  File "/path/to/env/lib/python3.10/site-packages/valkey/retry.py", line 65, in call_with_retry
    fail(error)
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 861, in <lambda>
    lambda error: self._disconnect_raise_connect(conn, error),
  File "/path/to/env/lib/python3.10/site-packages/valkey/client.py", line 849, in _disconnect_raise_connect
    conn.connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 328, in connect
    self.on_connect()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 430, in on_connect
    self.read_response()
  File "/path/to/env/lib/python3.10/site-packages/valkey/connection.py", line 568, in read_response
    raise ConnectionError(
valkey.exceptions.ConnectionError: Error while reading from localhost:6379 : (104, 'Connection reset by peer')

Possible Cause

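From my debugging and the traces, the issue seems to be related to the connect method: the last part of the error trace shows conn.connect() being called from _disconnect_raise_connect, which runs as the retry's failure callback and is therefore outside the retry scope. Any error raised while reconnecting propagates immediately instead of being retried.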
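
To illustrate why the error escapes, here is a minimal, self-contained sketch. The call_with_retry function below is a simplified stand-in modelled on what the traceback suggests, not the library's actual code (backoff is omitted), and FakeConnection is a hypothetical class that always fails. It shows that an exception raised inside the failure callback leaves the loop even with infinite retries:

```python
class FakeConnection:
    """Hypothetical stand-in for a connection whose server is down."""

    def __init__(self):
        self.read_attempts = 0
        self.connect_attempts = 0

    def read(self):
        self.read_attempts += 1
        raise ConnectionError("Connection closed by server.")

    def connect(self):
        self.connect_attempts += 1
        raise ConnectionError("Connection reset by peer")


def call_with_retry(do, fail, retries=-1):
    """Simplified retry loop (an assumption, not valkey's real code).

    Only exceptions raised by do() are caught; fail() runs unguarded.
    """
    failures = 0
    while True:
        try:
            return do()
        except ConnectionError:
            failures += 1
            fail()  # an exception raised here leaves the loop at once
            if retries >= 0 and failures > retries:
                raise


conn = FakeConnection()
escaped = None
try:
    # retries=-1 means "retry forever", yet the loop exits after one pass
    call_with_retry(conn.read, conn.connect, retries=-1)
except ConnectionError as exc:
    escaped = exc

print(conn.read_attempts, conn.connect_attempts, escaped)
```

In this model the read is attempted once, the reconnect is attempted once, and the reconnect's error is what the caller sees, which matches the shape of the trace above.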

Quick Patch

A quick workaround is to wrap the whole body of connect (socket creation, on_connect handshake, and callbacks) inside a retry call, as shown below. However, this might introduce unexpected side effects:

def connect(self):
    """Connects to the Valkey server if not already connected."""
    if self._sock:
        return

    def _full_connect():
        self._sock = self._connect()
        if self.valkey_connect_func is None:
            # Use the default on_connect function
            self.on_connect()
        else:
            # Use the passed function valkey_connect_func
            self.valkey_connect_func(self)

        # run any user callbacks. right now the only internal callback
        # is for pubsub channel/pattern resubscription
        # first, remove any dead weakrefs
        self._connect_callbacks = [ref for ref in self._connect_callbacks if ref()]
        for ref in self._connect_callbacks:
            callback = ref()
            if callback:
                callback(self)

    try:
        # Retry the full connect sequence; disconnect between attempts
        # so that each retry starts from a clean state.
        self.retry.call_with_retry(
            _full_connect,
            lambda error: self.disconnect(error),
        )
    except socket.timeout:
        raise TimeoutError("Timeout connecting to server")
    except OSError as e:
        raise ConnectionError(self._error_message(e))
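
The intended effect of the patch can be sketched in isolation. This is a rough model under stated assumptions: FlakyConnection is hypothetical, call_with_retry is a simplified stand-in for the library's retry loop (no backoff), and the number of failures is arbitrary. It only shows that once the connect sequence itself runs inside the retry, handshake failures are retried until the server is reachable again:

```python
class FlakyConnection:
    """Hypothetical connection whose handshake fails a few times
    before the server becomes reachable again."""

    def __init__(self, failures_before_success):
        self._remaining = failures_before_success
        self.sock = None
        self.disconnects = 0

    def full_connect(self):
        if self._remaining > 0:
            self._remaining -= 1
            raise ConnectionError("Connection reset by peer")
        self.sock = object()  # placeholder for a real socket

    def disconnect(self, *args):
        # Reset state so the next attempt starts clean
        self.disconnects += 1
        self.sock = None


def call_with_retry(do, fail, retries=-1):
    """Simplified retry loop (an assumption, not valkey's real code)."""
    failures = 0
    while True:
        try:
            return do()
        except ConnectionError as error:
            failures += 1
            fail(error)
            if retries >= 0 and failures > retries:
                raise


conn = FlakyConnection(failures_before_success=3)
# The whole connect sequence is the retried operation, as in the patch
call_with_retry(conn.full_connect, conn.disconnect, retries=-1)

print(conn.sock is not None, conn.disconnects)
```

Whether the real connect method behaves the same way depends on details this model leaves out, such as which exception types the retry is configured to catch.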

Thanks in advance for any help 🙂
