-
Notifications
You must be signed in to change notification settings - Fork 24
Bug in retry-handling in the PubSub #169
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Comments
Hey @ManelCoutinhoSensei, Thanks for your feedback! Sorry it took me a while to respond, was quite busy lately. Also I tried to create a small reproducer to understand your problem but failed. Here's a script I used: #!/usr/bin/env python3
import time
import traceback
import valkey
def event_handler(msg):
print(msg)
def exception_handler(ex, pubsub, thread):
print(traceback.format_exc())
def main():
client = valkey.ValkeyCluster.from_url("valkey://localhost:16379/0")
vk_pubsub = client.pubsub()
vk_pubsub.psubscribe(**{"test": event_handler})
vk_pubsub.run_in_thread(
sleep_time=0.1, daemon=True, exception_handler=exception_handler
)
while True:
time.sleep(1)
if __name__ == "__main__":
main() Then, I did
Please tell me:
Thanks! |
Hey @mkmkme, Sorry for the delayed response. You're absolutely right in your assessment. My example was intended only to demonstrate the error, which I think, conceptually, shouldn't occur given where/how it happens in the code. Let me try to explain it more clearly. If you modify your code as follows: def event_handler(msg):
print(msg)
def exception_handler(ex, pubsub, thread):
print(traceback.format_exc())
def main():
client = valkey.Valkey.from_url("valkey://localhost:6379/0", Retry(ExponentialBackoff(10, 0.5), 5))
vk_pubsub = client.pubsub()
vk_pubsub.psubscribe(**{"test": event_handler})
vk_pubsub.run_in_thread(
sleep_time=0.1, daemon=True, exception_handler=exception_handler
) Instead of immediately triggering the retry mechanism, the process will first break (producing the mentioned error) before retrying automatically. Furthermore, since the underlying To better visualize this, consider running a separate thread that manually reads messages: while True:
try:
msg = vk_pubsub.get_message(timeout=0.01, ignore_subscribe_messages=True)
if msg is None:
continue
print(msg)
except Exception:
print(traceback.format_exc()) After stopping Valkey, its behavior will be:
If you remove the I say this because, if you use other methods like Hope this clarifies things! Let me know if you have any questions. |
Hey @ManelCoutinhoSensei! Thanks, this does clarify it. Unfortunately I don't have to handle this right now as I'm going for a 2-weeks vacation. I'll have a look at it right after it. |
Hey @mkmkme |
Hey @ManelCoutinhoSensei, Unfortunately I've been quite busy with the main job. Worry not, though, as I haven't forgotten about this! I'll try to handle this in the upcoming days and this is now in my high priority list in valkey-py. Sorry for the delay! |
Hey y'all!
It seems like the retry-functionality in the PubSub-client is not working correctly.
Reproduce Error
To reproduce the error I am closing valkey while my system is trying to get new messages in a parallel thread. Even though our connection is set up with an infinite amount of retry, the PubSub-client always crashes and never recovers.
Here is a minimal pseudo code to help reproduce the issue:
Error Trace
Possible Cause
From my debug and the traces, it seems that the issue is related to the
connect
method since the second part of the error trace occurs outside the retry scope.Quick Patch
A quick workaround is to wrap everything inside a retry call, as shown below. However, this might introduce unexpected side effects:
Thanks in advance for any help 🙂
The text was updated successfully, but these errors were encountered: